reachy_mini_conversation_app / tests /test_openai_realtime.py
dlouapre's picture
dlouapre HF Staff
Ruff
fd16944
raw
history blame
4.37 kB
import asyncio
import logging
from typing import Any
from datetime import datetime, timezone
from unittest.mock import MagicMock
import pytest
import reachy_mini_conversation_app.openai_realtime as rt_mod
from reachy_mini_conversation_app.openai_realtime import OpenaiRealtimeHandler
from reachy_mini_conversation_app.tools.core_tools import ToolDependencies
def _build_handler(loop: asyncio.AbstractEventLoop) -> OpenaiRealtimeHandler:
asyncio.set_event_loop(loop)
deps = ToolDependencies(reachy_mini=MagicMock(), movement_manager=MagicMock())
return OpenaiRealtimeHandler(deps)
def test_format_timestamp_uses_wall_clock() -> None:
"""Test that format_timestamp uses wall clock time."""
loop = asyncio.new_event_loop()
try:
print("Testing format_timestamp...")
handler = _build_handler(loop)
formatted = handler.format_timestamp()
print(f"Formatted timestamp: {formatted}")
finally:
asyncio.set_event_loop(None)
loop.close()
# Extract year from "[YYYY-MM-DD ...]"
year = int(formatted[1:5])
assert year == datetime.now(timezone.utc).year
@pytest.mark.asyncio
async def test_start_up_retries_on_abrupt_close(monkeypatch: Any, caplog: Any) -> None:
"""First connection dies with ConnectionClosedError during iteration -> retried.
Second connection iterates cleanly (no events) -> start_up returns without raising.
Ensures handler clears self.connection at the end.
"""
caplog.set_level(logging.WARNING)
# Use a local Exception as the module's ConnectionClosedError to avoid ws dependency
FakeCCE = type("FakeCCE", (Exception,), {})
monkeypatch.setattr(rt_mod, "ConnectionClosedError", FakeCCE)
# Make asyncio.sleep return immediately (for backoff)
async def _fast_sleep(*_a: Any, **_kw: Any) -> None: return None
monkeypatch.setattr(asyncio, "sleep", _fast_sleep, raising=False)
attempt_counter = {"n": 0}
class FakeConn:
"""Minimal realtime connection stub."""
def __init__(self, mode: str):
self._mode = mode
class _Session:
async def update(self, **_kw: Any) -> None: return None
self.session = _Session()
class _InputAudioBuffer:
async def append(self, **_kw: Any) -> None: return None
self.input_audio_buffer = _InputAudioBuffer()
class _Item:
async def create(self, **_kw: Any) -> None: return None
class _Conversation:
item = _Item()
self.conversation = _Conversation()
class _Response:
async def create(self, **_kw: Any) -> None: return None
async def cancel(self, **_kw: Any) -> None: return None
self.response = _Response()
async def __aenter__(self) -> "FakeConn": return self
async def __aexit__(self, exc_type: Any, exc: Any, tb: Any) -> bool: return False
async def close(self) -> None: return None
# Async iterator protocol
def __aiter__(self) -> "FakeConn": return self
async def __anext__(self) -> None:
if self._mode == "raise_on_iter":
raise FakeCCE("abrupt close (simulated)")
raise StopAsyncIteration # clean exit (no events)
class FakeRealtime:
def connect(self, **_kw: Any) -> FakeConn:
attempt_counter["n"] += 1
mode = "raise_on_iter" if attempt_counter["n"] == 1 else "clean"
return FakeConn(mode)
class FakeClient:
def __init__(self, **_kw: Any) -> None: self.realtime = FakeRealtime()
# Patch the OpenAI client used by the handler
monkeypatch.setattr(rt_mod, "AsyncOpenAI", FakeClient)
# Build handler with minimal deps
deps = ToolDependencies(reachy_mini=MagicMock(), movement_manager=MagicMock())
handler = rt_mod.OpenaiRealtimeHandler(deps)
# Run: should retry once and exit cleanly
await handler.start_up()
# Validate: two attempts total (fail -> retry -> succeed), and connection cleared
assert attempt_counter["n"] == 2
assert handler.connection is None
# Optional: confirm we logged the unexpected close once
warnings = [r for r in caplog.records if r.levelname == "WARNING" and "closed unexpectedly" in r.msg]
assert len(warnings) == 1