Skip to content

Chassisml API Reference

Welcome to the Chassisml API Reference documentation homepage. The API is designed using the principles of REST services using standard HTTP verbs and status codes, implemented with Flask. On this page, you will find documentation for:

  • Available REST endpoints in API Service
  • Methods implemented within the each endpoint

Endpoints

/health (GET)

  • Confirms Chassis service is up and running

/build (POST)

  • Kicks off the container image build process

/job/{job_id} (GET)

  • Retrieves the status of a chassis /build job

/job/{job_id}/download-tar (GET)

  • Retrieves docker image tar archive from a volume attached to the Kubernetes cluster hosting chassis and downloads it to a local filepath

/test (POST)

  • Creates a conda environment as specified by the user's model artifacts and runs the ChassisModel to ensure the model code can run within the provided conda environment

Functions

build_image()

This method is run by the /build endpoint. It generates a model image based upon a POST request. The request.files structure can be seen in the Python SDK docs.

Parameters:

Name Type Description Default
None None

This method does not take any parameters

required

Returns:

Type Description
Dict

information about whether or not the image build resulted in an error

Source code in service/app.py
def build_image():
    '''
    This method is run by the `/build` endpoint. 
    It generates a model image based upon a POST request. The `request.files` structure can be seen in the Python SDK docs.

    Args:
        None (None): This method does not take any parameters

    Returns:
        Dict: information about whether or not the image build resulted in an error
    '''

    if not ('image_data' in request.files and 'model' in request.files):
        return 'Both model and image_data are required', 500

    # retrieve image_data and populate variables accordingly
    image_data = json.load(request.files.get('image_data'))
    model_name = image_data.get('model_name')
    image_name = image_data.get('name')
    gpu = image_data.get('gpu')
    arm64 = image_data.get('arm64')
    publish = image_data.get('publish', False)
    publish = True if publish else ''
    registry_auth = image_data.get('registry_auth')
    webhook = image_data.get('webhook')

    # retrieve binary representations for all three variables
    model = request.files.get('model')
    metadata_data = request.files.get('metadata_data')

    # This is a future proofing variable in case we encounter a model that cannot be converted into mlflow.
    # It will remain hardcoded for now.
    module_name = 'mlflow'

    # This name is a random id used to ensure that all jobs are uniquely named and traceable.
    random_name = str(uuid.uuid4())

    # Unzip model archive
    if PV_MODE:
        unzip_model(model, module_name, random_name)
        context_uri = None
    else:
        dockerfile = choose_dockerfile(gpu,arm64)
        context_uri = upload_context(model, module_name, random_name, metadata_data, dockerfile)

        if not context_uri:
            return Response(f"403 Forbidden: Cloud storage credentials could not push to context bucket.",403)

    metadata_path = extract_metadata(metadata_data, module_name, random_name)

    # this path is the local location that kaniko will store the image it creates
    path_to_tar_file = f'{DATA_DIR if PV_MODE else "/tar"}/kaniko_image-{random_name}.tar'

    logger.debug(f'Request data: {image_name}, {module_name}, {model_name}, {path_to_tar_file}')
    error = run_kaniko(
        image_name,
        module_name,
        model_name,
        path_to_tar_file,
        random_name,
        publish,
        registry_auth,
        gpu,
        arm64,
        context_uri,
        metadata_path,
        webhook
    )

    if error:
        return {'error': error, 'job_id': None}

    return {'error': False, 'job_id': f'{K_JOB_NAME}-{random_name}'}

create_job_object(image_name, module_name, model_name, path_to_tar_file, random_name, publish, registry_auth, gpu=False, arm64=False, context_uri=None, metadata_path=None)

This utility method sets up all the required objects needed to create a model image and is run within the run_kaniko method.

Parameters:

Name Type Description Default
image_name str

container image name

required
module_name str

reference module to locate location within service input is saved

required
model_name str

name of model to package

required
path_to_tar_file str

filepath destination to save docker image tar file

required
random_name str

random id generated during build process that is used to ensure that all jobs are uniquely named and traceable

required
publish bool

determines if image will be published to Docker registry

required
registry_auth dict

Docker registry authorization credentials

required
gpu bool

If True, will build container image that runs on GPU

False
arm64 bool

If True, will build container image that runs on ARM64 architecture

False
context_uri str

Location of build context in S3 (S3 mode only)

None

Returns:

Type Description
Job

Chassis job object

