# migrate_to_4096.py (with proper field indexing for filtering)
"""Rebuild the Qdrant ``listings`` collection with 4096-D embeddings.

The script deletes the existing Qdrant collection, recreates it with
payload indexes on the fields used for filtering, then reads every listing
from MongoDB, embeds it through the OpenRouter embeddings endpoint and
uploads the resulting points in batches.
"""
import asyncio
import os
from typing import List
from uuid import uuid4

import httpx
from motor.motor_asyncio import AsyncIOMotorClient
from qdrant_client import AsyncQdrantClient, models

# ------------------------------------------------------------------
# CONFIGURATION FROM ENV VARS
# ------------------------------------------------------------------
# SECURITY: secrets are read from the environment only. Credentials must
# never be committed as fallback defaults; any key or password that was
# ever stored in this file should be treated as leaked and rotated.
QDRANT_URL = os.getenv(
    "QDRANT_URL",
    "https://b96fe9df-a305-449a-9d55-8e858bfa1b82.us-east-1-1.aws.cloud.qdrant.io:6333",
)
QDRANT_API_KEY = os.getenv("QDRANT_API_KEY")
OPENROUTER_KEY = os.getenv("OPENROUTER_API_KEY")
MONGODB_URL = os.getenv("MONGODB_URL")

DB_NAME = "lojiz"
COL_NAME = "listings"
QDRANT_COLLECTION = "listings"
EMBED_MODEL = "qwen/qwen3-embedding-8b"
VECTOR_SIZE = 4096
BATCH_SIZE = 64

# Payload fields that get a Qdrant index, in creation order.
_PAYLOAD_INDEXES = (
    ("location_lower", models.PayloadSchemaType.KEYWORD),
    ("price", models.PayloadSchemaType.INTEGER),
    ("bedrooms", models.PayloadSchemaType.INTEGER),
    ("bathrooms", models.PayloadSchemaType.INTEGER),
    ("price_type_lower", models.PayloadSchemaType.KEYWORD),
    ("listing_type_lower", models.PayloadSchemaType.KEYWORD),
    ("amenities", models.PayloadSchemaType.KEYWORD),
)

# ------------------------------------------------------------------
# Async clients (60s timeout for embedding API)
# ------------------------------------------------------------------
qdrant = AsyncQdrantClient(
    url=QDRANT_URL,
    api_key=QDRANT_API_KEY,
    https=True,
    timeout=60,
)
# Keep a handle on the client itself so it can be closed on shutdown.
_mongo_client = AsyncIOMotorClient(MONGODB_URL)
mongo = _mongo_client[DB_NAME][COL_NAME]
http = httpx.AsyncClient(timeout=60)


def _check_config() -> None:
    """Raise ``RuntimeError`` if any required secret is missing from the environment."""
    required = {
        "QDRANT_API_KEY": QDRANT_API_KEY,
        "OPENROUTER_API_KEY": OPENROUTER_KEY,
        "MONGODB_URL": MONGODB_URL,
    }
    missing = [name for name, value in required.items() if not value]
    if missing:
        raise RuntimeError(
            "Missing required environment variable(s): " + ", ".join(missing)
        )


# ------------------------------------------------------------------
# Embedding helper
# ------------------------------------------------------------------
async def embed(text: str) -> List[float]:
    """Embed text using OpenRouter (Qwen embedding model).

    Args:
        text: The text to embed.

    Returns:
        The embedding taken from ``data[0].embedding`` of the JSON response.

    Raises:
        httpx.HTTPStatusError: If the endpoint answers with a 4xx/5xx status.
    """
    payload = {
        "model": EMBED_MODEL,
        "input": text,
        "encoding_format": "float",
    }
    headers = {
        "Authorization": f"Bearer {OPENROUTER_KEY}",
        "Content-Type": "application/json",
        "HTTP-Referer": "",
        "X-Title": "",
    }
    response = await http.post(
        "https://openrouter.ai/api/v1/embeddings",
        json=payload,
        headers=headers,
    )
    response.raise_for_status()
    return response.json()["data"][0]["embedding"]


# ------------------------------------------------------------------
# Document -> text / payload helpers
# ------------------------------------------------------------------
def _build_embedding_text(doc: dict) -> str:
    """Return the text that is embedded for one MongoDB listing document."""
    # Map explicit nulls to "" so the literal string "None" never reaches
    # the embedding model; a real 0 for bedrooms is kept as "0-bed".
    bedrooms = doc.get("bedrooms")
    bedrooms = "" if bedrooms is None else bedrooms
    location = doc.get("location") or ""
    description = doc.get("description") or ""
    return f"{bedrooms}-bed {location} {description}".strip()


def _build_payload(doc: dict) -> dict:
    """Return the Qdrant payload for one MongoDB listing document.

    Lowercased copies of the filterable string fields are stored next to
    the originals so filters can be case-insensitive.
    """
    location = doc.get("location") or ""
    price_type = doc.get("price_type") or ""
    # The listing type is read from the Mongo document's "type" key.
    listing_type = doc.get("type") or ""
    # `or []` also covers documents where "amenities" is present but null.
    amenities = doc.get("amenities") or []
    return {
        "_id": str(doc["_id"]),
        "title": doc.get("title", ""),
        "description": doc.get("description") or "",
        "location": location,
        "location_lower": location.lower(),
        "price": doc.get("price") or 0,
        "price_type": price_type,
        "price_type_lower": price_type.lower(),
        "listing_type": listing_type,
        "listing_type_lower": listing_type.lower(),
        "bedrooms": doc.get("bedrooms") or 0,
        "bathrooms": doc.get("bathrooms") or 0,
        "amenities": [a.lower() for a in amenities],  # Store lowercase
        "currency": doc.get("currency", "XOF"),
    }


# ------------------------------------------------------------------
# Migration steps
# ------------------------------------------------------------------
async def _recreate_collection() -> None:
    """Drop the old Qdrant collection and create the new indexed one."""
    try:
        print("🗑️ Deleting old listings collection…")
        await qdrant.delete_collection(QDRANT_COLLECTION)
        print("✓ Old collection deleted")
    except Exception as e:
        # Best effort: a failed delete is reported and the run continues.
        print(f"⚠️ Could not delete collection (may not exist): {e}")

    print("📦 Creating new 4096-D listings collection with indexed fields…")
    await qdrant.create_collection(
        QDRANT_COLLECTION,
        vectors_config=models.VectorParams(
            size=VECTOR_SIZE,
            distance=models.Distance.COSINE,
        ),
    )
    print("✓ Collection created with 4096-D vectors")

    # Create payload index for filtering fields
    print("📇 Creating payload indexes for filtering…")
    for field_name, field_schema in _PAYLOAD_INDEXES:
        await qdrant.create_payload_index(
            collection_name=QDRANT_COLLECTION,
            field_name=field_name,
            field_schema=field_schema,
        )
    print("✓ Payload indexes created")


def _upload_batch(batch: list, uploaded: int, total: int) -> int:
    """Upload ``batch`` to Qdrant, empty it, and return the new running total."""
    # Called without `await`, exactly as before. NOTE(review): this presumes
    # `upload_points` is a plain blocking method on AsyncQdrantClient —
    # confirm against the installed qdrant-client version.
    qdrant.upload_points(QDRANT_COLLECTION, batch)
    uploaded += len(batch)
    print(f"✓ Uploaded {uploaded}/{total} listings")
    batch.clear()
    return uploaded


async def _migrate_documents() -> None:
    """Embed every MongoDB listing and upload it to Qdrant in batches."""
    total = await mongo.count_documents({})
    print(f"📊 MongoDB contains {total} listings. Starting migration…\n")

    if total == 0:
        print("⚠️ No listings found in MongoDB. Migration complete.")
        return

    batch: List[models.PointStruct] = []
    uploaded = 0
    failed = 0

    async for doc in mongo.find({}):
        try:
            vector = await embed(_build_embedding_text(doc))
        except Exception as e:
            # Best effort per document: report, count and move on.
            failed += 1
            print(f"❌ Failed to embed document {doc.get('_id')}: {e}")
            continue

        batch.append(
            models.PointStruct(
                id=str(uuid4()),
                vector=vector,
                payload=_build_payload(doc),
            )
        )

        # Upload batch when it reaches BATCH_SIZE
        if len(batch) >= BATCH_SIZE:
            uploaded = _upload_batch(batch, uploaded, total)

    # Upload remaining batch
    if batch:
        uploaded = _upload_batch(batch, uploaded, total)

    print(f"\n✅ Migration complete! All {uploaded} listings indexed with proper fields.")
    if failed:
        print(f"⚠️ {failed} listing(s) could not be embedded and were skipped.")
    print("📋 All payload indexes created successfully!")


async def _close_clients() -> None:
    """Close the HTTP, Qdrant and MongoDB clients."""
    await http.aclose()
    await qdrant.close()
    _mongo_client.close()


# ------------------------------------------------------------------
# Main migration function
# ------------------------------------------------------------------
async def rebuild() -> None:
    """Delete old collection, create new 4096-D collection with indexed fields, and migrate all documents.

    Raises:
        RuntimeError: If a required secret is not set in the environment.
    """
    try:
        _check_config()
        await _recreate_collection()
        await _migrate_documents()
    finally:
        # Always release network resources, including on failure.
        await _close_clients()


# ------------------------------------------------------------------
# Main entry point
# ------------------------------------------------------------------
if __name__ == "__main__":
    asyncio.run(rebuild())