From e5dd44dec3789473d2f682910f6f9dd0f71a31be Mon Sep 17 00:00:00 2001 From: Feynman Liang Date: Wed, 7 Jun 2017 18:56:36 -0700 Subject: [PATCH 01/24] Misc linter fixes --- clipper_admin/clipper_manager.py | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/clipper_admin/clipper_manager.py b/clipper_admin/clipper_manager.py index 3df066360..7e5f5c956 100644 --- a/clipper_admin/clipper_manager.py +++ b/clipper_admin/clipper_manager.py @@ -7,13 +7,11 @@ import requests import json import yaml -import pprint import subprocess32 as subprocess import shutil from sklearn import base from sklearn.externals import joblib from cStringIO import StringIO -import sys from .cloudpickle import CloudPickler import time import re @@ -95,7 +93,7 @@ class Clipper: on `host` at the port specified by `redis_port`. redis_persistence_path : string, optional The directory path to which redis data should be persisted. The directory - should not already exist. If unspecified, redis will not persist data to disk. + should not already exist. If unspecified, redis will not persist data to disk. restart_containers : bool, optional If true, containers will restart on failure. If false, containers will not restart automatically. @@ -336,8 +334,8 @@ def register_application(self, name, model, input_type, default_output, by the end of the latency objective. slo_micros : int The query latency objective for the application in microseconds. - This is the processing latency between Clipper receiving a request - and sending a response. It does not account for network latencies + This is the processing latency between Clipper receiving a request + and sending a response. It does not account for network latencies before a request is received or after a response is sent. If Clipper cannot process a query within the latency objective, @@ -912,7 +910,7 @@ def _check_and_write_dependencies(self, environment_path, directory, If packages listed in specified conda environment file have conflicting dependencies, this function will warn the user and return False. - If there are no conflicting package dependencies, existence of the packages in the + If there are no conflicting package dependencies, existence of the packages in the container conda channel is tested. The user is warned about any missing packages. All existing conda packages are written out to `conda_dep_fname` and pip packages to `pip_dep_fname` in the given `directory`. This function then returns True. From d05a7733d9de7a63251ebb72f04853d01dc0367e Mon Sep 17 00:00:00 2001 From: Feynman Liang Date: Wed, 7 Jun 2017 19:34:49 -0700 Subject: [PATCH 02/24] Adds docker registry and builds model image --- clipper_admin/clipper_manager.py | 68 ++++++++++++++++++++------------ docker-compose.yml | 46 +++++++++++++++++++++ 2 files changed, 89 insertions(+), 25 deletions(-) create mode 100644 docker-compose.yml diff --git a/clipper_admin/clipper_manager.py b/clipper_admin/clipper_manager.py index 7e5f5c956..ae33277de 100644 --- a/clipper_admin/clipper_manager.py +++ b/clipper_admin/clipper_manager.py @@ -122,6 +122,13 @@ def __init__(self, } }, 'services': { + 'docker_registry': { + 'image': + 'registry:2.6.1', + 'ports': [ + '5000:5000' + ] + }, 'mgmt_frontend': { 'command': [ '--redis_ip=%s' % self.redis_ip, @@ -167,9 +174,9 @@ def __init__(self, } } self.docker_compost_dict['services']['mgmt_frontend'][ - 'depends_on'] = ['redis'] + 'depends_on'] = ['docker_registry', 'redis'] self.docker_compost_dict['services']['query_frontend'][ - 'depends_on'].append('redis') + 'depends_on'].extend(['docker_registry', 'redis']) if redis_persistence_path: if not os.path.exists(redis_persistence_path): self.docker_compost_dict['services']['redis'][ @@ -453,8 +460,8 @@ def deploy_model(self, with hide("warnings", "output", "running"): if isinstance(model_data, base.BaseEstimator): fname = name.replace("/", "_") - pkl_path = '/tmp/%s/%s.pkl' % (fname, fname) model_data_path = "/tmp/%s" % fname + pkl_path = '%s/%s.pkl' % (model_data_path, fname) try: os.mkdir(model_data_path) except OSError: @@ -470,31 +477,42 @@ def deploy_model(self, vol = "{model_repo}/{name}/{version}".format( model_repo=MODEL_REPO, name=name, version=version) - # publish model to Clipper and verify success before copying model - # parameters to Clipper and starting containers - if not self._publish_new_model( - name, version, labels, input_type, container_name, - os.path.join(vol, os.path.basename(model_data_path))): - return False - print("Published model to Clipper") + print(container_name) + + with open(model_data_path + '/Dockerfile', 'w') as f: + f.write("FROM {container_name}\nCOPY . /model/.\n".format(container_name=container_name)) + + self._execute_root("docker build -t {name} {model_data_path}/.".format( + name=name, model_data_path=model_data_path)) - if (not self._put_container_on_host(container_name)): - return False - # Put model parameter data on host - with hide("warnings", "output", "running"): - self._execute_standard("mkdir -p {vol}".format(vol=vol)) - with cd(vol): - with hide("warnings", "output", "running"): - self._execute_put(model_data_path, vol) - print("Copied model data to host") - # aggregate results of starting all containers - return all([ - self.add_container(name, version) - for r in range(num_containers) - ]) + # publish model to Clipper and verify success before copying model + # parameters to Clipper and starting containers + # if not self._publish_new_model( + # name, version, labels, input_type, container_name, + # os.path.join(vol, os.path.basename(model_data_path))): + # return False + # print("Published model to Clipper") + + # if (not self._put_container_on_host(container_name)): + # return False + + # # Put model parameter data on host + # with hide("warnings", "output", "running"): + # self._execute_standard("mkdir -p {vol}".format(vol=vol)) + + # with cd(vol): + # with hide("warnings", "output", "running"): + # self._execute_put(model_data_path, vol) + + # print("Copied model data to host") + # # aggregate results of starting all containers + # return all([ + # self.add_container(name, version) + # for r in range(num_containers) + # ]) def register_external_model(self, name, @@ -727,7 +745,7 @@ def centered_predict(inputs): default_python_container, input_type, labels, num_containers) # Remove temp files - shutil.rmtree(serialization_dir) + # shutil.rmtree(serialization_dir) return deploy_result diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 000000000..da52b2a14 --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,46 @@ +networks: + default: + external: + name: clipper_nw +services: + docker_registry: + image: registry:2.6.1 + ports: + - 5000:5000 + mgmt_frontend: + command: + - --redis_ip=redis + - --redis_port=6379 + depends_on: + - docker_registry + - redis + image: clipper/management_frontend:latest + labels: + ai.clipper.container.label: '' + ports: + - 1338:1338 + restart: always + query_frontend: + command: + - --redis_ip=redis + - --redis_port=6379 + depends_on: + - mgmt_frontend + - docker_registry + - redis + image: clipper/query_frontend:latest + labels: + ai.clipper.container.label: '' + ports: + - 7000:7000 + - 1337:1337 + restart: always + redis: + command: redis-server --port 6379 + image: redis:alpine + labels: + ai.clipper.container.label: '' + ports: + - 6379:6379 + restart: always +version: '2' From 05cf81fdba378bf42e3b2be754f7c5901b647ace Mon Sep 17 00:00:00 2001 From: Feynman Liang Date: Wed, 7 Jun 2017 21:25:42 -0700 Subject: [PATCH 03/24] Adds k8s deployment creation --- clipper_admin/clipper_manager.py | 71 +++++++++++++++++++++++--------- setup.py | 2 +- 2 files changed, 53 insertions(+), 20 deletions(-) diff --git a/clipper_admin/clipper_manager.py b/clipper_admin/clipper_manager.py index ae33277de..a6c4b7204 100644 --- a/clipper_admin/clipper_manager.py +++ b/clipper_admin/clipper_manager.py @@ -477,16 +477,6 @@ def deploy_model(self, vol = "{model_repo}/{name}/{version}".format( model_repo=MODEL_REPO, name=name, version=version) - print(container_name) - - with open(model_data_path + '/Dockerfile', 'w') as f: - f.write("FROM {container_name}\nCOPY . /model/.\n".format(container_name=container_name)) - - self._execute_root("docker build -t {name} {model_data_path}/.".format( - name=name, model_data_path=model_data_path)) - - - # publish model to Clipper and verify success before copying model # parameters to Clipper and starting containers @@ -496,18 +486,59 @@ def deploy_model(self, # return False # print("Published model to Clipper") - # if (not self._put_container_on_host(container_name)): - # return False + # build and push docker registry image + with open(model_data_path + '/Dockerfile', 'w') as f: + f.write("FROM {container_name}\nCOPY . /model/.\n".format(container_name=container_name)) - # # Put model parameter data on host - # with hide("warnings", "output", "running"): - # self._execute_standard("mkdir -p {vol}".format(vol=vol)) + self._execute_root("docker build -t {name} {model_data_path}/.".format( + name=name, model_data_path=model_data_path)) - # with cd(vol): - # with hide("warnings", "output", "running"): - # self._execute_put(model_data_path, vol) + # TODO: push to docker registry + from kubernetes import client, config + config.load_kube_config() + k8s_beta = client.ExtensionsV1beta1Api() + k8s_beta.create_namespaced_deployment( + body={ + 'apiVersion': 'extensions/v1beta1', + 'kind': 'Deployment', + 'metadata': { + 'name': 'nginx-deployment' + }, + 'spec': { + 'replicas': 1, + 'template': { + 'metadata': { + 'labels': { + 'app': 'nginx' + } + }, + 'spec': { + 'containers': [ + { + 'name': 'nginx', + 'image': 'nginx:1.7.9', + 'ports': [ + { + 'containerPort': 80 + } + ] + } + ] + } + } + } + }, namespace='default') + k8s_bbeta.create_namespaced_service( + { + + }, namespace='default') + # TODO: error handling (e.g. if k8s resourcesalready exists) + + # to run built docker image on local docker daemon + # docker run -it --network=clipper_nw --restart=always -e "CLIPPER_MODEL_NAME=feature_sum_model" -e "CLIPPER_MODEL_VERSION=1" -e "CLIPPER_IP=query_frontend" -e "CLIPPER_INPUT_TYPE=doubles" -l "ai.clipper.container.label" -l "ai.clipper.model_container.model_version=feature_sum_model:1" feature_sum_model + + # TODO: generate a k8s service and pod manifest, deploy to k8s - # print("Copied model data to host") # # aggregate results of starting all containers # return all([ # self.add_container(name, version) @@ -1043,6 +1074,7 @@ def add_container(self, model_name, model_version): mv_label="%s=%s:%d" % (CLIPPER_MODEL_CONTAINER_LABEL, model_name, model_version), restart_policy=restart_policy)) + print(add_container_cmd) result = self._execute_root(add_container_cmd) return result.return_code == 0 else: @@ -1190,6 +1222,7 @@ def stop_all(self): self._execute_root( "docker rm {ids}".format(ids=container_id_str), warn_only=True) + # TODO: provide registry image for k8s service instead of container_name and model_data_path def _publish_new_model(self, name, version, labels, input_type, container_name, model_data_path): url = "http://%s:%d/admin/add_model" % (self.host, diff --git a/setup.py b/setup.py index d48612309..35085ecaf 100644 --- a/setup.py +++ b/setup.py @@ -12,5 +12,5 @@ keywords=['clipper', 'prediction', 'model', 'management'], install_requires=[ 'requests', 'pyparsing', 'appdirs', 'pprint', 'subprocess32', - 'sklearn', 'numpy', 'scipy', 'fabric', 'pyyaml' + 'sklearn', 'numpy', 'scipy', 'fabric', 'pyyaml', 'kubernetes' ]) From 81a40ed70f7609c24259185db46719d45e1e2388 Mon Sep 17 00:00:00 2001 From: Feynman Liang Date: Wed, 7 Jun 2017 23:23:26 -0700 Subject: [PATCH 04/24] Deploy k8s service --- clipper_admin/clipper_manager.py | 160 +++++++++++++++++++++---------- docker-compose.yml | 6 -- setup.py | 3 +- 3 files changed, 113 insertions(+), 56 deletions(-) diff --git a/clipper_admin/clipper_manager.py b/clipper_admin/clipper_manager.py index a6c4b7204..41a91b781 100644 --- a/clipper_admin/clipper_manager.py +++ b/clipper_admin/clipper_manager.py @@ -122,13 +122,6 @@ def __init__(self, } }, 'services': { - 'docker_registry': { - 'image': - 'registry:2.6.1', - 'ports': [ - '5000:5000' - ] - }, 'mgmt_frontend': { 'command': [ '--redis_ip=%s' % self.redis_ip, @@ -174,9 +167,9 @@ def __init__(self, } } self.docker_compost_dict['services']['mgmt_frontend'][ - 'depends_on'] = ['docker_registry', 'redis'] + 'depends_on'] = ['redis'] self.docker_compost_dict['services']['query_frontend'][ - 'depends_on'].extend(['docker_registry', 'redis']) + 'depends_on'].append('redis') if redis_persistence_path: if not os.path.exists(redis_persistence_path): self.docker_compost_dict['services']['redis'][ @@ -486,53 +479,122 @@ def deploy_model(self, # return False # print("Published model to Clipper") - # build and push docker registry image + import docker + # prepare docker image build dir with open(model_data_path + '/Dockerfile', 'w') as f: f.write("FROM {container_name}\nCOPY . /model/.\n".format(container_name=container_name)) - self._execute_root("docker build -t {name} {model_data_path}/.".format( - name=name, model_data_path=model_data_path)) - - # TODO: push to docker registry - from kubernetes import client, config - config.load_kube_config() - k8s_beta = client.ExtensionsV1beta1Api() - k8s_beta.create_namespaced_deployment( - body={ - 'apiVersion': 'extensions/v1beta1', - 'kind': 'Deployment', - 'metadata': { - 'name': 'nginx-deployment' - }, - 'spec': { - 'replicas': 1, - 'template': { - 'metadata': { - 'labels': { - 'app': 'nginx' - } - }, - 'spec': { - 'containers': [ - { - 'name': 'nginx', - 'image': 'nginx:1.7.9', - 'ports': [ - { - 'containerPort': 80 - } - ] + # build, tag, and push docker image to registry + docker_client = docker.from_env(version=os.environ["DOCKER_API_VERSION"]) + # NOTE: this must be same version as docker registry server + + repo = '{docker_registry}/{name}:{version}'.format( + docker_registry='localhost:5000', # TODO: make configurable + name=name, + version=version) + docker_client.images.build( + path=model_data_path, + tag=repo) + docker_client.images.push(repository=repo) + + import yaml + import kubernetes + from kubernetes.client.rest import ApiException + # TODO: check before creating k8s resources + kubernetes.config.load_kube_config() + k8s_v1 = kubernetes.client.CoreV1Api() + k8s_beta = kubernetes.client.ExtensionsV1beta1Api() + # TODO: initializing the registry probably goes elsewhere + try: + k8s_v1.create_namespaced_replication_controller( + body=yaml.load(open('k8s/kube-registry-rc.yaml')), namespace='kube-system') + except ApiException: # already exists + pass + try: + k8s_v1.create_namespaced_service( + body=yaml.load(open('k8s/kube-registry-svc.yaml')), namespace='kube-system') + except ApiException: # already exists + pass + try: + k8s_beta.create_namespaced_daemon_set( + body=yaml.load(open('k8s/kube-registry-ds.yaml')), namespace='kube-system') + except ApiException: # already exists + pass + try: + k8s_beta.create_namespaced_deployment( + body={ + 'apiVersion': 'extensions/v1beta1', + 'kind': 'Deployment', + 'metadata': { + 'name': name + '-deployment' # NOTE: must satisfy RFC 1123 pathname conventions + }, + 'spec': { + 'replicas': 1, + 'template': { + 'metadata': { + 'labels': { + 'model': name, + 'version': str(version) } - ] + }, + 'spec': { + 'containers': [ + { + 'name': name, + 'image': repo, + 'ports': [ + { + 'containerPort': 80 + } + ], + 'env': [ + { + 'name': 'CLIPPER_MODEL_NAME', + 'value': 'feature-sum-model' + }, + { + 'name': 'CLIPPER_MODEL_VERSION', + 'value': '1' + }, + { + 'name': 'CLIPPER_IP', + 'value': '127.0.0.1' + } + ] + } + ] + } } } - } - }, namespace='default') - k8s_bbeta.create_namespaced_service( - { - + }, namespace='default') + except ApiException: # already exists + pass + try: + k8s_v1.create_namespaced_service( + body={ + 'apiVersion': 'v1', + 'kind': 'Service', + 'metadata': { + 'name': 'nginx' + }, + 'spec': { + 'ports': [ + { + 'nodePort': 32100, # TODO: remove when ClusterIP + 'port': 80, + 'protocol': 'TCP', + 'targetPort': 80 + } + ], + 'selector': { + 'app': 'nginx' + }, + 'type': 'NodePort' # TODO: use ClusterIP when query frontend is cluster-internal + } }, namespace='default') - # TODO: error handling (e.g. if k8s resourcesalready exists) + except ApiException: # already exists + pass + # TODO: better error handling # to run built docker image on local docker daemon # docker run -it --network=clipper_nw --restart=always -e "CLIPPER_MODEL_NAME=feature_sum_model" -e "CLIPPER_MODEL_VERSION=1" -e "CLIPPER_IP=query_frontend" -e "CLIPPER_INPUT_TYPE=doubles" -l "ai.clipper.container.label" -l "ai.clipper.model_container.model_version=feature_sum_model:1" feature_sum_model diff --git a/docker-compose.yml b/docker-compose.yml index da52b2a14..06f65897c 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -3,16 +3,11 @@ networks: external: name: clipper_nw services: - docker_registry: - image: registry:2.6.1 - ports: - - 5000:5000 mgmt_frontend: command: - --redis_ip=redis - --redis_port=6379 depends_on: - - docker_registry - redis image: clipper/management_frontend:latest labels: @@ -26,7 +21,6 @@ services: - --redis_port=6379 depends_on: - mgmt_frontend - - docker_registry - redis image: clipper/query_frontend:latest labels: diff --git a/setup.py b/setup.py index 35085ecaf..85cf48e92 100644 --- a/setup.py +++ b/setup.py @@ -12,5 +12,6 @@ keywords=['clipper', 'prediction', 'model', 'management'], install_requires=[ 'requests', 'pyparsing', 'appdirs', 'pprint', 'subprocess32', - 'sklearn', 'numpy', 'scipy', 'fabric', 'pyyaml', 'kubernetes' + 'sklearn', 'numpy', 'scipy', 'fabric', 'pyyaml', 'docker', + 'kubernetes' ]) From b74f14fe71de05bed661899882f11745eeeea31d Mon Sep 17 00:00:00 2001 From: Feynman Liang Date: Wed, 7 Jun 2017 23:42:04 -0700 Subject: [PATCH 05/24] RPC heartbeats received on query --- README.md | 23 ++++++++++ clipper_admin/clipper_manager.py | 30 +++++++++---- k8s/minikube-registry/kube-registry-ds.yaml | 32 ++++++++++++++ k8s/minikube-registry/kube-registry-rc.yaml | 46 ++++++++++++++++++++ k8s/minikube-registry/kube-registry-svc.yaml | 14 ++++++ 5 files changed, 136 insertions(+), 9 deletions(-) create mode 100644 k8s/minikube-registry/kube-registry-ds.yaml create mode 100644 k8s/minikube-registry/kube-registry-rc.yaml create mode 100644 k8s/minikube-registry/kube-registry-svc.yaml diff --git a/README.md b/README.md index 3fde65b31..0eea68d32 100644 --- a/README.md +++ b/README.md @@ -23,6 +23,29 @@ Clipper is a prediction serving system that sits between user-facing application * Clipper **improves prediction accuracy** by introducing state-of-the-art bandit and ensemble methods to intelligently select and combine predictions and achieve real-time personalization across machine learning frameworks. *Clipper makes users happy.* +## Kubernetes quickstart + +Dependencies: + + * [Docker](https://www.docker.com/) + * [Minikube](https://github.com/kubernetes/minikube/releases) + +```bash +# (Optional) create clipper virtualenv +pyenv virtualenv 2.7.11 clipper +pyenv local clipper + +# install clipper_admin (editable) +pip install -e . + +# start minikube +minikube start --insecure-registry localhost:5000 + +# start docker and configure to use minikube registry +systemctl start docker +eval $(minikube docker-env) +``` + ## Quickstart diff --git a/clipper_admin/clipper_manager.py b/clipper_admin/clipper_manager.py index 41a91b781..6dbd53445 100644 --- a/clipper_admin/clipper_manager.py +++ b/clipper_admin/clipper_manager.py @@ -492,10 +492,10 @@ def deploy_model(self, docker_registry='localhost:5000', # TODO: make configurable name=name, version=version) - docker_client.images.build( - path=model_data_path, - tag=repo) - docker_client.images.push(repository=repo) + # docker_client.images.build( + # path=model_data_path, + # tag=repo) + # docker_client.images.push(repository=repo) import yaml import kubernetes @@ -504,20 +504,32 @@ def deploy_model(self, kubernetes.config.load_kube_config() k8s_v1 = kubernetes.client.CoreV1Api() k8s_beta = kubernetes.client.ExtensionsV1beta1Api() - # TODO: initializing the registry probably goes elsewhere + # TODO: initializing clipper + # for name in ['mgmt-frontend', 'query-frontend', 'redis']: + # try: + # k8s_beta.create_namespaced_deployment( + # body=yaml.load(open('k8s/clipper/{}-deployment.yaml'.format(name))), namespace='default') + # except ApiException: + # pass + # try: + # k8s_v1.create_namespaced_service( + # body=yaml.load(open('k8s/clipper/{}-service.yaml'.format(name))), namespace='default') + # except ApiException: + # pass + # TODO: initializing registry probably goes elsewhere try: k8s_v1.create_namespaced_replication_controller( - body=yaml.load(open('k8s/kube-registry-rc.yaml')), namespace='kube-system') + body=yaml.load(open('k8s/minikube-registry/kube-registry-rc.yaml')), namespace='kube-system') except ApiException: # already exists pass try: k8s_v1.create_namespaced_service( - body=yaml.load(open('k8s/kube-registry-svc.yaml')), namespace='kube-system') + body=yaml.load(open('k8s/minikube-registry/kube-registry-svc.yaml')), namespace='kube-system') except ApiException: # already exists pass try: k8s_beta.create_namespaced_daemon_set( - body=yaml.load(open('k8s/kube-registry-ds.yaml')), namespace='kube-system') + body=yaml.load(open('k8s/minikube-registry/kube-registry-ds.yaml')), namespace='kube-system') except ApiException: # already exists pass try: @@ -558,7 +570,7 @@ def deploy_model(self, }, { 'name': 'CLIPPER_IP', - 'value': '127.0.0.1' + 'value': '10.0.2.2' # TODO: WTF magic IP that goes to host } ] } diff --git a/k8s/minikube-registry/kube-registry-ds.yaml b/k8s/minikube-registry/kube-registry-ds.yaml new file mode 100644 index 000000000..1023ad7a7 --- /dev/null +++ b/k8s/minikube-registry/kube-registry-ds.yaml @@ -0,0 +1,32 @@ +apiVersion: extensions/v1beta1 +kind: DaemonSet +metadata: + name: kube-registry-proxy + namespace: kube-system + labels: + k8s-app: kube-registry + kubernetes.io/cluster-service: "true" + version: v0.4 +spec: + template: + metadata: + labels: + k8s-app: kube-registry + version: v0.4 + spec: + containers: + - name: kube-registry-proxy + image: gcr.io/google_containers/kube-registry-proxy:0.4 + resources: + limits: + cpu: 100m + memory: 50Mi + env: + - name: REGISTRY_HOST + value: kube-registry.kube-system.svc.cluster.local + - name: REGISTRY_PORT + value: "5000" + ports: + - name: registry + containerPort: 80 + hostPort: 5000 diff --git a/k8s/minikube-registry/kube-registry-rc.yaml b/k8s/minikube-registry/kube-registry-rc.yaml new file mode 100644 index 000000000..75d273d2e --- /dev/null +++ b/k8s/minikube-registry/kube-registry-rc.yaml @@ -0,0 +1,46 @@ +apiVersion: v1 +kind: ReplicationController +metadata: + name: kube-registry-v0 + namespace: kube-system + labels: + k8s-app: kube-registry + version: v0 +spec: + replicas: 1 + selector: + k8s-app: kube-registry + version: v0 + template: + metadata: + labels: + k8s-app: kube-registry + version: v0 + spec: + containers: + - name: registry + image: registry:2.5.1 + resources: + # keep request = limit to keep this container in guaranteed class + limits: + cpu: 100m + memory: 100Mi + requests: + cpu: 100m + memory: 100Mi + env: + - name: REGISTRY_HTTP_ADDR + value: :5000 + - name: REGISTRY_STORAGE_FILESYSTEM_ROOTDIRECTORY + value: /var/lib/registry + volumeMounts: + - name: image-store + mountPath: /var/lib/registry + ports: + - containerPort: 5000 + name: registry + protocol: TCP + volumes: + - name: image-store + hostPath: + path: /data/registry/ diff --git a/k8s/minikube-registry/kube-registry-svc.yaml b/k8s/minikube-registry/kube-registry-svc.yaml new file mode 100644 index 000000000..60a2a5836 --- /dev/null +++ b/k8s/minikube-registry/kube-registry-svc.yaml @@ -0,0 +1,14 @@ +apiVersion: v1 +kind: Service +metadata: + name: kube-registry + namespace: kube-system + labels: + k8s-app: kube-registry +spec: + selector: + k8s-app: kube-registry + ports: + - name: registry + port: 5000 + protocol: TCP From 93d20b595891f1fadd9940ae3f9a6fa1811a635c Mon Sep 17 00:00:00 2001 From: Feynman Liang Date: Wed, 7 Jun 2017 23:42:15 -0700 Subject: [PATCH 06/24] Adds clipper services to k8s, disabled --- k8s/clipper/mgmt-frontend-deployment.yaml | 29 +++++++++++++++++++++ k8s/clipper/mgmt-frontend-service.yaml | 18 +++++++++++++ k8s/clipper/query-frontend-deployment.yaml | 30 ++++++++++++++++++++++ k8s/clipper/query-frontend-service.yaml | 21 +++++++++++++++ k8s/clipper/redis-deployment.yaml | 30 ++++++++++++++++++++++ k8s/clipper/redis-service.yaml | 18 +++++++++++++ 6 files changed, 146 insertions(+) create mode 100644 k8s/clipper/mgmt-frontend-deployment.yaml create mode 100644 k8s/clipper/mgmt-frontend-service.yaml create mode 100644 k8s/clipper/query-frontend-deployment.yaml create mode 100644 k8s/clipper/query-frontend-service.yaml create mode 100644 k8s/clipper/redis-deployment.yaml create mode 100644 k8s/clipper/redis-service.yaml diff --git a/k8s/clipper/mgmt-frontend-deployment.yaml b/k8s/clipper/mgmt-frontend-deployment.yaml new file mode 100644 index 000000000..03e5a76df --- /dev/null +++ b/k8s/clipper/mgmt-frontend-deployment.yaml @@ -0,0 +1,29 @@ +apiVersion: extensions/v1beta1 +kind: Deployment +metadata: + annotations: + ai.clipper.container.label: "" + creationTimestamp: null + labels: + io.kompose.service: mgmt-frontend + name: mgmt-frontend +spec: + replicas: 1 + strategy: {} + template: + metadata: + creationTimestamp: null + labels: + io.kompose.service: mgmt-frontend + spec: + containers: + - args: + - --redis_ip=redis + - --redis_port=6379 + image: clipper/management_frontend:latest + name: mgmt-frontend + ports: + - containerPort: 1338 + resources: {} + restartPolicy: Always +status: {} diff --git a/k8s/clipper/mgmt-frontend-service.yaml b/k8s/clipper/mgmt-frontend-service.yaml new file mode 100644 index 000000000..8c7149405 --- /dev/null +++ b/k8s/clipper/mgmt-frontend-service.yaml @@ -0,0 +1,18 @@ +apiVersion: v1 +kind: Service +metadata: + annotations: + ai.clipper.container.label: "" + creationTimestamp: null + labels: + io.kompose.service: mgmt-frontend + name: mgmt-frontend +spec: + ports: + - name: "1338" + port: 1338 + targetPort: 1338 + selector: + io.kompose.service: mgmt-frontend +status: + loadBalancer: {} diff --git a/k8s/clipper/query-frontend-deployment.yaml b/k8s/clipper/query-frontend-deployment.yaml new file mode 100644 index 000000000..b0b84a1a0 --- /dev/null +++ b/k8s/clipper/query-frontend-deployment.yaml @@ -0,0 +1,30 @@ +apiVersion: extensions/v1beta1 +kind: Deployment +metadata: + annotations: + ai.clipper.container.label: "" + creationTimestamp: null + labels: + io.kompose.service: query-frontend + name: query-frontend +spec: + replicas: 1 + strategy: {} + template: + metadata: + creationTimestamp: null + labels: + io.kompose.service: query-frontend + spec: + containers: + - args: + - --redis_ip=redis + - --redis_port=6379 + image: clipper/query_frontend:latest + name: query-frontend + ports: + - containerPort: 7000 + - containerPort: 1337 + resources: {} + restartPolicy: Always +status: {} diff --git a/k8s/clipper/query-frontend-service.yaml b/k8s/clipper/query-frontend-service.yaml new file mode 100644 index 000000000..8f09ddaa7 --- /dev/null +++ b/k8s/clipper/query-frontend-service.yaml @@ -0,0 +1,21 @@ +apiVersion: v1 +kind: Service +metadata: + annotations: + ai.clipper.container.label: "" + creationTimestamp: null + labels: + io.kompose.service: query-frontend + name: query-frontend +spec: + ports: + - name: "7000" + port: 7000 + targetPort: 7000 + - name: "1337" + port: 1337 + targetPort: 1337 + selector: + io.kompose.service: query-frontend +status: + loadBalancer: {} diff --git a/k8s/clipper/redis-deployment.yaml b/k8s/clipper/redis-deployment.yaml new file mode 100644 index 000000000..ce4496927 --- /dev/null +++ b/k8s/clipper/redis-deployment.yaml @@ -0,0 +1,30 @@ +apiVersion: extensions/v1beta1 +kind: Deployment +metadata: + annotations: + ai.clipper.container.label: "" + creationTimestamp: null + labels: + io.kompose.service: redis + name: redis +spec: + replicas: 1 + strategy: {} + template: + metadata: + creationTimestamp: null + labels: + io.kompose.service: redis + spec: + containers: + - args: + - redis-server + - --port + - "6379" + image: redis:alpine + name: redis + ports: + - containerPort: 6379 + resources: {} + restartPolicy: Always +status: {} diff --git a/k8s/clipper/redis-service.yaml b/k8s/clipper/redis-service.yaml new file mode 100644 index 000000000..6ca665f30 --- /dev/null +++ b/k8s/clipper/redis-service.yaml @@ -0,0 +1,18 @@ +apiVersion: v1 +kind: Service +metadata: + annotations: + ai.clipper.container.label: "" + creationTimestamp: null + labels: + io.kompose.service: redis + name: redis +spec: + ports: + - name: "6379" + port: 6379 + targetPort: 6379 + selector: + io.kompose.service: redis +status: + loadBalancer: {} From 3c8e61124870388ea34b604ee34ecd7b8408d2e6 Mon Sep 17 00:00:00 2001 From: Feynman Liang Date: Wed, 7 Jun 2017 23:53:20 -0700 Subject: [PATCH 07/24] Working end to end model deployment --- clipper_admin/clipper_manager.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/clipper_admin/clipper_manager.py b/clipper_admin/clipper_manager.py index 6dbd53445..f5a5a49f6 100644 --- a/clipper_admin/clipper_manager.py +++ b/clipper_admin/clipper_manager.py @@ -473,11 +473,11 @@ def deploy_model(self, # publish model to Clipper and verify success before copying model # parameters to Clipper and starting containers - # if not self._publish_new_model( - # name, version, labels, input_type, container_name, - # os.path.join(vol, os.path.basename(model_data_path))): - # return False - # print("Published model to Clipper") + if not self._publish_new_model( + name, version, labels, input_type, container_name, + os.path.join(vol, os.path.basename(model_data_path))): + return False + print("Published model to Clipper") import docker # prepare docker image build dir From 1617ee767ae024fd3a535169edad8cbb8b6de92d Mon Sep 17 00:00:00 2001 From: Feynman Liang Date: Thu, 8 Jun 2017 09:30:06 -0700 Subject: [PATCH 08/24] Starts abstracting container runtime --- clipper_admin/clipper_k8s.py | 109 ++++++++++++++ clipper_admin/clipper_manager.py | 227 ++++------------------------- clipper_admin/container.py | 55 +++++++ clipper_admin/container_manager.py | 16 ++ setup.py | 2 +- 5 files changed, 208 insertions(+), 201 deletions(-) create mode 100644 clipper_admin/clipper_k8s.py create mode 100644 clipper_admin/container.py create mode 100644 clipper_admin/container_manager.py diff --git a/clipper_admin/clipper_k8s.py b/clipper_admin/clipper_k8s.py new file mode 100644 index 000000000..2eb4ff803 --- /dev/null +++ b/clipper_admin/clipper_k8s.py @@ -0,0 +1,109 @@ +"""Clipper Kubernetes Utilities""" +# TODO: better error handling, check if resources exist before creating +# TODO: include labels (used by clipper.stop_all) +# TODO: deletion methods + +import yaml +from kubernetes import client, config +from kubernetes.client.rest import ApiException + +class ClipperK8s: + def __init__(self): + config.load_kube_config() + # self.initialize_clipper() # NOTE: this allows containers to discover query_manager by DNS rather than IP, + # # but may couple too tightly to k8s + self.initialize_registry() # TODO: check doesn't exist before trying + self.k8s_v1 = client.CoreV1Api() + self.k8s_beta = client.ExtensionsV1beta1Api() + + def initialize_clipper(self): + for name in ['mgmt-frontend', 'query-frontend', 'redis']: + try: + self.k8s_beta.create_namespaced_deployment( + body=yaml.load(open('k8s/clipper/{}-deployment.yaml'.format(name))), namespace='default') + except ApiException: + pass + try: + self.k8s_v1.create_namespaced_service( + body=yaml.load(open('k8s/clipper/{}-service.yaml'.format(name))), namespace='default') + except ApiException: + pass + + def initialize_registry(self): + try: + self.k8s_v1.create_namespaced_replication_controller( + body=yaml.load(open('k8s/minikube-registry/kube-registry-rc.yaml')), namespace='kube-system') + except ApiException: # already exists + pass + try: + self.k8s_v1.create_namespaced_service( + body=yaml.load(open('k8s/minikube-registry/kube-registry-svc.yaml')), namespace='kube-system') + except ApiException: # already exists + pass + try: + self.k8s_beta.create_namespaced_daemon_set( + body=yaml.load(open('k8s/minikube-registry/kube-registry-ds.yaml')), namespace='kube-system') + except ApiException: # already exists + pass + + def deploy_model(self, name, version, repo): + """Deploys a versioned model to a k8s cluster. + + Parameters + ---------- + name : str + The name to assign this model. + version : int + The version to assign this model. + repo : str + A docker repository path, which must be accessible by the k8s cluster. + """ + try: + k8s_beta.create_namespaced_deployment( + body={ + 'apiVersion': 'extensions/v1beta1', + 'kind': 'Deployment', + 'metadata': { + 'name': name + '-deployment' # NOTE: must satisfy RFC 1123 pathname conventions + }, + 'spec': { + 'replicas': 1, + 'template': { + 'metadata': { + 'labels': { + 'model': name, + 'version': str(version) + } + }, + 'spec': { + 'containers': [ + { + 'name': name, + 'image': repo, + 'ports': [ + { + 'containerPort': 80 + } + ], + 'env': [ + { + 'name': 'CLIPPER_MODEL_NAME', + 'value': 'feature-sum-model' + }, + { + 'name': 'CLIPPER_MODEL_VERSION', + 'value': '1' + }, + { + 'name': 'CLIPPER_IP', + 'value': '10.0.2.2' # TODO: WTF magic IP that goes to host + } + ] + } + ] + } + } + } + }, namespace='default') + except ApiException: # already exists + pass diff --git a/clipper_admin/clipper_manager.py b/clipper_admin/clipper_manager.py index f5a5a49f6..b3cc7162d 100644 --- a/clipper_admin/clipper_manager.py +++ b/clipper_admin/clipper_manager.py @@ -1,6 +1,7 @@ """Clipper Management Utilities""" from __future__ import print_function, with_statement, absolute_import +import docker from fabric.api import * from fabric.contrib.files import append import os @@ -215,6 +216,7 @@ def _host_is_local(self): return self.host in LOCAL_HOST_NAMES def _start_docker_if_necessary(self): + # TODO: this should only run if we are running using local Docker daemon with hide("warnings", "output", "running"): print("Checking if Docker is running...") self._execute_root("docker ps") @@ -471,149 +473,36 @@ def deploy_model(self, vol = "{model_repo}/{name}/{version}".format( model_repo=MODEL_REPO, name=name, version=version) - # publish model to Clipper and verify success before copying model - # parameters to Clipper and starting containers - if not self._publish_new_model( - name, version, labels, input_type, container_name, - os.path.join(vol, os.path.basename(model_data_path))): - return False - print("Published model to Clipper") - - import docker # prepare docker image build dir with open(model_data_path + '/Dockerfile', 'w') as f: f.write("FROM {container_name}\nCOPY . /model/.\n".format(container_name=container_name)) # build, tag, and push docker image to registry + # NOTE: DOCKER_API_VERSION (set by `minikube docker-env`) must be same version as docker registry server docker_client = docker.from_env(version=os.environ["DOCKER_API_VERSION"]) - # NOTE: this must be same version as docker registry server - repo = '{docker_registry}/{name}:{version}'.format( docker_registry='localhost:5000', # TODO: make configurable name=name, version=version) - # docker_client.images.build( - # path=model_data_path, - # tag=repo) - # docker_client.images.push(repository=repo) - - import yaml - import kubernetes - from kubernetes.client.rest import ApiException - # TODO: check before creating k8s resources - kubernetes.config.load_kube_config() - k8s_v1 = kubernetes.client.CoreV1Api() - k8s_beta = kubernetes.client.ExtensionsV1beta1Api() - # TODO: initializing clipper - # for name in ['mgmt-frontend', 'query-frontend', 'redis']: - # try: - # k8s_beta.create_namespaced_deployment( - # body=yaml.load(open('k8s/clipper/{}-deployment.yaml'.format(name))), namespace='default') - # except ApiException: - # pass - # try: - # k8s_v1.create_namespaced_service( - # body=yaml.load(open('k8s/clipper/{}-service.yaml'.format(name))), namespace='default') - # except ApiException: - # pass - # TODO: initializing registry probably goes elsewhere - try: - k8s_v1.create_namespaced_replication_controller( - body=yaml.load(open('k8s/minikube-registry/kube-registry-rc.yaml')), namespace='kube-system') - except ApiException: # already exists - pass - try: - k8s_v1.create_namespaced_service( - body=yaml.load(open('k8s/minikube-registry/kube-registry-svc.yaml')), namespace='kube-system') - except ApiException: # already exists - pass - try: - k8s_beta.create_namespaced_daemon_set( - body=yaml.load(open('k8s/minikube-registry/kube-registry-ds.yaml')), namespace='kube-system') - except ApiException: # already exists - pass - try: - k8s_beta.create_namespaced_deployment( - body={ - 'apiVersion': 'extensions/v1beta1', - 'kind': 'Deployment', - 'metadata': { - 'name': name + '-deployment' # NOTE: must satisfy RFC 1123 pathname conventions - }, - 'spec': { - 'replicas': 1, - 'template': { - 'metadata': { - 'labels': { - 'model': name, - 'version': str(version) - } - }, - 'spec': { - 'containers': [ - { - 'name': name, - 'image': repo, - 'ports': [ - { - 'containerPort': 80 - } - ], - 'env': [ - { - 'name': 'CLIPPER_MODEL_NAME', - 'value': 'feature-sum-model' - }, - { - 'name': 'CLIPPER_MODEL_VERSION', - 'value': '1' - }, - { - 'name': 'CLIPPER_IP', - 'value': '10.0.2.2' # TODO: WTF magic IP that goes to host - } - ] - } - ] - } - } - } - }, namespace='default') - except ApiException: # already exists - pass - try: - k8s_v1.create_namespaced_service( - body={ - 'apiVersion': 'v1', - 'kind': 'Service', - 'metadata': { - 'name': 'nginx' - }, - 'spec': { - 'ports': [ - { - 'nodePort': 32100, # TODO: remove when ClusterIP - 'port': 80, - 'protocol': 'TCP', - 'targetPort': 80 - } - ], - 'selector': { - 'app': 'nginx' - }, - 'type': 'NodePort' # TODO: use ClusterIP when query frontend is cluster-internal - } - }, namespace='default') - except ApiException: # already exists - pass - # TODO: better error handling - - # to run built docker image on local docker daemon - # docker run -it --network=clipper_nw --restart=always -e "CLIPPER_MODEL_NAME=feature_sum_model" -e "CLIPPER_MODEL_VERSION=1" -e "CLIPPER_IP=query_frontend" -e "CLIPPER_INPUT_TYPE=doubles" -l "ai.clipper.container.label" -l "ai.clipper.model_container.model_version=feature_sum_model:1" feature_sum_model - - # TODO: generate a k8s service and pod manifest, deploy to k8s - - # # aggregate results of starting all containers + docker_client.images.build( + path=model_data_path, + tag=repo) + docker_client.images.push(repository=repo) + + # TODO: replace `model_data_path` in `_publish_new_model` with the docker repo + # publish model to Clipper and verify success before copying model + # parameters to Clipper and starting containers + if not self._publish_new_model( + name, version, labels, input_type, container_name, + os.path.join(vol, os.path.basename(model_data_path))): + return False + print("Published model to Clipper") + + # TODO: call this in `add_container` once `repo` is available from redis + clipper_k8s = ClipperK8s() + clipper_k8s.deploy_container(name, version, repo) + + # aggregate results of starting all containers # return all([ # self.add_container(name, version) # for r in range(num_containers) @@ -1100,6 +989,7 @@ def add_container(self, model_name, model_version): True if the container was added successfully and False if the container could not be added. """ + # TODO: this needs to abstract containers deployed on k8s vs those running on local docker with hide("warnings", "output", "running"): # Look up model info in Redis if self.redis_ip == DEFAULT_REDIS_IP: @@ -1231,6 +1121,7 @@ def set_model_version(self, model_name, model_version, num_containers=0): selected model version. """ + # TODO: update to use k8s API url = "http://%s:%d/admin/set_model_version" % ( self.host, CLIPPER_MANAGEMENT_PORT) req_json = json.dumps({ @@ -1255,11 +1146,8 @@ def remove_inactive_containers(self, model_name): # Get all Docker containers tagged as model containers num_containers_removed = 0 with hide("output", "warnings", "running"): - containers = self._execute_root( - "docker ps -aq --filter label={model_container_label}".format( - model_container_label=CLIPPER_MODEL_CONTAINER_LABEL)) - if len(containers) > 0: - container_ids = [l.strip() for l in containers.split("\n")] + container_ids = self._get_clipper_container_ids() + if len(container_ids) > 0: for container in container_ids: # returns a string formatted as ":" container_model_name_and_version = self._execute_root( @@ -1286,6 +1174,7 @@ def stop_all(self): """Stops and removes all Clipper Docker containers on the host. """ + # TODO: stop containers in k8s if running on k8s print("Stopping Clipper and all running models...") with hide("output", "warnings", "running"): container_ids = self._get_clipper_container_ids() @@ -1316,65 +1205,3 @@ def _publish_new_model(self, name, version, labels, input_type, else: print("Error publishing model: %s" % r.text) return False - - def _put_container_on_host(self, container_name): - """Puts the provided container on the host. - - Parameters - __________ - container_name : str - The name of the container. - - Notes - ----- - This method will first check the host, then Docker Hub, then the local - machine to find the container. - - This method is safe to call multiple times with the same container name. - Subsequent calls will detect that the container is already present on - the host and do nothing. - - """ - with hide("output", "warnings", "running"): - # first see if container is already present on host - host_result = self._execute_root( - "docker images -q {cn}".format(cn=container_name)) - if len(host_result.stdout) > 0: - print("Found %s on host" % container_name) - return True - # now try to pull from Docker Hub - hub_result = self._execute_root( - "docker pull {cn}".format(cn=container_name), warn_only=True) - if hub_result.return_code == 0: - print("Found %s in Docker hub" % container_name) - return True - - # assume container_name refers to a local container and - # copy it to host - local_result = local( - "docker images -q {cn}".format(cn=container_name)) - - if len(local_result.stdout) > 0: - saved_fname = container_name.replace("/", "_") - subprocess.call("docker save -o /tmp/{fn}.tar {cn}".format( - fn=saved_fname, cn=container_name)) - tar_loc = "/tmp/{fn}.tar".format(fn=saved_fname) - self._execute_put(tar_loc, tar_loc) - self._execute_root("docker load -i {loc}".format(loc=tar_loc)) - # self._execute_root("docker tag {image_id} {cn}".format( - # image_id=image_id, cn=cn)) - # now check to make sure we can access it - host_result = self._execute_root( - "docker images -q {cn}".format(cn=container_name)) - if len(host_result.stdout) > 0: - print("Successfuly copied %s to host" % container_name) - return True - else: - warn("Problem copying container %s to host" % - container_name) - return False - - # out of options - warn("Could not find %s, please try with a valid " - "container docker image") - return False diff --git a/clipper_admin/container.py b/clipper_admin/container.py new file mode 100644 index 000000000..9adb296d9 --- /dev/null +++ b/clipper_admin/container.py @@ -0,0 +1,55 @@ +from abc import ABCMeta, abstractmethod + +class Container(metaclass=ABCMeta): + + """An instance of a running Docker container, abstracted from its deployment environment. + + This could be running on the local Docker daemon, or on a Kubernetes cluster.""" + + @abstractmethod + def logs(self): + pass + + @abstractmethod + def inspect(self): + pass + + @abstractmethod + def stop(self): + pass + + @abstractmethod + def rm(self): + pass + +class DockerDaemonContainer(Container): + def __init__(self): + pass + + def logs(self): + pass + + def inspect(self): + pass + + def stop(self): + pass + + def rm(self): + pass + +class K8sContainer(Container): + def __init__(self): + pass + + def logs(self): + pass + + def inspect(self): + pass + + def stop(self): + pass + + def rm(self): + pass diff --git a/clipper_admin/container_manager.py b/clipper_admin/container_manager.py new file mode 100644 index 000000000..faeccc113 --- /dev/null +++ b/clipper_admin/container_manager.py @@ -0,0 +1,16 @@ +from abc import ABCMeta, abstractmethod + +class ContainerManager(metaclass=ABCMeta): + + """A manager for running containers. + + Used to launch containers as well as track currently running :class:`container.Container` instances. """ + + @abstractmethod + def start(self): + pass + + @abstractmethod + def get_containers(self): + """Returns all the containers managed by this :class:`container_manager.ContainerManager` instance.""" + pass diff --git a/setup.py b/setup.py index 85cf48e92..8e83449a1 100644 --- a/setup.py +++ b/setup.py @@ -12,6 +12,6 @@ keywords=['clipper', 'prediction', 'model', 'management'], install_requires=[ 'requests', 'pyparsing', 'appdirs', 'pprint', 'subprocess32', - 'sklearn', 'numpy', 'scipy', 'fabric', 'pyyaml', 'docker', + 'sklearn', 'numpy', 'scipy', 'fabric', 'pyyaml', 'abc', 'docker', 'kubernetes' ]) From c8afa4a90473499c069c3effce4dd1bcaab7c7ad Mon Sep 17 00:00:00 2001 From: Feynman Liang Date: Thu, 8 Jun 2017 09:31:28 -0700 Subject: [PATCH 09/24] Adds TODO --- clipper_admin/clipper_k8s.py | 1 + 1 file changed, 1 insertion(+) diff --git a/clipper_admin/clipper_k8s.py b/clipper_admin/clipper_k8s.py index 2eb4ff803..0093804bb 100644 --- a/clipper_admin/clipper_k8s.py +++ b/clipper_admin/clipper_k8s.py @@ -8,6 +8,7 @@ from kubernetes.client.rest import ApiException class ClipperK8s: + # TODO: subclass ContainerManager interface def __init__(self): config.load_kube_config() # self.initialize_clipper() # NOTE: this allows containers to discover query_manager by DNS rather than IP, From ad5733668f6083dd457e69bb7ab40892cb0a9064 Mon Sep 17 00:00:00 2001 From: Feynman Liang Date: Thu, 8 Jun 2017 09:32:32 -0700 Subject: [PATCH 10/24] Uncomment rmtree --- clipper_admin/clipper_manager.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/clipper_admin/clipper_manager.py b/clipper_admin/clipper_manager.py index b3cc7162d..1bb696324 100644 --- a/clipper_admin/clipper_manager.py +++ b/clipper_admin/clipper_manager.py @@ -739,7 +739,7 @@ def centered_predict(inputs): default_python_container, input_type, labels, num_containers) # Remove temp files - # shutil.rmtree(serialization_dir) + shutil.rmtree(serialization_dir) return deploy_result From 829e5fced8f068073b25467ad2d84fb98c4b1ada Mon Sep 17 00:00:00 2001 From: Feynman Liang Date: Thu, 8 Jun 2017 09:33:17 -0700 Subject: [PATCH 11/24] Cleans up comments --- clipper_admin/clipper_manager.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/clipper_admin/clipper_manager.py b/clipper_admin/clipper_manager.py index 1bb696324..f6d16228d 100644 --- a/clipper_admin/clipper_manager.py +++ b/clipper_admin/clipper_manager.py @@ -989,7 +989,7 @@ def add_container(self, model_name, model_version): True if the container was added successfully and False if the container could not be added. """ - # TODO: this needs to abstract containers deployed on k8s vs those running on local docker + # TODO: this must abstract containers deployed on k8s vs those running on local docker, see ContainerManager with hide("warnings", "output", "running"): # Look up model info in Redis if self.redis_ip == DEFAULT_REDIS_IP: @@ -1038,7 +1038,6 @@ def add_container(self, model_name, model_version): mv_label="%s=%s:%d" % (CLIPPER_MODEL_CONTAINER_LABEL, model_name, model_version), restart_policy=restart_policy)) - print(add_container_cmd) result = self._execute_root(add_container_cmd) return result.return_code == 0 else: From 4355910905bfb005c1eb493478d11f0854e1200d Mon Sep 17 00:00:00 2001 From: Feynman Liang Date: Thu, 8 Jun 2017 09:34:00 -0700 Subject: [PATCH 12/24] Ignore docker-compose, it's generated from code --- .gitignore | 2 ++ docker-compose.yml | 40 ---------------------------------------- 2 files changed, 2 insertions(+), 40 deletions(-) delete mode 100644 docker-compose.yml diff --git a/.gitignore b/.gitignore index 79ec906e3..744667c90 100644 --- a/.gitignore +++ b/.gitignore @@ -16,3 +16,5 @@ _build/ # scala dependency-reduced-pom.xml *.iml + +docker-compose.yml diff --git a/docker-compose.yml b/docker-compose.yml deleted file mode 100644 index 06f65897c..000000000 --- a/docker-compose.yml +++ /dev/null @@ -1,40 +0,0 @@ -networks: - default: - external: - name: clipper_nw -services: - mgmt_frontend: - command: - - --redis_ip=redis - - --redis_port=6379 - depends_on: - - redis - image: clipper/management_frontend:latest - labels: - ai.clipper.container.label: '' - ports: - - 1338:1338 - restart: always - query_frontend: - command: - - --redis_ip=redis - - --redis_port=6379 - depends_on: - - mgmt_frontend - - redis - image: clipper/query_frontend:latest - labels: - ai.clipper.container.label: '' - ports: - - 7000:7000 - - 1337:1337 - restart: always - redis: - command: redis-server --port 6379 - image: redis:alpine - labels: - ai.clipper.container.label: '' - ports: - - 6379:6379 - restart: always -version: '2' From 4dba8eafefe2e009bd2bab6d494c86ba32328004 Mon Sep 17 00:00:00 2001 From: Feynman Liang Date: Thu, 29 Jun 2017 16:35:19 -0700 Subject: [PATCH 13/24] Fix K8s deployment --- clipper_admin/clipper_k8s.py | 6 +++--- clipper_admin/clipper_manager.py | 5 +++-- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/clipper_admin/clipper_k8s.py b/clipper_admin/clipper_k8s.py index 0093804bb..3188898e1 100644 --- a/clipper_admin/clipper_k8s.py +++ b/clipper_admin/clipper_k8s.py @@ -11,11 +11,11 @@ class ClipperK8s: # TODO: subclass ContainerManager interface def __init__(self): config.load_kube_config() + self.k8s_v1 = client.CoreV1Api() + self.k8s_beta = client.ExtensionsV1beta1Api() # self.initialize_clipper() # NOTE: this allows containers to discover query_manager by DNS rather than IP, # # but may couple too tightly to k8s self.initialize_registry() # TODO: check doesn't exist before trying - self.k8s_v1 = client.CoreV1Api() - self.k8s_beta = client.ExtensionsV1beta1Api() def initialize_clipper(self): for name in ['mgmt-frontend', 'query-frontend', 'redis']: @@ -60,7 +60,7 @@ def deploy_model(self, name, version, repo): A docker repository path, which must be accessible by the k8s cluster. """ try: - k8s_beta.create_namespaced_deployment( + self.k8s_beta.create_namespaced_deployment( body={ 'apiVersion': 'extensions/v1beta1', 'kind': 'Deployment', diff --git a/clipper_admin/clipper_manager.py b/clipper_admin/clipper_manager.py index f6d16228d..fb4ddf323 100644 --- a/clipper_admin/clipper_manager.py +++ b/clipper_admin/clipper_manager.py @@ -14,6 +14,7 @@ from sklearn.externals import joblib from cStringIO import StringIO from .cloudpickle import CloudPickler +from .clipper_k8s import ClipperK8s import time import re @@ -479,11 +480,11 @@ def deploy_model(self, # build, tag, and push docker image to registry # NOTE: DOCKER_API_VERSION (set by `minikube docker-env`) must be same version as docker registry server - docker_client = docker.from_env(version=os.environ["DOCKER_API_VERSION"]) repo = '{docker_registry}/{name}:{version}'.format( docker_registry='localhost:5000', # TODO: make configurable name=name, version=version) + docker_client = docker.from_env(version=os.environ["DOCKER_API_VERSION"]) docker_client.images.build( path=model_data_path, tag=repo) @@ -500,7 +501,7 @@ def deploy_model(self, # TODO: call this in `add_container` once `repo` is available from redis clipper_k8s = ClipperK8s() - clipper_k8s.deploy_container(name, version, repo) + clipper_k8s.deploy_model(name, version, repo) # aggregate results of starting all containers # return all([ From d62bf0c1bcfdc67e809bac9b3a98e3b273897652 Mon Sep 17 00:00:00 2001 From: Feynman Liang Date: Thu, 29 Jun 2017 17:26:35 -0700 Subject: [PATCH 14/24] Can deploy to clipper on k8s, but not serve --- clipper_admin/clipper_k8s.py | 5 +- clipper_admin/clipper_manager.py | 126 ++---------------------- k8s/clipper/mgmt-frontend-service.yaml | 1 + k8s/clipper/query-frontend-service.yaml | 1 + k8s/clipper/redis-service.yaml | 1 + 5 files changed, 14 insertions(+), 120 deletions(-) diff --git a/clipper_admin/clipper_k8s.py b/clipper_admin/clipper_k8s.py index 3188898e1..cf784881d 100644 --- a/clipper_admin/clipper_k8s.py +++ b/clipper_admin/clipper_k8s.py @@ -13,11 +13,12 @@ def __init__(self): config.load_kube_config() self.k8s_v1 = client.CoreV1Api() self.k8s_beta = client.ExtensionsV1beta1Api() - # self.initialize_clipper() # NOTE: this allows containers to discover query_manager by DNS rather than IP, - # # but may couple too tightly to k8s + self.initialize_clipper() # NOTE: this allows containers to discover query_manager by DNS rather than IP, + # but may couple too tightly to k8s self.initialize_registry() # TODO: check doesn't exist before trying def initialize_clipper(self): + """Deploys Clipper to the k8s cluster and exposes the frontends as services.""" for name in ['mgmt-frontend', 'query-frontend', 'redis']: try: self.k8s_beta.create_namespaced_deployment( diff --git a/clipper_admin/clipper_manager.py b/clipper_admin/clipper_manager.py index fb4ddf323..c6bf6a531 100644 --- a/clipper_admin/clipper_manager.py +++ b/clipper_admin/clipper_manager.py @@ -113,108 +113,14 @@ def __init__(self, redis_port=DEFAULT_REDIS_PORT, redis_persistence_path=None, restart_containers=True): - self.redis_ip = redis_ip - self.redis_port = redis_port - self.docker_compost_dict = { - 'networks': { - 'default': { - 'external': { - 'name': DOCKER_NW - } - } - }, - 'services': { - 'mgmt_frontend': { - 'command': [ - '--redis_ip=%s' % self.redis_ip, - '--redis_port=%d' % self.redis_port - ], - 'image': - 'clipper/management_frontend:latest', - 'ports': [ - '%d:%d' % (CLIPPER_MANAGEMENT_PORT, - CLIPPER_MANAGEMENT_PORT) - ], - 'labels': { - CLIPPER_DOCKER_LABEL: "" - } - }, - 'query_frontend': { - 'command': [ - '--redis_ip=%s' % self.redis_ip, - '--redis_port=%d' % self.redis_port - ], - 'depends_on': ['mgmt_frontend'], - 'image': - 'clipper/query_frontend:latest', - 'ports': [ - '%d:%d' % (CLIPPER_RPC_PORT, CLIPPER_RPC_PORT), - '%d:%d' % (CLIPPER_QUERY_PORT, CLIPPER_QUERY_PORT) - ], - 'labels': { - CLIPPER_DOCKER_LABEL: "" - } - } - }, - 'version': '2' - } - start_redis = (self.redis_ip == DEFAULT_REDIS_IP) - if start_redis: - self.docker_compost_dict['services']['redis'] = { - 'image': 'redis:alpine', - 'ports': ['%d:%d' % (self.redis_port, self.redis_port)], - 'command': "redis-server --port %d" % self.redis_port, - 'labels': { - CLIPPER_DOCKER_LABEL: "" - } - } - self.docker_compost_dict['services']['mgmt_frontend'][ - 'depends_on'] = ['redis'] - self.docker_compost_dict['services']['query_frontend'][ - 'depends_on'].append('redis') - if redis_persistence_path: - if not os.path.exists(redis_persistence_path): - self.docker_compost_dict['services']['redis'][ - 'volumes'] = ['%s:/data' % redis_persistence_path] - else: - print( - "The directory specified by the redis persistence path already exists" - ) - raise ClipperManagerException( - "The directory specified by the redis persistence path already exists" - ) - self.restart_containers = restart_containers - if self.restart_containers: - self.docker_compost_dict['services']['mgmt_frontend'][ - 'restart'] = 'always' - self.docker_compost_dict['services']['query_frontend'][ - 'restart'] = 'always' - if start_redis: - self.docker_compost_dict['services']['redis'][ - 'restart'] = 'always' + self.clipper_k8s = ClipperK8s() self.sudo = sudo self.host = host - if self._host_is_local(): - self.host = "localhost" - env.host_string = self.host - else: - if not user or not key_path: - print( - "user and key_path must be specified when instantiating Clipper with a nonlocal host" - ) - raise ClipperManagerException( - "user and key_path must be specified when instantiating Clipper with a nonlocal host" - ) - env.user = user - env.key_filename = key_path - env.host_string = "%s:%d" % (host, ssh_port) - if check_for_docker: - # Make sure docker is running on cluster - self._start_docker_if_necessary() + self.host_string = self.host def _host_is_local(self): - return self.host in LOCAL_HOST_NAMES + return True def _start_docker_if_necessary(self): # TODO: this should only run if we are running using local Docker daemon @@ -304,22 +210,6 @@ def _execute_put(self, local_path, remote_path, *args, **kwargs): *args, **kwargs) - def start(self): - """Start a Clipper instance. - - """ - with hide("output", "warnings", "running"): - self._execute_standard("rm -f docker-compose.yml") - self._execute_append("docker-compose.yml", - yaml.dump( - self.docker_compost_dict, - default_flow_style=False)) - print( - "Note: Docker must download the Clipper Docker images if they are not already cached. This may take awhile." - ) - self._execute_root("docker-compose up -d query_frontend") - print("Clipper is running") - def register_application(self, name, model, input_type, default_output, slo_micros): """Register a new Clipper application. @@ -494,14 +384,12 @@ def deploy_model(self, # publish model to Clipper and verify success before copying model # parameters to Clipper and starting containers if not self._publish_new_model( - name, version, labels, input_type, container_name, - os.path.join(vol, os.path.basename(model_data_path))): + name, version, labels, input_type, container_name, repo): return False print("Published model to Clipper") # TODO: call this in `add_container` once `repo` is available from redis - clipper_k8s = ClipperK8s() - clipper_k8s.deploy_model(name, version, repo) + self.clipper_k8s.deploy_model(name, version, repo) # aggregate results of starting all containers # return all([ @@ -527,6 +415,7 @@ def register_external_model(self, labels : list of str, optional A list of strings annotating the model. """ + # TODO: this could be implemented by taking a docker repo and deploying a container with it return self._publish_new_model(name, version, labels, input_type, EXTERNALLY_MANAGED_MODEL, EXTERNALLY_MANAGED_MODEL) @@ -1192,13 +1081,14 @@ def _publish_new_model(self, name, version, labels, input_type, CLIPPER_MANAGEMENT_PORT) req_json = json.dumps({ "model_name": name, - "model_version": version, + "model_version": str(version), "labels": labels, "input_type": input_type, "container_name": container_name, "model_data_path": model_data_path }) headers = {'Content-type': 'application/json'} + print(req_json) r = requests.post(url, headers=headers, data=req_json) if r.status_code == requests.codes.ok: return True diff --git a/k8s/clipper/mgmt-frontend-service.yaml b/k8s/clipper/mgmt-frontend-service.yaml index 8c7149405..85bf7f2b7 100644 --- a/k8s/clipper/mgmt-frontend-service.yaml +++ b/k8s/clipper/mgmt-frontend-service.yaml @@ -8,6 +8,7 @@ metadata: io.kompose.service: mgmt-frontend name: mgmt-frontend spec: + type: NodePort ports: - name: "1338" port: 1338 diff --git a/k8s/clipper/query-frontend-service.yaml b/k8s/clipper/query-frontend-service.yaml index 8f09ddaa7..3fbe8eb6a 100644 --- a/k8s/clipper/query-frontend-service.yaml +++ b/k8s/clipper/query-frontend-service.yaml @@ -8,6 +8,7 @@ metadata: io.kompose.service: query-frontend name: query-frontend spec: + type: NodePort ports: - name: "7000" port: 7000 diff --git a/k8s/clipper/redis-service.yaml b/k8s/clipper/redis-service.yaml index 6ca665f30..bf12033ad 100644 --- a/k8s/clipper/redis-service.yaml +++ b/k8s/clipper/redis-service.yaml @@ -8,6 +8,7 @@ metadata: io.kompose.service: redis name: redis spec: + type: NodePort ports: - name: "6379" port: 6379 From 9d15478b8c2ebee53e2a92c4217fef5fce7520b1 Mon Sep 17 00:00:00 2001 From: Feynman Liang Date: Thu, 29 Jun 2017 19:58:39 -0700 Subject: [PATCH 15/24] Working model serving from k8s --- clipper_admin/clipper_k8s.py | 7 +++--- clipper_admin/clipper_manager.py | 26 +++++----------------- k8s/clipper/mgmt-frontend-deployment.yaml | 4 ++-- k8s/clipper/query-frontend-deployment.yaml | 4 ++-- 4 files changed, 13 insertions(+), 28 deletions(-) diff --git a/clipper_admin/clipper_k8s.py b/clipper_admin/clipper_k8s.py index cf784881d..769106e6f 100644 --- a/clipper_admin/clipper_k8s.py +++ b/clipper_admin/clipper_k8s.py @@ -90,15 +90,16 @@ def deploy_model(self, name, version, repo): 'env': [ { 'name': 'CLIPPER_MODEL_NAME', - 'value': 'feature-sum-model' + 'value': name }, { 'name': 'CLIPPER_MODEL_VERSION', - 'value': '1' + 'value': str(version) }, { 'name': 'CLIPPER_IP', - 'value': '10.0.2.2' # TODO: WTF magic IP that goes to host + 'value': '192.168.99.100' + # TODO: this is minikube IP, ideally the python-container could use K8s env vars } ] } diff --git a/clipper_admin/clipper_manager.py b/clipper_admin/clipper_manager.py index c6bf6a531..807f590dc 100644 --- a/clipper_admin/clipper_manager.py +++ b/clipper_admin/clipper_manager.py @@ -87,15 +87,6 @@ class Clipper: The SSH port to use. Default is port 22. check_for_docker : bool, optional If True, checks that Docker is running on the host machine. Default is True. - redis_port : int, optional - The port to use for connecting to redis. Default is port 6379. - redis_ip : string, optional - The ip address of the redis instance that Clipper should use. - If unspecified, a docker container running redis will be started - on `host` at the port specified by `redis_port`. - redis_persistence_path : string, optional - The directory path to which redis data should be persisted. The directory - should not already exist. If unspecified, redis will not persist data to disk. restart_containers : bool, optional If true, containers will restart on failure. If false, containers will not restart automatically. @@ -109,10 +100,8 @@ def __init__(self, sudo=False, ssh_port=22, check_for_docker=True, - redis_ip=DEFAULT_REDIS_IP, - redis_port=DEFAULT_REDIS_PORT, - redis_persistence_path=None, restart_containers=True): + # TODO: support deploying redis host off-cluster by taking redis_ip as constructor param to ClipperK8s self.clipper_k8s = ClipperK8s() self.sudo = sudo @@ -383,13 +372,12 @@ def deploy_model(self, # TODO: replace `model_data_path` in `_publish_new_model` with the docker repo # publish model to Clipper and verify success before copying model # parameters to Clipper and starting containers - if not self._publish_new_model( - name, version, labels, input_type, container_name, repo): - return False + self._publish_new_model(name, version, labels, input_type, container_name, repo) print("Published model to Clipper") # TODO: call this in `add_container` once `repo` is available from redis self.clipper_k8s.deploy_model(name, version, repo) + # self.add_container(name, version) # aggregate results of starting all containers # return all([ @@ -882,16 +870,12 @@ def add_container(self, model_name, model_version): # TODO: this must abstract containers deployed on k8s vs those running on local docker, see ContainerManager with hide("warnings", "output", "running"): # Look up model info in Redis - if self.redis_ip == DEFAULT_REDIS_IP: - redis_host = self.host - else: - redis_host = self.redis_ip model_key = "{mn}:{mv}".format(mn=model_name, mv=model_version) result = local( "redis-cli -h {host} -p {redis_port} -n {db} hgetall {key}". format( - host=redis_host, - redis_port=self.redis_port, + host=self.host, + redis_port=DEFAULT_REDIS_PORT, key=model_key, db=REDIS_MODEL_DB_NUM), capture=True) diff --git a/k8s/clipper/mgmt-frontend-deployment.yaml b/k8s/clipper/mgmt-frontend-deployment.yaml index 03e5a76df..5344f90ff 100644 --- a/k8s/clipper/mgmt-frontend-deployment.yaml +++ b/k8s/clipper/mgmt-frontend-deployment.yaml @@ -18,8 +18,8 @@ spec: spec: containers: - args: - - --redis_ip=redis - - --redis_port=6379 + - "--redis_ip=$(REDIS_SERVICE_HOST)" + - "--redis_port=$(REDIS_SERVICE_PORT)" image: clipper/management_frontend:latest name: mgmt-frontend ports: diff --git a/k8s/clipper/query-frontend-deployment.yaml b/k8s/clipper/query-frontend-deployment.yaml index b0b84a1a0..af188584a 100644 --- a/k8s/clipper/query-frontend-deployment.yaml +++ b/k8s/clipper/query-frontend-deployment.yaml @@ -18,8 +18,8 @@ spec: spec: containers: - args: - - --redis_ip=redis - - --redis_port=6379 + - "--redis_ip=$(REDIS_SERVICE_HOST)" + - "--redis_port=$(REDIS_SERVICE_PORT)" image: clipper/query_frontend:latest name: query-frontend ports: From b276279e7e50e6a113fb02765e5a6ba071235e6b Mon Sep 17 00:00:00 2001 From: Feynman Liang Date: Fri, 30 Jun 2017 09:41:00 -0700 Subject: [PATCH 16/24] Cleanup, use k8s dns --- clipper_admin/clipper_k8s.py | 2 +- clipper_admin/container.py | 55 ------------------------------ clipper_admin/container_manager.py | 16 --------- 3 files changed, 1 insertion(+), 72 deletions(-) delete mode 100644 clipper_admin/container.py delete mode 100644 clipper_admin/container_manager.py diff --git a/clipper_admin/clipper_k8s.py b/clipper_admin/clipper_k8s.py index 769106e6f..3a51b7621 100644 --- a/clipper_admin/clipper_k8s.py +++ b/clipper_admin/clipper_k8s.py @@ -98,7 +98,7 @@ def deploy_model(self, name, version, repo): }, { 'name': 'CLIPPER_IP', - 'value': '192.168.99.100' + 'value': 'query-frontend' # TODO: this is minikube IP, ideally the python-container could use K8s env vars } ] diff --git a/clipper_admin/container.py b/clipper_admin/container.py deleted file mode 100644 index 9adb296d9..000000000 --- a/clipper_admin/container.py +++ /dev/null @@ -1,55 +0,0 @@ -from abc import ABCMeta, abstractmethod - -class Container(metaclass=ABCMeta): - - """An instance of a running Docker container, abstracted from its deployment environment. - - This could be running on the local Docker daemon, or on a Kubernetes cluster.""" - - @abstractmethod - def logs(self): - pass - - @abstractmethod - def inspect(self): - pass - - @abstractmethod - def stop(self): - pass - - @abstractmethod - def rm(self): - pass - -class DockerDaemonContainer(Container): - def __init__(self): - pass - - def logs(self): - pass - - def inspect(self): - pass - - def stop(self): - pass - - def rm(self): - pass - -class K8sContainer(Container): - def __init__(self): - pass - - def logs(self): - pass - - def inspect(self): - pass - - def stop(self): - pass - - def rm(self): - pass diff --git a/clipper_admin/container_manager.py b/clipper_admin/container_manager.py deleted file mode 100644 index faeccc113..000000000 --- a/clipper_admin/container_manager.py +++ /dev/null @@ -1,16 +0,0 @@ -from abc import ABCMeta, abstractmethod - -class ContainerManager(metaclass=ABCMeta): - - """A manager for running containers. - - Used to launch containers as well as track currently running :class:`container.Container` instances. """ - - @abstractmethod - def start(self): - pass - - @abstractmethod - def get_containers(self): - """Returns all the containers managed by this :class:`container_manager.ContainerManager` instance.""" - pass From a9e2321d1874392180b542d34b01a93b1d323a78 Mon Sep 17 00:00:00 2001 From: Feynman Liang Date: Fri, 30 Jun 2017 10:37:13 -0700 Subject: [PATCH 17/24] Add logging, improve clipper_k8s --- clipper_admin/clipper_k8s.py | 72 +++++++++++++++++--------------- clipper_admin/clipper_manager.py | 3 +- 2 files changed, 40 insertions(+), 35 deletions(-) diff --git a/clipper_admin/clipper_k8s.py b/clipper_admin/clipper_k8s.py index 3a51b7621..42254496c 100644 --- a/clipper_admin/clipper_k8s.py +++ b/clipper_admin/clipper_k8s.py @@ -1,52 +1,58 @@ """Clipper Kubernetes Utilities""" -# TODO: better error handling, check if resources exist before creating # TODO: include labels (used by clipper.stop_all) # TODO: deletion methods -import yaml +from contextlib import contextmanager from kubernetes import client, config from kubernetes.client.rest import ApiException +import logging +import json +import yaml + +@contextmanager +def _pass_conflicts(): + try: + yield + except ApiException as e: + body = json.loads(e.body) + if body['reason'] == 'AlreadyExists': + logging.info("{} already exists, skipping!".format(body['details'])) + pass class ClipperK8s: # TODO: subclass ContainerManager interface def __init__(self): config.load_kube_config() - self.k8s_v1 = client.CoreV1Api() - self.k8s_beta = client.ExtensionsV1beta1Api() - self.initialize_clipper() # NOTE: this allows containers to discover query_manager by DNS rather than IP, - # but may couple too tightly to k8s - self.initialize_registry() # TODO: check doesn't exist before trying + self._k8s_v1 = client.CoreV1Api() + self._k8s_beta = client.ExtensionsV1beta1Api() + self._initialize_clipper() + + # NOTE: this provides a minikube accessible docker registry for convenience during dev + # TODO: should not be required, as `deploy_model` should be able to pull from any accessible `repo` + self._initialize_registry() - def initialize_clipper(self): + def _initialize_clipper(self): """Deploys Clipper to the k8s cluster and exposes the frontends as services.""" + logging.info("Initializing Clipper services to k8s cluster") for name in ['mgmt-frontend', 'query-frontend', 'redis']: - try: - self.k8s_beta.create_namespaced_deployment( + with _pass_conflicts() as cm: + resp = self._k8s_beta.create_namespaced_deployment( body=yaml.load(open('k8s/clipper/{}-deployment.yaml'.format(name))), namespace='default') - except ApiException: - pass - try: - self.k8s_v1.create_namespaced_service( + with _pass_conflicts() as cm: + resp = self._k8s_v1.create_namespaced_service( body=yaml.load(open('k8s/clipper/{}-service.yaml'.format(name))), namespace='default') - except ApiException: - pass - def initialize_registry(self): - try: - self.k8s_v1.create_namespaced_replication_controller( + def _initialize_registry(self): + logging.info("Initializing Docker registry on k8s cluster") + with _pass_conflicts(): + self._k8s_v1.create_namespaced_replication_controller( body=yaml.load(open('k8s/minikube-registry/kube-registry-rc.yaml')), namespace='kube-system') - except ApiException: # already exists - pass - try: - self.k8s_v1.create_namespaced_service( + with _pass_conflicts(): + self._k8s_v1.create_namespaced_service( body=yaml.load(open('k8s/minikube-registry/kube-registry-svc.yaml')), namespace='kube-system') - except ApiException: # already exists - pass - try: - self.k8s_beta.create_namespaced_daemon_set( + with _pass_conflicts(): + self._k8s_beta.create_namespaced_daemon_set( body=yaml.load(open('k8s/minikube-registry/kube-registry-ds.yaml')), namespace='kube-system') - except ApiException: # already exists - pass def deploy_model(self, name, version, repo): """Deploys a versioned model to a k8s cluster. @@ -60,8 +66,9 @@ def deploy_model(self, name, version, repo): repo : str A docker repository path, which must be accessible by the k8s cluster. """ - try: - self.k8s_beta.create_namespaced_deployment( + with _pass_conflicts(): + # TODO: handle errors where `repo` is not accessible + self._k8s_beta.create_namespaced_deployment( body={ 'apiVersion': 'extensions/v1beta1', 'kind': 'Deployment', @@ -99,7 +106,6 @@ def deploy_model(self, name, version, repo): { 'name': 'CLIPPER_IP', 'value': 'query-frontend' - # TODO: this is minikube IP, ideally the python-container could use K8s env vars } ] } @@ -108,5 +114,3 @@ def deploy_model(self, name, version, repo): } } }, namespace='default') - except ApiException: # already exists - pass diff --git a/clipper_admin/clipper_manager.py b/clipper_admin/clipper_manager.py index 807f590dc..14371d18e 100644 --- a/clipper_admin/clipper_manager.py +++ b/clipper_admin/clipper_manager.py @@ -15,6 +15,7 @@ from cStringIO import StringIO from .cloudpickle import CloudPickler from .clipper_k8s import ClipperK8s +import logging import time import re @@ -102,8 +103,8 @@ def __init__(self, check_for_docker=True, restart_containers=True): # TODO: support deploying redis host off-cluster by taking redis_ip as constructor param to ClipperK8s + logging.basicConfig(level=logging.INFO) self.clipper_k8s = ClipperK8s() - self.sudo = sudo self.host = host self.host_string = self.host From ee8560585754b7f40b384581b350ee6b2a387fbc Mon Sep 17 00:00:00 2001 From: Feynman Liang Date: Fri, 30 Jun 2017 11:07:48 -0700 Subject: [PATCH 18/24] Adds stop_all --- clipper_admin/clipper_k8s.py | 21 ++++++-- clipper_admin/clipper_manager.py | 87 +++++++++++--------------------- 2 files changed, 46 insertions(+), 62 deletions(-) diff --git a/clipper_admin/clipper_k8s.py b/clipper_admin/clipper_k8s.py index 42254496c..ea85b8c72 100644 --- a/clipper_admin/clipper_k8s.py +++ b/clipper_admin/clipper_k8s.py @@ -9,6 +9,8 @@ import json import yaml +import clipper_manager + @contextmanager def _pass_conflicts(): try: @@ -35,11 +37,11 @@ def _initialize_clipper(self): """Deploys Clipper to the k8s cluster and exposes the frontends as services.""" logging.info("Initializing Clipper services to k8s cluster") for name in ['mgmt-frontend', 'query-frontend', 'redis']: - with _pass_conflicts() as cm: - resp = self._k8s_beta.create_namespaced_deployment( + with _pass_conflicts(): + self._k8s_beta.create_namespaced_deployment( body=yaml.load(open('k8s/clipper/{}-deployment.yaml'.format(name))), namespace='default') - with _pass_conflicts() as cm: - resp = self._k8s_v1.create_namespaced_service( + with _pass_conflicts(): + self._k8s_v1.create_namespaced_service( body=yaml.load(open('k8s/clipper/{}-service.yaml'.format(name))), namespace='default') def _initialize_registry(self): @@ -80,6 +82,7 @@ def deploy_model(self, name, version, repo): 'template': { 'metadata': { 'labels': { + clipper_manager.CLIPPER_DOCKER_LABEL: '', 'model': name, 'version': str(version) } @@ -114,3 +117,13 @@ def deploy_model(self, name, version, repo): } } }, namespace='default') + + def stop_all_model_deployments(self): + logging.info("Stopping all running Clipper model deployments...") + try: + resp = self._k8s_beta.delete_collection_namespaced_deployment( + namespace='default', + label_selector='ai.clipper.container.label') + logging.info(resp) + except ApiException as e: + logging.warn("Exception deleting k8s deployments: {}".format(e)) diff --git a/clipper_admin/clipper_manager.py b/clipper_admin/clipper_manager.py index 14371d18e..fb6389fed 100644 --- a/clipper_admin/clipper_manager.py +++ b/clipper_admin/clipper_manager.py @@ -34,7 +34,6 @@ REDIS_RESOURCE_DB_NUM = 4 REDIS_APPLICATION_DB_NUM = 5 -DEFAULT_REDIS_IP = "redis" DEFAULT_REDIS_PORT = 6379 CLIPPER_QUERY_PORT = 1337 CLIPPER_MANAGEMENT_PORT = 1338 @@ -112,23 +111,6 @@ def __init__(self, def _host_is_local(self): return True - def _start_docker_if_necessary(self): - # TODO: this should only run if we are running using local Docker daemon - with hide("warnings", "output", "running"): - print("Checking if Docker is running...") - self._execute_root("docker ps") - dc_installed = self._execute_root( - "docker-compose --version", warn_only=True) - if dc_installed.return_code != 0: - print("docker-compose not installed on host.") - raise ClipperManagerException( - "docker-compose not installed on host.") - nw_create_command = ("docker network create --driver bridge {nw}" - .format(nw=DOCKER_NW)) - self._execute_root(nw_create_command, warn_only=True) - self._execute_standard( - "mkdir -p {model_repo}".format(model_repo=MODEL_REPO)) - def _execute_root(self, *args, **kwargs): if not self.sudo: return self._execute_standard(*args, **kwargs) @@ -148,9 +130,9 @@ def _execute_local(self, as_root, *args, **kwargs): root_args = list(args) root_args[0] = "sudo %s" % root_args[0] args = tuple(root_args) - # local is not currently capable of simultaneously printing and + # local is not currently capable of simultaneously logging.infoing and # capturing output, as run/sudo do. The capture kwarg allows you to - # switch between printing and capturing as necessary, and defaults to + # switch between logging.infoing and capturing as necessary, and defaults to # False. In this case, we need to capture the output and return it. if "capture" not in kwargs: kwargs["capture"] = True @@ -238,7 +220,7 @@ def register_application(self, name, model, input_type, default_output, }) headers = {'Content-type': 'application/json'} r = requests.post(url, headers=headers, data=req_json) - print(r.text) + logging.info(r.text) def get_all_apps(self, verbose=False): """Gets information about all applications registered with Clipper. @@ -265,7 +247,7 @@ def get_all_apps(self, verbose=False): if r.status_code == requests.codes.ok: return r.json() else: - print(r.text) + logging.warn(r.text) return None def get_app_info(self, name): @@ -295,7 +277,7 @@ def get_app_info(self, name): return None return app_info else: - print(r.text) + logging.warn(r.text) return None def deploy_model(self, @@ -346,7 +328,7 @@ def deploy_model(self, elif isinstance(model_data, str): # assume that model_data is a path to the serialized model model_data_path = model_data - print("model_data_path is: %s" % model_data_path) + logging.info("model_data_path is: %s" % model_data_path) else: warn("%s is invalid model format" % str(type(model_data))) return False @@ -374,7 +356,7 @@ def deploy_model(self, # publish model to Clipper and verify success before copying model # parameters to Clipper and starting containers self._publish_new_model(name, version, labels, input_type, container_name, repo) - print("Published model to Clipper") + logging.info("Published model to Clipper") # TODO: call this in `add_container` once `repo` is available from redis self.clipper_k8s.deploy_model(name, version, repo) @@ -435,7 +417,7 @@ def _save_python_function(self, name, predict_function): conda_env_exported = self._export_conda_env(environment_file_abs_path) if conda_env_exported: - print("Anaconda environment found. Verifying packages.") + logging.info("Anaconda environment found. Verifying packages.") # Confirm that packages installed through conda are solvable # Write out conda and pip dependency files to be supplied to container @@ -444,9 +426,9 @@ def _save_python_function(self, name, predict_function): conda_dep_fname, pip_dep_fname)): return False - print("Supplied environment details") + logging.info("Supplied environment details") else: - print( + logging.info( "Warning: Anaconda environment was either not found or exporting the environment " "failed. Your function will still be serialized and deployed, but may fail due to " "missing dependencies. In this case, please re-run inside an Anaconda environment. " @@ -457,7 +439,7 @@ def _save_python_function(self, name, predict_function): func_file_path = os.path.join(serialization_dir, predict_fname) with open(func_file_path, "w") as serialized_function_file: serialized_function_file.write(serialized_prediction_function) - print("Serialized and supplied predict function") + logging.info("Serialized and supplied predict function") return serialization_dir def deploy_pyspark_model(self, @@ -527,7 +509,7 @@ def deploy_pyspark_model(self, else: pyspark_model.save(sc, spark_model_save_loc) except Exception as e: - print("Error saving spark model: %s" % e) + logging.warn("Error saving spark model: %s" % e) raise e pyspark_container = "clipper/pyspark-container" @@ -538,7 +520,7 @@ def deploy_pyspark_model(self, "w") as metadata_file: json.dump({"model_class": model_class}, metadata_file) - print("Spark model saved") + logging.info("Spark model saved") # Deploy model deploy_result = self.deploy_model(name, version, serialization_dir, @@ -645,7 +627,7 @@ def get_all_models(self, verbose=False): if r.status_code == requests.codes.ok: return r.json() else: - print(r.text) + logging.info(r.text) return None def get_model_info(self, model_name, model_version): @@ -679,7 +661,7 @@ def get_model_info(self, model_name, model_version): return None return app_info else: - print(r.text) + logging.info(r.text) return None def get_all_containers(self, verbose=False): @@ -705,7 +687,7 @@ def get_all_containers(self, verbose=False): if r.status_code == requests.codes.ok: return r.json() else: - print(r.text) + logging.info(r.text) return None def get_container_info(self, model_name, model_version, replica_id): @@ -741,7 +723,7 @@ def get_container_info(self, model_name, model_version, replica_id): return None return app_info else: - print(r.text) + logging.info(r.text) return None def _inspect_selection_policy(self, app_name, uid): @@ -824,7 +806,7 @@ def _check_and_write_dependencies(self, environment_path, directory, on the container os. Otherwise returns False. """ if "CONDA_PREFIX" not in os.environ: - print("No Anaconda environment found") + logging.info("No Anaconda environment found") return False root_prefix = os.environ["CONDA_PREFIX"].split("envs")[0] @@ -843,8 +825,8 @@ def _check_and_write_dependencies(self, environment_path, directory, stderr=subprocess.PIPE, shell=True) out, err = process.communicate() - print(out) - print(err) + logging.info(out) + logging.info(err) return process.returncode == 0 def add_container(self, model_name, model_version): @@ -880,7 +862,7 @@ def add_container(self, model_name, model_version): key=model_key, db=REDIS_MODEL_DB_NUM), capture=True) - print(result) + logging.info(result) if "empty list or set" in result.stdout: # Model not found @@ -916,7 +898,7 @@ def add_container(self, model_name, model_version): result = self._execute_root(add_container_cmd) return result.return_code == 0 else: - print("Cannot start containers for externally managed model %s" + logging.info("Cannot start containers for externally managed model %s" % model_name) return False @@ -954,7 +936,7 @@ def _get_clipper_container_ids(self): "docker ps -aq --filter label={clipper_label}".format( clipper_label=CLIPPER_DOCKER_LABEL)) ids = [l.strip() for l in containers.split("\n")] - print("Clipper container IDS found: %s" % str(ids)) + logging.info("Clipper container IDS found: %s" % str(ids)) return ids def inspect_instance(self): @@ -1004,7 +986,7 @@ def set_model_version(self, model_name, model_version, num_containers=0): }) headers = {'Content-type': 'application/json'} r = requests.post(url, headers=headers, data=req_json) - print(r.text) + logging.info(r.text) for r in range(num_containers): self.add_container(model_name, model_version) @@ -1040,24 +1022,13 @@ def remove_inactive_containers(self, model_name): self._execute_root("docker rm {container}".format( container=container)) num_containers_removed += 1 - print("Removed %d inactive containers for model %s" % + logging.info("Removed %d inactive containers for model %s" % (num_containers_removed, model_name)) return num_containers_removed def stop_all(self): - """Stops and removes all Clipper Docker containers on the host. - - """ - # TODO: stop containers in k8s if running on k8s - print("Stopping Clipper and all running models...") - with hide("output", "warnings", "running"): - container_ids = self._get_clipper_container_ids() - container_id_str = " ".join(container_ids) - self._execute_root( - "docker stop {ids}".format(ids=container_id_str), - warn_only=True) - self._execute_root( - "docker rm {ids}".format(ids=container_id_str), warn_only=True) + """Stops and removes all Clipper model deployments.""" + self.clipper_k8s.stop_all_model_deployments() # TODO: provide registry image for k8s service instead of container_name and model_data_path def _publish_new_model(self, name, version, labels, input_type, @@ -1073,10 +1044,10 @@ def _publish_new_model(self, name, version, labels, input_type, "model_data_path": model_data_path }) headers = {'Content-type': 'application/json'} - print(req_json) + logging.info(req_json) r = requests.post(url, headers=headers, data=req_json) if r.status_code == requests.codes.ok: return True else: - print("Error publishing model: %s" % r.text) + logging.warn("Error publishing model: %s" % r.text) return False From 5cfce1d4618633994fb0556b5ab15512102333b4 Mon Sep 17 00:00:00 2001 From: Feynman Liang Date: Fri, 30 Jun 2017 13:39:15 -0700 Subject: [PATCH 19/24] Cleans up minikube registry --- clipper_admin/clipper_k8s.py | 61 +++++++++++-------- ...-ds.yaml => kube-registry-daemon-set.yaml} | 0 ...kube-registry-replication-controller.yaml} | 0 ...ry-svc.yaml => kube-registry-service.yaml} | 0 4 files changed, 35 insertions(+), 26 deletions(-) rename k8s/minikube-registry/{kube-registry-ds.yaml => kube-registry-daemon-set.yaml} (100%) rename k8s/minikube-registry/{kube-registry-rc.yaml => kube-registry-replication-controller.yaml} (100%) rename k8s/minikube-registry/{kube-registry-svc.yaml => kube-registry-service.yaml} (100%) diff --git a/clipper_admin/clipper_k8s.py b/clipper_admin/clipper_k8s.py index ea85b8c72..845c0cb5b 100644 --- a/clipper_admin/clipper_k8s.py +++ b/clipper_admin/clipper_k8s.py @@ -27,34 +27,13 @@ def __init__(self): config.load_kube_config() self._k8s_v1 = client.CoreV1Api() self._k8s_beta = client.ExtensionsV1beta1Api() - self._initialize_clipper() + + def start(self): + self._start_clipper() # NOTE: this provides a minikube accessible docker registry for convenience during dev # TODO: should not be required, as `deploy_model` should be able to pull from any accessible `repo` - self._initialize_registry() - - def _initialize_clipper(self): - """Deploys Clipper to the k8s cluster and exposes the frontends as services.""" - logging.info("Initializing Clipper services to k8s cluster") - for name in ['mgmt-frontend', 'query-frontend', 'redis']: - with _pass_conflicts(): - self._k8s_beta.create_namespaced_deployment( - body=yaml.load(open('k8s/clipper/{}-deployment.yaml'.format(name))), namespace='default') - with _pass_conflicts(): - self._k8s_v1.create_namespaced_service( - body=yaml.load(open('k8s/clipper/{}-service.yaml'.format(name))), namespace='default') - - def _initialize_registry(self): - logging.info("Initializing Docker registry on k8s cluster") - with _pass_conflicts(): - self._k8s_v1.create_namespaced_replication_controller( - body=yaml.load(open('k8s/minikube-registry/kube-registry-rc.yaml')), namespace='kube-system') - with _pass_conflicts(): - self._k8s_v1.create_namespaced_service( - body=yaml.load(open('k8s/minikube-registry/kube-registry-svc.yaml')), namespace='kube-system') - with _pass_conflicts(): - self._k8s_beta.create_namespaced_daemon_set( - body=yaml.load(open('k8s/minikube-registry/kube-registry-ds.yaml')), namespace='kube-system') + self._start_registry() def deploy_model(self, name, version, repo): """Deploys a versioned model to a k8s cluster. @@ -119,11 +98,41 @@ def deploy_model(self, name, version, repo): }, namespace='default') def stop_all_model_deployments(self): + """Stops all deployments of pods running Clipper models.""" logging.info("Stopping all running Clipper model deployments...") try: resp = self._k8s_beta.delete_collection_namespaced_deployment( namespace='default', label_selector='ai.clipper.container.label') - logging.info(resp) except ApiException as e: logging.warn("Exception deleting k8s deployments: {}".format(e)) + + def stop_clipper_resources(self): + """Stops all Clipper resources. + + WARNING: this will delete Redis and Docker Registry pods. + """ + + def _start_clipper(self): + """Deploys Clipper to the k8s cluster and exposes the frontends as services.""" + logging.info("Initializing Clipper services to k8s cluster") + for name in ['mgmt-frontend', 'query-frontend', 'redis']: + with _pass_conflicts(): + self._k8s_beta.create_namespaced_deployment( + body=yaml.load(open('k8s/clipper/{}-deployment.yaml'.format(name))), namespace='default') + with _pass_conflicts(): + self._k8s_v1.create_namespaced_service( + body=yaml.load(open('k8s/clipper/{}-service.yaml'.format(name))), namespace='default') + + def _start_registry(self): + logging.info("Initializing Docker registry on k8s cluster") + with _pass_conflicts(): + self._k8s_v1.create_namespaced_replication_controller( + body=yaml.load(open('k8s/minikube-registry/kube-registry-replication-controller.yaml')), namespace='kube-system') + with _pass_conflicts(): + self._k8s_v1.create_namespaced_service( + body=yaml.load(open('k8s/minikube-registry/kube-registry-service.yaml')), namespace='kube-system') + with _pass_conflicts(): + self._k8s_beta.create_namespaced_daemon_set( + body=yaml.load(open('k8s/minikube-registry/kube-registry-daemon-set.yaml')), namespace='kube-system') + diff --git a/k8s/minikube-registry/kube-registry-ds.yaml b/k8s/minikube-registry/kube-registry-daemon-set.yaml similarity index 100% rename from k8s/minikube-registry/kube-registry-ds.yaml rename to k8s/minikube-registry/kube-registry-daemon-set.yaml diff --git a/k8s/minikube-registry/kube-registry-rc.yaml b/k8s/minikube-registry/kube-registry-replication-controller.yaml similarity index 100% rename from k8s/minikube-registry/kube-registry-rc.yaml rename to k8s/minikube-registry/kube-registry-replication-controller.yaml diff --git a/k8s/minikube-registry/kube-registry-svc.yaml b/k8s/minikube-registry/kube-registry-service.yaml similarity index 100% rename from k8s/minikube-registry/kube-registry-svc.yaml rename to k8s/minikube-registry/kube-registry-service.yaml From e4f13619ffc81d6b01fb58aaf30330418bdc1cee Mon Sep 17 00:00:00 2001 From: Feynman Liang Date: Fri, 30 Jun 2017 13:39:38 -0700 Subject: [PATCH 20/24] Clean up clipper deployment --- clipper_admin/clipper_manager.py | 19 ++++++++++++++----- k8s/clipper/mgmt-frontend-deployment.yaml | 13 ++++--------- k8s/clipper/mgmt-frontend-service.yaml | 8 +++----- 3 files changed, 21 insertions(+), 19 deletions(-) diff --git a/clipper_admin/clipper_manager.py b/clipper_admin/clipper_manager.py index fb6389fed..d3f3f8961 100644 --- a/clipper_admin/clipper_manager.py +++ b/clipper_admin/clipper_manager.py @@ -182,6 +182,13 @@ def _execute_put(self, local_path, remote_path, *args, **kwargs): *args, **kwargs) + def start(self): + """Start a Clipper instance. + + """ + self.clipper_k8s.start() + logging.info("Clipper is running") + def register_application(self, name, model, input_type, default_output, slo_micros): """Register a new Clipper application. @@ -347,20 +354,22 @@ def deploy_model(self, name=name, version=version) docker_client = docker.from_env(version=os.environ["DOCKER_API_VERSION"]) + logging.info("Building model Docker image at {}".format(model_data_path)) docker_client.images.build( path=model_data_path, tag=repo) + logging.info("Pushing model Docker image to {}".format(repo)) docker_client.images.push(repository=repo) + # TODO: call this in `add_container` once `repo` is available from redis + logging.info("Creating model deployment on k8s") + self.clipper_k8s.deploy_model(name, version, repo) + # TODO: replace `model_data_path` in `_publish_new_model` with the docker repo # publish model to Clipper and verify success before copying model # parameters to Clipper and starting containers + logging.info("Publishing model to Clipper query manager") self._publish_new_model(name, version, labels, input_type, container_name, repo) - logging.info("Published model to Clipper") - - # TODO: call this in `add_container` once `repo` is available from redis - self.clipper_k8s.deploy_model(name, version, repo) - # self.add_container(name, version) # aggregate results of starting all containers # return all([ diff --git a/k8s/clipper/mgmt-frontend-deployment.yaml b/k8s/clipper/mgmt-frontend-deployment.yaml index 5344f90ff..3b18a87fe 100644 --- a/k8s/clipper/mgmt-frontend-deployment.yaml +++ b/k8s/clipper/mgmt-frontend-deployment.yaml @@ -1,20 +1,17 @@ apiVersion: extensions/v1beta1 kind: Deployment metadata: - annotations: - ai.clipper.container.label: "" - creationTimestamp: null labels: - io.kompose.service: mgmt-frontend + ai.clipper.container.label: "" + ai.clipper.name: mgmt-frontend name: mgmt-frontend spec: replicas: 1 - strategy: {} template: metadata: - creationTimestamp: null labels: - io.kompose.service: mgmt-frontend + ai.clipper.container.label: "" + ai.clipper.name: mgmt-frontend spec: containers: - args: @@ -24,6 +21,4 @@ spec: name: mgmt-frontend ports: - containerPort: 1338 - resources: {} restartPolicy: Always -status: {} diff --git a/k8s/clipper/mgmt-frontend-service.yaml b/k8s/clipper/mgmt-frontend-service.yaml index 85bf7f2b7..f3024ada8 100644 --- a/k8s/clipper/mgmt-frontend-service.yaml +++ b/k8s/clipper/mgmt-frontend-service.yaml @@ -1,11 +1,9 @@ apiVersion: v1 kind: Service metadata: - annotations: - ai.clipper.container.label: "" - creationTimestamp: null labels: - io.kompose.service: mgmt-frontend + ai.clipper.container.label: "" + ai.clipper.name: mgmt-frontend name: mgmt-frontend spec: type: NodePort @@ -14,6 +12,6 @@ spec: port: 1338 targetPort: 1338 selector: - io.kompose.service: mgmt-frontend + ai.clipper.name: mgmt-frontend status: loadBalancer: {} From 08dfaf95c284c29f1c31a8613d39f81107f3a5d4 Mon Sep 17 00:00:00 2001 From: Feynman Liang Date: Fri, 30 Jun 2017 14:14:45 -0700 Subject: [PATCH 21/24] Support stop_all --- clipper_admin/clipper_k8s.py | 34 +++++++++++++++++++--- clipper_admin/clipper_manager.py | 8 +++-- k8s/clipper/mgmt-frontend-service.yaml | 2 -- k8s/clipper/query-frontend-deployment.yaml | 13 +++------ k8s/clipper/query-frontend-service.yaml | 10 ++----- k8s/clipper/redis-deployment.yaml | 13 +++------ k8s/clipper/redis-service.yaml | 10 ++----- 7 files changed, 49 insertions(+), 41 deletions(-) diff --git a/clipper_admin/clipper_k8s.py b/clipper_admin/clipper_k8s.py index 845c0cb5b..dac61eefc 100644 --- a/clipper_admin/clipper_k8s.py +++ b/clipper_admin/clipper_k8s.py @@ -61,7 +61,7 @@ def deploy_model(self, name, version, repo): 'template': { 'metadata': { 'labels': { - clipper_manager.CLIPPER_DOCKER_LABEL: '', + clipper_manager.CLIPPER_MODEL_CONTAINER_LABEL: '', 'model': name, 'version': str(version) } @@ -99,19 +99,45 @@ def deploy_model(self, name, version, repo): def stop_all_model_deployments(self): """Stops all deployments of pods running Clipper models.""" - logging.info("Stopping all running Clipper model deployments...") + logging.info("Stopping all running Clipper model deployments") try: resp = self._k8s_beta.delete_collection_namespaced_deployment( namespace='default', - label_selector='ai.clipper.container.label') + label_selector=clipper_manager.CLIPPER_MODEL_CONTAINER_LABEL) except ApiException as e: logging.warn("Exception deleting k8s deployments: {}".format(e)) def stop_clipper_resources(self): """Stops all Clipper resources. - WARNING: this will delete Redis and Docker Registry pods. + WARNING: Data stored on an in-cluster Redis deployment will be lost! This method does not delete + any existing in-cluster Docker registry. """ + logging.info("Stopping all running Clipper resources") + + try: + for service in self._k8s_v1.list_namespaced_service( + namespace='default', + label_selector=clipper_manager.CLIPPER_DOCKER_LABEL).items: + # TODO: use delete collection of services if API provides + service_name = service.metadata.name + self._k8s_v1.delete_namespaced_service( + namespace='default', + name=service_name) + + self._k8s_beta.delete_collection_namespaced_deployment( + namespace='default', + label_selector=clipper_manager.CLIPPER_DOCKER_LABEL) + + self._k8s_v1.delete_collection_namespaced_pod( + namespace='default', + label_selector=clipper_manager.CLIPPER_DOCKER_LABEL) + + self._k8s_v1.delete_collection_namespaced_pod( + namespace='default', + label_selector=clipper_manager.CLIPPER_MODEL_CONTAINER_LABEL) + except ApiException as e: + logging.warn("Exception deleting k8s resources: {}".format(e)) def _start_clipper(self): """Deploys Clipper to the k8s cluster and exposes the frontends as services.""" diff --git a/clipper_admin/clipper_manager.py b/clipper_admin/clipper_manager.py index d3f3f8961..c0cdbe203 100644 --- a/clipper_admin/clipper_manager.py +++ b/clipper_admin/clipper_manager.py @@ -42,7 +42,7 @@ CLIPPER_LOGS_PATH = "/tmp/clipper-logs" CLIPPER_DOCKER_LABEL = "ai.clipper.container.label" -CLIPPER_MODEL_CONTAINER_LABEL = "ai.clipper.model_container.model_version" +CLIPPER_MODEL_CONTAINER_LABEL = "ai.clipper.model_container.label" DEFAULT_LABEL = ["DEFAULT"] @@ -335,7 +335,6 @@ def deploy_model(self, elif isinstance(model_data, str): # assume that model_data is a path to the serialized model model_data_path = model_data - logging.info("model_data_path is: %s" % model_data_path) else: warn("%s is invalid model format" % str(type(model_data))) return False @@ -371,6 +370,8 @@ def deploy_model(self, logging.info("Publishing model to Clipper query manager") self._publish_new_model(name, version, labels, input_type, container_name, repo) + logging.info("Done deploying!") + # aggregate results of starting all containers # return all([ # self.add_container(name, version) @@ -1036,8 +1037,9 @@ def remove_inactive_containers(self, model_name): return num_containers_removed def stop_all(self): - """Stops and removes all Clipper model deployments.""" + """Stops and removes all Clipper model deployments and Clipper resources.""" self.clipper_k8s.stop_all_model_deployments() + self.clipper_k8s.stop_clipper_resources() # TODO: provide registry image for k8s service instead of container_name and model_data_path def _publish_new_model(self, name, version, labels, input_type, diff --git a/k8s/clipper/mgmt-frontend-service.yaml b/k8s/clipper/mgmt-frontend-service.yaml index f3024ada8..4cf5fa239 100644 --- a/k8s/clipper/mgmt-frontend-service.yaml +++ b/k8s/clipper/mgmt-frontend-service.yaml @@ -13,5 +13,3 @@ spec: targetPort: 1338 selector: ai.clipper.name: mgmt-frontend -status: - loadBalancer: {} diff --git a/k8s/clipper/query-frontend-deployment.yaml b/k8s/clipper/query-frontend-deployment.yaml index af188584a..f6a0bf4c6 100644 --- a/k8s/clipper/query-frontend-deployment.yaml +++ b/k8s/clipper/query-frontend-deployment.yaml @@ -1,20 +1,17 @@ apiVersion: extensions/v1beta1 kind: Deployment metadata: - annotations: - ai.clipper.container.label: "" - creationTimestamp: null labels: - io.kompose.service: query-frontend + ai.clipper.container.label: "" + ai.clipper.name: query-frontend name: query-frontend spec: replicas: 1 - strategy: {} template: metadata: - creationTimestamp: null labels: - io.kompose.service: query-frontend + ai.clipper.container.label: "" + ai.clipper.name: query-frontend spec: containers: - args: @@ -25,6 +22,4 @@ spec: ports: - containerPort: 7000 - containerPort: 1337 - resources: {} restartPolicy: Always -status: {} diff --git a/k8s/clipper/query-frontend-service.yaml b/k8s/clipper/query-frontend-service.yaml index 3fbe8eb6a..936253e0b 100644 --- a/k8s/clipper/query-frontend-service.yaml +++ b/k8s/clipper/query-frontend-service.yaml @@ -1,11 +1,9 @@ apiVersion: v1 kind: Service metadata: - annotations: - ai.clipper.container.label: "" - creationTimestamp: null labels: - io.kompose.service: query-frontend + ai.clipper.container.label: "" + ai.clipper.name: query-frontend name: query-frontend spec: type: NodePort @@ -17,6 +15,4 @@ spec: port: 1337 targetPort: 1337 selector: - io.kompose.service: query-frontend -status: - loadBalancer: {} + ai.clipper.name: query-frontend diff --git a/k8s/clipper/redis-deployment.yaml b/k8s/clipper/redis-deployment.yaml index ce4496927..cc7bce16c 100644 --- a/k8s/clipper/redis-deployment.yaml +++ b/k8s/clipper/redis-deployment.yaml @@ -1,20 +1,17 @@ apiVersion: extensions/v1beta1 kind: Deployment metadata: - annotations: - ai.clipper.container.label: "" - creationTimestamp: null labels: - io.kompose.service: redis + ai.clipper.container.label: "" + ai.clipper.name: redis name: redis spec: replicas: 1 - strategy: {} template: metadata: - creationTimestamp: null labels: - io.kompose.service: redis + ai.clipper.container.label: "" + ai.clipper.name: redis spec: containers: - args: @@ -25,6 +22,4 @@ spec: name: redis ports: - containerPort: 6379 - resources: {} restartPolicy: Always -status: {} diff --git a/k8s/clipper/redis-service.yaml b/k8s/clipper/redis-service.yaml index bf12033ad..30941c9cc 100644 --- a/k8s/clipper/redis-service.yaml +++ b/k8s/clipper/redis-service.yaml @@ -1,11 +1,9 @@ apiVersion: v1 kind: Service metadata: - annotations: - ai.clipper.container.label: "" - creationTimestamp: null labels: - io.kompose.service: redis + ai.clipper.container.label: "" + ai.clipper.name: redis name: redis spec: type: NodePort @@ -14,6 +12,4 @@ spec: port: 6379 targetPort: 6379 selector: - io.kompose.service: redis -status: - loadBalancer: {} + ai.clipper.name: redis From 485a1a506fdff0960f0c6af6f2baa0986170b65b Mon Sep 17 00:00:00 2001 From: Feynman Liang Date: Sun, 2 Jul 2017 18:53:57 -0700 Subject: [PATCH 22/24] Clean up unused code --- .gitignore | 2 - clipper_admin/clipper_manager.py | 64 ++------------------------------ 2 files changed, 4 insertions(+), 62 deletions(-) diff --git a/.gitignore b/.gitignore index 744667c90..79ec906e3 100644 --- a/.gitignore +++ b/.gitignore @@ -16,5 +16,3 @@ _build/ # scala dependency-reduced-pom.xml *.iml - -docker-compose.yml diff --git a/clipper_admin/clipper_manager.py b/clipper_admin/clipper_manager.py index c0cdbe203..0ad725c38 100644 --- a/clipper_admin/clipper_manager.py +++ b/clipper_admin/clipper_manager.py @@ -64,13 +64,7 @@ class ClipperManagerException(Exception): class Clipper: """ - Connection to a Clipper instance for administrative purposes. - - Sets up the machine for running Clipper. This includes verifying - SSH credentials and initializing Docker. - - Docker and docker-compose must already by installed on the machine - before connecting to a machine. + Connection to a Clipper instance running on k8s for administrative purposes. Parameters ---------- @@ -79,14 +73,8 @@ class Clipper: should allow passwordless SSH access. user : str, optional The SSH username. This field must be specified if `host` is not local. - key_path : str, optional. - The path to the SSH private key. This field must be specified if `host` is not local. sudo : bool, optional. Specifies level of execution for docker commands (sudo if true, standard if false). - ssh_port : int, optional - The SSH port to use. Default is port 22. - check_for_docker : bool, optional - If True, checks that Docker is running on the host machine. Default is True. restart_containers : bool, optional If true, containers will restart on failure. If false, containers will not restart automatically. @@ -95,11 +83,7 @@ class Clipper: def __init__(self, host, - user=None, - key_path=None, sudo=False, - ssh_port=22, - check_for_docker=True, restart_containers=True): # TODO: support deploying redis host off-cluster by taking redis_ip as constructor param to ClipperK8s logging.basicConfig(level=logging.INFO) @@ -107,23 +91,16 @@ def __init__(self, self.sudo = sudo self.host = host self.host_string = self.host - - def _host_is_local(self): - return True + self.restart_containers = restart_containers # TODO: add to logic def _execute_root(self, *args, **kwargs): if not self.sudo: return self._execute_standard(*args, **kwargs) - elif self._host_is_local(): - return self._execute_local(True, *args, **kwargs) else: - return sudo(*args, **kwargs) + return self._execute_local(True, *args, **kwargs) def _execute_standard(self, *args, **kwargs): - if self._host_is_local(): - return self._execute_local(False, *args, **kwargs) - else: - return run(*args, **kwargs) + return self._execute_local(False, *args, **kwargs) def _execute_local(self, as_root, *args, **kwargs): if self.sudo and as_root: @@ -149,39 +126,6 @@ def _execute_local(self, as_root, *args, **kwargs): result = local(*args, **kwargs) return result - def _execute_append(self, filename, text, **kwargs): - if self._host_is_local(): - file = open(filename, "a+") - # As with fabric.append(), we should only - # append the text if it is not already - # present within the file - if text not in file.read(): - file.write(text) - file.close() - else: - append(filename, text, **kwargs) - - def _execute_put(self, local_path, remote_path, *args, **kwargs): - if self._host_is_local(): - # We should only copy data if the paths are different - if local_path != remote_path: - if os.path.isdir(local_path): - remote_path = os.path.join(remote_path, - os.path.basename(local_path)) - # if remote_path exists, delete it because shutil.copytree requires - # that the dst path doesn't exist - if os.path.exists(remote_path): - shutil.rmtree(remote_path) - shutil.copytree(local_path, remote_path) - else: - shutil.copy2(local_path, remote_path) - else: - put( - local_path=local_path, - remote_path=remote_path, - *args, - **kwargs) - def start(self): """Start a Clipper instance. From 651e285fcb5eea63df84b3d5aee78c561e5d899e Mon Sep 17 00:00:00 2001 From: Feynman Liang Date: Sun, 2 Jul 2017 19:21:21 -0700 Subject: [PATCH 23/24] Remove dead code, working end to end --- clipper_admin/clipper_k8s.py | 4 + clipper_admin/clipper_manager.py | 259 ++++------------ clipper_admin/tests/__init__.py | 0 clipper_admin/tests/clipper_manager_test.py | 308 ++++++++++++++++++++ 4 files changed, 363 insertions(+), 208 deletions(-) create mode 100644 clipper_admin/tests/__init__.py create mode 100644 clipper_admin/tests/clipper_manager_test.py diff --git a/clipper_admin/clipper_k8s.py b/clipper_admin/clipper_k8s.py index dac61eefc..6b4e918bf 100644 --- a/clipper_admin/clipper_k8s.py +++ b/clipper_admin/clipper_k8s.py @@ -99,6 +99,8 @@ def deploy_model(self, name, version, repo): def stop_all_model_deployments(self): """Stops all deployments of pods running Clipper models.""" + # TODO: stopping deploy doesn't stop replicaset, maybe this is a kubernetes python API bug? + # either way, need to manually stop for now logging.info("Stopping all running Clipper model deployments") try: resp = self._k8s_beta.delete_collection_namespaced_deployment( @@ -113,6 +115,8 @@ def stop_clipper_resources(self): WARNING: Data stored on an in-cluster Redis deployment will be lost! This method does not delete any existing in-cluster Docker registry. """ + # TODO: stopping deploy doesn't stop replicaset, maybe this is a kubernetes python API bug? + # either way, need to manually stop for now logging.info("Stopping all running Clipper resources") try: diff --git a/clipper_admin/clipper_manager.py b/clipper_admin/clipper_manager.py index 0ad725c38..cd9876322 100644 --- a/clipper_admin/clipper_manager.py +++ b/clipper_admin/clipper_manager.py @@ -2,8 +2,6 @@ from __future__ import print_function, with_statement, absolute_import import docker -from fabric.api import * -from fabric.contrib.files import append import os import requests import json @@ -35,8 +33,8 @@ REDIS_APPLICATION_DB_NUM = 5 DEFAULT_REDIS_PORT = 6379 -CLIPPER_QUERY_PORT = 1337 -CLIPPER_MANAGEMENT_PORT = 1338 +CLIPPER_QUERY_PORT = 32595 +CLIPPER_MANAGEMENT_PORT = 31725 CLIPPER_RPC_PORT = 7000 CLIPPER_LOGS_PATH = "/tmp/clipper-logs" @@ -93,39 +91,6 @@ def __init__(self, self.host_string = self.host self.restart_containers = restart_containers # TODO: add to logic - def _execute_root(self, *args, **kwargs): - if not self.sudo: - return self._execute_standard(*args, **kwargs) - else: - return self._execute_local(True, *args, **kwargs) - - def _execute_standard(self, *args, **kwargs): - return self._execute_local(False, *args, **kwargs) - - def _execute_local(self, as_root, *args, **kwargs): - if self.sudo and as_root: - root_args = list(args) - root_args[0] = "sudo %s" % root_args[0] - args = tuple(root_args) - # local is not currently capable of simultaneously logging.infoing and - # capturing output, as run/sudo do. The capture kwarg allows you to - # switch between logging.infoing and capturing as necessary, and defaults to - # False. In this case, we need to capture the output and return it. - if "capture" not in kwargs: - kwargs["capture"] = True - # fabric.local() does not accept the "warn_only" - # key word argument, so we must remove it before - # calling - if "warn_only" in kwargs: - del kwargs["warn_only"] - # Forces execution to continue in the face of an error, - # just like warn_only=True - with warn_only(): - result = local(*args, **kwargs) - else: - result = local(*args, **kwargs) - return result - def start(self): """Start a Clipper instance. @@ -266,61 +231,53 @@ def deploy_model(self, The number of replicas of the model to create. More replicas can be created later as well. Defaults to 1. """ - with hide("warnings", "output", "running"): - if isinstance(model_data, base.BaseEstimator): - fname = name.replace("/", "_") - model_data_path = "/tmp/%s" % fname - pkl_path = '%s/%s.pkl' % (model_data_path, fname) - try: - os.mkdir(model_data_path) - except OSError: - pass - joblib.dump(model_data, pkl_path) - elif isinstance(model_data, str): - # assume that model_data is a path to the serialized model - model_data_path = model_data - else: - warn("%s is invalid model format" % str(type(model_data))) - return False + if isinstance(model_data, base.BaseEstimator): + fname = name.replace("/", "_") + model_data_path = "/tmp/%s" % fname + pkl_path = '%s/%s.pkl' % (model_data_path, fname) + try: + os.mkdir(model_data_path) + except OSError: + pass + joblib.dump(model_data, pkl_path) + elif isinstance(model_data, str): + # assume that model_data is a path to the serialized model + model_data_path = model_data + else: + warn("%s is invalid model format" % str(type(model_data))) + return False - vol = "{model_repo}/{name}/{version}".format( - model_repo=MODEL_REPO, name=name, version=version) - - # prepare docker image build dir - with open(model_data_path + '/Dockerfile', 'w') as f: - f.write("FROM {container_name}\nCOPY . /model/.\n".format(container_name=container_name)) - - # build, tag, and push docker image to registry - # NOTE: DOCKER_API_VERSION (set by `minikube docker-env`) must be same version as docker registry server - repo = '{docker_registry}/{name}:{version}'.format( - docker_registry='localhost:5000', # TODO: make configurable - name=name, - version=version) - docker_client = docker.from_env(version=os.environ["DOCKER_API_VERSION"]) - logging.info("Building model Docker image at {}".format(model_data_path)) - docker_client.images.build( - path=model_data_path, - tag=repo) - logging.info("Pushing model Docker image to {}".format(repo)) - docker_client.images.push(repository=repo) - - # TODO: call this in `add_container` once `repo` is available from redis - logging.info("Creating model deployment on k8s") - self.clipper_k8s.deploy_model(name, version, repo) - - # TODO: replace `model_data_path` in `_publish_new_model` with the docker repo - # publish model to Clipper and verify success before copying model - # parameters to Clipper and starting containers - logging.info("Publishing model to Clipper query manager") - self._publish_new_model(name, version, labels, input_type, container_name, repo) - - logging.info("Done deploying!") - - # aggregate results of starting all containers - # return all([ - # self.add_container(name, version) - # for r in range(num_containers) - # ]) + vol = "{model_repo}/{name}/{version}".format( + model_repo=MODEL_REPO, name=name, version=version) + + # prepare docker image build dir + with open(model_data_path + '/Dockerfile', 'w') as f: + f.write("FROM {container_name}\nCOPY . /model/.\n".format(container_name=container_name)) + + # build, tag, and push docker image to registry + # NOTE: DOCKER_API_VERSION (set by `minikube docker-env`) must be same version as docker registry server + repo = '{docker_registry}/{name}:{version}'.format( + docker_registry='localhost:5000', # TODO: make configurable + name=name, + version=version) + docker_client = docker.from_env(version=os.environ["DOCKER_API_VERSION"]) + logging.info("Building model Docker image at {}".format(model_data_path)) + docker_client.images.build( + path=model_data_path, + tag=repo) + logging.info("Pushing model Docker image to {}".format(repo)) + docker_client.images.push(repository=repo) + + logging.info("Creating model deployment on k8s") + self.clipper_k8s.deploy_model(name, version, repo) + + # TODO: replace `model_data_path` in `_publish_new_model` with the docker repo + # publish model to Clipper and verify success before copying model + # parameters to Clipper and starting containers + logging.info("Publishing model to Clipper query manager") + self._publish_new_model(name, version, labels, input_type, container_name, repo) + + logging.info("Done deploying!") def register_external_model(self, name, @@ -783,79 +740,6 @@ def _check_and_write_dependencies(self, environment_path, directory, logging.info(err) return process.returncode == 0 - def add_container(self, model_name, model_version): - """Create a new container for an existing model. - - Starts a new container for a model that has already been added to - Clipper. Note that models are uniquely identified by both name - and version, so this method will fail if you have not already called - `Clipper.deploy_model()` for the specified name and version. - - Parameters - ---------- - model_name : str - The name of the model - model_version : int - The version of the model - - Returns - ---------- - bool - True if the container was added successfully and False - if the container could not be added. - """ - # TODO: this must abstract containers deployed on k8s vs those running on local docker, see ContainerManager - with hide("warnings", "output", "running"): - # Look up model info in Redis - model_key = "{mn}:{mv}".format(mn=model_name, mv=model_version) - result = local( - "redis-cli -h {host} -p {redis_port} -n {db} hgetall {key}". - format( - host=self.host, - redis_port=DEFAULT_REDIS_PORT, - key=model_key, - db=REDIS_MODEL_DB_NUM), - capture=True) - logging.info(result) - - if "empty list or set" in result.stdout: - # Model not found - warn("Trying to add container but model {mn}:{mv} not in " - "Redis".format(mn=model_name, mv=model_version)) - return False - - splits = result.stdout.split("\n") - model_metadata = dict([(splits[i].strip(), splits[i + 1].strip()) - for i in range(0, len(splits), 2)]) - image_name = model_metadata["container_name"] - model_data_path = model_metadata["model_data_path"] - model_input_type = model_metadata["input_type"] - restart_policy = 'always' if self.restart_containers else 'no' - - if image_name != EXTERNALLY_MANAGED_MODEL: - # Start container - add_container_cmd = ( - "docker run -d --network={nw} --restart={restart_policy} -v {path}:/model:ro " - "-e \"CLIPPER_MODEL_NAME={mn}\" -e \"CLIPPER_MODEL_VERSION={mv}\" " - "-e \"CLIPPER_IP=query_frontend\" -e \"CLIPPER_INPUT_TYPE={mip}\" -l \"{clipper_label}\" -l \"{mv_label}\" " - "{image}".format( - path=model_data_path, - nw=DOCKER_NW, - image=image_name, - mn=model_name, - mv=model_version, - mip=model_input_type, - clipper_label=CLIPPER_DOCKER_LABEL, - mv_label="%s=%s:%d" % (CLIPPER_MODEL_CONTAINER_LABEL, - model_name, model_version), - restart_policy=restart_policy)) - result = self._execute_root(add_container_cmd) - return result.return_code == 0 - else: - logging.info("Cannot start containers for externally managed model %s" - % model_name) - return False - def get_clipper_logs(self): """Copies the logs from all Docker containers running on the host machine that have been tagged with the Clipper label (ai.clipper.container.label) into @@ -866,33 +750,14 @@ def get_clipper_logs(self): list(str) Returns a list of local filenames containing the Docker container log snapshots. """ - container_ids = self._get_clipper_container_ids() cur_time_logs_path = os.path.join(CLIPPER_LOGS_PATH, time.strftime("%Y%m%d-%H%M%S")) if not os.path.exists(cur_time_logs_path): os.makedirs(cur_time_logs_path) log_file_names = [] - for container in container_ids: - output = self._execute_root( - "docker logs {container}".format(container=container)) - cur_log_fname = os.path.join(cur_time_logs_path, - "%s-container.log" % container) - with open(cur_log_fname, "w") as f: - f.write(output) - log_file_names.append(cur_log_fname) + # TODO: implement, or update docs to point to kubectl logs return log_file_names - def _get_clipper_container_ids(self): - """ - Gets the container IDs of all containers labeled with the clipper label - """ - containers = self._execute_root( - "docker ps -aq --filter label={clipper_label}".format( - clipper_label=CLIPPER_DOCKER_LABEL)) - ids = [l.strip() for l in containers.split("\n")] - logging.info("Clipper container IDS found: %s" % str(ids)) - return ids - def inspect_instance(self): """Fetches metrics from the running Clipper instance. @@ -931,7 +796,6 @@ def set_model_version(self, model_name, model_version, num_containers=0): selected model version. """ - # TODO: update to use k8s API url = "http://%s:%d/admin/set_model_version" % ( self.host, CLIPPER_MANAGEMENT_PORT) req_json = json.dumps({ @@ -941,8 +805,7 @@ def set_model_version(self, model_name, model_version, num_containers=0): headers = {'Content-type': 'application/json'} r = requests.post(url, headers=headers, data=req_json) logging.info(r.text) - for r in range(num_containers): - self.add_container(model_name, model_version) + # TODO: use k8s API to udpate model_version def remove_inactive_containers(self, model_name): """Removes all containers serving stale versions of the specified model. @@ -955,27 +818,7 @@ def remove_inactive_containers(self, model_name): """ # Get all Docker containers tagged as model containers num_containers_removed = 0 - with hide("output", "warnings", "running"): - container_ids = self._get_clipper_container_ids() - if len(container_ids) > 0: - for container in container_ids: - # returns a string formatted as ":" - container_model_name_and_version = self._execute_root( - "docker inspect --format \"{{ index .Config.Labels \\\"%s\\\"}}\" %s" - % (CLIPPER_MODEL_CONTAINER_LABEL, container)) - splits = container_model_name_and_version.split(":") - container_model_name = splits[0] - container_model_version = int(splits[1]) - if container_model_name == model_name: - # check if container_model_version is the currently deployed version - model_info = self.get_model_info( - container_model_name, container_model_version) - if model_info == None or not model_info["is_current_version"]: - self._execute_root("docker stop {container}". - format(container=container)) - self._execute_root("docker rm {container}".format( - container=container)) - num_containers_removed += 1 + # TODO: implement using self.get_model_info to check model_version logging.info("Removed %d inactive containers for model %s" % (num_containers_removed, model_name)) return num_containers_removed diff --git a/clipper_admin/tests/__init__.py b/clipper_admin/tests/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/clipper_admin/tests/clipper_manager_test.py b/clipper_admin/tests/clipper_manager_test.py new file mode 100644 index 000000000..da0cb4493 --- /dev/null +++ b/clipper_admin/tests/clipper_manager_test.py @@ -0,0 +1,308 @@ +import unittest +import sys +import os +import json +import time +import requests +from sklearn import svm +from argparse import ArgumentParser +cur_dir = os.path.dirname(os.path.abspath(__file__)) +sys.path.insert(0, os.path.abspath('%s/../' % cur_dir)) +import clipper_manager +import random +import socket +""" +Executes a test suite consisting of two separate cases: short tests and long tests. +Before each case, an instance of clipper_manager.Clipper is created. Tests +are then performed by invoking methods on this instance, often resulting +in the execution of docker commands. +""" + +# range of ports where available ports can be found +PORT_RANGE = [34256, 40000] + + +def find_unbound_port(): + """ + Returns an unbound port number on 127.0.0.1. + """ + while True: + port = random.randint(*PORT_RANGE) + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + try: + sock.bind(("127.0.0.1", port)) + return port + except socket.error: + print("randomly generated port %d is bound. Trying again." % port) + + +class ClipperManagerTestCaseShort(unittest.TestCase): + @classmethod + def setUpClass(self): + self.clipper_inst = clipper_manager.Clipper( + "localhost", redis_port=find_unbound_port()) + self.clipper_inst.start() + self.app_name = "app1" + self.model_name = "m1" + self.model_version_1 = 1 + self.model_version_2 = 2 + self.deploy_model_name = "m3" + self.deploy_model_version = 1 + + @classmethod + def tearDownClass(self): + self.clipper_inst.stop_all() + + def test_external_models_register_correctly(self): + name = "m1" + version1 = 1 + tags = ["test"] + input_type = "doubles" + result = self.clipper_inst.register_external_model( + self.model_name, self.model_version_1, tags, input_type) + self.assertTrue(result) + registered_model_info = self.clipper_inst.get_model_info( + self.model_name, self.model_version_1) + self.assertIsNotNone(registered_model_info) + + version2 = 2 + result = self.clipper_inst.register_external_model( + self.model_name, self.model_version_2, tags, input_type) + self.assertTrue(result) + registered_model_info = self.clipper_inst.get_model_info( + self.model_name, self.model_version_2) + self.assertIsNotNone(registered_model_info) + + def test_application_registers_correctly(self): + input_type = "doubles" + default_output = "DEFAULT" + slo_micros = 30000 + self.clipper_inst.register_application(self.app_name, self.model_name, + input_type, default_output, + slo_micros) + registered_applications = self.clipper_inst.get_all_apps() + self.assertGreaterEqual(len(registered_applications), 1) + self.assertTrue(self.app_name in registered_applications) + + def get_app_info_for_registered_app_returns_info_dictionary(self): + result = self.clipper_inst.get_app_info(self.app_name) + self.assertIsNotNone(result) + self.assertEqual(type(result), dict) + + def get_app_info_for_nonexistent_app_returns_none(self): + result = self.clipper_inst.get_app_info("fake_app") + self.assertIsNone(result) + + def test_add_container_for_external_model_fails(self): + result = self.clipper_inst.add_container(self.model_name, + self.model_version_1) + self.assertFalse(result) + + def test_model_version_sets_correctly(self): + self.clipper_inst.set_model_version(self.model_name, + self.model_version_1) + all_models = self.clipper_inst.get_all_models(verbose=True) + models_list_contains_correct_version = False + for model_info in all_models: + version = model_info["model_version"] + if version == self.model_version_1: + models_list_contains_correct_version = True + self.assertTrue(model_info["is_current_version"]) + + self.assertTrue(models_list_contains_correct_version) + + def test_get_logs_creates_log_files(self): + log_file_names = self.clipper_inst.get_clipper_logs() + self.assertIsNotNone(log_file_names) + self.assertGreaterEqual(len(log_file_names), 1) + for file_name in log_file_names: + self.assertTrue(os.path.isfile(file_name)) + + def test_inspect_instance_returns_json_dict(self): + metrics = self.clipper_inst.inspect_instance() + self.assertEqual(type(metrics), dict) + self.assertGreaterEqual(len(metrics), 1) + + def test_model_deploys_successfully(self): + # Initialize a support vector classifier + # that will be deployed to a no-op container + model_data = svm.SVC() + container_name = "clipper/noop-container" + labels = ["test"] + input_type = "doubles" + result = self.clipper_inst.deploy_model( + self.deploy_model_name, self.deploy_model_version, model_data, + container_name, labels, input_type) + self.assertTrue(result) + model_info = self.clipper_inst.get_model_info( + self.deploy_model_name, self.deploy_model_version) + self.assertIsNotNone(model_info) + running_containers_output = self.clipper_inst._execute_standard( + "docker ps -q --filter \"ancestor=clipper/noop-container\"") + self.assertIsNotNone(running_containers_output) + self.assertGreaterEqual(len(running_containers_output), 1) + + def test_add_container_for_deployed_model_succeeds(self): + result = self.clipper_inst.add_container(self.deploy_model_name, + self.deploy_model_version) + self.assertTrue(result) + running_containers_output = self.clipper_inst._execute_standard( + "docker ps -q --filter \"ancestor=clipper/noop-container\"") + self.assertIsNotNone(running_containers_output) + split_output = running_containers_output.split("\n") + self.assertGreaterEqual(len(split_output), 2) + + def test_predict_function_deploys_successfully(self): + model_name = "m2" + model_version = 1 + predict_func = lambda inputs: ["0" for x in inputs] + labels = ["test"] + input_type = "doubles" + result = self.clipper_inst.deploy_predict_function( + model_name, model_version, predict_func, labels, input_type) + self.assertTrue(result) + model_info = self.clipper_inst.get_model_info(model_name, + model_version) + self.assertIsNotNone(model_info) + running_containers_output = self.clipper_inst._execute_standard( + "docker ps -q --filter \"ancestor=clipper/python-container\"") + self.assertIsNotNone(running_containers_output) + self.assertGreaterEqual(len(running_containers_output), 1) + + +class ClipperManagerTestCaseLong(unittest.TestCase): + @classmethod + def setUpClass(self): + self.clipper_inst = clipper_manager.Clipper( + "localhost", redis_port=find_unbound_port()) + self.clipper_inst.start() + self.app_name_1 = "app3" + self.app_name_2 = "app4" + self.model_name_1 = "m4" + self.model_name_2 = "m5" + self.input_type = "doubles" + self.default_output = "DEFAULT" + self.latency_slo_micros = 30000 + self.clipper_inst.register_application( + self.app_name_1, self.model_name_1, self.input_type, + self.default_output, self.latency_slo_micros) + self.clipper_inst.register_application( + self.app_name_2, self.model_name_2, self.input_type, + self.default_output, self.latency_slo_micros) + + @classmethod + def tearDownClass(self): + self.clipper_inst.stop_all() + + def test_deployed_model_queried_successfully(self): + model_version = 1 + # Initialize a support vector classifier + # that will be deployed to a no-op container + model_data = svm.SVC() + container_name = "clipper/noop-container" + labels = ["test"] + result = self.clipper_inst.deploy_model( + self.model_name_2, model_version, model_data, container_name, + labels, self.input_type) + self.assertTrue(result) + + time.sleep(30) + + url = "http://localhost:1337/{}/predict".format(self.app_name_2) + test_input = [99.3, 18.9, 67.2, 34.2] + req_json = json.dumps({'uid': 0, 'input': test_input}) + headers = {'Content-type': 'application/json'} + response = requests.post(url, headers=headers, data=req_json) + parsed_response = json.loads(response.text) + self.assertNotEqual(parsed_response["output"], self.default_output) + self.assertFalse(parsed_response["default"]) + + def test_deployed_predict_function_queried_successfully(self): + model_version = 1 + predict_func = lambda inputs: [str(len(x)) for x in inputs] + labels = ["test"] + input_type = "doubles" + result = self.clipper_inst.deploy_predict_function( + self.model_name_1, model_version, predict_func, labels, input_type) + self.assertTrue(result) + + time.sleep(60) + + received_non_default_prediction = False + url = "http://localhost:1337/{}/predict".format(self.app_name_1) + test_input = [101.1, 99.5, 107.2] + req_json = json.dumps({'uid': 0, 'input': test_input}) + headers = {'Content-type': 'application/json'} + for i in range(0, 40): + response = requests.post(url, headers=headers, data=req_json) + parsed_response = json.loads(response.text) + output = parsed_response["output"] + if output == self.default_output: + time.sleep(20) + else: + received_non_default_prediction = True + self.assertEqual(int(output), len(test_input)) + break + + self.assertTrue(received_non_default_prediction) + + +SHORT_TEST_ORDERING = [ + 'test_external_models_register_correctly', + 'test_application_registers_correctly', + 'get_app_info_for_registered_app_returns_info_dictionary', + 'get_app_info_for_nonexistent_app_returns_none', + 'test_add_container_for_external_model_fails', + 'test_model_version_sets_correctly', + 'test_get_logs_creates_log_files', + 'test_inspect_instance_returns_json_dict', + 'test_model_deploys_successfully', + 'test_add_container_for_deployed_model_succeeds', + # 'test_predict_function_deploys_successfully' +] + +LONG_TEST_ORDERING = [ + 'test_deployed_model_queried_successfully', + # 'test_deployed_predict_function_queried_successfully' +] + +if __name__ == '__main__': + description = "Runs clipper manager tests. If no arguments are specified, all tests are executed." + parser = ArgumentParser(description) + parser.add_argument( + "-s", + "--short", + action="store_true", + dest="run_short", + help="Run the short suite of test cases") + parser.add_argument( + "-l", + "--long", + action="store_true", + dest="run_long", + help="Run the long suite of test cases") + parser.add_argument( + "-a", + "--all", + action="store_true", + dest="run_all", + help="Run all test cases") + args = parser.parse_args() + + # If neither the short nor the long argument is specified, + # we will run all tests + args.run_all = args.run_all or ((not args.run_short) and + (not args.run_long)) + + suite = unittest.TestSuite() + + if args.run_short or args.run_all: + for test in SHORT_TEST_ORDERING: + suite.addTest(ClipperManagerTestCaseShort(test)) + + if args.run_long or args.run_all: + for test in LONG_TEST_ORDERING: + suite.addTest(ClipperManagerTestCaseLong(test)) + + result = unittest.TextTestRunner(verbosity=2).run(suite) + sys.exit(not result.wasSuccessful()) From 1a52904691b4511764f0dfc8222963bf6b945e92 Mon Sep 17 00:00:00 2001 From: Feynman Liang Date: Sun, 2 Jul 2017 19:37:59 -0700 Subject: [PATCH 24/24] Remove committed test --- clipper_admin/tests/clipper_manager_test.py | 308 -------------------- 1 file changed, 308 deletions(-) delete mode 100644 clipper_admin/tests/clipper_manager_test.py diff --git a/clipper_admin/tests/clipper_manager_test.py b/clipper_admin/tests/clipper_manager_test.py deleted file mode 100644 index da0cb4493..000000000 --- a/clipper_admin/tests/clipper_manager_test.py +++ /dev/null @@ -1,308 +0,0 @@ -import unittest -import sys -import os -import json -import time -import requests -from sklearn import svm -from argparse import ArgumentParser -cur_dir = os.path.dirname(os.path.abspath(__file__)) -sys.path.insert(0, os.path.abspath('%s/../' % cur_dir)) -import clipper_manager -import random -import socket -""" -Executes a test suite consisting of two separate cases: short tests and long tests. -Before each case, an instance of clipper_manager.Clipper is created. Tests -are then performed by invoking methods on this instance, often resulting -in the execution of docker commands. -""" - -# range of ports where available ports can be found -PORT_RANGE = [34256, 40000] - - -def find_unbound_port(): - """ - Returns an unbound port number on 127.0.0.1. - """ - while True: - port = random.randint(*PORT_RANGE) - sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - try: - sock.bind(("127.0.0.1", port)) - return port - except socket.error: - print("randomly generated port %d is bound. Trying again." % port) - - -class ClipperManagerTestCaseShort(unittest.TestCase): - @classmethod - def setUpClass(self): - self.clipper_inst = clipper_manager.Clipper( - "localhost", redis_port=find_unbound_port()) - self.clipper_inst.start() - self.app_name = "app1" - self.model_name = "m1" - self.model_version_1 = 1 - self.model_version_2 = 2 - self.deploy_model_name = "m3" - self.deploy_model_version = 1 - - @classmethod - def tearDownClass(self): - self.clipper_inst.stop_all() - - def test_external_models_register_correctly(self): - name = "m1" - version1 = 1 - tags = ["test"] - input_type = "doubles" - result = self.clipper_inst.register_external_model( - self.model_name, self.model_version_1, tags, input_type) - self.assertTrue(result) - registered_model_info = self.clipper_inst.get_model_info( - self.model_name, self.model_version_1) - self.assertIsNotNone(registered_model_info) - - version2 = 2 - result = self.clipper_inst.register_external_model( - self.model_name, self.model_version_2, tags, input_type) - self.assertTrue(result) - registered_model_info = self.clipper_inst.get_model_info( - self.model_name, self.model_version_2) - self.assertIsNotNone(registered_model_info) - - def test_application_registers_correctly(self): - input_type = "doubles" - default_output = "DEFAULT" - slo_micros = 30000 - self.clipper_inst.register_application(self.app_name, self.model_name, - input_type, default_output, - slo_micros) - registered_applications = self.clipper_inst.get_all_apps() - self.assertGreaterEqual(len(registered_applications), 1) - self.assertTrue(self.app_name in registered_applications) - - def get_app_info_for_registered_app_returns_info_dictionary(self): - result = self.clipper_inst.get_app_info(self.app_name) - self.assertIsNotNone(result) - self.assertEqual(type(result), dict) - - def get_app_info_for_nonexistent_app_returns_none(self): - result = self.clipper_inst.get_app_info("fake_app") - self.assertIsNone(result) - - def test_add_container_for_external_model_fails(self): - result = self.clipper_inst.add_container(self.model_name, - self.model_version_1) - self.assertFalse(result) - - def test_model_version_sets_correctly(self): - self.clipper_inst.set_model_version(self.model_name, - self.model_version_1) - all_models = self.clipper_inst.get_all_models(verbose=True) - models_list_contains_correct_version = False - for model_info in all_models: - version = model_info["model_version"] - if version == self.model_version_1: - models_list_contains_correct_version = True - self.assertTrue(model_info["is_current_version"]) - - self.assertTrue(models_list_contains_correct_version) - - def test_get_logs_creates_log_files(self): - log_file_names = self.clipper_inst.get_clipper_logs() - self.assertIsNotNone(log_file_names) - self.assertGreaterEqual(len(log_file_names), 1) - for file_name in log_file_names: - self.assertTrue(os.path.isfile(file_name)) - - def test_inspect_instance_returns_json_dict(self): - metrics = self.clipper_inst.inspect_instance() - self.assertEqual(type(metrics), dict) - self.assertGreaterEqual(len(metrics), 1) - - def test_model_deploys_successfully(self): - # Initialize a support vector classifier - # that will be deployed to a no-op container - model_data = svm.SVC() - container_name = "clipper/noop-container" - labels = ["test"] - input_type = "doubles" - result = self.clipper_inst.deploy_model( - self.deploy_model_name, self.deploy_model_version, model_data, - container_name, labels, input_type) - self.assertTrue(result) - model_info = self.clipper_inst.get_model_info( - self.deploy_model_name, self.deploy_model_version) - self.assertIsNotNone(model_info) - running_containers_output = self.clipper_inst._execute_standard( - "docker ps -q --filter \"ancestor=clipper/noop-container\"") - self.assertIsNotNone(running_containers_output) - self.assertGreaterEqual(len(running_containers_output), 1) - - def test_add_container_for_deployed_model_succeeds(self): - result = self.clipper_inst.add_container(self.deploy_model_name, - self.deploy_model_version) - self.assertTrue(result) - running_containers_output = self.clipper_inst._execute_standard( - "docker ps -q --filter \"ancestor=clipper/noop-container\"") - self.assertIsNotNone(running_containers_output) - split_output = running_containers_output.split("\n") - self.assertGreaterEqual(len(split_output), 2) - - def test_predict_function_deploys_successfully(self): - model_name = "m2" - model_version = 1 - predict_func = lambda inputs: ["0" for x in inputs] - labels = ["test"] - input_type = "doubles" - result = self.clipper_inst.deploy_predict_function( - model_name, model_version, predict_func, labels, input_type) - self.assertTrue(result) - model_info = self.clipper_inst.get_model_info(model_name, - model_version) - self.assertIsNotNone(model_info) - running_containers_output = self.clipper_inst._execute_standard( - "docker ps -q --filter \"ancestor=clipper/python-container\"") - self.assertIsNotNone(running_containers_output) - self.assertGreaterEqual(len(running_containers_output), 1) - - -class ClipperManagerTestCaseLong(unittest.TestCase): - @classmethod - def setUpClass(self): - self.clipper_inst = clipper_manager.Clipper( - "localhost", redis_port=find_unbound_port()) - self.clipper_inst.start() - self.app_name_1 = "app3" - self.app_name_2 = "app4" - self.model_name_1 = "m4" - self.model_name_2 = "m5" - self.input_type = "doubles" - self.default_output = "DEFAULT" - self.latency_slo_micros = 30000 - self.clipper_inst.register_application( - self.app_name_1, self.model_name_1, self.input_type, - self.default_output, self.latency_slo_micros) - self.clipper_inst.register_application( - self.app_name_2, self.model_name_2, self.input_type, - self.default_output, self.latency_slo_micros) - - @classmethod - def tearDownClass(self): - self.clipper_inst.stop_all() - - def test_deployed_model_queried_successfully(self): - model_version = 1 - # Initialize a support vector classifier - # that will be deployed to a no-op container - model_data = svm.SVC() - container_name = "clipper/noop-container" - labels = ["test"] - result = self.clipper_inst.deploy_model( - self.model_name_2, model_version, model_data, container_name, - labels, self.input_type) - self.assertTrue(result) - - time.sleep(30) - - url = "http://localhost:1337/{}/predict".format(self.app_name_2) - test_input = [99.3, 18.9, 67.2, 34.2] - req_json = json.dumps({'uid': 0, 'input': test_input}) - headers = {'Content-type': 'application/json'} - response = requests.post(url, headers=headers, data=req_json) - parsed_response = json.loads(response.text) - self.assertNotEqual(parsed_response["output"], self.default_output) - self.assertFalse(parsed_response["default"]) - - def test_deployed_predict_function_queried_successfully(self): - model_version = 1 - predict_func = lambda inputs: [str(len(x)) for x in inputs] - labels = ["test"] - input_type = "doubles" - result = self.clipper_inst.deploy_predict_function( - self.model_name_1, model_version, predict_func, labels, input_type) - self.assertTrue(result) - - time.sleep(60) - - received_non_default_prediction = False - url = "http://localhost:1337/{}/predict".format(self.app_name_1) - test_input = [101.1, 99.5, 107.2] - req_json = json.dumps({'uid': 0, 'input': test_input}) - headers = {'Content-type': 'application/json'} - for i in range(0, 40): - response = requests.post(url, headers=headers, data=req_json) - parsed_response = json.loads(response.text) - output = parsed_response["output"] - if output == self.default_output: - time.sleep(20) - else: - received_non_default_prediction = True - self.assertEqual(int(output), len(test_input)) - break - - self.assertTrue(received_non_default_prediction) - - -SHORT_TEST_ORDERING = [ - 'test_external_models_register_correctly', - 'test_application_registers_correctly', - 'get_app_info_for_registered_app_returns_info_dictionary', - 'get_app_info_for_nonexistent_app_returns_none', - 'test_add_container_for_external_model_fails', - 'test_model_version_sets_correctly', - 'test_get_logs_creates_log_files', - 'test_inspect_instance_returns_json_dict', - 'test_model_deploys_successfully', - 'test_add_container_for_deployed_model_succeeds', - # 'test_predict_function_deploys_successfully' -] - -LONG_TEST_ORDERING = [ - 'test_deployed_model_queried_successfully', - # 'test_deployed_predict_function_queried_successfully' -] - -if __name__ == '__main__': - description = "Runs clipper manager tests. If no arguments are specified, all tests are executed." - parser = ArgumentParser(description) - parser.add_argument( - "-s", - "--short", - action="store_true", - dest="run_short", - help="Run the short suite of test cases") - parser.add_argument( - "-l", - "--long", - action="store_true", - dest="run_long", - help="Run the long suite of test cases") - parser.add_argument( - "-a", - "--all", - action="store_true", - dest="run_all", - help="Run all test cases") - args = parser.parse_args() - - # If neither the short nor the long argument is specified, - # we will run all tests - args.run_all = args.run_all or ((not args.run_short) and - (not args.run_long)) - - suite = unittest.TestSuite() - - if args.run_short or args.run_all: - for test in SHORT_TEST_ORDERING: - suite.addTest(ClipperManagerTestCaseShort(test)) - - if args.run_long or args.run_all: - for test in LONG_TEST_ORDERING: - suite.addTest(ClipperManagerTestCaseLong(test)) - - result = unittest.TextTestRunner(verbosity=2).run(suite) - sys.exit(not result.wasSuccessful())