Spaces:
Running
Running
| import json | |
| import os | |
| import re | |
| import shutil | |
| import tempfile | |
| import ast | |
| import base64 | |
| import asyncio | |
| from concurrent.futures import ThreadPoolExecutor | |
| from string import Template | |
| from datetime import datetime | |
| from pathlib import Path | |
| from types import SimpleNamespace | |
| from typing import Any | |
| import fitz # PyMuPDF | |
| import gradio as gr | |
| import pandas as pd | |
| import yaml | |
| from openai import AzureOpenAI | |
# Number of description / extracted-value input slots the UI helpers build updates for.
MAX_DESCRIPTION_FIELDS = 30
MAX_EXTRACT_FIELDS = 30
# Storage root; the APP_STORAGE_DIR environment variable overrides the default.
DEFAULT_STORAGE_ROOT = Path("/data/llm_fulltextscreener")  # Path("/tmp/llm_fulltextscreener")
APP_STORAGE_ROOT = Path(os.getenv("APP_STORAGE_DIR", str(DEFAULT_STORAGE_ROOT)))
# Each user gets a sub-directory below this folder.
USERS_ROOT_DIR = APP_STORAGE_ROOT / "users"
# Default number of newest "screened_*.xlsx" exports kept per user by the cleanup helper.
MAX_EXPORTED_FILES = 20
# Exports larger than this are not embedded as an inline (base64) download link.
MAX_INLINE_DOWNLOAD_BYTES = 8 * 1024 * 1024
# Screening decisions accepted from the model output.
VALID_DECISIONS = {"include", "exclude"}
# Prompt files live in a "prompts" folder next to this module.
PROMPTS_DIR = Path(__file__).resolve().parent / "prompts"
SYSTEM_PROMPT_PATH = PROMPTS_DIR / "system_prompt.txt"
USER_PROMPT_TEMPLATE_PATH = PROMPTS_DIR / "user_prompt_template.json"
SYSTEM_CRITERIA_PROMPT_PATH = PROMPTS_DIR / "system_criteria_prompt.txt"
USER_CRITERIA_TEMPLATE_PATH = PROMPTS_DIR / "user_criteria_template.json"
SYSTEM_LABELS_PROMPT_PATH = PROMPTS_DIR / "system_labels_prompt.txt"
USER_LABELS_TEMPLATE_PATH = PROMPTS_DIR / "user_labels_template.json"
def patch_asyncio_invalid_fd_cleanup() -> None:
    """
    Wrap ``asyncio.BaseEventLoop.__del__`` so one specific teardown error is ignored.

    Works around a Python 3.11 selector-loop teardown race seen on some
    runtimes (including Spaces), where loop ``__del__`` may raise:
    ValueError: Invalid file descriptor: -1

    The wrapper is marked with ``_invalid_fd_guard`` so repeated calls do not
    stack wrappers on top of each other.
    """
    current_del = getattr(asyncio.BaseEventLoop, "__del__", None)
    if current_del is None:
        return
    if getattr(current_del, "_invalid_fd_guard", False):
        # Already patched.
        return

    def _guarded_del(self):
        try:
            current_del(self)
        except ValueError as exc:
            # Only the known "Invalid file descriptor" failure is suppressed.
            if "Invalid file descriptor" in str(exc):
                return
            raise

    _guarded_del._invalid_fd_guard = True
    asyncio.BaseEventLoop.__del__ = _guarded_del


patch_asyncio_invalid_fd_cleanup()
def is_debug_enabled() -> bool:
    """Return True when the APP_DEBUG environment variable is set to a truthy flag."""
    truthy_flags = {"1", "true", "yes", "on"}
    flag = os.getenv("APP_DEBUG", "")
    return flag.strip().lower() in truthy_flags
def debug_log(*parts: Any) -> None:
    """Print ``parts`` prefixed with ``[DEBUG]``, but only when debug mode is enabled."""
    if not is_debug_enabled():
        return
    print("[DEBUG]", *parts)
def normalize_key(text: str) -> str:
    """Lower-case ``text`` and drop every character that is not ``a-z`` or ``0-9``."""
    lowered = str(text).strip().lower()
    return re.sub(r"[^a-z0-9]+", "", lowered)
def sanitize_user_id(raw: str) -> str:
    """
    Turn an arbitrary user identifier into a safe directory name.

    Runs of characters outside ``a-zA-Z0-9._-`` are replaced by ``_``.
    Empty results, and results made only of dots (``"."``, ``".."``, ...),
    fall back to ``"default"``: the value is joined onto the users root
    directory, and a dot-only name would resolve to that root or its parent.
    """
    cleaned = re.sub(r"[^a-zA-Z0-9._-]+", "_", str(raw or "").strip())
    if not cleaned or set(cleaned) == {"."}:
        return "default"
    return cleaned
def resolve_user_id(explicit_user_id: str | None = None, request: gr.Request | None = None) -> str:
    """
    Pick the user id for the current call.

    Precedence: a non-blank ``explicit_user_id``, then the ``username``
    attribute of ``request`` (when present and non-blank), then ``"default"``.
    The chosen value is passed through ``sanitize_user_id``.
    """
    if explicit_user_id:
        if str(explicit_user_id).strip():
            return sanitize_user_id(explicit_user_id)

    username = None
    if request is not None:
        username = getattr(request, "username", None)
    if username and str(username).strip():
        return sanitize_user_id(str(username))

    return "default"
def init_user_id(request: gr.Request | None = None) -> str:
    """Resolve the user id from ``request`` alone (no explicit id supplied)."""
    user_id = resolve_user_id(request=request)
    return user_id
def get_user_session_dir(user_id: str) -> Path:
    """Return the per-user directory under ``USERS_ROOT_DIR`` (id is sanitized first)."""
    safe_id = sanitize_user_id(user_id)
    return USERS_ROOT_DIR / safe_id
def get_user_session_meta_path(user_id: str) -> Path:
    """Return the path of the user's ``session.json`` metadata file."""
    session_dir = get_user_session_dir(user_id)
    return session_dir / "session.json"
def get_user_session_files_dir(user_id: str) -> Path:
    """Return the user's ``files`` directory (uploaded and working files)."""
    session_dir = get_user_session_dir(user_id)
    return session_dir / "files"
def get_user_exports_dir(user_id: str) -> Path:
    """Return the user's ``exports`` directory."""
    session_dir = get_user_session_dir(user_id)
    return session_dir / "exports"
def _ensure_session_dirs(user_id: str) -> None:
    """Create the user's ``files`` and ``exports`` directories if they do not exist."""
    for locate_dir in (get_user_session_files_dir, get_user_exports_dir):
        locate_dir(user_id).mkdir(parents=True, exist_ok=True)
def _setup_storage_paths() -> None:
    """Configure writable temp paths in Spaces.

    Creates the users root and the ``default`` user's files directory, then
    points both the ``TMPDIR`` environment variable and ``tempfile.tempdir``
    at that directory.
    """
    USERS_ROOT_DIR.mkdir(parents=True, exist_ok=True)
    default_files_dir = get_user_session_files_dir("default")
    default_files_dir.mkdir(parents=True, exist_ok=True)
    resolved_tmp = str(default_files_dir.resolve())
    os.environ["TMPDIR"] = resolved_tmp
    tempfile.tempdir = resolved_tmp
def load_session_meta(user_id: str) -> dict[str, Any]:
    """
    Load the user's ``session.json`` metadata.

    Returns an empty dict when the file is missing, unreadable, not valid
    JSON, or valid JSON that is not an object. The last case matters because
    callers treat the result as a dict (``.get`` / ``.update``).
    """
    session_meta_path = get_user_session_meta_path(user_id)
    try:
        if not session_meta_path.exists():
            return {}
        payload = json.loads(session_meta_path.read_text(encoding="utf-8"))
    except Exception:
        # Deliberately best-effort: a broken metadata file must not break the app.
        return {}
    return payload if isinstance(payload, dict) else {}
def save_session_meta(user_id: str, updates: dict[str, Any]) -> None:
    """Merge ``updates`` into the user's stored session metadata and write it back as JSON."""
    _ensure_session_dirs(user_id)
    meta_file = get_user_session_meta_path(user_id)
    merged = load_session_meta(user_id)
    merged.update(updates)
    serialized = json.dumps(merged, indent=2)
    meta_file.write_text(serialized, encoding="utf-8")
| def persist_uploaded_file(user_id: str, file_obj, dest_name: str) -> str | None: | |
| if file_obj is None: | |
| return None | |
| src = resolve_uploaded_path(file_obj) | |
| if not src.exists() or not src.is_file(): | |
| return None | |
| _ensure_session_dirs(user_id) | |
| dest = get_user_session_files_dir(user_id) / dest_name | |
| try: | |
| if src.resolve() == dest.resolve(): | |
| return str(dest.resolve()) | |
| except Exception: | |
| pass | |
| shutil.copy2(src, dest) | |
| return str(dest.resolve()) | |
def resolve_uploaded_path(file_obj) -> Path:
    """
    Extract a filesystem path from an upload value.

    Accepts a string / ``Path``, an object with a non-empty ``name``
    attribute, or a dict with a non-blank ``"name"`` entry. Anything else
    (including ``None``) yields ``Path("")``.
    """
    no_path = Path("")
    if file_obj is None:
        return no_path
    if isinstance(file_obj, (str, Path)):
        return Path(file_obj)

    attr_name = getattr(file_obj, "name", "")
    if attr_name:
        return Path(attr_name)

    if isinstance(file_obj, dict):
        dict_name = str(file_obj.get("name", "")).strip()
        if dict_name:
            return Path(dict_name)

    return no_path
def persist_dataframe(user_id: str, df: pd.DataFrame) -> str:
    """Write ``df`` to ``working_table.xlsx`` in the user's files directory and return its resolved path."""
    _ensure_session_dirs(user_id)
    working_copy = get_user_session_files_dir(user_id) / "working_table.xlsx"
    df.to_excel(working_copy, index=False)
    return str(working_copy.resolve())
def _cleanup_old_exports(user_id: str, max_files: int = MAX_EXPORTED_FILES) -> None:
    """
    Delete all but the ``max_files`` most recently modified ``screened_*.xlsx`` exports.

    Best-effort: any failure while listing or deleting is ignored.
    """
    try:
        exports = sorted(
            (p for p in get_user_exports_dir(user_id).glob("screened_*.xlsx") if p.is_file()),
            key=lambda p: p.stat().st_mtime,
            reverse=True,  # newest first, so the tail holds the stale files
        )
        for stale in exports[max_files:]:
            try:
                stale.unlink()
            except Exception:
                continue
    except Exception:
        return
def persist_downloadable_dataframe(user_id: str, df: pd.DataFrame) -> str | None:
    """
    Write ``df`` to a timestamped ``screened_*.xlsx`` file in the user's exports directory.

    Returns the resolved path of the export, or ``None`` when writing failed
    or produced a missing / empty file. After a successful write, older
    exports are pruned via ``_cleanup_old_exports``.
    """
    # Local import: the module only imports ``datetime`` from the datetime package.
    from datetime import timezone

    _ensure_session_dirs(user_id)
    # ``datetime.utcnow()`` is deprecated; an aware UTC "now" yields the same stamp.
    stamp = datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S_%f")
    filename = f"screened_{stamp}.xlsx"
    export_path = get_user_exports_dir(user_id) / filename
    try:
        df.to_excel(export_path, index=False)
        if not export_path.exists() or export_path.stat().st_size == 0:
            return None
        if is_debug_enabled():
            print(
                f"[DEBUG] Export ready: path={export_path.resolve()} size={export_path.stat().st_size} bytes"
            )
        _cleanup_old_exports(user_id)
        return str(export_path.resolve())
    except Exception as exc:
        # Still best-effort, but leave a trace when debugging is switched on.
        debug_log("Export failed:", exc)
        return None
| def build_inline_download_html(path: str | None) -> str: | |
| if not path: | |
| return "<p>Download unavailable.</p>" | |
| candidate = Path(path) | |
| if not candidate.exists() or not candidate.is_file(): | |
| return "<p>Download unavailable: exported file not found.</p>" | |
| try: | |
| raw = candidate.read_bytes() | |
| except Exception: | |
| return "<p>Download unavailable: could not read exported file.</p>" | |
| if len(raw) == 0: | |
| return "<p>Download unavailable: exported file is empty.</p>" | |
| if len(raw) > MAX_INLINE_DOWNLOAD_BYTES: | |
| size_mb = len(raw) / (1024 * 1024) | |
| return ( | |
| f"<p>Inline download disabled for large files ({size_mb:.1f} MB). " | |
| "Reduce export size and try again.</p>" | |
| ) | |
| b64 = base64.b64encode(raw).decode("ascii") | |
| filename = candidate.name | |
| href = ( | |
| "data:application/vnd.openxmlformats-officedocument.spreadsheetml.sheet;base64," | |
| f"{b64}" | |
| ) | |
| return ( | |
| "<p><strong>Download:</strong></p>" | |
| f'<a download="{filename}" href="{href}" ' | |
| 'style="display:inline-block;padding:8px 12px;border:1px solid #888;' | |
| 'border-radius:6px;text-decoration:none;">Download Excel</a>' | |
| ) | |
def empty_description_updates() -> list[dict[str, Any]]:
    """Return ``MAX_DESCRIPTION_FIELDS`` updates that blank and hide every description input."""
    hidden_updates: list[dict[str, Any]] = []
    for slot in range(MAX_DESCRIPTION_FIELDS):
        hidden_updates.append(
            gr.update(label=f"Description {slot + 1}", value="", visible=False)
        )
    return hidden_updates
def empty_extracted_state(status: str, *, extracted_state: dict[str, Any] | None = None):
    """
    Build the output tuple that resets the extraction widgets.

    The tuple starts with ``extracted_state`` (or ``{}``) and an empty list,
    continues with the hidden extracted-input updates and the default
    values ("", 0.0, "include", four empty strings), and ends with ``status``.
    """
    state = extracted_state or {}
    hidden_inputs = build_empty_extracted_input_updates()
    defaults = ("", 0.0, "include", "", "", "", "")
    return (state, [], *hidden_inputs, *defaults, status)
def is_missing(value: Any) -> bool:
    """Return True for NA-like values (per ``pd.isna``) and for blank / whitespace-only strings."""
    if pd.isna(value):
        return True
    return isinstance(value, str) and value.strip() == ""
def parse_csv_columns(raw_text: str, available_columns: list[str]) -> list[str]:
    """Split ``raw_text`` on commas and keep, in order, the trimmed names found in ``available_columns``."""
    if not raw_text or not raw_text.strip():
        return []
    selected: list[str] = []
    for piece in raw_text.split(","):
        name = piece.strip()
        if name and name in available_columns:
            selected.append(name)
    return selected
| def choose_url_column(df: pd.DataFrame, preferred: str | None = None) -> str: | |
| if preferred and preferred in df.columns: | |
| return preferred | |
| for col in df.columns: | |
| col_l = str(col).lower() | |
| if "url" in col_l or "link" in col_l: | |
| return col | |
| return str(df.columns[0]) | |
| def parse_criteria_file(file_obj) -> dict[str, Any] | None: | |
| if file_obj is None: | |
| return None | |
| path = resolve_uploaded_path(file_obj) | |
| if str(path).strip() == "": | |
| return None | |
| if not path.exists() or not path.is_file(): | |
| raise ValueError("Criteria file not found.") | |
| try: | |
| raw = path.read_text(encoding="utf-8") | |
| except Exception as exc: | |
| raise ValueError(f"Failed reading criteria file: {exc}") from exc | |
| try: | |
| parsed = yaml.safe_load(raw) | |
| except Exception as exc: | |
| raise ValueError(f"Invalid YAML in criteria file: {exc}") from exc | |
| if not isinstance(parsed, dict): | |
| raise ValueError("criteria.yml must contain a top-level mapping/object.") | |
| topic = str(parsed.get("topic", "")).strip() | |
| inclusion = parsed.get("inclusion_criteria", []) | |
| exclusion = parsed.get("exclusion_criteria", []) | |
| notes = str(parsed.get("notes", "")).strip() | |
| if not topic: | |
| raise ValueError("criteria.yml requires a non-empty 'topic'.") | |
| if not isinstance(inclusion, list): | |
| raise ValueError("'inclusion_criteria' must be a list of strings.") | |
| if not isinstance(exclusion, list): | |
| raise ValueError("'exclusion_criteria' must be a list of strings.") | |
| inclusion_clean = [str(item).strip() for item in inclusion if str(item).strip()] | |
| exclusion_clean = [str(item).strip() for item in exclusion if str(item).strip()] | |
| return { | |
| "topic": topic, | |
| "inclusion_criteria": inclusion_clean, | |
| "exclusion_criteria": exclusion_clean, | |
| "notes": notes, | |
| } | |
def parse_labels_csv(raw: Any) -> list[str]:
    """Split a comma-separated label string into trimmed, de-duplicated labels (first occurrence wins)."""
    if raw is None or pd.isna(raw):
        return []
    text = str(raw).strip()
    if not text:
        return []
    ordered: dict[str, None] = {}
    for piece in text.split(","):
        label = piece.strip()
        if label:
            ordered.setdefault(label, None)
    return list(ordered)
def build_default_descriptions(columns: list[str]) -> dict[str, str]:
    """Map every column name to a generic extraction instruction mentioning that column."""
    descriptions: dict[str, str] = {}
    for col in columns:
        descriptions[col] = f"Extract the value for '{col}' from the article text."
    return descriptions
def build_description_values_from_inputs(
    description_columns: list[str],
    description_values: list[str],
    target_columns: list[str],
) -> dict[str, str]:
    """
    Combine default descriptions with the descriptions currently entered in the UI.

    Starts from the generated default for each target column and overrides it
    with the non-blank value entered for the same column. Entries for columns
    outside ``target_columns`` are ignored.
    """
    resolved = build_default_descriptions(target_columns)
    if not isinstance(description_columns, list):
        return resolved
    # zip stops at the shorter sequence, i.e. columns without a value are skipped.
    for col, raw_desc in zip(description_columns, description_values):
        name = str(col).strip()
        text = str(raw_desc).strip()
        if text and name in resolved:
            resolved[name] = text
    return resolved
def build_description_input_updates(
    target_columns: list[str],
    previous_description_columns: list[str],
    previous_description_values: list[str],
) -> tuple[list[str], list[dict[str, Any]]]:
    """
    Build the updates for the fixed pool of description inputs.

    Returns the target columns that get an input (at most
    ``MAX_DESCRIPTION_FIELDS``) and exactly ``MAX_DESCRIPTION_FIELDS``
    updates: visible, labelled ones for those columns, then hidden blanks.
    """
    known_descriptions = build_description_values_from_inputs(
        previous_description_columns,
        previous_description_values,
        target_columns,
    )
    shown_columns = target_columns[:MAX_DESCRIPTION_FIELDS]
    updates: list[dict[str, Any]] = [
        gr.update(
            label=f"Description: {col}",
            value=known_descriptions.get(col, ""),
            visible=True,
        )
        for col in shown_columns
    ]
    # Pad with hidden placeholders so every input slot receives an update.
    updates.extend(
        gr.update(label=f"Description {slot + 1}", value="", visible=False)
        for slot in range(len(shown_columns), MAX_DESCRIPTION_FIELDS)
    )
    return shown_columns, updates
def build_empty_extracted_input_updates() -> list[dict[str, Any]]:
    """Return ``MAX_EXTRACT_FIELDS`` updates that blank and hide every extracted-value input."""
    hidden_updates: list[dict[str, Any]] = []
    for slot in range(MAX_EXTRACT_FIELDS):
        hidden_updates.append(
            gr.update(label=f"Extracted field {slot + 1}", value="", visible=False)
        )
    return hidden_updates
def build_extracted_input_updates(
    target_columns: list[str],
    field_values: dict[str, Any],
) -> tuple[list[str], list[dict[str, Any]]]:
    """
    Build the updates for the fixed pool of extracted-value inputs.

    Returns the target columns that get an input (at most
    ``MAX_EXTRACT_FIELDS``) and exactly ``MAX_EXTRACT_FIELDS`` updates:
    visible ones pre-filled from ``field_values``, then hidden blanks.
    """
    shown_columns = target_columns[:MAX_EXTRACT_FIELDS]
    updates: list[dict[str, Any]] = [
        gr.update(
            label=f"Extracted: {col}",
            value=str(field_values.get(col, "")),
            visible=True,
        )
        for col in shown_columns
    ]
    # Pad with hidden placeholders so every input slot receives an update.
    updates.extend(
        gr.update(label=f"Extracted field {slot + 1}", value="", visible=False)
        for slot in range(len(shown_columns), MAX_EXTRACT_FIELDS)
    )
    return shown_columns, updates
def build_extracted_values_from_inputs(
    extracted_columns: list[str],
    extracted_values: list[str],
) -> dict[str, str]:
    """
    Pair extracted-column names with the values currently in the UI inputs.

    Columns without a corresponding value are left out; ``None`` values
    become empty strings. A non-list ``extracted_columns`` yields ``{}``.
    """
    if not isinstance(extracted_columns, list):
        return {}
    return {
        str(col): ("" if value is None else str(value))
        for col, value in zip(extracted_columns, extracted_values)
    }
def coerce_fields_from_llm(parsed: dict[str, Any], column_names: list[str]) -> dict[str, str]:
    """
    Extract the requested columns from a parsed model response.

    Lookup order:
      1. ``parsed["fields"]`` as a dict, or as a JSON string that decodes to a dict;
      2. requested columns found directly at the top level of ``parsed``;
      3. top-level keys that equal a requested column after ``normalize_key``.

    Returns a dict with exactly ``column_names`` as keys and string values.
    Columns that are absent, or whose value is ``None`` (JSON null), map to
    ``""`` rather than to the literal text "None".
    """
    raw_fields = parsed.get("fields", {})
    fields_dict: dict[str, Any] = {}
    if isinstance(raw_fields, dict):
        fields_dict = raw_fields
    elif isinstance(raw_fields, str):
        try:
            maybe_obj = json.loads(raw_fields)
        except json.JSONDecodeError:
            maybe_obj = None
        if isinstance(maybe_obj, dict):
            fields_dict = maybe_obj

    # Fallback: model may place extracted values at top level.
    if not fields_dict:
        fields_dict = {col: parsed[col] for col in column_names if col in parsed}

    # Fuzzy fallback: tolerate minor key format differences.
    if not fields_dict:
        normalized_requested = {normalize_key(col): col for col in column_names}
        for key, value in parsed.items():
            if key in {"fields", "evidence", "confidence", "decision"}:
                continue
            norm = normalize_key(str(key))
            if norm in normalized_requested:
                fields_dict[normalized_requested[norm]] = value

    result: dict[str, str] = {}
    for col in column_names:
        value = fields_dict.get(col)
        result[col] = "" if value is None else str(value)
    return result
def _parse_structured_text(raw: str) -> Any:
    """
    Parse ``raw`` as JSON, falling back to a Python literal.

    Returns the parsed value, or ``None`` when the text is blank or neither
    parser accepts it.
    """
    candidate = raw.strip()
    if not candidate:
        return None
    # JSON first; ``ast.literal_eval`` covers Python-style literals such as single-quoted dicts.
    for parser in (json.loads, ast.literal_eval):
        try:
            return parser(candidate)
        except Exception:
            continue
    return None
def _coerce_evidence_items(raw: Any) -> list[dict[str, str]]:
    """
    Normalize an evidence payload into ``{"field": ..., "snippet": ...}`` items.

    Accepted shapes (a string is first parsed with ``_parse_structured_text``):
      * dict: ``{"FIELD": "..."}`` or ``{"FIELD": ["...", "..."]}``;
      * list of dicts carrying non-empty ``field`` and ``snippet`` entries.
    Anything else, including bare strings inside a list, contributes nothing.
    """
    if raw is None:
        return []
    if isinstance(raw, str):
        raw = _parse_structured_text(raw)
        if raw is None:
            return []

    collected: list[dict[str, str]] = []
    if isinstance(raw, dict):
        for field, payload in raw.items():
            candidates = payload if isinstance(payload, list) else [payload]
            for candidate in candidates:
                snippet = str(candidate).strip()
                if snippet:
                    collected.append({"field": str(field).strip(), "snippet": snippet})
        return collected

    if isinstance(raw, list):
        for entry in raw:
            # Entries without an explicit field (e.g. plain strings) are ignored here.
            if not isinstance(entry, dict):
                continue
            field = str(entry.get("field", "")).strip()
            snippet = str(entry.get("snippet", "")).strip()
            if field and snippet:
                collected.append({"field": field, "snippet": snippet})
    return collected
def normalize_evidence_snippets(parsed: dict[str, Any], column_names: list[str], fields: dict[str, str]) -> list[dict[str, str]]:
    """
    Collect per-field evidence snippets from a parsed model response.

    Sources, tried in order until one yields something:
      1. ``parsed["evidence_snippets"]``;
      2. a structured ``parsed["evidence"]`` payload;
      3. ``parsed["evidence"]`` as one plain string, attributed to the first
         requested column with a non-blank extracted value (else the first
         requested column, else ``"unknown"``).
    Only snippets for requested columns are kept in steps 1 and 2, and exact
    (field, snippet) repeats are dropped while preserving order.
    """

    def _requested_items(payload: Any) -> list[dict[str, str]]:
        return [
            {"field": item["field"], "snippet": item["snippet"]}
            for item in _coerce_evidence_items(payload)
            if item["field"] in column_names and item["snippet"]
        ]

    # Primary source
    collected = _requested_items(parsed.get("evidence_snippets", []))

    # Secondary source: legacy or malformed `evidence` payload
    if not collected:
        collected = _requested_items(parsed.get("evidence", ""))

    # Fallback for legacy single-string evidence.
    if not collected:
        legacy_text = str(parsed.get("evidence", "")).strip()
        if legacy_text:
            filled = [col for col in column_names if str(fields.get(col, "")).strip()]
            if filled:
                owner = filled[0]
            elif column_names:
                owner = column_names[0]
            else:
                owner = "unknown"
            collected.append({"field": owner, "snippet": legacy_text})

    # De-duplicate exact repeats while preserving order.
    unique: dict[tuple[str, str], dict[str, str]] = {}
    for item in collected:
        unique.setdefault((item["field"], item["snippet"]), item)
    return list(unique.values())
def format_evidence_for_ui(snippets: list[dict[str, str]]) -> str:
    """Render evidence items as one ``- field: snippet`` line each; empty input gives ``""``."""
    if not snippets:
        return ""
    lines = []
    for item in snippets:
        lines.append(f"- {item['field']}: {item['snippet']}")
    return "\n".join(lines)
def detect_incomplete_rows(df: pd.DataFrame, target_columns: list[str]) -> list[int]:
    """Return the index labels (as ints) of rows with at least one missing target column."""
    incomplete: list[int] = []
    for idx, row in df.iterrows():
        for col in target_columns:
            if is_missing(row.get(col)):
                incomplete.append(int(idx))
                break  # one missing value is enough to flag the row
    return incomplete
def get_missing_columns(df: pd.DataFrame, row_index: int, target_columns: list[str]) -> list[str]:
    """List the target columns whose value is missing in the row labelled ``row_index``."""
    row = df.loc[row_index]
    missing: list[str] = []
    for col in target_columns:
        if is_missing(row.get(col)):
            missing.append(col)
    return missing
| def get_next_row(df: pd.DataFrame, incomplete_rows: list[int], position: int, target_columns: list[str]) -> tuple[int, int | None]: | |
| while position < len(incomplete_rows): | |
| row_idx = incomplete_rows[position] | |
| if len(get_missing_columns(df, row_idx, target_columns)) > 0: | |
| return position, row_idx | |
| position += 1 | |
| return position, None | |
| def _find_first_column(df: pd.DataFrame, candidates: list[str]) -> str | None: | |
| normalized = {str(col).lower().strip(): str(col) for col in df.columns} | |
| for candidate in candidates: | |
| if candidate in normalized: | |
| return normalized[candidate] | |
| for col in df.columns: | |
| col_l = str(col).lower() | |
| for candidate in candidates: | |
| if candidate in col_l: | |
| return str(col) | |
| return None | |
def article_details_markdown(df: pd.DataFrame, row_index: int) -> str:
    """
    Build the Markdown header (title and authors) for one row.

    The title and author columns are looked up by name with
    ``_find_first_column``; missing columns or NA / blank cells are shown
    as "Unknown".
    """
    title_col = _find_first_column(df, ["title", "article title", "paper title", "study title"])
    author_col = _find_first_column(df, ["author", "authors", "first author"])

    def _cell_text(column: str | None) -> str:
        if column is None:
            return ""
        raw = df.loc[row_index, column]
        return "" if pd.isna(raw) else str(raw).strip()

    title_value = _cell_text(title_col) or "Unknown"
    author_value = _cell_text(author_col) or "Unknown"
    return f"**Title:** {title_value}\n\n**Author(s):** {author_value}"
| def render_current_row( | |
| df: pd.DataFrame | None, | |
| incomplete_rows: list[int] | None, | |
| position: int, | |
| url_column: str, | |
| target_columns: list[str], | |
| ) -> tuple[int, int | None, str, str, str, str, str]: | |
| if df is None or incomplete_rows is None or len(incomplete_rows) == 0: | |
| return ( | |
| position, | |
| None, | |
| "", | |
| "No rows loaded.", | |
| "", | |
| "", | |
| "", | |
| ) | |
| next_position, row_idx = get_next_row(df, incomplete_rows, position, target_columns) | |
| if row_idx is None: | |
| return ( | |
| next_position, | |
| None, | |
| "", | |
| "All target rows are complete.", | |
| "", | |
| "", | |
| f"Processed {len(incomplete_rows)} / {len(incomplete_rows)} rows.", | |
| ) | |
| article_md = article_details_markdown(df, row_idx) | |
| url_value = str(df.loc[row_idx, url_column]) if url_column in df.columns else "" | |
| url_md = f"[Open article URL]({url_value})" if url_value else "URL not available" | |
| missing_md = "" | |
| current_md = "" | |
| counter = f"Row {next_position + 1} of {len(incomplete_rows)} (index: {row_idx})" | |
| return next_position, row_idx, article_md, url_md, current_md, missing_md, counter | |
| def _parse_target_columns_for_ui( | |
| target_columns_text: str, | |
| url_column_text: str, | |
| df: pd.DataFrame | None, | |
| ) -> list[str]: | |
| raw_requested = [item.strip() for item in (target_columns_text or "").split(",") if item.strip()] | |
| deduped_requested = list(dict.fromkeys(raw_requested)) | |
| if df is None or df.empty: | |
| return deduped_requested | |
| available_columns = [str(c) for c in df.columns] | |
| url_column = choose_url_column(df, url_column_text.strip() if url_column_text else None) | |
| selected_target_columns = parse_csv_columns(target_columns_text, available_columns) | |
| if not selected_target_columns: | |
| selected_target_columns = [str(c) for c in df.columns if str(c) != url_column] | |
| return selected_target_columns | |
def refresh_description_inputs(
    target_columns_text: str,
    url_column_text: str,
    df: pd.DataFrame | None,
    description_columns: list[str],
    *description_values: str,
):
    """
    Rebuild the description inputs after the target-column settings changed.

    Returns a tuple: the list of columns that now own a description input,
    followed by one update per description input slot.
    """
    prior_columns = description_columns if isinstance(description_columns, list) else []
    targets = _parse_target_columns_for_ui(target_columns_text, url_column_text, df)
    shown_columns, field_updates = build_description_input_updates(
        targets,
        prior_columns,
        list(description_values),
    )
    return (shown_columns, *field_updates)
def load_excel(
    file_obj,
    criteria_file_obj,
    user_id_input: str,
    target_columns_text: str,
    url_column_text: str,
    description_columns: list[str],
    *description_values: str,
    request: gr.Request | None = None,
):
    """
    Load the uploaded Excel table and initialise the screening session.

    Reads the workbook, determines the URL and target columns, finds rows
    with missing target values, persists the upload / working copy / export
    for the resolved user, stores the session metadata, and returns the
    tuple of values that populate the UI. On any input problem the same
    tuple shape is returned with reset values and an explanatory status
    message.
    """
    user_id = resolve_user_id(explicit_user_id=user_id_input, request=request)

    def _failure(message: str):
        # Same shape as the success tuple below, with everything reset.
        download_html = build_inline_download_html(None)
        return (
            None,
            [],
            0,
            None,
            [],
            "",
            {},
            [],
            [],
            message,
            "",
            "",
            "",
            "",
            "",
            *empty_description_updates(),
            *build_empty_extracted_input_updates(),
            "",
            0.0,
            "include",
            "",
            "",
            "",
            "",
            download_html,
            gr.update(value=None),
        )

    if file_obj is None:
        return _failure("Please upload an Excel file.")
    try:
        excel_path = resolve_uploaded_path(file_obj)
        # ``Path("")`` normalizes to ``Path(".")`` (string form "."), so the
        # "no path" case must be detected by comparing paths, not strings.
        if excel_path == Path(""):
            return _failure("Please upload an Excel file.")
        df = pd.read_excel(str(excel_path))
    except Exception as exc:
        return _failure(f"Failed to read Excel: {exc}")
    if df.empty:
        return _failure("Excel file is empty.")

    url_column = choose_url_column(df, url_column_text.strip() if url_column_text else None)
    selected_target_columns = _parse_target_columns_for_ui(target_columns_text, url_column_text, df)
    incomplete_rows = detect_incomplete_rows(df, selected_target_columns)
    normalized_description_columns, description_updates = build_description_input_updates(
        selected_target_columns,
        description_columns if isinstance(description_columns, list) else [],
        list(description_values),
    )
    extracted_columns_for_ui, extracted_updates = build_extracted_input_updates(selected_target_columns, {})
    position, row_idx, article_md, url_md, current_md, missing_md, counter = render_current_row(
        df,
        incomplete_rows,
        0,
        url_column,
        selected_target_columns,
    )

    status = (
        f"Loaded {len(df)} rows. Found {len(incomplete_rows)} rows with missing target values."
        if len(incomplete_rows) > 0
        else "Loaded file, but no incomplete rows were found for the selected target columns."
    )
    if len(selected_target_columns) > MAX_DESCRIPTION_FIELDS:
        status += (
            f" Showing description inputs for the first {MAX_DESCRIPTION_FIELDS} target columns."
        )
    if len(selected_target_columns) > MAX_EXTRACT_FIELDS:
        status += (
            f" Showing extracted inputs for the first {MAX_EXTRACT_FIELDS} target columns."
        )

    description_map = build_description_values_from_inputs(
        description_columns,
        list(description_values),
        selected_target_columns,
    )
    saved_description_values = [description_map.get(col, "") for col in normalized_description_columns]

    saved_excel_path = persist_uploaded_file(user_id, file_obj, "uploaded_excel.xlsx")
    working_df_path = persist_dataframe(user_id, df)
    downloadable_path = persist_downloadable_dataframe(user_id, df)
    download_html = build_inline_download_html(downloadable_path)
    if not downloadable_path:
        status += " Download export is currently unavailable; try again after processing a row."

    save_session_meta(
        user_id,
        {
            "target_columns_text": target_columns_text or "",
            "url_column_text": url_column_text or "",
            "description_columns": normalized_description_columns,
            "description_values": saved_description_values,
            "extracted_columns": extracted_columns_for_ui,
            "extracted_values": ["" for _ in extracted_columns_for_ui],
            "evidence": "",
            "confidence": 0.0,
            "decision": "include",
            "criteria_rationale": "",
            "labels_current": "",
            "labels_suggested": "",
            "labels_rationale": "",
            "excel_path": saved_excel_path or "",
            # Keep the previously stored criteria file when none is uploaded this time.
            "criteria_path": persist_uploaded_file(user_id, criteria_file_obj, "criteria.yml")
            or load_session_meta(user_id).get("criteria_path", ""),
            "df_path": working_df_path,
            "download_path": downloadable_path or "",
        }
    )

    return (
        df,
        incomplete_rows,
        position,
        row_idx,
        selected_target_columns,
        url_column,
        {},
        normalized_description_columns,
        extracted_columns_for_ui,
        status,
        article_md,
        url_md,
        current_md,
        missing_md,
        counter,
        *description_updates,
        *extracted_updates,
        "",
        0.0,
        "include",
        "",
        "",
        "",
        "",
        download_html,
        gr.update(value=None),
    )
def parse_pdf(file_obj) -> str:
    """
    Extract the plain text of an uploaded PDF with PyMuPDF.

    Page texts are joined with newlines and the result is stripped.

    Raises:
        ValueError: nothing was uploaded, the file cannot be opened as a
            PDF, or no text could be extracted (there is no OCR fallback).
    """
    if file_obj is None:
        raise ValueError("Please upload a PDF file.")
    path = resolve_uploaded_path(file_obj)
    # ``Path("")`` normalizes to ``Path(".")`` (string form "."), so the
    # "no path" case must be detected by comparing paths, not strings.
    if path == Path(""):
        raise ValueError("Please upload a PDF file.")
    try:
        with fitz.open(str(path)) as doc:
            text_chunks = [page.get_text("text") for page in doc]
    except Exception as exc:
        raise ValueError(f"Invalid or unreadable PDF: {exc}") from exc
    text = "\n".join(text_chunks).strip()
    if not text:
        raise ValueError("No text extracted from PDF. OCR fallback is not implemented in this MVP.")
    return text
def load_prompt_file(path: Path) -> str:
    """
    Read a UTF-8 prompt file and return its content with surrounding whitespace removed.

    Raises:
        RuntimeError: the file does not exist or cannot be read.
    """
    try:
        content = path.read_text(encoding="utf-8")
    except FileNotFoundError as exc:
        raise RuntimeError(f"Prompt file not found: {path}") from exc
    except Exception as exc:
        raise RuntimeError(f"Failed to load prompt file {path}: {exc}") from exc
    return content.strip()
