Queue
Queue
Overview
Queues provide asynchronous message processing with reliable delivery guarantees and automatic retry mechanisms. Each queue handles messages independently with configurable processing delays and batch operations for high-throughput scenarios.
Queues excel at decoupling system components, handling background processing, and managing workload distribution where message ordering and delivery reliability are important. They provide durability guarantees and automatic failure handling.
Key benefits include asynchronous message processing, automatic retry mechanisms, batch operation support, and configurable delay scheduling.
Prerequisites
- Active Raindrop project with manifest configuration
- Understanding of message queue concepts and asynchronous processing
- Familiarity with TypeScript generics and async patterns
- Knowledge of producer-consumer patterns and message handling
Creating/Getting Started
Define queues in your manifest file to enable message processing:
application "messaging-app" { queue "user-notifications" {} queue "data-processing" {}
service "api" { domain = "api.example.com" }}
Generate queue bindings and handlers:
raindrop build generate
This creates typed environment bindings:
interface Env { USER_NOTIFICATIONS: Queue<NotificationMessage>; DATA_PROCESSING: Queue<ProcessingTask>;}
Accessing/Basic Usage
Send messages to queues through environment bindings with type safety:
interface NotificationMessage { userId: string; type: 'email' | 'sms' | 'push'; content: string; priority: number;}
export default class extends Service<Env> { async fetch(request: Request): Promise<Response> { const { userId, message } = await request.json();
// Send single message await this.env.USER_NOTIFICATIONS.send({ userId, type: 'email', content: message, priority: 1 });
return Response.json({ status: 'queued' }); }}
Process messages using queue consumers:
export default class extends Service<Env> { async queue(batch: MessageBatch<NotificationMessage>): Promise<void> { for (const message of batch.messages) { try { await this.processNotification(message.body); message.ack(); // Mark message as successfully processed } catch (error) { console.error('Failed to process notification:', error); message.retry({ delaySeconds: 30 }); // Retry after 30 seconds } } }
private async processNotification(notification: NotificationMessage): Promise<void> { switch (notification.type) { case 'email': await this.sendEmail(notification); break; case 'sms': await this.sendSMS(notification); break; case 'push': await this.sendPushNotification(notification); break; } }}
Core Concepts
Queues implement a producer-consumer pattern with reliable message delivery and automatic retry handling for failed messages.
Message Durability: Messages persist in the queue until successfully processed or exceed retry limits, ensuring reliable delivery.
Content Types: Queues support multiple content types (text, bytes, JSON, v8) with automatic serialization and deserialization.
Batch Processing: Consumer handlers receive message batches for efficient processing of multiple messages together.
Retry Mechanisms: Failed messages automatically retry with configurable delays and exponential backoff strategies.
Queue Interface
The Queue<Body>
interface provides type-safe message operations:
interface Queue<Body = unknown> { send(message: Body, options?: QueueSendOptions): Promise<void>; sendBatch(messages: Iterable<MessageSendRequest<Body>>, options?: QueueSendBatchOptions): Promise<void>;}
Generic Type:
Body
: Message body type for type safety and serialization
Message Structure
Messages contain metadata, processing information, and acknowledgment controls:
interface Message<Body = unknown> { readonly id: string; // Unique message identifier readonly timestamp: Date; // When message was sent readonly body: Body; // Message content readonly attempts: number; // Processing attempt count
retry(options?: QueueRetryOptions): void; // Schedule retry ack(): void; // Mark as processed}
Message Sending
Send individual messages or batches with optional processing delays and content type specifications.
send(message, options?)
Send a single message to the queue with optional configuration:
interface ProcessingTask { taskId: string; operation: string; data: any; priority: number;}
// Send with default optionsawait this.env.DATA_PROCESSING.send({ taskId: 'task-123', operation: 'image-resize', data: { imageUrl: 'https://example.com/image.jpg' }, priority: 1});
// Send with custom optionsawait this.env.DATA_PROCESSING.send({ taskId: 'urgent-task-456', operation: 'video-transcode', data: { videoUrl: 'https://example.com/video.mp4' }, priority: 5}, { contentType: 'json', delaySeconds: 300 // Process in 5 minutes});
Parameters:
message
:Body
- Message content of the specified typeoptions
:QueueSendOptions?
- Optional sending configuration
Options:
contentType
:'text' | 'bytes' | 'json' | 'v8'
- Message serialization formatdelaySeconds
:number
- Delay before message becomes available for processing
sendBatch(messages, options?)
Send multiple messages efficiently in a single operation:
interface EmailMessage { recipient: string; subject: string; body: string; template?: string;}
const emailBatch: MessageSendRequest<EmailMessage>[] = [ { body: { recipient: 'user1@example.com', subject: 'Welcome!', body: 'Welcome to our service', template: 'welcome' }, delaySeconds: 0 }, { body: { recipient: 'user2@example.com', subject: 'Password Reset', body: 'Reset your password here', template: 'password-reset' }, delaySeconds: 60 // Send in 1 minute }];
await this.env.EMAIL_QUEUE.sendBatch(emailBatch, { delaySeconds: 30 // Default delay for all messages});
Parameters:
messages
:Iterable<MessageSendRequest<Body>>
- Collection of messages to sendoptions
:QueueSendBatchOptions?
- Batch-level configuration
MessageSendRequest Structure:
body
:Body
- Message contentcontentType
:QueueContentType?
- Message format overridedelaySeconds
:number?
- Individual message delay
Message Processing
Handle queued messages through consumer functions that process message batches with acknowledgment controls.
queue(batch)
Process message batches with individual message control:
export default class extends Service<Env> { async queue(batch: MessageBatch<ProcessingTask>): Promise<void> { console.log(`Processing batch of ${batch.messages.length} messages from ${batch.queue}`);
for (const message of batch.messages) { const startTime = Date.now();
try { await this.processTask(message.body);
const processingTime = Date.now() - startTime; console.log(`Task ${message.body.taskId} completed in ${processingTime}ms`);
message.ack(); // Mark as successfully processed
} catch (error) { console.error(`Task ${message.body.taskId} failed:`, error);
if (message.attempts < 3) { // Retry with exponential backoff const delaySeconds = Math.pow(2, message.attempts) * 30; message.retry({ delaySeconds }); } else { // Max retries exceeded - handle failure await this.handleFailedTask(message.body, error); message.ack(); // Acknowledge to remove from queue } } } }
private async processTask(task: ProcessingTask): Promise<void> { switch (task.operation) { case 'image-resize': await this.resizeImage(task.data); break; case 'video-transcode': await this.transcodeVideo(task.data); break; case 'generate-thumbnail': await this.generateThumbnail(task.data); break; default: throw new Error(`Unknown operation: ${task.operation}`); } }
private async handleFailedTask(task: ProcessingTask, error: Error): Promise<void> { // Log to dead letter queue or error tracking system await this.env.ERROR_LOG.prepare( 'INSERT INTO failed_tasks (task_id, operation, error, attempts) VALUES (?, ?, ?, ?)' ).bind(task.taskId, task.operation, error.message, 3).run(); }}
Batch Processing Patterns
Handle different batch processing scenarios effectively:
// Process messages in parallel (be careful with rate limits)async queue(batch: MessageBatch<Task>): Promise<void> { const processPromises = batch.messages.map(async (message) => { try { await this.processMessage(message.body); message.ack(); } catch (error) { message.retry({ delaySeconds: 60 }); } });
await Promise.all(processPromises);}
// Process with rate limitingasync queue(batch: MessageBatch<ApiCall>): Promise<void> { const RATE_LIMIT = 10; // Max 10 concurrent operations
for (let i = 0; i < batch.messages.length; i += RATE_LIMIT) { const chunk = batch.messages.slice(i, i + RATE_LIMIT);
const chunkPromises = chunk.map(async (message) => { try { await this.makeApiCall(message.body); message.ack(); } catch (error) { if (error.status === 429) { // Rate limited message.retry({ delaySeconds: 300 }); // Retry in 5 minutes } else { message.retry({ delaySeconds: 30 }); } } });
await Promise.all(chunkPromises);
// Brief pause between chunks if (i + RATE_LIMIT < batch.messages.length) { await new Promise(resolve => setTimeout(resolve, 1000)); } }}
// Conditional batch acknowledgmentasync queue(batch: MessageBatch<Transaction>): Promise<void> { let allSuccessful = true; const results: Array<{message: Message<Transaction>, success: boolean}> = [];
// Process all messages first for (const message of batch.messages) { try { await this.processTransaction(message.body); results.push({ message, success: true }); } catch (error) { results.push({ message, success: false }); allSuccessful = false; } }
// Handle results based on business logic if (allSuccessful) { batch.ackAll(); // All succeeded, acknowledge batch } else { // Individual handling results.forEach(({ message, success }) => { if (success) { message.ack(); } else { message.retry({ delaySeconds: 120 }); } }); }}
Retry Configuration
Configure retry behavior for failed messages with exponential backoff and maximum retry limits.
Message-Level Retries
Control retry behavior for individual messages:
// Basic retrymessage.retry(); // Use default delay
// Retry with specific delaymessage.retry({ delaySeconds: 60 });
// Retry with exponential backoffconst retryDelay = Math.min( Math.pow(2, message.attempts) * 30, // Exponential: 30s, 60s, 120s, 240s... 300 // Cap at 5 minutes);message.retry({ delaySeconds: retryDelay });
Retry Strategies
Implement different retry patterns based on error types:
async processMessage(message: Message<Task>): Promise<void> { try { await this.performTask(message.body); message.ack(); } catch (error) { const retryConfig = this.getRetryConfig(error, message.attempts);
if (retryConfig.shouldRetry) { message.retry({ delaySeconds: retryConfig.delaySeconds }); } else { await this.handlePermanentFailure(message.body, error); message.ack(); // Remove from queue } }}
private getRetryConfig(error: Error, attempts: number): {shouldRetry: boolean, delaySeconds: number} { // Network/temporary errors - retry with exponential backoff if (error.message.includes('ECONNRESET') || error.message.includes('timeout')) { return { shouldRetry: attempts < 5, delaySeconds: Math.pow(2, attempts) * 30 }; }
// Rate limiting - longer delays if (error.message.includes('rate limit') || error.message.includes('429')) { return { shouldRetry: attempts < 3, delaySeconds: 300 + (attempts * 300) // 5min, 10min, 15min }; }
// Validation errors - don't retry if (error.message.includes('validation') || error.message.includes('invalid')) { return { shouldRetry: false, delaySeconds: 0 }; }
// Default retry strategy return { shouldRetry: attempts < 3, delaySeconds: 60 };}
Code Examples
Complete queue implementations demonstrating common messaging patterns and use cases.
Order Processing System
// manifest.hclapplication "ecommerce" { queue "order-processing" {} queue "inventory-updates" {} queue "email-notifications" {}
service "orders" { domain = "orders.example.com" }
sqlDatabase "orders_db" {}}
interface OrderMessage { orderId: string; customerId: string; items: Array<{productId: string, quantity: number, price: number}>; totalAmount: number; paymentMethod: string; shippingAddress: any;}
interface InventoryUpdate { productId: string; quantityChange: number; orderId: string; operation: 'reserve' | 'confirm' | 'release';}
interface EmailNotification { recipient: string; template: string; data: any; priority: 'high' | 'normal' | 'low';}
export default class extends Service<Env> { async fetch(request: Request): Promise<Response> { const url = new URL(request.url);
if (url.pathname === '/orders' && request.method === 'POST') { return this.createOrder(request); }
return new Response('Not found', { status: 404 }); }
private async createOrder(request: Request): Promise<Response> { const orderData = await request.json();
try { // Create order record const order = await this.env.ORDERS_DB.prepare(` INSERT INTO orders (customer_id, total_amount, status, created_at) VALUES (?, ?, 'pending', ?) RETURNING id `).bind( orderData.customerId, orderData.totalAmount, Date.now() ).first();
// Queue order processing await this.env.ORDER_PROCESSING.send({ orderId: order.id, customerId: orderData.customerId, items: orderData.items, totalAmount: orderData.totalAmount, paymentMethod: orderData.paymentMethod, shippingAddress: orderData.shippingAddress });
return Response.json({ orderId: order.id, status: 'processing' });
} catch (error) { return Response.json({ error: 'Failed to create order' }, { status: 500 }); } }
// Process order queue async queue(batch: MessageBatch<OrderMessage>): Promise<void> { for (const message of batch.messages) { try { await this.processOrder(message.body); message.ack(); } catch (error) { console.error(`Order processing failed: ${message.body.orderId}`, error);
if (message.attempts < 3) { message.retry({ delaySeconds: 60 * message.attempts }); // 1min, 2min, 3min } else { await this.handleFailedOrder(message.body); message.ack(); } } } }
private async processOrder(order: OrderMessage): Promise<void> { console.log(`Processing order: ${order.orderId}`);
// Step 1: Reserve inventory await this.reserveInventory(order);
// Step 2: Process payment await this.processPayment(order);
// Step 3: Update order status await this.env.ORDERS_DB.prepare( 'UPDATE orders SET status = ?, updated_at = ? WHERE id = ?' ).bind('confirmed', Date.now(), order.orderId).run();
// Step 4: Send confirmation email await this.sendOrderConfirmation(order);
console.log(`Order completed: ${order.orderId}`); }
private async reserveInventory(order: OrderMessage): Promise<void> { const inventoryUpdates: MessageSendRequest<InventoryUpdate>[] = order.items.map(item => ({ body: { productId: item.productId, quantityChange: -item.quantity, orderId: order.orderId, operation: 'reserve' } }));
await this.env.INVENTORY_UPDATES.sendBatch(inventoryUpdates); }
private async processPayment(order: OrderMessage): Promise<void> { // Simulate payment processing await new Promise(resolve => setTimeout(resolve, 1000));
if (Math.random() < 0.1) { // 10% failure rate for demo throw new Error('Payment processing failed'); } }
private async sendOrderConfirmation(order: OrderMessage): Promise<void> { await this.env.EMAIL_NOTIFICATIONS.send({ recipient: order.customerId, template: 'order_confirmation', data: { orderId: order.orderId, items: order.items, totalAmount: order.totalAmount }, priority: 'high' }); }
private async handleFailedOrder(order: OrderMessage): Promise<void> { // Update order status await this.env.ORDERS_DB.prepare( 'UPDATE orders SET status = ?, error_message = ?, updated_at = ? WHERE id = ?' ).bind('failed', 'Processing failed after retries', Date.now(), order.orderId).run();
// Release reserved inventory const releaseUpdates: MessageSendRequest<InventoryUpdate>[] = order.items.map(item => ({ body: { productId: item.productId, quantityChange: item.quantity, orderId: order.orderId, operation: 'release' } }));
await this.env.INVENTORY_UPDATES.sendBatch(releaseUpdates);
// Send failure notification await this.env.EMAIL_NOTIFICATIONS.send({ recipient: order.customerId, template: 'order_failed', data: { orderId: order.orderId }, priority: 'high' }); }}
export default class extends Service<Env> { async queue(batch: MessageBatch<InventoryUpdate>): Promise<void> { // Group updates by product for batch processing const updatesByProduct = new Map<string, InventoryUpdate[]>();
for (const message of batch.messages) { const productId = message.body.productId; if (!updatesByProduct.has(productId)) { updatesByProduct.set(productId, []); } updatesByProduct.get(productId)!.push(message.body); }
// Process each product's updates for (const [productId, updates] of updatesByProduct) { try { await this.processProductUpdates(productId, updates);
// Acknowledge all messages for this product batch.messages .filter(m => m.body.productId === productId) .forEach(m => m.ack());
} catch (error) { console.error(`Inventory update failed for product ${productId}:`, error);
// Retry failed updates batch.messages .filter(m => m.body.productId === productId) .forEach(m => { if (m.attempts < 5) { m.retry({ delaySeconds: 30 }); } else { this.logInventoryError(m.body, error); m.ack(); // Remove from queue after max retries } }); } } }
private async processProductUpdates(productId: string, updates: InventoryUpdate[]): Promise<void> { const totalChange = updates.reduce((sum, update) => sum + update.quantityChange, 0);
// Update inventory atomically const result = await this.env.INVENTORY_DB.prepare(` UPDATE inventory SET quantity = quantity + ?, reserved = CASE WHEN ? < 0 THEN reserved + ABS(?) ELSE reserved - ABS(?) END, updated_at = ? WHERE product_id = ? RETURNING quantity, reserved `).bind( totalChange, totalChange, // For CASE condition totalChange, // Reserve amount totalChange, // Release amount Date.now(), productId ).first();
if (!result) { throw new Error(`Product ${productId} not found in inventory`); }
if (result.quantity < 0) { throw new Error(`Insufficient inventory for product ${productId}`); }
console.log(`Updated inventory for ${productId}: quantity=${result.quantity}, reserved=${result.reserved}`); }
private async logInventoryError(update: InventoryUpdate, error: Error): Promise<void> { await this.env.ERROR_LOG.prepare(` INSERT INTO inventory_errors (product_id, order_id, operation, quantity_change, error_message, created_at) VALUES (?, ?, ?, ?, ?, ?) `).bind( update.productId, update.orderId, update.operation, update.quantityChange, error.message, Date.now() ).run(); }}
Email Notification Service
export default class extends Service<Env> { async queue(batch: MessageBatch<EmailNotification>): Promise<void> { // Sort by priority: high -> normal -> low const sortedMessages = [...batch.messages].sort((a, b) => { const priorityOrder = { high: 3, normal: 2, low: 1 }; return priorityOrder[b.body.priority] - priorityOrder[a.body.priority]; });
// Process high priority emails first const highPriority = sortedMessages.filter(m => m.body.priority === 'high'); const normalPriority = sortedMessages.filter(m => m.body.priority === 'normal'); const lowPriority = sortedMessages.filter(m => m.body.priority === 'low');
// Process each priority group await this.processEmailBatch(highPriority); await this.processEmailBatch(normalPriority); await this.processEmailBatch(lowPriority); }
private async processEmailBatch(messages: Message<EmailNotification>[]): Promise<void> { const CONCURRENT_LIMIT = 5;
for (let i = 0; i < messages.length; i += CONCURRENT_LIMIT) { const chunk = messages.slice(i, i + CONCURRENT_LIMIT);
const chunkPromises = chunk.map(async (message) => { try { await this.sendEmail(message.body); message.ack(); } catch (error) { await this.handleEmailError(message, error); } });
await Promise.all(chunkPromises); } }
private async sendEmail(notification: EmailNotification): Promise<void> { const template = await this.getEmailTemplate(notification.template); const renderedEmail = await this.renderTemplate(template, notification.data);
// Send via email service (e.g., SendGrid, SES) await this.env.EMAIL_PROVIDER.send({ to: notification.recipient, subject: renderedEmail.subject, html: renderedEmail.html, text: renderedEmail.text });
console.log(`Email sent: ${notification.template} to ${notification.recipient}`); }
private async handleEmailError(message: Message<EmailNotification>, error: Error): Promise<void> { if (error.message.includes('invalid email') || error.message.includes('bounced')) { // Don't retry invalid emails await this.logEmailError(message.body, error, 'permanent'); message.ack(); } else if (message.attempts < 3) { // Retry transient errors const delay = message.body.priority === 'high' ? 60 : 300; // Faster retry for high priority message.retry({ delaySeconds: delay }); } else { // Max retries exceeded await this.logEmailError(message.body, error, 'failed'); message.ack(); } }}
## raindrop.manifest
Configure message queues in your manifest for reliable asynchronous processing:
```hclapplication "async-app" { queue "user-notifications" { # Queue for sending notifications to users }
queue "data-processing" { # Queue for background data processing tasks }
queue "email-delivery" { # Queue for email delivery and notifications }
service "producer" { domain = "api.example.com" # Service can send messages to queues via env.USER_NOTIFICATIONS, etc. }
service "consumer" { # Service with queue() method to process messages from all queues }
actor "queue-manager" { # Actors can also send and receive queue messages }}