from flask import Flask, render_template, request, jsonify, Response, session, stream_with_context
import os
import json
import base64
import io
import uuid
import mimetypes
from datetime import datetime
from typing import Optional, List, Dict, Union

from google import genai
from google.genai import types
from werkzeug.utils import secure_filename

app = Flask(__name__)
# Read the session secret from the environment; the hard-coded value is only a development fallback.
app.secret_key = os.environ.get('SECRET_KEY', 'dev-secret-key-change-me')

UPLOAD_FOLDER = 'uploads'
ALLOWED_EXTENSIONS = {'png', 'jpg', 'jpeg', 'pdf', 'mp4', 'mov', 'txt', 'csv', 'json'}
MAX_CONTENT_LENGTH = 16 * 1024 * 1024  # 16 MB upload limit

app.config['UPLOAD_FOLDER'] = UPLOAD_FOLDER
app.config['MAX_CONTENT_LENGTH'] = MAX_CONTENT_LENGTH

os.makedirs(UPLOAD_FOLDER, exist_ok=True)

# Never hard-code an API key in source; read it from the environment instead.
API_KEY = os.environ.get("GEMINI_API_KEY")
client = genai.Client(api_key=API_KEY)

# Server-side store for active chat objects, keyed by the session's chat_id.
# genai chat objects are not JSON-serializable, so they cannot be kept in the
# cookie-based Flask session itself.
chat_sessions: Dict[str, object] = {}
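
# Local run sketch (assumptions: a POSIX shell and this module saved as app.py):
#   export GEMINI_API_KEY="your-key-here"
#   export SECRET_KEY="a-long-random-string"
#   python app.py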

MODELS = {
    "Gemini 2.5 Flash": "gemini-2.5-flash",
    "Gemini 2.5 Pro": "gemini-2.5-pro",
    "Gemini 2.5 Flash Lite": "gemini-2.5-flash-lite",
    "Gemini 2.0 Flash": "gemini-2.0-flash"
}

def allowed_file(filename):
    """Return True if the filename has an extension in ALLOWED_EXTENSIONS."""
    return '.' in filename and filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS

def configure_tools() -> List[types.Tool]:
    """Configure the default set of tools: code execution, Google Search and URL context."""
    tools = []
    tools.append(types.Tool(code_execution=types.ToolCodeExecution()))
    tools.append(types.Tool(google_search=types.GoogleSearch()))
    tools.append(types.Tool(url_context=types.UrlContext()))
    return tools

def get_generation_config(
    model: str,
    thinking_enabled: bool = True,
    include_thoughts: bool = False
) -> types.GenerateContentConfig:
    """Build the default, tuned generation configuration."""
    config_dict = {
        "temperature": 0.7,
        "max_output_tokens": 8192,
        "top_p": 0.9,
        "top_k": 40,
        "tools": configure_tools()
    }

    if thinking_enabled:
        # A thinking_budget of -1 lets the model decide how much to think.
        config_dict["thinking_config"] = types.ThinkingConfig(
            thinking_budget=-1,
            include_thoughts=include_thoughts
        )

    return types.GenerateContentConfig(**config_dict)
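
# Usage sketch (assumption: the same helper also suits one-shot, non-chat calls):
#   config = get_generation_config("gemini-2.5-flash", thinking_enabled=False)
#   response = client.models.generate_content(
#       model="gemini-2.5-flash", contents="Say hello", config=config)
#   print(response.text)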

def process_uploaded_file(file, mime_type: Optional[str] = None) -> Optional[types.Part]:
    """Convert an uploaded file (Werkzeug FileStorage or plain file object) into a Part."""
    if not file:
        return None

    file_bytes = file.read()
    # FileStorage objects expose content_type; plain file objects do not, so fall
    # back to an explicit mime_type argument or a guess based on the file name.
    mime_type = (
        mime_type
        or getattr(file, 'content_type', None)
        or mimetypes.guess_type(getattr(file, 'name', ''))[0]
    )

    # Rewind so the caller can still save or re-read the stream.
    file.seek(0)

    if not mime_type:
        return None

    # Images, videos, PDFs and plain text/CSV/JSON are all passed through as raw bytes.
    if (mime_type.startswith("image/")
            or mime_type.startswith("video/")
            or mime_type in ("application/pdf", "text/plain", "text/csv", "application/json")):
        return types.Part.from_bytes(data=file_bytes, mime_type=mime_type)

    return None
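
# Usage sketch (assumes a local file named photo.png exists):
#   with open('photo.png', 'rb') as f:
#       part = process_uploaded_file(f, mime_type='image/png')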

@app.route('/')
def index():
    """Main page."""
    return render_template('index.html', models=MODELS)

@app.route('/send_message', methods=['POST'])
def send_message():
    """Streaming (server-sent events) endpoint for sending a message."""
    # Parse the request and set up the chat *before* streaming starts: session
    # writes made inside a streaming generator are not persisted to the cookie.
    data = request.get_json()
    message = data.get('message', '')
    model = data.get('model', 'gemini-2.5-flash')
    thinking_enabled = data.get('thinking_enabled', True)
    include_thoughts = data.get('include_thoughts', False)

    generation_config = get_generation_config(
        model=model,
        thinking_enabled=thinking_enabled,
        include_thoughts=include_thoughts
    )

    # The session only stores the chat_id; the chat object itself lives in chat_sessions.
    chat_id = session.get('chat_id')
    if not chat_id:
        chat_id = str(uuid.uuid4())
        session['chat_id'] = chat_id

    chat = chat_sessions.get(chat_id)
    if chat is None:
        chat = client.chats.create(
            model=model,
            config=generation_config
        )
        chat_sessions[chat_id] = chat

    def generate():
        try:
            if not message.strip():
                yield f"data: {json.dumps({'error': 'Empty message'})}\n\n"
                return

            contents = [message]

            response_stream = chat.send_message_stream(
                contents,
                config=generation_config
            )

            full_response = ""
            thoughts_content = ""

            for chunk in response_stream:
                # Some chunks carry no candidates or parts (e.g. usage metadata only).
                if not chunk.candidates or not chunk.candidates[0].content or not chunk.candidates[0].content.parts:
                    continue
                for part in chunk.candidates[0].content.parts:
                    if part.text:
                        if part.thought and include_thoughts:
                            thoughts_content += part.text
                            yield f"data: {json.dumps({'type': 'thought', 'content': part.text})}\n\n"
                        else:
                            full_response += part.text
                            yield f"data: {json.dumps({'type': 'text', 'content': part.text})}\n\n"

            yield f"data: {json.dumps({'type': 'end', 'full_response': full_response})}\n\n"

        except Exception as e:
            yield f"data: {json.dumps({'error': str(e)})}\n\n"

    return Response(stream_with_context(generate()), mimetype='text/event-stream')
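
# Example client call (sketch; assumes the app runs on localhost:5000; the cookie jar keeps the Flask session):
#   curl -N -X POST http://localhost:5000/send_message \
#        -H "Content-Type: application/json" -b cookies.txt -c cookies.txt \
#        -d '{"message": "Hello", "model": "gemini-2.5-flash", "include_thoughts": false}'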

@app.route('/upload_file', methods=['POST'])
def upload_file():
    """Endpoint for uploading files."""
    try:
        if 'file' not in request.files:
            return jsonify({'error': 'No file provided'}), 400

        file = request.files['file']
        if file.filename == '':
            return jsonify({'error': 'No file selected'}), 400

        if file and allowed_file(file.filename):
            filename = secure_filename(file.filename)
            file_id = str(uuid.uuid4())
            file_path = os.path.join(app.config['UPLOAD_FOLDER'], f"{file_id}_{filename}")

            file.save(file_path)

            # Re-open from disk and pass the original content type explicitly,
            # since a plain file object has no content_type attribute.
            with open(file_path, 'rb') as f:
                file_part = process_uploaded_file(f, mime_type=file.content_type)

            if file_part:
                if 'uploaded_files' not in session:
                    session['uploaded_files'] = {}

                session['uploaded_files'][file_id] = {
                    'filename': filename,
                    'path': file_path,
                    'mime_type': file.content_type
                }
                # In-place mutation of a dict stored in the session is not detected automatically.
                session.modified = True

                return jsonify({
                    'success': True,
                    'file_id': file_id,
                    'filename': filename,
                    'mime_type': file.content_type
                })
            else:
                os.remove(file_path)
                return jsonify({'error': 'Unsupported file type'}), 400

        return jsonify({'error': 'File type not allowed'}), 400

    except Exception as e:
        return jsonify({'error': str(e)}), 500
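
# Example client call (sketch; assumes photo.png exists and the app runs on localhost:5000;
# the cookie jar keeps the Flask session so the returned file_id can be reused later):
#   curl -X POST http://localhost:5000/upload_file -F "file=@photo.png" -b cookies.txt -c cookies.txt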

@app.route('/send_message_with_file', methods=['POST'])
def send_message_with_file():
    """Streaming endpoint for sending a message with an attached file."""
    data = request.get_json()
    message = data.get('message', '')
    model = data.get('model', 'gemini-2.5-flash')
    thinking_enabled = data.get('thinking_enabled', True)
    include_thoughts = data.get('include_thoughts', False)
    file_id = data.get('file_id')

    generation_config = get_generation_config(
        model=model,
        thinking_enabled=thinking_enabled,
        include_thoughts=include_thoughts
    )

    # Reuse (or lazily create) the server-side chat tied to this session.
    chat_id = session.get('chat_id')
    if not chat_id:
        chat_id = str(uuid.uuid4())
        session['chat_id'] = chat_id

    chat = chat_sessions.get(chat_id)
    if chat is None:
        chat = client.chats.create(
            model=model,
            config=generation_config
        )
        chat_sessions[chat_id] = chat

    contents = [message]

    # Attach the previously uploaded file, if any, using its stored MIME type.
    if file_id and 'uploaded_files' in session and file_id in session['uploaded_files']:
        file_info = session['uploaded_files'][file_id]
        with open(file_info['path'], 'rb') as f:
            file_part = process_uploaded_file(f, mime_type=file_info['mime_type'])
        if file_part:
            contents.append(file_part)

    def generate():
        try:
            if not message.strip():
                yield f"data: {json.dumps({'error': 'Empty message'})}\n\n"
                return

            response_stream = chat.send_message_stream(
                contents,
                config=generation_config
            )

            full_response = ""

            for chunk in response_stream:
                # Skip chunks that carry no candidates or parts.
                if not chunk.candidates or not chunk.candidates[0].content or not chunk.candidates[0].content.parts:
                    continue
                for part in chunk.candidates[0].content.parts:
                    if part.text:
                        if part.thought and include_thoughts:
                            yield f"data: {json.dumps({'type': 'thought', 'content': part.text})}\n\n"
                        else:
                            full_response += part.text
                            yield f"data: {json.dumps({'type': 'text', 'content': part.text})}\n\n"

            yield f"data: {json.dumps({'type': 'end', 'full_response': full_response})}\n\n"

        except Exception as e:
            yield f"data: {json.dumps({'error': str(e)})}\n\n"

    return Response(stream_with_context(generate()), mimetype='text/event-stream')
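
# Example client call (sketch; FILE_ID is the id returned by /upload_file, sent with the same cookie jar):
#   curl -N -X POST http://localhost:5000/send_message_with_file \
#        -H "Content-Type: application/json" -b cookies.txt \
#        -d '{"message": "Describe this file", "file_id": "FILE_ID"}'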

@app.route('/reset_chat', methods=['POST'])
def reset_chat():
    """Reset the conversation and delete any uploaded files."""
    try:
        # Drop the server-side chat object along with the session's chat_id.
        chat_id = session.pop('chat_id', None)
        if chat_id:
            chat_sessions.pop(chat_id, None)

        if 'uploaded_files' in session:
            for file_info in session['uploaded_files'].values():
                try:
                    if os.path.exists(file_info['path']):
                        os.remove(file_info['path'])
                except OSError:
                    pass
            session.pop('uploaded_files', None)

        return jsonify({'success': True})
    except Exception as e:
        return jsonify({'error': str(e)}), 500
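
# Example client call (sketch; the cookie jar identifies which session's chat to reset):
#   curl -X POST http://localhost:5000/reset_chat -b cookies.txt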

if __name__ == '__main__':
    # debug=True is intended for local development only.
    app.run(debug=True, host='0.0.0.0', port=5000)