Queue

This content is for the 0.6.3 version. Switch to the latest version for up-to-date documentation.

Queue provides durable message queuing for asynchronous processing. You send messages to a queue, and consumer functions process them later.

Queues rely on Observers to process messages, and each queue can have only one observer. When messages arrive, your observer function receives them for processing.

Messages support four content types: text, JSON, binary data, and V8-serialized objects. The system handles message persistence, delivery guarantees, and automatic retries.

Creating

Define queues in your raindrop.manifest:

application "demo-app" {
  queue "demo-queue" {}
}

Accessing

Access your queue through environment bindings in your code:

export default {
  async fetch(request, env) {
    await env.DEMO_QUEUE.send({ userId: 123, action: "process" });
    return new Response("Message sent");
  }
};

Core Concepts

Main Interfaces

  • Queue - Send individual messages or batches to the queue
  • Message - Represents a received message with processing controls
  • MessageBatch - Group of messages processed together

Core Data Types

QueueContentType

// Supported message content formats
type QueueContentType = 'text' | 'bytes' | 'json' | 'v8';
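
When bodies of different shapes flow through the same code path, it can help to pick the content type in one place. The sketch below is a hypothetical helper, not part of the Queue API: the name inferContentType and the mapping (strings to 'text', binary buffers to 'bytes', everything else to 'json') are assumptions for illustration, and 'v8' is never chosen automatically.

```typescript
// Same union as the QueueContentType definition above.
type QueueContentType = 'text' | 'bytes' | 'json' | 'v8';

// Hypothetical helper (not part of the Queue API): choose a content type
// from the runtime shape of the message body.
function inferContentType(body: unknown): QueueContentType {
  // Plain strings are sent as text.
  if (typeof body === 'string') return 'text';
  // ArrayBuffer and typed arrays / DataView are treated as raw bytes.
  if (body instanceof ArrayBuffer || ArrayBuffer.isView(body)) return 'bytes';
  // Everything else is assumed to be JSON-serializable.
  return 'json';
}
```

The result could then be passed as the contentType field of QueueSendOptions when calling send.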

MessageSendRequest

// Configuration for sending a message in a batch
interface MessageSendRequest<Body = unknown> {
  body: Body; // Message content
  contentType?: QueueContentType; // Content format
  delaySeconds?: number; // Processing delay
}

QueueSendOptions

// Options for sending individual messages
interface QueueSendOptions {
  contentType?: QueueContentType; // Message format
  delaySeconds?: number; // Processing delay (default: 0)
}

QueueSendBatchOptions

// Options for batch message sending
interface QueueSendBatchOptions {
  delaySeconds?: number; // Delay for all messages in batch
}

QueueRetryOptions

// Configuration for message retry attempts
interface QueueRetryOptions {
  delaySeconds?: number; // Delay before retry
}
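
A fixed retry delay is the simplest choice, but a delay that grows with each attempt is a common alternative. The following is a minimal sketch of capped exponential backoff that produces a QueueRetryOptions value. The helper name backoffRetryOptions, the 10-second base, and the one-hour cap are illustrative assumptions, and the attempt number is a plain parameter because this page does not document how a consumer learns the attempt count.

```typescript
// Same shape as the QueueRetryOptions definition above.
interface QueueRetryOptions {
  delaySeconds?: number;
}

// Hypothetical helper: capped exponential backoff.
// attempt is 1-based; baseSeconds and maxSeconds are illustrative defaults,
// not values taken from the platform.
function backoffRetryOptions(
  attempt: number,
  baseSeconds = 10,
  maxSeconds = 3600
): QueueRetryOptions {
  // The first attempt uses the base delay; each later attempt doubles it.
  const exponent = Math.max(0, Math.floor(attempt) - 1);
  const delaySeconds = Math.min(maxSeconds, baseSeconds * Math.pow(2, exponent));
  return { delaySeconds };
}
```

With the defaults, attempts 1, 2, and 3 yield delays of 10, 20, and 40 seconds, and the delay never exceeds 3600 seconds.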

System Limits

  • Maximum message size: 128KB
  • Maximum consumer batch size: 100 messages
  • Maximum messages per sendBatch call: 100 (or 256KB total)
  • Maximum message retries: 100
  • Maximum batch wait time: 60 seconds
  • Per-queue throughput: 5,000 messages per second
  • Message retention period: 4 days (default)
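
The sendBatch limits above mean a large list of messages has to be split before sending. The sketch below shows one way to do that. The helper names (chunkForSendBatch, estimateBytes) are hypothetical, and two points are assumptions rather than documented behavior: message size is approximated as the UTF-8 length of the JSON-encoded body, and "KB" is read conservatively as 1,000 bytes.

```typescript
// Limits taken from the list above; how the platform counts bytes is an assumption.
const MAX_BATCH_MESSAGES = 100;
const MAX_BATCH_BYTES = 256 * 1000; // conservative reading of "256KB"
const MAX_MESSAGE_BYTES = 128 * 1000; // conservative reading of "128KB"

interface OutgoingMessage<Body = unknown> {
  body: Body;
  delaySeconds?: number;
}

// Approximate a message's size as the UTF-8 length of its JSON encoding.
function estimateBytes(body: unknown): number {
  return new TextEncoder().encode(JSON.stringify(body)).length;
}

// Hypothetical helper: split messages into chunks that each respect
// the per-call message count and total size limits.
function chunkForSendBatch<Body>(
  messages: OutgoingMessage<Body>[]
): OutgoingMessage<Body>[][] {
  const chunks: OutgoingMessage<Body>[][] = [];
  let current: OutgoingMessage<Body>[] = [];
  let currentBytes = 0;

  for (const message of messages) {
    const size = estimateBytes(message.body);
    if (size > MAX_MESSAGE_BYTES) {
      throw new RangeError("message of ~" + size + " bytes exceeds the per-message limit");
    }
    // Start a new chunk when adding this message would break either limit.
    const wouldOverflow =
      current.length >= MAX_BATCH_MESSAGES || currentBytes + size > MAX_BATCH_BYTES;
    if (wouldOverflow && current.length > 0) {
      chunks.push(current);
      current = [];
      currentBytes = 0;
    }
    current.push(message);
    currentBytes += size;
  }
  if (current.length > 0) chunks.push(current);
  return chunks;
}
```

Each returned chunk can then be passed to a separate sendBatch call, for example by looping over the chunks and awaiting env.DEMO_QUEUE.sendBatch(chunk) for each one.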

send

send(message: Body, options?: QueueSendOptions): Promise<void>

Example

Send a JSON message to the queue:

// Send user registration data for processing
await env.USER_QUEUE.send(
  { userId: 456, email: "user@example.com", action: "welcome" },
  { contentType: "json", delaySeconds: 30 }
);

sendBatch

sendBatch(
  messages: Iterable<MessageSendRequest<Body>>,
  options?: QueueSendBatchOptions
): Promise<void>

Example

Send multiple messages with different configurations:

// Send batch of notification messages
const messages = [
  { body: { type: "email", recipient: "user1@example.com" }, contentType: "json" },
  { body: { type: "sms", recipient: "+1234567890" }, delaySeconds: 60 },
  { body: "Push notification content", contentType: "text" }
];
await env.NOTIFICATION_QUEUE.sendBatch(messages, { delaySeconds: 10 });

ack

ack(): void

Example

Acknowledge successful message processing:

// Process message and acknowledge completion
export default {
  async queue(batch, env) {
    for (const message of batch.messages) {
      const data = message.body;
      await processUserData(data);
      message.ack(); // Mark as successfully processed
    }
  }
};

retry

retry(options?: QueueRetryOptions): void

Example

Retry message processing with delay:

// Retry failed message processing after 120 seconds
export default {
  async queue(batch, env) {
    for (const message of batch.messages) {
      try {
        await processMessage(message.body);
        message.ack();
      } catch (error) {
        message.retry({ delaySeconds: 120 });
      }
    }
  }
};

ackAll

ackAll(): void

Example

Acknowledge all messages in a batch:

// Process entire batch and acknowledge all messages
export default {
  async queue(batch, env) {
    const results = await Promise.allSettled(
      batch.messages.map(msg => processMessage(msg.body))
    );
    if (results.every(r => r.status === "fulfilled")) {
      batch.ackAll(); // All messages processed successfully
    } else {
      batch.retryAll(); // At least one message failed: retry the whole batch
    }
  }
};

retryAll

retryAll(options?: QueueRetryOptions): void

Example

Retry all messages in a batch:

// Retry entire batch if processing fails
export default {
  async queue(batch, env) {
    try {
      await processBatch(batch.messages);
      batch.ackAll();
    } catch (error) {
      batch.retryAll({ delaySeconds: 300 }); // Retry in 5 minutes
    }
  }
};
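
The consumer patterns above can be exercised locally with in-memory stand-ins for Message and MessageBatch. The sketch below is a test double, not the platform implementation: FakeMessage, FakeBatch, and the outcome field are hypothetical names, and the rule that the first ack or retry call on a message wins is an assumption of this fake rather than documented behavior.

```typescript
// Same shape as the QueueRetryOptions definition earlier on this page.
interface QueueRetryOptions {
  delaySeconds?: number;
}

type Outcome =
  | { kind: 'pending' }
  | { kind: 'acked' }
  | { kind: 'retried'; delaySeconds: number | undefined };

// Hypothetical test double for a received message.
class FakeMessage<Body> {
  readonly body: Body;
  outcome: Outcome = { kind: 'pending' };

  constructor(body: Body) {
    this.body = body;
  }

  ack(): void {
    // Assumption of this fake: only the first ack/retry call takes effect.
    if (this.outcome.kind === 'pending') this.outcome = { kind: 'acked' };
  }

  retry(options?: QueueRetryOptions): void {
    if (this.outcome.kind === 'pending') {
      this.outcome = { kind: 'retried', delaySeconds: options?.delaySeconds };
    }
  }
}

// Hypothetical test double for a batch of received messages.
class FakeBatch<Body> {
  readonly messages: FakeMessage<Body>[];

  constructor(bodies: Body[]) {
    this.messages = bodies.map((body) => new FakeMessage(body));
  }

  // Batch-level calls only affect messages that are still pending.
  ackAll(): void {
    for (const message of this.messages) message.ack();
  }

  retryAll(options?: QueueRetryOptions): void {
    for (const message of this.messages) message.retry(options);
  }
}
```

A FakeBatch can be passed to a queue handler in a unit test, after which each message's outcome field shows whether it was acknowledged, retried, or left untouched.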