Trying out Apache Kafka/Storm integration

So far, I have tried out Apache Storm in the following articles.

    • Trying the basic operation of Apache Storm on Mac OS X
    • Trying the Scala version of the Apache Storm Bolt sample
(Figure: Kafka/Storm integration diagram)

⬛︎ Creating a Kafka operating environment

The steps are almost the same as in my earlier article "Trying the basic operation of Apache Kafka on Mac OS X". We use a standalone (single-node) environment.

    Install Kafka itself.
$ brew install Caskroom/cask/java
$ brew install kafka
    Edit the Kafka configuration file.
$ vi /usr/local/etc/kafka/server.properties
  ...
broker.id=1
  ...
    Start ZooKeeper.
$ zkServer start
JMX enabled by default
Using config: /usr/local/etc/zookeeper/zoo.cfg
    Start Kafka.
$ kafka-server-start.sh /usr/local/etc/kafka/server.properties
    Create the topic “kafkaStorm”, then confirm that it was created correctly.
$ kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic kafkaStorm
Created topic "kafkaStorm".

$ kafka-topics.sh --describe --zookeeper localhost:2181 --topic kafkaStorm
Topic:kafkaStorm    PartitionCount:1    ReplicationFactor:1 Configs:
    Topic: kafkaStorm   Partition: 0    Leader: 1   Replicas: 1 Isr: 1
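You can also check the topic from a script instead of by eye. Below is a minimal sketch that parses the describe output, assuming the `Key: value` layout shown above; spacing may differ between Kafka versions. `parse_partitions` is a hypothetical helper and is not part of Kafka. The leader should be broker 1, which matches `broker.id=1` in server.properties.

```python
import re

# Output of `kafka-topics.sh --describe` as shown above (spacing may vary).
DESCRIBE_OUTPUT = """\
Topic:kafkaStorm    PartitionCount:1    ReplicationFactor:1 Configs:
    Topic: kafkaStorm   Partition: 0    Leader: 1   Replicas: 1 Isr: 1
"""


def parse_partitions(text):
    """Return one dict per partition line of the describe output.

    Hypothetical helper: it only understands the "Key: value" layout above.
    """
    partitions = []
    for line in text.splitlines():
        line = line.strip()
        # Partition lines start with "Topic:" and contain "Partition:";
        # the summary line only has "PartitionCount:", so it is skipped.
        if not line.startswith("Topic:") or "Partition:" not in line:
            continue
        partitions.append(dict(re.findall(r"(\w+):\s*(\S+)", line)))
    return partitions


parts = parse_partitions(DESCRIBE_OUTPUT)
print(len(parts), parts[0]["Leader"])  # → 1 1
```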
    Install the kafka-python library.
$ pip install kafka-python==0.9.4
    Deploy the Kafka producer-side sample app “kafkaStorm_producer.py”.
from kafka.client import KafkaClient
from kafka.producer import SimpleProducer
from time import sleep
from datetime import datetime

# Connect to the standalone broker started above
kafka = KafkaClient("localhost:9092")

producer = SimpleProducer(kafka)

while True:
  now = "It is " + str(datetime.now().time())
  print(now)
  # "kafkaStorm" is the name of our topic; the payload is sent as UTF-8 bytes
  producer.send_messages("kafkaStorm", now.encode("utf-8"))
  sleep(1)
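The producer sends plain text, which the Storm side later decodes with `StringScheme`. The sketch below moves the payload formatting into a hypothetical helper, `make_message`, so it can be checked without a running broker. Encoding to UTF-8 bytes is an assumption for Python 3, where kafka-python expects bytes payloads.

```python
from datetime import datetime, time


def make_message(t):
    """Build the payload the producer sends, e.g. b"It is 09:36:12.434021".

    Hypothetical helper extracted from kafkaStorm_producer.py.
    Note: str(time) omits the fractional part when microsecond == 0.
    """
    return ("It is " + str(t)).encode("utf-8")


print(make_message(time(9, 36, 12, 434021)))  # → b'It is 09:36:12.434021'
print(make_message(datetime.now().time()))
```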

⬛︎ Creating a Storm operating environment

The steps are almost the same as in my earlier article "Trying the basic operation of Apache Storm on Mac OS X".

    Install Storm itself under “/usr/local/storm”.
$ brew install wget
$ cd /usr/local/
$ wget http://ftp.jaist.ac.jp/pub/apache/storm/apache-storm-0.10.0/apache-storm-0.10.0.tar.gz

$ tar zxvf apache-storm-0.10.0.tar.gz
$ ln -s apache-storm-0.10.0 storm
$ rm apache-storm-0.10.0.tar.gz
    Configure the Storm environment (change “storm.local.dir” to match your installation directory).
$ cd storm/conf
$ vi storm.yaml

...(snip)

storm.zookeeper.servers:
  - "127.0.0.1"

storm.local.dir: "/usr/local/storm"

nimbus.host: "127.0.0.1"

supervisor.slots.ports:
    - 6700
    - 6701
    - 6702
    - 6703
    Add Storm's bin directory to the PATH so that the storm command can be run easily.
$ vi $HOME/.profile

...(snip)

export PATH=$PATH:/usr/local/storm/bin

Start “Nimbus”.

$ storm nimbus
Running: java -server -Ddaemon.name=nimbus -Dstorm.options= -Dstorm.home=/usr/local/apache-storm-0.10.0 -Dstorm.log.dir=/usr/local/apache-storm-0.10.0/logs -Djava.library.path=/usr/local/lib:/opt/local/lib:/usr/lib -Dstorm.conf.file= -cp /usr/local/apache-storm-0.10.0/lib/asm-4.0.jar:/usr/local/apache-storm-0.10.0/lib/clojure-1.6.0.jar:/usr/local/apache-storm-0.10.0/lib/disruptor-2.10.4.jar:/usr/local/apache-storm-0.10.0/lib/hadoop-auth-2.4.0.jar:/usr/local/apache-storm-0.10.0/lib/kryo-2.21.jar:/usr/local/apache-storm-0.10.0/lib/log4j-api-2.1.jar:/usr/local/apache-storm-0.10.0/lib/log4j-core-2.1.jar:/usr/local/apache-storm-0.10.0/lib/log4j-over-slf4j-1.6.6.jar:/usr/local/apache-storm-0.10.0/lib/log4j-slf4j-impl-2.1.jar:/usr/local/apache-storm-0.10.0/lib/minlog-1.2.jar:/usr/local/apache-storm-0.10.0/lib/reflectasm-1.07-shaded.jar:/usr/local/apache-storm-0.10.0/lib/servlet-api-2.5.jar:/usr/local/apache-storm-0.10.0/lib/slf4j-api-1.7.7.jar:/usr/local/apache-storm-0.10.0/lib/storm-core-0.10.0.jar:/usr/local/storm/conf -Xmx1024m -Dlogfile.name=nimbus.log -Dlog4j.configurationFile=/usr/local/apache-storm-0.10.0/log4j2/cluster.xml backtype.storm.daemon.nimbus

Start “Supervisor”.

$ storm supervisor
Running: java -server -Ddaemon.name=supervisor -Dstorm.options= -Dstorm.home=/usr/local/apache-storm-0.10.0 -Dstorm.log.dir=/usr/local/apache-storm-0.10.0/logs -Djava.library.path=/usr/local/lib:/opt/local/lib:/usr/lib -Dstorm.conf.file= -cp /usr/local/apache-storm-0.10.0/lib/asm-4.0.jar:/usr/local/apache-storm-0.10.0/lib/clojure-1.6.0.jar:/usr/local/apache-storm-0.10.0/lib/disruptor-2.10.4.jar:/usr/local/apache-storm-0.10.0/lib/hadoop-auth-2.4.0.jar:/usr/local/apache-storm-0.10.0/lib/kryo-2.21.jar:/usr/local/apache-storm-0.10.0/lib/log4j-api-2.1.jar:/usr/local/apache-storm-0.10.0/lib/log4j-core-2.1.jar:/usr/local/apache-storm-0.10.0/lib/log4j-over-slf4j-1.6.6.jar:/usr/local/apache-storm-0.10.0/lib/log4j-slf4j-impl-2.1.jar:/usr/local/apache-storm-0.10.0/lib/minlog-1.2.jar:/usr/local/apache-storm-0.10.0/lib/reflectasm-1.07-shaded.jar:/usr/local/apache-storm-0.10.0/lib/servlet-api-2.5.jar:/usr/local/apache-storm-0.10.0/lib/slf4j-api-1.7.7.jar:/usr/local/apache-storm-0.10.0/lib/storm-core-0.10.0.jar:/usr/local/storm/conf -Xmx256m -Dlogfile.name=supervisor.log -Dlog4j.configurationFile=/usr/local/apache-storm-0.10.0/log4j2/cluster.xml backtype.storm.daemon.supervisor

Start the “UI”.

$ storm ui
Running: java -server -Ddaemon.name=ui -Dstorm.options= -Dstorm.home=/usr/local/apache-storm-0.10.0 -Dstorm.log.dir=/usr/local/apache-storm-0.10.0/logs -Djava.library.path=/usr/local/lib:/opt/local/lib:/usr/lib -Dstorm.conf.file= -cp /usr/local/apache-storm-0.10.0/lib/asm-4.0.jar:/usr/local/apache-storm-0.10.0/lib/clojure-1.6.0.jar:/usr/local/apache-storm-0.10.0/lib/disruptor-2.10.4.jar:/usr/local/apache-storm-0.10.0/lib/hadoop-auth-2.4.0.jar:/usr/local/apache-storm-0.10.0/lib/kryo-2.21.jar:/usr/local/apache-storm-0.10.0/lib/log4j-api-2.1.jar:/usr/local/apache-storm-0.10.0/lib/log4j-core-2.1.jar:/usr/local/apache-storm-0.10.0/lib/log4j-over-slf4j-1.6.6.jar:/usr/local/apache-storm-0.10.0/lib/log4j-slf4j-impl-2.1.jar:/usr/local/apache-storm-0.10.0/lib/minlog-1.2.jar:/usr/local/apache-storm-0.10.0/lib/reflectasm-1.07-shaded.jar:/usr/local/apache-storm-0.10.0/lib/servlet-api-2.5.jar:/usr/local/apache-storm-0.10.0/lib/slf4j-api-1.7.7.jar:/usr/local/apache-storm-0.10.0/lib/storm-core-0.10.0.jar:/usr/local/apache-storm-0.10.0:/usr/local/storm/conf -Xmx768m -Dlogfile.name=ui.log -Dlog4j.configurationFile=/usr/local/apache-storm-0.10.0/log4j2/cluster.xml backtype.storm.ui.core
    Create the “ExclamationTopology” environment to run on the Storm Supervisor, and confirm that the sbt build completes successfully.
$ git clone https://github.com/ttsubo/scala-storm-starter.git
$ cd scala-storm-starter/
$ git checkout KafkaSpout
$ sbt compile package assembly
$ ls -l target/scala-2.11/
total 34928
drwxr-xr-x  3 ttsubo  staff       102  2 19 18:26 cache
drwxr-xr-x  3 ttsubo  staff       102  2 20 13:29 classes
-rw-r--r--  1 ttsubo  staff  17874229  2 20 17:28 scala-storm-starter-assembly-0.0.2-SNAPSHOT.jar
-rw-r--r--  1 ttsubo  staff      7597  2 20 17:28 scala-storm-starter_2.11-0.0.2-SNAPSHOT.jar
drwxr-xr-x  2 ttsubo  staff        68  2 20 08:58 test-classes

⬛︎ Running “ExclamationTopology” in local mode

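Let's check how stream processing with Apache Kafka/Storm actually behaves. The user application here is trivial: it just publishes the current time once per second. It is debatable whether that really deserves to be called stream processing...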

    Start the Kafka producer-side sample app “kafkaStorm_producer.py” deployed earlier.
$ python kafkaStorm_producer.py 
It is 09:36:12.434021
It is 09:36:13.457584
It is 09:36:14.460644
It is 09:36:15.464444
It is 09:36:16.469271
It is 09:36:17.472175
It is 09:36:18.478534
It is 09:36:19.485352
It is 09:36:20.489528
It is 09:36:21.491859
It is 09:36:22.498474
It is 09:36:23.505260
It is 09:36:24.507176
It is 09:36:25.513852

... (snip)
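The producer sleeps for one second between sends, so the gap between messages should be slightly more than one second: the sleep time plus send and print overhead. The sketch below checks this against a few lines of the sample output above. The parsing assumes the exact "It is HH:MM:SS.ffffff" format.

```python
from datetime import datetime

# A few lines copied from the producer output above.
LINES = [
    "It is 09:36:12.434021",
    "It is 09:36:13.457584",
    "It is 09:36:14.460644",
    "It is 09:36:15.464444",
]
PREFIX = "It is "


def intervals(lines):
    """Return the gaps, in seconds, between consecutive messages."""
    stamps = [datetime.strptime(line[len(PREFIX):], "%H:%M:%S.%f") for line in lines]
    return [(later - earlier).total_seconds() for earlier, later in zip(stamps, stamps[1:])]


gaps = intervals(LINES)
print(gaps)
# Each gap is one second of sleep(1) plus a little overhead.
print(all(1.0 < gap < 1.1 for gap in gaps))  # → True
```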
    • Next, go to the directory where the “ExclamationTopology” environment was created, and start “ExclamationTopology” in local mode.

    The Storm spout word-spout fetches the stream data stored in the Kafka topic “kafkaStorm”. The bolts exclaim1 and exclaim2 then each append the string “!!!” to every tuple they receive.
$ storm jar target/scala-2.11/scala-storm-starter-assembly-0.0.2-SNAPSHOT.jar storm.starter.topology.ExclamationTopology

...(snip)
7202 [Thread-12-word-spout] INFO  b.s.d.task - Emitting: word-spout default [It is 09:36:12.434021]
7203 [Thread-12-word-spout] INFO  b.s.d.executor - TRANSFERING tuple TASK: 4 TUPLE: source: word-spout:7, stream: default, id: {-2736707202019925397=8535453313544767994}, [It is 09:36:12.434021]
7203 [Thread-24-exclaim1] INFO  b.s.d.executor - Processing received message FOR 4 TUPLE: source: word-spout:7, stream: default, id: {-2736707202019925397=8535453313544767994}, [It is 09:36:12.434021]
7204 [Thread-12-word-spout] INFO  b.s.d.task - Emitting: word-spout __ack_init [-2736707202019925397 8535453313544767994 7]
7204 [Thread-12-word-spout] INFO  b.s.d.executor - TRANSFERING tuple TASK: 1 TUPLE: source: word-spout:7, stream: __ack_init, id: {}, [-2736707202019925397 8535453313544767994 7]
7204 [Thread-24-exclaim1] INFO  b.s.d.task - Emitting: exclaim1 default [It is 09:36:12.434021!!!]
7204 [Thread-16-__acker] INFO  b.s.d.executor - Processing received message FOR 1 TUPLE: source: word-spout:7, stream: __ack_init, id: {}, [-2736707202019925397 8535453313544767994 7]
7205 [Thread-12-word-spout] INFO  b.s.d.task - Emitting: word-spout default [It is 09:36:13.457584]
7205 [Thread-16-__acker] INFO  b.s.d.executor - BOLT ack TASK: 1 TIME:  TUPLE: source: word-spout:7, stream: __ack_init, id: {}, [-2736707202019925397 8535453313544767994 7]
7205 [Thread-12-word-spout] INFO  b.s.d.executor - TRANSFERING tuple TASK: 2 TUPLE: source: word-spout:7, stream: default, id: {-7111756359690180959=8707010670879263031}, [It is 09:36:13.457584]
7205 [Thread-16-__acker] INFO  b.s.d.executor - Execute done TUPLE source: word-spout:7, stream: __ack_init, id: {}, [-2736707202019925397 8535453313544767994 7] TASK: 1 DELTA: 
7205 [Thread-12-word-spout] INFO  b.s.d.task - Emitting: word-spout __ack_init [-7111756359690180959 8707010670879263031 7]
7205 [Thread-10-exclaim1] INFO  b.s.d.executor - Processing received message FOR 2 TUPLE: source: word-spout:7, stream: default, id: {-7111756359690180959=8707010670879263031}, [It is 09:36:13.457584]
7205 [Thread-12-word-spout] INFO  b.s.d.executor - TRANSFERING tuple TASK: 1 TUPLE: source: word-spout:7, stream: __ack_init, id: {}, [-7111756359690180959 8707010670879263031 7]
7205 [Thread-10-exclaim1] INFO  b.s.d.task - Emitting: exclaim1 default [It is 09:36:13.457584!!!]
7205 [Thread-12-word-spout] INFO  b.s.d.task - Emitting: word-spout default [It is 09:36:14.460644]
7205 [Thread-12-word-spout] INFO  b.s.d.executor - TRANSFERING tuple TASK: 3 TUPLE: source: word-spout:7, stream: default, id: {6911314124032831513=2770314375009569340}, [It is 09:36:14.460644]
7205 [Thread-10-exclaim1] INFO  b.s.d.executor - TRANSFERING tuple TASK: 6 TUPLE: source: exclaim1:2, stream: default, id: {-7111756359690180959=-7992097948249303649}, [It is 09:36:13.457584!!!]
7205 [Thread-16-__acker] INFO  b.s.d.executor - Processing received message FOR 1 TUPLE: source: word-spout:7, stream: __ack_init, id: {}, [-7111756359690180959 8707010670879263031 7]
7206 [Thread-10-exclaim1] INFO  b.s.d.task - Emitting: exclaim1 __ack_ack [-7111756359690180959 -1602198103229490008]
7206 [Thread-16-__acker] INFO  b.s.d.executor - BOLT ack TASK: 1 TIME:  TUPLE: source: word-spout:7, stream: __ack_init, id: {}, [-7111756359690180959 8707010670879263031 7]
7206 [Thread-16-__acker] INFO  b.s.d.executor - Execute done TUPLE source: word-spout:7, stream: __ack_init, id: {}, [-7111756359690180959 8707010670879263031 7] TASK: 1 DELTA: 
7206 [Thread-10-exclaim1] INFO  b.s.d.executor - TRANSFERING tuple TASK: 1 TUPLE: source: exclaim1:2, stream: __ack_ack, id: {}, [-7111756359690180959 -1602198103229490008]
7206 [Thread-14-exclaim1] INFO  b.s.d.executor - Processing received message FOR 3 TUPLE: source: word-spout:7, stream: default, id: {6911314124032831513=2770314375009569340}, [It is 09:36:14.460644]
7206 [Thread-10-exclaim1] INFO  b.s.d.executor - BOLT ack TASK: 2 TIME:  TUPLE: source: word-spout:7, stream: default, id: {-7111756359690180959=8707010670879263031}, [It is 09:36:13.457584]
7206 [Thread-14-exclaim1] INFO  b.s.d.task - Emitting: exclaim1 default [It is 09:36:14.460644!!!]
7206 [Thread-10-exclaim1] INFO  b.s.d.executor - Execute done TUPLE source: word-spout:7, stream: default, id: {-7111756359690180959=8707010670879263031}, [It is 09:36:13.457584] TASK: 2 DELTA: 
7206 [Thread-12-word-spout] INFO  b.s.d.task - Emitting: word-spout __ack_init [6911314124032831513 2770314375009569340 7]
7206 [Thread-18-exclaim2] INFO  b.s.d.executor - Processing received message FOR 6 TUPLE: source: exclaim1:2, stream: default, id: {-7111756359690180959=-7992097948249303649}, [It is 09:36:13.457584!!!]
7206 [Thread-14-exclaim1] INFO  b.s.d.executor - TRANSFERING tuple TASK: 6 TUPLE: source: exclaim1:3, stream: default, id: {6911314124032831513=6305416489060296620}, [It is 09:36:14.460644!!!]
7206 [Thread-12-word-spout] INFO  b.s.d.executor - TRANSFERING tuple TASK: 1 TUPLE: source: word-spout:7, stream: __ack_init, id: {}, [6911314124032831513 2770314375009569340 7]
7207 [Thread-18-exclaim2] INFO  b.s.d.task - Emitting: exclaim2 default [It is 09:36:13.457584!!!!!!]
7207 [Thread-12-word-spout] INFO  b.s.d.task - Emitting: word-spout default [It is 09:36:15.464444]
7207 [Thread-18-exclaim2] INFO  b.s.d.task - Emitting: exclaim2 __ack_ack [-7111756359690180959 -7992097948249303649]
7207 [Thread-14-exclaim1] INFO  b.s.d.task - Emitting: exclaim1 __ack_ack [6911314124032831513 8211035036717084048]
7207 [Thread-14-exclaim1] INFO  b.s.d.executor - TRANSFERING tuple TASK: 1 TUPLE: source: exclaim1:3, stream: __ack_ack, id: {}, [6911314124032831513 8211035036717084048]
7207 [Thread-18-exclaim2] INFO  b.s.d.executor - TRANSFERING tuple TASK: 1 TUPLE: source: exclaim2:6, stream: __ack_ack, id: {}, [-7111756359690180959 -7992097948249303649]
7207 [Thread-16-__acker] INFO  b.s.d.executor - Processing received message FOR 1 TUPLE: source: exclaim1:2, stream: __ack_ack, id: {}, [-7111756359690180959 -1602198103229490008]
7207 [Thread-14-exclaim1] INFO  b.s.d.executor - BOLT ack TASK: 3 TIME:  TUPLE: source: word-spout:7, stream: default, id: {6911314124032831513=2770314375009569340}, [It is 09:36:14.460644]
7207 [Thread-18-exclaim2] INFO  b.s.d.executor - BOLT ack TASK: 6 TIME:  TUPLE: source: exclaim1:2, stream: default, id: {-7111756359690180959=-7992097948249303649}, [It is 09:36:13.457584!!!]
7207 [Thread-16-__acker] INFO  b.s.d.executor - BOLT ack TASK: 1 TIME:  TUPLE: source: exclaim1:2, stream: __ack_ack, id: {}, [-7111756359690180959 -1602198103229490008]
7207 [Thread-14-exclaim1] INFO  b.s.d.executor - Execute done TUPLE source: word-spout:7, stream: default, id: {6911314124032831513=2770314375009569340}, [It is 09:36:14.460644] TASK: 3 DELTA: 
7207 [Thread-18-exclaim2] INFO  b.s.d.executor - Execute done TUPLE source: exclaim1:2, stream: default, id: {-7111756359690180959=-7992097948249303649}, [It is 09:36:13.457584!!!] TASK: 6 DELTA: 
7207 [Thread-16-__acker] INFO  b.s.d.executor - Execute done TUPLE source: exclaim1:2, stream: __ack_ack, id: {}, [-7111756359690180959 -1602198103229490008] TASK: 1 DELTA: 
7207 [Thread-12-word-spout] INFO  b.s.d.executor - TRANSFERING tuple TASK: 4 TUPLE: source: word-spout:7, stream: default, id: {4141217291662411057=-1021888700141355268}, [It is 09:36:15.464444]
7207 [Thread-18-exclaim2] INFO  b.s.d.executor - Processing received message FOR 6 TUPLE: source: exclaim1:3, stream: default, id: {6911314124032831513=6305416489060296620}, [It is 09:36:14.460644!!!]
7208 [Thread-12-word-spout] INFO  b.s.d.task - Emitting: word-spout __ack_init [4141217291662411057 -1021888700141355268 7]
7207 [Thread-16-__acker] INFO  b.s.d.executor - Processing received message FOR 1 TUPLE: source: word-spout:7, stream: __ack_init, id: {}, [6911314124032831513 2770314375009569340 7]
7208 [Thread-18-exclaim2] INFO  b.s.d.task - Emitting: exclaim2 default [It is 09:36:14.460644!!!!!!]
7208 [Thread-12-word-spout] INFO  b.s.d.executor - TRANSFERING tuple TASK: 1 TUPLE: source: word-spout:7, stream: __ack_init, id: {}, [4141217291662411057 -1021888700141355268 7]
7208 [Thread-16-__acker] INFO  b.s.d.executor - BOLT ack TASK: 1 TIME:  TUPLE: source: word-spout:7, stream: __ack_init, id: {}, [6911314124032831513 2770314375009569340 7]
7208 [Thread-18-exclaim2] INFO  b.s.d.task - Emitting: exclaim2 __ack_ack [6911314124032831513 6305416489060296620]
7208 [Thread-16-__acker] INFO  b.s.d.executor - Execute done TUPLE source: word-spout:7, stream: __ack_init, id: {}, [6911314124032831513 2770314375009569340 7] TASK: 1 DELTA: 
7208 [Thread-12-word-spout] INFO  b.s.d.task - Emitting: word-spout default [It is 09:36:16.469271]
7208 [Thread-16-__acker] INFO  b.s.d.executor - Processing received message FOR 1 TUPLE: source: exclaim2:6, stream: __ack_ack, id: {}, [-7111756359690180959 -7992097948249303649]
7208 [Thread-18-exclaim2] INFO  b.s.d.executor - TRANSFERING tuple TASK: 1 TUPLE: source: exclaim2:6, stream: __ack_ack, id: {}, [6911314124032831513 6305416489060296620]
7208 [Thread-12-word-spout] INFO  b.s.d.executor - TRANSFERING tuple TASK: 2 TUPLE: source: word-spout:7, stream: default, id: {3052868074563147181=3745382070922932756}, [It is 09:36:16.469271]
7208 [Thread-18-exclaim2] INFO  b.s.d.executor - BOLT ack TASK: 6 TIME:  TUPLE: source: exclaim1:3, stream: default, id: {6911314124032831513=6305416489060296620}, [It is 09:36:14.460644!!!]
7209 [Thread-16-__acker] INFO  b.s.d.task - Emitting direct: 7; __acker __ack_ack [-7111756359690180959]
7209 [Thread-18-exclaim2] INFO  b.s.d.executor - Execute done TUPLE source: exclaim1:3, stream: default, id: {6911314124032831513=6305416489060296620}, [It is 09:36:14.460644!!!] TASK: 6 DELTA: 
7209 [Thread-12-word-spout] INFO  b.s.d.task - Emitting: word-spout __ack_init [3052868074563147181 3745382070922932756 7]
7209 [Thread-10-exclaim1] INFO  b.s.d.executor - Processing received message FOR 2 TUPLE: source: word-spout:7, stream: default, id: {3052868074563147181=3745382070922932756}, [It is 09:36:16.469271]
7209 [Thread-16-__acker] INFO  b.s.d.executor - TRANSFERING tuple TASK: 7 TUPLE: source: __acker:1, stream: __ack_ack, id: {}, [-7111756359690180959]
7209 [Thread-12-word-spout] INFO  b.s.d.executor - TRANSFERING tuple TASK: 1 TUPLE: source: word-spout:7, stream: __ack_init, id: {}, [3052868074563147181 3745382070922932756 7]
7209 [Thread-16-__acker] INFO  b.s.d.executor - BOLT ack TASK: 1 TIME:  TUPLE: source: exclaim2:6, stream: __ack_ack, id: {}, [-7111756359690180959 -7992097948249303649]
7209 [Thread-10-exclaim1] INFO  b.s.d.task - Emitting: exclaim1 default [It is 09:36:16.469271!!!]
7209 [Thread-12-word-spout] INFO  b.s.d.executor - Processing received message FOR 7 TUPLE: source: __acker:1, stream: __ack_ack, id: {}, [-7111756359690180959]
7209 [Thread-10-exclaim1] INFO  b.s.d.executor - TRANSFERING tuple TASK: 5 TUPLE: source: exclaim1:2, stream: default, id: {3052868074563147181=-6638282129905823771}, [It is 09:36:16.469271!!!]
7209 [Thread-16-__acker] INFO  b.s.d.executor - Execute done TUPLE source: exclaim2:6, stream: __ack_ack, id: {}, [-7111756359690180959 -7992097948249303649] TASK: 1 DELTA: 
7209 [Thread-10-exclaim1] INFO  b.s.d.task - Emitting: exclaim1 __ack_ack [3052868074563147181 -8063043445652341263]
7209 [Thread-16-__acker] INFO  b.s.d.executor - Processing received message FOR 1 TUPLE: source: exclaim1:3, stream: __ack_ack, id: {}, [6911314124032831513 8211035036717084048]
7209 [Thread-10-exclaim1] INFO  b.s.d.executor - TRANSFERING tuple TASK: 1 TUPLE: source: exclaim1:2, stream: __ack_ack, id: {}, [3052868074563147181 -8063043445652341263]
7209 [Thread-16-__acker] INFO  b.s.d.executor - BOLT ack TASK: 1 TIME:  TUPLE: source: exclaim1:3, stream: __ack_ack, id: {}, [6911314124032831513 8211035036717084048]
7209 [Thread-10-exclaim1] INFO  b.s.d.executor - BOLT ack TASK: 2 TIME:  TUPLE: source: word-spout:7, stream: default, id: {3052868074563147181=3745382070922932756}, [It is 09:36:16.469271]
7209 [Thread-12-word-spout] INFO  b.s.d.executor - SPOUT Acking message -7111756359690180959 storm.kafka.PartitionManager$KafkaMessageId@2e10053a
7209 [Thread-10-exclaim1] INFO  b.s.d.executor - Execute done TUPLE source: word-spout:7, stream: default, id: {3052868074563147181=3745382070922932756}, [It is 09:36:16.469271] TASK: 2 DELTA: 
7209 [Thread-16-__acker] INFO  b.s.d.executor - Execute done TUPLE source: exclaim1:3, stream: __ack_ack, id: {}, [6911314124032831513 8211035036717084048] TASK: 1 DELTA: 
7210 [Thread-16-__acker] INFO  b.s.d.executor - Processing received message FOR 1 TUPLE: source: word-spout:7, stream: __ack_init, id: {}, [4141217291662411057 -1021888700141355268 7]
7210 [Thread-12-word-spout] INFO  b.s.d.task - Emitting: word-spout default [It is 09:36:17.472175]
7210 [Thread-22-exclaim2] INFO  b.s.d.executor - Processing received message FOR 5 TUPLE: source: exclaim1:2, stream: default, id: {3052868074563147181=-6638282129905823771}, [It is 09:36:16.469271!!!]
7210 [Thread-12-word-spout] INFO  b.s.d.executor - TRANSFERING tuple TASK: 3 TUPLE: source: word-spout:7, stream: default, id: {-7091949305103491056=3997162068754701889}, [It is 09:36:17.472175]
7210 [Thread-16-__acker] INFO  b.s.d.executor - BOLT ack TASK: 1 TIME:  TUPLE: source: word-spout:7, stream: __ack_init, id: {}, [4141217291662411057 -1021888700141355268 7]
7210 [Thread-22-exclaim2] INFO  b.s.d.task - Emitting: exclaim2 default [It is 09:36:16.469271!!!!!!]
7210 [Thread-16-__acker] INFO  b.s.d.executor - Execute done TUPLE source: word-spout:7, stream: __ack_init, id: {}, [4141217291662411057 -1021888700141355268 7] TASK: 1 DELTA: 
7210 [Thread-12-word-spout] INFO  b.s.d.task - Emitting: word-spout __ack_init [-7091949305103491056 3997162068754701889 7]
7210 [Thread-22-exclaim2] INFO  b.s.d.task - Emitting: exclaim2 __ack_ack [3052868074563147181 -6638282129905823771]
7211 [Thread-16-__acker] INFO  b.s.d.executor - Processing received message FOR 1 TUPLE: source: exclaim2:6, stream: __ack_ack, id: {}, [6911314124032831513 6305416489060296620]
7211 [Thread-24-exclaim1] INFO  b.s.d.executor - TRANSFERING tuple TASK: 6 TUPLE: source: exclaim1:4, stream: default, id: {-2736707202019925397=8147493952586947714}, [It is 09:36:12.434021!!!]
7211 [Thread-22-exclaim2] INFO  b.s.d.executor - TRANSFERING tuple TASK: 1 TUPLE: source: exclaim2:5, stream: __ack_ack, id: {}, [3052868074563147181 -6638282129905823771]
7211 [Thread-16-__acker] INFO  b.s.d.task - Emitting direct: 7; __acker __ack_ack [6911314124032831513]
7211 [Thread-24-exclaim1] INFO  b.s.d.task - Emitting: exclaim1 __ack_ack [-2736707202019925397 533028969185990008]
7211 [Thread-22-exclaim2] INFO  b.s.d.executor - BOLT ack TASK: 5 TIME:  TUPLE: source: exclaim1:2, stream: default, id: {3052868074563147181=-6638282129905823771}, [It is 09:36:16.469271!!!]
7211 [Thread-14-exclaim1] INFO  b.s.d.executor - Processing received message FOR 3 TUPLE: source: word-spout:7, stream: default, id: {-7091949305103491056=3997162068754701889}, [It is 09:36:17.472175]
7211 [Thread-16-__acker] INFO  b.s.d.executor - TRANSFERING tuple TASK: 7 TUPLE: source: __acker:1, stream: __ack_ack, id: {}, [6911314124032831513]
7211 [Thread-18-exclaim2] INFO  b.s.d.executor - Processing received message FOR 6 TUPLE: source: exclaim1:4, stream: default, id: {-2736707202019925397=8147493952586947714}, [It is 09:36:12.434021!!!]
7211 [Thread-22-exclaim2] INFO  b.s.d.executor - Execute done TUPLE source: exclaim1:2, stream: default, id: {3052868074563147181=-6638282129905823771}, [It is 09:36:16.469271!!!] TASK: 5 DELTA: 
7211 [Thread-18-exclaim2] INFO  b.s.d.task - Emitting: exclaim2 default [It is 09:36:12.434021!!!!!!]

...(snip)

The log is hard to follow, but it confirms the flow. First, word-spout emits [It is 09:36:12.434021]. Next, exclaim1 emits [It is 09:36:12.434021!!!]. Finally, exclaim2 emits [It is 09:36:12.434021!!!!!!].
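The debug log interleaves many threads, so filtering it helps. The sketch below keeps only the `Emitting:` lines on the `default` stream for one message. It assumes the Storm 0.10 log layout shown above. `trace` is a hypothetical helper, and the sample lines are copied from the log.

```python
import re

# Lines taken from the local-mode log above (including two that must be ignored).
LOG = """\
7202 [Thread-12-word-spout] INFO  b.s.d.task - Emitting: word-spout default [It is 09:36:12.434021]
7204 [Thread-12-word-spout] INFO  b.s.d.task - Emitting: word-spout __ack_init [-2736707202019925397 8535453313544767994 7]
7204 [Thread-24-exclaim1] INFO  b.s.d.task - Emitting: exclaim1 default [It is 09:36:12.434021!!!]
7205 [Thread-12-word-spout] INFO  b.s.d.task - Emitting: word-spout default [It is 09:36:13.457584]
7211 [Thread-18-exclaim2] INFO  b.s.d.task - Emitting: exclaim2 default [It is 09:36:12.434021!!!!!!]
"""

# Component name, then the tuple value inside the square brackets.
EMIT_RE = re.compile(r"Emitting: (\S+) default \[(.*)\]$")


def trace(log_text, message):
    """Return (component, value) pairs for default-stream emits of one message."""
    hops = []
    for line in log_text.splitlines():
        match = EMIT_RE.search(line)
        if match and match.group(2).startswith(message):
            hops.append((match.group(1), match.group(2)))
    return hops


hops = trace(LOG, "It is 09:36:12.434021")
for component, value in hops:
    print(component, value)
```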

⬛︎ Running “ExclamationTopology” in remote mode

    • Next, let's run “ExclamationTopology” in remote mode.

    This time, the topology was run with the topology name “ExclamationTopology”.
$ storm jar target/scala-2.11/scala-storm-starter-assembly-0.0.2-SNAPSHOT.jar storm.starter.topology.ExclamationTopology ExclamationTopology
Running: java -client -Ddaemon.name= -Dstorm.options= -Dstorm.home=/usr/local/apache-storm-0.10.0 -Dstorm.log.dir=/usr/local/apache-storm-0.10.0/logs -Djava.library.path=/usr/local/lib:/opt/local/lib:/usr/lib -Dstorm.conf.file= -cp /usr/local/apache-storm-0.10.0/lib/asm-4.0.jar:/usr/local/apache-storm-0.10.0/lib/clojure-1.6.0.jar:/usr/local/apache-storm-0.10.0/lib/disruptor-2.10.4.jar:/usr/local/apache-storm-0.10.0/lib/hadoop-auth-2.4.0.jar:/usr/local/apache-storm-0.10.0/lib/kryo-2.21.jar:/usr/local/apache-storm-0.10.0/lib/log4j-api-2.1.jar:/usr/local/apache-storm-0.10.0/lib/log4j-core-2.1.jar:/usr/local/apache-storm-0.10.0/lib/log4j-over-slf4j-1.6.6.jar:/usr/local/apache-storm-0.10.0/lib/log4j-slf4j-impl-2.1.jar:/usr/local/apache-storm-0.10.0/lib/minlog-1.2.jar:/usr/local/apache-storm-0.10.0/lib/reflectasm-1.07-shaded.jar:/usr/local/apache-storm-0.10.0/lib/servlet-api-2.5.jar:/usr/local/apache-storm-0.10.0/lib/slf4j-api-1.7.7.jar:/usr/local/apache-storm-0.10.0/lib/storm-core-0.10.0.jar:target/scala-2.11/scala-storm-starter-assembly-0.0.2-SNAPSHOT.jar:/usr/local/storm/conf:/usr/local/apache-storm-0.10.0/bin -Dstorm.jar=target/scala-2.11/scala-storm-starter-assembly-0.0.2-SNAPSHOT.jar storm.starter.topology.ExclamationTopology ExclamationTopology
438  [main] INFO  b.s.u.Utils - Using defaults.yaml from resources
495  [main] INFO  b.s.u.Utils - Using storm.yaml from resources
521  [main] INFO  b.s.u.Utils - Using defaults.yaml from resources
529  [main] INFO  b.s.u.Utils - Using storm.yaml from resources
531  [main] INFO  b.s.StormSubmitter - Generated ZooKeeper secret payload for MD5-digest: -6085723166361804975:-8023199696530217358
532  [main] INFO  b.s.s.a.AuthUtils - Got AutoCreds []
548  [main] INFO  b.s.u.StormBoundedExponentialBackoffRetry - The baseSleepTimeMs [2000] the maxSleepTimeMs [60000] the maxRetries [5]
561  [main] INFO  b.s.u.StormBoundedExponentialBackoffRetry - The baseSleepTimeMs [2000] the maxSleepTimeMs [60000] the maxRetries [5]
580  [main] INFO  b.s.u.StormBoundedExponentialBackoffRetry - The baseSleepTimeMs [2000] the maxSleepTimeMs [60000] the maxRetries [5]
586  [main] INFO  b.s.StormSubmitter - Uploading topology jar target/scala-2.11/scala-storm-starter-assembly-0.0.2-SNAPSHOT.jar to assigned location: /usr/local/storm/nimbus/inbox/stormjar-e9d58ebd-0ec2-49df-8745-fcf9f931dd47.jar
733  [main] INFO  b.s.StormSubmitter - Successfully uploaded topology jar to assigned location: /usr/local/storm/nimbus/inbox/stormjar-e9d58ebd-0ec2-49df-8745-fcf9f931dd47.jar
733  [main] INFO  b.s.StormSubmitter - Submitting topology ExclamationTopology in distributed mode with conf {"storm.zookeeper.topology.auth.scheme":"digest","storm.zookeeper.topology.auth.payload":"-6085723166361804975:-8023199696530217358","topology.workers":3,"topology.debug":true}
889  [main] INFO  b.s.StormSubmitter - Finished submitting topology: ExclamationTopology
(Figures: Storm UI topology summary, topology detail, and topology visualization)
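In remote mode, the topology is submitted with `"topology.workers":3`, set by `config.setNumWorkers(3)`. Each worker process occupies one supervisor slot, meaning one port from `supervisor.slots.ports` in storm.yaml. The sketch below checks that the submitted configuration fits into the four configured slots. The JSON is copied from the submission log above with the auth entries omitted, and `fits_in_slots` is a hypothetical helper.

```python
import json

# Configuration printed by StormSubmitter above (auth entries omitted).
SUBMITTED_CONF = '{"topology.workers":3,"topology.debug":true}'

# supervisor.slots.ports from storm.yaml: one worker per port.
SLOT_PORTS = [6700, 6701, 6702, 6703]


def fits_in_slots(conf_json, ports):
    """Return True if the requested number of workers fits into the slots."""
    conf = json.loads(conf_json)
    return conf["topology.workers"] <= len(ports)


print(fits_in_slots(SUBMITTED_CONF, SLOT_PORTS))  # → True
```

With a single supervisor, this leaves one of the four slots unused.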
    Finally, stop the “ExclamationTopology” topology running in remote mode.
$ storm kill ExclamationTopology
Running: java -client -Ddaemon.name= -Dstorm.options= -Dstorm.home=/usr/local/apache-storm-0.10.0 -Dstorm.log.dir=/usr/local/apache-storm-0.10.0/logs -Djava.library.path=/usr/local/lib:/opt/local/lib:/usr/lib -Dstorm.conf.file= -cp /usr/local/apache-storm-0.10.0/lib/asm-4.0.jar:/usr/local/apache-storm-0.10.0/lib/clojure-1.6.0.jar:/usr/local/apache-storm-0.10.0/lib/disruptor-2.10.4.jar:/usr/local/apache-storm-0.10.0/lib/hadoop-auth-2.4.0.jar:/usr/local/apache-storm-0.10.0/lib/kryo-2.21.jar:/usr/local/apache-storm-0.10.0/lib/log4j-api-2.1.jar:/usr/local/apache-storm-0.10.0/lib/log4j-core-2.1.jar:/usr/local/apache-storm-0.10.0/lib/log4j-over-slf4j-1.6.6.jar:/usr/local/apache-storm-0.10.0/lib/log4j-slf4j-impl-2.1.jar:/usr/local/apache-storm-0.10.0/lib/minlog-1.2.jar:/usr/local/apache-storm-0.10.0/lib/reflectasm-1.07-shaded.jar:/usr/local/apache-storm-0.10.0/lib/servlet-api-2.5.jar:/usr/local/apache-storm-0.10.0/lib/slf4j-api-1.7.7.jar:/usr/local/apache-storm-0.10.0/lib/storm-core-0.10.0.jar:/usr/local/storm/conf:/usr/local/apache-storm-0.10.0/bin backtype.storm.command.kill_topology ExclamationTopology
1157 [main] INFO  b.s.u.Utils - Using defaults.yaml from resources
1215 [main] INFO  b.s.u.Utils - Using storm.yaml from resources
1651 [main] INFO  b.s.u.Utils - Using defaults.yaml from resources
1661 [main] INFO  b.s.u.Utils - Using storm.yaml from resources
1672 [main] INFO  b.s.thrift - Connecting to Nimbus at 127.0.0.1:6627 as user: 
1672 [main] INFO  b.s.u.Utils - Using defaults.yaml from resources
1678 [main] INFO  b.s.u.Utils - Using storm.yaml from resources
1691 [main] INFO  b.s.u.StormBoundedExponentialBackoffRetry - The baseSleepTimeMs [2000] the maxSleepTimeMs [60000] the maxRetries [5]
1736 [main] INFO  b.s.c.kill-topology - Killed topology: ExclamationTopology

⬛︎ A brief explanation of the Apache Kafka/Storm integration

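In the Apache Kafka/Storm integration, the Storm side consumes the stream data of the Kafka topic “kafkaStorm” through KafkaSpout, which must be configured in ExclamationTopology. KafkaSpout itself is provided by the storm-kafka module of the apache/storm GitHub repository.
ZooKeeper-related settings are also needed so that KafkaSpout can store its state, such as the offsets it has consumed, in ZooKeeper's distributed key-value store.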
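Here is how the ZooKeeper settings map to paths. `ZkHosts` points KafkaSpout at the broker metadata that Kafka registers under `/brokers`. The `zkRoot` and id passed to `SpoutConfig` decide where the spout commits its offsets. The sketch below computes these paths for the values used in the ExclamationTopology code that follows. It is based on my reading of storm-kafka 0.10 and Kafka's ZooKeeper layout, with offsets under `<zkRoot>/<id>/partition_<n>`. Treat the exact layout as an assumption and confirm it with the ZooKeeper CLI on your setup. The helper functions are hypothetical.

```python
# Values from ExclamationTopology.
BROKER_ZK_PATH = "/brokers"   # second argument of ZkHosts
ZK_ROOT = "/kafkastorm"       # SpoutConfig zkRoot
SPOUT_ID = "kafka-spout"      # SpoutConfig id
TOPIC = "kafkaStorm"


def broker_ids_path(broker_path):
    """Node whose children are the registered broker ids (assumed layout)."""
    return broker_path + "/ids"


def topic_partitions_path(broker_path, topic):
    """Node listing the partitions of a topic (assumed layout)."""
    return "%s/topics/%s/partitions" % (broker_path, topic)


def offset_path(zk_root, spout_id, partition):
    """Node where KafkaSpout commits its offset for one partition (assumed layout)."""
    return "%s/%s/partition_%d" % (zk_root, spout_id, partition)


print(broker_ids_path(BROKER_ZK_PATH))
print(topic_partitions_path(BROKER_ZK_PATH, TOPIC))
print(offset_path(ZK_ROOT, SPOUT_ID, 0))
```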

package storm.starter.topology

import backtype.storm.{ Config, LocalCluster, StormSubmitter }
import backtype.storm.topology.TopologyBuilder
import backtype.storm.utils.Utils
import storm.kafka.{ KafkaSpout, SpoutConfig, ZkHosts, StringScheme }
import backtype.storm.spout.SchemeAsMultiScheme

object ExclamationTopology {
  def main(args: Array[String]): Unit = {
    import storm.starter.bolt.ExclamationBolt

    val builder: TopologyBuilder = new TopologyBuilder()

    // Kafka topic to consume, and the ZooKeeper ensemble where Kafka registers its brokers
    val topic = "kafkaStorm"
    val kafkaZkConnect = "127.0.0.1:2181"
    val zkHosts = new ZkHosts(kafkaZkConnect, "/brokers")
    // ZooKeeper root and id under which KafkaSpout stores its consumer state
    val zkRoot = "/kafkastorm"
    val zkSpoutId = "kafka-spout"
    val kafkaConfig = new SpoutConfig(zkHosts, topic, zkRoot, zkSpoutId)
    // Decode each Kafka message into a single string field
    kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme())
    val kafkaSpout = new KafkaSpout(kafkaConfig)
    val kafkaSpoutId = "word-spout"
    builder.setSpout(kafkaSpoutId, kafkaSpout)

    // Two chained bolts, each appending "!!!" (parallelism 3 and 2)
    builder.setBolt("exclaim1", new ExclamationBolt(), 3).shuffleGrouping(kafkaSpoutId)
    builder.setBolt("exclaim2", new ExclamationBolt(), 2).shuffleGrouping("exclaim1")

    val config = new Config()
    config.setDebug(true)

    if (args != null && args.length > 0) {
      // Remote mode: submit to Nimbus under the name given on the command line
      config.setNumWorkers(3)
      StormSubmitter.submitTopology(args(0), config, builder.createTopology())
    } else {
      // Local mode: run in-process for 5 seconds, then shut down
      val cluster: LocalCluster = new LocalCluster()
      cluster.submitTopology("ExclamationTopology", config, builder.createTopology())
      Utils.sleep(5000)
      cluster.killTopology("ExclamationTopology")
      cluster.shutdown()
    }
  }
}
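The parallelism hints in this code also appear to explain the TASK numbers in the local-mode log. The spout runs with 1 executor, exclaim1 with 3, and exclaim2 with 2. In the log, task 1 is `__acker`, tasks 2 to 4 are exclaim1, tasks 5 and 6 are exclaim2, and task 7 is word-spout. As far as I can tell, Storm numbers tasks from 1 after sorting the component ids. `__acker` sorts first because `_` precedes lowercase letters in ASCII. Below is a Python model of that assignment, assuming one task per executor and a single acker (the default when one worker runs in local mode). It is an illustration, not Storm's code.

```python
def assign_task_ids(tasks_per_component):
    """Model of Storm's task numbering: sort component ids, number tasks from 1.

    Illustration only; assumes tasks == parallelism hint for each component.
    """
    task_ids = {}
    next_id = 1
    for component in sorted(tasks_per_component):
        count = tasks_per_component[component]
        task_ids[component] = list(range(next_id, next_id + count))
        next_id += count
    return task_ids


# Parallelism from ExclamationTopology, plus one acker for a single local worker.
topology = {"word-spout": 1, "exclaim1": 3, "exclaim2": 2, "__acker": 1}
for component, ids in assign_task_ids(topology).items():
    print(component, ids)
```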

⬛︎ Conclusion

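I managed to confirm how stream processing works with the Apache Kafka/Storm integration. However, the integration involves many technical components, so mastering it does not look easy. Although not covered here, the integration also requires knowledge of Apache ZooKeeper.
Moreover, I found that operating a Kafka/Storm integrated cluster requires a very large body of technical knowledge. I hope someone publishes a guidebook that covers all of this.
That's all for this time.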

⬛︎ References

    • Storm Kafka Integration
    • Integrating Kafka and Storm: Code Examples and State of the Game
    • https://github.com/apache/storm/tree/master/external/storm-kafka