diff --git a/README.md b/README.md index d68077828..dc7d589bf 100644 --- a/README.md +++ b/README.md @@ -23,6 +23,29 @@ Clipper is a prediction serving system that sits between user-facing application * Clipper **improves prediction accuracy** by introducing state-of-the-art bandit and ensemble methods to intelligently select and combine predictions and achieve real-time personalization across machine learning frameworks. *Clipper makes users happy.* +## Kubernetes quickstart + +Dependencies: + + * [Docker](https://www.docker.com/) + * [Minikube](https://github.com/kubernetes/minikube/releases) + +```bash +# (Optional) create clipper virtualenv +pyenv virtualenv 2.7.11 clipper +pyenv local clipper + +# install clipper_admin (editable) +pip install -e . + +# start minikube +minikube start --insecure-registry localhost:5000 + +# start docker and configure to use minikube registry +systemctl start docker +eval $(minikube docker-env) +``` + ## Quickstart diff --git a/clipper_admin/clipper_k8s.py b/clipper_admin/clipper_k8s.py new file mode 100644 index 000000000..6b4e918bf --- /dev/null +++ b/clipper_admin/clipper_k8s.py @@ -0,0 +1,168 @@ +"""Clipper Kubernetes Utilities""" +# TODO: include labels (used by clipper.stop_all) +# TODO: deletion methods + +from contextlib import contextmanager +from kubernetes import client, config +from kubernetes.client.rest import ApiException +import logging +import json +import yaml + +import clipper_manager + +@contextmanager +def _pass_conflicts(): + try: + yield + except ApiException as e: + body = json.loads(e.body) + if body['reason'] == 'AlreadyExists': + logging.info("{} already exists, skipping!".format(body['details'])) + pass + +class ClipperK8s: + # TODO: subclass ContainerManager interface + def __init__(self): + config.load_kube_config() + self._k8s_v1 = client.CoreV1Api() + self._k8s_beta = client.ExtensionsV1beta1Api() + + def start(self): + self._start_clipper() + + # NOTE: this provides a minikube accessible docker registry for convenience during dev + # TODO: should not be required, as `deploy_model` should be able to pull from any accessible `repo` + self._start_registry() + + def deploy_model(self, name, version, repo): + """Deploys a versioned model to a k8s cluster. + + Parameters + ---------- + name : str + The name to assign this model. + version : int + The version to assign this model. + repo : str + A docker repository path, which must be accessible by the k8s cluster. + """ + with _pass_conflicts(): + # TODO: handle errors where `repo` is not accessible + self._k8s_beta.create_namespaced_deployment( + body={ + 'apiVersion': 'extensions/v1beta1', + 'kind': 'Deployment', + 'metadata': { + 'name': name + '-deployment' # NOTE: must satisfy RFC 1123 pathname conventions + }, + 'spec': { + 'replicas': 1, + 'template': { + 'metadata': { + 'labels': { + clipper_manager.CLIPPER_MODEL_CONTAINER_LABEL: '', + 'model': name, + 'version': str(version) + } + }, + 'spec': { + 'containers': [ + { + 'name': name, + 'image': repo, + 'ports': [ + { + 'containerPort': 80 + } + ], + 'env': [ + { + 'name': 'CLIPPER_MODEL_NAME', + 'value': name + }, + { + 'name': 'CLIPPER_MODEL_VERSION', + 'value': str(version) + }, + { + 'name': 'CLIPPER_IP', + 'value': 'query-frontend' + } + ] + } + ] + } + } + } + }, namespace='default') + + def stop_all_model_deployments(self): + """Stops all deployments of pods running Clipper models.""" + # TODO: stopping deploy doesn't stop replicaset, maybe this is a kubernetes python API bug? + # either way, need to manually stop for now + logging.info("Stopping all running Clipper model deployments") + try: + resp = self._k8s_beta.delete_collection_namespaced_deployment( + namespace='default', + label_selector=clipper_manager.CLIPPER_MODEL_CONTAINER_LABEL) + except ApiException as e: + logging.warn("Exception deleting k8s deployments: {}".format(e)) + + def stop_clipper_resources(self): + """Stops all Clipper resources. + + WARNING: Data stored on an in-cluster Redis deployment will be lost! This method does not delete + any existing in-cluster Docker registry. + """ + # TODO: stopping deploy doesn't stop replicaset, maybe this is a kubernetes python API bug? + # either way, need to manually stop for now + logging.info("Stopping all running Clipper resources") + + try: + for service in self._k8s_v1.list_namespaced_service( + namespace='default', + label_selector=clipper_manager.CLIPPER_DOCKER_LABEL).items: + # TODO: use delete collection of services if API provides + service_name = service.metadata.name + self._k8s_v1.delete_namespaced_service( + namespace='default', + name=service_name) + + self._k8s_beta.delete_collection_namespaced_deployment( + namespace='default', + label_selector=clipper_manager.CLIPPER_DOCKER_LABEL) + + self._k8s_v1.delete_collection_namespaced_pod( + namespace='default', + label_selector=clipper_manager.CLIPPER_DOCKER_LABEL) + + self._k8s_v1.delete_collection_namespaced_pod( + namespace='default', + label_selector=clipper_manager.CLIPPER_MODEL_CONTAINER_LABEL) + except ApiException as e: + logging.warn("Exception deleting k8s resources: {}".format(e)) + + def _start_clipper(self): + """Deploys Clipper to the k8s cluster and exposes the frontends as services.""" + logging.info("Initializing Clipper services to k8s cluster") + for name in ['mgmt-frontend', 'query-frontend', 'redis']: + with _pass_conflicts(): + self._k8s_beta.create_namespaced_deployment( + body=yaml.load(open('k8s/clipper/{}-deployment.yaml'.format(name))), namespace='default') + with _pass_conflicts(): + self._k8s_v1.create_namespaced_service( + body=yaml.load(open('k8s/clipper/{}-service.yaml'.format(name))), namespace='default') + + def _start_registry(self): + logging.info("Initializing Docker registry on k8s cluster") + with _pass_conflicts(): + self._k8s_v1.create_namespaced_replication_controller( + body=yaml.load(open('k8s/minikube-registry/kube-registry-replication-controller.yaml')), namespace='kube-system') + with _pass_conflicts(): + self._k8s_v1.create_namespaced_service( + body=yaml.load(open('k8s/minikube-registry/kube-registry-service.yaml')), namespace='kube-system') + with _pass_conflicts(): + self._k8s_beta.create_namespaced_daemon_set( + body=yaml.load(open('k8s/minikube-registry/kube-registry-daemon-set.yaml')), namespace='kube-system') + diff --git a/clipper_admin/clipper_manager.py b/clipper_admin/clipper_manager.py index 3cd8a87e9..13ecce23f 100644 --- a/clipper_admin/clipper_manager.py +++ b/clipper_admin/clipper_manager.py @@ -1,20 +1,21 @@ """Clipper Management Utilities""" from __future__ import print_function, with_statement, absolute_import +import docker from fabric.api import * from fabric.contrib.files import append import os import requests import json import yaml -import pprint import subprocess32 as subprocess import shutil from sklearn import base from sklearn.externals import joblib from cStringIO import StringIO -import sys from .cloudpickle import CloudPickler +from .clipper_k8s import ClipperK8s +import logging import time import re @@ -33,16 +34,15 @@ REDIS_RESOURCE_DB_NUM = 4 REDIS_APPLICATION_DB_NUM = 5 -DEFAULT_REDIS_IP = "redis" DEFAULT_REDIS_PORT = 6379 -CLIPPER_QUERY_PORT = 1337 -CLIPPER_MANAGEMENT_PORT = 1338 +CLIPPER_QUERY_PORT = 32595 +CLIPPER_MANAGEMENT_PORT = 31725 CLIPPER_RPC_PORT = 7000 CLIPPER_LOGS_PATH = "/tmp/clipper-logs" CLIPPER_DOCKER_LABEL = "ai.clipper.container.label" -CLIPPER_MODEL_CONTAINER_LABEL = "ai.clipper.model_container.model_version" +CLIPPER_MODEL_CONTAINER_LABEL = "ai.clipper.model_container.label" DEFAULT_MODEL_VERSION = 1 DEFAULT_DEFAULT_OUTPUT = "None" @@ -67,13 +67,7 @@ class ClipperManagerException(Exception): class Clipper: """ - Connection to a Clipper instance for administrative purposes. - - Sets up the machine for running Clipper. This includes verifying - SSH credentials and initializing Docker. - - Docker and docker-compose must already by installed on the machine - before connecting to a machine. + Connection to a Clipper instance running on k8s for administrative purposes. Parameters ---------- @@ -82,23 +76,8 @@ class Clipper: should allow passwordless SSH access. user : str, optional The SSH username. This field must be specified if `host` is not local. - key_path : str, optional. - The path to the SSH private key. This field must be specified if `host` is not local. sudo : bool, optional. Specifies level of execution for docker commands (sudo if true, standard if false). - ssh_port : int, optional - The SSH port to use. Default is port 22. - check_for_docker : bool, optional - If True, checks that Docker is running on the host machine. Default is True. - redis_port : int, optional - The port to use for connecting to redis. Default is port 6379. - redis_ip : string, optional - The ip address of the redis instance that Clipper should use. - If unspecified, a docker container running redis will be started - on `host` at the port specified by `redis_port`. - redis_persistence_path : string, optional - The directory path to which redis data should be persisted. The directory - should not already exist. If unspecified, redis will not persist data to disk. restart_containers : bool, optional If true, containers will restart on failure. If false, containers will not restart automatically. @@ -107,133 +86,13 @@ class Clipper: def __init__(self, host, - user=None, - key_path=None, sudo=False, - ssh_port=22, - check_for_docker=True, - redis_ip=DEFAULT_REDIS_IP, - redis_port=DEFAULT_REDIS_PORT, - redis_persistence_path=None, - restart_containers=False): - self.redis_ip = redis_ip - self.redis_port = redis_port - self.docker_compost_dict = { - 'networks': { - 'default': { - 'external': { - 'name': DOCKER_NW - } - } - }, - 'services': { - 'mgmt_frontend': { - 'command': [ - '--redis_ip=%s' % self.redis_ip, - '--redis_port=%d' % self.redis_port - ], - 'image': - 'clipper/management_frontend:latest', - 'ports': [ - '%d:%d' % (CLIPPER_MANAGEMENT_PORT, - CLIPPER_MANAGEMENT_PORT) - ], - 'labels': { - CLIPPER_DOCKER_LABEL: "" - } - }, - 'query_frontend': { - 'command': [ - '--redis_ip=%s' % self.redis_ip, - '--redis_port=%d' % self.redis_port - ], - 'depends_on': ['mgmt_frontend'], - 'image': - 'clipper/query_frontend:latest', - 'ports': [ - '%d:%d' % (CLIPPER_RPC_PORT, CLIPPER_RPC_PORT), - '%d:%d' % (CLIPPER_QUERY_PORT, CLIPPER_QUERY_PORT) - ], - 'labels': { - CLIPPER_DOCKER_LABEL: "" - } - } - }, - 'version': '2' - } - start_redis = (self.redis_ip == DEFAULT_REDIS_IP) - if start_redis: - self.docker_compost_dict['services']['redis'] = { - 'image': 'redis:alpine', - 'ports': ['%d:%d' % (self.redis_port, self.redis_port)], - 'command': "redis-server --port %d" % self.redis_port, - 'labels': { - CLIPPER_DOCKER_LABEL: "" - } - } - self.docker_compost_dict['services']['mgmt_frontend'][ - 'depends_on'] = ['redis'] - self.docker_compost_dict['services']['query_frontend'][ - 'depends_on'].append('redis') - if redis_persistence_path: - if not os.path.exists(redis_persistence_path): - self.docker_compost_dict['services']['redis'][ - 'volumes'] = ['%s:/data' % redis_persistence_path] - else: - print( - "The directory specified by the redis persistence path already exists" - ) - raise ClipperManagerException( - "The directory specified by the redis persistence path already exists" - ) - self.restart_containers = restart_containers - if self.restart_containers: - self.docker_compost_dict['services']['mgmt_frontend'][ - 'restart'] = 'always' - self.docker_compost_dict['services']['query_frontend'][ - 'restart'] = 'always' - if start_redis: - self.docker_compost_dict['services']['redis'][ - 'restart'] = 'always' - + restart_containers=True): + # TODO: support deploying redis host off-cluster by taking redis_ip as constructor param to ClipperK8s + logging.basicConfig(level=logging.INFO) + self.clipper_k8s = ClipperK8s() self.sudo = sudo self.host = host - if self._host_is_local(): - self.host = "localhost" - env.host_string = self.host - else: - if not user or not key_path: - print( - "user and key_path must be specified when instantiating Clipper with a nonlocal host" - ) - raise ClipperManagerException( - "user and key_path must be specified when instantiating Clipper with a nonlocal host" - ) - env.user = user - env.key_filename = key_path - env.host_string = "%s:%d" % (host, ssh_port) - if check_for_docker: - # Make sure docker is running on cluster - self._start_docker_if_necessary() - - def _host_is_local(self): - return self.host in LOCAL_HOST_NAMES - - def _start_docker_if_necessary(self): - with hide("warnings", "output", "running"): - print("Checking if Docker is running...") - self._execute_root("docker ps") - dc_installed = self._execute_root( - "docker-compose --version", warn_only=True) - if dc_installed.return_code != 0: - print("docker-compose not installed on host.") - raise ClipperManagerException( - "docker-compose not installed on host.") - nw_create_command = ("docker network create --driver bridge {nw}" - .format(nw=DOCKER_NW)) - self._execute_root(nw_create_command, warn_only=True) - self._execute_standard( - "mkdir -p {model_repo}".format(model_repo=MODEL_REPO)) def _execute_root(self, *args, **kwargs): if not self.sudo: @@ -310,17 +169,8 @@ def start(self): """Start a Clipper instance. """ - with hide("output", "warnings", "running"): - self._execute_standard("rm -f docker-compose.yml") - self._execute_append("docker-compose.yml", - yaml.dump( - self.docker_compost_dict, - default_flow_style=False)) - print( - "Note: Docker must download the Clipper Docker images if they are not already cached. This may take awhile." - ) - self._execute_root("docker-compose up -d query_frontend") - print("Clipper is running") + self.clipper_k8s.start() + logging.info("Clipper is running") def register_application(self, name, model, input_type, default_output, slo_micros): @@ -341,8 +191,8 @@ def register_application(self, name, model, input_type, default_output, by the end of the latency objective. slo_micros : int The query latency objective for the application in microseconds. - This is the processing latency between Clipper receiving a request - and sending a response. It does not account for network latencies + This is the processing latency between Clipper receiving a request + and sending a response. It does not account for network latencies before a request is received or after a response is sent. If Clipper cannot process a query within the latency objective, @@ -368,7 +218,7 @@ def register_application(self, name, model, input_type, default_output, }) headers = {'Content-type': 'application/json'} r = requests.post(url, headers=headers, data=req_json) - print(r.text) + logging.info(r.text) return r.status_code == requests.codes.ok def get_all_apps(self, verbose=False): @@ -396,7 +246,7 @@ def get_all_apps(self, verbose=False): if r.status_code == requests.codes.ok: return r.json() else: - print(r.text) + logging.warn(r.text) return None def get_app_info(self, name): @@ -426,7 +276,7 @@ def get_app_info(self, name): return None return app_info else: - print(r.text) + logging.warn(r.text) return None def deploy_model(self, @@ -464,52 +314,37 @@ def deploy_model(self, The number of replicas of the model to create. More replicas can be created later as well. Defaults to 1. """ - with hide("warnings", "output", "running"): - if isinstance(model_data, base.BaseEstimator): - fname = name.replace("/", "_") - pkl_path = '/tmp/%s/%s.pkl' % (fname, fname) - model_data_path = "/tmp/%s" % fname - try: - os.mkdir(model_data_path) - except OSError: - pass - joblib.dump(model_data, pkl_path) - elif isinstance(model_data, str): - # assume that model_data is a path to the serialized model - model_data_path = model_data - print("model_data_path is: %s" % model_data_path) - else: - warn("%s is invalid model format" % str(type(model_data))) - return False - - version = str(version) - vol = "{model_repo}/{name}/{version}".format( - model_repo=MODEL_REPO, name=name, version=version) - # publish model to Clipper and verify success before copying model - # parameters to Clipper and starting containers - if not self._publish_new_model( - name, version, labels, input_type, container_name, - os.path.join(vol, os.path.basename(model_data_path))): - return False - print("Published model to Clipper") - - if (not self._put_container_on_host(container_name)): - return False - - # Put model parameter data on host - with hide("warnings", "output", "running"): - self._execute_standard("mkdir -p {vol}".format(vol=vol)) - - with cd(vol): - with hide("warnings", "output", "running"): - self._execute_put(model_data_path, vol) - - print("Copied model data to host") - # aggregate results of starting all containers - return all([ - self.add_container(name, version) - for r in range(num_containers) - ]) + vol = "{model_repo}/{name}/{version}".format( + model_repo=MODEL_REPO, name=name, version=version) + + # prepare docker image build dir + with open(model_data_path + '/Dockerfile', 'w') as f: + f.write("FROM {container_name}\nCOPY . /model/.\n".format(container_name=container_name)) + + # build, tag, and push docker image to registry + # NOTE: DOCKER_API_VERSION (set by `minikube docker-env`) must be same version as docker registry server + repo = '{docker_registry}/{name}:{version}'.format( + docker_registry='localhost:5000', # TODO: make configurable + name=name, + version=version) + docker_client = docker.from_env(version=os.environ["DOCKER_API_VERSION"]) + logging.info("Building model Docker image at {}".format(model_data_path)) + docker_client.images.build( + path=model_data_path, + tag=repo) + logging.info("Pushing model Docker image to {}".format(repo)) + docker_client.images.push(repository=repo) + + logging.info("Creating model deployment on k8s") + self.clipper_k8s.deploy_model(name, version, repo) + + # TODO: replace `model_data_path` in `_publish_new_model` with the docker repo + # publish model to Clipper and verify success before copying model + # parameters to Clipper and starting containers + logging.info("Publishing model to Clipper query manager") + self._publish_new_model(name, version, labels, input_type, container_name, repo) + + logging.info("Done deploying!") def register_external_model(self, name, @@ -529,6 +364,7 @@ def register_external_model(self, labels : list of str, optional A list of strings annotating the model. """ + # TODO: this could be implemented by taking a docker repo and deploying a container with it version = str(version) return self._publish_new_model(name, version, labels, input_type, EXTERNALLY_MANAGED_MODEL, @@ -560,7 +396,7 @@ def _save_python_function(self, name, predict_function): conda_env_exported = self._export_conda_env(environment_file_abs_path) if conda_env_exported: - print("Anaconda environment found. Verifying packages.") + logging.info("Anaconda environment found. Verifying packages.") # Confirm that packages installed through conda are solvable # Write out conda and pip dependency files to be supplied to container @@ -569,9 +405,9 @@ def _save_python_function(self, name, predict_function): conda_dep_fname, pip_dep_fname)): return False - print("Supplied environment details") + logging.info("Supplied environment details") else: - print( + logging.warn( "Warning: Anaconda environment was either not found or exporting the environment " "failed. Your function will still be serialized and deployed, but may fail due to " "missing dependencies. In this case, please re-run inside an Anaconda environment. " @@ -582,7 +418,7 @@ def _save_python_function(self, name, predict_function): func_file_path = os.path.join(serialization_dir, predict_fname) with open(func_file_path, "w") as serialized_function_file: serialized_function_file.write(serialized_prediction_function) - print("Serialized and supplied predict function") + logging.info("Serialized and supplied predict function") return serialization_dir def deploy_pyspark_model(self, @@ -652,7 +488,7 @@ def deploy_pyspark_model(self, else: pyspark_model.save(sc, spark_model_save_loc) except Exception as e: - print("Error saving spark model: %s" % e) + logging.error("Error saving spark model: %s" % e) raise e pyspark_container = "clipper/pyspark-container" @@ -663,7 +499,7 @@ def deploy_pyspark_model(self, "w") as metadata_file: json.dump({"model_class": model_class}, metadata_file) - print("Spark model saved") + logging.info("Spark model saved") # Deploy model deploy_result = self.deploy_model(name, version, serialization_dir, @@ -751,9 +587,9 @@ def _register_app_and_check_success(self, name, input_type, default_output, slo_micros): if self.register_application(name, name, input_type, default_output, slo_micros): - print("Application registration sucessful! Deploying model.") + logging.info("Application registration sucessful! Deploying model.") return True - print("Application registration unsuccessful. Will not deploy model.") + logging.warn("Application registration unsuccessful. Will not deploy model.") return False def register_app_and_deploy_predict_function( @@ -785,8 +621,8 @@ def register_app_and_deploy_predict_function( The version to assign the deployed model. slo_micros : int The query latency objective for the application in microseconds. - This is the processing latency between Clipper receiving a request - and sending a response. It does not account for network latencies + This is the processing latency between Clipper receiving a request + and sending a response. It does not account for network latencies before a request is received or after a response is sent. labels : list of str, optional A list of strings annotating the model. @@ -845,8 +681,8 @@ def register_app_and_deploy_pyspark_model( The version to assign the deployed model. slo_micros : int, optional The query latency objective for the application in microseconds. - This is the processing latency between Clipper receiving a request - and sending a response. It does not account for network latencies + This is the processing latency between Clipper receiving a request + and sending a response. It does not account for network latencies before a request is received or after a response is sent. labels : list of str, optional A list of strings annotating the model. @@ -885,7 +721,7 @@ def get_all_models(self, verbose=False): if r.status_code == requests.codes.ok: return r.json() else: - print(r.text) + logging.info(r.text) return None def get_model_info(self, model_name, model_version): @@ -920,7 +756,7 @@ def get_model_info(self, model_name, model_version): return None return app_info else: - print(r.text) + logging.info(r.text) return None def get_all_containers(self, verbose=False): @@ -946,7 +782,7 @@ def get_all_containers(self, verbose=False): if r.status_code == requests.codes.ok: return r.json() else: - print(r.text) + logging.info(r.text) return None def get_container_info(self, model_name, model_version, replica_id): @@ -983,7 +819,7 @@ def get_container_info(self, model_name, model_version, replica_id): return None return app_info else: - print(r.text) + logging.info(r.text) return None def _inspect_selection_policy(self, app_name, uid): @@ -1043,7 +879,7 @@ def _check_and_write_dependencies(self, environment_path, directory, If packages listed in specified conda environment file have conflicting dependencies, this function will warn the user and return False. - If there are no conflicting package dependencies, existence of the packages in the + If there are no conflicting package dependencies, existence of the packages in the container conda channel is tested. The user is warned about any missing packages. All existing conda packages are written out to `conda_dep_fname` and pip packages to `pip_dep_fname` in the given `directory`. This function then returns True. @@ -1066,7 +902,7 @@ def _check_and_write_dependencies(self, environment_path, directory, on the container os. Otherwise returns False. """ if "CONDA_PREFIX" not in os.environ: - print("No Anaconda environment found") + logging.info("No Anaconda environment found") return False root_prefix = os.environ["CONDA_PREFIX"].split("envs")[0] @@ -1085,87 +921,10 @@ def _check_and_write_dependencies(self, environment_path, directory, stderr=subprocess.PIPE, shell=True) out, err = process.communicate() - print(out) - print(err) + logging.info(out) + logging.info(err) return process.returncode == 0 - def add_container(self, model_name, model_version): - """Create a new container for an existing model. - - Starts a new container for a model that has already been added to - Clipper. Note that models are uniquely identified by both name - and version, so this method will fail if you have not already called - `Clipper.deploy_model()` for the specified name and version. - - Parameters - ---------- - model_name : str - The name of the model - model_version : Any object with a string representation (with __str__ implementation) - The version of the model - - Returns - ---------- - bool - True if the container was added successfully and False - if the container could not be added. - """ - model_version = str(model_version) - with hide("warnings", "output", "running"): - # Look up model info in Redis - if self.redis_ip == DEFAULT_REDIS_IP: - redis_host = self.host - else: - redis_host = self.redis_ip - model_key = "{mn}:{mv}".format(mn=model_name, mv=model_version) - result = local( - "redis-cli -h {host} -p {redis_port} -n {db} hgetall {key}". - format( - host=redis_host, - redis_port=self.redis_port, - key=model_key, - db=REDIS_MODEL_DB_NUM), - capture=True) - print(result) - - if "empty list or set" in result.stdout: - # Model not found - warn("Trying to add container but model {mn}:{mv} not in " - "Redis".format(mn=model_name, mv=model_version)) - return False - - splits = result.stdout.split("\n") - model_metadata = dict([(splits[i].strip(), splits[i + 1].strip()) - for i in range(0, len(splits), 2)]) - image_name = model_metadata["container_name"] - model_data_path = model_metadata["model_data_path"] - model_input_type = model_metadata["input_type"] - restart_policy = 'always' if self.restart_containers else 'no' - - if image_name != EXTERNALLY_MANAGED_MODEL: - # Start container - add_container_cmd = ( - "docker run -d --network={nw} --restart={restart_policy} -v {path}:/model:ro " - "-e \"CLIPPER_MODEL_NAME={mn}\" -e \"CLIPPER_MODEL_VERSION={mv}\" " - "-e \"CLIPPER_IP=query_frontend\" -e \"CLIPPER_INPUT_TYPE={mip}\" -l \"{clipper_label}\" -l \"{mv_label}\" " - "{image}".format( - path=model_data_path, - nw=DOCKER_NW, - image=image_name, - mn=model_name, - mv=model_version, - mip=model_input_type, - clipper_label=CLIPPER_DOCKER_LABEL, - mv_label="%s=%s:%s" % (CLIPPER_MODEL_CONTAINER_LABEL, - model_name, model_version), - restart_policy=restart_policy)) - result = self._execute_root(add_container_cmd) - return result.return_code == 0 - else: - print("Cannot start containers for externally managed model %s" - % model_name) - return False - def get_clipper_logs(self): """Copies the logs from all Docker containers running on the host machine that have been tagged with the Clipper label (ai.clipper.container.label) into @@ -1176,33 +935,14 @@ def get_clipper_logs(self): list(str) Returns a list of local filenames containing the Docker container log snapshots. """ - container_ids = self._get_clipper_container_ids() cur_time_logs_path = os.path.join(CLIPPER_LOGS_PATH, time.strftime("%Y%m%d-%H%M%S")) if not os.path.exists(cur_time_logs_path): os.makedirs(cur_time_logs_path) log_file_names = [] - for container in container_ids: - output = self._execute_root( - "docker logs {container}".format(container=container)) - cur_log_fname = os.path.join(cur_time_logs_path, - "%s-container.log" % container) - with open(cur_log_fname, "w") as f: - f.write(output) - log_file_names.append(cur_log_fname) + # TODO: implement, or update docs to point to kubectl logs return log_file_names - def _get_clipper_container_ids(self): - """ - Gets the container IDs of all containers labeled with the clipper label - """ - containers = self._execute_root( - "docker ps -aq --filter label={clipper_label}".format( - clipper_label=CLIPPER_DOCKER_LABEL)) - ids = [l.strip() for l in containers.split("\n")] - print("Clipper container IDS found: %s" % str(ids)) - return ids - def inspect_instance(self): """Fetches metrics from the running Clipper instance. @@ -1251,9 +991,8 @@ def set_model_version(self, model_name, model_version, num_containers=0): }) headers = {'Content-type': 'application/json'} r = requests.post(url, headers=headers, data=req_json) - print(r.text) - for r in range(num_containers): - self.add_container(model_name, model_version) + logging.info(r.text) + # TODO: use k8s API to udpate model_version def remove_inactive_containers(self, model_name): """Removes all containers serving stale versions of the specified model. @@ -1266,53 +1005,18 @@ def remove_inactive_containers(self, model_name): """ # Get all Docker containers tagged as model containers num_containers_removed = 0 - with hide("output", "warnings", "running"): - containers = self._execute_root( - "docker ps -aq --filter label={model_container_label}".format( - model_container_label=CLIPPER_MODEL_CONTAINER_LABEL)) - if len(containers) > 0: - container_ids = [l.strip() for l in containers.split("\n")] - for container in container_ids: - # returns a string formatted as ":" - if self._host_is_local(): - container_model_name_and_version = self._execute_root( - "docker inspect --format \"{{ index .Config.Labels \\\"%s\\\"}}\" %s" - % (CLIPPER_MODEL_CONTAINER_LABEL, container)) - else: - container_model_name_and_version = self._execute_root( - "docker inspect --format \"{{ index .Config.Labels \\\\\"%s\\\\\"}}\" %s" - % (CLIPPER_MODEL_CONTAINER_LABEL, container)) - splits = container_model_name_and_version.split(":") - container_model_name = splits[0] - container_model_version = int(splits[1]) - if container_model_name == model_name: - # check if container_model_version is the currently deployed version - model_info = self.get_model_info( - container_model_name, container_model_version) - if model_info == None or not model_info["is_current_version"]: - self._execute_root("docker stop {container}". - format(container=container)) - self._execute_root("docker rm {container}".format( - container=container)) - num_containers_removed += 1 - print("Removed %d inactive containers for model %s" % + model_name = "" + # TODO: implement using self.get_model_info to check model_version + logging.info("Removed %d inactive containers for model %s" % (num_containers_removed, model_name)) return num_containers_removed def stop_all(self): - """Stops and removes all Clipper Docker containers on the host. - - """ - print("Stopping Clipper and all running models...") - with hide("output", "warnings", "running"): - container_ids = self._get_clipper_container_ids() - container_id_str = " ".join(container_ids) - self._execute_root( - "docker stop {ids}".format(ids=container_id_str), - warn_only=True) - self._execute_root( - "docker rm {ids}".format(ids=container_id_str), warn_only=True) + """Stops and removes all Clipper model deployments and Clipper resources.""" + self.clipper_k8s.stop_all_model_deployments() + self.clipper_k8s.stop_clipper_resources() + # TODO: provide registry image for k8s service instead of container_name and model_data_path def _publish_new_model(self, name, version, labels, input_type, container_name, model_data_path): url = "http://%s:%d/admin/add_model" % (self.host, @@ -1330,69 +1034,7 @@ def _publish_new_model(self, name, version, labels, input_type, if r.status_code == requests.codes.ok: return True else: - print("Error publishing model: %s" % r.text) - return False - - def _put_container_on_host(self, container_name): - """Puts the provided container on the host. - - Parameters - __________ - container_name : str - The name of the container. - - Notes - ----- - This method will first check the host, then Docker Hub, then the local - machine to find the container. - - This method is safe to call multiple times with the same container name. - Subsequent calls will detect that the container is already present on - the host and do nothing. - - """ - with hide("output", "warnings", "running"): - # first see if container is already present on host - host_result = self._execute_root( - "docker images -q {cn}".format(cn=container_name)) - if len(host_result.stdout) > 0: - print("Found %s on host" % container_name) - return True - # now try to pull from Docker Hub - hub_result = self._execute_root( - "docker pull {cn}".format(cn=container_name), warn_only=True) - if hub_result.return_code == 0: - print("Found %s in Docker hub" % container_name) - return True - - # assume container_name refers to a local container and - # copy it to host - local_result = local( - "docker images -q {cn}".format(cn=container_name)) - - if len(local_result.stdout) > 0: - saved_fname = container_name.replace("/", "_") - subprocess.call("docker save -o /tmp/{fn}.tar {cn}".format( - fn=saved_fname, cn=container_name)) - tar_loc = "/tmp/{fn}.tar".format(fn=saved_fname) - self._execute_put(tar_loc, tar_loc) - self._execute_root("docker load -i {loc}".format(loc=tar_loc)) - # self._execute_root("docker tag {image_id} {cn}".format( - # image_id=image_id, cn=cn)) - # now check to make sure we can access it - host_result = self._execute_root( - "docker images -q {cn}".format(cn=container_name)) - if len(host_result.stdout) > 0: - print("Successfuly copied %s to host" % container_name) - return True - else: - warn("Problem copying container %s to host" % - container_name) - return False - - # out of options - warn("Could not find %s, please try with a valid " - "container docker image") + logging.warn("Error publishing model: %s" % r.text) return False def deploy_R_model(self, @@ -1408,13 +1050,13 @@ def deploy_R_model(self, The name to assign this model. version : int The version to assign this model. - model_data : + model_data : The trained model to add to Clipper.The type has to be rpy2.robjects.vectors.ListVector, this is how python's rpy2 encapsulates any given R model.This model will be loaded into the Clipper model container and provided as an argument to the - predict function each time it is called. + predict function each time it is called. labels : list of str, optional - A set of strings annotating the model + A set of strings annotating the model num_containers : int, optional The number of replicas of the model to create. More replicas can be created later as well. Defaults to 1. @@ -1446,7 +1088,7 @@ def deploy_R_model(self, name, version, labels, input_type, container_name, os.path.join(vol, os.path.basename(model_data_path))): return False - print("Published model to Clipper") + logging.info("Published model to Clipper") # Put model parameter data on host with hide("warnings", "output", "running"): @@ -1455,7 +1097,7 @@ def deploy_R_model(self, with hide("output", "running"): self._execute_put(model_data_path, vol) - print("Copied model data to host") + logging.info("Copied model data to host") # aggregate results of starting all containers return all([ self.add_container(name, version) diff --git a/clipper_admin/tests/__init__.py b/clipper_admin/tests/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/k8s/clipper/mgmt-frontend-deployment.yaml b/k8s/clipper/mgmt-frontend-deployment.yaml new file mode 100644 index 000000000..3b18a87fe --- /dev/null +++ b/k8s/clipper/mgmt-frontend-deployment.yaml @@ -0,0 +1,24 @@ +apiVersion: extensions/v1beta1 +kind: Deployment +metadata: + labels: + ai.clipper.container.label: "" + ai.clipper.name: mgmt-frontend + name: mgmt-frontend +spec: + replicas: 1 + template: + metadata: + labels: + ai.clipper.container.label: "" + ai.clipper.name: mgmt-frontend + spec: + containers: + - args: + - "--redis_ip=$(REDIS_SERVICE_HOST)" + - "--redis_port=$(REDIS_SERVICE_PORT)" + image: clipper/management_frontend:latest + name: mgmt-frontend + ports: + - containerPort: 1338 + restartPolicy: Always diff --git a/k8s/clipper/mgmt-frontend-service.yaml b/k8s/clipper/mgmt-frontend-service.yaml new file mode 100644 index 000000000..4cf5fa239 --- /dev/null +++ b/k8s/clipper/mgmt-frontend-service.yaml @@ -0,0 +1,15 @@ +apiVersion: v1 +kind: Service +metadata: + labels: + ai.clipper.container.label: "" + ai.clipper.name: mgmt-frontend + name: mgmt-frontend +spec: + type: NodePort + ports: + - name: "1338" + port: 1338 + targetPort: 1338 + selector: + ai.clipper.name: mgmt-frontend diff --git a/k8s/clipper/query-frontend-deployment.yaml b/k8s/clipper/query-frontend-deployment.yaml new file mode 100644 index 000000000..f6a0bf4c6 --- /dev/null +++ b/k8s/clipper/query-frontend-deployment.yaml @@ -0,0 +1,25 @@ +apiVersion: extensions/v1beta1 +kind: Deployment +metadata: + labels: + ai.clipper.container.label: "" + ai.clipper.name: query-frontend + name: query-frontend +spec: + replicas: 1 + template: + metadata: + labels: + ai.clipper.container.label: "" + ai.clipper.name: query-frontend + spec: + containers: + - args: + - "--redis_ip=$(REDIS_SERVICE_HOST)" + - "--redis_port=$(REDIS_SERVICE_PORT)" + image: clipper/query_frontend:latest + name: query-frontend + ports: + - containerPort: 7000 + - containerPort: 1337 + restartPolicy: Always diff --git a/k8s/clipper/query-frontend-service.yaml b/k8s/clipper/query-frontend-service.yaml new file mode 100644 index 000000000..936253e0b --- /dev/null +++ b/k8s/clipper/query-frontend-service.yaml @@ -0,0 +1,18 @@ +apiVersion: v1 +kind: Service +metadata: + labels: + ai.clipper.container.label: "" + ai.clipper.name: query-frontend + name: query-frontend +spec: + type: NodePort + ports: + - name: "7000" + port: 7000 + targetPort: 7000 + - name: "1337" + port: 1337 + targetPort: 1337 + selector: + ai.clipper.name: query-frontend diff --git a/k8s/clipper/redis-deployment.yaml b/k8s/clipper/redis-deployment.yaml new file mode 100644 index 000000000..cc7bce16c --- /dev/null +++ b/k8s/clipper/redis-deployment.yaml @@ -0,0 +1,25 @@ +apiVersion: extensions/v1beta1 +kind: Deployment +metadata: + labels: + ai.clipper.container.label: "" + ai.clipper.name: redis + name: redis +spec: + replicas: 1 + template: + metadata: + labels: + ai.clipper.container.label: "" + ai.clipper.name: redis + spec: + containers: + - args: + - redis-server + - --port + - "6379" + image: redis:alpine + name: redis + ports: + - containerPort: 6379 + restartPolicy: Always diff --git a/k8s/clipper/redis-service.yaml b/k8s/clipper/redis-service.yaml new file mode 100644 index 000000000..30941c9cc --- /dev/null +++ b/k8s/clipper/redis-service.yaml @@ -0,0 +1,15 @@ +apiVersion: v1 +kind: Service +metadata: + labels: + ai.clipper.container.label: "" + ai.clipper.name: redis + name: redis +spec: + type: NodePort + ports: + - name: "6379" + port: 6379 + targetPort: 6379 + selector: + ai.clipper.name: redis diff --git a/k8s/minikube-registry/kube-registry-daemon-set.yaml b/k8s/minikube-registry/kube-registry-daemon-set.yaml new file mode 100644 index 000000000..1023ad7a7 --- /dev/null +++ b/k8s/minikube-registry/kube-registry-daemon-set.yaml @@ -0,0 +1,32 @@ +apiVersion: extensions/v1beta1 +kind: DaemonSet +metadata: + name: kube-registry-proxy + namespace: kube-system + labels: + k8s-app: kube-registry + kubernetes.io/cluster-service: "true" + version: v0.4 +spec: + template: + metadata: + labels: + k8s-app: kube-registry + version: v0.4 + spec: + containers: + - name: kube-registry-proxy + image: gcr.io/google_containers/kube-registry-proxy:0.4 + resources: + limits: + cpu: 100m + memory: 50Mi + env: + - name: REGISTRY_HOST + value: kube-registry.kube-system.svc.cluster.local + - name: REGISTRY_PORT + value: "5000" + ports: + - name: registry + containerPort: 80 + hostPort: 5000 diff --git a/k8s/minikube-registry/kube-registry-replication-controller.yaml b/k8s/minikube-registry/kube-registry-replication-controller.yaml new file mode 100644 index 000000000..75d273d2e --- /dev/null +++ b/k8s/minikube-registry/kube-registry-replication-controller.yaml @@ -0,0 +1,46 @@ +apiVersion: v1 +kind: ReplicationController +metadata: + name: kube-registry-v0 + namespace: kube-system + labels: + k8s-app: kube-registry + version: v0 +spec: + replicas: 1 + selector: + k8s-app: kube-registry + version: v0 + template: + metadata: + labels: + k8s-app: kube-registry + version: v0 + spec: + containers: + - name: registry + image: registry:2.5.1 + resources: + # keep request = limit to keep this container in guaranteed class + limits: + cpu: 100m + memory: 100Mi + requests: + cpu: 100m + memory: 100Mi + env: + - name: REGISTRY_HTTP_ADDR + value: :5000 + - name: REGISTRY_STORAGE_FILESYSTEM_ROOTDIRECTORY + value: /var/lib/registry + volumeMounts: + - name: image-store + mountPath: /var/lib/registry + ports: + - containerPort: 5000 + name: registry + protocol: TCP + volumes: + - name: image-store + hostPath: + path: /data/registry/ diff --git a/k8s/minikube-registry/kube-registry-service.yaml b/k8s/minikube-registry/kube-registry-service.yaml new file mode 100644 index 000000000..60a2a5836 --- /dev/null +++ b/k8s/minikube-registry/kube-registry-service.yaml @@ -0,0 +1,14 @@ +apiVersion: v1 +kind: Service +metadata: + name: kube-registry + namespace: kube-system + labels: + k8s-app: kube-registry +spec: + selector: + k8s-app: kube-registry + ports: + - name: registry + port: 5000 + protocol: TCP diff --git a/setup.py b/setup.py index d48612309..8e83449a1 100644 --- a/setup.py +++ b/setup.py @@ -12,5 +12,6 @@ keywords=['clipper', 'prediction', 'model', 'management'], install_requires=[ 'requests', 'pyparsing', 'appdirs', 'pprint', 'subprocess32', - 'sklearn', 'numpy', 'scipy', 'fabric', 'pyyaml' + 'sklearn', 'numpy', 'scipy', 'fabric', 'pyyaml', 'abc', 'docker', + 'kubernetes' ])