Workflows
Creating and managing workflows
Workflows orchestrate business logic through deterministic, fault-tolerant execution.
Creating Workflows
Basic Workflow
import { workflow } from 'worlds-engine';
const myWorkflow = workflow('my-workflow', async (ctx, input) => {
  const result = await ctx.run(myActivity, input);
  return result;
});
Workflow with Options
const myWorkflow = workflow('my-workflow', handler, {
  retry: {
    maxAttempts: 3,
    backoff: 'exponential',
    initialInterval: 1000,
    maxInterval: 30000,
    multiplier: 2
  },
  timeout: '5m',
  failureStrategy: 'compensate',
  taskQueue: 'critical'
});
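To make the retry settings concrete, the sketch below computes the wait before each retry under one plausible reading of the options: the delay starts at initialInterval, grows according to the backoff mode, and is capped at maxInterval. The formula and the helper name retryDelay are assumptions for illustration, not documented engine behavior.

```typescript
// Hypothetical helper, not part of worlds-engine. It shows one common way
// retry options could translate into wait times between attempts.
type Backoff = 'linear' | 'exponential' | 'constant';

interface RetryOptions {
  maxAttempts: number;
  backoff: Backoff;
  initialInterval: number; // milliseconds
  maxInterval: number;     // milliseconds
  multiplier: number;
}

// Delay before retry number `retry` (1 = first retry after the initial attempt).
function retryDelay(retry: number, opts: RetryOptions): number {
  const uncapped =
    opts.backoff === 'constant' ? opts.initialInterval
    : opts.backoff === 'linear' ? opts.initialInterval * retry
    : opts.initialInterval * Math.pow(opts.multiplier, retry - 1);
  return Math.min(uncapped, opts.maxInterval);
}

const retryOptions: RetryOptions = {
  maxAttempts: 3,
  backoff: 'exponential',
  initialInterval: 1000,
  maxInterval: 30000,
  multiplier: 2
};

// With maxAttempts: 3 there are at most two retries after the first attempt.
const delays = [1, 2].map((retry) => retryDelay(retry, retryOptions));
console.log(delays); // [1000, 2000]
```

Under this reading, maxInterval only matters for longer retry chains: the tenth retry would otherwise wait 512000 ms and is capped at 30000 ms.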
Workflow Context API
Running Activities
Execute activities with automatic retry:
const result = await ctx.run(activity, input);
Child Workflows
Spawn child workflows:
const child = await ctx.executeChild('child-workflow', {
  data: 'value'
}, {
  workflowId: 'unique-child-id'
});
const childResult = await child.result();
Compensations (Saga Pattern)
Add compensation functions for rollback:
const payment = await ctx.run(chargeCard, order);
ctx.addCompensation(async () => {
  await ctx.run(refundCard, payment.id);
});
const inventory = await ctx.run(reserveInventory, order);
ctx.addCompensation(async () => {
  await ctx.run(releaseInventory, inventory.id);
});
Compensations execute in reverse order (LIFO) on failure.
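The reverse ordering can be seen in isolation with a small stack. This is an engine-independent sketch: CompensationStack is a hypothetical name, and the compensations are synchronous for brevity, whereas real ones would be async and run through ctx.run.

```typescript
// Minimal sketch of last-in, first-out compensation handling.
// `CompensationStack` is a hypothetical class used only for this example.
type Compensation = () => void;

class CompensationStack {
  private readonly stack: Compensation[] = [];

  add(compensation: Compensation): void {
    this.stack.push(compensation);
  }

  // Run compensations newest-first, emptying the stack along the way.
  unwind(): void {
    while (this.stack.length > 0) {
      const compensation = this.stack.pop()!;
      compensation();
    }
  }
}

const log: string[] = [];
const compensations = new CompensationStack();

try {
  log.push('charge card');
  compensations.add(() => log.push('refund card'));

  log.push('reserve inventory');
  compensations.add(() => log.push('release inventory'));

  throw new Error('shipment failed'); // simulate a later step failing
} catch {
  compensations.unwind();
}

console.log(log);
// ['charge card', 'reserve inventory', 'release inventory', 'refund card']
```

Undoing the most recent step first means each compensation runs against the state its forward step left behind: inventory is released before the payment that funded it is refunded.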
Sleep
Deterministic delays:
await ctx.sleep(5000); // 5 seconds
HTTP Requests
Make HTTP requests with automatic retry:
const response = await ctx.fetch('https://api.example.com/data');
const data = await response.json();
Hooks
Create hooks to receive external events:
const hook = await ctx.createHook<{ approved: boolean }>();
console.log(`Send approval to: ${hook.token}`);
const payload = await hook.wait();
if (payload.approved) {
  // Continue workflow
}
Resume the hook externally:
import { resumeHook } from 'worlds-engine';
await resumeHook(hookToken, { approved: true });
Webhooks
Suspend the workflow until an HTTP request is received:
const webhook = await ctx.createWebhook();
console.log(`Webhook URL: ${webhook.url}`);
const request = await webhook.wait();
const body = await request.json();
Writable Streams
Stream progress updates:
const stream = ctx.getWritable();
const writer = stream.getWriter();
await writer.write({ progress: 25, status: 'processing' });
await writer.write({ progress: 50, status: 'halfway' });
await writer.write({ progress: 100, status: 'complete' });
Metadata
Access workflow metadata:
const metadata = ctx.getMetadata();
console.log(metadata.workflowId);
console.log(metadata.runId);
console.log(metadata.startedAt);
console.log(metadata.parentId);
Cancellation
Check whether the workflow has been cancelled:
if (ctx.isCancelled()) {
  // Clean up and return
  return { cancelled: true };
}
Workflow Dev Kit API
Alternative functional API for workflows:
import {
  getWorkflowMetadata,
  getStepMetadata,
  sleep,
  fetch,
  createHook,
  createWebhook,
  getWritable
} from 'worlds-engine';
const myWorkflow = async (input) => {
  'use workflow';
  const metadata = getWorkflowMetadata();
  console.log(metadata.workflowId);
  await sleep(1000);
  const response = await fetch('https://api.example.com');
  return response;
};
Workflow Options
interface WorkflowOptions {
  // Retry configuration
  retry?: {
    maxAttempts: number;
    backoff: 'linear' | 'exponential' | 'constant';
    initialInterval: number;
    maxInterval: number;
    multiplier: number;
  };
  // Timeout
  timeout?: string | number; // '5m', '1h', or milliseconds
  // Failure handling
  failureStrategy?: 'retry' | 'compensate' | 'cascade' | 'ignore' | 'quarantine';
  // Queue routing
  taskQueue?: string;
  // Priority
  priority?: number;
  // Parent relationship
  parentId?: string;
}
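Because timeout accepts either a duration string or a number of milliseconds, it can help to see how the two forms relate. The sketch below normalizes both to milliseconds. The helper toMilliseconds and the set of unit suffixes (ms, s, m, h) are assumptions; only '5m' and '1h' appear in the options above.

```typescript
// Hypothetical helper, not a worlds-engine export: converts a timeout value
// to milliseconds. The supported suffixes are an assumption for this sketch.
const UNIT_MS: Record<string, number> = {
  ms: 1,
  s: 1000,
  m: 60 * 1000,
  h: 60 * 60 * 1000
};

function toMilliseconds(timeout: string | number): number {
  if (typeof timeout === 'number') return timeout; // already milliseconds
  const match = /^(\d+)(ms|s|m|h)$/.exec(timeout.trim());
  if (!match) throw new Error(`Unrecognized timeout: ${timeout}`);
  return Number(match[1]) * UNIT_MS[match[2]];
}

console.log(toMilliseconds('5m')); // 300000
console.log(toMilliseconds('1h')); // 3600000
console.log(toMilliseconds(1500)); // 1500
```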
Workflow Lifecycle
Status States
- pending: Queued, not started
- running: Currently executing
- completed: Finished successfully
- failed: Failed and exhausted retries
- compensating: Running compensations
- compensated: Compensations complete
- cancelled: Cancelled by user
- quarantined: Isolated for debugging
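When polling a workflow, callers usually need to know whether a status can still change. The sketch below types the states listed above as a union and adds a helper for that check. Which states count as terminal is an assumption made here (everything except pending, running, and compensating); the engine may, for example, allow a quarantined workflow to be resumed.

```typescript
// Status names are taken from the list above. The split into in-progress
// and terminal states is an assumption made for this sketch.
type WorkflowStatus =
  | 'pending'
  | 'running'
  | 'completed'
  | 'failed'
  | 'compensating'
  | 'compensated'
  | 'cancelled'
  | 'quarantined';

const IN_PROGRESS: ReadonlySet<WorkflowStatus> = new Set<WorkflowStatus>([
  'pending',
  'running',
  'compensating'
]);

// True when, under the assumption above, no further transition is expected.
function isTerminal(workflowStatus: WorkflowStatus): boolean {
  return !IN_PROGRESS.has(workflowStatus);
}

console.log(isTerminal('running'));     // false
console.log(isTerminal('compensated')); // true
```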
History Events
All workflow actions are recorded:
const state = await world.query(workflowId);
console.log(state.history);
// [
// { type: 'workflow_started', timestamp: 1234567890 },
// { type: 'activity_scheduled', name: 'my-activity' },
// { type: 'activity_completed', result: {...} },
// { type: 'workflow_completed', result: {...} }
// ]
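A history array like the one above is easy to summarize once it has a type. The following sketch counts events by type. The HistoryEvent shape is inferred from the sample output only, so every field other than type is marked optional, and the sample data is made up for the example.

```typescript
// Event shape inferred from the sample output above; fields not shown
// there are assumptions and are therefore optional.
interface HistoryEvent {
  type: string;
  timestamp?: number;
  name?: string;
  result?: unknown;
}

// Count how many events of each type appear in a workflow history.
function countByType(events: HistoryEvent[]): Record<string, number> {
  const counts: Record<string, number> = {};
  for (const entry of events) {
    counts[entry.type] = (counts[entry.type] ?? 0) + 1;
  }
  return counts;
}

// Made-up sample: one workflow that ran two activities.
const sampleHistory: HistoryEvent[] = [
  { type: 'workflow_started', timestamp: 1234567890 },
  { type: 'activity_scheduled', name: 'validate-order' },
  { type: 'activity_completed', result: { valid: true } },
  { type: 'activity_scheduled', name: 'charge-payment' },
  { type: 'activity_completed', result: { id: 'pay_1' } },
  { type: 'workflow_completed', result: { success: true } }
];

const eventCounts = countByType(sampleHistory);
console.log(eventCounts.activity_scheduled); // 2
```

Comparing the activity_scheduled and activity_completed counts is a quick way to spot an activity that was started but never finished.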
Querying Workflows
Single Workflow
const state = await world.query(workflowId);
console.log(state.status);
console.log(state.result);
console.log(state.error);
Multiple Workflows
const runs = await world.queryWorkflows({
  status: 'running',
  workflowName: 'order-workflow',
  startedAfter: Date.now() - 86400000, // Last 24 hours
  limit: 50,
  offset: 0
});
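To show how the filter fields combine, here is an in-memory sketch of the same query over a plain array. WorkflowRun, RunFilter, and filterRuns are hypothetical names; the real query runs inside the engine, and details such as whether startedAfter is inclusive are assumptions.

```typescript
// In-memory illustration of the filter fields. All names here are
// hypothetical and only mirror the query options shown above.
interface WorkflowRun {
  workflowId: string;
  workflowName: string;
  status: string;
  startedAt: number; // epoch milliseconds
}

interface RunFilter {
  status?: string;
  workflowName?: string;
  startedAfter?: number;
  limit?: number;
  offset?: number;
}

function filterRuns(runs: WorkflowRun[], filter: RunFilter): WorkflowRun[] {
  const offset = filter.offset ?? 0;
  const limit = filter.limit ?? runs.length;
  return runs
    .filter((run) => filter.status === undefined || run.status === filter.status)
    .filter((run) => filter.workflowName === undefined || run.workflowName === filter.workflowName)
    .filter((run) => filter.startedAfter === undefined || run.startedAt > filter.startedAfter)
    .slice(offset, offset + limit); // pagination is applied after filtering
}

const fixedNow = 1700000000000; // fixed "current time" keeps the example repeatable
const DAY_MS = 24 * 60 * 60 * 1000; // 86400000
const allRuns: WorkflowRun[] = [
  { workflowId: 'a', workflowName: 'order-workflow', status: 'running', startedAt: fixedNow - 1000 },
  { workflowId: 'b', workflowName: 'order-workflow', status: 'completed', startedAt: fixedNow - 2000 },
  { workflowId: 'c', workflowName: 'order-workflow', status: 'running', startedAt: fixedNow - 2 * DAY_MS },
  { workflowId: 'd', workflowName: 'email-workflow', status: 'running', startedAt: fixedNow - 500 }
];

const recentRuns = filterRuns(allRuns, {
  status: 'running',
  workflowName: 'order-workflow',
  startedAfter: fixedNow - DAY_MS,
  limit: 50,
  offset: 0
});
console.log(recentRuns.map((run) => run.workflowId)); // ['a']
```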
Controlling Workflows
Cancel
const handle = await world.execute('my-workflow', input);
await handle.cancel();
Wait for Result
const result = await handle.result(); // Waits for completion
Query Status
const state = await handle.query();
console.log(state.status);
Best Practices
1. Idempotent Activities
Activities should be idempotent for safe retries:
const createOrder = activity('create-order', async (ctx, input) => {
  // Check if order already exists
  const existing = await db.findOrder(input.orderId);
  if (existing) return existing;
  // Create if not exists
  return await db.createOrder(input);
});
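The effect of the check-then-create pattern is easiest to see when the activity is run twice with the same input. In this sketch the database is replaced by a hypothetical in-memory Map (the db object used above is not defined on this page), and the function is synchronous for brevity.

```typescript
// Hypothetical in-memory stand-in for the database used above.
interface Order {
  orderId: string;
  total: number;
}

const orders = new Map<string, Order>();
let inserts = 0; // counts how many rows were actually created

function createOrderIdempotent(input: Order): Order {
  const existing = orders.get(input.orderId);
  if (existing) return existing; // a retry finds the earlier result
  inserts += 1;
  orders.set(input.orderId, input);
  return input;
}

// Simulate the engine retrying the activity with the same input.
const firstAttempt = createOrderIdempotent({ orderId: '12345', total: 40 });
const retriedAttempt = createOrderIdempotent({ orderId: '12345', total: 40 });

console.log(firstAttempt === retriedAttempt); // true
console.log(inserts); // 1
```

With a real database, the lookup and the insert would need to be atomic (for example through a unique constraint on the order ID), otherwise two concurrent attempts could both pass the check.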
2. Deterministic Workflows
Workflows must be deterministic. Given the same input and the same activity results, the workflow body should take the same path, so keep calls such as Math.random() out of it:
// ✅ Good - deterministic
const goodWorkflow = workflow('good', async (ctx, input) => {
  const result = await ctx.run(activity, input);
  return result;
});
// ❌ Bad - non-deterministic
const badWorkflow = workflow('bad', async (ctx, input) => {
  const random = Math.random(); // Non-deterministic!
  return random;
});
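A common way for workflow engines to tolerate nondeterministic values is to produce them inside a recorded step and reuse the recorded result when the workflow is re-executed. Whether worlds-engine works exactly this way is not stated here, so treat the following as a sketch of the general record-and-replay idea. StepJournal and pickDiscount are hypothetical names.

```typescript
// Hypothetical record-and-replay journal; not the worlds-engine implementation.
class StepJournal {
  private readonly results: unknown[] = [];
  private cursor = 0;

  // Run `fn` the first time a step is reached; on replay, return the recorded value.
  step<T>(fn: () => T): T {
    if (this.cursor < this.results.length) {
      return this.results[this.cursor++] as T;
    }
    const value = fn();
    this.results.push(value);
    this.cursor++;
    return value;
  }

  // Rewind to the start, as a re-execution after a crash or restart would.
  rewind(): void {
    this.cursor = 0;
  }
}

function pickDiscount(journal: StepJournal): number {
  // The random draw happens inside a recorded step, not in the workflow body.
  return journal.step(() => Math.floor(Math.random() * 100));
}

const journal = new StepJournal();
const firstRun = pickDiscount(journal);
journal.rewind();
const replayedRun = pickDiscount(journal);

console.log(firstRun === replayedRun); // true
```

In terms of the API on this page, the equivalent move would be to generate the random value inside an activity and obtain it through ctx.run.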
3. Small Inputs/Outputs
Keep workflow inputs and outputs small:
// ✅ Good - pass IDs
await world.execute('process-order', { orderId: '12345' });
// ❌ Bad - pass large objects
await world.execute('process-order', { order: largeOrderObject });
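One way to enforce this guideline is a size check before starting a workflow. The sketch below is a hypothetical guard: assertSmallPayload is not an engine function, and the 16 KB threshold is an arbitrary example value rather than a documented limit.

```typescript
// Hypothetical guard. The threshold is an arbitrary example value,
// not a documented worlds-engine limit.
const MAX_PAYLOAD_CHARS = 16 * 1024;

function payloadSize(value: unknown): number {
  // Length of the JSON text in UTF-16 code units: a rough proxy for serialized size.
  return JSON.stringify(value).length;
}

function assertSmallPayload(value: unknown): void {
  const size = payloadSize(value);
  if (size > MAX_PAYLOAD_CHARS) {
    throw new Error(`Payload is ${size} characters; pass an ID instead`);
  }
}

assertSmallPayload({ orderId: '12345' }); // passes: the payload is 19 characters

// A made-up large order with 5000 line items.
const largeOrder = { items: new Array(5000).fill({ sku: 'ABC-123', qty: 1 }) };
let rejected = false;
try {
  assertSmallPayload({ order: largeOrder });
} catch {
  rejected = true;
}
console.log(rejected); // true
```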
4. Use Compensations
Always add compensations for side effects:
const payment = await ctx.run(charge, order);
ctx.addCompensation(() => ctx.run(refund, payment));
const shipment = await ctx.run(ship, order);
ctx.addCompensation(() => ctx.run(cancelShipment, shipment));
Examples
Order Processing
const processOrder = workflow('process-order', async (ctx, input) => {
  // Validate order
  const validation = await ctx.run(validateOrder, input.order);
  if (!validation.valid) {
    return { success: false, error: validation.error };
  }
  // Charge payment
  const payment = await ctx.run(chargePayment, input.order);
  ctx.addCompensation(() => ctx.run(refundPayment, payment.id));
  // Reserve inventory
  const inventory = await ctx.run(reserveInventory, input.order);
  ctx.addCompensation(() => ctx.run(releaseInventory, inventory.id));
  // Create shipment
  const shipment = await ctx.run(createShipment, input.order);
  ctx.addCompensation(() => ctx.run(cancelShipment, shipment.id));
  return {
    success: true,
    orderId: input.order.id,
    payment,
    shipment
  };
});
Approval Workflow
const approvalWorkflow = workflow('approval', async (ctx, input) => {
  // Create approval hook
  const hook = await ctx.createHook<{ approved: boolean; notes?: string }>();
  // Send notification
  await ctx.run(sendApprovalEmail, {
    to: input.approver,
    hookToken: hook.token
  });
  // Wait for response, treating 24 hours (86400000 ms) without one as a rejection
  const response = await Promise.race([
    hook.wait(),
    ctx.sleep(86400000).then(() => ({ approved: false, timeout: true }))
  ]);
  if (response.approved) {
    await ctx.run(processApproval, input);
  }
  return response;
});
Next Steps
- Learn about Activities
- Understand Retry & Error Handling
- Explore Saga Pattern