| """ |
| Comprehensive frequency management module for time series forecasting. |
| |
| This module centralizes all frequency-related functionality including: |
| - Frequency enum with helper methods |
| - Frequency parsing and validation |
| - Pandas frequency string conversion |
| - Safety checks for date ranges |
| - Frequency selection utilities |
| - All frequency constants and mappings |
| """ |
|
|
import logging
import re
import sys
from enum import Enum

import numpy as np
import pandas as pd
from numpy.random import Generator

from src.data.constants import BASE_END_DATE, BASE_START_DATE, MAX_YEARS
|
|
| logger = logging.getLogger(__name__) |
|
|
|
|
class Frequency(Enum):
    """
    Enhanced Frequency enum with comprehensive helper methods.

    Members map a short code to a pandas-style value string (e.g. ``T5`` is
    ``"5min"``). Each frequency includes methods for pandas conversion,
    safety checks, and other frequency-specific operations.
    """

    A = "A"
    Q = "Q"
    M = "M"
    W = "W"
    D = "D"
    H = "h"
    S = "s"
    T1 = "1min"
    T5 = "5min"
    T10 = "10min"
    T15 = "15min"
    T30 = "30min"

    def to_pandas_freq(self, for_date_range: bool = True) -> str:
        """
        Convert to pandas frequency string.

        Args:
            for_date_range: If True, use strings suitable for pd.date_range().
                If False, use strings suitable for pd.PeriodIndex().

        Returns:
            Pandas frequency string
        """
        base, prefix, _ = FREQUENCY_MAPPING[self]

        # M/A/Q need different aliases depending on the target API
        # (period-end "ME"/"YE"/"QE" for date_range vs "M"/"Y"/"Q" for
        # PeriodIndex); every other frequency uses the same string for both.
        if for_date_range:
            if self == Frequency.M:
                return "ME"
            elif self == Frequency.A:
                return "YE"
            elif self == Frequency.Q:
                return "QE"
        else:
            if self == Frequency.M:
                return "M"
            elif self == Frequency.A:
                return "Y"
            elif self == Frequency.Q:
                return "Q"

        # e.g. prefix "5" + base "min" -> "5min"; empty prefix -> plain base.
        if prefix:
            return f"{prefix}{base}"
        else:
            return base

    def to_pandas_offset(self) -> str:
        """Get pandas offset string for time delta calculations."""
        return FREQUENCY_TO_OFFSET[self]

    def get_days_per_period(self) -> float:
        """Get approximate days per period for this frequency."""
        _, _, days = FREQUENCY_MAPPING[self]
        return days

    def get_max_safe_length(self) -> int:
        """
        Get maximum safe sequence length to prevent timestamp overflow.

        Every member has an entry in ALL_FREQUENCY_MAX_LENGTHS, so the
        fallback is defensive only; ``sys.maxsize`` keeps the declared
        ``int`` return type (the previous ``float("inf")`` default
        violated the annotation).
        """
        return ALL_FREQUENCY_MAX_LENGTHS.get(self, sys.maxsize)

    def is_high_frequency(self) -> bool:
        """Check if this is a high frequency (minute/second level)."""
        return self in [
            Frequency.S,
            Frequency.T1,
            Frequency.T5,
            Frequency.T10,
            Frequency.T15,
            Frequency.T30,
        ]

    def is_low_frequency(self) -> bool:
        """Check if this is a low frequency (annual/quarterly/monthly)."""
        return self in [Frequency.A, Frequency.Q, Frequency.M]

    def get_seasonality(self) -> int:
        """Get typical seasonality (periods per dominant cycle) for this frequency."""
        seasonality_map = {
            Frequency.S: 3600,  # seconds per hour
            Frequency.T1: 60,   # minutes per hour
            Frequency.T5: 12,
            Frequency.T10: 6,
            Frequency.T15: 4,
            Frequency.T30: 2,
            Frequency.H: 24,    # hours per day
            Frequency.D: 7,     # days per week
            Frequency.W: 52,    # weeks per year
            Frequency.M: 12,
            Frequency.Q: 4,
            Frequency.A: 1,
        }
        return seasonality_map.get(self, 1)

    def get_gift_eval_weight(self) -> float:
        """Get GIFT eval dataset frequency weight (0.1 for unlisted frequencies)."""
        return GIFT_EVAL_FREQUENCY_WEIGHTS.get(self, 0.1)

    def get_length_range(self) -> tuple[int, int, int, int]:
        """Get (min_length, max_length, optimal_start, optimal_end) for this frequency."""
        return GIFT_EVAL_LENGTH_RANGES.get(self, (50, 1000, 100, 500))
