Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions ircb/config/default_settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,3 +94,9 @@

# A 32 byte string
WEB_SALT = b'c237202ee55411e584f4cc3d8237ff4b'

# Redis keys
REDIS_KEYS = {
'STORE': 'store',
'STORE_CLIENTS': 'store_clients'
}
119 changes: 70 additions & 49 deletions ircb/lib/dispatcher/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from collections import defaultdict

from ircb.config import settings
from ircb.lib.redis import redis

logger = logging.getLogger('dispatcher')

Expand All @@ -15,8 +16,6 @@ class Handler(aiozmq.rpc.AttrHandler):

def __init__(self, dispatcher):
self._dispatcher = dispatcher
# lock for registering subscriber
self._lock = asyncio.Lock()

@aiozmq.rpc.method
def send(self, signal, data, taskid=None):
Expand All @@ -30,26 +29,10 @@ def send(self, signal, data, taskid=None):
logger.error('SEND ERROR: {} {} {} {}'.format(
e, signal, data, taskid), exc_info=True)

@asyncio.coroutine
@aiozmq.rpc.method
def register_sub(self, subscriber_addr, key):
yield from self._lock.acquire()
try:
connections = self._dispatcher.publisher.transport.connections()
if subscriber_addr in connections:
self._lock.release()
return
self._dispatcher.publisher.transport.connect(subscriber_addr)
redis = yield from aioredis.create_redis(
(settings.REDIS_HOST, settings.REDIS_PORT)
)
yield from redis.set(key, 1)
redis.close()
finally:
self._lock.release()


class Dispatcher(object):
STORE_KEY = settings.REDIS_KEYS['STORE']
STORE_CLIENTS_KEY = settings.REDIS_KEYS['STORE_CLIENTS']

def __init__(self, role, loop=None):
self.loop = loop or asyncio.get_event_loop()
Expand All @@ -74,43 +57,81 @@ def process_queue(self):
except asyncio.QueueEmpty:
break

@asyncio.coroutine
def watch_store(self, key, ch):
"""
Watch store key in redis for change.

Args:
key (str): Name of redis key for store
ch: aioredis channel obj
"""
while (yield from ch.wait_message()):
msg = yield from ch.get()
endpoint = yield from redis.get(key)
self.publisher.transport.connect(endpoint.decode())
if self.publisher.transport.connections():
if self.lock.locked():
self.lock.release()

@asyncio.coroutine
def watch_storeclients(self, key, ch):
"""
Watch store_clients key in redis for change.

Args:
key (str): Name of redis key for store clients
ch (obj): aioredis channel obj
"""
while (yield from ch.wait_message()):
msg = yield from ch.get()
endpoints = yield from redis.smembers(key)
for endpoint in endpoints:
self.publisher.transport.connect(endpoint.decode())
if self.publisher.transport.connections():
if self.lock.locked():
self.lock.release()

@asyncio.coroutine
def setup_pubsub(self):
redis = yield from aioredis.create_redis(
(settings.REDIS_HOST, settings.REDIS_PORT)
)
if self.role == 'stores':
bind_addr = settings.SUBSCRIBER_ENDPOINTS[self.role]
else:
bind_addr = 'tcp://{host}:*'.format(host=settings.INTERNAL_HOST)
# Initialize redis connection
yield from redis.init()

bind_addr = 'tcp://{host}:*'.format(host=settings.INTERNAL_HOST)
self.subscriber = yield from aiozmq.rpc.serve_pubsub(
self.handler, subscribe='',
bind=bind_addr,
log_exceptions=True)
subscriber_addr = list(self.subscriber.transport.bindings())[0]
self.publisher = yield from aiozmq.rpc.connect_pubsub()
if self.role == 'storeclient':
self.publisher.transport.connect(
settings.SUBSCRIBER_ENDPOINTS['stores'])
_key = 'SUBSCRIBER_REGISTERED_{}'.format(subscriber_addr)
ret = 0
yield from redis.set(_key, ret)
while ret != b'1':
yield from self.publisher.publish(
'register_sub'
).register_sub(
subscriber_addr, _key
)
ret = yield from redis.get(_key)
yield from asyncio.sleep(0.01)
self.lock.release()
redis.close()

@property
def subscriber_endpoints(self):
return [endpoint for role, endpoint in
settings.SUBSCRIBER_ENDPOINTS.items()
if role != self.role]

if self.role == 'stores':
# Register current store endpoint
yield from redis.set(settings.REDIS_KEYS['STORE'],
subscriber_addr)

# Set watcher for store clients to handle new store clients
yield from redis.watch(self.STORE_CLIENTS_KEY, self.watch_storeclients)

# Setup initial connections to available store clients
store_client_endpoints = yield from redis.smembers(
self.STORE_CLIENTS_KEY)
for endpoint in store_client_endpoints:
self.publisher.transport.connect(endpoint.decode())
else:
# Register current store client endpoint to pool
yield from redis.sadd(settings.REDIS_KEYS['STORE_CLIENTS'],
subscriber_addr)

# Enable watcher to track store endpoints
yield from redis.watch(self.STORE_KEY, self.watch_store)

# Setup initial connection to store
store_endpoint = yield from redis.get(
settings.REDIS_KEYS['STORE'])
self.publisher.transport.connect(store_endpoint.decode())
if self.publisher.transport.connections():
self.lock.release()

def send(self, signal, data, taskid=None):
asyncio.Task(self.enqueue((signal, data, taskid)))
Expand Down
119 changes: 119 additions & 0 deletions ircb/lib/redis/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
import asyncio
import aioredis


class Redis():
"""
Wrapper around aioredis library to talk to redis. It provides some
higher level utility methods to interact with redis as a cache or
as a data structure code.
"""
SUPPORTED_METHODS = set([
'set',
'get',
'sadd',
'smembers',
'delete'
])

def __init__(self, host='127.0.0.1', port=6379, db=0):
self.host = host
self.port = port
self.db = db
self._initialized = False

async def init(self, reset=False):
"""
Initialize redis connections

Args:
reset (bool): Whether to reset redis connection
"""
if not self._initialized or reset:
# redis connection to reade/write
self.conn = await aioredis.create_redis(
(self.host, self.port), db=self.db)
# redis connection to subscribe
self.sub = await aioredis.create_redis(
(self.host, self.port), db=self.db)
self._initialized = True

async def subscribe(self, pattern, handler):
"""
Subscribe for a pattern from Redis

Args:
pattern (str): Pattern to subscribe to
handler (func): Handler to handle published message

Returns:
Task object running the handler function
"""
res = await self.sub.subscribe(pattern)
ch = res[0]
return asyncio.ensure_future(handler(pattern, ch))

async def watch(self, key, handler):
"""
Watch a redis key for change events.

Args:
key (str): Name of key
handler (func): Handler function to handle key change

Returns:
A task object running the handler
"""
res = await self.sub.subscribe('__keyspace@0__:{}'.format(key))
ch = res[0]
return asyncio.ensure_future(handler(key, ch))

def __getattr__(self, key):
"""
Interface select redis operations from redis connection
object to this instance.
"""
val = None
if key not in self.SUPPORTED_METHODS:
raise AttributeError(
'RedisError: Key: "{}" not supported'.format(key))
if key in ('subscribe',):
val = getattr(self.sub, key)
else:
val = getattr(self.conn, key)
if val:
setattr(self, key, val)
return val

def close(self):
"""Close redis connections"""
self.conn.close()
self.sub.close()


redis = Redis()


def main():
loop = asyncio.get_event_loop()

async def reader(key, ch):
while (await ch.wait_message()):
msg = await ch.get()
print("Got Message:", msg)
val = await redis.smembers(key)
print(val)

async def go():
await redis.init()
tsk = await redis.watch('mykey', reader)
await redis.sadd('mykey', 1, 2, 3, 7)
print(await redis.smembers('mykey'))
await redis.delete('mykey')
await tsk
redis.close()

loop.run_until_complete(go())

if __name__ == '__main__':
main()