| """ |
| |
| pcn.py — Module ENT Paris Classe Numérique (sans CLI, pour import API) |
| |
| """ |
|
|
| from __future__ import annotations |
|
|
| import hashlib |
|
|
| import html as html_mod |
|
|
| import json |
|
|
| import logging |
|
|
| import mimetypes |
|
|
| import os |
|
|
| import re |
|
|
| import sqlite3 |
|
|
| import time |
|
|
| import random |
|
|
| from collections import defaultdict |
|
|
| from dataclasses import dataclass, field |
|
|
| from datetime import datetime, timezone, timedelta |
|
|
| from html.parser import HTMLParser |
|
|
| from pathlib import Path |
|
|
| from typing import Any, Optional |
|
|
| from urllib.parse import urlparse, unquote |
|
|
| import requests |
|
|
| try: |
|
|
| import cloudscraper |
|
|
| except ImportError: |
|
|
| cloudscraper = None |
|
|
| _log = logging.getLogger("pcn") |
|
|
| |
|
|
| |
|
|
| |
|
|
|
|
|
|
@dataclass
class Config:
    """Runtime configuration for the PCN ENT client.

    Fixed: string defaults used typographic quotes (“…”), which is a
    Python syntax error; restored plain ASCII quotes.
    """

    # Base URL of the ENT portal (no trailing slash).
    base_url: str = "https://ent.parisclassenumerique.fr"
    login: str = ""
    password: str = ""
    # Look-back window for notifications/messages, in hours.
    hours_back: int = 24
    fetch_body: bool = True
    fetch_attachments: bool = False
    attachments_dir: Path = field(default_factory=lambda: Path("/tmp/pcn_pj"))
    # Hard page limits so pagination loops always terminate.
    max_notif_pages: int = 50
    max_msg_pages: int = 30
    msg_page_size: int = 50
    db_path: Path = field(default_factory=lambda: Path("/tmp/pcn_cache.db"))
    # When True, attachments are listed but never downloaded.
    dry_run: bool = False
    notif_types: list[str] = field(default_factory=lambda: [
        "MESSAGERIE", "BLOG", "ACTUALITES", "EXERCIZER",
        "COMMUNITIES", "WIKI", "SCRAPBOOK", "TIMELINEGENERATOR",
    ])
|
|
|
|
|
|
| |
|
|
| |
|
|
| |
|
|
|
|
|
|
@dataclass
class Attachment:
    """One downloadable file referenced by a message or page.

    Fixed: string defaults used typographic quotes (“”), a syntax error;
    restored plain ASCII quotes.
    """

    url: str
    filename: str
    size_bytes: int = 0
    content_type: str = ""
    # Filesystem path once downloaded (None until then).
    local_path: Optional[str] = None
    downloaded: bool = False
    # Provenance tag: "download", "cache", "dry-run", "error", …
    source: str = ""
    sha256: Optional[str] = None
|
|
|
|
|
|
@dataclass
class Message:
    """A conversation message from the ENT inbox.

    Fixed: the ``body`` default used typographic quotes (“”), a syntax
    error; restored plain ASCII quotes.
    """

    id: str
    date: str
    sender: str
    role: str
    subject: str
    body: str = ""
    has_attachments: bool = False
    attachments: list[Attachment] = field(default_factory=list)
|
|
|
|
|
|
@dataclass
class Notification:
    """A timeline notification entry.

    Fixed: the ``preview`` default used typographic quotes (“”), a syntax
    error; restored plain ASCII quotes.
    """

    date: str
    type: str
    sender: str
    subject: str
    # First ~300 chars of the notification message, whitespace-normalized.
    preview: str = ""
|
|
|
|
|
|
@dataclass
class Report:
    """Aggregated result of one collection run: notifications, messages, stats."""

    generated_at: str
    user: str
    hours_back: int
    notifications: list[Notification] = field(default_factory=list)
    messages: list[Message] = field(default_factory=list)
    # Free-form counters (api calls, downloads, bytes, …).
    stats: dict = field(default_factory=dict)
