使用Oracle Cloud的Fluentd组件,尝试通过流式传输生产消息

首先

在Oracle Cloud Infractructure(OCI)中,提供了一项名为Streaming的服务,可以实时收集和处理流数据。由于Streaming具有与Apache Kafka兼容的API,因此可以从Kafka客户端连接,并进行数据的生产和消费。

我們將使用Fluentd來確認如何將數據流式傳輸並生成。

虚拟机

请随意准备CentOS7。只要Fluentd能正常运行即可。

生成授权令牌

要通过Kafka API进行流媒体传输,需要使用IAM用户的身份验证令牌。您可以在自己的IAM用户详细信息页面上生成该令牌。

1588474181611.png

我会随便加些解释。

1588474211704.png

因为无法使用图片上的内容,所以我会将标记记录下来。

如果Token中含有分号等符号,我觉得它可能无法正常运行。如果遇到困难无法正常运行时,请考虑更换Token。

1588474230416.png

创建一个流

在OCI控制台上,选择Analytics > Streaming。然后选择Create Stream。

1588519801919.png

我会根据需要随意设定参数并创建。

1588519908911.png

当创建 Stream 时,如果 Stream Pool 为空,则会自动创建默认的 Stream Pool。teststream01将自动分配到 DefaultPool。Stream Pool 是一个管理多个 Stream 的概念,可以统一管理将 Stream 的 Endpoint 设置为公开还是私有,以及使用什么密钥来加密数据等事项。

这次选择了自动创建,因此 Endpoint 是公共的,并且使用由 Oracle 管理的加密密钥进行设置。

1588520017422.png

点击屏幕上的“查看Kafka连接设置”。

1588521024252.png

点击复制全部按钮,将所有的设置值备忘下来。

1588521056591.png

安装Fluentd

请确认以下URL并进行预设。
https://docs.fluentd.org/installation/before-install

安装Fluentd。

$ curl -L https://toolbelt.treasuredata.com/sh/install-redhat-td-agent3.sh | sh

由于使用 Systemd 进行管理,系统将会启动。

sudo systemctl start td-agent.service
sudo systemctl enable td-agent.service

配置Fluentd

编辑Fluentd并将其设置为消费流式数据流。

cat <<'EOF' > /etc/td-agent/td-agent.conf
<source>
 @type forward
</source>

<match debug.**>
  @type copy
  <store>
    @type stdout
  </store>
  <store>
    @type kafka2
    brokers cell-1.streaming.ap-tokyo-1.oci.oraclecloud.com:9092
    use_event_time true

    # data type settings
    <format>
      @type json
    </format>

    # topic settings
    default_topic teststream01

    # producer settings
    required_acks -1
    compression_codec gzip

    # sasl
    username poc02/oracleidentitycloudservice/suguru.sugiyama@oracle.com/ocid1.streampool.oc1.ap-tokyo-1.amaaaaaaycetm7yawtz56lnnerap4r45y4vheekgvhdaevxf3clfpuew6mla
    password 8t[shwUN}I-d+{}8Nx_a
    sasl_over_ssl true
    ssl_ca_certs_from_system true


    <buffer>
      @type file
      path /var/log/td-agent/buffer/oci.buffer
      chunk_limit_size 8m
      queue_limit_length 256
      flush_at_shutdown true
      flush_interval 1s
      retry_wait 30s
      retry_max_times 9
    </buffer>
  </store>
</match>
EOF

解释连接到Stream的重要要点。
对于brokers,请指定位于Stream Pool中的引导服务器的值。

brokers cell-1.streaming.ap-tokyo-1.oci.oraclecloud.com:9092

指定生產主題名稱。從 OCI 的角度來看,請指定流的名稱。

default_topic teststream01

以下是关于SASL认证的配置。

username : OCI の Stream Pool にある SASL CONNECTION STRINGS から、username を抜き出して指定します。

password : Auth Token を指定します。

sasl_over_ssl : true

ssl_ca_certs_from_system : trueを指定します。ssl_ca_cert で証明書を読み込ませる方法は、自分のやり方だと出来ませんでした。(自分の設定方法が悪そう)

# sasl
username poc02/oracleidentitycloudservice/suguru.sugiyama@oracle.com/ocid1.streampool.oc1.ap-tokyo-1.amaaaaaaycetm7yawtz56lnnerap4r45y4vheekgvhdaevxf3clfpuew6mla
password 8t[shwUN}I-d+{}8Nx_a
sasl_over_ssl true
ssl_ca_certs_from_system true

只要进行这样的设置,就可以连接,所以我们重新启动Fluentd。

sudo systemctl restart td-agent

确认动作

执行以下命令,可以向Fluentd发送Hello消息。

echo '{"I say":"hello"}' | /opt/td-agent/embedded/bin/fluent-cat debug.ok

在流媒体界面上,您可以查看数据。点击“Load Messages”按钮,就可以看到一个名为“hello”的消息。这样,您就可以确认通过Fluentd将数据传输到流媒体的步骤了。

1588778328650.png

请提供一个可参考的网址。

bannerAds