【新手】尝试使用Amazon托管的Apache Kafka (MSK)流式处理平台(第二部分:运行Clickstream实验)

1. 原因或意图

    • Kafkaの学習をしている。前の記事「【初心者】Amazon Managed Streaming for Apache Kafka (MSK) / MSK Serverless を使ってみる」にて、AWS上でのKafkaクラスターの作成、Producerからのメッセージ送信/Consumerからのメッセージ取得を行った。

 

    もう少し実際のユースケースに近いハンズオンを実施し、理解を深める。(ゴール: 処理の流れの雰囲気がなんとなく分かること)

2. 做过的事情 de

    • AWSが提供する公式ハンズオン集(AWS workshop Studio – Amazon MSK Labs)から、「Clickstream Lab」を実施する。

 

    • (理解できているかは別として)ハンズオンは一応完了したので、自分(初心者)の気づきなどをハンズオン手順の補足として記載する。

 

    • 大まかな内容は以下の通り。

Kafkaクラスターの作成
Producerにて、クリックストリームデータ(WEBサイトのアクセスログを模擬)を送信
Kinesis Data Analytics で、Kafkaクラスターに送られたメッセージを処理し、Kafkaクラスターの別のTopicに再送信するとともに、Open Search Service にも保存
Consumerにて、Kafkaクラスターからメッセージを取得
Kibanaにて、Open Search Service に保存されたデータを可視化

3. 构成图

構成図1.png

4. 步骤

按照实际操作步骤进行实施。记录遇到的问题和补充的命令等。

设置

    • 指定のCloudFormationテンプレートを用いて、VPC(必要なSubnetなど含む)、MSKクラスター、Kinesis Data Analytics のアプリケーション、OpenSearch Serviceのドメインを一気に作成する。20分程度で作成完了する。

 

    東京リージョンではエラーで作成不可のため、今回はus-west-2(オレゴン)で実施。(テンプレート内で、使用するAMIが以下の3リージョンしか設定されていないため)
  RegionAMI:
      us-east-1:
        HVM64: ami-00dc79254d0461090
      us-west-2:
        HVM64: ami-0a85857bfc5345c38
      eu-west-1:
        HVM64: ami-040ba9174949f6de4

运营生产者

    • EC2インスタンスでSchema Registry Service(AMIにインストール済)を起動し、ProducerがSchema Registry Service を使うように設定する。

 

    • 4つのTopicを手動で作成する。

ExampleTopic: Producerがメッセージを送信し、Kinesis Data Analyticsのアプリケーションがメッセージを取得する用
Departments_Agg, ClickEvents_UserId_Agg_Result, User_Sessions_Aggregates_With_Order_Checkout: Kinesis Data Analyticsのアプリがメッセージを送信し、Consumerがメッセージを取得する用

KafkaClickstreamClient-1.0-SNAPSHOT.jar を実行して、ExampleTopic に大量のメッセージを送信する。以下のような内容のデータが送信される。

{"ip": "66.249.1.133", "eventtimestamp": 1655183298630, "devicetype": "tablet", "event_type": "home_page", "product_type": "N/A", "userid": 7496, "globalseq": 2, "prevglobalseq": 0}
{"ip": "66.249.1.254", "eventtimestamp": 1655183298633, "devicetype": "mobile", "event_type": "home_page", "product_type": "N/A", "userid": 6625, "globalseq": 3, "prevglobalseq": 0}
{"ip": "66.249.1.49", "eventtimestamp": 1655183298634, "devicetype": "mobile", "event_type": "home_page", "product_type": "N/A", "userid": 4000, "globalseq": 4, "prevglobalseq": 0}
    • このProducerのJavaアプリはGitHubで公開されているため、ソースをちゃんと見れば詳しい挙動が分かるかもしれない。

 

    • Schema Registry Serviceの動作を理解するのが初心者には難しいが、スキーマを定義したり、データのシリアライズをしたりしているとざっくり理解する。(以下参照)

Avro,SchemaRegistryことはじめ
Schema Registry の概要

