Skip to content

Message Queue

Overview

The Raindrop framework provides a robust message queue system that enables asynchronous communication between services and actors. Message queues allow you to decouple components, handle workload bursts, and build resilient distributed systems that can process tasks reliably over time.

Message queues in Raindrop support multiple content types, batch operations, and flexible delivery options including delayed processing. The queue system automatically handles message persistence, delivery guarantees, and retry mechanisms, allowing you to focus on your business logic rather than infrastructure concerns.

Whether you’re processing user uploads, sending notifications, or coordinating complex workflows, Raindrop’s message queues provide the reliability and scalability needed for production applications. The queue interface is designed to be intuitive while offering powerful features for message handling and error recovery.

Prerequisites

  • Basic understanding of asynchronous programming concepts
  • Raindrop framework installed in your project
  • Familiarity with TypeScript and async/await patterns
  • Understanding of message-driven architecture patterns

Configuration

To add a message queue to your Raindrop project, you need both a queue definition and at least one observer to consume messages:

  1. Define the queue and observer in your application manifest
  2. Run raindrop build generate to create the necessary files
application "demo-app" {
queue "notification-queue" {}
observer "notification-processor" {
source {
queue = "notification-queue"
batch_size = 10
}
}
}

Once deployed, Raindrop automatically:

  • Creates your queue instance with persistent storage
  • Creates the observer to consume messages from the queue
  • Sets up all required bindings and routing between services, queues, and observers
  • Configures message delivery and retry mechanisms

Access

Message queues are made available to your services and actors through environment variables. The queue name from your manifest is converted to an uppercase environment variable with underscores replacing dashes.

For example, if your manifest defines a queue as queue "notification-queue", you would access it in your code as env.NOTIFICATION_QUEUE.

export default class extends Service<Env> {
async fetch(request: Request, env: Env): Promise<Response> {
// Send a notification message to the queue
await env.NOTIFICATION_QUEUE.send({
userId: 123,
message: "Welcome to our platform!"
});
return new Response("Notification queued");
}
}

Core Interfaces

The message queue system provides these TypeScript interfaces for type-safe queue operations:

/**
* Represents a message queue that can send individual messages or batches
*/
interface Queue<Body = unknown> {
/** Sends a single message to the queue */
send(message: Body, options?: QueueSendOptions): Promise<void>;
/** Sends multiple messages to the queue as a batch */
sendBatch(messages: Iterable<MessageSendRequest<Body>>, options?: QueueSendBatchOptions): Promise<void>;
}
/**
* Represents a message received from the queue
*/
interface Message<Body = unknown> {
/** Unique identifier for the message */
readonly id: string;
/** Timestamp when the message was sent */
readonly timestamp: Date;
/** The message content */
readonly body: Body;
/** Number of processing attempts made for this message */
readonly attempts: number;
/** Retry processing this message */
retry(options?: QueueRetryOptions): void;
/** Acknowledge successful processing of this message */
ack(): void;
}
/**
* Represents a batch of messages received from the queue
*/
interface MessageBatch<Body = unknown> {
/** Array of messages in this batch */
readonly messages: readonly Message<Body>[];
/** Name or identifier of the source queue */
readonly queue: string;
/** Retry processing all messages in the batch */
retryAll(options?: QueueRetryOptions): void;
/** Acknowledge successful processing of all messages in the batch */
ackAll(): void;
}
/**
* Represents a message to be sent to the queue
*/
interface MessageSendRequest<Body = unknown> {
/** The message content */
body: Body;
/** The content type of the message */
contentType?: 'text' | 'bytes' | 'json' | 'v8';
/** Number of seconds to delay message processing */
delaySeconds?: number;
}
/** Configuration options for sending a single message */
interface QueueSendOptions {
/** The content type of the message */
contentType?: 'text' | 'bytes' | 'json' | 'v8';
/** Number of seconds to delay message processing */
delaySeconds?: number;
}
/** Configuration options for sending a batch of messages */
interface QueueSendBatchOptions {
/** Number of seconds to delay processing for all messages in the batch */
delaySeconds?: number;
}

