# NOTE: scraped HuggingFace Spaces page header artifact ("Spaces: Sleeping") — not part of the program.
| import gradio as gr | |
| import requests | |
| import random | |
| from datasets import load_dataset, Dataset | |
| from typing import Dict, List | |
| import re | |
| import datetime | |
| import pandas as pd | |
| import os | |
| from dotenv import load_dotenv | |
# Load environment variables (e.g. HF_TOKEN) from a local .env file, if one exists.
load_dotenv()
def sanitize_theme_name(theme: str) -> str:
    """Turn a human-readable theme label into a lowercase underscore slug.

    Drops every character that is not a word character, whitespace, or
    hyphen, then collapses runs of hyphens/whitespace into single
    underscores, and finally lowercases and trims stray underscores.
    """
    without_symbols = re.sub(r'[^\w\s-]', '', theme)
    slug = re.sub(r'[-\s]+', '_', without_symbols)
    return slug.lower().strip('_')
def load_questions_from_dataset() -> Dict[str, List[Dict]]:
    """Download the SAS opposition exam dataset and group the urology
    questions by their sanitized theme name.

    Only rows with a non-empty correct answer and at least three answer
    options are kept; shorter option lists are padded to four entries by
    repeating the last option.

    Returns:
        Mapping of sanitized theme -> list of question dicts with keys
        ``statement``, ``options`` (A-D), ``real_answer``, ``theme``,
        ``sanitized_theme`` and ``version``.
    """
    dataset = load_dataset("SASLeaderboard/sas_opposition_exam_data")
    # Restrict to the urology exam subset.
    dataset = dataset['train'].filter(lambda x: x['theme'] == 'FEA Urología')

    questions_by_theme: Dict[str, List[Dict]] = {}
    loaded, skipped = 0, 0

    for item in dataset:
        theme = item['theme']
        answers = item.get('answers', [])
        correct_answer = item.get('correct_answer', '')

        # Skip rows that cannot form a usable multiple-choice question.
        if not answers or not correct_answer or len(answers) < 3:
            skipped += 1
            continue

        # Pad to four options by repeating the last available one.
        while len(answers) < 4:
            answers.append(answers[-1])

        sanitized_theme = sanitize_theme_name(theme)
        questions_by_theme.setdefault(sanitized_theme, [])

        try:
            questions_by_theme[sanitized_theme].append({
                "statement": item['statement'],
                "options": {letter: answers[idx] for idx, letter in enumerate("ABCD")},
                "real_answer": correct_answer,
                "theme": theme,
                "sanitized_theme": sanitized_theme,
                "version": item.get('version', 'Default'),
            })
            loaded += 1
        except Exception:
            skipped += 1
            continue

    print(f"Loaded {loaded} questions, skipped {skipped} invalid questions")
    return questions_by_theme
def ask_ai_model(api_key: str, model: str, question: Dict) -> tuple:
    """Send one exam question to an OpenRouter-hosted model.

    Args:
        api_key: OpenRouter API key (used as a Bearer token).
        model: OpenRouter model identifier, e.g. "anthropic/claude-3-sonnet".
        question: Question dict with 'statement' and an 'options' dict
            holding the 'A'-'D' answer texts.

    Returns:
        Tuple ``(raw_response_text, parsed_answer)`` where ``parsed_answer``
        is 'A'/'B'/'C'/'D' on success, or a sentinel string:
        'API_ERROR' (non-200 status), 'REQUEST_ERROR' (exception), or
        whatever extract_answer_from_response yields for an unparseable
        reply.
    """
    prompt = f"""You are a medical expert taking a urology examination. Please analyze this question carefully and provide your answer.
Question: {question['statement']}
Options:
A) {question['options']['A']}
B) {question['options']['B']}
C) {question['options']['C']}
D) {question['options']['D']}
Please provide your answer in this exact format:
Answer: [A/B/C/D]
Then provide your reasoning."""
    try:
        headers = {
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json"
        }
        data = {
            "model": model,
            "messages": [
                {"role": "user", "content": prompt}
            ]
        }
        # Fix: without a timeout a stalled connection would hang the whole
        # exam loop forever; 120 s allows for slow long-form completions.
        # A timed-out request surfaces as 'REQUEST_ERROR' via the except.
        response = requests.post("https://openrouter.ai/api/v1/chat/completions",
                                 headers=headers, json=data, timeout=120)
        if response.status_code == 200:
            result = response.json()
            ai_response = result["choices"][0]["message"]["content"]
            ai_answer = extract_answer_from_response(ai_response)
            return ai_response, ai_answer
        else:
            error_msg = f"API Error {response.status_code}: {response.text}"
            return error_msg, "API_ERROR"
    except Exception as e:
        error_msg = f"Request Error: {str(e)}"
        return error_msg, "REQUEST_ERROR"
def extract_answer_from_response(ai_response: str) -> str:
    """Parse a model's free-text reply down to a single letter A-D.

    Tries progressively looser heuristics:
      1. An explicit "Answer: X" line.
      2. An "... answer is X ..." phrase (first option letter after the
         phrase, scanned left to right).
      3. Option-style markers ("B)", "OPTION C", ...) in the first 5 lines.
      4. Any bare A-D letter in the first 3 lines, then anywhere (these
         last resorts keep the original's A-first bias).

    Returns:
        'A'..'D', or a sentinel: 'EMPTY_RESPONSE' for falsy input,
        'NO_ANSWER_FOUND' when no heuristic matched.
    """
    if not ai_response:
        return "EMPTY_RESPONSE"
    lines = ai_response.split('\n')

    # 1. Explicit "Answer: X" line (first A-D character after the colon).
    for line in lines:
        if line.strip().lower().startswith('answer:'):
            answer_part = line.split(':')[1].strip().upper()
            for char in answer_part:
                if char in ['A', 'B', 'C', 'D']:
                    return char

    # 2. "... answer is X ..." phrase.
    # Fix: scan the 5 characters after the phrase left-to-right and return
    # the FIRST option letter that appears. The old code tested letters in
    # A-D order, so "the answer is D, but..." wrongly matched the 'b' of
    # "but" and returned 'B'.
    for line in lines:
        line_clean = line.strip().lower()
        if 'answer is' in line_clean:
            for char in line_clean.split('answer is')[1][:5]:
                if char in ['a', 'b', 'c', 'd']:
                    return char.upper()

    # 3. Option-style markers near the top of the reply.
    for line in lines[:5]:
        line_upper = line.upper()
        for char in ['A', 'B', 'C', 'D']:
            patterns = [f"{char})", f"{char}.", f"OPTION {char}", f"({char})", f"CHOICE {char}"]
            for pattern in patterns:
                if pattern in line_upper:
                    return char

    # 4. Any bare letter in the first three lines (A-first bias retained).
    for line in lines[:3]:
        for char in ['A', 'B', 'C', 'D']:
            if char in line.upper():
                return char

    # 5. Any letter anywhere in the reply (A-first bias retained).
    for char in ['A', 'B', 'C', 'D']:
        if char in ai_response.upper():
            return char

    return "NO_ANSWER_FOUND"
def save_results_to_dataset(results: List[Dict], hf_token: str = None) -> str:
    """Append exam results to the SASLeaderboard/results HF dataset.

    Falls back to the HF_TOKEN environment variable when no token is
    supplied. Existing rows are downloaded first so new results are
    appended rather than overwriting the dataset.

    Returns:
        A human-readable status string; this function never raises.
    """
    if not results:
        return "No results to save"

    token = hf_token or os.getenv("HF_TOKEN")
    if not token:
        return "❌ HuggingFace token not found. Please provide it in the interface or set HF_TOKEN environment variable"

    try:
        # Best-effort fetch of previously published results (the dataset
        # may not exist yet on the first run).
        try:
            previous = load_dataset("SASLeaderboard/results", use_auth_token=token)['train'].to_pandas()
        except Exception:
            previous = None

        fresh = pd.DataFrame(results)
        combined = fresh if previous is None else pd.concat([previous, fresh], ignore_index=True)

        Dataset.from_pandas(combined).push_to_hub(
            "SASLeaderboard/results",
            token=token,
            commit_message=f"Automated exam results for {results[0]['model']} - {len(results)} questions"
        )
        return f"✅ Successfully saved {len(results)} results to SASLeaderboard/results dataset"
    except Exception as e:
        return f"❌ Error saving results: {str(e)}"
def run_automated_exam(api_key: str, model: str, hf_token: str = ""):
    """Run the full exam for one model, yielding progress strings.

    Generator suited to Gradio streaming output: each yielded string
    replaces the content of the progress textbox.

    Args:
        api_key: OpenRouter API key; required.
        model: OpenRouter model identifier; required.
        hf_token: Optional HuggingFace write token for saving results;
            save_results_to_dataset falls back to the HF_TOKEN env var.
    """
    if not api_key:
        yield "❌ Please provide OpenRouter API key"
        return
    if not model:
        yield "❌ Please provide model name"
        return
    yield "🔄 Loading questions from dataset..."
    try:
        all_questions_by_theme = load_questions_from_dataset()
        all_questions = []
        for theme_questions in all_questions_by_theme.values():
            all_questions.extend(theme_questions)
        total_questions = len(all_questions)
        yield f"✅ Loaded {total_questions} questions from dataset"
        # Fix: bail out when nothing loaded — the accuracy computations
        # below would otherwise raise ZeroDivisionError on an empty list.
        if total_questions == 0:
            yield "❌ No questions were loaded from the dataset - aborting exam"
            return
        yield f"🚀 Starting automated exam with ALL {total_questions} questions for model: {model}"
        # Session id ties every saved row of this run together.
        session_id = f"{model}_{datetime.datetime.now().strftime('%Y%m%d_%H%M%S')}"
        results = []
        correct_count = 0
        for i, question in enumerate(all_questions):
            ai_response, ai_answer = ask_ai_model(api_key, model, question)
            # Surface parse/API failures, but still record the row so the
            # saved dataset reflects every attempted question (an errored
            # answer simply counts as incorrect).
            if ai_answer in ["API_ERROR", "REQUEST_ERROR", "EMPTY_RESPONSE", "NO_ANSWER_FOUND"]:
                yield f"⚠️ Question {i+1}: Error getting answer - {ai_answer}. Response: {ai_response[:100]}..."
            is_correct = ai_answer == question['real_answer']
            if is_correct:
                correct_count += 1
            result = {
                "session_id": session_id,
                "model": model,
                "question": question['statement'],
                "theme": question['theme'],
                "correct_answer": question['real_answer'],
                "ai_answer": ai_answer,
                "ai_response": ai_response,
                "is_correct": is_correct,
                "timestamp": datetime.datetime.now().isoformat(),
                "options_a": question['options']['A'],
                "options_b": question['options']['B'],
                "options_c": question['options']['C'],
                "options_d": question['options']['D']
            }
            results.append(result)
            current_accuracy = (correct_count / (i + 1)) * 100
            status_emoji = "✅" if is_correct else "❌"
            yield f"{status_emoji} Q{i+1}/{total_questions}: Accuracy: {correct_count}/{i+1} ({current_accuracy:.1f}%) | AI: {ai_answer} vs Correct: {question['real_answer']} | {question['statement'][:80]}..."
        yield f"💾 Saving results to HuggingFace dataset..."
        save_result = save_results_to_dataset(results, hf_token)
        final_accuracy = (correct_count / len(results)) * 100
        yield f"""
## 🎯 Exam Complete!
**Final Results:**
- Model: {model}
- Total Questions: {len(results)}
- Correct Answers: {correct_count}
- Final Accuracy: {final_accuracy:.1f}%
- Session ID: {session_id}
**Save Status:** {save_result}
The automated exam has been completed successfully!
"""
    except Exception as e:
        yield f"❌ Error during automated exam: {str(e)}"
# Gradio UI: two credential/model inputs, a start button, and a streaming
# progress textbox wired to the run_automated_exam generator.
with gr.Blocks(title="Automated Urology Exam System") as demo:
    gr.Markdown("# Automated Urology Exam System")
    gr.Markdown("This system automatically runs a complete urology exam for AI models using ALL available questions (~150) and saves results to the dataset.")
    with gr.Row():
        with gr.Column():
            gr.Markdown("**Get your API key:** [OpenRouter Keys](https://openrouter.ai/settings/keys)")
            api_key_input = gr.Textbox(
                label="OpenRouter API Key",
                type="password",  # masked in the browser
                placeholder="Enter your OpenRouter API key"
            )
        with gr.Column():
            gr.Markdown("**Find models:** [OpenRouter Models](https://openrouter.ai/models)")
            model_input = gr.Textbox(
                label="Model Name",
                placeholder="e.g., anthropic/claude-3-sonnet",
                value="anthropic/claude-3-sonnet"
            )
    with gr.Row():
        start_exam_btn = gr.Button("Start Automated Exam", variant="primary", size="lg")
    with gr.Row():
        progress_output = gr.Textbox(
            label="Exam Progress - Dont close this window",
            placeholder="Exam progress will be displayed here...",
            lines=15,
            max_lines=20,
            interactive=False
        )
    # run_automated_exam is a generator, so Gradio streams each yielded
    # string into progress_output as the exam advances.
    # NOTE(review): only two inputs are wired here, so the function's
    # hf_token parameter always falls back to its "" default — verify
    # whether a token textbox was intended.
    start_exam_btn.click(
        run_automated_exam,
        inputs=[api_key_input, model_input],
        outputs=[progress_output]
    )

if __name__ == "__main__":
    demo.launch()