Skip to main content

Database Operations

This guide covers common database operations, patterns, and best practices when working with SQLModel and Alembic.

tip

For comprehensive documentation, see the SQLModel and SQLAlchemy documentation.

Adding Relationships

One-to-Many Relationship

Example: Tasks belong to Categories

# In db_models.py
# ModelWithIdAndTimestamps is a parent class that adds an id (UUID) and created_at/updated_at/deleted_at (TIMESTAMP) columns
class CategoryBase(ModelWithIdAndTimestamps):
    """Fields shared by every category model.

    This class is not a table itself; the table model subclasses it with
    ``table=True``.
    """

    # Unique label, limited to 100 characters.
    name: str = Field(unique=True, max_length=100)
    # 7-character string, black ("#000000") unless specified.
    # NOTE(review): presumably a "#RRGGBB" hex color - confirm.
    color: str = Field(default="#000000", max_length=7)

# A DB table should extend SQLModel and have this `table=True` parameter
class Category(CategoryBase, table=True):
    """Category table: one category owns many tasks."""

    # `Task` is declared after this class, so the annotation must be a quoted
    # forward reference; a bare `list[Task]` raises NameError when this class
    # body is evaluated.
    tasks: list["Task"] = Relationship(back_populates="category")  # OneToMany

class Task(TaskBase, table=True):
    """Task table: each task has a user and, optionally, a category."""

    user: User = Relationship(back_populates="tasks")

    # Foreign key column stored in the DB. It is nullable (a task may have no
    # category) and declared with ON DELETE CASCADE.
    category_id: UUID | None = Field(
        default=None,
        foreign_key="category.id",
        ondelete="CASCADE",
    )
    # Python-side accessor for the related Category row.
    category: Category | None = Relationship(back_populates="tasks")  # ManyToOne

Database Queries

Important: AsyncSession Behavior

We use AsyncSession for database operations, which means:

  • No lazy loading: You cannot access relationships after the query completes
  • Explicit loading required: Use joinedload() to fetch related data
# ❌ This will fail - relationship not loaded
async with DatabaseService.get_session() as session:
task = await session.exec(select(Task).where(Task.id == task_id))
print(task.user.name) # Error! User not loaded

# ✅ This works - relationship explicitly loaded
async with DatabaseService.get_session() as session:
result = await session.exec(
select(Task)
.where(Task.id == task_id)
.options(joinedload(Task.user))
)
task = result.unique().first()
print(task.user.name) # Works! User was eager loaded

Getting a Database Session

To get an AsyncSession (a DB connection from the pool), you have 2 options:

Option 1: In FastAPI Routes

Use the DatabaseDep dependency - FastAPI handles the session lifecycle automatically:

@router.get("/user/{user_id}", response_model=UserDTO)
async def get_user(user_id: str, db: DatabaseDep):
    """Return the user identified by ``user_id``.

    Args:
        user_id: Identifier taken from the URL path.
        db: Session injected by the ``DatabaseDep`` dependency.

    Returns:
        The matching user converted to ``UserDTO``.

    Raises:
        HTTPException: 404 when no user has this id.
    """
    query = select(User).where(User.id == user_id)
    user = (await db.exec(query)).first()
    if user is None:
        # Without this guard a missing row would surface as a validation error.
        raise HTTPException(status_code=404, detail="User not found")
    # model_validate() takes the source object itself; unpacking it with `**`
    # fails because a model instance is not a mapping.
    return UserDTO.model_validate(user)

Option 2: Anywhere Else in Your Application

Use DatabaseService.get_session() with a context manager:

# In Temporal activities
@abraxas.activity(display_name="Step 1: Get user")
async def get_user(params: GetUserParams) -> GetUserResult:
    """Fetch the user whose id is ``params.id`` and wrap it in a result object.

    The ``user`` field of the result is ``None`` when no row matches.
    """
    lookup = select(User).where(User.id == params.id)
    async with DatabaseService.get_session() as session:
        rows = await session.exec(lookup)
        return GetUserResult(user=rows.first())

# In background tasks or services
async def cleanup_old_tasks():
    """Skeleton showing how to open a session outside of a FastAPI route."""
    async with DatabaseService.get_session() as session:
        # Your database operations here
        ...
