Stream audit logs to Kafka
The following is a sample Logstash pipeline for streaming Intelligence Center audit trail logs to Kafka.
# Optional pipeline used to read data from the Elasticsearch audit index and publish it to a Kafka topic.
# To enable this pipeline, drop this file into the Logstash pipelines directory. For details about where
# pipeline configuration files live, see https://www.elastic.co/guide/en/logstash/7.17/dir-layout.html
# Configuration:
# - Make sure you've set the "KAFKA_HOSTS" environment variable. You should set it in the following file:
#   /etc/systemd/system/logstash.service.d/20-eclecticiq-env-vars.conf
input {
  elasticsearch {
    docinfo => true
    docinfo_target => "[@metadata][doc_meta]"
    # "hosts" can be an array of hosts or a single host.
    hosts => "${ES_HOST}:${ES_PORT}"
    index => "audit*"
    query => '{"query": {"bool": {"must_not": [{"match": {"sent_to_kafka": "true"}}]}}, "sort": {"timestamp": "asc"}}'
    # run every 10 minutes
    schedule => "*/10 * * * *"
    type => audit_kafka
  }
}

filter {
  # add the input source to @metadata if it's not set yet
  if ![@metadata][source] {
    mutate {
      add_field => { "[@metadata][source]" => "%{[type]}" }
    }
  }

  # To avoid processing the same document multiple times, the following strategy is used:
  # 1. Clone the current event. Then we'll have 2 events instead of 1.
  # 2. For the cloned event (type => es), add a new field "sent_to_kafka => true".
  # 3. Drop useless fields added by Logstash from both events (@version, @timestamp, type, tags).
  # 4. Update the document within ES using the event with the "sent_to_kafka" field.
  # 5. Send the original event to Kafka.

  # apply filters to audit events only
  if [@metadata][source] == "audit_kafka" {
    clone {
      clones => ["es"]
      ecs_compatibility => disabled
    }
    if [type] == "es" {
      mutate {
        add_field => { "sent_to_kafka" => true }
      }
    }
    mutate {
      remove_field => ["@version", "@timestamp", "type", "tags", "sent_to_es"]
    }
  }
}

output {
  if [@metadata][source] == "audit_kafka" {
    if [sent_to_kafka] == "true" {
      # update documents in ES, marking them as processed
      elasticsearch {
        hosts => "${ES_HOST}:${ES_PORT}"
        index => "%{[@metadata][doc_meta][_index]}"
        action => update
        document_id => "%{[@metadata][doc_meta][_id]}"
      }
    } else {
      kafka {
        codec => json
        topic_id => "eclecticiq_audit"
        # bootstrap_servers is a comma-separated list of hosts,
        # e.g. host1:port1,host2:port2
        bootstrap_servers => "${KAFKA_HOSTS}"
      }
    }
  }
}
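The header comment above references the 20-eclecticiq-env-vars.conf systemd drop-in. A minimal sketch of what that file could contain, using the standard systemd Environment= directive; the host names and ports are placeholders, not values from your deployment:

  # /etc/systemd/system/logstash.service.d/20-eclecticiq-env-vars.conf
  [Service]
  Environment="ES_HOST=localhost"
  Environment="ES_PORT=9200"
  Environment="KAFKA_HOSTS=kafka1:9092,kafka2:9092"

After editing the drop-in, run systemctl daemon-reload and restart Logstash so the pipeline picks up the new variables.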
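To check that audit events are arriving on the topic, you can read it back with the console consumer that ships with Kafka. The working directory is an assumption; adjust the path to your Kafka installation:

  bin/kafka-console-consumer.sh \
    --bootstrap-server kafka1:9092 \
    --topic eclecticiq_audit \
    --from-beginning

Because the pipeline runs on a 10-minute schedule, new documents may take up to 10 minutes to appear on the topic.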