Logstash的Kinesis输入插件和Kinesis输出插件

首先

我使用Elastic提供的ETL工具Logstash,将日志数据导入和导出到Amazon Kinesis Data Streams进行了尝试。

【参考】
・Amazon Kinesis数据流是什么
・AWS kinesis概述
※简单来说,它是AWS托管的Apache Kafka服务。

利用环境和构成图。 (Use the environment and configuration diagram.)

productversionFilebeat6.6.0Logstash6.6.0Elasticsearch6.6.0Kibana6.6.0OS(EC2)Amazon Linux2

※我们正在使用最新版本的Elastic Stack 6.6.0。
※我们正在使用AWS的弗吉尼亚(us-east-1)区域。

image.png
    • LogstashからKinesis Data Streamsにデータを置くときはkinesis output pluginを利用します。

 

    LogstashでKinesis Data Streamsからデータを取り出すときはkinesis input pluginを利用します。

前提

假设上述环境已经准备就绪,我们省略了安装 Logstash_put-kinesis 机器的 Java、Logstash 和 Filebeat,以及安装 Logstash_get-kinesis 机器的 Java、Logstash、Elasticsearch 和 Kibana。

Logstash Kinesis output插件是什么?

你可以将Logstash导入的日志数据输出到Amazon Kinesis Data Streams作为目标。实现上述功能的是Kinesis输出插件。该插件并非由Elastic公司官方提供,而是由社区插件提供。

設定項目デフォルト値説明stream_name- (必須)kinesisストリーム名を指定します。regionus-east-1 (任意)Kinesis Data StreamsのAWSリージョンを指定します。metrics_levelcloudwatch (任意)メトリック統計情報のCloudWatchへの送信有無を指定します。access_key- (任意)Kinesisに書き込み権限のあるIAMアカウントで作成したアクセスキーを指定します。secret_key- (任意)アクセスキーを利用する場合の対になるシークレットキーを指定します。metrics_access_key- (任意)CloudWatchに書き込み権限のあるIAMアカウントで作成したアクセスキーを指定します。metrics_secret_key- (任意)CloudWatchのアクセスキーを利用する場合の対になるシークレットキーを指定します。role_arn- (任意)AssumeRoleを利用する場合のARNを指定します。metrics_role_arn- (任意)CloudWatchへの書き込みでAssumeRoleを利用する場合のARNを指定します。event_partition_keys- (任意)各レコードに挿入するパーティションキーを指定します。randomized_partition_keyfalse (任意)データ送信順序を無視したランダムなパーティションキーの利用有無を指定します。aggregation_enabledtrue (任意)Kinesisに投入するレコードを集約して送るか指定します。max_pending_records1000 (任意)Kinesis高負荷時のLogstash側でのバッファリングするレコード数を指定します。

Logstash Kinesis输入插件是什么?

