# app.py — Hugging Face Space "aa" (author: Voxxium, revision ec152af, verified)
#!/usr/bin/env python3
"""
Anna's Archives API - Hugging Face Space Edition
Optimized for HF Free Tier (CPU-only, minimal resources)
"""
import os
import re
import time
import logging
from datetime import datetime, timedelta
from functools import lru_cache
from dataclasses import dataclass, asdict
from typing import Optional, Any
import html
from flask import Flask, jsonify, request, Response
from curl_cffi import requests
from bs4 import BeautifulSoup
import csv
import io
# ============================================================================
# CONFIGURATION
# ============================================================================
class Config:
    """Process-wide runtime settings, resolved once at import time."""

    # Server — Hugging Face Spaces routes external traffic to port 7860.
    PORT = int(os.getenv("PORT", 7860))
    HOST = "0.0.0.0"

    # Anna's Archives endpoints.
    MIRRORS_URL = "https://shadowlibraries.github.io/DirectDownloads/AnnasArchive/"
    DEFAULT_BASE_URL = "https://annas-archive.gs"

    # Browser fingerprint for curl_cffi's `impersonate=` parameter. Must be a
    # version the installed curl_cffi build supports (chrome99/100/101/104/
    # 107/110/116/119/120, edge99, edge101, safari15_3, safari15_5).
    BROWSER_IMPERSONATE = "chrome110"

    # Performance knobs, tuned for the free CPU tier.
    CACHE_TTL_MINUTES = 10
    REQUEST_TIMEOUT = 20
    MAX_RETRIES = 2
# Logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s [%(levelname)s] %(message)s'
)
logger = logging.getLogger(__name__)
# ============================================================================
# SIMPLE CACHE
# ============================================================================
class SimpleCache:
    """Minimal in-memory cache with per-entry TTL and a size cap.

    Entries older than *ttl_minutes* are dropped lazily on access. Before an
    insert that would grow past *max_items*, the entry with the oldest
    timestamp is evicted (one eviction per insert).
    """

    def __init__(self, ttl_minutes: int, max_items: int = 100):
        # key -> (value, insertion timestamp)
        self._cache: dict = {}
        self._ttl = timedelta(minutes=ttl_minutes)
        # Generalized from the previous hard-coded 100-item cap.
        self._max_items = max_items

    def get(self, key: str) -> Optional[Any]:
        """Return the cached value for *key*, or None if missing or expired.

        Expired entries are deleted as a side effect of the lookup.
        """
        entry = self._cache.get(key)
        if entry is None:
            return None
        value, timestamp = entry
        if datetime.now() - timestamp < self._ttl:
            return value
        # Stale: drop it so it does not linger until eviction.
        del self._cache[key]
        return None

    def set(self, key: str, value: Any) -> None:
        """Store *key* -> *value*, evicting the oldest entry when over capacity."""
        if len(self._cache) > self._max_items:
            oldest = min(self._cache.items(), key=lambda item: item[1][1])[0]
            del self._cache[oldest]
        self._cache[key] = (value, datetime.now())

    def clear(self) -> None:
        """Remove every entry."""
        self._cache.clear()

    def size(self) -> int:
        """Number of entries currently stored (expired ones included until read)."""
        return len(self._cache)
cache = SimpleCache(Config.CACHE_TTL_MINUTES)
# ============================================================================
# DATA MODELS
# ============================================================================
@dataclass
class Book:
    """One search result scraped from an Anna's Archive listing page."""

    md5: Optional[str]              # 32-char hex MD5, or None when not found on the page
    title: str
    author: str                     # "Unknown" when the author link is absent
    publisher: str                  # "Unknown" when the publisher link is absent
    year: Optional[int]
    format: str                     # e.g. "PDF", "EPUB"; "UNKNOWN" when unparsed
    language: str                   # two-letter code; "xx" when unparsed
    size_mb: float                  # 0.0 when the size string could not be parsed
    url: str                        # absolute detail-page URL
    cover_url: Optional[str] = None

    def to_dict(self):
        """Return a plain-dict view suitable for JSON/CSV serialization."""
        return asdict(self)