Core Concepts

Producer-Consumer Architecture

Raindrop queues follow a producer-consumer pattern with distinct roles:

Producers (Services and Actors) send messages to queues using the queue interface.

Consumers (Observers) are specialized components that consume messages from queues. Only observers can receive and process queue messages - services cannot directly consume from queues.

export default class OrderService extends Service<Env> {
async fetch(request: Request): Promise<Response> {
const orderData = await request.json();
// Service sends message to queue
await env.DEMO_QUEUE.send({
userId: orderData.userId,
type: "order_confirmation",
orderId: orderData.id
});
return new Response("Order processed");
}
}

Sending Methods

The queue interface provides methods for sending messages individually or in batches.

send()

Sends a single message to the queue with optional configuration.

Send individual messages to a queue using the send() method. The queue automatically handles JSON serialization, so you can send objects directly:

// Send a JSON object (automatic serialization)
await env.NOTIFICATION_QUEUE.send({
userId: 123,
type: "welcome",
message: "Welcome to our platform!",
timestamp: new Date().toISOString()
});
// Send with delay for scheduled processing
await env.PROCESSING_QUEUE.send({
orderId: 456,
action: "process_payment",
amount: 99.99
}, {
delaySeconds: 30
});
// Simple string messages work automatically
await env.LOG_QUEUE.send("User 123 completed onboarding");

sendBatch()

Sends multiple messages efficiently as a batch operation. Batch operations are more efficient than individual sends when processing multiple related messages, as they reduce network overhead and improve throughput.

const messages = [
{
body: { userId: 123, action: "welcome" },
delaySeconds: 0
},
{
body: { userId: 124, action: "welcome" },
delaySeconds: 5
},
{
body: { userId: 125, action: "welcome" },
delaySeconds: 10
}
];
await env.NOTIFICATION_QUEUE.sendBatch(messages, {
delaySeconds: 0 // Global delay for the entire batch
});

Message Methods

Individual messages received by observers provide methods for acknowledgment and retry handling.

ack()

Acknowledges successful processing of a message. This removes the message from the queue permanently.

export default class extends Each<TaskData, Env> {
async process(message: Message<TaskData>): Promise<void> {
try {
// Process the message
await this.handleTask(message.body);
// Acknowledge successful processing
message.ack();
} catch (error) {
// Handle error without ack - message will be retried
message.retry({ delaySeconds: 60 });
}
}
}

retry()

Retries processing of a message with optional delay configuration. The message will be redelivered after the specified delay.

export default class extends Each<TaskData, Env> {
async process(message: Message<TaskData>): Promise<void> {
try {
await this.processTask(message.body);
message.ack();
} catch (error) {
// Retry with exponential backoff based on attempt count
const delay = Math.min(60 * Math.pow(2, message.attempts), 3600);
message.retry({ delaySeconds: delay });
}
}
}

Batch Methods

Message batches provide methods for acknowledging or retrying all messages in the batch at once.

ackAll()

Acknowledges successful processing of all messages in the batch. This removes all messages from the queue permanently.

export default class extends Batch<OrderData, Env> {
async process(batch: MessageBatch<OrderData>): Promise<void> {
try {
// Process all messages in the batch
const orders = batch.messages.map(msg => msg.body);
await this.processBulkOrders(orders);
// Acknowledge all messages at once
batch.ackAll();
} catch (error) {
// Retry entire batch
batch.retryAll({ delaySeconds: 120 });
}
}
}

retryAll()

Retries processing of all messages in the batch with optional delay configuration. All messages will be redelivered after the specified delay.

export default class extends Batch<OrderData, Env> {
async process(batch: MessageBatch<OrderData>): Promise<void> {
try {
await this.processBulkOrders(batch.messages.map(msg => msg.body));
batch.ackAll();
} catch (error) {
console.error(`Failed to process batch:`, error);
// Retry entire batch with delay
batch.retryAll({ delaySeconds: 120 });
}
}
}

Receiving Messages

Only observers can process queue messages. Observers implement either the Each pattern (processing one message at a time) or the Batch pattern (processing multiple messages together).

Observers are configured in the manifest with:

  • queue: Which queue to consume from
  • batch_size: Maximum number of messages to process together (1-100)
observer "notification-processor" {
source {
queue = "notification-queue"
batch_size = 10 # Process up to 10 messages per batch
}
}

Choose batch_size based on your processing needs: smaller batches (1-10) for low-latency processing, larger batches (50-100) for high-throughput scenarios.

Each Observer Pattern

Use the Each pattern for processing individual messages. Each message is processed separately and can be acknowledged or retried independently.

export default class NotificationProcessor extends Each<NotificationData, Env> {
async process(message: Message<NotificationData>): Promise<void> {
try {
// Process the individual message
await this.sendEmail(message.body.userId, message.body.content);
// Acknowledge successful processing
message.ack();
console.log(`Processed message ${message.id} (attempt ${message.attempts})`);
} catch (error) {
console.error(`Failed to process message ${message.id}:`, error);
// Retry with delay if processing fails
message.retry({ delaySeconds: 60 });
}
}
}

Batch Observer Pattern

Use the Batch pattern for processing multiple messages together. When using the Batch pattern, make sure to import both Batch and MessageBatch from the framework.

import { Batch, MessageBatch } from '@liquidmetal-ai/raindrop-framework';
export default class OrderProcessor extends Batch<OrderData, Env> {
async process(batch: MessageBatch<OrderData>): Promise<void> {
try {
console.log(`Processing batch of ${batch.messages.length} messages from queue: ${batch.queue}`);
// Process all messages in the batch together
const orders = batch.messages.map(msg => msg.body);
await this.processBulkOrders(orders);
// Acknowledge all messages at once
batch.ackAll();
console.log(`Processed batch of ${batch.messages.length} orders`);
} catch (error) {
console.error(`Failed to process batch:`, error);
// Retry entire batch with delay
batch.retryAll({ delaySeconds: 120 });
}
}
private async processBulkOrders(orders: OrderData[]): Promise<void> {
// Your bulk processing logic here
}
}

Message Metadata

Every message includes metadata for monitoring and debugging:

export default class extends Each<TaskData, Env> {
async process(message: Message<TaskData>): Promise<void> {
console.log(`Processing message:`, {
id: message.id, // Unique message identifier
timestamp: message.timestamp, // When message was sent
attempts: message.attempts, // Number of processing attempts
body: message.body // The message content
});
// Handle messages that have been retried multiple times
if (message.attempts > 3) {
console.warn(`Message ${message.id} has failed ${message.attempts} times`);
// Consider dead letter queue or alert handling
}
message.ack();
}
}

Message Types and Content

Raindrop queues use JSON serialization by default, which handles most use cases reliably and efficiently. The queue system automatically serializes and deserializes JavaScript objects, arrays, strings, numbers, and other JSON-compatible data types.

Recommended approach: Send JavaScript objects directly to the queue without specifying a content type. The framework will handle serialization automatically.

// Recommended: Simple JSON objects (default behavior)
await env.NOTIFICATION_QUEUE.send({
userId: 123,
type: "welcome",
message: "Welcome to our platform!",
timestamp: new Date().toISOString()
});
// Recommended: Arrays and nested objects work seamlessly
await env.DATA_QUEUE.send({
users: [{ id: 1, name: "Alice" }, { id: 2, name: "Bob" }],
metadata: { source: "web", timestamp: new Date().toISOString() }
});

For specialized use cases, queues support additional content types:

await env.DATA_QUEUE.send({
timestamp: new Date().toISOString(),
userId: 123,
action: "user_signup",
metadata: { source: "web", campaign: "summer2024" }
});

Delivery Options

Queues support flexible delivery configurations:

  • Immediate delivery: Messages are processed as soon as possible (default behavior, no configuration required)
  • Delayed delivery: Configured using delaySeconds in the options when calling send() or sendBatch()
  • Batch processing: Configured in the observer manifest using the batch_size setting