はじめに

GxP の清田です。

イベント駆動型アーキテクチャ を実現するための技術の1つである、
Apach Kafka (以降 Kafka と記載) を動かして学んでみます。

素振りをしようと思った動機は、@yusuke_arclamp さんの『マイクロサービスの次に来るかもしれない言葉について』や、
社内イベント 『TechTalk「この20年の Agile, DevOps, Microservices の流れについて」』 で Kafka を耳にして気になっていたからです。

Microservice におけるシステム間連携の方法として、イベント駆動型アーキテクチャには以下の特徴があります。

    • 同期的なAPIの連携でなく非同期の連携

 

    • データを共有するのではなくイベントを配信する

 

    イベントの発行元は誰が受け取るかを気にしなくて良い(=疎結合)

これまでの開発経験で触れたことのないものだったので、一度動くアプリケーションを作って恐怖心をなくしておこうと思います。


対象読者

    • イベント駆動型アーキテクチャに興味のある人

Kafka を 動かして概念の一部を理解したい人

Docker Compose でコンテナを起動して CLI で動かして学ぶ

Spring for Apache Kafka で簡単な producer, consumer を実装する

イベントを複数のシステムが受け取って、それぞれの解釈で処理する 実装のイメージを掴みたい人

上記のいずれかに該当する場合、楽しんでいただけるかも知れません。


記載しないこと

    • 脱 Zookeeper を支える技術である Apache Kafka Raft (KRaft) について

 

    • Kafka のクラウド環境でのデプロイ(Amazon Managed Streaming for Apache Kafka など)について

 

    • イベントの配信の保証(At Least One や Exactly Once) について

 

    • RabbitMQ などの message brokers との比較について

 

    Amazon Simple Queue Service や Cloud Pub/Sub などのパブリッククラウドで提供されているサービスとの使い分けについて

上記の話題については、学習中で知識を有していないため記載しないです。(いずれも気になる)


Kafkaとは

あたりの説明で日本語でざっと読めるものとしては、以下の記事が大変参考になりました。
この記事では概念の1つずつの紹介はしていないので、参照いただけますとイメージを掴みやすいと思います。

個人的には、動かして学ぶことでこの辺りの記載を実感を持って理解することができました。

Apache Kafka(以降、Kafka)はスケーラビリティに優れた分散メッセージキューです。
Kafkaは処理性能を重視したメッセージキューであり、複数台のマシンでクラスタを構成して分散処理を行うことで、高いスループットを発揮します。後からクラスタにマシンを追加することで、処理性能とデータ保持容量をスケールアウトすることもできます。また、Kafkaはクラスタ内でデータを複製するため、一部のマシンに障害が発生してもデータを失うことなく処理を継続できます。

※イメージを掴んだ上で、Confluent のドキュメントや Kafaka のドキュメント を確認しました


Docker Compose でコンテナを起動して CLI で動かして学ぶ

まずは低レイヤーから学んでいきます。
登場人物は以下の図のものです。

kafka-advent-calendar-cli-archtecture.jpg

GitHub にサンプルを用意(KiyotaTakeshi/kafka-playground) しましたので手元で動かして確認ができます。

さくっと broker 3台でクラスターを作成します。
※今回は Zookeeper が必要な image(bitnami/kafka) を使用しています

$ git clone https://github.com/KiyotaTakeshi/kafka-playground.git

$ cd  kafka-playground

$ docker compose up -d

$ docker compose ps
NAME                           COMMAND                  SERVICE             STATUS              PORTS
kafka-playground-kafka1-1      "/opt/bitnami/script…"   kafka1              running             0.0.0.0:49760->9092/tcp
kafka-playground-kafka2-1      "/opt/bitnami/script…"   kafka2              running             0.0.0.0:49761->9092/tcp
kafka-playground-kafka3-1      "/opt/bitnami/script…"   kafka3              running             0.0.0.0:49762->9092/tcp
kafka-playground-zookeeper-1   "/opt/bitnami/script…"   zookeeper           running             0.0.0.0:2181->2181/tcp

