使用【Embulk】从MySQL将数据传输到Elasticsearch

首先

在上一回,我已经把Embulk命令安装到了Mac上。
这一次,我将尝试使用Embulk将MySQL数据库中的数据传输到Elasticsearch。

环境建设

我会在Mac上构建Docker环境。
– MySQL版本为5.7
– Elasticsearch版本为7.9.0
– Kibana版本为7.9.0

Docker 组合

我准备了一个样本供参考。

version: '3.1'

services:
  # MySQL
  db:
    image: mysql:5.7
    container_name: my-example-mysql57
    restart: always
    environment:
      MYSQL_ROOT_PASSWORD: root
      MYSQL_ALLOW_EMPTY_PASSWORD: 1
      TZ: "UTC"
    volumes:
      - ./mysql/init:/docker-entrypoint-initdb.d
      - ./mysql/my.cnf:/etc/mysql/conf.d/my.cnf
      - ./docker/mysql/data:/var/lib/mysql
      - ./docker/mysql/log:/var/log/mysql
    ports:
      - 3357:3306
  # Elasticsearch
  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch-oss:7.9.0
    container_name: my-example-elasticsearch79
    volumes:
      - ./docker/elasticsearch/data:/usr/share/elasticsearch/data
    ports:
      - 9279:9200
    expose:
      - 9300
    environment:
      - discovery.type=single-node
      - bootstrap.memory_lock=true
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
      - http.host=0.0.0.0
      - transport.host=127.0.0.1
  # Kibana
  kibana:
    image: docker.elastic.co/kibana/kibana-oss:7.9.0
    container_name: my-example-kibana79
    ports:
      - 5679:5601
volumes:
  my-example-elasticsearch79-data:
    driver: local

准备转送数据源

为了验证,我准备了一个简单的表格和数据。

CREATE DATABASE myembulk DEFAULT CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci;
grant select,update,insert,delete on myembulk.* to root@localhost identified by 'root';
flush privileges;

use myembulk;
CREATE TABLE myblog (
id bigint(20) unsigned NOT NULL AUTO_INCREMENT,
title varchar(255) NOT NULL,
description text,
status tinyint(3) unsigned NOT NULL DEFAULT '0',
PRIMARY KEY (id)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

INSERT INTO myblog VALUES (1,'ブログタイトル1','ブログ本文1',1);
INSERT INTO myblog VALUES (2,'ブログタイトル2','ブログ本文2',1);
INSERT INTO myblog VALUES (3,'ブログタイトル3','ブログ本文3',1);
INSERT INTO myblog VALUES (4,'ブログタイトル4','ブログ本文4',1);
INSERT INTO myblog VALUES (5,'ブログタイトル5','ブログ本文5',1);
INSERT INTO myblog VALUES (6,'ブログタイトル6(削除済)','ブログ本文6',0);

启动Docker容器

如果能以这样的方式完成,那就是成功了。

$ docker-compose up -d

$ docker-compose ps
           Name                         Command               State                 Ports              
-------------------------------------------------------------------------------------------------------
my-example-elasticsearch79   /tini -- /usr/local/bin/do ...   Up      0.0.0.0:9279->9200/tcp, 9300/tcp 
my-example-kibana79          /usr/local/bin/dumb-init - ...   Up      0.0.0.0:5679->5601/tcp           
my-example-mysql57           docker-entrypoint.sh mysqld      Up      0.0.0.0:3357->3306/tcp, 33060/tcp

这样事先需要的准备工作已经完成了。接下来是正题:Embulk。

Embulk 寶寶熱批

安装插件

请安装一个插件,因为数据传输源是MySQL,传输目标是Elasticsearch。

$ embulk gem install embulk-input-mysql
$ embulk gem install embulk-output-elasticsearch

设定环境变量

这次我们使用 direnv 并将以下内容写入 .envrc 文件,但不限于此。

# MySQL
export DB_ADDRESS=127.0.0.1
export DB_PORT=3357
export DB_SCHEMA=myembulk
export DB_USER=root
export DB_PASSWORD=root

# Elasticsearch
export ES_ADDRESS=127.0.0.1
export ES_PORT=9279

Embulk的配置

使用Liquid模板,编写Embulk的配置文件以从MySQL将数据传输到Elasticsearch。
刚刚设置的环境变量表示为{{ env.hoge }}。

{% capture today %}{{ 'now' | date: '%Y%m%d' }}{% endcapture %}
in:
  type: mysql
  host: {{ env.DB_ADDRESS }}
  port: {{ env.DB_PORT }}
  user: {{ env.DB_USER }}
  password: {{ env.DB_PASSWORD }}
  database: {{ env.DB_SCHEMA }}
  table: myblog
  select: "id, title, description, status"
  where: "status = 1"
  default_column_options:
    TIMESTAMP: { type: string, timestamp_format: "%Y-%m-%d %H:%M:%S"}
out:
  type: elasticsearch
  mode: insert
  nodes:
  - {host: {{ env.ES_ADDRESS }}, port: {{ env.ES_PORT }}}
  index: myblog_{{ today }}
  index_type: myblog

我为Embulk准备的文件就只有这一个。

运行Embulk

$ embulk run mysql2elasticsearch.yml.liquid

只要没有错误发生,就算是成功了。

确认数据传输的目的地

如果之前使用了docker-compose,你可以通过以下URL从Kibana中进行确认。

 $ open http://localhost:5679
スクリーンショット 2020-09-04 19.01.02.png

最后

这次我们尝试使用Docker进行最小数据传输。
比我想象的要简单,成功地安装了Embulk(花费了更多时间准备Docker…)
在实际生产环境中,除了环境变量外,似乎还会增加安全性和网络认证等配置,但我认为Embulk本身仍然是一个简单的结构。

bannerAds