Why Two Methods?
  • DatabaseDep: Best for FastAPI routes - automatic session management, proper error handling and transaction rollback in any API call
  • DatabaseService.get_session(): Required outside of the FastAPI context - you manage the session lifecycle yourself. Abraxas caches the results of its dependencies, and we don't want an AsyncSession to be cached

Joins and Eager Loading

What is Eager Loading?

Eager loading means fetching related data in the same query using SQL JOINs. This is essential with AsyncSession because:

  1. It prevents the N+1 query problem (1 query for main data + N queries for related data)
  2. AsyncSession doesn't support lazy loading - you must load everything you need upfront
  3. It's much more efficient - 1 query instead of potentially hundreds

Always use .unique() after eager loading to remove duplicates created by JOINs.

# Load tasks with their category and user in a single query
async def get_tasks_with_details() -> list[Task]:
    """Return every task with its category and user loaded in a single query."""
    eager_options = (
        joinedload(Task.category),  # Eager load category
        joinedload(Task.user),  # Eager load user
    )
    statement = select(Task).options(*eager_options)
    async with DatabaseService.get_session() as session:
        rows = await session.exec(statement)
        # .unique() removes duplicates from JOIN
        return rows.unique().all()

# Example: What happens without eager loading
async def get_task_details_wrong(task_id: UUID):
    """Anti-example: reads ``task.user`` without having eager loaded it."""
    async with DatabaseService.get_session() as session:
        rows = await session.exec(select(Task).where(Task.id == task_id))
        task = rows.first()
        # ❌ This will fail - user relationship not loaded!
        return f"Task '{task.title}' by {task.user.name}"

# Example: Correct approach with eager loading
async def get_task_details_correct(task_id: UUID):
    """Return a one-line description of a task, eager loading its user."""
    statement = (
        select(Task)
        .where(Task.id == task_id)
        .options(joinedload(Task.user))
    )
    async with DatabaseService.get_session() as session:
        rows = await session.exec(statement)
        task = rows.unique().first()
        # ✅ This works - user was loaded with the task
        return f"Task '{task.title}' by {task.user.name}"

Aggregations and Statistics

SQL Functions in Python

Use SQLAlchemy's func module to access SQL functions like COUNT, SUM, AVG, etc. These run in the database, not in Python, making them very efficient for statistics.

# Count tasks by status
async def get_task_stats(user_id: UUID) -> dict:
    """Count a user's tasks in the database and derive completion statistics.

    Returns:
        A dict with the keys ``total``, ``completed``, ``pending`` and
        ``completion_rate`` (a percentage, ``0`` when the user has no tasks).
    """
    owned_by_user = Task.user_id == user_id
    # `== True` builds a SQL expression here, it is not a Python comparison.
    is_done = Task.completed == True  # noqa: E712

    async with DatabaseService.get_session() as session:
        total_rows = await session.exec(
            select(func.count(Task.id)).where(owned_by_user)
        )
        done_rows = await session.exec(
            select(func.count(Task.id)).where(and_(owned_by_user, is_done))
        )

        total = total_rows.first() or 0
        completed = done_rows.first() or 0

        if total > 0:
            completion_rate = completed / total * 100
        else:
            completion_rate = 0

        return {
            "total": total,
            "completed": completed,
            "pending": total - completed,
            "completion_rate": completion_rate,
        }

# Group tasks by category
async def get_tasks_by_category(user_id: UUID) -> dict:
    """Map each category name to the number of the user's tasks in it."""
    task_count = func.count(Task.id).label("task_count")
    statement = (
        select(Category.name, task_count)
        .join(Task)
        .where(Task.user_id == user_id)
        .group_by(Category.id, Category.name)
    )
    async with DatabaseService.get_session() as session:
        rows = await session.exec(statement)
        # Each row is a (category name, task count) pair.
        return {name: count for name, count in rows.all()}

Database Transactions

Understanding Transactions

Transactions ensure that multiple database operations succeed or fail together. Key session methods:

  • session.add(): Mark an object to be persisted in the database
  • session.flush(): Send SQL to database but don't commit (useful to get auto-generated IDs)
  • session.commit(): Permanently save all pending changes to database
  • session.rollback(): Undo all pending changes if something goes wrong
  • session.refresh(): Update object with latest data from database (e.g., auto-generated timestamps)

