I tried visualizing IoT data with a "Confluent + InfluxDB + Grafana" stream-processing pipeline

Step 1: Building the Confluent Platform in a Docker container environment

Overview

I built a local Docker container environment based on the Confluent Platform "cp-all-in-one" setup, sent data from a Python IoT data generator program to Confluent for stream processing, and confirmed that the data can be visualized in real time with InfluxDB + Grafana. The corresponding Sink Connector is used for this.

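The IoT data generator itself is covered in the later steps. For context, here is a minimal sketch of what such a generator might look like, assuming the data enters the pipeline through the RabbitMQ service defined in the docker-compose.yml below; the queue name and payload fields are hypothetical placeholders, not the actual generator's schema.

import json
import random
import time

import pika  # pip install pika

# Connect to the rabbitmq service from docker-compose.yml (guest/guest on localhost:5672).
credentials = pika.PlainCredentials("guest", "guest")
connection = pika.BlockingConnection(
    pika.ConnectionParameters(host="localhost", port=5672, credentials=credentials)
)
channel = connection.channel()

QUEUE = "iot-data"  # hypothetical queue name
channel.queue_declare(queue=QUEUE)

try:
    while True:
        # Hypothetical sensor payload; the real generator's schema may differ.
        payload = {
            "sensor_id": random.randint(1, 10),
            "temperature": round(random.uniform(20.0, 30.0), 2),
            "timestamp": int(time.time() * 1000),
        }
        channel.basic_publish(exchange="", routing_key=QUEUE, body=json.dumps(payload))
        time.sleep(1)
finally:
    connection.close()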

The above is explained in three steps, in order. This article covers Step 1.
Step 1: Building the Confluent Platform in a Docker container environment
Step 2: Confirming that data from the Broker is received in InfluxDB
Step 3: Confirming real-time data visualization in Grafana via InfluxDB

Local environment

macOS Big Sur 11.3
Python 3.8.3
Docker version 20.10.7, build f0df350 (CPUs: 8, Memory: 10 GB, Swap: 1 GB)

Moving to the working directory

    We will use the cp-all-in-one directory inside the cloned confluentinc/cp-all-in-one repository as the working directory as-is.
$ cd cp-all-in-one
$ cd cp-all-in-one

Installing the InfluxDB Sink Connector

    1. To install the "InfluxDB Sink Connector" on Confluent's Connect worker, edit the Dockerfile as follows.
FROM confluentinc/cp-server-connect-base:6.0.0
RUN confluent-hub install --no-prompt confluentinc/kafka-connect-rabbitmq:latest
RUN confluent-hub install --no-prompt confluentinc/kafka-connect-influxdb:latest

2. Build the Docker image for the Connect worker.

$ docker build -t cp-connect-base .
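Note that the Dockerfile installs two plugins from Confluent Hub: the RabbitMQ connector, which (as a source) presumably feeds the IoT generator's messages into a Kafka topic, and the InfluxDB connector, which (as a sink) writes topic data out to InfluxDB. Both appear in the plugin list checked at the end of this article.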

Editing docker-compose.yml

    Edit the "docker-compose.yml" file in the working directory as follows. The changes relative to the stock file are summarized per service below:
Service name      Changed?   Notes
zookeeper         No         -
broker            No         -
schema-registry   No         -
connect           Yes        Image changed to the locally built cp-connect-base
control-center    No         -
ksqldb-server     No         -
ksqldb-cli        No         -
rabbitmq          No         -
influxdb          Yes        Newly added
grafana           Yes        Newly added
version: '2'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:6.0.0
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  broker:
    image: confluentinc/cp-server:6.0.0
    hostname: broker
    container_name: broker
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
      - "9101:9101"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_CONFLUENT_BALANCER_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_JMX_PORT: 9101
      KAFKA_JMX_HOSTNAME: localhost
      KAFKA_CONFLUENT_SCHEMA_REGISTRY_URL: http://schema-registry:8081
      CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: broker:29092
      CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
      CONFLUENT_METRICS_ENABLE: 'true'
      CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'

  schema-registry:
    image: confluentinc/cp-schema-registry:6.0.0
    hostname: schema-registry
    container_name: schema-registry
    depends_on:
      - broker
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'broker:29092'
      SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081
      SCHEMA_REGISTRY_ACCESS_CONTROL_ALLOW_ORIGIN: '*'
      SCHEMA_REGISTRY_ACCESS_CONTROL_ALLOW_METHODS: 'GET,POST,PUT,OPTIONS'

  connect:
    image: cp-connect-base:latest
    hostname: connect
    container_name: connect
    depends_on:
      - broker
      - schema-registry
    ports:
      - "8083:8083"
    environment:
      CONNECT_BOOTSTRAP_SERVERS: 'broker:29092'
      CONNECT_REST_ADVERTISED_HOST_NAME: connect
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: compose-connect-group
      CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_OFFSET_FLUSH_INTERVAL_MS: 10000
      CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
      CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
      CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CLASSPATH: /usr/share/java/monitoring-interceptors/monitoring-interceptors-6.0.0.jar
      CONNECT_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"
      CONNECT_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"
      CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components"
      CONNECT_LOG4J_ROOT_LOGLEVEL: "INFO"
      CONNECT_LOG4J_LOGGERS: org.apache.zookeeper=ERROR,org.I0Itec.zkclient=ERROR,org.apache.kafka.connect.runtime.rest=WARN,reflections=ERROR
      CONNECT_HOST: connect

  control-center:
    image: confluentinc/cp-enterprise-control-center:6.0.0
    hostname: control-center
    container_name: control-center
    depends_on:
      - broker
      - schema-registry
      - connect
      - ksqldb-server
    ports:
      - "9021:9021"
    environment:
      CONTROL_CENTER_BOOTSTRAP_SERVERS: 'broker:29092'
      CONTROL_CENTER_CONNECT_CLUSTER: 'connect:8083'
      CONTROL_CENTER_KSQL_KSQLDB1_URL: "http://ksqldb-server:8088"
      CONTROL_CENTER_KSQL_KSQLDB1_ADVERTISED_URL: "http://localhost:8088"
      CONTROL_CENTER_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
      CONTROL_CENTER_REPLICATION_FACTOR: 1
      CONTROL_CENTER_INTERNAL_TOPICS_PARTITIONS: 1
      CONTROL_CENTER_MONITORING_INTERCEPTOR_TOPIC_PARTITIONS: 1
      CONFLUENT_METRICS_TOPIC_REPLICATION: 1
      PORT: 9021

  ksqldb-server:
    image: confluentinc/cp-ksqldb-server:6.0.0
    hostname: ksqldb-server
    container_name: ksqldb-server
    depends_on:
      - broker
      - connect
    ports:
      - "8088:8088"
    environment:
      KSQL_CONFIG_DIR: "/etc/ksql"
      KSQL_BOOTSTRAP_SERVERS: "broker:29092"
      KSQL_HOST_NAME: ksqldb-server
      KSQL_LISTENERS: "http://0.0.0.0:8088"
      KSQL_CACHE_MAX_BYTES_BUFFERING: 0
      KSQL_KSQL_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
      KSQL_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"
      KSQL_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"
      KSQL_KSQL_CONNECT_URL: "http://connect:8083"
      KSQL_KSQL_LOGGING_PROCESSING_TOPIC_REPLICATION_FACTOR: 1
      KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: 'true'
      KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE: 'true'
      KSQL_AUTO_OFFSET_RESET: "latest"

  ksqldb-cli:
    image: confluentinc/cp-ksqldb-cli:6.0.0
    container_name: ksqldb-cli
    depends_on:
      - broker
      - connect
      - ksqldb-server
    entrypoint: /bin/sh
    tty: true

  rabbitmq:
    image: rabbitmq:3.8.17-management
    restart: always
    ports:
      - '5672:5672'
      - '15672:15672'
    hostname: rabbitmq
    container_name: rabbitmq
    environment:
      RABBITMQ_DEFAULT_USER: guest
      RABBITMQ_DEFAULT_PASS: guest

  influxdb:
    image: influxdb:1.8.6
    ports:
      - 8086:8086
    hostname: influxdb
    container_name: influxdb

  grafana:
    image: grafana/grafana
    ports:
      - 3000:3000
    hostname: grafana
    container_name: grafana
    environment:
      - GF_SERVER_ROOT_URL=http://grafana:3000
      - GF_INSTALL_PLUGINS=grafana-polystat-panel,bessler-pictureit-panel,marcuscalidus-svg-panel
      - GF_SECURITY_ADMIN_PASSWORD=admin
    depends_on:
      - influxdb

Starting the Confluent Platform

    1. Start the Confluent Platform in detached mode using the -d option.
$ docker-compose up -d

2. Confirm that the services are running.

$ docker-compose ps

     Name                    Command               State                               Ports                            
------------------------------------------------------------------------------------------------------------------------
broker            /etc/confluent/docker/run        Up      0.0.0.0:9092->9092/tcp,:::9092->9092/tcp,                    
                                                           0.0.0.0:9101->9101/tcp,:::9101->9101/tcp                     
connect           /etc/confluent/docker/run        Up      0.0.0.0:8083->8083/tcp,:::8083->8083/tcp, 9092/tcp           
control-center    /etc/confluent/docker/run        Up      0.0.0.0:9021->9021/tcp,:::9021->9021/tcp                     
grafana           /run.sh                          Up      0.0.0.0:3000->3000/tcp,:::3000->3000/tcp                     
influxdb          /entrypoint.sh influxd           Up      0.0.0.0:8086->8086/tcp,:::8086->8086/tcp                     
ksqldb-cli        /bin/sh                          Up                                                                   
ksqldb-server     /etc/confluent/docker/run        Up      0.0.0.0:8088->8088/tcp,:::8088->8088/tcp                     
rabbitmq          docker-entrypoint.sh rabbi ...   Up      15671/tcp, 0.0.0.0:15672->15672/tcp,:::15672->15672/tcp,     
                                                           15691/tcp, 15692/tcp, 25672/tcp, 4369/tcp, 5671/tcp,         
                                                           0.0.0.0:5672->5672/tcp,:::5672->5672/tcp                     
schema-registry   /etc/confluent/docker/run        Up      0.0.0.0:8081->8081/tcp,:::8081->8081/tcp                     
zookeeper         /etc/confluent/docker/run        Up      0.0.0.0:2181->2181/tcp,:::2181->2181/tcp, 2888/tcp, 3888/tcp 
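Before the connector check in the next step succeeds, note that the connect container can report "Up" before its REST interface has finished initializing, since plugin scanning takes a while. If you script against it, a small polling sketch like the following helps; it assumes only the Connect REST port 8083 published in docker-compose.yml.

import time

import requests  # pip install requests

CONNECT_URL = "http://localhost:8083/connector-plugins"  # Connect REST API from docker-compose.yml

def wait_for_connect(timeout_s=180):
    """Poll the Connect REST API until the worker answers or the timeout expires."""
    deadline = time.time() + timeout_s
    while time.time() < deadline:
        try:
            if requests.get(CONNECT_URL, timeout=5).status_code == 200:
                return True
        except requests.exceptions.ConnectionError:
            pass  # worker still starting up
        time.sleep(5)
    return False

if __name__ == "__main__":
    print("connect is ready" if wait_for_connect() else "connect did not come up in time")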

3. Confirm that the installed connector plugins are available as well.

$ curl http://localhost:8083/connector-plugins | jq

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100   793  100   793    0     0  14962      0 --:--:-- --:--:-- --:--:-- 14962
[
  {
    "class": "io.confluent.connect.rabbitmq.RabbitMQSourceConnector",
    "type": "source",
    "version": "0.0.0.0"
  },
  {
    "class": "io.confluent.influxdb.InfluxDBSinkConnector",
    "type": "sink",
    "version": "unknown"
  },
  {
    "class": "io.confluent.influxdb.source.InfluxdbSourceConnector",
    "type": "source",
    "version": "1.2.1"
  },
  {
    "class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
    "type": "sink",
    "version": "6.0.0-ce"
  },
  {
    "class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
    "type": "source",
    "version": "6.0.0-ce"
  },
  {
    "class": "org.apache.kafka.connect.mirror.MirrorCheckpointConnector",
    "type": "source",
    "version": "1"
  },
  {
    "class": "org.apache.kafka.connect.mirror.MirrorHeartbeatConnector",
    "type": "source",
    "version": "1"
  },
  {
    "class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
    "type": "source",
    "version": "1"
  }
]
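With io.confluent.influxdb.InfluxDBSinkConnector in the plugin list, a connector instance can later be registered through the same Connect REST API. As a rough preview only (the real configuration is the subject of Step 2, and the topic and database names below are placeholders):

import json

import requests  # pip install requests

# Placeholder configuration for illustration; the actual connector settings
# are covered in Step 2.
connector = {
    "name": "influxdb-sink",
    "config": {
        "connector.class": "io.confluent.influxdb.InfluxDBSinkConnector",
        "tasks.max": "1",
        "topics": "topic_influxdb",              # placeholder topic name
        "influxdb.url": "http://influxdb:8086",  # influxdb service from docker-compose.yml
        "influxdb.db": "iot_data",               # placeholder database name
    },
}

resp = requests.post(
    "http://localhost:8083/connectors",  # Kafka Connect REST API on port 8083
    headers={"Content-Type": "application/json"},
    data=json.dumps(connector),
)
print(resp.status_code, resp.json())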

With this step, we confirmed that the Confluent Platform runs in a Docker container environment. Next, we will send data from the IoT data generator program and confirm that it can be received in InfluxDB via a Broker topic.

Steps in this series

Step 1: Building the Confluent Platform in a Docker container environment
Step 2: Confirming that data from the Broker is received in InfluxDB
Step 3: Confirming real-time data visualization in Grafana via InfluxDB
