From 75b7b943c9ddca8f4a44b1a01f32ec9af0f7a06d Mon Sep 17 00:00:00 2001 From: Prince Oforh Asiedu Date: Tue, 1 Oct 2024 21:51:18 +0000 Subject: [PATCH 1/4] Initial commit --- publish.py | 30 ++++++++++++++++++++++++++++++ publish_test.py | 10 ++++++++++ requirements.txt | 3 +++ subscribe.py | 27 +++++++++++++++++++++++++++ subscribe_test.py | 13 +++++++++++++ 5 files changed, 83 insertions(+) create mode 100644 publish.py create mode 100644 publish_test.py create mode 100644 requirements.txt create mode 100644 subscribe.py create mode 100644 subscribe_test.py diff --git a/publish.py b/publish.py new file mode 100644 index 0000000..62625ae --- /dev/null +++ b/publish.py @@ -0,0 +1,30 @@ +import aiopg + +async def publish(dsn, channel: str, data: str): + """ + Publish a message to the PostgreSQL queue by inserting data into the table. + + Args: + dsn (str): The PostgreSQL database connection string. + channel (str): The channel where the message is to be dispatched. + data (str): The data to be inserted into the queue. + """ + # SQL query to insert data into the queue.message table + query = "INSERT INTO queue.message(channel, data) VALUES (%s, %s) RETURNING *" + values = (channel, data) + + async with aiopg.connect(dsn) as conn: + async with conn.cursor() as cur: + try: + # Execute the SQL query with the provided values + await cur.execute(query, values) + + # Fetch and print the result from the database + result = await cur.fetchone() + print(f"Postgres message dispatch success: {result}") + except Exception as error: + print(f"Error Occurred -> {error}") + + finally: + # This is often useful to let listeners know that no more messages will be sent. + await cur.execute(f"NOTIFY {channel}, 'finish'") \ No newline at end of file diff --git a/publish_test.py b/publish_test.py new file mode 100644 index 0000000..105fd1c --- /dev/null +++ b/publish_test.py @@ -0,0 +1,10 @@ +import asyncio +from publish import publish + +# Example usage (assuming you have a PostgreSQL connection pool): +async def main(): + dsn = "dbname=xxxxx user=xxxxxx password=xxxxxx host=localhost" + await publish(dsn, 'my_channel', '{"message": "Hello, world!"}') + +asyncio.run(main()) + diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..0d1a521 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,3 @@ +psycopg[binary] +asyncio +aiopg \ No newline at end of file diff --git a/subscribe.py b/subscribe.py new file mode 100644 index 0000000..e077146 --- /dev/null +++ b/subscribe.py @@ -0,0 +1,27 @@ +import psycopg2 + +async def subscribe(conn, channel: str): + """ + Listen for notifications on a PostgreSQL channel. + + Args: + conn (aiopg.Connection): The aiopg connection. + channel (str): The PostgreSQL channel to listen to. + """ + async with conn.cursor() as cur: + await cur.execute(f"LISTEN {channel}") + + # Continuously listen for notifications + while True: + try: + # Wait and get the next notification message + msg = await conn.notifies.get() + except psycopg2.Error as ex: + print("ERROR: ", ex) + return + + if msg.payload == "finish": + return + else: + print(f"Receive <- {msg.payload}") + diff --git a/subscribe_test.py b/subscribe_test.py new file mode 100644 index 0000000..7f79a86 --- /dev/null +++ b/subscribe_test.py @@ -0,0 +1,13 @@ +import asyncio +from subscribe import subscribe +import aiopg + +# Example usage (assuming you have a PostgreSQL connection pool): +async def main(): + #provide a postgres data source here + dsn = "dbname=xxxxx user=xxxxx password=xxxxx host=localhost" + async with aiopg.connect(dsn) as conn: + result = await subscribe(conn, 'my_channel') # change 'my_channel' to the channel you want to use + print(f"Received notification: {result}") + +asyncio.run(main()) From f862d17162e547e173086d0f5f1aaa7d3880db3f Mon Sep 17 00:00:00 2001 From: Prince Oforh Asiedu Date: Tue, 8 Oct 2024 06:07:09 +0000 Subject: [PATCH 2/4] refactored pub-sub functions into client; doc added[in README] --- README.md | 136 +++++++++++++++++++++++++++++++++++++++++++++- client.py | 61 +++++++++++++++++++++ publish.py | 30 ---------- publish_test.py | 10 ---- subscribe.py | 27 --------- subscribe_test.py | 13 ----- 6 files changed, 196 insertions(+), 81 deletions(-) create mode 100644 client.py delete mode 100644 publish.py delete mode 100644 publish_test.py delete mode 100644 subscribe.py delete mode 100644 subscribe_test.py diff --git a/README.md b/README.md index 401a67b..bca6a49 100644 --- a/README.md +++ b/README.md @@ -1 +1,135 @@ -Treetracker queue client for Python +# Treetracker queue Python client for PostgreSQL + +This Python project provides asynchronous pub-sub (publish-subscribe) functionality using PostgreSQL as a message broker, leveraging `aiopg` for asynchronous PostgreSQL connections and `psycopg2` for handling PostgreSQL notifications. + +## Features + +- **Subscribe**: Listen to PostgreSQL channels for incoming notifications. +- **Publish**: Send messages to a PostgreSQL queue and notify subscribers. + +## Requirements + +Make sure you have the following installed: + +- **Python 3.7+** +- **PostgreSQL 9.0+** +- **aiopg**: Asynchronous PostgreSQL driver for Python. +- **psycopg2**: PostgreSQL database adapter for Python. + +Install the required Python packages using `pip`: + +```bash +pip install aiopg psycopg2 +``` + +## Database Setup + +Before using the client, set up the necessary PostgreSQL table for the queue: + +```sql +CREATE SCHEMA queue; -- creates a schema called queue +CREATE EXTENSION IF NOT EXISTS "uuid-ossp"; -- helps us generate uuids +CREATE TABLE queue.message ( + id uuid PRIMARY KEY DEFAULT uuid_generate_v4(), + channel text, + data json, + created_at timestamptz, + updated_at timestamptz +); -- creates a table with columns id, channel, data, created_at & updated_at +ALTER TABLE queue.message ALTER COLUMN created_at SET DEFAULT now(); +ALTER TABLE queue.message ALTER COLUMN updated_at SET DEFAULT now(); +-- above two lines make created_at and updated_at columns to be autopopulated + +CREATE OR REPLACE FUNCTION queue.new_message_notify() RETURNS TRIGGER AS $$ + DECLARE + BEGIN + PERFORM pg_notify(cast(NEW.channel as text), row_to_json(new)::text); + RETURN NEW; + END; + $$ LANGUAGE plpgsql; + +CREATE TRIGGER new_insert_trigger BEFORE INSERT ON queue.message + FOR EACH ROW EXECUTE PROCEDURE queue.new_message_notify(); +``` + +## Usage + +### 1. Subscribing to a Channel + +To subscribe to a PostgreSQL channel, you can use the `subscribe` method of the `Client` class. It continuously listens for notifications on the given channel. + +```python +import asyncio +import aiopg +from client import Client + +dsn = 'dbname=test user=postgres password=yourpassword host=localhost' + +async def run_subscriber(): + async with aiopg.connect(dsn) as conn: + await Client.subscribe(conn, 'my_channel') + +# Run the subscriber +asyncio.run(run_subscriber()) +``` + +### 2. Publishing to a Channel + +To publish a message to a PostgreSQL channel, use the `publish` method. It inserts a message into the `queue.message` table and notifies listeners. + +```python +import asyncio +from client import Client + +dsn = 'dbname=test user=postgres password=yourpassword host=localhost' + +async def run_publisher(): + await Client.publish(dsn, 'my_channel', 'Hello, PostgreSQL!') + +# Run the publisher +asyncio.run(run_publisher()) +``` + +### Example Output + +- When you publish a message, you’ll see: + + ``` + Postgres message dispatch success: (1, 'my_channel', 'Hello, PostgreSQL!') + ``` + +- When you subscribe to a channel, you’ll receive: + + ``` + Receive <- Hello, PostgreSQL! + ``` + +## Methods + +### `subscribe(conn, channel: str)` +Listens for notifications on a PostgreSQL channel. + +- **Parameters**: + - `conn`: The `aiopg.Connection` object. + - `channel`: The PostgreSQL channel name. + +- **Returns**: Prints out the messages received from the channel. + +### `publish(dsn: str, channel: str, data: str)` +Publishes a message to a PostgreSQL channel by inserting data into the `queue.message` table. + +- **Parameters**: + - `dsn`: The PostgreSQL connection string. + - `channel`: The PostgreSQL channel to notify. + - `data`: The message to be published. + +- **Returns**: Inserts a row into the `queue.message` table and notifies the channel subscribers. + +## Error Handling + +- The code catches and handles common PostgreSQL errors during subscription and publishing. +- It also ensures that listeners receive a 'finish' message, signaling the end of notifications. + +## License + +This project is open-source and available under the MIT License. \ No newline at end of file diff --git a/client.py b/client.py new file mode 100644 index 0000000..1cbd986 --- /dev/null +++ b/client.py @@ -0,0 +1,61 @@ +import aiopg +import psycopg2 +import asyncio + +class Client: + + + async def subscribe(conn, channel: str): + """ + Listen for notifications on a PostgreSQL channel. + + Args: + conn (aiopg.Connection): The aiopg connection. + channel (str): The PostgreSQL channel to listen to. + """ + async with conn.cursor() as cur: + await cur.execute(f"LISTEN {channel}") + + # Continuously listen for notifications + while True: + try: + # Wait and get the next notification message + msg = await conn.notifies.get() + except psycopg2.Error as ex: + print("ERROR: ", ex) + return + + if msg.payload == "finish": + return + else: + print(f"Receive <- {msg.payload}") + + + async def publish(dsn, channel: str, data: str): + """ + Publish a message to the PostgreSQL queue by inserting data into the table. + + Args: + dsn (str): The PostgreSQL database connection string. + channel (str): The channel where the message is to be dispatched. + data (str): The data to be inserted into the queue. + """ + # SQL query to insert data into the queue.message table + query = "INSERT INTO queue.message(channel, data) VALUES (%s, %s) RETURNING *" + values = (channel, data) + + async with aiopg.connect(dsn) as conn: + async with conn.cursor() as cur: + try: + # Execute the SQL query with the provided values + await cur.execute(query, values) + + # Fetch and print the result from the database + result = await cur.fetchone() + print(f"Postgres message dispatch success: {result}") + except Exception as error: + print(f"Error Occurred -> {error}") + + finally: + # This is often useful to let listeners know that no more messages will be sent. + await cur.execute(f"NOTIFY {channel}, 'finish'") \ No newline at end of file diff --git a/publish.py b/publish.py deleted file mode 100644 index 62625ae..0000000 --- a/publish.py +++ /dev/null @@ -1,30 +0,0 @@ -import aiopg - -async def publish(dsn, channel: str, data: str): - """ - Publish a message to the PostgreSQL queue by inserting data into the table. - - Args: - dsn (str): The PostgreSQL database connection string. - channel (str): The channel where the message is to be dispatched. - data (str): The data to be inserted into the queue. - """ - # SQL query to insert data into the queue.message table - query = "INSERT INTO queue.message(channel, data) VALUES (%s, %s) RETURNING *" - values = (channel, data) - - async with aiopg.connect(dsn) as conn: - async with conn.cursor() as cur: - try: - # Execute the SQL query with the provided values - await cur.execute(query, values) - - # Fetch and print the result from the database - result = await cur.fetchone() - print(f"Postgres message dispatch success: {result}") - except Exception as error: - print(f"Error Occurred -> {error}") - - finally: - # This is often useful to let listeners know that no more messages will be sent. - await cur.execute(f"NOTIFY {channel}, 'finish'") \ No newline at end of file diff --git a/publish_test.py b/publish_test.py deleted file mode 100644 index 105fd1c..0000000 --- a/publish_test.py +++ /dev/null @@ -1,10 +0,0 @@ -import asyncio -from publish import publish - -# Example usage (assuming you have a PostgreSQL connection pool): -async def main(): - dsn = "dbname=xxxxx user=xxxxxx password=xxxxxx host=localhost" - await publish(dsn, 'my_channel', '{"message": "Hello, world!"}') - -asyncio.run(main()) - diff --git a/subscribe.py b/subscribe.py deleted file mode 100644 index e077146..0000000 --- a/subscribe.py +++ /dev/null @@ -1,27 +0,0 @@ -import psycopg2 - -async def subscribe(conn, channel: str): - """ - Listen for notifications on a PostgreSQL channel. - - Args: - conn (aiopg.Connection): The aiopg connection. - channel (str): The PostgreSQL channel to listen to. - """ - async with conn.cursor() as cur: - await cur.execute(f"LISTEN {channel}") - - # Continuously listen for notifications - while True: - try: - # Wait and get the next notification message - msg = await conn.notifies.get() - except psycopg2.Error as ex: - print("ERROR: ", ex) - return - - if msg.payload == "finish": - return - else: - print(f"Receive <- {msg.payload}") - diff --git a/subscribe_test.py b/subscribe_test.py deleted file mode 100644 index 7f79a86..0000000 --- a/subscribe_test.py +++ /dev/null @@ -1,13 +0,0 @@ -import asyncio -from subscribe import subscribe -import aiopg - -# Example usage (assuming you have a PostgreSQL connection pool): -async def main(): - #provide a postgres data source here - dsn = "dbname=xxxxx user=xxxxx password=xxxxx host=localhost" - async with aiopg.connect(dsn) as conn: - result = await subscribe(conn, 'my_channel') # change 'my_channel' to the channel you want to use - print(f"Received notification: {result}") - -asyncio.run(main()) From 9f0aa451c52789a6d235619106fcddaa4fb864b8 Mon Sep 17 00:00:00 2001 From: Prince Oforh Asiedu Date: Tue, 8 Oct 2024 07:00:29 +0000 Subject: [PATCH 3/4] Readme updated --- README.md | 14 +++++--------- 1 file changed, 5 insertions(+), 9 deletions(-) diff --git a/README.md b/README.md index bca6a49..e468c6d 100644 --- a/README.md +++ b/README.md @@ -24,7 +24,7 @@ pip install aiopg psycopg2 ## Database Setup -Before using the client, set up the necessary PostgreSQL table for the queue: +Before using the client, you will need to set up the necessary PostgreSQL table for testing locally otherwise skip this part: ```sql CREATE SCHEMA queue; -- creates a schema called queue @@ -94,19 +94,18 @@ asyncio.run(run_publisher()) - When you publish a message, you’ll see: - ``` - Postgres message dispatch success: (1, 'my_channel', 'Hello, PostgreSQL!') + ```Postgres message dispatch success: (1, 'my_channel', 'Hello, PostgreSQL!') ``` - When you subscribe to a channel, you’ll receive: - ``` - Receive <- Hello, PostgreSQL! + ``` Receive <- Hello, PostgreSQL! ``` ## Methods ### `subscribe(conn, channel: str)` + Listens for notifications on a PostgreSQL channel. - **Parameters**: @@ -116,6 +115,7 @@ Listens for notifications on a PostgreSQL channel. - **Returns**: Prints out the messages received from the channel. ### `publish(dsn: str, channel: str, data: str)` + Publishes a message to a PostgreSQL channel by inserting data into the `queue.message` table. - **Parameters**: @@ -129,7 +129,3 @@ Publishes a message to a PostgreSQL channel by inserting data into the `queue.me - The code catches and handles common PostgreSQL errors during subscription and publishing. - It also ensures that listeners receive a 'finish' message, signaling the end of notifications. - -## License - -This project is open-source and available under the MIT License. \ No newline at end of file From 97b1b59181566a74d614c6ce4fe8d2dcc597246b Mon Sep 17 00:00:00 2001 From: Prince Oforh Asiedu Date: Tue, 8 Oct 2024 07:04:19 +0000 Subject: [PATCH 4/4] Readme change --- README.md | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index e468c6d..6aaef6a 100644 --- a/README.md +++ b/README.md @@ -94,12 +94,14 @@ asyncio.run(run_publisher()) - When you publish a message, you’ll see: - ```Postgres message dispatch success: (1, 'my_channel', 'Hello, PostgreSQL!') + ``` + Postgres message dispatch success: (1, 'my_channel', 'Hello, PostgreSQL!') ``` - When you subscribe to a channel, you’ll receive: - ``` Receive <- Hello, PostgreSQL! + ``` + Receive <- Hello, PostgreSQL! ``` ## Methods