"""
Anna's Archive API - Hugging Face Space Edition
Optimized for the HF free tier (CPU-only, minimal resources)
"""

import csv
import html
import io
import logging
import os
import re
import time
from dataclasses import dataclass, asdict
from datetime import datetime, timedelta
from functools import lru_cache
from typing import Optional, Any
from urllib.parse import urlparse, urlunparse

from flask import Flask, jsonify, request, Response
from curl_cffi import requests
from bs4 import BeautifulSoup


# ===== Configuration =====

class Config:
    PORT = int(os.getenv("PORT", 7860))
    HOST = "0.0.0.0"

    MIRRORS_URL = "https://shadowlibraries.github.io/DirectDownloads/AnnasArchive/"
    DEFAULT_BASE_URL = "https://annas-archive.gs"

    BROWSER_IMPERSONATE = "chrome110"

    CACHE_TTL_MINUTES = 10
    REQUEST_TIMEOUT = 20
    MAX_RETRIES = 2


logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s [%(levelname)s] %(message)s'
)
logger = logging.getLogger(__name__)


# ===== Cache =====

class SimpleCache:
    """Minimal in-memory cache with TTL expiry."""

    def __init__(self, ttl_minutes: int):
        self._cache = {}
        self._ttl = timedelta(minutes=ttl_minutes)

    def get(self, key: str) -> Optional[Any]:
        if key in self._cache:
            value, timestamp = self._cache[key]
            if datetime.now() - timestamp < self._ttl:
                return value
            del self._cache[key]
        return None

    def set(self, key: str, value: Any):
        # Keep memory bounded on the free tier: once the cache holds more
        # than 100 entries, evict the oldest one first.
        if len(self._cache) > 100:
            oldest = min(self._cache.items(), key=lambda x: x[1][1])[0]
            del self._cache[oldest]
        self._cache[key] = (value, datetime.now())

    def clear(self):
        self._cache.clear()

    def size(self):
        return len(self._cache)


cache = SimpleCache(Config.CACHE_TTL_MINUTES)
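
# Usage sketch (illustrative keys and values, not called anywhere below):
#   cache.set("search:python:1", {"books": []})
#   cache.get("search:python:1")  # -> {"books": []} until the TTL lapses
#   cache.get("missing")          # -> None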

# ===== Models =====


@dataclass
class Book:
    md5: Optional[str]
    title: str
    author: str
    publisher: str
    year: Optional[int]
    format: str
    language: str
    size_mb: float
    url: str
    cover_url: Optional[str] = None

    def to_dict(self):
        return asdict(self)
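
# Note: asdict() returns a plain dict, so to_dict() output feeds straight
# into jsonify() and csv.DictWriter without further conversion.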

# ===== Utilities =====


def clean_url(url: str) -> str:
    """Strip query string, fragment, and trailing slash from a URL."""
    if not url:
        return ""
    parsed = urlparse(url)
    return urlunparse((parsed.scheme, parsed.netloc, parsed.path.rstrip('/'), '', '', ''))
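
# e.g. clean_url("https://example.org/path/?ref=x#top")
#      -> "https://example.org/path"  (example.org is a placeholder domain)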

def clean_text(text: str) -> str:
    """Unescape HTML entities, drop decorative emoji, and collapse whitespace."""
    if not text:
        return ""
    text = html.unescape(text)
    text = re.sub(r'[👤🏢📘🚀]', '', text)
    text = re.sub(r'\s+', ' ', text)
    return text.strip()
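
# e.g. clean_text("  Tom &amp; Jerry  📘 ") -> "Tom & Jerry"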

def parse_size(size_str: str) -> float:
    """Convert a human-readable size string to megabytes."""
    if not size_str:
        return 0.0

    match = re.search(r'([\d.]+)\s*([KMGT]?B)', size_str, re.I)
    if not match:
        return 0.0

    num = float(match.group(1))
    unit = match.group(2).upper()

    multipliers = {'B': 1/1024/1024, 'KB': 1/1024, 'MB': 1, 'GB': 1024, 'TB': 1024*1024}
    return round(num * multipliers.get(unit, 1), 2)
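
# e.g. parse_size("3.5 MB") -> 3.5, parse_size("700KB") -> 0.68,
#      parse_size("1.2GB") -> 1228.8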

# ===== Mirrors =====


class MirrorManager:
    """Handles mirror discovery and selection."""

    def __init__(self):
        self._current_mirror = None

    # lru_cache on a method is safe here because only a single module-level
    # instance is ever created; it is cleared via get_mirrors.cache_clear().
    @lru_cache(maxsize=1)
    def get_mirrors(self) -> list[dict]:
        """Fetch the mirror list (memoized for the process lifetime)."""
        logger.info("Fetching mirrors...")
        try:
            resp = requests.get(
                Config.MIRRORS_URL,
                impersonate=Config.BROWSER_IMPERSONATE,
                timeout=Config.REQUEST_TIMEOUT
            )

            soup = BeautifulSoup(resp.text, "html.parser")
            article = soup.find("article", class_="book-article")
            if not article:
                return []

            heading = article.find("h3", id="links")
            if not heading:
                return []

            ul = heading.find_next_sibling("ul")
            if not ul:
                return []

            mirrors = []
            for li in ul.find_all("li"):
                a = li.find("a", href=True)
                if a:
                    mirrors.append({
                        "label": a.get_text(strip=True),
                        "url": clean_url(a["href"])
                    })

            logger.info(f"Found {len(mirrors)} mirrors")
            return mirrors

        except Exception as e:
            logger.error(f"Failed to fetch mirrors: {e}")
            return []

    def get_active_mirror(self) -> str:
        """Return the first mirror that responds, falling back to the default."""
        if self._current_mirror:
            return self._current_mirror

        mirrors = self.get_mirrors()

        for mirror in mirrors:
            try:
                logger.info(f"Testing mirror: {mirror['url']}")
                resp = requests.get(
                    mirror['url'],
                    impersonate=Config.BROWSER_IMPERSONATE,
                    timeout=10
                )
                if resp.status_code == 200:
                    self._current_mirror = mirror['url']
                    logger.info(f"✅ Active mirror: {self._current_mirror}")
                    return self._current_mirror
            except Exception as e:
                logger.warning(f"Mirror {mirror['url']} failed: {e}")
                continue

        logger.warning("No active mirror, using default")
        self._current_mirror = Config.DEFAULT_BASE_URL
        return self._current_mirror


mirror_manager = MirrorManager()
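
# Config.MAX_RETRIES is defined but never applied above. A minimal sketch of
# how it could be wired in, assuming transient network errors are worth a
# retry pass (get_with_retries is a hypothetical helper, not used by the
# endpoints below):
#
# def get_with_retries(url: str, **kwargs):
#     last_exc = None
#     for attempt in range(Config.MAX_RETRIES + 1):
#         try:
#             return requests.get(url, impersonate=Config.BROWSER_IMPERSONATE,
#                                 timeout=Config.REQUEST_TIMEOUT, **kwargs)
#         except Exception as exc:
#             last_exc = exc
#             time.sleep(2 ** attempt)  # simple exponential backoff
#     raise last_exc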

# ===== Scraping =====


def scrape_search(query: str, page: int = 1, **filters) -> dict:
    """Scrape Anna's Archive search results."""

    cache_key = f"{query}_{page}_{filters}"
    cached = cache.get(cache_key)
    if cached:
        logger.info(f"Cache HIT: {query} (page {page})")
        return cached

    logger.info(f"Scraping: {query} (page {page})")

    base_url = mirror_manager.get_active_mirror()
    search_url = f"{base_url}/search"

    params = {"q": query}
    if page > 1:
        params["page"] = page

    # Forward only the supported filters that were actually provided
    for key in ['lang', 'content', 'ext', 'sort']:
        if key in filters and filters[key]:
            params[key] = filters[key]

    try:
        resp = requests.get(
            search_url,
            params=params,
            impersonate=Config.BROWSER_IMPERSONATE,
            timeout=Config.REQUEST_TIMEOUT
        )
        resp.raise_for_status()

        books = parse_books(resp.text, base_url)
        has_more = check_next_page(resp.text)

        result = {
            "books": [b.to_dict() for b in books],
            "total": len(books),
            "has_more": has_more,
            "timestamp": datetime.now().isoformat()
        }

        cache.set(cache_key, result)

        logger.info(f"Found {len(books)} books")
        return result

    except Exception as e:
        logger.error(f"Scraping error: {e}")
        return {"books": [], "total": 0, "has_more": False, "error": str(e)}

def parse_books(html_text: str, base_url: str) -> list[Book]:
    """Parse book blocks out of a search-results page."""
    soup = BeautifulSoup(html_text, 'html.parser')
    books = []
    seen_md5s = set()

    blocks = soup.find_all('div', class_=lambda x: x and 'flex pt-3 pb-3' in x)

    for block in blocks:
        try:
            # MD5 is often embedded in a hidden div
            md5 = None
            md5_div = block.find('div', class_='hidden')
            if md5_div:
                match = re.search(r'md5:([a-f0-9]{32})', md5_div.text)
                if match:
                    md5 = match.group(1)

            if md5 in seen_md5s:
                continue

            # Title and detail-page URL
            title_link = block.find('a', class_=['js-vim-focus', 'font-semibold'])
            if not title_link:
                continue

            title = clean_text(title_link.text)
            url = title_link.get('href', '')
            if url.startswith('/'):
                url = f"{base_url}{url}"

            # Fall back to extracting the MD5 from the detail URL
            if not md5:
                match = re.search(r'/md5/([a-f0-9]{32})', url)
                if match:
                    md5 = match.group(1)

            if md5:
                seen_md5s.add(md5)

            # Cover image
            cover_img = block.find('img')
            cover_url = None
            if cover_img:
                cover_url = cover_img.get('src', '')
                if cover_url.startswith('/'):
                    cover_url = f"{base_url}{cover_url}"

            # Author link
            author = "Unknown"
            for link in block.find_all('a', href=re.compile(r'search\?q=')):
                if 'user-edit' in str(link):
                    author = clean_text(link.text)
                    break

            # Publisher link, sometimes formatted as "Publisher, YYYY"
            publisher = "Unknown"
            year = None
            for link in block.find_all('a', href=re.compile(r'search\?q=')):
                if 'company' in str(link):
                    pub_text = clean_text(link.text)
                    match = re.search(r'(.+),\s*(\d{4})$', pub_text)
                    if match:
                        publisher = match.group(1)
                        year = int(match.group(2))
                    else:
                        publisher = pub_text
                    break

            # Metadata line: language, format, size, year
            info_div = block.find('div', class_=re.compile(r'text-gray-800'))
            info_text = info_div.text if info_div else ""

            format_match = re.search(r'·\s*([A-Z0-9]+)\s*·', info_text)
            lang_match = re.search(r'\[([a-z]{2})\]', info_text)
            size_match = re.search(r'([\d.]+[KMGT]?B)', info_text)
            year_match = re.search(r'·\s*(\d{4})\s*·', info_text)

            book = Book(
                md5=md5,
                title=title,
                author=author,
                publisher=publisher,
                year=year or (int(year_match.group(1)) if year_match else None),
                format=format_match.group(1) if format_match else "UNKNOWN",
                language=lang_match.group(1) if lang_match else "xx",
                size_mb=parse_size(size_match.group(1)) if size_match else 0.0,
                url=url,
                cover_url=cover_url
            )

            books.append(book)

        except Exception as e:
            logger.warning(f"Error parsing book: {e}")
            continue

    return books

def check_next_page(html_text: str) -> bool:
    """Return True if the results page links to a next page."""
    soup = BeautifulSoup(html_text, 'html.parser')
    return soup.find('a', string=re.compile(r'Next|→|»')) is not None


# ===== Flask application =====

app = Flask(__name__)


@app.route('/')
def index():
    """API documentation"""
    return jsonify({
        "name": "Anna's Archive API",
        "version": "1.0.1",
        "description": "Hugging Face Space Edition - Optimized for free tier",
        "browser_impersonate": Config.BROWSER_IMPERSONATE,
        "endpoints": {
            "GET /": "This documentation",
            "GET /search": "Search books",
            "GET /health": "Health check",
            "GET /mirrors": "List mirrors",
            "POST /cache/clear": "Clear cache"
        },
        "examples": {
            "search": "/search?q=python",
            "with_filters": "/search?q=machine+learning&ext=pdf&lang=en",
            "pagination": "/search?q=python&page=2",
            "csv_export": "/search?q=python&format=csv"
        },
        "search_params": {
            "q": "Search query (required)",
            "page": "Page number (default: 1)",
            "lang": "Language code (en, fr, es, etc.)",
            "ext": "File extension (pdf, epub, mobi, etc.)",
            "content": "Content type (book_fiction, book_nonfiction, etc.)",
            "sort": "Sort order (newest, oldest, largest, smallest)",
            "format": "Response format (json, csv)"
        }
    })

@app.route('/search')
def search():
    """Search endpoint"""
    query = request.args.get('q', '').strip()
    if not query:
        return jsonify({"error": "Parameter 'q' is required"}), 400

    try:
        page = max(1, int(request.args.get('page', 1)))
    except ValueError:
        return jsonify({"error": "Invalid page number"}), 400

    filters = {
        'lang': request.args.get('lang'),
        'ext': request.args.get('ext'),
        'content': request.args.get('content'),
        'sort': request.args.get('sort')
    }

    result = scrape_search(query, page, **filters)

    # Optional CSV export; sanitize the query so it is safe in a filename header
    if request.args.get('format') == 'csv':
        output = io.StringIO()
        if result['books']:
            writer = csv.DictWriter(output, fieldnames=result['books'][0].keys())
            writer.writeheader()
            writer.writerows(result['books'])

        safe_name = re.sub(r'[^\w-]+', '_', query)
        return Response(
            output.getvalue(),
            mimetype='text/csv',
            headers={'Content-Disposition': f'attachment; filename=search_{safe_name}.csv'}
        )

    return jsonify({
        "query": query,
        "page": page,
        **result,
        "filters": filters
    })

@app.route('/health')
def health():
    """Health check"""
    try:
        mirror = mirror_manager.get_active_mirror()
        status = "healthy"
    except Exception:
        mirror = "unavailable"
        status = "degraded"

    return jsonify({
        "status": status,
        "mirror": mirror,
        "cache_size": cache.size(),
        "browser": Config.BROWSER_IMPERSONATE,
        "uptime": "ok"
    })

@app.route('/mirrors')
def mirrors():
    """List available mirrors"""
    return jsonify({
        "mirrors": mirror_manager.get_mirrors(),
        "current": mirror_manager.get_active_mirror()
    })


@app.route('/cache/clear', methods=['POST'])
def clear_cache():
    """Clear the response cache and the memoized mirror list"""
    cache.clear()
    mirror_manager.get_mirrors.cache_clear()
    return jsonify({"message": "Cache cleared", "size": 0})


# ===== Entry point =====

if __name__ == "__main__":
    logger.info("=" * 70)
    logger.info("🚀 Anna's Archive API - Hugging Face Space Edition")
    logger.info("=" * 70)
    logger.info(f"Port: {Config.PORT}")
    logger.info(f"Browser: {Config.BROWSER_IMPERSONATE}")
    logger.info(f"Cache TTL: {Config.CACHE_TTL_MINUTES} minutes")
    logger.info("=" * 70)

    # Warm up: pick a working mirror before serving requests
    mirror_manager.get_active_mirror()

    app.run(host=Config.HOST, port=Config.PORT)
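
# Example requests once the server is running (illustrative; localhost and
# port 7860 assume the default Config above):
#   curl "http://localhost:7860/search?q=python&ext=pdf&lang=en"
#   curl "http://localhost:7860/search?q=python&format=csv" -o results.csv
#   curl -X POST "http://localhost:7860/cache/clear"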