首次使用 Logstash
这篇文章是关于数据处理工具“Logstash”的入门教程。
我将解释如何使用Logstash将CSV格式的日志文件导入到Elasticsearch。
在本教程中,我们将使用Elasticsearch。
如果您尚未使用过Elasticsearch,请参考另一篇文章”初次使用Elasticsearch”。
Logstash 是什么?
Logstash是由Elastic公司开发的开源数据处理管道,可以将各种数据源导入到仓库中。

操作環境
-
- Mac OS X 10.14.5
-
- Java 11.0.2
-
- Elasticsearch 7.1.0
- Logstash 7.1.0
安装
请事先参考另一篇文章《初识 Elasticsearch》,安装和启动 Elasticsearch。
接下来,安装Logstash。
https://artifacts.elastic.co/downloads/logstash/logstash-7.1.0.tar.gz
tar -xzf logstash-7.1.0.tar.gz
cd logstash-7.1.0
创建测试数据
我们将使用具有”日期时间”、”日志级别”、”错误消息”和”用户ID”列的CSV格式日志文件作为测试数据。
请在一个合适的文件夹中创建以下的CSV文件。
20190524日志文件.csv
Date,Level,ErrorMessage,UserId
2019-05-24 10:00:00,INFO,Success.,1
2019-05-24 11:00:00,ERROR,An error has occurred.Please wait a moment and try again.,1
20190525日志文件
Date,Level,ErrorMessage,UserId
2019-05-25 10:00:00,INFO,Success.,1
2019-05-25 11:00:00,INFO,Success.,1
2019-05-25 12:00:00,ERROR,An error has occurred.Please wait a moment and try again.,2
2019-05-25 13:00:00,INFO,Success.,2
创建设置文件
创建Logstash的配置文件。
配置/logstash.conf
input {
file {
mode => "tail"
path => ["/Users/myuser/work/elasticsearch/log/*_log.csv"]
sincedb_path => "/Users/myuser/work/elasticsearch/log/sincedb"
start_position => "beginning"
codec => plain {
charset => "UTF-8"
}
}
}
filter {
csv {
columns => ["Date", "Level", "ErrorMessage","UserId"]
convert => {
"UserId" => "integer"
}
skip_header => true
}
date {
match => ["Date", "yyyy-MM-dd HH:mm:ss"]
}
}
output {
elasticsearch {
hosts => ["localhost:9200"]
index => "log"
}
stdout {
codec => rubydebug
}
}
设置输入
在这个区块中,您可以进行与数据输入相关的设置。
mode => "tail"
设定文件导入模式。
在文件导入模式中,有两种模式可供选择,即尾随模式和读取模式。
在尾随模式下,可以检测到文件中有数据添加时,并将添加的数据导入进去。
path => ["/Users/myuser/work/elasticsearch/log/*_log.csv"]
請指定要匯入的檔案路徑。
请提供刚才创建的测试数据路径。
sincedb_path => "/Users/myuser/work/elasticsearch/log/sincedb"
请指定sincedb文件的路径。
sincedb 文件是用来记录已经导入的文件到哪个位置的文件。如果想再次导入同一个文件,请删除 sincedb 文件。
如果您不想创建 sincedb 文件,则可以指定 “/dev/null”(在 Windows 上是 “nul”)。
start_position => "beginning"
指定文件导入位置。
请指定「开始(文件开头)」或「结束(文件末尾)」。
codec => plain {
charset => "UTF-8"
}
请指定要导入的文件的字符编码。
在Windows环境中,默认是采用”UTF-8″,请将其更改为”SJIS”。
请参考文件输入插件的参考文档以获取更多详细信息。
过滤器的设置。
在filter模块中,您可以进行有关数据转换的设置。
columns => ["Date", "Level", "ErrorMessage","UserId"]
指定列名。
convert => {
"UserId" => "integer"
}
指定列的数据类型。
指定将「UserId」列转换为数值类型。
skip_header => true
如果存在标题行,请指定跳过读取。
date {
match => ["Date", "yyyy-MM-dd HH:mm:ss"]
}
在Elasticsearch中指定要用作“timestamp”的列的日期时间格式。
请参阅 Csv filter 插件的参考文件以获取更详细信息。
设置输出
在输出块中,进行与数据输出相关的设置。
elasticsearch {
hosts => ["localhost:9200"]
index => "log"
}
请指定 Elasticsearch 的 URI 和索引名称作为输出目标。
请查阅 Elasticsearch 输出插件的参考文档以获取更多详细信息。
stdout {
codec => rubydebug
}
同时将输出发送到Elasticsearch和命令提示符窗口。
执行数据导入
运行Logstash,将CSV格式的日志文件导入到Elasticsearch中。
sudo bin/logstash -f config/logstash.conf
如果成功,那么导入的数据将会在命令提示符中输出。
{
"Date" => "2019-05-24 10:00:00",
"@version" => "1",
"path" => "/Users/myuser/work/elasticsearch/log/20190525_log.csv",
"ErrorMessage" => "Success.",
"host" => "local",
"@timestamp" => 2019-05-25T04:00:00.000Z,
"Level" => "INFO",
"UserId" => 1,
"message" => "2019-05-24 10:00:00,INFO,Success.,1"
}
・・・以下省略
此外,您还可以使用Elasticsearch来搜索和验证索引。
指令
GET /log/_search
执行结果
{
"took" : 45,
"timed_out" : false,
"_shards" : {
"total" : 1,
"successful" : 1,
"skipped" : 0,
"failed" : 0
},
"hits" : {
"total" : {
"value" : 12,
"relation" : "eq"
},
"max_score" : 1.0,
"hits" : [
{
"_index" : "log",
"_type" : "_doc",
"_id" : "_nCI8WoBy6JJU2RqTkdE",
"_score" : 1.0,
"_source" : {
"Date" : "2019-05-24 10:00:00",
"ErrorMessage" : "Success.",
"Level" : "INFO",
"@timestamp" : "2019-05-24T01:00:00.000Z",
"@version" : "1",
"path" : "/Users/myuser/work/elasticsearch/log/20190524_log.csv",
"UserId" : 1,
"message" : "2019-05-24 10:00:00,INFO,Success.,1",
"host" : "local"
}
},
・・・以下省略