""" pcn.py — Module ENT Paris Classe Numérique (sans CLI, pour import API) """ from __future__ import annotations import hashlib import html as html_mod import json import logging import mimetypes import os import re import sqlite3 import time import random from collections import defaultdict from dataclasses import dataclass, field from datetime import datetime, timezone, timedelta from html.parser import HTMLParser from pathlib import Path from typing import Any, Optional from urllib.parse import urlparse, unquote import requests try: import cloudscraper except ImportError: cloudscraper = None _log = logging.getLogger("pcn") # ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ # Config # ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ @dataclass class Config: base_url: str = “https://ent.parisclassenumerique.fr” login: str = “” password: str = “” hours_back: int = 24 fetch_body: bool = True fetch_attachments: bool = False attachments_dir: Path = field(default_factory=lambda: Path("/tmp/pcn_pj")) max_notif_pages: int = 50 max_msg_pages: int = 30 msg_page_size: int = 50 db_path: Path = field(default_factory=lambda: Path("/tmp/pcn_cache.db")) dry_run: bool = False notif_types: list[str] = field(default_factory=lambda: [ "MESSAGERIE", "BLOG", "ACTUALITES", "EXERCIZER", "COMMUNITIES", "WIKI", "SCRAPBOOK", "TIMELINEGENERATOR", ]) # ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ # Data Models # ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ @dataclass class Attachment: url: str filename: str size_bytes: int = 0 content_type: str = “” local_path: Optional[str] = None downloaded: bool = False source: str = “” sha256: Optional[str] = None @dataclass class Message: id: str date: str sender: str role: str subject: str body: str = “” has_attachments: bool = False attachments: list[Attachment] = field(default_factory=list) @dataclass class Notification: date: str type: str sender: str subject: str preview: str = “” @dataclass class Report: generated_at: str user: str hours_back: int notifications: list[Notification] = field(default_factory=list) messages: list[Message] = field(default_factory=list) stats: dict = field(default_factory=dict) # ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ # Utilities # ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ def _pause(lo=0.5, hi=1.5): time.sleep(random.uniform(lo, hi)) _MIME_EXT = { "image/jpeg": ".jpg", "image/png": ".png", "image/gif": ".gif", "application/pdf": ".pdf", "application/vnd.openxmlformats-officedocument.wordprocessingml.document": ".docx", "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet": ".xlsx", "application/vnd.openxmlformats-officedocument.presentationml.presentation": ".pptx", "application/msword": ".doc", "application/zip": ".zip", "text/plain": ".txt", "audio/mpeg": ".mp3", "video/mp4": ".mp4", } _FILE_EXTS = frozenset( “.pdf .doc .docx .xls .xlsx .ppt .pptx .odt .ods .odp .rtf .txt .csv ” “.jpg .jpeg .png .gif .bmp .svg .webp .mp3 .mp4 .avi .mkv .mov .wav ” ".zip .rar .7z .html .epub".split() ) _ENT_FILE_PATTERNS = ( "/workspace/document/", "/workspace/pub/document/", "/workspace/pub/", "/conversation/api/messages/", "/infra/file/", "/blog/pub/", ) def _safe_name(name: str, maxlen: int = 200) -> str: name = re.sub(r'[\\/*?:"<>|\x00-\x1f]', "_", name) return (name.strip(". 
") or "fichier")[:maxlen] def _resolve_filename(resp: requests.Response, hint: str) -> str: cd = resp.headers.get("Content-Disposition", "") m = re.search(r"filename\*\s*=\s*(?:UTF-8|utf-8)''([^;\s]+)", cd, re.I) if m: return unquote(m.group(1)) m = re.search(r'filename="([^"]+)"', cd, re.I) if m: return m.group(1).strip() name = hint or “fichier” if not Path(name).suffix: ct = resp.headers.get("Content-Type", "").split(";")[0].strip().lower() ext = _MIME_EXT.get(ct, "") or (mimetypes.guess_extension(ct) or "") if ext: name += ext return name # ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ # HTML parsers # ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ class _TextExtractor(HTMLParser): _BLOCK = frozenset("p div br h1 h2 h3 h4 h5 h6 li tr blockquote pre hr".split()) _SKIP = frozenset("script style head".split()) def __init__(self): super().__init__() self._buf, self._skip = [], 0 def handle_starttag(self, tag, _): t = tag.lower() if t in self._SKIP: self._skip += 1 elif t in self._BLOCK: self._buf.append("\n") def handle_endtag(self, tag): t = tag.lower() if t in self._SKIP: self._skip = max(0, self._skip - 1) elif t in self._BLOCK: self._buf.append("\n") def handle_data(self, data): if not self._skip: self._buf.append(data) def handle_entityref(self, name): self._buf.append(html_mod.unescape(f"&{name};")) def handle_charref(self, name): self._buf.append(html_mod.unescape(f"&#{name};")) def text(self): t = "".join(self._buf) t = re.sub(r"[ \t]+", " ", t) t = re.sub(r"\n{3,}", "\n\n", t) return t.strip() def html_to_text(raw: str) -> str: if not raw: return “” p = _TextExtractor() try: p.feed(raw) return p.text() except Exception: t = re.sub(r"", "\n", raw, flags=re.I) t = re.sub(r"", "\n", t, flags=re.I) return re.sub(r"<[^>]+>", "", t).strip() class _ResourceExtractor(HTMLParser): _URL_ATTRS = frozenset("href src data-src data-document-href data-download-url poster data-uri data-href".split()) def __init__(self, base: str): super().__init__() self.base = base self._host = urlparse(base).netloc self.found: list[dict] = [] self._seen: set[str] = set() def _norm(self, url): url = url.strip() if url.startswith("//"): return "https:" + url if url.startswith("/"): return self.base + url return url def _same_domain(self, url): h = urlparse(url).netloc return not h or h == self._host def _looks_like_file(self, url): path = urlparse(url).path.lower() if any(p in path for p in _ENT_FILE_PATTERNS): return True _, ext = os.path.splitext(path) return ext in _FILE_EXTS def _add(self, url, filename, source): url = self._norm(url) if url in self._seen or not self._same_domain(url) or not self._looks_like_file(url): return self._seen.add(url) self.found.append({"url": url, "filename": filename or "fichier", "source": source}) def _best_name(self, attrs, url): for a in ("data-filename", "title", "alt", "download"): v = attrs.get(a) if v and isinstance(v, str) and v.strip(): return v.strip() return unquote(urlparse(url).path.rstrip("/").split("/")[-1]) or “fichier” def handle_starttag(self, tag, attrs): ad = dict(attrs) tl = tag.lower() did = (ad.get("data-document-id") or "").strip() if did: url = f"{self.base}/workspace/document/{did}" self._add(url, self._best_name(ad, url), f"data-document-id:{tl}") for attr in self._URL_ATTRS: val = ad.get(attr) if not val or not isinstance(val, str): continue val = val.strip() if val.startswith(("data:", "javascript:", "mailto:", "#")): continue self._add(val, self._best_name(ad, val), f"{attr}:{tl}") if tl == "object": val = 


class _ResourceExtractor(HTMLParser):
    """Collects downloadable resource URLs (attachments, workspace documents) from HTML."""

    _URL_ATTRS = frozenset(
        "href src data-src data-document-href data-download-url poster data-uri data-href".split()
    )

    def __init__(self, base: str):
        super().__init__()
        self.base = base
        self._host = urlparse(base).netloc
        self.found: list[dict] = []
        self._seen: set[str] = set()

    def _norm(self, url):
        url = url.strip()
        if url.startswith("//"):
            return "https:" + url
        if url.startswith("/"):
            return self.base + url
        return url

    def _same_domain(self, url):
        h = urlparse(url).netloc
        return not h or h == self._host

    def _looks_like_file(self, url):
        path = urlparse(url).path.lower()
        if any(p in path for p in _ENT_FILE_PATTERNS):
            return True
        _, ext = os.path.splitext(path)
        return ext in _FILE_EXTS

    def _add(self, url, filename, source):
        url = self._norm(url)
        if url in self._seen or not self._same_domain(url) or not self._looks_like_file(url):
            return
        self._seen.add(url)
        self.found.append({"url": url, "filename": filename or "fichier", "source": source})

    def _best_name(self, attrs, url):
        for a in ("data-filename", "title", "alt", "download"):
            v = attrs.get(a)
            if v and isinstance(v, str) and v.strip():
                return v.strip()
        return unquote(urlparse(url).path.rstrip("/").split("/")[-1]) or "fichier"

    def handle_starttag(self, tag, attrs):
        ad = dict(attrs)
        tl = tag.lower()
        did = (ad.get("data-document-id") or "").strip()
        if did:
            url = f"{self.base}/workspace/document/{did}"
            self._add(url, self._best_name(ad, url), f"data-document-id:{tl}")
        for attr in self._URL_ATTRS:
            val = ad.get(attr)
            if not val or not isinstance(val, str):
                continue
            val = val.strip()
            if val.startswith(("data:", "javascript:", "mailto:", "#")):
                continue
            self._add(val, self._best_name(ad, val), f"{attr}:{tl}")
        if tl == "object":
            val = ad.get("data")
            if val and isinstance(val, str) and not val.strip().startswith(("data:", "javascript:")):
                self._add(val.strip(), self._best_name(ad, val.strip()), f"data:{tl}")
        style = ad.get("style") or ""
        if style and isinstance(style, str):
            for m in re.finditer(r"url\(['\"]?([^'\")\s]+)['\"]?\)", style):
                self._add(m.group(1), "style_resource", f"style:{tl}")


def extract_resources(html_str: str, base: str) -> list[dict]:
    if not html_str:
        return []
    resources, seen = [], set()
    ex = _ResourceExtractor(base)
    try:
        ex.feed(html_str)
    except Exception:
        pass
    for r in ex.found:
        if r["url"] not in seen:
            seen.add(r["url"])
            resources.append(r)
    # Second pass: catch workspace document links that appear in raw text
    # rather than in tag attributes.
    for m in re.finditer(r"(/workspace/(?:pub/)?document/[a-f0-9-]+(?:/[^\s\"'<>]*)?)", html_str):
        url = base + m.group(1)
        if url not in seen:
            seen.add(url)
            fn = unquote(urlparse(url).path.rstrip("/").split("/")[-1])
            resources.append({"url": url, "filename": fn or "workspace_doc", "source": "regex"})
    return resources

# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
# SQLite cache
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━

class Cache:
    """Tiny SQLite store that deduplicates downloads and remembers seen messages."""

    def __init__(self, path: Path):
        self._path = path
        self._conn: Optional[sqlite3.Connection] = None

    def _db(self):
        if self._conn is None:
            self._conn = sqlite3.connect(str(self._path))
            self._conn.execute("PRAGMA journal_mode=WAL")
            self._conn.executescript("""
                CREATE TABLE IF NOT EXISTS downloads
                    (url TEXT PRIMARY KEY, filename TEXT, local_path TEXT,
                     sha256 TEXT, size_bytes INTEGER, ts TEXT);
                CREATE TABLE IF NOT EXISTS messages
                    (id TEXT PRIMARY KEY, ts TEXT, subject TEXT, sender TEXT);
            """)
        return self._conn

    def already(self, url):
        """Return the cached local path for url, or None if unknown or deleted on disk."""
        r = self._db().execute("SELECT local_path FROM downloads WHERE url=?", (url,)).fetchone()
        return r[0] if r and r[0] and Path(r[0]).exists() else None

    def save(self, url, fn, lp, h, sz):
        self._db().execute(
            "INSERT OR REPLACE INTO downloads VALUES (?,?,?,?,?,?)",
            (url, fn, lp, h, sz, datetime.now(timezone.utc).isoformat()),
        )
        self._db().commit()

    def mark_msg(self, mid, subj, sender):
        self._db().execute(
            "INSERT OR REPLACE INTO messages VALUES (?,?,?,?)",
            (mid, datetime.now(timezone.utc).isoformat(), subj, sender),
        )
        self._db().commit()

    def close(self):
        if self._conn:
            self._conn.close()
            self._conn = None
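
# Usage sketch for the cache (illustrative; the path is a placeholder):
#
#     cache = Cache(Path("/tmp/pcn_cache.db"))
#     local = cache.already(url)        # path if already on disk, else None
#     if local is None:
#         ...download, then: cache.save(url, name, local_path, sha256_hex, size)
#     cache.close()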

# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
# Smart Session
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━

_CF_MARKERS = ("cf-browser-verification", "challenge-platform", "cf-challenge", "Just a moment")
_HEADERS = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:128.0) Gecko/20100101 Firefox/128.0",
    "Accept-Language": "fr-FR,fr;q=0.9",
    "DNT": "1",
}


class SmartSession:
    """requests.Session wrapper with retries and a transparent cloudscraper upgrade."""

    MAX_RETRIES = 3
    BACKOFF = 2.0

    def __init__(self):
        self._s = requests.Session()
        self._s.headers.update(_HEADERS)
        self._upgraded = False

    @property
    def is_cloudscraper(self):
        return self._upgraded

    @property
    def cookies(self):
        return self._s.cookies

    @property
    def headers(self):
        return self._s.headers

    def _cf_blocked(self, r):
        if self._upgraded or r.status_code not in (403, 503):
            return False
        return any(m in r.text[:4000] for m in _CF_MARKERS)

    def _upgrade(self):
        """Swap the plain session for a cloudscraper one, carrying cookies over."""
        if self._upgraded:
            return
        if cloudscraper is None:
            return
        _log.warning("Cloudflare detected → cloudscraper")
        old = dict(self._s.cookies)
        self._s = cloudscraper.create_scraper(
            browser={"browser": "firefox", "platform": "windows", "mobile": False}
        )
        self._s.headers.update(_HEADERS)
        self._s.cookies.update(old)
        self._upgraded = True

    def _do(self, method, url, **kw):
        kw.setdefault("timeout", 30)
        last_exc = None
        r = None
        for attempt in range(self.MAX_RETRIES):
            try:
                r = getattr(self._s, method)(url, **kw)
                if self._cf_blocked(r):
                    self._upgrade()
                    r = getattr(self._s, method)(url, **kw)
                if r.status_code == 429:
                    time.sleep(float(r.headers.get("Retry-After", 10)))
                    continue
                if r.status_code >= 500 and attempt < self.MAX_RETRIES - 1:
                    time.sleep(self.BACKOFF ** attempt)
                    continue
                return r
            except (requests.ConnectionError, requests.Timeout) as exc:
                last_exc = exc
                if attempt < self.MAX_RETRIES - 1:
                    time.sleep(self.BACKOFF ** (attempt + 1))
        if last_exc:
            raise last_exc
        return r

    def get(self, url, **kw):
        return self._do("get", url, **kw)

    def post(self, url, **kw):
        return self._do("post", url, **kw)

# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
# ENT Client
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━

class ENTClient:
    def __init__(self, cfg: Config):
        self.cfg = cfg
        self.s = SmartSession()
        self.cache = Cache(cfg.db_path)
        self.user: dict = {}
        self.stats: dict[str, int] = defaultdict(int)

    def _xhr(self, ref=None):
        """Headers for XHR-style API calls (CSRF token + JSON accept)."""
        return {
            "X-XSRF-TOKEN": self.s.cookies.get("XSRF-TOKEN", ""),
            "Accept": "application/json, text/plain, */*",
            "Referer": ref or f"{self.cfg.base_url}/conversation/conversation",
        }

    def _api(self, path, params=None, ref=None):
        r = self.s.get(f"{self.cfg.base_url}{path}", params=params, headers=self._xhr(ref), timeout=20)
        self.stats["api"] += 1
        if r.status_code != 200:
            return None
        try:
            return r.json()
        except Exception:
            return None

    def login(self):
        _log.info("Connecting to PCN…")
        self.s.get(f"{self.cfg.base_url}/auth/login", timeout=30)
        _pause(1.0, 2.0)
        xsrf = self.s.cookies.get("XSRF-TOKEN", "")
        self.s.post(
            f"{self.cfg.base_url}/auth/login",
            data={"email": self.cfg.login, "password": self.cfg.password},
            headers={
                "X-XSRF-TOKEN": xsrf,
                "Content-Type": "application/x-www-form-urlencoded",
                "Origin": self.cfg.base_url,
            },
            timeout=30,
            allow_redirects=True,
        )
        _pause(1.5, 2.5)
        if self.s.cookies.get("authenticated") != "true":
            # No auth cookie: probe userinfo to confirm the session is live.
            r = self.s.get(f"{self.cfg.base_url}/auth/oauth2/userinfo", headers=self._xhr(), timeout=15)
            if r.status_code != 200:
                raise RuntimeError("Login failed")
        _pause()
        self.user = self._api("/auth/oauth2/userinfo") or {}
        _log.info("Logged in as: %s %s", self.user.get("firstName", "?"), self.user.get("lastName", "?"))

    def fetch_notifications(self):
        cutoff = datetime.now(timezone.utc) - timedelta(hours=self.cfg.hours_back)
        out, page = [], 0
        while page < self.cfg.max_notif_pages:
            data = self._api(
                "/timeline/lastNotifications",
                params=[("type", t) for t in self.cfg.notif_types] + [("page", page)],
            )
            if not data:
                break
            items = data.get("results", [])
            stop = False
            for n in items:
                try:
                    dt = datetime.fromisoformat(n["date"]["$date"].replace("Z", "+00:00"))
                except Exception:
                    continue
                if dt < cutoff:
                    stop = True
                    break
                p = n.get("params", {})
                out.append(Notification(
                    date=dt.strftime("%Y-%m-%d %H:%M"),
                    type=n.get("type", ""),
                    sender=p.get("username", ""),
                    subject=p.get("subject") or p.get("postTitle") or p.get("resourceName", ""),
                    preview=re.sub(r"\s+", " ", html_to_text(n.get("message", "")))[:300],
                ))
            if stop or len(items) < 25:  # a short page signals the last one
                break
            page += 1
            _pause(0.3, 0.8)
        self.stats["notifs"] = len(out)
        return out
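
    # Payload shape assumed by the parsing above (inferred from the fields
    # read, not an official schema) for /timeline/lastNotifications:
    #   {"results": [{"date": {"$date": "2025-01-06T08:00:00.000Z"},
    #                 "type": "MESSAGERIE",
    #                 "params": {"username": "...", "subject": "..."},
    #                 "message": "<p>…</p>"}]}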

    def fetch_messages(self):
        cutoff = datetime.now(timezone.utc) - timedelta(hours=self.cfg.hours_back)
        out, page = [], 0
        while page < self.cfg.max_msg_pages:
            items = self._api(
                "/conversation/api/folders/inbox/messages",
                params={"page_size": self.cfg.msg_page_size, "page": page, "unread": "true"},
            )
            if not items:
                break
            stop = False
            for m in items:
                try:
                    dt = datetime.fromtimestamp(m["date"] / 1000, tz=timezone.utc)
                except Exception:
                    continue
                if dt < cutoff:
                    stop = True
                    break
                m["_dt"] = dt.strftime("%Y-%m-%d %H:%M")
                out.append(m)
            if stop or len(items) < self.cfg.msg_page_size:
                break
            page += 1
            _pause(0.3, 0.8)
        self.stats["msgs"] = len(out)
        return out

    def _detail(self, mid):
        data = self._api(f"/conversation/api/messages/{mid}")
        if not data:
            return {"body_text": "", "api_att": [], "html_res": []}
        body_html = data.get("body", "")
        return {
            "body_text": html_to_text(body_html),
            "api_att": data.get("attachments", []),
            "html_res": extract_resources(body_html, self.cfg.base_url),
        }

    def _download(self, url, dest_dir, hint):
        cached = self.cache.already(url)
        if cached:
            self.stats["cache"] += 1
            return Attachment(url=url, filename=hint, local_path=cached, downloaded=True, source="cache")
        if self.cfg.dry_run:
            return Attachment(url=url, filename=hint, downloaded=False, source="dry-run")
        dest_dir.mkdir(parents=True, exist_ok=True)
        try:
            r = self.s.get(url, headers=self._xhr(), timeout=120, stream=True)
            if r.status_code != 200:
                self.stats["dl_err"] += 1
                return Attachment(url=url, filename=hint, downloaded=False, source="error")
            real = _resolve_filename(r, hint)
            safe = _safe_name(real)
            dest = dest_dir / safe
            if dest.exists():
                # Avoid clobbering: suffix _1, _2, … until the name is free.
                stem, suf = dest.stem, dest.suffix
                i = 1
                while dest.exists():
                    dest = dest_dir / f"{stem}_{i}{suf}"
                    i += 1
            sha = hashlib.sha256()
            size = 0
            with open(dest, "wb") as fp:
                for chunk in r.iter_content(65_536):
                    fp.write(chunk)
                    sha.update(chunk)
                    size += len(chunk)
            h = sha.hexdigest()
            self.cache.save(url, safe, str(dest), h, size)
            self.stats["dl"] += 1
            self.stats["dl_bytes"] += size
            return Attachment(
                url=url,
                filename=safe,
                size_bytes=size,
                content_type=r.headers.get("Content-Type", "").split(";")[0].strip(),
                local_path=str(dest),
                downloaded=True,
                source="download",
                sha256=h,
            )
        except Exception:
            self.stats["dl_err"] += 1
            return Attachment(url=url, filename=hint, downloaded=False, source="error")

    def _attachments(self, mid, detail):
        out, seen = [], set()
        d = self.cfg.attachments_dir / mid
        for a in detail.get("api_att", []):
            fid = a.get("id", "")
            fn = a.get("filename", f"file_{fid}")
            url = f"{self.cfg.base_url}/conversation/api/messages/{mid}/attachments/{fid}"
            if url in seen:
                continue
            seen.add(url)
            _pause(0.2, 0.6)
            att = self._download(url, d, fn)
            out.append(att)
        # Same dedup/download pattern for resources scraped from the HTML body.
        for res in detail.get("html_res", []):
            url = res["url"]
            if url in seen:
                continue
            seen.add(url)
            _pause(0.2, 0.6)
            out.append(self._download(url, d, res.get("filename", "fichier")))
        return out
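
# Illustrative import-side usage (a sketch; credentials and values below are
# placeholders, not working defaults):
#
#     from pcn import Config, ENTClient
#
#     cfg = Config(login="parent@example.org", password="change-me", hours_back=48)
#     client = ENTClient(cfg)
#     client.login()
#     notifications = client.fetch_notifications()
#     messages = client.fetch_messages()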