Stream audit logs to Kafka

The following is a sample Logstash pipeline for streaming EclecticIQ Intelligence Center audit trail logs to Kafka.

Important

Logstash pipelines are not available for EclecticIQ Hosted environments. To set up audit trail log streaming for your Hosted environment, contact support or customer success.

# Optional pipeline that reads data from the Elasticsearch audit index and publishes it to a Kafka topic.
# To enable this pipeline, drop this file into the Logstash pipelines directory. For details about where
# pipeline configuration files live, see https://www.elastic.co/guide/en/logstash/7.17/dir-layout.html

# Configuration:
# - Make sure the "KAFKA_HOSTS", "ES_HOST", and "ES_PORT" environment variables are set.
#   Set them in the following file:
#   /etc/systemd/system/logstash.service.d/20-eclecticiq-env-vars.conf
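#
# For example, the drop-in file could look like this (illustrative values only;
# replace the hosts and ports with your own):
#
#   [Service]
#   Environment="ES_HOST=localhost"
#   Environment="ES_PORT=9200"
#   Environment="KAFKA_HOSTS=kafka1:9092,kafka2:9092"
#
# Reload systemd and restart Logstash after changing it:
#   sudo systemctl daemon-reload && sudo systemctl restart logstash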

input {
    elasticsearch {
        docinfo => true
        docinfo_target => "[@metadata][doc_meta]"
        # "hosts" can be an array of hosts or a single host.
        hosts => "${ES_HOST}:${ES_PORT}"
        index => "audit*"
        query => '{"query": {"bool": {"must_not": [{"match": {"sent_to_kafka": "true"}}]}}, "sort": {"timestamp": "asc"}}'
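        # The query above, unpacked: select only documents that are not yet
        # marked with sent_to_kafka == "true", oldest first, so documents
        # already published by a previous run are skipped.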
        # run every 10 minutes
        schedule => "*/10 * * * *"
        type => "audit_kafka"
    }
}

filter {
    # add the input source to @metadata if it's not set yet
    if ![@metadata][source] {
        mutate {
            add_field => {"[@metadata][source]" => "%{[type]}"}
        }
    }

    # To avoid processing the same document multiple times the following strategy is used:
    # 1. Clone the current event. Then we'll have 2 events instead of 1.
    # 2. For the clone event (type => es) add a new field "sent_to_kafka => true"
    # 3. Drop bookkeeping fields added by Logstash from both events (@version, @timestamp, type, tags),
    #    along with markers left by other pipelines (sent_to_es, sent_to_splunk).
    # 4. Update the document within ES using the event that carries the "sent_to_kafka" field.
    # 5. Send the original event to Kafka.
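    #
    # For a single audit document this produces two events:
    #   original event: no "sent_to_kafka" field        -> published to Kafka
    #   cloned event:   "sent_to_kafka" set to "true"   -> written back to ES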

    # apply filters for audit events only
    if [@metadata][source] == "audit_kafka" {
        clone {
            clones => ["es"]
            ecs_compatibility => disabled
        }
        if [type] == "es" {
            mutate {
                add_field => {"sent_to_kafka" => true}
            }
        }
        mutate {
            remove_field => ["@version", "@timestamp", "type", "tags", "sent_to_es", "sent_to_splunk"]
        }
    }
}

output {
    if [@metadata][source] == "audit_kafka" {
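        # Route the two events produced by the filter stage: the clone marked
        # with "sent_to_kafka" updates the source document in Elasticsearch,
        # while the original event falls through to the Kafka output below.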
        if [sent_to_kafka] == "true" {
            # Update the original document in ES, marking it as processed so the next scheduled run skips it.
            elasticsearch {
                hosts => "${ES_HOST}:${ES_PORT}"
                index => "%{[@metadata][doc_meta][_index]}"
                action => "update"
                document_id => "%{[@metadata][doc_meta][_id]}"
            }
        }
        else {
            kafka {
                codec => json
                topic_id => "eclecticiq_audit"
                # bootstrap_servers is a comma-separated list of hosts.
                # e.g.: host1:port1,host2:port2
                bootstrap_servers => "${KAFKA_HOSTS}"
            }
        }
    }
}
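
To verify that audit events reach the topic, you can read it back with the console consumer that ships with Kafka (the script name and path vary by distribution; some install it as kafka-console-consumer without the .sh suffix):

kafka-console-consumer.sh --bootstrap-server host1:port1 --topic eclecticiq_audit --from-beginning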