Google Cloud Pub/Sub
Synopsis
Creates a subscriber that connects to a Google Cloud Pub/Sub subscription and consumes messages using concurrent goroutine receivers. Supports Application Default Credentials and explicit service account credentials.
Schema
```yaml
- id: <numeric>
  name: <string>
  description: <string>
  type: pubsub
  tags: <string[]>
  pipelines: <pipeline[]>
  status: <boolean>
  properties:
    project_id: <string>
    subscription_id: <string>
    credentials_json: <string>
    workers: <numeric>
    max_outstanding_messages: <numeric>
    max_outstanding_bytes: <numeric>
```
Configuration
The following fields are used to define the device:
Device
| Field | Required | Default | Description |
|---|---|---|---|
| id | Y | - | Unique identifier |
| name | Y | - | Device name |
| description | N | - | Optional description |
| type | Y | - | Must be pubsub |
| tags | N | - | Optional tags |
| pipelines | N | - | Optional pre-processor pipelines |
| status | N | true | Enable/disable the device |
Connection
| Field | Required | Default | Description |
|---|---|---|---|
| project_id | Y | - | Google Cloud project ID |
| subscription_id | Y | - | Pub/Sub subscription name |
| credentials_json | N | - | Service account credentials JSON. When omitted, Application Default Credentials are used |
Performance
| Field | Required | Default | Description |
|---|---|---|---|
| workers | N | 0 | Number of goroutines for concurrent message processing. 0 defaults to GOMAXPROCS |
| max_outstanding_messages | N | 1000 | Maximum number of unprocessed messages held in memory at once |
| max_outstanding_bytes | N | 1000000000 | Maximum total byte size of unprocessed messages held in memory at once (1 GB) |
Details
The device connects to a single Pub/Sub subscription and opens a long-lived streaming pull connection. Message delivery uses the Google Cloud Pub/Sub SDK's Receive method, which dispatches the callback concurrently from multiple goroutines controlled by the workers setting. A mutex serializes all writes to the internal stream writers, so concurrent callbacks do not overwrite each other's data.
Messages are acknowledged after a successful write. If the write fails, the message is negatively acknowledged and redelivered. Empty payloads are acknowledged immediately without processing to prevent infinite redelivery loops.
The workers field controls the number of concurrent goroutine receivers within a single Receive call. This is the correct scaling mechanism for Pub/Sub throughput: multiple Receive calls on the same subscription compete for the same messages rather than parallelizing work, whereas increasing workers lets a single subscriber drain faster.
The max_outstanding_messages and max_outstanding_bytes limits cap how many messages are held in memory concurrently, bounding memory consumption under burst conditions. Setting these values too high can cause out-of-memory errors during traffic spikes.
When credentials_json is omitted, the SDK resolves credentials using Application Default Credentials, which includes Workload Identity Federation on GKE and other Google-managed environments. When running outside Google Cloud, a service account key JSON must be supplied explicitly.
The subscription must already exist in the specified project. The device validates both project_id and subscription_id at startup and will not start if either is missing or empty. The connection state is set to connected once the consumer is successfully created.
Examples
The following are commonly used configuration types.
Basic
The minimum required configuration creates the subscriber:
Creating a basic Pub/Sub subscriber using Application Default Credentials:
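A minimal configuration might look like this; the id, name, project, and subscription values are placeholders:

```yaml
- id: 1
  name: pubsub_basic
  type: pubsub
  properties:
    project_id: "my-project"
    subscription_id: "my-subscription"
```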
Service Account
Explicit credentials can be provided for environments without Application Default Credentials:
Connecting with an explicit service account key:
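A sketch with an inline service account key; the key body is abbreviated here and must be replaced with the full JSON key:

```yaml
- id: 2
  name: pubsub_sa
  type: pubsub
  properties:
    project_id: "my-project"
    subscription_id: "my-subscription"
    credentials_json: |
      {"type": "service_account", "project_id": "my-project", "private_key": "<abbreviated>"}
```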
High-Volume
Throughput can be increased for high message volumes by tuning goroutine count and memory limits:
Optimizing for high-throughput message consumption:
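A possible high-volume tuning, with placeholder values that should be sized to the host's memory:

```yaml
- id: 3
  name: pubsub_highvol
  type: pubsub
  properties:
    project_id: "my-project"
    subscription_id: "my-subscription"
    workers: 16                       # more concurrent callback goroutines
    max_outstanding_messages: 5000
    max_outstanding_bytes: 2000000000 # 2 GB; must fit in available memory
```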
Setting max_outstanding_bytes above available memory can cause out-of-memory errors during traffic bursts.
Pipeline Processing
Messages can be preprocessed before ingestion:
Applying custom processing to Pub/Sub messages:
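A sketch attaching a pre-processor pipeline; the pipeline name is hypothetical and must refer to a pipeline defined elsewhere in the configuration:

```yaml
- id: 4
  name: pubsub_pipeline
  type: pubsub
  pipelines:
    - normalize # hypothetical pipeline name
  properties:
    project_id: "my-project"
    subscription_id: "my-subscription"
```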
Pipelines are processed sequentially and can modify or drop messages before ingestion.
Memory-Constrained
Outstanding limits can be reduced to control memory consumption in constrained environments:
Limiting memory usage in resource-constrained deployments:
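A sketch of a low-footprint configuration, with illustrative values:

```yaml
- id: 5
  name: pubsub_small
  type: pubsub
  properties:
    project_id: "my-project"
    subscription_id: "my-subscription"
    workers: 2
    max_outstanding_messages: 100
    max_outstanding_bytes: 10000000 # ~10 MB ceiling on buffered payloads
```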