diff --git a/README.md b/README.md
index fb055f5..7870072 100644
--- a/README.md
+++ b/README.md
@@ -327,6 +327,73 @@ Environment variables:
 - `PIPELINE_FLUSH_INTERVAL_SECONDS`: Flush interval in seconds
 - `PIPELINE_FLUSH_INTERVAL_BATCHES`: Flush interval in batches
 
+### File Merging Configuration
+
+The sink can automatically merge small Parquet files into larger consolidated files based on a configured time period (hourly, daily, weekly, monthly). This reduces the number of small files and improves query performance.
+
+```yaml
+sink:
+  type: s3  # or hdfs, local
+  bucket: my-bucket
+  prefix: raw-data
+  compression: snappy
+  partition_by:
+    - date
+  # File merging configuration
+  merge_enabled: true    # Enable automatic file merging
+  merge_period: hour     # Time period for grouping files: 'hour', 'day', 'week', 'month'
+  merge_min_files: 2     # Minimum number of files required to trigger a merge
+  merge_on_flush: false  # Whether to merge files during flush operations
+```
+
+**Merge Behavior:**
+- Files are grouped by their timestamp (extracted from the filename) and the configured period
+- Only groups with at least `merge_min_files` files are merged
+- Merged files are named `merged_{period_key}.parquet` (e.g., `merged_20241113_06.parquet` for hourly)
+- Original files are deleted after a successful merge
+- Already merged files are skipped in subsequent merge operations
+
+**Merge Periods:**
+- **hour**: Merges files created within the same hour (e.g., all files from 2024-11-13 06:00-06:59)
+- **day**: Merges files created within the same day (e.g., all files from 2024-11-13)
+- **week**: Merges files created within the same ISO week (e.g., week 46 of 2024)
+- **month**: Merges files created within the same month (e.g., November 2024)
+
+**Merge Triggers:**
+- **Manual**: Call `sink.merge_files()` explicitly
+- **On flush**: Set `merge_on_flush: true` to merge after each flush operation
+- **Scheduled**: Run the merge as a separate scheduled job (e.g., cron)
+
+Example configurations:
+
+```yaml
+# Hourly merge (merge files from each hour)
+sink:
+  merge_enabled: true
+  merge_period: hour
+  merge_min_files: 5
+  merge_on_flush: false
+
+# Daily merge (merge files from each day)
+sink:
+  merge_enabled: true
+  merge_period: day
+  merge_min_files: 10
+  merge_on_flush: true
+```
+
+**Benefits:**
+- Reduces the number of small files (mitigates the "small file problem")
+- Improves query performance in analytics databases
+- Improves resource utilization in distributed systems (Spark, Presto, Hive)
+- Lowers metadata overhead
+
+Environment variables:
+- `SINK_MERGE_ENABLED`: Enable file merging (true/false)
+- `SINK_MERGE_PERIOD`: Merge period (hour/day/week/month)
+- `SINK_MERGE_MIN_FILES`: Minimum files to trigger merge
+- `SINK_MERGE_ON_FLUSH`: Merge on flush (true/false)
+
 ## Data Formats
 
 ### JSON Format
diff --git a/config/example-kafka-to-s3-with-merge.yaml b/config/example-kafka-to-s3-with-merge.yaml new file mode 100644 index 0000000..e27ac7c --- /dev/null +++ b/config/example-kafka-to-s3-with-merge.yaml @@ -0,0 +1,41 @@ +# Example configuration: Kafka to S3 with automatic file merging +source: + type: kafka + bootstrap_servers: + - localhost:9092 + topics: + - data-topic + group_id: fs-data-sink-group + value_format: json # or arrow_ipc + batch_size: 1000 + +sink: + type: s3 + bucket: my-data-bucket + prefix: raw-data + region_name: us-east-1 + compression: snappy # snappy, gzip, brotli, zstd, none + partition_by: + - date + - hour + # File merging configuration + merge_enabled: true # Enable automatic file
merging + merge_period: hour # Merge files by hour (hour, day, week, month) + merge_min_files: 5 # Require at least 5 files to trigger merge + merge_on_flush: false # Don't merge on every flush (merge separately) + +telemetry: + log_level: INFO + log_format: json # json or text + enabled: true + service_name: fs-data-sink + otlp_endpoint: http://localhost:4317 + trace_enabled: true + metrics_enabled: true + +pipeline: + max_batches: null # null for unlimited + batch_timeout_seconds: 30 + error_handling: log # log, raise, or ignore + flush_interval_seconds: 300 # Flush every 5 minutes + flush_interval_batches: 100 # Or flush after 100 batches diff --git a/src/fs_data_sink/config/settings.py b/src/fs_data_sink/config/settings.py index 39fd9d8..fa82d5c 100644 --- a/src/fs_data_sink/config/settings.py +++ b/src/fs_data_sink/config/settings.py @@ -62,6 +62,12 @@ class SinkConfig: partition_by: Optional[list[str]] = None extra_config: dict = field(default_factory=dict) + # Merge configuration + merge_enabled: bool = False + merge_period: str = "hour" # 'hour', 'day', 'week', 'month' + merge_min_files: int = 2 # Minimum files to trigger merge + merge_on_flush: bool = False # Merge during flush operations + @dataclass class TelemetryConfig: @@ -144,6 +150,12 @@ def _load_ini_file(config_path: str) -> dict: # Handle list values if key in ("partition_by",): config_data["sink"][key] = [v.strip() for v in value.split(",") if v.strip()] + # Handle integer values + elif key in ("merge_min_files",): + config_data["sink"][key] = config.getint("sink", key) + # Handle boolean values + elif key in ("merge_enabled", "merge_on_flush"): + config_data["sink"][key] = config.getboolean("sink", key) else: config_data["sink"][key] = value @@ -272,6 +284,16 @@ def _apply_env_overrides(config_data: dict) -> None: if os.getenv("HDFS_USER"): sink["user"] = os.getenv("HDFS_USER") + # Merge configuration + if os.getenv("SINK_MERGE_ENABLED"): + sink["merge_enabled"] = os.getenv("SINK_MERGE_ENABLED").lower() in ("true", "1", "yes") + if os.getenv("SINK_MERGE_PERIOD"): + sink["merge_period"] = os.getenv("SINK_MERGE_PERIOD") + if os.getenv("SINK_MERGE_MIN_FILES"): + sink["merge_min_files"] = int(os.getenv("SINK_MERGE_MIN_FILES")) + if os.getenv("SINK_MERGE_ON_FLUSH"): + sink["merge_on_flush"] = os.getenv("SINK_MERGE_ON_FLUSH").lower() in ("true", "1", "yes") + # Telemetry overrides telemetry = config_data.setdefault("telemetry", {}) diff --git a/src/fs_data_sink/pipeline.py b/src/fs_data_sink/pipeline.py index 5b9fb4d..3abead8 100644 --- a/src/fs_data_sink/pipeline.py +++ b/src/fs_data_sink/pipeline.py @@ -100,6 +100,10 @@ def _create_sink(self) -> DataSink: compression=sink_config.compression, partition_by=sink_config.partition_by, s3_config=sink_config.extra_config, + merge_enabled=sink_config.merge_enabled, + merge_period=sink_config.merge_period, + merge_min_files=sink_config.merge_min_files, + merge_on_flush=sink_config.merge_on_flush, ) if sink_config.type == "hdfs": @@ -113,6 +117,10 @@ def _create_sink(self) -> DataSink: compression=sink_config.compression, partition_by=sink_config.partition_by, hdfs_config=sink_config.extra_config, + merge_enabled=sink_config.merge_enabled, + merge_period=sink_config.merge_period, + merge_min_files=sink_config.merge_min_files, + merge_on_flush=sink_config.merge_on_flush, ) if sink_config.type == "local": @@ -123,6 +131,10 @@ def _create_sink(self) -> DataSink: base_path=sink_config.base_path, compression=sink_config.compression, partition_by=sink_config.partition_by, + 
merge_enabled=sink_config.merge_enabled, + merge_period=sink_config.merge_period, + merge_min_files=sink_config.merge_min_files, + merge_on_flush=sink_config.merge_on_flush, ) raise ValueError(f"Unsupported sink type: {sink_config.type}") diff --git a/src/fs_data_sink/sinks/hdfs_sink.py b/src/fs_data_sink/sinks/hdfs_sink.py index 432165f..3c34410 100644 --- a/src/fs_data_sink/sinks/hdfs_sink.py +++ b/src/fs_data_sink/sinks/hdfs_sink.py @@ -32,6 +32,10 @@ def __init__( compression: str = "snappy", partition_by: Optional[list[str]] = None, hdfs_config: Optional[dict] = None, + merge_enabled: bool = False, + merge_period: str = "hour", + merge_min_files: int = 2, + merge_on_flush: bool = False, ): """ Initialize HDFS sink. @@ -43,6 +47,10 @@ def __init__( compression: Compression codec for Parquet ('snappy', 'gzip', 'brotli', 'zstd', 'none') partition_by: List of column names to partition by hdfs_config: Additional HDFS client configuration + merge_enabled: Enable automatic file merging + merge_period: Period for merging files ('hour', 'day', 'week', 'month') + merge_min_files: Minimum number of files to trigger a merge + merge_on_flush: Merge files during flush operations """ self.url = url self.base_path = base_path.rstrip("/") @@ -50,6 +58,10 @@ def __init__( self.compression = compression self.partition_by = partition_by or [] self.hdfs_config = hdfs_config or {} + self.merge_enabled = merge_enabled + self.merge_period = merge_period + self.merge_min_files = merge_min_files + self.merge_on_flush = merge_on_flush self.client = None self.file_counter = 0 self.buffered_batches: list[pa.RecordBatch] = [] @@ -173,10 +185,242 @@ def flush(self) -> None: # Clear the buffer self.buffered_batches.clear() + # Optionally merge files after flush + if self.merge_enabled and self.merge_on_flush: + logger.info("Merging files after flush") + self.merge_files() + except Exception as e: logger.error("Error flushing batches to HDFS: %s", e, exc_info=True) raise + def merge_files(self, period: Optional[str] = None) -> int: + """ + Merge small Parquet files into larger consolidated files by time period. + + This method reads existing Parquet files from HDFS, groups them by time period based on + their filenames, and merges files in each group into a single consolidated file. + Dictionary encoding is explicitly disabled for merged files to avoid schema compatibility + issues when reading files that may have been written with different dictionary encodings. + + Args: + period: Time period for grouping files ('hour', 'day', 'week', 'month') + If None, uses the sink's configured merge_period + + Returns: + Number of files merged + """ + if not self.merge_enabled: + logger.debug("File merging is disabled") + return 0 + + if not self.client: + raise RuntimeError("Not connected. 
Call connect() first.") + + merge_period = period or self.merge_period + logger.info("Starting HDFS file merge with period: %s", merge_period) + + with tracer.start_as_current_span("hdfs_merge") as span: + span.set_attribute("merge.period", merge_period) + total_merged = 0 + + try: + # Find all directories to process + dirs_to_process = [self.base_path] + + # If partitioned, include partition directories + if self.partition_by: + for item in self.client.list(self.base_path, status=False): + item_path = f"{self.base_path}/{item}" + if self.client.status(item_path)["type"] == "DIRECTORY": + dirs_to_process.append(item_path) + + for directory in dirs_to_process: + try: + # Group files by time period + file_groups = self._group_hdfs_files_by_period(directory, merge_period) + + for period_key, files in file_groups.items(): + if len(files) < self.merge_min_files: + logger.debug( + "Skipping merge for period %s: only %d files (min: %d)", + period_key, + len(files), + self.merge_min_files, + ) + continue + + # Merge files in this period + merged = self._merge_hdfs_file_group(directory, period_key, files) + total_merged += merged + + except Exception as e: + logger.error("Error processing directory %s: %s", directory, e) + continue + + span.set_attribute("merge.files_merged", total_merged) + logger.info("HDFS merge completed: %d files merged", total_merged) + return total_merged + + except Exception as e: + logger.error("Error during HDFS file merge: %s", e, exc_info=True) + raise + + def _group_hdfs_files_by_period(self, directory: str, period: str) -> dict[str, list[str]]: + """Group HDFS Parquet files by time period based on filename timestamp.""" + from collections import defaultdict + + groups = defaultdict(list) + + try: + # List all files in the directory + files = self.client.list(directory, status=False) + + for filename in files: + if not filename.endswith(".parquet") or not filename.startswith("data_"): + continue + + file_path = f"{directory}/{filename}" + + # Extract period from filename + period_key = self._extract_period_from_filename(filename, period) + if period_key: + groups[period_key].append(file_path) + + except Exception as e: + logger.error("Error listing HDFS directory %s: %s", directory, e) + + return groups + + def _extract_period_from_filename(self, filename: str, period: str) -> Optional[str]: + """Extract time period key from filename.""" + try: + # Filename format: data_YYYYMMDD_HHMMSS_*.parquet + parts = filename.split("_") + if len(parts) < 3 or not filename.startswith("data_"): + return None + + date_str = parts[1] # YYYYMMDD + time_str = parts[2] # HHMMSS + + year = date_str[0:4] + month = date_str[4:6] + day = date_str[6:8] + hour = time_str[0:2] + + if period == "hour": + return f"{year}{month}{day}_{hour}" + elif period == "day": + return f"{year}{month}{day}" + elif period == "week": + from datetime import datetime + + dt = datetime(int(year), int(month), int(day)) + week = dt.isocalendar()[1] + return f"{year}W{week:02d}" + elif period == "month": + return f"{year}{month}" + else: + return f"{year}{month}{day}" + + except (ValueError, IndexError) as e: + logger.warning("Could not parse timestamp from filename %s: %s", filename, e) + return None + + def _merge_hdfs_file_group(self, directory: str, period_key: str, files: list[str]) -> int: + """Merge a group of HDFS files into a single consolidated file.""" + try: + logger.info( + "Merging %d HDFS files for period %s in %s", len(files), period_key, directory + ) + + # Read all files (avoid dictionary encoding 
for easier merging) + tables = [] + for file_path in files: + try: + with self.client.read(file_path) as reader: + data = reader.read() + buffer = BytesIO(data) + parquet_file = pq.ParquetFile(buffer) + table = parquet_file.read(use_pandas_metadata=False) + + # Convert any dictionary-encoded columns to regular columns + columns = [] + for i, field in enumerate(table.schema): + column = table.column(i) + if pa.types.is_dictionary(field.type): + columns.append(column.dictionary_decode()) + else: + columns.append(column) + + # Rebuild table without dictionary encoding + if columns: + schema = pa.schema( + [ + pa.field( + field.name, + ( + field.type.value_type + if pa.types.is_dictionary(field.type) + else field.type + ), + ) + for field in table.schema + ] + ) + table = pa.Table.from_arrays(columns, schema=schema) + + tables.append(table) + except Exception as e: + logger.error("Failed to read HDFS file %s: %s", file_path, e) + continue + + if not tables: + logger.warning("No tables to merge for period %s", period_key) + return 0 + + # Concatenate all tables + merged_table = pa.concat_tables(tables) + + # Generate merged filename + merged_filename = f"merged_{period_key}.parquet" + merged_path = f"{directory}/{merged_filename}" + + # Write merged file (without forcing dictionary encoding) + buffer = BytesIO() + pq.write_table( + merged_table, + buffer, + compression=self.compression, + use_dictionary=False, # Don't use dictionary encoding for merged files + version="2.6", + ) + + buffer.seek(0) + with self.client.write(merged_path, overwrite=False) as writer: + writer.write(buffer.getvalue()) + + logger.info( + "Created merged HDFS file: %s (%d rows, %d bytes)", + merged_path, + merged_table.num_rows, + len(buffer.getvalue()), + ) + + # Delete original files + for file_path in files: + try: + self.client.delete(file_path) + logger.debug("Deleted original HDFS file: %s", file_path) + except Exception as e: + logger.error("Failed to delete HDFS file %s: %s", file_path, e) + + return len(files) + + except Exception as e: + logger.error("Error merging HDFS files for period %s: %s", period_key, e, exc_info=True) + return 0 + def close(self) -> None: """Close the HDFS connection.""" with tracer.start_as_current_span("hdfs_close"): diff --git a/src/fs_data_sink/sinks/local_sink.py b/src/fs_data_sink/sinks/local_sink.py index ae16daa..2991e46 100644 --- a/src/fs_data_sink/sinks/local_sink.py +++ b/src/fs_data_sink/sinks/local_sink.py @@ -1,6 +1,7 @@ """Local filesystem data sink implementation.""" import logging +import os from datetime import datetime from pathlib import Path from typing import Optional @@ -28,6 +29,10 @@ def __init__( base_path: str, compression: str = "snappy", partition_by: Optional[list[str]] = None, + merge_enabled: bool = False, + merge_period: str = "hour", + merge_min_files: int = 2, + merge_on_flush: bool = False, ): """ Initialize Local sink. 
@@ -36,10 +41,18 @@ def __init__( base_path: Base directory path for writing files compression: Compression codec for Parquet ('snappy', 'gzip', 'brotli', 'zstd', 'none') partition_by: List of column names to partition by + merge_enabled: Enable automatic file merging + merge_period: Period for merging files ('hour', 'day', 'week', 'month') + merge_min_files: Minimum number of files to trigger a merge + merge_on_flush: Merge files during flush operations """ self.base_path = Path(base_path).resolve() self.compression = compression self.partition_by = partition_by or [] + self.merge_enabled = merge_enabled + self.merge_period = merge_period + self.merge_min_files = merge_min_files + self.merge_on_flush = merge_on_flush self.file_counter = 0 self.buffered_batches: list[pa.RecordBatch] = [] @@ -136,10 +149,221 @@ def flush(self) -> None: # Clear the buffer self.buffered_batches.clear() + # Optionally merge files after flush + if self.merge_enabled and self.merge_on_flush: + logger.info("Merging files after flush") + self.merge_files() + except Exception as e: logger.error("Error flushing batches to local filesystem: %s", e, exc_info=True) raise + def merge_files(self, period: Optional[str] = None) -> int: + """ + Merge small Parquet files into larger consolidated files by time period. + + This method reads existing Parquet files, groups them by time period based on their + filenames, and merges files in each group into a single consolidated file. Dictionary + encoding is explicitly disabled for merged files to avoid schema compatibility issues + when reading files that may have been written with different dictionary encodings. + + Args: + period: Time period for grouping files ('hour', 'day', 'week', 'month') + If None, uses the sink's configured merge_period + + Returns: + Number of files merged + """ + if not self.merge_enabled: + logger.debug("File merging is disabled") + return 0 + + merge_period = period or self.merge_period + logger.info("Starting file merge with period: %s", merge_period) + + with tracer.start_as_current_span("local_merge") as span: + span.set_attribute("merge.period", merge_period) + total_merged = 0 + + try: + # Find all directories to process (including partitioned directories) + dirs_to_process = [self.base_path] + + # If partitioned, include partition directories + if self.partition_by: + # Use os.walk for Python 3.9+ compatibility + for root, dirs, _ in os.walk(self.base_path): + root_path = Path(root) + for d in dirs: + dirs_to_process.append(root_path / d) + + for directory in dirs_to_process: + if not directory.exists(): + continue + + # Group files by time period + file_groups = self._group_files_by_period(directory, merge_period) + + for period_key, files in file_groups.items(): + if len(files) < self.merge_min_files: + logger.debug( + "Skipping merge for period %s: only %d files (min: %d)", + period_key, + len(files), + self.merge_min_files, + ) + continue + + # Merge files in this period + merged = self._merge_file_group(directory, period_key, files) + total_merged += merged + + span.set_attribute("merge.files_merged", total_merged) + logger.info("Merge completed: %d files merged", total_merged) + return total_merged + + except Exception as e: + logger.error("Error during file merge: %s", e, exc_info=True) + raise + + def _group_files_by_period(self, directory: Path, period: str) -> dict[str, list[Path]]: + """Group Parquet files by time period based on filename timestamp.""" + from collections import defaultdict + + groups = defaultdict(list) + + # Find 
all parquet files in the directory + parquet_files = list(directory.glob("data_*.parquet")) + + for file_path in parquet_files: + try: + # Extract timestamp from filename (format: data_YYYYMMDD_HHMMSS_*.parquet) + filename = file_path.name + parts = filename.split("_") + if len(parts) < 3: + continue + + date_str = parts[1] # YYYYMMDD + time_str = parts[2] # HHMMSS + + # Parse timestamp + year = date_str[0:4] + month = date_str[4:6] + day = date_str[6:8] + hour = time_str[0:2] + + # Group by period + if period == "hour": + group_key = f"{year}{month}{day}_{hour}" + elif period == "day": + group_key = f"{year}{month}{day}" + elif period == "week": + # Calculate week number + from datetime import datetime + + dt = datetime(int(year), int(month), int(day)) + week = dt.isocalendar()[1] + group_key = f"{year}W{week:02d}" + elif period == "month": + group_key = f"{year}{month}" + else: + logger.warning("Unknown merge period: %s, using 'day'", period) + group_key = f"{year}{month}{day}" + + groups[group_key].append(file_path) + + except (ValueError, IndexError) as e: + logger.warning("Could not parse timestamp from file %s: %s", file_path, e) + continue + + return groups + + def _merge_file_group(self, directory: Path, period_key: str, files: list[Path]) -> int: + """Merge a group of files into a single consolidated file.""" + try: + logger.info("Merging %d files for period %s", len(files), period_key) + + # Read all files (avoid dictionary encoding for easier merging) + tables = [] + for file_path in files: + try: + # Read table without dictionary encoding to avoid schema conflicts + parquet_file = pq.ParquetFile(file_path) + table = parquet_file.read(use_pandas_metadata=False) + + # Convert any dictionary-encoded columns to regular columns + columns = [] + for i, field in enumerate(table.schema): + column = table.column(i) + if pa.types.is_dictionary(field.type): + columns.append(column.dictionary_decode()) + else: + columns.append(column) + + # Rebuild table without dictionary encoding + if columns: + schema = pa.schema( + [ + pa.field( + field.name, + ( + field.type.value_type + if pa.types.is_dictionary(field.type) + else field.type + ), + ) + for field in table.schema + ] + ) + table = pa.Table.from_arrays(columns, schema=schema) + + tables.append(table) + except Exception as e: + logger.error("Failed to read file %s: %s", file_path, e) + continue + + if not tables: + logger.warning("No tables to merge for period %s", period_key) + return 0 + + # Concatenate all tables + merged_table = pa.concat_tables(tables) + + # Generate merged filename + merged_filename = f"merged_{period_key}.parquet" + merged_path = directory / merged_filename + + # Write merged file (without forcing dictionary encoding) + pq.write_table( + merged_table, + merged_path, + compression=self.compression, + use_dictionary=False, # Don't use dictionary encoding for merged files + version="2.6", + ) + + merged_size = merged_path.stat().st_size + logger.info( + "Created merged file: %s (%d rows, %d bytes)", + merged_path, + merged_table.num_rows, + merged_size, + ) + + # Delete original files + for file_path in files: + try: + file_path.unlink() + logger.debug("Deleted original file: %s", file_path) + except Exception as e: + logger.error("Failed to delete file %s: %s", file_path, e) + + return len(files) + + except Exception as e: + logger.error("Error merging files for period %s: %s", period_key, e, exc_info=True) + return 0 + def close(self) -> None: """Close the connection.""" logger.info("Closing local sink connection") 
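For reference, the HDFS and local sinks above (and the S3 sink below) share the same filename-to-period-key convention when grouping files for a merge. The standalone sketch below restates that convention so it can be reviewed in isolation; the `period_key` helper is an illustrative name, not part of the library, and the `data_YYYYMMDD_HHMMSS_*.parquet` filename format is taken from the docstrings in this diff.

```python
# Illustrative sketch of the period-key convention used by _extract_period_from_filename
# and _group_files_by_period in this patch; not a drop-in replacement for either.
from datetime import datetime
from typing import Optional


def period_key(filename: str, period: str = "hour") -> Optional[str]:
    parts = filename.split("_")
    if not filename.startswith("data_") or len(parts) < 3:
        return None
    date_str, time_str = parts[1], parts[2]  # YYYYMMDD, HHMMSS
    year, month, day, hour = date_str[:4], date_str[4:6], date_str[6:8], time_str[:2]
    if period == "hour":
        return f"{year}{month}{day}_{hour}"
    if period == "day":
        return f"{year}{month}{day}"
    if period == "week":
        week = datetime(int(year), int(month), int(day)).isocalendar()[1]
        return f"{year}W{week:02d}"
    if period == "month":
        return f"{year}{month}"
    return f"{year}{month}{day}"  # unknown periods fall back to daily grouping


# Both files land in the same hourly bucket, so they would be consolidated into
# merged_20241113_06.parquet once merge_min_files is reached.
assert period_key("data_20241113_061205_000001.parquet") == "20241113_06"
assert period_key("data_20241113_064459_000002.parquet", "hour") == "20241113_06"
```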
diff --git a/src/fs_data_sink/sinks/s3_sink.py b/src/fs_data_sink/sinks/s3_sink.py index 2d595f3..b34dee1 100644 --- a/src/fs_data_sink/sinks/s3_sink.py +++ b/src/fs_data_sink/sinks/s3_sink.py @@ -38,6 +38,10 @@ def __init__( partition_by: Optional[list[str]] = None, s3_config: Optional[dict] = None, secure: bool = True, + merge_enabled: bool = False, + merge_period: str = "hour", + merge_min_files: int = 2, + merge_on_flush: bool = False, ): """ Initialize S3 sink with MinIO client. @@ -53,6 +57,10 @@ def __init__( partition_by: List of column names to partition by s3_config: Additional MinIO client configuration secure: Use HTTPS for connections (default: True) + merge_enabled: Enable automatic file merging + merge_period: Period for merging files ('hour', 'day', 'week', 'month') + merge_min_files: Minimum number of files to trigger a merge + merge_on_flush: Merge files during flush operations """ self.bucket = bucket self.prefix = prefix.rstrip("/") @@ -64,6 +72,10 @@ def __init__( self.partition_by = partition_by or [] self.s3_config = s3_config or {} self.secure = secure + self.merge_enabled = merge_enabled + self.merge_period = merge_period + self.merge_min_files = merge_min_files + self.merge_on_flush = merge_on_flush self.s3_client = None self.file_counter = 0 self.buffered_batches: list[pa.RecordBatch] = [] @@ -211,6 +223,11 @@ def flush(self) -> None: # Clear the buffer self.buffered_batches.clear() + # Optionally merge files after flush + if self.merge_enabled and self.merge_on_flush: + logger.info("Merging files after flush") + self.merge_files() + except S3Error as e: logger.error("S3 error flushing batches: %s", e, exc_info=True) raise @@ -218,6 +235,240 @@ def flush(self) -> None: logger.error("Unexpected error flushing batches to S3: %s", e, exc_info=True) raise + def merge_files(self, period: Optional[str] = None) -> int: + """ + Merge small Parquet files into larger consolidated files by time period. + + This method reads existing Parquet files from S3, groups them by time period based on + their filenames, and merges files in each group into a single consolidated file. + Dictionary encoding is explicitly disabled for merged files to avoid schema compatibility + issues when reading files that may have been written with different dictionary encodings. + + Args: + period: Time period for grouping files ('hour', 'day', 'week', 'month') + If None, uses the sink's configured merge_period + + Returns: + Number of files merged + """ + if not self.merge_enabled: + logger.debug("File merging is disabled") + return 0 + + if not self.s3_client: + raise RuntimeError("Not connected. 
Call connect() first.") + + merge_period = period or self.merge_period + logger.info("Starting S3 file merge with period: %s", merge_period) + + with tracer.start_as_current_span("s3_merge") as span: + span.set_attribute("merge.period", merge_period) + total_merged = 0 + + try: + # List all objects with the prefix + objects = self.s3_client.list_objects( + bucket_name=self.bucket, prefix=self.prefix, recursive=True + ) + + # Group objects by directory and time period + from collections import defaultdict + + dir_groups = defaultdict(lambda: defaultdict(list)) + + for obj in objects: + if not obj.object_name.endswith(".parquet"): + continue + if obj.object_name.startswith( + f"{self.prefix}/merged_" if self.prefix else "merged_" + ): + # Skip already merged files + continue + + # Get directory path + obj_dir = "/".join(obj.object_name.split("/")[:-1]) + filename = obj.object_name.split("/")[-1] + + # Extract period from filename + period_key = self._extract_period_from_filename(filename, merge_period) + if period_key: + dir_groups[obj_dir][period_key].append(obj) + + # Merge files in each directory/period group + for obj_dir, period_groups in dir_groups.items(): + for period_key, objects_list in period_groups.items(): + if len(objects_list) < self.merge_min_files: + logger.debug( + "Skipping merge for %s/%s: only %d files (min: %d)", + obj_dir, + period_key, + len(objects_list), + self.merge_min_files, + ) + continue + + # Merge files in this period + merged = self._merge_s3_file_group(obj_dir, period_key, objects_list) + total_merged += merged + + span.set_attribute("merge.files_merged", total_merged) + logger.info("S3 merge completed: %d files merged", total_merged) + return total_merged + + except S3Error as e: + logger.error("S3 error during merge: %s", e, exc_info=True) + raise + except Exception as e: + logger.error("Error during S3 file merge: %s", e, exc_info=True) + raise + + def _extract_period_from_filename(self, filename: str, period: str) -> Optional[str]: + """Extract time period key from filename.""" + try: + # Filename format: data_YYYYMMDD_HHMMSS_*.parquet + parts = filename.split("_") + if len(parts) < 3 or not filename.startswith("data_"): + return None + + date_str = parts[1] # YYYYMMDD + time_str = parts[2] # HHMMSS + + year = date_str[0:4] + month = date_str[4:6] + day = date_str[6:8] + hour = time_str[0:2] + + if period == "hour": + return f"{year}{month}{day}_{hour}" + elif period == "day": + return f"{year}{month}{day}" + elif period == "week": + from datetime import datetime + + dt = datetime(int(year), int(month), int(day)) + week = dt.isocalendar()[1] + return f"{year}W{week:02d}" + elif period == "month": + return f"{year}{month}" + else: + return f"{year}{month}{day}" + + except (ValueError, IndexError) as e: + logger.warning("Could not parse timestamp from filename %s: %s", filename, e) + return None + + def _merge_s3_file_group(self, obj_dir: str, period_key: str, objects_list: list) -> int: + """Merge a group of S3 objects into a single consolidated file.""" + try: + logger.info( + "Merging %d S3 objects for period %s in %s", len(objects_list), period_key, obj_dir + ) + + # Download and read all objects (avoid dictionary encoding for easier merging) + tables = [] + for obj in objects_list: + try: + # Download object to memory + response = self.s3_client.get_object( + bucket_name=self.bucket, object_name=obj.object_name + ) + data = response.read() + response.close() + response.release_conn() + + # Read as parquet + buffer = BytesIO(data) + parquet_file = 
pq.ParquetFile(buffer) + table = parquet_file.read(use_pandas_metadata=False) + + # Convert any dictionary-encoded columns to regular columns + columns = [] + for i, field in enumerate(table.schema): + column = table.column(i) + if pa.types.is_dictionary(field.type): + columns.append(column.dictionary_decode()) + else: + columns.append(column) + + # Rebuild table without dictionary encoding + if columns: + schema = pa.schema( + [ + pa.field( + field.name, + ( + field.type.value_type + if pa.types.is_dictionary(field.type) + else field.type + ), + ) + for field in table.schema + ] + ) + table = pa.Table.from_arrays(columns, schema=schema) + + tables.append(table) + + except Exception as e: + logger.error("Failed to read S3 object %s: %s", obj.object_name, e) + continue + + if not tables: + logger.warning("No tables to merge for period %s", period_key) + return 0 + + # Concatenate all tables + merged_table = pa.concat_tables(tables) + + # Generate merged object name + merged_name = f"merged_{period_key}.parquet" + merged_key = f"{obj_dir}/{merged_name}" if obj_dir else merged_name + + # Write merged file to buffer (without forcing dictionary encoding) + buffer = BytesIO() + pq.write_table( + merged_table, + buffer, + compression=self.compression, + use_dictionary=False, # Don't use dictionary encoding for merged files + version="2.6", + ) + + # Upload merged file + buffer.seek(0) + data_length = len(buffer.getvalue()) + self.s3_client.put_object( + bucket_name=self.bucket, + object_name=merged_key, + data=buffer, + length=data_length, + content_type="application/octet-stream", + ) + + logger.info( + "Created merged S3 object: s3://%s/%s (%d rows, %d bytes)", + self.bucket, + merged_key, + merged_table.num_rows, + data_length, + ) + + # Delete original objects + for obj in objects_list: + try: + self.s3_client.remove_object( + bucket_name=self.bucket, object_name=obj.object_name + ) + logger.debug("Deleted original S3 object: %s", obj.object_name) + except Exception as e: + logger.error("Failed to delete S3 object %s: %s", obj.object_name, e) + + return len(objects_list) + + except Exception as e: + logger.error("Error merging S3 files for period %s: %s", period_key, e, exc_info=True) + return 0 + def close(self) -> None: """Close the S3 connection.""" with tracer.start_as_current_span("s3_close"): diff --git a/src/fs_data_sink/types.py b/src/fs_data_sink/types.py index e2f1485..270ca7d 100644 --- a/src/fs_data_sink/types.py +++ b/src/fs_data_sink/types.py @@ -53,6 +53,19 @@ def write_batch( def flush(self) -> None: """Flush any buffered data to the sink.""" + @abstractmethod + def merge_files(self, period: Optional[str] = None) -> int: + """ + Merge small Parquet files into larger consolidated files. 
+ + Args: + period: Time period for grouping files ('hour', 'day', 'week', 'month') + If None, uses the sink's configured merge_period + + Returns: + Number of files merged + """ + @abstractmethod def close(self) -> None: """Close the connection to the data sink.""" diff --git a/tests/unit/test_file_merge.py b/tests/unit/test_file_merge.py new file mode 100644 index 0000000..6149b90 --- /dev/null +++ b/tests/unit/test_file_merge.py @@ -0,0 +1,341 @@ +"""Tests for file merging functionality.""" + +import tempfile +import time +from pathlib import Path +from unittest.mock import MagicMock, patch + +import pyarrow as pa +import pyarrow.parquet as pq + +from fs_data_sink.sinks import LocalSink, S3Sink + + +def test_local_sink_merge_disabled_by_default(): + """Test that merge is disabled by default.""" + with tempfile.TemporaryDirectory() as tmpdir: + sink = LocalSink(base_path=tmpdir) + sink.connect() + + # Write and flush some batches + for i in range(3): + batch = pa.table({"id": [i], "value": [f"test_{i}"]}).to_batches()[0] + sink.write_batch(batch) + + sink.flush() + + # Call merge - should return 0 (disabled) + merged_count = sink.merge_files() + assert merged_count == 0 + + # All original files should still exist + files = list(Path(tmpdir).glob("data_*.parquet")) + assert len(files) == 1 + + sink.close() + + +def test_local_sink_merge_by_hour(): + """Test merging files by hour.""" + with tempfile.TemporaryDirectory() as tmpdir: + sink = LocalSink( + base_path=tmpdir, + merge_enabled=True, + merge_period="hour", + merge_min_files=2, + ) + sink.connect() + + # Write multiple batches (will create separate files) + for i in range(3): + batch = pa.table({"id": [i], "value": [f"test_{i}"]}).to_batches()[0] + sink.write_batch(batch) + sink.flush() + time.sleep(0.01) # Small delay to ensure different timestamps + + # Should have 3 separate files + files_before = list(Path(tmpdir).glob("data_*.parquet")) + assert len(files_before) == 3 + + # Merge files + merged_count = sink.merge_files() + + # Should have merged all 3 files + assert merged_count == 3 + + # Should now have 1 merged file + merged_files = list(Path(tmpdir).glob("merged_*.parquet")) + assert len(merged_files) == 1 + + # Original files should be deleted + original_files = list(Path(tmpdir).glob("data_*.parquet")) + assert len(original_files) == 0 + + # Verify merged file contains all data + table = pq.read_table(merged_files[0]) + assert table.num_rows == 3 + + sink.close() + + +def test_local_sink_merge_by_day(): + """Test merging files by day.""" + with tempfile.TemporaryDirectory() as tmpdir: + sink = LocalSink( + base_path=tmpdir, + merge_enabled=True, + merge_period="day", + merge_min_files=2, + ) + sink.connect() + + # Write multiple batches + for i in range(3): + batch = pa.table({"id": [i], "value": [f"test_{i}"]}).to_batches()[0] + sink.write_batch(batch) + sink.flush() + + # Merge files + merged_count = sink.merge_files() + assert merged_count == 3 + + # Should have 1 merged file + merged_files = list(Path(tmpdir).glob("merged_*.parquet")) + assert len(merged_files) == 1 + + sink.close() + + +def test_local_sink_merge_min_files(): + """Test that merge respects minimum file count.""" + with tempfile.TemporaryDirectory() as tmpdir: + sink = LocalSink( + base_path=tmpdir, + merge_enabled=True, + merge_period="hour", + merge_min_files=5, # Require at least 5 files + ) + sink.connect() + + # Write only 3 batches + for i in range(3): + batch = pa.table({"id": [i], "value": [f"test_{i}"]}).to_batches()[0] + 
sink.write_batch(batch) + sink.flush() + + # Merge should not happen (only 3 files, need 5) + merged_count = sink.merge_files() + assert merged_count == 0 + + # Original files should still exist + original_files = list(Path(tmpdir).glob("data_*.parquet")) + assert len(original_files) == 3 + + # No merged files + merged_files = list(Path(tmpdir).glob("merged_*.parquet")) + assert len(merged_files) == 0 + + sink.close() + + +def test_local_sink_merge_on_flush(): + """Test automatic merge during flush.""" + with tempfile.TemporaryDirectory() as tmpdir: + sink = LocalSink( + base_path=tmpdir, + merge_enabled=True, + merge_period="hour", + merge_min_files=2, + merge_on_flush=True, + ) + sink.connect() + + # Write and flush batches multiple times to create separate files + for i in range(3): + batch = pa.table({"id": [i], "value": [f"test_{i}"]}).to_batches()[0] + sink.write_batch(batch) + sink.flush() # Each flush creates a separate file + + # After last flush with merge_on_flush=True, files should be merged + # Files should be merged + merged_files = list(Path(tmpdir).glob("merged_*.parquet")) + assert len(merged_files) == 1 + + sink.close() + + +def test_local_sink_merge_with_partitioning(): + """Test merging with partitioned data.""" + with tempfile.TemporaryDirectory() as tmpdir: + sink = LocalSink( + base_path=tmpdir, + partition_by=["date"], + merge_enabled=True, + merge_period="hour", + merge_min_files=2, + ) + sink.connect() + + schema = pa.schema([("id", pa.int64()), ("date", pa.string()), ("value", pa.string())]) + + # Write batches to same partition + for i in range(3): + batch = pa.table( + {"id": [i], "date": ["2024-01-01"], "value": [f"test_{i}"]}, schema=schema + ).to_batches()[0] + sink.write_batch(batch) + sink.flush() + + # Merge files + merged_count = sink.merge_files() + assert merged_count == 3 + + # Check merged file exists in partition directory + partition_dir = Path(tmpdir) / "date=2024-01-01" + assert partition_dir.exists() + + merged_files = list(partition_dir.glob("merged_*.parquet")) + assert len(merged_files) == 1 + + # Verify merged data (use ParquetFile to read single file, not as dataset) + pf = pq.ParquetFile(merged_files[0]) + table = pf.read() + assert table.num_rows == 3 + + sink.close() + + +def test_s3_sink_merge_disabled_by_default(): + """Test that S3 merge is disabled by default.""" + mock_s3_client = MagicMock() + + with patch("fs_data_sink.sinks.s3_sink.Minio", return_value=mock_s3_client): + sink = S3Sink( + bucket="test-bucket", + aws_access_key_id="test", + aws_secret_access_key="test", + ) + sink.connect() + + # Call merge - should return 0 (disabled) + merged_count = sink.merge_files() + assert merged_count == 0 + + sink.close() + + +def test_s3_sink_merge_enabled(): + """Test S3 merge with mock client.""" + mock_s3_client = MagicMock() + + # Mock list_objects to return some files + mock_obj1 = MagicMock() + mock_obj1.object_name = "data_20241113_100000_000001.parquet" + mock_obj2 = MagicMock() + mock_obj2.object_name = "data_20241113_100100_000002.parquet" + + mock_s3_client.list_objects.return_value = [mock_obj1, mock_obj2] + + # Mock get_object to return parquet data + def mock_get_object(bucket_name, object_name): + # Create a simple parquet file in memory + table = pa.table({"id": [1], "value": ["test"]}) + from io import BytesIO + + buffer = BytesIO() + pq.write_table(table, buffer) + buffer.seek(0) + + mock_response = MagicMock() + mock_response.read.return_value = buffer.getvalue() + return mock_response + + 
mock_s3_client.get_object.side_effect = mock_get_object + + with patch("fs_data_sink.sinks.s3_sink.Minio", return_value=mock_s3_client): + sink = S3Sink( + bucket="test-bucket", + aws_access_key_id="test", + aws_secret_access_key="test", + merge_enabled=True, + merge_period="hour", + merge_min_files=2, + ) + sink.connect() + + # Call merge + merged_count = sink.merge_files() + + # Should have merged 2 files + assert merged_count == 2 + + # Verify put_object was called for merged file + assert mock_s3_client.put_object.call_count == 1 + + # Verify remove_object was called for original files + assert mock_s3_client.remove_object.call_count == 2 + + sink.close() + + +def test_local_sink_merge_skips_already_merged_files(): + """Test that merge skips already merged files.""" + with tempfile.TemporaryDirectory() as tmpdir: + sink = LocalSink( + base_path=tmpdir, + merge_enabled=True, + merge_period="hour", + merge_min_files=2, + ) + sink.connect() + + # Write and merge files + for i in range(3): + batch = pa.table({"id": [i], "value": [f"test_{i}"]}).to_batches()[0] + sink.write_batch(batch) + sink.flush() + + # First merge + merged_count = sink.merge_files() + assert merged_count == 3 + + # Try to merge again - should find no files to merge + merged_count2 = sink.merge_files() + assert merged_count2 == 0 + + # Still only 1 merged file + merged_files = list(Path(tmpdir).glob("merged_*.parquet")) + assert len(merged_files) == 1 + + sink.close() + + +def test_merge_config_loading(): + """Test that merge configuration is loaded correctly.""" + from fs_data_sink.config.settings import SinkConfig + + config = SinkConfig( + type="local", + base_path="/tmp/test", + merge_enabled=True, + merge_period="day", + merge_min_files=5, + merge_on_flush=True, + ) + + assert config.merge_enabled is True + assert config.merge_period == "day" + assert config.merge_min_files == 5 + assert config.merge_on_flush is True + + +def test_merge_config_defaults(): + """Test default merge configuration values.""" + from fs_data_sink.config.settings import SinkConfig + + config = SinkConfig(type="local", base_path="/tmp/test") + + assert config.merge_enabled is False + assert config.merge_period == "hour" + assert config.merge_min_files == 2 + assert config.merge_on_flush is False diff --git a/tests/unit/test_types.py b/tests/unit/test_types.py index 4f1dfd3..aac49af 100644 --- a/tests/unit/test_types.py +++ b/tests/unit/test_types.py @@ -49,6 +49,9 @@ def write_batch(self, batch, partition_cols=None): def flush(self): pass + def merge_files(self, period=None): + return 0 + def close(self): self.connected = False
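The README above lists a scheduled job as one way to trigger merging, but the patch itself ships no such example. A minimal sketch of a cron-style merge job is shown below; it uses only the API introduced in this diff (`LocalSink`, `connect()`, `merge_files()`, `close()`), while the base path and settings are illustrative.

```python
# merge_job.py -- illustrative standalone merge job (run from cron or another scheduler).
# Paths and settings are examples; only the API introduced in this patch is used.
from fs_data_sink.sinks import LocalSink

sink = LocalSink(
    base_path="/data/raw",   # example path: the directory the pipeline writes to
    compression="snappy",
    partition_by=["date"],
    merge_enabled=True,      # merge_files() is a no-op unless merging is enabled
    merge_period="day",      # consolidate files into one file per day
    merge_min_files=10,      # leave groups with fewer than 10 files untouched
)
sink.connect()
try:
    merged = sink.merge_files()  # returns the number of files that were merged
    print(f"Merged {merged} files")
finally:
    sink.close()
```

Scheduling this with something like `0 1 * * * python merge_job.py` keeps merges out of the hot write path, which is also why `merge_on_flush` defaults to `false`.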