Skip to main content

Embeddings & Vector Search

Quick reference for implementing embeddings and semantic search with pgvector in your backend.

How do I set up vector columns?

Add embedding fields to your models:

# In src/models/db_models.py
from pgvector.sqlalchemy import Vector
from sqlmodel import Column, Field
from typing import Any

class DocumentBase(ModelWithIdAndTimestamps):
    # Fields shared by every document model in this module.
    # NOTE(review): id/timestamp columns presumably come from
    # ModelWithIdAndTimestamps, which is not shown here — confirm.

    # Short title, limited to 200 characters.
    title: str = Field(max_length=200)

    # Full document text, limited to 10000 characters.
    content: str = Field(max_length=10_000)

class Document(DocumentBase, table=True):
    # Table model: a document plus its embedding and owning user.

    # pgvector column holding the embedding (1024 dimensions); left as None
    # until an embedding has been generated for the row.
    embedding: Any = Field(
        sa_column=Column(Vector(1024)),
        description="Document embedding vector",
        default=None,
    )

    # Optional bookkeeping: which model produced the stored embedding.
    embedding_model: str = Field(max_length=50, default="mistral-embed")

    # Owner of the document; indexed because searches filter on it.
    user_id: UUID = Field(index=True, foreign_key="user.id")
    user: User = Relationship(back_populates="documents")

    # Optional: add document's metadata...

# API models
class DocumentWithSimilarityDTO(DocumentDTO):
    """Document with similarity score for search results"""

    # Relevance score reported by the search endpoint; None when not computed.
    similarity_score: float | None = None

    # Raw pgvector cosine distance; None when the search does not produce one
    # (the hybrid search endpoint sets it to None).
    cosine_distance: float | None = None

How do I generate embeddings?

Create embeddings using Mistral AI:

# In src/services/embedding_service.py
from typing import List
from services.mistral_service import MistralServiceDep
from models.db_models import Document
from sqlmodel import select
from services.database_service import DatabaseDep

class EmbeddingService:
    """Generate embeddings through the Mistral service and store them on documents."""

    # Single source of truth for the model name stamped on stored embeddings.
    EMBEDDING_MODEL = "mistral-embed"

    def __init__(self, mistral_service: MistralServiceDep):
        """Keep a reference to the injected Mistral client."""
        self.mistral = mistral_service

    async def generate_text_embedding(self, text: str) -> List[float]:
        """Generate embedding for text using Mistral.

        Args:
            text: The text to embed.

        Returns:
            The vector found at ``response["data"][0]["embedding"]``.

        Raises:
            ValueError: If the Mistral call fails or the response does not
                have the expected shape. The underlying exception is kept
                as ``__cause__`` so the real failure stays debuggable.
        """
        try:
            # Use Mistral's embedding endpoint
            response = await self.mistral.text_embedding(text)
            return response["data"][0]["embedding"]
        except Exception as e:
            # Service boundary: any client or response-shape failure is
            # reported as ValueError, chained to preserve the traceback.
            raise ValueError(f"Failed to generate embedding: {e}") from e

    async def generate_document_embedding(self, document: Document) -> List[float]:
        """Generate embedding for a document (title + content).

        Args:
            document: Object exposing ``title`` and ``content``.

        Returns:
            The embedding of the title and content joined by a blank line.

        Raises:
            ValueError: Propagated from ``generate_text_embedding``.
        """
        # Combine title and content for better embeddings
        text = f"{document.title}\n\n{document.content}"
        return await self.generate_text_embedding(text)

    async def update_document_embedding(
        self,
        document_id: UUID,
        db: DatabaseDep
    ) -> Document:
        """Update embedding for an existing document.

        Args:
            document_id: Primary key of the document to re-embed.
            db: Database session used to load and persist the document.

        Returns:
            The refreshed document carrying the new embedding.

        Raises:
            ValueError: If no document has ``document_id`` or embedding
                generation fails.
        """
        # Get document
        result = await db.exec(select(Document).where(Document.id == document_id))
        document = result.first()
        if not document:
            raise ValueError(f"Document {document_id} not found")

        # Generate new embedding
        embedding = await self.generate_document_embedding(document)

        # Update document
        document.embedding = embedding
        document.embedding_model = self.EMBEDDING_MODEL

        await db.commit()
        await db.refresh(document)

        return document

# FastAPI dependency provider for EmbeddingService
async def get_embedding_service(
    mistral: MistralServiceDep,
) -> EmbeddingService:
    """Build an EmbeddingService around the injected Mistral service."""
    service = EmbeddingService(mistral)
    return service

# Annotated alias: a handler parameter typed `EmbeddingServiceDep` is resolved
# by FastAPI through get_embedding_service.
EmbeddingServiceDep = Annotated[EmbeddingService, Depends(get_embedding_service)]

How do I create documents with embeddings?

Automatically generate embeddings when creating documents:

# In src/routers/document.py
from fastapi import APIRouter, Depends, HTTPException
from typing import List
from services.embedding_service import EmbeddingServiceDep

# Router for the document endpoints; every route below is mounted under
# the /documents prefix and tagged "Documents".
router = APIRouter(prefix="/documents", tags=["Documents"])

@router.post("/", response_model=DocumentDTO)
async def create_document(
    document_data: CreateDocumentDTO,
    db: DatabaseDep,
    embedding_service: EmbeddingServiceDep,
    # Defaulted parameters must come last: a non-default parameter after a
    # defaulted one is a SyntaxError. FastAPI resolves dependencies by name,
    # so the order is otherwise irrelevant.
    current_user: User | None = Depends(get_current_user),
) -> DocumentDTO:
    """Create a new document with automatic embedding generation.

    Args:
        document_data: Payload describing the document to create.
        db: Database session.
        embedding_service: Service used to embed the title and content.
        current_user: Authenticated user, or None when unauthenticated.

    Returns:
        The persisted document as a DocumentDTO.

    Raises:
        HTTPException: 401 when there is no authenticated user; 500 when
            embedding generation or persistence fails (the transaction is
            rolled back first).
    """
    if not current_user:
        raise HTTPException(status_code=401, detail="Not authenticated")

    # Create document without embedding first. `user_id` is a required field
    # on Document, so supply it during validation via `update=`.
    # NOTE(review): assumes CreateDocumentDTO does not itself carry user_id;
    # if it does, the authenticated user's id deliberately wins.
    document = Document.model_validate(
        document_data,
        update={"user_id": current_user.id},
    )
    document.user = current_user

    try:
        # Generate embedding
        embedding = await embedding_service.generate_document_embedding(document)
        document.embedding = embedding
        document.embedding_model = "mistral-embed"

        # Save to database
        db.add(document)
        await db.commit()
        await db.refresh(document)

        return DocumentDTO.model_validate(document)

    except Exception as e:
        # Undo any partial write, then surface the failure with its cause
        # chained so the original traceback is not lost.
        await db.rollback()
        raise HTTPException(
            status_code=500, detail=f"Failed to create document: {e}"
        ) from e

Search the current user's documents by cosine similarity between the query embedding and each stored document embedding:

# In src/routers/document.py
from sqlmodel import select
from typing import List

class DocumentSearchRequest(BaseModel):
    # Request body for the semantic search endpoint.

    # Free-text query, 1 to 500 characters.
    query: str = Field(max_length=500, min_length=1)

    # Maximum number of results to return: 1 to 50, default 10.
    limit: int = Field(le=50, ge=1, default=10)

    # Lowest similarity score (0.0 to 1.0) a result may have; default 0.1.
    min_similarity: float = Field(le=1.0, ge=0.0, default=0.1)

@router.post("/search", response_model=List[DocumentWithSimilarityDTO])
async def semantic_search(
    search_request: DocumentSearchRequest,
    db: DatabaseDep,
    embedding_service: EmbeddingServiceDep,
    # Defaulted parameters must come last: a non-default parameter after a
    # defaulted one is a SyntaxError. FastAPI resolves dependencies by name.
    current_user: User | None = Depends(get_current_user),
) -> List[DocumentWithSimilarityDTO]:
    """Search documents using semantic similarity.

    Args:
        search_request: Query text, result limit and minimum similarity.
        db: Database session.
        embedding_service: Service used to embed the query text.
        current_user: Authenticated user, or None when unauthenticated.

    Returns:
        The user's matching documents ordered by ascending cosine distance,
        each carrying ``similarity_score`` (1 - distance) and
        ``cosine_distance``.

    Raises:
        HTTPException: 401 when there is no authenticated user; 500 when
            embedding generation or the query fails.
    """
    if not current_user:
        raise HTTPException(status_code=401, detail="Not authenticated")

    try:
        # Generate embedding for search query
        query_embedding = await embedding_service.generate_text_embedding(search_request.query)

        # Build the cosine-distance expression once and reuse it for the
        # selected columns, the filter and the ordering.
        distance = Document.embedding.cosine_distance(query_embedding)
        similarity = 1 - distance

        result = await db.exec(
            select(
                Document,
                similarity.label("similarity_score"),
                distance.label("cosine_distance"),
            )
            .where(
                Document.user_id == current_user.id,
                # Rows that were never embedded cannot be ranked.
                Document.embedding.is_not(None),
                similarity >= search_request.min_similarity,
            )
            .order_by(distance)
            .limit(search_request.limit)
        )

        # Convert results to DTOs
        return [
            DocumentWithSimilarityDTO(
                **document.model_dump(),
                similarity_score=float(similarity_score),
                cosine_distance=float(cosine_distance),
            )
            for document, similarity_score, cosine_distance in result
        ]

    except Exception as e:
        # Chain the cause so the original traceback is preserved.
        raise HTTPException(status_code=500, detail=f"Search failed: {e}") from e

Combine keyword and semantic search by blending a cosine-similarity score with a keyword-match score, using configurable weights:

class HybridSearchRequest(BaseModel):
    # Request body for the hybrid (semantic + keyword) search endpoint.

    # Free-text query, 1 to 500 characters.
    query: str = Field(min_length=1, max_length=500)
    # Maximum number of results to return: 1 to 50, default 10.
    limit: int = Field(default=10, ge=1, le=50)
    semantic_weight: float = Field(default=0.7, ge=0.0, le=1.0, description="Weight for semantic search")
    keyword_weight: float = Field(default=0.3, ge=0.0, le=1.0, description="Weight for keyword search")
    # Lowest combined score (0.0 to 1.0) a result may have; default 0.1.
    min_similarity: float = Field(default=0.1, ge=0.0, le=1.0)

    # skip_on_failure=True is mandatory for a post root validator under
    # Pydantic v2. It also stops this check from running (and falling back to
    # the defaults below) when a weight has already failed field validation.
    # The v2-native equivalent is @model_validator(mode="after").
    @root_validator(skip_on_failure=True)
    def weights_must_sum_to_one(cls, values):
        """Reject weight pairs whose sum is not within 0.01 of 1.0.

        Raises:
            ValueError: If ``semantic_weight + keyword_weight`` differs from
                1.0 by more than 0.01.
        """
        semantic_weight = values.get('semantic_weight', 0.7)
        keyword_weight = values.get('keyword_weight', 0.3)
        if abs(semantic_weight + keyword_weight - 1.0) > 0.01:
            raise ValueError('Semantic and keyword weights must sum to 1.0')
        return values

def _build_keyword_filter(query: str) -> tuple[str, dict[str, str]]:
    """Return a SQL boolean fragment and bind params matching any query term."""
    clauses = []
    params = {}
    for index, term in enumerate(query.lower().split()):
        name = f"kw_{index}"
        # Escape LIKE metacharacters (backslash is PostgreSQL's default LIKE
        # escape) so the user's text is matched literally.
        escaped = term.replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_")
        params[name] = f"%{escaped}%"
        # Only the generated placeholder name is interpolated into the SQL;
        # the user's text travels as a bound parameter.
        clauses.append(
            f"(LOWER(d.title) LIKE :{name} OR LOWER(d.content) LIKE :{name})"
        )
    # A query with no terms (whitespace only) matches nothing by keyword.
    return (" OR ".join(clauses) if clauses else "FALSE"), params


