import os
import re
import time
from typing import List, Optional, Dict, Any
from pathlib import Path

from langchain_community.document_loaders import PyPDFLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_core.documents import Document

from config import Config, AppConstants
from models import DocumentMetadata, ProcessingStats


class DocumentProcessor:
    """Handles document loading, processing, and chunking."""

    def __init__(self, base_path: Optional[str] = None):
        """Initialize the document processor.

        Args:
            base_path: Base path for document directories
        """
        self.base_path = base_path or Config.DATA_BASE_PATH
        self.text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=Config.CHUNK_SIZE,
            chunk_overlap=Config.CHUNK_OVERLAP,
            length_function=len,
            separators=["\n\n", "\n", " ", ""]
        )

    def process_all_documents(self) -> List[Document]:
        """Process both markdown and PDF documents from the courses and programs directories.

        Returns:
            List of processed documents with proper metadata
        """
        start_time = time.time()

        documents = {
            'courses': [],
            'programs': []
        }

        # Resolve the expected directory layout and create any missing directories.
        paths = self._get_document_paths()
        self._ensure_directories_exist(paths)

        # Load markdown and PDF files for each category.
        for category in ['courses', 'programs']:
            md_path = paths[f'{category}_md']
            if os.path.exists(md_path):
                documents[category].extend(self._process_markdown_files(md_path, category))

            pdf_path = paths[f'{category}_pdf']
            if os.path.exists(pdf_path):
                documents[category].extend(self._process_pdf_files(pdf_path, category))

            print(f"Processed {len(documents[category])} {category} documents")

        all_documents = documents['courses'] + documents['programs']

        # Record processing statistics; chunking happens later, so chunks_created is 0 here.
        processing_time = time.time() - start_time
        stats = ProcessingStats(
            total_documents=len(all_documents),
            courses_processed=len(documents['courses']),
            programs_processed=len(documents['programs']),
            chunks_created=0,
            processing_time=processing_time
        )

        print(f"Total documents processed: {len(all_documents)}")
        print(f"Courses: {len(documents['courses'])}, Programs: {len(documents['programs'])}")
        print(f"Processing time: {processing_time:.2f} seconds")

        return all_documents

    def chunk_documents(self, documents: List[Document]) -> List[Document]:
        """Split documents into chunks for embedding.

        Args:
            documents: List of documents to chunk

        Returns:
            List of document chunks
        """
        print(f"Splitting {len(documents)} documents into chunks...")
        chunks = self.text_splitter.split_documents(documents)
        print(f"Created {len(chunks)} document chunks")
        return chunks

    def _get_document_paths(self) -> Dict[str, str]:
        """Get paths for the different document types.

        Returns:
            Dictionary with document paths
        """
        return {
            'courses_md': os.path.join(self.base_path, Config.COURSES_MD_PATH),
            'courses_pdf': os.path.join(self.base_path, Config.COURSES_PDF_PATH),
            'programs_md': os.path.join(self.base_path, Config.PROGRAMS_MD_PATH),
            'programs_pdf': os.path.join(self.base_path, Config.PROGRAMS_PDF_PATH)
        }

    def _ensure_directories_exist(self, paths: Dict[str, str]) -> None:
        """Ensure all document directories exist.

        Args:
            paths: Dictionary of paths to create
        """
        for path in paths.values():
            if not os.path.exists(path):
                os.makedirs(path, exist_ok=True)
                print(f"Created directory: {path}")

    def _process_markdown_files(self, path: str, category: str) -> List[Document]:
        """Process markdown files in a directory.

        Args:
            path: Path to the markdown files directory
            category: Type of documents ('courses' or 'programs')

        Returns:
            List of processed markdown documents with metadata
        """
        documents = []

        if not os.path.exists(path):
            print(f"Warning: Markdown directory {path} does not exist")
            return documents

        for filename in os.listdir(path):
            if filename.endswith('.md'):
                file_path = os.path.join(path, filename)
                try:
                    content = self._read_file_with_fallback_encoding(file_path)

                    # Base metadata attached to every markdown document.
                    metadata = {
                        'source': file_path,
                        'type': 'markdown',
                        'category': category,
                        'doc_type': category.rstrip('s'),
                        'filename': filename
                    }

                    # Course documents also get a course code when one can be found.
                    if category == 'courses':
                        code = self._extract_course_code(filename, content)
                        if code:
                            metadata['course_code'] = code

                    doc = Document(
                        page_content=content,
                        metadata=metadata
                    )
                    documents.append(doc)

                except Exception as e:
                    print(f"Error processing markdown file {filename}: {str(e)}")

        return documents

    def _process_pdf_files(self, path: str, category: str) -> List[Document]:
        """Process PDF files in a directory.

        Args:
            path: Path to the PDF files directory
            category: Type of documents ('courses' or 'programs')

        Returns:
            List of processed PDF documents with metadata
        """
        documents = []

        if not os.path.exists(path):
            print(f"Warning: PDF directory {path} does not exist")
            return documents

        for filename in os.listdir(path):
            if filename.endswith('.pdf'):
                file_path = os.path.join(path, filename)
                try:
                    loader = PyPDFLoader(file_path)
                    pdf_docs = loader.load()

                    # Shared metadata applied to every page of this PDF.
                    metadata = {
                        'type': 'pdf',
                        'category': category,
                        'doc_type': category.rstrip('s'),
                        'filename': filename
                    }

                    # Course PDFs also get a course code when one can be found.
                    if category == 'courses' and pdf_docs:
                        code = self._extract_course_code(filename, pdf_docs[0].page_content)
                        if code:
                            metadata['course_code'] = code

                    for doc in pdf_docs:
                        doc.metadata.update(metadata)

                    documents.extend(pdf_docs)

                except Exception as e:
                    print(f"Error processing PDF {filename}: {str(e)}")

        return documents

    def _read_file_with_fallback_encoding(self, file_path: str) -> str:
        """Read a file, trying each supported encoding in turn.

        Args:
            file_path: Path to the file to read

        Returns:
            File content as string

        Raises:
            UnicodeError: If the file cannot be read with any supported encoding
        """
        for encoding in AppConstants.SUPPORTED_FILE_ENCODINGS:
            try:
                with open(file_path, 'r', encoding=encoding) as f:
                    return f.read()
            except UnicodeDecodeError:
                continue

        # UnicodeDecodeError requires five positional arguments, so raise the more
        # general UnicodeError with a readable message instead.
        raise UnicodeError(f"Failed to read {file_path} with any supported encoding")

    def _extract_course_code(self, filename: str, content: str) -> Optional[str]:
        """Extract a course code from the filename or content if possible.

        Args:
            filename: Name of the file
            content: Content of the document

        Returns:
            Course code if found, None otherwise
        """
        # Check the filename first.
        code_match = re.search(r'([A-Z]{3}\d{3})', filename)
        if code_match:
            return code_match.group(1)

        # Fall back to the beginning of the document content.
        code_match = re.search(r'([A-Z]{3}\d{3})', content[:1000])
        if code_match:
            return code_match.group(1)

        return None

    def get_document_stats(self, documents: List[Document]) -> Dict[str, Any]:
        """Get statistics about processed documents.

        Args:
            documents: List of processed documents

        Returns:
            Dictionary with document statistics
        """
        stats = {
            'total_documents': len(documents),
            'by_category': {},
            'by_type': {},
            'by_doc_type': {},
            'course_codes': set(),
            'total_content_length': 0
        }

        for doc in documents:
            metadata = doc.metadata

            category = metadata.get('category', 'unknown')
            stats['by_category'][category] = stats['by_category'].get(category, 0) + 1

            file_type = metadata.get('type', 'unknown')
            stats['by_type'][file_type] = stats['by_type'].get(file_type, 0) + 1

            doc_type = metadata.get('doc_type', 'unknown')
            stats['by_doc_type'][doc_type] = stats['by_doc_type'].get(doc_type, 0) + 1

            if metadata.get('course_code'):
                stats['course_codes'].add(metadata['course_code'])

            stats['total_content_length'] += len(doc.page_content)

        stats['course_codes'] = list(stats['course_codes'])
        stats['unique_course_codes'] = len(stats['course_codes'])

        return stats

    def validate_documents(self, documents: List[Document]) -> Dict[str, Any]:
        """Validate processed documents for common issues.

        Args:
            documents: List of documents to validate

        Returns:
            Dictionary with validation results
        """
        validation_results = {
            'total_documents': len(documents),
            'issues': [],
            'warnings': [],
            'valid_documents': 0,
            'empty_documents': 0,
            'missing_metadata': 0
        }

        for i, doc in enumerate(documents):
            if not doc.page_content or len(doc.page_content.strip()) == 0:
                validation_results['empty_documents'] += 1
                validation_results['issues'].append(f"Document {i}: Empty content")
                continue

            required_metadata = ['source', 'type', 'category', 'doc_type', 'filename']
            missing_fields = [field for field in required_metadata if not doc.metadata.get(field)]

            if missing_fields:
                validation_results['missing_metadata'] += 1
                validation_results['warnings'].append(
                    f"Document {i}: Missing metadata fields: {missing_fields}"
                )

            if len(doc.page_content) < 50:
                validation_results['warnings'].append(
                    f"Document {i}: Very short content ({len(doc.page_content)} chars)"
                )

            validation_results['valid_documents'] += 1

        return validation_results
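

# Minimal usage sketch (not part of the original module): it assumes the Config
# values imported above point at an existing data layout and that the langchain
# packages are installed. Run the module directly to exercise the full
# load -> validate -> chunk pipeline.
if __name__ == "__main__":
    processor = DocumentProcessor()

    # Load markdown and PDF documents from the configured directories.
    docs = processor.process_all_documents()

    # Report basic statistics and any validation warnings before chunking.
    print(processor.get_document_stats(docs))
    validation = processor.validate_documents(docs)
    for warning in validation['warnings']:
        print(f"WARNING: {warning}")

    # Split into embedding-sized chunks.
    chunks = processor.chunk_documents(docs)
    print(f"Ready to embed {len(chunks)} chunks")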