Logstash 安装使用

发布时间: 更新时间: 总字数:1784 阅读时间:4m 作者:IP:上海 网址

Logstash是一个具有实时流水线功能的开源数据收集引擎,Logstash可以动态地统一来自不同数据源的数据,并将数据规范化到您所选择的目的地,对于各种高级的下游分析和可视化用例清理和统一化所有的数据。本文介绍 Logstash 安装使用。

安装

基于docker安装

docker pull logstash:7.5.1
docker run -d -it --restart=always  \
  --privileged=true \
  --name=logstash \
  -p 5047:5047 \
  -p 9600:9600 \
  logstash:7.5.1

配置

syslog

修改/usr/share/logstash/configlogstash.yml

http.host: "0.0.0.0"
#xpack.monitoring.elasticsearch.hosts: [ "http://elasticsearch:9200" ]
path.config: /usr/share/logstash/config/logstash.conf

创建/usr/share/logstash/config/logstash.conf

input {
  syslog {
    port => 5047
  }
}

output {
  stdout {}
}

重启服务,可以采用如下工具排查:

# yum install nc -y
# nc -uv 127.0.0.1 5047
Ncat: Version 7.50 ( https://nmap.org/ncat )
Ncat: Connected to 127.0.0.1:5047.
s
sdsdf

# logstash 的日志如下
[2018-04-13T08:45:01,956][INFO ][logstash.javapipeline    ] Pipeline started {"pipeline.id"=>"main"}
[2018-04-13T08:45:02,063][INFO ][logstash.agent           ] Pipelines running {:count=>1, :running_pipelines=>[:main], :non_running_pipelines=>[]}
[2018-04-13T08:45:02,086][INFO ][logstash.inputs.syslog   ] Starting syslog udp listener {:address=>"0.0.0.0:5047"}
[2018-04-13T08:45:02,118][INFO ][logstash.inputs.syslog   ] Starting syslog tcp listener {:address=>"0.0.0.0:5047"}
[2018-04-13T08:45:02,493][INFO ][logstash.agent           ] Successfully started Logstash API endpoint {:port=>9600}


/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/awesome_print-1.7.0/lib/awesome_print/formatters/base_formatter.rb:31: warning: constant ::Fixnum is deprecated
{
    "severity_label" => "Emergency",
        "@timestamp" => 2018-04-13T08:48:23.049Z,
          "severity" => 0,
              "tags" => [
        [0] "_grokparsefailure_sysloginput"
    ],
              "host" => "127.0.0.1",
          "priority" => 0,
           "message" => "s\n",
          "facility" => 0,
          "@version" => "1",
    "facility_label" => "kernel"
}


{
    "severity_label" => "Emergency",
        "@timestamp" => 2018-04-13T08:48:28.829Z,
          "severity" => 0,
              "tags" => [
        [0] "_grokparsefailure_sysloginput"
    ],
              "host" => "127.0.0.1",
          "priority" => 0,
           "message" => "sdsdf\n",
          "facility" => 0,
          "@version" => "1",
    "facility_label" => "kernel"
}

kafka

logstash kafka imput 参数优化:

input{
    kafka{
        bootstrap_servers => "kafka-1:9092,kafka-2:9092,kafka-3:9092"
        topics => ["nginx"]
        codec => "json"
        auto_offset_reset => "earliest"
        session_timeout_ms => "60000"
        request_timeout_ms => "70000"
        heartbeat_interval_ms => "15000"
    }
}

参数说明:

  • request_timeout_ms 要大于 session_timeout_ms
  • heartbeat_interval_ms 比 session_timeout_ms 小,不能大于 session_timeout_ms 的 1/3

针对海量小文件场景定制的 Input 配置模板

  1. 操作系统层面限制(极其关键,否则配置无效)

由于我们将文件句柄数提升到了 5000,必须修改运行 Logstash 的系统用户的最大文件打开数。 编辑 /etc/security/limits.conf,在末尾添加:

text
# 假设使用 logstash 用户运行服务
logstash soft nofile 65535
logstash hard nofile 65535

修改后需重新登录 logstash 用户或重启服务生效,可通过 su - logstash -c "ulimit -n" 验证

  1. 全局参数配置 (logstash.yml)

主要调整 Pipeline 的批处理机制,使其更适应小文件的“零碎”数据流。

yaml
# ======================== Logstash 全局配置 ========================

# 降低批次大小:小文件单文件事件少,减小 batch 避免长时间等不到凑满而触发超时
pipeline.batch.size: 500

# 增加延迟容忍:多等 50ms,尽可能把各个小文件收集到的零散日志合到一个批次,减少对下游 ES 的 IO 压力
pipeline.batch.delay: 50

# 建议配置:指定工作线程数(通常等同于 CPU 核心数,若机器专门做采集,可适度上调)
# pipeline.workers: 4
  1. JVM 参数配置 (jvm.options)

同时保持 5000 个文件处于打开状态会占用一定量的内存,建议至少分配 2GB 堆内存(如果服务器资源充足,推荐 4GB)。

text
-Xms2g
-Xmx2g
  1. 管道采集配置 (logstash.conf)

这是针对海量小文件场景定制的 Input 配置模板,同时包含了通用的 Filter 和 Output 骨架。

ruby
# ======================== 管道配置 ========================

input {
  file {
    # 监听的目录,支持通配符
    path => "/your/log/path/**/*.log"

    # 首次读取从头开始(结合 sincedb 记录位置)
    start_position => "beginning"

    # --------------------------------------------------------
    # 【海量小文件专项优化参数】
    # --------------------------------------------------------

    # 1. 大幅提升最大并发打开文件数,解决小文件排队读取瓶颈 (需配合 ulimit -n 使用)
    max_open_files => 5000

    # 2. 扫描新文件间隔:统一延长至 10s,防止在高频生成小文件时导致目录扫描引起的极高磁盘 IO
    discover_interval => 10

    # 3. 检查文件追加间隔:缩短至 3s,提高准实时性(若磁盘 IO 出现瓶颈,可退回 5s)
    stat_interval => 3

    # 4. 读取块大小:由默认 128KB 降至 64KB。控制内存占用,防止 5000 个文件同时读取时引发 JVM 内存溢出
    file_chunk_size => 65536

    # 5. 关闭不活跃文件的超时时间:根据业务情况调整。
    # 默认是 3600s (1小时),如果小文件写完后不再更新,建议调短以尽早释放句柄给新文件
    close_older => "10m"

    # --------------------------------------------------------
    # 【多行日志合并优化】
    # --------------------------------------------------------
    codec => multiline {
      # 替换为你的实际多行正则,此处以 "I/E/W/F/D + 6位时间码" 作为新行的开头为例
      pattern => "^[IEWFD]\d{6}"
      negate => true
      what => "previous"

      # 极其关键:防止小文件写入完成后,最后一条(或者唯一一条)多行日志永远滞留在内存中丢失
      auto_flush_interval => 3
    }
  }
}

filter {
  # 此处添加具体的日志解析逻辑
  # 例如:
  # grok { ... }
  # date { ... }
  # mutate { ... }
}

output {
  # 统一批量写入 ES
  elasticsearch {
    hosts => ["http://your-es-cluster:9200"]
    index => "app-logs-%{+YYYY.MM.dd}"

    # 认证配置(如有)
    # user => "elastic"
    # password => "your_password"
  }

  # 调试时可开启控制台输出(生产环境务必注释掉)
  # stdout { codec => rubydebug }
}

附加建议与说明

  1. close_older 的配合使用: 在模板中我新增了 close_older => "10m"。在海量小文件场景中,许多文件生成写入几分钟后就成了“死文件”(永远不再写入)。Logstash 默认会维持这个文件的句柄长达 1 小时 (3600s)。将其缩短(例如 10 分钟或 5 分钟),可以更快速地释放宝贵的 max_open_files 额度给源源不断产生的新小文件。
  2. sincedb 文件的存储: 对于海量文件,Logstash 用于记录读取进度的 .sincedb 文件会变得非常大。默认情况下,它存储在 Logstash 的数据目录下。如果服务器崩溃重启,读取庞大的 sincedb 文件也需要耗时,属于正常现象。
  3. 监控观测: 部署后,建议首日重点观察两个指标:
    • 宿主机 CPU IO-Wait ( %iowait ) —— 确认 stat_interval=3discover_interval=10 的组合对硬盘压力是否在可接受范围内。
    • JVM 堆内存监控 —— 确认 5000 个并发读取通道开启时,GC 频率是否健康。如果频繁发生老年代 GC,则需要调大堆内存。

参考

  1. https://www.elastic.co/guide/en/logstash/5.6/plugins-inputs-kafka.html#plugins-inputs-kafka-session_timeout_ms
  2. https://www.elastic.co/guide/en/logstash/5.6/plugins-inputs-kafka.html#plugins-inputs-kafka-request_timeout_ms
  3. https://www.elastic.co/guide/en/logstash/5.6/plugins-inputs-kafka.html#plugins-inputs-kafka-heartbeat_interval_ms