任意の broker コンテナに入り、topic を作成します。

# どの broker に exec するかは任意(今回は kafka1)
$ docker compose exec kafka1 bash

# まずは replica は1でつくる
# partitions は4でつくる
$ /opt/bitnami/kafka/bin/kafka-topics.sh --create --topic test-topic --bootstrap-server localhost:9092 --replication-factor 1 --partitions 4

作成したものを確認します。
出力から以下のことがわかります。

    • broker が3台に対して、 partition を4つに設定したため、1002(数字は実行によって異なります) の broker が2つの partiton の Leader になっている

 

    • replica は1にしているので、各 partition は1台の broker のみが保持している

Isr(In-Sync Replica) が Leader と同じ broker のみ

$ /opt/bitnami/kafka/bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic test-topic

Topic: test-topic   TopicId: sqlIpd0oSEC3d7PJC3c_Dg PartitionCount: 4   ReplicationFactor: 1    Configs: segment.bytes=1073741824
    Topic: test-topic   Partition: 0    Leader: 1002    Replicas: 1002  Isr: 1002
    Topic: test-topic   Partition: 1    Leader: 1001    Replicas: 1001  Isr: 1001
    Topic: test-topic   Partition: 2    Leader: 1003    Replicas: 1003  Isr: 1003
    Topic: test-topic   Partition: 3    Leader: 1002    Replicas: 1002  Isr: 1002

では、producer と consumer でメッセージのやり取りができることを確認します。
※関係する概念だけ記載した図です

kafka-advent-calendar-1producer-1consumer.jpg

シェルを2つ起動する必要があります。

# シェル1
# consumer の起動
$ /opt/bitnami/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test-topic
# シェル2
# どの Broker に exec するかは任意
$ docker compose exec kafka1 bash

# producer を起動しメッセージを送る
$ /opt/bitnami/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test-topic
>hello
>world

consumer 側でメッセージを受信できました。

# シェル1
$ /opt/bitnami/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test-topic

hello
world

もう1つシェルを立ち上げて、 consumer を増やしてみます。

kafka-advent-calendar-1producer-2consumer.jpg
# シェル3
$ docker compose exec kafka1 bash

$ /opt/bitnami/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test-topic

producer からメッセージを送ると、

# シェル2
$ /opt/bitnami/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test-topic
>goodbye

2つの consumer がメッセージを受信できていることがわかります。

# シェル1
$ /opt/bitnami/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test-topic

goodbye
# シェル3
$ /opt/bitnami/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test-topic

goodbye

このことから、 同じ topic から2つの consumer がメッセージを受信できる ことがわかります。
イベント駆動型アーキテクチャにおける、「イベントを複数のシステムが受け取る」が実現 できていますね(まだ CLI 上ですが)。

2つの consumer が別の subscriber として振る舞ったのはなぜでしょうか?
理由は、別の consumer group だからです。

producer を停止して、確認してみます。

# シェル2
$ /opt/bitnami/kafka/bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
console-consumer-32834
console-consumer-40587

では、2つの consumer を同じ consumer group で立ち上げます。

kafka-advent-calendar-1producer-1consumer-group.jpg
# シェル1
# console-consumer という consumer group で起動
$ /opt/bitnami/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
--topic test-topic --group console-consumer
# シェル3
$ /opt/bitnami/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
--topic test-topic --group console-consumer

この状態で、 producer からメッセージを送ると

# シェル2
$ /opt/bitnami/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test-topic
>hello
>world
>goodbye

2分の1でそれぞれの consumer がメッセージを受信します。

# シェル1
$ /opt/bitnami/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
> --topic test-topic --group console-consumer

goodbye
# シェル3
$ /opt/bitnami/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
> --topic test-topic --group console-consumer

hello
world

