researchpilot-api / src /embeddings /embedding_model.py
Subhadip007's picture
feat: vector database indexing complete
daafb32
"""
BGE embedding model wrapper for ResearchPilot.
RESPONSIBILITIES:
1. Load and cache the BGE-base-en-v1.5 model
2. Embed document chunks (no prefix)
3. Embed user queries (with BGE instruction prefix)
4. Handle batching for large-scale embedding
WHY A WRAPPER CLASS instead of calling SentenceTransformer directly:
If we decide to swap BGE for a better model tomorrow, we change
ONE file. Nothing else in the codebase changes. This is called
the FACADE PATTERN - hide implementation behind a stable interface
"""
import logging
# Suppress noisy sentence-transformers logs
logging.getLogger("sentence-transformers").setLevel(logging.ERROR)
logging.getLogger("huggingface_hub").setLevel(logging.ERROR)
import numpy as np
from typing import Union
from src.utils.logger import get_logger
from config.settings import EMBEDDING_MODEL_NAME, EMBEDDING_BATCH_SIZE, EMBEDDING_DIMENSION
logger = get_logger(__name__)
class EmbeddingModel:
"""
Wrapper around BGE-base-en-v1.5 for document and query embedding.
Usage:
model = EmbeddingModel()
# Embed chunks (documents)
chunk_vectors = model.embed_documents(["chunk text 1", "chunk text 2"])
# Embed a user query
query_vector = model.embed_query("what is attention mechanism?")
"""
# BGE introduction prefix for queries
# This is specified in the official BGE model card
QUERY_PREFIX = "Represent this sentence for searching relevant passages: "
def __init__(self, model_name: str = EMBEDDING_MODEL_NAME):
self.model_name = model_name
self._model = None # Lazy loaded
logger.info(f"EmbeddingModel wrapper created for: {model_name}")
@property
def model(self):
"""Lazy-load model on first use."""
if self._model is None:
from sentence_transformers import SentenceTransformer
logger.info(f"Loading embedding model: {self.model_name}")
self._model = SentenceTransformer(self.model_name)
logger.info(
f"Model loaded. "
f"Embedding dimension: {self._model.get_sentence_embedding_dimension()}"
)
return self._model
def embed_documents(
self,
texts: list[str],
batch_size: int = EMBEDDING_BATCH_SIZE,
show_progress: bool = True,
) -> np.ndarray:
"""
Embed a list of document chunks.
NO prefix applied - BGE embeds documents as-is.
Args:
texts: List of chunk texts to embed
batch_size: How many chunks to process at once
show_progress: Show tqdm progress bar
Returns:
numpy array of shape (len(texts), 768)
Each row is the embedding for one chunk.
BATCHING EXPLAINED:
We cannot embed all 15,664 chunks at once - that would
require ~15,664 * 768 * 4 bytes = ~48MB just for the
output array, plus the model's working memory.
Processing in batches of 32-64 keeps memory stable
while still being fast (model processes the batch
as a single matrix multiplication).
"""
if not texts:
return np.array([])
logger.info(f"Embedding {len(texts)} documents in batches of {batch_size}")
embeddings = self.model.encode(
texts,
batch_size = batch_size,
show_progress_bar = show_progress,
normalize_embeddings = True, # L2 normalize -> cosine sim = dot product
convert_to_numpy = True,
)
logger.info(f"Embedding complete. Shape: {embeddings.shape}")
return embeddings
def embed_query(self, query: str) -> np.ndarray:
"""
Embed a single user query WITH the BGE instruction prefix.
Args:
query: Raw user question
Returns:
numpy array of shape (768,)
WHY SINGLE QUERY (not batch):
At query time, we receive one question at a time.
Batching makes no sense here - we want the answer fast.
"""
# Apply BGE's instruction prefix for retrieval queries
prefixed_query = self.QUERY_PREFIX + query
embedding = self.model.encode(
prefixed_query,
normalize_embeddings = True,
convert_to_numpy = True,
show_progress_bar = False,
)
return embedding
def embed_batch(
self,
texts: list[str],
batch_size: int = EMBEDDING_BATCH_SIZE,
) -> np.ndarray:
"""
Embed texts in batches, yielding one batch at a time.
WHY A GENERATOR:
For 15,664 chunks, we don't want to hold ALL embeddings
in memory while also saving them. This generator yields
one batch at a time - we save each batch, then free memory.
Usage:
for batch_embeddings, batch_texts in model.embed_batch(texts):
save(batch_embeddings)
"""
for i in range(0, len(texts), batch_size):
batch = texts[i : i + batch_size]
embeddings = self.model.encode(
batch,
normalize_embeddings = True,
convert_to_numpy = True,
show_progress_bar = False,
)
yield embeddings, batch