Source code in service/app.py
def create_job_object(
        image_name,
        module_name,
        model_name,
        path_to_tar_file,
        random_name,
        publish,
        registry_auth,
        gpu=False,
        arm64=False,
        context_uri=None,
        metadata_path=None
):
    '''
    This utility method sets up all the required objects needed to create a model image and is run within the `run_kaniko` method.

    Args:
        image_name (str): container image name
        module_name (str): reference module to locate location within service input is saved
        model_name (str): name of model to package
        path_to_tar_file (str): filepath destination to save docker image tar file
        random_name (str): random id generated during build process that is used to ensure that all jobs are uniquely named and traceable
        publish (bool): determines if image will be published to Docker registry
        registry_auth (dict): Docker registry authorization credentials  
        gpu (bool): If `True`, will build container image that runs on GPU 
        arm64 (bool): If `True`, will build container image that runs on ARM64 architecture
        context_uri (str): Location of build context in S3 (S3 mode only)

    Returns:
        Job: Chassis job object

    '''
    job_name = f'{K_JOB_NAME}-{random_name}'

    if registry_auth and not REGISTRY_URI:
        # credential setup for Docker Hub.
        # json for holding registry credentials that will access docker hub.
        # reference: https://github.com/GoogleContainerTools/kaniko#pushing-to-docker-hub
        registry_credentials = f'{{"auths":{{"https://index.docker.io/v1/":{{"auth":"{registry_auth}"}}}}}}'
        b64_registry_credentials = base64.b64encode(registry_credentials.encode("utf-8")).decode("utf-8")
    elif registry_auth and REGISTRY_URI:
        registry_credentials = f'{{"auths":{{"{REGISTRY_URI}":{{"auth":"{registry_auth}"}}}}}}'
        b64_registry_credentials = base64.b64encode(registry_credentials.encode("utf-8")).decode("utf-8")
    elif not registry_auth and not REGISTRY_CREDENTIALS:
        raise ValueError("No registry credentials provided by user or during Chassis installation.")
    else:
        b64_registry_credentials = REGISTRY_CREDENTIALS

    # mount path leads to /data
    # this is a mount point. NOT the volume itself.
    # name aligns with a volume defined below.
    if PV_MODE:
        data_volume_mount = client.V1VolumeMount(
            mount_path=MOUNT_PATH_DIR,
            name="local-volume-code"
        ) if CHASSIS_DEV else client.V1VolumeMount(
            mount_path=MOUNT_PATH_DIR,
            name=DATA_VOLUME_NAME
        )

    # This volume will be used by kaniko container to get registry credentials.
    # mount path leads to /kaniko/.docker per kaniko reference documentation
    # this is a mount point. NOT the volume itself.
    # name aligns with a volume defined below.
    kaniko_credentials_volume_mount = client.V1VolumeMount(
        mount_path=K_KANIKO_EMPTY_DIR_PATH,
        name=K_EMPTY_DIR_NAME
    )

    # create secret for registry credentials       
    registry_creds_secret_name = f'{random_name}-creds'
    metadata = {'name': registry_creds_secret_name, 'namespace': ENVIRONMENT}
    data = {'config.json': b64_registry_credentials}
    api_version = 'v1'
    kind = 'Secret'
    secret = client.V1Secret(api_version, data , kind, metadata)
    client.CoreV1Api().create_namespaced_secret(ENVIRONMENT, secret)

    # volume holding credentials
    kaniko_credentials_volume = client.V1Volume(
        name=K_EMPTY_DIR_NAME,
        secret=client.V1SecretVolumeSource(secret_name=registry_creds_secret_name)
    )

    # This is the kaniko container used to build the final image.
    kaniko_args = [
        '' if publish else '--no-push',
        f'--destination={REGISTRY_URI+"/" if REGISTRY_URI else ""}{REPOSITORY_PREFIX}{image_name}{"" if ":" in image_name else ":latest"}',
        '--snapshotMode=redo',
        '--use-new-run',
        f'--build-arg=MODEL_DIR=model-{random_name}',
        f'--build-arg=MODZY_METADATA_PATH={metadata_path if metadata_path is not None else "flavours/mlflow/interfaces/modzy/asset_bundle/0.1.0/model.yaml"}',
        f'--build-arg=MODEL_NAME={model_name}',
        f'--build-arg=MODEL_CLASS={module_name}',
        # Modzy is the default interface.
        '--build-arg=INTERFACE=modzy',
    ]

    volumes = [kaniko_credentials_volume]
    kaniko_volume_mounts = [kaniko_credentials_volume_mount]

    base_resources = {"memory": "30Gi", "cpu": "2"}
    slim_reqs = {"memory": "2Gi", "cpu": "1"}
    kaniko_reqs = client.V1ResourceRequirements(limits=base_resources, requests=base_resources)

    if PV_MODE:
        dockerfile = choose_dockerfile(gpu,arm64)
        kaniko_args.extend([f'--dockerfile={DATA_DIR}/flavours/{module_name}/{dockerfile}',f'--context={DATA_DIR}',
                            f'--tarPath={path_to_tar_file}',])
        kaniko_volume_mounts.append(data_volume_mount)

        init_container_kaniko = client.V1Container(
            name='kaniko',
            image='gcr.io/kaniko-project/executor:v1.8.1',
            volume_mounts=kaniko_volume_mounts,
            resources=kaniko_reqs,
            args=kaniko_args
        )

        # volume claim
        data_pv_claim = client.V1PersistentVolumeClaimVolumeSource(
            claim_name="dir-claim-chassis"
        ) if CHASSIS_DEV else client.V1PersistentVolumeClaimVolumeSource(
            claim_name=DATA_VOLUME_CLAIM_NAME
        )

        # volume holding data
        data_volume = client.V1Volume(
            name="local-volume-code",
            persistent_volume_claim=data_pv_claim
        ) if CHASSIS_DEV else client.V1Volume(
            name=DATA_VOLUME_NAME,
            persistent_volume_claim=data_pv_claim
        )

        volumes.append(data_volume)
    else:
        kaniko_args.append(f'--context={context_uri}')
        if MODE=="s3":
            kaniko_s3_volume_mount = client.V1VolumeMount(
                mount_path='/root/.aws',
                name='storage-key'
            )

            kaniko_storage_key_volume = client.V1Volume(
                name='storage-key',
                secret=client.V1SecretVolumeSource(secret_name=STORAGE_CREDENTIALS_SECRET_NAME)
            )

            kaniko_volume_mounts.append(kaniko_s3_volume_mount)
            init_container_kaniko = client.V1Container(
                name='kaniko',
                image='gcr.io/kaniko-project/executor:v1.8.1',
                volume_mounts=kaniko_volume_mounts,
                env=[client.V1EnvVar(name='AWS_REGION', value=AWS_REGION)],
                resources=kaniko_reqs,
                args=kaniko_args
            )
        elif MODE=="gs":
            kaniko_gs_volume_mount = client.V1VolumeMount(
                mount_path='/secret',
                name='storage-key'
            )

            kaniko_storage_key_volume = client.V1Volume(
                name='storage-key',
                secret=client.V1SecretVolumeSource(secret_name=STORAGE_CREDENTIALS_SECRET_NAME)
            )

            kaniko_volume_mounts.append(kaniko_gs_volume_mount)
            init_container_kaniko = client.V1Container(
                name='kaniko',
                image='gcr.io/kaniko-project/executor:v1.8.1',
                volume_mounts=kaniko_volume_mounts,
                env=[client.V1EnvVar(name='GOOGLE_APPLICATION_CREDENTIALS', value='/secret/storage-key.json')],
                resources=kaniko_reqs,
                args=kaniko_args
            )
        else:
            raise ValueError("Only allowed modes are: 'pv', 'gs', 's3'")

        volumes.append(kaniko_storage_key_volume)

    # Pod spec for the image build process
    init_container_list = []
    containers_list = [init_container_kaniko]

    pod_spec = client.V1PodSpec(
        service_account_name=K_SERVICE_ACOUNT_NAME,
        restart_policy='Never',
        init_containers=init_container_list,
        containers=containers_list,
        volumes=volumes
    )

    # setup and initiate model image build
    template = client.V1PodTemplateSpec(
        metadata=client.V1ObjectMeta(name=job_name),
        spec=pod_spec
    )

    spec = client.V1JobSpec(
        backoff_limit=0,
        template=template
    )
    job = client.V1Job(
        api_version='batch/v1',
        kind='Job',
        metadata=client.V1ObjectMeta(
            name=job_name,
        ),
        spec=spec
    )

    return job

