|
|
from __future__ import annotations |
|
|
|
|
|
import re |
|
|
from datetime import datetime |
|
|
from typing import Any, List, Optional |
|
|
|
|
|
from dateutil import parser as date_parser |
|
|
from rapidfuzz import fuzz, process |
|
|
|
|
|
from .jsonpath_utils import delete_key_at_path, get_value_at_path, set_value_at_path |
|
|
|
|
|
# Lower-cased spellings that cast_bool treats as True / False.
BOOLEAN_TRUE = {"true", "yes", "y", "1", "on"}
BOOLEAN_FALSE = {"false", "no", "n", "0", "off"}

# English number words recognised by cast_number, mapped to their value.
TEXT_NUMBERS = {
    # Units.
    "zero": 0, "one": 1, "two": 2, "three": 3, "four": 4,
    "five": 5, "six": 6, "seven": 7, "eight": 8, "nine": 9,
    # Ten through nineteen.
    "ten": 10, "eleven": 11, "twelve": 12, "thirteen": 13, "fourteen": 14,
    "fifteen": 15, "sixteen": 16, "seventeen": 17, "eighteen": 18, "nineteen": 19,
    # Multiples of ten.
    "twenty": 20, "thirty": 30, "forty": 40, "fifty": 50,
    "sixty": 60, "seventy": 70, "eighty": 80, "ninety": 90,
}
|
|
|
|
|
|
|
|
def rename_key(payload: Any, src_path_tokens: List[str], dst_key: str) -> Any:
    """Move the value at ``src_path_tokens`` to a sibling key named ``dst_key``.

    Args:
        payload: Document passed through to the jsonpath helpers.
        src_path_tokens: Path tokens addressing the key to rename.
        dst_key: New key name, written under the same parent path.

    Returns:
        ``payload``. When the last token starts with ``"["`` (an index
        token rather than a key) nothing is done and ``payload`` is
        returned as received.
    """
    last_token = src_path_tokens[-1]
    if last_token.startswith("["):
        # Index tokens have no key name to rename.
        return payload

    moved_value = get_value_at_path(payload, src_path_tokens)
    delete_key_at_path(payload, src_path_tokens)

    # Same parent path, with the final token replaced by the new key token.
    set_value_at_path(
        payload,
        src_path_tokens[:-1] + [f".{dst_key}"],
        moved_value,
    )
    return payload
|
|
|
|
|
|
|
|
def cast_number(payload: Any, path_tokens: List[str]) -> Optional[Any]:
    """Coerce the value at ``path_tokens`` to an ``int`` or ``float``.

    Strings are stripped and lower-cased, then interpreted either as a
    numeric literal (``"42"``, ``"3.5"``) or as English number words from
    ``TEXT_NUMBERS``. Word input must be a single word (``"seventeen"``) or
    a tens word followed by a unit from one to nine, separated by
    whitespace and/or hyphens (``"twenty-one"``, ``"forty two"``). Other
    word sequences such as ``"one two"`` are rejected rather than summed.

    Args:
        payload: Document passed through to the jsonpath helpers.
        path_tokens: Path tokens addressing the value to convert.

    Returns:
        ``payload`` when the value already is an ``int``/``float`` (this
        includes ``bool``, which subclasses ``int``) or was converted;
        ``None`` when the value cannot be interpreted as a number.
    """
    value = get_value_at_path(payload, path_tokens)
    if isinstance(value, (int, float)):
        return payload
    if not isinstance(value, str):
        return None

    s = value.strip().lower()

    # Numeric literal: a "." selects float parsing, anything else int.
    # Only the parse is guarded; int()/float() raise ValueError for a str
    # that is not a valid literal.
    try:
        num = float(s) if "." in s else int(s)
    except ValueError:
        pass
    else:
        set_value_at_path(payload, path_tokens, num)
        return payload

    # Number words. re.split always yields at least one element.
    words = re.split(r"[\s-]+", s)
    if any(w not in TEXT_NUMBERS for w in words):
        return None
    parts = [TEXT_NUMBERS[w] for w in words]

    if len(parts) == 1:
        total = parts[0]
    elif len(parts) == 2 and parts[0] >= 20 and 1 <= parts[1] <= 9:
        # Values >= 20 in TEXT_NUMBERS are exactly the multiples of ten,
        # so this accepts "<tens> <unit>" and nothing else.
        total = parts[0] + parts[1]
    else:
        # e.g. "one two", "ten one", "twenty thirty": not a valid number.
        return None

    set_value_at_path(payload, path_tokens, total)
    return payload
|
|
|
|
|
|
|
|
def cast_bool(payload: Any, path_tokens: List[str]) -> Optional[Any]:
    """Coerce the value at ``path_tokens`` to a ``bool``.

    Numbers are converted with ``bool()``. Strings are stripped,
    lower-cased and looked up in ``BOOLEAN_TRUE`` / ``BOOLEAN_FALSE``.

    Args:
        payload: Document passed through to the jsonpath helpers.
        path_tokens: Path tokens addressing the value to convert.

    Returns:
        ``payload`` when the value already is a ``bool`` or was converted;
        ``None`` for unrecognised strings and for any other type.
    """
    current = get_value_at_path(payload, path_tokens)
    if isinstance(current, bool):
        return payload

    if isinstance(current, (int, float)):
        coerced = bool(current)
    elif isinstance(current, str):
        token = current.strip().lower()
        if token in BOOLEAN_TRUE:
            coerced = True
        elif token in BOOLEAN_FALSE:
            coerced = False
        else:
            return None
    else:
        return None

    set_value_at_path(payload, path_tokens, coerced)
    return payload
