
photo credit: CoreBurn
twitterが先日stormというリアルタイムな分散処理フレームワークstormを公開しました。
(ここでいうリアルタイムとは、すぐに、とか連続的に、というニュアンスで使っています)
Stormは、連続的に来るデータに対して同じ処理を繰り返しかけて新たなストリームを作る、という目的のフレームワークです。
分散やメッセージの保証をフレームワークに任せて、SpoutとBoltという処理を書くだけでシステムを作ることができます。
updateも頻繁に行われていて、これから利用者が増えていくかもしれません。
分散処理フレームワークというとHadoopが有名ですが、Hadoopではまとまったデータを一気に処理するバッチ処理を、簡単に分散でき高速に処理することができるフレームワークでした。
一方Stormでは、連続的に到達するデータに対して、連続的に結果を出すような処理を、簡単に分散させて処理することができるフレームワークになります。
YahooのS4などが近いです。
もともとはtwitterが買収したbacktypeの技術になります。
twitterでは、例えば連続的発生するtweetをstream APIから読み込んで、現在のtrend topicを出す、というような処理に使われているようです。
2011/09/19に行われたstrange loopというイベントで発表して、オープンソースとして公開されました。
2011/09/19の公開時には ver.0.5.4でしたが、現在はver.0.6.2までアップデートされています。(2012/1/31現在)
関連する日本語の記事:
開発者向けの情報
関連しそうなスライド
Stormの概要をつかむには以下のスライドが分かりやすいと思います。(英語です)
Stormとはどのようなものか
stormでの処理はどのようなものか、どのように書けるのか、先程のwikiや以下のサイトに記述があるので、そこから簡単に紹介します。
Twitter Engineering: A Storm is coming: more details and plans for release
自分の理解は間違っている可能性があるので、上のサイトやスライドなどを読むことをおすすめします。
また、間違いがあれば教えていただけると幸いです。
Stormでは何ができるか?
- 1,stream processing
データストリームを処理して、データベースを更新するような処理。
キューとワーカーでやっていた処理を、耐故障性を持ちながらスケーラブルに行うことができます。
- 2,Continuout computation
continuousなクエリを行い、その結果をリアルタイムにクライアントにstreamすることができる。
twitterのトレンドトピックをストリームするようなもの。ブラウザからリアルタイムにtrend topicが見れる
- 3, Distributed RPC
動作中のStormに対して、任意のクエリ投げて並行して処理させることも可能です。
Stormはクエリメッセージを待ち受け、クライアントにその結果を返します。
stormの処理
Stormの構成はHadoopに似ています。HadoopにおけるMapreduce jobに対応するのが、Stormでは Topology になります。TopologiesはMapReduce job と大きく異なるが、一番の違いは、topologiesには終わりがないということ(ユーザが止めるまで動き続ける)。
Stormのノードにはmasterノードとworkerノードがあり、masterノードは Nimbus と呼ばれます(hadoopにおけるjob tracker)。

Nimbusはworkerへタスクを割り振り、workerのモニタリングを行います。
それぞれのworkerノードでは、Supervisorというデーモンを動いています。SupervisorはNimbusからworkがアサインされるのを待ち受け、workerプロセスのスタート、ストップを行います。それぞれのworkerプロセスはTopologyのサブセットを実行する。
NimbusとSupervisor間の調停にはzookeeper を利用します。
NimbusとSupervisorのデーモンはステートレスで、ステートはzookeeperやローカルディスクに保持されます。
そのため、workerプロセスをkill -9 しても自動的に立ち上がり動作することができます。これにより、stormクラスタはstableとなっています。
Stream と Topologies
Streamとは途切れずに連続するタプルのことで、例えば連続的なtweetのようなものです。
Stormはstreamを、新たなストリームへ、分散して信頼できる方法で変換します。例えばtweetのstreamを、トレンドトピックのストリームへ変換します。
streamの変換のために、stormはSpoutとBoltという2つの機能を提供します。

