Stream audit logs to Splunk#
The following is a sample Logstash pipeline for streaming EclecticIQ Intelligence Center audit trail logs to Splunk.
Important
Logstash pipelines are not available for EclecticIQ Hosted environments. To set up audit trail log streaming for your Hosted environment, contact support or customer success.
# Optional pipeline used to read data from Elasticsearch audit index and publish to Splunk.
# To enable this pipeline, drop this file into Logstash pipelines directory. For more details about where
# pipeline configuration files live check this out https://www.elastic.co/guide/en/logstash/7.17/dir-layout.html
# Configuration:
# Make sure you've set the following environment variable:
# - SPLUNK_HEC_ENDPOINT (required): URL of Splunk HEC endpoint.
# - SPLUNK_HEC_TOKEN (required): Splunk HEC token used to Authenticate requests.
# For further details about Splunk HEC configuration check
# this out https://docs.splunk.com/Documentation/Splunk/8.2.6/Data/UsetheHTTPEventCollector
# You should set this in the following file
# /etc/systemd/system/logstash.service.d/20-eclecticiq-env-vars.conf
input {
elasticsearch {
docinfo => true
docinfo_target => "[@metadata][doc_meta]"
# "hosts" can be an array of hosts or a single host.
hosts => "${ES_HOST}:${ES_PORT}"
index => "audit*"
query => '{"query": {"bool": {"must_not": [{"match": {"sent_to_splunk": "true"}}]}}, "sort": {"timestamp": "asc"}}'
# run every 10 minutes
schedule => "*/10 * * * *"
type => audit_splunk
}
}
filter {
# add the input source to @metadata if it's not set yet
if ![@metadata][source] {
mutate {
add_field => {"[@metadata][source]" => "%{[type]}"}
}
}
# To avoid processing the same document multiple times the following strategy is used:
# 1. Clone the current event. Then we'll have 2 events instead of 1.
# 2. For the clone event (type => es) add a new field "sent_to_splunk => true"
# 3. Drop useless fields added by Logstash from both events (@version, @timestamp, type, tags)
# 4. Update the document within ES using the event with "sent_to_splunk" field.
# 5. Send the original event to Kafka.
# apply filters for audit events only
if [@metadata][source] == "audit_splunk" {
clone {
clones => ["es"]
ecs_compatibility => disabled
}
mutate {
remove_field => ["@version", "@timestamp", "tags", "sent_to_es", "sent_to_kafka"]
}
if [type] == "es" {
mutate {
remove_field => ["type"]
add_field => {"sent_to_splunk" => true}
}
}
else {
mutate {
remove_field => ["type"]
}
ruby {
# Splunk HEC requires that events are sent under a certain format.
# Then this filter is moving the whole document to @metadata field
# as well as converting 'timestamp' into an epoch in order to allow
# us to build the payload on the right way before send it to Splunk.
code => '
require "date"
event.set("[@metadata][splunk_event]", event.to_hash)
event.set("[@metadata][splunk_time]", DateTime.parse(event.get("timestamp")).to_time.to_i)
'
}
}
}
}
output {
if [@metadata][source] == "audit_splunk" {
if [sent_to_splunk] == "true" {
# update documents in ES, marking it as processed
elasticsearch {
hosts => "${ES_HOST}:${ES_PORT}"
index => "%{[@metadata][doc_meta][_index]}"
action => update
document_id => "%{[@metadata][doc_meta][_id]}"
}
}
else {
http {
http_method => "post"
url => "${SPLUNK_HEC_ENDPOINT}"
headers => ["Authorization", "Splunk ${SPLUNK_HEC_TOKEN}"]
format => json_batch
mapping => {
event => "%{[@metadata][splunk_event]}"
time => "%{[@metadata][splunk_time]}"
}
}
}
}
}