From 513ea1ff6a5dee0a5ba9ba015e53d649f690697c Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Thu, 13 Nov 2025 06:26:30 +0000 Subject: [PATCH 1/5] Initial plan From 8d9d3364d8281abff92bdeff966a234bdeecfcaa Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Thu, 13 Nov 2025 06:42:43 +0000 Subject: [PATCH 2/5] Implement periodic Parquet file merging feature - Add merge configuration to SinkConfig (merge_enabled, merge_period, merge_min_files, merge_on_flush) - Add merge_files() method to DataSink interface - Implement merge_files() in LocalSink, S3Sink, and HDFSSink - Handle dictionary encoding issues when merging Parquet files - Add comprehensive tests for merge functionality - Update documentation with merge configuration examples - Add environment variable support for merge settings Co-authored-by: slhmy <31381093+slhmy@users.noreply.github.com> --- README.md | 67 ++++ config/example-kafka-to-s3-with-merge.yaml | 41 +++ src/fs_data_sink/config/settings.py | 22 ++ src/fs_data_sink/pipeline.py | 12 + src/fs_data_sink/sinks/hdfs_sink.py | 234 ++++++++++++++ src/fs_data_sink/sinks/local_sink.py | 213 +++++++++++++ src/fs_data_sink/sinks/s3_sink.py | 234 ++++++++++++++ src/fs_data_sink/types.py | 13 + tests/unit/test_file_merge.py | 341 +++++++++++++++++++++ tests/unit/test_types.py | 3 + 10 files changed, 1180 insertions(+) create mode 100644 config/example-kafka-to-s3-with-merge.yaml create mode 100644 tests/unit/test_file_merge.py diff --git a/README.md b/README.md index fb055f5..7870072 100644 --- a/README.md +++ b/README.md @@ -327,6 +327,73 @@ Environment variables: - `PIPELINE_FLUSH_INTERVAL_SECONDS`: Flush interval in seconds - `PIPELINE_FLUSH_INTERVAL_BATCHES`: Flush interval in batches +### File Merging Configuration + +The sink can automatically merge small Parquet files into larger consolidated files based on configured time periods (hourly, daily, weekly, monthly). This reduces the number of small files and improves query performance. 
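Merges can also be triggered programmatically through the `merge_files()` method added to the sink interface in this change. A minimal sketch of a standalone merge job follows; the path is a placeholder, while the class, method, and parameter names are the ones introduced in this patch series:

```python
# Hypothetical standalone merge job built on the sink API added in this change.
from fs_data_sink.sinks import LocalSink

sink = LocalSink(
    base_path="/data/raw",    # placeholder directory containing data_*.parquet files
    compression="snappy",
    merge_enabled=True,       # merge_files() returns 0 and does nothing when disabled
    merge_period="hour",      # 'hour', 'day', 'week', or 'month'
    merge_min_files=5,        # groups with fewer files are left untouched
)
sink.connect()
try:
    merged = sink.merge_files()   # returns the number of original files merged
    print(f"Merged {merged} files")
finally:
    sink.close()
```

The same options are exposed through the sink configuration: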
+ +```yaml +sink: + type: s3 # or hdfs, local + bucket: my-bucket + prefix: raw-data + compression: snappy + partition_by: + - date + # File merging configuration + merge_enabled: true # Enable automatic file merging + merge_period: hour # Time period for grouping files: 'hour', 'day', 'week', 'month' + merge_min_files: 2 # Minimum number of files required to trigger a merge + merge_on_flush: false # Whether to merge files during flush operations +``` + +**Merge Behavior:** +- Files are grouped by their timestamp (extracted from filename) and configured period +- Only groups with at least `merge_min_files` files will be merged +- Merged files are named with the pattern `merged_{period_key}.parquet` (e.g., `merged_20241113_06.parquet` for hourly) +- Original files are deleted after successful merge +- Already merged files are skipped in subsequent merge operations + +**Merge Periods:** +- **hour**: Merges files created within the same hour (e.g., all files from 2024-11-13 06:00-06:59) +- **day**: Merges files created within the same day (e.g., all files from 2024-11-13) +- **week**: Merges files created within the same ISO week (e.g., week 46 of 2024) +- **month**: Merges files created within the same month (e.g., November 2024) + +**Merge Triggers:** +- **Manual**: Call `sink.merge_files()` explicitly +- **On flush**: Set `merge_on_flush: true` to merge after each flush operation +- **Scheduled**: Run merge as a separate scheduled job/cron + +Example configurations: + +```yaml +# Hourly merge (merge files from each hour) +sink: + merge_enabled: true + merge_period: hour + merge_min_files: 5 + merge_on_flush: false + +# Daily merge (merge files from each day) +sink: + merge_enabled: true + merge_period: day + merge_min_files: 10 + merge_on_flush: true +``` + +**Benefits:** +- Reduces the number of small files (Small File Problem) +- Improves query performance in analytics databases +- Better resource utilization in distributed systems (Spark, Presto, Hive) +- Reduced metadata overhead + +Environment variables: +- `SINK_MERGE_ENABLED`: Enable file merging (true/false) +- `SINK_MERGE_PERIOD`: Merge period (hour/day/week/month) +- `SINK_MERGE_MIN_FILES`: Minimum files to trigger merge +- `SINK_MERGE_ON_FLUSH`: Merge on flush (true/false) + ## Data Formats ### JSON Format diff --git a/config/example-kafka-to-s3-with-merge.yaml b/config/example-kafka-to-s3-with-merge.yaml new file mode 100644 index 0000000..e27ac7c --- /dev/null +++ b/config/example-kafka-to-s3-with-merge.yaml @@ -0,0 +1,41 @@ +# Example configuration: Kafka to S3 with automatic file merging +source: + type: kafka + bootstrap_servers: + - localhost:9092 + topics: + - data-topic + group_id: fs-data-sink-group + value_format: json # or arrow_ipc + batch_size: 1000 + +sink: + type: s3 + bucket: my-data-bucket + prefix: raw-data + region_name: us-east-1 + compression: snappy # snappy, gzip, brotli, zstd, none + partition_by: + - date + - hour + # File merging configuration + merge_enabled: true # Enable automatic file merging + merge_period: hour # Merge files by hour (hour, day, week, month) + merge_min_files: 5 # Require at least 5 files to trigger merge + merge_on_flush: false # Don't merge on every flush (merge separately) + +telemetry: + log_level: INFO + log_format: json # json or text + enabled: true + service_name: fs-data-sink + otlp_endpoint: http://localhost:4317 + trace_enabled: true + metrics_enabled: true + +pipeline: + max_batches: null # null for unlimited + batch_timeout_seconds: 30 + error_handling: log # log, 
raise, or ignore + flush_interval_seconds: 300 # Flush every 5 minutes + flush_interval_batches: 100 # Or flush after 100 batches diff --git a/src/fs_data_sink/config/settings.py b/src/fs_data_sink/config/settings.py index 39fd9d8..e01e0eb 100644 --- a/src/fs_data_sink/config/settings.py +++ b/src/fs_data_sink/config/settings.py @@ -61,6 +61,12 @@ class SinkConfig: compression: str = "snappy" partition_by: Optional[list[str]] = None extra_config: dict = field(default_factory=dict) + + # Merge configuration + merge_enabled: bool = False + merge_period: str = "hour" # 'hour', 'day', 'week', 'month' + merge_min_files: int = 2 # Minimum files to trigger merge + merge_on_flush: bool = False # Merge during flush operations @dataclass @@ -144,6 +150,12 @@ def _load_ini_file(config_path: str) -> dict: # Handle list values if key in ("partition_by",): config_data["sink"][key] = [v.strip() for v in value.split(",") if v.strip()] + # Handle integer values + elif key in ("merge_min_files",): + config_data["sink"][key] = config.getint("sink", key) + # Handle boolean values + elif key in ("merge_enabled", "merge_on_flush"): + config_data["sink"][key] = config.getboolean("sink", key) else: config_data["sink"][key] = value @@ -272,6 +284,16 @@ def _apply_env_overrides(config_data: dict) -> None: if os.getenv("HDFS_USER"): sink["user"] = os.getenv("HDFS_USER") + # Merge configuration + if os.getenv("SINK_MERGE_ENABLED"): + sink["merge_enabled"] = os.getenv("SINK_MERGE_ENABLED").lower() in ("true", "1", "yes") + if os.getenv("SINK_MERGE_PERIOD"): + sink["merge_period"] = os.getenv("SINK_MERGE_PERIOD") + if os.getenv("SINK_MERGE_MIN_FILES"): + sink["merge_min_files"] = int(os.getenv("SINK_MERGE_MIN_FILES")) + if os.getenv("SINK_MERGE_ON_FLUSH"): + sink["merge_on_flush"] = os.getenv("SINK_MERGE_ON_FLUSH").lower() in ("true", "1", "yes") + # Telemetry overrides telemetry = config_data.setdefault("telemetry", {}) diff --git a/src/fs_data_sink/pipeline.py b/src/fs_data_sink/pipeline.py index 5b9fb4d..3abead8 100644 --- a/src/fs_data_sink/pipeline.py +++ b/src/fs_data_sink/pipeline.py @@ -100,6 +100,10 @@ def _create_sink(self) -> DataSink: compression=sink_config.compression, partition_by=sink_config.partition_by, s3_config=sink_config.extra_config, + merge_enabled=sink_config.merge_enabled, + merge_period=sink_config.merge_period, + merge_min_files=sink_config.merge_min_files, + merge_on_flush=sink_config.merge_on_flush, ) if sink_config.type == "hdfs": @@ -113,6 +117,10 @@ def _create_sink(self) -> DataSink: compression=sink_config.compression, partition_by=sink_config.partition_by, hdfs_config=sink_config.extra_config, + merge_enabled=sink_config.merge_enabled, + merge_period=sink_config.merge_period, + merge_min_files=sink_config.merge_min_files, + merge_on_flush=sink_config.merge_on_flush, ) if sink_config.type == "local": @@ -123,6 +131,10 @@ def _create_sink(self) -> DataSink: base_path=sink_config.base_path, compression=sink_config.compression, partition_by=sink_config.partition_by, + merge_enabled=sink_config.merge_enabled, + merge_period=sink_config.merge_period, + merge_min_files=sink_config.merge_min_files, + merge_on_flush=sink_config.merge_on_flush, ) raise ValueError(f"Unsupported sink type: {sink_config.type}") diff --git a/src/fs_data_sink/sinks/hdfs_sink.py b/src/fs_data_sink/sinks/hdfs_sink.py index 432165f..69dbf8c 100644 --- a/src/fs_data_sink/sinks/hdfs_sink.py +++ b/src/fs_data_sink/sinks/hdfs_sink.py @@ -32,6 +32,10 @@ def __init__( compression: str = "snappy", partition_by: 
Optional[list[str]] = None, hdfs_config: Optional[dict] = None, + merge_enabled: bool = False, + merge_period: str = "hour", + merge_min_files: int = 2, + merge_on_flush: bool = False, ): """ Initialize HDFS sink. @@ -43,6 +47,10 @@ def __init__( compression: Compression codec for Parquet ('snappy', 'gzip', 'brotli', 'zstd', 'none') partition_by: List of column names to partition by hdfs_config: Additional HDFS client configuration + merge_enabled: Enable automatic file merging + merge_period: Period for merging files ('hour', 'day', 'week', 'month') + merge_min_files: Minimum number of files to trigger a merge + merge_on_flush: Merge files during flush operations """ self.url = url self.base_path = base_path.rstrip("/") @@ -50,6 +58,10 @@ def __init__( self.compression = compression self.partition_by = partition_by or [] self.hdfs_config = hdfs_config or {} + self.merge_enabled = merge_enabled + self.merge_period = merge_period + self.merge_min_files = merge_min_files + self.merge_on_flush = merge_on_flush self.client = None self.file_counter = 0 self.buffered_batches: list[pa.RecordBatch] = [] @@ -173,10 +185,232 @@ def flush(self) -> None: # Clear the buffer self.buffered_batches.clear() + # Optionally merge files after flush + if self.merge_enabled and self.merge_on_flush: + logger.info("Merging files after flush") + self.merge_files() + except Exception as e: logger.error("Error flushing batches to HDFS: %s", e, exc_info=True) raise + def merge_files(self, period: Optional[str] = None) -> int: + """ + Merge small Parquet files into larger consolidated files by time period. + + Args: + period: Time period for grouping files ('hour', 'day', 'week', 'month') + If None, uses the sink's configured merge_period + + Returns: + Number of files merged + """ + if not self.merge_enabled: + logger.debug("File merging is disabled") + return 0 + + if not self.client: + raise RuntimeError("Not connected. 
Call connect() first.") + + merge_period = period or self.merge_period + logger.info("Starting HDFS file merge with period: %s", merge_period) + + with tracer.start_as_current_span("hdfs_merge") as span: + span.set_attribute("merge.period", merge_period) + total_merged = 0 + + try: + # Find all directories to process + dirs_to_process = [self.base_path] + + # If partitioned, include partition directories + if self.partition_by: + for item in self.client.list(self.base_path, status=False): + item_path = f"{self.base_path}/{item}" + if self.client.status(item_path)["type"] == "DIRECTORY": + dirs_to_process.append(item_path) + + for directory in dirs_to_process: + try: + # Group files by time period + file_groups = self._group_hdfs_files_by_period(directory, merge_period) + + for period_key, files in file_groups.items(): + if len(files) < self.merge_min_files: + logger.debug( + "Skipping merge for period %s: only %d files (min: %d)", + period_key, + len(files), + self.merge_min_files, + ) + continue + + # Merge files in this period + merged = self._merge_hdfs_file_group(directory, period_key, files) + total_merged += merged + + except Exception as e: + logger.error("Error processing directory %s: %s", directory, e) + continue + + span.set_attribute("merge.files_merged", total_merged) + logger.info("HDFS merge completed: %d files merged", total_merged) + return total_merged + + except Exception as e: + logger.error("Error during HDFS file merge: %s", e, exc_info=True) + raise + + def _group_hdfs_files_by_period( + self, directory: str, period: str + ) -> dict[str, list[str]]: + """Group HDFS Parquet files by time period based on filename timestamp.""" + from collections import defaultdict + + groups = defaultdict(list) + + try: + # List all files in the directory + files = self.client.list(directory, status=False) + + for filename in files: + if not filename.endswith(".parquet") or not filename.startswith("data_"): + continue + + file_path = f"{directory}/{filename}" + + # Extract period from filename + period_key = self._extract_period_from_filename(filename, period) + if period_key: + groups[period_key].append(file_path) + + except Exception as e: + logger.error("Error listing HDFS directory %s: %s", directory, e) + + return groups + + def _extract_period_from_filename(self, filename: str, period: str) -> Optional[str]: + """Extract time period key from filename.""" + try: + # Filename format: data_YYYYMMDD_HHMMSS_*.parquet + parts = filename.split("_") + if len(parts) < 3 or not filename.startswith("data_"): + return None + + date_str = parts[1] # YYYYMMDD + time_str = parts[2] # HHMMSS + + year = date_str[0:4] + month = date_str[4:6] + day = date_str[6:8] + hour = time_str[0:2] + + if period == "hour": + return f"{year}{month}{day}_{hour}" + elif period == "day": + return f"{year}{month}{day}" + elif period == "week": + from datetime import datetime + dt = datetime(int(year), int(month), int(day)) + week = dt.isocalendar()[1] + return f"{year}W{week:02d}" + elif period == "month": + return f"{year}{month}" + else: + return f"{year}{month}{day}" + + except (ValueError, IndexError) as e: + logger.warning("Could not parse timestamp from filename %s: %s", filename, e) + return None + + def _merge_hdfs_file_group( + self, directory: str, period_key: str, files: list[str] + ) -> int: + """Merge a group of HDFS files into a single consolidated file.""" + try: + logger.info("Merging %d HDFS files for period %s in %s", len(files), period_key, directory) + + # Read all files (avoid dictionary 
encoding for easier merging) + tables = [] + for file_path in files: + try: + with self.client.read(file_path) as reader: + data = reader.read() + buffer = BytesIO(data) + parquet_file = pq.ParquetFile(buffer) + table = parquet_file.read(use_pandas_metadata=False) + + # Convert any dictionary-encoded columns to regular columns + columns = [] + for i, field in enumerate(table.schema): + column = table.column(i) + if pa.types.is_dictionary(field.type): + columns.append(column.dictionary_decode()) + else: + columns.append(column) + + # Rebuild table without dictionary encoding + if columns: + schema = pa.schema([ + pa.field( + field.name, + field.type.value_type if pa.types.is_dictionary(field.type) else field.type + ) + for field in table.schema + ]) + table = pa.Table.from_arrays(columns, schema=schema) + + tables.append(table) + except Exception as e: + logger.error("Failed to read HDFS file %s: %s", file_path, e) + continue + + if not tables: + logger.warning("No tables to merge for period %s", period_key) + return 0 + + # Concatenate all tables + merged_table = pa.concat_tables(tables) + + # Generate merged filename + merged_filename = f"merged_{period_key}.parquet" + merged_path = f"{directory}/{merged_filename}" + + # Write merged file (without forcing dictionary encoding) + buffer = BytesIO() + pq.write_table( + merged_table, + buffer, + compression=self.compression, + use_dictionary=False, # Don't use dictionary encoding for merged files + version="2.6", + ) + + buffer.seek(0) + with self.client.write(merged_path, overwrite=False) as writer: + writer.write(buffer.getvalue()) + + logger.info( + "Created merged HDFS file: %s (%d rows, %d bytes)", + merged_path, + merged_table.num_rows, + len(buffer.getvalue()), + ) + + # Delete original files + for file_path in files: + try: + self.client.delete(file_path) + logger.debug("Deleted original HDFS file: %s", file_path) + except Exception as e: + logger.error("Failed to delete HDFS file %s: %s", file_path, e) + + return len(files) + + except Exception as e: + logger.error("Error merging HDFS files for period %s: %s", period_key, e, exc_info=True) + return 0 + def close(self) -> None: """Close the HDFS connection.""" with tracer.start_as_current_span("hdfs_close"): diff --git a/src/fs_data_sink/sinks/local_sink.py b/src/fs_data_sink/sinks/local_sink.py index ae16daa..9c49703 100644 --- a/src/fs_data_sink/sinks/local_sink.py +++ b/src/fs_data_sink/sinks/local_sink.py @@ -28,6 +28,10 @@ def __init__( base_path: str, compression: str = "snappy", partition_by: Optional[list[str]] = None, + merge_enabled: bool = False, + merge_period: str = "hour", + merge_min_files: int = 2, + merge_on_flush: bool = False, ): """ Initialize Local sink. 
@@ -36,10 +40,18 @@ def __init__( base_path: Base directory path for writing files compression: Compression codec for Parquet ('snappy', 'gzip', 'brotli', 'zstd', 'none') partition_by: List of column names to partition by + merge_enabled: Enable automatic file merging + merge_period: Period for merging files ('hour', 'day', 'week', 'month') + merge_min_files: Minimum number of files to trigger a merge + merge_on_flush: Merge files during flush operations """ self.base_path = Path(base_path).resolve() self.compression = compression self.partition_by = partition_by or [] + self.merge_enabled = merge_enabled + self.merge_period = merge_period + self.merge_min_files = merge_min_files + self.merge_on_flush = merge_on_flush self.file_counter = 0 self.buffered_batches: list[pa.RecordBatch] = [] @@ -136,10 +148,211 @@ def flush(self) -> None: # Clear the buffer self.buffered_batches.clear() + # Optionally merge files after flush + if self.merge_enabled and self.merge_on_flush: + logger.info("Merging files after flush") + self.merge_files() + except Exception as e: logger.error("Error flushing batches to local filesystem: %s", e, exc_info=True) raise + def merge_files(self, period: Optional[str] = None) -> int: + """ + Merge small Parquet files into larger consolidated files by time period. + + Args: + period: Time period for grouping files ('hour', 'day', 'week', 'month') + If None, uses the sink's configured merge_period + + Returns: + Number of files merged + """ + if not self.merge_enabled: + logger.debug("File merging is disabled") + return 0 + + merge_period = period or self.merge_period + logger.info("Starting file merge with period: %s", merge_period) + + with tracer.start_as_current_span("local_merge") as span: + span.set_attribute("merge.period", merge_period) + total_merged = 0 + + try: + # Find all directories to process (including partitioned directories) + dirs_to_process = [self.base_path] + + # If partitioned, include partition directories + if self.partition_by: + for root, dirs, _ in self.base_path.walk(): + for d in dirs: + dirs_to_process.append(root / d) + + for directory in dirs_to_process: + if not directory.exists(): + continue + + # Group files by time period + file_groups = self._group_files_by_period(directory, merge_period) + + for period_key, files in file_groups.items(): + if len(files) < self.merge_min_files: + logger.debug( + "Skipping merge for period %s: only %d files (min: %d)", + period_key, + len(files), + self.merge_min_files, + ) + continue + + # Merge files in this period + merged = self._merge_file_group(directory, period_key, files) + total_merged += merged + + span.set_attribute("merge.files_merged", total_merged) + logger.info("Merge completed: %d files merged", total_merged) + return total_merged + + except Exception as e: + logger.error("Error during file merge: %s", e, exc_info=True) + raise + + def _group_files_by_period( + self, directory: Path, period: str + ) -> dict[str, list[Path]]: + """Group Parquet files by time period based on filename timestamp.""" + from collections import defaultdict + + groups = defaultdict(list) + + # Find all parquet files in the directory + parquet_files = list(directory.glob("data_*.parquet")) + + for file_path in parquet_files: + try: + # Extract timestamp from filename (format: data_YYYYMMDD_HHMMSS_*.parquet) + filename = file_path.name + parts = filename.split("_") + if len(parts) < 3: + continue + + date_str = parts[1] # YYYYMMDD + time_str = parts[2] # HHMMSS + + # Parse timestamp + year = date_str[0:4] + month = 
date_str[4:6] + day = date_str[6:8] + hour = time_str[0:2] + + # Group by period + if period == "hour": + group_key = f"{year}{month}{day}_{hour}" + elif period == "day": + group_key = f"{year}{month}{day}" + elif period == "week": + # Calculate week number + from datetime import datetime + dt = datetime(int(year), int(month), int(day)) + week = dt.isocalendar()[1] + group_key = f"{year}W{week:02d}" + elif period == "month": + group_key = f"{year}{month}" + else: + logger.warning("Unknown merge period: %s, using 'day'", period) + group_key = f"{year}{month}{day}" + + groups[group_key].append(file_path) + + except (ValueError, IndexError) as e: + logger.warning("Could not parse timestamp from file %s: %s", file_path, e) + continue + + return groups + + def _merge_file_group( + self, directory: Path, period_key: str, files: list[Path] + ) -> int: + """Merge a group of files into a single consolidated file.""" + try: + logger.info("Merging %d files for period %s", len(files), period_key) + + # Read all files (avoid dictionary encoding for easier merging) + tables = [] + for file_path in files: + try: + # Read table without dictionary encoding to avoid schema conflicts + parquet_file = pq.ParquetFile(file_path) + table = parquet_file.read(use_pandas_metadata=False) + + # Convert any dictionary-encoded columns to regular columns + columns = [] + for i, field in enumerate(table.schema): + column = table.column(i) + if pa.types.is_dictionary(field.type): + columns.append(column.dictionary_decode()) + else: + columns.append(column) + + # Rebuild table without dictionary encoding + if columns: + schema = pa.schema([ + pa.field( + field.name, + field.type.value_type if pa.types.is_dictionary(field.type) else field.type + ) + for field in table.schema + ]) + table = pa.Table.from_arrays(columns, schema=schema) + + tables.append(table) + except Exception as e: + logger.error("Failed to read file %s: %s", file_path, e) + continue + + if not tables: + logger.warning("No tables to merge for period %s", period_key) + return 0 + + # Concatenate all tables + merged_table = pa.concat_tables(tables) + + # Generate merged filename + merged_filename = f"merged_{period_key}.parquet" + merged_path = directory / merged_filename + + # Write merged file (without forcing dictionary encoding) + pq.write_table( + merged_table, + merged_path, + compression=self.compression, + use_dictionary=False, # Don't use dictionary encoding for merged files + version="2.6", + ) + + merged_size = merged_path.stat().st_size + logger.info( + "Created merged file: %s (%d rows, %d bytes)", + merged_path, + merged_table.num_rows, + merged_size, + ) + + # Delete original files + for file_path in files: + try: + file_path.unlink() + logger.debug("Deleted original file: %s", file_path) + except Exception as e: + logger.error("Failed to delete file %s: %s", file_path, e) + + return len(files) + + except Exception as e: + logger.error("Error merging files for period %s: %s", period_key, e, exc_info=True) + return 0 + def close(self) -> None: """Close the connection.""" logger.info("Closing local sink connection") diff --git a/src/fs_data_sink/sinks/s3_sink.py b/src/fs_data_sink/sinks/s3_sink.py index 2d595f3..5e9717d 100644 --- a/src/fs_data_sink/sinks/s3_sink.py +++ b/src/fs_data_sink/sinks/s3_sink.py @@ -38,6 +38,10 @@ def __init__( partition_by: Optional[list[str]] = None, s3_config: Optional[dict] = None, secure: bool = True, + merge_enabled: bool = False, + merge_period: str = "hour", + merge_min_files: int = 2, + merge_on_flush: bool = 
False, ): """ Initialize S3 sink with MinIO client. @@ -53,6 +57,10 @@ def __init__( partition_by: List of column names to partition by s3_config: Additional MinIO client configuration secure: Use HTTPS for connections (default: True) + merge_enabled: Enable automatic file merging + merge_period: Period for merging files ('hour', 'day', 'week', 'month') + merge_min_files: Minimum number of files to trigger a merge + merge_on_flush: Merge files during flush operations """ self.bucket = bucket self.prefix = prefix.rstrip("/") @@ -64,6 +72,10 @@ def __init__( self.partition_by = partition_by or [] self.s3_config = s3_config or {} self.secure = secure + self.merge_enabled = merge_enabled + self.merge_period = merge_period + self.merge_min_files = merge_min_files + self.merge_on_flush = merge_on_flush self.s3_client = None self.file_counter = 0 self.buffered_batches: list[pa.RecordBatch] = [] @@ -211,6 +223,11 @@ def flush(self) -> None: # Clear the buffer self.buffered_batches.clear() + # Optionally merge files after flush + if self.merge_enabled and self.merge_on_flush: + logger.info("Merging files after flush") + self.merge_files() + except S3Error as e: logger.error("S3 error flushing batches: %s", e, exc_info=True) raise @@ -218,6 +235,223 @@ def flush(self) -> None: logger.error("Unexpected error flushing batches to S3: %s", e, exc_info=True) raise + def merge_files(self, period: Optional[str] = None) -> int: + """ + Merge small Parquet files into larger consolidated files by time period. + + Args: + period: Time period for grouping files ('hour', 'day', 'week', 'month') + If None, uses the sink's configured merge_period + + Returns: + Number of files merged + """ + if not self.merge_enabled: + logger.debug("File merging is disabled") + return 0 + + if not self.s3_client: + raise RuntimeError("Not connected. 
Call connect() first.") + + merge_period = period or self.merge_period + logger.info("Starting S3 file merge with period: %s", merge_period) + + with tracer.start_as_current_span("s3_merge") as span: + span.set_attribute("merge.period", merge_period) + total_merged = 0 + + try: + # List all objects with the prefix + objects = self.s3_client.list_objects( + bucket_name=self.bucket, prefix=self.prefix, recursive=True + ) + + # Group objects by directory and time period + from collections import defaultdict + dir_groups = defaultdict(lambda: defaultdict(list)) + + for obj in objects: + if not obj.object_name.endswith(".parquet"): + continue + if obj.object_name.startswith(f"{self.prefix}/merged_" if self.prefix else "merged_"): + # Skip already merged files + continue + + # Get directory path + obj_dir = "/".join(obj.object_name.split("/")[:-1]) + filename = obj.object_name.split("/")[-1] + + # Extract period from filename + period_key = self._extract_period_from_filename(filename, merge_period) + if period_key: + dir_groups[obj_dir][period_key].append(obj) + + # Merge files in each directory/period group + for obj_dir, period_groups in dir_groups.items(): + for period_key, objects_list in period_groups.items(): + if len(objects_list) < self.merge_min_files: + logger.debug( + "Skipping merge for %s/%s: only %d files (min: %d)", + obj_dir, + period_key, + len(objects_list), + self.merge_min_files, + ) + continue + + # Merge files in this period + merged = self._merge_s3_file_group(obj_dir, period_key, objects_list) + total_merged += merged + + span.set_attribute("merge.files_merged", total_merged) + logger.info("S3 merge completed: %d files merged", total_merged) + return total_merged + + except S3Error as e: + logger.error("S3 error during merge: %s", e, exc_info=True) + raise + except Exception as e: + logger.error("Error during S3 file merge: %s", e, exc_info=True) + raise + + def _extract_period_from_filename(self, filename: str, period: str) -> Optional[str]: + """Extract time period key from filename.""" + try: + # Filename format: data_YYYYMMDD_HHMMSS_*.parquet + parts = filename.split("_") + if len(parts) < 3 or not filename.startswith("data_"): + return None + + date_str = parts[1] # YYYYMMDD + time_str = parts[2] # HHMMSS + + year = date_str[0:4] + month = date_str[4:6] + day = date_str[6:8] + hour = time_str[0:2] + + if period == "hour": + return f"{year}{month}{day}_{hour}" + elif period == "day": + return f"{year}{month}{day}" + elif period == "week": + from datetime import datetime + dt = datetime(int(year), int(month), int(day)) + week = dt.isocalendar()[1] + return f"{year}W{week:02d}" + elif period == "month": + return f"{year}{month}" + else: + return f"{year}{month}{day}" + + except (ValueError, IndexError) as e: + logger.warning("Could not parse timestamp from filename %s: %s", filename, e) + return None + + def _merge_s3_file_group(self, obj_dir: str, period_key: str, objects_list: list) -> int: + """Merge a group of S3 objects into a single consolidated file.""" + try: + logger.info("Merging %d S3 objects for period %s in %s", len(objects_list), period_key, obj_dir) + + # Download and read all objects (avoid dictionary encoding for easier merging) + tables = [] + for obj in objects_list: + try: + # Download object to memory + response = self.s3_client.get_object( + bucket_name=self.bucket, object_name=obj.object_name + ) + data = response.read() + response.close() + response.release_conn() + + # Read as parquet + buffer = BytesIO(data) + parquet_file = 
pq.ParquetFile(buffer) + table = parquet_file.read(use_pandas_metadata=False) + + # Convert any dictionary-encoded columns to regular columns + columns = [] + for i, field in enumerate(table.schema): + column = table.column(i) + if pa.types.is_dictionary(field.type): + columns.append(column.dictionary_decode()) + else: + columns.append(column) + + # Rebuild table without dictionary encoding + if columns: + schema = pa.schema([ + pa.field( + field.name, + field.type.value_type if pa.types.is_dictionary(field.type) else field.type + ) + for field in table.schema + ]) + table = pa.Table.from_arrays(columns, schema=schema) + + tables.append(table) + + except Exception as e: + logger.error("Failed to read S3 object %s: %s", obj.object_name, e) + continue + + if not tables: + logger.warning("No tables to merge for period %s", period_key) + return 0 + + # Concatenate all tables + merged_table = pa.concat_tables(tables) + + # Generate merged object name + merged_name = f"merged_{period_key}.parquet" + merged_key = f"{obj_dir}/{merged_name}" if obj_dir else merged_name + + # Write merged file to buffer (without forcing dictionary encoding) + buffer = BytesIO() + pq.write_table( + merged_table, + buffer, + compression=self.compression, + use_dictionary=False, # Don't use dictionary encoding for merged files + version="2.6", + ) + + # Upload merged file + buffer.seek(0) + data_length = len(buffer.getvalue()) + self.s3_client.put_object( + bucket_name=self.bucket, + object_name=merged_key, + data=buffer, + length=data_length, + content_type="application/octet-stream", + ) + + logger.info( + "Created merged S3 object: s3://%s/%s (%d rows, %d bytes)", + self.bucket, + merged_key, + merged_table.num_rows, + data_length, + ) + + # Delete original objects + for obj in objects_list: + try: + self.s3_client.remove_object( + bucket_name=self.bucket, object_name=obj.object_name + ) + logger.debug("Deleted original S3 object: %s", obj.object_name) + except Exception as e: + logger.error("Failed to delete S3 object %s: %s", obj.object_name, e) + + return len(objects_list) + + except Exception as e: + logger.error("Error merging S3 files for period %s: %s", period_key, e, exc_info=True) + return 0 + def close(self) -> None: """Close the S3 connection.""" with tracer.start_as_current_span("s3_close"): diff --git a/src/fs_data_sink/types.py b/src/fs_data_sink/types.py index e2f1485..a226f2b 100644 --- a/src/fs_data_sink/types.py +++ b/src/fs_data_sink/types.py @@ -53,6 +53,19 @@ def write_batch( def flush(self) -> None: """Flush any buffered data to the sink.""" + @abstractmethod + def merge_files(self, period: Optional[str] = None) -> int: + """ + Merge small Parquet files into larger consolidated files. 
+ + Args: + period: Time period for grouping files ('hour', 'day', 'week', 'month') + If None, uses the sink's configured merge_period + + Returns: + Number of files merged + """ + @abstractmethod def close(self) -> None: """Close the connection to the data sink.""" diff --git a/tests/unit/test_file_merge.py b/tests/unit/test_file_merge.py new file mode 100644 index 0000000..6149b90 --- /dev/null +++ b/tests/unit/test_file_merge.py @@ -0,0 +1,341 @@ +"""Tests for file merging functionality.""" + +import tempfile +import time +from pathlib import Path +from unittest.mock import MagicMock, patch + +import pyarrow as pa +import pyarrow.parquet as pq + +from fs_data_sink.sinks import LocalSink, S3Sink + + +def test_local_sink_merge_disabled_by_default(): + """Test that merge is disabled by default.""" + with tempfile.TemporaryDirectory() as tmpdir: + sink = LocalSink(base_path=tmpdir) + sink.connect() + + # Write and flush some batches + for i in range(3): + batch = pa.table({"id": [i], "value": [f"test_{i}"]}).to_batches()[0] + sink.write_batch(batch) + + sink.flush() + + # Call merge - should return 0 (disabled) + merged_count = sink.merge_files() + assert merged_count == 0 + + # All original files should still exist + files = list(Path(tmpdir).glob("data_*.parquet")) + assert len(files) == 1 + + sink.close() + + +def test_local_sink_merge_by_hour(): + """Test merging files by hour.""" + with tempfile.TemporaryDirectory() as tmpdir: + sink = LocalSink( + base_path=tmpdir, + merge_enabled=True, + merge_period="hour", + merge_min_files=2, + ) + sink.connect() + + # Write multiple batches (will create separate files) + for i in range(3): + batch = pa.table({"id": [i], "value": [f"test_{i}"]}).to_batches()[0] + sink.write_batch(batch) + sink.flush() + time.sleep(0.01) # Small delay to ensure different timestamps + + # Should have 3 separate files + files_before = list(Path(tmpdir).glob("data_*.parquet")) + assert len(files_before) == 3 + + # Merge files + merged_count = sink.merge_files() + + # Should have merged all 3 files + assert merged_count == 3 + + # Should now have 1 merged file + merged_files = list(Path(tmpdir).glob("merged_*.parquet")) + assert len(merged_files) == 1 + + # Original files should be deleted + original_files = list(Path(tmpdir).glob("data_*.parquet")) + assert len(original_files) == 0 + + # Verify merged file contains all data + table = pq.read_table(merged_files[0]) + assert table.num_rows == 3 + + sink.close() + + +def test_local_sink_merge_by_day(): + """Test merging files by day.""" + with tempfile.TemporaryDirectory() as tmpdir: + sink = LocalSink( + base_path=tmpdir, + merge_enabled=True, + merge_period="day", + merge_min_files=2, + ) + sink.connect() + + # Write multiple batches + for i in range(3): + batch = pa.table({"id": [i], "value": [f"test_{i}"]}).to_batches()[0] + sink.write_batch(batch) + sink.flush() + + # Merge files + merged_count = sink.merge_files() + assert merged_count == 3 + + # Should have 1 merged file + merged_files = list(Path(tmpdir).glob("merged_*.parquet")) + assert len(merged_files) == 1 + + sink.close() + + +def test_local_sink_merge_min_files(): + """Test that merge respects minimum file count.""" + with tempfile.TemporaryDirectory() as tmpdir: + sink = LocalSink( + base_path=tmpdir, + merge_enabled=True, + merge_period="hour", + merge_min_files=5, # Require at least 5 files + ) + sink.connect() + + # Write only 3 batches + for i in range(3): + batch = pa.table({"id": [i], "value": [f"test_{i}"]}).to_batches()[0] + 
sink.write_batch(batch) + sink.flush() + + # Merge should not happen (only 3 files, need 5) + merged_count = sink.merge_files() + assert merged_count == 0 + + # Original files should still exist + original_files = list(Path(tmpdir).glob("data_*.parquet")) + assert len(original_files) == 3 + + # No merged files + merged_files = list(Path(tmpdir).glob("merged_*.parquet")) + assert len(merged_files) == 0 + + sink.close() + + +def test_local_sink_merge_on_flush(): + """Test automatic merge during flush.""" + with tempfile.TemporaryDirectory() as tmpdir: + sink = LocalSink( + base_path=tmpdir, + merge_enabled=True, + merge_period="hour", + merge_min_files=2, + merge_on_flush=True, + ) + sink.connect() + + # Write and flush batches multiple times to create separate files + for i in range(3): + batch = pa.table({"id": [i], "value": [f"test_{i}"]}).to_batches()[0] + sink.write_batch(batch) + sink.flush() # Each flush creates a separate file + + # After last flush with merge_on_flush=True, files should be merged + # Files should be merged + merged_files = list(Path(tmpdir).glob("merged_*.parquet")) + assert len(merged_files) == 1 + + sink.close() + + +def test_local_sink_merge_with_partitioning(): + """Test merging with partitioned data.""" + with tempfile.TemporaryDirectory() as tmpdir: + sink = LocalSink( + base_path=tmpdir, + partition_by=["date"], + merge_enabled=True, + merge_period="hour", + merge_min_files=2, + ) + sink.connect() + + schema = pa.schema([("id", pa.int64()), ("date", pa.string()), ("value", pa.string())]) + + # Write batches to same partition + for i in range(3): + batch = pa.table( + {"id": [i], "date": ["2024-01-01"], "value": [f"test_{i}"]}, schema=schema + ).to_batches()[0] + sink.write_batch(batch) + sink.flush() + + # Merge files + merged_count = sink.merge_files() + assert merged_count == 3 + + # Check merged file exists in partition directory + partition_dir = Path(tmpdir) / "date=2024-01-01" + assert partition_dir.exists() + + merged_files = list(partition_dir.glob("merged_*.parquet")) + assert len(merged_files) == 1 + + # Verify merged data (use ParquetFile to read single file, not as dataset) + pf = pq.ParquetFile(merged_files[0]) + table = pf.read() + assert table.num_rows == 3 + + sink.close() + + +def test_s3_sink_merge_disabled_by_default(): + """Test that S3 merge is disabled by default.""" + mock_s3_client = MagicMock() + + with patch("fs_data_sink.sinks.s3_sink.Minio", return_value=mock_s3_client): + sink = S3Sink( + bucket="test-bucket", + aws_access_key_id="test", + aws_secret_access_key="test", + ) + sink.connect() + + # Call merge - should return 0 (disabled) + merged_count = sink.merge_files() + assert merged_count == 0 + + sink.close() + + +def test_s3_sink_merge_enabled(): + """Test S3 merge with mock client.""" + mock_s3_client = MagicMock() + + # Mock list_objects to return some files + mock_obj1 = MagicMock() + mock_obj1.object_name = "data_20241113_100000_000001.parquet" + mock_obj2 = MagicMock() + mock_obj2.object_name = "data_20241113_100100_000002.parquet" + + mock_s3_client.list_objects.return_value = [mock_obj1, mock_obj2] + + # Mock get_object to return parquet data + def mock_get_object(bucket_name, object_name): + # Create a simple parquet file in memory + table = pa.table({"id": [1], "value": ["test"]}) + from io import BytesIO + + buffer = BytesIO() + pq.write_table(table, buffer) + buffer.seek(0) + + mock_response = MagicMock() + mock_response.read.return_value = buffer.getvalue() + return mock_response + + 
mock_s3_client.get_object.side_effect = mock_get_object + + with patch("fs_data_sink.sinks.s3_sink.Minio", return_value=mock_s3_client): + sink = S3Sink( + bucket="test-bucket", + aws_access_key_id="test", + aws_secret_access_key="test", + merge_enabled=True, + merge_period="hour", + merge_min_files=2, + ) + sink.connect() + + # Call merge + merged_count = sink.merge_files() + + # Should have merged 2 files + assert merged_count == 2 + + # Verify put_object was called for merged file + assert mock_s3_client.put_object.call_count == 1 + + # Verify remove_object was called for original files + assert mock_s3_client.remove_object.call_count == 2 + + sink.close() + + +def test_local_sink_merge_skips_already_merged_files(): + """Test that merge skips already merged files.""" + with tempfile.TemporaryDirectory() as tmpdir: + sink = LocalSink( + base_path=tmpdir, + merge_enabled=True, + merge_period="hour", + merge_min_files=2, + ) + sink.connect() + + # Write and merge files + for i in range(3): + batch = pa.table({"id": [i], "value": [f"test_{i}"]}).to_batches()[0] + sink.write_batch(batch) + sink.flush() + + # First merge + merged_count = sink.merge_files() + assert merged_count == 3 + + # Try to merge again - should find no files to merge + merged_count2 = sink.merge_files() + assert merged_count2 == 0 + + # Still only 1 merged file + merged_files = list(Path(tmpdir).glob("merged_*.parquet")) + assert len(merged_files) == 1 + + sink.close() + + +def test_merge_config_loading(): + """Test that merge configuration is loaded correctly.""" + from fs_data_sink.config.settings import SinkConfig + + config = SinkConfig( + type="local", + base_path="/tmp/test", + merge_enabled=True, + merge_period="day", + merge_min_files=5, + merge_on_flush=True, + ) + + assert config.merge_enabled is True + assert config.merge_period == "day" + assert config.merge_min_files == 5 + assert config.merge_on_flush is True + + +def test_merge_config_defaults(): + """Test default merge configuration values.""" + from fs_data_sink.config.settings import SinkConfig + + config = SinkConfig(type="local", base_path="/tmp/test") + + assert config.merge_enabled is False + assert config.merge_period == "hour" + assert config.merge_min_files == 2 + assert config.merge_on_flush is False diff --git a/tests/unit/test_types.py b/tests/unit/test_types.py index 4f1dfd3..aac49af 100644 --- a/tests/unit/test_types.py +++ b/tests/unit/test_types.py @@ -49,6 +49,9 @@ def write_batch(self, batch, partition_cols=None): def flush(self): pass + def merge_files(self, period=None): + return 0 + def close(self): self.connected = False From bd9f55c0f1c896e2be888039848b39b8a163d873 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Thu, 13 Nov 2025 06:44:03 +0000 Subject: [PATCH 3/5] Fix linter warnings (remove trailing whitespace) Co-authored-by: slhmy <31381093+slhmy@users.noreply.github.com> --- src/fs_data_sink/config/settings.py | 2 +- src/fs_data_sink/sinks/hdfs_sink.py | 6 ++-- src/fs_data_sink/sinks/local_sink.py | 42 ++++++++++++++-------------- src/fs_data_sink/sinks/s3_sink.py | 6 ++-- src/fs_data_sink/types.py | 4 +-- 5 files changed, 30 insertions(+), 30 deletions(-) diff --git a/src/fs_data_sink/config/settings.py b/src/fs_data_sink/config/settings.py index e01e0eb..fa82d5c 100644 --- a/src/fs_data_sink/config/settings.py +++ b/src/fs_data_sink/config/settings.py @@ -61,7 +61,7 @@ class SinkConfig: compression: str = "snappy" partition_by: Optional[list[str]] = None extra_config: 
dict = field(default_factory=dict) - + # Merge configuration merge_enabled: bool = False merge_period: str = "hour" # 'hour', 'day', 'week', 'month' diff --git a/src/fs_data_sink/sinks/hdfs_sink.py b/src/fs_data_sink/sinks/hdfs_sink.py index 69dbf8c..703cdf9 100644 --- a/src/fs_data_sink/sinks/hdfs_sink.py +++ b/src/fs_data_sink/sinks/hdfs_sink.py @@ -339,7 +339,7 @@ def _merge_hdfs_file_group( buffer = BytesIO(data) parquet_file = pq.ParquetFile(buffer) table = parquet_file.read(use_pandas_metadata=False) - + # Convert any dictionary-encoded columns to regular columns columns = [] for i, field in enumerate(table.schema): @@ -348,7 +348,7 @@ def _merge_hdfs_file_group( columns.append(column.dictionary_decode()) else: columns.append(column) - + # Rebuild table without dictionary encoding if columns: schema = pa.schema([ @@ -359,7 +359,7 @@ def _merge_hdfs_file_group( for field in table.schema ]) table = pa.Table.from_arrays(columns, schema=schema) - + tables.append(table) except Exception as e: logger.error("Failed to read HDFS file %s: %s", file_path, e) diff --git a/src/fs_data_sink/sinks/local_sink.py b/src/fs_data_sink/sinks/local_sink.py index 9c49703..794866c 100644 --- a/src/fs_data_sink/sinks/local_sink.py +++ b/src/fs_data_sink/sinks/local_sink.py @@ -182,7 +182,7 @@ def merge_files(self, period: Optional[str] = None) -> int: try: # Find all directories to process (including partitioned directories) dirs_to_process = [self.base_path] - + # If partitioned, include partition directories if self.partition_by: for root, dirs, _ in self.base_path.walk(): @@ -225,10 +225,10 @@ def _group_files_by_period( from collections import defaultdict groups = defaultdict(list) - + # Find all parquet files in the directory parquet_files = list(directory.glob("data_*.parquet")) - + for file_path in parquet_files: try: # Extract timestamp from filename (format: data_YYYYMMDD_HHMMSS_*.parquet) @@ -236,16 +236,16 @@ def _group_files_by_period( parts = filename.split("_") if len(parts) < 3: continue - + date_str = parts[1] # YYYYMMDD time_str = parts[2] # HHMMSS - + # Parse timestamp year = date_str[0:4] month = date_str[4:6] day = date_str[6:8] hour = time_str[0:2] - + # Group by period if period == "hour": group_key = f"{year}{month}{day}_{hour}" @@ -262,13 +262,13 @@ def _group_files_by_period( else: logger.warning("Unknown merge period: %s, using 'day'", period) group_key = f"{year}{month}{day}" - + groups[group_key].append(file_path) - + except (ValueError, IndexError) as e: logger.warning("Could not parse timestamp from file %s: %s", file_path, e) continue - + return groups def _merge_file_group( @@ -277,7 +277,7 @@ def _merge_file_group( """Merge a group of files into a single consolidated file.""" try: logger.info("Merging %d files for period %s", len(files), period_key) - + # Read all files (avoid dictionary encoding for easier merging) tables = [] for file_path in files: @@ -285,7 +285,7 @@ def _merge_file_group( # Read table without dictionary encoding to avoid schema conflicts parquet_file = pq.ParquetFile(file_path) table = parquet_file.read(use_pandas_metadata=False) - + # Convert any dictionary-encoded columns to regular columns columns = [] for i, field in enumerate(table.schema): @@ -294,7 +294,7 @@ def _merge_file_group( columns.append(column.dictionary_decode()) else: columns.append(column) - + # Rebuild table without dictionary encoding if columns: schema = pa.schema([ @@ -305,23 +305,23 @@ def _merge_file_group( for field in table.schema ]) table = pa.Table.from_arrays(columns, 
schema=schema) - + tables.append(table) except Exception as e: logger.error("Failed to read file %s: %s", file_path, e) continue - + if not tables: logger.warning("No tables to merge for period %s", period_key) return 0 - + # Concatenate all tables merged_table = pa.concat_tables(tables) - + # Generate merged filename merged_filename = f"merged_{period_key}.parquet" merged_path = directory / merged_filename - + # Write merged file (without forcing dictionary encoding) pq.write_table( merged_table, @@ -330,7 +330,7 @@ def _merge_file_group( use_dictionary=False, # Don't use dictionary encoding for merged files version="2.6", ) - + merged_size = merged_path.stat().st_size logger.info( "Created merged file: %s (%d rows, %d bytes)", @@ -338,7 +338,7 @@ def _merge_file_group( merged_table.num_rows, merged_size, ) - + # Delete original files for file_path in files: try: @@ -346,9 +346,9 @@ def _merge_file_group( logger.debug("Deleted original file: %s", file_path) except Exception as e: logger.error("Failed to delete file %s: %s", file_path, e) - + return len(files) - + except Exception as e: logger.error("Error merging files for period %s: %s", period_key, e, exc_info=True) return 0 diff --git a/src/fs_data_sink/sinks/s3_sink.py b/src/fs_data_sink/sinks/s3_sink.py index 5e9717d..6bc00bb 100644 --- a/src/fs_data_sink/sinks/s3_sink.py +++ b/src/fs_data_sink/sinks/s3_sink.py @@ -369,7 +369,7 @@ def _merge_s3_file_group(self, obj_dir: str, period_key: str, objects_list: list buffer = BytesIO(data) parquet_file = pq.ParquetFile(buffer) table = parquet_file.read(use_pandas_metadata=False) - + # Convert any dictionary-encoded columns to regular columns columns = [] for i, field in enumerate(table.schema): @@ -378,7 +378,7 @@ def _merge_s3_file_group(self, obj_dir: str, period_key: str, objects_list: list columns.append(column.dictionary_decode()) else: columns.append(column) - + # Rebuild table without dictionary encoding if columns: schema = pa.schema([ @@ -389,7 +389,7 @@ def _merge_s3_file_group(self, obj_dir: str, period_key: str, objects_list: list for field in table.schema ]) table = pa.Table.from_arrays(columns, schema=schema) - + tables.append(table) except Exception as e: diff --git a/src/fs_data_sink/types.py b/src/fs_data_sink/types.py index a226f2b..270ca7d 100644 --- a/src/fs_data_sink/types.py +++ b/src/fs_data_sink/types.py @@ -57,11 +57,11 @@ def flush(self) -> None: def merge_files(self, period: Optional[str] = None) -> int: """ Merge small Parquet files into larger consolidated files. 
- + Args: period: Time period for grouping files ('hour', 'day', 'week', 'month') If None, uses the sink's configured merge_period - + Returns: Number of files merged """ From 1162a77bec1bc294c32cf0c0a4caa08450d1af80 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Thu, 13 Nov 2025 06:48:47 +0000 Subject: [PATCH 4/5] Address code review comments - Fix Python 3.9 compatibility: use os.walk() instead of Path.walk() - Add comprehensive documentation for dictionary encoding decision in merge methods - Improve docstrings to explain merge behavior and schema compatibility handling Co-authored-by: slhmy <31381093+slhmy@users.noreply.github.com> --- src/fs_data_sink/sinks/hdfs_sink.py | 5 +++++ src/fs_data_sink/sinks/local_sink.py | 12 ++++++++++-- src/fs_data_sink/sinks/s3_sink.py | 5 +++++ 3 files changed, 20 insertions(+), 2 deletions(-) diff --git a/src/fs_data_sink/sinks/hdfs_sink.py b/src/fs_data_sink/sinks/hdfs_sink.py index 703cdf9..4fe0b10 100644 --- a/src/fs_data_sink/sinks/hdfs_sink.py +++ b/src/fs_data_sink/sinks/hdfs_sink.py @@ -198,6 +198,11 @@ def merge_files(self, period: Optional[str] = None) -> int: """ Merge small Parquet files into larger consolidated files by time period. + This method reads existing Parquet files from HDFS, groups them by time period based on + their filenames, and merges files in each group into a single consolidated file. + Dictionary encoding is explicitly disabled for merged files to avoid schema compatibility + issues when reading files that may have been written with different dictionary encodings. + Args: period: Time period for grouping files ('hour', 'day', 'week', 'month') If None, uses the sink's configured merge_period diff --git a/src/fs_data_sink/sinks/local_sink.py b/src/fs_data_sink/sinks/local_sink.py index 794866c..ca0abdb 100644 --- a/src/fs_data_sink/sinks/local_sink.py +++ b/src/fs_data_sink/sinks/local_sink.py @@ -1,6 +1,7 @@ """Local filesystem data sink implementation.""" import logging +import os from datetime import datetime from pathlib import Path from typing import Optional @@ -161,6 +162,11 @@ def merge_files(self, period: Optional[str] = None) -> int: """ Merge small Parquet files into larger consolidated files by time period. + This method reads existing Parquet files, groups them by time period based on their + filenames, and merges files in each group into a single consolidated file. Dictionary + encoding is explicitly disabled for merged files to avoid schema compatibility issues + when reading files that may have been written with different dictionary encodings. 
+ Args: period: Time period for grouping files ('hour', 'day', 'week', 'month') If None, uses the sink's configured merge_period @@ -185,9 +191,11 @@ def merge_files(self, period: Optional[str] = None) -> int: # If partitioned, include partition directories if self.partition_by: - for root, dirs, _ in self.base_path.walk(): + # Use os.walk for Python 3.9+ compatibility + for root, dirs, _ in os.walk(self.base_path): + root_path = Path(root) for d in dirs: - dirs_to_process.append(root / d) + dirs_to_process.append(root_path / d) for directory in dirs_to_process: if not directory.exists(): diff --git a/src/fs_data_sink/sinks/s3_sink.py b/src/fs_data_sink/sinks/s3_sink.py index 6bc00bb..5d1634b 100644 --- a/src/fs_data_sink/sinks/s3_sink.py +++ b/src/fs_data_sink/sinks/s3_sink.py @@ -239,6 +239,11 @@ def merge_files(self, period: Optional[str] = None) -> int: """ Merge small Parquet files into larger consolidated files by time period. + This method reads existing Parquet files from S3, groups them by time period based on + their filenames, and merges files in each group into a single consolidated file. + Dictionary encoding is explicitly disabled for merged files to avoid schema compatibility + issues when reading files that may have been written with different dictionary encodings. + Args: period: Time period for grouping files ('hour', 'day', 'week', 'month') If None, uses the sink's configured merge_period From baef95762fea0c92394ada06315489c2dc886572 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Thu, 13 Nov 2025 07:16:10 +0000 Subject: [PATCH 5/5] Fix formatting issues with black Run black formatter on sink files to fix formatting issues: - src/fs_data_sink/sinks/local_sink.py - src/fs_data_sink/sinks/hdfs_sink.py - src/fs_data_sink/sinks/s3_sink.py Co-authored-by: slhmy <31381093+slhmy@users.noreply.github.com> --- src/fs_data_sink/sinks/hdfs_sink.py | 33 ++++++++++++++++------------ src/fs_data_sink/sinks/local_sink.py | 29 +++++++++++++----------- src/fs_data_sink/sinks/s3_sink.py | 30 +++++++++++++++++-------- 3 files changed, 56 insertions(+), 36 deletions(-) diff --git a/src/fs_data_sink/sinks/hdfs_sink.py b/src/fs_data_sink/sinks/hdfs_sink.py index 4fe0b10..3c34410 100644 --- a/src/fs_data_sink/sinks/hdfs_sink.py +++ b/src/fs_data_sink/sinks/hdfs_sink.py @@ -266,9 +266,7 @@ def merge_files(self, period: Optional[str] = None) -> int: logger.error("Error during HDFS file merge: %s", e, exc_info=True) raise - def _group_hdfs_files_by_period( - self, directory: str, period: str - ) -> dict[str, list[str]]: + def _group_hdfs_files_by_period(self, directory: str, period: str) -> dict[str, list[str]]: """Group HDFS Parquet files by time period based on filename timestamp.""" from collections import defaultdict @@ -316,6 +314,7 @@ def _extract_period_from_filename(self, filename: str, period: str) -> Optional[ return f"{year}{month}{day}" elif period == "week": from datetime import datetime + dt = datetime(int(year), int(month), int(day)) week = dt.isocalendar()[1] return f"{year}W{week:02d}" @@ -328,12 +327,12 @@ def _extract_period_from_filename(self, filename: str, period: str) -> Optional[ logger.warning("Could not parse timestamp from filename %s: %s", filename, e) return None - def _merge_hdfs_file_group( - self, directory: str, period_key: str, files: list[str] - ) -> int: + def _merge_hdfs_file_group(self, directory: str, period_key: str, files: list[str]) -> int: """Merge a group of HDFS files into a single consolidated 
file.""" try: - logger.info("Merging %d HDFS files for period %s in %s", len(files), period_key, directory) + logger.info( + "Merging %d HDFS files for period %s in %s", len(files), period_key, directory + ) # Read all files (avoid dictionary encoding for easier merging) tables = [] @@ -356,13 +355,19 @@ def _merge_hdfs_file_group( # Rebuild table without dictionary encoding if columns: - schema = pa.schema([ - pa.field( - field.name, - field.type.value_type if pa.types.is_dictionary(field.type) else field.type - ) - for field in table.schema - ]) + schema = pa.schema( + [ + pa.field( + field.name, + ( + field.type.value_type + if pa.types.is_dictionary(field.type) + else field.type + ), + ) + for field in table.schema + ] + ) table = pa.Table.from_arrays(columns, schema=schema) tables.append(table) diff --git a/src/fs_data_sink/sinks/local_sink.py b/src/fs_data_sink/sinks/local_sink.py index ca0abdb..2991e46 100644 --- a/src/fs_data_sink/sinks/local_sink.py +++ b/src/fs_data_sink/sinks/local_sink.py @@ -226,9 +226,7 @@ def merge_files(self, period: Optional[str] = None) -> int: logger.error("Error during file merge: %s", e, exc_info=True) raise - def _group_files_by_period( - self, directory: Path, period: str - ) -> dict[str, list[Path]]: + def _group_files_by_period(self, directory: Path, period: str) -> dict[str, list[Path]]: """Group Parquet files by time period based on filename timestamp.""" from collections import defaultdict @@ -262,6 +260,7 @@ def _group_files_by_period( elif period == "week": # Calculate week number from datetime import datetime + dt = datetime(int(year), int(month), int(day)) week = dt.isocalendar()[1] group_key = f"{year}W{week:02d}" @@ -279,9 +278,7 @@ def _group_files_by_period( return groups - def _merge_file_group( - self, directory: Path, period_key: str, files: list[Path] - ) -> int: + def _merge_file_group(self, directory: Path, period_key: str, files: list[Path]) -> int: """Merge a group of files into a single consolidated file.""" try: logger.info("Merging %d files for period %s", len(files), period_key) @@ -305,13 +302,19 @@ def _merge_file_group( # Rebuild table without dictionary encoding if columns: - schema = pa.schema([ - pa.field( - field.name, - field.type.value_type if pa.types.is_dictionary(field.type) else field.type - ) - for field in table.schema - ]) + schema = pa.schema( + [ + pa.field( + field.name, + ( + field.type.value_type + if pa.types.is_dictionary(field.type) + else field.type + ), + ) + for field in table.schema + ] + ) table = pa.Table.from_arrays(columns, schema=schema) tables.append(table) diff --git a/src/fs_data_sink/sinks/s3_sink.py b/src/fs_data_sink/sinks/s3_sink.py index 5d1634b..b34dee1 100644 --- a/src/fs_data_sink/sinks/s3_sink.py +++ b/src/fs_data_sink/sinks/s3_sink.py @@ -273,12 +273,15 @@ def merge_files(self, period: Optional[str] = None) -> int: # Group objects by directory and time period from collections import defaultdict + dir_groups = defaultdict(lambda: defaultdict(list)) for obj in objects: if not obj.object_name.endswith(".parquet"): continue - if obj.object_name.startswith(f"{self.prefix}/merged_" if self.prefix else "merged_"): + if obj.object_name.startswith( + f"{self.prefix}/merged_" if self.prefix else "merged_" + ): # Skip already merged files continue @@ -341,6 +344,7 @@ def _extract_period_from_filename(self, filename: str, period: str) -> Optional[ return f"{year}{month}{day}" elif period == "week": from datetime import datetime + dt = datetime(int(year), int(month), int(day)) week = 
dt.isocalendar()[1] return f"{year}W{week:02d}" @@ -356,7 +360,9 @@ def _extract_period_from_filename(self, filename: str, period: str) -> Optional[ def _merge_s3_file_group(self, obj_dir: str, period_key: str, objects_list: list) -> int: """Merge a group of S3 objects into a single consolidated file.""" try: - logger.info("Merging %d S3 objects for period %s in %s", len(objects_list), period_key, obj_dir) + logger.info( + "Merging %d S3 objects for period %s in %s", len(objects_list), period_key, obj_dir + ) # Download and read all objects (avoid dictionary encoding for easier merging) tables = [] @@ -386,13 +392,19 @@ def _merge_s3_file_group(self, obj_dir: str, period_key: str, objects_list: list # Rebuild table without dictionary encoding if columns: - schema = pa.schema([ - pa.field( - field.name, - field.type.value_type if pa.types.is_dictionary(field.type) else field.type - ) - for field in table.schema - ]) + schema = pa.schema( + [ + pa.field( + field.name, + ( + field.type.value_type + if pa.types.is_dictionary(field.type) + else field.type + ), + ) + for field in table.schema + ] + ) table = pa.Table.from_arrays(columns, schema=schema) tables.append(table)