Skip to content

Conversation

Copy link
Contributor

Copilot AI commented Nov 13, 2025

Periodic Parquet File Merging Implementation

Implemented feature to periodically merge small Parquet files into larger consolidated files by configured time periods (hourly, daily, weekly, monthly).

Changes Made:

  • Add configuration options for merge settings

    • merge_enabled: Enable/disable file merging
    • merge_period: Time-based merge interval (hour, day, week, month)
    • merge_min_files: Minimum number of files to trigger a merge
    • merge_on_flush: Merge files during flush operations
  • Create merge functionality

    • Add merge_files() method to DataSink interface
    • Implement merge in LocalSink
    • Implement merge in S3Sink
    • Implement merge in HDFSSink
    • Handle dictionary encoding compatibility issues
    • Group files by time period
    • Delete original files after successful merge
  • Integrate into pipeline

    • Pass merge configuration from SinkConfig to sinks
    • Support merge on flush option
    • Environment variable support
  • Add comprehensive tests (11 new tests)

    • Test merge disabled by default
    • Test merge by hour/day
    • Test minimum file threshold
    • Test merge on flush
    • Test merge with partitioning
    • Test S3 merge
    • Test config loading
  • Update documentation

    • Add merge configuration section to README
    • Document merge periods and triggers
    • Add example configuration file
    • Document environment variables
  • Code quality

    • All tests passing (45 tests, 0 failures)
    • Black formatting applied
    • Linter clean (ruff)
    • Code coverage: 56% overall, 81% for LocalSink
    • Address code review feedback
    • Python 3.9+ compatible

Key Features:

Flexible Time Periods: Merge by hour, day, week, or month
Minimum File Threshold: Only merge when enough files exist
Partitioning Support: Works with partitioned data
Multi-Storage: Works with S3, HDFS, and local filesystems
Schema Compatibility: Handles dictionary encoding in Parquet files
Automatic Cleanup: Deletes original files after merge
Python 3.9+ Compatible: Uses os.walk() for broad compatibility

Benefits:

  • Reduces small file problem in analytics systems
  • Improves query performance in ClickHouse, Presto, Spark
  • Reduces metadata overhead
  • Better resource utilization in distributed systems
Original prompt

This section details on the original issue you should resolve

<issue_title>Periodically merge the data in the folder</issue_title>
<issue_description>Parquet files should be merged by a configured period (like by hour or by day etc.)</issue_description>

Comments on the Issue (you are @copilot in this section)


✨ Let Copilot coding agent set things up for you — coding agent works faster and does higher quality work when set up for your repo.

Copilot AI and others added 3 commits November 13, 2025 06:42
- Add merge configuration to SinkConfig (merge_enabled, merge_period, merge_min_files, merge_on_flush)
- Add merge_files() method to DataSink interface
- Implement merge_files() in LocalSink, S3Sink, and HDFSSink
- Handle dictionary encoding issues when merging Parquet files
- Add comprehensive tests for merge functionality
- Update documentation with merge configuration examples
- Add environment variable support for merge settings

Co-authored-by: slhmy <31381093+slhmy@users.noreply.github.com>
Co-authored-by: slhmy <31381093+slhmy@users.noreply.github.com>
- Fix Python 3.9 compatibility: use os.walk() instead of Path.walk()
- Add comprehensive documentation for dictionary encoding decision in merge methods
- Improve docstrings to explain merge behavior and schema compatibility handling

Co-authored-by: slhmy <31381093+slhmy@users.noreply.github.com>
Copilot AI changed the title [WIP] Merge Parquet files periodically based on configured period Add periodic Parquet file merging by time period Nov 13, 2025
Copilot AI requested a review from slhmy November 13, 2025 06:52
@slhmy
Copy link
Contributor

slhmy commented Nov 13, 2025

Fix format and lint issues @copilot

Run black --check src/ tests/
would reformat /home/runner/work/fs-data-sink/fs-data-sink/src/fs_data_sink/sinks/hdfs_sink.py
would reformat /home/runner/work/fs-data-sink/fs-data-sink/src/fs_data_sink/sinks/local_sink.py
would reformat /home/runner/work/fs-data-sink/fs-data-sink/src/fs_data_sink/sinks/s3_sink.py

Oh no! 💥 💔 💥
3 files would be reformatted, 22 files would be left unchanged.

@slhmy slhmy marked this pull request as ready for review November 13, 2025 07:12
Run black formatter on sink files to fix formatting issues:
- src/fs_data_sink/sinks/local_sink.py
- src/fs_data_sink/sinks/hdfs_sink.py
- src/fs_data_sink/sinks/s3_sink.py

Co-authored-by: slhmy <31381093+slhmy@users.noreply.github.com>
Copy link
Contributor Author

Copilot AI commented Nov 13, 2025

Fix format and lint issues @copilot

...

Fixed in baef957. Ran black formatter on all three sink files to resolve the formatting issues.

@slhmy slhmy merged commit 71c9a7c into main Nov 13, 2025
6 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Periodically merge the data in the folder

2 participants