# MathematicalModelingAgent's picture
# fix bugs
# 4569928
import json
import os
from pathlib import Path
from datetime import datetime
from enum import Enum, auto
import re
import uuid
import pandas as pd
import streamlit as st
import time
from collections import OrderedDict
from html_style import html_style
from config import config, MCM_PROBLEMS, stages_config
from agent_api import ModelingAgentSystem
# Define enum for mapping UI stages to agent steps
class GenerationStep(Enum):
    """Names of the agent's generation steps as they appear in the UI.

    The enum members are the four problem-level steps. Per-task steps are not
    members; they are plain strings shaped like ``"Task {N} {StepType}"`` that
    the classmethods below build, recognise and take apart.
    """

    PROBLEM_ANALYSIS = "Problem Analysis"
    HIGH_LEVEL_MODELING = "High-Level Modeling"
    TASK_DECOMPOSITION = "Task Decomposition"
    DEPENDENCY_ANALYSIS = "Dependency Analysis"

    @classmethod
    def get_task_step(cls, task_id, step_type):
        """Build the step name for ``step_type`` of task ``task_id``."""
        return "Task {} {}".format(task_id, step_type)

    @classmethod
    def is_task_step(cls, step_name):
        """Return True when ``step_name`` starts with the ``"Task "`` prefix."""
        return step_name.startswith("Task ")

    @classmethod
    def parse_task_step(cls, step_name):
        """Split a task step name into ``(task_id, step_type)``.

        Returns ``(None, None)`` when the name is not a task step, has fewer
        than three space-separated parts, or its second part is not an integer.
        """
        not_a_task_step = (None, None)
        if not cls.is_task_step(step_name):
            return not_a_task_step

        # At most three pieces: "Task", the id, and the rest (which may itself
        # contain spaces, e.g. "Mathematical Modeling Process").
        pieces = step_name.split(" ", 2)
        if len(pieces) != 3:
            return not_a_task_step

        _, raw_id, step_type = pieces
        try:
            return int(raw_id), step_type
        except ValueError:
            return not_a_task_step
# --- Configuration ---
# Title used for the browser tab and for the heading on the landing page.
APP_TITLE = "Mathematical Modeling Multi-Agent System"
# Page-level settings: wide layout, sidebar open on first load.
st.set_page_config(page_title=APP_TITLE, layout="wide", initial_sidebar_state="expanded")
# --- Custom CSS for Styling ---
# Requirement 4: Optimize UI style and color scheme
# `html_style` comes from the project's `html_style` module and is injected as
# raw HTML (presumably a <style> block; its content is not visible here).
st.markdown(html_style, unsafe_allow_html=True)
# Define Stage Structure (Template)
# CHAPTER_STRUCTURE = OrderedDict(config['stages'])
# --- Helper Functions ---
def initialize_session_state():
    """Seed ``st.session_state`` with a default for every key that is missing.

    Keys that already exist are left untouched. If a problem is already
    marked as defined but no stages exist, the stage structure is rebuilt
    through ``reset_stages()``.
    """
    state = st.session_state

    initial_values = {
        # LLM connection settings; empty until the user fills them in.
        "api_base_url": "",
        "api_key": "",
        "api_model": "",
        # Problem selection / custom problem inputs.
        "problem_input_method": "Select Predefined",
        "selected_mcm_problem": list(MCM_PROBLEMS)[0],
        "custom_background": "",
        "custom_requirements": "",
        "problem_defined": False,
        "problem_definition_expanded": True,  # Requirement 1: controls the expander state
        "current_problem_title": "",
        "current_problem_details": "",
        # Per-stage content, status and edit_mode.
        "stages": {},
        "active_stage": None,  # set once the chapter structure is known
        "critique_rounds": 0,
        # ModelingAgentSystem instance and whether it was initialised.
        "modeling_agent": None,
        "agent_initialized": False,
        "session_id": str(uuid.uuid4()),  # unique id for this session
        "uploaded_files": [],  # uploaded data files
    }

    # Loading the API settings from Streamlit secrets is currently disabled:
    # try:
    #     if "API_BASE_URL" in st.secrets:
    #         initial_values["api_base_url"] = st.secrets["API_BASE_URL"]
    #     if "API_KEY" in st.secrets:
    #         initial_values["api_key"] = st.secrets["API_KEY"]
    # except FileNotFoundError:
    #     # Secrets file not found, use defaults
    #     pass

    for name in initial_values:
        if name not in state:
            state[name] = initial_values[name]

    # A defined problem without any stages means the structure still has to
    # be built (or was emptied).
    if state.problem_defined and not state.stages:
        reset_stages()
def reset_stages():
    """Discard all stage content and progress and rebuild the stage table.

    When the API settings are complete the modeling agent is (re)initialised
    first. With a working agent the stages mirror its planned steps; otherwise
    the static ``config['stages']`` structure is used. In both cases the first
    stage that is not completed becomes ``in_progress`` and the active stage
    is updated.
    """
    state = st.session_state

    if state.api_base_url and state.api_key and state.api_model:
        initialize_modeling_agent()

    if not (state.agent_initialized and state.modeling_agent):
        # No usable agent: fall back to the static chapter structure.
        static_structure = OrderedDict(config['stages'])
        rebuilt = {}
        for stage_key, spec in static_structure.items():
            rebuilt[stage_key] = {
                "title": spec["title"],
                "content": f"# {spec['title']}\n\n*(Content not generated yet. Use the '✨ Generate Content' button and '✏️ Edit Content' after generation.)*",
                "status": "not_started",  # one of: not_started, in_progress, completed
                "edit_mode": False,
                "prompt_hint": spec["prompt_hint"],
            }
        state.stages = rebuilt

        # The first static stage starts out in progress and active.
        if static_structure:
            opening_key = list(static_structure.keys())[0]
            if opening_key in state.stages:
                state.stages[opening_key]["status"] = "in_progress"
                state.active_stage = opening_key
        return

    agent = state.modeling_agent
    planned_steps = agent.get_planned_steps()
    completed_steps = agent.get_completed_steps()

    state.stages = {}
    for step_name in planned_steps:
        if step_name in completed_steps:
            status = "completed"
        elif any(entry["status"] == "in_progress" for entry in state.stages.values()):
            # An earlier step already holds the single in_progress slot.
            status = "not_started"
        else:
            status = "in_progress"

        state.stages[step_name] = {
            "title": step_name,
            "content": f"# {step_name}\n\n*(Content not generated yet. Use the '✨ Generate Content' button and '✏️ Edit Content' after generation.)*",
            "status": status,
            "edit_mode": False,
            "prompt_hint": f"Complete the {step_name} section. {stages_config.get(step_name, '')}",
        }

    # Activate the in_progress stage if there is one, else the first stage.
    running = [name for name, entry in state.stages.items() if entry["status"] == "in_progress"]
    if running:
        state.active_stage = running[0]
    elif state.stages:
        state.active_stage = next(iter(state.stages))
def initialize_modeling_agent():
    """Create the ``ModelingAgentSystem`` for the currently selected problem.

    On success the agent is stored in ``st.session_state.modeling_agent``,
    ``agent_initialized`` is set and the displayed problem title/details are
    refreshed. If the API settings are incomplete, or construction raises,
    an error is shown and both session values are cleared.
    """
    state = st.session_state

    if not (state.api_base_url and state.api_key and state.api_model):
        st.error("API Base URL, API Key, and Model Name must be configured before initializing the agent.")
        state.modeling_agent = None
        state.agent_initialized = False
        return

    use_predefined = state.problem_input_method == "Select Predefined"
    rounds = state.critique_rounds

    agent_config = {
        'top_method_num': 6,
        'problem_analysis_round': rounds,
        'problem_modeling_round': rounds,
        'task_formulas_round': rounds,
        'tasknum': 1,
        'chart_num': 1,
        'model_name': state.api_model,
        "method_name": "MAPP-Streamlit"
    }

    if use_predefined:
        # Predefined problems ship as JSON files and carry no custom dataset.
        problem_key = state.selected_mcm_problem
        problem_path = f'../data/actor_data/input/problem/{problem_key}.json'
        print(problem_path)
        dataset_path = ""
    else:
        # Custom problems get a per-session working directory.
        session_dir = Path('data') / state.session_id
        data_dir = session_dir / 'data'
        for folder in (session_dir, data_dir, session_dir / 'output'):
            folder.mkdir(parents=True, exist_ok=True)

        custom_problem = {
            "title": "Custom Problem",
            "background": state.custom_background,
            "problem_requirement": state.custom_requirements,
            "dataset_path": [str(info['path']) for info in state.uploaded_files],
            "dataset_description": {},
            "variable_description": {},
            "addendum": ""
        }
        problem_path = session_dir / 'problem.json'
        with open(problem_path, 'w') as f:
            json.dump(custom_problem, f, indent=2)
        dataset_path = str(data_dir)

    output_path = str(Path('data') / state.session_id / 'output')
    os.makedirs(output_path, exist_ok=True)

    try:
        state.modeling_agent = ModelingAgentSystem(
            problem_path=str(problem_path),  # may be a Path in the custom branch
            config=agent_config,
            dataset_path=dataset_path,
            output_path=output_path,
            name=state.selected_mcm_problem if use_predefined else "custom"
        )
        # The API details are applied through the LLM's reset method.
        state.modeling_agent.llm.reset(
            api_key=state.api_key,
            api_base=state.api_base_url,
            model_name=state.api_model
        )
        state.agent_initialized = True

        # Refresh the problem title and details shown in the UI.
        if use_predefined:
            problem_key = state.selected_mcm_problem
            state.current_problem_title = problem_key
            state.current_problem_details = f"**Background:**\n{MCM_PROBLEMS[problem_key]['background']}\n\n**Requirements:**\n{MCM_PROBLEMS[problem_key]['problem_requirement']}"
        else:
            state.current_problem_title = "Custom Problem"
            state.current_problem_details = f"**Background:**\n{state.custom_background}\n\n**Requirements:**\n{state.custom_requirements}"
    except Exception as e:
        st.error(f"Failed to initialize modeling agent: {e}")
        state.modeling_agent = None
        state.agent_initialized = False
def update_api_settings():
    """Apply the API settings from session state to the running agent.

    Returns:
        True when the LLM was reset successfully; False when no agent is
        initialised, a setting is missing, or the reset raised.
    """
    state = st.session_state

    # Nothing to update before an agent exists; stay silent in that case.
    if not state.agent_initialized or state.modeling_agent is None:
        return False

    if not (state.api_base_url and state.api_key and state.api_model):
        st.error("Please provide valid API Base URL, API Key, and Model Name.")
        return False

    system = state.modeling_agent
    try:
        system.llm.reset(
            api_key=state.api_key,
            api_base=state.api_base_url,
            model_name=state.api_model
        )
        print(f'Reset LLM: {state.api_model}')
        system.config['model_name'] = state.api_model

        # Point every sub-agent that carries an `llm` attribute at the
        # system's LLM object.
        for attr_name in ('pa', 'pm', 'td', 'task', 'mr', 'chart', 'coordinator'):
            sub_agent = getattr(system, attr_name, None)
            if hasattr(sub_agent, 'llm'):
                sub_agent.llm = system.llm

        st.success("API settings updated successfully!")
        return True
    except Exception as e:
        st.error(f"Failed to update API settings: {e}")
        import traceback
        st.error(traceback.format_exc())
        return False
