diff --git a/graypy/rabbitmq.py b/graypy/rabbitmq.py index ea7be2cdf..6df14aab9 100644 --- a/graypy/rabbitmq.py +++ b/graypy/rabbitmq.py @@ -8,7 +8,7 @@ from logging import Filter from logging.handlers import SocketHandler -from amqplib import client_0_8 as amqp # pylint: disable=import-error +import amqp # pylint: disable=import-error from graypy.handler import BaseGELFHandler @@ -37,6 +37,9 @@ def __init__( exchange_type="fanout", virtual_host="/", routing_key="", + connect_timeout=1, + read_timeout=None, + write_timeout=None, **kwargs ): """Initialize the GELFRabbitHandler @@ -56,6 +59,15 @@ def __init__( :param routing_key: :type routing_key: str + + :param connect_timeout: Used when creating new connection + :type connect_timeout: int | float | None + + :param read_timeout: + :type read_timeout: int | float | None + + :param write_timeout: + :type write_timeout: int | float | None """ self.url = url parsed = urlparse(url) @@ -72,18 +84,19 @@ def __init__( "password": _ifnone(parsed.password, "guest"), "virtual_host": self.virtual_host, "insist": False, + "read_timeout": read_timeout, + "write_timeout": write_timeout, } self.exchange = exchange self.exchange_type = exchange_type self.routing_key = routing_key + self.connect_timeout = connect_timeout BaseGELFHandler.__init__(self, **kwargs) SocketHandler.__init__(self, host, port) - self.addFilter(ExcludeFilter("amqplib")) + self.addFilter(ExcludeFilter("amqp")) def makeSocket(self, timeout=1): - return RabbitSocket( - self.cn_args, timeout, self.exchange, self.exchange_type, self.routing_key - ) + return RabbitSocket(self.cn_args, self.connect_timeout or timeout, self.exchange, self.exchange_type, self.routing_key) def makePickle(self, record): message_dict = self._make_gelf_dict(record) @@ -91,13 +104,13 @@ def makePickle(self, record): class RabbitSocket(object): - def __init__(self, cn_args, timeout, exchange, exchange_type, routing_key): + def __init__(self, cn_args, connect_timeout, exchange, exchange_type, routing_key): self.cn_args = cn_args - self.timeout = timeout self.exchange = exchange self.exchange_type = exchange_type self.routing_key = routing_key - self.connection = amqp.Connection(connection_timeout=timeout, **self.cn_args) + self.connection = amqp.Connection(connect_timeout=connect_timeout, **self.cn_args) + self.connection.connect() self.channel = self.connection.channel() self.channel.exchange_declare( exchange=self.exchange, diff --git a/setup.py b/setup.py index 925dc12b7..8f03253a1 100755 --- a/setup.py +++ b/setup.py @@ -80,10 +80,10 @@ def run_tests(self): "pylint>=1.9.3,<2.0.0", "mock>=2.0.0,<3.0.0", "requests>=2.20.1,<3.0.0", - "amqplib>=1.0.2,<2.0.0", + "amqp==2.5.2", ], extras_require={ - "amqp": ["amqplib==1.0.2"], + "amqp": ["amqp==2.5.2"], "docs": [ "sphinx>=2.1.2,<3.0.0", "sphinx_rtd_theme>=0.4.3,<1.0.0",