Pattern: SQS Event Pipeline

Route EventBridge events through an SQS queue into a Lambda handler that gets typed, validated records — with automatic retry on failure.

The Pattern

typescript
// mantle.config.ts — declare the queue and EventBridge routing
export default defineConfig({
  eventbridge: {
    bus: 'MediaDownloader',
    sqsTargets: [
      {
        detailType: 'DownloadRequested',
        queue: 'DownloadQueue',
        inputTransformer: {
          inputPaths: {
            fileId: '$.detail.fileId',
            sourceUrl: '$.detail.sourceUrl',
            correlationId: '$.detail.correlationId',
          },
          inputTemplate: `{"fileId": <fileId>, "sourceUrl": <sourceUrl>, "correlationId": <correlationId>}`
        }
      }
    ]
  },
  queues: [
    {name: 'DownloadQueue', visibilityTimeoutSeconds: 900}
  ],
})

// src/lambdas/sqs/StartFileUpload/index.ts — consume the queue
import { defineSqsHandler } from '@mantleframework/core'
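import { validateSchema } from '@mantleframework/validation'
// downloadQueueMessageSchema, ValidatedDownloadQueueMessage, logError, and processDownloadRequest
// are defined elsewhere in the project or framework; their imports are omitted from this excerpt.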

const sqs = defineSqsHandler({
  operationName: 'StartFileUpload',
  parseBody: true,
  timeout: 900,
  memorySize: 2048,
  queue: 'DownloadQueue',
})

export const handler = sqs(async (record) => {
  const receiveCount = parseInt(record.attributes?.ApproximateReceiveCount ?? '1', 10)

  const validationResult = validateSchema(downloadQueueMessageSchema, record.body)
  if (!validationResult.success) {
    // Discard malformed messages — they will never succeed on retry
    logError('Invalid SQS message format - discarding', { messageId: record.messageId })
    return
  }

  const message = validationResult.data as ValidatedDownloadQueueMessage
  // If this throws, SQS retries the record; returning normally removes it from the queue
  await processDownloadRequest(message, receiveCount)
})

How It Works

EventBridge matches events by detailType and delivers them to an SQS queue via a target rule. The inputTransformer reshapes the event payload before it lands in the queue, so the Lambda receives a clean JSON body rather than the raw EventBridge envelope.
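To make the reshaping step concrete, here is a minimal local simulation of what the transformer in the config above does to an event. It is an illustration only, not the AWS implementation: `resolvePath` and `applyTransformer` are hypothetical helpers, only simple dotted paths are handled, missing values are rendered as `null` as a simplification, and the sample event's `source` value is made up.

```typescript
// Illustrative simulation of an EventBridge input transformer (not AWS code).
type InputTransformer = {
  inputPaths: Record<string, string>
  inputTemplate: string
}

// Resolve a simple "$.a.b.c" path; returns undefined if any segment is missing.
function resolvePath(event: unknown, path: string): unknown {
  const segments = path.replace(/^\$\.?/, '').split('.').filter((s) => s.length > 0)
  let current: unknown = event
  for (const segment of segments) {
    if (typeof current !== 'object' || current === null) return undefined
    current = (current as Record<string, unknown>)[segment]
  }
  return current
}

// Replace each <name> placeholder with the JSON encoding of the extracted value.
function applyTransformer(event: unknown, transformer: InputTransformer): string {
  return transformer.inputTemplate.replace(/<(\w+)>/g, (placeholder: string, name: string) => {
    const path = transformer.inputPaths[name]
    if (path === undefined) return placeholder // unknown placeholder: leave untouched
    return JSON.stringify(resolvePath(event, path) ?? null)
  })
}

const sampleEvent = {
  'detail-type': 'DownloadRequested',
  source: 'example.api', // hypothetical source name
  detail: { fileId: 'abc123', sourceUrl: 'https://example.com/video', correlationId: 'corr-1' },
}

const body = applyTransformer(sampleEvent, {
  inputPaths: {
    fileId: '$.detail.fileId',
    sourceUrl: '$.detail.sourceUrl',
    correlationId: '$.detail.correlationId',
  },
  inputTemplate: '{"fileId": <fileId>, "sourceUrl": <sourceUrl>, "correlationId": <correlationId>}',
})

console.log(JSON.parse(body))
```

The resulting body contains only the three extracted fields, which is why the handler can validate it against a small schema instead of unpacking the full event envelope.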

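defineSqsHandler wires the Lambda to the queue and passes records to the handler one at a time: the callback runs once per record, not once per batch. Returning normally removes the record from the queue. Throwing leaves the record in the queue, and SQS redelivers it once its visibility timeout (visibilityTimeoutSeconds) expires. Redelivery continues until the queue's maxReceiveCount is reached (when a redrive policy with a dead-letter queue is configured) or until the message's retention period ends.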
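One plausible way a wrapper could provide these per-record semantics is Lambda's partial batch response, where the function returns the IDs of failed records in `batchItemFailures` and only those are retried. The sketch below shows that mechanism with local stand-in types. Whether defineSqsHandler works this way internally is an assumption, and `processBatch` is a hypothetical name.

```typescript
// Minimal local types mirroring the parts of the Lambda SQS event used here.
type SqsRecord = { messageId: string; body: string }
type SqsBatchResponse = { batchItemFailures: { itemIdentifier: string }[] }

// Hypothetical wrapper: runs the handler once per record and collects failures.
async function processBatch(
  records: SqsRecord[],
  handleRecord: (record: SqsRecord) => Promise<void>,
): Promise<SqsBatchResponse> {
  const batchItemFailures: { itemIdentifier: string }[] = []
  for (const record of records) {
    try {
      await handleRecord(record) // returned normally: record counts as processed
    } catch {
      // threw: report the record so only this one becomes visible again
      batchItemFailures.push({ itemIdentifier: record.messageId })
    }
  }
  return { batchItemFailures }
}

// Usage: the second record fails and is the only one reported for retry.
const demo = processBatch(
  [
    { messageId: 'm1', body: 'ok' },
    { messageId: 'm2', body: 'fail' },
  ],
  async (record) => {
    if (record.body === 'fail') throw new Error('transient failure')
  },
)
```

With this approach one failing record does not force the whole batch to be redelivered. It relies on the event source mapping having partial batch responses enabled.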

Malformed messages that fail schema validation are discarded immediately (return, not throw) because they will never succeed no matter how many times SQS retries them.
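The handler branches on a result object with `success` and `data` fields. The hand-written validator below sketches that result shape for the three fields produced by the inputTemplate. It is a stand-in for illustration, not the `validateSchema` implementation: `validateDownloadMessage`, the `errors` field, and the non-empty-string rule are all assumptions.

```typescript
type DownloadQueueMessage = { fileId: string; sourceUrl: string; correlationId: string }
type ValidationResult<T> = { success: true; data: T } | { success: false; errors: string[] }

// Hypothetical validator: every field must be a non-empty string.
function validateDownloadMessage(body: unknown): ValidationResult<DownloadQueueMessage> {
  if (typeof body !== 'object' || body === null) {
    return { success: false, errors: ['body is not an object'] }
  }
  const candidate = body as Record<string, unknown>
  const errors: string[] = []
  for (const field of ['fileId', 'sourceUrl', 'correlationId']) {
    const value = candidate[field]
    if (typeof value !== 'string' || value.length === 0) {
      errors.push(`${field} must be a non-empty string`)
    }
  }
  if (errors.length > 0) return { success: false, errors }
  return {
    success: true,
    data: {
      fileId: candidate['fileId'] as string,
      sourceUrl: candidate['sourceUrl'] as string,
      correlationId: candidate['correlationId'] as string,
    },
  }
}

const good = validateDownloadMessage({ fileId: 'abc123', sourceUrl: 'https://example.com/v', correlationId: 'c1' })
const bad = validateDownloadMessage({ fileId: 1 })
```

A failed result here depends only on the message contents, so redelivering the same message would fail the same way. That is the reason to return rather than throw.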

Real-World Usage

Source: aws-cloudformation-media-downloader/src/lambdas/sqs/StartFileUpload/index.ts

The media-downloader uses this pattern to decouple the API layer (which emits DownloadRequested events) from the download worker (which streams video to S3). The queue buffers requests, so a download can take up to Lambda's 15-minute execution limit without blocking the API response.

A second SQS consumer — SendPushNotification — uses the same pattern with parseBody: false and reads metadata from record.messageAttributes instead:

typescript
// src/lambdas/sqs/SendPushNotification/index.ts
const sqs = defineSqsHandler<string>({
  operationName: 'SendPushNotification',
  parseBody: false,
  queue: 'SendPushNotification',
})

export const handler = sqs(async (record) => {
  const rawAttributes = {
    notificationType: record.messageAttributes.notificationType?.stringValue,
    userId: record.messageAttributes.userId?.stringValue,
  }
  // validate, then fan-out to all registered devices
})
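The "validate" step for attribute-based messages can be sketched as a small helper that collects required string attributes and reports any that are missing. `readStringAttributes` is a hypothetical helper, the attribute type is a local stand-in for the shape Lambda puts on SQS records (`dataType`, `stringValue`), and the `'DownloadReady'` value is invented for the example.

```typescript
// Local stand-in for a message attribute as it appears on a Lambda SQS record.
type SqsMessageAttribute = { dataType: string; stringValue?: string }
type SqsMessageAttributes = Record<string, SqsMessageAttribute | undefined>

type AttributeResult<K extends string> =
  | { ok: true; values: Record<K, string> }
  | { ok: false; missing: K[] }

// Hypothetical helper: read the required attributes or list the ones that are absent.
function readStringAttributes<K extends string>(
  attributes: SqsMessageAttributes,
  required: readonly K[],
): AttributeResult<K> {
  const values = {} as Record<K, string>
  const missing: K[] = []
  for (const key of required) {
    const value = attributes[key]?.stringValue
    if (value === undefined || value === '') missing.push(key)
    else values[key] = value
  }
  return missing.length > 0 ? { ok: false, missing } : { ok: true, values }
}

// userId is absent, so the result lists it as missing.
const attrs = readStringAttributes(
  { notificationType: { dataType: 'String', stringValue: 'DownloadReady' } },
  ['notificationType', 'userId'] as const,
)
```

A message with missing attributes is malformed in the same sense as an invalid body, so the handler would log it and return rather than throw.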

Configuration

typescript
// mantle.config.ts
queues: [
  {name: 'DownloadQueue', visibilityTimeoutSeconds: 900},
  {name: 'SendPushNotification'},
]

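visibilityTimeoutSeconds must be at least as long as the Lambda timeout. Otherwise a message can become visible again while the first invocation is still running and be processed twice. AWS's Lambda guidance goes further and recommends a visibility timeout of at least six times the function timeout, to leave room for retries.

When a Lambda uses a non-conventional env var name for a queue (e.g., SNS_QUEUE_URL instead of SEND_PUSH_NOTIFICATION_QUEUE_URL), use bind in defineLambda to map the env var to the queue name: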

typescript
defineLambda({bind: {SNS_QUEUE_URL: 'SendPushNotification'}})
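The visibility-timeout rule described above can be checked mechanically before deploying. The following is a rough sketch of such a check over plain config objects. `findTimeoutMismatches` and both config types are hypothetical and not part of Mantle, and pairs where either value is unset are skipped because the framework defaults are not documented here.

```typescript
type QueueConfig = { name: string; visibilityTimeoutSeconds?: number }
type ConsumerConfig = { operationName: string; queue: string; timeout?: number }

// Hypothetical check: report consumers whose queue hides messages for less time
// than the handler is allowed to run.
function findTimeoutMismatches(queues: QueueConfig[], consumers: ConsumerConfig[]): string[] {
  const problems: string[] = []
  for (const consumer of consumers) {
    const queue = queues.find((q) => q.name === consumer.queue)
    if (queue === undefined) {
      problems.push(`${consumer.operationName}: queue "${consumer.queue}" is not declared`)
      continue
    }
    const visibility = queue.visibilityTimeoutSeconds
    const timeout = consumer.timeout
    if (visibility === undefined || timeout === undefined) continue // defaults unknown: skip
    if (visibility < timeout) {
      problems.push(
        `${consumer.operationName}: visibility timeout ${visibility}s is shorter than Lambda timeout ${timeout}s`,
      )
    }
  }
  return problems
}

// The configuration from this page passes the check.
const problems = findTimeoutMismatches(
  [{ name: 'DownloadQueue', visibilityTimeoutSeconds: 900 }, { name: 'SendPushNotification' }],
  [{ operationName: 'StartFileUpload', queue: 'DownloadQueue', timeout: 900 }],
)
console.log(problems.length === 0 ? 'timeouts are consistent' : problems.join('\n'))
```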

Variations

Body parsing off (parseBody: false): Use when the message body is not JSON or when all routing data lives in messageAttributes. The raw string body is available on record.body.

EventBridge → SQS (no direct trigger): The sqsTargets config in mantle.config.ts creates the EventBridge rule and target automatically. For queues that receive messages directly from application code (not EventBridge), omit sqsTargets and use sendMessage() from @mantleframework/aws.

Retry vs. discard: Throw to have SQS redeliver the record after the visibility timeout; return to remove it from the queue. Validate messages at the top of the handler and discard malformed ones immediately to avoid burning retries on unrecoverable input.
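The same decision can be applied to errors raised during processing, not only to validation failures. The sketch below separates permanent errors from transient ones and uses the receive count (the value the handler derives from ApproximateReceiveCount) to cap attempts. `PermanentError`, `decideOutcome`, and `maxAttempts` are hypothetical names for illustration, not part of Mantle.

```typescript
// Hypothetical marker for failures that will not succeed on retry.
class PermanentError extends Error {
  constructor(message: string) {
    super(message)
    this.name = 'PermanentError'
  }
}

type Outcome = 'retry' | 'discard'

// Decide whether the handler should throw (retry) or return (discard).
function decideOutcome(error: unknown, receiveCount: number, maxAttempts: number): Outcome {
  const permanent = error instanceof Error && error.name === 'PermanentError'
  if (permanent) return 'discard' // retrying cannot help
  if (receiveCount >= maxAttempts) return 'discard' // attempts exhausted
  return 'retry'
}

// Usage inside a handler body (processing step shown as a placeholder):
async function handleWithPolicy(work: () => Promise<void>, receiveCount: number): Promise<void> {
  try {
    await work()
  } catch (error) {
    if (decideOutcome(error, receiveCount, 3) === 'retry') throw error // SQS redelivers
    // otherwise fall through and return, which removes the message
  }
}
```

If the queue has a dead-letter queue configured, SQS already stops redelivering after maxReceiveCount, so an application-level cap like this is mainly useful when discarding should happen earlier or without a dead-letter queue.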