from pptx import Presentation
from pdf2image import convert_from_path
import pdfplumber
from docx import Document
import subprocess
import os
from typing import Optional, List
import string
import random
import re
import requests
from bs4 import BeautifulSoup
import logging
import time
from urllib.parse import urlparse


class URLTextExtractor:
    """
    A comprehensive utility for extracting text content from web pages with advanced features.

    Features:
    - Rotating User-Agents to mimic different browsers
    - Robust error handling and retry mechanism
    - Section preservation for maintaining document structure
    - Configurable extraction options
    - Logging support

    Attributes:
        USER_AGENTS (list): A comprehensive list of user agent strings to rotate through.
        logger (logging.Logger): Logger for tracking extraction attempts and errors.

    Example:
        >>> extractor = URLTextExtractor()
        >>> text = extractor.extract_text_from_url('https://example.com')
        >>> print(text)
    """

    USER_AGENTS = [
        # Desktop browsers
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36",
        "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/15.1 Safari/605.1.15",
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:95.0) Gecko/20100101 Firefox/95.0",
        # Mobile browsers
        "Mozilla/5.0 (iPhone; CPU iPhone OS 14_6 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.1.1 Mobile/15E148 Safari/604.1",
        "Mozilla/5.0 (Linux; Android 10; SM-G970F) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.101 Mobile Safari/537.36",
    ]

    def __init__(self, logger=None):
        """
        Initialize the URLTextExtractor.

        Args:
            logger (logging.Logger, optional): Custom logger.
                If not provided, creates a default logger.
        """
        self.logger = logger or self._create_default_logger()

    def _create_default_logger(self):
        """
        Create a default logger for tracking the extraction process.

        Returns:
            logging.Logger: Configured logger instance
        """
        logger = logging.getLogger(__name__)
        logger.setLevel(logging.INFO)
        # Guard against attaching duplicate handlers when several extractors are created.
        if not logger.handlers:
            handler = logging.StreamHandler()
            formatter = logging.Formatter("%(asctime)s - %(levelname)s - %(message)s")
            handler.setFormatter(formatter)
            logger.addHandler(handler)
        return logger

    def _process_element_text(self, element):
        """
        Process text within an element, handling anchor tags specially.

        Args:
            element (bs4.element.Tag): BeautifulSoup element to process

        Returns:
            str: Processed text with proper spacing
        """
        # Replace each anchor tag with its text, padded with spaces so link text
        # does not run into the surrounding words.
        for a_tag in element.find_all("a"):
            a_tag.replace_with(f" {a_tag.get_text(strip=True)} ")

        return element.get_text(separator=" ", strip=True)

    def extract_text_from_url(
        self,
        url,
        max_retries=3,
        preserve_sections=True,
        min_section_length=30,
        allowed_tags=None,
    ):
        """
        Extract text content from a given URL with advanced configuration.

        Args:
            url (str): The URL of the webpage to extract text from.
            max_retries (int, optional): Maximum number of retry attempts. Defaults to 3.
            preserve_sections (bool, optional): Whether to preserve section separations. Defaults to True.
            min_section_length (int, optional): Minimum length of text sections to include. Defaults to 30.
            allowed_tags (list, optional): Specific HTML tags to extract text from.
                If None, uses a default set of content-rich tags.

        Returns:
            str: Extracted text content from the webpage, or None if the URL is invalid.

        Raises:
            ValueError: If URL cannot be fetched after maximum retries
            requests.RequestException: For network-related errors

        Examples:
            >>> extractor = URLTextExtractor()
            >>> text = extractor.extract_text_from_url('https://example.com')
            >>> text = extractor.extract_text_from_url('https://example.com', preserve_sections=False)
        """
        # Default to tags that typically hold the main page content.
        if allowed_tags is None:
            allowed_tags = [
                "p",
                "div",
                "article",
                "section",
                "main",
                "h1",
                "h2",
                "h3",
                "h4",
                "h5",
                "h6",
            ]

        # Validate the URL before making any network request.
        try:
            parsed_url = urlparse(url)
            if not all([parsed_url.scheme, parsed_url.netloc]):
                # Reject URLs that lack a scheme or network location.
                return None
        except Exception as e:
            self.logger.error(f"URL parsing error: {e}")
            raise

        for attempt in range(max_retries):
            try:
                # Rotate the User-Agent on every attempt to mimic different browsers.
                headers = {
                    "User-Agent": random.choice(self.USER_AGENTS),
                    "Accept-Language": "en-US,en;q=0.9",
                    "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,*/*;q=0.8",
                }

                response = requests.get(
                    url, headers=headers, timeout=10, allow_redirects=True
                )

                # Raise for 4xx/5xx responses so the retry logic can handle them.
                response.raise_for_status()

                self.logger.info(f"Successfully fetched URL: {url}")

                soup = BeautifulSoup(response.text, "html.parser")

                # Strip non-content elements before extracting text.
                for script in soup(
                    ["script", "style", "head", "header", "footer", "nav"]
                ):
                    script.decompose()

                if preserve_sections:
                    # Keep each sufficiently long section on its own line.
                    sections = []
                    for tag in allowed_tags:
                        for element in soup.find_all(tag):
                            section_text = self._process_element_text(element)

                            # Skip short fragments such as menu items or captions.
                            if len(section_text) >= min_section_length:
                                sections.append(section_text)

                    text = "\n".join(sections)
                else:
                    # Flatten everything into a single block of text.
                    text = " ".join(
                        self._process_element_text(element)
                        for tag in allowed_tags
                        for element in soup.find_all(tag)
                    )

                # Drop empty lines and trim surrounding whitespace.
                text = "\n".join(
                    line.strip() for line in text.split("\n") if line.strip()
                )

                return text

            except (requests.RequestException, ValueError) as e:
                self.logger.warning(f"Attempt {attempt + 1} failed: {e}")

                # Give up after the final attempt.
                if attempt == max_retries - 1:
                    self.logger.error(
                        f"Failed to fetch URL after {max_retries} attempts"
                    )
                    raise ValueError(
                        f"Error fetching URL after {max_retries} attempts: {e}"
                    )

                # Exponential backoff before the next attempt.
                wait_time = 2**attempt
                self.logger.info(f"Waiting {wait_time} seconds before retry")
                time.sleep(wait_time)

        return None


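# Illustrative usage sketch (not part of the original module): wiring in a custom
# logger and non-default extraction settings. The logger name is an arbitrary choice.
#
#     custom_logger = logging.getLogger("url_extractor")
#     custom_extractor = URLTextExtractor(logger=custom_logger)
#     text = custom_extractor.extract_text_from_url(
#         "https://example.com", max_retries=2, preserve_sections=False
#     )

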
def extract_text_from_pptx(file_path):
    prs = Presentation(file_path)
    text_content = []

    for slide in prs.slides:
        slide_text = []
        for shape in slide.shapes:
            if hasattr(shape, "text"):
                slide_text.append(shape.text)
        text_content.append("\n".join(slide_text))

    return "\n\n".join(text_content)


def extract_text_from_ppt(file_path):
    try:
        print("file_path = ", file_path)
        # Convert the legacy .ppt file to .pptx with unoconv so python-pptx can read it.
        pptx_file_path = os.path.splitext(file_path)[0] + ".pptx"
        subprocess.run(["unoconv", "-f", "pptx", file_path], check=True)

        presentation = Presentation(pptx_file_path)
        text_content = []

        for slide in presentation.slides:
            slide_text = []
            for shape in slide.shapes:
                if hasattr(shape, "text"):
                    slide_text.append(shape.text)
            text_content.append("\n".join(slide_text))

        # Remove the temporary .pptx file.
        os.remove(pptx_file_path)

        out = "\n\n".join(text_content)
        return out
    except Exception as e:
        print(f"Error extracting text from PPT file: {e}")
        return "Error extracting text from PPT file"


def convert_pdf_to_image(file):
    images = convert_from_path(file)
    return images


def extract_text_from_pdf(file):
    text = ""
    with pdfplumber.open(file) as pdf:
        for page in pdf.pages:
            # extract_text() can return None for pages without a text layer.
            text += (page.extract_text() or "") + "\n"
    return text


def extract_text_from_docx(file_path):
    # Expects an uploaded file object exposing a .name attribute (e.g. a Gradio file).
    text = ""
    doc = Document(file_path.name)
    for paragraph in doc.paragraphs:
        text += paragraph.text + "\n"
    return text


def convert_doc_to_text(file_path):
    try:
        # Convert the .doc file to plain text with unoconv, then read the result.
        subprocess.run(
            ["unoconv", "--format", "txt", file_path],
            capture_output=True,
            text=True,
            check=True,
        )
        txt_file_path = os.path.splitext(file_path)[0] + ".txt"
        with open(txt_file_path, "r") as f:
            text = f.read()
        # Strip a UTF-8 BOM if one is present.
        text = text.lstrip("\ufeff")
        os.remove(txt_file_path)
        return text
    except subprocess.CalledProcessError as e:
        print(f"Error converting {file_path} to text: {e}")
        return ""


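# Minimal sketch (not part of the original module) of how the extractors above could
# be routed by file extension; `extract_text_from_any` is a hypothetical helper name
# introduced here only for illustration. extract_text_from_docx is omitted because it
# expects an uploaded file object rather than a path.
def extract_text_from_any(file_path):
    ext = os.path.splitext(file_path)[1].lower()
    if ext == ".pptx":
        return extract_text_from_pptx(file_path)
    if ext == ".ppt":
        return extract_text_from_ppt(file_path)
    if ext == ".pdf":
        return extract_text_from_pdf(file_path)
    if ext == ".doc":
        return convert_doc_to_text(file_path)
    raise ValueError(f"Unsupported file type: {ext}")

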
def generate_random_string(length=23):
    characters = string.ascii_letters + string.digits
    random_string = "".join(random.choice(characters) for _ in range(length))
    return random_string


def handle_json_output(json_list: list):
    n = len(json_list)
    for i in range(n):
        # Unique ids for the front and back rich-text areas.
        random_string1 = generate_random_string()
        random_string2 = generate_random_string()
        element = json_list[i]
        front = element["frontText"]
        back = element["backText"]
        element["frontHTML"] = (
            f'<div id="element-richtextarea-{random_string1}" style="position:absolute;left:100px;top:50px;width:800px;height:300px;text-align:center;display:flex;align-items:center;font-size:40px;">'
            f"<p>{front}</p></div>"
        )
        element["backHTML"] = (
            f'<div id="element-richtextarea-{random_string2}" style="position:absolute;left:100px;top:50px;width:800px;height:300px;text-align:center;display:flex;align-items:center;font-size:40px;">'
            f"<p>{back}</p></div>"
        )
        element["termType"] = "basic"
        cloze_matches = re.findall(r"_{2,}", front)

        # Treat a front with one or two blank runs ("____") as a cloze deletion.
        if cloze_matches and len(cloze_matches) <= 2:
            element["termType"] = "cloze"

            # Substitute the back text into each blank of the front HTML.
            def replace_cloze(match):
                return f'</p><p><span class="closure">{back}</span></p><p>'

            front_html = re.sub(r"_{2,}", replace_cloze, front)
            element["frontHTML"] = (
                f'<div id="element-richtextarea-{random_string1}" style="position:absolute;left:100px;top:50px;width:800px;height:300px;text-align:center;display:flex;align-items:center;font-size:40px;">'
                f"<p>{front_html}</p></div>"
            )

            def replace_underscores(match):
                return f" {back} "

            element["frontText"] = re.sub(r"_{2,}", replace_underscores, front)
            element["backText"] = ""

            element["backHTML"] = (
                f'<div id="element-richtextarea-{random_string2}" style="position:absolute;left:100px;top:50px;width:800px;height:300px;text-align:center;display:flex;align-items:center;font-size:40px;">'
                f"<p><br></p></div>"
            )

    return json_list


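# Example of the transformation above (illustrative values only): an input card
#   {"frontText": "____ is the capital of France.", "backText": "Paris"}
# comes back with termType "cloze", the blank replaced by "Paris" in frontText
# and frontHTML, and backText cleared; cards without blanks keep termType "basic"
# and simply gain frontHTML/backHTML wrappers around their existing text.

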
def sanitize_list_of_lists(text: str) -> Optional[List[dict]]:
    # Keep only the outermost [...] portion of the model output.
    left = text.find("[")
    right = text.rfind("]")
    text = text[left : right + 1]
    try:
        # Evaluate the trimmed string, which is expected to be a list of [front, back] pairs.
        list_of_lists = eval(text)
        if isinstance(list_of_lists, list):
            out = []
            try:
                # Convert each [front, back] pair into a flashcard dict.
                for front, back in list_of_lists:
                    out.append({"frontText": front, "backText": back})
                return handle_json_output(out)
            except Exception as e:
                print(e)
                # Return whatever well-formed pairs were collected before the failure.
                if out != []:
                    return handle_json_output(out)
                else:
                    return None
        else:
            print("The evaluated object is not a list.")
            return None
    except Exception as e:
        print(f"Error parsing the list of lists: {e}")
        return None


extractor = URLTextExtractor()


def parse_url(url):
    return extractor.extract_text_from_url(url)
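

# Minimal usage sketch (illustrative only; assumes network access for the example
# URL, and the sample string below is a placeholder for model output shaped as a
# list of [front, back] pairs, not output from the original project).
if __name__ == "__main__":
    sample_output = '[["What is the capital of France?", "Paris"], ["____ is the largest planet.", "Jupiter"]]'
    cards = sanitize_list_of_lists(sample_output)
    print(cards)

    page_text = parse_url("https://example.com")
    print(page_text[:200] if page_text else "No text extracted")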