Spaces:
Sleeping
Sleeping
| import json | |
| import os | |
| from pathlib import Path | |
| from datetime import datetime | |
| from enum import Enum, auto | |
| import re | |
| import uuid | |
| import pandas as pd | |
| import streamlit as st | |
| import time | |
| from collections import OrderedDict | |
| from html_style import html_style | |
| from config import config, MCM_PROBLEMS, stages_config | |
| from agent_api import ModelingAgentSystem | |
| # Define enum for mapping UI stages to agent steps | |
| class GenerationStep(Enum): | |
| PROBLEM_ANALYSIS = "Problem Analysis" | |
| HIGH_LEVEL_MODELING = "High-Level Modeling" | |
| TASK_DECOMPOSITION = "Task Decomposition" | |
| DEPENDENCY_ANALYSIS = "Dependency Analysis" | |
| # Task-specific steps follow pattern: Task {N} {StepType} | |
| def get_task_step(cls, task_id, step_type): | |
| """Returns a step name for a specific task and step type.""" | |
| return f"Task {task_id} {step_type}" | |
| def is_task_step(cls, step_name): | |
| """Checks if a step name belongs to a task-specific step.""" | |
| return step_name.startswith("Task ") | |
| def parse_task_step(cls, step_name): | |
| """Parses a task step name into task_id and step_type components.""" | |
| if not cls.is_task_step(step_name): | |
| return None, None | |
| parts = step_name.split(" ", 2) | |
| if len(parts) < 3: | |
| return None, None | |
| try: | |
| task_id = int(parts[1]) | |
| step_type = parts[2] | |
| return task_id, step_type | |
| except ValueError: | |
| return None, None | |
| # --- Configuration --- | |
| APP_TITLE = "Mathematical Modeling Multi-Agent System" | |
| st.set_page_config(page_title=APP_TITLE, layout="wide", initial_sidebar_state="expanded") | |
| # --- Custom CSS for Styling --- | |
| # Requirement 4: Optimize UI style and color scheme | |
| st.markdown(html_style, unsafe_allow_html=True) | |
| # Define Stage Structure (Template) | |
| # CHAPTER_STRUCTURE = OrderedDict(config['stages']) | |
| # --- Helper Functions --- | |
| def initialize_session_state(): | |
| """Initializes session state variables if they don't exist.""" | |
| defaults = { | |
| "api_base_url": "", # Default empty string | |
| "api_key": "", # Default empty string | |
| "api_model": "", # Default model suggestion | |
| "problem_input_method": "Select Predefined", | |
| "selected_mcm_problem": list(MCM_PROBLEMS.keys())[0], | |
| "custom_background": "", | |
| "custom_requirements": "", | |
| "problem_defined": False, | |
| "problem_definition_expanded": True, # Requirement 1: Control expander state | |
| "current_problem_title": "", | |
| "current_problem_details": "", | |
| "stages": {}, # Stores content, status, edit_mode for each stage | |
| "active_stage": None, # Will be set after chapter structure is determined | |
| "critique_rounds": 0, | |
| "modeling_agent": None, # Will store the ModelingAgentSystem instance | |
| "agent_initialized": False, # Flag to track if agent was initialized | |
| "session_id": str(uuid.uuid4()), # Generate a unique session ID | |
| "uploaded_files": [], # Track uploaded data files | |
| } | |
| # # Try to load secrets if available | |
| # try: | |
| # if "API_BASE_URL" in st.secrets: | |
| # defaults["api_base_url"] = st.secrets["API_BASE_URL"] | |
| # if "API_KEY" in st.secrets: | |
| # defaults["api_key"] = st.secrets["API_KEY"] | |
| # except FileNotFoundError: | |
| # # Secrets file not found, use defaults | |
| # pass | |
| for key, value in defaults.items(): | |
| if key not in st.session_state: | |
| st.session_state[key] = value | |
| # Initialize stage structure in session state if problem is defined but stages aren't init'd | |
| # or if the stages structure is empty | |
| if st.session_state.problem_defined and not st.session_state.stages: | |
| reset_stages() | |
| def reset_stages(): | |
| """ Resets stage content and progress. """ | |
| # Initialize ModelingAgentSystem if API is configured | |
| if st.session_state.api_base_url and st.session_state.api_key and st.session_state.api_model: | |
| initialize_modeling_agent() | |
| # If agent is initialized, use its planned steps | |
| if st.session_state.agent_initialized and st.session_state.modeling_agent: | |
| # Get the planned steps from the agent | |
| planned_steps = st.session_state.modeling_agent.get_planned_steps() | |
| completed_steps = st.session_state.modeling_agent.get_completed_steps() | |
| # Initialize stages based on planned steps | |
| st.session_state.stages = {} | |
| for step_name in planned_steps: | |
| status = "completed" if step_name in completed_steps else "not_started" | |
| # For the first non-completed step, mark as in_progress | |
| if status == "not_started" and not any(s["status"] == "in_progress" for s in st.session_state.stages.values()): | |
| status = "in_progress" | |
| st.session_state.stages[step_name] = { | |
| "title": step_name, # Use step name as title | |
| "content": f"# {step_name}\n\n*(Content not generated yet. Use the '✨ Generate Content' button and '✏️ Edit Content' after generation.)*", | |
| "status": status, | |
| "edit_mode": False, | |
| "prompt_hint": f"Complete the {step_name} section. {stages_config.get(step_name, '')}" # Default prompt hint | |
| } | |
| # Set active stage to first in-progress stage, or first stage if none | |
| in_progress_stages = [k for k, v in st.session_state.stages.items() if v["status"] == "in_progress"] | |
| if in_progress_stages: | |
| st.session_state.active_stage = in_progress_stages[0] | |
| elif st.session_state.stages: | |
| st.session_state.active_stage = next(iter(st.session_state.stages)) | |
| else: | |
| # Fallback to static structure if agent is not initialized | |
| chapter_structure = OrderedDict(config['stages']) | |
| st.session_state.stages = { | |
| key: {"title": data["title"], | |
| "content": f"# {data['title']}\n\n*(Content not generated yet. Use the '✨ Generate Content' button and '✏️ Edit Content' after generation.)*", | |
| "status": "not_started", # States: not_started, in_progress, completed | |
| "edit_mode": False, | |
| "prompt_hint": data["prompt_hint"]} | |
| for key, data in chapter_structure.items() | |
| } | |
| # Mark the first stage as in_progress initially | |
| if chapter_structure: | |
| first_stage_key = list(chapter_structure.keys())[0] | |
| if first_stage_key in st.session_state.stages: | |
| st.session_state.stages[first_stage_key]["status"] = "in_progress" | |
| st.session_state.active_stage = first_stage_key | |
| def initialize_modeling_agent(): | |
| """Initialize the ModelingAgentSystem with current configuration""" | |
| # Check if API details are provided in session state first | |
| if not st.session_state.api_base_url or not st.session_state.api_key or not st.session_state.api_model: | |
| st.error("API Base URL, API Key, and Model Name must be configured before initializing the agent.") | |
| st.session_state.modeling_agent = None | |
| st.session_state.agent_initialized = False | |
| return # Stop initialization if config is missing | |
| # Create config for the agent system | |
| agent_config = { | |
| 'top_method_num': 6, | |
| 'problem_analysis_round': st.session_state.critique_rounds, | |
| 'problem_modeling_round': st.session_state.critique_rounds, | |
| 'task_formulas_round': st.session_state.critique_rounds, | |
| 'tasknum': 1, | |
| 'chart_num': 1, | |
| 'model_name': st.session_state.api_model, | |
| "method_name": "MAPP-Streamlit" | |
| } | |
| # Create or get problem path based on selected problem | |
| if st.session_state.problem_input_method == "Select Predefined": | |
| problem_key = st.session_state.selected_mcm_problem | |
| # Use problem from predefined problems | |
| problem_path = f'../data/actor_data/input/problem/{problem_key}.json' | |
| print(problem_path) | |
| dataset_path = "" # No custom dataset for predefined problems | |
| else: | |
| # Create dirs for session data | |
| session_dir = Path('data') / st.session_state.session_id | |
| problem_dir = session_dir | |
| data_dir = session_dir / 'data' | |
| output_dir = session_dir / 'output' | |
| for dir_path in [problem_dir, data_dir, output_dir]: | |
| dir_path.mkdir(parents=True, exist_ok=True) | |
| # Create custom problem JSON file | |
| dataset_paths = [] | |
| for uploaded_file_info in st.session_state.uploaded_files: | |
| dataset_paths.append(str(uploaded_file_info['path'])) | |
| custom_problem = { | |
| "title": "Custom Problem", # Set default title instead of using custom_title | |
| "background": st.session_state.custom_background, | |
| "problem_requirement": st.session_state.custom_requirements, | |
| "dataset_path": dataset_paths, | |
| "dataset_description": {}, | |
| "variable_description": {}, | |
| "addendum": "" | |
| } | |
| problem_path = problem_dir / 'problem.json' | |
| with open(problem_path, 'w') as f: | |
| json.dump(custom_problem, f, indent=2) | |
| dataset_path = str(data_dir) | |
| # Initialize the agent system | |
| output_path = str(Path('data') / st.session_state.session_id / 'output') | |
| os.makedirs(output_path, exist_ok=True) | |
| try: | |
| # Initialize ModelingAgentSystem | |
| st.session_state.modeling_agent = ModelingAgentSystem( | |
| problem_path=str(problem_path), # Ensure path is a string | |
| config=agent_config, | |
| dataset_path=dataset_path, | |
| output_path=output_path, | |
| name=st.session_state.selected_mcm_problem if st.session_state.problem_input_method == "Select Predefined" else "custom" | |
| ) | |
| # Set API details using reset method after initialization | |
| st.session_state.modeling_agent.llm.reset( | |
| api_key=st.session_state.api_key, | |
| api_base=st.session_state.api_base_url, | |
| model_name=st.session_state.api_model | |
| ) | |
| # Mark agent as initialized | |
| st.session_state.agent_initialized = True | |
| # Update the current problem details for display | |
| if st.session_state.problem_input_method == "Select Predefined": | |
| problem_key = st.session_state.selected_mcm_problem | |
| st.session_state.current_problem_title = problem_key | |
| st.session_state.current_problem_details = f"**Background:**\n{MCM_PROBLEMS[problem_key]['background']}\n\n**Requirements:**\n{MCM_PROBLEMS[problem_key]['problem_requirement']}" | |
| else: | |
| # Use "Custom Problem" directly instead of custom_title | |
| st.session_state.current_problem_title = "Custom Problem" | |
| st.session_state.current_problem_details = f"**Background:**\n{st.session_state.custom_background}\n\n**Requirements:**\n{st.session_state.custom_requirements}" | |
| except Exception as e: | |
| st.error(f"Failed to initialize modeling agent: {e}") | |
| st.session_state.modeling_agent = None | |
| st.session_state.agent_initialized = False | |
| def update_api_settings(): | |
| """Update API settings for the ModelingAgentSystem in the middle of a session.""" | |
| if not st.session_state.agent_initialized or st.session_state.modeling_agent is None: | |
| # st.error("ModelingAgentSystem not initialized. Please define a problem first.") | |
| return False | |
| # Check if we have all required API information | |
| if not (st.session_state.api_base_url and st.session_state.api_key and st.session_state.api_model): | |
| st.error("Please provide valid API Base URL, API Key, and Model Name.") | |
| return False | |
| try: | |
| # Reset the LLM with new API settings | |
| st.session_state.modeling_agent.llm.reset( | |
| api_key=st.session_state.api_key, | |
| api_base=st.session_state.api_base_url, | |
| model_name=st.session_state.api_model | |
| ) | |
| print(f'Reset LLM: {st.session_state.api_model}') | |
| # Update the model name in the config | |
| st.session_state.modeling_agent.config['model_name'] = st.session_state.api_model | |
| # Also update the agents that use this LLM | |
| for agent_name in ['pa', 'pm', 'td', 'task', 'mr', 'chart', 'coordinator']: | |
| if hasattr(st.session_state.modeling_agent, agent_name): | |
| agent = getattr(st.session_state.modeling_agent, agent_name) | |
| if hasattr(agent, 'llm'): | |
| agent.llm = st.session_state.modeling_agent.llm | |
| st.success("API settings updated successfully!") | |
| return True | |
| except Exception as e: | |
| st.error(f"Failed to update API settings: {e}") | |
| import traceback | |
| st.error(traceback.format_exc()) | |
| return False | |
| def get_navigatable_stages(): | |
| """Determines which stages can be navigated to (unlocked) based on agent's step dependencies.""" | |
| # If no agent or no stages, return empty list | |
| if not st.session_state.agent_initialized or not st.session_state.stages: | |
| return [] | |
| navigatable = [] | |
| if st.session_state.modeling_agent: | |
| # Get completed steps from agent | |
| completed_steps = st.session_state.modeling_agent.get_completed_steps() | |
| # Get dependencies between steps from agent | |
| dependencies = st.session_state.modeling_agent._define_dependencies() | |
| # A stage is navigatable if all its dependencies are completed | |
| for stage_key in st.session_state.stages.keys(): | |
| # If stage is already completed, it's navigatable | |
| if stage_key in completed_steps: | |
| navigatable.append(stage_key) | |
| continue | |
| # Check if all dependencies are completed | |
| deps = dependencies.get(stage_key, []) | |
| if all(dep in completed_steps for dep in deps): | |
| navigatable.append(stage_key) | |
| else: | |
| # Strict sequential unlocking - only allow completed stages and the next one | |
| chapter_keys = list(st.session_state.stages.keys()) | |
| # Always allow navigation to completed stages | |
| for i, key in enumerate(chapter_keys): | |
| if st.session_state.stages[key]['status'] == 'completed': | |
| navigatable.append(key) | |
| # Find the first non-completed stage (if any) | |
| next_stage = None | |
| for key in chapter_keys: | |
| if st.session_state.stages[key]['status'] != 'completed': | |
| next_stage = key | |
| break | |
| # Add the next non-completed stage to navigatable list (if found) | |
| if next_stage: | |
| navigatable.append(next_stage) | |
| # Ensure the first stage is always navigatable | |
| if not navigatable and chapter_keys: | |
| navigatable.append(chapter_keys[0]) | |
| return navigatable | |
| def get_stage_status_icon(status): | |
| """Returns an icon based on stage status.""" | |
| if status == "completed": | |
| return "✅" | |
| elif status == "in_progress": | |
| return "⏳" | |
| else: # not_started | |
| return "📄" # Use a neutral icon for not started but accessible | |
| def get_stage_display_status(key, navigatable_stages): | |
| """ Gets status or locked state """ | |
| if key not in st.session_state.stages: | |
| return "locked" # Should not happen with current logic, but safeguard | |
| if key in navigatable_stages: | |
| return st.session_state.stages[key]["status"] | |
| else: | |
| return "locked" | |
| def generate_markdown_export(): | |
| """Concatenates all stage content into a single markdown string.""" | |
| # If we have a modeling agent, use its paper content | |
| if st.session_state.agent_initialized and st.session_state.modeling_agent: | |
| paper = st.session_state.modeling_agent.get_paper() | |
| with open('paper.json', 'w') as f: | |
| json.dump(paper, f, indent=2) | |
| full_doc = f"# {st.session_state.current_problem_title}\n\n" | |
| full_doc += f"## Problem Description\n\n{st.session_state.current_problem_details}\n\n---\n\n" | |
| # Add problem analysis if available | |
| if 'problem_analysis' in paper: | |
| full_doc += f"# Problem Analysis\n\n{paper['problem_analysis']}\n\n---\n\n" | |
| # Add high-level modeling if available | |
| if 'high_level_modeling' in paper: | |
| full_doc += f"# High-Level Modeling\n\n{paper['high_level_modeling']}\n\n---\n\n" | |
| # Add task decomposition if available | |
| if 'task_decomposition_summary' in paper: | |
| full_doc += f"# Task Decomposition\n\n{paper['task_decomposition_summary']}\n\n---\n\n" | |
| # Add task dependency analysis if available | |
| if 'task_dependency_analysis' in paper: | |
| full_doc += f"# Task Dependency Analysis\n\n" | |
| if isinstance(paper['task_dependency_analysis'], list): | |
| for i, analysis in enumerate(paper['task_dependency_analysis']): | |
| full_doc += f"## Task {i+1} Dependencies\n{analysis}\n\n" | |
| else: | |
| full_doc += f"{paper['task_dependency_analysis']}\n\n" | |
| full_doc += "---\n\n" | |
| # Add all tasks | |
| for i, task in enumerate(paper.get('tasks', [])): | |
| if task: # Only include non-empty task dictionaries | |
| task_id = i + 1 | |
| full_doc += f"# Task {task_id}\n\n" | |
| # Add task components in a logical order | |
| components_order = [ | |
| 'task_description', | |
| 'task_analysis', | |
| 'preliminary_formulas', | |
| 'mathematical_modeling_process', | |
| 'solution_interpretation', | |
| 'subtask_outcome_analysis', | |
| 'charts' | |
| ] | |
| for component in components_order: | |
| if component in task: | |
| component_title = component.replace('_', ' ').title() | |
| content = task[component] | |
| if component == 'charts' and isinstance(content, list): | |
| full_doc += f"## {component_title}\n\n" | |
| for j, chart in enumerate(content): | |
| full_doc += f"### Chart {j+1}\n{chart}\n\n" | |
| else: | |
| full_doc += f"## {component_title}\n\n{content}\n\n" | |
| full_doc += "---\n\n" | |
| return full_doc | |
| else: | |
| # Fall back to the original implementation | |
| full_doc = f"# {st.session_state.current_problem_title}\n\n" | |
| full_doc += f"## Problem Description\n\n{st.session_state.current_problem_details}\n\n---\n\n" | |
| for key, data in st.session_state.stages.items(): | |
| # Include content of all stages, even if not started | |
| full_doc += data["content"] + "\n\n---\n\n" | |
| return full_doc | |
| # Instead of static predefinition, we'll create a function to build the chapter structure dynamically | |
| def get_chapter_structure(): | |
| """Dynamically build the chapter structure using ModelingAgentSystem interfaces.""" | |
| if st.session_state.agent_initialized and st.session_state.modeling_agent: | |
| # Get all planned steps (complete structure) | |
| planned_steps = st.session_state.modeling_agent.get_planned_steps() | |
| # Build chapter structure | |
| chapter_structure = OrderedDict() | |
| for step_name in planned_steps: | |
| # Extract title and create a prompt hint | |
| title = step_name | |
| prompt_hint = f"Complete the {step_name} section. {stages_config.get(step_name, '')}" | |
| # For task steps, create more specific prompts | |
| if GenerationStep.is_task_step(step_name): | |
| task_id, step_type = GenerationStep.parse_task_step(step_name) | |
| if task_id and step_type: | |
| prompt_hint = f"Complete the {step_type} for Task {task_id}." | |
| chapter_structure[step_name] = { | |
| "title": title, | |
| "prompt_hint": prompt_hint | |
| } | |
| return chapter_structure | |
| else: | |
| # Fallback to the original definition if modeling_agent is not available | |
| return OrderedDict(config['stages']) | |
| def sync_stages_with_agent(): | |
| """Synchronizes session stages with modeling agent progress""" | |
| if not st.session_state.agent_initialized or not st.session_state.modeling_agent: | |
| return | |
| # Get agent's current state | |
| completed_steps = st.session_state.modeling_agent.get_completed_steps() | |
| planned_steps = st.session_state.modeling_agent.get_planned_steps() | |
| paper = st.session_state.modeling_agent.get_paper() | |
| # Get current chapter structure | |
| chapter_structure = get_chapter_structure() | |
| # First, update stage structure - add any new steps | |
| for step_name in planned_steps: | |
| if step_name not in st.session_state.stages: | |
| prompt_hint = chapter_structure.get(step_name, {}).get('prompt_hint', f"Complete the {step_name} section. {stages_config.get(step_name, '')}") | |
| st.session_state.stages[step_name] = { | |
| "title": step_name, | |
| "content": f"# {step_name}\n\n*(Content not generated yet. Use the '✨ Generate Content' button and '✏️ Edit Content' after generation.)*", | |
| "status": "completed" if step_name in completed_steps else "not_started", | |
| "edit_mode": False, | |
| "prompt_hint": prompt_hint | |
| } | |
| # Update stage statuses | |
| for step_name in st.session_state.stages: | |
| if step_name in completed_steps: | |
| st.session_state.stages[step_name]["status"] = "completed" | |
| # Update content based on what's in the paper | |
| # Sync Problem Background and Problem Requirement from paper | |
| if 'problem_background' in paper and 'Problem Background' in st.session_state.stages: | |
| st.session_state.stages['Problem Background']['content'] = f"# Problem Background\n\n{paper['problem_background']}" | |
| if 'problem_requirement' in paper and 'Problem Requirement' in st.session_state.stages: | |
| st.session_state.stages['Problem Requirement']['content'] = f"# Problem Requirement\n\n{paper['problem_requirement']}" | |
| # Update Problem Analysis content | |
| if 'problem_analysis' in paper and 'Problem Analysis' in st.session_state.stages: | |
| st.session_state.stages['Problem Analysis']['content'] = f"# Problem Analysis\n\n{paper['problem_analysis']}" | |
| # Update High-Level Modeling content | |
| if 'high_level_modeling' in paper and 'High-Level Modeling' in st.session_state.stages: | |
| st.session_state.stages['High-Level Modeling']['content'] = f"# High-Level Modeling\n\n{paper['high_level_modeling']}" | |
| # Update Task Decomposition content | |
| if 'task_decomposition_summary' in paper and 'Task Decomposition' in st.session_state.stages: | |
| st.session_state.stages['Task Decomposition']['content'] = f"# Task Decomposition\n\n{paper['task_decomposition_summary']}" | |
| # Update Task Dependency Analysis content | |
| if 'task_dependency_analysis' in paper and 'Dependency Analysis' in st.session_state.stages: | |
| dependency_content = "# Task Dependency Analysis\n\n" | |
| if isinstance(paper['task_dependency_analysis'], list): | |
| for i, analysis in enumerate(paper['task_dependency_analysis']): | |
| dependency_content += f"## Task {i+1} Dependencies\n{analysis}\n\n" | |
| else: | |
| dependency_content += str(paper['task_dependency_analysis']) | |
| st.session_state.stages['Dependency Analysis']['content'] = dependency_content | |
| # Update task-specific contents | |
| if 'tasks' in paper: | |
| for task_index, task_dict in enumerate(paper['tasks']): | |
| if not task_dict: # Skip empty task dictionaries | |
| continue | |
| task_id = task_index + 1 # 1-based ID for display | |
| # Map each task component to the corresponding step | |
| component_to_step = { | |
| 'task_description': f'Task {task_id} Description', | |
| 'task_analysis': f'Task {task_id} Analysis', | |
| 'preliminary_formulas': f'Task {task_id} Preliminary Formulas', | |
| 'mathematical_modeling_process': f'Task {task_id} Mathematical Modeling Process', | |
| 'task_code': f'Task {task_id} Code', | |
| 'solution_interpretation': f'Task {task_id} Solution Interpretation', | |
| 'subtask_outcome_analysis': f'Task {task_id} Subtask Outcome Analysis', | |
| 'charts': f'Task {task_id} Charts' | |
| } | |
| # Update each component if it exists | |
| for component, step_name in component_to_step.items(): | |
| if component in task_dict and step_name in st.session_state.stages: | |
| content = task_dict[component] | |
| # Format content based on component type | |
| if component == 'charts': | |
| formatted_content = f"# Charts for Task {task_id}\n\n" | |
| if isinstance(content, list): | |
| for i, chart in enumerate(content): | |
| formatted_content += f"## Chart {i+1}\n{chart}\n\n" | |
| else: | |
| formatted_content += str(content) | |
| else: | |
| # Default formatting | |
| formatted_content = f"# {step_name}\n\n{content}" | |
| st.session_state.stages[step_name]['content'] = formatted_content | |
| def _handle_content_edit(active_stage_key, new_content): | |
| """将用户编辑的内容同步回 ModelingAgentSystem""" | |
| if not st.session_state.agent_initialized or not st.session_state.modeling_agent: | |
| return | |
| # 更新 session_state 中的内容 | |
| st.session_state.stages[active_stage_key]['content'] = new_content | |
| # 根据步骤类型更新 agent 的 paper 字典 | |
| agent = st.session_state.modeling_agent | |
| paper = agent.paper | |
| if active_stage_key == 'Problem Analysis': | |
| paper['problem_analysis'] = new_content.replace('# Problem Analysis\n\n', '') | |
| elif active_stage_key == 'High-Level Modeling': | |
| paper['high_level_modeling'] = new_content.replace('# High-Level Modeling\n\n', '') | |
| elif active_stage_key == 'Task Decomposition': | |
| paper['task_decomposition_summary'] = new_content.replace('# Task Decomposition\n\n', '') | |
| elif active_stage_key == 'Dependency Analysis': | |
| # 可能需要特殊处理,因为这可能是一个结构化的内容 | |
| clean_content = new_content.replace('# Task Dependency Analysis\n\n', '') | |
| paper['task_dependency_analysis'] = clean_content | |
| elif active_stage_key.startswith('Task '): | |
| # 解析任务 ID 和步骤类型 | |
| match = re.match(r"Task (\d+) (.*)", active_stage_key) | |
| if match: | |
| task_id = int(match.group(1)) | |
| step_type = match.group(2) | |
| task_index = task_id - 1 | |
| # 确保 task 列表足够长 | |
| while len(paper['tasks']) <= task_index: | |
| paper['tasks'].append({}) | |
| # 映射步骤类型到 paper 中的键 | |
| step_to_key = { | |
| 'Description': 'task_description', | |
| 'Analysis': 'task_analysis', | |
| 'Preliminary Formulas': 'preliminary_formulas', | |
| 'Mathematical Modeling Process': 'mathematical_modeling_process', | |
| 'Solution Interpretation': 'solution_interpretation', | |
| 'Subtask Outcome Analysis': 'subtask_outcome_analysis', | |
| 'Charts': 'charts' | |
| } | |
| if step_type in step_to_key: | |
| key = step_to_key[step_type] | |
| clean_content = new_content.replace(f'# {active_stage_key}\n\n', '') | |
| paper['tasks'][task_index][key] = clean_content | |
| # 同时更新协调器的内存,如果适用 | |
| if step_type != 'Charts': # Charts 可能有特殊格式 | |
| agent.coordinator.memory.setdefault(str(task_id), {})[key] = clean_content | |
| # --- Main App Logic --- | |
| initialize_session_state() | |
| # Only show title and caption on the initial page | |
| if not st.session_state.problem_defined: | |
| st.title(f"{APP_TITLE}") # Added a bit of flair | |
| st.caption("An AI-assisted platform for structuring and drafting mathematical modeling reports.") | |
| # --- Sidebar --- | |
| with st.sidebar: | |
| st.header("⚙️ Configuration") | |
| # Use secrets if available, otherwise show inputs | |
| api_base_provided = bool(st.session_state.api_base_url) | |
| api_key_provided = bool(st.session_state.api_key) | |
| api_model_provided = bool(st.session_state.api_model) | |
| with st.expander("LLM API Details", expanded=not (api_base_provided and api_key_provided and api_model_provided)): | |
| # Sync with main content if those fields exist | |
| if 'api_base_url_main' in st.session_state: | |
| st.session_state.api_base_url = st.session_state.api_base_url_main | |
| if 'api_key_main' in st.session_state: | |
| st.session_state.api_key = st.session_state.api_key_main | |
| if 'api_model_main' in st.session_state: | |
| st.session_state.api_model = st.session_state.api_model_main | |
| st.text_input( | |
| "API Base URL", | |
| value=st.session_state.api_base_url, | |
| key="api_base_url", | |
| placeholder="e.g., https://api.openai.com/v1", | |
| help="Your OpenAI compatible API endpoint." | |
| ) | |
| st.text_input( | |
| "API Key", | |
| value=st.session_state.api_key, | |
| key="api_key", | |
| type="password", | |
| help="Your OpenAI compatible API key. Can also be set via Streamlit secrets (API_KEY)." | |
| ) | |
| st.text_input( | |
| "Model Name", | |
| value=st.session_state.api_model, | |
| key="api_model", | |
| placeholder="e.g., gpt-4-turbo", | |
| help="The specific model to use for generation." | |
| ) | |
| if st.button("Save", key="save_api_settings", type="secondary", use_container_width=True): | |
| st.session_state.api_base_url_main = st.session_state.api_base_url | |
| st.session_state.api_key_main = st.session_state.api_key | |
| st.session_state.api_model_main = st.session_state.api_model | |
| update_api_settings() | |
| st.divider() | |
| # Requirement 1: Put Problem Definition in a controllable expander | |
| st.header("🔍 Problem Definition") | |
| with st.expander("Problem Background & Requirements", expanded=st.session_state.problem_definition_expanded): | |
| # Check if API keys are provided before allowing problem definition | |
| api_configured = bool(st.session_state.api_base_url and st.session_state.api_key and st.session_state.api_model) | |
| if not api_configured: | |
| st.warning("Please provide valid API Base URL, API Key, and Model Name in the configuration above to define a problem.") | |
| # Don't st.stop() here, allow viewing config expander | |
| else: | |
| problem_input_method = st.radio( | |
| "Select Problem Source:", | |
| ["Select Predefined", "Input Custom"], | |
| key="problem_input_method", | |
| horizontal=True, | |
| # help="Choose a built-in MCM/ICM problem or define your own." | |
| ) | |
| confirm_problem = False | |
| if st.session_state.problem_input_method == "Select Predefined": | |
| st.selectbox( | |
| "Choose MCM/ICM Problem:", | |
| options=list(MCM_PROBLEMS.keys()), | |
| format_func=lambda x: f"MCM_{x}", # Show full title | |
| key="selected_mcm_problem" | |
| ) | |
| if st.button("Load Problem", type="primary", key="load_predefined", use_container_width=True): | |
| confirm_problem = True | |
| problem_key = st.session_state.selected_mcm_problem | |
| st.session_state.current_problem_title = problem_key # MCM_PROBLEMS[problem_key]['title'] | |
| st.session_state.current_problem_details = f"**Background:**\n{MCM_PROBLEMS[problem_key]['background']}\n\n**Requirements:**\n{MCM_PROBLEMS[problem_key]['problem_requirement']}" | |
| else: # Input Custom | |
| # Removing the custom title input field | |
| # st.text_input("Custom Problem Title:", key="custom_title", placeholder="Enter a short title for your problem") | |
| st.text_area("Problem Background:", key="custom_background", height=150, placeholder="Provide context and background information.") | |
| st.text_area("Problem Requirements:", key="custom_requirements", height=100, placeholder="Detail the specific tasks or questions to be addressed.") | |
| # Add file upload functionality | |
| st.subheader("Upload Data Files") | |
| uploaded_files = st.file_uploader( | |
| "Upload CSV or Excel files for your problem (optional)", | |
| type=["csv", "xlsx", "xls"], | |
| accept_multiple_files=True, | |
| help="Data files will be available for the modeling agent to use." | |
| ) | |
| # Process uploaded files | |
| if uploaded_files and len(uploaded_files) > 0: | |
| # Clear previous uploads if new files are uploaded | |
| if "last_upload_count" not in st.session_state or st.session_state.last_upload_count != len(uploaded_files): | |
| st.session_state.uploaded_files = [] | |
| st.session_state.last_upload_count = len(uploaded_files) | |
| # Save uploaded files | |
| for uploaded_file in uploaded_files: | |
| # Check if file was already processed | |
| file_already_processed = any(info['name'] == uploaded_file.name for info in st.session_state.uploaded_files) | |
| if not file_already_processed: | |
| # Create directory for files if it doesn't exist | |
| file_dir = Path('data') / st.session_state.session_id / 'data' | |
| file_dir.mkdir(parents=True, exist_ok=True) | |
| # Save file to disk | |
| file_path = file_dir / uploaded_file.name | |
| with open(file_path, "wb") as f: | |
| f.write(uploaded_file.getbuffer()) | |
| # Try to read file to get preview | |
| preview = None | |
| if uploaded_file.name.endswith(('.csv', '.xlsx', '.xls')): | |
| try: | |
| if uploaded_file.name.endswith('.csv'): | |
| df = pd.read_csv(file_path) | |
| else: | |
| df = pd.read_excel(file_path) | |
| preview = df.head(5) | |
| except Exception as e: | |
| preview = f"Error reading file: {str(e)}" | |
| # Add file info to session state | |
| st.session_state.uploaded_files.append({ | |
| 'name': uploaded_file.name, | |
| 'path': str(file_path.absolute()), | |
| 'preview': preview | |
| }) | |
| # Display uploaded files | |
| if st.session_state.uploaded_files: | |
| st.success(f"{len(st.session_state.uploaded_files)} file(s) uploaded successfully") | |
| for file_info in st.session_state.uploaded_files: | |
| # Replace nested expander with a container and bolded title | |
| with st.container(border=True): | |
| st.markdown(f"**📄 {file_info['name']}**") | |
| if isinstance(file_info['preview'], pd.DataFrame): | |
| st.dataframe(file_info['preview']) | |
| else: | |
| st.write(file_info['preview']) | |
| if st.button("Set Custom Problem", type="primary", key="load_custom", use_container_width=True): | |
| if st.session_state.custom_background and st.session_state.custom_requirements: | |
| confirm_problem = True | |
| # Use "Custom Problem" as the default title instead of custom_title | |
| st.session_state.current_problem_title = "Custom Problem" | |
| st.session_state.current_problem_details = f"**Background:**\n{st.session_state.custom_background}\n\n**Requirements:**\n{st.session_state.custom_requirements}" | |
| else: | |
| st.warning("Please provide background and requirements for the custom problem.") | |
| # Handle problem confirmation and stage reset | |
| if confirm_problem: | |
| if st.session_state.problem_defined: # If a problem was already defined, show info about reset | |
| st.toast("Reloading problem: Existing stage content and progress will be reset.") | |
| time.sleep(1) # Give user time to see toast | |
| st.session_state.problem_defined = True | |
| st.session_state.problem_definition_expanded = False # Requirement 1: Collapse expander | |
| reset_stages() | |
| st.rerun() # Rerun to update sidebar navigation and main content area | |
| # --- Stage Navigation (Displayed only if a problem is defined) --- | |
| if st.session_state.problem_defined: | |
| st.divider() | |
| st.header("📚 Stages") | |
| navigatable_stages = get_navigatable_stages() | |
| # Ensure the current active stage is valid | |
| if st.session_state.active_stage not in navigatable_stages: | |
| # If current active is somehow locked (e.g., after loading a new problem), | |
| # default to the last navigatable one. | |
| if navigatable_stages: | |
| st.session_state.active_stage = navigatable_stages[-1] | |
| else: # Should not happen, but fallback to first stage | |
| # st.session_state.active_stage = list(st.session_state.stages.keys())[0] | |
| pass | |
| # Requirement 2 & 3: Use buttons for navigation, disable locked stages | |
| # Using a container to apply styles easier via CSS selector | |
| with st.container(border=False): | |
| st.markdown('<div data-testid="stSidebarNavItems">', unsafe_allow_html=True) # Wrapper for CSS targeting | |
| for key, data in st.session_state.stages.items(): | |
| stage_info = st.session_state.stages.get(key) | |
| if stage_info: | |
| is_navigatable = key in navigatable_stages | |
| is_active = key == st.session_state.active_stage | |
| display_status = get_stage_display_status(key, navigatable_stages) | |
| if display_status == "locked": | |
| icon = "🔒" | |
| label_markdown = f"{icon} {stage_info['title']}" # Add non-breaking spaces | |
| else: | |
| icon = get_stage_status_icon(display_status) | |
| label_markdown = f"{icon} {stage_info['title']}" | |
| # Use markdown in button label to render icons correctly | |
| button_label_html = f'<div style="display: flex; align-items: center;">{label_markdown}</div>' | |
| # Set button type: primary if active, secondary otherwise | |
| button_type = "primary" if is_active else "secondary" | |
| if st.button( | |
| label=label_markdown, # Use markdown directly | |
| key=f"nav_{key}", | |
| disabled=not is_navigatable, | |
| use_container_width=True, | |
| type=button_type, | |
| help=f"Status: {display_status.replace('_', ' ').title()}" if is_navigatable else "Complete previous stages to unlock" | |
| ): | |
| if is_navigatable and not is_active: | |
| st.session_state.active_stage = key | |
| # Turn off edit mode when switching stages | |
| if st.session_state.stages[key].get('edit_mode', False): | |
| st.session_state.stages[key]['edit_mode'] = False | |
| st.rerun() | |
| st.markdown('</div>', unsafe_allow_html=True) # Close wrapper | |
| st.divider() | |
| st.header("📄 Solution Report") | |
| if st.session_state.stages: | |
| markdown_content = generate_markdown_export() | |
| st.download_button( | |
| label="📥 Export Intermediate Process (.md)", | |
| data=markdown_content, | |
| file_name=f"mapp_export_{st.session_state.current_problem_title.replace(' ', '_').lower()}.md", | |
| mime="text/markdown", | |
| use_container_width=True | |
| ) | |
| # 检查是否所有章节都已完成 | |
| all_completed = all(stage_data["status"] == "completed" for stage_data in st.session_state.stages.values()) | |
| # 添加导出完整报告按钮(只有在所有章节完成后才能点击) | |
| if st.button( | |
| "📊 Export Solution Report (.latex & .pdf)", | |
| disabled=not all_completed, | |
| use_container_width=True, | |
| help="Generate and download a complete LaTeX and PDF report (available after all stages are completed)" | |
| ): | |
| if st.session_state.agent_initialized and st.session_state.modeling_agent: | |
| with st.spinner("Generating LaTeX and PDF solution report... This may take a few minutes."): | |
| try: | |
| # 获取输出路径 | |
| output_path = str(Path('data') / st.session_state.session_id / 'output') | |
| # 调用agent生成LaTeX和PDF | |
| st.session_state.modeling_agent.generate_paper(output_path) | |
| # 存储文件路径到session_state,使按钮可以在后续渲染中保持显示 | |
| st.session_state.latex_path = f'{output_path}/latex/solution.tex' | |
| st.session_state.pdf_path = f'{output_path}/latex/solution.pdf' | |
| except Exception as e: | |
| st.error(f"Error generating report: {str(e)}") | |
| import traceback | |
| st.error(traceback.format_exc()) | |
| st.rerun() # 刷新页面以显示下载按钮 | |
| else: | |
| st.error("ModelingAgentSystem not initialized. Please check API configuration.") | |
| # 检查session_state中是否有生成的文件路径,并显示对应的下载按钮 | |
| if hasattr(st.session_state, 'latex_path') and Path(st.session_state.latex_path).exists(): | |
| with open(st.session_state.latex_path, "rb") as f: | |
| st.download_button( | |
| label="📥 Download LaTeX (.tex)", | |
| data=f, | |
| file_name="solution.tex", | |
| mime="application/x-tex", | |
| key="download_latex" # 添加唯一key | |
| ) | |
| if hasattr(st.session_state, 'pdf_path') and Path(st.session_state.pdf_path).exists(): | |
| with open(st.session_state.pdf_path, "rb") as f: | |
| st.download_button( | |
| label="📥 Download PDF Report", | |
| data=f, | |
| file_name="solution.pdf", | |
| mime="application/pdf", | |
| key="download_pdf" # 添加唯一key | |
| ) | |
| else: | |
| st.info("Define a problem and generate content to enable export.") | |
| # --- Main Content Area --- | |
| if not st.session_state.problem_defined: | |
| # Enhanced initial message | |
| st.info("⬅️ Welcome to Mathematical Modeling Agent! Please configure your API details and define a modeling problem using the sidebar to begin.") | |
| st.markdown("---") | |
| st.subheader("How it works:") | |
| st.markdown(""" | |
| 1. **Configure API:** Enter your OpenAI compatible API details in the sidebar. These can also be set via Streamlit secrets (`API_BASE_URL`, `API_KEY`). | |
| 2. **Define Problem:** Choose a predefined problem or input your own custom problem description and requirements. | |
| 3. **Navigate Stages:** Use the sidebar to move through the standard sections of a modeling report. Stages unlock as you mark previous ones complete. | |
| 4. **Generate & Edit:** For each stage, you can: | |
| * Use the **✨ Generate Content** button (with optional instructions) to get an initial draft from the AI (mock generation in this version). | |
| * **✏️ Edit Content** after generation. | |
| * Mark stages as **✅ Complete** to unlock the next one. | |
| 5. **Export:** Download your progress as a Markdown file at any time, or export your complete solution as both LaTeX and PDF files when finished. | |
| """) | |
| else: | |
| active_stage_key = st.session_state.active_stage | |
| stage_data = st.session_state.stages[active_stage_key] | |
| # Display Stage Title and Goal | |
| st.header(f"{stage_data['title']}") | |
| st.markdown(f"> **Goal:** *{stage_data['prompt_hint']}*") | |
| st.divider() # Requirement 3: Use dividers for better separation | |
| # --- AI Generation Controls --- | |
| # Requirement 3: Optimize UI layout | |
| with st.container(border=True): # Put generation controls in a bordered container | |
| st.subheader("🚀 Agent Content Generation") | |
| col1, col2 = st.columns([3, 1]) | |
| with col1: | |
| user_prompt = st.text_area( | |
| "Instructions / Prompt Refinement:", | |
| key=f"prompt_{active_stage_key}", | |
| placeholder="Optional: Provide specific instructions, focus points, or data for the Agent to use in this stage.", | |
| help="Guide the AI generation for this specific stage.", | |
| height=100 | |
| ) | |
| with col2: | |
| st.session_state.critique_rounds = st.slider( | |
| "Critic Rounds", 0, 3, st.session_state.critique_rounds, | |
| help="Simulated self-critique iterations for the AI (0-3). More rounds might improve quality but take longer (mock only).", | |
| key=f"critique_{active_stage_key}" # Unique key per stage | |
| ) | |
| if st.button("✨ Generate Content", key=f"generate_{active_stage_key}", type="primary", use_container_width=True): | |
| if not st.session_state.agent_initialized or st.session_state.modeling_agent is None: | |
| st.error("ModelingAgentSystem not initialized. Please check API configuration.") | |
| else: | |
| with st.spinner(f"🤖 Generating content for '{active_stage_key}'... Please wait."): | |
| try: | |
| # Update the critique rounds in case they changed | |
| st.session_state.modeling_agent.config['problem_analysis_round'] = st.session_state.critique_rounds | |
| st.session_state.modeling_agent.config['problem_modeling_round'] = st.session_state.critique_rounds | |
| st.session_state.modeling_agent.config['task_formulas_round'] = st.session_state.critique_rounds | |
| # Call the agent's generate_step method | |
| if user_prompt: | |
| print(user_prompt, st.session_state.critique_rounds) | |
| success = st.session_state.modeling_agent.generate_step(active_stage_key, user_prompt=user_prompt, round=st.session_state.critique_rounds) | |
| if success: | |
| # Sync stages with the updated agent state | |
| sync_stages_with_agent() | |
| # Update the stage status | |
| if st.session_state.stages[active_stage_key]['status'] == 'not_started': | |
| st.session_state.stages[active_stage_key]['status'] = 'in_progress' | |
| st.success(f"Successfully generated content for '{active_stage_key}'!") | |
| # Check if we should advance to next stage | |
| completed_steps = st.session_state.modeling_agent.get_completed_steps() | |
| if active_stage_key in completed_steps: | |
| st.session_state.stages[active_stage_key]['status'] = 'completed' | |
| else: | |
| st.error(f"Could not generate content for '{active_stage_key}'. Check dependencies or retry.") | |
| except Exception as e: | |
| st.error(f"Error generating content: {str(e)}") | |
| import traceback | |
| st.error(traceback.format_exc()) | |
| st.rerun() # Update display immediately | |
| st.divider() # Requirement 3: Use dividers | |
| # --- Content Display and Editing --- | |
| # st.subheader("Stage Content") | |
| edit_mode = st.session_state.stages[active_stage_key].get('edit_mode', False) | |
| content = st.session_state.stages[active_stage_key]['content'] | |
| is_completed = st.session_state.stages[active_stage_key]['status'] == 'completed' | |
| if edit_mode: | |
| # --- Edit Mode --- | |
| st.info("✍️ You are in Edit Mode. Use Markdown syntax. Save or Cancel when done.") | |
| new_content = st.text_area( | |
| "Edit Content (Markdown Supported)", | |
| value=content, | |
| key=f"editor_{active_stage_key}", | |
| height=500, # Increased height for editing | |
| label_visibility="collapsed" | |
| ) | |
| col_save, col_cancel, _ = st.columns([1, 1, 4]) # Keep button layout simple | |
| with col_save: | |
| if st.button("💾 Save Changes", key=f"save_{active_stage_key}", type="primary"): | |
| _handle_content_edit(active_stage_key, new_content) | |
| st.session_state.stages[active_stage_key]['edit_mode'] = False | |
| # If content is saved and stage was 'not_started', move to 'in_progress' | |
| if st.session_state.stages[active_stage_key]['status'] == 'not_started': | |
| st.session_state.stages[active_stage_key]['status'] = 'in_progress' | |
| st.toast("Changes saved!", icon="💾") | |
| st.rerun() | |
| with col_cancel: | |
| if st.button("❌ Cancel Edit", key=f"cancel_{active_stage_key}"): | |
| st.session_state.stages[active_stage_key]['edit_mode'] = False | |
| st.rerun() | |
| else: | |
| # --- View Mode --- | |
| with st.container(border=True): # Put content in a bordered container for visual grouping | |
| st.markdown(content, unsafe_allow_html=True) # Render the markdown content | |
| # st.markdown("---") # Separator before action buttons | |
| # Action buttons layout - Requirement 3: Optimize UI layout | |
| cols = st.columns(3) | |
| with cols[0]: | |
| if st.button("✏️ Edit Content", key=f"edit_{active_stage_key}", use_container_width=True, disabled=edit_mode): | |
| st.session_state.stages[active_stage_key]['edit_mode'] = True | |
| st.rerun() | |
| with cols[1]: | |
| # Allow marking as complete only if not already completed | |
| if not is_completed: | |
| if st.button("✅ Mark as Complete", key=f"complete_{active_stage_key}", use_container_width=True): | |
| st.session_state.stages[active_stage_key]['status'] = 'completed' | |
| st.toast(f"Stage '{stage_data['title']}' marked complete!", icon="✅") | |
| # Try to advance to the next stage automatically | |
| stage_keys = list(st.session_state.stages.keys()) | |
| current_index = stage_keys.index(active_stage_key) | |
| if current_index + 1 < len(stage_keys): | |
| next_stage_key = stage_keys[current_index + 1] | |
| # Unlock the next stage by setting its status to in_progress if not started | |
| if st.session_state.stages[next_stage_key]['status'] == 'not_started': | |
| st.session_state.stages[next_stage_key]['status'] = 'in_progress' | |
| st.session_state.active_stage = next_stage_key # Move focus | |
| else: | |
| st.success("🎉 All stages completed!") # Optional: Message when last stage is done | |
| st.rerun() # Rerun to update sidebar icons/state and main view | |
| else: | |
| # 如果已经完成,显示一个可点击的"再次完成"按钮 | |
| if st.button("✅ Completed (Click to advance)", key=f"completed_{active_stage_key}", use_container_width=True): | |
| # 尝试前进到下一个阶段 | |
| stage_keys = list(st.session_state.stages.keys()) | |
| current_index = stage_keys.index(active_stage_key) | |
| if current_index + 1 < len(stage_keys): | |
| next_stage_key = stage_keys[current_index + 1] | |
| # 如果下一阶段未开始,设置为进行中 | |
| if st.session_state.stages[next_stage_key]['status'] == 'not_started': | |
| st.session_state.stages[next_stage_key]['status'] = 'in_progress' | |
| st.session_state.active_stage = next_stage_key # 移动焦点 | |
| st.rerun() # 重新运行以更新侧边栏图标/状态和主视图 | |
| else: | |
| st.success("🎉 All stages completed!") # 可选:完成最后一个阶段时的消息 | |
| # Placeholder for potential future actions in the third column | |
| # with cols[2]: | |
| # st.button("Other Action?", use_container_width=True) | |
| # --- Footer --- | |
| st.markdown("---") | |
| st.caption("Mathematical Modeling Multi-Agent System | Prototype") | |
| # 在重要操作后添加同步调用 | |
| def on_page_load(): | |
| """页面加载时同步代理状态""" | |
| if st.session_state.agent_initialized and st.session_state.modeling_agent: | |
| sync_stages_with_agent() | |
| # 在 app.py 主循环的开始处调用 | |
| if st.session_state.problem_defined and st.session_state.agent_initialized: | |
| on_page_load() | |