diogo.rodrigues.silva
Add user specific sessions
61d06a0
import json
import os
import re
import shutil
import tempfile
import ast
import base64
import asyncio
from concurrent.futures import ThreadPoolExecutor
from string import Template
from datetime import datetime
from pathlib import Path
from types import SimpleNamespace
from typing import Any
import fitz # PyMuPDF
import gradio as gr
import pandas as pd
import yaml
from openai import AzureOpenAI
# Upper bounds on how many description / extracted-value inputs the UI
# builder functions in this module emit.
MAX_DESCRIPTION_FIELDS = 30
MAX_EXTRACT_FIELDS = 30
# Storage root; can be overridden through the APP_STORAGE_DIR environment variable.
DEFAULT_STORAGE_ROOT = Path("/data/llm_fulltextscreener") # Path("/tmp/llm_fulltextscreener")
APP_STORAGE_ROOT = Path(os.getenv("APP_STORAGE_DIR", str(DEFAULT_STORAGE_ROOT)))
# Every user id gets its own sub-directory below this folder.
USERS_ROOT_DIR = APP_STORAGE_ROOT / "users"
# Number of newest "screened_*.xlsx" exports kept per user by _cleanup_old_exports.
MAX_EXPORTED_FILES = 20
# Exports larger than this (8 MiB) are not embedded as inline data: links.
MAX_INLINE_DOWNLOAD_BYTES = 8 * 1024 * 1024
# Decisions accepted from the LLM; any other value falls back to "include".
VALID_DECISIONS = {"include", "exclude"}
# Prompt files are read from a "prompts" folder located next to this module.
PROMPTS_DIR = Path(__file__).resolve().parent / "prompts"
SYSTEM_PROMPT_PATH = PROMPTS_DIR / "system_prompt.txt"
USER_PROMPT_TEMPLATE_PATH = PROMPTS_DIR / "user_prompt_template.json"
SYSTEM_CRITERIA_PROMPT_PATH = PROMPTS_DIR / "system_criteria_prompt.txt"
USER_CRITERIA_TEMPLATE_PATH = PROMPTS_DIR / "user_criteria_template.json"
SYSTEM_LABELS_PROMPT_PATH = PROMPTS_DIR / "system_labels_prompt.txt"
USER_LABELS_TEMPLATE_PATH = PROMPTS_DIR / "user_labels_template.json"
def patch_asyncio_invalid_fd_cleanup() -> None:
    """
    Install a guard around ``asyncio.BaseEventLoop.__del__``.

    Works around a Python 3.11 selector-loop teardown race seen on some
    runtimes (including Spaces), where the loop's ``__del__`` may raise:
    ``ValueError: Invalid file descriptor: -1``.

    The guard swallows only that specific ValueError; any other exception is
    re-raised. Calling this function more than once is a no-op because the
    installed wrapper carries an ``_invalid_fd_guard`` marker.
    """
    wrapped_del = getattr(asyncio.BaseEventLoop, "__del__", None)
    if wrapped_del is None:
        return
    if getattr(wrapped_del, "_invalid_fd_guard", False):
        # Already patched earlier; do not wrap the wrapper again.
        return

    def _guarded_del(self):
        try:
            wrapped_del(self)
        except ValueError as exc:
            if "Invalid file descriptor" in str(exc):
                return
            raise

    _guarded_del._invalid_fd_guard = True
    asyncio.BaseEventLoop.__del__ = _guarded_del
# Apply the event-loop teardown guard once, at import time.
patch_asyncio_invalid_fd_cleanup()
def is_debug_enabled() -> bool:
    """Return True when the APP_DEBUG environment variable holds a truthy flag.

    Accepted values (case-insensitive, surrounding whitespace ignored) are
    "1", "true", "yes" and "on".
    """
    flag = os.getenv("APP_DEBUG", "")
    normalized_flag = flag.strip().lower()
    return normalized_flag in ("1", "true", "yes", "on")
def debug_log(*parts: Any) -> None:
    """Print ``parts`` prefixed with "[DEBUG]" when debug mode is enabled."""
    if not is_debug_enabled():
        return
    print("[DEBUG]", *parts)
def normalize_key(text: str) -> str:
    """Lower-case ``text`` and drop every character that is not a-z or 0-9."""
    lowered = str(text).strip().lower()
    return re.sub(r"[^a-z0-9]+", "", lowered)
def sanitize_user_id(raw: str) -> str:
    """Turn an arbitrary user identifier into a safe single path component.

    Every run of characters outside ``[a-zA-Z0-9._-]`` is replaced by one
    underscore. Empty results fall back to ``"default"``.

    Identifiers made only of dots (".", "..", "...") also fall back to
    ``"default"``: they pass the character filter, but when joined onto the
    users directory they would resolve to that directory itself or to its
    parent, letting one user id escape its own session folder.
    """
    cleaned = re.sub(r"[^a-zA-Z0-9._-]+", "_", str(raw or "").strip())
    if not cleaned.strip("."):
        # Covers both the empty string and dot-only names such as "..".
        return "default"
    return cleaned
def resolve_user_id(explicit_user_id: str | None = None, request: gr.Request | None = None) -> str:
    """Pick the effective user id.

    Precedence: a non-blank ``explicit_user_id``, then a non-blank
    ``request.username``, then the literal "default". The chosen value is
    passed through ``sanitize_user_id``.
    """
    if explicit_user_id and str(explicit_user_id).strip():
        return sanitize_user_id(explicit_user_id)

    # getattr on None simply yields the fallback, so no separate None check is needed.
    username = getattr(request, "username", None)
    if username and str(username).strip():
        return sanitize_user_id(str(username))

    return "default"
def init_user_id(request: gr.Request | None = None) -> str:
    """Resolve the user id from the request alone (no explicit id given)."""
    return resolve_user_id(explicit_user_id=None, request=request)
def get_user_session_dir(user_id: str) -> Path:
    """Return the per-user session directory below ``USERS_ROOT_DIR``."""
    safe_id = sanitize_user_id(user_id)
    return USERS_ROOT_DIR / safe_id
def get_user_session_meta_path(user_id: str) -> Path:
    """Return the path of the user's ``session.json`` metadata file."""
    return get_user_session_dir(user_id).joinpath("session.json")
def get_user_session_files_dir(user_id: str) -> Path:
    """Return the ``files`` sub-directory of the user's session directory."""
    return get_user_session_dir(user_id).joinpath("files")
def get_user_exports_dir(user_id: str) -> Path:
    """Return the ``exports`` sub-directory of the user's session directory."""
    return get_user_session_dir(user_id).joinpath("exports")
def _ensure_session_dirs(user_id: str) -> None:
    """Create the user's ``files`` and ``exports`` directories if missing."""
    for directory in (get_user_session_files_dir(user_id), get_user_exports_dir(user_id)):
        directory.mkdir(parents=True, exist_ok=True)
def _setup_storage_paths() -> None:
    """Configure writable temp paths in Spaces.

    Creates the users root and the "default" user's files directory, then
    points both the ``TMPDIR`` environment variable and ``tempfile.tempdir``
    at that directory.
    """
    USERS_ROOT_DIR.mkdir(parents=True, exist_ok=True)

    default_files_dir = get_user_session_files_dir("default")
    default_files_dir.mkdir(parents=True, exist_ok=True)

    resolved_tmp = str(default_files_dir.resolve())
    os.environ["TMPDIR"] = resolved_tmp
    tempfile.tempdir = resolved_tmp
def load_session_meta(user_id: str) -> dict[str, Any]:
    """Load the user's ``session.json`` as a dict.

    This is a best-effort read: a missing file, an unreadable file or
    invalid JSON all yield an empty dict instead of raising.

    A syntactically valid payload that is not a JSON object (for example a
    list or a string) is also treated as empty, because callers such as
    ``save_session_meta`` call ``.update`` / ``.get`` on the result.
    """
    session_meta_path = get_user_session_meta_path(user_id)
    try:
        if not session_meta_path.exists():
            return {}
        loaded = json.loads(session_meta_path.read_text(encoding="utf-8"))
    except Exception:
        # Deliberately broad: session metadata must never break the UI flow.
        return {}
    return loaded if isinstance(loaded, dict) else {}
def save_session_meta(user_id: str, updates: dict[str, Any]) -> None:
    """Merge ``updates`` into the user's stored session metadata and write it back.

    Existing keys not mentioned in ``updates`` are kept. The file is written
    as indented UTF-8 JSON.
    """
    _ensure_session_dirs(user_id)
    meta_file = get_user_session_meta_path(user_id)
    merged = load_session_meta(user_id)
    merged.update(updates)
    serialized = json.dumps(merged, indent=2)
    meta_file.write_text(serialized, encoding="utf-8")
def persist_uploaded_file(user_id: str, file_obj, dest_name: str) -> str | None:
    """Copy an uploaded file into the user's ``files`` directory.

    Returns the resolved destination path as a string, or None when there is
    no upload or the resolved source is not an existing regular file.
    """
    if file_obj is None:
        return None

    source = resolve_uploaded_path(file_obj)
    if not (source.exists() and source.is_file()):
        return None

    _ensure_session_dirs(user_id)
    target = get_user_session_files_dir(user_id) / dest_name
    try:
        # Source already is the destination: nothing to copy.
        if source.resolve() == target.resolve():
            return str(target.resolve())
    except Exception:
        # Path resolution problems are ignored; fall through to the copy.
        pass

    shutil.copy2(source, target)
    return str(target.resolve())
