想要将嵌套结构的文档在Elasticsearch中转化为扁平结构

概述

投入了数据到Elasticsearch后,可能会出现希望后期更改文档结构的情况。

如果在网站或REST API上连续短时间访问被视为攻击,那真的很麻烦。虽然我们希望可以慢慢加载数据,但这样做将导致重新获取数据需要很长时间。

这是一个关于这种情况的小秘诀。

想做的事情

在原始数据中,数组`posts`存储在`post_stream`下。
想要将`posts`拆分,并重新进行索引。

改变之前

假设有这样的数据。
在post_stream下有两个posts。希望将其重新注册为两个文档。

{
  "field1": "test1",
  "field2": "test2",
  "field3_array": [
    "array1",
    "array2",
    "array3"
  ],
  "post_stream": {
    "posts": [
      {
        "name": "hoge",
        "id": "111"
      },
      {
        "name": "fuga",
        "id": "222"
      }
    ]
  }
}

更改后

我希望这样做。
我将每个帖子都独立成一个文档。

{
   "field1" : "test1",
   "field2" : "test2",
   "field3_array" : [
            "array1",
            "array2",
            "array3"
    ],
    "post" : {
         "name" : "hoge",
         "id" : "111"
     }
}

{
   "field1" : "test1",
   "field2" : "test2",
   "field3_array" : [
            "array1",
            "array2",
            "array3"
    ],
    "post" : {
         "name" : "fuga",
         "id" : "222"
     }
}

搜索先人的智慧

    Possible to ‘flatten’ nested mapping with reindex?

根据这里的信息,有这样的答案。

是的,我认为使用重新索引API并结合脚本可以实现。例如:ctx._source.flat_field = ctx._source.nested_field.field;
但是如果你使用了嵌套映射,该值将会是一个值的数组。
另一个解决方案是使用logstash来实现这个目的。

由于我希望将其作为单独的文档进行注册,而不是作为数组注册,因此决定尝试使用logstash。

Logstash的配置设定

整体的趋势会变成这样。

image.png
Noフェーズ使用プラグイン処理1inputelasticsearchElasticsearchから変換元のデータを取得します2filtersplitposts配列をpostごとに分解します3filtermutate新たに登録しなおすIDを採番します4outputelasticsearchElasticsearchに新しい構造のドキュメントを登録します

我认为使用elasticsearch插件和split的地方是重点。

设置文件

input {
  # Read all documents from Elasticsearch matching the given query
  elasticsearch {
    hosts => "192.168.xxx.xxx"
    index => "original_data"
    docinfo => true
    size => 50
    query => '{
      "query" : {
        "match_all": {}
       }
     }'
  }
}

filter {
    split {
       field => '[post_stream][posts]'
       target => "post"
       remove_field => "[post_stream]"
    }

    mutate {
        update => { "[@metadata][_id]" => "%{[@metadata][_id]}_%{[post][id]}" }
    }
}

output {
    stdout {}

    elasticsearch {
      hosts => "192.168.xxx.xxx"
      index => "conv.%{[@metadata][_index]}"
      document_type => "%{[@metadata][_type]}"
      document_id => "%{[@metadata][_id]}"
    }
}

解说

输入部分

input {
  # Read all documents from Elasticsearch matching the given query
  elasticsearch {
    hosts => "192.168.xxx.xxx"
    index => "original_data"
    docinfo => true
    size => 50
    query => '{
      "query" : {
        "match_all": {}
       }
     }'
  }
}

在这里,虽然文档信息的默认值是false,但我们将其设为了true。

为了能够使用类似下表的ID进行新的分配,我们正在元数据中获取_id和_index的信息。

元のdocumentID子ポストのID新しく登録するdocumentID11111-11112221-22221112-111

过滤器部分

在这方面,我认为解释在官方文档中是最易懂的。一个可能的困难是,当一个嵌套对象变成一个数组时,需要指定它时要怎么写。

以终止符分隔数值的字段。可以是多行信息或数组的标识符。嵌套数组的引用形式为:“[对象ID][数组ID]”。

文件中已经写得很清楚。
在这种情况下,只需写成 [post_stream][posts] 就可以了。

确认

如果你能通过stdout等方式确认output,就能够确认每个post的数据已经分开了。

image.png

最后

你可以将同一Elasticsearch的数据重新放入不同的索引中,也可以将其放入不同的Elasticsearch环境中,这样一来,你不仅可以替换值,还可以完全改变结构,重新开始。

就像Beats系列一样,如果启动快的话,就没什么可说的了嘛…可惜。

bannerAds