Queue
This content is for the 0.6.3 version. Switch to the latest version for up-to-date documentation.
Queue provides durable message queuing for asynchronous processing. You send messages to a queue, and they get processed by consumer functions at a later time.
Queues work with Observers to handle message processing. Only a single observer per queue is allowed. When messages arrive, your observer function receives them for processing.
Messages support multiple content types including text, JSON, binary data, and V8-serialized objects. The system handles message persistence, delivery guarantees, and automatic retries.
Creating
Define queues in your raindrop.manifest
:
application "demo-app" { queue "demo-queue" {}}
Accessing
Access your queue through environment bindings in your code:
export default { async fetch(request, env) { await env.DEMO_QUEUE.send({ userId: 123, action: "process" }); return new Response("Message sent"); }};
Core Concepts
Main Interfaces
- Queue - Send individual messages or batches to the queue
- Message - Represents a received message with processing controls
- MessageBatch - Group of messages processed together
Core Data Types
QueueContentType
// Supported message content formatstype QueueContentType = 'text' | 'bytes' | 'json' | 'v8';
MessageSendRequest
// Configuration for sending a message in a batchinterface MessageSendRequest<Body = unknown> { body: Body; // Message content contentType?: QueueContentType; // Content format delaySeconds?: number; // Processing delay}
QueueSendOptions
// Options for sending individual messagesinterface QueueSendOptions { contentType?: QueueContentType; // Message format delaySeconds?: number; // Processing delay (default: 0)}
QueueSendBatchOptions
// Options for batch message sendinginterface QueueSendBatchOptions { delaySeconds?: number; // Delay for all messages in batch}
QueueRetryOptions
// Configuration for message retry attemptsinterface QueueRetryOptions { delaySeconds?: number; // Delay before retry}
System Limits
- Maximum message size: 128KB
- Maximum consumer batch size: 100 messages
- Maximum messages per sendBatch call: 100 (or 256KB total)
- Maximum message retries: 100
- Maximum batch wait time: 60 seconds
- Per-queue throughput: 5,000 messages per second
- Message retention period: 4 days (default)
send
send(message: Body, options?: QueueSendOptions): Promise<void>
Promise<void> // Resolves when message is queued
Example
Send a JSON message to the queue:
// Send user registration data for processingawait env.USER_QUEUE.send( { userId: 456, email: "user@example.com", action: "welcome" }, { contentType: "json", delaySeconds: 30 });
sendBatch
sendBatch( messages: Iterable<MessageSendRequest<Body>>, options?: QueueSendBatchOptions): Promise<void>
Promise<void> // Resolves when all messages are queued
Example
Send multiple messages with different configurations:
// Send batch of notification messagesconst messages = [ { body: { type: "email", recipient: "user1@example.com" }, contentType: "json" }, { body: { type: "sms", recipient: "+1234567890" }, delaySeconds: 60 }, { body: "Push notification content", contentType: "text" }];
await env.NOTIFICATION_QUEUE.sendBatch(messages, { delaySeconds: 10 });
ack
ack(): void
void // Marks message as successfully processed
Example
Acknowledge successful message processing:
// Process message and acknowledge completionexport default { async queue(batch, env) { for (const message of batch.messages) { const data = message.body; await processUserData(data); message.ack(); // Mark as successfully processed } }};
retry
retry(options?: QueueRetryOptions): void
void // Schedules message for retry processing
Example
Retry message processing with delay:
// Retry failed message processing after 120 secondsexport default { async queue(batch, env) { for (const message of batch.messages) { try { await processMessage(message.body); message.ack(); } catch (error) { message.retry({ delaySeconds: 120 }); } } }};
ackAll
ackAll(): void
void // Marks all messages in batch as processed
Example
Acknowledge all messages in a batch:
// Process entire batch and acknowledge all messagesexport default { async queue(batch, env) { const results = await Promise.allSettled( batch.messages.map(msg => processMessage(msg.body)) );
if (results.every(r => r.status === "fulfilled")) { batch.ackAll(); // All messages processed successfully } }};
retryAll
retryAll(options?: QueueRetryOptions): void
void // Schedules all messages for retry processing
Example
Retry all messages in a batch:
// Retry entire batch if processing failsexport default { async queue(batch, env) { try { await processBatch(batch.messages); batch.ackAll(); } catch (error) { batch.retryAll({ delaySeconds: 300 }); // Retry in 5 minutes } }};