建立用于测试的fluentd服务器
在创建使用Fluentd的系统的过程中,我开始纠结于如何测试数据是否能被正确地发送到Fluentd,于是进行了各种调查研究。
我想做的事情
在構建使用Fluentd的系統過程中,我想要輕鬆地寫一個測試來確認數據是否被傳送到了Fluentd。
政策
为了尽可能地减少测试时间,我希望在进行持续集成时能将使用FluentLogger投递的数据尽快与输出进行协作。
以前我自己编写了打开套接字并进行监听的代码,但随着版本升级和SSL的涉及,变得变得麻烦了。所以我希望尽量通过插件之类的方式来解决。
由于在回答程式集成(CI)方面经常使用Docker来搭建周边中间件,所以以前尝试过将Elasticsearch加入到中间件设置中进行测试,但我觉得为了测试而专门还要搭建一个单独的Elasticsearch,并安装elasticsearch-ruby进行连接实在是太麻烦了。
特别是官方镜像无法以单节点模式启动,而且当版本更新时需要在所有微服务中进行更新,这对我来说非常繁琐,因此我希望尽量在没有额外中间件的情况下进行测试。
考虑方法
实时性
在调整flush_interval的值时,对于fluentd的缓冲进行调优是一项常规工作,不过如果是在进行测试的情况下,使用flush_mode=immediate应该就可以了。
- https://docs.fluentd.org/configuration/buffer-section#flushing-parameters
在CI中的易用性
考虑到我们现在正在使用的ActiveJob等工具中是否能够巧妙地利用Redis,我发现官方插件列表中有一个看起来相当不错的插件,所以决定试一试。
- https://github.com/pokehanai/fluent-plugin-redis-store
虽然MySQL也有,但是由于从FluentLogger发送的数据结构比较杂乱,更适合直接使用Redis插件来存储。
实际试一试
在考虑不同方法的基础上,继续进行必要的Gem安装和fluentd设置。
1. 准备好
只要gem有fluent-plugin-redis-store插件,就可以一起安装在主体上。
gem install fluentd fluent-plugin-redis-store --no-document
关于Redis安装的问题就不详细介绍了,只需要用brew或者docker来搭建Redis即可。
Fluentd会从forward接收数据并发送到redis_store,所以fluent.conf的配置如下所示。
<source>
@type forward
@id input1
@label @mainstream
port 24224
</source>
<label @mainstream>
<filter **>
@type record_transformer
<record>
tag ${tag}
</record>
</filter>
<match **>
@type copy
<store>
@type stdout
</store>
<store>
@type redis_store
key_path tag
<buffer>
flush_mode immediate
</buffer>
</store>
</match>
</label>
我正在做的事情大致是这样的。
-
- sourceとしてforwardから受け取ったものに@mainstreamのラベルをつける
-
- filterとして@mainstreamの全てのデータに対してtagというキーでタグの値をセットする
-
- matchとして@mainstreamの全てのデータに対してstdoutとredis_storeの2つへアウトプットする
-
- redis_storeの設定として、Redisのキーとして使う値に上記のfilterでセットしたタグを設定する
- redis_storeのバッファ設定として、flush_mode immediateとしてバッファに入った後すぐにアウトプットする
尽管将输出到stdout只是额外添加的内容,但如果将其输出,以后调试就会变得更加容易。
将这个内容写入./fluent.conf文件中,并启动fluentd。
fluentd -c fluent.conf
既经准备就绪,虽然想要投入数据,但为了更加清楚地了解Redis的运作情况,最好单独打开一个窗口进行监控。
redis-cli monitor
将数据发送到Fluentd
实际应用中,将数据发送给Fluentd的部分通常会使用fluent-logger-ruby来发送数据,但本次尝试将使用fluent-cat作为替代。
echo '{ "key" : "sample" }' | bundle exe fluent-cat debug.test
这个命令会给数据{“key”: “sample”}附加上debug.test标签和当前时间戳,然后将数据发送出去。如果不设置的话,默认会发送到本地地址,所以数据会被发送到之前建立的fluentd服务器。
そうするとfluentd側の標準出力に下記のような出力が出る。
2020-11-03 09:37:28.016591000 +0900 debug.test: {"key":"sample","tag":"debug.test"}
これはfluent.confに設定した@type stdoutが出力しているもので、実際にデータを受け取った後にfilterがタグを付加してこの形になっていることが確認できる。
さらにRedisをモニタリングしている窓を見ると下記のように表示されている。
1604363848.025602 [0 172.28.0.1:40998] "zadd" "debug.test" "1604363848.016591" "{\"key\":\"sample\",\"tag\":\"debug.test\"}"
pluginのreadmeに書いてあるとおりzaddされている。keyはkey_path tagの設定が効いていて、データに含まれているtagの値であるdebug.testがセットされている。
2. 数据投掷后的确认
尝试从Redis中获取数据。
redis-cli zrange debug.test 0 -1 withscores
1) "{\"key\":\"sample\",\"tag\":\"debug.test\"}"
2) "1604363848.016591"
因为时间戳变成了分数,所以可以大量输入并以好的方式排序。
顺便说一句,在Ruby中写的话,会是这样的感觉。
require 'redis'
Redis.new.zrange 'debug.test', 0, -1, withscores: true
=> [["{\"key\":\"sample\",\"tag\":\"debug.test\"}", 1604363848.016591]]
暂时来说,由于可以很简单地获取值,所以相对来说使用起来还算方便。
给你一个额外的东西
考虑并列测试的易用性。
在使用parallel_tests或test-queue进行测试并行执行时,需要确定是哪个测试进程记录了日志。在这种情况下,可以在发送的数据中包含进程ID,并使用它来设置键值。
由于Rails日志文件中需要包含进程ID、时间和请求ID,我认为这是非常有必要的,不会造成任何伤害。
如果是那样的话,发送的数据将会是以下的形式。
{ "key" : "sample", "pid" : 123 }
随着这个变化,fluent.conf也需要按照以下方式进行修改。
<label @mainstream>
<filter **>
@type record_transformer
<record>
tag ${tag}
+ tag_with_pid '${tag}.${record["pid"]}'
</record>
</filter>
<match **>
@type copy
<store>
@type stdout
</store>
<store>
@type redis_store
- key_path tag
+ key_path tag_with_pid
<buffer>
flush_mode immediate
</buffer>
</store>
</match>
</label>
当这个被注册到Redis中的键带有进程ID时,之后只需使用该进程ID来确定自己进程的日志,并通过评估来判断是否实际传输了数据。
更改Redis的目标地址
似乎可以按照下面的方式修改设置,如README中所述。
<label @mainstream>
<filter **>
@type record_transformer
<record>
tag ${tag}
</record>
</filter>
<match **>
@type copy
<store>
@type stdout
</store>
<store>
@type redis_store
key_path tag
+ host 10.0.0.1
+ db 11
<buffer>
flush_mode immediate
</buffer>
</store>
</match>
</label>
当在使用RSpec或Cucumber时的辅助工具时。
如果在实际的测试中使用时,制作出这样一款助手的话,似乎可以轻松地使用。
# frozen_string_literal: true
require 'redis'
module FluentdLogHelper
def fetch_fluentd_log_by(tag:, pid: nil)
redis_key = pid ? "#{tag}.#{pid}" : tag
redis.zrange redis_key, 0, -1
end
def redis(options = {})
options[:db] ||= 0
@redis ||= Redis.new(options)
end
end
fetch_fluentd_log_by tag: 'debug.test'
fetch_fluentd_log_by tag: 'debug.test', pid: Process.pid