import json import os from pathlib import Path from datetime import datetime from enum import Enum, auto import re import uuid import pandas as pd import streamlit as st import time from collections import OrderedDict from html_style import html_style from config import config, MCM_PROBLEMS, stages_config from agent_api import ModelingAgentSystem # Define enum for mapping UI stages to agent steps class GenerationStep(Enum): PROBLEM_ANALYSIS = "Problem Analysis" HIGH_LEVEL_MODELING = "High-Level Modeling" TASK_DECOMPOSITION = "Task Decomposition" DEPENDENCY_ANALYSIS = "Dependency Analysis" # Task-specific steps follow pattern: Task {N} {StepType} @classmethod def get_task_step(cls, task_id, step_type): """Returns a step name for a specific task and step type.""" return f"Task {task_id} {step_type}" @classmethod def is_task_step(cls, step_name): """Checks if a step name belongs to a task-specific step.""" return step_name.startswith("Task ") @classmethod def parse_task_step(cls, step_name): """Parses a task step name into task_id and step_type components.""" if not cls.is_task_step(step_name): return None, None parts = step_name.split(" ", 2) if len(parts) < 3: return None, None try: task_id = int(parts[1]) step_type = parts[2] return task_id, step_type except ValueError: return None, None # --- Configuration --- APP_TITLE = "Mathematical Modeling Multi-Agent System" st.set_page_config(page_title=APP_TITLE, layout="wide", initial_sidebar_state="expanded") # --- Custom CSS for Styling --- # Requirement 4: Optimize UI style and color scheme st.markdown(html_style, unsafe_allow_html=True) # Define Stage Structure (Template) # CHAPTER_STRUCTURE = OrderedDict(config['stages']) # --- Helper Functions --- def initialize_session_state(): """Initializes session state variables if they don't exist.""" defaults = { "api_base_url": "", # Default empty string "api_key": "", # Default empty string "api_model": "", # Default model suggestion "problem_input_method": "Select Predefined", "selected_mcm_problem": list(MCM_PROBLEMS.keys())[0], "custom_background": "", "custom_requirements": "", "problem_defined": False, "problem_definition_expanded": True, # Requirement 1: Control expander state "current_problem_title": "", "current_problem_details": "", "stages": {}, # Stores content, status, edit_mode for each stage "active_stage": None, # Will be set after chapter structure is determined "critique_rounds": 0, "modeling_agent": None, # Will store the ModelingAgentSystem instance "agent_initialized": False, # Flag to track if agent was initialized "session_id": str(uuid.uuid4()), # Generate a unique session ID "uploaded_files": [], # Track uploaded data files } # # Try to load secrets if available # try: # if "API_BASE_URL" in st.secrets: # defaults["api_base_url"] = st.secrets["API_BASE_URL"] # if "API_KEY" in st.secrets: # defaults["api_key"] = st.secrets["API_KEY"] # except FileNotFoundError: # # Secrets file not found, use defaults # pass for key, value in defaults.items(): if key not in st.session_state: st.session_state[key] = value # Initialize stage structure in session state if problem is defined but stages aren't init'd # or if the stages structure is empty if st.session_state.problem_defined and not st.session_state.stages: reset_stages() def reset_stages(): """ Resets stage content and progress. """ # Initialize ModelingAgentSystem if API is configured if st.session_state.api_base_url and st.session_state.api_key and st.session_state.api_model: initialize_modeling_agent() # If agent is initialized, use its planned steps if st.session_state.agent_initialized and st.session_state.modeling_agent: # Get the planned steps from the agent planned_steps = st.session_state.modeling_agent.get_planned_steps() completed_steps = st.session_state.modeling_agent.get_completed_steps() # Initialize stages based on planned steps st.session_state.stages = {} for step_name in planned_steps: status = "completed" if step_name in completed_steps else "not_started" # For the first non-completed step, mark as in_progress if status == "not_started" and not any(s["status"] == "in_progress" for s in st.session_state.stages.values()): status = "in_progress" st.session_state.stages[step_name] = { "title": step_name, # Use step name as title "content": f"# {step_name}\n\n*(Content not generated yet. Use the '✨ Generate Content' button and '✏️ Edit Content' after generation.)*", "status": status, "edit_mode": False, "prompt_hint": f"Complete the {step_name} section. {stages_config.get(step_name, '')}" # Default prompt hint } # Set active stage to first in-progress stage, or first stage if none in_progress_stages = [k for k, v in st.session_state.stages.items() if v["status"] == "in_progress"] if in_progress_stages: st.session_state.active_stage = in_progress_stages[0] elif st.session_state.stages: st.session_state.active_stage = next(iter(st.session_state.stages)) else: # Fallback to static structure if agent is not initialized chapter_structure = OrderedDict(config['stages']) st.session_state.stages = { key: {"title": data["title"], "content": f"# {data['title']}\n\n*(Content not generated yet. Use the '✨ Generate Content' button and '✏️ Edit Content' after generation.)*", "status": "not_started", # States: not_started, in_progress, completed "edit_mode": False, "prompt_hint": data["prompt_hint"]} for key, data in chapter_structure.items() } # Mark the first stage as in_progress initially if chapter_structure: first_stage_key = list(chapter_structure.keys())[0] if first_stage_key in st.session_state.stages: st.session_state.stages[first_stage_key]["status"] = "in_progress" st.session_state.active_stage = first_stage_key def initialize_modeling_agent(): """Initialize the ModelingAgentSystem with current configuration""" # Check if API details are provided in session state first if not st.session_state.api_base_url or not st.session_state.api_key or not st.session_state.api_model: st.error("API Base URL, API Key, and Model Name must be configured before initializing the agent.") st.session_state.modeling_agent = None st.session_state.agent_initialized = False return # Stop initialization if config is missing # Create config for the agent system agent_config = { 'top_method_num': 6, 'problem_analysis_round': st.session_state.critique_rounds, 'problem_modeling_round': st.session_state.critique_rounds, 'task_formulas_round': st.session_state.critique_rounds, 'tasknum': 1, 'chart_num': 1, 'model_name': st.session_state.api_model, "method_name": "MAPP-Streamlit" } # Create or get problem path based on selected problem if st.session_state.problem_input_method == "Select Predefined": problem_key = st.session_state.selected_mcm_problem # Use problem from predefined problems problem_path = f'../data/actor_data/input/problem/{problem_key}.json' print(problem_path) dataset_path = "" # No custom dataset for predefined problems else: # Create dirs for session data session_dir = Path('data') / st.session_state.session_id problem_dir = session_dir data_dir = session_dir / 'data' output_dir = session_dir / 'output' for dir_path in [problem_dir, data_dir, output_dir]: dir_path.mkdir(parents=True, exist_ok=True) # Create custom problem JSON file dataset_paths = [] for uploaded_file_info in st.session_state.uploaded_files: dataset_paths.append(str(uploaded_file_info['path'])) custom_problem = { "title": "Custom Problem", # Set default title instead of using custom_title "background": st.session_state.custom_background, "problem_requirement": st.session_state.custom_requirements, "dataset_path": dataset_paths, "dataset_description": {}, "variable_description": {}, "addendum": "" } problem_path = problem_dir / 'problem.json' with open(problem_path, 'w') as f: json.dump(custom_problem, f, indent=2) dataset_path = str(data_dir) # Initialize the agent system output_path = str(Path('data') / st.session_state.session_id / 'output') os.makedirs(output_path, exist_ok=True) try: # Initialize ModelingAgentSystem st.session_state.modeling_agent = ModelingAgentSystem( problem_path=str(problem_path), # Ensure path is a string config=agent_config, dataset_path=dataset_path, output_path=output_path, name=st.session_state.selected_mcm_problem if st.session_state.problem_input_method == "Select Predefined" else "custom" ) # Set API details using reset method after initialization st.session_state.modeling_agent.llm.reset( api_key=st.session_state.api_key, api_base=st.session_state.api_base_url, model_name=st.session_state.api_model ) # Mark agent as initialized st.session_state.agent_initialized = True # Update the current problem details for display if st.session_state.problem_input_method == "Select Predefined": problem_key = st.session_state.selected_mcm_problem st.session_state.current_problem_title = problem_key st.session_state.current_problem_details = f"**Background:**\n{MCM_PROBLEMS[problem_key]['background']}\n\n**Requirements:**\n{MCM_PROBLEMS[problem_key]['problem_requirement']}" else: # Use "Custom Problem" directly instead of custom_title st.session_state.current_problem_title = "Custom Problem" st.session_state.current_problem_details = f"**Background:**\n{st.session_state.custom_background}\n\n**Requirements:**\n{st.session_state.custom_requirements}" except Exception as e: st.error(f"Failed to initialize modeling agent: {e}") st.session_state.modeling_agent = None st.session_state.agent_initialized = False def update_api_settings(): """Update API settings for the ModelingAgentSystem in the middle of a session.""" if not st.session_state.agent_initialized or st.session_state.modeling_agent is None: # st.error("ModelingAgentSystem not initialized. Please define a problem first.") return False # Check if we have all required API information if not (st.session_state.api_base_url and st.session_state.api_key and st.session_state.api_model): st.error("Please provide valid API Base URL, API Key, and Model Name.") return False try: # Reset the LLM with new API settings st.session_state.modeling_agent.llm.reset( api_key=st.session_state.api_key, api_base=st.session_state.api_base_url, model_name=st.session_state.api_model ) print(f'Reset LLM: {st.session_state.api_model}') # Update the model name in the config st.session_state.modeling_agent.config['model_name'] = st.session_state.api_model # Also update the agents that use this LLM for agent_name in ['pa', 'pm', 'td', 'task', 'mr', 'chart', 'coordinator']: if hasattr(st.session_state.modeling_agent, agent_name): agent = getattr(st.session_state.modeling_agent, agent_name) if hasattr(agent, 'llm'): agent.llm = st.session_state.modeling_agent.llm st.success("API settings updated successfully!") return True except Exception as e: st.error(f"Failed to update API settings: {e}") import traceback st.error(traceback.format_exc()) return False def get_navigatable_stages(): """Determines which stages can be navigated to (unlocked) based on agent's step dependencies.""" # If no agent or no stages, return empty list if not st.session_state.agent_initialized or not st.session_state.stages: return [] navigatable = [] if st.session_state.modeling_agent: # Get completed steps from agent completed_steps = st.session_state.modeling_agent.get_completed_steps() # Get dependencies between steps from agent dependencies = st.session_state.modeling_agent._define_dependencies() # A stage is navigatable if all its dependencies are completed for stage_key in st.session_state.stages.keys(): # If stage is already completed, it's navigatable if stage_key in completed_steps: navigatable.append(stage_key) continue # Check if all dependencies are completed deps = dependencies.get(stage_key, []) if all(dep in completed_steps for dep in deps): navigatable.append(stage_key) else: # Strict sequential unlocking - only allow completed stages and the next one chapter_keys = list(st.session_state.stages.keys()) # Always allow navigation to completed stages for i, key in enumerate(chapter_keys): if st.session_state.stages[key]['status'] == 'completed': navigatable.append(key) # Find the first non-completed stage (if any) next_stage = None for key in chapter_keys: if st.session_state.stages[key]['status'] != 'completed': next_stage = key break # Add the next non-completed stage to navigatable list (if found) if next_stage: navigatable.append(next_stage) # Ensure the first stage is always navigatable if not navigatable and chapter_keys: navigatable.append(chapter_keys[0]) return navigatable def get_stage_status_icon(status): """Returns an icon based on stage status.""" if status == "completed": return "✅" elif status == "in_progress": return "⏳" else: # not_started return "📄" # Use a neutral icon for not started but accessible def get_stage_display_status(key, navigatable_stages): """ Gets status or locked state """ if key not in st.session_state.stages: return "locked" # Should not happen with current logic, but safeguard if key in navigatable_stages: return st.session_state.stages[key]["status"] else: return "locked" def generate_markdown_export(): """Concatenates all stage content into a single markdown string.""" # If we have a modeling agent, use its paper content if st.session_state.agent_initialized and st.session_state.modeling_agent: paper = st.session_state.modeling_agent.get_paper() with open('paper.json', 'w') as f: json.dump(paper, f, indent=2) full_doc = f"# {st.session_state.current_problem_title}\n\n" full_doc += f"## Problem Description\n\n{st.session_state.current_problem_details}\n\n---\n\n" # Add problem analysis if available if 'problem_analysis' in paper: full_doc += f"# Problem Analysis\n\n{paper['problem_analysis']}\n\n---\n\n" # Add high-level modeling if available if 'high_level_modeling' in paper: full_doc += f"# High-Level Modeling\n\n{paper['high_level_modeling']}\n\n---\n\n" # Add task decomposition if available if 'task_decomposition_summary' in paper: full_doc += f"# Task Decomposition\n\n{paper['task_decomposition_summary']}\n\n---\n\n" # Add task dependency analysis if available if 'task_dependency_analysis' in paper: full_doc += f"# Task Dependency Analysis\n\n" if isinstance(paper['task_dependency_analysis'], list): for i, analysis in enumerate(paper['task_dependency_analysis']): full_doc += f"## Task {i+1} Dependencies\n{analysis}\n\n" else: full_doc += f"{paper['task_dependency_analysis']}\n\n" full_doc += "---\n\n" # Add all tasks for i, task in enumerate(paper.get('tasks', [])): if task: # Only include non-empty task dictionaries task_id = i + 1 full_doc += f"# Task {task_id}\n\n" # Add task components in a logical order components_order = [ 'task_description', 'task_analysis', 'preliminary_formulas', 'mathematical_modeling_process', 'solution_interpretation', 'subtask_outcome_analysis', 'charts' ] for component in components_order: if component in task: component_title = component.replace('_', ' ').title() content = task[component] if component == 'charts' and isinstance(content, list): full_doc += f"## {component_title}\n\n" for j, chart in enumerate(content): full_doc += f"### Chart {j+1}\n{chart}\n\n" else: full_doc += f"## {component_title}\n\n{content}\n\n" full_doc += "---\n\n" return full_doc else: # Fall back to the original implementation full_doc = f"# {st.session_state.current_problem_title}\n\n" full_doc += f"## Problem Description\n\n{st.session_state.current_problem_details}\n\n---\n\n" for key, data in st.session_state.stages.items(): # Include content of all stages, even if not started full_doc += data["content"] + "\n\n---\n\n" return full_doc # Instead of static predefinition, we'll create a function to build the chapter structure dynamically def get_chapter_structure(): """Dynamically build the chapter structure using ModelingAgentSystem interfaces.""" if st.session_state.agent_initialized and st.session_state.modeling_agent: # Get all planned steps (complete structure) planned_steps = st.session_state.modeling_agent.get_planned_steps() # Build chapter structure chapter_structure = OrderedDict() for step_name in planned_steps: # Extract title and create a prompt hint title = step_name prompt_hint = f"Complete the {step_name} section. {stages_config.get(step_name, '')}" # For task steps, create more specific prompts if GenerationStep.is_task_step(step_name): task_id, step_type = GenerationStep.parse_task_step(step_name) if task_id and step_type: prompt_hint = f"Complete the {step_type} for Task {task_id}." chapter_structure[step_name] = { "title": title, "prompt_hint": prompt_hint } return chapter_structure else: # Fallback to the original definition if modeling_agent is not available return OrderedDict(config['stages']) def sync_stages_with_agent(): """Synchronizes session stages with modeling agent progress""" if not st.session_state.agent_initialized or not st.session_state.modeling_agent: return # Get agent's current state completed_steps = st.session_state.modeling_agent.get_completed_steps() planned_steps = st.session_state.modeling_agent.get_planned_steps() paper = st.session_state.modeling_agent.get_paper() # Get current chapter structure chapter_structure = get_chapter_structure() # First, update stage structure - add any new steps for step_name in planned_steps: if step_name not in st.session_state.stages: prompt_hint = chapter_structure.get(step_name, {}).get('prompt_hint', f"Complete the {step_name} section. {stages_config.get(step_name, '')}") st.session_state.stages[step_name] = { "title": step_name, "content": f"# {step_name}\n\n*(Content not generated yet. Use the '✨ Generate Content' button and '✏️ Edit Content' after generation.)*", "status": "completed" if step_name in completed_steps else "not_started", "edit_mode": False, "prompt_hint": prompt_hint } # Update stage statuses for step_name in st.session_state.stages: if step_name in completed_steps: st.session_state.stages[step_name]["status"] = "completed" # Update content based on what's in the paper # Sync Problem Background and Problem Requirement from paper if 'problem_background' in paper and 'Problem Background' in st.session_state.stages: st.session_state.stages['Problem Background']['content'] = f"# Problem Background\n\n{paper['problem_background']}" if 'problem_requirement' in paper and 'Problem Requirement' in st.session_state.stages: st.session_state.stages['Problem Requirement']['content'] = f"# Problem Requirement\n\n{paper['problem_requirement']}" # Update Problem Analysis content if 'problem_analysis' in paper and 'Problem Analysis' in st.session_state.stages: st.session_state.stages['Problem Analysis']['content'] = f"# Problem Analysis\n\n{paper['problem_analysis']}" # Update High-Level Modeling content if 'high_level_modeling' in paper and 'High-Level Modeling' in st.session_state.stages: st.session_state.stages['High-Level Modeling']['content'] = f"# High-Level Modeling\n\n{paper['high_level_modeling']}" # Update Task Decomposition content if 'task_decomposition_summary' in paper and 'Task Decomposition' in st.session_state.stages: st.session_state.stages['Task Decomposition']['content'] = f"# Task Decomposition\n\n{paper['task_decomposition_summary']}" # Update Task Dependency Analysis content if 'task_dependency_analysis' in paper and 'Dependency Analysis' in st.session_state.stages: dependency_content = "# Task Dependency Analysis\n\n" if isinstance(paper['task_dependency_analysis'], list): for i, analysis in enumerate(paper['task_dependency_analysis']): dependency_content += f"## Task {i+1} Dependencies\n{analysis}\n\n" else: dependency_content += str(paper['task_dependency_analysis']) st.session_state.stages['Dependency Analysis']['content'] = dependency_content # Update task-specific contents if 'tasks' in paper: for task_index, task_dict in enumerate(paper['tasks']): if not task_dict: # Skip empty task dictionaries continue task_id = task_index + 1 # 1-based ID for display # Map each task component to the corresponding step component_to_step = { 'task_description': f'Task {task_id} Description', 'task_analysis': f'Task {task_id} Analysis', 'preliminary_formulas': f'Task {task_id} Preliminary Formulas', 'mathematical_modeling_process': f'Task {task_id} Mathematical Modeling Process', 'task_code': f'Task {task_id} Code', 'solution_interpretation': f'Task {task_id} Solution Interpretation', 'subtask_outcome_analysis': f'Task {task_id} Subtask Outcome Analysis', 'charts': f'Task {task_id} Charts' } # Update each component if it exists for component, step_name in component_to_step.items(): if component in task_dict and step_name in st.session_state.stages: content = task_dict[component] # Format content based on component type if component == 'charts': formatted_content = f"# Charts for Task {task_id}\n\n" if isinstance(content, list): for i, chart in enumerate(content): formatted_content += f"## Chart {i+1}\n{chart}\n\n" else: formatted_content += str(content) else: # Default formatting formatted_content = f"# {step_name}\n\n{content}" st.session_state.stages[step_name]['content'] = formatted_content def _handle_content_edit(active_stage_key, new_content): """将用户编辑的内容同步回 ModelingAgentSystem""" if not st.session_state.agent_initialized or not st.session_state.modeling_agent: return # 更新 session_state 中的内容 st.session_state.stages[active_stage_key]['content'] = new_content # 根据步骤类型更新 agent 的 paper 字典 agent = st.session_state.modeling_agent paper = agent.paper if active_stage_key == 'Problem Analysis': paper['problem_analysis'] = new_content.replace('# Problem Analysis\n\n', '') elif active_stage_key == 'High-Level Modeling': paper['high_level_modeling'] = new_content.replace('# High-Level Modeling\n\n', '') elif active_stage_key == 'Task Decomposition': paper['task_decomposition_summary'] = new_content.replace('# Task Decomposition\n\n', '') elif active_stage_key == 'Dependency Analysis': # 可能需要特殊处理,因为这可能是一个结构化的内容 clean_content = new_content.replace('# Task Dependency Analysis\n\n', '') paper['task_dependency_analysis'] = clean_content elif active_stage_key.startswith('Task '): # 解析任务 ID 和步骤类型 match = re.match(r"Task (\d+) (.*)", active_stage_key) if match: task_id = int(match.group(1)) step_type = match.group(2) task_index = task_id - 1 # 确保 task 列表足够长 while len(paper['tasks']) <= task_index: paper['tasks'].append({}) # 映射步骤类型到 paper 中的键 step_to_key = { 'Description': 'task_description', 'Analysis': 'task_analysis', 'Preliminary Formulas': 'preliminary_formulas', 'Mathematical Modeling Process': 'mathematical_modeling_process', 'Solution Interpretation': 'solution_interpretation', 'Subtask Outcome Analysis': 'subtask_outcome_analysis', 'Charts': 'charts' } if step_type in step_to_key: key = step_to_key[step_type] clean_content = new_content.replace(f'# {active_stage_key}\n\n', '') paper['tasks'][task_index][key] = clean_content # 同时更新协调器的内存,如果适用 if step_type != 'Charts': # Charts 可能有特殊格式 agent.coordinator.memory.setdefault(str(task_id), {})[key] = clean_content # --- Main App Logic --- initialize_session_state() # Only show title and caption on the initial page if not st.session_state.problem_defined: st.title(f"{APP_TITLE}") # Added a bit of flair st.caption("An AI-assisted platform for structuring and drafting mathematical modeling reports.") # --- Sidebar --- with st.sidebar: st.header("⚙️ Configuration") # Use secrets if available, otherwise show inputs api_base_provided = bool(st.session_state.api_base_url) api_key_provided = bool(st.session_state.api_key) api_model_provided = bool(st.session_state.api_model) with st.expander("LLM API Details", expanded=not (api_base_provided and api_key_provided and api_model_provided)): # Sync with main content if those fields exist if 'api_base_url_main' in st.session_state: st.session_state.api_base_url = st.session_state.api_base_url_main if 'api_key_main' in st.session_state: st.session_state.api_key = st.session_state.api_key_main if 'api_model_main' in st.session_state: st.session_state.api_model = st.session_state.api_model_main st.text_input( "API Base URL", value=st.session_state.api_base_url, key="api_base_url", placeholder="e.g., https://api.openai.com/v1", help="Your OpenAI compatible API endpoint." ) st.text_input( "API Key", value=st.session_state.api_key, key="api_key", type="password", help="Your OpenAI compatible API key. Can also be set via Streamlit secrets (API_KEY)." ) st.text_input( "Model Name", value=st.session_state.api_model, key="api_model", placeholder="e.g., gpt-4-turbo", help="The specific model to use for generation." ) if st.button("Save", key="save_api_settings", type="secondary", use_container_width=True): st.session_state.api_base_url_main = st.session_state.api_base_url st.session_state.api_key_main = st.session_state.api_key st.session_state.api_model_main = st.session_state.api_model update_api_settings() st.divider() # Requirement 1: Put Problem Definition in a controllable expander st.header("🔍 Problem Definition") with st.expander("Problem Background & Requirements", expanded=st.session_state.problem_definition_expanded): # Check if API keys are provided before allowing problem definition api_configured = bool(st.session_state.api_base_url and st.session_state.api_key and st.session_state.api_model) if not api_configured: st.warning("Please provide valid API Base URL, API Key, and Model Name in the configuration above to define a problem.") # Don't st.stop() here, allow viewing config expander else: problem_input_method = st.radio( "Select Problem Source:", ["Select Predefined", "Input Custom"], key="problem_input_method", horizontal=True, # help="Choose a built-in MCM/ICM problem or define your own." ) confirm_problem = False if st.session_state.problem_input_method == "Select Predefined": st.selectbox( "Choose MCM/ICM Problem:", options=list(MCM_PROBLEMS.keys()), format_func=lambda x: f"MCM_{x}", # Show full title key="selected_mcm_problem" ) if st.button("Load Problem", type="primary", key="load_predefined", use_container_width=True): confirm_problem = True problem_key = st.session_state.selected_mcm_problem st.session_state.current_problem_title = problem_key # MCM_PROBLEMS[problem_key]['title'] st.session_state.current_problem_details = f"**Background:**\n{MCM_PROBLEMS[problem_key]['background']}\n\n**Requirements:**\n{MCM_PROBLEMS[problem_key]['problem_requirement']}" else: # Input Custom # Removing the custom title input field # st.text_input("Custom Problem Title:", key="custom_title", placeholder="Enter a short title for your problem") st.text_area("Problem Background:", key="custom_background", height=150, placeholder="Provide context and background information.") st.text_area("Problem Requirements:", key="custom_requirements", height=100, placeholder="Detail the specific tasks or questions to be addressed.") # Add file upload functionality st.subheader("Upload Data Files") uploaded_files = st.file_uploader( "Upload CSV or Excel files for your problem (optional)", type=["csv", "xlsx", "xls"], accept_multiple_files=True, help="Data files will be available for the modeling agent to use." ) # Process uploaded files if uploaded_files and len(uploaded_files) > 0: # Clear previous uploads if new files are uploaded if "last_upload_count" not in st.session_state or st.session_state.last_upload_count != len(uploaded_files): st.session_state.uploaded_files = [] st.session_state.last_upload_count = len(uploaded_files) # Save uploaded files for uploaded_file in uploaded_files: # Check if file was already processed file_already_processed = any(info['name'] == uploaded_file.name for info in st.session_state.uploaded_files) if not file_already_processed: # Create directory for files if it doesn't exist file_dir = Path('data') / st.session_state.session_id / 'data' file_dir.mkdir(parents=True, exist_ok=True) # Save file to disk file_path = file_dir / uploaded_file.name with open(file_path, "wb") as f: f.write(uploaded_file.getbuffer()) # Try to read file to get preview preview = None if uploaded_file.name.endswith(('.csv', '.xlsx', '.xls')): try: if uploaded_file.name.endswith('.csv'): df = pd.read_csv(file_path) else: df = pd.read_excel(file_path) preview = df.head(5) except Exception as e: preview = f"Error reading file: {str(e)}" # Add file info to session state st.session_state.uploaded_files.append({ 'name': uploaded_file.name, 'path': str(file_path.absolute()), 'preview': preview }) # Display uploaded files if st.session_state.uploaded_files: st.success(f"{len(st.session_state.uploaded_files)} file(s) uploaded successfully") for file_info in st.session_state.uploaded_files: # Replace nested expander with a container and bolded title with st.container(border=True): st.markdown(f"**📄 {file_info['name']}**") if isinstance(file_info['preview'], pd.DataFrame): st.dataframe(file_info['preview']) else: st.write(file_info['preview']) if st.button("Set Custom Problem", type="primary", key="load_custom", use_container_width=True): if st.session_state.custom_background and st.session_state.custom_requirements: confirm_problem = True # Use "Custom Problem" as the default title instead of custom_title st.session_state.current_problem_title = "Custom Problem" st.session_state.current_problem_details = f"**Background:**\n{st.session_state.custom_background}\n\n**Requirements:**\n{st.session_state.custom_requirements}" else: st.warning("Please provide background and requirements for the custom problem.") # Handle problem confirmation and stage reset if confirm_problem: if st.session_state.problem_defined: # If a problem was already defined, show info about reset st.toast("Reloading problem: Existing stage content and progress will be reset.") time.sleep(1) # Give user time to see toast st.session_state.problem_defined = True st.session_state.problem_definition_expanded = False # Requirement 1: Collapse expander reset_stages() st.rerun() # Rerun to update sidebar navigation and main content area # --- Stage Navigation (Displayed only if a problem is defined) --- if st.session_state.problem_defined: st.divider() st.header("📚 Stages") navigatable_stages = get_navigatable_stages() # Ensure the current active stage is valid if st.session_state.active_stage not in navigatable_stages: # If current active is somehow locked (e.g., after loading a new problem), # default to the last navigatable one. if navigatable_stages: st.session_state.active_stage = navigatable_stages[-1] else: # Should not happen, but fallback to first stage # st.session_state.active_stage = list(st.session_state.stages.keys())[0] pass # Requirement 2 & 3: Use buttons for navigation, disable locked stages # Using a container to apply styles easier via CSS selector with st.container(border=False): st.markdown('
', unsafe_allow_html=True) # Wrapper for CSS targeting for key, data in st.session_state.stages.items(): stage_info = st.session_state.stages.get(key) if stage_info: is_navigatable = key in navigatable_stages is_active = key == st.session_state.active_stage display_status = get_stage_display_status(key, navigatable_stages) if display_status == "locked": icon = "🔒" label_markdown = f"{icon}  {stage_info['title']}" # Add non-breaking spaces else: icon = get_stage_status_icon(display_status) label_markdown = f"{icon}  {stage_info['title']}" # Use markdown in button label to render icons correctly button_label_html = f'
{label_markdown}
' # Set button type: primary if active, secondary otherwise button_type = "primary" if is_active else "secondary" if st.button( label=label_markdown, # Use markdown directly key=f"nav_{key}", disabled=not is_navigatable, use_container_width=True, type=button_type, help=f"Status: {display_status.replace('_', ' ').title()}" if is_navigatable else "Complete previous stages to unlock" ): if is_navigatable and not is_active: st.session_state.active_stage = key # Turn off edit mode when switching stages if st.session_state.stages[key].get('edit_mode', False): st.session_state.stages[key]['edit_mode'] = False st.rerun() st.markdown('
', unsafe_allow_html=True) # Close wrapper st.divider() st.header("📄 Solution Report") if st.session_state.stages: markdown_content = generate_markdown_export() st.download_button( label="📥 Export Intermediate Process (.md)", data=markdown_content, file_name=f"mapp_export_{st.session_state.current_problem_title.replace(' ', '_').lower()}.md", mime="text/markdown", use_container_width=True ) # 检查是否所有章节都已完成 all_completed = all(stage_data["status"] == "completed" for stage_data in st.session_state.stages.values()) # 添加导出完整报告按钮(只有在所有章节完成后才能点击) if st.button( "📊 Export Solution Report (.latex & .pdf)", disabled=not all_completed, use_container_width=True, help="Generate and download a complete LaTeX and PDF report (available after all stages are completed)" ): if st.session_state.agent_initialized and st.session_state.modeling_agent: with st.spinner("Generating LaTeX and PDF solution report... This may take a few minutes."): try: # 获取输出路径 output_path = str(Path('data') / st.session_state.session_id / 'output') # 调用agent生成LaTeX和PDF st.session_state.modeling_agent.generate_paper(output_path) # 存储文件路径到session_state,使按钮可以在后续渲染中保持显示 st.session_state.latex_path = f'{output_path}/latex/solution.tex' st.session_state.pdf_path = f'{output_path}/latex/solution.pdf' except Exception as e: st.error(f"Error generating report: {str(e)}") import traceback st.error(traceback.format_exc()) st.rerun() # 刷新页面以显示下载按钮 else: st.error("ModelingAgentSystem not initialized. Please check API configuration.") # 检查session_state中是否有生成的文件路径,并显示对应的下载按钮 if hasattr(st.session_state, 'latex_path') and Path(st.session_state.latex_path).exists(): with open(st.session_state.latex_path, "rb") as f: st.download_button( label="📥 Download LaTeX (.tex)", data=f, file_name="solution.tex", mime="application/x-tex", key="download_latex" # 添加唯一key ) if hasattr(st.session_state, 'pdf_path') and Path(st.session_state.pdf_path).exists(): with open(st.session_state.pdf_path, "rb") as f: st.download_button( label="📥 Download PDF Report", data=f, file_name="solution.pdf", mime="application/pdf", key="download_pdf" # 添加唯一key ) else: st.info("Define a problem and generate content to enable export.") # --- Main Content Area --- if not st.session_state.problem_defined: # Enhanced initial message st.info("⬅️ Welcome to Mathematical Modeling Agent! Please configure your API details and define a modeling problem using the sidebar to begin.") st.markdown("---") st.subheader("How it works:") st.markdown(""" 1. **Configure API:** Enter your OpenAI compatible API details in the sidebar. These can also be set via Streamlit secrets (`API_BASE_URL`, `API_KEY`). 2. **Define Problem:** Choose a predefined problem or input your own custom problem description and requirements. 3. **Navigate Stages:** Use the sidebar to move through the standard sections of a modeling report. Stages unlock as you mark previous ones complete. 4. **Generate & Edit:** For each stage, you can: * Use the **✨ Generate Content** button (with optional instructions) to get an initial draft from the AI (mock generation in this version). * **✏️ Edit Content** after generation. * Mark stages as **✅ Complete** to unlock the next one. 5. **Export:** Download your progress as a Markdown file at any time, or export your complete solution as both LaTeX and PDF files when finished. """) else: active_stage_key = st.session_state.active_stage stage_data = st.session_state.stages[active_stage_key] # Display Stage Title and Goal st.header(f"{stage_data['title']}") st.markdown(f"> **Goal:** *{stage_data['prompt_hint']}*") st.divider() # Requirement 3: Use dividers for better separation # --- AI Generation Controls --- # Requirement 3: Optimize UI layout with st.container(border=True): # Put generation controls in a bordered container st.subheader("🚀 Agent Content Generation") col1, col2 = st.columns([3, 1]) with col1: user_prompt = st.text_area( "Instructions / Prompt Refinement:", key=f"prompt_{active_stage_key}", placeholder="Optional: Provide specific instructions, focus points, or data for the Agent to use in this stage.", help="Guide the AI generation for this specific stage.", height=100 ) with col2: st.session_state.critique_rounds = st.slider( "Critic Rounds", 0, 3, st.session_state.critique_rounds, help="Simulated self-critique iterations for the AI (0-3). More rounds might improve quality but take longer (mock only).", key=f"critique_{active_stage_key}" # Unique key per stage ) if st.button("✨ Generate Content", key=f"generate_{active_stage_key}", type="primary", use_container_width=True): if not st.session_state.agent_initialized or st.session_state.modeling_agent is None: st.error("ModelingAgentSystem not initialized. Please check API configuration.") else: with st.spinner(f"🤖 Generating content for '{active_stage_key}'... Please wait."): try: # Update the critique rounds in case they changed st.session_state.modeling_agent.config['problem_analysis_round'] = st.session_state.critique_rounds st.session_state.modeling_agent.config['problem_modeling_round'] = st.session_state.critique_rounds st.session_state.modeling_agent.config['task_formulas_round'] = st.session_state.critique_rounds # Call the agent's generate_step method if user_prompt: print(user_prompt, st.session_state.critique_rounds) success = st.session_state.modeling_agent.generate_step(active_stage_key, user_prompt=user_prompt, round=st.session_state.critique_rounds) if success: # Sync stages with the updated agent state sync_stages_with_agent() # Update the stage status if st.session_state.stages[active_stage_key]['status'] == 'not_started': st.session_state.stages[active_stage_key]['status'] = 'in_progress' st.success(f"Successfully generated content for '{active_stage_key}'!") # Check if we should advance to next stage completed_steps = st.session_state.modeling_agent.get_completed_steps() if active_stage_key in completed_steps: st.session_state.stages[active_stage_key]['status'] = 'completed' else: st.error(f"Could not generate content for '{active_stage_key}'. Check dependencies or retry.") except Exception as e: st.error(f"Error generating content: {str(e)}") import traceback st.error(traceback.format_exc()) st.rerun() # Update display immediately st.divider() # Requirement 3: Use dividers # --- Content Display and Editing --- # st.subheader("Stage Content") edit_mode = st.session_state.stages[active_stage_key].get('edit_mode', False) content = st.session_state.stages[active_stage_key]['content'] is_completed = st.session_state.stages[active_stage_key]['status'] == 'completed' if edit_mode: # --- Edit Mode --- st.info("✍️ You are in Edit Mode. Use Markdown syntax. Save or Cancel when done.") new_content = st.text_area( "Edit Content (Markdown Supported)", value=content, key=f"editor_{active_stage_key}", height=500, # Increased height for editing label_visibility="collapsed" ) col_save, col_cancel, _ = st.columns([1, 1, 4]) # Keep button layout simple with col_save: if st.button("💾 Save Changes", key=f"save_{active_stage_key}", type="primary"): _handle_content_edit(active_stage_key, new_content) st.session_state.stages[active_stage_key]['edit_mode'] = False # If content is saved and stage was 'not_started', move to 'in_progress' if st.session_state.stages[active_stage_key]['status'] == 'not_started': st.session_state.stages[active_stage_key]['status'] = 'in_progress' st.toast("Changes saved!", icon="💾") st.rerun() with col_cancel: if st.button("❌ Cancel Edit", key=f"cancel_{active_stage_key}"): st.session_state.stages[active_stage_key]['edit_mode'] = False st.rerun() else: # --- View Mode --- with st.container(border=True): # Put content in a bordered container for visual grouping st.markdown(content, unsafe_allow_html=True) # Render the markdown content # st.markdown("---") # Separator before action buttons # Action buttons layout - Requirement 3: Optimize UI layout cols = st.columns(3) with cols[0]: if st.button("✏️ Edit Content", key=f"edit_{active_stage_key}", use_container_width=True, disabled=edit_mode): st.session_state.stages[active_stage_key]['edit_mode'] = True st.rerun() with cols[1]: # Allow marking as complete only if not already completed if not is_completed: if st.button("✅ Mark as Complete", key=f"complete_{active_stage_key}", use_container_width=True): st.session_state.stages[active_stage_key]['status'] = 'completed' st.toast(f"Stage '{stage_data['title']}' marked complete!", icon="✅") # Try to advance to the next stage automatically stage_keys = list(st.session_state.stages.keys()) current_index = stage_keys.index(active_stage_key) if current_index + 1 < len(stage_keys): next_stage_key = stage_keys[current_index + 1] # Unlock the next stage by setting its status to in_progress if not started if st.session_state.stages[next_stage_key]['status'] == 'not_started': st.session_state.stages[next_stage_key]['status'] = 'in_progress' st.session_state.active_stage = next_stage_key # Move focus else: st.success("🎉 All stages completed!") # Optional: Message when last stage is done st.rerun() # Rerun to update sidebar icons/state and main view else: # 如果已经完成,显示一个可点击的"再次完成"按钮 if st.button("✅ Completed (Click to advance)", key=f"completed_{active_stage_key}", use_container_width=True): # 尝试前进到下一个阶段 stage_keys = list(st.session_state.stages.keys()) current_index = stage_keys.index(active_stage_key) if current_index + 1 < len(stage_keys): next_stage_key = stage_keys[current_index + 1] # 如果下一阶段未开始,设置为进行中 if st.session_state.stages[next_stage_key]['status'] == 'not_started': st.session_state.stages[next_stage_key]['status'] = 'in_progress' st.session_state.active_stage = next_stage_key # 移动焦点 st.rerun() # 重新运行以更新侧边栏图标/状态和主视图 else: st.success("🎉 All stages completed!") # 可选:完成最后一个阶段时的消息 # Placeholder for potential future actions in the third column # with cols[2]: # st.button("Other Action?", use_container_width=True) # --- Footer --- st.markdown("---") st.caption("Mathematical Modeling Multi-Agent System | Prototype") # 在重要操作后添加同步调用 def on_page_load(): """页面加载时同步代理状态""" if st.session_state.agent_initialized and st.session_state.modeling_agent: sync_stages_with_agent() # 在 app.py 主循环的开始处调用 if st.session_state.problem_defined and st.session_state.agent_initialized: on_page_load()