使用Embulk将RDS的慢查询日志导入Elasticsearch
在Amazon RDS中,默认配置下,慢查询日志被保存在名为mysql.slow_log的表中。我们可以使用Embulk将存储在mysql.slow_log中的慢查询日志导入到Elasticsearch中进行投入。
安装所需的插件。
使用embulk mkbundle,可以将所需的插件通过Gemfile进行管理。
$ embulk mkbundle embulk-aurora-move-slowlog
$ cd embulk-move-slowlog
在Gemfile中添加所需的插件。由于本次操作是将RDS数据库中的slowlog投入到Elasticsearch中,因此在Gemfile中加入以下的描述。
source 'https://rubygems.org/'
gem 'embulk', '~> 0.8.0'
gem 'embulk-input-mysql', '~> 0.8.2'
gem 'embulk-output-elasticsearch_ruby', '~> 0.1.4'
gem 'embulk-filter-typecast', '~> 0.1.5'
gem 'embulk-filter-column', '~> 0.6.0'
完成Gemfile后,使用以下命令来安装插件。
$ embulk bundle
创建 Embulk 的配置文件
创建一个类似以下的配置文件。
in:
type: mysql
user: xxxxxx
password: yyyyy
database: mysql
table: slow_log
host: zzzzzz.ap-northeast-1.rds.amazonaws.com
select: "*"
column_options:
start_time: {type: string, timestamp_format: "%Y-%m-%d %H:%M:%S+09:00"}
incremental: true
incremental_columns: [start_time]
options:
useLegacyDatetimeCode: false
serverTimezone: UTC
filters:
- type: typecast
columns:
- {name: query_time, type: long}
- {name: lock_time, type: long}
out:
type: elasticsearch_ruby
mode: normal
nodes:
- {host: "elasticsearch.example.com", port: 9200 }
index: slow_log
index_type: log
request_timeout: 60
关于查询时间(query_time)和锁定时间(lock_time),在确认了数据库模式后,发现它们被存储为时间类型(time)。如果保持原样导入到Elasticsearch中,将无法准确传递数值,因此我使用embulk-filter-typecast将其转换为长整型(long)。另外,如果RDS的时区为JST,而embulk采用的是UTC时区,则可能导致搜索失败,或者在使用Resume功能时创建的文件时间被倒退。为了解决这个问题,我将其转换为字符串,并在column_options中添加时区+09:00,将其当作UTC处理,并在Elasticsearch的映射中进行时区调整。
Elasticsearch的映射设置
由于前述的原因,我们需要指定start_time的format与timestamp_format的输出格式相匹配。
{
"template": "slow_log-*",
"mappings": {
"log": {
"properties": {
"start_time": {
"type": "date",
"format": "yyyy-MM-dd HH:mm:ssZZ"
},
"user_host": {
"type": "string",
"index": "not_analyzed"
},
"query_time": {
"type": "long"
},
"lock_time": {
"type": "long"
},
"row_sent": {
"type": "long"
},
"rows_examined": {
"type": "long"
},
"db": {
"type": "string",
"index": "not_analyzed"
},
"last_insert_id": {
"type": "long"
},
"insert_id": {
"type": "long"
},
"server_id": {
"type": "long"
},
"sql_text": {
"type": "string",
"index": "not_analyzed"
},
"thread_id": {
"type": "long"
}
}
}
}
}
使用上述的slow_log.json作为template将其注册到Elasticsearch中。
$ curl -XPUT elasticsearch.example.com:9200/_template/slow_log -d "$(cat slow_log.json)"
以下是slow_log的数据库结构,除了将type中的query_time和lock_time设置为long类型之外,其他尽可能按照mysql的模式进行了设置。thread_id和server_id这些字段可能没有什么用途,所以也许可以将它们设置为string类型。
mysql> desc mysql.slow_log;
+----------------+---------------------+------+-----+-------------------+-----------------------------+
| Field | Type | Null | Key | Default | Extra |
+----------------+---------------------+------+-----+-------------------+-----------------------------+
| start_time | timestamp | NO | | CURRENT_TIMESTAMP | on update CURRENT_TIMESTAMP |
| user_host | mediumtext | NO | | NULL | |
| query_time | time | NO | | NULL | |
| lock_time | time | NO | | NULL | |
| rows_sent | int(11) | NO | | NULL | |
| rows_examined | int(11) | NO | | NULL | |
| db | varchar(512) | NO | | NULL | |
| last_insert_id | int(11) | NO | | NULL | |
| insert_id | int(11) | NO | | NULL | |
| server_id | int(10) unsigned | NO | | NULL | |
| sql_text | mediumtext | NO | | NULL | |
| thread_id | bigint(21) unsigned | NO | | NULL | |
+----------------+---------------------+------+-----+-------------------+-----------------------------+
12 rows in set (0.00 sec)
执行Embulk
使用以下命令进行执行:
$ embulk run config.yml -b ./
另外,由于embulk-input-mysql支持Resume功能,您可以通过执行以下命令来导入上次执行后的记录。
$ embulk run config.yml -c diff.yml -b ./
通过config.yml中的配置incremental: true和incremental_columns: [start_time],可以使得上次导入的start_time以后的数据在第二次及以后的导入中被接收。
diff.yml将输出以下内容,并且每次执行embulk时,last_record部分都将被更新。
in:
last_record: ['2017-03-29T21:00:02.000000Z']
out: {}
管理Elasticsearch的日志时间
在embulk-output-elasticsearch_ruby中,通过在索引名称中添加%Y%m%d,可以实现每天更改Elasticsearch索引名称的功能。
in:
type: mysql
...
中略
...
out:
type: elasticsearch_ruby
mode: normal
nodes:
- {host: "elasticsearch.example.com", port: 9200 }
index: slow_log-%Y%m%d
index_type: log
request_timeout: 60
通过将描述设置为类似slow_log-20170301的索引名称,可以将数据插入,并按照映射模板设置的索引名称进行分隔。这样,可以根据每天的索引名称将索引分开。通过这样做,我认为可以轻松删除一周前插入的索引。
文中提及
我参考了以下文章。非常感谢。
1. 使用Embulk将ELB的日志导入Elasticsearch的记录
2. 使用embulk mkbundle的方法
3. 整理了Fluentd的批处理版本Embulk的总结