Alina Lozovskaya committed on
Commit
c772694
·
1 Parent(s): b46d1ae

Move OpenAIRealtimeHandler out of main.py

Browse files
src/reachy_mini_conversation_demo/main.py CHANGED
@@ -1,31 +1,23 @@
1
  from __future__ import annotations
2
 
3
  import asyncio
4
- import json
5
  import logging
6
- import random
7
  import argparse
8
- import time
9
  import warnings
10
 
11
- import numpy as np
12
- from openai import AsyncOpenAI
13
-
14
- from fastrtc import AdditionalOutputs, AsyncStreamHandler, wait_for_item
15
- from websockets import ConnectionClosedError, ConnectionClosedOK
16
 
17
  from reachy_mini import ReachyMini
18
  from reachy_mini.utils import create_head_pose
19
 
20
  from reachy_mini_conversation_demo.config import config
21
  from reachy_mini_conversation_demo.head_tracker import HeadTracker
 
22
  from reachy_mini_conversation_demo.prompts import SESSION_INSTRUCTIONS
23
  from reachy_mini_conversation_demo.tools import (
24
  ToolDependencies,
25
- ALL_TOOL_SPECS,
26
- dispatch_tool_call,
27
  )
28
- from reachy_mini_conversation_demo.audio_sway import AudioSync, AudioConfig, pcm_to_b64
29
  from reachy_mini_conversation_demo.movement import MovementManager
30
  from reachy_mini_conversation_demo.gstreamer import GstPlayer, GstRecorder
31
  from reachy_mini_conversation_demo.vision import VisionManager, init_vision, init_camera
@@ -82,271 +74,6 @@ masked = (API_KEY[:6] + "..." + API_KEY[-4:]) if len(API_KEY) >= 12 else "<short
82
  logger.info("OPENAI_API_KEY loaded (prefix): %s", masked)
83
 
84
 
85
- class OpenAIRealtimeHandler(AsyncStreamHandler):
86
- def __init__(
87
- self,
88
- deps: ToolDependencies,
89
- audio_sync: AudioSync,
90
- robot_is_speaking: asyncio.Event,
91
- speaking_queue: asyncio.Queue,
92
- ) -> None:
93
- super().__init__(
94
- expected_layout="mono",
95
- output_sample_rate=SAMPLE_RATE,
96
- input_sample_rate=SAMPLE_RATE,
97
- )
98
- self.deps = deps
99
- self.audio_sync = audio_sync
100
- self.robot_is_speaking = robot_is_speaking
101
- self.speaking_queue = speaking_queue
102
- self.client: AsyncOpenAI | None = None
103
- self.connection = None
104
- self.output_queue: asyncio.Queue = asyncio.Queue()
105
- self._stop = False
106
- self._started_audio = False
107
- self._connection_ready = False
108
- self._speech_start_time = 0.0
109
- # backoff managment for retry
110
- self._backoff_start = 1.0
111
- self._backoff_max = 16.0
112
- self._backoff = self._backoff_start
113
-
114
- def copy(self):
115
- return OpenAIRealtimeHandler(
116
- self.deps, self.audio_sync, self.robot_is_speaking, self.speaking_queue
117
- )
118
-
119
- async def start_up(self):
120
- if not self._started_audio:
121
- self.audio_sync.start()
122
- self._started_audio = True
123
-
124
- if self.client is None:
125
- logger.info("Realtime start_up: creating AsyncOpenAI client...")
126
- self.client = AsyncOpenAI(api_key=API_KEY)
127
-
128
- self._backoff = self._backoff_start
129
- while not self._stop:
130
- try:
131
- async with self.client.beta.realtime.connect(
132
- model=MODEL_NAME
133
- ) as rt_connection:
134
- self.connection = rt_connection
135
- self._connection_ready = False
136
-
137
- # configure session
138
- await rt_connection.session.update(
139
- session={
140
- "turn_detection": {
141
- "type": "server_vad",
142
- "threshold": 0.6, # Higher threshold = less sensitive
143
- "prefix_padding_ms": 300, # More padding before speech
144
- "silence_duration_ms": 800, # Longer silence before detecting end
145
- },
146
- "voice": "ballad",
147
- "instructions": SESSION_INSTRUCTIONS,
148
- "input_audio_transcription": {
149
- "model": "whisper-1",
150
- "language": "en",
151
- },
152
- "tools": ALL_TOOL_SPECS,
153
- "tool_choice": "auto",
154
- "temperature": 0.7,
155
- }
156
- )
157
-
158
- # Wait for session to be configured
159
- await asyncio.sleep(0.2)
160
-
161
- # Add system message with even stronger brevity emphasis
162
- await rt_connection.conversation.item.create(
163
- item={
164
- "type": "message",
165
- "role": "system",
166
- "content": [
167
- {
168
- "type": "input_text",
169
- "text": f"{SESSION_INSTRUCTIONS}\n\nIMPORTANT: Always keep responses under 25 words. Be extremely concise.",
170
- }
171
- ],
172
- }
173
- )
174
-
175
- self._connection_ready = True
176
-
177
- logger.info(
178
- "Session updated: tools=%d, voice=%s, vad=improved",
179
- len(ALL_TOOL_SPECS),
180
- "ballad",
181
- )
182
-
183
- logger.info("Realtime event loop started with improved VAD")
184
- self._backoff = self._backoff_start
185
-
186
- async for event in rt_connection:
187
- event_type = getattr(event, "type", None)
188
- logger.debug("RT event: %s", event_type)
189
-
190
- # Enhanced speech state tracking
191
- if event_type == "input_audio_buffer.speech_started":
192
- # Only process user speech if robot isn't currently speaking
193
- if not self.robot_is_speaking.is_set():
194
- self.audio_sync.on_input_speech_started()
195
- logger.info("User speech detected (robot not speaking)")
196
- else:
197
- logger.info(
198
- "Ignoring speech detection - robot is speaking"
199
- )
200
-
201
- elif event_type == "response.started":
202
- self._speech_start_time = time.time()
203
- self.audio_sync.on_response_started()
204
- logger.info("Robot started speaking")
205
-
206
- elif event_type in (
207
- "response.audio.completed",
208
- "response.completed",
209
- "response.audio.done",
210
- ):
211
- logger.info("Robot finished speaking %s", event_type)
212
-
213
- elif (
214
- event_type
215
- == "conversation.item.input_audio_transcription.completed"
216
- ):
217
- await self.output_queue.put(
218
- AdditionalOutputs(
219
- {"role": "user", "content": event.transcript}
220
- )
221
- )
222
-
223
- elif event_type == "response.audio_transcript.done":
224
- await self.output_queue.put(
225
- AdditionalOutputs(
226
- {"role": "assistant", "content": event.transcript}
227
- )
228
- )
229
-
230
- # audio streaming
231
- if event_type == "response.audio.delta":
232
- self.robot_is_speaking.set()
233
- # block mic from recording for given time, for each audio delta
234
- self.speaking_queue.put_nowait(0.25)
235
- self.audio_sync.on_response_audio_delta(
236
- getattr(event, "delta", b"")
237
- )
238
-
239
- elif event_type == "response.function_call_arguments.done":
240
- tool_name = getattr(event, "name", None)
241
- args_json_str = getattr(event, "arguments", None)
242
- call_id = getattr(event, "call_id", None)
243
-
244
- try:
245
- tool_result = await dispatch_tool_call(
246
- tool_name, args_json_str, self.deps
247
- )
248
-
249
- logger.info(f"Tool result {tool_result}")
250
-
251
- except Exception as e:
252
- logger.exception("Tool %s failed", tool_name)
253
- tool_result = {"error": str(e)}
254
-
255
- await rt_connection.conversation.item.create(
256
- item={
257
- "type": "function_call_output",
258
- "call_id": call_id,
259
- "output": json.dumps(tool_result),
260
- }
261
- )
262
- logger.info(
263
- "Sent tool=%s call_id=%s result=%s",
264
- tool_name,
265
- call_id,
266
- tool_result,
267
- )
268
- if tool_name and (
269
- tool_name == "camera" or "scene" in tool_name
270
- ):
271
- logger.info(
272
- "Forcing response after tool call %s", tool_name
273
- )
274
- await rt_connection.response.create()
275
-
276
- # server errors
277
- if event_type == "error":
278
- err = getattr(event, "error", None)
279
- msg = getattr(
280
- err, "message", str(err) if err else "unknown error"
281
- )
282
- logger.error("Realtime error: %s (raw=%s)", msg, err)
283
- await self.output_queue.put(
284
- AdditionalOutputs(
285
- {"role": "assistant", "content": f"[error] {msg}"}
286
- )
287
- )
288
-
289
- except (ConnectionClosedOK, ConnectionClosedError) as e:
290
- if self._stop:
291
- break
292
- logger.warning(
293
- "Connection closed (%s). Reconnecting…",
294
- getattr(e, "code", "no-code"),
295
- )
296
- except asyncio.CancelledError:
297
- break
298
- except Exception:
299
- logger.exception("Realtime loop error; will reconnect")
300
- finally:
301
- self.connection = None
302
- self._connection_ready = False
303
-
304
- # Exponential backoff
305
- delay = min(self._backoff, self._backoff_max) + random.uniform(0, 0.5)
306
- logger.info("Reconnect in %.1fs…", delay)
307
- await asyncio.sleep(delay)
308
- self._backoff = min(self._backoff * 2.0, self._backoff_max)
309
-
310
- async def receive(self, frame: bytes) -> None:
311
- """Mic frames from fastrtc."""
312
- # Don't send mic audio while robot is speaking (simple echo cancellation)
313
- if self.robot_is_speaking.is_set() or not self._connection_ready:
314
- return
315
-
316
- mic_samples = np.frombuffer(frame, dtype=np.int16).squeeze()
317
- audio_b64 = pcm_to_b64(mic_samples)
318
-
319
- try:
320
- await self.connection.input_audio_buffer.append(audio=audio_b64)
321
- except (ConnectionClosedOK, ConnectionClosedError):
322
- pass
323
-
324
- async def emit(self) -> tuple[int, np.ndarray] | AdditionalOutputs | None:
325
- """Return audio for playback or chat outputs."""
326
- try:
327
- sample_rate, pcm_frame = self.audio_sync.playback_q.get_nowait()
328
- logger.debug(
329
- "Emitting playback frame (sr=%d, n=%d)", sample_rate, pcm_frame.size
330
- )
331
- return (sample_rate, pcm_frame)
332
- except asyncio.QueueEmpty:
333
- pass
334
- return await wait_for_item(self.output_queue)
335
-
336
- async def shutdown(self) -> None:
337
- logger.info("Shutdown: closing connections and audio")
338
- self._stop = True
339
- if self.connection:
340
- try:
341
- await self.connection.close()
342
- except Exception:
343
- logger.exception("Error closing realtime connection")
344
- finally:
345
- self.connection = None
346
- self._connection_ready = False
347
- await self.audio_sync.stop()
348
-
349
-
350
  async def receive_loop(
351
  recorder: GstRecorder, openai: OpenAIRealtimeHandler, stop_event: asyncio.Event
352
  ) -> None:
 
1
  from __future__ import annotations
2
 
3
  import asyncio
 
4
  import logging
 
5
  import argparse
 
6
  import warnings
7
 
8
+ from fastrtc import AdditionalOutputs
 
 
 
 
9
 
10
  from reachy_mini import ReachyMini
11
  from reachy_mini.utils import create_head_pose
12
 
13
  from reachy_mini_conversation_demo.config import config
14
  from reachy_mini_conversation_demo.head_tracker import HeadTracker
15
+ from reachy_mini_conversation_demo.openai_realtime import OpenAIRealtimeHandler
16
  from reachy_mini_conversation_demo.prompts import SESSION_INSTRUCTIONS
17
  from reachy_mini_conversation_demo.tools import (
18
  ToolDependencies,
 
 
19
  )
20
+ from reachy_mini_conversation_demo.audio_sway import AudioSync, AudioConfig
21
  from reachy_mini_conversation_demo.movement import MovementManager
22
  from reachy_mini_conversation_demo.gstreamer import GstPlayer, GstRecorder
23
  from reachy_mini_conversation_demo.vision import VisionManager, init_vision, init_camera
 
74
  logger.info("OPENAI_API_KEY loaded (prefix): %s", masked)
75
 
76
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
77
  async def receive_loop(
78
  recorder: GstRecorder, openai: OpenAIRealtimeHandler, stop_event: asyncio.Event
79
  ) -> None:
src/reachy_mini_conversation_demo/openai_realtime.py ADDED
@@ -0,0 +1,291 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from __future__ import annotations
2
+
3
+ import asyncio
4
+ import json
5
+ import logging
6
+ import random
7
+ import time
8
+ from typing import Optional
9
+
10
+ import numpy as np
11
+ from openai import AsyncOpenAI
12
+ from websockets import ConnectionClosedError, ConnectionClosedOK
13
+
14
+ from fastrtc import AdditionalOutputs, AsyncStreamHandler, wait_for_item
15
+
16
+ from reachy_mini_conversation_demo.prompts import SESSION_INSTRUCTIONS
17
+ from reachy_mini_conversation_demo.tools import (
18
+ ToolDependencies,
19
+ ALL_TOOL_SPECS,
20
+ dispatch_tool_call,
21
+ )
22
+ from reachy_mini_conversation_demo.audio_sway import AudioSync, pcm_to_b64
23
+ from reachy_mini_conversation_demo.config import config
24
+
25
+ logger = logging.getLogger(__name__)
26
+
27
+
28
+ SAMPLE_RATE = 24000 # keep same default as before
29
+ MODEL_NAME = config.MODEL_NAME
30
+ API_KEY = config.OPENAI_API_KEY
31
+
32
+
33
class OpenAIRealtimeHandler(AsyncStreamHandler):
    """
    Async handler that bridges mic input -> OpenAI Realtime -> audio out (+ tool calls).

    Public API:
        - start_up(): owns the connect/reconnect loop and the realtime event loop
        - receive(frame: bytes): forward PCM16 mono mic frames to the server
        - emit() -> tuple[int, np.ndarray] | AdditionalOutputs | None:
          playback audio frames or chat transcript outputs
        - shutdown(): stop the loop and close the connection
    """

    def __init__(
        self,
        deps: ToolDependencies,
        audio_sync: AudioSync,
        robot_is_speaking: asyncio.Event,
        speaking_queue: asyncio.Queue,
    ) -> None:
        super().__init__(
            expected_layout="mono",
            output_sample_rate=SAMPLE_RATE,
            input_sample_rate=SAMPLE_RATE,
        )
        # deps
        self.deps = deps                            # dependencies handed to tool dispatch
        self.audio_sync = audio_sync                # audio playback/sway synchronizer
        self.robot_is_speaking = robot_is_speaking  # set while TTS audio deltas arrive
        self.speaking_queue = speaking_queue        # per-chunk mic-mute durations (seconds)

        # runtime
        self.client: Optional[AsyncOpenAI] = None
        self.connection = None                      # live realtime connection, or None
        self.output_queue: asyncio.Queue = asyncio.Queue()  # AdditionalOutputs for emit()
        self._stop = False                          # set by shutdown(); ends reconnect loop
        self._started_audio = False                 # audio_sync.start() called at most once
        self._connection_ready = False              # True only after session is configured
        self._speech_start_time = 0.0               # diagnostic timestamp; not read here

        # Exponential backoff state for reconnects.
        self._backoff_start = 1.0
        self._backoff_max = 16.0
        self._backoff = self._backoff_start

    def copy(self) -> "OpenAIRealtimeHandler":
        """Return a new handler sharing the same deps/events (fastrtc clone hook)."""
        return OpenAIRealtimeHandler(
            self.deps,
            self.audio_sync,
            self.robot_is_speaking,
            self.speaking_queue,
        )

    async def start_up(self):
        """Connect to the Realtime API and process events until shutdown.

        Reconnects with exponential backoff (plus jitter) on connection loss
        or unexpected errors; exits on shutdown() or task cancellation.
        """
        if not self._started_audio:
            self.audio_sync.start()
            self._started_audio = True

        if self.client is None:
            logger.info("Realtime start_up: creating AsyncOpenAI client…")
            self.client = AsyncOpenAI(api_key=API_KEY)

        self._backoff = self._backoff_start
        while not self._stop:
            try:
                async with self.client.beta.realtime.connect(model=MODEL_NAME) as rtc:
                    self.connection = rtc
                    self._connection_ready = False

                    # Configure session: server-side VAD, voice, transcription, tools.
                    await rtc.session.update(
                        session={
                            "turn_detection": {
                                "type": "server_vad",
                                "threshold": 0.6,
                                "prefix_padding_ms": 300,
                                "silence_duration_ms": 800,
                            },
                            "voice": "ballad",
                            "instructions": SESSION_INSTRUCTIONS,
                            "input_audio_transcription": {
                                "model": "whisper-1",
                                "language": "en",
                            },
                            "tools": ALL_TOOL_SPECS,
                            "tool_choice": "auto",
                            "temperature": 0.7,
                        }
                    )

                    # Give the server a breath to apply config before the first item.
                    await asyncio.sleep(0.2)

                    # Extra brevity instruction as a system message.
                    await rtc.conversation.item.create(
                        item={
                            "type": "message",
                            "role": "system",
                            "content": [
                                {
                                    "type": "input_text",
                                    "text": (
                                        f"{SESSION_INSTRUCTIONS}\n\n"
                                        "IMPORTANT: Always keep responses under 25 words. Be extremely concise."
                                    ),
                                }
                            ],
                        }
                    )

                    # Only now allow receive() to forward mic audio.
                    self._connection_ready = True
                    self._backoff = self._backoff_start
                    logger.info(
                        "Session ready (tools=%d, voice=%s)",
                        len(ALL_TOOL_SPECS),
                        "ballad",
                    )

                    async for event in rtc:
                        et = getattr(event, "type", None)
                        logger.debug("RT event: %s", et)

                        # conversation / transcripts
                        if et == "input_audio_buffer.speech_started":
                            # Ignore VAD triggers caused by the robot's own voice.
                            if not self.robot_is_speaking.is_set():
                                self.audio_sync.on_input_speech_started()
                                logger.info("User speech detected")
                        elif et == "response.started":
                            self._speech_start_time = time.time()
                            self.audio_sync.on_response_started()
                            logger.info("Robot started speaking")
                        elif et in (
                            "response.audio.completed",
                            "response.completed",
                            "response.audio.done",
                        ):
                            logger.info("Robot finished speaking (%s)", et)
                        elif (
                            et
                            == "conversation.item.input_audio_transcription.completed"
                        ):
                            # User speech transcript -> chat output.
                            await self.output_queue.put(
                                AdditionalOutputs(
                                    {"role": "user", "content": event.transcript}
                                )
                            )
                        elif et == "response.audio_transcript.done":
                            # Assistant speech transcript -> chat output.
                            await self.output_queue.put(
                                AdditionalOutputs(
                                    {"role": "assistant", "content": event.transcript}
                                )
                            )

                        # streaming audio
                        if et == "response.audio.delta":
                            self.robot_is_speaking.set()
                            # block mic briefly per chunk to reduce echo
                            self.speaking_queue.put_nowait(0.25)
                            self.audio_sync.on_response_audio_delta(
                                getattr(event, "delta", b"")
                            )

                        # tool calls
                        elif et == "response.function_call_arguments.done":
                            tool_name = getattr(event, "name", None)
                            args_json_str = getattr(event, "arguments", None)
                            call_id = getattr(event, "call_id", None)

                            try:
                                tool_result = await dispatch_tool_call(
                                    tool_name, args_json_str, self.deps
                                )
                                logger.info("Tool result: %s", tool_result)
                            except Exception as e:
                                # Report the failure back to the model instead of crashing the loop.
                                logger.exception("Tool %s failed", tool_name)
                                tool_result = {"error": str(e)}

                            await rtc.conversation.item.create(
                                item={
                                    "type": "function_call_output",
                                    "call_id": call_id,
                                    "output": json.dumps(tool_result),
                                }
                            )

                            # Force LLM to speak after vision/camera tools
                            if tool_name and (
                                tool_name == "camera" or "scene" in tool_name
                            ):
                                await rtc.response.create()

                        # server error
                        if et == "error":
                            err = getattr(event, "error", None)
                            msg = getattr(
                                err, "message", str(err) if err else "unknown error"
                            )
                            logger.error("Realtime error: %s (raw=%s)", msg, err)
                            await self.output_queue.put(
                                AdditionalOutputs(
                                    {"role": "assistant", "content": f"[error] {msg}"}
                                )
                            )

            except (ConnectionClosedOK, ConnectionClosedError) as e:
                if self._stop:
                    break
                logger.warning(
                    "Connection closed (%s). Reconnecting…",
                    getattr(e, "code", "no-code"),
                )
            except asyncio.CancelledError:
                break
            except Exception:
                logger.exception("Realtime loop error; will reconnect")
            finally:
                self.connection = None
                self._connection_ready = False

            # Fix: don't sleep the backoff delay when shutdown() was requested
            # during a clean connection exit — previously this could stall
            # shutdown for up to _backoff_max seconds.
            if self._stop:
                break

            # Exponential backoff (with jitter) before reconnect.
            delay = min(self._backoff, self._backoff_max) + random.uniform(0, 0.5)
            logger.info("Reconnect in %.1fs…", delay)
            await asyncio.sleep(delay)
            self._backoff = min(self._backoff * 2.0, self._backoff_max)

    async def receive(self, frame: bytes) -> None:
        """Accept PCM16 mono frames from the mic pipeline (fastrtc)."""
        # Drop mic audio while the robot speaks (cheap echo suppression) or
        # before the session is configured.
        if self.robot_is_speaking.is_set() or not self._connection_ready:
            return

        # Fix: snapshot the connection — the reconnect loop's `finally` can set
        # self.connection to None between the readiness check and the await,
        # which previously raised AttributeError on None.
        conn = self.connection
        if conn is None:
            return

        mic_samples = np.frombuffer(frame, dtype=np.int16).squeeze()
        audio_b64 = pcm_to_b64(mic_samples)

        try:
            await conn.input_audio_buffer.append(audio=audio_b64)
        except (ConnectionClosedOK, ConnectionClosedError):
            # Connection died mid-send; the start_up loop will reconnect.
            pass

    async def emit(self) -> tuple[int, np.ndarray] | AdditionalOutputs | None:
        """Return either audio to play (sr, np.int16 array) or chat outputs."""
        # Playback audio has priority over chat outputs.
        try:
            sample_rate, pcm_frame = self.audio_sync.playback_q.get_nowait()
            logger.debug(
                "Emitting playback frame (sr=%d, n=%d)", sample_rate, pcm_frame.size
            )
            return (sample_rate, pcm_frame)
        except asyncio.QueueEmpty:
            pass
        return await wait_for_item(self.output_queue)

    async def shutdown(self) -> None:
        """Stop the reconnect loop, close the realtime connection and audio."""
        logger.info("Shutdown: closing connections and audio")
        self._stop = True
        if self.connection:
            try:
                await self.connection.close()
            except Exception:
                logger.exception("Error closing realtime connection")
            finally:
                self.connection = None
                self._connection_ready = False
        await self.audio_sync.stop()