import html
import json
import logging
import mimetypes
import os
import pprint
import uuid

import markdown
import requests
from dotenv import load_dotenv
from flask import Flask, request, session, jsonify, redirect, url_for, flash, render_template
from flask_session import Session
from google import genai
from google.genai import types
from werkzeug.utils import secure_filename

# Load variables from a local .env file into the process environment before any
# os.getenv() lookup further down in this module.
load_dotenv()

app = Flask(__name__)

# Process-wide logging configuration. DEBUG level means every logger.debug()
# call in this module (including prompt excerpts) is emitted.
logging.basicConfig(level=logging.DEBUG,
                    format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# Telegram credentials are secrets and must never live in source control: they
# are read from the environment (.env), as the warning below already tells the
# operator to do.
# NOTE(review): a bot token was previously committed in this file; it should be
# treated as compromised and revoked.
TELEGRAM_BOT_TOKEN = os.getenv("TELEGRAM_BOT_TOKEN", "")
TELEGRAM_CHAT_ID = os.getenv("TELEGRAM_CHAT_ID", "")
# Notifications are only attempted when both values are configured.
TELEGRAM_ENABLED = bool(TELEGRAM_BOT_TOKEN and TELEGRAM_CHAT_ID)

if TELEGRAM_ENABLED:
    logger.info(f"Notifications Telegram activées pour le chat ID: {TELEGRAM_CHAT_ID}")
else:
    logger.warning("Notifications Telegram désactivées. Ajoutez TELEGRAM_BOT_TOKEN et TELEGRAM_CHAT_ID dans .env")

# Key used by Flask (and by the session signer enabled below) to sign cookies.
# The fallback literal is public, so warn loudly whenever it is in use.
app.config['SECRET_KEY'] = os.getenv('FLASK_SECRET_KEY', 'une-super-cle-secrete-a-changer')
if not os.getenv('FLASK_SECRET_KEY'):
    logger.warning("FLASK_SECRET_KEY absente: clé par défaut utilisée. Définissez FLASK_SECRET_KEY dans .env")

# Upload handling: staging directory, accepted extensions and request size cap.
UPLOAD_FOLDER = 'temp'
ALLOWED_EXTENSIONS = {'txt', 'pdf', 'png', 'jpg', 'jpeg'}
app.config['UPLOAD_FOLDER'] = UPLOAD_FOLDER
app.config['MAX_CONTENT_LENGTH'] = 25 * 1024 * 1024  # 25 MiB per request

# Create the staging directory at import time so the first upload cannot fail
# on a missing folder.
os.makedirs(UPLOAD_FOLDER, exist_ok=True)
logger.info(f"Dossier d'upload configuré : {os.path.abspath(UPLOAD_FOLDER)}")

# Server-side sessions (Flask-Session): session data is stored in files under
# ./flask_session next to this module; the browser only holds a signed cookie.
app.config['SESSION_TYPE'] = 'filesystem'
app.config['SESSION_PERMANENT'] = False
app.config['SESSION_USE_SIGNER'] = True
app.config['SESSION_FILE_DIR'] = os.path.join(os.path.abspath(os.path.dirname(__file__)), 'flask_session')
# SameSite=None is only honoured by browsers together with Secure.
# NOTE(review): presumably set so the app works when embedded cross-site — confirm.
app.config['SESSION_COOKIE_SAMESITE'] = 'None'
app.config['SESSION_COOKIE_SECURE'] = True

os.makedirs(app.config['SESSION_FILE_DIR'], exist_ok=True)
logger.info(f"Dossier pour les sessions serveur configuré : {app.config['SESSION_FILE_DIR']}")

# Install the server-side session interface on the app.
server_session = Session(app)

# Model used for normal requests.
MODEL_FLASH = 'gemini-2.0-flash'
# Model used when the client asks for "advanced reasoning".
# NOTE(review): the constant is called PRO but the identifier is a flash preview
# model — confirm this is intended.
MODEL_PRO = 'gemini-2.5-flash-preview-05-20'
SYSTEM_INSTRUCTION = "Tu es un assistant intelligent et amical nommé Mariam. Tu assistes les utilisateurs au mieux de tes capacités. Tu as été créé par Aenir."

# Every listed harm category is configured with the BLOCK_NONE threshold.
SAFETY_SETTINGS = [
    types.SafetySetting(
        category=harm_category,
        threshold=types.HarmBlockThreshold.BLOCK_NONE,
    )
    for harm_category in (
        types.HarmCategory.HARM_CATEGORY_HATE_SPEECH,
        types.HarmCategory.HARM_CATEGORY_HARASSMENT,
        types.HarmCategory.HARM_CATEGORY_SEXUALLY_EXPLICIT,
        types.HarmCategory.HARM_CATEGORY_DANGEROUS_CONTENT,
    )
]

# Gemini client initialisation.
# GEMINI_CONFIGURED stays False (and /api/chat answers 503) when the API key is
# missing, when client creation fails, or when the model listing succeeds but
# one of the two required models is absent.
GEMINI_CONFIGURED = False
genai_client = None
try:
    gemini_api_key = os.getenv("GOOGLE_API_KEY")
    if not gemini_api_key:
        logger.error("ERREUR: Clé API GOOGLE_API_KEY manquante dans le fichier .env")
    else:
        genai_client = genai.Client(api_key=gemini_api_key)

        try:
            # The google-genai client exposes model listing as client.models.list();
            # there is no client.list_models(), so the previous call always raised
            # and the availability check below never actually ran.
            models_list = [model.name for model in genai_client.models.list() if model.name]
            # Substring match because listed names carry a prefix before the model id.
            has_flash = any(MODEL_FLASH in model for model in models_list)
            has_pro = any(MODEL_PRO in model for model in models_list)
            if has_flash and has_pro:
                logger.info(f"Configuration Gemini effectuée. Modèles requis ({MODEL_FLASH}, {MODEL_PRO}) disponibles.")
                logger.info(f"System instruction: {SYSTEM_INSTRUCTION}")
                GEMINI_CONFIGURED = True
            else:
                logger.error(f"ERREUR: Les modèles requis ({MODEL_FLASH}, {MODEL_PRO}) ne sont pas tous disponibles via l'API.")
                logger.error(f"Modèles trouvés: {models_list}")
        except Exception as e_models:
            # Listing is only a sanity check: if it fails (network, permissions),
            # keep the service enabled and let real requests surface any problem.
            logger.error(f"ERREUR lors de la vérification des modèles: {e_models}")
            logger.warning("Tentative de continuer sans vérification des modèles disponibles.")
            GEMINI_CONFIGURED = True

except Exception as e:
    logger.critical(f"ERREUR Critique lors de la configuration initiale de Gemini : {e}")
    logger.warning("L'application fonctionnera sans les fonctionnalités IA.")

def allowed_file(filename):
    """Return True when *filename* has an extension listed in ALLOWED_EXTENSIONS.

    The extension is whatever follows the last dot, compared case-insensitively.
    Names without a dot, and empty or missing names, are rejected.
    """
    if not filename:
        return False
    _, dot, extension = filename.rpartition('.')
    return bool(dot) and extension.lower() in ALLOWED_EXTENSIONS

def perform_web_search(query, client, model_id):
    """Run a single, history-free generation call with the Google Search tool enabled.

    Args:
        query: Text sent as the sole content of the request.
        client: Gemini client exposing ``models.generate_content``.
        model_id: Identifier of the model to query.

    Returns:
        The raw response object, or ``None`` if the call raised (the error is
        logged, never propagated).
    """
    try:
        logger.debug(f"--- LOG WEBSEARCH: Recherche Google pour: '{query}'")

        response = client.models.generate_content(
            model=model_id,
            contents=query,
            config={"tools": [{"google_search": {}}]},
        )
        logger.debug("--- LOG WEBSEARCH: Résultats de recherche obtenus.")
        return response
    except Exception as e:
        # Best effort: a failed search must not abort the chat request.
        logger.error(f"--- LOG WEBSEARCH: Erreur lors de la recherche web : {e}")
        return None

def format_search_response(response):
    """Return the text of a web-search response, always as a ``str``.

    Returns ``""`` when *response* is falsy, when its ``text`` attribute is
    ``None``/empty, or when reading ``text`` raises (the error is logged).
    """
    if not response:
        return ""
    try:
        # `text` may legitimately be None (no textual part in the reply); callers
        # take len() of the result and concatenate it, so normalise to "".
        return response.text or ""
    except Exception as e:
        logger.error(f"--- LOG WEBSEARCH: Erreur lors de l'extraction du texte: {e}")
        return ""

def prepare_gemini_history(chat_history):
    """Convert the session-stored history into the list of contents sent to Gemini.

    Each stored message is expected to be a dict with ``role`` and ``raw_text``.
    The role ``'user'`` is kept as ``"user"``; every other role is mapped to
    ``"model"``. Messages without ``raw_text`` (or that are not dicts) are
    skipped with a warning.

    Returns:
        A new list of ``{"role": ..., "parts": [{"text": ...}]}`` dicts.
    """
    logger.debug(f"--- DEBUG [prepare_gemini_history]: Entrée avec {len(chat_history)} messages")
    gemini_history = []
    for i, message in enumerate(chat_history):
        if not isinstance(message, dict):
            # Malformed entry: skip it instead of failing the whole request.
            logger.warning(f" AVERTISSEMENT [prepare_gemini_history]: Message {i} sans texte, ignoré.")
            continue

        role = message.get('role')
        text_part = message.get('raw_text')
        logger.debug(f" [prepare_gemini_history] Message {i} (rôle: {role}): raw_text présent? {'Oui' if text_part else 'NON'}")
        if not text_part:
            logger.warning(f" AVERTISSEMENT [prepare_gemini_history]: Message {i} sans texte, ignoré.")
            continue

        gemini_history.append({
            "role": "user" if role == 'user' else "model",
            "parts": [{"text": text_part}],
        })
    logger.debug(f"--- DEBUG [prepare_gemini_history]: Sortie avec {len(gemini_history)} messages")
    return gemini_history

def process_uploaded_file(file):
    """Stage an uploaded file on disk and build the inline part sent to Gemini.

    Args:
        file: Uploaded file object exposing ``filename`` and ``save(path)``.

    Returns:
        ``(file_part, filename, filepath)`` on success, where ``file_part`` is
        ``{"inline_data": {"mime_type": ..., "data": <bytes>}}``, ``filename``
        is the sanitised display name and ``filepath`` is the staged copy the
        caller is responsible for deleting. ``(None, None, None)`` when there is
        no file, the extension is not allowed, or processing fails.
    """
    if not file or file.filename == '':
        return None, None, None

    if not allowed_file(file.filename):
        logger.error(f"--- ERREUR [process_uploaded_file]: Type de fichier non autorisé: {file.filename}")
        return None, None, None

    filepath = None
    try:
        filename = secure_filename(file.filename)
        if not allowed_file(filename):
            # Sanitising can strip the whole stem (e.g. a non-ASCII name), which
            # loses the dot and therefore the MIME type; rebuild a neutral name
            # from the already-validated extension.
            extension = file.filename.rsplit('.', 1)[1].lower()
            filename = f"upload.{extension}"

        # Unique on-disk name: two requests uploading the same filename must not
        # overwrite (or delete) each other's staged copy.
        filepath = os.path.join(app.config['UPLOAD_FOLDER'], f"{uuid.uuid4().hex}_{filename}")
        file.save(filepath)
        logger.debug(f" [process_uploaded_file]: Fichier '{filename}' sauvegardé dans '{filepath}'")

        mime_type = mimetypes.guess_type(filename)[0] or 'application/octet-stream'
        with open(filepath, "rb") as f:
            file_data = f.read()

        file_part = {
            "inline_data": {
                "mime_type": mime_type,
                "data": file_data,
            }
        }
        return file_part, filename, filepath
    except Exception as e:
        logger.error(f"--- ERREUR [process_uploaded_file]: Échec traitement fichier '{file.filename}': {e}")
        # The caller only deletes paths it receives, so remove a partially
        # staged file here to avoid leaking it.
        if filepath and os.path.exists(filepath):
            try:
                os.remove(filepath)
            except OSError as cleanup_error:
                logger.error(f"--- ERREUR [process_uploaded_file]: Échec suppression fichier '{filepath}': {cleanup_error}")
        return None, None, None

def send_telegram_message(message):
    """Send *message* to the configured Telegram chat through the Bot API.

    The text is truncated to 4000 characters and HTML-escaped before being
    posted with ``parse_mode='HTML'``.

    Returns:
        ``True`` when Telegram answers HTTP 200; ``False`` when notifications
        are disabled, the API answers with another status, or the call raises.
    """
    if not TELEGRAM_ENABLED:
        logger.debug("Telegram désactivé. Message non envoyé.")
        return False

    try:
        text = str(message)
        # Truncate BEFORE escaping: cutting an already-escaped string can split
        # an entity such as "&amp;" in half, which Telegram rejects as invalid HTML.
        if len(text) > 4000:
            text = text[:3997] + "..."
        sanitized_message = html.escape(text)

        response = requests.post(
            f'https://api.telegram.org/bot{TELEGRAM_BOT_TOKEN}/sendMessage',
            data={
                'chat_id': TELEGRAM_CHAT_ID,
                'text': sanitized_message,
                'parse_mode': 'HTML',
            },
            # Without a timeout a stalled connection would block the request
            # handler that triggered this notification indefinitely.
            timeout=10,
        )

        if response.status_code == 200:
            logger.debug("Message Telegram envoyé avec succès")
            return True

        logger.error(f"Échec d'envoi du message Telegram: {response.status_code} - {response.text}")
        return False

    except Exception as e:
        # Deliberately broad: notifications are a side channel and must never
        # make the calling request fail.
        logger.error(f"Erreur lors de l'envoi du message Telegram: {e}")
        return False

def generate_session_id():
    """Return a short session label: the first 8 hex characters of a random UUID4.

    The value is used to tag log lines and notifications; 32 bits of randomness
    is not enough to serve as a security token.
    """
    return uuid.uuid4().hex[:8]

@app.route('/')
def root():
    """Serve the chat page, assigning a session id on the first visit."""
    logger.debug("--- LOG: Appel route '/' ---")

    if 'session_id' not in session:
        session['session_id'] = generate_session_id()
        logger.debug(f"Nouvelle session créée: {session['session_id']}")
    return render_template('index.html')

@app.route('/api/history', methods=['GET'])
def get_history():
    """Return the conversation stored in the session as JSON.

    Response: ``{'success': True, 'history': [{'role': ..., 'text': ...}, ...]}``.
    Only the display fields ``role`` and ``text`` are exposed; entries that are
    not dicts carrying both keys are skipped with a warning.
    """
    logger.debug("\n--- DEBUG [/api/history]: Début requête GET ---")
    if 'chat_history' not in session:
        session['chat_history'] = []
        logger.debug(" [/api/history]: Session 'chat_history' initialisée (vide).")

    display_history = []
    current_history = session.get('chat_history', [])
    logger.debug(f" [/api/history]: Historique récupéré: {len(current_history)} messages.")

    for i, msg in enumerate(current_history):
        if isinstance(msg, dict) and 'role' in msg and 'text' in msg:
            display_history.append({
                'role': msg.get('role'),
                'text': msg.get('text'),
            })
        else:
            logger.warning(f" AVERTISSEMENT [/api/history]: Format invalide pour le message {i}: {msg}")

    logger.debug(f" [/api/history]: Historique préparé pour le frontend: {len(display_history)} messages.")
    return jsonify({'success': True, 'history': display_history})

def _collect_uploads(files):
    """Process every non-empty upload; return parallel lists (parts, names, paths)."""
    file_parts, filenames, filepaths = [], [], []
    for file in files:
        if not file or file.filename == '':
            continue
        logger.debug(f"--- LOG [/api/chat]: Traitement du fichier '{file.filename}'")
        file_part, filename, filepath = process_uploaded_file(file)

        if file_part and filename and filepath:
            file_parts.append(file_part)
            filenames.append(filename)
            filepaths.append(filepath)
            logger.debug(f" [/api/chat]: Fichier '{filename}' ajouté à la liste")
        else:
            logger.warning(f" [/api/chat]: Échec du traitement pour '{file.filename}'")
    return file_parts, filenames, filepaths


def _remove_temp_files(filepaths):
    """Delete the staged upload copies, logging (not raising) on failure."""
    for filepath in filepaths:
        if filepath and os.path.exists(filepath):
            try:
                os.remove(filepath)
                logger.debug(f"--- LOG [/api/chat FINALLY]: Fichier temporaire '{filepath}' supprimé.")
            except OSError as e_del_local:
                logger.error(f"--- ERREUR [/api/chat FINALLY]: Échec suppression fichier '{filepath}': {e_del_local}")


def _log_gemini_contents(contents):
    """Emit a debug summary (role and part kinds) of the messages sent to Gemini."""
    logger.debug(f" Nombre total de messages pour Gemini: {len(contents)}")
    for i, content in enumerate(contents):
        role = content.get("role")
        parts_info = []
        for part in content.get("parts", []):
            if isinstance(part, dict) and "text" in part:
                parts_info.append(f"Text({len(part['text'])} chars)")
            elif isinstance(part, dict) and "inline_data" in part:
                parts_info.append(f"File(mime={part['inline_data']['mime_type']})")
            else:
                parts_info.append(f"Part({type(part)})")
        logger.debug(f" Message {i} (role: {role}): {', '.join(parts_info)}")


def _extract_response_text(response):
    """Return the model's reply text, or a user-facing apology when there is none."""
    # `text` can exist and still be None/empty (e.g. a blocked reply), so test the
    # value rather than the attribute's presence; otherwise the block-reason
    # messages below are unreachable.
    text = getattr(response, 'text', None)
    if text:
        logger.debug(f"--- LOG [/api/chat]: Réponse reçue (début): '{text[:100]}...'")
        return text

    feedback = getattr(response, 'prompt_feedback', None)
    block_reason = getattr(feedback, 'block_reason', None) if feedback else None
    if block_reason:
        fallback = f"Désolé, ma réponse a été bloquée ({block_reason})."
    elif feedback:
        fallback = "Désolé, je n'ai pas pu générer de réponse (restrictions de sécurité)."
    else:
        fallback = "Désolé, je n'ai pas pu générer de réponse."
    logger.warning(f" [/api/chat]: Message d'erreur: '{fallback}'")
    return fallback


@app.route('/api/chat', methods=['POST'])
def chat_api():
    """Handle one chat turn: user prompt and/or files in, rendered reply out.

    Form fields: ``prompt`` (text), ``web_search`` and ``advanced_reasoning``
    (the string ``'true'`` enables them), ``file`` (zero or more uploads).

    Responses (JSON):
        200 ``{'success': True, 'message': <HTML>}`` on success.
        400 when there is neither a prompt nor a usable file.
        503 when the Gemini client is not configured.
        500 when the model call or its post-processing fails; the user message
        is then removed from the stored history again.
    """
    logger.debug("\n---===================================---")
    logger.debug("--- DEBUG [/api/chat]: Nouvelle requête POST ---")

    if not GEMINI_CONFIGURED or not genai_client:
        logger.error("--- ERREUR [/api/chat]: Service IA non configuré.")
        return jsonify({'success': False, 'error': "Le service IA n'est pas configuré correctement."}), 503

    prompt = request.form.get('prompt', '').strip()
    use_web_search = request.form.get('web_search', 'false').lower() == 'true'
    use_advanced = request.form.get('advanced_reasoning', 'false').lower() == 'true'
    files = request.files.getlist('file')

    logger.debug(f" [/api/chat]: Prompt reçu: '{prompt[:50]}...'")
    logger.debug(f" [/api/chat]: Recherche Web: {use_web_search}, Raisonnement Avancé: {use_advanced}")
    logger.debug(f" [/api/chat]: Nombre de fichiers: {len(files) if files else 0}")

    uploaded_file_parts, uploaded_filenames, staged_paths = _collect_uploads(files)
    # The file bytes are already held in the inline parts, so the staged copies
    # can be removed right away; no later failure can then leak them.
    _remove_temp_files(staged_paths)

    # Checked after upload processing so that a request whose only content is an
    # empty or rejected file is refused instead of reaching the model empty.
    if not prompt and not uploaded_filenames:
        logger.error("--- ERREUR [/api/chat]: Prompt et fichiers vides.")
        return jsonify({'success': False, 'error': 'Veuillez fournir un message ou au moins un fichier.'}), 400

    if 'session_id' not in session:
        session['session_id'] = generate_session_id()
    session_id = session['session_id']

    if 'chat_history' not in session:
        session['chat_history'] = []
    elif not isinstance(session['chat_history'], list):
        logger.error("--- ERREUR [/api/chat]: 'chat_history' n'est pas une liste! Réinitialisation.")
        session['chat_history'] = []
    # Snapshot of the previous turns: this is the context sent to the model.
    history_before_user_add = list(session['chat_history'])
    logger.debug(f"--- DEBUG [/api/chat]: Historique avant ajout: {len(history_before_user_add)} messages")

    # Text actually sent to the model. For a file-only message an instruction is
    # synthesised BEFORE the history entry is built, so the turn keeps a raw_text
    # and is not dropped from the context of later requests.
    raw_user_text = prompt
    if uploaded_filenames and not raw_user_text:
        if len(uploaded_filenames) == 1:
            raw_user_text = f"Décris le contenu de ce fichier : {uploaded_filenames[0]}"
        else:
            raw_user_text = f"Décris le contenu de ces fichiers : {', '.join(uploaded_filenames)}"
        logger.debug(f" [/api/chat]: Fichier(s) seul(s) détecté(s), prompt généré: '{raw_user_text}'")
    final_prompt_for_gemini = raw_user_text

    files_text = ", ".join(f"[{filename}]" for filename in uploaded_filenames)
    display_user_text = f"{files_text} {prompt}" if files_text and prompt else (prompt or files_text)

    user_history_entry = {
        'role': 'user',
        'text': display_user_text,
        'raw_text': raw_user_text,
    }
    session['chat_history'].append(user_history_entry)
    # In-place mutation of a nested list is not detected by the session object;
    # flag it explicitly so the change is persisted.
    session.modified = True
    logger.debug(f"--- DEBUG [/api/chat]: Historique après ajout: {len(session['chat_history'])} messages")

    if TELEGRAM_ENABLED:
        files_info = f"[{len(uploaded_filenames)} fichiers: {', '.join(uploaded_filenames)}] " if uploaded_filenames else ""
        telegram_message = f"🔵 NOUVEAU MESSAGE (Session {session_id})\n\n" \
                           f"{files_info}{prompt}"
        send_telegram_message(telegram_message)
        logger.debug("Notification Telegram envoyée pour le message utilisateur")

    try:
        selected_model_name = MODEL_PRO if use_advanced else MODEL_FLASH

        search_results_text = ""
        if use_web_search and final_prompt_for_gemini:
            logger.debug(f"--- LOG [/api/chat]: Exécution de la recherche web pour: '{final_prompt_for_gemini[:60]}...'")
            search_response = perform_web_search(final_prompt_for_gemini, genai_client, selected_model_name)
            # `or ""` guards against a response without any text.
            search_results_text = format_search_response(search_response) or ""
            logger.debug(f" [/api/chat]: Résultat de recherche obtenu (longueur {len(search_results_text)} caractères).")

        contents = prepare_gemini_history(history_before_user_add)

        combined_prompt = final_prompt_for_gemini
        if search_results_text:
            combined_prompt += "\n\nRésultats de recherche récents:\n" + search_results_text
        # Current turn: uploaded files first, then the (possibly augmented) prompt.
        contents.append({
            "role": "user",
            "parts": [*uploaded_file_parts, {"text": combined_prompt}],
        })
        _log_gemini_contents(contents)

        generate_config = types.GenerateContentConfig(
            system_instruction=SYSTEM_INSTRUCTION,
            safety_settings=SAFETY_SETTINGS
        )

        logger.debug(f"--- LOG [/api/chat]: Envoi de la requête finale à {selected_model_name}...")
        response = genai_client.models.generate_content(
            model=selected_model_name,
            contents=contents,
            config=generate_config
        )

        try:
            response_text_raw = _extract_response_text(response)
            # NOTE(review): Markdown passes raw HTML through and the result is
            # returned to the browser unsanitised — confirm the frontend
            # sanitises it, or sanitise here.
            response_html = markdown.markdown(response_text_raw, extensions=['fenced_code', 'tables', 'nl2br'])
            if response_html != response_text_raw:
                logger.debug(" [/api/chat]: Réponse convertie en HTML.")
        except Exception as e_resp:
            logger.error(f"--- ERREUR [/api/chat]: Erreur lors du traitement de la réponse: {e_resp}")
            response_text_raw = f"Désolé, erreur inattendue ({type(e_resp).__name__})."
            response_html = markdown.markdown(response_text_raw)

        assistant_history_entry = {
            'role': 'assistant',
            'text': response_html,
            'raw_text': response_text_raw
        }
        if not isinstance(session.get('chat_history'), list):
            logger.error("--- ERREUR [/api/chat]: 'chat_history' n'est pas une liste avant ajout assistant! Réinitialisation.")
            session['chat_history'] = [user_history_entry]
        session['chat_history'].append(assistant_history_entry)
        session.modified = True
        logger.debug(f"--- DEBUG [/api/chat]: Historique FINAL: {len(session['chat_history'])} messages")

        if TELEGRAM_ENABLED:
            telegram_message = f"🟢 RÉPONSE IA (Session {session_id})\n\n" \
                               f"{response_text_raw[:3900]}{'...' if len(response_text_raw) > 3900 else ''}"
            send_telegram_message(telegram_message)
            logger.debug("Notification Telegram envoyée pour la réponse IA")

        logger.debug("--- LOG [/api/chat]: Envoi de la réponse HTML au client.\n---==================================---\n")
        return jsonify({'success': True, 'message': response_html})

    except Exception as e:
        logger.critical(f"--- ERREUR CRITIQUE [/api/chat]: Échec appel Gemini ou traitement réponse : {e}", exc_info=True)
        # Roll back the user message so the stored history does not end on an
        # unanswered turn.
        current_history = session.get('chat_history')
        if isinstance(current_history, list) and current_history:
            try:
                if current_history[-1].get('role') == 'user':
                    current_history.pop()
                    session.modified = True
                    logger.debug(" [/api/chat]: Dernier message user retiré de l'historique suite à l'erreur.")
            except Exception as pop_e:
                logger.error(f" Erreur lors du retrait du message user: {pop_e}")

        if TELEGRAM_ENABLED:
            error_message = f"🔴 ERREUR (Session {session_id})\n\n" \
                            f"Prompt: {prompt[:100]}{'...' if len(prompt) > 100 else ''}\n\n" \
                            f"Erreur: {str(e)}"
            send_telegram_message(error_message)
            logger.debug("Notification Telegram envoyée pour l'erreur")

        logger.debug("---==================================---\n")
        # NOTE(review): the exception text is returned to the client and may
        # expose internal details — consider a generic message.
        return jsonify({'success': False, 'error': f"Erreur interne: {e}"}), 500

@app.route('/clear', methods=['POST'])
def clear_chat():
    """Erase the whole session (history and session id).

    A Telegram notification is sent first when notifications are enabled and a
    session id exists. AJAX/JSON callers (``X-Requested-With: XMLHttpRequest``
    or an ``Accept`` header containing ``application/json``) get a JSON
    confirmation; other callers get a flash message and a redirect to ``/``.
    """
    logger.debug("\n--- DEBUG [/clear]: Requête POST reçue ---")

    if TELEGRAM_ENABLED and 'session_id' in session:
        session_id = session.get('session_id')
        end_message = f"⚪ SESSION TERMINÉE (Session {session_id})\n\n" \
                      f"L'utilisateur a effacé l'historique de conversation."
        send_telegram_message(end_message)
        logger.debug("Notification Telegram envoyée pour la fin de session")

    session.clear()
    logger.debug(" [/clear]: Session effacée.")

    is_ajax = 'XMLHttpRequest' == request.headers.get('X-Requested-With') or \
              'application/json' in request.headers.get('Accept', '')
    if is_ajax:
        logger.debug(" [/clear]: Réponse JSON (AJAX).")
        return jsonify({'success': True, 'message': 'Historique effacé.'})

    logger.debug(" [/clear]: Réponse Flash + Redirect (non-AJAX).")
    flash("Conversation effacée.", "info")
    return redirect(url_for('root'))

if __name__ == '__main__':
    logger.info("--- Démarrage du serveur Flask ---")
    port = int(os.environ.get('PORT', 5001))
    # The server listens on every interface, so the interactive debugger (which
    # allows arbitrary code execution) must be opt-in: enable it only with
    # FLASK_DEBUG=1/true/yes/on.
    debug_enabled = os.environ.get('FLASK_DEBUG', '').strip().lower() in {'1', 'true', 'yes', 'on'}
    app.run(debug=debug_enabled, host='0.0.0.0', port=port)