Subcribe via RSS

twitter, google, facebookにおけるデータ処理に関する記事

2011/12/10 | Posted in cloud, web

ちょっと最近調べてたことのまとめ。
twitter, google, facebookの技術の話を見てました。
またいいのがあれば追加していきます。








Tags: , , ,

Stormをlocalmodeで実行する

2011/11/18 | Posted in cloud, storm

stormについては

を参照してください。

今回は
の続き。

localmodeで実行ということは、stormのnimbusやsupervisorといった分散のための仕組みを利用せずに、とりあえずstormのプログラムを動かしてみる、というものです。


storm-starterのダウンロード


サンプルコードをダウンロードします。
nathanmarz/storm-starter – GitHub
$ git clone git://github.com/nathanmarz/storm-starter.git

eclipseに読み込む

前回作成途中だったJava projectにサンプルのプログラムをimportします。

projectの src に import > general > FileSystem で先ほどダウンロードした storm-starterのしたの storm-starter/src/jvm/storm を”From Directory”に指定して、”Create Top-level folder”にチェックを入れます。

次に、project のトップに storm-starter/multilang/resoucesをimportします。



こんな感じになるはず。

Storm starter

ここでエラーが出ている場合、stormが最新版じゃない可能性があります。


JARをexport

そして、このprojectをJARファイルに固めます。

Export > Java > JAR fileから、StormStarterプロジェクトを.classpathや.projectを除いてJARにします。


warningが出るけど気にしない。


実行

今作成したJARファイルをstormで実行します。
前回の準備で正しくpathが通っていればstormコマンドが利用できるはずです。
(うまく動かない場合はpermissionなどを確認してください)
$ storm jar StormStarter.jar storm.starter.ExclamationTopology

これで、ダーッと文字が出て、したのようにビックリマークがついた単語が出ていれば成功です。
ExclamationBoltのemitのログが表示されています。
....
11367 [Thread-25] INFO  backtype.storm.daemon.task  - Emitting: class storm.starter.ExclamationTopology$ExclamationBolt source: 2:3, stream: 1, id: {}, [golda!!!]
....

twitterのstreamを使うTopologyは例えば、
$ storm jar StormStarter.jar storm.starter.PrintSampleStream [username] [password]
のようなものがあります。

今回使ったstormコマンドは、
storm jar [jarファイル] [メインクラス] (引数…)
のようにjarを指定してTopologyの実行を行いました。

他のTopologyも同様に動くと思いますが、wordcountは環境によってはうまく動作しないかもしれません。


以上で、localmodeでstormのプログラムを動かすことが確認できました。


参考




Tags: ,

EclipseでStormの開発環境を作る

2011/11/17 | Posted in cloud, storm

Stormはtwitterが公開しているリアルタイム分散処理フレームワークです。
以前のエントリを参照ください。

twitterが発表したリアルタイム分散処理フレームワークStorm | tjun memo


今回は、その開発環境を作るメモ。

0. 準備

EclipseとJavaは入れて、Javaのpathは通してください。
$ echo $JAVA_HOME                                                        ~/work/storm
/System/Library/Java/JavaVirtualMachines/1.6.0.jdk/Contents/Home


1. stormのダウンロード


https://github.com/nathanmarz/storm/downloads から最新のstormをダウンロードして、zipを解凍してください。(2011/2/3の安定verは0.6.2)

そして、storm/binにpathを通します。

$ wget https://github.com/downloads/nathanmarz/storm/storm-0.6.2.zip --no-check-certificate
$ unzip storm-0.6.2.zip
$ sudo cp -R storm-0.6.2 /usr/local/
$ sudo ln -s /usr/local/storm-0.6.2 /usr/local/storm
$ export STORM_HOME=/usr/local/storm
$ export PATH=$PATH:$STORM_HOME/bin


2. twitter4jのダウンロード


Storm-starterのtwitter関連のサンプルを実行する場合は、Twitter4j(2.2.5+)をダウンロードします。他のサンプルもあるので、とりあえず飛ばしても大丈夫です。
http://twitter4j.org/en/index.html
これも解凍してください。

