我在本地的Docker环境中轻松地运行了Kafka 第5次尝试
简要说明
受到 Kafka 的吸引,只好先在手头的电脑上试试运行… 于是只懂得基础设施的软件工程师购买了一台新的 MacBookPro,并参考了先辈们在 Qiita 等平台上发布的文章,将验证步骤分成几个部分进行记录。关于 Kafka 的概述,请参考这篇文章。

运行环境
macOS 大 Sur 11.1
Docker 版本 20.10.2,构建 2291f61
Python 3.8.3
创建Consumer容器
为了接收从主题-11中抽取的数据,请创建一个新的Consumer容器。
用于Python程序运行的容器创建的目录结构如下所示。
$ tree
.
├── Dockerfile
├── docker-compose.yml
├── opt
│ └── IoTTopicData-v1.py
└── requirements.txt
以下就是docker-compose.yml文件。
由于是本地的Docker环境,所以Python程序(IoTTopicData-v1.py)没有使用COPY命令,而是使用了volumes选项。
version: '3'
services:
iot:
build: .
working_dir: '/app/'
tty: true
volumes:
- ./opt:/app/opt
networks:
default:
external:
name: iot_network
以下是DockerFile的内容。
最后一行的”requirements.txt”将在Python程序中定义所需的函数。
FROM python:3.7.5-slim
USER root
RUN apt-get update
RUN apt-get -y install locales && localedef -f UTF-8 -i ja_JP ja_JP.UTF-8
ENV LANG ja_JP.UTF-8
ENV LANGUAGE ja_JP:ja
ENV LC_ALL ja_JP.UTF-8
ENV TZ JST-9
ENV TERM xterm
RUN apt-get install -y vim less
RUN pip install --upgrade pip
RUN pip install --upgrade setuptools
RUN pip install -r requirements.txt
以下是 requirements.txt 文件。
它定义了在 Python 程序中导入所需的函数。
kafka-python
创建和确认Consumer容器
构建并运行定义的容器。
$ docker-compose up -d
前略
Creating iottopicdata_ktp_1 ... done
我们将确认启动。
$ docker-compose ps
Name Command State Ports
--------------------------------------------
iottopicdata_ktp_1 python3 Up
我将检查所有正在运行的容器。
$ docker ps
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
c23123e17068 iottopicdata_ktp "python3" About a minute ago Up About a minute iottopicdata_ktp_1
35620515e9f1 confluentinc/cp-ksql-cli:5.4.3 "/bin/sh" 22 hours ago Up 22 hours ksql-cli
f46362cbcd5c confluentinc/cp-ksql-server:5.4.3 "/etc/confluent/dock…" 22 hours ago Up 22 hours (healthy) 0.0.0.0:8088->8088/tcp ksql-server
4e4f79c219e1 iotsampledata_iot "python3" 24 hours ago Up 24 hours iotsampledata_iot_1
37e2e1f360f5 confluentinc/cp-kafka:5.5.1 "/bin/sh" 26 hours ago Up 26 hours 9092/tcp cli
78d0a02910fe confluentinc/cp-kafka:5.5.1 "/etc/confluent/dock…" 26 hours ago Up 26 hours 0.0.0.0:9092->9092/tcp, 0.0.0.0:29092->29092/tcp broker
ad55284f0174 confluentinc/cp-zookeeper:5.5.1 "/etc/confluent/dock…" 26 hours ago Up 26 hours 2181/tcp, 2888/tcp, 3888/tcp, 0.0.0.0:32181->32181/tcp zookeeper
运行在消费者端的程序
接收抽取的数据的程序如下所示。
import json
import time
import argparse
import pprint
from datetime import datetime
from kafka import KafkaProducer
from kafka import KafkaConsumer
# ターミナル出力用
def topic_to_tm(consumer):
print('ターミナル 出力')
# Read data from kafka
try :
for message in consumer:
print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
message.offset, message.key,
message.value))
except KeyboardInterrupt :
print('\r\n Output to Terminal - interrupted!')
return
# Kafka Topic からのデータ受取
def get_kafka_topic():
# Initialize consumer variable and set property for JSON decode
consumer = KafkaConsumer ('topic-11',
bootstrap_servers = ['broker:29092'],
value_deserializer=lambda m: json.loads(m.decode('utf-8')))
print(consumer)
return consumer
if __name__ == '__main__':
parser = argparse.ArgumentParser(description='KafkaからのIoT機器のなんちゃってStreamデータの取得')
parser.add_argument('--mode', type=str, default='tm', help='tm(ターミナル出力)')
args = parser.parse_args()
start = time.time()
consumer = get_kafka_topic()
topic_to_tm(consumer)
making_time = time.time() - start
print("")
print("Streamデータ取得待機時間:{0}".format(making_time) + " [sec]")
print("")
在消费者端接收消息的设置
为了接收数据,连接到Consumer并进入程序所在的目录。
$ docker exec -it iottopicdata_ktp_1 /bin/bash
root@c23123e17068:/app#
root@c23123e17068:/app# cd opt
root@c23123e17068:/app/opt#
执行IoTTopicData-v1.py文件,并设置接收提取数据的配置。
root@c23123e17068:/app/opt# python IoTTopicData-v1.py --mode tm
<kafka.consumer.group.KafkaConsumer object at 0x7f7494085650>
ターミナル 出力
↑ 目前屏幕上没有任何提示信息,但正在等待接收来自生产者的消息。
向制片人发送的消息
为了发送数据,需要打开一个新的终端窗口并连接到Producer,然后切换到程序所在的目录。
$ docker exec -it iotsampledata_iot_1 /bin/bash
root@4e4f79c219e1:/app#
root@4e4f79c219e1:/app# cd opt
root@4e4f79c219e1:/app/opt#
运行IoTSampleData-v2.py,并发送生成的数据(30个)。
root@4e4f79c219e1:/app/opt# python IoTSampleData-v2.py --mode kf --count 30
Kafka 出力
<kafka.producer.future.FutureRecordMetadata object at 0x7fa4a03cf650>
データ作成件数:30
データ作成時間:0.12619686126708984 [sec]
消费者的提示显示了从生产者发送的数据。
root@c23123e17068:/app/opt# python IoTTopicData-v1.py --mode tm
<kafka.consumer.group.KafkaConsumer object at 0x7f7494085650>
ターミナル 出力
topic-11:0:0: key=b'2021/02/05' value={'SECTION': 'W', 'TIME': '2021-02-05T15:34:52.630342', 'PROC': '111', 'IOT_NUM': '268-8968', 'IOT_STATE': '愛知県', 'VOL_1': 113.19269863668802, 'VOL_2': 50.74199787037559}
topic-11:0:1: key=b'2021/02/05' value={'SECTION': 'E', 'TIME': '2021-02-05T15:34:52.630686', 'PROC': '111', 'IOT_NUM': '539-2454', 'IOT_STATE': '広島県', 'VOL_1': 150.6519642589583, 'VOL_2': 50.299823891383774}
通过Producer上的Python程序生成的数据经过topic-01 → Ksql → topic-11的处理后,我们确认可以在Consumer上的Python程序中接收到。
关于下一次
下一步(第6步)我们将使用Kafka的KSQL对流式抽取的数据进行处理,并通过消费者将其写入S3,以此确认。
第1回:在本地的Docker环境中运行Kafka的基本组件
第2回:确认由Kafka的Producer通过Broker发送的消息可以通过Consumer接收
第3回:确认由Producer上的Python程序生成的数据可以通过Broker传递到Consumer进行接收
第4回:确认Producer上生成的数据通过topic-01传递到KSQL(topic01_stream 1 → topic01_stream2)并进行流抽取处理
第5回:确认Producer上生成的数据通过topic-01 → Ksql → topic-11传递到Consumer上的Python程序进行接收
第6回:确认Producer上生成的数据通过topic-01 → Ksql → topic-11传递到Consumer上的Python程序,并能够将数据写入S3
第7回:确认在两个Producer容器上生成的各自数据经过topic-01 → Ksql → topic-11传递到Consumer上的Python程序可以进行接收
请提供有关此问题的信息。
或
请提供相关参考资料。
我已参考以下信息。非常感谢。
Kafka的Docker教程
使用Docker轻松构建Kafka到KSQL的环境。