| |
| from __future__ import annotations |
| import os |
| import sys |
| import json |
| import shutil |
| import subprocess |
| import platform |
| import getpass |
| import datetime |
| from typing import Dict, Any, Optional, Sequence |
|
|
| |
| try: |
| import psutil |
| except Exception: |
| psutil = None |
|
|
|
|
| def get_env_vars() -> Dict[str, str]: |
| """Return all environment variables as a dict.""" |
| |
| return dict(os.environ) |
|
|
|
|
| def _shorten_text(s: str, max_lines: int = 5, max_chars: int = 2000) -> str: |
| """Trim text to the given number of lines and characters to keep output small.""" |
| out = "\n".join(s.splitlines()[:max_lines]) |
| if len(out) > max_chars: |
| out = out[: max_chars - 3] + "..." |
| return out |
|
|
|
|
| def safe_run_version(cmd: Sequence[str], timeout: float = 3.0) -> Dict[str, Any]: |
| """Try to run `cmd` and capture stdout/stderr (combined).""" |
| try: |
| p = subprocess.run( |
| list(cmd), |
| stdout=subprocess.PIPE, |
| stderr=subprocess.STDOUT, |
| timeout=timeout, |
| check=False, |
| ) |
| out = p.stdout.decode(errors="replace").strip() |
| short_out = _shorten_text(out) |
| return {"ok": True, "rc": p.returncode, "output": short_out} |
| except FileNotFoundError: |
| return {"ok": False, "error": "not found"} |
| except subprocess.TimeoutExpired: |
| return {"ok": False, "error": "timeout"} |
| except Exception as e: |
| return {"ok": False, "error": str(e)} |
|
|
|
|
| |
| TOOL_VERSION_FLAGS = ("--version", "-V", "-v", "--help") |
|
|
|
|
| def check_tools(tools: Sequence[str]) -> Dict[str, Dict[str, Any]]: |
| """Check whether tools are callable (in PATH) and try to get version/help output.""" |
| results: Dict[str, Dict[str, Any]] = {} |
| for t in tools: |
| found_path: Optional[str] = shutil.which(t) |
| info: Dict[str, Any] = {"path": found_path, "callable": bool(found_path)} |
| |
| cmd_base = found_path or t |
| |
| for flag in TOOL_VERSION_FLAGS: |
| res = safe_run_version([cmd_base, flag]) |
| if res.get("ok") and res.get("output"): |
| info["version_cmd"] = flag |
| info["version_output"] = res.get("output") |
| break |
| else: |
| |
| res = safe_run_version([cmd_base]) |
| if res.get("ok") and res.get("output"): |
| info["version_cmd"] = None |
| info["version_output"] = res.get("output") |
| else: |
| |
| info["version_error"] = res.get("error") |
| |
| if res.get("error") == "not found": |
| info["callable"] = False |
| results[t] = info |
| return results |
|
|
|
|
| def _safe_get_user() -> Optional[str]: |
| try: |
| return getpass.getuser() |
| except Exception: |
| |
| return os.environ.get("USER") or os.environ.get("USERNAME") |
|
|
|
|
| def get_process_info() -> Dict[str, Any]: |
| """Collect basic process information. If psutil is available, include richer info.""" |
| info: Dict[str, Any] = {} |
| info["platform"] = platform.system() |
| info["platform_release"] = platform.release() |
| info["platform_version"] = platform.version() |
| info["machine"] = platform.machine() |
| info["processor"] = platform.processor() |
| info["python_version"] = platform.python_version() |
| info["python_executable"] = sys.executable |
| info["argv"] = sys.argv |
| try: |
| info["cwd"] = os.getcwd() |
| except Exception: |
| info["cwd"] = None |
| info["pid"] = os.getpid() |
| try: |
| info["ppid"] = os.getppid() |
| except Exception: |
| info["ppid"] = None |
| info["user"] = _safe_get_user() |
|
|
| |
| if hasattr(os, "getuid"): |
| try: |
| info["uid"] = os.getuid() |
| info["gid"] = os.getgid() |
| if hasattr(os, "geteuid"): |
| info["euid"] = os.geteuid() |
| if hasattr(os, "getegid"): |
| info["egid"] = os.getegid() |
| except Exception: |
| pass |
|
|
| |
| if psutil is not None: |
| try: |
| p = psutil.Process(info["pid"]) |
| with p.oneshot(): |
| try: |
| info["exe"] = p.exe() |
| except Exception: |
| info["exe"] = sys.executable |
| try: |
| info["cmdline"] = p.cmdline() |
| except Exception: |
| info["cmdline"] = sys.argv |
| try: |
| ct = p.create_time() |
| info["create_time"] = datetime.datetime.fromtimestamp(ct).isoformat() |
| except Exception: |
| info["create_time"] = None |
| try: |
| info["cpu_percent"] = p.cpu_percent(interval=0.1) |
| except Exception: |
| info["cpu_percent"] = None |
| try: |
| mem = p.memory_info() |
| info["memory_rss"] = getattr(mem, "rss", None) |
| info["memory_vms"] = getattr(mem, "vms", None) |
| except Exception: |
| info["memory_rss"] = None |
| info["memory_vms"] = None |
| try: |
| info["num_threads"] = p.num_threads() |
| except Exception: |
| info["num_threads"] = None |
| try: |
| info["open_files"] = [f.path for f in p.open_files()] |
| except Exception: |
| info["open_files"] = None |
| |
| try: |
| |
| conns = getattr(p, "connections", None) |
| if callable(conns): |
| conns_list = p.connections() |
| else: |
| net_conns = getattr(p, "net_connections", None) |
| if callable(net_conns): |
| conns_list = p.net_connections() |
| else: |
| conns_list = None |
| if conns_list is None: |
| info["connections"] = None |
| else: |
| info["connections"] = [str(c) for c in conns_list] |
| except Exception: |
| info["connections"] = None |
| except Exception: |
| |
| pass |
| else: |
| |
| info["exe"] = sys.executable |
| info["cmdline"] = sys.argv |
| info["create_time"] = None |
| return info |
|
|
|
|
| def main() -> int: |
| try: |
| env = get_env_vars() |
| process = get_process_info() |
| tools = check_tools(["curl", "wget"]) |
| result = {"env": env, "process": process, "tools": tools} |
| |
| out = json.dumps(result, indent=2, ensure_ascii=False, sort_keys=False) |
| print(out, flush=True) |
| return 0 |
| except Exception as e: |
| |
| try: |
| print(json.dumps({"error": str(e)}), flush=True) |
| except Exception: |
| print(f"error: {e}", flush=True) |
| return 2 |
|
|
|
|
| main() |