Observers
Observers
Observers are powerful components in Raindrop that let you execute code in response to changes in your resources. Think of them as event listeners that automatically trigger when specific conditions are met in your application.
Types of Observers
Raindrop provides two types of observers to help you build reactive applications:
- Object Observers: Monitor changes to objects in buckets
- Queue Observers: Process messages as they arrive in queues
Object Observers
Object observers watch for changes to objects in your buckets and execute code in response. They’re particularly useful when you need to process files after upload.
To create an object observer, add it to your raindrop.manifest
.
application "demo" { // First define your bucket bucket "observed-bucket" {}
// Then create an observer observer "my-observer" { source { bucket = "observed-bucket" rule { // Specify which actions should trigger the observer actions = ["PutObject", "CompleteMultipartUpload", "CopyObject"] } } }}
import { Each } from '@liquidmetal-ai/raindrop-framework';import { Env } from './raindrop.gen';
export default class extends Each<Body, Env> { async process(message: observers.BucketEventNotification): Promise<void> { console.log(`received message: ${JSON.stringify(message)}`); }}
Supported Trigger Actions
Your observer can respond to the following bucket events:
Object Creation Events
PutObject
- Triggers when objects are uploaded directlyCopyObject
- Triggers when objects are copiedCompleteMultipartUpload
- Triggers when multipart uploads finish
Object Deletion Events
DeleteObject
- Triggers on manual object deletionLifecycleDeletion
- Triggers when objects are deleted by lifecycle rules
Queue Observers
Queue observers process messages as they arrive in a queue. They’re the perfect solution for handling asynchronous tasks and managing background jobs that shouldn’t block your main application flow.
Setting up a queue observer is straightforward. First define your queue, then create an observer that watches it.
// Define the queuequeue "observed-queue" {}
// Create the queue observerobserver "queue-observer" { source { queue = "observed-queue" }}
import { Each } from '@liquidmetal-ai/raindrop-framework';import { Env } from './raindrop.gen';
export default class extends Each<Body, Env> { async process(message: Body): Promise<void> { console.log(JSON.stringify(`received message: ${message}`)); }}export interface Body {}
Observer Base Classes
The Raindrop framework provides abstract base classes for implementing different message processing patterns.
Each<T, Env>
Process messages individually with full control over acknowledgment and retry logic:
import { Each, Message, ExecutionContext } from '@liquidmetal-ai/raindrop-framework';
export default class extends Each<NotificationMessage, Env> { async process(message: Message<NotificationMessage>): Promise<void> { try { await this.sendNotification(message.body); message.ack(); // Acknowledge successful processing } catch (error) { if (message.attempts < 3) { message.retry({ delaySeconds: 60 }); // Retry with delay } else { console.error('Max retries exceeded:', error); message.ack(); // Prevent infinite retries } } }}
Properties:
ctx
:ExecutionContext
- Execution context for the observerenv
:Env
- Environment bindings for accessing resources
Abstract Methods:
process(message: Message<T>): Promise<void>
- Process individual messages
Batch<T, Env>
Process multiple messages together for higher throughput:
import { Batch, MessageBatch, ExecutionContext } from '@liquidmetal-ai/raindrop-framework';
export default class extends Batch<EmailTask, Env> { async process(batch: MessageBatch<EmailTask>): Promise<void> { try { // Process all messages in batch const emailPromises = batch.messages.map(msg => this.sendEmail(msg.body)); await Promise.all(emailPromises);
batch.ackAll(); // Acknowledge all messages } catch (error) { // Retry entire batch batch.retryAll({ delaySeconds: 30 }); } }}
Properties:
ctx
:ExecutionContext
- Execution context for the observerenv
:Env
- Environment bindings for accessing resources
Abstract Methods:
process(batch: MessageBatch<T>): Promise<void>
- Process message batches
raindrop.manifest
Configure observers in your manifest to respond to resource events:
application "event-driven-app" { bucket "uploads" { # Bucket to observe for file uploads }
queue "notifications" { # Queue to observe for message processing }
observer "file-processor" { # Observer for bucket events source { bucket = "uploads" rule { actions = ["PutObject", "CompleteMultipartUpload"] prefix = "images/" # Optional: filter by key prefix suffix = ".jpg" # Optional: filter by key suffix } } }
observer "notification-handler" { # Observer for queue messages source { queue = "notifications" batch_size = 10 # Optional: process in batches max_batch_timeout = 5 # Optional: max batch wait time } }
observer "multi-source" { # Observer with multiple triggers source { bucket = "uploads" rule { actions = ["DeleteObject"] } } source { queue = "cleanup-tasks" } }}