def resolve_uploaded_path(file_obj) -> Path:
    """Extract a filesystem path from the various upload representations.

    Handles None, plain ``str``/``Path`` values, objects exposing a truthy
    ``name`` attribute, and dicts carrying a ``"name"`` key. Anything else
    yields ``Path("")``.
    """
    no_path = Path("")
    if file_obj is None:
        return no_path
    if isinstance(file_obj, (str, Path)):
        return Path(file_obj)

    attr_name = getattr(file_obj, "name", "")
    if attr_name:
        return Path(attr_name)

    if isinstance(file_obj, dict):
        dict_name = str(file_obj.get("name", "")).strip()
        if dict_name:
            return Path(dict_name)

    return no_path
def persist_dataframe(user_id: str, df: pd.DataFrame) -> str:
    """Write ``df`` to the user's ``working_table.xlsx`` and return its resolved path."""
    _ensure_session_dirs(user_id)
    working_copy = get_user_session_files_dir(user_id) / "working_table.xlsx"
    df.to_excel(working_copy, index=False)
    return str(working_copy.resolve())
def _cleanup_old_exports(user_id: str, max_files: int = MAX_EXPORTED_FILES) -> None:
    """Delete all but the ``max_files`` most recently modified exports.

    Only regular files matching ``screened_*.xlsx`` in the user's exports
    directory are considered. Best-effort: any error while listing or
    sorting aborts silently, and a file that cannot be deleted is skipped.
    """
    try:
        candidates = (
            entry
            for entry in get_user_exports_dir(user_id).glob("screened_*.xlsx")
            if entry.is_file()
        )
        newest_first = sorted(candidates, key=lambda entry: entry.stat().st_mtime, reverse=True)
        for stale in newest_first[max_files:]:
            try:
                stale.unlink()
            except Exception:
                pass
    except Exception:
        return
def persist_downloadable_dataframe(user_id: str, df: pd.DataFrame) -> str | None:
    """Write ``df`` as a timestamped export and return its resolved path.

    The file is named ``screened_<UTC timestamp>.xlsx`` inside the user's
    exports directory. Returns None when writing fails or the resulting file
    is missing or empty. Older exports are pruned after a successful write.
    """
    _ensure_session_dirs(user_id)
    stamp = datetime.utcnow().strftime('%Y%m%d_%H%M%S_%f')
    export_path = get_user_exports_dir(user_id) / f"screened_{stamp}.xlsx"
    try:
        df.to_excel(export_path, index=False)
        written_ok = export_path.exists() and export_path.stat().st_size != 0
        if not written_ok:
            return None
        if is_debug_enabled():
            print(
                f"[DEBUG] Export ready: path={export_path.resolve()} size={export_path.stat().st_size} bytes"
            )
        _cleanup_old_exports(user_id)
        return str(export_path.resolve())
    except Exception:
        # Export is optional; callers treat None as "download unavailable".
        return None
def build_inline_download_html(path: str | None) -> str:
if not path:
return "<p>Download unavailable.</p>"
candidate = Path(path)
if not candidate.exists() or not candidate.is_file():
return "<p>Download unavailable: exported file not found.</p>"
try:
raw = candidate.read_bytes()
except Exception:
return "<p>Download unavailable: could not read exported file.</p>"
if len(raw) == 0:
return "<p>Download unavailable: exported file is empty.</p>"
if len(raw) > MAX_INLINE_DOWNLOAD_BYTES:
size_mb = len(raw) / (1024 * 1024)
return (
f"<p>Inline download disabled for large files ({size_mb:.1f} MB). "
"Reduce export size and try again.</p>"
)
b64 = base64.b64encode(raw).decode("ascii")
filename = candidate.name
href = (
"data:application/vnd.openxmlformats-officedocument.spreadsheetml.sheet;base64,"
f"{b64}"
)
return (
"<p><strong>Download:</strong></p>"
f'<a download="{filename}" href="{href}" '
'style="display:inline-block;padding:8px 12px;border:1px solid #888;'
'border-radius:6px;text-decoration:none;">Download Excel</a>'
)
def empty_description_updates() -> list[dict[str, Any]]:
    """Return ``MAX_DESCRIPTION_FIELDS`` updates that hide and clear every description input."""
    hidden_updates: list[dict[str, Any]] = []
    for number in range(1, MAX_DESCRIPTION_FIELDS + 1):
        hidden_updates.append(
            gr.update(label=f"Description {number}", value="", visible=False)
        )
    return hidden_updates
def empty_extracted_state(status: str, *, extracted_state: dict[str, Any] | None = None):
    """Build the "nothing extracted" result tuple carrying ``status`` as its last item.

    Layout: extracted state (or ``{}``), an empty list, the hidden
    extracted-field updates, then "", 0.0, "include", four empty strings and
    finally ``status``. The positions presumably mirror the UI outputs wired
    by the caller, which are not visible in this function.
    """
    leading = (extracted_state or {}, [])
    trailing = ("", 0.0, "include", "", "", "", "", status)
    return (
        *leading,
        *build_empty_extracted_input_updates(),
        *trailing,
    )
def is_missing(value: Any) -> bool:
    """Tell whether a cell value counts as missing.

    Missing means: None / NaN / NA (anything ``pd.isna`` flags) or a string
    that is empty or whitespace-only.

    ``pd.isna`` returns an element-wise array for list-like input, and using
    such an array directly in an ``if`` raises "truth value ... is
    ambiguous". To stay robust, list-like values are reduced explicitly:
    they count as missing only when they are non-empty and every element is
    missing.
    """
    import numpy as np

    if isinstance(value, str):
        return value.strip() == ""

    flags = np.asarray(pd.isna(value))
    # Scalars give a 0-d array (size 1); containers give one flag per element.
    return flags.size > 0 and bool(flags.all())
def parse_csv_columns(raw_text: str, available_columns: list[str]) -> list[str]:
    """Split a comma-separated column list and keep only names present in ``available_columns``.

    Order follows the input text; duplicates in the input are preserved.
    """
    if not raw_text or not raw_text.strip():
        return []

    matched: list[str] = []
    for piece in raw_text.split(","):
        name = piece.strip()
        if name and name in available_columns:
            matched.append(name)
    return matched
def choose_url_column(df: pd.DataFrame, preferred: str | None = None) -> str:
if preferred and preferred in df.columns:
return preferred
for col in df.columns:
col_l = str(col).lower()
if "url" in col_l or "link" in col_l:
return col
return str(df.columns[0])
def parse_criteria_file(file_obj) -> dict[str, Any] | None:
if file_obj is None:
return None
path = resolve_uploaded_path(file_obj)
if str(path).strip() == "":
return None
if not path.exists() or not path.is_file():
raise ValueError("Criteria file not found.")
try:
raw = path.read_text(encoding="utf-8")
except Exception as exc:
raise ValueError(f"Failed reading criteria file: {exc}") from exc
try:
parsed = yaml.safe_load(raw)
except Exception as exc:
raise ValueError(f"Invalid YAML in criteria file: {exc}") from exc
if not isinstance(parsed, dict):
raise ValueError("criteria.yml must contain a top-level mapping/object.")
topic = str(parsed.get("topic", "")).strip()
inclusion = parsed.get("inclusion_criteria", [])
exclusion = parsed.get("exclusion_criteria", [])
notes = str(parsed.get("notes", "")).strip()
if not topic:
raise ValueError("criteria.yml requires a non-empty 'topic'.")
if not isinstance(inclusion, list):
raise ValueError("'inclusion_criteria' must be a list of strings.")
if not isinstance(exclusion, list):
raise ValueError("'exclusion_criteria' must be a list of strings.")
inclusion_clean = [str(item).strip() for item in inclusion if str(item).strip()]
exclusion_clean = [str(item).strip() for item in exclusion if str(item).strip()]
return {
"topic": topic,
"inclusion_criteria": inclusion_clean,
"exclusion_criteria": exclusion_clean,
"notes": notes,
}
def parse_labels_csv(raw: Any) -> list[str]:
    """Split a comma-separated label cell into unique, trimmed labels (order kept)."""
    if raw is None or pd.isna(raw):
        return []

    text = str(raw).strip()
    if not text:
        return []

    unique_labels: dict[str, None] = {}
    for piece in text.split(","):
        label = piece.strip()
        if label:
            unique_labels.setdefault(label, None)
    return list(unique_labels)
def build_default_descriptions(columns: list[str]) -> dict[str, str]:
    """Map every column name to a generic extraction instruction."""
    defaults: dict[str, str] = {}
    for col in columns:
        defaults[col] = f"Extract the value for '{col}' from the article text."
    return defaults
def build_description_values_from_inputs(
    description_columns: list[str],
    description_values: list[str],
    target_columns: list[str],
) -> dict[str, str]:
    """Combine default descriptions with the user-entered ones.

    Starts from the default description of every target column and overrides
    it with the positionally matching, non-blank user value when the column
    name is one of the targets. Pairing stops at the shorter of the two
    input lists.
    """
    resolved = build_default_descriptions(target_columns)
    if not isinstance(description_columns, list):
        description_columns = []

    for column, user_text in zip(description_columns, description_values):
        column_name = str(column).strip()
        description = str(user_text).strip()
        if description and column_name in resolved:
            resolved[column_name] = description
    return resolved
def build_description_input_updates(
    target_columns: list[str],
    previous_description_columns: list[str],
    previous_description_values: list[str],
) -> tuple[list[str], list[dict[str, Any]]]:
    """Build updates for the fixed pool of description inputs.

    Returns the active columns (at most ``MAX_DESCRIPTION_FIELDS``) and one
    update per input slot: visible and labelled for active columns, hidden
    and cleared for the remaining slots. Previously entered descriptions are
    carried over where the column is still a target.
    """
    description_map = build_description_values_from_inputs(
        previous_description_columns,
        previous_description_values,
        target_columns,
    )
    active_columns = target_columns[:MAX_DESCRIPTION_FIELDS]

    updates: list[dict[str, Any]] = [
        gr.update(
            label=f"Description: {col}",
            value=description_map.get(col, ""),
            visible=True,
        )
        for col in active_columns
    ]
    # Pad with hidden placeholders so every slot receives an update.
    for slot in range(len(active_columns), MAX_DESCRIPTION_FIELDS):
        updates.append(
            gr.update(
                label=f"Description {slot + 1}",
                value="",
                visible=False,
            )
        )
    return active_columns, updates