def get_navigatable_stages():
    """Return the keys of the stages the user is currently allowed to open.

    Returns an empty list when the agent is not initialised or there are no
    stages. With an agent, a stage is unlocked if it is completed or if all
    of its dependencies are completed. Without an agent object, unlocking is
    strictly sequential: completed stages plus the first non-completed one.
    """
    state = st.session_state
    if not state.agent_initialized or not state.stages:
        return []

    stages = state.stages
    agent = state.modeling_agent

    if agent:
        finished = agent.get_completed_steps()
        # NOTE(review): this calls the agent's private `_define_dependencies`.
        prerequisites = agent._define_dependencies()
        unlocked = []
        for stage_key in stages:
            if stage_key in finished or all(
                dep in finished for dep in prerequisites.get(stage_key, [])
            ):
                unlocked.append(stage_key)
        return unlocked

    # NOTE(review): only reached when `agent_initialized` is truthy while
    # `modeling_agent` is falsy, because of the early return above.
    stage_keys = list(stages.keys())
    unlocked = [k for k in stage_keys if stages[k]['status'] == 'completed']

    # The first stage that is not completed is the next one to work on.
    upcoming = next((k for k in stage_keys if stages[k]['status'] != 'completed'), None)
    if upcoming:
        unlocked.append(upcoming)

    # Always keep the first stage reachable.
    if not unlocked and stage_keys:
        unlocked.append(stage_keys[0])
    return unlocked
def get_stage_status_icon(status):
    """Map a stage status to its sidebar icon.

    ``"completed"`` and ``"in_progress"`` have dedicated icons; every other
    value (normally ``"not_started"``) gets the neutral document icon.
    """
    if status == "completed":
        return "✅"
    if status == "in_progress":
        return "⏳"
    # Not started, but accessible: neutral icon.
    return "📄"
def get_stage_display_status(key, navigatable_stages):
    """Return the stored status of stage ``key``, or ``"locked"``.

    A stage is reported as locked when it is unknown to the session or when
    it is not among ``navigatable_stages``.
    """
    stages = st.session_state.stages
    if key in stages and key in navigatable_stages:
        return stages[key]["status"]
    return "locked"
def generate_markdown_export():
    """Assemble the whole report as one Markdown string.

    With an initialised agent the document is built from the agent's paper
    dictionary (problem-level sections, dependency analysis, then each
    non-empty task). Otherwise the content of every session stage is
    concatenated, including stages that were never started.
    """
    state = st.session_state

    if not (state.agent_initialized and state.modeling_agent):
        chunks = [
            f"# {state.current_problem_title}\n\n",
            f"## Problem Description\n\n{state.current_problem_details}\n\n---\n\n",
        ]
        for stage in state.stages.values():
            chunks.append(stage["content"] + "\n\n---\n\n")
        return "".join(chunks)

    paper = state.modeling_agent.get_paper()
    # NOTE(review): this dumps the paper to `paper.json` in the current
    # working directory on every call.
    with open('paper.json', 'w') as f:
        json.dump(paper, f, indent=2)

    chunks = [
        f"# {state.current_problem_title}\n\n",
        f"## Problem Description\n\n{state.current_problem_details}\n\n---\n\n",
    ]

    # Problem-level sections, emitted only when present in the paper.
    for paper_key, heading in (
        ('problem_analysis', 'Problem Analysis'),
        ('high_level_modeling', 'High-Level Modeling'),
        ('task_decomposition_summary', 'Task Decomposition'),
    ):
        if paper_key in paper:
            chunks.append(f"# {heading}\n\n{paper[paper_key]}\n\n---\n\n")

    if 'task_dependency_analysis' in paper:
        dependency = paper['task_dependency_analysis']
        chunks.append("# Task Dependency Analysis\n\n")
        if isinstance(dependency, list):
            # One subsection per task, numbered from 1.
            chunks.extend(
                f"## Task {n} Dependencies\n{item}\n\n"
                for n, item in enumerate(dependency, start=1)
            )
        else:
            chunks.append(f"{dependency}\n\n")
        chunks.append("---\n\n")

    # Task components appear in this fixed order.
    section_order = (
        'task_description',
        'task_analysis',
        'preliminary_formulas',
        'mathematical_modeling_process',
        'solution_interpretation',
        'subtask_outcome_analysis',
        'charts',
    )
    for task_id, task in enumerate(paper.get('tasks', []), start=1):
        if not task:
            continue  # empty task dictionaries are left out
        chunks.append(f"# Task {task_id}\n\n")
        for component in section_order:
            if component not in task:
                continue
            heading = component.replace('_', ' ').title()
            body = task[component]
            if component == 'charts' and isinstance(body, list):
                chunks.append(f"## {heading}\n\n")
                chunks.extend(
                    f"### Chart {n}\n{chart}\n\n"
                    for n, chart in enumerate(body, start=1)
                )
            else:
                chunks.append(f"## {heading}\n\n{body}\n\n")
        chunks.append("---\n\n")

    return "".join(chunks)
# Instead of static predefinition, we'll create a function to build the chapter structure dynamically
def get_chapter_structure():
    """Return an ordered mapping of step name -> ``{"title", "prompt_hint"}``.

    With an initialised agent the mapping follows the agent's planned steps;
    otherwise the static ``config['stages']`` definition is returned.
    """
    state = st.session_state
    if not (state.agent_initialized and state.modeling_agent):
        return OrderedDict(config['stages'])

    structure = OrderedDict()
    for step_name in state.modeling_agent.get_planned_steps():
        # Generic hint first; task steps that parse cleanly get a more
        # specific one.
        hint = f"Complete the {step_name} section. {stages_config.get(step_name, '')}"
        if GenerationStep.is_task_step(step_name):
            task_id, step_type = GenerationStep.parse_task_step(step_name)
            if task_id and step_type:
                hint = f"Complete the {step_type} for Task {task_id}."
        structure[step_name] = {"title": step_name, "prompt_hint": hint}
    return structure
