From 294255ab620fa0e2dcd16fc9c4ff27d25feecd6a Mon Sep 17 00:00:00 2001 From: laughingman7743 Date: Sat, 3 Jan 2026 18:23:26 +0900 Subject: [PATCH 1/9] Add chunk processing support for PolarsCursor MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Implement memory-efficient chunked iteration for PolarsCursor and AsyncPolarsCursor using Polars' native lazy evaluation APIs. Features: - Add chunksize parameter to PolarsCursor and AsyncPolarsCursor - Add iter_chunks() method for memory-efficient chunk iteration - Use pl.scan_csv() and pl.scan_parquet() with collect_batches() for lazy evaluation - Support both CSV and Parquet (UNLOAD) result formats This follows the same pattern as PandasCursor's chunksize option, allowing users to process large datasets without loading the entire result set into memory. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 --- docs/polars.rst | 84 ++++++++++++++ pyathena/polars/async_cursor.py | 7 ++ pyathena/polars/cursor.py | 44 ++++++- pyathena/polars/result_set.py | 129 +++++++++++++++++++++ tests/pyathena/polars/test_async_cursor.py | 56 +++++++++ tests/pyathena/polars/test_cursor.py | 48 ++++++++ 6 files changed, 367 insertions(+), 1 deletion(-) diff --git a/docs/polars.rst b/docs/polars.rst index 55c7caf6..1b8273b4 100644 --- a/docs/polars.rst +++ b/docs/polars.rst @@ -246,6 +246,73 @@ SQLAlchemy allows this option to be specified in the connection string. NOTE: PolarsCursor handles the CSV file on memory. Pay attention to the memory capacity. +Chunksize Options +~~~~~~~~~~~~~~~~~ + +PolarsCursor supports memory-efficient chunked processing of large query results +using Polars' native lazy evaluation APIs. This allows processing datasets that +are too large to fit in memory. + +The chunksize option can be enabled by specifying an integer value in the ``cursor_kwargs`` +argument of the connect method or as an argument to the cursor method. + +.. code:: python + + from pyathena import connect + from pyathena.polars.cursor import PolarsCursor + + cursor = connect(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/", + region_name="us-west-2", + cursor_class=PolarsCursor, + cursor_kwargs={ + "chunksize": 50_000 + }).cursor() + +.. code:: python + + from pyathena import connect + from pyathena.polars.cursor import PolarsCursor + + cursor = connect(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/", + region_name="us-west-2", + cursor_class=PolarsCursor).cursor(chunksize=50_000) + +Use the ``iter_chunks()`` method to iterate over results in chunks: + +.. code:: python + + from pyathena import connect + from pyathena.polars.cursor import PolarsCursor + + cursor = connect(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/", + region_name="us-west-2", + cursor_class=PolarsCursor).cursor(chunksize=50_000) + + cursor.execute("SELECT * FROM large_table") + for chunk in cursor.iter_chunks(): + # Process each chunk - chunk is a polars.DataFrame + processed = chunk.group_by('category').agg(pl.sum('value')) + print(f"Processed chunk with {chunk.height} rows") + +This method uses Polars' ``scan_csv()`` and ``scan_parquet()`` with ``collect_batches()`` +for efficient lazy evaluation, minimizing memory usage when processing large datasets. + +The chunked iteration also works with the unload option: + +.. code:: python + + from pyathena import connect + from pyathena.polars.cursor import PolarsCursor + + cursor = connect(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/", + region_name="us-west-2", + cursor_class=PolarsCursor).cursor(chunksize=100_000, unload=True) + + cursor.execute("SELECT * FROM huge_table") + for chunk in cursor.iter_chunks(): + # Process Parquet data in chunks + process_chunk(chunk) + .. _async-polars-cursor: AsyncPolarsCursor @@ -414,6 +481,23 @@ As with AsyncPolarsCursor, the unload option is also available. region_name="us-west-2", cursor_class=AsyncPolarsCursor).cursor(unload=True) +As with PolarsCursor, the chunksize option is also available for memory-efficient processing. + +.. code:: python + + from pyathena import connect + from pyathena.polars.async_cursor import AsyncPolarsCursor + + cursor = connect(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/", + region_name="us-west-2", + cursor_class=AsyncPolarsCursor).cursor(chunksize=50_000) + + query_id, future = cursor.execute("SELECT * FROM large_table") + result_set = future.result() + for chunk in result_set.iter_chunks(): + # Process each chunk + process_chunk(chunk) + .. _`polars.DataFrame object`: https://docs.pola.rs/api/python/stable/reference/dataframe/index.html .. _`Polars`: https://pola.rs/ .. _`Unload options`: arrow.html#unload-options diff --git a/pyathena/polars/async_cursor.py b/pyathena/polars/async_cursor.py index 2fc3ee6e..5bbc458a 100644 --- a/pyathena/polars/async_cursor.py +++ b/pyathena/polars/async_cursor.py @@ -73,6 +73,7 @@ def __init__( result_reuse_minutes: int = CursorIterator.DEFAULT_RESULT_REUSE_MINUTES, block_size: Optional[int] = None, cache_type: Optional[str] = None, + chunksize: Optional[int] = None, **kwargs, ) -> None: """Initialize an AsyncPolarsCursor. @@ -93,10 +94,14 @@ def __init__( result_reuse_minutes: Minutes to reuse cached results. block_size: S3 read block size. cache_type: S3 caching strategy. + chunksize: Number of rows per chunk for memory-efficient processing. + If specified, enables chunked iteration via iter_chunks(). **kwargs: Additional connection parameters. Example: >>> cursor = connection.cursor(AsyncPolarsCursor, unload=True) + >>> # With chunked processing + >>> cursor = connection.cursor(AsyncPolarsCursor, chunksize=50000) """ super().__init__( s3_staging_dir=s3_staging_dir, @@ -116,6 +121,7 @@ def __init__( self._unload = unload self._block_size = block_size self._cache_type = cache_type + self._chunksize = chunksize @staticmethod def get_default_converter( @@ -172,6 +178,7 @@ def _collect_result_set( block_size=self._block_size, cache_type=self._cache_type, max_workers=self._max_workers, + chunksize=self._chunksize, **kwargs, ) diff --git a/pyathena/polars/cursor.py b/pyathena/polars/cursor.py index e8877e2e..ff793c67 100644 --- a/pyathena/polars/cursor.py +++ b/pyathena/polars/cursor.py @@ -3,7 +3,18 @@ import logging from multiprocessing import cpu_count -from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional, Tuple, Union, cast +from typing import ( + TYPE_CHECKING, + Any, + Callable, + Dict, + Iterator, + List, + Optional, + Tuple, + Union, + cast, +) from pyathena.common import BaseCursor, CursorIterator from pyathena.error import OperationalError, ProgrammingError @@ -74,6 +85,7 @@ def __init__( block_size: Optional[int] = None, cache_type: Optional[str] = None, max_workers: int = (cpu_count() or 1) * 5, + chunksize: Optional[int] = None, **kwargs, ) -> None: """Initialize a PolarsCursor. @@ -94,10 +106,14 @@ def __init__( block_size: S3 read block size. cache_type: S3 caching strategy. max_workers: Maximum worker threads for parallel S3 operations. + chunksize: Number of rows per chunk for memory-efficient processing. + If specified, enables chunked iteration via iter_chunks(). **kwargs: Additional connection parameters. Example: >>> cursor = connection.cursor(PolarsCursor, unload=True) + >>> # With chunked processing + >>> cursor = connection.cursor(PolarsCursor, chunksize=50000) """ super().__init__( s3_staging_dir=s3_staging_dir, @@ -117,6 +133,7 @@ def __init__( self._block_size = block_size self._cache_type = cache_type self._max_workers = max_workers + self._chunksize = chunksize self._query_id: Optional[str] = None self._result_set: Optional[AthenaPolarsResultSet] = None @@ -272,6 +289,7 @@ def execute( block_size=self._block_size, cache_type=self._cache_type, max_workers=self._max_workers, + chunksize=self._chunksize, **kwargs, ) else: @@ -404,3 +422,27 @@ def as_arrow(self) -> "Table": raise ProgrammingError("No result set.") result_set = cast(AthenaPolarsResultSet, self.result_set) return result_set.as_arrow() + + def iter_chunks(self) -> Iterator["pl.DataFrame"]: + """Iterate over result chunks as Polars DataFrames. + + This method provides an iterator interface for processing large result sets + in chunks, preventing memory exhaustion when working with datasets that are + too large to fit in memory as a single DataFrame. + + Yields: + Polars DataFrame for each chunk of rows. + + Raises: + ProgrammingError: If no result set is available or chunksize was not set. + + Example: + >>> cursor = connection.cursor(PolarsCursor, chunksize=50000) + >>> cursor.execute("SELECT * FROM large_table") + >>> for chunk in cursor.iter_chunks(): + ... process_chunk(chunk) # Each chunk is a Polars DataFrame + """ + if not self.has_result_set: + raise ProgrammingError("No result set.") + result_set = cast(AthenaPolarsResultSet, self.result_set) + yield from result_set.iter_chunks() diff --git a/pyathena/polars/result_set.py b/pyathena/polars/result_set.py index 6c4d18c2..55e5b3c0 100644 --- a/pyathena/polars/result_set.py +++ b/pyathena/polars/result_set.py @@ -8,6 +8,7 @@ Any, Callable, Dict, + Iterator, List, Optional, Tuple, @@ -79,6 +80,7 @@ def __init__( block_size: Optional[int] = None, cache_type: Optional[str] = None, max_workers: int = (cpu_count() or 1) * 5, + chunksize: Optional[int] = None, **kwargs, ) -> None: """Initialize the Polars result set. @@ -94,6 +96,8 @@ def __init__( block_size: Block size for S3 file reading. cache_type: Cache type for S3 file system. max_workers: Maximum number of worker threads. + chunksize: Number of rows per chunk for memory-efficient processing. + If specified, enables chunked iteration via iter_chunks(). **kwargs: Additional arguments passed to Polars read functions. """ super().__init__( @@ -110,6 +114,7 @@ def __init__( self._block_size = block_size self._cache_type = cache_type self._max_workers = max_workers + self._chunksize = chunksize self._kwargs = kwargs if self.state == AthenaQueryExecution.STATE_SUCCEEDED and self.output_location: self._df = self._as_polars() @@ -414,6 +419,130 @@ def as_arrow(self) -> "Table": "pyarrow is required for as_arrow(). Install it with: pip install pyarrow" ) from e + def _get_csv_params(self) -> Tuple[str, bool, Optional[List[str]]]: + """Get CSV parsing parameters based on file type. + + Returns: + Tuple of (separator, has_header, new_columns). + """ + if self.output_location and self.output_location.endswith(".txt"): + separator = "\t" + has_header = False + description = self.description if self.description else [] + new_columns: Optional[List[str]] = [d[0] for d in description] + else: + separator = "," + has_header = True + new_columns = None + return separator, has_header, new_columns + + def _iter_csv_chunks(self) -> Iterator["pl.DataFrame"]: + """Iterate over CSV data in chunks using lazy evaluation. + + Yields: + Polars DataFrame for each chunk. + + Raises: + ProgrammingError: If output location is not set. + OperationalError: If reading the CSV file fails. + """ + import polars as pl + + if not self.output_location: + raise ProgrammingError("OutputLocation is none or empty.") + if not self.output_location.endswith((".csv", ".txt")): + return + if self.substatement_type and self.substatement_type.upper() in ( + "UPDATE", + "DELETE", + "MERGE", + "VACUUM_TABLE", + ): + return + length = self._get_content_length() + if length == 0: + return + + separator, has_header, new_columns = self._get_csv_params() + + try: + # scan_csv uses Rust's native object_store (like scan_parquet), + # not fsspec, so we use the same storage options as Parquet + lazy_df = pl.scan_csv( + self.output_location, + separator=separator, + has_header=has_header, + schema_overrides=self.dtypes, + storage_options=self._parquet_storage_options, + **self._kwargs, + ) + for batch in lazy_df.collect_batches(chunk_size=self._chunksize): + if new_columns: + batch.columns = new_columns + yield batch + except Exception as e: + _logger.exception(f"Failed to read {self.output_location}.") + raise OperationalError(*e.args) from e + + def _iter_parquet_chunks(self) -> Iterator["pl.DataFrame"]: + """Iterate over Parquet data in chunks using lazy evaluation. + + Yields: + Polars DataFrame for each chunk. + + Raises: + OperationalError: If reading the Parquet files fails. + """ + import polars as pl + + manifests = self._read_data_manifest() + if not manifests: + return + if not self._unload_location: + self._unload_location = "/".join(manifests[0].split("/")[:-1]) + "/" + + try: + lazy_df = pl.scan_parquet( + self._unload_location, + storage_options=self._parquet_storage_options, + **self._kwargs, + ) + for batch in lazy_df.collect_batches(chunk_size=self._chunksize): + yield batch + except Exception as e: + _logger.exception(f"Failed to read {self._unload_location}.") + raise OperationalError(*e.args) from e + + def iter_chunks(self) -> Iterator["pl.DataFrame"]: + """Iterate over result chunks as Polars DataFrames. + + This method provides an iterator interface for processing large result sets + in chunks, preventing memory exhaustion when working with datasets that are + too large to fit in memory as a single DataFrame. + + Yields: + Polars DataFrame for each chunk of rows. + + Raises: + ProgrammingError: If chunksize was not set during cursor initialization. + + Example: + >>> cursor = connection.cursor(PolarsCursor, chunksize=50000) + >>> cursor.execute("SELECT * FROM large_table") + >>> for chunk in cursor.iter_chunks(): + ... process_chunk(chunk) # Each chunk is a Polars DataFrame + """ + if self._chunksize is None: + raise ProgrammingError( + "chunksize must be set to use iter_chunks(). " + "Example: cursor = connection.cursor(PolarsCursor, chunksize=50000)" + ) + + if self.is_unload: + yield from self._iter_parquet_chunks() + else: + yield from self._iter_csv_chunks() + def close(self) -> None: """Close the result set and release resources.""" import polars as pl diff --git a/tests/pyathena/polars/test_async_cursor.py b/tests/pyathena/polars/test_async_cursor.py index fd9d61ba..2d63641f 100644 --- a/tests/pyathena/polars/test_async_cursor.py +++ b/tests/pyathena/polars/test_async_cursor.py @@ -324,3 +324,59 @@ def test_empty_result_unload(self, async_polars_cursor): df = future.result().as_polars() assert df.height == 0 assert df.width == 0 + + def test_iter_chunks(self): + """Test chunked iteration over query results.""" + with contextlib.closing(connect(schema_name=ENV.schema)) as conn: + cursor = conn.cursor(AsyncPolarsCursor, chunksize=5) + query_id, future = cursor.execute("SELECT * FROM many_rows LIMIT 15") + assert query_id is not None + result_set = future.result() + chunks = list(result_set.iter_chunks()) + assert len(chunks) > 0 + total_rows = sum(chunk.height for chunk in chunks) + assert total_rows == 15 + for chunk in chunks: + assert isinstance(chunk, pl.DataFrame) + + def test_iter_chunks_without_chunksize(self, async_polars_cursor): + """Test that iter_chunks raises ProgrammingError when chunksize is not set.""" + query_id, future = async_polars_cursor.execute("SELECT * FROM one_row") + assert query_id is not None + result_set = future.result() + with pytest.raises(ProgrammingError) as exc_info: + list(result_set.iter_chunks()) + assert "chunksize must be set" in str(exc_info.value) + + def test_iter_chunks_many_rows(self): + """Test chunked iteration with many rows.""" + with contextlib.closing(connect(schema_name=ENV.schema)) as conn: + cursor = conn.cursor(AsyncPolarsCursor, chunksize=1000) + query_id, future = cursor.execute("SELECT * FROM many_rows") + assert query_id is not None + result_set = future.result() + chunks = list(result_set.iter_chunks()) + total_rows = sum(chunk.height for chunk in chunks) + assert total_rows == 10000 + assert len(chunks) >= 10 # At least 10 chunks with chunksize=1000 + + @pytest.mark.parametrize( + "async_polars_cursor", + [ + { + "cursor_kwargs": {"unload": True, "chunksize": 5}, + }, + ], + indirect=["async_polars_cursor"], + ) + def test_iter_chunks_unload(self, async_polars_cursor): + """Test chunked iteration with UNLOAD (Parquet).""" + query_id, future = async_polars_cursor.execute("SELECT * FROM many_rows LIMIT 15") + assert query_id is not None + result_set = future.result() + chunks = list(result_set.iter_chunks()) + assert len(chunks) > 0 + total_rows = sum(chunk.height for chunk in chunks) + assert total_rows == 15 + for chunk in chunks: + assert isinstance(chunk, pl.DataFrame) diff --git a/tests/pyathena/polars/test_cursor.py b/tests/pyathena/polars/test_cursor.py index cd19c958..b214c415 100644 --- a/tests/pyathena/polars/test_cursor.py +++ b/tests/pyathena/polars/test_cursor.py @@ -447,3 +447,51 @@ def test_callback(query_id: str): assert len(callback_results) == 1 assert callback_results[0] == polars_cursor.query_id assert polars_cursor.query_id is not None + + def test_iter_chunks(self): + """Test chunked iteration over query results.""" + with contextlib.closing(connect(schema_name=ENV.schema)) as conn: + cursor = conn.cursor(PolarsCursor, chunksize=5) + cursor.execute("SELECT * FROM many_rows LIMIT 15") + chunks = list(cursor.iter_chunks()) + assert len(chunks) > 0 + total_rows = sum(chunk.height for chunk in chunks) + assert total_rows == 15 + for chunk in chunks: + assert isinstance(chunk, pl.DataFrame) + + def test_iter_chunks_without_chunksize(self, polars_cursor): + """Test that iter_chunks raises ProgrammingError when chunksize is not set.""" + polars_cursor.execute("SELECT * FROM one_row") + with pytest.raises(ProgrammingError) as exc_info: + list(polars_cursor.iter_chunks()) + assert "chunksize must be set" in str(exc_info.value) + + def test_iter_chunks_many_rows(self): + """Test chunked iteration with many rows.""" + with contextlib.closing(connect(schema_name=ENV.schema)) as conn: + cursor = conn.cursor(PolarsCursor, chunksize=1000) + cursor.execute("SELECT * FROM many_rows") + chunks = list(cursor.iter_chunks()) + total_rows = sum(chunk.height for chunk in chunks) + assert total_rows == 10000 + assert len(chunks) >= 10 # At least 10 chunks with chunksize=1000 + + @pytest.mark.parametrize( + "polars_cursor", + [ + { + "cursor_kwargs": {"unload": True, "chunksize": 5}, + }, + ], + indirect=["polars_cursor"], + ) + def test_iter_chunks_unload(self, polars_cursor): + """Test chunked iteration with UNLOAD (Parquet).""" + polars_cursor.execute("SELECT * FROM many_rows LIMIT 15") + chunks = list(polars_cursor.iter_chunks()) + assert len(chunks) > 0 + total_rows = sum(chunk.height for chunk in chunks) + assert total_rows == 15 + for chunk in chunks: + assert isinstance(chunk, pl.DataFrame) From b8d87d2db34d8802e5cf3ccf497e7e877b465d04 Mon Sep 17 00:00:00 2001 From: laughingman7743 Date: Sat, 3 Jan 2026 18:35:23 +0900 Subject: [PATCH 2/9] Add data consistency and chunk size tests for iter_chunks MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Following the PandasCursor test patterns, add: - test_iter_chunks_data_consistency: Verify chunked and regular reading produce the same data - test_iter_chunks_chunk_sizes: Verify each chunk respects the specified chunksize limit 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 --- tests/pyathena/polars/test_async_cursor.py | 50 ++++++++++++++++++++++ tests/pyathena/polars/test_cursor.py | 45 +++++++++++++++++++ 2 files changed, 95 insertions(+) diff --git a/tests/pyathena/polars/test_async_cursor.py b/tests/pyathena/polars/test_async_cursor.py index 2d63641f..4dfabb84 100644 --- a/tests/pyathena/polars/test_async_cursor.py +++ b/tests/pyathena/polars/test_async_cursor.py @@ -380,3 +380,53 @@ def test_iter_chunks_unload(self, async_polars_cursor): assert total_rows == 15 for chunk in chunks: assert isinstance(chunk, pl.DataFrame) + + def test_iter_chunks_data_consistency(self): + """Test that chunked and regular reading produce the same data.""" + with contextlib.closing(connect(schema_name=ENV.schema)) as conn: + # Regular reading (no chunksize) + regular_cursor = conn.cursor(AsyncPolarsCursor) + query_id, future = regular_cursor.execute("SELECT * FROM many_rows LIMIT 100") + assert query_id is not None + regular_df = future.result().as_polars() + + # Chunked reading + chunked_cursor = conn.cursor(AsyncPolarsCursor, chunksize=25) + query_id, future = chunked_cursor.execute("SELECT * FROM many_rows LIMIT 100") + assert query_id is not None + result_set = future.result() + chunked_dfs = list(result_set.iter_chunks()) + + # Combine chunks + combined_df = pl.concat(chunked_dfs) + + # Should have the same data (sort for comparison) + assert regular_df.sort("a").equals(combined_df.sort("a")) + + # Should have multiple chunks + assert len(chunked_dfs) > 1 + + def test_iter_chunks_chunk_sizes(self): + """Test that chunks have correct sizes.""" + with contextlib.closing(connect(schema_name=ENV.schema)) as conn: + cursor = conn.cursor(AsyncPolarsCursor, chunksize=10) + query_id, future = cursor.execute("SELECT * FROM many_rows LIMIT 50") + assert query_id is not None + result_set = future.result() + + chunk_sizes = [] + total_rows = 0 + + for chunk in result_set.iter_chunks(): + chunk_size = chunk.height + chunk_sizes.append(chunk_size) + total_rows += chunk_size + + # Each chunk should not exceed chunksize + assert chunk_size <= 10 + + # Should have processed all 50 rows + assert total_rows == 50 + + # Should have multiple chunks + assert len(chunk_sizes) > 1 diff --git a/tests/pyathena/polars/test_cursor.py b/tests/pyathena/polars/test_cursor.py index b214c415..10d960de 100644 --- a/tests/pyathena/polars/test_cursor.py +++ b/tests/pyathena/polars/test_cursor.py @@ -495,3 +495,48 @@ def test_iter_chunks_unload(self, polars_cursor): assert total_rows == 15 for chunk in chunks: assert isinstance(chunk, pl.DataFrame) + + def test_iter_chunks_data_consistency(self): + """Test that chunked and regular reading produce the same data.""" + with contextlib.closing(connect(schema_name=ENV.schema)) as conn: + # Regular reading (no chunksize) + regular_cursor = conn.cursor(PolarsCursor) + regular_cursor.execute("SELECT * FROM many_rows LIMIT 100") + regular_df = regular_cursor.as_polars() + + # Chunked reading + chunked_cursor = conn.cursor(PolarsCursor, chunksize=25) + chunked_cursor.execute("SELECT * FROM many_rows LIMIT 100") + chunked_dfs = list(chunked_cursor.iter_chunks()) + + # Combine chunks + combined_df = pl.concat(chunked_dfs) + + # Should have the same data (sort for comparison) + assert regular_df.sort("a").equals(combined_df.sort("a")) + + # Should have multiple chunks + assert len(chunked_dfs) > 1 + + def test_iter_chunks_chunk_sizes(self): + """Test that chunks have correct sizes.""" + with contextlib.closing(connect(schema_name=ENV.schema)) as conn: + cursor = conn.cursor(PolarsCursor, chunksize=10) + cursor.execute("SELECT * FROM many_rows LIMIT 50") + + chunk_sizes = [] + total_rows = 0 + + for chunk in cursor.iter_chunks(): + chunk_size = chunk.height + chunk_sizes.append(chunk_size) + total_rows += chunk_size + + # Each chunk should not exceed chunksize + assert chunk_size <= 10 + + # Should have processed all 50 rows + assert total_rows == 50 + + # Should have multiple chunks + assert len(chunk_sizes) > 1 From 9c5721a38735145ac5dc8a7cd855e714d31680a3 Mon Sep 17 00:00:00 2001 From: laughingman7743 Date: Sat, 3 Jan 2026 18:42:51 +0900 Subject: [PATCH 3/9] Refactor _read_csv to use _get_csv_params helper method MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Consolidate CSV parameter extraction logic by reusing the _get_csv_params() method that was added for iter_chunks. This reduces code duplication between _read_csv and _iter_csv_chunks. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 --- pyathena/polars/result_set.py | 12 +----------- 1 file changed, 1 insertion(+), 11 deletions(-) diff --git a/pyathena/polars/result_set.py b/pyathena/polars/result_set.py index 55e5b3c0..ff9c81a3 100644 --- a/pyathena/polars/result_set.py +++ b/pyathena/polars/result_set.py @@ -284,17 +284,7 @@ def _read_csv(self) -> "pl.DataFrame": if length == 0: return pl.DataFrame() - if self.output_location.endswith(".txt"): - separator = "\t" - has_header = False - description = self.description if self.description else [] - new_columns = [d[0] for d in description] - elif self.output_location.endswith(".csv"): - separator = "," - has_header = True - new_columns = None - else: - return pl.DataFrame() + separator, has_header, new_columns = self._get_csv_params() try: df = pl.read_csv( From 5592015c0cb754daaf2d011d32ebeb628a4fc305 Mon Sep 17 00:00:00 2001 From: laughingman7743 Date: Sun, 4 Jan 2026 13:30:49 +0900 Subject: [PATCH 4/9] Refactor Polars result set with improved chunk processing MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Extract _is_csv_readable() helper for CSV validation - Extract _prepare_parquet_location() helper for Parquet setup - Refactor _read_csv to use _get_csv_params() helper - Skip eager loading in __init__ when chunksize is set (avoid double reads) - Allow iter_chunks() without chunksize (yields entire DataFrame as single chunk) - Update docstrings to reflect new behavior This provides a consistent interface matching PandasCursor behavior where iter_chunks() works with or without chunksize configuration. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 --- pyathena/polars/cursor.py | 20 +++- pyathena/polars/result_set.py | 116 +++++++++++++-------- tests/pyathena/polars/test_async_cursor.py | 10 +- tests/pyathena/polars/test_cursor.py | 10 +- 4 files changed, 99 insertions(+), 57 deletions(-) diff --git a/pyathena/polars/cursor.py b/pyathena/polars/cursor.py index ff793c67..ca970d3f 100644 --- a/pyathena/polars/cursor.py +++ b/pyathena/polars/cursor.py @@ -426,21 +426,31 @@ def as_arrow(self) -> "Table": def iter_chunks(self) -> Iterator["pl.DataFrame"]: """Iterate over result chunks as Polars DataFrames. - This method provides an iterator interface for processing large result sets - in chunks, preventing memory exhaustion when working with datasets that are - too large to fit in memory as a single DataFrame. + This method provides an iterator interface for processing result sets. + When chunksize is specified, it yields DataFrames in chunks using lazy + evaluation for memory-efficient processing. When chunksize is not specified, + it yields the entire result as a single DataFrame, providing a consistent + interface regardless of chunking configuration. Yields: - Polars DataFrame for each chunk of rows. + Polars DataFrame for each chunk of rows, or the entire DataFrame + if chunksize was not specified. Raises: - ProgrammingError: If no result set is available or chunksize was not set. + ProgrammingError: If no result set is available. Example: + >>> # With chunking for large datasets >>> cursor = connection.cursor(PolarsCursor, chunksize=50000) >>> cursor.execute("SELECT * FROM large_table") >>> for chunk in cursor.iter_chunks(): ... process_chunk(chunk) # Each chunk is a Polars DataFrame + >>> + >>> # Without chunking - yields entire result as single chunk + >>> cursor = connection.cursor(PolarsCursor) + >>> cursor.execute("SELECT * FROM small_table") + >>> for df in cursor.iter_chunks(): + ... process(df) # Single DataFrame with all data """ if not self.has_result_set: raise ProgrammingError("No result set.") diff --git a/pyathena/polars/result_set.py b/pyathena/polars/result_set.py index ff9c81a3..484e110e 100644 --- a/pyathena/polars/result_set.py +++ b/pyathena/polars/result_set.py @@ -117,7 +117,14 @@ def __init__( self._chunksize = chunksize self._kwargs = kwargs if self.state == AthenaQueryExecution.STATE_SUCCEEDED and self.output_location: - self._df = self._as_polars() + # Skip eager loading when chunksize is set to avoid double reads + # User should use iter_chunks() for memory-efficient processing + if self._chunksize is None: + self._df = self._as_polars() + else: + import polars as pl + + self._df = pl.DataFrame() else: import polars as pl @@ -257,33 +264,60 @@ def fetchall( break return rows - def _read_csv(self) -> "pl.DataFrame": - """Read query results from CSV file in S3. + def _is_csv_readable(self) -> bool: + """Check if CSV output is available and can be read. Returns: - Polars DataFrame containing the CSV data. + True if CSV data is available to read, False otherwise. Raises: ProgrammingError: If output location is not set. - OperationalError: If reading the CSV file fails. """ - import polars as pl - if not self.output_location: raise ProgrammingError("OutputLocation is none or empty.") if not self.output_location.endswith((".csv", ".txt")): - return pl.DataFrame() + return False if self.substatement_type and self.substatement_type.upper() in ( "UPDATE", "DELETE", "MERGE", "VACUUM_TABLE", ): - return pl.DataFrame() + return False length = self._get_content_length() - if length == 0: + return length != 0 + + def _prepare_parquet_location(self) -> bool: + """Prepare unload location for Parquet reading. + + Returns: + True if Parquet data is available to read, False otherwise. + """ + manifests = self._read_data_manifest() + if not manifests: + return False + if not self._unload_location: + self._unload_location = "/".join(manifests[0].split("/")[:-1]) + "/" + return True + + def _read_csv(self) -> "pl.DataFrame": + """Read query results from CSV file in S3. + + Returns: + Polars DataFrame containing the CSV data. + + Raises: + ProgrammingError: If output location is not set. + OperationalError: If reading the CSV file fails. + """ + import polars as pl + + if not self._is_csv_readable(): return pl.DataFrame() + # After validation, output_location is guaranteed to be set + assert self.output_location is not None + separator, has_header, new_columns = self._get_csv_params() try: @@ -313,11 +347,11 @@ def _read_parquet(self) -> "pl.DataFrame": """ import polars as pl - manifests = self._read_data_manifest() - if not manifests: + if not self._prepare_parquet_location(): return pl.DataFrame() - if not self._unload_location: - self._unload_location = "/".join(manifests[0].split("/")[:-1]) + "/" + + # After preparation, unload_location is guaranteed to be set + assert self._unload_location is not None try: return pl.read_parquet( @@ -438,21 +472,12 @@ def _iter_csv_chunks(self) -> Iterator["pl.DataFrame"]: """ import polars as pl - if not self.output_location: - raise ProgrammingError("OutputLocation is none or empty.") - if not self.output_location.endswith((".csv", ".txt")): - return - if self.substatement_type and self.substatement_type.upper() in ( - "UPDATE", - "DELETE", - "MERGE", - "VACUUM_TABLE", - ): - return - length = self._get_content_length() - if length == 0: + if not self._is_csv_readable(): return + # After validation, output_location is guaranteed to be set + assert self.output_location is not None + separator, has_header, new_columns = self._get_csv_params() try: @@ -485,11 +510,11 @@ def _iter_parquet_chunks(self) -> Iterator["pl.DataFrame"]: """ import polars as pl - manifests = self._read_data_manifest() - if not manifests: + if not self._prepare_parquet_location(): return - if not self._unload_location: - self._unload_location = "/".join(manifests[0].split("/")[:-1]) + "/" + + # After preparation, unload_location is guaranteed to be set + assert self._unload_location is not None try: lazy_df = pl.scan_parquet( @@ -506,29 +531,32 @@ def _iter_parquet_chunks(self) -> Iterator["pl.DataFrame"]: def iter_chunks(self) -> Iterator["pl.DataFrame"]: """Iterate over result chunks as Polars DataFrames. - This method provides an iterator interface for processing large result sets - in chunks, preventing memory exhaustion when working with datasets that are - too large to fit in memory as a single DataFrame. + This method provides an iterator interface for processing large result sets. + When chunksize is specified, it yields DataFrames in chunks using lazy + evaluation for memory-efficient processing. When chunksize is not specified, + it yields the entire result as a single DataFrame. Yields: - Polars DataFrame for each chunk of rows. - - Raises: - ProgrammingError: If chunksize was not set during cursor initialization. + Polars DataFrame for each chunk of rows, or the entire DataFrame + if chunksize was not specified. Example: + >>> # With chunking for large datasets >>> cursor = connection.cursor(PolarsCursor, chunksize=50000) >>> cursor.execute("SELECT * FROM large_table") >>> for chunk in cursor.iter_chunks(): ... process_chunk(chunk) # Each chunk is a Polars DataFrame + >>> + >>> # Without chunking - yields entire result as single chunk + >>> cursor = connection.cursor(PolarsCursor) + >>> cursor.execute("SELECT * FROM small_table") + >>> for df in cursor.iter_chunks(): + ... process(df) # Single DataFrame with all data """ if self._chunksize is None: - raise ProgrammingError( - "chunksize must be set to use iter_chunks(). " - "Example: cursor = connection.cursor(PolarsCursor, chunksize=50000)" - ) - - if self.is_unload: + # No chunking - yield entire DataFrame as single chunk + yield self._df + elif self.is_unload: yield from self._iter_parquet_chunks() else: yield from self._iter_csv_chunks() diff --git a/tests/pyathena/polars/test_async_cursor.py b/tests/pyathena/polars/test_async_cursor.py index 4dfabb84..69ca86bd 100644 --- a/tests/pyathena/polars/test_async_cursor.py +++ b/tests/pyathena/polars/test_async_cursor.py @@ -340,13 +340,15 @@ def test_iter_chunks(self): assert isinstance(chunk, pl.DataFrame) def test_iter_chunks_without_chunksize(self, async_polars_cursor): - """Test that iter_chunks raises ProgrammingError when chunksize is not set.""" + """Test that iter_chunks works without chunksize, yielding entire DataFrame.""" query_id, future = async_polars_cursor.execute("SELECT * FROM one_row") assert query_id is not None result_set = future.result() - with pytest.raises(ProgrammingError) as exc_info: - list(result_set.iter_chunks()) - assert "chunksize must be set" in str(exc_info.value) + chunks = list(result_set.iter_chunks()) + # Without chunksize, yields entire DataFrame as single chunk + assert len(chunks) == 1 + assert isinstance(chunks[0], pl.DataFrame) + assert chunks[0].height == 1 def test_iter_chunks_many_rows(self): """Test chunked iteration with many rows.""" diff --git a/tests/pyathena/polars/test_cursor.py b/tests/pyathena/polars/test_cursor.py index 10d960de..acae2ffb 100644 --- a/tests/pyathena/polars/test_cursor.py +++ b/tests/pyathena/polars/test_cursor.py @@ -461,11 +461,13 @@ def test_iter_chunks(self): assert isinstance(chunk, pl.DataFrame) def test_iter_chunks_without_chunksize(self, polars_cursor): - """Test that iter_chunks raises ProgrammingError when chunksize is not set.""" + """Test that iter_chunks works without chunksize, yielding entire DataFrame.""" polars_cursor.execute("SELECT * FROM one_row") - with pytest.raises(ProgrammingError) as exc_info: - list(polars_cursor.iter_chunks()) - assert "chunksize must be set" in str(exc_info.value) + chunks = list(polars_cursor.iter_chunks()) + # Without chunksize, yields entire DataFrame as single chunk + assert len(chunks) == 1 + assert isinstance(chunks[0], pl.DataFrame) + assert chunks[0].height == 1 def test_iter_chunks_many_rows(self): """Test chunked iteration with many rows.""" From 6f7addc84670015ade4e38c4cbfb28bedd0b0328 Mon Sep 17 00:00:00 2001 From: laughingman7743 Date: Sun, 4 Jan 2026 13:59:52 +0900 Subject: [PATCH 5/9] Refactor Polars result set to use DataFrameIterator wrapper MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Introduce DataFrameIterator class following PandasCursor's pattern to provide a unified interface for both chunked and non-chunked DataFrame iteration. This eliminates the need for flag-based lazy loading and provides a more transparent API. Key changes: - Add DataFrameIterator class with iterrows() and as_polars() methods - Replace _df and _row_index with _df_iter iterator - Update fetchone() to use iterator-based row access - Update as_polars() and as_arrow() to use the wrapper - Update iter_chunks() to return the iterator directly - Remove _ensure_data_loaded() method 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 --- pyathena/polars/result_set.py | 217 +++++++++++++++++++++++++++------- 1 file changed, 171 insertions(+), 46 deletions(-) diff --git a/pyathena/polars/result_set.py b/pyathena/polars/result_set.py index 484e110e..99e04486 100644 --- a/pyathena/polars/result_set.py +++ b/pyathena/polars/result_set.py @@ -1,6 +1,7 @@ # -*- coding: utf-8 -*- from __future__ import annotations +import abc import logging from multiprocessing import cpu_count from typing import ( @@ -32,6 +33,126 @@ _logger = logging.getLogger(__name__) +class DataFrameIterator(abc.Iterator): # type: ignore + """Iterator for chunked DataFrame results from Athena queries. + + This class wraps either a Polars DataFrame iterator (for chunked reading) or + a single DataFrame, providing a unified iterator interface. It applies + optional type conversion to each DataFrame chunk as it's yielded. + + The iterator is used by AthenaPolarsResultSet to provide chunked access + to large query results, enabling memory-efficient processing of datasets + that would be too large to load entirely into memory. + + Example: + >>> # Iterate over DataFrame chunks + >>> for df_chunk in iterator: + ... process(df_chunk) + >>> + >>> # Iterate over individual rows + >>> for idx, row in iterator.iterrows(): + ... print(row) + + Note: + This class is primarily for internal use by AthenaPolarsResultSet. + Most users should access results through PolarsCursor methods. + """ + + def __init__( + self, + reader: Union[Iterator["pl.DataFrame"], "pl.DataFrame"], + converters: Dict[str, Callable[[Optional[str]], Optional[Any]]], + column_names: List[str], + ) -> None: + """Initialize the iterator. + + Args: + reader: Either a DataFrame iterator (for chunked) or a single DataFrame. + converters: Dictionary mapping column names to converter functions. + column_names: List of column names in order. + """ + import polars as pl + + if isinstance(reader, pl.DataFrame): + self._reader: Iterator["pl.DataFrame"] = iter([reader]) + else: + self._reader = reader + self._converters = converters + self._column_names = column_names + self._current_df: Optional["pl.DataFrame"] = None + self._row_index = 0 + self._closed = False + + def __next__(self) -> "pl.DataFrame": + """Get the next DataFrame chunk. + + Returns: + The next Polars DataFrame chunk. + + Raises: + StopIteration: When no more chunks are available. + """ + if self._closed: + raise StopIteration + try: + df = next(self._reader) + self._current_df = df + self._row_index = 0 + return df + except StopIteration: + self.close() + raise + + def __iter__(self) -> "DataFrameIterator": + """Return self as iterator.""" + return self + + def __enter__(self) -> "DataFrameIterator": + """Context manager entry.""" + return self + + def __exit__(self, exc_type, exc_value, traceback) -> None: + """Context manager exit.""" + self.close() + + def close(self) -> None: + """Close the iterator and release resources.""" + self._closed = True + self._current_df = None + + def iterrows(self) -> Iterator[Tuple[int, Dict[str, Any]]]: + """Iterate over rows as (index, row_dict) tuples. + + Yields: + Tuple of (row_index, row_dict) for each row across all chunks. + """ + row_num = 0 + for df in self: + for row_dict in df.iter_rows(named=True): + # Apply converters + processed_row = { + col: self._converters.get(col, lambda x: x)(row_dict.get(col)) + for col in self._column_names + } + yield (row_num, processed_row) + row_num += 1 + + def as_polars(self) -> "pl.DataFrame": + """Collect all chunks into a single DataFrame. + + Returns: + Single Polars DataFrame containing all data. + """ + import polars as pl + + dfs = list(self) + if not dfs: + return pl.DataFrame() + if len(dfs) == 1: + return dfs[0] + return pl.concat(dfs) + + class AthenaPolarsResultSet(AthenaResultSet): """Result set that provides Polars DataFrame results with optional Arrow interoperability. @@ -116,20 +237,17 @@ def __init__( self._max_workers = max_workers self._chunksize = chunksize self._kwargs = kwargs - if self.state == AthenaQueryExecution.STATE_SUCCEEDED and self.output_location: - # Skip eager loading when chunksize is set to avoid double reads - # User should use iter_chunks() for memory-efficient processing - if self._chunksize is None: - self._df = self._as_polars() - else: - import polars as pl - self._df = pl.DataFrame() + # Build DataFrame iterator (handles both chunked and non-chunked cases) + if self.state == AthenaQueryExecution.STATE_SUCCEEDED and self.output_location: + self._df_iter = self._create_dataframe_iterator() else: import polars as pl - self._df = pl.DataFrame() - self._row_index = 0 + self._df_iter = DataFrameIterator( + pl.DataFrame(), self.converters, self._get_column_names() + ) + self._iterrows = self._df_iter.iterrows() @property def _csv_storage_options(self) -> Dict[str, Any]: @@ -190,23 +308,31 @@ def converters(self) -> Dict[str, Callable[[Optional[str]], Optional[Any]]]: description = self.description if self.description else [] return {d[0]: self._converter.get(d[1]) for d in description} - def _fetch(self) -> None: - """Fetch rows from the DataFrame into the row buffer.""" - if self._row_index >= self._df.height: - return - - end_index = min(self._row_index + self._arraysize, self._df.height) - chunk = self._df.slice(self._row_index, end_index - self._row_index) - self._row_index = end_index + def _get_column_names(self) -> List[str]: + """Get column names from description. - # Convert to rows and apply converters + Returns: + List of column names. + """ description = self.description if self.description else [] - column_names = [d[0] for d in description] - for row_dict in chunk.iter_rows(named=True): - processed_row = tuple( - self.converters.get(col, lambda x: x)(row_dict.get(col)) for col in column_names + return [d[0] for d in description] + + def _create_dataframe_iterator(self) -> DataFrameIterator: + """Create a DataFrame iterator for the result set. + + Returns: + DataFrameIterator that handles both chunked and non-chunked cases. + """ + if self._chunksize is not None: + # Chunked mode: create lazy iterator + reader: Union[Iterator["pl.DataFrame"], "pl.DataFrame"] = ( + self._iter_parquet_chunks() if self.is_unload else self._iter_csv_chunks() ) - self._rows.append(processed_row) + else: + # Non-chunked mode: load entire DataFrame + reader = self._as_polars() + + return DataFrameIterator(reader, self.converters, self._get_column_names()) def fetchone( self, @@ -216,14 +342,14 @@ def fetchone( Returns: A single row as a tuple, or None if no more rows are available. """ - if not self._rows: - self._fetch() - if not self._rows: + try: + row = next(self._iterrows) + except StopIteration: return None - if self._rownumber is None: - self._rownumber = 0 - self._rownumber += 1 - return self._rows.popleft() + else: + self._rownumber = row[0] + 1 + column_names = self._get_column_names() + return tuple([row[1][col] for col in column_names]) def fetchmany( self, size: Optional[int] = None @@ -406,6 +532,11 @@ def as_polars(self) -> "pl.DataFrame": Returns the query results as a Polars DataFrame. This is the primary method for accessing results with PolarsCursor. + Note: + When chunksize is set, calling this method will collect all chunks + into a single DataFrame, loading all data into memory. Use + iter_chunks() for memory-efficient processing of large datasets. + Returns: Polars DataFrame containing all query results. @@ -416,7 +547,7 @@ def as_polars(self) -> "pl.DataFrame": >>> print(f"DataFrame has {df.height} rows") >>> filtered = df.filter(pl.col("value") > 100) """ - return self._df + return self._df_iter.as_polars() def as_arrow(self) -> "Table": """Return query results as an Apache Arrow Table. @@ -437,7 +568,7 @@ def as_arrow(self) -> "Table": >>> # Use with other Arrow-compatible libraries """ try: - return self._df.to_arrow() + return self._df_iter.as_polars().to_arrow() except ImportError as e: raise ImportError( "pyarrow is required for as_arrow(). Install it with: pip install pyarrow" @@ -528,7 +659,7 @@ def _iter_parquet_chunks(self) -> Iterator["pl.DataFrame"]: _logger.exception(f"Failed to read {self._unload_location}.") raise OperationalError(*e.args) from e - def iter_chunks(self) -> Iterator["pl.DataFrame"]: + def iter_chunks(self) -> DataFrameIterator: """Iterate over result chunks as Polars DataFrames. This method provides an iterator interface for processing large result sets. @@ -536,9 +667,9 @@ def iter_chunks(self) -> Iterator["pl.DataFrame"]: evaluation for memory-efficient processing. When chunksize is not specified, it yields the entire result as a single DataFrame. - Yields: - Polars DataFrame for each chunk of rows, or the entire DataFrame - if chunksize was not specified. + Returns: + DataFrameIterator that yields Polars DataFrames for each chunk + of rows, or the entire DataFrame if chunksize was not specified. Example: >>> # With chunking for large datasets @@ -553,18 +684,12 @@ def iter_chunks(self) -> Iterator["pl.DataFrame"]: >>> for df in cursor.iter_chunks(): ... process(df) # Single DataFrame with all data """ - if self._chunksize is None: - # No chunking - yield entire DataFrame as single chunk - yield self._df - elif self.is_unload: - yield from self._iter_parquet_chunks() - else: - yield from self._iter_csv_chunks() + return self._df_iter def close(self) -> None: """Close the result set and release resources.""" import polars as pl super().close() - self._df = pl.DataFrame() - self._row_index = 0 + self._df_iter = DataFrameIterator(pl.DataFrame(), {}, []) + self._iterrows = iter([]) From 8ce8f47300a1df72973db6d972ecf69c4b26bc04 Mon Sep 17 00:00:00 2001 From: laughingman7743 Date: Sun, 4 Jan 2026 14:08:34 +0900 Subject: [PATCH 6/9] Refactor DataFrameIterator: remove unused variables and consolidate code MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Remove unused _current_df and _row_index instance variables - Simplify __next__ method to return directly without intermediate variables - Consolidate column name extraction in _get_csv_params to use _get_column_names() - Update class docstring to document chunked iteration feature 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 --- pyathena/polars/result_set.py | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/pyathena/polars/result_set.py b/pyathena/polars/result_set.py index 99e04486..ed10c83d 100644 --- a/pyathena/polars/result_set.py +++ b/pyathena/polars/result_set.py @@ -79,8 +79,6 @@ def __init__( self._reader = reader self._converters = converters self._column_names = column_names - self._current_df: Optional["pl.DataFrame"] = None - self._row_index = 0 self._closed = False def __next__(self) -> "pl.DataFrame": @@ -95,10 +93,7 @@ def __next__(self) -> "pl.DataFrame": if self._closed: raise StopIteration try: - df = next(self._reader) - self._current_df = df - self._row_index = 0 - return df + return next(self._reader) except StopIteration: self.close() raise @@ -118,7 +113,6 @@ def __exit__(self, exc_type, exc_value, traceback) -> None: def close(self) -> None: """Close the iterator and release resources.""" self._closed = True - self._current_df = None def iterrows(self) -> Iterator[Tuple[int, Dict[str, Any]]]: """Iterate over rows as (index, row_dict) tuples. @@ -166,6 +160,7 @@ class AthenaPolarsResultSet(AthenaResultSet): - Efficient columnar data processing with Polars - Optional Arrow interoperability when PyArrow is available - Support for both CSV and Parquet result formats + - Chunked iteration for memory-efficient processing of large datasets - Optimized memory usage through columnar format Example: @@ -182,6 +177,12 @@ class AthenaPolarsResultSet(AthenaResultSet): >>> >>> # Optional: Get Arrow Table (requires pyarrow) >>> table = cursor.as_arrow() + >>> + >>> # Memory-efficient chunked iteration + >>> cursor = connection.cursor(PolarsCursor, chunksize=50000) + >>> cursor.execute("SELECT * FROM huge_table") + >>> for chunk in cursor.iter_chunks(): + ... process_chunk(chunk) Note: This class is used internally by PolarsCursor and typically not @@ -583,8 +584,7 @@ def _get_csv_params(self) -> Tuple[str, bool, Optional[List[str]]]: if self.output_location and self.output_location.endswith(".txt"): separator = "\t" has_header = False - description = self.description if self.description else [] - new_columns: Optional[List[str]] = [d[0] for d in description] + new_columns: Optional[List[str]] = self._get_column_names() else: separator = "," has_header = True From 317fd3b75e3105393e9f799d7b2cc417d1d22eb6 Mon Sep 17 00:00:00 2001 From: laughingman7743 Date: Sun, 4 Jan 2026 14:16:37 +0900 Subject: [PATCH 7/9] Clarify that chunksize enables lazy loading for all data access methods MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Update documentation to explain that standard fetch methods (fetchone, fetchmany) also benefit from chunked loading when chunksize is set - Update docstrings in PolarsCursor, AsyncPolarsCursor, and ResultSet to reflect this behavior - Add examples showing both iteration patterns in docs/polars.rst 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 --- docs/polars.rst | 44 +++++++++++++++++++++++++++++++-- pyathena/polars/async_cursor.py | 3 ++- pyathena/polars/cursor.py | 3 ++- pyathena/polars/result_set.py | 3 ++- 4 files changed, 48 insertions(+), 5 deletions(-) diff --git a/docs/polars.rst b/docs/polars.rst index 1b8273b4..fb0d59b7 100644 --- a/docs/polars.rst +++ b/docs/polars.rst @@ -277,7 +277,28 @@ argument of the connect method or as an argument to the cursor method. region_name="us-west-2", cursor_class=PolarsCursor).cursor(chunksize=50_000) -Use the ``iter_chunks()`` method to iterate over results in chunks: +When the chunksize option is enabled, data is loaded lazily in chunks. This applies +to all data access methods: + +**Standard DB-API fetch methods** - ``fetchone()`` and ``fetchmany()`` load data +chunk by chunk as needed, keeping memory usage bounded: + +.. code:: python + + from pyathena import connect + from pyathena.polars.cursor import PolarsCursor + + cursor = connect(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/", + region_name="us-west-2", + cursor_class=PolarsCursor).cursor(chunksize=50_000) + + cursor.execute("SELECT * FROM large_table") + # Data is loaded in 50,000 row chunks as you iterate + for row in cursor: + process_row(row) + +**iter_chunks() method** - Use this when you want to process data as Polars DataFrames +in chunks, which is more efficient for batch processing: .. code:: python @@ -482,6 +503,8 @@ As with AsyncPolarsCursor, the unload option is also available. cursor_class=AsyncPolarsCursor).cursor(unload=True) As with PolarsCursor, the chunksize option is also available for memory-efficient processing. +When chunksize is specified, data is loaded lazily in chunks for both standard fetch methods +and ``iter_chunks()``. .. code:: python @@ -494,8 +517,25 @@ As with PolarsCursor, the chunksize option is also available for memory-efficien query_id, future = cursor.execute("SELECT * FROM large_table") result_set = future.result() + + # Standard iteration - data loaded in chunks + for row in result_set: + process_row(row) + +.. code:: python + + from pyathena import connect + from pyathena.polars.async_cursor import AsyncPolarsCursor + + cursor = connect(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/", + region_name="us-west-2", + cursor_class=AsyncPolarsCursor).cursor(chunksize=50_000) + + query_id, future = cursor.execute("SELECT * FROM large_table") + result_set = future.result() + + # Process as DataFrame chunks for chunk in result_set.iter_chunks(): - # Process each chunk process_chunk(chunk) .. _`polars.DataFrame object`: https://docs.pola.rs/api/python/stable/reference/dataframe/index.html diff --git a/pyathena/polars/async_cursor.py b/pyathena/polars/async_cursor.py index 5bbc458a..d04fe08f 100644 --- a/pyathena/polars/async_cursor.py +++ b/pyathena/polars/async_cursor.py @@ -95,7 +95,8 @@ def __init__( block_size: S3 read block size. cache_type: S3 caching strategy. chunksize: Number of rows per chunk for memory-efficient processing. - If specified, enables chunked iteration via iter_chunks(). + If specified, data is loaded lazily in chunks for all data + access methods including fetchone(), fetchmany(), and iter_chunks(). **kwargs: Additional connection parameters. Example: diff --git a/pyathena/polars/cursor.py b/pyathena/polars/cursor.py index ca970d3f..d25bfc85 100644 --- a/pyathena/polars/cursor.py +++ b/pyathena/polars/cursor.py @@ -107,7 +107,8 @@ def __init__( cache_type: S3 caching strategy. max_workers: Maximum worker threads for parallel S3 operations. chunksize: Number of rows per chunk for memory-efficient processing. - If specified, enables chunked iteration via iter_chunks(). + If specified, data is loaded lazily in chunks for all data + access methods including fetchone(), fetchmany(), and iter_chunks(). **kwargs: Additional connection parameters. Example: diff --git a/pyathena/polars/result_set.py b/pyathena/polars/result_set.py index ed10c83d..05c4f792 100644 --- a/pyathena/polars/result_set.py +++ b/pyathena/polars/result_set.py @@ -219,7 +219,8 @@ def __init__( cache_type: Cache type for S3 file system. max_workers: Maximum number of worker threads. chunksize: Number of rows per chunk for memory-efficient processing. - If specified, enables chunked iteration via iter_chunks(). + If specified, data is loaded lazily in chunks for all data + access methods including fetchone(), fetchmany(), and iter_chunks(). **kwargs: Additional arguments passed to Polars read functions. """ super().__init__( From cef042adb0d94047a510ff3c74e066043dda7142 Mon Sep 17 00:00:00 2001 From: laughingman7743 Date: Sun, 4 Jan 2026 14:24:50 +0900 Subject: [PATCH 8/9] Add chunksize tests for PolarsCursor fetch methods MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Add tests for fetchone, fetchmany, fetchall, and iterator with chunksize - Add tests for fetch methods with UNLOAD mode and chunksize - Remove redundant iter_chunks tests from AsyncPolarsCursor since both cursor types share the same AthenaPolarsResultSet implementation 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 --- tests/pyathena/polars/test_async_cursor.py | 108 --------------------- tests/pyathena/polars/test_cursor.py | 82 ++++++++++++++++ 2 files changed, 82 insertions(+), 108 deletions(-) diff --git a/tests/pyathena/polars/test_async_cursor.py b/tests/pyathena/polars/test_async_cursor.py index 69ca86bd..fd9d61ba 100644 --- a/tests/pyathena/polars/test_async_cursor.py +++ b/tests/pyathena/polars/test_async_cursor.py @@ -324,111 +324,3 @@ def test_empty_result_unload(self, async_polars_cursor): df = future.result().as_polars() assert df.height == 0 assert df.width == 0 - - def test_iter_chunks(self): - """Test chunked iteration over query results.""" - with contextlib.closing(connect(schema_name=ENV.schema)) as conn: - cursor = conn.cursor(AsyncPolarsCursor, chunksize=5) - query_id, future = cursor.execute("SELECT * FROM many_rows LIMIT 15") - assert query_id is not None - result_set = future.result() - chunks = list(result_set.iter_chunks()) - assert len(chunks) > 0 - total_rows = sum(chunk.height for chunk in chunks) - assert total_rows == 15 - for chunk in chunks: - assert isinstance(chunk, pl.DataFrame) - - def test_iter_chunks_without_chunksize(self, async_polars_cursor): - """Test that iter_chunks works without chunksize, yielding entire DataFrame.""" - query_id, future = async_polars_cursor.execute("SELECT * FROM one_row") - assert query_id is not None - result_set = future.result() - chunks = list(result_set.iter_chunks()) - # Without chunksize, yields entire DataFrame as single chunk - assert len(chunks) == 1 - assert isinstance(chunks[0], pl.DataFrame) - assert chunks[0].height == 1 - - def test_iter_chunks_many_rows(self): - """Test chunked iteration with many rows.""" - with contextlib.closing(connect(schema_name=ENV.schema)) as conn: - cursor = conn.cursor(AsyncPolarsCursor, chunksize=1000) - query_id, future = cursor.execute("SELECT * FROM many_rows") - assert query_id is not None - result_set = future.result() - chunks = list(result_set.iter_chunks()) - total_rows = sum(chunk.height for chunk in chunks) - assert total_rows == 10000 - assert len(chunks) >= 10 # At least 10 chunks with chunksize=1000 - - @pytest.mark.parametrize( - "async_polars_cursor", - [ - { - "cursor_kwargs": {"unload": True, "chunksize": 5}, - }, - ], - indirect=["async_polars_cursor"], - ) - def test_iter_chunks_unload(self, async_polars_cursor): - """Test chunked iteration with UNLOAD (Parquet).""" - query_id, future = async_polars_cursor.execute("SELECT * FROM many_rows LIMIT 15") - assert query_id is not None - result_set = future.result() - chunks = list(result_set.iter_chunks()) - assert len(chunks) > 0 - total_rows = sum(chunk.height for chunk in chunks) - assert total_rows == 15 - for chunk in chunks: - assert isinstance(chunk, pl.DataFrame) - - def test_iter_chunks_data_consistency(self): - """Test that chunked and regular reading produce the same data.""" - with contextlib.closing(connect(schema_name=ENV.schema)) as conn: - # Regular reading (no chunksize) - regular_cursor = conn.cursor(AsyncPolarsCursor) - query_id, future = regular_cursor.execute("SELECT * FROM many_rows LIMIT 100") - assert query_id is not None - regular_df = future.result().as_polars() - - # Chunked reading - chunked_cursor = conn.cursor(AsyncPolarsCursor, chunksize=25) - query_id, future = chunked_cursor.execute("SELECT * FROM many_rows LIMIT 100") - assert query_id is not None - result_set = future.result() - chunked_dfs = list(result_set.iter_chunks()) - - # Combine chunks - combined_df = pl.concat(chunked_dfs) - - # Should have the same data (sort for comparison) - assert regular_df.sort("a").equals(combined_df.sort("a")) - - # Should have multiple chunks - assert len(chunked_dfs) > 1 - - def test_iter_chunks_chunk_sizes(self): - """Test that chunks have correct sizes.""" - with contextlib.closing(connect(schema_name=ENV.schema)) as conn: - cursor = conn.cursor(AsyncPolarsCursor, chunksize=10) - query_id, future = cursor.execute("SELECT * FROM many_rows LIMIT 50") - assert query_id is not None - result_set = future.result() - - chunk_sizes = [] - total_rows = 0 - - for chunk in result_set.iter_chunks(): - chunk_size = chunk.height - chunk_sizes.append(chunk_size) - total_rows += chunk_size - - # Each chunk should not exceed chunksize - assert chunk_size <= 10 - - # Should have processed all 50 rows - assert total_rows == 50 - - # Should have multiple chunks - assert len(chunk_sizes) > 1 diff --git a/tests/pyathena/polars/test_cursor.py b/tests/pyathena/polars/test_cursor.py index acae2ffb..447b676b 100644 --- a/tests/pyathena/polars/test_cursor.py +++ b/tests/pyathena/polars/test_cursor.py @@ -542,3 +542,85 @@ def test_iter_chunks_chunk_sizes(self): # Should have multiple chunks assert len(chunk_sizes) > 1 + + def test_fetchone_with_chunksize(self): + """Test that fetchone works correctly with chunksize enabled.""" + with contextlib.closing(connect(schema_name=ENV.schema)) as conn: + cursor = conn.cursor(PolarsCursor, chunksize=5) + cursor.execute("SELECT * FROM many_rows LIMIT 15") + + rows = [] + while True: + row = cursor.fetchone() + if row is None: + break + rows.append(row) + + assert len(rows) == 15 + + def test_fetchmany_with_chunksize(self): + """Test that fetchmany works correctly with chunksize enabled.""" + with contextlib.closing(connect(schema_name=ENV.schema)) as conn: + cursor = conn.cursor(PolarsCursor, chunksize=5) + cursor.execute("SELECT * FROM many_rows LIMIT 15") + + batch1 = cursor.fetchmany(10) + batch2 = cursor.fetchmany(10) + + assert len(batch1) == 10 + assert len(batch2) == 5 + + def test_fetchall_with_chunksize(self): + """Test that fetchall works correctly with chunksize enabled.""" + with contextlib.closing(connect(schema_name=ENV.schema)) as conn: + cursor = conn.cursor(PolarsCursor, chunksize=5) + cursor.execute("SELECT * FROM many_rows LIMIT 15") + + rows = cursor.fetchall() + assert len(rows) == 15 + + def test_iterator_with_chunksize(self): + """Test that cursor iteration works correctly with chunksize enabled.""" + with contextlib.closing(connect(schema_name=ENV.schema)) as conn: + cursor = conn.cursor(PolarsCursor, chunksize=5) + cursor.execute("SELECT * FROM many_rows LIMIT 15") + + rows = list(cursor) + assert len(rows) == 15 + + @pytest.mark.parametrize( + "polars_cursor", + [ + { + "cursor_kwargs": {"unload": True, "chunksize": 5}, + }, + ], + indirect=["polars_cursor"], + ) + def test_fetchone_with_chunksize_unload(self, polars_cursor): + """Test that fetchone works correctly with chunksize and unload enabled.""" + polars_cursor.execute("SELECT * FROM many_rows LIMIT 15") + + rows = [] + while True: + row = polars_cursor.fetchone() + if row is None: + break + rows.append(row) + + assert len(rows) == 15 + + @pytest.mark.parametrize( + "polars_cursor", + [ + { + "cursor_kwargs": {"unload": True, "chunksize": 5}, + }, + ], + indirect=["polars_cursor"], + ) + def test_iterator_with_chunksize_unload(self, polars_cursor): + """Test that cursor iteration works with chunksize and unload enabled.""" + polars_cursor.execute("SELECT * FROM many_rows LIMIT 15") + rows = list(polars_cursor) + assert len(rows) == 15 From a5a0c79a327c23b3862190c131d416020136bdd1 Mon Sep 17 00:00:00 2001 From: laughingman7743 Date: Sun, 4 Jan 2026 14:27:37 +0900 Subject: [PATCH 9/9] Fix abc.Iterator import error MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Change `import abc` to `from collections import abc` to fix AttributeError where `abc.Iterator` was not found (standard library abc module doesn't have Iterator, it's in collections.abc). Also add cast to fix mypy no-any-return errors. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 --- pyathena/polars/result_set.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/pyathena/polars/result_set.py b/pyathena/polars/result_set.py index 05c4f792..7cd27417 100644 --- a/pyathena/polars/result_set.py +++ b/pyathena/polars/result_set.py @@ -1,8 +1,8 @@ # -*- coding: utf-8 -*- from __future__ import annotations -import abc import logging +from collections import abc from multiprocessing import cpu_count from typing import ( TYPE_CHECKING, @@ -14,6 +14,7 @@ Optional, Tuple, Union, + cast, ) from pyathena import OperationalError @@ -139,7 +140,7 @@ def as_polars(self) -> "pl.DataFrame": """ import polars as pl - dfs = list(self) + dfs = cast(List["pl.DataFrame"], list(self)) if not dfs: return pl.DataFrame() if len(dfs) == 1: