import logging import os import io import base64 import json import requests import threading import uuid import time import tempfile import subprocess import shutil import re from datetime import datetime from flask import Flask, render_template, request, jsonify, Response, stream_with_context, send_from_directory from google import genai from google.genai import types from PIL import Image # --- Configuration du Logging --- logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', datefmt='%Y-%m-%d %H:%M:%S' ) logger = logging.getLogger(__name__) # --- Configuration de l'Application Flask --- app = Flask(__name__) # --- Constantes et Variables Globales --- GOOGLE_API_KEY = os.environ.get("GOOGLE_API_KEY") TELEGRAM_BOT_TOKEN = "8004545342:AAGcZaoDjYg8dmbbXRsR1N3TfSSbEiAGz88" TELEGRAM_CHAT_ID = "-1002564204301" GENERATED_PDF_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'generated_pdfs') USER_IMAGES_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'user_images') # --- Initialisation des Services Externes --- client = None if GOOGLE_API_KEY: try: client = genai.Client(api_key=GOOGLE_API_KEY) logger.info("Client Google GenAI initialisé avec succès.") except Exception as e: logger.critical(f"Erreur critique lors de l'initialisation du client Gemini: {e}", exc_info=True) else: logger.critical("GOOGLE_API_KEY non trouvé dans les variables d'environnement. Le service ne fonctionnera pas.") task_results = {} # --- Fonctions Utilitaires --- def load_prompt_from_file(filename): """Charge le contenu d'un fichier de prompt.""" try: prompts_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'prompts') filepath = os.path.join(prompts_dir, filename) logger.info(f"Chargement du prompt depuis '{filepath}'") with open(filepath, 'r', encoding='utf-8') as f: return f.read() except Exception as e: logger.error(f"Erreur lors du chargement du prompt '{filename}': {e}", exc_info=True) return "" def get_prompt_for_style(style): """Retourne le prompt approprié en fonction du style demandé.""" logger.info(f"Sélection du prompt pour le style: '{style}'") return load_prompt_from_file('prompt_light.txt') if style == 'light' else load_prompt_from_file('prompt_colorful.txt') def check_latex_installation(): """Vérifie si pdflatex est installé et accessible dans le PATH.""" logger.info("Vérification de l'installation de LaTeX (pdflatex)...") try: subprocess.run(["pdflatex", "-version"], capture_output=True, check=True, timeout=10) logger.info("Vérification réussie: pdflatex est installé et fonctionnel.") return True except (subprocess.CalledProcessError, FileNotFoundError, subprocess.TimeoutExpired) as e: logger.warning(f"pdflatex n'est pas installé ou n'est pas dans le PATH. La génération de PDF sera désactivée. Erreur: {e}") return False IS_LATEX_INSTALLED = check_latex_installation() def save_user_image(image_data, filename, task_id): """Sauvegarde une image utilisateur dans le dossier user_images.""" try: # Créer un nom de fichier unique avec le task_id file_extension = os.path.splitext(filename)[1] safe_filename = f"{task_id}_{filename}" image_path = os.path.join(USER_IMAGES_DIR, safe_filename) with open(image_path, 'wb') as f: f.write(image_data) logger.info(f"Image utilisateur sauvegardée: {safe_filename}") return safe_filename except Exception as e: logger.error(f"Erreur lors de la sauvegarde de l'image {filename}: {e}") return None def upload_file_to_genai_api(file_data, filename, mime_type): """Upload un fichier vers l'API Files de Google GenAI.""" try: # Déterminer l'extension appropriée if mime_type.startswith('image/'): if mime_type == 'image/jpeg': suffix = '.jpg' elif mime_type == 'image/png': suffix = '.png' elif mime_type == 'image/gif': suffix = '.gif' elif mime_type == 'image/webp': suffix = '.webp' else: suffix = '.jpg' # Par défaut elif mime_type == 'application/pdf': suffix = '.pdf' else: suffix = '.tmp' # Créer un fichier temporaire with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as temp_file: temp_file.write(file_data) temp_file_path = temp_file.name logger.info(f"Upload du fichier '{filename}' ({len(file_data)} bytes) vers l'API Files de Google GenAI...") # Upload vers l'API Files file_ref = client.files.upload(file=temp_file_path) # Nettoyer le fichier temporaire os.unlink(temp_file_path) logger.info(f"Fichier '{filename}' uploadé avec succès. Référence: {file_ref.name}") return file_ref except Exception as e: logger.error(f"Erreur lors de l'upload du fichier '{filename}' vers l'API Files: {e}") # Nettoyer le fichier temporaire en cas d'erreur try: if 'temp_file_path' in locals(): os.unlink(temp_file_path) except: pass return None def call_gemini_with_fallback(contents, task_id): """Appelle Gemini 2.5 Pro en premier, puis 2.5 Flash en cas d'échec.""" models_to_try = [ {"name": "gemini-2.5-pro", "display_name": " 2.5 Pro"}, {"name": "gemini-2.5-flash", "display_name": " 2.5 Flash"} ] last_error = None for i, model_info in enumerate(models_to_try): model_name = model_info["name"] model_display = model_info["display_name"] try: logger.info(f"[Task {task_id}] Tentative {i+1}/2: Appel de {model_display}...") # Mettre à jour le statut pour indiquer quel modèle est en cours d'utilisation if model_name == "gemini-2.5-pro": task_results[task_id]['status'] = 'generating_latex_pro' task_results[task_id]['current_model'] = 'Gemini 2.5 Pro' else: task_results[task_id]['status'] = 'generating_latex_flash' task_results[task_id]['current_model'] = 'Gemini 2.5 Flash (fallback)' # Faire l'appel à l'API gemini_response = client.models.generate_content( model=model_name, contents=contents, config=types.GenerateContentConfig(tools=[types.Tool(code_execution=types.ToolCodeExecution)]) ) # Vérifier si la réponse est valide full_latex_response = "" if gemini_response.candidates and gemini_response.candidates[0].content and gemini_response.candidates[0].content.parts: for part in gemini_response.candidates[0].content.parts: if hasattr(part, 'text') and part.text: full_latex_response += part.text if not full_latex_response.strip(): raise ValueError(f"Réponse vide de {model_display}") logger.info(f"[Task {task_id}] ✓ Succès avec {model_display}") task_results[task_id]['used_model'] = model_display return full_latex_response except Exception as e: error_msg = str(e) logger.warning(f"[Task {task_id}] ✗ Échec avec {model_display}: {error_msg}") last_error = e # Si c'est le dernier modèle et qu'il a échoué, on lève l'erreur if i == len(models_to_try) - 1: logger.error(f"[Task {task_id}] Tous les modèles ont échoué. Dernière erreur: {error_msg}") task_results[task_id]['model_failures'] = [ f"{models_to_try[0]['display_name']}: Échec", f"{models_to_try[1]['display_name']}: {error_msg}" ] raise last_error # Attendre un peu avant d'essayer le modèle suivant time.sleep(2) # Cette ligne ne devrait jamais être atteinte, mais au cas où raise last_error if last_error else Exception("Erreur inconnue lors des appels aux modèles Gemini") def get_all_tasks_info(): """Récupère toutes les informations des tâches pour le centre de gestion.""" tasks_info = [] for task_id, task_data in task_results.items(): task_info = { 'id': task_id, 'status': task_data['status'], 'style': task_data.get('style', 'unknown'), 'first_filename': task_data.get('first_filename', 'Unknown'), 'time_started': task_data.get('time_started', 0), 'time_started_formatted': datetime.fromtimestamp(task_data.get('time_started', 0)).strftime('%Y-%m-%d %H:%M:%S'), 'file_count': task_data.get('file_count', {'images': 0, 'pdfs': 0}), 'pdf_filename': task_data.get('pdf_filename'), 'error': task_data.get('error'), 'response': task_data.get('response', ''), 'used_model': task_data.get('used_model', 'N/A'), 'current_model': task_data.get('current_model', ''), 'model_failures': task_data.get('model_failures', []), 'user_images': [] } # Chercher les images utilisateur associées à cette tâche if os.path.exists(USER_IMAGES_DIR): for img_file in os.listdir(USER_IMAGES_DIR): if img_file.startswith(f"{task_id}_"): task_info['user_images'].append(img_file) tasks_info.append(task_info) # Trier par date de création (plus récent en premier) tasks_info.sort(key=lambda x: x['time_started'], reverse=True) return tasks_info def get_system_stats(): """Récupère les statistiques du système.""" total_tasks = len(task_results) completed_tasks = sum(1 for task in task_results.values() if task['status'] == 'completed') error_tasks = sum(1 for task in task_results.values() if task['status'] == 'error') pending_tasks = sum(1 for task in task_results.values() if task['status'] not in ['completed', 'error']) # Statistiques d'utilisation des modèles pro_usage = sum(1 for task in task_results.values() if task.get('used_model') == 'Gemini 2.5 Pro') flash_usage = sum(1 for task in task_results.values() if task.get('used_model') == 'Gemini 2.5 Flash (fallback)') # Compter les fichiers PDF générés pdf_files = 0 if os.path.exists(GENERATED_PDF_DIR): pdf_files = len([f for f in os.listdir(GENERATED_PDF_DIR) if f.endswith('.pdf')]) # Compter les images utilisateur user_images = 0 if os.path.exists(USER_IMAGES_DIR): user_images = len([f for f in os.listdir(USER_IMAGES_DIR) if f.lower().endswith(('.png', '.jpg', '.jpeg', '.gif', '.webp'))]) return { 'total_tasks': total_tasks, 'completed_tasks': completed_tasks, 'error_tasks': error_tasks, 'pending_tasks': pending_tasks, 'pdf_files': pdf_files, 'user_images': user_images, 'latex_installed': IS_LATEX_INSTALLED, 'pro_usage': pro_usage, 'flash_usage': flash_usage } def clean_latex_code(latex_code): """Extrait le code LaTeX brut des blocs de code formatés (```latex ... ```).""" logger.info("Nettoyage du code LaTeX reçu de Gemini...") match_latex = re.search(r"```(?:latex|tex)\s*(.*?)\s*```", latex_code, re.DOTALL | re.IGNORECASE) if match_latex: logger.info("Bloc de code 'latex' ou 'tex' trouvé et extrait.") return match_latex.group(1).strip() match_generic = re.search(r"```\s*(\\documentclass.*?)\s*```", latex_code, re.DOTALL | re.IGNORECASE) if match_generic: logger.info("Bloc de code générique avec '\\documentclass' trouvé et extrait.") return match_generic.group(1).strip() logger.warning("Aucun bloc de code LaTeX (```...```) n'a été trouvé. Utilisation de la réponse brute.") return latex_code.strip() def latex_to_pdf(latex_code, output_filename_base, output_dir): """Compile une chaîne de code LaTeX en fichier PDF.""" if not IS_LATEX_INSTALLED: logger.error("Tentative de compilation LaTeX alors que pdflatex n'est pas disponible.") return None, "Erreur: pdflatex n'est pas installé sur le serveur." tex_filename = f"{output_filename_base}.tex" tex_path = os.path.join(output_dir, tex_filename) pdf_path = os.path.join(output_dir, f"{output_filename_base}.pdf") logger.info(f"Début de la compilation LaTeX vers PDF pour '{output_filename_base}'") try: with open(tex_path, "w", encoding="utf-8") as tex_file: tex_file.write(latex_code) logger.info(f"Fichier .tex '{tex_path}' créé avec succès.") my_env = os.environ.copy() my_env["LC_ALL"] = "C.UTF-8" my_env["LANG"] = "C.UTF-8" last_result = None for i in range(2): logger.info(f"Exécution de pdflatex - Passe {i+1}/2...") process = subprocess.run( ["pdflatex", "-interaction=nonstopmode", "-output-directory", output_dir, tex_path], capture_output=True, text=True, check=False, encoding="utf-8", errors="replace", env=my_env, timeout=60 ) last_result = process if not os.path.exists(pdf_path) and process.returncode != 0: logger.warning(f"La passe {i+1} de pdflatex a échoué et aucun PDF n'a été créé. Arrêt de la compilation.") break if os.path.exists(pdf_path): logger.info(f"PDF généré avec succès : '{pdf_path}'") return pdf_path, f"PDF généré: {os.path.basename(pdf_path)}" else: error_log = last_result.stdout + "\n" + last_result.stderr if last_result else "Aucun résultat de compilation disponible." logger.error(f"Échec de la compilation PDF pour '{tex_filename}'. Log de pdflatex:\n{error_log}") return None, f"Erreur de compilation PDF. Log: ...{error_log[-1000:]}" except Exception as e: logger.error(f"Exception pendant la génération du PDF: {e}", exc_info=True) return None, f"Exception durant la génération du PDF: {str(e)}" def send_to_telegram(file_data, filename, caption="Nouveau fichier uploadé"): """Envoie un fichier au canal Telegram configuré.""" logger.info(f"Préparation de l'envoi du fichier '{filename}' à Telegram.") try: if filename.lower().endswith(('.png', '.jpg', '.jpeg', '.gif', '.webp')): url = f"https://api.telegram.org/bot{TELEGRAM_BOT_TOKEN}/sendPhoto" files = {'photo': (filename, file_data)} log_msg = f"Envoi de l'image '{filename}' à Telegram..." else: url = f"https://api.telegram.org/bot{TELEGRAM_BOT_TOKEN}/sendDocument" files = {'document': (filename, file_data)} log_msg = f"Envoi du document '{filename}' à Telegram..." logger.info(log_msg) data = {'chat_id': TELEGRAM_CHAT_ID, 'caption': caption} response = requests.post(url, files=files, data=data, timeout=30) response.raise_for_status() logger.info(f"Fichier '{filename}' envoyé avec succès à Telegram.") except Exception as e: logger.error(f"Erreur lors de l'envoi à Telegram: {e}", exc_info=True) # --- Logique Principale (Worker en arrière-plan) --- def process_files_background(task_id, files_data, resolution_style): """Fonction exécutée en thread pour traiter les fichiers, appeler Gemini et générer le PDF.""" logger.info(f"[Task {task_id}] Démarrage du traitement en arrière-plan.") task_results[task_id]['status'] = 'processing' uploaded_file_refs = [] try: if not client: raise ConnectionError("Le client Gemini n'est pas initialisé.") contents = [] logger.info(f"[Task {task_id}] Préparation des fichiers pour l'API Gemini.") # Traiter tous les fichiers en utilisant l'API Files for file_info in files_data: logger.info(f"[Task {task_id}] Traitement du fichier '{file_info['filename']}' ({file_info['type']}).") # Sauvegarder l'image utilisateur si c'est une image if file_info['type'].startswith('image/'): saved_filename = save_user_image(file_info['data'], file_info['filename'], task_id) # Upload vers l'API Files de Google GenAI file_ref = upload_file_to_genai_api( file_info['data'], file_info['filename'], file_info['type'] ) if file_ref: uploaded_file_refs.append(file_ref) contents.append(file_ref) logger.info(f"[Task {task_id}] Fichier '{file_info['filename']}' uploadé avec succès. Référence: {file_ref.name}") else: logger.warning(f"[Task {task_id}] Échec de l'upload du fichier '{file_info['filename']}'. Fichier ignoré.") if not contents: raise ValueError("Aucun fichier n'a pu être uploadé vers l'API Files de Google GenAI.") prompt_to_use = get_prompt_for_style(resolution_style) if not prompt_to_use: raise ValueError(f"Le fichier de prompt pour le style '{resolution_style}' est introuvable ou vide.") contents.append(prompt_to_use) # Appeler Gemini avec fallback Pro -> Flash logger.info(f"[Task {task_id}] Début des appels aux modèles Gemini avec {len(uploaded_file_refs)} fichier(s).") full_latex_response = call_gemini_with_fallback(contents, task_id) logger.info(f"[Task {task_id}] Réponse reçue de Gemini ({task_results[task_id].get('used_model', 'modèle inconnu')}).") logger.debug(f"[Task {task_id}] Réponse brute de Gemini:\n---\n{full_latex_response[:500]}...\n---") task_results[task_id]['status'] = 'cleaning_latex' cleaned_latex = clean_latex_code(full_latex_response) logger.debug(f"[Task {task_id}] Code LaTeX nettoyé:\n---\n{cleaned_latex[:500]}...\n---") task_results[task_id]['status'] = 'generating_pdf' pdf_filename_base = f"solution_{task_id}" pdf_file_path, pdf_message = latex_to_pdf(cleaned_latex, pdf_filename_base, GENERATED_PDF_DIR) if pdf_file_path: task_results[task_id]['status'] = 'completed' task_results[task_id]['pdf_filename'] = os.path.basename(pdf_file_path) used_model = task_results[task_id].get('used_model', 'modèle inconnu') task_results[task_id]['response'] = f"PDF généré avec succès: {os.path.basename(pdf_file_path)} (généré par {used_model})" logger.info(f"[Task {task_id}] Tâche terminée avec succès. PDF: {os.path.basename(pdf_file_path)} (modèle: {used_model})") else: raise RuntimeError(f"Échec de la génération du PDF: {pdf_message}") except Exception as e: logger.error(f"[Task {task_id}] Une erreur est survenue dans le thread de traitement.", exc_info=True) task_results[task_id]['status'] = 'error' task_results[task_id]['error'] = str(e) task_results[task_id]['response'] = f"Une erreur est survenue: {str(e)}" finally: # Nettoyer tous les fichiers uploadés vers l'API Files if uploaded_file_refs: logger.info(f"[Task {task_id}] Nettoyage des {len(uploaded_file_refs)} fichiers temporaires de l'API Files.") for file_ref in uploaded_file_refs: try: client.files.delete(file_ref) logger.info(f"[Task {task_id}] Fichier temporaire '{file_ref.name}' supprimé de l'API Files.") except Exception as del_e: logger.warning(f"[Task {task_id}] Échec de la suppression du fichier temporaire '{file_ref.name}': {del_e}") # --- Routes Flask (API Endpoints) --- @app.route('/oups') def index(): logger.info(f"Requête servie pour l'endpoint '/' depuis {request.remote_addr}") return render_template('index.html') @app.route('/admin1') def admin_panel(): """Centre de gestion complet.""" logger.info(f"Accès au centre de gestion depuis {request.remote_addr}") tasks_info = get_all_tasks_info() system_stats = get_system_stats() return render_template('admin.html', tasks=tasks_info, stats=system_stats) @app.route('/admin/api/tasks') def admin_api_tasks(): """API JSON pour récupérer les informations des tâches.""" tasks_info = get_all_tasks_info() return jsonify(tasks_info) @app.route('/admin/api/stats') def admin_api_stats(): """API JSON pour récupérer les statistiques système.""" stats = get_system_stats() return jsonify(stats) @app.route('/admin/delete_task/', methods=['POST']) def admin_delete_task(task_id): """Supprime une tâche et ses fichiers associés.""" logger.info(f"Demande de suppression de la tâche {task_id}") if task_id not in task_results: return jsonify({'error': 'Tâche introuvable'}), 404 try: task_data = task_results[task_id] # Supprimer le PDF s'il existe if 'pdf_filename' in task_data: pdf_path = os.path.join(GENERATED_PDF_DIR, task_data['pdf_filename']) if os.path.exists(pdf_path): os.remove(pdf_path) logger.info(f"PDF supprimé: {pdf_path}") # Supprimer les images utilisateur associées if os.path.exists(USER_IMAGES_DIR): for img_file in os.listdir(USER_IMAGES_DIR): if img_file.startswith(f"{task_id}_"): img_path = os.path.join(USER_IMAGES_DIR, img_file) os.remove(img_path) logger.info(f"Image utilisateur supprimée: {img_path}") # Supprimer la tâche de la mémoire del task_results[task_id] logger.info(f"Tâche {task_id} supprimée avec succès") return jsonify({'success': True}) except Exception as e: logger.error(f"Erreur lors de la suppression de la tâche {task_id}: {e}") return jsonify({'error': str(e)}), 500 @app.route('/user_images/') def serve_user_image(filename): """Sert les images utilisateur.""" try: return send_from_directory(USER_IMAGES_DIR, filename) except FileNotFoundError: return "Image non trouvée", 404 @app.route('/solve', methods=['POST']) def solve(): logger.info(f"Nouvelle requête sur /solve depuis {request.remote_addr}") try: if 'user_files' not in request.files: logger.warning(f"/solve: Requête de {request.remote_addr} sans 'user_files'.") return jsonify({'error': 'Aucun champ de fichier dans la requête'}), 400 uploaded_files = request.files.getlist('user_files') if not uploaded_files or all(f.filename == '' for f in uploaded_files): logger.warning(f"/solve: Requête de {request.remote_addr} avec champ 'user_files' mais sans fichiers.") return jsonify({'error': 'Aucun fichier sélectionné'}), 400 resolution_style = request.form.get('style', 'colorful') files_data = [] file_count = {'images': 0, 'pdfs': 0} for file in uploaded_files: if not file.filename: continue file_data = file.read() file_type = file.content_type or 'application/octet-stream' if file_type.startswith('image/'): file_count['images'] += 1 files_data.append({'filename': file.filename, 'data': file_data, 'type': file_type}) send_to_telegram(file_data, file.filename, f"Image reçue: {file.filename} (Style: {resolution_style})") elif file_type == 'application/pdf': if file_count['pdfs'] >= 1: logger.warning(f"/solve: Requête de {request.remote_addr} avec plusieurs PDFs. Rejetée.") return jsonify({'error': 'Un seul fichier PDF est autorisé par requête'}), 400 file_count['pdfs'] += 1 files_data.append({'filename': file.filename, 'data': file_data, 'type': file_type}) send_to_telegram(file_data, file.filename, f"PDF reçu: {file.filename} (Style: {resolution_style})") else: logger.warning(f"/solve: Fichier non supporté '{file.filename}' de type '{file_type}' uploadé par {request.remote_addr}.") if not files_data: logger.warning(f"/solve: Aucun fichier valide (image/pdf) trouvé dans la requête de {request.remote_addr}.") return jsonify({'error': 'Aucun fichier valide (image ou PDF) n\'a été fourni'}), 400 task_id = str(uuid.uuid4()) task_results[task_id] = { 'status': 'pending', 'response': '', 'error': None, 'time_started': time.time(), 'style': resolution_style, 'file_count': file_count, 'first_filename': files_data[0]['filename'] } logger.info(f"Création de la tâche {task_id} pour {file_count['images']} image(s) et {file_count['pdfs']} PDF(s). Style: {resolution_style}.") threading.Thread(target=process_files_background, args=(task_id, files_data, resolution_style)).start() return jsonify({'task_id': task_id, 'status': 'pending', 'first_filename': files_data[0]['filename']}) except Exception as e: logger.error(f"Erreur inattendue dans l'endpoint /solve: {e}", exc_info=True) return jsonify({'error': f'Erreur interne du serveur: {e}'}), 500 @app.route('/task/', methods=['GET']) def get_task_status(task_id): logger.debug(f"Requête de statut pour la tâche {task_id}") task = task_results.get(task_id) if not task: logger.warning(f"Tentative d'accès à une tâche inexistante: {task_id}") return jsonify({'error': 'Tâche introuvable'}), 404 response_data = { 'status': task['status'], 'response': task.get('response'), 'error': task.get('error'), 'current_model': task.get('current_model', ''), 'used_model': task.get('used_model', ''), 'model_failures': task.get('model_failures', []) } if task['status'] == 'completed': response_data['download_url'] = f"/download/{task_id}" return jsonify(response_data) @app.route('/stream/', methods=['GET']) def stream_task_progress(task_id): """Endpoint pour Server-Sent Events (SSE) pour streamer la progression.""" def generate(): logger.info(f"Nouvelle connexion de streaming (SSE) pour la tâche {task_id}") last_status_sent = None last_model_sent = None while True: task = task_results.get(task_id) if not task: logger.warning(f"La tâche {task_id} a disparu pendant le streaming.") yield f'data: {json.dumps({"error": "La tâche a été perdue", "status": "error"})}\n\n' break current_status = task['status'] current_model = task.get('current_model', '') # Envoyer une mise à jour si le statut ou le modèle a changé if current_status != last_status_sent or current_model != last_model_sent: data_to_send = { "status": current_status, "current_model": current_model, "used_model": task.get('used_model', '') } if current_status == 'completed': data_to_send["response"] = task.get("response", "") data_to_send["download_url"] = f"/download/{task_id}" elif current_status == 'error': data_to_send["error"] = task.get("error", "Erreur inconnue") data_to_send["model_failures"] = task.get("model_failures", []) logger.info(f"[Task {task_id}] Envoi de la mise à jour de statut via SSE: {current_status} ({current_model})") yield f'data: {json.dumps(data_to_send)}\n\n' last_status_sent = current_status last_model_sent = current_model if current_status in ['completed', 'error']: logger.info(f"Fermeture de la connexion SSE pour la tâche terminée/échouée {task_id}") break time.sleep(1) return Response(stream_with_context(generate()), mimetype='text/event-stream', headers={'Cache-Control': 'no-cache', 'X-Accel-Buffering': 'no'}) @app.route('/download/') def download_pdf(task_id): logger.info(f"Requête de téléchargement pour la tâche {task_id}") task = task_results.get(task_id) if not task or task['status'] != 'completed' or 'pdf_filename' not in task: logger.warning(f"Échec du téléchargement pour la tâche {task_id}: Fichier non trouvé ou tâche non terminée.") return "Fichier non trouvé ou la tâche n'est pas encore terminée.", 404 try: logger.info(f"Envoi du fichier '{task['pdf_filename']}' pour la tâche {task_id}") return send_from_directory(GENERATED_PDF_DIR, task['pdf_filename'], as_attachment=True) except FileNotFoundError: logger.error(f"Le fichier PDF '{task['pdf_filename']}' pour la tâche {task_id} est introuvable sur le disque.") return "Erreur: Fichier introuvable sur le serveur.", 404 if __name__ == '__main__': logger.info("Démarrage de l'application Flask.") # Création des répertoires nécessaires os.makedirs(GENERATED_PDF_DIR, exist_ok=True) os.makedirs(USER_IMAGES_DIR, exist_ok=True) logger.info(f"Répertoires assurés d'exister: '{GENERATED_PDF_DIR}' et '{USER_IMAGES_DIR}'") app.run(debug=True, host='0.0.0.0', port=5000)