From 0d090bb3be42bb214bf6637466d2398a1245cd32 Mon Sep 17 00:00:00 2001 From: Martin Veselovsky Date: Thu, 31 Oct 2019 13:52:11 +0100 Subject: [PATCH 1/2] Use amqp library instead of amqplib Library `amqplib` is several years old. The `amqp` is fork of `amqplib`, which is maintained by the Celery project. --- graypy/rabbitmq.py | 7 ++++--- setup.py | 4 ++-- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/graypy/rabbitmq.py b/graypy/rabbitmq.py index ea7be2cdf..f156ab7ff 100644 --- a/graypy/rabbitmq.py +++ b/graypy/rabbitmq.py @@ -8,7 +8,7 @@ from logging import Filter from logging.handlers import SocketHandler -from amqplib import client_0_8 as amqp # pylint: disable=import-error +import amqp # pylint: disable=import-error from graypy.handler import BaseGELFHandler @@ -78,7 +78,7 @@ def __init__( self.routing_key = routing_key BaseGELFHandler.__init__(self, **kwargs) SocketHandler.__init__(self, host, port) - self.addFilter(ExcludeFilter("amqplib")) + self.addFilter(ExcludeFilter("amqp")) def makeSocket(self, timeout=1): return RabbitSocket( @@ -97,7 +97,8 @@ def __init__(self, cn_args, timeout, exchange, exchange_type, routing_key): self.exchange = exchange self.exchange_type = exchange_type self.routing_key = routing_key - self.connection = amqp.Connection(connection_timeout=timeout, **self.cn_args) + self.connection = amqp.Connection(connect_timeout=timeout, **self.cn_args) + self.connection.connect() self.channel = self.connection.channel() self.channel.exchange_declare( exchange=self.exchange, diff --git a/setup.py b/setup.py index 925dc12b7..8f03253a1 100755 --- a/setup.py +++ b/setup.py @@ -80,10 +80,10 @@ def run_tests(self): "pylint>=1.9.3,<2.0.0", "mock>=2.0.0,<3.0.0", "requests>=2.20.1,<3.0.0", - "amqplib>=1.0.2,<2.0.0", + "amqp==2.5.2", ], extras_require={ - "amqp": ["amqplib==1.0.2"], + "amqp": ["amqp==2.5.2"], "docs": [ "sphinx>=2.1.2,<3.0.0", "sphinx_rtd_theme>=0.4.3,<1.0.0", From be77b185469fff1f61a814ec0960c2619f66d233 Mon Sep 17 00:00:00 2001 From: Martin Veselovsky Date: Fri, 1 Nov 2019 13:17:10 +0100 Subject: [PATCH 2/2] Add possibility to set timeouts in GELFRabbitHandler. --- graypy/rabbitmq.py | 24 ++++++++++++++++++------ 1 file changed, 18 insertions(+), 6 deletions(-) diff --git a/graypy/rabbitmq.py b/graypy/rabbitmq.py index f156ab7ff..6df14aab9 100644 --- a/graypy/rabbitmq.py +++ b/graypy/rabbitmq.py @@ -37,6 +37,9 @@ def __init__( exchange_type="fanout", virtual_host="/", routing_key="", + connect_timeout=1, + read_timeout=None, + write_timeout=None, **kwargs ): """Initialize the GELFRabbitHandler @@ -56,6 +59,15 @@ def __init__( :param routing_key: :type routing_key: str + + :param connect_timeout: Used when creating new connection + :type connect_timeout: int | float | None + + :param read_timeout: + :type read_timeout: int | float | None + + :param write_timeout: + :type write_timeout: int | float | None """ self.url = url parsed = urlparse(url) @@ -72,18 +84,19 @@ def __init__( "password": _ifnone(parsed.password, "guest"), "virtual_host": self.virtual_host, "insist": False, + "read_timeout": read_timeout, + "write_timeout": write_timeout, } self.exchange = exchange self.exchange_type = exchange_type self.routing_key = routing_key + self.connect_timeout = connect_timeout BaseGELFHandler.__init__(self, **kwargs) SocketHandler.__init__(self, host, port) self.addFilter(ExcludeFilter("amqp")) def makeSocket(self, timeout=1): - return RabbitSocket( - self.cn_args, timeout, self.exchange, self.exchange_type, self.routing_key - ) + return RabbitSocket(self.cn_args, self.connect_timeout or timeout, self.exchange, self.exchange_type, self.routing_key) def makePickle(self, record): message_dict = self._make_gelf_dict(record) @@ -91,13 +104,12 @@ def makePickle(self, record): class RabbitSocket(object): - def __init__(self, cn_args, timeout, exchange, exchange_type, routing_key): + def __init__(self, cn_args, connect_timeout, exchange, exchange_type, routing_key): self.cn_args = cn_args - self.timeout = timeout self.exchange = exchange self.exchange_type = exchange_type self.routing_key = routing_key - self.connection = amqp.Connection(connect_timeout=timeout, **self.cn_args) + self.connection = amqp.Connection(connect_timeout=connect_timeout, **self.cn_args) self.connection.connect() self.channel = self.connection.channel() self.channel.exchange_declare(