|
|
|
|
|
|
| |
|
|
| |
|
|
| |
|
|
|
|
|
|
| def _pause(lo=0.5, hi=1.5): |
|
|
| time.sleep(random.uniform(lo, hi)) |
|
|
|
|
|
|
| _MIME_EXT = { |
|
|
| "image/jpeg": ".jpg", "image/png": ".png", "image/gif": ".gif", |
|
|
| "application/pdf": ".pdf", |
|
|
| "application/vnd.openxmlformats-officedocument.wordprocessingml.document": ".docx", |
|
|
| "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet": ".xlsx", |
|
|
| "application/vnd.openxmlformats-officedocument.presentationml.presentation": ".pptx", |
|
|
| "application/msword": ".doc", "application/zip": ".zip", |
|
|
| "text/plain": ".txt", "audio/mpeg": ".mp3", "video/mp4": ".mp4", |
|
|
| } |
|
|
| _FILE_EXTS = frozenset( |
|
|
| “.pdf .doc .docx .xls .xlsx .ppt .pptx .odt .ods .odp .rtf .txt .csv ” |
|
|
| “.jpg .jpeg .png .gif .bmp .svg .webp .mp3 .mp4 .avi .mkv .mov .wav ” |
|
|
| ".zip .rar .7z .html .epub".split() |
|
|
| ) |
|
|
| _ENT_FILE_PATTERNS = ( |
|
|
| "/workspace/document/", "/workspace/pub/document/", "/workspace/pub/", |
|
|
| "/conversation/api/messages/", "/infra/file/", "/blog/pub/", |
|
|
| ) |
|
|
|
|
|
|
| def _safe_name(name: str, maxlen: int = 200) -> str: |
|
|
| name = re.sub(r'[\\/*?:"<>|\x00-\x1f]', "_", name) |
|
|
| return (name.strip(". ") or "fichier")[:maxlen] |
|
|
|
|
|
|
| def _resolve_filename(resp: requests.Response, hint: str) -> str: |
|
|
| cd = resp.headers.get("Content-Disposition", "") |
|
|
| m = re.search(r"filename\*\s*=\s*(?:UTF-8|utf-8)''([^;\s]+)", cd, re.I) |
|
|
| if m: |
|
|
| return unquote(m.group(1)) |
|
|
| m = re.search(r'filename="([^"]+)"', cd, re.I) |
|
|
| if m: |
|
|
| return m.group(1).strip() |
|
|
| name = hint or “fichier” |
|
|
| if not Path(name).suffix: |
|
|
| ct = resp.headers.get("Content-Type", "").split(";")[0].strip().lower() |
|
|
| ext = _MIME_EXT.get(ct, "") or (mimetypes.guess_extension(ct) or "") |
|
|
| if ext: |
|
|
| name += ext |
|
|
| return name |
|
|
|
|
|
|
| |
|
|
| |
|
|
| |
|
|
|
|
|
|
| class _TextExtractor(HTMLParser): |
|
|
| _BLOCK = frozenset("p div br h1 h2 h3 h4 h5 h6 li tr blockquote pre hr".split()) |
|
|
| _SKIP = frozenset("script style head".split()) |
|
|
| def __init__(self): |
|
|
| super().__init__() |
|
|
| self._buf, self._skip = [], 0 |
|
|
| def handle_starttag(self, tag, _): |
|
|
| t = tag.lower() |
|
|
| if t in self._SKIP: self._skip += 1 |
|
|
| elif t in self._BLOCK: self._buf.append("\n") |
|
|
| def handle_endtag(self, tag): |
|
|
| t = tag.lower() |
|
|
| if t in self._SKIP: self._skip = max(0, self._skip - 1) |
|
|
| elif t in self._BLOCK: self._buf.append("\n") |
|
|
| def handle_data(self, data): |
|
|
| if not self._skip: self._buf.append(data) |
|
|
| def handle_entityref(self, name): |
|
|
| self._buf.append(html_mod.unescape(f"&{name};")) |
|
|
| def handle_charref(self, name): |
|
|
| self._buf.append(html_mod.unescape(f"&#{name};")) |
|
|
| def text(self): |
|
|
| t = "".join(self._buf) |
|
|
| t = re.sub(r"[ \t]+", " ", t) |
|
|
| t = re.sub(r"\n{3,}", "\n\n", t) |
|
|
| return t.strip() |
|
|
|
|
|
|
def html_to_text(raw: str) -> str:
    """Convert an HTML fragment to plain text.

    Uses :class:`_TextExtractor`; if the parser chokes, falls back to crude
    regex tag-stripping that still preserves block-level line breaks.

    Fixed: the empty-string return used typographic quotes (“”), a syntax
    error; restored plain ASCII quotes.
    """
    if not raw:
        return ""
    parser = _TextExtractor()
    try:
        parser.feed(raw)
        return parser.text()
    except Exception:
        # Degraded mode for malformed markup.
        t = re.sub(r"<br\s*/?>", "\n", raw, flags=re.I)
        t = re.sub(r"</(?:p|div|h\d|li|tr)>", "\n", t, flags=re.I)
        return re.sub(r"<[^>]+>", "", t).strip()
