Trying out Apache Kafka/Storm integration
In previous articles, I tried out Apache Storm:
- Trying the basic operation of Apache Storm on Mac OS X
- Trying the Scala version of the Apache Storm Bolt sample

⬛︎ Setting up the Kafka environment
The steps are almost the same as in my earlier article "Trying the basic operation of Apache Kafka on Mac OS X". A standalone (single-node) environment is used.
- Install Kafka itself.
$ brew install Caskroom/cask/java
$ brew install kafka
- Edit the Kafka configuration file.
$ vi /usr/local/etc/kafka/server.properties
...
broker.id=1
...
- Start ZooKeeper.
$ zkServer start
JMX enabled by default
Using config: /usr/local/etc/zookeeper/zoo.cfg
- Start Kafka.
$ kafka-server-start.sh /usr/local/etc/kafka/server.properties
- Create the topic "kafkaStorm", then confirm that the topic "kafkaStorm" was created correctly.
$ kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic kafkaStorm
Created topic "kafkaStorm".
$ kafka-topics.sh --describe --zookeeper localhost:2181 --topic kafkaStorm
Topic:kafkaStorm PartitionCount:1 ReplicationFactor:1 Configs:
Topic: kafkaStorm Partition: 0 Leader: 1 Replicas: 1 Isr: 1
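The `--describe` output above can also be checked programmatically. The sketch below is not part of the original steps: `parse_topic_description` is a hypothetical helper, and it assumes the `Key:Value` layout shown above, which may differ between Kafka versions. Note that `Leader: 1` matches the `broker.id=1` set in server.properties.

```python
import re


def parse_topic_description(output):
    """Parse `kafka-topics.sh --describe` output into a summary dict.

    Hypothetical helper: assumes the "Key:Value" layout shown above.
    """
    lines = [ln.strip() for ln in output.strip().splitlines() if ln.strip()]
    # First line holds topic-level settings, e.g. "Topic:kafkaStorm PartitionCount:1 ..."
    header = dict(re.findall(r"(\w+):\s*(\S+)", lines[0]))
    partitions = []
    for ln in lines[1:]:
        # One line per partition, e.g. "Topic: kafkaStorm Partition: 0 Leader: 1 ..."
        fields = dict(re.findall(r"(\w+):\s*(\S+)", ln))
        partitions.append({
            "partition": int(fields["Partition"]),
            "leader": int(fields["Leader"]),
            "replicas": [int(x) for x in fields["Replicas"].split(",")],
            "isr": [int(x) for x in fields["Isr"].split(",")],
        })
    return {
        "topic": header["Topic"],
        "partition_count": int(header["PartitionCount"]),
        "replication_factor": int(header["ReplicationFactor"]),
        "partitions": partitions,
    }
```

With a single broker, the leader, replica list and in-sync replica list all contain only broker 1, which is what `--replication-factor 1` requests.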
- Install the kafka-python library.
$ pip install kafka-python==0.9.4
- Deploy the Kafka producer-side sample app "kafkaStorm_producer.py".
from kafka.client import KafkaClient
from kafka.producer import SimpleProducer
from time import sleep
from datetime import datetime

# Connect to the local Kafka broker (kafka-python 0.9.4 API)
kafka = KafkaClient("localhost:9092")
producer = SimpleProducer(kafka)

while True:
    # Send the current time once per second;
    # "kafkaStorm" is the name of our topic
    now = "It is " + str(datetime.now().time())
    print(now)
    producer.send_messages("kafkaStorm", now)
    sleep(1)
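The producer loop can be separated from the Kafka client so that its message format can be checked without a running broker. This is a minimal sketch: `produce_timestamps` is a hypothetical helper, the `send` callable stands in for `producer.send_messages("kafkaStorm", msg)`, and a finite `count` replaces the original infinite loop.

```python
import time
from datetime import datetime


def produce_timestamps(send, count, interval=1.0, now=datetime.now, sleep=time.sleep):
    """Send `count` messages of the form "It is HH:MM:SS.ffffff".

    `send` stands in for producer.send_messages("kafkaStorm", msg);
    `now` and `sleep` are injectable so the loop can be tested without Kafka.
    """
    sent = []
    for _ in range(count):
        msg = "It is " + str(now().time())
        send(msg)
        sent.append(msg)
        sleep(interval)
    return sent
```

One detail worth knowing: `str()` of a `datetime.time` omits the fractional seconds when the microsecond value is exactly 0, so the original script can occasionally print a line such as "It is 09:36:12" without decimals.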
⬛︎ Setting up the Storm environment
The steps are almost the same as in my earlier article "Trying the basic operation of Apache Storm on Mac OS X".
- Install Storm itself under "/usr/local/storm".
$ brew install wget
$ cd /usr/local/
$ wget http://ftp.jaist.ac.jp/pub/apache/storm/apache-storm-0.10.0/apache-storm-0.10.0.tar.gz
$ tar zxvf apache-storm-0.10.0.tar.gz
$ ln -s apache-storm-0.10.0 storm
$ rm apache-storm-0.10.0.tar.gz
- Configure Storm. (Change "storm.local.dir" to match your installation directory.)
$ cd storm/conf
$ vi storm.yaml
...(snip)
storm.zookeeper.servers:
    - "127.0.0.1"
storm.local.dir: "/usr/local/storm"
nimbus.host: "127.0.0.1"
supervisor.slots.ports:
    - 6700
    - 6701
    - 6702
    - 6703
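Each port listed under `supervisor.slots.ports` is one worker slot: the supervisor can run one worker JVM per port. The sketch below (hypothetical helpers `worker_slots` and `fits`) checks whether this single supervisor can give a topology all the workers it asks for; the remote-mode run later calls `config.setNumWorkers(3)`. It assumes no other topology is already occupying slots.

```python
def worker_slots(slot_ports):
    """Each port in supervisor.slots.ports lets the supervisor run one worker JVM."""
    if len(set(slot_ports)) != len(slot_ports):
        raise ValueError("duplicate port in supervisor.slots.ports")
    return len(slot_ports)


def fits(slot_ports, num_workers):
    """True if one otherwise idle supervisor can host all num_workers workers."""
    return num_workers <= worker_slots(slot_ports)


ports = [6700, 6701, 6702, 6703]  # values from storm.yaml above
print(worker_slots(ports))  # 4
print(fits(ports, 3))  # True
```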
- Add Storm to PATH so that the storm command can be launched easily.
$ vi $HOME/.profile
...(snip)
export PATH=$PATH:/usr/local/storm/bin
- Start "Nimbus".
$ storm nimbus
Running: java -server -Ddaemon.name=nimbus -Dstorm.options= -Dstorm.home=/usr/local/apache-storm-0.10.0 -Dstorm.log.dir=/usr/local/apache-storm-0.10.0/logs -Djava.library.path=/usr/local/lib:/opt/local/lib:/usr/lib -Dstorm.conf.file= -cp /usr/local/apache-storm-0.10.0/lib/asm-4.0.jar:/usr/local/apache-storm-0.10.0/lib/clojure-1.6.0.jar:/usr/local/apache-storm-0.10.0/lib/disruptor-2.10.4.jar:/usr/local/apache-storm-0.10.0/lib/hadoop-auth-2.4.0.jar:/usr/local/apache-storm-0.10.0/lib/kryo-2.21.jar:/usr/local/apache-storm-0.10.0/lib/log4j-api-2.1.jar:/usr/local/apache-storm-0.10.0/lib/log4j-core-2.1.jar:/usr/local/apache-storm-0.10.0/lib/log4j-over-slf4j-1.6.6.jar:/usr/local/apache-storm-0.10.0/lib/log4j-slf4j-impl-2.1.jar:/usr/local/apache-storm-0.10.0/lib/minlog-1.2.jar:/usr/local/apache-storm-0.10.0/lib/reflectasm-1.07-shaded.jar:/usr/local/apache-storm-0.10.0/lib/servlet-api-2.5.jar:/usr/local/apache-storm-0.10.0/lib/slf4j-api-1.7.7.jar:/usr/local/apache-storm-0.10.0/lib/storm-core-0.10.0.jar:/usr/local/storm/conf -Xmx1024m -Dlogfile.name=nimbus.log -Dlog4j.configurationFile=/usr/local/apache-storm-0.10.0/log4j2/cluster.xml backtype.storm.daemon.nimbus
- Start "Supervisor".
$ storm supervisor
Running: java -server -Ddaemon.name=supervisor -Dstorm.options= -Dstorm.home=/usr/local/apache-storm-0.10.0 -Dstorm.log.dir=/usr/local/apache-storm-0.10.0/logs -Djava.library.path=/usr/local/lib:/opt/local/lib:/usr/lib -Dstorm.conf.file= -cp /usr/local/apache-storm-0.10.0/lib/asm-4.0.jar:/usr/local/apache-storm-0.10.0/lib/clojure-1.6.0.jar:/usr/local/apache-storm-0.10.0/lib/disruptor-2.10.4.jar:/usr/local/apache-storm-0.10.0/lib/hadoop-auth-2.4.0.jar:/usr/local/apache-storm-0.10.0/lib/kryo-2.21.jar:/usr/local/apache-storm-0.10.0/lib/log4j-api-2.1.jar:/usr/local/apache-storm-0.10.0/lib/log4j-core-2.1.jar:/usr/local/apache-storm-0.10.0/lib/log4j-over-slf4j-1.6.6.jar:/usr/local/apache-storm-0.10.0/lib/log4j-slf4j-impl-2.1.jar:/usr/local/apache-storm-0.10.0/lib/minlog-1.2.jar:/usr/local/apache-storm-0.10.0/lib/reflectasm-1.07-shaded.jar:/usr/local/apache-storm-0.10.0/lib/servlet-api-2.5.jar:/usr/local/apache-storm-0.10.0/lib/slf4j-api-1.7.7.jar:/usr/local/apache-storm-0.10.0/lib/storm-core-0.10.0.jar:/usr/local/storm/conf -Xmx256m -Dlogfile.name=supervisor.log -Dlog4j.configurationFile=/usr/local/apache-storm-0.10.0/log4j2/cluster.xml backtype.storm.daemon.supervisor
- Start the Storm "UI".
$ storm ui
Running: java -server -Ddaemon.name=ui -Dstorm.options= -Dstorm.home=/usr/local/apache-storm-0.10.0 -Dstorm.log.dir=/usr/local/apache-storm-0.10.0/logs -Djava.library.path=/usr/local/lib:/opt/local/lib:/usr/lib -Dstorm.conf.file= -cp /usr/local/apache-storm-0.10.0/lib/asm-4.0.jar:/usr/local/apache-storm-0.10.0/lib/clojure-1.6.0.jar:/usr/local/apache-storm-0.10.0/lib/disruptor-2.10.4.jar:/usr/local/apache-storm-0.10.0/lib/hadoop-auth-2.4.0.jar:/usr/local/apache-storm-0.10.0/lib/kryo-2.21.jar:/usr/local/apache-storm-0.10.0/lib/log4j-api-2.1.jar:/usr/local/apache-storm-0.10.0/lib/log4j-core-2.1.jar:/usr/local/apache-storm-0.10.0/lib/log4j-over-slf4j-1.6.6.jar:/usr/local/apache-storm-0.10.0/lib/log4j-slf4j-impl-2.1.jar:/usr/local/apache-storm-0.10.0/lib/minlog-1.2.jar:/usr/local/apache-storm-0.10.0/lib/reflectasm-1.07-shaded.jar:/usr/local/apache-storm-0.10.0/lib/servlet-api-2.5.jar:/usr/local/apache-storm-0.10.0/lib/slf4j-api-1.7.7.jar:/usr/local/apache-storm-0.10.0/lib/storm-core-0.10.0.jar:/usr/local/apache-storm-0.10.0:/usr/local/storm/conf -Xmx768m -Dlogfile.name=ui.log -Dlog4j.configurationFile=/usr/local/apache-storm-0.10.0/log4j2/cluster.xml backtype.storm.ui.core
- Create the "ExclamationTopology" environment to run on the Storm Supervisor, and confirm that the sbt build completes successfully.
$ git clone https://github.com/ttsubo/scala-storm-starter.git
$ cd scala-storm-starter/
$ git checkout KafkaSpout
$ sbt compile package assembly
$ ls -l target/scala-2.11/
total 34928
drwxr-xr-x 3 ttsubo staff 102 2 19 18:26 cache
drwxr-xr-x 3 ttsubo staff 102 2 20 13:29 classes
-rw-r--r-- 1 ttsubo staff 17874229 2 20 17:28 scala-storm-starter-assembly-0.0.2-SNAPSHOT.jar
-rw-r--r-- 1 ttsubo staff 7597 2 20 17:28 scala-storm-starter_2.11-0.0.2-SNAPSHOT.jar
drwxr-xr-x 2 ttsubo staff 68 2 20 08:58 test-classes
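⬛︎ Running ExclamationTopology in local mode
Let's check what stream processing with Apache Kafka/Storm looks like in operation. The user application is very simple: it just reports the current time once per second, so it may be a stretch to call it stream processing...
- Start the Kafka producer-side sample app "kafkaStorm_producer.py" deployed earlier.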
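The pipeline exercised in this section (producer → Kafka topic "kafkaStorm" → word-spout → exclaim1 → exclaim2, where each bolt appends "!!!") can be modeled in a few lines of plain Python. This is a toy model only: `ExclamationStage` and `run_pipeline` are hypothetical names, not Storm's API, and parallelism, grouping, acking and Kafka itself are all left out.

```python
from collections import deque


class ExclamationStage:
    """Plain-Python stand-in for ExclamationBolt: appends "!!!" to each input."""

    def __init__(self, name):
        self.name = name

    def execute(self, value):
        return value + "!!!"


def run_pipeline(messages, stages):
    """Push each message through the stages in order (spout -> exclaim1 -> exclaim2).

    Returns (component name, emitted value) pairs in processing order.
    """
    trace = []
    queue = deque(messages)  # stands in for the Kafka topic "kafkaStorm"
    while queue:
        value = queue.popleft()
        trace.append(("word-spout", value))
        for stage in stages:
            value = stage.execute(value)
            trace.append((stage.name, value))
    return trace


for name, value in run_pipeline(["It is 09:36:12.434021"],
                                [ExclamationStage("exclaim1"), ExclamationStage("exclaim2")]):
    print(name, value)
# word-spout It is 09:36:12.434021
# exclaim1 It is 09:36:12.434021!!!
# exclaim2 It is 09:36:12.434021!!!!!!
```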
$ python kafkaStorm_producer.py
It is 09:36:12.434021
It is 09:36:13.457584
It is 09:36:14.460644
It is 09:36:15.464444
It is 09:36:16.469271
It is 09:36:17.472175
It is 09:36:18.478534
It is 09:36:19.485352
It is 09:36:20.489528
It is 09:36:21.491859
It is 09:36:22.498474
It is 09:36:23.505260
It is 09:36:24.507176
It is 09:36:25.513852
... (snip)
- Next, move to the directory where the "ExclamationTopology" environment was created earlier, and start "ExclamationTopology" in local mode.
- The Storm spout word-spout fetches the stream data stored in the Kafka topic "kafkaStorm"; the bolts exclaim1 and exclaim2 then each append the string "!!!" to every tuple they receive.
$ storm jar target/scala-2.11/scala-storm-starter-assembly-0.0.2-SNAPSHOT.jar storm.starter.topology.ExclamationTopology
...(snip)
7202 [Thread-12-word-spout] INFO b.s.d.task - Emitting: word-spout default [It is 09:36:12.434021]
7203 [Thread-12-word-spout] INFO b.s.d.executor - TRANSFERING tuple TASK: 4 TUPLE: source: word-spout:7, stream: default, id: {-2736707202019925397=8535453313544767994}, [It is 09:36:12.434021]
7203 [Thread-24-exclaim1] INFO b.s.d.executor - Processing received message FOR 4 TUPLE: source: word-spout:7, stream: default, id: {-2736707202019925397=8535453313544767994}, [It is 09:36:12.434021]
7204 [Thread-12-word-spout] INFO b.s.d.task - Emitting: word-spout __ack_init [-2736707202019925397 8535453313544767994 7]
7204 [Thread-12-word-spout] INFO b.s.d.executor - TRANSFERING tuple TASK: 1 TUPLE: source: word-spout:7, stream: __ack_init, id: {}, [-2736707202019925397 8535453313544767994 7]
7204 [Thread-24-exclaim1] INFO b.s.d.task - Emitting: exclaim1 default [It is 09:36:12.434021!!!]
7204 [Thread-16-__acker] INFO b.s.d.executor - Processing received message FOR 1 TUPLE: source: word-spout:7, stream: __ack_init, id: {}, [-2736707202019925397 8535453313544767994 7]
7205 [Thread-12-word-spout] INFO b.s.d.task - Emitting: word-spout default [It is 09:36:13.457584]
7205 [Thread-16-__acker] INFO b.s.d.executor - BOLT ack TASK: 1 TIME: TUPLE: source: word-spout:7, stream: __ack_init, id: {}, [-2736707202019925397 8535453313544767994 7]
7205 [Thread-12-word-spout] INFO b.s.d.executor - TRANSFERING tuple TASK: 2 TUPLE: source: word-spout:7, stream: default, id: {-7111756359690180959=8707010670879263031}, [It is 09:36:13.457584]
7205 [Thread-16-__acker] INFO b.s.d.executor - Execute done TUPLE source: word-spout:7, stream: __ack_init, id: {}, [-2736707202019925397 8535453313544767994 7] TASK: 1 DELTA:
7205 [Thread-12-word-spout] INFO b.s.d.task - Emitting: word-spout __ack_init [-7111756359690180959 8707010670879263031 7]
7205 [Thread-10-exclaim1] INFO b.s.d.executor - Processing received message FOR 2 TUPLE: source: word-spout:7, stream: default, id: {-7111756359690180959=8707010670879263031}, [It is 09:36:13.457584]
7205 [Thread-12-word-spout] INFO b.s.d.executor - TRANSFERING tuple TASK: 1 TUPLE: source: word-spout:7, stream: __ack_init, id: {}, [-7111756359690180959 8707010670879263031 7]
7205 [Thread-10-exclaim1] INFO b.s.d.task - Emitting: exclaim1 default [It is 09:36:13.457584!!!]
7205 [Thread-12-word-spout] INFO b.s.d.task - Emitting: word-spout default [It is 09:36:14.460644]
7205 [Thread-12-word-spout] INFO b.s.d.executor - TRANSFERING tuple TASK: 3 TUPLE: source: word-spout:7, stream: default, id: {6911314124032831513=2770314375009569340}, [It is 09:36:14.460644]
7205 [Thread-10-exclaim1] INFO b.s.d.executor - TRANSFERING tuple TASK: 6 TUPLE: source: exclaim1:2, stream: default, id: {-7111756359690180959=-7992097948249303649}, [It is 09:36:13.457584!!!]
7205 [Thread-16-__acker] INFO b.s.d.executor - Processing received message FOR 1 TUPLE: source: word-spout:7, stream: __ack_init, id: {}, [-7111756359690180959 8707010670879263031 7]
7206 [Thread-10-exclaim1] INFO b.s.d.task - Emitting: exclaim1 __ack_ack [-7111756359690180959 -1602198103229490008]
7206 [Thread-16-__acker] INFO b.s.d.executor - BOLT ack TASK: 1 TIME: TUPLE: source: word-spout:7, stream: __ack_init, id: {}, [-7111756359690180959 8707010670879263031 7]
7206 [Thread-16-__acker] INFO b.s.d.executor - Execute done TUPLE source: word-spout:7, stream: __ack_init, id: {}, [-7111756359690180959 8707010670879263031 7] TASK: 1 DELTA:
7206 [Thread-10-exclaim1] INFO b.s.d.executor - TRANSFERING tuple TASK: 1 TUPLE: source: exclaim1:2, stream: __ack_ack, id: {}, [-7111756359690180959 -1602198103229490008]
7206 [Thread-14-exclaim1] INFO b.s.d.executor - Processing received message FOR 3 TUPLE: source: word-spout:7, stream: default, id: {6911314124032831513=2770314375009569340}, [It is 09:36:14.460644]
7206 [Thread-10-exclaim1] INFO b.s.d.executor - BOLT ack TASK: 2 TIME: TUPLE: source: word-spout:7, stream: default, id: {-7111756359690180959=8707010670879263031}, [It is 09:36:13.457584]
7206 [Thread-14-exclaim1] INFO b.s.d.task - Emitting: exclaim1 default [It is 09:36:14.460644!!!]
7206 [Thread-10-exclaim1] INFO b.s.d.executor - Execute done TUPLE source: word-spout:7, stream: default, id: {-7111756359690180959=8707010670879263031}, [It is 09:36:13.457584] TASK: 2 DELTA:
7206 [Thread-12-word-spout] INFO b.s.d.task - Emitting: word-spout __ack_init [6911314124032831513 2770314375009569340 7]
7206 [Thread-18-exclaim2] INFO b.s.d.executor - Processing received message FOR 6 TUPLE: source: exclaim1:2, stream: default, id: {-7111756359690180959=-7992097948249303649}, [It is 09:36:13.457584!!!]
7206 [Thread-14-exclaim1] INFO b.s.d.executor - TRANSFERING tuple TASK: 6 TUPLE: source: exclaim1:3, stream: default, id: {6911314124032831513=6305416489060296620}, [It is 09:36:14.460644!!!]
7206 [Thread-12-word-spout] INFO b.s.d.executor - TRANSFERING tuple TASK: 1 TUPLE: source: word-spout:7, stream: __ack_init, id: {}, [6911314124032831513 2770314375009569340 7]
7207 [Thread-18-exclaim2] INFO b.s.d.task - Emitting: exclaim2 default [It is 09:36:13.457584!!!!!!]
7207 [Thread-12-word-spout] INFO b.s.d.task - Emitting: word-spout default [It is 09:36:15.464444]
7207 [Thread-18-exclaim2] INFO b.s.d.task - Emitting: exclaim2 __ack_ack [-7111756359690180959 -7992097948249303649]
7207 [Thread-14-exclaim1] INFO b.s.d.task - Emitting: exclaim1 __ack_ack [6911314124032831513 8211035036717084048]
7207 [Thread-14-exclaim1] INFO b.s.d.executor - TRANSFERING tuple TASK: 1 TUPLE: source: exclaim1:3, stream: __ack_ack, id: {}, [6911314124032831513 8211035036717084048]
7207 [Thread-18-exclaim2] INFO b.s.d.executor - TRANSFERING tuple TASK: 1 TUPLE: source: exclaim2:6, stream: __ack_ack, id: {}, [-7111756359690180959 -7992097948249303649]
7207 [Thread-16-__acker] INFO b.s.d.executor - Processing received message FOR 1 TUPLE: source: exclaim1:2, stream: __ack_ack, id: {}, [-7111756359690180959 -1602198103229490008]
7207 [Thread-14-exclaim1] INFO b.s.d.executor - BOLT ack TASK: 3 TIME: TUPLE: source: word-spout:7, stream: default, id: {6911314124032831513=2770314375009569340}, [It is 09:36:14.460644]
7207 [Thread-18-exclaim2] INFO b.s.d.executor - BOLT ack TASK: 6 TIME: TUPLE: source: exclaim1:2, stream: default, id: {-7111756359690180959=-7992097948249303649}, [It is 09:36:13.457584!!!]
7207 [Thread-16-__acker] INFO b.s.d.executor - BOLT ack TASK: 1 TIME: TUPLE: source: exclaim1:2, stream: __ack_ack, id: {}, [-7111756359690180959 -1602198103229490008]
7207 [Thread-14-exclaim1] INFO b.s.d.executor - Execute done TUPLE source: word-spout:7, stream: default, id: {6911314124032831513=2770314375009569340}, [It is 09:36:14.460644] TASK: 3 DELTA:
7207 [Thread-18-exclaim2] INFO b.s.d.executor - Execute done TUPLE source: exclaim1:2, stream: default, id: {-7111756359690180959=-7992097948249303649}, [It is 09:36:13.457584!!!] TASK: 6 DELTA:
7207 [Thread-16-__acker] INFO b.s.d.executor - Execute done TUPLE source: exclaim1:2, stream: __ack_ack, id: {}, [-7111756359690180959 -1602198103229490008] TASK: 1 DELTA:
7207 [Thread-12-word-spout] INFO b.s.d.executor - TRANSFERING tuple TASK: 4 TUPLE: source: word-spout:7, stream: default, id: {4141217291662411057=-1021888700141355268}, [It is 09:36:15.464444]
7207 [Thread-18-exclaim2] INFO b.s.d.executor - Processing received message FOR 6 TUPLE: source: exclaim1:3, stream: default, id: {6911314124032831513=6305416489060296620}, [It is 09:36:14.460644!!!]
7208 [Thread-12-word-spout] INFO b.s.d.task - Emitting: word-spout __ack_init [4141217291662411057 -1021888700141355268 7]
7207 [Thread-16-__acker] INFO b.s.d.executor - Processing received message FOR 1 TUPLE: source: word-spout:7, stream: __ack_init, id: {}, [6911314124032831513 2770314375009569340 7]
7208 [Thread-18-exclaim2] INFO b.s.d.task - Emitting: exclaim2 default [It is 09:36:14.460644!!!!!!]
7208 [Thread-12-word-spout] INFO b.s.d.executor - TRANSFERING tuple TASK: 1 TUPLE: source: word-spout:7, stream: __ack_init, id: {}, [4141217291662411057 -1021888700141355268 7]
7208 [Thread-16-__acker] INFO b.s.d.executor - BOLT ack TASK: 1 TIME: TUPLE: source: word-spout:7, stream: __ack_init, id: {}, [6911314124032831513 2770314375009569340 7]
7208 [Thread-18-exclaim2] INFO b.s.d.task - Emitting: exclaim2 __ack_ack [6911314124032831513 6305416489060296620]
7208 [Thread-16-__acker] INFO b.s.d.executor - Execute done TUPLE source: word-spout:7, stream: __ack_init, id: {}, [6911314124032831513 2770314375009569340 7] TASK: 1 DELTA:
7208 [Thread-12-word-spout] INFO b.s.d.task - Emitting: word-spout default [It is 09:36:16.469271]
7208 [Thread-16-__acker] INFO b.s.d.executor - Processing received message FOR 1 TUPLE: source: exclaim2:6, stream: __ack_ack, id: {}, [-7111756359690180959 -7992097948249303649]
7208 [Thread-18-exclaim2] INFO b.s.d.executor - TRANSFERING tuple TASK: 1 TUPLE: source: exclaim2:6, stream: __ack_ack, id: {}, [6911314124032831513 6305416489060296620]
7208 [Thread-12-word-spout] INFO b.s.d.executor - TRANSFERING tuple TASK: 2 TUPLE: source: word-spout:7, stream: default, id: {3052868074563147181=3745382070922932756}, [It is 09:36:16.469271]
7208 [Thread-18-exclaim2] INFO b.s.d.executor - BOLT ack TASK: 6 TIME: TUPLE: source: exclaim1:3, stream: default, id: {6911314124032831513=6305416489060296620}, [It is 09:36:14.460644!!!]
7209 [Thread-16-__acker] INFO b.s.d.task - Emitting direct: 7; __acker __ack_ack [-7111756359690180959]
7209 [Thread-18-exclaim2] INFO b.s.d.executor - Execute done TUPLE source: exclaim1:3, stream: default, id: {6911314124032831513=6305416489060296620}, [It is 09:36:14.460644!!!] TASK: 6 DELTA:
7209 [Thread-12-word-spout] INFO b.s.d.task - Emitting: word-spout __ack_init [3052868074563147181 3745382070922932756 7]
7209 [Thread-10-exclaim1] INFO b.s.d.executor - Processing received message FOR 2 TUPLE: source: word-spout:7, stream: default, id: {3052868074563147181=3745382070922932756}, [It is 09:36:16.469271]
7209 [Thread-16-__acker] INFO b.s.d.executor - TRANSFERING tuple TASK: 7 TUPLE: source: __acker:1, stream: __ack_ack, id: {}, [-7111756359690180959]
7209 [Thread-12-word-spout] INFO b.s.d.executor - TRANSFERING tuple TASK: 1 TUPLE: source: word-spout:7, stream: __ack_init, id: {}, [3052868074563147181 3745382070922932756 7]
7209 [Thread-16-__acker] INFO b.s.d.executor - BOLT ack TASK: 1 TIME: TUPLE: source: exclaim2:6, stream: __ack_ack, id: {}, [-7111756359690180959 -7992097948249303649]
7209 [Thread-10-exclaim1] INFO b.s.d.task - Emitting: exclaim1 default [It is 09:36:16.469271!!!]
7209 [Thread-12-word-spout] INFO b.s.d.executor - Processing received message FOR 7 TUPLE: source: __acker:1, stream: __ack_ack, id: {}, [-7111756359690180959]
7209 [Thread-10-exclaim1] INFO b.s.d.executor - TRANSFERING tuple TASK: 5 TUPLE: source: exclaim1:2, stream: default, id: {3052868074563147181=-6638282129905823771}, [It is 09:36:16.469271!!!]
7209 [Thread-16-__acker] INFO b.s.d.executor - Execute done TUPLE source: exclaim2:6, stream: __ack_ack, id: {}, [-7111756359690180959 -7992097948249303649] TASK: 1 DELTA:
7209 [Thread-10-exclaim1] INFO b.s.d.task - Emitting: exclaim1 __ack_ack [3052868074563147181 -8063043445652341263]
7209 [Thread-16-__acker] INFO b.s.d.executor - Processing received message FOR 1 TUPLE: source: exclaim1:3, stream: __ack_ack, id: {}, [6911314124032831513 8211035036717084048]
7209 [Thread-10-exclaim1] INFO b.s.d.executor - TRANSFERING tuple TASK: 1 TUPLE: source: exclaim1:2, stream: __ack_ack, id: {}, [3052868074563147181 -8063043445652341263]
7209 [Thread-16-__acker] INFO b.s.d.executor - BOLT ack TASK: 1 TIME: TUPLE: source: exclaim1:3, stream: __ack_ack, id: {}, [6911314124032831513 8211035036717084048]
7209 [Thread-10-exclaim1] INFO b.s.d.executor - BOLT ack TASK: 2 TIME: TUPLE: source: word-spout:7, stream: default, id: {3052868074563147181=3745382070922932756}, [It is 09:36:16.469271]
7209 [Thread-12-word-spout] INFO b.s.d.executor - SPOUT Acking message -7111756359690180959 storm.kafka.PartitionManager$KafkaMessageId@2e10053a
7209 [Thread-10-exclaim1] INFO b.s.d.executor - Execute done TUPLE source: word-spout:7, stream: default, id: {3052868074563147181=3745382070922932756}, [It is 09:36:16.469271] TASK: 2 DELTA:
7209 [Thread-16-__acker] INFO b.s.d.executor - Execute done TUPLE source: exclaim1:3, stream: __ack_ack, id: {}, [6911314124032831513 8211035036717084048] TASK: 1 DELTA:
7210 [Thread-16-__acker] INFO b.s.d.executor - Processing received message FOR 1 TUPLE: source: word-spout:7, stream: __ack_init, id: {}, [4141217291662411057 -1021888700141355268 7]
7210 [Thread-12-word-spout] INFO b.s.d.task - Emitting: word-spout default [It is 09:36:17.472175]
7210 [Thread-22-exclaim2] INFO b.s.d.executor - Processing received message FOR 5 TUPLE: source: exclaim1:2, stream: default, id: {3052868074563147181=-6638282129905823771}, [It is 09:36:16.469271!!!]
7210 [Thread-12-word-spout] INFO b.s.d.executor - TRANSFERING tuple TASK: 3 TUPLE: source: word-spout:7, stream: default, id: {-7091949305103491056=3997162068754701889}, [It is 09:36:17.472175]
7210 [Thread-16-__acker] INFO b.s.d.executor - BOLT ack TASK: 1 TIME: TUPLE: source: word-spout:7, stream: __ack_init, id: {}, [4141217291662411057 -1021888700141355268 7]
7210 [Thread-22-exclaim2] INFO b.s.d.task - Emitting: exclaim2 default [It is 09:36:16.469271!!!!!!]
7210 [Thread-16-__acker] INFO b.s.d.executor - Execute done TUPLE source: word-spout:7, stream: __ack_init, id: {}, [4141217291662411057 -1021888700141355268 7] TASK: 1 DELTA:
7210 [Thread-12-word-spout] INFO b.s.d.task - Emitting: word-spout __ack_init [-7091949305103491056 3997162068754701889 7]
7210 [Thread-22-exclaim2] INFO b.s.d.task - Emitting: exclaim2 __ack_ack [3052868074563147181 -6638282129905823771]
7211 [Thread-16-__acker] INFO b.s.d.executor - Processing received message FOR 1 TUPLE: source: exclaim2:6, stream: __ack_ack, id: {}, [6911314124032831513 6305416489060296620]
7211 [Thread-24-exclaim1] INFO b.s.d.executor - TRANSFERING tuple TASK: 6 TUPLE: source: exclaim1:4, stream: default, id: {-2736707202019925397=8147493952586947714}, [It is 09:36:12.434021!!!]
7211 [Thread-22-exclaim2] INFO b.s.d.executor - TRANSFERING tuple TASK: 1 TUPLE: source: exclaim2:5, stream: __ack_ack, id: {}, [3052868074563147181 -6638282129905823771]
7211 [Thread-16-__acker] INFO b.s.d.task - Emitting direct: 7; __acker __ack_ack [6911314124032831513]
7211 [Thread-24-exclaim1] INFO b.s.d.task - Emitting: exclaim1 __ack_ack [-2736707202019925397 533028969185990008]
7211 [Thread-22-exclaim2] INFO b.s.d.executor - BOLT ack TASK: 5 TIME: TUPLE: source: exclaim1:2, stream: default, id: {3052868074563147181=-6638282129905823771}, [It is 09:36:16.469271!!!]
7211 [Thread-14-exclaim1] INFO b.s.d.executor - Processing received message FOR 3 TUPLE: source: word-spout:7, stream: default, id: {-7091949305103491056=3997162068754701889}, [It is 09:36:17.472175]
7211 [Thread-16-__acker] INFO b.s.d.executor - TRANSFERING tuple TASK: 7 TUPLE: source: __acker:1, stream: __ack_ack, id: {}, [6911314124032831513]
7211 [Thread-18-exclaim2] INFO b.s.d.executor - Processing received message FOR 6 TUPLE: source: exclaim1:4, stream: default, id: {-2736707202019925397=8147493952586947714}, [It is 09:36:12.434021!!!]
7211 [Thread-22-exclaim2] INFO b.s.d.executor - Execute done TUPLE source: exclaim1:2, stream: default, id: {3052868074563147181=-6638282129905823771}, [It is 09:36:16.469271!!!] TASK: 5 DELTA:
7211 [Thread-18-exclaim2] INFO b.s.d.task - Emitting: exclaim2 default [It is 09:36:12.434021!!!!!!]
...(snip)
It is hard to read, but you can confirm the flow: word-spout emits [It is 09:36:12.434021], exclaim1 turns it into [It is 09:36:12.434021!!!], and exclaim2 then turns that into [It is 09:36:12.434021!!!!!!].
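Both the data flow and the acking traffic in the debug log (printed because of `config.setDebug(true)`) can be checked mechanically. In this sketch, `trace_value` and `tree_complete` are hypothetical helpers. The regex assumes the `Emitting: <component> default [<value>]` lines shown above. `tree_complete` applies the XOR scheme described in Storm's "Guaranteeing Message Processing" documentation: the acker XORs the edge ids it receives in `__ack_init` and `__ack_ack` messages, and the tuple tree is fully processed when the result reaches 0.

```python
import re
from functools import reduce
from operator import xor

# Matches debug lines such as:
# 7204 [Thread-24-exclaim1] INFO b.s.d.task - Emitting: exclaim1 default [It is 09:36:12.434021!!!]
EMIT_RE = re.compile(r"Emitting: (\S+) default \[(.*)\]$")


def trace_value(log_lines, base):
    """Return (component, value) for every default-stream emit derived from `base`."""
    hits = []
    for line in log_lines:
        m = EMIT_RE.search(line.strip())
        if m and m.group(2).startswith(base):
            hits.append((m.group(1), m.group(2)))
    return hits


def tree_complete(init_val, ack_vals):
    """XOR the __ack_init value with every __ack_ack value; 0 means fully acked."""
    return reduce(xor, ack_vals, init_val) == 0


# __ack_init / __ack_ack values for tuple tree -7111756359690180959 in the log above
print(tree_complete(8707010670879263031, [-1602198103229490008, -7992097948249303649]))  # True
```

Right after the acker receives the last of these values, the log shows `Emitting direct: 7; __acker __ack_ack [-7111756359690180959]`, i.e. the acker notifying word-spout (task 7), which then logs `SPOUT Acking message`.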
⬛︎ Running ExclamationTopology in remote mode
- Next, run "ExclamationTopology" in remote mode.
- This time, the topology was submitted under the topology name "ExclamationTopology" (passed as the last argument of storm jar).
$ storm jar target/scala-2.11/scala-storm-starter-assembly-0.0.2-SNAPSHOT.jar storm.starter.topology.ExclamationTopology ExclamationTopology
Running: java -client -Ddaemon.name= -Dstorm.options= -Dstorm.home=/usr/local/apache-storm-0.10.0 -Dstorm.log.dir=/usr/local/apache-storm-0.10.0/logs -Djava.library.path=/usr/local/lib:/opt/local/lib:/usr/lib -Dstorm.conf.file= -cp /usr/local/apache-storm-0.10.0/lib/asm-4.0.jar:/usr/local/apache-storm-0.10.0/lib/clojure-1.6.0.jar:/usr/local/apache-storm-0.10.0/lib/disruptor-2.10.4.jar:/usr/local/apache-storm-0.10.0/lib/hadoop-auth-2.4.0.jar:/usr/local/apache-storm-0.10.0/lib/kryo-2.21.jar:/usr/local/apache-storm-0.10.0/lib/log4j-api-2.1.jar:/usr/local/apache-storm-0.10.0/lib/log4j-core-2.1.jar:/usr/local/apache-storm-0.10.0/lib/log4j-over-slf4j-1.6.6.jar:/usr/local/apache-storm-0.10.0/lib/log4j-slf4j-impl-2.1.jar:/usr/local/apache-storm-0.10.0/lib/minlog-1.2.jar:/usr/local/apache-storm-0.10.0/lib/reflectasm-1.07-shaded.jar:/usr/local/apache-storm-0.10.0/lib/servlet-api-2.5.jar:/usr/local/apache-storm-0.10.0/lib/slf4j-api-1.7.7.jar:/usr/local/apache-storm-0.10.0/lib/storm-core-0.10.0.jar:target/scala-2.11/scala-storm-starter-assembly-0.0.2-SNAPSHOT.jar:/usr/local/storm/conf:/usr/local/apache-storm-0.10.0/bin -Dstorm.jar=target/scala-2.11/scala-storm-starter-assembly-0.0.2-SNAPSHOT.jar storm.starter.topology.ExclamationTopology ExclamationTopology
438 [main] INFO b.s.u.Utils - Using defaults.yaml from resources
495 [main] INFO b.s.u.Utils - Using storm.yaml from resources
521 [main] INFO b.s.u.Utils - Using defaults.yaml from resources
529 [main] INFO b.s.u.Utils - Using storm.yaml from resources
531 [main] INFO b.s.StormSubmitter - Generated ZooKeeper secret payload for MD5-digest: -6085723166361804975:-8023199696530217358
532 [main] INFO b.s.s.a.AuthUtils - Got AutoCreds []
548 [main] INFO b.s.u.StormBoundedExponentialBackoffRetry - The baseSleepTimeMs [2000] the maxSleepTimeMs [60000] the maxRetries [5]
561 [main] INFO b.s.u.StormBoundedExponentialBackoffRetry - The baseSleepTimeMs [2000] the maxSleepTimeMs [60000] the maxRetries [5]
580 [main] INFO b.s.u.StormBoundedExponentialBackoffRetry - The baseSleepTimeMs [2000] the maxSleepTimeMs [60000] the maxRetries [5]
586 [main] INFO b.s.StormSubmitter - Uploading topology jar target/scala-2.11/scala-storm-starter-assembly-0.0.2-SNAPSHOT.jar to assigned location: /usr/local/storm/nimbus/inbox/stormjar-e9d58ebd-0ec2-49df-8745-fcf9f931dd47.jar
733 [main] INFO b.s.StormSubmitter - Successfully uploaded topology jar to assigned location: /usr/local/storm/nimbus/inbox/stormjar-e9d58ebd-0ec2-49df-8745-fcf9f931dd47.jar
733 [main] INFO b.s.StormSubmitter - Submitting topology ExclamationTopology in distributed mode with conf {"storm.zookeeper.topology.auth.scheme":"digest","storm.zookeeper.topology.auth.payload":"-6085723166361804975:-8023199696530217358","topology.workers":3,"topology.debug":true}
889 [main] INFO b.s.StormSubmitter - Finished submitting topology: ExclamationTopology
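The "Submitting topology" line above carries the submitted configuration as a JSON map, which makes it easy to confirm what the cluster actually received. This is a sketch with a hypothetical helper `parse_submission`; the regex assumes the StormSubmitter log format shown above.

```python
import json
import re

# StormSubmitter logs: "Submitting topology <name> in <mode> mode with conf {...}"
SUBMIT_RE = re.compile(r"Submitting topology (\S+) in (\S+) mode with conf (\{.*\})")


def parse_submission(line):
    """Return (topology name, mode, conf dict), or None if the line does not match."""
    m = SUBMIT_RE.search(line)
    if m is None:
        return None
    return m.group(1), m.group(2), json.loads(m.group(3))
```

Applied to the line above, it yields the name ExclamationTopology, the mode distributed, and a conf with `topology.workers` 3 and `topology.debug` true, matching `config.setNumWorkers(3)` and `config.setDebug(true)` in the topology code.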



- Finally, stop the "ExclamationTopology" topology running in remote mode.
$ storm kill ExclamationTopology
Running: java -client -Ddaemon.name= -Dstorm.options= -Dstorm.home=/usr/local/apache-storm-0.10.0 -Dstorm.log.dir=/usr/local/apache-storm-0.10.0/logs -Djava.library.path=/usr/local/lib:/opt/local/lib:/usr/lib -Dstorm.conf.file= -cp /usr/local/apache-storm-0.10.0/lib/asm-4.0.jar:/usr/local/apache-storm-0.10.0/lib/clojure-1.6.0.jar:/usr/local/apache-storm-0.10.0/lib/disruptor-2.10.4.jar:/usr/local/apache-storm-0.10.0/lib/hadoop-auth-2.4.0.jar:/usr/local/apache-storm-0.10.0/lib/kryo-2.21.jar:/usr/local/apache-storm-0.10.0/lib/log4j-api-2.1.jar:/usr/local/apache-storm-0.10.0/lib/log4j-core-2.1.jar:/usr/local/apache-storm-0.10.0/lib/log4j-over-slf4j-1.6.6.jar:/usr/local/apache-storm-0.10.0/lib/log4j-slf4j-impl-2.1.jar:/usr/local/apache-storm-0.10.0/lib/minlog-1.2.jar:/usr/local/apache-storm-0.10.0/lib/reflectasm-1.07-shaded.jar:/usr/local/apache-storm-0.10.0/lib/servlet-api-2.5.jar:/usr/local/apache-storm-0.10.0/lib/slf4j-api-1.7.7.jar:/usr/local/apache-storm-0.10.0/lib/storm-core-0.10.0.jar:/usr/local/storm/conf:/usr/local/apache-storm-0.10.0/bin backtype.storm.command.kill_topology ExclamationTopology
1157 [main] INFO b.s.u.Utils - Using defaults.yaml from resources
1215 [main] INFO b.s.u.Utils - Using storm.yaml from resources
1651 [main] INFO b.s.u.Utils - Using defaults.yaml from resources
1661 [main] INFO b.s.u.Utils - Using storm.yaml from resources
1672 [main] INFO b.s.thrift - Connecting to Nimbus at 127.0.0.1:6627 as user:
1672 [main] INFO b.s.u.Utils - Using defaults.yaml from resources
1678 [main] INFO b.s.u.Utils - Using storm.yaml from resources
1691 [main] INFO b.s.u.StormBoundedExponentialBackoffRetry - The baseSleepTimeMs [2000] the maxSleepTimeMs [60000] the maxRetries [5]
1736 [main] INFO b.s.c.kill-topology - Killed topology: ExclamationTopology
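⬛︎ A brief explanation of the Apache Kafka/Storm integration
For Storm to consume stream data from the Kafka topic "kafkaStorm", ExclamationTopology has to define how KafkaSpout runs (its SpoutConfig). KafkaSpout is a spout class provided by the storm-kafka module of the Storm repository (Github: apache/storm).
In addition, ZooKeeper-related settings are needed so that KafkaSpout can keep its state, such as the offsets it has consumed, in ZooKeeper's distributed key-value store.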
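To make the ZooKeeper layout concrete, the sketch below builds the znode paths from the values used in ExclamationTopology. Kafka registers brokers under `/brokers/ids/<id>` and topics under `/brokers/topics/<topic>`, which is why `ZkHosts` is given `"/brokers"`. The offset path follows my reading of the storm-kafka source (`zkRoot + "/" + id + "/partition_<n>"`); that layout is an assumption, so please verify it against your storm-kafka version. The function names are hypothetical.

```python
import posixpath


def kafka_broker_paths(broker_root, topic, broker_id):
    """Znodes Kafka registers under the ZkHosts broker path (e.g. "/brokers")."""
    return {
        "broker": posixpath.join(broker_root, "ids", str(broker_id)),
        "partitions": posixpath.join(broker_root, "topics", topic, "partitions"),
    }


def spout_offset_path(zk_root, spout_id, partition):
    """Where KafkaSpout commits offsets, ASSUMING the storm-kafka layout
    zkRoot + "/" + id + "/partition_<n>" (verify for your version)."""
    return posixpath.join(zk_root, spout_id, "partition_%d" % partition)


# Values from ExclamationTopology: ZkHosts(..., "/brokers"), zkRoot="/kafkastorm", id="kafka-spout"
print(kafka_broker_paths("/brokers", "kafkaStorm", 1))
print(spout_offset_path("/kafkastorm", "kafka-spout", 0))  # /kafkastorm/kafka-spout/partition_0
```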
package storm.starter.topology

import backtype.storm.{ Config, LocalCluster, StormSubmitter }
import backtype.storm.testing.TestWordSpout
import backtype.storm.topology.TopologyBuilder
import backtype.storm.utils.Utils
import storm.kafka.{KafkaSpout, SpoutConfig, ZkHosts, StringScheme}
import backtype.storm.spout.SchemeAsMultiScheme

object ExclamationTopology {
  def main(args: Array[String]) {
    import storm.starter.bolt.ExclamationBolt

    val builder: TopologyBuilder = new TopologyBuilder()

    // Kafka topic to consume, and the ZooKeeper path where Kafka registers its brokers
    val topic = "kafkaStorm"
    val kafkaZkConnect = "127.0.0.1:2181"
    val zkHosts = new ZkHosts(kafkaZkConnect, "/brokers")

    // ZooKeeper root path and id under which KafkaSpout keeps its state
    val zkRoot = "/kafkastorm"
    val zkSpoutId = "kafka-spout"
    val kafkaConfig = new SpoutConfig(zkHosts, topic, zkRoot, zkSpoutId)
    // Deserialize each Kafka message as a single String field
    kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme())

    val kafkaSpout = new KafkaSpout(kafkaConfig)
    val kafkaSpoutId = "word-spout"
    builder.setSpout(kafkaSpoutId, kafkaSpout)
    // Two bolt stages (3 and 2 tasks), each appending "!!!" to the incoming string
    builder.setBolt("exclaim1", new ExclamationBolt(), 3).shuffleGrouping(kafkaSpoutId)
    builder.setBolt("exclaim2", new ExclamationBolt(), 2).shuffleGrouping("exclaim1")

    val config = new Config()
    config.setDebug(true)

    if (args != null && args.length > 0) {
      // Remote mode: submit to the cluster under the name given as the first argument
      config.setNumWorkers(3)
      StormSubmitter.submitTopology(args(0), config, builder.createTopology())
    } else {
      // Local mode: run in an in-process cluster for 5 seconds, then shut it down
      val cluster: LocalCluster = new LocalCluster()
      cluster.submitTopology("ExclamationTopology", config, builder.createTopology())
      Utils.sleep(5000)
      cluster.killTopology("ExclamationTopology")
      cluster.shutdown()
    }
  }
}
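The parallelism hints in this code also explain the task numbers in the local-mode log: __acker is task 1, exclaim1 is tasks 2 to 4, exclaim2 is tasks 5 and 6, and word-spout is task 7. The sketch below reproduces that numbering by handing out consecutive ids in sorted component order. This rule is an assumption inferred from the log, not taken from Storm's source. `ShuffleGrouping` is likewise a simplified model, not Storm's implementation, of the property that shuffle grouping spreads tuples randomly but evenly across the target tasks.

```python
import random


def assign_task_ids(parallelism):
    """Give each component consecutive task ids, walking component ids in sorted order.

    ASSUMPTION: mirrors the numbering seen in the debug log; not Storm's code.
    """
    tasks, next_id = {}, 1
    for component in sorted(parallelism):
        n = parallelism[component]
        tasks[component] = list(range(next_id, next_id + n))
        next_id += n
    return tasks


class ShuffleGrouping:
    """Simplified model: walk a shuffled list of target tasks and reshuffle
    after each full pass, so the order is random but the load stays even."""

    def __init__(self, targets, rng=None):
        self.targets = list(targets)
        self.rng = rng or random.Random()
        self.pending = []

    def choose(self):
        if not self.pending:
            self.pending = self.targets[:]
            self.rng.shuffle(self.pending)
        return self.pending.pop()


# Parallelism from ExclamationTopology; the local-mode log shows a single __acker task
tasks = assign_task_ids({"word-spout": 1, "exclaim1": 3, "exclaim2": 2, "__acker": 1})
print(tasks)
# {'__acker': [1], 'exclaim1': [2, 3, 4], 'exclaim2': [5, 6], 'word-spout': [7]}
```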
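⬛︎ Conclusion
I was able to confirm how stream processing works with Apache Kafka/Storm integration. However, the integration involves many technical components, so mastering it does not look easy. Although not covered here, it also requires knowledge of Apache ZooKeeper.
I also found that operating an Apache Kafka/Storm cluster requires a very large body of technical knowledge. I hope someone publishes a guide that covers all of it.
That's all for this time.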
⬛︎ References
- Storm Kafka Integration
- Integrating Kafka and Storm: Code Examples and State of the Game
- Github: apache/storm (external/storm-kafka): https://github.com/apache/storm/tree/master/external/storm-kafka