Apache Stormのサンプルソースを読む

storm-ui-2

前回までで、Raspberry Pi 2台の上でサンプルソース(”storm-starter”)を実行させるところまではできたので、今回は肝心のその中身について理解していきたいと思います。

これらのサイトがとても参考になります。

というか、上記のページさえ見れば自分のページはいらないんじゃないかとも思うのですが、あくまで自分の勉強用なので、興味のある方のみ、お付き合いください。

前回実行したサンプルは”ExclamationTopology”だったので、これのソースを見てみることにします。

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package storm.starter;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.testing.TestWordSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;

import java.util.Map;

/**
 * This is a basic example of a Storm topology.
 */
public class ExclamationTopology {

  public static class ExclamationBolt extends BaseRichBolt {
    OutputCollector _collector;

    @Override
    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
      _collector = collector;
    }

    @Override
    public void execute(Tuple tuple) {
      _collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));
      _collector.ack(tuple);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("word"));
    }

  }

  public static void main(String[] args) throws Exception {
    TopologyBuilder builder = new TopologyBuilder();

    builder.setSpout("word", new TestWordSpout(), 10);
    builder.setBolt("exclaim1", new ExclamationBolt(), 3).shuffleGrouping("word");
    builder.setBolt("exclaim2", new ExclamationBolt(), 2).shuffleGrouping("exclaim1");

    Config conf = new Config();
    conf.setDebug(true);

    if (args != null && args.length > 0) {
      conf.setNumWorkers(3);

      StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
    }
    else {

      LocalCluster cluster = new LocalCluster();
      cluster.submitTopology("test", conf, builder.createTopology());
      Utils.sleep(10000);
      cluster.killTopology("test");
      cluster.shutdown();
    }
  }
}

Topology(SpoutとBoltからなるネットワーク構造)を作って、Spoutをセットして、Boltをセットして、Configを作って、引数があればSubmitして分散環境で実行、なければローカルでクラスタを作って実行する、以上。Topologyの構築自体はとても簡単なようですが、一個一個の用語からちゃんと見ていかないとなんとも言えないですね。

TopologyはSpoutとBoltからなるネットワーク構造(グラフ)。このTopologyをmain関数で定義して、依存関係を含めてソースを一つのjarにまとめて、”$ storm jar”のコマンドでNimbusにSubmitしてやると、リアルタイム処理がスタートする。”$ storm jar”コマンンドが、クライアントPCとNimbusの接続、およびNimbusへのjarファイルのアップロードの面倒を見てくれる。

Streamは、連続するTupleのこと。Tupleとは、Stormで処理されるメッセージを保持するデータモデルのこと。Streamを変化させていくために用意されているものが、SpoutとBolt。自分でStormアプリケーションを書くということは、SpoutとBoltにどういうロジックを書くか、ということに等しい。

Spoutは、Tupleを送りだすもの。何かを読み込んだ結果をTupleとしてStreamに流すこともあれば、APIで取得した結果をTupleとしてStreamに流すこともある。

Boltは任意の数のStreamを受け付けて、処理し、新たなStreamを作ることができる。Boltでは関数の実行や、Tupleのフィルタリング、Streamの統合、Databaseとの会話など、いろいろできる。

SpoutとBoltは、自分をsubscribeしているすべてのBoltに対して、Tupleを送り出すことができる。

Topologyにおける各ノード(= Spout & Bolt)は、並列に実行される。各ノードをどれだけ並行に実施したいかは、自分で設定することができる。

Topology内の各ノードは、自身が送り出すTupleを宣言しなければならない。これに相当しているのが、上記のサンプルソースのExclamationTopologyクラス。つまり、ExclamationBoltノードは、入力されたTupleの最初のフィールドの値に対して、”!!!”をつけて、新たに”word”というフィールドを持つTupleを送り出している。

ExclamationTopologyクラスはBaseRichBoltを継承しているが、これはBoltがシャットダウンされたときの処理を記述するcleanupメソッドと、設定用のgetComponentConfigurationメソッドの記述が不要なときに、Boltの記述を簡潔にするために使う。これらのメソッドを記述する必要がある場合は、IRichBoltインタフェースを実装する。

ちなみに、setSpoutでTopologyにセットされているTestWordSpoutの実装も見てみると、同じく”word”というフィールドを持つTupleを送り出してる。この送り出しは、nextTupleメソッドが呼ばれるたびに発生するようだけれど。。。このメソッドを呼ぶのは誰なんだろう?Stormのフレームワークが定期的に呼びつけるもので、自分でSleepの待ち時間を設定するものなのか。。。すみません、よくわかりません。

 

あとはStreamのグルーピングというものもありますが、これはWordCountTopologyのサンプルを見た方が良さそうなので、ここでは省略します。

 

今のところ、Stormの雰囲気はとりあえずわかった、というぐらいです。あとは実際にどんなユースケースに適用できるかとかを自分で考えながらやっていくしかないかな。