|
|
|
|
|
""" |
|
|
BLUX-cA :: Unified CLI Entrypoint |
|
|
Consolidates all CLI functionality into a single entrypoint. |
|
|
""" |
|
|
|
|
|
from __future__ import annotations |
|
|
|
|
|
import argparse |
|
|
import hashlib |
|
|
import json |
|
|
import sys |
|
|
import traceback |
|
|
from datetime import datetime |
|
|
from pathlib import Path |
|
|
from typing import Dict, List, Optional, Any |
|
|
|
|
|
import typer |
|
|
|
|
|
ROOT_DIR = Path(__file__).resolve().parent |
|
|
for path in [ROOT_DIR, ROOT_DIR / "ca"]: |
|
|
path_str = str(path) |
|
|
if path_str not in sys.path: |
|
|
sys.path.insert(0, path_str) |
|
|
|
|
|
from ca.core.audit import AuditLog |
|
|
from ca.core.clarity_engine import ClarityEngine |
|
|
from ca.core.constitution import ConstitutionEngine |
|
|
from ca.core.discernment import DiscernmentCompass |
|
|
from ca.core.perception import PerceptionLayer |
|
|
from ca.core.reflection import ReflectionEngine |
|
|
from ca.adaptors.reg import RegistryValidator, RegistrationResult, Capability |
|
|
from ca.config import load_config |
|
|
from ca.evaluator.probe_runner import PROBE_SUITES, run_probe_evaluation |
|
|
|
|
|
|
|
|
def _hash_text(text: str) -> str: |
|
|
"""Generate SHA256 hash for text.""" |
|
|
return hashlib.sha256(text.encode("utf-8")).hexdigest() |
|
|
|
|
|
|
|
|
def _format_timestamp() -> str: |
|
|
"""Format current timestamp for display.""" |
|
|
return datetime.now().strftime("%Y-%m-%d %H:%M:%S") |
|
|
|
|
|
|
|
|
def clarity_interactive_mode(registration_key: Optional[str] = None) -> None: |
|
|
"""Run the interactive clarity engine demo with optional registration.""" |
|
|
engine = ClarityEngine() |
|
|
user_state_token: Optional[Dict] = None |
|
|
validator = RegistryValidator() |
|
|
|
|
|
|
|
|
registration_status: Optional[RegistrationResult] = None |
|
|
if registration_key: |
|
|
registration_status = validator.validate(registration_key) |
|
|
if not registration_status.valid: |
|
|
print(f"Registration failed: {registration_status.reason}") |
|
|
print("Continuing with limited capabilities...") |
|
|
|
|
|
print("=" * 60) |
|
|
print("BLUX-cA :: Clarity Agent Interactive Mode") |
|
|
print(f"Session started: {_format_timestamp()}") |
|
|
|
|
|
if registration_status and registration_status.valid: |
|
|
print(f"Registration: {registration_status.key_type.upper()}") |
|
|
caps = [cap.value for cap in registration_status.capabilities] |
|
|
print(f"Capabilities: {len(caps)} available") |
|
|
else: |
|
|
print("Registration: Limited (unregistered mode)") |
|
|
|
|
|
print("\nCommands:") |
|
|
print(" 'exit' or 'quit' - End session") |
|
|
print(" 'debug' - Toggle debug mode") |
|
|
print(" 'state' - Show current user state") |
|
|
print(" 'capabilities' - List available capabilities") |
|
|
print(" 'help' - Show this help") |
|
|
print(" 'clear' - Clear screen (if supported)") |
|
|
print("-" * 40) |
|
|
|
|
|
debug_mode = False |
|
|
session_messages = [] |
|
|
|
|
|
while True: |
|
|
try: |
|
|
text = input("\n[cA] > ").strip() |
|
|
except (EOFError, KeyboardInterrupt): |
|
|
print("\n\nSession ended by user.") |
|
|
break |
|
|
|
|
|
|
|
|
if text.lower() in {"exit", "quit"}: |
|
|
print("Exiting interactive mode.") |
|
|
print(f"Session summary: {len(session_messages)} messages processed") |
|
|
break |
|
|
|
|
|
if text.lower() == "debug": |
|
|
debug_mode = not debug_mode |
|
|
print(f"Debug mode {'enabled' if debug_mode else 'disabled'}") |
|
|
continue |
|
|
|
|
|
if text.lower() == "state": |
|
|
if user_state_token: |
|
|
print("\n" + "=" * 40) |
|
|
print("Current User State Token:") |
|
|
for key, value in user_state_token.items(): |
|
|
print(f" {key}: {str(value)[:50]}...") |
|
|
print("=" * 40) |
|
|
else: |
|
|
print("No user state token available.") |
|
|
continue |
|
|
|
|
|
if text.lower() == "capabilities": |
|
|
if registration_status and registration_status.valid: |
|
|
print("\nAvailable Capabilities:") |
|
|
for cap in sorted(registration_status.capabilities, key=lambda c: c.value): |
|
|
print(f" • {cap.value}") |
|
|
else: |
|
|
print("No capabilities available in unregistered mode.") |
|
|
continue |
|
|
|
|
|
if text.lower() == "help": |
|
|
print("\nCommands:") |
|
|
print(" 'exit' or 'quit' - End session") |
|
|
print(" 'debug' - Toggle debug mode") |
|
|
print(" 'state' - Show current user state") |
|
|
print(" 'capabilities' - List available capabilities") |
|
|
print(" 'help' - Show this help") |
|
|
print(" 'clear' - Clear screen (if supported)") |
|
|
continue |
|
|
|
|
|
if text.lower() == "clear": |
|
|
print("\n" * 100) |
|
|
continue |
|
|
|
|
|
if not text: |
|
|
continue |
|
|
|
|
|
|
|
|
if registration_status and registration_status.valid: |
|
|
|
|
|
if registration_status.is_expired(): |
|
|
print("Registration has expired. Please renew your key.") |
|
|
continue |
|
|
|
|
|
|
|
|
|
|
|
shadow_keywords = {"shadow", "dark", "hidden", "repressed", "unconscious"} |
|
|
if any(keyword in text.lower() for keyword in shadow_keywords): |
|
|
if not registration_status.has_capability(Capability.SHADOW_CLARITY): |
|
|
print("Shadow clarity capability required for this query.") |
|
|
print("Please upgrade your registration for full access.") |
|
|
continue |
|
|
|
|
|
|
|
|
try: |
|
|
resp = engine.process(text, user_state_token=user_state_token) |
|
|
user_state_token = resp.user_state_token |
|
|
|
|
|
|
|
|
print(f"\n[{resp.intent.value if hasattr(resp.intent, 'value') else resp.intent}/" |
|
|
f"{resp.emotion.value if hasattr(resp.emotion, 'value') else resp.emotion}]") |
|
|
print(f"{resp.message}") |
|
|
|
|
|
|
|
|
session_messages.append({ |
|
|
"input": text, |
|
|
"response": resp.message, |
|
|
"intent": resp.intent, |
|
|
"emotion": resp.emotion, |
|
|
"timestamp": datetime.now().isoformat() |
|
|
}) |
|
|
|
|
|
|
|
|
if debug_mode: |
|
|
print("\n" + "=" * 40) |
|
|
print("DEBUG - Full Response Details:") |
|
|
print(f"Avatar: {resp.avatar}") |
|
|
print(f"Confidence: {resp.confidence if hasattr(resp, 'confidence') else 'N/A'}") |
|
|
|
|
|
if resp.user_state_token: |
|
|
print("\nUser State Token Keys:") |
|
|
for key in resp.user_state_token.keys(): |
|
|
print(f" - {key}") |
|
|
|
|
|
print("=" * 40) |
|
|
|
|
|
except Exception as e: |
|
|
print(f"\nError processing input: {str(e)}") |
|
|
if debug_mode: |
|
|
print("\nTraceback:") |
|
|
traceback.print_exc() |
|
|
|
|
|
|
|
|
def process_single_task(task: str, debug: bool = False, registration_key: Optional[str] = None) -> None: |
|
|
"""Process a single task through the clarity engine.""" |
|
|
engine = ClarityEngine() |
|
|
validator = RegistryValidator() |
|
|
|
|
|
|
|
|
registration_status: Optional[RegistrationResult] = None |
|
|
if registration_key: |
|
|
registration_status = validator.validate(registration_key) |
|
|
if not registration_status.valid: |
|
|
print(f"Registration failed: {registration_status.reason}") |
|
|
return |
|
|
|
|
|
try: |
|
|
resp = engine.process(task) |
|
|
|
|
|
print(f"[{datetime.now().strftime('%H:%M:%S')}]") |
|
|
print(f"Input: {task}") |
|
|
print(f"Response: {resp.message}") |
|
|
print(f"Intent: {resp.intent}") |
|
|
print(f"Emotion: {resp.emotion}") |
|
|
|
|
|
if debug: |
|
|
print("\n" + "-" * 40) |
|
|
print("Full response details:") |
|
|
debug_info = { |
|
|
"intent": resp.intent.value if hasattr(resp.intent, 'value') else resp.intent, |
|
|
"emotion": resp.emotion.value if hasattr(resp.emotion, 'value') else resp.emotion, |
|
|
"message": resp.message, |
|
|
"avatar": resp.avatar, |
|
|
} |
|
|
|
|
|
if hasattr(resp, 'confidence'): |
|
|
debug_info["confidence"] = resp.confidence |
|
|
|
|
|
if resp.user_state_token: |
|
|
debug_info["state_token_keys"] = list(resp.user_state_token.keys()) |
|
|
debug_info["state_token_sample"] = { |
|
|
k: str(v)[:100] + "..." if len(str(v)) > 100 else v |
|
|
for k, v in list(resp.user_state_token.items())[:3] |
|
|
} |
|
|
|
|
|
print(json.dumps(debug_info, indent=2)) |
|
|
|
|
|
if registration_status: |
|
|
print("\nRegistration Status:") |
|
|
print(json.dumps(registration_status.to_dict(), indent=2)) |
|
|
|
|
|
except Exception as e: |
|
|
print(f"Error processing task: {str(e)}") |
|
|
if debug: |
|
|
traceback.print_exc() |
|
|
|
|
|
|
|
|
def process_batch_file(batch_file: str, output_file: Optional[str] = None, |
|
|
registration_key: Optional[str] = None) -> None: |
|
|
"""Process a batch of tasks from a file.""" |
|
|
engine = ClarityEngine() |
|
|
validator = RegistryValidator() |
|
|
|
|
|
|
|
|
registration_status: Optional[RegistrationResult] = None |
|
|
if registration_key: |
|
|
registration_status = validator.validate(registration_key) |
|
|
if not registration_status.valid: |
|
|
print(f"Registration failed: {registration_status.reason}") |
|
|
return |
|
|
|
|
|
try: |
|
|
with open(batch_file, 'r', encoding='utf-8') as f: |
|
|
tasks = [line.strip() for line in f if line.strip()] |
|
|
except FileNotFoundError: |
|
|
print(f"Error: Batch file '{batch_file}' not found.") |
|
|
return |
|
|
except Exception as e: |
|
|
print(f"Error reading batch file: {str(e)}") |
|
|
return |
|
|
|
|
|
if not tasks: |
|
|
print("No tasks found in batch file.") |
|
|
return |
|
|
|
|
|
print(f"Processing {len(tasks)} tasks...") |
|
|
if registration_status: |
|
|
print(f"Registration: {registration_status.key_type.upper()}") |
|
|
|
|
|
results = [] |
|
|
successful = 0 |
|
|
failed = 0 |
|
|
|
|
|
for i, task in enumerate(tasks, 1): |
|
|
print(f"\rProcessing {i}/{len(tasks)}...", end="", flush=True) |
|
|
|
|
|
try: |
|
|
resp = engine.process(task) |
|
|
result = { |
|
|
"task": task, |
|
|
"response": resp.message, |
|
|
"intent": resp.intent.value if hasattr(resp.intent, 'value') else resp.intent, |
|
|
"emotion": resp.emotion.value if hasattr(resp.emotion, 'value') else resp.emotion, |
|
|
"success": True, |
|
|
"timestamp": datetime.now().isoformat() |
|
|
} |
|
|
successful += 1 |
|
|
except Exception as e: |
|
|
result = { |
|
|
"task": task, |
|
|
"response": f"Error: {str(e)}", |
|
|
"intent": "error", |
|
|
"emotion": "error", |
|
|
"success": False, |
|
|
"error": str(e), |
|
|
"timestamp": datetime.now().isoformat() |
|
|
} |
|
|
failed += 1 |
|
|
|
|
|
results.append(result) |
|
|
|
|
|
print(f"\n\nProcessing complete: {successful} successful, {failed} failed") |
|
|
|
|
|
|
|
|
if output_file: |
|
|
try: |
|
|
output_path = Path(output_file) |
|
|
output_data = { |
|
|
"metadata": { |
|
|
"source_file": batch_file, |
|
|
"processed_at": datetime.now().isoformat(), |
|
|
"total_tasks": len(tasks), |
|
|
"successful": successful, |
|
|
"failed": failed, |
|
|
"registration": registration_status.to_dict() if registration_status else None |
|
|
}, |
|
|
"results": results |
|
|
} |
|
|
|
|
|
with open(output_path, 'w', encoding='utf-8') as f: |
|
|
json.dump(output_data, f, indent=2, ensure_ascii=False) |
|
|
|
|
|
print(f"Results saved to {output_file} ({output_path.stat().st_size} bytes)") |
|
|
except Exception as e: |
|
|
print(f"Error saving output file: {str(e)}") |
|
|
|
|
|
output_file = None |
|
|
|
|
|
if not output_file: |
|
|
print("\n" + "=" * 60) |
|
|
print("Batch Processing Results:") |
|
|
print("=" * 60) |
|
|
|
|
|
for i, r in enumerate(results[:10], 1): |
|
|
print(f"\n{i}. Task: {r['task'][:80]}...") |
|
|
print(f" Response: {r['response'][:100]}...") |
|
|
print(f" Intent: {r['intent']}, Emotion: {r['emotion']}") |
|
|
|
|
|
if len(results) > 10: |
|
|
print(f"\n... and {len(results) - 10} more results") |
|
|
|
|
|
|
|
|
|
|
|
typer_app = typer.Typer( |
|
|
help="BLUX-cA conscious agent core", |
|
|
context_settings={"help_option_names": ["-h", "--help"]} |
|
|
) |
|
|
|
|
|
@typer_app.command() |
|
|
def register( |
|
|
key: str = typer.Argument(..., help="Registration key to validate"), |
|
|
verbose: bool = typer.Option(False, "-v", "--verbose", help="Show detailed information"), |
|
|
check_capability: Optional[str] = typer.Option(None, "-c", "--capability", |
|
|
help="Check for specific capability") |
|
|
) -> None: |
|
|
"""Validate a registration key and show capabilities.""" |
|
|
validator = RegistryValidator() |
|
|
|
|
|
try: |
|
|
if check_capability: |
|
|
|
|
|
try: |
|
|
cap = Capability(check_capability) |
|
|
result = validator.validate_with_context(key, {cap}) |
|
|
except ValueError: |
|
|
typer.echo(f"Error: Invalid capability '{check_capability}'") |
|
|
typer.echo(f"Valid capabilities: {[c.value for c in Capability]}") |
|
|
raise typer.Exit(code=1) |
|
|
else: |
|
|
result = validator.validate(key) |
|
|
|
|
|
if verbose: |
|
|
typer.echo(json.dumps(result.to_dict(), indent=2)) |
|
|
else: |
|
|
if result.valid: |
|
|
typer.echo(f"✓ Registration valid: {result.key_type}") |
|
|
typer.echo(f"Capabilities: {len(result.capabilities)}") |
|
|
typer.echo(f"Expires: {result.get_remaining_seconds() // 86400:.0f} days remaining") |
|
|
else: |
|
|
typer.echo(f"✗ Registration invalid: {result.reason}") |
|
|
|
|
|
except Exception as e: |
|
|
typer.echo(f"Error during registration validation: {str(e)}") |
|
|
raise typer.Exit(code=1) |
|
|
|
|
|
|
|
|
@typer_app.command() |
|
|
def code_eval( |
|
|
language: str = typer.Argument(...), |
|
|
file: Optional[Path] = typer.Option(None, exists=True), |
|
|
snippet: str = typer.Option("", help="Inline code snippet"), |
|
|
) -> None: |
|
|
"""Evaluate code (placeholder for future implementation).""" |
|
|
typer.echo("Code evaluation feature is under development.") |
|
|
typer.echo(f"Language: {language}") |
|
|
if file: |
|
|
typer.echo(f"File: {file}") |
|
|
if snippet: |
|
|
typer.echo(f"Snippet: {snippet[:50]}...") |
|
|
|
|
|
|
|
|
@typer_app.command() |
|
|
def reflect( |
|
|
text: str, |
|
|
depth: int = typer.Option(3, help="Number of why-chain iterations.") |
|
|
) -> None: |
|
|
"""Run reflection on text.""" |
|
|
try: |
|
|
perception = PerceptionLayer() |
|
|
reflection = ReflectionEngine(depth=depth) |
|
|
entry = perception.observe(text) |
|
|
insight = reflection.reflect(entry.text) |
|
|
typer.echo(json.dumps(insight.__dict__, indent=2, ensure_ascii=False)) |
|
|
except Exception as e: |
|
|
typer.echo(f"Error during reflection: {str(e)}") |
|
|
raise typer.Exit(code=1) |
|
|
|
|
|
|
|
|
@typer_app.command(name="eval") |
|
|
def eval_suite( |
|
|
dataset_dir: Path = typer.Option(..., exists=True, file_okay=False, dir_okay=True, resolve_path=True, |
|
|
help="Path to BLUX-cA dataset directory (with eval/*.jsonl files)."), |
|
|
suite: str = typer.Option("all", help=f"Probe suite to run: {sorted(PROBE_SUITES)} or 'all'"), |
|
|
output: Optional[Path] = typer.Option(None, help="Optional output report path (defaults to runs/eval_<timestamp>.md).") |
|
|
) -> None: |
|
|
"""Run evaluation probes (identity, red_team, capability, doctrine).""" |
|
|
try: |
|
|
suite_name = suite.lower() |
|
|
valid = set(PROBE_SUITES.keys()) | {"all"} |
|
|
if suite_name not in valid: |
|
|
raise typer.BadParameter(f"Unknown suite '{suite}'. Valid options: {sorted(valid)}") |
|
|
report_path = run_probe_evaluation(dataset_dir, suite_name, output) |
|
|
typer.echo(f"Evaluation complete. Report written to {report_path}") |
|
|
except Exception as e: |
|
|
typer.echo(f"Error during evaluation: {str(e)}") |
|
|
raise typer.Exit(code=1) |
|
|
|
|
|
|
|
|
@typer_app.command() |
|
|
def explain( |
|
|
last: bool = typer.Option(False, help="Explain the most recent audit entry."), |
|
|
count: int = typer.Option(1, help="Number of recent entries to show.") |
|
|
) -> None: |
|
|
"""Explain audit entries.""" |
|
|
audit = AuditLog() |
|
|
if not audit.path.exists(): |
|
|
typer.echo("No audit history available.") |
|
|
raise typer.Exit(code=1) |
|
|
|
|
|
try: |
|
|
lines = audit.path.read_text(encoding="utf-8").strip().splitlines() |
|
|
if not lines: |
|
|
typer.echo("Audit log empty.") |
|
|
return |
|
|
|
|
|
if last: |
|
|
show_count = min(count, len(lines)) |
|
|
typer.echo(f"Last {show_count} audit entries:") |
|
|
for i, line in enumerate(lines[-show_count:], 1): |
|
|
typer.echo(f"\n{i}. {line}") |
|
|
else: |
|
|
typer.echo(f"Audit log contains {len(lines)} entries.") |
|
|
typer.echo("Use --last to view recent entries.") |
|
|
|
|
|
except Exception as e: |
|
|
typer.echo(f"Error reading audit log: {str(e)}") |
|
|
raise typer.Exit(code=1) |
|
|
|
|
|
|
|
|
@typer_app.command() |
|
|
def audit_export( |
|
|
output: Optional[str] = typer.Option(None, help="Export path."), |
|
|
format: str = typer.Option("jsonl", help="Export format: jsonl or json.") |
|
|
) -> None: |
|
|
"""Export audit log.""" |
|
|
audit = AuditLog() |
|
|
if not audit.path.exists(): |
|
|
typer.echo("No audit history available.") |
|
|
return |
|
|
|
|
|
try: |
|
|
target = output or f"audit_export_{datetime.now().strftime('%Y%m%d_%H%M%S')}.{format}" |
|
|
audit_data = audit.path.read_text(encoding="utf-8") |
|
|
|
|
|
if format.lower() == "json": |
|
|
|
|
|
lines = audit_data.strip().splitlines() |
|
|
json_data = [json.loads(line) for line in lines if line.strip()] |
|
|
with open(target, 'w', encoding='utf-8') as f: |
|
|
json.dump(json_data, f, indent=2, ensure_ascii=False) |
|
|
else: |
|
|
|
|
|
with open(target, 'w', encoding='utf-8') as f: |
|
|
f.write(audit_data) |
|
|
|
|
|
typer.echo(f"Audit log exported to {target}") |
|
|
|
|
|
except Exception as e: |
|
|
typer.echo(f"Error exporting audit log: {str(e)}") |
|
|
raise typer.Exit(code=1) |
|
|
|
|
|
|
|
|
@typer_app.command() |
|
|
def doctrine(text: str, mode: str = typer.Option("strict", help="Evaluation mode")) -> None: |
|
|
"""Evaluate text against doctrine.""" |
|
|
try: |
|
|
config = load_config() |
|
|
compass = DiscernmentCompass() |
|
|
constitution = ConstitutionEngine(mode=mode) |
|
|
|
|
|
insights = [text] |
|
|
decision = constitution.evaluate( |
|
|
insights=insights, |
|
|
intent=compass.classify(text).intent.value |
|
|
) |
|
|
|
|
|
audit = AuditLog() |
|
|
record = audit.create_record( |
|
|
input_hash=_hash_text(text), |
|
|
verdict=decision.decision, |
|
|
doctrine_refs=decision.doctrine_refs, |
|
|
rationale=decision.reason, |
|
|
) |
|
|
audit.append(record) |
|
|
|
|
|
typer.echo(json.dumps(decision.__dict__, indent=2, ensure_ascii=False)) |
|
|
|
|
|
except Exception as e: |
|
|
typer.echo(f"Error during doctrine evaluation: {str(e)}") |
|
|
raise typer.Exit(code=1) |
|
|
|
|
|
|
|
|
@typer_app.command() |
|
|
def repl( |
|
|
key: Optional[str] = typer.Option(None, "--key", "-k", help="Registration key") |
|
|
) -> None: |
|
|
"""Start interactive REPL.""" |
|
|
clarity_interactive_mode(registration_key=key) |
|
|
|
|
|
|
|
|
@typer_app.command() |
|
|
def version() -> None: |
|
|
"""Show version information.""" |
|
|
try: |
|
|
import importlib.metadata |
|
|
version = importlib.metadata.version("blux-ca") |
|
|
typer.echo(f"BLUX-cA version {version}") |
|
|
except: |
|
|
typer.echo("BLUX-cA (version unknown)") |
|
|
|
|
|
typer.echo(f"Python {sys.version}") |
|
|
typer.echo(f"Platform: {sys.platform}") |
|
|
|
|
|
|
|
|
def run_typer_command(args: List[str]) -> None: |
|
|
"""Run Typer commands from argparse.""" |
|
|
try: |
|
|
|
|
|
original_argv = sys.argv |
|
|
sys.argv = ["ca.py"] + args |
|
|
typer_app() |
|
|
except SystemExit as e: |
|
|
if e.code != 0: |
|
|
print(f"Command failed with exit code {e.code}") |
|
|
except Exception as e: |
|
|
print(f"Error executing command: {str(e)}") |
|
|
traceback.print_exc() |
|
|
finally: |
|
|
sys.argv = original_argv |
|
|
|
|
|
|
|
|
def main() -> None: |
|
|
"""Main CLI entrypoint with argparse for backward compatibility.""" |
|
|
parser = argparse.ArgumentParser( |
|
|
description="BLUX-cA Clarity Agent - Unified Interface", |
|
|
formatter_class=argparse.RawDescriptionHelpFormatter, |
|
|
epilog=""" |
|
|
Examples: |
|
|
ca.py # Start interactive session |
|
|
ca.py --task "Hello" # Process single task |
|
|
ca.py --batch tasks.txt # Process batch file |
|
|
ca.py --debug --task "Test" # Debug single task |
|
|
ca.py reflect "What is clarity?" # Reflection command |
|
|
ca.py doctrine "Analyze this" # Doctrine evaluation |
|
|
ca.py repl # Interactive REPL |
|
|
ca.py register KEY # Validate registration key |
|
|
ca.py --help # Show this help |
|
|
""" |
|
|
) |
|
|
|
|
|
|
|
|
mode_group = parser.add_mutually_exclusive_group() |
|
|
mode_group.add_argument( |
|
|
"--task", |
|
|
type=str, |
|
|
help="Process a single task" |
|
|
) |
|
|
mode_group.add_argument( |
|
|
"--batch", |
|
|
type=str, |
|
|
help="Process tasks from a file (one per line)" |
|
|
) |
|
|
mode_group.add_argument( |
|
|
"--repl", |
|
|
action="store_true", |
|
|
help="Start interactive REPL (same as no arguments)" |
|
|
) |
|
|
|
|
|
|
|
|
parser.add_argument( |
|
|
"--key", |
|
|
"-k", |
|
|
type=str, |
|
|
help="Registration key for enhanced capabilities" |
|
|
) |
|
|
|
|
|
|
|
|
parser.add_argument( |
|
|
"typer_command", |
|
|
nargs="?", |
|
|
help="Typer command (register, reflect, explain, eval, audit-export, doctrine, repl, version)" |
|
|
) |
|
|
|
|
|
|
|
|
parser.add_argument( |
|
|
"--debug", |
|
|
action="store_true", |
|
|
help="Enable debug output" |
|
|
) |
|
|
parser.add_argument( |
|
|
"--output", |
|
|
"-o", |
|
|
type=str, |
|
|
help="Output file for batch processing" |
|
|
) |
|
|
|
|
|
|
|
|
parser.add_argument( |
|
|
"typer_args", |
|
|
nargs=argparse.REMAINDER, |
|
|
help=argparse.SUPPRESS |
|
|
) |
|
|
|
|
|
args = parser.parse_args() |
|
|
|
|
|
|
|
|
if args.typer_command: |
|
|
typer_args = [args.typer_command] + args.typer_args |
|
|
run_typer_command(typer_args) |
|
|
return |
|
|
|
|
|
|
|
|
try: |
|
|
if args.task: |
|
|
process_single_task(args.task, args.debug, args.key) |
|
|
elif args.batch: |
|
|
process_batch_file(args.batch, args.output, args.key) |
|
|
elif args.repl or (not args.task and not args.batch): |
|
|
clarity_interactive_mode(args.key) |
|
|
else: |
|
|
parser.print_help() |
|
|
|
|
|
except KeyboardInterrupt: |
|
|
print("\n\nOperation cancelled by user.") |
|
|
sys.exit(130) |
|
|
except Exception as e: |
|
|
print(f"Error: {str(e)}") |
|
|
if args.debug: |
|
|
traceback.print_exc() |
|
|
sys.exit(1) |
|
|
|
|
|
|
|
|
if __name__ == "__main__": |
|
|
main() |