import difflib
import html
import io
import os
import subprocess
import uuid
import zipfile
from typing import Optional

import pandas as pd
import pdfplumber
from pdf2docx import Converter
from pdf2image import convert_from_path
from PIL import Image
from pptx import Presentation
from pptx.util import Inches
from pypdf import PdfWriter, PdfReader, Transformation
from reportlab.lib import colors
from reportlab.lib.enums import TA_JUSTIFY
from reportlab.lib.pagesizes import A4, letter
from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle
from reportlab.lib.units import inch
from reportlab.pdfgen import canvas
from reportlab.platypus import SimpleDocTemplate, Paragraph, Spacer

from config import TEMP_DIR


class PDFEngine:
    """PDF toolbox: previews, merge/split, rewriting, comparison and export.

    File-producing operations write their result to a uniquely named file
    inside ``TEMP_DIR`` and return that path.
    """

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _get_output_path(filename: str) -> str:
        """Return a path in ``TEMP_DIR`` for *filename* with a random prefix.

        The 8-hex-digit prefix keeps concurrent operations from overwriting
        each other's output.
        """
        unique_name = f"{uuid.uuid4().hex[:8]}_{filename}"
        return os.path.join(TEMP_DIR, unique_name)

    @staticmethod
    def _stem(file_path: str) -> str:
        """Return the base name of *file_path* without its final extension.

        Unlike ``str.replace(".pdf", "")`` this is case-insensitive with
        respect to the extension and never touches ".pdf" in the middle of
        the name.
        """
        return os.path.splitext(os.path.basename(file_path))[0]

    @staticmethod
    def _remove_files(paths) -> None:
        """Delete every path in *paths*, ignoring files that cannot be removed."""
        for path in paths:
            try:
                os.remove(path)
            except OSError:
                # Best effort: a leftover temp file must not fail the operation.
                pass

    def _zip_and_discard(self, zip_name: str, paths: list) -> str:
        """Pack *paths* into a new zip named *zip_name* and delete the sources.

        Archive members keep the base name of each source file.

        Returns:
            Path of the created zip file.
        """
        zip_path = self._get_output_path(zip_name)
        try:
            with zipfile.ZipFile(zip_path, "w") as archive:
                for path in paths:
                    archive.write(path, arcname=os.path.basename(path))
        finally:
            # The parts only exist to be zipped; nothing else knows their names.
            self._remove_files(paths)
        return zip_path

    @staticmethod
    def _build_watermark_page(text: str, width: float, height: float):
        """Return a one-page overlay of *width* x *height* points with *text*.

        The text is drawn semi-transparent grey, rotated 45 degrees and
        centred on the page.
        """
        packet = io.BytesIO()
        can = canvas.Canvas(packet, pagesize=(width, height))
        can.setFont("Helvetica-Bold", 50)
        can.setFillColorRGB(0.5, 0.5, 0.5, 0.3)
        can.saveState()
        can.translate(width / 2.0, height / 2.0)
        can.rotate(45)
        can.drawCentredString(0, 0, text)
        can.restoreState()
        can.save()
        packet.seek(0)
        return PdfReader(packet).pages[0]

    @staticmethod
    def _extract_words(path: str) -> list:
        """Return all whitespace-separated words of the PDF at *path*.

        Raises:
            RuntimeError: if the file cannot be read or parsed.
        """
        try:
            words = []
            for page in PdfReader(path).pages:
                extracted = page.extract_text()
                if extracted:
                    words.extend(extracted.split())
            return words
        except Exception as e:
            raise RuntimeError(f"Error leyendo PDF: {e}") from e

    @staticmethod
    def _diff_to_html_paragraphs(diff, max_words: int = 300) -> list:
        """Turn ``difflib.ndiff`` tokens into ReportLab paragraph markup.

        Removed words are rendered red and struck through, added words green
        and bold, common words plain. A new paragraph starts at the first
        common word after more than *max_words* words were collected, so a
        paragraph never ends in the middle of a run of changes.

        Returns:
            List of markup strings, one per paragraph.
        """
        paragraphs = []
        chunks = []
        for token in diff:
            # ndiff prefixes every item with a 2-char code: "- ", "+ ", "  "
            # or "? "; the first character alone identifies it.
            marker = token[:1]
            # Words come from arbitrary PDF text, so escape markup characters
            # before embedding them in the paragraph XML.
            word = html.escape(token[2:], quote=False)
            if marker == "-":
                chunks.append(f'<font color="red"><strike>{word}</strike></font> ')
            elif marker == "+":
                chunks.append(f'<font color="green"><b>{word}</b></font> ')
            elif marker == " ":
                chunks.append(f"{word} ")
            else:
                # "? " hint lines (and empty tokens) carry no document text.
                continue

            if len(chunks) > max_words and marker == " ":
                paragraphs.append("".join(chunks))
                chunks = []

        if chunks:
            paragraphs.append("".join(chunks))
        return paragraphs

    # ------------------------------------------------------------------
    # Inspection and previews
    # ------------------------------------------------------------------

    def get_pdf_info(self, file_path: str) -> dict:
        """Return page count, file name and title of the PDF at *file_path*.

        Returns:
            ``{"pages", "name", "title"}``. If the file cannot be read the
            result is ``{"pages": 0, "name": "Error", "title": ""}`` instead
            of an exception.
        """
        try:
            reader = PdfReader(file_path)
            meta = reader.metadata
            title = meta.title if meta and meta.title else "Sin título"
            return {
                "pages": len(reader.pages),
                "name": os.path.basename(file_path),
                "title": title,
            }
        except Exception:
            return {"pages": 0, "name": "Error", "title": ""}

    def _parse_range_groups(self, range_str: str, max_pages: int) -> list:
        """Parse a spec such as ``"1-3, 5"`` into groups of 0-based indices.

        Each comma-separated part is either a single 1-based page or an
        inclusive ``start-end`` range. Ranges are clamped to
        ``1..max_pages``; malformed, empty or out-of-range parts are skipped.

        Returns:
            List of ``{"label": <part as typed>, "indices": [...]}`` in input
            order.
        """
        groups = []
        for raw in (range_str or "").split(","):
            part = raw.strip()
            if not part:
                continue
            try:
                if "-" in part:
                    start, end = map(int, part.split("-"))
                    start = max(1, start)
                    end = min(max_pages, end)
                    # Empty when start > end, which drops the part below.
                    indices = list(range(start - 1, end))
                else:
                    page = int(part)
                    indices = [page - 1] if 1 <= page <= max_pages else []
            except ValueError:
                continue
            if indices:
                groups.append({"label": part, "indices": indices})
        return groups

    def generate_preview(self, f: str, p: int) -> Optional[str]:
        """Render page *p* (1-based) of PDF *f* as a 400px-high JPEG.

        Returns:
            Path of the JPEG, or ``None`` if rendering fails or yields no
            image.
        """
        try:
            imgs = convert_from_path(f, first_page=p, last_page=p, size=(None, 400))
            if not imgs:
                return None
            out = self._get_output_path(f"preview_pg{p}.jpg")
            imgs[0].save(out, "JPEG")
            return out
        except Exception:
            return None

    def get_rotated_preview(self, f: str, a: int) -> Optional[str]:
        """Render the first page of *f* as a JPEG rotated *a* degrees clockwise.

        Returns:
            Path of the JPEG, or ``None`` if *f* is empty or rendering fails.
        """
        if not f:
            return None
        try:
            imgs = convert_from_path(f, first_page=1, last_page=1, size=(None, 500))
            if not imgs:
                return None
            img = imgs[0]
            if a != 0:
                # PIL rotates counter-clockwise, hence the negated angle.
                img = img.rotate(-a, expand=True)
            out = self._get_output_path(f"rot_prev_{a}.jpg")
            img.save(out, "JPEG")
            return out
        except Exception:
            return None

    def get_preview_indices_from_string(self, range_str: str, max_pages: int) -> list:
        """Return the sorted, unique 1-based pages worth previewing for a spec.

        For a ``start-end`` part both endpoints (clamped to
        ``1..max_pages``) are included; for a single page it is included when
        it lies inside the document. Malformed parts are ignored. A document
        without pages yields an empty list.
        """
        if not range_str or max_pages < 1:
            return []

        key_pages = set()
        for raw in range_str.split(","):
            part = raw.strip()
            try:
                if "-" in part:
                    start, end = map(int, part.split("-"))
                    key_pages.add(max(1, min(start, max_pages)))
                    key_pages.add(max(1, min(end, max_pages)))
                else:
                    page = int(part)
                    if 1 <= page <= max_pages:
                        key_pages.add(page)
            except ValueError:
                continue
        return sorted(key_pages)

    # ------------------------------------------------------------------
    # Merge, stamp, repair
    # ------------------------------------------------------------------

    def merge_pdfs(self, file_paths: list, order_indices: list = None, use_numbering: bool = False) -> str:
        """Concatenate *file_paths* into one PDF.

        Args:
            file_paths: PDFs to merge.
            order_indices: Optional permutation of indices into *file_paths*.
                It is ignored (original order kept) when its length differs
                or an entry is not a usable index.
            use_numbering: When true, stamp "Página X de N" on every page.

        Returns:
            Path of the merged PDF.

        Raises:
            ValueError: if *file_paths* is empty.
        """
        if not file_paths:
            raise ValueError("No hay archivos.")

        ordered = file_paths
        if order_indices and len(order_indices) == len(file_paths):
            try:
                ordered = [file_paths[int(i)] for i in order_indices]
            except (ValueError, TypeError, IndexError, OverflowError):
                ordered = file_paths

        merger = PdfWriter()
        for path in ordered:
            merger.append(path)

        temp_out = self._get_output_path("temp_unido.pdf")
        with open(temp_out, "wb") as fh:
            merger.write(fh)

        if not use_numbering:
            return temp_out

        try:
            return self._add_page_numbers(temp_out)
        finally:
            # The un-numbered merge is only an intermediate; drop it even if
            # numbering failed.
            self._remove_files([temp_out])

    def _add_page_numbers(self, file_path: str) -> str:
        """Write a copy of *file_path* with a centred "Página X de N" footer.

        Returns:
            Path of the numbered PDF.
        """
        reader = PdfReader(file_path)
        writer = PdfWriter()
        num_pages = len(reader.pages)

        for i, page in enumerate(reader.pages):
            page_width = float(page.mediabox.width)
            page_height = float(page.mediabox.height)

            # Size the overlay to the real page so the footer is centred on,
            # and contained in, pages that are not Letter-sized.
            packet = io.BytesIO()
            can = canvas.Canvas(packet, pagesize=(page_width, page_height))
            can.setFont("Helvetica", 10)
            can.drawCentredString(page_width / 2.0, 20, f"Página {i+1} de {num_pages}")
            can.save()

            packet.seek(0)
            overlay = PdfReader(packet)
            page.merge_page(overlay.pages[0])
            writer.add_page(page)

        out = self._get_output_path("unido_numerado.pdf")
        with open(out, "wb") as fh:
            writer.write(fh)
        return out

    def add_watermark(self, file_path: str, text: str) -> str:
        """Stamp *text* diagonally across every page of *file_path*.

        Returns:
            Path of the watermarked PDF.

        Raises:
            ValueError: if *file_path* or *text* is empty.
        """
        if not file_path or not text:
            raise ValueError("Falta archivo o texto.")

        reader = PdfReader(file_path)
        writer = PdfWriter()
        # One overlay per distinct page size, so mixed-size documents get a
        # centred watermark without rebuilding it for every page.
        overlays = {}

        for page in reader.pages:
            size = (float(page.mediabox.width), float(page.mediabox.height))
            if size not in overlays:
                overlays[size] = self._build_watermark_page(text, *size)
            page.merge_page(overlays[size])
            writer.add_page(page)

        out = self._get_output_path("marca_agua.pdf")
        with open(out, "wb") as fh:
            writer.write(fh)
        return out

    def repair_pdf(self, file_path: str) -> str:
        """Rewrite *file_path* through Ghostscript's ``pdfwrite`` device.

        Returns:
            Path of the rewritten PDF.

        Raises:
            ValueError: if *file_path* is empty.
            RuntimeError: if Ghostscript exits with an error or cannot be run.
        """
        if not file_path:
            raise ValueError("Falta archivo.")

        out = self._get_output_path("reparado.pdf")
        cmd = [
            "gs",
            "-o", out,
            "-sDEVICE=pdfwrite",
            "-dPDFSETTINGS=/default",
            "-dInteract=N",
            "-dNOPAUSE", "-dQUIET", "-dBATCH",
            file_path,
        ]

        try:
            subprocess.run(cmd, check=True)
            return out
        except subprocess.CalledProcessError as e:
            raise RuntimeError("Ghostscript no pudo reparar el archivo (daño severo).") from e
        except Exception as e:
            raise RuntimeError(f"Error sistema: {e}") from e

    # ------------------------------------------------------------------
    # Page-level rewriting
    # ------------------------------------------------------------------

    def split_pdf_custom(self, file_path: str, range_str: str) -> str:
        """Split *file_path* into one PDF per group of *range_str*, zipped.

        Args:
            file_path: Source PDF.
            range_str: Spec such as ``"1-3, 5"``; each comma-separated part
                becomes its own PDF.

        Returns:
            Path of a zip archive containing the parts.

        Raises:
            ValueError: if *file_path* is empty or no part of *range_str* is
                valid.
        """
        if not file_path:
            raise ValueError("Falta archivo.")
        reader = PdfReader(file_path)
        groups = self._parse_range_groups(range_str, len(reader.pages))
        if not groups:
            raise ValueError("Rango inválido.")

        base = self._stem(file_path)
        parts = []
        for group in groups:
            writer = PdfWriter()
            for index in group["indices"]:
                writer.add_page(reader.pages[index])
            safe = group["label"].replace(" ", "")
            part_path = self._get_output_path(f"{base}_part_{safe}.pdf")
            with open(part_path, "wb") as fh:
                writer.write(fh)
            parts.append(part_path)

        return self._zip_and_discard(f"{base}_split.zip", parts)

    def reorder_pages(self, file_path: str, order_str: str) -> str:
        """Write a PDF containing the pages of *file_path* listed in *order_str*.

        Pages appear in the order given; a page may be listed more than once.

        Raises:
            ValueError: if *file_path* is empty or *order_str* has no valid
                part.
        """
        if not file_path:
            raise ValueError("Falta archivo.")
        reader = PdfReader(file_path)
        groups = self._parse_range_groups(order_str, len(reader.pages))
        if not groups:
            raise ValueError("Orden inválido.")

        writer = PdfWriter()
        for group in groups:
            for index in group["indices"]:
                writer.add_page(reader.pages[index])

        out = self._get_output_path("reordenado.pdf")
        with open(out, "wb") as fh:
            writer.write(fh)
        return out

    def compress_pdf(self, file_path: str, power: int = 3) -> str:
        """Recompress *file_path* with Ghostscript.

        Args:
            file_path: Source PDF.
            power: 1 selects ``/prepress``, 3 ``/ebook``, 4 ``/screen``; any
                other value falls back to ``/ebook``.

        Raises:
            ValueError: if *file_path* is empty.
            RuntimeError: if Ghostscript fails or cannot be run.
        """
        if not file_path:
            raise ValueError("Falta archivo.")

        presets = {1: "/prepress", 3: "/ebook", 4: "/screen"}
        gs_set = presets.get(power, "/ebook")
        out = self._get_output_path("comprimido.pdf")
        cmd = [
            "gs",
            "-sDEVICE=pdfwrite",
            "-dCompatibilityLevel=1.4",
            f"-dPDFSETTINGS={gs_set}",
            "-dNOPAUSE", "-dQUIET", "-dBATCH",
            f"-sOutputFile={out}",
            file_path,
        ]
        try:
            subprocess.run(cmd, check=True)
            return out
        except Exception as e:
            raise RuntimeError("Error comprimiendo (Ghostscript).") from e

    def protect_pdf(self, file_path: str, password: str) -> str:
        """Write an encrypted copy of *file_path* that opens with *password*.

        Raises:
            ValueError: if *file_path* or *password* is empty.
            RuntimeError: if reading, encrypting or writing fails.
        """
        if not file_path or not password:
            raise ValueError("Faltan datos.")
        try:
            writer = PdfWriter()
            for page in PdfReader(file_path).pages:
                writer.add_page(page)
            writer.encrypt(password)
            out = self._get_output_path("protegido.pdf")
            with open(out, "wb") as fh:
                writer.write(fh)
            return out
        except Exception as e:
            raise RuntimeError(f"Error: {e}") from e

    def rotate_pdf(self, file_path: str, angle: int) -> str:
        """Write a copy of *file_path* with every page rotated by *angle*.

        Raises:
            ValueError: if *file_path* is empty.
            RuntimeError: if reading, rotating or writing fails.
        """
        if not file_path:
            raise ValueError("Falta archivo.")
        try:
            writer = PdfWriter()
            for page in PdfReader(file_path).pages:
                page.rotate(angle)
                writer.add_page(page)
            out = self._get_output_path(f"rotado_{angle}.pdf")
            with open(out, "wb") as fh:
                writer.write(fh)
            return out
        except Exception as e:
            raise RuntimeError(f"Error: {e}") from e

    def update_metadata(self, f: str, t: str, a: str, s: str) -> str:
        """Write a copy of PDF *f* with title *t*, author *a* and subject *s*.

        Fields passed as ``None`` are left out; the producer is always set to
        "OpenPDF Tools".

        Raises:
            ValueError: if *f* is empty.
            RuntimeError: if reading or writing fails.
        """
        if not f:
            raise ValueError("Falta archivo.")
        try:
            writer = PdfWriter()
            for page in PdfReader(f).pages:
                writer.add_page(page)
            fields = {"/Title": t, "/Author": a, "/Subject": s, "/Producer": "OpenPDF Tools"}
            writer.add_metadata({key: value for key, value in fields.items() if value is not None})
            out = self._get_output_path("meta.pdf")
            with open(out, "wb") as outf:
                writer.write(outf)
            return out
        except Exception as e:
            raise RuntimeError(f"Error: {e}") from e

    # ------------------------------------------------------------------
    # Text extraction and comparison
    # ------------------------------------------------------------------

    def extract_text(self, f: str) -> str:
        """Dump the text of PDF *f* into a UTF-8 ``.txt`` file.

        Each page with text is preceded by a "--- Pág N ---" header; pages
        without extractable text are omitted.

        Raises:
            ValueError: if *f* is empty.
            RuntimeError: if reading or writing fails.
        """
        if not f:
            raise ValueError("Falta archivo.")
        try:
            txts = []
            for i, page in enumerate(PdfReader(f).pages):
                text = page.extract_text()
                if text:
                    txts.append(f"--- Pág {i+1} ---\n{text}\n")
            out = self._get_output_path(f"{self._stem(f)}.txt")
            with open(out, "w", encoding="utf-8") as file:
                file.write("\n".join(txts))
            return out
        except Exception as e:
            raise RuntimeError(f"Error: {e}") from e

    def compare_pdfs_text(self, path_a: str, path_b: str) -> str:
        """Build a PDF report of the word-level differences between two PDFs.

        Returns:
            Path of the generated report.

        Raises:
            ValueError: if either path is empty.
            RuntimeError: if one of the PDFs cannot be read.
        """
        if not path_a or not path_b:
            raise ValueError("Faltan archivos.")

        words_a = self._extract_words(path_a)
        words_b = self._extract_words(path_b)
        diff = difflib.ndiff(words_a, words_b)

        output_path = self._get_output_path("informe_diferencias_palabras.pdf")
        doc = SimpleDocTemplate(output_path, pagesize=A4)
        styles = getSampleStyleSheet()
        style_body = ParagraphStyle(
            'Body',
            parent=styles['BodyText'],
            alignment=TA_JUSTIFY,
            fontSize=11,
            leading=14,
        )

        # File names end up inside paragraph markup, so escape them as well.
        name_a = html.escape(os.path.basename(path_a), quote=False)
        name_b = html.escape(os.path.basename(path_b), quote=False)
        legend = '<b>Leyenda:</b> <font color="red"><strike>Eliminado</strike></font> | <font color="green"><b>Añadido</b></font> | Texto Común'

        story = [
            Paragraph("Informe de Comparación (Modo Palabras)", styles['Heading1']),
            Spacer(1, 12),
            Paragraph(legend, style_body),
            Spacer(1, 12),
            Paragraph(f"<b>A:</b> {name_a} | <b>B:</b> {name_b}", style_body),
            Spacer(1, 12),
        ]

        paragraphs = self._diff_to_html_paragraphs(diff)
        for position, markup in enumerate(paragraphs):
            story.append(Paragraph(markup, style_body))
            if position < len(paragraphs) - 1:
                story.append(Spacer(1, 6))

        doc.build(story)
        return output_path

    # ------------------------------------------------------------------
    # Export to other formats
    # ------------------------------------------------------------------

    def pdf_to_pptx(self, f: str) -> str:
        """Convert PDF *f* into a PPTX with one full-width page image per slide.

        Raises:
            ValueError: if *f* is empty.
            RuntimeError: if rendering or saving fails.
        """
        if not f:
            raise ValueError("Falta archivo.")
        try:
            imgs = convert_from_path(f, dpi=150)
            prs = Presentation()
            blank = 6  # index of the slide layout used for every page
            for img in imgs:
                # Hand the JPEG over in memory instead of leaving one temp
                # file per slide behind.
                buffer = io.BytesIO()
                img.save(buffer, "JPEG")
                buffer.seek(0)
                slide = prs.slides.add_slide(prs.slide_layouts[blank])
                slide.shapes.add_picture(buffer, Inches(0), Inches(0), width=prs.slide_width)
            out = self._get_output_path(f"{self._stem(f)}.pptx")
            prs.save(out)
            return out
        except Exception as e:
            raise RuntimeError(f"Error PPTX: {e}") from e

    def pdf_to_word(self, f: str) -> str:
        """Convert PDF *f* into a DOCX document.

        Raises:
            ValueError: if *f* is empty.
            RuntimeError: if the conversion fails.
        """
        if not f:
            raise ValueError("Falta archivo.")
        try:
            out = self._get_output_path(f"{self._stem(f)}.docx")
            cv = Converter(f)
            try:
                cv.convert(out, start=0, end=None)
            finally:
                # Release the source document even when conversion fails.
                cv.close()
            return out
        except Exception as e:
            raise RuntimeError(f"Error Word: {e}") from e

    def pdf_to_images_zip(self, f: str) -> str:
        """Render every page of PDF *f* as a JPEG and return them zipped.

        Raises:
            ValueError: if *f* is empty.
            RuntimeError: if rendering or zipping fails.
        """
        if not f:
            raise ValueError("Falta archivo.")
        try:
            imgs = convert_from_path(f, dpi=150)
            base = self._stem(f)
            paths = []
            for number, img in enumerate(imgs, start=1):
                path = self._get_output_path(f"{base}_{number}.jpg")
                img.save(path, "JPEG")
                paths.append(path)
            return self._zip_and_discard(f"{base}_imgs.zip", paths)
        except Exception as e:
            raise RuntimeError("Error imgs") from e