import numpy as np import pandas as pd from sklearn.preprocessing import MinMaxScaler from sklearn.ensemble import RandomForestRegressor from sklearn.model_selection import train_test_split from statsmodels.tsa.holtwinters import ExponentialSmoothing from statsmodels.tsa.arima.model import ARIMA from scipy.stats import norm class MarketPredictor: def __init__(self): self.scaler = MinMaxScaler(feature_range=(0, 1)) self.rf_model = None self.exp_model = None self.arima_model = None def prepare_data(self, data, lookback=60): """Prepare data for Random Forest model""" scaled_data = self.scaler.fit_transform(data.values.reshape(-1, 1)) X, y = [], [] for i in range(lookback, len(scaled_data)): X.append(scaled_data[i-lookback:i, 0]) y.append(scaled_data[i, 0]) return np.array(X), np.array(y) def train_rf(self, data, lookback=60): """Train Random Forest model""" X, y = self.prepare_data(data, lookback) X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42) self.rf_model = RandomForestRegressor(n_estimators=100, random_state=42) self.rf_model.fit(X_train, y_train) return self.rf_model.score(X_test, y_test) def predict_rf(self, data, days_ahead=30): """Generate Random Forest predictions""" last_sequence = data[-60:].values.reshape(-1, 1) last_sequence = self.scaler.transform(last_sequence) predictions = [] current_sequence = last_sequence.copy() for _ in range(days_ahead): current_features = current_sequence[-60:].reshape(1, -1) predicted_value = self.rf_model.predict(current_features) predictions.append(predicted_value[0]) current_sequence = np.append(current_sequence, predicted_value) predictions = self.scaler.inverse_transform(np.array(predictions).reshape(-1, 1)) return predictions def train_exp(self, data): """Train Exponential Smoothing model""" self.exp_model = ExponentialSmoothing( data, seasonal_periods=5, trend='add', seasonal='add', ).fit() return self.exp_model def predict_exp(self, days_ahead=30): """Generate Exponential Smoothing predictions""" forecast = self.exp_model.forecast(days_ahead) return forecast def calculate_var(self, data, confidence_level=0.95): """Calculate Value at Risk""" returns = data.pct_change().dropna() var = norm.ppf(1-confidence_level) * returns.std() return var def monte_carlo_simulation(self, data, n_simulations=1000, days_ahead=30): """Perform Monte Carlo simulation""" returns = data.pct_change().dropna() mu = returns.mean() sigma = returns.std() simulations = np.zeros((days_ahead, n_simulations)) last_price = data.iloc[-1] for sim in range(n_simulations): prices = [last_price] for day in range(days_ahead): price = prices[-1] * (1 + np.random.normal(mu, sigma)) prices.append(price) simulations[:, sim] = prices[1:] return pd.DataFrame(simulations, columns=[f'sim_{i}' for i in range(n_simulations)]) def detect_patterns(self, data): """Detect basic chart patterns""" patterns = {} close = data['Close'] # Moving averages for trend detection sma20 = close.rolling(window=20).mean() sma50 = close.rolling(window=50).mean() # Detect trend changes patterns['uptrend'] = sma20 > sma50 patterns['downtrend'] = sma20 < sma50 # Support/Resistance levels patterns['support'] = close.rolling(window=20).min() patterns['resistance'] = close.rolling(window=20).max() return patterns def predict_breakouts(self, data, window=20): """Predict potential breakout points using Bollinger Bands logic""" close = data['Close'] bb = pd.DataFrame() # Calculate Bollinger Bands manually bb['middle'] = close.rolling(window=window).mean() std = close.rolling(window=window).std() bb['upper'] = bb['middle'] + (std * 2) bb['lower'] = bb['middle'] - (std * 2) # Calculate distance from bounds bb['distance_upper'] = (bb['upper'] - data['Close']) / data['Close'] bb['distance_lower'] = (data['Close'] - bb['lower']) / data['Close'] # Identify potential breakout points bb['breakout_probability'] = 1 - (bb['distance_upper'] + bb['distance_lower']) return bb