# AIDA / migrate_to_4096.py
# Author: destinyebuka
# Commit message: Deploy Lojiz Platform with Aida AI backend
# Commit: 79ef7e1
# migrate_to_4096.py (with proper field indexing for filtering)
import os
import asyncio
from uuid import uuid4
from typing import List
import httpx
from qdrant_client import AsyncQdrantClient, models
from motor.motor_asyncio import AsyncIOMotorClient
# ------------------------------------------------------------------
# CONFIGURATION FROM ENV VARS (required — no hard-coded credentials)
# ------------------------------------------------------------------
# SECURITY: earlier revisions of this file shipped Qdrant, OpenRouter and
# MongoDB credentials as getenv() defaults. Anything that was committed
# must be considered leaked and rotated; connection settings are now read
# exclusively from the environment.


def _require_env(name: str) -> str:
    """Return environment variable *name*, failing fast if it is missing.

    Raises:
        RuntimeError: the variable is unset or empty. Raising at import time
            stops the script before any client is built or any collection
            is touched.
    """
    value = os.getenv(name)
    if not value:
        raise RuntimeError(f"Environment variable {name} must be set")
    return value


QDRANT_URL = _require_env("QDRANT_URL")
QDRANT_API_KEY = _require_env("QDRANT_API_KEY")
OPENROUTER_KEY = _require_env("OPENROUTER_API_KEY")
MONGODB_URL = _require_env("MONGODB_URL")  # may embed DB user credentials
DB_NAME = "lojiz"
COL_NAME = "listings"
EMBED_MODEL = "qwen/qwen3-embedding-8b"
VECTOR_SIZE = 4096  # must match the output dimension of EMBED_MODEL
BATCH_SIZE = 64  # points per Qdrant write
# ------------------------------------------------------------------
# Module-level async clients shared by embed() and rebuild().
# Both network clients use the same 60-second timeout.
# ------------------------------------------------------------------
_CLIENT_TIMEOUT = 60

# Vector store; https=True forces TLS to the configured endpoint.
qdrant = AsyncQdrantClient(
    url=QDRANT_URL,
    api_key=QDRANT_API_KEY,
    https=True,
    timeout=_CLIENT_TIMEOUT,
)

# Handle to the DB_NAME.COL_NAME collection holding the source listings.
_mongo_client = AsyncIOMotorClient(MONGODB_URL)
mongo = _mongo_client[DB_NAME][COL_NAME]

# Plain HTTP client used for the embeddings endpoint.
http = httpx.AsyncClient(timeout=_CLIENT_TIMEOUT)
# ------------------------------------------------------------------
# Embedding helper
# ------------------------------------------------------------------
_EMBED_URL = "https://openrouter.ai/api/v1/embeddings"
# Statuses treated as transient: rate limiting and upstream/gateway errors.
_RETRYABLE_STATUS = frozenset({429, 500, 502, 503, 504})


async def embed(text: str, *, retries: int = 3) -> List[float]:
    """Embed *text* using OpenRouter (Qwen embedding model).

    Args:
        text: The text to embed.
        retries: Extra attempts after a network error or a retryable HTTP
            status (429/5xx), waiting 1 s, 2 s, 4 s, ... between attempts.
            Negative values are treated as 0.

    Returns:
        The embedding vector; its length is checked against ``VECTOR_SIZE``.

    Raises:
        httpx.HTTPStatusError: non-retryable status, or retries exhausted.
        httpx.TransportError: network failure persisting through all retries.
        ValueError: the response has no embedding, or one whose length
            differs from ``VECTOR_SIZE``.
    """
    payload = {
        "model": EMBED_MODEL,
        "input": text,
        "encoding_format": "float",
    }
    # Empty HTTP-Referer / X-Title headers carried no information and are
    # no longer sent.
    headers = {
        "Authorization": f"Bearer {OPENROUTER_KEY}",
        "Content-Type": "application/json",
    }

    attempts = max(retries, 0) + 1
    for attempt in range(attempts):
        last_try = attempt == attempts - 1
        try:
            r = await http.post(_EMBED_URL, json=payload, headers=headers)
            r.raise_for_status()
            break
        except httpx.HTTPStatusError as exc:
            if last_try or exc.response.status_code not in _RETRYABLE_STATUS:
                raise
        except httpx.TransportError:
            if last_try:
                raise
        await asyncio.sleep(2**attempt)

    body = r.json()
    try:
        vector = body["data"][0]["embedding"]
    except (KeyError, IndexError, TypeError) as exc:
        # Surface whatever the API sent (e.g. an error object) instead of a
        # bare KeyError on "data".
        raise ValueError(f"No embedding in response: {str(body)[:300]}") from exc

    # The Qdrant collection is created with VECTOR_SIZE dimensions; a
    # mismatched vector would otherwise only fail later, when the whole
    # batch containing it is written.
    if len(vector) != VECTOR_SIZE:
        raise ValueError(
            f"Expected a {VECTOR_SIZE}-d embedding from {EMBED_MODEL}, "
            f"got {len(vector)}"
        )
    return vector
# ------------------------------------------------------------------
# Main migration function
# ------------------------------------------------------------------
_QDRANT_COLLECTION = "listings"


def _lower(value) -> str:
    """Return *value* lower-cased for case-insensitive keyword filters ('' if falsy)."""
    return str(value).lower() if value else ""


def _embedding_text(doc: dict) -> str:
    """Build the text embedded for one listing: "<n>-bed <location> <description>".

    Missing or null fields are left out rather than rendered as the literal
    string "None" or as a dangling "-bed" prefix.
    """
    parts = []
    bedrooms = doc.get("bedrooms")
    if bedrooms not in (None, ""):
        parts.append(f"{bedrooms}-bed")
    for key in ("location", "description"):
        value = doc.get(key)
        if value:
            parts.append(str(value))
    return " ".join(parts).strip()


