Skip to main content

Pipeline

Control Flow Elastic Compatible

Synopsis

Executes another pipeline by name, allowing for pipeline reuse and modular configurations.

Schema

- pipeline:
name: <string>
description: <text>
if: <script>
ignore_failure: <boolean>
ignore_missing_pipeline: <boolean>
on_failure: <processor[]>
on_success: <processor[]>
tag: <string>

Configuration

The following fields are used to define the processor:

FieldRequiredDefaultDescription
nameY-Name or reference of the pipeline to execute
descriptionN-Explanatory note
ifN-Condition to run
ignore_failureNfalseSee Handling Failures
ignore_missing_pipelineNfalseIf true, silently continue when referenced pipeline is not found
on_failureN-See Handling Failures
on_successN-See Handling Success
tagN-Identifier

Details

The pipeline processor can reference other pipelines using the syntax {{ IngestPipeline "pipeline-name" }}. The names can be specified with or without the .yml/.yaml extension.

warning

As pipeline references are resolved at runtime, make sure all referenced pipelines exist in your configuration, or set ignore_missing_pipeline to true if they are optional.

note

The processor detects circular pipeline references at runtime. If a pipeline cycle is detected, the processor returns an error identifying the pipeline that caused the cycle.

Examples

Basic

Another pipeline to be executed...

pipelines:
- name: main
processors:
- pipeline:
name: '{{ IngestPipeline "parse-logs" }}'

must be defined in the configuration:

pipelines:
- name: parse-logs
processors:
- grok:
field: message
pattern: '%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:level}'

Conditionals

Executing the pipeline based on criteria...

  - pipeline:
name: '{{ IngestPipeline "enrich-data" }}'
if: 'ctx.level == "ERROR"'
ignore_failure: true

helps control the flow:

pipelines:
- name: enrich-data
processors:
- append:
field: tags
value: needs-review

Error Handling

Handle missing pipelines...

- pipeline:
name: '{{ IngestPipeline "optional-pipeline" }}'
ignore_missing_pipeline: true
on_failure:
- append:
field: tags
value: pipeline-failed

and specify fallback actions:

pipelines:
- name: main
processors:
- append:
field: status
value: processed