Visualizing IoT data with stream processing via "Confluent + InfluxDB + Grafana"
Step 1: Build Confluent Platform in a Docker container environment
Overview
I built a local Docker container environment using Confluent Platform's cp-all-in-one, sent data from an IoT data-generation Python program to Confluent for stream processing, and confirmed that it can be visualized in real time with InfluxDB + Grafana. The corresponding Sink Connector is used to feed InfluxDB.


The above is explained in three steps, in order; this article covers Step 1.
Step 1: Build Confluent Platform in a Docker container environment.
Step 2: Confirm that data from the Broker is received in InfluxDB.
Step 3: Confirm real-time data visualization in Grafana via InfluxDB.
Local environment
macOS Big Sur 11.3
Python 3.8.3
Docker version 20.10.7, build f0df350 (CPU: 8, Memory: 10 GB, Swap: 1 GB)
Change to the working directory
- We will use this environment directly in the working directory under the confluentinc/cp-all-in-one repository. If you do not have the repository locally yet, it can be cloned first, as sketched below.
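A minimal sketch of fetching the repository from GitHub; the 6.0.0-post branch is an assumption, chosen to match the cp-*:6.0.0 images used in this article:
$ # assumption: the 6.0.0-post branch matches the 6.0.0 images used below
$ git clone --branch 6.0.0-post https://github.com/confluentinc/cp-all-in-one.git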
$ cd cp-all-in-one
$ cd cp-all-in-one    # the repository contains a subdirectory of the same name
Install the InfluxDB Sink Connector
1. To install the InfluxDB Sink Connector into Confluent's Connect worker, edit the Dockerfile as follows.
FROM confluentinc/cp-server-connect-base:6.0.0
RUN confluent-hub install --no-prompt confluentinc/kafka-connect-rabbitmq:latest
RUN confluent-hub install --no-prompt confluentinc/kafka-connect-influxdb:latest
2. Build the Docker image for the Connect worker.
$ docker build -t cp-connect-base .
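As a quick check that the build worked, the plugins confluent-hub installed can be listed from the image; on cp-server-connect-base they are placed under /usr/share/confluent-hub-components (the exact directory names below are an assumption based on confluent-hub's owner-connector naming convention):
$ docker run --rm cp-connect-base:latest ls /usr/share/confluent-hub-components
$ # expected entries: confluentinc-kafka-connect-influxdb, confluentinc-kafka-connect-rabbitmq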
Redefine docker-compose.yml
1. Edit the docker-compose.yml file in the working directory as follows. The changes relative to the stock cp-all-in-one file are summarized in the table below.
Service            Changed?   Details
zookeeper          No
broker             No
schema-registry    No
connect            Yes        Image replaced with cp-connect-base built above
control-center     No
ksqldb-server      No
ksqldb-cli         No
rabbitmq           No
influxdb           Yes        Newly added
grafana            Yes        Newly added
version: '2'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:6.0.0
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  broker:
    image: confluentinc/cp-server:6.0.0
    hostname: broker
    container_name: broker
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
      - "9101:9101"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_CONFLUENT_BALANCER_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_JMX_PORT: 9101
      KAFKA_JMX_HOSTNAME: localhost
      KAFKA_CONFLUENT_SCHEMA_REGISTRY_URL: http://schema-registry:8081
      CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: broker:29092
      CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
      CONFLUENT_METRICS_ENABLE: 'true'
      CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'

  schema-registry:
    image: confluentinc/cp-schema-registry:6.0.0
    hostname: schema-registry
    container_name: schema-registry
    depends_on:
      - broker
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'broker:29092'
      SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081
      SCHEMA_REGISTRY_ACCESS_CONTROL_ALLOW_ORIGIN: '*'
      SCHEMA_REGISTRY_ACCESS_CONTROL_ALLOW_METHODS: 'GET,POST,PUT,OPTIONS'

  connect:
    # image changed: the custom image built above, with the InfluxDB/RabbitMQ connectors
    image: cp-connect-base:latest
    hostname: connect
    container_name: connect
    depends_on:
      - broker
      - schema-registry
    ports:
      - "8083:8083"
    environment:
      CONNECT_BOOTSTRAP_SERVERS: 'broker:29092'
      CONNECT_REST_ADVERTISED_HOST_NAME: connect
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: compose-connect-group
      CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_OFFSET_FLUSH_INTERVAL_MS: 10000
      CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
      CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
      CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CLASSPATH: /usr/share/java/monitoring-interceptors/monitoring-interceptors-6.0.0.jar
      CONNECT_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"
      CONNECT_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"
      CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components"
      CONNECT_LOG4J_ROOT_LOGLEVEL: "INFO"
      CONNECT_LOG4J_LOGGERS: org.apache.zookeeper=ERROR,org.I0Itec.zkclient=ERROR,org.apache.kafka.connect.runtime.rest=WARN,org.reflections=ERROR
      CONNECT_HOST: connect

  control-center:
    image: confluentinc/cp-enterprise-control-center:6.0.0
    hostname: control-center
    container_name: control-center
    depends_on:
      - broker
      - schema-registry
      - connect
      - ksqldb-server
    ports:
      - "9021:9021"
    environment:
      CONTROL_CENTER_BOOTSTRAP_SERVERS: 'broker:29092'
      CONTROL_CENTER_CONNECT_CLUSTER: 'connect:8083'
      CONTROL_CENTER_KSQL_KSQLDB1_URL: "http://ksqldb-server:8088"
      CONTROL_CENTER_KSQL_KSQLDB1_ADVERTISED_URL: "http://localhost:8088"
      CONTROL_CENTER_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
      CONTROL_CENTER_REPLICATION_FACTOR: 1
      CONTROL_CENTER_INTERNAL_TOPICS_PARTITIONS: 1
      CONTROL_CENTER_MONITORING_INTERCEPTOR_TOPIC_PARTITIONS: 1
      CONFLUENT_METRICS_TOPIC_REPLICATION: 1
      PORT: 9021

  ksqldb-server:
    image: confluentinc/cp-ksqldb-server:6.0.0
    hostname: ksqldb-server
    container_name: ksqldb-server
    depends_on:
      - broker
      - connect
    ports:
      - "8088:8088"
    environment:
      KSQL_CONFIG_DIR: "/etc/ksql"
      KSQL_BOOTSTRAP_SERVERS: "broker:29092"
      KSQL_HOST_NAME: ksqldb-server
      KSQL_LISTENERS: "http://0.0.0.0:8088"
      KSQL_CACHE_MAX_BYTES_BUFFERING: 0
      KSQL_KSQL_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
      KSQL_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"
      KSQL_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"
      KSQL_KSQL_CONNECT_URL: "http://connect:8083"
      KSQL_KSQL_LOGGING_PROCESSING_TOPIC_REPLICATION_FACTOR: 1
      KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: 'true'
      KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE: 'true'
      KSQL_AUTO_OFFSET_RESET: "latest"

  ksqldb-cli:
    image: confluentinc/cp-ksqldb-cli:6.0.0
    container_name: ksqldb-cli
    depends_on:
      - broker
      - connect
      - ksqldb-server
    entrypoint: /bin/sh
    tty: true

  rabbitmq:
    image: rabbitmq:3.8.17-management
    restart: always
    ports:
      - '5672:5672'
      - '15672:15672'
    hostname: rabbitmq
    container_name: rabbitmq
    environment:
      RABBITMQ_DEFAULT_USER: guest
      RABBITMQ_DEFAULT_PASS: guest

  influxdb:
    # newly added
    image: influxdb:1.8.6
    ports:
      - 8086:8086
    hostname: influxdb
    container_name: influxdb

  grafana:
    # newly added
    image: grafana/grafana
    ports:
      - 3000:3000
    hostname: grafana
    container_name: grafana
    environment:
      - GF_SERVER_ROOT_URL=http://grafana:3000
      - GF_INSTALL_PLUGINS=grafana-polystat-panel,bessler-pictureit-panel,marcuscalidus-svg-panel
      - GF_SECURITY_ADMIN_PASSWORD=admin
    depends_on:
      - influxdb
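Before bringing the stack up, the edited file can be validated without starting anything; docker-compose's built-in config check parses the file and reports syntax or indentation errors:
$ docker-compose config --quiet   # silent, with exit code 0, when the file is valid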
Start Confluent Platform
1. Start Confluent Platform in detached mode with the -d option.
$ docker-compose up -d
2. Confirm that the services are running. (A troubleshooting tip follows the output below.)
$ docker-compose ps
Name Command State Ports
------------------------------------------------------------------------------------------------------------------------
broker /etc/confluent/docker/run Up 0.0.0.0:9092->9092/tcp,:::9092->9092/tcp,
0.0.0.0:9101->9101/tcp,:::9101->9101/tcp
connect /etc/confluent/docker/run Up 0.0.0.0:8083->8083/tcp,:::8083->8083/tcp, 9092/tcp
control-center /etc/confluent/docker/run Up 0.0.0.0:9021->9021/tcp,:::9021->9021/tcp
grafana /run.sh Up 0.0.0.0:3000->3000/tcp,:::3000->3000/tcp
influxdb /entrypoint.sh influxd Up 0.0.0.0:8086->8086/tcp,:::8086->8086/tcp
ksqldb-cli /bin/sh Up
ksqldb-server /etc/confluent/docker/run Up 0.0.0.0:8088->8088/tcp,:::8088->8088/tcp
rabbitmq docker-entrypoint.sh rabbi ... Up 15671/tcp, 0.0.0.0:15672->15672/tcp,:::15672->15672/tcp,
15691/tcp, 15692/tcp, 25672/tcp, 4369/tcp, 5671/tcp,
0.0.0.0:5672->5672/tcp,:::5672->5672/tcp
schema-registry /etc/confluent/docker/run Up 0.0.0.0:8081->8081/tcp,:::8081->8081/tcp
zookeeper /etc/confluent/docker/run Up 0.0.0.0:2181->2181/tcp,:::2181->2181/tcp, 2888/tcp, 3888/tcp
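If a service shows Exit instead of Up, its startup log can be followed per service; InfluxDB additionally exposes a ping endpoint for a quick health probe:
$ docker-compose logs -f connect       # follow the Connect worker's startup log
$ curl -i http://localhost:8086/ping   # InfluxDB 1.x health check; expect HTTP 204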
3. Confirm that the connector plugins are also available.
$ curl http://localhost:8083/connector-plugins | jq
% Total % Received % Xferd Average Speed Time Time Time Current
Dload Upload Total Spent Left Speed
100 793 100 793 0 0 14962 0 --:--:-- --:--:-- --:--:-- 14962
[
{
"class": "io.confluent.connect.rabbitmq.RabbitMQSourceConnector",
"type": "source",
"version": "0.0.0.0"
},
{
"class": "io.confluent.influxdb.InfluxDBSinkConnector",
"type": "sink",
"version": "unknown"
},
{
"class": "io.confluent.influxdb.source.InfluxdbSourceConnector",
"type": "source",
"version": "1.2.1"
},
{
"class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
"type": "sink",
"version": "6.0.0-ce"
},
{
"class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
"type": "source",
"version": "6.0.0-ce"
},
{
"class": "org.apache.kafka.connect.mirror.MirrorCheckpointConnector",
"type": "source",
"version": "1"
},
{
"class": "org.apache.kafka.connect.mirror.MirrorHeartbeatConnector",
"type": "source",
"version": "1"
},
{
"class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
"type": "source",
"version": "1"
}
]
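With both plugins visible, a sink instance can later be registered through the Connect REST API. The following is only a minimal sketch: the connector name influxdb-sink, topic topic_001, and database iotdata are placeholder assumptions, and the actual configuration used in this series is covered in Step 2.
$ curl -s -X PUT http://localhost:8083/connectors/influxdb-sink/config \
    -H "Content-Type: application/json" \
    -d '{
          "connector.class": "io.confluent.influxdb.InfluxDBSinkConnector",
          "tasks.max": "1",
          "topics": "topic_001",
          "influxdb.url": "http://influxdb:8086",
          "influxdb.db": "iotdata"
        }'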
With this step, Confluent Platform was confirmed to run in a Docker container environment. Next, we will send data from the IoT data-generation program and confirm that it can be received in InfluxDB via a Broker topic; a quick manual smoke test of a topic is sketched below.
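Before wiring up the Python generator, the broker image's bundled console producer can be used to push a hand-written test record into a topic (a sketch only; the topic name topic_001 is a placeholder assumption, and the actual producer program is covered in Step 2):
$ # assumption: topic_001 is a placeholder; auto topic creation or prior creation is assumed
$ docker exec -it broker kafka-console-producer --bootstrap-server broker:29092 --topic topic_001
> {"sensor": "sensor_01", "temperature": 25.0}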
Step information for this series
Step 1: Build Confluent Platform in a Docker container environment
Step 2: Confirm that data from the Broker is received in InfluxDB
Step 3: Confirm real-time data visualization in Grafana via InfluxDB