import os
import secrets
import time
from typing import cast
from urllib.parse import urlencode

import requests
import streamlit as st

from gateway_client import delete_profile, ingest_and_rewrite, ingest_memories
from llm import chat, set_model
from model_config import MODEL_CHOICES, MODEL_TO_PROVIDER, MODEL_DISPLAY_NAMES


def _generate_session_name(base: str = "Session") -> str:
    """Return the first ``"<base> <n>"`` (n = 1, 2, ...) not in ``session_order``.

    Only ``st.session_state.session_order`` is consulted; a missing key is
    treated as an empty list.
    """
    taken = set(st.session_state.get("session_order", []))
    idx = 1
    while f"{base} {idx}" in taken:
        idx += 1
    return f"{base} {idx}"


def ensure_session_state() -> None:
    """Create any missing session bookkeeping keys in ``st.session_state``.

    After this call the following keys exist: ``sessions`` (name -> dict with
    a ``"history"`` list), ``session_order`` (list of names),
    ``active_session_id``, ``session_select``, ``rename_session_name``,
    ``rename_session_synced_to`` and ``history`` (the active session's
    history list, shared by reference).
    """
    state = st.session_state
    if "sessions" not in state:
        state.sessions = {}
    if "session_order" not in state:
        state.session_order = []

    # No active session, or it points at a session that no longer exists:
    # fall back to a freshly generated default name.
    if (
        "active_session_id" not in state
        or state.active_session_id not in state.sessions
    ):
        default_name = _generate_session_name()
        state.sessions.setdefault(default_name, {"history": []})
        if default_name not in state.session_order:
            state.session_order.append(default_name)
        state.active_session_id = default_name

    # Keep the selection key pointing at an existing session.
    if "session_select" not in state or state.session_select not in state.sessions:
        state.session_select = state.active_session_id

    state.setdefault("rename_session_name", state.active_session_id)
    state.setdefault("rename_session_synced_to", state.active_session_id)
    state.history = cast(
        list[dict],
        state.sessions[state.active_session_id].setdefault("history", []),
    )


def _activate_session(name: str) -> None:
    """Make the existing session *name* active and sync every dependent key."""
    state = st.session_state
    state.active_session_id = name
    state.session_select = name
    state.history = cast(list[dict], state.sessions[name]["history"])
    state.rename_session_name = name
    state.rename_session_synced_to = name


def create_session(session_name: str | None = None) -> tuple[bool, str]:
    """Create a session and make it active.

    Args:
        session_name: Desired name; blank or ``None`` requests a generated one.

    Returns:
        ``(True, name)`` when the session was created, or ``(False, name)``
        when a session with that name already exists (nothing is changed).
    """
    ensure_session_state()
    candidate = (session_name or "").strip() or _generate_session_name()
    if candidate in st.session_state.sessions:
        return False, candidate

    st.session_state.sessions[candidate] = {"history": []}
    st.session_state.session_order.append(candidate)
    _activate_session(candidate)
    return True, candidate


def rename_session(current_name: str, new_name: str) -> bool:
    """Rename *current_name* to *new_name* (stripped), keeping its list position.

    Returns:
        ``True`` on success. ``False`` when the new name is blank, equal to
        the current name, or already taken, or when *current_name* is not an
        existing session.
    """
    ensure_session_state()
    state = st.session_state
    target = new_name.strip()
    if not target or target == current_name:
        return False
    # An unknown source name is reported as a failure, as delete_session does,
    # instead of surfacing a KeyError from pop().
    if current_name not in state.sessions or target in state.sessions:
        return False

    state.sessions[target] = state.sessions.pop(current_name)
    order = state.session_order
    if current_name in order:
        order[order.index(current_name)] = target
    else:
        # Order list had lost track of this session; re-register it.
        order.append(target)

    if state.active_session_id == current_name:
        state.active_session_id = target
        state.session_select = target
    state.history = cast(
        list[dict], state.sessions[state.active_session_id]["history"]
    )
    # The rename keys follow the renamed session even when it is not active.
    state.rename_session_name = target
    state.rename_session_synced_to = target
    return True


def delete_session(session_name: str) -> bool:
    """Delete *session_name*; the last remaining session is never deleted.

    When the active session is deleted, the last entry of ``session_order``
    becomes active.

    Returns:
        ``True`` when the session was removed, ``False`` when it does not
        exist or ``session_order`` holds one entry or fewer.
    """
    ensure_session_state()
    state = st.session_state
    if session_name not in state.sessions:
        return False
    if len(state.session_order) <= 1:
        return False

    state.sessions.pop(session_name, None)
    if session_name in state.session_order:
        state.session_order.remove(session_name)

    if state.active_session_id == session_name:
        _activate_session(state.session_order[-1])
    else:
        state.history = cast(
            list[dict], state.sessions[state.active_session_id]["history"]
        )
    return True
