Stream audit logs to Elasticsearch
The following is a sample Logstash pipeline for streaming Intelligence Center audit trail logs to a remote Elasticsearch instance.
# Optional pipeline used to read data from Elasticsearch audit index and publish to a Elasticseach index.# To enable this pipeline, drop this file into Logstash pipelines directory. For more details about where# pipeline configuration files live check this out https://www.elastic.co/guide/en/logstash/7.17/dir-layout.html# Configuration:# - Make sure you've set "AUDIT_ES_HOSTS" environment variable. You should set this in the following file# /etc/systemd/system/logstash.service.d/20-eclecticiq-env-vars.confinput { elasticsearch { docinfo => true docinfo_target => "[@metadata][doc_meta]" # "hosts" can be an array of hosts or a single host. hosts => "${ES_HOST}:${ES_PORT}" index => "audit*" query => '{"query": {"bool": {"must_not": [{"match": {"sent_to_es": "true"}}]}}, "sort": {"timestamp": "asc"}}' # run every 10 minutes schedule => "*/10 * * * *" type => audit_es }}filter { # add the input source to @metadata if it's not set yet if ![@metadata][source] { mutate { add_field => {"[@metadata][source]" => "%{[type]}"} } } # To avoid processing the same document multiple times the following strategy is used: # 1. Clone the current event. Then we'll have 2 events instead of 1. # 2. For the clone event (type => es) add a new field "sent_to_es => true" # 3. Drop useless fields added by Logstash from both events (@version, @timestamp, type, tags) # 4. Update the document within ES using the event with "sent_to_es" field. # 5. Send the original event to Kafka. # apply filters for audit events only if [@metadata][source] == "audit_es" { clone { clones => ["es"] ecs_compatibility => disabled } if [type] == "es" { mutate { add_field => {"sent_to_es" => true} } } mutate { add_field => { "[@metadata][target_index]" => "eiq-audit-%{+yyyy-MM}" } remove_field => ["@version", "@timestamp", "type", "tags", "sent_to_kafka"] } }}output { if [@metadata][source] == "audit_es" { if [sent_to_es] == "true" { # update documents in ES, marking it as processed elasticsearch { hosts => "${ES_HOST}:${ES_PORT}" index => "%{[@metadata][doc_meta][_index]}" action => update document_id => "%{[@metadata][doc_meta][_id]}" } } else { elasticsearch { hosts => "${AUDIT_ES_HOSTS}" index => "%{[@metadata][target_index]}" template_overwrite => true } } }}