# DocIndexer-v2 / classes.py
# (Uploaded by Almaatla — revision 7506aab, verified)
import shutil
import bm25s
from bm25s.hf import BM25HF
import threading, re, time, concurrent.futures, requests, os, hashlib, traceback, io, zipfile, subprocess, tempfile, json, fitz
import pandas as pd
import numpy as np
from bs4 import BeautifulSoup
from datasets import load_dataset, Dataset
from datasets.data_files import EmptyDatasetError
from dotenv import load_dotenv
load_dotenv()
class TDocIndexer:
    """Crawl the 3GPP FTP server and maintain a doc_id -> ZIP-URL index.

    The index is loaded from and persisted to the Hugging Face dataset
    ``OrganizedProgrammers/3GPPTDocLocation``. Meetings inside a working
    group are crawled in parallel with a thread pool; shared state is
    guarded by ``indexer_lock`` and console output by ``print_lock``.
    """

    def __init__(self, max_workers=33):
        # Number of entries loaded from the hub dataset (set by load_indexer).
        self.indexer_length = 0
        self.dataset = "OrganizedProgrammers/3GPPTDocLocation"
        self.indexer = self.load_indexer()
        self.main_ftp_url = "https://3gpp.org/ftp"
        # TDoc filenames start with ids like S1-12345 / CP-... / R2-... (case-insensitive).
        self.valid_doc_pattern = re.compile(r'^(S[1-6P]|C[1-6P]|R[1-6P])-\d+', flags=re.IGNORECASE)
        self.max_workers = max_workers
        self.print_lock = threading.Lock()
        self.indexer_lock = threading.Lock()
        self.total_indexed = 0
        self.processed_count = 0
        self.total_count = 0

    def load_indexer(self):
        """Load the doc_id -> url mapping from the hub dataset."""
        self.indexer_length = 0
        all_docs = {}
        tdoc_locations = load_dataset(self.dataset)
        tdoc_locations = tdoc_locations["train"].to_list()
        for doc in tdoc_locations:
            self.indexer_length += 1
            all_docs[doc["doc_id"]] = doc["url"]
        return all_docs

    def save_indexer(self):
        """Save the updated index (push to hub, then reload it)."""
        data = []
        for doc_id, url in self.indexer.items():
            data.append({"doc_id": doc_id, "url": url})
        dataset = Dataset.from_list(data)
        dataset.push_to_hub(self.dataset, token=os.environ["HF"])
        self.indexer = self.load_indexer()

    def get_docs_from_url(self, url):
        """Return the link texts of an FTP directory-listing page ([] on error)."""
        try:
            # NOTE(review): verify=False disables TLS verification — presumably
            # needed for the 3gpp.org FTP mirror; confirm before hardening.
            response = requests.get(url, verify=False, timeout=10)
            soup = BeautifulSoup(response.text, "html.parser")
            return [item.get_text() for item in soup.select("tr td a")]
        except Exception as e:
            with self.print_lock:
                print(f"Erreur lors de l'accès à {url}: {e}")
            return []

    def is_valid_document_pattern(self, filename):
        # True when the filename starts with a TDoc id (e.g. S1-12345).
        return bool(self.valid_doc_pattern.match(filename))

    def is_zip_file(self, filename):
        return filename.lower().endswith('.zip')

    def extract_doc_id(self, filename):
        """Extract the TDoc id (e.g. S1-12345) from a filename, or None."""
        if self.is_valid_document_pattern(filename):
            match = self.valid_doc_pattern.match(filename)
            if match:
                # Return the full id (like S1-12345)
                full_id = filename.split('.')[0]  # drop the extension if present
                return full_id.split('_')[0]  # drop any suffix after an underscore
        return None

    def process_zip_files(self, files_list, base_url, workshop=False):
        """Index every valid ZIP in files_list; return how many were (re)indexed."""
        indexed_count = 0
        for file in files_list:
            if file in ['./', '../', 'ZIP/', 'zip/']:
                continue
            # Only ZIP files whose name matches the TDoc pattern
            # (workshop folders accept any ZIP filename).
            if self.is_zip_file(file) and (self.is_valid_document_pattern(file) or workshop):
                file_url = f"{base_url}/{file}"
                # Extract the document id; fall back to the bare filename stem.
                doc_id = self.extract_doc_id(file)
                if doc_id is None:
                    doc_id = file.split('.')[0]
                if doc_id:
                    with self.indexer_lock:
                        # Skip if this file is already indexed at the same URL.
                        if doc_id in self.indexer and self.indexer[doc_id] == file_url:
                            continue
                        # Add or update the index entry.
                        self.indexer[doc_id] = file_url
                        indexed_count += 1
                        self.total_indexed += 1
        return indexed_count

    def process_meeting(self, meeting, wg_url, workshop=False):
        """Process one meeting folder; returns 1 on success, 0 on error."""
        try:
            if meeting in ['./', '../']:
                return 0
            meeting_url = f"{wg_url}/{meeting}"
            with self.print_lock:
                print(f"Vérification du meeting: {meeting}")
            # List the meeting folder and look for a Docs/TDocs subfolder.
            meeting_contents = self.get_docs_from_url(meeting_url)
            key = None
            for item in meeting_contents:
                normalized = item.lower().rstrip('/')
                if normalized in ("docs", "tdocs", "tdoc"):
                    key = item.rstrip('/')
                    break
            if key is not None:
                docs_url = f"{meeting_url}/{key}"
                with self.print_lock:
                    print(f"Vérification des documents présent dans {docs_url}")
                # List the files inside the Docs folder.
                docs_files = self.get_docs_from_url(docs_url)
                # 1. Index the ZIP files directly inside the Docs folder.
                docs_indexed_count = self.process_zip_files(docs_files, docs_url, workshop)
                if docs_indexed_count > 0:
                    with self.print_lock:
                        print(f"{docs_indexed_count} fichiers trouvés")
                # 2. Check the ZIP subfolder if it exists.
                zip_folder = None
                for item in docs_files:
                    if item.lower().rstrip('/') == "zip":
                        zip_folder = item.rstrip('/')
                        break
                if zip_folder:
                    zip_url = f"{docs_url}/{zip_folder}"
                    with self.print_lock:
                        print(f"Vérification du dossier ./zip: {zip_url}")
                    # List and index the ZIP files inside the ZIP subfolder.
                    zip_files = self.get_docs_from_url(zip_url)
                    zip_indexed_count = self.process_zip_files(zip_files, zip_url, workshop)
                    if zip_indexed_count > 0:
                        with self.print_lock:
                            print(f"{zip_indexed_count} fichiers trouvés")
            # Update the shared progress counter.
            with self.indexer_lock:
                self.processed_count += 1
            # Report progress.
            with self.print_lock:
                progress = (self.processed_count / self.total_count) * 100 if self.total_count > 0 else 0
                print(f"\rProgression: {self.processed_count}/{self.total_count} réunions traitées ({progress:.1f}%)")
            return 1  # meeting processed successfully
        except Exception as e:
            with self.print_lock:
                print(f"\nErreur lors du traitement de la réunion {meeting}: {str(e)}")
            return 0

    def process_workgroup(self, wg, main_url):
        """Generator: process one working group, yielding SSE progress events.

        Meetings inside the group are processed in parallel; a GeneratorExit
        (client disconnect) cancels the remaining futures.
        """
        if wg in ['./', '../']:
            return
        wg_url = f"{main_url}/{wg}"
        with self.print_lock:
            print(f"Vérification du working group: {wg}")
        # List the meeting folders of this working group.
        meeting_folders = self.get_docs_from_url(wg_url)
        # Add them to the global progress counter.
        self.total_count += len([m for m in meeting_folders if m not in ['./', '../']])
        # Process the meetings of this group in parallel.
        executor = concurrent.futures.ThreadPoolExecutor(max_workers=self.max_workers)
        futures = [executor.submit(self.process_meeting, meeting, wg_url)
                   for meeting in meeting_folders if meeting not in ['./', '../']]
        total = len(futures)
        done_count = 0
        yield f"event: get-maximum\ndata: {total}\n\n"
        try:
            for future in concurrent.futures.as_completed(futures):
                done_count += 1
                yield f"event: progress\ndata: {done_count}\n\n"
        except GeneratorExit:
            # Consumer went away: cancel pending work and stop cleanly.
            for f in futures:
                f.cancel()
            executor.shutdown(wait=False, cancel_futures=True)
            return
        executor.shutdown(wait=False)

    def index_all_tdocs(self):
        """Generator: index all TDoc ZIPs of the main TSG trees, yielding SSE events."""
        print("Démarrage de l'indexation des TDocs 3GPP complète")
        start_time = time.time()
        docs_count_before = self.indexer_length
        # Main TSG groups (add more here if needed).
        main_groups = ["tsg_sa", "tsg_ct", "tsg_ran"]
        for main_tsg in main_groups:
            print(f"Indexation de {main_tsg.upper()}...")
            main_url = f"{self.main_ftp_url}/{main_tsg}"
            # List the working groups of this TSG.
            workgroups = self.get_docs_from_url(main_url)
            # Working groups are processed sequentially
            # (their meetings are processed in parallel inside).
            for wg in workgroups:
                yield f"event: info\ndata: {main_tsg}-{wg}\n\n"
                for content in self.process_workgroup(wg, main_url):
                    yield content
        docs_count_after = len(self.indexer)
        # NOTE(review): abs() hides the case where the index shrank; the
        # "before" count also comes from indexer_length, not len(self.indexer).
        new_docs_count = abs(docs_count_after - docs_count_before)
        print(f"Indexation terminée en {time.time() - start_time:.2f} secondes")
        print(f"Nouveaux documents ZIP indexés: {new_docs_count}")
        print(f"Total des documents dans l'index: {docs_count_after}")
        return self.indexer

    def index_all_workshops(self):
        """Generator: index the /workshop tree (any ZIP name allowed), yielding SSE events."""
        print("Démarrage de l'indexation des workshops ZIP 3GPP...")
        start_time = time.time()
        docs_count_before = len(self.indexer)
        print("\nIndexation du dossier 'workshop'")
        main_url = f"{self.main_ftp_url}/workshop"
        # List the meeting folders.
        meeting_folders = self.get_docs_from_url(main_url)
        # Add them to the global progress counter.
        self.total_count += len([m for m in meeting_folders if m not in ['./', '../']])
        # Process the meetings in parallel; workshop=True relaxes the filename filter.
        executor = concurrent.futures.ThreadPoolExecutor(max_workers=self.max_workers)
        futures = [executor.submit(self.process_meeting, meeting, main_url, workshop=True)
                   for meeting in meeting_folders if meeting not in ['./', '../']]
        total = len(futures)
        done_count = 0
        yield f"event: get-maximum\ndata: {total}\n\n"
        try:
            for future in concurrent.futures.as_completed(futures):
                done_count += 1
                yield f"event: progress\ndata: {done_count}\n\n"
        except GeneratorExit:
            # Consumer went away: cancel pending work and stop cleanly.
            for f in futures:
                f.cancel()
            executor.shutdown(wait=False, cancel_futures=True)
            return
        executor.shutdown(wait=False)
        docs_count_after = len(self.indexer)
        new_docs_count = docs_count_after - docs_count_before
        print(f"\nIndexation terminée en {time.time() - start_time:.2f} secondes")
        print(f"Nouveaux documents ZIP indexés: {new_docs_count}")
        print(f"Total des documents dans l'index: {docs_count_after}")
        return self.indexer