def build_empty_extracted_input_updates() -> list[dict[str, Any]]:
    """Return ``MAX_EXTRACT_FIELDS`` updates that hide and clear every extracted-field input."""
    hidden_updates: list[dict[str, Any]] = []
    for number in range(1, MAX_EXTRACT_FIELDS + 1):
        hidden_updates.append(
            gr.update(label=f"Extracted field {number}", value="", visible=False)
        )
    return hidden_updates
def build_extracted_input_updates(
    target_columns: list[str],
    field_values: dict[str, Any],
) -> tuple[list[str], list[dict[str, Any]]]:
    """Build updates for the fixed pool of extracted-value inputs.

    Returns the active columns (at most ``MAX_EXTRACT_FIELDS``) and one
    update per slot: visible with the stringified field value for active
    columns, hidden and cleared for the rest.
    """
    active_columns = target_columns[:MAX_EXTRACT_FIELDS]

    updates: list[dict[str, Any]] = [
        gr.update(
            label=f"Extracted: {col}",
            value=str(field_values.get(col, "")),
            visible=True,
        )
        for col in active_columns
    ]
    # Pad with hidden placeholders so every slot receives an update.
    for slot in range(len(active_columns), MAX_EXTRACT_FIELDS):
        updates.append(
            gr.update(
                label=f"Extracted field {slot + 1}",
                value="",
                visible=False,
            )
        )
    return active_columns, updates
def build_extracted_values_from_inputs(
    extracted_columns: list[str],
    extracted_values: list[str],
) -> dict[str, str]:
    """Pair column names with the values typed into the extracted-field inputs.

    Pairing is positional and stops at the shorter sequence. None values
    become empty strings. A non-list ``extracted_columns`` yields ``{}``.
    """
    if not isinstance(extracted_columns, list):
        return {}
    return {
        str(column): ("" if value is None else str(value))
        for column, value in zip(extracted_columns, extracted_values)
    }
def coerce_fields_from_llm(parsed: dict[str, Any], column_names: list[str]) -> dict[str, str]:
    """Normalize the extracted values of an LLM response to ``{column: str}``.

    Lookup order:
    1. ``parsed["fields"]`` when it is a dict, or a JSON string encoding one.
    2. Requested columns found directly at the top level of ``parsed``.
    3. Top-level keys that match a requested column after ``normalize_key``
       (ignoring the reserved keys fields/evidence/confidence/decision).

    Every requested column is present in the result; absent values become
    "". A JSON ``null`` value is also mapped to "" instead of the literal
    string "None", so that an "unknown" answer is not stored as text and
    later mistaken for a filled-in cell.
    """
    raw_fields = parsed.get("fields", {})
    fields_dict: dict[str, Any] = {}
    if isinstance(raw_fields, dict):
        fields_dict = raw_fields
    elif isinstance(raw_fields, str):
        try:
            maybe_obj = json.loads(raw_fields)
        except json.JSONDecodeError:
            maybe_obj = None
        if isinstance(maybe_obj, dict):
            fields_dict = maybe_obj

    # Fallback: model may place extracted values at top level.
    if not fields_dict:
        fields_dict = {col: parsed.get(col, "") for col in column_names if col in parsed}

    # Fuzzy fallback: tolerate minor key format differences.
    if not fields_dict:
        normalized_requested = {normalize_key(col): col for col in column_names}
        for key, value in parsed.items():
            if key in {"fields", "evidence", "confidence", "decision"}:
                continue
            norm = normalize_key(str(key))
            if norm in normalized_requested:
                fields_dict[normalized_requested[norm]] = value

    def _as_text(value: Any) -> str:
        return "" if value is None else str(value)

    return {col: _as_text(fields_dict.get(col)) for col in column_names}
def _parse_structured_text(raw: str) -> Any:
txt = raw.strip()
if not txt:
return None
try:
return json.loads(txt)
except Exception:
pass
try:
return ast.literal_eval(txt)
except Exception:
return None
def _coerce_evidence_items(raw: Any) -> list[dict[str, str]]:
items: list[dict[str, str]] = []
if raw is None:
return items
if isinstance(raw, str):
parsed = _parse_structured_text(raw)
if parsed is None:
return items
raw = parsed
if isinstance(raw, dict):
# Supports {"FIELD": "..."} and {"FIELD": ["...", "..."]}
for field, snippet_value in raw.items():
if isinstance(snippet_value, list):
for s in snippet_value:
snippet = str(s).strip()
if snippet:
items.append({"field": str(field).strip(), "snippet": snippet})
else:
snippet = str(snippet_value).strip()
if snippet:
items.append({"field": str(field).strip(), "snippet": snippet})
return items
if isinstance(raw, list):
for item in raw:
if isinstance(item, dict):
field = str(item.get("field", "")).strip()
snippet = str(item.get("snippet", "")).strip()
if field and snippet:
items.append({"field": field, "snippet": snippet})
elif isinstance(item, str):
# String entries without explicit field are ignored here.
continue
return items
def normalize_evidence_snippets(parsed: dict[str, Any], column_names: list[str], fields: dict[str, str]) -> list[dict[str, str]]:
    """Collect de-duplicated evidence snippets for the requested columns.

    Sources are tried in order and the first one that yields anything wins:
    1. ``parsed["evidence_snippets"]`` (primary),
    2. ``parsed["evidence"]`` interpreted as structured data (legacy or
       malformed payload),
    3. ``parsed["evidence"]`` as one plain string, attached to the first
       requested column with a non-blank value in ``fields``, else the first
       requested column, else "unknown".
    """
    gathered: list[dict[str, str]] = []

    for source_key, fallback in (("evidence_snippets", []), ("evidence", "")):
        if gathered:
            break
        for entry in _coerce_evidence_items(parsed.get(source_key, fallback)):
            if entry["field"] in column_names and entry["snippet"]:
                gathered.append({"field": entry["field"], "snippet": entry["snippet"]})

    # Fallback for legacy single-string evidence.
    if not gathered:
        legacy_text = str(parsed.get("evidence", "")).strip()
        if legacy_text:
            filled_columns = [col for col in column_names if str(fields.get(col, "")).strip()]
            if filled_columns:
                owner = filled_columns[0]
            elif column_names:
                owner = column_names[0]
            else:
                owner = "unknown"
            gathered.append({"field": owner, "snippet": legacy_text})

    # De-duplicate exact repeats while preserving order.
    unique: list[dict[str, str]] = []
    already_seen: set[tuple[str, str]] = set()
    for entry in gathered:
        identity = (entry["field"], entry["snippet"])
        if identity not in already_seen:
            already_seen.add(identity)
            unique.append(entry)
    return unique
def format_evidence_for_ui(snippets: list[dict[str, str]]) -> str:
    """Render snippets as one "- field: snippet" bullet per line ("" when empty)."""
    if not snippets:
        return ""
    bullet_lines = []
    for entry in snippets:
        bullet_lines.append(f"- {entry['field']}: {entry['snippet']}")
    return "\n".join(bullet_lines)
def detect_incomplete_rows(df: pd.DataFrame, target_columns: list[str]) -> list[int]:
    """Return the index labels (as ints) of rows missing at least one target value."""
    incomplete: list[int] = []
    for label, row in df.iterrows():
        for col in target_columns:
            if is_missing(row.get(col)):
                incomplete.append(int(label))
                break
    return incomplete
def get_missing_columns(df: pd.DataFrame, row_index: int, target_columns: list[str]) -> list[str]:
    """List the target columns whose value is missing in the row labelled ``row_index``."""
    record = df.loc[row_index]
    missing: list[str] = []
    for col in target_columns:
        if is_missing(record.get(col)):
            missing.append(col)
    return missing
def get_next_row(df: pd.DataFrame, incomplete_rows: list[int], position: int, target_columns: list[str]) -> tuple[int, int | None]:
while position < len(incomplete_rows):
row_idx = incomplete_rows[position]
if len(get_missing_columns(df, row_idx, target_columns)) > 0:
return position, row_idx
position += 1
return position, None
def _find_first_column(df: pd.DataFrame, candidates: list[str]) -> str | None:
normalized = {str(col).lower().strip(): str(col) for col in df.columns}
for candidate in candidates:
if candidate in normalized:
return normalized[candidate]
for col in df.columns:
col_l = str(col).lower()
for candidate in candidates:
if candidate in col_l:
return str(col)
return None
def article_details_markdown(df: pd.DataFrame, row_index: int) -> str:
    """Render the title and author(s) of a row as Markdown.

    Columns are located through ``_find_first_column``; a missing column or
    an empty/NaN cell is shown as "Unknown".
    """

    def _cell_text(column: str | None) -> str:
        # Empty string when the column does not exist or the cell is NaN.
        if column is None:
            return ""
        cell = df.loc[row_index, column]
        return "" if pd.isna(cell) else str(cell).strip()

    title_col = _find_first_column(df, ["title", "article title", "paper title", "study title"])
    author_col = _find_first_column(df, ["author", "authors", "first author"])

    title_value = _cell_text(title_col) or "Unknown"
    author_value = _cell_text(author_col) or "Unknown"
    return f"**Title:** {title_value}\n\n**Author(s):** {author_value}"
