在 Docker 中运行 Kafka 并进行恢复治疗
因为我的同事遇到困难,所以我决定提供支持,同时也决定重新开始使用Docker。我打算通过学习Kafka的视频来进行康复,并最终写出在k8s上运行的helm chart。

最近一直在编写C#代码,因此有一段时间没有接触它了,我试着运行一下它来回忆起来。Kafka是与Zookeeper配合使用的消息传递机制,类似于Azure中的EventHub,似乎是由Scala编写的,可以进行精细控制。最近EventHub具备了与Kafka的接口功能,因此最终想将它们连接起来,使Azure的服务更易于使用。
想起 Docker
我最初搜索了图像,但令人惊讶的是没有官方图像。虽然有Spotify等非官方的图像,但没有官方的。这次我想尝试使用对Kafka有很大贡献的Confluent公司的图像。
-
- Docker Configuration Parameters
- cp-docker-images/debian/kafka/Dockerfile
在DockerHub查看时,很难找到与Dockerfile的链接,但在上述链接中可以找到。由于需要设置许多参数,所以直接查看confluent的页面比较好(上述链接)。
启动 Zookeeper
这很简单,就像下面这样。kafka 依赖于 Zookeeper。就是上面 Confluent 页面中提到的内容。
docker run -d \
--net=host \
--name=zookeeper \
-e ZOOKEEPER_CLIENT_PORT=32181 \
-e ZOOKEEPER_TICK_TIME=2000 \
-e ZOOKEEPER_SYNC_LIMIT=2 \
confluentinc/cp-zookeeper:5.1.2
确认设置后,可以想象到 TICK_TIME 和 SYNC_LIMIT,但不清楚它们的含义。TICK_TIME 是一次 “Tick” 的时间间隔,SYNC_LIMIT 是 ZooKeeper 中从跟随者同步到领导者的间隔时间,以 Tick 为单位。也就是说,每次 Tick 间隔为 4000 毫秒。更详细信息请参阅此处。
- ZooKeeper Administrator’s Guide
启动kafka
这也是保持不变的confluent。
docker run -d \
--net=host \
--name=kafka \
-e KAFKA_ZOOKEEPER_CONNECT=localhost:32181 \
-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:29092 \
-e KAFKA_BROKER_ID=2 \
-e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 \
confluentinc/cp-kafka:5.1.2
我可以想像得差不多。经纪人的ID是2,复制因子为1,所以是单一配置。还需要指定监听器的端口和Zookeeper的地址。
创建一个主题
因为我完全不了解卡夫卡,所以我会按照这个教程来学习。
- kafka Quick start
要正常安装真的太麻烦了(必须安装Scala之类的),所以我想尽量使用Docker解决问题。
$ docker run -it --network host confluentinc/cp-kafka:5.1.2 /bin/bash
我在交互模式下启动。将网络设置为主机(host)是因为我想从容器中引用主机计算机的端口。在上述的docker run示例中也是如此。虽然这有点懒,但对于我在本地执行和尝试很方便。顺便一提,关于网络设置中桥接(bridge)和主机(host)的区别,有个StackOverflow的人详细解释了。
- From inside of a Docker container, how do I connect to the localhost of the machine?
首先创建一个主题。在cp-kafka的图像中,使用了上述QuickStart中使用的shell,该shell被放置在/usr/bin下。尝试执行它。简单地设置复制因子为1,分区数和主题名为test。
root@linuxkit-025000000001:/# cd /usr/bin
root@linuxkit-025000000001:/usr/bin# ./kafka-topics --create --zookeeper localhost:32181 --replication-factor 1 --partitions 1 --topic test
Created topic "test".
通过制作人发送消息
消费者本来是用Java或Scala来编写的,但是由于已经有样本被创建了,所以我打算尝试运行它。我将在刚刚创建的容器中直接运行它。
root@linuxkit-025000000001:/usr/bin# ./kafka-console-producer --broker-list localhost:29092 --topic test
>This is message
>This is another message
>
这样一来,有两条消息被发送到主题上了。虽然还没有被读取,光标仍然停留在最开始的位置。端口已根据上述设定进行了调整。
启动消费者
由于安装过程麻烦,照例使用Docker来运行。
$ docker run -it --network host confluentinc/cp-kafka:5.1.2 /bin/bash
root@linuxkit-025000000001:/# cd /usr/bin
root@linuxkit-025000000001:/usr/bin# kafka-console-consumer --bootstrap-server localhost:29092 --topic test --from-beginning
This is message
This is another message
消息已被完全消费。默认情况下,读取操作处于自动提交模式,会自动提交。
阅读代码
让我们来看一下生产者和消费者的代码,它们可能很简单。只需要读取控制台并向KafkaProducer对象发送Send消息。很简单。顺便提一句,我们已经通过关闭挂钩设置了关闭。
ConsoleProducer.scala 控制台生产者程序
def main(args: Array[String]): Unit = {
try {
val config = new ProducerConfig(args)
val reader = Class.forName(config.readerClass).getDeclaredConstructor().newInstance().asInstanceOf[MessageReader]
reader.init(System.in, getReaderProps(config))
val producer = new KafkaProducer[Array[Byte], Array[Byte]](producerProps(config))
Runtime.getRuntime.addShutdownHook(new Thread() {
override def run() {
producer.close()
}
})
var record: ProducerRecord[Array[Byte], Array[Byte]] = null
do {
record = reader.readMessage()
if (record != null)
send(producer, record, config.sync)
} while (record != null)
} catch {
case e: joptsimple.OptionException =>
System.err.println(e.getMessage)
Exit.exit(1)
case e: Exception =>
e.printStackTrace
Exit.exit(1)
}
Exit.exit(0)
}
- kafka/core/src/main/scala/kafka/tools/ConsoleProducer.scala
消费者一般来说会通过消费者对象接收消息,但需要自己循环进行轮询。它只是通过接收方法接收消息并输出。顺便说一下,看这个类的配置感觉相当复杂。为什么一个这么简单的东西要设定这么多参数呢?
ConsoleConsumer.scala:控制台消费者
def process(maxMessages: Integer, formatter: MessageFormatter, consumer: ConsumerWrapper, output: PrintStream,
skipMessageOnError: Boolean) {
while (messageCount < maxMessages || maxMessages == -1) {
val msg: ConsumerRecord[Array[Byte], Array[Byte]] = try {
consumer.receive()
} catch {
case _: WakeupException =>
trace("Caught WakeupException because consumer is shutdown, ignore and terminate.")
// Consumer will be closed
return
case e: Throwable =>
error("Error processing message, terminating consumer process: ", e)
// Consumer will be closed
return
}
messageCount += 1
try {
formatter.writeTo(new ConsumerRecord(msg.topic, msg.partition, msg.offset, msg.timestamp,
msg.timestampType, 0, 0, 0, msg.key, msg.value, msg.headers), output)
} catch {
case e: Throwable =>
if (skipMessageOnError) {
error("Error processing message, skipping this message: ", e)
} else {
// Consumer will be closed
throw e
}
}
if (checkErr(output, formatter)) {
// Consumer will be closed
return
}
}
}
- kafka/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
总结
用这种方式,可以简单地尝试Hello World的水平。接下来,应该用AKS来运行,并考虑将其转化为Helm图表。