Spaces:
Running
Running
| # migrate_to_4096.py (with proper field indexing for filtering) | |
| import os | |
| import asyncio | |
| from uuid import uuid4 | |
| from typing import List | |
| import httpx | |
| from qdrant_client import AsyncQdrantClient, models | |
| from motor.motor_asyncio import AsyncIOMotorClient | |
# ------------------------------------------------------------------
# CONFIGURATION: connection settings come from environment variables ONLY
# ------------------------------------------------------------------
# SECURITY: API keys and the MongoDB connection string must never be
# hard-coded as fallback defaults. Any credential that was ever committed to
# this file has to be treated as leaked and rotated.
_REQUIRED_ENV_VARS = (
    "QDRANT_URL",
    "QDRANT_API_KEY",
    "OPENROUTER_API_KEY",
    "MONGODB_URL",
)

# Each value is None when the corresponding variable is unset.
QDRANT_URL = os.getenv("QDRANT_URL")
QDRANT_API_KEY = os.getenv("QDRANT_API_KEY")
OPENROUTER_KEY = os.getenv("OPENROUTER_API_KEY")
MONGODB_URL = os.getenv("MONGODB_URL")

_missing_env_vars = [name for name in _REQUIRED_ENV_VARS if not os.getenv(name)]
if _missing_env_vars and __name__ == "__main__":
    # Fail fast when executed as the migration script. The check is skipped on
    # plain import so the module can still be loaded (e.g. by tests) without
    # credentials present.
    raise SystemExit(
        "Missing required environment variables: " + ", ".join(_missing_env_vars)
    )

DB_NAME = "lojiz"                       # MongoDB database name
COL_NAME = "listings"                   # MongoDB collection holding the source listings
EMBED_MODEL = "qwen/qwen3-embedding-8b"  # model id sent to the embeddings endpoint
VECTOR_SIZE = 4096                      # vector dimension used when creating the Qdrant collection
BATCH_SIZE = 64                         # number of points accumulated before each upload
| # ------------------------------------------------------------------ | |
| # Async clients (60s timeout for embedding API) | |
| # ------------------------------------------------------------------ | |
# One timeout value (seconds) shared by the Qdrant client and the HTTP client.
_CLIENT_TIMEOUT = 60

_qdrant_options = {
    "url": QDRANT_URL,
    "api_key": QDRANT_API_KEY,
    "https": True,
    "timeout": _CLIENT_TIMEOUT,
}
qdrant = AsyncQdrantClient(**_qdrant_options)

# `mongo` is the listings *collection* handle (client -> database -> collection),
# not the Mongo client itself.
_mongo_client = AsyncIOMotorClient(MONGODB_URL)
mongo = _mongo_client[DB_NAME][COL_NAME]

# Shared HTTP client used by embed() for the embeddings endpoint.
http = httpx.AsyncClient(timeout=_CLIENT_TIMEOUT)
| # ------------------------------------------------------------------ | |
| # Embedding helper | |
| # ------------------------------------------------------------------ | |
async def embed(text: str) -> List[float]:
    """Return the embedding vector for *text* from the OpenRouter embeddings API.

    Sends a single POST request for ``EMBED_MODEL`` through the shared ``http``
    client and returns the ``embedding`` entry of the first item in the
    response's ``data`` list.

    Raises:
        Whatever ``raise_for_status()`` raises for an error HTTP status, and
        ``KeyError`` / ``IndexError`` if the response body lacks the expected
        ``data[0]["embedding"]`` entry.
    """
    response = await http.post(
        "https://openrouter.ai/api/v1/embeddings",
        json={
            "model": EMBED_MODEL,
            "input": text,
            "encoding_format": "float",
        },
        headers={
            "Authorization": f"Bearer {OPENROUTER_KEY}",
            "Content-Type": "application/json",
            # Attribution headers are sent, but deliberately left blank.
            "HTTP-Referer": "",
            "X-Title": "",
        },
    )
    response.raise_for_status()
    first_item = response.json()["data"][0]
    return first_item["embedding"]
| # ------------------------------------------------------------------ | |
| # Main migration function | |
| # ------------------------------------------------------------------ | |
# Name of the Qdrant collection that is dropped and rebuilt by rebuild().
_QDRANT_COLLECTION = "listings"


def _embedding_text(doc: dict) -> str:
    """Return the text embedded for *doc*: ``"<bedrooms>-bed <location> <description>"``."""
    # Missing keys and explicit nulls both become "", otherwise a null would be
    # rendered into the embedding text as the literal string "None".
    bedrooms, location, description = (
        "" if doc.get(key) is None else doc.get(key)
        for key in ("bedrooms", "location", "description")
    )
    return f"{bedrooms}-bed {location} {description}".strip()


def _build_payload(doc: dict) -> dict:
    """Build the Qdrant payload for *doc*, adding lowercase copies of the filter fields."""
    location = doc.get("location") or ""
    price_type = doc.get("price_type") or ""
    # NOTE(review): the value is read from the Mongo field "type"; an earlier
    # comment here mentioned a "listing_type" field. Confirm which one is right.
    listing_type = doc.get("type") or ""
    # `or []` also covers an explicit null, which `doc.get("amenities", [])`
    # would pass through as None and crash the whole migration.
    amenities = doc.get("amenities") or []
    return {
        "_id": str(doc["_id"]),
        "title": doc.get("title", ""),
        "description": doc.get("description") or "",
        "location": location,
        "location_lower": location.lower(),
        "price": doc.get("price") or 0,
        "price_type": price_type,
        "price_type_lower": price_type.lower(),
        "listing_type": listing_type,
        "listing_type_lower": listing_type.lower(),
        "bedrooms": doc.get("bedrooms") or 0,
        "bathrooms": doc.get("bathrooms") or 0,
        # Stored lowercase; str() keeps one non-string entry from aborting the run.
        "amenities": [str(item).lower() for item in amenities if item is not None],
        "currency": doc.get("currency", "XOF"),
    }


async def _create_payload_indexes() -> None:
    """Create one payload index per field that is used for filtering."""
    keyword = models.PayloadSchemaType.KEYWORD
    integer = models.PayloadSchemaType.INTEGER
    indexed_fields = (
        ("location_lower", keyword),
        ("price", integer),
        ("bedrooms", integer),
        ("bathrooms", integer),
        ("price_type_lower", keyword),
        ("listing_type_lower", keyword),
        ("amenities", keyword),
    )
    for field_name, field_schema in indexed_fields:
        await qdrant.create_payload_index(
            collection_name=_QDRANT_COLLECTION,
            field_name=field_name,
            field_schema=field_schema,
        )


async def _upload(points: list) -> None:
    """Write one batch of points to Qdrant and wait until it has been applied."""
    # Awaited upsert instead of the synchronous upload_points(), which would
    # block the event loop from inside this coroutine.
    await qdrant.upsert(
        collection_name=_QDRANT_COLLECTION,
        points=points,
        wait=True,
    )


async def _migrate() -> None:
    """Drop and recreate the collection, then embed and upload every listing."""
    # NOTE(review): the old collection is dropped before MongoDB has been
    # queried, so a failure further down leaves Qdrant empty or partial.
    try:
        print("🗑️ Deleting old listings collection…")
        await qdrant.delete_collection(_QDRANT_COLLECTION)
        print("✓ Old collection deleted")
    except Exception as e:
        # Deliberately best-effort: the collection may simply not exist yet.
        print(f"⚠️ Could not delete collection (may not exist): {e}")

    print("📦 Creating new 4096-D listings collection with indexed fields…")
    await qdrant.create_collection(
        _QDRANT_COLLECTION,
        vectors_config=models.VectorParams(
            size=VECTOR_SIZE,
            distance=models.Distance.COSINE,
        ),
    )
    print("✓ Collection created with 4096-D vectors")

    print("📇 Creating payload indexes for filtering…")
    await _create_payload_indexes()
    print("✓ Payload indexes created")

    total = await mongo.count_documents({})
    print(f"📊 MongoDB contains {total} listings. Starting migration…\n")
    if total == 0:
        print("⚠️ No listings found in MongoDB. Migration complete.")
        return

    batch = []
    count = 0   # points uploaded so far
    failed = 0  # documents skipped because embedding failed
    async for doc in mongo.find({}):
        try:
            vector = await embed(_embedding_text(doc))
        except Exception as e:
            # One bad document must not abort the run: report it and move on.
            failed += 1
            print(f"❌ Failed to embed document {doc.get('_id')}: {e}")
            continue

        # Point ids are fresh random UUIDs; the Mongo id is kept in the payload.
        batch.append(
            models.PointStruct(
                id=str(uuid4()),
                vector=vector,
                payload=_build_payload(doc),
            )
        )
        if len(batch) >= BATCH_SIZE:
            await _upload(batch)
            count += len(batch)
            print(f"✓ Uploaded {count}/{total} listings")
            batch = []

    # Upload whatever is left over from the last, partially filled batch.
    if batch:
        await _upload(batch)
        count += len(batch)
        print(f"✓ Uploaded {count}/{total} listings")

    if failed:
        # Do not claim that everything was indexed when documents were skipped.
        print(
            f"\n⚠️ Migration finished: {count} listings indexed, "
            f"{failed} skipped because embedding failed."
        )
    else:
        print(f"\n✅ Migration complete! All {count} listings indexed with proper fields.")
    print("📋 All payload indexes created successfully!")


async def rebuild():
    """Rebuild the Qdrant ``listings`` collection from the MongoDB listings.

    Steps: delete the existing collection (best-effort), create a new one with
    ``VECTOR_SIZE``-dimensional cosine vectors, create the payload indexes used
    for filtering, then embed every MongoDB document and upload the resulting
    points in batches of ``BATCH_SIZE``. Documents whose embedding call fails
    are reported and skipped.

    The shared ``http`` client is closed when the function exits, whether the
    migration succeeded or raised.

    Raises:
        Any exception from the Qdrant or MongoDB calls other than the initial
        best-effort collection delete.
    """
    try:
        await _migrate()
    finally:
        await http.aclose()
| # ------------------------------------------------------------------ | |
| # Main entry point | |
| # ------------------------------------------------------------------ | |
if __name__ == "__main__":
    # Script entry point: run the whole migration coroutine on a fresh event loop.
    migration = rebuild()
    asyncio.run(migration)