psx-test / app.py
abidkh's picture
No Message
c00c4c0
import os
import gradio as gr
import pandas as pd
from datetime import datetime, timedelta
import tempfile
import plotly.express as px
from pytz import timezone
from openpyxl import Workbook
from openpyxl.utils.dataframe import dataframe_to_rows
from openpyxl.utils import get_column_letter
from openpyxl.styles import Font, Alignment, PatternFill
from concurrent.futures import ThreadPoolExecutor
from io import StringIO
from dotenv import load_dotenv
# ENVIRONMENT
if os.path.exists('.env.local'):
load_dotenv('.env.local')
SUPABASE_URL = os.environ.get("NEXT_PUBLIC_SUPABASE_URL")
SUPABASE_KEY = os.environ.get("NEXT_PUBLIC_SUPABASE_ANON_KEY")
from supabase import create_client, Client
supabase: Client = create_client(SUPABASE_URL, SUPABASE_KEY)
# CONFIG
MONTH_CODES = ['-JAN', '-FEB', '-MAR', '-APR', '-MAY', '-JUN', '-JUL', '-AUG', '-SEP', '-OCT', '-NOV', '-DEC']
MAX_DAYS_BACK = 5
CIRCUIT_BREAKER_PERCENTAGE = 7.5
CIRCUIT_BREAKER_RS_LIMIT = 1
loaded_data = None
# --- SUPABASE DATA FUNCTIONS ---
def get_symbols_data():
try:
companies = supabase.table('companies').select("*").execute().data
kmi = supabase.table('kmi_compliance').select("Symbol").execute().data
psx_df = pd.DataFrame(companies)
kmi_df = pd.DataFrame(kmi)
psx_required = ['Symbol', 'Company Name', 'Sector']
for col in psx_required:
if col not in psx_df.columns:
raise ValueError(f"Missing column: {col}")
kmi_symbols = set(kmi_df['Symbol'].str.strip().str.upper()) if not kmi_df.empty else set()
symbols_data = {
row['Symbol'].strip().upper(): {
'Company': row['Company Name'],
'Sector': row['Sector'],
'KMI': 'Yes' if row['Symbol'].strip().upper() in kmi_symbols else 'No'
}
for _, row in psx_df.iterrows()
}
return symbols_data
except Exception as e:
print(f"Error loading symbols data: {e}")
return {}
def fetch_market_data(date):
try:
date_str = date.strftime('%Y-%m-%d')
data = supabase.table('market_data').select('*').eq('date', date_str).execute().data
if not data:
return None, None
df = pd.DataFrame(data)
for col in ['SYMBOL', 'LDCP', 'OPEN', 'HIGH', 'LOW', 'CLOSE', 'VOLUME']:
if col not in df.columns:
if col.lower() in df.columns:
df.rename(columns={col.lower(): col}, inplace=True)
else:
df[col] = None
df = df[['SYMBOL', 'LDCP', 'OPEN', 'HIGH', 'LOW', 'CLOSE', 'VOLUME']]
return df, date_str
except Exception as e:
print(f"Error fetching market data from supabase: {e}")
return None, None
# --- ALL REMAINING FUNCTIONS UNCHANGED FROM YOUR ORIGINAL CODE BELOW ---
def calculate_breakout_stats(today_data, prev_day_data, prev_week_data, prev_month_data, symbols_data):
results = []
for _, today_row in today_data.iterrows():
symbol = today_row['SYMBOL']
company_info = symbols_data.get(symbol, {})
company_name = company_info.get('Company', symbol)
sector = company_info.get('Sector', 'N/A')
kmi_status = company_info.get('KMI', 'No')
try:
today_close = float(today_row['CLOSE'].replace(',', '')) if today_row['CLOSE'] else 0
today_high = float(today_row['HIGH'].replace(',', '')) if today_row['HIGH'] else 0
today_low = float(today_row['LOW'].replace(',', '')) if today_row['LOW'] else 0
today_ldcp = float(today_row['LDCP'].replace(',', '')) if today_row['LDCP'] else 0
volume = float(str(today_row['VOLUME']).replace(',', '')) if today_row['VOLUME'] and str(today_row['VOLUME']).strip() else 0
volume_str = f"{volume:,.0f}"
except Exception as e:
print(f"Error processing numerical values for {symbol}: {str(e)}")
continue
daily_status = weekly_status = monthly_status = circuit_breaker_status = "N/A"
prev_day_high = prev_day_low = weekly_high = weekly_low = monthly_high = monthly_low = "N/A"
if prev_day_data is not None:
prev_day_row = prev_day_data[prev_day_data['SYMBOL'].str.upper() == symbol.upper()]
if not prev_day_row.empty:
try:
prev_day_close = float(prev_day_row['CLOSE'].iloc[0].replace(',', '')) if prev_day_row['CLOSE'].iloc[0] else 0
prev_day_high = float(prev_day_row['HIGH'].iloc[0].replace(',', '')) if prev_day_row['HIGH'].iloc[0] else "N/A"
prev_day_low = float(prev_day_row['LOW'].iloc[0].replace(',', '')) if prev_day_row['LOW'].iloc[0] else "N/A"
if isinstance(prev_day_high, float) and isinstance(prev_day_low, float):
if today_close > prev_day_high:
daily_status = "▲▲ Daily Breakout"
elif today_close < prev_day_low:
daily_status = "▼▼ Daily Breakdown"
else:
daily_status = "– Daily Within Range"
if prev_day_close > 0:
price_change = today_close - prev_day_close
circuit_breaker_limit = max(CIRCUIT_BREAKER_RS_LIMIT, prev_day_close * CIRCUIT_BREAKER_PERCENTAGE / 100)
if price_change > circuit_breaker_limit:
circuit_breaker_status = "Upper Circuit Breaker"
elif price_change < -circuit_breaker_limit:
circuit_breaker_status = "Lower Circuit Breaker"
except Exception as e:
daily_status = "– Daily Data Error"
if prev_week_data is not None:
symbol_week_data = prev_week_data[prev_week_data['SYMBOL'].str.upper() == symbol.upper()]
if not symbol_week_data.empty:
try:
weekly_high = symbol_week_data['HIGH'].apply(lambda x: float(x.replace(',', '')) if str(x) != 'nan' else 0).max()
weekly_low = symbol_week_data['LOW'].apply(lambda x: float(x.replace(',', '')) if str(x) != 'nan' else 0).min()
if today_close > weekly_high:
weekly_status = "▲▲ Weekly Breakout"
elif today_close < weekly_low:
weekly_status = "▼▼ Weekly Breakdown"
else:
weekly_status = "– Weekly Within Range"
except Exception as e:
weekly_status = "– Weekly Data Error"
if prev_month_data is not None:
symbol_month_data = prev_month_data[prev_month_data['SYMBOL'].str.upper() == symbol.upper()]
if not symbol_month_data.empty:
try:
monthly_high = symbol_month_data['HIGH'].apply(lambda x: float(x.replace(',', '')) if str(x) != 'nan' else 0).max()
monthly_low = symbol_month_data['LOW'].apply(lambda x: float(x.replace(',', '')) if str(x) != 'nan' else 0).min()
if today_close > monthly_high:
monthly_status = "▲▲ Monthly Breakout"
elif today_close < monthly_low:
monthly_status = "▼▼ Monthly Breakdown"
else:
monthly_status = "– Monthly Within Range"
except Exception as e:
monthly_status = "– Monthly Data Error"
if daily_status == "N/A":
try:
if today_close > today_ldcp:
daily_status = "▲▲ Daily Breakout"
elif today_close < today_ldcp:
daily_status = "▼▼ Daily Breakdown"
else:
daily_status = "– Daily Within Range"
except:
daily_status = "– No Data"
def format_value(val):
if isinstance(val, (int, float)):
return f"{val:,.2f}"
return str(val)
results.append({
'SYMBOL': symbol,
'COMPANY': company_name,
'SECTOR': sector,
'KMI_COMPLIANT': kmi_status,
'VOLUME': volume_str,
'LDCP': format_value(today_ldcp),
'OPEN': format_value(float(today_row['OPEN'].replace(',', '')) if today_row['OPEN'] else 0),
'CLOSE': format_value(today_close),
'HIGH': format_value(today_high),
'LOW': format_value(today_low),
'PREV_DAY_HIGH': format_value(prev_day_high) if isinstance(prev_day_high, (int, float)) else prev_day_high,
'PREV_DAY_LOW': format_value(prev_day_low) if isinstance(prev_day_low, (int, float)) else prev_day_low,
'WEEKLY_HIGH': format_value(weekly_high) if isinstance(weekly_high, (int, float)) else weekly_high,
'WEEKLY_LOW': format_value(weekly_low) if isinstance(weekly_low, (int, float)) else weekly_low,
'MONTHLY_HIGH': format_value(monthly_high) if isinstance(monthly_high, (int, float)) else monthly_high,
'MONTHLY_LOW': format_value(monthly_low) if isinstance(monthly_low, (int, float)) else monthly_low,
'DAILY_STATUS': daily_status,
'WEEKLY_STATUS': weekly_status,
'MONTHLY_STATUS': monthly_status,
'CIRCUIT_BREAKER_STATUS': circuit_breaker_status
})
return pd.DataFrame(results)
def save_to_excel(df, report_date):
try:
with tempfile.NamedTemporaryFile(suffix=".xlsx", delete=False) as tmp:
EXCEL_FILE = tmp.name
workbook = Workbook()
worksheet = workbook.active
worksheet.title = 'Breakout Analysis'
formatted_date = datetime.strptime(report_date, "%Y-%m-%d").strftime("%d %B %Y")
worksheet.merge_cells('A1:T1')
title_cell = worksheet['A1']
title_cell.value = f"📈 PSX Breakout Analysis - {formatted_date}"
title_cell.font = Font(bold=True, size=14, color="1F4E78")
title_cell.alignment = Alignment(horizontal='center')
worksheet.merge_cells('A2:T2')
timestamp_cell = worksheet['A2']
timestamp_cell.value = f"⏰ Generated: {datetime.now(timezone('Asia/Karachi')).strftime('%d %B %Y %H:%M:%S')} (PKT)"
timestamp_cell.font = Font(size=12, italic=True, color="404040")
timestamp_cell.alignment = Alignment(horizontal='center')
headers = [
'SYMBOL', 'COMPANY', 'SECTOR', 'KMI_COMPLIANT', 'VOLUME',
'LDCP', 'OPEN', 'CLOSE', 'HIGH', 'LOW',
'PREV_DAY_HIGH', 'PREV_DAY_LOW', 'WEEKLY_HIGH', 'WEEKLY_LOW',
'MONTHLY_HIGH', 'MONTHLY_LOW', 'DAILY_STATUS', 'WEEKLY_STATUS', 'MONTHLY_STATUS', 'CIRCUIT_BREAKER_STATUS'
]
for col_num, header in enumerate(headers, 1):
cell = worksheet.cell(row=3, column=col_num, value=header)
cell.font = Font(bold=True, color="FFFFFF")
cell.fill = PatternFill(start_color="4F81BD", fill_type="solid")
cell.alignment = Alignment(horizontal='center')
for row in dataframe_to_rows(df, index=False, header=False):
worksheet.append(row)
status_cols = {17: 'DAILY_STATUS', 18: 'WEEKLY_STATUS', 19: 'MONTHLY_STATUS', 20: 'CIRCUIT_BREAKER_STATUS'}
cond_formatting = {
"▲▲": PatternFill(start_color="008000", fill_type="solid"),
"▲": PatternFill(start_color="92D050", fill_type="solid"),
"▼▼": PatternFill(start_color="FF0000", fill_type="solid"),
"▼": PatternFill(start_color="FFC7CE", fill_type="solid"),
"–": PatternFill(start_color="D9D9D9", fill_type="solid"),
"Upper Circuit Breaker": PatternFill(start_color="FFD700", fill_type="solid"),
"Lower Circuit Breaker": PatternFill(start_color="A52A2A", fill_type="solid")
}
for col_num, col_name in status_cols.items():
for row in range(4, len(df) + 4):
cell = worksheet.cell(row=row, column=col_num)
for prefix, fill in cond_formatting.items():
if prefix in str(cell.value):
cell.fill = fill
break
for column in worksheet.columns:
max_length = 0
column_letter = get_column_letter(column[0].column)
if column_letter in ['Q', 'R', 'S', 'T']:
worksheet.column_dimensions[column_letter].width = 18
continue
for cell in column:
try:
if len(str(cell.value)) > max_length:
max_length = len(str(cell.value))
except:
pass
adjusted_width = (max_length + 2) * 1.1
worksheet.column_dimensions[column_letter].width = adjusted_width
worksheet.column_dimensions['A'].width = 12
worksheet.column_dimensions['B'].width = 30
worksheet.column_dimensions['C'].width = 20
worksheet.column_dimensions['D'].width = 12
worksheet.column_dimensions['E'].width = 15
worksheet.freeze_panes = 'C4'
workbook.save(EXCEL_FILE)
return EXCEL_FILE
except Exception as e:
print(f"Error saving Excel file: {e}")
return None
def get_counts(df, status_col):
return {
"Breakout": len(df[df[status_col].str.contains("▲▲")]),
"Breakdown": len(df[df[status_col].str.contains("▼▼")]),
"Within Range": len(df[df[status_col].str.contains("–")])
}
def create_pie_chart(counts, title):
df = pd.DataFrame({
'Status': list(counts.keys()),
'Count': list(counts.values())
})
fig = px.pie(df, values='Count', names='Status', title=title)
return fig
def highlight_status(val):
if "▲▲" in str(val):
return 'background-color: #008000; color: white'
elif "▼▼" in str(val):
return 'background-color: #FF0000; color: white'
elif "–" in str(val):
return 'background-color: #D9D9D9; color: black'
elif "Upper Circuit Breaker" in str(val):
return 'background-color: #FFD700; color: black'
elif "Lower Circuit Breaker" in str(val):
return 'background-color: #A52A2A; color: white'
return ''
def is_valid_symbol(symbol, symbols_data):
try:
symbol = symbol.strip().upper()
return (symbol in symbols_data) and not any(month in symbol for month in MONTH_CODES)
except:
return False
def is_weekend(date):
return date.weekday() >= 5
def load_data():
global loaded_data
symbols_data = get_symbols_data()
if not symbols_data:
return None, None, None, None, None, None, None, None, gr.update(choices=["All"])
date_to_try = datetime.now()
attempts = 0
today_data, today_date = None, None
while attempts < MAX_DAYS_BACK and today_data is None:
if not is_weekend(date_to_try):
today_data, today_date = fetch_market_data(date_to_try)
if today_data is not None:
break
date_to_try -= timedelta(days=1)
attempts += 1
if today_data is None:
print("❌ No market data found")
return None, None, None, None, None, None, None, None, gr.update(choices=["All"])
today_data = today_data[today_data['SYMBOL'].apply(lambda x: is_valid_symbol(x, symbols_data))].copy()
if today_data.empty:
print("⚠️ No valid symbols found")
return None, None, None, None, None, None, None, None, gr.update(choices=["All"])
target_date = datetime.strptime(today_date, "%Y-%m-%d")
prev_day_data = None
days_back = 1
while days_back <= MAX_DAYS_BACK and prev_day_data is None:
prev_day_date = target_date - timedelta(days=days_back)
if not is_weekend(prev_day_date):
prev_day_data, _ = fetch_market_data(prev_day_date)
if prev_day_data is not None:
break
days_back += 1
prev_monday = target_date - timedelta(days=target_date.weekday() + 7)
prev_friday = prev_monday + timedelta(days=4)
current_date = prev_monday
all_week_data = []
while current_date <= prev_friday:
if not is_weekend(current_date):
data, _ = fetch_market_data(current_date)
if data is not None:
all_week_data.append(data)
current_date += timedelta(days=1)
prev_week_data = pd.concat(all_week_data) if all_week_data else None
first_day_prev_month = (target_date.replace(day=1) - timedelta(days=1)).replace(day=1)
last_day_prev_month = target_date.replace(day=1) - timedelta(days=1)
current_date = first_day_prev_month
all_month_data = []
while current_date <= last_day_prev_month:
if not is_weekend(current_date):
data, _ = fetch_market_data(current_date)
if data is not None:
all_month_data.append(data)
current_date += timedelta(days=1)
prev_month_data = pd.concat(all_month_data) if all_month_data else None
result_df = calculate_breakout_stats(today_data, prev_day_data, prev_week_data, prev_month_data, symbols_data)
loaded_data = result_df
excel_file = save_to_excel(result_df, today_date)
daily_counts = get_counts(result_df, 'DAILY_STATUS')
weekly_counts = get_counts(result_df, 'WEEKLY_STATUS')
monthly_counts = get_counts(result_df, 'MONTHLY_STATUS')
fig_daily = create_pie_chart(daily_counts, "Daily Breakout Distribution")
fig_weekly = create_pie_chart(weekly_counts, "Weekly Breakout Distribution")
fig_monthly = create_pie_chart(monthly_counts, "Monthly Breakout Distribution")
daily_table = pd.DataFrame.from_dict(daily_counts, orient='index').reset_index()
weekly_table = pd.DataFrame.from_dict(weekly_counts, orient='index').reset_index()
monthly_table = pd.DataFrame.from_dict(monthly_counts, orient='index').reset_index()
styled_df = result_df.style.map(highlight_status, subset=['DAILY_STATUS', 'WEEKLY_STATUS', 'MONTHLY_STATUS', 'CIRCUIT_BREAKER_STATUS'])
sectors = ["All"] + sorted(result_df['SECTOR'].unique().tolist())
return (
excel_file,
styled_df,
fig_daily,
fig_weekly,
fig_monthly,
daily_table,
weekly_table,
monthly_table,
gr.update(choices=sectors, value="All")
)
def filter_data(filter_breakout, filter_sector, filter_kmi, filter_circuit_breaker, filter_symbols):
global loaded_data
if loaded_data is None:
return gr.Dataframe()
df = loaded_data.copy()
if filter_breakout:
df = df[(df['DAILY_STATUS'].str.contains("▲▲")) &
(df['WEEKLY_STATUS'].str.contains("▲▲")) &
(df['MONTHLY_STATUS'].str.contains("▲▲"))]
if filter_sector != "All":
df = df[df['SECTOR'] == filter_sector]
if filter_kmi != "All":
df = df[df['KMI_COMPLIANT'] == filter_kmi]
if filter_circuit_breaker != "All":
if filter_circuit_breaker == "Upper Circuit Breaker":
df = df[df['CIRCUIT_BREAKER_STATUS'] == "Upper Circuit Breaker"]
elif filter_circuit_breaker == "Lower Circuit Breaker":
df = df[df['CIRCUIT_BREAKER_STATUS'] == "Lower Circuit Breaker"]
if filter_symbols:
symbols = [symbol.strip().upper() for symbol in filter_symbols.split(',')]
df = df[df['SYMBOL'].isin(symbols)]
styled_df = df.style.map(highlight_status, subset=['DAILY_STATUS', 'WEEKLY_STATUS', 'MONTHLY_STATUS', 'CIRCUIT_BREAKER_STATUS'])
return styled_df
### Gradio Interface
with gr.Blocks(title="PSX Breakout Scanner", theme=gr.themes.Soft()) as app:
gr.Markdown("# 📈 PSX Breakout Scanner")
gr.Markdown("Identifies breakout/breakdown signals in Pakistan Stock Exchange")
with gr.Row():
run_btn = gr.Button("Run Analysis", variant="primary")
download = gr.File(label="Download Excel Report")
with gr.Row():
filter_breakout = gr.Checkbox(label="Show only stocks with Daily, Weekly, and Monthly Breakouts")
filter_sector = gr.Dropdown(label="Filter by Sector", choices=["All"], value="All")
filter_kmi = gr.Dropdown(label="Filter by Shariah Compliance", choices=["All", "Yes", "No"], value="All")
filter_circuit_breaker = gr.Dropdown(label="Filter by Circuit Breaker", choices=["All", "Upper Circuit Breaker", "Lower Circuit Breaker"], value="All")
filter_symbols = gr.Textbox(label="Filter by Symbols (comma-separated)", placeholder="e.g., SYM1, SYM2")
with gr.Row():
dataframe = gr.DataFrame(interactive=False, wrap=True)
with gr.Row():
with gr.Column():
gr.Markdown("### Daily Analysis")
daily_plot = gr.Plot()
daily_table = gr.Dataframe(headers=["Status", "Count"])
with gr.Column():
gr.Markdown("### Weekly Analysis")
weekly_plot = gr.Plot()
weekly_table = gr.Dataframe(headers=["Status", "Count"])
with gr.Column():
gr.Markdown("### Monthly Analysis")
monthly_plot = gr.Plot()
monthly_table = gr.Dataframe(headers=["Status", "Count"])
run_btn.click(
fn=load_data,
outputs=[
download,
dataframe,
daily_plot,
weekly_plot,
monthly_plot,
daily_table,
weekly_table,
monthly_table,
filter_sector
]
)
filter_breakout.change(
fn=filter_data,
inputs=[filter_breakout, filter_sector, filter_kmi, filter_circuit_breaker, filter_symbols],
outputs=dataframe
)
filter_sector.change(
fn=filter_data,
inputs=[filter_breakout, filter_sector, filter_kmi, filter_circuit_breaker, filter_symbols],
outputs=dataframe
)
filter_kmi.change(
fn=filter_data,
inputs=[filter_breakout, filter_sector, filter_kmi, filter_circuit_breaker, filter_symbols],
outputs=dataframe
)
filter_circuit_breaker.change(
fn=filter_data,
inputs=[filter_breakout, filter_sector, filter_kmi, filter_circuit_breaker, filter_symbols],
outputs=dataframe
)
filter_symbols.change(
fn=filter_data,
inputs=[filter_breakout, filter_sector, filter_kmi, filter_circuit_breaker, filter_symbols],
outputs=dataframe
)
if __name__ == "__main__":
app.launch()