Spaces:
Sleeping
Sleeping
| import os | |
| import pickle | |
| from typing import List | |
| import faiss | |
| import numpy as np | |
| from app.services.neon_index_store import NeonIndexStore | |
class LocalVectorStore:
    """Per-semester vector store kept on local disk, optionally mirrored to Neon.

    Each semester is persisted as two files under ``base_dir``:

    * ``semester_<n>.pkl`` -- a pickled list of record dicts with the keys
      ``"course_code"``, ``"chunk"`` and ``"embedding"``;
    * ``semester_<n>.faiss`` -- a FAISS inner-product index over the
      L2-normalised embeddings of those records, in the same order (so the
      scores are cosine similarities and FAISS row ids are list positions).

    When a database URL is supplied, both files are also pushed to and pulled
    from a ``NeonIndexStore``. Cloud failures are reported with ``print`` and
    never propagate to the caller.
    """

    def __init__(
        self,
        base_dir: str,
        rag_index_db_url: str = "",
        neon_max_retries: int = 5,
        neon_retry_backoff_sec: float = 1.0,
        neon_connect_timeout_sec: int = 10,
    ) -> None:
        """Create the storage directory and, if configured, the cloud mirror.

        Args:
            base_dir: Directory holding the per-semester files; created if
                it does not exist.
            rag_index_db_url: Passed to ``NeonIndexStore``. An empty string
                disables the cloud mirror entirely.
            neon_max_retries: Forwarded to ``NeonIndexStore`` as
                ``max_retries``.
            neon_retry_backoff_sec: Forwarded to ``NeonIndexStore`` as
                ``base_backoff_sec``.
            neon_connect_timeout_sec: Forwarded to ``NeonIndexStore`` as
                ``connect_timeout_sec``.
        """
        self.base_dir = base_dir
        self.cloud_store = (
            NeonIndexStore(
                rag_index_db_url,
                max_retries=neon_max_retries,
                base_backoff_sec=neon_retry_backoff_sec,
                connect_timeout_sec=neon_connect_timeout_sec,
            )
            if rag_index_db_url
            else None
        )
        # Semesters whose hydration failure has already been printed, so a
        # persistent outage does not spam the log on every call.
        self._hydration_error_logged_semesters: set[int] = set()
        self._setup_error_logged = False

        os.makedirs(self.base_dir, exist_ok=True)

        if self.cloud_store:
            try:
                self.cloud_store.ensure_table()
            except Exception as exc:
                # Best effort: the store stays usable with local files only.
                if not self._setup_error_logged:
                    print(f"Neon index table setup skipped: {exc}")
                    self._setup_error_logged = True

    def upsert_documents(
        self,
        semester: int,
        course_code: str,
        chunks: List[str],
        embeddings: List[List[float]],
    ) -> None:
        """Replace all records of ``course_code`` in ``semester``.

        Existing records for the course are dropped, one record per
        ``(chunk, embedding)`` pair is appended, and the records file, the
        FAISS index and (if configured) the cloud copy are refreshed.

        Args:
            semester: Semester number selecting the file pair.
            course_code: Course whose records are replaced.
            chunks: Text chunks to store.
            embeddings: One embedding per chunk, in the same order.

        Raises:
            ValueError: If ``chunks`` and ``embeddings`` differ in length.
                Pairing them with ``zip`` would otherwise silently drop the
                surplus items.
        """
        if len(chunks) != len(embeddings):
            raise ValueError(
                "chunks and embeddings must have the same length "
                f"(got {len(chunks)} and {len(embeddings)})"
            )

        self._ensure_local_semester_data(semester)
        records_path = self._semester_records_path(semester)

        records = [
            record
            for record in self._load_records(records_path)
            if record.get("course_code") != course_code
        ]
        records.extend(
            {"course_code": course_code, "chunk": chunk, "embedding": vector}
            for chunk, vector in zip(chunks, embeddings)
        )

        self._save_records(records_path, records)
        self._rebuild_faiss_index(semester, records)
        self._sync_semester_to_cloud(semester)

    def search(self, semester: int, query_embedding: List[float], top_k: int = 6) -> List[dict]:
        """Return up to ``top_k`` stored chunks closest to ``query_embedding``.

        Args:
            semester: Semester number selecting the file pair.
            query_embedding: Query vector; it is L2-normalised before the
                inner-product search.
            top_k: Maximum number of hits. Values ``<= 0`` yield ``[]``.

        Returns:
            A list of ``{"course_code": ..., "chunk": ...}`` dicts in the
            order FAISS returned them; empty when the semester has no
            records or no index can be built.
        """
        if top_k <= 0:
            # Short-circuit rather than handing a non-positive k to FAISS.
            return []

        self._ensure_local_semester_data(semester)
        records_path = self._semester_records_path(semester)
        index_path = self._semester_index_path(semester)

        records = self._load_records(records_path)
        if not records:
            return []

        if not os.path.exists(index_path):
            # Records without an index: rebuild it from the records.
            self._rebuild_faiss_index(semester, records)
        if not os.path.exists(index_path):
            return []

        index = faiss.read_index(index_path)
        query = np.array(query_embedding, dtype=np.float32).reshape(1, -1)
        faiss.normalize_L2(query)

        k = min(top_k, len(records))
        _, indices = index.search(query, k)

        hits = []
        for idx in indices[0].tolist():
            # FAISS pads missing results with -1; the range check also
            # guards against an index that is out of step with the records.
            if 0 <= idx < len(records):
                record = records[idx]
                hits.append(
                    {
                        "course_code": record.get("course_code", ""),
                        "chunk": record.get("chunk", ""),
                    }
                )
        return hits

    def _semester_records_path(self, semester: int) -> str:
        """Return the path of the pickled records file for ``semester``."""
        return os.path.join(self.base_dir, f"semester_{semester}.pkl")

    def _semester_index_path(self, semester: int) -> str:
        """Return the path of the FAISS index file for ``semester``."""
        return os.path.join(self.base_dir, f"semester_{semester}.faiss")

    def _rebuild_faiss_index(self, semester: int, records: List[dict]) -> None:
        """Rewrite the semester's FAISS index from ``records``.

        An empty ``records`` list removes the index file. Otherwise the
        embeddings are L2-normalised and written as an ``IndexFlatIP`` whose
        row order matches ``records``.
        """
        index_path = self._semester_index_path(semester)
        if not records:
            if os.path.exists(index_path):
                os.remove(index_path)
            return

        vectors = np.array([r["embedding"] for r in records], dtype=np.float32)
        if vectors.ndim != 2 or vectors.shape[0] == 0:
            # Embeddings do not form a (rows, dim) matrix; any existing
            # index file is left untouched.
            return

        faiss.normalize_L2(vectors)
        index = faiss.IndexFlatIP(vectors.shape[1])
        index.add(vectors)
        faiss.write_index(index, index_path)

    def _ensure_local_semester_data(self, semester: int) -> None:
        """Download the semester's files from the cloud if none exist locally.

        Nothing happens when a local records file is present, when no cloud
        store is configured, or when the cloud has no data for the semester.
        A download failure is printed once per semester and then swallowed.
        """
        records_path = self._semester_records_path(semester)
        index_path = self._semester_index_path(semester)

        # Local records are authoritative: a missing index is rebuilt from
        # them by the callers. Hydrating here would overwrite newer local
        # records (e.g. after the index was removed for an empty record
        # list) with the older cloud copy.
        if os.path.exists(records_path):
            return
        if not self.cloud_store:
            return

        try:
            payload = self.cloud_store.load_semester_files(semester)
            if not payload:
                return
            faiss_bytes, records_bytes = payload
            # Records go last: their presence marks a completed hydration.
            self._write_bytes_atomic(index_path, faiss_bytes)
            self._write_bytes_atomic(records_path, records_bytes)
            self._hydration_error_logged_semesters.discard(semester)
        except Exception as exc:
            if semester not in self._hydration_error_logged_semesters:
                print(f"Neon index hydration failed for semester {semester}: {exc}")
                self._hydration_error_logged_semesters.add(semester)

    def _sync_semester_to_cloud(self, semester: int) -> None:
        """Upload the semester's index and records files to the cloud store.

        Skipped when no cloud store is configured or when either local file
        is missing. Upload failures are printed and swallowed.
        """
        if not self.cloud_store:
            return

        records_path = self._semester_records_path(semester)
        index_path = self._semester_index_path(semester)
        if not (os.path.exists(records_path) and os.path.exists(index_path)):
            return

        try:
            with open(index_path, "rb") as f:
                faiss_bytes = f.read()
            with open(records_path, "rb") as f:
                records_bytes = f.read()
            self.cloud_store.save_semester_files(semester, faiss_bytes, records_bytes)
        except Exception as exc:
            print(f"Neon index sync failed for semester {semester}: {exc}")

    @staticmethod
    def _write_bytes_atomic(path: str, payload) -> None:
        """Write ``payload`` to ``path`` via a temp file and ``os.replace``.

        An interrupted write therefore never leaves a truncated file at
        ``path``.
        """
        tmp_path = f"{path}.tmp"
        with open(tmp_path, "wb") as f:
            f.write(payload)
        os.replace(tmp_path, path)

    @staticmethod
    def _load_records(path: str) -> List[dict]:
        """Return the unpickled record list at ``path``, or ``[]`` if absent."""
        if not os.path.exists(path):
            return []
        # SECURITY NOTE(review): this file can be written from bytes fetched
        # from the cloud store, and unpickling can execute arbitrary code.
        # Only safe while that database is fully trusted.
        with open(path, "rb") as f:
            return pickle.load(f)

    @staticmethod
    def _save_records(path: str, data: List[dict]) -> None:
        """Pickle ``data`` to ``path``, replacing any previous file atomically."""
        tmp_path = f"{path}.tmp"
        with open(tmp_path, "wb") as f:
            pickle.dump(data, f)
        os.replace(tmp_path, path)