|
|
|
|
|
|
|
|
def parse_date_iso(payload: Any, path_tokens: List[str]) -> Optional[Any]:
    """Rewrite the value at ``path_tokens`` as an ISO ``YYYY-MM-DD`` date.

    Numbers are treated as POSIX timestamps. Strings are parsed with
    ``dateutil``, first with default settings and then, if that attempt
    fails, with ``dayfirst=True``. Any time-of-day component is dropped.

    Args:
        payload: Document passed through to the jsonpath helpers.
        path_tokens: Path tokens addressing the value to convert.

    Returns:
        ``payload`` when the value was rewritten; ``None`` when it could
        not be parsed or is neither a number nor a string.
    """
    raw = get_value_at_path(payload, path_tokens)

    if isinstance(raw, (int, float)):
        # NOTE(review): fromtimestamp() without a tz converts to the host's
        # local time, so the resulting date can differ between machines.
        # Confirm whether UTC is intended.
        try:
            moment = datetime.fromtimestamp(float(raw))
            set_value_at_path(payload, path_tokens, moment.date().isoformat())
        except Exception:
            return None
        return payload

    if not isinstance(raw, str):
        return None

    # Default parse first, then a day-first retry; first success wins.
    for parse_options in ({}, {"dayfirst": True}):
        try:
            moment = date_parser.parse(raw, **parse_options)
            set_value_at_path(payload, path_tokens, moment.date().isoformat())
        except Exception:
            continue
        return payload
    return None
|
|
|
|
|
|
|
|
def map_enum(
    payload: Any,
    path_tokens: List[str],
    allowed_values: List[str],
    *,
    threshold: Optional[int] = None,
) -> Optional[Any]:
    """Snap the value at ``path_tokens`` onto the closest allowed value.

    The value and the allowed values are compared as strings. An exact
    (case-sensitive) match leaves the payload untouched. Otherwise the
    lower-cased value is fuzzy-matched against the lower-cased choices
    with ``fuzz.WRatio``, and the best choice is written back in its
    original spelling if its score reaches ``threshold``.

    Args:
        payload: Document passed through to the jsonpath helpers.
        path_tokens: Path tokens addressing the value to map.
        allowed_values: Permitted values; each is converted with ``str()``.
        threshold: Minimum score to accept a fuzzy match. When ``None`` it
            defaults to 90 for five or fewer choices and 80 otherwise.

    Returns:
        ``payload`` when the value is already allowed or was replaced;
        ``None`` when the value is ``None``, there are no choices, or no
        choice scores high enough.
    """
    current = get_value_at_path(payload, path_tokens)
    if current is None:
        return None

    candidate = str(current)
    choices = [str(option) for option in allowed_values]
    if candidate in choices:
        return payload
    if not choices:
        return None

    if threshold is None:
        threshold = 90 if len(choices) <= 5 else 80

    # Built once: used both as the match pool and to map the winning
    # lower-cased string back to its original spelling.
    lowered_choices = [choice.lower() for choice in choices]
    best = process.extractOne(
        candidate.lower(), lowered_choices, scorer=fuzz.WRatio
    )
    if best is None:
        return None

    matched, score, _ = best
    if score >= threshold and matched is not None:
        original_spelling = choices[lowered_choices.index(matched)]
        set_value_at_path(payload, path_tokens, original_spelling)
        return payload
    return None
|
|
|
|
|
|
|
|
def swap_dates(
    payload: Any, start_tokens: List[str], end_tokens: List[str]
) -> Optional[Any]:
    """Swap the start and end date fields when the end precedes the start.

    Both values are converted with ``str()`` and parsed with ``dateutil``.
    When a swap happens, both fields are rewritten as date-only ISO
    strings, so any time-of-day component is dropped.

    Args:
        payload: Document passed through to the jsonpath helpers.
        start_tokens: Path tokens addressing the start date.
        end_tokens: Path tokens addressing the end date.

    Returns:
        ``payload`` only when the fields were swapped; ``None`` when the
        dates are already in order or when reading, parsing, comparing or
        writing raises.
    """
    try:
        raw_start = get_value_at_path(payload, start_tokens)
        raw_end = get_value_at_path(payload, end_tokens)
        start_dt = date_parser.parse(str(raw_start))
        end_dt = date_parser.parse(str(raw_end))

        if not end_dt < start_dt:
            # Already ordered (or equal): nothing to repair.
            return None

        set_value_at_path(payload, start_tokens, end_dt.date().isoformat())
        set_value_at_path(payload, end_tokens, start_dt.date().isoformat())
        return payload
    except Exception:
        return None
|
|
|