class Spec3GPPIndexer:
    """Index 3GPP specification documents (TS/TR) by section.

    Downloads spec ZIPs from the 3GPP archive, converts the contained
    .doc/.docx to text via headless LibreOffice (gated by a semaphore),
    splits them into chapters, and pushes content/metadata/BM25 indexes
    to Hugging Face datasets.
    """

    def __init__(self, max_workers=16):
        self.spec_contents = load_dataset("OrganizedProgrammers/3GPPSpecContent")["train"].to_list()
        self.documents_by_spec_num = self._make_doc_index(self.spec_contents)
        self.indexed_specifications = {}
        self.specifications_passed = set()
        self.processed_count = 0
        self.total_count = 0
        try:
            self.failed_specifications = set(
                item["spec_id"] for item in load_dataset("OrganizedProgrammers/3GPPFailedSpecs")["train"].to_list()
            )
            print(f"Loaded {len(self.failed_specifications)} previously failed specifications")
        # NOTE(review): (EmptyDatasetError, Exception) is equivalent to bare
        # Exception — any load failure silently resets the failed set.
        except (EmptyDatasetError, Exception):
            self.failed_specifications = set()
        self.DICT_LOCK = threading.Lock()
        self.DOCUMENT_LOCK = threading.Lock()
        self.STOP_EVENT = threading.Event()
        self.max_workers = max_workers
        # Limits concurrent LibreOffice conversions.
        self.LIBREOFFICE_SEMAPHORE = threading.Semaphore(self.max_workers)

    def _make_doc_index(self, specs):
        """Group flat section rows into {doc_id: {"content": {section: text}, "hash": h}}."""
        doc_index = {}
        for section in specs:
            if section["doc_id"] not in doc_index:
                doc_index[section["doc_id"]] = {"content": {section["section"]: section["content"]}, "hash": section["hash"]}
            else:
                doc_index[section["doc_id"]]["content"][section["section"]] = section["content"]
        return doc_index

    @staticmethod
    def version_to_code(version_str):
        """Map "x.y.z" to the 3GPP archive version code.

        Components < 36 become one base-36 digit each (e.g. 18.3.0 -> "i30");
        larger components fall back to zero-padded decimal pairs.
        Returns None for malformed versions.
        """
        chars = "0123456789abcdefghijklmnopqrstuvwxyz"
        parts = version_str.split('.')
        if len(parts) != 3:
            return None
        try:
            x, y, z = [int(p) for p in parts]
        except ValueError:
            return None
        if x < 36 and y < 36 and z < 36:
            return f"{chars[x]}{chars[y]}{chars[z]}"
        else:
            return f"{str(x).zfill(2)}{str(y).zfill(2)}{str(z).zfill(2)}"

    @staticmethod
    def hasher(specification, version_code):
        # Content-freshness key: spec id + version code.
        return hashlib.md5(f"{specification}{version_code}".encode()).hexdigest()

    @staticmethod
    def get_scope(content):
        """Return the text of the first section whose title ends with "scope"."""
        for title, text in content.items():
            if title.lower().endswith("scope"):
                return text
        return ""

    def get_text(self, specification, version_code):
        """Download the spec ZIP and convert its .doc/.docx to text lines.

        Returns [] on any failure; conversion failures (soffice timeout or
        non-zero exit) also mark the spec as permanently failed.
        """
        if self.STOP_EVENT.is_set():
            return []
        doc_id = specification
        series = doc_id.split(".")[0]
        url = f"https://www.3gpp.org/ftp/Specs/archive/{series}_series/{doc_id}/{doc_id.replace('.', '')}-{version_code}.zip"
        try:
            response = requests.get(url, verify=False, timeout=(10, 120))
            if response.status_code != 200:
                return []
            zip_bytes = io.BytesIO(response.content)
            with zipfile.ZipFile(zip_bytes) as zip_file:
                # Keep only .doc and .docx members.
                docx_files = [f for f in zip_file.namelist() if f.lower().endswith(('.doc', '.docx'))]
                if not docx_files:
                    return []
                full_text = []
                for doc_file in docx_files:
                    with tempfile.TemporaryDirectory() as tmpdir:
                        extracted_path = os.path.join(tmpdir, os.path.basename(doc_file))
                        with open(extracted_path, 'wb') as f:
                            f.write(zip_file.read(doc_file))
                        # Dedicated LibreOffice profile per conversion so
                        # parallel soffice instances don't clash.
                        profile_dir = tempfile.mkdtemp(prefix="libreoffice_profile_")
                        try:
                            with self.LIBREOFFICE_SEMAPHORE:
                                cmd = [
                                    'soffice',
                                    '--headless',
                                    f'-env:UserInstallation=file://{profile_dir}',
                                    '--convert-to', 'txt:Text',
                                    '--outdir', tmpdir,
                                    extracted_path
                                ]
                                subprocess.run(cmd, check=True, timeout=60*2, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
                            txt_file = os.path.splitext(extracted_path)[0] + '.txt'
                            if os.path.exists(txt_file):
                                with open(txt_file, 'r', encoding='utf-8', errors='ignore') as ftxt:
                                    full_text.extend(ftxt.readlines())
                        finally:
                            shutil.rmtree(profile_dir, ignore_errors=True)
                return full_text
        except Exception as e:
            print(f"Error getting text for {specification} v{version_code}: {e}")
            if isinstance(e, (subprocess.TimeoutExpired, subprocess.CalledProcessError)):
                with self.DICT_LOCK:
                    self.failed_specifications.add(specification)
                    print(f"Spec {specification}: marked as failed for future indexation runs")
            return []

    def get_spec_content(self, specification, version_code):
        """Split the converted text into {"<num> <title>": body} chapters.

        Chapter headings are lines like "5.1\tTitle" (number, tab, title
        not ending with a period).
        """
        if self.STOP_EVENT.is_set():
            return {}
        text = self.get_text(specification, version_code)
        if not text:
            return {}
        chapters = []
        chapter_regex = re.compile(r"^(\d+[a-z]?(?:\.\d+)*)\t[A-Z0-9][\ \S]+[^\.]$")
        for i, line in enumerate(text):
            if chapter_regex.fullmatch(line):
                chapters.append((i, line))
        document = {}
        for i in range(len(chapters)):
            start_index, chapter_title = chapters[i]
            end_index = chapters[i+1][0] if i+1 < len(chapters) else len(text)
            content_lines = text[start_index + 1:end_index]
            document[chapter_title.replace("\t", " ")] = "\n".join(content_lines)
        return document

    def fetch_spec_table(self):
        """Scrape the 3GPP status-report page into a list of spec dicts."""
        response = requests.get(
            'https://www.3gpp.org/dynareport?code=status-report.htm',
            headers={"User-Agent": 'Mozilla/5.0'},
            verify=False,
            timeout=(10, 60)
        )
        dfs = pd.read_html(io.StringIO(response.text))
        for x in range(len(dfs)):
            dfs[x] = dfs[x].replace({np.nan: None})
        # First five columns: spec_num, title, type, vers, WG (per table layout).
        columns_needed = [0, 1, 2, 3, 4]
        extracted_dfs = [df.iloc[:, columns_needed] for df in dfs]
        columns = [x.replace("\xa0", "_") for x in extracted_dfs[0].columns]
        specifications = []
        for df in extracted_dfs:
            for index, row in df.iterrows():
                doc = row.to_list()
                doc_dict = dict(zip(columns, doc))
                specifications.append(doc_dict)
        return specifications

    def process_specification(self, spec):
        """Worker: index one spec row (skip, reuse cached content, or fetch)."""
        if self.STOP_EVENT.is_set():
            return
        try:
            spec_num = spec.get('spec_num')
            vers = spec.get('vers')
            if spec_num is None or vers is None:
                with self.DICT_LOCK:
                    self.processed_count += 1
                return
            doc_id = str(spec_num)
            version_code = self.version_to_code(str(vers))
            if not version_code:
                with self.DICT_LOCK:
                    self.processed_count += 1
                return
            if doc_id in self.failed_specifications:
                with self.DICT_LOCK:
                    self.processed_count += 1
                    print(f"Spec {doc_id} ({spec.get('title', '')}): skipped (previously failed) - Progress {self.processed_count}/{self.total_count}")
                return
            document = None
            already_indexed = False
            needs_fetch = False
            # Decide under the lock whether this doc is cached and fresh;
            # only one thread claims the fetch for a given doc_id.
            with self.DOCUMENT_LOCK:
                if doc_id in self.specifications_passed:
                    document = self.documents_by_spec_num.get(doc_id)
                    already_indexed = True
                elif (doc_id in self.documents_by_spec_num
                      and self.documents_by_spec_num[doc_id]["hash"] == self.hasher(doc_id, version_code)):
                    document = self.documents_by_spec_num[doc_id]
                    self.specifications_passed.add(doc_id)
                    already_indexed = True
                else:
                    self.specifications_passed.add(doc_id)
                    needs_fetch = True
            if needs_fetch:
                # Network + LibreOffice work happens outside the lock.
                doc_content = self.get_spec_content(doc_id, version_code)
                if doc_content:
                    document = {"content": doc_content, "hash": self.hasher(doc_id, version_code)}
                    with self.DOCUMENT_LOCK:
                        self.documents_by_spec_num[doc_id] = document
                    already_indexed = False
            if document:
                url = f"https://www.3gpp.org/ftp/Specs/archive/{doc_id.split('.')[0]}_series/{doc_id}/{doc_id.replace('.', '')}-{version_code}.zip"
                metadata = {
                    "id": doc_id,
                    "title": spec.get("title", ""),
                    "type": spec.get("type", ""),
                    "version": str(spec.get("vers", "")),
                    "working_group": spec.get("WG", ""),
                    "url": url,
                    "scope": self.get_scope(document["content"])
                }
                key = f"{doc_id}+-+{spec.get('title', '')}+-+{spec.get('type', '')}+-+{spec.get('vers', '')}+-+{spec.get('WG', '')}"
                with self.DICT_LOCK:
                    self.indexed_specifications[key] = metadata
            with self.DICT_LOCK:
                self.processed_count += 1
                status = "already indexed" if already_indexed else "indexed now"
                print(f"Spec {doc_id} ({spec.get('title', '')}): {status} - Progress {self.processed_count}/{self.total_count}")
        except Exception as e:
            traceback.print_exc()
            print(f"Error processing spec {spec.get('spec_num')} v{spec.get('vers')}: {e}")
            with self.DICT_LOCK:
                self.processed_count += 1
                print(f"Progress: {self.processed_count}/{self.total_count} specs processed")

    def get_document(self, spec_id: str, spec_title: str):
        """Return the spec as a list of text blocks: header line + one per section."""
        text = [f"{spec_id} - {spec_title}\n"]
        doc_data = self.documents_by_spec_num.get(spec_id)
        if doc_data:
            for section_title, content in doc_data["content"].items():
                text.append(f"{section_title}\n\n{content}")
        return text

    def create_bm25_index(self):
        """Build and push two BM25 indexes: per-section and per-document."""
        dataset_metadata = self.indexed_specifications.values()
        unique_specs = set()
        corpus_json = []
        # Per-section corpus: one entry per (spec, section).
        for specification in dataset_metadata:
            if specification['id'] in unique_specs: continue
            unique_specs.add(specification['id'])
            doc_data = self.documents_by_spec_num.get(specification['id'])
            if doc_data:
                for section_title, content in doc_data["content"].items():
                    corpus_json.append({"text": f"{section_title}\n{content}", "metadata": {
                        "id": specification['id'],
                        "title": specification['title'],
                        "section_title": section_title,
                        "version": specification['version'],
                        "type": specification['type'],
                        "working_group": specification['working_group'],
                        "url": specification['url'],
                        "scope": specification['scope']
                    }})
        corpus_text = [doc["text"] for doc in corpus_json]
        corpus_tokens = bm25s.tokenize(corpus_text, stopwords="en")
        print("Indexing BM25")
        retriever = BM25HF(corpus=corpus_json)
        retriever.index(corpus_tokens)
        retriever.save_to_hub("OrganizedProgrammers/3GPPBM25IndexSections", token=os.environ.get("HF"))
        unique_specs = set()
        corpus_json = []
        # Whole-document corpus: one entry per spec (skip specs with no content).
        for specification in dataset_metadata:
            if specification['id'] in unique_specs: continue
            text_list = self.get_document(specification['id'], specification['title'])
            text = "\n".join(text_list)
            if len(text_list) == 1: continue  # header only => no sections found
            corpus_json.append({"text": text, "metadata": specification})
            unique_specs.add(specification['id'])
        corpus_text = [doc["text"] for doc in corpus_json]
        corpus_tokens = bm25s.tokenize(corpus_text, stopwords="en")
        print("Indexing BM25")
        retriever = BM25HF(corpus=corpus_json)
        retriever.index(corpus_tokens)
        retriever.save_to_hub("OrganizedProgrammers/3GPPBM25IndexSingle", token=os.environ.get("HF"))

    def run(self):
        """Generator: fetch the spec table and index all specs, yielding SSE events."""
        print("Fetching specification tables from 3GPP...")
        yield "event: info\ndata: Indexing 3GPP specs ...\n\n"
        specifications = self.fetch_spec_table()
        self.total_count = len(specifications)
        print(f"Processing {self.total_count} specs with {self.max_workers} threads...")
        executor = concurrent.futures.ThreadPoolExecutor(max_workers=self.max_workers)
        futures = [executor.submit(self.process_specification, spec) for spec in specifications]
        total = len(futures)
        done_count = 0
        yield f"event: get-maximum\ndata: {total}\n\n"
        try:
            for future in concurrent.futures.as_completed(futures):
                done_count += 1
                yield f"event: progress\ndata: {done_count}\n\n"
                if self.STOP_EVENT.is_set():
                    break
        except GeneratorExit:
            # Consumer went away: cancel pending work and stop cleanly.
            for f in futures:
                f.cancel()
            executor.shutdown(wait=False, cancel_futures=True)
            return
        executor.shutdown(wait=False)
        print("All specs processed.")

    # Persistence (same behavior as the original script).
    def save(self):
        """Push content, metadata and failed-spec datasets to the hub, then reload."""
        print("Saving indexed data...")
        flat_metadata = [metadata for metadata in self.indexed_specifications.values()]
        flat_docs = []
        print("Flatting doc contents")
        for doc_id, data in self.documents_by_spec_num.items():
            for title, content in data["content"].items():
                flat_docs.append({"hash": data["hash"], "doc_id": doc_id, "section": title, "content": content})
        print("Creating datasets ...")
        push_spec_content = Dataset.from_list(flat_docs)
        push_spec_metadata = Dataset.from_list(flat_metadata)
        # Token handling assumed set in environment ("HF").
        print("Pushing ...")
        push_spec_content.push_to_hub("OrganizedProgrammers/3GPPSpecContent", token=os.environ["HF"])
        push_spec_metadata.push_to_hub("OrganizedProgrammers/3GPPSpecMetadata", token=os.environ["HF"])
        if self.failed_specifications:
            failed_list = [{"spec_id": spec_id} for spec_id in sorted(self.failed_specifications)]
            Dataset.from_list(failed_list).push_to_hub("OrganizedProgrammers/3GPPFailedSpecs", token=os.environ["HF"])
            print(f"Saved {len(failed_list)} failed specifications to 3GPPFailedSpecs")
        self.spec_contents = load_dataset("OrganizedProgrammers/3GPPSpecContent")["train"].to_list()
        self.documents_by_spec_num = self._make_doc_index(self.spec_contents)
        print("Save finished.")
class SpecETSIIndexer:
    """Index ETSI TS/TR specifications by section.

    Logs into the ETSI portal, downloads the TS/TR catalogue as CSV,
    fetches each deliverable's PDF, extracts sections using the PDF table
    of contents (PyMuPDF), and pushes content/metadata/BM25 indexes to
    Hugging Face datasets.
    """

    def __init__(self, max_workers=16):
        self.session = requests.Session()
        # NOTE(review): TLS verification disabled for all session requests.
        self.session.verify = False
        self.spec_contents = load_dataset("OrganizedProgrammers/ETSISpecContent")["train"].to_list()
        self.documents_by_spec_num = self._make_doc_index(self.spec_contents)
        self.indexed_specifications = {}
        self.specifications_passed = set()
        self.processed_count = 0
        self.total_count = 0
        self.DICT_LOCK = threading.Lock()
        self.DOCUMENT_LOCK = threading.Lock()
        self.STOP_EVENT = threading.Event()
        self.max_workers = max_workers
        self.df = self._fetch_spec_table()

    def _make_doc_index(self, specs):
        """Group flat section rows into {doc_id: {"content": {section: text}, "hash": h}}."""
        doc_index = {}
        for section in specs:
            if section["doc_id"] not in doc_index:
                doc_index[section["doc_id"]] = {"content": {section["section"]: section["content"]}, "hash": section["hash"]}
            else:
                doc_index[section["doc_id"]]["content"][section["section"]] = section["content"]
        return doc_index

    def _fetch_spec_table(self):
        """Log in to the ETSI portal and build the TS/TR catalogue DataFrame.

        Keeps only the latest version of each deliverable and drops rows
        whose title mentions 3GPP (those are handled by Spec3GPPIndexer).
        """
        # Portal login, then CSV retrieval for TS and TR.
        print("Connexion login ETSI...")
        self.session.post(
            "https://portal.etsi.org/ETSIPages/LoginEOL.ashx",
            verify=False,
            headers={"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) ..."},
            data=json.dumps({"username": os.environ.get("EOL_USER"), "password": os.environ.get("EOL_PASSWORD")}),
            timeout=(10, 30),
        )
        print("Récupération des métadonnées TS/TR …")
        url_ts = "https://www.etsi.org/?option=com_standardssearch&view=data&format=csv&includeScope=1&page=1&search=&title=1&etsiNumber=1&content=0&version=0&onApproval=0&published=1&withdrawn=0&historical=0&isCurrent=1&superseded=0&harmonized=0&keyword=&TB=&stdType=TS&frequency=&mandate=&collection=&sort=1"
        url_tr = url_ts.replace("stdType=TS", "stdType=TR")
        data_ts = self.session.get(url_ts, verify=False, timeout=(10, 120)).content
        data_tr = self.session.get(url_tr, verify=False, timeout=(10, 120)).content
        df_ts = pd.read_csv(io.StringIO(data_ts.decode('utf-8')), sep=";", skiprows=1, index_col=False)
        df_tr = pd.read_csv(io.StringIO(data_tr.decode('utf-8')), sep=";", skiprows=1, index_col=False)
        # Keep the raw deliverable strings to extract the version afterwards.
        backup_ts = df_ts["ETSI deliverable"]
        backup_tr = df_tr["ETSI deliverable"]
        # Normalize "ETSI TS 123 456-1 V..." down to the bare number "123 456-1".
        df_ts["ETSI deliverable"] = df_ts["ETSI deliverable"].str.extract(r"\s*ETSI TS (\d+ \d+(?:-\d+(?:-\d+)?)?)")
        df_tr["ETSI deliverable"] = df_tr["ETSI deliverable"].str.extract(r"\s*ETSI TR (\d+ \d+(?:-\d+(?:-\d+)?)?)")
        version1 = backup_ts.str.extract(r"\s*ETSI TS \d+ \d+(?:-\d+(?:-\d+)?)? V(\d+\.\d+\.\d+)")
        version2 = backup_tr.str.extract(r"\s*ETSI TR \d+ \d+(?:-\d+(?:-\d+)?)? V(\d+\.\d+\.\d+)")
        df_ts["Version"] = version1[0]
        df_tr["Version"] = version2[0]

        def ver_tuple(v):
            # Sortable (major, minor, patch) key; non-strings sort lowest.
            if not isinstance(v, str):
                return (0, 0, 0)
            return tuple(map(int, v.split(".")))

        df_ts["temp"] = df_ts["Version"].apply(ver_tuple)
        df_tr["temp"] = df_tr["Version"].apply(ver_tuple)
        df_ts["Type"] = "TS"
        df_tr["Type"] = "TR"
        df = pd.concat([df_ts, df_tr])
        df = df.dropna(subset=["ETSI deliverable", "Version"])
        # Keep only the row with the highest version per deliverable.
        unique_df = df.loc[df.groupby("ETSI deliverable")["temp"].idxmax()]
        unique_df = unique_df.drop(columns="temp")
        unique_df = unique_df[(~unique_df["title"].str.contains("3GPP", case=True, na=False))]
        return unique_df

    @staticmethod
    def hasher(specification: str, version: str):
        # Content-freshness key: spec id + version string.
        return hashlib.md5(f"{specification}{version}".encode()).hexdigest()

    @staticmethod
    def get_scope(content):
        """Return the text of the first section whose title ends with "scope"."""
        for title, text in content.items():
            if title.lower().endswith("scope"):
                return text
        return ""

    def get_document(self, spec_id: str, spec_title: str):
        """Return the spec as a list of text blocks: header line + one per section."""
        text = [f"{spec_id} - {spec_title}\n"]
        doc_data = self.documents_by_spec_num.get(spec_id)
        if doc_data:
            for section_title, content in doc_data["content"].items():
                text.append(f"{section_title}\n\n{content}")
        return text

    def get_text(self, specification: str):
        """Download the PDF for a deliverable; return (fitz doc, toc) or (None, [])."""
        if self.STOP_EVENT.is_set():
            return None, []
        print(f"\n[INFO] Tentative de récupération de la spécification {specification}", flush=True)
        try:
            # Find the catalogue row carrying the PDF link.
            row = self.df[self.df["ETSI deliverable"] == specification]
            if row.empty:
                print(f"[WARN] Spécification {specification} absente du tableau")
                return None, []
            pdf_link = row.iloc[0]["PDF link"]
            response = self.session.get(
                pdf_link,
                headers={"User-Agent": 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) ...'},
                timeout=(10, 120)
            )
            if response.status_code != 200:
                print(f"[ERREUR] Echec du téléchargement du PDF pour {specification}.")
                return None, []
            pdf = fitz.open(stream=response.content, filetype="pdf")
            return pdf, pdf.get_toc()
        except Exception as e:
            print(f"[ERROR] Échec get_text pour {specification} : {e}", flush=True)
            return None, []

    def get_spec_content(self, specification: str):
        """Extract {section title: text} from the PDF using its table of contents."""

        def extract_sections(text, titles):
            # Slice the flat text at each title occurrence (in document order);
            # whitespace runs are collapsed to single spaces.
            sections = {}
            sorted_titles = sorted(titles, key=lambda t: text.find(t))
            for i, title in enumerate(sorted_titles):
                start = text.find(title)
                if i + 1 < len(sorted_titles):
                    end = text.find(sorted_titles[i + 1])
                    sections[re.sub(r"\s+", " ", title)] = re.sub(r"\s+", " ", text[start:end].replace(title, "").strip().rstrip())
                else:
                    sections[re.sub(r"\s+", " ", title)] = re.sub(r"\s+", " ", text[start:].replace(title, "").strip().rstrip())
            return sections

        if self.STOP_EVENT.is_set():
            return {}
        print(f"[INFO] Extraction du contenu de {specification}", flush=True)
        pdf, doc_toc = self.get_text(specification)
        text = []
        if not pdf or not doc_toc:
            print("[ERREUR] Pas de texte ou table of contents trouvé !")
            return {}
        # Start from the first page actually referenced by the TOC.
        first_page = 0
        for level, title, page in doc_toc:
            first_page = page - 1
            break
        for page in pdf[first_page:]:
            text.append("\n".join([line.strip() for line in page.get_text().splitlines()]))
        text = "\n".join(text)
        if not text or not doc_toc or self.STOP_EVENT.is_set():
            print("[ERREUR] Pas de texte/table of contents récupéré !")
            return {}
        titles = []
        # Keep TOC entries that start with a number and, once "<num>\n<rest>"
        # (the PDF line-wraps titles after the number), actually occur in the text.
        for level, title, page in doc_toc:
            if self.STOP_EVENT.is_set():
                return {}
            if title and title[0].isnumeric() and '\n'.join(title.strip().split(" ", 1)) in text:
                titles.append('\n'.join(title.strip().split(" ", 1)))
        return extract_sections(text, titles)

    def process_specification(self, spec):
        """Worker: index one catalogue row (skip, reuse cached content, or fetch)."""
        if self.STOP_EVENT.is_set():
            return
        doc_id = "unknown"
        try:
            version = spec.get('Version')
            if not version or (isinstance(version, float) and pd.isna(version)):
                with self.DICT_LOCK:
                    self.processed_count += 1
                return
            doc_id = spec.get("ETSI deliverable")
            if not doc_id or (isinstance(doc_id, float) and pd.isna(doc_id)):
                with self.DICT_LOCK:
                    self.processed_count += 1
                return
            doc_id = str(doc_id)
            document = None
            already_indexed = False
            needs_fetch = False
            # Decide under the lock whether this doc is cached and fresh;
            # only one thread claims the fetch for a given doc_id.
            with self.DOCUMENT_LOCK:
                if doc_id in self.specifications_passed:
                    document = self.documents_by_spec_num.get(doc_id)
                    already_indexed = True
                elif (doc_id in self.documents_by_spec_num
                      and self.documents_by_spec_num[doc_id]["hash"] == self.hasher(doc_id, version)):
                    document = self.documents_by_spec_num[doc_id]
                    self.specifications_passed.add(doc_id)
                    already_indexed = True
                else:
                    self.specifications_passed.add(doc_id)
                    needs_fetch = True
            if needs_fetch:
                # PDF download + extraction happens outside the lock.
                document_content = self.get_spec_content(doc_id)
                if document_content:
                    document = {"content": document_content, "hash": self.hasher(doc_id, version)}
                    with self.DOCUMENT_LOCK:
                        self.documents_by_spec_num[doc_id] = document
                    already_indexed = False
            if document:
                title = spec.get("title", "")
                if isinstance(title, float) and pd.isna(title):
                    title = ""
                spec_type = spec.get("Type", "")
                pdf_link = spec.get("PDF link", "")
                string_key = f"{doc_id}+-+{title}+-+{spec_type}+-+{version}"
                metadata = {
                    "id": str(doc_id),
                    "title": title,
                    "type": spec_type,
                    "version": version,
                    "url": pdf_link,
                    "scope": self.get_scope(document["content"])
                }
                with self.DICT_LOCK:
                    self.indexed_specifications[string_key] = metadata
            with self.DICT_LOCK:
                self.processed_count += 1
                status = "already indexed" if already_indexed else "indexed now"
                print(f"Spec {doc_id} ({spec.get('title', '')}): {status} - Progress {self.processed_count}/{self.total_count}")
        except Exception as e:
            traceback.print_exc()
            print(f"\n[ERREUR] Échec du traitement de {doc_id} {spec.get('Version')}: {e}", flush=True)
            with self.DICT_LOCK:
                self.processed_count += 1
                print(f"Progress: {self.processed_count}/{self.total_count} specs processed")

    def run(self):
        """Generator: index every catalogue row, yielding SSE progress events."""
        print("Démarrage indexation ETSI…")
        yield "event: info\ndata: Indexing ETSI specs ...\n\n"
        specifications = self.df.to_dict(orient="records")
        self.total_count = len(specifications)
        print(f"Traitement de {self.total_count} specs avec {self.max_workers} threads...\n")
        executor = concurrent.futures.ThreadPoolExecutor(max_workers=self.max_workers)
        futures = [executor.submit(self.process_specification, spec) for spec in specifications]
        total = len(futures)
        done_count = 0
        yield f"event: get-maximum\ndata: {total}\n\n"
        try:
            for future in concurrent.futures.as_completed(futures):
                done_count += 1
                yield f"event: progress\ndata: {done_count}\n\n"
                if self.STOP_EVENT.is_set():
                    break
        except GeneratorExit:
            # Consumer went away: cancel pending work and stop cleanly.
            for f in futures:
                f.cancel()
            executor.shutdown(wait=False, cancel_futures=True)
            return
        executor.shutdown(wait=False)
        print(f"\nAll {self.processed_count}/{self.total_count} specs processed.")

    def save(self):
        """Push content and metadata datasets to the hub, then reload them."""
        print("\nSauvegarde en cours...", flush=True)
        flat_metadata = [metadata for metadata in self.indexed_specifications.values()]
        flat_docs = []
        for doc_id, data in self.documents_by_spec_num.items():
            for title, content in data["content"].items():
                flat_docs.append({"hash": data["hash"], "doc_id": doc_id, "section": title, "content": content})
        push_spec_content = Dataset.from_list(flat_docs)
        push_spec_metadata = Dataset.from_list(flat_metadata)
        push_spec_content.push_to_hub("OrganizedProgrammers/ETSISpecContent", token=os.environ["HF"])
        push_spec_metadata.push_to_hub("OrganizedProgrammers/ETSISpecMetadata", token=os.environ["HF"])
        self.spec_contents = load_dataset("OrganizedProgrammers/ETSISpecContent")["train"].to_list()
        self.documents_by_spec_num = self._make_doc_index(self.spec_contents)
        print("Sauvegarde terminée.")

    def create_bm25_index(self):
        """Build and push two BM25 indexes: per-section and per-document."""
        dataset_metadata = self.indexed_specifications.values()
        unique_specs = set()
        corpus_json = []
        # Per-section corpus: one entry per (spec, section).
        for specification in dataset_metadata:
            if specification['id'] in unique_specs: continue
            unique_specs.add(specification['id'])
            doc_data = self.documents_by_spec_num.get(specification['id'])
            if doc_data:
                for section_title, content in doc_data["content"].items():
                    corpus_json.append({"text": f"{section_title}\n{content}", "metadata": {
                        "id": specification['id'],
                        "title": specification['title'],
                        "section_title": section_title,
                        "version": specification['version'],
                        "type": specification['type'],
                        "url": specification['url'],
                        "scope": specification['scope']
                    }})
        corpus_text = [doc["text"] for doc in corpus_json]
        corpus_tokens = bm25s.tokenize(corpus_text, stopwords="en")
        print("Indexing BM25")
        retriever = BM25HF(corpus=corpus_json)
        retriever.index(corpus_tokens)
        retriever.save_to_hub("OrganizedProgrammers/ETSIBM25IndexSections", token=os.environ.get("HF"))
        unique_specs = set()
        corpus_json = []
        # Whole-document corpus: one entry per spec (skip specs with no content).
        for specification in dataset_metadata:
            if specification['id'] in unique_specs: continue
            text_list = self.get_document(specification['id'], specification['title'])
            text = "\n".join(text_list)
            if len(text_list) == 1: continue  # header only => no sections found
            corpus_json.append({"text": text, "metadata": specification})
            unique_specs.add(specification['id'])
        corpus_text = [doc["text"] for doc in corpus_json]
        corpus_tokens = bm25s.tokenize(corpus_text, stopwords="en")
        print("Indexing BM25")
        retriever = BM25HF(corpus=corpus_json)
        retriever.index(corpus_tokens)
        retriever.save_to_hub("OrganizedProgrammers/ETSIBM25IndexSingle", token=os.environ.get("HF"))