はじめに
Oracle Cloud Infractructure(以下OCI)では、ストリーミングデータをリアルタイムに収集・処理が出来る Streaming というサービスが提供されています。Streaming へデータを入出力する方法は、ざっくり次のものがあります
-
- OCI SDK
-
- Kafka 互換API
-
- Kafka Connect
-
- OCI CLI
- REST API
今回の記事では、Kafka Connect を使って Streaming にデータを入出力してみます。Kafka Connect は、Apache Kafka で提供されている API となっていて、簡単にデータを入出力出来ます。Connect に接続するための Connector が色々公開されているため、自分が使いたい Connector があれば、自分で実装する負担を減らせます。使いたい Connector が無ければ、自分で作っていくこともできます。
インターネット上に公開されているコネクターは、ライセンス形態にご注意ください。
Auth Token生成
Streaming で、Kafka Connect で接続するためには、IAM ユーザーで Auth Token が必要です。自分の IAMユーザーの詳細画面で、Generate します。

適当に説明をいれます。

Token が表示されるので、メモっておきます。(画像のものは現在使えません)
Tokenの中に、; などの記号が含まれていると、正しく動かない気がします。もしハマって正常に動かない時は、Tokenの入れ替えも検討してみてください。

Stream の作成
OCI Console で、Analytics > Streaming を選択します。その後、Create Stream を選びます。

適当にパラメータを入れて作成します

Stream を作ったときに、Stream Pool が何もない場合は、Default の Stream Pool が自動的に作成されます。teststream01は、DefaultPool に自動配置されます。Stream Pool は、複数のStreamを管理する概念となっていて、Stream への Endpoint を Public にするのか Private にするのか、またデータの暗号に使う鍵は何を使用するのか、といった事をまとめて管理するための概念です。
今回は自動作成にしたので、Endpoint は Public になっていて、暗号化用の鍵は Oracle 側で管理されているものを使う設定です。

上記の画面にある「View Kafka Connection Settings」を押します。

Copy All を押して、すべての設定値をメモっておきます。

Kafka Connect Configration を作成
Kafka Connector を使ってStreamingに接続するために、Kafka Connect Configration を作成します。これを作成すると、Streaming 側で Kafka 用語でいうところの 3種類のTopic が作成されます。複数の Connector が負荷分散して動作するためには、互いにどういうステータスで、どこまでのデータを処理して、といった情報共有が必要です。情報共有するために、3種類のTopic上でやり取りを行っています。
それでは、Create ボタンをおします。

名前を適当にいれます。

詳細画面が自動的に表示されます。Copy を押してメモっておきます。

