Message Queue
The Raindrop framework provides a robust message queue system that enables asynchronous communication between services and actors. Message queues allow you to decouple components, handle workload bursts, and build resilient distributed systems that can process tasks reliably over time.
Message queues in Raindrop support multiple content types, batch operations, and flexible delivery options including delayed processing. The queue system automatically handles message persistence, delivery guarantees, and retry mechanisms, allowing you to focus on your business logic rather than infrastructure concerns.
Whether you’re processing user uploads, sending notifications, or coordinating complex workflows, Raindrop’s message queues provide the reliability and scalability needed for production applications. The queue interface is designed to be intuitive while offering powerful features for message handling and error recovery.
Prerequisites
- Basic understanding of asynchronous programming concepts
- Raindrop framework installed in your project
- Familiarity with TypeScript and async/await patterns
- Understanding of message-driven architecture patterns
Creating a New Queue
To add a message queue to your Raindrop project, you need both a queue definition and at least one observer to consume messages:
- Define the queue and observer in your application manifest
- Run
raindrop build generate
to create the necessary files
application "demo-app" { queue "notification-queue" {}
observer "notification-processor" { source { queue = "notification-queue" batch_size = 10 } }}
Once deployed, Raindrop automatically:
- Creates your queue instance with persistent storage
- Creates the observer to consume messages from the queue
- Sets up all required bindings and routing between services, queues, and observers
- Configures message delivery and retry mechanisms
Accessing Your Queue
Message queues are made available to your services and actors through environment variables. The queue name from your manifest is converted to an uppercase environment variable with underscores replacing dashes.
For example, if your manifest defines a queue as queue "notification-queue"
, you would access it in your code as env.NOTIFICATION_QUEUE
.
export default class extends Service<Env> { async fetch(request: Request, env: Env): Promise<Response> { // Send a notification message to the queue await env.NOTIFICATION_QUEUE.send({ userId: 123, message: "Welcome to our platform!" }); return new Response("Notification queued"); }}
Core Concepts
Producer-Consumer Architecture
Raindrop queues follow a producer-consumer pattern with distinct roles:
Producers (Services and Actors) send messages to queues using the queue interface.
Consumers (Observers) are specialized components that consume messages from queues. Only observers can receive and process queue messages - services cannot directly consume from queues.
export default class OrderService extends Service<Env> { async fetch(request: Request): Promise<Response> { const orderData = await request.json();
// Service sends message to queue await env.DEMO_QUEUE.send({ userId: orderData.userId, type: "order_confirmation", orderId: orderData.id });
return new Response("Order processed"); }}
export default class PaymentProcessor extends Actor<Env> { async processPayment(amount: number, userId: string): Promise<void> { // Process payment logic here const paymentResult = await this.chargeCard(amount);
// Actor sends message to queue await this.env.DEMO_QUEUE.send({ type: "payment_processed", userId, amount, transactionId: paymentResult.id, timestamp: new Date().toISOString() }); }}
export default class DemoProcessor extends Each<DemoMessage, Env> { async process(message: Message<DemoMessage>): Promise<void> { try { // Observer processes the message console.log(`Processing ${message.body.type} for user ${message.body.userId}`);
// Your processing logic here await this.handleMessage(message.body);
// Acknowledge successful processing message.ack(); } catch (error) { // Retry on failure message.retry({ delaySeconds: 60 }); } }
private async handleMessage(data: DemoMessage): Promise<void> { // Your business logic here }}
Sending Messages
The Queue<Body>
interface provides the main entry point for sending messages:
- send(message, options?): Sends a single message to the queue with optional configuration
- sendBatch(messages, options?): Sends multiple messages as a batch operation for improved performance
Sending Individual Messages
Send individual messages to a queue using the send()
method. The queue automatically handles JSON serialization, so you can send objects directly:
// Send a JSON object (automatic serialization)await env.NOTIFICATION_QUEUE.send({ userId: 123, type: "welcome", message: "Welcome to our platform!", timestamp: new Date().toISOString()});
// Send with delay for scheduled processingawait env.PROCESSING_QUEUE.send({ orderId: 456, action: "process_payment", amount: 99.99}, { delaySeconds: 30});
// Simple string messages work automaticallyawait env.LOG_QUEUE.send("User 123 completed onboarding");
Sending Multiple Messages
Send multiple messages efficiently using sendBatch()
:
const messages = [ { body: { userId: 123, action: "welcome" }, delaySeconds: 0 }, { body: { userId: 124, action: "welcome" }, delaySeconds: 5 }, { body: { userId: 125, action: "welcome" }, delaySeconds: 10 }];
await env.NOTIFICATION_QUEUE.sendBatch(messages, { delaySeconds: 0 // Global delay for the entire batch});
Receiving Messages
Only observers can process queue messages. Observers implement either the Each
pattern (processing one message at a time) or the Batch
pattern (processing multiple messages together).
Observers are configured in the manifest with:
- queue: Which queue to consume from
- batch_size: Maximum number of messages to process together (1-100)
observer "notification-processor" { source { queue = "notification-queue" batch_size = 10 # Process up to 10 messages per batch }}
Single Message Processing (Each Observer)
Use the Each
pattern for processing individual messages:
export default class NotificationProcessor extends Each<NotificationData, Env> { async process(message: Message<NotificationData>): Promise<void> { try { // Process the individual message await this.sendEmail(message.body.userId, message.body.content);
// Acknowledge successful processing message.ack();
console.log(`Processed message ${message.id} (attempt ${message.attempts})`); } catch (error) { console.error(`Failed to process message ${message.id}:`, error);
// Retry with delay if processing fails message.retry({ delaySeconds: 60 }); } }}
Batch Message Processing (Batch Observer)
Use the Batch
pattern for processing multiple messages together:
import { Batch, MessageBatch } from '@liquidmetal-ai/raindrop-framework';
export default class OrderProcessor extends Batch<OrderData, Env> { async process(batch: MessageBatch<OrderData>): Promise<void> { try { console.log(`Processing batch of ${batch.messages.length} messages from queue: ${batch.queue}`);
// Process all messages in the batch together const orders = batch.messages.map(msg => msg.body); await this.processBulkOrders(orders);
// Acknowledge all messages at once batch.ackAll();
console.log(`Processed batch of ${batch.messages.length} orders`); } catch (error) { console.error(`Failed to process batch:`, error);
// Retry entire batch with delay batch.retryAll({ delaySeconds: 120 }); } }
private async processBulkOrders(orders: OrderData[]): Promise<void> { // Your bulk processing logic here }}
Message Metadata
Every message includes comprehensive metadata for monitoring and debugging:
export default class extends Each<TaskData, Env> { async process(message: Message<TaskData>): Promise<void> { console.log(`Processing message:`, { id: message.id, timestamp: message.timestamp, attempts: message.attempts, body: message.body });
// Handle messages that have been retried multiple times if (message.attempts > 3) { console.warn(`Message ${message.id} has failed ${message.attempts} times`); // Consider dead letter queue or alert handling }
message.ack(); }}
Message Types and Content
Raindrop queues use JSON serialization by default, which handles most use cases reliably and efficiently. The queue system automatically serializes and deserializes JavaScript objects, arrays, strings, numbers, and other JSON-compatible data types.
Recommended approach: Send JavaScript objects directly to the queue without specifying a content type. The framework will handle serialization automatically.
// Recommended: Simple JSON objects (default behavior)await env.NOTIFICATION_QUEUE.send({ userId: 123, type: "welcome", message: "Welcome to our platform!", timestamp: new Date().toISOString()});
// Recommended: Arrays and nested objects work seamlesslyawait env.DATA_QUEUE.send({ users: [{ id: 1, name: "Alice" }, { id: 2, name: "Bob" }], metadata: { source: "web", timestamp: new Date().toISOString() }});
For specialized use cases, queues support additional content types:
await env.DATA_QUEUE.send({ timestamp: new Date().toISOString(), userId: 123, action: "user_signup", metadata: { source: "web", campaign: "summer2024" }});
await env.LOG_QUEUE.send( "User 123 completed onboarding at " + new Date().toISOString(), { contentType: "text" });
const fileData = new Uint8Array([/* binary data */]);await env.FILE_QUEUE.send(fileData, { contentType: "bytes",});
const complexObject = { date: new Date(), map: new Map([["key", "value"]]), set: new Set([1, 2, 3])};await env.COMPLEX_QUEUE.send(complexObject, { contentType: "v8"});
Delivery Options
Queues support flexible delivery configurations:
- Immediate delivery: Messages are processed as soon as possible (default behavior, no configuration required)
- Delayed delivery: Configured using
delaySeconds
in the options when callingsend()
orsendBatch()
- Batch processing: Configured in the observer manifest using the
batch_size
setting