つまり、 とある topic に対して同じ処理をする consumer のスケールアウト が実現できました(今回はただコンソールに出力されるだけですが)。

ここまでで

    • 別の consumer group で起動して、同じ topic を複数の consumer で受け取る

イベントのシステムごとの解釈

同じ consumer group で起動して、topic を複数の同じ処理をする consumer 間で受け取る

同じ解釈をするシステムのスケールアウト

が確認できました。


では、次は broker 側のスケールアウトについて見ていきます。

以下の図のように、複数の replica を指定することで Topic の partition を複数の broker に持つことができます。

kafka-advent-calendar-broker-partition-archtecture.jpg

まず、コンテナを停止しホスト側にマウントしているデータも吹き飛ばします。
(先程作成した topic や consumer group 等がなくなります)

$ docker compose down && rm -rf .docker

コンテナを再起動し、コンテナに入ります。

$ docker compose up -d && docker compose ps

$ docker compose exec kafka1 bash

今度は、 replica 数を3(broker と同じ台数)で指定し、作成します。

# 作成
$ /opt/bitnami/kafka/bin/kafka-topics.sh --create --topic test-topic-replicated \
> --bootstrap-server localhost:9092 --replication-factor 3 --partitions 4

作成したものを確認します。
出力から以下のことがわかります。

    • broker が3台に対して、 partition を4つに設定したため、1002(数字は実行によって異なります) の broker が2つの partiton の Leader になっている

 

    • replica を3にしているので、各 partition は3台の broker が保持している

Isr(In-Sync Replica) が3台

$ /opt/bitnami/kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic test-topic-replicated

Topic: test-topic-replicated    TopicId: w7ITwLFVTZi3m2AKNAUYIg PartitionCount: 4   ReplicationFactor: 3    Configs: segment.bytes=1073741824
    Topic: test-topic-replicated    Partition: 0    Leader: 1002    Replicas: 1002,1001,1003    Isr: 1002,1001,1003
    Topic: test-topic-replicated    Partition: 1    Leader: 1001    Replicas: 1001,1003,1002    Isr: 1001,1003,1002
    Topic: test-topic-replicated    Partition: 2    Leader: 1003    Replicas: 1003,1002,1001    Isr: 1003,1002,1001
    Topic: test-topic-replicated    Partition: 3    Leader: 1002    Replicas: 1002,1003,1001    Isr: 1002,1003,1001

では、 broker を停止してみましょう。

# 任意のコンテナを停止
$ docker compose stop kafka2

topic を確認してみると(コンテナ内部に入ってコマンド実行でもいいです)、出力から以下のことがわかります。

    • 停止した broker が Leader でなくなっている

 

    • 停止した broker が Isr(In-Sync Replica) に含まれなくなっている

 

    動作している broker で partition の Leader を分け合っている
$ docker compose exec -T kafka1 /bin/bash <<EOF
/opt/bitnami/kafka/bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic test-topic-replicated
EOF

Topic: test-topic-replicated    TopicId: w7ITwLFVTZi3m2AKNAUYIg PartitionCount: 4   ReplicationFactor: 3    Configs: segment.bytes=1073741824
    Topic: test-topic-replicated    Partition: 0    Leader: 1001    Replicas: 1002,1001,1003    Isr: 1001,1003
    Topic: test-topic-replicated    Partition: 1    Leader: 1001    Replicas: 1001,1003,1002    Isr: 1001,1003
    Topic: test-topic-replicated    Partition: 2    Leader: 1003    Replicas: 1003,1002,1001    Isr: 1003,1001
    Topic: test-topic-replicated    Partition: 3    Leader: 1003    Replicas: 1002,1003,1001    Isr: 1003,1001

では、止めていた broker を起動します。

$ docker compose start kafka2

再度 topic を確認してみると以下のことがわかりました。

Isr(In-Sync Replica) に再度起動した broker が含まれている
再度起動した broker は partition の Leader には選出されていない