Learn more about session state management in SQLAlchemy docs.

Handling Complex Operations

from sqlmodel import Session
from sqlalchemy.exc import IntegrityError

async def create_task_with_category(
    task_data: CreateTaskDTO,
    category_name: str,
    user_id: UUID
) -> TaskDTO:
    """Create a task, creating its category first when it does not exist yet.

    Both inserts happen in one transaction: either the category and the task
    are saved together, or neither is.

    Args:
        task_data: Client-supplied task fields.
        category_name: Name of the category to attach the task to.
        user_id: Owner of the new task.

    Returns:
        The persisted task converted to ``TaskDTO``.

    Raises:
        HTTPException: 400 when any step fails; the original error is chained
            as ``__cause__``.
    """
    async with DatabaseService.get_session() as session:
        try:
            # Check if category exists
            category_result = await session.exec(
                select(Category).where(Category.name == category_name)
            )
            category = category_result.first()

            # Create category if it doesn't exist
            if category is None:
                category = Category(name=category_name)
                session.add(category)
                await session.flush()  # Get the ID without committing

            # Supply the server-side fields through `update=` so they are
            # present while the model is validated, instead of assigning them
            # after validation has already run.
            task = Task.model_validate(
                task_data,
                update={"user_id": user_id, "category_id": category.id},
            )

            session.add(task)  # Attach the task object to the current session
            await session.commit()  # Save all changes to database
            # Get the latest data from DB (including auto-generated fields)
            await session.refresh(task)

            return TaskDTO.model_validate(task)

        except Exception as err:
            await session.rollback()  # Undo all changes if something went wrong
            # Chain the cause so the real failure stays visible in logs.
            raise HTTPException(
                status_code=400, detail="Failed to create task"
            ) from err

Bulk Operations

Performance with Bulk Operations

When dealing with many records:

  • Add all objects to session first, then commit once
  • Use session.flush() if you need IDs before committing
  • Consider batch processing for very large datasets (>1000 items)
# Bulk insert - efficient for multiple records
async def create_multiple_tasks(
    tasks_data: list[CreateTaskDTO],
    user_id: UUID
) -> list[TaskDTO]:
    """Insert several tasks for one user with a single commit.

    Args:
        tasks_data: Client-supplied fields, one entry per task to create.
        user_id: Owner assigned to every created task.

    Returns:
        The persisted tasks converted to ``TaskDTO``, in input order.
    """
    async with DatabaseService.get_session() as session:
        # `update=` supplies user_id while the model is validated, instead of
        # assigning it after validation has already run.
        tasks = [
            Task.model_validate(task_data, update={"user_id": user_id})
            for task_data in tasks_data
        ]
        session.add_all(tasks)  # Add to session (not saved yet)

        await session.commit()  # Save all at once (more efficient)

        # Refresh all tasks to get auto-generated fields. This is one call per
        # task, so keep batches reasonably small.
        for task in tasks:
            await session.refresh(task)

        return [TaskDTO.model_validate(task) for task in tasks]

# Bulk update
async def mark_all_completed(user_id: UUID) -> int:
    """Mark every pending task of a user as completed.

    Returns:
        The number of tasks that were updated.
    """
    # `== False` builds a SQL expression here, it is not a Python comparison.
    pending_for_user = and_(
        Task.user_id == user_id,
        Task.completed == False,  # noqa: E712
    )
    async with DatabaseService.get_session() as session:
        rows = await session.exec(select(Task).where(pending_for_user))
        pending_tasks = rows.all()

        for pending in pending_tasks:
            pending.completed = True

        # One commit persists all the modified rows.
        await session.commit()
        return len(pending_tasks)

Performance Tips

Database Indexing

# Add indexes in your models
class Task(TaskBase, table=True):
    """Task table with single-column indexes declared on the fields."""

    # Already indexed
    user_id: UUID = Field(index=True, foreign_key="user.id")
    # Add index for filtering
    completed: bool = Field(index=True, default=False)
    # Add index for sorting
    created_at: datetime = Field(index=True)