# ============================================================================
# UTILITIES
# ============================================================================
def clean_url(url: str) -> str:
    """Normalize *url*: drop params, query and fragment, plus any trailing slash."""
    from urllib.parse import urlparse, urlunparse

    if not url:
        return ""
    parts = urlparse(url)
    # Keep only scheme + host + path; everything after the path is discarded.
    bare_path = parts.path.rstrip('/')
    return urlunparse((parts.scheme, parts.netloc, bare_path, '', '', ''))
def clean_text(text: str) -> str:
    """Decode HTML entities, strip decorative emoji, and collapse whitespace."""
    if not text:
        return ""
    decoded = html.unescape(text)
    # The listing pages prefix author/publisher/format lines with these glyphs.
    decoded = re.sub(r'[👤🏢📘🚀]', '', decoded)
    collapsed = re.sub(r'\s+', ' ', decoded)
    return collapsed.strip()
def parse_size(size_str: str) -> float:
    """Convert a human-readable size string (e.g. "4.2MB", "900 KB") to MB.

    Returns 0.0 for empty or unrecognized input; results are rounded to
    two decimal places.
    """
    if not size_str:
        return 0.0
    m = re.search(r'([\d.]+)\s*([KMGT]?B)', size_str, re.I)
    if m is None:
        return 0.0
    value = float(m.group(1))
    unit = m.group(2).upper()
    to_mb = {
        'B': 1 / 1024 / 1024,
        'KB': 1 / 1024,
        'MB': 1,
        'GB': 1024,
        'TB': 1024 * 1024,
    }
    return round(value * to_mb.get(unit, 1), 2)
# ============================================================================
# MIRROR MANAGER
# ============================================================================
class MirrorManager:
    """Detects and selects a working Anna's Archive mirror.

    Mirror discovery scrapes the shadowlibraries mirror-list page once
    (memoized via lru_cache); the first mirror that answers HTTP 200 is then
    remembered for the lifetime of the process.
    """

    def __init__(self):
        # First mirror that responded with HTTP 200, or None until probed.
        self._current_mirror = None

    # NOTE(review): lru_cache on an instance method pins `self` for the
    # cache's lifetime (ruff B019). Acceptable here because exactly one
    # module-level instance exists, and the /cache/clear endpoint depends on
    # get_mirrors.cache_clear() being available.
    @lru_cache(maxsize=1)
    def get_mirrors(self) -> list[dict]:
        """Fetch the mirrors list (cached).

        Returns a list of {"label", "url"} dicts scraped from the mirror
        directory page, or [] on any network or parsing failure.
        """
        logger.info("Fetching mirrors...")
        try:
            resp = requests.get(
                Config.MIRRORS_URL,
                impersonate=Config.BROWSER_IMPERSONATE,
                timeout=Config.REQUEST_TIMEOUT
            )
            soup = BeautifulSoup(resp.text, "html.parser")
            # Mirror links live under <article class="book-article">
            # -> <h3 id="links"> -> the immediately following <ul>.
            article = soup.find("article", class_="book-article")
            if not article:
                return []
            heading = article.find("h3", id="links")
            if not heading:
                return []
            ul = heading.find_next_sibling("ul")
            if not ul:
                return []
            mirrors = []
            for li in ul.find_all("li"):
                a = li.find("a", href=True)
                if a:
                    mirrors.append({
                        "label": a.get_text(strip=True),
                        "url": clean_url(a["href"])
                    })
            logger.info(f"Found {len(mirrors)} mirrors")
            return mirrors
        except Exception as e:
            logger.error(f"Failed to fetch mirrors: {e}")
            return []

    def get_active_mirror(self) -> str:
        """Return the first working mirror URL.

        Probes each discovered mirror in order, caching the first that
        returns HTTP 200; falls back to Config.DEFAULT_BASE_URL when none
        respond. Subsequent calls return the cached choice without probing.
        """
        if self._current_mirror:
            return self._current_mirror
        mirrors = self.get_mirrors()
        for mirror in mirrors:
            try:
                logger.info(f"Testing mirror: {mirror['url']}")
                resp = requests.get(
                    mirror['url'],
                    impersonate=Config.BROWSER_IMPERSONATE,
                    timeout=10
                )
                if resp.status_code == 200:
                    self._current_mirror = mirror['url']
                    logger.info(f"✅ Active mirror: {self._current_mirror}")
                    return self._current_mirror
            except Exception as e:
                logger.warning(f"Mirror {mirror['url']} failed: {e}")
                continue
        logger.warning("No active mirror, using default")
        self._current_mirror = Config.DEFAULT_BASE_URL
        return self._current_mirror
