# SPDX-FileCopyrightText: 2025 UL Research Institutes # SPDX-License-Identifier: Apache-2.0 import functools import time from datetime import datetime, timedelta, timezone from pathlib import Path import click import httpx from dyff.client import Client, errors from dyff.schema.platform import * from dyff.schema.requests import * from app.api.models import PredictionResponse # ---------------------------------------------------------------------------- def _wait_for_status( get_entity_fn, target_status: str | list[str], *, timeout: timedelta ) -> str: if isinstance(target_status, str): target_status = [target_status] then = datetime.now(timezone.utc) while True: try: status = get_entity_fn().status if status in target_status: return status except errors.HTTPError as ex: if ex.status != 404: raise except httpx.HTTPStatusError as ex: if ex.response.status_code != 404: raise if (datetime.now(timezone.utc) - then) >= timeout: break time.sleep(5) raise AssertionError("timeout") def _common_options(f): @click.option( "--account", type=str, required=True, help="Your account ID", metavar="ID", ) @functools.wraps(f) def wrapper(*args, **kwargs): return f(*args, **kwargs) return wrapper @click.group() def cli(): pass @cli.command() @_common_options @click.option( "--name", type=str, required=True, help="The name of your detector model. For display and querying purposes only.", ) @click.option( "--image", type=str, default=None, help="The Docker image to upload (e.g., 'some/image:latest')." " Must exist in your local Docker deamon." " Required if --artifact is not specified.", ) @click.option( "--endpoint", type=str, default="predict", help="The endpoint to call on your service to make a prediction.", ) @click.option( "--volume", type=click.Path(exists=True, file_okay=False, readable=True, resolve_path=True, path_type=Path), default=None, help="A local directory path containing files to upload and mount in the running Docker container." " You should use this if your submission includes large files like neural network weights." ) @click.option( "--volume-mount", type=click.Path(exists=False, path_type=Path), default=None, help="The path to mount your uploaded directory in the running Docker container." " Must be an absolute path." " Required if --volume is specified.") @click.option( "--artifact", "artifact_id", type=str, default=None, help="The ID of the Artifact (i.e., Docker image) to use in the submission, if it already exists." " You can pass the artifact.id from a previous invocation.", metavar="ID", ) @click.option( "--model", "model_id", type=str, default=None, help="The ID of the Model (i.e., neural network weights) to use in the submission, if it already exists." " You can pass the model.id from a previous invocation.", metavar="ID", ) @click.option( "--gpu", is_flag=True, default=False, help="Request a GPU (NVIDIA L4) for the inference service.", ) def upload_submission( account: str, name: str, image: str | None, endpoint: str, volume: Path | None, volume_mount: Path | None, artifact_id: str | None, model_id: str | None, gpu: bool, ) -> None: dyffapi = Client() # Upload the image if artifact_id is None: # Create an Artifact resource click.echo("creating Artifact ...") artifact = dyffapi.artifacts.create(ArtifactCreateRequest(account=account)) click.echo(f"artifact.id: \"{artifact.id}\"") _wait_for_status( lambda: dyffapi.artifacts.get(artifact.id), "WaitingForUpload", timeout=timedelta(seconds=30), ) # Push the image from the local Docker daemon click.echo("pushing Artifact ...") dyffapi.artifacts.push(artifact, source=f"docker-daemon:{image}") time.sleep(5) # Indicate that we're done pushing dyffapi.artifacts.finalize(artifact.id) _wait_for_status( lambda: dyffapi.artifacts.get(artifact.id), "Ready", timeout=timedelta(seconds=30), ) click.echo("... done") else: artifact = dyffapi.artifacts.get(artifact_id) assert artifact is not None model: Model | None = None if model_id is None: if volume is not None: if volume_mount is None: raise click.UsageError("--volume-mount is required when --volume is used") click.echo("creating Model from local directory ...") model = dyffapi.models.create_from_volume( volume, name="model_volume", account=account, resources=ModelResources() ) click.echo(f"model.id: \"{model.id}\"") _wait_for_status( lambda: dyffapi.models.get(model.id), "WaitingForUpload", timeout=timedelta(seconds=30), ) click.echo("uploading Model ...") dyffapi.models.upload_volume(model, volume) _wait_for_status( lambda: dyffapi.models.get(model.id), "Ready", timeout=timedelta(seconds=30), ) click.echo("... done") else: model = None else: model = dyffapi.models.get(model_id) assert model is not None # Create a runnable InferenceService if volume_mount is not None: if model is None: raise click.UsageError("--volume-mount requires --volume or --model") if not volume_mount.is_absolute(): raise click.UsageError("--volume-mount must be an absolute path") volumeMounts=[ VolumeMount( kind=VolumeMountKind.data, name="model", mountPath=volume_mount, data=VolumeMountData( source=EntityIdentifier.of(model), ), ), ] else: volumeMounts = None accelerator: Accelerator | None = None if gpu: accelerator = Accelerator( kind="GPU", gpu=AcceleratorGPU( hardwareTypes=["nvidia.com/gpu-l4"], count=1, ), ) # Don't change this service_request = InferenceServiceCreateRequest( account=account, name=name, model=None, runner=InferenceServiceRunner( kind=InferenceServiceRunnerKind.CONTAINER, imageRef=EntityIdentifier.of(artifact), resources=ModelResources(), volumeMounts=volumeMounts, accelerator=accelerator, ), interface=InferenceInterface( endpoint=endpoint, outputSchema=DataSchema.make_output_schema(PredictionResponse), ), ) click.echo("creating InferenceService ...") service = dyffapi.inferenceservices.create(service_request) click.echo(f"service.id: \"{service.id}\"") click.echo("... done") @cli.command() @_common_options @click.option( "--task", "task_id", type=str, required=True, help="The Task ID to submit to.", metavar="ID", ) @click.option( "--team", "team_id", type=str, required=True, help="The Team ID making the submission.", metavar="ID", ) @click.option( "--service", "service_id", type=str, required=True, help="The InferenceService ID to submit.", metavar="ID", ) @click.option( "--challenge", "challenge_id", type=str, default="dc509a8c771b492b90c43012fde9a04f", help="The Challenge ID to submit to.", metavar="ID", ) def submit(account: str, task_id: str, team_id: str, service_id: str, challenge_id: str) -> None: dyffapi = Client() challenge = dyffapi.challenges.get(challenge_id) challengetask = challenge.tasks[task_id] team = dyffapi.teams.get(team_id) service = dyffapi.inferenceservices.get(service_id) submission = dyffapi.challenges.submit( challenge.id, challengetask.id, SubmissionCreateRequest( account=account, team=team.id, submission=EntityIdentifier(kind="InferenceService", id=service.id), ), ) click.echo(submission.model_dump_json(indent=2)) click.echo(f"submission.id: \"{submission.id}\"") if __name__ == "__main__": cli(show_default=True)