この記事について

Money Forward Engineering 2 Advent Calendar 2022 – Qiita の 14日目 の記事です。

はじめに

Kafka Producerのトランザクションを使用したメッセージング処理を作るときに、
ローカル環境では単一ブローカーで最小で動かしたかったのですが、
はじめ全然動かったのを動くようにしたので、その経験を放出します。

これ、みんな通らないの?と思う程度には情報が見つからなかったので、だいぶ困りましたw
※kafkaのコード以外はkotlinです

環境

    • kafka(docker image): confluentinc/cp-kafka:7.2.2

 

    • kotlin: 1.7系

 

    OS: M1 Mac

コード

実装コード

今回は簡単にやりたいので、文字列を送るだけのシンプルなコードを用意しました。
ロールバックの時に意図的にエラーを発生させたいので、
mockをセットできるようにコンストラクタ引数でKafkaProducerを受け付けるようにしています。

class TransactionalKafkaProducer(
    producer: KafkaProducer<Int, String>? = null
) {
    private var producer: KafkaProducer<Int, String>
    init {
        this.producer = producer ?: ProducerFactory.createInstance()
    }

    @Suppress("TooGenericExceptionCaught")
    fun messageSend(topic: String, messages: List<String>) {
        producer.use {
            try {
                it.initTransactions()
                it.beginTransaction()
                messages.forEach { message ->
                    it.send(ProducerRecord(
                        topic,
                        Math.random().toInt(),
                        message,
                    ))
                }
                it.commitTransaction()
            } catch (e: Exception) {
                it.abortTransaction()
                println(e.stackTraceToString())
                throw e
            }
        }
    }
}

テストコード

実際にレコードは取れないという確認をするために、
トピックを消費するようにしてます。
ちゃんと動くとメッセージは取れないです。

class TransactionalKafkaProducerTests: AnnotationSpec() {

    @BeforeClass
    fun setup() {
        val props = Properties()
        props[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = "localhost:19092"
        val adminClient = Admin.create(props)
        adminClient.createTopics(listOf(
            NewTopic(TEST_TOPIC, 50, 1)
        ))
    }

    @Test
    fun confirmRollback() {

        val producer = spyk(ProducerFactory.createInstance())
        val messages = listOf("message1", "message2")
        val testTarget = TransactionalKafkaProducer(producer)
        every {
            producer.commitTransaction()
        } throws (TimeoutException("thrown by mock"))
        shouldThrow<TimeoutException> {
            testTarget.messageSend(TEST_TOPIC, messages)
        }

        val consumer = KafkaConsumerExecutor(listOf(TEST_TOPIC))
        val records = consumer.consumerRecord()
        records.count() shouldBe 0
    }
}

設定

