首次使用ElasticSearch

Untitled Diagram (6).jpg

教程

構建可进行实时数据处理的无服务器应用程序。

(1)Amazon Kinesis数据流 → (2)Amazon Kinesis数据分析 → (3)Amazon Kinesis Firehose → (4)将输出到Amazon ElasticSearch并尝试进行可视化。

请参考上述教程来构建(1)和(2)。

绊倒的方面

将数据从(3)输出到(4)时,发现Amazon Kinesis Analytics的timestamp列与ElasticSearch的字段格式不匹配,因此需要在Lambda函数中对日志进行整理。

输入数据到Amazon Kinesis Analytic

{
    "Distance": 30.68079984685547,
    "HealthPoints": -326,
    "Latitude": -7.8471452457495126,
    "Longitude": 63.946930336591194,
    "MagicPoints": -254,
    "Name": "unicorn3",
    "StatusTime": "2019-08-06 00:33:50.706"
}

实时分析的SQL

CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (
  "Name"                VARCHAR(16),
  "StatusTime"          TIMESTAMP,
  "Distance"            SMALLINT,
  "MinMagicPoints"      SMALLINT,
  "MaxMagicPoints"      SMALLINT,
  "MinHealthPoints"     SMALLINT,
  "MaxHealthPoints"     SMALLINT
);
![souce data.PNG](https://qiita-image-store.s3.ap-northeast-1.amazonaws.com/0/347413/24e81729-4186-c8c3-6f58-eb2d16cdfe14.png)

CREATE OR REPLACE PUMP "STREAM_PUMP" AS
  INSERT INTO "DESTINATION_SQL_STREAM"
    SELECT STREAM "Name", "ROWTIME", SUM("Distance"), MIN("MagicPoints"), MAX("MagicPoints"), MIN("HealthPoints"), MAX("HealthPoints")
    FROM "SOURCE_SQL_STREAM_001"
    GROUP BY FLOOR("SOURCE_SQL_STREAM_001"."ROWTIME" TO MINUTE), "Name";
Real-time analytics.PNG

使用AWS Lambda对Amazon Kinesis Firehose源记录进行转换的设定。

ElasticSearch的TimeStamp格式为日期T时间,如果使用空格,则会引发以下错误:
“`
{“type”:”mapper_parsing_exception”,”reason”:”无法解析文档中id为\u002749598119027046677940487811470072065833912927359973982210.0\u0027的[date]类型字段[StatusTime]”,”caused_by”:{“type”:”illegal_argument_exception”,”reason”:”无效格式:“2019-08-01 06:42:00.000”中的“00”格式不正确”}}
“`


Analyticsから出力されるStatus Time:2019-08-06 00:48:00.0を2019-08-06T00:48:00へ変換します。
##LambdaDesigner
![lambda-designer.PNG](https://qiita-image-store.s3.ap-northeast-1.amazonaws.com/0/347413/8c8a9641-355a-2a8d-14b8-a1a89b9dbc98.png)

##function
まだpythonを身につけられておらず、スマートではありませんが、

```Lambda_function.py
from __future__ import print_function
import json
import base64

print('Loading function')


def lambda_handler(event, context):
    output = []
    for record in event['records']:
        payload = base64.b64decode(record['data'])
        payload = json.loads(payload)
        s = payload['StatusTime']
        s = s.replace(' ', 'T')
        s = s[:-4]
        data = {
            'Name': payload['Name'],
            'StatusTime': s,
            'Distance': payload['Distance'],
            'MinMagicPoints': payload['MinMagicPoints'],
            'MaxMagicPoints': payload['MaxMagicPoints'],
            'MinHealthPoints': payload['MinHealthPoints'],
            'MaxHealthPoints': payload['MaxHealthPoints'],
        }
        data = json.dumps(data)
        b_data=data.encode()
        b64encoded=base64.b64encode(b_data)
        b64encoded = b64encoded.decode()
        print(b64encoded)
        output_record = {
            'recordId': record['recordId'],
            'result': 'Ok',
            'data': b64encoded,
            'approximateArrivalTimestamp': record['approximateArrivalTimestamp']
        }
        output.append(output_record)

    print('Successfully processed {} records.'.format(output))

    return {'records': output}

重点在于 output_record 必须包含 {‘result’: ‘Ok’}。
这样,可以转换 StatusTime 的格式。

设置测试事件


{
  "records": [
    {
      "recordId": "49598119027046677940487811666676045100381312741455953922000000",
      "data": "eyJOYW1lIjoidW5jaXJvbjciLCJTdGF0dXNUaW1lIjoiMjAxOS0wOC0wMSAwNzowMzowMC4wMDAiLCJEaXN0YW5jZSI6MTc3NCwiTWluTWFnaWNQb2ludHMiOjE4MSwiTWF4TWFnaWNQb2ludHMiOjE4OSwiTWluSGVhbHRoUG9pbnRzIjotMTkwLCJNYXhIZWFsdGhQb2ludHMiOi0xODN9",
      "approximateArrivalTimestamp": 1564642980052
    },
    {
      "recordId": "49598119027046677940487811666677254026200927370630660098000000",
      "data": "eyJOYW1lIjoidW5pY29ybjUiLCJTdGF0dXNUaW1lIjoiMjAxOS0wOC0wMSAwNzowMzowMC4wMDAiLCJEaXN0YW5jZSI6MTc3MCwiTWluTWFnaWNQb2ludHMiOjgwLCJNYXhNYWdpY1BvaW50cyI6ODksIk1pbkhlYWx0aFBvaW50cyI6NjYxLCJNYXhIZWFsdGhQb2ludHMiOjY2N30=",
      "approximateArrivalTimestamp": 1564642980054
    }
}

Lambda的事件接收到的数据经过Base64编码,并使用zip进行压缩。

設定Amazon Elasticsearch Service的目的地。

启动Amazon ElasticSearch,并指定域名和索引。如果指定的索引不存在,则会自动创建新的索引。然而,由于匹配的多个字段会变成long类型,因此无法将StatusTime作为TimeStamp有效地使用。

因此,通过访问Amazon ElasticSearch的Kibana终端节点,创建索引并进行映射操作。

亚马逊弹性搜索设置

kibana-console.PNG

创建索引命令

PUT /myuserlocation9
{
  "mappings": {
    "type_name": {
      "properties": {
        "Distance": {
          "type": "long"
        },
        "MaxHealthPoints": {
          "type": "long"
        },
        "MaxMagicPoints": {
          "type": "long"
        },
        "MinHealthPoints": {
          "type": "long"
        },
        "MinMagicPoints": {
          "type": "long"
        },
        "Name": {
          "type": "text",
          "fields": {
            "keyword": {
              "type": "keyword",
              "ignore_above": 256
            }
          }
        },
        "StatusTime": {
          "type": "date",
          "format": "yyyy-MM-dd'T'HH:mm:ss",
          "fields": {
            "keyword": {
              "type": "keyword",
              "ignore_above": 256
            }
          }
        },
        "change": {
          "type": "float"
        },
        "price": {
          "type": "long"
        },
        "sector": {
          "type": "text",
          "fields": {
            "keyword": {
              "type": "keyword",
              "ignore_above": 256
            }
          }
        },
        "ticker_symbol": {
          "type": "text",
          "fields": {
            "keyword": {
              "type": "keyword",
              "ignore_above": 256
            }
          }
        }
      }
    }
  }
}

ElasticSearch 数据可视化

visualization.PNG

获取健康检查

GET myuserlocation8/_cluster/health?pretty=true
reaponce
{
  "_index" : "myuserlocation8",
  "_type" : "_cluster",
  "_id" : "health",
  "found" : false
}

从域名中搜索与”unicorn1″匹配的内容

GET myuserlocation8/_search?q=Name:unicorn5
responce
{
  "took" : 21,
  "timed_out" : false,
  "_shards" : {
    "total" : 5,
    "successful" : 5,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : 6726,
    "max_score" : 1.6273899,
    "hits" : [
      {
        "_index" : "myuserlocation8",
        "_type" : "type_name",
        "_id" : "49598119027046677940487813128631247686172627365223464962.0",
        "_score" : 1.6273899,
        "_source" : {
          "Name" : "unicorn5",
          "StatusTime" : "2019-08-01T09:38:00",
          "Distance" : 1804,
          "MinMagicPoints" : 113,
          "MaxMagicPoints" : 119,
          "MinHealthPoints" : 731,
          "MaxHealthPoints" : 738
        }
      }
    ]
  }
}

删除索引

DELETE <index_name>?pretty=true
response
{
  "acknowledged" : true
}

构建无服务器的实时数据处理应用程序
首次使用 Elasticsearch
使用 Kibana 4.1.0 + ElasticSearch 1.6.0 进行数据可视化
DateTimeFormatter 类
elastic Gsub Provessor
elastic 格式

广告
将在 10 秒后关闭
bannerAds