def render_current_row(
df: pd.DataFrame | None,
incomplete_rows: list[int] | None,
position: int,
url_column: str,
target_columns: list[str],
) -> tuple[int, int | None, str, str, str, str, str]:
if df is None or incomplete_rows is None or len(incomplete_rows) == 0:
return (
position,
None,
"",
"No rows loaded.",
"",
"",
"",
)
next_position, row_idx = get_next_row(df, incomplete_rows, position, target_columns)
if row_idx is None:
return (
next_position,
None,
"",
"All target rows are complete.",
"",
"",
f"Processed {len(incomplete_rows)} / {len(incomplete_rows)} rows.",
)
article_md = article_details_markdown(df, row_idx)
url_value = str(df.loc[row_idx, url_column]) if url_column in df.columns else ""
url_md = f"[Open article URL]({url_value})" if url_value else "URL not available"
missing_md = ""
current_md = ""
counter = f"Row {next_position + 1} of {len(incomplete_rows)} (index: {row_idx})"
return next_position, row_idx, article_md, url_md, current_md, missing_md, counter
def _parse_target_columns_for_ui(
target_columns_text: str,
url_column_text: str,
df: pd.DataFrame | None,
) -> list[str]:
raw_requested = [item.strip() for item in (target_columns_text or "").split(",") if item.strip()]
deduped_requested = list(dict.fromkeys(raw_requested))
if df is None or df.empty:
return deduped_requested
available_columns = [str(c) for c in df.columns]
url_column = choose_url_column(df, url_column_text.strip() if url_column_text else None)
selected_target_columns = parse_csv_columns(target_columns_text, available_columns)
if not selected_target_columns:
selected_target_columns = [str(c) for c in df.columns if str(c) != url_column]
return selected_target_columns
def refresh_description_inputs(
    target_columns_text: str,
    url_column_text: str,
    df: pd.DataFrame | None,
    description_columns: list[str],
    *description_values: str,
):
    """Rebuild the description inputs after the target-column selection changed.

    Returns a tuple whose first item is the list of active description
    columns, followed by one update per description input slot.
    """
    target_columns = _parse_target_columns_for_ui(target_columns_text, url_column_text, df)
    previous_columns = description_columns if isinstance(description_columns, list) else []
    active_columns, slot_updates = build_description_input_updates(
        target_columns,
        previous_columns,
        list(description_values),
    )
    return (active_columns, *slot_updates)
def load_excel(
    file_obj,
    criteria_file_obj,
    user_id_input: str,
    target_columns_text: str,
    url_column_text: str,
    description_columns: list[str],
    *description_values: str,
    request: gr.Request | None = None,
):
    """Load the uploaded Excel file and initialise the screening session.

    Steps: resolve the user id, read the workbook, pick the URL and target
    columns, detect rows with missing target values, build the UI updates,
    persist the upload / working copy / downloadable export, and store the
    session metadata.

    Returns one long tuple: data frame, incomplete rows, position, current
    row index, target columns, URL column, an empty dict, description
    columns, extracted columns, status text, five display strings, the
    description-input updates, the extracted-input updates, reset values for
    the review widgets, the download HTML and a file-input reset. On failure
    a tuple of the same layout with empty values and the error message as
    status is returned. The order presumably mirrors the outputs wired up by
    the caller, which are not visible in this function.

    Fix in this version: ``resolve_uploaded_path`` signals "no path" with
    ``Path("")``, whose string form is ".", so testing the string for
    emptiness never matched. The check now compares against ``Path("")`` so
    the user sees "Please upload an Excel file." instead of a read error.
    """
    user_id = resolve_user_id(explicit_user_id=user_id_input, request=request)

    def _failure(message: str):
        # Same tuple layout as the success path, with everything reset.
        download_html = build_inline_download_html(None)
        return (
            None,
            [],
            0,
            None,
            [],
            "",
            {},
            [],
            [],
            message,
            "",
            "",
            "",
            "",
            "",
            *empty_description_updates(),
            *build_empty_extracted_input_updates(),
            "",
            0.0,
            "include",
            "",
            "",
            "",
            "",
            download_html,
            gr.update(value=None),
        )

    if file_obj is None:
        return _failure("Please upload an Excel file.")
    try:
        excel_path = resolve_uploaded_path(file_obj)
        if excel_path == Path(""):
            return _failure("Please upload an Excel file.")
        df = pd.read_excel(str(excel_path))
    except Exception as exc:
        return _failure(f"Failed to read Excel: {exc}")
    if df.empty:
        return _failure("Excel file is empty.")

    url_column = choose_url_column(df, url_column_text.strip() if url_column_text else None)
    selected_target_columns = _parse_target_columns_for_ui(target_columns_text, url_column_text, df)
    incomplete_rows = detect_incomplete_rows(df, selected_target_columns)
    normalized_description_columns, description_updates = build_description_input_updates(
        selected_target_columns,
        description_columns if isinstance(description_columns, list) else [],
        list(description_values),
    )
    extracted_columns_for_ui, extracted_updates = build_extracted_input_updates(selected_target_columns, {})
    position, row_idx, article_md, url_md, current_md, missing_md, counter = render_current_row(
        df,
        incomplete_rows,
        0,
        url_column,
        selected_target_columns,
    )

    status = (
        f"Loaded {len(df)} rows. Found {len(incomplete_rows)} rows with missing target values."
        if len(incomplete_rows) > 0
        else "Loaded file, but no incomplete rows were found for the selected target columns."
    )
    if len(selected_target_columns) > MAX_DESCRIPTION_FIELDS:
        status += (
            f" Showing description inputs for the first {MAX_DESCRIPTION_FIELDS} target columns."
        )
    if len(selected_target_columns) > MAX_EXTRACT_FIELDS:
        status += (
            f" Showing extracted inputs for the first {MAX_EXTRACT_FIELDS} target columns."
        )

    description_map = build_description_values_from_inputs(
        description_columns,
        list(description_values),
        selected_target_columns,
    )
    saved_description_values = [description_map.get(col, "") for col in normalized_description_columns]

    # Persist everything needed to resume the session later.
    saved_excel_path = persist_uploaded_file(user_id, file_obj, "uploaded_excel.xlsx")
    working_df_path = persist_dataframe(user_id, df)
    downloadable_path = persist_downloadable_dataframe(user_id, df)
    download_html = build_inline_download_html(downloadable_path)
    if not downloadable_path:
        status += " Download export is currently unavailable; try again after processing a row."
    save_session_meta(
        user_id,
        {
            "target_columns_text": target_columns_text or "",
            "url_column_text": url_column_text or "",
            "description_columns": normalized_description_columns,
            "description_values": saved_description_values,
            "extracted_columns": extracted_columns_for_ui,
            "extracted_values": ["" for _ in extracted_columns_for_ui],
            "evidence": "",
            "confidence": 0.0,
            "decision": "include",
            "criteria_rationale": "",
            "labels_current": "",
            "labels_suggested": "",
            "labels_rationale": "",
            "excel_path": saved_excel_path or "",
            # Keep the previously stored criteria file when none was uploaded now.
            "criteria_path": persist_uploaded_file(user_id, criteria_file_obj, "criteria.yml")
            or load_session_meta(user_id).get("criteria_path", ""),
            "df_path": working_df_path,
            "download_path": downloadable_path or "",
        }
    )
    return (
        df,
        incomplete_rows,
        position,
        row_idx,
        selected_target_columns,
        url_column,
        {},
        normalized_description_columns,
        extracted_columns_for_ui,
        status,
        article_md,
        url_md,
        current_md,
        missing_md,
        counter,
        *description_updates,
        *extracted_updates,
        "",
        0.0,
        "include",
        "",
        "",
        "",
        "",
        download_html,
        gr.update(value=None),
    )
def parse_pdf(file_obj) -> str:
    """Extract the plain text of an uploaded PDF.

    Raises:
        ValueError: when no file (or no resolvable path) was supplied, the
            PDF cannot be opened or read, or no text could be extracted.

    Fix in this version: ``resolve_uploaded_path`` signals "no path" with
    ``Path("")``, whose string form is ".", so testing the string for
    emptiness never matched. The check now compares against ``Path("")`` so
    the caller gets "Please upload a PDF file." rather than an
    "Invalid or unreadable PDF" error.
    """
    if file_obj is None:
        raise ValueError("Please upload a PDF file.")
    path = resolve_uploaded_path(file_obj)
    if path == Path(""):
        raise ValueError("Please upload a PDF file.")
    try:
        with fitz.open(str(path)) as doc:
            text_chunks = [page.get_text("text") for page in doc]
    except Exception as exc:
        raise ValueError(f"Invalid or unreadable PDF: {exc}") from exc
    text = "\n".join(text_chunks).strip()
    if not text:
        raise ValueError("No text extracted from PDF. OCR fallback is not implemented in this MVP.")
    return text
def load_prompt_file(path: Path) -> str:
    """Read a UTF-8 prompt file and return its content without surrounding whitespace.

    Raises:
        RuntimeError: when the file does not exist or cannot be read.
    """
    try:
        content = path.read_text(encoding="utf-8")
    except FileNotFoundError as exc:
        raise RuntimeError(f"Prompt file not found: {path}") from exc
    except Exception as exc:
        raise RuntimeError(f"Failed to load prompt file {path}: {exc}") from exc
    return content.strip()
def build_user_prompt(text: str, column_names: list[str], column_descriptions: dict[str, str]) -> dict[str, Any]:
    """Render the extraction user-prompt template into a dict.

    The template file is filled via ``string.Template`` with JSON-encoded
    values (field schema, requested fields, per-column descriptions and the
    article text) and the rendered result is parsed as JSON.

    Raises:
        RuntimeError: when the template cannot be loaded or the rendered
            text is not valid JSON.
    """
    per_column_descriptions = {col: column_descriptions.get(col, "") for col in column_names}
    template = Template(load_prompt_file(USER_PROMPT_TEMPLATE_PATH))
    substitutions = {
        "fields_schema_json": json.dumps({col: "string" for col in column_names}),
        "fill_only_requested_fields_json": json.dumps(column_names),
        "column_descriptions_json": json.dumps(per_column_descriptions),
        "article_text": json.dumps(text),
    }
    rendered = template.substitute(**substitutions)
    try:
        return json.loads(rendered)
    except json.JSONDecodeError as exc:
        raise RuntimeError(f"User prompt template rendered invalid JSON: {exc}") from exc