|
|
|
|
|
|
| class _ResourceExtractor(HTMLParser): |
|
|
| _URL_ATTRS = frozenset("href src data-src data-document-href data-download-url poster data-uri data-href".split()) |
|
|
| def __init__(self, base: str): |
|
|
| super().__init__() |
|
|
| self.base = base |
|
|
| self._host = urlparse(base).netloc |
|
|
| self.found: list[dict] = [] |
|
|
| self._seen: set[str] = set() |
|
|
| def _norm(self, url): |
|
|
| url = url.strip() |
|
|
| if url.startswith("//"): return "https:" + url |
|
|
| if url.startswith("/"): return self.base + url |
|
|
| return url |
|
|
| def _same_domain(self, url): |
|
|
| h = urlparse(url).netloc |
|
|
| return not h or h == self._host |
|
|
| def _looks_like_file(self, url): |
|
|
| path = urlparse(url).path.lower() |
|
|
| if any(p in path for p in _ENT_FILE_PATTERNS): return True |
|
|
| _, ext = os.path.splitext(path) |
|
|
| return ext in _FILE_EXTS |
|
|
| def _add(self, url, filename, source): |
|
|
| url = self._norm(url) |
|
|
| if url in self._seen or not self._same_domain(url) or not self._looks_like_file(url): |
|
|
| return |
|
|
| self._seen.add(url) |
|
|
| self.found.append({"url": url, "filename": filename or "fichier", "source": source}) |
|
|
| def _best_name(self, attrs, url): |
|
|
| for a in ("data-filename", "title", "alt", "download"): |
|
|
| v = attrs.get(a) |
|
|
| if v and isinstance(v, str) and v.strip(): return v.strip() |
|
|
| return unquote(urlparse(url).path.rstrip("/").split("/")[-1]) or “fichier” |
|
|
| def handle_starttag(self, tag, attrs): |
|
|
| ad = dict(attrs) |
|
|
| tl = tag.lower() |
|
|
| did = (ad.get("data-document-id") or "").strip() |
|
|
| if did: |
|
|
| url = f"{self.base}/workspace/document/{did}" |
|
|
| self._add(url, self._best_name(ad, url), f"data-document-id:{tl}") |
|
|
| for attr in self._URL_ATTRS: |
|
|
| val = ad.get(attr) |
|
|
| if not val or not isinstance(val, str): continue |
|
|
| val = val.strip() |
|
|
| if val.startswith(("data:", "javascript:", "mailto:", "#")): continue |
|
|
| self._add(val, self._best_name(ad, val), f"{attr}:{tl}") |
|
|
| if tl == "object": |
|
|
| val = ad.get("data") |
|
|
| if val and isinstance(val, str) and not val.strip().startswith(("data:", "javascript:")): |
|
|
| self._add(val.strip(), self._best_name(ad, val.strip()), f"data:{tl}") |
|
|
| style = ad.get("style") or “” |
|
|
| if style and isinstance(style, str): |
|
|
| for m in re.finditer(r"url\(['\"]?([^'\")\s]+)['\"]?\)", style): |
|
|
| self._add(m.group(1), "style_resource", f"style:{tl}") |
|
|
|
|
|
|
def extract_resources(html_str: str, base: str) -> list[dict]:
    """Extract candidate file resources from an HTML fragment.

    Combines the HTML-parser pass with a regex sweep for workspace-document
    URLs the parser may have missed. Results are de-duplicated by URL.

    Fixed: ``resources.append®`` was a mojibake corruption of
    ``resources.append(r)``.
    """
    if not html_str:
        return []
    resources, seen = [], set()
    ex = _ResourceExtractor(base)
    try:
        ex.feed(html_str)
    except Exception:
        pass  # best-effort: malformed HTML still yields regex matches below
    for r in ex.found:
        if r["url"] not in seen:
            seen.add(r["url"])
            resources.append(r)
    for m in re.finditer(r"(/workspace/(?:pub/)?document/[a-f0-9-]+(?:/[^\s\"'<>]*)?)", html_str):
        url = base + m.group(1)
        if url not in seen:
            seen.add(url)
            fn = unquote(urlparse(url).path.rstrip("/").split("/")[-1])
            resources.append({"url": url, "filename": fn or "workspace_doc", "source": "regex"})
    return resources
