professorIa / app.py
dnzita's picture
math
e655aa6
import os
import gradio as gr
from PIL import Image
import warnings
import torch
from typing import List, Tuple, Dict, Any
import time
import json
import uuid
from datetime import datetime
from threading import Lock, Thread
import tempfile
import shutil
from pathlib import Path
import yaml
from typing import Optional
from fastapi import FastAPI
from pydantic import BaseModel
# HTRflow (mesma lógica do diretório app)
from htrflow.pipeline.pipeline import Pipeline
from htrflow.pipeline.steps import init_step
from htrflow.volume.volume import Collection
# Suprimir avisos desnecessários
warnings.filterwarnings("ignore")
# Configurar cache para modelos
os.environ.setdefault("TRANSFORMERS_CACHE", "/tmp/transformers_cache")
os.environ.setdefault("HF_HOME", "/tmp/hf_home")
# Otimizações CPU
os.environ.setdefault("OMP_NUM_THREADS", "1")
os.environ.setdefault("MKL_NUM_THREADS", "1")
torch.set_num_threads(1)
# Sistema de Status Global
class StatusManager:
def __init__(self):
self.jobs = {}
self.lock = Lock()
self.system_status = {
"model_loaded": False,
"system_ready": False,
"total_processed": 0,
"startup_time": datetime.now().isoformat(),
"version": "2.0.0"
}
self.cancelled_jobs = set()
def create_job(self, image_name: str = "unknown") -> str:
job_id = str(uuid.uuid4())
with self.lock:
self.jobs[job_id] = {
"id": job_id,
"status": "pending",
"progress": 0,
"stage": "iniciando",
"image_name": image_name,
"created_at": datetime.now().isoformat(),
"updated_at": datetime.now().isoformat(),
"result": None,
"error": None,
"processing_time": None,
"stages_completed": []
}
return job_id
def update_job(self, job_id: str, **kwargs):
with self.lock:
if job_id in self.jobs:
self.jobs[job_id].update(kwargs)
self.jobs[job_id]["updated_at"] = datetime.now().isoformat()
def get_job(self, job_id: str) -> Dict[str, Any]:
with self.lock:
return self.jobs.get(job_id, {})
def get_all_jobs(self) -> Dict[str, Any]:
with self.lock:
return dict(self.jobs)
def complete_job(self, job_id: str, result: str):
with self.lock:
if job_id in self.jobs:
self.jobs[job_id].update({
"status": "completed",
"progress": 100,
"stage": "concluído",
"result": result,
"processing_time": time.time() - time.mktime(
datetime.fromisoformat(self.jobs[job_id]["created_at"]).timetuple()
)
})
self.system_status["total_processed"] += 1
def fail_job(self, job_id: str, error: str):
with self.lock:
if job_id in self.jobs:
self.jobs[job_id].update({
"status": "failed",
"stage": "erro",
"error": error
})
def cancel_job(self, job_id: str):
with self.lock:
if job_id in self.jobs and self.jobs[job_id]["status"] == "processing":
self.jobs[job_id].update({
"status": "cancelled",
"stage": "cancelado",
"error": "Job cancelado pelo usuário"
})
self.cancelled_jobs.add(job_id)
return True
return False
# Instanciar o gerenciador de status
status_manager = StatusManager()
# Lazy-load: marcamos não carregado até inicializar a pipeline HTRflow
status_manager.system_status["model_loaded"] = False
status_manager.system_status["system_ready"] = False
# YAML das pipelines por idioma (baseadas em app/assets/templates/)
LANGUAGE_PIPELINES = {
"English": {
"yaml": """
steps:
- step: Segmentation
settings:
model: yolo
model_settings:
model: Riksarkivet/yolov9-lines-within-regions-1
- step: TextRecognition
settings:
model: TrOCR
model_settings:
model: microsoft/trocr-base-handwritten
generation_settings:
batch_size: 16
- step: OrderLines
""",
"description": "Modelo geral para português e inglês manuscrito moderno"
},
"Swedish": {
"yaml": """
steps:
- step: Segmentation
settings:
model: yolo
model_settings:
model: Riksarkivet/yolov9-lines-within-regions-1
- step: TextRecognition
settings:
model: TrOCR
model_settings:
model: Riksarkivet/trocr-base-handwritten-hist-swe-2
generation_settings:
batch_size: 16
- step: OrderLines
""",
"description": "Modelo especializado para sueco histórico manuscrito"
},
"Norwegian": {
"yaml": """
steps:
- step: Segmentation
settings:
model: yolo
model_settings:
model: Riksarkivet/yolov9-lines-within-regions-1
- step: TextRecognition
settings:
model: TrOCR
model_settings:
model: Sprakbanken/TrOCR-norhand-v3
generation_settings:
batch_size: 16
- step: OrderLines
""",
"description": "Modelo especializado para norueguês histórico manuscrito"
},
"Medieval": {
"yaml": """
steps:
- step: Segmentation
settings:
model: yolo
model_settings:
model: Riksarkivet/yolov9-lines-within-regions-1
- step: TextRecognition
settings:
model: TrOCR
model_settings:
model: medieval-data/trocr-medieval-base
generation_settings:
batch_size: 16
- step: OrderLines
""",
"description": "Modelo para textos medievais e manuscritos históricos"
}
,"Math": {
"yaml": """
steps:
- step: Segmentation
settings:
model: yolo
model_settings:
model: Riksarkivet/yolov9-lines-within-regions-1
- step: TextRecognition
settings:
model: TrOCR
model_settings:
model: fhswf/TrOCR_Math_handwritten
generation_settings:
batch_size: 16
- step: OrderLines
""",
"description": "Modelo especializado para escrita matemática manuscrita"
}
}
class PipelineWithProgress(Pipeline):
@classmethod
def from_config(cls, config: Dict[str, Any]):
return cls([init_step(step["step"], step.get("settings", {})) for step in config["steps"]])
def run(self, collection, start=0, progress=None):
total_steps = len(self.steps[start:])
for i, step in enumerate(self.steps[start:]):
if progress is not None:
try:
progress((i + 1) / max(1, total_steps))
except Exception:
pass
collection = step.run(collection)
return collection
_PIPELINE_CACHE: Dict[str, Optional[PipelineWithProgress]] = {}
def _get_htrflow_pipeline(language: str = "English") -> PipelineWithProgress:
global _PIPELINE_CACHE
if language not in _PIPELINE_CACHE or _PIPELINE_CACHE[language] is None:
if language not in LANGUAGE_PIPELINES:
language = "English" # fallback
config = yaml.safe_load(LANGUAGE_PIPELINES[language]["yaml"])
_PIPELINE_CACHE[language] = PipelineWithProgress.from_config(config)
status_manager.system_status["model_loaded"] = True
status_manager.system_status["system_ready"] = True
print(f"✅ HTRflow pipeline inicializada para {language}")
return _PIPELINE_CACHE[language]
def preprocess_image(image: Image.Image) -> Image.Image:
"""Mantida para compatibilidade, não utilizada com HTRflow (pipeline cuida disso)."""
return image
def segment_text_lines(image: Image.Image) -> List[Image.Image]:
"""Mantida para compatibilidade, não utilizada com HTRflow (pipeline faz segmentação)."""
return [image]
def recognize_text_batch(line_images: List[Image.Image], language: str = "English") -> str:
"""Substituída pela execução da pipeline HTRflow. Mantida por compatibilidade."""
return htrflow_transcribe(line_images[0], language) if line_images else ""
def _read_txt_from_collection(collection: Collection) -> str:
"""Exporta a collection como TXT e retorna o conteúdo concatenado."""
temp_dir = tempfile.mkdtemp(prefix="htrflow_txt_")
try:
collection.save(directory=temp_dir, serializer="txt")
texts: list[str] = []
for root, _, files in os.walk(temp_dir):
for fname in files:
if fname.lower().endswith(".txt"):
with open(Path(root) / fname, "r", encoding="utf-8") as f:
texts.append(f.read().strip())
return "\n".join([t for t in texts if t])
finally:
shutil.rmtree(temp_dir, ignore_errors=True)
def htrflow_transcribe(image: Image.Image, language: str = "English") -> str:
"""Executa a mesma pipeline do diretório app (HTRflow) e retorna a transcrição em texto puro."""
pipe = _get_htrflow_pipeline(language)
# Salvar imagem temporariamente como arquivo (HTRflow espera caminhos)
temp_dir = tempfile.mkdtemp(prefix="htrflow_img_")
temp_image_path = Path(temp_dir) / "input.png"
image.save(temp_image_path)
try:
collection = Collection([str(temp_image_path)])
collection.label = "demo_output"
collection = pipe.run(collection)
return _read_txt_from_collection(collection).strip()
finally:
shutil.rmtree(temp_dir, ignore_errors=True)
def _sanitize_text(text: str) -> str:
# Normaliza espaços e remove quebras de linha indesejadas
text = " ".join(text.replace("\n", " ").split())
return text.strip()
def ocr_handwritten(image, language="English"):
if image is None:
return "Por favor, carregue uma imagem."
# Criar job de monitoramento
image_name = getattr(image, 'name', 'unknown') if hasattr(image, 'name') else 'unknown'
job_id = status_manager.create_job(image_name)
try:
# 1. (HTRflow cuida do pré-processamento internamente)
status_manager.update_job(job_id,
status="processing",
progress=10,
stage=f"inicializando pipeline ({language})")
# Checar cancelamento
if job_id in status_manager.cancelled_jobs:
status_manager.fail_job(job_id, "Job cancelado pelo usuário")
return "❌ Job cancelado."
# 2. Carregar/obter pipeline e segmentação
status_manager.update_job(job_id,
progress=30,
stage=f"segmentação (HTRflow-{language})")
_ = _get_htrflow_pipeline(language) # garante cache carregado
if job_id in status_manager.cancelled_jobs:
status_manager.fail_job(job_id, "Job cancelado pelo usuário")
return "❌ Job cancelado."
# 3. Reconhecimento de texto (pipeline completa)
status_manager.update_job(job_id,
progress=70,
stage=f"reconhecimento (HTRflow-{language})")
if job_id in status_manager.cancelled_jobs:
status_manager.fail_job(job_id, "Job cancelado pelo usuário")
return "❌ Job cancelado."
recognized_text = htrflow_transcribe(image, language)
# 4. Pós-processamento
status_manager.update_job(job_id,
progress=90,
stage="finalizando")
if job_id in status_manager.cancelled_jobs:
status_manager.fail_job(job_id, "Job cancelado pelo usuário")
return "❌ Job cancelado."
final_text = _sanitize_text(recognized_text)
if not final_text.strip():
status_manager.fail_job(job_id, "Texto não foi reconhecido")
return "❌ Não foi possível reconhecer texto na imagem. Tente com uma imagem mais nítida."
# Completar job
status_manager.complete_job(job_id, final_text)
return final_text
except Exception as e:
error_msg = f"Erro no processamento: {str(e)}"
status_manager.fail_job(job_id, error_msg)
print(f"❌ {error_msg}")
return f"❌ Erro ao processar imagem: {str(e)}"
# -----------------------------
# API-First (Jobs assíncronos)
# -----------------------------
def _process_job_worker(job_id: str, image_path: str, language: str):
"""Worker que processa o job em background."""
try:
# Abrir imagem
try:
img = Image.open(image_path)
except Exception as e:
status_manager.fail_job(job_id, f"Falha ao abrir imagem: {e}")
return
# 1. Inicialização
status_manager.update_job(job_id,
status="processing",
progress=10,
stage=f"inicializando pipeline ({language})")
if job_id in status_manager.cancelled_jobs:
status_manager.fail_job(job_id, "Job cancelado pelo usuário")
return
# 2. Pipeline / segmentação
status_manager.update_job(job_id,
progress=30,
stage=f"segmentação (HTRflow-{language})")
_ = _get_htrflow_pipeline(language)
if job_id in status_manager.cancelled_jobs:
status_manager.fail_job(job_id, "Job cancelado pelo usuário")
return
# 3. Reconhecimento
status_manager.update_job(job_id,
progress=70,
stage=f"reconhecimento (HTRflow-{language})")
if job_id in status_manager.cancelled_jobs:
status_manager.fail_job(job_id, "Job cancelado pelo usuário")
return
recognized_text = htrflow_transcribe(img, language)
# 4. Pós-processamento
status_manager.update_job(job_id,
progress=90,
stage="finalizando")
if job_id in status_manager.cancelled_jobs:
status_manager.fail_job(job_id, "Job cancelado pelo usuário")
return
final_text = _sanitize_text(recognized_text)
if not final_text.strip():
status_manager.fail_job(job_id, "Texto não foi reconhecido")
return
status_manager.complete_job(job_id, final_text)
except Exception as e:
status_manager.fail_job(job_id, f"Erro interno: {e}")
def start_job_api(image, language="English"):
"""Inicia um job assíncrono e retorna somente o job_id.
Espera-se que 'image' seja um objeto do Gradio (PIL) ou caminho temporário.
"""
if image is None:
return {"error": "Imagem não enviada"}
# Determinar nome e salvar temporariamente se for PIL
if isinstance(image, Image.Image):
temp_dir = tempfile.mkdtemp(prefix="jobimg_")
image_path = str(Path(temp_dir) / "input.png")
image.save(image_path)
cleanup_dir = temp_dir
elif isinstance(image, (str, Path)) and os.path.exists(str(image)):
image_path = str(image)
cleanup_dir = None
else:
return {"error": "Formato de imagem não suportado"}
job_id = status_manager.create_job(Path(image_path).name)
# Thread para processamento
t = Thread(target=_process_job_worker, args=(job_id, image_path, language), daemon=True)
t.start()
return {"job_id": job_id, "status": "started"}
def get_job_status_api(job_id: str):
job = status_manager.get_job(job_id)
if not job:
return {"error": "Job não encontrado"}
return job
def list_jobs_api():
return status_manager.get_all_jobs()
def cancel_job_rest(job_id: str):
ok = status_manager.cancel_job(job_id)
if ok:
return {"success": True, "message": "Job cancelado"}
return {"success": False, "message": "Não foi possível cancelar (verifique status)"}
def start_job_from_path(path: str = None, language: str = "English"):
"""Função interna usada pelas rotas REST."""
if not path:
return {"error": "'path' não fornecido"}
if not os.path.exists(path):
return {"error": f"Arquivo não encontrado no servidor: {path}"}
job_id = status_manager.create_job(Path(path).name)
t = Thread(target=_process_job_worker, args=(job_id, path, language), daemon=True)
t.start()
return {"job_id": job_id, "status": "started", "language": language}
# --------- FastAPI Wrappers (evita dependência de add_server_route do Gradio) ---------
class StartFromPathBody(BaseModel):
path: str
language: str = "English"
class CancelBody(BaseModel):
reason: Optional[str] = None # reservado para futuro
# FastAPI app principal
api_app = FastAPI(title="Professor IA OCR API", version="2.0")
@api_app.get("/jobs")
def api_list_jobs():
return list_jobs_api()
@api_app.get("/jobs/{job_id}")
def api_job_status(job_id: str):
return get_job_status_api(job_id)
@api_app.post("/jobs/{job_id}/cancel")
def api_cancel_job(job_id: str, body: CancelBody | None = None): # body não usado ainda
return cancel_job_rest(job_id)
@api_app.post("/jobs/start_from_path")
def api_start_from_path(body: StartFromPathBody):
return start_job_from_path(body.path, body.language)
@api_app.get("/system/status")
def api_system_status():
return get_system_status()
# Endpoints para Monitoramento
def get_system_status():
"""Retorna status do sistema"""
return status_manager.system_status
def get_job_status(job_id: str):
"""Retorna status de um job específico"""
job = status_manager.get_job(job_id)
if not job:
return {"error": "Job não encontrado"}
return job
def get_all_jobs_status():
"""Retorna todos os jobs"""
return status_manager.get_all_jobs()
# Endpoint para cancelar job
def cancel_job_api(job_id: str):
ok = status_manager.cancel_job(job_id)
if ok:
return {"success": True, "message": "Job cancelado"}
else:
return {"success": False, "message": "Não foi possível cancelar (job não está em processamento)"}
# Interface Gradio
with gr.Blocks(
title="Professor IA - OCR para Escrita Manual",
theme=gr.themes.Soft(),
css="""
.gradio-container {
max-width: 1200px;
margin: auto;
}
"""
) as iface:
gr.HTML("""
<div style="text-align: center; margin-bottom: 20px;">
<h1>🎓 Professor IA - OCR para Escrita Manual</h1>
<p>Faça upload de uma imagem com texto manuscrito e obtenha a transcrição usando IA</p>
</div>
""")
with gr.Row(equal_height=True):
with gr.Column(scale=1):
image_input = gr.Image(
type="pil",
label="📤 Carregue sua imagem aqui",
height=400
)
submit_btn = gr.Button(
"🔍 Transcrever Texto",
variant="primary",
size="lg"
)
# Seleção de Idioma/Pipeline
with gr.Group():
gr.HTML("<h3>🌍 Idioma/Modelo</h3>")
language_dropdown = gr.Dropdown(
choices=list(LANGUAGE_PIPELINES.keys()),
value="English",
label="Selecione o idioma/modelo",
info="Escolha o modelo mais adequado para seu texto"
)
language_info = gr.HTML(
value=LANGUAGE_PIPELINES["English"]["description"],
label="Descrição do modelo"
)
# Status do Sistema
with gr.Group():
gr.HTML("<h3>📊 Status do Sistema</h3>")
system_status_display = gr.JSON(
label="Status",
value=status_manager.system_status
)
refresh_status_btn = gr.Button(
"🔄 Atualizar Status",
variant="secondary",
size="sm"
)
gr.HTML("""
<div style="margin-top: 15px; padding: 10px; background: #f0f0f0; border-radius: 5px;">
<small>
<b>💡 Dicas para melhor resultado:</b><br>
• Use imagens claras e bem iluminadas<br>
• Selecione o idioma/modelo correto acima<br>
• Evite letras muito pequenas ou borradas<br>
• Funciona melhor com texto organizado em linhas<br>
• Para documentos longos, pode demorar mais para processar<br>
• <b>Modelos disponíveis:</b> Português/Inglês, Sueco, Norueguês, Medieval, Math
</small>
</div>
""")
with gr.Column(scale=1):
text_output = gr.Textbox(
label="📝 Texto Transcrito",
lines=10,
placeholder="O texto transcrito aparecerá aqui...",
max_lines=15,
show_copy_button=True
)
# Monitor de Jobs
with gr.Group():
gr.HTML("<h3>📈 Monitor de Processamento</h3>")
jobs_display = gr.JSON(
label="Jobs Ativos",
value={}
)
refresh_jobs_btn = gr.Button(
"🔄 Atualizar Jobs",
variant="secondary",
size="sm"
)
cancel_job_id = gr.Textbox(
label="ID do Job para Cancelar",
placeholder="Cole o ID do job aqui"
)
cancel_job_btn = gr.Button(
"❌ Cancelar Job",
variant="stop",
size="sm"
)
# Event Handlers
submit_btn.click(
fn=ocr_handwritten,
inputs=[image_input, language_dropdown],
outputs=text_output,
show_progress=True
)
# Novo: endpoint Gradio para iniciar job assíncrono (retorna somente job_id)
start_job_btn = gr.Button("🧵 Iniciar Job Assíncrono (retorna job_id)", variant="secondary")
job_id_box = gr.Textbox(label="Job ID Gerado", interactive=False)
start_job_btn.click(
fn=start_job_api,
inputs=[image_input, language_dropdown],
outputs=job_id_box
)
# Atualizar descrição do modelo quando idioma muda
def update_language_info(language):
return LANGUAGE_PIPELINES[language]["description"]
language_dropdown.change(
fn=update_language_info,
inputs=language_dropdown,
outputs=language_info
)
# Atualizar status do sistema
refresh_status_btn.click(
fn=get_system_status,
outputs=system_status_display
)
# Atualizar jobs
refresh_jobs_btn.click(
fn=get_all_jobs_status,
outputs=jobs_display
)
# Cancelar job
cancel_job_btn.click(
fn=cancel_job_api,
inputs=cancel_job_id,
outputs=jobs_display
)
# Também permitir transcrição automática ao carregar imagem
image_input.change(
fn=ocr_handwritten,
inputs=[image_input, language_dropdown],
outputs=text_output,
show_progress=True
)
# Configuração para Hugging Face Spaces / execução local
# Montamos a interface Gradio dentro do FastAPI para garantir rotas REST sempre disponíveis.
iface.queue(default_concurrency_limit=20)
# Gradio fornece helper para montar dentro de FastAPI
try:
# A partir do Gradio 4.x: gr.mount_gradio_app
from gradio import mount_gradio_app
app = mount_gradio_app(api_app, iface, path="/")
print("✅ Gradio montado dentro do FastAPI (mount_gradio_app)")
except Exception:
# Fallback manual: anexar atributo .app (algumas versões expõem .app em launch)
app = api_app
print("⚠️ mount_gradio_app indisponível - será necessário iniciar via iface.launch() em modo standalone.")
if __name__ == "__main__":
# Se mount funcionou, apenas rodar via uvicorn implícito do launch apontando para app.
try:
import uvicorn # garantido em HF spaces base; se não, fallback
# Executa uvicorn servindo FastAPI com Gradio montado.
uvicorn.run(app, host="0.0.0.0", port=7860)
except Exception:
# Fallback: lançar somente interface (perderá rotas REST se não montadas).
iface.launch(
server_name="0.0.0.0",
server_port=7860,
share=False,
show_error=True,
quiet=False
)