From 574d50cb94dc7468ef71004c40e5010544fc9194 Mon Sep 17 00:00:00 2001 From: Ratnadeep Debnath Date: Fri, 17 Feb 2017 13:59:22 +0530 Subject: [PATCH 1/2] Implemented a wrapper to interact with redis. --- ircb/lib/redis/__init__.py | 119 +++++++++++++++++++++++++++++++++++++ 1 file changed, 119 insertions(+) create mode 100644 ircb/lib/redis/__init__.py diff --git a/ircb/lib/redis/__init__.py b/ircb/lib/redis/__init__.py new file mode 100644 index 0000000..bce1486 --- /dev/null +++ b/ircb/lib/redis/__init__.py @@ -0,0 +1,119 @@ +import asyncio +import aioredis + + +class Redis(): + """ + Wrapper around aioredis library to talk to redis. It provides some + higher level utility methods to interact with redis as a cache or + as a data structure code. + """ + SUPPORTED_METHODS = set([ + 'set', + 'get', + 'sadd', + 'smembers', + 'delete' + ]) + + def __init__(self, host='127.0.0.1', port=6379, db=0): + self.host = host + self.port = port + self.db = db + self._initialized = False + + async def init(self, reset=False): + """ + Initialize redis connections + + Args: + reset (bool): Whether to reset redis connection + """ + if not self._initialized or reset: + # redis connection to reade/write + self.conn = await aioredis.create_redis( + (self.host, self.port), db=self.db) + # redis connection to subscribe + self.sub = await aioredis.create_redis( + (self.host, self.port), db=self.db) + self._initialized = True + + async def subscribe(self, pattern, handler): + """ + Subscribe for a pattern from Redis + + Args: + pattern (str): Pattern to subscribe to + handler (func): Handler to handle published message + + Returns: + Task object running the handler function + """ + res = await self.sub.subscribe(pattern) + ch = res[0] + return asyncio.ensure_future(handler(pattern, ch)) + + async def watch(self, key, handler): + """ + Watch a redis key for change events. + + Args: + key (str): Name of key + handler (func): Handler function to handle key change + + Returns: + A task object running the handler + """ + res = await self.sub.subscribe('__keyspace@0__:{}'.format(key)) + ch = res[0] + return asyncio.ensure_future(handler(key, ch)) + + def __getattr__(self, key): + """ + Interface select redis operations from redis connection + object to this instance. + """ + val = None + if key not in self.SUPPORTED_METHODS: + raise AttributeError( + 'RedisError: Key: "{}" not supported'.format(key)) + if key in ('subscribe',): + val = getattr(self.sub, key) + else: + val = getattr(self.conn, key) + if val: + setattr(self, key, val) + return val + + def close(self): + """Close redis connections""" + self.conn.close() + self.sub.close() + + +redis = Redis() + + +def main(): + loop = asyncio.get_event_loop() + + async def reader(key, ch): + while (await ch.wait_message()): + msg = await ch.get() + print("Got Message:", msg) + val = await redis.smembers(key) + print(val) + + async def go(): + await redis.init() + tsk = await redis.watch('mykey', reader) + await redis.sadd('mykey', 1, 2, 3, 7) + print(await redis.smembers('mykey')) + await redis.delete('mykey') + await tsk + redis.close() + + loop.run_until_complete(go()) + +if __name__ == '__main__': + main() From 993e488dbe1be5989dd7164df50ccc5e546d17ae Mon Sep 17 00:00:00 2001 From: Ratnadeep Debnath Date: Tue, 21 Feb 2017 20:21:37 +0530 Subject: [PATCH 2/2] Wire up dispatcher with redis as discovery engine. - dynamic store/storeclient discovery --- ircb/config/default_settings.py | 6 ++ ircb/lib/dispatcher/__init__.py | 119 +++++++++++++++++++------------- 2 files changed, 76 insertions(+), 49 deletions(-) diff --git a/ircb/config/default_settings.py b/ircb/config/default_settings.py index d8f9ead..9be72ff 100644 --- a/ircb/config/default_settings.py +++ b/ircb/config/default_settings.py @@ -94,3 +94,9 @@ # A 32 byte string WEB_SALT = b'c237202ee55411e584f4cc3d8237ff4b' + +# Redis keys +REDIS_KEYS = { + 'STORE': 'store', + 'STORE_CLIENTS': 'store_clients' +} diff --git a/ircb/lib/dispatcher/__init__.py b/ircb/lib/dispatcher/__init__.py index 1046667..99549fa 100644 --- a/ircb/lib/dispatcher/__init__.py +++ b/ircb/lib/dispatcher/__init__.py @@ -7,6 +7,7 @@ from collections import defaultdict from ircb.config import settings +from ircb.lib.redis import redis logger = logging.getLogger('dispatcher') @@ -15,8 +16,6 @@ class Handler(aiozmq.rpc.AttrHandler): def __init__(self, dispatcher): self._dispatcher = dispatcher - # lock for registering subscriber - self._lock = asyncio.Lock() @aiozmq.rpc.method def send(self, signal, data, taskid=None): @@ -30,26 +29,10 @@ def send(self, signal, data, taskid=None): logger.error('SEND ERROR: {} {} {} {}'.format( e, signal, data, taskid), exc_info=True) - @asyncio.coroutine - @aiozmq.rpc.method - def register_sub(self, subscriber_addr, key): - yield from self._lock.acquire() - try: - connections = self._dispatcher.publisher.transport.connections() - if subscriber_addr in connections: - self._lock.release() - return - self._dispatcher.publisher.transport.connect(subscriber_addr) - redis = yield from aioredis.create_redis( - (settings.REDIS_HOST, settings.REDIS_PORT) - ) - yield from redis.set(key, 1) - redis.close() - finally: - self._lock.release() - class Dispatcher(object): + STORE_KEY = settings.REDIS_KEYS['STORE'] + STORE_CLIENTS_KEY = settings.REDIS_KEYS['STORE_CLIENTS'] def __init__(self, role, loop=None): self.loop = loop or asyncio.get_event_loop() @@ -74,43 +57,81 @@ def process_queue(self): except asyncio.QueueEmpty: break + @asyncio.coroutine + def watch_store(self, key, ch): + """ + Watch store key in redis for change. + + Args: + key (str): Name of redis key for store + ch: aioredis channel obj + """ + while (yield from ch.wait_message()): + msg = yield from ch.get() + endpoint = yield from redis.get(key) + self.publisher.transport.connect(endpoint.decode()) + if self.publisher.transport.connections(): + if self.lock.locked(): + self.lock.release() + + @asyncio.coroutine + def watch_storeclients(self, key, ch): + """ + Watch store_clients key in redis for change. + + Args: + key (str): Name of redis key for store clients + ch (obj): aioredis channel obj + """ + while (yield from ch.wait_message()): + msg = yield from ch.get() + endpoints = yield from redis.smembers(key) + for endpoint in endpoints: + self.publisher.transport.connect(endpoint.decode()) + if self.publisher.transport.connections(): + if self.lock.locked(): + self.lock.release() + @asyncio.coroutine def setup_pubsub(self): - redis = yield from aioredis.create_redis( - (settings.REDIS_HOST, settings.REDIS_PORT) - ) - if self.role == 'stores': - bind_addr = settings.SUBSCRIBER_ENDPOINTS[self.role] - else: - bind_addr = 'tcp://{host}:*'.format(host=settings.INTERNAL_HOST) + # Initialize redis connection + yield from redis.init() + + bind_addr = 'tcp://{host}:*'.format(host=settings.INTERNAL_HOST) self.subscriber = yield from aiozmq.rpc.serve_pubsub( self.handler, subscribe='', bind=bind_addr, log_exceptions=True) subscriber_addr = list(self.subscriber.transport.bindings())[0] self.publisher = yield from aiozmq.rpc.connect_pubsub() - if self.role == 'storeclient': - self.publisher.transport.connect( - settings.SUBSCRIBER_ENDPOINTS['stores']) - _key = 'SUBSCRIBER_REGISTERED_{}'.format(subscriber_addr) - ret = 0 - yield from redis.set(_key, ret) - while ret != b'1': - yield from self.publisher.publish( - 'register_sub' - ).register_sub( - subscriber_addr, _key - ) - ret = yield from redis.get(_key) - yield from asyncio.sleep(0.01) - self.lock.release() - redis.close() - - @property - def subscriber_endpoints(self): - return [endpoint for role, endpoint in - settings.SUBSCRIBER_ENDPOINTS.items() - if role != self.role] + + if self.role == 'stores': + # Register current store endpoint + yield from redis.set(settings.REDIS_KEYS['STORE'], + subscriber_addr) + + # Set watcher for store clients to handle new store clients + yield from redis.watch(self.STORE_CLIENTS_KEY, self.watch_storeclients) + + # Setup initial connections to available store clients + store_client_endpoints = yield from redis.smembers( + self.STORE_CLIENTS_KEY) + for endpoint in store_client_endpoints: + self.publisher.transport.connect(endpoint.decode()) + else: + # Register current store client endpoint to pool + yield from redis.sadd(settings.REDIS_KEYS['STORE_CLIENTS'], + subscriber_addr) + + # Enable watcher to track store endpoints + yield from redis.watch(self.STORE_KEY, self.watch_store) + + # Setup initial connection to store + store_endpoint = yield from redis.get( + settings.REDIS_KEYS['STORE']) + self.publisher.transport.connect(store_endpoint.decode()) + if self.publisher.transport.connections(): + self.lock.release() def send(self, signal, data, taskid=None): asyncio.Task(self.enqueue((signal, data, taskid)))