#!/usr/bin/env python3
"""
Anna's Archives API - Hugging Face Space Edition
Optimized for HF Free Tier (CPU-only, minimal resources)
"""

import os
import re
import logging
from datetime import datetime, timedelta
from functools import lru_cache
from dataclasses import dataclass, asdict
from typing import Optional, Any
from urllib.parse import urlparse, urlunparse
import html
import csv
import io

from flask import Flask, jsonify, request, Response
from curl_cffi import requests
from bs4 import BeautifulSoup

# ============================================================================
# CONFIGURATION
# ============================================================================

class Config:
    # Server
    PORT = int(os.getenv("PORT", 7860))  # HF Spaces default port
    HOST = "0.0.0.0"

    # Anna's Archives
    MIRRORS_URL = "https://shadowlibraries.github.io/DirectDownloads/AnnasArchive/"
    DEFAULT_BASE_URL = "https://annas-archive.gs"

    # ⚠️ Use an impersonation target that curl_cffi supports.
    # ✅ Supported targets: chrome99, chrome100, chrome101, chrome104, chrome107,
    #    chrome110, chrome116, chrome119, chrome120, edge99, edge101,
    #    safari15_3, safari15_5
    BROWSER_IMPERSONATE = "chrome110"

    # Performance (optimized for free tier)
    CACHE_TTL_MINUTES = 10
    REQUEST_TIMEOUT = 20
    MAX_RETRIES = 2

# Logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s [%(levelname)s] %(message)s'
)
logger = logging.getLogger(__name__)

# ============================================================================
# SIMPLE CACHE
# ============================================================================

class SimpleCache:
    """Minimal in-memory cache with TTL."""

    def __init__(self, ttl_minutes: int):
        self._cache = {}
        self._ttl = timedelta(minutes=ttl_minutes)

    def get(self, key: str) -> Optional[Any]:
        if key in self._cache:
            value, timestamp = self._cache[key]
            if datetime.now() - timestamp < self._ttl:
                return value
            del self._cache[key]
        return None

    def set(self, key: str, value: Any):
        # Auto-cleanup: keep at most 100 items by evicting the oldest entry
        if len(self._cache) > 100:
            oldest = min(self._cache.items(), key=lambda x: x[1][1])[0]
            del self._cache[oldest]
        self._cache[key] = (value, datetime.now())

    def clear(self):
        self._cache.clear()

    def size(self):
        return len(self._cache)

cache = SimpleCache(Config.CACHE_TTL_MINUTES)

# ============================================================================
# DATA MODELS
# ============================================================================

@dataclass
class Book:
    md5: Optional[str]
    title: str
    author: str
    publisher: str
    year: Optional[int]
    format: str
    language: str
    size_mb: float
    url: str
    cover_url: Optional[str] = None

    def to_dict(self):
        return asdict(self)

# ============================================================================
# UTILITIES
# ============================================================================

def clean_url(url: str) -> str:
    """Remove query params and trailing slash from a URL."""
    if not url:
        return ""
    parsed = urlparse(url)
    return urlunparse((parsed.scheme, parsed.netloc, parsed.path.rstrip('/'), '', '', ''))

def clean_text(text: str) -> str:
    """Clean HTML entities, decorative emojis, and extra whitespace."""
    if not text:
        return ""
    text = html.unescape(text)
    text = re.sub(r'[👤🏢📘🚀]', '', text)
    text = re.sub(r'\s+', ' ', text)
    return text.strip()

def parse_size(size_str: str) -> float:
    """Convert a human-readable size string to MB."""
    if not size_str:
        return 0.0
    match = re.search(r'([\d.]+)\s*([KMGT]?B)', size_str, re.I)
    if not match:
        return 0.0
    num = float(match.group(1))
    unit = match.group(2).upper()
    multipliers = {'B': 1 / 1024 / 1024, 'KB': 1 / 1024,
                   'MB': 1, 'GB': 1024, 'TB': 1024 * 1024}
    return round(num * multipliers.get(unit, 1), 2)
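# Quick sanity check for parse_size (illustrative expected values, not executed):
#   parse_size("3.5 MB")  -> 3.5
#   parse_size("700 KB")  -> 0.68   (700 / 1024, rounded to 2 decimals)
#   parse_size("1.2GB")   -> 1228.8 (1.2 * 1024)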
# ============================================================================
# MIRROR MANAGER
# ============================================================================

class MirrorManager:
    """Handles mirror detection and selection."""

    def __init__(self):
        self._current_mirror = None

    # NOTE: lru_cache on a method keys on self; fine here since a single
    # module-level instance is used.
    @lru_cache(maxsize=1)
    def get_mirrors(self) -> list[dict]:
        """Fetch the mirrors list (cached)."""
        logger.info("Fetching mirrors...")
        try:
            resp = requests.get(
                Config.MIRRORS_URL,
                impersonate=Config.BROWSER_IMPERSONATE,
                timeout=Config.REQUEST_TIMEOUT
            )
            soup = BeautifulSoup(resp.text, "html.parser")
            article = soup.find("article", class_="book-article")
            if not article:
                return []
            heading = article.find("h3", id="links")
            if not heading:
                return []
            ul = heading.find_next_sibling("ul")
            if not ul:
                return []
            mirrors = []
            for li in ul.find_all("li"):
                a = li.find("a", href=True)
                if a:
                    mirrors.append({
                        "label": a.get_text(strip=True),
                        "url": clean_url(a["href"])
                    })
            logger.info(f"Found {len(mirrors)} mirrors")
            return mirrors
        except Exception as e:
            logger.error(f"Failed to fetch mirrors: {e}")
            return []

    def get_active_mirror(self) -> str:
        """Return the first working mirror, falling back to the default."""
        if self._current_mirror:
            return self._current_mirror

        for mirror in self.get_mirrors():
            try:
                logger.info(f"Testing mirror: {mirror['url']}")
                resp = requests.get(
                    mirror['url'],
                    impersonate=Config.BROWSER_IMPERSONATE,
                    timeout=10
                )
                if resp.status_code == 200:
                    self._current_mirror = mirror['url']
                    logger.info(f"✅ Active mirror: {self._current_mirror}")
                    return self._current_mirror
            except Exception as e:
                logger.warning(f"Mirror {mirror['url']} failed: {e}")
                continue

        logger.warning("No active mirror, using default")
        self._current_mirror = Config.DEFAULT_BASE_URL
        return self._current_mirror

mirror_manager = MirrorManager()

# ============================================================================
# SCRAPER
# ============================================================================

def scrape_search(query: str, page: int = 1, **filters) -> dict:
    """Scrape Anna's Archives search results."""
    # Check cache
    cache_key = f"{query}_{page}_{filters}"
    cached = cache.get(cache_key)
    if cached:
        logger.info(f"Cache HIT: {query} (page {page})")
        return cached

    logger.info(f"Scraping: {query} (page {page})")
    base_url = mirror_manager.get_active_mirror()
    search_url = f"{base_url}/search"

    params = {"q": query}
    if page > 1:
        params["page"] = page
    # Add filters
    for key in ['lang', 'content', 'ext', 'sort']:
        if key in filters and filters[key]:
            params[key] = filters[key]

    try:
        resp = requests.get(
            search_url,
            params=params,
            impersonate=Config.BROWSER_IMPERSONATE,
            timeout=Config.REQUEST_TIMEOUT
        )
        resp.raise_for_status()

        # Parse results
        books = parse_books(resp.text, base_url)
        has_more = check_next_page(resp.text)

        result = {
            "books": [b.to_dict() for b in books],
            "total": len(books),
            "has_more": has_more,
            "timestamp": datetime.now().isoformat()
        }

        # Cache result
        cache.set(cache_key, result)
        logger.info(f"Found {len(books)} books")
        return result
    except Exception as e:
        logger.error(f"Scraping error: {e}")
        return {"books": [], "total": 0, "has_more": False, "error": str(e)}
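# Illustrative call (a sketch; network access and a reachable mirror assumed):
#   result = scrape_search("python", page=1, ext="pdf", lang="en")
#   result["books"]     -> list of Book dicts (md5, title, author, ...)
#   result["has_more"]  -> True if the results page links to a next page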
# The parameter is named page_html (not html) to avoid shadowing the html module
# used by clean_text().
def parse_books(page_html: str, base_url: str) -> list[Book]:
    """Parse book blocks from the search results HTML."""
    soup = BeautifulSoup(page_html, 'html.parser')
    books = []
    seen_md5s = set()

    # Find book blocks
    blocks = soup.find_all('div', class_=lambda x: x and 'flex pt-3 pb-3' in x)

    for block in blocks:
        try:
            # MD5
            md5 = None
            md5_div = block.find('div', class_='hidden')
            if md5_div:
                match = re.search(r'md5:([a-f0-9]{32})', md5_div.text)
                if match:
                    md5 = match.group(1)
            if md5 in seen_md5s:
                continue

            # Title & URL
            title_link = block.find('a', class_=['js-vim-focus', 'font-semibold'])
            if not title_link:
                continue
            title = clean_text(title_link.text)
            url = title_link.get('href', '')
            if url.startswith('/'):
                url = f"{base_url}{url}"

            # Extract MD5 from URL if needed
            if not md5:
                match = re.search(r'/md5/([a-f0-9]{32})', url)
                if match:
                    md5 = match.group(1)
            if md5:
                seen_md5s.add(md5)

            # Cover
            cover_img = block.find('img')
            cover_url = None
            if cover_img:
                cover_url = cover_img.get('src', '')
                if cover_url.startswith('/'):
                    cover_url = f"{base_url}{cover_url}"

            # Author
            author = "Unknown"
            for link in block.find_all('a', href=re.compile(r'search\?q=')):
                if 'user-edit' in str(link):
                    author = clean_text(link.text)
                    break

            # Publisher & Year
            publisher = "Unknown"
            year = None
            for link in block.find_all('a', href=re.compile(r'search\?q=')):
                if 'company' in str(link):
                    pub_text = clean_text(link.text)
                    match = re.search(r'(.+),\s*(\d{4})$', pub_text)
                    if match:
                        publisher = match.group(1)
                        year = int(match.group(2))
                    else:
                        publisher = pub_text
                    break

            # Info line
            info_div = block.find('div', class_=re.compile(r'text-gray-800'))
            info_text = info_div.text if info_div else ""

            # Parse format, language, size
            format_match = re.search(r'·\s*([A-Z0-9]+)\s*·', info_text)
            lang_match = re.search(r'\[([a-z]{2})\]', info_text)
            size_match = re.search(r'([\d.]+[KMGT]?B)', info_text)
            year_match = re.search(r'·\s*(\d{4})\s*·', info_text)

            book = Book(
                md5=md5,
                title=title,
                author=author,
                publisher=publisher,
                year=year or (int(year_match.group(1)) if year_match else None),
                format=format_match.group(1) if format_match else "UNKNOWN",
                language=lang_match.group(1) if lang_match else "xx",
                size_mb=parse_size(size_match.group(1)) if size_match else 0.0,
                url=url,
                cover_url=cover_url
            )
            books.append(book)
        except Exception as e:
            logger.warning(f"Error parsing book: {e}")
            continue

    return books

def check_next_page(page_html: str) -> bool:
    """Check whether the results page links to a next page."""
    soup = BeautifulSoup(page_html, 'html.parser')
    return soup.find('a', string=re.compile(r'Next|→|»')) is not None

# ============================================================================
# FLASK APP
# ============================================================================

app = Flask(__name__)

@app.route('/')
def index():
    """API documentation."""
    return jsonify({
        "name": "Anna's Archives API",
        "version": "1.0.1",
        "description": "Hugging Face Space Edition - Optimized for free tier",
        "browser_impersonate": Config.BROWSER_IMPERSONATE,
        "endpoints": {
            "GET /": "This documentation",
            "GET /search": "Search books",
            "GET /health": "Health check",
            "GET /mirrors": "List mirrors",
            "POST /cache/clear": "Clear cache"
        },
        "examples": {
            "search": "/search?q=python",
            "with_filters": "/search?q=machine+learning&ext=pdf&lang=en",
            "pagination": "/search?q=python&page=2",
            "csv_export": "/search?q=python&format=csv"
        },
        "search_params": {
            "q": "Search query (required)",
            "page": "Page number (default: 1)",
            "lang": "Language code (en, fr, es, etc.)",
            "ext": "File extension (pdf, epub, mobi, etc.)",
            "content": "Content type (book_fiction, book_nonfiction, etc.)",
            "sort": "Sort order (newest, oldest, largest, smallest)",
            "format": "Response format (json, csv)"
        }
    })
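# Example requests against a local run (localhost:7860 assumes the default
# HF Spaces port configured above; adjust to your deployment):
#   curl "http://localhost:7860/search?q=python&ext=pdf&lang=en"
#   curl "http://localhost:7860/search?q=python&format=csv" -o results.csv
#   curl -X POST "http://localhost:7860/cache/clear"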
@app.route('/search')
def search():
    """Search endpoint."""
    query = request.args.get('q', '').strip()
    if not query:
        return jsonify({"error": "Parameter 'q' is required"}), 400

    try:
        page = max(1, int(request.args.get('page', 1)))
    except ValueError:
        return jsonify({"error": "Invalid page number"}), 400

    filters = {
        'lang': request.args.get('lang'),
        'ext': request.args.get('ext'),
        'content': request.args.get('content'),
        'sort': request.args.get('sort')
    }

    result = scrape_search(query, page, **filters)

    # CSV export
    if request.args.get('format') == 'csv':
        output = io.StringIO()
        if result['books']:
            writer = csv.DictWriter(output, fieldnames=result['books'][0].keys())
            writer.writeheader()
            writer.writerows(result['books'])
        # Sanitize the query so it is safe inside the Content-Disposition header
        safe_query = re.sub(r'[^\w\-]+', '_', query)
        return Response(
            output.getvalue(),
            mimetype='text/csv',
            headers={'Content-Disposition': f'attachment; filename=search_{safe_query}.csv'}
        )

    return jsonify({
        "query": query,
        "page": page,
        **result,
        "filters": filters
    })

@app.route('/health')
def health():
    """Health check."""
    try:
        mirror = mirror_manager.get_active_mirror()
        status = "healthy"
    except Exception:
        mirror = "unavailable"
        status = "degraded"

    return jsonify({
        "status": status,
        "mirror": mirror,
        "cache_size": cache.size(),
        "browser": Config.BROWSER_IMPERSONATE,
        "uptime": "ok"
    })

@app.route('/mirrors')
def mirrors():
    """List available mirrors."""
    return jsonify({
        "mirrors": mirror_manager.get_mirrors(),
        "current": mirror_manager.get_active_mirror()
    })

@app.route('/cache/clear', methods=['POST'])
def clear_cache():
    """Clear cache."""
    cache.clear()
    mirror_manager.get_mirrors.cache_clear()  # Clear the LRU cache too
    return jsonify({"message": "Cache cleared", "size": 0})

# ============================================================================
# MAIN
# ============================================================================

if __name__ == "__main__":
    logger.info("=" * 70)
    logger.info("🚀 Anna's Archives API - Hugging Face Space Edition")
    logger.info("=" * 70)
    logger.info(f"Port: {Config.PORT}")
    logger.info(f"Browser: {Config.BROWSER_IMPERSONATE}")
    logger.info(f"Cache TTL: {Config.CACHE_TTL_MINUTES} minutes")
    logger.info("=" * 70)

    # Initialize mirror
    mirror_manager.get_active_mirror()

    app.run(host=Config.HOST, port=Config.PORT)
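# Deployment note: app.run() starts Flask's development server, which is fine
# for a demo Space. A WSGI server is sturdier under concurrent requests; a
# minimal sketch, assuming this file is saved as app.py:
#   gunicorn --bind 0.0.0.0:7860 --workers 1 --timeout 60 app:app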