Alina Lozovskaya commited on
Commit
e6be86a
·
1 Parent(s): ee25d7e

fix: cancel pending asyncio tasks to prevent warnings on shutdown

Browse files
src/reachy_mini_conversation_demo/console.py CHANGED
@@ -24,6 +24,7 @@ class LocalStream:
24
  self.handler = handler
25
  self._robot = robot
26
  self._stop_event = asyncio.Event()
 
27
  # Allow the handler to flush the player queue when appropriate.
28
  self.handler._clear_queue = self.clear_queue # type: ignore[assignment]
29
 
@@ -34,18 +35,37 @@ class LocalStream:
34
  self._robot.media.start_playing()
35
 
36
  async def runner() -> None:
37
- tasks = [
38
  asyncio.create_task(self.handler.start_up(), name="openai-handler"),
39
  asyncio.create_task(self.record_loop(), name="stream-record-loop"),
40
  asyncio.create_task(self.play_loop(), name="stream-play-loop"),
41
  ]
42
- await asyncio.gather(*tasks)
 
 
 
 
 
 
43
 
44
  asyncio.run(runner())
45
 
46
  def stop(self) -> None:
47
- """Stop the stream and underlying GStreamer pipelines."""
 
 
 
 
 
 
 
48
  self._stop_event.set()
 
 
 
 
 
 
49
  self._robot.media.stop_recording()
50
  self._robot.media.stop_playing()
51
 
 
24
  self.handler = handler
25
  self._robot = robot
26
  self._stop_event = asyncio.Event()
27
+ self._tasks = []
28
  # Allow the handler to flush the player queue when appropriate.
29
  self.handler._clear_queue = self.clear_queue # type: ignore[assignment]
30
 
 
35
  self._robot.media.start_playing()
36
 
37
  async def runner() -> None:
38
+ self._tasks = [
39
  asyncio.create_task(self.handler.start_up(), name="openai-handler"),
40
  asyncio.create_task(self.record_loop(), name="stream-record-loop"),
41
  asyncio.create_task(self.play_loop(), name="stream-play-loop"),
42
  ]
43
+ try:
44
+ await asyncio.gather(*self._tasks)
45
+ except asyncio.CancelledError:
46
+ logger.info("Tasks cancelled during shutdown")
47
+ finally:
48
+ # Ensure handler connection is closed
49
+ await self.handler.shutdown()
50
 
51
  asyncio.run(runner())
52
 
53
  def stop(self) -> None:
54
+ """Stop the stream and underlying GStreamer pipelines.
55
+
56
+ This method:
57
+ - Sets the stop event to signal async loops to terminate
58
+ - Cancels all pending async tasks (openai-handler, record-loop, play-loop)
59
+ - Stops audio recording and playback
60
+ """
61
+ logger.info("Stopping LocalStream...")
62
  self._stop_event.set()
63
+
64
+ # Cancel all running tasks
65
+ for task in self._tasks:
66
+ if not task.done():
67
+ task.cancel()
68
+
69
  self._robot.media.stop_recording()
70
  self._robot.media.stop_playing()
71
 
src/reachy_mini_conversation_demo/main.py CHANGED
@@ -77,6 +77,7 @@ def main():
77
  handler = OpenaiRealtimeHandler(deps)
78
 
79
  stream_manager = None
 
80
  if args.gradio:
81
  stream = Stream(
82
  handler=handler,
@@ -102,9 +103,12 @@ def main():
102
  try:
103
  stream_manager.launch()
104
  except KeyboardInterrupt:
105
- logger.info("Exiting...")
106
- stream_manager.stop()
107
  finally:
 
 
 
 
108
  movement_manager.stop()
109
  head_wobbler.stop()
110
  if camera_worker:
@@ -112,6 +116,7 @@ def main():
112
 
113
  # prevent connection to keep alive some threads
114
  robot.client.disconnect()
 
115
 
116
 
117
  if __name__ == "__main__":
 
77
  handler = OpenaiRealtimeHandler(deps)
78
 
79
  stream_manager = None
80
+
81
  if args.gradio:
82
  stream = Stream(
83
  handler=handler,
 
103
  try:
104
  stream_manager.launch()
105
  except KeyboardInterrupt:
106
+ logger.info("Keyboard interruption in main thread... closing server.")
 
107
  finally:
108
+ # Stop the stream manager and its pipelines
109
+ stream_manager.close()
110
+
111
+ # Stop other services
112
  movement_manager.stop()
113
  head_wobbler.stop()
114
  if camera_worker:
 
116
 
117
  # prevent connection to keep alive some threads
118
  robot.client.disconnect()
119
+ logger.info("Shutdown complete.")
120
 
121
 
122
  if __name__ == "__main__":