Stream audit logs to Elasticsearch

The following is a sample Logstash pipeline for streaming Intelligence Center audit trail logs to a remote Elasticsearch instance.

# Optional pipeline used to read data from Elasticsearch audit index and publish to a Elasticseach index.
# To enable this pipeline, drop this file into Logstash pipelines directory. For more details about where
# pipeline configuration files live check this out https://www.elastic.co/guide/en/logstash/7.17/dir-layout.html
 
# Configuration:
# - Make sure you've set "AUDIT_ES_HOSTS" environment variable. You should set this in the following file
# /etc/systemd/system/logstash.service.d/20-eclecticiq-env-vars.conf
 
input {
elasticsearch {
docinfo => true
docinfo_target => "[@metadata][doc_meta]"
# "hosts" can be an array of hosts or a single host.
hosts => "${ES_HOST}:${ES_PORT}"
index => "audit*"
query => '{"query": {"bool": {"must_not": [{"match": {"sent_to_es": "true"}}]}}, "sort": {"timestamp": "asc"}}'
# run every 10 minutes
schedule => "*/10 * * * *"
type => audit_es
}
}
 
filter {
# add the input source to @metadata if it's not set yet
if ![@metadata][source] {
mutate {
add_field => {"[@metadata][source]" => "%{[type]}"}
}
}
 
# To avoid processing the same document multiple times the following strategy is used:
# 1. Clone the current event. Then we'll have 2 events instead of 1.
# 2. For the clone event (type => es) add a new field "sent_to_es => true"
# 3. Drop useless fields added by Logstash from both events (@version, @timestamp, type, tags)
# 4. Update the document within ES using the event with "sent_to_es" field.
# 5. Send the original event to Kafka.
 
# apply filters for audit events only
if [@metadata][source] == "audit_es" {
clone {
clones => ["es"]
ecs_compatibility => disabled
}
if [type] == "es" {
mutate {
add_field => {"sent_to_es" => true}
}
}
mutate {
add_field => { "[@metadata][target_index]" => "eiq-audit-%{+yyyy-MM}" }
remove_field => ["@version", "@timestamp", "type", "tags", "sent_to_kafka"]
}
}
}
 
output {
if [@metadata][source] == "audit_es" {
if [sent_to_es] == "true" {
# update documents in ES, marking it as processed
elasticsearch {
hosts => "${ES_HOST}:${ES_PORT}"
index => "%{[@metadata][doc_meta][_index]}"
action => update
document_id => "%{[@metadata][doc_meta][_id]}"
}
}
else {
elasticsearch {
hosts => "${AUDIT_ES_HOSTS}"
index => "%{[@metadata][target_index]}"
template_overwrite => true
}
}
}
}