使用Oracle Cloud中的Fluentd,尝试将数据从Streaming传输到HDFS进行协作
首先
在Oracle Cloud Infrastructure(OCI)中,提供了一项名为Streaming的服务,可以实时收集和处理流数据。Streaming具有与Apache Kafka兼容的API,因此可以从Kafka Client连接并进行数据的生产和消费。
我們將使用Fluentd來確認將流動的數據存儲到Hadoop的HDFS的方法。
前提是要有一个条件
前提条件是基于本文中构建的Hadoop/HDFS。
HDFS配置
使用Fluentd的fluent-plugin-webhdfs将数据存储到HDFS。该插件使用WebHDFS接口来操作HDFS的REST API。由于WebHDFS默认处于禁用状态,因此需要更改Hadoop/HDFS的配置以启用它。
编辑hdfs-site.xml。
vim $HADOOP_HOME/etc/hadoop/hdfs-site.xml
添加到配置中。
<property>
<name>dfs.webhdfs.enabled</name>
<value>true</value>
</property>
<property>
<name>dfs.support.append</name>
<value>true</value>
</property>
<property>
<name>dfs.support.broken.append</name>
<value>true</value>
</property>
修改后的文件整体感觉如下所示。
[opc@hivenew hadoop-3.2.1]$ cat $HADOOP_HOME/etc/hadoop/hdfs-site.xml
<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<!--
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. See accompanying LICENSE file.
-->
<!-- Put site-specific property overrides in this file. -->
<configuration>
<property>
<name>dfs.name.dir</name>
<value>/home/opc/hdfs/name</value>
</property>
<property>
<name>dfs.data.dir</name>
<value>/home/opc/hdfs/data</value>
</property>
<property>
<name>dfs.replication</name>
<value>1</value>
</property>
<property>
<name>dfs.webhdfs.enabled</name>
<value>true</value>
</property>
<property>
<name>dfs.support.append</name>
<value>true</value>
</property>
<property>
<name>dfs.support.broken.append</name>
<value>true</value>
</property>
</configuration>
[opc@hivenew hadoop-3.2.1]$
启动 Hadoop / HDFS。
cd $HADOOP_HOME
sbin/start-all.sh
使用jps命令来确认是否已启动。
[opc@hivenew hadoop-3.2.1]$ jps
3057 NameNode
4483 NodeManager
4805 Jps
4358 ResourceManager
3210 DataNode
3406 SecondaryNameNode
为了确认操作,将尝试通过浏览器连接到NameNode来确认是否可以连接。

生成授权令牌
要在Streaming上使用Kafka API进行连接,需要使用IAM用户的身份认证令牌。您可以在自己的IAM用户详细页面上生成该令牌。
我会随意地进行解释。
由于无法使用图像,所以我会记下Token的信息。
创建流 liú)
在 OCI 控制台中,选择 Analytics > Streaming。然后选择创建流。
使用适当的参数进行创建。
在创建Stream时,如果Stream Pool为空,则默认的Stream Pool将被自动创建。teststream01将被自动分配给DefaultPool。Stream Pool是一种用于管理多个Stream的概念,可以集中管理Stream的Endpoint是公开还是私有,并决定使用哪个密钥进行数据加密。
这次我们选择了自动创建,所以 Endpoint 是公开的,并且使用由Oracle管理的加密密钥进行设置。
点击上述画面中的「查看Kafka连接设置」。
点击“复制全部”,将所有的设置值记下来。
安装Fluentd
请访问以下网址以进行预先设置:
https://docs.fluentd.org/installation/before-install
安装Fluentd。
$ curl -L https://toolbelt.treasuredata.com/sh/install-redhat-td-agent3.sh | sh
由于系统使用Systemd进行管理,因此会启动。
sudo systemctl start td-agent.service
sudo systemctl enable td-agent.service
Fluentd配置
将数据依次流向 Streaming(Kafka兼容API) → Fluentd → WebHDFS。
在源头中,指定了 kafka_group。使用从 Stream Pool 获取的连接信息来指定。
在存储中,指定了 webhdfs。
如果您想要查看详细的参数,请访问以下 GitHub 链接:
https://github.com/fluent/fluent-plugin-kafka
https://github.com/fluent/fluent-plugin-webhdfs/
cat <<'EOF' > /etc/td-agent/td-agent.conf
<source>
@type kafka_group
@label @STREAMING
brokers cell-1.streaming.ap-tokyo-1.oci.oraclecloud.com:9092
consumer_group sugigroup
topics teststream01
format text
# sasl
username poc02/oracleidentitycloudservice/suguru.sugiyama@oracle.com/ocid1.streampool.oc1.ap-tokyo-1.amaaaaaaycetm7yawtz56lnnerap4r45y4vheekgvhdaevxf3clfpuew6mla
password 8t[shwUN}I-d+{}8Nx_a
sasl_over_ssl true
ssl_ca_certs_from_system true
</source>
<label @STREAMING>
<match **>
@type copy
<store>
@type stdout
</store>
<store>
@type webhdfs
host 10.0.0.12
port 9870
path "/log/%Y%m%d_%H/streaming.log.#{Socket.gethostname}"
flush_interval 10s
username opc
</store>
</match>
</label>
EOF
重启Fluentd。
systemctl restart td-agent
确认动作
现在,我们已经完成了设置,让我们在OCI Streaming的管理界面上进行实际操作来确认一下。您可以在Stream上进行Produce操作,我们试着存储一个字符串“hello webhdfs!”。