Connector 用仮想マシンの作成
今回の手順では、適当に CentOS7 の仮想マシンを作成します。
Open JDK 8 の Install
作成した CentOS7 で Kafka Connector を動かすために、Open JDK 1.8 を Install します。好きな JDK で問題ないと思います。
sudo yum install -y java-1.8.0-openjdk java-1.8.0-openjdk-devel
Kafka Download
Kafka Connector を動かすためのツールが、Kafka 本体の tar gz ball に含まれているのでダウンロードします。以下の URL からダウンロードできます。
mkdir ~/kafka
cd ~/kafka
wget https://ftp.tsukuba.wide.ad.jp/software/apache/kafka/2.5.0/kafka_2.12-2.5.0.tgz
解凍
tar xfvz kafka_2.12-2.5.0.tgz
作業しやすいために、Kafka Home を環境変数に設定
echo 'export KAFKA_HOME=$HOME/kafka/kafka_2.12-2.5.0/' >> ~/.bashrc
bachrc 読み込み
source ~/.bashrc
Kafka の targz ball に含まれているライブラリを確認します。
ls -la $KAFKA_HOME/libs | grep -i connect
いくつかの connector が Kafka に含まれています。
[opc@kafka01 libs]$ ls -la $KAFKA_HOME/libs | grep -i connect
-rw-r--r--. 1 opc opc 101941 Apr 8 01:14 connect-api-2.5.0.jar
-rw-r--r--. 1 opc opc 18607 Apr 8 01:16 connect-basic-auth-extension-2.5.0.jar
-rw-r--r--. 1 opc opc 20506 Apr 8 01:16 connect-file-2.5.0.jar
-rw-r--r--. 1 opc opc 45199 Apr 8 01:14 connect-json-2.5.0.jar
-rw-r--r--. 1 opc opc 90298 Apr 8 01:16 connect-mirror-2.5.0.jar
-rw-r--r--. 1 opc opc 29086 Apr 8 01:16 connect-mirror-client-2.5.0.jar
-rw-r--r--. 1 opc opc 557008 Apr 8 01:16 connect-runtime-2.5.0.jar
-rw-r--r--. 1 opc opc 91639 Apr 8 01:16 connect-transforms-2.5.0.jar
[opc@kafka01 libs]$
Kafka Connect 用設定
Kafka Connector 用に必要な設定ファイルを作成します。それぞれのパラメータは、環境に合わせて適宜変更してください。
group.id : 任意の名前。複数の Connect Worker を動かす時は、どういうの名前を指定する
bootstrap.servers : OCI の Stream Pool からコピーした値を指定
sasl.jaas.config : OCI の Stream Pool からコピーした値を指定。password=”” の部分は、Auth Token の文字列を指定
producer.sasl.jaas.config : sasl.jaas.config と同じものを指定
consumer.sasl.jaas.config : sasl.jaas.config と同じものを指定
config.storage.topic : Kafka Connect Configration からコピーした値を指定
status.storage.topic : Kafka Connect Configration からコピーした値を指定
offset.storage.topic : Kafka Connect Configration からコピーした値を指定
cat <<'EOF' > $KAFKA_HOME/config/connect-distributed-demo.properties
group.id=connect-demo-group
bootstrap.servers=cell-1.streaming.ap-tokyo-1.oci.oraclecloud.com:9092
sasl.mechanism=PLAIN
security.protocol=SASL_SSL
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="poc02/oracleidentitycloudservice/suguru.sugiyama@oracle.com/ocid1.streampool.oc1.ap-tokyo-1.amaaaaaaycetm7yawtz56lnnerap4r45y4vheekgvhdaevxf3clfpuew6mla" password="8t[shwUN}I-d+{}8Nx_a";
producer.sasl.mechanism=PLAIN
producer.security.protocol=SASL_SSL
producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="poc02/oracleidentitycloudservice/suguru.sugiyama@oracle.com/ocid1.streampool.oc1.ap-tokyo-1.amaaaaaaycetm7yawtz56lnnerap4r45y4vheekgvhdaevxf3clfpuew6mla" password="8t[shwUN}I-d+{}8Nx_a";
consumer.sasl.mechanism=PLAIN
consumer.security.protocol=SASL_SSL
consumer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="poc02/oracleidentitycloudservice/suguru.sugiyama@oracle.com/ocid1.streampool.oc1.ap-tokyo-1.amaaaaaaycetm7yawtz56lnnerap4r45y4vheekgvhdaevxf3clfpuew6mla" password="8t[shwUN}I-d+{}8Nx_a";
config.storage.replication.factor=1
config.storage.partitions=1
config.storage.topic=ocid1.connectharness.oc1.ap-tokyo-1.amaaaaaaycetm7yapmslajk3scdwvngqw47c2iixkqltnb7zllk7ca3t33va-config
offset.storage.replication.factor=1
offset.storage.partitions=1
offset.storage.topic=ocid1.connectharness.oc1.ap-tokyo-1.amaaaaaaycetm7yapmslajk3scdwvngqw47c2iixkqltnb7zllk7ca3t33va-offset
offset.flush.interval.ms=10000
offset.flush.timeout.ms=5000
status.storage.replication.factor=1
status.storage.partitions=1
status.storage.topic=ocid1.connectharness.oc1.ap-tokyo-1.amaaaaaaycetm7yapmslajk3scdwvngqw47c2iixkqltnb7zllk7ca3t33va-status
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=true
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
task.shutdown.graceful.timeout.ms=10000
EOF
Kafka Connect を実行
$KAFKA_HOME/bin/connect-distributed.sh $KAFKA_HOME/config/connect-distributed-demo.properties
実行例
たくさんの文字が流れて行って、末尾にこんな文字が表示されれば正常状態です。
[2020-05-03 03:07:46,369] INFO [Worker clientId=connect-1, groupId=connect-demo-group] Finished reading to end of log and updated config snapshot, new config log offset: 886603188224 (org.apache.kafka.connect.runtime.distributed.DistributedHerder:1085)[2020-05-03 03:07:46,369] INFO [Worker clientId=connect-1, groupId=connect-demo-group] Starting connectors and tasks using config offset 886603188224 (org.apache.kafka.connect.runtime.distributed.DistributedHerder:1111)
[2020-05-03 03:07:46,369] INFO [Worker clientId=connect-1, groupId=connect-demo-group] Finished starting connectors and tasks (org.apache.kafka.connect.runtime.distributed.DistributedHerder:1132)[2020-05-03 03:07:47,378] INFO [Worker clientId=connect-1, groupId=connect-demo-group] Session key updated (org.apache.kafka.connect.runtime.distributed.DistributedHerder:1447)
Source Connector の設定
今回の記事では、File Connector を使用して動作確認をします。動作確認に使う Inputファイル・Outputファイルを作成します。Input ファイルは、1から1000までの文字を1行ごとにかいたファイルです。output.txt は touch で作っているだけなので、中身は空っぽです。
mkdir ~/connect-quickstart
seq 1000 > ~/connect-quickstart/input.txt
touch ~/connect-quickstart/output.txt
コネクタを作成するまえに、Connect Worker(仮想マシン) の中で定義されている Connector 一覧を表示します。今はなにもしていないので、空白が返ってきます。
[opc@kafka01 connect-quickstart]$ curl -s http://localhost:8083/connectors | jq .
[]
[opc@kafka01 connect-quickstart]$
Connector を設定します。
topic : OCI Stream の名前を指定
file : Input として使うファイルのPathを指定
curl -s -X POST -H "Content-Type: application/json" --data '{"name": "file-source","config": {"connector.class":"org.apache.kafka.connect.file.FileStreamSourceConnector","tasks.max":"1","topic":"teststream01","file": "/home/opc/connect-quickstart/input.txt"}}' http://localhost:8083/connectors
list を確認します。
[opc@kafka01 connect-quickstart]$ curl -s http://localhost:8083/connectors | jq .
[
"file-source"
]
[opc@kafka01 connect-quickstart]$
詳細情報を確認します
[opc@kafka01 connect-quickstart]$ curl -s http://localhost:8083/connectors/file-source/status | jq .
{
"name": "file-source",
"connector": {
"state": "RUNNING",
"worker_id": "10.0.0.14:8083"
},
"tasks": [
{
"id": 0,
"state": "RUNNING",
"worker_id": "10.0.0.14:8083"
}
],
"type": "source"
}
memo : Deleteコマンド
curl -s -X DELETE http://localhost:8083/connectors/file-source | jq .
Sink Connector の設定
Connector の Output(Sink) としての設定をします。こちらも Input と同様に File Connector を使います。
curl -X POST -H "Content-Type: application/json" --data '{"name": "file-sink", "config": {"connector.class":"org.apache.kafka.connect.file.FileStreamSinkConnector", "tasks.max":"1", "topics":"teststream01", "file": "/home/opc/connect-quickstart/output.txt"}}' http://localhost:8083/connectors
fileが作成されています。
[opc@kafka01 connect-quickstart]$ head -n 10 output.txt
1
2
3
4
5
6
7
8
9
10
[opc@kafka01 connect-quickstart]$
検証してわかったこと
-
- Streaming の Stream の名前が、KafkaのTopicNameに対応している
- 一度コネクターを作成した後に、input file を更新しても output には流れない
参考URL
Kafka Docs
https://kafka.apache.org/documentation/
OCI Docs
https://docs.cloud.oracle.com/en-us/iaas/Content/Streaming/Tasks/kafkacompatibility.htm
Oracle Blogs
https://blogs.oracle.com/developers/using-kafka-connect-with-oracle-streaming-service-and-autonomous-db