diff --git a/.gitignore b/.gitignore
index 035d147e8b..a109e24627 100644
--- a/.gitignore
+++ b/.gitignore
@@ -53,3 +53,4 @@ thirdparty/*
 !thirdparty/install.sh
 !include/singa
 doc/
+/Debug/
diff --git a/Makefile.am b/Makefile.am
index 55c45e1424..cd70f4bcda 100644
--- a/Makefile.am
+++ b/Makefile.am
@@ -38,8 +38,7 @@ CUDNN_SRCS := src/neuralnet/loss_layer/cudnn_softmaxloss.cc \
               src/neuralnet/neuron_layer/cudnn_lrn.cc \
               src/neuralnet/neuron_layer/cudnn_convolution.cc
 
-PY_SRCS := tool/python/singa/driver_wrap.cxx \
-           src/driver.cc
+PY_SRCS := tool/python/singa/driver_wrap.cxx
 
 HDFS_SRCS := src/io/hdfsfile.cc \
              src/io/hdfsfile_store.cc
diff --git a/examples/cifar10/job.conf b/examples/cifar10/job.conf
index d20b452e34..e5990fbf16 100644
--- a/examples/cifar10/job.conf
+++ b/examples/cifar10/job.conf
@@ -1,10 +1,10 @@
 name: "cifar10-convnet"
-train_steps: 1000
-test_steps: 100
-test_freq: 200
+train_steps: 50000
+test_steps: 1000
+test_freq: 2000
 #validate_steps: 100
 #validate_freq: 300
-disp_freq: 50
+disp_freq: 1000
 #checkpoint_path: "examples/cifar10/checkpoint/step1000-worker0"
 train_one_batch {
   alg: kBP
@@ -31,8 +31,8 @@ neuralnet {
     type: kRecordInput
     store_conf {
       backend: "kvfile"
-      path: "examples/cifar10/train_data.bin"
-      mean_file: "examples/cifar10/image_mean.bin"
+      path: "/home/aaron/Projects/incubator-singa/examples/cifar10/train_data.bin"
+      mean_file: "/home/aaron/Projects/incubator-singa/examples/cifar10/image_mean.bin"
       batchsize: 100
       #random_skip: 5000
       shape: 3
@@ -61,8 +61,8 @@ neuralnet {
     type: kRecordInput
     store_conf {
       backend: "kvfile"
-      path: "examples/cifar10/test_data.bin"
-      mean_file: "examples/cifar10/image_mean.bin"
+      path: "/home/aaron/Projects/incubator-singa/examples/cifar10/test_data.bin"
+      mean_file: "/home/aaron/Projects/incubator-singa/examples/cifar10/image_mean.bin"
       batchsize: 100
       shape: 3
       shape: 32
@@ -275,5 +275,5 @@ cluster {
   nserver_groups: 1
   nworkers_per_group: 1
   nworkers_per_procs: 1
-  workspace: "examples/cifar10"
+  workspace: "/home/aaron/Projects/incubator-singa/examples/cifar10"
 }
diff --git a/examples/cifar10_mesos/automobile.jpg b/examples/cifar10_mesos/automobile.jpg
new file mode 100644
index 0000000000..6f71fa7138
Binary files /dev/null and b/examples/cifar10_mesos/automobile.jpg differ
diff --git a/examples/cifar10_mesos/bird.jpg b/examples/cifar10_mesos/bird.jpg
new file mode 100644
index 0000000000..241e69d15d
Binary files /dev/null and b/examples/cifar10_mesos/bird.jpg differ
diff --git a/examples/cifar10_mesos/checkpoint/step0-worker0 b/examples/cifar10_mesos/checkpoint/step0-worker0
new file mode 100644
index 0000000000..590a8800d1
Binary files /dev/null and b/examples/cifar10_mesos/checkpoint/step0-worker0 differ
diff --git a/examples/cifar10_mesos/checkpoint/step1000-worker0 b/examples/cifar10_mesos/checkpoint/step1000-worker0
new file mode 100644
index 0000000000..d4805e95c5
Binary files /dev/null and b/examples/cifar10_mesos/checkpoint/step1000-worker0 differ
diff --git a/examples/cifar10_mesos/cifarBin2img.py b/examples/cifar10_mesos/cifarBin2img.py
new file mode 100644
index 0000000000..8a552ff242
--- /dev/null
+++ b/examples/cifar10_mesos/cifarBin2img.py
@@ -0,0 +1,66 @@
+
+import sys, os
+
+SINGA_ROOT=os.path.join(os.path.dirname(__file__),'../','../')
+sys.path.append(os.path.join(SINGA_ROOT,'tool','python'))
+from singa.model import *
+from singa.utils import imgtool
+from PIL import Image
+import cPickle
+
+def unpickle(file):
+    fo = open(file, 'rb')
+    dict = cPickle.load(fo)
+    fo.close()
+    return dict
+
+def test():
+    '''
+    test imgtool toBin and toImg
+    '''
+    im = Image.open("dog.jpg").convert("RGB")
+
+    byteArray=imgtool.toBin(im,(32,32))
+    im2 = imgtool.toImg(byteArray,(32,32))
+
+    im2.save("dog2.jpg", "JPEG")
+
+
+def getLabelMap(path):
+    d = unpickle(path)
+    label_map=dict()
+    for index,line in enumerate(d["label_names"]):
+        print index,line
+        label_map[index]=line
+    return label_map
+
+def generateImage(input_path,output_path,label_map,random):
+    dict=unpickle(input_path)
+    data=dict["data"]
+    labels=dict["labels"]
+    for index,d in enumerate(data):
+        im = imgtool.toImg(data[index],(32,32))
+        temp_folder=os.path.join(output_path,label_map[labels[index]])
+        try:
+            os.stat(temp_folder)
+        except:
+            os.makedirs(temp_folder)
+        im.save(os.path.join(temp_folder,random+"_"+str(index)+".jpg"),"JPEG")
+        #print labels
+
+def main():
+    label_map=getLabelMap("data/batches.meta")
+    generateImage("data/data_batch_1", "data/output",label_map,"1")
+    generateImage("data/data_batch_2", "data/output",label_map,"2")
+    generateImage("data/data_batch_3", "data/output",label_map,"3")
+    generateImage("data/data_batch_4", "data/output",label_map,"4")
+    generateImage("data/data_batch_5", "data/output",label_map,"5")
+    generateImage("data/test_batch", "data/output",label_map,"6")
+
+if __name__=='__main__':
+    main()
+
+
+
+
+
diff --git a/examples/cifar10_mesos/dog.jpg b/examples/cifar10_mesos/dog.jpg
new file mode 100644
index 0000000000..bcc6ea8aac
Binary files /dev/null and b/examples/cifar10_mesos/dog.jpg differ
diff --git a/examples/cifar10_mesos/entry.sh b/examples/cifar10_mesos/entry.sh
new file mode 100644
index 0000000000..55d43a6135
--- /dev/null
+++ b/examples/cifar10_mesos/entry.sh
@@ -0,0 +1,12 @@
+#!/bin/bash
+cd /workspace
+wget $1
+tar zxf *.tar.gz
+cp /workspace/model.py /usr/src/incubator-singa/tool/python/examples/user1-cifar10/
+if [ "$2" = "2" ]; then
+    cd /usr/src/incubator-singa/examples/cifar10_mesos/
+    python main.py
+else
+    cd /usr/src/incubator-singa/
+    python tool/python/examples/user1-cifar10/main.py $3 $4 $5 $6 $7
+fi
diff --git a/examples/cifar10_mesos/executor.py b/examples/cifar10_mesos/executor.py
new file mode 100755
index 0000000000..ba2adcc03d
--- /dev/null
+++ b/examples/cifar10_mesos/executor.py
@@ -0,0 +1,127 @@
+#!/usr/bin/env python
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+ +import os +import sys +import threading +import time +import glob +from multiprocessing import Process + +MESOS_ROOT="/home/aaron/Softs/mesos-0.27.0/build" +for egg in glob.glob(os.path.join(MESOS_ROOT,'src','python','dist','*.egg')): + sys.path.append(os.path.abspath(egg)) + + +import mesos.interface +from mesos.interface import mesos_pb2 +import mesos.native + +import main + +class MyExecutor(mesos.interface.Executor): + def launchTask(self, driver, task): + # Create a thread to run the task. Tasks should always be run in new + # threads or processes, rather than inside launchTask itself. + + def runTask(driver,task): + # We are in the child. + # This is where one would perform the requested task. + + try: + print "test" + sys.argv.append("-singa_conf") + sys.argv.append("/home/aaron/Projects/incubator-singa/conf/singa.conf") + model = main.buildModel(1) + main.product(model) + except Exception as e: + print str(e) + print "Sending status failed..." + update = mesos_pb2.TaskStatus() + update.task_id.value = task.task_id.value + update.state = mesos_pb2.TASK_FAILED + update.data = 'data with a \0 byte' + driver.sendStatusUpdate(update) + print "Sent status failed" + sys.exit(1) + print "Sending status finished..." + update = mesos_pb2.TaskStatus() + update.task_id.value = task.task_id.value + update.state = mesos_pb2.TASK_FINISHED + update.data = 'data with a \0 byte' + driver.sendStatusUpdate(update) + print "Sent status finished" + print "Running task %s" % task.task_id.value + update = mesos_pb2.TaskStatus() + update.task_id.value = task.task_id.value + update.state = mesos_pb2.TASK_RUNNING + update.data = 'data with a \0 byte' + driver.sendStatusUpdate(update) + time.sleep(20) + p = Process(target=runTask, args=(driver,task)) + p.start() + p.join() + ''' + pid = os.fork() + if pid == 0: + # We are in the child. + # This is where one would perform the requested task. + + try: + model = main.buildModel(1) + print "test" + sys.argv.append("-singa_conf") + sys.argv.append("/home/aaron/Projects/incubator-singa/conf/singa.conf") + main.product(model) + except Exception as e: + print str(e) + print "Sending status failed..." + update = mesos_pb2.TaskStatus() + update.task_id.value = task.task_id.value + update.state = mesos_pb2.TASK_FAILED + update.data = 'data with a \0 byte' + driver.sendStatusUpdate(update) + print "Sent status failed" + sys.exit(1) + print "Sending status finished..." + update = mesos_pb2.TaskStatus() + update.task_id.value = task.task_id.value + update.state = mesos_pb2.TASK_FINISHED + update.data = 'data with a \0 byte' + driver.sendStatusUpdate(update) + print "Sent status finished" + sys.exit(0) + else: + # in parent + print "Running task %s" % task.task_id.value + update = mesos_pb2.TaskStatus() + update.task_id.value = task.task_id.value + update.state = mesos_pb2.TASK_RUNNING + update.data = 'data with a \0 byte' + driver.sendStatusUpdate(update) + time.sleep(20) + ''' + def frameworkMessage(self, driver, message): + # Send it back to the scheduler. 
+ print "send message" + driver.sendFrameworkMessage(message) + +if __name__ == "__main__": + print "Starting executor on slave" + driver = mesos.native.MesosExecutorDriver(MyExecutor()) + sys.exit(0 if driver.run() == mesos_pb2.DRIVER_STOPPED else 1) diff --git a/examples/cifar10_mesos/framework.py b/examples/cifar10_mesos/framework.py new file mode 100755 index 0000000000..15eaabde10 --- /dev/null +++ b/examples/cifar10_mesos/framework.py @@ -0,0 +1,168 @@ +#!/usr/bin/env python + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os +import sys +import time +import glob + +MESOS_ROOT="/home/aaron/Softs/mesos-0.27.0/build" +for egg in glob.glob(os.path.join(MESOS_ROOT,'src','python','dist','*.egg')): + sys.path.append(os.path.abspath(egg)) + +import mesos.interface +from mesos.interface import mesos_pb2 +import mesos.native + +TOTAL_TASKS = 1 + +TASK_CPUS = 1 +TASK_MEM = 128 + +class TestScheduler(mesos.interface.Scheduler): + def __init__(self): + self.taskData = {} + self.tasksLaunched = 0 + self.tasksFinished = 0 + self.messagesSent = 0 + self.messagesReceived = 0 + + def registered(self, driver, frameworkId, masterInfo): + print "Registered with framework ID %s" % frameworkId.value + + def resourceOffers(self, driver, offers): + for offer in offers: + tasks = [] + offerCpus = 0 + offerMem = 0 + for resource in offer.resources: + if resource.name == "cpus": + offerCpus += resource.scalar.value + elif resource.name == "mem": + offerMem += resource.scalar.value + + print "Received offer %s with cpus: %s and mem: %s" \ + % (offer.id.value, offerCpus, offerMem) + + remainingCpus = offerCpus + remainingMem = offerMem + # no more tasks + if self.tasksLaunched == TOTAL_TASKS: + return + print "launch tasks" + while self.tasksLaunched < TOTAL_TASKS and \ + remainingCpus >= TASK_CPUS and \ + remainingMem >= TASK_MEM: + tid = self.tasksLaunched + self.tasksLaunched += 1 + + print "Launching task %d using offer %s" \ + % (tid, offer.id.value) + + task = mesos_pb2.TaskInfo() + task.task_id.value = str(tid) + task.slave_id.value = offer.slave_id.value + task.name = "task %d" % tid + task.command.value = os.path.abspath("./main.py") + + cpus = task.resources.add() + cpus.name = "cpus" + cpus.type = mesos_pb2.Value.SCALAR + cpus.scalar.value = TASK_CPUS + + mem = task.resources.add() + mem.name = "mem" + mem.type = mesos_pb2.Value.SCALAR + mem.scalar.value = TASK_MEM + + tasks.append(task) + self.taskData[task.task_id.value] = ( + offer.slave_id, task.executor.executor_id) + + remainingCpus -= TASK_CPUS + remainingMem -= TASK_MEM + + operation = mesos_pb2.Offer.Operation() + operation.type = mesos_pb2.Offer.Operation.LAUNCH + operation.launch.task_infos.extend(tasks) + + driver.acceptOffers([offer.id], [operation]) + + def statusUpdate(self, driver, update): + 
print "Task %s is in state %s" % \ + (update.task_id.value, mesos_pb2.TaskState.Name(update.state)) + + if update.state == mesos_pb2.TASK_FINISHED: + self.tasksFinished += 1 + if self.tasksFinished == TOTAL_TASKS: + print "All tasks done, waiting for final framework message" + + slave_id, executor_id = self.taskData[update.task_id.value] + + self.messagesSent += 1 + driver.sendFrameworkMessage( + executor_id, + slave_id, + 'data with a \0 byte') + + if update.state == mesos_pb2.TASK_LOST or \ + update.state == mesos_pb2.TASK_KILLED or \ + update.state == mesos_pb2.TASK_FAILED: + print "Aborting because task %s is in unexpected state %s with message '%s'" \ + % (update.task_id.value, mesos_pb2.TaskState.Name(update.state), update.message) + driver.abort() + + + def frameworkMessage(self, driver, executorId, slaveId, message): + self.messagesReceived += 1 + + print "Received message:", repr(str(message)) + + if self.messagesReceived == TOTAL_TASKS: + if self.messagesReceived != self.messagesSent: + print "Sent", self.messagesSent, + print "but received", self.messagesReceived + sys.exit(1) + print "All tasks done, and all messages received, exiting" + driver.stop() + +if __name__ == "__main__": + if len(sys.argv) != 2: + print "Usage: %s master" % sys.argv[0] + sys.exit(1) + master_uri=sys.argv[1] + + framework = mesos_pb2.FrameworkInfo() + framework.user = "" # Have Mesos fill in the current user. + framework.name = "SINGA product (Python)" + framework.checkpoint = True + + framework.principal = "SINGA-product-python" + + driver = mesos.native.MesosSchedulerDriver( + TestScheduler(), + framework, + master_uri + ) + + status = 0 if driver.run() == mesos_pb2.DRIVER_STOPPED else 1 + + # Ensure that the driver process terminates. + driver.stop(); + + sys.exit(status) diff --git a/examples/cifar10_mesos/main.py b/examples/cifar10_mesos/main.py new file mode 100755 index 0000000000..8705822068 --- /dev/null +++ b/examples/cifar10_mesos/main.py @@ -0,0 +1,268 @@ +#!/usr/bin/env python +#/************************************************************ +#* +#* Licensed to the Apache Software Foundation (ASF) under one +#* or more contributor license agreements. See the NOTICE file +#* distributed with this work for additional information +#* regarding copyright ownership. The ASF licenses this file +#* to you under the Apache License, Version 2.0 (the +#* "License"); you may not use this file except in compliance +#* with the License. You may obtain a copy of the License at +#* +#* http://www.apache.org/licenses/LICENSE-2.0 +#* +#* Unless required by applicable law or agreed to in writing, +#* software distributed under the License is distributed on an +#* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +#* KIND, either express or implied. See the License for the +#* specific language governing permissions and limitations +#* under the License. 
+#* +#*************************************************************/ + +#************** +#*sudo apt-get install libjpeg-dev +#*sudo pip install +#* flask pillow protobuf + + + +from PIL import Image +import sys, glob, os, random, shutil, time +from flask import Flask, request, redirect, url_for +#all the settings +current_path_ = os.path.dirname(__file__) + +singa_root_=os.path.abspath(os.path.join(current_path_,'../..')) + +#data prepare settings +#input_folder_=os.path.abspath(os.path.join(current_path_,"data/raw")) +#output_folder_=os.path.abspath(os.path.join(current_path_,"data/out")) +#temp_folder_=os.path.abspath(os.path.join(current_path_,"data/temp")) + +input_folder_ = "/workspace/raw" +output_folder_ = "/workspace/out" +temp_folder_ = "/workspace/temp" + +meta_file_name_="meta.txt" +train_bin_file_name_="train.bin" +test_bin_file_name_="test.bin" +validate_bin_file_name_="validate.bin" +mean_bin_file_name_="mean.bin" +label_list_=[(0,"airplane"), + (1,"truck"), + (2,"ship"), + (3,"dog"), + (4,"cat"), + (5,"deer"), + (6,"bird"), + (7,"automobile"), + (8,"horse"), + (9,"frog")] + +#image size +size_=(32,32) + +#final label numbers +total_record_num_=60000 +label_num_=10 + +#data partial +train_rate_=5.0/6 +test_rate_=1.0/6 +validate_rate_=0.0 + +#training settings +model_name_="cifar10-cnn" +workspace_="/workspace" +batch_size_=64 +check_point_path_=workspace_+"/checkpoint/step1000-worker0" + + +allowd_extensions_ = set(['txt', 'pdf', 'png', 'jpg', 'jpeg', 'gif']) + +#singa python libs +sys.path.append(os.path.join(singa_root_,'tool','python')) +from singa.driver import NeuralNet,Driver,strVector,layerVector,intVector,floatVector,DummyLayer,Layer,floatArray_frompointer +from singa.model import * +from singa.utils import kvstore, imgtool +from pb2.common_pb2 import RecordProto +app = Flask(__name__) + +mean_record_="" +dummy_="" +net_="" +pixel_length_=0 + + +def buildModel(argv): + model = Sequential(model_name_,argv) + + model.add(Convolution2D(32, 5, 1, 2, w_std=0.0001, b_lr=2)) + model.add(MaxPooling2D(pool_size=(3,3), stride=2)) + model.add(Activation('relu')) + model.add(LRN2D(3, alpha=0.00005, beta=0.75)) + + model.add(Convolution2D(32, 5, 1, 2, b_lr=2)) + model.add(Activation('relu')) + model.add(AvgPooling2D(pool_size=(3,3), stride=2)) + model.add(LRN2D(3, alpha=0.00005, beta=0.75)) + + model.add(Convolution2D(64, 5, 1, 2)) + model.add(Activation('relu')) + model.add(AvgPooling2D(pool_size=(3,3), stride=2)) + + #label_num_ should be the same with input data label num + model.add(Dense(label_num_, w_wd=250, b_lr=2, b_wd=0, activation='softmax')) + + sgd = SGD(decay=0.004, momentum=0.9, lr_type='manual', step=(0,60000,65000), step_lr=(0.001,0.0001,0.00001)) + + topo = Cluster(workspace_) + model.compile(loss='categorical_crossentropy', optimizer=sgd, cluster=topo) + + return model + + + +def generate_data_conf( + backend = 'kvfile', + batchsize = 1, + random = 5000, + shape = (3, 32, 32), + std = 127.5, + mean = 127.5 + ): + + # using cifar10 dataset + path_train =os.path.join(output_folder_ ,train_bin_file_name_) + path_test =os.path.join(output_folder_ ,test_bin_file_name_) + path_mean =os.path.join(output_folder_ ,mean_bin_file_name_) + + + store = Store(path=path_train, mean_file=path_mean, backend=backend, + random_skip=random, batchsize=batchsize, + shape=shape) + + data_train = Data(load='recordinput', phase='train', conf=store) + + store = Store(path=path_test, mean_file=path_mean, backend=backend, + batchsize=batchsize, + shape=shape) + + data_test = 
Data(load='recordinput', phase='test', conf=store) + + return data_train, data_test + + +def train(model): + + X_train, X_test= generate_data_conf(batchsize=batch_size_) + model.fit(X_train, nb_epoch=1000, with_test=True) + result = model.evaluate(X_test, test_steps=100, test_freq=300) + +def test(model): + pass + + +@app.route("/") +def index(): + return "Hello World! This is SINGA DLAAS! Please send post request with image=file to '/predict' " + +def allowed_file(filename): + return '.' in filename and \ + filename.rsplit('.', 1)[1] in allowd_extensions_ + +@app.route('/predict', methods=['POST']) +def predict(): + global pixel_length_,mean_record_,net_,dummy_ + if request.method == 'POST': + file = request.files['image'] + if file and allowed_file(file.filename): + im = Image.open(file).convert("RGB") + im = imgtool.resize_to_center(im,size_) + pixel = floatVector(pixel_length_) + byteArray = imgtool.toBin(im,size_) + for i in range(pixel_length_): + pixel[i]= byteArray[i]-mean_record_.data[i] + + #dummy data Layer + + shape = intVector(4) + shape[0]=1 + shape[1]=3 + shape[2]=size_[0] + shape[3]=size_[1] + dummy_.Feed(shape,pixel,0) + + #checkpoint_paths =getattr(m.jobconf, 'checkpoint_path') + checkpoint_paths = strVector(1) + checkpoint_paths[0]=check_point_path_ + net_.Load(checkpoint_paths) + + print "1" + dummyVector=layerVector(1) + dummyVector[0]=dummy_.ToLayer() + print len(net_.layers()) + for i,layer in enumerate(net_.layers()): + #skip data layer + if i==0: + continue + elif i==1: + layer.ComputeFeature(4,dummyVector) + else: + layer.ComputeFeature(4,net_.srclayers(layer)) + + #get result + lastLayer=net_.layers()[-1] + data = lastLayer.data(dummy_.ToLayer()) + prop =floatArray_frompointer(data.mutable_cpu_data()) + result=[] + for i in range(label_num_): + result.append((i,prop[i])) + + result.sort(key=lambda tup: tup[1], reverse=True) + + label_map=dict() + for item in label_list_: + label_map[item[0]]=item[1] + response="" + for r in result: + response+=str(label_map[r[0]])+str(r[1]) + + return response + return "error" + +def product(model): + global pixel_length_,mean_record_,net_,dummy_ + #fake data layer + X_train, X_test= generate_data_conf() + + model.layers.insert(0,X_test) + model.build() + #register layers + d = Driver() + d.Init(sys.argv) + net_ = NeuralNet.CreateFromStr(model.jobconf.neuralnet.SerializeToString()) + + + pixel_length_ = 3*size_[0]*size_[1] + + #minus mean and feed data + key,mean_str = kvstore.FileStore().open(os.path.join(output_folder_,mean_bin_file_name_),"read").read() + mean_record_ = RecordProto() + mean_record_.ParseFromString(mean_str) + assert len(mean_record_.data)==pixel_length_ + + dummy_ = DummyLayer() + + + app.debug = True + app.run(host='0.0.0.0', port=80) + + +if __name__=='__main__': + + sys.argv.append("-singa_conf") + sys.argv.append("/usr/src/incubator-singa/conf/singa.conf") + model=buildModel(sys.argv) + product(model) diff --git a/examples/cifar10_mesos/main.pyc b/examples/cifar10_mesos/main.pyc new file mode 100644 index 0000000000..0a796657b9 Binary files /dev/null and b/examples/cifar10_mesos/main.pyc differ diff --git a/examples/cifar10_mesos/singa.conf b/examples/cifar10_mesos/singa.conf new file mode 100644 index 0000000000..20cff98d71 --- /dev/null +++ b/examples/cifar10_mesos/singa.conf @@ -0,0 +1,7 @@ +# point to your active zookeeper service +# this is comma separated host:port pairs, each corresponding to a zk server +# e.g. 
"127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002" +zookeeper_host: "localhost:2181" + +# set if you want to change log directory +log_dir: "/tmp/singa-log/" diff --git a/examples/cifar10_mesos/test_executor.py b/examples/cifar10_mesos/test_executor.py new file mode 100755 index 0000000000..c5a2615a56 --- /dev/null +++ b/examples/cifar10_mesos/test_executor.py @@ -0,0 +1,66 @@ +#!/usr/bin/env python + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os +import sys +import threading +import time +import glob + +MESOS_ROOT="/home/aaron/Softs/mesos-0.27.0/build" +for egg in glob.glob(os.path.join(MESOS_ROOT,'src','python','dist','*.egg')): + sys.path.append(os.path.abspath(egg)) + + +import mesos.interface +from mesos.interface import mesos_pb2 +import mesos.native + +class MyExecutor(mesos.interface.Executor): + def launchTask(self, driver, task): + # Create a thread to run the task. Tasks should always be run in new + # threads or processes, rather than inside launchTask itself. + def run_task(): + print "Running task %s" % task.task_id.value + update = mesos_pb2.TaskStatus() + update.task_id.value = task.task_id.value + update.state = mesos_pb2.TASK_RUNNING + update.data = 'data with a \0 byte' + driver.sendStatusUpdate(update) + + # This is where one would perform the requested task. + + print "Sending status update..." + update = mesos_pb2.TaskStatus() + update.task_id.value = task.task_id.value + update.state = mesos_pb2.TASK_FINISHED + update.data = 'data with a \0 byte' + driver.sendStatusUpdate(update) + print "Sent status update" + + thread = threading.Thread(target=run_task) + thread.start() + + def frameworkMessage(self, driver, message): + # Send it back to the scheduler. + driver.sendFrameworkMessage(message) + +if __name__ == "__main__": + print "Starting executor" + driver = mesos.native.MesosExecutorDriver(MyExecutor()) + sys.exit(0 if driver.run() == mesos_pb2.DRIVER_STOPPED else 1) diff --git a/examples/cifar10_mesos/test_framework.py b/examples/cifar10_mesos/test_framework.py new file mode 100755 index 0000000000..f2efed9320 --- /dev/null +++ b/examples/cifar10_mesos/test_framework.py @@ -0,0 +1,216 @@ +#!/usr/bin/env python + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os +import sys +import time +import glob + +MESOS_ROOT="/home/aaron/Softs/mesos-0.27.0/build" +for egg in glob.glob(os.path.join(MESOS_ROOT,'src','python','dist','*.egg')): + sys.path.append(os.path.abspath(egg)) + +import mesos.interface +from mesos.interface import mesos_pb2 +import mesos.native + +TOTAL_TASKS = 5 + +TASK_CPUS = 1 +TASK_MEM = 128 + +class TestScheduler(mesos.interface.Scheduler): + def __init__(self, implicitAcknowledgements, executor): + self.implicitAcknowledgements = implicitAcknowledgements + self.executor = executor + self.taskData = {} + self.tasksLaunched = 0 + self.tasksFinished = 0 + self.messagesSent = 0 + self.messagesReceived = 0 + + def registered(self, driver, frameworkId, masterInfo): + print "Registered with framework ID %s" % frameworkId.value + + def resourceOffers(self, driver, offers): + for offer in offers: + tasks = [] + offerCpus = 0 + offerMem = 0 + for resource in offer.resources: + if resource.name == "cpus": + offerCpus += resource.scalar.value + elif resource.name == "mem": + offerMem += resource.scalar.value + + print "Received offer %s with cpus: %s and mem: %s" \ + % (offer.id.value, offerCpus, offerMem) + + remainingCpus = offerCpus + remainingMem = offerMem + + while self.tasksLaunched < TOTAL_TASKS and \ + remainingCpus >= TASK_CPUS and \ + remainingMem >= TASK_MEM: + tid = self.tasksLaunched + self.tasksLaunched += 1 + + print "Launching task %d using offer %s" \ + % (tid, offer.id.value) + + task = mesos_pb2.TaskInfo() + task.task_id.value = str(tid) + task.slave_id.value = offer.slave_id.value + task.name = "task %d" % tid + task.executor.MergeFrom(self.executor) + + cpus = task.resources.add() + cpus.name = "cpus" + cpus.type = mesos_pb2.Value.SCALAR + cpus.scalar.value = TASK_CPUS + + mem = task.resources.add() + mem.name = "mem" + mem.type = mesos_pb2.Value.SCALAR + mem.scalar.value = TASK_MEM + + tasks.append(task) + self.taskData[task.task_id.value] = ( + offer.slave_id, task.executor.executor_id) + + remainingCpus -= TASK_CPUS + remainingMem -= TASK_MEM + + operation = mesos_pb2.Offer.Operation() + operation.type = mesos_pb2.Offer.Operation.LAUNCH + operation.launch.task_infos.extend(tasks) + + driver.acceptOffers([offer.id], [operation]) + + def statusUpdate(self, driver, update): + print "Task %s is in state %s" % \ + (update.task_id.value, mesos_pb2.TaskState.Name(update.state)) + + # Ensure the binary data came through. + if update.data != "data with a \0 byte": + print "The update data did not match!" 
+ print " Expected: 'data with a \\x00 byte'" + print " Actual: ", repr(str(update.data)) + sys.exit(1) + + if update.state == mesos_pb2.TASK_FINISHED: + self.tasksFinished += 1 + if self.tasksFinished == TOTAL_TASKS: + print "All tasks done, waiting for final framework message" + + slave_id, executor_id = self.taskData[update.task_id.value] + + self.messagesSent += 1 + driver.sendFrameworkMessage( + executor_id, + slave_id, + 'data with a \0 byte') + + if update.state == mesos_pb2.TASK_LOST or \ + update.state == mesos_pb2.TASK_KILLED or \ + update.state == mesos_pb2.TASK_FAILED: + print "Aborting because task %s is in unexpected state %s with message '%s'" \ + % (update.task_id.value, mesos_pb2.TaskState.Name(update.state), update.message) + driver.abort() + + # Explicitly acknowledge the update if implicit acknowledgements + # are not being used. + if not self.implicitAcknowledgements: + driver.acknowledgeStatusUpdate(update) + + def frameworkMessage(self, driver, executorId, slaveId, message): + self.messagesReceived += 1 + + # The message bounced back as expected. + if message != "data with a \0 byte": + print "The returned message data did not match!" + print " Expected: 'data with a \\x00 byte'" + print " Actual: ", repr(str(message)) + sys.exit(1) + print "Received message:", repr(str(message)) + + if self.messagesReceived == TOTAL_TASKS: + if self.messagesReceived != self.messagesSent: + print "Sent", self.messagesSent, + print "but received", self.messagesReceived + sys.exit(1) + print "All tasks done, and all messages received, exiting" + driver.stop() + +if __name__ == "__main__": + if len(sys.argv) != 2: + print "Usage: %s master" % sys.argv[0] + sys.exit(1) + + executor = mesos_pb2.ExecutorInfo() + executor.executor_id.value = "default" + executor.command.value = os.path.abspath("./test_executor.py") + executor.name = "Test Executor (Python)" + executor.source = "python_test" + + framework = mesos_pb2.FrameworkInfo() + framework.user = "" # Have Mesos fill in the current user. + framework.name = "Test Framework (Python)" + framework.checkpoint = True + + implicitAcknowledgements = 1 + if os.getenv("MESOS_EXPLICIT_ACKNOWLEDGEMENTS"): + print "Enabling explicit status update acknowledgements" + implicitAcknowledgements = 0 + + if os.getenv("MESOS_AUTHENTICATE"): + print "Enabling authentication for the framework" + + if not os.getenv("DEFAULT_PRINCIPAL"): + print "Expecting authentication principal in the environment" + sys.exit(1); + + credential = mesos_pb2.Credential() + credential.principal = os.getenv("DEFAULT_PRINCIPAL") + + if os.getenv("DEFAULT_SECRET"): + credential.secret = os.getenv("DEFAULT_SECRET") + + framework.principal = os.getenv("DEFAULT_PRINCIPAL") + + driver = mesos.native.MesosSchedulerDriver( + TestScheduler(implicitAcknowledgements, executor), + framework, + sys.argv[1], + implicitAcknowledgements, + credential) + else: + framework.principal = "test-framework-python" + + driver = mesos.native.MesosSchedulerDriver( + TestScheduler(implicitAcknowledgements, executor), + framework, + sys.argv[1], + implicitAcknowledgements) + + status = 0 if driver.run() == mesos_pb2.DRIVER_STOPPED else 1 + + # Ensure that the driver process terminates. 
+    driver.stop();
+
+    sys.exit(status)
diff --git a/examples/cifar10_py/cifarBin2img.py b/examples/cifar10_py/cifarBin2img.py
new file mode 100644
index 0000000000..8a552ff242
--- /dev/null
+++ b/examples/cifar10_py/cifarBin2img.py
@@ -0,0 +1,66 @@
+
+import sys, os
+
+SINGA_ROOT=os.path.join(os.path.dirname(__file__),'../','../')
+sys.path.append(os.path.join(SINGA_ROOT,'tool','python'))
+from singa.model import *
+from singa.utils import imgtool
+from PIL import Image
+import cPickle
+
+def unpickle(file):
+    fo = open(file, 'rb')
+    dict = cPickle.load(fo)
+    fo.close()
+    return dict
+
+def test():
+    '''
+    test imgtool toBin and toImg
+    '''
+    im = Image.open("dog.jpg").convert("RGB")
+
+    byteArray=imgtool.toBin(im,(32,32))
+    im2 = imgtool.toImg(byteArray,(32,32))
+
+    im2.save("dog2.jpg", "JPEG")
+
+
+def getLabelMap(path):
+    d = unpickle(path)
+    label_map=dict()
+    for index,line in enumerate(d["label_names"]):
+        print index,line
+        label_map[index]=line
+    return label_map
+
+def generateImage(input_path,output_path,label_map,random):
+    dict=unpickle(input_path)
+    data=dict["data"]
+    labels=dict["labels"]
+    for index,d in enumerate(data):
+        im = imgtool.toImg(data[index],(32,32))
+        temp_folder=os.path.join(output_path,label_map[labels[index]])
+        try:
+            os.stat(temp_folder)
+        except:
+            os.makedirs(temp_folder)
+        im.save(os.path.join(temp_folder,random+"_"+str(index)+".jpg"),"JPEG")
+        #print labels
+
+def main():
+    label_map=getLabelMap("data/batches.meta")
+    generateImage("data/data_batch_1", "data/output",label_map,"1")
+    generateImage("data/data_batch_2", "data/output",label_map,"2")
+    generateImage("data/data_batch_3", "data/output",label_map,"3")
+    generateImage("data/data_batch_4", "data/output",label_map,"4")
+    generateImage("data/data_batch_5", "data/output",label_map,"5")
+    generateImage("data/test_batch", "data/output",label_map,"6")
+
+if __name__=='__main__':
+    main()
+
+
+
+
+
diff --git a/examples/cifar10_py/main.py b/examples/cifar10_py/main.py
new file mode 100755
index 0000000000..53f79f927a
--- /dev/null
+++ b/examples/cifar10_py/main.py
@@ -0,0 +1,284 @@
+#!/usr/bin/env python
+#/************************************************************
+#*
+#* Licensed to the Apache Software Foundation (ASF) under one
+#* or more contributor license agreements. See the NOTICE file
+#* distributed with this work for additional information
+#* regarding copyright ownership. The ASF licenses this file
+#* to you under the Apache License, Version 2.0 (the
+#* "License"); you may not use this file except in compliance
+#* with the License. You may obtain a copy of the License at
+#*
+#* http://www.apache.org/licenses/LICENSE-2.0
+#*
+#* Unless required by applicable law or agreed to in writing,
+#* software distributed under the License is distributed on an
+#* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+#* KIND, either express or implied. See the License for the
+#* specific language governing permissions and limitations
+#* under the License.
+#* +#*************************************************************/ + +#************** +#*sudo apt-get install libjpeg-dev +#*sudo pip install +#* flask pillow protobuf + + + +from PIL import Image +import sys, glob, os, random, shutil, time +from flask import Flask, request, redirect, url_for +#all the settings +current_path_ = os.path.dirname(__file__) + +singa_root_=os.path.abspath(os.path.join(current_path_,'../..')) + +#data prepare settings +input_folder_=os.path.abspath(os.path.join(current_path_,"data/raw")) +output_folder_=os.path.abspath(os.path.join(current_path_,"data/out")) +temp_folder_=os.path.abspath(os.path.join(current_path_,"data/temp")) + +meta_file_name_="meta.txt" +train_bin_file_name_="train.bin" +test_bin_file_name_="test.bin" +validate_bin_file_name_="validate.bin" +mean_bin_file_name_="mean.bin" +label_list_=[(0,"airplane"), + (1,"truck"), + (2,"ship"), + (3,"dog"), + (4,"cat"), + (5,"deer"), + (6,"bird"), + (7,"automobile"), + (8,"horse"), + (9,"frog")] + +#image size +size_=(32,32) + +#final label numbers +total_record_num_=60000 +label_num_=10 + +#data partial +train_rate_=5.0/6 +test_rate_=1.0/6 +validate_rate_=0.0 + +#training settings +model_name_="cifar10-cnn" +workspace_="examples/cifar10_py" +batch_size_=64 +check_point_path_=workspace_+"/checkpoint/step1000-worker0" + + +allowd_extensions_ = set(['txt', 'pdf', 'png', 'jpg', 'jpeg', 'gif']) + +#singa python libs +sys.path.append(os.path.join(singa_root_,'tool','python')) +from singa.driver import NeuralNet,Driver,strVector,layerVector,intVector,floatVector,DummyLayer,Layer,floatArray_frompointer +from singa.model import * +from singa.utils import kvstore, imgtool +from pb2.common_pb2 import RecordProto +app = Flask(__name__) + +mean_record_="" +dummy_="" +net_="" +pixel_length_=0 + + +def buildModel(argv): + model = Sequential(model_name_,argv) + + model.add(Convolution2D(32, 5, 1, 2, w_std=0.0001, b_lr=2)) + model.add(MaxPooling2D(pool_size=(3,3), stride=2)) + model.add(Activation('relu')) + model.add(LRN2D(3, alpha=0.00005, beta=0.75)) + + model.add(Convolution2D(32, 5, 1, 2, b_lr=2)) + model.add(Activation('relu')) + model.add(AvgPooling2D(pool_size=(3,3), stride=2)) + model.add(LRN2D(3, alpha=0.00005, beta=0.75)) + + model.add(Convolution2D(64, 5, 1, 2)) + model.add(Activation('relu')) + model.add(AvgPooling2D(pool_size=(3,3), stride=2)) + + #label_num_ should be the same with input data label num + model.add(Dense(label_num_, w_wd=250, b_lr=2, b_wd=0, activation='softmax')) + + sgd = SGD(decay=0.004, momentum=0.9, lr_type='manual', step=(0,60000,65000), step_lr=(0.001,0.0001,0.00001)) + + topo = Cluster(workspace_) + model.compile(loss='categorical_crossentropy', optimizer=sgd, cluster=topo) + + return model + + + +def generate_data_conf( + backend = 'kvfile', + batchsize = 1, + random = 5000, + shape = (3, 32, 32), + std = 127.5, + mean = 127.5 + ): + + # using cifar10 dataset + path_train =os.path.join(output_folder_ ,train_bin_file_name_) + path_test =os.path.join(output_folder_ ,test_bin_file_name_) + path_mean =os.path.join(output_folder_ ,mean_bin_file_name_) + + + store = Store(path=path_train, mean_file=path_mean, backend=backend, + random_skip=random, batchsize=batchsize, + shape=shape) + + data_train = Data(load='recordinput', phase='train', conf=store) + + store = Store(path=path_test, mean_file=path_mean, backend=backend, + batchsize=batchsize, + shape=shape) + + data_test = Data(load='recordinput', phase='test', conf=store) + + return data_train, data_test + + +def train(model): + + 
X_train, X_test= generate_data_conf(batchsize=batch_size_) + model.fit(X_train, nb_epoch=1000, with_test=True) + result = model.evaluate(X_test, test_steps=100, test_freq=300) + +def test(model): + pass + + +@app.route("/") +def index(): + return "Hello World! This is SINGA DLAAS! Please send post request with image=file to '/predict' " + +def allowed_file(filename): + return '.' in filename and \ + filename.rsplit('.', 1)[1] in allowd_extensions_ + +@app.route('/predict', methods=['POST']) +def predict(): + global pixel_length_,mean_record_,net_,dummy_ + if request.method == 'POST': + file = request.files['image'] + if file and allowed_file(file.filename): + im = Image.open(file).convert("RGB") + im = imgtool.resize_to_center(im,size_) + pixel = floatVector(pixel_length_) + byteArray = imgtool.toBin(im,size_) + for i in range(pixel_length_): + pixel[i]= byteArray[i]-mean_record_.data[i] + + #dummy data Layer + + shape = intVector(3) + shape[0]=3 + shape[1]=size_[0] + shape[2]=size_[1] + dummy_.Feed(1,shape,pixel) + + #checkpoint_paths =getattr(m.jobconf, 'checkpoint_path') + checkpoint_paths = strVector(1) + checkpoint_paths[0]=check_point_path_ + net_.Load(checkpoint_paths) + + print "1" + dummyVector=layerVector(1) + dummyVector[0]=dummy_.ToLayer() + print len(net_.layers()) + for i,layer in enumerate(net_.layers()): + #skip data layer + if i==0: + continue + elif i==1: + layer.ComputeFeature(4,dummyVector) + else: + layer.ComputeFeature(4,net_.srclayers(layer)) + + #get result + lastLayer=net_.layers()[-1] + data = lastLayer.data(dummy_.ToLayer()) + prop =floatArray_frompointer(data.mutable_cpu_data()) + result=[] + for i in range(label_num_): + result.append((i,prop[i])) + + result.sort(key=lambda tup: tup[1], reverse=True) + + label_map=dict() + for item in label_list_: + label_map[item[0]]=item[1] + response="" + for r in result: + response+=str(label_map[r[0]])+str(r[1]) + + return response + return "error" + +def product(model): + global pixel_length_,mean_record_,net_,dummy_ + #fake data layer + X_train, X_test= generate_data_conf() + + model.layers.insert(0,X_test) + model.build() + #register layers + d = Driver() + d.Init(sys.argv) + net_ = NeuralNet.CreateForTest(model.jobconf.neuralnet.SerializeToString()) + + + pixel_length_ = 3*size_[0]*size_[1] + + #minus mean and feed data + key,mean_str = kvstore.FileStore().open(os.path.join(output_folder_,mean_bin_file_name_),"read").read() + mean_record_ = RecordProto() + mean_record_.ParseFromString(mean_str) + assert len(mean_record_.data)==pixel_length_ + + dummy_ = DummyLayer() + + + app.debug = True + app.run() + + +if __name__=='__main__': + + print "please use -transform -data -test -product to specify different task" + + if "-transform" in sys.argv: + total_record_num_=imgtool.transform_img(input_folder_,temp_folder_,size_) + if "-data" in sys.argv: + label_list_=imgtool.generate_kvrecord_data(temp_folder_, + output_folder_, + size_, + train_num=int(total_record_num_*train_rate_), + test_num=int(total_record_num_*test_rate_), + validate_num=int(total_record_num_*validate_rate_), + meta_file_name=meta_file_name_, + train_bin_file_name=train_bin_file_name_, + test_bin_file_name=test_bin_file_name_, + validate_bin_file_name=validate_bin_file_name_, + mean_bin_file_name=mean_bin_file_name_ + ) + label_num_=len(label_list_) + model=buildModel(sys.argv) + if "-train" in sys.argv: + train(model) + elif "-test" in sys.argv: + test(model) + elif "-product" in sys.argv: + product(model) diff --git a/include/singa/neuralnet/input_layer.h 
b/include/singa/neuralnet/input_layer.h index e701eecd46..26f3548957 100644 --- a/include/singa/neuralnet/input_layer.h +++ b/include/singa/neuralnet/input_layer.h @@ -200,6 +200,7 @@ class RNNLabelLayer : public InputLayer { void Setup(const LayerProto& proto, const vector& srclayers); void ComputeFeature(int flag, const vector& srclayers); }; + /****************Deprecated layers******************/ /** * @deprecated please use the StoreInputLayer. diff --git a/include/singa/neuralnet/layer.h b/include/singa/neuralnet/layer.h index c1612a2885..ce47b47c5a 100644 --- a/include/singa/neuralnet/layer.h +++ b/include/singa/neuralnet/layer.h @@ -57,6 +57,7 @@ inline const string AddPrefixSuffix(int unroll_idx, int partition_idx, * Layer::ComputeFeature() and Layer::ComputGradient() * functions in accordance with the NeuralNet::TrainOneBatch function. */ + class Layer { public: /** @@ -69,6 +70,14 @@ class Layer { Layer() {} virtual ~Layer() {} + + /** + * Create for python binding, production test mode + * + */ + static Layer* CreateLayer(const string str); + static void SetupLayer(Layer* layer, const string str, const vector& srclayers); + /** * Setup layer properties. * @@ -84,6 +93,8 @@ class Layer { datavec_.push_back(&data_); gradvec_.push_back(&grad_); } + + /** * Compute features of this layer based on connected layers. * @@ -108,6 +119,7 @@ class Layer { virtual const std::vector GetParams() const { return std::vector {}; } + virtual void SetParams(std::vector) {} /** * Return the connection type between one neuron of this layer and its source * layer. diff --git a/include/singa/neuralnet/neuralnet.h b/include/singa/neuralnet/neuralnet.h index 33ad38cb1c..33899c7e0e 100644 --- a/include/singa/neuralnet/neuralnet.h +++ b/include/singa/neuralnet/neuralnet.h @@ -58,6 +58,13 @@ class NeuralNet { static NeuralNet* Create(const NetProto& net_conf, Phase phase, int npartitions); + /** + * Create for python binding, production test mode + * + */ + static NeuralNet* CreateNeuralNet(const string str); + NeuralNet() {}; + static const NetProto Unrolling(const NetProto& net_conf); /** * construct the net structure from protocol buffer. @@ -66,6 +73,7 @@ class NeuralNet { */ NeuralNet(NetProto net_conf, int num_partitions); ~NeuralNet(); + /** * Load net params from checkpoint fiels. 
* @param path checkpoint files diff --git a/include/singa/neuralnet/neuron_layer.h b/include/singa/neuralnet/neuron_layer.h index f03e91bdb6..815ad9d11c 100644 --- a/include/singa/neuralnet/neuron_layer.h +++ b/include/singa/neuralnet/neuron_layer.h @@ -122,13 +122,17 @@ class DropoutLayer : public NeuronLayer { */ class DummyLayer: public NeuronLayer { public: + void Setup(const std::string str, const vector& srclayers); void Setup(const LayerProto& proto, const vector& srclayers) override; void ComputeFeature(int flag, const vector& srclayers) override; void ComputeGradient(int flag, const vector& srclayers) override; + void Feed(vector shape, vector* data, int op); + Layer* ToLayer() { return this;} private: bool input_ = false; // use as input layer bool output_ = false; // use as output layer + int batchsize_ = 1; // use for input layer }; /** @@ -224,6 +228,11 @@ class InnerProductLayer : public NeuronLayer { return params; } + void SetParams(std::vector params) { + weight_ = params.at(0); + bias_ = params.at(1); + } + private: int batchsize_; int vdim_, hdim_; @@ -269,7 +278,7 @@ class PoolingLayer : public NeuronLayer { int kernel_x_, pad_x_, stride_x_; int kernel_y_, pad_y_, stride_y_; int batchsize_, channels_, height_, width_, pooled_height_, pooled_width_; - PoolingProto_PoolMethod pool_; + PoolMethod pool_; }; /** * Use book-keeping for BP following Caffe's pooling implementation diff --git a/include/singa/utils/param.h b/include/singa/utils/param.h index fcaaeb76df..319f2b4300 100644 --- a/include/singa/utils/param.h +++ b/include/singa/utils/param.h @@ -155,6 +155,7 @@ class Param { * Init param values from checkpoint blob. */ void FromProto(const BlobProto& blob); + void FromProto(const std::string str); /** * Dump param values to blob. */ @@ -211,6 +212,7 @@ class Param { /** * @return num of parameters in this Param obj. */ + inline const std::vector& shape() const { return data_.shape(); } inline int size() const { return data_.count(); } inline const Blob& data() const { return data_; } inline Blob* mutable_data() { return &data_; } diff --git a/include/singa/utils/updater.h b/include/singa/utils/updater.h index b14f72b7a4..33ad8a7ec2 100644 --- a/include/singa/utils/updater.h +++ b/include/singa/utils/updater.h @@ -22,10 +22,13 @@ #ifndef SINGA_UTILS_UPDATER_H_ #define SINGA_UTILS_UPDATER_H_ +#include #include "singa/proto/job.pb.h" #include "singa/utils/param.h" +#include "singa/neuralnet/layer.h" namespace singa { +using std::string; /** * Base learning rate generator. * @@ -87,6 +90,11 @@ class InvTLRGen : public LRGenerator { */ class Updater { public: + + /* added for python binding */ + static Updater* CreateUpdater(const string str); + /* ------------------------ */ + static Updater* Create(const UpdaterProto& proto); virtual ~Updater() {} diff --git a/include/singa/worker.h b/include/singa/worker.h index 34c8000371..d53e54ba41 100644 --- a/include/singa/worker.h +++ b/include/singa/worker.h @@ -61,6 +61,7 @@ class Worker { * * @return a pointer to the instance of the Worker subclass. */ + static Worker* CreateWorker(const std::string str); static Worker* Create(const AlgProto& conf); virtual ~Worker(); /** @@ -129,6 +130,7 @@ class Worker { * initialized. */ void InitNetParams(const JobProto& job_conf, NeuralNet* net); + void InitNetParams(const std::string& folder, vector net); /** * Checkpoint all Param objects owned by the worker onto disk. 
* The serialization is done using BlobProtos which includes the name, version @@ -140,6 +142,7 @@ class Worker { * @param net the training net whose Param objects will be dumped. */ void Checkpoint(int step, const std::string& folder, NeuralNet* net); + void Checkpoint(int step, const std::string& folder, vector net); /** * Train one mini-batch. * Test/Validation is done before training. diff --git a/src/driver.cc b/src/driver.cc index 6163865620..1c8f36665b 100644 --- a/src/driver.cc +++ b/src/driver.cc @@ -90,6 +90,7 @@ void Driver::Init(int argc, char **argv) { RegisterLayer(kCConvolution); RegisterLayer(kCPooling); RegisterLayer(kEmbedding); + RegisterLayer(kActivation); #ifdef USE_CUDNN RegisterLayer(kCudnnActivation); diff --git a/src/neuralnet/layer.cc b/src/neuralnet/layer.cc index cb1f3b8b78..ef1629fc8e 100644 --- a/src/neuralnet/layer.cc +++ b/src/neuralnet/layer.cc @@ -19,7 +19,11 @@ * *************************************************************/ +#include "singa/worker.h" #include "singa/neuralnet/layer.h" +#include "singa/neuralnet/input_layer.h" +#include "singa/neuralnet/neuron_layer.h" +#include "singa/neuralnet/loss_layer.h" #include #include @@ -33,6 +37,20 @@ namespace singa { using std::string; +void Layer::SetupLayer(Layer* layer, const string str, const vector& srclayers) { + LayerProto layer_conf; + layer_conf.ParseFromString(str); + layer->Setup(layer_conf, srclayers); + for (auto param : layer->GetParams()) + param->InitValues(); +} + +Layer* Layer::CreateLayer(const string str) { + LayerProto layer_conf; + layer_conf.ParseFromString(str); + return Layer::Create(layer_conf); +} + Layer* Layer::Create(const LayerProto& proto) { auto* factory = Singleton>::Instance(); Layer* layer = nullptr; diff --git a/src/neuralnet/loss_layer/softmax.cc b/src/neuralnet/loss_layer/softmax.cc index 79564706ec..9d0cb1dda5 100644 --- a/src/neuralnet/loss_layer/softmax.cc +++ b/src/neuralnet/loss_layer/softmax.cc @@ -98,6 +98,7 @@ void SoftmaxLossLayer::ComputeGradient(int flag, Tensor gsrc(gsrcptr, Shape1(gsrcblob->count())); gsrc *= scale_ / (1.0f * batchsize_); } + const std::string SoftmaxLossLayer::ToString(bool debug, int flag) { if (debug) return Layer::ToString(debug, flag); diff --git a/src/neuralnet/neuralnet.cc b/src/neuralnet/neuralnet.cc index b045e0600c..226d8d97a8 100644 --- a/src/neuralnet/neuralnet.cc +++ b/src/neuralnet/neuralnet.cc @@ -58,6 +58,12 @@ const NetProto NetConfPreprocess(const NetProto& conf) { return proto; } +NeuralNet* NeuralNet::CreateNeuralNet(const string str) { + NetProto net_conf; + net_conf.ParseFromString(str); + return NeuralNet::Create(net_conf,singa::kTest,1); +} + NeuralNet* NeuralNet::Create(const NetProto& net_conf, Phase phase, int npartitions) { const NetProto& full_net_conf = NetConfPreprocess(net_conf); diff --git a/src/neuralnet/neuron_layer/dummy.cc b/src/neuralnet/neuron_layer/dummy.cc index 936bb5ed83..a3bec97b38 100644 --- a/src/neuralnet/neuron_layer/dummy.cc +++ b/src/neuralnet/neuron_layer/dummy.cc @@ -27,6 +27,13 @@ namespace singa { +void DummyLayer::Setup(const std::string str, + const vector& srclayers) { + LayerProto conf; + conf.ParseFromString(str); + DummyLayer::Setup(conf, srclayers); +} + void DummyLayer::Setup(const LayerProto& proto, const vector& srclayers) { NeuronLayer::Setup(proto, srclayers); @@ -71,4 +78,53 @@ void DummyLayer::ComputeGradient(int flag, const vector& srclayers) { Copy(grad_, srclayers[0]->mutable_grad(this)); } +void DummyLayer::Feed(vector shape, vector* data, int op){ + + //batchsize_ = 
batchsize; + batchsize_ = shape[0]; + // dataset + if (op == 0) { + /* + size_t hdim = 1; + for (size_t i = 1; i < shape.size(); ++i) + hdim *= shape[i]; + //data_.Reshape({batchsize, (int)hdim}); + //shape.insert(shape.begin(),batchsize); + data_.Reshape(shape); + */ + //reshape data + data_.Reshape(shape); + CHECK_EQ(data_.count(), data->size()); + + int size = data->size(); + float* ptr = data_.mutable_cpu_data(); + for (int i = 0; i< size; i++) { + ptr[i] = data->at(i); + } + } + // label + else { + aux_data_.resize(batchsize_); + for (int i = 0; i< batchsize_; i++) { + aux_data_[i] = static_cast(data->at(i)); + } + } + + return; + + /* Wenfeng's input + batchsize_ = batchsize; + shape.insert(shape.begin(),batchsize); + data_.Reshape(shape); + + int size = data_.count() / batchsize_; + CHECK_EQ(size, data->size()); + float* ptr = data_.mutable_cpu_data(); + for (int i = 0; i< size; i++) + ptr[i] = data->at(i); + + return; + */ +} + } // namespace singa diff --git a/src/neuralnet/neuron_layer/inner_product.cc b/src/neuralnet/neuron_layer/inner_product.cc index 1e5e93e97b..a7378a2a0b 100644 --- a/src/neuralnet/neuron_layer/inner_product.cc +++ b/src/neuralnet/neuron_layer/inner_product.cc @@ -83,5 +83,7 @@ void InnerProductLayer::ComputeGradient(int flag, else MMDot(grad_, weight_->data(), srclayers[0]->mutable_grad(this)); } + //clee auto w = weight_->mutable_cpu_data(); + //LOG(ERROR) << srclayers[0]->name() << " " << w[0]; } } // namespace singa diff --git a/src/neuralnet/neuron_layer/pooling.cc b/src/neuralnet/neuron_layer/pooling.cc index 4eda2e4097..d20b862a0f 100644 --- a/src/neuralnet/neuron_layer/pooling.cc +++ b/src/neuralnet/neuron_layer/pooling.cc @@ -58,8 +58,8 @@ void PoolingLayer::Setup(const LayerProto& conf, } pool_ = conf.pooling_conf().pool(); - CHECK(pool_ == PoolingProto_PoolMethod_AVG - || pool_ == PoolingProto_PoolMethod_MAX) + CHECK(pool_ == AVG + || pool_ == MAX) << "Padding implemented only for average and max pooling."; const auto& srcshape = srclayers[0]->data(this).shape(); int dim = srcshape.size(); @@ -83,9 +83,9 @@ void PoolingLayer::Setup(const LayerProto& conf, void PoolingLayer::ComputeFeature(int flag, const vector& srclayers) { auto src = Tensor4(srclayers[0]->mutable_data(this)); auto data = Tensor4(&data_); - if (pool_ == PoolingProto_PoolMethod_MAX) + if (pool_ == MAX) data = expr::pool(src, kernel_x_, stride_x_); - else if (pool_ == PoolingProto_PoolMethod_AVG) + else if (pool_ == AVG) data = expr::pool(src, kernel_x_, stride_x_) * (1.0f / (kernel_x_ * kernel_x_)); } @@ -99,9 +99,9 @@ void PoolingLayer::ComputeGradient(int flag, const vector& srclayers) { auto gsrc = Tensor4(srclayers[0]->mutable_grad(this)); auto data = Tensor4(&data_); auto grad = Tensor4(&grad_); - if (pool_ == PoolingProto_PoolMethod_MAX) + if (pool_ == MAX) gsrc = expr::unpool(src, data, grad, kernel_x_, stride_x_); - else if (pool_ == PoolingProto_PoolMethod_AVG) + else if (pool_ == AVG) gsrc = expr::unpool(src, data, grad, kernel_x_, stride_x_) * (1.0f / (kernel_x_ * kernel_x_)); } @@ -111,16 +111,16 @@ void PoolingLayer::ComputeGradient(int flag, const vector& srclayers) { void CPoolingLayer::Setup(const LayerProto& conf, const vector& srclayers) { PoolingLayer::Setup(conf, srclayers); - if (pool_ == PoolingProto_PoolMethod_MAX) + if (pool_ == MAX) mask_.ReshapeLike(data_); } void CPoolingLayer::ComputeFeature(int flag, const vector& srclayers) { - if (pool_ == PoolingProto_PoolMethod_MAX) + if (pool_ == MAX) 
ForwardMaxPooling(srclayers[0]->mutable_data(this)->mutable_cpu_data(), batchsize_, channels_, height_, width_, kernel_y_, kernel_x_, pad_y_, pad_y_, stride_y_, stride_x_, data_.mutable_cpu_data(), mask_.mutable_cpu_data()); - else if (pool_ == PoolingProto_PoolMethod_AVG) + else if (pool_ == AVG) ForwardAvgPooling(srclayers[0]->mutable_data(this)->mutable_cpu_data(), batchsize_, channels_, height_, width_, kernel_y_, kernel_x_, pad_y_, pad_x_, stride_y_, stride_y_, data_.mutable_cpu_data()); @@ -129,12 +129,12 @@ void CPoolingLayer::ComputeFeature(int flag, const vector& srclayers) { } void CPoolingLayer::ComputeGradient(int flag, const vector& srclayers) { - if (pool_ == PoolingProto_PoolMethod_MAX) + if (pool_ == MAX) BackwardMaxPooling(grad_.cpu_data(), mask_.cpu_data(), batchsize_, channels_, height_, width_, kernel_y_, kernel_x_, pad_y_, pad_x_, stride_y_, stride_y_, srclayers[0]->mutable_grad(this)->mutable_cpu_data()); - else if (pool_ == PoolingProto_PoolMethod_AVG) + else if (pool_ == AVG) BackwardAvgPooling(grad_.cpu_data(), batchsize_, channels_, height_, width_, kernel_y_, kernel_x_, pad_y_, pad_x_, stride_y_, stride_x_, diff --git a/src/proto/job.proto b/src/proto/job.proto index 7bc0ea3b9b..aa8a905ee0 100644 --- a/src/proto/job.proto +++ b/src/proto/job.proto @@ -524,10 +524,6 @@ message LRNProto { message PoolingProto { // The kernel size (square) optional int32 kernel= 1 [default = 3]; - enum PoolMethod { - MAX = 0; - AVG = 1; - } // The pooling method optional PoolMethod pool = 30 [default = MAX]; // The padding size @@ -647,6 +643,7 @@ enum LayerType { kImagePreprocess = 101; kPrefetch = 102; kRecordInput = 103; + kDummyInput = 104; kLMDBData = 190; // deprecated kLabel = 191; // deprecated kMnist = 192; // deprecated @@ -676,6 +673,7 @@ enum LayerType { kSoftmax = 214; kGRU = 215; kEmbedding = 216; + kActivation = 217; // cudnn v3 kCudnnConv = 250; @@ -811,3 +809,9 @@ enum UnrollConnType { // customized connection type defined by src_conn kUnrollCustomized = 4; } + +enum PoolMethod { + MAX = 0; + AVG = 1; +} + diff --git a/src/utils/param.cc b/src/utils/param.cc index 158c77735d..5510f16312 100644 --- a/src/utils/param.cc +++ b/src/utils/param.cc @@ -199,6 +199,12 @@ void Param::ShareFrom(Param* other) { grad_.ShareData(&(other->grad_), false); } +void Param::FromProto(const string str) { + BlobProto blob; + blob.ParseFromString(str); + data_.FromProto(blob); +} + void Param::FromProto(const BlobProto& blob) { data_.FromProto(blob); } diff --git a/src/utils/updater.cc b/src/utils/updater.cc index fa051b11e4..a2180d380b 100644 --- a/src/utils/updater.cc +++ b/src/utils/updater.cc @@ -60,8 +60,8 @@ float StepLRGen::Get(int step) { // do not cast int to float int freq = proto_.step_conf().change_freq(); float lr = proto_.base_lr() * pow(proto_.step_conf().gamma(), step / freq); - LOG_IF(INFO, step % freq == 0) << "Update learning rate to " << lr - << " @ step " << step; + // LOG_IF(INFO, step % freq == 0) << "Update learning rate to " << lr + // << " @ step " << step; return lr; } @@ -96,6 +96,15 @@ Updater* Updater::Create(const UpdaterProto& proto) { return updater; } +/**************** added for Python Binding ***************************/ +Updater* Updater::CreateUpdater(const string str) { + UpdaterProto conf; + conf.ParseFromString(str); + return Updater::Create(conf); +} +/***********************Python Binding end**************************/ + + /***********************SGD with momentum******************************/ void Updater::Init(const UpdaterProto& proto) { 
momentum_ = proto.momentum(); diff --git a/src/worker.cc b/src/worker.cc index 2afa8b06fe..a6f1bbe3a8 100644 --- a/src/worker.cc +++ b/src/worker.cc @@ -35,6 +35,12 @@ namespace singa { using std::string; +Worker* Worker::CreateWorker(const string str) { + AlgProto alg_proto; + alg_proto.ParseFromString(str); + return Worker::Create(alg_proto); +} + Worker* Worker::Create(const AlgProto& conf) { auto factory = Singleton<Factory<Worker>>::Instance(); Worker* worker = nullptr; @@ -160,6 +166,23 @@ void Worker::InitSockets(const NeuralNet* net) { } } +void Worker::InitNetParams(const std::string& folder, vector<Layer*> net) { + + std::unordered_map<std::string, Param*> name2param; + for (auto layer : net) { + for (auto param : layer->GetParams()) { + // only owners fill the memory of parameter values. + //if (param->owner() == param->id()) { + CHECK(name2param.find(param->name()) == name2param.end()); + name2param[param->name()] = param; + //} + } + } + vector<std::string> paths; + paths.push_back(folder); + NeuralNet::Load(paths, name2param); +} + void Worker::InitNetParams(const JobProto& job_conf, NeuralNet* net) { // for each server grp, its first subscriber worker grp does the param init if (grp_id_ % Cluster::Get()->nworker_groups_per_server_group() == 0) { @@ -215,6 +238,27 @@ void Worker::InitNetParams(const JobProto& job_conf, NeuralNet* net) { } } +void Worker::Checkpoint(int step, const std::string& folder, vector<Layer*> net) { + BlobProtos bps; + for (auto layer : net) { + //if (layer->partition_id() == id_) { + for (auto param : layer->GetParams()) { + // only owners fill the memory of parameter values. + //if (param->owner() == param->id()) { + auto *blob = bps.add_blob(); + param->ToProto(blob); + bps.add_version(param->version()); + bps.add_name(param->name()); + //} + } + //} + } + char buf[256]; + snprintf(buf, sizeof(buf), "%s/step%d-worker0", folder.c_str(), step); + LOG(INFO) << "checkpoint to " << buf; + WriteProtoToBinaryFile(bps, buf); +} + void Worker::Checkpoint(int step, const std::string& folder, NeuralNet* net) { BlobProtos bps; for (auto layer : net->layers()) { diff --git a/thirdparty/install.sh b/thirdparty/install.sh index 99403b9cae..1def4de00b 100755 --- a/thirdparty/install.sh +++ b/thirdparty/install.sh @@ -245,32 +245,32 @@ function install_opencv() function install_protobuf() { - if [ !
-e "protobuf-2.5.0.tar.gz" ] then - wget http://www.comp.nus.edu.sg/~dbsystem/singa/assets/file/thirdparty/protobuf-2.6.0.tar.gz; + wget http://www.comp.nus.edu.sg/~dbsystem/singa/assets/file/thirdparty/protobuf-2.5.0.tar.gz; fi - rm -rf protobuf-2.6.0; - tar zxvf protobuf-2.6.0.tar.gz && cd protobuf-2.6.0; + rm -rf protobuf-2.5.0; + tar zxvf protobuf-2.5.0.tar.gz && cd protobuf-2.5.0; if [ $# == 1 ] then echo "install protobuf in $1"; ./configure --prefix=$1; make && make install; - #cd python; - #python setup.py build; - #python setup.py install --prefix=$1; - #cd ..; + cd python; + python setup.py build; + python setup.py install --prefix=$1; + cd ..; elif [ $# == 0 ] then echo "install protobuf in default path"; ./configure; make && sudo make install; - #cd python; - #python setup.py build; - #sudo python setup.py install; - #cd ..; + cd python; + python setup.py build; + sudo python setup.py install; + cd ..; else echo "wrong commands"; fi diff --git a/tool/mesos/Makefile b/tool/mesos/Makefile index d64fb5b2bc..f8abd697f6 100644 --- a/tool/mesos/Makefile +++ b/tool/mesos/Makefile @@ -1,5 +1,5 @@ -CXX_FLAGS=-I ../include -std=c++11 -I /usr/local/include/hdfs -I ../../include -LD_FLAGS=-lmesos -lsinga -lhdfs3 +CXX_FLAGS=-I/home/aaron/.local/include -std=c++11 -I /home/aaron/Softs/hadoop-2.7.2/include -I /home/aaron/Softs/mesos-0.26.0/include -I /home/aaron/Softs/mesos-0.26.0/build/include -I ../../include +LD_FLAGS=-L/home/aaron/.local/lib -L /home/aaron/Projects/incubator-singa/.libs -L/home/aaron/Softs/mesos-0.26.0/build/lib -L/home/aaron/Softs/hadoop-2.7.2/lib/native $(JAVA_HOME)/jre/lib/amd64/server/libjvm.so -lmesos -lsinga -lhdfs -lprotobuf -lglog EXE=scheduler OBJS=singa_scheduler.o scheduler.pb.o PROTOS=scheduler.proto @@ -11,7 +11,7 @@ CXX=g++ all: $(PROTO_HDRS) $(EXE) $(PROTO_SRCS) $(PROTO_HDRS): $(PROTOS) - protoc --cpp_out=. $(PROTOS) + /home/aaron/.local/bin/protoc --cpp_out=. $(PROTOS) $(EXE): $(OBJS) $(CXX) -o $@ $(OBJS) $(LD_FLAGS) diff --git a/tool/mesos/scheduler.conf b/tool/mesos/scheduler.conf index 1ab5fbac4b..aa6096b568 100644 --- a/tool/mesos/scheduler.conf +++ b/tool/mesos/scheduler.conf @@ -1,3 +1,3 @@ -namenode: "node0:9000" -master: "node0:5050" -job_counter: 5 +namenode: "localhost:9000" +master: "localhost:5050" +job_counter: 41 diff --git a/tool/mesos/singa_scheduler.cc b/tool/mesos/singa_scheduler.cc index 408b6096c9..f6ef97e451 100644 --- a/tool/mesos/singa_scheduler.cc +++ b/tool/mesos/singa_scheduler.cc @@ -93,7 +93,7 @@ class SingaScheduler: public mesos::Scheduler { if (hdfsExists(hdfs_handle_, SINGA_CONFIG) != 0) LOG(ERROR) << SINGA_CONFIG << " is not found on HDFS. 
Please use -singa_conf flag to upload the file"; } else { - LOG(ERROR) << "Failed to connect to HDFS"; + LOG(ERROR) << "Failed to connect to HDFS"<< namenode; } ReadProtoFromTextFile(job_conf_file_.c_str(), &job_conf_); } @@ -110,7 +110,7 @@ class SingaScheduler: public mesos::Scheduler { : job_conf_file_(job_conf_file), nhosts_(0), namenode_(namenode), is_running_(false), job_counter_(jc), task_counter_(0) { hdfs_handle_ = hdfs_connect(namenode); if (!hdfs_handle_ || !hdfs_overwrite(hdfs_handle_, SINGA_CONFIG, singa_conf)) - LOG(ERROR) << "Failed to connect to HDFS"; + LOG(ERROR) << "Failed to connect to HDFS"<< namenode; ReadProtoFromTextFile(job_conf_file_.c_str(), &job_conf_); } @@ -192,6 +192,7 @@ class SingaScheduler: public mesos::Scheduler { // write job_conf_file_ to /singa/job_id/job.conf char path[512]; snprintf(path, 512, "/singa/%d/job.conf", job_counter_); + LOG(INFO) << path; [...] + print ' Step {:>3}: '.format(i+1 + dataset_id*(x.shape[0]/batchsize)), + loss.display() + loss.ComputeGradient(i+1, sgd) diff --git a/tool/python/examples/train_mnist.py b/tool/python/examples/train_mnist.py new file mode 100755 index 0000000000..ed6bb63fc4 --- /dev/null +++ b/tool/python/examples/train_mnist.py @@ -0,0 +1,106 @@ +#!/usr/bin/env python + +#/************************************************************ +#* +#* Licensed to the Apache Software Foundation (ASF) under one +#* or more contributor license agreements. See the NOTICE file +#* distributed with this work for additional information +#* regarding copyright ownership. The ASF licenses this file +#* to you under the Apache License, Version 2.0 (the +#* "License"); you may not use this file except in compliance +#* with the License. You may obtain a copy of the License at +#* +#* http://www.apache.org/licenses/LICENSE-2.0 +#* +#* Unless required by applicable law or agreed to in writing, +#* software distributed under the License is distributed on an +#* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +#* KIND, either express or implied. See the License for the +#* specific language governing permissions and limitations +#* under the License.
+#* +#*************************************************************/ + +import os, sys, string +import numpy as np + +current_path_ = os.path.dirname(__file__) +singa_root_=os.path.abspath(os.path.join(current_path_,'../../..')) +sys.path.append(os.path.join(singa_root_,'tool','python')) + +from singa.driver import Driver +from singa.layer import * +from singa.model import * +from singa.utils.utility import swap32 +from google.protobuf.text_format import Merge + +''' +Example of MLP with MNIST dataset +''' + +def load_dataset(): + ''' + train-images: 4 int32 headers & int8 pixels + train-labels: 2 int32 headers & int8 labels + ''' + print '[Loading MNIST dataset]' + fname_train_data = "examples/mnist/train-images-idx3-ubyte" + fname_train_label = "examples/mnist/train-labels-idx1-ubyte" + info = swap32(np.fromfile(fname_train_data, dtype=np.uint32, count=4)) + nb_samples = info[1] + shape = (info[2],info[3]) + + x = np.fromfile(fname_train_data, dtype=np.uint8) + x = x[4*4:] # skip header + x = x.reshape(nb_samples, shape[0]*shape[1]) + print ' data x:', x.shape + y = np.fromfile(fname_train_label, dtype=np.uint8) + y = y[4*2:] # skip header + y = y.reshape(nb_samples, 1) + print ' label y:', y.shape + return x, y + +#------------------------------------------------------------------- +print '[Layer registration/declaration]' +d = Driver() +d.Init(sys.argv) + +input = Dummy() +label = Dummy() + +nn = [] +nn.append(input) +nn.append(Dense(2500, init='uniform')) +nn.append(Activation('stanh')) +nn.append(Dense(2000, init='uniform')) +nn.append(Activation('stanh')) +nn.append(Dense(1500, init='uniform')) +nn.append(Activation('stanh')) +nn.append(Dense(1000, init='uniform')) +nn.append(Activation('stanh')) +nn.append(Dense(500, init='uniform')) +nn.append(Activation('stanh')) +nn.append(Dense(10, init='uniform')) +loss = Loss('softmaxloss') + +# updater +sgd = SGD(lr=0.001, lr_type='step') + +#------------------------------------------------------------------- +print '[Start training]' +batchsize = 64 +disp_freq = 10 + +x, y = load_dataset() + +for i in range(x.shape[0] / batchsize): + xb, yb = x[i*batchsize:(i+1)*batchsize,:], y[i*batchsize:(i+1)*batchsize,:] + nn[0].Feed(xb) + label.Feed(yb, is_label=1) + for h in range(1, len(nn)): + nn[h].ComputeFeature(nn[h-1]) + loss.ComputeFeature(nn[-1], label) + if (i+1)%disp_freq == 0: + print ' Step {:>3}: '.format(i+1), + loss.display() + loss.ComputeGradient(i+1, sgd) diff --git a/tool/python/examples/user1-cifar10/main.py b/tool/python/examples/user1-cifar10/main.py new file mode 100755 index 0000000000..882a3b1591 --- /dev/null +++ b/tool/python/examples/user1-cifar10/main.py @@ -0,0 +1,181 @@ +#!/usr/bin/env python + +#/************************************************************ +#* +#* Licensed to the Apache Software Foundation (ASF) under one +#* or more contributor license agreements. See the NOTICE file +#* distributed with this work for additional information +#* regarding copyright ownership. The ASF licenses this file +#* to you under the Apache License, Version 2.0 (the +#* "License"); you may not use this file except in compliance +#* with the License. You may obtain a copy of the License at +#* +#* http://www.apache.org/licenses/LICENSE-2.0 +#* +#* Unless required by applicable law or agreed to in writing, +#* software distributed under the License is distributed on an +#* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +#* KIND, either express or implied. 
See the License for the +#* specific language governing permissions and limitations +#* under the License. +#* +#*************************************************************/ + +import os, sys +import numpy as np + +current_path_ = os.path.dirname(__file__) +singa_root_=os.path.abspath(os.path.join(current_path_,'../../../..')) +sys.path.append(os.path.join(singa_root_,'tool','python')) + +from model2 import neuralnet, updater +from singa.driver import Driver +from singa.layer import * +from singa.model import save_model_parameter, load_model_parameter +from singa.utils.utility import swap32 + +from PIL import Image +import glob,random, shutil, time +from flask import Flask, request, redirect, url_for +from singa.utils import kvstore, imgtool +app = Flask(__name__) + +''' +Example of CNN with cifar10 dataset +''' +def train(batchsize,disp_freq,check_freq,train_step,workspace,checkpoint=None): + print '[Layer registration/declaration]' + # TODO change layer registration methods + d = Driver() + d.Init(sys.argv) + + print '[Start training]' + + #if need to load checkpoint + if checkpoint: + load_model_parameter(workspace+checkpoint, neuralnet, batchsize) + + for i in range(0,train_step): + + for h in range(len(neuralnet)): + #Fetch data for input layer + if neuralnet[h].layer.type==kDummy: + neuralnet[h].FetchData(batchsize) + else: + neuralnet[h].ComputeFeature() + + neuralnet[-1].ComputeGradient(i+1, updater) + + if (i+1)%disp_freq == 0: + print ' Step {:>3}: '.format(i+1), + neuralnet[h].display() + + if (i+1)%check_freq == 0: + save_model_parameter(i+1, workspace, neuralnet) + + + print '[Finish training]' + + +def product(workspace,checkpoint): + + print '[Layer registration/declaration]' + # TODO change layer registration methods + d = Driver() + d.Init(sys.argv) + + load_model_parameter(workspace+checkpoint, neuralnet,1) + + app.debug = True + app.run(host='0.0.0.0', port=80) + + +@app.route("/") +def index(): + return "Hello World! This is SINGA DLAAS! Please send post request with image=file to '/predict' " + +def allowed_file(filename): + allowd_extensions_ = set(['txt', 'pdf', 'png', 'jpg', 'jpeg', 'gif']) + return '.' 
in filename and \ + filename.rsplit('.', 1)[1] in allowd_extensions_ + +@app.route('/predict', methods=['POST']) +def predict(): + size_=(32,32) + pixel_length_=3*size_[0]*size_[1] + label_num_=10 + if request.method == 'POST': + file = request.files['image'] + if file and allowed_file(file.filename): + im = Image.open(file).convert("RGB") + im = imgtool.resize_to_center(im,size_) + pixel = floatVector(pixel_length_) + byteArray = imgtool.toBin(im,size_) + data = np.frombuffer(byteArray, dtype=np.uint8) + data = data.reshape(1, pixel_length_) + #dummy data Layer + shape = intVector(4) + shape[0]=1 + shape[1]=3 + shape[2]=size_[0] + shape[3]=size_[1] + + for h in range(len(neuralnet)): + #Fetch data for input layer + if neuralnet[h].is_datalayer: + if not neuralnet[h].is_label: + neuralnet[h].Feed(data,3) + else: + neuralnet[h].FetchData(1) + else: + neuralnet[h].ComputeFeature() + + #get result + #data = neuralnet[-1].get_singalayer().data(neuralnet[-1].get_singalayer()) + #prop =floatArray_frompointer(data.mutable_cpu_data()) + prop = neuralnet[-1].GetData() + print prop + result=[] + for i in range(label_num_): + result.append((i,prop[i])) + + result.sort(key=lambda tup: tup[1], reverse=True) + print result + response="" + for r in result: + response+=str(r[0])+":"+str(r[1]) + + return response + return "error" + + +if __name__=='__main__': + + if sys.argv[1]=="train": + if len(sys.argv) < 6: + print "argv should be more than 6" + exit() + if len(sys.argv) > 6: + checkpoint = sys.argv[6] + else: + checkpoint = None + #training + train( + batchsize = int(sys.argv[2]), + disp_freq = int(sys.argv[3]), + check_freq = int(sys.argv[4]), + train_step = int(sys.argv[5]), + workspace = '/workspace', + checkpoint = checkpoint, + ) + else: + if len(sys.argv) < 4: + print "argv should be more than 3" + exit() + checkpoint = sys.argv[2] + port = sys.argv[3] + product( + workspace = '/workspace', + checkpoint = checkpoint + ) + diff --git a/tool/python/examples/user1-cifar10/model.py b/tool/python/examples/user1-cifar10/model.py new file mode 100755 index 0000000000..67c492e391 --- /dev/null +++ b/tool/python/examples/user1-cifar10/model.py @@ -0,0 +1,55 @@ +#!/usr/bin/env python + +#/************************************************************ +#* +#* Licensed to the Apache Software Foundation (ASF) under one +#* or more contributor license agreements. See the NOTICE file +#* distributed with this work for additional information +#* regarding copyright ownership. The ASF licenses this file +#* to you under the Apache License, Version 2.0 (the +#* "License"); you may not use this file except in compliance +#* with the License. You may obtain a copy of the License at +#* +#* http://www.apache.org/licenses/LICENSE-2.0 +#* +#* Unless required by applicable law or agreed to in writing, +#* software distributed under the License is distributed on an +#* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +#* KIND, either express or implied. See the License for the +#* specific language governing permissions and limitations +#* under the License. 
+#* +#*************************************************************/ + +import os, sys +import numpy as np + +current_path_ = os.path.dirname(__file__) +singa_root_=os.path.abspath(os.path.join(current_path_,'../../../..')) +sys.path.append(os.path.join(singa_root_,'tool','python')) + +from singa.driver import Driver +from singa.layer import * +from singa.model import * +from singa.utils.utility import swap32 + +data1=Dummy(shape=[50000,3,32,32],path="/workspace/data/train.bin",dtype='byte',src=[]) +data2=Dummy(shape=[50000,1],path="/workspace/data/train.label.bin",dtype='int',src=[]) +c1=Convolution2D(32, 5, 1, 2, w_std=0.0001, b_lr=2,src=[data1]) +p1=MaxPooling2D(pool_size=(3,3), stride=2,src=[c1]) +a1=Activation('relu',src=[p1]) +l1=LRN2D(3, alpha=0.00005, beta=0.75,src=[a1]) +c2=Convolution2D(32, 5, 1, 2, b_lr=2,src=[l1]) +a2=Activation('relu',src=[c2]) +p2=AvgPooling2D(pool_size=(3,3), stride=2,src=[a2]) +l2=LRN2D(3, alpha=0.00005, beta=0.75,src=[p2]) +c3=Convolution2D(64, 5, 1, 2,src=[l2]) +a3=Activation('relu',src=[c3]) +p3=AvgPooling2D(pool_size=(3,3), stride=2,src=[a3]) +d=Dense(10, w_wd=250, b_lr=2, b_wd=0,src=[p3]) +loss=Loss('softmaxloss',src=[d,data2]) + +neuralnet = [data1, data2, c1, p1, a1, l1, c2, a2, p2, l2, c3, a3, p3, d, loss] + +#algorithm +updater = SGD(decay=0.004, momentum=0.9, lr_type='manual', step=(0,60000,65000), step_lr=(0.001,0.0001,0.00001)) \ No newline at end of file diff --git a/tool/python/examples/user2-mnist/main.py b/tool/python/examples/user2-mnist/main.py new file mode 100755 index 0000000000..006d7d758f --- /dev/null +++ b/tool/python/examples/user2-mnist/main.py @@ -0,0 +1,120 @@ +#!/usr/bin/env python + +#/************************************************************ +#* +#* Licensed to the Apache Software Foundation (ASF) under one +#* or more contributor license agreements. See the NOTICE file +#* distributed with this work for additional information +#* regarding copyright ownership. The ASF licenses this file +#* to you under the Apache License, Version 2.0 (the +#* "License"); you may not use this file except in compliance +#* with the License. You may obtain a copy of the License at +#* +#* http://www.apache.org/licenses/LICENSE-2.0 +#* +#* Unless required by applicable law or agreed to in writing, +#* software distributed under the License is distributed on an +#* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +#* KIND, either express or implied. See the License for the +#* specific language governing permissions and limitations +#* under the License. 
+#* +#*************************************************************/ + +import os, sys +import numpy as np + +current_path_ = os.path.dirname(__file__) +singa_root_=os.path.abspath(os.path.join(current_path_,'../../../..')) +sys.path.append(os.path.join(singa_root_,'tool','python')) + +from model import neuralnet, loss, updater +from singa.driver import Driver +from singa.layer import * +from singa.model import save_model_parameter, load_model_parameter +from singa.utils.utility import swap32 + +#import md5 +#modelkey = md5.new('cifar10').hexdigest() + +''' +Example of MLP with mnist dataset +''' + +def load_dataset(): + ''' + train-images: 4 int32 headers & int8 pixels + train-labels: 2 int32 headers & int8 labels + ''' + print '[Loading MNIST dataset]' + fname_train_data = "examples/mnist/train-images-idx3-ubyte" + fname_train_label = "examples/mnist/train-labels-idx1-ubyte" + info = swap32(np.fromfile(fname_train_data, dtype=np.uint32, count=4)) + nb_samples = info[1] + shape = (info[2],info[3]) + + x = np.fromfile(fname_train_data, dtype=np.uint8) + x = x[4*4:] # skip header + x = x.reshape(nb_samples, shape[0]*shape[1]) + print ' data x:', x.shape + y = np.fromfile(fname_train_label, dtype=np.uint8) + y = y[4*2:] # skip header + y = y.reshape(nb_samples, 1) + print ' label y:', y.shape + + return x, y + +#------------------------------------------------------------------- +print '[Layer registration/declaration]' +d = Driver() +d.Init(sys.argv) + +#------------------------------------------------------------------- +print '[Start training]' +batchsize = 64 +disp_freq = 10 + +workspace = 'tool/python/examples/user2-mnist/' +checkpoint = 'step30-worker0' + +label = Dummy() +x, y = load_dataset() + +imgsize = 28 +nb_channel = 1 +data_shape = [batchsize, nb_channel, imgsize, imgsize] +load_model_parameter(workspace+checkpoint, neuralnet, data_shape) + +#for i in range(x.shape[0] / batchsize): +for i in range(30): + xb, yb = x[i*batchsize:(i+1)*batchsize,:], y[i*batchsize:(i+1)*batchsize,:] + + neuralnet[0].Feed(xb) + label.Feed(yb, 1, 1) + + for h in range(1, len(neuralnet)): + neuralnet[h].ComputeFeature(neuralnet[h-1]) + loss.ComputeFeature(neuralnet[-1], label) + if (i+1)%disp_freq == 0: + print ' Step {:>3}: '.format(i+1), + loss.display() + loss.ComputeGradient(i+1, updater) + +step_trained = 30 +save_model_parameter(step_trained, workspace, neuralnet) + +#--- test of loading +load_model_parameter(workspace+checkpoint, neuralnet) +for i in range(30, 50): + xb, yb = x[i*batchsize:(i+1)*batchsize,:], y[i*batchsize:(i+1)*batchsize,:] + + neuralnet[0].Feed(xb) + label.Feed(yb, 1, 1) + + for h in range(1, len(neuralnet)): + neuralnet[h].ComputeFeature(neuralnet[h-1]) + loss.ComputeFeature(neuralnet[-1], label) + if (i+1)%disp_freq == 0: + print ' Step {:>3}: '.format(i+1), + loss.display() + loss.ComputeGradient(i+1, updater) diff --git a/tool/python/examples/user2-mnist/model.py b/tool/python/examples/user2-mnist/model.py new file mode 100755 index 0000000000..4d5453b43b --- /dev/null +++ b/tool/python/examples/user2-mnist/model.py @@ -0,0 +1,51 @@ +#!/usr/bin/env python + +#/************************************************************ +#* +#* Licensed to the Apache Software Foundation (ASF) under one +#* or more contributor license agreements. See the NOTICE file +#* distributed with this work for additional information +#* regarding copyright ownership. 
The ASF licenses this file +#* to you under the Apache License, Version 2.0 (the +#* "License"); you may not use this file except in compliance +#* with the License. You may obtain a copy of the License at +#* +#* http://www.apache.org/licenses/LICENSE-2.0 +#* +#* Unless required by applicable law or agreed to in writing, +#* software distributed under the License is distributed on an +#* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +#* KIND, either express or implied. See the License for the +#* specific language governing permissions and limitations +#* under the License. +#* +#*************************************************************/ + +import os, sys +import numpy as np + +current_path_ = os.path.dirname(__file__) +singa_root_=os.path.abspath(os.path.join(current_path_,'../../../..')) +sys.path.append(os.path.join(singa_root_,'tool','python')) + +from singa.layer import * +from singa.model import * + +input = Dummy() + +neuralnet = [] # neural net (hidden layers) +neuralnet.append(input) +neuralnet.append(Dense(2500, init='uniform')) +neuralnet.append(Activation('stanh')) +neuralnet.append(Dense(2000, init='uniform')) +neuralnet.append(Activation('stanh')) +neuralnet.append(Dense(1500, init='uniform')) +neuralnet.append(Activation('stanh')) +neuralnet.append(Dense(1000, init='uniform')) +neuralnet.append(Activation('stanh')) +neuralnet.append(Dense(500, init='uniform')) +neuralnet.append(Activation('stanh')) +neuralnet.append(Dense(10, init='uniform')) +loss = Loss('softmaxloss') + +updater = SGD(lr=0.001, lr_type='step') diff --git a/tool/python/singa/driver.i b/tool/python/singa/driver.i index f756d574f4..c4f3f0635f 100644 --- a/tool/python/singa/driver.i +++ b/tool/python/singa/driver.i @@ -25,19 +25,105 @@ %include "std_vector.i" %include "std_string.i" %include "argcargv.i" +%include "carrays.i" +%array_class(float, floatArray); + %apply (int ARGC, char **ARGV) { (int argc, char **argv) } %{ #include "singa/driver.h" +#include "singa/worker.h" +#include "singa/neuralnet/neuralnet.h" +#include "singa/neuralnet/layer.h" +#include "singa/neuralnet/input_layer.h" +#include "singa/neuralnet/neuron_layer.h" +#include "singa/neuralnet/loss_layer.h" +#include "singa/utils/blob.h" +#include "singa/utils/param.h" +#include "singa/utils/updater.h" +#include "singa/proto/job.pb.h" +#include "singa/proto/common.pb.h" %} -namespace singa{ -using std::vector; -class Driver{ -public: -void Train(bool resume, const std::string job_conf); -void Init(int argc, char **argv); -void InitLog(char* arg); -void Test(const std::string job_conf); -}; +namespace std { + %template(strVector) vector<string>; + %template(intVector) vector<int>; + %template(floatVector) vector<float>; + %template(layerVector) vector<singa::Layer*>; + %template(paramVector) vector<singa::Param*>; } namespace singa{ + class Driver{ + public: + void Train(bool resume, const std::string job_conf); + void Init(int argc, char **argv); + void InitLog(char* arg); + void Test(const std::string job_conf); + }; + + class NeuralNet{ + public: + static NeuralNet* CreateNeuralNet(const std::string str); + void Load(const std::vector<std::string>& paths); + inline const std::vector<singa::Layer*>& layers(); + inline const std::vector<singa::Layer*>& srclayers(const singa::Layer* layer); + }; + + %nodefault Worker; + class Worker{ + public: + static singa::Worker* CreateWorker(const std::string str); + void InitNetParams(const std::string& folder, std::vector<singa::Layer*> net); + void Checkpoint(int step, const std::string& folder, std::vector<singa::Layer*> net); + }; + + class DummyLayer{ + public: + /* void Setup(const singa::LayerProto& proto,
const std::vector<singa::Layer*>& srclayers); + */ + void Setup(const std::string str, const std::vector<singa::Layer*>& srclayers); + void Feed(std::vector<int> shape, std::vector<float>* data, int op); + singa::Layer* ToLayer(); + }; + + %nodefault Layer; + class Layer{ + public: + static singa::Layer* CreateLayer(const std::string str); + static void SetupLayer(singa::Layer* layer, const std::string str, const std::vector<singa::Layer*>& srclayers); + virtual void ComputeFeature(int flag, const std::vector<singa::Layer*>& srclayers); + virtual void ComputeGradient(int flag, const std::vector<singa::Layer*>& srclayers); + virtual const singa::Blob<float>& data(const singa::Layer* from); + virtual const std::vector<singa::Param*> GetParams(); + virtual const std::string ToString(bool debug, int flag); + void SetParams(std::vector<singa::Param*> params); + }; + + %nodefault Updater; + class Updater{ + public: + static singa::Updater* CreateUpdater(const std::string str); + virtual void Update(int step, singa::Param* param, float grad_scale); + }; + + template <typename Dtype> + class Blob{ + public: + inline int count(); + inline const std::vector<int>& shape(); + inline Dtype* mutable_cpu_data(); + inline const Dtype* cpu_data(); + }; + + class Param{ + public: + inline int size(); + inline const std::vector<int>& shape(); + inline float* mutable_cpu_data(); + void FromProto(const std::string str); + /*void ToProto(singa::BlobProto* blob); + */ + }; + + %template(floatBlob) Blob<float>; } diff --git a/tool/python/singa/layer.py b/tool/python/singa/layer.py index f838e45852..8a0750ab45 100644 --- a/tool/python/singa/layer.py +++ b/tool/python/singa/layer.py @@ -25,15 +25,21 @@ This script includes Layer class and its subclasses that users can configure different types of layers for their model. ''' - +import numpy from singa.parameter import Parameter, set_param_field from singa.initializations import get_init_values from singa.utils.utility import setval, generate_name from singa.utils.message import * from google.protobuf import text_format +from singa.driver import Layer as SingaLayer, Updater as SingaUpdater,\ + intVector, floatVector, layerVector,\ + paramVector, floatArray_frompointer, DummyLayer + class Layer(object): + singaupdater = None + def __init__(self, **kwargs): ''' **kwargs (KEY=VALUE) @@ -41,12 +47,389 @@ def __init__(self, **kwargs): ''' self.layer = Message('Layer', **kwargs).proto - # required + # required field if not 'name' in kwargs: setval(self.layer, name=generate_name('layer', 1)) - # srclayers are set in Model.build() + # layer connectivity is set in Model.build() self.is_datalayer = False + self.singalayer = None + self.srclayers = [] + + + # set src for Rafiki + if 'src' in kwargs: + self.src = kwargs['src'] + else: + self.src = None + + def setup(self, srclys): + # create singa::Layer and store srclayers + if self.singalayer == None: + self.singalayer = SingaLayer.CreateLayer(self.layer.SerializeToString()) + self.singaSrclayerVector = layerVector(len(srclys)) + for i in range(len(srclys)): + self.srclayers.append(srclys[i]) + self.singaSrclayerVector[i] = srclys[i].get_singalayer() + # set up the layer + SingaLayer.SetupLayer(self.singalayer, self.layer.SerializeToString(), self.singaSrclayerVector) + + def ComputeFeature(self, *srclys): + ''' The method creates and sets up singa::Layer, + maintains its source layers, + and then calls ComputeFeature for data transformation.
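+ For example, a driving loop chains layers in order, as train_mnist.py above does: nn[h].ComputeFeature(nn[h-1]).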
+ + *srclys = (list) // a list of source layers + ''' + + # create singa::Layer and store srclayers + if self.singalayer == None: + if self.src != None: + srclys = self.src + self.singalayer = SingaLayer.CreateLayer(self.layer.SerializeToString()) + self.singaSrclayerVector = layerVector(len(srclys)) + for i in range(len(srclys)): + self.srclayers.append(srclys[i]) + self.singaSrclayerVector[i] = srclys[i].get_singalayer() + # set up the layer + SingaLayer.SetupLayer(self.singalayer, self.layer.SerializeToString(), self.singaSrclayerVector) + + self.singalayer.ComputeFeature(1, self.singaSrclayerVector) + + def ComputeGradient(self, step, upd=None): + ''' The method creates singa::Updater + and calls ComputeGradient for gradient computation + then updates the parameters. + + step = (int) // a training step + upd = (object) // Updater object + ''' + + # create singa::Updater + assert upd != None, 'required Updater (see model.py)' + if Layer.singaupdater == None: + Layer.singaupdater = SingaUpdater.CreateUpdater(upd.proto.SerializeToString()) + + # call ComputeGradient of Singa + self.singalayer.ComputeGradient(1, self.singaSrclayerVector) + + # update parameters + singaParams = self.singalayer.GetParams() + for p in singaParams: + Layer.singaupdater.Update(step, p, 1.0) + + # recursively call ComputeGradient of srclayers + #(TODO) what if there are multiple source layers??? + for sly in self.srclayers: + if sly.srclayers != None: + sly.ComputeGradient(step, upd) + + def GetParams(self): + ''' The method gets parameter values + singaParams[0] for weight + singaParams[1] for bias + ''' + singaParams = self.singalayer.GetParams() + assert len(singaParams) == 2, 'weight and bias' + # for weight + weight_array = floatArray_frompointer(singaParams[0].mutable_cpu_data()) + weight = [ weight_array[i] for i in range(singaParams[0].size()) ] + weight = numpy.array(weight).reshape(singaParams[0].shape()) + # for bias + bias_array = floatArray_frompointer(singaParams[1].mutable_cpu_data()) + bias = [ bias_array[i] for i in range(singaParams[1].size()) ] + bias = numpy.array(bias).reshape(singaParams[1].shape()[0], 1) + + return weight, bias + + def SetParams(self, *params): + ''' The method sets parameter values + params[0] for weight + params[1] for bias + ''' + singaParams = self.singalayer.GetParams() + import pb2.common_pb2 as cm + for k in range(len(params)): + bp = cm.BlobProto() + bp.shape.append(int(params[k].shape[0])) + bp.shape.append(int(params[k].shape[1])) + for i in range(params[k].shape[0]): + for j in range(params[k].shape[1]): + bp.data.append(params[k][i,j]) + singaParams[k].FromProto(bp.SerializeToString()) + + def GetData(self): + blobptr = self.singalayer.data(self.singalayer) + data_array = floatArray_frompointer(blobptr.mutable_cpu_data()) + data = [ data_array[i] for i in range(blobptr.count()) ] + return data + + def display(self): + debug, flag = 0, 0 + print self.singalayer.ToString(debug, flag) + + def get_singalayer(self): + return self.singalayer + + +class Dummy(object): + + def __init__(self, shape=[], path='', dtype='', src=[]): + ''' Dummy layer is used for data layer + shape = (list) // [# of samples, # of channels, img h, img w] + path = (string) // path to dataset + ''' + self.is_datalayer = True + self.srclayers = None + self.singalayer = None + + # create layer proto for Dummy layer + kwargs = {'name':'dummy', 'type':kDummy} + self.layer = Message('Layer', **kwargs).proto + + + # if dataset path is not specified, skip + # otherwise, load dataset + if path == '': 
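+ # Without a path, the Dummy layer is fed from outside through Feed() each step (as in train_mnist.py above); + # with a path, it preloads the whole file below and serves mini-batches itself via FetchData().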
+ return + + self.shape = shape + self.path = path + self.src = None + self.batch_index = 0 + + import numpy as np + nb_samples = shape[0] + nb_pixels = shape[1] + for i in range(len(shape)-2): + nb_pixels *= shape[i+2] + if dtype=='byte': + self.is_label = 0 + d = np.fromfile(path, dtype=np.uint8) + elif dtype=='int': + self.is_label = 1 + d = np.fromfile(path, dtype=np.int) + self.data = d.reshape(nb_samples, nb_pixels) + + + def setup(self, data_shape): + ''' Create and Setup singa Dummy layer + called by load_model_parameter + ''' + if self.singalayer == None: + setval(self.layer.dummy_conf, input=True) + setval(self.layer.dummy_conf, shape=data_shape) + self.singalayer = DummyLayer() + self.singalayer.Setup(self.layer.SerializeToString(), layerVector(0)) + + + def FetchData(self, batchsize): + + d = self.data[self.batch_index*batchsize:(self.batch_index+1)*batchsize, :] + self.Feed(d, self.shape[1], self.is_label) + self.batch_index += 1 + + + def Feed(self, data, nb_channel=1, is_label=0): + ''' Create and Setup singa::DummyLayer for input data + Insert data using Feed() + ''' + + batchsize, hdim = data.shape + datasize = batchsize * hdim + imgsize = int(numpy.sqrt(hdim/nb_channel)) + shapeVector = [batchsize, nb_channel, imgsize, imgsize] + + # create and setup the dummy layer + if self.singalayer == None: + setval(self.layer.dummy_conf, input=True) + setval(self.layer.dummy_conf, shape=shapeVector) + self.singalayer = DummyLayer() + self.singalayer.Setup(self.layer.SerializeToString(), layerVector(0)) + + # feed input data + data = data.astype(numpy.float) + dataVector = floatVector(datasize) + k = 0 + for i in range(batchsize): + for j in range(hdim): + dataVector[k] = data[i,j] + k += 1 + self.singalayer.Feed(shapeVector, dataVector, is_label) + + def get_singalayer(self): + return self.singalayer.ToLayer() + +class ImageData(object): + + def __init__(self, shape=[], data_path='', data_type='byte',mean_path='',mean_type='float'): + ''' Dummy layer is used for data layer + shape = (list) // [# of samples, # of channels, img h, img w] + data_path = (string) // path to dataset + mean_path + ''' + self.is_datalayer = True + self.srclayers = None + self.singalayer = None + self.is_label = False + # create layer proto for Dummy layer + kwargs = {'name':'dummy', 'type':kDummy} + self.layer = Message('Layer', **kwargs).proto + + # if dataset path is not specified, skip + # otherwise, load dataset + if data_path == '' or mean_path=='': + return + + self.shape = shape + self.data_path = data_path + self.mean_path = mean_path + self.src = None + self.batch_index = 0 + + import numpy as np + nb_samples = shape[0] + nb_pixels = shape[1] + for i in range(len(shape)-2): + nb_pixels *= shape[i+2] + + if data_type=='byte': + d = np.fromfile(data_path, dtype=np.uint8) + elif data_type=='int': + d = np.fromfile(data_path, dtype=np.int) + self.data = d.reshape(nb_samples, nb_pixels) + + if mean_type=='float': + d = np.fromfile(mean_path, dtype=np.float32) + self.mean = d.reshape(1, nb_pixels) + + def setup(self, data_shape): + ''' Create and Setup singa Dummy layer + called by load_model_parameter + ''' + if self.singalayer == None: + setval(self.layer.dummy_conf, input=True) + setval(self.layer.dummy_conf, shape=data_shape) + self.singalayer = DummyLayer() + self.singalayer.Setup(self.layer.SerializeToString(), layerVector(0)) + + + def FetchData(self, batchsize): + + d = self.data[self.batch_index*batchsize:(self.batch_index+1)*batchsize, :] + self.Feed(d, self.shape[1]) + self.batch_index += 1 + 
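+ # Wrap back to the first mini-batch once the preloaded array is exhausted, so training can run for more steps than one pass over the data: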
if (self.batch_index+1)*batchsize>self.data.shape[0]: + self.batch_index=0 + + + + def Feed(self, data, nb_channel=1): + ''' Create and Setup singa::DummyLayer for input data + Insert data using Feed() + Need to minus the mean file + ''' + batchsize, hdim = data.shape + datasize = batchsize * hdim + imgsize = int(numpy.sqrt(hdim/nb_channel)) + shapeVector = [batchsize, nb_channel, imgsize, imgsize] + #print shapeVector + # create and setup the dummy layer + if self.singalayer == None: + setval(self.layer.dummy_conf, input=True) + setval(self.layer.dummy_conf, shape=shapeVector) + self.singalayer = DummyLayer() + self.singalayer.Setup(self.layer.SerializeToString(), layerVector(0)) + + # feed input data and minus mean + data = data.astype(numpy.float) + dataVector = floatVector(datasize) + k = 0 + for i in range(batchsize): + for j in range(hdim): + dataVector[k] = data[i,j]-self.mean[0,j] + k += 1 + self.singalayer.Feed(shapeVector, dataVector, 0) + + def get_singalayer(self): + return self.singalayer.ToLayer() + + +class LabelData(object): + + def __init__(self, shape=[], label_path='', label_type='int'): + ''' Dummy layer is used for label data layer + shape = (list) // [# of samples, # of channels, img h, img w] + data_path = (string) // path to dataset + mean_path + ''' + self.is_datalayer = True + self.srclayers = None + self.singalayer = None + self.is_label = True + # create layer proto for Dummy layer + kwargs = {'name':'dummy', 'type':kDummy} + self.layer = Message('Layer', **kwargs).proto + + # if dataset path is not specified, skip + # otherwise, load dataset + if label_path == '': + return + + self.shape = shape + self.label_path = label_path + self.src = None + self.batch_index = 0 + + import numpy as np + nb_samples = shape[0] + + if label_type=='int': + d = np.fromfile(label_path, dtype=np.int) + self.data = d.reshape(nb_samples, 1) + + def setup(self, data_shape): + ''' Create and Setup singa Dummy layer + called by load_model_parameter + ''' + if self.singalayer == None: + setval(self.layer.dummy_conf, input=True) + setval(self.layer.dummy_conf, shape=data_shape) + self.singalayer = DummyLayer() + self.singalayer.Setup(self.layer.SerializeToString(), layerVector(0)) + + + def FetchData(self, batchsize): + + d = self.data[self.batch_index*batchsize:(self.batch_index+1)*batchsize, :] + self.Feed(d, self.shape[1]) + self.batch_index += 1 + if (self.batch_index+1)*batchsize>self.data.shape[0]: + self.batch_index=0 + + def Feed(self, data,nb_chanel=1): + ''' Create and Setup singa::DummyLayer for input data + Insert data using Feed() + Need to minus the mean file + ''' + batchsize = data.shape[0] + shapeVector = [batchsize, 1] + + # create and setup the dummy layer + if self.singalayer == None: + setval(self.layer.dummy_conf, input=True) + setval(self.layer.dummy_conf, shape=shapeVector) + self.singalayer = DummyLayer() + self.singalayer.Setup(self.layer.SerializeToString(), layerVector(0)) + + data = data.astype(numpy.float) + dataVector = floatVector(batchsize) + for i in range(batchsize): + dataVector[i] = data[i,0] + self.singalayer.Feed(shapeVector, dataVector, 1) + + def get_singalayer(self): + return self.singalayer.ToLayer() class Data(Layer): @@ -66,11 +449,11 @@ def __init__(self, load, phase='train', checkpoint=None, assert load != None, 'data type should be specified' if load == 'kData': super(Data, self).__init__(name=generate_name('data'), - user_type=load) + user_type=load, **kwargs) else: self.layer_type = enumLayerType(load) super(Data, 
self).__init__(name=generate_name('data'), - type=self.layer_type) + type=self.layer_type, **kwargs) self.is_datalayer = True # include/exclude @@ -114,7 +497,7 @@ def __init__(self, nb_filter=0, kernel=0, stride=1, pad=0, assert nb_filter > 0, 'nb_filter should be set as positive int' super(Convolution2D, self).__init__(name=generate_name('conv', 1), - type=kCConvolution) + type=kCConvolution, **kwargs) fields = {"num_filters":nb_filter} # for kernel if type(kernel) == int: @@ -155,6 +538,7 @@ def __init__(self, nb_filter=0, kernel=0, stride=1, pad=0, if activation: self.mask = Activation(activation=activation).layer + class MaxPooling2D(Layer): def __init__(self, pool_size=None, @@ -177,7 +561,7 @@ def __init__(self, pool_size=None, 'currently pool size should be square in Singa' super(MaxPooling2D, self).__init__(name=generate_name('pool'), type=kCPooling, **kwargs) - fields = {'pool' : PoolingProto().MAX, + fields = {'pool' : MAX, 'kernel' : pool_size[0], 'stride' : stride, 'pad' : 0 if ignore_border else 1} @@ -203,8 +587,8 @@ def __init__(self, pool_size=None, 'currently pool size should be square in Singa' super(AvgPooling2D, self).__init__(name=generate_name('pool'), type=kCPooling, **kwargs) - self.layer.pooling_conf.pool = PoolingProto().AVG - fields = {'pool' : PoolingProto().AVG, + self.layer.pooling_conf.pool = AVG + fields = {'pool' : AVG, 'kernel' : pool_size[0], 'stride' : stride, 'pad' : 0 if ignore_border else 1} @@ -218,31 +602,54 @@ def __init__(self, size=0, **kwargs): size = (int) // local size ''' - super(LRN2D, self).__init__(name=generate_name('norm'), type=kLRN) + super(LRN2D, self).__init__(name=generate_name('norm'), type=kLRN, **kwargs) # required assert size != 0, 'local size should be set' self.layer.lrn_conf.local_size = size init_values = get_init_values('lrn2d', **kwargs) setval(self.layer.lrn_conf, **init_values) +class Loss(Layer): + + def __init__(self, lossname, topk=1, **kwargs): + ''' + required + lossname = (string) // softmaxloss, euclideanloss + ''' + self.layer_type = enumLayerType(lossname) + super(Loss, self).__init__(name=generate_name(lossname), + type=self.layer_type, **kwargs) + if lossname == 'softmaxloss': + self.layer.softmaxloss_conf.topk = topk class Activation(Layer): - def __init__(self, activation='stanh', topk=1): + def __init__(self, activation='stanh', **kwargs): ''' required - activation = (string) - optional - topk = (int) // the number of results + activation = (string) // relu, sigmoid, tanh, stanh, softmax. ''' + if activation == 'tanh': + print 'Warning: Tanh layer is not supported for CPU' self.name = activation - if activation == 'tanh': activation = 'stanh' # <-- better way to set? 
- self.layer_type = enumLayerType(activation) + self.layer_type = kActivation + if activation == 'stanh': + self.layer_type = kSTanh + elif activation == 'softmax': + self.layer_type = kSoftmax super(Activation, self).__init__(name=generate_name(self.name), - type=self.layer_type) - if activation == 'softmaxloss': - self.layer.softmaxloss_conf.topk = topk + type=self.layer_type, **kwargs) + if activation == 'relu': + self.layer.activation_conf.type = RELU + elif activation == 'sigmoid': + self.layer.activation_conf.type = SIGMOID + elif activation == 'tanh': + self.layer.activation_conf.type = TANH # for GPU + #elif activation == 'stanh': + # self.layer.activation_conf.type = STANH + + class Dropout(Layer): @@ -255,19 +662,19 @@ def __init__(self, ratio=0.5): self.name = 'dropout' self.layer_type = enumLayerType(self.name) super(Dropout, self).__init__(name=generate_name(self.name), - type=self.layer_type) + type=self.layer_type, **kwargs) self.layer.dropout_conf.dropout_ratio = ratio class Accuracy(Layer): - def __init__(self): + def __init__(self, **kwargs): ''' ''' self.name = 'accuracy' self.layer_type = enumLayerType(self.name) super(Accuracy, self).__init__(name=generate_name(self.name), - type=self.layer_type) + type=self.layer_type, **kwargs) class RGB(Layer): @@ -383,7 +790,7 @@ def __init__(self, out_dim=None, w_param=None, b_param=None, self.name = kwargs['name'] if 'name' in kwargs else 'RBMVis' self.layer_type = kwargs['type'] if 'type' in kwargs else kRBMVis super(RBM, self).__init__(name=generate_name(self.name, - withnumber=False), type=self.layer_type) + withnumber=False), type=self.layer_type, **kwargs) setval(self.layer.rbm_conf, hdim=self.out_dim[-1]) if self.layer_type == kRBMHid and sampling != None: if sampling == 'gaussian': diff --git a/tool/python/singa/model.py b/tool/python/singa/model.py index f652f86eed..9f2a768f64 100644 --- a/tool/python/singa/model.py +++ b/tool/python/singa/model.py @@ -107,9 +107,9 @@ def compile(self, optimizer=None, cluster=None, else: # add new layer if loss == 'categorical_crossentropy': - self.add(Activation('softmaxloss', topk=topk)) + self.add(Loss('softmaxloss', topk=topk)) elif loss == 'mean_squared_error': - self.add(Activation('euclideanloss')) + self.add(Loss('euclideanloss')) elif loss == 'user_loss_rnnlm': # user-defined loss layer self.add(UserLossRNNLM(nclass=kwargs['nclass'], vocab_size=kwargs['in_dim'])) @@ -323,17 +323,40 @@ def set_cudnn_layer_type(self, net): elif ly_type == kLRN: cudnn_ly_type = kCudnnLRN elif ly_type == kSoftmax: cudnn_ly_type = kCudnnSoftmax elif ly_type == kSoftmaxLoss: cudnn_ly_type = kCudnnSoftmaxLoss + elif ly_type == kActivation: + cudnn_ly_type = kCudnnActivation elif ly_type == kSTanh: + print 'Error report: STanh layer is not supported for GPU' + ''' + elif ly_type == kReLU: cudnn_ly_type = kCudnnActivation - net.layer[i].activation_conf.type = STANH + net.layer[i].activation_conf.type = RELU elif ly_type == kSigmoid: cudnn_ly_type = kCudnnActivation net.layer[i].activation_conf.type = SIGMOID - elif ly_type == kReLU: + elif ly_type == kTanh: cudnn_ly_type = kCudnnActivation - net.layer[i].activation_conf.type = RELU + net.layer[i].activation_conf.type = TANH + ''' + #elif ly_type == kSTanh: + # print 'Error report: STanh layer is not supported for GPU' + #cudnn_ly_type = kCudnnActivation + #net.layer[i].activation_conf.type = STANH net.layer[i].type = cudnn_ly_type + def show(self): + for ly in self.jobconf.neuralnet.layer: + print ly.name + + def layer_by_id(self, k): + return
self.jobconf.neuralnet.layer[k] + + def layer_by_name(self, name): + for ly in self.layers: + if ly.layer.name == name: + return ly + + def size(self): + return len(self.jobconf.neuralnet.layer) + class Energy(Model): ''' energy model ''' @@ -627,3 +650,45 @@ def SingaRun_script(filename='', execpath=''): #TODO better format to store the result?? return resultDic +def load_model_parameter(fin, neuralnet, batchsize=1, data_shape=None): + hly_idx = 0 + for i in range(len(neuralnet)): + if neuralnet[i].is_datalayer: + if data_shape == None: + shape = neuralnet[i].shape + shape[0] = batchsize + neuralnet[i].setup(shape) + else: + neuralnet[i].setup(data_shape) + else: + hly_idx = i + break + + net = layerVector(len(neuralnet)-hly_idx) + for i in range(hly_idx, len(neuralnet)): + if neuralnet[i].src==None: + neuralnet[i].setup(neuralnet[i-1]) + else: + neuralnet[i].setup(neuralnet[i].src) + net[i-hly_idx] = neuralnet[i].singalayer + + from singa.driver import Worker + alg = Algorithm(type=enumAlgType('bp')).proto + w = Worker.CreateWorker(alg.SerializeToString()) + w.InitNetParams(fin, net) + +def save_model_parameter(step, fout, neuralnet): + + hly_idx = 0 + for i in range(len(neuralnet)): + if not neuralnet[i].is_datalayer: + hly_idx = i + break + + from singa.driver import Worker + net = layerVector(len(neuralnet)-hly_idx) + for i in range(hly_idx, len(neuralnet)): + net[i-hly_idx] = neuralnet[i].singalayer + alg = Algorithm(type=enumAlgType('bp')).proto + w = Worker.CreateWorker(alg.SerializeToString()) + w.Checkpoint(step, fout, net) diff --git a/tool/python/singa/utils/imgtool.py b/tool/python/singa/utils/imgtool.py new file mode 100644 index 0000000000..683e3d3954 --- /dev/null +++ b/tool/python/singa/utils/imgtool.py @@ -0,0 +1,345 @@ +''' +Created on Jan 8, 2016 +@author: aaron +''' +from PIL import Image +import sys, glob, os, random, shutil, time, struct +from . import kvstore + +sys.path.append(os.path.join(os.path.dirname(__file__), '../../pb2')) +from common_pb2 import RecordProto + +#bytearray to image object +def toImg(byteArray,size): + img = Image.new("RGB",size) + pix = img.load() + area = size[0]*size[1] + red = byteArray[:area] + green = byteArray[area:area*2] + blue = byteArray[area*2:] + index=0 + for x in range(0,size[0]): + for y in range(0,size[1]): + img.putpixel((x,y), (red[index],green[index],blue[index])) + index+=1 + return img + +# image object to bytearray +def toBin(im,size): + red = [] + green = [] + blue = [] + pix = im.load() + for x in range(0,size[0]): + for y in range(0,size[1]): + pixel = pix[x,y] + red.append(pixel[0]) + green.append(pixel[1]) + blue.append(pixel[2]) + fileByteArray = bytearray(red+green+blue) + return fileByteArray + +def resize_to_center(im,size): + oldSize = im.size + #crop to the biggest centered square + data=(0,0,0,0) + if oldSize[0] < oldSize[1]: + data= (0,(oldSize[1]-oldSize[0])/2,oldSize[0],(oldSize[1]+oldSize[0])/2) + else : + data= ((oldSize[0]-oldSize[1])/2,0,(oldSize[0]+oldSize[1])/2,oldSize[1]) + newIm = im.transform(size,Image.EXTENT,data) + return newIm +#transform and resize images;
only .jpg files are handled +def transform_img( + input_folder, + output_folder, + size + ): + print "Image transform begins at: "+time.strftime('%X %x %Z') + + #if output_folder exists, empty it, otherwise create a dir + try: + os.stat(output_folder) + for root, dirs, files in os.walk(output_folder): + for f in files: + os.unlink(os.path.join(root, f)) + for d in dirs: + shutil.rmtree(os.path.join(root, d)) + except: + os.makedirs(output_folder) + + count=0 + for root, dirs, files in os.walk(input_folder): + for d in dirs: + print "find dir:", d + os.makedirs(os.path.join(output_folder,d)) + for infile in glob.glob(os.path.join(input_folder,d,"*.jpg")): + fileName = os.path.split(infile)[-1] + name,ext = os.path.splitext(fileName) + im = Image.open(infile).convert("RGB") + newIm=resize_to_center(im,size) + newIm.save(os.path.join(output_folder,d,name+".center.jpg"), "JPEG") + count+=1 + + print "Image transform ends at: "+time.strftime('%X %x %Z') + print "total file number: ", count + + return count + + + +def generate_bin_data( + input_folder, + output_folder, + size , + train_num, + test_num, + validate_num, + meta_file_name="meta.txt", + train_bin_file_name="train.bin", + train_label_bin_file_name="train.label.bin", + test_bin_file_name="test.bin", + test_label_bin_file_name="test.label.bin", + validate_bin_file_name="validate.bin", + validate_label_bin_file_name="validate.label.bin", + mean_bin_file_name="mean.bin", + label_bin_file_name="label.bin", + + ): + try: + os.stat(output_folder) + except: + os.makedirs(output_folder) + + print "Generate bin start at: "+time.strftime('%X %x %Z') + meta_file = open(os.path.join(output_folder,meta_file_name), "w") + + fileList=[] + labelList= [] + label=0 #labels begin from 0 + + #collect all image files; each folder name is the label name + for d in os.listdir(input_folder): + if os.path.isdir(os.path.join(input_folder,d)): + labelList.append((label,d)) + for f in glob.glob(os.path.join(input_folder,d,"*.jpg")): + fileList.append((label,f)) + label += 1 + + # shuffle the files into random order + random.shuffle(fileList) + + total = len(fileList) + print total,train_num,test_num,validate_num + assert total >= train_num+test_num+validate_num + + train_file = open(os.path.join(output_folder,train_bin_file_name),"wb") + train_label_file = open(os.path.join(output_folder,train_label_bin_file_name),"wb") + validate_file = open(os.path.join(output_folder,validate_bin_file_name),"wb") + validate_label_file = open(os.path.join(output_folder,validate_label_bin_file_name),"wb") + test_file = open(os.path.join(output_folder,test_bin_file_name),"wb") + test_label_file = open(os.path.join(output_folder,test_label_bin_file_name),"wb") + mean_file = open(os.path.join(output_folder,mean_bin_file_name),"wb") + + count=0 + trainCount=0 + validateCount=0 + testCount=0 + + # the expected image binary length + binaryLength=3*size[0]*size[1] + + meanData=[] + for i in range(0,binaryLength): + meanData.append(0.0) + + #calculate mean + for (label,f) in fileList: + + count+=1 + im =Image.open(f) + #every image must match the expected size + assert im.size==size + binaryPixel=toBin(im,size) + if count <= train_num : + trainCount+=1 + train_file.write(binaryPixel) + train_label_file.write(kvstore.i2b(label)) + #only the training data contributes to the mean value + for i in range(binaryLength): + meanData[i]+=binaryPixel[i] + elif count <= train_num+validate_num : + validateCount+=1 + validate_label_file.write(kvstore.i2b(label)) + validate_file.write(binaryPixel) + elif count <= train_num+validate_num+test_num: + testCount+=1
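+            # On-disk layout written by this function: each *.bin file holds the images' raw planar RGB bytes back to back, each *.label.bin holds one 8-byte integer per image (kvstore.i2b), and mean.bin holds binaryLength packed floats. + # An illustrative read-back, assuming numpy and 8-byte little-endian labels: labels = np.fromfile(label_path, dtype=np.uint64); images = np.fromfile(bin_path, dtype=np.uint8).reshape(-1, binaryLength)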
+ test_label_file.write(kvstore.i2b(label)) + test_file.write(binaryPixel) + else: + break + + for i in range(binaryLength): + meanData[i]/=trainCount + + meanBinary=struct.pack("%sf" % binaryLength, *meanData) + + mean_file.write(meanBinary) + mean_file.flush() + mean_file.close() + + train_file.flush() + train_file.close() + validate_file.flush() + validate_file.close() + test_file.flush() + test_file.close() + + meta_file.write("image size: "+str(size[0])+"*"+str(size[1])+"\n") + meta_file.write("total file num: "+str(count)+"\n") + meta_file.write("train file num: "+str(trainCount)+"\n") + meta_file.write("validate file num: "+str(validateCount)+"\n") + meta_file.write("test file num: "+str(testCount)+"\n") + meta_file.write("label list:[\n") + + for item in labelList: + meta_file.write("("+str(item[0])+",\""+item[1]+"\"),\n") + meta_file.write("]") + meta_file.flush() + meta_file.close() + + print "end at: "+time.strftime('%X %x %Z') + + return labelList + + +def generate_kvrecord_data( + input_folder, + output_folder, + size , + train_num, + test_num, + validate_num, + meta_file_name="meta.txt", + train_bin_file_name="train.bin", + test_bin_file_name="test.bin", + validate_bin_file_name="validate.bin", + mean_bin_file_name="mean.bin", + + ): + try: + os.stat(output_folder) + except: + os.makedirs(output_folder) + + print "Generate kvrecord start at: "+time.strftime('%X %x %Z') + meta_file = open(os.path.join(output_folder,meta_file_name), "w") + + fileList=[] + labelList= [] + label=0 #labels begin from 0 + + #collect all image files; each folder name is the label name + for d in os.listdir(input_folder): + if os.path.isdir(os.path.join(input_folder,d)): + labelList.append((label,d)) + for f in glob.glob(os.path.join(input_folder,d,"*.jpg")): + fileList.append((label,f)) + label += 1 + + # shuffle the files into random order + random.shuffle(fileList) + + total = len(fileList) + print total,train_num,test_num,validate_num + assert total >= train_num+test_num+validate_num + + + trainStore = kvstore.FileStore() + trainStore.open(os.path.join(output_folder,train_bin_file_name), "create") + validateStore = kvstore.FileStore() + validateStore.open(os.path.join(output_folder,validate_bin_file_name), "create") + testStore = kvstore.FileStore() + testStore.open(os.path.join(output_folder,test_bin_file_name), "create") + + meanStore = kvstore.FileStore() + meanStore.open(os.path.join(output_folder,mean_bin_file_name), "create") + + + count=0 + trainCount=0 + validateCount=0 + testCount=0 + + # the expected image binary length + binaryLength=3*size[0]*size[1] + + meanRecord = RecordProto() + meanRecord.shape.extend([3,size[0],size[1]]) + for i in range(binaryLength): + meanRecord.data.append(0.0) + + for (label,f) in fileList: + + im =Image.open(f) + #every image must match the expected size + assert im.size==size + + binaryContent=str(toBin(im,size)) + + count +=1 + record = RecordProto() + record.shape.extend([3,size[0],size[1]]) + record.label=label + record.pixel=binaryContent + + value = record.SerializeToString() + + if count <= train_num : + key = "%05d" % trainCount + trainCount+=1 + trainStore.write(key,value) + #only the training data contributes to the mean + for i in range(binaryLength): + meanRecord.data[i]+=ord(binaryContent[i]) + elif count <= train_num+validate_num : + key = "%05d" % validateCount + validateCount+=1 + validateStore.write(key,value) + elif count <= train_num+validate_num+test_num: + key = "%05d" % testCount + testCount+=1 + testStore.write(key,value) + else: + break + + for i in range(binaryLength):
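+        # meanRecord.data accumulated per-pixel byte sums over the training images above; dividing by trainCount below yields the float mean image that is stored under the key "mean":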
meanRecord.data[i]/=trainCount + + meanStore.write("mean", meanRecord.SerializeToString()) + meanStore.flush() + meanStore.close() + + trainStore.flush() + trainStore.close() + validateStore.flush() + validateStore.close() + testStore.flush() + testStore.close() + + meta_file.write("image size: "+str(size[0])+"*"+str(size[1])+"\n") + meta_file.write("total file num: "+str(count)+"\n") + meta_file.write("train file num: "+str(trainCount)+"\n") + meta_file.write("validate file num: "+str(validateCount)+"\n") + meta_file.write("test file num: "+str(testCount)+"\n") + meta_file.write("label list:[\n") + + for item in labelList: + meta_file.write("("+str(item[0])+",\""+item[1]+"\"),\n") + meta_file.write("]") + meta_file.flush() + meta_file.close() + + print "end at: "+time.strftime('%X %x %Z') + + return labelList diff --git a/tool/python/singa/utils/kvstore.py b/tool/python/singa/utils/kvstore.py new file mode 100644 index 0000000000..7fe16e019d --- /dev/null +++ b/tool/python/singa/utils/kvstore.py @@ -0,0 +1,90 @@ +''' +Created on Jan 8, 2016 + +@author: aaron +''' +import struct, os + +INT_LEN=8 + +class FileStore(): + ''' + kv file store + ''' + def open(self,src_path,mode): + if mode == "create": + self._file = open(src_path,"wb") + if mode == "append": + self._file = open(src_path,"ab") + if mode == "read": + self._file = open(src_path,"rb") + return self + + def close(self): + self._file.close() + return + + def read(self): + keyLen=b2i(self._file.read(INT_LEN)) + key=str(self._file.read(keyLen)) + valueLen=b2i(self._file.read(INT_LEN)) + value=str(self._file.read(valueLen)) + return key,value + + def seekToFirst(self): + self._file.seek(0) + return + + #Don't do this + def seek(self,offset): + self._file.seek(offset) + return + + def write(self,key,value): + key_len = len(key) + value_len = len(value) + self._file.write(i2b(key_len)+key+i2b(value_len)+value) + return + + def flush(self): + self._file.flush() + return + + def __init__(self ): + + return +#integer to binary; Q means unsigned long long, 8 bytes +#(byte order assumed little-endian; i2b and b2i only need to agree with each other) +def i2b(i): + return struct.pack("<Q", i) + +#binary to integer +def b2i(b): + return struct.unpack("<Q", b)[0] + +#swap the byte order of a 32-bit integer +def swap32(x): + return (((x << 24) & 0xFF000000) | + ((x << 8) & 0x00FF0000) | + ((x >> 8) & 0x0000FF00) | + ((x >> 24) & 0x000000FF)) + +def blob_to_numpy(blob): + '''TODO This method transforms blob data to a python numpy array + ''' + pass diff --git a/tool/rafiki/main.py b/tool/rafiki/main.py new file mode 100755 index 0000000000..bdc1bb30bf --- /dev/null +++ b/tool/rafiki/main.py @@ -0,0 +1,180 @@ +#!/usr/bin/env python + +#/************************************************************ +#* +#* Licensed to the Apache Software Foundation (ASF) under one +#* or more contributor license agreements. See the NOTICE file +#* distributed with this work for additional information +#* regarding copyright ownership. The ASF licenses this file +#* to you under the Apache License, Version 2.0 (the +#* "License"); you may not use this file except in compliance +#* with the License. You may obtain a copy of the License at +#* +#* http://www.apache.org/licenses/LICENSE-2.0 +#* +#* Unless required by applicable law or agreed to in writing, +#* software distributed under the License is distributed on an +#* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +#* KIND, either express or implied. See the License for the +#* specific language governing permissions and limitations +#* under the License.
+#*
+#*************************************************************/
+
+import os, sys
+import numpy as np
+
+current_path_ = os.path.dirname(__file__)
+# the repository root is two directories above tool/rafiki
+singa_root_ = os.path.abspath(os.path.join(current_path_, '../..'))
+sys.path.append(os.path.join(singa_root_, 'tool', 'python'))
+
+from model import neuralnet, updater
+from singa.driver import Driver
+from singa.layer import *
+from singa.model import save_model_parameter, load_model_parameter
+from singa.utils.utility import swap32
+
+from PIL import Image
+import glob, random, shutil, time
+from flask import Flask, request, redirect, url_for
+from singa.utils import kvstore, imgtool
+app = Flask(__name__)
+
+'''
+Example of a CNN on the cifar10 dataset, served through a small Flask app
+'''
+def train(batchsize, disp_freq, check_freq, train_step, workspace, checkpoint=None):
+    print '[Layer registration/declaration]'
+    # TODO change layer registration methods
+    d = Driver()
+    d.Init(sys.argv)
+
+    print '[Start training]'
+
+    # resume from a checkpoint if one is given
+    if checkpoint:
+        load_model_parameter(workspace+checkpoint, neuralnet, batchsize)
+
+    for i in range(0, train_step):
+
+        # forward pass: fetch data for input layers, compute features elsewhere
+        for h in range(len(neuralnet)):
+            if neuralnet[h].layer.type == kDummy:
+                neuralnet[h].FetchData(batchsize)
+            else:
+                neuralnet[h].ComputeFeature()
+
+        # backward pass, started from the last layer (h is left at the last index)
+        neuralnet[h].ComputeGradient(i+1, updater)
+
+        if (i+1) % disp_freq == 0:
+            print '  Step {:>3}: '.format(i+1),
+            neuralnet[h].display()
+
+        if (i+1) % check_freq == 0:
+            save_model_parameter(i+1, workspace, neuralnet)
+
+    print '[Finish training]'
+
+
+def product(workspace, checkpoint):
+    print '[Layer registration/declaration]'
+    # TODO change layer registration methods
+    d = Driver()
+    d.Init(sys.argv)
+
+    load_model_parameter(workspace+checkpoint, neuralnet, 1)
+
+    app.debug = True
+    app.run(host='0.0.0.0', port=80)
+
+
+@app.route("/")
+def index():
+    return "Hello World! This is SINGA DLAAS! Please send a POST request with image=<file> to '/predict'"
+
+def allowed_file(filename):
+    allowed_extensions_ = set(['txt', 'pdf', 'png', 'jpg', 'jpeg', 'gif'])
+    return '.' in filename and \
+        filename.rsplit('.', 1)[1] in allowed_extensions_
+
+@app.route('/predict', methods=['POST'])
+def predict():
+    size_ = (32, 32)
+    pixel_length_ = 3*size_[0]*size_[1]
+    label_num_ = 10
+    if request.method == 'POST':
+        img_file = request.files['image']
+        if img_file and allowed_file(img_file.filename):
+            im = Image.open(img_file).convert("RGB")
+            im = imgtool.resize_to_center(im, size_)
+            byteArray = imgtool.toBin(im, size_)
+            data = np.frombuffer(byteArray, dtype=np.uint8)
+            data = data.reshape(1, pixel_length_)
+
+            # forward pass: feed the image to the data layer, compute features elsewhere
+            for h in range(len(neuralnet)):
+                if neuralnet[h].is_datalayer:
+                    if not neuralnet[h].is_label:
+                        neuralnet[h].Feed(data, 3)
+                    else:
+                        neuralnet[h].FetchData(1)
+                else:
+                    neuralnet[h].ComputeFeature()
+
+            # get the predicted probability of each label from the last layer
+            #data = neuralnet[-1].get_singalayer().data(neuralnet[-1].get_singalayer())
+            #prop = floatArray_frompointer(data.mutable_cpu_data())
+            prop = neuralnet[-1].GetData()
+            print prop
+            result = []
+            for i in range(label_num_):
+                result.append((i, prop[i]))
+
+            result.sort(key=lambda tup: tup[1], reverse=True)
+            print result
+            response = ""
+            for r in result:
+                response += str(r[0])+":"+str(r[1])+"\n"
+
+            return response
+    return "error"
+
+
+if __name__ == '__main__':
+
+    if sys.argv[1] == "train":
+        if len(sys.argv) < 6:
+            print "usage: main.py train <batchsize> <disp_freq> <check_freq> <train_step> [checkpoint]"
+            sys.exit(1)
+        if len(sys.argv) > 6:
+            checkpoint = sys.argv[6]
+        else:
+            checkpoint = None
+        # training
+        train(
+            batchsize = int(sys.argv[2]),
+            disp_freq = int(sys.argv[3]),
+            check_freq = int(sys.argv[4]),
+            train_step = int(sys.argv[5]),
+            workspace = '/workspace',
+            checkpoint = checkpoint,
+            )
+    else:
+        if len(sys.argv) < 3:
+            print "usage: main.py test <checkpoint>"
+            sys.exit(1)
+        checkpoint = sys.argv[2]
+        product(
+            workspace = '/workspace',
+            checkpoint = checkpoint
+            )
diff --git a/tool/rafiki/test.sh b/tool/rafiki/test.sh
new file mode 100644
index 0000000000..992745000f
--- /dev/null
+++ b/tool/rafiki/test.sh
@@ -0,0 +1,7 @@
+#!/bin/bash
+cd /workspace
+wget $1
+tar zxf *.tar.gz
+cp /workspace/model.py /usr/src/incubator-singa/tool/rafiki/
+cd /usr/src/incubator-singa/
+python tool/rafiki/main.py test $2 $3
diff --git a/tool/rafiki/train.sh b/tool/rafiki/train.sh
new file mode 100644
index 0000000000..3e347b2f1f
--- /dev/null
+++ b/tool/rafiki/train.sh
@@ -0,0 +1,7 @@
+#!/bin/bash
+cd /workspace
+wget $1
+tar zxf *.tar.gz
+cp /workspace/model.py /usr/src/incubator-singa/tool/rafiki/
+cd /usr/src/incubator-singa/
+python tool/rafiki/main.py train $2 $3 $4 $5 $6
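
Note: a minimal round-trip sketch of the length-prefixed record format that kvstore.FileStore writes above (8-byte key length, key bytes, 8-byte value length, value bytes). The file name demo.bin and the sample records are illustrative only, and the sys.path line assumes the script is run from the repository root:

    import sys, os
    sys.path.append(os.path.join('tool', 'python'))  # assumes repo root as cwd
    from singa.utils import kvstore

    # write three records with the same zero-padded key scheme
    # that generate_kvrecord_data uses
    store = kvstore.FileStore()
    store.open("demo.bin", "create")
    for i, value in enumerate(["first", "second", "third"]):
        store.write("%05d" % i, value)
    store.flush()
    store.close()

    # read them back in insertion order
    reader = kvstore.FileStore()
    reader.open("demo.bin", "read")
    reader.seekToFirst()
    for _ in range(3):
        key, value = reader.read()
        print key, value  # Python 2 print, matching the rest of the tool chain
    reader.close()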