Queue

Overview

Queues provide asynchronous message processing with reliable delivery guarantees and automatic retry mechanisms. Each queue handles messages independently with configurable processing delays and batch operations for high-throughput scenarios.

Queues excel at decoupling system components, handling background processing, and managing workload distribution where message ordering and delivery reliability are important. They provide durability guarantees and automatic failure handling.

Key benefits include asynchronous message processing, automatic retry mechanisms, batch operation support, and configurable delay scheduling.

Prerequisites

  • Active Raindrop project with manifest configuration
  • Understanding of message queue concepts and asynchronous processing
  • Familiarity with TypeScript generics and async patterns
  • Knowledge of producer-consumer patterns and message handling

Creating/Getting Started

Define queues in your manifest file to enable message processing:

application "messaging-app" {
  queue "user-notifications" {}
  queue "data-processing" {}

  service "api" {
    domain = "api.example.com"
  }
}

Generate queue bindings and handlers:

raindrop build generate

This creates typed environment bindings:

interface Env {
  USER_NOTIFICATIONS: Queue<NotificationMessage>;
  DATA_PROCESSING: Queue<ProcessingTask>;
}
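
The binding names in this example appear to follow a simple pattern: the queue name is upper-cased and hyphens become underscores, so `user-notifications` becomes `USER_NOTIFICATIONS`. The helper below is a hypothetical illustration of that pattern, inferred only from the examples on this page. It is not part of the Raindrop tooling, and the generator's actual rules may differ.

```typescript
// Hypothetical helper: mirrors the naming pattern seen in the examples on
// this page. This is an assumption, not Raindrop's actual generator logic.
function toBindingName(queueName: string): string {
  return queueName.replace(/-/g, '_').toUpperCase();
}

console.log(toBindingName('user-notifications')); // USER_NOTIFICATIONS
console.log(toBindingName('data-processing'));    // DATA_PROCESSING
```

Checking the generated `Env` interface after each build remains the reliable way to confirm the real binding names.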

Accessing/Basic Usage

Send messages to queues through environment bindings with type safety:

interface NotificationMessage {
  userId: string;
  type: 'email' | 'sms' | 'push';
  content: string;
  priority: number;
}

export default class extends Service<Env> {
  async fetch(request: Request): Promise<Response> {
    const { userId, message } = await request.json();

    // Send single message
    await this.env.USER_NOTIFICATIONS.send({
      userId,
      type: 'email',
      content: message,
      priority: 1
    });

    return Response.json({ status: 'queued' });
  }
}

Process messages using queue consumers:

export default class extends Service<Env> {
  async queue(batch: MessageBatch<NotificationMessage>): Promise<void> {
    for (const message of batch.messages) {
      try {
        await this.processNotification(message.body);
        message.ack(); // Mark message as successfully processed
      } catch (error) {
        console.error('Failed to process notification:', error);
        message.retry({ delaySeconds: 30 }); // Retry after 30 seconds
      }
    }
  }

  private async processNotification(notification: NotificationMessage): Promise<void> {
    switch (notification.type) {
      case 'email':
        await this.sendEmail(notification);
        break;
      case 'sms':
        await this.sendSMS(notification);
        break;
      case 'push':
        await this.sendPushNotification(notification);
        break;
    }
  }
}

Core Concepts

Queues implement a producer-consumer pattern with reliable message delivery and automatic retry handling for failed messages.

Message Durability: Messages persist in the queue until successfully processed or exceed retry limits, ensuring reliable delivery.

Content Types: Queues support multiple content types (text, bytes, JSON, v8) with automatic serialization and deserialization.

Batch Processing: Consumer handlers receive message batches for efficient processing of multiple messages together.

Retry Mechanisms: Failed messages are retried automatically, and a handler can call retry() with its own delay to implement strategies such as exponential backoff.
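
To make the durability and retry behaviour above concrete, here is a toy in-memory model of the delivery loop. It is a sketch for building intuition only: `ToyMessage`, `drain`, and `maxAttempts` are invented names, and the real queue runtime is not implemented this way.

```typescript
// Toy model of "deliver until acknowledged or the retry limit is reached".
// All names here are illustrative assumptions, not Raindrop APIs.
interface ToyMessage<T> {
  body: T;
  attempts: number; // how many times the message has been delivered
}

// Delivers each message until the handler succeeds or maxAttempts is reached.
// Returns the bodies that were processed and the ones that were dropped.
function drain<T>(
  bodies: T[],
  handler: (body: T, attempts: number) => void,
  maxAttempts: number,
): { processed: T[]; dropped: T[] } {
  const pending: ToyMessage<T>[] = bodies.map((body) => ({ body, attempts: 0 }));
  const processed: T[] = [];
  const dropped: T[] = [];

  while (pending.length > 0) {
    const message = pending.shift()!;
    message.attempts += 1;
    try {
      handler(message.body, message.attempts);
      processed.push(message.body); // plays the role of ack()
    } catch {
      if (message.attempts < maxAttempts) {
        pending.push(message); // plays the role of retry(): delivered again later
      } else {
        dropped.push(message.body); // retry limit exceeded
      }
    }
  }
  return { processed, dropped };
}

// "b" fails on its first delivery and succeeds on the second; "c" always fails.
const result = drain(
  ['a', 'b', 'c'],
  (body, attempts) => {
    if (body === 'c' || (body === 'b' && attempts === 1)) {
      throw new Error(`failed: ${body}`);
    }
  },
  3,
);
console.log(result.processed); // [ 'a', 'b' ]
console.log(result.dropped);   // [ 'c' ]
```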

Queue Interface

The Queue<Body> interface provides type-safe message operations:

interface Queue<Body = unknown> {
  send(message: Body, options?: QueueSendOptions): Promise<void>;
  sendBatch(
    messages: Iterable<MessageSendRequest<Body>>,
    options?: QueueSendBatchOptions
  ): Promise<void>;
}

Generic Type:

  • Body: Message body type for type safety and serialization
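
Because the interface is small, a producer can be unit-tested against an in-memory stand-in. The sketch below is one possible test double. The type shapes are local copies based on the options documented on this page, `RecordingQueue` is a hypothetical name, and the rule that a per-message delay overrides the batch default is an assumption rather than documented behaviour.

```typescript
// Local stand-ins for the types named above (assumed shapes).
type QueueContentType = 'text' | 'bytes' | 'json' | 'v8';
interface QueueSendOptions { contentType?: QueueContentType; delaySeconds?: number }
interface QueueSendBatchOptions { delaySeconds?: number }
interface MessageSendRequest<Body> {
  body: Body;
  contentType?: QueueContentType;
  delaySeconds?: number;
}
interface QueueLike<Body> {
  send(message: Body, options?: QueueSendOptions): Promise<void>;
  sendBatch(
    messages: Iterable<MessageSendRequest<Body>>,
    options?: QueueSendBatchOptions,
  ): Promise<void>;
}

// Hypothetical test double: records what a producer sends instead of enqueueing it.
class RecordingQueue<Body> implements QueueLike<Body> {
  readonly sent: Array<{ body: Body; delaySeconds: number }> = [];

  async send(message: Body, options?: QueueSendOptions): Promise<void> {
    this.sent.push({ body: message, delaySeconds: options?.delaySeconds ?? 0 });
  }

  async sendBatch(
    messages: Iterable<MessageSendRequest<Body>>,
    options?: QueueSendBatchOptions,
  ): Promise<void> {
    for (const request of messages) {
      // Assumption: a per-message delay takes precedence over the batch default.
      const delaySeconds = request.delaySeconds ?? options?.delaySeconds ?? 0;
      this.sent.push({ body: request.body, delaySeconds });
    }
  }
}

async function demo(): Promise<void> {
  const queue = new RecordingQueue<string>();
  await queue.send('hello', { delaySeconds: 5 });
  await queue.sendBatch([{ body: 'a' }, { body: 'b', delaySeconds: 60 }], { delaySeconds: 30 });
  console.log(queue.sent.map((entry) => `${entry.body}:${entry.delaySeconds}`).join(' '));
  // hello:5 a:30 b:60
}
demo();
```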

Message Structure

Messages contain metadata, processing information, and acknowledgment controls:

interface Message<Body = unknown> {
  readonly id: string;                      // Unique message identifier
  readonly timestamp: Date;                 // When message was sent
  readonly body: Body;                      // Message content
  readonly attempts: number;                // Processing attempt count
  retry(options?: QueueRetryOptions): void; // Schedule retry
  ack(): void;                              // Mark as processed
}
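
Consumer logic can be exercised without a running queue by handing it a message whose `ack()` and `retry()` calls are recorded. The following is a minimal sketch: `MessageLike` is a local copy of the shape above, and `makeTestMessage` and `Outcome` are hypothetical helpers, not part of the framework.

```typescript
// Local copy of the message shape shown above, so the sketch is self-contained.
interface QueueRetryOptions { delaySeconds?: number }
interface MessageLike<Body = unknown> {
  readonly id: string;
  readonly timestamp: Date;
  readonly body: Body;
  readonly attempts: number;
  retry(options?: QueueRetryOptions): void;
  ack(): void;
}

type Outcome =
  | { kind: 'pending' }
  | { kind: 'acked' }
  | { kind: 'retried'; delaySeconds: number | undefined };

let nextTestId = 0;

// Hypothetical test helper: builds a message and exposes what the handler did with it.
function makeTestMessage<Body>(
  body: Body,
  attempts = 1,
): { message: MessageLike<Body>; outcome: () => Outcome } {
  let outcome: Outcome = { kind: 'pending' };
  const message: MessageLike<Body> = {
    id: `test-${nextTestId++}`,
    timestamp: new Date(),
    body,
    attempts,
    ack: () => { outcome = { kind: 'acked' }; },
    retry: (options) => { outcome = { kind: 'retried', delaySeconds: options?.delaySeconds }; },
  };
  return { message, outcome: () => outcome };
}

// Example logic under test: acknowledge even numbers, retry odd ones.
function handle(message: MessageLike<number>): void {
  if (message.body % 2 === 0) {
    message.ack();
  } else {
    message.retry({ delaySeconds: 30 });
  }
}

const even = makeTestMessage(2);
const odd = makeTestMessage(3);
handle(even.message);
handle(odd.message);
console.log(even.outcome().kind); // acked
console.log(odd.outcome().kind);  // retried
```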

Message Sending

Send individual messages or batches with optional processing delays and content type specifications.

send(message, options?)

Send a single message to the queue with optional configuration:

interface ProcessingTask {
  taskId: string;
  operation: string;
  data: any;
  priority: number;
}

// Send with default options
await this.env.DATA_PROCESSING.send({
  taskId: 'task-123',
  operation: 'image-resize',
  data: { imageUrl: 'https://example.com/image.jpg' },
  priority: 1
});

// Send with custom options
await this.env.DATA_PROCESSING.send({
  taskId: 'urgent-task-456',
  operation: 'video-transcode',
  data: { videoUrl: 'https://example.com/video.mp4' },
  priority: 5
}, {
  contentType: 'json',
  delaySeconds: 300 // Process in 5 minutes
});

Parameters:

  • message: Body - Message content of the specified type
  • options: QueueSendOptions? - Optional sending configuration

Options:

  • contentType: 'text' | 'bytes' | 'json' | 'v8' - Message serialization format
  • delaySeconds: number - Delay before message becomes available for processing
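
Since `delaySeconds` is relative, scheduling a message for a specific time means converting that time into a delay first. The helper below is a hypothetical sketch of that conversion. `delaySecondsUntil` is not a framework function, and any upper limit the platform places on delays is not covered here.

```typescript
// Hypothetical helper: converts a target time into a delaySeconds value.
function delaySecondsUntil(target: Date, now: Date = new Date()): number {
  const seconds = Math.ceil((target.getTime() - now.getTime()) / 1000);
  return Math.max(0, seconds); // a target in the past means "no delay"
}

const now = new Date('2024-01-01T00:00:00Z');
console.log(delaySecondsUntil(new Date('2024-01-01T00:05:00Z'), now)); // 300
console.log(delaySecondsUntil(new Date('2023-12-31T23:59:00Z'), now)); // 0
```

The result can then be passed as the `delaySeconds` option of `send`.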

sendBatch(messages, options?)

Send multiple messages efficiently in a single operation:

interface EmailMessage {
  recipient: string;
  subject: string;
  body: string;
  template?: string;
}

const emailBatch: MessageSendRequest<EmailMessage>[] = [
  {
    body: {
      recipient: 'user1@example.com',
      subject: 'Welcome!',
      body: 'Welcome to our service',
      template: 'welcome'
    },
    delaySeconds: 0
  },
  {
    body: {
      recipient: 'user2@example.com',
      subject: 'Password Reset',
      body: 'Reset your password here',
      template: 'password-reset'
    },
    delaySeconds: 60 // Send in 1 minute
  }
];

await this.env.EMAIL_QUEUE.sendBatch(emailBatch, {
  delaySeconds: 30 // Default delay for all messages
});

Parameters:

  • messages: Iterable<MessageSendRequest<Body>> - Collection of messages to send
  • options: QueueSendBatchOptions? - Batch-level configuration

MessageSendRequest Structure:

  • body: Body - Message content
  • contentType: QueueContentType? - Message format override
  • delaySeconds: number? - Individual message delay
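
When a producer has many items to enqueue, it can help to build the `MessageSendRequest` objects programmatically and split them into smaller groups before calling `sendBatch`. The sketch below shows one way to do both. `staggered` and `chunk` are hypothetical helpers, and the group size of 2 is a placeholder, since this page does not state a maximum batch size.

```typescript
// Local stand-in for the request shape listed above.
interface MessageSendRequest<Body> {
  body: Body;
  delaySeconds?: number;
}

// Hypothetical helper: spaces messages out by `stepSeconds` each.
function staggered<Body>(
  bodies: readonly Body[],
  stepSeconds: number,
): MessageSendRequest<Body>[] {
  return bodies.map((body, index) => ({ body, delaySeconds: index * stepSeconds }));
}

// Hypothetical helper: splits a list into groups of at most `size` items.
function chunk<T>(items: readonly T[], size: number): T[][] {
  if (!Number.isInteger(size) || size < 1) {
    throw new RangeError('size must be a positive integer');
  }
  const groups: T[][] = [];
  for (let i = 0; i < items.length; i += size) {
    groups.push(items.slice(i, i + size));
  }
  return groups;
}

const requests = staggered(['a', 'b', 'c', 'd', 'e'], 10);
const batches = chunk(requests, 2); // 2 is a placeholder, not a documented limit
console.log(batches.map((batch) => batch.map((request) => request.delaySeconds)));
// [ [ 0, 10 ], [ 20, 30 ], [ 40 ] ]
```

Each group would then be passed to `sendBatch` in turn.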

Message Processing

Handle queued messages through consumer functions that process message batches with acknowledgment controls.

queue(batch)

Process message batches with individual message control:

export default class extends Service<Env> {
  async queue(batch: MessageBatch<ProcessingTask>): Promise<void> {
    console.log(`Processing batch of ${batch.messages.length} messages from ${batch.queue}`);

    for (const message of batch.messages) {
      const startTime = Date.now();
      try {
        await this.processTask(message.body);
        const processingTime = Date.now() - startTime;
        console.log(`Task ${message.body.taskId} completed in ${processingTime}ms`);
        message.ack(); // Mark as successfully processed
      } catch (error) {
        console.error(`Task ${message.body.taskId} failed:`, error);
        if (message.attempts < 3) {
          // Retry with exponential backoff
          const delaySeconds = Math.pow(2, message.attempts) * 30;
          message.retry({ delaySeconds });
        } else {
          // Max retries exceeded - handle failure
          await this.handleFailedTask(message.body, error, message.attempts);
          message.ack(); // Acknowledge to remove from queue
        }
      }
    }
  }

  private async processTask(task: ProcessingTask): Promise<void> {
    switch (task.operation) {
      case 'image-resize':
        await this.resizeImage(task.data);
        break;
      case 'video-transcode':
        await this.transcodeVideo(task.data);
        break;
      case 'generate-thumbnail':
        await this.generateThumbnail(task.data);
        break;
      default:
        throw new Error(`Unknown operation: ${task.operation}`);
    }
  }

  private async handleFailedTask(task: ProcessingTask, error: unknown, attempts: number): Promise<void> {
    // Record the failure, using the real attempt count rather than a hard-coded value
    const errorMessage = error instanceof Error ? error.message : String(error);
    await this.env.ERROR_LOG.prepare(
      'INSERT INTO failed_tasks (task_id, operation, error, attempts) VALUES (?, ?, ?, ?)'
    ).bind(task.taskId, task.operation, errorMessage, attempts).run();
  }
}

Batch Processing Patterns

Handle different batch processing scenarios effectively:

// Process messages in parallel (be careful with rate limits)
async queue(batch: MessageBatch<Task>): Promise<void> {
  const processPromises = batch.messages.map(async (message) => {
    try {
      await this.processMessage(message.body);
      message.ack();
    } catch (error) {
      message.retry({ delaySeconds: 60 });
    }
  });
  await Promise.all(processPromises);
}

// Process with rate limiting
async queue(batch: MessageBatch<ApiCall>): Promise<void> {
  const RATE_LIMIT = 10; // Max 10 concurrent operations

  for (let i = 0; i < batch.messages.length; i += RATE_LIMIT) {
    const chunk = batch.messages.slice(i, i + RATE_LIMIT);
    const chunkPromises = chunk.map(async (message) => {
      try {
        await this.makeApiCall(message.body);
        message.ack();
      } catch (error) {
        // `error` is `unknown` under strict TypeScript, so read `status` defensively
        const status = (error as { status?: number } | null)?.status;
        if (status === 429) { // Rate limited
          message.retry({ delaySeconds: 300 }); // Retry in 5 minutes
        } else {
          message.retry({ delaySeconds: 30 });
        }
      }
    });
    await Promise.all(chunkPromises);

    // Brief pause between chunks
    if (i + RATE_LIMIT < batch.messages.length) {
      await new Promise(resolve => setTimeout(resolve, 1000));
    }
  }
}

// Conditional batch acknowledgment
async queue(batch: MessageBatch<Transaction>): Promise<void> {
  let allSuccessful = true;
  const results: Array<{message: Message<Transaction>, success: boolean}> = [];

  // Process all messages first
  for (const message of batch.messages) {
    try {
      await this.processTransaction(message.body);
      results.push({ message, success: true });
    } catch (error) {
      results.push({ message, success: false });
      allSuccessful = false;
    }
  }

  // Handle results based on business logic
  if (allSuccessful) {
    batch.ackAll(); // All succeeded, acknowledge batch
  } else {
    // Individual handling
    results.forEach(({ message, success }) => {
      if (success) {
        message.ack();
      } else {
        message.retry({ delaySeconds: 120 });
      }
    });
  }
}
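
The chunked pattern above waits for the slowest call in each chunk before starting the next chunk. An alternative is a small worker pool that keeps a fixed number of calls in flight and starts a new one as soon as any finishes. The sketch below shows that technique in isolation. `forEachWithLimit` is a hypothetical helper, and the worker is expected to handle its own errors (for example by calling `ack()` or `retry()`), because a rejection would reject the whole call.

```typescript
// Runs `worker` over `items` with at most `limit` calls in flight at once.
// Hypothetical helper, not part of the framework.
async function forEachWithLimit<T>(
  items: readonly T[],
  limit: number,
  worker: (item: T) => Promise<void>,
): Promise<void> {
  if (!Number.isInteger(limit) || limit < 1) {
    throw new RangeError('limit must be a positive integer');
  }
  let next = 0; // index of the next item that has not been started yet

  // Each lane repeatedly claims the next unstarted item until none remain.
  const runLane = async (): Promise<void> => {
    while (next < items.length) {
      const index = next++;
      await worker(items[index] as T);
    }
  };

  const laneCount = Math.min(limit, items.length);
  await Promise.all(Array.from({ length: laneCount }, runLane));
}

// Usage: five simulated calls, at most two running at any moment.
const finished: number[] = [];
forEachWithLimit([1, 2, 3, 4, 5], 2, async (n) => {
  await new Promise<void>((resolve) => setTimeout(resolve, 10));
  finished.push(n);
}).then(() => console.log(finished.length)); // 5
```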

Retry Configuration

Configure retry behavior for failed messages with exponential backoff and maximum retry limits.

Message-Level Retries

Control retry behavior for individual messages:

// Basic retry
message.retry(); // Use default delay

// Retry with specific delay
message.retry({ delaySeconds: 60 });

// Retry with exponential backoff
const retryDelay = Math.min(
  Math.pow(2, message.attempts) * 30, // Doubles per attempt: 30s, 60s, 120s, 240s if attempts starts at 0
  300 // Cap at 5 minutes
);
message.retry({ delaySeconds: retryDelay });
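
When many messages fail at the same moment, identical backoff delays make them all retry together. Adding random jitter spreads the retries out. The sketch below uses the "equal jitter" variant: half of the capped exponential delay is fixed and the other half is random. `backoffDelaySeconds` is a hypothetical helper, and it assumes `attempts` is 1 on the first delivery; if the counter starts at 0, drop the `- 1`.

```typescript
// Capped exponential backoff with equal jitter. Hypothetical helper.
// Assumption: `attempts` is 1 for the first delivery.
function backoffDelaySeconds(
  attempts: number,
  baseSeconds = 30,
  capSeconds = 300,
  random: () => number = Math.random, // injectable for deterministic tests
): number {
  const exponent = Math.max(0, attempts - 1);
  const exponential = Math.min(capSeconds, baseSeconds * 2 ** exponent);
  const half = exponential / 2;
  // Result lies between half of the capped delay and the full capped delay.
  return Math.round(half + random() * half);
}

// With the random source pinned to 0 the result is the lower bound.
console.log(backoffDelaySeconds(1, 30, 300, () => 0)); // 15
console.log(backoffDelaySeconds(2, 30, 300, () => 0)); // 30
console.log(backoffDelaySeconds(5, 30, 300, () => 0)); // 150
```

The returned value can be passed directly as `delaySeconds` to `message.retry()`.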

Retry Strategies

Implement different retry patterns based on error types:

async processMessage(message: Message<Task>): Promise<void> {
  try {
    await this.performTask(message.body);
    message.ack();
  } catch (error) {
    const retryConfig = this.getRetryConfig(error, message.attempts);
    if (retryConfig.shouldRetry) {
      message.retry({ delaySeconds: retryConfig.delaySeconds });
    } else {
      await this.handlePermanentFailure(message.body, error);
      message.ack(); // Remove from queue
    }
  }
}

private getRetryConfig(error: Error, attempts: number): {shouldRetry: boolean, delaySeconds: number} {
  // Network/temporary errors - retry with exponential backoff
  if (error.message.includes('ECONNRESET') || error.message.includes('timeout')) {
    return {
      shouldRetry: attempts < 5,
      delaySeconds: Math.pow(2, attempts) * 30
    };
  }

  // Rate limiting - longer delays
  if (error.message.includes('rate limit') || error.message.includes('429')) {
    return {
      shouldRetry: attempts < 3,
      delaySeconds: 300 + (attempts * 300) // 5min, 10min, 15min
    };
  }

  // Validation errors - don't retry
  if (error.message.includes('validation') || error.message.includes('invalid')) {
    return {
      shouldRetry: false,
      delaySeconds: 0
    };
  }

  // Default retry strategy
  return {
    shouldRetry: attempts < 3,
    delaySeconds: 60
  };
}
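
Matching on substrings of `error.message` is fragile, because a wording change in an upstream library silently changes the retry behaviour. Where the failing code is under your control, throwing typed errors makes the classification explicit. The sketch below is one possible arrangement. `PermanentError`, `RateLimitError`, and `decideRetry` are hypothetical names, and the limits mirror the values used in the example above.

```typescript
// Hypothetical error types thrown by the task code itself.
class PermanentError extends Error {
  constructor(message: string) {
    super(message);
    this.name = 'PermanentError';
  }
}

class RateLimitError extends Error {
  readonly retryAfterSeconds: number;
  constructor(message: string, retryAfterSeconds: number) {
    super(message);
    this.name = 'RateLimitError';
    this.retryAfterSeconds = retryAfterSeconds;
  }
}

interface RetryDecision {
  shouldRetry: boolean;
  delaySeconds: number;
}

// Classifies a caught value by type instead of by message text.
function decideRetry(error: unknown, attempts: number): RetryDecision {
  if (error instanceof PermanentError) {
    return { shouldRetry: false, delaySeconds: 0 }; // retrying cannot help
  }
  if (error instanceof RateLimitError) {
    return { shouldRetry: attempts < 3, delaySeconds: error.retryAfterSeconds };
  }
  return { shouldRetry: attempts < 3, delaySeconds: 60 }; // default strategy
}

console.log(decideRetry(new PermanentError('bad payload'), 1).shouldRetry); // false
console.log(decideRetry(new RateLimitError('slow down', 120), 1).delaySeconds); // 120
console.log(decideRetry(new Error('socket closed'), 3).shouldRetry); // false
```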

Code Examples

Complete queue implementations demonstrating common messaging patterns and use cases.

Order Processing System

// manifest.hcl
application "ecommerce" {
  queue "order-processing" {}
  queue "inventory-updates" {}
  queue "email-notifications" {}

  service "orders" {
    domain = "orders.example.com"
  }

  sqlDatabase "orders_db" {}
}

interface OrderMessage {
  orderId: string;
  customerId: string;
  items: Array<{productId: string, quantity: number, price: number}>;
  totalAmount: number;
  paymentMethod: string;
  shippingAddress: any;
}

interface InventoryUpdate {
  productId: string;
  quantityChange: number;
  orderId: string;
  operation: 'reserve' | 'confirm' | 'release';
}

interface EmailNotification {
  recipient: string;
  template: string;
  data: any;
  priority: 'high' | 'normal' | 'low';
}

export default class extends Service<Env> {
  async fetch(request: Request): Promise<Response> {
    const url = new URL(request.url);
    if (url.pathname === '/orders' && request.method === 'POST') {
      return this.createOrder(request);
    }
    return new Response('Not found', { status: 404 });
  }

  private async createOrder(request: Request): Promise<Response> {
    const orderData = await request.json();
    try {
      // Create order record
      const order = await this.env.ORDERS_DB.prepare(`
        INSERT INTO orders (customer_id, total_amount, status, created_at)
        VALUES (?, ?, 'pending', ?) RETURNING id
      `).bind(
        orderData.customerId,
        orderData.totalAmount,
        Date.now()
      ).first();

      // Queue order processing
      await this.env.ORDER_PROCESSING.send({
        orderId: order.id,
        customerId: orderData.customerId,
        items: orderData.items,
        totalAmount: orderData.totalAmount,
        paymentMethod: orderData.paymentMethod,
        shippingAddress: orderData.shippingAddress
      });

      return Response.json({
        orderId: order.id,
        status: 'processing'
      });
    } catch (error) {
      return Response.json({
        error: 'Failed to create order'
      }, { status: 500 });
    }
  }

  // Process order queue
  async queue(batch: MessageBatch<OrderMessage>): Promise<void> {
    for (const message of batch.messages) {
      try {
        await this.processOrder(message.body);
        message.ack();
      } catch (error) {
        console.error(`Order processing failed: ${message.body.orderId}`, error);
        if (message.attempts < 3) {
          // Linear backoff: the delay grows by 60s with each attempt
          message.retry({ delaySeconds: 60 * message.attempts });
        } else {
          await this.handleFailedOrder(message.body);
          message.ack();
        }
      }
    }
  }

  private async processOrder(order: OrderMessage): Promise<void> {
    console.log(`Processing order: ${order.orderId}`);

    // Step 1: Reserve inventory
    await this.reserveInventory(order);

    // Step 2: Process payment
    await this.processPayment(order);

    // Step 3: Update order status
    await this.env.ORDERS_DB.prepare(
      'UPDATE orders SET status = ?, updated_at = ? WHERE id = ?'
    ).bind('confirmed', Date.now(), order.orderId).run();

    // Step 4: Send confirmation email
    await this.sendOrderConfirmation(order);

    console.log(`Order completed: ${order.orderId}`);
  }

  private async reserveInventory(order: OrderMessage): Promise<void> {
    const inventoryUpdates: MessageSendRequest<InventoryUpdate>[] = order.items.map(item => ({
      body: {
        productId: item.productId,
        quantityChange: -item.quantity,
        orderId: order.orderId,
        operation: 'reserve'
      }
    }));
    await this.env.INVENTORY_UPDATES.sendBatch(inventoryUpdates);
  }

  private async processPayment(order: OrderMessage): Promise<void> {
    // Simulate payment processing
    await new Promise(resolve => setTimeout(resolve, 1000));
    if (Math.random() < 0.1) { // 10% failure rate for demo
      throw new Error('Payment processing failed');
    }
  }

  private async sendOrderConfirmation(order: OrderMessage): Promise<void> {
    await this.env.EMAIL_NOTIFICATIONS.send({
      recipient: order.customerId,
      template: 'order_confirmation',
      data: {
        orderId: order.orderId,
        items: order.items,
        totalAmount: order.totalAmount
      },
      priority: 'high'
    });
  }

  private async handleFailedOrder(order: OrderMessage): Promise<void> {
    // Update order status
    await this.env.ORDERS_DB.prepare(
      'UPDATE orders SET status = ?, error_message = ?, updated_at = ? WHERE id = ?'
    ).bind('failed', 'Processing failed after retries', Date.now(), order.orderId).run();

    // Release reserved inventory
    const releaseUpdates: MessageSendRequest<InventoryUpdate>[] = order.items.map(item => ({
      body: {
        productId: item.productId,
        quantityChange: item.quantity,
        orderId: order.orderId,
        operation: 'release'
      }
    }));
    await this.env.INVENTORY_UPDATES.sendBatch(releaseUpdates);

    // Send failure notification
    await this.env.EMAIL_NOTIFICATIONS.send({
      recipient: order.customerId,
      template: 'order_failed',
      data: { orderId: order.orderId },
      priority: 'high'
    });
  }
}

Email Notification Service

export default class extends Service<Env> {
  async queue(batch: MessageBatch<EmailNotification>): Promise<void> {
    // Group by priority: high -> normal -> low
    const highPriority = batch.messages.filter(m => m.body.priority === 'high');
    const normalPriority = batch.messages.filter(m => m.body.priority === 'normal');
    const lowPriority = batch.messages.filter(m => m.body.priority === 'low');

    // Process each priority group in order, high priority first
    await this.processEmailBatch(highPriority);
    await this.processEmailBatch(normalPriority);
    await this.processEmailBatch(lowPriority);
  }

  private async processEmailBatch(messages: Message<EmailNotification>[]): Promise<void> {
    const CONCURRENT_LIMIT = 5;

    for (let i = 0; i < messages.length; i += CONCURRENT_LIMIT) {
      const chunk = messages.slice(i, i + CONCURRENT_LIMIT);
      const chunkPromises = chunk.map(async (message) => {
        try {
          await this.sendEmail(message.body);
          message.ack();
        } catch (error) {
          await this.handleEmailError(message, error);
        }
      });
      await Promise.all(chunkPromises);
    }
  }

  private async sendEmail(notification: EmailNotification): Promise<void> {
    const template = await this.getEmailTemplate(notification.template);
    const renderedEmail = await this.renderTemplate(template, notification.data);

    // Send via email service (e.g., SendGrid, SES)
    await this.env.EMAIL_PROVIDER.send({
      to: notification.recipient,
      subject: renderedEmail.subject,
      html: renderedEmail.html,
      text: renderedEmail.text
    });

    console.log(`Email sent: ${notification.template} to ${notification.recipient}`);
  }

  private async handleEmailError(message: Message<EmailNotification>, error: Error): Promise<void> {
    if (error.message.includes('invalid email') || error.message.includes('bounced')) {
      // Don't retry invalid emails
      await this.logEmailError(message.body, error, 'permanent');
      message.ack();
    } else if (message.attempts < 3) {
      // Retry transient errors
      const delay = message.body.priority === 'high' ? 60 : 300; // Faster retry for high priority
      message.retry({ delaySeconds: delay });
    } else {
      // Max retries exceeded
      await this.logEmailError(message.body, error, 'failed');
      message.ack();
    }
  }
}

## raindrop.manifest

Configure message queues in your manifest for reliable asynchronous processing:

```hcl
application "async-app" {
  queue "user-notifications" {
    # Queue for sending notifications to users
  }

  queue "data-processing" {
    # Queue for background data processing tasks
  }

  queue "email-delivery" {
    # Queue for email delivery and notifications
  }

  service "producer" {
    domain = "api.example.com"
    # Service can send messages to queues via env.USER_NOTIFICATIONS, etc.
  }

  service "consumer" {
    # Service with queue() method to process messages from all queues
  }

  actor "queue-manager" {
    # Actors can also send and receive queue messages
  }
}