First Time Using Elasticsearch

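Tutorial
Build a Serverless Real-Time Data Processing App
(1) Amazon Kinesis Data Streams → (2) Amazon Kinesis Data Analytics → (3) Amazon Kinesis Data Firehose → (4) output to Amazon Elasticsearch Service and try visualizing the data.
Follow the tutorial above to build (1) and (2).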
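Where I got stuck
When sending data from (3) to (4), the timestamp column output by Amazon Kinesis Data Analytics did not match the date format of the Elasticsearch field, so the records had to be reshaped in a Lambda function before delivery.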
Input data sent to Amazon Kinesis Data Analytics
```json
{
  "Distance": 30.68079984685547,
  "HealthPoints": -326,
  "Latitude": -7.8471452457495126,
  "Longitude": 63.946930336591194,
  "MagicPoints": -254,
  "Name": "unicorn3",
  "StatusTime": "2019-08-06 00:33:50.706"
}
```
SQL for the real-time analytics
```sql
CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (
  "Name" VARCHAR(16),
  "StatusTime" TIMESTAMP,
  "Distance" SMALLINT,
  "MinMagicPoints" SMALLINT,
  "MaxMagicPoints" SMALLINT,
  "MinHealthPoints" SMALLINT,
  "MaxHealthPoints" SMALLINT
);

CREATE OR REPLACE PUMP "STREAM_PUMP" AS
  INSERT INTO "DESTINATION_SQL_STREAM"
    SELECT STREAM "Name", "ROWTIME", SUM("Distance"), MIN("MagicPoints"),
                  MAX("MagicPoints"), MIN("HealthPoints"), MAX("HealthPoints")
    FROM "SOURCE_SQL_STREAM_001"
    GROUP BY FLOOR("SOURCE_SQL_STREAM_001"."ROWTIME" TO MINUTE), "Name";
```
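To make the aggregation concrete, here is a minimal Python sketch of what the pump computes: rows are bucketed into one-minute tumbling windows per `Name`, then `Distance` is summed and the min/max of `MagicPoints` and `HealthPoints` are taken. Assumptions: the sketch buckets on each record's `StatusTime` string, whereas Kinesis Data Analytics uses `ROWTIME` (the time the row entered the in-application stream), and `aggregate_per_minute` is a hypothetical name, not part of any AWS API.

```python
from collections import defaultdict
from datetime import datetime


def aggregate_per_minute(records):
    """Mimic GROUP BY FLOOR(ROWTIME TO MINUTE), "Name" in plain Python.

    Assumption: StatusTime stands in for ROWTIME.
    """
    groups = defaultdict(list)
    for r in records:
        ts = datetime.strptime(r["StatusTime"], "%Y-%m-%d %H:%M:%S.%f")
        minute = ts.replace(second=0, microsecond=0)  # FLOOR(... TO MINUTE)
        groups[(minute, r["Name"])].append(r)

    rows = []
    for (minute, name), rs in sorted(groups.items()):
        rows.append({
            "Name": name,
            "StatusTime": minute,
            # The SQL stores this sum in a SMALLINT column; the sketch keeps the float
            "Distance": sum(r["Distance"] for r in rs),
            "MinMagicPoints": min(r["MagicPoints"] for r in rs),
            "MaxMagicPoints": max(r["MagicPoints"] for r in rs),
            "MinHealthPoints": min(r["HealthPoints"] for r in rs),
            "MaxHealthPoints": max(r["HealthPoints"] for r in rs),
        })
    return rows


if __name__ == "__main__":
    sample = [
        {"Name": "unicorn3", "StatusTime": "2019-08-06 00:33:50.706",
         "Distance": 30.5, "MagicPoints": -254, "HealthPoints": -326},
        {"Name": "unicorn3", "StatusTime": "2019-08-06 00:33:55.100",
         "Distance": 10.0, "MagicPoints": -250, "HealthPoints": -330},
    ]
    for row in aggregate_per_minute(sample):
        print(row)
```

Because each window emits one row per unicorn per minute, the timestamps reaching Firehose always fall on a whole minute.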

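Configure Amazon Kinesis Data Firehose to transform source records with AWS Lambda.
Elasticsearch expects timestamps in the form date`T`time (for example `2019-08-01T06:42:00`); if the date and time are separated by a space, the following error occurs:
```
{"type":"mapper_parsing_exception","reason":"failed to parse field [StatusTime] of type [date] in document with id \u002749598119027046677940487811470072065833912927359973982210.0\u0027","caused_by":{"type":"illegal_argument_exception","reason":"Invalid format: \"2019-08-01 06:42:00.000\" is malformed at \"00\""}}
```
So the StatusTime output by Analytics, `2019-08-06 00:48:00.0`, is converted to `2019-08-06T00:48:00`.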
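The conversion itself can be sketched in a few lines of Python. Parsing the value with `datetime` instead of slicing off a fixed number of characters copes with both the `.0` and `.000` fractional variants seen in this article. `to_es_timestamp` is a hypothetical helper name.

```python
from datetime import datetime


def to_es_timestamp(status_time):
    """Convert 'YYYY-MM-DD HH:MM:SS[.fff]' to 'YYYY-MM-DDTHH:MM:SS'."""
    # Drop any fractional part first, so '.0' and '.000' are both handled
    whole_seconds = status_time.split(".")[0]
    parsed = datetime.strptime(whole_seconds, "%Y-%m-%d %H:%M:%S")
    # isoformat() uses 'T' as the separator; timespec="seconds" omits fractions
    return parsed.isoformat(timespec="seconds")


print(to_es_timestamp("2019-08-06 00:48:00.0"))    # 2019-08-06T00:48:00
print(to_es_timestamp("2019-08-01 06:42:00.000"))  # 2019-08-01T06:42:00
```

A malformed value raises `ValueError` here instead of silently producing a wrong string.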
## Lambda Designer

## Function
I haven't got the hang of Python yet, so this isn't elegant, but here it is:
```python:lambda_function.py
import base64
import json

print('Loading function')


def lambda_handler(event, context):
    output = []

    for record in event['records']:
        # Firehose hands each record to Lambda as base64-encoded data
        payload = json.loads(base64.b64decode(record['data']))

        # "2019-08-01 07:03:00.000" -> "2019-08-01T07:03:00":
        # replace the space with 'T' and drop the fractional seconds
        status_time = payload['StatusTime'].replace(' ', 'T').split('.')[0]

        data = {
            'Name': payload['Name'],
            'StatusTime': status_time,
            'Distance': payload['Distance'],
            'MinMagicPoints': payload['MinMagicPoints'],
            'MaxMagicPoints': payload['MaxMagicPoints'],
            'MinHealthPoints': payload['MinHealthPoints'],
            'MaxHealthPoints': payload['MaxHealthPoints'],
        }

        # Serialize back to JSON and base64-encode it as a str for Firehose
        b64encoded = base64.b64encode(json.dumps(data).encode('utf-8')).decode('utf-8')

        output.append({
            'recordId': record['recordId'],  # must echo the incoming recordId
            'result': 'Ok',                  # mark the record as successfully transformed
            'data': b64encoded,
            'approximateArrivalTimestamp': record['approximateArrivalTimestamp'],
        })

    print('Successfully processed {} records.'.format(len(output)))
    return {'records': output}
```
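The key point is that every record in `output` must carry `'result': 'Ok'` together with its original `recordId`.
With this function in place, StatusTime is converted to the format Elasticsearch expects.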
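Firehose accepts three values for `result`: `Ok`, `Dropped`, and `ProcessingFailed`. The function above assumes every record parses cleanly, so one malformed record raises and fails the whole invocation. A minimal sketch of per-record error handling, using hypothetical helpers `transform_record` and `fix_status_time`, marks only the bad record as `ProcessingFailed` and passes its original data through:

```python
import base64
import json


def transform_record(record, transform):
    """Apply `transform` to one Firehose record without raising.

    Hypothetical helper: on a decode or transform error the record is
    returned with result 'ProcessingFailed' and its original data.
    """
    try:
        payload = json.loads(base64.b64decode(record["data"]))
        new_payload = transform(payload)
        data = base64.b64encode(json.dumps(new_payload).encode("utf-8")).decode("utf-8")
        result = "Ok"
    except (ValueError, KeyError, TypeError, AttributeError):
        # binascii.Error, JSONDecodeError and UnicodeDecodeError are ValueErrors
        data = record["data"]  # keep the original payload unchanged
        result = "ProcessingFailed"
    return {"recordId": record["recordId"], "result": result, "data": data}


def fix_status_time(payload):
    # Replace the space with 'T' and drop fractional seconds
    payload["StatusTime"] = payload["StatusTime"].replace(" ", "T").split(".")[0]
    return payload
```

Inside `lambda_handler`, the loop body would then shrink to `output.append(transform_record(record, fix_status_time))`.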
Set up a test event
```json
{
  "records": [
    {
      "recordId": "49598119027046677940487811666676045100381312741455953922000000",
      "data": "eyJOYW1lIjoidW5jaXJvbjciLCJTdGF0dXNUaW1lIjoiMjAxOS0wOC0wMSAwNzowMzowMC4wMDAiLCJEaXN0YW5jZSI6MTc3NCwiTWluTWFnaWNQb2ludHMiOjE4MSwiTWF4TWFnaWNQb2ludHMiOjE4OSwiTWluSGVhbHRoUG9pbnRzIjotMTkwLCJNYXhIZWFsdGhQb2ludHMiOi0xODN9",
      "approximateArrivalTimestamp": 1564642980052
    },
    {
      "recordId": "49598119027046677940487811666677254026200927370630660098000000",
      "data": "eyJOYW1lIjoidW5pY29ybjUiLCJTdGF0dXNUaW1lIjoiMjAxOS0wOC0wMSAwNzowMzowMC4wMDAiLCJEaXN0YW5jZSI6MTc3MCwiTWluTWFnaWNQb2ludHMiOjgwLCJNYXhNYWdpY1BvaW50cyI6ODksIk1pbkhlYWx0aFBvaW50cyI6NjYxLCJNYXhIZWFsdGhQb2ludHMiOjY2N30=",
      "approximateArrivalTimestamp": 1564642980054
    }
  ]
}
```
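The `data` field of each record the Lambda function receives is Base64-encoded. In this pipeline the decoded payload is plain JSON, not compressed, so `base64.b64decode` followed by `json.loads` is enough.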
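Hand-writing base64 for a test event is tedious. A small Python sketch like the one below can build an equivalent event from plain dicts and decode `data` fields back for inspection (for example the ones in the event above). `make_test_event` and `decode_data` are hypothetical helpers; the `recordId` and timestamp values they generate are placeholders, not real Kinesis sequence numbers.

```python
import base64
import json


def make_test_event(payloads):
    """Build a Firehose transformation test event from plain dicts.

    Only 'records' with recordId/data/approximateArrivalTimestamp is
    filled in, matching the hand-written event above.
    """
    records = []
    for i, payload in enumerate(payloads):
        records.append({
            "recordId": str(i),  # placeholder id, not a real sequence number
            "data": base64.b64encode(json.dumps(payload).encode("utf-8")).decode("utf-8"),
            "approximateArrivalTimestamp": 1564642980000 + i,  # made-up ms timestamp
        })
    return {"records": records}


def decode_data(data):
    """Decode one record's base64 'data' field back into a dict."""
    return json.loads(base64.b64decode(data))


event = make_test_event([{"Name": "unicorn5", "StatusTime": "2019-08-01 07:03:00.000"}])
print(json.dumps(event, indent=2))
```

The printed JSON can be pasted into the Lambda console's test-event editor.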
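Set Amazon Elasticsearch Service as the destination.
Launch Amazon Elasticsearch Service and specify the domain and the index. If the specified index does not exist, it is created automatically. However, with the automatically generated mapping several of the fields end up typed as `long`, and StatusTime could not be used as a proper timestamp.
So I opened the Kibana endpoint of the Amazon Elasticsearch Service domain, created the index myself, and defined its mapping.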
Amazon Elasticsearch Service settings

Command to create the index
```
PUT /myuserlocation9
{
  "mappings": {
    "type_name": {
      "properties": {
        "Distance": {
          "type": "long"
        },
        "MaxHealthPoints": {
          "type": "long"
        },
        "MaxMagicPoints": {
          "type": "long"
        },
        "MinHealthPoints": {
          "type": "long"
        },
        "MinMagicPoints": {
          "type": "long"
        },
        "Name": {
          "type": "text",
          "fields": {
            "keyword": {
              "type": "keyword",
              "ignore_above": 256
            }
          }
        },
        "StatusTime": {
          "type": "date",
          "format": "yyyy-MM-dd'T'HH:mm:ss",
          "fields": {
            "keyword": {
              "type": "keyword",
              "ignore_above": 256
            }
          }
        },
        "change": {
          "type": "float"
        },
        "price": {
          "type": "long"
        },
        "sector": {
          "type": "text",
          "fields": {
            "keyword": {
              "type": "keyword",
              "ignore_above": 256
            }
          }
        },
        "ticker_symbol": {
          "type": "text",
          "fields": {
            "keyword": {
              "type": "keyword",
              "ignore_above": 256
            }
          }
        }
      }
    }
  }
}
```
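The same request can also be issued from a script instead of Kibana. The sketch below only builds the `PUT` request with the standard library and does not send it. Assumptions: `https://search-mydomain.example.com` is a placeholder endpoint, the domain's access policy accepts unsigned requests (IAM-protected domains need SigV4 signing, which this sketch does not do), and the mapping is trimmed to the fields this pipeline uses. `build_create_index_request` is a hypothetical helper.

```python
import json
import urllib.request


def build_create_index_request(endpoint, index, mapping):
    """Build (but do not send) a PUT /<index> request carrying the mapping."""
    body = json.dumps(mapping).encode("utf-8")
    return urllib.request.Request(
        url=f"{endpoint.rstrip('/')}/{index}",
        data=body,
        method="PUT",
        # Elasticsearch 6.0+ rejects request bodies without a Content-Type
        headers={"Content-Type": "application/json"},
    )


# Trimmed mapping; 'type_name' is the 6.x-style mapping type used above
mapping = {
    "mappings": {
        "type_name": {
            "properties": {
                "Name": {"type": "text",
                         "fields": {"keyword": {"type": "keyword", "ignore_above": 256}}},
                "StatusTime": {"type": "date", "format": "yyyy-MM-dd'T'HH:mm:ss"},
                "Distance": {"type": "long"},
                "MinMagicPoints": {"type": "long"},
                "MaxMagicPoints": {"type": "long"},
                "MinHealthPoints": {"type": "long"},
                "MaxHealthPoints": {"type": "long"},
            }
        }
    }
}

req = build_create_index_request("https://search-mydomain.example.com", "myuserlocation9", mapping)
# To actually send it: urllib.request.urlopen(req)
```

On Elasticsearch 7.x and later mapping types are removed, so `properties` would sit directly under `mappings`.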
Visualizing the data in Elasticsearch

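Check cluster health
```
GET myuserlocation8/_cluster/health?pretty=true
```
Response
```
{
  "_index" : "myuserlocation8",
  "_type" : "_cluster",
  "_id" : "health",
  "found" : false
}
```
`_cluster/health` is a cluster-level API, so with the index name placed in front Elasticsearch reads the path as a document lookup (type `_cluster`, id `health`) in `myuserlocation8`, which is why the response says `"found" : false`. To check the health of this index, use `GET _cluster/health/myuserlocation8?pretty=true`.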
Search the index for documents whose Name matches "unicorn5"
```
GET myuserlocation8/_search?q=Name:unicorn5
```
Response
```
{
  "took" : 21,
  "timed_out" : false,
  "_shards" : {
    "total" : 5,
    "successful" : 5,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : 6726,
    "max_score" : 1.6273899,
    "hits" : [
      {
        "_index" : "myuserlocation8",
        "_type" : "type_name",
        "_id" : "49598119027046677940487813128631247686172627365223464962.0",
        "_score" : 1.6273899,
        "_source" : {
          "Name" : "unicorn5",
          "StatusTime" : "2019-08-01T09:38:00",
          "Distance" : 1804,
          "MinMagicPoints" : 113,
          "MaxMagicPoints" : 119,
          "MinHealthPoints" : 731,
          "MaxHealthPoints" : 738
        }
      }
    ]
  }
}
```
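The same URI search can be run from Python with the standard library. This is a minimal sketch under the same assumption as before (the domain accepts unsigned requests); the helper names are hypothetical, and only `search_url` and `sources` are exercised without a network connection.

```python
import json
import urllib.parse
import urllib.request


def search_url(endpoint, index, query):
    """Build a URI-search URL like /myuserlocation8/_search?q=Name:unicorn5."""
    # urlencode percent-encodes ':' as %3A, which Elasticsearch decodes
    return f"{endpoint.rstrip('/')}/{index}/_search?" + urllib.parse.urlencode({"q": query})


def sources(response):
    """Extract the _source documents from a search response dict."""
    return [hit["_source"] for hit in response["hits"]["hits"]]


def run_search(endpoint, index, query):
    # Assumption: no SigV4 signing; works only with an open/IP-based access policy
    with urllib.request.urlopen(search_url(endpoint, index, query)) as resp:
        return sources(json.load(resp))
```

Note that `hits.total` (6726 above) counts all matches, while `hits.hits` holds only the first page (10 by default).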
Delete an index
```
DELETE <index_name>?pretty=true
```
Response
```
{
  "acknowledged" : true
}
```
References
- Build a Serverless Real-Time Data Processing App
- First Time Using Elasticsearch
- Data Visualization with Kibana 4.1.0 + Elasticsearch 1.6.0
- DateTimeFormatter class
- Elastic: Gsub Processor
- Elastic: format