$ docker compose exec -T kafka1 /bin/bash <<EOF
/opt/bitnami/kafka/bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic test-topic-replicated
EOF

Topic: test-topic-replicated    TopicId: w7ITwLFVTZi3m2AKNAUYIg PartitionCount: 4   ReplicationFactor: 3    Configs: segment.bytes=1073741824
    Topic: test-topic-replicated    Partition: 0    Leader: 1001    Replicas: 1002,1001,1003    Isr: 1001,1003,1002
    Topic: test-topic-replicated    Partition: 1    Leader: 1001    Replicas: 1001,1003,1002    Isr: 1001,1003,1002
    Topic: test-topic-replicated    Partition: 2    Leader: 1003    Replicas: 1003,1002,1001    Isr: 1003,1001,1002
    Topic: test-topic-replicated    Partition: 3    Leader: 1003    Replicas: 1002,1003,1001    Isr: 1003,1001,1002

※ちなみに、この状態で別の broker (今回だと kafka3)を停止すると、
先程起動した broker が Leader に含まれる挙動になりました

これらの検証で、 broker の台数を増やし replica に設定することで、
producer, consumer とやりとりをする基盤としての broker の可用性を高めることができるとわかりました。


最後に broker が実体としてどのように topic を保存しているのかを確認します。

ホスト側にマウントしているデータを確認します。topic の partition ごとにディレクトリが切られているようです。

$ ls .docker/kafka1/data/test-topic-replicated-?
.docker/kafka1/data/test-topic-replicated-0:
00000000000000000000.index  00000000000000000000.log    00000000000000000000.timeindex  leader-epoch-checkpoint     partition.metadata

.docker/kafka1/data/test-topic-replicated-1:
00000000000000000000.index  00000000000000000000.log    00000000000000000000.timeindex  leader-epoch-checkpoint     partition.metadata

.docker/kafka1/data/test-topic-replicated-2:
00000000000000000000.index  00000000000000000000.log    00000000000000000000.timeindex  leader-epoch-checkpoint     partition.metadata

.docker/kafka1/data/test-topic-replicated-3:
00000000000000000000.index  00000000000000000000.log    00000000000000000000.timeindex  leader-epoch-checkpoint     partition.metadata

$ ls -ld .docker/kafka1/data/test-topic-replicated-*
drwxr-xr-x  7 t.kiyota  AD\Domain Users  224 Dec 15 16:03 .docker/kafka1/data/test-topic-replicated-0
drwxr-xr-x  7 t.kiyota  AD\Domain Users  224 Dec 15 16:03 .docker/kafka1/data/test-topic-replicated-1
drwxr-xr-x  7 t.kiyota  AD\Domain Users  224 Dec 15 15:33 .docker/kafka1/data/test-topic-replicated-2
drwxr-xr-x  7 t.kiyota  AD\Domain Users  224 Dec 15 15:33 .docker/kafka1/data/test-topic-replicated-3

replica を3で指定したため、それぞれの broker に同様のデータが存在しています。

$ ls -ld .docker/kafka2/data/test-topic-replicated-?
drwxr-xr-x  7 t.kiyota  AD\Domain Users  224 Dec 15 16:12 .docker/kafka2/data/test-topic-replicated-0
drwxr-xr-x  7 t.kiyota  AD\Domain Users  224 Dec 15 15:33 .docker/kafka2/data/test-topic-replicated-1
drwxr-xr-x  7 t.kiyota  AD\Domain Users  224 Dec 15 15:33 .docker/kafka2/data/test-topic-replicated-2
drwxr-xr-x  7 t.kiyota  AD\Domain Users  224 Dec 15 16:12 .docker/kafka2/data/test-topic-replicated-3

