XYLEX Group
DevelopmentWorkflows

Workflow Examples

Workflow Examples

This document provides practical examples of workflows for common use cases.

Table of Contents

Email Workflows

Simple Email Notification

interface SendNotificationInput {
  userId: string;
  companyId: string;
  organizationId: string;
  recipients: string[];
  subject: string;
  message: string;
}

const sendNotification = ow.defineWorkflow(
  { name: "send-notification" },
  async ({ input, step }: { input: SendNotificationInput; step: any }) => {
    await step.run({ name: "send-email" }, async () => {
      return await email({
        userId: input.userId,
        companyId: input.companyId,
        organizationId: input.organizationId,
        emailAction: "custom_notification",
        recipients: input.recipients,
        recipientName: "User",
      });
    });

    return { success: true };
  }
);
```typescript

**API Request:**

```bash
POST /api/workflow/send-notification
{
  "userId": "user-123",
  "companyId": "company-456",
  "organizationId": "org-789",
  "recipients": ["user@example.com"],
  "subject": "Important Update",
  "message": "Your request has been processed"
}
```typescript

### Bulk Email with Tracking

```typescript
interface SendBulkEmailInput {
  userId: string;
  companyId: string;
  organizationId: string;
  emailAction: string;
  recipients: Array<{
    email: string;
    name: string;
    customData?: Record<string, any>;
  }>;
}

const sendBulkEmail = ow.defineWorkflow(
  { name: "send-bulk-email" },
  async ({ input, step }: { input: SendBulkEmailInput; step: any }) => {
    const results = await step.run({ name: "send-emails" }, async () => {
      const emailResults = [];
      
      for (const recipient of input.recipients) {
        try {
          const result = await email({
            userId: input.userId,
            companyId: input.companyId,
            organizationId: input.organizationId,
            emailAction: input.emailAction,
            recipientName: recipient.name,
            recipients: [recipient.email],
          });
          emailResults.push({
            email: recipient.email,
            status: "sent",
            result,
          });
        } catch (error) {
          emailResults.push({
            email: recipient.email,
            status: "failed",
            error: error.message,
          });
        }
      }
      
      return emailResults;
    });

    await step.run({ name: "log-results" }, async () => {
      await db.insert(schema.email_logs).values({
        userId: input.userId,
        companyId: input.companyId,
        emailAction: input.emailAction,
        totalRecipients: input.recipients.length,
        successful: results.filter(r => r.status === "sent").length,
        failed: results.filter(r => r.status === "failed").length,
        results: results,
      });
    });

    return { results };
  }
);
```typescript

## Database Operations

### User Onboarding Workflow

```typescript
interface OnboardUserInput {
  userId: string;
  companyId: string;
  organizationId: string;
}

const onboardUser = ow.defineWorkflow(
  { name: "onboard-user" },
  async ({ input, step }: { input: OnboardUserInput; step: any }) => {
    // Step 1: Fetch user
    const user = await step.run({ name: "fetch-user" }, async () => {
      const result = await db
        .select()
        .from(schema.users)
        .where(eq(schema.users.userId, input.userId))
        .limit(1);
      return result[0];
    });

    // Step 2: Create default settings
    await step.run({ name: "create-settings" }, async () => {
      await db.insert(schema.user_settings).values({
        userId: input.userId,
        companyId: input.companyId,
        theme: "light",
        notifications: true,
      });
    });

    // Step 3: Send welcome email
    await step.run({ name: "send-welcome-email" }, async () => {
      return await email({
        userId: input.userId,
        companyId: input.companyId,
        organizationId: input.organizationId,
        emailAction: "welcome_email",
        recipientName: user.fullName || user.displayName,
        recipients: [user.email],
      });
    });

    // Step 4: Mark onboarding complete
    await step.run({ name: "mark-onboarded" }, async () => {
      await db
        .update(schema.users)
        .set({ onboarded: true, onboardedAt: new Date() })
        .where(eq(schema.users.userId, input.userId));
    });

    return { user, onboarded: true };
  }
);
```typescript

### Data Synchronization

```typescript
interface SyncDataInput {
  userId: string;
  companyId: string;
  source: string;
  target: string;
}

