在 Docker 中运行 Kafka 并进行恢复治疗

因为我的同事遇到困难,所以我决定提供支持,同时也决定重新开始使用Docker。我打算通过学习Kafka的视频来进行康复,并最终写出在k8s上运行的helm chart。

Screen Shot 2019-02-24 at 10.44.19 PM.png

最近一直在编写C#代码,因此有一段时间没有接触它了,我试着运行一下它来回忆起来。Kafka是与Zookeeper配合使用的消息传递机制,类似于Azure中的EventHub,似乎是由Scala编写的,可以进行精细控制。最近EventHub具备了与Kafka的接口功能,因此最终想将它们连接起来,使Azure的服务更易于使用。

想起 Docker

我最初搜索了图像,但令人惊讶的是没有官方图像。虽然有Spotify等非官方的图像,但没有官方的。这次我想尝试使用对Kafka有很大贡献的Confluent公司的图像。

    • Docker Configuration Parameters

 

    cp-docker-images/debian/kafka/Dockerfile

在DockerHub查看时,很难找到与Dockerfile的链接,但在上述链接中可以找到。由于需要设置许多参数,所以直接查看confluent的页面比较好(上述链接)。

启动 Zookeeper

这很简单,就像下面这样。kafka 依赖于 Zookeeper。就是上面 Confluent 页面中提到的内容。

docker run -d \
--net=host \
--name=zookeeper \
-e ZOOKEEPER_CLIENT_PORT=32181 \
-e ZOOKEEPER_TICK_TIME=2000 \
-e ZOOKEEPER_SYNC_LIMIT=2 \
confluentinc/cp-zookeeper:5.1.2

确认设置后,可以想象到 TICK_TIME 和 SYNC_LIMIT,但不清楚它们的含义。TICK_TIME 是一次 “Tick” 的时间间隔,SYNC_LIMIT 是 ZooKeeper 中从跟随者同步到领导者的间隔时间,以 Tick 为单位。也就是说,每次 Tick 间隔为 4000 毫秒。更详细信息请参阅此处。

    ZooKeeper Administrator’s Guide

启动kafka

这也是保持不变的confluent。

docker run -d \
    --net=host \
    --name=kafka \
    -e KAFKA_ZOOKEEPER_CONNECT=localhost:32181 \
    -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:29092 \
    -e KAFKA_BROKER_ID=2 \
    -e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 \
    confluentinc/cp-kafka:5.1.2

我可以想像得差不多。经纪人的ID是2,复制因子为1,所以是单一配置。还需要指定监听器的端口和Zookeeper的地址。

创建一个主题

因为我完全不了解卡夫卡,所以我会按照这个教程来学习。

    kafka Quick start

要正常安装真的太麻烦了(必须安装Scala之类的),所以我想尽量使用Docker解决问题。

$ docker run -it --network host confluentinc/cp-kafka:5.1.2 /bin/bash

我在交互模式下启动。将网络设置为主机(host)是因为我想从容器中引用主机计算机的端口。在上述的docker run示例中也是如此。虽然这有点懒,但对于我在本地执行和尝试很方便。顺便一提,关于网络设置中桥接(bridge)和主机(host)的区别,有个StackOverflow的人详细解释了。

    From inside of a Docker container, how do I connect to the localhost of the machine?

首先创建一个主题。在cp-kafka的图像中,使用了上述QuickStart中使用的shell,该shell被放置在/usr/bin下。尝试执行它。简单地设置复制因子为1,分区数和主题名为test。

root@linuxkit-025000000001:/# cd /usr/bin
root@linuxkit-025000000001:/usr/bin# ./kafka-topics --create --zookeeper localhost:32181 --replication-factor 1 --partitions 1 --topic test
Created topic "test".

通过制作人发送消息

消费者本来是用Java或Scala来编写的,但是由于已经有样本被创建了,所以我打算尝试运行它。我将在刚刚创建的容器中直接运行它。

root@linuxkit-025000000001:/usr/bin# ./kafka-console-producer --broker-list localhost:29092 --topic test
>This is message
>This is another message
>

这样一来,有两条消息被发送到主题上了。虽然还没有被读取,光标仍然停留在最开始的位置。端口已根据上述设定进行了调整。

启动消费者

由于安装过程麻烦,照例使用Docker来运行。

$ docker run -it --network host confluentinc/cp-kafka:5.1.2 /bin/bash
root@linuxkit-025000000001:/#  cd /usr/bin
root@linuxkit-025000000001:/usr/bin# kafka-console-consumer --bootstrap-server localhost:29092 --topic test --from-beginning
This is message
This is another message

消息已被完全消费。默认情况下,读取操作处于自动提交模式,会自动提交。

阅读代码

让我们来看一下生产者和消费者的代码,它们可能很简单。只需要读取控制台并向KafkaProducer对象发送Send消息。很简单。顺便提一句,我们已经通过关闭挂钩设置了关闭。

ConsoleProducer.scala 控制台生产者程序

  def main(args: Array[String]): Unit = {

    try {
        val config = new ProducerConfig(args)
        val reader = Class.forName(config.readerClass).getDeclaredConstructor().newInstance().asInstanceOf[MessageReader]
        reader.init(System.in, getReaderProps(config))

        val producer = new KafkaProducer[Array[Byte], Array[Byte]](producerProps(config))

        Runtime.getRuntime.addShutdownHook(new Thread() {
          override def run() {
            producer.close()
          }
        })

        var record: ProducerRecord[Array[Byte], Array[Byte]] = null
        do {
          record = reader.readMessage()
          if (record != null)
            send(producer, record, config.sync)
        } while (record != null)
    } catch {
      case e: joptsimple.OptionException =>
        System.err.println(e.getMessage)
        Exit.exit(1)
      case e: Exception =>
        e.printStackTrace
        Exit.exit(1)
    }
    Exit.exit(0)
  }

    kafka/core/src/main/scala/kafka/tools/ConsoleProducer.scala

消费者一般来说会通过消费者对象接收消息,但需要自己循环进行轮询。它只是通过接收方法接收消息并输出。顺便说一下,看这个类的配置感觉相当复杂。为什么一个这么简单的东西要设定这么多参数呢?

ConsoleConsumer.scala:控制台消费者

  def process(maxMessages: Integer, formatter: MessageFormatter, consumer: ConsumerWrapper, output: PrintStream,
              skipMessageOnError: Boolean) {
    while (messageCount < maxMessages || maxMessages == -1) {
      val msg: ConsumerRecord[Array[Byte], Array[Byte]] = try {
        consumer.receive()
      } catch {
        case _: WakeupException =>
          trace("Caught WakeupException because consumer is shutdown, ignore and terminate.")
          // Consumer will be closed
          return
        case e: Throwable =>
          error("Error processing message, terminating consumer process: ", e)
          // Consumer will be closed
          return
      }
      messageCount += 1
      try {
        formatter.writeTo(new ConsumerRecord(msg.topic, msg.partition, msg.offset, msg.timestamp,
                                             msg.timestampType, 0, 0, 0, msg.key, msg.value, msg.headers), output)
      } catch {
        case e: Throwable =>
          if (skipMessageOnError) {
            error("Error processing message, skipping this message: ", e)
          } else {
            // Consumer will be closed
            throw e
          }
      }
      if (checkErr(output, formatter)) {
        // Consumer will be closed
        return
      }
    }
  }
    kafka/core/src/main/scala/kafka/tools/ConsoleConsumer.scala

总结

用这种方式,可以简单地尝试Hello World的水平。接下来,应该用AKS来运行,并考虑将其转化为Helm图表。

广告
将在 10 秒后关闭
bannerAds