# FinanceAuger / prediction_models.py
# therickglenn's picture
# Create prediction_models.py
# 05bd5ca verified
# raw
# history blame
# 4.89 kB
import numpy as np
import pandas as pd
from sklearn.preprocessing import MinMaxScaler
from sklearn.ensemble import RandomForestRegressor
from sklearn.model_selection import train_test_split
from statsmodels.tsa.holtwinters import ExponentialSmoothing
from statsmodels.tsa.arima.model import ARIMA
from scipy.stats import norm
class MarketPredictor:
    """Forecasting and risk helpers for a price series.

    Bundles a windowed Random Forest forecaster, a Holt-Winters exponential
    smoothing forecaster, a parametric Value-at-Risk estimate, a Monte Carlo
    price simulation, and two moving-window indicator helpers.

    Attributes are plain instance state:
      * ``scaler``      - MinMaxScaler refit on every ``prepare_data`` call.
      * ``rf_model``    - fitted RandomForestRegressor, or None before training.
      * ``exp_model``   - fitted ExponentialSmoothing results, or None.
      * ``arima_model`` - placeholder; never assigned inside this class.
      * ``lookback``    - window length the Random Forest was last trained with.
    """

    # Window length assumed by predict_rf until train_rf records another one.
    DEFAULT_LOOKBACK = 60

    def __init__(self):
        self.scaler = MinMaxScaler(feature_range=(0, 1))
        self.rf_model = None
        self.exp_model = None
        self.arima_model = None
        self.lookback = self.DEFAULT_LOOKBACK

    def prepare_data(self, data, lookback=60):
        """Build sliding-window features for the Random Forest model.

        Refits ``self.scaler`` on ``data`` (side effect), scales the values
        and turns them into supervised samples: each row of ``X`` holds
        ``lookback`` consecutive scaled values and the matching entry of
        ``y`` is the scaled value that immediately follows them.

        Args:
            data: pandas object exposing ``.values``; it is flattened into a
                single column before scaling.
            lookback: number of past observations per sample.

        Returns:
            Tuple ``(X, y)`` of numpy arrays. Both are empty when ``data``
            has no more than ``lookback`` observations.
        """
        scaled = self.scaler.fit_transform(data.values.reshape(-1, 1))[:, 0]
        targets = range(lookback, len(scaled))
        X = [scaled[end - lookback:end] for end in targets]
        y = [scaled[end] for end in targets]
        return np.array(X), np.array(y)

    def train_rf(self, data, lookback=60):
        """Train the Random Forest model and return its hold-out R^2 score.

        The split is chronological (``shuffle=False``): the first 80% of the
        windows train the model and the last 20% score it. Consecutive
        windows overlap almost entirely, so a shuffled split would place
        near-duplicates of the test windows in the training set and inflate
        the score.

        Args:
            data: pandas object accepted by ``prepare_data``.
            lookback: window length; remembered on the instance so that
                ``predict_rf`` builds inputs of the same width.

        Returns:
            R^2 of the fitted model on the held-out final windows.

        Raises:
            ValueError: if ``data`` yields fewer than two windows, which is
                not enough for a non-empty train and test set.
        """
        X, y = self.prepare_data(data, lookback)
        if len(X) < 2:
            raise ValueError(
                f"Need at least {lookback + 2} observations to train with "
                f"lookback={lookback}, got {len(data)}"
            )
        X_train, X_test, y_train, y_test = train_test_split(
            X, y, test_size=0.2, shuffle=False
        )
        self.rf_model = RandomForestRegressor(n_estimators=100, random_state=42)
        self.rf_model.fit(X_train, y_train)
        # Record only after a successful fit so model and lookback stay in sync.
        self.lookback = lookback
        return self.rf_model.score(X_test, y_test)

    def predict_rf(self, data, days_ahead=30):
        """Generate recursive multi-step Random Forest predictions.

        Starts from the last ``self.lookback`` observations of ``data`` and
        repeatedly predicts one step ahead, feeding each prediction back in
        as the newest value of the window.

        Args:
            data: pandas object holding at least ``self.lookback`` values.
            days_ahead: number of steps to forecast.

        Returns:
            Array of shape ``(days_ahead, 1)`` in the original (unscaled)
            units, produced by ``self.scaler.inverse_transform``.

        Raises:
            RuntimeError: if ``train_rf`` has not been called yet.
            ValueError: if ``data`` is shorter than the trained lookback.
        """
        if self.rf_model is None:
            raise RuntimeError("Random Forest model is not trained; call train_rf() first")
        lookback = getattr(self, 'lookback', self.DEFAULT_LOOKBACK)
        if len(data) < lookback:
            raise ValueError(
                f"Need at least {lookback} observations to predict, got {len(data)}"
            )

        recent = data.iloc[-lookback:].values.reshape(-1, 1)
        window = self.scaler.transform(recent).ravel()

        scaled_predictions = []
        for _ in range(days_ahead):
            next_value = self.rf_model.predict(window.reshape(1, -1))[0]
            scaled_predictions.append(next_value)
            # Slide the window: drop the oldest value, append the forecast.
            window = np.append(window[1:], next_value)

        return self.scaler.inverse_transform(
            np.array(scaled_predictions).reshape(-1, 1)
        )

    def train_exp(self, data, seasonal_periods=5, trend='add', seasonal='add'):
        """Fit a Holt-Winters Exponential Smoothing model.

        Args:
            data: series passed straight to ``ExponentialSmoothing``.
            seasonal_periods: length of one seasonal cycle (default 5).
            trend: trend component type (default ``'add'``).
            seasonal: seasonal component type (default ``'add'``).

        Returns:
            The fitted results object, also stored in ``self.exp_model``.
        """
        self.exp_model = ExponentialSmoothing(
            data,
            seasonal_periods=seasonal_periods,
            trend=trend,
            seasonal=seasonal,
        ).fit()
        return self.exp_model

    def predict_exp(self, days_ahead=30):
        """Forecast ``days_ahead`` steps with the fitted smoothing model.

        Raises:
            RuntimeError: if ``train_exp`` has not been called yet.
        """
        if self.exp_model is None:
            raise RuntimeError(
                "Exponential Smoothing model is not trained; call train_exp() first"
            )
        return self.exp_model.forecast(days_ahead)

    def calculate_var(self, data, confidence_level=0.95):
        """Calculate a parametric (normal) Value at Risk of period returns.

        Computed as ``norm.ppf(1 - confidence_level) * std(returns)`` where
        returns are ``data.pct_change()``; the mean return is not included.
        For confidence levels above 0.5 the result is negative, i.e. it is
        expressed as a return rather than as a positive loss.

        Args:
            data: pandas price series.
            confidence_level: e.g. 0.95 for the 5% lower-tail quantile.

        Returns:
            The VaR as a fractional return.
        """
        returns = data.pct_change().dropna()
        return norm.ppf(1 - confidence_level) * returns.std()

    def monte_carlo_simulation(self, data, n_simulations=1000, days_ahead=30):
        """Simulate future price paths from normally distributed returns.

        Each step multiplies the previous price by ``1 + r`` with ``r`` drawn
        from ``Normal(mean(returns), std(returns))`` using numpy's global
        random state, starting from the last price in ``data``.

        Args:
            data: pandas price series.
            n_simulations: number of independent paths.
            days_ahead: number of steps per path.

        Returns:
            DataFrame of shape ``(days_ahead, n_simulations)`` with columns
            ``sim_0 .. sim_{n-1}``; row ``k`` is the price after ``k + 1``
            steps.
        """
        returns = data.pct_change().dropna()
        mu = returns.mean()
        sigma = returns.std()
        last_price = data.iloc[-1]

        # Draw simulation-major (one row per path) so that, for a given
        # np.random.seed, values are consumed in the same order as a
        # path-by-path loop would consume them; then put days on the rows.
        shocks = np.random.normal(mu, sigma, size=(n_simulations, days_ahead)).T
        paths = last_price * np.cumprod(1 + shocks, axis=0)

        columns = [f'sim_{i}' for i in range(n_simulations)]
        return pd.DataFrame(paths, columns=columns)

    def detect_patterns(self, data):
        """Detect basic trend and support/resistance signals.

        Args:
            data: DataFrame with a ``'Close'`` column.

        Returns:
            Dict of Series aligned with ``data``:
              * ``'uptrend'``    - 20-period SMA above the 50-period SMA.
              * ``'downtrend'``  - 20-period SMA below the 50-period SMA.
              * ``'support'``    - rolling 20-period minimum of the close.
              * ``'resistance'`` - rolling 20-period maximum of the close.
            While either moving average is still NaN (warm-up rows) both
            trend flags are False.
        """
        close = data['Close']

        # Moving averages for trend detection
        sma20 = close.rolling(window=20).mean()
        sma50 = close.rolling(window=50).mean()

        recent = close.rolling(window=20)
        return {
            'uptrend': sma20 > sma50,
            'downtrend': sma20 < sma50,
            'support': recent.min(),
            'resistance': recent.max(),
        }

    def predict_breakouts(self, data, window=20):
        """Score potential breakout points using Bollinger Bands logic.

        Args:
            data: DataFrame with a ``'Close'`` column.
            window: rolling window for the band mean and standard deviation.

        Returns:
            DataFrame indexed like ``data`` with columns ``middle``,
            ``upper``, ``lower`` (mean and mean +/- 2 std),
            ``distance_upper`` and ``distance_lower`` (gap between the close
            and each band, relative to the close) and
            ``breakout_probability``. The last column is a heuristic score,
            ``1 - (distance_upper + distance_lower)``, i.e. one minus the
            band width relative to the price, clipped to ``[0, 1]`` so that
            a band wider than the price itself cannot produce a negative
            value. Warm-up rows are NaN.
        """
        close = data['Close']
        rolling = close.rolling(window=window)

        bb = pd.DataFrame(index=close.index)
        # Calculate Bollinger Bands manually
        bb['middle'] = rolling.mean()
        band = rolling.std() * 2
        bb['upper'] = bb['middle'] + band
        bb['lower'] = bb['middle'] - band

        # Relative distance from the close to each band
        bb['distance_upper'] = (bb['upper'] - close) / close
        bb['distance_lower'] = (close - bb['lower']) / close

        # Narrow bands (a squeeze) score close to 1.
        score = 1 - (bb['distance_upper'] + bb['distance_lower'])
        bb['breakout_probability'] = score.clip(lower=0.0, upper=1.0)
        return bb