# knowledge-value-lab / batch_eval.py
# Uploaded with huggingface_hub by feedcomposer (revision 7cc493d, 8.09 kB).
"""
KVL Batch Evaluator
Usage:
python batch_eval.py docs/ # evaluate all .md files in a directory
python batch_eval.py a.md b.md # evaluate specific files
python batch_eval.py docs/ --workers 4 # 4 documents in parallel
python batch_eval.py docs/ --out results/ # write reports to a specific folder
"""
from __future__ import annotations
import os, sys, time, json, argparse
from pathlib import Path
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime
import anthropic
from dotenv import load_dotenv
from sentence_transformers import SentenceTransformer
from kvl import ingestor, scorer, report
from kvl.modules import novelty, retrieval, generation, attribution, demand
from kvl.config import model_meta
load_dotenv()
def bar(score, width=16):
    """Render a 0-100 score as a fixed-width text progress bar.

    Args:
        score: Score on a 0-100 scale. Values outside that range are clamped
            so the bar never grows past ``width`` characters.
        width: Total number of characters in the bar (expected to be >= 0).

    Returns:
        A string of ``width`` characters: full-block glyphs for the filled
        part followed by light-shade glyphs for the remainder.
    """
    # Clamp the filled count: without it a score above 100 yields more than
    # `width` filled cells, and a negative score inflates the empty segment.
    filled = min(max(round(score / 100 * width), 0), width)
    # Single-codepoint glyphs (U+2588 / U+2591) so that len(bar) == width and
    # the fixed-width column formatting used by callers lines up.
    return "█" * filled + "░" * (width - filled)
def evaluate_one(path: Path, client, embedder, quiet: bool = False) -> dict:
    """Run the full KVL pipeline on a single file and return a result dict.

    Args:
        path: File to evaluate; its contents are read as UTF-8.
        client: API client passed unchanged to every evaluation module.
        embedder: Embedding model passed to the retrieval and attribution
            modules.
        quiet: When true, suppress the per-document progress lines.

    Returns:
        On success, a dict with the keys ``file``, ``title``, ``kvs``,
        ``classification``, ``dim_scores``, ``kvs_result``,
        ``module_results``, ``meta``, ``elapsed_s`` and ``error`` (``None``).
        On failure, a dict with only ``file``, ``title`` (the file stem) and
        a non-empty ``error`` string. ``Exception`` subclasses are caught
        here rather than propagated, so one bad document does not abort the
        rest of a batch.
    """
    # Imported locally: the module-level import only brings in `datetime`.
    from datetime import timezone

    started = time.time()

    def log(msg):
        if not quiet:
            print(f" [{path.name}] {msg}")

    try:
        text = path.read_text(encoding="utf-8")
        doc = ingestor.parse(text)
        log(f"Ingested — {doc.word_count:,} words, {len(doc.chunks)} chunks")

        module_results = {}
        module_results["novelty"] = novelty.evaluate(client, doc, progress_cb=log)
        module_results["retrieval"] = retrieval.evaluate(client, doc, embedder, progress_cb=log)
        module_results["generation"] = generation.evaluate(client, doc, progress_cb=log)
        # Attribution consumes the generation module's output, so it must run
        # after it.
        module_results["attribution"] = attribution.evaluate(
            client, doc, module_results["generation"], embedder, progress_cb=log
        )
        module_results["demand"] = demand.evaluate(client, doc, progress_cb=log)

        dim_scores = {name: res["score"] for name, res in module_results.items()}
        kvs_result = scorer.compute(dim_scores)

        # The stamp is labelled "UTC", so take the time in UTC explicitly;
        # a naive datetime.now() would report local time under that label.
        stamp = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M UTC")
        meta = model_meta(stamp)

        elapsed = round(time.time() - started)
        log(f"Done in {elapsed}s — KVS {kvs_result['kvs']}/100 ({kvs_result['classification']})")
        return {
            "file": str(path),
            "title": doc.title,
            "kvs": kvs_result["kvs"],
            "classification": kvs_result["classification"],
            "dim_scores": dim_scores,
            "kvs_result": kvs_result,
            "module_results": module_results,
            "meta": meta,
            "elapsed_s": elapsed,
            "error": None,
        }
    except Exception as e:
        # Prefix the exception type so the message is never an empty string:
        # str(e) is "" for exceptions raised without arguments, and callers
        # that test the "error" field by truthiness would then mistake the
        # failure for a success.
        message = f"{type(e).__name__}: {e}"
        log(f"ERROR: {message}")
        return {"file": str(path), "title": path.stem, "error": message}
def main():
    """Command-line entry point: evaluate documents in parallel and summarise.

    Collects ``.md`` inputs named on the command line (directories are
    searched recursively), runs ``evaluate_one`` on each in a thread pool,
    writes a report per successful document as it finishes (unless
    ``--summary-only``), prints a ranked summary table and per-dimension
    averages, and always writes a JSON summary file.

    Exits with status 1 when no ``.md`` inputs are found, and with status 2
    (argparse usage error) when ``--workers`` is less than 1.
    """
    parser = argparse.ArgumentParser(description="KVL Batch Evaluator")
    parser.add_argument("inputs", nargs="+", help=".md files or directories")
    parser.add_argument("--workers", type=int, default=3,
                        help="Number of documents to evaluate in parallel (default: 3)")
    parser.add_argument("--out", default=None,
                        help="Output directory for reports (default: alongside each input file)")
    parser.add_argument("--quiet", action="store_true",
                        help="Suppress per-document progress output")
    parser.add_argument("--summary-only", action="store_true",
                        help="Print summary table only, skip writing individual reports")
    args = parser.parse_args()

    # ThreadPoolExecutor raises ValueError for max_workers <= 0; reject it as
    # a usage error now rather than after the embedding model has loaded.
    if args.workers < 1:
        parser.error("--workers must be a positive integer")

    def failed(result):
        # A result is a failure whenever an error was recorded, even if the
        # recorded message is an empty string; success stores None.
        return result.get("error") is not None

    # Collect all .md files
    paths: list[Path] = []
    for inp in args.inputs:
        p = Path(inp)
        if p.is_dir():
            paths.extend(sorted(p.glob("**/*.md")))
        elif p.suffix == ".md" and p.exists():
            paths.append(p)
        else:
            print(f"Warning: skipping {inp} (not a .md file or directory)")

    if not paths:
        print("No .md files found.")
        sys.exit(1)

    print(f"\n{'='*60}")
    print(f"KVL Batch Evaluator — {len(paths)} document(s), {args.workers} worker(s)")
    print(f"{'='*60}\n")

    client = anthropic.Anthropic(api_key=os.getenv("ANTHROPIC_API_KEY"))
    print("Loading embedding model...")
    embedder = SentenceTransformer("all-MiniLM-L6-v2")
    print("Ready.\n")

    out_dir = Path(args.out) if args.out else None
    if out_dir:
        out_dir.mkdir(parents=True, exist_ok=True)

    results = []
    batch_start = time.time()
    with ThreadPoolExecutor(max_workers=args.workers) as pool:
        futures = {
            pool.submit(evaluate_one, p, client, embedder, args.quiet): p
            for p in paths
        }
        for future in as_completed(futures):
            result = future.result()
            results.append(result)
            if args.summary_only or failed(result):
                continue

            # Write individual report immediately as each document finishes.
            # NOTE(review): with --out, inputs from different directories that
            # share a file stem map to the same report path, so the later one
            # overwrites the earlier one.
            source = futures[future]
            dest = out_dir or source.parent
            report_path = dest / (source.stem + "_kvl_report.md")
            rpt = report.generate(
                result["title"],
                result["kvs_result"],
                result["module_results"],
                result["meta"],
            )
            report_path.write_text(rpt, encoding="utf-8")
            if not args.quiet:
                print(f" Report → {report_path}\n")
    batch_elapsed = round(time.time() - batch_start)

    # ── Summary table ──────────────────────────────────────────────────────
    # Failed documents have no "kvs" key; -1 sorts them after every score.
    results.sort(key=lambda r: r.get("kvs", -1), reverse=True)
    print(f"\n{'='*60}")
    print(f"BATCH SUMMARY ({len(paths)} docs, {batch_elapsed}s total)")
    print(f"{'='*60}")
    print(f"{'Document':<30} {'KVS':>4} {'Bar':<16} {'Classification'}")
    print("-" * 72)
    for r in results:
        if failed(r):
            print(f"{'[ERROR] ' + Path(r['file']).name:<30} {r['error'][:40]}")
            continue
        name = Path(r["file"]).stem[:28]
        kvs = r["kvs"]
        cls = r["classification"]
        print(f"{name:<30} {kvs:>4} {bar(kvs):<16} {cls}")
    print()

    # ── Per-dimension breakdown ────────────────────────────────────────────
    good = [r for r in results if not failed(r)]
    if good:
        dims = ["novelty", "retrieval", "generation", "attribution", "demand"]
        dim_labels = {
            "novelty": "Knowledge Novelty (30%)",
            "retrieval": "Retrieval Utility (20%)",
            "generation": "Generation Utility (25%)",
            "attribution": "Attribution (15%)",
            "demand": "Demand Utility (10%)",
        }
        print("Dimension averages across all documents:")
        for d in dims:
            scores = [r["dim_scores"][d] for r in good]
            avg = round(sum(scores) / len(scores))
            print(f" {dim_labels[d]} avg {avg:>3}/100 {bar(avg)}")

    # ── Save machine-readable summary ──────────────────────────────────────
    summary_dest = out_dir or Path(".")
    summary_path = summary_dest / f"kvl_batch_summary_{datetime.now().strftime('%Y%m%d_%H%M')}.json"
    summary_data = [
        {
            "file": r["file"],
            "title": r.get("title"),
            "kvs": r.get("kvs"),
            "classification": r.get("classification"),
            "dim_scores": r.get("dim_scores"),
            "elapsed_s": r.get("elapsed_s"),
            "error": r.get("error"),
        }
        for r in results
    ]
    summary_path.write_text(json.dumps(summary_data, indent=2), encoding="utf-8")
    print(f"\nJSON summary → {summary_path}")
    print(f"Total wall-clock time: {batch_elapsed}s "
          f"(vs {sum(r.get('elapsed_s',0) for r in good)}s sequential)")
# Run the batch evaluator only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()