Abraxas Workflows
Quick reference for creating and managing Abraxas workflows in your backend.
tip
For comprehensive documentation, see the Abraxas' documentation
How do I create a simple workflow?
Basic workflow structure:
# src/workflow/my_workflow.py
import abraxas
from pydantic import BaseModel
from typing import List
# 1. Define input/output models
class WorkflowInput(BaseModel):
user_id: str
task_count: int
class WorkflowResult(BaseModel):
message: str
created_tasks: List[str]
# 2. Create activities (the actual work)
@abraxas.activity()
async def create_tasks_activity(input: WorkflowInput) -> WorkflowResult:
"""Activities do the actual work - call APIs, process data, etc."""
task_ids = []
for i in range(input.task_count):
# Your business logic here
task_id = f"task_{input.user_id}_{i}"
task_ids.append(task_id)
return WorkflowResult(
message=f"Created {len(task_ids)} tasks",
created_tasks=task_ids
)
# 3. Define the workflow (orchestrates activities)
@abraxas.workflow.define(workflow_name="create_user_tasks")
class CreateUserTasksWorkflow:
@abraxas.workflow.entrypoint
async def run(self, input: WorkflowInput) -> WorkflowResult:
"""Entry point - coordinates activities"""
return await create_tasks_activity(input)
# 4. Register your workflow for your worker(s) in src/workflow/workers.py
ALL_WORKFLOWS = [CreateUserTasksWorkflow]
How do I start a workflow from my API?
Add workflow endpoints to your router:
# src/models/api_models.py
from pydantic import BaseModel
class StartWorkflowRequestDTO(BaseModel):
task_count: int = 5
class WorkflowResponseDTO(BaseModel):
execution_id: str
status: str
# src/routers/workflow.py
from fastapi import APIRouter, Depends, HTTPException
from pydantic import BaseModel
import abraxas
import time
from models.db_models import User
from utils.auth import get_current_user
from workflow.my_workflow import WorkflowInput
from services.abraxas_service import AbraxasDep
router = APIRouter(prefix="/workflows", tags=["Workflows"])
# src/models/api_models.py
@router.post("/create-tasks", response_model=WorkflowResponseDTO)
async def start_create_tasks_workflow(
request: StartWorkflowRequestDTO,
abraxas: AbraxasDep,
current_user: User | None = Depends(get_current_user),
):
"""Start a workflow to create tasks for the user"""
if not current_user:
raise HTTPException(status_code=401, detail="Not authenticated")
# Create workflow input
workflow_input = WorkflowInput(
user_id=str(current_user.id),
task_count=request.task_count
)
# Start the workflow and don't wait for its completion (fire and forget)
execution_id = await abraxas.start_workflow(
workflow="create_user_tasks",
input=workflow_input,
)
return WorkflowResponseDTO(
execution_id=execution_id,
status="started"
)
How do I create a multi-step workflow?
Chain multiple activities:
# src/workflow/complex_workflow.py
import abraxas
from pydantic import BaseModel
from typing import List, Dict
class ProcessingInput(BaseModel):
data: List[str]
class ValidationResult(BaseModel):
valid_items: List[str]
invalid_items: List[str]
class ProcessingResult(BaseModel):
processed_count: int
results: Dict[str, str]
class FinalResult(BaseModel):
success: bool
summary: str
details: ProcessingResult
# Step 1: Validate data
@abraxas.activity()
async def validate_data_activity(input: ProcessingInput) -> ValidationResult:
valid_items = []
invalid_items = []
for item in input.data:
if len(item) > 3: # Simple validation
valid_items.append(item)
else:
invalid_items.append(item)
return ValidationResult(
valid_items=valid_items,
invalid_items=invalid_items
)
# Step 2: Process valid data
@abraxas.activity()
async def process_data_activity(valid_items: List[str]) -> ProcessingResult:
results = {}
for item in valid_items:
# Simulate processing
results[item] = f"processed_{item}"
return ProcessingResult(
processed_count=len(results),
results=results
)
# Step 3: Generate report
@abraxas.activity()
async def generate_report_activity(
validation: ValidationResult,
processing: ProcessingResult
) -> FinalResult:
success = processing.processed_count > 0
summary = f"Processed {processing.processed_count} items, {len(validation.invalid_items)} failed validation"
return FinalResult(
success=success,
summary=summary,
details=processing
)
@abraxas.workflow.define(workflow_name="data_processing_pipeline")
class DataProcessingWorkflow:
@abraxas.workflow.entrypoint
async def run(self, input: ProcessingInput) -> FinalResult:
# Step 1: Validate
validation_result = await validate_data_activity(input)
# Step 2: Process valid items
processing_result = await process_data_activity(validation_result.valid_items)
# Step 3: Generate report
final_result = await generate_report_activity(validation_result, processing_result)
return final_result
How do I query workflow execution status?
Get workflow execution details:
# In some API router:
@router.get("/status/{execution_id}")
async def get_workflow_history(
execution_id: str,
abraxas: AbraxasDep,
):
try:
status = await abraxas.get_workflow_execution(execution_id)
return {
"status": status
}
except Exception as e:
raise HTTPException(status_code=500, detail=f"Error fetching status: {e}")
Common Workflow Patterns
Fan-out/Fan-in Pattern
@abraxas.workflow.define(workflow_name="parallel_processing")
class ParallelProcessingWorkflow:
@abraxas.workflow.entrypoint
async def run(self, input: list[str]) -> dict:
# Start multiple activities in parallel
tasks = [process_item_activity(item) for item in input]
results = await asyncio.gather(*tasks)
return {"processed": len(results), "results": results}
Troubleshooting
Workflow not starting?
- Check if Abraxas worker is running
- Verify workflow is properly registered (in
src/workflow/workflows.py
) - Check input validation