如何集成FileBeat与logstash解码audit日志?
- By : Will
- Category : Elastic Stack
Elastic Stack
1 前言
一个问题,一篇文章,一出故事。
笔者遇到Audit日志上传到Elasticsearch后检索发现内容为编码的文本,导致检索不到日志。
于是我们需要将通过FileBeat上传的日志经过logstash解码后再存储到Elasticsearch以利于检索。
另外,我们通过Python找到了解码关键字段的方法,详细如下:
本章将参考以上方法在logstash中实现该功能。
2 最佳实践
2.1 测试环境
2.2 配置FileBeat
2.2.1 创建配置文件
vim /etc/filebeat/filebeat.yml
修改为如下配置,
filebeat.modules:
- module: auditd
log:
enabled: true
var.paths: ["/var/log/audit/audit.log*"]
filebeat.config.modules:
path: ${path.config}/modules.d/*.yml
reload.enabled: false
setup.template.settings:
index.number_of_shards: 3
setup.kibana:
output.logstash:
hosts: ["logstash.cmdschool.org:5050"]
ssl.enabled: false
ssl.verification_mode: none
ssl.certificate_authorities: ["/etc/pki/tls/certs/ca.crt"]
ssl.certificate: "/etc/pki/tls/certs/logstash.cmdschool.org.crt"
ssl.key: "/etc/pki/tls/private/logstash.cmdschool.org.key"
processors:
- add_host_metadata: ~
- add_cloud_metadata: ~
测试配置文件
filebeat test config -c /etc/filebeat/filebeat.yml
2.2.2 重启使配置生效
systemctl restart filebeat systemctl status filebeat
2.3 配置logstash
2.3.1 创建配置文件
vim /etc/logstash/conf.d/sftp.cmdschoo.org_5050.conf
配置修改如下,
# Sample Logstash configuration for creating a simple
# Beats -> Logstash -> Elasticsearch pipeline.
input {
beats {
port => 5050
type => "5050"
ssl => false
ssl_certificate_authorities => ["/etc/pki/tls/certs/ca.crt"]
ssl_certificate => "/etc/pki/tls/certs/logstash.cmdschool.org.crt"
ssl_key => "/etc/pki/tls/private/logstash.cmdschool.org.key"
#ssl_verify_mode => "force_peer"
ssl_verify_mode => "none"
}
}
# 新增filter部分:核心解码逻辑
filter {
# 1. 提取name字段的值(匹配日志中name=xxx的格式)
grok {
match => { "message" => "name=(?<message_name.raw>[^\s]+)" }
tag_on_failure => ["_grok_name_failure"] # 匹配失败时打标签,便于排查
}
# 2. Ruby过滤器:判断并解码十六进制字符串
ruby {
code => '
# 获取原始name字段值(若grok匹配失败则为空)
raw_name = event.get("message_name.raw")
if raw_name && !raw_name.empty?
# 定义十六进制判断规则:仅含0-9/A-F/a-f,且长度为偶数
hex_pattern = /^[0-9a-fA-F]+$/
is_hex = raw_name.match?(hex_pattern) && raw_name.length.even?
if is_hex
begin
# 等价Python的binascii.unhexlify:十六进制转二进制
binary_data = [raw_name].pack("H*")
# 等价Python的replace(b"\x00", b" "):替换空字节为空格
binary_data = binary_data.gsub("\x00", " ")
# 等价Python的decode("utf-8","replace"):UTF-8解码,错误用?替换
decoded_name = binary_data.force_encoding("UTF-8").encode("UTF-8", replace: "?")
event.set("message_name.decoded", decoded_name)
rescue => e
# 解码异常时保留原始值,并打标签
event.set("message_name.decoded", raw_name)
event.tag("_hex_decode_failure")
event.set("message_name.decode_error", e.message)
end
else
# 非十六进制字符串,直接赋值
event.set("message_name.decoded", raw_name)
end
end
'
}
}
output {
stdout { codec => rubydebug }
if [type] == "5050" {
elasticsearch {
hosts => ["http://azelasticsearch01:9200", "http://azelasticsearch02:9200", "http://azelasticsearch03:9200", "http://azelasticsearch04:9200", "http://azelasticsearch04:9200"]
index => "sftp-%{+YYYY.MM.dd}"
user => "elastic"
password => "elasticpwd"
}
}
}
2.3.2 重启使配置生效
systemctl restart logstash systemctl status logstash
2.4 Kibana的查询方法

参阅文档
====================
https://www.elastic.co/docs/reference/beats/filebeat/filebeat-module-auditd
https://www.doubao.com/thread/w4922391f991615ac
没有评论