def build_criteria_user_prompt(text: str, criteria: dict[str, Any]) -> dict[str, Any]:
    """Render the criteria user-prompt template into a dict.

    Topic, inclusion/exclusion criteria, notes and the article text are
    JSON-encoded, substituted into the template, and the rendered result is
    parsed as JSON.

    Raises:
        RuntimeError: when the template cannot be loaded or the rendered
            text is not valid JSON.
    """
    template = Template(load_prompt_file(USER_CRITERIA_TEMPLATE_PATH))
    substitutions = {
        "topic_json": json.dumps(criteria.get("topic", "")),
        "inclusion_criteria_json": json.dumps(criteria.get("inclusion_criteria", [])),
        "exclusion_criteria_json": json.dumps(criteria.get("exclusion_criteria", [])),
        "notes_json": json.dumps(criteria.get("notes", "")),
        "article_text": json.dumps(text),
    }
    rendered = template.substitute(**substitutions)
    try:
        return json.loads(rendered)
    except json.JSONDecodeError as exc:
        raise RuntimeError(f"Criteria user prompt rendered invalid JSON: {exc}") from exc
def build_labels_user_prompt(text: str, current_labels: list[str]) -> dict[str, Any]:
    """Render the labels user-prompt template into a dict.

    The current labels and the article text are JSON-encoded, substituted
    into the template, and the rendered result is parsed as JSON.

    Raises:
        RuntimeError: when the template cannot be loaded or the rendered
            text is not valid JSON.
    """
    template = Template(load_prompt_file(USER_LABELS_TEMPLATE_PATH))
    substitutions = {
        "current_labels_json": json.dumps(current_labels),
        "article_text": json.dumps(text),
    }
    rendered = template.substitute(**substitutions)
    try:
        return json.loads(rendered)
    except json.JSONDecodeError as exc:
        raise RuntimeError(f"Labels user prompt rendered invalid JSON: {exc}") from exc
def _azure_client() -> AzureOpenAI:
    """Create an Azure OpenAI client from environment variables.

    Reads AZURE_OPENAI_ENDPOINT, AZURE_OPENAI_API_KEY and
    AZURE_OPENAI_API_VERSION (default "2024-08-01-preview").

    Raises:
        RuntimeError: when the endpoint or the API key is not set.
    """
    endpoint = os.getenv("AZURE_OPENAI_ENDPOINT")
    api_key = os.getenv("AZURE_OPENAI_API_KEY")
    api_version = os.getenv("AZURE_OPENAI_API_VERSION", "2024-08-01-preview")

    if not (endpoint and api_key):
        raise RuntimeError("AZURE_OPENAI_ENDPOINT and AZURE_OPENAI_API_KEY must be set.")

    client_settings = {
        "azure_endpoint": endpoint,
        "api_key": api_key,
        "api_version": api_version,
    }
    return AzureOpenAI(**client_settings)
def _call_llm_json(system_prompt: str, user_prompt: dict[str, Any]) -> dict[str, Any]:
    """Send one chat-completion request and return the decoded JSON reply.

    The deployment comes from AZURE_OPENAI_DEPLOYMENT (default
    "gpt-4.1-mini") and the timeout from AZURE_OPENAI_TIMEOUT_SECONDS
    (default 90). The request uses temperature 0 and asks for a JSON object
    response. The user prompt is sent as a JSON string.

    Raises:
        RuntimeError: when the request fails, the reply is empty, or the
            reply is not valid JSON.
    """
    model_name = os.getenv("AZURE_OPENAI_DEPLOYMENT", "gpt-4.1-mini")
    llm = _azure_client()
    timeout_seconds = float(os.getenv("AZURE_OPENAI_TIMEOUT_SECONDS", "90"))

    try:
        response = llm.chat.completions.create(
            model=model_name,
            temperature=0,
            response_format={"type": "json_object"},
            timeout=timeout_seconds,
            messages=[
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": json.dumps(user_prompt)},
            ],
        )
    except Exception as exc:
        raise RuntimeError(f"Azure OpenAI request failed: {exc}") from exc

    if response.choices:
        reply_text = response.choices[0].message.content
    else:
        reply_text = ""
    if not reply_text:
        raise RuntimeError("LLM returned empty content.")

    try:
        return json.loads(reply_text)
    except json.JSONDecodeError as exc:
        raise RuntimeError(f"LLM output is not valid JSON: {exc}") from exc
def extract_with_llm(text: str, column_names: list[str], column_descriptions: dict[str, str]) -> dict[str, Any]:
    """Run the extraction prompt and normalize the model's answer.

    Returns a dict with ``fields`` (column -> string), ``evidence`` (UI
    text), ``evidence_snippets`` (list of field/snippet dicts),
    ``confidence`` (clamped to 0..1, 0.0 when not numeric) and ``decision``
    (one of ``VALID_DECISIONS``, defaulting to "include").
    """
    parsed = _call_llm_json(
        load_prompt_file(SYSTEM_PROMPT_PATH),
        build_user_prompt(text, column_names, column_descriptions),
    )

    normalized_fields = coerce_fields_from_llm(parsed, column_names)
    snippets = normalize_evidence_snippets(parsed, column_names, normalized_fields)
    evidence_text = format_evidence_for_ui(snippets)

    if is_debug_enabled():
        print("[DEBUG] LLM parsed response:", parsed)
        print("[DEBUG] Parsed keys:", list(parsed.keys()))
        print("[DEBUG] Requested columns:", column_names)
        print("[DEBUG] Extracted fields:", normalized_fields)

    try:
        confidence = float(parsed.get("confidence", 0))
    except Exception:
        confidence = 0.0
    confidence = min(max(confidence, 0.0), 1.0)

    decision = str(parsed.get("decision", "include")).strip().lower()
    if decision not in VALID_DECISIONS:
        # Unknown decisions fall back to the permissive default.
        decision = "include"

    return {
        "fields": normalized_fields,
        "evidence": evidence_text,
        "evidence_snippets": snippets,
        "confidence": confidence,
        "decision": decision,
    }
def evaluate_with_criteria_llm(text: str, criteria: dict[str, Any]) -> dict[str, Any]:
    """Ask the model for an include/exclude decision against the criteria.

    Returns a dict with ``decision`` (one of ``VALID_DECISIONS``, defaulting
    to "include"), ``confidence`` (clamped to 0..1, 0.0 when not numeric)
    and ``rationale`` (trimmed string).
    """
    parsed = _call_llm_json(
        load_prompt_file(SYSTEM_CRITERIA_PROMPT_PATH),
        build_criteria_user_prompt(text, criteria),
    )

    try:
        confidence = float(parsed.get("confidence", 0))
    except Exception:
        confidence = 0.0
    confidence = min(max(confidence, 0.0), 1.0)

    decision = str(parsed.get("decision", "include")).strip().lower()
    if decision not in VALID_DECISIONS:
        decision = "include"

    return {
        "decision": decision,
        "confidence": confidence,
        "rationale": str(parsed.get("rationale", "")).strip(),
    }
def validate_rayyan_labels_llm(text: str, current_labels: list[str]) -> dict[str, Any]:
    """Ask the model to review the current labels and return its suggestion.

    The suggestion is de-duplicated and trimmed. Switch-only behaviour is
    enforced: when current labels exist and the suggestion has a different
    number of labels, the current labels are kept. An empty suggestion also
    falls back to the current labels. The rationale is blanked whenever the
    final suggestion equals the current labels.
    """
    parsed = _call_llm_json(
        load_prompt_file(SYSTEM_LABELS_PROMPT_PATH),
        build_labels_user_prompt(text, current_labels),
    )

    raw_suggestion = parsed.get("suggested_labels", [])
    rationale = str(parsed.get("rationale", "")).strip()
    if not isinstance(raw_suggestion, list):
        raw_suggestion = []

    suggested_labels = list(
        dict.fromkeys([str(item).strip() for item in raw_suggestion if str(item).strip()])
    )

    # Keep switch-only behavior: same label count as original when labels exist.
    count_mismatch = bool(current_labels) and len(suggested_labels) != len(current_labels)
    if count_mismatch or not suggested_labels:
        suggested_labels = current_labels[:]
    if count_mismatch or suggested_labels == current_labels:
        rationale = ""

    return {
        "current_labels": current_labels,
        "suggested_labels": suggested_labels,
        "rationale": rationale,
    }
def labels_to_text(labels: list[str]) -> str:
    """Join labels with ", " (empty string for no labels)."""
    return ", ".join(labels) if labels else ""
def update_row(df: pd.DataFrame, row_index: int, values: dict[str, Any]) -> pd.DataFrame:
    """Write ``values`` into the row labelled ``row_index`` and return the same frame.

    Keys that are not existing columns are skipped. The frame is modified in
    place.
    """
    for column, new_value in values.items():
        if column not in df.columns:
            continue
        try:
            df.at[row_index, column] = new_value
        except (TypeError, ValueError):
            # Some Excel columns are inferred as float64 when mostly empty.
            # Upcast that column so text values from extraction can be stored.
            df[column] = df[column].astype("object")
            df.at[row_index, column] = new_value
    return df
