如何集成FileBeat与logstash解码audit日志?

Elastic Stack

1 前言

一个问题,一篇文章,一出故事。
笔者遇到Audit日志上传到Elasticsearch后检索发现内容为编码的文本,导致检索不到日志。
于是我们需要将通过FileBeat上传的日志经过logstash解码后再存储到Elasticsearch以利于检索。
另外,我们通过Python找到了解码关键字段的方法,详细如下:

如何解码audit日志的name字符串?


本章将参考以上方法在logstash中实现该功能。

2 最佳实践

2.1 测试环境

如何部署带安全认证的Filebeat与logstash集成?

2.2 配置FileBeat

2.2.1 创建配置文件

vim /etc/filebeat/filebeat.yml

修改为如下配置,

filebeat.modules:
- module: auditd
  log:
    enabled: true
    var.paths: ["/var/log/audit/audit.log*"]

filebeat.config.modules:
  path: ${path.config}/modules.d/*.yml
  reload.enabled: false
setup.template.settings:
  index.number_of_shards: 3
setup.kibana:

output.logstash:
  hosts: ["logstash.cmdschool.org:5050"]
  ssl.enabled: false
  ssl.verification_mode: none
  ssl.certificate_authorities: ["/etc/pki/tls/certs/ca.crt"]
  ssl.certificate: "/etc/pki/tls/certs/logstash.cmdschool.org.crt"
  ssl.key: "/etc/pki/tls/private/logstash.cmdschool.org.key"

processors:
  - add_host_metadata: ~
  - add_cloud_metadata: ~

测试配置文件

filebeat test config -c /etc/filebeat/filebeat.yml

2.2.2 重启使配置生效

systemctl restart filebeat
systemctl status filebeat

2.3 配置logstash

2.3.1 创建配置文件

vim /etc/logstash/conf.d/sftp.cmdschoo.org_5050.conf

配置修改如下,

# Sample Logstash configuration for creating a simple
# Beats -> Logstash -> Elasticsearch pipeline.


input {
  beats {
    port => 5050
    type => "5050"
    ssl => false
    ssl_certificate_authorities => ["/etc/pki/tls/certs/ca.crt"]
    ssl_certificate => "/etc/pki/tls/certs/logstash.cmdschool.org.crt"
    ssl_key => "/etc/pki/tls/private/logstash.cmdschool.org.key"
    #ssl_verify_mode => "force_peer"
    ssl_verify_mode => "none"
  }
}

# 新增filter部分:核心解码逻辑
filter {
  # 1. 提取name字段的值(匹配日志中name=xxx的格式)
  grok {
    match => { "message" => "name=(?<message_name.raw>[^\s]+)" }
    tag_on_failure => ["_grok_name_failure"]  # 匹配失败时打标签,便于排查
  }

  # 2. Ruby过滤器:判断并解码十六进制字符串
  ruby {
    code => '
      # 获取原始name字段值(若grok匹配失败则为空)
      raw_name = event.get("message_name.raw")
      if raw_name && !raw_name.empty?
        # 定义十六进制判断规则:仅含0-9/A-F/a-f,且长度为偶数
        hex_pattern = /^[0-9a-fA-F]+$/
        is_hex = raw_name.match?(hex_pattern) && raw_name.length.even?

        if is_hex
          begin
            # 等价Python的binascii.unhexlify:十六进制转二进制
            binary_data = [raw_name].pack("H*")
            # 等价Python的replace(b"\x00", b" "):替换空字节为空格
            binary_data = binary_data.gsub("\x00", " ")
            # 等价Python的decode("utf-8","replace"):UTF-8解码,错误用?替换
            decoded_name = binary_data.force_encoding("UTF-8").encode("UTF-8", replace: "?")
            event.set("message_name.decoded", decoded_name)
          rescue => e
            # 解码异常时保留原始值,并打标签
            event.set("message_name.decoded", raw_name)
            event.tag("_hex_decode_failure")
            event.set("message_name.decode_error", e.message)
          end
        else
          # 非十六进制字符串,直接赋值
          event.set("message_name.decoded", raw_name)
        end
      end
    '
  }
}

output {
  stdout { codec => rubydebug }
  if [type] == "5050" {
    elasticsearch {
      hosts => ["http://azelasticsearch01:9200", "http://azelasticsearch02:9200", "http://azelasticsearch03:9200", "http://azelasticsearch04:9200", "http://azelasticsearch04:9200"]
      index => "sftp-%{+YYYY.MM.dd}"
      user => "elastic"
      password => "elasticpwd"
    }
  }
}

2.3.2 重启使配置生效

systemctl restart logstash
systemctl status logstash

2.4 Kibana的查询方法

参阅文档
====================
https://www.elastic.co/docs/reference/beats/filebeat/filebeat-module-auditd
https://www.doubao.com/thread/w4922391f991615ac

没有评论

发表回复

Elastic Stack
如何缩减Elasticsearch索引分片?

1 前言 一个问题,一篇文章,一出故事。 笔者集群有5个节点,每个节点总体磁盘空间是4.5T。当前每 …

Elastic Stack
如何设置Elasticsearch索引为只读?

1 前言 一个问题,一篇文章,一出故事。 基于集群性能的需求,笔者想通过将旧索切换到温暖阶段,设置为 …

Elastic Stack
如何设置Filebeat上传Keycloak的Java类型日志?

1 前言 一个问题,一篇文章,一出故事。 今天遇到需要设置Filebeat上传Java类型的日志,于 …