Alina Lozovskaya commited on
Commit
13d2d0d
·
1 Parent(s): f8fa8de

Enhance typing

Browse files
src/reachy_mini_conversation_demo/dance_emotion_moves.py CHANGED
@@ -32,7 +32,7 @@ class DanceQueueMove(Move): # type: ignore[misc]
32
  """Duration property required by official Move interface."""
33
  return float(self.dance_move.duration)
34
 
35
- def evaluate(self, t: float) -> Tuple[NDArray[np.float32] | None, NDArray[np.float32] | None, float | None]:
36
  """Evaluate dance move at time t."""
37
  try:
38
  # Get the pose from the dance move
@@ -50,7 +50,7 @@ class DanceQueueMove(Move): # type: ignore[misc]
50
  from reachy_mini.utils import create_head_pose
51
 
52
  neutral_head_pose = create_head_pose(0, 0, 0, 0, 0, 0, degrees=True)
53
- return (neutral_head_pose, np.array([0.0, 0.0]), 0.0)
54
 
55
 
56
  class EmotionQueueMove(Move): # type: ignore[misc]
@@ -66,7 +66,7 @@ class EmotionQueueMove(Move): # type: ignore[misc]
66
  """Duration property required by official Move interface."""
67
  return float(self.emotion_move.duration)
68
 
69
- def evaluate(self, t: float) -> Tuple[NDArray[np.float32] | None, NDArray[np.float32] | None, float | None]:
70
  """Evaluate emotion move at time t."""
71
  try:
72
  # Get the pose from the emotion move
@@ -84,7 +84,7 @@ class EmotionQueueMove(Move): # type: ignore[misc]
84
  from reachy_mini.utils import create_head_pose
85
 
86
  neutral_head_pose = create_head_pose(0, 0, 0, 0, 0, 0, degrees=True)
87
- return (neutral_head_pose, np.array([0.0, 0.0]), 0.0)
88
 
89
 
90
  class GotoQueueMove(Move): # type: ignore[misc]
@@ -114,7 +114,7 @@ class GotoQueueMove(Move): # type: ignore[misc]
114
  """Duration property required by official Move interface."""
115
  return self._duration
116
 
117
- def evaluate(self, t: float) -> Tuple[NDArray[np.float32] | None, NDArray[np.float32] | None, float | None]:
118
  """Evaluate goto move at time t using linear interpolation."""
119
  try:
120
  from reachy_mini.utils import create_head_pose
@@ -138,6 +138,7 @@ class GotoQueueMove(Move): # type: ignore[misc]
138
  self.start_antennas[0] + (self.target_antennas[0] - self.start_antennas[0]) * t_clamped,
139
  self.start_antennas[1] + (self.target_antennas[1] - self.start_antennas[1]) * t_clamped,
140
  ],
 
141
  )
142
 
143
  # Interpolate body yaw
@@ -147,6 +148,7 @@ class GotoQueueMove(Move): # type: ignore[misc]
147
 
148
  except Exception as e:
149
  logger.error(f"Error evaluating goto move at t={t}: {e}")
150
- # Return target pose on error - convert antennas to numpy array
151
- target_antennas_array = np.array([self.target_antennas[0], self.target_antennas[1]])
152
- return (self.target_head_pose, target_antennas_array, self.target_body_yaw)
 
 
32
  """Duration property required by official Move interface."""
33
  return float(self.dance_move.duration)
34
 
35
+ def evaluate(self, t: float) -> tuple[NDArray[np.float64] | None, NDArray[np.float64] | None, float | None]:
36
  """Evaluate dance move at time t."""
37
  try:
38
  # Get the pose from the dance move
 
50
  from reachy_mini.utils import create_head_pose
51
 
52
  neutral_head_pose = create_head_pose(0, 0, 0, 0, 0, 0, degrees=True)
53
+ return (neutral_head_pose, np.array([0.0, 0.0], dtype=np.float64), 0.0)
54
 
55
 
56
  class EmotionQueueMove(Move): # type: ignore[misc]
 
66
  """Duration property required by official Move interface."""
67
  return float(self.emotion_move.duration)
68
 
69
+ def evaluate(self, t: float) -> tuple[NDArray[np.float64] | None, NDArray[np.float64] | None, float | None]:
70
  """Evaluate emotion move at time t."""
71
  try:
72
  # Get the pose from the emotion move
 
84
  from reachy_mini.utils import create_head_pose
85
 
86
  neutral_head_pose = create_head_pose(0, 0, 0, 0, 0, 0, degrees=True)
87
+ return (neutral_head_pose, np.array([0.0, 0.0], dtype=np.float64), 0.0)
88
 
89
 
90
  class GotoQueueMove(Move): # type: ignore[misc]
 
114
  """Duration property required by official Move interface."""
115
  return self._duration
116
 
117
+ def evaluate(self, t: float) -> tuple[NDArray[np.float64] | None, NDArray[np.float64] | None, float | None]:
118
  """Evaluate goto move at time t using linear interpolation."""
119
  try:
120
  from reachy_mini.utils import create_head_pose
 
138
  self.start_antennas[0] + (self.target_antennas[0] - self.start_antennas[0]) * t_clamped,
139
  self.start_antennas[1] + (self.target_antennas[1] - self.start_antennas[1]) * t_clamped,
140
  ],
141
+ dtype=np.float64,
142
  )
143
 
144
  # Interpolate body yaw
 
148
 
149
  except Exception as e:
150
  logger.error(f"Error evaluating goto move at t={t}: {e}")
151
+ # Return target pose on error - convert to float64
152
+ target_head_pose_f64 = self.target_head_pose.astype(np.float64)
153
+ target_antennas_array = np.array([self.target_antennas[0], self.target_antennas[1]], dtype=np.float64)
154
+ return (target_head_pose_f64, target_antennas_array, self.target_body_yaw)
src/reachy_mini_conversation_demo/moves.py CHANGED
@@ -97,7 +97,7 @@ class BreathingMove(Move): # type: ignore[misc]
97
  """Duration property required by official Move interface."""
98
  return float("inf") # Continuous breathing (never ends naturally)
99
 
100
- def evaluate(self, t: float) -> Tuple[NDArray[np.float32] | None, NDArray[np.float32] | None, float | None]:
101
  """Evaluate breathing move at time t."""
102
  if t < self.interpolation_duration:
103
  # Phase 1: Interpolate to neutral base position
@@ -112,7 +112,7 @@ class BreathingMove(Move): # type: ignore[misc]
112
  antennas_interp = (
113
  1 - interpolation_t
114
  ) * self.interpolation_start_antennas + interpolation_t * self.neutral_antennas
115
- antennas = antennas_interp.astype(np.float32)
116
 
117
  else:
118
  # Phase 2: Breathing patterns from neutral base
@@ -124,7 +124,7 @@ class BreathingMove(Move): # type: ignore[misc]
124
 
125
  # Antenna sway (opposite directions)
126
  antenna_sway = self.antenna_sway_amplitude * np.sin(2 * np.pi * self.antenna_frequency * breathing_time)
127
- antennas = np.array([antenna_sway, -antenna_sway], dtype=np.float32)
128
 
129
  # Return in official Move interface format: (head_pose, antennas_array, body_yaw)
130
  return (head_pose, antennas, 0.0)
 
97
  """Duration property required by official Move interface."""
98
  return float("inf") # Continuous breathing (never ends naturally)
99
 
100
+ def evaluate(self, t: float) -> tuple[NDArray[np.float64] | None, NDArray[np.float64] | None, float | None]:
101
  """Evaluate breathing move at time t."""
102
  if t < self.interpolation_duration:
103
  # Phase 1: Interpolate to neutral base position
 
112
  antennas_interp = (
113
  1 - interpolation_t
114
  ) * self.interpolation_start_antennas + interpolation_t * self.neutral_antennas
115
+ antennas = antennas_interp.astype(np.float64)
116
 
117
  else:
118
  # Phase 2: Breathing patterns from neutral base
 
124
 
125
  # Antenna sway (opposite directions)
126
  antenna_sway = self.antenna_sway_amplitude * np.sin(2 * np.pi * self.antenna_frequency * breathing_time)
127
+ antennas = np.array([antenna_sway, -antenna_sway], dtype=np.float64)
128
 
129
  # Return in official Move interface format: (head_pose, antennas_array, body_yaw)
130
  return (head_pose, antennas, 0.0)
src/reachy_mini_conversation_demo/vision/processors.py CHANGED
@@ -9,6 +9,7 @@ from dataclasses import dataclass
9
  import cv2
10
  import numpy as np
11
  import torch
 
12
  from transformers import AutoProcessor, AutoModelForImageTextToText
13
  from huggingface_hub import snapshot_download
14
 
@@ -90,7 +91,7 @@ class VisionProcessor:
90
 
91
  def process_image(
92
  self,
93
- cv2_image: np.ndarray[Any, Any],
94
  prompt: str = "Briefly describe what you see in one sentence.",
95
  ) -> str:
96
  """Process CV2 image and return description with retry logic."""
 
9
  import cv2
10
  import numpy as np
11
  import torch
12
+ from numpy.typing import NDArray
13
  from transformers import AutoProcessor, AutoModelForImageTextToText
14
  from huggingface_hub import snapshot_download
15
 
 
91
 
92
  def process_image(
93
  self,
94
+ cv2_image: NDArray[np.uint8],
95
  prompt: str = "Briefly describe what you see in one sentence.",
96
  ) -> str:
97
  """Process CV2 image and return description with retry logic."""
src/reachy_mini_conversation_demo/vision/yolo_head_tracker.py CHANGED
@@ -1,8 +1,9 @@
1
  from __future__ import annotations
2
  import logging
3
- from typing import Any, Tuple
4
 
5
  import numpy as np
 
6
 
7
 
8
  try:
@@ -84,7 +85,7 @@ class HeadTracker:
84
  best_idx = valid_indices[np.argmax(scores)]
85
  return int(best_idx)
86
 
87
- def _bbox_to_mp_coords(self, bbox: np.ndarray[Any, Any], w: int, h: int) -> np.ndarray[Any, Any]:
88
  """Convert bounding box center to MediaPipe-style coordinates [-1, 1].
89
 
90
  Args:
@@ -105,7 +106,7 @@ class HeadTracker:
105
 
106
  return np.array([norm_x, norm_y], dtype=np.float32)
107
 
108
- def get_head_position(self, img: np.ndarray[Any, Any]) -> Tuple[np.ndarray[Any, Any] | None, float | None]:
109
  """Get head position from face detection.
110
 
111
  Args:
 
1
  from __future__ import annotations
2
  import logging
3
+ from typing import Tuple
4
 
5
  import numpy as np
6
+ from numpy.typing import NDArray
7
 
8
 
9
  try:
 
85
  best_idx = valid_indices[np.argmax(scores)]
86
  return int(best_idx)
87
 
88
+ def _bbox_to_mp_coords(self, bbox: NDArray[np.float32], w: int, h: int) -> NDArray[np.float32]:
89
  """Convert bounding box center to MediaPipe-style coordinates [-1, 1].
90
 
91
  Args:
 
106
 
107
  return np.array([norm_x, norm_y], dtype=np.float32)
108
 
109
+ def get_head_position(self, img: NDArray[np.uint8]) -> Tuple[NDArray[np.float32] | None, float | None]:
110
  """Get head position from face detection.
111
 
112
  Args: