使用Riemann实时事件处理和监控
由于之前提到过collectd,所以顺便也记录一下output部分。
简要概述
使用collectd收集的度量信息在riemann中进行实时处理。
Riemann 是一个事件流处理引擎。每个事件包含以下基本字段,也可以附加任意自定义字段。
- host
- service
- state
- time
- description
- tags
- metric
- ttl
■组成
collectd -> riemann -> riemann-dash
                └> Graphite -> Grafana
除了Graphite之外,还可以将指标保存到InfluxDB、datadog等SaaS,并且可以通过邮件、Slack和Nagios发送警报。

Riemann 安装
首先进行安装
# yum install daemonize
# rpm -ivh https://aphyr.com/riemann/riemann-0.2.10-1.noarch.rpm
Riemann 配置
配置文件结构
创建主配置文件和各角色目录,将文件进行分割
配置文件使用 Clojure 进行编写
/etc/riemann/
|--riemann.config
|
|--/streams/
| |--default.config
| |--load.config
|
|--/filters/
| |--collectd.config
| |--rewrite.config
|
|--/targets/
|--graphite.config
|--index.config
- streams : 存放事件流处理的配置文件
- filters : 存放对事件进行转换处理的配置文件
- targets : 存放输出相关的配置文件(例如输出到 Graphite)
主要配置文件
仅包含基本配置,如监听端口和包含目录。
; riemann.config — entry point: logging, network listeners, expiry, includes.
(logging/init :file "/var/log/riemann/riemann.log")

; Every listener is bound to the same address; the trailing comments carry
; the port noted for each server in the original configuration.
; NOTE(review): this also binds the REPL server to 0.0.0.0 — confirm that
; exposing it on all interfaces is intended.
(let [bind-addr "0.0.0.0"]
  (tcp-server  :host bind-addr)   ; :port 5555
  (udp-server  :host bind-addr)   ; :port 5555
  (ws-server   :host bind-addr)   ; :port 5556
  (sse-server  :host bind-addr)   ; :port 5558
  (repl-server :host bind-addr))  ; :port 5557

; Periodic expiry of events, called with an interval argument of 60.
(periodically-expire 60)

; Load the split-out configuration directories, in this order.
(include "filters")
(include "targets")
(include "streams")
事件流处理。
默认处理
; Default processing applied to the incoming event stream.
(streams
  ; Forward events to the `graph` stream.
  graph

  ; Throughput meter: each event is rewritten to metric 1 with service
  ; "events/sec" and host/tags/state cleared, then passed through
  ; (rate 5 ...) into `index`.
  (with {:metric 1, :service "events/sec", :host nil, :tags nil, :state nil}
    (rate 5 index))

  ; Log events selected by `expired`.
  (expired
    (fn [event] (info "expired" event))))
