建立用于测试的fluentd服务器

在创建使用Fluentd的系统的过程中,我开始纠结于如何测试数据是否能被正确地发送到Fluentd,于是进行了各种调查研究。

我想做的事情

在構建使用Fluentd的系統過程中,我想要輕鬆地寫一個測試來確認數據是否被傳送到了Fluentd。

政策

为了尽可能地减少测试时间,我希望在进行持续集成时能将使用FluentLogger投递的数据尽快与输出进行协作。

以前我自己编写了打开套接字并进行监听的代码,但随着版本升级和SSL的涉及,变得变得麻烦了。所以我希望尽量通过插件之类的方式来解决。

由于在回答程式集成(CI)方面经常使用Docker来搭建周边中间件,所以以前尝试过将Elasticsearch加入到中间件设置中进行测试,但我觉得为了测试而专门还要搭建一个单独的Elasticsearch,并安装elasticsearch-ruby进行连接实在是太麻烦了。

特别是官方镜像无法以单节点模式启动,而且当版本更新时需要在所有微服务中进行更新,这对我来说非常繁琐,因此我希望尽量在没有额外中间件的情况下进行测试。

考虑方法

实时性

在调整flush_interval的值时,对于fluentd的缓冲进行调优是一项常规工作,不过如果是在进行测试的情况下,使用flush_mode=immediate应该就可以了。

    https://docs.fluentd.org/configuration/buffer-section#flushing-parameters

在CI中的易用性

考虑到我们现在正在使用的ActiveJob等工具中是否能够巧妙地利用Redis,我发现官方插件列表中有一个看起来相当不错的插件,所以决定试一试。

    https://github.com/pokehanai/fluent-plugin-redis-store

虽然MySQL也有,但是由于从FluentLogger发送的数据结构比较杂乱,更适合直接使用Redis插件来存储。

实际试一试

在考虑不同方法的基础上,继续进行必要的Gem安装和fluentd设置。

1. 准备好

只要gem有fluent-plugin-redis-store插件,就可以一起安装在主体上。

gem install fluentd fluent-plugin-redis-store --no-document

关于Redis安装的问题就不详细介绍了,只需要用brew或者docker来搭建Redis即可。

Fluentd会从forward接收数据并发送到redis_store,所以fluent.conf的配置如下所示。

<source>
  @type forward
  @id input1
  @label @mainstream
  port 24224
</source>

<label @mainstream>
  <filter **>
    @type record_transformer
    <record>
      tag ${tag}
    </record>
  </filter>
  <match **>
    @type copy
    <store>
      @type stdout
    </store>
    <store>
      @type redis_store
      key_path tag
      <buffer>
        flush_mode immediate
      </buffer>
    </store>
  </match>
</label>

我正在做的事情大致是这样的。

    • sourceとしてforwardから受け取ったものに@mainstreamのラベルをつける

 

    • filterとして@mainstreamの全てのデータに対してtagというキーでタグの値をセットする

 

    • matchとして@mainstreamの全てのデータに対してstdoutとredis_storeの2つへアウトプットする

 

    • redis_storeの設定として、Redisのキーとして使う値に上記のfilterでセットしたタグを設定する

 

    redis_storeのバッファ設定として、flush_mode immediateとしてバッファに入った後すぐにアウトプットする

尽管将输出到stdout只是额外添加的内容,但如果将其输出,以后调试就会变得更加容易。

将这个内容写入./fluent.conf文件中,并启动fluentd。

fluentd -c fluent.conf

既经准备就绪,虽然想要投入数据,但为了更加清楚地了解Redis的运作情况,最好单独打开一个窗口进行监控。

redis-cli monitor

将数据发送到Fluentd

实际应用中,将数据发送给Fluentd的部分通常会使用fluent-logger-ruby来发送数据,但本次尝试将使用fluent-cat作为替代。

echo '{ "key" : "sample" }' | bundle exe fluent-cat debug.test

