使用Elasticsearch构建集群并使用Spark创建索引的简单步骤

1. 摘要和动机

[背景]
我记录了在构建Elasticsearch集群并使用Spark插入数据的步骤。

动机

    • Elasticsearchが流行っているので触ってみたかった。

 

    • 機械学習の自然言語分類で教師データを作るときに、母体のデータから教師データっぽいものを持ってくるために、全文検索が使えないか検討したかった。

 

    • Elasticsearch以外の選択肢として、MySQL8の全文検索機能、MySQL+Mronngaを検討中ですが、indexの再構築の効率性とかを考えたときにスケールアウトしやすいほうがいいんじゃないかって思って検証してみようかと..

 

    母体のデータをHDFSに持っていたのと、Sparkが入れようと思っている基盤に入っているので、使えないかと思いました。

【环境信息】

    • OS: CentOS7

 

    • Elasticsearchバージョン: 6.4

 

    • Kibanaバージョン: 6.4

 

    • Sparkバージョン: 2.2.0 // バージョンは2.xであればOK

 

    • elasticsearch-sparkコネクタのバージョン: 6.3.2

 

    • クラスタ構成台数: 3台

 

    前提条件: Java8がインストールされていること
$ java -version
openjdk version "1.8.0_171"

[参考信息来源]

    • Elasticsearch:

https://www.elastic.co/guide/en/elasticsearch/reference/current/index.html

Kibana:

https://www.elastic.co/guide/en/kibana

Spark

https://docs.databricks.com/spark/latest/data-sources/elasticsearch.html
https://www.elastic.co/guide/en/elasticsearch/hadoop/master/configuration.html

2. 安装Elasticsearch

2.1. 导入GPG-KEY

rpm --import https://artifacts.elastic.co/GPG-KEY-elasticsearch

2.2. 设置yum存储库

cat <<EOF > /etc/yum.repos.d/elasticsearch.repo
[elasticsearch-6.x]
name=Elasticsearch repository for 6.x packages
baseurl=https://artifacts.elastic.co/packages/6.x/yum
gpgcheck=1
gpgkey=https://artifacts.elastic.co/GPG-KEY-elasticsearch
enabled=1
autorefresh=1
type=rpm-md
EOF

2.3. 安装 Elasticsearch

yum -y install elasticsearch

2.4. 服务的启动和自动启动设置

systemctl start elasticsearch.service
systemctl enable elasticsearch.service

2.5. 文件结构备忘录

タイプ説明デフォルトロケーションelasticsearch.ymlの設定項目homeElasticsearchのホームディレクトリ/usr/share/elasticsearch-binバイナリ系のスクリプトとpluginインストールで入るスクリプト/usr/share/elasticsearch/bin-configElasticsearchの設定ファイル/etc/elasticsearch/elasticsearch.yml-configElasticsearchの環境変数設定、ヒープサイズなど/etc/sysconfig/elasticsearch-dataデータの格納先/var/lib/elasticsearchpath.datalogsログファイルの出力先/var/log/elasticsearchpath.logspluginsプラグインの格納先/usr/share/elasticsearch/plugins

2.6. 集群配置

2.6.1. 修改设置

修改/etc/elasticsearch/elasticsearch.yml文件的设置。
修改的项目如下所示。
应对所有节点进行设置。

# クラスタ名 全ノードで揃える必要がある
cluster.name: dev-application

# ノードの名前 ここだけノードごとに変更する
node.name: node1

# listenするIP
network.host: 0.0.0.0

# HTTP接続のポート
http.port: 9200

# 新しいノードが追加されたときに、新ノードに渡すノードのリスト
# 全ノードを指定する
discovery.zen.ping.unicast.hosts: ["192.168.100.120", "192.168.100.122", "192.168.100.123"]

# マスターノードを選ぶ際の master eligible node(master候補になるノード数)
discovery.zen.minimum_master_nodes: 1

2.6.2. 完成设置后重新启动

sudo systemctl restart elasticsearch

2.6.3. 连接确认