# Prompt suffixes appended when the caller asks for a visible rationale.
_CONTROL_RATIONALE_SUFFIX = (
    " At the beginning of your response, please say the following in ITALIC: "
    "'Persona Rationale: No personalization applied.'. "
    "Begin your answer on the next line."
)
_PERSONA_RATIONALE_SUFFIX = (
    " At the beginning of your response, please say the following in ITALIC: "
    "'Persona Rationale: ' followed by 1 sentence about how your reasoning for "
    "how the persona traits influenced this response, also in italics. "
    "Begin your answer on the next line."
)


def rewrite_message(
    msg: str, persona_name: str, show_rationale: bool, use_memory: bool = True
) -> str:
    """Return the prompt to send for *msg*, optionally rewritten via the gateway.

    Args:
        msg: The user's message.
        persona_name: Passed to ``ingest_and_rewrite`` as ``user_id``; the
            name ``"control"`` (any letter case) disables rewriting.
        show_rationale: When true, append an instruction asking the model to
            open its answer with an italic "Persona Rationale" line.
        use_memory: When false, *msg* is returned without calling the gateway.

    Returns:
        The original or rewritten message, plus the rationale instruction
        when requested.

    Raises:
        Exception: Whatever ``ingest_and_rewrite`` raises, re-raised after
            being reported with ``st.error``.
    """
    # If memory is disabled or Control persona, don't use memory
    if not use_memory or persona_name.lower() == "control":
        if show_rationale:
            return msg + _CONTROL_RATIONALE_SUFFIX
        return msg

    try:
        rewritten_msg = ingest_and_rewrite(user_id=persona_name, query=msg)
    except Exception as e:
        st.error(f"Failed to ingest_and_rewrite message: {e}")
        raise

    if show_rationale:
        rewritten_msg += _PERSONA_RATIONALE_SUFFIX
    # NOTE(review): debug output; this writes the full rewritten prompt to
    # stdout on every call - confirm it is still wanted.
    print(rewritten_msg)
    return rewritten_msg
""" st.markdown(HEADER_STYLE, unsafe_allow_html=True) st.markdown(HEADER_HTML, unsafe_allow_html=True) # ────────────────────────────────────────────────────────────── # Sidebar # ────────────────────────────────────────────────────────────── default_model = MODEL_CHOICES[0] if MODEL_CHOICES else "gpt-4.1-mini" model_id = default_model provider = MODEL_TO_PROVIDER.get(default_model, "openai") selected_persona = "Charlie" persona_name = "Charlie" skip_rewrite = False compare_personas = False show_rationale = False with st.sidebar: st.markdown("#### Sessions") session_options = st.session_state.session_order active_session = st.session_state.active_session_id if st.session_state.rename_session_synced_to != active_session: st.session_state.rename_session_name = active_session st.session_state.rename_session_synced_to = active_session for idx, session_name in enumerate(session_options, start=1): is_active = session_name == active_session button_label = f"{session_name}" row = st.container() with row: button_col, menu_col = st.columns([0.8, 0.2]) with button_col: if st.button( button_label, key=f"session_button_{session_name}", use_container_width=True, type="primary" if is_active else "secondary", ): if not is_active: st.session_state.active_session_id = session_name st.session_state.session_select = session_name st.session_state.history = cast( list[dict], st.session_state.sessions[session_name]["history"], ) st.session_state.rename_session_name = session_name st.session_state.rename_session_synced_to = session_name st.rerun() with menu_col: if hasattr(st, "popover"): menu_container = st.popover("⋯", use_container_width=True) else: menu_container = st.expander( "⋯", expanded=False, key=f"session_actions_{session_name}" ) with menu_container: st.markdown(f"**Actions for {session_name}**") rename_value = st.text_input( "Rename session", value=session_name, key=f"rename_session_input_{session_name}", ) if st.button( "Rename", use_container_width=True, 
key=f"rename_session_button_{session_name}", ): rename_target = rename_value.strip() if not rename_target: st.warning("Enter a session name to rename.") elif rename_target == session_name: st.info("Session name unchanged.") elif rename_target in st.session_state.sessions: st.warning(f"Session '{rename_target}' already exists.") elif rename_session(session_name, rename_target): st.success(f"Session renamed to '{rename_target}'.") st.rerun() else: st.error("Unable to rename session. Please try again.") st.divider() if st.button( "Delete session", use_container_width=True, type="secondary", key=f"delete_session_button_{session_name}", ): if delete_session(session_name): new_active = st.session_state.active_session_id st.session_state.session_select = new_active st.session_state.rename_session_name = new_active st.session_state.rename_session_synced_to = new_active st.success(f"Session '{session_name}' deleted.") st.rerun() else: st.warning("Cannot delete the last remaining session.") with st.form("create_session_form", clear_on_submit=True): new_session_name = st.text_input( "New session name", key="create_session_name", placeholder="Leave blank for automatic name", ) if st.form_submit_button("Create session", use_container_width=True): success, created_name = create_session(new_session_name) if success: st.success(f"Session '{created_name}' created.") st.rerun() else: st.warning(f"Session '{created_name}' already exists.") st.divider() st.markdown("#### Choose Model") # Create display options with categories display_options = [MODEL_DISPLAY_NAMES[model] for model in MODEL_CHOICES] selected_display = st.selectbox( "Choose Model", display_options, index=0, label_visibility="collapsed" ) # Get the actual model ID from the display name model_id = next(model for model, display in MODEL_DISPLAY_NAMES.items() if display == selected_display) provider = MODEL_TO_PROVIDER[model_id] set_model(model_id) st.markdown("#### User Identity") # Get Hugging Face user ID if available 
# The first non-empty of SPACE_USER / HF_USERNAME / HF_USER wins (in HF Spaces).
hf_user_id = (
    os.getenv("SPACE_USER") or os.getenv("HF_USERNAME") or os.getenv("HF_USER")
)
# Check if we're on Hugging Face Spaces (not local)
is_hf_space = (
    os.getenv("SPACE_ID") is not None or os.getenv("HF_ENDPOINT") is not None
)


def _hf_error_field(resp, default: str) -> object:
    """Return the ``"error"`` field of *resp*'s JSON body, or *default*.

    *default* is also returned when the body is not valid JSON or is not a
    JSON object.
    """
    try:
        return resp.json().get("error", default)
    except (ValueError, AttributeError):
        # ValueError: body is not JSON; AttributeError: JSON is not a dict.
        return default


def validate_hf_token(token: str) -> tuple[bool, str, str]:
    """Validate HF token and return (is_valid, username, error_message).

    All whitespace is removed from *token* first. If ``huggingface_hub`` can
    be imported its ``whoami`` is used; otherwise the function falls back to
    a direct GET on the Hugging Face ``whoami`` endpoint.

    Returns:
        ``(True, username, "")`` on success, otherwise ``(False, "", message)``
        with a user-facing explanation. Failures are reported through the
        return value rather than raised.
    """
    token = token.strip()
    if not token:
        return False, "", "Token cannot be empty"

    # Remove any whitespace or newlines that might have been copied
    token = "".join(token.split())

    # Try using huggingface_hub library if available, otherwise fall back to API
    try:
        from huggingface_hub import whoami
    except ImportError:
        whoami = None

    if whoami is not None:
        try:
            user_info = whoami(token=token)
            username = user_info.get("name") or user_info.get("username") or ""
        except Exception as e:
            # The failure is classified by matching text in the exception message.
            error_msg = str(e)
            if (
                "401" in error_msg
                or "Unauthorized" in error_msg
                or "Invalid" in error_msg
            ):
                return False, "", f"Invalid token. Please verify your token is correct and has Read permissions. Error: {error_msg[:100]}"
            return False, "", f"Validation error: {error_msg[:150]}"
        if username:
            return True, username, ""
        return False, "", "Token validated but username not found in response."

    # Fallback: Use the HF whoami endpoint directly
    endpoint = "https://huggingface.co/api/whoami"
    headers = {
        "Authorization": f"Bearer {token}",
        "User-Agent": "MemMachine-Playground/1.0",
    }

    try:
        resp = requests.get(endpoint, headers=headers, timeout=10)

        if resp.status_code == 200:
            user_data = resp.json()
            # Try different possible username fields
            username = (
                user_data.get("name")
                or user_data.get("username")
                or user_data.get("user")
                or ""
            )
            if username:
                return True, username, ""
            return False, "", f"Token validated but username not found. Response: {str(user_data)[:100]}"

        if resp.status_code == 401:
            error_detail = _hf_error_field(resp, "")
            return False, "", f"Invalid token (401). The token may be expired, revoked, or incorrect. {error_detail} Please create a new Read token at https://huggingface.co/settings/tokens"

        if resp.status_code == 403:
            return False, "", "Token access denied (403). Please ensure your token has Read permissions."

        error_text = _hf_error_field(resp, resp.text[:100])
        return False, "", f"Authentication failed (Status {resp.status_code}): {error_text}"
    except requests.exceptions.Timeout:
        return False, "", "Request timed out. Please check your internet connection and try again."
    except requests.exceptions.RequestException as e:
        return False, "", f"Network error: {str(e)}. Please try again."
    except Exception as e:
        # Last-resort boundary: any other failure is returned as a message.
        return False, "", f"Validation error: {str(e)}. Please try again."
if is_hf_space: # On HF Spaces - require token authentication for security if "hf_authenticated_user" not in st.session_state: st.warning("🔐 **Authentication Required**") st.caption("To protect your memories, please authenticate with your Hugging Face account.") token_input = st.text_input( "Enter your Hugging Face Access Token", key="hf_token_input", type="password", placeholder="hf_xxxxxxxxxxxxxxxxxxxxx", help="❓ Create a Read token: https://huggingface.co/settings/tokens" ) if st.button("Authenticate", use_container_width=True, type="primary"): if token_input.strip(): with st.spinner("Validating token..."): is_valid, username, error_msg = validate_hf_token(token_input.strip()) if is_valid and username: st.session_state.hf_authenticated_user = username st.session_state.hf_token = token_input.strip() # Store for future use # Use custom purple styling instead of green success message st.markdown(f"""