配置Java应用的亚马逊KDA

    • CloudFormationにより、Kinesis Data Analyticsのアプリケーションは作成済。Kafkaクラスターへの接続設定など一部のパラメータを修正する。Producer側同様、このアプリからSchema Registry Serviceへアクセスが発生する。

 

    • アプリケーションを実行すると、KafkaクラスターのExampleTopicからメッセージを取得し、メッセージを処理した上で、Kafkaクラスターの3つのTopicへのメッセージ再送信、およびOpenSearch Serviceへの書き込みが行われる。

 

    このKinesis Data Analytics上で動作するアプリケーションはS3に保存されているものを参照して使用しているが、ソースは公開されていない様子。

从亚马逊 MSK 消费

    Producerを実行したEC2インスタンスで、今度はConsumerを実行し、3つのTopicに対してデータの取得を行う。3つのTopicで別々のメッセージが取得できることを確認する。
[ec2-user@ip-10-0-0-237 ~]$ /home/ec2-user/kafka/bin/kafka-console-consumer.sh --bootstrap-server [endpoint] --topic Departments_Agg --from-beginning
{"departmentName":"video games","departmentCount":127,"windowBeginTime":1655183300000,"windowEndTime":1655183310000}
{"departmentName":"AirPods","departmentCount":125,"windowBeginTime":1655183300000,"windowEndTime":1655183310000}
{"departmentName":"AirPods","departmentCount":147,"windowBeginTime":1655183310000,"windowEndTime":1655183320000}


[ec2-user@ip-10-0-0-237 ~]$ /home/ec2-user/kafka/bin/kafka-console-consumer.sh --bootstrap-server [endpoint] --topic ClickEvents_UserId_Agg_Result --from-beginning
{"userSessionCount":256,"userSessionCountWithOrderCheckout":100,"percentSessionswithBuy":39.0,"windowBeginTime":1655183300000,"windowEndTime":1655183310000}
{"userSessionCount":269,"userSessionCountWithOrderCheckout":99,"percentSessionswithBuy":36.0,"windowBeginTime":1655183330000,"windowEndTime":1655183340000}
{"userSessionCount":267,"userSessionCountWithOrderCheckout":121,"percentSessionswithBuy":45.0,"windowBeginTime":1655183360000,"windowEndTime":1655183370000}

[ec2-user@ip-10-0-0-237 ~]$ /home/ec2-user/kafka/bin/kafka-console-consumer.sh --bootstrap-server [endpoint] --topic User_Sessions_Aggregates_With_Order_Checkout --from-beginning
{"userId":2552,"eventCount":39,"orderCheckoutEventCount":39,"deptList":["cameras","soundbars","AirPods","ear phones","cd players","cell phones","video games"],"eventKey":1,"windowBeginTime":1655183299648,"windowEndTime":1655183300767}
{"userId":19806,"eventCount":14,"orderCheckoutEventCount":14,"deptList":["soundbars","AirPods","cd players","video games","cell phones"],"eventKey":1,"windowBeginTime":1655183300177,"windowEndTime":1655183301181}
{"userId":30690,"eventCount":28,"orderCheckoutEventCount":28,"deptList":["cameras","AirPods","laptops","ear phones","cd players","cell phones","video games"],"eventKey":1,"windowBeginTime":1655183300543,"windowEndTime":1655183301549}

创建 Kibana 仪表板

    • Kinesis Data Analytics のアプリケーションがOpenSearch Serviceに保存したデータを参照し可視化する。

 

    EC2インスタンスでSSHポート転送を行い、ローカルの9092/tcpポートをリモート(KibanaのURL)の443/tcpに転送する。TeraTermの場合の設定は以下。
teraterm設定.png
    • Labに誤記があり、「https://127.0.0.1:9200/_plugin/kibana」とあるが、正しくは「https://127.0.0.1:9092/_plugin/kibana」でkibanaのダッシュボードにアクセスする。

 

    グラフ化は手順通り問題なく実施可能。

5. 感触之所在

    • 不明点はいろいろあるが、当初ゴールの「処理の雰囲気がなんとなく分かること」はいったん達成したためよしとしたい。

 

    ついでにKinesis Data Analytics も初めて設定して、よい体験になった。
广告
将在 10 秒后关闭
bannerAds