
Amazon S3

Pull

Synopsis

The Amazon S3 device processes files from Amazon S3 buckets using SQS event notifications. This pull-type device consumes S3 event messages from an SQS queue, downloads the referenced objects from S3, and processes them through DataStream pipelines. It supports multiple file formats, including JSON, JSONL, Parquet, and compressed archives.

Schema

- id: <numeric>
  name: <string>
  description: <string>
  type: awss3
  tags: <string[]>
  pipelines: <pipeline[]>
  status: <boolean>
  properties:
    region: <string>
    access_key_id: <string>
    secret_access_key: <string>
    session_token: <string>
    queue_url: <string>
    role_arn: <string>
    timeout: <numeric>
    file_name_filter: <string>
    max_files_in_archive: <numeric>
    max_size_archive_bytes: <numeric>
    sqs_max_messages: <numeric>
    sqs_visibility_timeout: <numeric>
    sqs_wait_time_seconds: <numeric>

Configuration

Device

| Field | Required | Default | Description |
|---|---|---|---|
| id | Y | - | Unique numeric identifier |
| name | Y | - | Device name |
| description | N | - | Optional description of the device's purpose |
| type | Y | - | Device type identifier (must be awss3) |
| tags | N | - | Array of labels for categorization |
| pipelines | N | - | Array of preprocessing pipeline references |
| status | N | true | Boolean flag to enable/disable the device |

AWS Connection

| Field | Required | Default | Description |
|---|---|---|---|
| region | Y | - | AWS region where the S3 bucket and SQS queue are located |
| queue_url | Y | - | SQS queue URL that receives S3 event notifications |
| access_key_id | N | - | AWS access key ID for authentication |
| secret_access_key | N | - | AWS secret access key for authentication |
| session_token | N | - | AWS session token for temporary credentials |
| role_arn | N | - | AWS IAM role ARN to assume for cross-account access |

File Processing

| Field | Required | Default | Description |
|---|---|---|---|
| timeout | N | 10 | Polling interval in seconds between SQS queue checks (1-10) |
| file_name_filter | N | ".*" | Regular expression filter for S3 object keys |
| max_files_in_archive | N | 100 | Maximum number of files to process from an archive (0 = unlimited) |
| max_size_archive_bytes | N | 104857600 | Maximum total size of archive contents in bytes (0 = unlimited; the default is 100 MB) |

SQS Configuration

| Field | Required | Default | Description |
|---|---|---|---|
| sqs_max_messages | N | 1 | Maximum messages to receive per poll (1-10) |
| sqs_visibility_timeout | N | 30 | Message visibility timeout in seconds (0-43200) |
| sqs_wait_time_seconds | N | 20 | Long polling wait time in seconds (0-20) |

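The defaults and valid ranges listed in the File Processing and SQS Configuration tables can be expressed as a small validator. This is a minimal sketch, assuming properties arrive as a plain Python dictionary; `apply_defaults`, `DEFAULTS`, and `RANGES` are hypothetical names, not part of DataStream.

```python
from typing import Any, Dict

# Defaults copied from the File Processing and SQS Configuration tables.
DEFAULTS: Dict[str, Any] = {
    "timeout": 10,
    "file_name_filter": ".*",
    "max_files_in_archive": 100,
    "max_size_archive_bytes": 104_857_600,  # 100 * 1024 * 1024
    "sqs_max_messages": 1,
    "sqs_visibility_timeout": 30,
    "sqs_wait_time_seconds": 20,
}

# Inclusive (low, high) ranges; None means no documented upper bound.
RANGES = {
    "timeout": (1, 10),
    "sqs_max_messages": (1, 10),
    "sqs_visibility_timeout": (0, 43_200),
    "sqs_wait_time_seconds": (0, 20),
    "max_files_in_archive": (0, None),    # 0 = unlimited
    "max_size_archive_bytes": (0, None),  # 0 = unlimited
}

REQUIRED = ("region", "queue_url")


def apply_defaults(properties: Dict[str, Any]) -> Dict[str, Any]:
    """Hypothetical helper: fill in defaults and reject out-of-range values."""
    missing = [k for k in REQUIRED if not properties.get(k)]
    if missing:
        raise ValueError(f"missing required properties: {missing}")
    merged = {**DEFAULTS, **properties}
    for key, (low, high) in RANGES.items():
        value = merged[key]
        if value < low or (high is not None and value > high):
            raise ValueError(f"{key}={value} outside [{low}, {high}]")
    return merged
```

The High-Volume Processing example (sqs_max_messages: 10, sqs_visibility_timeout: 300) passes this check, while sqs_max_messages: 11 would be rejected.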
Authentication Methods

The device supports multiple authentication methods:

  • IAM Role: Omit credentials to use the instance or task IAM role (recommended for EC2/ECS)
  • Access Keys: Provide access_key_id and secret_access_key for static credentials
  • Role Assumption: Provide role_arn to assume a different IAM role, for example one in another AWS account
  • Temporary Credentials: Provide session_token together with access_key_id and secret_access_key when using temporary security credentials

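The first, second, and fourth options above map naturally onto the keyword arguments of `boto3.Session`. The sketch below shows one such mapping; `session_kwargs` is a hypothetical helper, and it is an assumption that the device builds its AWS session this way. When no keys are given, only `region_name` is set, so boto3 falls back to its default credential chain, which includes EC2 instance profiles and ECS task roles. Role assumption is handled separately through STS.

```python
from typing import Any, Dict, Optional


def session_kwargs(props: Dict[str, Any]) -> Dict[str, Optional[str]]:
    """Hypothetical helper: map device properties to boto3.Session kwargs."""
    kwargs: Dict[str, Optional[str]] = {"region_name": props["region"]}
    key_id = props.get("access_key_id")
    secret = props.get("secret_access_key")
    if key_id or secret:
        if not (key_id and secret):
            raise ValueError("access_key_id and secret_access_key must be set together")
        kwargs["aws_access_key_id"] = key_id
        kwargs["aws_secret_access_key"] = secret
        # A session token only accompanies temporary access keys.
        if props.get("session_token"):
            kwargs["aws_session_token"] = props["session_token"]
    elif props.get("session_token"):
        raise ValueError("session_token requires access_key_id and secret_access_key")
    # No keys at all: leave credentials unset so boto3 uses its default chain.
    return kwargs
```

Usage would look like `boto3.Session(**session_kwargs(props))`, with the resulting session used to create the SQS and S3 clients.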
Secrets Management

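Avoid hardcoding access_key_id and secret_access_key in plain text. Prefer IAM roles, or reference secrets through the mechanisms DataStream supports (environment variables, vault integrations) instead of inline values. Rotate credentials regularly and restrict permissions to least privilege; given the operations described below, the device needs at least permission to receive and delete messages on the queue and to read objects from the bucket.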

Details

The Amazon S3 device operates as an event-driven pull-type data source that processes S3 objects based on SQS notifications. The device continuously polls an SQS queue for S3 event messages, downloads the referenced objects, and processes their contents through the telemetry pipeline.

Event Processing Flow: The device receives S3 event notifications from SQS containing bucket name and object key information. For each ObjectCreated event (Put, Post, Copy, CompleteMultipartUpload), the device downloads the S3 object and processes it according to its file type. After successful processing, the SQS message is deleted to prevent reprocessing.
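The extraction step described above can be sketched as follows. This assumes S3 delivers notifications directly to SQS (not wrapped in an SNS envelope), so each message body is the standard S3 event JSON with a `Records` array; `objects_from_message` is a hypothetical name. Object keys in S3 notifications are URL-encoded, with spaces sent as `+`, so the sketch decodes them before use.

```python
import json
from typing import List, Tuple
from urllib.parse import unquote_plus

# ObjectCreated event types the device is described as processing.
OBJECT_CREATED = {
    "ObjectCreated:Put",
    "ObjectCreated:Post",
    "ObjectCreated:Copy",
    "ObjectCreated:CompleteMultipartUpload",
}


def objects_from_message(body: str) -> List[Tuple[str, str]]:
    """Return (bucket, key) pairs for ObjectCreated records in an SQS message body."""
    event = json.loads(body)
    pairs = []
    # S3's configuration test event carries no Records, so it yields nothing.
    for record in event.get("Records", []):
        if record.get("eventName") not in OBJECT_CREATED:
            continue
        s3 = record["s3"]
        key = unquote_plus(s3["object"]["key"])  # undo URL encoding
        pairs.append((s3["bucket"]["name"], key))
    return pairs
```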

File Format Detection: The device detects the file format from the file extension. Supported formats are .json (a single JSON object), .jsonl (newline-delimited JSON), .parquet (columnar format), compressed files (.gz, .bz2), and archives (.zip, .tar). For .log and .txt files, the device auto-detects the content format by examining the first 8 KB of data.
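A minimal sketch of extension-based detection with an 8 KB sniff for .log and .txt files. The format labels and the sniffing heuristic (every complete non-empty line parses as a JSON object means JSONL) are assumptions for illustration; the section does not specify how DataStream classifies sniffed content.

```python
import json
import os

SNIFF_BYTES = 8 * 1024  # only the first 8 KB are examined

# Hypothetical format labels keyed by extension.
EXTENSION_FORMATS = {
    ".json": "json",
    ".jsonl": "jsonl",
    ".parquet": "parquet",
    ".gz": "gzip",
    ".bz2": "bzip2",
    ".zip": "zip",
    ".tar": "tar",
}


def detect_format(key: str, head: bytes = b"") -> str:
    """Detect by extension first; fall back to content sniffing for .log/.txt."""
    ext = os.path.splitext(key)[1].lower()  # "app.log.gz" -> ".gz"
    if ext in EXTENSION_FORMATS:
        return EXTENSION_FORMATS[ext]
    if ext in (".log", ".txt"):
        return sniff(head[:SNIFF_BYTES])
    return "unsupported"


def sniff(sample: bytes) -> str:
    """Assumed heuristic: JSONL if every complete line is a JSON object."""
    text = sample.decode("utf-8", errors="replace")
    lines = [ln for ln in text.splitlines() if ln.strip()]
    if len(sample) >= SNIFF_BYTES and len(lines) > 1:
        lines = lines[:-1]  # the last line may be cut off at the 8 KB boundary
    if not lines:
        return "text"
    for ln in lines:
        try:
            if not isinstance(json.loads(ln), dict):
                return "text"
        except ValueError:  # json.JSONDecodeError subclasses ValueError
            return "text"
    return "jsonl"
```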

Archive Processing: Compressed archives are automatically extracted and processed. The device supports nested archives and applies file name filtering to extracted contents. Archive processing is controlled by max_files_in_archive and max_size_archive_bytes limits to prevent resource exhaustion from maliciously large archives.
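The two limits can be enforced before any member is read. The sketch below handles ZIP archives only and assumes that the file name filter is applied to member names, that only matching members count toward the limits, and that exceeding either limit rejects the whole archive (consistent with archive size limits being treated as permanent errors). `iter_zip_members` and `ArchiveLimitError` are hypothetical names; nested archives are not handled here.

```python
import io
import re
import zipfile
from typing import Iterator, Tuple


class ArchiveLimitError(Exception):
    """Raised when an archive exceeds max_files_in_archive or max_size_archive_bytes."""


def iter_zip_members(data: bytes, name_filter: str = ".*",
                     max_files: int = 100,
                     max_bytes: int = 104_857_600) -> Iterator[Tuple[str, bytes]]:
    """Yield (name, content) for matching members, enforcing limits (0 = unlimited)."""
    pattern = re.compile(name_filter)
    with zipfile.ZipFile(io.BytesIO(data)) as zf:
        members = [i for i in zf.infolist()
                   if not i.is_dir() and pattern.search(i.filename)]
        if max_files and len(members) > max_files:
            raise ArchiveLimitError(f"{len(members)} files > {max_files}")
        # file_size is the uncompressed size recorded in the archive metadata.
        total = sum(i.file_size for i in members)
        if max_bytes and total > max_bytes:
            raise ArchiveLimitError(f"{total} bytes > {max_bytes}")
        for info in members:
            yield info.filename, zf.read(info)
```

Checking declared sizes up front lets a decompression bomb be rejected without inflating it; a hardened implementation would also count the bytes actually produced while reading.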

Error Handling: The device distinguishes between transient and permanent errors. Transient errors (network issues, throttling) leave the SQS message in the queue for automatic retry. Permanent errors (unsupported format, file name filter mismatch, archive size limits) delete the message to prevent infinite retry loops.
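The delete-or-retry decision can be sketched as a small wrapper around the processing step. `PermanentError`, `TransientError`, `Action`, and `settle` are hypothetical names; treating unrecognized exceptions as transient is also an assumption, chosen so that unknown failures are retried rather than silently dropped.

```python
from enum import Enum


class Action(Enum):
    DELETE = "delete"  # remove the message from the queue
    RETRY = "retry"    # leave it; it reappears after the visibility timeout


class PermanentError(Exception):
    """Hypothetical: unsupported format, file name filter mismatch, archive limits."""


class TransientError(Exception):
    """Hypothetical: network failures, throttling."""


def settle(process, message) -> Action:
    """Run process(message) and decide what happens to the SQS message."""
    try:
        process(message)
    except PermanentError:
        return Action.DELETE  # retrying cannot succeed; avoid an infinite loop
    except TransientError:
        return Action.RETRY
    except Exception:
        return Action.RETRY   # assumption: unknown errors are treated as transient
    return Action.DELETE      # success: delete to prevent reprocessing
```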

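SQS Integration: The device uses SQS long polling to wait efficiently for new messages. Setting sqs_wait_time_seconds above 0 enables long polling, which reduces empty responses, API calls, and cost. The visibility timeout hides a received message from other consumers while it is being processed; if processing fails and the message is not deleted, it becomes visible again after the timeout expires and can be retried.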
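One receive-and-delete cycle can be sketched against the boto3 SQS client methods `receive_message` and `delete_message`. `poll_once` and the `handle` callback are hypothetical; the sketch assumes `handle` returns True when the message should be deleted (success or permanent error) and False when it should be retried.

```python
from typing import Any, Callable, Dict


def poll_once(sqs, props: Dict[str, Any], handle: Callable[[str], bool]) -> int:
    """Receive one batch with long polling; delete handled messages.

    `sqs` is a boto3 SQS client or any object with the same two methods.
    Returns the number of messages deleted.
    """
    resp = sqs.receive_message(
        QueueUrl=props["queue_url"],
        MaxNumberOfMessages=props.get("sqs_max_messages", 1),
        VisibilityTimeout=props.get("sqs_visibility_timeout", 30),
        WaitTimeSeconds=props.get("sqs_wait_time_seconds", 20),  # > 0 = long polling
    )
    deleted = 0
    for msg in resp.get("Messages", []):  # key is absent when the queue is empty
        if handle(msg["Body"]):
            sqs.delete_message(QueueUrl=props["queue_url"],
                               ReceiptHandle=msg["ReceiptHandle"])
            deleted += 1
        # Otherwise the message becomes visible again when VisibilityTimeout expires.
    return deleted
```

A caller might create the client with `boto3.client("sqs", region_name=props["region"])` and call `poll_once` in a loop, pausing for `timeout` seconds between calls; how DataStream combines `timeout` with long polling is not specified here.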

Examples

Basic IAM Role Authentication

Configuring an Amazon S3 device to authenticate with an IAM role (recommended for EC2/ECS deployments)...

- id: 1
  name: s3-cloudtrail-logs
  type: awss3
  properties:
    region: "us-east-1"
    queue_url: "https://sqs.us-east-1.amazonaws.com/123456789012/cloudtrail-events"

Device polls SQS queue for S3 events and processes CloudTrail log files using instance IAM role...

{
  "eventVersion": "1.08",
  "userIdentity": {
    "type": "IAMUser",
    "userName": "admin"
  },
  "eventTime": "2024-01-15T10:30:00Z",
  "eventSource": "s3.amazonaws.com",
  "source_file": "AWSLogs/123456789012/CloudTrail/us-east-1/2024/01/15/log.json"
}

Access Key Authentication

Using static AWS access keys for authentication with file name filtering...

- id: 2
  name: s3-application-logs
  type: awss3
  properties:
    region: "us-west-2"
    access_key_id: "AKIAIOSFODNN7EXAMPLE"
    secret_access_key: "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY"
    queue_url: "https://sqs.us-west-2.amazonaws.com/123456789012/app-logs"
    file_name_filter: "prod/.*\\.jsonl$"

Device processes only JSONL files from the 'prod/' prefix, filtering out other files...

{
  "timestamp": "2024-01-15T10:30:00Z",
  "level": "ERROR",
  "message": "Database connection timeout",
  "source_file": "prod/app-errors-2024-01-15.jsonl"
}
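In a YAML double-quoted string the backslash must itself be escaped, so the value "prod/.*\\.jsonl$" in the access key example reaches the regular expression engine as prod/.*\.jsonl$. The sketch below checks which keys that pattern accepts. Whether the device anchors the match at the start of the key is not stated; the sketch assumes unanchored search semantics (`re.search`), under which a key such as archive/prod/app.jsonl also matches, and a leading ^ would restrict the filter to the prod/ prefix. `accepted` is a hypothetical name.

```python
import re

# The YAML value "prod/.*\\.jsonl$" decodes to this pattern.
PATTERN = re.compile(r"prod/.*\.jsonl$")


def accepted(key: str) -> bool:
    """Assumption: the filter uses search semantics (start not anchored)."""
    return PATTERN.search(key) is not None
```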

Cross-Account Role Assumption

Assuming an IAM role in a different AWS account for cross-account S3 access...

- id: 3
  name: s3-partner-data
  type: awss3
  properties:
    region: "eu-west-1"
    role_arn: "arn:aws:iam::987654321098:role/PartnerDataAccess"
    queue_url: "https://sqs.eu-west-1.amazonaws.com/987654321098/partner-events"
    timeout: 5

Device assumes the specified role to access S3 buckets in the partner account...

{
  "partner_id": "partner-123",
  "event_type": "transaction",
  "amount": 1500.00,
  "timestamp": "2024-01-15T10:30:00Z",
  "source_file": "partner-data/transactions-2024-01-15.json"
}
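Role assumption for the cross-account example can be sketched with the STS `assume_role` call, which returns temporary credentials under the `Credentials` key. `assume_role_credentials` and the session name default are hypothetical; it is an assumption that the device obtains credentials this way. The target role's trust policy must allow the calling identity to assume it.

```python
from typing import Any, Dict


def assume_role_credentials(sts, role_arn: str,
                            session_name: str = "datastream-awss3") -> Dict[str, Any]:
    """Call STS AssumeRole and return kwargs usable with boto3.Session.

    `sts` is a boto3 STS client or any object with the same method;
    the default session name is illustrative only.
    """
    resp = sts.assume_role(RoleArn=role_arn, RoleSessionName=session_name)
    creds = resp["Credentials"]  # also carries an Expiration timestamp
    return {
        "aws_access_key_id": creds["AccessKeyId"],
        "aws_secret_access_key": creds["SecretAccessKey"],
        "aws_session_token": creds["SessionToken"],
    }
```

Assumed-role credentials expire (one hour by default), so a long-running consumer has to request new ones before the returned Expiration.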

High-Volume Processing

Optimizing for high-volume S3 data processing with batch message retrieval...

- id: 4
  name: s3-high-volume
  type: awss3
  properties:
    region: "us-east-1"
    queue_url: "https://sqs.us-east-1.amazonaws.com/123456789012/bulk-data"
    timeout: 1
    sqs_max_messages: 10
    sqs_visibility_timeout: 300
    sqs_wait_time_seconds: 20

Device retrieves up to 10 messages per poll with 5-minute visibility timeout for efficient high-volume processing...

{
  "record_id": "rec_001",
  "metric_value": 42.5,
  "timestamp": "2024-01-15T10:30:00Z",
  "processing_info": {
    "batch_size": 10,
    "poll_interval": 1
  }
}
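As a rough sizing check for the high-volume example, assume the 10 messages in a batch are processed one after another (an assumption; the section does not describe the device's concurrency). Each message's visibility timeout starts when the batch is received, so the whole batch must finish within 300 seconds, leaving each message about:

```latex
\frac{\texttt{sqs\_visibility\_timeout}}{\texttt{sqs\_max\_messages}}
  = \frac{300\ \text{s}}{10}
  = 30\ \text{s per message}
```

If objects routinely take longer than this, messages still being processed become visible again and may be delivered twice; raising sqs_visibility_timeout (up to 43200) or lowering sqs_max_messages restores the margin.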

Parquet File Processing

Processing Parquet files from S3 for analytics data ingestion...

- id: 5
  name: s3-analytics-parquet
  type: awss3
  properties:
    region: "us-east-1"
    queue_url: "https://sqs.us-east-1.amazonaws.com/123456789012/analytics-events"
    file_name_filter: ".*\\.parquet$"

Device processes only Parquet files using columnar format for efficient large-dataset handling...

{
  "user_id": "user_12345",
  "page_views": 42,
  "session_duration": 1800,
  "timestamp": "2024-01-15T10:30:00Z",
  "source_file": "analytics/user-behavior-2024-01-15.parquet"
}

Archive Processing with Limits

Processing compressed archives with safety limits to prevent resource exhaustion...

- id: 6
  name: s3-compressed-logs
  type: awss3
  properties:
    region: "us-east-1"
    queue_url: "https://sqs.us-east-1.amazonaws.com/123456789012/log-archives"
    file_name_filter: ".*\\.(gz|zip)$"
    max_files_in_archive: 50
    max_size_archive_bytes: 52428800

Device extracts and processes compressed archives, limiting to 50 files and 50 MB total size...

{
  "log_entry": "Application started successfully",
  "timestamp": "2024-01-15T10:30:00Z",
  "source_file": "logs/app-2024-01-15.log.gz",
  "archive_info": {
    "total_files": 12,
    "total_size_bytes": 8388608
  }
}

Pipeline Integration

Integrating S3 device with preprocessing pipelines for data transformation...

- id: 7
  name: s3-with-pipeline
  type: awss3
  tags:
    - "aws"
    - "s3"
  pipelines:
    - timestamp-normalization
    - field-enrichment
  properties:
    region: "us-east-1"
    queue_url: "https://sqs.us-east-1.amazonaws.com/123456789012/raw-data"

S3 data is processed through normalization and enrichment pipelines before routing to targets...

{
  "timestamp": "2024-01-15T10:30:00.000Z",
  "event_type": "user_login",
  "user_id": "user_456",
  "enriched_data": {
    "normalized_timestamp": "2024-01-15T10:30:00Z",
    "event_category": "authentication",
    "source_bucket": "raw-data"
  }
}