docker-compose で Kafka の動作確認ができて、docker-fluentd-image のカスタマイズを試すのもやったので、組み合わせて docker-compose で fluent-plugin-kafka を軽く試してみた。

準備

ディレクトリを作成して移動。

$ mkdir custom-fluentd
$ cd custom-fluentd

docker-compose.yml を作成する。

$ vi docker-compose.yml
version: '2'
services:
  zookeeper:
    image: wurstmeister/zookeeper
    ports:
      - "2181:2181"
  kafka:
    image: wurstmeister/kafka
    ports:
      - "9092"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: 172.17.0.1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock
    depends_on:
      - zookeeper
  fluentd:
    build: .
    volumes:
      - ./log:/fluentd/log
    depends_on:
      - kafka

fluentd のイメージはカスタマイズして fluent-plugin-kafka をインストールするため、Dockerfile を作成する。
(fluent-plugin-kafka だけでなく zookeeper の gem もインストールする必要があった。)

$ vi Dockerfile
FROM fluent/fluentd:latest-onbuild
MAINTAINER YOUR_NAME <...@...>
WORKDIR /home/fluent
ENV PATH /home/fluent/.gem/ruby/2.3.0/bin:$PATH

USER root
RUN apk --no-cache add sudo build-base ruby-dev && \

    sudo -u fluent gem install zookeeper fluent-plugin-kafka && \

    rm -rf /home/fluent/.gem/ruby/2.3.0/cache/*.gem && sudo -u fluent gem sources -c && \
    apk del sudo build-base ruby-dev

EXPOSE 24284

USER fluent
CMD exec fluentd -c /fluentd/etc/$FLUENTD_CONF -p /fluentd/plugins $FLUENTD_OPT

fluentd の設定ファイルを作成する。とりあえず受信したログを全部 docker という名前のトピックに送信する設定。

$ vi fluent.conf
<source>
  @type  forward
  @id    input1
  @label @mainstream
  port  24224
</source>

<filter **>
  @type stdout
</filter>

<label @mainstream>
  <match **>
    @type kafka_buffered

    brokers kafka:9092
    zookeeper zookeeper:2181
    default_topic docker
  </match>
</label>

あと必要なディレクトリを作っておく。(plugins のほうはいらないかも。)

$ mkdir log
$ mkdir plugins

コンテナ起動

ドッカコンポーザップディーする。

$ docker-compose up -d

今回 docker-compose.yml が手抜きなので、zookeeper が立ち上がる前に kafka が立ち上がろうとして落ちてしまったり、kafka が立ち上がる前に fluentd が立ち上がろうとして落ちてしまったりする。
ので、docker-compose ps して落ちていたら、手動で docker-compose start kafka とか docker-compose start fluentd した。

うまく立ち上がると docker-compose ps がこんな感じになる。

$ docker-compose ps
          Name                         Command               State                         Ports                        
-----------------------------------------------------------------------------------------------------------------------
customfluentd_fluentd_1     /bin/sh -c exec fluentd -c ...   Up      24224/tcp, 24284/tcp, 5140/tcp                     
customfluentd_kafka_1       start-kafka.sh                   Up      0.0.0.0:32797->9092/tcp                            
customfluentd_zookeeper_1   /bin/sh -c /usr/sbin/sshd  ...   Up      0.0.0.0:2181->2181/tcp, 22/tcp, 2888/tcp, 3888/tcp

動作確認

fluent.conf に指定した docker トピックを kafka に作成しておく。

$ docker-compose exec kafka /opt/kafka/bin/kafka-topics.sh --create --zookeeper zookeeper:2181 --replication-factor 1 --partitions 1 --topic docker
Created topic "test".
$ docker-compose exec kafka /opt/kafka/bin/kafka-topics.sh --list --zookeeper zookeeper:2181
docker

fluentd の IP を調べて、別コンテナから docker ログを送信する。

$ FLUENTD_IP=$(docker inspect -f "{{.NetworkSettings.Networks.customfluentd_default.IPAddress}}" customfluentd_fluentd_1)
$ echo $FLUENTD_IP
172.19.0.4
$ docker run --log-driver=fluentd --log-opt fluentd-address=$FLUENTD_IP:24224 python:alpine echo Hello
Hello

kafka-console-consumer.sh を立ち上げてトピックにたまったログを最初から読んでみると、ちゃんとさきほどの Hello というメッセージを受信することができた。

$ docker-compose exec kafka /opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server kafka:9092 --topic docker --from-beginning
{"source":"stdout","log":"Hello","container_id":"c207a0ab45f5f67cb8bf1fc04480681a1a8a8b3947aad9025bf725125583c605","container_name":"/tiny_visvesvaraya"}

上記の kafka-console-consumer.sh を起動したまま、もう一度上に書いた方法で echo Hello してみると、今度はしばらく時間がたってからメッセージを受信することができた。
バッファされているから受信に時間がかかるということなんだと思う。

まとめ

docker-compose で fluent-plugin-kafka の match + kafka_buffered を手軽に試すことができるようになった。次は source (consume する側) のほうを試してみようと思う。

参考資料

    • docker-compose で Kafka の動作確認 – Qiita

 

    • docker-fluentd-image のカスタマイズを試す – Qiita

 

    fluent/fluent-plugin-kafka: Kafka input and output plugin for Fluentd
广告
将在 10 秒后关闭
bannerAds