File size: 5,323 Bytes
0c09dd2 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 |
from __future__ import annotations

import math
import re
from datetime import datetime, timezone
from typing import Any, List, Optional

from dateutil import parser as date_parser
from rapidfuzz import fuzz, process

from .jsonpath_utils import delete_key_at_path, get_value_at_path, set_value_at_path
# Lower-cased spellings that cast_bool treats as True.
BOOLEAN_TRUE = {
    "true",
    "yes",
    "y",
    "1",
    "on",
}

# Lower-cased spellings that cast_bool treats as False.
BOOLEAN_FALSE = {
    "false",
    "no",
    "n",
    "0",
    "off",
}

# English number words in ascending order; a word's position is its value.
_ZERO_TO_NINETEEN = (
    "zero", "one", "two", "three", "four",
    "five", "six", "seven", "eight", "nine",
    "ten", "eleven", "twelve", "thirteen", "fourteen",
    "fifteen", "sixteen", "seventeen", "eighteen", "nineteen",
)

# Multiples of ten, starting at twenty (2 * 10).
_TENS_FROM_TWENTY = (
    "twenty", "thirty", "forty", "fifty",
    "sixty", "seventy", "eighty", "ninety",
)

# Word -> integer lookup used by cast_number for spelled-out numbers.
TEXT_NUMBERS = {
    **{word: number for number, word in enumerate(_ZERO_TO_NINETEEN)},
    **{word: 10 * step for step, word in enumerate(_TENS_FROM_TWENTY, start=2)},
}
def rename_key(payload: Any, src_path_tokens: List[str], dst_key: str) -> Any:
    """Rename the key addressed by ``src_path_tokens`` to ``dst_key``.

    The value is read from the source path, the source key is deleted, and
    the value is stored again under ``dst_key`` beneath the same parent path.

    Args:
        payload: Document to edit. The path helpers are called for their
            side effects (their return values are ignored), so the document
            is presumably mutated in place.
        src_path_tokens: Path tokens leading to the key to rename; the last
            token names the key itself. A last token starting with ``"["``
            is an array index.
        dst_key: New key name, without the leading ``"."`` of a path token.

    Returns:
        The same ``payload`` object. It is returned untouched when the path
        is empty or ends in an array index, since neither can be renamed.
    """
    # An empty path has no final key; treat it like the other unsupported
    # case instead of failing with IndexError on the [-1] lookup.
    if not src_path_tokens:
        return payload

    *parent_tokens, src_last = src_path_tokens
    if src_last.startswith("["):
        return payload  # renaming array indices not supported

    value = get_value_at_path(payload, src_path_tokens)
    delete_key_at_path(payload, src_path_tokens)
    set_value_at_path(payload, [*parent_tokens, f".{dst_key}"], value)
    return payload
def _parse_numeric_literal(text: str) -> Optional[float]:
    """Parse ``text`` as an int, else as a finite float; ``None`` otherwise."""
    try:
        return int(text)
    except ValueError:
        pass  # not an integer literal; it may still be a float such as "1e5"
    try:
        number = float(text)
    except ValueError:
        return None
    # "nan" / "inf" parse as floats but are not usable numeric values.
    return number if math.isfinite(number) else None


def _words_to_number(text: str) -> Optional[int]:
    """Translate a spelled-out number such as ``"forty-two"`` into an int.

    Only phrases built from ``TEXT_NUMBERS`` are understood: a single word
    ("seven", "sixteen", "ninety") or a tens word followed by a units word
    ("twenty one", "forty-two"). Anything else yields ``None``.
    """
    values = [TEXT_NUMBERS.get(word) for word in re.split(r"[\s-]+", text)]
    if any(item is None for item in values):
        return None
    if len(values) == 1:
        return values[0]
    if len(values) == 2:
        tens, units = values
        # Accept only a real compound; "one two" or "ten five" is not a
        # number and must not be summed into one.
        if tens >= 20 and tens % 10 == 0 and 1 <= units <= 9:
            return tens + units
    return None


def cast_number(payload: Any, path_tokens: List[str]) -> Optional[Any]:
    """Coerce the value at ``path_tokens`` into an int or float.

    Handled inputs:
        * ``int`` / ``float``: already numeric, left untouched.
        * ``bool``: stored as ``0`` / ``1``. ``bool`` is an ``int`` subclass,
          so it would otherwise pass as "already numeric" while still being
          a boolean.
        * ``str``: stripped and lower-cased, then parsed as an integer or
          finite float literal (exponent notation included), or as a simple
          spelled-out number built from ``TEXT_NUMBERS``.

    Args:
        payload: Document holding the value.
        path_tokens: Path tokens addressing the value.

    Returns:
        ``payload`` when the value is (now) numeric, or ``None`` when it
        cannot be interpreted as a number.
    """
    value = get_value_at_path(payload, path_tokens)

    if isinstance(value, bool):
        set_value_at_path(payload, path_tokens, int(value))
        return payload
    if isinstance(value, (int, float)):
        return payload
    if not isinstance(value, str):
        return None

    text = value.strip().lower()
    number = _parse_numeric_literal(text)
    if number is None:
        number = _words_to_number(text)
    if number is None:
        return None

    set_value_at_path(payload, path_tokens, number)
    return payload
