| | from transformers import AutoModel, AutoTokenizer |
| | from typing import Dict, List, Any |
| | import torch |
| | import base64 |
| | from io import BytesIO |
| | from PIL import Image |
| | import os |
| | import tempfile |
| |
|
| | class EndpointHandler: |
| | def __init__(self, model_dir = 'deepseek-ai/DeepSeek-OCR'): |
| | model_path = model_dir |
| | |
| | self.tokenizer = AutoTokenizer.from_pretrained( |
| | model_path, |
| | trust_remote_code=True, |
| | local_files_only=bool(model_dir) |
| | ) |
| | |
| | |
| | self.device = 'cuda' if torch.cuda.is_available() else 'cpu' |
| | print(f"Using device: {self.device}") |
| | |
| | |
| | model_kwargs = { |
| | 'trust_remote_code': True, |
| | 'torch_dtype': torch.float32 |
| | } |
| | |
| | |
| | model_kwargs['_attn_implementation'] = 'eager' |
| | |
| | self.model = AutoModel.from_pretrained(model_path, **model_kwargs) |
| | self.model = self.model.eval() |
| | |
| | |
| | if self.device == 'cuda': |
| | self.model = self.model.cuda() |
| | |
| | def __call__(self, data: Dict[str, Any]) -> str: |
| | try: |
| | base64_string = None |
| | if "inputs" in data and isinstance(data["inputs"], str): |
| | base64_string = data["inputs"] |
| | |
| | |
| | elif "inputs" in data and isinstance(data["inputs"], dict): |
| | base64_string = data["inputs"].get("base64") |
| | |
| | |
| | elif "base64" in data: |
| | base64_string = data["base64"] |
| | |
| | |
| | elif isinstance(data, str): |
| | base64_string = data |
| | |
| | if not base64_string: |
| | return {"error": "No base64 string found in input data. Available keys: " + str(data.keys())} |
| | |
| | print("Found base64 string, length:", len(base64_string)) |
| | |
| | |
| | if ',' in base64_string: |
| | base64_string = base64_string.split(',')[1] |
| | |
| | |
| | image_data = base64.b64decode(base64_string) |
| | |
| | |
| | prompt = "<image>\n<|grounding|>Convert this document to markdown format using # headers, **bold** for important information, and Markdown table syntax (using | and -) instead of HTML." |
| | |
| | with tempfile.TemporaryDirectory() as temp_dir: |
| | |
| | image_path = os.path.join(temp_dir, "input_image.png") |
| | with open(image_path, "wb") as f: |
| | f.write(image_data) |
| | |
| | print(f"Image saved to: {image_path}") |
| | |
| | |
| | try: |
| | test_image = Image.open(image_path) |
| | if test_image.mode != 'RGB': |
| | test_image = test_image.convert('RGB') |
| | test_image.save(image_path) |
| | print(f"Image verified: {test_image.size}, mode: {test_image.mode}") |
| | except Exception as img_error: |
| | return {"error": f"Invalid image: {str(img_error)}"} |
| |
|
| | output_dir = os.path.join(temp_dir, "deepseek_out") |
| | os.makedirs(output_dir, exist_ok=True) |
| | |
| | |
| | result = self.model.infer( |
| | self.tokenizer, |
| | prompt=prompt, |
| | image_file=image_path, |
| | output_path=output_dir, |
| | base_size=1024, |
| | image_size=640, |
| | crop_mode=True, |
| | save_results=True, |
| | |
| | ) |
| |
|
| | for fname in os.listdir(output_dir): |
| | print("File:\n", fname) |
| | if fname.endswith(".md") or fname.endswith(".mmd"): |
| | md_path = os.path.join(output_dir, fname) |
| | with open(md_path, 'r', encoding='utf-8') as f: |
| | markdown = f.read() |
| | print("Markdown output:\n", markdown) |
| | return markdown |
| |
|
| | |
| | |
| | |
| | except Exception as e: |
| | print(f"Error processing image: {e}") |
| | return str(e) |