Spaces:
Sleeping
Sleeping
| """ | |
| BOOK BUDDY — Ask questions about your PDFs | |
| This file is written with super-simple names and big comments, | |
| so a kid can read it and understand what's going on. | |
| How it works: | |
| 1) We read your PDF and cut it into small text pieces. | |
| 2) We make "numbers" (embeddings) for each piece so we can search fast. | |
| 3) When you ask a question, we find the best pieces and give them | |
| to a friendly robot model (Mistral) to make a short answer. | |
| 4) We also show which book files we used (sources). | |
| """ | |
| import os | |
| import numpy as np | |
| import gradio as gr | |
| from typing import List, Tuple | |
| from pypdf import PdfReader | |
| from sentence_transformers import SentenceTransformer | |
| from huggingface_hub import InferenceClient | |
# ====== SETTINGS YOU CAN CHANGE ======
# Which robot model writes the answers (set GEN_MODEL to pick another one).
ROBOT_MODEL = os.environ.get("GEN_MODEL", "mistralai/Mistral-7B-Instruct-v0.2")
# Secret key for Hugging Face; stays None when the variable is not set.
HF_TOKEN = os.environ.get("HUGGINGFACEHUB_API_TOKEN")
# Model that turns text into numbers (embeddings).
EMBEDDING_MODEL = "sentence-transformers/all-MiniLM-L6-v2"
PIECE_SIZE = 900     # how many characters go into each text piece
PIECE_OVERLAP = 150  # how many characters neighbouring pieces share
TOP_K = 4            # how many pieces we hand to the robot per question
# ====== TRY FAISS (fast search). IF NOT, USE SIMPLE NUMPY SEARCH ======
try:
    import faiss  # fast similarity search
except Exception:
    # Any problem while loading FAISS means we use the NumPy search instead.
    USE_FAISS = False
else:
    USE_FAISS = True
# ====== GLOBAL MEMORY (lives while the app is running) ======
# The two helpers that do the heavy work.
make_numbers = SentenceTransformer(EMBEDDING_MODEL)          # text -> vectors
client = InferenceClient(model=ROBOT_MODEL, token=HF_TOKEN)  # talks to the robot model

# Where the vectors live; which one gets filled depends on USE_FAISS.
faiss_index = None  # used if FAISS works
all_vectors = None  # used if FAISS doesn't work

# all_pieces[i] is a text piece and all_files[i] is the file it came from.
all_pieces: List[str] = []
all_files: List[str] = []
# The rules we give the robot before every question (one rule per line).
ROBOT_RULES = "\n".join(
    [
        "You are a helpful assistant. Use the given CONTEXT to answer the QUESTION.",
        "!!IMPORTANT!! - If the answer is not in the context, Strictly say 'I don't know.', Do not respond any other answers!!!",
        "Be short and add source filenames at the end like [source: file.pdf].",
    ]
)
| # ====== LITTLE HELPER FUNCTIONS ====== | |
def read_pdf_text(path: str) -> str:
    """Read every page of the PDF at `path` and glue the text together.

    A page that gives back no text counts as an empty string, and pages
    are joined with a newline between them.
    """
    page_texts = []
    for page in PdfReader(path).pages:
        page_texts.append(page.extract_text() or "")
    return "\n".join(page_texts)
def cut_into_pieces(big_text: str, size: int, overlap: int) -> List[str]:
    """Cut text into small overlapping pieces (like puzzle pieces).

    A window of `size` characters slides over the text, moving forward
    by `size - overlap` characters each time.

    Args:
        big_text: The whole text to cut up.
        size: The most characters one piece may have.
        overlap: How many characters a piece shares with the next one.
            Must be smaller than `size`.

    Returns:
        The pieces in reading order. Each piece has its outer whitespace
        stripped, and pieces that were only whitespace are left out.

    Raises:
        ValueError: If `overlap` is not smaller than `size`. The window
            would then never move forward and the cutting would never end.
    """
    step = size - overlap
    if step <= 0:
        raise ValueError(
            f"overlap ({overlap}) must be smaller than size ({size})"
        )
    windows = (
        big_text[start:start + size].strip()
        for start in range(0, len(big_text), step)
    )
    return [piece for piece in windows if piece]
def embed_texts(texts: List[str]) -> np.ndarray:
    """Make float32 number-vectors for the texts so we can search by meaning.

    The embedding model is asked to normalize its output, so comparing
    two vectors with an inner product works like cosine similarity.
    """
    vectors = make_numbers.encode(
        texts,
        convert_to_numpy=True,
        normalize_embeddings=True,
    )
    return np.asarray(vectors, dtype=np.float32)
def start_memory(dim: int):
    """Get a fresh, empty place ready for vectors of length `dim`.

    With FAISS we make a new inner-product index. Without FAISS we just
    forget whatever was stored before; `dim` is not needed then.
    """
    global faiss_index, all_vectors
    if not USE_FAISS:
        faiss_index = None
        all_vectors = None
        return
    # inner product = cosine because the vectors are normalized
    faiss_index = faiss.IndexFlatIP(dim)
def add_to_memory(vectors: np.ndarray):
    """Store new vectors, either in the FAISS index or in the NumPy stack."""
    global all_vectors
    if USE_FAISS:
        faiss_index.add(vectors)
        return
    if all_vectors is None:
        # first batch: nothing to stack onto yet
        all_vectors = vectors
    else:
        all_vectors = np.vstack([all_vectors, vectors])
def search_best_pieces(query_vector: np.ndarray, k: int) -> Tuple[np.ndarray, np.ndarray]:
    """Look up the k stored vectors that match the question best.

    Returns a pair (scores, indices). In the NumPy path both are given
    one leading row so they look like what the FAISS search returns.
    """
    if not USE_FAISS:
        # cosine/IP because the vectors are normalized
        similarity = all_vectors @ query_vector[0]
        best = np.argsort(-similarity)[:k]  # highest score first
        return similarity[best][None, :], best[None, :]
    return faiss_index.search(query_vector, k)  # returns (distances, indices)
| # ====== MAIN ACTIONS THE BUTTONS CALL ====== | |
def reset_everything():
    """Forget the index, the pieces and the file names, then say so."""
    global faiss_index, all_vectors, all_pieces, all_files
    faiss_index = all_vectors = None
    all_pieces, all_files = [], []
    return "Cleared! Upload PDFs again and click Build Index."
def build_memory_from_pdfs(files) -> str:
    """Read PDFs -> cut into pieces -> turn to numbers -> store them.

    Args:
        files: The uploaded files. Each item must have a `.name` that
            holds its path on disk. None or an empty list (nothing
            uploaded) is treated as "no files".

    Returns:
        A short status message to show on the page.
    """
    global all_pieces, all_files

    # 1) read + cut
    # Work on local lists first. If reading or embedding fails half-way,
    # the old pieces and the old vectors still match each other.
    new_pieces, new_files = [], []
    for f in files or []:
        pieces = cut_into_pieces(read_pdf_text(f.name), PIECE_SIZE, PIECE_OVERLAP)
        new_pieces.extend(pieces)
        new_files.extend([os.path.basename(f.name)] * len(pieces))

    if not new_pieces:
        # Nothing usable: forget the old pieces so questions ask for a new build.
        all_pieces, all_files = [], []
        return "No text found. Try another PDF."

    # 2) embeddings + memory
    vectors = embed_texts(new_pieces)
    start_memory(vectors.shape[1])
    add_to_memory(vectors)

    # 3) only now swap in the new pieces, so they always match the stored vectors
    all_pieces, all_files = new_pieces, new_files
    return f"Indexed {len(all_pieces)} pieces from {len(files)} file(s)."
def _ask_model(question: str, context_text: str) -> str:
    """Send the question plus context to the robot model and return its text."""
    messages = [
        {"role": "system", "content": ROBOT_RULES},
        {"role": "user", "content": f"QUESTION: {question}\n\nCONTEXT:\n{context_text}"},
    ]
    # Plan A: chat-completions (most models)
    try:
        resp = client.chat.completions.create(
            model=ROBOT_MODEL,
            messages=messages,
            max_tokens=512,
            temperature=0.2,
            top_p=0.95,
        )
        return resp.choices[0].message.content or ""
    except Exception:
        # Plan B: plain text generation (some endpoints)
        prompt = f"[INST] {ROBOT_RULES}\n\nQUESTION: {question}\n\nCONTEXT:\n{context_text}\n[/INST]"
        reply = client.text_generation(
            prompt,
            max_new_tokens=512,
            temperature=0.2,
            top_p=0.95,
            repetition_penalty=1.05,
            do_sample=True,
            return_full_text=False,
        )
        return reply or ""


def ask_robot(question: str) -> str:
    """Search the best pieces and ask the robot model to answer.

    Args:
        question: What the user typed. None, empty or only-spaces input
            gets a friendly reminder instead of a crash.

    Returns:
        The robot's answer followed by a "Sources:" line with the file
        names that were used, or a short help/error message.
    """
    if not question or not question.strip():
        return "Type a question in the box."
    index_missing = (faiss_index is None) if USE_FAISS else (all_vectors is None)
    if index_missing or not all_pieces:
        return "Upload PDFs and press **Build Index** first."

    # 1) find helpful pieces
    qv = embed_texts([question])
    _, idxs = search_best_pieces(qv, TOP_K)
    # Negative ids mean "no match" (FAISS pads short results with -1).
    # Ids past the end would point at pieces we do not have.
    ids = [i for i in idxs[0].tolist() if 0 <= i < len(all_pieces)]

    # 2) build the context we give to the robot
    context_blocks = []
    used_files = []
    for rank, i in enumerate(ids, start=1):
        fname = all_files[i]
        context_blocks.append(f"[{rank}] {fname}\n{all_pieces[i][:1000]}")
        used_files.append(fname)
    context_text = "\n\n---\n".join(context_blocks)

    # 3) talk to the robot on Hugging Face
    try:
        out = _ask_model(question, context_text)
    except Exception as err:
        # Both Plan A and Plan B failed: tell the user instead of crashing the page.
        return f"Sorry, the robot could not answer right now ({type(err).__name__}: {err})."

    # 4) add sources (the book files we used)
    unique_sources = ", ".join(sorted(set(used_files))) if used_files else "N/A"
    return f"{out.strip()}\n\nSources: {unique_sources}"
| # ====== THE SIMPLE WEB PAGE ====== | |
# Build the page: upload + buttons on the left, question + answer on the right.
# NOTE(review): the emoji and punctuation in these labels were mis-decoded in
# the source. The dash, the apostrophe, the brain and the arrow could be
# recovered from the leftover bytes; the book and reset emoji are best
# guesses - confirm against the original file.
with gr.Blocks(title="📚 Book Buddy — Ask your PDFs") as demo:
    gr.Markdown(
        "## 📚 Book Buddy\n"
        "1) Upload your PDF book. 2) Press **Build Index** (Book Buddy learns!). "
        "3) Ask your question. 4) Look at **Sources** to see which file was used.\n"
        "_Tip: start with one small PDF so it's fast._"
    )
    with gr.Row():
        # Left side: pick files, build or reset the index, read the status.
        with gr.Column(scale=1):
            pdfs = gr.File(file_count="multiple", file_types=[".pdf"], label="Upload PDF books")
            build_btn = gr.Button("🧠 Build Index", variant="primary")
            reset_btn = gr.Button("🔄 Reset")
            status = gr.Markdown()
        # Right side: type a question and read the answer.
        with gr.Column(scale=2):
            q = gr.Textbox(label="Ask a question", placeholder="Example: Give me 3 key points from this book.")
            examples = gr.Examples(
                examples=[
                    ["Summarize the main idea in 2 sentences."],
                    ["List 3 important facts from this book."],
                ],
                inputs=q,
            )
            ask_btn = gr.Button("➡️ Ask")
            answer = gr.Markdown()
    # Connect the buttons (and the Enter key in the question box) to the actions.
    build_btn.click(build_memory_from_pdfs, inputs=[pdfs], outputs=[status])
    reset_btn.click(fn=reset_everything, inputs=None, outputs=[status])
    ask_btn.click(ask_robot, inputs=[q], outputs=[answer])
    q.submit(ask_robot, inputs=[q], outputs=[answer])

# Start the web page only when this file is run directly.
if __name__ == "__main__":
    demo.launch()