reachy_mini_conversation_app / tests /test_openai_realtime.py
dlouapre's picture
dlouapre HF Staff
Ruff
fd16944
import asyncio
import logging
from typing import Any
from datetime import datetime, timezone
from unittest.mock import MagicMock
import pytest
import reachy_mini_conversation_app.openai_realtime as rt_mod
from reachy_mini_conversation_app.openai_realtime import OpenaiRealtimeHandler
from reachy_mini_conversation_app.tools.core_tools import ToolDependencies
def _build_handler(loop: asyncio.AbstractEventLoop) -> OpenaiRealtimeHandler:
asyncio.set_event_loop(loop)
deps = ToolDependencies(reachy_mini=MagicMock(), movement_manager=MagicMock())
return OpenaiRealtimeHandler(deps)
def test_format_timestamp_uses_wall_clock() -> None:
"""Test that format_timestamp uses wall clock time."""
loop = asyncio.new_event_loop()
try:
print("Testing format_timestamp...")
handler = _build_handler(loop)
formatted = handler.format_timestamp()
print(f"Formatted timestamp: {formatted}")
finally:
asyncio.set_event_loop(None)
loop.close()
# Extract year from "[YYYY-MM-DD ...]"
year = int(formatted[1:5])
assert year == datetime.now(timezone.utc).year
@pytest.mark.asyncio
async def test_start_up_retries_on_abrupt_close(monkeypatch: Any, caplog: Any) -> None:
"""First connection dies with ConnectionClosedError during iteration -> retried.
Second connection iterates cleanly (no events) -> start_up returns without raising.
Ensures handler clears self.connection at the end.
"""
caplog.set_level(logging.WARNING)
# Use a local Exception as the module's ConnectionClosedError to avoid ws dependency
FakeCCE = type("FakeCCE", (Exception,), {})
monkeypatch.setattr(rt_mod, "ConnectionClosedError", FakeCCE)
# Make asyncio.sleep return immediately (for backoff)
async def _fast_sleep(*_a: Any, **_kw: Any) -> None: return None
monkeypatch.setattr(asyncio, "sleep", _fast_sleep, raising=False)
attempt_counter = {"n": 0}
class FakeConn:
"""Minimal realtime connection stub."""
def __init__(self, mode: str):
self._mode = mode
class _Session:
async def update(self, **_kw: Any) -> None: return None
self.session = _Session()
class _InputAudioBuffer:
async def append(self, **_kw: Any) -> None: return None
self.input_audio_buffer = _InputAudioBuffer()
class _Item:
async def create(self, **_kw: Any) -> None: return None
class _Conversation:
item = _Item()
self.conversation = _Conversation()
class _Response:
async def create(self, **_kw: Any) -> None: return None
async def cancel(self, **_kw: Any) -> None: return None
self.response = _Response()
async def __aenter__(self) -> "FakeConn": return self
async def __aexit__(self, exc_type: Any, exc: Any, tb: Any) -> bool: return False
async def close(self) -> None: return None
# Async iterator protocol
def __aiter__(self) -> "FakeConn": return self
async def __anext__(self) -> None:
if self._mode == "raise_on_iter":
raise FakeCCE("abrupt close (simulated)")
raise StopAsyncIteration # clean exit (no events)
class FakeRealtime:
def connect(self, **_kw: Any) -> FakeConn:
attempt_counter["n"] += 1
mode = "raise_on_iter" if attempt_counter["n"] == 1 else "clean"
return FakeConn(mode)
class FakeClient:
def __init__(self, **_kw: Any) -> None: self.realtime = FakeRealtime()
# Patch the OpenAI client used by the handler
monkeypatch.setattr(rt_mod, "AsyncOpenAI", FakeClient)
# Build handler with minimal deps
deps = ToolDependencies(reachy_mini=MagicMock(), movement_manager=MagicMock())
handler = rt_mod.OpenaiRealtimeHandler(deps)
# Run: should retry once and exit cleanly
await handler.start_up()
# Validate: two attempts total (fail -> retry -> succeed), and connection cleared
assert attempt_counter["n"] == 2
assert handler.connection is None
# Optional: confirm we logged the unexpected close once
warnings = [r for r in caplog.records if r.levelname == "WARNING" and "closed unexpectedly" in r.msg]
assert len(warnings) == 1