XYLEX Group

# Worker Setup and Configuration


## Overview

The workflow worker is a long-running process that executes workflow steps. It polls the database for pending workflows and runs them, up to a configurable number at a time.

## File Location

```text
workflows/
└── worker.ts
```

## Starting the Worker

### Development

```bash
# From the root directory
tsx workflows/worker.ts
```

### Production

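```bash
# Using Node.js: compile first (e.g. `npx tsc`, assuming your tsconfig emits
# workflows/worker.js), since Node.js versions without TypeScript support can't run .ts
node workflows/worker.js

# Using PM2 (with the compiled output)
pm2 start workflows/worker.js --name workflow-worker

# Using Docker (image built as described under Deployment > Docker)
docker run -d \
  -e DATABASE_URL=postgresql://... \
  workflow-worker
```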

## Configuration

### Concurrency

The worker is configured with a concurrency limit:

```typescript
const worker = ow.newWorker({ concurrency: 20 });
```

This means:

- Up to 20 workflows can execute simultaneously
- Each workflow runs steps sequentially
- Steps from different workflows run concurrently

### Adjusting Concurrency

Modify the concurrency value based on your needs:

```typescript
// Low concurrency (fewer resources)
const worker = ow.newWorker({ concurrency: 5 });

// High concurrency (more throughput): use this line instead of the one above,
// since `worker` cannot be declared twice in the same scope
// const worker = ow.newWorker({ concurrency: 50 });
```

**Considerations:**

- Database connection limits
- External API rate limits
- Server CPU/memory resources
- Workflow execution time
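Because the right value differs between environments, one option is to read it from an environment variable instead of hard-coding it. The sketch below assumes a hypothetical `WORKER_CONCURRENCY` variable (not one of the variables this worker documents) and a hypothetical `resolveConcurrency` helper; it falls back to 20, the value used above, when the variable is unset or not a positive integer.

```typescript
// Hypothetical helper: WORKER_CONCURRENCY is an assumed variable, not a documented one.
const DEFAULT_CONCURRENCY = 20;

function resolveConcurrency(
  raw: string | undefined,
  fallback: number = DEFAULT_CONCURRENCY,
): number {
  // Unset or blank: keep the default
  if (raw === undefined || raw.trim() === "") return fallback;

  const value = Number(raw); // Number() ignores surrounding whitespace
  // Accept only whole numbers of at least 1; anything else keeps the fallback
  if (!Number.isInteger(value) || value < 1) {
    console.warn(`Ignoring invalid WORKER_CONCURRENCY="${raw}", using ${fallback}`);
    return fallback;
  }
  return value;
}

// Usage in worker.ts (ow is the client already used above):
// const worker = ow.newWorker({
//   concurrency: resolveConcurrency(process.env.WORKER_CONCURRENCY),
// });
```

Falling back instead of throwing keeps a typo from taking the worker down; if you prefer to fail fast at startup, throw in the invalid branch instead.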

## Environment Variables

### Required

- `DATABASE_URL`: PostgreSQL connection string

  ```bash
  DATABASE_URL=postgresql://user:password@host:5432/database
  ```

### Optional

- `LOG_LEVEL`: Logging level (default: info)
- `WORKER_ID`: Unique identifier for this worker instance
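A worker can validate these variables once at startup, so a missing or malformed `DATABASE_URL` fails immediately rather than on the first query. The sketch below is illustrative: `loadWorkerConfig` is a hypothetical helper, the accepted `LOG_LEVEL` values (`debug`, `info`, `warn`, `error`) are an assumption, and deriving a default `WORKER_ID` from the hostname and process ID is one possible fallback, not documented behavior.

```typescript
import { hostname } from "node:os";

// Assumed set of log levels; only the default ("info") comes from the docs
type LogLevel = "debug" | "info" | "warn" | "error";
const LOG_LEVELS: readonly LogLevel[] = ["debug", "info", "warn", "error"];

interface WorkerConfig {
  databaseUrl: string;
  logLevel: LogLevel;
  workerId: string;
}

// `env` defaults to process.env but can be a plain object in tests
function loadWorkerConfig(
  env: Record<string, string | undefined> = process.env,
): WorkerConfig {
  const databaseUrl = env.DATABASE_URL;
  if (!databaseUrl) {
    throw new Error("DATABASE_URL is required");
  }
  // new URL() throws on malformed strings; also reject non-PostgreSQL schemes
  const { protocol } = new URL(databaseUrl);
  if (protocol !== "postgresql:" && protocol !== "postgres:") {
    throw new Error(`DATABASE_URL must be a PostgreSQL URL, got "${protocol}"`);
  }

  const rawLevel = (env.LOG_LEVEL ?? "info").toLowerCase();
  if (!LOG_LEVELS.includes(rawLevel as LogLevel)) {
    throw new Error(`Unsupported LOG_LEVEL "${rawLevel}"`);
  }

  return {
    databaseUrl,
    logLevel: rawLevel as LogLevel,
    // Hypothetical fallback: hostname plus PID distinguishes concurrently
    // running processes, assuming hostnames are unique
    workerId: env.WORKER_ID ?? `${hostname()}-${process.pid}`,
  };
}
```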

## Graceful Shutdown

The worker handles graceful shutdown on:

- `SIGTERM`: Termination signal (e.g., from process manager)
- `SIGINT`: Interrupt signal (e.g., Ctrl+C)

On shutdown:

1. Worker stops accepting new workflows
2. Currently running workflows complete
3. Worker disconnects from database
4. Process exits

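```typescript
// One handler for both signals, so SIGTERM and SIGINT follow the same path
let shuttingDown = false;

async function shutdown(signal: NodeJS.Signals): Promise<void> {
  // A second Ctrl+C (or a repeated SIGTERM) should not call stop() twice
  if (shuttingDown) return;
  shuttingDown = true;
  console.log(`${signal} received, shutting down worker...`);
  try {
    await worker.stop(); // per the steps above, lets running workflows finish
    process.exit(0);
  } catch (err) {
    console.error("Worker failed to stop cleanly:", err);
    process.exit(1);
  }
}

process.on("SIGTERM", shutdown);
process.on("SIGINT", shutdown);
```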
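If a workflow step never settles, `await worker.stop()` in the shutdown handling above may wait indefinitely, and the process manager eventually kills the worker without a final log line. A minimal sketch, assuming you want to bound that wait: `stopWithTimeout` (a hypothetical helper) races the stop call against a timer and reports which finished first.

```typescript
// Hypothetical helper: resolves "stopped" if stop() finishes first,
// "timed-out" if the timer fires first. If stop() rejects, the error propagates.
async function stopWithTimeout(
  stop: () => Promise<void>,
  timeoutMs: number,
): Promise<"stopped" | "timed-out"> {
  let timer: ReturnType<typeof setTimeout> | undefined;
  const deadline = new Promise<"timed-out">((resolve) => {
    timer = setTimeout(() => resolve("timed-out"), timeoutMs);
  });
  try {
    return await Promise.race([stop().then(() => "stopped" as const), deadline]);
  } finally {
    // Clear the timer so it doesn't keep the event loop alive after stop() wins
    clearTimeout(timer);
  }
}
```

In a shutdown handler, `const result = await stopWithTimeout(() => worker.stop(), 25_000)` followed by `process.exit(result === "stopped" ? 0 : 1)` would bound the wait. The 25-second value is only an example; keep it below your process manager's kill deadline (in Kubernetes, `terminationGracePeriodSeconds`, which defaults to 30 seconds).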

## Monitoring

### Worker Health

Monitor worker health by checking:

1. **Process Status**: Is the worker process running?
2. **Database Connections**: Are connections active?
3. **Workflow Execution**: Are workflows being processed?

### Logs

The worker logs:

- Workflow starts/completions
- Step executions
- Errors and failures
- Shutdown events

Example log output:

```text
Workflow started: send-email-received-ein (run-abc123)
Step completed: send-email (run-abc123)
Workflow completed: send-email-received-ein (run-abc123)
```
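A small leveled logger can produce lines in the format shown above while honoring `LOG_LEVEL`. This is a sketch, not the worker's actual logger: `createLogger` is a hypothetical name, the level names and their ordering are assumptions, and the `Workflow failed` line is a hypothetical addition written in the same style as the example output.

```typescript
// Hypothetical leveled logger; level names and ordering are assumptions.
type Level = "debug" | "info" | "warn" | "error";

const RANK: Record<Level, number> = { debug: 10, info: 20, warn: 30, error: 40 };

function createLogger(minLevel: Level, sink: (line: string) => void = console.log) {
  // Drop anything below the configured level (e.g. LOG_LEVEL=error hides info lines)
  const log = (level: Level, message: string): void => {
    if (RANK[level] >= RANK[minLevel]) sink(message);
  };
  return {
    workflowStarted: (name: string, runId: string) =>
      log("info", `Workflow started: ${name} (${runId})`),
    stepCompleted: (step: string, runId: string) =>
      log("info", `Step completed: ${step} (${runId})`),
    workflowCompleted: (name: string, runId: string) =>
      log("info", `Workflow completed: ${name} (${runId})`),
    // Include the run ID so a failure can be traced back to its workflow run
    workflowFailed: (name: string, runId: string, err: unknown) =>
      log(
        "error",
        `Workflow failed: ${name} (${runId}): ${err instanceof Error ? err.message : String(err)}`,
      ),
  };
}
```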

### Database Monitoring

Query workflow execution stats:

```sql
-- Active workflows
SELECT COUNT(*) 
FROM openworkflow.workflow_runs 
WHERE status = 'running';

-- Failed workflows (last hour)
SELECT COUNT(*) 
FROM openworkflow.workflow_runs 
WHERE status = 'failed' 
  AND created_at > NOW() - INTERVAL '1 hour';

-- Average execution time
SELECT 
  workflow_name,
  AVG(EXTRACT(EPOCH FROM (finished_at - started_at))) as avg_seconds
FROM openworkflow.workflow_runs
WHERE status = 'completed'
GROUP BY workflow_name;
```
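To alert on these numbers instead of checking them by hand, a monitoring job can run the same queries from TypeScript. The sketch below wraps the failed-workflows query and assumes the `openworkflow.workflow_runs` columns used above. `Queryable`, `failedInLastHour`, and `checkFailures` are hypothetical names; `Queryable` is declared locally as the subset of the node-postgres (`pg`) query API the code uses, so a `pg` `Pool` should satisfy it.

```typescript
// Minimal slice of the pg query API this sketch relies on (hypothetical name)
interface Queryable {
  query(text: string): Promise<{ rows: Array<Record<string, unknown>> }>;
}

const FAILED_LAST_HOUR_SQL = `
  SELECT COUNT(*) AS count
  FROM openworkflow.workflow_runs
  WHERE status = 'failed'
    AND created_at > NOW() - INTERVAL '1 hour'`;

// Returns the number of failed runs created in the last hour
async function failedInLastHour(db: Queryable): Promise<number> {
  const { rows } = await db.query(FAILED_LAST_HOUR_SQL);
  // COUNT(*) is a bigint, which node-postgres returns as a string
  return Number(rows[0]?.count ?? 0);
}

// Hypothetical alert hook: warn and return true when failures exceed a threshold
async function checkFailures(db: Queryable, threshold: number): Promise<boolean> {
  const failed = await failedInLastHour(db);
  if (failed > threshold) {
    console.warn(`${failed} workflows failed in the last hour (threshold ${threshold})`);
    return true;
  }
  return false;
}
```

The `Number(...)` conversion matters: node-postgres returns PostgreSQL `bigint` values such as `COUNT(*)` as strings to avoid losing precision, so comparing the raw value against a number would not behave as expected.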

## Scaling

### Horizontal Scaling

Run multiple worker instances:

```bash
# Worker instance 1
WORKER_ID=worker-1 tsx workflows/worker.ts

# Worker instance 2
WORKER_ID=worker-2 tsx workflows/worker.ts

# Worker instance 3
WORKER_ID=worker-3 tsx workflows/worker.ts
```

Each worker:

- Polls independently
- Picks up available workflows
- Needs no coordination with other instances (row locking in the database keeps two workers from claiming the same run)

### Vertical Scaling

Increase concurrency on a single worker:

```typescript
const worker = ow.newWorker({ concurrency: 50 });
```

**Trade-offs:**

- More concurrent workflows
- Higher resource usage
- Potential database connection limits
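Horizontal and vertical scaling both draw on the same connection limit. A rough budget, assuming each worker process opens at most as many connections as its pool `max` (see Connection Pooling) and that some connections stay reserved for other clients, is: instances × pool size ≤ `max_connections` − reserved. `connectionBudget` is a hypothetical helper that does this arithmetic.

```typescript
// Hypothetical helper. Assumes each worker process holds at most `poolMax`
// connections; `reserved` leaves room for the app, migrations, and admin sessions.
interface BudgetInput {
  instances: number; // worker processes (replicas)
  poolMax: number; // max connections per process
  maxConnections: number; // PostgreSQL max_connections
  reserved: number; // connections kept free for other clients
}

function connectionBudget({ instances, poolMax, maxConnections, reserved }: BudgetInput) {
  const needed = instances * poolMax; // worst case: every pool is full
  const available = maxConnections - reserved;
  return { needed, available, fits: needed <= available };
}
```

For example, three replicas with a pool `max` of 20 need up to 60 connections, which fits within PostgreSQL's default `max_connections` of 100 when 20 are held back for other clients (80 available); five replicas would need 100 and would not fit.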

## Deployment

### Docker

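```dockerfile
FROM node:20-alpine

WORKDIR /app

COPY package*.json ./
RUN npm install

COPY workflows/ ./workflows/
COPY drizzle/ ./drizzle/
COPY config.ts ./

# Compile TypeScript so workflows/worker.js exists; assumes typescript is a
# (dev) dependency and tsconfig.json emits output next to the sources
COPY tsconfig.json ./
RUN npx tsc

CMD ["node", "workflows/worker.js"]
```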

```bash
docker build -t workflow-worker .
docker run -d \
  --name workflow-worker \
  -e DATABASE_URL=postgresql://... \
  workflow-worker
```

### Kubernetes

```yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: workflow-worker
spec:
  replicas: 3
  selector:
    matchLabels:
      app: workflow-worker
  template:
    metadata:
      labels:
        app: workflow-worker
    spec:
      containers:
      - name: worker
        image: workflow-worker:latest
        env:
        - name: DATABASE_URL
          valueFrom:
            secretKeyRef:
              name: db-secret
              key: url
        resources:
          requests:
            memory: "256Mi"
            cpu: "100m"
          limits:
            memory: "512Mi"
            cpu: "500m"
```

### Systemd Service

```ini
# /etc/systemd/system/workflow-worker.service
[Unit]
Description=Workflow Worker
After=network.target

[Service]
Type=simple
User=www-data
WorkingDirectory=/var/www/suitsbooks
Environment="DATABASE_URL=postgresql://..."
ExecStart=/usr/bin/node workflows/worker.js
Restart=always
RestartSec=10

[Install]
WantedBy=multi-user.target
```

```bash
sudo systemctl daemon-reload
sudo systemctl enable workflow-worker
sudo systemctl start workflow-worker
sudo systemctl status workflow-worker
```

## Troubleshooting

### Worker Not Processing Workflows

1. **Check Worker Status**

   ```bash
   ps aux | grep worker
   ```

2. **Check Database Connection**

   ```sql
   SELECT * FROM pg_stat_activity WHERE application_name LIKE '%worker%';
   ```

3. **Check Pending Workflows**

   ```sql
   SELECT * FROM openworkflow.workflow_runs WHERE status = 'pending';
   ```

4. **Check Logs**

   ```bash
   tail -f worker.log
   ```

### High Memory Usage

- Reduce concurrency
- Check for memory leaks in workflow code
- Monitor step execution times

### Database Connection Errors

- Check `DATABASE_URL` is correct
- Verify database is accessible
- Check connection pool limits
- Monitor active connections

### Workflows Stuck in "Running"

1. Check if worker is running
2. Check worker logs for errors
3. Once you have confirmed that no worker is still executing them, manually cancel stuck workflows:

   ```sql
   UPDATE openworkflow.workflow_runs
   SET status = 'cancelled'
   WHERE status = 'running' 
     AND started_at < NOW() - INTERVAL '1 hour';
   ```

## Best Practices

1. **Monitor Worker Health**: Set up health checks and alerts
2. **Log Aggregation**: Use centralized logging (e.g., ELK, Datadog)
3. **Resource Limits**: Set appropriate CPU/memory limits
4. **Graceful Shutdown**: Always handle shutdown signals
5. **Error Handling**: Log errors with context
6. **Scaling**: Start with low concurrency, increase gradually
7. **Backup**: Ensure database backups include workflow state

## Performance Tuning

### Database Indexes

Ensure indexes exist for workflow queries:

```sql
-- Check existing indexes
SELECT * FROM pg_indexes 
WHERE schemaname = 'openworkflow';
```

### Connection Pooling

Use connection pooling for database connections:

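```typescript
import { Pool } from "pg";

const pool = new Pool({
  // Without connectionString, pg falls back to the PG* variables, not DATABASE_URL
  connectionString: process.env.DATABASE_URL,
  max: 20, // maximum connections this process keeps open
  idleTimeoutMillis: 30000, // close clients that sit idle for 30 s
  connectionTimeoutMillis: 2000, // time out after 2 s when acquiring a connection
});

// pg emits "error" for failures on idle clients; without a listener,
// the unhandled event crashes the process
pool.on("error", (err) => {
  console.error("Unexpected error on idle PostgreSQL client:", err);
});
```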

### Worker Tuning

Adjust based on workload:

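- **CPU-bound workflows**: Lower concurrency (a single Node.js process runs JavaScript on one thread, so extra slots mostly add queueing; add instances instead)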
- **I/O-bound workflows**: Higher concurrency
- **Mixed workloads**: Medium concurrency with monitoring

## Security

### Database Access

- Use read-only user for monitoring queries
- Limit worker database permissions
- Use SSL for database connections

### Environment Variables

- Never commit secrets to version control
- Use secret management (e.g., AWS Secrets Manager, HashiCorp Vault)
- Rotate credentials regularly

## Maintenance

### Regular Tasks

1. **Monitor Workflow Success Rates**: Track completion/failure rates
2. **Review Logs**: Check for errors or warnings
3. **Database Cleanup**: Archive old workflow runs (optional)
4. **Update Dependencies**: Keep OpenWorkflow updated

### Database Cleanup

Archive old workflow runs:

```sql
-- Create the archive table once, with the same columns as workflow_runs
CREATE TABLE IF NOT EXISTS openworkflow.workflow_runs_archive
  AS TABLE openworkflow.workflow_runs WITH NO DATA;

-- Archive completed workflows older than 30 days in one atomic statement:
-- rows are deleted and inserted together, so none are lost or duplicated
WITH moved AS (
  DELETE FROM openworkflow.workflow_runs
  WHERE status = 'completed'
    AND finished_at < NOW() - INTERVAL '30 days'
  RETURNING *
)
INSERT INTO openworkflow.workflow_runs_archive
SELECT * FROM moved;
```