@router.post("/hybrid-search", response_model=List[DocumentWithSimilarityDTO])
async def hybrid_search(
    search_request: HybridSearchRequest,
    db: DatabaseDep,
    embedding_service: EmbeddingServiceDep,
    # Defaulted parameters must come last: a non-default parameter after a
    # defaulted one is a SyntaxError. FastAPI resolves dependencies by name.
    current_user: User | None = Depends(get_current_user),
) -> List[DocumentWithSimilarityDTO]:
    """Hybrid search combining semantic and keyword matching.

    Each of the user's documents gets a combined score of
    ``semantic_similarity * semantic_weight + keyword_similarity * keyword_weight``.
    Semantic similarity is ``1 - cosine distance`` (0 for rows without an
    embedding). Keyword similarity is 1.0 when any query term occurs in the
    title or content, else 0.0.

    Args:
        search_request: Query text, weights, result limit and minimum score.
        db: Database session.
        embedding_service: Service used to embed the query text.
        current_user: Authenticated user, or None when unauthenticated.

    Returns:
        Documents whose combined score is at least ``min_similarity``, best
        first, with ``similarity_score`` set to the combined score and
        ``cosine_distance`` set to None.

    Raises:
        HTTPException: 401 when there is no authenticated user.
    """
    if not current_user:
        raise HTTPException(status_code=401, detail="Not authenticated")

    # Generate embedding for semantic search
    query_embedding = await embedding_service.generate_text_embedding(search_request.query)
    embedding_str = '[' + ','.join(map(str, query_embedding)) + ']'

    # Prepare keyword search terms (or we can use the pg_trgm extension or the fuzzystrmatch extension from PG)
    keyword_match, keyword_params = _build_keyword_filter(search_request.query)

    # CAST(... AS vector) replaces `:query_embedding::vector`: a bind name
    # followed directly by `::` is ambiguous to text()'s parameter parser.
    sql = text(f"""
        WITH semantic_scores AS (
            SELECT
                d.id,
                1 - (d.embedding <=> CAST(:query_embedding AS vector)) as semantic_similarity
            FROM document d
            WHERE
                d.user_id = :user_id
                AND d.embedding IS NOT NULL
        ),
        keyword_scores AS (
            SELECT
                d.id,
                CASE WHEN {keyword_match} THEN 1.0 ELSE 0.0 END as keyword_similarity
            FROM document d
            WHERE d.user_id = :user_id
        ),
        combined_scores AS (
            SELECT
                d.id,
                d.title, d.content, d.created_at, d.updated_at, d.user_id, d.embedding_model,
                COALESCE(s.semantic_similarity, 0) * :semantic_weight +
                COALESCE(k.keyword_similarity, 0) * :keyword_weight as combined_score,
                COALESCE(s.semantic_similarity, 0) as semantic_score,
                COALESCE(k.keyword_similarity, 0) as keyword_score
            FROM document d
            LEFT JOIN semantic_scores s ON d.id = s.id
            LEFT JOIN keyword_scores k ON d.id = k.id
            WHERE d.user_id = :user_id
        )
        SELECT * FROM combined_scores
        WHERE combined_score >= :min_score
        ORDER BY combined_score DESC
        LIMIT :limit
    """).bindparams(
        # Bind on the statement itself so execution does not depend on how the
        # session's exec() accepts parameters.
        # NOTE(review): SQLModel's exec takes `params` keyword-only — confirm
        # against the DatabaseDep session type.
        query_embedding=embedding_str,
        user_id=str(current_user.id),
        semantic_weight=search_request.semantic_weight,
        keyword_weight=search_request.keyword_weight,
        min_score=search_request.min_similarity,
        limit=search_request.limit,
        **keyword_params,
    )

    result = await db.exec(sql)

    return [
        DocumentWithSimilarityDTO(
            id=row.id,
            title=row.title,
            content=row.content,
            created_at=row.created_at,
            updated_at=row.updated_at,
            user_id=row.user_id,
            embedding_model=row.embedding_model or "unknown",
            similarity_score=float(row.combined_score),
            cosine_distance=None,  # Not applicable for hybrid search
        )
        for row in result
    ]

How do I optimize vector search performance?

Database optimization and indexing:

# In an Alembic migration
# CREATE INDEX CONCURRENTLY cannot run inside a transaction block, and Alembic
# runs migrations inside one by default, so execute this statement in an
# autocommit block.
with op.get_context().autocommit_block():
    op.execute("""
    CREATE INDEX CONCURRENTLY idx_embeddings_hnsw
    ON document
    USING hnsw (embedding vector_cosine_ops)
    """)