-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathpika_client.py
More file actions
148 lines (122 loc) · 5.82 KB
/
pika_client.py
File metadata and controls
148 lines (122 loc) · 5.82 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
#!/usr/bin/env python
# coding: utf-8
import pika
import json
import tornado.ioloop
from pika.adapters.tornado_connection import TornadoConnection
class PikaClient(object):
    '''
    Helper class to manage the RabbitMQ/Pika interface.

    Setup is driven by a chain of asynchronous callbacks, each one started
    by the previous step completing:

        connect -> on_connected -> on_channel_open -> on_exchange_declared
        -> on_queue_declared -> on_queue_bound

    Once consuming, every message taken from the request or response queue
    is handed to all listeners registered for that action. A listener is
    any object exposing ``finish(body)`` (presumably a Tornado request
    handler waiting on a long poll; confirm against the caller).
    '''

    def __init__(self):
        '''Set up the queue names and the (initially empty) listener sets.'''
        # Names of the request and response queues.
        self.queue_name_req = 'request'
        self.queue_name_resp = 'response'
        # Listeners waiting for requests and responses from RabbitMQ.
        self.req_listeners = set()
        self.resp_listeners = set()

    def _listeners_for(self, action):
        '''Return the listener set for ``action``, or None if it is unknown.'''
        if action == 'request':
            return self.req_listeners
        if action == 'response':
            return self.resp_listeners
        return None

    def connect(self):
        '''Open a Tornado-adapter connection to RabbitMQ on localhost:5672.

        ``on_connected`` runs when the connection opens and ``on_closed``
        when it closes.
        '''
        pika.log.info('PikaClient: Connecting to RabbitMQ on localhost:5672')
        credentials = pika.PlainCredentials('guest', 'guest')
        param = pika.ConnectionParameters(host='localhost',
                                          port=5672,
                                          virtual_host="/",
                                          credentials=credentials)
        self.connection = TornadoConnection(
            param, on_open_callback=self.on_connected)
        self.connection.add_on_close_callback(self.on_closed)

    def on_connected(self, connection):
        '''Connection-open callback: store the connection, open a channel.'''
        pika.log.info('PikaClient: Connected to RabbitMQ on localhost:5672')
        self.connection = connection
        self.connection.channel(self.on_channel_open)

    def on_channel_open(self, channel):
        '''Channel-open callback: store the channel, declare the exchange.'''
        pika.log.info('PikaClient: Channel Open, Declaring Exchange')
        self.channel = channel
        self.channel.exchange_declare(exchange='tornado',
                                      type="topic",
                                      auto_delete=True,
                                      durable=False,
                                      callback=self.on_exchange_declared)

    def on_exchange_declared(self, frame):
        '''Exchange-declared callback: declare the request/response queues.

        Only the second declaration carries a callback, so
        ``on_queue_declared`` is triggered by the response queue's
        declare-ok.
        '''
        pika.log.info('PikaClient: Exchange Declared, Declaring Queues')
        self.channel.queue_declare(queue=self.queue_name_req,
                                   auto_delete=True,
                                   durable=False,
                                   exclusive=False)
        self.channel.queue_declare(queue=self.queue_name_resp,
                                   auto_delete=True,
                                   durable=False,
                                   exclusive=False,
                                   callback=self.on_queue_declared)

    def on_queue_declared(self, frame):
        '''Queue-declared callback: bind both queues to the exchange.

        Only the second bind carries a callback, so ``on_queue_bound`` is
        triggered by the response queue's bind-ok.
        '''
        pika.log.info('PikaClient: Queues Declared, Binding Queues')
        self.channel.queue_bind(exchange='tornado',
                                queue=self.queue_name_req,
                                routing_key='*.longpoll.request')
        self.channel.queue_bind(exchange='tornado',
                                queue=self.queue_name_resp,
                                routing_key='*.longpoll.response',
                                callback=self.on_queue_bound)

    def on_queue_bound(self, frame):
        '''Queue-bound callback: start consuming from both queues.'''
        pika.log.info('PikaClient: Queue Bound, Issuing Basic Consume')
        # no_ack=True: messages are consumed without acknowledgement.
        self.channel.basic_consume(consumer_callback=self.on_pika_req_message,
                                   queue=self.queue_name_req,
                                   no_ack=True)
        self.channel.basic_consume(consumer_callback=self.on_pika_resp_message,
                                   queue=self.queue_name_resp,
                                   no_ack=True)

    def on_pika_req_message(self, channel, method, header, body):
        '''Consumer callback for the request queue: fan out to listeners.'''
        log = 'PikaClient: Request Message received: %s'
        pika.log.info(log % body)
        self.notify_listeners(body, 'request')

    def on_pika_resp_message(self, channel, method, header, body):
        '''Consumer callback for the response queue: fan out to listeners.'''
        log = 'PikaClient: Response Message received: %s'
        pika.log.info(log % body)
        self.notify_listeners(body, 'response')

    def on_basic_cancel(self, frame):
        '''Cancel-ok callback: close the connection.

        NOTE(review): nothing in this class registers this callback; it is
        presumably wired up by code elsewhere.
        '''
        pika.log.info('PikaClient: Basic Cancel Ok')
        # If we don't have any more consumer processes running close
        self.connection.close()

    def on_closed(self, connection):
        '''Connection-closed callback: stop the Tornado IOLoop.'''
        # We've closed our pika connection so stop the demo
        tornado.ioloop.IOLoop.instance().stop()

    def server_test(self, tornado_request, action):
        '''Publish the content of an HTTP request to the 'tornado' exchange.

        For ``'request'`` the body is the JSON-encoded
        ``tornado_request.arguments``; for ``'response'`` it is
        ``tornado_request.body`` as received. The message is published with
        routing key ``'*.longpoll.<action>'``.

        Raises:
            ValueError: if ``action`` is neither 'request' nor 'response'.
        '''
        # Validate the action before touching the request or the channel so
        # an unknown action fails with a clear error instead of an
        # UnboundLocalError on ``body``.
        if action == 'request':
            body = json.dumps(tornado_request.arguments)
        elif action == 'response':
            body = tornado_request.body
        else:
            raise ValueError(
                "unknown action %r (expected 'request' or 'response')"
                % (action,))

        # delivery_mode=1 is the AMQP non-persistent delivery mode.
        properties = pika.BasicProperties(
            content_type="application/json",
            delivery_mode=1
        )
        self.channel.basic_publish(exchange='tornado',
                                   routing_key='*.longpoll.' + action,
                                   properties=properties,
                                   body=body)

    def notify_listeners(self, message, action):
        '''Deliver ``message`` to every listener registered for ``action``.

        Each listener receives the JSON-encoded message through its
        ``finish`` method and is then unregistered.

        Raises:
            ValueError: if ``action`` is neither 'request' nor 'response'.
        '''
        listeners = self._listeners_for(action)
        if listeners is None:
            raise ValueError(
                "unknown action %r (expected 'request' or 'response')"
                % (action,))

        # Iterate over a copy: remove_listener mutates the live set.
        for listener in listeners.copy():
            listener.finish(json.dumps(message))
            pika.log.info('PikaClient: notified %s' % repr(listener))
            self.remove_listener(listener, action)

    def add_listener(self, listener, action):
        '''Register ``listener`` for ``action`` ('request' or 'response').

        An unknown action is ignored and nothing is logged.
        '''
        listeners = self._listeners_for(action)
        if listeners is None:
            return
        listeners.add(listener)
        pika.log.info('PikaClient: listener %s added' % repr(listener))

    def remove_listener(self, listener, action):
        '''Unregister ``listener`` from ``action``.

        A listener that is not registered, or an unknown action, is ignored
        and nothing is logged.
        '''
        listeners = self._listeners_for(action)
        if listeners is None:
            return
        try:
            listeners.remove(listener)
        except KeyError:
            return
        pika.log.info('PikaClient: listener %s removed' % repr(listener))