mirror_manager = MirrorManager()
# ============================================================================
# SCRAPER
# ============================================================================
def scrape_search(query: str, page: int = 1, **filters) -> dict:
    """Scrape Anna's Archives search results for *query*.

    Results are cached per (query, page, filters) combination with the
    TTL configured on the module-level cache. Recognized filter keys are
    lang, content, ext and sort; falsy values are not forwarded. On any
    failure a result dict with an "error" key and an empty book list is
    returned instead of raising.
    """
    # Check cache
    cache_key = f"{query}_{page}_{filters}"
    cached = cache.get(cache_key)
    if cached:
        logger.info(f"Cache HIT: {query} (page {page})")
        return cached
    logger.info(f"Scraping: {query} (page {page})")
    base_url = mirror_manager.get_active_mirror()
    search_url = f"{base_url}/search"
    params = {"q": query}
    if page > 1:
        params["page"] = page
    # Add filters (only truthy values are forwarded to the upstream site)
    for key in ['lang', 'content', 'ext', 'sort']:
        if key in filters and filters[key]:
            params[key] = filters[key]
    try:
        resp = requests.get(
            search_url,
            params=params,
            impersonate=Config.BROWSER_IMPERSONATE,
            timeout=Config.REQUEST_TIMEOUT
        )
        resp.raise_for_status()
        # Parse results
        books = parse_books(resp.text, base_url)
        has_more = check_next_page(resp.text)
        result = {
            "books": [b.to_dict() for b in books],
            "total": len(books),
            "has_more": has_more,
            "timestamp": datetime.now().isoformat()
        }
        # Cache result
        cache.set(cache_key, result)
        logger.info(f"Found {len(books)} books")
        return result
    except Exception as e:
        logger.error(f"Scraping error: {e}")
        return {"books": [], "total": 0, "has_more": False, "error": str(e)}
