【Logstash】的安装(Linux)和与PostgreSQL的连接、CSV导入方法

1. 进行 Logstash 的安装和基本设置。

下载

请从以下URL进行下载:
https://www.elastic.co/jp/downloads/past-releases/logstash-7-10-0

解凍 (jiě – 正在解凍 jiě -> Defrosting

解凍。

tar -zxvf logstash-7.10.0-linux-x86_64.tar.gz

设置文件(logstash.yml)的配置

如果您没有为Elasticsearch引入xpack,那么默认情况下是没有问题的。

如果在Elasticsearch中引入了xpack,则需要下面的设置:
xpack.monitoring.elasticsearch.hosts: [ “http://localhost:9200” ]
xpack.monitoring.elasticsearch.username: logstash_system
xpack.monitoring.elasticsearch.password: password99打开配置文件,并添加上述设置
cd /logstash-7.10.0/config
vi logstash.yml

设置追加后的logstash.yml
(点此查看)logstash.yml
# YAML格式的设置文件
#
# 设置可以以层级形式指定,例如:
#
# pipeline:
# batch:
# size: 125
# delay: 5
#
# 或者以平面键形式指定,例如:
#
# pipeline.batch.size: 125
# pipeline.batch.delay: 5
#
# ———— 节点标识 ————
#
# 使用描述性名称指定节点:
#
# node.name: test
#
# 如果省略,则节点名称将默认为计算机的主机名
#
# ———— 数据路径 ————–
#
# logstash及其插件应使用的目录,以满足任何持久化需求。默认为LOGSTASH_HOME/data
#
# path.data:
#
# ———— Pipeline设置 ————–
#
# pipeline的ID。
#
# pipeline.id: main
#
# 设置并行执行管道的过滤器+输出阶段的工作进程数。
#
# 这默认为主机的CPU核心数。
#
# pipeline.workers: 2
#
# 从输入中检索多少个事件,然后发送到过滤器+工作进程
#
# pipeline.batch.size: 125
#
# 等待毫秒数,然后轮询下一个事件,然后将大小不足的批次发送到过滤器+输出
#
# pipeline.batch.delay: 50
#
# 在关闭期间强制Logstash退出,即使内存中仍有未处理的事件。默认情况下,logstash将拒绝退出,直到所有已接收的事件都被推送到输出。
#
# 警告:启用此功能可能会导致关闭期间数据丢失
#
# pipeline.unsafe_shutdown: false
#
# 设置管道事件排序。选项为”auto”(默认值),”true”或”false”。
# 如果’pipeline.workers’设置也设置为’1’,则”auto”将自动启用排序。
# “true”会强制对管道进行排序,并阻止logstash在存在多个工作进程时启动。
# “false”将禁用为保留顺序而执行的任何额外处理。
#
pipeline.ordered: auto
#
# ———— 管道配置设置 ————–
#
# 从主管道获取管道配置的位置
#
# path.config:
#
# 主管道的管道配置字符串
#
# config.string:
#
# 在启动时,测试配置是否有效并退出(模拟运行)
#
# config.test_and_exit: false
#
# 定期检查配置是否更改并重新加载管道
# 这也可以通过SIGHUP信号手动触发
#
# config.reload.automatic: false
#
# 检查管道配置是否更改的频率(以秒为单位)
# 注意,必须提供单位值(s)。没有限定词的值(例如60)
# 将被视为纳秒。
# 不建议以这种方式设置间隔,以免在以后的版本中发生更改。
#
# config.reload.interval: 3s
#
# 在调试日志消息中显示完全编译的配置
# 注意:–log.level必须为’debug’
#
# config.debug: false
#
# 启用时,在管道配置文件中处理转义字符,如\n和\”。
#
# config.support_escapes: false
#
# ———— HTTP API设置 ————-
# 在这里定义与HTTP API有关的设置。
#
# HTTP API默认已启用。可以禁用它,但依赖它的功能将无法正常工作。
# http.enabled: true
#
# 默认情况下,HTTP API仅绑定到主机的本地回环接口,
# 确保它不对网络中的其他部分可访问。因为API
# 既没有身份验证也没有授权,而且未经硬化或
# 为公开可达的API测试,所以应尽量避免绑定到公开可访问的IP上。
#
# http.host: 127.0.0.1
#
# HTTP API Web服务器将在给定范围中的可用端口上侦听。
# 值可以指定为单个端口(例如`9600`)或一个包含范围
# 端口的包容范围(例如`9600-9700`)。
#
# http.port: 9600-9700
#
# ———— 模块设置 —————
# 在这里定义模块。模块定义必须定义为数组。
# 简单的方法是在每个`name`前加上`-`,并将
# 所有相关变量保留在与其关联的`name`下方,
# 并在下一个`name`上方,例如:
#
# modules:
# – name: MODULE_NAME
# var.PLUGINTYPE1.PLUGINNAME1.KEY1: VALUE
# var.PLUGINTYPE1.PLUGINNAME1.KEY2: VALUE
# var.PLUGINTYPE2.PLUGINNAME1.KEY1: VALUE
# var.PLUGINTYPE3.PLUGINNAME3.KEY1: VALUE
#
# 模块变量名称必须采用以下格式
#
# var.PLUGIN_TYPE.PLUGIN_NAME.KEY
#
#

2. 数据协同

2-1. 对于CSV数据来说

准备用于协作的CSV文件。

在这里准备两个文件。

Id,Title,Context,UserId,UpdateDate
0001,This section is Title1.,This section is context1.,1,2023-09-11 10:00:00
0002,This section is Title2.,This section is context2.,1,2023-09-11 11:00:00
Id,Title,Context,UserId,UpdateDate
0003,This section is Title3.,This section is context4.,3,2023-09-11 12:00:00
0004,This section is Title4.,This section is context3.,4,2023-09-11 13:00:00

准备用于导入的设置文件。

文件名可以任意指定(在这里,已经写明为logstash_forCSV.conf)
筛选器 => columns 是将CSV文件的列名与之匹配
筛选器 {
csv {
columns => [“Id”, “Title”, “Context”,”UserId”,”UpdateDate”]
convert => {
“UserId” => “integer”
}
skip_header => true
}
date {
match => [“UpdateDate”, “yyyy-MM-dd HH:mm:ss”]
}
}
输出 => elasticsearch => 用户应指定”elastic”
输出 {
elasticsearch {hosts => [“http://localhost:9200”]
action => “update”
index => “test”
doc_as_upsert => true
document_id => “%{doc_id}”
user => “elastic”
password => “password99”
}
stdout {
codec => rubydebug
}
}

如果想指定_id,设置以下三个项目:
action => “update”
doc_as_upsert => true
document_id => “%{doc_id}”
日志记录配置文件logstash_forCsv.conf
input {
file {
mode => “tail”
path => [“/home/hoge/work/testData/*_test.csv”]
sincedb_path => “/home/hoge/local_dev/logstash-7.10.0/log/sincedb”
start_position => “beginning”
codec => plain {
charset => “UTF-8”
}
}
}filter {
csv {
columns => [“Id”, “Title”, “Context”,”UserId”,”UpdateDate”]
convert => {
“UserId” => “integer”
}
skip_header => true
}
date {
match => [“UpdateDate”, “yyyy-MM-dd HH:mm:ss”]
}
}

output {
elasticsearch {
hosts => [“http://localhost:9200”]
action => “update”
index => “test”
doc_as_upsert => true
document_id => “%{doc_id}”
user => “elastic”
password => “password99”
}
stdout {
codec => rubydebug
}
}

在PostgreSQL的情况下

准备用于PostgreSQL的JDBC。

请点击此链接下载 PostgreSQL JDBC 驱动:https://jdbc.postgresql.org/download/。

准备用于导入的设置文件。

文件名可以任意指定(此处使用logstash_postgres.conf中指定的文件名)
使用jdbc连接pg数据库,设置连接信息和SQL语句。
输入 {
jdbc {
jdbc_driver_library => [“/home/hogehoge/logstash-7.10.0/lib/postgreJDBC/42.6.0/postgresql-42.6.0.jar”]
jdbc_driver_class => “org.postgresql.Driver”
jdbc_connection_string => “jdbc:postgresql://localhost:5432/dbname”
jdbc_user => “user”
jdbc_password => “password”tracking_column => “updated_at”
use_column_value => true
tracking_column_type => “timestamp”
schedule => “*/10 * * * * *”

statement => ”
SELECT col_uk, col1,col2,col3
FROM test_table

}
}

输出 => elasticsearch => 用户应指定”elastic”。
如果想要指定document_id,可以使用%{列名}进行设置。
output {
elasticsearch {hosts => [“http://localhost:9200”]
action => “update”
index => “test_index”
doc_as_upsert => true
document_id => “%{col_uk}”
user => “elastic”
password => “password99”

}
stdout {
codec => rubydebug
}
}

样本:用于Postgre联结的配置文件 logstash_postgre.conf(点击这里打开)

logstash_postgre.conf
输入 {
jdbc {
jdbc_driver_library => [“/home/hogehoge/logstash-7.10.0/lib/postgreJDBC/42.6.0/postgresql-42.6.0.jar”]
jdbc_driver_class => “org.postgresql.Driver”
jdbc_connection_string => “jdbc:postgresql://localhost:5432/dbname”
jdbc_user => “user”
jdbc_password => “password”

tracking_column => “updated_at”
use_column_value => true
tracking_column_type => “timestamp”
schedule => “*/10 * * * * *”

statement => ”
SELECT col_uk, col1,col2,col3
FROM test_table

}
}
过滤器 {
}

输出 {
elasticsearch {

hosts => [“http://localhost:9200”]
action => “update”
index => “test_index”
doc_as_upsert => true
document_id => “%{col_uk}”
user => “elastic”
password => “password99”

}
stdout {
codec => rubydebug
}
}

3. 执行

在logstash的目录中执行(将要执行的配置文件作为参数传递)。

bin/logstash -f config/logstash_postgre.conf

4.确认的方式

使用kibana启动
http://(kibana网址):5601/app/home#/

image.png
image.png
广告
将在 10 秒后关闭
bannerAds