Skip to content
2 changes: 2 additions & 0 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ Extra packages:
+---------------+---------------------------------------+------------------+
| Arrow | ``pip install PyAthena[Arrow]`` | >=10.0.0 |
+---------------+---------------------------------------+------------------+
| Polars | ``pip install PyAthena[Polars]`` | >=1.0.0 |
+---------------+---------------------------------------+------------------+

.. _usage:

Expand Down
2 changes: 1 addition & 1 deletion docs/api/pandas.rst
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ Pandas Result Set
:members:
:inherited-members:

.. autoclass:: pyathena.pandas.result_set.DataFrameIterator
.. autoclass:: pyathena.pandas.result_set.PandasDataFrameIterator
:members:

Pandas Data Converters
Expand Down
3 changes: 3 additions & 0 deletions docs/api/polars.rst
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ Polars Result Set
:members:
:inherited-members:

.. autoclass:: pyathena.polars.result_set.PolarsDataFrameIterator
:members:

Polars Data Converters
----------------------

Expand Down
17 changes: 15 additions & 2 deletions docs/pandas.rst
Original file line number Diff line number Diff line change
Expand Up @@ -381,7 +381,7 @@ SQLAlchemy allows this option to be specified in the connection string.

awsathena+pandas://:@athena.{region_name}.amazonaws.com:443/{schema_name}?s3_staging_dir={s3_staging_dir}&chunksize=1000000...

When this option is used, the object returned by the as_pandas method is a ``DataFrameIterator`` object.
When this option is used, the object returned by the as_pandas method is a ``PandasDataFrameIterator`` object.
This object has exactly the same interface as the ``TextFileReader`` object and can be handled in the same way.

.. code:: python
Expand Down Expand Up @@ -418,7 +418,20 @@ PandasCursor provides an ``iter_chunks()`` method for convenient chunked process
# Memory can be freed after each chunk
del chunk

You can also concatenate them into a single `pandas.DataFrame object`_ using `pandas.concat`_.
The ``PandasDataFrameIterator`` also has an ``as_pandas()`` method that collects all chunks into a single DataFrame:

.. code:: python

from pyathena import connect
from pyathena.pandas.cursor import PandasCursor

cursor = connect(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/",
region_name="us-west-2",
cursor_class=PandasCursor).cursor()
df_iter = cursor.execute("SELECT * FROM many_rows", chunksize=1_000_000).as_pandas()
df = df_iter.as_pandas() # Collect all chunks into a single DataFrame

This is equivalent to using `pandas.concat`_:

.. code:: python

Expand Down
45 changes: 45 additions & 0 deletions docs/polars.rst
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,51 @@ The chunked iteration also works with the unload option:
# Process Parquet data in chunks
process_chunk(chunk)

When the chunksize option is used, the object returned by the ``as_polars`` method is a ``PolarsDataFrameIterator`` object.
This object provides the same chunked iteration interface and can be used in the same way:

.. code:: python

from pyathena import connect
from pyathena.polars.cursor import PolarsCursor

cursor = connect(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/",
region_name="us-west-2",
cursor_class=PolarsCursor).cursor(chunksize=50_000)
df_iter = cursor.execute("SELECT * FROM many_rows").as_polars()
for df in df_iter:
print(df.describe())
print(df.head())

The ``PolarsDataFrameIterator`` also has an ``as_polars()`` method that collects all chunks into a single DataFrame:

.. code:: python

from pyathena import connect
from pyathena.polars.cursor import PolarsCursor

cursor = connect(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/",
region_name="us-west-2",
cursor_class=PolarsCursor).cursor(chunksize=50_000)
df_iter = cursor.execute("SELECT * FROM many_rows").as_polars()
df = df_iter.as_polars() # Collect all chunks into a single DataFrame

This is equivalent to using `polars.concat`_:

.. code:: python

import polars as pl
from pyathena import connect
from pyathena.polars.cursor import PolarsCursor

cursor = connect(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/",
region_name="us-west-2",
cursor_class=PolarsCursor).cursor(chunksize=50_000)
df_iter = cursor.execute("SELECT * FROM many_rows").as_polars()
df = pl.concat(list(df_iter))

.. _`polars.concat`: https://docs.pola.rs/api/python/stable/reference/api/polars.concat.html

.. _async-polars-cursor:

AsyncPolarsCursor
Expand Down
27 changes: 11 additions & 16 deletions pyathena/pandas/cursor.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
DefaultPandasTypeConverter,
DefaultPandasUnloadTypeConverter,
)
from pyathena.pandas.result_set import AthenaPandasResultSet, DataFrameIterator
from pyathena.pandas.result_set import AthenaPandasResultSet, PandasDataFrameIterator
from pyathena.result_set import WithResultSet

if TYPE_CHECKING:
Expand Down Expand Up @@ -331,11 +331,11 @@ def fetchall(
result_set = cast(AthenaPandasResultSet, self.result_set)
return result_set.fetchall()

def as_pandas(self) -> Union["DataFrame", DataFrameIterator]:
"""Return DataFrame or DataFrameIterator based on chunksize setting.
def as_pandas(self) -> Union["DataFrame", PandasDataFrameIterator]:
"""Return DataFrame or PandasDataFrameIterator based on chunksize setting.

Returns:
DataFrame when chunksize is None, DataFrameIterator when chunksize is set.
DataFrame when chunksize is None, PandasDataFrameIterator when chunksize is set.
"""
if not self.has_result_set:
raise ProgrammingError("No result set.")
Expand Down Expand Up @@ -380,18 +380,13 @@ def iter_chunks(self) -> Generator["DataFrame", None, None]:
"""
if not self.has_result_set:
raise ProgrammingError("No result set.")
result_set = cast(AthenaPandasResultSet, self.result_set)

result = self.as_pandas()
if isinstance(result, DataFrameIterator):
# It's an iterator (chunked mode)
import gc
import gc

for chunk_count, chunk in enumerate(result, 1):
yield chunk
for chunk_count, chunk in enumerate(result_set.iter_chunks(), 1):
yield chunk

# Suggest garbage collection every 10 chunks for large datasets
if chunk_count % 10 == 0:
gc.collect()
else:
# Single DataFrame - yield as one chunk
yield result
# Suggest garbage collection every 10 chunks for large datasets
if chunk_count % 10 == 0:
gc.collect()
123 changes: 104 additions & 19 deletions pyathena/pandas/result_set.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ def _no_trunc_date(df: "DataFrame") -> "DataFrame":
return df


class DataFrameIterator(abc.Iterator): # type: ignore
class PandasDataFrameIterator(abc.Iterator): # type: ignore
"""Iterator for chunked DataFrame results from Athena queries.

This class wraps either a pandas TextFileReader (for chunked reading) or
Expand Down Expand Up @@ -68,6 +68,12 @@ def __init__(
reader: Union["TextFileReader", "DataFrame"],
trunc_date: Callable[["DataFrame"], "DataFrame"],
) -> None:
"""Initialize the iterator.

Args:
reader: Either a TextFileReader (for chunked) or a single DataFrame.
trunc_date: Function to apply date truncation to each chunk.
"""
from pandas import DataFrame

if isinstance(reader, DataFrame):
Expand All @@ -76,41 +82,88 @@ def __init__(
self._reader = reader
self._trunc_date = trunc_date

def __next__(self):
def __next__(self) -> "DataFrame":
"""Get the next DataFrame chunk.

Returns:
The next pandas DataFrame chunk with date truncation applied.

Raises:
StopIteration: When no more chunks are available.
"""
try:
df = next(self._reader)
return self._trunc_date(df)
except StopIteration:
self.close()
raise

def __iter__(self):
def __iter__(self) -> "PandasDataFrameIterator":
"""Return self as iterator."""
return self

def __enter__(self):
def __enter__(self) -> "PandasDataFrameIterator":
"""Context manager entry."""
return self

def __exit__(self, exc_type, exc_value, traceback):
def __exit__(self, exc_type, exc_value, traceback) -> None:
"""Context manager exit."""
self.close()

def close(self) -> None:
"""Close the iterator and release resources."""
from pandas.io.parsers import TextFileReader

if isinstance(self._reader, TextFileReader):
self._reader.close()

def iterrows(self) -> Iterator[Any]:
def iterrows(self) -> Iterator[Tuple[int, Dict[str, Any]]]:
"""Iterate over rows as (index, row_dict) tuples.

Row indices are continuous across all chunks, starting from 0.

Yields:
Tuple of (row_index, row_dict) for each row across all chunks.
"""
row_num = 0
for df in self:
for row in enumerate(df.to_dict("records")):
yield row
# Use itertuples for memory efficiency instead of to_dict("records")
# which loads all rows into memory at once
columns = df.columns.tolist()
for row in df.itertuples(index=False):
yield (row_num, dict(zip(columns, row, strict=True)))
row_num += 1

def get_chunk(self, size: Optional[int] = None) -> "DataFrame":
"""Get a chunk of specified size.

Args:
size: Number of rows to retrieve. If None, returns entire chunk.

def get_chunk(self, size=None):
Returns:
DataFrame chunk.
"""
from pandas.io.parsers import TextFileReader

if isinstance(self._reader, TextFileReader):
return self._reader.get_chunk(size)
return next(self._reader)

def as_pandas(self) -> "DataFrame":
"""Collect all chunks into a single DataFrame.

Returns:
Single pandas DataFrame containing all data.
"""
import pandas as pd

dfs: List["DataFrame"] = list(self)
if not dfs:
return pd.DataFrame()
if len(dfs) == 1:
return dfs[0]
return pd.concat(dfs, ignore_index=True)


class AthenaPandasResultSet(AthenaResultSet):
"""Result set that provides pandas DataFrame results with memory optimization.
Expand Down Expand Up @@ -232,14 +285,21 @@ def __init__(
self._data_manifest: List[str] = []
self._kwargs = kwargs
self._fs = self.__s3_file_system()

# Cache time column names for efficient _trunc_date processing
description = self.description if self.description else []
self._time_columns: List[str] = [
d[0] for d in description if d[1] in ("time", "time with time zone")
]

if self.state == AthenaQueryExecution.STATE_SUCCEEDED and self.output_location:
df = self._as_pandas()
trunc_date = _no_trunc_date if self.is_unload else self._trunc_date
self._df_iter = DataFrameIterator(df, trunc_date)
self._df_iter = PandasDataFrameIterator(df, trunc_date)
else:
import pandas as pd

self._df_iter = DataFrameIterator(pd.DataFrame(), _no_trunc_date)
self._df_iter = PandasDataFrameIterator(pd.DataFrame(), _no_trunc_date)
self._iterrows = self._df_iter.iterrows()

def _get_parquet_engine(self) -> str:
Expand Down Expand Up @@ -401,12 +461,10 @@ def parse_dates(self) -> List[Optional[Any]]:
return [d[0] for d in description if d[1] in self._PARSE_DATES]

def _trunc_date(self, df: "DataFrame") -> "DataFrame":
description = self.description if self.description else []
times = [d[0] for d in description if d[1] in ("time", "time with time zone")]
if times:
truncated = df.loc[:, times].apply(lambda r: r.dt.time)
for time in times:
df.isetitem(df.columns.get_loc(time), truncated[time])
if self._time_columns:
truncated = df.loc[:, self._time_columns].apply(lambda r: r.dt.time)
for time_col in self._time_columns:
df.isetitem(df.columns.get_loc(time_col), truncated[time_col])
return df

def fetchone(
Expand Down Expand Up @@ -620,15 +678,42 @@ def _as_pandas(self) -> Union["TextFileReader", "DataFrame"]:
df = self._read_csv()
return df

def as_pandas(self) -> Union[DataFrameIterator, "DataFrame"]:
def as_pandas(self) -> Union[PandasDataFrameIterator, "DataFrame"]:
if self._chunksize is None:
return next(self._df_iter)
return self._df_iter

def iter_chunks(self) -> PandasDataFrameIterator:
"""Iterate over result chunks as pandas DataFrames.

This method provides an iterator interface for processing large result sets.
When chunksize is specified, it yields DataFrames in chunks for memory-efficient
processing. When chunksize is not specified, it yields the entire result as a
single DataFrame.

Returns:
PandasDataFrameIterator that yields pandas DataFrames for each chunk
of rows, or the entire DataFrame if chunksize was not specified.

Example:
>>> # With chunking for large datasets
>>> cursor = connection.cursor(PandasCursor, chunksize=50000)
>>> cursor.execute("SELECT * FROM large_table")
>>> for chunk in cursor.iter_chunks():
... process_chunk(chunk) # Each chunk is a pandas DataFrame
>>>
>>> # Without chunking - yields entire result as single chunk
>>> cursor = connection.cursor(PandasCursor)
>>> cursor.execute("SELECT * FROM small_table")
>>> for df in cursor.iter_chunks():
... process(df) # Single DataFrame with all data
"""
return self._df_iter

def close(self) -> None:
import pandas as pd

super().close()
self._df_iter = DataFrameIterator(pd.DataFrame(), _no_trunc_date)
self._df_iter = PandasDataFrameIterator(pd.DataFrame(), _no_trunc_date)
self._iterrows = enumerate([])
self._data_manifest = []
Loading