def build_user_prompt(text: str, column_names: list[str], column_descriptions: dict[str, str]) -> dict[str, Any]:
    """
    Render the extraction user-prompt template into a dict.

    The template file is filled with JSON-encoded values (field schema,
    requested field names, per-column descriptions, article text) and the
    rendered text is parsed as JSON.

    Raises:
        RuntimeError: the template cannot be loaded or renders invalid JSON.
    """
    placeholders = {
        "fields_schema_json": json.dumps({col: "string" for col in column_names}),
        "fill_only_requested_fields_json": json.dumps(column_names),
        "column_descriptions_json": json.dumps(
            {col: column_descriptions.get(col, "") for col in column_names}
        ),
        "article_text": json.dumps(text),
    }
    template = Template(load_prompt_file(USER_PROMPT_TEMPLATE_PATH))
    rendered = template.substitute(placeholders)
    try:
        return json.loads(rendered)
    except json.JSONDecodeError as exc:
        raise RuntimeError(f"User prompt template rendered invalid JSON: {exc}") from exc
def build_criteria_user_prompt(text: str, criteria: dict[str, Any]) -> dict[str, Any]:
    """
    Render the criteria user-prompt template into a dict.

    Topic, inclusion / exclusion criteria, notes and the article text are
    inserted JSON-encoded; the rendered text is parsed as JSON.

    Raises:
        RuntimeError: the template cannot be loaded or renders invalid JSON.
    """
    placeholders = {
        "topic_json": json.dumps(criteria.get("topic", "")),
        "inclusion_criteria_json": json.dumps(criteria.get("inclusion_criteria", [])),
        "exclusion_criteria_json": json.dumps(criteria.get("exclusion_criteria", [])),
        "notes_json": json.dumps(criteria.get("notes", "")),
        "article_text": json.dumps(text),
    }
    template = Template(load_prompt_file(USER_CRITERIA_TEMPLATE_PATH))
    rendered = template.substitute(placeholders)
    try:
        return json.loads(rendered)
    except json.JSONDecodeError as exc:
        raise RuntimeError(f"Criteria user prompt rendered invalid JSON: {exc}") from exc
def build_labels_user_prompt(text: str, current_labels: list[str]) -> dict[str, Any]:
    """
    Render the labels user-prompt template into a dict.

    The current labels and the article text are inserted JSON-encoded; the
    rendered text is parsed as JSON.

    Raises:
        RuntimeError: the template cannot be loaded or renders invalid JSON.
    """
    placeholders = {
        "current_labels_json": json.dumps(current_labels),
        "article_text": json.dumps(text),
    }
    template = Template(load_prompt_file(USER_LABELS_TEMPLATE_PATH))
    rendered = template.substitute(placeholders)
    try:
        return json.loads(rendered)
    except json.JSONDecodeError as exc:
        raise RuntimeError(f"Labels user prompt rendered invalid JSON: {exc}") from exc
def _azure_client() -> AzureOpenAI:
    """
    Create an Azure OpenAI client from environment variables.

    Reads ``AZURE_OPENAI_ENDPOINT``, ``AZURE_OPENAI_API_KEY`` and
    ``AZURE_OPENAI_API_VERSION`` (default ``2024-08-01-preview``).

    Raises:
        RuntimeError: the endpoint or the API key is not set.
    """
    endpoint = os.getenv("AZURE_OPENAI_ENDPOINT")
    api_key = os.getenv("AZURE_OPENAI_API_KEY")
    api_version = os.getenv("AZURE_OPENAI_API_VERSION", "2024-08-01-preview")
    if not (endpoint and api_key):
        raise RuntimeError("AZURE_OPENAI_ENDPOINT and AZURE_OPENAI_API_KEY must be set.")
    client_kwargs = {
        "azure_endpoint": endpoint,
        "api_key": api_key,
        "api_version": api_version,
    }
    return AzureOpenAI(**client_kwargs)
def _call_llm_json(system_prompt: str, user_prompt: dict[str, Any]) -> dict[str, Any]:
    """
    Send one chat completion request and return the JSON-decoded reply.

    The deployment comes from ``AZURE_OPENAI_DEPLOYMENT`` (default
    ``gpt-4.1-mini``) and the timeout from ``AZURE_OPENAI_TIMEOUT_SECONDS``
    (default 90). The request uses temperature 0 and asks for a JSON object
    response; ``user_prompt`` is sent JSON-encoded as the user message.

    Raises:
        RuntimeError: the request fails, the reply is empty, or the reply
            is not valid JSON.
    """
    deployment = os.getenv("AZURE_OPENAI_DEPLOYMENT", "gpt-4.1-mini")
    client = _azure_client()
    request_timeout = float(os.getenv("AZURE_OPENAI_TIMEOUT_SECONDS", "90"))

    try:
        response = client.chat.completions.create(
            model=deployment,
            temperature=0,
            response_format={"type": "json_object"},
            timeout=request_timeout,
            messages=[
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": json.dumps(user_prompt)},
            ],
        )
    except Exception as exc:
        raise RuntimeError(f"Azure OpenAI request failed: {exc}") from exc

    content = ""
    if response.choices:
        content = response.choices[0].message.content
    if not content:
        raise RuntimeError("LLM returned empty content.")

    try:
        decoded = json.loads(content)
    except json.JSONDecodeError as exc:
        raise RuntimeError(f"LLM output is not valid JSON: {exc}") from exc
    return decoded
def extract_with_llm(text: str, column_names: list[str], column_descriptions: dict[str, str]) -> dict[str, Any]:
    """Ask the LLM to extract values for the requested columns from an article.

    Args:
        text: Article text.
        column_names: Columns whose values should be extracted.
        column_descriptions: Descriptions passed to the user prompt builder.

    Returns:
        A dict with keys ``fields``, ``evidence`` (UI-formatted text),
        ``evidence_snippets``, ``confidence`` (float clamped to [0, 1]) and
        ``decision`` (a member of ``VALID_DECISIONS``; ``"include"`` when the
        model returns anything else).
    """
    system_prompt = load_prompt_file(SYSTEM_PROMPT_PATH)
    user_prompt = build_user_prompt(text, column_names, column_descriptions)
    parsed = _call_llm_json(system_prompt, user_prompt)
    normalized_fields = coerce_fields_from_llm(parsed, column_names)
    normalized_evidence_snippets = normalize_evidence_snippets(parsed, column_names, normalized_fields)
    evidence_text = format_evidence_for_ui(normalized_evidence_snippets)
    if is_debug_enabled():
        print("[DEBUG] LLM parsed response:", parsed)
        print("[DEBUG] Parsed keys:", list(parsed.keys()))
        print("[DEBUG] Requested columns:", column_names)
        print("[DEBUG] Extracted fields:", normalized_fields)
    confidence_raw = parsed.get("confidence", 0)
    try:
        confidence = float(confidence_raw)
    except (TypeError, ValueError, OverflowError):
        confidence = 0.0
    # json.loads accepts NaN and float("nan") parses too; NaN survives the
    # min/max clamp below, so map it to 0.0 first (NaN is never equal to itself).
    if confidence != confidence:
        confidence = 0.0
    confidence = min(max(confidence, 0.0), 1.0)
    decision = str(parsed.get("decision", "include")).strip().lower()
    if decision not in VALID_DECISIONS:
        decision = "include"
    return {
        "fields": normalized_fields,
        "evidence": evidence_text,
        "evidence_snippets": normalized_evidence_snippets,
        "confidence": confidence,
        "decision": decision,
    }
def evaluate_with_criteria_llm(text: str, criteria: dict[str, Any]) -> dict[str, Any]:
    """Ask the LLM to screen an article against the supplied criteria.

    Args:
        text: Article text.
        criteria: Screening criteria passed to the criteria prompt builder.

    Returns:
        A dict with keys ``decision`` (a member of ``VALID_DECISIONS``;
        ``"include"`` when the model returns anything else), ``confidence``
        (float clamped to [0, 1]) and ``rationale`` (stripped string).
    """
    system_prompt = load_prompt_file(SYSTEM_CRITERIA_PROMPT_PATH)
    user_prompt = build_criteria_user_prompt(text, criteria)
    parsed = _call_llm_json(system_prompt, user_prompt)
    confidence_raw = parsed.get("confidence", 0)
    try:
        confidence = float(confidence_raw)
    except (TypeError, ValueError, OverflowError):
        confidence = 0.0
    # NaN would pass through the min/max clamp unchanged; treat it as "no
    # confidence" (NaN is the only float that is not equal to itself).
    if confidence != confidence:
        confidence = 0.0
    confidence = min(max(confidence, 0.0), 1.0)
    decision = str(parsed.get("decision", "include")).strip().lower()
    if decision not in VALID_DECISIONS:
        decision = "include"
    rationale = str(parsed.get("rationale", "")).strip()
    return {
        "decision": decision,
        "confidence": confidence,
        "rationale": rationale,
    }
def validate_rayyan_labels_llm(text: str, current_labels: list[str]) -> dict[str, Any]:
    """Ask the LLM to review the article's labels and suggest replacements.

    Suggestions are stripped, emptied entries dropped, and duplicates removed
    while keeping first-seen order. The suggestion is discarded in favour of
    the current labels when it is empty or, if labels already exist, when it
    does not contain the same number of labels.

    Returns:
        A dict with keys ``current_labels``, ``suggested_labels`` and
        ``rationale``; the rationale is blank whenever the suggestion equals
        the current labels.
    """
    system_prompt = load_prompt_file(SYSTEM_LABELS_PROMPT_PATH)
    user_prompt = build_labels_user_prompt(text, current_labels)
    parsed = _call_llm_json(system_prompt, user_prompt)

    raw_suggestions = parsed.get("suggested_labels", [])
    explanation = str(parsed.get("rationale", "")).strip()
    if isinstance(raw_suggestions, list):
        stripped = (str(entry).strip() for entry in raw_suggestions)
    else:
        stripped = ()
    suggested_labels = list(dict.fromkeys(label for label in stripped if label))

    # Keep switch-only behavior: same label count as original when labels exist.
    count_changed = bool(current_labels) and len(suggested_labels) != len(current_labels)
    if count_changed or not suggested_labels:
        suggested_labels = current_labels[:]
    # Nothing changed, so there is nothing to explain.
    if suggested_labels == current_labels:
        explanation = ""
    return {
        "current_labels": current_labels,
        "suggested_labels": suggested_labels,
        "rationale": explanation,
    }