    environment:
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "false"
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: DOCKER_LISTENER:PLAINTEXT,LOCALHOST_LISTENER:PLAINTEXT
kafka cluster
      KAFKA_ADVERTISED_LISTENERS: DOCKER_LISTENER://kafka:9092,LOCALHOST_LISTENER://localhost:19092
      KAFKA_INTER_BROKER_LISTENER_NAME: DOCKER_LISTENER
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181/kafka
      # When kafka cluster is running with single broker, broker id becomes 0 by default. So setting it explicitly.
      KAFKA_BROKER_ID: 1
      # In local env, save consumer offset to only 1 replica
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

テストの実行〜解決まで

簡単に組み込めてとても楽だったなーと思ってテストを動かすと、全く動いている気配がない・・・
何が起きているのか、ログを確認します。

image.png

アプリ側:

[Producer clientId=producer-tx-0, transactionalId=tx-0] Sending transactional request FindCoordinatorRequestData(key='', keyType=1, coordinatorKeys=[tx-0]) to node localhost:19092 (id: 1 rack: null) with correlation ID 205
[Producer clientId=producer-tx-0, transactionalId=tx-0] Sending FIND_COORDINATOR request with header RequestHeader(apiKey=FIND_COORDINATOR, apiVersion=4, clientId=producer-tx-0, correlationId=205) and timeout 30000 to node 1: FindCoordinatorRequestData(key='', keyType=1, coordinatorKeys=[tx-0])
[Producer clientId=producer-tx-0, transactionalId=tx-0] Received FIND_COORDINATOR response from node 1 for request with header RequestHeader(apiKey=FIND_COORDINATOR, apiVersion=4, clientId=producer-tx-0, correlationId=205): FindCoordinatorResponseData(throttleTimeMs=0, errorCode=0, errorMessage='', nodeId=0, host='', port=0, coordinators=[Coordinator(key='tx-0', nodeId=-1, host='', port=-1, errorCode=15, errorMessage='')])

errorCode: 15は、COORDINATOR_NOT_AVAILABLEで、コーディネータが利用不可らしいですけど、これだけしか書いてないと意味不明です。
なので、kafka側のログも見てみます。

kafka側:

INFO [Admin Manager on Broker 1]: Error processing create topic request CreatableTopic(name='__transaction_state', numPartitions=50, replicationFactor=3, assignments=[], configs=[CreateableTopicConfig(name='compression.type', value='uncompressed'), CreateableTopicConfig(name='cleanup.policy', value='compact'), CreateableTopicConfig(name='min.insync.replicas', value='2'), CreateableTopicConfig(name='segment.bytes', value='104857600'), CreateableTopicConfig(name='unclean.leader.election.enable', value='false')]) (kafka.server.ZkAdminManager)
org.apache.kafka.common.errors.InvalidReplicationFactorException: Replication factor: 3 larger than available brokers: 1.

kafkaトランザクションで使用する、内部的なトピックである「__transaction_state」の生成に失敗していることがわかります。
(わかります。と書いているけど、内部的なトピックを作ってるんだーへーって実際は思いましたw)
また、一番下に「Replication factor: 3 larger than available brokers: 1.」と書いてあり、
ブローカのノード数は3以上用意する必要がある、みたいなことが書いてあることがわかります。

そして、ネットを検索すると、トランザクションを使いたいならブローカーを3以上用意するのが必須という記述がチラホラ・・・。
でもCIでも動かしたいのに、ブローカー3つも立てたくないし、ローカル環境でブローカー1個で動かせないとか、開発者ライクじゃなさ過ぎるから、そんなことないでしょ。
と思ってkafka本体のコードを読むことに。
(Documentのどこかに書いてあるのかなぁと思いながら調べてはみたけど、見つけ出せなかった)

エラーメッセージを足がかりに設定を突き止める

「larger than available brokers」でコードを検索すると、以下のコードが見つかります。
3つ目の条件に引っかかってることがわかるので、
replicationFactorとbrokerMetadatasに、どこから何が渡されているのかを探します。

def assignReplicasToBrokers(brokerMetadatas: Iterable[BrokerMetadata],
                            nPartitions: Int,
                            replicationFactor: Int,
                            fixedStartIndex: Int = -1,
                            startPartitionId: Int = -1): Map[Int, Seq[Int]] = {
  if (nPartitions <= 0)
    throw new InvalidPartitionsException("Number of partitions must be larger than 0.")
  if (replicationFactor <= 0)
    throw new InvalidReplicationFactorException("Replication factor must be larger than 0.")
  if (replicationFactor > brokerMetadatas.size)
    throw new InvalidReplicationFactorException(s"Replication factor: $replicationFactor larger than available brokers: ${brokerMetadatas.size}.")
  if (brokerMetadatas.forall(_.rack.isEmpty))
    assignReplicasToBrokersRackUnaware(nPartitions, replicationFactor, brokerMetadatas.map(_.id), fixedStartIndex,
      startPartitionId)
  else {
    if (brokerMetadatas.exists(_.rack.isEmpty))
      throw new AdminOperationException("Not all brokers have rack information for replica rack aware assignment.")
    assignReplicasToBrokersRackAware(nPartitions, replicationFactor, brokerMetadatas, fixedStartIndex,
      startPartitionId)
  }
}

IntelliJの[Find Usage]を使って、呼び出し階層を遡っていくと、(遡り過程は省略)
以下の設定値が上記エラーが起きた各パラメータに渡されていることがわかります。
なので、このパラメータを書き換えればOKです。

    • transaction.state.log.replication.factor

 

    transaction.state.log.min.isr
val OffsetCommitRequiredAcksProp = "offsets.commit.required.acks"
/** ********* Transaction management configuration ***********/
val TransactionalIdExpirationMsProp = "transactional.id.expiration.ms"
val TransactionsMaxTimeoutMsProp = "transaction.max.timeout.ms"
val TransactionsTopicMinISRProp = "transaction.state.log.min.isr"
val TransactionsLoadBufferSizeProp = "transaction.state.log.load.buffer.size"
val TransactionsTopicPartitionsProp = "transaction.state.log.num.partitions"
val TransactionsTopicSegmentBytesProp = "transaction.state.log.segment.bytes"
val TransactionsTopicReplicationFactorProp = "transaction.state.log.replication.factor"
val TransactionsAbortTimedOutTransactionCleanupIntervalMsProp = "transaction.abort.timed.out.transaction.cleanup.interval.ms"
val TransactionsRemoveExpiredTransactionalIdCleanupIntervalMsProp = "transaction.remove.expired.transaction.cleanup.interval.ms"

docker-compose.ymlに設定を追加する

docker-compose上に設定を書く場合は、

KAFKA_

に続けて、スネークケース & 大文字で設定値のキーを書いて、その値を設定すれば良いので以下のようになります。(下2行を追加)

    environment:
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "false"
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: DOCKER_LISTENER:PLAINTEXT,LOCALHOST_LISTENER:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: DOCKER_LISTENER://kafka:9092,LOCALHOST_LISTENER://localhost:19092
      KAFKA_INTER_BROKER_LISTENER_NAME: DOCKER_LISTENER
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181/kafka
      KAFKA_BROKER_ID: 1
      # In local env, save consumer offset to only 1 replica
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      # In local env, transaction's inner topic settings in kafka set to 1 only.
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1 # <- add
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1 # <- add

テストを実行

image.png

終わりに

kafka Producerのトランザクションを使用した開発を行いたい場合に、
単一ブローカーで簡単に動かせるようになりました。
ローカル環境でテストが簡単に行えるようになったので、自信を持って実行環境にデプロイできるようになりますね。

ドキュメントのこの辺を見ると、もっと簡単に辿り着けたよ、などあれば教えていただけると嬉しいです。

また、このページのデモコードは以下にあります。
https://github.com/sh-ogawa/kafka-demo

广告
将在 10 秒后关闭
bannerAds