Chassisml API Reference
Welcome to the Chassisml API Reference documentation homepage. The API follows REST principles, using standard HTTP verbs and status codes, and is implemented with Flask. On this page, you will find documentation for:
- Available REST endpoints in API Service
- Methods implemented within each endpoint
Endpoints
/health
(GET)
- Confirms Chassis service is up and running
/build
(POST)
- Kicks off the container image build process
/job/{job_id}
(GET)
- Retrieves the status of a Chassis `/build` job
/job/{job_id}/download-tar
(GET)
- Retrieves the Docker image tar archive from a volume attached to the Kubernetes cluster hosting Chassis and downloads it to a local filepath
/test
(POST)
- Creates a conda environment as specified by the user's model artifacts and runs the `ChassisModel` to ensure the model code can run within the provided conda environment
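As a quick orientation, here is a minimal client-side sketch that exercises the `/health` endpoint with the `requests` library. The base URL is an assumption for illustration; the actual host depends on how your Chassis installation is exposed.

```python
import requests

# Assumption: the Chassis service is port-forwarded to localhost:5000.
BASE_URL = "http://localhost:5000"

# GET /health returns a simple confirmation that the service is running.
resp = requests.get(f"{BASE_URL}/health")
print(resp.status_code, resp.text)
```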
Functions
build_image()
This method is run by the `/build` endpoint. It generates a model image based upon a POST request. The `request.files` structure can be seen in the Python SDK docs.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
None | None | This method does not take any parameters | N/A |
Returns:
Type | Description |
---|---|
Dict | information about whether or not the image build resulted in an error |
Source code in service/app.py
def build_image():
    '''
    This method is run by the `/build` endpoint.
    It generates a model image based upon a POST request. The `request.files` structure can be seen in the Python SDK docs.

    Args:
        None (None): This method does not take any parameters

    Returns:
        Dict: information about whether or not the image build resulted in an error
    '''
    if not ('image_data' in request.files and 'model' in request.files):
        return 'Both model and image_data are required', 500

    # retrieve image_data and populate variables accordingly
    image_data = json.load(request.files.get('image_data'))
    model_name = image_data.get('model_name')
    image_name = image_data.get('name')
    gpu = image_data.get('gpu')
    arm64 = image_data.get('arm64')
    publish = image_data.get('publish', False)
    publish = True if publish else ''
    registry_auth = image_data.get('registry_auth')
    webhook = image_data.get('webhook')

    # retrieve binary representations for all three variables
    model = request.files.get('model')
    metadata_data = request.files.get('metadata_data')

    # This is a future proofing variable in case we encounter a model that cannot be converted into mlflow.
    # It will remain hardcoded for now.
    module_name = 'mlflow'

    # This name is a random id used to ensure that all jobs are uniquely named and traceable.
    random_name = str(uuid.uuid4())

    # Unzip model archive
    if PV_MODE:
        unzip_model(model, module_name, random_name)
        context_uri = None
    else:
        dockerfile = choose_dockerfile(gpu, arm64)
        context_uri = upload_context(model, module_name, random_name, metadata_data, dockerfile)
        if not context_uri:
            return Response(f"403 Forbidden: Cloud storage credentials could not push to context bucket.", 403)

    metadata_path = extract_metadata(metadata_data, module_name, random_name)

    # this path is the local location that kaniko will store the image it creates
    path_to_tar_file = f'{DATA_DIR if PV_MODE else "/tar"}/kaniko_image-{random_name}.tar'
    logger.debug(f'Request data: {image_name}, {module_name}, {model_name}, {path_to_tar_file}')

    error = run_kaniko(
        image_name,
        module_name,
        model_name,
        path_to_tar_file,
        random_name,
        publish,
        registry_auth,
        gpu,
        arm64,
        context_uri,
        metadata_path,
        webhook
    )

    if error:
        return {'error': error, 'job_id': None}

    return {'error': False, 'job_id': f'{K_JOB_NAME}-{random_name}'}
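To make the expected request shape concrete, here is a hedged client sketch for `/build`. The multipart field names (`image_data`, `model`, `metadata_data`) come from the handler above; the base URL, file names, and `image_data` values are illustrative assumptions, and real builds are normally submitted through the Python SDK.

```python
import json
import requests

BASE_URL = "http://localhost:5000"  # assumption: local Chassis service

# Settings consumed by build_image() via json.load(request.files['image_data']).
image_data = {
    "name": "user/my-model:latest",  # hypothetical destination image name
    "model_name": "my-model",
    "gpu": False,
    "arm64": False,
    "publish": False,
}

# 'model' is the zipped model archive; 'metadata_data' is the metadata file.
with open("model.zip", "rb") as model_f, open("model.yaml", "rb") as meta_f:
    files = {
        "image_data": ("image_data", json.dumps(image_data)),
        "model": ("model", model_f),
        "metadata_data": ("metadata_data", meta_f),
    }
    resp = requests.post(f"{BASE_URL}/build", files=files)

# On success the response looks like {'error': False, 'job_id': '<K_JOB_NAME>-<uuid>'}.
print(resp.json())
```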
create_job_object(image_name, module_name, model_name, path_to_tar_file, random_name, publish, registry_auth, gpu=False, arm64=False, context_uri=None, metadata_path=None)
This utility method sets up all the required objects needed to create a model image and is run within the `run_kaniko` method.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
image_name | str | container image name | required |
module_name | str | reference module used to locate where the service input is saved | required |
model_name | str | name of model to package | required |
path_to_tar_file | str | filepath destination to save docker image tar file | required |
random_name | str | random id generated during build process that is used to ensure that all jobs are uniquely named and traceable | required |
publish | bool | determines if image will be published to Docker registry | required |
registry_auth | dict | Docker registry authorization credentials | required |
gpu | bool | If `True`, will build container image that runs on GPU | False |
arm64 | bool | If `True`, will build container image that runs on ARM64 architecture | False |
context_uri | str | Location of build context in S3 (S3 mode only) | None |
metadata_path | str | path to extracted model metadata file, passed to the build as the `MODZY_METADATA_PATH` build argument | None |
Returns:
Type | Description |
---|---|
Job | Chassis job object |
Source code in service/app.py
def create_job_object(
    image_name,
    module_name,
    model_name,
    path_to_tar_file,
    random_name,
    publish,
    registry_auth,
    gpu=False,
    arm64=False,
    context_uri=None,
    metadata_path=None
):
    '''
    This utility method sets up all the required objects needed to create a model image and is run within the `run_kaniko` method.

    Args:
        image_name (str): container image name
        module_name (str): reference module used to locate where the service input is saved
        model_name (str): name of model to package
        path_to_tar_file (str): filepath destination to save docker image tar file
        random_name (str): random id generated during build process that is used to ensure that all jobs are uniquely named and traceable
        publish (bool): determines if image will be published to Docker registry
        registry_auth (dict): Docker registry authorization credentials
        gpu (bool): If `True`, will build container image that runs on GPU
        arm64 (bool): If `True`, will build container image that runs on ARM64 architecture
        context_uri (str): Location of build context in S3 (S3 mode only)

    Returns:
        Job: Chassis job object
    '''
    job_name = f'{K_JOB_NAME}-{random_name}'

    if registry_auth and not REGISTRY_URI:
        # credential setup for Docker Hub.
        # json for holding registry credentials that will access docker hub.
        # reference: https://github.com/GoogleContainerTools/kaniko#pushing-to-docker-hub
        registry_credentials = f'{{"auths":{{"https://index.docker.io/v1/":{{"auth":"{registry_auth}"}}}}}}'
        b64_registry_credentials = base64.b64encode(registry_credentials.encode("utf-8")).decode("utf-8")
    elif registry_auth and REGISTRY_URI:
        registry_credentials = f'{{"auths":{{"{REGISTRY_URI}":{{"auth":"{registry_auth}"}}}}}}'
        b64_registry_credentials = base64.b64encode(registry_credentials.encode("utf-8")).decode("utf-8")
    elif not registry_auth and not REGISTRY_CREDENTIALS:
        raise ValueError("No registry credentials provided by user or during Chassis installation.")
    else:
        b64_registry_credentials = REGISTRY_CREDENTIALS

    # mount path leads to /data
    # this is a mount point. NOT the volume itself.
    # name aligns with a volume defined below.
    if PV_MODE:
        data_volume_mount = client.V1VolumeMount(
            mount_path=MOUNT_PATH_DIR,
            name="local-volume-code"
        ) if CHASSIS_DEV else client.V1VolumeMount(
            mount_path=MOUNT_PATH_DIR,
            name=DATA_VOLUME_NAME
        )

    # This volume will be used by kaniko container to get registry credentials.
    # mount path leads to /kaniko/.docker per kaniko reference documentation
    # this is a mount point. NOT the volume itself.
    # name aligns with a volume defined below.
    kaniko_credentials_volume_mount = client.V1VolumeMount(
        mount_path=K_KANIKO_EMPTY_DIR_PATH,
        name=K_EMPTY_DIR_NAME
    )

    # create secret for registry credentials
    registry_creds_secret_name = f'{random_name}-creds'
    metadata = {'name': registry_creds_secret_name, 'namespace': ENVIRONMENT}
    data = {'config.json': b64_registry_credentials}
    api_version = 'v1'
    kind = 'Secret'
    secret = client.V1Secret(api_version, data, kind, metadata)
    client.CoreV1Api().create_namespaced_secret(ENVIRONMENT, secret)

    # volume holding credentials
    kaniko_credentials_volume = client.V1Volume(
        name=K_EMPTY_DIR_NAME,
        secret=client.V1SecretVolumeSource(secret_name=registry_creds_secret_name)
    )

    # This is the kaniko container used to build the final image.
    kaniko_args = [
        '' if publish else '--no-push',
        f'--destination={REGISTRY_URI+"/" if REGISTRY_URI else ""}{REPOSITORY_PREFIX}{image_name}{"" if ":" in image_name else ":latest"}',
        '--snapshotMode=redo',
        '--use-new-run',
        f'--build-arg=MODEL_DIR=model-{random_name}',
        f'--build-arg=MODZY_METADATA_PATH={metadata_path if metadata_path is not None else "flavours/mlflow/interfaces/modzy/asset_bundle/0.1.0/model.yaml"}',
        f'--build-arg=MODEL_NAME={model_name}',
        f'--build-arg=MODEL_CLASS={module_name}',
        # Modzy is the default interface.
        '--build-arg=INTERFACE=modzy',
    ]

    volumes = [kaniko_credentials_volume]
    kaniko_volume_mounts = [kaniko_credentials_volume_mount]

    base_resources = {"memory": "30Gi", "cpu": "2"}
    slim_reqs = {"memory": "2Gi", "cpu": "1"}
    kaniko_reqs = client.V1ResourceRequirements(limits=base_resources, requests=base_resources)

    if PV_MODE:
        dockerfile = choose_dockerfile(gpu, arm64)
        kaniko_args.extend([
            f'--dockerfile={DATA_DIR}/flavours/{module_name}/{dockerfile}',
            f'--context={DATA_DIR}',
            f'--tarPath={path_to_tar_file}',
        ])
        kaniko_volume_mounts.append(data_volume_mount)
        init_container_kaniko = client.V1Container(
            name='kaniko',
            image='gcr.io/kaniko-project/executor:v1.8.1',
            volume_mounts=kaniko_volume_mounts,
            resources=kaniko_reqs,
            args=kaniko_args
        )
        # volume claim
        data_pv_claim = client.V1PersistentVolumeClaimVolumeSource(
            claim_name="dir-claim-chassis"
        ) if CHASSIS_DEV else client.V1PersistentVolumeClaimVolumeSource(
            claim_name=DATA_VOLUME_CLAIM_NAME
        )
        # volume holding data
        data_volume = client.V1Volume(
            name="local-volume-code",
            persistent_volume_claim=data_pv_claim
        ) if CHASSIS_DEV else client.V1Volume(
            name=DATA_VOLUME_NAME,
            persistent_volume_claim=data_pv_claim
        )
        volumes.append(data_volume)
    else:
        kaniko_args.append(f'--context={context_uri}')
        if MODE == "s3":
            kaniko_s3_volume_mount = client.V1VolumeMount(
                mount_path='/root/.aws',
                name='storage-key'
            )
            kaniko_storage_key_volume = client.V1Volume(
                name='storage-key',
                secret=client.V1SecretVolumeSource(secret_name=STORAGE_CREDENTIALS_SECRET_NAME)
            )
            kaniko_volume_mounts.append(kaniko_s3_volume_mount)
            init_container_kaniko = client.V1Container(
                name='kaniko',
                image='gcr.io/kaniko-project/executor:v1.8.1',
                volume_mounts=kaniko_volume_mounts,
                env=[client.V1EnvVar(name='AWS_REGION', value=AWS_REGION)],
                resources=kaniko_reqs,
                args=kaniko_args
            )
        elif MODE == "gs":
            kaniko_gs_volume_mount = client.V1VolumeMount(
                mount_path='/secret',
                name='storage-key'
            )
            kaniko_storage_key_volume = client.V1Volume(
                name='storage-key',
                secret=client.V1SecretVolumeSource(secret_name=STORAGE_CREDENTIALS_SECRET_NAME)
            )
            kaniko_volume_mounts.append(kaniko_gs_volume_mount)
            init_container_kaniko = client.V1Container(
                name='kaniko',
                image='gcr.io/kaniko-project/executor:v1.8.1',
                volume_mounts=kaniko_volume_mounts,
                env=[client.V1EnvVar(name='GOOGLE_APPLICATION_CREDENTIALS', value='/secret/storage-key.json')],
                resources=kaniko_reqs,
                args=kaniko_args
            )
        else:
            raise ValueError("Only allowed modes are: 'pv', 'gs', 's3'")
        volumes.append(kaniko_storage_key_volume)

    # Pod spec for the image build process
    init_container_list = []
    containers_list = [init_container_kaniko]
    pod_spec = client.V1PodSpec(
        service_account_name=K_SERVICE_ACOUNT_NAME,
        restart_policy='Never',
        init_containers=init_container_list,
        containers=containers_list,
        volumes=volumes
    )

    # setup and initiate model image build
    template = client.V1PodTemplateSpec(
        metadata=client.V1ObjectMeta(name=job_name),
        spec=pod_spec
    )
    spec = client.V1JobSpec(
        backoff_limit=0,
        template=template
    )
    job = client.V1Job(
        api_version='batch/v1',
        kind='Job',
        metadata=client.V1ObjectMeta(
            name=job_name,
        ),
        spec=spec
    )

    return job
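The registry credential handling above boils down to base64-encoding a Docker `config.json` for kaniko. A standalone sketch of that step (using deliberately fake credentials for illustration) looks like this:

```python
import base64

# Assumption: registry_auth is a base64-encoded "user:password" string,
# as kaniko's Docker config.json format expects.
registry_auth = "ZmFrZXVzZXI6ZmFrZXBhc3M="      # fake credentials
registry_uri = "https://index.docker.io/v1/"    # Docker Hub, per the kaniko docs

# Same f-string construction used in create_job_object above.
registry_credentials = f'{{"auths":{{"{registry_uri}":{{"auth":"{registry_auth}"}}}}}}'
b64_registry_credentials = base64.b64encode(
    registry_credentials.encode("utf-8")
).decode("utf-8")

# This value is stored in a Kubernetes Secret under the key 'config.json'
# and mounted into the kaniko container at /kaniko/.docker.
print(b64_registry_credentials)
```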
download_tar(job_id)
This method is run by the `/job/{job_id}/download-tar` endpoint. It downloads the container image tar built by kaniko during the Chassis job with the name `job_id`.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
job_id | str | valid Chassis job identifier, generated by `create_job` method | required |
Returns:
Type | Description |
---|---|
Dict | response from `download_tar` endpoint |
Source code in service/app.py
def download_tar(job_id):
    '''
    This method is run by the `/job/{job_id}/download-tar` endpoint.
    It downloads the container image from kaniko, built during the chassis job with the name `job_id`

    Args:
        job_id (str): valid Chassis job identifier, generated by `create_job` method

    Returns:
        Dict: response from `download_tar` endpoint
    '''
    uid = job_id.split(f'{K_JOB_NAME}-')[1]
    if PV_MODE:
        return send_from_directory(DATA_DIR, path=f'kaniko_image-{uid}.tar', as_attachment=False)
    else:
        return Response(f"400 Bad Request: Tar download not available in production mode, please use 'docker pull ...'", 400)
get_job_status(job_id)
This method is run by the `/job/{job_id}` endpoint. Based on a GET request, it retrieves the status of the Kaniko job and the results if the job has completed.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
job_id | str | valid Chassis job identifier, generated by `create_job` method | required |
Returns:
Type | Description |
---|---|
Dict | Dictionary containing corresponding job data of job `job_id` |
Source code in service/app.py
def get_job_status(job_id):
    '''
    This method is run by the `/job/{job_id}` endpoint.
    Based on a GET request, it retrieves the status of the Kaniko job and the results if the job has completed.

    Args:
        job_id (str): valid Chassis job identifier, generated by `create_job` method

    Returns:
        Dict: Dictionary containing corresponding job data of job `job_id`
    '''
    if CHASSIS_DEV:
        # if you are doing local dev you need to point at the local kubernetes cluster with your config file
        kubefile = os.getenv("CHASSIS_KUBECONFIG")
        config.load_kube_config(kubefile)
    else:
        # if the service is running inside a cluster during production then the config can be inherited
        config.load_incluster_config()

    batch_v1 = client.BatchV1Api()

    try:
        job = batch_v1.read_namespaced_job(job_id, ENVIRONMENT)
        annotations = job.metadata.annotations or {}
        result = annotations.get('result')
        result = json.loads(result) if result else None
        status = job.status.to_dict()
        job_data = {
            'result': result,
            'status': status
        }
        if status['failed']:
            job_data['logs'] = get_job_logs(job_id)
        return job_data
    except ApiException as e:
        logger.error(f'Exception when getting job status: {e}')
        return e.body
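A client typically polls this endpoint until the Kubernetes job status reports completion. The endpoint path and the `status`/`result`/`logs` keys come from the handler above; the host, placeholder job id, and poll interval are assumptions in this sketch:

```python
import time
import requests

BASE_URL = "http://localhost:5000"  # assumption: local Chassis service
job_id = "REPLACE-WITH-JOB-ID"      # job_id returned by the /build endpoint

while True:
    job_data = requests.get(f"{BASE_URL}/job/{job_id}").json()
    status = job_data.get("status", {})  # Kubernetes V1JobStatus as a dict
    if status.get("succeeded"):
        print("Build complete:", job_data.get("result"))
        break
    if status.get("failed"):
        print("Build failed; logs:", job_data.get("logs"))
        break
    time.sleep(5)  # arbitrary poll interval
```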
test_model()
This method is run by the `/test` endpoint. It creates a new conda environment from the provided `conda.yaml` file and then tests the provided model in that conda environment with the provided test input file.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
None | None | This method does not take any parameters | N/A |
Returns:
Type | Description |
---|---|
Dict | model response to `/test` endpoint; should contain either successful predictions or an error message |
Source code in service/app.py
def test_model():
    '''
    This method is run by the `/test` endpoint. It creates a new conda environment from the provided `conda.yaml` file and then tests the provided model in that conda environment with the provided test input file.

    Args:
        None (None): This method does not take any parameters

    Returns:
        Dict: model response to `/test` endpoint. Should contain either successful predictions or error message
    '''
    if not ('sample_input' in request.files and 'model' in request.files):
        return 'Both sample input and model are required', 500

    output_dict = {}

    # retrieve binary representations for both variables
    model = request.files.get('model')
    sample_input = request.files.get('sample_input')

    # This is a future proofing variable in case we encounter a model that cannot be converted into mlflow.
    # It will remain hardcoded for now.
    module_name = 'mlflow'

    # This name is a random id used to ensure that all jobs are uniquely named and traceable.
    random_name = str(uuid.uuid4())

    # Unzip model archive
    unzipped_path = unzip_model(model, module_name, random_name)

    # get sample input path
    sample_input_path = extract_sample_input(sample_input, module_name, random_name)

    # create conda env, return error if fails
    try:
        tmp_env_name = str(time.time())
        rm_env_cmd = "conda env remove --name {}".format(tmp_env_name)
        yaml_path = os.path.join(unzipped_path, "conda.yaml")
        create_env_cmd = "conda env create -f {} -n {}".format(yaml_path, tmp_env_name)
        subprocess.run(create_env_cmd, capture_output=True, shell=True, executable='/bin/bash', check=True)
    except subprocess.CalledProcessError as e:
        print(e)
        subprocess.run(rm_env_cmd, capture_output=True, shell=True, executable='/bin/bash')
        output_dict["env_error"] = e.stderr.decode()
        return output_dict

    # test model in env with sample input file, return error if fails
    try:
        test_model_cmd = """
        source activate {};
        python test_chassis_model.py {} {}
        """.format(tmp_env_name, unzipped_path, sample_input_path)
        test_ret = subprocess.run(test_model_cmd, capture_output=True, shell=True, executable='/bin/bash', check=True)
        output_dict["model_output"] = test_ret.stdout.decode()
    except subprocess.CalledProcessError as e:
        subprocess.run(rm_env_cmd, capture_output=True, shell=True, executable='/bin/bash')
        output_dict["model_error"] = e.stderr.decode()
        return output_dict

    # if we make it here, test was successful, remove env and return output
    subprocess.run(rm_env_cmd, capture_output=True, shell=True, executable='/bin/bash')
    return output_dict
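Mirroring the two multipart files required by the handler (`model` and `sample_input`), a test request might look like the following sketch; the host and file names are illustrative assumptions:

```python
import requests

BASE_URL = "http://localhost:5000"  # assumption: local Chassis service

# /test requires two multipart files: the zipped model archive (which must
# contain a conda.yaml) and a sample input file.
with open("model.zip", "rb") as model_f, open("sample.json", "rb") as input_f:
    files = {
        "model": ("model", model_f),
        "sample_input": ("sample_input", input_f),
    }
    resp = requests.post(f"{BASE_URL}/test", files=files)

output = resp.json()
# On success: {'model_output': ...}; on failure: 'env_error' or 'model_error'.
print(output)
```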