使用Elasticsearch/Logstash/Kibana将本地和AWS的日志数据进行可视化
组成
オンプレ側
テスト用サーバ2台とCiscoスイッチを用意。各機器のログはSyslogサーバに転送する。
SyslogサーバのログをLogstashでETL処理しElasticsearchに保存する。
最後にKibanaで各種設定やグラフ作成をする。
AWS側
CloudTrail、VPC FlowlogsをCloudwatchに送信する。
LogstashからCloudwatchのロググループを指定してログを取りに行く。
Kibanaで各種設定やグラフを作成する。

结束后

环境和前提条件
-
- AlmaLinux release 8.5 (Arctic Sphynx)
-
- 下記の設定は設定済み
-
- CloudTrail,VPCFlowlogs,CloudWatchの設定
- LogstashからCloudWatchにアクセスするIAMユーザの作成及びIAMロールの設定
请提供具体的句子或内容,我将为您提供中文的同义转述。
-
- Logstashのfilter箇所の記載方法(CloudTrail、VPCFlowlogs)は下記サイトを参考にさせて頂きました。
-
- VPC FlowLogsをLogstashで正規化してみた
- CloudTrailをElasticsearchに取り込んでみた
建立步驟(On-premise端的日誌收集)
搭建Elasticsearch服务器
- Elasticsearchインストール
wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-7.10.1-x86_64.rpm
rpm --install elasticsearch-7.10.1-x86_64.rpm
- Elasticsearch設定
vim elasticsearch.yml
network.host: 0.0.0.0
node.name: node-1
cluster.initial_master_nodes: ["node-1"]
- vm.max_map_countパラメータ設定
vm.max_map_count=262144
sysctl -q -w vm.max_map_count=262144
systemctl enable elasticsearch
systemctl start elasticsearch
将Kibana也安装在同一台服务器上。
- Kibanaインストール
wget https://artifacts.elastic.co/downloads/kibana/kibana-7.10.1-x86_64.rpm
rpm --install kibana-7.10.1-x86_64.rpm
- Kibana設定
server.host: "0.0.0.0"
- Kibana起動
systemctl enable kibana
systemctl start kibana
- Kibana動作確認
访问 http://xx.xx.xx.xx:5601/
搭建Logstash服务器
- OpenJDKのインストール
yum -y install java-1.8.0-openjdk
- Logstashインストール
rpm --import https://artifacts.elastic.co/GPG-KEY-elasticsearch
[logstash-7.x]
name=Elastic repository for 7.x packages
baseurl=https://artifacts.elastic.co/packages/7.x/yum
gpgcheck=1
gpgkey=https://artifacts.elastic.co/GPG-KEY-elasticsearch
enabled=1
autorefresh=1
type=rpm-md
yum -y install logstash-7.10.1
- Logstash起動
systemctl enable logstash
systemctl restart logstash
建立Syslog服务器
- Syslog設定
使用IP地址将日志文件分离的设置
module(load="imudp") # needs to be done just once
input(type="imudp" port="514")
module(load="imtcp") # needs to be done just once
input(type="imtcp" port="514")
#### RULES ####
:fromhost-ip, isequal, "xxx.xxx.xxx.xxx" -/var/log/server/pst.log
& ~
:fromhost-ip, isequal, "xxx.xxx.xxx.xxx" -/var/log/server/redmine.log
& ~
:fromhost-ip, isequal, "xxx.xxx.xxx.xxx" -/var/log/server/ciscosw.log
& ~
/var/log/server/*.log {
weekly
rotate 54
compress
create 0664 root root
postrotate
/bin/systemctl restart rsyslog
endscript
}
systemctl restart rsyslog
Syslog转发端设置
- サーバ設定
*.* @@xxx.xxx.xxx.xxx:514
systemctl restart rsyslog
- スイッチ設定
为了测试目的,在调试中设置级别。
logging host xxx.xxx.xxx.xxx
logging trap debugging
NFS配置
使Logstash服务器能够看到Syslog服务器。
- Syslogサーバ側設定
yum -y install nfs-utils
systemctl enable nfs-server
systemctl start nfs-server
/var/log xxx.xxx.xxx.xxx/xx(rw,no_root_squash)
- Logstashサーバ側設定
yum -y install nfs-utils
systemctl enable nfs-server
systemctl start nfs-server
mount -t nfs xxx.xxx.xxx.xxx:/var/log /mnt
xxx.xxx.xxx.xxx:/var/log /mnt nfs defaults 0 0
Logstash配置
参考资料:logstash模式
- Confファイル作成
input {
file {
path => "/mnt/server/redmine.log"
path => "/mnt/server/pst.log"
path => "/mnt/server/ciscosw.log"
start_position => "beginning"
}
}
filter {
grok {
match => {
"message" => "%{SYSLOGBASE}%{SPACE}%{GREEDYDATA:SYSLOGMESSAGE}"
}
}
}
output {
elasticsearch {
hosts => ["xxx.xxx.xxx.xxx:9200"]
index => "syslog-%{+YYYY-MM-dd}"
}
}
- Logstash再起動
systemctl restart logstash
Kibana配置
- Develper Toolの画面にアクセス
请用中文将以下内容进行改述,只需提供一种选项:
http://xx.xx.xx.xx:5601/app/dev_tools#/console
这是控制台的网址:http://xx.xx.xx.xx:5601/app/dev_tools#/console。
- Indexが登録されているか確認
GET /_cat/indices?v
health status index uuid pri rep docs.count docs.deleted store.size pri.store.size
yellow open syslog-2022-02-21 KfyhDKVjRwGGVk2TWuGD1A 1 1 407 0 145.2kb 145.2kb
yellow open syslog-2022-02-22 lmZFW50vTveDW2wriHFW_w 1 1 381
- Document情報確認
GET /syslog-2022-02-* /_search
{
"query": { "match_all": {} }
}
- Index Patterns登録
按照以下顺序选择:Stack Management -> Index Patterns

输入Syslog-*后,点击下一步。
输入Syslog-*后,点击下一步。
输入Syslog-*后,点击下一步。
输入Syslog-*并点击下一步。
在输入Syslog-*后,点击下一步。
输入Syslog-*然后点击下一步。

选择时间戳后,点击”创建索引模式”。

- Syslog確認
点击“Discover”按钮

确认是否显示了Syslog的内容。

- グラフ作成
选择Visualize后,创建喜欢的图表。

AWS側のログ取込みの手順を構築する。
Logstash配置
- CloudWatchプラグインのインストール
/usr/share/logstash/bin/logstash-plugin install logstash-input-cloudwatch_logs
- Flowlogsのgrokパターン作成
VPCFLOWLOG %{NUMBER:version} %{NOTSPACE:account-id} %{NOTSPACE:interface-id} %{IP:srcaddr} %{IP:dstaddr} %{NOTSPACE:srcport} %{NOTSPACE:dstport} %{NOTSPACE:protocol} %{NUMBER:packets:float} %{NUMBER:bytes:float} %{NOTSPACE:start} %{NOTSPACE:end} %{NOTSPACE:action} %{NOTSPACE:log-status}
- Confファイル作成(VPC Flowlogs)
input {
cloudwatch_logs {
region => "ap-northeast-1"
log_group => [ "vpcflowlogs" ]
access_key_id => "xxxxxxxxx"
secret_access_key => "xxxxxxxxx" }
}
filter {
grok {
patterns_dir => [ "/etc/logstash/patterns/vpcflowlogs_patterns" ]
match => { "message" => "%{VPCFLOWLOG}"}
}
date {
match => [ "start","UNIX" ]
target => "@timestamp"
}
date {
match => [ "start","UNIX" ]
target => "start_time"
}
date {
match => [ "end","UNIX" ]
target => "end_time"
}
geoip {
source => "srcaddr"
target => "src_geoip"
tag_on_failure => "src_geoip_lookup_failure"
}
geoip {
source => "dstaddr"
target => "dst_geoip"
tag_on_failure => "dst_geoip_lookup_failure"
}
mutate {
remove_field => [ "start", 'end' ]
}
}
output {
elasticsearch {
hosts => [ "xx.xx.xx.xx:9200" ]
index => "vpcflowlogs-%{+YYYY-MM-dd}"
}
}
- Confファイル作成(CloudTrail)
input {
cloudwatch_logs {
region => "ap-northeast-1"
log_group => [ "ClouTrail-LogGroup" ]
access_key_id => "xxxxxxxxx"
secret_access_key => "xxxxxxxxx"
sincedb_path => "/var/lib/logstash/sincedb_cloudtrail"
}
}
filter {
json {
source => "message"
}
date {
match => [ "eventTime", "ISO8601" ]
target => "@timestamp"
}
ruby {
code => "event.set('[@metadata][local_time]',event.get('[@timestamp]').time.localtime.strftime('%Y-%m-%d'))"
}
useragent {
source => "userAgent"
target => "useragent"
}
geoip {
source => "sourceIPAddress"
}
mutate {
remove_field => [ "message" ]
}
}
output {
elasticsearch {
hosts => [ "xx.xx.xx.xx:9200" ]
index => "cloudtrail-%{+YYYY-MM-dd}"
}
}
- Pipelineファイル設定
設定複數次讀取conf檔案的方式
#- pipeline.id: main
# path.config: "/etc/logstash/conf.d/*.conf"
- pipeline.id: syslog
pipeline.batch.size: 125
path.config: "/etc/logstash/conf.d/syslog.conf"
pipeline.workers: 1
- pipeline.id: vpcflowlogs
pipeline.batch.size: 125
path.config: "/etc/logstash/conf.d/vpcflowlogs.conf"
pipeline.workers: 1
- pipeline.id: cloudtrail
pipeline.batch.size: 125
path.config: "/etc/logstash/conf.d/cloudtrail.conf"
pipeline.workers: 1
- Logstash再起動
systemctl restart logstash
Kibana的配置
设定方法与上述的本地环境相同。