这个命令会给数据{“key”: “sample”}附加上debug.test标签和当前时间戳,然后将数据发送出去。如果不设置的话,默认会发送到本地地址,所以数据会被发送到之前建立的fluentd服务器。

そうするとfluentd側の標準出力に下記のような出力が出る。

2020-11-03 09:37:28.016591000 +0900 debug.test: {"key":"sample","tag":"debug.test"}

これはfluent.confに設定した@type stdoutが出力しているもので、実際にデータを受け取った後にfilterがタグを付加してこの形になっていることが確認できる。

さらにRedisをモニタリングしている窓を見ると下記のように表示されている。

1604363848.025602 [0 172.28.0.1:40998] "zadd" "debug.test" "1604363848.016591" "{\"key\":\"sample\",\"tag\":\"debug.test\"}"

pluginのreadmeに書いてあるとおりzaddされている。keyはkey_path tagの設定が効いていて、データに含まれているtagの値であるdebug.testがセットされている。

2. 数据投掷后的确认

尝试从Redis中获取数据。

redis-cli zrange debug.test 0 -1 withscores
1) "{\"key\":\"sample\",\"tag\":\"debug.test\"}"
2) "1604363848.016591"

因为时间戳变成了分数,所以可以大量输入并以好的方式排序。

顺便说一句,在Ruby中写的话,会是这样的感觉。

require 'redis'

Redis.new.zrange 'debug.test', 0, -1, withscores: true
 => [["{\"key\":\"sample\",\"tag\":\"debug.test\"}", 1604363848.016591]]

暂时来说,由于可以很简单地获取值,所以相对来说使用起来还算方便。

给你一个额外的东西

考虑并列测试的易用性。

在使用parallel_tests或test-queue进行测试并行执行时,需要确定是哪个测试进程记录了日志。在这种情况下,可以在发送的数据中包含进程ID,并使用它来设置键值。

由于Rails日志文件中需要包含进程ID、时间和请求ID,我认为这是非常有必要的,不会造成任何伤害。

如果是那样的话,发送的数据将会是以下的形式。

{ "key" : "sample", "pid" : 123 }

随着这个变化,fluent.conf也需要按照以下方式进行修改。

  <label @mainstream>
    <filter **>
      @type record_transformer
      <record>
        tag ${tag}
+       tag_with_pid '${tag}.${record["pid"]}'
      </record>
    </filter>
    <match **>
      @type copy
      <store>
        @type stdout
      </store>
      <store>
        @type redis_store
-       key_path tag
+       key_path tag_with_pid
        <buffer>
          flush_mode immediate
        </buffer>
      </store>
    </match>
  </label>

当这个被注册到Redis中的键带有进程ID时,之后只需使用该进程ID来确定自己进程的日志,并通过评估来判断是否实际传输了数据。

更改Redis的目标地址

似乎可以按照下面的方式修改设置,如README中所述。

  <label @mainstream>
    <filter **>
      @type record_transformer
      <record>
        tag ${tag}
      </record>
    </filter>
    <match **>
      @type copy
      <store>
        @type stdout
      </store>
      <store>
        @type redis_store
        key_path tag
+       host 10.0.0.1
+       db   11
        <buffer>
          flush_mode immediate
        </buffer>
      </store>
    </match>
  </label>

当在使用RSpec或Cucumber时的辅助工具时。

如果在实际的测试中使用时,制作出这样一款助手的话,似乎可以轻松地使用。

# frozen_string_literal: true

require 'redis'

module FluentdLogHelper

  def fetch_fluentd_log_by(tag:, pid: nil)
    redis_key = pid ? "#{tag}.#{pid}" : tag
    redis.zrange redis_key, 0, -1
  end

  def redis(options = {})
    options[:db] ||= 0
    @redis ||= Redis.new(options)
  end

end
fetch_fluentd_log_by tag: 'debug.test'
fetch_fluentd_log_by tag: 'debug.test', pid: Process.pid
bannerAds