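# Spaces: Sleeping
# (The line above appears to be page-status text picked up when this file was
# captured from a web page; it is not part of the program.)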
import numpy as np
import pandas as pd
from scipy.stats import norm
from sklearn.ensemble import RandomForestRegressor
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MinMaxScaler
from statsmodels.tsa.arima.model import ARIMA
from statsmodels.tsa.holtwinters import ExponentialSmoothing
class MarketPredictor:
    """Forecasting and risk helpers for a single price series.

    Bundles a Random Forest autoregressive forecaster, a Holt-Winters
    exponential-smoothing forecaster, a parametric Value at Risk estimate,
    a Monte Carlo price simulation and two rolling-window chart heuristics.

    Attributes:
        scaler: ``MinMaxScaler`` fitted by ``prepare_data`` and reused by
            ``predict_rf``.
        rf_model: Fitted ``RandomForestRegressor``, or ``None`` until
            ``train_rf`` has been called.
        exp_model: Fitted exponential-smoothing results, or ``None`` until
            ``train_exp`` has been called.
        arima_model: Initialised to ``None``; no method of this class
            assigns it.
        lookback: Window length used by the most recent ``train_rf`` call
            (60 until then); ``predict_rf`` builds its input from it.
    """

    def __init__(self):
        self.scaler = MinMaxScaler(feature_range=(0, 1))
        self.rf_model = None
        self.exp_model = None
        self.arima_model = None
        self.lookback = 60

    def prepare_data(self, data, lookback=60):
        """Scale a series to [0, 1] and cut it into supervised windows.

        Refits ``self.scaler`` on ``data`` as a side effect.

        Args:
            data: Object exposing ``.values`` (e.g. a pandas Series) holding
                the observations in chronological order.
            lookback: Number of consecutive past values used as features.

        Returns:
            Tuple ``(X, y)``. Row ``k`` of ``X`` holds scaled values
            ``k .. k + lookback - 1`` and ``y[k]`` is the scaled value that
            follows them. Both arrays are empty when ``data`` has no more
            than ``lookback`` observations.
        """
        scaled = self.scaler.fit_transform(data.values.reshape(-1, 1))[:, 0]
        X = [scaled[end - lookback:end] for end in range(lookback, len(scaled))]
        y = scaled[lookback:]
        return np.array(X), np.array(y)

    def train_rf(self, data, lookback=60):
        """Fit the Random Forest on sliding windows of ``data``.

        The last 20% of the windows (in time order) are held out for
        scoring. The split is deliberately not shuffled: neighbouring
        windows overlap almost entirely, so a shuffled split would put
        near-copies of every test window into the training set and inflate
        the score.

        Args:
            data: Price series accepted by ``prepare_data``.
            lookback: Window length; remembered for ``predict_rf``.

        Returns:
            R^2 of the fitted model on the held-out windows.

        Raises:
            ValueError: If ``data`` yields fewer than two training windows.
        """
        X, y = self.prepare_data(data, lookback)
        if len(X) < 2:
            raise ValueError(
                f"need at least {lookback + 2} observations to train with "
                f"lookback={lookback}, got {len(data)}"
            )
        X_train, X_test, y_train, y_test = train_test_split(
            X, y, test_size=0.2, shuffle=False
        )
        self.rf_model = RandomForestRegressor(n_estimators=100, random_state=42)
        self.rf_model.fit(X_train, y_train)
        # Remember the window so prediction feeds the feature count the
        # forest was actually fitted with.
        self.lookback = lookback
        return self.rf_model.score(X_test, y_test)

    def predict_rf(self, data, days_ahead=30):
        """Forecast ``days_ahead`` steps by feeding predictions back in.

        Args:
            data: Price series (exposing ``.values``) whose most recent
                ``lookback`` values seed the forecast.
            days_ahead: Number of steps to predict.

        Returns:
            Array of shape ``(days_ahead, 1)`` in the original price scale
            (empty when ``days_ahead`` is not positive).

        Raises:
            RuntimeError: If ``train_rf`` has not been called.
            ValueError: If ``data`` has fewer values than the trained
                lookback window.
        """
        if self.rf_model is None:
            raise RuntimeError(
                "Random Forest model is not trained; call train_rf() first"
            )
        if days_ahead <= 0:
            return np.empty((0, 1))

        lookback = getattr(self, "lookback", 60)
        history = data.values[-lookback:].reshape(-1, 1)
        if len(history) < lookback:
            raise ValueError(
                f"need at least {lookback} observations to predict, "
                f"got {len(history)}"
            )

        window = self.scaler.transform(history)[:, 0]
        scaled_predictions = []
        for _ in range(days_ahead):
            next_value = self.rf_model.predict(window.reshape(1, -1))[0]
            scaled_predictions.append(next_value)
            # Slide the window: drop the oldest value, append the forecast.
            window = np.append(window[1:], next_value)

        return self.scaler.inverse_transform(
            np.array(scaled_predictions).reshape(-1, 1)
        )

    def train_exp(self, data, seasonal_periods=5):
        """Fit an additive-trend, additive-seasonal exponential smoothing model.

        Args:
            data: Series passed straight to ``ExponentialSmoothing``.
            seasonal_periods: Length of one seasonal cycle, in observations.

        Returns:
            The fitted results object (also stored in ``self.exp_model``).
        """
        self.exp_model = ExponentialSmoothing(
            data,
            seasonal_periods=seasonal_periods,
            trend='add',
            seasonal='add',
        ).fit()
        return self.exp_model

    def predict_exp(self, days_ahead=30):
        """Forecast ``days_ahead`` steps with the fitted smoothing model.

        Raises:
            RuntimeError: If ``train_exp`` has not been called.
        """
        if self.exp_model is None:
            raise RuntimeError(
                "Exponential Smoothing model is not trained; "
                "call train_exp() first"
            )
        return self.exp_model.forecast(days_ahead)

    def calculate_var(self, data, confidence_level=0.95):
        """Return a one-period parametric Value at Risk.

        Computed as the standard-normal quantile at
        ``1 - confidence_level`` times the standard deviation of the simple
        returns of ``data``; the mean return is not included. For
        confidence levels above 0.5 the result is negative, i.e. expressed
        as a return rather than a positive loss.

        Args:
            data: Price series supporting ``pct_change``.
            confidence_level: Confidence level of the estimate.
        """
        returns = data.pct_change().dropna()
        return norm.ppf(1 - confidence_level) * returns.std()

    def monte_carlo_simulation(self, data, n_simulations=1000, days_ahead=30):
        """Simulate future price paths from normally distributed returns.

        Each step multiplies the previous price by ``1 + r`` where ``r`` is
        drawn from a normal distribution with the mean and standard
        deviation of the historical simple returns. Randomness comes from
        NumPy's global generator, so seed it with ``np.random.seed`` for
        reproducible paths.

        Args:
            data: Price series; the last value is the starting price.
            n_simulations: Number of independent paths.
            days_ahead: Number of steps per path.

        Returns:
            DataFrame of shape ``(days_ahead, n_simulations)`` with columns
            ``sim_0 .. sim_{n-1}``; row ``k`` is the price after ``k + 1``
            steps.
        """
        returns = data.pct_change().dropna()
        mu = returns.mean()
        sigma = returns.std()
        last_price = data.iloc[-1]

        # One row per simulation: the random stream is consumed path by
        # path, the same order a nested per-path loop would use.
        shocks = np.random.normal(mu, sigma, size=(n_simulations, days_ahead))
        paths = last_price * np.cumprod(1 + shocks, axis=1)

        return pd.DataFrame(
            paths.T,
            columns=[f'sim_{i}' for i in range(n_simulations)],
        )

    def detect_patterns(self, data):
        """Derive simple trend and support/resistance series.

        Args:
            data: DataFrame with a ``'Close'`` column.

        Returns:
            Dict with four Series aligned to ``data``:
            ``'uptrend'`` (20-period SMA above 50-period SMA),
            ``'downtrend'`` (20-period SMA below 50-period SMA),
            ``'support'`` (rolling 20-period minimum) and
            ``'resistance'`` (rolling 20-period maximum). Both trend flags
            are False while either average is still undefined.
        """
        close = data['Close']

        # Moving averages for trend detection
        sma20 = close.rolling(window=20).mean()
        sma50 = close.rolling(window=50).mean()

        return {
            'uptrend': sma20 > sma50,
            'downtrend': sma20 < sma50,
            # Support/Resistance levels
            'support': close.rolling(window=20).min(),
            'resistance': close.rolling(window=20).max(),
        }

    def predict_breakouts(self, data, window=20):
        """Score how tightly price is squeezed inside its Bollinger Bands.

        Args:
            data: DataFrame with a ``'Close'`` column.
            window: Rolling window for the band mean and standard deviation.

        Returns:
            DataFrame with columns ``middle``, ``upper``, ``lower`` (mean
            and mean +/- 2 standard deviations), ``distance_upper`` and
            ``distance_lower`` (gap to each band as a fraction of the
            close) and ``breakout_probability``. The last column is a
            heuristic score, one minus the band width relative to the
            close, clipped to [0, 1]; it is not a calibrated probability.
            Rows inside the warm-up window are NaN.
        """
        close = data['Close']
        bb = pd.DataFrame()

        # Calculate Bollinger Bands manually
        bb['middle'] = close.rolling(window=window).mean()
        std = close.rolling(window=window).std()
        bb['upper'] = bb['middle'] + (std * 2)
        bb['lower'] = bb['middle'] - (std * 2)

        # Calculate distance from bounds
        bb['distance_upper'] = (bb['upper'] - close) / close
        bb['distance_lower'] = (close - bb['lower']) / close

        # Narrow bands give a score near 1. Very wide bands would push the
        # raw value below zero, so keep it inside the [0, 1] range the
        # column name promises.
        squeeze = 1 - (bb['distance_upper'] + bb['distance_lower'])
        bb['breakout_probability'] = squeeze.clip(lower=0, upper=1)
        return bb