Stream audit logs to Splunk

The following is a sample Logstash pipeline for streaming Intelligence Center audit trail logs to Splunk.

# Optional pipeline that reads data from the Elasticsearch audit index and publishes it to Splunk.
# To enable this pipeline, drop this file into the Logstash pipelines directory. For more details about
# where pipeline configuration files live, see https://www.elastic.co/guide/en/logstash/7.17/dir-layout.html

# Configuration:

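# Make sure you've set the following environment variables:
# - SPLUNK_HEC_ENDPOINT (required): URL of the Splunk HEC endpoint.
# - SPLUNK_HEC_TOKEN (required): Splunk HEC token used to authenticate requests.
# The pipeline also reads ES_HOST and ES_PORT to reach Elasticsearch; make sure these are set as well.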

# For further details about Splunk HEC configuration, see
# https://docs.splunk.com/Documentation/Splunk/8.2.6/Data/UsetheHTTPEventCollector

# You should set these variables in the following file:
# /etc/systemd/system/logstash.service.d/20-eclecticiq-env-vars.conf

input {
    # Periodically pull audit documents that are not yet marked as sent to Splunk.
    elasticsearch {
        # Events from this input carry this type; the filter section copies it
        # into [@metadata][source] to recognise them.
        type => "audit_splunk"

        # A single host is configured here; an array of hosts is accepted too.
        hosts => "${ES_HOST}:${ES_PORT}"
        index => "audit*"

        # Exclude documents whose "sent_to_splunk" matches "true" and return
        # the rest ordered by ascending "timestamp".
        query => '{"query": {"bool": {"must_not": [{"match": {"sent_to_splunk": "true"}}]}}, "sort": {"timestamp": "asc"}}'

        # Cron expression: run every 10 minutes.
        schedule => "*/10 * * * *"

        # Keep the document metadata under [@metadata][doc_meta]; the output
        # section reads _index and _id from there to update the same document.
        docinfo => true
        docinfo_target => "[@metadata][doc_meta]"
    }
}

filter {
    # Record which input produced the event in @metadata, unless it is already set.
    if ![@metadata][source] {
        mutate {
            add_field => {"[@metadata][source]" => "%{[type]}"}
        }
    }

    # To avoid processing the same document multiple times the following strategy is used:
    # 1. Clone the current event, so there are 2 events instead of 1.
    # 2. For the cloned event (type => es) add a new field "sent_to_splunk => true".
    # 3. Drop useless fields added by Logstash from both events (@version, @timestamp, type, tags).
    # 4. Update the document within ES using the event with the "sent_to_splunk" field.
    # 5. Send the original event to Splunk.

    # apply filters for audit events only
    if [@metadata][source] == "audit_splunk" {
        clone {
            # The clone gets its "type" set to the clone name ("es"), which is
            # how the two events are told apart below.
            clones => ["es"]
            ecs_compatibility => disabled
        }

        # Applied to both the original event and the clone.
        mutate {
            remove_field => ["@version", "@timestamp", "tags", "sent_to_es", "sent_to_kafka"]
        }

        if [type] == "es" {
            # Clone: becomes the ES update that marks the document as processed.
            mutate {
                remove_field => ["type"]
                add_field => {"sent_to_splunk" => true}
            }
        }
        else {
            # Original: becomes the payload posted to Splunk.
            mutate {
                remove_field => ["type"]
            }
            ruby {
                # Splunk HEC requires events to be sent in a specific format, so this
                # filter copies the whole document into @metadata and converts
                # 'timestamp' to epoch seconds; the output section builds the HEC
                # payload from those two @metadata fields.
                # NOTE: the code below is delimited by single quotes, so it must not
                # contain any single quote characters.
                code => '
                require "date"

                event.set("[@metadata][splunk_event]", event.to_hash)

                # Fall back to the current time when "timestamp" is missing or cannot
                # be parsed. Without this guard the exception aborts the script,
                # splunk_time is never set, and the output posts an unresolved
                # placeholder as the event time while the ES copy is still marked
                # as sent.
                begin
                    splunk_time = DateTime.parse(event.get("timestamp")).to_time.to_i
                rescue ArgumentError, TypeError
                    splunk_time = Time.now.to_i
                end
                event.set("[@metadata][splunk_time]", splunk_time)
                '
            }
        }
    }
}

output {
    # Only events that originated from the audit_splunk input are routed here.
    if [@metadata][source] == "audit_splunk" {
        if [sent_to_splunk] != "true" {
            # No "sent_to_splunk" marker: this is the event to deliver to Splunk HEC.
            http {
                url => "${SPLUNK_HEC_ENDPOINT}"
                http_method => "post"
                format => "json_batch"
                headers => {
                    "Authorization" => "Splunk ${SPLUNK_HEC_TOKEN}"
                }
                # The payload is built from the @metadata fields prepared in the
                # filter section.
                mapping => {
                    "event" => "%{[@metadata][splunk_event]}"
                    "time" => "%{[@metadata][splunk_time]}"
                }
            }
        }
        else {
            # Marker present: write the event back to the document it was read
            # from, flagging that document as processed.
            elasticsearch {
                action => "update"
                hosts => "${ES_HOST}:${ES_PORT}"
                index => "%{[@metadata][doc_meta][_index]}"
                document_id => "%{[@metadata][doc_meta][_id]}"
            }
        }
    }
}