# Worker Setup and Configuration
## Overview
The workflow worker is a long-running process that executes workflow steps. It polls the database for pending workflows and executes them with configurable concurrency.
## File Location
```
workflows/
└── worker.ts
```
## Starting the Worker
### Development
```bash
# From the root directory
tsx workflows/worker.ts
```
### Production
```bash
# Using Node.js (assumes worker.ts has been compiled to worker.js)
node workflows/worker.js
# Using PM2
pm2 start workflows/worker.js --name workflow-worker
# Using Docker (assumes the workflow-worker image built in the Deployment section)
docker run -d \
  -e DATABASE_URL=postgresql://... \
  workflow-worker
```
## Configuration
### Concurrency
The worker is configured with a concurrency limit:
```typescript
const worker = ow.newWorker({ concurrency: 20 });
```
This means:
- Up to 20 workflows can execute simultaneously
- Each workflow runs steps sequentially
- Steps from different workflows run in parallel
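
The model above can be illustrated with a minimal sketch. It is not OpenWorkflow's implementation: `Step`, `Workflow`, `runWorkflow`, and `runWithConcurrency` are hypothetical names used only to show how a cap on simultaneous workflows combines with sequential steps inside each workflow.

```typescript
// Hypothetical types for illustration; not part of OpenWorkflow's API.
type Step = () => Promise<void>;
type Workflow = { name: string; steps: Step[] };

// Runs the steps of a single workflow one after another.
async function runWorkflow(workflow: Workflow): Promise<void> {
  for (const step of workflow.steps) {
    await step();
  }
}

// Runs workflows with at most `concurrency` in flight at once.
// Returns the highest number of workflows observed running simultaneously.
async function runWithConcurrency(
  workflows: Workflow[],
  concurrency: number,
): Promise<number> {
  const queue = [...workflows];
  let active = 0;
  let peak = 0;

  // Each lane pulls the next workflow as soon as its previous one finishes.
  async function lane(): Promise<void> {
    for (let next = queue.shift(); next !== undefined; next = queue.shift()) {
      active += 1;
      peak = Math.max(peak, active);
      try {
        await runWorkflow(next);
      } finally {
        active -= 1;
      }
    }
  }

  const laneCount = Math.min(concurrency, workflows.length);
  await Promise.all(Array.from({ length: laneCount }, () => lane()));
  return peak;
}
```

In this sketch a failing step rejects the whole call; a real worker would more likely record the failure and keep processing other workflows.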
### Adjusting Concurrency
Modify the concurrency value based on your needs:
```typescript
// Low concurrency (fewer resources)
const worker = ow.newWorker({ concurrency: 5 });
// High concurrency (more throughput); use this instead of the line above
// const worker = ow.newWorker({ concurrency: 50 });
```
**Considerations:**
- Database connection limits
- External API rate limits
- Server CPU/memory resources
- Workflow execution time
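
One way to turn the considerations above into a starting value is to cap the desired concurrency by each limit. The sketch below is a rule of thumb under stated assumptions, not OpenWorkflow guidance: it assumes each running workflow holds at most one database connection, and it estimates the API cap from the relation "requests per second ≈ concurrent workflows × calls per workflow ÷ workflow duration". `Limits` and `capConcurrency` are hypothetical names.

```typescript
// Hypothetical inputs for illustration; measure real values for your system.
type Limits = {
  dbMaxConnections: number; // connections the database allows for this worker
  dbReservedConnections: number; // connections kept free for polling and tooling
  apiCallsPerSecond: number; // external API rate limit
  apiCallsPerWorkflow: number; // average API calls made by one workflow
  avgWorkflowSeconds: number; // average workflow execution time
};

function capConcurrency(desired: number, limits: Limits): number {
  // Assumption: one database connection per running workflow.
  const dbCap = limits.dbMaxConnections - limits.dbReservedConnections;

  // N concurrent workflows issue about N * calls / seconds requests per second,
  // so N must stay at or below rate * seconds / calls.
  const apiCap =
    limits.apiCallsPerWorkflow > 0
      ? Math.floor(
          (limits.apiCallsPerSecond * limits.avgWorkflowSeconds) /
            limits.apiCallsPerWorkflow,
        )
      : Number.POSITIVE_INFINITY; // workflows that make no API calls are not rate limited

  // Never go below 1, otherwise the worker would process nothing.
  return Math.max(1, Math.min(desired, dbCap, apiCap));
}
```

For example, with 20 database connections (2 reserved), an API limit of 10 calls per second, 4 calls per workflow, and 6-second workflows, `capConcurrency(50, ...)` returns 15: the API limit is the tightest constraint.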
## Environment Variables
### Required
- `DATABASE_URL`: PostgreSQL connection string
```bash
DATABASE_URL=postgresql://user:password@host:5432/database
```
### Optional
- `LOG_LEVEL`: Logging level (default: info)
- `WORKER_ID`: Unique identifier for this worker instance
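
A startup check can fail fast when a required variable is missing. The sketch below is an assumption about how such a check might look, since this page does not show how `worker.ts` reads its environment. `WorkerConfig` and `loadWorkerConfig` are hypothetical names; only the variable names and the `info` default come from the lists above.

```typescript
// Hypothetical config shape; only the variable names come from this page.
type WorkerConfig = {
  databaseUrl: string;
  logLevel: string;
  workerId: string | undefined;
};

function loadWorkerConfig(
  env: Record<string, string | undefined>,
): WorkerConfig {
  const databaseUrl = env.DATABASE_URL;
  if (!databaseUrl) {
    throw new Error("DATABASE_URL is required");
  }

  // Fail early on a malformed URL instead of at the first query.
  // new URL() throws a TypeError if the string cannot be parsed.
  const protocol = new URL(databaseUrl).protocol;
  if (protocol !== "postgresql:" && protocol !== "postgres:") {
    throw new Error(`DATABASE_URL must be a PostgreSQL URL, got "${protocol}"`);
  }

  return {
    databaseUrl,
    logLevel: env.LOG_LEVEL ?? "info", // documented default
    workerId: env.WORKER_ID, // optional; undefined when unset
  };
}
```

In a worker entry point this would be called once at startup, for example `const config = loadWorkerConfig(process.env);`.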
## Graceful Shutdown
The worker handles graceful shutdown on:
- `SIGTERM`: Termination signal (e.g., from process manager)
- `SIGINT`: Interrupt signal (e.g., Ctrl+C)
On shutdown:
1. Worker stops accepting new workflows
2. Currently running workflows complete
3. Worker disconnects from database
4. Process exits
```typescript
process.on("SIGTERM", async () => {
  console.log("SIGTERM received, shutting down worker...");
  await worker.stop();
  process.exit(0);
});
process.on("SIGINT", async () => {
  console.log("SIGINT received, shutting down worker...");
  await worker.stop();
  process.exit(0);
});
```
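
The two handlers above share the same logic, and a second signal during shutdown would call `worker.stop()` again. One possible refinement is a shared handler that runs once and gives up after a timeout. This is a sketch, not part of OpenWorkflow: `Stoppable`, `createShutdownHandler`, and the 30-second default are hypothetical, and the only thing assumed about the worker is the async `stop()` method used above.

```typescript
// Minimal shape assumed from the example above: an object with an async stop().
type Stoppable = { stop(): Promise<void> };

// Returns a handler that stops the worker once, even if several signals arrive,
// and reports exit code 1 if stopping fails or takes longer than `timeoutMs`.
function createShutdownHandler(
  worker: Stoppable,
  exit: (code: number) => void,
  timeoutMs = 30_000, // hypothetical default; tune to your longest workflow
): (signal: string) => Promise<void> {
  let shuttingDown = false;

  return async (signal: string) => {
    if (shuttingDown) return; // ignore repeated signals
    shuttingDown = true;
    console.log(`${signal} received, shutting down worker...`);

    let timer: ReturnType<typeof setTimeout> | undefined;
    const timeout = new Promise<never>((_, reject) => {
      timer = setTimeout(
        () => reject(new Error("shutdown timed out")),
        timeoutMs,
      );
    });

    try {
      await Promise.race([worker.stop(), timeout]);
      exit(0);
    } catch (error) {
      console.error("Shutdown failed:", error);
      exit(1);
    } finally {
      clearTimeout(timer);
    }
  };
}

// Usage, assuming `worker` is the instance created earlier:
// const shutdown = createShutdownHandler(worker, (code) => process.exit(code));
// process.on("SIGTERM", () => void shutdown("SIGTERM"));
// process.on("SIGINT", () => void shutdown("SIGINT"));
```

Passing `exit` as a parameter keeps the handler testable without terminating the test process.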
## Monitoring
### Worker Health
Monitor worker health by checking:
1. **Process Status**: Is the worker process running?
2. **Database Connections**: Are connections active?
3. **Workflow Execution**: Are workflows being processed?
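
The three checks above can be folded into a single status for an alerting system. The sketch below assumes the caller gathers the inputs itself (for example from the process manager and the queries in the Database Monitoring section). `HealthInputs`, `HealthReport`, `evaluateHealth`, and the five-minute staleness threshold are hypothetical.

```typescript
// Hypothetical inputs; how they are collected is up to the caller.
type HealthInputs = {
  processRunning: boolean; // is the worker process alive?
  activeDbConnections: number; // connections currently held by the worker
  pendingRuns: number; // runs waiting to be picked up
  lastRunStartedAt: Date | null; // most recent workflow start, if any
};

type HealthReport = { healthy: boolean; problems: string[] };

function evaluateHealth(
  inputs: HealthInputs,
  now: Date = new Date(),
  staleAfterMs = 5 * 60 * 1000, // hypothetical threshold
): HealthReport {
  const problems: string[] = [];

  if (!inputs.processRunning) {
    problems.push("worker process is not running");
  }
  if (inputs.activeDbConnections === 0) {
    problems.push("no active database connections");
  }

  // Work is waiting but nothing has started recently: the worker may be stalled.
  const lastStart = inputs.lastRunStartedAt?.getTime() ?? 0;
  if (inputs.pendingRuns > 0 && now.getTime() - lastStart > staleAfterMs) {
    problems.push("pending workflows are not being processed");
  }

  return { healthy: problems.length === 0, problems };
}
```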
### Logs
The worker logs:
- Workflow starts/completions
- Step executions
- Errors and failures
- Shutdown events
Example log output:
```text
Workflow started: send-email-received-ein (run-abc123)
Step completed: send-email (run-abc123)
Workflow completed: send-email-received-ein (run-abc123)
```
### Database Monitoring
Query workflow execution stats:
```sql
-- Active workflows
SELECT COUNT(*)
FROM openworkflow.workflow_runs
WHERE status = 'running';

-- Failed workflows (last hour)
SELECT COUNT(*)
FROM openworkflow.workflow_runs
WHERE status = 'failed'
  AND created_at > NOW() - INTERVAL '1 hour';

-- Average execution time
SELECT
  workflow_name,
  AVG(EXTRACT(EPOCH FROM (finished_at - started_at))) AS avg_seconds
FROM openworkflow.workflow_runs
WHERE status = 'completed'
GROUP BY workflow_name;
```
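
The same figures can also be computed in application code, for example to feed a dashboard from rows that have already been fetched. The sketch below mirrors the three queries above and assumes only the column names they use. `RunRow`, `RunSummary`, and `summarizeRuns` are hypothetical names, and the real table may contain more columns.

```typescript
// Hypothetical row shape based on the columns used in the queries above.
type RunRow = {
  workflow_name: string;
  status: string;
  created_at: Date;
  started_at: Date | null;
  finished_at: Date | null;
};

type RunSummary = {
  running: number;
  failedLastHour: number;
  avgSecondsByWorkflow: Record<string, number>;
};

function summarizeRuns(rows: RunRow[], now: Date = new Date()): RunSummary {
  const oneHourAgo = now.getTime() - 60 * 60 * 1000;
  const totals = new Map<string, { seconds: number; count: number }>();
  let running = 0;
  let failedLastHour = 0;

  for (const row of rows) {
    if (row.status === "running") running += 1;
    if (row.status === "failed" && row.created_at.getTime() > oneHourAgo) {
      failedLastHour += 1;
    }
    // Only completed runs with both timestamps contribute to the average.
    if (row.status === "completed" && row.started_at && row.finished_at) {
      const seconds =
        (row.finished_at.getTime() - row.started_at.getTime()) / 1000;
      const entry = totals.get(row.workflow_name) ?? { seconds: 0, count: 0 };
      entry.seconds += seconds;
      entry.count += 1;
      totals.set(row.workflow_name, entry);
    }
  }

  const avgSecondsByWorkflow: Record<string, number> = {};
  totals.forEach(({ seconds, count }, name) => {
    avgSecondsByWorkflow[name] = seconds / count;
  });
  return { running, failedLastHour, avgSecondsByWorkflow };
}
```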
## Scaling
### Horizontal Scaling
Run multiple worker instances:
```bash
# Worker instance 1
WORKER_ID=worker-1 tsx workflows/worker.ts
# Worker instance 2
WORKER_ID=worker-2 tsx workflows/worker.ts
# Worker instance 3
WORKER_ID=worker-3 tsx workflows/worker.ts
```
Each worker:
- Polls independently
- Picks up available workflows
- Needs no coordination with other workers (the database handles locking)
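
To illustrate why independent pollers do not process the same run twice, the sketch below simulates several workers sharing one store. It is an in-memory stand-in, not OpenWorkflow's implementation: `claimNext` plays the role of the database's atomic claim. PostgreSQL-backed queues commonly achieve this with row locks such as `SELECT ... FOR UPDATE SKIP LOCKED`; whether OpenWorkflow does so is an assumption. All names here are hypothetical.

```typescript
type Run = {
  id: string;
  status: "pending" | "running" | "completed";
  claimedBy?: string;
};

class RunStore {
  private readonly runs: Run[];

  constructor(runs: Run[]) {
    this.runs = runs;
  }

  // Atomic claim: find a pending run and mark it running in one step.
  // This method is synchronous, so no other caller can interleave with it
  // in JavaScript; a real database relies on row locking instead.
  claimNext(workerId: string): Run | undefined {
    const run = this.runs.find((r) => r.status === "pending");
    if (run) {
      run.status = "running";
      run.claimedBy = workerId;
    }
    return run;
  }

  complete(run: Run): void {
    run.status = "completed";
  }
}

// Each worker polls on its own until nothing is pending.
async function pollUntilEmpty(
  store: RunStore,
  workerId: string,
): Promise<string[]> {
  const processed: string[] = [];
  for (
    let run = store.claimNext(workerId);
    run !== undefined;
    run = store.claimNext(workerId)
  ) {
    await new Promise((resolve) => setTimeout(resolve, 1)); // simulated work
    store.complete(run);
    processed.push(run.id);
  }
  return processed;
}

// Starts all workers at once and returns the run ids each one processed.
async function simulate(
  workerIds: string[],
  runCount: number,
): Promise<Map<string, string[]>> {
  const runs = Array.from(
    { length: runCount },
    (_, i): Run => ({ id: `run-${i}`, status: "pending" }),
  );
  const store = new RunStore(runs);
  const results = await Promise.all(
    workerIds.map((id) => pollUntilEmpty(store, id)),
  );
  return new Map(workerIds.map((id, i) => [id, results[i]]));
}
```

Calling `simulate(["worker-1", "worker-2", "worker-3"], 10)` distributes the ten runs across the three workers, with each run id appearing exactly once.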
### Vertical Scaling
Increase concurrency on a single worker:
```typescript
const worker = ow.newWorker({ concurrency: 50 });
```
**Trade-offs:**
- More concurrent workflows
- Higher resource usage
- Risk of hitting database connection limits
## Deployment
### Docker
```dockerfile
FROM node:20-alpine
WORKDIR /app
COPY package*.json ./
RUN npm install
# Assumes workflows/ already contains the compiled worker.js
COPY workflows/ ./workflows/
COPY drizzle/ ./drizzle/
COPY config.ts ./
CMD ["node", "workflows/worker.js"]
```
```bash
docker build -t workflow-worker .
docker run -d \
  --name workflow-worker \
  -e DATABASE_URL=postgresql://... \
  workflow-worker
```
### Kubernetes
```yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: workflow-worker
spec:
  replicas: 3
  selector:
    matchLabels:
      app: workflow-worker
  template:
    metadata:
      labels:
        app: workflow-worker
    spec:
      containers:
        - name: worker
          image: workflow-worker:latest
          env:
            - name: DATABASE_URL
              valueFrom:
                secretKeyRef:
                  name: db-secret
                  key: url
          resources:
            requests:
              memory: "256Mi"
              cpu: "100m"
            limits:
              memory: "512Mi"
              cpu: "500m"
```
### Systemd Service
```ini
[Unit]
Description=Workflow Worker
After=network.target

[Service]
Type=simple
User=www-data
WorkingDirectory=/var/www/suitsbooks
Environment="DATABASE_URL=postgresql://..."
ExecStart=/usr/bin/node workflows/worker.js
Restart=always
RestartSec=10

[Install]
WantedBy=multi-user.target
```
```bash
sudo systemctl enable workflow-worker
sudo systemctl start workflow-worker
sudo systemctl status workflow-worker
```
## Troubleshooting
### Worker Not Processing Workflows
1. **Check Worker Status**
   ```bash
   ps aux | grep worker
   ```
2. **Check Database Connection**
   ```sql
   SELECT * FROM pg_stat_activity WHERE application_name LIKE '%worker%';
   ```
3. **Check Pending Workflows**
   ```sql
   SELECT * FROM openworkflow.workflow_runs WHERE status = 'pending';
   ```
4. **Check Logs**
   ```bash
   tail -f worker.log
   ```
### High Memory Usage
- Reduce concurrency
- Check for memory leaks in workflow code
- Monitor step execution times
### Database Connection Errors
- Check that `DATABASE_URL` is correct
- Verify that the database is accessible
- Check connection pool limits
- Monitor active connections
### Workflows Stuck in "Running"
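1. Check whether the worker is running
2. Check the worker logs for errors
3. Manually cancel stuck workflows. The query below cancels every run that has been in the running state for more than an hour, including healthy long-running ones, so adjust the interval to your longest expected workflow first: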
```sql
UPDATE openworkflow.workflow_runs
SET status = 'cancelled'
WHERE status = 'running'
AND started_at < NOW() - INTERVAL '1 hour';
```
## Best Practices
1. **Monitor Worker Health**: Set up health checks and alerts
2. **Log Aggregation**: Use centralized logging (e.g., ELK, Datadog)
3. **Resource Limits**: Set appropriate CPU/memory limits
4. **Graceful Shutdown**: Always handle shutdown signals
5. **Error Handling**: Log errors with context
6. **Scaling**: Start with low concurrency, increase gradually
7. **Backup**: Ensure database backups include workflow state
## Performance Tuning
### Database Indexes
Ensure indexes exist for workflow queries:
```sql
-- Check existing indexes
SELECT * FROM pg_indexes
WHERE schemaname = 'openworkflow';
```
### Connection Pooling
Use connection pooling for database connections:
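```typescript
import { Pool } from 'pg';
const pool = new Pool({
  connectionString: process.env.DATABASE_URL, // pg does not read DATABASE_URL on its own
  max: 20, // Maximum connections
  idleTimeoutMillis: 30000, // Close clients idle for 30 seconds
  connectionTimeoutMillis: 2000, // Give up connecting after 2 seconds
});
```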
### Worker Tuning
Adjust based on workload:
- **CPU-bound workflows**: Lower concurrency
- **I/O-bound workflows**: Higher concurrency
- **Mixed workloads**: Medium concurrency with monitoring
## Security
### Database Access
- Use a read-only user for monitoring queries
- Limit worker database permissions
- Use SSL for database connections
### Environment Variables
- Never commit secrets to version control
- Use secret management (e.g., AWS Secrets Manager, HashiCorp Vault)
- Rotate credentials regularly
## Maintenance
### Regular Tasks
1. **Monitor Workflow Success Rates**: Track completion/failure rates
2. **Review Logs**: Check for errors or warnings
3. **Database Cleanup**: Archive old workflow runs (optional)
4. **Update Dependencies**: Keep OpenWorkflow updated
### Database Cleanup
Archive old workflow runs:
```sql
-- Archive completed workflows older than 30 days
CREATE TABLE IF NOT EXISTS openworkflow.workflow_runs_archive
  AS TABLE openworkflow.workflow_runs WITH NO DATA;

-- Copy and delete in one transaction so both statements see the same NOW()
BEGIN;
INSERT INTO openworkflow.workflow_runs_archive
SELECT * FROM openworkflow.workflow_runs
WHERE status = 'completed'
  AND finished_at < NOW() - INTERVAL '30 days';
DELETE FROM openworkflow.workflow_runs
WHERE status = 'completed'
  AND finished_at < NOW() - INTERVAL '30 days';
COMMIT;
```