$ ls -ld .docker/kafka3/data/test-topic-replicated-?
drwxr-xr-x  7 t.kiyota  AD\Domain Users  224 Dec 15 15:33 .docker/kafka3/data/test-topic-replicated-0
drwxr-xr-x  7 t.kiyota  AD\Domain Users  224 Dec 15 15:33 .docker/kafka3/data/test-topic-replicated-1
drwxr-xr-x  7 t.kiyota  AD\Domain Users  224 Dec 15 16:03 .docker/kafka3/data/test-topic-replicated-2
drwxr-xr-x  7 t.kiyota  AD\Domain Users  224 Dec 15 16:03 .docker/kafka3/data/test-topic-replicated-3

以上で CLI での検証は終わります。
コンテナ起動して CLI で動作検証することで Kafka の主要な概念についてイメージが膨らんだのではないでしょうか。


Spring for Apache Kafka で簡単な producer, consumer を実装する

次は Kafka とやりとりをするアプリケーションを開発し、イベント駆動型アーキテクチャの理解を深めていきます。

最終的に動作するものは、
GitHub のサンプル(KiyotaTakeshi/kafka-spring-sample)の Gif を見ていただくとイメージがしやすいと思います。
Qiita の容量制限を超えているのか Gif が貼れなかった…

図にすると以下のものです。

kafka-advent-calendar-spring-sample-archtecture.jpg
    • order producer(Spring Boot) がWebリクエストを受け付けて、 topic(order-events) にイベントを発行

 

    • order notification(Spring Boot) がイベントを受け取ってメールを送信

コンテナ起動している mailhog にて受信したメールを確認できる

order rdb(Sring Boot) がイベントを受け取って MySQL に保存

という構成です。

    • producer 側のアプリケーションはイベントを発行するところまでしか気にしない

 

    consumer はイベントを受け取って各々の処理を行う

というイベント駆動型アーキテクチャをシンプルですが実現しました。
(他にもオブジェクトのまま NoSQL に保存する consumer があってもいいかも)


では order producer の実装を見ていきます。

topic の作成は Configuration クラスにより行っています。

@Configuration
public class AutoCreateConfig {

    // if you create manual
    // $ ./bin/kafka-topics.sh --create --topic order-events --bootstrap-server localhost:9092 --replication-factor 3 --partitions 3
    @Bean
    public NewTopic orderEvents(){
        return TopicBuilder.name("order-events")
                .partitions(3)
                .replicas(3)
                .build();
    }
}

controller がリクエストを受け付けて、

@RestController
@RequestMapping("/order-events")
@Slf4j
@AllArgsConstructor
public class OrderEventsController {

    private OrderEventProducer orderEventProducer;

    @PostMapping
    public ResponseEntity<OrderEvent> postOrderEvent(@RequestBody @Valid OrderEvent orderEvent)
            throws JsonProcessingException {

        log.info("invoke kafka producer");
        orderEvent.setOrderEventType(OrderEventType.NEW);
        orderEventProducer.sendOrderEvent(orderEvent);

        return ResponseEntity.status(HttpStatus.CREATED).body(orderEvent);
    }

kafkaTemplate を使用して topic に対して非同期で message を送信します。

