"""Async collectors that power the FastAPI endpoints.""" from __future__ import annotations import asyncio import json import logging import time from dataclasses import dataclass from datetime import datetime, timezone from pathlib import Path from typing import Any, Dict, List, Optional import httpx from config import CACHE_TTL, COIN_SYMBOL_MAPPING, USER_AGENT, get_settings logger = logging.getLogger(__name__) settings = get_settings() class CollectorError(RuntimeError): """Raised when a provider fails to return data.""" def __init__(self, message: str, provider: Optional[str] = None, status_code: Optional[int] = None): super().__init__(message) self.provider = provider self.status_code = status_code @dataclass class CacheEntry: value: Any expires_at: float class TTLCache: """Simple in-memory TTL cache safe for async usage.""" def __init__(self, ttl: int = CACHE_TTL) -> None: self.ttl = ttl or CACHE_TTL self._store: Dict[str, CacheEntry] = {} self._lock = asyncio.Lock() async def get(self, key: str) -> Any: async with self._lock: entry = self._store.get(key) if not entry: return None if entry.expires_at < time.time(): self._store.pop(key, None) return None return entry.value async def set(self, key: str, value: Any) -> None: async with self._lock: self._store[key] = CacheEntry(value=value, expires_at=time.time() + self.ttl) class ProvidersRegistry: """Utility that loads provider definitions from disk.""" def __init__(self, path: Optional[Path] = None) -> None: self.path = Path(path or settings.providers_config_path) self._providers: Dict[str, Any] = {} self._load() def _load(self) -> None: if not self.path.exists(): logger.warning("Providers config not found at %s", self.path) self._providers = {} return with self.path.open("r", encoding="utf-8") as handle: data = json.load(handle) self._providers = data.get("providers", {}) @property def providers(self) -> Dict[str, Any]: return self._providers class MarketDataCollector: """Fetch market data from public providers with caching and fallbacks.""" def __init__(self, registry: Optional[ProvidersRegistry] = None) -> None: self.registry = registry or ProvidersRegistry() self.cache = TTLCache(settings.cache_ttl) self._symbol_map = {symbol.lower(): coin_id for coin_id, symbol in COIN_SYMBOL_MAPPING.items()} self.headers = {"User-Agent": settings.user_agent or USER_AGENT} self.timeout = 15.0 self._last_error_log: Dict[str, float] = {} # Track last error log time per provider self._error_log_throttle = 60.0 # Only log same error once per 60 seconds async def _request(self, provider_key: str, path: str, params: Optional[Dict[str, Any]] = None) -> Any: provider = self.registry.providers.get(provider_key) if not provider: raise CollectorError(f"Provider {provider_key} not configured", provider=provider_key) url = provider["base_url"].rstrip("/") + path # Rate limit tracking per provider if not hasattr(self, '_rate_limit_timestamps'): self._rate_limit_timestamps: Dict[str, List[float]] = {} if provider_key not in self._rate_limit_timestamps: self._rate_limit_timestamps[provider_key] = [] # Get rate limits from provider config rate_limit_rpm = provider.get("rate_limit", {}).get("requests_per_minute", 30) if rate_limit_rpm and len(self._rate_limit_timestamps[provider_key]) >= rate_limit_rpm: # Check if oldest request is older than 1 minute oldest_time = self._rate_limit_timestamps[provider_key][0] if time.time() - oldest_time < 60: wait_time = 60 - (time.time() - oldest_time) + 1 if self._should_log_error(provider_key, "rate_limit_wait"): logger.warning(f"Rate limiting {provider_key}, waiting {wait_time:.1f}s") await asyncio.sleep(wait_time) # Clean old timestamps cutoff = time.time() - 60 self._rate_limit_timestamps[provider_key] = [ ts for ts in self._rate_limit_timestamps[provider_key] if ts > cutoff ] async with httpx.AsyncClient(timeout=self.timeout, headers=self.headers) as client: response = await client.get(url, params=params) # Record request timestamp self._rate_limit_timestamps[provider_key].append(time.time()) # Keep only last minute of timestamps cutoff = time.time() - 60 self._rate_limit_timestamps[provider_key] = [ ts for ts in self._rate_limit_timestamps[provider_key] if ts > cutoff ] # Handle HTTP 429 (Rate Limit) with exponential backoff if response.status_code == 429: retry_after = int(response.headers.get("Retry-After", "60")) error_msg = f"{provider_key} rate limited (HTTP 429), retry after {retry_after}s" if self._should_log_error(provider_key, "HTTP 429"): logger.warning(error_msg) raise CollectorError( error_msg, provider=provider_key, status_code=429, ) if response.status_code != 200: raise CollectorError( f"{provider_key} request failed with HTTP {response.status_code}", provider=provider_key, status_code=response.status_code, ) return response.json() def _should_log_error(self, provider: str, error_msg: str) -> bool: """Check if error should be logged (throttle repeated errors).""" error_key = f"{provider}:{error_msg}" now = time.time() last_log_time = self._last_error_log.get(error_key, 0) if now - last_log_time > self._error_log_throttle: self._last_error_log[error_key] = now # Clean up old entries (keep only last hour) cutoff = now - 3600 self._last_error_log = {k: v for k, v in self._last_error_log.items() if v > cutoff} return True return False async def get_top_coins(self, limit: int = 10) -> List[Dict[str, Any]]: cache_key = f"top_coins:{limit}" cached = await self.cache.get(cache_key) if cached: return cached # Provider list with priority order (add more fallbacks from resource files) providers = ["coingecko", "coincap", "coinpaprika"] last_error: Optional[Exception] = None last_error_details: Optional[str] = None for provider in providers: try: if provider == "coingecko": data = await self._request( "coingecko", "/coins/markets", { "vs_currency": "usd", "order": "market_cap_desc", "per_page": limit, "page": 1, "sparkline": "false", "price_change_percentage": "24h", }, ) coins = [ { "name": item.get("name"), "symbol": item.get("symbol", "").upper(), "price": item.get("current_price"), "change_24h": item.get("price_change_percentage_24h"), "market_cap": item.get("market_cap"), "volume_24h": item.get("total_volume"), "rank": item.get("market_cap_rank"), "last_updated": item.get("last_updated"), } for item in data ] await self.cache.set(cache_key, coins) return coins if provider == "coincap": data = await self._request("coincap", "/assets", {"limit": limit}) coins = [ { "name": item.get("name"), "symbol": item.get("symbol", "").upper(), "price": float(item.get("priceUsd", 0)), "change_24h": float(item.get("changePercent24Hr", 0)), "market_cap": float(item.get("marketCapUsd", 0)), "volume_24h": float(item.get("volumeUsd24Hr", 0)), "rank": int(item.get("rank", 0)), } for item in data.get("data", []) ] await self.cache.set(cache_key, coins) return coins if provider == "coinpaprika": data = await self._request("coinpaprika", "/tickers", {"quotes": "USD", "limit": limit}) coins = [ { "name": item.get("name"), "symbol": item.get("symbol", "").upper(), "price": float(item.get("quotes", {}).get("USD", {}).get("price", 0)), "change_24h": float(item.get("quotes", {}).get("USD", {}).get("percent_change_24h", 0)), "market_cap": float(item.get("quotes", {}).get("USD", {}).get("market_cap", 0)), "volume_24h": float(item.get("quotes", {}).get("USD", {}).get("volume_24h", 0)), "rank": int(item.get("rank", 0)), "last_updated": item.get("last_updated"), } for item in data[:limit] if item.get("quotes", {}).get("USD") ] await self.cache.set(cache_key, coins) return coins except Exception as exc: # pragma: no cover - network heavy last_error = exc error_msg = str(exc) if str(exc) else repr(exc) error_type = type(exc).__name__ # Extract HTTP status code if available if hasattr(exc, 'status_code'): status_code = exc.status_code error_msg = f"HTTP {status_code}: {error_msg}" if error_msg else f"HTTP {status_code}" elif isinstance(exc, CollectorError) and hasattr(exc, 'status_code') and exc.status_code: status_code = exc.status_code error_msg = f"HTTP {status_code}: {error_msg}" if error_msg else f"HTTP {status_code}" # Ensure we always have a meaningful error message if not error_msg or error_msg.strip() == "": error_msg = f"{error_type} (no details available)" last_error_details = f"{error_type}: {error_msg}" # Throttle error logging to prevent spam error_key_for_logging = error_msg or error_type if self._should_log_error(provider, error_key_for_logging): logger.warning( "Provider %s failed: %s (error logged, will suppress similar errors for 60s)", provider, last_error_details ) raise CollectorError(f"Unable to fetch top coins from any provider. Last error: {last_error_details or 'Unknown'}", provider=str(last_error) if last_error else None) async def _coin_id(self, symbol: str) -> str: symbol_lower = symbol.lower() if symbol_lower in self._symbol_map: return self._symbol_map[symbol_lower] cache_key = "coingecko:symbols" cached = await self.cache.get(cache_key) if cached: mapping = cached else: data = await self._request("coingecko", "/coins/list") mapping = {item["symbol"].lower(): item["id"] for item in data} await self.cache.set(cache_key, mapping) if symbol_lower not in mapping: raise CollectorError(f"Unknown symbol: {symbol}") return mapping[symbol_lower] async def get_coin_details(self, symbol: str) -> Dict[str, Any]: coin_id = await self._coin_id(symbol) cache_key = f"coin:{coin_id}" cached = await self.cache.get(cache_key) if cached: return cached data = await self._request( "coingecko", f"/coins/{coin_id}", {"localization": "false", "tickers": "false", "market_data": "true"}, ) market_data = data.get("market_data", {}) coin = { "id": coin_id, "name": data.get("name"), "symbol": data.get("symbol", "").upper(), "description": data.get("description", {}).get("en"), "homepage": data.get("links", {}).get("homepage", [None])[0], "price": market_data.get("current_price", {}).get("usd"), "market_cap": market_data.get("market_cap", {}).get("usd"), "volume_24h": market_data.get("total_volume", {}).get("usd"), "change_24h": market_data.get("price_change_percentage_24h"), "high_24h": market_data.get("high_24h", {}).get("usd"), "low_24h": market_data.get("low_24h", {}).get("usd"), "circulating_supply": market_data.get("circulating_supply"), "total_supply": market_data.get("total_supply"), "ath": market_data.get("ath", {}).get("usd"), "atl": market_data.get("atl", {}).get("usd"), "last_updated": data.get("last_updated"), } await self.cache.set(cache_key, coin) return coin async def get_market_stats(self) -> Dict[str, Any]: cache_key = "market:stats" cached = await self.cache.get(cache_key) if cached: return cached global_data = await self._request("coingecko", "/global") stats = global_data.get("data", {}) market = { "total_market_cap": stats.get("total_market_cap", {}).get("usd"), "total_volume_24h": stats.get("total_volume", {}).get("usd"), "market_cap_change_percentage_24h": stats.get("market_cap_change_percentage_24h_usd"), "btc_dominance": stats.get("market_cap_percentage", {}).get("btc"), "eth_dominance": stats.get("market_cap_percentage", {}).get("eth"), "active_cryptocurrencies": stats.get("active_cryptocurrencies"), "markets": stats.get("markets"), "updated_at": stats.get("updated_at"), } await self.cache.set(cache_key, market) return market async def get_price_history(self, symbol: str, timeframe: str = "7d") -> List[Dict[str, Any]]: coin_id = await self._coin_id(symbol) mapping = {"1d": 1, "7d": 7, "30d": 30, "90d": 90} days = mapping.get(timeframe, 7) cache_key = f"history:{coin_id}:{days}" cached = await self.cache.get(cache_key) if cached: return cached data = await self._request( "coingecko", f"/coins/{coin_id}/market_chart", {"vs_currency": "usd", "days": days}, ) prices = [ { "timestamp": datetime.fromtimestamp(point[0] / 1000, tz=timezone.utc).isoformat(), "price": round(point[1], 4), } for point in data.get("prices", []) ] await self.cache.set(cache_key, prices) return prices async def get_ohlcv(self, symbol: str, interval: str = "1h", limit: int = 100) -> List[Dict[str, Any]]: """Return OHLCV data from Binance with caching and validation.""" cache_key = f"ohlcv:{symbol.upper()}:{interval}:{limit}" cached = await self.cache.get(cache_key) if cached: return cached params = {"symbol": symbol.upper(), "interval": interval, "limit": min(max(limit, 1), 1000)} data = await self._request("binance", "/klines", params) candles: List[Dict[str, Any]] = [] for item in data: try: candles.append( { "timestamp": datetime.fromtimestamp(item[0] / 1000, tz=timezone.utc).isoformat(), "open": float(item[1]), "high": float(item[2]), "low": float(item[3]), "close": float(item[4]), "volume": float(item[5]), } ) except (TypeError, ValueError): # pragma: no cover - defensive continue if not candles: raise CollectorError(f"No OHLCV data returned for {symbol}", provider="binance") await self.cache.set(cache_key, candles) return candles class NewsCollector: """Fetch latest crypto news.""" def __init__(self, registry: Optional[ProvidersRegistry] = None) -> None: self.registry = registry or ProvidersRegistry() self.cache = TTLCache(settings.cache_ttl) self.headers = {"User-Agent": settings.user_agent or USER_AGENT} self.timeout = 15.0 async def get_latest_news(self, limit: int = 10) -> List[Dict[str, Any]]: cache_key = f"news:{limit}" cached = await self.cache.get(cache_key) if cached: return cached url = "https://min-api.cryptocompare.com/data/v2/news/" params = {"lang": "EN"} async with httpx.AsyncClient(timeout=self.timeout, headers=self.headers) as client: response = await client.get(url, params=params) if response.status_code != 200: raise CollectorError(f"News provider error: HTTP {response.status_code}") payload = response.json() items = [] for entry in payload.get("Data", [])[:limit]: published = datetime.fromtimestamp(entry.get("published_on", 0), tz=timezone.utc) items.append( { "id": entry.get("id"), "title": entry.get("title"), "body": entry.get("body"), "url": entry.get("url"), "source": entry.get("source"), "categories": entry.get("categories"), "published_at": published.isoformat(), } ) await self.cache.set(cache_key, items) return items class ProviderStatusCollector: """Perform lightweight health checks against configured providers.""" def __init__(self, registry: Optional[ProvidersRegistry] = None) -> None: self.registry = registry or ProvidersRegistry() self.cache = TTLCache(max(settings.cache_ttl, 600)) self.headers = {"User-Agent": settings.user_agent or USER_AGENT} self.timeout = 8.0 async def _check_provider(self, client: httpx.AsyncClient, provider_id: str, data: Dict[str, Any]) -> Dict[str, Any]: url = data.get("health_check") or data.get("base_url") start = time.perf_counter() try: response = await client.get(url, timeout=self.timeout) latency = round((time.perf_counter() - start) * 1000, 2) status = "online" if response.status_code < 400 else "degraded" return { "provider_id": provider_id, "name": data.get("name", provider_id), "category": data.get("category"), "status": status, "status_code": response.status_code, "latency_ms": latency, } except Exception as exc: # pragma: no cover - network heavy error_msg = str(exc) error_type = type(exc).__name__ logger.warning("Provider %s health check failed: %s: %s", provider_id, error_type, error_msg) return { "provider_id": provider_id, "name": data.get("name", provider_id), "category": data.get("category"), "status": "offline", "status_code": None, "latency_ms": None, "error": str(exc), } async def get_providers_status(self) -> List[Dict[str, Any]]: cached = await self.cache.get("providers_status") if cached: return cached providers = self.registry.providers if not providers: return [] results: List[Dict[str, Any]] = [] async with httpx.AsyncClient(timeout=self.timeout, headers=self.headers) as client: tasks = [self._check_provider(client, pid, data) for pid, data in providers.items()] for chunk in asyncio.as_completed(tasks): results.append(await chunk) await self.cache.set("providers_status", results) return results __all__ = [ "CollectorError", "MarketDataCollector", "NewsCollector", "ProviderStatusCollector", ]