|
|
|
|
| |
| |
| |
|
|
| |
# Per-frequency pandas conversion data, keyed by Frequency:
#   (pandas base alias, multiplier prefix, approximate days per period).
# Frequency.to_pandas_freq() joins prefix + base (e.g. "5" + "min" -> "5min");
# Frequency.get_days_per_period() returns the third element.
FREQUENCY_MAPPING: dict[Frequency, tuple[str, str, float]] = {
    Frequency.A: (
        "YE",
        "",
        365.25,  # mean year length (leap years included)
    ),
    Frequency.Q: ("Q", "", 91.3125),  # 365.25 / 4
    Frequency.M: ("M", "", 30.4375),  # 365.25 / 12
    Frequency.W: ("W", "", 7),
    Frequency.D: ("D", "", 1),
    Frequency.H: ("h", "", 1 / 24),
    Frequency.S: ("s", "", 1 / 86400),
    Frequency.T1: ("min", "1", 1 / 1440),
    Frequency.T5: ("min", "5", 1 / 288),
    Frequency.T10: ("min", "10", 1 / 144),
    Frequency.T15: ("min", "15", 1 / 96),
    Frequency.T30: ("min", "30", 1 / 48),
}
|
|
| |
# Offset aliases consumed by Frequency.to_pandas_offset() for time-delta math.
# NOTE(review): several of these ("AS", "H", "T", "S") are legacy pandas
# aliases deprecated in pandas >= 2.2 in favor of "YS"/"h"/"min"/"s" —
# verify against the pandas version pinned by this project.
FREQUENCY_TO_OFFSET: dict[Frequency, str] = {
    Frequency.A: "AS",  # year start
    Frequency.Q: "QS",  # quarter start
    Frequency.M: "MS",  # month start
    Frequency.W: "W",
    Frequency.D: "D",
    Frequency.H: "H",
    Frequency.T1: "1T",
    Frequency.T5: "5T",
    Frequency.T10: "10T",
    Frequency.T15: "15T",
    Frequency.T30: "30T",
    Frequency.S: "S",
}
|
|
| |
# Hard caps on series length per frequency, all derived from MAX_YEARS, so a
# series starting inside the allowed window cannot overflow pandas' datetime
# bounds. 365.2425 is the mean Gregorian year in days.
SHORT_FREQUENCY_MAX_LENGTHS = {
    Frequency.A: MAX_YEARS,
    Frequency.Q: MAX_YEARS * 4,
    Frequency.M: MAX_YEARS * 12,
    Frequency.W: int(MAX_YEARS * 52.1775),  # weeks per mean year
    Frequency.D: int(MAX_YEARS * 365.2425),
}

# Same idea for intraday frequencies: MAX_YEARS scaled by periods per day.
HIGH_FREQUENCY_MAX_LENGTHS = {
    Frequency.H: int(MAX_YEARS * 365.2425 * 24),
    Frequency.S: int(MAX_YEARS * 365.2425 * 24 * 60 * 60),
    Frequency.T1: int(MAX_YEARS * 365.2425 * 24 * 60),
    Frequency.T5: int(MAX_YEARS * 365.2425 * 24 * 12),
    Frequency.T10: int(MAX_YEARS * 365.2425 * 24 * 6),
    Frequency.T15: int(MAX_YEARS * 365.2425 * 24 * 4),
    Frequency.T30: int(MAX_YEARS * 365.2425 * 24 * 2),
}

