Message Queue
Overview
The Raindrop framework provides a robust message queue system that enables asynchronous communication between services and actors. Message queues allow you to decouple components, handle workload bursts, and build resilient distributed systems that can process tasks reliably over time.
Message queues in Raindrop support multiple content types, batch operations, and flexible delivery options including delayed processing. The queue system automatically handles message persistence, delivery guarantees, and retry mechanisms, allowing you to focus on your business logic rather than infrastructure concerns.
Whether you’re processing user uploads, sending notifications, or coordinating complex workflows, Raindrop’s message queues provide the reliability and scalability needed for production applications. The queue interface is designed to be intuitive while offering powerful features for message handling and error recovery.
Prerequisites
- Basic understanding of asynchronous programming concepts
- Raindrop framework installed in your project
- Familiarity with TypeScript and async/await patterns
- Understanding of message-driven architecture patterns
Configuration
To add a message queue to your Raindrop project, you need both a queue definition and at least one observer to consume messages:
- Define the queue and observer in your application manifest
- Run `raindrop build generate` to create the necessary files
```hcl
application "demo-app" {
  queue "notification-queue" {}

  observer "notification-processor" {
    source {
      queue      = "notification-queue"
      batch_size = 10
    }
  }
}
```
Once deployed, Raindrop automatically:
- Creates your queue instance with persistent storage
- Creates the observer to consume messages from the queue
- Sets up all required bindings and routing between services, queues, and observers
- Configures message delivery and retry mechanisms
Access
Message queues are made available to your services and actors through environment variables. The queue name from your manifest is converted to an uppercase environment variable with underscores replacing dashes.
For example, if your manifest defines a queue as `queue "notification-queue"`, you would access it in your code as `env.NOTIFICATION_QUEUE`.
```typescript
export default class extends Service<Env> {
  async fetch(request: Request, env: Env): Promise<Response> {
    // Send a notification message to the queue
    await env.NOTIFICATION_QUEUE.send({
      userId: 123,
      message: "Welcome to our platform!"
    });

    return new Response("Notification queued");
  }
}
```
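The naming convention can be sketched as a small helper. This is purely illustrative (the `queueEnvVarName` function is not part of the Raindrop API): Raindrop generates the `Env` bindings for you during `raindrop build generate`.

```typescript
// Illustrative only: mirrors the manifest-name → env-var convention
// described above (uppercase, dashes replaced with underscores).
function queueEnvVarName(manifestName: string): string {
  return manifestName.toUpperCase().replace(/-/g, "_");
}

// queueEnvVarName("notification-queue") → "NOTIFICATION_QUEUE"
```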
Core Interfaces
The message queue system provides these TypeScript interfaces for type-safe queue operations:
```typescript
/**
 * Represents a message queue that can send individual messages or batches
 */
interface Queue<Body = unknown> {
  /** Sends a single message to the queue */
  send(message: Body, options?: QueueSendOptions): Promise<void>;

  /** Sends multiple messages to the queue as a batch */
  sendBatch(messages: Iterable<MessageSendRequest<Body>>, options?: QueueSendBatchOptions): Promise<void>;
}

/**
 * Represents a message received from the queue
 */
interface Message<Body = unknown> {
  /** Unique identifier for the message */
  readonly id: string;

  /** Timestamp when the message was sent */
  readonly timestamp: Date;

  /** The message content */
  readonly body: Body;

  /** Number of processing attempts made for this message */
  readonly attempts: number;

  /** Retry processing this message */
  retry(options?: QueueRetryOptions): void;

  /** Acknowledge successful processing of this message */
  ack(): void;
}

/**
 * Represents a batch of messages received from the queue
 */
interface MessageBatch<Body = unknown> {
  /** Array of messages in this batch */
  readonly messages: readonly Message<Body>[];

  /** Name or identifier of the source queue */
  readonly queue: string;

  /** Retry processing all messages in the batch */
  retryAll(options?: QueueRetryOptions): void;

  /** Acknowledge successful processing of all messages in the batch */
  ackAll(): void;
}

/**
 * Represents a message to be sent to the queue
 */
interface MessageSendRequest<Body = unknown> {
  /** The message content */
  body: Body;

  /** The content type of the message */
  contentType?: 'text' | 'bytes' | 'json' | 'v8';

  /** Number of seconds to delay message processing */
  delaySeconds?: number;
}

/** Configuration options for sending a single message */
interface QueueSendOptions {
  /** The content type of the message */
  contentType?: 'text' | 'bytes' | 'json' | 'v8';

  /** Number of seconds to delay message processing */
  delaySeconds?: number;
}

/** Configuration options for sending a batch of messages */
interface QueueSendBatchOptions {
  /** Number of seconds to delay processing for all messages in the batch */
  delaySeconds?: number;
}
```
Core Concepts
Producer-Consumer Architecture
Raindrop queues follow a producer-consumer pattern with distinct roles:
Producers (Services and Actors) send messages to queues using the queue interface.
Consumers (Observers) are specialized components that consume messages from queues. Only observers can receive and process queue messages - services cannot directly consume from queues.
```typescript
export default class OrderService extends Service<Env> {
  async fetch(request: Request, env: Env): Promise<Response> {
    const orderData = await request.json();

    // Service sends message to queue
    await env.DEMO_QUEUE.send({
      userId: orderData.userId,
      type: "order_confirmation",
      orderId: orderData.id
    });

    return new Response("Order processed");
  }
}
```
```typescript
export default class PaymentProcessor extends Actor<Env> {
  async processPayment(amount: number, userId: string): Promise<void> {
    // Process payment logic here
    const paymentResult = await this.chargeCard(amount);

    // Actor sends message to queue
    await this.env.DEMO_QUEUE.send({
      type: "payment_processed",
      userId,
      amount,
      transactionId: paymentResult.id,
      timestamp: new Date().toISOString()
    });
  }
}
```
```typescript
export default class DemoProcessor extends Each<DemoMessage, Env> {
  async process(message: Message<DemoMessage>): Promise<void> {
    try {
      // Observer processes the message
      console.log(`Processing ${message.body.type} for user ${message.body.userId}`);

      // Your processing logic here
      await this.handleMessage(message.body);

      // Acknowledge successful processing
      message.ack();
    } catch (error) {
      // Retry on failure
      message.retry({ delaySeconds: 60 });
    }
  }

  private async handleMessage(data: DemoMessage): Promise<void> {
    // Your business logic here
  }
}
```
Sending Methods
The queue interface provides methods for sending messages individually or in batches.
send()
Sends a single message to the queue with optional configuration.
Send individual messages to a queue using the `send()` method. The queue automatically handles JSON serialization, so you can send objects directly:
```typescript
// Send a JSON object (automatic serialization)
await env.NOTIFICATION_QUEUE.send({
  userId: 123,
  type: "welcome",
  message: "Welcome to our platform!",
  timestamp: new Date().toISOString()
});

// Send with delay for scheduled processing
await env.PROCESSING_QUEUE.send({
  orderId: 456,
  action: "process_payment",
  amount: 99.99
}, {
  delaySeconds: 30
});

// Simple string messages work automatically
await env.LOG_QUEUE.send("User 123 completed onboarding");
```
sendBatch()
Sends multiple messages efficiently as a batch operation. Batch operations are more efficient than individual sends when processing multiple related messages, as they reduce network overhead and improve throughput.
```typescript
const messages = [
  { body: { userId: 123, action: "welcome" }, delaySeconds: 0 },
  { body: { userId: 124, action: "welcome" }, delaySeconds: 5 },
  { body: { userId: 125, action: "welcome" }, delaySeconds: 10 }
];

await env.NOTIFICATION_QUEUE.sendBatch(messages, {
  delaySeconds: 0 // Global delay for the entire batch
});
```
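Queue backends commonly cap how many messages a single batch call may carry (whether and where Raindrop enforces such a limit is an assumption here; check your deployment's limits). A generic chunking helper keeps each `sendBatch()` call bounded:

```typescript
// Illustrative helper, not part of the Raindrop API: split a large
// message list into slices of at most `size` before batching.
function chunk<T>(items: T[], size: number): T[][] {
  const out: T[][] = [];
  for (let i = 0; i < items.length; i += size) {
    out.push(items.slice(i, i + size));
  }
  return out;
}

// Usage sketch:
// for (const part of chunk(messages, 100)) {
//   await env.NOTIFICATION_QUEUE.sendBatch(part);
// }
```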
Message Methods
Individual messages received by observers provide methods for acknowledgment and retry handling.
ack()
Acknowledges successful processing of a message. This removes the message from the queue permanently.
```typescript
export default class extends Each<TaskData, Env> {
  async process(message: Message<TaskData>): Promise<void> {
    try {
      // Process the message
      await this.handleTask(message.body);

      // Acknowledge successful processing
      message.ack();
    } catch (error) {
      // Handle error without ack - message will be retried
      message.retry({ delaySeconds: 60 });
    }
  }
}
```
retry()
Retries processing of a message with optional delay configuration. The message will be redelivered after the specified delay.
```typescript
export default class extends Each<TaskData, Env> {
  async process(message: Message<TaskData>): Promise<void> {
    try {
      await this.processTask(message.body);
      message.ack();
    } catch (error) {
      // Retry with exponential backoff based on attempt count
      const delay = Math.min(60 * Math.pow(2, message.attempts), 3600);
      message.retry({ delaySeconds: delay });
    }
  }
}
```
Batch Methods
Message batches provide methods for acknowledging or retrying all messages in the batch at once.
ackAll()
Acknowledges successful processing of all messages in the batch. This removes all messages from the queue permanently.
```typescript
export default class extends Batch<OrderData, Env> {
  async process(batch: MessageBatch<OrderData>): Promise<void> {
    try {
      // Process all messages in the batch
      const orders = batch.messages.map(msg => msg.body);
      await this.processBulkOrders(orders);

      // Acknowledge all messages at once
      batch.ackAll();
    } catch (error) {
      // Retry entire batch
      batch.retryAll({ delaySeconds: 120 });
    }
  }
}
```
retryAll()
Retries processing of all messages in the batch with optional delay configuration. All messages will be redelivered after the specified delay.
```typescript
export default class extends Batch<OrderData, Env> {
  async process(batch: MessageBatch<OrderData>): Promise<void> {
    try {
      await this.processBulkOrders(batch.messages.map(msg => msg.body));
      batch.ackAll();
    } catch (error) {
      console.error(`Failed to process batch:`, error);

      // Retry entire batch with delay
      batch.retryAll({ delaySeconds: 120 });
    }
  }
}
```
Receiving Messages
Only observers can process queue messages. Observers implement either the `Each` pattern (processing one message at a time) or the `Batch` pattern (processing multiple messages together).
Observers are configured in the manifest with:
- `queue`: Which queue to consume from
- `batch_size`: Maximum number of messages to process together (1-100)
```hcl
observer "notification-processor" {
  source {
    queue      = "notification-queue"
    batch_size = 10  # Process up to 10 messages per batch
  }
}
```
Choose `batch_size` based on your processing needs: smaller batches (1-10) for low-latency processing, larger batches (50-100) for high-throughput scenarios.
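For example, a hypothetical manifest might pair a latency-sensitive observer with a throughput-oriented one (the queue and observer names here are illustrative):

```hcl
# Low-latency: single-message batches, processed almost immediately
observer "alert-processor" {
  source {
    queue      = "alert-queue"
    batch_size = 1
  }
}

# High-throughput: large batches amortize per-invocation overhead
observer "analytics-processor" {
  source {
    queue      = "analytics-queue"
    batch_size = 100
  }
}
```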
Each Observer Pattern
Use the `Each` pattern for processing individual messages. Each message is processed separately and can be acknowledged or retried independently.
```typescript
export default class NotificationProcessor extends Each<NotificationData, Env> {
  async process(message: Message<NotificationData>): Promise<void> {
    try {
      // Process the individual message
      await this.sendEmail(message.body.userId, message.body.content);

      // Acknowledge successful processing
      message.ack();

      console.log(`Processed message ${message.id} (attempt ${message.attempts})`);
    } catch (error) {
      console.error(`Failed to process message ${message.id}:`, error);

      // Retry with delay if processing fails
      message.retry({ delaySeconds: 60 });
    }
  }
}
```
Batch Observer Pattern
Use the `Batch` pattern for processing multiple messages together. When using the `Batch` pattern, make sure to import both `Batch` and `MessageBatch` from the framework.
```typescript
import { Batch, MessageBatch } from '@liquidmetal-ai/raindrop-framework';

export default class OrderProcessor extends Batch<OrderData, Env> {
  async process(batch: MessageBatch<OrderData>): Promise<void> {
    try {
      console.log(`Processing batch of ${batch.messages.length} messages from queue: ${batch.queue}`);

      // Process all messages in the batch together
      const orders = batch.messages.map(msg => msg.body);
      await this.processBulkOrders(orders);

      // Acknowledge all messages at once
      batch.ackAll();

      console.log(`Processed batch of ${batch.messages.length} orders`);
    } catch (error) {
      console.error(`Failed to process batch:`, error);

      // Retry entire batch with delay
      batch.retryAll({ delaySeconds: 120 });
    }
  }

  private async processBulkOrders(orders: OrderData[]): Promise<void> {
    // Your bulk processing logic here
  }
}
```
Message Metadata
Every message includes metadata for monitoring and debugging:
```typescript
export default class extends Each<TaskData, Env> {
  async process(message: Message<TaskData>): Promise<void> {
    console.log(`Processing message:`, {
      id: message.id,               // Unique message identifier
      timestamp: message.timestamp, // When message was sent
      attempts: message.attempts,   // Number of processing attempts
      body: message.body            // The message content
    });

    // Handle messages that have been retried multiple times
    if (message.attempts > 3) {
      console.warn(`Message ${message.id} has failed ${message.attempts} times`);
      // Consider dead letter queue or alert handling
    }

    message.ack();
  }
}
```
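One common way to act on a high `attempts` count is to cap retries and route exhausted messages elsewhere instead of retrying forever. A minimal, framework-free sketch of that decision logic (the `MAX_ATTEMPTS` threshold and the `routeFailedMessage` helper are assumptions for illustration, not Raindrop APIs):

```typescript
// Hypothetical dead-letter routing policy: retry with exponential
// backoff up to MAX_ATTEMPTS, then hand the message off (e.g. to a
// separate dead-letter queue) and acknowledge it.
type FailureAction =
  | { kind: "retry"; delaySeconds: number }
  | { kind: "dead-letter" };

const MAX_ATTEMPTS = 3; // assumed threshold; tune for your workload

function routeFailedMessage(attempts: number): FailureAction {
  if (attempts >= MAX_ATTEMPTS) {
    return { kind: "dead-letter" };
  }
  // 60s, 120s, 240s, ... capped at one hour
  return { kind: "retry", delaySeconds: Math.min(60 * 2 ** attempts, 3600) };
}
```

In an observer's catch block you would call this helper, then either `message.retry({ delaySeconds })` or send `message.body` to a dedicated queue and `message.ack()`.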
Message Types and Content
Raindrop queues use JSON serialization by default, which handles most use cases reliably and efficiently. The queue system automatically serializes and deserializes JavaScript objects, arrays, strings, numbers, and other JSON-compatible data types.
Recommended approach: Send JavaScript objects directly to the queue without specifying a content type. The framework will handle serialization automatically.
```typescript
// Recommended: Simple JSON objects (default behavior)
await env.NOTIFICATION_QUEUE.send({
  userId: 123,
  type: "welcome",
  message: "Welcome to our platform!",
  timestamp: new Date().toISOString()
});

// Recommended: Arrays and nested objects work seamlessly
await env.DATA_QUEUE.send({
  users: [{ id: 1, name: "Alice" }, { id: 2, name: "Bob" }],
  metadata: { source: "web", timestamp: new Date().toISOString() }
});
```
For specialized use cases, queues support additional content types:
```typescript
// JSON content (the default)
await env.DATA_QUEUE.send({
  timestamp: new Date().toISOString(),
  userId: 123,
  action: "user_signup",
  metadata: { source: "web", campaign: "summer2024" }
});

// Plain text content
await env.LOG_QUEUE.send(
  "User 123 completed onboarding at " + new Date().toISOString(),
  { contentType: "text" }
);

// Binary content
const fileData = new Uint8Array([/* binary data */]);
await env.FILE_QUEUE.send(fileData, {
  contentType: "bytes"
});

// V8 serialization for complex JavaScript types
const complexObject = {
  date: new Date(),
  map: new Map([["key", "value"]]),
  set: new Set([1, 2, 3])
};
await env.COMPLEX_QUEUE.send(complexObject, {
  contentType: "v8"
});
```
Delivery Options
Queues support flexible delivery configurations:
- Immediate delivery: Messages are processed as soon as possible (default behavior, no configuration required)
- Delayed delivery: Configured using `delaySeconds` in the options when calling `send()` or `sendBatch()`
- Batch processing: Configured in the observer manifest using the `batch_size` setting