For more complex indexes (multi-column indexes, or index types other than B-Tree), use an Alembic migration script.

Query Optimization

The N+1 Query Problem

This is one of the most common performance killers for beginners:

  • N+1 means: 1 query to get all items + N queries (one per item) to get related data
  • Example: Getting 100 tasks and their users = 101 database queries!
  • Solution: Use joinedload() to get everything in 1 query
# ✅ GOOD: 1 query with JOIN (fast)
async def get_tasks_with_users() -> list[Task]:
    """Return all tasks with their user loaded through one JOINed query."""
    statement = select(Task).options(joinedload(Task.user))
    async with DatabaseService.get_session() as session:
        rows = await session.exec(statement)
        return rows.unique().all()

# ❌ BAD: N+1 queries (very slow)
async def get_tasks_with_users_bad() -> list[dict]:
    """Anti-example: pairs each task with its user using one query per task."""
    async with DatabaseService.get_session() as session:
        # Query 1: Get all tasks
        all_rows = await session.exec(select(Task))
        tasks = all_rows.all()

        # This creates N additional queries!
        pairs = []
        for task in tasks:
            # Query 2, 3, 4... N+1: One query per task!
            owner_lookup = select(User).where(User.id == task.user_id)
            owner_rows = await session.exec(owner_lookup)
            pairs.append({"task": task, "user": owner_rows.first()})

        return pairs

Pagination

async def get_paginated_tasks(
    user_id: UUID,
    page: int = 1,
    size: int = 20
) -> dict:
    """Return one page of a user's tasks, newest first, with paging metadata.

    Args:
        user_id: Owner whose tasks are listed.
        page: 1-based page number.
        size: Maximum number of tasks per page.

    Returns:
        A dict with ``tasks`` (list of ``TaskDTO``) and ``pagination``
        (``page``, ``size``, ``total``, ``pages``).

    Raises:
        ValueError: If ``page`` or ``size`` is smaller than 1.
    """
    # Reject invalid paging before touching the database: page < 1 would give
    # a negative OFFSET and size == 0 would divide by zero below.
    if page < 1:
        raise ValueError("page must be >= 1")
    if size < 1:
        raise ValueError("size must be >= 1")

    offset = (page - 1) * size

    async with DatabaseService.get_session() as session:
        # Get total count
        count_result = await session.exec(
            select(func.count(Task.id)).where(Task.user_id == user_id)
        )
        total = count_result.first() or 0

        # Get paginated results. Task.id is a tie-breaker: without it, rows
        # sharing the same created_at can repeat or be skipped across pages.
        result = await session.exec(
            select(Task)
            .where(Task.user_id == user_id)
            .order_by(Task.created_at.desc(), Task.id)
            .offset(offset)
            .limit(size)
        )
        tasks = result.all()

        return {
            "tasks": [TaskDTO.model_validate(task) for task in tasks],
            "pagination": {
                "page": page,
                "size": size,
                "total": total,
                "pages": (total + size - 1) // size,  # ceiling division
            },
        }

Troubleshooting

Common Issues

Foreign Key Errors:

# ❌ Wrong - using class name
class Task(TaskBase, table=True):
    """Anti-example: the foreign key target is written with the class name."""

    # Won't work!
    user_id: UUID = Field(foreign_key="User.id")

# ✅ Correct - using table name (lowercase)
class Task(TaskBase, table=True):
    """Correct form: the foreign key target uses the lowercase table name."""

    # Works!
    user_id: UUID = Field(foreign_key="user.id")

Relationship Access Errors:

# ❌ Error: "Could not locate column" or "Lazy loading not available"
task = await session.exec(select(Task))
print(task.user.name) # Fails - relationship not loaded

# ✅ Solution: Always eager load relationships you need
result = await session.exec(
select(Task).options(joinedload(Task.user))
)
task = result.unique().first()
print(task.user.name) # Works!

Query Performance:

# Use EXPLAIN to analyze query performance
query = select(Task).where(Task.user_id == user_id)
print(f"Query: {query}")

# For PostgreSQL, you can check the query plan
result = await db.exec(text(f"EXPLAIN ANALYZE {query}"))