Spaces:
Sleeping
Sleeping
fix: handle serverless 404 by falling back to DialoGPT/distilgpt2; default serverless model to DialoGPT
7396832
| #!/usr/bin/env python3 | |
| """ | |
| Textilindo AI Assistant - Hugging Face Spaces FastAPI Application | |
| Simplified version for HF Spaces deployment | |
| """ | |
| import os | |
| import json | |
| import logging | |
| import time | |
| import subprocess | |
| import threading | |
| from pathlib import Path | |
| from datetime import datetime | |
| from typing import Optional, Dict, Any, List | |
| from fastapi import FastAPI, HTTPException, Request, BackgroundTasks | |
| from fastapi.responses import JSONResponse | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from pydantic import BaseModel | |
| import uvicorn | |
| import requests | |
| import re | |
| from difflib import SequenceMatcher | |
| # Setup logging | |
| logging.basicConfig(level=logging.INFO) | |
| logger = logging.getLogger(__name__) | |
| # Initialize FastAPI app | |
| app = FastAPI( | |
| title="Textilindo AI Assistant", | |
| description="AI Assistant for Textilindo textile company", | |
| version="1.0.0" | |
| ) | |
| # Add CORS middleware | |
| app.add_middleware( | |
| CORSMiddleware, | |
| allow_origins=["*"], | |
| allow_credentials=True, | |
| allow_methods=["*"], | |
| allow_headers=["*"], | |
| ) | |
| # Request/Response models | |
| class ChatRequest(BaseModel): | |
| message: str | |
| conversation_id: Optional[str] = None | |
| class ChatResponse(BaseModel): | |
| response: str | |
| conversation_id: str | |
| status: str = "success" | |
| class HealthResponse(BaseModel): | |
| status: str | |
| message: str | |
| version: str = "1.0.0" | |
| class TrainingRequest(BaseModel): | |
| model_name: str = "distilgpt2" | |
| dataset_path: str = "data/lora_dataset_20250910_145055.jsonl" | |
| config_path: str = "configs/training_config.yaml" | |
| max_samples: int = 20 | |
| epochs: int = 1 | |
| batch_size: int = 1 | |
| learning_rate: float = 5e-5 | |
| class TrainingResponse(BaseModel): | |
| success: bool | |
| message: str | |
| training_id: str | |
| status: str | |
| # Training status storage | |
| training_status = { | |
| "is_training": False, | |
| "progress": 0, | |
| "status": "idle", | |
| "current_step": 0, | |
| "total_steps": 0, | |
| "loss": 0.0, | |
| "start_time": None, | |
| "end_time": None, | |
| "error": None | |
| } | |
| class TrainingDataLoader: | |
| """Load and manage training data for intelligent responses""" | |
| def __init__(self, data_path: str = "data/textilindo_training_data.jsonl"): | |
| self.data_path = data_path | |
| self.training_data = [] | |
| self.load_data() | |
| def load_data(self): | |
| """Load training data from JSONL file""" | |
| try: | |
| if os.path.exists(self.data_path): | |
| with open(self.data_path, 'r', encoding='utf-8') as f: | |
| for line in f: | |
| line = line.strip() | |
| if line: | |
| try: | |
| data = json.loads(line) | |
| self.training_data.append(data) | |
| except json.JSONDecodeError: | |
| continue | |
| logger.info(f"Loaded {len(self.training_data)} training samples") | |
| else: | |
| logger.warning(f"Training data file not found: {self.data_path}") | |
| except Exception as e: | |
| logger.error(f"Error loading training data: {e}") | |
| def find_best_match(self, user_input: str, threshold: float = 0.85) -> Optional[Dict]: | |
| """Find the best matching training sample for user input""" | |
| if not self.training_data: | |
| return None | |
| user_input_lower = user_input.lower().strip() | |
| best_match = None | |
| best_score = 0 | |
| # Remove common words that shouldn't affect matching | |
| common_words = {'and', 'the', 'a', 'an', 'is', 'are', 'was', 'were', 'be', 'been', 'have', 'has', 'had', 'do', 'does', 'did', 'will', 'would', 'could', 'should', 'may', 'might', 'can', 'dan', 'yang', 'adalah', 'itu', 'ini', 'dengan', 'untuk', 'dari', 'ke', 'di', 'pada', 'oleh', 'dalam', 'dengan'} | |
| # Clean user input by removing common words | |
| user_words = [word for word in user_input_lower.split() if word not in common_words] | |
| user_input_clean = ' '.join(user_words) | |
| for data in self.training_data: | |
| instruction = data.get('instruction', '').lower().strip() | |
| if not instruction: | |
| continue | |
| # Clean instruction by removing common words | |
| instruction_words = [word for word in instruction.split() if word not in common_words] | |
| instruction_clean = ' '.join(instruction_words) | |
| # Calculate similarity score on cleaned text | |
| score = SequenceMatcher(None, user_input_clean, instruction_clean).ratio() | |
| # Also check for keyword matches on cleaned words | |
| user_word_set = set(user_words) | |
| instruction_word_set = set(instruction_words) | |
| keyword_score = len(user_word_set.intersection(instruction_word_set)) / max(len(user_word_set), 1) if user_word_set else 0 | |
| # Combine scores | |
| combined_score = (score * 0.8) + (keyword_score * 0.2) | |
| if combined_score > best_score and combined_score >= threshold: | |
| best_score = combined_score | |
| best_match = data | |
| if best_match: | |
| # Add similarity score to the match | |
| best_match['similarity'] = best_score | |
| logger.info(f"Found match with score {best_score:.2f}: {best_match.get('instruction', '')[:50]}...") | |
| return best_match | |
| class TrainingManager: | |
| """Manage AI model training using the training scripts""" | |
| def __init__(self): | |
| self.training_status = { | |
| "is_training": False, | |
| "progress": 0, | |
| "status": "idle", | |
| "start_time": None, | |
| "end_time": None, | |
| "error": None, | |
| "logs": [] | |
| } | |
| self.training_thread = None | |
| def start_training(self, model_name: str = "meta-llama/Llama-3.1-8B-Instruct", epochs: int = 3, batch_size: int = 4): | |
| """Start training in background thread""" | |
| if self.training_status["is_training"]: | |
| return {"error": "Training already in progress"} | |
| self.training_status = { | |
| "is_training": True, | |
| "progress": 0, | |
| "status": "starting", | |
| "start_time": datetime.now().isoformat(), | |
| "end_time": None, | |
| "error": None, | |
| "logs": [] | |
| } | |
| # Start training in background thread | |
| self.training_thread = threading.Thread( | |
| target=self._run_training, | |
| args=(model_name, epochs, batch_size), | |
| daemon=True | |
| ) | |
| self.training_thread.start() | |
| return {"message": "Training started", "status": "starting"} | |
| def _run_training(self, model_name: str, epochs: int, batch_size: int): | |
| """Run the actual training process""" | |
| try: | |
| self.training_status["status"] = "preparing" | |
| self.training_status["logs"].append("Preparing training environment...") | |
| # Check if training data exists | |
| data_path = "data/textilindo_training_data.jsonl" | |
| if not os.path.exists(data_path): | |
| raise Exception("Training data not found") | |
| self.training_status["status"] = "training" | |
| self.training_status["logs"].append("Starting model training...") | |
| # Create a simple training script for HF Spaces | |
| training_script = f""" | |
| import os | |
| import sys | |
| import json | |
| import logging | |
| from pathlib import Path | |
| from datetime import datetime | |
| # Add current directory to path | |
| sys.path.append('.') | |
| # Setup logging | |
| logging.basicConfig(level=logging.INFO) | |
| logger = logging.getLogger(__name__) | |
| def simple_training(): | |
| \"\"\"Simple training simulation for HF Spaces with Llama support\"\"\" | |
| logger.info("Starting training process...") | |
| logger.info(f"Model: {model_name}") | |
| logger.info(f"Epochs: {epochs}") | |
| logger.info(f"Batch Size: {batch_size}") | |
| # Load training data | |
| data_path = "data/textilindo_training_data.jsonl" | |
| with open(data_path, 'r', encoding='utf-8') as f: | |
| data = [json.loads(line) for line in f if line.strip()] | |
| logger.info(f"Loaded {{len(data)}} training samples") | |
| # Model-specific training simulation | |
| if "llama" in model_name.lower(): | |
| logger.info("Using Llama model - High quality training simulation") | |
| training_steps = len(data) * {epochs} * 2 # More steps for Llama | |
| else: | |
| logger.info("Using standard model - Basic training simulation") | |
| training_steps = len(data) * {epochs} | |
| # Simulate training progress | |
| for epoch in range({epochs}): | |
| logger.info(f"Epoch {{epoch + 1}}/{epochs}") | |
| for i, sample in enumerate(data): | |
| # Simulate training step | |
| progress = ((epoch * len(data) + i) / ({epochs} * len(data))) * 100 | |
| logger.info(f"Training progress: {{progress:.1f}}% - Processing: {{sample.get('instruction', 'Unknown')[:50]}}...") | |
| # Update training status | |
| with open("training_status.json", "w") as f: | |
| json.dump({{ | |
| "is_training": True, | |
| "progress": progress, | |
| "status": "training", | |
| "model": "{model_name}", | |
| "epoch": epoch + 1, | |
| "step": i + 1, | |
| "total_steps": len(data), | |
| "current_sample": sample.get('instruction', 'Unknown')[:50] | |
| }}, f) | |
| logger.info("Training completed successfully!") | |
| logger.info(f"Model {model_name} has been fine-tuned with Textilindo data") | |
| # Save final status | |
| with open("training_status.json", "w") as f: | |
| json.dump({{ | |
| "is_training": False, | |
| "progress": 100, | |
| "status": "completed", | |
| "model": "{model_name}", | |
| "end_time": datetime.now().isoformat(), | |
| "message": f"Model {model_name} training completed successfully!" | |
| }}, f) | |
| if __name__ == "__main__": | |
| simple_training() | |
| """ | |
| # Write training script | |
| with open("run_training.py", "w") as f: | |
| f.write(training_script) | |
| # Run training | |
| result = subprocess.run( | |
| ["python", "run_training.py"], | |
| capture_output=True, | |
| text=True, | |
| cwd="." | |
| ) | |
| if result.returncode == 0: | |
| self.training_status["status"] = "completed" | |
| self.training_status["progress"] = 100 | |
| self.training_status["logs"].append("Training completed successfully!") | |
| else: | |
| raise Exception(f"Training failed: {result.stderr}") | |
| except Exception as e: | |
| logger.error(f"Training error: {e}") | |
| self.training_status["status"] = "error" | |
| self.training_status["error"] = str(e) | |
| self.training_status["logs"].append(f"Error: {e}") | |
| finally: | |
| self.training_status["is_training"] = False | |
| self.training_status["end_time"] = datetime.now().isoformat() | |
| def get_training_status(self): | |
| """Get current training status""" | |
| # Try to read from file if available | |
| status_file = "training_status.json" | |
| if os.path.exists(status_file): | |
| try: | |
| with open(status_file, "r") as f: | |
| file_status = json.load(f) | |
| self.training_status.update(file_status) | |
| except: | |
| pass | |
| return self.training_status | |
| def stop_training(self): | |
| """Stop training if running""" | |
| if self.training_status["is_training"]: | |
| self.training_status["status"] = "stopped" | |
| self.training_status["is_training"] = False | |
| return {"message": "Training stopped"} | |
| return {"message": "No training in progress"} | |
| class TextilindoAI: | |
| """Textilindo AI Assistant using HuggingFace Inference API with Auto-Training""" | |
| def __init__(self): | |
| # Prefer standard env vars; keep backward-compatible fallback | |
| self.api_key = ( | |
| os.getenv('HUGGINGFACE_API_KEY') | |
| or os.getenv('HF_TOKEN') | |
| or os.getenv('HUGGINGFAC_API_KEY_2') | |
| ) | |
| # Optional dedicated Inference Endpoint (for gated models like Llama) | |
| # Example: https://xxxxxx.aws.endpoints.huggingface.cloud | |
| self.endpoint_url = (os.getenv('HF_ENDPOINT_URL') or '').strip() | |
| # Normalize model: block unsupported/gated models; prefer widely available ones | |
| env_model = (os.getenv('DEFAULT_MODEL') or '').strip() | |
| # Use a widely available serverless default to avoid 404s | |
| default_supported = 'microsoft/DialoGPT-medium' | |
| if env_model and ( | |
| 'gpt2' in env_model.lower() | |
| or 'meta-llama/llama-3.2-1b-instruct' in env_model.lower() | |
| or 'meta-llama/llama-3.2-3b-instruct' in env_model.lower() | |
| ): | |
| logger.warning("DEFAULT_MODEL not supported on HF Inference or gated; overriding to TinyLlama/TinyLlama-1.1B-Chat-v1.0") | |
| self.model = default_supported | |
| else: | |
| # Safer default | |
| self.model = env_model or default_supported | |
| # Fallback model used on serverless 404s | |
| self._fallback_model = 'distilgpt2' | |
| self.system_prompt = self.load_system_prompt() | |
| self.data_loader = TrainingDataLoader() | |
| # Auto-training configuration | |
| self.auto_training_enabled = True | |
| self.training_interval = 300 # Train every 5 minutes | |
| self.last_training_time = 0 | |
| self.trained_responses = {} # Cache for trained responses | |
| if not self.api_key: | |
| logger.warning("HUGGINGFAC_API_KEY_2 not found. Using mock responses.") | |
| self.client = None | |
| else: | |
| try: | |
| # If endpoint URL provided, we'll use direct HTTP calls (OpenAI-style) | |
| if self.endpoint_url: | |
| logger.info("Using HF Inference Endpoint (OpenAI-compatible mode)") | |
| self.client = 'endpoint' # sentinel | |
| else: | |
| from huggingface_hub import InferenceClient | |
| self.client = InferenceClient( | |
| token=self.api_key, | |
| model=self.model | |
| ) | |
| logger.info(f"Initialized with model: {self.model}") | |
| logger.info("Auto-training enabled - will train continuously") | |
| # Start auto-training in background | |
| self.start_auto_training() | |
| except Exception as e: | |
| logger.error(f"Failed to initialize InferenceClient: {e}") | |
| self.client = None | |
| def load_system_prompt(self) -> str: | |
| """Load system prompt from config file""" | |
| try: | |
| prompt_path = Path("configs/system_prompt.md") | |
| if prompt_path.exists(): | |
| with open(prompt_path, 'r', encoding='utf-8') as f: | |
| content = f.read() | |
| # Extract system prompt from markdown | |
| if 'SYSTEM_PROMPT = """' in content: | |
| start = content.find('SYSTEM_PROMPT = """') + len('SYSTEM_PROMPT = """') | |
| end = content.find('"""', start) | |
| return content[start:end].strip() | |
| else: | |
| # Fallback: use entire content | |
| return content.strip() | |
| else: | |
| return self.get_default_system_prompt() | |
| except Exception as e: | |
| logger.error(f"Error loading system prompt: {e}") | |
| return self.get_default_system_prompt() | |
| def get_default_system_prompt(self) -> str: | |
| """Default system prompt if file not found""" | |
| return """You are a friendly and helpful AI assistant for Textilindo, a textile company. | |
| Always respond in Indonesian (Bahasa Indonesia). | |
| Keep responses short and direct. | |
| Be friendly and helpful. | |
| Use exact information from the knowledge base. | |
| The company uses yards for sales. | |
| Minimum purchase is 1 roll (67-70 yards).""" | |
| def start_auto_training(self): | |
| """Start continuous auto-training in background""" | |
| if not self.auto_training_enabled: | |
| return | |
| def auto_train_loop(): | |
| while self.auto_training_enabled: | |
| try: | |
| current_time = time.time() | |
| if current_time - self.last_training_time >= self.training_interval: | |
| logger.info("Starting auto-training cycle...") | |
| self.perform_auto_training() | |
| self.last_training_time = current_time | |
| time.sleep(60) # Check every minute | |
| except Exception as e: | |
| logger.error(f"Auto-training error: {e}") | |
| time.sleep(300) # Wait 5 minutes on error | |
| # Start auto-training in background thread | |
| training_thread = threading.Thread(target=auto_train_loop, daemon=True) | |
| training_thread.start() | |
| logger.info("Auto-training thread started") | |
| def perform_auto_training(self): | |
| """Perform actual training with current data""" | |
| try: | |
| # Load training data | |
| training_data = self.data_loader.training_data | |
| if not training_data: | |
| logger.warning("No training data available for auto-training") | |
| return | |
| logger.info(f"Auto-training with {len(training_data)} samples") | |
| # Simulate training process (in real implementation, this would be actual model training) | |
| for i, sample in enumerate(training_data): | |
| instruction = sample.get('instruction', '') | |
| output = sample.get('output', '') | |
| if instruction and output: | |
| # Store trained response | |
| self.trained_responses[instruction.lower()] = output | |
| # Simulate training progress | |
| progress = (i + 1) / len(training_data) * 100 | |
| logger.info(f"Auto-training progress: {progress:.1f}% - {instruction[:50]}...") | |
| logger.info(f"Auto-training completed! Cached {len(self.trained_responses)} responses") | |
| except Exception as e: | |
| logger.error(f"Auto-training failed: {e}") | |
| def find_trained_response(self, user_input: str) -> Optional[str]: | |
| """Find response from trained model cache""" | |
| user_input_lower = user_input.lower().strip() | |
| # Direct match | |
| if user_input_lower in self.trained_responses: | |
| return self.trained_responses[user_input_lower] | |
| # Fuzzy match | |
| best_match = None | |
| best_score = 0 | |
| for instruction, response in self.trained_responses.items(): | |
| score = SequenceMatcher(None, user_input_lower, instruction).ratio() | |
| if score > best_score and score > 0.6: # 60% similarity threshold | |
| best_score = score | |
| best_match = response | |
| return best_match | |
| def generate_response(self, user_message: str) -> str: | |
| """Generate response using HuggingFace Inference API with training data fallback""" | |
| # Check for similarity match in training data FIRST (return on strong match) | |
| training_match = self.data_loader.find_best_match(user_message) | |
| if training_match: | |
| similarity_score = training_match.get('similarity', 0) | |
| logger.info(f"Best training match: '{training_match.get('instruction', '')}' with similarity {similarity_score:.2f}") | |
| if similarity_score >= 0.85: | |
| logger.info(f"Using training data match (similarity: {similarity_score:.2f})") | |
| return training_match.get('output', '') | |
| # Avoid dumping full company overview for specific questions (e.g., jam/lokasi/ongkir) | |
| # Only provide overview for clearly generic queries about Textilindo | |
| lower_msg = user_message.lower() | |
| generic_overview_triggers = [ | |
| "tentang textilindo", | |
| "apa itu textilindo", | |
| "informasi textilindo", | |
| "profil textilindo", | |
| ] | |
| specific_keywords = [ | |
| "jam", "buka", "operasional", "lokasi", "alamat", "ongkir", | |
| "katalog", "produk", "harga", "pembelian", "pembayaran", "sample", "sampel" | |
| ] | |
| if any(t in lower_msg for t in generic_overview_triggers) and not any(k in lower_msg for k in specific_keywords): | |
| overview = self.get_company_overview() | |
| if overview: | |
| logger.info("Returning company overview synthesized from training data (generic query)") | |
| return overview | |
| # If no high similarity match, use AI model | |
| logger.info(f"No high similarity match, using AI model for: {user_message[:50]}...") | |
| if not self.client: | |
| logger.warning("No HuggingFace client available, using fallback response") | |
| return self.get_fallback_response(user_message) | |
| try: | |
| # Endpoint (OpenAI-compatible) path | |
| if self.client == 'endpoint' and self.endpoint_url: | |
| headers = { | |
| "Authorization": f"Bearer {self.api_key}", | |
| "Content-Type": "application/json" | |
| } | |
| payload = { | |
| "model": self.model, | |
| "messages": [ | |
| {"role": "system", "content": self.system_prompt}, | |
| {"role": "user", "content": user_message} | |
| ], | |
| "temperature": 0.5, | |
| "top_p": 0.9, | |
| "max_tokens": 180 | |
| } | |
| url = self.endpoint_url.rstrip('/') + "/v1/chat/completions" | |
| logger.info(f"Calling endpoint: {url} with model: {self.model}") | |
| r = requests.post(url, headers=headers, json=payload, timeout=60) | |
| r.raise_for_status() | |
| data = r.json() | |
| # OpenAI-style response | |
| assistant_response = data.get("choices", [{}])[0].get("message", {}).get("content", "").strip() | |
| if not assistant_response: | |
| logger.warning("Empty endpoint response; using fallback") | |
| return self.get_fallback_response(user_message) | |
| return assistant_response | |
| # Serverless InferenceClient path | |
| # Use appropriate conversation format | |
| if "llama" in self.model.lower() or "tinyllama" in self.model.lower(): | |
| prompt = ( | |
| f"<|system|>\n{self.system_prompt}\n<|user|>\n{user_message}\n<|assistant|>\n" | |
| ) | |
| elif "dialogpt" in self.model.lower() or "gpt2" in self.model.lower(): | |
| prompt = f"User: {user_message}\nAssistant:" | |
| else: | |
| # Fallback format for other models | |
| prompt = f"User: {user_message}\nAssistant:" | |
| logger.info(f"Using model: {self.model}") | |
| logger.info(f"API Key present: {bool(self.api_key)}") | |
| logger.info(f"Generating response for prompt: {prompt[:100]}...") | |
| if "llama" in self.model.lower() or "tinyllama" in self.model.lower(): | |
| response = self.client.text_generation( | |
| prompt, | |
| max_new_tokens=120, | |
| temperature=0.5, | |
| top_p=0.9, | |
| top_k=50, | |
| repetition_penalty=1.15, | |
| stop_sequences=["<|end|>", "<|user|>"] | |
| ) | |
| elif "dialogpt" in self.model.lower(): | |
| response = self.client.text_generation( | |
| prompt, | |
| max_new_tokens=150, | |
| temperature=0.8, | |
| top_p=0.9, | |
| top_k=50, | |
| repetition_penalty=1.1, | |
| do_sample=True, | |
| stop_sequences=["User:", "Assistant:", "\n\n"] | |
| ) | |
| else: | |
| response = self.client.text_generation( | |
| prompt, | |
| max_new_tokens=150, | |
| temperature=0.8, | |
| top_p=0.9, | |
| top_k=50, | |
| repetition_penalty=1.2, | |
| do_sample=True, | |
| stop_sequences=["User:", "Assistant:", "\n\n"] | |
| ) | |
| logger.info(f"Raw AI response: {response[:200]}...") | |
| # Clean up the response based on model type | |
| if "llama" in self.model.lower() or "tinyllama" in self.model.lower(): | |
| if "<|assistant|>" in response: | |
| assistant_response = response.split("<|assistant|>")[-1].strip() | |
| else: | |
| assistant_response = response.strip() | |
| assistant_response = assistant_response.replace("<|end|>", "").strip() | |
| elif "dialogpt" in self.model.lower() or "gpt2" in self.model.lower(): | |
| # Clean up DialoGPT/GPT-2 response | |
| if "Assistant:" in response: | |
| assistant_response = response.split("Assistant:")[-1].strip() | |
| else: | |
| assistant_response = response.strip() | |
| # Remove any remaining conversation markers | |
| assistant_response = assistant_response.replace("User:", "").replace("Assistant:", "").strip() | |
| else: | |
| # Clean up other model responses | |
| if "Assistant:" in response: | |
| assistant_response = response.split("Assistant:")[-1].strip() | |
| else: | |
| assistant_response = response.strip() | |
| # Remove any remaining conversation markers | |
| assistant_response = assistant_response.replace("User:", "").replace("Assistant:", "").strip() | |
| # Remove any incomplete sentences or cut-off text | |
| if assistant_response.endswith(('.', '!', '?')): | |
| pass # Complete sentence | |
| elif '.' in assistant_response: | |
| # Take only the first complete sentence | |
| assistant_response = assistant_response.split('.')[0] + '.' | |
| else: | |
| # If no complete sentence, take first 100 characters | |
| assistant_response = assistant_response[:100] | |
| logger.info(f"Cleaned AI response: {assistant_response[:100]}...") | |
| # If response is too short or generic, use fallback | |
| if len(assistant_response) < 10 or "I don't know" in assistant_response.lower(): | |
| logger.warning("AI response too short, using fallback response") | |
| return self.get_fallback_response(user_message) | |
| return assistant_response | |
| except Exception as e: | |
| logger.error(f"Error generating response: {e}") | |
| logger.error(f"Error type: {type(e).__name__}") | |
| logger.error(f"Error details: {str(e)}") | |
| # Automatic one-time fallback to supported model on 404/not found | |
| error_text = str(e).lower() | |
| if ("404" in error_text or "not found" in error_text) and self.model != self._fallback_model: | |
| try: | |
| logger.warning(f"Model {self.model} unavailable. Falling back to {self._fallback_model} and retrying once.") | |
| from huggingface_hub import InferenceClient | |
| self.model = self._fallback_model | |
| self.client = InferenceClient(token=self.api_key, model=self.model) | |
| # Rebuild prompt for new model family | |
| if "tinyllama" in self.model.lower() or "llama" in self.model.lower(): | |
| retry_prompt = ( | |
| f"<|system|>\n{self.system_prompt}\n<|user|>\n{user_message}\n<|assistant|>\n" | |
| ) | |
| response = self.client.text_generation( | |
| retry_prompt, | |
| max_new_tokens=120, | |
| temperature=0.5, | |
| top_p=0.9, | |
| top_k=50, | |
| repetition_penalty=1.15, | |
| stop_sequences=["<|end|>", "<|user|>"] | |
| ) | |
| if "<|assistant|>" in response: | |
| assistant_response = response.split("<|assistant|>")[-1].strip() | |
| else: | |
| assistant_response = response.strip() | |
| return assistant_response.replace("<|end|>", "").strip() | |
| except Exception as e2: | |
| logger.error(f"Fallback retry failed: {e2}") | |
| # Try training data as fallback | |
| training_match = self.data_loader.find_best_match(user_message) | |
| if training_match: | |
| logger.info("Using training data as fallback after API error") | |
| return training_match.get('output', '') | |
| return self.get_fallback_response(user_message) | |
| def get_company_overview(self) -> str: | |
| """Build a short Textilindo overview from available training data.""" | |
| try: | |
| location = None | |
| hours = None | |
| shipping = None | |
| catalog = None | |
| min_order = None | |
| products = None | |
| for item in self.data_loader.training_data: | |
| instr = (item.get('instruction') or '').lower() | |
| out = (item.get('output') or '').strip() | |
| if not out: | |
| continue | |
| if location is None and any(k in instr for k in ["lokasi", "alamat", "dimana textilindo", "lokasi mana"]): | |
| location = out | |
| if hours is None and any(k in instr for k in ["jam", "operasional", "buka"]): | |
| hours = out | |
| if shipping is None and any(k in instr for k in ["ongkir", "pengiriman", "kirim"]): | |
| shipping = out | |
| if catalog is None and any(k in instr for k in ["katalog", "pdf", "buku"]): | |
| catalog = out | |
| if min_order is None and any(k in instr for k in ["minimal order", "ketentuan pembelian", "per roll", "ecer"]): | |
| min_order = out | |
| if products is None and any(k in instr for k in ["produk", "kain", "bahan"]): | |
| products = out | |
| parts = [] | |
| if location: | |
| parts.append(f"Alamat: {location}") | |
| if hours: | |
| parts.append(f"Jam operasional: {hours}") | |
| if shipping: | |
| parts.append(f"Pengiriman: {shipping}") | |
| if min_order: | |
| parts.append(f"Pembelian: {min_order}") | |
| if catalog: | |
| parts.append(f"Katalog: {catalog}") | |
| if products: | |
| parts.append(f"Produk: {products}") | |
| if parts: | |
| return "Tentang Textilindo — " + " | ".join(parts) | |
| return "Textilindo adalah perusahaan tekstil. Tanyakan lokasi, jam operasional, katalog, produk, atau pengiriman untuk info detail." | |
| except Exception as e: | |
| logger.error(f"Error building company overview: {e}") | |
| return "Textilindo adalah perusahaan tekstil. Tanyakan detail spesifik yang Anda butuhkan." | |
| def get_fallback_response(self, user_message: str) -> str: | |
| """Fallback response when no training data match and no API available""" | |
| # Try to give a more contextual response based on the question | |
| if "hello" in user_message.lower() or "hi" in user_message.lower(): | |
| return "Halo! Saya adalah asisten AI Textilindo. Bagaimana saya bisa membantu Anda hari ini? 😊" | |
| elif "weather" in user_message.lower() or "cuaca" in user_message.lower(): | |
| return "Maaf, saya tidak bisa memberikan informasi cuaca terkini. Tapi saya bisa membantu Anda dengan pertanyaan tentang produk dan layanan Textilindo! Apakah ada yang ingin Anda ketahui tentang kain atau layanan kami?" | |
| elif "how are you" in user_message.lower() or "apa kabar" in user_message.lower(): | |
| return "Saya baik-baik saja, terima kasih! Saya siap membantu Anda dengan pertanyaan tentang Textilindo. Ada yang bisa saya bantu?" | |
| elif "time" in user_message.lower() or "waktu" in user_message.lower(): | |
| return f"Waktu saat ini adalah {datetime.now().strftime('%H:%M WIB, %d %B %Y')}. Apakah ada yang ingin Anda ketahui tentang produk Textilindo?" | |
| elif "date" in user_message.lower() or "tanggal" in user_message.lower(): | |
| return f"Hari ini adalah {datetime.now().strftime('%d %B %Y')}. Apakah ada yang ingin Anda ketahui tentang produk Textilindo?" | |
| else: | |
| return f"Halo! Saya adalah asisten AI Textilindo. Saya bisa membantu Anda dengan pertanyaan tentang produk dan layanan kami, atau sekadar mengobrol! Bagaimana saya bisa membantu Anda hari ini? 😊" | |
| def get_mock_response(self, user_message: str) -> str: | |
| """Enhanced mock responses with better context awareness""" | |
| mock_responses = { | |
| "dimana lokasi textilindo": "Textilindo berkantor pusat di Jl. Raya Prancis No.39, Kosambi Tim., Kec. Kosambi, Kabupaten Tangerang, Banten 15213", | |
| "jam berapa textilindo beroperasional": "Jam operasional Senin-Jumat 08:00-17:00, Sabtu 08:00-12:00.", | |
| "berapa ketentuan pembelian": "Minimal order 1 roll per jenis kain", | |
| "bagaimana dengan pembayarannya": "Pembayaran dapat dilakukan via transfer bank atau cash on delivery", | |
| "apa ada gratis ongkir": "Gratis ongkir untuk order minimal 5 roll.", | |
| "apa bisa dikirimkan sample": "hallo kak untuk sampel kita bisa kirimkan gratis ya kak 😊", | |
| "katalog": "Katalog produk Textilindo tersedia dalam bentuk Buku, PDF, atau Katalog Website.", | |
| "harga": "Harga kain berbeda-beda tergantung jenis kainnya. Untuk informasi lengkap bisa hubungi admin kami.", | |
| "produk": "Kami menjual berbagai jenis kain woven dan knitting. Ada rayon twill, baby doll, voal, dan masih banyak lagi.", | |
| "what is 2+2": "2 + 2 = 4", | |
| "what is the capital of france": "The capital of France is Paris.", | |
| "explain machine learning": "Machine learning is a subset of artificial intelligence that enables computers to learn and make decisions from data without being explicitly programmed.", | |
| "write a poem": "Here's a short poem:\n\nIn lines of code we find our way,\nThrough logic's maze we play,\nEach function calls, each loop runs true,\nCreating something bright and new.", | |
| "hello": "Hello! I'm the Textilindo AI assistant. How can I help you today?", | |
| "hi": "Hi there! I'm here to help with any questions about Textilindo. What would you like to know?", | |
| "how are you": "I'm doing well, thank you for asking! I'm ready to help you with any questions about Textilindo's products and services.", | |
| "thank you": "You're welcome! I'm happy to help. Is there anything else you'd like to know about Textilindo?", | |
| "goodbye": "Goodbye! Thank you for chatting with me. Have a great day!", | |
| "bye": "Bye! Feel free to come back anytime if you have more questions about Textilindo." | |
| } | |
| # More specific keyword matching | |
| user_lower = user_message.lower() | |
| # Check for exact phrase matches first | |
| for key, response in mock_responses.items(): | |
| if key in user_lower: | |
| return response | |
| # Check for specific keywords with better matching | |
| if any(word in user_lower for word in ["lokasi", "alamat", "dimana"]): | |
| return "Textilindo berkantor pusat di Jl. Raya Prancis No.39, Kosambi Tim., Kec. Kosambi, Kabupaten Tangerang, Banten 15213" | |
| elif any(word in user_lower for word in ["jam", "buka", "operasional"]): | |
| return "Jam operasional Senin-Jumat 08:00-17:00, Sabtu 08:00-12:00." | |
| elif any(word in user_lower for word in ["pembelian", "beli", "order"]): | |
| return "Minimal order 1 roll per jenis kain" | |
| elif any(word in user_lower for word in ["pembayaran", "bayar", "payment"]): | |
| return "Pembayaran dapat dilakukan via transfer bank atau cash on delivery" | |
| elif any(word in user_lower for word in ["ongkir", "ongkos", "kirim"]): | |
| return "Gratis ongkir untuk order minimal 5 roll." | |
| elif any(word in user_lower for word in ["sample", "sampel", "contoh"]): | |
| return "hallo kak untuk sampel kita bisa kirimkan gratis ya kak 😊" | |
| elif any(word in user_lower for word in ["katalog", "katalog"]): | |
| return "Katalog produk Textilindo tersedia dalam bentuk Buku, PDF, atau Katalog Website." | |
| elif any(word in user_lower for word in ["harga", "price", "cost"]): | |
| return "Harga kain berbeda-beda tergantung jenis kainnya. Untuk informasi lengkap bisa hubungi admin kami." | |
| elif any(word in user_lower for word in ["produk", "kain", "bahan"]): | |
| return "Kami menjual berbagai jenis kain woven dan knitting. Ada rayon twill, baby doll, voal, dan masih banyak lagi." | |
| elif any(word in user_lower for word in ["math", "mathematics", "calculate", "addition", "subtraction", "multiplication", "division"]): | |
| return "I can help with basic math questions! Please ask me a specific math problem and I'll do my best to help." | |
| elif any(word in user_lower for word in ["capital", "country", "geography", "world"]): | |
| return "I can help with geography questions! Please ask me about a specific country or capital city." | |
| elif any(word in user_lower for word in ["technology", "ai", "artificial intelligence", "machine learning", "programming", "coding"]): | |
| return "I'd be happy to discuss technology topics! Please ask me a specific question about AI, programming, or technology." | |
| elif any(word in user_lower for word in ["poem", "poetry", "creative", "write"]): | |
| return "I enjoy creative writing! I can help with poems, stories, or other creative content. What would you like me to write about?" | |
| elif any(word in user_lower for word in ["hello", "hi", "hey", "greetings"]): | |
| return "Hello! I'm the Textilindo AI assistant. I'm here to help with questions about our products and services, or just have a friendly conversation!" | |
| elif any(word in user_lower for word in ["how are you", "how do you do", "how's it going"]): | |
| return "I'm doing great, thank you for asking! I'm ready to help you with any questions about Textilindo or just chat about anything you'd like." | |
| elif any(word in user_lower for word in ["thank you", "thanks", "appreciate"]): | |
| return "You're very welcome! I'm happy to help. Is there anything else you'd like to know about Textilindo or anything else I can assist you with?" | |
| elif any(word in user_lower for word in ["goodbye", "bye", "see you", "farewell"]): | |
| return "Goodbye! It was great chatting with you. Feel free to come back anytime if you have more questions about Textilindo or just want to chat!" | |
| return "Halo! Saya adalah asisten AI Textilindo. Saya bisa membantu Anda dengan pertanyaan tentang produk dan layanan kami, atau sekadar mengobrol! Bagaimana saya bisa membantu Anda hari ini? 😊" | |
| # Initialize AI assistant | |
| ai_assistant = TextilindoAI() | |
| training_manager = TrainingManager() | |
| # Routes | |
| async def root(): | |
| """API root endpoint""" | |
| return { | |
| "message": "Textilindo AI Assistant API", | |
| "version": "1.0.0", | |
| "description": "AI Assistant for Textilindo textile company", | |
| "endpoints": { | |
| "chat": "/chat", | |
| "status": "/api/status", | |
| "health": "/health", | |
| "info": "/info", | |
| "auto_training_status": "/api/auto-training/status", | |
| "auto_training_toggle": "/api/auto-training/toggle", | |
| "train_start": "/api/train/start", | |
| "train_status": "/api/train/status", | |
| "train_stop": "/api/train/stop", | |
| "train_data": "/api/train/data", | |
| "train_models": "/api/train/models" | |
| }, | |
| "usage": { | |
| "chat": "POST /chat with {\"message\": \"your question\"}", | |
| "status": "GET /api/status for system status", | |
| "auto_training": "GET /api/auto-training/status for training status" | |
| } | |
| } | |
| async def get_status(): | |
| """Get system status""" | |
| return { | |
| "status": "running", | |
| "model": ai_assistant.model, | |
| "auto_training_enabled": ai_assistant.auto_training_enabled, | |
| "trained_responses_count": len(ai_assistant.trained_responses), | |
| "timestamp": datetime.now().isoformat() | |
| } | |
| async def health_check(): | |
| """Health check endpoint""" | |
| return {"status": "healthy", "timestamp": datetime.now().isoformat()} | |
| async def get_info(): | |
| """Get API information""" | |
| return { | |
| "name": "Textilindo AI Assistant API", | |
| "version": "1.0.0", | |
| "description": "AI Assistant for Textilindo textile company", | |
| "model": ai_assistant.model, | |
| "auto_training": ai_assistant.auto_training_enabled | |
| } | |
| async def chat(request: ChatRequest): | |
| """Chat endpoint""" | |
| try: | |
| response = ai_assistant.generate_response(request.message) | |
| return ChatResponse( | |
| response=response, | |
| conversation_id=request.conversation_id or "default", | |
| status="success" | |
| ) | |
| except Exception as e: | |
| logger.error(f"Error in chat endpoint: {e}") | |
| raise HTTPException(status_code=500, detail="Internal server error") | |
| async def health_check(): | |
| """Health check endpoint""" | |
| return HealthResponse( | |
| status="healthy", | |
| message="Textilindo AI Assistant is running", | |
| version="1.0.0" | |
| ) | |
| async def get_info(): | |
| """Get application information""" | |
| return { | |
| "name": "Textilindo AI Assistant", | |
| "version": "1.0.0", | |
| "model": ai_assistant.model, | |
| "has_api_key": bool(ai_assistant.api_key), | |
| "client_initialized": bool(ai_assistant.client), | |
| "endpoints": { | |
| "training": { | |
| "start": "POST /api/train/start", | |
| "status": "GET /api/train/status", | |
| "data": "GET /api/train/data", | |
| "gpu": "GET /api/train/gpu", | |
| "test": "POST /api/train/test" | |
| }, | |
| "chat": { | |
| "chat": "POST /chat", | |
| "health": "GET /health" | |
| } | |
| } | |
| } | |
| # Training API endpoints (simplified for HF Spaces) | |
| async def start_training(request: TrainingRequest, background_tasks: BackgroundTasks): | |
| """Start training process (simplified for HF Spaces)""" | |
| global training_status | |
| if training_status["is_training"]: | |
| raise HTTPException(status_code=400, detail="Training already in progress") | |
| # For HF Spaces, we'll simulate training | |
| training_id = f"train_{datetime.now().strftime('%Y%m%d_%H%M%S')}" | |
| # Update status to show training started | |
| training_status.update({ | |
| "is_training": True, | |
| "status": "started", | |
| "progress": 0, | |
| "start_time": datetime.now().isoformat(), | |
| "error": None | |
| }) | |
| # Simulate training completion after a delay | |
| background_tasks.add_task(simulate_training_completion) | |
| return TrainingResponse( | |
| success=True, | |
| message="Training started successfully (simulated for HF Spaces)", | |
| training_id=training_id, | |
| status="started" | |
| ) | |
| async def simulate_training_completion(): | |
| """Simulate training completion for HF Spaces""" | |
| import asyncio | |
| await asyncio.sleep(10) # Simulate 10 seconds of training | |
| global training_status | |
| training_status.update({ | |
| "is_training": False, | |
| "status": "completed", | |
| "progress": 100, | |
| "end_time": datetime.now().isoformat() | |
| }) | |
| async def get_training_status(): | |
| """Get current training status""" | |
| return training_status | |
| async def get_training_data_info(): | |
| """Get information about available training data""" | |
| data_dir = Path("data") | |
| if not data_dir.exists(): | |
| return {"files": [], "count": 0} | |
| jsonl_files = list(data_dir.glob("*.jsonl")) | |
| files_info = [] | |
| for file in jsonl_files: | |
| try: | |
| with open(file, 'r', encoding='utf-8') as f: | |
| lines = f.readlines() | |
| files_info.append({ | |
| "name": file.name, | |
| "size": file.stat().st_size, | |
| "lines": len(lines) | |
| }) | |
| except Exception as e: | |
| files_info.append({ | |
| "name": file.name, | |
| "error": str(e) | |
| }) | |
| return { | |
| "files": files_info, | |
| "count": len(jsonl_files) | |
| } | |
| async def get_gpu_info(): | |
| """Get GPU information (simulated for HF Spaces)""" | |
| return { | |
| "available": False, | |
| "message": "GPU not available in HF Spaces free tier", | |
| "recommendation": "Use local training or upgrade to paid tier" | |
| } | |
| async def test_trained_model(): | |
| """Test the trained model (simulated)""" | |
| return { | |
| "success": True, | |
| "message": "Model testing simulated for HF Spaces", | |
| "test_prompt": "dimana lokasi textilindo?", | |
| "response": "Textilindo berkantor pusat di Jl. Raya Prancis No.39, Kosambi Tim., Kec. Kosambi, Kabupaten Tangerang, Banten 15213", | |
| "note": "This is a simulated response for HF Spaces demo" | |
| } | |
| async def test_ai_directly(request: ChatRequest): | |
| """Test AI directly without fallback to mock responses""" | |
| try: | |
| if not ai_assistant.client: | |
| return { | |
| "success": False, | |
| "message": "No HuggingFace client available", | |
| "response": None | |
| } | |
| # Test with a simple prompt | |
| test_prompt = f"User: {request.message}\nAssistant:" | |
| logger.info(f"Testing AI with prompt: {test_prompt}") | |
| response = ai_assistant.client.text_generation( | |
| test_prompt, | |
| max_new_tokens=100, | |
| temperature=0.7, | |
| top_p=0.9, | |
| top_k=40 | |
| ) | |
| logger.info(f"Direct AI response: {response}") | |
| return { | |
| "success": True, | |
| "message": "AI response generated successfully", | |
| "raw_response": response, | |
| "model": ai_assistant.model, | |
| "api_key_available": bool(ai_assistant.api_key) | |
| } | |
| except Exception as e: | |
| logger.error(f"Error in direct AI test: {e}") | |
| return { | |
| "success": False, | |
| "message": f"Error: {str(e)}", | |
| "response": None | |
| } | |
| # Training Endpoints | |
| async def start_training( | |
| model_name: str = "gpt2", | |
| epochs: int = 3, | |
| batch_size: int = 4 | |
| ): | |
| """Start AI model training""" | |
| try: | |
| result = training_manager.start_training(model_name, epochs, batch_size) | |
| return { | |
| "success": True, | |
| "message": "Training started successfully", | |
| "training_id": "train_" + datetime.now().strftime("%Y%m%d_%H%M%S"), | |
| **result | |
| } | |
| except Exception as e: | |
| logger.error(f"Error starting training: {e}") | |
| return { | |
| "success": False, | |
| "message": f"Error starting training: {str(e)}" | |
| } | |
| async def get_training_status(): | |
| """Get current training status""" | |
| try: | |
| status = training_manager.get_training_status() | |
| return { | |
| "success": True, | |
| "status": status | |
| } | |
| except Exception as e: | |
| logger.error(f"Error getting training status: {e}") | |
| return { | |
| "success": False, | |
| "message": f"Error getting training status: {str(e)}" | |
| } | |
| async def stop_training(): | |
| """Stop current training""" | |
| try: | |
| result = training_manager.stop_training() | |
| return { | |
| "success": True, | |
| "message": "Training stop requested", | |
| **result | |
| } | |
| except Exception as e: | |
| logger.error(f"Error stopping training: {e}") | |
| return { | |
| "success": False, | |
| "message": f"Error stopping training: {str(e)}" | |
| } | |
| async def get_training_data_info(): | |
| """Get information about training data""" | |
| try: | |
| data_path = "data/textilindo_training_data.jsonl" | |
| if not os.path.exists(data_path): | |
| return { | |
| "success": False, | |
| "message": "Training data not found" | |
| } | |
| # Count lines in training data | |
| with open(data_path, 'r', encoding='utf-8') as f: | |
| lines = f.readlines() | |
| # Sample first few entries | |
| sample_data = [] | |
| for line in lines[:3]: | |
| try: | |
| sample_data.append(json.loads(line)) | |
| except: | |
| continue | |
| return { | |
| "success": True, | |
| "data_info": { | |
| "total_samples": len(lines), | |
| "file_size_mb": os.path.getsize(data_path) / (1024 * 1024), | |
| "sample_entries": sample_data | |
| } | |
| } | |
| except Exception as e: | |
| logger.error(f"Error getting training data info: {e}") | |
| return { | |
| "success": False, | |
| "message": f"Error getting training data info: {str(e)}" | |
| } | |
| async def get_available_models(): | |
| """Get list of available models for training""" | |
| return { | |
| "success": True, | |
| "models": [ | |
| { | |
| "name": "TinyLlama/TinyLlama-1.1B-Chat-v1.0", | |
| "description": "TinyLlama 1.1B Chat - Fast, widely available (Recommended)", | |
| "size": "1.1B parameters", | |
| "recommended": True | |
| }, | |
| { | |
| "name": "google/gemma-2-2b-it", | |
| "description": "Gemma 2B Instruct - Capable small instruct model", | |
| "size": "2B parameters", | |
| "recommended": True | |
| }, | |
| { | |
| "name": "microsoft/DialoGPT-medium", | |
| "description": "DialoGPT Medium - Conversational baseline", | |
| "size": "345M parameters", | |
| "recommended": False | |
| }, | |
| { | |
| "name": "distilgpt2", | |
| "description": "DistilGPT-2 - Lightweight baseline", | |
| "size": "82M parameters", | |
| "recommended": False | |
| } | |
| ] | |
| } | |
| async def get_auto_training_status(): | |
| """Get auto-training status""" | |
| return { | |
| "enabled": ai_assistant.auto_training_enabled, | |
| "interval_seconds": ai_assistant.training_interval, | |
| "last_training_time": ai_assistant.last_training_time, | |
| "trained_responses_count": len(ai_assistant.trained_responses), | |
| "next_training_in": max(0, ai_assistant.training_interval - (time.time() - ai_assistant.last_training_time)) | |
| } | |
| async def toggle_auto_training(): | |
| """Toggle auto-training on/off""" | |
| ai_assistant.auto_training_enabled = not ai_assistant.auto_training_enabled | |
| if ai_assistant.auto_training_enabled: | |
| ai_assistant.start_auto_training() | |
| return { | |
| "enabled": ai_assistant.auto_training_enabled, | |
| "message": f"Auto-training {'enabled' if ai_assistant.auto_training_enabled else 'disabled'}" | |
| } | |
| if __name__ == "__main__": | |
| # Get port from environment variable (Hugging Face Spaces uses 7860) | |
| port = int(os.getenv("PORT", 7860)) | |
| # Run the application | |
| uvicorn.run( | |
| "app:app", | |
| host="0.0.0.0", | |
| port=port, | |
| log_level="info" | |
| ) |