13 changes: 13 additions & 0 deletions README.md
@@ -175,18 +175,31 @@ source:
value_format: json # or arrow_ipc
batch_size: 1000
continuous: true # Continuously consume data (default: true)
consumer_group: my-consumer-group # Required for Redis Streams
# consumer_name: my-consumer  # Optional; defaults to {hostname}-{id} if not provided
```

The Redis source supports two consumption modes:
- **Continuous mode** (default): Continuously polls Redis for new data, similar to a Kafka consumer
- **One-shot mode**: Reads the available data once and stops (see the sketch below)
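
For example, a one-shot drain of a stream could be configured like this (a minimal sketch; the keys mirror the example above, and the stream name is illustrative):

```yaml
source:
  type: redis
  stream_keys:
    - sensor-data            # illustrative stream name
  consumer_group: my-consumer-group
  batch_size: 1000
  continuous: false          # read whatever is available, then stop
```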

**Redis Streams Consumer Groups:**
When using Redis Streams (`stream_keys`), a `consumer_group` is required. The source uses Redis consumer groups to:
- Track message consumption across multiple consumers
- Enable parallel processing with multiple pipeline instances
- Automatically acknowledge messages with XACK after they are successfully read
- Provide at-least-once delivery semantics

If `consumer_name` is not provided, it defaults to `{hostname}-{id}` to ensure uniqueness. The sketch below shows how these pieces map onto raw Redis commands.
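
Here is a minimal redis-py sketch of the same flow, outside the pipeline (the stream and group names are illustrative, and the `value` field matches the format the source expects):

```python
import socket

import redis

r = redis.Redis(host="localhost", port=6379)
stream, group = "sensor-data", "my-consumer-group"
consumer = f"{socket.gethostname()}-demo"  # the source itself defaults to {hostname}-{id}

# Create the group at the start of the stream; mkstream=True creates the
# stream if it doesn't exist yet. BUSYGROUP just means it already exists.
try:
    r.xgroup_create(name=stream, groupname=group, id="0", mkstream=True)
except redis.exceptions.ResponseError as e:
    if "BUSYGROUP" not in str(e):
        raise

# ">" asks for messages never delivered to any consumer in this group.
results = r.xreadgroup(
    groupname=group, consumername=consumer, streams={stream: ">"}, count=10, block=1000
)
for stream_key, entries in results or []:
    for msg_id, fields in entries:
        print(msg_id, fields.get(b"value"))
        r.xack(stream_key, group, msg_id)  # acknowledge once handled
```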

Environment variables:
- `REDIS_HOST`: Redis host
- `REDIS_PORT`: Redis port
- `REDIS_PASSWORD`: Redis password
- `REDIS_STREAM_KEYS`: Comma-separated list of stream keys
- `REDIS_CONTINUOUS`: Enable continuous consumption (true/false)
- `REDIS_CONSUMER_GROUP`: Consumer group name (required for streams)
- `REDIS_CONSUMER_NAME`: Consumer name within the group
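
Putting it together, a streams source can be configured entirely from the environment; a minimal sketch (values are illustrative):

```bash
export SOURCE_TYPE=redis
export REDIS_HOST=localhost
export REDIS_PORT=6379
export REDIS_STREAM_KEYS=sensor-data,user-events
export REDIS_CONSUMER_GROUP=my-consumer-group
# REDIS_CONSUMER_NAME is optional; it defaults to {hostname}-{id}
```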

### Sink Configuration

5 changes: 5 additions & 0 deletions USAGE.md
@@ -56,6 +56,7 @@ export SOURCE_TYPE=redis
export REDIS_HOST=localhost
export REDIS_PORT=6379
export REDIS_STREAM_KEYS=sensor-data,user-events
export REDIS_CONSUMER_GROUP=fs-data-sink-group # Required for Redis Streams

export SINK_TYPE=hdfs
export HDFS_URL=http://namenode:9870
@@ -153,6 +154,8 @@ source:
- queue:tasks
value_format: json # or arrow_ipc
batch_size: 1000
consumer_group: fs-data-sink-group # Required for Redis Streams
# consumer_name: my-consumer  # Optional; defaults to {hostname}-{id} if not provided
extra_config:
# Any additional Redis client configuration
socket_timeout: 5
@@ -232,6 +235,8 @@ All configuration can be overridden with environment variables:
- `REDIS_PORT`: Redis port
- `REDIS_PASSWORD`: Redis password
- `REDIS_STREAM_KEYS`: comma-separated
- `REDIS_CONSUMER_GROUP`: consumer group (required for streams)
- `REDIS_CONSUMER_NAME`: consumer name (optional)

#### Sink
- `SINK_TYPE`: s3 or hdfs
1 change: 1 addition & 0 deletions config/example-redis-to-hdfs.ini
@@ -9,6 +9,7 @@ stream_keys = data-stream-1,data-stream-2
value_format = json
batch_size = 1000
continuous = true
consumer_group = fs-data-sink-group

[sink]
type = hdfs
2 changes: 2 additions & 0 deletions config/example-redis-to-hdfs.yaml
@@ -13,6 +13,8 @@ source:
value_format: json # or arrow_ipc
batch_size: 1000
continuous: true # Continuously consume data (default: true)
consumer_group: fs-data-sink-group # Required for Redis Streams
# consumer_name: my-consumer  # Optional; defaults to {hostname}-{id} if not provided

sink:
type: hdfs
1 change: 1 addition & 0 deletions config/example-redis-to-local.ini
@@ -9,6 +9,7 @@ stream_keys = data-stream-1,data-stream-2
value_format = json
batch_size = 1000
continuous = true
consumer_group = fs-data-sink-group

[sink]
type = local
13 changes: 13 additions & 0 deletions src/fs_data_sink/config/settings.py
@@ -27,6 +27,8 @@ class SourceConfig:
stream_keys: Optional[list[str]] = None
list_keys: Optional[list[str]] = None
continuous: bool = True
consumer_group: Optional[str] = None
consumer_name: Optional[str] = None

# Common
value_format: str = "json"
@@ -126,6 +128,13 @@ def _load_ini_file(config_path: str) -> dict:
# Handle boolean values
elif key in ("continuous",):
config_data["source"][key] = config.getboolean("source", key)
# Handle string values that could be None/null
elif key in ("consumer_group", "consumer_name") and value.lower() in (
"null",
"none",
"",
):
config_data["source"][key] = None
else:
config_data["source"][key] = value

@@ -232,6 +241,10 @@ def _apply_env_overrides(config_data: dict) -> None:
source["stream_keys"] = os.getenv("REDIS_STREAM_KEYS").split(",")
if os.getenv("REDIS_CONTINUOUS"):
source["continuous"] = os.getenv("REDIS_CONTINUOUS").lower() in ("true", "1", "yes")
if os.getenv("REDIS_CONSUMER_GROUP"):
source["consumer_group"] = os.getenv("REDIS_CONSUMER_GROUP")
if os.getenv("REDIS_CONSUMER_NAME"):
source["consumer_name"] = os.getenv("REDIS_CONSUMER_NAME")

# Sink overrides
sink = config_data.setdefault("sink", {})
2 changes: 2 additions & 0 deletions src/fs_data_sink/pipeline.py
@@ -76,6 +76,8 @@ def _create_source(self) -> DataSource:
value_format=source_config.value_format,
continuous=source_config.continuous,
redis_config=source_config.extra_config,
consumer_group=source_config.consumer_group,
consumer_name=source_config.consumer_name,
)

raise ValueError(f"Unsupported source type: {source_config.type}")
91 changes: 81 additions & 10 deletions src/fs_data_sink/sources/redis_source.py
@@ -33,6 +33,8 @@ def __init__(
block_timeout: int = 1000,
continuous: bool = True,
redis_config: Optional[dict] = None,
consumer_group: Optional[str] = None,
consumer_name: Optional[str] = None,
):
"""
Initialize Redis source.
@@ -48,6 +50,8 @@ def __init__(
block_timeout: Timeout in milliseconds for blocking operations
continuous: If True, continuously consume data in a loop; if False, read once and stop
redis_config: Additional Redis configuration
consumer_group: Consumer group name (required for Redis Streams)
consumer_name: Consumer name within the group (defaults to {hostname}-{id} if not provided)
"""
self.host = host
self.port = port
@@ -59,8 +63,10 @@ def __init__(
self.block_timeout = block_timeout
self.continuous = continuous
self.redis_config = redis_config or {}
self.consumer_group = consumer_group
self.consumer_name = consumer_name
self.client: Optional[redis.Redis] = None
self.stream_ids: dict[str, str] = dict.fromkeys(self.stream_keys, "0")
self.stream_ids: dict[str, str] = dict.fromkeys(self.stream_keys, ">")

def connect(self) -> None:
"""Establish connection to Redis."""
@@ -69,6 +75,10 @@ def connect(self) -> None:
"Connecting to Redis: host=%s, port=%s, db=%s", self.host, self.port, self.db
)

# Validate that consumer_group is provided if using streams
if self.stream_keys and not self.consumer_group:
raise ValueError("consumer_group is required when using Redis Streams")

self.client = redis.Redis(
host=self.host,
port=self.port,
@@ -82,6 +92,39 @@ def connect(self) -> None:
self.client.ping()
logger.info("Successfully connected to Redis")

# Set default consumer name if not provided
if self.stream_keys and not self.consumer_name:
import socket

self.consumer_name = f"{socket.gethostname()}-{id(self)}"
logger.info("Using default consumer name: %s", self.consumer_name)

# Create consumer groups for streams if they don't exist
if self.stream_keys and self.consumer_group:
for stream_key in self.stream_keys:
try:
# Try to create the consumer group
# Start from the beginning ('0') to consume all messages
# Use mkstream=True to create stream if it doesn't exist
self.client.xgroup_create(
name=stream_key, groupname=self.consumer_group, id="0", mkstream=True
)
logger.info(
"Created consumer group '%s' for stream '%s'",
self.consumer_group,
stream_key,
)
except redis.exceptions.ResponseError as e:
# Group already exists, which is fine
if "BUSYGROUP" in str(e):
logger.info(
"Consumer group '%s' already exists for stream '%s'",
self.consumer_group,
stream_key,
)
else:
raise

def read_batch(self, batch_size: int = 1000) -> Iterator[Optional[pa.RecordBatch]]:
"""
Read data batches from Redis.
@@ -134,30 +177,58 @@ def read_batch(self, batch_size: int = 1000) -> Iterator[Optional[pa.RecordBatch
break

def _read_from_streams(self, batch_size: int) -> list:
"""Read messages from Redis streams."""
"""Read messages from Redis streams using consumer groups."""
messages = []
message_ids_to_ack = [] # Track message IDs for acknowledgment

try:
# Prepare stream IDs for XREAD
streams = {key: self.stream_ids[key] for key in self.stream_keys}

# Read from all streams
results = self.client.xread(streams=streams, count=batch_size, block=self.block_timeout)
# Prepare stream IDs for XREADGROUP
# Use ">" to receive messages never delivered to other consumers
streams = dict.fromkeys(self.stream_keys, ">")

# Read from all streams using consumer group
results = self.client.xreadgroup(
groupname=self.consumer_group,
consumername=self.consumer_name,
streams=streams,
count=batch_size,
block=self.block_timeout,
)

if results:
for stream_key, stream_messages in results:
stream_key_str = (
stream_key.decode() if isinstance(stream_key, bytes) else stream_key
)

for msg_id, msg_data in stream_messages:
# Update last read ID
key = stream_key.decode() if isinstance(stream_key, bytes) else stream_key
msg_id_str = msg_id.decode() if isinstance(msg_id, bytes) else msg_id
self.stream_ids[key] = msg_id_str

# Track message ID and stream for acknowledgment
message_ids_to_ack.append((stream_key_str, msg_id_str))

# Extract message value
if b"value" in msg_data:
messages.append(msg_data[b"value"])
elif "value" in msg_data:
messages.append(msg_data["value"])

# Acknowledge all messages after successful reading
if message_ids_to_ack:
for stream_key, msg_id in message_ids_to_ack:
try:
self.client.xack(stream_key, self.consumer_group, msg_id)
logger.debug(
"Acknowledged message %s from stream %s", msg_id, stream_key
)
except Exception as ack_error:
logger.error(
"Error acknowledging message %s from stream %s: %s",
msg_id,
stream_key,
ack_error,
)

except Exception as e:
logger.error("Error reading from Redis streams: %s", e, exc_info=True)

10 changes: 9 additions & 1 deletion tests/unit/test_nonblocking_flush.py
@@ -148,20 +148,28 @@ def test_continuous_redis_yields_none_when_no_data():
"""Test that Redis source in continuous mode yields None when no data."""
from unittest.mock import MagicMock, patch

import redis

from fs_data_sink.sources import RedisSource

mock_client = MagicMock()
mock_client.xread.side_effect = [
mock_client.xreadgroup.side_effect = [
[(b"stream1", [(b"1-0", {b"value": b'{"id": 1}'})])], # First call returns data
[], # Second call returns no data
[], # Third call returns no data
]
# Mock xgroup_create to simulate group already exists
mock_client.xgroup_create.side_effect = redis.exceptions.ResponseError(
"BUSYGROUP Consumer Group name already exists"
)

with patch("fs_data_sink.sources.redis_source.redis.Redis", return_value=mock_client):
source = RedisSource(
host="localhost",
stream_keys=["stream1"],
continuous=True,
consumer_group="test-group",
consumer_name="test-consumer",
)
source.connect()
