67 changes: 67 additions & 0 deletions README.md
@@ -327,6 +327,73 @@ Environment variables:
- `PIPELINE_FLUSH_INTERVAL_SECONDS`: Flush interval in seconds
- `PIPELINE_FLUSH_INTERVAL_BATCHES`: Flush interval in batches

### File Merging Configuration

The sink can automatically merge small Parquet files into larger consolidated files based on a configured time period (hourly, daily, weekly, or monthly). This reduces the number of small files on storage and improves downstream query performance.

```yaml
sink:
type: s3 # or hdfs, local
bucket: my-bucket
prefix: raw-data
compression: snappy
partition_by:
- date
# File merging configuration
merge_enabled: true # Enable automatic file merging
merge_period: hour # Time period for grouping files: 'hour', 'day', 'week', 'month'
merge_min_files: 2 # Minimum number of files required to trigger a merge
merge_on_flush: false # Whether to merge files during flush operations
```

**Merge Behavior:**
- Files are grouped by their timestamp (extracted from the filename) and the configured period (see the sketch after the period list below)
- Only groups with at least `merge_min_files` files will be merged
- Merged files are named with the pattern `merged_{period_key}.parquet` (e.g., `merged_20241113_06.parquet` for hourly)
- Original files are deleted after successful merge
- Already merged files are skipped in subsequent merge operations

**Merge Periods:**
- **hour**: Merges files created within the same hour (e.g., all files from 2024-11-13 06:00-06:59)
- **day**: Merges files created within the same day (e.g., all files from 2024-11-13)
- **week**: Merges files created within the same ISO week (e.g., week 46 of 2024)
- **month**: Merges files created within the same month (e.g., November 2024)
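
The grouping can be pictured with a small sketch. This is an illustration only, not the sink's actual implementation: the `part_YYYYMMDD_HHMMSS.parquet` filename pattern, the `period_key` helper, and the week/month key formats are assumptions for the example.

```python
from datetime import datetime

def period_key(filename: str, period: str) -> str:
    """Derive a merge-group key from a timestamped filename (hypothetical helper)."""
    # Assumes filenames like 'part_20241113_063015.parquet' (YYYYMMDD_HHMMSS).
    stamp = filename.rsplit(".", 1)[0].split("_", 1)[1]
    ts = datetime.strptime(stamp, "%Y%m%d_%H%M%S")
    if period == "hour":
        return ts.strftime("%Y%m%d_%H")   # -> merged_20241113_06.parquet
    if period == "day":
        return ts.strftime("%Y%m%d")      # -> merged_20241113.parquet
    if period == "week":
        year, week, _ = ts.isocalendar()  # ISO week, matching the 'week' period
        return f"{year}_W{week:02d}"      # key format assumed for illustration
    if period == "month":
        return ts.strftime("%Y%m")        # key format assumed for illustration
    raise ValueError(f"Unknown merge period: {period}")

# Files that share a key form one merge group and become merged_{period_key}.parquet.
```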

**Merge Triggers:**
- **Manual**: Call `sink.merge_files()` explicitly (see the job sketch after this list)
- **On flush**: Set `merge_on_flush: true` to merge after each flush operation
- **Scheduled**: Run merge as a separate scheduled job/cron
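
For the manual and scheduled triggers, a merge job can be as small as the sketch below. Only `merge_files()` is named in this change; the `S3Sink` import path and constructor arguments are assumptions based on how the pipeline wires the sink, so adjust them to the actual API.

```python
# Hypothetical standalone merge job, e.g. run hourly from cron.
from fs_data_sink.sinks import S3Sink  # assumed module/class name

sink = S3Sink(
    bucket="my-data-bucket",
    prefix="raw-data",
    compression="snappy",
    partition_by=["date"],
    merge_enabled=True,
    merge_period="hour",
    merge_min_files=5,
    merge_on_flush=False,
)

sink.merge_files()  # consolidates any hour with at least merge_min_files files
```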

Example configurations:

```yaml
# Hourly merge (merge files from each hour)
sink:
merge_enabled: true
merge_period: hour
merge_min_files: 5
merge_on_flush: false

---
# Daily merge (merge files from each day)
sink:
merge_enabled: true
merge_period: day
merge_min_files: 10
merge_on_flush: true
```

**Benefits:**
- Reduces the number of small files (mitigating the "small file problem")
- Improves query performance in analytics databases
- Better resource utilization in distributed systems (Spark, Presto, Hive)
- Reduced metadata overhead

Environment variables:
- `SINK_MERGE_ENABLED`: Enable file merging (true/false)
- `SINK_MERGE_PERIOD`: Merge period (hour/day/week/month)
- `SINK_MERGE_MIN_FILES`: Minimum files to trigger merge
- `SINK_MERGE_ON_FLUSH`: Merge on flush (true/false)

## Data Formats

### JSON Format
41 changes: 41 additions & 0 deletions config/example-kafka-to-s3-with-merge.yaml
@@ -0,0 +1,41 @@
# Example configuration: Kafka to S3 with automatic file merging
source:
type: kafka
bootstrap_servers:
- localhost:9092
topics:
- data-topic
group_id: fs-data-sink-group
value_format: json # or arrow_ipc
batch_size: 1000

sink:
type: s3
bucket: my-data-bucket
prefix: raw-data
region_name: us-east-1
compression: snappy # snappy, gzip, brotli, zstd, none
partition_by:
- date
- hour
# File merging configuration
merge_enabled: true # Enable automatic file merging
merge_period: hour # Merge files by hour (hour, day, week, month)
merge_min_files: 5 # Require at least 5 files to trigger merge
merge_on_flush: false # Don't merge on every flush (merge separately)

telemetry:
log_level: INFO
log_format: json # json or text
enabled: true
service_name: fs-data-sink
otlp_endpoint: http://localhost:4317
trace_enabled: true
metrics_enabled: true

pipeline:
max_batches: null # null for unlimited
batch_timeout_seconds: 30
error_handling: log # log, raise, or ignore
flush_interval_seconds: 300 # Flush every 5 minutes
flush_interval_batches: 100 # Or flush after 100 batches
22 changes: 22 additions & 0 deletions src/fs_data_sink/config/settings.py
@@ -62,6 +62,12 @@ class SinkConfig:
partition_by: Optional[list[str]] = None
extra_config: dict = field(default_factory=dict)

# Merge configuration
merge_enabled: bool = False
merge_period: str = "hour" # 'hour', 'day', 'week', 'month'
merge_min_files: int = 2 # Minimum files to trigger merge
merge_on_flush: bool = False # Merge during flush operations


@dataclass
class TelemetryConfig:
@@ -144,6 +150,12 @@ def _load_ini_file(config_path: str) -> dict:
# Handle list values
if key in ("partition_by",):
config_data["sink"][key] = [v.strip() for v in value.split(",") if v.strip()]
# Handle integer values
elif key in ("merge_min_files",):
config_data["sink"][key] = config.getint("sink", key)
# Handle boolean values
elif key in ("merge_enabled", "merge_on_flush"):
config_data["sink"][key] = config.getboolean("sink", key)
else:
config_data["sink"][key] = value

@@ -272,6 +284,16 @@ def _apply_env_overrides(config_data: dict) -> None:
if os.getenv("HDFS_USER"):
sink["user"] = os.getenv("HDFS_USER")

# Merge configuration
if os.getenv("SINK_MERGE_ENABLED"):
sink["merge_enabled"] = os.getenv("SINK_MERGE_ENABLED").lower() in ("true", "1", "yes")
if os.getenv("SINK_MERGE_PERIOD"):
sink["merge_period"] = os.getenv("SINK_MERGE_PERIOD")
if os.getenv("SINK_MERGE_MIN_FILES"):
sink["merge_min_files"] = int(os.getenv("SINK_MERGE_MIN_FILES"))
if os.getenv("SINK_MERGE_ON_FLUSH"):
sink["merge_on_flush"] = os.getenv("SINK_MERGE_ON_FLUSH").lower() in ("true", "1", "yes")

# Telemetry overrides
telemetry = config_data.setdefault("telemetry", {})

12 changes: 12 additions & 0 deletions src/fs_data_sink/pipeline.py
@@ -100,6 +100,10 @@ def _create_sink(self) -> DataSink:
compression=sink_config.compression,
partition_by=sink_config.partition_by,
s3_config=sink_config.extra_config,
merge_enabled=sink_config.merge_enabled,
merge_period=sink_config.merge_period,
merge_min_files=sink_config.merge_min_files,
merge_on_flush=sink_config.merge_on_flush,
)

if sink_config.type == "hdfs":
@@ -113,6 +117,10 @@ def _create_sink(self) -> DataSink:
compression=sink_config.compression,
partition_by=sink_config.partition_by,
hdfs_config=sink_config.extra_config,
merge_enabled=sink_config.merge_enabled,
merge_period=sink_config.merge_period,
merge_min_files=sink_config.merge_min_files,
merge_on_flush=sink_config.merge_on_flush,
)

if sink_config.type == "local":
@@ -123,6 +131,10 @@ def _create_sink(self) -> DataSink:
base_path=sink_config.base_path,
compression=sink_config.compression,
partition_by=sink_config.partition_by,
merge_enabled=sink_config.merge_enabled,
merge_period=sink_config.merge_period,
merge_min_files=sink_config.merge_min_files,
merge_on_flush=sink_config.merge_on_flush,
)

raise ValueError(f"Unsupported sink type: {sink_config.type}")