在elasticsearch-hadoop上尝试使用Spark

这篇文章是2014年Elasticsearch Advent Calendar第20天的帖子。

这个话题很疯狂,但我会立刻开始。

请总结一下。

似乎在elasticsearch的主页上,除了Kibana和Logstash等工具之外,还有一个名为elasticsearch-hadoop的产品。

当我看到这个名字时,我的脑海里响起了一个警告的声音,但在接触过程中我逐渐意识到这只是杞人忧天,结果变得越来越有趣了。因此,我想介绍给大家。

elasticsearch-hadoop 是什么?

这个项目的正式名称是「Elasticsearch for Apache Hadoop」。但是,通常被称为elasticsearch-hadoop。

有可能的事情

    • Spark、Hive、PigなどからHDFSであるかのようにESをストレージとして扱う

Map/Reduceのタスク分散処理をESのノードで実現する 追記

elaticsearch on yarn というYARN上でESを動かすライブラリも含まれている

等等。

在ES节点上执行Map/Reduce的任务分布式处理,似乎需要在Hadoop端将相同的节点指定为TaskTracker,因此elasticsearch-hadoop无法完成该任务,因此取消此操作。

看起来,这些与Apache Hadoop项目相关联的库被称为elasticsearch-hadoop,将它们汇总在一起。

源代码

该项目在Github上以开源方式公开。
截至2014年12月,其稳定版本为v2.0,beta版本为v2.1。
具体信息请参见https://github.com/elasticsearch/elasticsearch-hadoop。

    • コミット数は1000超(ES本体は10000超、kibanaやlogstashは6000超なので規模は1/10?)

 

    • issuesは300, PRは50程度

 

    costinさんが頑張って作ってる
image-es-advent-calendar1.png

特点

替代HDFS的映像

image-es-advent-calendar2.png

举例来说,如果以最左边的MapReduce为例,根据上图所示,它通过处理输入数据的抽象类InputFormat与HDFS进行通信。
而在下图中,通过继承InputFormat类的ESInputFormat类,
将InputFormat原本执行的”输入数据的划分(InputSplit)”和”记录读取器生成(RecodeReader)”的功能对应到了ES上并进行了实现。

通过继承每个项目对HDFS的访问部分,可以实现替代HDFS的功能,这是它的特点。

虽然图画有点令人困惑,但elasticsearch-hadoop是与Hadoop关联的库,而不是在Hadoop上运行的应用程序,请注意。

第二部分:MapReduce和Elasticsearch的分片

在文件中,自夸地表示Map/Reduce的分布特性与ES的分片关系很好地配合。

image-es-advent-calendar3.png

EsInputFormat从ES获取主分片信息,并生成ShardInputSplit来定义每个分片持有的已分割文档作为输入值。
为了避免数据重复,不使用副本。
Hadoop会根据此生成分割后的Map任务,所以在elasticsearch-hadoop中,shard数=Map任务数。
(这是符合的!)

从ES返回的JSON经过Map<docid, JSON>转化成ShardRecodeReader,为之后的shuffle做准备。
在Map函数后的处理会变成熟悉的MapReduce。

只需要一个选择,用中文本地化地改述以下内容:
向ES的问题查询是通过REST

在InputSplit中获取ES的主分片信息

我将以下的REST请求发送到指定了${es.nodes}的ES。

REST1: 通过使用HTTP获取节点

查询指定的 Elasticsearch 节点的 HTTP 信息。

可用的选项:
curl {es.node}/_nodes/http

响应摘要

    • cluster_name

 

    • nodes

node

name
transport_address
host
ip
version
build
http_address
http

获取分片的方法:使用REST2中的index/_search_shards。

请使用以下的方式来改写这句话:

