Alina Lozovskaya commited on
Commit
561574f
·
1 Parent(s): 15911c8

Convert VisionManager to threaded worker pattern

Browse files
src/reachy_mini_conversation_demo/vision/processors.py CHANGED
@@ -1,7 +1,6 @@
1
  import os
2
  import time
3
  import base64
4
- import asyncio
5
  import logging
6
  import threading
7
  from typing import Any, Dict, Optional
@@ -214,44 +213,57 @@ class VisionManager:
214
  self.processor = VisionProcessor(self.vision_config)
215
 
216
  self._last_processed_time = 0
 
 
217
 
218
  # Initialize processor
219
  if not self.processor.initialize():
220
  logger.error("Failed to initialize vision processor")
221
  raise RuntimeError("Vision processor initialization failed")
222
 
223
- async def enable(self, stop_event: threading.Event):
 
 
 
 
 
 
 
 
 
 
 
 
 
 
224
  """Vision processing loop (runs in separate thread)."""
225
- while not stop_event.is_set():
226
  try:
227
  current_time = time.time()
228
 
229
  if current_time - self._last_processed_time >= self.vision_interval:
230
  frame = self.camera.get_latest_frame()
231
  if frame is not None:
232
- description = await asyncio.to_thread(
233
- lambda: self.processor.process_image(
234
- frame, "Briefly describe what you see in one sentence."
235
- )
236
  )
237
 
238
  # Only update if we got a valid response
239
  if description and not description.startswith(("Vision", "Failed", "Error")):
240
  self._last_processed_time = current_time
241
-
242
- logger.info(f"Vision update: {description}")
243
  else:
244
  logger.warning(f"Invalid vision response: {description}")
245
 
246
- await asyncio.sleep(1.0) # Check every second
247
 
248
  except Exception:
249
  logger.exception("Vision processing loop error")
250
- await asyncio.sleep(5.0) # Longer sleep on error
251
 
252
  logger.info("Vision loop finished")
253
 
254
- async def get_status(self) -> Dict[str, Any]:
255
  """Get comprehensive status information."""
256
  return {
257
  "last_processed": self._last_processed_time,
 
1
  import os
2
  import time
3
  import base64
 
4
  import logging
5
  import threading
6
  from typing import Any, Dict, Optional
 
213
  self.processor = VisionProcessor(self.vision_config)
214
 
215
  self._last_processed_time = 0
216
+ self._stop_event = threading.Event()
217
+ self._thread: Optional[threading.Thread] = None
218
 
219
  # Initialize processor
220
  if not self.processor.initialize():
221
  logger.error("Failed to initialize vision processor")
222
  raise RuntimeError("Vision processor initialization failed")
223
 
224
+ def start(self) -> None:
225
+ """Start the vision processing loop in a thread."""
226
+ self._stop_event.clear()
227
+ self._thread = threading.Thread(target=self._working_loop, daemon=True)
228
+ self._thread.start()
229
+ logger.info("Local vision processing started")
230
+
231
+ def stop(self) -> None:
232
+ """Stop the vision processing loop."""
233
+ self._stop_event.set()
234
+ if self._thread is not None:
235
+ self._thread.join()
236
+ logger.info("Local vision processing stopped")
237
+
238
+ def _working_loop(self) -> None:
239
  """Vision processing loop (runs in separate thread)."""
240
+ while not self._stop_event.is_set():
241
  try:
242
  current_time = time.time()
243
 
244
  if current_time - self._last_processed_time >= self.vision_interval:
245
  frame = self.camera.get_latest_frame()
246
  if frame is not None:
247
+ description = self.processor.process_image(
248
+ frame, "Briefly describe what you see in one sentence."
 
 
249
  )
250
 
251
  # Only update if we got a valid response
252
  if description and not description.startswith(("Vision", "Failed", "Error")):
253
  self._last_processed_time = current_time
254
+ logger.debug(f"Vision update: {description}")
 
255
  else:
256
  logger.warning(f"Invalid vision response: {description}")
257
 
258
+ time.sleep(1.0) # Check every second
259
 
260
  except Exception:
261
  logger.exception("Vision processing loop error")
262
+ time.sleep(5.0) # Longer sleep on error
263
 
264
  logger.info("Vision loop finished")
265
 
266
+ def get_status(self) -> Dict[str, Any]:
267
  """Get comprehensive status information."""
268
  return {
269
  "last_processed": self._last_processed_time,