download_tar(job_id)

This method is run by the /job/{job_id}/download-tar endpoint. It downloads the container image from kaniko, built during the chassis job with the name job_id

Parameters:

Name Type Description Default
job_id str

valid Chassis job identifier, generated by create_job method

required

Returns:

Type Description
Dict

response from download_tar endpoint

Source code in service/app.py
def download_tar(job_id):
    '''
    This method is run by the `/job/{job_id}/download-tar` endpoint. 
    It downloads the container image from kaniko, built during the chassis job with the name `job_id`

    Args:
        job_id (str): valid Chassis job identifier, generated by `create_job` method 

    Returns:
        Dict: response from `download_tar` endpoint
    '''
    uid = job_id.split(f'{K_JOB_NAME}-')[1]

    if PV_MODE:
        return send_from_directory(DATA_DIR, path=f'kaniko_image-{uid}.tar', as_attachment=False)
    else:
        return Response(f"400 Bad Request: Tar download not available in production mode, please use 'docker pull ...'",400)

get_job_status(job_id)

This method is run by the /job/{job_id} endpoint. Based on a GET request, it retrieves the status of the Kaniko job and the results if the job has completed.

Parameters:

Name Type Description Default
job_id str

valid Chassis job identifier, generated by create_job method

required

Returns:

Type Description
Dict

Dictionary containing corresponding job data of job job_id

Source code in service/app.py
def get_job_status(job_id):
    '''
    This method is run by the `/job/{job_id}` endpoint.
    Based on a GET request, it retrieves the status of the Kaniko job and the results if the job has completed.

    Args:
        job_id (str): valid Chassis job identifier, generated by `create_job` method

    Returns:
        Dict: Dictionary containing corresponding job data of job `job_id` 
    '''
    if CHASSIS_DEV:
        # if you are doing local dev you need to point at the local kubernetes cluster with your config file
        kubefile = os.getenv("CHASSIS_KUBECONFIG")
        config.load_kube_config(kubefile)
    else:
        # if the service is running inside a cluster during production then the config can be inherited
        config.load_incluster_config()

    batch_v1 = client.BatchV1Api()

    try:
        job = batch_v1.read_namespaced_job(job_id, ENVIRONMENT)

        annotations = job.metadata.annotations or {}
        result = annotations.get('result')
        result = json.loads(result) if result else None
        status = job.status.to_dict()

        job_data = {
            'result': result,
            'status': status
        }
        if status['failed']:
            job_data['logs'] = get_job_logs(job_id)

        return job_data
    except ApiException as e:
        logger.error(f'Exception when getting job status: {e}')
        return e.body

test_model()

This method is run by the /test endpoint. It creates a new conda environment from the provided conda.yaml file and then tests the provided model in that conda environment with provided test input file.

Parameters:

Name Type Description Default
None None

This method does not take any parameters

required

Returns:

Type Description
Dict

model response to /test endpoint. Should contain either successful predictions or error message

Source code in service/app.py
def test_model():
    '''
    This method is run by the `/test` endpoint. It creates a new conda environment from the provided `conda.yaml` file and then tests the provided model in that conda environment with provided test input file.

    Args:
        None (None): This method does not take any parameters

    Returns:
        Dict: model response to `/test` endpoint. Should contain either successful predictions or error message
    '''
    if not ('sample_input' in request.files and 'model' in request.files):
        return 'Both sample input and model are required', 500

    output_dict = {}

    # retrieve binary representations for both variables
    model = request.files.get('model')
    sample_input = request.files.get('sample_input')

    # This is a future proofing variable in case we encounter a model that cannot be converted into mlflow.
    # It will remain hardcoded for now.
    module_name = 'mlflow'

    # This name is a random id used to ensure that all jobs are uniquely named and traceable.
    random_name = str(uuid.uuid4())

    # Unzip model archive
    unzipped_path = unzip_model(model, module_name, random_name)

    # get sample input path
    sample_input_path = extract_sample_input(sample_input, module_name, random_name)

    # create conda env, return error if fails
    try:
        tmp_env_name = str(time.time())
        rm_env_cmd = "conda env remove --name {}".format(tmp_env_name)
        yaml_path = os.path.join(unzipped_path,"conda.yaml")
        create_env_cmd = "conda env create -f {} -n {}".format(yaml_path,tmp_env_name)
        subprocess.run(create_env_cmd, capture_output=True, shell=True, executable='/bin/bash', check=True)
    except subprocess.CalledProcessError as e:
        print(e)
        subprocess.run(rm_env_cmd, capture_output=True, shell=True, executable='/bin/bash')
        output_dict["env_error"] = e.stderr.decode()
        return output_dict

    # test model in env with sample input file, return error if fails
    try:
        test_model_cmd = """
        source activate {};
        python test_chassis_model.py {} {}
        """.format(tmp_env_name,unzipped_path,sample_input_path)
        test_ret = subprocess.run(test_model_cmd, capture_output=True, shell=True, executable='/bin/bash', check=True)
        output_dict["model_output"] = test_ret.stdout.decode()
    except subprocess.CalledProcessError as e:
        subprocess.run(rm_env_cmd, capture_output=True, shell=True, executable='/bin/bash')
        output_dict["model_error"] = e.stderr.decode()
        return output_dict

    # if we make it here, test was successful, remove env and return output
    subprocess.run(rm_env_cmd, capture_output=True, shell=True, executable='/bin/bash')
    return output_dict