Ingestion for Sysadmins (CentOS)

A technical reference overview of the backend platform components that ingest incoming data, process it, and store it in the main data store (PostgreSQL), the indexing and search database (Elasticsearch), and the graph database (Neo4j).

Ingestion

Step 1: Incoming feed workers

Each incoming feed worker:

  • Listens to the queue:ingestion:inbound Redis queue.

  • Polls an external API or service to download data packages.

Step 2: Incoming feed workers

Each incoming feed worker:

  • Stores the downloaded packages to PostgreSQL.

Step 3: Ingestion workers

Each ingestion worker:

  • Looks up the PostgreSQL blob (package) table.

  • Picks a package for processing.

  • Writes data to the PostgreSQL entity and extract (observable) tables.

  • Updates the PostgreSQL blob (package) table.

Step 4: Ingestion workers

The default action is synchronous; the failover action is asynchronous.

Each ingestion worker:

  • Attempts to sync data between Elasticsearch and PostgreSQL (synchronously).

  • If the action fails, search indexer workers listen to a failover Redis queue to retry the data sync (asynchronously).

Step 5: Ingestion workers

The default action is synchronous; the failover action is asynchronous.

Each ingestion worker:

  • Attempts to sync data between Neo4j and PostgreSQL (synchronously).

  • If the action fails, graph indexer workers listen to a failover Redis queue to retry the data sync (asynchronously).

Step 6: Graph ingestion worker

The graph ingestion worker:

  • Stores the temporary CSV data that eclecticiq-neo4jbatcher preprocesses for further ingestion in Neo4j to a location that both eclecticiq-neo4jbatcher and Neo4j can access.

  • Sends a query to Neo4j to initiate CSV data loading.

Step 7: Neo4j

Executes the query and loads the CSV data for graph ingestion and indexing.

Query example:

LOAD CSV WITH HEADERS FROM 'file://{file}' AS line
MERGE (entity:IntelEntity {id:line.entity})
...
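Steps 4 and 5 describe the same synchronous-first, asynchronous-failover pattern. It can be sketched as follows, with an in-memory deque standing in for the failover Redis queue (all function and variable names here are illustrative, not the platform's actual API):

```python
from collections import deque

# Stand-in for the failover Redis queue (illustrative, not a real queue name).
failover_queue = deque()

def sync_package(package_id, backend):
    """Pretend to sync one package to an index backend (Elasticsearch or Neo4j).

    Raises ConnectionError to simulate the backend being unavailable.
    """
    if not backend.get("available", True):
        raise ConnectionError("backend unavailable")
    backend.setdefault("synced", []).append(package_id)

def ingest(package_id, backend):
    """Try the synchronous sync; on failure, enqueue for asynchronous retry."""
    try:
        sync_package(package_id, backend)
    except ConnectionError:
        # Indexer workers would later pick this up from the failover queue.
        failover_queue.append(package_id)

backend = {"available": False}
ingest("pkg-1", backend)           # synchronous attempt fails, package is queued
backend["available"] = True
while failover_queue:              # failover worker retries asynchronously
    sync_package(failover_queue.popleft(), backend)
print(backend["synced"])           # ['pkg-1']
```

The point of the pattern is that a temporarily unavailable index database never blocks ingestion into PostgreSQL; the sync is simply deferred.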

Core components

Tip

For more information about the eiq-platform command line tool, see eiq-platform command line.


platform-api

platform-api is the main Python REST API that enables communications with the platform and its components. It is based on the Flask web application framework.

There is only one running instance of the platform-api process.

The instance runs the Gunicorn WSGI (web server gateway interface) server to exchange data, over port 8008, with the Nginx web server, which acts as a reverse proxy.

platform-api exchanges data with PostgreSQL, Neo4j, Elasticsearch, and Redis.

intel-ingestion

eclecticiq-platform-backend-ingestion drives the ingestion processing pipeline.

The default configuration spawns 4 eclecticiq-platform-backend-ingestion workers, corresponding to the number of internal threads managing concurrent user requests.

eclecticiq-platform-backend-ingestion exchanges data with PostgreSQL, Neo4j, Elasticsearch, and Redis.

Data traffic depends on the number of incoming packages queued up for processing.

You can increase or decrease the number of concurrent workers using systemctl commands.

For example, to decrease the active workers from the default 4 to 2:

systemctl disable eclecticiq-platform-backend-ingestion@{3,4}
systemctl stop eclecticiq-platform-backend-ingestion@{3,4}

To increase the default active workers from 4 to 6:

systemctl enable eclecticiq-platform-backend-ingestion@{1..6}
systemctl start eclecticiq-platform-backend-ingestion@{1..6}
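The brace expansions in the systemctl examples above enumerate numbered systemd template instances. The same enumeration can be sketched in Python (the helper is illustrative, not a platform tool; instance numbering is 1-based, as in the examples):

```python
def unit_names(base, first, last):
    """Expand base@{first..last} into explicit systemd unit instance names."""
    return [f"{base}@{i}" for i in range(first, last + 1)]

# Scaling down from 4 workers to 2 stops instances 3 and 4:
to_stop = unit_names("eclecticiq-platform-backend-ingestion", 3, 4)
print(to_stop)

# Scaling up to 6 enables instances 1 through 6:
to_enable = unit_names("eclecticiq-platform-backend-ingestion", 1, 6)
```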

To restart the default workers:

systemctl restart eclecticiq-platform-backend-ingestion

To run the command manually:

eiq-platform ingestion run

Redis acts as a message broker:

  • It listens for new incoming packages.

  • It adds new incoming packages to the queue:ingestion:inbound Redis queue for further processing.
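The broker behavior described above amounts to pushing package identifiers onto a named queue and letting workers pop them off. A minimal in-memory sketch, with a dict of deques standing in for Redis (the helper names are ours; only the queue name comes from the text):

```python
from collections import deque

broker = {}  # stands in for Redis

def push(queue_name, item):
    """Append an item to the named queue, creating the queue on first use."""
    broker.setdefault(queue_name, deque()).append(item)

def pop(queue_name):
    """Remove and return the oldest item, or None if the queue is empty."""
    q = broker.get(queue_name)
    return q.popleft() if q else None

# A new incoming package is queued for the ingestion workers:
push("queue:ingestion:inbound", "package-42")
next_package = pop("queue:ingestion:inbound")
print(next_package)  # package-42
```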

graph-ingestion

eclecticiq-platform-backend-graphindex drives data ingestion and indexing to the graph database.

There is one running instance of eclecticiq-platform-backend-graphindex.

It exchanges data with Neo4j and Redis.

To restart the worker:

systemctl restart eclecticiq-platform-backend-graphindex

To run the command manually:

eiq-platform graph run-indexer

Redis acts as a message broker:

  • It listens for new incoming packages.

  • It adds new incoming packages to the queue:graph:inbound Redis queue for further processing.

search-ingestion

eclecticiq-platform-backend-searchindex drives data indexing to the indexing and search database.

There is one running instance of eclecticiq-platform-backend-searchindex.

It exchanges data with Elasticsearch and Redis.

To restart the worker:

systemctl restart eclecticiq-platform-backend-searchindex

To run the command manually:

eiq-platform search run-indexer

Redis acts as a message broker:

  • It listens for new incoming packages.

  • It adds new incoming packages to the queue:search:inbound Redis queue for further processing.

Celery workers

There are several Celery workers running concurrently.

They execute tasks related to processes such as ingestion, dissemination, and discovery.

They also manage execution priority for rules, data retention policies, and enrichers.

eclecticiq-platform-backend-workers.service (sourced from EIQ platform-backend)

Wants=eclecticiq-secrets-setter.service eclecticiq-platform-backend-scheduler.service \
      eclecticiq-platform-backend-worker@discovery-priority.service \
      eclecticiq-platform-backend-worker@discovery.service \
      eclecticiq-platform-backend-worker@enrichers-priority.service \
      eclecticiq-platform-backend-worker@enrichers.service \
      eclecticiq-platform-backend-worker@entity-rules-priority.service \
      eclecticiq-platform-backend-worker@extract-rules-priority.service \
      eclecticiq-platform-backend-worker@incoming-transports-priority.service \
      eclecticiq-platform-backend-worker@incoming-transports.service \
      eclecticiq-platform-backend-worker@outgoing-feeds-priority.service \
      eclecticiq-platform-backend-worker@outgoing-feeds.service \
      eclecticiq-platform-backend-worker@outgoing-transports-priority.service \
      eclecticiq-platform-backend-worker@outgoing-transports.service \
      eclecticiq-platform-backend-worker@reindexing.service \
      eclecticiq-platform-backend-worker@retention-policies-priority.service \
      eclecticiq-platform-backend-worker@retention-policies.service \
      eclecticiq-platform-backend-worker@synchronization.service \
      eclecticiq-platform-backend-worker@utilities-priority.service \
      eclecticiq-platform-backend-worker@utilities.service

Celery queues manage workload and task distribution for the workers:

settings.py (sourced from EIQ platform-backend)

CELERY_TASK_QUEUES = [
    {"name": "enrichers", "routing_key": "eiq.enrichers.#"},
    {"name": "enrichers-priority", "routing_key": "priority.enrichers.#"},
    {"name": "incoming-transports", "routing_key": "eiq.incoming-transports.#"},
    {
        "name": "incoming-transports-priority",
        "routing_key": "priority.incoming-transports.#",
    },
    {"name": "outgoing-transports", "routing_key": "eiq.outgoing-transports.#"},
    {
        "name": "outgoing-transports-priority",
        "routing_key": "priority.outgoing-transports.#",
    },
    {"name": "outgoing-feeds", "routing_key": "eiq.outgoing-feeds.#"},
    {"name": "outgoing-feeds-priority", "routing_key": "priority.outgoing-feeds.#"},
    {"name": "utilities", "routing_key": "eiq.utilities.#"},
    {"name": "utilities-priority", "routing_key": "priority.utilities.#"},
    {"name": "discovery", "routing_key": "eiq.discovery.#"},
    {"name": "discovery-priority", "routing_key": "priority.discovery.#"},
    {"name": "entity-rules-priority", "routing_key": "priority.entity-rules.#"},
    {"name": "extract-rules-priority", "routing_key": "priority.extract-rules.#"},
    {"name": "reindexing", "routing_key": "eiq.reindexing.#"},
    {"name": "retention-policies", "routing_key": "eiq.retention-policies.#"},
    {"name": "synchronization", "routing_key": "eiq.synchronization.#"},
    {
        "name": "retention-policies-priority",
        "routing_key": "priority.retention-policies.#",
    },
]
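The routing keys above use AMQP topic syntax, where # matches zero or more dot-separated words and * matches exactly one word. A simplified sketch of that matching rule (our own helper for illustration, not Celery's implementation; the real matching happens in the message broker):

```python
import re

def topic_matches(pattern, routing_key):
    """Simplified AMQP topic matching: '#' matches any remaining words,
    '*' matches exactly one word. Enough for the queue patterns above."""
    regex = re.escape(pattern)
    regex = regex.replace(r"\.\#", r"(\..+)?").replace(r"\#", r".*")
    regex = regex.replace(r"\*", r"[^.]+")
    return re.fullmatch(regex, routing_key) is not None

# A task routed with key eiq.enrichers.<task> lands on the "enrichers" queue,
# while priority.enrichers.<task> lands on "enrichers-priority":
print(topic_matches("eiq.enrichers.#", "eiq.enrichers.enrich_entity"))       # True
print(topic_matches("priority.enrichers.#", "eiq.enrichers.enrich_entity")) # False
```

This is why every queue appears twice: the eiq.* and priority.* key prefixes let the same kind of task be routed to a normal or a priority queue.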

Core dependencies

  • OpenTAXII

  • Nginx

  • Redis

  • PostgreSQL

  • Neo4j

  • Elasticsearch

  • Logstash

  • Kibana