"""Gradio RAG assistant over a small set of Kubernetes and Docker doc pages.

Pipeline: scrape pages -> split into chunks -> store in Chroma -> answer
queries with hybrid (vector + BM25) retrieval, cross-encoder reranking and an
OpenRouter-hosted LLM. Per-query metrics are kept in memory for a dashboard.
"""

import os
import re
import time

import requests
import pandas as pd
import matplotlib
matplotlib.use("Agg")
import matplotlib.pyplot as plt
import gradio as gr
from bs4 import BeautifulSoup
from langchain_core.documents import Document
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_huggingface import HuggingFaceEmbeddings
from langchain_community.vectorstores import Chroma
from rank_bm25 import BM25Okapi
from sentence_transformers import CrossEncoder

PERSIST_DIR = "k8s_chroma_db"

URLS = {
    # Kubernetes Docs
    "pods": "https://kubernetes.io/docs/concepts/workloads/pods/",
    "deployments": "https://kubernetes.io/docs/concepts/workloads/controllers/deployment/",
    "services": "https://kubernetes.io/docs/concepts/services-networking/service/",
    "namespaces": "https://kubernetes.io/docs/concepts/overview/working-with-objects/namespaces/",
    "nodes": "https://kubernetes.io/docs/concepts/architecture/nodes/",
    "statefulsets": "https://kubernetes.io/docs/concepts/workloads/controllers/statefulset/",
    "rbac": "https://kubernetes.io/docs/reference/access-authn-authz/rbac/",
    "persistent-volumes": "https://kubernetes.io/docs/concepts/storage/persistent-volumes/",
    "ingress": "https://kubernetes.io/docs/concepts/services-networking/ingress/",
    "autoscaling": "https://kubernetes.io/docs/tasks/run-application/horizontal-pod-autoscale/",
    # Docker Docs 🐳
    "docker-overview": "https://docs.docker.com/get-started/overview/",
    "docker-images": "https://docs.docker.com/get-started/docker-concepts/the-basics/what-are-images/",
    "docker-containers": "https://docs.docker.com/get-started/docker-concepts/the-basics/what-is-a-container/",
    "docker-volumes": "https://docs.docker.com/storage/volumes/",
    "docker-networking": "https://docs.docker.com/network/",
    "docker-compose": "https://docs.docker.com/compose/",
}


# ------------------ Knowledge Base ------------------ #

def scrape_page(name, url):
    """Download one documentation page and return its main text as a Document.

    Args:
        name: Short identifier stored as ``doc_id`` in the metadata.
        url: Page to fetch.

    Returns:
        A ``Document`` with the page text, or ``None`` when the request fails
        or neither known content container is present in the HTML.
    """
    try:
        r = requests.get(url, timeout=20)
        r.raise_for_status()
        soup = BeautifulSoup(r.text, "html.parser")

        # Try Kubernetes docs structure
        content = soup.find("div", class_="td-content")
        # Try Docker docs structure
        if not content:
            content = soup.find("div", class_="docs-content")
        if not content:
            return None

        text = content.get_text(separator="\n").strip()
        return Document(page_content=text, metadata={"doc_id": name, "url": url})
    except Exception as e:
        # Deliberately broad: one bad page must not abort building the whole
        # knowledge base, so the failure is reported and the page is skipped.
        print(f"[ERROR] scraping {url}: {e}")
        return None


def build_or_load_kb():
    """Return ``(vectordb, chunks)``, loading from disk or scraping afresh.

    When ``PERSIST_DIR`` exists the stored chunks are read back from Chroma;
    otherwise every page in ``URLS`` is scraped, split and embedded.

    Raises:
        RuntimeError: If no page could be scraped, so there is nothing to index.
    """
    embedding_model = HuggingFaceEmbeddings(
        model_name="sentence-transformers/all-MiniLM-L6-v2"
    )

    if os.path.isdir(PERSIST_DIR):
        print("[INFO] Loading existing DB...")
        vectordb = Chroma(
            embedding_function=embedding_model,
            persist_directory=PERSIST_DIR,
        )
        # Use the wrapper's public accessor instead of the private collection.
        raw = vectordb.get(include=["documents", "metadatas"])
        chunks = [
            # A stored entry may carry no metadata; normalise to an empty dict.
            Document(page_content=d, metadata=m or {})
            for d, m in zip(raw["documents"], raw["metadatas"])
        ]
        return vectordb, chunks

    print("[INFO] No DB found — scraping docs...")
    docs = [
        d
        for d in (scrape_page(name, url) for name, url in URLS.items())
        if d
    ]
    print(f"[INFO] Scraped {len(docs)} docs")
    if not docs:
        # Fail with a clear message instead of an obscure error further down.
        raise RuntimeError("No documentation pages could be scraped; cannot build the knowledge base.")

    splitter = RecursiveCharacterTextSplitter(chunk_size=900, chunk_overlap=200)
    chunks = splitter.split_documents(docs)
    vectordb = Chroma.from_documents(chunks, embedding_model, persist_directory=PERSIST_DIR)
    # NOTE(review): recent Chroma versions are reported to persist
    # automatically; only call persist() when the wrapper still offers it.
    if hasattr(vectordb, "persist"):
        vectordb.persist()
    print("[INFO] DB created.")
    return vectordb, chunks


def _tokenize(text):
    """Lowercase and whitespace-split *text*; shared by BM25 index and query."""
    return text.lower().split()


vectordb, chunks = build_or_load_kb()
# BM25Okapi divides by the corpus size, so it cannot be built from no chunks.
bm25 = BM25Okapi([_tokenize(c.page_content) for c in chunks]) if chunks else None
reranker = CrossEncoder("cross-encoder/ms-marco-MiniLM-L-12-v2")
retriever = vectordb.as_retriever(
    search_type="similarity_score_threshold",
    search_kwargs={"k": 8, "score_threshold": 0.35},
)


def hybrid_search(query, top_k=5):
    """Retrieve the best chunks for *query* using vector + BM25 + reranking.

    Args:
        query: User question.
        top_k: Number of BM25 candidates taken and of final results returned.

    Returns:
        Up to ``top_k`` documents, best first. Each returned document gets a
        float ``rerank_score`` written into its metadata. Empty list when no
        candidate was found.
    """
    vector_results = retriever.invoke(query)

    bm_results = []
    if bm25 is not None:
        # The query is tokenized exactly like the indexed corpus (lowercased),
        # otherwise capitalised document terms could never match.
        bm_scores = bm25.get_scores(_tokenize(query))
        bm_ranked = sorted(zip(bm_scores, chunks), key=lambda x: x[0], reverse=True)
        bm_results = [d for _, d in bm_ranked[:top_k]]

    # remove duplicates (same source page and same leading text)
    seen = set()
    unique = []
    for d in vector_results + bm_results:
        key = (d.metadata.get("doc_id"), d.page_content[:80])
        if key not in seen:
            seen.add(key)
            unique.append(d)

    if not unique:
        return []

    pairs = [(query, doc.page_content) for doc in unique]
    scores = reranker.predict(pairs)
    ranked = sorted(zip(scores, unique), key=lambda x: x[0], reverse=True)[:top_k]
    for s, doc in ranked:
        doc.metadata["rerank_score"] = float(s)
    return [doc for _, doc in ranked]


# ------------------ LLM ------------------ #

def call_llm(prompt):
    """Send *prompt* to the OpenRouter chat completions API and return the text.

    Never raises: a missing API key or any request/response failure is
    returned as a warning string ending in ``Groundedness: 0%`` so callers can
    parse it like a normal answer.
    """
    api_key = os.getenv("OPENROUTER_API_KEY")
    if not api_key:
        return "⚠ Missing OPENROUTER_API_KEY\nGroundedness: 0%"

    try:
        res = requests.post(
            "https://openrouter.ai/api/v1/chat/completions",
            headers={
                "Authorization": f"Bearer {api_key}",
                "HTTP-Referer": "https://huggingface.co/",
                "X-Title": "Kubernetes RAG Assistant",
            },
            json={
                "model": "meta-llama/llama-3.1-8b-instruct",
                "messages": [{"role": "user", "content": prompt}],
                "max_tokens": 400,
                "temperature": 0.0,
            },
            timeout=60,
        )
        res.raise_for_status()
        content = res.json()["choices"][0]["message"]["content"]
        if not isinstance(content, str):
            # Callers run regexes and .split() on the result; a null or
            # structured content field must be reported, not passed through.
            raise ValueError("response contained no text content")
        return content
    except Exception as e:
        return f"⚠ LLM error: {e}\nGroundedness: 0%"


# ------------------ Chat + Metrics ------------------ #

# In-memory, per-process log; one entry per answered query in every list.
# q=query, lat=latency (s), tok=answer word count, g=groundedness (%),
# r=mean rerank score, c=distinct citations, t=query type.
METRICS = {"q": [], "lat": [], "tok": [], "g": [], "r": [], "c": [], "t": []}


def classify_query(q):
    """Label a query as ``"how-to"``, ``"debug"`` or ``"general"``.

    "how" must appear as a whole word so that e.g. "show pods" is not treated
    as a how-to question; "error"/"fail" match as substrings so inflections
    such as "errors" and "failed" are covered.
    """
    q = q.lower()
    if re.search(r"\bhow\b", q):
        return "how-to"
    if "error" in q or "fail" in q:
        return "debug"
    return "general"


def answer_question(query, history):
    """Answer *query* from the knowledge base and append the turn to the chat.

    Args:
        query: Text typed by the user.
        history: Existing chat messages (list of role/content dicts) or None.

    Returns:
        ``(new_history, "")`` — the updated message list and an empty string
        that clears the input box. The caller's list is never mutated.
    """
    history = list(history or [])

    # Nothing to search for: leave the conversation as it is.
    if not query or not query.strip():
        return history, ""

    start = time.time()
    docs = hybrid_search(query)

    if not docs:
        reply = "Not found in docs.\nGroundedness: 0%"
        return history + [
            {"role": "user", "content": query},
            {"role": "assistant", "content": reply},
        ], ""

    scores = []
    ctx_parts = []
    sources = []
    for i, d in enumerate(docs, 1):
        url = d.metadata.get("url", "")
        ctx_parts.append(f"[{i}] {d.page_content[:900]}\nSource: {url}\n\n")
        sources.append(f"[{i}] → {url}")
        scores.append(d.metadata["rerank_score"])
    ctx = "".join(ctx_parts)

    prompt = f"""
Answer using ONLY the context below.
Each sentence MUST include citation like [1].

Question: {query}

Context:
{ctx}

End with:
Groundedness: XX%
"""

    answer = call_llm(prompt)
    latency = time.time() - start

    # The model is asked to self-report groundedness; default to 0 if absent.
    grounded = 0
    m = re.search(r"Groundedness:\s*(\d+)%", answer)
    if m:
        grounded = int(m.group(1))

    cites = len(set(re.findall(r"\[(\d+)\]", answer)))
    avg_score = sum(scores) / len(scores)

    final = answer + "\n\n---\nSources:\n" + "\n".join(sources)

    # Log metrics correctly
    METRICS["q"].append(query)
    METRICS["lat"].append(latency)
    METRICS["tok"].append(len(answer.split()))
    METRICS["g"].append(grounded)
    METRICS["r"].append(avg_score)
    METRICS["c"].append(cites)
    METRICS["t"].append(classify_query(query))

    return history + [
        {"role": "user", "content": query},
        {"role": "assistant", "content": final},
    ], ""


def update_dashboard():
    """Return ``(rows, avg_groundedness, avg_latency, avg_tokens)`` for the UI.

    ``rows`` has one tuple per logged query in the column order of the
    analytics table. With no queries logged yet, the rows are empty and all
    averages are 0 instead of dividing by zero.
    """
    count = len(METRICS["q"])
    if count == 0:
        return [], 0, 0, 0

    rows = list(zip(
        range(1, count + 1),
        METRICS["q"],
        METRICS["lat"],
        METRICS["tok"],
        METRICS["g"],
        METRICS["r"],
        METRICS["c"],
        METRICS["t"],
    ))

    avgG = round(sum(METRICS["g"]) / len(METRICS["g"]), 2)
    avgL = round(sum(METRICS["lat"]) / len(METRICS["lat"]), 2)
    avgT = round(sum(METRICS["tok"]) / len(METRICS["tok"]), 2)
    return rows, avgG, avgL, avgT


# ------------------ UI ------------------ #

with gr.Blocks(title="Kubernetes RAG Assistant") as app:
    gr.Markdown("# ☸ Kubernetes RAG Assistant")

    with gr.Tab("Chat"):
        chat = gr.Chatbot(height=450)
        user_in = gr.Textbox(label="Ask about Kubernetes")
        clear = gr.Button("Clear")

        user_in.submit(answer_question, [user_in, chat], [chat, user_in])
        clear.click(lambda: ([], ""), None, [chat, user_in])

    with gr.Tab("Analytics"):
        gr.Markdown("### 📊 Query Analytics")
        table = gr.DataFrame(
            headers=[
                "ID",
                "Query",
                "Latency",
                "Tokens",
                "Groundedness",
                "Rerank Score",
                "Citations",
                "Type",
            ],
            interactive=False,
        )
        avgG = gr.Number(label="Avg Groundedness")
        avgL = gr.Number(label="Avg Latency")
        avgT = gr.Number(label="Avg Tokens")
        update = gr.Button("Refresh Dashboard")

        update.click(update_dashboard, None, [table, avgG, avgL, avgT])

app.launch()