diff --git a/README.rst b/README.rst index 253a485a..11c197e3 100644 --- a/README.rst +++ b/README.rst @@ -62,6 +62,8 @@ Extra packages: +---------------+---------------------------------------+------------------+ | Arrow | ``pip install PyAthena[Arrow]`` | >=10.0.0 | +---------------+---------------------------------------+------------------+ +| Polars | ``pip install PyAthena[Polars]`` | >=1.0.0 | ++---------------+---------------------------------------+------------------+ .. _usage: diff --git a/docs/api/pandas.rst b/docs/api/pandas.rst index bfc2d6bd..095aacb5 100644 --- a/docs/api/pandas.rst +++ b/docs/api/pandas.rst @@ -23,7 +23,7 @@ Pandas Result Set :members: :inherited-members: -.. autoclass:: pyathena.pandas.result_set.DataFrameIterator +.. autoclass:: pyathena.pandas.result_set.PandasDataFrameIterator :members: Pandas Data Converters diff --git a/docs/api/polars.rst b/docs/api/polars.rst index 5d4998f4..059ee6c1 100644 --- a/docs/api/polars.rst +++ b/docs/api/polars.rst @@ -23,6 +23,9 @@ Polars Result Set :members: :inherited-members: +.. autoclass:: pyathena.polars.result_set.PolarsDataFrameIterator + :members: + Polars Data Converters ---------------------- diff --git a/docs/pandas.rst b/docs/pandas.rst index a8ce3274..998a6d78 100644 --- a/docs/pandas.rst +++ b/docs/pandas.rst @@ -381,7 +381,7 @@ SQLAlchemy allows this option to be specified in the connection string. awsathena+pandas://:@athena.{region_name}.amazonaws.com:443/{schema_name}?s3_staging_dir={s3_staging_dir}&chunksize=1000000... -When this option is used, the object returned by the as_pandas method is a ``DataFrameIterator`` object. +When this option is used, the object returned by the as_pandas method is a ``PandasDataFrameIterator`` object. This object has exactly the same interface as the ``TextFileReader`` object and can be handled in the same way. .. code:: python @@ -418,7 +418,20 @@ PandasCursor provides an ``iter_chunks()`` method for convenient chunked process # Memory can be freed after each chunk del chunk -You can also concatenate them into a single `pandas.DataFrame object`_ using `pandas.concat`_. +The ``PandasDataFrameIterator`` also has an ``as_pandas()`` method that collects all chunks into a single DataFrame: + +.. code:: python + + from pyathena import connect + from pyathena.pandas.cursor import PandasCursor + + cursor = connect(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/", + region_name="us-west-2", + cursor_class=PandasCursor).cursor() + df_iter = cursor.execute("SELECT * FROM many_rows", chunksize=1_000_000).as_pandas() + df = df_iter.as_pandas() # Collect all chunks into a single DataFrame + +This is equivalent to using `pandas.concat`_: .. code:: python diff --git a/docs/polars.rst b/docs/polars.rst index fb0d59b7..0651cb36 100644 --- a/docs/polars.rst +++ b/docs/polars.rst @@ -334,6 +334,51 @@ The chunked iteration also works with the unload option: # Process Parquet data in chunks process_chunk(chunk) +When the chunksize option is used, the object returned by the ``as_polars`` method is a ``PolarsDataFrameIterator`` object. +This object provides the same chunked iteration interface and can be used in the same way: + +.. code:: python + + from pyathena import connect + from pyathena.polars.cursor import PolarsCursor + + cursor = connect(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/", + region_name="us-west-2", + cursor_class=PolarsCursor).cursor(chunksize=50_000) + df_iter = cursor.execute("SELECT * FROM many_rows").as_polars() + for df in df_iter: + print(df.describe()) + print(df.head()) + +The ``PolarsDataFrameIterator`` also has an ``as_polars()`` method that collects all chunks into a single DataFrame: + +.. code:: python + + from pyathena import connect + from pyathena.polars.cursor import PolarsCursor + + cursor = connect(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/", + region_name="us-west-2", + cursor_class=PolarsCursor).cursor(chunksize=50_000) + df_iter = cursor.execute("SELECT * FROM many_rows").as_polars() + df = df_iter.as_polars() # Collect all chunks into a single DataFrame + +This is equivalent to using `polars.concat`_: + +.. code:: python + + import polars as pl + from pyathena import connect + from pyathena.polars.cursor import PolarsCursor + + cursor = connect(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/", + region_name="us-west-2", + cursor_class=PolarsCursor).cursor(chunksize=50_000) + df_iter = cursor.execute("SELECT * FROM many_rows").as_polars() + df = pl.concat(list(df_iter)) + +.. _`polars.concat`: https://docs.pola.rs/api/python/stable/reference/api/polars.concat.html + .. _async-polars-cursor: AsyncPolarsCursor diff --git a/pyathena/pandas/cursor.py b/pyathena/pandas/cursor.py index 470adc5c..433e538e 100644 --- a/pyathena/pandas/cursor.py +++ b/pyathena/pandas/cursor.py @@ -25,7 +25,7 @@ DefaultPandasTypeConverter, DefaultPandasUnloadTypeConverter, ) -from pyathena.pandas.result_set import AthenaPandasResultSet, DataFrameIterator +from pyathena.pandas.result_set import AthenaPandasResultSet, PandasDataFrameIterator from pyathena.result_set import WithResultSet if TYPE_CHECKING: @@ -331,11 +331,11 @@ def fetchall( result_set = cast(AthenaPandasResultSet, self.result_set) return result_set.fetchall() - def as_pandas(self) -> Union["DataFrame", DataFrameIterator]: - """Return DataFrame or DataFrameIterator based on chunksize setting. + def as_pandas(self) -> Union["DataFrame", PandasDataFrameIterator]: + """Return DataFrame or PandasDataFrameIterator based on chunksize setting. Returns: - DataFrame when chunksize is None, DataFrameIterator when chunksize is set. + DataFrame when chunksize is None, PandasDataFrameIterator when chunksize is set. """ if not self.has_result_set: raise ProgrammingError("No result set.") @@ -380,18 +380,13 @@ def iter_chunks(self) -> Generator["DataFrame", None, None]: """ if not self.has_result_set: raise ProgrammingError("No result set.") + result_set = cast(AthenaPandasResultSet, self.result_set) - result = self.as_pandas() - if isinstance(result, DataFrameIterator): - # It's an iterator (chunked mode) - import gc + import gc - for chunk_count, chunk in enumerate(result, 1): - yield chunk + for chunk_count, chunk in enumerate(result_set.iter_chunks(), 1): + yield chunk - # Suggest garbage collection every 10 chunks for large datasets - if chunk_count % 10 == 0: - gc.collect() - else: - # Single DataFrame - yield as one chunk - yield result + # Suggest garbage collection every 10 chunks for large datasets + if chunk_count % 10 == 0: + gc.collect() diff --git a/pyathena/pandas/result_set.py b/pyathena/pandas/result_set.py index 2700b354..a605f086 100644 --- a/pyathena/pandas/result_set.py +++ b/pyathena/pandas/result_set.py @@ -38,7 +38,7 @@ def _no_trunc_date(df: "DataFrame") -> "DataFrame": return df -class DataFrameIterator(abc.Iterator): # type: ignore +class PandasDataFrameIterator(abc.Iterator): # type: ignore """Iterator for chunked DataFrame results from Athena queries. This class wraps either a pandas TextFileReader (for chunked reading) or @@ -68,6 +68,12 @@ def __init__( reader: Union["TextFileReader", "DataFrame"], trunc_date: Callable[["DataFrame"], "DataFrame"], ) -> None: + """Initialize the iterator. + + Args: + reader: Either a TextFileReader (for chunked) or a single DataFrame. + trunc_date: Function to apply date truncation to each chunk. + """ from pandas import DataFrame if isinstance(reader, DataFrame): @@ -76,7 +82,15 @@ def __init__( self._reader = reader self._trunc_date = trunc_date - def __next__(self): + def __next__(self) -> "DataFrame": + """Get the next DataFrame chunk. + + Returns: + The next pandas DataFrame chunk with date truncation applied. + + Raises: + StopIteration: When no more chunks are available. + """ try: df = next(self._reader) return self._trunc_date(df) @@ -84,33 +98,72 @@ def __next__(self): self.close() raise - def __iter__(self): + def __iter__(self) -> "PandasDataFrameIterator": + """Return self as iterator.""" return self - def __enter__(self): + def __enter__(self) -> "PandasDataFrameIterator": + """Context manager entry.""" return self - def __exit__(self, exc_type, exc_value, traceback): + def __exit__(self, exc_type, exc_value, traceback) -> None: + """Context manager exit.""" self.close() def close(self) -> None: + """Close the iterator and release resources.""" from pandas.io.parsers import TextFileReader if isinstance(self._reader, TextFileReader): self._reader.close() - def iterrows(self) -> Iterator[Any]: + def iterrows(self) -> Iterator[Tuple[int, Dict[str, Any]]]: + """Iterate over rows as (index, row_dict) tuples. + + Row indices are continuous across all chunks, starting from 0. + + Yields: + Tuple of (row_index, row_dict) for each row across all chunks. + """ + row_num = 0 for df in self: - for row in enumerate(df.to_dict("records")): - yield row + # Use itertuples for memory efficiency instead of to_dict("records") + # which loads all rows into memory at once + columns = df.columns.tolist() + for row in df.itertuples(index=False): + yield (row_num, dict(zip(columns, row, strict=True))) + row_num += 1 + + def get_chunk(self, size: Optional[int] = None) -> "DataFrame": + """Get a chunk of specified size. + + Args: + size: Number of rows to retrieve. If None, returns entire chunk. - def get_chunk(self, size=None): + Returns: + DataFrame chunk. + """ from pandas.io.parsers import TextFileReader if isinstance(self._reader, TextFileReader): return self._reader.get_chunk(size) return next(self._reader) + def as_pandas(self) -> "DataFrame": + """Collect all chunks into a single DataFrame. + + Returns: + Single pandas DataFrame containing all data. + """ + import pandas as pd + + dfs: List["DataFrame"] = list(self) + if not dfs: + return pd.DataFrame() + if len(dfs) == 1: + return dfs[0] + return pd.concat(dfs, ignore_index=True) + class AthenaPandasResultSet(AthenaResultSet): """Result set that provides pandas DataFrame results with memory optimization. @@ -232,14 +285,21 @@ def __init__( self._data_manifest: List[str] = [] self._kwargs = kwargs self._fs = self.__s3_file_system() + + # Cache time column names for efficient _trunc_date processing + description = self.description if self.description else [] + self._time_columns: List[str] = [ + d[0] for d in description if d[1] in ("time", "time with time zone") + ] + if self.state == AthenaQueryExecution.STATE_SUCCEEDED and self.output_location: df = self._as_pandas() trunc_date = _no_trunc_date if self.is_unload else self._trunc_date - self._df_iter = DataFrameIterator(df, trunc_date) + self._df_iter = PandasDataFrameIterator(df, trunc_date) else: import pandas as pd - self._df_iter = DataFrameIterator(pd.DataFrame(), _no_trunc_date) + self._df_iter = PandasDataFrameIterator(pd.DataFrame(), _no_trunc_date) self._iterrows = self._df_iter.iterrows() def _get_parquet_engine(self) -> str: @@ -401,12 +461,10 @@ def parse_dates(self) -> List[Optional[Any]]: return [d[0] for d in description if d[1] in self._PARSE_DATES] def _trunc_date(self, df: "DataFrame") -> "DataFrame": - description = self.description if self.description else [] - times = [d[0] for d in description if d[1] in ("time", "time with time zone")] - if times: - truncated = df.loc[:, times].apply(lambda r: r.dt.time) - for time in times: - df.isetitem(df.columns.get_loc(time), truncated[time]) + if self._time_columns: + truncated = df.loc[:, self._time_columns].apply(lambda r: r.dt.time) + for time_col in self._time_columns: + df.isetitem(df.columns.get_loc(time_col), truncated[time_col]) return df def fetchone( @@ -620,15 +678,42 @@ def _as_pandas(self) -> Union["TextFileReader", "DataFrame"]: df = self._read_csv() return df - def as_pandas(self) -> Union[DataFrameIterator, "DataFrame"]: + def as_pandas(self) -> Union[PandasDataFrameIterator, "DataFrame"]: if self._chunksize is None: return next(self._df_iter) return self._df_iter + def iter_chunks(self) -> PandasDataFrameIterator: + """Iterate over result chunks as pandas DataFrames. + + This method provides an iterator interface for processing large result sets. + When chunksize is specified, it yields DataFrames in chunks for memory-efficient + processing. When chunksize is not specified, it yields the entire result as a + single DataFrame. + + Returns: + PandasDataFrameIterator that yields pandas DataFrames for each chunk + of rows, or the entire DataFrame if chunksize was not specified. + + Example: + >>> # With chunking for large datasets + >>> cursor = connection.cursor(PandasCursor, chunksize=50000) + >>> cursor.execute("SELECT * FROM large_table") + >>> for chunk in cursor.iter_chunks(): + ... process_chunk(chunk) # Each chunk is a pandas DataFrame + >>> + >>> # Without chunking - yields entire result as single chunk + >>> cursor = connection.cursor(PandasCursor) + >>> cursor.execute("SELECT * FROM small_table") + >>> for df in cursor.iter_chunks(): + ... process(df) # Single DataFrame with all data + """ + return self._df_iter + def close(self) -> None: import pandas as pd super().close() - self._df_iter = DataFrameIterator(pd.DataFrame(), _no_trunc_date) + self._df_iter = PandasDataFrameIterator(pd.DataFrame(), _no_trunc_date) self._iterrows = enumerate([]) self._data_manifest = [] diff --git a/pyathena/polars/result_set.py b/pyathena/polars/result_set.py index 7cd27417..4deb5215 100644 --- a/pyathena/polars/result_set.py +++ b/pyathena/polars/result_set.py @@ -34,7 +34,12 @@ _logger = logging.getLogger(__name__) -class DataFrameIterator(abc.Iterator): # type: ignore +def _identity(x: Any) -> Any: + """Identity function for use as default converter.""" + return x + + +class PolarsDataFrameIterator(abc.Iterator): # type: ignore """Iterator for chunked DataFrame results from Athena queries. This class wraps either a Polars DataFrame iterator (for chunked reading) or @@ -80,7 +85,6 @@ def __init__( self._reader = reader self._converters = converters self._column_names = column_names - self._closed = False def __next__(self) -> "pl.DataFrame": """Get the next DataFrame chunk. @@ -91,19 +95,17 @@ def __next__(self) -> "pl.DataFrame": Raises: StopIteration: When no more chunks are available. """ - if self._closed: - raise StopIteration try: return next(self._reader) except StopIteration: self.close() raise - def __iter__(self) -> "DataFrameIterator": + def __iter__(self) -> "PolarsDataFrameIterator": """Return self as iterator.""" return self - def __enter__(self) -> "DataFrameIterator": + def __enter__(self) -> "PolarsDataFrameIterator": """Context manager entry.""" return self @@ -113,7 +115,10 @@ def __exit__(self, exc_type, exc_value, traceback) -> None: def close(self) -> None: """Close the iterator and release resources.""" - self._closed = True + from types import GeneratorType + + if isinstance(self._reader, GeneratorType): + self._reader.close() def iterrows(self) -> Iterator[Tuple[int, Dict[str, Any]]]: """Iterate over rows as (index, row_dict) tuples. @@ -124,9 +129,9 @@ def iterrows(self) -> Iterator[Tuple[int, Dict[str, Any]]]: row_num = 0 for df in self: for row_dict in df.iter_rows(named=True): - # Apply converters + # Apply converters (use module-level _identity to avoid creating lambdas) processed_row = { - col: self._converters.get(col, lambda x: x)(row_dict.get(col)) + col: self._converters.get(col, _identity)(row_dict.get(col)) for col in self._column_names } yield (row_num, processed_row) @@ -242,14 +247,20 @@ def __init__( self._kwargs = kwargs # Build DataFrame iterator (handles both chunked and non-chunked cases) + # Note: _create_dataframe_iterator() calls _as_polars() which may update + # _metadata for unload queries, so we must cache column names AFTER this. if self.state == AthenaQueryExecution.STATE_SUCCEEDED and self.output_location: self._df_iter = self._create_dataframe_iterator() else: import polars as pl - self._df_iter = DataFrameIterator( + self._df_iter = PolarsDataFrameIterator( pl.DataFrame(), self.converters, self._get_column_names() ) + + # Cache column names for efficient access in fetchone() + # Must be after _create_dataframe_iterator() which updates _metadata for unload + self._column_names_cache: List[str] = self._get_column_names() self._iterrows = self._df_iter.iterrows() @property @@ -320,11 +331,11 @@ def _get_column_names(self) -> List[str]: description = self.description if self.description else [] return [d[0] for d in description] - def _create_dataframe_iterator(self) -> DataFrameIterator: + def _create_dataframe_iterator(self) -> PolarsDataFrameIterator: """Create a DataFrame iterator for the result set. Returns: - DataFrameIterator that handles both chunked and non-chunked cases. + PolarsDataFrameIterator that handles both chunked and non-chunked cases. """ if self._chunksize is not None: # Chunked mode: create lazy iterator @@ -335,7 +346,7 @@ def _create_dataframe_iterator(self) -> DataFrameIterator: # Non-chunked mode: load entire DataFrame reader = self._as_polars() - return DataFrameIterator(reader, self.converters, self._get_column_names()) + return PolarsDataFrameIterator(reader, self.converters, self._get_column_names()) def fetchone( self, @@ -351,8 +362,7 @@ def fetchone( return None else: self._rownumber = row[0] + 1 - column_names = self._get_column_names() - return tuple([row[1][col] for col in column_names]) + return tuple([row[1][col] for col in self._column_names_cache]) def fetchmany( self, size: Optional[int] = None @@ -661,7 +671,7 @@ def _iter_parquet_chunks(self) -> Iterator["pl.DataFrame"]: _logger.exception(f"Failed to read {self._unload_location}.") raise OperationalError(*e.args) from e - def iter_chunks(self) -> DataFrameIterator: + def iter_chunks(self) -> PolarsDataFrameIterator: """Iterate over result chunks as Polars DataFrames. This method provides an iterator interface for processing large result sets. @@ -670,7 +680,7 @@ def iter_chunks(self) -> DataFrameIterator: it yields the entire result as a single DataFrame. Returns: - DataFrameIterator that yields Polars DataFrames for each chunk + PolarsDataFrameIterator that yields Polars DataFrames for each chunk of rows, or the entire DataFrame if chunksize was not specified. Example: @@ -693,5 +703,5 @@ def close(self) -> None: import polars as pl super().close() - self._df_iter = DataFrameIterator(pl.DataFrame(), {}, []) + self._df_iter = PolarsDataFrameIterator(pl.DataFrame(), {}, []) self._iterrows = iter([]) diff --git a/tests/pyathena/pandas/test_cursor.py b/tests/pyathena/pandas/test_cursor.py index 160d0c87..fc12166f 100644 --- a/tests/pyathena/pandas/test_cursor.py +++ b/tests/pyathena/pandas/test_cursor.py @@ -15,7 +15,7 @@ from pyathena.error import DatabaseError, ProgrammingError from pyathena.pandas.cursor import PandasCursor -from pyathena.pandas.result_set import AthenaPandasResultSet, DataFrameIterator +from pyathena.pandas.result_set import AthenaPandasResultSet, PandasDataFrameIterator from tests import ENV from tests.pyathena.conftest import connect @@ -1207,7 +1207,7 @@ def test_pandas_cursor_auto_optimize_chunksize_enabled(self, pandas_cursor): # Should work without error (auto-optimization for small files may not trigger chunking) result = cursor.as_pandas() # Small test data likely won't trigger chunking, so expect DataFrame - assert isinstance(result, (pd.DataFrame, DataFrameIterator)) + assert isinstance(result, (pd.DataFrame, PandasDataFrameIterator)) def test_pandas_cursor_auto_optimize_chunksize_disabled(self, pandas_cursor): """Test PandasCursor with auto_optimize_chunksize disabled (default).""" @@ -1229,7 +1229,7 @@ def test_pandas_cursor_explicit_chunksize_overrides_auto_optimize(self, pandas_c # Should return iterator due to explicit chunksize result = cursor.as_pandas() - assert isinstance(result, DataFrameIterator) + assert isinstance(result, PandasDataFrameIterator) def test_pandas_cursor_iter_chunks_without_chunksize(self, pandas_cursor): """Test PandasCursor iter_chunks method without chunksize (single DataFrame).""" @@ -1283,7 +1283,7 @@ def test_pandas_cursor_actual_chunking_behavior(self, pandas_cursor): cursor.execute("SELECT * FROM many_rows LIMIT 50") result = cursor.as_pandas() - assert isinstance(result, DataFrameIterator) + assert isinstance(result, PandasDataFrameIterator) chunk_sizes = [] total_rows = 0