|
|
|
|
|
|
| |
|
|
| |
|
|
| |
|
|
|
|
|
|
class Cache:
    """SQLite-backed cache of downloaded files and processed messages."""

    def __init__(self, path: Path):
        self._path = path
        self._conn: Optional[sqlite3.Connection] = None

    def _db(self):
        """Open the connection lazily and ensure the schema exists."""
        if self._conn is None:
            conn = sqlite3.connect(str(self._path))
            conn.execute("PRAGMA journal_mode=WAL")
            conn.executescript("""
CREATE TABLE IF NOT EXISTS downloads (url TEXT PRIMARY KEY, filename TEXT, local_path TEXT, sha256 TEXT, size_bytes INTEGER, ts TEXT);
CREATE TABLE IF NOT EXISTS messages (id TEXT PRIMARY KEY, ts TEXT, subject TEXT, sender TEXT);
""")
            self._conn = conn
        return self._conn

    def already(self, url):
        """Return the cached local path for *url*, but only if the file still exists."""
        row = self._db().execute("SELECT local_path FROM downloads WHERE url=?", (url,)).fetchone()
        if row and row[0] and Path(row[0]).exists():
            return row[0]
        return None

    def save(self, url, fn, lp, h, sz):
        """Record a completed download (upsert keyed on URL)."""
        db = self._db()
        db.execute("INSERT OR REPLACE INTO downloads VALUES (?,?,?,?,?,?)",
                   (url, fn, lp, h, sz, datetime.now(timezone.utc).isoformat()))
        db.commit()

    def mark_msg(self, mid, subj, sender):
        """Record that a message id has been processed."""
        db = self._db()
        db.execute("INSERT OR REPLACE INTO messages VALUES (?,?,?,?)",
                   (mid, datetime.now(timezone.utc).isoformat(), subj, sender))
        db.commit()

    def close(self):
        if self._conn is not None:
            self._conn.close()
            self._conn = None
|
|
|
|
|
|
| |
|
|
| |
|
|
| |
|
|
| _CF_MARKERS = ("cf-browser-verification", "challenge-platform", "cf-challenge", "Just a moment") |
|
|
| _HEADERS = { |
|
|
| "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:128.0) Gecko/20100101 Firefox/128.0", |
|
|
| "Accept-Language": "fr-FR,fr;q=0.9", "DNT": "1", |
|
|
| } |
|
|
|
|
|
|
class SmartSession:
    """A requests.Session wrapper with retry/backoff, 429 handling, and
    transparent escalation to cloudscraper when Cloudflare blocks us."""

    MAX_RETRIES = 3
    BACKOFF = 2.0

    def __init__(self):
        self._s = requests.Session()
        self._s.headers.update(_HEADERS)
        self._upgraded = False  # True once cloudscraper has replaced the session

    @property
    def is_cloudscraper(self):
        return self._upgraded

    @property
    def cookies(self):
        return self._s.cookies

    @property
    def headers(self):
        return self._s.headers

    def _cf_blocked(self, r):
        """True when *r* looks like a Cloudflare challenge page."""
        if self._upgraded or r.status_code not in (403, 503):
            return False
        head = r.text[:4000]
        return any(marker in head for marker in _CF_MARKERS)

    def _upgrade(self):
        """Swap the plain session for a cloudscraper one, carrying cookies over."""
        if self._upgraded or cloudscraper is None:
            return
        _log.warning("Cloudflare detected → cloudscraper")
        saved = dict(self._s.cookies)
        self._s = cloudscraper.create_scraper(browser={"browser": "firefox", "platform": "windows", "mobile": False})
        self._s.headers.update(_HEADERS)
        self._s.cookies.update(saved)
        self._upgraded = True

    def _do(self, method, url, **kw):
        """Issue *method* on *url* with retries; raises the last network error
        only when every attempt failed with a connection/timeout error."""
        kw.setdefault("timeout", 30)
        last_exc = None
        for attempt in range(self.MAX_RETRIES):
            try:
                r = getattr(self._s, method)(url, **kw)
                if self._cf_blocked(r):
                    # One immediate replay through the upgraded session.
                    self._upgrade()
                    r = getattr(self._s, method)(url, **kw)
                if r.status_code == 429:
                    # Honor the server's rate-limit hint, then retry.
                    time.sleep(float(r.headers.get("Retry-After", 10)))
                    continue
                if r.status_code >= 500 and attempt < self.MAX_RETRIES - 1:
                    time.sleep(self.BACKOFF ** attempt)
                    continue
                return r
            except (requests.ConnectionError, requests.Timeout) as exc:
                last_exc = exc
                if attempt < self.MAX_RETRIES - 1:
                    time.sleep(self.BACKOFF ** (attempt + 1))
        if last_exc:
            raise last_exc
        return r  # exhausted retries on 429s: return the last response

    def get(self, url, **kw):
        return self._do("get", url, **kw)

    def post(self, url, **kw):
        return self._do("post", url, **kw)
|
|
|
|
|
|
| |
|
|
| |
|
|
| |
|
|
|
|
|
|
| class ENTClient: |
|
|
| def __init__(self, cfg: Config): |
|
|
| self.cfg = cfg |
|
|
| self.s = SmartSession() |
|
|
| self.cache = Cache(cfg.db_path) |
|
|
| self.user: dict = {} |
|
|
| self.stats: dict[str, int] = defaultdict(int) |
|
|
| def _xhr(self, ref=None): |
|
|
| return { |
|
|
| "X-XSRF-TOKEN": self.s.cookies.get("XSRF-TOKEN", ""), |
|
|
| "Accept": "application/json, text/plain, */*", |
|
|
| "Referer": ref or f"{self.cfg.base_url}/conversation/conversation", |
|
|
| } |
|
|
| def _api(self, path, params=None, ref=None): |
|
|
| r = self.s.get(f"{self.cfg.base_url}{path}", params=params, headers=self._xhr(ref), timeout=20) |
|
|
| self.stats["api"] += 1 |
|
|
| if r.status_code != 200: return None |
|
|
| try: return r.json() |
|
|
| except Exception: return None |
|
|
| def login(self): |
|
|
| _log.info("Connecting to PCN…") |
|
|
| self.s.get(f"{self.cfg.base_url}/auth/login", timeout=30) |
|
|
| _pause(1.0, 2.0) |
|
|
| xsrf = self.s.cookies.get("XSRF-TOKEN", "") |
|
|
| self.s.post(f"{self.cfg.base_url}/auth/login", |
|
|
| data={"email": self.cfg.login, "password": self.cfg.password}, |
|
|
| headers={"X-XSRF-TOKEN": xsrf, "Content-Type": "application/x-www-form-urlencoded", |
|
|
| "Origin": self.cfg.base_url}, |
|
|
| timeout=30, allow_redirects=True) |
|
|
| _pause(1.5, 2.5) |
|
|
| if self.s.cookies.get("authenticated") != "true": |
|
|
| r = self.s.get(f"{self.cfg.base_url}/auth/oauth2/userinfo", headers=self._xhr(), timeout=15) |
|
|
| if r.status_code != 200: |
|
|
| raise Exception("Login failed") |
|
|
| _pause() |
|
|
| self.user = self._api("/auth/oauth2/userinfo") or {} |
|
|
| _log.info("Logged in as: %s %s", self.user.get("firstName", "?"), self.user.get("lastName", "?")) |
|
|
| def fetch_notifications(self): |
|
|
| cutoff = datetime.now(timezone.utc) - timedelta(hours=self.cfg.hours_back) |
|
|
| out, page = [], 0 |
|
|
| while page < self.cfg.max_notif_pages: |
|
|
| data = self._api("/timeline/lastNotifications", |
|
|
| params=[("type", t) for t in self.cfg.notif_types] + [("page", page)]) |
|
|
| if not data: break |
|
|
| items = data.get("results", []) |
|
|
| stop = False |
|
|
| for n in items: |
|
|
| try: dt = datetime.fromisoformat(n["date"]["$date"].replace("Z", "+00:00")) |
|
|
| except Exception: continue |
|
|
| if dt < cutoff: stop = True; break |
|
|
| p = n.get("params", {}) |
|
|
| out.append(Notification( |
|
|
| date=dt.strftime("%Y-%m-%d %H:%M"), type=n.get("type", ""), |
|
|
| sender=p.get("username", ""), |
|
|
| subject=p.get("subject") or p.get("postTitle") or p.get("resourceName", ""), |
|
|
| preview=re.sub(r"\s+", " ", html_to_text(n.get("message", "")))[:300], |
|
|
| )) |
|
|
| if stop or len(items) < 25: break |
|
|
| page += 1; _pause(0.3, 0.8) |
|
|
| self.stats["notifs"] = len(out) |
|
|
| return out |
|
|
| def fetch_messages(self): |
|
|
| cutoff = datetime.now(timezone.utc) - timedelta(hours=self.cfg.hours_back) |
|
|
| out, page = [], 0 |
|
|
| while page < self.cfg.max_msg_pages: |
|
|
| items = self._api("/conversation/api/folders/inbox/messages", |
|
|
| params={"page_size": self.cfg.msg_page_size, "page": page, "unread": "true"}) |
|
|
| if not items: break |
|
|
| stop = False |
|
|
| for m in items: |
|
|
| try: dt = datetime.fromtimestamp(m["date"] / 1000, tz=timezone.utc) |
|
|
| except Exception: continue |
|
|
| if dt < cutoff: stop = True; break |
|
|
| m["_dt"] = dt.strftime("%Y-%m-%d %H:%M") |
|
|
| out.append(m) |
|
|
| if stop or len(items) < self.cfg.msg_page_size: break |
|
|
| page += 1; _pause(0.3, 0.8) |
|
|
| self.stats["msgs"] = len(out) |
|
|
| return out |
|
|
| def _detail(self, mid): |
|
|
| data = self._api(f"/conversation/api/messages/{mid}") |
|
|
| if not data: return {"body_text": "", "api_att": [], "html_res": []} |
|
|
| body_html = data.get("body", "") |
|
|
| return {"body_text": html_to_text(body_html), "api_att": data.get("attachments", []), |
|
|
| "html_res": extract_resources(body_html, self.cfg.base_url)} |
|
|
| def _download(self, url, dest_dir, hint): |
|
|
| cached = self.cache.already(url) |
|
|
| if cached: self.stats["cache"] += 1; return Attachment(url=url, filename=hint, local_path=cached, downloaded=True, source="cache") |
|
|
| if self.cfg.dry_run: return Attachment(url=url, filename=hint, downloaded=False, source="dry-run") |
|
|
| dest_dir.mkdir(parents=True, exist_ok=True) |
|
|
| try: |
|
|
| r = self.s.get(url, headers=self._xhr(), timeout=120, stream=True) |
|
|
| if r.status_code != 200: self.stats["dl_err"] += 1; return Attachment(url=url, filename=hint, downloaded=False, source="error") |
|
|
| real = _resolve_filename(r, hint); safe = _safe_name(real); dest = dest_dir / safe |
|
|
| if dest.exists(): |
|
|
| stem, suf = dest.stem, dest.suffix; i = 1 |
|
|
| while dest.exists(): dest = dest_dir / f"{stem}_{i}{suf}"; i += 1 |
|
|
| sha = hashlib.sha256(); size = 0 |
|
|
| with open(dest, "wb") as fp: |
|
|
| for chunk in r.iter_content(65_536): fp.write(chunk); sha.update(chunk); size += len(chunk) |
|
|
| h = sha.hexdigest() |
|
|
| self.cache.save(url, safe, str(dest), h, size); self.stats["dl"] += 1; self.stats["dl_bytes"] += size |
|
|
| return Attachment(url=url, filename=safe, size_bytes=size, content_type=r.headers.get("Content-Type", "").split(";")[0].strip(), |
|
|
| local_path=str(dest), downloaded=True, source="download", sha256=h) |
|
|
| except Exception: |
|
|
| self.stats["dl_err"] += 1; return Attachment(url=url, filename=hint, downloaded=False, source="error") |
|
|
| def _attachments(self, mid, detail): |
|
|
| out, seen = [], set() |
|
|
| d = self.cfg.attachments_dir / mid |
|
|
| for a in detail.get("api_att", []): |
|
|
| fid = a.get("id", ""); fn = a.get("filename", f"file_{fid}") |
|
|
| url = f"{self.cfg.base_url}/conversation/api/messages/{mid}/attachments/{fid}" |
|
|
| if url in seen: continue; seen.add(url); _pause(0.2, 0.6) |
|
|
| att = self._download(url, d, fn); out.append(att) |
|
|
| for res in detail.get("htm |