我在本地的Docker环境中轻松地运行了Kafka 第5次尝试

简要说明

受到 Kafka 的吸引,只好先在手头的电脑上试试运行… 于是只懂得基础设施的软件工程师购买了一台新的 MacBookPro,并参考了先辈们在 Qiita 等平台上发布的文章,将验证步骤分成几个部分进行记录。关于 Kafka 的概述,请参考这篇文章。

Kafka-5.png

运行环境

macOS 大 Sur 11.1
Docker 版本 20.10.2,构建 2291f61
Python 3.8.3

创建Consumer容器

为了接收从主题-11中抽取的数据,请创建一个新的Consumer容器。
用于Python程序运行的容器创建的目录结构如下所示。

$ tree
.
├── Dockerfile
├── docker-compose.yml
├── opt
│   └── IoTTopicData-v1.py
└── requirements.txt

以下就是docker-compose.yml文件。
由于是本地的Docker环境,所以Python程序(IoTTopicData-v1.py)没有使用COPY命令,而是使用了volumes选项。

version: '3'
services:
  iot:
    build: .
    working_dir: '/app/'
    tty: true
    volumes:
      - ./opt:/app/opt

networks:
  default:
    external:
      name: iot_network

以下是DockerFile的内容。
最后一行的”requirements.txt”将在Python程序中定义所需的函数。

FROM python:3.7.5-slim
USER root

RUN apt-get update
RUN apt-get -y install locales && localedef -f UTF-8 -i ja_JP ja_JP.UTF-8

ENV LANG ja_JP.UTF-8
ENV LANGUAGE ja_JP:ja
ENV LC_ALL ja_JP.UTF-8
ENV TZ JST-9
ENV TERM xterm

RUN apt-get install -y vim less
RUN pip install --upgrade pip
RUN pip install --upgrade setuptools
RUN pip install -r requirements.txt

以下是 requirements.txt 文件。
它定义了在 Python 程序中导入所需的函数。

kafka-python

创建和确认Consumer容器

构建并运行定义的容器。

$ docker-compose up -d
    前略
Creating iottopicdata_ktp_1 ... done

我们将确认启动。

$ docker-compose ps
       Name          Command   State   Ports
--------------------------------------------
iottopicdata_ktp_1   python3   Up           

我将检查所有正在运行的容器。

$ docker ps
CONTAINER ID   IMAGE                               COMMAND                  CREATED              STATUS                  PORTS                                                    NAMES
c23123e17068   iottopicdata_ktp                    "python3"                About a minute ago   Up About a minute                                                                iottopicdata_ktp_1
35620515e9f1   confluentinc/cp-ksql-cli:5.4.3      "/bin/sh"                22 hours ago         Up 22 hours                                                                      ksql-cli
f46362cbcd5c   confluentinc/cp-ksql-server:5.4.3   "/etc/confluent/dock…"   22 hours ago         Up 22 hours (healthy)   0.0.0.0:8088->8088/tcp                                   ksql-server
4e4f79c219e1   iotsampledata_iot                   "python3"                24 hours ago         Up 24 hours                                                                      iotsampledata_iot_1
37e2e1f360f5   confluentinc/cp-kafka:5.5.1         "/bin/sh"                26 hours ago         Up 26 hours             9092/tcp                                                 cli
78d0a02910fe   confluentinc/cp-kafka:5.5.1         "/etc/confluent/dock…"   26 hours ago         Up 26 hours             0.0.0.0:9092->9092/tcp, 0.0.0.0:29092->29092/tcp         broker
ad55284f0174   confluentinc/cp-zookeeper:5.5.1     "/etc/confluent/dock…"   26 hours ago         Up 26 hours             2181/tcp, 2888/tcp, 3888/tcp, 0.0.0.0:32181->32181/tcp   zookeeper

运行在消费者端的程序

接收抽取的数据的程序如下所示。

import json
import time
import argparse
import pprint
from datetime import datetime
from kafka import KafkaProducer
from kafka import KafkaConsumer

# ターミナル出力用
def topic_to_tm(consumer):
    print('ターミナル 出力')

    # Read data from kafka
    try :
        for message in consumer:
            print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
                                                message.offset, message.key,
                                                message.value))
    except KeyboardInterrupt :
        print('\r\n Output to Terminal - interrupted!')
        return

# Kafka Topic からのデータ受取
def get_kafka_topic():
    # Initialize consumer variable and set property for JSON decode
    consumer = KafkaConsumer ('topic-11',
                        bootstrap_servers = ['broker:29092'],
                        value_deserializer=lambda m: json.loads(m.decode('utf-8')))

    print(consumer)
    return consumer


if __name__ == '__main__':
    parser = argparse.ArgumentParser(description='KafkaからのIoT機器のなんちゃってStreamデータの取得')
    parser.add_argument('--mode', type=str, default='tm', help='tm(ターミナル出力)')
    args = parser.parse_args()

    start = time.time()

    consumer = get_kafka_topic()
    topic_to_tm(consumer)

    making_time = time.time() - start

    print("")
    print("Streamデータ取得待機時間:{0}".format(making_time) + " [sec]")
    print("")

在消费者端接收消息的设置

为了接收数据,连接到Consumer并进入程序所在的目录。

$ docker exec -it iottopicdata_ktp_1 /bin/bash
root@c23123e17068:/app#
root@c23123e17068:/app# cd opt
root@c23123e17068:/app/opt#

执行IoTTopicData-v1.py文件,并设置接收提取数据的配置。

root@c23123e17068:/app/opt# python IoTTopicData-v1.py --mode tm
<kafka.consumer.group.KafkaConsumer object at 0x7f7494085650>
ターミナル 出力

↑ 目前屏幕上没有任何提示信息,但正在等待接收来自生产者的消息。

向制片人发送的消息

为了发送数据,需要打开一个新的终端窗口并连接到Producer,然后切换到程序所在的目录。

$ docker exec -it iotsampledata_iot_1 /bin/bash
root@4e4f79c219e1:/app#
root@4e4f79c219e1:/app# cd opt
root@4e4f79c219e1:/app/opt#

运行IoTSampleData-v2.py,并发送生成的数据(30个)。

root@4e4f79c219e1:/app/opt# python IoTSampleData-v2.py --mode kf --count 30
Kafka 出力
<kafka.producer.future.FutureRecordMetadata object at 0x7fa4a03cf650>

データ作成件数:30
データ作成時間:0.12619686126708984 [sec]

消费者的提示显示了从生产者发送的数据。

root@c23123e17068:/app/opt# python IoTTopicData-v1.py --mode tm
<kafka.consumer.group.KafkaConsumer object at 0x7f7494085650>
ターミナル 出力
topic-11:0:0: key=b'2021/02/05' value={'SECTION': 'W', 'TIME': '2021-02-05T15:34:52.630342', 'PROC': '111', 'IOT_NUM': '268-8968', 'IOT_STATE': '愛知県', 'VOL_1': 113.19269863668802, 'VOL_2': 50.74199787037559}
topic-11:0:1: key=b'2021/02/05' value={'SECTION': 'E', 'TIME': '2021-02-05T15:34:52.630686', 'PROC': '111', 'IOT_NUM': '539-2454', 'IOT_STATE': '広島県', 'VOL_1': 150.6519642589583, 'VOL_2': 50.299823891383774}

通过Producer上的Python程序生成的数据经过topic-01 → Ksql → topic-11的处理后,我们确认可以在Consumer上的Python程序中接收到。

关于下一次

下一步(第6步)我们将使用Kafka的KSQL对流式抽取的数据进行处理,并通过消费者将其写入S3,以此确认。

第1回:在本地的Docker环境中运行Kafka的基本组件
第2回:确认由Kafka的Producer通过Broker发送的消息可以通过Consumer接收
第3回:确认由Producer上的Python程序生成的数据可以通过Broker传递到Consumer进行接收
第4回:确认Producer上生成的数据通过topic-01传递到KSQL(topic01_stream 1 → topic01_stream2)并进行流抽取处理
第5回:确认Producer上生成的数据通过topic-01 → Ksql → topic-11传递到Consumer上的Python程序进行接收
第6回:确认Producer上生成的数据通过topic-01 → Ksql → topic-11传递到Consumer上的Python程序,并能够将数据写入S3
第7回:确认在两个Producer容器上生成的各自数据经过topic-01 → Ksql → topic-11传递到Consumer上的Python程序可以进行接收

请提供有关此问题的信息。

请提供相关参考资料。

我已参考以下信息。非常感谢。
Kafka的Docker教程
使用Docker轻松构建Kafka到KSQL的环境。

广告
将在 10 秒后关闭
bannerAds