    public ListenableFuture<SendResult<Integer, String>> sendOrderEvent(OrderEvent orderEvent) throws JsonProcessingException {
        Integer key = orderEvent.getOrderEventId();
        String value = objectMapper.writeValueAsString(orderEvent);
        ListenableFuture<SendResult<Integer, String>> listenableFuture = kafkaTemplate.send(TOPIC, key, value);

        listenableFuture.addCallback(new ListenableFutureCallback<>() {
            @Override
            public void onFailure(Throwable ex) {
                handleFailure(key, value, ex);
            }

            @Override
            public void onSuccess(SendResult<Integer, String> result) {
                handleSuccess(key, value, result);
            }
        });
        return listenableFuture;
    }

イベントを配るだけなのでシンプルな実装になっています。

application.yaml は接続先の Kafka Cluster の情報やシリアライズ、デシリアライズの設定を記載します。
admin の設定は上記の topic の生成で必要なものです。

spring:
  kafka:
    template:
      default-topic: "order-events"
    producer:
      bootstrap-servers:
        - localhost:9092
        - localhost:9093
        - localhost:9094
      key-serializer: org.apache.kafka.common.serialization.IntegerSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    admin:
      properties:
        bootstrap.servers: localhost:9092,localhost:9093,localhost:9094

次に order notification の実装を見ていきます。

こちらの application.yaml は consumer として接続先の Kafka Cluster の情報を記載しています。
また、メールの送信先の情報も記載しています。
(今回はコンテナ起動した mailhog を使用していますが、 mailtrap のようなクラウドサービスを使ってもいいかもです)

group-id: order-notification-events-group で指定してるのは consumer group です。

spring:
  kafka:
    consumer:
      bootstrap-servers:
        - localhost:9092
        - localhost:9093
        - localhost:9094
      key-deserializer: org.apache.kafka.common.serialization.IntegerDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      group-id: order-notification-events-group
  mail:
    # if you use mailtrap instead of mailhog
    # host: smtp.mailtrap.io
    host: localhost
    port: 1025
    # username: dummy
    # password: dummy
    protocol: smtp

@KafkaListener で topic を指定することで、 event を受け取って処理することができます。

@Component
@Slf4j
@AllArgsConstructor
public class OrderEventsConsumer {

    private final OrderEventsService orderEventsService;

    @KafkaListener(topics = {"order-events"})
    public void onMessage(ConsumerRecord<Integer, String> consumerRecord) throws JsonProcessingException {
        log.info("ConsumerRecord: {} ", consumerRecord);
        orderEventsService.processOrderEvent(consumerRecord);
    }
}

テンプレートエンジンである Thymeleaf で、
event として受け取ったオブジェクトを本文に含めた HTML 形式のメールを送信します。

    private void sendMail(NotificationEmail notificationEmail) {
        MimeMessagePreparator messagePreparer = mimeMessage -> {
            MimeMessageHelper helper = new MimeMessageHelper(mimeMessage, "UTF-8");
            helper.setFrom("kafka-spring-sample@example.com");
            helper.setTo(notificationEmail.getRecipient());
            helper.setSubject(notificationEmail.getSubject());
            helper.setText(build(notificationEmail.getBody()), true);
        };
        try {
            mailSender.send(messagePreparer);
            log.info("Email sent!!");
        } catch (MailException e) {
            log.error("Exception occurred when sending mail", e);
            throw new RuntimeException("Exception occurred when sending mail to " + notificationEmail.getRecipient(), e);
        }
    }

最後に order rdb の実装を見ていきます。

application.yaml は order notification と同じ Kafka に関する設定に加えて DB 周りの設定を記載しています。

order notification とは別の consumer group の group-id: order-rdb-events-group が指定されていることがポイントです。

CLI で確認したように 同じ topic から consumer が別の処理をするため(同じイベントを複数のシステムが受け取るため)に、別のIDが指定 されています。

spring:
  kafka:
    consumer:
      bootstrap-servers:
        - localhost:9092
        - localhost:9093
        - localhost:9094
      key-deserializer: org.apache.kafka.common.serialization.IntegerDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      group-id: order-rdb-events-group
  datasource:
    driver-class-name: com.mysql.cj.jdbc.Driver
    url: jdbc:mysql://localhost:3306/kafka-spring-sample?createDatabaseIfNotExist=true
    username: root
    password: 1qazxsw2
  jpa:
    show-sql: true
    properties:
      hibernate:
        dialect: org.hibernate.dialect.MySQL8Dialect
        format_sql: true
    hibernate:
      ddl-auto: update
# パラメータの値をログに表示 ex.) binding parameter [1] as [INTEGER] - [1]
# 2021-11-21 00:55:11.604 TRACE 11400 --- [ main] o.h.type.descriptor.sql.BasicBinder      : binding parameter [1] as [BIGINT] - [1]
logging.level.org.hibernate.type.descriptor.sql.BasicBinder: trace

topic を指定してイベントを受け取る処理は同じです。

@Component
@Slf4j
@AllArgsConstructor
public class OrderEventsConsumer {