def parse_books(html: str, base_url: str) -> list[Book]:
    """Parse book result blocks out of a search-results HTML page.

    Relative URLs (detail page and cover image) are made absolute against
    *base_url*. Blocks that fail to parse are logged and skipped, and
    duplicate MD5s are de-duplicated.

    NOTE(review): the `html` parameter shadows the stdlib `html` module
    imported at the top of the file; harmless here because this function
    never touches the module, but a rename would avoid confusion.
    """
    soup = BeautifulSoup(html, 'html.parser')
    books = []
    seen_md5s = set()  # listings can appear twice on one page
    # Find book blocks
    blocks = soup.find_all('div', class_=lambda x: x and 'flex pt-3 pb-3' in x)
    for block in blocks:
        try:
            # MD5 — first try the hidden metadata div ("md5:<hex>").
            md5 = None
            md5_div = block.find('div', class_='hidden')
            if md5_div:
                match = re.search(r'md5:([a-f0-9]{32})', md5_div.text)
                if match:
                    md5 = match.group(1)
            if md5 in seen_md5s:
                continue
            # Title & URL — a missing title link means this is not a book row.
            title_link = block.find('a', class_=['js-vim-focus', 'font-semibold'])
            if not title_link:
                continue
            title = clean_text(title_link.text)
            url = title_link.get('href', '')
            if url.startswith('/'):
                url = f"{base_url}{url}"
            # Extract MD5 from the /md5/<hex> URL if the hidden div lacked it.
            if not md5:
                match = re.search(r'/md5/([a-f0-9]{32})', url)
                if match:
                    md5 = match.group(1)
            if md5:
                seen_md5s.add(md5)
            # Cover image (optional).
            cover_img = block.find('img')
            cover_url = None
            if cover_img:
                cover_url = cover_img.get('src', '')
                if cover_url.startswith('/'):
                    cover_url = f"{base_url}{cover_url}"
            # Author — the search link whose markup mentions 'user-edit'.
            author = "Unknown"
            for link in block.find_all('a', href=re.compile(r'search\?q=')):
                if 'user-edit' in str(link):
                    author = clean_text(link.text)
                    break
            # Publisher & Year — the 'company' link, possibly "Name, YYYY".
            publisher = "Unknown"
            year = None
            for link in block.find_all('a', href=re.compile(r'search\?q=')):
                if 'company' in str(link):
                    pub_text = clean_text(link.text)
                    match = re.search(r'(.+),\s*(\d{4})$', pub_text)
                    if match:
                        publisher = match.group(1)
                        year = int(match.group(2))
                    else:
                        publisher = pub_text
                    break
            # Info line — grey metadata text like "[en] · PDF · 2020 · 4.2MB".
            info_div = block.find('div', class_=re.compile(r'text-gray-800'))
            info_text = info_div.text if info_div else ""
            # Parse format, language, size (each optional; fall back below).
            format_match = re.search(r'·\s*([A-Z0-9]+)\s*·', info_text)
            lang_match = re.search(r'\[([a-z]{2})\]', info_text)
            size_match = re.search(r'([\d.]+[KMGT]?B)', info_text)
            year_match = re.search(r'·\s*(\d{4})\s*·', info_text)
            book = Book(
                md5=md5,
                title=title,
                author=author,
                publisher=publisher,
                # Prefer the publisher-derived year; fall back to the info line.
                year=year or (int(year_match.group(1)) if year_match else None),
                format=format_match.group(1) if format_match else "UNKNOWN",
                language=lang_match.group(1) if lang_match else "xx",
                size_mb=parse_size(size_match.group(1)) if size_match else 0.0,
                url=url,
                cover_url=cover_url
            )
            books.append(book)
        except Exception as e:
            # One malformed block must not abort the whole page.
            logger.warning(f"Error parsing book: {e}")
            continue
    return books
def check_next_page(html: str) -> bool:
    """Return True when the results page contains a next-page link."""
    soup = BeautifulSoup(html, 'html.parser')
    pagination_link = soup.find('a', string=re.compile(r'Next|→|»'))
    return pagination_link is not None
# ============================================================================
# FLASK APP
# ============================================================================
app = Flask(__name__)
@app.route('/')
def index():
    """API Documentation — a self-describing JSON index of all endpoints."""
    return jsonify({
        "name": "Anna's Archives API",
        "version": "1.0.1",
        "description": "Hugging Face Space Edition - Optimized for free tier",
        "browser_impersonate": Config.BROWSER_IMPERSONATE,
        "endpoints": {
            "GET /": "This documentation",
            "GET /search": "Search books",
            "GET /health": "Health check",
            "GET /mirrors": "List mirrors",
            "POST /cache/clear": "Clear cache"
        },
        "examples": {
            "search": "/search?q=python",
            "with_filters": "/search?q=machine+learning&ext=pdf&lang=en",
            "pagination": "/search?q=python&page=2",
            "csv_export": "/search?q=python&format=csv"
        },
        "search_params": {
            "q": "Search query (required)",
            "page": "Page number (default: 1)",
            "lang": "Language code (en, fr, es, etc.)",
            "ext": "File extension (pdf, epub, mobi, etc.)",
            "content": "Content type (book_fiction, book_nonfiction, etc.)",
            "sort": "Sort order (newest, oldest, largest, smallest)",
            "format": "Response format (json, csv)"
        }
    })