const syncData = ow.defineWorkflow(
  { name: "sync-data" },
  async ({ input, step }: { input: SyncDataInput; step: any }) => {
    // Step 1: Fetch source data
    const sourceData = await step.run({ name: "fetch-source" }, async () => {
      // Fetch from source system
      const response = await fetch(`https://api.source.com/data`, {
        headers: { Authorization: `Bearer ${token}` },
      });
      return await response.json();
    });

    // Step 2: Transform data
    const transformedData = await step.run({ name: "transform" }, async () => {
      return sourceData.map(item => ({
        id: item.externalId,
        name: item.title,
        status: item.state,
        // ... transform fields
      }));
    });

    // Step 3: Upsert to target
    await step.run({ name: "upsert-target" }, async () => {
      for (const item of transformedData) {
        await db
          .insert(schema.items)
          .values(item)
          .onConflictDoUpdate({
            target: schema.items.id,
            set: {
              name: item.name,
              status: item.status,
              updatedAt: new Date(),
            },
          });
      }
    });

    // Step 4: Log sync
    await step.run({ name: "log-sync" }, async () => {
      await db.insert(schema.sync_logs).values({
        userId: input.userId,
        companyId: input.companyId,
        source: input.source,
        target: input.target,
        recordsProcessed: transformedData.length,
        syncedAt: new Date(),
      });
    });

    return {
      recordsProcessed: transformedData.length,
      syncedAt: new Date(),
    };
  }
);
```typescript

## API Integration

### External API Call with Retry

```typescript
interface CallExternalAPIInput {
  endpoint: string;
  method: string;
  body?: Record<string, any>;
  retries?: number;
}

const callExternalAPI = ow.defineWorkflow(
  { name: "call-external-api" },
  async ({ input, step }: { input: CallExternalAPIInput; step: any }) => {
    const result = await step.run(
      { name: "api-call" },
      async () => {
        const maxRetries = input.retries || 3;
        let lastError;

        for (let attempt = 1; attempt <= maxRetries; attempt++) {
          try {
            const response = await fetch(input.endpoint, {
              method: input.method,
              headers: {
                "Content-Type": "application/json",
              },
              body: input.body ? JSON.stringify(input.body) : undefined,
            });

            if (!response.ok) {
              throw new Error(`API returned ${response.status}`);
            }

            return await response.json();
          } catch (error) {
            lastError = error;
            if (attempt < maxRetries) {
              // Exponential backoff
              await new Promise(resolve =>
                setTimeout(resolve, Math.pow(2, attempt) * 1000)
              );
            }
          }
        }

        throw lastError;
      }
    );

    return { result };
  }
);
```typescript

## Multi-Step Processing

### Document Processing Pipeline

```typescript
interface ProcessDocumentInput {
  documentId: string;
  userId: string;
  companyId: string;
}

const processDocument = ow.defineWorkflow(
  { name: "process-document" },
  async ({ input, step }: { input: ProcessDocumentInput; step: any }) => {
    // Step 1: Fetch document
    const document = await step.run({ name: "fetch-document" }, async () => {
      const result = await db
        .select()
        .from(schema.documents)
        .where(eq(schema.documents.id, input.documentId))
        .limit(1);
      return result[0];
    });

    // Step 2: Validate document
    await step.run({ name: "validate-document" }, async () => {
      if (!document.fileUrl) {
        throw new Error("Document file URL is missing");
      }
      if (!document.type) {
        throw new Error("Document type is required");
      }
    });

    // Step 3: Process based on type
    let processedData;
    if (document.type === "invoice") {
      processedData = await step.run(
        { name: "process-invoice" },
        async () => {
          // Invoice-specific processing
          return { type: "invoice", data: {} };
        }
      );
    } else if (document.type === "receipt") {
      processedData = await step.run(
        { name: "process-receipt" },
        async () => {
          // Receipt-specific processing
          return { type: "receipt", data: {} };
        }
      );
    }

    // Step 4: Store processed data
    await step.run({ name: "store-processed-data" }, async () => {
      await db
        .update(schema.documents)
        .set({
          processed: true,
          processedData: processedData,
          processedAt: new Date(),
        })
        .where(eq(schema.documents.id, input.documentId));
    });

    // Step 5: Notify user
    await step.run({ name: "notify-user" }, async () => {
      return await email({
        userId: input.userId,
        companyId: input.companyId,
        emailAction: "document_processed",
        recipients: [document.userEmail],
      });
    });

    return { document, processedData };
  }
);
```typescript

## Error Handling

### Workflow with Comprehensive Error Handling

```typescript
interface ProcessOrderInput {
  orderId: string;
  userId: string;
}

