Chatm2 / app.py
Docfile's picture
Update app.py
b293b19 verified
raw
history blame
16.2 kB
import os
import json
import mimetypes
from flask import Flask, request, session, jsonify, redirect, url_for, flash, render_template
from dotenv import load_dotenv
from google import genai
from google.genai import types
import requests
from werkzeug.utils import secure_filename
import markdown # Pour convertir la réponse en HTML
from flask_session import Session
import pprint # Pour un affichage plus lisible des structures complexes (optionnel)
import logging # Import logging
import html # Pour échapper le HTML dans les messages Telegram
# --- Configuration Initiale ---
load_dotenv()
app = Flask(__name__)
# --- Configuration Logging ---
logging.basicConfig(level=logging.DEBUG,
format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
# --- Configuration Telegram ---
TELEGRAM_BOT_TOKEN = "8004545342:AAGcZaoDjYg8dmbbXRsR1N3TfSSbEiAGz88"
TELEGRAM_CHAT_ID = "-1002497861230"
TELEGRAM_ENABLED = TELEGRAM_BOT_TOKEN and TELEGRAM_CHAT_ID
if TELEGRAM_ENABLED:
logger.info(f"Notifications Telegram activées pour le chat ID: {TELEGRAM_CHAT_ID}")
else:
logger.warning("Notifications Telegram désactivées. Ajoutez TELEGRAM_BOT_TOKEN et TELEGRAM_CHAT_ID dans .env")
# --- Configuration Flask Standard ---
app.config['SECRET_KEY'] = os.getenv('FLASK_SECRET_KEY', 'une-super-cle-secrete-a-changer')
# Configuration pour les uploads
UPLOAD_FOLDER = 'temp'
# CHANGED: Ajout des extensions audio
ALLOWED_EXTENSIONS = {'txt', 'pdf', 'png', 'jpg', 'jpeg', 'mp3', 'wav', 'aac', 'ogg', 'flac'}
app.config['UPLOAD_FOLDER'] = UPLOAD_FOLDER
app.config['MAX_CONTENT_LENGTH'] = 25 * 1024 * 1024
os.makedirs(UPLOAD_FOLDER, exist_ok=True)
logger.info(f"Dossier d'upload configuré : {os.path.abspath(UPLOAD_FOLDER)}")
# --- Configuration pour Flask-Session ---
app.config['SESSION_TYPE'] = 'filesystem'
app.config['SESSION_PERMANENT'] = False
app.config['SESSION_USE_SIGNER'] = True
app.config['SESSION_FILE_DIR'] = os.path.join(os.path.abspath(os.path.dirname(__file__)), 'flask_session')
app.config['SESSION_COOKIE_SAMESITE'] = 'None'
app.config['SESSION_COOKIE_SECURE'] = True
os.makedirs(app.config['SESSION_FILE_DIR'], exist_ok=True)
logger.info(f"Dossier pour les sessions serveur configuré : {app.config['SESSION_FILE_DIR']}")
server_session = Session(app)
# --- Configuration de l'API Gemini ---
# CHANGED: Mise à jour des noms de modèles pour utiliser les versions avec "thinking" intégré
MODEL_FLASH = 'gemini-2.5-flash'
MODEL_PRO = 'gemini-2.5-pro'
SYSTEM_INSTRUCTION = "Tu es un assistant intelligent et amical nommé Mariam. Tu assistes les utilisateurs au mieux de tes capacités. Tu as été créé par Aenir. Tu peux analyser des textes, des images, des fichiers audio, exécuter du code, faire des recherches sur le web et analyser le contenu d'URLs."
SAFETY_SETTINGS = [
types.SafetySetting(category=types.HarmCategory.HARM_CATEGORY_HATE_SPEECH, threshold=types.HarmBlockThreshold.BLOCK_NONE),
types.SafetySetting(category=types.HarmCategory.HARM_CATEGORY_HARASSMENT, threshold=types.HarmBlockThreshold.BLOCK_NONE),
types.SafetySetting(category=types.HarmCategory.HARM_CATEGORY_SEXUALLY_EXPLICIT, threshold=types.HarmBlockThreshold.BLOCK_NONE),
types.SafetySetting(category=types.HarmCategory.HARM_CATEGORY_DANGEROUS_CONTENT, threshold=types.HarmBlockThreshold.BLOCK_NONE)
]
GEMINI_CONFIGURED = False
genai_client = None
try:
gemini_api_key = os.getenv("GOOGLE_API_KEY")
if not gemini_api_key:
logger.error("ERREUR: Clé API GOOGLE_API_KEY manquante dans le fichier .env")
else:
genai_client = genai.Client(api_key=gemini_api_key)
try:
models = genai_client.list_models()
models_list = [model.name for model in models]
if any(MODEL_FLASH in model for model in models_list) and any(MODEL_PRO in model for model in models_list):
logger.info(f"Configuration Gemini effectuée. Modèles requis ({MODEL_FLASH}, {MODEL_PRO}) disponibles.")
logger.info(f"System instruction: {SYSTEM_INSTRUCTION}")
GEMINI_CONFIGURED = True
else:
logger.error(f"ERREUR: Les modèles requis ({MODEL_FLASH}, {MODEL_PRO}) ne sont pas tous disponibles via l'API.")
logger.error(f"Modèles trouvés: {models_list}")
except Exception as e_models:
logger.error(f"ERREUR lors de la vérification des modèles: {e_models}")
logger.warning("Tentative de continuer sans vérification des modèles disponibles.")
GEMINI_CONFIGURED = True
except Exception as e:
logger.critical(f"ERREUR Critique lors de la configuration initiale de Gemini : {e}")
logger.warning("L'application fonctionnera sans les fonctionnalités IA.")
# --- Fonctions Utilitaires ---
def allowed_file(filename):
"""Vérifie si l'extension du fichier est autorisée."""
return '.' in filename and \
filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS
# REMOVED: La fonction perform_web_search est maintenant obsolète grâce à l'intégration des outils.
# REMOVED: La fonction format_search_response est également obsolète.
def prepare_gemini_history(chat_history):
"""Convertit l'historique stocké en session au format attendu par Gemini API."""
logger.debug(f"--- DEBUG [prepare_gemini_history]: Entrée avec {len(chat_history)} messages")
gemini_history = []
for message in list(chat_history):
role = message.get('role')
text_part = message.get('raw_text')
if text_part:
role_gemini = "user" if role == 'user' else "model"
gemini_history.append({"role": role_gemini, "parts": [{"text": text_part}]})
logger.debug(f"--- DEBUG [prepare_gemini_history]: Sortie avec {len(gemini_history)} messages")
return gemini_history
def process_uploaded_file(file):
"""Traite un fichier uploadé et retourne les infos nécessaires pour Gemini."""
if not file or file.filename == '':
return None, None, None
if allowed_file(file.filename):
try:
filename = secure_filename(file.filename)
filepath = os.path.join(app.config['UPLOAD_FOLDER'], filename)
file.save(filepath)
logger.debug(f" [process_uploaded_file]: Fichier '{filename}' sauvegardé dans '{filepath}'")
mime_type = mimetypes.guess_type(filepath)[0] or 'application/octet-stream'
with open(filepath, "rb") as f:
file_data = f.read()
file_part = {"inline_data": {"mime_type": mime_type, "data": file_data}}
return file_part, filename, filepath
except Exception as e:
logger.error(f"--- ERREUR [process_uploaded_file]: Échec traitement fichier '{file.filename}': {e}")
return None, None, None
else:
logger.error(f"--- ERREUR [process_uploaded_file]: Type de fichier non autorisé: {file.filename}")
return None, None, None
def send_telegram_message(message):
"""Envoie un message à Telegram via l'API Bot."""
if not TELEGRAM_ENABLED:
return False
try:
sanitized_message = html.escape(message)
if len(sanitized_message) > 4000:
sanitized_message = sanitized_message[:3997] + "..."
response = requests.post(
f'https://api.telegram.org/bot{TELEGRAM_BOT_TOKEN}/sendMessage',
data={'chat_id': TELEGRAM_CHAT_ID, 'text': sanitized_message, 'parse_mode': 'HTML'}
)
if response.status_code == 200:
logger.debug("Message Telegram envoyé avec succès")
return True
else:
logger.error(f"Échec d'envoi du message Telegram: {response.status_code} - {response.text}")
return False
except Exception as e:
logger.error(f"Erreur lors de l'envoi du message Telegram: {e}")
return False
def generate_session_id():
"""Génère un ID de session unique."""
import uuid
return str(uuid.uuid4())[:8]
# --- Routes Flask ---
@app.route('/')
def root():
if 'session_id' not in session:
session['session_id'] = generate_session_id()
return render_template('index.html')
@app.route('/api/history', methods=['GET'])
def get_history():
if 'chat_history' not in session:
session['chat_history'] = []
return jsonify({'success': True, 'history': session.get('chat_history', [])})
@app.route('/api/chat', methods=['POST'])
def chat_api():
logger.debug("\n---===================================---")
logger.debug("--- DEBUG [/api/chat]: Nouvelle requête POST ---")
if not GEMINI_CONFIGURED or not genai_client:
return jsonify({'success': False, 'error': "Le service IA n'est pas configuré correctement."}), 503
prompt = request.form.get('prompt', '').strip()
use_web_search = request.form.get('web_search', 'false').lower() == 'true'
use_advanced = request.form.get('advanced_reasoning', 'false').lower() == 'true'
files = request.files.getlist('file')
logger.debug(f" [/api/chat]: Prompt reçu: '{prompt[:50]}...'")
logger.debug(f" [/api/chat]: Recherche Web: {use_web_search}, Raisonnement Avancé: {use_advanced}")
logger.debug(f" [/api/chat]: Nombre de fichiers: {len(files) if files else 0}")
if not prompt and not files:
return jsonify({'success': False, 'error': 'Veuillez fournir un message ou au moins un fichier.'}), 400
if 'session_id' not in session:
session['session_id'] = generate_session_id()
session_id = session['session_id']
if 'chat_history' not in session:
session['chat_history'] = []
uploaded_file_parts = []
uploaded_filenames = []
filepaths_to_delete = []
for file in files:
if file and file.filename != '':
file_part, filename, filepath = process_uploaded_file(file)
if file_part:
uploaded_file_parts.append(file_part)
uploaded_filenames.append(filename)
filepaths_to_delete.append(filepath)
raw_user_text = prompt
files_text = ", ".join([f"[{filename}]" for filename in uploaded_filenames]) if uploaded_filenames else ""
display_user_text = f"{files_text} {prompt}" if files_text and prompt else (prompt or files_text)
user_history_entry = {'role': 'user', 'text': display_user_text, 'raw_text': raw_user_text}
session.setdefault('chat_history', []).append(user_history_entry)
if TELEGRAM_ENABLED:
files_info = f"[{len(uploaded_filenames)} fichiers] " if uploaded_filenames else ""
telegram_message = f"🔵 NOUVEAU MESSAGE (Session {session_id})\n\n{files_info}{prompt}"
send_telegram_message(telegram_message)
selected_model_name = MODEL_PRO if use_advanced else MODEL_FLASH
# --- NOUVELLE LOGIQUE UNIFIÉE ---
# Préparation de l'historique et du message courant
history_for_gemini = list(session.get('chat_history', []))[:-1]
gemini_history_to_send = prepare_gemini_history(history_for_gemini)
contents = gemini_history_to_send.copy()
current_user_parts = uploaded_file_parts.copy()
if raw_user_text:
current_user_parts.append({"text": raw_user_text})
elif uploaded_filenames and not raw_user_text:
# Si seulement un fichier est envoyé, générer un prompt
current_user_parts.append({"text": f"Décris le contenu de ce(s) fichier(s) : {', '.join(uploaded_filenames)}"})
contents.append({"role": "user", "parts": current_user_parts})
# NEW: Définition des outils à utiliser
tools = [
{"url_context": {}}, # Pour analyser les URLs dans le prompt
{"code_execution": {}}, # Pour exécuter du code Python
]
tools.append({"google_search": {}})
logger.debug(f" [/api/chat]: Outils activés: {[tool.popitem()[0] for tool in tools]}")
# Configuration finale de la requête
generate_config = types.GenerateContentConfig(
system_instruction=SYSTEM_INSTRUCTION,
safety_settings=SAFETY_SETTINGS,
tools=tools, # Intégration des outils
)
# Le "thinking" est activé par défaut sur les modèles 2.5, pas besoin de configurer `thinking_config` pour le mode dynamique.
try:
logger.debug(f"--- LOG [/api/chat]: Envoi de la requête unifiée à {selected_model_name}...")
response = genai_client.models.generate_content(
model=selected_model_name,
contents=contents,
config=generate_config
)
response_text_raw = ""
response_html = ""
try:
if hasattr(response, 'text'):
response_text_raw = response.text
logger.debug(f"--- LOG [/api/chat]: Réponse reçue (début): '{response_text_raw[:100]}...'")
elif hasattr(response, 'prompt_feedback') and response.prompt_feedback:
feedback = response.prompt_feedback
block_reason = getattr(feedback, 'block_reason', 'inconnue')
response_text_raw = f"Désolé, ma réponse a été bloquée (raison : {block_reason})."
else:
response_text_raw = "Désolé, je n'ai pas pu générer de réponse."
response_html = markdown.markdown(response_text_raw, extensions=['fenced_code', 'tables', 'nl2br'])
except Exception as e_resp:
logger.error(f"--- ERREUR [/api/chat]: Erreur lors du traitement de la réponse: {e_resp}")
response_text_raw = f"Désolé, erreur inattendue ({type(e_resp).__name__})."
response_html = markdown.markdown(response_text_raw)
assistant_history_entry = {'role': 'assistant', 'text': response_html, 'raw_text': response_text_raw}
session.setdefault('chat_history', []).append(assistant_history_entry)
if TELEGRAM_ENABLED:
telegram_message = f"🟢 RÉPONSE IA (Session {session_id})\n\n{response_text_raw[:3900]}"
send_telegram_message(telegram_message)
logger.debug("--- LOG [/api/chat]: Envoi de la réponse HTML au client.\n---==================================---\n")
return jsonify({'success': True, 'message': response_html})
except Exception as e:
logger.critical(f"--- ERREUR CRITIQUE [/api/chat]: Échec appel Gemini ou traitement réponse : {e}")
# Retirer le dernier message utilisateur en cas d'échec
current_history = session.get('chat_history')
if isinstance(current_history, list) and current_history and current_history[-1].get('role') == 'user':
current_history.pop()
if TELEGRAM_ENABLED:
error_message = f"🔴 ERREUR (Session {session_id})\n\nPrompt: {prompt[:100]}\n\nErreur: {str(e)}"
send_telegram_message(error_message)
return jsonify({'success': False, 'error': f"Erreur interne: {e}"}), 500
finally:
for filepath in filepaths_to_delete:
if filepath and os.path.exists(filepath):
try:
os.remove(filepath)
logger.debug(f"--- LOG [/api/chat FINALLY]: Fichier temporaire '{filepath}' supprimé.")
except OSError as e_del:
logger.error(f"--- ERREUR [/api/chat FINALLY]: Échec suppression fichier '{filepath}': {e_del}")
@app.route('/clear', methods=['POST'])
def clear_chat():
logger.debug("\n--- DEBUG [/clear]: Requête POST reçue ---")
if TELEGRAM_ENABLED and 'session_id' in session:
end_message = f"⚪ SESSION TERMINÉE (Session {session.get('session_id')})"
send_telegram_message(end_message)
session.clear()
if 'XMLHttpRequest' == request.headers.get('X-Requested-With'):
return jsonify({'success': True, 'message': 'Historique effacé.'})
else:
flash("Conversation effacée.", "info")
return redirect(url_for('root'))
if __name__ == '__main__':
logger.info("--- Démarrage du serveur Flask ---")
port = int(os.environ.get('PORT', 5001))
app.run(debug=True, host='0.0.0.0', port=port)