SVR_Predict_Stocks / src /data_processing.py
Reality8081's picture
Update src
35beba6
import pandas as pd
import numpy as np
import yfinance as yf
import warnings
warnings.filterwarnings('ignore')
SMA_WINDOWS = [5, 10, 20, 50, 100]
EMA_WINDOWS = [5, 10, 20, 50]
RSI_WINDOWS = [7, 14, 21]
BB_WINDOWS = [10, 20, 50]
ATR_WINDOWS = [14, 20]
VOL_WINDOWS = [20, 50]
LAGS = 3
def load_data(symbols, market_symbol, start_date, end_date):
print("Downloading data for AAPL and market index (auto_adjust=True)...")
df_market = yf.download(market_symbol, start=start_date, end=end_date, auto_adjust=True, progress=False)
if isinstance(df_market.columns, pd.MultiIndex):
df_market.columns = df_market.columns.droplevel(1)
df_market = df_market.reset_index()[['Date', 'Close']].rename(columns={'Close': 'Market_Close'})
dfs = []
for symbol in symbols:
df = yf.download(symbol, start=start_date, end=end_date, auto_adjust=True, progress=False)
if isinstance(df.columns, pd.MultiIndex):
df.columns = df.columns.droplevel(1)
df = df.reset_index()[['Date', 'Open', 'High', 'Low', 'Close', 'Volume']]
df['Ticker'] = symbol
df = pd.merge(df, df_market, on='Date', how='left')
dfs.append(df)
df = pd.concat(dfs, ignore_index = True)
df = df.sort_values(['Ticker', 'Date']).reset_index(drop=True)
print(f"Loaded raw panel data: {len(df)} rows | {len(symbols)} tickers | "
f"from {df['Date'].min().date()} to {df['Date'].max().date()}")
return df
def clean_data(df):
cleaned_dfs = []
for ticker, group in df.groupby('Ticker'):
group = group.set_index('Date').sort_index()
start_dt = group.index.min()
end_dt = group.index.max()
all_business_days = pd.date_range(start=start_dt, end=end_dt, freq="B")
group = group.reindex(all_business_days)
group = group.ffill()
group = group.reset_index().rename(columns={'index': 'Date'})
group['Ticker'] = ticker
cleaned_dfs.append(group)
df_cleaned = pd.concat(cleaned_dfs, ignore_index = True)
df_cleaned = df_cleaned.sort_values(['Ticker', 'Date']).reset_index(drop=True)
print(f"Data cleaned: {len(df_cleaned)} rows | "
f"from {df_cleaned['Date'].min().date()} to {df_cleaned['Date'].max().date()}")
return df
def validate_data(df, stage="pre_feature"):
print(f"Validating data at stage: {stage}...")
num_cols = df.select_dtypes(include=[np.number]).columns
nan_count = df[num_cols].isna().sum().sum()
inf_count = np.isinf(df[num_cols]).sum().sum()
if nan_count > 0:
print(f"WARNING: Tìm thấy {nan_count} NaN values tại stage {stage}")
if inf_count > 0:
print(f"WARNING: Tìm thấy {inf_count} Inf values tại stage {stage}")
if 'Date' in df.columns and 'Market_Return' in df.columns:
market_std_per_date = df.groupby('Date')['Market_Return'].std(ddof=0).max()
if pd.notna(market_std_per_date) and market_std_per_date > 1e-8:
print(f"WARNING: Cross-ticker contamination detected! "
f"Max std of Market_Return per date: {market_std_per_date:.2e}")
# Kiểm tra nhanh variance của returns (nên > 0)
if 'Daily_Return' in df.columns:
for ticker, grp in df.groupby('Ticker'):
if len(grp) > 1 and grp['Daily_Return'].std(ddof=0) == 0:
print(f"WARNING: Ticker {ticker} has zero variance in Daily_Return!")
print(f"Validation passed at {stage} (no critical issues).")
return df
def generate_technical_features(df, is_inference=False, target_horizon=1):
"""
Feature Engineering hoàn toàn mới theo 5 yêu cầu:
1. Corporate actions đã được xử lý ở load_data (auto_adjust=True)
2. TẤT CẢ features được chuyển sang dạng stationary (ratio, pct distance, normalized, position 0-1)
3. Multi-timeframe: nhiều windows để Linear_Regression tự chọn tín hiệu mạnh
4. Market Regime & Volatility: ATR normalized + rolling volatility
5. Gọi validate_data ngay trước khi return
"""
data = df.copy()
def add_features(group):
g = group.copy()
# === 1. BASIC RETURNS (luôn stationary) ===
g['Daily_Return'] = g['Close'].pct_change()
g['Log_Return'] = np.log(1 + g['Daily_Return'])
g['Market_Return'] = g['Market_Close'].pct_change()
g['Market_Log_Return'] = np.log(1 + g['Market_Return'])
# === 2. LAGGED FEATURES – CHỈ lag returns (KHÔNG lag Close raw) ===
# Lý do: Close raw và SMA raw là non-stationary → Linear_Regression sẽ học nhầm trend dài hạn thay vì pattern thực sự.
for i in range(1, LAGS + 1):
g[f'Return_Lag_{i}'] = g['Daily_Return'].shift(i)
g[f'Market_Return_Lag_{i}'] = g['Market_Return'].shift(i)
# === 3. MULTI-TIMEFRAME TECHNICAL INDICATORS (Stationary version) ===
# SMA & EMA → Ratio + % Distance (thay vì giá trị tuyệt đối)
for w in SMA_WINDOWS:
sma = g['Close'].rolling(window=w).mean()
g[f'SMA_{w}_Ratio'] = g['Close'] / sma
g[f'SMA_{w}_Distance_pct'] = (g['Close'] - sma) / sma * 100 # % distance từ giá đến SMA
for w in EMA_WINDOWS:
ema = g['Close'].ewm(span=w, adjust=False).mean()
g[f'EMA_{w}_Ratio'] = g['Close'] / ema
g[f'EMA_{w}_Distance_pct'] = (g['Close'] - ema) / ema * 100
# RSI multi-window (đã stationary tự nhiên 0-100)
for w in RSI_WINDOWS:
delta = g['Close'].diff()
gain = delta.where(delta > 0, 0).rolling(w).mean()
loss = -delta.where(delta < 0, 0).rolling(w).mean()
rs = gain / loss
g[f'RSI_{w}'] = 100 - (100 / (1 + rs))
# MACD: giữ cấu trúc gốc nhưng normalize Hist theo % giá (stationary)
ema_fast = g['Close'].ewm(span=12, adjust=False).mean()
ema_slow = g['Close'].ewm(span=26, adjust=False).mean()
g['MACD_Line'] = ema_fast - ema_slow
g['MACD_Signal'] = g['MACD_Line'].ewm(span=9, adjust=False).mean()
g['MACD_Hist'] = g['MACD_Line'] - g['MACD_Signal']
g['MACD_Hist_Normalized'] = g['MACD_Hist'] / g['Close'] * 100 # % của giá → stationary
# Bollinger Bands: Width % + Position (0-1) thay vì Upper/Lower tuyệt đối
for w in BB_WINDOWS:
middle = g['Close'].rolling(w).mean()
std_dev = g['Close'].rolling(w).std()
upper = middle + 2 * std_dev
lower = middle - 2 * std_dev
bb_range = upper - lower
g[f'BB_Width_{w}_pct'] = (bb_range / middle * 100) # % width (stationary)
g[f'BB_Position_{w}'] = (g['Close'] - lower) / bb_range.where(bb_range > 0, 1) # 0-1 position
# === 4. VOLATILITY & MARKET REGIME FEATURES ===
# True Range & ATR normalized
def calculate_true_range(high, low, close):
tr1 = high - low
tr2 = abs(high - close.shift(1))
tr3 = abs(low - close.shift(1))
return pd.concat([tr1, tr2, tr3], axis=1).max(axis=1)
tr = calculate_true_range(g['High'], g['Low'], g['Close'])
for w in ATR_WINDOWS:
atr = tr.rolling(w).mean()
g[f'ATR_{w}'] = atr
g[f'ATR_Normalized_{w}'] = atr / g['Close'] # Relative volatility → stationary
# Rolling volatility (market regime detection)
for w in VOL_WINDOWS:
g[f'Market_Rolling_Vol_{w}'] = g['Market_Return'].rolling(w).std()
g[f'AAPL_Rolling_Vol_{w}'] = g['Daily_Return'].rolling(w).std()
# Relative volume
g['Rel_Volume_20'] = g['Volume'] / g['Volume'].rolling(20).mean()
return g
# Xóa NaN (do rolling + lag)
data_list = [add_features(group) for _, group in data.groupby('Ticker')]
data = pd.concat(data_list, ignore_index=True)
if not is_inference:
data['Target_Return'] = data.groupby('Ticker')['Close'].shift(-target_horizon) / data['Close'] - 1
data = data.dropna().reset_index(drop=True)
# === 5. DATA VALIDATION TRƯỚC KHI TRẢ VỀ ===
data = validate_data(data, f"post_feature_engineering_h{target_horizon}")
df_backtest = data.copy()
drop_cols = ['Date', 'Ticker', 'Market_Close', 'Target_Return']
X = data.drop(columns=drop_cols, errors='ignore')
y = data['Target_Return'].copy()
print(f"Generated data for Horizon {target_horizon} days:\n"
f" • Total rows: {len(data)} | Tickers: {data['Ticker'].nunique()}\n"
f" • Features: {X.shape[1]} | X shape: {X.shape} | y shape: {y.shape}")
return df_backtest, X, y
else:
# Nếu là predict, dòng cuối cùng của mỗi ticker sẽ chứa feature đầy đủ và không bị loại bỏ do thiếu target
data = data.dropna().reset_index(drop=True)
X = data.drop(columns=['Date', 'Ticker', 'Market_Close'], errors='ignore')
return data, X, None