From 264aa4d3a86d8d9f21f265231ef71997f533cfed Mon Sep 17 00:00:00 2001 From: "EYFL (Eskild Schroll-Fleischer)" Date: Thu, 15 May 2025 12:29:11 +0200 Subject: [PATCH 1/5] Add unsubscribe method so client can unsubscribe from topics. --- fastapi_websocket_pubsub/pub_sub_server.py | 4 ++++ fastapi_websocket_pubsub/rpc_event_methods.py | 13 +++++++++++++ 2 files changed, 17 insertions(+) diff --git a/fastapi_websocket_pubsub/pub_sub_server.py b/fastapi_websocket_pubsub/pub_sub_server.py index 83f3913..755268a 100644 --- a/fastapi_websocket_pubsub/pub_sub_server.py +++ b/fastapi_websocket_pubsub/pub_sub_server.py @@ -91,6 +91,10 @@ async def subscribe( ) -> List[Subscription]: return await self.notifier.subscribe(self._subscriber_id, topics, callback) + async def unsubscribe( + self, topics: Union[TopicList, ALL_TOPICS]) -> List[Subscription]: + return await self.notifier.unsubscribe(self._subscriber_id, topics) + async def publish(self, topics: Union[TopicList, Topic], data=None): """ Publish events to subscribres of given topics currently connected to the endpoint diff --git a/fastapi_websocket_pubsub/rpc_event_methods.py b/fastapi_websocket_pubsub/rpc_event_methods.py index ea797c6..73b4163 100644 --- a/fastapi_websocket_pubsub/rpc_event_methods.py +++ b/fastapi_websocket_pubsub/rpc_event_methods.py @@ -49,6 +49,19 @@ async def callback(subscription: Subscription, data): "Failed to subscribe to RPC events notifier", topics) return False + async def unsubscribe(self, topics: TopicList = []) -> bool: + """ + provided by the server so that the client can unsubscribe topics. + """ + for topic in topics.copy(): + if topic not in self.event_notifier._topics: + self.logger.warning(f"Cannot unsubscribe topic '{topic}' which is not subscribed.") + topics.remove(topic) + # We'll use the remote channel id as our subscriber id + sub_id = await self._get_channel_id_() + await self.event_notifier.unsubscribe(sub_id, topics) + return True + async def publish(self, topics: TopicList = [], data=None, sync=True, notifier_id=None) -> bool: """ Publish an event through the server to subscribers From 6414ca708da886e966d8082f28a8d4a4dbd59872 Mon Sep 17 00:00:00 2001 From: Eskild Schroll-Fleischer Date: Tue, 20 May 2025 20:08:23 +0200 Subject: [PATCH 2/5] Add support for post connection subscriptions --- fastapi_websocket_pubsub/pub_sub_client.py | 29 ++++++++++++++-------- 1 file changed, 18 insertions(+), 11 deletions(-) diff --git a/fastapi_websocket_pubsub/pub_sub_client.py b/fastapi_websocket_pubsub/pub_sub_client.py index 4ad917e..ba2257f 100644 --- a/fastapi_websocket_pubsub/pub_sub_client.py +++ b/fastapi_websocket_pubsub/pub_sub_client.py @@ -251,7 +251,7 @@ async def _primary_on_connect(self, channel: RpcChannel): def subscribe(self, topic: Topic, callback: Coroutine): """ - Subscribe for events (prior to starting the client) + Subscribe for events (before and after starting the client) @see fastapi_websocket_pubsub/rpc_event_methods.py :: RpcEventServerMethods.subscribe Args: @@ -260,18 +260,25 @@ def subscribe(self, topic: Topic, callback: Coroutine): 'hello' or a complex path 'a/b/c/d' . Note: You can use ALL_TOPICS (event_notifier.ALL_TOPICS) to subscribe to all topics callback (Coroutine): the function to call upon relevant event publishing + + Returns: + Coroutine: awaitable task to subscribe to topic if connected. """ - # TODO: add support for post connection subscriptions - if not self.is_ready(): - self._topics.add(topic) - # init to empty list if no entry - callbacks = self._callbacks[topic] = self._callbacks.get(topic, []) - # add callback to callbacks list of the topic - callbacks.append(callback) + topic_is_new = topic not in self._topics + self._topics.add(topic) + # init to empty list if no entry + callbacks = self._callbacks[topic] = self._callbacks.get(topic, []) + # add callback to callbacks list of the topic + callbacks.append(callback) + if topic_is_new and self.is_ready(): + return self._rpc_channel.other.subscribe(topics=[topic]) else: - raise PubSubClientInvalidStateException( - "Client already connected and subscribed" - ) + # If we can't return an RPC call future then we need + # to supply something else to not fail when the + # calling code awaits the result of this function. + future = asyncio.Future() + future.set_result(None) + return future async def publish( self, topics: TopicList, data=None, sync=True, notifier_id=None From 019f532ad1391c3f0f22abf0f3196561d51f0f79 Mon Sep 17 00:00:00 2001 From: Eskild Schroll-Fleischer Date: Tue, 20 May 2025 20:08:47 +0200 Subject: [PATCH 3/5] Add unsubscribe support --- fastapi_websocket_pubsub/pub_sub_client.py | 43 ++++++++++++++++++++++ 1 file changed, 43 insertions(+) diff --git a/fastapi_websocket_pubsub/pub_sub_client.py b/fastapi_websocket_pubsub/pub_sub_client.py index ba2257f..893d715 100644 --- a/fastapi_websocket_pubsub/pub_sub_client.py +++ b/fastapi_websocket_pubsub/pub_sub_client.py @@ -280,6 +280,49 @@ def subscribe(self, topic: Topic, callback: Coroutine): future.set_result(None) return future + def unsubscribe(self, topic: Topic): + """ + Unsubscribe for events + + Args: + topic (Topic): the identifier of the event topic to be unsubscribed. + Note: You can use ALL_TOPICS (event_notifier.ALL_TOPICS) to unsubscribe all topics + + Returns: + Coroutine: awaitable task to subscribe to topic if connected. + """ + # Create none-future which can be safely awaited + # but which also will not give warnings + # if it isn't awaited. This is returned + # on code paths which do not make RPC calls. + none_future = asyncio.Future() + none_future.set_result(None) + + # Topics to potentially make RPC calls about + topics = list(self._topics) if topic is ALL_TOPICS else [topic] + + # Handle ALL_TOPICS or specific topics + if topic is ALL_TOPICS and not self._topics: + logger.warning(f"Cannot unsubscribe 'ALL_TOPICS'. No topics are subscribed.") + return none_future + elif topic is not ALL_TOPICS and topic not in self._topics: + logger.warning(f"Cannot unsubscribe topic '{topic}' which is not subscribed.") + return none_future + elif topic is ALL_TOPICS and self._topics: + logger.debug(f"Unsubscribing all topics: {self._topics}") + # remove all topics and callbacks + self._topics.clear() + self._callbacks.clear() + elif topic is not ALL_TOPICS and topic in self._topics: + logger.debug(f"Unsubscribing topic '{topic}'") + self._topics.remove(topic) + self._callbacks.pop(topic, None) + + if self.is_ready(): + return self._rpc_channel.other.unsubscribe(topics=topics) + else: + return none_future + async def publish( self, topics: TopicList, data=None, sync=True, notifier_id=None ) -> bool: From 12d19ca4b4e597429f4ef5f33dd721cf61305850 Mon Sep 17 00:00:00 2001 From: Eskild Schroll-Fleischer Date: Tue, 20 May 2025 20:09:09 +0200 Subject: [PATCH 4/5] Add test case concerning subscribe/unsubscribe --- tests/basic_test.py | 44 ++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 44 insertions(+) diff --git a/tests/basic_test.py b/tests/basic_test.py index 4b5c1aa..58416a4 100644 --- a/tests/basic_test.py +++ b/tests/basic_test.py @@ -133,3 +133,47 @@ async def on_event(data, topic): assert published.result # wait for finish trigger await asyncio.wait_for(finish.wait(), 5) + + +@pytest.mark.asyncio +async def test_pub_sub_unsub(server): + """ + Check client can unsubscribe topic and subscribe again. + """ + # finish trigger + finish = asyncio.Event() + async with PubSubClient() as client: + + async def on_event(data, topic): + assert data == DATA + finish.set() + + # subscribe for the event + client.subscribe(EVENT_TOPIC, on_event) + # start listentining + client.start_client(uri) + # wait for the client to be ready to receive events + await client.wait_until_ready() + # trigger the server via an HTTP route + requests.get(trigger_url) + # wait for finish trigger + await asyncio.wait_for(finish.wait(), 5) + assert finish.is_set() + + # unsubscribe and see that we don't get a message + finish.clear() + await client.unsubscribe(EVENT_TOPIC) + requests.get(trigger_url) + # wait for finish trigger which isn't coming + with pytest.raises(asyncio.TimeoutError) as excinfo: + await asyncio.wait_for(finish.wait(), 5) + assert not finish.is_set() + + # subscribe again and observe that we get the trigger + finish.clear() + await client.subscribe(EVENT_TOPIC, on_event) + # trigger the server via an HTTP route + requests.get(trigger_url) + # wait for finish trigger + await asyncio.wait_for(finish.wait(), 5) + assert finish.is_set() \ No newline at end of file From 05d97fbf35af1a70da87cef222e624f8d0553b11 Mon Sep 17 00:00:00 2001 From: Dan Yishai Date: Thu, 12 Jun 2025 16:34:15 +0300 Subject: [PATCH 5/5] dan/per-12313-release-fastapi-ws-rpc-pubsub (#91) * Update permit-broadcaster version constraints to <1 in requirements * Add CD workflow for building and publishing to PyPI --- .github/workflows/release.yml | 48 +++++++++++++++++++++++++++++++++++ .gitignore | 1 + requirements-dev.txt | 2 +- requirements.txt | 2 +- setup.py | 8 +++--- 5 files changed, 55 insertions(+), 6 deletions(-) create mode 100644 .github/workflows/release.yml diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml new file mode 100644 index 0000000..6c4abf1 --- /dev/null +++ b/.github/workflows/release.yml @@ -0,0 +1,48 @@ +name: Build and publish to Pypi +on: + release: + # job will automatically run after a new "release" is create on github. + types: [published] + +jobs: + publish_fastapi_websocket_rpc: + runs-on: ubuntu-latest + environment: + name: pypi + url: https://pypi.org/p/permit + permissions: + id-token: write + contents: write # 'write' access to repository contents + pull-requests: write # 'write' access to pull requests + steps: + - name: Checkout + uses: actions/checkout@v4 + + - name: Python setup + uses: actions/setup-python@v5 + with: + python-version: '3.11.8' + + - name: Bump version and build package + run: | + # Get version tag and remove 'v' prefix + version_tag=${{ github.event.release.tag_name }} + version_tag=${version_tag#v} + + # Update version in setup.py + sed -i "s/version=\"[0-9][0-9]*\.[0-9][0-9]*\.[0-9][0-9]*\"/version=\"$version_tag\"/" setup.py + + # Print version for verification + echo "Version being published: $version_tag" + cat setup.py | grep version= + + - name: Build Python package + run: | + pip install wheel + python setup.py sdist bdist_wheel + + # Publish package distributions to PyPI + - name: Publish package distributions to PyPI + uses: pypa/gh-action-pypi-publish@release/v1 + with: + password: ${{ secrets.PYPI_TOKEN }} diff --git a/.gitignore b/.gitignore index 7d6da5c..8d02e6c 100644 --- a/.gitignore +++ b/.gitignore @@ -130,3 +130,4 @@ dmypy.json # editors .vscode/ +.idea/ \ No newline at end of file diff --git a/requirements-dev.txt b/requirements-dev.txt index b4d1f17..d01f434 100644 --- a/requirements-dev.txt +++ b/requirements-dev.txt @@ -6,4 +6,4 @@ wheel loguru uvicorn pytest-timeout -permit-broadcaster[redis,postgres,kafka]>=0.2.5,<3 \ No newline at end of file +permit-broadcaster[redis,postgres,kafka]>=0.2.5,<1 \ No newline at end of file diff --git a/requirements.txt b/requirements.txt index 4686357..63ded64 100644 --- a/requirements.txt +++ b/requirements.txt @@ -2,4 +2,4 @@ fastapi-websocket-rpc>=0.1.25,<1 packaging>=20.4 pydantic>=1.9.1 websockets>=14.0 -permit-broadcaster>=0.2.5,<3 \ No newline at end of file +permit-broadcaster>=0.2.5,<1 \ No newline at end of file diff --git a/setup.py b/setup.py index b996403..33ea72c 100644 --- a/setup.py +++ b/setup.py @@ -31,9 +31,9 @@ def get_requirements(env=""): python_requires=">=3.9", install_requires=get_requirements(), extras_require = { - "redis": ["permit-broadcaster[redis]>=0.2.5,<3"], - "postgres": ["permit-broadcaster[postgres]>=0.2.5,<3"], - "kafka": ["permit-broadcaster[kafka]>=0.2.5,<3"], - "all": ["permit-broadcaster[redis,postgres,kafka]>=0.2.5,<3"], + "redis": ["permit-broadcaster[redis]>=0.2.5,<1"], + "postgres": ["permit-broadcaster[postgres]>=0.2.5,<1"], + "kafka": ["permit-broadcaster[kafka]>=0.2.5,<1"], + "all": ["permit-broadcaster[redis,postgres,kafka]>=0.2.5,<1"], } )