Elastic
Synopsis
Creates an HTTP listener that emulates the Elasticsearch Bulk API, allowing Elastic Beats and other Elasticsearch-compatible shippers to send data to DataStream without reconfiguration.
Schema
```yaml
- id: <numeric>
  name: <string>
  description: <string>
  type: elasticsearch
  tags: <string[]>
  pipelines: <pipeline[]>
  status: <boolean>
  properties:
    port: <numeric>
    address: <string>
    api_version: <string>
    max_body_size: <numeric>
    authentication:
      type: <string>
      username: <string>
      password: <string>
      tokens: <string[]>
    reuse: <boolean>
    workers: <numeric>
    tls:
      status: <boolean>
      cert_name: <string>
      key_name: <string>
```
Configuration
The following fields are used to define the device:
Device
| Field | Required | Default | Description |
|---|---|---|---|
| id | Y | - | Unique identifier |
| name | Y | - | Device name |
| description | N | - | Optional description |
| type | Y | - | Must be elasticsearch |
| tags | N | - | Optional tags |
| pipelines | N | - | Optional pre-processor pipelines |
| status | N | true | Enable/disable the device |
Connection
| Field | Required | Default | Description |
|---|---|---|---|
| port | Y | - | TCP port to listen on |
| address | N | "0.0.0.0" | Network address to bind |
| api_version | N | "8.3.2" | Elasticsearch version string returned in cluster info responses |
| max_body_size | N | 26214400 | Maximum request body size in bytes after decompression (default 25 MB) |
Authentication
| Field | Required | Default | Description |
|---|---|---|---|
| authentication.type | N | "none" | Authentication mode: none, basic, or bearer |
| authentication.username | Y* | - | Username for Basic authentication |
| authentication.password | Y* | - | Password for Basic authentication |
| authentication.tokens | Y* | - | Array of accepted bearer tokens |

\* = Conditionally required based on authentication.type (see Details)
TLS
| Field | Required | Default | Description |
|---|---|---|---|
| tls.status | N | false | Enable TLS encryption |
| tls.cert_name | Y* | - | TLS certificate file name |
| tls.key_name | Y* | - | TLS private key file name |

\* = Required when tls.status is true
TLS certificate and key files must be placed in the service root directory.
Performance
| Field | Required | Default | Description |
|---|---|---|---|
| reuse | N | true | Enable multi-worker mode |
| workers | N | NumCPU | Number of listener workers when reuse is enabled |
Details
The device listens on TCP and exposes an HTTP endpoint that responds to the same API paths used by Elasticsearch. On startup, Beats clients probe several endpoints (GET /, GET /_cluster/health, GET /_nodes, index template paths, ILM policy paths) to verify compatibility. The device acknowledges all probes with well-formed responses so Beats proceed to data ingestion without modification.
The primary ingest path is the Bulk API, available at POST /_bulk and POST /<index>/_bulk. The device parses the NDJSON bulk format (alternating action/source line pairs) and forwards each document to the DataStream pipeline. Action metadata fields (_index, _id, _pipeline, _routing) are injected into the document before forwarding. The delete action is acknowledged in the response but no event is ingested. The update action requires a doc field; scripts, upsert, and scripted_upsert are not supported and are rejected with a per-item error.
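The bulk handling just described, modelled in Python as an added example (this is not DataStream's code): it walks the NDJSON action/source pairs, injects the action metadata fields, acknowledges delete without producing an event, and rejects unsupported update forms with a per-item error. The metadata key names, the per-item response shape and the sample body are assumptions constructed for the example.

```python
import json
from typing import Optional

# Action metadata fields the page says are injected into the forwarded document
# (key names assumed to match the page's list exactly).
META_FIELDS = ("_index", "_id", "_pipeline", "_routing")
UNSUPPORTED_UPDATE = ("script", "upsert", "scripted_upsert")


def parse_bulk(body: str, default_index: Optional[str] = None):
    """Parse an NDJSON bulk body into (events_to_forward, per_item_responses).

    default_index models POST /<index>/_bulk, where the path supplies _index
    for actions that do not name one. The response item shape is an assumption.
    """
    lines = [ln for ln in body.split("\n") if ln.strip()]
    events, items = [], []
    i = 0
    while i < len(lines):
        action, meta = next(iter(json.loads(lines[i]).items()))
        meta = dict(meta or {})
        meta.setdefault("_index", default_index)
        i += 1
        ids = {"_index": meta["_index"], "_id": meta.get("_id")}
        if action == "delete":
            # Acknowledged in the response, but nothing is ingested.
            items.append({"delete": {**ids, "status": 200}})
            continue
        source = json.loads(lines[i])  # every other action carries a source line
        i += 1
        if action == "update":
            if "doc" not in source or any(k in source for k in UNSUPPORTED_UPDATE):
                items.append({"update": {**ids, "status": 400,
                                         "error": {"type": "unsupported_update"}}})
                continue
            source = source["doc"]
        event = dict(source)
        for field in META_FIELDS:
            if meta.get(field) is not None:
                event[field] = meta[field]  # e.g. _pipeline kept for downstream use
        events.append(event)
        items.append({action: {**ids, "status": 201 if action in ("index", "create") else 200}})
    return events, items


# Constructed sample body: index, delete, unsupported update, supported update.
SAMPLE_BULK = "\n".join([
    '{"index": {"_index": "logs", "_id": "1", "_pipeline": "beats-7"}}',
    '{"message": "login ok"}',
    '{"delete": {"_index": "logs", "_id": "2"}}',
    '{"update": {"_index": "logs", "_id": "3"}}',
    '{"script": "ctx._source.n += 1"}',
    '{"update": {"_index": "logs", "_id": "4"}}',
    '{"doc": {"message": "patched"}}',
]) + "\n"
```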
The device accepts gzip-compressed request bodies using Content-Encoding: gzip. The body size limit (max_body_size) is enforced after decompression to prevent gzip-bomb attacks. Requests exceeding the limit receive HTTP 413.
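Why the limit must be enforced on the decompressed size, shown as an added Python example (not the collector's code): the cap is checked while inflating, so a small body that expands far beyond max_body_size is rejected early. The 400 status for a corrupt stream is an assumption; make_bomb builds constructed test input.

```python
import gzip
import io

PAYLOAD_TOO_LARGE = 413


def decode_body(raw: bytes, content_encoding, max_body_size: int):
    """Return (status, body): 200 with the decoded body, or 413 with b"" when
    the *decompressed* size would exceed max_body_size.

    The cap is applied chunk by chunk during inflation, so a gzip bomb is
    rejected after at most max_body_size plus one chunk has been produced,
    never its full expanded size.
    """
    if content_encoding != "gzip":
        return (200, raw) if len(raw) <= max_body_size else (PAYLOAD_TOO_LARGE, b"")
    out = bytearray()
    try:
        with gzip.GzipFile(fileobj=io.BytesIO(raw)) as gz:
            while True:
                chunk = gz.read(64 * 1024)
                if not chunk:
                    break
                out += chunk
                if len(out) > max_body_size:
                    return PAYLOAD_TOO_LARGE, b""
    except (OSError, EOFError):
        return 400, b""  # corrupt or truncated gzip stream (status code assumed)
    return 200, bytes(out)


def make_bomb(expanded_size: int) -> bytes:
    """Constructed test input: highly compressible zeros that inflate to expanded_size."""
    return gzip.compress(b"\0" * expanded_size)
```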
The api_version field controls the version number returned in the cluster info response at GET /. Beats use this value to determine API compatibility. The default value 8.3.2 is sufficient for current Beats versions. Change this value only if a specific shipper version requires a different response.
Elasticsearch ingest pipelines can be registered via PUT /_ingest/pipeline/<id> and retrieved via GET /_ingest/pipeline/<id>. Registered pipelines are stored in memory and are not executed during ingestion. The pipeline name is preserved as _pipeline in the forwarded event for downstream processing. The pipeline cache does not survive a collector restart.
Authentication modes:
- none: All requests pass through without credential checks. This is the default.
- basic: Enforces HTTP Basic authentication. The `Authorization: Basic <base64(user:pass)>` header is validated against `authentication.username` and `authentication.password`. A `WWW-Authenticate: Basic realm="Elasticsearch"` header is included in all 401 responses.
- bearer: Validates the `Authorization: Bearer <token>` header against the `tokens` list. Elastic Beats also send tokens as bare values without the `Bearer` scheme prefix; both formats are accepted. Configuring `bearer` with an empty `tokens` list denies all requests. Use `none` for explicit open access.
Credential changes require a full collector restart to take effect. The authentication middleware builds its validation structures once at startup.
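The three modes as a validator built once at startup, added here as an illustration (not the collector's own middleware). The config dict follows the authentication block of the schema above; the (allowed, WWW-Authenticate value) return shape is an assumption.

```python
import base64
import hmac

REALM_HEADER = 'Basic realm="Elasticsearch"'  # WWW-Authenticate value on 401


def build_validator(auth_cfg: dict):
    """Build the check function once from the authentication block of the
    device config (validation structures are built at startup, which is why a
    credential change needs a restart). The returned check takes the raw
    Authorization header value and yields (allowed, www_authenticate_or_None).
    """
    mode = auth_cfg.get("type", "none")
    if mode == "none":
        return lambda header: (True, None)
    if mode == "basic":
        expected = f"{auth_cfg['username']}:{auth_cfg['password']}".encode()

        def check_basic(header):
            scheme, _, credential = (header or "").strip().partition(" ")
            if scheme.lower() != "basic":
                return False, REALM_HEADER
            try:
                given = base64.b64decode(credential.strip(), validate=True)
            except (ValueError, TypeError):
                return False, REALM_HEADER
            ok = hmac.compare_digest(given, expected)  # constant-time compare
            return ok, (None if ok else REALM_HEADER)
        return check_basic
    if mode == "bearer":
        tokens = [t.encode() for t in auth_cfg.get("tokens", [])]  # empty list denies all

        def check_bearer(header):
            value = (header or "").strip()
            if value[:7].lower() == "bearer ":  # Beats may omit the scheme; accept both
                value = value[7:].strip()
            ok = any(hmac.compare_digest(value.encode(), t) for t in tokens)
            return ok, None
        return check_bearer
    raise ValueError(f"unknown authentication.type: {mode}")
```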
Worker count is capped at the platform's maximum supported socket count. When reuse is disabled, a single worker handles all connections.
Examples
The following are commonly used configuration types.
Basic
The minimum required configuration starts the Elasticsearch endpoint:
Creating a basic Elastic device on port 9200...
Basic Authentication
Beats can authenticate with a username and password:
Requiring Basic auth credentials from all senders...
Bearer Token Authentication
Multiple tokens can be accepted simultaneously:
Validating requests against a list of bearer tokens...
TLS
Connections can be encrypted for secure transport:
Enabling TLS encryption on the listener...
High-Volume
Worker count can be increased for high-throughput environments:
Scaling listener workers for high-volume ingestion...
When reuse is enabled, the actual worker count is capped at the platform's maximum supported socket count.