Logstash是一个具有实时流水线功能的开源数据收集引擎,Logstash可以动态地统一来自不同数据源的数据,并将数据规范化到您所选择的目的地,对于各种高级的下游分析和可视化用例清理和统一化所有的数据。本文介绍 Logstash 安装使用。
安装
基于docker安装
docker pull logstash:7.5.1
docker run -d -it --restart=always \
--privileged=true \
--name=logstash \
-p 5047:5047 \
-p 9600:9600 \
logstash:7.5.1配置
syslog
修改/usr/share/logstash/configlogstash.yml
http.host: "0.0.0.0"
#xpack.monitoring.elasticsearch.hosts: [ "http://elasticsearch:9200" ]
path.config: /usr/share/logstash/config/logstash.conf创建/usr/share/logstash/config/logstash.conf
input {
syslog {
port => 5047
}
}
output {
stdout {}
}重启服务,可以采用如下工具排查:
# yum install nc -y
# nc -uv 127.0.0.1 5047
Ncat: Version 7.50 ( https://nmap.org/ncat )
Ncat: Connected to 127.0.0.1:5047.
s
sdsdf
# logstash 的日志如下
[2018-04-13T08:45:01,956][INFO ][logstash.javapipeline ] Pipeline started {"pipeline.id"=>"main"}
[2018-04-13T08:45:02,063][INFO ][logstash.agent ] Pipelines running {:count=>1, :running_pipelines=>[:main], :non_running_pipelines=>[]}
[2018-04-13T08:45:02,086][INFO ][logstash.inputs.syslog ] Starting syslog udp listener {:address=>"0.0.0.0:5047"}
[2018-04-13T08:45:02,118][INFO ][logstash.inputs.syslog ] Starting syslog tcp listener {:address=>"0.0.0.0:5047"}
[2018-04-13T08:45:02,493][INFO ][logstash.agent ] Successfully started Logstash API endpoint {:port=>9600}
/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/awesome_print-1.7.0/lib/awesome_print/formatters/base_formatter.rb:31: warning: constant ::Fixnum is deprecated
{
"severity_label" => "Emergency",
"@timestamp" => 2018-04-13T08:48:23.049Z,
"severity" => 0,
"tags" => [
[0] "_grokparsefailure_sysloginput"
],
"host" => "127.0.0.1",
"priority" => 0,
"message" => "s\n",
"facility" => 0,
"@version" => "1",
"facility_label" => "kernel"
}
{
"severity_label" => "Emergency",
"@timestamp" => 2018-04-13T08:48:28.829Z,
"severity" => 0,
"tags" => [
[0] "_grokparsefailure_sysloginput"
],
"host" => "127.0.0.1",
"priority" => 0,
"message" => "sdsdf\n",
"facility" => 0,
"@version" => "1",
"facility_label" => "kernel"
}kafka
logstash kafka imput 参数优化:
input{
kafka{
bootstrap_servers => "kafka-1:9092,kafka-2:9092,kafka-3:9092"
topics => ["nginx"]
codec => "json"
auto_offset_reset => "earliest"
session_timeout_ms => "60000"
request_timeout_ms => "70000"
heartbeat_interval_ms => "15000"
}
}参数说明:
- request_timeout_ms 要大于 session_timeout_ms
- heartbeat_interval_ms 比 session_timeout_ms 小,不能大于 session_timeout_ms 的 1/3
针对海量小文件场景定制的 Input 配置模板
- 操作系统层面限制(极其关键,否则配置无效)
由于我们将文件句柄数提升到了 5000,必须修改运行 Logstash 的系统用户的最大文件打开数。
编辑 /etc/security/limits.conf,在末尾添加:
# 假设使用 logstash 用户运行服务
logstash soft nofile 65535
logstash hard nofile 65535修改后需重新登录 logstash 用户或重启服务生效,可通过 su - logstash -c "ulimit -n" 验证
- 全局参数配置 (
logstash.yml)
主要调整 Pipeline 的批处理机制,使其更适应小文件的“零碎”数据流。
# ======================== Logstash 全局配置 ========================
# 降低批次大小:小文件单文件事件少,减小 batch 避免长时间等不到凑满而触发超时
pipeline.batch.size: 500
# 增加延迟容忍:多等 50ms,尽可能把各个小文件收集到的零散日志合到一个批次,减少对下游 ES 的 IO 压力
pipeline.batch.delay: 50
# 建议配置:指定工作线程数(通常等同于 CPU 核心数,若机器专门做采集,可适度上调)
# pipeline.workers: 4- JVM 参数配置 (
jvm.options)
同时保持 5000 个文件处于打开状态会占用一定量的内存,建议至少分配 2GB 堆内存(如果服务器资源充足,推荐 4GB)。
-Xms2g
-Xmx2g- 管道采集配置 (
logstash.conf)
这是针对海量小文件场景定制的 Input 配置模板,同时包含了通用的 Filter 和 Output 骨架。
# ======================== 管道配置 ========================
input {
file {
# 监听的目录,支持通配符
path => "/your/log/path/**/*.log"
# 首次读取从头开始(结合 sincedb 记录位置)
start_position => "beginning"
# --------------------------------------------------------
# 【海量小文件专项优化参数】
# --------------------------------------------------------
# 1. 大幅提升最大并发打开文件数,解决小文件排队读取瓶颈 (需配合 ulimit -n 使用)
max_open_files => 5000
# 2. 扫描新文件间隔:统一延长至 10s,防止在高频生成小文件时导致目录扫描引起的极高磁盘 IO
discover_interval => 10
# 3. 检查文件追加间隔:缩短至 3s,提高准实时性(若磁盘 IO 出现瓶颈,可退回 5s)
stat_interval => 3
# 4. 读取块大小:由默认 128KB 降至 64KB。控制内存占用,防止 5000 个文件同时读取时引发 JVM 内存溢出
file_chunk_size => 65536
# 5. 关闭不活跃文件的超时时间:根据业务情况调整。
# 默认是 3600s (1小时),如果小文件写完后不再更新,建议调短以尽早释放句柄给新文件
close_older => "10m"
# --------------------------------------------------------
# 【多行日志合并优化】
# --------------------------------------------------------
codec => multiline {
# 替换为你的实际多行正则,此处以 "I/E/W/F/D + 6位时间码" 作为新行的开头为例
pattern => "^[IEWFD]\d{6}"
negate => true
what => "previous"
# 极其关键:防止小文件写入完成后,最后一条(或者唯一一条)多行日志永远滞留在内存中丢失
auto_flush_interval => 3
}
}
}
filter {
# 此处添加具体的日志解析逻辑
# 例如:
# grok { ... }
# date { ... }
# mutate { ... }
}
output {
# 统一批量写入 ES
elasticsearch {
hosts => ["http://your-es-cluster:9200"]
index => "app-logs-%{+YYYY.MM.dd}"
# 认证配置(如有)
# user => "elastic"
# password => "your_password"
}
# 调试时可开启控制台输出(生产环境务必注释掉)
# stdout { codec => rubydebug }
}附加建议与说明
close_older的配合使用: 在模板中我新增了close_older => "10m"。在海量小文件场景中,许多文件生成写入几分钟后就成了“死文件”(永远不再写入)。Logstash 默认会维持这个文件的句柄长达 1 小时 (3600s)。将其缩短(例如 10 分钟或 5 分钟),可以更快速地释放宝贵的max_open_files额度给源源不断产生的新小文件。- sincedb 文件的存储:
对于海量文件,Logstash 用于记录读取进度的
.sincedb文件会变得非常大。默认情况下,它存储在 Logstash 的数据目录下。如果服务器崩溃重启,读取庞大的 sincedb 文件也需要耗时,属于正常现象。 - 监控观测:
部署后,建议首日重点观察两个指标:
- 宿主机 CPU IO-Wait (
%iowait) —— 确认stat_interval=3和discover_interval=10的组合对硬盘压力是否在可接受范围内。 - JVM 堆内存监控 —— 确认 5000 个并发读取通道开启时,GC 频率是否健康。如果频繁发生老年代 GC,则需要调大堆内存。
- 宿主机 CPU IO-Wait (
参考
- https://www.elastic.co/guide/en/logstash/5.6/plugins-inputs-kafka.html#plugins-inputs-kafka-session_timeout_ms
- https://www.elastic.co/guide/en/logstash/5.6/plugins-inputs-kafka.html#plugins-inputs-kafka-request_timeout_ms
- https://www.elastic.co/guide/en/logstash/5.6/plugins-inputs-kafka.html#plugins-inputs-kafka-heartbeat_interval_ms