const processOrder = ow.defineWorkflow(
  { name: "process-order" },
  async ({ input, step }: { input: ProcessOrderInput; step: any }) => {
    let order;
    let paymentResult;
    let shippingResult;

    try {
      // Step 1: Fetch order
      order = await step.run({ name: "fetch-order" }, async () => {
        const result = await db
          .select()
          .from(schema.orders)
          .where(eq(schema.orders.id, input.orderId))
          .limit(1);
        
        if (!result[0]) {
          throw new Error(`Order ${input.orderId} not found`);
        }
        
        return result[0];
      });

      // Step 2: Process payment
      paymentResult = await step.run({ name: "process-payment" }, async () => {
        const response = await fetch("https://payment.api/charge", {
          method: "POST",
          body: JSON.stringify({
            amount: order.total,
            orderId: order.id,
          }),
        });

        if (!response.ok) {
          throw new Error("Payment processing failed");
        }

        return await response.json();
      });

      // Step 3: Update order status
      await step.run({ name: "update-order-status" }, async () => {
        await db
          .update(schema.orders)
          .set({
            status: "paid",
            paymentId: paymentResult.paymentId,
            paidAt: new Date(),
          })
          .where(eq(schema.orders.id, input.orderId));
      });

      // Step 4: Create shipping label
      shippingResult = await step.run(
        { name: "create-shipping-label" },
        async () => {
          const response = await fetch("https://shipping.api/label", {
            method: "POST",
            body: JSON.stringify({
              orderId: order.id,
              address: order.shippingAddress,
            }),
          });

          if (!response.ok) {
            throw new Error("Shipping label creation failed");
          }

          return await response.json();
        }
      );

      // Step 5: Update order with shipping info
      await step.run({ name: "update-shipping-info" }, async () => {
        await db
          .update(schema.orders)
          .set({
            shippingLabelId: shippingResult.labelId,
            trackingNumber: shippingResult.trackingNumber,
            status: "shipped",
            shippedAt: new Date(),
          })
          .where(eq(schema.orders.id, input.orderId));
      });

      return {
        success: true,
        order,
        payment: paymentResult,
        shipping: shippingResult,
      };
    } catch (error) {
      // Log error
      await step.run({ name: "log-error" }, async () => {
        await db.insert(schema.workflow_errors).values({
          workflowName: "process-order",
          orderId: input.orderId,
          error: error.message,
          errorStack: error.stack,
          occurredAt: new Date(),
        });
      });

      // Update order status to failed
      if (order) {
        await step.run({ name: "mark-order-failed" }, async () => {
          await db
            .update(schema.orders)
            .set({
              status: "failed",
              errorMessage: error.message,
            })
            .where(eq(schema.orders.id, input.orderId));
        });
      }

      // Re-throw to mark workflow as failed
      throw error;
    }
  }
);
```typescript

## Best Practices Demonstrated

These examples demonstrate:

1. **Clear Step Names**: Descriptive names like `fetch-order`, `process-payment`
2. **Idempotent Operations**: Database updates use `onConflictDoUpdate`
3. **Error Handling**: Try-catch blocks with proper error logging
4. **Data Flow**: Each step uses data from previous steps
5. **Separation of Concerns**: Each step has a single responsibility
6. **Return Values**: Useful data returned for monitoring/debugging

## Testing Examples

### Testing a Workflow Step

```typescript
describe("onboardUser workflow", () => {
  it("should create user settings", async () => {
    const mockUser = { userId: "123", email: "test@example.com" };
    
    // Mock database
    const mockDb = {
      select: jest.fn().mockReturnThis(),
      from: jest.fn().mockReturnThis(),
      where: jest.fn().mockReturnThis(),
      limit: jest.fn().mockResolvedValue([mockUser]),
    };

    // Test the step logic
    const result = await fetchUser(mockUser.userId, mockDb);
    expect(result).toEqual(mockUser);
  });
});
```typescript

## Additional Resources

- [Creating Workflows Guide](./creating-workflows.md)
- [API Usage Documentation](./api-usage.md)
- [Architecture Overview](./architecture.md)