在Node-RED中如何使用Elasticsearch的方法
首先
由于在Node-RED中经常向Elasticsearch投放数据,因此我想写一些经验技巧。
虽然有各种附加节点,但Elasticsearch的版本更新很快,API也可能会发生变化。
因此,我认为最好的做法是结合使用标准节点。
软件环境
-
- Node-RED v1.0.3
- Elasticsearch 7.3.2
将流程细分化
首先,将与Elasticsearch通信的http节点拆分成子流程。这样,每次在安装节点时都不需要设置诸如BASIC身份验证之类的信息。另外,让我们始终在头部添加”Content-Type: ‘application/json'”。这样做可以节省一些连接Elasticsearch的设置时间。
由于URL和方法会根据请求的不同而变化,因此在子流程中不需要进行设置。

我写的代码如下。
[{"id":"3f7774f5.a6a20c","type":"subflow","name":"Elasticsearch","info":"","category":"","in":[{"x":50,"y":30,"wires":[{"id":"c5bb15c.4c10de8"}]}],"out":[{"x":580,"y":30,"wires":[{"id":"19d2a3b.d048a5c","port":0}]}],"env":[],"color":"#DDAA99"},{"id":"eeda88c1.f8b278","type":"http request","z":"3f7774f5.a6a20c","name":"Elasticsearch","method":"use","ret":"txt","paytoqs":false,"url":"","tls":"","persist":false,"proxy":"","authType":"basic","x":340,"y":30,"wires":[["19d2a3b.d048a5c"]]},{"id":"c5bb15c.4c10de8","type":"change","z":"3f7774f5.a6a20c","name":"msg.headers","rules":[{"t":"delete","p":"headers","pt":"msg"},{"t":"set","p":"headers","pt":"msg","to":"{\"Content-Type\":\"application/json\",\"Connection\":\"close\"}","tot":"json"}],"action":"","property":"","from":"","to":"","reg":false,"x":170,"y":30,"wires":[["eeda88c1.f8b278"]]},{"id":"19d2a3b.d048a5c","type":"json","z":"3f7774f5.a6a20c","name":"","property":"payload","action":"","pretty":false,"x":490,"y":30,"wires":[[]]}]
1. 数据导入
以下是一个接收来自环境传感器的信标并将数据投入Elasticsearch的示例,它被称为连接设备。

从连接扫描器节点传递的数据如下所示。

为了将此数据输入到Elasticsearch中,我们在Change节点中设置msg.method和msg.url。
msg.url使用前一个节点的值来生成动态URL。
虽然编写程序往往会依赖于function节点,但实际上可以不使用它来生成。

顺便提一句,使用JSONata表达式可以在Change节点中生成当前时间。

使用模板节点生成要投入到Elasticsearch的JSON数据。
使用Mustache模板,可以使用msg对象内的数据。

这个流程的输出在这里。(包括Elasticsearch子流程)
[{"id":"3f7774f5.a6a20c","type":"subflow","name":"Elasticsearch","info":"","category":"","in":[{"x":50,"y":30,"wires":[{"id":"c5bb15c.4c10de8"}]}],"out":[{"x":580,"y":30,"wires":[{"id":"19d2a3b.d048a5c","port":0}]}],"env":[],"color":"#DDAA99"},{"id":"eeda88c1.f8b278","type":"http request","z":"3f7774f5.a6a20c","name":"Elasticsearch","method":"use","ret":"txt","paytoqs":false,"url":"","tls":"","persist":false,"proxy":"","authType":"basic","x":340,"y":30,"wires":[["19d2a3b.d048a5c"]]},{"id":"c5bb15c.4c10de8","type":"change","z":"3f7774f5.a6a20c","name":"msg.headers","rules":[{"t":"delete","p":"headers","pt":"msg"},{"t":"set","p":"headers","pt":"msg","to":"{\"Content-Type\":\"application/json\",\"Connection\":\"close\"}","tot":"json"}],"action":"","property":"","from":"","to":"","reg":false,"x":170,"y":30,"wires":[["eeda88c1.f8b278"]]},{"id":"19d2a3b.d048a5c","type":"json","z":"3f7774f5.a6a20c","name":"","property":"payload","action":"","pretty":false,"x":490,"y":30,"wires":[[]]},{"id":"e6695b94.08cca8","type":"subflow:3f7774f5.a6a20c","z":"f4557e75.b0f3f","x":640,"y":140,"wires":[[]]},{"id":"7108220c.1b5d9c","type":"change","z":"f4557e75.b0f3f","name":"POST _doc","rules":[{"t":"set","p":"method","pt":"msg","to":"POST","tot":"str"},{"t":"set","p":"url","pt":"msg","to":"http://localhost:9200/sensor-","tot":"str"},{"t":"change","p":"url","pt":"msg","from":"$","fromt":"re","to":"payload.service","tot":"msg"},{"t":"change","p":"url","pt":"msg","from":"$","fromt":"re","to":"/_doc","tot":"str"},{"t":"set","p":"date","pt":"msg","to":"$now()","tot":"jsonata"}],"action":"","property":"","from":"","to":"","reg":false,"x":280,"y":140,"wires":[["69e18d2a.9e18d4"]]},{"id":"69e18d2a.9e18d4","type":"template","z":"f4557e75.b0f3f","name":"ES Query JSON","field":"payload","fieldType":"msg","format":"handlebars","syntax":"mustache","template":"{\n \"date\": \"{{{date}}}\",\n \"device\": \"{{{payload.device}}}\",\n \"{{payload.service}}\": {{{payload.data}}}\n}","output":"str","x":460,"y":140,"wires":[["e6695b94.08cca8"]]},{"id":"d541687c.a302d8","type":"linking-scanner","z":"f4557e75.b0f3f","name":"","autostart":true,"duration":"","interval":"30","x":100,"y":140,"wires":[["7108220c.1b5d9c"]]}]
2. 时间序列查询
如果要搜索最新的温度,下面是一个简单的流程。只需使用Change节点和Elasticsearch子流程即可。

Change节点会将搜索查询设置给msg.method,msg.url和msg.payload,如下所示。

以下是Elasticsearch的查询语句。将日期按照降序排列,并且只请求第一条记录。

结果将如下所示。数据将返回到msg.payload.hits.hits[0]._source中。

最近10分钟的平均值是通过以下搜索查询进行计算的。

msg.payload.aggregations.temperature_avg.buckets[0].aggs.value已经包含在内了。

3. 单词分析
在Elasticsearch中,可以进行分词等操作。

如果您想将日文拆分成单词,您需要安装Elasticsearch插件analysis-kuromoji。

结果存储在msg.payload.tokens[]中。

我认为只要有单词,就能够进行单词分析等操作。
在Node-RED中可以通过设置和映射来更新用户词典,同时还可以使用文件节点与其组合。
最后
在我个人看来,我认为Node-RED和Elasticsearch非常适合搭配使用。在实际工作中,我们会将分析数据嵌入到原始数据中,并使用Kibana进行可视化。