def _amenities(raw) -> list:
    """Normalise a Mongo `amenities` value to a list of lower-cased strings."""
    if not raw:
        return []
    if isinstance(raw, str):
        # A bare string would otherwise be split into single characters.
        raw = [raw]
    return [str(a).lower() for a in raw if a]


def _build_payload(doc: dict) -> dict:
    """Build the Qdrant payload for one Mongo listing document.

    The `*_lower` copies back the KEYWORD indexes so filters can be
    case-insensitive. Null Mongo fields are normalised to "" / 0 / [] so
    payload value types stay consistent across points.
    """
    location = doc.get("location") or ""
    price_type = doc.get("price_type") or ""
    # NOTE(review): the original inline comment said this comes from the
    # Mongo `listing_type` field, but the code reads `type` — confirm.
    listing_type = doc.get("type") or ""
    return {
        "_id": str(doc["_id"]),
        "title": doc.get("title") or "",
        "description": doc.get("description") or "",
        "location": location,
        "location_lower": _lower(location),
        "price": doc.get("price") or 0,
        "price_type": price_type,
        "price_type_lower": _lower(price_type),
        "listing_type": listing_type,
        "listing_type_lower": _lower(listing_type),
        "bedrooms": doc.get("bedrooms") or 0,
        "bathrooms": doc.get("bathrooms") or 0,
        "amenities": _amenities(doc.get("amenities")),
        "currency": doc.get("currency") or "XOF",
    }


async def _create_collection() -> None:
    """Drop the listings collection (if present) and recreate it with payload indexes."""
    try:
        print("🗑️ Deleting old listings collection…")
        await qdrant.delete_collection(_QDRANT_COLLECTION)
        print("✓ Old collection deleted")
    except Exception as e:
        # Best effort: the collection may simply not exist yet.
        print(f"⚠️ Could not delete collection (may not exist): {e}")

    print("📦 Creating new 4096-D listings collection with indexed fields…")
    await qdrant.create_collection(
        _QDRANT_COLLECTION,
        vectors_config=models.VectorParams(
            size=VECTOR_SIZE, distance=models.Distance.COSINE
        ),
    )
    print("✓ Collection created with 4096-D vectors")

    # Index the payload fields used for filtering.
    print("📇 Creating payload indexes for filtering…")
    keyword = models.PayloadSchemaType.KEYWORD
    integer = models.PayloadSchemaType.INTEGER
    indexes = {
        "location_lower": keyword,
        "price": integer,
        "bedrooms": integer,
        "bathrooms": integer,
        "price_type_lower": keyword,
        "listing_type_lower": keyword,
        "amenities": keyword,
    }
    for field_name, schema in indexes.items():
        await qdrant.create_payload_index(
            collection_name=_QDRANT_COLLECTION,
            field_name=field_name,
            field_schema=schema,
        )
    print("✓ Payload indexes created")


async def _upsert_batch(points: list, retries: int = 3) -> None:
    """Write *points* to Qdrant and wait until the write is applied.

    The awaitable ``upsert(..., wait=True)`` is used, so the event loop is
    not blocked and each batch is confirmed before progress is reported.
    Failures are retried with exponential backoff, then re-raised.
    """
    for attempt in range(max(retries, 0) + 1):
        try:
            await qdrant.upsert(
                collection_name=_QDRANT_COLLECTION, points=points, wait=True
            )
            return
        except Exception:
            if attempt >= retries:
                raise
            await asyncio.sleep(2**attempt)


async def rebuild() -> None:
    """Delete old collection, create new 4096-D collection with indexed fields, and migrate all documents.

    Every Mongo listing is embedded with ``embed`` and written to Qdrant in
    batches of ``BATCH_SIZE``. Listings with no text to embed, or whose
    embedding request fails, are skipped and counted. The module's HTTP
    client is closed when this function exits, including on error.
    """
    try:
        await _create_collection()

        total = await mongo.count_documents({})
        print(f"📊 MongoDB contains {total} listings. Starting migration…\n")
        if total == 0:
            print("⚠️ No listings found in MongoDB. Migration complete.")
            return

        batch = []
        count = 0
        skipped = 0
        async for doc in mongo.find({}):
            text = _embedding_text(doc)
            if not text:
                print(f"❌ Skipping document {doc.get('_id')}: no text to embed")
                skipped += 1
                continue
            try:
                vector = await embed(text)
            except Exception as e:
                print(f"❌ Failed to embed document {doc.get('_id')}: {e}")
                skipped += 1
                continue

            batch.append(
                models.PointStruct(
                    id=str(uuid4()), vector=vector, payload=_build_payload(doc)
                )
            )
            if len(batch) >= BATCH_SIZE:
                await _upsert_batch(batch)
                count += len(batch)
                print(f"✓ Uploaded {count}/{total} listings")
                batch = []

        # Flush the final, partially filled batch.
        if batch:
            await _upsert_batch(batch)
            count += len(batch)
            print(f"✓ Uploaded {count}/{total} listings")

        if skipped:
            print(
                f"\n⚠️ Migration finished with {skipped} skipped listing(s): "
                f"{count}/{total} indexed."
            )
        else:
            print(
                f"\n✅ Migration complete! All {count} listings indexed with proper fields."
            )
        print("📋 All payload indexes created successfully!")
    finally:
        await http.aclose()
# ------------------------------------------------------------------
# Script entry point
# ------------------------------------------------------------------
def main() -> None:
    """Run the full Qdrant rebuild to completion on a fresh asyncio event loop."""
    asyncio.run(rebuild())


if __name__ == "__main__":
    main()