使用Oracle Cloud的Fluentd组件,尝试通过流式传输生产消息
首先
在Oracle Cloud Infractructure(OCI)中,提供了一项名为Streaming的服务,可以实时收集和处理流数据。由于Streaming具有与Apache Kafka兼容的API,因此可以从Kafka客户端连接,并进行数据的生产和消费。
我們將使用Fluentd來確認如何將數據流式傳輸並生成。
虚拟机
请随意准备CentOS7。只要Fluentd能正常运行即可。
生成授权令牌
要通过Kafka API进行流媒体传输,需要使用IAM用户的身份验证令牌。您可以在自己的IAM用户详细信息页面上生成该令牌。
我会随便加些解释。
因为无法使用图片上的内容,所以我会将标记记录下来。
如果Token中含有分号等符号,我觉得它可能无法正常运行。如果遇到困难无法正常运行时,请考虑更换Token。
创建一个流
在OCI控制台上,选择Analytics > Streaming。然后选择Create Stream。
我会根据需要随意设定参数并创建。
当创建 Stream 时,如果 Stream Pool 为空,则会自动创建默认的 Stream Pool。teststream01将自动分配到 DefaultPool。Stream Pool 是一个管理多个 Stream 的概念,可以统一管理将 Stream 的 Endpoint 设置为公开还是私有,以及使用什么密钥来加密数据等事项。
这次选择了自动创建,因此 Endpoint 是公共的,并且使用由 Oracle 管理的加密密钥进行设置。
点击屏幕上的“查看Kafka连接设置”。
点击复制全部按钮,将所有的设置值备忘下来。
安装Fluentd
请确认以下URL并进行预设。
https://docs.fluentd.org/installation/before-install
安装Fluentd。
$ curl -L https://toolbelt.treasuredata.com/sh/install-redhat-td-agent3.sh | sh
由于使用 Systemd 进行管理,系统将会启动。
sudo systemctl start td-agent.service
sudo systemctl enable td-agent.service
配置Fluentd
编辑Fluentd并将其设置为消费流式数据流。
cat <<'EOF' > /etc/td-agent/td-agent.conf
<source>
@type forward
</source>
<match debug.**>
@type copy
<store>
@type stdout
</store>
<store>
@type kafka2
brokers cell-1.streaming.ap-tokyo-1.oci.oraclecloud.com:9092
use_event_time true
# data type settings
<format>
@type json
</format>
# topic settings
default_topic teststream01
# producer settings
required_acks -1
compression_codec gzip
# sasl
username poc02/oracleidentitycloudservice/suguru.sugiyama@oracle.com/ocid1.streampool.oc1.ap-tokyo-1.amaaaaaaycetm7yawtz56lnnerap4r45y4vheekgvhdaevxf3clfpuew6mla
password 8t[shwUN}I-d+{}8Nx_a
sasl_over_ssl true
ssl_ca_certs_from_system true
<buffer>
@type file
path /var/log/td-agent/buffer/oci.buffer
chunk_limit_size 8m
queue_limit_length 256
flush_at_shutdown true
flush_interval 1s
retry_wait 30s
retry_max_times 9
</buffer>
</store>
</match>
EOF
解释连接到Stream的重要要点。
对于brokers,请指定位于Stream Pool中的引导服务器的值。
brokers cell-1.streaming.ap-tokyo-1.oci.oraclecloud.com:9092
指定生產主題名稱。從 OCI 的角度來看,請指定流的名稱。
default_topic teststream01
以下是关于SASL认证的配置。
username : OCI の Stream Pool にある SASL CONNECTION STRINGS から、username を抜き出して指定します。
password : Auth Token を指定します。
sasl_over_ssl : true
ssl_ca_certs_from_system : trueを指定します。ssl_ca_cert で証明書を読み込ませる方法は、自分のやり方だと出来ませんでした。(自分の設定方法が悪そう)
# sasl
username poc02/oracleidentitycloudservice/suguru.sugiyama@oracle.com/ocid1.streampool.oc1.ap-tokyo-1.amaaaaaaycetm7yawtz56lnnerap4r45y4vheekgvhdaevxf3clfpuew6mla
password 8t[shwUN}I-d+{}8Nx_a
sasl_over_ssl true
ssl_ca_certs_from_system true
只要进行这样的设置,就可以连接,所以我们重新启动Fluentd。
sudo systemctl restart td-agent
确认动作
执行以下命令,可以向Fluentd发送Hello消息。
echo '{"I say":"hello"}' | /opt/td-agent/embedded/bin/fluent-cat debug.ok
在流媒体界面上,您可以查看数据。点击“Load Messages”按钮,就可以看到一个名为“hello”的消息。这样,您就可以确认通过Fluentd将数据传输到流媒体的步骤了。