Logstash可以将Amazon Kinesis Data Streams指定为日志数据的获取源。
实现上述功能的是kinesis input插件。
该插件是Elastic官方提供的插件,但不受官方支持(´;ω;`)。

設定項目デフォルト値説明application_namelogstash (任意)dynamodb調整テーブルに使用されるアプリケーション名を指定します。checkpoint_interval_seconds60 (任意)dynamodbをチェックにしに行くインターバル(秒)を指定します。kinesis_stream_name- (必須)kinesisストリーム名を指定します。metricsnil (任意)メトリック統計情報のCloudWatchへの送信有無を指定します。profile- (任意)AWS認証情報を参照する場合のファイルパスを指定します。regionus-east-1 (任意)Kinesis Data StreamsのAWSリージョンを指定します。role_arn- (任意)AssumeRoleを利用する場合のARNを指定します。role_session_name- (任意)IAMロールを引き受けるときに使用するセッション名を指定します。

设置步骤

    1. 创建Kinesis Streams

 

    1. 创建IAM角色

 

    1. 安装插件

 

    配置logstash.conf

1. 创建Kinesis Streams。

image.png

※以下的文章很好地总结了Kinesis Data Stream的规模和运作情况。

【参考】
· 介绍了通过大规模数据验证和了解AWS Kinesis Stream的案例。

2. 创建 IAM 角色

    Logstash_put-kinesisのEC2インスタンスに割り当てるIAM Roleに必要なIAM Policy
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "VisualEditor0",
            "Effect": "Allow",
            "Action": [
                "kinesis:PutRecord",
                "kinesis:PutRecords",
                "kinesis:DescribeStream"
            ],
            "Resource": "arn:aws:kinesis:us-east-1:<AWSアカウント>:stream/kinesis_poc"
        },
        {
            "Sid": "VisualEditor1",
            "Effect": "Allow",
            "Action": [
                "cloudwatch:PutMetricData"
            ],
            "Resource": "*"
        }
    ]
}
    Logstash_get-kinesisのEC2インスタンスに割り当てるIAM Roleに必要なIAM Policy
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "VisualEditor0",
            "Effect": "Allow",
            "Action": [
                "kinesis:GetShardIterator",
                "kinesis:GetRecords",
                "kinesis:DescribeStream",
                "kinesis:ListShards"
            ],
            "Resource": "arn:aws:kinesis:us-east-1:<AWSアカウント>:stream/kinesis_poc"
        },
        {
            "Sid": "VisualEditor1",
            "Effect": "Allow",
            "Action": [
                "dynamodb:CreateTable",
                "dynamodb:DescribeTable",
                "dynamodb:Scan",
                "dynamodb:PutItem",
                "dynamodb:UpdateItem",
                "dynamodb:GetItem",
                "dynamodb:DeleteItem",
                "cloudwatch:PutMetricData"
            ],
            "Resource": "*"
        }
    ]
}

以下是IAM策略和用户创建所需的IAM策略,但这是不够的。
如果权限不足,在/var/log/messages中会输出以下错误,所以请务必注意!
※以下示例中缺少dynamodb:DescribeTable权限。

Feb 13 16:01:39 ip-172-31-1-190 logstash: Caused by: com.amazonaws.services.dynamodbv2.model.AmazonDynamoDBException: User: arn:aws:sts::<AWSアカウント>:assumed-role/kinesis_get_role/i-03c32b05117b3235a is not authorized to perform: dynamodb:DescribeTable on resource: arn:aws:dynamodb:us-east-1:<AWSアカウント>:table/logstash (Service: AmazonDynamoDBv2; Status Code: 400; Error Code: AccessDeniedException; Request ID: BQRBG0RSE187CLGTP802ONS5BFVV4KQNSO5AEMVJF66Q9ASUAAJG)

3. 安装插件

您可以通过查看/usr/share/logstash/Gemfile来了解默认安装的插件。
顺便提一下,本次涉及的两个插件都尚未默认安装,请进行额外安装。

[root@ip-172-31-1-190 ~]# cat /usr/share/logstash/Gemfile
# This is a Logstash generated Gemfile.
# If you modify this file manually all comments and formatting will be lost.

source "https://rubygems.org"
gem "logstash-core", :path => "./logstash-core"
gem "logstash-core-plugin-api", :path => "./logstash-core-plugin-api"
gem "paquet", "~> 0.2.0"
gem "ruby-progressbar", "~> 1.8.1"
gem "builder", "~> 3.2.2"
gem "ci_reporter_rspec", "1.0.0", :group => :development
gem "tins", "1.6", :group => :development
gem "rspec", "~> 3.5", :group => :development
gem "logstash-devutils", "= 1.3.5", :group => :development
gem "benchmark-ips", :group => :development
gem "octokit", "3.8.0", :group => :build
gem "stud", "~> 0.0.22", :group => :build
gem "fpm", "~> 1.3.3", :group => :build
gem "rubyzip", "~> 1.2.1", :group => :build
gem "gems", "~> 0.8.3", :group => :build
gem "rack-test", :require => "rack/test", :group => :development
gem "flores", "~> 0.0.6", :group => :development
gem "term-ansicolor", "~> 1.3.2", :group => :development
gem "json-schema", "~> 2.6", :group => :development
gem "belzebuth", :group => :development
gem "pleaserun", "~>0.0.28"
gem "webrick", "~> 1.3.1"
gem "atomic", "<= 1.1.99"
gem "rake", "~> 12.2.1", :group => :build
gem "logstash-codec-cef"
gem "logstash-codec-collectd"
gem "logstash-codec-dots"
gem "logstash-codec-edn"
gem "logstash-codec-edn_lines"
gem "logstash-codec-es_bulk"
gem "logstash-codec-fluent"
gem "logstash-codec-graphite"
gem "logstash-codec-json"
gem "logstash-codec-json_lines"
gem "logstash-codec-line"
gem "logstash-codec-msgpack"
gem "logstash-codec-multiline"
gem "logstash-codec-netflow", ">=3.14.1", "<4.0.0"
gem "logstash-codec-plain"
gem "logstash-codec-rubydebug"
gem "logstash-filter-aggregate"
gem "logstash-filter-anonymize"
gem "logstash-filter-cidr"
gem "logstash-filter-clone"
gem "logstash-filter-csv"
gem "logstash-filter-date"
gem "logstash-filter-de_dot"
gem "logstash-filter-dissect"
gem "logstash-filter-dns"
gem "logstash-filter-drop"
gem "logstash-filter-elasticsearch"
gem "logstash-filter-fingerprint"
gem "logstash-filter-geoip"
gem "logstash-filter-grok"
gem "logstash-filter-http"
gem "logstash-filter-jdbc_static"
gem "logstash-filter-jdbc_streaming"
gem "logstash-filter-json"
gem "logstash-filter-kv"
gem "logstash-filter-memcached"
gem "logstash-filter-metrics"
gem "logstash-filter-mutate"
gem "logstash-filter-ruby"
gem "logstash-filter-sleep"
gem "logstash-filter-split"
gem "logstash-filter-syslog_pri"
gem "logstash-filter-throttle"
gem "logstash-filter-translate"
gem "logstash-filter-truncate"
gem "logstash-filter-urldecode"
gem "logstash-filter-useragent"
gem "logstash-filter-xml"
gem "logstash-input-beats"
gem "logstash-input-azure_event_hubs"
gem "logstash-input-dead_letter_queue"
gem "logstash-input-elasticsearch"
gem "logstash-input-exec"
gem "logstash-input-file"
gem "logstash-input-ganglia"
gem "logstash-input-gelf"
gem "logstash-input-generator"
gem "logstash-input-graphite"
gem "logstash-input-heartbeat"
gem "logstash-input-http"
gem "logstash-input-http_poller"
gem "logstash-input-imap"
gem "logstash-input-jdbc"
gem "logstash-input-kafka"
gem "logstash-input-pipe"
gem "logstash-input-rabbitmq"
gem "logstash-input-redis"
gem "logstash-input-s3"
gem "logstash-input-snmp"
gem "logstash-input-snmptrap"
gem "logstash-input-sqs"
gem "logstash-input-stdin"
gem "logstash-input-syslog"
gem "logstash-input-tcp"
gem "logstash-input-twitter"
gem "logstash-input-udp"
gem "logstash-input-unix"
gem "logstash-output-elastic_app_search"
gem "logstash-output-cloudwatch"
gem "logstash-output-csv"
gem "logstash-output-elasticsearch"
gem "logstash-output-email"
gem "logstash-output-file"
gem "logstash-output-graphite"
gem "logstash-output-http"
gem "logstash-output-kafka"
gem "logstash-output-lumberjack"
gem "logstash-output-nagios"
gem "logstash-output-null"
gem "logstash-output-pagerduty"
gem "logstash-output-pipe"
gem "logstash-output-rabbitmq"
gem "logstash-output-redis"
gem "logstash-output-s3", ">=4.0.9", "<5.0.0"
gem "logstash-output-sns"
gem "logstash-output-sqs"
gem "logstash-output-stdout"
gem "logstash-output-tcp"
gem "logstash-output-udp"
gem "logstash-output-webhdfs"
    Logstash_put-kinesisにはkinesis output pluginをインストールします。
[root@ip-172-31-1-235 ~]# /usr/share/logstash/bin/logstash-plugin install logstash-output-kinesis
Validating logstash-output-kinesis
Installing logstash-output-kinesis
Installation successful
    Logstash_get-kinesisにはkinesis input pluginをインストールします。
[root@ip-172-31-9-35 ~]# /usr/share/logstash/bin/logstash-plugin install logstash-input-kinesis
Validating logstash-input-kinesis
Installing logstash-input-kinesis
Installation successful

4. logstash.conf的配置文件

    Logstash_put-kinesisのlogstash.confを作成します。
input {
  beats {
    port => 5044
  }
}
output {
  kinesis {
    stream_name => "kinesis_poc"
    max_pending_records => 10000
    randomized_partition_key => true
    region => "us-east-1"
  }
}

将从Filebeat接收到的日志数据流向Kinesis。
在分发到多个分片时,激活随机分区键(randomized_partition_key)功能。

    Logstash_get-kinesisのlogstash.confを作成します。
input {
  kinesis {
    kinesis_stream_name => "kinesis_poc"
    application_name => "kinesis_poc"
    checkpoint_interval_seconds => 10
    metrics => "cloudwatch"
    region => "us-east-1"
    codec => json {}
  }
}
output {
  elasticsearch {
    hosts => [ "localhost:9200" ]
  }
}

这次是将输出发送到已经在同一操作系统中安装的Elasticsearch,并通过Kibana实现可视化。

image.png

执行结果

image.png

在公式网站上不知道为什么没有提到作为选项,但在GitHub上写着只有两个迭代器的指定是可以的。

总结

您觉得怎么样?

我以前多次使用AWS上的S3存储桶作为DataLake,但是为了从DataLake中采集大量日志数据并实时投入到分析所需的存储中,我需要一个可扩展的架构,所以这次我尝试了一下!

由于 Logstash 和 Kinesis 数据流之间的协作信息相对较少,希望这能对某些人有帮助^^

广告
将在 10 秒后关闭
bannerAds