# 設定したクラスタ名、台数で構築できているか確認できればOK
$ curl http://192.168.100.123:9200/_cluster/health?pretty
{
  "cluster_name" : "dev-application",
  "status" : "green",
  "timed_out" : false,
  "number_of_nodes" : 3,
  "number_of_data_nodes" : 3,
  "active_primary_shards" : 0,
  "active_shards" : 0,
  "relocating_shards" : 0,
  "initializing_shards" : 0,
  "unassigned_shards" : 0,
  "delayed_unassigned_shards" : 0,
  "number_of_pending_tasks" : 0,
  "number_of_in_flight_fetch" : 0,
  "task_max_waiting_in_queue_millis" : 0,
  "active_shards_percent_as_number" : 100.0
}
# 3台で構築できているか確認
# *のついているnode2がmaster
curl http://192.168.100.123:9200/_cat/nodes

192.168.100.120 33 23 0 0.01 0.04 0.05 mdi * node2
192.168.100.122 31 46 1 0.11 0.07 0.11 mdi - node3
192.168.100.123 23 97 0 0.00 0.02 0.05 mdi - node1
# masterノードの確認
curl http://192.168.100.123:9200/_cat/master

7Vr0NVHNRJ2LvkCGLJN4HA 192.168.100.120 192.168.100.120 node2

3. Kibana的安装

这是一个选项。
在任何节点上进行安装。

3.1. 导入GPG-KEY

rpm --import https://artifacts.elastic.co/GPG-KEY-elasticsearch

3.2. 设置yum仓库

cat <<EOF > /etc/yum.repos.d/kibana.repo
[kibana-6.x]
name=Kibana repository for 6.x packages
baseurl=https://artifacts.elastic.co/packages/6.x/yum
gpgcheck=1
gpgkey=https://artifacts.elastic.co/GPG-KEY-elasticsearch
enabled=1
autorefresh=1
type=rpm-md
EOF

3.3. 安装Kibana

yum -y install kibana

3.4. 服务的启动和自动启动设置

systemctl start kibana.service
systemctl enable kibana.service

3.5. 目录结构备忘录

タイプ説明デフォルトロケーションkibana.ymlの設定項目homeKibanaのホームディレクトリ/usr/share/kibana-binバイナリ系のスクリプトとpluginインストールで入るスクリプト/usr/share/kibana/bin-configKibanaの設定ファイル/etc/kibana/kibana.yml-dataデータの格納先/var/lib/kibanapath.dataoptimizeプラグインなどによってトランスコードされたソースコード。特定の管理アクション??/usr/share/kibana/optimize-pluginsプラグインの格納先/usr/share/kibana/plugins

3.6. 更改设置

更改 /etc/kibana/kibana.yml 文件中的以下部分。

# KibanaをインストールしたノードのIPを指定する
# ここがlocalhostのままだと外部から接続できない
server.host: "192.168.100.123"

# elasticsearchをインストールしたノードのどれかを指定する
elasticsearch.url: "http://192.168.100.122:9200"

重新启动。

systemctl restart kibana.service

3.7. 连接确认

只要能在 http://[安装Kibana的节点的IP]:5601/ 上连接成功就可以。

kibana_init.png

4. 安装 Spark

通过这里下载Spark。虽然我正在下载2.2版本,但我认为你应该下载当前最新版本。

cd ~
curl -O http://ftp.jaist.ac.jp/pub/apache/spark/spark-2.2.0/spark-2.2.0-bin-hadoop2.7.tgz
tar xvfz spark-2.2.0-bin-hadoop2.7.tgz

设置路径

echo 'export SPARK_HOME=$HOME/spark-2.3.0-bin-hadoop2.7' >> ~/.bash_profile
echo 'export PATH=$PATH:$SPARK_HOME/bin:$SPARK_HOME/sbin' >> ~/.bash_profile
source ~/.bash_profile

启动Spark并将数据连接到Elasticsearch。

5.1 下载elsticsearch-spark连接器的jar文件。

cd $HOME