def cast_bool(payload: Any, path_tokens: List[str]) -> Optional[Any]:
    """Coerce the value at ``path_tokens`` into a ``bool``.

    Booleans are left alone, numbers are converted by truthiness, and
    strings are matched (stripped, lower-cased) against ``BOOLEAN_TRUE``
    and ``BOOLEAN_FALSE``.

    Returns:
        ``payload`` when the value is (now) a bool, or ``None`` when the
        value has an unsupported type or is an unrecognised string.
    """
    current = get_value_at_path(payload, path_tokens)

    # Already a bool: nothing to write back.
    if isinstance(current, bool):
        return payload

    if isinstance(current, (int, float)):
        coerced = bool(current)
    elif isinstance(current, str):
        token = current.strip().lower()
        if token in BOOLEAN_TRUE:
            coerced = True
        elif token in BOOLEAN_FALSE:
            coerced = False
        else:
            return None
    else:
        return None

    set_value_at_path(payload, path_tokens, coerced)
    return payload
def parse_date_iso(payload: Any, path_tokens: List[str]) -> Optional[Any]:
    """Rewrite the value at ``path_tokens`` as an ISO ``YYYY-MM-DD`` date.

    Handled inputs:
        * ``int`` / ``float``: interpreted as a Unix timestamp in seconds
          and converted in UTC, so the stored date does not depend on the
          timezone of the machine running the code.
        * ``str``: parsed with dateutil, first with its default settings
          and then again with ``dayfirst=True``.

    ``bool`` values are rejected even though ``bool`` is an ``int``
    subclass; ``True`` is not a timestamp.

    Args:
        payload: Document holding the value.
        path_tokens: Path tokens addressing the value.

    Returns:
        ``payload`` after the value has been replaced by the ISO date
        string, or ``None`` when the value cannot be read as a date.
    """
    value = get_value_at_path(payload, path_tokens)

    if isinstance(value, bool):
        return None

    if isinstance(value, (int, float)):
        try:
            moment = datetime.fromtimestamp(float(value), tz=timezone.utc)
        except (OverflowError, OSError, ValueError):
            # Out-of-range or non-finite timestamp.
            return None
    elif isinstance(value, str):
        moment = None
        # Default parse first, then the day-first variant.
        for options in ({}, {"dayfirst": True}):
            try:
                moment = date_parser.parse(value, **options)
            except Exception:
                # Deliberately broad: the text is arbitrary user input and
                # any parser failure simply means "not a date".
                continue
            break
        if moment is None:
            return None
    else:
        return None

    set_value_at_path(payload, path_tokens, moment.date().isoformat())
    return payload
def map_enum(
    payload: Any,
    path_tokens: List[str],
    allowed_values: List[str],
    *,
    threshold: Optional[int] = None,
) -> Optional[Any]:
    """Snap the value at ``path_tokens`` onto one of ``allowed_values``.

    Resolution order:
        1. The value is already an allowed member: nothing is written.
        2. ``str(value)`` equals the string form of a member (for example
           the int ``5`` against ``"5"``): that member is written.
        3. Case-insensitive fuzzy match (rapidfuzz ``WRatio``): the best
           candidate is written when its score reaches ``threshold``.

    Args:
        payload: Document holding the value.
        path_tokens: Path tokens addressing the value.
        allowed_values: Permitted enum members.
        threshold: Minimum fuzzy score to accept. When omitted it is 90
            for enums with at most five members and 80 for larger ones.

    Returns:
        ``payload`` when the value is (now) an allowed member, or ``None``
        when the value is missing, the enum is empty, or no candidate
        scores high enough.
    """
    value = get_value_at_path(payload, path_tokens)
    if value is None or not allowed_values:
        return None

    members = list(allowed_values)
    if value in members:
        return payload

    text = str(value)
    labels = [str(member) for member in members]
    if text in labels:
        # Same text, different type: store the real member so the value
        # actually belongs to the enum afterwards.
        set_value_at_path(payload, path_tokens, members[labels.index(text)])
        return payload

    # Adaptive threshold by enum size
    if threshold is None:
        threshold = 90 if len(members) <= 5 else 80

    # Compare lower-cased on both sides so case differences cost nothing.
    result = process.extractOne(
        text.lower(), [label.lower() for label in labels], scorer=fuzz.WRatio
    )
    if result is None:
        return None

    # For a list of choices the third element is the index of the match.
    _, score, index = result
    if score < threshold:
        return None

    set_value_at_path(payload, path_tokens, members[index])
    return payload
def swap_dates(
    payload: Any, start_tokens: List[str], end_tokens: List[str]
) -> Optional[Any]:
    """Exchange the start and end dates when the end precedes the start.

    Both values are stringified and parsed with dateutil. If the end is
    earlier than the start, each field is overwritten with the other's
    date in ISO ``YYYY-MM-DD`` form.

    Returns:
        ``payload`` when a swap was performed; ``None`` when the dates are
        already ordered or when anything fails along the way (best-effort
        heuristic: every exception is swallowed).
    """
    try:
        raw_start = get_value_at_path(payload, start_tokens)
        raw_end = get_value_at_path(payload, end_tokens)
        begin = date_parser.parse(str(raw_start))
        finish = date_parser.parse(str(raw_end))

        inverted = finish < begin
        if not inverted:
            return None

        set_value_at_path(payload, start_tokens, finish.date().isoformat())
        set_value_at_path(payload, end_tokens, begin.date().isoformat())
    except Exception:
        return None
    return payload
|