在elasticsearch-hadoop上尝试使用Spark
这篇文章是2014年Elasticsearch Advent Calendar第20天的帖子。
这个话题很疯狂,但我会立刻开始。
请总结一下。
似乎在elasticsearch的主页上,除了Kibana和Logstash等工具之外,还有一个名为elasticsearch-hadoop的产品。
当我看到这个名字时,我的脑海里响起了一个警告的声音,但在接触过程中我逐渐意识到这只是杞人忧天,结果变得越来越有趣了。因此,我想介绍给大家。
elasticsearch-hadoop 是什么?
这个项目的正式名称是「Elasticsearch for Apache Hadoop」。但是,通常被称为elasticsearch-hadoop。
有可能的事情
-
- Spark、Hive、PigなどからHDFSであるかのようにESをストレージとして扱う
Map/Reduceのタスク分散処理をESのノードで実現する 追記
elaticsearch on yarn というYARN上でESを動かすライブラリも含まれている
等等。
在ES节点上执行Map/Reduce的任务分布式处理,似乎需要在Hadoop端将相同的节点指定为TaskTracker,因此elasticsearch-hadoop无法完成该任务,因此取消此操作。
看起来,这些与Apache Hadoop项目相关联的库被称为elasticsearch-hadoop,将它们汇总在一起。
源代码
该项目在Github上以开源方式公开。
截至2014年12月,其稳定版本为v2.0,beta版本为v2.1。
具体信息请参见https://github.com/elasticsearch/elasticsearch-hadoop。
-
- コミット数は1000超(ES本体は10000超、kibanaやlogstashは6000超なので規模は1/10?)
-
- issuesは300, PRは50程度
- costinさんが頑張って作ってる

特点
替代HDFS的映像

举例来说,如果以最左边的MapReduce为例,根据上图所示,它通过处理输入数据的抽象类InputFormat与HDFS进行通信。
而在下图中,通过继承InputFormat类的ESInputFormat类,
将InputFormat原本执行的”输入数据的划分(InputSplit)”和”记录读取器生成(RecodeReader)”的功能对应到了ES上并进行了实现。
通过继承每个项目对HDFS的访问部分,可以实现替代HDFS的功能,这是它的特点。
虽然图画有点令人困惑,但elasticsearch-hadoop是与Hadoop关联的库,而不是在Hadoop上运行的应用程序,请注意。
第二部分:MapReduce和Elasticsearch的分片
在文件中,自夸地表示Map/Reduce的分布特性与ES的分片关系很好地配合。

