import contextlib
import io
import multiprocessing
import os
import queue
import shutil
import signal
import subprocess
import sys
import tempfile
import time
|
|
| |
class TimeoutException(Exception):
    """Exception type raised by ``timeout_handler``."""
|
|
def timeout_handler(signum, frame):
    """Unconditionally raise ``TimeoutException``.

    The ``(signum, frame)`` signature is the one ``signal.signal`` handlers
    receive; both arguments are ignored.
    NOTE(review): no registration of this handler is visible in this file.
    """
    raise TimeoutException()
|
|
| def _exec_code(code_str, test_code, entry_point, result_queue): |
| """ |
| 运行生成的代码 + 测试用例 |
| """ |
| capture = io.StringIO() |
| success = False |
| error_msg = "" |
| |
| try: |
| |
| |
| |
| |
| with contextlib.redirect_stdout(capture), contextlib.redirect_stderr(capture): |
| |
| scope = {} |
| |
| |
| exec(code_str, scope) |
| |
| |
| if entry_point not in scope: |
| raise ValueError(f"Entry point {entry_point} not found in generated code.") |
| |
| |
| |
| |
| full_test_script = code_str + "\n" + test_code + f"\ncheck({entry_point})" |
| |
| exec(full_test_script, scope) |
| |
| success = True |
| |
| except Exception as e: |
| error_msg = str(e) |
| |
| |
| |
| result_queue.put((success, error_msg)) |
|
|
class SafeSandbox:
    """Run Python candidate code and its tests in a child process with a time limit."""

    # Seconds to wait on the result queue between worker liveness checks.
    _POLL_INTERVAL = 0.05

    def __init__(self, timeout=5.0):
        """Store the time limit.

        Args:
            timeout: Seconds the worker may run; ``None`` waits indefinitely.
        """
        self.timeout = timeout

    def run(self, code, test_code, entry_point):
        """Execute ``code`` and ``test_code`` via ``_exec_code`` in a subprocess.

        Args:
            code: Source of the generated solution.
            test_code: Source of the test harness.
            entry_point: Name of the function under test.

        Returns:
            The ``(success, error_msg)`` tuple posted by the worker, or
            ``(False, "Timeout")`` if the time limit elapsed first, or
            ``(False, "Unknown Error")`` if the worker exited without
            posting a result.
        """
        results = multiprocessing.Queue()
        worker = multiprocessing.Process(
            target=_exec_code, args=(code, test_code, entry_point, results)
        )
        worker.start()
        deadline = None if self.timeout is None else time.monotonic() + self.timeout

        try:
            # Read the queue *before* joining: a child that has put a large
            # payload cannot exit until the parent drains it, so join-first
            # would misreport such a run as a timeout.
            while True:
                remaining = (
                    float("inf") if deadline is None else deadline - time.monotonic()
                )
                # Sample liveness before the read. If the worker was already
                # dead, everything it wrote is in the pipe, so an Empty below
                # is conclusive rather than a race.
                was_alive = worker.is_alive()
                try:
                    return results.get(
                        timeout=max(0.0, min(remaining, self._POLL_INTERVAL))
                    )
                except queue.Empty:
                    if not was_alive:
                        return False, "Unknown Error"
                    if remaining <= 0:
                        return False, "Timeout"
        finally:
            self._reap(worker)
            results.close()

    @staticmethod
    def _reap(worker):
        """Ensure the worker process has exited, escalating to SIGKILL if needed."""
        if worker.is_alive():
            worker.terminate()
            worker.join(1.0)
        if worker.is_alive():
            # The candidate may have blocked or ignored SIGTERM.
            worker.kill()
        worker.join()
|
|
class JavaSandbox:
    """Compile and run Java candidate code with the ``javac`` / ``java`` tools."""

    def __init__(self, timeout=5.0):
        """Store the run time limit and verify the JDK tools are on PATH.

        Args:
            timeout: Seconds the compiled program may run.

        Raises:
            RuntimeError: If ``javac`` or ``java`` cannot be found.
        """
        self.timeout = timeout

        if shutil.which("javac") is None or shutil.which("java") is None:
            raise RuntimeError("Java environment (jdk) not found. Please install java.")

    def run(self, code, test_code, entry_point):
        """Compile ``code`` and ``test_code`` as one class and execute it.

        Both snippets are spliced into the body of ``public class Solution``,
        written to ``Solution.java`` in a temporary directory, compiled
        (10 second limit) and run (``self.timeout`` limit).

        Args:
            code: Java method source for the repaired solution.
            test_code: Java source providing the ``main`` method that
                exercises the solution; exit status 0 means success.
            entry_point: Method name. Unused here; the test code calls the
                method directly.

        Returns:
            ``(True, stdout)`` when the program exits with status 0,
            otherwise ``(False, message)`` describing the compilation error,
            runtime error, or timeout.
        """
        with tempfile.TemporaryDirectory() as temp_dir:
            file_path = os.path.join(temp_dir, "Solution.java")

            full_source = f"""
public class Solution {{
    {code}

    {test_code}
}}
"""

            # Write and compile as UTF-8 explicitly so non-ASCII source does
            # not depend on the platform's default encoding.
            with open(file_path, "w", encoding="utf-8") as f:
                f.write(full_source)

            compile_cmd = ["javac", "-encoding", "UTF-8", file_path]
            try:
                subprocess.run(compile_cmd, check=True, capture_output=True, timeout=10)
            except subprocess.CalledProcessError as exc:
                return False, f"Compilation Error: {exc.stderr.decode(errors='replace')}"
            except subprocess.TimeoutExpired:
                return False, "Compilation Timeout"
            except OSError as exc:
                # javac could not be launched at all (e.g. removed from PATH).
                return False, f"Compilation Error: {exc}"

            # javac places Solution.class next to the source, so the
            # temporary directory doubles as the classpath.
            run_cmd = ["java", "-cp", temp_dir, "Solution"]
            try:
                result = subprocess.run(run_cmd, capture_output=True, timeout=self.timeout)
            except subprocess.TimeoutExpired:
                return False, "Runtime Timeout"
            except OSError as exc:
                return False, f"Runtime Error: {exc}"

            if result.returncode == 0:
                return True, result.stdout.decode(errors="replace")
            return False, f"Runtime Error: {result.stderr.decode(errors='replace')}"
|
|
| |
if __name__ == "__main__":
    # Smoke demo: run one correct and one buggy implementation of ``add``
    # through the Java sandbox using the same test ``main``.
    java_runner = JavaSandbox()

    # Shared harness: prints PASS when add(1, 1) == 2, otherwise exits with 1.
    test_pass = """
    public static void main(String[] args) {
        if (add(1, 1) == 2) {
            System.out.println("PASS");
        } else {
            System.exit(1);
        }
    }
    """

    code_pass = """
    public static int add(int a, int b) {
        return a + b;
    }
    """

    code_fail = """
    public static int add(int a, int b) {
        return a * b; // Bug
    }
    """

    demo_cases = (
        ("Test Pass:", code_pass),
        ("Test Fail:", code_fail),
    )
    for label, implementation in demo_cases:
        print(label, java_runner.run(implementation, test_pass, "add"))