使用【Embulk】从MySQL将数据传输到Elasticsearch
首先
在上一回,我已经把Embulk命令安装到了Mac上。
这一次,我将尝试使用Embulk将MySQL数据库中的数据传输到Elasticsearch。
环境建设
我会在Mac上构建Docker环境。
– MySQL版本为5.7
– Elasticsearch版本为7.9.0
– Kibana版本为7.9.0
Docker 组合
我准备了一个样本供参考。
version: '3.1'
services:
# MySQL
db:
image: mysql:5.7
container_name: my-example-mysql57
restart: always
environment:
MYSQL_ROOT_PASSWORD: root
MYSQL_ALLOW_EMPTY_PASSWORD: 1
TZ: "UTC"
volumes:
- ./mysql/init:/docker-entrypoint-initdb.d
- ./mysql/my.cnf:/etc/mysql/conf.d/my.cnf
- ./docker/mysql/data:/var/lib/mysql
- ./docker/mysql/log:/var/log/mysql
ports:
- 3357:3306
# Elasticsearch
elasticsearch:
image: docker.elastic.co/elasticsearch/elasticsearch-oss:7.9.0
container_name: my-example-elasticsearch79
volumes:
- ./docker/elasticsearch/data:/usr/share/elasticsearch/data
ports:
- 9279:9200
expose:
- 9300
environment:
- discovery.type=single-node
- bootstrap.memory_lock=true
- "ES_JAVA_OPTS=-Xms512m -Xmx512m"
- http.host=0.0.0.0
- transport.host=127.0.0.1
# Kibana
kibana:
image: docker.elastic.co/kibana/kibana-oss:7.9.0
container_name: my-example-kibana79
ports:
- 5679:5601
volumes:
my-example-elasticsearch79-data:
driver: local
准备转送数据源
为了验证,我准备了一个简单的表格和数据。
CREATE DATABASE myembulk DEFAULT CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci;
grant select,update,insert,delete on myembulk.* to root@localhost identified by 'root';
flush privileges;
use myembulk;
CREATE TABLE myblog (
id bigint(20) unsigned NOT NULL AUTO_INCREMENT,
title varchar(255) NOT NULL,
description text,
status tinyint(3) unsigned NOT NULL DEFAULT '0',
PRIMARY KEY (id)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
INSERT INTO myblog VALUES (1,'ブログタイトル1','ブログ本文1',1);
INSERT INTO myblog VALUES (2,'ブログタイトル2','ブログ本文2',1);
INSERT INTO myblog VALUES (3,'ブログタイトル3','ブログ本文3',1);
INSERT INTO myblog VALUES (4,'ブログタイトル4','ブログ本文4',1);
INSERT INTO myblog VALUES (5,'ブログタイトル5','ブログ本文5',1);
INSERT INTO myblog VALUES (6,'ブログタイトル6(削除済)','ブログ本文6',0);
启动Docker容器
如果能以这样的方式完成,那就是成功了。
$ docker-compose up -d
$ docker-compose ps
Name Command State Ports
-------------------------------------------------------------------------------------------------------
my-example-elasticsearch79 /tini -- /usr/local/bin/do ... Up 0.0.0.0:9279->9200/tcp, 9300/tcp
my-example-kibana79 /usr/local/bin/dumb-init - ... Up 0.0.0.0:5679->5601/tcp
my-example-mysql57 docker-entrypoint.sh mysqld Up 0.0.0.0:3357->3306/tcp, 33060/tcp
这样事先需要的准备工作已经完成了。接下来是正题:Embulk。
Embulk 寶寶熱批
安装插件
请安装一个插件,因为数据传输源是MySQL,传输目标是Elasticsearch。
$ embulk gem install embulk-input-mysql
$ embulk gem install embulk-output-elasticsearch
设定环境变量
这次我们使用 direnv 并将以下内容写入 .envrc 文件,但不限于此。
# MySQL
export DB_ADDRESS=127.0.0.1
export DB_PORT=3357
export DB_SCHEMA=myembulk
export DB_USER=root
export DB_PASSWORD=root
# Elasticsearch
export ES_ADDRESS=127.0.0.1
export ES_PORT=9279
Embulk的配置
使用Liquid模板,编写Embulk的配置文件以从MySQL将数据传输到Elasticsearch。
刚刚设置的环境变量表示为{{ env.hoge }}。
{% capture today %}{{ 'now' | date: '%Y%m%d' }}{% endcapture %}
in:
type: mysql
host: {{ env.DB_ADDRESS }}
port: {{ env.DB_PORT }}
user: {{ env.DB_USER }}
password: {{ env.DB_PASSWORD }}
database: {{ env.DB_SCHEMA }}
table: myblog
select: "id, title, description, status"
where: "status = 1"
default_column_options:
TIMESTAMP: { type: string, timestamp_format: "%Y-%m-%d %H:%M:%S"}
out:
type: elasticsearch
mode: insert
nodes:
- {host: {{ env.ES_ADDRESS }}, port: {{ env.ES_PORT }}}
index: myblog_{{ today }}
index_type: myblog
我为Embulk准备的文件就只有这一个。
运行Embulk
$ embulk run mysql2elasticsearch.yml.liquid
只要没有错误发生,就算是成功了。
确认数据传输的目的地
如果之前使用了docker-compose,你可以通过以下URL从Kibana中进行确认。
$ open http://localhost:5679

最后
这次我们尝试使用Docker进行最小数据传输。
比我想象的要简单,成功地安装了Embulk(花费了更多时间准备Docker…)
在实际生产环境中,除了环境变量外,似乎还会增加安全性和网络认证等配置,但我认为Embulk本身仍然是一个简单的结构。