Adds support for deploying PySpark models #196
Conversation
69af1ad to a84d2c4
Test FAILed.

jenkins test this please

Test FAILed.
cd $DIR
if [ -z ${SPARK_HOME+x} ]; then
    echo "Downloading Spark"
    curl -o spark.tgz https://d3kbcqa49mib13.cloudfront.net/spark-2.1.1-bin-hadoop2.7.tgz
should we pull vars for these versions?
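One way to do that, as a sketch (the variable names here are assumptions, not existing ones in the script), is to define the versions once and build the download URL from them:

```shell
# Sketch: define the Spark/Hadoop versions in one place so bumping them
# only requires editing these two lines.
SPARK_VERSION="2.1.1"
HADOOP_VERSION="2.7"
SPARK_TGZ_URL="https://d3kbcqa49mib13.cloudfront.net/spark-${SPARK_VERSION}-bin-hadoop${HADOOP_VERSION}.tgz"
# The actual download would then be:
# curl -o spark.tgz "${SPARK_TGZ_URL}"
echo "Spark tarball URL: ${SPARK_TGZ_URL}"
```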
import findspark
findspark.init()
import pyspark
from pyspark import SparkConf, SparkContext
COPY containers/python/python_container_conda_deps.txt /lib/

RUN curl -o /spark.tgz https://d3kbcqa49mib13.cloudfront.net/spark-2.1.1-bin-hadoop2.7.tgz \
    && cd / && tar zxf /spark.tgz && mv /spark-2.1.1-bin-hadoop2.7 /spark \
Should we extract vars for these versions and envsubst or BUILDARGS them in?
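As a sketch of the build-arg approach (the ARG names here are assumptions), the Dockerfile could take the versions from `docker build --build-arg`:

```dockerfile
# Sketch: parameterize the Spark/Hadoop versions via build args so they
# are defined once per build, e.g.
#   docker build --build-arg SPARK_VERSION=2.1.1 ...
ARG SPARK_VERSION=2.1.1
ARG HADOOP_VERSION=2.7
RUN curl -o /spark.tgz https://d3kbcqa49mib13.cloudfront.net/spark-${SPARK_VERSION}-bin-hadoop${HADOOP_VERSION}.tgz \
    && cd / && tar zxf /spark.tgz \
    && mv /spark-${SPARK_VERSION}-bin-hadoop${HADOOP_VERSION} /spark
```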
Preliminary style review + questions. Will attempt to train and deploy PySpark models after #192 is merged and we rebase on develop.
clipper_admin/clipper_manager.py
Outdated
if local_path != remote_path:
    if os.path.isdir(local_path):
        self._copytree(local_path, remote_path)
        # self._copytree(local_path, remote_path)
# where dst may or may not exist. We cannot use
# shutil.copytree() alone because it stipulates that
# dst cannot already exist
def _copytree(self, src, dst, symlinks=False, ignore=None):
What is the rationale behind deleting this method? Did you run into an issue with it?
Yeah it wasn't working for recursive copying of directories.
clipper_admin/clipper_manager.py
Outdated
else:
    print(
        "Warning: Anaconda environment was either not found or exporting the environment "
        "failed. Your function will still be serialized deployed, but may fail due to "
I think the second sentence should read "Your function will still be serialized and deployed, but may fail due to..." (missing "and")
# Remove temp files
shutil.rmtree(serialization_dir)

return deploy_result
Add a Returns section to the documentation with the type and an explanation of deploy_result
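For illustration, a numpydoc-style Returns section could look like the stub below (the function name, signature, and return semantics here are assumptions for the example, not the actual clipper_admin API):

```python
def deploy_model(name, version, model_data):
    """Deploy a model (illustrative stub, not the real signature).

    Returns
    -------
    bool
        `deploy_result`: True if the model was deployed successfully,
        False otherwise.
    """
    # Real deployment logic omitted in this sketch.
    deploy_result = True
    return deploy_result
```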
return deploy_result


def deploy_predict_function(self,
This method should also have Returns documentation.
spark_model_path)


def predict_ints(self, inputs):
    if self.input_type != rpc.INPUT_TYPE_INTS:
Why are we performing an additional check on the validity of the input type? Lines 270-278 of rpc.py should take care of this validation.
Oh I copied this code from the python_container.py code. I didn't realize it was redundant. I'll remove it.
Got it. Can you remove these checks from python_container as well?
preds = self.predict_func(self.spark, self.model, inputs)
return [str(p) for p in preds]


def _log_incorrect_input_type(self, input_type):
Remove this if input type validation is redundant (see comment above regarding preexisting validation in rpc.py)
PORT_RANGE = [34256, 40000]


def find_unbound_port():
This doesn't have to be done now, but we should eventually create a reusable module that we can import this functionality from because it's also defined in many_apps_many_models.py in the same directory.
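The duplicated helper could eventually live in a small shared module along these lines (a sketch of the usual bind-and-release approach; the actual definitions are in the integration-test scripts):

```python
import random
import socket

PORT_RANGE = [34256, 40000]

def find_unbound_port():
    """Return a port in PORT_RANGE that can currently be bound."""
    while True:
        port = random.randint(*PORT_RANGE)
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            # If bind succeeds, the port was free; release it and return it
            sock.bind(("127.0.0.1", port))
            return port
        except socket.error:
            # Port in use; try another one
            pass
        finally:
            sock.close()
```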
def parseData(line, obj, pos_label):
    fields = line.strip().split(',')
    # return LabeledPoint(obj(int(fields[0]), pos_label), [float(v)/255.0 for v in fields[1:]])
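Reading the commented-out line above, the parser turns one "label,pixel,pixel,..." CSV row into a labeled example; a plain-Python sketch of that logic (without the pyspark `LabeledPoint` wrapper) would be:

```python
def parse_data(line, obj, pos_label):
    """Parse a 'label,pixel,...' CSV line into (label, features).

    Sketch of parseData above: obj maps the raw label and pos_label to a
    binary label, and pixel values are scaled from [0, 255] into [0, 1].
    The real version wraps the result in pyspark.mllib's LabeledPoint.
    """
    fields = line.strip().split(',')
    label = obj(int(fields[0]), pos_label)
    features = [float(v) / 255.0 for v in fields[1:]]
    return label, features
```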
clipper_admin/clipper_manager.py
Outdated
vol = "{model_repo}/{name}/{version}".format(
    model_repo=MODEL_REPO, name=name, version=version)
print("Vol is: %s" % vol)
Consider removing debug print statement.
Test FAILed.

jenkins test this please

Test FAILed.

Test PASSed.
Corey-Zumar left a comment
Just a couple minor comments. This works well!
correct_input_type = rpc.input_type_to_string(self.input_type)
print(
    "Attempted to use prediction function for input type {incorrect_input_type}.\
    This model-container was configured accept data for input type {correct_input_type}"
Nit: Missing "to". This should be: "Attempted to use prediction function for input type {incorrect_input_type}, but this model-container was configured to accept data for input type {correct_input_type}"
CMD ["/container/pyspark_container_entry.sh"]

# vim: set filetype=dockerfile:
This actually isn't a comment; it's a vim modeline directive: http://vim.wikia.com/wiki/Modeline_magic
You can see how it works by looking at the integration tests I added in integration-tests/.

This should wait for #192 before being merged.

Note that after this gets merged, we need to create an automated build for clipper/pyspark-container on DockerHub.