以本机的方式在中文中改写以下句子:curl ${es.node}/*/_search_shards

回应摘要

    • shards

shard

state
primary
node
relocating_node
shard
index

nodes

node

在RecodeReader中向ES发出的查询请求。

使用REST3的_search api来获取文档。

使用原始的中文翻译如下:
使用cURL进行XPOST请求,目标链接为${SHARD_HOST}/${INDEX}/_search/scroll,请求参数为”${QUERY}”。

    • params

search_type=scan
scroll=keepalive(default 10m)
size=(default 50)
preference=_shards:shard;_only_node:node

SHARD_HOST变量是获取到的分片主机,INDEX和QUERY变量是在作业执行前通过Configuration预先指定的(后面会详细说明)。
elasticsearch-hadoop会逐步地通过scan and scroll方式读取给定的查询。由于在文档中找不到相关信息,所以我读了源代码。
使用_scroll_id来获取所需大小的数据,当数据不够时,通过_scroll_id获取剩余的数据。

以自然流利的中文重新表达该句:
通过XPOST方法向${SHARD_HOST}/${INDEX}/_search/scroll发送请求,并带上参数”${scrollId}”。

    • params

scroll=keepalive(default 10m)

这实际上是一个处理,用于获取与查询匹配的所有文档,即使在需要深度滚动的大型数据集中,使用elasticsearch-hadoop只需指定查询即可实现全量获取。

扫描和滚动是一种用于获取与滚动开始时内容相一致的搜索类型,即使在滚动过程中进行了更新。它通常用于重新索引等操作(由于负载较高,不能用于实时搜索)。

此外,我的看法是

    • HDFSと比べてREST分のオーバーヘッドがかかるので、単純に同じドキュメントサイズだとHDFSと同パフォーマンスはでないかも

 

    • elasticsearch-hadoopが得意とするのは、バッチ処理の中でデータを検索してドキュメントサイズを一気に減らせるようなケースや、

 

    • 更新がかかりにくくてキャッシュヒット率の高いデータを扱うケースだと思う

 

    扱うデータを選べばHDFS以上のパフォーマンスを発揮してくれそうな気配を感じる(実証なし)

让它动起来试试

one possible option for paraphrasing “目的” in Chinese is “目标” , which means “goal” or “objective.”

假设有这样的情况,我们将使用elasticsearch-hadoop来创建Spark RDD,将其作为教程的一部分。

    • kibana格好いいから、とりあえずアクセスログを突っ込んだ

 

    • しばらくして、kibanaでは出来ないような複雑な解析もしたくなった

 

    もちろんHadoopなんてない。ログはESにしか入ってない(しかも一ヶ月で吐き捨て)

如果有一个月的时间,用Spark的短批处理似乎可以完全运转。我们试着连接吧。

把所需之物放入

为了模拟环境,我们将记录访问日志。

入れるものバージョンelasticsearch1.4.2logstash1.4.2spark1.1.0

下载

cd ~/es-advent-calendar

wget https://download.elasticsearch.org/elasticsearch/elasticsearch/elasticsearch-1.4.2.tar.gz
wget https://download.elasticsearch.org/logstash/logstash/logstash-1.4.2.tar.gz
wget http://d3kbcqa49mib13.cloudfront.net/spark-1.1.0.tgz

tar -zxvf elasticsearch-1.4.2.tar.gz 
tar -zxvf logstash-1.4.2.tar.gz 
tar zxvf spark-1.1.0.tgz

安装Elasticsearch。

echo '
################################### My Configuration ###################################

cluster.name: logs_cluster
index.number_of_replicas: 0
' >> config/elasticsearch.yml

mkdir config/templates && echo '
{
  "template": "access_log-*",
  "mappings": {
    "_default_": {
      "dynamic_templates": [
        {
          "string_template" : {
            "match" : "*",
            "match_mapping_type" : "string",
            "mapping": {
              "type": "string",
              "index": "not_analyzed"
            }
          }
        }
      ],
      "properties": {
        "request": {
          "type": "multi_field",
          "fields": {
            "request": {
              "type": "string",
              "index" : "not_analyzed"
            },
            "split_request": {
              "type": "string",
              "index" : "analyzed"
            }
          }
        },
        "referrer": {
          "type": "multi_field",
          "fields": {
            "referrer": {
              "type": "string",
              "index" : "not_analyzed"
            },
            "split_referrer": {
              "type": "string",
              "index" : "analyzed"
            }
          }
        },
        "response": {
          "type": "integer"
        },
        "bytes": {
          "type": "integer"
        }
      }
    }
  }
}
' > config/templates/access_log_template.json

bin/elasticsearch

安装 Logstash

cd ./logstash-1.4.2

mkdir config && echo '
input {
  file {
    path => "~/es-advent-calendar/demo-access-logs/access_log-*"
    start_position => "beginning"
  }
}

filter {
  grok {
    match => { "message" => "%{COMBINEDAPACHELOG}" }
    break_on_match => false
    tag_on_failure => ["_message_parse_failure"]
  }
  date {
    match => ["timestamp", "dd/MMM/YYYY:HH:mm:ss Z"]
    locale => en
  }
  grok {
    match => { "request" => "^/%{WORD:first_path}/%{GREEDYDATA}$" }
    tag_on_failure => ["_request_parse_failure"]
  }
  useragent {
    source => "agent"
    target => "useragent"
  }
}

output {
  elasticsearch {
    host => "localhost"
    index => "access_log-%{+YYYY.MM.dd}"
    cluster => "logs_cluster"
    protocol => "http"
  }
}
' > config/logstash.conf

bin/logstash --config config/logstash.conf

安装 Spark

cd spark-1.1.0
mvn -DskipTests clean package

制作Java应用程序

您可以使用Maven来进行elasticsearch-hadoop和spark的版本控制。

由于源代码只有这一部分,所以没有上传至Github。

pom.xml摘录

    <dependency>
      <groupId>org.elasticsearch</groupId>
      <artifactId>elasticsearch-hadoop</artifactId>
      <version>${eshadoop.version}</version>
    </dependency>

你好,ElasticsearchSpark.java

package org.jtodo.demo;

import org.apache.hadoop.conf.Configuration;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.JavaPairRDD;
import org.elasticsearch.hadoop.mr.EsInputFormat;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.MapWritable;

public class HelloElasticsearchSpark {

    public static void main(String[] args) {
        Configuration conf = new Configuration(); ①
        conf.set("es.nodes", "localhost:9200");
        conf.set("es.resource", "access_log-2014.12.20/logs");
        conf.set("es.query", "{\"query\": {\"term\": {\"first_path\": \"action\"}}}");

        SparkConf sparkConf = new SparkConf().setAppName("HelloElasticsearchSpark");
        sparkConf.set("spark.serializer", org.apache.spark.serializer.KryoSerializer.class.getName());

        JavaSparkContext sc = new JavaSparkContext(sparkConf);
        JavaPairRDD<Text, MapWritable> esRDD = 
            sc.newAPIHadoopRDD(conf, EsInputFormat.class, Text.class, MapWritable.class); ②
        System.out.println("Count of records founds is " + esRDD.count());
    }
}

要点1:配置

es.nodes(默认为localhost)
指定ES。 不是列举集群中的所有节点,而是在想要对多个集群执行相同查询时列举节点。

当未在”nodes”指定时,将使用默认端口号es.port (默认为9200)。

es.resource是ES _search API的终点。可以通过index/type进行指定,可以使用通配符。

es.query可以通过URI、Query dsl或外部文件进行选择。

URI(或参数)查询 http://www.elasticsearch.org/guide/en/elasticsearch/reference/current/search

uri-request.html
es.query = ?q=costinl

查询dsl的文档位于http://www.elasticsearch.org/guide/en/elasticsearch/reference/current/search-request-body.html

es.query = { "query" : { "term" : { "user" : "costinl" } } }

外部资源

es.query = org/mypackage/myquery.json

等等。http://www.elasticsearch.org/guide/en/elasticsearch/hadoop/current/configuration.html

第二点,EsInputFormat。

只要在InputFormat类中使用elasticsearch-hadoop的EsInputFormat,就可以直接重用现有的用于HDFS的Spark驱动程序,只要这部分没有错误。

执行

maven package

cd ~/es-advent-calendar/spark-1.1.0
bin/spark-submit --class org.jtodo.demo.HelloElasticsearchSpark --master local[5] ~/es-advent-calendar/demo-elasticsearch-spark/target/demo-elasticsearch-spark-0.0.1-SNAPSHOT-jar-with-dependencies.jar

结果

如果不经过考虑,只使用ES,number_of_shards将保持默认状态,每个索引将生成5个分片。

14/12/07 21:31:59 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, ANY, 9895 bytes)
14/12/07 21:31:59 INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID 1, localhost, ANY, 9895 bytes)
14/12/07 21:31:59 INFO TaskSetManager: Starting task 2.0 in stage 0.0 (TID 2, localhost, ANY, 9895 bytes)
14/12/07 21:31:59 INFO TaskSetManager: Starting task 3.0 in stage 0.0 (TID 3, localhost, ANY, 9895 bytes)
14/12/07 21:31:59 INFO TaskSetManager: Starting task 4.0 in stage 0.0 (TID 4, localhost, ANY, 9895 bytes)
14/12/07 21:31:59 INFO Executor: Running task 1.0 in stage 0.0 (TID 1)
14/12/07 21:31:59 INFO Executor: Running task 3.0 in stage 0.0 (TID 3)
14/12/07 21:31:59 INFO Executor: Running task 4.0 in stage 0.0 (TID 4)
14/12/07 21:31:59 INFO Executor: Running task 2.0 in stage 0.0 (TID 2)
14/12/07 21:31:59 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
14/12/07 21:31:59 INFO Executor: Fetching http://localhost:57392/jars/demo-elasticsearch-spark-0.0.1-SNAPSHOT-jar-with-dependencies.jar with timestamp 1417955518911
14/12/07 21:31:59 INFO Utils: Fetching http://localhost:57392/jars/demo-elasticsearch-spark-0.0.1-SNAPSHOT-jar-with-dependencies.jar to /tmp/fetchFileTemp3143049760330881914.tmp
14/12/07 21:31:59 INFO Executor: Adding file:/tmp/spark-f0cec1a7-6496-47a9-bc7b-5f7ad126a3b5/demo-elasticsearch-spark-0.0.1-SNAPSHOT-jar-with-dependencies.jar to class loader
14/12/07 21:31:59 INFO NewHadoopRDD: Input split: ShardInputSplit [node=[I9ed1kejTR6aLUTuS55YKw/Jerome Beechman|127.0.0.1:9200],shard=1]
14/12/07 21:31:59 INFO NewHadoopRDD: Input split: ShardInputSplit [node=[I9ed1kejTR6aLUTuS55YKw/Jerome Beechman|127.0.0.1:9200],shard=2]
14/12/07 21:31:59 INFO NewHadoopRDD: Input split: ShardInputSplit [node=[I9ed1kejTR6aLUTuS55YKw/Jerome Beechman|127.0.0.1:9200],shard=3]
14/12/07 21:31:59 INFO NewHadoopRDD: Input split: ShardInputSplit [node=[I9ed1kejTR6aLUTuS55YKw/Jerome Beechman|127.0.0.1:9200],shard=0]
14/12/07 21:31:59 INFO NewHadoopRDD: Input split: ShardInputSplit [node=[I9ed1kejTR6aLUTuS55YKw/Jerome Beechman|127.0.0.1:9200],shard=4]
14/12/07 21:31:59 WARN EsInputFormat: Cannot determine task id...
14/12/07 21:31:59 WARN EsInputFormat: Cannot determine task id...
14/12/07 21:31:59 WARN EsInputFormat: Cannot determine task id...
14/12/07 21:31:59 WARN EsInputFormat: Cannot determine task id...
14/12/07 21:32:00 WARN EsInputFormat: Cannot determine task id...
14/12/07 21:32:00 INFO Executor: Finished task 1.0 in stage 0.0 (TID 1). 1651 bytes result sent to driver
14/12/07 21:32:00 INFO Executor: Finished task 3.0 in stage 0.0 (TID 3). 1651 bytes result sent to driver
14/12/07 21:32:00 INFO Executor: Finished task 4.0 in stage 0.0 (TID 4). 1651 bytes result sent to driver
14/12/07 21:32:00 INFO Executor: Finished task 2.0 in stage 0.0 (TID 2). 1651 bytes result sent to driver
14/12/07 21:32:00 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 1651 bytes result sent to driver
14/12/07 21:32:00 INFO TaskSetManager: Finished task 4.0 in stage 0.0 (TID 4) in 919 ms on localhost (1/5)
14/12/07 21:32:00 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 939 ms on localhost (2/5)
14/12/07 21:32:00 INFO TaskSetManager: Finished task 1.0 in stage 0.0 (TID 1) in 939 ms on localhost (3/5)
14/12/07 21:32:00 INFO TaskSetManager: Finished task 2.0 in stage 0.0 (TID 2) in 944 ms on localhost (4/5)
14/12/07 21:32:00 INFO TaskSetManager: Finished task 3.0 in stage 0.0 (TID 3) in 950 ms on localhost (5/5)
14/12/07 21:32:00 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool 
14/12/07 21:32:00 INFO DAGScheduler: Stage 0 (count at HelloElasticsearchSpark.java:25) finished in 0.977 s
14/12/07 21:32:00 INFO SparkContext: Job finished: count at HelloElasticsearchSpark.java:25, took 1.08602996 s
Count of records founds is 147

五个任务分别针对每个分片执行。

总结

只需在Spark的驱动程序中指定EsInputFormat,就可以创建来自elasticsearch的RDD,并且还配备了先进的搜索功能。当日志增加时,可以增加number_of_shards进行扩容,还可以完全利用elasticsearch的专业知识,非常好啊。

多亏了elasticsearch-hadoop,年底时可以用Spark来玩耍。

非常感谢您阅读到这里。
接下来的文章是关于@snuffkin的。

bannerAds