diff --git a/frontera/contrib/messagebus/kafka/transport.py b/frontera/contrib/messagebus/kafka/transport.py
new file mode 100644
index 000000000..6d7d24ccc
--- /dev/null
+++ b/frontera/contrib/messagebus/kafka/transport.py
@@ -0,0 +1,51 @@
+from math import ceil
+
+import hashlib
+from cachetools import LRUCache
+from msgpack import Packer, unpackb
+from random import randint
+from six import MAXSIZE
+from struct import pack
+
+
+def random_bytes():
+    return pack("Q", randint(0, MAXSIZE))  # "Q" is a fixed 8 bytes; native "L" is 4 bytes on some platforms and overflows
+
+
+class FramedTransport(object):
+    def __init__(self, max_message_size):
+        self.max_message_size = max_message_size
+        self.buffer = LRUCache(10)  # per-message reassembly buffers, keyed by msg_key
+        self.packer = Packer()
+
+    def read(self, kafka_msg):
+        frame = unpackb(kafka_msg.value)
+        seg_id, seg_count, msg_key, msg = frame
+        if seg_count == 1:  # unsegmented message, return the payload as-is
+            return msg
+
+        buffer = self.buffer.get(msg_key, dict())
+        if not buffer:
+            self.buffer[msg_key] = buffer
+        buffer[seg_id] = frame
+        if len(buffer) == seg_count:  # all segments have arrived, reassemble
+            msgs = [buffer[_seg_id][3] for _seg_id in sorted(buffer.keys())]
+            final_msg = b''.join(msgs)
+            del self.buffer[msg_key]
+            return final_msg
+        return None  # still waiting for the remaining segments
+
+    def write(self, key, msg):
+        if len(msg) < self.max_message_size:
+            yield self.packer.pack((0, 1, None, msg))
+        else:
+            length = len(msg)
+            seg_size = self.max_message_size
+            seg_count = int(ceil(length / float(seg_size)))
+            h = hashlib.sha1()
+            h.update(msg)
+            h.update(random_bytes())  # salt, so identical payloads get distinct keys
+            msg_key = h.digest()
+            for seg_id in range(seg_count):
+                seg_msg = msg[seg_id * seg_size: (seg_id + 1) * seg_size]
+                yield self.packer.pack((seg_id, seg_count, msg_key, seg_msg))
\ No newline at end of file
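
The new `FramedTransport` splits any message larger than `max_message_size` into msgpack-framed segments of the form `(seg_id, seg_count, msg_key, payload)` and reassembles them on the consumer side, tolerating out-of-order delivery. A minimal round-trip sketch (the 32 KiB limit mirrors the unit test below; `KafkaMessage` here is only a stand-in for kafka-python's `ConsumerRecord`, of which `read()` touches nothing but `.value`):

```python
from math import ceil
from collections import namedtuple

from frontera.contrib.messagebus.kafka.transport import FramedTransport

# Stand-in for kafka-python's ConsumerRecord; read() only uses .value.
KafkaMessage = namedtuple("KafkaMessage", ["key", "value"])

transport = FramedTransport(32768)       # 32 KiB per segment
payload = b"x" * 100000                  # one logical ~100 KB message

frames = list(transport.write(b"key", payload))
assert len(frames) == int(ceil(100000 / 32768.0))  # 4 frames

# read() buffers partial messages and returns None until the last
# segment arrives, then hands back the reassembled payload.
result = None
for frame in frames:
    result = transport.read(KafkaMessage(key=b"key", value=frame))
assert result == payload
```
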
diff --git a/frontera/contrib/messagebus/kafkabus.py b/frontera/contrib/messagebus/kafkabus.py
index f4761e052..6140b7ce4 100644
--- a/frontera/contrib/messagebus/kafkabus.py
+++ b/frontera/contrib/messagebus/kafkabus.py
@@ -2,23 +2,21 @@
 from __future__ import absolute_import
 from logging import getLogger
-from time import sleep
 
 import six
 from kafka import KafkaConsumer, KafkaProducer, TopicPartition
 
 from frontera.contrib.backends.partitioners import FingerprintPartitioner, Crc32NamePartitioner
 from frontera.contrib.messagebus.kafka.offsets_fetcher import OffsetsFetcherAsync
+from frontera.contrib.messagebus.kafka.transport import FramedTransport
 from frontera.core.messagebus import BaseMessageBus, BaseSpiderLogStream, BaseSpiderFeedStream, \
     BaseStreamConsumer, BaseScoringLogStream, BaseStreamProducer, BaseStatsLogStream
-from twisted.internet.task import LoopingCall
-from traceback import format_tb
 from os.path import join as os_path_join
 
-
 DEFAULT_BATCH_SIZE = 1024 * 1024
 DEFAULT_BUFFER_MEMORY = 130 * 1024 * 1024
 DEFAULT_MAX_REQUEST_SIZE = 4 * 1024 * 1024
+MAX_SEGMENT_SIZE = int(DEFAULT_MAX_REQUEST_SIZE * 0.95)
 
 logger = getLogger("messagebus.kafka")
 
 
@@ -59,13 +57,16 @@ def __init__(self, location, enable_ssl, cert_path, topic, group, partition_id):
         else:
             self._partitions = [TopicPartition(self._topic, pid) for pid in self._consumer.partitions_for_topic(self._topic)]
             self._consumer.subscribe(topics=[self._topic])
+        self._transport = FramedTransport(MAX_SEGMENT_SIZE)
 
     def get_messages(self, timeout=0.1, count=1):
         result = []
         while count > 0:
             try:
-                m = next(self._consumer)
-                result.append(m.value)
+                kafka_msg = next(self._consumer)
+                msg = self._transport.read(kafka_msg)
+                if msg is not None:
+                    result.append(msg)
                 count -= 1
             except StopIteration:
                 break
@@ -90,17 +91,18 @@ def __init__(self, location, enable_ssl, cert_path, topic, compression, **kwargs
         self._create(enable_ssl, cert_path, **kwargs)
 
     def _create(self, enable_ssl, cert_path, **kwargs):
-        max_request_size = kwargs.pop('max_request_size', DEFAULT_MAX_REQUEST_SIZE)
+        self._transport = FramedTransport(MAX_SEGMENT_SIZE)
         kwargs.update(_prepare_kafka_ssl_kwargs(cert_path) if enable_ssl else {})
         self._producer = KafkaProducer(bootstrap_servers=self._location,
                                        retries=5,
                                        compression_type=self._compression,
-                                       max_request_size=max_request_size,
+                                       max_request_size=DEFAULT_MAX_REQUEST_SIZE,
                                        **kwargs)
 
     def send(self, key, *messages):
         for msg in messages:
-            self._producer.send(self._topic, value=msg)
+            for kafka_msg in self._transport.write(key, msg):
+                self._producer.send(self._topic, value=kafka_msg)
 
     def flush(self):
         self._producer.flush()
@@ -115,18 +117,19 @@ def __init__(self, location, enable_ssl, cert_path, topic_done, partitioner, com
         self._topic_done = topic_done
         self._partitioner = partitioner
         self._compression = compression
-        max_request_size = kwargs.pop('max_request_size', DEFAULT_MAX_REQUEST_SIZE)
         kwargs.update(_prepare_kafka_ssl_kwargs(cert_path) if enable_ssl else {})
+        self._transport = FramedTransport(MAX_SEGMENT_SIZE)
         self._producer = KafkaProducer(bootstrap_servers=self._location,
                                        partitioner=partitioner,
                                        retries=5,
                                        compression_type=self._compression,
-                                       max_request_size=max_request_size,
+                                       max_request_size=DEFAULT_MAX_REQUEST_SIZE,
                                        **kwargs)
 
     def send(self, key, *messages):
         for msg in messages:
-            self._producer.send(self._topic_done, key=key, value=msg)
+            for kafka_msg in self._transport.write(key, msg):
+                self._producer.send(self._topic_done, key=key, value=kafka_msg)
 
     def flush(self):
         self._producer.flush()
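
`MAX_SEGMENT_SIZE` is set to 95% of the producer's `max_request_size`, presumably to leave headroom for the msgpack framing (array header, two integers, the 20-byte SHA-1 key and the payload length prefix) plus Kafka's own record overhead, so a full segment still fits into a single produce request. A quick way to sanity-check the per-frame overhead (a sketch, not part of the patch):

```python
from msgpack import Packer

MAX_REQUEST_SIZE = 4 * 1024 * 1024
MAX_SEGMENT_SIZE = int(MAX_REQUEST_SIZE * 0.95)   # 3984588 bytes of payload

# Frame layout used by FramedTransport: (seg_id, seg_count, sha1 digest, payload)
frame = Packer().pack((0, 3, b"\x00" * 20, b"x" * MAX_SEGMENT_SIZE))

print(len(frame) - MAX_SEGMENT_SIZE)   # a few dozen bytes of framing overhead
assert len(frame) <= MAX_REQUEST_SIZE  # the framed segment fits one Kafka request
```
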
diff --git a/tests/kafka/docker-compose.yml b/tests/kafka/docker-compose.yml
index f9b7a5e5f..b06962b63 100644
--- a/tests/kafka/docker-compose.yml
+++ b/tests/kafka/docker-compose.yml
@@ -5,12 +5,14 @@ services:
     ports:
       - "2181:2181"
   kafka:
-    image: wurstmeister/kafka
+    image: wurstmeister/kafka:2.11-0.10.2.1
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: 127.0.0.1
-      KAFKA_CREATE_TOPICS: "test:1:1"
+      KAFKA_CREATE_TOPICS: "frontier-todo:1:1,frontier-done:1:1,frontier-score:4:1,frontier-stats:4:1"
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
+      KAFKA_MESSAGE_MAX_BYTES: 4194304
+      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "false"
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock
\ No newline at end of file
diff --git a/tests/test_framed_transport.py b/tests/test_framed_transport.py
new file mode 100644
index 000000000..9d0c2f76a
--- /dev/null
+++ b/tests/test_framed_transport.py
@@ -0,0 +1,42 @@
+# -*- coding: utf-8 -*-
+from frontera.contrib.messagebus.kafka.transport import FramedTransport
+import random
+import string
+from collections import namedtuple
+import unittest
+
+KafkaMessage = namedtuple("KafkaMessage", ['key', 'value'])
+
+
+def get_blob(size):
+    s = ''.join(random.choice(string.ascii_letters) for x in range(size))
+    return s.encode("latin1")
+
+
+class TestFramedTransport(unittest.TestCase):
+    def setUp(self):
+        self.transport = FramedTransport(32768)
+
+    def test_big_message(self):
+        test_msg = get_blob(1000000)
+        assert len(test_msg) == 1000000
+        framed_msgs = [m for m in self.transport.write(b"key", test_msg)]
+        assert len(framed_msgs) == 31
+
+        random.shuffle(framed_msgs)
+
+        for i, msg in enumerate(framed_msgs):
+            km = KafkaMessage(key=b"key", value=msg)
+            result = self.transport.read(km)
+            if i < len(framed_msgs) - 1:
+                assert result is None
+        assert result == test_msg  # the last one is triggering msg assembling
+
+    def test_common_message(self):
+        test_msg = get_blob(4096)
+        framed_msgs = [m for m in self.transport.write(b"key", test_msg)]
+        assert len(framed_msgs) == 1
+
+        km = KafkaMessage(key=b"key", value=framed_msgs[0])
+        result = self.transport.read(km)
+        assert result == test_msg
diff --git a/tests/test_message_bus.py b/tests/test_message_bus.py
index 5293c2445..c78d5393d 100644
--- a/tests/test_message_bus.py
+++ b/tests/test_message_bus.py
@@ -5,8 +5,9 @@
 from frontera.contrib.messagebus.kafkabus import MessageBus as KafkaMessageBus
 from frontera.utils.fingerprint import sha1
 from kafka import KafkaClient
-from random import randint
+from random import randint, choice
 from time import sleep
+import string
 from six.moves import range
 import logging
 from sys import stdout
@@ -14,6 +15,11 @@
 from w3lib.util import to_bytes
 
 
+def get_blob(size):
+    s = ''.join(choice(string.ascii_letters) for x in range(size))
+    return s.encode("latin1")
+
+
 class MessageBusTester(object):
     def __init__(self, cls, settings=Settings()):
         settings.set('SPIDER_FEED_PARTITIONS', 1)
@@ -119,7 +125,8 @@ def close(self):
 class KafkaMessageBusTest(unittest.TestCase):
     def setUp(self):
         logging.basicConfig()
-        handler = logging.StreamHandler(stdout)
+        #handler = logging.StreamHandler(stdout)
+        handler = logging.FileHandler("kafka-debug.log")
         logger = logging.getLogger("kafka")
         logger.setLevel(logging.INFO)
         logger.addHandler(handler)
@@ -177,7 +184,8 @@ def spider_log_activity(self, messages):
             if i % 2 == 0:
                 self.sp_sl_p.send(sha1(str(randint(1, 1000))), b'http://helloworld.com/way/to/the/sun/' + b'0')
             else:
-                self.sp_sl_p.send(sha1(str(randint(1, 1000))), b'http://way.to.the.sun' + b'0')
+                msg = b'http://way.to.the.sun' + b'0' if i != messages - 1 else get_blob(10485760)
+                self.sp_sl_p.send(sha1(str(randint(1, 1000))), msg)
         self.sp_sl_p.flush()
         self.logger.debug("spider log activity finished")
 
@@ -190,12 +198,17 @@ def spider_feed_activity(self):
     def sw_activity(self):
         c = 0
         p = 0
+        big_message_passed = False
         for m in self.sw_sl_c.get_messages(timeout=0.1, count=512):
             if m.startswith(b'http://helloworld.com/'):
                 p += 1
                 self.sw_us_p.send(None, b'message' + b'0' + b"," + to_bytes(str(c)))
+            else:
+                if len(m) == 10485760:
+                    big_message_passed = True
             c += 1
         assert p > 0
+        assert big_message_passed
         return c
 
     def db_activity(self, messages):
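
For reference, the broker-side `KAFKA_MESSAGE_MAX_BYTES` in docker-compose.yml equals the producer's `DEFAULT_MAX_REQUEST_SIZE`, so every frame the transport emits is accepted by the broker. Under these constants, the 10 MiB blob pushed through the spider log in the integration test should travel as three Kafka messages (a back-of-the-envelope check, not part of the patch):

```python
from math import ceil

KAFKA_MESSAGE_MAX_BYTES = 4194304            # broker limit from docker-compose.yml
DEFAULT_MAX_REQUEST_SIZE = 4 * 1024 * 1024   # producer limit from kafkabus.py
assert KAFKA_MESSAGE_MAX_BYTES == DEFAULT_MAX_REQUEST_SIZE

MAX_SEGMENT_SIZE = int(DEFAULT_MAX_REQUEST_SIZE * 0.95)   # 3984588
print(int(ceil(10485760 / float(MAX_SEGMENT_SIZE))))      # 10 MiB -> 3 frames
```
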