はじめに
PDIでなるべくリアルタイムに近いタイミングで処理を行ないたいというリクエストがあったので、Pentaho8から実装された「Kafka Consumer」ステップを検証してみました。
これにより、通常バッチ的に利用することを想定しているPDIでもリアルタイムに親しい処理が可能となり、PDIの利用用途が広がるのではないかと思いますので、似たような問題をお抱えの方の役に立てば幸いです。
Kafkaの準備
Kafkaについては、有益な記事がネット上にたくさんあるのでそれらを活用させていただきました。
今回はMAC上で検証しているのでこちらを参考にkafkaの環境を構築しました。
環境構築後、サービスを起動した状態で、トピックを作成しておきます。
$ kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
Created topic "test".
PDIの準備
こちらを参考にPDIをインストールし、PDIのETL処理をデザインするクライアントツール(Spoon)を起動します。
Kafka Consumerステップの設定を行う
Kafka Consumerステップを配置する

Kafka Consumerでメッセージを受け取った後に実行するTransformationを作成する

Kafka Consumerステップの設定を行う

動かしてみる

この状態で、プロデューサーからメッセージをいくつか投入してみます。
$ kafka-console-producer --broker-list localhost:9092 -topic test
> Hello!
> HelloHello!
> HelloHelloHello!

出力したファイルの内容を確認してみると、、
$ cat output.txt
Hello!
HelloHello!
HelloHelloHello!
のようになっており、メッセージを受け取って、Transformationを実行し、ファイルに出力していることが分かるかと思います。
実際に手元で動かすと、ほぼほぼリアルタイムにデータが書き込まれることが確認できると思います。
まとめ
今回の検証としては以上ですが、動かすだけであれば難しい設定も必要なく実装することができました。
また、パーティションを増やして、今回作成した”kafka_test.ktr”をPan.shで多数起動しておくことで、分散処理も可能になります。