DevelopmentWorkflows
# Workflow Examples
This document provides practical examples of workflows for common use cases.
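## Table of Contents
- [Email Workflows](#email-workflows)
- [Database Operations](#database-operations)
- [API Integration](#api-integration)
- [Multi-Step Processing](#multi-step-processing)
- [Error Handling](#error-handling)
- [Best Practices Demonstrated](#best-practices-demonstrated)
- [Testing Examples](#testing-examples)
- [Additional Resources](#additional-resources)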
## Email Workflows
### Simple Email Notification
```typescript
/**
 * Input payload for the `send-notification` workflow.
 *
 * NOTE(review): `subject` and `message` are declared here, but the workflow
 * below does not forward them to `email` — confirm whether the
 * "custom_notification" action is expected to receive them.
 */
interface SendNotificationInput {
  userId: string;
  companyId: string;
  organizationId: string;
  recipients: string[];
  subject: string;
  message: string;
}

/**
 * Single-step workflow: runs one `send-email` step that calls `email` with
 * the "custom_notification" action, then resolves to `{ success: true }`.
 */
const sendNotification = ow.defineWorkflow(
  { name: "send-notification" },
  async ({ input, step }: { input: SendNotificationInput; step: any }) => {
    // The step body is named so the workflow reads as a list of steps.
    const sendEmail = async () => {
      const { userId, companyId, organizationId, recipients } = input;
      const delivery = await email({
        userId,
        companyId,
        organizationId,
        emailAction: "custom_notification",
        recipients,
        recipientName: "User",
      });
      return delivery;
    };

    await step.run({ name: "send-email" }, sendEmail);

    return { success: true };
  }
);
```
**API Request:**
```bash
POST /api/workflow/send-notification
{
"userId": "user-123",
"companyId": "company-456",
"organizationId": "org-789",
"recipients": ["user@example.com"],
"subject": "Important Update",
"message": "Your request has been processed"
}
```
### Bulk Email with Tracking
```typescript
/**
 * Input payload for the `send-bulk-email` workflow.
 *
 * NOTE(review): `customData` is accepted per recipient but is not read by the
 * workflow below — confirm whether it should be forwarded to `email`.
 */
interface SendBulkEmailInput {
  userId: string;
  companyId: string;
  organizationId: string;
  emailAction: string;
  recipients: Array<{
    email: string;
    name: string;
    customData?: Record<string, any>;
  }>;
}

/** Outcome recorded for a single recipient of a bulk send. */
type BulkEmailResult =
  | { email: string; status: "sent"; result: unknown }
  | { email: string; status: "failed"; error: string };

/**
 * Sends one email per recipient, recording a per-recipient outcome instead of
 * aborting on the first failure, then writes a summary row to `email_logs`.
 *
 * Returns `{ results }`, the list of per-recipient outcomes.
 */
const sendBulkEmail = ow.defineWorkflow(
  { name: "send-bulk-email" },
  async ({ input, step }: { input: SendBulkEmailInput; step: any }) => {
    const results: BulkEmailResult[] = await step.run(
      { name: "send-emails" },
      async () => {
        const emailResults: BulkEmailResult[] = [];

        // Recipients are processed one at a time, in input order.
        for (const recipient of input.recipients) {
          try {
            const result = await email({
              userId: input.userId,
              companyId: input.companyId,
              organizationId: input.organizationId,
              emailAction: input.emailAction,
              recipientName: recipient.name,
              recipients: [recipient.email],
            });
            emailResults.push({
              email: recipient.email,
              status: "sent",
              result,
            });
          } catch (error: unknown) {
            // Anything can be thrown; narrow before reading `.message` so a
            // non-Error throw (string, null, ...) is still recorded as a
            // failure instead of crashing the whole batch.
            emailResults.push({
              email: recipient.email,
              status: "failed",
              error: error instanceof Error ? error.message : String(error),
            });
          }
        }

        return emailResults;
      }
    );

    await step.run({ name: "log-results" }, async () => {
      const successful = results.filter(
        (entry: BulkEmailResult) => entry.status === "sent"
      ).length;
      const failed = results.filter(
        (entry: BulkEmailResult) => entry.status === "failed"
      ).length;

      await db.insert(schema.email_logs).values({
        userId: input.userId,
        companyId: input.companyId,
        emailAction: input.emailAction,
        totalRecipients: input.recipients.length,
        successful,
        failed,
        results: results,
      });
    });

    return { results };
  }
);
```
## Database Operations
### User Onboarding Workflow
```typescript
/** Input payload for the `onboard-user` workflow. */
interface OnboardUserInput {
  userId: string;
  companyId: string;
  organizationId: string;
}

/**
 * Onboards a user in four steps: fetch the user row, insert default settings,
 * send the welcome email, and flag the user as onboarded.
 *
 * Returns `{ user, onboarded: true }`.
 * Throws if no user row matches `input.userId`.
 */
const onboardUser = ow.defineWorkflow(
  { name: "onboard-user" },
  async ({ input, step }: { input: OnboardUserInput; step: any }) => {
    // Step 1: Fetch user
    const user = await step.run({ name: "fetch-user" }, async () => {
      const [found] = await db
        .select()
        .from(schema.users)
        .where(eq(schema.users.userId, input.userId))
        .limit(1);

      // Fail here rather than after settings have been written: the welcome
      // email step reads `user.fullName` and would otherwise throw a
      // TypeError on an undefined row.
      if (!found) {
        throw new Error(`User ${input.userId} not found`);
      }
      return found;
    });

    // Step 2: Create default settings
    // NOTE(review): this is a plain insert; whether re-running it is safe
    // depends on constraints of `user_settings` not visible here.
    await step.run({ name: "create-settings" }, async () => {
      await db.insert(schema.user_settings).values({
        userId: input.userId,
        companyId: input.companyId,
        theme: "light",
        notifications: true,
      });
    });

    // Step 3: Send welcome email
    await step.run({ name: "send-welcome-email" }, async () => {
      return await email({
        userId: input.userId,
        companyId: input.companyId,
        organizationId: input.organizationId,
        emailAction: "welcome_email",
        // Falls back to the display name when no full name is set.
        recipientName: user.fullName || user.displayName,
        recipients: [user.email],
      });
    });

    // Step 4: Mark onboarding complete
    await step.run({ name: "mark-onboarded" }, async () => {
      await db
        .update(schema.users)
        .set({ onboarded: true, onboardedAt: new Date() })
        .where(eq(schema.users.userId, input.userId));
    });

    return { user, onboarded: true };
  }
);
```
### Data Synchronization
```typescript
/**
 * Input payload for the `sync-data` workflow.
 *
 * `source` and `target` are only written to the sync log by the workflow
 * below; the fetch URL itself is hard-coded.
 */
interface SyncDataInput {
  userId: string;
  companyId: string;
  source: string;
  target: string;
}

/**
 * Pulls records from the source API, maps them to the local item shape,
 * upserts each one into `schema.items`, and records the run in `sync_logs`.
 *
 * Returns the number of records processed and a completion timestamp.
 * Throws if the source API responds with a non-2xx status.
 */
const syncData = ow.defineWorkflow(
  { name: "sync-data" },
  async ({ input, step }: { input: SyncDataInput; step: any }) => {
    // Step 1: Fetch source data
    const sourceData = await step.run({ name: "fetch-source" }, async () => {
      // Fetch from source system
      // NOTE(review): `token` is not defined in this snippet; it must be
      // provided by the surrounding module.
      const response = await fetch("https://api.source.com/data", {
        headers: { Authorization: `Bearer ${token}` },
      });

      // `fetch` resolves on HTTP error statuses too; without this check an
      // error payload would be handed to the transform step.
      if (!response.ok) {
        throw new Error(`Source API returned ${response.status}`);
      }
      return await response.json();
    });

    // Step 2: Transform data
    const transformedData = await step.run({ name: "transform" }, async () => {
      return sourceData.map((item: Record<string, unknown>) => ({
        id: item.externalId,
        name: item.title,
        status: item.state,
        // ... transform fields
      }));
    });

    // Step 3: Upsert to target
    await step.run({ name: "upsert-target" }, async () => {
      for (const item of transformedData) {
        // Insert, or update name/status in place when the id already exists.
        await db
          .insert(schema.items)
          .values(item)
          .onConflictDoUpdate({
            target: schema.items.id,
            set: {
              name: item.name,
              status: item.status,
              updatedAt: new Date(),
            },
          });
      }
    });

    // Step 4: Log sync
    await step.run({ name: "log-sync" }, async () => {
      await db.insert(schema.sync_logs).values({
        userId: input.userId,
        companyId: input.companyId,
        source: input.source,
        target: input.target,
        recordsProcessed: transformedData.length,
        syncedAt: new Date(),
      });
    });

    return {
      recordsProcessed: transformedData.length,
      syncedAt: new Date(),
    };
  }
);
```
## API Integration
### External API Call with Retry
```typescript
/**
 * Input payload for the `call-external-api` workflow.
 *
 * `retries` is used as the total number of attempts (default 3); values
 * below 1 are treated as a single attempt.
 */
interface CallExternalAPIInput {
  endpoint: string;
  method: string;
  body?: Record<string, any>;
  retries?: number;
}

/**
 * Calls `input.endpoint` with a JSON body inside a single `api-call` step,
 * retrying failed attempts with exponential backoff.
 *
 * Returns `{ result }`, where `result` is the parsed JSON response.
 * Throws the error from the last attempt once all attempts are exhausted.
 */
const callExternalAPI = ow.defineWorkflow(
  { name: "call-external-api" },
  async ({ input, step }: { input: CallExternalAPIInput; step: any }) => {
    const result = await step.run({ name: "api-call" }, async () => {
      // `??` keeps an explicit 0 from being replaced by the default, and the
      // floor of 1 guarantees the loop runs, so `lastError` is always set
      // before it can be thrown.
      const maxAttempts = Math.max(1, input.retries ?? 3);
      let lastError: unknown;

      for (let attempt = 1; attempt <= maxAttempts; attempt++) {
        try {
          const response = await fetch(input.endpoint, {
            method: input.method,
            headers: {
              "Content-Type": "application/json",
            },
            body: input.body ? JSON.stringify(input.body) : undefined,
          });

          // Non-2xx responses count as failures and are retried like
          // network errors.
          if (!response.ok) {
            throw new Error(`API returned ${response.status}`);
          }
          return await response.json();
        } catch (error: unknown) {
          lastError = error;
          if (attempt < maxAttempts) {
            // Exponential backoff: 2s, 4s, 8s, ... before the next attempt.
            await new Promise((resolve) =>
              setTimeout(resolve, 2 ** attempt * 1000)
            );
          }
        }
      }

      throw lastError;
    });

    return { result };
  }
);
```
## Multi-Step Processing
### Document Processing Pipeline
```typescript
/** Input payload for the `process-document` workflow. */
interface ProcessDocumentInput {
  documentId: string;
  userId: string;
  companyId: string;
}

/**
 * Document pipeline: fetch the document row, validate it, run type-specific
 * processing ("invoice" or "receipt"), store the result, and email the user.
 *
 * Returns `{ document, processedData }`.
 * Throws if the document does not exist, has no file URL or type, or has a
 * type this pipeline does not handle.
 */
const processDocument = ow.defineWorkflow(
  { name: "process-document" },
  async ({ input, step }: { input: ProcessDocumentInput; step: any }) => {
    // Step 1: Fetch document
    const document = await step.run({ name: "fetch-document" }, async () => {
      const [found] = await db
        .select()
        .from(schema.documents)
        .where(eq(schema.documents.id, input.documentId))
        .limit(1);

      // Without this guard a missing row surfaces later as a TypeError on
      // `document.fileUrl`.
      if (!found) {
        throw new Error(`Document ${input.documentId} not found`);
      }
      return found;
    });

    // Step 2: Validate document
    await step.run({ name: "validate-document" }, async () => {
      if (!document.fileUrl) {
        throw new Error("Document file URL is missing");
      }
      if (!document.type) {
        throw new Error("Document type is required");
      }
      // Reject types with no processing branch; otherwise the document would
      // be stored as processed with no data and the user notified.
      if (document.type !== "invoice" && document.type !== "receipt") {
        throw new Error(`Unsupported document type: ${document.type}`);
      }
    });

    // Step 3: Process based on type
    let processedData;
    if (document.type === "invoice") {
      processedData = await step.run({ name: "process-invoice" }, async () => {
        // Invoice-specific processing
        return { type: "invoice", data: {} };
      });
    } else if (document.type === "receipt") {
      processedData = await step.run({ name: "process-receipt" }, async () => {
        // Receipt-specific processing
        return { type: "receipt", data: {} };
      });
    }

    // Step 4: Store processed data
    await step.run({ name: "store-processed-data" }, async () => {
      await db
        .update(schema.documents)
        .set({
          processed: true,
          processedData: processedData,
          processedAt: new Date(),
        })
        .where(eq(schema.documents.id, input.documentId));
    });

    // Step 5: Notify user
    // NOTE(review): unlike the other examples, this `email` call passes no
    // `organizationId` or `recipientName` — confirm they are optional.
    await step.run({ name: "notify-user" }, async () => {
      return await email({
        userId: input.userId,
        companyId: input.companyId,
        emailAction: "document_processed",
        recipients: [document.userEmail],
      });
    });

    return { document, processedData };
  }
);
```
## Error Handling
### Workflow with Comprehensive Error Handling
```typescript
/** Input payload for the `process-order` workflow. */
interface ProcessOrderInput {
  orderId: string;
  userId: string;
}

/**
 * Order pipeline: fetch the order, charge the payment API, mark the order
 * paid, create a shipping label, and mark the order shipped.
 *
 * On any failure the error is written to `workflow_errors`, the order (if it
 * was loaded) is marked "failed", and the original error is re-thrown.
 *
 * Returns `{ success: true, order, payment, shipping }` on success.
 */
const processOrder = ow.defineWorkflow(
  { name: "process-order" },
  async ({ input, step }: { input: ProcessOrderInput; step: any }) => {
    // Declared outside `try` because the catch block needs to know whether
    // the order was loaded before the failure.
    let order;

    try {
      // Step 1: Fetch order
      order = await step.run({ name: "fetch-order" }, async () => {
        const result = await db
          .select()
          .from(schema.orders)
          .where(eq(schema.orders.id, input.orderId))
          .limit(1);
        if (!result[0]) {
          throw new Error(`Order ${input.orderId} not found`);
        }
        return result[0];
      });

      // Step 2: Process payment
      const paymentResult = await step.run(
        { name: "process-payment" },
        async () => {
          const response = await fetch("https://payment.api/charge", {
            method: "POST",
            // Declare the JSON body; without it fetch labels a string body
            // as text/plain.
            headers: { "Content-Type": "application/json" },
            body: JSON.stringify({
              amount: order.total,
              orderId: order.id,
            }),
          });
          if (!response.ok) {
            throw new Error("Payment processing failed");
          }
          return await response.json();
        }
      );

      // Step 3: Update order status
      await step.run({ name: "update-order-status" }, async () => {
        await db
          .update(schema.orders)
          .set({
            status: "paid",
            paymentId: paymentResult.paymentId,
            paidAt: new Date(),
          })
          .where(eq(schema.orders.id, input.orderId));
      });

      // Step 4: Create shipping label
      const shippingResult = await step.run(
        { name: "create-shipping-label" },
        async () => {
          const response = await fetch("https://shipping.api/label", {
            method: "POST",
            headers: { "Content-Type": "application/json" },
            body: JSON.stringify({
              orderId: order.id,
              address: order.shippingAddress,
            }),
          });
          if (!response.ok) {
            throw new Error("Shipping label creation failed");
          }
          return await response.json();
        }
      );

      // Step 5: Update order with shipping info
      await step.run({ name: "update-shipping-info" }, async () => {
        await db
          .update(schema.orders)
          .set({
            shippingLabelId: shippingResult.labelId,
            trackingNumber: shippingResult.trackingNumber,
            status: "shipped",
            shippedAt: new Date(),
          })
          .where(eq(schema.orders.id, input.orderId));
      });

      return {
        success: true,
        order,
        payment: paymentResult,
        shipping: shippingResult,
      };
    } catch (error: unknown) {
      // A thrown value is not guaranteed to be an Error; normalise it before
      // reading `.message` / `.stack` so the logging itself cannot throw.
      const failure = error instanceof Error ? error : new Error(String(error));

      // Log error
      await step.run({ name: "log-error" }, async () => {
        await db.insert(schema.workflow_errors).values({
          workflowName: "process-order",
          orderId: input.orderId,
          error: failure.message,
          errorStack: failure.stack,
          occurredAt: new Date(),
        });
      });

      // Update order status to failed
      if (order) {
        await step.run({ name: "mark-order-failed" }, async () => {
          await db
            .update(schema.orders)
            .set({
              status: "failed",
              errorMessage: failure.message,
            })
            .where(eq(schema.orders.id, input.orderId));
        });
      }

      // Re-throw to mark workflow as failed
      throw error;
    }
  }
);
```
## Best Practices Demonstrated
These examples demonstrate:
1. **Clear Step Names**: Descriptive names like `fetch-order`, `process-payment`
2. **Idempotent Operations**: Database updates use `onConflictDoUpdate`
3. **Error Handling**: Try-catch blocks with proper error logging
4. **Data Flow**: Each step uses data from previous steps
5. **Separation of Concerns**: Each step has a single responsibility
6. **Return Values**: Useful data returned for monitoring/debugging
## Testing Examples
### Testing a Workflow Step
```typescript
// Unit-tests the user lookup in isolation by passing a chainable mock in
// place of the real database client.
// NOTE(review): `fetchUser` is not defined in these examples; this assumes
// the `fetch-user` step logic has been extracted into a function taking
// `(userId, db)` — confirm against the real code.
describe("onboardUser workflow", () => {
  it("should fetch the user by id", async () => {
    const mockUser = { userId: "123", email: "test@example.com" };

    // Mock database: each builder method returns the mock itself so calls
    // can be chained; `limit` ends the chain and resolves to the rows.
    const mockDb = {
      select: jest.fn().mockReturnThis(),
      from: jest.fn().mockReturnThis(),
      where: jest.fn().mockReturnThis(),
      limit: jest.fn().mockResolvedValue([mockUser]),
    };

    // Test the step logic
    const result = await fetchUser(mockUser.userId, mockDb);

    expect(result).toEqual(mockUser);
  });
});
```
## Additional Resources
- [Creating Workflows Guide](./creating-workflows.md)
- [API Usage Documentation](./api-usage.md)
- [Architecture Overview](./architecture.md)