diff --git a/dpq/queue.py b/dpq/queue.py index 0631425..9d9f041 100644 --- a/dpq/queue.py +++ b/dpq/queue.py @@ -57,14 +57,22 @@ def listen(self): cur.execute('LISTEN "{}";'.format(self.notify_channel)) def wait(self, timeout=30): - connection.connection.poll() - notifies = self.filter_notifies() - if notifies: - return notifies - - select.select([connection.connection], [], [], timeout) - connection.connection.poll() - return self.filter_notifies() + if hasattr(connection.connection, 'poll'): + # psycopg2 + connection.connection.poll() + notifies = self.filter_notifies() + if notifies: + return notifies + + select.select([connection.connection], [], [], timeout) + connection.connection.poll() + return self.filter_notifies() + else: + # psycopg3 + for notify in connection.connection.notifies(timeout=timeout): + return [notify] + else: + return [] def filter_notifies(self): notifies = [