Message Queue

The Raindrop framework provides a robust message queue system that enables asynchronous communication between services and actors. Message queues allow you to decouple components, handle workload bursts, and build resilient distributed systems that can process tasks reliably over time.

Message queues in Raindrop support multiple content types, batch operations, and flexible delivery options including delayed processing. The queue system automatically handles message persistence, delivery guarantees, and retry mechanisms, allowing you to focus on your business logic rather than infrastructure concerns.

Whether you’re processing user uploads, sending notifications, or coordinating complex workflows, Raindrop’s message queues provide the reliability and scalability needed for production applications. The queue interface is designed to be intuitive while offering powerful features for message handling and error recovery.

Prerequisites

  • Basic understanding of asynchronous programming concepts
  • Raindrop framework installed in your project
  • Familiarity with TypeScript and async/await patterns
  • Understanding of message-driven architecture patterns

Creating a New Queue

To add a message queue to your Raindrop project, you need both a queue definition and at least one observer to consume messages:

  1. Define the queue and observer in your application manifest
  2. Run raindrop build generate to create the necessary files

application "demo-app" {
  queue "notification-queue" {}

  observer "notification-processor" {
    source {
      queue = "notification-queue"
      batch_size = 10
    }
  }
}

Once deployed, Raindrop automatically:

  • Creates your queue instance with persistent storage
  • Creates the observer to consume messages from the queue
  • Sets up all required bindings and routing between services, queues, and observers
  • Configures message delivery and retry mechanisms

Accessing Your Queue

Message queues are made available to your services and actors through environment variables. The queue name from your manifest is converted to an uppercase environment variable with underscores replacing dashes.

For example, if your manifest defines a queue as queue "notification-queue", you would access it in your code as env.NOTIFICATION_QUEUE.

export default class extends Service<Env> {
  async fetch(request: Request, env: Env): Promise<Response> {
    // Send a notification message to the queue
    await env.NOTIFICATION_QUEUE.send({
      userId: 123,
      message: "Welcome to our platform!"
    });

    return new Response("Notification queued");
  }
}
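
The naming rule described above can be sketched as a small function. This is a hypothetical helper for illustration only; `queueBindingName` is not part of the Raindrop API, and the framework performs this conversion for you.

```typescript
// Hypothetical helper (not a Raindrop API): mirrors the documented rule that
// a manifest queue name is uppercased and its dashes become underscores.
function queueBindingName(manifestName: string): string {
  return manifestName.toUpperCase().replace(/-/g, "_");
}

// queueBindingName("notification-queue") returns "NOTIFICATION_QUEUE"
```

A helper like this can be handy in tests or tooling that needs to predict which `env` property a manifest entry will produce.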

Core Concepts

Producer-Consumer Architecture

Raindrop queues follow a producer-consumer pattern with distinct roles:

Producers (Services and Actors) send messages to queues using the queue interface.

Consumers (Observers) are specialized components that consume messages from queues. Only observers can receive and process queue messages; services cannot consume from queues directly.

The following service acts as a producer:

export default class OrderService extends Service<Env> {
  // env is accepted as a parameter so the queue binding is in scope
  async fetch(request: Request, env: Env): Promise<Response> {
    const orderData = await request.json();

    // Service sends message to queue
    await env.DEMO_QUEUE.send({
      userId: orderData.userId,
      type: "order_confirmation",
      orderId: orderData.id
    });

    return new Response("Order processed");
  }
}

Sending Messages

The Queue<Body> interface provides the main entry point for sending messages:

  • send(message, options?): Sends a single message to the queue with optional configuration
  • sendBatch(messages, options?): Sends multiple messages as a batch operation for improved performance
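
To make the two methods above concrete, here is a minimal sketch of what the interface might look like, inferred only from the examples on this page. The names `SendOptions`, `BatchEntry`, `QueueLike`, and `RecordingQueue` are assumptions made for illustration; the framework's actual type definitions may differ. The in-memory class is a stand-in for unit tests, not the real queue.

```typescript
// Assumed shapes, inferred from the examples on this page.
interface SendOptions {
  delaySeconds?: number; // delay before the message becomes deliverable
}

interface BatchEntry<Body> {
  body: Body;
  delaySeconds?: number; // per-message delay
}

interface QueueLike<Body> {
  send(message: Body, options?: SendOptions): Promise<void>;
  sendBatch(messages: BatchEntry<Body>[], options?: SendOptions): Promise<void>;
}

// In-memory stand-in that records what was sent.
class RecordingQueue<Body> implements QueueLike<Body> {
  readonly sent: BatchEntry<Body>[] = [];

  async send(message: Body, options?: SendOptions): Promise<void> {
    this.sent.push({ body: message, delaySeconds: options?.delaySeconds ?? 0 });
  }

  async sendBatch(messages: BatchEntry<Body>[], options?: SendOptions): Promise<void> {
    for (const entry of messages) {
      // Assumption: a per-message delay takes precedence over the batch-wide one.
      const delaySeconds = entry.delaySeconds ?? options?.delaySeconds ?? 0;
      this.sent.push({ body: entry.body, delaySeconds });
    }
  }
}
```

Passing a `RecordingQueue` where a service expects a queue binding lets a test assert on exactly which messages were produced without deploying anything.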

Sending Individual Messages

Send individual messages to a queue using the send() method. The queue automatically handles JSON serialization, so you can send objects directly:

// Send a JSON object (automatic serialization)
await env.NOTIFICATION_QUEUE.send({
  userId: 123,
  type: "welcome",
  message: "Welcome to our platform!",
  timestamp: new Date().toISOString()
});

// Send with delay for scheduled processing
await env.PROCESSING_QUEUE.send({
  orderId: 456,
  action: "process_payment",
  amount: 99.99
}, {
  delaySeconds: 30
});

// Simple string messages work automatically
await env.LOG_QUEUE.send("User 123 completed onboarding");
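
When the goal is "process this at a specific time" rather than "process this in N seconds", the delay has to be computed. The sketch below shows one way to do that. `delaySecondsUntil` is a hypothetical helper, not part of Raindrop, and this page does not state a maximum allowed delay, so check the framework's limits before relying on long delays.

```typescript
// Hypothetical helper (not a Raindrop API): converts a target time into a
// non-negative whole number of seconds for the delaySeconds option.
function delaySecondsUntil(target: Date, now: Date = new Date()): number {
  const diffMs = target.getTime() - now.getTime();
  // A target in the past yields 0, meaning "deliver as soon as possible".
  return Math.max(0, Math.ceil(diffMs / 1000));
}

// Usage inside a service (env and runAt come from the surrounding code):
// await env.PROCESSING_QUEUE.send(body, { delaySeconds: delaySecondsUntil(runAt) });
```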

Sending Multiple Messages

Send multiple messages efficiently using sendBatch():

const messages = [
  {
    body: { userId: 123, action: "welcome" },
    delaySeconds: 0
  },
  {
    body: { userId: 124, action: "welcome" },
    delaySeconds: 5
  },
  {
    body: { userId: 125, action: "welcome" },
    delaySeconds: 10
  }
];

await env.NOTIFICATION_QUEUE.sendBatch(messages, {
  delaySeconds: 0 // Global delay for the entire batch
});
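
The batch above spaces its messages 5 seconds apart by hand. For longer lists, the same entries could be generated. This is a minimal sketch; `staggerBatch` and `StaggeredEntry` are hypothetical names introduced here, and the entry shape (`body` plus `delaySeconds`) is taken from the example above.

```typescript
// Entry shape copied from the sendBatch() example on this page.
interface StaggeredEntry<Body> {
  body: Body;
  delaySeconds: number;
}

// Hypothetical helper (not a Raindrop API): gives each message a delay that
// grows by stepSeconds, so the messages become deliverable one after another.
function staggerBatch<Body>(bodies: Body[], stepSeconds: number): StaggeredEntry<Body>[] {
  return bodies.map((body, index) => ({ body, delaySeconds: index * stepSeconds }));
}

// Usage inside a service (env comes from the surrounding code):
// await env.NOTIFICATION_QUEUE.sendBatch(staggerBatch(welcomeBodies, 5));
```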

Receiving Messages

Only observers can process queue messages. Observers implement either the Each pattern (processing one message at a time) or the Batch pattern (processing multiple messages together).

Observers are configured in the manifest with:

  • queue: Which queue to consume from
  • batch_size: Maximum number of messages to process together (1-100)

observer "notification-processor" {
  source {
    queue = "notification-queue"
    batch_size = 10 # Process up to 10 messages per batch
  }
}
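
To build intuition for what the batch_size cap means, the sketch below groups a list of pending items into batches no larger than the configured size. It is an illustration only: `chunkByBatchSize` is a hypothetical function, and the real delivery logic may hand an observer smaller batches than this (for example, when fewer messages are waiting).

```typescript
// Illustration only (not a Raindrop API): splits pending items into groups
// of at most batchSize, enforcing the documented 1-100 range.
function chunkByBatchSize<T>(pending: T[], batchSize: number): T[][] {
  if (!Number.isInteger(batchSize) || batchSize < 1 || batchSize > 100) {
    throw new RangeError("batch_size must be an integer between 1 and 100");
  }
  const batches: T[][] = [];
  for (let i = 0; i < pending.length; i += batchSize) {
    batches.push(pending.slice(i, i + batchSize));
  }
  return batches;
}

// With 25 pending items and batch_size = 10, the groups hold 10, 10, and 5 items.
```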

Single Message Processing (Each Observer)

Use the Each pattern for processing individual messages:

import { Each, Message } from '@liquidmetal-ai/raindrop-framework';

export default class NotificationProcessor extends Each<NotificationData, Env> {
  async process(message: Message<NotificationData>): Promise<void> {
    try {
      // Process the individual message
      await this.sendEmail(message.body.userId, message.body.content);

      // Acknowledge successful processing
      message.ack();
      console.log(`Processed message ${message.id} (attempt ${message.attempts})`);
    } catch (error) {
      console.error(`Failed to process message ${message.id}:`, error);

      // Retry with delay if processing fails
      message.retry({ delaySeconds: 60 });
    }
  }

  private async sendEmail(userId: number, content: string): Promise<void> {
    // Your email delivery logic here
  }
}
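
The example above retries with a fixed 60-second delay. A common alternative is to grow the delay with each attempt so that a struggling downstream system gets more room to recover. The sketch below computes such a delay from `message.attempts`. `backoffSeconds` and its default values are hypothetical, and it assumes `attempts` is 1 on the first delivery, which this page does not confirm.

```typescript
// Hypothetical helper (not a Raindrop API): exponential backoff capped at
// maxSeconds. Assumes attempts starts at 1 for the first delivery.
function backoffSeconds(attempts: number, baseSeconds = 30, maxSeconds = 3600): number {
  const exponent = Math.max(0, attempts - 1);
  return Math.min(maxSeconds, baseSeconds * Math.pow(2, exponent));
}

// Usage inside an observer's catch block:
// message.retry({ delaySeconds: backoffSeconds(message.attempts) });
```

With the defaults shown, the first three attempts wait 30, 60, and 120 seconds, and the delay never exceeds one hour.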

Batch Message Processing (Batch Observer)

Use the Batch pattern for processing multiple messages together:

import { Batch, MessageBatch } from '@liquidmetal-ai/raindrop-framework';

export default class OrderProcessor extends Batch<OrderData, Env> {
  async process(batch: MessageBatch<OrderData>): Promise<void> {
    try {
      console.log(`Processing batch of ${batch.messages.length} messages from queue: ${batch.queue}`);

      // Process all messages in the batch together
      const orders = batch.messages.map(msg => msg.body);
      await this.processBulkOrders(orders);

      // Acknowledge all messages at once
      batch.ackAll();
      console.log(`Processed batch of ${batch.messages.length} orders`);
    } catch (error) {
      console.error(`Failed to process batch:`, error);

      // Retry entire batch with delay
      batch.retryAll({ delaySeconds: 120 });
    }
  }

  private async processBulkOrders(orders: OrderData[]): Promise<void> {
    // Your bulk processing logic here
  }
}
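
Retrying the whole batch means one bad message causes every message to be reprocessed. When messages can be handled independently, an alternative is to acknowledge or retry each one on its own outcome. The sketch below assumes the messages inside a batch expose the same `ack()` and `retry()` methods shown in the Each example; `MessageLike` and `processIndividually` are hypothetical names, and this behavior should be verified against the framework before use.

```typescript
// Stand-in shape based on the examples on this page; real types may differ.
interface MessageLike<T> {
  id: string;
  body: T;
  ack(): void;
  retry(options?: { delaySeconds?: number }): void;
}

// Hypothetical helper: acknowledges each message that succeeds and retries
// only the ones whose handler throws.
async function processIndividually<T>(
  messages: MessageLike<T>[],
  handle: (body: T) => Promise<void>,
  retryDelaySeconds = 60,
): Promise<{ acked: number; retried: number }> {
  let acked = 0;
  let retried = 0;
  for (const message of messages) {
    try {
      await handle(message.body);
      message.ack();
      acked++;
    } catch {
      message.retry({ delaySeconds: retryDelaySeconds });
      retried++;
    }
  }
  return { acked, retried };
}
```

The trade-off is throughput: handling messages one by one gives up the efficiency of a single bulk operation in exchange for finer-grained failure handling.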

Message Metadata

Every message includes comprehensive metadata for monitoring and debugging:

export default class extends Each<TaskData, Env> {
  async process(message: Message<TaskData>): Promise<void> {
    console.log(`Processing message:`, {
      id: message.id,
      timestamp: message.timestamp,
      attempts: message.attempts,
      body: message.body
    });

    // Handle messages that have been retried multiple times
    if (message.attempts > 3) {
      console.warn(`Message ${message.id} has failed ${message.attempts} times`);
      // Consider dead letter queue or alert handling
    }

    message.ack();
  }
}
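
The comment above suggests a dead letter queue for messages that keep failing. One way to sketch that idea: when processing fails, retry until a threshold is passed, then forward the message to a second queue and acknowledge the original. Everything named here (`RetryableMessage`, `DeadLetter`, `DeadLetterSink`, `retryOrDeadLetter`, and the threshold of 3) is an assumption for illustration; this page does not describe a built-in dead letter feature.

```typescript
// Stand-in shapes based on the examples on this page; real types may differ.
interface RetryableMessage<T> {
  id: string;
  attempts: number;
  body: T;
  ack(): void;
  retry(options?: { delaySeconds?: number }): void;
}

interface DeadLetter<T> {
  originalId: string;
  attempts: number;
  body: T;
}

// Hypothetical sink: any queue-like object with a send() method.
interface DeadLetterSink<T> {
  send(message: DeadLetter<T>): Promise<void>;
}

// Call this from a catch block after processing has failed.
async function retryOrDeadLetter<T>(
  message: RetryableMessage<T>,
  deadLetters: DeadLetterSink<T>,
  maxAttempts = 3,
): Promise<"retried" | "dead-lettered"> {
  if (message.attempts > maxAttempts) {
    await deadLetters.send({
      originalId: message.id,
      attempts: message.attempts,
      body: message.body,
    });
    // Acknowledge so the original queue stops redelivering the message.
    message.ack();
    return "dead-lettered";
  }
  message.retry({ delaySeconds: 60 });
  return "retried";
}
```

In a deployed application the sink would typically be a second queue declared in the manifest, with its own observer for inspection or alerting.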

Message Types and Content

Raindrop queues use JSON serialization by default, which handles most use cases reliably and efficiently. The queue system automatically serializes and deserializes JavaScript objects, arrays, strings, numbers, and other JSON-compatible data types.

Recommended approach: Send JavaScript objects directly to the queue without specifying a content type. The framework will handle serialization automatically.

// Recommended: Simple JSON objects (default behavior)
await env.NOTIFICATION_QUEUE.send({
  userId: 123,
  type: "welcome",
  message: "Welcome to our platform!",
  timestamp: new Date().toISOString()
});

// Recommended: Arrays and nested objects work seamlessly
await env.DATA_QUEUE.send({
  users: [{ id: 1, name: "Alice" }, { id: 2, name: "Bob" }],
  metadata: { source: "web", timestamp: new Date().toISOString() }
});
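
Because message bodies are JSON-serialized, values that JSON cannot represent will not arrive unchanged. The sketch below uses a plain `JSON.stringify`/`JSON.parse` round trip to preview what an observer would receive. It assumes the framework's serialization behaves like the standard `JSON` functions; `jsonRoundTrip` is a hypothetical helper.

```typescript
// Hypothetical helper (not a Raindrop API): simulates JSON serialization
// followed by deserialization using the standard JSON functions.
function jsonRoundTrip(value: object): unknown {
  return JSON.parse(JSON.stringify(value));
}

// Date objects become ISO strings and undefined properties are dropped:
// jsonRoundTrip({ when: new Date(0), note: undefined, count: 1 })
// returns { when: "1970-01-01T00:00:00.000Z", count: 1 }
```

This is why the examples on this page send `new Date().toISOString()` rather than a `Date` object: the consumer receives a string either way, and sending the string makes that explicit.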

Queues also support additional content types for specialized use cases. The following example sends a structured event using the default JSON serialization:

await env.DATA_QUEUE.send({
  timestamp: new Date().toISOString(),
  userId: 123,
  action: "user_signup",
  metadata: { source: "web", campaign: "summer2024" }
});

Delivery Options

Queues support flexible delivery configurations:

  • Immediate delivery: Messages are processed as soon as possible (default behavior, no configuration required)
  • Delayed delivery: Configured using delaySeconds in the options when calling send() or sendBatch()
  • Batch processing: Configured in the observer manifest using the batch_size setting