Spaces:
Running
on
CPU Upgrade
Running
on
CPU Upgrade
| import logging, faulthandler, sys, time, os, requests, zipfile, io, gc, threading, psutil, json, configargparse | |
| faulthandler.enable() | |
| logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') | |
| logger = logging.getLogger("jax_ik_server") | |
| def _excepthook(t, v, tb): | |
| logger.exception("Uncaught exception", exc_info=(t, v, tb)) | |
| sys.excepthook = _excepthook | |
| # Environment (CPU only) | |
| os.environ['JAX_PLATFORMS'] = 'cpu' | |
| os.environ['CUDA_VISIBLE_DEVICES'] = '' | |
| import jax | |
| jax.config.update("jax_default_device", "cpu") | |
| import numpy as np | |
| from fastapi import FastAPI, Request, Response | |
| from fastapi.staticfiles import StaticFiles | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from starlette.responses import FileResponse, JSONResponse | |
| from jax_ik.helper import deform_mesh, load_mesh_data_from_gltf, load_mesh_data_from_urdf | |
| from jax_ik.hand_specification import HandSpecification | |
| from jax_ik.smplx_statics import left_arm_bounds_dict, right_arm_bounds_dict, complete_full_body_bounds_dict | |
| from jax_ik.ik import InverseKinematicsSolver | |
| from jax_ik.objectives import ( | |
| BoneZeroRotationObj, | |
| CombinedDerivativeObj, | |
| DistanceObjTraj, | |
| SphereCollisionPenaltyObjTraj, | |
| ) | |
| try: | |
| from collision_objectives import FastSphereCollisionPenaltyObjTraj as _FastColl | |
| except Exception: # fallback if file missing | |
| _FastColl = SphereCollisionPenaltyObjTraj | |
| def download_and_setup_files(): | |
| os.makedirs("files", exist_ok=True) | |
| # Pepper | |
| pepper_dir = "files/pepper_description-master" | |
| if not os.path.isdir(pepper_dir): | |
| logger.info("Downloading Pepper model...") | |
| try: | |
| r = requests.get(os.environ.get("PEPPER_DOWNLOAD"), stream=True) | |
| r.raise_for_status() | |
| with zipfile.ZipFile(io.BytesIO(r.content)) as z: | |
| z.extractall("files/") | |
| except Exception as e: | |
| logger.warning(f"Pepper download failed: {e}") | |
| # SMPLX | |
| smplx_file = "files/smplx.glb" | |
| if not os.path.isfile(smplx_file): | |
| logger.info("Downloading SMPLX model...") | |
| try: | |
| r = requests.get(os.environ.get("SMPLX_DOWNLOAD")) | |
| r.raise_for_status() | |
| open(smplx_file, "wb").write(r.content) | |
| except Exception as e: | |
| logger.warning(f"SMPLX download failed: {e}") | |
| class IKServer: | |
| def __init__(self, args): | |
| self.args = args | |
| self.solve_lock = threading.Lock() | |
| self.config_lock = threading.Lock() | |
| self.process = psutil.Process(os.getpid()) | |
| # Failure / resilience counters | |
| self.agent_fail_count = 0 | |
| self.urdf_fail_count = 0 | |
| self.last_agent_error = None | |
| self.last_urdf_error = None | |
| self.agent_solve_counter = 0 | |
| self.urdf_solve_counter = 0 | |
| self.max_fail_retries = 2 # total attempts (initial + retries) | |
| self.circuit_breaker_threshold = 5 # open after N consecutive failures | |
| self.circuit_open_until = 0 # epoch time until which solves short-circuit | |
| self.circuit_backoff_seconds = 5 | |
| # Numeric safety | |
| self.max_num_steps_global = 20000 | |
| self.max_learning_rate = 5.0 | |
| # Animation buffers | |
| self.animation_frames_agent = [] | |
| self.animation_frames_urdf = [] | |
| # FastAPI app | |
| self.app = FastAPI() | |
| self.app.add_middleware( | |
| CORSMiddleware, | |
| allow_origins=["*"], allow_credentials=True, | |
| allow_methods=["*"], allow_headers=["*"], | |
| ) | |
| os.makedirs("static", exist_ok=True) | |
| os.makedirs("files", exist_ok=True) | |
| self.app.mount("/static", StaticFiles(directory="static"), name="static") | |
| self.app.mount("/files", StaticFiles(directory="files"), name="files") | |
| # Initialize models | |
| self._init_agent() | |
| self._setup_agent_objectives() | |
| self._init_urdf() | |
| self._setup_urdf_objectives() | |
| # Cleanup timing | |
| self.last_cleanup_time = time.time() | |
| self.cleanup_interval = 30 | |
| self._register_routes() | |
| logger.info("Server ready.") | |
| if getattr(self.args, 'warmup', True): | |
| threading.Thread(target=self._warmup_all, daemon=True).start() | |
| # ---------- Utility / Safety ---------- | |
| def _safe_int(self, v, default, lo=None, hi=None): | |
| try: | |
| v = int(v) | |
| except Exception: | |
| v = default | |
| if lo is not None and v < lo: v = lo | |
| if hi is not None and v > hi: v = hi | |
| return v | |
| def _safe_float(self, v, default, lo=None, hi=None): | |
| try: | |
| v = float(v) | |
| if np.isnan(v) or np.isinf(v): raise ValueError | |
| except Exception: | |
| v = default | |
| if lo is not None and v < lo: v = lo | |
| if hi is not None and v > hi: v = hi | |
| return v | |
| def _sanitize_num_steps(self, steps): | |
| return self._safe_int(steps, self.args.num_steps, 1, self.max_num_steps_global) | |
| def _sanitize_bones(self, bones, is_urdf=False): | |
| if not isinstance(bones, (list, tuple)): | |
| return [] | |
| allowed = self.urdf_available_bones if is_urdf else self.available_bones | |
| return [b for b in bones if b in allowed] | |
| def _create_solver(self, bones, is_urdf, num_steps): | |
| if is_urdf: | |
| return InverseKinematicsSolver( | |
| model_file=self.urdf_file, | |
| controlled_bones=bones, | |
| bounds=None, | |
| threshold=self.args.threshold, | |
| num_steps=int(num_steps), | |
| compute_sdf=False, | |
| ) | |
| bounds = [] | |
| for b in bones: | |
| if b in self.bounds_dict: | |
| lower, upper = self.bounds_dict[b] | |
| bounds.extend(list(zip(lower, upper))) | |
| else: | |
| bounds.extend([(-90, 90)] * 3) | |
| return InverseKinematicsSolver( | |
| model_file=self.args.gltf_file, | |
| controlled_bones=bones, | |
| bounds=bounds, | |
| threshold=self.args.threshold, | |
| num_steps=int(num_steps), | |
| compute_sdf=False, | |
| ) | |
| def _rebuild_solver_safe(self, is_urdf=False, force_defaults=False): | |
| try: | |
| if is_urdf: | |
| bones = self.urdf_default_controlled_bones if force_defaults else self.urdf_current_controlled_bones | |
| bones = self._sanitize_bones(bones, True) or self.urdf_default_controlled_bones | |
| self.urdf_solver = self._create_solver(bones, True, self.urdf_current_num_steps) | |
| self.urdf_initial_rotations = np.zeros(len(self.urdf_solver.controlled_bones)*3, dtype=np.float32) | |
| self.urdf_best_angles = self.urdf_initial_rotations.copy() | |
| else: | |
| bones = self.default_controlled_bones if force_defaults else self.current_controlled_bones | |
| bones = self._sanitize_bones(bones, False) or self.default_controlled_bones | |
| self.solver = self._create_solver(bones, False, self.current_num_steps) | |
| self.initial_rotations = np.zeros(len(self.solver.controlled_bones)*3, dtype=np.float32) | |
| self.best_angles = self.initial_rotations.copy() | |
| return True | |
| except Exception as e: | |
| (self.last_urdf_error if is_urdf else setattr(self, 'last_agent_error', str(e))) | |
| logger.exception("Solver rebuild failed") | |
| return False | |
| def _attempt_solver_recovery(self, is_urdf=False): | |
| logger.warning("Attempting full solver recovery (reset to defaults)") | |
| if is_urdf: | |
| self.urdf_current_controlled_bones = self.urdf_default_controlled_bones.copy() | |
| return self._rebuild_solver_safe(True, force_defaults=True) | |
| else: | |
| self.current_controlled_bones = self.default_controlled_bones.copy() | |
| return self._rebuild_solver_safe(False, force_defaults=True) | |
| # ---------- Warmup ---------- | |
| def _warmup_agent(self): | |
| try: | |
| mand = [DistanceObjTraj(target_points=np.array([0.0,0.2,0.35]), bone_name=self.current_end_effector, use_head=True, weight=1.0)] | |
| opt = [BoneZeroRotationObj(weight=0.01)] | |
| _ = self.solver.solve(initial_rotations=self.initial_rotations, learning_rate=self.args.learning_rate, | |
| mandatory_objective_functions=tuple(mand), optional_objective_functions=tuple(opt), | |
| ik_points=1, verbose=False,patience=1,) | |
| logger.info("Agent warmup complete") | |
| except Exception as e: | |
| logger.warning(f"Agent warmup failed: {e}") | |
| def _warmup_urdf(self): | |
| try: | |
| mand = [DistanceObjTraj(target_points=np.array([0.3,0.3,0.35]), bone_name=self.urdf_current_end_effector, use_head=True, weight=1.0)] | |
| opt = [BoneZeroRotationObj(weight=0.01)] | |
| _ = self.urdf_solver.solve(initial_rotations=self.urdf_initial_rotations, learning_rate=self.args.learning_rate, | |
| mandatory_objective_functions=tuple(mand), optional_objective_functions=tuple(opt), | |
| ik_points=1, verbose=False,patience=1) | |
| logger.info("URDF warmup complete") | |
| except Exception as e: | |
| logger.warning(f"URDF warmup failed: {e}") | |
| def _warmup_all(self): | |
| t0 = time.time() | |
| logger.info("Starting background warmup...") | |
| self._warmup_agent() | |
| self._warmup_urdf() | |
| logger.info(f"Warmup finished in {time.time()-t0:.2f}s") | |
| # ---------- Initialization ---------- | |
| def _init_agent(self): | |
| self.current_num_steps = self._sanitize_num_steps(self.args.num_steps) | |
| basic = InverseKinematicsSolver( | |
| model_file=self.args.gltf_file, | |
| controlled_bones=["left_collar"], | |
| bounds=[(-90,90)]*3, | |
| threshold=self.args.threshold, | |
| num_steps=self.current_num_steps, | |
| compute_sdf=False, | |
| ) | |
| self.available_bones = basic.fk_solver.bone_names | |
| self.bounds_dict = complete_full_body_bounds_dict | |
| if self.args.hand == "left": | |
| self.default_controlled_bones = list(left_arm_bounds_dict.keys())#["left_collar","left_shoulder","left_elbow","left_wrist"] | |
| self.default_end_effector = "left_index3" | |
| else: | |
| self.default_controlled_bones = list(right_arm_bounds_dict.keys()) | |
| self.default_end_effector = "right_index3" | |
| self.selectable_bones = [b for b in self.available_bones if b in self.bounds_dict] | |
| self.current_controlled_bones = self.default_controlled_bones.copy() | |
| self.current_end_effector = self.default_end_effector | |
| # Direct solver creation (no cache) | |
| self.solver = self._create_solver(self.current_controlled_bones, False, self.current_num_steps) | |
| self.initial_rotations = np.zeros(len(self.solver.controlled_bones) * 3, dtype=np.float32) | |
| self.best_angles = self.initial_rotations.copy() | |
| self.mesh_data = load_mesh_data_from_gltf(self.args.gltf_file, self.solver.fk_solver) | |
| self.animation_frames_agent = self._frames_from_angles([self.initial_rotations], False) | |
| def _init_urdf(self): | |
| self.urdf_current_num_steps = self._sanitize_num_steps(self.args.num_steps) | |
| self.urdf_file = "files/pepper_description-master/urdf/pepper.urdf" | |
| basic = InverseKinematicsSolver( | |
| model_file=self.urdf_file, | |
| controlled_bones=["LShoulder"], | |
| bounds=None, | |
| threshold=self.args.threshold, | |
| num_steps=self.urdf_current_num_steps, | |
| compute_sdf=False, | |
| ) | |
| self.urdf_available_bones = basic.fk_solver.bone_names | |
| self.urdf_default_controlled_bones = ["LShoulder","LBicep","LElbow","LForeArm","l_wrist"] | |
| self.urdf_default_end_effector = "LFinger13_link" | |
| self.urdf_selectable_bones = list(self.urdf_available_bones) | |
| self.urdf_current_controlled_bones = self.urdf_default_controlled_bones.copy() | |
| self.urdf_current_end_effector = self.urdf_default_end_effector | |
| self.urdf_solver = self._create_solver(self.urdf_current_controlled_bones, True, self.urdf_current_num_steps) | |
| self.urdf_initial_rotations = np.zeros(len(self.urdf_solver.controlled_bones) * 3, dtype=np.float32) | |
| self.urdf_best_angles = self.urdf_initial_rotations.copy() | |
| self.urdf_mesh_data = load_mesh_data_from_urdf(self.urdf_file, self.urdf_solver.fk_solver) | |
| self.animation_frames_urdf = self._frames_from_angles([self.urdf_initial_rotations], True) | |
| # ---------- Objectives ---------- | |
| def _setup_agent_objectives(self): | |
| self.distance_obj = DistanceObjTraj( | |
| target_points=np.array([0.0,0.2,0.35]), | |
| bone_name=self.current_end_effector, | |
| use_head=True, | |
| weight=1.0, | |
| ) | |
| use_fast = os.environ.get("USE_FAST_COLLISION", "1") != "0" | |
| CollCls = _FastColl if use_fast else SphereCollisionPenaltyObjTraj | |
| self.collision_obj = CollCls( | |
| {"center":[0.1,0.0,0.35],"radius":0.1}, | |
| min_clearance=0.0, | |
| weight=1.0, | |
| ) | |
| # store defaults for config exposure | |
| self.collision_default_center = [0.1,0.0,0.35] | |
| self.collision_default_radius = 0.1 | |
| self.collision_default_min_clearance = 0.0 | |
| def _setup_urdf_objectives(self): | |
| self.urdf_distance_obj = DistanceObjTraj( | |
| target_points=np.array([0.3,0.3,0.35]), | |
| bone_name=self.urdf_current_end_effector, | |
| use_head=True, | |
| weight=1.0, | |
| ) | |
| use_fast = os.environ.get("USE_FAST_COLLISION", "1") != "0" | |
| CollCls = _FastColl if use_fast else SphereCollisionPenaltyObjTraj | |
| self.urdf_collision_obj = CollCls( | |
| {"center":[0.2,0.0,0.35],"radius":0.1}, | |
| min_clearance=0.0, | |
| weight=1.0, | |
| ) | |
| self.urdf_collision_default_center = [0.2,0.0,0.35] | |
| self.urdf_collision_default_radius = 0.1 | |
| self.urdf_collision_default_min_clearance = 0.0 | |
| # ---------- Build objectives per request ---------- | |
| def _build_agent_objectives(self, payload): | |
| tgt = np.array(payload.get("target",[0.0,0.2,0.35])) | |
| self.distance_obj.update_params({"bone_name": self.current_end_effector, "target_points": tgt, "weight": float(payload.get("distance_weight",1.0))}) | |
| # collision updates (weight, center, radius, min_clearance) | |
| collision_enabled = payload.get("collision_enabled", False) | |
| coll_update = {"weight": float(payload.get("collision_weight",1.0)) if collision_enabled else 0.0} | |
| center = payload.get("collision_center") | |
| if isinstance(center, (list, tuple)) and len(center)==3: | |
| coll_update["center"] = center | |
| radius = payload.get("collision_radius") | |
| if radius is not None: | |
| coll_update["radius"] = float(radius) | |
| if "collision_min_clearance" in payload: | |
| coll_update["min_clearance"] = float(payload.get("collision_min_clearance", 0.0)) | |
| # Always update params so compiled graph stays stable | |
| self.collision_obj.update_params(coll_update) | |
| subpoints = self._safe_int(payload.get("subpoints",1), 1, 1, 100) | |
| mandatory, optional = [], [] | |
| if payload.get("distance_enabled", True): mandatory.append(self.distance_obj) | |
| # Always include collision objective to avoid JIT retrace when toggling | |
| optional.append(self.collision_obj) | |
| if payload.get("bone_zero_enabled", True): | |
| optional.append(BoneZeroRotationObj(weight=float(payload.get("bone_zero_weight",0.05)))) | |
| if payload.get("derivative_enabled", True) and subpoints > 1: | |
| optional.append(CombinedDerivativeObj(max_order=3, weights=[float(payload.get("derivative_weight",0.05))]*3)) | |
| elif not payload.get("bone_zero_enabled", True) and not payload.get("derivative_enabled", True): | |
| optional.append(BoneZeroRotationObj(weight=0.01)) | |
| # Hand specification | |
| hand_shape = payload.get("hand_shape","None") | |
| hand_position = payload.get("hand_position","None") | |
| params = { | |
| "is_pointing": hand_shape=="Pointing", | |
| "is_shaping": hand_shape=="Shaping", | |
| "is_flat": hand_shape=="Flat", | |
| "look_forward": hand_position=="Look Forward", | |
| "look_45_up": hand_position=="Look 45° Up", | |
| "look_45_down": hand_position=="Look 45° Down", | |
| "look_up": hand_position=="Look Up", | |
| "look_down": hand_position=="Look Down", | |
| "look_45_x_downwards": hand_position=="Look 45° X Downwards", | |
| "look_45_x_upwards": hand_position=="Look 45° X Upwards", | |
| "look_x_inward": hand_position=="Look X Inward", | |
| "look_to_body": hand_position=="Look to Body", | |
| "arm_down": hand_position=="Arm Down", | |
| "arm_45_down": hand_position=="Arm 45° Down", | |
| "arm_flat": hand_position=="Arm Flat", | |
| } | |
| if any(params.values()): | |
| spec = HandSpecification(**params) | |
| optional.extend(spec.get_objectives( | |
| left_hand=self.args.hand=="left", | |
| controlled_bones=self.current_controlled_bones, | |
| full_trajectory=subpoints>1, | |
| last_position=True, | |
| weight=0.5, | |
| )) | |
| return mandatory, optional, subpoints | |
| def _build_urdf_objectives(self, payload): | |
| tgt = np.array(payload.get("target",[0.3,0.3,0.35])) | |
| self.urdf_distance_obj.update_params({"bone_name": self.urdf_current_end_effector, "target_points": tgt, "weight": float(payload.get("distance_weight",1.0))}) | |
| collision_enabled = payload.get("collision_enabled", False) | |
| coll_update = {"weight": float(payload.get("collision_weight",1.0)) if collision_enabled else 0.0} | |
| center = payload.get("collision_center") | |
| if isinstance(center, (list, tuple)) and len(center)==3: | |
| coll_update["center"] = center | |
| radius = payload.get("collision_radius") | |
| if radius is not None: | |
| coll_update["radius"] = float(radius) | |
| if "collision_min_clearance" in payload: | |
| coll_update["min_clearance"] = float(payload.get("collision_min_clearance", 0.0)) | |
| self.urdf_collision_obj.update_params(coll_update) | |
| subpoints = self._safe_int(payload.get("subpoints",1),1,1,100) | |
| mandatory, optional = [], [] | |
| if payload.get("distance_enabled", True): mandatory.append(self.urdf_distance_obj) | |
| # Always include collision objective with weight possibly zero | |
| optional.append(self.urdf_collision_obj) | |
| if payload.get("bone_zero_enabled", True): | |
| optional.append(BoneZeroRotationObj(weight=float(payload.get("bone_zero_weight",0.05)))) | |
| if payload.get("derivative_enabled", True) and subpoints > 1: | |
| optional.append(CombinedDerivativeObj(max_order=3, weights=[float(payload.get("derivative_weight",0.05))]*3)) | |
| elif not payload.get("bone_zero_enabled", True) and not payload.get("derivative_enabled", True): | |
| optional.append(BoneZeroRotationObj(weight=0.01)) | |
| return mandatory, optional, subpoints | |
| # ---------- Frames ---------- | |
| def _frames_from_angles(self, angles_seq, is_urdf): | |
| frames = [] | |
| for ang in angles_seq: | |
| try: | |
| if is_urdf: | |
| verts = deform_mesh(ang, self.urdf_solver.fk_solver, self.urdf_mesh_data) | |
| faces = self.urdf_mesh_data["faces"] | |
| else: | |
| verts = deform_mesh(ang, self.solver.fk_solver, self.mesh_data) | |
| faces = self.mesh_data["faces"] | |
| frames.append({"vertices": verts.tolist(), "faces": faces.tolist()}) | |
| except Exception as e: | |
| logger.warning(f"Mesh deformation failed: {e}") | |
| return frames | |
| # ---------- Configuration ---------- | |
| def configure_agent(self, bones, eff, num_steps=None): | |
| with self.config_lock: | |
| if num_steps is None: | |
| num_steps = self.current_num_steps | |
| num_steps = self._sanitize_num_steps(num_steps) | |
| bones = self._sanitize_bones(bones, False) or self.default_controlled_bones | |
| if eff not in getattr(self, 'available_bones', []): | |
| eff = self.default_end_effector | |
| changed = (bones != self.current_controlled_bones or eff != self.current_end_effector or int(num_steps) != int(self.current_num_steps)) | |
| if changed: | |
| try: | |
| self.current_controlled_bones = bones | |
| self.current_end_effector = eff | |
| self.current_num_steps = int(num_steps) | |
| self.solver = self._create_solver(bones, False, self.current_num_steps) | |
| self.initial_rotations = np.zeros(len(self.solver.controlled_bones)*3, dtype=np.float32) | |
| self.best_angles = self.initial_rotations.copy() | |
| self._setup_agent_objectives() | |
| except Exception as e: | |
| self.agent_fail_count += 1 | |
| self.last_agent_error = str(e) | |
| logger.exception("configure_agent failed; attempting default recovery") | |
| if not self._attempt_solver_recovery(False): | |
| raise | |
| return {"controlled_bones": self.current_controlled_bones, "end_effector": self.current_end_effector, "num_steps": self.current_num_steps} | |
| def configure_urdf(self, bones, eff, num_steps=None): | |
| with self.config_lock: | |
| if num_steps is None: | |
| num_steps = self.urdf_current_num_steps | |
| num_steps = self._sanitize_num_steps(num_steps) | |
| bones = self._sanitize_bones(bones, True) or self.urdf_default_controlled_bones | |
| if eff not in getattr(self, 'urdf_available_bones', []): | |
| eff = self.urdf_default_end_effector | |
| changed = (bones != self.urdf_current_controlled_bones or eff != self.urdf_current_end_effector or int(num_steps) != int(self.urdf_current_num_steps)) | |
| if changed: | |
| try: | |
| self.urdf_current_controlled_bones = bones | |
| self.urdf_current_end_effector = eff | |
| self.urdf_current_num_steps = int(num_steps) | |
| self.urdf_solver = self._create_solver(bones, True, self.urdf_current_num_steps) | |
| self.urdf_initial_rotations = np.zeros(len(self.urdf_solver.controlled_bones)*3, dtype=np.float32) | |
| self.urdf_best_angles = self.urdf_initial_rotations.copy() | |
| self._setup_urdf_objectives() | |
| except Exception as e: | |
| self.urdf_fail_count += 1 | |
| self.last_urdf_error = str(e) | |
| logger.exception("configure_urdf failed; attempting default recovery") | |
| if not self._attempt_solver_recovery(True): | |
| raise | |
| return {"controlled_bones": self.urdf_current_controlled_bones, "end_effector": self.urdf_current_end_effector, "num_steps": self.urdf_current_num_steps} | |
| # ---------- Solve (hardened) ---------- | |
| def _solve_core(self, solver, init, mand, opt, subpoints, lr): | |
| return solver.solve( | |
| initial_rotations=init, | |
| learning_rate=lr, | |
| mandatory_objective_functions=tuple(mand), | |
| optional_objective_functions=tuple(opt), | |
| ik_points=subpoints, | |
| verbose=False, | |
| patience=100, | |
| ) | |
| def solve_agent(self, payload, last_only=False): | |
| if time.time() < self.circuit_open_until: | |
| return {"status":"circuit_open","error":"agent circuit open","frames":len(self.animation_frames_agent)} | |
| mand, opt, sub = self._build_agent_objectives(payload) | |
| lr = self._safe_float(payload.get("learning_rate", self.args.learning_rate), self.args.learning_rate, 1e-5, self.max_learning_rate) | |
| attempts = 0 | |
| best_angles = None; obj_val = float('inf'); steps = 0; error_msg=None; recovered=False | |
| start=time.time() | |
| while attempts < self.max_fail_retries: | |
| attempts += 1 | |
| try: | |
| best_angles, obj_val, steps = self._solve_core(self.solver, self.initial_rotations, mand, opt, sub, lr) | |
| break | |
| except Exception as e: | |
| self.agent_fail_count += 1 | |
| error_msg = str(e) | |
| self.last_agent_error = error_msg | |
| logger.warning(f"Agent solve attempt {attempts} failed: {e}") | |
| if attempts < self.max_fail_retries: | |
| # First retry: rebuild current; second: full recovery | |
| if attempts == 1: | |
| self._rebuild_solver_safe(False, force_defaults=False) | |
| else: | |
| recovered = self._attempt_solver_recovery(False) | |
| else: | |
| break | |
| if best_angles is None: | |
| # Provide at least one frame (initial) | |
| frame_angles = [self.initial_rotations] | |
| if recovered: | |
| frame_angles = [self.initial_rotations] | |
| else: | |
| self.best_angles = best_angles[-1].copy() | |
| self.initial_rotations = self.best_angles.copy() | |
| frame_angles = [best_angles[-1]] if last_only else best_angles | |
| self.animation_frames_agent = self._frames_from_angles(frame_angles, False) | |
| self.agent_solve_counter += 1 | |
| # Circuit breaker update | |
| if best_angles is None and self.agent_fail_count >= self.circuit_breaker_threshold: | |
| self.circuit_open_until = time.time() + self.circuit_backoff_seconds | |
| status = "ok" if best_angles is not None else ("recovered" if recovered else "failed") | |
| return { | |
| "status":status, | |
| "solve_time": time.time()-start, | |
| "iterations": steps, | |
| "objective": obj_val, | |
| "frames": len(self.animation_frames_agent), | |
| "solve_id": self.agent_solve_counter, | |
| "attempts": attempts, | |
| "error": error_msg, | |
| } | |
| def solve_urdf(self, payload, last_only=False): | |
| if time.time() < self.circuit_open_until: | |
| return {"status":"circuit_open","error":"urdf circuit open","frames":len(self.animation_frames_urdf)} | |
| mand, opt, sub = self._build_urdf_objectives(payload) | |
| lr = self._safe_float(payload.get("learning_rate", self.args.learning_rate), self.args.learning_rate, 1e-5, self.max_learning_rate) | |
| attempts=0 | |
| best_angles=None; obj_val=float('inf'); steps=0; error_msg=None; recovered=False | |
| start=time.time() | |
| while attempts < self.max_fail_retries: | |
| attempts += 1 | |
| try: | |
| best_angles, obj_val, steps = self._solve_core(self.urdf_solver, self.urdf_initial_rotations, mand, opt, sub, lr) | |
| break | |
| except Exception as e: | |
| self.urdf_fail_count += 1 | |
| error_msg = str(e) | |
| self.last_urdf_error = error_msg | |
| logger.warning(f"URDF solve attempt {attempts} failed: {e}") | |
| if attempts < self.max_fail_retries: | |
| if attempts == 1: | |
| self._rebuild_solver_safe(True, force_defaults=False) | |
| else: | |
| recovered = self._attempt_solver_recovery(True) | |
| else: | |
| break | |
| if best_angles is None: | |
| frame_angles = [self.urdf_initial_rotations] | |
| if recovered: | |
| frame_angles = [self.urdf_initial_rotations] | |
| else: | |
| self.urdf_best_angles = best_angles[-1].copy() | |
| self.urdf_initial_rotations = self.urdf_best_angles.copy() | |
| frame_angles = [best_angles[-1]] if last_only else best_angles | |
| self.animation_frames_urdf = self._frames_from_angles(frame_angles, True) | |
| self.urdf_solve_counter += 1 | |
| if best_angles is None and self.urdf_fail_count >= self.circuit_breaker_threshold: | |
| self.circuit_open_until = time.time() + self.circuit_backoff_seconds | |
| status = "ok" if best_angles is not None else ("recovered" if recovered else "failed") | |
| return { | |
| "status":status, | |
| "solve_time": time.time()-start, | |
| "iterations": steps, | |
| "objective": obj_val, | |
| "frames": len(self.animation_frames_urdf), | |
| "solve_id": self.urdf_solve_counter, | |
| "attempts": attempts, | |
| "error": error_msg, | |
| } | |
| # ---------- Housekeeping ---------- | |
| def _cleanup(self): | |
| now = time.time() | |
| if now - self.last_cleanup_time < self.cleanup_interval: | |
| return | |
| try: | |
| gc.collect() | |
| # Removed cache invalidation logic (no caches anymore) | |
| except Exception: | |
| pass | |
| self.last_cleanup_time = now | |
| # ---------- API ---------- | |
| def _register_routes(self): | |
| def index(): | |
| return FileResponse("static/index.html") | |
| def legacy(): | |
| return FileResponse("static/index.html") | |
| def animation(request: Request): | |
| model = request.query_params.get("model","agent").lower() | |
| data = self.animation_frames_urdf if model == "pepper" else self.animation_frames_agent | |
| return Response(content=json.dumps(data), media_type="application/json") | |
| def cfg(model: str = "agent"): | |
| if model.lower() == "pepper": | |
| return { | |
| "model":"pepper", | |
| "available_bones": self.urdf_available_bones, | |
| "selectable_bones": self.urdf_selectable_bones, | |
| "default_controlled_bones": self.urdf_current_controlled_bones, | |
| "default_end_effector": self.urdf_current_end_effector, | |
| "end_effector_choices": self.urdf_available_bones, | |
| "hand_shapes": [], | |
| "hand_positions": [], | |
| "max_subpoints": 20, | |
| "default_num_steps": self.urdf_current_num_steps, | |
| "collision_default_center": getattr(self, 'urdf_collision_default_center', [0.2,0.0,0.35]), | |
| "collision_default_radius": getattr(self, 'urdf_collision_default_radius', 0.1), | |
| "collision_default_min_clearance": getattr(self, 'urdf_collision_default_min_clearance', 0.0), | |
| } | |
| return { | |
| "model":"agent", | |
| "available_bones": self.available_bones, | |
| "selectable_bones": self.selectable_bones, | |
| "default_controlled_bones": self.current_controlled_bones, | |
| "default_end_effector": self.current_end_effector, | |
| "end_effector_choices": self.available_bones, | |
| "hand_shapes":["None","Pointing","Shaping","Flat"], | |
| "hand_positions":[ | |
| "None","Look Forward","Look 45° Up","Look 45° Down","Look Up","Look Down", | |
| "Look 45° X Downwards","Look 45° X Upwards","Look X Inward","Look to Body", | |
| "Arm Down","Arm 45° Down","Arm Flat" | |
| ], | |
| "max_subpoints": 20, | |
| "default_num_steps": self.current_num_steps, | |
| "collision_default_center": getattr(self, 'collision_default_center', [0.1,0.0,0.35]), | |
| "collision_default_radius": getattr(self, 'collision_default_radius', 0.1), | |
| "collision_default_min_clearance": getattr(self, 'collision_default_min_clearance', 0.0), | |
| } | |
| async def configure(request: Request): | |
| payload = await request.json() | |
| model = payload.get("model","agent").lower() | |
| num_steps = self._sanitize_num_steps(payload.get("num_steps", self.args.num_steps)) | |
| try: | |
| if model == "pepper": | |
| cfg = self.configure_urdf(payload.get("controlled_bones", []), payload.get("end_effector"), num_steps=num_steps) | |
| else: | |
| cfg = self.configure_agent(payload.get("controlled_bones", []), payload.get("end_effector"), num_steps=num_steps) | |
| return JSONResponse({"status":"ok","config":cfg}) | |
| except Exception as e: | |
| return JSONResponse({"status":"error","message":str(e)}, status_code=500) | |
| async def solve(request: Request): | |
| payload = await request.json() | |
| model = payload.get("model","agent").lower() | |
| return_mode = payload.get("frames_mode", "auto") | |
| num_steps = self._sanitize_num_steps(payload.get("num_steps", self.args.num_steps)) | |
| subpoints = self._safe_int(payload.get("subpoints",1),1,1,100) | |
| last_only = (return_mode != 'all' and subpoints == 1) | |
| self._cleanup() | |
| with self.solve_lock: | |
| try: | |
| if model == "pepper": | |
| self.configure_urdf(payload.get("controlled_bones", []), payload.get("end_effector", self.urdf_current_end_effector), num_steps=num_steps) | |
| result = self.solve_urdf(payload, last_only=last_only); result["model"]="pepper"; result["num_steps"] = num_steps | |
| if last_only: | |
| frames = list(self.animation_frames_urdf) | |
| elif return_mode == "all" or (return_mode == "auto" and subpoints > 1): | |
| frames = list(self.animation_frames_urdf) | |
| else: | |
| frames = [self.animation_frames_urdf[-1]] | |
| result["frames_data"] = frames | |
| else: | |
| self.configure_agent(payload.get("controlled_bones", []), payload.get("end_effector", self.current_end_effector), num_steps=num_steps) | |
| result = self.solve_agent(payload, last_only=last_only); result["model"]="agent"; result["num_steps"] = num_steps | |
| if last_only: | |
| frames = list(self.animation_frames_agent) | |
| elif return_mode == "all" or (return_mode == "auto" and subpoints > 1): | |
| frames = list(self.animation_frames_agent) | |
| else: | |
| frames = [self.animation_frames_agent[-1]] | |
| result["frames_data"] = frames | |
| status = result.get("status","ok") | |
| top_status = "ok" if status in ("ok","recovered") else status | |
| return JSONResponse({"status":top_status,"result":result}) | |
| except Exception as e: | |
| logger.exception("Solve failed unrecoverably") | |
| return JSONResponse({"status":"error","message":str(e)}, status_code=500) | |
| def reset(): | |
| with self.solve_lock: | |
| self._attempt_solver_recovery(False) | |
| self._attempt_solver_recovery(True) | |
| return {"status":"ok","message":"solvers reset"} | |
| def health(): | |
| rss = self.process.memory_info().rss if self.process else 0 | |
| return { | |
| "status":"ok", | |
| "agent_frames": len(self.animation_frames_agent), | |
| "urdf_frames": len(self.animation_frames_urdf), | |
| "agent_fail_count": self.agent_fail_count, | |
| "urdf_fail_count": self.urdf_fail_count, | |
| "last_agent_error": self.last_agent_error, | |
| "last_urdf_error": self.last_urdf_error, | |
| "circuit_open_for": max(0, int(self.circuit_open_until - time.time())), | |
| "memory_rss_mb": round(rss/1024/1024,1), | |
| } | |
| def favicon(): | |
| return Response(status_code=204) | |
| # ---------- Main ---------- | |
| def main(): | |
| parser = configargparse.ArgumentParser(description="Inverse Kinematics Solver - Three.js Web UI", default_config_files=["config.ini"]) | |
| parser.add_argument("--gltf_file", type=str, default="files/smplx.glb") | |
| parser.add_argument("--hand", type=str, choices=["left","right"], default="left") | |
| parser.add_argument("--threshold", type=float, default=0.005) | |
| parser.add_argument("--num_steps", type=int, default=100) | |
| parser.add_argument("--learning_rate", type=float, default=0.2) | |
| parser.add_argument("--subpoints", type=int, default=1) | |
| parser.add_argument("--api_port", type=int, default=17861) | |
| parser.add_argument("--warmup", action='store_true', default=True, help="Enable background JIT warmup") | |
| args = parser.parse_args() | |
| download_and_setup_files() | |
| srv = IKServer(args) | |
| import uvicorn | |
| uvicorn.run(srv.app, host="0.0.0.0", port=args.api_port) | |
| if __name__ == "__main__": | |
| main() | |