# Combined lookup used by Frequency.get_max_safe_length(); covers all members.
ALL_FREQUENCY_MAX_LENGTHS = {
    **SHORT_FREQUENCY_MAX_LENGTHS,
    **HIGH_FREQUENCY_MAX_LENGTHS,
}
|
|
| |
# Relative frequency shares used as sampling weights via
# Frequency.get_gift_eval_weight() — presumably percentage shares of the GIFT
# eval corpus (they sum to roughly 100). T30 intentionally has no entry and
# falls back to the 0.1 default in get_gift_eval_weight().
GIFT_EVAL_FREQUENCY_WEIGHTS: dict[Frequency, float] = {
    Frequency.H: 25.0,
    Frequency.D: 23.4,
    Frequency.W: 12.9,
    Frequency.T15: 9.7,
    Frequency.T5: 9.7,
    Frequency.M: 7.3,
    Frequency.T10: 4.8,
    Frequency.S: 4.8,
    Frequency.T1: 1.6,
    Frequency.Q: 0.8,
    Frequency.A: 0.8,
}
|
|
| |
| |
# Per-frequency series-length envelopes — presumably measured from the GIFT
# eval corpus: (min_length, max_length, optimal_start, optimal_end).
# Consumed via Frequency.get_length_range(); lengths inside
# [optimal_start, optimal_end] receive a strong boost when sampling a
# frequency in select_safe_random_frequency().
GIFT_EVAL_LENGTH_RANGES: dict[Frequency, tuple[int, int, int, int]] = {
    # Low frequency (annual through weekly)
    Frequency.A: (25, 100, 30, 70),
    Frequency.Q: (25, 150, 50, 120),
    Frequency.M: (40, 1000, 100, 600),
    Frequency.W: (50, 3500, 100, 1500),
    # Medium frequency (daily/hourly)
    Frequency.D: (150, 25000, 300, 7000),
    Frequency.H: (600, 35000, 700, 17000),
    # High frequency (minute/second)
    Frequency.T1: (200, 2500, 1200, 1800),
    Frequency.S: (7500, 9500, 7900, 9000),
    Frequency.T15: (1000, 140000, 50000, 130000),
    Frequency.T5: (200, 105000, 20000, 95000),
    Frequency.T10: (40000, 55000, 47000, 52000),
    Frequency.T30: (100, 50000, 10000, 40000),
}
|
|
|
|
| |
| |
| |
|
|
|
|
def parse_frequency(freq_str: str) -> Frequency:
    """
    Parse frequency string to Frequency enum, robust to variations.

    Handles various frequency string formats:
    - Standard: "A", "Q", "M", "W", "D", "H", "S"
    - Pandas-style: "A-DEC", "W-SUN", "QE-MAR"
    - Minutes: "5T", "10min", "1T"
    - Case variations: "a", "h", "D"

    Args:
        freq_str: The frequency string to parse (e.g., "5T", "W-SUN", "M")

    Returns:
        Corresponding Frequency enum member

    Raises:
        NotImplementedError: If the frequency string is not supported
    """
    # Minute-level strings are matched directly, because pandas normalization
    # would discard the multiplier needed to pick the right enum member.
    match = None
    for pattern in (r"^(\d*)T$", r"^(\d*)min$"):
        match = re.match(pattern, freq_str, re.IGNORECASE)
        if match:
            break

    if match:
        multiplier = int(match.group(1)) if match.group(1) else 1
        member_name = f"T{multiplier}"
        if member_name in Frequency.__members__:
            return Frequency[member_name]
        # Unknown multiplier (e.g. "7T"): degrade gracefully to 1-minute.
        logger.warning(
            f"Unsupported minute frequency '{freq_str}' (multiplier: {multiplier}). "
            f"Falling back to '1min' ({Frequency.T1.value})."
        )
        return Frequency.T1

    # Let pandas canonicalize the alias when it can; otherwise keep the raw
    # string and try to interpret it ourselves below.
    try:
        canonical = pd.tseries.frequencies.to_offset(freq_str).name
    except Exception:
        canonical = freq_str

    # Strip anchoring suffixes such as "-SUN" or "-DEC" and normalize case.
    root = canonical.split("-")[0].upper()

    alias_table = {
        "A": Frequency.A,
        "Y": Frequency.A,
        "YE": Frequency.A,
        "Q": Frequency.Q,
        "QE": Frequency.Q,
        "M": Frequency.M,
        "ME": Frequency.M,
        "W": Frequency.W,
        "D": Frequency.D,
        "H": Frequency.H,
        "S": Frequency.S,
    }

    parsed = alias_table.get(root)
    if parsed is not None:
        return parsed

    raise NotImplementedError(f"Frequency '{canonical}' is not supported.")
|
|
|
|
def validate_frequency_safety(start_date: np.datetime64, total_length: int, frequency: Frequency) -> bool:
    """
    Check if start date and frequency combination is safe for pandas datetime operations.

    Verifies that pd.date_range(start=start_date, periods=total_length, freq=...)
    will not raise an OutOfBoundsDatetime error, accounting for pandas' datetime
    bounds (1677-09-21 to 2262-04-11) and realistic frequency limitations.

    Args:
        start_date: The proposed start date for the time series
        total_length: Total length of the time series
        frequency: The frequency of the time series

    Returns:
        True if the combination is safe, False otherwise
    """
    try:
        freq_str = frequency.to_pandas_freq(for_date_range=True)

        # Conversion itself may raise OutOfBoundsDatetime (caught below).
        start_ts = pd.Timestamp(start_date)

        # Start must sit inside pandas' representable range.
        if start_ts < pd.Timestamp.min or start_ts > pd.Timestamp.max:
            return False

        # Respect the per-frequency overflow cap.
        if total_length > frequency.get_max_safe_length():
            return False

        # Tighter, realistic caps for low-frequency (A/Q/M) series.
        if frequency.is_low_frequency():
            realistic_caps = {
                Frequency.A: 500,
                Frequency.Q: 2000,
                Frequency.M: 6000,
            }
            if total_length > realistic_caps[frequency]:
                return False

        # Estimate the calendar span and reject anything that would overflow.
        estimated_days = total_length * frequency.get_days_per_period()
        if frequency in (Frequency.A, Frequency.Q):
            # Padding for calendar-length variation at coarse frequencies.
            estimated_days *= 1.1

        end_ts = start_ts + pd.Timedelta(days=estimated_days)
        if end_ts < pd.Timestamp.min or end_ts > pd.Timestamp.max:
            return False

        # Authoritative check: actually construct the range.
        pd.date_range(start=start_ts, periods=total_length, freq=freq_str)
        return True

    except (pd.errors.OutOfBoundsDatetime, OverflowError, ValueError):
        return False
|
|
|
|
| |
| |
| |
|
|
|
|
def select_safe_random_frequency(total_length: int, rng: Generator) -> Frequency:
    """
    Select a random frequency suitable for a given total length of a time series,
    based on actual GIFT eval dataset patterns and distributions.

    The selection logic:
    1. Filters frequencies that can handle the given total_length
    2. Applies base weights derived from actual GIFT eval frequency distribution
    3. Strongly boosts frequencies that are in their optimal length ranges
    4. Handles edge cases gracefully with fallbacks

    Args:
        total_length: The total length of the time series (history + future)
        rng: A numpy random number generator instance

    Returns:
        A randomly selected frequency that matches GIFT eval patterns
    """
    candidates: list[Frequency] = []
    scores: list[float] = []

    for freq in Frequency:
        # Skip frequencies whose overflow cap the length exceeds.
        if total_length > freq.get_max_safe_length():
            continue

        lo, hi, opt_lo, opt_hi = freq.get_length_range()

        # Skip frequencies whose observed length envelope doesn't fit.
        if not (lo <= total_length <= hi):
            continue

        candidates.append(freq)

        if opt_lo <= total_length <= opt_hi:
            # Strong boost inside the optimal band.
            multiplier = 5.0
        else:
            # Linear decay: 1.5 right at the optimal-band edge down to 0.3
            # at the hard min/max limit.
            if total_length < opt_lo:
                distance = (opt_lo - total_length) / (opt_lo - lo)
            else:
                distance = (total_length - opt_hi) / (hi - opt_hi)
            multiplier = 0.3 + 1.2 * (1.0 - distance)

        scores.append(freq.get_gift_eval_weight() * multiplier)

    if not candidates:
        # No frequency fits its envelope: fall back by length regime.
        if total_length <= 100:
            preferred = [
                Frequency.A,
                Frequency.Q,
                Frequency.M,
                Frequency.W,
                Frequency.D,
            ]
        elif total_length <= 1000:
            preferred = [Frequency.D, Frequency.W, Frequency.H, Frequency.M]
        else:
            preferred = [Frequency.H, Frequency.D, Frequency.T15, Frequency.T5]

        for fallback in preferred:
            if total_length <= fallback.get_max_safe_length():
                return fallback

        # Absolute last resort.
        return Frequency.D

    if len(candidates) == 1:
        return candidates[0]

    # Weighted draw proportional to the computed scores.
    weights = np.array(scores)
    return rng.choice(candidates, p=weights / weights.sum())
|
|
|
|
def select_safe_start_date(
    total_length: int,
    frequency: Frequency,
    rng: Generator | None = None,
    max_retries: int = 10,
) -> np.datetime64:
    """
    Select a safe start date that ensures the entire time series (history + future)
    will not exceed pandas' datetime bounds.

    Args:
        total_length: Total length of the time series (history + future)
        frequency: Time series frequency
        rng: Random number generator instance (a fresh default_rng if None)
        max_retries: Maximum number of retry attempts

    Returns:
        A start date that passes validate_frequency_safety, or BASE_START_DATE
        as a conservative fallback when no candidate validates within
        max_retries attempts.

    Raises:
        ValueError: If the required time span exceeds the available date window
    """
    if rng is None:
        rng = np.random.default_rng()

    # Approximate calendar span the series will occupy.
    total_days = total_length * frequency.get_days_per_period()

    # Shrink the window so any start inside it leaves room for the full span.
    earliest_safe_start = BASE_START_DATE
    latest_safe_start = BASE_END_DATE - np.timedelta64(int(total_days), "D")

    if latest_safe_start < earliest_safe_start:
        available_days = (BASE_END_DATE - BASE_START_DATE).astype("timedelta64[D]").astype(int)
        available_years = available_days / 365.25
        required_years = total_days / 365.25
        raise ValueError(
            f"Required time span ({required_years:.1f} years, {total_days:.0f} days) "
            f"exceeds available date window ({available_years:.1f} years, {available_days} days). "
            f"Reduce total_length ({total_length}) or extend the date window."
        )

    # Sample uniformly at nanosecond resolution within the safe window.
    lo_ns = earliest_safe_start.astype("datetime64[ns]").astype(np.int64)
    hi_ns = latest_safe_start.astype("datetime64[ns]").astype(np.int64)

    for _ in range(max_retries):
        candidate = np.datetime64(int(rng.integers(lo_ns, hi_ns + 1)), "ns")

        # Only accept candidates that survive the full pandas safety check.
        if validate_frequency_safety(candidate, total_length, frequency):
            return candidate

    # Conservative fallback: the window origin maximizes headroom.
    return BASE_START_DATE
|
|