    private final OrderEventsService orderEventsService;

    @KafkaListener(topics = {"order-events"})
    public void onMessage(ConsumerRecord<Integer, String> consumerRecord) throws JsonProcessingException {
        log.info("ConsumerRecord: {} ", consumerRecord);
        orderEventsService.processOrderEvent(consumerRecord);
    }
}

あとは受け取ったイベントのモデルで MySQL に保存しています。

    private void save(OrderEvent orderEvent) {
        orderEvent.getOrder().setOrderEvent(orderEvent);
        orderEventsRepository.save(orderEvent);
        log.info("Successfully Persisted the Order Event {}", orderEvent);
    }

では、 HTTP リクエストしてみましょう。
※サンプルコード(KiyotaTakeshi/kafka-spring-sample)に postman のコレクションを配置しています

もろもろ必要なコンテナを起動します。

$ docker compose up -d

$ docker compose ps                                                             

NAME                COMMAND                  SERVICE             STATUS              PORTS
order-kafka1        "/opt/bitnami/script…"   kafka1              running             0.0.0.0:9092->9092/tcp
order-kafka2        "/opt/bitnami/script…"   kafka2              running             0.0.0.0:9093->9093/tcp
order-kafka3        "/opt/bitnami/script…"   kafka3              running             0.0.0.0:9094->9094/tcp
order-mailhog       "MailHog"                mailhog             running             0.0.0.0:1025->1025/tcp, 0.0.0.0:8025->8025/tcp
order-mysql         "docker-entrypoint.s…"   mysql               running             0.0.0.0:3306->3306/tcp
order-zookeeper     "/opt/bitnami/script…"   zookeeper           running             0.0.0.0:2181->2181/tcp

root project でビルドして executable jar を生成します。
IDE で起動しても問題ないです!

$ export JAVA_HOME=`/usr/libexec/java_home -v 11`

$ java -version                                                                
openjdk version "11.0.11" 2021-04-20 LTS
OpenJDK Runtime Environment Corretto-11.0.11.9.1 (build 11.0.11+9-LTS)
OpenJDK 64-Bit Server VM Corretto-11.0.11.9.1 (build 11.0.11+9-LTS, mixed mode)

$ ./gradlew clean build

$ ls -l order-notification-consumer/build/libs/order-notification-consumer-0.0.1-SNAPSHOT.jar

$ ls -l order-producer/build/libs/order-producer-0.0.1-SNAPSHOT.jar

$ ls -l order-rdb-consumer/build/libs/order-rdb-consumer-0.0.1-SNAPSHOT.jar

topic を作成するため、 order producer から起動します。

$ java -jar order-producer/build/libs/order-producer-0.0.1-SNAPSHOT.jar

そして order notification, order rdb を起動すると準備完了です。

$ java -jar order-notification-consumer/build/libs/order-notification-consumer-0.0.1-SNAPSHOT.jar

$ java -jar order-rdb-consumer/build/libs/order-rdb-consumer-0.0.1-SNAPSHOT.jar 

HTTP リクエストをすると、

$ curl --location --request POST 'http://localhost:8080/order-events' \
--header 'Content-Type: application/json' \
--data-raw '{
    "orderEventId": null,
    "order": {
        "id": 43333,
        "name": "チョコモナカジャンボ",
        "price": 145
    }
}'

DB にレコードが保存され、

select * from orders;

/*
+-----+----------+-----+--------------+
|id   |name      |price|order_event_id|
+-----+----------+-----+--------------+
|43333|チョコモナカジャンボ|145  |1             |
+-----+----------+-----+--------------+
*/

チョコモナカジャンボ の購入を受け付けたメールが届きました。

kafka-advent-calendar-mailhog.png

イベント駆動型アーキテクチャで複数のアプリが各々の解釈でイベントを処理するイメージが掴めましたね。


