diff --git a/README.md b/README.md index 53ce43b..fb055f5 100644 --- a/README.md +++ b/README.md @@ -175,18 +175,31 @@ source: value_format: json # or arrow_ipc batch_size: 1000 continuous: true # Continuously consume data (default: true) + consumer_group: my-consumer-group # Required for Redis Streams + # consumer_name: optional # Defaults to hostname-id if not provided ``` The Redis source supports two consumption modes: - **Continuous mode** (default): Continuously polls Redis for new data, similar to Kafka consumer - **One-shot mode**: Reads available data once and stops +**Redis Streams Consumer Groups:** +When using Redis Streams (`stream_keys`), a `consumer_group` is required. The source uses Redis consumer groups to: +- Track message consumption across multiple consumers +- Enable parallel processing with multiple pipeline instances +- Automatically acknowledge messages with XACK immediately after they are read +- Provide at-least-once delivery semantics up to the point of acknowledgment + +If `consumer_name` is not provided, it defaults to `{hostname}-{id}` to ensure uniqueness. 
+ Environment variables: - `REDIS_HOST`: Redis host - `REDIS_PORT`: Redis port - `REDIS_PASSWORD`: Redis password - `REDIS_STREAM_KEYS`: Comma-separated list of stream keys - `REDIS_CONTINUOUS`: Enable continuous consumption (true/false) +- `REDIS_CONSUMER_GROUP`: Consumer group name (required for streams) +- `REDIS_CONSUMER_NAME`: Consumer name within the group ### Sink Configuration diff --git a/USAGE.md b/USAGE.md index 6397bf7..ad0bea2 100644 --- a/USAGE.md +++ b/USAGE.md @@ -56,6 +56,7 @@ export SOURCE_TYPE=redis export REDIS_HOST=localhost export REDIS_PORT=6379 export REDIS_STREAM_KEYS=sensor-data,user-events +export REDIS_CONSUMER_GROUP=fs-data-sink-group # Required for Redis Streams export SINK_TYPE=hdfs export HDFS_URL=http://namenode:9870 @@ -153,6 +154,8 @@ source: - queue:tasks value_format: json # or arrow_ipc batch_size: 1000 + consumer_group: fs-data-sink-group # Required for Redis Streams + # consumer_name: optional # Defaults to hostname-id if not provided extra_config: # Any additional Redis client configuration socket_timeout: 5 @@ -232,6 +235,8 @@ All configuration can be overridden with environment variables: - `REDIS_PORT`: Redis port - `REDIS_PASSWORD`: Redis password - `REDIS_STREAM_KEYS`: comma-separated +- `REDIS_CONSUMER_GROUP`: consumer group (required for streams) +- `REDIS_CONSUMER_NAME`: consumer name (optional) #### Sink - `SINK_TYPE`: s3 or hdfs diff --git a/config/example-redis-to-hdfs.ini b/config/example-redis-to-hdfs.ini index 07789c0..59a7902 100644 --- a/config/example-redis-to-hdfs.ini +++ b/config/example-redis-to-hdfs.ini @@ -9,6 +9,7 @@ stream_keys = data-stream-1,data-stream-2 value_format = json batch_size = 1000 continuous = true +consumer_group = fs-data-sink-group [sink] type = hdfs diff --git a/config/example-redis-to-hdfs.yaml b/config/example-redis-to-hdfs.yaml index 9016b44..1e5ff7a 100644 --- a/config/example-redis-to-hdfs.yaml +++ b/config/example-redis-to-hdfs.yaml @@ -13,6 +13,8 @@ source: value_format: json # 
or arrow_ipc batch_size: 1000 continuous: true # Continuously consume data (default: true) + consumer_group: fs-data-sink-group # Required for Redis Streams + # consumer_name: optional # Defaults to hostname-id if not provided sink: type: hdfs diff --git a/config/example-redis-to-local.ini b/config/example-redis-to-local.ini index 6ea9e65..bfa8cc2 100644 --- a/config/example-redis-to-local.ini +++ b/config/example-redis-to-local.ini @@ -9,6 +9,7 @@ stream_keys = data-stream-1,data-stream-2 value_format = json batch_size = 1000 continuous = true +consumer_group = fs-data-sink-group [sink] type = local diff --git a/src/fs_data_sink/config/settings.py b/src/fs_data_sink/config/settings.py index 8e63c35..39fd9d8 100644 --- a/src/fs_data_sink/config/settings.py +++ b/src/fs_data_sink/config/settings.py @@ -27,6 +27,8 @@ class SourceConfig: stream_keys: Optional[list[str]] = None list_keys: Optional[list[str]] = None continuous: bool = True + consumer_group: Optional[str] = None + consumer_name: Optional[str] = None # Common value_format: str = "json" @@ -126,6 +128,13 @@ def _load_ini_file(config_path: str) -> dict: # Handle boolean values elif key in ("continuous",): config_data["source"][key] = config.getboolean("source", key) + # Handle string values that could be None/null + elif key in ("consumer_group", "consumer_name") and value.lower() in ( + "null", + "none", + "", + ): + config_data["source"][key] = None else: config_data["source"][key] = value @@ -232,6 +241,10 @@ def _apply_env_overrides(config_data: dict) -> None: source["stream_keys"] = os.getenv("REDIS_STREAM_KEYS").split(",") if os.getenv("REDIS_CONTINUOUS"): source["continuous"] = os.getenv("REDIS_CONTINUOUS").lower() in ("true", "1", "yes") + if os.getenv("REDIS_CONSUMER_GROUP"): + source["consumer_group"] = os.getenv("REDIS_CONSUMER_GROUP") + if os.getenv("REDIS_CONSUMER_NAME"): + source["consumer_name"] = os.getenv("REDIS_CONSUMER_NAME") # Sink overrides sink = config_data.setdefault("sink", {}) diff 
--git a/src/fs_data_sink/pipeline.py b/src/fs_data_sink/pipeline.py index a2b4588..5b9fb4d 100644 --- a/src/fs_data_sink/pipeline.py +++ b/src/fs_data_sink/pipeline.py @@ -76,6 +76,8 @@ def _create_source(self) -> DataSource: value_format=source_config.value_format, continuous=source_config.continuous, redis_config=source_config.extra_config, + consumer_group=source_config.consumer_group, + consumer_name=source_config.consumer_name, ) raise ValueError(f"Unsupported source type: {source_config.type}") diff --git a/src/fs_data_sink/sources/redis_source.py b/src/fs_data_sink/sources/redis_source.py index c23ae42..4d6b081 100644 --- a/src/fs_data_sink/sources/redis_source.py +++ b/src/fs_data_sink/sources/redis_source.py @@ -33,6 +33,8 @@ def __init__( block_timeout: int = 1000, continuous: bool = True, redis_config: Optional[dict] = None, + consumer_group: Optional[str] = None, + consumer_name: Optional[str] = None, ): """ Initialize Redis source. @@ -48,6 +50,8 @@ def __init__( block_timeout: Timeout in milliseconds for blocking operations continuous: If True, continuously consume data in a loop; if False, read once and stop redis_config: Additional Redis configuration + consumer_group: Consumer group name (required for Redis Streams) + consumer_name: Consumer name within the group (defaults to hostname-id if not provided) """ self.host = host self.port = port @@ -59,8 +63,10 @@ def __init__( self.block_timeout = block_timeout self.continuous = continuous self.redis_config = redis_config or {} + self.consumer_group = consumer_group + self.consumer_name = consumer_name self.client: Optional[redis.Redis] = None - self.stream_ids: dict[str, str] = dict.fromkeys(self.stream_keys, "0") + self.stream_ids: dict[str, str] = dict.fromkeys(self.stream_keys, ">") def connect(self) -> None: """Establish connection to Redis.""" @@ -69,6 +75,10 @@ def connect(self) -> None: "Connecting to Redis: host=%s, port=%s, db=%s", self.host, self.port, self.db ) + # Validate that 
consumer_group is provided if using streams + if self.stream_keys and not self.consumer_group: + raise ValueError("consumer_group is required when using Redis Streams") + self.client = redis.Redis( host=self.host, port=self.port, @@ -82,6 +92,39 @@ def connect(self) -> None: self.client.ping() logger.info("Successfully connected to Redis") + # Set default consumer name if not provided + if self.stream_keys and not self.consumer_name: + import socket + + self.consumer_name = f"{socket.gethostname()}-{id(self)}" + logger.info("Using default consumer name: %s", self.consumer_name) + + # Create consumer groups for streams if they don't exist + if self.stream_keys and self.consumer_group: + for stream_key in self.stream_keys: + try: + # Try to create the consumer group + # Start from the beginning ('0') to consume all messages + # Use mkstream=True to create stream if it doesn't exist + self.client.xgroup_create( + name=stream_key, groupname=self.consumer_group, id="0", mkstream=True + ) + logger.info( + "Created consumer group '%s' for stream '%s'", + self.consumer_group, + stream_key, + ) + except redis.exceptions.ResponseError as e: + # Group already exists, which is fine + if "BUSYGROUP" in str(e): + logger.info( + "Consumer group '%s' already exists for stream '%s'", + self.consumer_group, + stream_key, + ) + else: + raise + def read_batch(self, batch_size: int = 1000) -> Iterator[Optional[pa.RecordBatch]]: """ Read data batches from Redis. 
@@ -134,23 +177,35 @@ def read_batch(self, batch_size: int = 1000) -> Iterator[Optional[pa.RecordBatch break def _read_from_streams(self, batch_size: int) -> list: - """Read messages from Redis streams.""" + """Read messages from Redis streams using consumer groups.""" messages = [] + message_ids_to_ack = [] # Track message IDs for acknowledgment try: - # Prepare stream IDs for XREAD - streams = {key: self.stream_ids[key] for key in self.stream_keys} - - # Read from all streams - results = self.client.xread(streams=streams, count=batch_size, block=self.block_timeout) + # Prepare stream IDs for XREADGROUP + # Use ">" to receive messages never delivered to other consumers + streams = dict.fromkeys(self.stream_keys, ">") + + # Read from all streams using consumer group + results = self.client.xreadgroup( + groupname=self.consumer_group, + consumername=self.consumer_name, + streams=streams, + count=batch_size, + block=self.block_timeout, + ) if results: for stream_key, stream_messages in results: + stream_key_str = ( + stream_key.decode() if isinstance(stream_key, bytes) else stream_key + ) + for msg_id, msg_data in stream_messages: - # Update last read ID - key = stream_key.decode() if isinstance(stream_key, bytes) else stream_key msg_id_str = msg_id.decode() if isinstance(msg_id, bytes) else msg_id - self.stream_ids[key] = msg_id_str + + # Track message ID and stream for acknowledgment + message_ids_to_ack.append((stream_key_str, msg_id_str)) # Extract message value if b"value" in msg_data: @@ -158,6 +213,22 @@ def _read_from_streams(self, batch_size: int) -> list: elif "value" in msg_data: messages.append(msg_data["value"]) + # Acknowledge all messages after successful reading + if message_ids_to_ack: + for stream_key, msg_id in message_ids_to_ack: + try: + self.client.xack(stream_key, self.consumer_group, msg_id) + logger.debug( + "Acknowledged message %s from stream %s", msg_id, stream_key + ) + except Exception as ack_error: + logger.error( + "Error acknowledging 
message %s from stream %s: %s", + msg_id, + stream_key, + ack_error, + ) + except Exception as e: logger.error("Error reading from Redis streams: %s", e, exc_info=True) diff --git a/tests/unit/test_nonblocking_flush.py b/tests/unit/test_nonblocking_flush.py index 34478ad..9a8530f 100644 --- a/tests/unit/test_nonblocking_flush.py +++ b/tests/unit/test_nonblocking_flush.py @@ -148,20 +148,28 @@ def test_continuous_redis_yields_none_when_no_data(): """Test that Redis source in continuous mode yields None when no data.""" from unittest.mock import MagicMock, patch + import redis + from fs_data_sink.sources import RedisSource mock_client = MagicMock() - mock_client.xread.side_effect = [ + mock_client.xreadgroup.side_effect = [ [(b"stream1", [(b"1-0", {b"value": b'{"id": 1}'})])], # First call returns data [], # Second call returns no data [], # Third call returns no data ] + # Mock xgroup_create to simulate group already exists + mock_client.xgroup_create.side_effect = redis.exceptions.ResponseError( + "BUSYGROUP Consumer Group name already exists" + ) with patch("fs_data_sink.sources.redis_source.redis.Redis", return_value=mock_client): source = RedisSource( host="localhost", stream_keys=["stream1"], continuous=True, + consumer_group="test-group", + consumer_name="test-consumer", ) source.connect() diff --git a/tests/unit/test_redis_source.py b/tests/unit/test_redis_source.py index 2ea9d0d..bc97799 100644 --- a/tests/unit/test_redis_source.py +++ b/tests/unit/test_redis_source.py @@ -5,6 +5,7 @@ import pyarrow as pa import pytest +import redis from fs_data_sink.sources import RedisSource @@ -15,13 +16,17 @@ def mock_redis_client(): with patch("fs_data_sink.sources.redis_source.redis.Redis") as mock: client = MagicMock() mock.return_value = client + # Mock xgroup_create to simulate group already exists + client.xgroup_create.side_effect = redis.exceptions.ResponseError( + "BUSYGROUP Consumer Group name already exists" + ) yield client def 
test_redis_source_continuous_mode_enabled(mock_redis_client): """Test that Redis source with continuous=True keeps yielding batches and None.""" # Setup mock to return messages on first two calls, then empty on third - mock_redis_client.xread.side_effect = [ + mock_redis_client.xreadgroup.side_effect = [ [(b"stream1", [(b"1-0", {b"value": json.dumps({"id": 1, "name": "test1"}).encode()})])], [(b"stream1", [(b"1-1", {b"value": json.dumps({"id": 2, "name": "test2"}).encode()})])], [], # Empty result - will yield None in continuous mode @@ -33,6 +38,8 @@ def test_redis_source_continuous_mode_enabled(mock_redis_client): port=6379, stream_keys=["stream1"], continuous=True, + consumer_group="test-group", + consumer_name="test-consumer", ) source.connect() @@ -53,15 +60,17 @@ def test_redis_source_continuous_mode_enabled(mock_redis_client): batch3 = next(batch_gen) assert batch3 is None - # Verify xread was called multiple times (continuous mode) - assert mock_redis_client.xread.call_count >= 3 + # Verify xreadgroup was called multiple times (continuous mode) + assert mock_redis_client.xreadgroup.call_count >= 3 + # Verify messages were acknowledged + assert mock_redis_client.xack.call_count >= 2 source.close() def test_redis_source_continuous_mode_disabled(mock_redis_client): """Test that Redis source with continuous=False reads once and stops.""" - mock_redis_client.xread.return_value = [ + mock_redis_client.xreadgroup.return_value = [ (b"stream1", [(b"1-0", {b"value": json.dumps({"id": 1, "name": "test1"}).encode()})]) ] @@ -70,6 +79,8 @@ def test_redis_source_continuous_mode_disabled(mock_redis_client): port=6379, stream_keys=["stream1"], continuous=False, + consumer_group="test-group", + consumer_name="test-consumer", ) source.connect() @@ -79,8 +90,10 @@ def test_redis_source_continuous_mode_disabled(mock_redis_client): assert len(batches) == 1 assert batches[0].num_rows == 1 - # Verify xread was called only once (non-continuous mode) - assert 
mock_redis_client.xread.call_count == 1 + # Verify xreadgroup was called only once (non-continuous mode) + assert mock_redis_client.xreadgroup.call_count == 1 + # Verify message was acknowledged + assert mock_redis_client.xack.call_count == 1 source.close() @@ -126,7 +139,7 @@ def test_redis_source_continuous_mode_with_lists(mock_redis_client): def test_redis_source_continuous_mode_default(mock_redis_client): """Test that continuous mode is True by default.""" - mock_redis_client.xread.side_effect = [ + mock_redis_client.xreadgroup.side_effect = [ [(b"stream1", [(b"1-0", {b"value": json.dumps({"id": 1}).encode()})])], [], # Will yield None ] @@ -135,6 +148,8 @@ def test_redis_source_continuous_mode_default(mock_redis_client): host="localhost", port=6379, stream_keys=["stream1"], + consumer_group="test-group", + consumer_name="test-consumer", ) # Should default to continuous=True @@ -157,7 +172,7 @@ def test_redis_source_continuous_mode_default(mock_redis_client): def test_redis_source_mixed_streams_and_lists(mock_redis_client): """Test reading from both streams and lists in continuous mode.""" - mock_redis_client.xread.return_value = [ + mock_redis_client.xreadgroup.return_value = [ (b"stream1", [(b"1-0", {b"value": json.dumps({"id": 1, "source": "stream"}).encode()})]) ] mock_redis_client.blpop.side_effect = [ @@ -171,6 +186,8 @@ def test_redis_source_mixed_streams_and_lists(mock_redis_client): stream_keys=["stream1"], list_keys=["list1"], continuous=False, # Use non-continuous for predictable test + consumer_group="test-group", + consumer_name="test-consumer", ) source.connect() @@ -192,7 +209,7 @@ def test_redis_source_arrow_ipc_format(mock_redis_client): writer.close() arrow_data = sink.getvalue().to_pybytes() - mock_redis_client.xread.side_effect = [ + mock_redis_client.xreadgroup.side_effect = [ [(b"stream1", [(b"1-0", {b"value": arrow_data})])], [], ] @@ -203,6 +220,8 @@ def test_redis_source_arrow_ipc_format(mock_redis_client): stream_keys=["stream1"], 
value_format="arrow_ipc", continuous=False, + consumer_group="test-group", + consumer_name="test-consumer", ) source.connect() @@ -213,3 +232,120 @@ def test_redis_source_arrow_ipc_format(mock_redis_client): assert batches[0].column_names == ["id", "name"] source.close() + + +def test_redis_source_requires_consumer_group_for_streams(mock_redis_client): + """Test that consumer_group is required when using Redis Streams.""" + source = RedisSource( + host="localhost", + port=6379, + stream_keys=["stream1"], + ) + + # Should raise ValueError when connecting without consumer_group + with pytest.raises(ValueError, match="consumer_group is required when using Redis Streams"): + source.connect() + + +def test_redis_source_consumer_group_creation(mock_redis_client): + """Test that consumer group is created if it doesn't exist.""" + # Reset mock to not have side effect for this test + mock_redis_client.xgroup_create.side_effect = None + mock_redis_client.xgroup_create.return_value = True + + source = RedisSource( + host="localhost", + port=6379, + stream_keys=["stream1", "stream2"], + consumer_group="test-group", + consumer_name="test-consumer", + ) + source.connect() + + # Verify xgroup_create was called for each stream + assert mock_redis_client.xgroup_create.call_count == 2 + mock_redis_client.xgroup_create.assert_any_call( + name="stream1", groupname="test-group", id="0", mkstream=True + ) + mock_redis_client.xgroup_create.assert_any_call( + name="stream2", groupname="test-group", id="0", mkstream=True + ) + + source.close() + + +def test_redis_source_default_consumer_name(mock_redis_client): + """Test that consumer_name defaults to hostname-id if not provided.""" + source = RedisSource( + host="localhost", + port=6379, + stream_keys=["stream1"], + consumer_group="test-group", + ) + source.connect() + + # Should have generated a default consumer_name + assert source.consumer_name is not None + assert "-" in source.consumer_name # Should contain hostname and id + + 
source.close() + + +def test_redis_source_lists_dont_require_consumer_group(mock_redis_client): + """Test that lists work without consumer_group.""" + mock_redis_client.blpop.side_effect = [ + ("list1", json.dumps({"id": 1, "name": "test"}).encode()), + None, # Return None to stop reading + ] + + source = RedisSource( + host="localhost", + port=6379, + list_keys=["list1"], + continuous=False, + ) + source.connect() + + batches = list(source.read_batch(batch_size=10)) + + assert len(batches) == 1 + assert batches[0].num_rows == 1 + + source.close() + + +def test_redis_source_xack_called_for_each_message(mock_redis_client): + """Test that XACK is called for each message read.""" + mock_redis_client.xreadgroup.return_value = [ + ( + b"stream1", + [ + (b"1-0", {b"value": json.dumps({"id": 1}).encode()}), + (b"1-1", {b"value": json.dumps({"id": 2}).encode()}), + (b"1-2", {b"value": json.dumps({"id": 3}).encode()}), + ], + ) + ] + + source = RedisSource( + host="localhost", + port=6379, + stream_keys=["stream1"], + continuous=False, + consumer_group="test-group", + consumer_name="test-consumer", + ) + source.connect() + + batches = list(source.read_batch(batch_size=10)) + + assert len(batches) == 1 + assert batches[0].num_rows == 3 + + # Verify XACK was called for each message + assert mock_redis_client.xack.call_count == 3 + mock_redis_client.xack.assert_any_call("stream1", "test-group", "1-0") + mock_redis_client.xack.assert_any_call("stream1", "test-group", "1-1") + mock_redis_client.xack.assert_any_call("stream1", "test-group", "1-2") + + source.close()