diff --git a/RPythonDockerfile b/RPythonDockerfile new file mode 100644 index 000000000..09291a755 --- /dev/null +++ b/RPythonDockerfile @@ -0,0 +1,39 @@ +FROM clipper/py-rpc:latest + +## Use Debian unstable via pinning -- new style via APT::Default-Release +RUN echo "deb http://http.debian.net/debian sid main" > /etc/apt/sources.list.d/debian-unstable.list \ + && echo 'APT::Default-Release "testing";' > /etc/apt/apt.conf.d/default + +ENV R_BASE_VERSION 3.4.0 + +## Now install R and littler, and create a link for littler in /usr/local/bin +## Also set a default CRAN repo, and make sure littler knows about it too +RUN apt-get update \ + && apt-get install -t unstable -y --no-install-recommends \ + littler \ + r-cran-littler \ + r-base=${R_BASE_VERSION}* \ + r-base-dev=${R_BASE_VERSION}* \ + r-recommended=${R_BASE_VERSION}* \ + && echo 'options(repos = c(CRAN = "https://cran.rstudio.com/"), download.file.method = "libcurl")' >> /etc/R/Rprofile.site \ + && echo 'source("/etc/R/Rprofile.site")' >> /etc/littler.r \ + && ln -s /usr/share/doc/littler/examples/install.r /usr/local/bin/install.r \ + && ln -s /usr/share/doc/littler/examples/install2.r /usr/local/bin/install2.r \ + && ln -s /usr/share/doc/littler/examples/installGithub.r /usr/local/bin/installGithub.r \ + && ln -s /usr/share/doc/littler/examples/testInstalled.r /usr/local/bin/testInstalled.r \ + && install.r docopt \ + && rm -rf /tmp/downloaded_packages/ /tmp/*.rds \ + && rm -rf /var/lib/apt/lists/* + + + + +RUN conda install rpy2 + +COPY containers/R/r_python_container.py /container/ + +CMD ["python", "/container/r_python_container.py"] + + + +# vim: set filetype=dockerfile: diff --git a/bin/build_docker_images.sh b/bin/build_docker_images.sh index cd4ba0093..786b35464 100755 --- a/bin/build_docker_images.sh +++ b/bin/build_docker_images.sh @@ -33,4 +33,5 @@ time docker build -t clipper/python-container -f ./PythonContainerDockerfile ./ time docker build -t clipper/pyspark-container -f 
./PySparkContainerDockerfile ./ time docker build -t clipper/sklearn_cifar_container -f ./SklearnCifarDockerfile ./ time docker build -t clipper/tf_cifar_container -f ./TensorFlowCifarDockerfile ./ +time docker build -t clipper/r_python_container -f ./RPythonDockerfile ./ cd - diff --git a/clipper_admin/clipper_manager.py b/clipper_admin/clipper_manager.py index 8c1c9ee1f..d4cfc2386 100644 --- a/clipper_admin/clipper_manager.py +++ b/clipper_admin/clipper_manager.py @@ -1267,3 +1267,70 @@ def _put_container_on_host(self, container_name): warn("Could not find %s, please try with a valid " "container docker image") return False + + def deploy_R_model(self, + name, + version, + model_data, + labels=DEFAULT_LABEL, + num_containers=1): + """Registers a model with Clipper and deploys instances of it in containers. + Parameters + ---------- + name : str + The name to assign this model. + version : int + The version to assign this model. + model_data : + The trained model to add to Clipper.The type has to be rpy2.robjects.vectors.ListVector, + this is how python's rpy2 encapsulates any given R model.This model will be loaded + into the Clipper model container and provided as an argument to the + predict function each time it is called. + labels : list of str, optional + A set of strings annotating the model + num_containers : int, optional + The number of replicas of the model to create. More replicas can be + created later as well. Defaults to 1. 
+ """ + + # importing some R specific dependencies + import rpy2.robjects as ro + from rpy2.robjects.packages import importr + base = importr('base') + + input_type = "strings" + container_name = "clipper/r_python_container" + + with hide("warnings", "output", "running"): + fname = name.replace("/", "_") + rds_path = '/tmp/%s/%s.rds' % (fname, fname) + model_data_path = "/tmp/%s" % fname + try: + os.mkdir(model_data_path) + except OSError: + pass + base.saveRDS(model_data, rds_path) + + vol = "{model_repo}/{name}/{version}".format( + model_repo=MODEL_REPO, name=name, version=version) + # publish model to Clipper and verify success before copying model + # parameters to Clipper and starting containers + if not self._publish_new_model( + name, version, labels, input_type, container_name, + os.path.join(vol, os.path.basename(model_data_path))): + return False + print("Published model to Clipper") + + # Put model parameter data on host + with hide("warnings", "output", "running"): + self._execute_standard("mkdir -p {vol}".format(vol=vol)) + + with hide("output", "running"): + self._execute_put(model_data_path, vol) + + print("Copied model data to host") + # aggregate results of starting all containers + return all([ + self.add_container(name, version) + for r in range(num_containers) + ]) diff --git a/containers/R/README.md b/containers/R/README.md new file mode 100644 index 000000000..0bf3486a0 --- /dev/null +++ b/containers/R/README.md @@ -0,0 +1,59 @@ +# RPython model container +This container enables the deployment of R models. It supports both the R and Python runtimes. R functions are called from Python using the RPy2 interface, while the container is built upon the [Python RPC client](https://github.com/ucbrise/clipper/blob/develop/containers/python/rpc.py). + +## Prerequisites : +In addition to the requirements of running clipper, + +1. R must be installed (version:latest , >=3.4) +2. Python version must be =2.7. +3. RPy2 must be installed. 
For more info, see the RPy2 documentation (https://rpy2.github.io/)
+ +This process is illustrated in the `predict_R_model()` method of +[R Model Integration Test](../../integration-tests/deploy_R_models.py) diff --git a/containers/R/r_python_container.py b/containers/R/r_python_container.py new file mode 100644 index 000000000..3367b1ee1 --- /dev/null +++ b/containers/R/r_python_container.py @@ -0,0 +1,85 @@ +from __future__ import print_function +import rpc +import os +import numpy as np +import pandas as pd +from rpy2.robjects import r, pandas2ri +from rpy2.robjects.packages import importr +import rpy2.robjects as ro +import sys +if sys.version_info[0] < 3: + from StringIO import StringIO +else: + from io import StringIO +stats = importr('stats') +base = importr('base') + + +class RContainer(rpc.ModelContainerBase): + def __init__(self, path): + self.model = base.readRDS(path) + print("Loaded %s model" % type(self.model), file=sys.stderr) + self.path = path + + def predict_strings(self, inputs): + outputs = [] + for input_csv in inputs: + csv_handle = StringIO(input_csv) + pdf = pd.read_csv(csv_handle, sep=";", index_col=0) + pandas2ri.activate() + rdf = pandas2ri.py2ri(pdf) + preds = stats.predict(self.model, rdf) + make_list = ro.r('as.list') + make_df = ro.r('data.frame') + rdf_preds = make_df(make_list(preds)) + pdf_preds = pandas2ri.ri2py(rdf_preds) + response_csv = pdf_preds.to_csv(sep=";") + outputs.append(response_csv) + return outputs + + +if __name__ == "__main__": + print("Starting R container") + try: + model_name = os.environ["CLIPPER_MODEL_NAME"] + except KeyError: + print( + "ERROR: CLIPPER_MODEL_NAME environment variable must be set", + file=sys.stdout) + sys.exit(1) + try: + model_version = os.environ["CLIPPER_MODEL_VERSION"] + except KeyError: + print( + "ERROR: CLIPPER_MODEL_VERSION environment variable must be set", + file=sys.stdout) + sys.exit(1) + + ip = "127.0.0.1" + if "CLIPPER_IP" in os.environ: + ip = os.environ["CLIPPER_IP"] + else: + print("Connecting to Clipper on localhost") + + port = 7000 + if 
"CLIPPER_PORT" in os.environ: + port = int(os.environ["CLIPPER_PORT"]) + else: + print("Connecting to Clipper with default port: 7000") + + input_type = "strings" + model_path = os.environ["CLIPPER_MODEL_PATH"] + + rds_names = [ + l for l in os.listdir(model_path) if os.path.splitext(l)[-1] == ".rds" + ] + + if len(rds_names) != 1: + print("Found %d *.rds files. Expected 1" % len(rds_names)) + sys.exit(1) + rds_path = os.path.join(model_path, rds_names[0]) + print(rds_path, file=sys.stdout) + + model = RContainer(rds_path) + rpc_service = rpc.RPCService() + rpc_service.start(model, ip, port, model_name, model_version, input_type) diff --git a/integration-tests/deploy_R_models.py b/integration-tests/deploy_R_models.py new file mode 100644 index 000000000..ec33bdc7f --- /dev/null +++ b/integration-tests/deploy_R_models.py @@ -0,0 +1,190 @@ +from __future__ import absolute_import, print_function +import os +import sys +import requests +import json +import numpy as np +cur_dir = os.path.dirname(os.path.abspath(__file__)) +sys.path.insert(0, os.path.abspath("%s/.." % cur_dir)) +from clipper_admin import Clipper +import time +import subprocess32 as subprocess +import pprint +import random +import socket + +from pandas import * +from rpy2.robjects.packages import importr +import rpy2.robjects as ro +from rpy2.robjects import r, pandas2ri +pandas2ri.activate() +stats = importr('stats') +base = importr('base') + +headers = {'Content-type': 'application/json'} +app_name = "R_model_test" +model_name = "R_model" + +import sys +if sys.version_info[0] < 3: + from StringIO import StringIO +else: + from io import StringIO + + +class BenchmarkException(Exception): + def __init__(self, value): + self.parameter = value + + def __str__(self): + return repr(self.parameter) + + +# range of ports where available ports can be found +PORT_RANGE = [34256, 40000] + + +def find_unbound_port(): + """ + Returns an unbound port number on 127.0.0.1. 
+ """ + while True: + port = random.randint(*PORT_RANGE) + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + try: + sock.bind(("127.0.0.1", port)) + return port + except socket.error: + print("randomly generated port %d is bound. Trying again." % port) + + +def init_clipper(): + clipper = Clipper("localhost", redis_port=find_unbound_port()) + clipper.stop_all() + clipper.start() + time.sleep(1) + return clipper + + +def train_R_model(): + return ro.r('model_R <- lm(mpg~wt+cyl,data=train_data)') + + +def call_predictions(query_string): + default = 0 + url = "http://localhost:1337/%s/predict" % app_name + req_json = json.dumps({'input': query_string}) + response = requests.post(url, headers=headers, data=req_json) + result = response.json() + if response.status_code == requests.codes.ok and result["default"] == True: + default = 1 + elif response.status_code != requests.codes.ok: + print(result) + raise BenchmarkException(response.text) + else: + parsed_output = pandas.read_csv( + StringIO(result["output"]), sep=";", index_col=0) + print("Request Input:\n{} \nResponse:\n{}\n".format( + query_string, parsed_output)) + return default + + +def predict_R_model(df): + """ + This function splits a dataframe into subframes consisting of a + single row. Each subframe is then csv-encoded and sent to Clipper + as a prediction request. 
+ """ + num_preds = len(df) + num_defaults = 0 + for i in range(0, num_preds): + subframe = df.iloc[i:i + 1] + query_string = subframe.to_csv(sep=";") + num_defaults += call_predictions(query_string) + return num_preds, num_defaults + + +def deploy_and_test_model(clipper, model, version, test_data_collection): + """ + Parameters + ---------- + test_data_collection : dict + A collection of pandas dataframes for which to request predictions + """ + clipper.deploy_R_model(model_name, version, model) + time.sleep(25) + num_preds = 0 + num_defaults = 0 + for i in range(0, len(test_data_collection)): + new_preds, new_defaults = predict_R_model(test_data_collection[i]) + num_preds += new_preds + num_defaults += new_defaults + + if num_defaults > 0: + print("Error: %d/%d predictions were default" % (num_defaults, + num_preds)) + + print("PREDS: {} DEFAULTS: {}".format(num_preds, num_defaults)) + + if num_defaults > num_preds / 2: + raise BenchmarkException("Error querying APP %s, MODEL %s:%d" % + (app_name, model_name, version)) + + +def cleanup(clipper, test_succeeded): + """ + Parameters + ---------- + test_succeeded: bool + True if the test succeeded, False otherwise + """ + clipper.stop_all() + if test_succeeded: + print("ALL TESTS PASSED") + else: + sys.exit(1) + + +if __name__ == "__main__": + try: + clipper = init_clipper() + + #preparing datasets for training and testing + #using dataset mtcars , already provided by R. It has 32 rows and various coloums for eg. mpg,wt,cyl etc + #splitting it for training and testing in ratio 1:1. 
Further splitting the test data in ratio 1:1 + train_data = ro.r('train_data=head(mtcars,0.5*nrow(mtcars))') + test_data = ro.r('test_data=tail(mtcars,0.5*nrow(mtcars))') + test_data_collection = {} + test_data_collection[0] = ro.r( + 'test_data1=head(test_data,0.5*nrow(test_data))') + test_data_collection[1] = ro.r( + 'test_data2=tail(test_data,0.5*nrow(test_data))') + + try: + clipper.register_application(app_name, model_name, "string", + "default_pred", 100000) + time.sleep(1) + response = requests.post( + "http://localhost:1337/%s/predict" % app_name, + headers=headers, + data=json.dumps({ + 'input': "" + })) + result = response.json() + if response.status_code != requests.codes.ok: + print("Error: %s" % response.text) + raise BenchmarkException("Error creating app %s" % app_name) + + version = 1 + R_model = train_R_model() + deploy_and_test_model(clipper, R_model, version, + test_data_collection) + except BenchmarkException as e: + print(e) + cleanup(clipper, test_succeeded=False) + else: + cleanup(clipper, test_succeeded=True) + except Exception as e: + print(e) + clipper = Clipper("localhost") + cleanup(clipper, test_succeeded=False)