twitter4j/lib以下のJARファイルをstormから利用するため、JARをstormのlibディレクトリに置きます。
$ sudo cp twitter4j/lib/*.jar /usr/local/storm-0.5.4/lib/


3. Eclipseでプロジェクトの作成


stormのプログラムのために、設定を行います。

  • 3-1.
    新規のJava projectを作成

  • 3-2.
    Project Libraryの”Add External JARsに、先程ダウンロードしたstorm以下にあるJARを入れます。
    必要なのは storm/storm-*.*.*.jarstorm/lib/以下の全てのJAR になります。

  • 3-3.
    twitter関連のサンプルを実行する場合、同じくJARをlibraryに追加します。
    先ほどダウンロードして解凍した twitter4j/lib/以下の twitter4j-core-2.2.5.jartwitter4j-stream-2.2.5.jar を追加。



準備は以上です。

次回はstormのサンプルを実行する部分を説明します。

参考




Tags:

twitterが発表したリアルタイム分散処理フレームワークStorm

2011/10/2 | Posted in cloud, storm


Creative Commons License photo credit: CoreBurn

twitterが先日stormというリアルタイムな分散処理フレームワークstormを公開しました。
(ここでいうリアルタイムとは、すぐに、とか連続的に、というニュアンスで使っています)
Stormは、連続的に来るデータに対して同じ処理を繰り返しかけて新たなストリームを作る、という目的のフレームワークです。
分散やメッセージの保証をフレームワークに任せて、SpoutとBoltという処理を書くだけでシステムを作ることができます。
updateも頻繁に行われていて、これから利用者が増えていくかもしれません。

分散処理フレームワークというとHadoopが有名ですが、Hadoopではまとまったデータを一気に処理するバッチ処理を、簡単に分散でき高速に処理することができるフレームワークでした。
一方Stormでは、連続的に到達するデータに対して、連続的に結果を出すような処理を、簡単に分散させて処理することができるフレームワークになります。
YahooのS4などが近いです。

もともとはtwitterが買収したbacktypeの技術になります。
twitterでは、例えば連続的発生するtweetをstream APIから読み込んで、現在のtrend topicを出す、というような処理に使われているようです。
2011/09/19に行われたstrange loopというイベントで発表して、オープンソースとして公開されました。
2011/09/19の公開時には ver.0.5.4でしたが、現在はver.0.6.2までアップデートされています。(2012/1/31現在)

関連する日本語の記事:



開発者向けの情報



関連しそうなスライド

Stormの概要をつかむには以下のスライドが分かりやすいと思います。(英語です)

Stormとはどのようなものか

stormでの処理はどのようなものか、どのように書けるのか、先程のwikiや以下のサイトに記述があるので、そこから簡単に紹介します。
Twitter Engineering: A Storm is coming: more details and plans for release

自分の理解は間違っている可能性があるので、上のサイトやスライドなどを読むことをおすすめします。
また、間違いがあれば教えていただけると幸いです。


Stormでは何ができるか?

  • 1,stream processing
  • データストリームを処理して、データベースを更新するような処理。
    キューとワーカーでやっていた処理を、耐故障性を持ちながらスケーラブルに行うことができます。

  • 2,Continuout computation
  • continuousなクエリを行い、その結果をリアルタイムにクライアントにstreamすることができる。
    twitterのトレンドトピックをストリームするようなもの。ブラウザからリアルタイムにtrend topicが見れる

  • 3, Distributed RPC
  • 動作中のStormに対して、任意のクエリ投げて並行して処理させることも可能です。
    Stormはクエリメッセージを待ち受け、クライアントにその結果を返します。


stormの処理

Stormの構成はHadoopに似ています。HadoopにおけるMapreduce jobに対応するのが、Stormでは Topology になります。TopologiesはMapReduce job と大きく異なるが、一番の違いは、topologiesには終わりがないということ(ユーザが止めるまで動き続ける)。

Stormのノードにはmasterノードとworkerノードがあり、masterノードは Nimbus と呼ばれます(hadoopにおけるjob tracker)。

Nimbusはworkerへタスクを割り振り、workerのモニタリングを行います。
それぞれのworkerノードでは、Supervisorというデーモンを動いています。SupervisorはNimbusからworkがアサインされるのを待ち受け、workerプロセスのスタート、ストップを行います。それぞれのworkerプロセスはTopologyのサブセットを実行する。

NimbusとSupervisor間の調停にはzookeeper を利用します。
NimbusとSupervisorのデーモンはステートレスで、ステートはzookeeperやローカルディスクに保持されます。
そのため、workerプロセスをkill -9 しても自動的に立ち上がり動作することができます。これにより、stormクラスタはstableとなっています。


Stream と Topologies

Streamとは途切れずに連続するタプルのことで、例えば連続的なtweetのようなものです。
Stormはstreamを、新たなストリームへ、分散して信頼できる方法で変換します。例えばtweetのstreamを、トレンドトピックのストリームへ変換します。

streamの変換のために、stormはSpoutとBoltという2つの機能を提供します。

spoutはストリームの源であり、spoutからStormの処理が開始されます。
spoutの役割は、例えばキューからタプルを読み込んでstreamを生成したり、twitterAPI からtweetのstreamを作成したりする処理になります。

Boltは1回のstream変換処理になります。
複雑な変換(tweetsのstreamからtrend topicのstreamを作成する処理のように) の場合、複数のBoltでその処理を実現することになります。
Spoutと、Boltで構成される複数段階のstream変換は、Topologyというpackageにまとめられる。そのtopologyをstormのクラスタに渡すことで処理される。

Topologyはstreamの変化のグラフであり、それぞれのノードがspoutかboltとなっている。図の矢印は、どのBoltがどのstreamを使うか(subscribe)を示している。
stormではこれらの処理が分散して平行に行われる。
tupleは、中間的な状態にはならず、tupleを生成したthreadから、そのtupleを利用するthreadへ直接送られる。

Wordcountの例


Topologyの定義
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("MySpout", new KestrelSpout("kestrel.backtype.com",
                                      22133,
                                      "sentence_queue",
                                      new StringScheme()));

builder.setBolt("SplitBolt", new SplitSentence(), 10)
  .shuffleGrouping("MySpout");

builder.setBolt("WordCountBolt", new WordCount(), 20)
  .fieldsGrouping("SplitBolt", new Fields("word"));

spoutはtopologyにsetSpoutメソッドからユニークなID(上の例では”MySpout”)を割り当てられて追加されます。
topologyの全てのノード(spoutやbolt)は必ずIDが割り当てられ、そのIDを用いてboltからoutput streamがsubscribeされます。
boltの追加はsetBoltを用います。


Topologyにおける個々のboltは以下のように書けます。
Boltの例(SplitSentencce)
public class SplitSentence implements IBasicBolt {
  
  public void prepare(Map conf, TopologyContext context) {
  }

  public void execute(Tuple tuple, BasicOutputCollector collector) {
    String sentence = tuple.getString(0);
    for(String word: sentence.split(" ")) {
      collector.emit(new Values(word));
    }
  }

  public void cleanup() {
  }

  public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare(new Fields("word"));
  }
}
これは、文章を入力として、それぞれの単語をtupleとして出力していくBoltの例になります。
executeメソッドで、タプルを受け取って、タプルを生成する処理を行なっています。


また、boltは他の言語でも書くこともできます。
pythonの例
import storm

class SplitSentenceBolt(storm.BasicBolt):
    def process(self, tup):
        words = tup.values[0].split(" ")
        for word in words:
          storm.emit([word])


ToplogyにおけるsetBoltメソッドのの最後の引数は、boltの処理をどれだけ並列するかを示します。
上の例に出てきたsplitsentenceでは並列数が10なので、クラスタ内で10のthreadで平行して処理されます。
topologyをスケールしたい場合、 ボトルネックとなる処理の、この値を大きくすればよい。

setBoltメソッドは、宣言したinputのオブジェクトを返します。
上記の例のSplitSentence boltは、id”MySpout”のoutput streamを、shuffle groupingを用いてsubscribeしています。
この場合、id=”MySpout”とは、KestrelSpoutであるので、kestrelSpoutを参照します。
ポイントは、KestrelSpoutから出力される全てのtupleを、splitsentenctBoltが消費する、ということです。

boltは複数のinput streamをsubscribeできるので、下のようにすれば複数のstreamを合わせるような処理ができます。
builder.setBolt(4, new MyBolt(), 12)
    .shuffleGrouping(1)
    .shuffleGrouping(2)
    .fieldsGrouping(3, new Fields("id1", "id2"));


この後にもWordCountの説明や複数のstream処理などの説明もありますが省略します。

Stormがやってくれること

  • メッセージの処理の保証
  • ロバストな処理の管理
  • 故障検知と再割当て
  • 効率的なmessage passing
  • Local ModeとDistributed Modeを提供
メッセージ処理をどのように保証するか、は以下のページに詳しく書いてあります。

故障検知については以下のページに書いてあります。



Stormに似たその他の技術

StormはCEP(“Complex Event Processing” systems)です。
そのようなシステムは、他にもEsperやS4(yahooが作ったやつ)があります。
S4と似ているが、S4より優れているところとして、Stormでは故障があってもメッセージの処理が保証されるとのこと。

まとめ

Stormは、連続的に来るデータに対して同じ処理を繰り返しかけて新たなストリームを作る、という目的のフレームワークです。
分散などをフレームワークに任せて、SpoutとBoltという処理を書くだけでシステムを作ることができます。
updateも頻繁に行われていて、これから利用者が増えていくかもしれません。

ここに書いていないことも、上で紹介したサイトに行けばいろいろ書いてあります。
AWS向けのパッケージなどもあります。
興味がある人は是非読んでみてください。




Tags:

googleの講演聞いた

2010/7/4 | Posted in cloud, Diary

大学で講演があったので、聞きに行ってみた。google_tech_talk

2010/07/02

“Practical and Simple Parallel Computation in the Cloud”

by Grzegorz Malewicz

内容は、まあタイトルの通りで、大量のデータにどう向かっていくか、という話。
基本的なところから話すので、知ってる内容も結構あった。Big tableとかMapReduceとかPageRankとか。あとはParallel computingの基本的な話とか。

よく分からなかったのが、
の話。英語力が足りなかったのかもしれない。
これを使うとPageRankの実装が15行でできるらしい。

まあまとめると、googleのアプローチとしては、
不安定なperformanceや failure は起きるものなので、それを考慮して設計。

scalable reliable software on unreliable hardware

これに尽きる。

あと面白かったのがベンチマークの話で、

1PBのデータ(10^13のレコード、それぞれが100bytes)を
- 4000台のマルチコアのマシン
- 48,000のハードドライブ
でソートするのに6h 2m

ベンチマークに使うデータが1PBっていうサイズなのはさすがです。

あとは質疑応答
  • map reduceにcast できない問題はあるのか?
  • 少し手を加えれば、多くの問題はcastできる。新たなモデルも考えている。
  • street viewなどでも同じデータストレージシステムを使っている?
  • 分からないが、たぶん違う。
    big tableはデータの変換などを行うことに特化したデータ構造で、street viewなどのように書き込んだらあとは参照するだけの情報の場合、違うデータアーキテクチャがベターだろう。
  • 処理が多少速くなっても、結局ネットワークがボトルネックになるのでは?
  • データをいかに集めておくかが重要で、またネットワークトポロジーなども工夫している。
  • pregelは利用可能な実装はある?
  • オープンソースで現在の実装はない。論文に説明がある。
合ってるか分かりません。

あとGoogleは飯がうまくて優秀な仲間がいていい環境だよ、と宣伝してました。




Tags: ,
  • 書いてる人:

    Junichiro Takagi
    高木潤一郎

    http://tjun.jp
    mail@tjun.jp

    未踏でクリエータやってました。メーカーでエンジニアやってます。
    このブログは、会社とは関係なく個人の意見を書いています。

    RSS:


  • friendfeed: