Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 8 additions & 40 deletions python/redisrpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,41 +56,6 @@ def __call__(self, *args, **kwargs):
return self.fun(*(self.pending + args), **kw)


class FunctionCall(dict):
"""Encapsulates a function call as a Python dictionary."""

@staticmethod
def from_dict(dictionary):
"""Return a new FunctionCall from a Python dictionary."""
name = dictionary.get('name')
args = dictionary.get('args')
kwargs = dictionary.get('kwargs')
return FunctionCall(name, args, kwargs)

def __init__(self, name, args=None, kwargs=None):
"""Create a new FunctionCall from a method name, an optional argument tuple, and an optional keyword argument
dictionary."""
self['name'] = name
if args is not None and args != ():
self['args'] = args
if kwargs is not None and kwargs != {}:
self['kwargs'] = kwargs

def as_python_code(self):
"""Return a string representation of this object that can be evaled to execute the function call."""
argstring = '' if 'args' not in self else \
','.join(str(arg) for arg in self['args'])
kwargstring = '' if 'kwargs' not in self else \
','.join('%s=%s' % (key,val) for (key,val) in list(self['kwargs'].items()))
if len(argstring) == 0:
params = kwargstring
elif len(kwargstring) == 0:
params = argstring
else:
params = ','.join([argstring,kwargstring])
return '%s(%s)' % (self['name'], params)


def decode_message(message):
"""Returns a (transport, decoded_message) pair."""
# Try JSON, then try Python pickle, then fail.
Expand Down Expand Up @@ -144,7 +109,7 @@ def __init__(self, redis_server, message_queue, timeout=0, transport='json'):
raise Exception('invalid transport {0}'.format(transport))

def call(self, method_name, *args, **kwargs):
function_call = FunctionCall(method_name, args, kwargs)
function_call = {'name': method_name, 'args': args, 'kwargs': kwargs}
response_queue = self.message_queue + ':rpc:' + random_string()
rpc_request = dict(function_call=function_call, response_queue=response_queue)
message = self.transport.dumps(rpc_request)
Expand Down Expand Up @@ -188,11 +153,14 @@ def run(self):
logging.debug('RPC Request: %s' % message)
transport, rpc_request = decode_message(message)
response_queue = rpc_request['response_queue']
function_call = FunctionCall.from_dict(rpc_request['function_call'])
code = 'self.return_value = self.local_object.' + function_call.as_python_code()
function_call = rpc_request['function_call']
try:
exec(code)
rpc_response = dict(return_value=self.return_value)
f_name = function_call['name']
f_args = function_call.get('args', ())
f_kw = function_call.get('kwargs', {})
func = getattr(self.local_object, f_name)
return_value = func(*f_args, **f_kw)
rpc_response = dict(return_value=return_value)
except:
(type, value, traceback) = sys.exc_info()
rpc_response = dict(exception=repr(value))
Expand Down