使用Embulk将RDS的慢查询日志导入Elasticsearch

在Amazon RDS中,默认配置下,慢查询日志被保存在名为mysql.slow_log的表中。我们可以使用Embulk将存储在mysql.slow_log中的慢查询日志导入到Elasticsearch中进行投入。

安装所需的插件。

使用embulk mkbundle,可以将所需的插件通过Gemfile进行管理。

$ embulk mkbundle embulk-aurora-move-slowlog
$ cd embulk-move-slowlog

在Gemfile中添加所需的插件。由于本次操作是将RDS数据库中的slowlog投入到Elasticsearch中,因此在Gemfile中加入以下的描述。

source 'https://rubygems.org/'
gem 'embulk', '~> 0.8.0'
gem 'embulk-input-mysql', '~> 0.8.2'
gem 'embulk-output-elasticsearch_ruby', '~> 0.1.4'
gem 'embulk-filter-typecast', '~> 0.1.5'
gem 'embulk-filter-column', '~> 0.6.0'

完成Gemfile后,使用以下命令来安装插件。

$ embulk bundle

创建 Embulk 的配置文件

创建一个类似以下的配置文件。

in:
  type: mysql
  user: xxxxxx
  password: yyyyy
  database: mysql
  table: slow_log
  host: zzzzzz.ap-northeast-1.rds.amazonaws.com
  select: "*"
  column_options:
    start_time: {type: string, timestamp_format: "%Y-%m-%d %H:%M:%S+09:00"}
  incremental: true
  incremental_columns: [start_time]
  options:
    useLegacyDatetimeCode: false
    serverTimezone: UTC
filters:
  - type: typecast
    columns:
      - {name: query_time, type: long}
      - {name: lock_time, type: long}
out:
  type: elasticsearch_ruby
  mode: normal
  nodes:
    - {host: "elasticsearch.example.com", port: 9200 }
  index: slow_log
  index_type: log
  request_timeout: 60

关于查询时间(query_time)和锁定时间(lock_time),在确认了数据库模式后,发现它们被存储为时间类型(time)。如果保持原样导入到Elasticsearch中,将无法准确传递数值,因此我使用embulk-filter-typecast将其转换为长整型(long)。另外,如果RDS的时区为JST,而embulk采用的是UTC时区,则可能导致搜索失败,或者在使用Resume功能时创建的文件时间被倒退。为了解决这个问题,我将其转换为字符串,并在column_options中添加时区+09:00,将其当作UTC处理,并在Elasticsearch的映射中进行时区调整。

Elasticsearch的映射设置

由于前述的原因,我们需要指定start_time的format与timestamp_format的输出格式相匹配。

{
  "template": "slow_log-*",
  "mappings": {
    "log": {
      "properties": {
        "start_time": {
          "type": "date",
          "format": "yyyy-MM-dd HH:mm:ssZZ"
        },
        "user_host": {
          "type": "string",
          "index": "not_analyzed"
        },
        "query_time": {
          "type": "long"
        },
        "lock_time": {
          "type": "long"
        },
        "row_sent": {
          "type": "long"
        },
        "rows_examined": {
          "type": "long"
        },
        "db": {
          "type": "string",
          "index": "not_analyzed"
        },
        "last_insert_id": {
          "type": "long"
        },
        "insert_id": {
          "type": "long"
        },
        "server_id": {
          "type": "long"
        },
        "sql_text": {
          "type": "string",
          "index": "not_analyzed"
        },
        "thread_id": {
          "type": "long"
        }
      }
    }
  }
}

使用上述的slow_log.json作为template将其注册到Elasticsearch中。

$ curl -XPUT elasticsearch.example.com:9200/_template/slow_log -d "$(cat slow_log.json)"

以下是slow_log的数据库结构,除了将type中的query_time和lock_time设置为long类型之外,其他尽可能按照mysql的模式进行了设置。thread_id和server_id这些字段可能没有什么用途,所以也许可以将它们设置为string类型。

mysql> desc mysql.slow_log;
+----------------+---------------------+------+-----+-------------------+-----------------------------+
| Field          | Type                | Null | Key | Default           | Extra                       |
+----------------+---------------------+------+-----+-------------------+-----------------------------+
| start_time     | timestamp           | NO   |     | CURRENT_TIMESTAMP | on update CURRENT_TIMESTAMP |
| user_host      | mediumtext          | NO   |     | NULL              |                             |
| query_time     | time                | NO   |     | NULL              |                             |
| lock_time      | time                | NO   |     | NULL              |                             |
| rows_sent      | int(11)             | NO   |     | NULL              |                             |
| rows_examined  | int(11)             | NO   |     | NULL              |                             |
| db             | varchar(512)        | NO   |     | NULL              |                             |
| last_insert_id | int(11)             | NO   |     | NULL              |                             |
| insert_id      | int(11)             | NO   |     | NULL              |                             |
| server_id      | int(10) unsigned    | NO   |     | NULL              |                             |
| sql_text       | mediumtext          | NO   |     | NULL              |                             |
| thread_id      | bigint(21) unsigned | NO   |     | NULL              |                             |
+----------------+---------------------+------+-----+-------------------+-----------------------------+
12 rows in set (0.00 sec)

执行Embulk

使用以下命令进行执行:

$ embulk run config.yml -b ./

另外,由于embulk-input-mysql支持Resume功能,您可以通过执行以下命令来导入上次执行后的记录。

$ embulk run config.yml -c diff.yml -b ./

通过config.yml中的配置incremental: true和incremental_columns: [start_time],可以使得上次导入的start_time以后的数据在第二次及以后的导入中被接收。
diff.yml将输出以下内容,并且每次执行embulk时,last_record部分都将被更新。

in:
  last_record: ['2017-03-29T21:00:02.000000Z']
out: {}

管理Elasticsearch的日志时间

在embulk-output-elasticsearch_ruby中,通过在索引名称中添加%Y%m%d,可以实现每天更改Elasticsearch索引名称的功能。

in:
 type: mysql
...
中略
...
out:
 type: elasticsearch_ruby
 mode: normal
 nodes:
 - {host: "elasticsearch.example.com", port: 9200 }
 index: slow_log-%Y%m%d
 index_type: log
 request_timeout: 60

通过将描述设置为类似slow_log-20170301的索引名称,可以将数据插入,并按照映射模板设置的索引名称进行分隔。这样,可以根据每天的索引名称将索引分开。通过这样做,我认为可以轻松删除一周前插入的索引。

文中提及

我参考了以下文章。非常感谢。
1. 使用Embulk将ELB的日志导入Elasticsearch的记录
2. 使用embulk mkbundle的方法
3. 整理了Fluentd的批处理版本Embulk的总结

广告
将在 10 秒后关闭
bannerAds