EsInputFormat从ES获取主分片信息,并生成ShardInputSplit来定义每个分片持有的已分割文档作为输入值。
为了避免数据重复,不使用副本。
Hadoop会根据此生成分割后的Map任务,所以在elasticsearch-hadoop中,shard数=Map任务数。
(这是符合的!)
从ES返回的JSON经过Map<docid, JSON>转化成ShardRecodeReader,为之后的shuffle做准备。
在Map函数后的处理会变成熟悉的MapReduce。
只需要一个选择,用中文本地化地改述以下内容:
向ES的问题查询是通过REST
在InputSplit中获取ES的主分片信息
我将以下的REST请求发送到指定了${es.nodes}的ES。
REST1: 通过使用HTTP获取节点
查询指定的 Elasticsearch 节点的 HTTP 信息。
可用的选项:
curl {es.node}/_nodes/http
响应摘要
-
- cluster_name
-
- nodes
node
name
transport_address
host
ip
version
build
http_address
http
获取分片的方法:使用REST2中的index/_search_shards。
请使用以下的方式来改写这句话:
以本机的方式在中文中改写以下句子:curl ${es.node}/*/_search_shards
回应摘要
-
- shards
shard
state
primary
node
relocating_node
shard
index
nodes
node
在RecodeReader中向ES发出的查询请求。
使用REST3的_search api来获取文档。
使用原始的中文翻译如下:
使用cURL进行XPOST请求,目标链接为${SHARD_HOST}/${INDEX}/_search/scroll,请求参数为”${QUERY}”。
-
- params
search_type=scan
scroll=keepalive(default 10m)
size=(default 50)
preference=_shards:shard;_only_node:node
SHARD_HOST变量是获取到的分片主机,INDEX和QUERY变量是在作业执行前通过Configuration预先指定的(后面会详细说明)。
elasticsearch-hadoop会逐步地通过scan and scroll方式读取给定的查询。由于在文档中找不到相关信息,所以我读了源代码。
使用_scroll_id来获取所需大小的数据,当数据不够时,通过_scroll_id获取剩余的数据。
以自然流利的中文重新表达该句:
通过XPOST方法向${SHARD_HOST}/${INDEX}/_search/scroll发送请求,并带上参数”${scrollId}”。
-
- params
scroll=keepalive(default 10m)
这实际上是一个处理,用于获取与查询匹配的所有文档,即使在需要深度滚动的大型数据集中,使用elasticsearch-hadoop只需指定查询即可实现全量获取。
扫描和滚动是一种用于获取与滚动开始时内容相一致的搜索类型,即使在滚动过程中进行了更新。它通常用于重新索引等操作(由于负载较高,不能用于实时搜索)。
此外,我的看法是
-
- HDFSと比べてREST分のオーバーヘッドがかかるので、単純に同じドキュメントサイズだとHDFSと同パフォーマンスはでないかも
-
- elasticsearch-hadoopが得意とするのは、バッチ処理の中でデータを検索してドキュメントサイズを一気に減らせるようなケースや、
-
- 更新がかかりにくくてキャッシュヒット率の高いデータを扱うケースだと思う
- 扱うデータを選べばHDFS以上のパフォーマンスを発揮してくれそうな気配を感じる(実証なし)
让它动起来试试
one possible option for paraphrasing “目的” in Chinese is “目标” , which means “goal” or “objective.”
假设有这样的情况,我们将使用elasticsearch-hadoop来创建Spark RDD,将其作为教程的一部分。
-
- kibana格好いいから、とりあえずアクセスログを突っ込んだ
-
- しばらくして、kibanaでは出来ないような複雑な解析もしたくなった
- もちろんHadoopなんてない。ログはESにしか入ってない(しかも一ヶ月で吐き捨て)
如果有一个月的时间,用Spark的短批处理似乎可以完全运转。我们试着连接吧。
把所需之物放入
为了模拟环境,我们将记录访问日志。
下载
cd ~/es-advent-calendar
wget https://download.elasticsearch.org/elasticsearch/elasticsearch/elasticsearch-1.4.2.tar.gz
wget https://download.elasticsearch.org/logstash/logstash/logstash-1.4.2.tar.gz
wget http://d3kbcqa49mib13.cloudfront.net/spark-1.1.0.tgz
tar -zxvf elasticsearch-1.4.2.tar.gz
tar -zxvf logstash-1.4.2.tar.gz
tar zxvf spark-1.1.0.tgz
安装Elasticsearch。
echo '
################################### My Configuration ###################################
cluster.name: logs_cluster
index.number_of_replicas: 0
' >> config/elasticsearch.yml
mkdir config/templates && echo '
{
"template": "access_log-*",
"mappings": {
"_default_": {
"dynamic_templates": [
{
"string_template" : {
"match" : "*",
"match_mapping_type" : "string",
"mapping": {
"type": "string",
"index": "not_analyzed"
}
}
}
],
"properties": {
"request": {
"type": "multi_field",
"fields": {
"request": {
"type": "string",
"index" : "not_analyzed"
},
"split_request": {
"type": "string",
"index" : "analyzed"
}
}
},
"referrer": {
"type": "multi_field",
"fields": {
"referrer": {
"type": "string",
"index" : "not_analyzed"
},
"split_referrer": {
"type": "string",
"index" : "analyzed"
}
}
},
"response": {
"type": "integer"
},
"bytes": {
"type": "integer"
}
}
}
}
}
' > config/templates/access_log_template.json
bin/elasticsearch
安装 Logstash
cd ./logstash-1.4.2
mkdir config && echo '
input {
file {
path => "~/es-advent-calendar/demo-access-logs/access_log-*"
start_position => "beginning"
}
}
filter {
grok {
match => { "message" => "%{COMBINEDAPACHELOG}" }
break_on_match => false
tag_on_failure => ["_message_parse_failure"]
}
date {
match => ["timestamp", "dd/MMM/YYYY:HH:mm:ss Z"]
locale => en
}
grok {
match => { "request" => "^/%{WORD:first_path}/%{GREEDYDATA}$" }
tag_on_failure => ["_request_parse_failure"]
}
useragent {
source => "agent"
target => "useragent"
}
}
output {
elasticsearch {
host => "localhost"
index => "access_log-%{+YYYY.MM.dd}"
cluster => "logs_cluster"
protocol => "http"
}
}
' > config/logstash.conf
bin/logstash --config config/logstash.conf
安装 Spark
cd spark-1.1.0
mvn -DskipTests clean package
制作Java应用程序
您可以使用Maven来进行elasticsearch-hadoop和spark的版本控制。
由于源代码只有这一部分,所以没有上传至Github。
pom.xml摘录
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch-hadoop</artifactId>
<version>${eshadoop.version}</version>
</dependency>
你好,ElasticsearchSpark.java
package org.jtodo.demo;
import org.apache.hadoop.conf.Configuration;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.JavaPairRDD;
import org.elasticsearch.hadoop.mr.EsInputFormat;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.MapWritable;
public class HelloElasticsearchSpark {
public static void main(String[] args) {
Configuration conf = new Configuration(); ①
conf.set("es.nodes", "localhost:9200");
conf.set("es.resource", "access_log-2014.12.20/logs");
conf.set("es.query", "{\"query\": {\"term\": {\"first_path\": \"action\"}}}");
SparkConf sparkConf = new SparkConf().setAppName("HelloElasticsearchSpark");
sparkConf.set("spark.serializer", org.apache.spark.serializer.KryoSerializer.class.getName());
JavaSparkContext sc = new JavaSparkContext(sparkConf);
JavaPairRDD<Text, MapWritable> esRDD =
sc.newAPIHadoopRDD(conf, EsInputFormat.class, Text.class, MapWritable.class); ②
System.out.println("Count of records founds is " + esRDD.count());
}
}
要点1:配置
es.nodes(默认为localhost)
指定ES。 不是列举集群中的所有节点,而是在想要对多个集群执行相同查询时列举节点。
当未在”nodes”指定时,将使用默认端口号es.port (默认为9200)。
es.resource是ES _search API的终点。可以通过index/type进行指定,可以使用通配符。
es.query可以通过URI、Query dsl或外部文件进行选择。
URI(或参数)查询 http://www.elasticsearch.org/guide/en/elasticsearch/reference/current/search
uri-request.html
es.query = ?q=costinl
查询dsl的文档位于http://www.elasticsearch.org/guide/en/elasticsearch/reference/current/search-request-body.html
es.query = { "query" : { "term" : { "user" : "costinl" } } }
外部资源
es.query = org/mypackage/myquery.json
等等。http://www.elasticsearch.org/guide/en/elasticsearch/hadoop/current/configuration.html
第二点,EsInputFormat。
只要在InputFormat类中使用elasticsearch-hadoop的EsInputFormat,就可以直接重用现有的用于HDFS的Spark驱动程序,只要这部分没有错误。
执行
maven package
cd ~/es-advent-calendar/spark-1.1.0
bin/spark-submit --class org.jtodo.demo.HelloElasticsearchSpark --master local[5] ~/es-advent-calendar/demo-elasticsearch-spark/target/demo-elasticsearch-spark-0.0.1-SNAPSHOT-jar-with-dependencies.jar
结果
如果不经过考虑,只使用ES,number_of_shards将保持默认状态,每个索引将生成5个分片。
14/12/07 21:31:59 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, ANY, 9895 bytes)
14/12/07 21:31:59 INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID 1, localhost, ANY, 9895 bytes)
14/12/07 21:31:59 INFO TaskSetManager: Starting task 2.0 in stage 0.0 (TID 2, localhost, ANY, 9895 bytes)
14/12/07 21:31:59 INFO TaskSetManager: Starting task 3.0 in stage 0.0 (TID 3, localhost, ANY, 9895 bytes)
14/12/07 21:31:59 INFO TaskSetManager: Starting task 4.0 in stage 0.0 (TID 4, localhost, ANY, 9895 bytes)
14/12/07 21:31:59 INFO Executor: Running task 1.0 in stage 0.0 (TID 1)
14/12/07 21:31:59 INFO Executor: Running task 3.0 in stage 0.0 (TID 3)
14/12/07 21:31:59 INFO Executor: Running task 4.0 in stage 0.0 (TID 4)
14/12/07 21:31:59 INFO Executor: Running task 2.0 in stage 0.0 (TID 2)
14/12/07 21:31:59 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
14/12/07 21:31:59 INFO Executor: Fetching http://localhost:57392/jars/demo-elasticsearch-spark-0.0.1-SNAPSHOT-jar-with-dependencies.jar with timestamp 1417955518911
14/12/07 21:31:59 INFO Utils: Fetching http://localhost:57392/jars/demo-elasticsearch-spark-0.0.1-SNAPSHOT-jar-with-dependencies.jar to /tmp/fetchFileTemp3143049760330881914.tmp
14/12/07 21:31:59 INFO Executor: Adding file:/tmp/spark-f0cec1a7-6496-47a9-bc7b-5f7ad126a3b5/demo-elasticsearch-spark-0.0.1-SNAPSHOT-jar-with-dependencies.jar to class loader
14/12/07 21:31:59 INFO NewHadoopRDD: Input split: ShardInputSplit [node=[I9ed1kejTR6aLUTuS55YKw/Jerome Beechman|127.0.0.1:9200],shard=1]
14/12/07 21:31:59 INFO NewHadoopRDD: Input split: ShardInputSplit [node=[I9ed1kejTR6aLUTuS55YKw/Jerome Beechman|127.0.0.1:9200],shard=2]
14/12/07 21:31:59 INFO NewHadoopRDD: Input split: ShardInputSplit [node=[I9ed1kejTR6aLUTuS55YKw/Jerome Beechman|127.0.0.1:9200],shard=3]
14/12/07 21:31:59 INFO NewHadoopRDD: Input split: ShardInputSplit [node=[I9ed1kejTR6aLUTuS55YKw/Jerome Beechman|127.0.0.1:9200],shard=0]
14/12/07 21:31:59 INFO NewHadoopRDD: Input split: ShardInputSplit [node=[I9ed1kejTR6aLUTuS55YKw/Jerome Beechman|127.0.0.1:9200],shard=4]
14/12/07 21:31:59 WARN EsInputFormat: Cannot determine task id...
14/12/07 21:31:59 WARN EsInputFormat: Cannot determine task id...
14/12/07 21:31:59 WARN EsInputFormat: Cannot determine task id...
14/12/07 21:31:59 WARN EsInputFormat: Cannot determine task id...
14/12/07 21:32:00 WARN EsInputFormat: Cannot determine task id...
14/12/07 21:32:00 INFO Executor: Finished task 1.0 in stage 0.0 (TID 1). 1651 bytes result sent to driver
14/12/07 21:32:00 INFO Executor: Finished task 3.0 in stage 0.0 (TID 3). 1651 bytes result sent to driver
14/12/07 21:32:00 INFO Executor: Finished task 4.0 in stage 0.0 (TID 4). 1651 bytes result sent to driver
14/12/07 21:32:00 INFO Executor: Finished task 2.0 in stage 0.0 (TID 2). 1651 bytes result sent to driver
14/12/07 21:32:00 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 1651 bytes result sent to driver
14/12/07 21:32:00 INFO TaskSetManager: Finished task 4.0 in stage 0.0 (TID 4) in 919 ms on localhost (1/5)
14/12/07 21:32:00 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 939 ms on localhost (2/5)
14/12/07 21:32:00 INFO TaskSetManager: Finished task 1.0 in stage 0.0 (TID 1) in 939 ms on localhost (3/5)
14/12/07 21:32:00 INFO TaskSetManager: Finished task 2.0 in stage 0.0 (TID 2) in 944 ms on localhost (4/5)
14/12/07 21:32:00 INFO TaskSetManager: Finished task 3.0 in stage 0.0 (TID 3) in 950 ms on localhost (5/5)
14/12/07 21:32:00 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
14/12/07 21:32:00 INFO DAGScheduler: Stage 0 (count at HelloElasticsearchSpark.java:25) finished in 0.977 s
14/12/07 21:32:00 INFO SparkContext: Job finished: count at HelloElasticsearchSpark.java:25, took 1.08602996 s
Count of records founds is 147
五个任务分别针对每个分片执行。
总结
只需在Spark的驱动程序中指定EsInputFormat,就可以创建来自elasticsearch的RDD,并且还配备了先进的搜索功能。当日志增加时,可以增加number_of_shards进行扩容,还可以完全利用elasticsearch的专业知识,非常好啊。
多亏了elasticsearch-hadoop,年底时可以用Spark来玩耍。
非常感谢您阅读到这里。
接下来的文章是关于@snuffkin的。