def process_pdf_and_extract(
    pdf_file,
    criteria_file,
    user_id_input: str,
    df: pd.DataFrame,
    current_row_index: int | None,
    target_columns: list[str],
    description_columns: list[str],
    *description_values: str,
    progress=gr.Progress(),
    request: gr.Request | None = None,
):
    """Parse the uploaded PDF and run the extraction, criteria and label workflows.

    The three LLM workflows run concurrently on a thread pool. Extraction is
    mandatory (its failure aborts processing); criteria screening and RAYYAN
    label validation are best-effort and only add a warning to the status text.

    Args:
        pdf_file: Uploaded PDF handle passed to ``parse_pdf``.
        criteria_file: Optional uploaded criteria file; ``None`` skips screening.
        user_id_input: Explicit user id, resolved together with ``request``.
        df: Loaded dataset, or ``None`` when nothing has been loaded yet.
        current_row_index: Index label of the row being screened.
        target_columns: Columns considered when looking for missing fields.
        description_columns: Column names matching ``description_values``.
        *description_values: Free-text field descriptions from the UI.
        progress: Gradio progress tracker.
        request: Gradio request, used for user resolution.

    Returns:
        On success a tuple laid out as ``(result, extracted_columns,
        *extracted_updates, evidence, confidence, decision, criteria_rationale,
        labels_current, labels_suggested, labels_rationale, status)``.
        Otherwise whatever ``empty_extracted_state(message)`` returns.

    Environment:
        ``WORKFLOW_TIMEOUT_SECONDS`` (default ``120``) bounds the wait on each
        workflow result.
    """
    user_id = resolve_user_id(explicit_user_id=user_id_input, request=request)
    if df is None or current_row_index is None:
        return empty_extracted_state("Load Excel and start screening first.")

    def describe_failure(exc: BaseException) -> str:
        # Timeout exceptions stringify to "", which would leave a blank message.
        return str(exc) or type(exc).__name__

    try:
        debug_log("Process PDF started", {"row_index": current_row_index})
        progress(0.15, desc="Extracting text from PDF")
        text = parse_pdf(pdf_file)
        criteria = parse_criteria_file(criteria_file) if criteria_file is not None else None
        missing_columns = get_missing_columns(df, current_row_index, target_columns)
        if len(missing_columns) == 0:
            return empty_extracted_state("Current row has no missing target fields.")
        descriptions = build_description_values_from_inputs(
            description_columns,
            list(description_values),
            missing_columns,
        )
        labels_column = "RAYYAN_Labels" if "RAYYAN_Labels" in df.columns else None
        current_labels = parse_labels_csv(df.loc[current_row_index, labels_column]) if labels_column else []

        progress(0.50, desc="Running parallel LLM workflows")
        workflow_timeout = float(os.getenv("WORKFLOW_TIMEOUT_SECONDS", "120"))
        warnings: list[str] = []

        # The executor is managed by hand rather than with a ``with`` block:
        # leaving a ``with`` block calls shutdown(wait=True), which would block
        # until every worker finishes and so defeat the timeouts below.
        executor = ThreadPoolExecutor(max_workers=3)
        try:
            extraction_future = executor.submit(extract_with_llm, text, missing_columns, descriptions)
            criteria_future = (
                executor.submit(evaluate_with_criteria_llm, text, criteria)
                if criteria is not None
                else None
            )
            labels_future = executor.submit(validate_rayyan_labels_llm, text, current_labels)

            try:
                result = extraction_future.result(timeout=workflow_timeout)
            except Exception as exc:
                raise RuntimeError(f"Extraction workflow failed: {describe_failure(exc)}") from exc

            criteria_result = None
            if criteria_future is not None:
                try:
                    criteria_result = criteria_future.result(timeout=workflow_timeout)
                except Exception as exc:
                    warnings.append(f"Criteria workflow failed: {describe_failure(exc)}")
                    debug_log("Criteria workflow failed", repr(exc))

            # Default used when the labels workflow fails: keep labels unchanged.
            labels_result = {
                "current_labels": current_labels,
                "suggested_labels": current_labels,
                "rationale": "",
            }
            try:
                labels_result = labels_future.result(timeout=workflow_timeout)
            except Exception as exc:
                warnings.append(f"RAYYAN labels workflow failed: {describe_failure(exc)}")
                debug_log("Labels workflow failed", repr(exc))
        finally:
            # Do not wait for stragglers; drop anything that has not started.
            executor.shutdown(wait=False, cancel_futures=True)

        # Criteria screening, when available, overrides the extraction's verdict.
        criteria_rationale_ui = ""
        if criteria_result is not None:
            result["decision"] = criteria_result["decision"]
            result["confidence"] = criteria_result["confidence"]
            criteria_rationale_ui = criteria_result.get("rationale", "") or ""

        labels_current_ui = labels_to_text(labels_result.get("current_labels", []))
        labels_suggested_ui = labels_to_text(labels_result.get("suggested_labels", []))
        labels_rationale_ui = str(labels_result.get("rationale", "")).strip()

        extracted_columns, extracted_updates = build_extracted_input_updates(
            missing_columns,
            result["fields"],
        )

        extraction_status = "Extraction completed. Review and Accept/Edit/Reject."
        if criteria is None:
            extraction_status = (
                "Extraction completed without criteria.yml; confidence/decision are based on extraction output. "
                "Upload criteria.yml to override them with criteria screening."
            )
        if extracted_columns and all(str(result["fields"].get(col, "")).strip() == "" for col in extracted_columns):
            extraction_status = (
                "Extraction completed, but all extracted fields are empty. "
                "Check column descriptions/PDF content. Enable APP_DEBUG=1 to inspect raw model output."
            )
        if warnings:
            extraction_status = f"{extraction_status} Warnings: {' | '.join(warnings)}"

        final_evidence_text = result["evidence"]
        result["labels_current"] = labels_current_ui
        result["labels_suggested"] = labels_suggested_ui
        result["labels_rationale"] = labels_rationale_ui
        result["criteria_rationale"] = criteria_rationale_ui

        description_values_list = list(description_values)
        saved_description_columns = description_columns if isinstance(description_columns, list) else []
        save_session_meta(
            user_id,
            {
                "description_columns": saved_description_columns,
                "description_values": description_values_list[: len(saved_description_columns)],
                "extracted_columns": extracted_columns,
                "extracted_values": [str(result["fields"].get(col, "")) for col in extracted_columns],
                "evidence": final_evidence_text,
                "confidence": float(result["confidence"]),
                "decision": result["decision"],
                "criteria_rationale": criteria_rationale_ui,
                "labels_current": labels_current_ui,
                "labels_suggested": labels_suggested_ui,
                "labels_rationale": labels_rationale_ui,
                # Keep the previously stored path when this upload could not be persisted.
                "criteria_path": persist_uploaded_file(user_id, criteria_file, "criteria.yml")
                or load_session_meta(user_id).get("criteria_path", ""),
                "pdf_path": persist_uploaded_file(user_id, pdf_file, "uploaded_pdf.pdf")
                or load_session_meta(user_id).get("pdf_path", ""),
            }
        )
        progress(1.0, desc="Done")
        debug_log("Process PDF completed", {"warnings": warnings, "decision": result["decision"]})
        return (
            result,
            extracted_columns,
            *extracted_updates,
            final_evidence_text,
            result["confidence"],
            result["decision"],
            criteria_rationale_ui,
            labels_current_ui,
            labels_suggested_ui,
            labels_rationale_ui,
            extraction_status,
        )
    except Exception as exc:
        # UI boundary: report the failure in the status text instead of raising.
        debug_log("Process PDF failed", repr(exc))
        return empty_extracted_state(f"Processing failed: {exc}")
def accept_extraction(
    extracted_columns: list[str],
    user_id_input: str,
    df: pd.DataFrame,
    current_row_index: int | None,
    incomplete_rows: list[int],
    position: int,
    url_column: str,
    target_columns: list[str],
    *extracted_values: str,
    request: gr.Request | None = None,
):
    """Store the reviewed values in the current row and move to the next one.

    The edited textbox values are written into ``df``, the next position is
    rendered, the export and session metadata are refreshed, and the
    extraction panel is reset.

    Returns:
        A tuple ordered as ``(df, position, row_index, extracted_state,
        article_md, url_md, current_md, missing_md, counter, status,
        extracted_columns, *extracted_field_updates, evidence, confidence,
        decision, criteria_rationale, labels_current, labels_suggested,
        labels_rationale, download_html, pdf_update)``.
    """
    user_id = resolve_user_id(explicit_user_id=user_id_input, request=request)

    # Values that reset every widget of the extraction panel, in output order.
    cleared_panel = (
        *build_empty_extracted_input_updates(),
        "",
        0.0,
        "include",
        "",
        "",
        "",
        "",
    )

    if df is None or current_row_index is None:
        return (
            df,
            position,
            current_row_index,
            {},
            "",
            "",
            "",
            "",
            "",
            "Nothing to accept.",
            [],
            *cleared_panel,
            build_inline_download_html(None),
            gr.update(value=None),
        )

    accepted_fields = build_extracted_values_from_inputs(extracted_columns, list(extracted_values))
    df = update_row(df, current_row_index, accepted_fields)

    rendered = render_current_row(
        df,
        incomplete_rows,
        position + 1,
        url_column,
        target_columns,
    )
    next_position, next_row, article_md, url_md, current_md, missing_md, counter = rendered

    downloadable_path = persist_downloadable_dataframe(user_id, df)
    download_html = build_inline_download_html(downloadable_path)
    if downloadable_path:
        status = "Row updated and accepted."
    else:
        status = "Row updated and accepted. Download export could not be refreshed."

    session_update = {
        "df_path": persist_dataframe(user_id, df),
        "extracted_columns": [],
        "extracted_values": [],
        "evidence": "",
        "confidence": 0.0,
        "decision": "include",
        "criteria_rationale": "",
        "labels_current": "",
        "labels_suggested": "",
        "labels_rationale": "",
        "position": next_position,
        "current_row_index": next_row,
        # Fall back to the previously stored export when this one failed.
        "download_path": downloadable_path or load_session_meta(user_id).get("download_path", ""),
    }
    save_session_meta(user_id, session_update)

    return (
        df,
        next_position,
        next_row,
        {},
        article_md,
        url_md,
        current_md,
        missing_md,
        counter,
        status,
        [],
        *cleared_panel,
        download_html,
        gr.update(value=None),
    )
