JAX-IK / app.py
hvoss-techfak's picture
Initial commit
3f7d1b3
import logging, faulthandler, sys, time, os, requests, zipfile, io, gc, threading, psutil, json, configargparse
faulthandler.enable()
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger("jax_ik_server")
def _excepthook(t, v, tb):
logger.exception("Uncaught exception", exc_info=(t, v, tb))
sys.excepthook = _excepthook
# Environment (CPU only)
os.environ['JAX_PLATFORMS'] = 'cpu'
os.environ['CUDA_VISIBLE_DEVICES'] = ''
import jax
jax.config.update("jax_default_device", "cpu")
import numpy as np
from fastapi import FastAPI, Request, Response
from fastapi.staticfiles import StaticFiles
from fastapi.middleware.cors import CORSMiddleware
from starlette.responses import FileResponse, JSONResponse
from jax_ik.helper import deform_mesh, load_mesh_data_from_gltf, load_mesh_data_from_urdf
from jax_ik.hand_specification import HandSpecification
from jax_ik.smplx_statics import left_arm_bounds_dict, right_arm_bounds_dict, complete_full_body_bounds_dict
from jax_ik.ik import InverseKinematicsSolver
from jax_ik.objectives import (
BoneZeroRotationObj,
CombinedDerivativeObj,
DistanceObjTraj,
SphereCollisionPenaltyObjTraj,
)
try:
from collision_objectives import FastSphereCollisionPenaltyObjTraj as _FastColl
except Exception: # fallback if file missing
_FastColl = SphereCollisionPenaltyObjTraj
def download_and_setup_files():
os.makedirs("files", exist_ok=True)
# Pepper
pepper_dir = "files/pepper_description-master"
if not os.path.isdir(pepper_dir):
logger.info("Downloading Pepper model...")
try:
r = requests.get(os.environ.get("PEPPER_DOWNLOAD"), stream=True)
r.raise_for_status()
with zipfile.ZipFile(io.BytesIO(r.content)) as z:
z.extractall("files/")
except Exception as e:
logger.warning(f"Pepper download failed: {e}")
# SMPLX
smplx_file = "files/smplx.glb"
if not os.path.isfile(smplx_file):
logger.info("Downloading SMPLX model...")
try:
r = requests.get(os.environ.get("SMPLX_DOWNLOAD"))
r.raise_for_status()
open(smplx_file, "wb").write(r.content)
except Exception as e:
logger.warning(f"SMPLX download failed: {e}")
class IKServer:
def __init__(self, args):
self.args = args
self.solve_lock = threading.Lock()
self.config_lock = threading.Lock()
self.process = psutil.Process(os.getpid())
# Failure / resilience counters
self.agent_fail_count = 0
self.urdf_fail_count = 0
self.last_agent_error = None
self.last_urdf_error = None
self.agent_solve_counter = 0
self.urdf_solve_counter = 0
self.max_fail_retries = 2 # total attempts (initial + retries)
self.circuit_breaker_threshold = 5 # open after N consecutive failures
self.circuit_open_until = 0 # epoch time until which solves short-circuit
self.circuit_backoff_seconds = 5
# Numeric safety
self.max_num_steps_global = 20000
self.max_learning_rate = 5.0
# Animation buffers
self.animation_frames_agent = []
self.animation_frames_urdf = []
# FastAPI app
self.app = FastAPI()
self.app.add_middleware(
CORSMiddleware,
allow_origins=["*"], allow_credentials=True,
allow_methods=["*"], allow_headers=["*"],
)
os.makedirs("static", exist_ok=True)
os.makedirs("files", exist_ok=True)
self.app.mount("/static", StaticFiles(directory="static"), name="static")
self.app.mount("/files", StaticFiles(directory="files"), name="files")
# Initialize models
self._init_agent()
self._setup_agent_objectives()
self._init_urdf()
self._setup_urdf_objectives()
# Cleanup timing
self.last_cleanup_time = time.time()
self.cleanup_interval = 30
self._register_routes()
logger.info("Server ready.")
if getattr(self.args, 'warmup', True):
threading.Thread(target=self._warmup_all, daemon=True).start()
# ---------- Utility / Safety ----------
def _safe_int(self, v, default, lo=None, hi=None):
try:
v = int(v)
except Exception:
v = default
if lo is not None and v < lo: v = lo
if hi is not None and v > hi: v = hi
return v
def _safe_float(self, v, default, lo=None, hi=None):
try:
v = float(v)
if np.isnan(v) or np.isinf(v): raise ValueError
except Exception:
v = default
if lo is not None and v < lo: v = lo
if hi is not None and v > hi: v = hi
return v
def _sanitize_num_steps(self, steps):
return self._safe_int(steps, self.args.num_steps, 1, self.max_num_steps_global)
def _sanitize_bones(self, bones, is_urdf=False):
if not isinstance(bones, (list, tuple)):
return []
allowed = self.urdf_available_bones if is_urdf else self.available_bones
return [b for b in bones if b in allowed]
def _create_solver(self, bones, is_urdf, num_steps):
if is_urdf:
return InverseKinematicsSolver(
model_file=self.urdf_file,
controlled_bones=bones,
bounds=None,
threshold=self.args.threshold,
num_steps=int(num_steps),
compute_sdf=False,
)
bounds = []
for b in bones:
if b in self.bounds_dict:
lower, upper = self.bounds_dict[b]
bounds.extend(list(zip(lower, upper)))
else:
bounds.extend([(-90, 90)] * 3)
return InverseKinematicsSolver(
model_file=self.args.gltf_file,
controlled_bones=bones,
bounds=bounds,
threshold=self.args.threshold,
num_steps=int(num_steps),
compute_sdf=False,
)
def _rebuild_solver_safe(self, is_urdf=False, force_defaults=False):
try:
if is_urdf:
bones = self.urdf_default_controlled_bones if force_defaults else self.urdf_current_controlled_bones
bones = self._sanitize_bones(bones, True) or self.urdf_default_controlled_bones
self.urdf_solver = self._create_solver(bones, True, self.urdf_current_num_steps)
self.urdf_initial_rotations = np.zeros(len(self.urdf_solver.controlled_bones)*3, dtype=np.float32)
self.urdf_best_angles = self.urdf_initial_rotations.copy()
else:
bones = self.default_controlled_bones if force_defaults else self.current_controlled_bones
bones = self._sanitize_bones(bones, False) or self.default_controlled_bones
self.solver = self._create_solver(bones, False, self.current_num_steps)
self.initial_rotations = np.zeros(len(self.solver.controlled_bones)*3, dtype=np.float32)
self.best_angles = self.initial_rotations.copy()
return True
except Exception as e:
(self.last_urdf_error if is_urdf else setattr(self, 'last_agent_error', str(e)))
logger.exception("Solver rebuild failed")
return False
def _attempt_solver_recovery(self, is_urdf=False):
logger.warning("Attempting full solver recovery (reset to defaults)")
if is_urdf:
self.urdf_current_controlled_bones = self.urdf_default_controlled_bones.copy()
return self._rebuild_solver_safe(True, force_defaults=True)
else:
self.current_controlled_bones = self.default_controlled_bones.copy()
return self._rebuild_solver_safe(False, force_defaults=True)
# ---------- Warmup ----------
def _warmup_agent(self):
try:
mand = [DistanceObjTraj(target_points=np.array([0.0,0.2,0.35]), bone_name=self.current_end_effector, use_head=True, weight=1.0)]
opt = [BoneZeroRotationObj(weight=0.01)]
_ = self.solver.solve(initial_rotations=self.initial_rotations, learning_rate=self.args.learning_rate,
mandatory_objective_functions=tuple(mand), optional_objective_functions=tuple(opt),
ik_points=1, verbose=False,patience=1,)
logger.info("Agent warmup complete")
except Exception as e:
logger.warning(f"Agent warmup failed: {e}")
def _warmup_urdf(self):
try:
mand = [DistanceObjTraj(target_points=np.array([0.3,0.3,0.35]), bone_name=self.urdf_current_end_effector, use_head=True, weight=1.0)]
opt = [BoneZeroRotationObj(weight=0.01)]
_ = self.urdf_solver.solve(initial_rotations=self.urdf_initial_rotations, learning_rate=self.args.learning_rate,
mandatory_objective_functions=tuple(mand), optional_objective_functions=tuple(opt),
ik_points=1, verbose=False,patience=1)
logger.info("URDF warmup complete")
except Exception as e:
logger.warning(f"URDF warmup failed: {e}")
def _warmup_all(self):
t0 = time.time()
logger.info("Starting background warmup...")
self._warmup_agent()
self._warmup_urdf()
logger.info(f"Warmup finished in {time.time()-t0:.2f}s")
# ---------- Initialization ----------
def _init_agent(self):
self.current_num_steps = self._sanitize_num_steps(self.args.num_steps)
basic = InverseKinematicsSolver(
model_file=self.args.gltf_file,
controlled_bones=["left_collar"],
bounds=[(-90,90)]*3,
threshold=self.args.threshold,
num_steps=self.current_num_steps,
compute_sdf=False,
)
self.available_bones = basic.fk_solver.bone_names
self.bounds_dict = complete_full_body_bounds_dict
if self.args.hand == "left":
self.default_controlled_bones = list(left_arm_bounds_dict.keys())#["left_collar","left_shoulder","left_elbow","left_wrist"]
self.default_end_effector = "left_index3"
else:
self.default_controlled_bones = list(right_arm_bounds_dict.keys())
self.default_end_effector = "right_index3"
self.selectable_bones = [b for b in self.available_bones if b in self.bounds_dict]
self.current_controlled_bones = self.default_controlled_bones.copy()
self.current_end_effector = self.default_end_effector
# Direct solver creation (no cache)
self.solver = self._create_solver(self.current_controlled_bones, False, self.current_num_steps)
self.initial_rotations = np.zeros(len(self.solver.controlled_bones) * 3, dtype=np.float32)
self.best_angles = self.initial_rotations.copy()
self.mesh_data = load_mesh_data_from_gltf(self.args.gltf_file, self.solver.fk_solver)
self.animation_frames_agent = self._frames_from_angles([self.initial_rotations], False)
def _init_urdf(self):
self.urdf_current_num_steps = self._sanitize_num_steps(self.args.num_steps)
self.urdf_file = "files/pepper_description-master/urdf/pepper.urdf"
basic = InverseKinematicsSolver(
model_file=self.urdf_file,
controlled_bones=["LShoulder"],
bounds=None,
threshold=self.args.threshold,
num_steps=self.urdf_current_num_steps,
compute_sdf=False,
)
self.urdf_available_bones = basic.fk_solver.bone_names
self.urdf_default_controlled_bones = ["LShoulder","LBicep","LElbow","LForeArm","l_wrist"]
self.urdf_default_end_effector = "LFinger13_link"
self.urdf_selectable_bones = list(self.urdf_available_bones)
self.urdf_current_controlled_bones = self.urdf_default_controlled_bones.copy()
self.urdf_current_end_effector = self.urdf_default_end_effector
self.urdf_solver = self._create_solver(self.urdf_current_controlled_bones, True, self.urdf_current_num_steps)
self.urdf_initial_rotations = np.zeros(len(self.urdf_solver.controlled_bones) * 3, dtype=np.float32)
self.urdf_best_angles = self.urdf_initial_rotations.copy()
self.urdf_mesh_data = load_mesh_data_from_urdf(self.urdf_file, self.urdf_solver.fk_solver)
self.animation_frames_urdf = self._frames_from_angles([self.urdf_initial_rotations], True)
# ---------- Objectives ----------
def _setup_agent_objectives(self):
self.distance_obj = DistanceObjTraj(
target_points=np.array([0.0,0.2,0.35]),
bone_name=self.current_end_effector,
use_head=True,
weight=1.0,
)
use_fast = os.environ.get("USE_FAST_COLLISION", "1") != "0"
CollCls = _FastColl if use_fast else SphereCollisionPenaltyObjTraj
self.collision_obj = CollCls(
{"center":[0.1,0.0,0.35],"radius":0.1},
min_clearance=0.0,
weight=1.0,
)
# store defaults for config exposure
self.collision_default_center = [0.1,0.0,0.35]
self.collision_default_radius = 0.1
self.collision_default_min_clearance = 0.0
def _setup_urdf_objectives(self):
self.urdf_distance_obj = DistanceObjTraj(
target_points=np.array([0.3,0.3,0.35]),
bone_name=self.urdf_current_end_effector,
use_head=True,
weight=1.0,
)
use_fast = os.environ.get("USE_FAST_COLLISION", "1") != "0"
CollCls = _FastColl if use_fast else SphereCollisionPenaltyObjTraj
self.urdf_collision_obj = CollCls(
{"center":[0.2,0.0,0.35],"radius":0.1},
min_clearance=0.0,
weight=1.0,
)
self.urdf_collision_default_center = [0.2,0.0,0.35]
self.urdf_collision_default_radius = 0.1
self.urdf_collision_default_min_clearance = 0.0
# ---------- Build objectives per request ----------
def _build_agent_objectives(self, payload):
tgt = np.array(payload.get("target",[0.0,0.2,0.35]))
self.distance_obj.update_params({"bone_name": self.current_end_effector, "target_points": tgt, "weight": float(payload.get("distance_weight",1.0))})
# collision updates (weight, center, radius, min_clearance)
collision_enabled = payload.get("collision_enabled", False)
coll_update = {"weight": float(payload.get("collision_weight",1.0)) if collision_enabled else 0.0}
center = payload.get("collision_center")
if isinstance(center, (list, tuple)) and len(center)==3:
coll_update["center"] = center
radius = payload.get("collision_radius")
if radius is not None:
coll_update["radius"] = float(radius)
if "collision_min_clearance" in payload:
coll_update["min_clearance"] = float(payload.get("collision_min_clearance", 0.0))
# Always update params so compiled graph stays stable
self.collision_obj.update_params(coll_update)
subpoints = self._safe_int(payload.get("subpoints",1), 1, 1, 100)
mandatory, optional = [], []
if payload.get("distance_enabled", True): mandatory.append(self.distance_obj)
# Always include collision objective to avoid JIT retrace when toggling
optional.append(self.collision_obj)
if payload.get("bone_zero_enabled", True):
optional.append(BoneZeroRotationObj(weight=float(payload.get("bone_zero_weight",0.05))))
if payload.get("derivative_enabled", True) and subpoints > 1:
optional.append(CombinedDerivativeObj(max_order=3, weights=[float(payload.get("derivative_weight",0.05))]*3))
elif not payload.get("bone_zero_enabled", True) and not payload.get("derivative_enabled", True):
optional.append(BoneZeroRotationObj(weight=0.01))
# Hand specification
hand_shape = payload.get("hand_shape","None")
hand_position = payload.get("hand_position","None")
params = {
"is_pointing": hand_shape=="Pointing",
"is_shaping": hand_shape=="Shaping",
"is_flat": hand_shape=="Flat",
"look_forward": hand_position=="Look Forward",
"look_45_up": hand_position=="Look 45° Up",
"look_45_down": hand_position=="Look 45° Down",
"look_up": hand_position=="Look Up",
"look_down": hand_position=="Look Down",
"look_45_x_downwards": hand_position=="Look 45° X Downwards",
"look_45_x_upwards": hand_position=="Look 45° X Upwards",
"look_x_inward": hand_position=="Look X Inward",
"look_to_body": hand_position=="Look to Body",
"arm_down": hand_position=="Arm Down",
"arm_45_down": hand_position=="Arm 45° Down",
"arm_flat": hand_position=="Arm Flat",
}
if any(params.values()):
spec = HandSpecification(**params)
optional.extend(spec.get_objectives(
left_hand=self.args.hand=="left",
controlled_bones=self.current_controlled_bones,
full_trajectory=subpoints>1,
last_position=True,
weight=0.5,
))
return mandatory, optional, subpoints
def _build_urdf_objectives(self, payload):
tgt = np.array(payload.get("target",[0.3,0.3,0.35]))
self.urdf_distance_obj.update_params({"bone_name": self.urdf_current_end_effector, "target_points": tgt, "weight": float(payload.get("distance_weight",1.0))})
collision_enabled = payload.get("collision_enabled", False)
coll_update = {"weight": float(payload.get("collision_weight",1.0)) if collision_enabled else 0.0}
center = payload.get("collision_center")
if isinstance(center, (list, tuple)) and len(center)==3:
coll_update["center"] = center
radius = payload.get("collision_radius")
if radius is not None:
coll_update["radius"] = float(radius)
if "collision_min_clearance" in payload:
coll_update["min_clearance"] = float(payload.get("collision_min_clearance", 0.0))
self.urdf_collision_obj.update_params(coll_update)
subpoints = self._safe_int(payload.get("subpoints",1),1,1,100)
mandatory, optional = [], []
if payload.get("distance_enabled", True): mandatory.append(self.urdf_distance_obj)
# Always include collision objective with weight possibly zero
optional.append(self.urdf_collision_obj)
if payload.get("bone_zero_enabled", True):
optional.append(BoneZeroRotationObj(weight=float(payload.get("bone_zero_weight",0.05))))
if payload.get("derivative_enabled", True) and subpoints > 1:
optional.append(CombinedDerivativeObj(max_order=3, weights=[float(payload.get("derivative_weight",0.05))]*3))
elif not payload.get("bone_zero_enabled", True) and not payload.get("derivative_enabled", True):
optional.append(BoneZeroRotationObj(weight=0.01))
return mandatory, optional, subpoints
# ---------- Frames ----------
def _frames_from_angles(self, angles_seq, is_urdf):
frames = []
for ang in angles_seq:
try:
if is_urdf:
verts = deform_mesh(ang, self.urdf_solver.fk_solver, self.urdf_mesh_data)
faces = self.urdf_mesh_data["faces"]
else:
verts = deform_mesh(ang, self.solver.fk_solver, self.mesh_data)
faces = self.mesh_data["faces"]
frames.append({"vertices": verts.tolist(), "faces": faces.tolist()})
except Exception as e:
logger.warning(f"Mesh deformation failed: {e}")
return frames
# ---------- Configuration ----------
def configure_agent(self, bones, eff, num_steps=None):
with self.config_lock:
if num_steps is None:
num_steps = self.current_num_steps
num_steps = self._sanitize_num_steps(num_steps)
bones = self._sanitize_bones(bones, False) or self.default_controlled_bones
if eff not in getattr(self, 'available_bones', []):
eff = self.default_end_effector
changed = (bones != self.current_controlled_bones or eff != self.current_end_effector or int(num_steps) != int(self.current_num_steps))
if changed:
try:
self.current_controlled_bones = bones
self.current_end_effector = eff
self.current_num_steps = int(num_steps)
self.solver = self._create_solver(bones, False, self.current_num_steps)
self.initial_rotations = np.zeros(len(self.solver.controlled_bones)*3, dtype=np.float32)
self.best_angles = self.initial_rotations.copy()
self._setup_agent_objectives()
except Exception as e:
self.agent_fail_count += 1
self.last_agent_error = str(e)
logger.exception("configure_agent failed; attempting default recovery")
if not self._attempt_solver_recovery(False):
raise
return {"controlled_bones": self.current_controlled_bones, "end_effector": self.current_end_effector, "num_steps": self.current_num_steps}
def configure_urdf(self, bones, eff, num_steps=None):
with self.config_lock:
if num_steps is None:
num_steps = self.urdf_current_num_steps
num_steps = self._sanitize_num_steps(num_steps)
bones = self._sanitize_bones(bones, True) or self.urdf_default_controlled_bones
if eff not in getattr(self, 'urdf_available_bones', []):
eff = self.urdf_default_end_effector
changed = (bones != self.urdf_current_controlled_bones or eff != self.urdf_current_end_effector or int(num_steps) != int(self.urdf_current_num_steps))
if changed:
try:
self.urdf_current_controlled_bones = bones
self.urdf_current_end_effector = eff
self.urdf_current_num_steps = int(num_steps)
self.urdf_solver = self._create_solver(bones, True, self.urdf_current_num_steps)
self.urdf_initial_rotations = np.zeros(len(self.urdf_solver.controlled_bones)*3, dtype=np.float32)
self.urdf_best_angles = self.urdf_initial_rotations.copy()
self._setup_urdf_objectives()
except Exception as e:
self.urdf_fail_count += 1
self.last_urdf_error = str(e)
logger.exception("configure_urdf failed; attempting default recovery")
if not self._attempt_solver_recovery(True):
raise
return {"controlled_bones": self.urdf_current_controlled_bones, "end_effector": self.urdf_current_end_effector, "num_steps": self.urdf_current_num_steps}
# ---------- Solve (hardened) ----------
def _solve_core(self, solver, init, mand, opt, subpoints, lr):
return solver.solve(
initial_rotations=init,
learning_rate=lr,
mandatory_objective_functions=tuple(mand),
optional_objective_functions=tuple(opt),
ik_points=subpoints,
verbose=False,
patience=100,
)
def solve_agent(self, payload, last_only=False):
if time.time() < self.circuit_open_until:
return {"status":"circuit_open","error":"agent circuit open","frames":len(self.animation_frames_agent)}
mand, opt, sub = self._build_agent_objectives(payload)
lr = self._safe_float(payload.get("learning_rate", self.args.learning_rate), self.args.learning_rate, 1e-5, self.max_learning_rate)
attempts = 0
best_angles = None; obj_val = float('inf'); steps = 0; error_msg=None; recovered=False
start=time.time()
while attempts < self.max_fail_retries:
attempts += 1
try:
best_angles, obj_val, steps = self._solve_core(self.solver, self.initial_rotations, mand, opt, sub, lr)
break
except Exception as e:
self.agent_fail_count += 1
error_msg = str(e)
self.last_agent_error = error_msg
logger.warning(f"Agent solve attempt {attempts} failed: {e}")
if attempts < self.max_fail_retries:
# First retry: rebuild current; second: full recovery
if attempts == 1:
self._rebuild_solver_safe(False, force_defaults=False)
else:
recovered = self._attempt_solver_recovery(False)
else:
break
if best_angles is None:
# Provide at least one frame (initial)
frame_angles = [self.initial_rotations]
if recovered:
frame_angles = [self.initial_rotations]
else:
self.best_angles = best_angles[-1].copy()
self.initial_rotations = self.best_angles.copy()
frame_angles = [best_angles[-1]] if last_only else best_angles
self.animation_frames_agent = self._frames_from_angles(frame_angles, False)
self.agent_solve_counter += 1
# Circuit breaker update
if best_angles is None and self.agent_fail_count >= self.circuit_breaker_threshold:
self.circuit_open_until = time.time() + self.circuit_backoff_seconds
status = "ok" if best_angles is not None else ("recovered" if recovered else "failed")
return {
"status":status,
"solve_time": time.time()-start,
"iterations": steps,
"objective": obj_val,
"frames": len(self.animation_frames_agent),
"solve_id": self.agent_solve_counter,
"attempts": attempts,
"error": error_msg,
}
def solve_urdf(self, payload, last_only=False):
if time.time() < self.circuit_open_until:
return {"status":"circuit_open","error":"urdf circuit open","frames":len(self.animation_frames_urdf)}
mand, opt, sub = self._build_urdf_objectives(payload)
lr = self._safe_float(payload.get("learning_rate", self.args.learning_rate), self.args.learning_rate, 1e-5, self.max_learning_rate)
attempts=0
best_angles=None; obj_val=float('inf'); steps=0; error_msg=None; recovered=False
start=time.time()
while attempts < self.max_fail_retries:
attempts += 1
try:
best_angles, obj_val, steps = self._solve_core(self.urdf_solver, self.urdf_initial_rotations, mand, opt, sub, lr)
break
except Exception as e:
self.urdf_fail_count += 1
error_msg = str(e)
self.last_urdf_error = error_msg
logger.warning(f"URDF solve attempt {attempts} failed: {e}")
if attempts < self.max_fail_retries:
if attempts == 1:
self._rebuild_solver_safe(True, force_defaults=False)
else:
recovered = self._attempt_solver_recovery(True)
else:
break
if best_angles is None:
frame_angles = [self.urdf_initial_rotations]
if recovered:
frame_angles = [self.urdf_initial_rotations]
else:
self.urdf_best_angles = best_angles[-1].copy()
self.urdf_initial_rotations = self.urdf_best_angles.copy()
frame_angles = [best_angles[-1]] if last_only else best_angles
self.animation_frames_urdf = self._frames_from_angles(frame_angles, True)
self.urdf_solve_counter += 1
if best_angles is None and self.urdf_fail_count >= self.circuit_breaker_threshold:
self.circuit_open_until = time.time() + self.circuit_backoff_seconds
status = "ok" if best_angles is not None else ("recovered" if recovered else "failed")
return {
"status":status,
"solve_time": time.time()-start,
"iterations": steps,
"objective": obj_val,
"frames": len(self.animation_frames_urdf),
"solve_id": self.urdf_solve_counter,
"attempts": attempts,
"error": error_msg,
}
# ---------- Housekeeping ----------
def _cleanup(self):
now = time.time()
if now - self.last_cleanup_time < self.cleanup_interval:
return
try:
gc.collect()
# Removed cache invalidation logic (no caches anymore)
except Exception:
pass
self.last_cleanup_time = now
# ---------- API ----------
def _register_routes(self):
@self.app.get("/")
def index():
return FileResponse("static/index.html")
@self.app.get("/threejs_viewer")
def legacy():
return FileResponse("static/index.html")
@self.app.get("/animation")
def animation(request: Request):
model = request.query_params.get("model","agent").lower()
data = self.animation_frames_urdf if model == "pepper" else self.animation_frames_agent
return Response(content=json.dumps(data), media_type="application/json")
@self.app.get("/config")
def cfg(model: str = "agent"):
if model.lower() == "pepper":
return {
"model":"pepper",
"available_bones": self.urdf_available_bones,
"selectable_bones": self.urdf_selectable_bones,
"default_controlled_bones": self.urdf_current_controlled_bones,
"default_end_effector": self.urdf_current_end_effector,
"end_effector_choices": self.urdf_available_bones,
"hand_shapes": [],
"hand_positions": [],
"max_subpoints": 20,
"default_num_steps": self.urdf_current_num_steps,
"collision_default_center": getattr(self, 'urdf_collision_default_center', [0.2,0.0,0.35]),
"collision_default_radius": getattr(self, 'urdf_collision_default_radius', 0.1),
"collision_default_min_clearance": getattr(self, 'urdf_collision_default_min_clearance', 0.0),
}
return {
"model":"agent",
"available_bones": self.available_bones,
"selectable_bones": self.selectable_bones,
"default_controlled_bones": self.current_controlled_bones,
"default_end_effector": self.current_end_effector,
"end_effector_choices": self.available_bones,
"hand_shapes":["None","Pointing","Shaping","Flat"],
"hand_positions":[
"None","Look Forward","Look 45° Up","Look 45° Down","Look Up","Look Down",
"Look 45° X Downwards","Look 45° X Upwards","Look X Inward","Look to Body",
"Arm Down","Arm 45° Down","Arm Flat"
],
"max_subpoints": 20,
"default_num_steps": self.current_num_steps,
"collision_default_center": getattr(self, 'collision_default_center', [0.1,0.0,0.35]),
"collision_default_radius": getattr(self, 'collision_default_radius', 0.1),
"collision_default_min_clearance": getattr(self, 'collision_default_min_clearance', 0.0),
}
@self.app.post("/configure")
async def configure(request: Request):
payload = await request.json()
model = payload.get("model","agent").lower()
num_steps = self._sanitize_num_steps(payload.get("num_steps", self.args.num_steps))
try:
if model == "pepper":
cfg = self.configure_urdf(payload.get("controlled_bones", []), payload.get("end_effector"), num_steps=num_steps)
else:
cfg = self.configure_agent(payload.get("controlled_bones", []), payload.get("end_effector"), num_steps=num_steps)
return JSONResponse({"status":"ok","config":cfg})
except Exception as e:
return JSONResponse({"status":"error","message":str(e)}, status_code=500)
@self.app.post("/solve")
async def solve(request: Request):
payload = await request.json()
model = payload.get("model","agent").lower()
return_mode = payload.get("frames_mode", "auto")
num_steps = self._sanitize_num_steps(payload.get("num_steps", self.args.num_steps))
subpoints = self._safe_int(payload.get("subpoints",1),1,1,100)
last_only = (return_mode != 'all' and subpoints == 1)
self._cleanup()
with self.solve_lock:
try:
if model == "pepper":
self.configure_urdf(payload.get("controlled_bones", []), payload.get("end_effector", self.urdf_current_end_effector), num_steps=num_steps)
result = self.solve_urdf(payload, last_only=last_only); result["model"]="pepper"; result["num_steps"] = num_steps
if last_only:
frames = list(self.animation_frames_urdf)
elif return_mode == "all" or (return_mode == "auto" and subpoints > 1):
frames = list(self.animation_frames_urdf)
else:
frames = [self.animation_frames_urdf[-1]]
result["frames_data"] = frames
else:
self.configure_agent(payload.get("controlled_bones", []), payload.get("end_effector", self.current_end_effector), num_steps=num_steps)
result = self.solve_agent(payload, last_only=last_only); result["model"]="agent"; result["num_steps"] = num_steps
if last_only:
frames = list(self.animation_frames_agent)
elif return_mode == "all" or (return_mode == "auto" and subpoints > 1):
frames = list(self.animation_frames_agent)
else:
frames = [self.animation_frames_agent[-1]]
result["frames_data"] = frames
status = result.get("status","ok")
top_status = "ok" if status in ("ok","recovered") else status
return JSONResponse({"status":top_status,"result":result})
except Exception as e:
logger.exception("Solve failed unrecoverably")
return JSONResponse({"status":"error","message":str(e)}, status_code=500)
@self.app.post("/reset")
def reset():
with self.solve_lock:
self._attempt_solver_recovery(False)
self._attempt_solver_recovery(True)
return {"status":"ok","message":"solvers reset"}
@self.app.get("/health")
def health():
rss = self.process.memory_info().rss if self.process else 0
return {
"status":"ok",
"agent_frames": len(self.animation_frames_agent),
"urdf_frames": len(self.animation_frames_urdf),
"agent_fail_count": self.agent_fail_count,
"urdf_fail_count": self.urdf_fail_count,
"last_agent_error": self.last_agent_error,
"last_urdf_error": self.last_urdf_error,
"circuit_open_for": max(0, int(self.circuit_open_until - time.time())),
"memory_rss_mb": round(rss/1024/1024,1),
}
@self.app.get("/favicon.ico")
def favicon():
return Response(status_code=204)
# ---------- Main ----------
def main():
parser = configargparse.ArgumentParser(description="Inverse Kinematics Solver - Three.js Web UI", default_config_files=["config.ini"])
parser.add_argument("--gltf_file", type=str, default="files/smplx.glb")
parser.add_argument("--hand", type=str, choices=["left","right"], default="left")
parser.add_argument("--threshold", type=float, default=0.005)
parser.add_argument("--num_steps", type=int, default=100)
parser.add_argument("--learning_rate", type=float, default=0.2)
parser.add_argument("--subpoints", type=int, default=1)
parser.add_argument("--api_port", type=int, default=17861)
parser.add_argument("--warmup", action='store_true', default=True, help="Enable background JIT warmup")
args = parser.parse_args()
download_and_setup_files()
srv = IKServer(args)
import uvicorn
uvicorn.run(srv.app, host="0.0.0.0", port=args.api_port)
if __name__ == "__main__":
main()