"""
pcn.py — ENT Paris Classe Numérique module (no CLI; intended to be imported as an API)
"""
from __future__ import annotations
import hashlib
import html as html_mod
import json
import logging
import mimetypes
import os
import re
import sqlite3
import time
import random
from collections import defaultdict
from dataclasses import dataclass, field
from datetime import datetime, timezone, timedelta
from html.parser import HTMLParser
from pathlib import Path
from typing import Any, Optional
from urllib.parse import urlparse, unquote
import requests
try:
    import cloudscraper
except ImportError:
    cloudscraper = None
_log = logging.getLogger("pcn")
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
# Config
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
@dataclass
class Config:
    base_url: str = "https://ent.parisclassenumerique.fr"
    login: str = ""
    password: str = ""
    hours_back: int = 24
    fetch_body: bool = True
    fetch_attachments: bool = False
    attachments_dir: Path = field(default_factory=lambda: Path("/tmp/pcn_pj"))
    max_notif_pages: int = 50
    max_msg_pages: int = 30
    msg_page_size: int = 50
    db_path: Path = field(default_factory=lambda: Path("/tmp/pcn_cache.db"))
    dry_run: bool = False
    notif_types: list[str] = field(default_factory=lambda: [
        "MESSAGERIE", "BLOG", "ACTUALITES", "EXERCIZER",
        "COMMUNITIES", "WIKI", "SCRAPBOOK", "TIMELINEGENERATOR",
    ])
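# Minimal usage sketch for Config (the credentials below are placeholders,
# not real accounts; every field has a default, so only overrides are needed):
#
#     cfg = Config(login="prenom.nom", password="motdepasse",
#                  hours_back=48, fetch_attachments=True)
#     cfg.attachments_dir.mkdir(parents=True, exist_ok=True)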
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
# Data Models
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
@dataclass
class Attachment:
    url: str
    filename: str
    size_bytes: int = 0
    content_type: str = ""
    local_path: Optional[str] = None
    downloaded: bool = False
    source: str = ""
    sha256: Optional[str] = None
@dataclass
class Message:
    id: str
    date: str
    sender: str
    role: str
    subject: str
    body: str = ""
    has_attachments: bool = False
    attachments: list[Attachment] = field(default_factory=list)
@dataclass
class Notification:
    date: str
    type: str
    sender: str
    subject: str
    preview: str = ""
@dataclass
class Report:
    generated_at: str
    user: str
    hours_back: int
    notifications: list[Notification] = field(default_factory=list)
    messages: list[Message] = field(default_factory=list)
    stats: dict = field(default_factory=dict)
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
# Utilities
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
def _pause(lo=0.5, hi=1.5):
    time.sleep(random.uniform(lo, hi))
_MIME_EXT = {
    "image/jpeg": ".jpg", "image/png": ".png", "image/gif": ".gif",
    "application/pdf": ".pdf",
    "application/vnd.openxmlformats-officedocument.wordprocessingml.document": ".docx",
    "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet": ".xlsx",
    "application/vnd.openxmlformats-officedocument.presentationml.presentation": ".pptx",
    "application/msword": ".doc", "application/zip": ".zip",
    "text/plain": ".txt", "audio/mpeg": ".mp3", "video/mp4": ".mp4",
}
_FILE_EXTS = frozenset(
    ".pdf .doc .docx .xls .xlsx .ppt .pptx .odt .ods .odp .rtf .txt .csv "
    ".jpg .jpeg .png .gif .bmp .svg .webp .mp3 .mp4 .avi .mkv .mov .wav "
    ".zip .rar .7z .html .epub".split()
)
_ENT_FILE_PATTERNS = (
    "/workspace/document/", "/workspace/pub/document/", "/workspace/pub/",
    "/conversation/api/messages/", "/infra/file/", "/blog/pub/",
)
def _safe_name(name: str, maxlen: int = 200) -> str:
    name = re.sub(r'[\\/*?:"<>|\x00-\x1f]', "_", name)
    return (name.strip(". ") or "fichier")[:maxlen]
def _resolve_filename(resp: requests.Response, hint: str) -> str:
    cd = resp.headers.get("Content-Disposition", "")
    m = re.search(r"filename\*\s*=\s*(?:UTF-8|utf-8)''([^;\s]+)", cd, re.I)
    if m:
        return unquote(m.group(1))
    m = re.search(r'filename="([^"]+)"', cd, re.I)
    if m:
        return m.group(1).strip()
    name = hint or "fichier"
    if not Path(name).suffix:
        ct = resp.headers.get("Content-Type", "").split(";")[0].strip().lower()
        ext = _MIME_EXT.get(ct, "") or (mimetypes.guess_extension(ct) or "")
        if ext:
            name += ext
    return name
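# Illustrative sketch of the two helpers above (the output shown is what the
# regex substitution yields, traced by hand rather than from a live run):
#
#     _safe_name('rapport: "final"?.pdf')   # -> 'rapport_ _final__.pdf'
#
# _resolve_filename() prefers the RFC 5987 filename*=UTF-8''... form of
# Content-Disposition, then the quoted filename="..." form, and finally
# falls back to the hint plus an extension guessed from Content-Type.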
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
# HTML parsers
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
class _TextExtractor(HTMLParser):
    _BLOCK = frozenset("p div br h1 h2 h3 h4 h5 h6 li tr blockquote pre hr".split())
    _SKIP = frozenset("script style head".split())
    def __init__(self):
        super().__init__()
        self._buf, self._skip = [], 0
    def handle_starttag(self, tag, _):
        t = tag.lower()
        if t in self._SKIP: self._skip += 1
        elif t in self._BLOCK: self._buf.append("\n")
    def handle_endtag(self, tag):
        t = tag.lower()
        if t in self._SKIP: self._skip = max(0, self._skip - 1)
        elif t in self._BLOCK: self._buf.append("\n")
    def handle_data(self, data):
        if not self._skip: self._buf.append(data)
    def handle_entityref(self, name):
        self._buf.append(html_mod.unescape(f"&{name};"))
    def handle_charref(self, name):
        self._buf.append(html_mod.unescape(f"&#{name};"))
    def text(self):
        t = "".join(self._buf)
        t = re.sub(r"[ \t]+", " ", t)
        t = re.sub(r"\n{3,}", "\n\n", t)
        return t.strip()
def html_to_text(raw: str) -> str:
    if not raw:
        return ""
    p = _TextExtractor()
    try:
        p.feed(raw)
        return p.text()
    except Exception:
        t = re.sub(r"<br\s*/?>", "\n", raw, flags=re.I)
        t = re.sub(r"</(?:p|div|h\d|li|tr)>", "\n", t, flags=re.I)
        return re.sub(r"<[^>]+>", "", t).strip()
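# Quick sketch of the conversion (block-level tags become newlines, runs of
# three or more newlines collapse to two, script/style bodies are dropped):
#
#     html_to_text("<p>Un</p><p>Deux</p>")   # -> 'Un\n\nDeux'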
class _ResourceExtractor(HTMLParser):
    _URL_ATTRS = frozenset("href src data-src data-document-href data-download-url poster data-uri data-href".split())
    def __init__(self, base: str):
        super().__init__()
        self.base = base
        self._host = urlparse(base).netloc
        self.found: list[dict] = []
        self._seen: set[str] = set()
    def _norm(self, url):
        url = url.strip()
        if url.startswith("//"): return "https:" + url
        if url.startswith("/"): return self.base + url
        return url
    def _same_domain(self, url):
        h = urlparse(url).netloc
        return not h or h == self._host
    def _looks_like_file(self, url):
        path = urlparse(url).path.lower()
        if any(p in path for p in _ENT_FILE_PATTERNS): return True
        _, ext = os.path.splitext(path)
        return ext in _FILE_EXTS
    def _add(self, url, filename, source):
        url = self._norm(url)
        if url in self._seen or not self._same_domain(url) or not self._looks_like_file(url):
            return
        self._seen.add(url)
        self.found.append({"url": url, "filename": filename or "fichier", "source": source})
    def _best_name(self, attrs, url):
        for a in ("data-filename", "title", "alt", "download"):
            v = attrs.get(a)
            if v and isinstance(v, str) and v.strip(): return v.strip()
        return unquote(urlparse(url).path.rstrip("/").split("/")[-1]) or "fichier"
    def handle_starttag(self, tag, attrs):
        ad = dict(attrs)
        tl = tag.lower()
        did = (ad.get("data-document-id") or "").strip()
        if did:
            url = f"{self.base}/workspace/document/{did}"
            self._add(url, self._best_name(ad, url), f"data-document-id:{tl}")
        for attr in self._URL_ATTRS:
            val = ad.get(attr)
            if not val or not isinstance(val, str): continue
            val = val.strip()
            if val.startswith(("data:", "javascript:", "mailto:", "#")): continue
            self._add(val, self._best_name(ad, val), f"{attr}:{tl}")
        if tl == "object":
            val = ad.get("data")
            if val and isinstance(val, str) and not val.strip().startswith(("data:", "javascript:")):
                self._add(val.strip(), self._best_name(ad, val.strip()), f"data:{tl}")
        style = ad.get("style") or ""
        if style and isinstance(style, str):
            for m in re.finditer(r"url\(['\"]?([^'\")\s]+)['\"]?\)", style):
                self._add(m.group(1), "style_resource", f"style:{tl}")
def extract_resources(html_str: str, base: str) -> list[dict]:
    if not html_str: return []
    resources, seen = [], set()
    ex = _ResourceExtractor(base)
    try:
        ex.feed(html_str)
    except Exception:
        pass
    for r in ex.found:
        if r["url"] not in seen:
            seen.add(r["url"])
            resources.append(r)
    for m in re.finditer(r"(/workspace/(?:pub/)?document/[a-f0-9-]+(?:/[^\s\"'<>]*)?)", html_str):
        url = base + m.group(1)
        if url not in seen:
            seen.add(url)
            fn = unquote(urlparse(url).path.rstrip("/").split("/")[-1])
            resources.append({"url": url, "filename": fn or "workspace_doc", "source": "regex"})
    return resources
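# Hedged sketch of resource extraction (the document id below is invented):
#
#     frag = '<a href="/workspace/document/abc-123" title="sujet.pdf">PJ</a>'
#     extract_resources(frag, "https://ent.parisclassenumerique.fr")
#     # -> [{'url': 'https://ent.parisclassenumerique.fr/workspace/document/abc-123',
#     #      'filename': 'sujet.pdf', 'source': 'href:a'}]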
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
# Cache SQLite
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
class Cache:
    def __init__(self, path: Path):
        self._path = path
        self._conn: Optional[sqlite3.Connection] = None
    def _db(self):
        if self._conn is None:
            self._conn = sqlite3.connect(str(self._path))
            self._conn.execute("PRAGMA journal_mode=WAL")
            self._conn.executescript("""
                CREATE TABLE IF NOT EXISTS downloads (url TEXT PRIMARY KEY, filename TEXT, local_path TEXT, sha256 TEXT, size_bytes INTEGER, ts TEXT);
                CREATE TABLE IF NOT EXISTS messages (id TEXT PRIMARY KEY, ts TEXT, subject TEXT, sender TEXT);
            """)
        return self._conn
    def already(self, url):
        r = self._db().execute("SELECT local_path FROM downloads WHERE url=?", (url,)).fetchone()
        return r[0] if r and r[0] and Path(r[0]).exists() else None
    def save(self, url, fn, lp, h, sz):
        self._db().execute("INSERT OR REPLACE INTO downloads VALUES (?,?,?,?,?,?)",
                           (url, fn, lp, h, sz, datetime.now(timezone.utc).isoformat()))
        self._db().commit()
    def mark_msg(self, mid, subj, sender):
        self._db().execute("INSERT OR REPLACE INTO messages VALUES (?,?,?,?)",
                           (mid, datetime.now(timezone.utc).isoformat(), subj, sender))
        self._db().commit()
    def close(self):
        if self._conn: self._conn.close(); self._conn = None
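# Usage sketch: Cache deduplicates downloads by URL across runs. The url,
# filename and hash values below are placeholders:
#
#     cache = Cache(Path("/tmp/pcn_cache.db"))
#     if cache.already("https://ent.example/doc/1") is None:
#         cache.save("https://ent.example/doc/1", "doc.pdf",
#                    "/tmp/pcn_pj/doc.pdf", "sha256hex...", 12345)
#     cache.close()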
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
# Smart Session
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
_CF_MARKERS = ("cf-browser-verification", "challenge-platform", "cf-challenge", "Just a moment")
_HEADERS = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:128.0) Gecko/20100101 Firefox/128.0",
    "Accept-Language": "fr-FR,fr;q=0.9", "DNT": "1",
}
class SmartSession:
    MAX_RETRIES = 3
    BACKOFF = 2.0
    def __init__(self):
        self._s = requests.Session()
        self._s.headers.update(_HEADERS)
        self._upgraded = False
    @property
    def is_cloudscraper(self): return self._upgraded
    @property
    def cookies(self): return self._s.cookies
    @property
    def headers(self): return self._s.headers
    def _cf_blocked(self, r):
        if self._upgraded or r.status_code not in (403, 503): return False
        return any(m in r.text[:4000] for m in _CF_MARKERS)
    def _upgrade(self):
        if self._upgraded: return
        if cloudscraper is None: return
        _log.warning("Cloudflare detected → cloudscraper")
        old = dict(self._s.cookies)
        self._s = cloudscraper.create_scraper(browser={"browser": "firefox", "platform": "windows", "mobile": False})
        self._s.headers.update(_HEADERS)
        self._s.cookies.update(old)
        self._upgraded = True
    def _do(self, method, url, **kw):
        kw.setdefault("timeout", 30)
        last_exc = None
        for attempt in range(self.MAX_RETRIES):
            try:
                r = getattr(self._s, method)(url, **kw)
                if self._cf_blocked(r): self._upgrade(); r = getattr(self._s, method)(url, **kw)
                if r.status_code == 429:
                    time.sleep(float(r.headers.get("Retry-After", 10))); continue
                if r.status_code >= 500 and attempt < self.MAX_RETRIES - 1:
                    time.sleep(self.BACKOFF ** attempt); continue
                return r
            except (requests.ConnectionError, requests.Timeout) as exc:
                last_exc = exc
                if attempt < self.MAX_RETRIES - 1: time.sleep(self.BACKOFF ** (attempt + 1))
        if last_exc: raise last_exc
        return r
    def get(self, url, **kw): return self._do("get", url, **kw)
    def post(self, url, **kw): return self._do("post", url, **kw)
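# Minimal sketch: SmartSession mirrors requests.Session for get/post while
# retrying 5xx responses, honoring Retry-After on 429, and switching to
# cloudscraper when a Cloudflare challenge page is detected (only when the
# optional cloudscraper dependency is installed):
#
#     s = SmartSession()
#     r = s.get("https://ent.parisclassenumerique.fr/auth/login")
#     r.raise_for_status()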
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
# ENT Client
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
class ENTClient:
    def __init__(self, cfg: Config):
        self.cfg = cfg
        self.s = SmartSession()
        self.cache = Cache(cfg.db_path)
        self.user: dict = {}
        self.stats: dict[str, int] = defaultdict(int)
    def _xhr(self, ref=None):
        return {
            "X-XSRF-TOKEN": self.s.cookies.get("XSRF-TOKEN", ""),
            "Accept": "application/json, text/plain, */*",
            "Referer": ref or f"{self.cfg.base_url}/conversation/conversation",
        }
    def _api(self, path, params=None, ref=None):
        r = self.s.get(f"{self.cfg.base_url}{path}", params=params, headers=self._xhr(ref), timeout=20)
        self.stats["api"] += 1
        if r.status_code != 200: return None
        try: return r.json()
        except Exception: return None
    def login(self):
        _log.info("Connecting to PCN…")
        self.s.get(f"{self.cfg.base_url}/auth/login", timeout=30)
        _pause(1.0, 2.0)
        xsrf = self.s.cookies.get("XSRF-TOKEN", "")
        self.s.post(f"{self.cfg.base_url}/auth/login",
                    data={"email": self.cfg.login, "password": self.cfg.password},
                    headers={"X-XSRF-TOKEN": xsrf, "Content-Type": "application/x-www-form-urlencoded",
                             "Origin": self.cfg.base_url},
                    timeout=30, allow_redirects=True)
        _pause(1.5, 2.5)
        if self.s.cookies.get("authenticated") != "true":
            r = self.s.get(f"{self.cfg.base_url}/auth/oauth2/userinfo", headers=self._xhr(), timeout=15)
            if r.status_code != 200:
                raise Exception("Login failed")
        _pause()
        self.user = self._api("/auth/oauth2/userinfo") or {}
        _log.info("Logged in as: %s %s", self.user.get("firstName", "?"), self.user.get("lastName", "?"))
    def fetch_notifications(self):
        cutoff = datetime.now(timezone.utc) - timedelta(hours=self.cfg.hours_back)
        out, page = [], 0
        while page < self.cfg.max_notif_pages:
            data = self._api("/timeline/lastNotifications",
                             params=[("type", t) for t in self.cfg.notif_types] + [("page", page)])
            if not data: break
            items = data.get("results", [])
            stop = False
            for n in items:
                try: dt = datetime.fromisoformat(n["date"]["$date"].replace("Z", "+00:00"))
                except Exception: continue
                if dt < cutoff: stop = True; break
                p = n.get("params", {})
                out.append(Notification(
                    date=dt.strftime("%Y-%m-%d %H:%M"), type=n.get("type", ""),
                    sender=p.get("username", ""),
                    subject=p.get("subject") or p.get("postTitle") or p.get("resourceName", ""),
                    preview=re.sub(r"\s+", " ", html_to_text(n.get("message", "")))[:300],
                ))
            if stop or len(items) < 25: break
            page += 1; _pause(0.3, 0.8)
        self.stats["notifs"] = len(out)
        return out
    def fetch_messages(self):
        cutoff = datetime.now(timezone.utc) - timedelta(hours=self.cfg.hours_back)
        out, page = [], 0
        while page < self.cfg.max_msg_pages:
            items = self._api("/conversation/api/folders/inbox/messages",
                              params={"page_size": self.cfg.msg_page_size, "page": page, "unread": "true"})
            if not items: break
            stop = False
            for m in items:
                try: dt = datetime.fromtimestamp(m["date"] / 1000, tz=timezone.utc)
                except Exception: continue
                if dt < cutoff: stop = True; break
                m["_dt"] = dt.strftime("%Y-%m-%d %H:%M")
                out.append(m)
            if stop or len(items) < self.cfg.msg_page_size: break
            page += 1; _pause(0.3, 0.8)
        self.stats["msgs"] = len(out)
        return out
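    # End-to-end sketch for importers of this module (credentials are
    # placeholders; exceptions from login() are left unhandled here):
    #
    #     cfg = Config(login="prenom.nom", password="motdepasse")
    #     client = ENTClient(cfg)
    #     client.login()
    #     notifs = client.fetch_notifications()
    #     msgs = client.fetch_messages()
    #     client.cache.close()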
    def _detail(self, mid):
        data = self._api(f"/conversation/api/messages/{mid}")
        if not data: return {"body_text": "", "api_att": [], "html_res": []}
        body_html = data.get("body", "")
        return {"body_text": html_to_text(body_html), "api_att": data.get("attachments", []),
                "html_res": extract_resources(body_html, self.cfg.base_url)}
    def _download(self, url, dest_dir, hint):
        cached = self.cache.already(url)
        if cached: self.stats["cache"] += 1; return Attachment(url=url, filename=hint, local_path=cached, downloaded=True, source="cache")
        if self.cfg.dry_run: return Attachment(url=url, filename=hint, downloaded=False, source="dry-run")
        dest_dir.mkdir(parents=True, exist_ok=True)
        try:
            r = self.s.get(url, headers=self._xhr(), timeout=120, stream=True)
            if r.status_code != 200: self.stats["dl_err"] += 1; return Attachment(url=url, filename=hint, downloaded=False, source="error")
            real = _resolve_filename(r, hint); safe = _safe_name(real); dest = dest_dir / safe
            if dest.exists():
                stem, suf = dest.stem, dest.suffix; i = 1
                while dest.exists(): dest = dest_dir / f"{stem}_{i}{suf}"; i += 1
            sha = hashlib.sha256(); size = 0
            with open(dest, "wb") as fp:
                for chunk in r.iter_content(65_536): fp.write(chunk); sha.update(chunk); size += len(chunk)
            h = sha.hexdigest()
            self.cache.save(url, safe, str(dest), h, size); self.stats["dl"] += 1; self.stats["dl_bytes"] += size
            return Attachment(url=url, filename=safe, size_bytes=size, content_type=r.headers.get("Content-Type", "").split(";")[0].strip(),
                              local_path=str(dest), downloaded=True, source="download", sha256=h)
        except Exception:
            self.stats["dl_err"] += 1; return Attachment(url=url, filename=hint, downloaded=False, source="error")
    def _attachments(self, mid, detail):
        out, seen = [], set()
        d = self.cfg.attachments_dir / mid
        for a in detail.get("api_att", []):
            fid = a.get("id", ""); fn = a.get("filename", f"file_{fid}")
            url = f"{self.cfg.base_url}/conversation/api/messages/{mid}/attachments/{fid}"
            if url in seen: continue
            seen.add(url); _pause(0.2, 0.6)
            att = self._download(url, d, fn); out.append(att)
        for res in detail.get("htm