def skip_row(
    user_id_input: str,
    df: pd.DataFrame,
    incomplete_rows: list[int],
    position: int,
    url_column: str,
    target_columns: list[str],
    request: gr.Request | None = None,
):
    """Advance to the next incomplete row without writing anything into ``df``.

    The export and session metadata are refreshed and the extraction panel is
    reset, exactly as after an accept.

    Returns:
        A tuple ordered as ``(df, position, row_index, article_md, url_md,
        current_md, missing_md, counter, status, extracted_columns,
        *extracted_field_updates, evidence, confidence, decision,
        criteria_rationale, labels_current, labels_suggested,
        labels_rationale, extracted_state, download_html, pdf_update)``.
    """
    user_id = resolve_user_id(explicit_user_id=user_id_input, request=request)

    # Values that reset the extraction panel plus the extracted-state dict.
    cleared_panel = (
        *build_empty_extracted_input_updates(),
        "",
        0.0,
        "include",
        "",
        "",
        "",
        "",
        {},
    )

    if df is None:
        return (
            df,
            position,
            None,
            "",
            "",
            "",
            "",
            "",
            "No dataset loaded.",
            [],
            *cleared_panel,
            build_inline_download_html(None),
            gr.update(value=None),
        )

    rendered = render_current_row(
        df,
        incomplete_rows,
        position + 1,
        url_column,
        target_columns,
    )
    next_position, next_row, article_md, url_md, current_md, missing_md, counter = rendered

    downloadable_path = persist_downloadable_dataframe(user_id, df)
    download_html = build_inline_download_html(downloadable_path)
    if downloadable_path:
        status = "Row skipped."
    else:
        status = "Row skipped. Existing download may be stale."

    session_update = {
        "df_path": persist_dataframe(user_id, df),
        "extracted_columns": [],
        "extracted_values": [],
        "evidence": "",
        "confidence": 0.0,
        "decision": "include",
        "criteria_rationale": "",
        "labels_current": "",
        "labels_suggested": "",
        "labels_rationale": "",
        "position": next_position,
        "current_row_index": next_row,
        # Fall back to the previously stored export when this one failed.
        "download_path": downloadable_path or load_session_meta(user_id).get("download_path", ""),
    }
    save_session_meta(user_id, session_update)

    return (
        df,
        next_position,
        next_row,
        article_md,
        url_md,
        current_md,
        missing_md,
        counter,
        status,
        [],
        *cleared_panel,
        download_html,
        gr.update(value=None),
    )
def reject_extraction(user_id_input: str, request: gr.Request | None = None):
    """Discard the pending extraction for the resolved user.

    The extraction-related keys of the saved session are reset to their blank
    defaults, and the result of ``empty_extracted_state`` with a rejection
    message is returned for the UI.
    """
    user_id = resolve_user_id(explicit_user_id=user_id_input, request=request)

    reset_fields: dict[str, Any] = {
        "extracted_columns": [],
        "extracted_values": [],
        "evidence": "",
        "confidence": 0.0,
        "decision": "include",
    }
    # The remaining text fields are all cleared to the empty string.
    for text_key in ("criteria_rationale", "labels_current", "labels_suggested", "labels_rationale"):
        reset_fields[text_key] = ""

    save_session_meta(user_id, reset_fields)
    return empty_extracted_state("Extraction rejected. Upload another PDF or try again.")
def restore_saved_session(user_id_input: str, request: gr.Request | None = None):
    """Rebuild the whole UI state from the user's saved session metadata.

    The saved dataset (when its file still exists) is re-loaded through
    ``load_excel``. The pending extraction stored in the session metadata is
    then written over the corresponding slots of that result.

    Args:
        user_id_input: Explicit user id, resolved together with ``request``.
        request: Gradio request, used for user resolution.

    Returns:
        ``(user_id, excel_update, pdf_update, criteria_update,
        target_columns_text, url_column_text, *loaded, pdf_update)`` where
        ``loaded`` is ``load_excel``'s result without its last element and
        with the restored extraction values patched in.
    """
    user_id = resolve_user_id(explicit_user_id=user_id_input, request=request)
    meta = load_session_meta(user_id)

    target_columns_text = str(meta.get("target_columns_text", ""))
    url_column_text = str(meta.get("url_column_text", ""))
    description_columns = meta.get("description_columns", [])
    description_values = meta.get("description_values", [])
    if not isinstance(description_columns, list):
        description_columns = []
    if not isinstance(description_values, list):
        description_values = []

    excel_path = str(meta.get("df_path", "") or meta.get("excel_path", ""))
    pdf_path = str(meta.get("pdf_path", ""))
    criteria_path = str(meta.get("criteria_path", ""))
    download_path = str(meta.get("download_path", ""))
    excel_exists = bool(excel_path) and Path(excel_path).exists()
    pdf_exists = bool(pdf_path) and Path(pdf_path).exists()
    criteria_exists = bool(criteria_path) and Path(criteria_path).exists()
    download_exists = bool(download_path) and Path(download_path).exists()

    # load_excel is handed objects exposing ``.name``; a missing file becomes None.
    loaded = list(
        load_excel(
            SimpleNamespace(name=excel_path) if excel_exists else None,
            SimpleNamespace(name=criteria_path) if criteria_exists else None,
            user_id,
            target_columns_text,
            url_column_text,
            description_columns,
            *description_values,
            request=request,
        )
    )[:-1]  # drop the trailing pdf update; the pdf value is returned explicitly below
    if excel_exists:
        loaded[9] = f"{loaded[9]} Restored saved session."
    else:
        loaded[9] = "No saved session found."

    # Positions inside ``loaded``: 15 base entries, then the description
    # inputs, then the extracted-field inputs, then the seven result widgets.
    base_extracted_start = 15 + MAX_DESCRIPTION_FIELDS
    default_evidence_idx = base_extracted_start + MAX_EXTRACT_FIELDS
    default_confidence_idx = default_evidence_idx + 1
    default_decision_idx = default_evidence_idx + 2
    default_criteria_rationale_idx = default_evidence_idx + 3
    default_labels_current_idx = default_evidence_idx + 4
    default_labels_suggested_idx = default_evidence_idx + 5
    default_labels_rationale_idx = default_evidence_idx + 6

    saved_extracted_columns = meta.get("extracted_columns", [])
    saved_extracted_values = meta.get("extracted_values", [])
    if not isinstance(saved_extracted_columns, list):
        saved_extracted_columns = []
    if not isinstance(saved_extracted_values, list):
        saved_extracted_values = []
    saved_fields = build_extracted_values_from_inputs(saved_extracted_columns, saved_extracted_values)
    restored_extracted_columns, restored_extracted_updates = build_extracted_input_updates(
        saved_extracted_columns,
        saved_fields,
    )
    if not restored_extracted_columns:
        restored_extracted_updates = build_empty_extracted_input_updates()

    # A null or non-numeric persisted confidence must not abort the restore;
    # coerce it and keep it inside the slider's 0..1 range.
    try:
        saved_confidence = float(meta.get("confidence", 0.0))
    except (TypeError, ValueError):
        saved_confidence = 0.0
    if not saved_confidence >= 0.0:  # also true for NaN
        saved_confidence = 0.0
    elif saved_confidence > 1.0:
        saved_confidence = 1.0

    extracted_state = {
        "fields": {col: saved_fields.get(col, "") for col in restored_extracted_columns},
        "evidence": str(meta.get("evidence", "")),
        "confidence": saved_confidence,
        "decision": str(meta.get("decision", "include")),
        "criteria_rationale": str(meta.get("criteria_rationale", "")),
        "labels_current": str(meta.get("labels_current", "")),
        # "label_suggestions" is read as a fallback key for the same value.
        "labels_suggested": str(meta.get("labels_suggested", meta.get("label_suggestions", ""))),
        "labels_rationale": str(meta.get("labels_rationale", "")),
    }

    loaded[6] = extracted_state
    loaded[8] = restored_extracted_columns
    loaded[base_extracted_start : base_extracted_start + MAX_EXTRACT_FIELDS] = restored_extracted_updates
    loaded[default_evidence_idx] = extracted_state["evidence"]
    loaded[default_confidence_idx] = extracted_state["confidence"]
    loaded[default_decision_idx] = extracted_state["decision"] if extracted_state["decision"] in VALID_DECISIONS else "include"
    loaded[default_criteria_rationale_idx] = extracted_state["criteria_rationale"]
    loaded[default_labels_current_idx] = extracted_state["labels_current"]
    loaded[default_labels_suggested_idx] = extracted_state["labels_suggested"]
    loaded[default_labels_rationale_idx] = extracted_state["labels_rationale"]
    loaded[-1] = build_inline_download_html(download_path if download_exists else None)

    return (
        user_id,
        gr.update(value=excel_path if excel_exists else None),
        gr.update(value=pdf_path if pdf_exists else None),
        gr.update(value=criteria_path if criteria_exists else None),
        target_columns_text,
        url_column_text,
        *loaded,
        gr.update(value=pdf_path if pdf_exists else None),
    )
