import os import yaml import yfinance as yf import pandas as pd import streamlit as st import plotly.express as px import plotly.graph_objects as go from ta.volatility import BollingerBands from ta.trend import SMAIndicator, EMAIndicator, MACD from ta.momentum import RSIIndicator from ta.volume import VolumeWeightedAveragePrice from sklearn.ensemble import RandomForestRegressor from statsmodels.tsa.holtwinters import ExponentialSmoothing import numpy as np from datetime import date, timedelta # ----------------------------- # Inlined Custom Module: MarketPredictor # ----------------------------- class MarketPredictor: def __init__(self): self.rf_model = None self.exp_model = None def train_rf(self, series): X = np.arange(len(series)).reshape(-1, 1) y = series.values self.rf_model = RandomForestRegressor( n_estimators=100, random_state=42) self.rf_model.fit(X, y) def predict_rf(self, series, days_ahead=30): if self.rf_model is None: raise ValueError("Random Forest model is not trained.") last_index = len(series) future_X = np.arange(last_index, last_index + days_ahead).reshape(-1, 1) predictions = self.rf_model.predict(future_X) return predictions def train_exp(self, series): self.exp_model = ExponentialSmoothing(series, trend='add', seasonal=None, initialization_method='estimated').fit() def predict_exp(self, days_ahead=30): if self.exp_model is None: raise ValueError("Exponential Smoothing model is not trained.") predictions = self.exp_model.forecast(days_ahead) return predictions def monte_carlo_simulation(self, series, days_ahead=30, num_simulations=100): returns = series.pct_change().dropna() last_price = series.iloc[-1] simulation_df = pd.DataFrame() for sim in range(num_simulations): prices = [last_price] for day in range(days_ahead): shock = np.random.normal( loc=returns.mean(), scale=returns.std()) price = prices[-1] * (1 + shock) prices.append(price) simulation_df[sim] = prices[1:] return simulation_df def calculate_var(self, series, confidence_level=0.95): returns = series.pct_change().dropna() var = np.percentile(returns, (1 - confidence_level) * 100) return var def detect_patterns(self, ohlc_df): patterns = {} if all(col in ohlc_df.columns for col in ['Open', 'High', 'Low', 'Close']): bullish_engulfing = (ohlc_df['Close'] > ohlc_df['Open']) & ( ohlc_df['Open'].shift() > ohlc_df['Close'].shift()) patterns['Bullish Engulfing'] = bullish_engulfing return patterns def predict_breakouts(self, ohlc_df): if 'High' in ohlc_df.columns: ohlc_df['SMA'] = ohlc_df['High'].rolling(window=20).mean() ohlc_df['breakout_probability'] = np.where( ohlc_df['High'] > ohlc_df['SMA'] * 1.05, 0.8, 0.2) return ohlc_df[['breakout_probability']] # Using Predictives and Risk Features: # # - Predictives: Leverage our predictive modules to gain insights into trends, market changes, and forecast potential outcomes. # Navigate to the Predictives section, choose your parameters, and view generated predictions. # # - Risk Features: Utilize our risk tools for detailed analysis of potential risk factors. # Input your criteria to run risk simulations, and review the results through comprehensive charts and summaries. # ----------------------------- # Configuration Loader Function # ----------------------------- def load_yaml(filepath): """ Loads a YAML file and returns the parsed configuration. """ try: with open(filepath, 'r') as file: config = yaml.safe_load(file) return config except Exception as e: st.error(f"Error loading YAML file {filepath}: {e}") raise # ----------------------------- # Data Download Function # ----------------------------- def download_data(tickers, start_date, end_date, interval, output_dir): """ Download historical data for each ticker using yfinance and save as CSV. Parameters: tickers (list): List of ticker symbols. start_date (str): Start date in 'YYYY-MM-DD' format. end_date (str): End date in 'YYYY-MM-DD' format. interval (str): Data interval (e.g., '1d', '1wk', '1mo'). output_dir (str): Directory where CSV files will be saved. """ downloaded = [] os.makedirs(output_dir, exist_ok=True) progress_bar = st.progress(0) for i, ticker in enumerate(tickers): try: stock = yf.Ticker(ticker) data = stock.history( start=start_date, end=end_date, interval=interval) if not data.empty: output_file = os.path.join(output_dir, f"{ticker}.csv") data.to_csv(output_file) downloaded.append(ticker) except Exception: pass progress_bar.progress((i + 1) / len(tickers)) progress_bar.empty() return downloaded # ----------------------------- # Load Ticker Data Function # ----------------------------- @st.cache_data(show_spinner=False) def load_ticker_data(data_dir, ticker): """Load CSV data for a given ticker.""" file_path = os.path.join(data_dir, f"{ticker}.csv") if os.path.exists(file_path): try: data = pd.read_csv(file_path, parse_dates=True, index_col='Date') return data except Exception as e: st.error(f"Error loading {ticker} data: {e}") else: st.warning(f"Data file for {ticker} not found in {data_dir}.") return None # ----------------------------- # Main Dashboard Function # ----------------------------- def main(): st.set_page_config(layout="wide") st.title("πŸ“Š Market Data Simulation and Prediction Dashboard ") # Define configuration file paths and load configurations market_config_path = os.path.join(".", "market_config.yml") project_config_path = os.path.join(".", "project_config.yml") try: market_config = load_yaml(market_config_path) project_config = load_yaml(project_config_path) if market_config is None or project_config is None: st.error( "Configuration files are invalid. Please check market_config.yml and project_config.yml") return except Exception as e: st.error("Could not load configuration files.") return # Setup output directory output_dir = project_config.get('project', {}).get('directory', 'data') os.makedirs(output_dir, exist_ok=True) # ----------------------------- # Sidebar Controls # ----------------------------- with st.sidebar: st.header("Controls") # Exclude the "test" group # Direct Ticker Input ticker_input = st.text_input('Enter Ticker Symbol (e.g., AAPL):') # Optional Asset Group Selection use_groups = st.checkbox('Or select from predefined groups') selected_tickers = [] if ticker_input: selected_tickers.append(ticker_input.upper().strip()) if use_groups: available_groups = [grp for grp in market_config.get( 'groups', {}).keys() if grp.lower() != "test"] selected_groups = st.multiselect( "Select Asset Groups", available_groups) # Build the union of tickers from the selected groups group_tickers = [] for group in selected_groups: group_tickers.extend(market_config['groups'][group]) group_tickers = sorted(set(group_tickers)) if group_tickers: group_selected = st.multiselect( "Select Additional Tickers from Groups:", group_tickers) selected_tickers.extend(group_selected) # Date Range Selection default_end = date.today() default_start = default_end - timedelta(days=500) date_range = st.date_input("Select Date Range", [ default_start, default_end]) if isinstance(date_range, list) and len(date_range) == 2: start_date = date_range[0] end_date = date_range[1] else: start_date = default_start end_date = default_end start_str = start_date.strftime("%Y-%m-%d") end_str = end_date.strftime("%Y-%m-%d") # Date Interval Selection date_interval_option = st.selectbox("Select Data Interval", options=[ "Day", "Week", "Month", "Year"], index=0) interval_mapping = {"Day": "1d", "Week": "1wk", "Month": "1mo", "Year": "1d"} selected_interval = interval_mapping[date_interval_option] # ----------------------------- # Main Content Area # ----------------------------- if selected_tickers: # Check which tickers need downloading using the selected date range and interval files = set(f.split('.')[0] for f in os.listdir( output_dir) if f.endswith('.csv')) tickers_to_download = [t for t in selected_tickers if t not in files] if tickers_to_download: download_status = st.empty() progress_bar = st.progress(0) for idx, ticker in enumerate(tickers_to_download, 1): download_status.text(f"Downloading {ticker}...") try: download_data([ticker], start_str, end_str, selected_interval, output_dir) progress_bar.progress(idx / len(tickers_to_download)) except Exception as e: st.warning(f"Failed to download {ticker}: {str(e)}") download_status.empty() progress_bar.empty() # Combine data from all selected tickers for plotting combined_data = None for ticker in selected_tickers: data = load_ticker_data(output_dir, ticker) if data is not None: if 'Date' in data.columns: data['Date'] = pd.to_datetime(data['Date'], utc=True) data.set_index('Date', inplace=True) else: data.index = pd.to_datetime(data.index, utc=True) # If "Year" interval is selected, resample data to yearly frequency if date_interval_option == "Year": data = data.resample('Y').last() if 'Close' in data.columns: data = data.rename(columns={'Close': ticker}) col_data = data[[ticker]] if combined_data is None: combined_data = col_data else: combined_data = combined_data.join( col_data, how='outer') if combined_data is not None: # Create tabs for Analysis and Predictions analysis_tab, prediction_tab = st.tabs( ["Technical Analysis", "Predictions & Risk"]) # ----------------------------- # Technical Analysis Tab # ----------------------------- with analysis_tab: st.write("Technical Analysis Tools:") col1, col2, col3, col4 = st.columns(4) with col1: show_bb = st.checkbox('Bollinger Bands') show_rsi = st.checkbox('RSI') with col2: show_sma = st.checkbox('SMA') sma_period = st.number_input( 'SMA Period', min_value=1, value=50, max_value=200) if show_sma else 50 with col3: show_ema = st.checkbox('EMA') ema_period = st.number_input( 'EMA Period', min_value=1, value=20, max_value=200) if show_ema else 20 with col4: show_vwap = st.checkbox('VWAP') show_macd = st.checkbox('MACD') fig = go.Figure() for ticker in combined_data.columns: fig.add_trace(go.Scatter( x=combined_data.index, y=combined_data[ticker], name=ticker, mode='lines' )) if show_bb: bb = BollingerBands( close=combined_data[ticker], window=20, window_dev=2) fig.add_trace(go.Scatter( x=combined_data.index, y=bb.bollinger_hband(), name=f'{ticker} BB Upper', line=dict(dash='dash'), opacity=0.7 )) fig.add_trace(go.Scatter( x=combined_data.index, y=bb.bollinger_lband(), name=f'{ticker} BB Lower', line=dict(dash='dash'), opacity=0.7 )) fig.add_trace(go.Scatter( x=combined_data.index, y=bb.bollinger_mavg(), name=f'{ticker} BB MA', line=dict(dash='dash'), opacity=0.7 )) if show_sma: sma = SMAIndicator( close=combined_data[ticker], window=sma_period) fig.add_trace(go.Scatter( x=combined_data.index, y=sma.sma_indicator(), name=f'{ticker} SMA{sma_period}', line=dict(dash='dot') )) if show_ema: ema = EMAIndicator( close=combined_data[ticker], window=ema_period) fig.add_trace(go.Scatter( x=combined_data.index, y=ema.ema_indicator(), name=f'{ticker} EMA{ema_period}', line=dict(dash='dot') )) if show_vwap: data_for_vwap = load_ticker_data(output_dir, ticker) if data_for_vwap is not None and 'Volume' in data_for_vwap.columns: vwap = VolumeWeightedAveragePrice( high=data_for_vwap['High'], low=data_for_vwap['Low'], close=data_for_vwap['Close'], volume=data_for_vwap['Volume'], window=14 ) fig.add_trace(go.Scatter( x=combined_data.index, y=vwap.volume_weighted_average_price(), name=f'{ticker} VWAP', line=dict(dash='dashdot') )) if show_macd: macd = MACD(close=combined_data[ticker]) fig.add_trace(go.Scatter( x=combined_data.index, y=macd.macd(), name=f'{ticker} MACD', line=dict(dash='dot') )) fig.add_trace(go.Scatter( x=combined_data.index, y=macd.macd_signal(), name=f'{ticker} Signal', line=dict(dash='dot') )) fig.update_layout( height=800, showlegend=True, legend=dict(yanchor="top", y=0.99, xanchor="left", x=0.01), margin=dict(l=40, r=40, t=40, b=40), hovermode='x unified' ) st.plotly_chart(fig, use_container_width=True) # ----------------------------- # Predictions & Risk Analysis Tab # ----------------------------- with prediction_tab: st.write("Prediction & Risk Analysis Tools:") pred_col1, pred_col2 = st.columns(2) with pred_col1: prediction_days = st.slider("Prediction Days", 5, 60, 30) show_rf = st.checkbox("Random Forest Prediction") show_exp = st.checkbox("Exponential Smoothing") show_monte_carlo = st.checkbox("Monte Carlo Simulation") with pred_col2: show_var = st.checkbox("Value at Risk") show_patterns = st.checkbox("Pattern Detection") show_breakouts = st.checkbox("Breakout Prediction") if any([show_rf, show_exp, show_monte_carlo, show_var, show_patterns, show_breakouts]): predictor = MarketPredictor() pred_fig = go.Figure() for ticker in combined_data.columns: pred_fig.add_trace(go.Scatter( x=combined_data.index, y=combined_data[ticker], name=f"{ticker} (Actual)", mode='lines' )) if show_rf: predictor.train_rf(combined_data[ticker]) rf_pred = predictor.predict_rf( combined_data[ticker], days_ahead=prediction_days) future_dates = pd.date_range( start=combined_data.index[-1], periods=prediction_days+1 )[1:] pred_fig.add_trace(go.Scatter( x=future_dates, y=rf_pred.flatten(), name=f"{ticker} (Random Forest)", line=dict(dash='dash') )) if show_exp: predictor.train_exp(combined_data[ticker]) exp_pred = predictor.predict_exp( days_ahead=prediction_days) future_dates = pd.date_range( start=combined_data.index[-1], periods=prediction_days+1 )[1:] pred_fig.add_trace(go.Scatter( x=future_dates, y=exp_pred, name=f"{ticker} (Exp Smoothing)", line=dict(dash='dot') )) if show_monte_carlo: mc_sims = predictor.monte_carlo_simulation( combined_data[ticker], days_ahead=prediction_days) future_dates = pd.date_range( start=combined_data.index[-1], periods=prediction_days+1 )[1:] upper = mc_sims.quantile(0.95, axis=1) lower = mc_sims.quantile(0.05, axis=1) pred_fig.add_trace(go.Scatter( x=future_dates, y=upper, fill=None, mode='lines', line_color='rgba(0,100,80,0.2)', name=f"{ticker} MC (95% Upper)" )) pred_fig.add_trace(go.Scatter( x=future_dates, y=lower, fill='tonexty', mode='lines', line_color='rgba(0,100,80,0.2)', name=f"{ticker} MC (95% Lower)" )) if show_var: var = predictor.calculate_var( combined_data[ticker]) st.write( f"Value at Risk (95%) for {ticker}: {var:.2%}") if show_patterns or show_breakouts: ticker_data = load_ticker_data(output_dir, ticker) if ticker_data is not None: if show_patterns: patterns = predictor.detect_patterns( ticker_data) st.write( f"\nDetected patterns for {ticker}:") for pattern, signals in patterns.items(): if signals.any(): st.write( f"- {pattern}: {signals.sum()} occurrences") if show_breakouts: breakouts = predictor.predict_breakouts( ticker_data) high_prob = breakouts[breakouts['breakout_probability'] > 0.7] if not high_prob.empty: st.write( f"\nPotential breakout points for {ticker}:") st.write(high_prob) pred_fig.update_layout( height=600, title="Price Predictions", showlegend=True, legend=dict(yanchor="top", y=0.99, xanchor="left", x=0.01), margin=dict(l=40, r=40, t=40, b=40), hovermode='x unified' ) st.plotly_chart(pred_fig, use_container_width=True) # ----------------------------- # Expandable Raw Data View # ----------------------------- with st.expander("View Raw Data"): st.dataframe(combined_data) else: st.markdown(""" Compare stocks, crypto, commodities and more with our advanced technical indicators. #### Getting Started πŸš€ 1. **Choose Your Asset Group** πŸ‘ˆ - Pick from a range including Stocks, Crypto, Commodities, ETFs, and Bonds. 2. **Select Your Tickers** πŸ“ˆ - Simply choose your assets from the ticker list. - Data is downloaded automatically. 3. **Configure Predictives & Risk Tools** πŸ€–πŸ“Š - Predictive models combine historical data with real-time market signals to forecast trendsβ€”helping anticipate price movements and spot emerging opportunities. - Risk analysis tools compute key metrics like Value at Risk (VaR) and perform stress tests to evaluate potential losses, giving critical insights into risk exposure. - Configure these tools to complement your overall technical analysis. 4. **Enhance Your Analysis with Indicators** βš™οΈ - Use indicators such as Bollinger Bands for volatility, SMA/EMA with your custom periods for trend analysis, MACD for momentum, RSI for market strength, and VWAP for price action insights. #### Chart Features πŸ” - **Zoom**: Use the mouse wheel or toolbar. - **Pan**: Click and drag to navigate the chart. - **Data Visibility**: Toggle asset display via the legend. - **Detailed View**: Hover over data points for precise values. """) # ----------------------------- # Entry Point # ----------------------------- if __name__ == "__main__": main()