Skip to content

Observers

Observers

Observers are powerful components in Raindrop that let you execute code in response to changes in your resources. Think of them as event listeners that automatically trigger when specific conditions are met in your application.

Types of Observers

Raindrop provides two types of observers to help you build reactive applications:

  • Object Observers: Monitor changes to objects in buckets
  • Queue Observers: Process messages as they arrive in queues

Object Observers

Object observers watch for changes to objects in your buckets and execute code in response. They’re particularly useful when you need to process files after upload.

To create an object observer, add it to your raindrop.manifest.

application "demo" {
// First define your bucket
bucket "observed-bucket" {}
// Then create an observer
observer "my-observer" {
source {
bucket = "observed-bucket"
rule {
// Specify which actions should trigger the observer
actions = ["PutObject", "CompleteMultipartUpload", "CopyObject"]
}
}
}
}

Supported Trigger Actions

Your observer can respond to the following bucket events:

Object Creation Events

  • PutObject - Triggers when objects are uploaded directly
  • CopyObject - Triggers when objects are copied
  • CompleteMultipartUpload - Triggers when multipart uploads finish

Object Deletion Events

  • DeleteObject - Triggers on manual object deletion
  • LifecycleDeletion - Triggers when objects are deleted by lifecycle rules

Queue Observers

Queue observers process messages as they arrive in a queue. They’re the perfect solution for handling asynchronous tasks and managing background jobs that shouldn’t block your main application flow.

Setting up a queue observer is straightforward. First define your queue, then create an observer that watches it.

// Define the queue
queue "observed-queue" {}
// Create the queue observer
observer "queue-observer" {
source {
queue = "observed-queue"
}
}

Observer Base Classes

The Raindrop framework provides abstract base classes for implementing different message processing patterns.

Each<T, Env>

Process messages individually with full control over acknowledgment and retry logic:

import { Each, Message, ExecutionContext } from '@liquidmetal-ai/raindrop-framework';
export default class extends Each<NotificationMessage, Env> {
async process(message: Message<NotificationMessage>): Promise<void> {
try {
await this.sendNotification(message.body);
message.ack(); // Acknowledge successful processing
} catch (error) {
if (message.attempts < 3) {
message.retry({ delaySeconds: 60 }); // Retry with delay
} else {
console.error('Max retries exceeded:', error);
message.ack(); // Prevent infinite retries
}
}
}
}

Properties:

  • ctx: ExecutionContext - Execution context for the observer
  • env: Env - Environment bindings for accessing resources

Abstract Methods:

  • process(message: Message<T>): Promise<void> - Process individual messages

Batch<T, Env>

Process multiple messages together for higher throughput:

import { Batch, MessageBatch, ExecutionContext } from '@liquidmetal-ai/raindrop-framework';
export default class extends Batch<EmailTask, Env> {
async process(batch: MessageBatch<EmailTask>): Promise<void> {
try {
// Process all messages in batch
const emailPromises = batch.messages.map(msg => this.sendEmail(msg.body));
await Promise.all(emailPromises);
batch.ackAll(); // Acknowledge all messages
} catch (error) {
// Retry entire batch
batch.retryAll({ delaySeconds: 30 });
}
}
}

Properties:

  • ctx: ExecutionContext - Execution context for the observer
  • env: Env - Environment bindings for accessing resources

Abstract Methods:

  • process(batch: MessageBatch<T>): Promise<void> - Process message batches

raindrop.manifest

Configure observers in your manifest to respond to resource events:

application "event-driven-app" {
bucket "uploads" {
# Bucket to observe for file uploads
}
queue "notifications" {
# Queue to observe for message processing
}
observer "file-processor" {
# Observer for bucket events
source {
bucket = "uploads"
rule {
actions = ["PutObject", "CompleteMultipartUpload"]
prefix = "images/" # Optional: filter by key prefix
suffix = ".jpg" # Optional: filter by key suffix
}
}
}
observer "notification-handler" {
# Observer for queue messages
source {
queue = "notifications"
batch_size = 10 # Optional: process in batches
max_batch_timeout = 5 # Optional: max batch wait time
}
}
observer "multi-source" {
# Observer with multiple triggers
source {
bucket = "uploads"
rule {
actions = ["DeleteObject"]
}
}
source {
queue = "cleanup-tasks"
}
}
}