@app.route('/search')
def search():
    """Search endpoint.

    Query params: q (required), page, lang, ext, content, sort, and
    format=csv to download the results as CSV instead of JSON.
    Returns 400 on a missing query or non-numeric page.
    """
    query = request.args.get('q', '').strip()
    if not query:
        return jsonify({"error": "Parameter 'q' is required"}), 400
    try:
        page = max(1, int(request.args.get('page', 1)))
    except ValueError:
        return jsonify({"error": "Invalid page number"}), 400
    filters = {
        'lang': request.args.get('lang'),
        'ext': request.args.get('ext'),
        'content': request.args.get('content'),
        'sort': request.args.get('sort')
    }
    result = scrape_search(query, page, **filters)
    # CSV export
    if request.args.get('format') == 'csv':
        output = io.StringIO()
        if result['books']:
            writer = csv.DictWriter(output, fieldnames=result['books'][0].keys())
            writer.writeheader()
            writer.writerows(result['books'])
        # Sanitize the user-supplied query before embedding it in the
        # Content-Disposition header: the raw value could contain quotes,
        # newlines or separators (header injection / broken filenames).
        safe_name = re.sub(r'[^\w\-]+', '_', query) or 'search'
        return Response(
            output.getvalue(),
            mimetype='text/csv',
            headers={'Content-Disposition': f'attachment; filename=search_{safe_name}.csv'}
        )
    return jsonify({
        "query": query,
        "page": page,
        **result,
        "filters": filters
    })
@app.route('/health')
def health():
    """Health check: reports mirror availability, cache size and config."""
    try:
        mirror = mirror_manager.get_active_mirror()
        status = "healthy"
    except Exception as e:
        # Was a bare `except:`, which also swallowed SystemExit and
        # KeyboardInterrupt; narrowed and logged for diagnosability.
        logger.warning(f"Health check could not resolve a mirror: {e}")
        mirror = "unavailable"
        status = "degraded"
    return jsonify({
        "status": status,
        "mirror": mirror,
        "cache_size": cache.size(),
        "browser": Config.BROWSER_IMPERSONATE,
        "uptime": "ok"
    })
@app.route('/mirrors')
def mirrors():
    """List available mirrors and the currently selected one."""
    payload = {
        "mirrors": mirror_manager.get_mirrors(),
        "current": mirror_manager.get_active_mirror(),
    }
    return jsonify(payload)
@app.route('/cache/clear', methods=['POST'])
def clear_cache():
    """Clear both the result cache and the memoized mirror list."""
    cache.clear()
    # The mirror list lives in a separate lru_cache; reset it as well so the
    # next request re-discovers mirrors.
    mirror_manager.get_mirrors.cache_clear()
    return jsonify({"message": "Cache cleared", "size": 0})
# ============================================================================
# MAIN
# ============================================================================
if __name__ == "__main__":
    banner = "=" * 70
    logger.info(banner)
    logger.info("🚀 Anna's Archives API - Hugging Face Space Edition")
    logger.info(banner)
    logger.info(f"Port: {Config.PORT}")
    logger.info(f"Browser: {Config.BROWSER_IMPERSONATE}")
    logger.info(f"Cache TTL: {Config.CACHE_TTL_MINUTES} minutes")
    logger.info(banner)
    # Warm up mirror selection before the server starts accepting requests.
    mirror_manager.get_active_mirror()
    app.run(host=Config.HOST, port=Config.PORT)