Abraxas Workflows

Quick reference for creating and managing Abraxas workflows in your backend.

What is Abraxas?

Abraxas is Mistral's workflow orchestration engine (similar to Temporal). Key concepts:

  • Workflows: Define the sequence of steps (orchestration logic)
  • Activities: Do the actual work (API calls, DB operations, etc.)
  • Workers: Execute workflows and activities in the background

A few things to note:

  • Workflows can only run for up to 2 seconds (not counting activity execution time)
  • Activities from a single Workflow can be executed on different Workers
  • The maximum size of serialized input/output for an Activity is 2MB
  • Unlike FastAPI dependencies, Abraxas dependencies are cached
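
To make the 2MB limit concrete, here is a minimal sketch of a size check you could run before handing a payload to an activity. It is not part of Abraxas: `MAX_PAYLOAD_BYTES`, `serialized_size`, and `fits_activity_limit` are hypothetical names, JSON is only a stand-in for whatever serializer Abraxas actually uses, and 2MB is assumed to mean 2 * 1024 * 1024 bytes.

```python
import json

# Assumption: "2MB" is read as 2 * 1024 * 1024 bytes, and JSON approximates
# the real serialization format, which this page does not specify.
MAX_PAYLOAD_BYTES = 2 * 1024 * 1024


def serialized_size(payload: dict) -> int:
    """Return the size in bytes of the payload encoded as UTF-8 JSON."""
    return len(json.dumps(payload).encode("utf-8"))


def fits_activity_limit(payload: dict, limit: int = MAX_PAYLOAD_BYTES) -> bool:
    """Return True when the encoded payload is within the limit."""
    return serialized_size(payload) <= limit


small = {"user_id": "u1", "task_count": 5}
large = {"blob": "x" * (3 * 1024 * 1024)}  # roughly 3 MiB once encoded
```

A check like this is only a rough guard; if the real encoding is larger than JSON, leave some headroom below the limit.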

For comprehensive documentation, see the Abraxas documentation.

How do I create a simple workflow?

Activities vs Workflows

Activities should:

  • Contain business logic (API calls, DB operations)
  • Be idempotent (safe to retry)
  • Return serializable data (Pydantic models)

Workflows should:

  • Only orchestrate (no business logic)
  • Handle retries and error flows
  • Coordinate multiple activities

Never put database calls directly in workflows - use activities!
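
The idempotency requirement can be illustrated without Abraxas at all. The sketch below uses a plain async function and an in-memory dict as a stand-in for a database table with a unique key; `_created_tasks`, `create_task_idempotent`, and `demo` are hypothetical names, and a real activity would also carry the `@abraxas.activity()` decorator.

```python
import asyncio

# Hypothetical in-memory stand-in for a table with a unique constraint on the key.
_created_tasks: dict[str, str] = {}


async def create_task_idempotent(user_id: str, index: int) -> str:
    """Create a task once; a retry with the same arguments returns the same ID."""
    key = f"{user_id}:{index}"  # deterministic idempotency key
    if key not in _created_tasks:
        _created_tasks[key] = f"task_{user_id}_{index}"
    return _created_tasks[key]


async def demo() -> tuple[str, str, int]:
    """Call the function twice with the same arguments, as a retry would."""
    first = await create_task_idempotent("u1", 0)
    retry = await create_task_idempotent("u1", 0)
    return first, retry, len(_created_tasks)
```

Because the key is derived from the input rather than generated randomly, a retried call finds the existing record instead of creating a duplicate.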

Basic workflow structure:

# src/workflow/my_workflow.py
import abraxas
from pydantic import BaseModel
from typing import List

# 1. Define input/output models
class WorkflowInput(BaseModel):
    user_id: str
    task_count: int

class WorkflowResult(BaseModel):
    message: str
    created_tasks: List[str]

# 2. Create activities (the actual work)
@abraxas.activity()
async def create_tasks_activity(input: WorkflowInput) -> WorkflowResult:
    """Activities do the actual work - call APIs, process data, etc."""
    task_ids = []
    for i in range(input.task_count):
        # Your business logic here
        task_id = f"task_{input.user_id}_{i}"
        task_ids.append(task_id)

    return WorkflowResult(
        message=f"Created {len(task_ids)} tasks",
        created_tasks=task_ids
    )

# 3. Define the workflow (orchestrates activities)
@abraxas.workflow.define(workflow_name="create_user_tasks")
class CreateUserTasksWorkflow:
    @abraxas.workflow.entrypoint
    async def run(self, input: WorkflowInput) -> WorkflowResult:
        """Entry point - coordinates activities"""
        return await create_tasks_activity(input)

# 4. Register your workflow for your worker(s) in src/workflow/workers.py
ALL_WORKFLOWS = [CreateUserTasksWorkflow]

How do I start a workflow from my API?

Async vs Sync Execution

Fire-and-forget (async):

  • Returns immediately with execution ID
  • Good for long-running tasks
  • Client polls for status if needed

Wait for result (sync):

  • Blocks until workflow completes
  • Good for short tasks
  • Returns actual result

Choose based on expected duration and UX needs.
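
For the fire-and-forget case, "client polls for status" can be sketched as a small polling loop. This is plain asyncio, not an Abraxas API: `poll_until_done` and `fake_status_source` are hypothetical helpers, and the terminal status names `"completed"` and `"failed"` are assumptions, since this page does not list the statuses Abraxas reports.

```python
import asyncio
from typing import Awaitable, Callable


async def poll_until_done(
    get_status: Callable[[], Awaitable[str]],
    interval: float = 0.5,
    timeout: float = 30.0,
) -> str:
    """Call get_status repeatedly until it reports a terminal state."""
    terminal = {"completed", "failed"}  # assumed status names
    loop = asyncio.get_running_loop()
    deadline = loop.time() + timeout
    while True:
        status = await get_status()
        if status in terminal:
            return status
        if loop.time() >= deadline:
            raise TimeoutError(f"still {status!r} after {timeout}s")
        await asyncio.sleep(interval)


def fake_status_source(states: list[str]) -> Callable[[], Awaitable[str]]:
    """Build a fake status getter that walks through the given states."""
    remaining = iter(states)
    last = states[-1]

    async def get_status() -> str:
        # Once the sequence is exhausted, keep returning the last state.
        return next(remaining, last)

    return get_status
```

In a real client, `get_status` would wrap a call to your status endpoint; the fixed interval and overall timeout keep the loop from hammering the API or waiting forever.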

Add workflow endpoints to your router:

# src/models/api_models.py
from pydantic import BaseModel

class StartWorkflowRequestDTO(BaseModel):
    task_count: int = 5

class WorkflowResponseDTO(BaseModel):
    execution_id: str
    status: str

# src/routers/workflow.py
from fastapi import APIRouter, Depends, HTTPException

from common.models.db_models import User
from common.services.abraxas_service import AbraxasDep
# DTOs defined in src/models/api_models.py
from models.api_models import StartWorkflowRequestDTO, WorkflowResponseDTO
from utils.auth import get_current_user
from workflow.my_workflow import WorkflowInput

router = APIRouter(prefix="/workflows", tags=["Workflows"])

@router.post("/create-tasks", response_model=WorkflowResponseDTO)
async def start_create_tasks_workflow(
    request: StartWorkflowRequestDTO,
    abraxas: AbraxasDep,
    current_user: User | None = Depends(get_current_user),
):
    """Start a workflow to create tasks for the user"""
    if not current_user:
        raise HTTPException(status_code=401, detail="Not authenticated")

    # Create workflow input
    workflow_input = WorkflowInput(
        user_id=str(current_user.id),
        task_count=request.task_count
    )

    # Start the workflow and don't wait for its completion (fire and forget)
    execution_id = await abraxas.start_workflow(
        workflow="create_user_tasks",
        input=workflow_input,
    )

    return WorkflowResponseDTO(
        execution_id=execution_id,
        status="started"
    )

How do I create a multi-step workflow?

Error Handling in Workflows

Abraxas automatically retries failed activities. You can customize:

  • Retry policy: Number of attempts, backoff strategy
  • Timeouts: Activity and workflow timeouts

Activities should be idempotent to handle retries safely!
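
To show what "number of attempts" and "backoff strategy" mean in practice, here is a conceptual sketch of retrying with exponential backoff in plain asyncio. Abraxas performs retries for you, and this page does not show how its retry policy is configured, so none of this is Abraxas API: `backoff_delays`, `run_with_retries`, and `make_flaky` are hypothetical names used only for illustration.

```python
import asyncio
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")


def backoff_delays(attempts: int, base: float = 1.0, factor: float = 2.0) -> list[float]:
    """Delays waited between attempts: base, base*factor, ... (attempts - 1 values)."""
    return [base * factor**i for i in range(attempts - 1)]


async def run_with_retries(
    fn: Callable[[], Awaitable[T]],
    attempts: int = 3,
    base: float = 1.0,
    factor: float = 2.0,
) -> T:
    """Await fn(), retrying on any exception with exponential backoff."""
    if attempts < 1:
        raise ValueError("attempts must be >= 1")
    delays = backoff_delays(attempts, base, factor)
    for attempt in range(attempts):
        try:
            return await fn()
        except Exception:
            if attempt == attempts - 1:
                raise  # out of attempts: surface the last error
            await asyncio.sleep(delays[attempt])
    raise AssertionError("unreachable")


def make_flaky(failures: int):
    """Build a coroutine function that fails `failures` times, then succeeds."""
    calls = {"count": 0}

    async def flaky() -> str:
        calls["count"] += 1
        if calls["count"] <= failures:
            raise ConnectionError("transient failure")
        return "ok"

    return flaky, calls
```

The sketch also shows why idempotency matters: `flaky` is invoked once per attempt, so any side effect it had before failing would be repeated on the next try.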

Chain multiple activities:

# src/workflow/complex_workflow.py
import abraxas
from pydantic import BaseModel
from typing import List, Dict

class ProcessingInput(BaseModel):
    data: List[str]

class ValidationResult(BaseModel):
    valid_items: List[str]
    invalid_items: List[str]

class ProcessingResult(BaseModel):
    processed_count: int
    results: Dict[str, str]

class FinalResult(BaseModel):
    success: bool
    summary: str
    details: ProcessingResult

# Step 1: Validate data
@abraxas.activity()
async def validate_data_activity(input: ProcessingInput) -> ValidationResult:
    valid_items = []
    invalid_items = []

    for item in input.data:
        if len(item) > 3:  # Simple validation
            valid_items.append(item)
        else:
            invalid_items.append(item)

    return ValidationResult(
        valid_items=valid_items,
        invalid_items=invalid_items
    )

# Step 2: Process valid data
@abraxas.activity()
async def process_data_activity(valid_items: List[str]) -> ProcessingResult:
    results = {}
    for item in valid_items:
        # Simulate processing
        results[item] = f"processed_{item}"

    return ProcessingResult(
        processed_count=len(results),
        results=results
    )

# Step 3: Generate report
@abraxas.activity()
async def generate_report_activity(
    validation: ValidationResult,
    processing: ProcessingResult
) -> FinalResult:
    success = processing.processed_count > 0
    summary = f"Processed {processing.processed_count} items, {len(validation.invalid_items)} failed validation"

    return FinalResult(
        success=success,
        summary=summary,
        details=processing
    )

@abraxas.workflow.define(workflow_name="data_processing_pipeline")
class DataProcessingWorkflow:
    @abraxas.workflow.entrypoint
    async def run(self, input: ProcessingInput) -> FinalResult:
        # Step 1: Validate
        validation_result = await validate_data_activity(input)

        # Step 2: Process valid items
        processing_result = await process_data_activity(validation_result.valid_items)

        # Step 3: Generate report
        final_result = await generate_report_activity(validation_result, processing_result)

        return final_result

How do I query workflow execution status?

Get workflow execution details:

# In some API router:
@router.get("/status/{execution_id}")
async def get_workflow_history(
    execution_id: str,
    abraxas: AbraxasDep,
):
    try:
        status = await abraxas.get_workflow_execution(execution_id)

        return {
            "status": status
        }
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error fetching status: {e}")

Common Workflow Patterns

Performance Considerations

  • Parallel execution: Use abraxas.execute_activities_in_parallel() for concurrent activities
  • Large payloads: Store data in S3/DB and pass references
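
Passing references instead of large payloads can be sketched as follows. An in-memory dict stands in for S3 or a database, and the two async functions stand in for activities (a real one would carry `@abraxas.activity()`); `_blob_store`, `put_blob`, `get_blob`, `produce_activity`, `consume_activity`, and `demo` are all hypothetical names.

```python
import asyncio
import uuid

# Hypothetical in-memory stand-in for S3 or a database table.
_blob_store: dict[str, bytes] = {}


def put_blob(data: bytes) -> str:
    """Store the data and return a short reference key."""
    key = f"blob/{uuid.uuid4().hex}"
    _blob_store[key] = data
    return key


def get_blob(key: str) -> bytes:
    """Fetch previously stored data by its reference key."""
    return _blob_store[key]


async def produce_activity() -> str:
    """Produce a 5 MiB result but return only its reference."""
    return put_blob(b"x" * (5 * 1024 * 1024))


async def consume_activity(ref: str) -> int:
    """Load the data by reference and return a small summary (its size)."""
    return len(get_blob(ref))


async def demo() -> tuple[str, int]:
    ref = await produce_activity()
    size = await consume_activity(ref)
    return ref, size
```

Only the short key crosses the activity boundary, so the serialized input and output stay far below the size limit regardless of how large the stored data is. Since activities may run on different workers, the store must be shared (S3, a database), not process-local as in this sketch.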

Fan-out/Fan-in Pattern

# process_item_activity is assumed to be an activity defined elsewhere
@abraxas.workflow.define(workflow_name="parallel_processing")
class ParallelProcessingWorkflow:
    @abraxas.workflow.entrypoint
    async def run(self, input: list[str]) -> dict:
        # Start multiple activities in parallel
        tasks = [process_item_activity(item) for item in input]
        results = await abraxas.execute_activities_in_parallel(*tasks)

        return {"processed": len(results), "results": results}
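
If you want to try the fan-out/fan-in shape outside a worker, the same structure can be approximated with `asyncio.gather`. This is only an analogy: `process_item` and `fan_out_fan_in` are hypothetical names, and whether `abraxas.execute_activities_in_parallel()` returns results in input order, as `asyncio.gather` does, is not stated on this page.

```python
import asyncio


async def process_item(item: str) -> str:
    """Stand-in for an activity: simulate a short I/O wait, then transform."""
    await asyncio.sleep(0.01)
    return item.upper()


async def fan_out_fan_in(items: list[str]) -> dict:
    # Fan-out: one coroutine per item, run concurrently by gather.
    results = await asyncio.gather(*(process_item(item) for item in items))
    # Fan-in: gather returns the results in the same order as the inputs.
    return {"processed": len(results), "results": list(results)}
```

Running `asyncio.run(fan_out_fan_in(["a", "b", "c"]))` processes all three items concurrently and collects them into a single result dict.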

Troubleshooting

Common Pitfalls

  1. Not registering workflows: Add to ALL_WORKFLOWS in workers.py
  2. Non-serializable data: Use Pydantic models
  3. Database in workflows: Always use activities for DB operations

Workflow not starting?

  1. Check if Abraxas worker is running
  2. Verify the workflow is registered in ALL_WORKFLOWS (in src/workflow/workers.py)
  3. Check input validation