Skip to content

Commit 0431269

Browse files
authored
Merge pull request #80 from serv-c/drgroot-patch-1
feat(bus): adding in asb support
2 parents 5e528dd + 180aa2f commit 0431269

File tree

2 files changed

+139
-1
lines changed

2 files changed

+139
-1
lines changed

requirements.txt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,4 +5,5 @@ simplejson==3.20.1
55
flask==3.1.0
66
pyyaml==6.0.2
77
pyiceberg[sql-sqlite,pyarrow]==0.8.1
8-
deltalake==0.25.5
8+
deltalake==0.25.5
9+
azure-servicebus==7.14.2

servc/svc/com/bus/asb.py

Lines changed: 137 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,137 @@
1+
from __future__ import annotations
2+
3+
import json
4+
import threading
5+
import time
6+
from typing import Any
7+
8+
import simplejson
9+
from azure.servicebus import ServiceBusClient, ServiceBusMessage, ServiceBusReceiver
10+
from azure.servicebus.management import ServiceBusAdministrationClient
11+
from servc.svc.com.bus import BusComponent, InputProcessor, OnConsuming
12+
from servc.svc.com.cache.redis import decimal_default
13+
from servc.svc.io.input import EventPayload, InputPayload, InputType
14+
from servc.svc.io.output import StatusCode
15+
16+
17+
class AzureServiceBus(BusComponent):
18+
_url: str
19+
20+
_conn: ServiceBusClient | None = None
21+
22+
@property
23+
def isReady(self) -> bool:
24+
return self._conn is not None
25+
26+
@property
27+
def isOpen(self) -> bool:
28+
return self.isReady
29+
30+
def isBlockingConnection(self) -> bool:
31+
return isinstance(self._conn, ServiceBusClient)
32+
33+
def _connect(self):
34+
if not self.isOpen:
35+
self._conn = ServiceBusClient.from_connection_string(self._url)
36+
37+
def _close(self, expected=True, reason: Any = None):
38+
print("Close method called", flush=True)
39+
if not expected:
40+
print("Unexpected close: ", reason, flush=True)
41+
exit(1)
42+
if self.isOpen or self.isReady:
43+
if (
44+
self._conn
45+
# and not self._conn.is_closed
46+
# and (self.isBlockingConnection() or not self._conn.is_closing)
47+
):
48+
self._conn.close()
49+
self._conn = None
50+
51+
return True
52+
return False
53+
54+
def publishMessage(self, route: str, message: InputPayload | EventPayload) -> bool:
55+
if not self.isReady or not self._conn:
56+
self._connect()
57+
if not self._conn:
58+
raise Exception("Service Bus connection is not established")
59+
60+
isEvent = (
61+
True
62+
if "type" in message
63+
and message["type"] in [InputType.EVENT.value, InputType.EVENT]
64+
else False
65+
)
66+
asb_message = ServiceBusMessage(
67+
simplejson.dumps(message, default=decimal_default, ignore_nan=True)
68+
)
69+
70+
# NOTE: azure service bus does not support event routing. thus, we must
71+
# manually handle the event routing
72+
if isEvent:
73+
with ServiceBusAdministrationClient.from_connection_string(
74+
self._url
75+
) as admin_client:
76+
for queue_properties in admin_client.list_queues():
77+
sender = self._conn.get_queue_sender(
78+
queue_name=self.getRoute(queue_properties.name)
79+
)
80+
with sender:
81+
sender.send_messages(asb_message)
82+
83+
return super().publishMessage(route, message)
84+
85+
sender = self._conn.get_queue_sender(queue_name=self.getRoute(route))
86+
with sender:
87+
sender.send_messages(asb_message)
88+
89+
return super().publishMessage(route, message)
90+
91+
def subscribe(
92+
self,
93+
route: str,
94+
inputProcessor: InputProcessor,
95+
onConsuming: OnConsuming | None,
96+
bindEventExchange: bool,
97+
) -> bool:
98+
if not self.isReady or not self._conn:
99+
self._connect()
100+
if not self._conn:
101+
raise Exception("Service Bus connection is not established")
102+
103+
receiver = self._conn.get_queue_receiver(queue_name=self.getRoute(route))
104+
with receiver:
105+
received_msgs = receiver.receive_messages(max_message_count=1)
106+
for msg in received_msgs:
107+
thread = threading.Thread(
108+
target=self.on_message,
109+
args=(msg, receiver, inputProcessor),
110+
)
111+
thread.start()
112+
thread.join()
113+
114+
time.sleep(1)
115+
self.subscribe(
116+
route,
117+
inputProcessor,
118+
onConsuming,
119+
bindEventExchange,
120+
)
121+
122+
return True
123+
124+
def on_message(
125+
self,
126+
body: Any,
127+
receiver: ServiceBusReceiver,
128+
inputProcessor: InputProcessor,
129+
):
130+
payload = json.loads(str(body))
131+
result = inputProcessor(payload)
132+
133+
if result == StatusCode.NO_PROCESSING:
134+
receiver.abandon_message(body)
135+
else:
136+
receiver.complete_message(body)
137+
print("Processed message", flush=True)

0 commit comments

Comments
 (0)