Skip to main content

Abraxas Workflows

Quick reference for creating and managing Abraxas workflows in your backend.

tip

For comprehensive documentation, see the Abraxas' documentation

How do I create a simple workflow?

Basic workflow structure:

# src/workflow/my_workflow.py
import abraxas
from pydantic import BaseModel
from typing import List

# 1. Define input/output models
class WorkflowInput(BaseModel):
user_id: str
task_count: int

class WorkflowResult(BaseModel):
message: str
created_tasks: List[str]

# 2. Create activities (the actual work)
@abraxas.activity()
async def create_tasks_activity(input: WorkflowInput) -> WorkflowResult:
"""Activities do the actual work - call APIs, process data, etc."""
task_ids = []
for i in range(input.task_count):
# Your business logic here
task_id = f"task_{input.user_id}_{i}"
task_ids.append(task_id)

return WorkflowResult(
message=f"Created {len(task_ids)} tasks",
created_tasks=task_ids
)

# 3. Define the workflow (orchestrates activities)
@abraxas.workflow.define(workflow_name="create_user_tasks")
class CreateUserTasksWorkflow:
@abraxas.workflow.entrypoint
async def run(self, input: WorkflowInput) -> WorkflowResult:
"""Entry point - coordinates activities"""
return await create_tasks_activity(input)

# 4. Register your workflow for your worker(s) in src/workflow/workers.py
ALL_WORKFLOWS = [CreateUserTasksWorkflow]

How do I start a workflow from my API?

Add workflow endpoints to your router:

# src/models/api_models.py
from pydantic import BaseModel

class StartWorkflowRequestDTO(BaseModel):
task_count: int = 5

class WorkflowResponseDTO(BaseModel):
execution_id: str
status: str

# src/routers/workflow.py
from fastapi import APIRouter, Depends, HTTPException
from pydantic import BaseModel
import abraxas
import time

from models.db_models import User
from utils.auth import get_current_user
from workflow.my_workflow import WorkflowInput
from services.abraxas_service import AbraxasDep

router = APIRouter(prefix="/workflows", tags=["Workflows"])

# src/models/api_models.py
@router.post("/create-tasks", response_model=WorkflowResponseDTO)
async def start_create_tasks_workflow(
request: StartWorkflowRequestDTO,
abraxas: AbraxasDep,
current_user: User | None = Depends(get_current_user),
):
"""Start a workflow to create tasks for the user"""
if not current_user:
raise HTTPException(status_code=401, detail="Not authenticated")

# Create workflow input
workflow_input = WorkflowInput(
user_id=str(current_user.id),
task_count=request.task_count
)

# Start the workflow and don't wait for its completion (fire and forget)
execution_id = await abraxas.start_workflow(
workflow="create_user_tasks",
input=workflow_input,
)

return WorkflowResponseDTO(
execution_id=execution_id,
status="started"
)

How do I create a multi-step workflow?

Chain multiple activities:

# src/workflow/complex_workflow.py
import abraxas
from pydantic import BaseModel
from typing import List, Dict

class ProcessingInput(BaseModel):
data: List[str]

class ValidationResult(BaseModel):
valid_items: List[str]
invalid_items: List[str]

class ProcessingResult(BaseModel):
processed_count: int
results: Dict[str, str]

class FinalResult(BaseModel):
success: bool
summary: str
details: ProcessingResult

# Step 1: Validate data
@abraxas.activity()
async def validate_data_activity(input: ProcessingInput) -> ValidationResult:
valid_items = []
invalid_items = []

for item in input.data:
if len(item) > 3: # Simple validation
valid_items.append(item)
else:
invalid_items.append(item)

return ValidationResult(
valid_items=valid_items,
invalid_items=invalid_items
)

# Step 2: Process valid data
@abraxas.activity()
async def process_data_activity(valid_items: List[str]) -> ProcessingResult:
results = {}
for item in valid_items:
# Simulate processing
results[item] = f"processed_{item}"

return ProcessingResult(
processed_count=len(results),
results=results
)

# Step 3: Generate report
@abraxas.activity()
async def generate_report_activity(
validation: ValidationResult,
processing: ProcessingResult
) -> FinalResult:
success = processing.processed_count > 0
summary = f"Processed {processing.processed_count} items, {len(validation.invalid_items)} failed validation"

return FinalResult(
success=success,
summary=summary,
details=processing
)

@abraxas.workflow.define(workflow_name="data_processing_pipeline")
class DataProcessingWorkflow:
@abraxas.workflow.entrypoint
async def run(self, input: ProcessingInput) -> FinalResult:
# Step 1: Validate
validation_result = await validate_data_activity(input)

# Step 2: Process valid items
processing_result = await process_data_activity(validation_result.valid_items)

# Step 3: Generate report
final_result = await generate_report_activity(validation_result, processing_result)

return final_result

How do I query workflow execution status?

Get workflow execution details:

# In some API router:
@router.get("/status/{execution_id}")
async def get_workflow_history(
execution_id: str,
abraxas: AbraxasDep,
):
try:
status = await abraxas.get_workflow_execution(execution_id)

return {
"status": status
}
except Exception as e:
raise HTTPException(status_code=500, detail=f"Error fetching status: {e}")

Common Workflow Patterns

Fan-out/Fan-in Pattern

@abraxas.workflow.define(workflow_name="parallel_processing")
class ParallelProcessingWorkflow:
@abraxas.workflow.entrypoint
async def run(self, input: list[str]) -> dict:
# Start multiple activities in parallel
tasks = [process_item_activity(item) for item in input]
results = await asyncio.gather(*tasks)

return {"processed": len(results), "results": results}

Troubleshooting

Workflow not starting?

  1. Check if Abraxas worker is running
  2. Verify workflow is properly registered (in src/workflow/workflows.py)
  3. Check input validation