在没有施加任何负载的无风状态下,我们在存储了测试数据之后,在大约30秒内就可以在HDFS上确认数据。当然,存储所需的时间会因负载情况而有所不同。根据配置了Fluentd的内容,数据被存储如下:
/记录/<日期>/流媒体.log.<主机名>
[opc@hivenew hadoop-3.2.1]$ bin/hadoop fs -ls /log/20200516_00
Found 1 items
-rw-r--r-- 1 opc supergroup 1325 2020-05-16 00:57 /log/20200516_00/streaming.log.hivenew
[opc@hivenew hadoop-3.2.1]$
当使用cat命令进行确认时,可以看到末尾出现了“hello webhdfs!”这样的文字。经过传输的数据已经自动存储到了HDFS中,这一点已经得到了确认。
[opc@hivenew hadoop-3.2.1]$ bin/hadoop fs -cat /log/20200516_00/streaming.log.hivenew
2020-05-16 00:57:30,350 INFO sasl.SaslDataTransferClient: SASL encryption trust check: localHostTrusted = false, remoteHostTrusted = false
2020-05-16T00:06:25Z test.test {"I say":"hello1"}
2020-05-16T00:43:15Z test.test {"I say":"hello1"}
2020-05-16T00:52:05Z teststream01 {"I say":"hello"}
2020-05-16T00:52:05Z teststream01 {"I say":"hello"}
2020-05-16T00:52:05Z teststream01 {"I say":"hello"}
2020-05-16T00:52:05Z teststream01 {"I say":"hello"}
2020-05-16T00:52:05Z teststream01 {"I say":"hello1"}
2020-05-16T00:52:05Z teststream01 {"I say":"hello1"}
2020-05-16T00:52:05Z teststream01 {"I say":"hello1"}
2020-05-16T00:52:05Z teststream01 {"I say":"hello"}
2020-05-16T00:52:05Z teststream01 {"I say":"hello"}
2020-05-16T00:52:05Z teststream01 {"I say":"hello1"}
2020-05-16T00:52:05Z teststream01 {"key":"a","value":"a"}
2020-05-16T00:56:16Z teststream01 {"message":"message=1"}
2020-05-16T00:56:16Z teststream01 {"message":"message=3"}
2020-05-16T00:56:16Z teststream01 {"message":"message=5"}
2020-05-16T00:56:16Z teststream01 {"message":"message=8"}
2020-05-16T00:56:16Z teststream01 {"message":"message=4"}
2020-05-16T00:56:16Z teststream01 {"message":"message=7"}
2020-05-16T00:56:18Z teststream01 {"message":"message=0"}
2020-05-16T00:56:18Z teststream01 {"message":"message=2"}
2020-05-16T00:56:18Z teststream01 {"message":"message=6"}
2020-05-16T00:56:18Z teststream01 {"message":"message=9"}
2020-05-16T00:57:14Z teststream01 {"message":"hello webhdfs!"}