Testpdf1 / app.py
Docfile's picture
Update app.py
9b56992 verified
import logging
import os
import io
import base64
import json
import requests
import threading
import uuid
import time
import tempfile
import subprocess
import shutil
import re
from datetime import datetime
from flask import Flask, render_template, request, jsonify, Response, stream_with_context, send_from_directory
from google import genai
from google.genai import types
from PIL import Image
# --- Configuration du Logging ---
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
datefmt='%Y-%m-%d %H:%M:%S'
)
logger = logging.getLogger(__name__)
# --- Configuration de l'Application Flask ---
app = Flask(__name__)
# --- Constantes et Variables Globales ---
GOOGLE_API_KEY = os.environ.get("GOOGLE_API_KEY")
TELEGRAM_BOT_TOKEN = "8004545342:AAGcZaoDjYg8dmbbXRsR1N3TfSSbEiAGz88"
TELEGRAM_CHAT_ID = "-1002564204301"
GENERATED_PDF_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'generated_pdfs')
USER_IMAGES_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'user_images')
# --- Initialisation des Services Externes ---
client = None
if GOOGLE_API_KEY:
try:
client = genai.Client(api_key=GOOGLE_API_KEY)
logger.info("Client Google GenAI initialisé avec succès.")
except Exception as e:
logger.critical(f"Erreur critique lors de l'initialisation du client Gemini: {e}", exc_info=True)
else:
logger.critical("GOOGLE_API_KEY non trouvé dans les variables d'environnement. Le service ne fonctionnera pas.")
task_results = {}
# --- Fonctions Utilitaires ---
def load_prompt_from_file(filename):
"""Charge le contenu d'un fichier de prompt."""
try:
prompts_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'prompts')
filepath = os.path.join(prompts_dir, filename)
logger.info(f"Chargement du prompt depuis '{filepath}'")
with open(filepath, 'r', encoding='utf-8') as f:
return f.read()
except Exception as e:
logger.error(f"Erreur lors du chargement du prompt '{filename}': {e}", exc_info=True)
return ""
def get_prompt_for_style(style):
"""Retourne le prompt approprié en fonction du style demandé."""
logger.info(f"Sélection du prompt pour le style: '{style}'")
return load_prompt_from_file('prompt_light.txt') if style == 'light' else load_prompt_from_file('prompt_colorful.txt')
def check_latex_installation():
"""Vérifie si pdflatex est installé et accessible dans le PATH."""
logger.info("Vérification de l'installation de LaTeX (pdflatex)...")
try:
subprocess.run(["pdflatex", "-version"], capture_output=True, check=True, timeout=10)
logger.info("Vérification réussie: pdflatex est installé et fonctionnel.")
return True
except (subprocess.CalledProcessError, FileNotFoundError, subprocess.TimeoutExpired) as e:
logger.warning(f"pdflatex n'est pas installé ou n'est pas dans le PATH. La génération de PDF sera désactivée. Erreur: {e}")
return False
IS_LATEX_INSTALLED = check_latex_installation()
def save_user_image(image_data, filename, task_id):
"""Sauvegarde une image utilisateur dans le dossier user_images."""
try:
# Créer un nom de fichier unique avec le task_id
file_extension = os.path.splitext(filename)[1]
safe_filename = f"{task_id}_{filename}"
image_path = os.path.join(USER_IMAGES_DIR, safe_filename)
with open(image_path, 'wb') as f:
f.write(image_data)
logger.info(f"Image utilisateur sauvegardée: {safe_filename}")
return safe_filename
except Exception as e:
logger.error(f"Erreur lors de la sauvegarde de l'image {filename}: {e}")
return None
def upload_file_to_genai_api(file_data, filename, mime_type):
"""Upload un fichier vers l'API Files de Google GenAI."""
try:
# Déterminer l'extension appropriée
if mime_type.startswith('image/'):
if mime_type == 'image/jpeg':
suffix = '.jpg'
elif mime_type == 'image/png':
suffix = '.png'
elif mime_type == 'image/gif':
suffix = '.gif'
elif mime_type == 'image/webp':
suffix = '.webp'
else:
suffix = '.jpg' # Par défaut
elif mime_type == 'application/pdf':
suffix = '.pdf'
else:
suffix = '.tmp'
# Créer un fichier temporaire
with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as temp_file:
temp_file.write(file_data)
temp_file_path = temp_file.name
logger.info(f"Upload du fichier '{filename}' ({len(file_data)} bytes) vers l'API Files de Google GenAI...")
# Upload vers l'API Files
file_ref = client.files.upload(file=temp_file_path)
# Nettoyer le fichier temporaire
os.unlink(temp_file_path)
logger.info(f"Fichier '{filename}' uploadé avec succès. Référence: {file_ref.name}")
return file_ref
except Exception as e:
logger.error(f"Erreur lors de l'upload du fichier '{filename}' vers l'API Files: {e}")
# Nettoyer le fichier temporaire en cas d'erreur
try:
if 'temp_file_path' in locals():
os.unlink(temp_file_path)
except:
pass
return None
def call_gemini_with_fallback(contents, task_id):
"""Appelle Gemini 2.5 Pro en premier, puis 2.5 Flash en cas d'échec."""
models_to_try = [
{"name": "gemini-2.5-pro", "display_name": "Gemini 2.5 Pro"},
{"name": "gemini-2.5-flash", "display_name": "Gemini 2.5 Flash"}
]
last_error = None
for i, model_info in enumerate(models_to_try):
model_name = model_info["name"]
model_display = model_info["display_name"]
try:
logger.info(f"[Task {task_id}] Tentative {i+1}/2: Appel de {model_display}...")
# Mettre à jour le statut pour indiquer quel modèle est en cours d'utilisation
if model_name == "gemini-2.5-pro":
task_results[task_id]['status'] = 'generating_latex_pro'
task_results[task_id]['current_model'] = 'Gemini 2.5 Pro'
else:
task_results[task_id]['status'] = 'generating_latex_flash'
task_results[task_id]['current_model'] = 'Gemini 2.5 Flash (fallback)'
# Faire l'appel à l'API
gemini_response = client.models.generate_content(
model=model_name,
contents=contents,
config=types.GenerateContentConfig(tools=[types.Tool(code_execution=types.ToolCodeExecution)])
)
# Vérifier si la réponse est valide
full_latex_response = ""
if gemini_response.candidates and gemini_response.candidates[0].content and gemini_response.candidates[0].content.parts:
for part in gemini_response.candidates[0].content.parts:
if hasattr(part, 'text') and part.text:
full_latex_response += part.text
if not full_latex_response.strip():
raise ValueError(f"Réponse vide de {model_display}")
logger.info(f"[Task {task_id}] ✓ Succès avec {model_display}")
task_results[task_id]['used_model'] = model_display
return full_latex_response
except Exception as e:
error_msg = str(e)
logger.warning(f"[Task {task_id}] ✗ Échec avec {model_display}: {error_msg}")
last_error = e
# Si c'est le dernier modèle et qu'il a échoué, on lève l'erreur
if i == len(models_to_try) - 1:
logger.error(f"[Task {task_id}] Tous les modèles ont échoué. Dernière erreur: {error_msg}")
task_results[task_id]['model_failures'] = [
f"{models_to_try[0]['display_name']}: Échec",
f"{models_to_try[1]['display_name']}: {error_msg}"
]
raise last_error
# Attendre un peu avant d'essayer le modèle suivant
time.sleep(2)
# Cette ligne ne devrait jamais être atteinte, mais au cas où
raise last_error if last_error else Exception("Erreur inconnue lors des appels aux modèles Gemini")
def get_all_tasks_info():
"""Récupère toutes les informations des tâches pour le centre de gestion."""
tasks_info = []
for task_id, task_data in task_results.items():
task_info = {
'id': task_id,
'status': task_data['status'],
'style': task_data.get('style', 'unknown'),
'first_filename': task_data.get('first_filename', 'Unknown'),
'time_started': task_data.get('time_started', 0),
'time_started_formatted': datetime.fromtimestamp(task_data.get('time_started', 0)).strftime('%Y-%m-%d %H:%M:%S'),
'file_count': task_data.get('file_count', {'images': 0, 'pdfs': 0}),
'pdf_filename': task_data.get('pdf_filename'),
'error': task_data.get('error'),
'response': task_data.get('response', ''),
'used_model': task_data.get('used_model', 'N/A'),
'current_model': task_data.get('current_model', ''),
'model_failures': task_data.get('model_failures', []),
'user_images': []
}
# Chercher les images utilisateur associées à cette tâche
if os.path.exists(USER_IMAGES_DIR):
for img_file in os.listdir(USER_IMAGES_DIR):
if img_file.startswith(f"{task_id}_"):
task_info['user_images'].append(img_file)
tasks_info.append(task_info)
# Trier par date de création (plus récent en premier)
tasks_info.sort(key=lambda x: x['time_started'], reverse=True)
return tasks_info
def get_system_stats():
"""Récupère les statistiques du système."""
total_tasks = len(task_results)
completed_tasks = sum(1 for task in task_results.values() if task['status'] == 'completed')
error_tasks = sum(1 for task in task_results.values() if task['status'] == 'error')
pending_tasks = sum(1 for task in task_results.values() if task['status'] not in ['completed', 'error'])
# Statistiques d'utilisation des modèles
pro_usage = sum(1 for task in task_results.values() if task.get('used_model') == 'Gemini 2.5 Pro')
flash_usage = sum(1 for task in task_results.values() if task.get('used_model') == 'Gemini 2.5 Flash (fallback)')
# Compter les fichiers PDF générés
pdf_files = 0
if os.path.exists(GENERATED_PDF_DIR):
pdf_files = len([f for f in os.listdir(GENERATED_PDF_DIR) if f.endswith('.pdf')])
# Compter les images utilisateur
user_images = 0
if os.path.exists(USER_IMAGES_DIR):
user_images = len([f for f in os.listdir(USER_IMAGES_DIR) if f.lower().endswith(('.png', '.jpg', '.jpeg', '.gif', '.webp'))])
return {
'total_tasks': total_tasks,
'completed_tasks': completed_tasks,
'error_tasks': error_tasks,
'pending_tasks': pending_tasks,
'pdf_files': pdf_files,
'user_images': user_images,
'latex_installed': IS_LATEX_INSTALLED,
'pro_usage': pro_usage,
'flash_usage': flash_usage
}
def clean_latex_code(latex_code):
"""Extrait le code LaTeX brut des blocs de code formatés (```latex ... ```)."""
logger.info("Nettoyage du code LaTeX reçu de Gemini...")
match_latex = re.search(r"```(?:latex|tex)\s*(.*?)\s*```", latex_code, re.DOTALL | re.IGNORECASE)
if match_latex:
logger.info("Bloc de code 'latex' ou 'tex' trouvé et extrait.")
return match_latex.group(1).strip()
match_generic = re.search(r"```\s*(\\documentclass.*?)\s*```", latex_code, re.DOTALL | re.IGNORECASE)
if match_generic:
logger.info("Bloc de code générique avec '\\documentclass' trouvé et extrait.")
return match_generic.group(1).strip()
logger.warning("Aucun bloc de code LaTeX (```...```) n'a été trouvé. Utilisation de la réponse brute.")
return latex_code.strip()
def latex_to_pdf(latex_code, output_filename_base, output_dir):
"""Compile une chaîne de code LaTeX en fichier PDF."""
if not IS_LATEX_INSTALLED:
logger.error("Tentative de compilation LaTeX alors que pdflatex n'est pas disponible.")
return None, "Erreur: pdflatex n'est pas installé sur le serveur."
tex_filename = f"{output_filename_base}.tex"
tex_path = os.path.join(output_dir, tex_filename)
pdf_path = os.path.join(output_dir, f"{output_filename_base}.pdf")
logger.info(f"Début de la compilation LaTeX vers PDF pour '{output_filename_base}'")
try:
with open(tex_path, "w", encoding="utf-8") as tex_file:
tex_file.write(latex_code)
logger.info(f"Fichier .tex '{tex_path}' créé avec succès.")
my_env = os.environ.copy()
my_env["LC_ALL"] = "C.UTF-8"
my_env["LANG"] = "C.UTF-8"
last_result = None
for i in range(2):
logger.info(f"Exécution de pdflatex - Passe {i+1}/2...")
process = subprocess.run(
["pdflatex", "-interaction=nonstopmode", "-output-directory", output_dir, tex_path],
capture_output=True, text=True, check=False, encoding="utf-8", errors="replace", env=my_env, timeout=60
)
last_result = process
if not os.path.exists(pdf_path) and process.returncode != 0:
logger.warning(f"La passe {i+1} de pdflatex a échoué et aucun PDF n'a été créé. Arrêt de la compilation.")
break
if os.path.exists(pdf_path):
logger.info(f"PDF généré avec succès : '{pdf_path}'")
return pdf_path, f"PDF généré: {os.path.basename(pdf_path)}"
else:
error_log = last_result.stdout + "\n" + last_result.stderr if last_result else "Aucun résultat de compilation disponible."
logger.error(f"Échec de la compilation PDF pour '{tex_filename}'. Log de pdflatex:\n{error_log}")
return None, f"Erreur de compilation PDF. Log: ...{error_log[-1000:]}"
except Exception as e:
logger.error(f"Exception pendant la génération du PDF: {e}", exc_info=True)
return None, f"Exception durant la génération du PDF: {str(e)}"
def send_to_telegram(file_data, filename, caption="Nouveau fichier uploadé"):
"""Envoie un fichier au canal Telegram configuré."""
logger.info(f"Préparation de l'envoi du fichier '{filename}' à Telegram.")
try:
if filename.lower().endswith(('.png', '.jpg', '.jpeg', '.gif', '.webp')):
url = f"https://api.telegram.org/bot{TELEGRAM_BOT_TOKEN}/sendPhoto"
files = {'photo': (filename, file_data)}
log_msg = f"Envoi de l'image '{filename}' à Telegram..."
else:
url = f"https://api.telegram.org/bot{TELEGRAM_BOT_TOKEN}/sendDocument"
files = {'document': (filename, file_data)}
log_msg = f"Envoi du document '{filename}' à Telegram..."
logger.info(log_msg)
data = {'chat_id': TELEGRAM_CHAT_ID, 'caption': caption}
response = requests.post(url, files=files, data=data, timeout=30)
response.raise_for_status()
logger.info(f"Fichier '{filename}' envoyé avec succès à Telegram.")
except Exception as e:
logger.error(f"Erreur lors de l'envoi à Telegram: {e}", exc_info=True)
# --- Logique Principale (Worker en arrière-plan) ---
def process_files_background(task_id, files_data, resolution_style):
"""Fonction exécutée en thread pour traiter les fichiers, appeler Gemini et générer le PDF."""
logger.info(f"[Task {task_id}] Démarrage du traitement en arrière-plan.")
task_results[task_id]['status'] = 'processing'
uploaded_file_refs = []
try:
if not client:
raise ConnectionError("Le client Gemini n'est pas initialisé.")
contents = []
logger.info(f"[Task {task_id}] Préparation des fichiers pour l'API Gemini.")
# Traiter tous les fichiers en utilisant l'API Files
for file_info in files_data:
logger.info(f"[Task {task_id}] Traitement du fichier '{file_info['filename']}' ({file_info['type']}).")
# Sauvegarder l'image utilisateur si c'est une image
if file_info['type'].startswith('image/'):
saved_filename = save_user_image(file_info['data'], file_info['filename'], task_id)
# Upload vers l'API Files de Google GenAI
file_ref = upload_file_to_genai_api(
file_info['data'],
file_info['filename'],
file_info['type']
)
if file_ref:
uploaded_file_refs.append(file_ref)
contents.append(file_ref)
logger.info(f"[Task {task_id}] Fichier '{file_info['filename']}' uploadé avec succès. Référence: {file_ref.name}")
else:
logger.warning(f"[Task {task_id}] Échec de l'upload du fichier '{file_info['filename']}'. Fichier ignoré.")
if not contents:
raise ValueError("Aucun fichier n'a pu être uploadé vers l'API Files de Google GenAI.")
prompt_to_use = get_prompt_for_style(resolution_style)
if not prompt_to_use:
raise ValueError(f"Le fichier de prompt pour le style '{resolution_style}' est introuvable ou vide.")
contents.append(prompt_to_use)
# Appeler Gemini avec fallback Pro -> Flash
logger.info(f"[Task {task_id}] Début des appels aux modèles Gemini avec {len(uploaded_file_refs)} fichier(s).")
full_latex_response = call_gemini_with_fallback(contents, task_id)
logger.info(f"[Task {task_id}] Réponse reçue de Gemini ({task_results[task_id].get('used_model', 'modèle inconnu')}).")
logger.debug(f"[Task {task_id}] Réponse brute de Gemini:\n---\n{full_latex_response[:500]}...\n---")
task_results[task_id]['status'] = 'cleaning_latex'
cleaned_latex = clean_latex_code(full_latex_response)
logger.debug(f"[Task {task_id}] Code LaTeX nettoyé:\n---\n{cleaned_latex[:500]}...\n---")
task_results[task_id]['status'] = 'generating_pdf'
pdf_filename_base = f"solution_{task_id}"
pdf_file_path, pdf_message = latex_to_pdf(cleaned_latex, pdf_filename_base, GENERATED_PDF_DIR)
if pdf_file_path:
task_results[task_id]['status'] = 'completed'
task_results[task_id]['pdf_filename'] = os.path.basename(pdf_file_path)
used_model = task_results[task_id].get('used_model', 'modèle inconnu')
task_results[task_id]['response'] = f"PDF généré avec succès: {os.path.basename(pdf_file_path)} (généré par {used_model})"
logger.info(f"[Task {task_id}] Tâche terminée avec succès. PDF: {os.path.basename(pdf_file_path)} (modèle: {used_model})")
else:
raise RuntimeError(f"Échec de la génération du PDF: {pdf_message}")
except Exception as e:
logger.error(f"[Task {task_id}] Une erreur est survenue dans le thread de traitement.", exc_info=True)
task_results[task_id]['status'] = 'error'
task_results[task_id]['error'] = str(e)
task_results[task_id]['response'] = f"Une erreur est survenue: {str(e)}"
finally:
# Nettoyer tous les fichiers uploadés vers l'API Files
if uploaded_file_refs:
logger.info(f"[Task {task_id}] Nettoyage des {len(uploaded_file_refs)} fichiers temporaires de l'API Files.")
for file_ref in uploaded_file_refs:
try:
client.files.delete(file_ref)
logger.info(f"[Task {task_id}] Fichier temporaire '{file_ref.name}' supprimé de l'API Files.")
except Exception as del_e:
logger.warning(f"[Task {task_id}] Échec de la suppression du fichier temporaire '{file_ref.name}': {del_e}")
# --- Routes Flask (API Endpoints) ---
@app.route('/')
def index():
logger.info(f"Requête servie pour l'endpoint '/' depuis {request.remote_addr}")
return render_template('index.html')
@app.route('/admin')
def admin_panel():
"""Centre de gestion complet."""
logger.info(f"Accès au centre de gestion depuis {request.remote_addr}")
tasks_info = get_all_tasks_info()
system_stats = get_system_stats()
return render_template('admin.html', tasks=tasks_info, stats=system_stats)
@app.route('/admin/api/tasks')
def admin_api_tasks():
"""API JSON pour récupérer les informations des tâches."""
tasks_info = get_all_tasks_info()
return jsonify(tasks_info)
@app.route('/admin/api/stats')
def admin_api_stats():
"""API JSON pour récupérer les statistiques système."""
stats = get_system_stats()
return jsonify(stats)
@app.route('/admin/delete_task/<task_id>', methods=['POST'])
def admin_delete_task(task_id):
"""Supprime une tâche et ses fichiers associés."""
logger.info(f"Demande de suppression de la tâche {task_id}")
if task_id not in task_results:
return jsonify({'error': 'Tâche introuvable'}), 404
try:
task_data = task_results[task_id]
# Supprimer le PDF s'il existe
if 'pdf_filename' in task_data:
pdf_path = os.path.join(GENERATED_PDF_DIR, task_data['pdf_filename'])
if os.path.exists(pdf_path):
os.remove(pdf_path)
logger.info(f"PDF supprimé: {pdf_path}")
# Supprimer les images utilisateur associées
if os.path.exists(USER_IMAGES_DIR):
for img_file in os.listdir(USER_IMAGES_DIR):
if img_file.startswith(f"{task_id}_"):
img_path = os.path.join(USER_IMAGES_DIR, img_file)
os.remove(img_path)
logger.info(f"Image utilisateur supprimée: {img_path}")
# Supprimer la tâche de la mémoire
del task_results[task_id]
logger.info(f"Tâche {task_id} supprimée avec succès")
return jsonify({'success': True})
except Exception as e:
logger.error(f"Erreur lors de la suppression de la tâche {task_id}: {e}")
return jsonify({'error': str(e)}), 500
@app.route('/user_images/<filename>')
def serve_user_image(filename):
"""Sert les images utilisateur."""
try:
return send_from_directory(USER_IMAGES_DIR, filename)
except FileNotFoundError:
return "Image non trouvée", 404
@app.route('/solve', methods=['POST'])
def solve():
logger.info(f"Nouvelle requête sur /solve depuis {request.remote_addr}")
try:
if 'user_files' not in request.files:
logger.warning(f"/solve: Requête de {request.remote_addr} sans 'user_files'.")
return jsonify({'error': 'Aucun champ de fichier dans la requête'}), 400
uploaded_files = request.files.getlist('user_files')
if not uploaded_files or all(f.filename == '' for f in uploaded_files):
logger.warning(f"/solve: Requête de {request.remote_addr} avec champ 'user_files' mais sans fichiers.")
return jsonify({'error': 'Aucun fichier sélectionné'}), 400
resolution_style = request.form.get('style', 'colorful')
files_data = []
file_count = {'images': 0, 'pdfs': 0}
for file in uploaded_files:
if not file.filename: continue
file_data = file.read()
file_type = file.content_type or 'application/octet-stream'
if file_type.startswith('image/'):
file_count['images'] += 1
files_data.append({'filename': file.filename, 'data': file_data, 'type': file_type})
send_to_telegram(file_data, file.filename, f"Image reçue: {file.filename} (Style: {resolution_style})")
elif file_type == 'application/pdf':
if file_count['pdfs'] >= 1:
logger.warning(f"/solve: Requête de {request.remote_addr} avec plusieurs PDFs. Rejetée.")
return jsonify({'error': 'Un seul fichier PDF est autorisé par requête'}), 400
file_count['pdfs'] += 1
files_data.append({'filename': file.filename, 'data': file_data, 'type': file_type})
send_to_telegram(file_data, file.filename, f"PDF reçu: {file.filename} (Style: {resolution_style})")
else:
logger.warning(f"/solve: Fichier non supporté '{file.filename}' de type '{file_type}' uploadé par {request.remote_addr}.")
if not files_data:
logger.warning(f"/solve: Aucun fichier valide (image/pdf) trouvé dans la requête de {request.remote_addr}.")
return jsonify({'error': 'Aucun fichier valide (image ou PDF) n\'a été fourni'}), 400
task_id = str(uuid.uuid4())
task_results[task_id] = {
'status': 'pending', 'response': '', 'error': None, 'time_started': time.time(),
'style': resolution_style, 'file_count': file_count, 'first_filename': files_data[0]['filename']
}
logger.info(f"Création de la tâche {task_id} pour {file_count['images']} image(s) et {file_count['pdfs']} PDF(s). Style: {resolution_style}.")
threading.Thread(target=process_files_background, args=(task_id, files_data, resolution_style)).start()
return jsonify({'task_id': task_id, 'status': 'pending', 'first_filename': files_data[0]['filename']})
except Exception as e:
logger.error(f"Erreur inattendue dans l'endpoint /solve: {e}", exc_info=True)
return jsonify({'error': f'Erreur interne du serveur: {e}'}), 500
@app.route('/task/<task_id>', methods=['GET'])
def get_task_status(task_id):
logger.debug(f"Requête de statut pour la tâche {task_id}")
task = task_results.get(task_id)
if not task:
logger.warning(f"Tentative d'accès à une tâche inexistante: {task_id}")
return jsonify({'error': 'Tâche introuvable'}), 404
response_data = {
'status': task['status'],
'response': task.get('response'),
'error': task.get('error'),
'current_model': task.get('current_model', ''),
'used_model': task.get('used_model', ''),
'model_failures': task.get('model_failures', [])
}
if task['status'] == 'completed':
response_data['download_url'] = f"/download/{task_id}"
return jsonify(response_data)
@app.route('/stream/<task_id>', methods=['GET'])
def stream_task_progress(task_id):
"""Endpoint pour Server-Sent Events (SSE) pour streamer la progression."""
def generate():
logger.info(f"Nouvelle connexion de streaming (SSE) pour la tâche {task_id}")
last_status_sent = None
last_model_sent = None
while True:
task = task_results.get(task_id)
if not task:
logger.warning(f"La tâche {task_id} a disparu pendant le streaming.")
yield f'data: {json.dumps({"error": "La tâche a été perdue", "status": "error"})}\n\n'
break
current_status = task['status']
current_model = task.get('current_model', '')
# Envoyer une mise à jour si le statut ou le modèle a changé
if current_status != last_status_sent or current_model != last_model_sent:
data_to_send = {
"status": current_status,
"current_model": current_model,
"used_model": task.get('used_model', '')
}
if current_status == 'completed':
data_to_send["response"] = task.get("response", "")
data_to_send["download_url"] = f"/download/{task_id}"
elif current_status == 'error':
data_to_send["error"] = task.get("error", "Erreur inconnue")
data_to_send["model_failures"] = task.get("model_failures", [])
logger.info(f"[Task {task_id}] Envoi de la mise à jour de statut via SSE: {current_status} ({current_model})")
yield f'data: {json.dumps(data_to_send)}\n\n'
last_status_sent = current_status
last_model_sent = current_model
if current_status in ['completed', 'error']:
logger.info(f"Fermeture de la connexion SSE pour la tâche terminée/échouée {task_id}")
break
time.sleep(1)
return Response(stream_with_context(generate()), mimetype='text/event-stream', headers={'Cache-Control': 'no-cache', 'X-Accel-Buffering': 'no'})
@app.route('/download/<task_id>')
def download_pdf(task_id):
logger.info(f"Requête de téléchargement pour la tâche {task_id}")
task = task_results.get(task_id)
if not task or task['status'] != 'completed' or 'pdf_filename' not in task:
logger.warning(f"Échec du téléchargement pour la tâche {task_id}: Fichier non trouvé ou tâche non terminée.")
return "Fichier non trouvé ou la tâche n'est pas encore terminée.", 404
try:
logger.info(f"Envoi du fichier '{task['pdf_filename']}' pour la tâche {task_id}")
return send_from_directory(GENERATED_PDF_DIR, task['pdf_filename'], as_attachment=True)
except FileNotFoundError:
logger.error(f"Le fichier PDF '{task['pdf_filename']}' pour la tâche {task_id} est introuvable sur le disque.")
return "Erreur: Fichier introuvable sur le serveur.", 404
if __name__ == '__main__':
logger.info("Démarrage de l'application Flask.")
# Création des répertoires nécessaires
os.makedirs(GENERATED_PDF_DIR, exist_ok=True)
os.makedirs(USER_IMAGES_DIR, exist_ok=True)
logger.info(f"Répertoires assurés d'exister: '{GENERATED_PDF_DIR}' et '{USER_IMAGES_DIR}'")
app.run(debug=True, host='0.0.0.0', port=5000)