"""Validate a JSON payload against a schema and optionally apply ML-suggested fixes."""

from __future__ import annotations

import copy
import json
from typing import Any, Dict, List

from .fixes import cast_bool, cast_number, map_enum, parse_date_iso, rename_key
from .ml_model import SemanticReasoner
from .rules_engine import validate_with_jsonschema
from .schema_utils import collect_enums
from .types import Prediction, Report


def _tokenize_jsonpath(path: str) -> List[str] | None:
    """Split a ``$``-rooted path into ``.name`` and ``[...]`` tokens.

    Args:
        path: Path string such as ``$.items[0].price``.

    Returns:
        The tokens in order (``[".items", "[0]", ".price"]`` for the example
        above), or ``None`` when the path does not start with ``$`` or
        contains a ``[`` without a closing ``]``.
    """
    if not path.startswith("$"):
        return None

    tokens: List[str] = []
    rest = path[1:]
    i = 0
    while i < len(rest):
        ch = rest[i]
        if ch == ".":
            # Consume the member name up to the next "." or "[".
            j = i + 1
            while j < len(rest) and rest[j] not in ".[":
                j += 1
            if j > i + 1:  # skip empty names such as the middle of ".."
                tokens.append(rest[i:j])
            i = j
        elif ch == "[":
            j = rest.find("]", i)
            if j == -1:
                # Unterminated bracket: without this guard the cursor would be
                # reset to 0 and the loop would never finish.
                return None
            tokens.append(rest[i : j + 1])
            i = j + 1
        else:
            # Stray character outside any token: ignore it.
            i += 1
    return tokens


def _apply_fix(schema: Dict[str, Any], payload: Any, pred: Prediction) -> Any | None:
    """Apply the single fix described by ``pred`` to ``payload``.

    Args:
        schema: JSON schema; only consulted for ``map_enum`` (allowed values).
        payload: Document handed to the fix helper.
        pred: Prediction providing ``jsonpath``, ``fix_action`` and, for
            ``rename_key``, an optional ``fix_value`` (the new key name).

    Returns:
        Whatever the selected fix helper returns, or ``None`` when the path is
        malformed, the action is not handled here, or ``rename_key`` raised.

    Note:
        Only ``rename_key`` is guarded against exceptions; errors raised by
        the other helpers propagate to the caller.
    """
    path = pred.get("jsonpath", "$")
    if not isinstance(path, str):
        return None

    tokens = _tokenize_jsonpath(path)
    if tokens is None:
        return None

    action = pred.get("fix_action", "")
    if action == "rename_key":
        dst = pred.get("fix_value") or "_renamed"
        try:
            return rename_key(payload, tokens, dst)
        except Exception:
            return None
    if action == "cast_number":
        return cast_number(payload, tokens)
    if action == "cast_bool":
        return cast_bool(payload, tokens)
    if action == "parse_date_iso":
        return parse_date_iso(payload, tokens)
    if action == "map_enum":
        enums = collect_enums(schema)
        # Enum lookup is keyed by the path with every "$" removed.
        allowed = enums.get(path.replace("$", ""), [])
        return map_enum(payload, tokens, allowed)
    # fill_default or unknown → skip
    return None


def run_validation(
    schema: Dict[str, Any],
    payload: Any,
    *,
    apply_fixes: bool = True,
    max_fixes: int = 5,
    backend: str = "local",
) -> Report:
    """Validate ``payload`` against ``schema`` and try to repair it.

    Args:
        schema: JSON schema to validate against.
        payload: Document to validate; it is deep-copied before any fix is
            attempted.
        apply_fixes: When false, predictions are reported but none is applied.
        max_fixes: Upper bound on how many predictions are tried, taken from
            the front of the prediction list. Negative values are treated
            as 0.
        backend: Passed to ``SemanticReasoner``; the special value
            ``"rules-only"`` skips the reasoner and fixes entirely.

    Returns:
        A report dict with the keys ``valid``, ``rule_errors``,
        ``ml_predictions``, ``applied_fixes`` and ``corrected_json``.
    """
    is_valid, errors = validate_with_jsonschema(schema, payload)
    if is_valid:
        return {
            "valid": True,
            "rule_errors": [],
            "ml_predictions": [],
            "applied_fixes": [],
            "corrected_json": payload,
        }

    # Honor explicit rules-only backend: do not invoke ML or apply fixes
    if backend == "rules-only":
        return {
            "valid": False,
            "rule_errors": errors,
            "ml_predictions": [],
            "applied_fixes": [],
            "corrected_json": payload,
        }

    reasoner = SemanticReasoner(backend=backend)
    preds = reasoner.predict(json.dumps(schema), json.dumps(payload), errors)

    applied: List[Prediction] = []
    corrected = copy.deepcopy(payload)

    if not apply_fixes:
        return {
            "valid": False,
            "rule_errors": errors,
            "ml_predictions": preds,
            "applied_fixes": [],
            "corrected_json": corrected,
        }

    # Clamp so a negative limit cannot slice from the end of the list.
    for pred in preds[: max(max_fixes, 0)]:
        # Work on a copy so a rejected fix leaves ``corrected`` untouched.
        candidate = copy.deepcopy(corrected)
        # The return value is used only as a success flag; ``candidate`` itself
        # is validated below, so the fix helpers are presumed to mutate it in
        # place — TODO confirm against .fixes.
        if _apply_fix(schema, candidate, pred) is None:
            continue

        now_valid, new_errs = validate_with_jsonschema(schema, candidate)
        if now_valid:
            corrected = candidate
            applied.append(pred)
            break

        # Keep the fix as long as it does not increase the error count.
        # NOTE(review): ties are accepted; confirm whether a strict decrease
        # was intended.
        if len(new_errs) <= len(errors):
            corrected = candidate
            applied.append(pred)
            errors = new_errs

    final_valid, final_errors = validate_with_jsonschema(schema, corrected)
    return {
        "valid": final_valid,
        "rule_errors": final_errors if not final_valid else [],
        "ml_predictions": preds,
        "applied_fixes": applied,
        "corrected_json": corrected,
    }