wget http://central.maven.org/maven2/org/elasticsearch/elasticsearch-spark-20_2.11/6.3.2/elasticsearch-spark-20_2.11-6.3.2.jar

5.2. 准备测试数据

# ファイルのダウンロード
wget -O /tmp/akc_breed_info.csv https://query.data.world/s/msmjhcmdjslsvjzcaqmtreu52gkuno

# データ件数の確認
wc -l /tmp/akc_breed_info.csv
150

5.3. 使用Spark创建索引

如果存在索引,请提前删除索引。

curl -XDELETE http://192.168.100.123:9200/index/

启动spark-shell。

spark-shell --jars $HOME/elasticsearch-spark-20_2.11-6.3.2.jar
scala> val df = spark.read.option("header","true").csv("file:///tmp/akc_breed_info.csv")
df: org.apache.spark.sql.DataFrame = [Breed: string, height_low_inches: string ... 3 more fields]

scala> df.show
+--------------------+-----------------+------------------+--------------+---------------+
|               Breed|height_low_inches|height_high_inches|weight_low_lbs|weight_high_lbs|
+--------------------+-----------------+------------------+--------------+---------------+
|               Akita|               26|                28|            80|            120|
|  Anatolian Sheepdog|               27|                29|           100|            150|
|Bernese Mountain Dog|               23|                27|            85|            110|
|          Bloodhound|               24|                26|            80|            120|
|              Borzoi|               26|                28|            70|            100|
|         Bullmastiff|               25|                27|           100|            130|
|          Great Dane|               32|                32|           120|            160|
|      Great Pyrenees|               27|                32|            95|            120|
|Great Swiss Mount...|               23|                28|           130|            150|
|     Irish Wolfhound|               28|                35|            90|            150|
|              Kuvasz|               28|                30|            70|            120|
|             Mastiff|               27|                30|           175|            190|
|  Neopolitan Mastiff|               24|                30|           100|            150|
|        Newfoundland|               26|                28|           100|            150|
|         Otter Hound|               24|                26|            65|            110|
|          Rottweiler|               22|                27|            90|            110|
|       Saint Bernard|               25|                28|           110|            190|
|        Afghan Hound|               25|                27|            50|             60|
|    Alaskan Malamute|               na|                na|            na|             na|
|   American Foxhound|               22|                25|            65|             70|
+--------------------+-----------------+------------------+--------------+---------------+
only showing top 20 rows

// Elasticsearchのクラスタノードを指定します。
scala> val esURL = "192.168.100.120,192.168.100.122,192.168.100.123"
esURL: String = 192.168.100.120,192.168.100.122,192.168.100.123

scala> :paste
// Entering paste mode (ctrl-D to finish)

df.write
  .format("org.elasticsearch.spark.sql")
  .option("es.nodes.wan.only","false")
  .option("es.port","9200")
  .option("es.net.ssl","false")
  .option("es.nodes", esURL)
  .mode("Overwrite")
  .save("sample/dogs") // index: sample, type: dogsという名前で作成

// Exiting paste mode, now interpreting.

似乎是先进去了。

5.4. 确认数据投入之后

我安装了Kibana,所以连接到了Kibana的用户界面
由于Dev Tools看起来很方便,所以我从这里开始…
Dev Tools的编辑器有自动补全功能,非常易用。

[Kibanaをインストールしたノード]上的URL链接为:http://[Kibanaをインストールしたノード]:5601。

在编辑器中写下以下内容,并发出请求,可以确认数据是否已经被输入。

    全てのドキュメントを検索
GET /sample/dogs/_search
    指定した単語を含むドキュメント検索
GET /sample/dogs/_search
{
  "query": {
    "match": {
      "Breed": "Rough"
    }
  }
}
elastic_query_search.png

待办事项(稍后验证)

    • 日本語形態素解析(MeCab, Kuromoji)の設定調査。

 

    MySQLサーバとの連携(今回はSparkと連携したが、データソースがMySQLにもあるので連携できないか調査する)
bannerAds