samaan / utils /file_utils.py
Copilot Sync Bot
Sync backend from main branch
26c9046
from __future__ import annotations
import os
import uuid
from pathlib import Path
from typing import Tuple
from fastapi import UploadFile, HTTPException, status
UPLOAD_ROOT = Path(os.getenv("SAMAAN_UPLOADS_DIR", os.getenv("SAMAAN_UPLOAD_DIR", "/data/uploads")))
def ensure_upload_root() -> Path:
for candidate in [UPLOAD_ROOT, Path("/tmp/samaan-uploads"), Path(__file__).resolve().parents[1] / "data" / "uploads"]:
try:
candidate.mkdir(parents=True, exist_ok=True)
return candidate
except PermissionError:
continue
raise PermissionError("Unable to create an upload directory")
def save_uploaded_file(file: UploadFile) -> Tuple[str, str, str]:
root = ensure_upload_root()
suffix = Path(file.filename or "").suffix.lower()
if not suffix:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Uploaded file must have an extension")
filename = f"{uuid.uuid4().hex}{suffix}"
dest = root / filename
contents = file.file.read()
if not contents:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Uploaded file is empty")
dest.write_bytes(contents)
content_type = file.content_type or "application/octet-stream"
file_type = "video" if content_type.startswith("video") else "photo"
return str(dest), file_type, file.filename or filename
def read_file_bytes(path: str) -> bytes:
return Path(path).read_bytes()