通过阈值对状态进行处理
; Threshold-based state assignment. Every branch ends in rewrite-index.
; The (service #"...") patterns below are written against the raw collectd
; service names (containing "/"), not the rewritten ones.
(streams
; Send every event to rewrite-index as-is.
rewrite-index
; Derived streams; cpu-stream and mem-stream are macros shown elsewhere in this article.
; NOTE(review): swap-stream is not defined in any snippet visible here — confirm it exists before loading this config.
(cpu-stream rewrite-index)
(mem-stream rewrite-index)
(swap-stream rewrite-index)
; CPU: cpu-idle is judged on low values, every other cpu-* service on high values.
(where (service #"/cpu-")
(split
(service #"/cpu-idle$")
; splitp follows condp argument order, i.e. each clause is tested as (>= threshold metric):
; idle <= 1 -> critical, idle <= 20 -> warning, otherwise ok.
(splitp >= metric
1 (with :state "critical" rewrite-index)
20 (with :state "warning" rewrite-index)
(with :state "ok" rewrite-index))
; Fallback branch for the remaining cpu-* services:
; metric >= 80 -> critical, metric >= 40 -> warning, otherwise ok.
:otherwise
(splitp <= metric
80 (with :state "critical" rewrite-index)
40 (with :state "warning" rewrite-index)
(with :state "ok" rewrite-index))
))
; Disk: only disk_octets is judged; metric >= 2000 -> critical, otherwise ok.
(where (service #"/disk_")
(split
(service #"/disk_octets")
(splitp <= metric
2000 (with :state "critical" rewrite-index)
(with :state "ok" rewrite-index))
))
; Memory: only memory-used is judged, compared against raw metric values.
; NOTE(review): the critical threshold (30000000000) is 15x the warning threshold (2000000000) — confirm both values.
(where (service #"/memory-")
(split
(service #"/memory-used$")
(splitp <= metric
30000000000 (with :state "critical" rewrite-index)
2000000000 (with :state "warning" rewrite-index)
(with :state "ok" rewrite-index))
))
; Network interface: only if_octets is judged; metric >= 20000 -> critical, >= 1000 -> warning, otherwise ok.
(where (service #"/if_")
(split
(service #"/if_octets")
(splitp <= metric
20000 (with :state "critical" rewrite-index)
1000 (with :state "warning" rewrite-index)
(with :state "ok" rewrite-index))
))
)
如果直接从collectd输入,由于服务名过长或不够明确,需要进行转换处理(比如将load/load/shortterm转换为load1等)。
(defmacro mem-stream
  "Expands to a stream that derives a per-host memory usage percentage.

  Only events whose :plugin is \"memory\" and whose :state is not
  \"expired\" are accepted. They are grouped by :host and projected onto
  four slots selected by :type_instance (used, cached, buffered, free).
  When all four slots are filled, a copy of the used event is passed to
  `children` with :service \"mem used%\" and
  :metric = used / (used + cached + buffered + free) * 100.
  If that computation throws, the error is logged and nil is returned."
  [& children]
  `(where* (fn [event#]
             (and (= "memory" (:plugin event#))
                  (not= "expired" (:state event#))))
     (by [:host]
       (project* [(fn [e#] (= "used" (:type_instance e#)))
                  (fn [e#] (= "cached" (:type_instance e#)))
                  (fn [e#] (= "buffered" (:type_instance e#)))
                  (fn [e#] (= "free" (:type_instance e#)))]
         (smap
           (fn [[used# cached# buffered# free#]]
             ; Wait until every slot holds an event.
             (when (and used# cached# buffered# free#)
               (try
                 (let [total# (+ (:metric used#)
                                 (:metric cached#)
                                 (:metric buffered#)
                                 (:metric free#))]
                   (assoc used#
                          :service "mem used%"
                          :metric (* (/ (:metric used#) total#) 100)))
                 (catch Exception err#
                   (error err# "cannot compute mem pct for " used# cached# buffered# free#)
                   nil))))
           ~@children)))))
(defmacro cpu-stream
  "Expands to a stream that derives a per-host \"cpu all\" event.

  Only events whose :plugin is \"aggregation\", whose :plugin_instance is
  \"cpu-average\" and whose :state is not \"expired\" are accepted. Each
  accepted event is logged at info level, and the events are grouped by
  :host and projected onto one slot per :type_instance listed below
  (\"idle\" is not in the list). The projected events are combined with
  folds/sum and the result is passed to `children` with :service \"cpu all\".

  Fix: the \"system\" slot previously read the misspelled key
  :type_intance, so its predicate never matched and system CPU time was
  never part of the sum."
  [& children]
  `(where* (fn [{plugin# :plugin, plugin_instance# :plugin_instance,
                 state# :state}]
             (and (= "aggregation" plugin#)
                  (= "cpu-average" plugin_instance#)
                  (not= "expired" state#)))
     ; Logs every accepted event.
     #(info %)
     (by [:host]
       (project* [(comp (partial = "user") :type_instance)
                  (comp (partial = "system") :type_instance)
                  (comp (partial = "softirq") :type_instance)
                  (comp (partial = "interrupt") :type_instance)
                  (comp (partial = "steal") :type_instance)
                  (comp (partial = "wait") :type_instance)
                  (comp (partial = "nice") :type_instance)]
         (smap (fn [events#]
                 ; Nothing is emitted when folds/sum returns nil.
                 (when-let [summed# (folds/sum events#)]
                   (try
                     (assoc summed# :service "cpu all")
                     (catch Exception e#
                       (error e# "cannot compute cpu all for " summed#)
                       nil))))
           ~@children)))))
;; Service rewrite rules, tried in order.
;; :service is either an exact service name (string) or a regex;
;; :rewrite is the replacement, which may refer to regex groups as $1, $2, ...
(def default-services
  [;; exact-name rules
   {:service "conntrack/conntrack"    :rewrite "conntrack"}
   {:service "load/load/shortterm"    :rewrite "load1"}
   {:service "load/load/midterm"      :rewrite "load5"}
   {:service "load/load/longterm"     :rewrite "load15"}
   {:service "memory/memory-used"     :rewrite "mem used"}
   {:service "memory/memory-free"     :rewrite "mem free"}
   {:service "memory/memory-buffered" :rewrite "mem buffered"}
   {:service "memory/memory-cached"   :rewrite "mem cached"}

   ;; regex rules
   {:service #"^cpu-([0-9]+)/cpu-(.*)$"
    :rewrite "cpu-$1 $2"}
   {:service #"^aggregation-cpu-average/cpu-(.*)$"
    :rewrite "cpu $1"}
   {:service #"^interface-(.*)/if_(errors|packets|octets)/(tx|rx)$"
    :rewrite "nic $1 $3 $2"}
   {:service #"^apache-apache/apache_(.*)$"
    :rewrite "apache $1"}
   {:service #"^mysql-monitor/mysql_(.*)-(.*)$"
    :rewrite "mysql $2"}])
(defn rewrite-service-with
  "Builds an event -> event function that renames :service according to
  `rules`.

  `rules` is a sequence of maps {:service pattern, :rewrite replacement}.
  A string pattern must equal the event's service exactly and the service
  becomes `replacement`. Any other pattern is treated as a regex: it is
  matched with re-find and the service becomes
  (clojure.string/replace service pattern replacement), so `replacement`
  may refer to groups as $1, $2, ...

  The first matching rule wins. An event that matches no rule is returned
  unchanged. This includes events whose :service is nil or not a string:
  previously re-find threw on those as soon as a regex rule was reached."
  [rules]
  (let [matches? (fn [pattern service]
                   ; Guard first: re-find only accepts strings.
                   (and (string? service)
                        (if (string? pattern)
                          (= pattern service)
                          (re-find pattern service))))]
    (fn [{:keys [service] :as event}]
      (or (some (fn [{pattern :service, rewrite :rewrite}]
                  (when (matches? pattern service)
                    (assoc event :service
                           (if (string? pattern)
                             rewrite
                             (clojure.string/replace service pattern rewrite)))))
                rules)
          event))))
(def rewrite-service
  "Event -> event rewrite function built from the default-services rules."
  (rewrite-service-with default-services))
对输入进行处理
给予默认状态。
; Index stream: applies the defaults :state "ok" and :ttl 300, then passes the event to update-index.
; NOTE(review): this def reuses the name `index`, which its own init expression also calls as a
; function — confirm it still evaluates correctly when the configuration is reloaded.
(def index
(default {:state "ok" :ttl 300}
(update-index (index))))
; Applies rewrite-service to each event and forwards the result to `index`.
(def rewrite-index
(smap rewrite-service index))
将METRICS发送到graphite
; Graphite output stream, referenced as `graph` by the default streams.
; The :host value below is a placeholder — replace it with the Graphite server's IP address.
(def graph
(graphite {:host "{graphite サーバ IP アドレス}"}))
Riemann 启动
# /etc/init.d/riemann start
安装 riemann-dash
这个应用是用Sinatra创建的,用于实时显示收到的事件。
安装步骤
# cd /opt
# git clone https://github.com/aphyr/riemann-dash.git
# cd riemann-dash
# gem install bundler
# bundle install
更改图表的时区
# vi /opt/riemann-dash/lib/riemann/dash/public/views/flot.js
max: this.max
},
xaxis: {
+ timezone: "browser",
font: this.font,
Riemann-dash启动
# RACK_ENV=production bundle exec riemann-dash -c example/config.rb &
访问以下服务器IP地址的链接:http://{riemann 服务器 IP地址}:4567

客户端(collectd)安装
以下是通过collectd获取的指标
- cpu
- apache status
- disk
- interface
- mysql
安装 collectd
# yum -y install libcurl-devel net-snmp-devel protobuf-c-devel yajl-devel
# wget http://collectd.org/files/collectd-5.5.0.tar.gz
# tar zxvf collectd-5.5.0.tar.gz
# cd collectd-5.5.0
# ./configure
# make; make install
collectd 的配置
# collectd.conf — collects host metrics and ships them to Riemann.

# Global settings. Replace the Hostname placeholder with this client's name.
Hostname "*****"
FQDNLookup false
BaseDir "/opt/collectd/var/lib/collectd"
PIDFile "/opt/collectd/var/run/collectd.pid"
PluginDir "/opt/collectd/lib/collectd"
TypesDB "/opt/collectd/share/collectd/types.db"
Interval 5
Timeout 5
ReadThreads 10
WriteThreads 10

# NOTE(review): the Riemann streams and dashboards in this article also refer
# to disk and swap metrics, but neither plugin is loaded here — confirm.
LoadPlugin syslog
LoadPlugin aggregation
LoadPlugin apache
LoadPlugin cpu
LoadPlugin interface
LoadPlugin load
LoadPlugin mysql
LoadPlugin memory
LoadPlugin write_riemann

# Average the per-core cpu values per host and type instance. The Riemann
# side matches the result as plugin "aggregation", instance "cpu-average".
<Plugin "aggregation">
  <Aggregation>
    Plugin "cpu"
    Type "cpu"
    GroupBy "Host"
    GroupBy "TypeInstance"
    CalculateAverage true
  </Aggregation>
</Plugin>

<Plugin apache>
  <Instance "apache">
    URL "http://localhost/server-status?auto"
  </Instance>
</Plugin>

<Plugin interface>
  Interface "eth0"
  IgnoreSelected false
</Plugin>

# The <Database> block name becomes the plugin instance, so it must be
# "monitor" for services to arrive as "mysql-monitor/...", which is what the
# Riemann rewrite rule and the dashboard queries in this article expect.
# The database actually queried is still selected by the Database option.
<Plugin mysql>
  <Database monitor>
    Host "localhost"
    User "demo"
    Password "demo"
    Database "demo"
  </Database>
</Plugin>

# Send everything to Riemann over UDP, tagged "collectd".
# Replace the Node and Host placeholders with real values.
<Plugin write_riemann>
  <Node "*****">
    Host "{riemann IP アドレス}"
    Port 5555
    Protocol UDP
    StoreRates true
    AlwaysAppendDS false
  </Node>
  Tag "collectd"
</Plugin>
每个客户端启动collectd。
# /etc/init.d/collectd start
使用riemann-dash实时显示指标
在各个视图中显示实时的collectd收集到的指标图表,并在每个视图中添加相应的查询内容。

仪表盘配置
{
"server": "{riemann サーバ IP アドレス}:5556",
"server_type": "ws",
"workspaces": [
{
"name": "Riemann",
"view": {
"type": "Balloon",
"weight": 1,
"id": "7fb2428439a04d9238de9f2fd9eff1f5910d2477",
"version": 38,
"child": {
"type": "VStack",
"weight": 1,
"id": "cc696f34d59ea1f1f996c11f09293ba5dd47567d",
"version": 38,
"children": [
{
"type": "HStack",
"weight": 1,
"id": "954004fabce0e68e0e227feb1d30aaf2aa22475b",
"version": 19,
"children": [
{
"type": "VStack",
"weight": 1,
"id": "20fc8f5754b44f9549bf5ba4525284c6017bd15d",
"version": 0,
"children": [
{
"type": "Grid",
"weight": 1,
"id": "3ef8b2f3786e2ab1ba017a66b0e2440b29c11159",
"version": 11,
"title": "Load",
"query": "tagged \"collectd\" and (service =~ \"cpu idle\" or service =~ \"load%\" or service =~ \"mem used\" or service =~ \"swap in\" or service =~ \"swap out\")",
"max": "",
"rows": "",
"cols": "",
"row_sort": "lexical",
"col_sort": "lexical"
}
]
},
{
"type": "VStack",
"weight": 1,
"id": "4e7fc05a9aaa0f2723d4ad5a4e0cdc5e40834cf5",
"version": 5,
"children": [
{
"type": "HStack",
"weight": 1,
"id": "d66f68fdbe079f699944f7d25afe5376e3906fc9",
"version": 1,
"children": [
{
"type": "Grid",
"weight": 1,
"id": "2c6bb867479df2aee7711f719c93b4895cc622c4",
"version": 1,
"title": "Apache",
"query": "tagged \"collectd\" and (service =~ \"apache requests\" or service =~ \"apache connections\" or service =~ \"apache bytes\")",
"max": "",
"rows": "",
"cols": "",
"row_sort": "lexical",
"col_sort": "lexical"
}
]
}
]
},
{
"type": "Grid",
"weight": 1,
"id": "850e41e4c5043cba5a9d332930205428b62a35e2",
"version": 2,
"title": "MySQL",
"query": "tagged \"collectd\" and (service =~ \"mysql insert\" or service =~ \"mysql select\" or service =~ \"mysql write\" or service =~ \"mysql update\" or service =~ \"mysql waited\" or service =~ \"mysql commit\")",
"max": "",
"rows": "",
"cols": "",
"row_sort": "lexical",
"col_sort": "lexical"
}
]
},
{
"type": "HStack",
"weight": 1,
"id": "2aacf97b956c8036d4dbac9947c7ea1e35224811",
"version": 17,
"children": [
{
"type": "Flot",
"weight": 1,
"id": "515eb5172d8d61dd37736a9991151e0c960c0c2e",
"version": 8,
"title": "Network In",
"query": "tagged \"collectd\" and service =~ \"nic%rx octets\"",
"min": null,
"max": null,
"timeRange": 180,
"graphType": "line",
"stackMode": "true"
},
{
"type": "Flot",
"weight": 1,
"id": "3957704a8803eebe37747de3c2a09948955bbbed",
"version": 3,
"title": "Network Out",
"query": "tagged \"collectd\" and service =~ \"nic%tx octets\"",
"min": null,
"max": null,
"timeRange": 180,
"graphType": "line",
"stackMode": "true"
},
{
"type": "Flot",
"weight": 1,
"id": "f2a30d5108a3b5f6b62666e53177f6cf212e3d7d",
"version": 5,
"title": "MySQL In",
"query": "tagged \"collectd\" and service =~ \"mysql-monitor/mysql_octets/rx\"",
"min": null,
"max": null,
"timeRange": 180,
"graphType": "line",
"stackMode": "false"
},
{
"type": "Flot",
"weight": 1,
"id": "8e16ee1e10ae604cb02dcc167ad60de53c49afe5",
"version": 5,
"title": "MySQL Out",
"query": "tagged \"collectd\" and service =~ \"mysql-monitor/mysql_octets/tx\"",
"min": null,
"max": null,
"timeRange": 180,
"graphType": "line",
"stackMode": "false"
}
]
},
{
"type": "View",
"weight": 1,
"id": "b233bf70e76f064a41de2d30f9c0651dd93cd826",
"version": 0
},
{
"type": "View",
"weight": 1,
"id": "719cf6e9ba5f20f5518ecfaaad7ab1c1faa317c1",
"version": 0
},
{
"type": "View",
"weight": 1,
"id": "08bb43ff8ec587f08f459b010b1fad0bdcf9102f",
"version": 0
}
]
}
},
"id": "d189ebe557de98af74b14ed7ccd8581b0522e449"
},
{
"name": "test1",
"view": {
"type": "Balloon",
"weight": 1,
"id": "b23298303d7ebbbfdc575809cc2bb0adae0412ac",
"version": 18,
"child": {
"type": "VStack",
"weight": 1,
"id": "22bb84c8df0c4da31d03ec6a619c2ac119c174e3",
"version": 18,
"children": [
{
"type": "HStack",
"weight": 1,
"id": "82498a58eb9aea04cf12119026dbd3dbf3aa6657",
"version": 16,
"children": [
{
"type": "Grid",
"weight": 1,
"id": "b5b362af5b973fdaa42938bf55115dd52ff1d628",
"version": 3,
"title": "CPU",
"query": "tagged \"collectd\" and service =~ \"cpu%\"",
"max": "",
"rows": "service",
"cols": "host"
},
{
"type": "VStack",
"weight": 1,
"id": "afb874aaf0d3570e5fd77941ba1329649cad0e2e",
"version": 8,
"children": [
{
"type": "Grid",
"weight": 1,
"id": "0b7d19e33b8e024a73c3ff05b23c23eac0e00dde",
"version": 2,
"title": "Memory",
"query": "tagged \"collectd\" and service =~ \"mem%\"",
"max": "",
"rows": "service",
"cols": "host"
},
{
"type": "Grid",
"weight": 1,
"id": "14485dfbf9f3a56254e446bc144d74b3481d716a",
"version": 4,
"title": "Swap",
"query": "tagged \"collectd\" and service =~ \"swap%\"",
"max": "",
"rows": "service",
"cols": "host"
},
{
"type": "Grid",
"weight": 1,
"id": "5a68afca5889e62ccbe0de689eb8b219e1fb0a12",
"version": 3,
"title": "Cassandra",
"query": "tagged \"collectd\" and service =~ \"Cassandra%\"",
"max": "",
"rows": "service",
"cols": "host"
},
{
"type": "View",
"weight": 1,
"id": "43406cd90ac7eccf35b1bb87042c764fb8ac79f6",
"version": 0
}
]
},
{
"type": "Grid",
"weight": 1,
"id": "f65df3ff7e24002766e4b5b2e1e867f5de346f04",
"version": 2,
"title": "Disk",
"query": "tagged \"collectd\" and service =~ \"disk%\"",
"max": "",
"rows": "service",
"cols": "host"
},
{
"type": "VStack",
"weight": 1,
"id": "a0d0d7e9f83c67b3838f9e32c8e8b5d3a92d33ee",
"version": 2,
"children": [
{
"type": "Grid",
"weight": 1,
"id": "270903b0bf5facb7208189511ddf4aa85b0e843e",
"version": 2,
"title": "Apache",
"query": "tagged \"collectd\" and service =~ \"apache%\"",
"max": "",
"rows": "service",
"cols": "host"
},
{
"type": "Grid",
"weight": 1,
"id": "3fd8066915af6d5b9e6e27dbe913b7ac42c5b408",
"version": 1,
"title": "NIC",
"query": "tagged \"collectd\" and service =~ \"nic%\"",
"max": "",
"rows": "service",
"cols": "host"
}
]
},
{
"type": "Grid",
"weight": 1,
"id": "79fb5169a9bbc90c8181e4fe93d34fb725c76192",
"version": 1,
"title": "MySQL",
"query": "tagged \"collectd\" and service =~ \"mysql%\"",
"max": "",
"rows": "service",
"cols": "host"
}
]
}
]
}
},
"id": "8deba7de3aea10b0d432d8c54ab53db6e4c16d2a"
}
]
}
在客户端启动 collectd 之后,该节点会自动出现在仪表盘中。仪表盘除了以图表显示指标之外,还可以实时滚动显示日志,或以列表形式显示事件。