Spaces:
Sleeping
Sleeping
| import os | |
| import json | |
| import shutil | |
| import tempfile | |
| from datetime import datetime | |
| from flask import Flask, request, jsonify | |
| from huggingface_hub import login, HfApi | |
| from werkzeug.utils import secure_filename | |
# Flask application object.
app = Flask(__name__)

# Configuration is read from the environment; each value is None when unset.
HF_TOKEN = os.environ.get("HF_TOKEN")
REPO_ID = os.environ.get("HF_REPO_ID")

# Authenticate against the Hugging Face Hub, then create the API client
# used by the request handlers.
login(token=HF_TOKEN)
api = HfApi()
@app.route("/upload", methods=["POST"])
def upload_image():
    """Store an uploaded image and its metadata in the Hub dataset repo.

    Expects a multipart/form-data POST carrying:
      * ``image`` - the uploaded file,
      * ``guid``  - identifier used as the stored file name (sanitized
        with ``secure_filename``),
      * ``ip``    - client-supplied IP string, recorded verbatim.

    The file is saved as ``images/<guid>.jpg`` (the ``.jpg`` suffix is applied
    regardless of the uploaded file's own name) and one JSON line describing
    it is appended to ``metadata.jsonl`` in the dataset repo ``REPO_ID``.

    Returns:
        A ``(json_response, status_code)`` tuple: 200 on success, 400 when
        the request is incomplete or invalid, 500 on any other failure.

    NOTE(review): the download / append / upload sequence is not atomic, so
    two concurrent requests can each upload a metadata.jsonl that is missing
    the other's entry.
    """
    # Imported here so this block does not depend on the file-level imports.
    from huggingface_hub.utils import EntryNotFoundError

    try:
        # Check if required data is present.
        if (
            "image" not in request.files
            or "guid" not in request.form
            or "ip" not in request.form
        ):
            return jsonify({"error": "Missing image, guid, or ip"}), 400

        image = request.files["image"]
        guid = secure_filename(request.form["guid"])  # Sanitize GUID
        ip = request.form["ip"]

        # secure_filename() returns "" when nothing safe is left; without
        # this guard the image would be stored as "images/.jpg".
        if not guid:
            return jsonify({"error": "Invalid guid"}), 400

        # Validate image.
        if not image or not image.filename:
            return jsonify({"error": "No valid image provided"}), 400

        # Everything to upload is staged in a throwaway directory whose
        # layout mirrors the layout wanted in the repo.
        with tempfile.TemporaryDirectory() as temp_dir:
            temp_image_dir = os.path.join(temp_dir, "images")
            os.makedirs(temp_image_dir, exist_ok=True)

            # Save image with guid.jpg extension.
            temp_image_path = os.path.join(temp_image_dir, f"{guid}.jpg")
            image.save(temp_image_path)

            path_in_repo = f"images/{guid}.jpg"
            metadata = {
                "file_name": path_in_repo,
                "guid": guid,
                "ip": ip,
                # Naive UTC timestamp; kept in this form so new entries match
                # the ones already written to metadata.jsonl.
                "upload_timestamp": datetime.utcnow().isoformat(),
            }

            metadata_file = os.path.join(temp_dir, "metadata.jsonl")

            # Fetch the current metadata.jsonl so the new entry is appended
            # to it. Only "file not in repo yet" means "start empty": any
            # other failure must abort, otherwise the upload below would
            # replace the repo's metadata with just this one entry.
            try:
                api.hf_hub_download(
                    repo_id=REPO_ID,
                    filename="metadata.jsonl",
                    repo_type="dataset",
                    local_dir=temp_dir,
                )
            except EntryNotFoundError:
                existing_metadata = []
            else:
                with open(metadata_file, "r", encoding="utf-8") as f:
                    existing_metadata = f.readlines()

            # Make sure the last existing record is newline-terminated so the
            # new record starts on its own line.
            if existing_metadata and not existing_metadata[-1].endswith("\n"):
                existing_metadata[-1] += "\n"

            with open(metadata_file, "w", encoding="utf-8") as f:
                f.writelines(existing_metadata)
                f.write(json.dumps(metadata) + "\n")

            # Upload to Hugging Face: the staged image and metadata.jsonl
            # are pushed to the root of the dataset repo.
            api.upload_folder(
                folder_path=temp_dir,
                path_in_repo="",
                repo_id=REPO_ID,
                repo_type="dataset",
            )

        return jsonify({"message": "Image and metadata uploaded successfully"}), 200
    except Exception as e:
        # Top-level boundary: report any failure to the client as a 500.
        return jsonify({"error": str(e)}), 500
if __name__ == '__main__':
    # Show an example request before the server starts.
    usage_example = '''curl -X POST https://<your-space-name>.hf.space/upload \\
-F "image=@/path/to/image.jpg" \\
-F "guid=test-guid" \\
-F "ip=127.0.0.1"'''
    print(usage_example)

    # Listen on all interfaces; the port comes from PORT, defaulting to 7860.
    port = int(os.getenv("PORT", 7860))
    app.run(host="0.0.0.0", port=port)