"""Regression tests for the audio-driven head wobble behaviour.""" import math import time import base64 import threading from typing import Any, List, Tuple from collections.abc import Callable import numpy as np from reachy_mini_conversation_app.audio.head_wobbler import HeadWobbler def _make_audio_chunk(duration_s: float = 0.3, frequency_hz: float = 220.0) -> str: """Generate a base64-encoded mono PCM16 sine wave.""" sample_rate = 24000 sample_count = int(sample_rate * duration_s) t = np.linspace(0, duration_s, sample_count, endpoint=False) wave = 0.6 * np.sin(2 * math.pi * frequency_hz * t) pcm = np.clip(wave * np.iinfo(np.int16).max, -32768, 32767).astype(np.int16) return base64.b64encode(pcm.tobytes()).decode("ascii") def _wait_for(predicate: Callable[[], bool], timeout: float = 0.6) -> bool: """Poll `predicate` until true or timeout.""" end_time = time.time() + timeout while time.time() < end_time: if predicate(): return True time.sleep(0.01) return False def _start_wobbler() -> Tuple[HeadWobbler, List[Tuple[float, Tuple[float, float, float, float, float, float]]]]: captured: List[Tuple[float, Tuple[float, float, float, float, float, float]]] = [] def capture(offsets: Tuple[float, float, float, float, float, float]) -> None: captured.append((time.time(), offsets)) wobbler = HeadWobbler(set_speech_offsets=capture) wobbler.start() return wobbler, captured def test_reset_drops_pending_offsets() -> None: """Reset should stop wobble output derived from pre-reset audio.""" wobbler, captured = _start_wobbler() try: wobbler.feed(_make_audio_chunk(duration_s=0.35)) assert _wait_for(lambda: len(captured) > 0), "wobbler did not emit initial offsets" pre_reset_count = len(captured) wobbler.reset() time.sleep(0.3) assert len(captured) == pre_reset_count, "offsets continued after reset without new audio" finally: wobbler.stop() def test_reset_allows_future_offsets() -> None: """After reset, fresh audio must still produce wobble offsets.""" wobbler, captured = _start_wobbler() try: wobbler.feed(_make_audio_chunk(duration_s=0.35)) assert _wait_for(lambda: len(captured) > 0), "wobbler did not emit initial offsets" wobbler.reset() pre_second_count = len(captured) wobbler.feed(_make_audio_chunk(duration_s=0.35, frequency_hz=440.0)) assert _wait_for(lambda: len(captured) > pre_second_count), "no offsets after reset" assert wobbler._thread is not None and wobbler._thread.is_alive() finally: wobbler.stop() def test_reset_during_inflight_chunk_keeps_worker(monkeypatch: Any) -> None: """Simulate reset during chunk processing to ensure the worker survives.""" wobbler, captured = _start_wobbler() ready = threading.Event() release = threading.Event() original_feed = wobbler.sway.feed def blocking_feed(pcm, sr): # type: ignore[no-untyped-def] ready.set() release.wait(timeout=2.0) return original_feed(pcm, sr) monkeypatch.setattr(wobbler.sway, "feed", blocking_feed) try: wobbler.feed(_make_audio_chunk(duration_s=0.35)) assert ready.wait(timeout=1.0), "worker thread did not dequeue audio" wobbler.reset() release.set() # Allow the worker to finish processing the first chunk (which should be discarded) time.sleep(0.1) assert wobbler._thread is not None and wobbler._thread.is_alive(), "worker thread died after reset" pre_second = len(captured) wobbler.feed(_make_audio_chunk(duration_s=0.35, frequency_hz=440.0)) assert _wait_for(lambda: len(captured) > pre_second), "no offsets emitted after in-flight reset" assert wobbler._thread.is_alive() finally: wobbler.stop()