spoutはストリームの源であり、spoutからStormの処理が開始されます。
spoutの役割は、例えばキューからタプルを読み込んでstreamを生成したり、twitterAPI からtweetのstreamを作成したりする処理になります。
Boltは1回のstream変換処理になります。
複雑な変換(tweetsのstreamからtrend topicのstreamを作成する処理のように) の場合、複数のBoltでその処理を実現することになります。
Spoutと、Boltで構成される複数段階のstream変換は、Topologyというpackageにまとめられる。そのtopologyをstormのクラスタに渡すことで処理される。
Topologyはstreamの変化のグラフであり、それぞれのノードがspoutかboltとなっている。図の矢印は、どのBoltがどのstreamを使うか(subscribe)を示している。
stormではこれらの処理が分散して平行に行われる。
tupleは、中間的な状態にはならず、tupleを生成したthreadから、そのtupleを利用するthreadへ直接送られる。
Wordcountの例
Topologyの定義
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("MySpout", new KestrelSpout("kestrel.backtype.com",
22133,
"sentence_queue",
new StringScheme()));
builder.setBolt("SplitBolt", new SplitSentence(), 10)
.shuffleGrouping("MySpout");
builder.setBolt("WordCountBolt", new WordCount(), 20)
.fieldsGrouping("SplitBolt", new Fields("word"));
spoutはtopologyにsetSpoutメソッドからユニークなID(上の例では”MySpout”)を割り当てられて追加されます。
topologyの全てのノード(spoutやbolt)は必ずIDが割り当てられ、そのIDを用いてboltからoutput streamがsubscribeされます。
boltの追加はsetBoltを用います。
Topologyにおける個々のboltは以下のように書けます。
Boltの例(SplitSentencce)
public class SplitSentence implements IBasicBolt {
public void prepare(Map conf, TopologyContext context) {
}
public void execute(Tuple tuple, BasicOutputCollector collector) {
String sentence = tuple.getString(0);
for(String word: sentence.split(" ")) {
collector.emit(new Values(word));
}
}
public void cleanup() {
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
}
}
これは、文章を入力として、それぞれの単語をtupleとして出力していくBoltの例になります。
executeメソッドで、タプルを受け取って、タプルを生成する処理を行なっています。
また、boltは他の言語でも書くこともできます。
pythonの例
import storm
class SplitSentenceBolt(storm.BasicBolt):
def process(self, tup):
words = tup.values[0].split(" ")
for word in words:
storm.emit([word])
ToplogyにおけるsetBoltメソッドのの最後の引数は、boltの処理をどれだけ並列するかを示します。
上の例に出てきたsplitsentenceでは並列数が10なので、クラスタ内で10のthreadで平行して処理されます。
topologyをスケールしたい場合、 ボトルネックとなる処理の、この値を大きくすればよい。
setBoltメソッドは、宣言したinputのオブジェクトを返します。
上記の例のSplitSentence boltは、id”MySpout”のoutput streamを、shuffle groupingを用いてsubscribeしています。
この場合、id=”MySpout”とは、KestrelSpoutであるので、kestrelSpoutを参照します。
ポイントは、KestrelSpoutから出力される全てのtupleを、splitsentenctBoltが消費する、ということです。
boltは複数のinput streamをsubscribeできるので、下のようにすれば複数のstreamを合わせるような処理ができます。
builder.setBolt(4, new MyBolt(), 12)
.shuffleGrouping(1)
.shuffleGrouping(2)
.fieldsGrouping(3, new Fields("id1", "id2"));
この後にもWordCountの説明や複数のstream処理などの説明もありますが省略します。
Stormがやってくれること
- メッセージの処理の保証
- ロバストな処理の管理
- 故障検知と再割当て
- 効率的なmessage passing
- Local ModeとDistributed Modeを提供
メッセージ処理をどのように保証するか、は以下のページに詳しく書いてあります。
故障検知については以下のページに書いてあります。
Stormに似たその他の技術
StormはCEP(“Complex Event Processing” systems)です。
そのようなシステムは、他にもEsperやS4(yahooが作ったやつ)があります。
S4と似ているが、S4より優れているところとして、Stormでは故障があってもメッセージの処理が保証されるとのこと。
まとめ
Stormは、連続的に来るデータに対して同じ処理を繰り返しかけて新たなストリームを作る、という目的のフレームワークです。
分散などをフレームワークに任せて、SpoutとBoltという処理を書くだけでシステムを作ることができます。
updateも頻繁に行われていて、これから利用者が増えていくかもしれません。
ここに書いていないことも、上で紹介したサイトに行けばいろいろ書いてあります。
AWS向けのパッケージなどもあります。
興味がある人は是非読んでみてください。
Tags:
storm