def sync_stages_with_agent():
    """Bring the session's stages in line with the modeling agent's state.

    Adds stages for planned steps that are not known yet, marks completed
    steps, and refreshes stage content from the agent's paper dictionary.
    Does nothing when no agent is initialised.
    """
    state = st.session_state
    if not state.agent_initialized or not state.modeling_agent:
        return

    agent = state.modeling_agent
    completed_steps = agent.get_completed_steps()
    planned_steps = agent.get_planned_steps()
    paper = agent.get_paper()
    chapter_structure = get_chapter_structure()
    stages = state.stages

    # 1) Create an entry for every planned step that has no stage yet.
    for step_name in planned_steps:
        if step_name in stages:
            continue
        prompt_hint = chapter_structure.get(step_name, {}).get('prompt_hint', f"Complete the {step_name} section. {stages_config.get(step_name, '')}")
        stages[step_name] = {
            "title": step_name,
            "content": f"# {step_name}\n\n*(Content not generated yet. Use the '✨ Generate Content' button and '✏️ Edit Content' after generation.)*",
            "status": "completed" if step_name in completed_steps else "not_started",
            "edit_mode": False,
            "prompt_hint": prompt_hint
        }

    # 2) Promote every stage the agent reports as completed.
    for step_name, stage in stages.items():
        if step_name in completed_steps:
            stage["status"] = "completed"

    # 3) Problem-level sections whose heading equals the stage name.
    for paper_key, stage_name in (
        ('problem_background', 'Problem Background'),
        ('problem_requirement', 'Problem Requirement'),
        ('problem_analysis', 'Problem Analysis'),
        ('high_level_modeling', 'High-Level Modeling'),
        ('task_decomposition_summary', 'Task Decomposition'),
    ):
        if paper_key in paper and stage_name in stages:
            stages[stage_name]['content'] = f"# {stage_name}\n\n{paper[paper_key]}"

    # 4) Dependency analysis: a list becomes one subsection per task.
    if 'task_dependency_analysis' in paper and 'Dependency Analysis' in stages:
        dependency = paper['task_dependency_analysis']
        if isinstance(dependency, list):
            body = "".join(
                f"## Task {n} Dependencies\n{item}\n\n"
                for n, item in enumerate(dependency, start=1)
            )
        else:
            body = str(dependency)
        stages['Dependency Analysis']['content'] = "# Task Dependency Analysis\n\n" + body

    # 5) Per-task components; task ids shown to the user are 1-based.
    if 'tasks' not in paper:
        return
    component_suffixes = (
        ('task_description', 'Description'),
        ('task_analysis', 'Analysis'),
        ('preliminary_formulas', 'Preliminary Formulas'),
        ('mathematical_modeling_process', 'Mathematical Modeling Process'),
        ('task_code', 'Code'),
        ('solution_interpretation', 'Solution Interpretation'),
        ('subtask_outcome_analysis', 'Subtask Outcome Analysis'),
        ('charts', 'Charts'),
    )
    for task_id, task_dict in enumerate(paper['tasks'], start=1):
        if not task_dict:
            continue  # nothing generated for this task yet
        for component, suffix in component_suffixes:
            step_name = f'Task {task_id} {suffix}'
            if component not in task_dict or step_name not in stages:
                continue
            content = task_dict[component]
            if component == 'charts':
                # Charts get their own heading and one subsection per chart.
                formatted = f"# Charts for Task {task_id}\n\n"
                if isinstance(content, list):
                    formatted += "".join(
                        f"## Chart {n}\n{chart}\n\n"
                        for n, chart in enumerate(content, start=1)
                    )
                else:
                    formatted += str(content)
            else:
                formatted = f"# {step_name}\n\n{content}"
            stages[step_name]['content'] = formatted
def _handle_content_edit(active_stage_key, new_content):
    """Write user-edited stage content back into the ModelingAgentSystem.

    The edited text is stored in ``st.session_state.stages`` and, with the
    stage's Markdown heading removed, in the matching entry of the agent's
    ``paper`` dictionary. For task steps other than Charts the coordinator's
    memory is updated as well. Does nothing when no agent is initialised.

    Args:
        active_stage_key: Name of the edited stage, e.g. ``"Problem Analysis"``
            or ``"Task 2 Analysis"``.
        new_content: Full Markdown text of the stage after editing.
    """
    if not st.session_state.agent_initialized or not st.session_state.modeling_agent:
        return

    # Keep the session copy of the content current.
    st.session_state.stages[active_stage_key]['content'] = new_content

    agent = st.session_state.modeling_agent
    paper = agent.paper

    # Problem-level stages: paper key and the heading the stage is shown with.
    global_sections = {
        'Problem Analysis': ('problem_analysis', '# Problem Analysis\n\n'),
        'High-Level Modeling': ('high_level_modeling', '# High-Level Modeling\n\n'),
        'Task Decomposition': ('task_decomposition_summary', '# Task Decomposition\n\n'),
        # May need special handling later: this content can be structured.
        'Dependency Analysis': ('task_dependency_analysis', '# Task Dependency Analysis\n\n'),
    }
    if active_stage_key in global_sections:
        paper_key, heading = global_sections[active_stage_key]
        paper[paper_key] = new_content.replace(heading, '')
        return

    if not active_stage_key.startswith('Task '):
        return

    # Split "Task {N} {StepType}" into its id and step type.
    match = re.match(r"Task (\d+) (.*)", active_stage_key)
    if not match:
        return
    task_id = int(match.group(1))
    step_type = match.group(2)
    if task_id < 1:
        # Task ids are 1-based; id 0 would index tasks[-1] and silently
        # overwrite the last task.
        return
    task_index = task_id - 1

    # 'tasks' is optional elsewhere in this file, so create it on demand
    # instead of raising KeyError, then pad it up to the edited task.
    tasks = paper.setdefault('tasks', [])
    while len(tasks) <= task_index:
        tasks.append({})

    # Step type -> key inside the task's paper dictionary.
    step_to_key = {
        'Description': 'task_description',
        'Analysis': 'task_analysis',
        'Preliminary Formulas': 'preliminary_formulas',
        'Mathematical Modeling Process': 'mathematical_modeling_process',
        'Solution Interpretation': 'solution_interpretation',
        'Subtask Outcome Analysis': 'subtask_outcome_analysis',
        'Charts': 'charts'
    }
    if step_type not in step_to_key:
        return
    key = step_to_key[step_type]

    clean_content = new_content.replace(f'# {active_stage_key}\n\n', '')
    if step_type == 'Charts':
        # The Charts stage is rendered under "# Charts for Task {N}", not
        # under the stage name, so that heading has to be stripped too;
        # otherwise it piles up on every edit/sync round trip.
        clean_content = clean_content.replace(f'# Charts for Task {task_id}\n\n', '')
    tasks[task_index][key] = clean_content

    # Mirror the edit into the coordinator's memory where applicable.
    if step_type != 'Charts':  # Charts may use a special format
        agent.coordinator.memory.setdefault(str(task_id), {})[key] = clean_content
# --- Main App Logic ---
# Make sure every session key exists before any widget reads it.
initialize_session_state()
# Only show title and caption on the initial page
if not st.session_state.problem_defined:
    st.title(f"{APP_TITLE}")  # Added a bit of flair
    st.caption("An AI-assisted platform for structuring and drafting mathematical modeling reports.")
# --- Sidebar ---
# The sidebar holds four sections: API configuration, problem definition,
# stage navigation and report export.
with st.sidebar:
    st.header("⚙️ Configuration")
    # Which API fields are already filled in; the expander below starts
    # collapsed only when all three are present.
    api_base_provided = bool(st.session_state.api_base_url)
    api_key_provided = bool(st.session_state.api_key)
    api_model_provided = bool(st.session_state.api_model)
    with st.expander("LLM API Details", expanded=not (api_base_provided and api_key_provided and api_model_provided)):
        # Restore the last saved values (the `*_main` keys are written by the
        # Save button below) before the widgets are created.
        if 'api_base_url_main' in st.session_state:
            st.session_state.api_base_url = st.session_state.api_base_url_main
        if 'api_key_main' in st.session_state:
            st.session_state.api_key = st.session_state.api_key_main
        if 'api_model_main' in st.session_state:
            st.session_state.api_model = st.session_state.api_model_main
        st.text_input(
            "API Base URL",
            value=st.session_state.api_base_url,
            key="api_base_url",
            placeholder="e.g., https://api.openai.com/v1",
            help="Your OpenAI compatible API endpoint."
        )
        st.text_input(
            "API Key",
            value=st.session_state.api_key,
            key="api_key",
            type="password",
            help="Your OpenAI compatible API key. Can also be set via Streamlit secrets (API_KEY)."
        )
        st.text_input(
            "Model Name",
            value=st.session_state.api_model,
            key="api_model",
            placeholder="e.g., gpt-4-turbo",
            help="The specific model to use for generation."
        )
        # Saving copies the widget values into the `*_main` keys and pushes
        # them to an already running agent, if any.
        if st.button("Save", key="save_api_settings", type="secondary", use_container_width=True):
            st.session_state.api_base_url_main = st.session_state.api_base_url
            st.session_state.api_key_main = st.session_state.api_key
            st.session_state.api_model_main = st.session_state.api_model
            update_api_settings()
    st.divider()
    # Requirement 1: Put Problem Definition in a controllable expander
    st.header("🔍 Problem Definition")
    with st.expander("Problem Background & Requirements", expanded=st.session_state.problem_definition_expanded):
        # Check if API keys are provided before allowing problem definition
        api_configured = bool(st.session_state.api_base_url and st.session_state.api_key and st.session_state.api_model)
        if not api_configured:
            st.warning("Please provide valid API Base URL, API Key, and Model Name in the configuration above to define a problem.")
            # Don't st.stop() here, allow viewing config expander
        else:
            # NOTE(review): the return value is not used below; the code
            # reads st.session_state.problem_input_method instead.
            problem_input_method = st.radio(
                "Select Problem Source:",
                ["Select Predefined", "Input Custom"],
                key="problem_input_method",
                horizontal=True,
                # help="Choose a built-in MCM/ICM problem or define your own."
            )
            # Set to True by either "load" button in this run.
            confirm_problem = False
            if st.session_state.problem_input_method == "Select Predefined":
                st.selectbox(
                    "Choose MCM/ICM Problem:",
                    options=list(MCM_PROBLEMS.keys()),
                    format_func=lambda x: f"MCM_{x}",  # Shown as "MCM_<key>"
                    key="selected_mcm_problem"
                )
                if st.button("Load Problem", type="primary", key="load_predefined", use_container_width=True):
                    confirm_problem = True
                    problem_key = st.session_state.selected_mcm_problem
                    st.session_state.current_problem_title = problem_key  # MCM_PROBLEMS[problem_key]['title']
                    st.session_state.current_problem_details = f"**Background:**\n{MCM_PROBLEMS[problem_key]['background']}\n\n**Requirements:**\n{MCM_PROBLEMS[problem_key]['problem_requirement']}"
            else:  # Input Custom
                # Removing the custom title input field
                # st.text_input("Custom Problem Title:", key="custom_title", placeholder="Enter a short title for your problem")
                st.text_area("Problem Background:", key="custom_background", height=150, placeholder="Provide context and background information.")
                st.text_area("Problem Requirements:", key="custom_requirements", height=100, placeholder="Detail the specific tasks or questions to be addressed.")
                # Add file upload functionality
                st.subheader("Upload Data Files")
                uploaded_files = st.file_uploader(
                    "Upload CSV or Excel files for your problem (optional)",
                    type=["csv", "xlsx", "xls"],
                    accept_multiple_files=True,
                    help="Data files will be available for the modeling agent to use."
                )
                # Process uploaded files
                if uploaded_files and len(uploaded_files) > 0:
                    # Forget earlier uploads when the number of selected files
                    # changes (a same-count replacement is not detected here).
                    if "last_upload_count" not in st.session_state or st.session_state.last_upload_count != len(uploaded_files):
                        st.session_state.uploaded_files = []
                        st.session_state.last_upload_count = len(uploaded_files)
                    # Save uploaded files
                    for uploaded_file in uploaded_files:
                        # Files are de-duplicated by name only.
                        file_already_processed = any(info['name'] == uploaded_file.name for info in st.session_state.uploaded_files)
                        if not file_already_processed:
                            # Create directory for files if it doesn't exist
                            file_dir = Path('data') / st.session_state.session_id / 'data'
                            file_dir.mkdir(parents=True, exist_ok=True)
                            # Save file to disk
                            # NOTE(review): the client-supplied file name is
                            # joined into the path as-is.
                            file_path = file_dir / uploaded_file.name
                            with open(file_path, "wb") as f:
                                f.write(uploaded_file.getbuffer())
                            # Try to read the file to build a 5-row preview;
                            # on failure the preview is the error text.
                            preview = None
                            if uploaded_file.name.endswith(('.csv', '.xlsx', '.xls')):
                                try:
                                    if uploaded_file.name.endswith('.csv'):
                                        df = pd.read_csv(file_path)
                                    else:
                                        df = pd.read_excel(file_path)
                                    preview = df.head(5)
                                except Exception as e:
                                    preview = f"Error reading file: {str(e)}"
                            # Add file info to session state
                            st.session_state.uploaded_files.append({
                                'name': uploaded_file.name,
                                'path': str(file_path.absolute()),
                                'preview': preview
                            })
                # Display uploaded files
                if st.session_state.uploaded_files:
                    st.success(f"{len(st.session_state.uploaded_files)} file(s) uploaded successfully")
                    for file_info in st.session_state.uploaded_files:
                        # Replace nested expander with a container and bolded title
                        with st.container(border=True):
                            st.markdown(f"**📄 {file_info['name']}**")
                            if isinstance(file_info['preview'], pd.DataFrame):
                                st.dataframe(file_info['preview'])
                            else:
                                st.write(file_info['preview'])
                if st.button("Set Custom Problem", type="primary", key="load_custom", use_container_width=True):
                    # Both text fields are required for a custom problem.
                    if st.session_state.custom_background and st.session_state.custom_requirements:
                        confirm_problem = True
                        # Use "Custom Problem" as the default title instead of custom_title
                        st.session_state.current_problem_title = "Custom Problem"
                        st.session_state.current_problem_details = f"**Background:**\n{st.session_state.custom_background}\n\n**Requirements:**\n{st.session_state.custom_requirements}"
                    else:
                        st.warning("Please provide background and requirements for the custom problem.")
            # Handle problem confirmation and stage reset
            if confirm_problem:
                if st.session_state.problem_defined:  # If a problem was already defined, show info about reset
                    st.toast("Reloading problem: Existing stage content and progress will be reset.")
                    time.sleep(1)  # Give user time to see toast
                st.session_state.problem_defined = True
                st.session_state.problem_definition_expanded = False  # Requirement 1: Collapse expander
                reset_stages()
                st.rerun()  # Rerun to update sidebar navigation and main content area
    # --- Stage Navigation (Displayed only if a problem is defined) ---
    if st.session_state.problem_defined:
        st.divider()
        st.header("📚 Stages")
        navigatable_stages = get_navigatable_stages()
        # Ensure the current active stage is valid
        if st.session_state.active_stage not in navigatable_stages:
            # If current active is somehow locked (e.g., after loading a new problem),
            # default to the last navigatable one.
            if navigatable_stages:
                st.session_state.active_stage = navigatable_stages[-1]
            else:  # No navigatable stage at all: keep the active stage as it is
                # st.session_state.active_stage = list(st.session_state.stages.keys())[0]
                pass
        # Requirement 2 & 3: Use buttons for navigation, disable locked stages
        # Using a container to apply styles easier via CSS selector
        with st.container(border=False):
            st.markdown('<div data-testid="stSidebarNavItems">', unsafe_allow_html=True)  # Wrapper for CSS targeting
            for key, data in st.session_state.stages.items():
                stage_info = st.session_state.stages.get(key)
                if stage_info:
                    is_navigatable = key in navigatable_stages
                    is_active = key == st.session_state.active_stage
                    display_status = get_stage_display_status(key, navigatable_stages)
                    if display_status == "locked":
                        icon = "🔒"
                        label_markdown = f"{icon}  {stage_info['title']}"  # Add non-breaking spaces
                    else:
                        icon = get_stage_status_icon(display_status)
                        label_markdown = f"{icon}  {stage_info['title']}"
                    # NOTE(review): this HTML label is built but not used; the
                    # button below is given `label_markdown`.
                    button_label_html = f'<div style="display: flex; align-items: center;">{label_markdown}</div>'
                    # Set button type: primary if active, secondary otherwise
                    button_type = "primary" if is_active else "secondary"
                    if st.button(
                        label=label_markdown,  # Use markdown directly
                        key=f"nav_{key}",
                        disabled=not is_navigatable,
                        use_container_width=True,
                        type=button_type,
                        help=f"Status: {display_status.replace('_', ' ').title()}" if is_navigatable else "Complete previous stages to unlock"
                    ):
                        if is_navigatable and not is_active:
                            st.session_state.active_stage = key
                            # Turn off edit mode on the stage being switched to
                            if st.session_state.stages[key].get('edit_mode', False):
                                st.session_state.stages[key]['edit_mode'] = False
                            st.rerun()
            st.markdown('</div>', unsafe_allow_html=True)  # Close wrapper
        st.divider()
        st.header("📄 Solution Report")
        if st.session_state.stages:
            # The Markdown export is rebuilt on every run so the download
            # button always carries the current content.
            markdown_content = generate_markdown_export()
            st.download_button(
                label="📥 Export Intermediate Process (.md)",
                data=markdown_content,
                file_name=f"mapp_export_{st.session_state.current_problem_title.replace(' ', '_').lower()}.md",
                mime="text/markdown",
                use_container_width=True
            )
            # Check whether every stage has been completed
            all_completed = all(stage_data["status"] == "completed" for stage_data in st.session_state.stages.values())
            # Full-report export button (clickable only once all stages are completed)
            if st.button(
                "📊 Export Solution Report (.latex & .pdf)",
                disabled=not all_completed,
                use_container_width=True,
                help="Generate and download a complete LaTeX and PDF report (available after all stages are completed)"
            ):
                if st.session_state.agent_initialized and st.session_state.modeling_agent:
                    with st.spinner("Generating LaTeX and PDF solution report... This may take a few minutes."):
                        try:
                            # Output directory of this session
                            output_path = str(Path('data') / st.session_state.session_id / 'output')
                            # Ask the agent to generate the LaTeX and PDF files
                            st.session_state.modeling_agent.generate_paper(output_path)
                            # Remember the file paths in session state so the download
                            # buttons stay visible on later runs
                            st.session_state.latex_path = f'{output_path}/latex/solution.tex'
                            st.session_state.pdf_path = f'{output_path}/latex/solution.pdf'
                        except Exception as e:
                            st.error(f"Error generating report: {str(e)}")
                            import traceback
                            st.error(traceback.format_exc())
                    st.rerun()  # Refresh the page to show the download buttons
                else:
                    st.error("ModelingAgentSystem not initialized. Please check API configuration.")
            # Show a download button for each generated file whose path is stored
            # in session state and which exists on disk
            if hasattr(st.session_state, 'latex_path') and Path(st.session_state.latex_path).exists():
                with open(st.session_state.latex_path, "rb") as f:
                    st.download_button(
                        label="📥 Download LaTeX (.tex)",
                        data=f,
                        file_name="solution.tex",
                        mime="application/x-tex",
                        key="download_latex"  # unique widget key
                    )
            if hasattr(st.session_state, 'pdf_path') and Path(st.session_state.pdf_path).exists():
                with open(st.session_state.pdf_path, "rb") as f:
                    st.download_button(
                        label="📥 Download PDF Report",
                        data=f,
                        file_name="solution.pdf",
                        mime="application/pdf",
                        key="download_pdf"  # unique widget key
                    )
        else:
            st.info("Define a problem and generate content to enable export.")
# --- Main Content Area ---
# Shows a welcome/how-to message until a problem is defined; afterwards renders
# the active stage: title and goal, the agent generation controls, and the
# stage content in either edit mode or view mode with its action buttons.
if not st.session_state.problem_defined:
# Enhanced initial message
st.info("⬅️ Welcome to Mathematical Modeling Agent! Please configure your API details and define a modeling problem using the sidebar to begin.")
st.markdown("---")
st.subheader("How it works:")
st.markdown("""
1. **Configure API:** Enter your OpenAI compatible API details in the sidebar. These can also be set via Streamlit secrets (`API_BASE_URL`, `API_KEY`).
2. **Define Problem:** Choose a predefined problem or input your own custom problem description and requirements.
3. **Navigate Stages:** Use the sidebar to move through the standard sections of a modeling report. Stages unlock as you mark previous ones complete.
4. **Generate & Edit:** For each stage, you can:
* Use the **✨ Generate Content** button (with optional instructions) to get an initial draft from the AI (mock generation in this version).
* **✏️ Edit Content** after generation.
* Mark stages as **✅ Complete** to unlock the next one.
5. **Export:** Download your progress as a Markdown file at any time, or export your complete solution as both LaTeX and PDF files when finished.
""")
else:
# Look up the stage currently selected in the sidebar.
active_stage_key = st.session_state.active_stage
stage_data = st.session_state.stages[active_stage_key]
# Display Stage Title and Goal
st.header(f"{stage_data['title']}")
st.markdown(f"> **Goal:** *{stage_data['prompt_hint']}*")
st.divider() # Requirement 3: Use dividers for better separation
# --- AI Generation Controls ---
# Requirement 3: Optimize UI layout
with st.container(border=True): # Put generation controls in a bordered container
st.subheader("🚀 Agent Content Generation")
col1, col2 = st.columns([3, 1])
with col1:
# Optional free-text instructions; the widget key is per stage.
user_prompt = st.text_area(
"Instructions / Prompt Refinement:",
key=f"prompt_{active_stage_key}",
placeholder="Optional: Provide specific instructions, focus points, or data for the Agent to use in this stage.",
help="Guide the AI generation for this specific stage.",
height=100
)
with col2:
# The slider value (0-3) is written back to the shared `critique_rounds` entry.
st.session_state.critique_rounds = st.slider(
"Critic Rounds", 0, 3, st.session_state.critique_rounds,
help="Simulated self-critique iterations for the AI (0-3). More rounds might improve quality but take longer (mock only).",
key=f"critique_{active_stage_key}" # Unique key per stage
)
if st.button("✨ Generate Content", key=f"generate_{active_stage_key}", type="primary", use_container_width=True):
# Generation needs an initialized agent; otherwise only an error is shown.
if not st.session_state.agent_initialized or st.session_state.modeling_agent is None:
st.error("ModelingAgentSystem not initialized. Please check API configuration.")
else:
with st.spinner(f"🤖 Generating content for '{active_stage_key}'... Please wait."):
try:
# Update the critique rounds in case they changed
st.session_state.modeling_agent.config['problem_analysis_round'] = st.session_state.critique_rounds
st.session_state.modeling_agent.config['problem_modeling_round'] = st.session_state.critique_rounds
st.session_state.modeling_agent.config['task_formulas_round'] = st.session_state.critique_rounds
# Call the agent's generate_step method
# NOTE(review): the print below looks like leftover debug output
# (writes the prompt and round count to stdout) — confirm it is wanted.
if user_prompt:
print(user_prompt, st.session_state.critique_rounds)
success = st.session_state.modeling_agent.generate_step(active_stage_key, user_prompt=user_prompt, round=st.session_state.critique_rounds)
if success:
# Sync stages with the updated agent state
sync_stages_with_agent()
# Update the stage status
if st.session_state.stages[active_stage_key]['status'] == 'not_started':
st.session_state.stages[active_stage_key]['status'] = 'in_progress'
st.success(f"Successfully generated content for '{active_stage_key}'!")
# Check if we should advance to next stage
# (the stage is marked completed when the agent lists it among its completed steps)
completed_steps = st.session_state.modeling_agent.get_completed_steps()
if active_stage_key in completed_steps:
st.session_state.stages[active_stage_key]['status'] = 'completed'
else:
st.error(f"Could not generate content for '{active_stage_key}'. Check dependencies or retry.")
except Exception as e:
# Any failure during generation is reported with its traceback.
st.error(f"Error generating content: {str(e)}")
import traceback
st.error(traceback.format_exc())
# NOTE(review): st.rerun() restarts the script right away, so the
# st.success / st.error messages emitted above are likely never
# visible to the user — confirm, and consider st.toast or a message
# kept in session state.
st.rerun() # Update display immediately
st.divider() # Requirement 3: Use dividers
# --- Content Display and Editing ---
# st.subheader("Stage Content")
# Snapshot of the active stage's flags and content used by the branches below.
edit_mode = st.session_state.stages[active_stage_key].get('edit_mode', False)
content = st.session_state.stages[active_stage_key]['content']
is_completed = st.session_state.stages[active_stage_key]['status'] == 'completed'
if edit_mode:
# --- Edit Mode ---
st.info("✍️ You are in Edit Mode. Use Markdown syntax. Save or Cancel when done.")
new_content = st.text_area(
"Edit Content (Markdown Supported)",
value=content,
key=f"editor_{active_stage_key}",
height=500, # Increased height for editing
label_visibility="collapsed"
)
col_save, col_cancel, _ = st.columns([1, 1, 4]) # Keep button layout simple
with col_save:
if st.button("💾 Save Changes", key=f"save_{active_stage_key}", type="primary"):
# Hand the edited text to _handle_content_edit (defined elsewhere in this file), then leave edit mode.
_handle_content_edit(active_stage_key, new_content)
st.session_state.stages[active_stage_key]['edit_mode'] = False
# If content is saved and stage was 'not_started', move to 'in_progress'
if st.session_state.stages[active_stage_key]['status'] == 'not_started':
st.session_state.stages[active_stage_key]['status'] = 'in_progress'
st.toast("Changes saved!", icon="💾")
st.rerun()
with col_cancel:
# Cancel leaves edit mode without calling _handle_content_edit.
if st.button("❌ Cancel Edit", key=f"cancel_{active_stage_key}"):
st.session_state.stages[active_stage_key]['edit_mode'] = False
st.rerun()
else:
# --- View Mode ---
with st.container(border=True): # Put content in a bordered container for visual grouping
# NOTE(review): unsafe_allow_html=True renders raw HTML contained in the
# stage content (agent-generated or user-edited) — confirm this is acceptable.
st.markdown(content, unsafe_allow_html=True) # Render the markdown content
# st.markdown("---") # Separator before action buttons
# Action buttons layout - Requirement 3: Optimize UI layout
cols = st.columns(3)
with cols[0]:
# NOTE(review): this is the view-mode branch, where edit_mode is presumably
# always False, so `disabled=edit_mode` would never disable the button — verify.
if st.button("✏️ Edit Content", key=f"edit_{active_stage_key}", use_container_width=True, disabled=edit_mode):
st.session_state.stages[active_stage_key]['edit_mode'] = True
st.rerun()
with cols[1]:
# Allow marking as complete only if not already completed
if not is_completed:
if st.button("✅ Mark as Complete", key=f"complete_{active_stage_key}", use_container_width=True):
st.session_state.stages[active_stage_key]['status'] = 'completed'
st.toast(f"Stage '{stage_data['title']}' marked complete!", icon="✅")
# Try to advance to the next stage automatically
# NOTE(review): the advance logic below is repeated in the
# "Completed (Click to advance)" branch further down — candidate for a shared helper.
stage_keys = list(st.session_state.stages.keys())
current_index = stage_keys.index(active_stage_key)
if current_index + 1 < len(stage_keys):
next_stage_key = stage_keys[current_index + 1]
# Unlock the next stage by setting its status to in_progress if not started
if st.session_state.stages[next_stage_key]['status'] == 'not_started':
st.session_state.stages[next_stage_key]['status'] = 'in_progress'
st.session_state.active_stage = next_stage_key # Move focus
else:
st.success("🎉 All stages completed!") # Optional: Message when last stage is done
# NOTE(review): if this rerun also follows the st.success above, that message
# will likely not stay visible — confirm.
st.rerun() # Rerun to update sidebar icons/state and main view
else:
# If already completed, show a clickable button that advances to the next stage
if st.button("✅ Completed (Click to advance)", key=f"completed_{active_stage_key}", use_container_width=True):
# Try to advance to the next stage
stage_keys = list(st.session_state.stages.keys())
current_index = stage_keys.index(active_stage_key)
if current_index + 1 < len(stage_keys):
next_stage_key = stage_keys[current_index + 1]
# If the next stage has not started, mark it as in progress
if st.session_state.stages[next_stage_key]['status'] == 'not_started':
st.session_state.stages[next_stage_key]['status'] = 'in_progress'
st.session_state.active_stage = next_stage_key # Move focus
st.rerun() # Rerun to update sidebar icons/state and main view
else:
st.success("🎉 All stages completed!") # Optional: message shown when the last stage is done
# Placeholder for potential future actions in the third column
# with cols[2]:
# st.button("Other Action?", use_container_width=True)
# --- Footer ---
# A horizontal rule followed by a small caption naming the app.
st.markdown("---")
st.caption("Mathematical Modeling Multi-Agent System | Prototype")
# Helper for re-synchronizing the UI stages with the agent after important operations
def on_page_load():
    """Synchronize the UI stages with the agent state when the page loads.

    Does nothing unless the agent is flagged as initialized and a modeling
    agent object is present in the session state.
    """
    session = st.session_state
    if not (session.agent_initialized and session.modeling_agent):
        return
    sync_stages_with_agent()
# Intended to be called at the start of app.py's main loop: sync with the agent
# once a problem has been defined and the agent has been initialized.
# NOTE(review): these statements come after the footer, i.e. at the end of the
# script run rather than at its start — confirm the placement is intended.
if st.session_state.problem_defined and st.session_state.agent_initialized:
on_page_load()