Logstash's Kinesis Input Plugin and Kinesis Output Plugin
Introduction
I tried using Logstash, the ETL tool from Elastic, to put log data into and pull it back out of Amazon Kinesis Data Streams.
[Reference]
・What is Amazon Kinesis Data Streams
・An overview of AWS Kinesis
※ Roughly speaking, it is a managed AWS service along the lines of Apache Kafka.
Environment and Architecture Diagram
※ We are using the latest Elastic Stack release, 6.6.0.
※ We are using the AWS N. Virginia (us-east-1) region.

- When putting data from Logstash into Kinesis Data Streams, use the kinesis output plugin.
- When pulling data out of Kinesis Data Streams with Logstash, use the kinesis input plugin.
Prerequisites
Assuming the environment above is already in place, this article omits the installation of Java, Logstash, and Filebeat on the Logstash_put-kinesis host, and of Java, Logstash, Elasticsearch, and Kibana on the Logstash_get-kinesis host.
What is the Logstash kinesis output plugin?
It lets Logstash send the log data it has ingested to Amazon Kinesis Data Streams as an output destination. This plugin is not provided officially by Elastic; it is a community plugin.
What is the Logstash kinesis input plugin?
It lets Logstash use Amazon Kinesis Data Streams as a source to fetch log data from.
This plugin is distributed by Elastic, but it is not officially supported (´;ω;`).
Setup Steps
1. Create a Kinesis stream
2. Create IAM roles
3. Install the plugins
4. Configure logstash.conf
1. Create a Kinesis stream

※ The following article is a good summary of sizing and operating Kinesis Data Streams.
[Reference]
・A case study on understanding AWS Kinesis Streams through large-scale data validation
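If you prefer the CLI to the console, a stream equivalent to the one used in this walkthrough can be created as follows. The stream name kinesis_poc and us-east-1 come from this article; the single shard is an assumed minimal sizing, so adjust the count to your throughput requirements.
# Create the stream used in this walkthrough.
aws kinesis create-stream --stream-name kinesis_poc --shard-count 1 \
    --region us-east-1
# Wait until the stream becomes ACTIVE, then confirm its settings.
aws kinesis wait stream-exists --stream-name kinesis_poc --region us-east-1
aws kinesis describe-stream-summary --stream-name kinesis_poc --region us-east-1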
2. Create IAM roles
- IAM policy required for the IAM role attached to the Logstash_put-kinesis EC2 instance
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "VisualEditor0",
            "Effect": "Allow",
            "Action": [
                "kinesis:PutRecord",
                "kinesis:PutRecords",
                "kinesis:DescribeStream"
            ],
            "Resource": "arn:aws:kinesis:us-east-1:<AWS account ID>:stream/kinesis_poc"
        },
        {
            "Sid": "VisualEditor1",
            "Effect": "Allow",
            "Action": [
                "cloudwatch:PutMetricData"
            ],
            "Resource": "*"
        }
    ]
}
- IAM policy required for the IAM role attached to the Logstash_get-kinesis EC2 instance
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "VisualEditor0",
            "Effect": "Allow",
            "Action": [
                "kinesis:GetShardIterator",
                "kinesis:GetRecords",
                "kinesis:DescribeStream",
                "kinesis:ListShards"
            ],
            "Resource": "arn:aws:kinesis:us-east-1:<AWS account ID>:stream/kinesis_poc"
        },
        {
            "Sid": "VisualEditor1",
            "Effect": "Allow",
            "Action": [
                "dynamodb:CreateTable",
                "dynamodb:DescribeTable",
                "dynamodb:Scan",
                "dynamodb:PutItem",
                "dynamodb:UpdateItem",
                "dynamodb:GetItem",
                "dynamodb:DeleteItem",
                "cloudwatch:PutMetricData"
            ],
            "Resource": "*"
        }
    ]
}
The official documentation does describe the IAM policy required here, but it is not sufficient on its own. (The kinesis input plugin is built on the Kinesis Client Library, which tracks its position in a DynamoDB table tied to application_name and publishes metrics to CloudWatch, which is why the get-side policy needs the DynamoDB and CloudWatch permissions above.)
If permissions are missing, errors like the following are written to /var/log/messages, so watch out!
※ In the example below, the dynamodb:DescribeTable permission is missing.
Feb 13 16:01:39 ip-172-31-1-190 logstash: Caused by: com.amazonaws.services.dynamodbv2.model.AmazonDynamoDBException: User: arn:aws:sts::<AWS account ID>:assumed-role/kinesis_get_role/i-03c32b05117b3235a is not authorized to perform: dynamodb:DescribeTable on resource: arn:aws:dynamodb:us-east-1:<AWS account ID>:table/logstash (Service: AmazonDynamoDBv2; Status Code: 400; Error Code: AccessDeniedException; Request ID: BQRBG0RSE187CLGTP802ONS5BFVV4KQNSO5AEMVJF66Q9ASUAAJG)
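For reference, here is a minimal CLI sketch for wiring the put-side policy into an instance role. The role, profile, and file names are hypothetical examples; put-policy.json holds the put-side policy document shown above, and ec2-trust-policy.json holds the standard EC2 assume-role trust policy.
# Hypothetical names; adjust to your own conventions.
aws iam create-role --role-name kinesis_put_role \
    --assume-role-policy-document file://ec2-trust-policy.json
aws iam put-role-policy --role-name kinesis_put_role \
    --policy-name kinesis-put-access --policy-document file://put-policy.json
aws iam create-instance-profile --instance-profile-name kinesis_put_profile
aws iam add-role-to-instance-profile \
    --instance-profile-name kinesis_put_profile --role-name kinesis_put_role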
3. Install the plugins
You can see which plugins are installed by default by looking at /usr/share/logstash/Gemfile.
As it happens, neither of the two plugins covered here is installed by default, so install them separately.
[root@ip-172-31-1-190 ~]# cat /usr/share/logstash/Gemfile
# This is a Logstash generated Gemfile.
# If you modify this file manually all comments and formatting will be lost.
source "https://rubygems.org"
gem "logstash-core", :path => "./logstash-core"
gem "logstash-core-plugin-api", :path => "./logstash-core-plugin-api"
gem "paquet", "~> 0.2.0"
gem "ruby-progressbar", "~> 1.8.1"
gem "builder", "~> 3.2.2"
gem "ci_reporter_rspec", "1.0.0", :group => :development
gem "tins", "1.6", :group => :development
gem "rspec", "~> 3.5", :group => :development
gem "logstash-devutils", "= 1.3.5", :group => :development
gem "benchmark-ips", :group => :development
gem "octokit", "3.8.0", :group => :build
gem "stud", "~> 0.0.22", :group => :build
gem "fpm", "~> 1.3.3", :group => :build
gem "rubyzip", "~> 1.2.1", :group => :build
gem "gems", "~> 0.8.3", :group => :build
gem "rack-test", :require => "rack/test", :group => :development
gem "flores", "~> 0.0.6", :group => :development
gem "term-ansicolor", "~> 1.3.2", :group => :development
gem "json-schema", "~> 2.6", :group => :development
gem "belzebuth", :group => :development
gem "pleaserun", "~>0.0.28"
gem "webrick", "~> 1.3.1"
gem "atomic", "<= 1.1.99"
gem "rake", "~> 12.2.1", :group => :build
gem "logstash-codec-cef"
gem "logstash-codec-collectd"
gem "logstash-codec-dots"
gem "logstash-codec-edn"
gem "logstash-codec-edn_lines"
gem "logstash-codec-es_bulk"
gem "logstash-codec-fluent"
gem "logstash-codec-graphite"
gem "logstash-codec-json"
gem "logstash-codec-json_lines"
gem "logstash-codec-line"
gem "logstash-codec-msgpack"
gem "logstash-codec-multiline"
gem "logstash-codec-netflow", ">=3.14.1", "<4.0.0"
gem "logstash-codec-plain"
gem "logstash-codec-rubydebug"
gem "logstash-filter-aggregate"
gem "logstash-filter-anonymize"
gem "logstash-filter-cidr"
gem "logstash-filter-clone"
gem "logstash-filter-csv"
gem "logstash-filter-date"
gem "logstash-filter-de_dot"
gem "logstash-filter-dissect"
gem "logstash-filter-dns"
gem "logstash-filter-drop"
gem "logstash-filter-elasticsearch"
gem "logstash-filter-fingerprint"
gem "logstash-filter-geoip"
gem "logstash-filter-grok"
gem "logstash-filter-http"
gem "logstash-filter-jdbc_static"
gem "logstash-filter-jdbc_streaming"
gem "logstash-filter-json"
gem "logstash-filter-kv"
gem "logstash-filter-memcached"
gem "logstash-filter-metrics"
gem "logstash-filter-mutate"
gem "logstash-filter-ruby"
gem "logstash-filter-sleep"
gem "logstash-filter-split"
gem "logstash-filter-syslog_pri"
gem "logstash-filter-throttle"
gem "logstash-filter-translate"
gem "logstash-filter-truncate"
gem "logstash-filter-urldecode"
gem "logstash-filter-useragent"
gem "logstash-filter-xml"
gem "logstash-input-beats"
gem "logstash-input-azure_event_hubs"
gem "logstash-input-dead_letter_queue"
gem "logstash-input-elasticsearch"
gem "logstash-input-exec"
gem "logstash-input-file"
gem "logstash-input-ganglia"
gem "logstash-input-gelf"
gem "logstash-input-generator"
gem "logstash-input-graphite"
gem "logstash-input-heartbeat"
gem "logstash-input-http"
gem "logstash-input-http_poller"
gem "logstash-input-imap"
gem "logstash-input-jdbc"
gem "logstash-input-kafka"
gem "logstash-input-pipe"
gem "logstash-input-rabbitmq"
gem "logstash-input-redis"
gem "logstash-input-s3"
gem "logstash-input-snmp"
gem "logstash-input-snmptrap"
gem "logstash-input-sqs"
gem "logstash-input-stdin"
gem "logstash-input-syslog"
gem "logstash-input-tcp"
gem "logstash-input-twitter"
gem "logstash-input-udp"
gem "logstash-input-unix"
gem "logstash-output-elastic_app_search"
gem "logstash-output-cloudwatch"
gem "logstash-output-csv"
gem "logstash-output-elasticsearch"
gem "logstash-output-email"
gem "logstash-output-file"
gem "logstash-output-graphite"
gem "logstash-output-http"
gem "logstash-output-kafka"
gem "logstash-output-lumberjack"
gem "logstash-output-nagios"
gem "logstash-output-null"
gem "logstash-output-pagerduty"
gem "logstash-output-pipe"
gem "logstash-output-rabbitmq"
gem "logstash-output-redis"
gem "logstash-output-s3", ">=4.0.9", "<5.0.0"
gem "logstash-output-sns"
gem "logstash-output-sqs"
gem "logstash-output-stdout"
gem "logstash-output-tcp"
gem "logstash-output-udp"
gem "logstash-output-webhdfs"
- Install the kinesis output plugin on Logstash_put-kinesis.
[root@ip-172-31-1-235 ~]# /usr/share/logstash/bin/logstash-plugin install logstash-output-kinesis
Validating logstash-output-kinesis
Installing logstash-output-kinesis
Installation successful
- Install the kinesis input plugin on Logstash_get-kinesis.
[root@ip-172-31-9-35 ~]# /usr/share/logstash/bin/logstash-plugin install logstash-input-kinesis
Validating logstash-input-kinesis
Installing logstash-input-kinesis
Installation successful
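You can then confirm that each plugin was picked up with the logstash-plugin list subcommand:
# List installed plugins and filter for the two just added.
/usr/share/logstash/bin/logstash-plugin list | grep kinesis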
4. Configure logstash.conf
- Create logstash.conf on Logstash_put-kinesis.
input {
  beats {
    port => 5044
  }
}
output {
  kinesis {
    stream_name => "kinesis_poc"
    max_pending_records => 10000
    randomized_partition_key => true
    region => "us-east-1"
  }
}
Log data received from Filebeat is streamed into Kinesis.
randomized_partition_key is enabled so that records are distributed across multiple shards.
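To spot-check that events are actually arriving in the stream before setting up the consumer side, you can read a few records back with the AWS CLI. The shard ID below assumes a single-shard stream; with more shards, list them first with aws kinesis list-shards.
# Fetch a shard iterator for the oldest data, then read a few records.
# Note that the Data field in the response comes back base64-encoded.
ITER=$(aws kinesis get-shard-iterator --stream-name kinesis_poc \
    --shard-id shardId-000000000000 --shard-iterator-type TRIM_HORIZON \
    --region us-east-1 --query 'ShardIterator' --output text)
aws kinesis get-records --shard-iterator "$ITER" --limit 5 --region us-east-1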
- Create logstash.conf on Logstash_get-kinesis.
input {
  kinesis {
    kinesis_stream_name => "kinesis_poc"
    application_name => "kinesis_poc"
    checkpoint_interval_seconds => 10
    metrics => "cloudwatch"
    region => "us-east-1"
    codec => json {}
  }
}
output {
  elasticsearch {
    hosts => [ "localhost:9200" ]
  }
}
This time the output goes to the Elasticsearch instance installed on the same host, and the data is visualized with Kibana.
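Before starting the service, it can save time to syntax-check each pipeline first. The config path below is an assumption, so adjust it to wherever your logstash.conf actually lives.
# Validate the pipeline definition and exit without processing events.
/usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/logstash.conf \
    --config.test_and_exit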

Execution Results

For some reason the official documentation does not mention it as an option, but the GitHub README states that only two iterator types can be specified.
Summary
What did you think?
I have often used S3 buckets on AWS as a data lake, but I wanted a scalable architecture for pulling large volumes of log data out of the data lake and feeding it into analytics storage in real time, so I gave this a try!
There is still relatively little information on integrating Logstash with Kinesis Data Streams, so I hope this helps someone ^^