def get_auth_config() -> list[tuple[str, str]] | tuple[str, str] | None:
"""Build Gradio basic auth config from environment variables.
Expected Space Secrets:
- USER1, USER2, ... with value "(username,password)" or "username,password"
- Legacy fallback:
- SPACE_APP_PASSWORD (required to enable legacy auth)
- SPACE_APP_USERNAME (optional, defaults to 'admin')
"""
users: list[tuple[str, str]] = []
for key in sorted(os.environ.keys()):
if not re.fullmatch(r"USER\d+", key):
continue
raw = os.getenv(key, "").strip()
if not raw:
continue
username = ""
password = ""
try:
parsed = ast.literal_eval(raw)
if isinstance(parsed, tuple) and len(parsed) == 2:
username = str(parsed[0]).strip()
password = str(parsed[1]).strip()
except Exception:
parts = [part.strip() for part in raw.split(",", 1)]
if len(parts) == 2:
username, password = parts[0], parts[1]
if username and password:
users.append((username, password))
if users:
return users
password = os.getenv("SPACE_APP_PASSWORD", "").strip()
if not password:
return None
username = os.getenv("SPACE_APP_USERNAME", "admin").strip() or "admin"
return username, password
# Module-level setup: runs at import time, before the UI is declared.
_setup_storage_paths()

# UI definition. Components are created in display order, then grouped into
# output lists, then wired to the handler functions. The output lists are
# positional: each one must line up with the tuple its handler returns.
with gr.Blocks(title="Scientific Article Screener") as demo:
    gr.Markdown("# Scientific Article Screener")
    gr.Markdown(
        "Upload an Excel file and process one incomplete row at a time with a PDF."
    )
    # Session state
    df_state = gr.State(None)
    incomplete_rows_state = gr.State([])
    position_state = gr.State(0)
    current_row_state = gr.State(None)
    target_columns_state = gr.State([])
    url_column_state = gr.State("")
    extracted_state = gr.State({})
    description_columns_state = gr.State([])
    extracted_columns_state = gr.State([])
    user_id_state = gr.State("default")
    with gr.Row():
        # LEFT PANEL
        with gr.Column(scale=1):
            excel_file = gr.File(label="Upload Excel (.xlsx)", file_types=[".xlsx"])
            criteria_file = gr.File(label="Upload criteria.yml (optional)", file_types=[".yml", ".yaml"])
            target_columns_input = gr.Textbox(
                label="Target columns (comma-separated)",
                placeholder="Leave empty to use all columns except URL column",
            )
            url_column_input = gr.Textbox(
                label="URL column name (optional)",
                placeholder="Leave empty to auto-detect",
            )
            gr.Markdown("### Field descriptions")
            # A fixed pool of hidden textboxes is created up front; handlers
            # receive and return all MAX_DESCRIPTION_FIELDS of them.
            description_inputs: list[gr.Textbox] = []
            for idx in range(MAX_DESCRIPTION_FIELDS):
                description_inputs.append(
                    gr.Textbox(
                        label=f"Description {idx + 1}",
                        lines=4,
                        visible=False,
                    )
                )
            start_btn = gr.Button("Start screening", variant="primary")
            download_links_md = gr.HTML("")
        # WORKSPACE (previous middle + right, wider)
        with gr.Column(scale=2):
            row_counter = gr.Markdown("No row selected.")
            article_url_md = gr.Markdown("")
            article_details_md = gr.Markdown("")
            current_values_md = gr.Markdown("", visible=False)
            missing_columns_md = gr.Markdown("", visible=False)
            pdf_file = gr.File(label="Upload PDF", file_types=[".pdf"])
            process_pdf_btn = gr.Button("Process PDF", variant="primary")
            gr.Markdown("### Extracted fields")
            # Same fixed-pool approach as the description inputs above.
            extracted_inputs: list[gr.Textbox] = []
            for idx in range(MAX_EXTRACT_FIELDS):
                extracted_inputs.append(
                    gr.Textbox(
                        label=f"Extracted field {idx + 1}",
                        lines=1,
                        visible=False,
                    )
                )
            evidence_box = gr.Textbox(label="Evidence snippet", lines=4)
            confidence_box = gr.Slider(label="Confidence", minimum=0.0, maximum=1.0, step=0.01, value=0.0)
            decision_box = gr.Radio(label="Include/Exclude decision", choices=["include", "exclude"], value="include")
            criteria_rationale_box = gr.Textbox(label="Criteria rationale", lines=4)
            labels_current_box = gr.Textbox(label="RAYYAN current labels", lines=2)
            labels_suggested_box = gr.Textbox(label="RAYYAN suggested labels", lines=2)
            labels_rationale_box = gr.Textbox(label="RAYYAN label-switch rationale", lines=4)
            with gr.Row():
                accept_btn = gr.Button("Accept", variant="primary")
                reject_btn = gr.Button("Reject")
                skip_btn = gr.Button("Skip")
            status_box = gr.Markdown("Ready.")
    # The first 15 outputs of the start/load handlers. restore_saved_session
    # addresses load_excel's result by position (indices 6, 8 and 9, and
    # offsets computed from 15), so this order must not change independently.
    base_row_outputs = [
        df_state,
        incomplete_rows_state,
        position_state,
        current_row_state,
        target_columns_state,
        url_column_state,
        extracted_state,
        description_columns_state,
        extracted_columns_state,
        status_box,
        article_details_md,
        article_url_md,
        current_values_md,
        missing_columns_md,
        row_counter,
    ]
    # Extraction panel: the extracted-field textboxes followed by seven result widgets.
    extraction_outputs = [
        *extracted_inputs,
        evidence_box,
        confidence_box,
        decision_box,
        criteria_rationale_box,
        labels_current_box,
        labels_suggested_box,
        labels_rationale_box,
    ]
    download_outputs = [download_links_md, pdf_file]
    # Targets of restore_saved_session's return tuple. pdf_file is listed
    # twice (third and last); the handler returns the same update for both.
    demo_load_outputs = [
        user_id_state,
        excel_file,
        pdf_file,
        criteria_file,
        target_columns_input,
        url_column_input,
        *base_row_outputs,
        *description_inputs,
        *extracted_inputs,
        evidence_box,
        confidence_box,
        decision_box,
        criteria_rationale_box,
        labels_current_box,
        labels_suggested_box,
        labels_rationale_box,
        download_links_md,
        pdf_file,
    ]
    start_outputs = [*base_row_outputs, *description_inputs, *extraction_outputs, *download_outputs]
    # Matches the success tuple returned by process_pdf_and_extract.
    process_outputs = [extracted_state, extracted_columns_state, *extraction_outputs, status_box]
    # Matches the tuple returned by accept_extraction.
    accept_outputs = [
        df_state,
        position_state,
        current_row_state,
        extracted_state,
        article_details_md,
        article_url_md,
        current_values_md,
        missing_columns_md,
        row_counter,
        status_box,
        extracted_columns_state,
        *extraction_outputs,
        *download_outputs,
    ]
    # Matches the tuple returned by skip_row (extracted_state comes after the
    # extraction widgets here, unlike in accept_outputs).
    skip_outputs = [
        df_state,
        position_state,
        current_row_state,
        article_details_md,
        article_url_md,
        current_values_md,
        missing_columns_md,
        row_counter,
        status_box,
        extracted_columns_state,
        *extraction_outputs,
        extracted_state,
        *download_outputs,
    ]
    # Same layout as process_outputs; reject_extraction returns
    # empty_extracted_state(...), which presumably follows it — TODO confirm.
    reject_outputs = [extracted_state, extracted_columns_state, *extraction_outputs, status_box]
    # On page load: resolve the user id first, then restore that user's saved session.
    demo.load(fn=init_user_id, inputs=[], outputs=[user_id_state]).then(
        fn=restore_saved_session,
        inputs=[user_id_state],
        outputs=demo_load_outputs,
    )
    # Editing either column textbox refreshes the description inputs.
    target_columns_input.change(
        fn=refresh_description_inputs,
        inputs=[target_columns_input, url_column_input, df_state, description_columns_state, *description_inputs],
        outputs=[description_columns_state, *description_inputs],
    )
    url_column_input.change(
        fn=refresh_description_inputs,
        inputs=[target_columns_input, url_column_input, df_state, description_columns_state, *description_inputs],
        outputs=[description_columns_state, *description_inputs],
    )
    # Button handlers. The inputs are positional too: they follow each handler's parameter order.
    start_btn.click(
        fn=load_excel,
        inputs=[excel_file, criteria_file, user_id_state, target_columns_input, url_column_input, description_columns_state, *description_inputs],
        outputs=start_outputs,
    )
    process_pdf_btn.click(
        fn=process_pdf_and_extract,
        inputs=[pdf_file, criteria_file, user_id_state, df_state, current_row_state, target_columns_state, description_columns_state, *description_inputs],
        outputs=process_outputs,
    )
    accept_btn.click(
        fn=accept_extraction,
        inputs=[
            extracted_columns_state,
            user_id_state,
            df_state,
            current_row_state,
            incomplete_rows_state,
            position_state,
            url_column_state,
            target_columns_state,
            *extracted_inputs,
        ],
        outputs=accept_outputs,
    )
    skip_btn.click(
        fn=skip_row,
        inputs=[user_id_state, df_state, incomplete_rows_state, position_state, url_column_state, target_columns_state],
        outputs=skip_outputs,
    )
    reject_btn.click(
        fn=reject_extraction,
        inputs=[user_id_state],
        outputs=reject_outputs,
    )
if __name__ == "__main__":
    # Script entry point: launch the app with whatever basic-auth config the
    # environment provides (None when nothing is configured) and allow files
    # under the resolved storage root to be served.
    demo.launch(
        auth=get_auth_config(),
        allowed_paths=[str(APP_STORAGE_ROOT.resolve())],
    )