def labels_to_text(labels: list[str]) -> str:
    """Join labels with ``", "``; an empty or missing list gives ``""``."""
    return ", ".join(labels) if labels else ""
def update_row(df: pd.DataFrame, row_index: int, values: dict[str, Any]) -> pd.DataFrame:
    """Write ``values`` into one row of ``df`` and return the same frame.

    Keys that are not existing columns are ignored. If a cell assignment is
    rejected with ``TypeError`` or ``ValueError``, the column is converted to
    ``object`` dtype and the assignment is retried.
    """
    for column, new_value in values.items():
        if column not in df.columns:
            continue
        try:
            df.at[row_index, column] = new_value
        except (TypeError, ValueError):
            # Some Excel columns are inferred as float64 when mostly empty.
            # Upcast that column so text values from extraction can be stored.
            df[column] = df[column].astype("object")
            df.at[row_index, column] = new_value
    return df
def process_pdf_and_extract(
    pdf_file,
    criteria_file,
    user_id_input: str,
    df: pd.DataFrame,
    current_row_index: int | None,
    target_columns: list[str],
    description_columns: list[str],
    *description_values: str,
    progress=gr.Progress(),
    request: gr.Request | None = None,
):
    """Parse the uploaded PDF and run the LLM workflows for the current row.

    Up to three workflows are submitted to a thread pool: field extraction
    (required), criteria screening (only when a criteria file was uploaded)
    and RAYYAN label validation. A failure of the extraction workflow fails
    the whole call; failures of the other two are reported as warnings in the
    status text. When criteria screening succeeds, its decision and confidence
    replace the ones produced by extraction.

    Each workflow is awaited for at most ``WORKFLOW_TIMEOUT_SECONDS`` (default
    120). The pool is shut down without waiting so that a timed-out workflow
    does not keep this handler blocked.

    Returns:
        The tuple from ``empty_extracted_state(message)`` when there is
        nothing to do or processing fails; otherwise ``(result,
        extracted_columns, *extracted_updates, evidence, confidence, decision,
        criteria_rationale, labels_current, labels_suggested,
        labels_rationale, status)``.
    """
    user_id = resolve_user_id(explicit_user_id=user_id_input, request=request)
    if df is None or current_row_index is None:
        return empty_extracted_state("Load Excel and start screening first.")
    try:
        debug_log("Process PDF started", {"row_index": current_row_index})
        progress(0.15, desc="Extracting text from PDF")
        text = parse_pdf(pdf_file)
        criteria = parse_criteria_file(criteria_file) if criteria_file is not None else None
        missing_columns = get_missing_columns(df, current_row_index, target_columns)
        if len(missing_columns) == 0:
            return empty_extracted_state("Current row has no missing target fields.")
        descriptions = build_description_values_from_inputs(
            description_columns,
            list(description_values),
            missing_columns,
        )
        labels_column = "RAYYAN_Labels" if "RAYYAN_Labels" in df.columns else None
        current_labels = parse_labels_csv(df.loc[current_row_index, labels_column]) if labels_column else []
        progress(0.50, desc="Running parallel LLM workflows")
        workflow_timeout = float(os.getenv("WORKFLOW_TIMEOUT_SECONDS", "120"))
        warnings: list[str] = []
        # Not used as a context manager: leaving a ``with`` block calls
        # shutdown(wait=True), which would block until every workflow finished
        # and make the result(timeout=...) calls below ineffective.
        executor = ThreadPoolExecutor(max_workers=3)
        try:
            extraction_future = executor.submit(extract_with_llm, text, missing_columns, descriptions)
            criteria_future = (
                executor.submit(evaluate_with_criteria_llm, text, criteria)
                if criteria is not None
                else None
            )
            labels_future = executor.submit(validate_rayyan_labels_llm, text, current_labels)
            try:
                result = extraction_future.result(timeout=workflow_timeout)
            except Exception as exc:
                raise RuntimeError(f"Extraction workflow failed: {exc}") from exc
            criteria_result = None
            if criteria_future is not None:
                try:
                    criteria_result = criteria_future.result(timeout=workflow_timeout)
                except Exception as exc:
                    warnings.append(f"Criteria workflow failed: {exc}")
                    debug_log("Criteria workflow failed", repr(exc))
            # Fallback used when label validation fails: keep labels unchanged.
            labels_result = {
                "current_labels": current_labels,
                "suggested_labels": current_labels,
                "rationale": "",
            }
            try:
                labels_result = labels_future.result(timeout=workflow_timeout)
            except Exception as exc:
                warnings.append(f"RAYYAN labels workflow failed: {exc}")
                debug_log("Labels workflow failed", repr(exc))
        finally:
            # Drop queued work and return immediately; workflows that are
            # already running finish in the background and are ignored.
            executor.shutdown(wait=False, cancel_futures=True)
        if criteria_result is not None:
            result["decision"] = criteria_result["decision"]
            result["confidence"] = criteria_result["confidence"]
        criteria_rationale_ui = ""
        if criteria_result is not None:
            criteria_rationale_ui = criteria_result.get("rationale", "") or ""
        labels_current_ui = labels_to_text(labels_result.get("current_labels", []))
        labels_suggested_ui = labels_to_text(labels_result.get("suggested_labels", []))
        labels_rationale_ui = str(labels_result.get("rationale", "")).strip()
        extracted_columns, extracted_updates = build_extracted_input_updates(
            missing_columns,
            result["fields"],
        )
        extraction_status = "Extraction completed. Review and Accept/Edit/Reject."
        if criteria is None:
            extraction_status = (
                "Extraction completed without criteria.yml; confidence/decision are based on extraction output. "
                "Upload criteria.yml to override them with criteria screening."
            )
        if extracted_columns and all(str(result["fields"].get(col, "")).strip() == "" for col in extracted_columns):
            extraction_status = (
                "Extraction completed, but all extracted fields are empty. "
                "Check column descriptions/PDF content. Enable APP_DEBUG=1 to inspect raw model output."
            )
        if warnings:
            extraction_status = f"{extraction_status} Warnings: {' | '.join(warnings)}"
        final_evidence_text = result["evidence"]
        result["labels_current"] = labels_current_ui
        result["labels_suggested"] = labels_suggested_ui
        result["labels_rationale"] = labels_rationale_ui
        result["criteria_rationale"] = criteria_rationale_ui
        description_values_list = list(description_values)
        saved_description_columns = description_columns if isinstance(description_columns, list) else []
        save_session_meta(
            user_id,
            {
                "description_columns": saved_description_columns,
                "description_values": description_values_list[: len(saved_description_columns)],
                "extracted_columns": extracted_columns,
                "extracted_values": [str(result["fields"].get(col, "")) for col in extracted_columns],
                "evidence": final_evidence_text,
                "confidence": float(result["confidence"]),
                "decision": result["decision"],
                "criteria_rationale": criteria_rationale_ui,
                "labels_current": labels_current_ui,
                "labels_suggested": labels_suggested_ui,
                "labels_rationale": labels_rationale_ui,
                # Keep the previously stored path when the upload could not be persisted.
                "criteria_path": persist_uploaded_file(user_id, criteria_file, "criteria.yml")
                or load_session_meta(user_id).get("criteria_path", ""),
                "pdf_path": persist_uploaded_file(user_id, pdf_file, "uploaded_pdf.pdf")
                or load_session_meta(user_id).get("pdf_path", ""),
            }
        )
        progress(1.0, desc="Done")
        debug_log("Process PDF completed", {"warnings": warnings, "decision": result["decision"]})
        return (
            result,
            extracted_columns,
            *extracted_updates,
            final_evidence_text,
            result["confidence"],
            result["decision"],
            criteria_rationale_ui,
            labels_current_ui,
            labels_suggested_ui,
            labels_rationale_ui,
            extraction_status,
        )
    except Exception as exc:
        # UI boundary: report the failure in the status box instead of raising.
        debug_log("Process PDF failed", repr(exc))
        return empty_extracted_state(f"Processing failed: {exc}")
def accept_extraction(
    extracted_columns: list[str],
    user_id_input: str,
    df: pd.DataFrame,
    current_row_index: int | None,
    incomplete_rows: list[int],
    position: int,
    url_column: str,
    target_columns: list[str],
    *extracted_values: str,
    request: gr.Request | None = None,
):
    """Write the reviewed values into the current row and move to the next one.

    The dataframe and a downloadable export are persisted, the stored
    extraction review is cleared in the session metadata, and the review
    widgets are reset.

    Returns:
        A tuple in the order of ``accept_outputs``: dataframe, position,
        current row, extracted state, the five row-rendering values, status,
        extracted columns, the extraction widget values, download HTML and a
        PDF-clearing update.
    """
    user_id = resolve_user_id(explicit_user_id=user_id_input, request=request)
    # Values that blank the extracted-columns state and every review widget.
    cleared_review = (
        [],
        *build_empty_extracted_input_updates(),
        "",
        0.0,
        "include",
        "",
        "",
        "",
        "",
    )
    if df is None or current_row_index is None:
        return (
            df,
            position,
            current_row_index,
            {},
            "",
            "",
            "",
            "",
            "",
            "Nothing to accept.",
            *cleared_review,
            build_inline_download_html(None),
            gr.update(value=None),
        )

    accepted_values = build_extracted_values_from_inputs(extracted_columns, list(extracted_values))
    df = update_row(df, current_row_index, accepted_values)
    next_position, next_row, article_md, url_md, current_md, missing_md, counter = render_current_row(
        df,
        incomplete_rows,
        position + 1,
        url_column,
        target_columns,
    )
    downloadable_path = persist_downloadable_dataframe(user_id, df)
    download_html = build_inline_download_html(downloadable_path)
    status = "Row updated and accepted."
    if not downloadable_path:
        status += " Download export could not be refreshed."

    session_update = {
        "df_path": persist_dataframe(user_id, df),
        "extracted_columns": [],
        "extracted_values": [],
        "evidence": "",
        "confidence": 0.0,
        "decision": "include",
        "criteria_rationale": "",
        "labels_current": "",
        "labels_suggested": "",
        "labels_rationale": "",
        "position": next_position,
        "current_row_index": next_row,
        # Fall back to the previously stored export when none was produced.
        "download_path": downloadable_path or load_session_meta(user_id).get("download_path", ""),
    }
    save_session_meta(user_id, session_update)
    return (
        df,
        next_position,
        next_row,
        {},
        article_md,
        url_md,
        current_md,
        missing_md,
        counter,
        status,
        *cleared_review,
        download_html,
        gr.update(value=None),
    )
def skip_row(
    user_id_input: str,
    df: pd.DataFrame,
    incomplete_rows: list[int],
    position: int,
    url_column: str,
    target_columns: list[str],
    request: gr.Request | None = None,
):
    """Move to the next incomplete row without changing the current one.

    The dataframe and a downloadable export are persisted, the stored
    extraction review is cleared in the session metadata, and the review
    widgets are reset.

    Returns:
        A tuple in the order of ``skip_outputs``: dataframe, position, current
        row, the five row-rendering values, status, extracted columns, the
        extraction widget values, extracted state, download HTML and a
        PDF-clearing update.
    """
    user_id = resolve_user_id(explicit_user_id=user_id_input, request=request)
    # Values that blank the extracted-columns state, every review widget and
    # the extracted state (which comes last for this handler).
    cleared_review = (
        [],
        *build_empty_extracted_input_updates(),
        "",
        0.0,
        "include",
        "",
        "",
        "",
        "",
        {},
    )
    if df is None:
        return (
            df,
            position,
            None,
            "",
            "",
            "",
            "",
            "",
            "No dataset loaded.",
            *cleared_review,
            build_inline_download_html(None),
            gr.update(value=None),
        )

    next_position, next_row, article_md, url_md, current_md, missing_md, counter = render_current_row(
        df,
        incomplete_rows,
        position + 1,
        url_column,
        target_columns,
    )
    downloadable_path = persist_downloadable_dataframe(user_id, df)
    download_html = build_inline_download_html(downloadable_path)
    status = "Row skipped."
    if not downloadable_path:
        status += " Existing download may be stale."

    session_update = {
        "df_path": persist_dataframe(user_id, df),
        "extracted_columns": [],
        "extracted_values": [],
        "evidence": "",
        "confidence": 0.0,
        "decision": "include",
        "criteria_rationale": "",
        "labels_current": "",
        "labels_suggested": "",
        "labels_rationale": "",
        "position": next_position,
        "current_row_index": next_row,
        # Fall back to the previously stored export when none was produced.
        "download_path": downloadable_path or load_session_meta(user_id).get("download_path", ""),
    }
    save_session_meta(user_id, session_update)
    return (
        df,
        next_position,
        next_row,
        article_md,
        url_md,
        current_md,
        missing_md,
        counter,
        status,
        *cleared_review,
        download_html,
        gr.update(value=None),
    )
def reject_extraction(user_id_input: str, request: gr.Request | None = None):
    """Discard the pending extraction review for the current row.

    Clears the stored review fields in the session metadata and returns the
    empty extraction state with a status message.
    """
    user_id = resolve_user_id(explicit_user_id=user_id_input, request=request)
    discarded_review = {
        "extracted_columns": [],
        "extracted_values": [],
        "evidence": "",
        "confidence": 0.0,
        "decision": "include",
        **dict.fromkeys(
            ("criteria_rationale", "labels_current", "labels_suggested", "labels_rationale"),
            "",
        ),
    }
    save_session_meta(user_id, discarded_review)
    return empty_extracted_state("Extraction rejected. Upload another PDF or try again.")
def restore_saved_session(user_id_input: str, request: gr.Request | None = None):
    """Rebuild the UI from the user's stored session metadata on page load.

    ``load_excel`` is re-run with the stored dataframe (when its file still
    exists) and the stored settings; its outputs are then overlaid with the
    stored extraction review and the stored download link.

    Returns:
        A tuple in the order of ``demo_load_outputs``: user id, updates for
        the Excel/PDF/criteria file widgets, the two column text inputs, the
        patched ``load_excel`` outputs (without their trailing PDF update) and
        a final PDF file update.
    """
    user_id = resolve_user_id(explicit_user_id=user_id_input, request=request)
    meta = load_session_meta(user_id)
    target_columns_text = str(meta.get("target_columns_text", ""))
    url_column_text = str(meta.get("url_column_text", ""))
    description_columns = meta.get("description_columns", [])
    description_values = meta.get("description_values", [])
    if not isinstance(description_columns, list):
        description_columns = []
    if not isinstance(description_values, list):
        description_values = []
    excel_path = str(meta.get("df_path", "") or meta.get("excel_path", ""))
    pdf_path = str(meta.get("pdf_path", ""))
    criteria_path = str(meta.get("criteria_path", ""))
    download_path = str(meta.get("download_path", ""))
    excel_exists = bool(excel_path) and Path(excel_path).exists()
    pdf_exists = bool(pdf_path) and Path(pdf_path).exists()
    criteria_exists = bool(criteria_path) and Path(criteria_path).exists()
    download_exists = bool(download_path) and Path(download_path).exists()
    # Stored files are handed to load_excel as objects exposing ``.name``.
    loaded = list(
        load_excel(
            SimpleNamespace(name=excel_path) if excel_exists else None,
            SimpleNamespace(name=criteria_path) if criteria_exists else None,
            user_id,
            target_columns_text,
            url_column_text,
            description_columns,
            *description_values,
            request=request,
        )
    )[:-1]  # drop pdf clear update; demo.load sets pdf explicitly above
    # Index 9 is the status message in the load_excel output order.
    if excel_exists:
        loaded[9] = f"{loaded[9]} Restored saved session."
    else:
        loaded[9] = "No saved session found."
    # Offsets into ``loaded``: 15 state/row outputs, then the description
    # inputs, then the extracted-field inputs, then the seven review widgets.
    base_extracted_start = 15 + MAX_DESCRIPTION_FIELDS
    default_evidence_idx = base_extracted_start + MAX_EXTRACT_FIELDS
    default_confidence_idx = default_evidence_idx + 1
    default_decision_idx = default_evidence_idx + 2
    default_criteria_rationale_idx = default_evidence_idx + 3
    default_labels_current_idx = default_evidence_idx + 4
    default_labels_suggested_idx = default_evidence_idx + 5
    default_labels_rationale_idx = default_evidence_idx + 6
    saved_extracted_columns = meta.get("extracted_columns", [])
    saved_extracted_values = meta.get("extracted_values", [])
    if not isinstance(saved_extracted_columns, list):
        saved_extracted_columns = []
    if not isinstance(saved_extracted_values, list):
        saved_extracted_values = []
    saved_fields = build_extracted_values_from_inputs(saved_extracted_columns, saved_extracted_values)
    restored_extracted_columns, restored_extracted_updates = build_extracted_input_updates(
        saved_extracted_columns,
        saved_fields,
    )
    if not restored_extracted_columns:
        restored_extracted_updates = build_empty_extracted_input_updates()
    # Stored metadata may hold null, text or NaN for confidence; never let
    # that abort the page load or push an out-of-range value to the slider.
    try:
        saved_confidence = float(meta.get("confidence", 0.0))
    except (TypeError, ValueError, OverflowError):
        saved_confidence = 0.0
    if saved_confidence != saved_confidence:  # NaN check
        saved_confidence = 0.0
    saved_confidence = min(max(saved_confidence, 0.0), 1.0)
    extracted_state = {
        "fields": {col: saved_fields.get(col, "") for col in restored_extracted_columns},
        "evidence": str(meta.get("evidence", "")),
        "confidence": saved_confidence,
        "decision": str(meta.get("decision", "include")),
        "criteria_rationale": str(meta.get("criteria_rationale", "")),
        "labels_current": str(meta.get("labels_current", "")),
        # "label_suggestions" is read as a fallback key for this value.
        "labels_suggested": str(meta.get("labels_suggested", meta.get("label_suggestions", ""))),
        "labels_rationale": str(meta.get("labels_rationale", "")),
    }
    loaded[6] = extracted_state
    loaded[8] = restored_extracted_columns
    loaded[base_extracted_start : base_extracted_start + MAX_EXTRACT_FIELDS] = restored_extracted_updates
    loaded[default_evidence_idx] = extracted_state["evidence"]
    loaded[default_confidence_idx] = extracted_state["confidence"]
    loaded[default_decision_idx] = extracted_state["decision"] if extracted_state["decision"] in VALID_DECISIONS else "include"
    loaded[default_criteria_rationale_idx] = extracted_state["criteria_rationale"]
    loaded[default_labels_current_idx] = extracted_state["labels_current"]
    loaded[default_labels_suggested_idx] = extracted_state["labels_suggested"]
    loaded[default_labels_rationale_idx] = extracted_state["labels_rationale"]
    loaded[-1] = build_inline_download_html(download_path if download_exists else None)
    return (
        user_id,
        gr.update(value=excel_path if excel_exists else None),
        gr.update(value=pdf_path if pdf_exists else None),
        gr.update(value=criteria_path if criteria_exists else None),
        target_columns_text,
        url_column_text,
        *loaded,
        gr.update(value=pdf_path if pdf_exists else None),
    )
| def get_auth_config() -> list[tuple[str, str]] | tuple[str, str] | None: | |
| """Build Gradio basic auth config from environment variables. | |
| Expected Space Secrets: | |
| - USER1, USER2, ... with value "(username,password)" or "username,password" | |
| - Legacy fallback: | |
| - SPACE_APP_PASSWORD (required to enable legacy auth) | |
| - SPACE_APP_USERNAME (optional, defaults to 'admin') | |
| """ | |
| users: list[tuple[str, str]] = [] | |
| for key in sorted(os.environ.keys()): | |
| if not re.fullmatch(r"USER\d+", key): | |
| continue | |
| raw = os.getenv(key, "").strip() | |
| if not raw: | |
| continue | |
| username = "" | |
| password = "" | |
| try: | |
| parsed = ast.literal_eval(raw) | |
| if isinstance(parsed, tuple) and len(parsed) == 2: | |
| username = str(parsed[0]).strip() | |
| password = str(parsed[1]).strip() | |
| except Exception: | |
| parts = [part.strip() for part in raw.split(",", 1)] | |
| if len(parts) == 2: | |
| username, password = parts[0], parts[1] | |
| if username and password: | |
| users.append((username, password)) | |
| if users: | |
| return users | |
| password = os.getenv("SPACE_APP_PASSWORD", "").strip() | |
| if not password: | |
| return None | |
| username = os.getenv("SPACE_APP_USERNAME", "admin").strip() or "admin" | |
| return username, password | |
# Runs at import time, before the UI is built.
_setup_storage_paths()
# NOTE(review): the nesting below was reconstructed from a copy of the file
# whose indentation had been stripped; confirm which column contains
# download_links_md and status_box.
with gr.Blocks(title="Scientific Article Screener") as demo:
    gr.Markdown("# Scientific Article Screener")
    gr.Markdown(
        "Upload an Excel file and process one incomplete row at a time with a PDF."
    )
    # Session state
    df_state = gr.State(None)
    incomplete_rows_state = gr.State([])
    position_state = gr.State(0)
    current_row_state = gr.State(None)
    target_columns_state = gr.State([])
    url_column_state = gr.State("")
    extracted_state = gr.State({})
    description_columns_state = gr.State([])
    extracted_columns_state = gr.State([])
    user_id_state = gr.State("default")
    with gr.Row():
        # LEFT PANEL
        with gr.Column(scale=1):
            excel_file = gr.File(label="Upload Excel (.xlsx)", file_types=[".xlsx"])
            criteria_file = gr.File(label="Upload criteria.yml (optional)", file_types=[".yml", ".yaml"])
            target_columns_input = gr.Textbox(
                label="Target columns (comma-separated)",
                placeholder="Leave empty to use all columns except URL column",
            )
            url_column_input = gr.Textbox(
                label="URL column name (optional)",
                placeholder="Leave empty to auto-detect",
            )
            gr.Markdown("### Field descriptions")
            # A fixed pool of hidden textboxes is created up front; handlers
            # receive and return all of them.
            description_inputs: list[gr.Textbox] = []
            for idx in range(MAX_DESCRIPTION_FIELDS):
                description_inputs.append(
                    gr.Textbox(
                        label=f"Description {idx + 1}",
                        lines=4,
                        visible=False,
                    )
                )
            start_btn = gr.Button("Start screening", variant="primary")
            download_links_md = gr.HTML("")
        # WORKSPACE (previous middle + right, wider)
        with gr.Column(scale=2):
            row_counter = gr.Markdown("No row selected.")
            article_url_md = gr.Markdown("")
            article_details_md = gr.Markdown("")
            current_values_md = gr.Markdown("", visible=False)
            missing_columns_md = gr.Markdown("", visible=False)
            pdf_file = gr.File(label="Upload PDF", file_types=[".pdf"])
            process_pdf_btn = gr.Button("Process PDF", variant="primary")
            gr.Markdown("### Extracted fields")
            # Same fixed-pool approach as the description inputs.
            extracted_inputs: list[gr.Textbox] = []
            for idx in range(MAX_EXTRACT_FIELDS):
                extracted_inputs.append(
                    gr.Textbox(
                        label=f"Extracted field {idx + 1}",
                        lines=1,
                        visible=False,
                    )
                )
            evidence_box = gr.Textbox(label="Evidence snippet", lines=4)
            confidence_box = gr.Slider(label="Confidence", minimum=0.0, maximum=1.0, step=0.01, value=0.0)
            decision_box = gr.Radio(label="Include/Exclude decision", choices=["include", "exclude"], value="include")
            criteria_rationale_box = gr.Textbox(label="Criteria rationale", lines=4)
            labels_current_box = gr.Textbox(label="RAYYAN current labels", lines=2)
            labels_suggested_box = gr.Textbox(label="RAYYAN suggested labels", lines=2)
            labels_rationale_box = gr.Textbox(label="RAYYAN label-switch rationale", lines=4)
            with gr.Row():
                accept_btn = gr.Button("Accept", variant="primary")
                reject_btn = gr.Button("Reject")
                skip_btn = gr.Button("Skip")
            status_box = gr.Markdown("Ready.")
    # Output lists. Each handler's return tuple is mapped onto its list by
    # position, so the order here must stay in step with the handlers.
    # restore_saved_session also indexes into this layout (positions 6, 8, 9
    # and the 15-item offset), so changes here must be mirrored there.
    base_row_outputs = [
        df_state,
        incomplete_rows_state,
        position_state,
        current_row_state,
        target_columns_state,
        url_column_state,
        extracted_state,
        description_columns_state,
        extracted_columns_state,
        status_box,
        article_details_md,
        article_url_md,
        current_values_md,
        missing_columns_md,
        row_counter,
    ]
    extraction_outputs = [
        *extracted_inputs,
        evidence_box,
        confidence_box,
        decision_box,
        criteria_rationale_box,
        labels_current_box,
        labels_suggested_box,
        labels_rationale_box,
    ]
    download_outputs = [download_links_md, pdf_file]
    # Outputs of restore_saved_session; pdf_file is listed twice (third and
    # last position), matching the two PDF updates that function returns.
    demo_load_outputs = [
        user_id_state,
        excel_file,
        pdf_file,
        criteria_file,
        target_columns_input,
        url_column_input,
        *base_row_outputs,
        *description_inputs,
        *extracted_inputs,
        evidence_box,
        confidence_box,
        decision_box,
        criteria_rationale_box,
        labels_current_box,
        labels_suggested_box,
        labels_rationale_box,
        download_links_md,
        pdf_file,
    ]
    start_outputs = [*base_row_outputs, *description_inputs, *extraction_outputs, *download_outputs]
    process_outputs = [extracted_state, extracted_columns_state, *extraction_outputs, status_box]
    accept_outputs = [
        df_state,
        position_state,
        current_row_state,
        extracted_state,
        article_details_md,
        article_url_md,
        current_values_md,
        missing_columns_md,
        row_counter,
        status_box,
        extracted_columns_state,
        *extraction_outputs,
        *download_outputs,
    ]
    # Unlike accept_outputs, extracted_state comes after the extraction widgets.
    skip_outputs = [
        df_state,
        position_state,
        current_row_state,
        article_details_md,
        article_url_md,
        current_values_md,
        missing_columns_md,
        row_counter,
        status_box,
        extracted_columns_state,
        *extraction_outputs,
        extracted_state,
        *download_outputs,
    ]
    reject_outputs = [extracted_state, extracted_columns_state, *extraction_outputs, status_box]
    # Event wiring. On page load the user id is resolved first, then the
    # stored session for that id is restored.
    demo.load(fn=init_user_id, inputs=[], outputs=[user_id_state]).then(
        fn=restore_saved_session,
        inputs=[user_id_state],
        outputs=demo_load_outputs,
    )
    # Editing either column text input refreshes the description textboxes.
    target_columns_input.change(
        fn=refresh_description_inputs,
        inputs=[target_columns_input, url_column_input, df_state, description_columns_state, *description_inputs],
        outputs=[description_columns_state, *description_inputs],
    )
    url_column_input.change(
        fn=refresh_description_inputs,
        inputs=[target_columns_input, url_column_input, df_state, description_columns_state, *description_inputs],
        outputs=[description_columns_state, *description_inputs],
    )
    start_btn.click(
        fn=load_excel,
        inputs=[excel_file, criteria_file, user_id_state, target_columns_input, url_column_input, description_columns_state, *description_inputs],
        outputs=start_outputs,
    )
    process_pdf_btn.click(
        fn=process_pdf_and_extract,
        inputs=[pdf_file, criteria_file, user_id_state, df_state, current_row_state, target_columns_state, description_columns_state, *description_inputs],
        outputs=process_outputs,
    )
    accept_btn.click(
        fn=accept_extraction,
        inputs=[
            extracted_columns_state,
            user_id_state,
            df_state,
            current_row_state,
            incomplete_rows_state,
            position_state,
            url_column_state,
            target_columns_state,
            *extracted_inputs,
        ],
        outputs=accept_outputs,
    )
    skip_btn.click(
        fn=skip_row,
        inputs=[user_id_state, df_state, incomplete_rows_state, position_state, url_column_state, target_columns_state],
        outputs=skip_outputs,
    )
    reject_btn.click(
        fn=reject_extraction,
        inputs=[user_id_state],
        outputs=reject_outputs,
    )
if __name__ == "__main__":
    # Serve the app with the auth settings from the environment; files under
    # the app storage root are added to Gradio's allowed paths.
    storage_root = str(APP_STORAGE_ROOT.resolve())
    demo.launch(auth=get_auth_config(), allowed_paths=[storage_root])