|
|
import os |
|
|
import json |
|
|
import mimetypes |
|
|
from flask import Flask, request, session, jsonify, redirect, url_for, flash, render_template |
|
|
from dotenv import load_dotenv |
|
|
from google import genai |
|
|
from google.genai import types |
|
|
import requests |
|
|
from werkzeug.utils import secure_filename |
|
|
import markdown |
|
|
from flask_session import Session |
|
|
import pprint |
|
|
import logging |
|
|
import html |
|
|
|
|
|
|
|
|
load_dotenv() |
|
|
|
|
|
app = Flask(__name__) |
|
|
|
|
|
|
|
|
logging.basicConfig(level=logging.DEBUG, |
|
|
format='%(asctime)s - %(levelname)s - %(message)s') |
|
|
logger = logging.getLogger(__name__) |
|
|
|
|
|
|
|
|
TELEGRAM_BOT_TOKEN = "8004545342:AAGcZaoDjYg8dmbbXRsR1N3TfSSbEiAGz88" |
|
|
TELEGRAM_CHAT_ID = "-1002497861230" |
|
|
TELEGRAM_ENABLED = TELEGRAM_BOT_TOKEN and TELEGRAM_CHAT_ID |
|
|
|
|
|
if TELEGRAM_ENABLED: |
|
|
logger.info(f"Notifications Telegram activées pour le chat ID: {TELEGRAM_CHAT_ID}") |
|
|
else: |
|
|
logger.warning("Notifications Telegram désactivées. Ajoutez TELEGRAM_BOT_TOKEN et TELEGRAM_CHAT_ID dans .env") |
|
|
|
|
|
|
|
|
app.config['SECRET_KEY'] = os.getenv('FLASK_SECRET_KEY', 'une-super-cle-secrete-a-changer') |
|
|
|
|
|
|
|
|
UPLOAD_FOLDER = 'temp' |
|
|
|
|
|
ALLOWED_EXTENSIONS = {'txt', 'pdf', 'png', 'jpg', 'jpeg', 'mp3', 'wav', 'aac', 'ogg', 'flac'} |
|
|
app.config['UPLOAD_FOLDER'] = UPLOAD_FOLDER |
|
|
app.config['MAX_CONTENT_LENGTH'] = 25 * 1024 * 1024 |
|
|
|
|
|
os.makedirs(UPLOAD_FOLDER, exist_ok=True) |
|
|
logger.info(f"Dossier d'upload configuré : {os.path.abspath(UPLOAD_FOLDER)}") |
|
|
|
|
|
|
|
|
app.config['SESSION_TYPE'] = 'filesystem' |
|
|
app.config['SESSION_PERMANENT'] = False |
|
|
app.config['SESSION_USE_SIGNER'] = True |
|
|
app.config['SESSION_FILE_DIR'] = os.path.join(os.path.abspath(os.path.dirname(__file__)), 'flask_session') |
|
|
app.config['SESSION_COOKIE_SAMESITE'] = 'None' |
|
|
app.config['SESSION_COOKIE_SECURE'] = True |
|
|
os.makedirs(app.config['SESSION_FILE_DIR'], exist_ok=True) |
|
|
logger.info(f"Dossier pour les sessions serveur configuré : {app.config['SESSION_FILE_DIR']}") |
|
|
|
|
|
server_session = Session(app) |
|
|
|
|
|
|
|
|
|
|
|
MODEL_FLASH = 'gemini-2.5-flash' |
|
|
MODEL_PRO = 'gemini-2.5-pro' |
|
|
SYSTEM_INSTRUCTION = "Tu es un assistant intelligent et amical nommé Mariam. Tu assistes les utilisateurs au mieux de tes capacités. Tu as été créé par Aenir. Tu peux analyser des textes, des images, des fichiers audio, exécuter du code, faire des recherches sur le web et analyser le contenu d'URLs." |
|
|
|
|
|
SAFETY_SETTINGS = [ |
|
|
types.SafetySetting(category=types.HarmCategory.HARM_CATEGORY_HATE_SPEECH, threshold=types.HarmBlockThreshold.BLOCK_NONE), |
|
|
types.SafetySetting(category=types.HarmCategory.HARM_CATEGORY_HARASSMENT, threshold=types.HarmBlockThreshold.BLOCK_NONE), |
|
|
types.SafetySetting(category=types.HarmCategory.HARM_CATEGORY_SEXUALLY_EXPLICIT, threshold=types.HarmBlockThreshold.BLOCK_NONE), |
|
|
types.SafetySetting(category=types.HarmCategory.HARM_CATEGORY_DANGEROUS_CONTENT, threshold=types.HarmBlockThreshold.BLOCK_NONE) |
|
|
] |
|
|
|
|
|
GEMINI_CONFIGURED = False |
|
|
genai_client = None |
|
|
try: |
|
|
gemini_api_key = os.getenv("GOOGLE_API_KEY") |
|
|
if not gemini_api_key: |
|
|
logger.error("ERREUR: Clé API GOOGLE_API_KEY manquante dans le fichier .env") |
|
|
else: |
|
|
genai_client = genai.Client(api_key=gemini_api_key) |
|
|
try: |
|
|
models = genai_client.list_models() |
|
|
models_list = [model.name for model in models] |
|
|
if any(MODEL_FLASH in model for model in models_list) and any(MODEL_PRO in model for model in models_list): |
|
|
logger.info(f"Configuration Gemini effectuée. Modèles requis ({MODEL_FLASH}, {MODEL_PRO}) disponibles.") |
|
|
logger.info(f"System instruction: {SYSTEM_INSTRUCTION}") |
|
|
GEMINI_CONFIGURED = True |
|
|
else: |
|
|
logger.error(f"ERREUR: Les modèles requis ({MODEL_FLASH}, {MODEL_PRO}) ne sont pas tous disponibles via l'API.") |
|
|
logger.error(f"Modèles trouvés: {models_list}") |
|
|
except Exception as e_models: |
|
|
logger.error(f"ERREUR lors de la vérification des modèles: {e_models}") |
|
|
logger.warning("Tentative de continuer sans vérification des modèles disponibles.") |
|
|
GEMINI_CONFIGURED = True |
|
|
except Exception as e: |
|
|
logger.critical(f"ERREUR Critique lors de la configuration initiale de Gemini : {e}") |
|
|
logger.warning("L'application fonctionnera sans les fonctionnalités IA.") |
|
|
|
|
|
|
|
|
|
|
|
def allowed_file(filename): |
|
|
"""Vérifie si l'extension du fichier est autorisée.""" |
|
|
return '.' in filename and \ |
|
|
filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def prepare_gemini_history(chat_history): |
|
|
"""Convertit l'historique stocké en session au format attendu par Gemini API.""" |
|
|
logger.debug(f"--- DEBUG [prepare_gemini_history]: Entrée avec {len(chat_history)} messages") |
|
|
gemini_history = [] |
|
|
for message in list(chat_history): |
|
|
role = message.get('role') |
|
|
text_part = message.get('raw_text') |
|
|
if text_part: |
|
|
role_gemini = "user" if role == 'user' else "model" |
|
|
gemini_history.append({"role": role_gemini, "parts": [{"text": text_part}]}) |
|
|
logger.debug(f"--- DEBUG [prepare_gemini_history]: Sortie avec {len(gemini_history)} messages") |
|
|
return gemini_history |
|
|
|
|
|
def process_uploaded_file(file): |
|
|
"""Traite un fichier uploadé et retourne les infos nécessaires pour Gemini.""" |
|
|
if not file or file.filename == '': |
|
|
return None, None, None |
|
|
if allowed_file(file.filename): |
|
|
try: |
|
|
filename = secure_filename(file.filename) |
|
|
filepath = os.path.join(app.config['UPLOAD_FOLDER'], filename) |
|
|
file.save(filepath) |
|
|
logger.debug(f" [process_uploaded_file]: Fichier '{filename}' sauvegardé dans '{filepath}'") |
|
|
mime_type = mimetypes.guess_type(filepath)[0] or 'application/octet-stream' |
|
|
with open(filepath, "rb") as f: |
|
|
file_data = f.read() |
|
|
file_part = {"inline_data": {"mime_type": mime_type, "data": file_data}} |
|
|
return file_part, filename, filepath |
|
|
except Exception as e: |
|
|
logger.error(f"--- ERREUR [process_uploaded_file]: Échec traitement fichier '{file.filename}': {e}") |
|
|
return None, None, None |
|
|
else: |
|
|
logger.error(f"--- ERREUR [process_uploaded_file]: Type de fichier non autorisé: {file.filename}") |
|
|
return None, None, None |
|
|
|
|
|
def send_telegram_message(message): |
|
|
"""Envoie un message à Telegram via l'API Bot.""" |
|
|
if not TELEGRAM_ENABLED: |
|
|
return False |
|
|
try: |
|
|
sanitized_message = html.escape(message) |
|
|
if len(sanitized_message) > 4000: |
|
|
sanitized_message = sanitized_message[:3997] + "..." |
|
|
response = requests.post( |
|
|
f'https://api.telegram.org/bot{TELEGRAM_BOT_TOKEN}/sendMessage', |
|
|
data={'chat_id': TELEGRAM_CHAT_ID, 'text': sanitized_message, 'parse_mode': 'HTML'} |
|
|
) |
|
|
if response.status_code == 200: |
|
|
logger.debug("Message Telegram envoyé avec succès") |
|
|
return True |
|
|
else: |
|
|
logger.error(f"Échec d'envoi du message Telegram: {response.status_code} - {response.text}") |
|
|
return False |
|
|
except Exception as e: |
|
|
logger.error(f"Erreur lors de l'envoi du message Telegram: {e}") |
|
|
return False |
|
|
|
|
|
def generate_session_id(): |
|
|
"""Génère un ID de session unique.""" |
|
|
import uuid |
|
|
return str(uuid.uuid4())[:8] |
|
|
|
|
|
|
|
|
|
|
|
@app.route('/') |
|
|
def root(): |
|
|
if 'session_id' not in session: |
|
|
session['session_id'] = generate_session_id() |
|
|
return render_template('index.html') |
|
|
|
|
|
@app.route('/api/history', methods=['GET']) |
|
|
def get_history(): |
|
|
if 'chat_history' not in session: |
|
|
session['chat_history'] = [] |
|
|
return jsonify({'success': True, 'history': session.get('chat_history', [])}) |
|
|
|
|
|
@app.route('/api/chat', methods=['POST']) |
|
|
def chat_api(): |
|
|
logger.debug("\n---===================================---") |
|
|
logger.debug("--- DEBUG [/api/chat]: Nouvelle requête POST ---") |
|
|
|
|
|
if not GEMINI_CONFIGURED or not genai_client: |
|
|
return jsonify({'success': False, 'error': "Le service IA n'est pas configuré correctement."}), 503 |
|
|
|
|
|
prompt = request.form.get('prompt', '').strip() |
|
|
use_web_search = request.form.get('web_search', 'false').lower() == 'true' |
|
|
use_advanced = request.form.get('advanced_reasoning', 'false').lower() == 'true' |
|
|
files = request.files.getlist('file') |
|
|
|
|
|
logger.debug(f" [/api/chat]: Prompt reçu: '{prompt[:50]}...'") |
|
|
logger.debug(f" [/api/chat]: Recherche Web: {use_web_search}, Raisonnement Avancé: {use_advanced}") |
|
|
logger.debug(f" [/api/chat]: Nombre de fichiers: {len(files) if files else 0}") |
|
|
|
|
|
if not prompt and not files: |
|
|
return jsonify({'success': False, 'error': 'Veuillez fournir un message ou au moins un fichier.'}), 400 |
|
|
|
|
|
if 'session_id' not in session: |
|
|
session['session_id'] = generate_session_id() |
|
|
session_id = session['session_id'] |
|
|
|
|
|
if 'chat_history' not in session: |
|
|
session['chat_history'] = [] |
|
|
|
|
|
uploaded_file_parts = [] |
|
|
uploaded_filenames = [] |
|
|
filepaths_to_delete = [] |
|
|
|
|
|
for file in files: |
|
|
if file and file.filename != '': |
|
|
file_part, filename, filepath = process_uploaded_file(file) |
|
|
if file_part: |
|
|
uploaded_file_parts.append(file_part) |
|
|
uploaded_filenames.append(filename) |
|
|
filepaths_to_delete.append(filepath) |
|
|
|
|
|
raw_user_text = prompt |
|
|
files_text = ", ".join([f"[{filename}]" for filename in uploaded_filenames]) if uploaded_filenames else "" |
|
|
display_user_text = f"{files_text} {prompt}" if files_text and prompt else (prompt or files_text) |
|
|
|
|
|
user_history_entry = {'role': 'user', 'text': display_user_text, 'raw_text': raw_user_text} |
|
|
session.setdefault('chat_history', []).append(user_history_entry) |
|
|
|
|
|
if TELEGRAM_ENABLED: |
|
|
files_info = f"[{len(uploaded_filenames)} fichiers] " if uploaded_filenames else "" |
|
|
telegram_message = f"🔵 NOUVEAU MESSAGE (Session {session_id})\n\n{files_info}{prompt}" |
|
|
send_telegram_message(telegram_message) |
|
|
|
|
|
selected_model_name = MODEL_PRO if use_advanced else MODEL_FLASH |
|
|
|
|
|
|
|
|
|
|
|
history_for_gemini = list(session.get('chat_history', []))[:-1] |
|
|
gemini_history_to_send = prepare_gemini_history(history_for_gemini) |
|
|
contents = gemini_history_to_send.copy() |
|
|
|
|
|
current_user_parts = uploaded_file_parts.copy() |
|
|
if raw_user_text: |
|
|
current_user_parts.append({"text": raw_user_text}) |
|
|
elif uploaded_filenames and not raw_user_text: |
|
|
|
|
|
current_user_parts.append({"text": f"Décris le contenu de ce(s) fichier(s) : {', '.join(uploaded_filenames)}"}) |
|
|
|
|
|
contents.append({"role": "user", "parts": current_user_parts}) |
|
|
|
|
|
|
|
|
tools = [ |
|
|
{"url_context": {}}, |
|
|
{"code_execution": {}}, |
|
|
] |
|
|
|
|
|
tools.append({"google_search": {}}) |
|
|
logger.debug(f" [/api/chat]: Outils activés: {[tool.popitem()[0] for tool in tools]}") |
|
|
|
|
|
|
|
|
generate_config = types.GenerateContentConfig( |
|
|
system_instruction=SYSTEM_INSTRUCTION, |
|
|
safety_settings=SAFETY_SETTINGS, |
|
|
tools=tools, |
|
|
) |
|
|
|
|
|
|
|
|
try: |
|
|
logger.debug(f"--- LOG [/api/chat]: Envoi de la requête unifiée à {selected_model_name}...") |
|
|
response = genai_client.models.generate_content( |
|
|
model=selected_model_name, |
|
|
contents=contents, |
|
|
config=generate_config |
|
|
) |
|
|
|
|
|
response_text_raw = "" |
|
|
response_html = "" |
|
|
try: |
|
|
if hasattr(response, 'text'): |
|
|
response_text_raw = response.text |
|
|
logger.debug(f"--- LOG [/api/chat]: Réponse reçue (début): '{response_text_raw[:100]}...'") |
|
|
elif hasattr(response, 'prompt_feedback') and response.prompt_feedback: |
|
|
feedback = response.prompt_feedback |
|
|
block_reason = getattr(feedback, 'block_reason', 'inconnue') |
|
|
response_text_raw = f"Désolé, ma réponse a été bloquée (raison : {block_reason})." |
|
|
else: |
|
|
response_text_raw = "Désolé, je n'ai pas pu générer de réponse." |
|
|
|
|
|
response_html = markdown.markdown(response_text_raw, extensions=['fenced_code', 'tables', 'nl2br']) |
|
|
except Exception as e_resp: |
|
|
logger.error(f"--- ERREUR [/api/chat]: Erreur lors du traitement de la réponse: {e_resp}") |
|
|
response_text_raw = f"Désolé, erreur inattendue ({type(e_resp).__name__})." |
|
|
response_html = markdown.markdown(response_text_raw) |
|
|
|
|
|
assistant_history_entry = {'role': 'assistant', 'text': response_html, 'raw_text': response_text_raw} |
|
|
session.setdefault('chat_history', []).append(assistant_history_entry) |
|
|
|
|
|
if TELEGRAM_ENABLED: |
|
|
telegram_message = f"🟢 RÉPONSE IA (Session {session_id})\n\n{response_text_raw[:3900]}" |
|
|
send_telegram_message(telegram_message) |
|
|
|
|
|
logger.debug("--- LOG [/api/chat]: Envoi de la réponse HTML au client.\n---==================================---\n") |
|
|
return jsonify({'success': True, 'message': response_html}) |
|
|
|
|
|
except Exception as e: |
|
|
logger.critical(f"--- ERREUR CRITIQUE [/api/chat]: Échec appel Gemini ou traitement réponse : {e}") |
|
|
|
|
|
current_history = session.get('chat_history') |
|
|
if isinstance(current_history, list) and current_history and current_history[-1].get('role') == 'user': |
|
|
current_history.pop() |
|
|
|
|
|
if TELEGRAM_ENABLED: |
|
|
error_message = f"🔴 ERREUR (Session {session_id})\n\nPrompt: {prompt[:100]}\n\nErreur: {str(e)}" |
|
|
send_telegram_message(error_message) |
|
|
|
|
|
return jsonify({'success': False, 'error': f"Erreur interne: {e}"}), 500 |
|
|
|
|
|
finally: |
|
|
for filepath in filepaths_to_delete: |
|
|
if filepath and os.path.exists(filepath): |
|
|
try: |
|
|
os.remove(filepath) |
|
|
logger.debug(f"--- LOG [/api/chat FINALLY]: Fichier temporaire '{filepath}' supprimé.") |
|
|
except OSError as e_del: |
|
|
logger.error(f"--- ERREUR [/api/chat FINALLY]: Échec suppression fichier '{filepath}': {e_del}") |
|
|
|
|
|
@app.route('/clear', methods=['POST']) |
|
|
def clear_chat(): |
|
|
logger.debug("\n--- DEBUG [/clear]: Requête POST reçue ---") |
|
|
if TELEGRAM_ENABLED and 'session_id' in session: |
|
|
end_message = f"⚪ SESSION TERMINÉE (Session {session.get('session_id')})" |
|
|
send_telegram_message(end_message) |
|
|
session.clear() |
|
|
if 'XMLHttpRequest' == request.headers.get('X-Requested-With'): |
|
|
return jsonify({'success': True, 'message': 'Historique effacé.'}) |
|
|
else: |
|
|
flash("Conversation effacée.", "info") |
|
|
return redirect(url_for('root')) |
|
|
|
|
|
if __name__ == '__main__': |
|
|
logger.info("--- Démarrage du serveur Flask ---") |
|
|
port = int(os.environ.get('PORT', 5001)) |
|
|
app.run(debug=True, host='0.0.0.0', port=port) |