では consumer を複数起動してみましょう。複数のシェルで executable jar を実行してみます。
そうすると興味深いログが出力されます。

order-notification-events-group: partitions assigned: [order-events-0] から、
consumer がどの partition と疎通するかがわかります。

# シェル1
$ java -jar order-notification-consumer/build/libs/order-notification-consumer-0.0.1-SNAPSHOT.jar

2021-12-15 18:49:59.734  INFO 70648 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : order-notification-events-group: partitions assigned: [order-events-2]
# シェル2
$ java -jar order-notification-consumer/build/libs/order-notification-consumer-0.0.1-SNAPSHOT.jar

2021-12-15 18:49:59.732  INFO 70918 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : order-notification-events-group: partitions assigned: [order-events-1]
# シェル3
$ java -jar order-notification-consumer/build/libs/order-notification-consumer-0.0.1-SNAPSHOT.jar

2021-12-15 18:49:59.740  INFO 71159 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : order-notification-events-group: partitions assigned: [order-events-0]

この状態でリクエストしてみても、consumer group が同じ consumer を複数起動しているだけなので、
event は1回しか処理されず、バニラモナカジャンボの購入を受け付けたメールも一度しか送信しません。

2021-12-15 18:53:44.635  INFO 70918 --- [ntainer#0-0-C-1] c.k.o.service.OrderEventsService         : orderEvent: OrderEvent(orderEventId=null, orderEventType=NEW, order=Order(id=41111, name=バニラモナカジャンボ, price=145))
2021-12-15 18:53:45.087  INFO 70918 --- [ntainer#0-0-C-1] c.k.o.service.OrderEventsService         : Email sent!!

consumer が1台ダウンしてしまった仮定して、プロセスを1つ止めてみます。
すると、 (Re-)joining group が走って、起動している consumer で partition が割り振られます。

# シェル1
2021-12-15 18:49:17.045  INFO 70648 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-order-notification-events-group-1, groupId=order-notification-events-group] (Re-)joining group

2021-12-15 18:57:54.080  INFO 70648 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : order-notification-events-group: partitions assigned: [order-events-2]
# シェル2
2021-12-15 18:57:54.078  INFO 71159 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : order-notification-events-group: partitions assigned: [order-events-1, order-events-0]

consumer として実装するアプリケーションを水平にスケールすることで、可用性を高める事ができます。
(それぞれの partition に均等にイベントが配信されるのであれば、 partition の数まで consumer を増やせば処理の並列度も上がりシステム全体のスループットも向上すると思います)


最後に

気になっていたイベント駆動アーキテクチャについて、動くものを作ったことで理解が進みました。(知らないものに対する恐怖心が和らぎました)

一方で、今回のメール送信にあたる「通知サービス」のように、
サービスを分割する粒度や設計方法については学習と経験を積む必要があると感じました。

また、 Kafka は broker, consumer などの各コンポーネントのスケーラビリティが高いことがわかりましたが、
イベント駆動アーキテクチャを実現するための技術として、ユースケースに合わせて適切な技術を選んでいくことも必要だと感じました。


参考

    • Kafka: The Definitive Guide

 

    • KIP-500: Replace ZooKeeper with a Self-Managed Metadata Quorum

 

    • 「Kafka Summit 2020」開催!ストレージ階層化,ZooKeeperフリー,クラウドネイティブ ―次の10年に向けて進化を続けるKafkaのいま

 

    • Exactly-Once Semantics Are Possible: Here’s How Kafka Does It

 

    • Apache Kafka for Developers using Spring Boot[LatestEdition]

 

    • Spring for Apache Kafka 公式ドキュメント

 

    • spring.pleiades.io による Spring for Apache Kafka の日本語ドキュメント

 

    Using ThymeLeaf and FreeMarker Emails Templates with Spring

ソースコード

    • KiyotaTakeshi/kafka-playground

 

    KiyotaTakeshi/kafka-spring-sample