Alina Lozovskaya committed on
Commit
e660849
·
1 Parent(s): 4067dbe

Separate headless and gradio [wip]

Browse files
src/reachy_mini_conversation_demo/audio/gstreamer.py CHANGED
@@ -7,7 +7,7 @@ import gi
7
 
8
  gi.require_version("Gst", "1.0")
9
  gi.require_version("GstApp", "1.0")
10
- from gi.repository import Gst, GLib # noqa: E402
11
 
12
 
13
  class GstPlayer:
@@ -159,7 +159,6 @@ class GstRecorder:
159
  self._thread_bus_calls.start()
160
 
161
  def get_sample(self):
162
- """Return next audio sample as bytes, or None if no sample available."""
163
  sample = self.appsink.pull_sample()
164
  data = None
165
  if isinstance(sample, Gst.Sample):
 
7
 
8
  gi.require_version("Gst", "1.0")
9
  gi.require_version("GstApp", "1.0")
10
+ from gi.repository import Gst, GLib, GstApp # noqa: E402
11
 
12
 
13
  class GstPlayer:
 
159
  self._thread_bus_calls.start()
160
 
161
  def get_sample(self):
 
162
  sample = self.appsink.pull_sample()
163
  data = None
164
  if isinstance(sample, Gst.Sample):
src/reachy_mini_conversation_demo/console.py ADDED
@@ -0,0 +1,101 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import asyncio
2
+ import base64
3
+ import logging
4
+ import numpy as np
5
+
6
+ from fastrtc import AdditionalOutputs
7
+ from gi.repository import Gst
8
+
9
+ from reachy_mini_conversation_demo.audio.head_wobbler import SAMPLE_RATE
10
+ from reachy_mini_conversation_demo.openai_realtime import OpenaiRealtimeHandler
11
+ from reachy_mini_conversation_demo.audio.gstreamer import GstPlayer, GstRecorder
12
+
13
+ logger = logging.getLogger(__name__)
14
+
15
+
16
+ class LocalStream:
17
+ def __init__(self, handler: OpenaiRealtimeHandler):
18
+ self.handler = handler
19
+ self._stop_event = asyncio.Event()
20
+
21
+ self.recorder = GstRecorder(sample_rate=SAMPLE_RATE)
22
+ self.player = GstPlayer(sample_rate=SAMPLE_RATE)
23
+
24
+ self.handler._clear_queue = self.clear_queue # type: ignore[assignment]
25
+
26
+ # player_bus = self.player.pipeline.get_bus()
27
+ # player_bus.add_signal_watch()
28
+ # player_bus.connect("message", self.on_player_message)
29
+
30
+ # def on_player_message(self, bus, message):
31
+ # # logger.info(f"Player message: {message.type}")
32
+ # if message.type == Gst.MessageType.STATE_CHANGED:
33
+ # old_state, new_state, pending_state = message.parse_state_changed()
34
+ # if new_state != old_state and new_state == Gst.State.PLAYING:
35
+ # print("Player is now playing")
36
+ # self.recorder.pipeline.set_state(Gst.State.PAUSED)
37
+
38
+ # if new_state != old_state and new_state == Gst.State.PAUSED:
39
+ # print("Player is now paused")
40
+ # self.recorder.pipeline.set_state(Gst.State.PLAYING)
41
+
42
+ # if message.type == Gst.MessageType.EOS:
43
+ # self.recorder.pipeline.set_state(Gst.State.PLAYING)
44
+ # print("Player reached end of stream, restarting recorder")
45
+
46
+ def clear_queue(self):
47
+ self.player.pipeline.set_state(Gst.State.PAUSED)
48
+ self.player.appsrc.send_event(Gst.Event.new_flush_start())
49
+ self.player.appsrc.send_event(Gst.Event.new_flush_stop(reset_time=True))
50
+ self.player.pipeline.set_state(Gst.State.PLAYING)
51
+ logger.info("Cleared player queue")
52
+
53
+ def start(self):
54
+ self._stop_event.clear()
55
+ self.recorder.record()
56
+ self.player.play()
57
+
58
+ async def runner():
59
+ tasks = [
60
+ asyncio.create_task(self.handler.start_up(), name="openai-handler"),
61
+ asyncio.create_task(self.record_loop(), name="stream-record-loop"),
62
+ asyncio.create_task(self.play_loop(), name="stream-play-loop"),
63
+ ]
64
+ await asyncio.gather(*tasks)
65
+
66
+ asyncio.run(runner())
67
+
68
+ def stop(self):
69
+ self._stop_event.set()
70
+ self.recorder.stop()
71
+ self.player.stop()
72
+
73
+ async def record_loop(self) -> None:
74
+ logger.info("Starting receive loop")
75
+ while not self._stop_event.is_set():
76
+ data = self.recorder.get_sample()
77
+
78
+ if data is not None:
79
+ frame = np.frombuffer(data, dtype=np.int16).squeeze()
80
+ await self.handler.receive((0, frame))
81
+ await asyncio.sleep(0) # Prevent busy waiting
82
+
83
+ async def play_loop(self) -> None:
84
+ while not self._stop_event.is_set():
85
+ data = await self.handler.emit()
86
+ if isinstance(data, AdditionalOutputs):
87
+ for msg in data.args:
88
+ content = msg.get("content", "")
89
+ logger.info(
90
+ "role=%s content=%s",
91
+ msg.get("role"),
92
+ content if len(content) < 500 else content[:500] + "…",
93
+ )
94
+
95
+ elif isinstance(data, tuple):
96
+ _, frame = data
97
+ self.player.push_sample(frame.tobytes())
98
+ else:
99
+ pass
100
+
101
+ await asyncio.sleep(0) # Prevent busy waiting
src/reachy_mini_conversation_demo/main.py CHANGED
@@ -3,8 +3,8 @@
3
  import os
4
 
5
  import gradio as gr
6
- from fastapi import FastAPI
7
- from fastrtc import Stream
8
 
9
  from reachy_mini import ReachyMini
10
  from reachy_mini_conversation_demo.moves import MovementManager
@@ -14,6 +14,7 @@ from reachy_mini_conversation_demo.utils import (
14
  setup_logger,
15
  handle_vision_stuff,
16
  )
 
17
  from reachy_mini_conversation_demo.openai_realtime import OpenaiRealtimeHandler
18
  from reachy_mini_conversation_demo.audio.head_wobbler import HeadWobbler
19
 
@@ -66,7 +67,9 @@ def main():
66
  logger.debug(f"Chatbot avatar images: {chatbot.avatar_images}")
67
 
68
  handler = OpenaiRealtimeHandler(deps)
69
- stream = Stream(
 
 
70
  handler=handler,
71
  mode="send-receive",
72
  modality="audio",
@@ -76,8 +79,8 @@ def main():
76
  ui_args={"title": "Talk with Reachy Mini"},
77
  )
78
 
79
- app = FastAPI()
80
- app = gr.mount_gradio_app(app, stream.ui, path="/")
81
 
82
  # Each async service → its own thread/loop
83
  movement_manager.start()
@@ -86,10 +89,11 @@ def main():
86
  camera_worker.start()
87
 
88
  try:
89
- stream.ui.launch()
 
90
  except KeyboardInterrupt:
91
  logger.info("Exiting...")
92
-
93
  finally:
94
  movement_manager.stop()
95
  head_wobbler.stop()
 
3
  import os
4
 
5
  import gradio as gr
6
+ import fastrtc
7
+
8
 
9
  from reachy_mini import ReachyMini
10
  from reachy_mini_conversation_demo.moves import MovementManager
 
14
  setup_logger,
15
  handle_vision_stuff,
16
  )
17
+ from reachy_mini_conversation_demo.console import LocalStream
18
  from reachy_mini_conversation_demo.openai_realtime import OpenaiRealtimeHandler
19
  from reachy_mini_conversation_demo.audio.head_wobbler import HeadWobbler
20
 
 
67
  logger.debug(f"Chatbot avatar images: {chatbot.avatar_images}")
68
 
69
  handler = OpenaiRealtimeHandler(deps)
70
+ local_stream = LocalStream(handler)
71
+
72
+ stream = fastrtc.Stream(
73
  handler=handler,
74
  mode="send-receive",
75
  modality="audio",
 
79
  ui_args={"title": "Talk with Reachy Mini"},
80
  )
81
 
82
+ # app = fastrtc.FastAPI()
83
+ # app = gr.mount_gradio_app(app, stream.ui, path="/")
84
 
85
  # Each async service → its own thread/loop
86
  movement_manager.start()
 
89
  camera_worker.start()
90
 
91
  try:
92
+ local_stream.start()
93
+ # stream.ui.launch()
94
  except KeyboardInterrupt:
95
  logger.info("Exiting...")
96
+ local_stream.stop()
97
  finally:
98
  movement_manager.stop()
99
  head_wobbler.stop()