From 5415eb31fc92cb197292b8cc805b094c5a8dcfbf Mon Sep 17 00:00:00 2001 From: laughingman7743 Date: Sun, 4 Jan 2026 16:00:02 +0900 Subject: [PATCH 1/8] Align PandasCursor chunk processing with PolarsCursor implementation MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Add as_pandas() method to DataFrameIterator for collecting all chunks into a single DataFrame (mirrors PolarsCursor's as_polars() method) - Add iter_chunks() method to AthenaPandasResultSet for explicit iterator access - Refactor PandasCursor.iter_chunks() to delegate to ResultSet while preserving gc.collect() optimization for memory management - Add comprehensive docstrings with Google-style documentation - Update docs/pandas.rst with DataFrameIterator.as_pandas() examples This aligns the Pandas and Polars cursor implementations for consistency, making it easier for users to switch between them. Closes #638 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 --- docs/pandas.rst | 15 ++++++- pyathena/pandas/cursor.py | 19 +++----- pyathena/pandas/result_set.py | 85 ++++++++++++++++++++++++++++++++--- 3 files changed, 100 insertions(+), 19 deletions(-) diff --git a/docs/pandas.rst b/docs/pandas.rst index a8ce3274..b9a910ff 100644 --- a/docs/pandas.rst +++ b/docs/pandas.rst @@ -418,7 +418,20 @@ PandasCursor provides an ``iter_chunks()`` method for convenient chunked process # Memory can be freed after each chunk del chunk -You can also concatenate them into a single `pandas.DataFrame object`_ using `pandas.concat`_. +The ``DataFrameIterator`` also has an ``as_pandas()`` method that collects all chunks into a single DataFrame: + +.. code:: python + + from pyathena import connect + from pyathena.pandas.cursor import PandasCursor + + cursor = connect(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/", + region_name="us-west-2", + cursor_class=PandasCursor).cursor() + df_iter = cursor.execute("SELECT * FROM many_rows", chunksize=1_000_000).as_pandas() + df = df_iter.as_pandas() # Collect all chunks into a single DataFrame + +This is equivalent to using `pandas.concat`_: .. code:: python diff --git a/pyathena/pandas/cursor.py b/pyathena/pandas/cursor.py index 470adc5c..dc9bea53 100644 --- a/pyathena/pandas/cursor.py +++ b/pyathena/pandas/cursor.py @@ -380,18 +380,13 @@ def iter_chunks(self) -> Generator["DataFrame", None, None]: """ if not self.has_result_set: raise ProgrammingError("No result set.") + result_set = cast(AthenaPandasResultSet, self.result_set) - result = self.as_pandas() - if isinstance(result, DataFrameIterator): - # It's an iterator (chunked mode) - import gc + import gc - for chunk_count, chunk in enumerate(result, 1): - yield chunk + for chunk_count, chunk in enumerate(result_set.iter_chunks(), 1): + yield chunk - # Suggest garbage collection every 10 chunks for large datasets - if chunk_count % 10 == 0: - gc.collect() - else: - # Single DataFrame - yield as one chunk - yield result + # Suggest garbage collection every 10 chunks for large datasets + if chunk_count % 10 == 0: + gc.collect() diff --git a/pyathena/pandas/result_set.py b/pyathena/pandas/result_set.py index 2700b354..d5bfbbe5 100644 --- a/pyathena/pandas/result_set.py +++ b/pyathena/pandas/result_set.py @@ -68,6 +68,12 @@ def __init__( reader: Union["TextFileReader", "DataFrame"], trunc_date: Callable[["DataFrame"], "DataFrame"], ) -> None: + """Initialize the iterator. + + Args: + reader: Either a TextFileReader (for chunked) or a single DataFrame. + trunc_date: Function to apply date truncation to each chunk. + """ from pandas import DataFrame if isinstance(reader, DataFrame): @@ -76,7 +82,15 @@ def __init__( self._reader = reader self._trunc_date = trunc_date - def __next__(self): + def __next__(self) -> "DataFrame": + """Get the next DataFrame chunk. + + Returns: + The next pandas DataFrame chunk with date truncation applied. + + Raises: + StopIteration: When no more chunks are available. + """ try: df = next(self._reader) return self._trunc_date(df) @@ -84,33 +98,65 @@ def __next__(self): self.close() raise - def __iter__(self): + def __iter__(self) -> "DataFrameIterator": + """Return self as iterator.""" return self - def __enter__(self): + def __enter__(self) -> "DataFrameIterator": + """Context manager entry.""" return self - def __exit__(self, exc_type, exc_value, traceback): + def __exit__(self, exc_type, exc_value, traceback) -> None: + """Context manager exit.""" self.close() def close(self) -> None: + """Close the iterator and release resources.""" from pandas.io.parsers import TextFileReader if isinstance(self._reader, TextFileReader): self._reader.close() - def iterrows(self) -> Iterator[Any]: + def iterrows(self) -> Iterator[Tuple[int, Dict[str, Any]]]: + """Iterate over rows as (index, row_dict) tuples. + + Yields: + Tuple of (row_index, row_dict) for each row across all chunks. + """ for df in self: for row in enumerate(df.to_dict("records")): yield row - def get_chunk(self, size=None): + def get_chunk(self, size: Optional[int] = None) -> "DataFrame": + """Get a chunk of specified size. + + Args: + size: Number of rows to retrieve. If None, returns entire chunk. + + Returns: + DataFrame chunk. + """ from pandas.io.parsers import TextFileReader if isinstance(self._reader, TextFileReader): return self._reader.get_chunk(size) return next(self._reader) + def as_pandas(self) -> "DataFrame": + """Collect all chunks into a single DataFrame. + + Returns: + Single pandas DataFrame containing all data. + """ + import pandas as pd + + dfs: List["DataFrame"] = list(self) + if not dfs: + return pd.DataFrame() + if len(dfs) == 1: + return dfs[0] + return pd.concat(dfs, ignore_index=True) + class AthenaPandasResultSet(AthenaResultSet): """Result set that provides pandas DataFrame results with memory optimization. @@ -625,6 +671,33 @@ def as_pandas(self) -> Union[DataFrameIterator, "DataFrame"]: return next(self._df_iter) return self._df_iter + def iter_chunks(self) -> DataFrameIterator: + """Iterate over result chunks as pandas DataFrames. + + This method provides an iterator interface for processing large result sets. + When chunksize is specified, it yields DataFrames in chunks for memory-efficient + processing. When chunksize is not specified, it yields the entire result as a + single DataFrame. + + Returns: + DataFrameIterator that yields pandas DataFrames for each chunk + of rows, or the entire DataFrame if chunksize was not specified. + + Example: + >>> # With chunking for large datasets + >>> cursor = connection.cursor(PandasCursor, chunksize=50000) + >>> cursor.execute("SELECT * FROM large_table") + >>> for chunk in cursor.iter_chunks(): + ... process_chunk(chunk) # Each chunk is a pandas DataFrame + >>> + >>> # Without chunking - yields entire result as single chunk + >>> cursor = connection.cursor(PandasCursor) + >>> cursor.execute("SELECT * FROM small_table") + >>> for df in cursor.iter_chunks(): + ... process(df) # Single DataFrame with all data + """ + return self._df_iter + def close(self) -> None: import pandas as pd From 69d06257ac4c239fc3a1b7829eaa8b6be5979819 Mon Sep 17 00:00:00 2001 From: laughingman7743 Date: Sun, 4 Jan 2026 17:59:49 +0900 Subject: [PATCH 2/8] Fix DataFrameIterator.iterrows() to maintain continuous row indices MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Previously, row indices reset to 0 for each chunk when iterating. Now row indices are continuous across all chunks, consistent with PolarsCursor's DataFrameIterator behavior. This is the expected behavior since chunking is an optimization detail that should be transparent to the caller. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 --- pyathena/pandas/result_set.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/pyathena/pandas/result_set.py b/pyathena/pandas/result_set.py index d5bfbbe5..6f2fb83a 100644 --- a/pyathena/pandas/result_set.py +++ b/pyathena/pandas/result_set.py @@ -120,12 +120,16 @@ def close(self) -> None: def iterrows(self) -> Iterator[Tuple[int, Dict[str, Any]]]: """Iterate over rows as (index, row_dict) tuples. + Row indices are continuous across all chunks, starting from 0. + Yields: Tuple of (row_index, row_dict) for each row across all chunks. """ + row_num = 0 for df in self: - for row in enumerate(df.to_dict("records")): - yield row + for row_dict in df.to_dict("records"): + yield (row_num, row_dict) + row_num += 1 def get_chunk(self, size: Optional[int] = None) -> "DataFrame": """Get a chunk of specified size. From b0f1b1d6d235c159a817e33297f708f62006e424 Mon Sep 17 00:00:00 2001 From: laughingman7743 Date: Sun, 4 Jan 2026 18:04:16 +0900 Subject: [PATCH 3/8] Rename DataFrameIterator to PandasDataFrameIterator and PolarsDataFrameIterator MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Rename the DataFrameIterator classes to include their respective module prefix for clarity and to avoid confusion when importing from both modules. - pyathena.pandas.result_set.DataFrameIterator → PandasDataFrameIterator - pyathena.polars.result_set.DataFrameIterator → PolarsDataFrameIterator Also updates all documentation and test references. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 --- docs/api/pandas.rst | 2 +- docs/api/polars.rst | 3 +++ docs/pandas.rst | 4 ++-- pyathena/pandas/cursor.py | 8 ++++---- pyathena/pandas/result_set.py | 18 +++++++++--------- pyathena/polars/result_set.py | 20 ++++++++++---------- tests/pyathena/pandas/test_cursor.py | 8 ++++---- 7 files changed, 33 insertions(+), 30 deletions(-) diff --git a/docs/api/pandas.rst b/docs/api/pandas.rst index bfc2d6bd..095aacb5 100644 --- a/docs/api/pandas.rst +++ b/docs/api/pandas.rst @@ -23,7 +23,7 @@ Pandas Result Set :members: :inherited-members: -.. autoclass:: pyathena.pandas.result_set.DataFrameIterator +.. autoclass:: pyathena.pandas.result_set.PandasDataFrameIterator :members: Pandas Data Converters diff --git a/docs/api/polars.rst b/docs/api/polars.rst index 5d4998f4..059ee6c1 100644 --- a/docs/api/polars.rst +++ b/docs/api/polars.rst @@ -23,6 +23,9 @@ Polars Result Set :members: :inherited-members: +.. autoclass:: pyathena.polars.result_set.PolarsDataFrameIterator + :members: + Polars Data Converters ---------------------- diff --git a/docs/pandas.rst b/docs/pandas.rst index b9a910ff..998a6d78 100644 --- a/docs/pandas.rst +++ b/docs/pandas.rst @@ -381,7 +381,7 @@ SQLAlchemy allows this option to be specified in the connection string. awsathena+pandas://:@athena.{region_name}.amazonaws.com:443/{schema_name}?s3_staging_dir={s3_staging_dir}&chunksize=1000000... -When this option is used, the object returned by the as_pandas method is a ``DataFrameIterator`` object. +When this option is used, the object returned by the as_pandas method is a ``PandasDataFrameIterator`` object. This object has exactly the same interface as the ``TextFileReader`` object and can be handled in the same way. .. code:: python @@ -418,7 +418,7 @@ PandasCursor provides an ``iter_chunks()`` method for convenient chunked process # Memory can be freed after each chunk del chunk -The ``DataFrameIterator`` also has an ``as_pandas()`` method that collects all chunks into a single DataFrame: +The ``PandasDataFrameIterator`` also has an ``as_pandas()`` method that collects all chunks into a single DataFrame: .. code:: python diff --git a/pyathena/pandas/cursor.py b/pyathena/pandas/cursor.py index dc9bea53..433e538e 100644 --- a/pyathena/pandas/cursor.py +++ b/pyathena/pandas/cursor.py @@ -25,7 +25,7 @@ DefaultPandasTypeConverter, DefaultPandasUnloadTypeConverter, ) -from pyathena.pandas.result_set import AthenaPandasResultSet, DataFrameIterator +from pyathena.pandas.result_set import AthenaPandasResultSet, PandasDataFrameIterator from pyathena.result_set import WithResultSet if TYPE_CHECKING: @@ -331,11 +331,11 @@ def fetchall( result_set = cast(AthenaPandasResultSet, self.result_set) return result_set.fetchall() - def as_pandas(self) -> Union["DataFrame", DataFrameIterator]: - """Return DataFrame or DataFrameIterator based on chunksize setting. + def as_pandas(self) -> Union["DataFrame", PandasDataFrameIterator]: + """Return DataFrame or PandasDataFrameIterator based on chunksize setting. Returns: - DataFrame when chunksize is None, DataFrameIterator when chunksize is set. + DataFrame when chunksize is None, PandasDataFrameIterator when chunksize is set. """ if not self.has_result_set: raise ProgrammingError("No result set.") diff --git a/pyathena/pandas/result_set.py b/pyathena/pandas/result_set.py index 6f2fb83a..e002eba6 100644 --- a/pyathena/pandas/result_set.py +++ b/pyathena/pandas/result_set.py @@ -38,7 +38,7 @@ def _no_trunc_date(df: "DataFrame") -> "DataFrame": return df -class DataFrameIterator(abc.Iterator): # type: ignore +class PandasDataFrameIterator(abc.Iterator): # type: ignore """Iterator for chunked DataFrame results from Athena queries. This class wraps either a pandas TextFileReader (for chunked reading) or @@ -98,11 +98,11 @@ def __next__(self) -> "DataFrame": self.close() raise - def __iter__(self) -> "DataFrameIterator": + def __iter__(self) -> "PandasDataFrameIterator": """Return self as iterator.""" return self - def __enter__(self) -> "DataFrameIterator": + def __enter__(self) -> "PandasDataFrameIterator": """Context manager entry.""" return self @@ -285,11 +285,11 @@ def __init__( if self.state == AthenaQueryExecution.STATE_SUCCEEDED and self.output_location: df = self._as_pandas() trunc_date = _no_trunc_date if self.is_unload else self._trunc_date - self._df_iter = DataFrameIterator(df, trunc_date) + self._df_iter = PandasDataFrameIterator(df, trunc_date) else: import pandas as pd - self._df_iter = DataFrameIterator(pd.DataFrame(), _no_trunc_date) + self._df_iter = PandasDataFrameIterator(pd.DataFrame(), _no_trunc_date) self._iterrows = self._df_iter.iterrows() def _get_parquet_engine(self) -> str: @@ -670,12 +670,12 @@ def _as_pandas(self) -> Union["TextFileReader", "DataFrame"]: df = self._read_csv() return df - def as_pandas(self) -> Union[DataFrameIterator, "DataFrame"]: + def as_pandas(self) -> Union[PandasDataFrameIterator, "DataFrame"]: if self._chunksize is None: return next(self._df_iter) return self._df_iter - def iter_chunks(self) -> DataFrameIterator: + def iter_chunks(self) -> PandasDataFrameIterator: """Iterate over result chunks as pandas DataFrames. This method provides an iterator interface for processing large result sets. @@ -684,7 +684,7 @@ def iter_chunks(self) -> DataFrameIterator: single DataFrame. Returns: - DataFrameIterator that yields pandas DataFrames for each chunk + PandasDataFrameIterator that yields pandas DataFrames for each chunk of rows, or the entire DataFrame if chunksize was not specified. Example: @@ -706,6 +706,6 @@ def close(self) -> None: import pandas as pd super().close() - self._df_iter = DataFrameIterator(pd.DataFrame(), _no_trunc_date) + self._df_iter = PandasDataFrameIterator(pd.DataFrame(), _no_trunc_date) self._iterrows = enumerate([]) self._data_manifest = [] diff --git a/pyathena/polars/result_set.py b/pyathena/polars/result_set.py index 7cd27417..7d971840 100644 --- a/pyathena/polars/result_set.py +++ b/pyathena/polars/result_set.py @@ -34,7 +34,7 @@ _logger = logging.getLogger(__name__) -class DataFrameIterator(abc.Iterator): # type: ignore +class PolarsDataFrameIterator(abc.Iterator): # type: ignore """Iterator for chunked DataFrame results from Athena queries. This class wraps either a Polars DataFrame iterator (for chunked reading) or @@ -99,11 +99,11 @@ def __next__(self) -> "pl.DataFrame": self.close() raise - def __iter__(self) -> "DataFrameIterator": + def __iter__(self) -> "PolarsDataFrameIterator": """Return self as iterator.""" return self - def __enter__(self) -> "DataFrameIterator": + def __enter__(self) -> "PolarsDataFrameIterator": """Context manager entry.""" return self @@ -247,7 +247,7 @@ def __init__( else: import polars as pl - self._df_iter = DataFrameIterator( + self._df_iter = PolarsDataFrameIterator( pl.DataFrame(), self.converters, self._get_column_names() ) self._iterrows = self._df_iter.iterrows() @@ -320,11 +320,11 @@ def _get_column_names(self) -> List[str]: description = self.description if self.description else [] return [d[0] for d in description] - def _create_dataframe_iterator(self) -> DataFrameIterator: + def _create_dataframe_iterator(self) -> PolarsDataFrameIterator: """Create a DataFrame iterator for the result set. Returns: - DataFrameIterator that handles both chunked and non-chunked cases. + PolarsDataFrameIterator that handles both chunked and non-chunked cases. """ if self._chunksize is not None: # Chunked mode: create lazy iterator @@ -335,7 +335,7 @@ def _create_dataframe_iterator(self) -> DataFrameIterator: # Non-chunked mode: load entire DataFrame reader = self._as_polars() - return DataFrameIterator(reader, self.converters, self._get_column_names()) + return PolarsDataFrameIterator(reader, self.converters, self._get_column_names()) def fetchone( self, @@ -661,7 +661,7 @@ def _iter_parquet_chunks(self) -> Iterator["pl.DataFrame"]: _logger.exception(f"Failed to read {self._unload_location}.") raise OperationalError(*e.args) from e - def iter_chunks(self) -> DataFrameIterator: + def iter_chunks(self) -> PolarsDataFrameIterator: """Iterate over result chunks as Polars DataFrames. This method provides an iterator interface for processing large result sets. @@ -670,7 +670,7 @@ def iter_chunks(self) -> DataFrameIterator: it yields the entire result as a single DataFrame. Returns: - DataFrameIterator that yields Polars DataFrames for each chunk + PolarsDataFrameIterator that yields Polars DataFrames for each chunk of rows, or the entire DataFrame if chunksize was not specified. Example: @@ -693,5 +693,5 @@ def close(self) -> None: import polars as pl super().close() - self._df_iter = DataFrameIterator(pl.DataFrame(), {}, []) + self._df_iter = PolarsDataFrameIterator(pl.DataFrame(), {}, []) self._iterrows = iter([]) diff --git a/tests/pyathena/pandas/test_cursor.py b/tests/pyathena/pandas/test_cursor.py index 160d0c87..fc12166f 100644 --- a/tests/pyathena/pandas/test_cursor.py +++ b/tests/pyathena/pandas/test_cursor.py @@ -15,7 +15,7 @@ from pyathena.error import DatabaseError, ProgrammingError from pyathena.pandas.cursor import PandasCursor -from pyathena.pandas.result_set import AthenaPandasResultSet, DataFrameIterator +from pyathena.pandas.result_set import AthenaPandasResultSet, PandasDataFrameIterator from tests import ENV from tests.pyathena.conftest import connect @@ -1207,7 +1207,7 @@ def test_pandas_cursor_auto_optimize_chunksize_enabled(self, pandas_cursor): # Should work without error (auto-optimization for small files may not trigger chunking) result = cursor.as_pandas() # Small test data likely won't trigger chunking, so expect DataFrame - assert isinstance(result, (pd.DataFrame, DataFrameIterator)) + assert isinstance(result, (pd.DataFrame, PandasDataFrameIterator)) def test_pandas_cursor_auto_optimize_chunksize_disabled(self, pandas_cursor): """Test PandasCursor with auto_optimize_chunksize disabled (default).""" @@ -1229,7 +1229,7 @@ def test_pandas_cursor_explicit_chunksize_overrides_auto_optimize(self, pandas_c # Should return iterator due to explicit chunksize result = cursor.as_pandas() - assert isinstance(result, DataFrameIterator) + assert isinstance(result, PandasDataFrameIterator) def test_pandas_cursor_iter_chunks_without_chunksize(self, pandas_cursor): """Test PandasCursor iter_chunks method without chunksize (single DataFrame).""" @@ -1283,7 +1283,7 @@ def test_pandas_cursor_actual_chunking_behavior(self, pandas_cursor): cursor.execute("SELECT * FROM many_rows LIMIT 50") result = cursor.as_pandas() - assert isinstance(result, DataFrameIterator) + assert isinstance(result, PandasDataFrameIterator) chunk_sizes = [] total_rows = 0 From 9c119b17abd449cf5af838d3ff6feeb013cda923 Mon Sep 17 00:00:00 2001 From: laughingman7743 Date: Sun, 4 Jan 2026 18:29:22 +0900 Subject: [PATCH 4/8] Optimize DataFrame operations for better performance MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Pandas iterrows(): Use itertuples() instead of to_dict("records") to avoid loading all rows into memory at once - Pandas _trunc_date(): Cache time column names in __init__ to avoid repeated list comprehension on each DataFrame chunk - Polars iterrows(): Replace inline lambda with module-level _identity function to avoid creating new function objects in hot path - Polars fetchone(): Cache column names in __init__ to avoid repeated _get_column_names() calls 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 --- pyathena/pandas/result_set.py | 24 ++++++++++++++++-------- pyathena/polars/result_set.py | 21 ++++++++++++++------- 2 files changed, 30 insertions(+), 15 deletions(-) diff --git a/pyathena/pandas/result_set.py b/pyathena/pandas/result_set.py index e002eba6..a605f086 100644 --- a/pyathena/pandas/result_set.py +++ b/pyathena/pandas/result_set.py @@ -127,8 +127,11 @@ def iterrows(self) -> Iterator[Tuple[int, Dict[str, Any]]]: """ row_num = 0 for df in self: - for row_dict in df.to_dict("records"): - yield (row_num, row_dict) + # Use itertuples for memory efficiency instead of to_dict("records") + # which loads all rows into memory at once + columns = df.columns.tolist() + for row in df.itertuples(index=False): + yield (row_num, dict(zip(columns, row, strict=True))) row_num += 1 def get_chunk(self, size: Optional[int] = None) -> "DataFrame": @@ -282,6 +285,13 @@ def __init__( self._data_manifest: List[str] = [] self._kwargs = kwargs self._fs = self.__s3_file_system() + + # Cache time column names for efficient _trunc_date processing + description = self.description if self.description else [] + self._time_columns: List[str] = [ + d[0] for d in description if d[1] in ("time", "time with time zone") + ] + if self.state == AthenaQueryExecution.STATE_SUCCEEDED and self.output_location: df = self._as_pandas() trunc_date = _no_trunc_date if self.is_unload else self._trunc_date @@ -451,12 +461,10 @@ def parse_dates(self) -> List[Optional[Any]]: return [d[0] for d in description if d[1] in self._PARSE_DATES] def _trunc_date(self, df: "DataFrame") -> "DataFrame": - description = self.description if self.description else [] - times = [d[0] for d in description if d[1] in ("time", "time with time zone")] - if times: - truncated = df.loc[:, times].apply(lambda r: r.dt.time) - for time in times: - df.isetitem(df.columns.get_loc(time), truncated[time]) + if self._time_columns: + truncated = df.loc[:, self._time_columns].apply(lambda r: r.dt.time) + for time_col in self._time_columns: + df.isetitem(df.columns.get_loc(time_col), truncated[time_col]) return df def fetchone( diff --git a/pyathena/polars/result_set.py b/pyathena/polars/result_set.py index 7d971840..e5e09176 100644 --- a/pyathena/polars/result_set.py +++ b/pyathena/polars/result_set.py @@ -34,6 +34,11 @@ _logger = logging.getLogger(__name__) +def _identity(x: Any) -> Any: + """Identity function for use as default converter.""" + return x + + class PolarsDataFrameIterator(abc.Iterator): # type: ignore """Iterator for chunked DataFrame results from Athena queries. @@ -124,9 +129,9 @@ def iterrows(self) -> Iterator[Tuple[int, Dict[str, Any]]]: row_num = 0 for df in self: for row_dict in df.iter_rows(named=True): - # Apply converters + # Apply converters (use module-level _identity to avoid creating lambdas) processed_row = { - col: self._converters.get(col, lambda x: x)(row_dict.get(col)) + col: self._converters.get(col, _identity)(row_dict.get(col)) for col in self._column_names } yield (row_num, processed_row) @@ -241,6 +246,9 @@ def __init__( self._chunksize = chunksize self._kwargs = kwargs + # Cache column names for efficient access in fetchone() + self._column_names_cache: List[str] = self._get_column_names() + # Build DataFrame iterator (handles both chunked and non-chunked cases) if self.state == AthenaQueryExecution.STATE_SUCCEEDED and self.output_location: self._df_iter = self._create_dataframe_iterator() @@ -248,7 +256,7 @@ def __init__( import polars as pl self._df_iter = PolarsDataFrameIterator( - pl.DataFrame(), self.converters, self._get_column_names() + pl.DataFrame(), self.converters, self._column_names_cache ) self._iterrows = self._df_iter.iterrows() @@ -335,7 +343,7 @@ def _create_dataframe_iterator(self) -> PolarsDataFrameIterator: # Non-chunked mode: load entire DataFrame reader = self._as_polars() - return PolarsDataFrameIterator(reader, self.converters, self._get_column_names()) + return PolarsDataFrameIterator(reader, self.converters, self._column_names_cache) def fetchone( self, @@ -351,8 +359,7 @@ def fetchone( return None else: self._rownumber = row[0] + 1 - column_names = self._get_column_names() - return tuple([row[1][col] for col in column_names]) + return tuple([row[1][col] for col in self._column_names_cache]) def fetchmany( self, size: Optional[int] = None @@ -586,7 +593,7 @@ def _get_csv_params(self) -> Tuple[str, bool, Optional[List[str]]]: if self.output_location and self.output_location.endswith(".txt"): separator = "\t" has_header = False - new_columns: Optional[List[str]] = self._get_column_names() + new_columns: Optional[List[str]] = self._column_names_cache else: separator = "," has_header = True From 559df202560cb091a301e7b8192988be14d0979d Mon Sep 17 00:00:00 2001 From: laughingman7743 Date: Sun, 4 Jan 2026 18:54:01 +0900 Subject: [PATCH 5/8] Fix column names cache initialization order for unload queries MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The _column_names_cache was being set before _create_dataframe_iterator() was called, but for unload queries, _as_polars() updates _metadata with the Parquet schema. This caused fetchone() to use stale column names that didn't match the actual DataFrame columns. Fix by moving cache initialization after _create_dataframe_iterator(), and using _get_column_names() directly in methods called during init. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 --- pyathena/polars/result_set.py | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/pyathena/polars/result_set.py b/pyathena/polars/result_set.py index e5e09176..73a84daf 100644 --- a/pyathena/polars/result_set.py +++ b/pyathena/polars/result_set.py @@ -246,18 +246,21 @@ def __init__( self._chunksize = chunksize self._kwargs = kwargs - # Cache column names for efficient access in fetchone() - self._column_names_cache: List[str] = self._get_column_names() - # Build DataFrame iterator (handles both chunked and non-chunked cases) + # Note: _create_dataframe_iterator() calls _as_polars() which may update + # _metadata for unload queries, so we must cache column names AFTER this. if self.state == AthenaQueryExecution.STATE_SUCCEEDED and self.output_location: self._df_iter = self._create_dataframe_iterator() else: import polars as pl self._df_iter = PolarsDataFrameIterator( - pl.DataFrame(), self.converters, self._column_names_cache + pl.DataFrame(), self.converters, self._get_column_names() ) + + # Cache column names for efficient access in fetchone() + # Must be after _create_dataframe_iterator() which updates _metadata for unload + self._column_names_cache: List[str] = self._get_column_names() self._iterrows = self._df_iter.iterrows() @property @@ -343,7 +346,7 @@ def _create_dataframe_iterator(self) -> PolarsDataFrameIterator: # Non-chunked mode: load entire DataFrame reader = self._as_polars() - return PolarsDataFrameIterator(reader, self.converters, self._column_names_cache) + return PolarsDataFrameIterator(reader, self.converters, self._get_column_names()) def fetchone( self, @@ -593,7 +596,7 @@ def _get_csv_params(self) -> Tuple[str, bool, Optional[List[str]]]: if self.output_location and self.output_location.endswith(".txt"): separator = "\t" has_header = False - new_columns: Optional[List[str]] = self._column_names_cache + new_columns: Optional[List[str]] = self._get_column_names() else: separator = "," has_header = True From e8873dd7c8f5d092b2c17a0bd0a1dd282d8b10da Mon Sep 17 00:00:00 2001 From: laughingman7743 Date: Sun, 4 Jan 2026 19:09:07 +0900 Subject: [PATCH 6/8] Remove unused _closed flag from PolarsDataFrameIterator MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Remove _closed flag that was not providing any real functionality - Update close() to properly close generator if reader is a generator - Align with PandasDataFrameIterator which doesn't use _closed flag 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 --- pyathena/polars/result_set.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/pyathena/polars/result_set.py b/pyathena/polars/result_set.py index 73a84daf..4deb5215 100644 --- a/pyathena/polars/result_set.py +++ b/pyathena/polars/result_set.py @@ -85,7 +85,6 @@ def __init__( self._reader = reader self._converters = converters self._column_names = column_names - self._closed = False def __next__(self) -> "pl.DataFrame": """Get the next DataFrame chunk. @@ -96,8 +95,6 @@ def __next__(self) -> "pl.DataFrame": Raises: StopIteration: When no more chunks are available. """ - if self._closed: - raise StopIteration try: return next(self._reader) except StopIteration: @@ -118,7 +115,10 @@ def __exit__(self, exc_type, exc_value, traceback) -> None: def close(self) -> None: """Close the iterator and release resources.""" - self._closed = True + from types import GeneratorType + + if isinstance(self._reader, GeneratorType): + self._reader.close() def iterrows(self) -> Iterator[Tuple[int, Dict[str, Any]]]: """Iterate over rows as (index, row_dict) tuples. From 93a7338fba9696c58bf9352053a9ff48de8e35a7 Mon Sep 17 00:00:00 2001 From: laughingman7743 Date: Sun, 4 Jan 2026 19:13:30 +0900 Subject: [PATCH 7/8] Add PolarsDataFrameIterator documentation to polars.rst MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Document the PolarsDataFrameIterator class and its as_polars() method for consistency with pandas.rst documentation. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 --- docs/polars.rst | 45 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 45 insertions(+) diff --git a/docs/polars.rst b/docs/polars.rst index fb0d59b7..0651cb36 100644 --- a/docs/polars.rst +++ b/docs/polars.rst @@ -334,6 +334,51 @@ The chunked iteration also works with the unload option: # Process Parquet data in chunks process_chunk(chunk) +When the chunksize option is used, the object returned by the ``as_polars`` method is a ``PolarsDataFrameIterator`` object. +This object provides the same chunked iteration interface and can be used in the same way: + +.. code:: python + + from pyathena import connect + from pyathena.polars.cursor import PolarsCursor + + cursor = connect(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/", + region_name="us-west-2", + cursor_class=PolarsCursor).cursor(chunksize=50_000) + df_iter = cursor.execute("SELECT * FROM many_rows").as_polars() + for df in df_iter: + print(df.describe()) + print(df.head()) + +The ``PolarsDataFrameIterator`` also has an ``as_polars()`` method that collects all chunks into a single DataFrame: + +.. code:: python + + from pyathena import connect + from pyathena.polars.cursor import PolarsCursor + + cursor = connect(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/", + region_name="us-west-2", + cursor_class=PolarsCursor).cursor(chunksize=50_000) + df_iter = cursor.execute("SELECT * FROM many_rows").as_polars() + df = df_iter.as_polars() # Collect all chunks into a single DataFrame + +This is equivalent to using `polars.concat`_: + +.. code:: python + + import polars as pl + from pyathena import connect + from pyathena.polars.cursor import PolarsCursor + + cursor = connect(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/", + region_name="us-west-2", + cursor_class=PolarsCursor).cursor(chunksize=50_000) + df_iter = cursor.execute("SELECT * FROM many_rows").as_polars() + df = pl.concat(list(df_iter)) + +.. _`polars.concat`: https://docs.pola.rs/api/python/stable/reference/api/polars.concat.html + .. _async-polars-cursor: AsyncPolarsCursor From b6d27f0d62f53ebba1f4035f6cafb87b6507ed06 Mon Sep 17 00:00:00 2001 From: laughingman7743 Date: Sun, 4 Jan 2026 19:24:13 +0900 Subject: [PATCH 8/8] Add Polars to extra packages table in README MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Added missing Polars entry to the installation extra packages table, documenting the pip install command and version requirement (>=1.0.0). 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 --- README.rst | 2 ++ 1 file changed, 2 insertions(+) diff --git a/README.rst b/README.rst index 253a485a..11c197e3 100644 --- a/README.rst +++ b/README.rst @@ -62,6 +62,8 @@ Extra packages: +---------------+---------------------------------------+------------------+ | Arrow | ``pip install PyAthena[Arrow]`` | >=10.0.0 | +---------------+---------------------------------------+------------------+ +| Polars | ``pip install PyAthena[Polars]`` | >=1.0.0 | ++---------------+---------------------------------------+------------------+ .. _usage: