Skip to content
Merged
124 changes: 124 additions & 0 deletions docs/polars.rst
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,94 @@ SQLAlchemy allows this option to be specified in the connection string.

NOTE: PolarsCursor handles the CSV file on memory. Pay attention to the memory capacity.

Chunksize Options
~~~~~~~~~~~~~~~~~

PolarsCursor supports memory-efficient chunked processing of large query results
using Polars' native lazy evaluation APIs. This allows processing datasets that
are too large to fit in memory.

The chunksize option is enabled by passing an integer value, either in the ``cursor_kwargs``
dictionary of the connect method or directly as an argument to the cursor method.

.. code:: python

from pyathena import connect
from pyathena.polars.cursor import PolarsCursor

cursor = connect(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/",
region_name="us-west-2",
cursor_class=PolarsCursor,
cursor_kwargs={
"chunksize": 50_000
}).cursor()

.. code:: python

from pyathena import connect
from pyathena.polars.cursor import PolarsCursor

cursor = connect(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/",
region_name="us-west-2",
cursor_class=PolarsCursor).cursor(chunksize=50_000)

When the chunksize option is enabled, data is loaded lazily in chunks. This applies
to all data access methods:

**Standard DB-API fetch methods** - ``fetchone()`` and ``fetchmany()`` load data
chunk by chunk as needed, keeping memory usage bounded:

.. code:: python

from pyathena import connect
from pyathena.polars.cursor import PolarsCursor

cursor = connect(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/",
region_name="us-west-2",
cursor_class=PolarsCursor).cursor(chunksize=50_000)

cursor.execute("SELECT * FROM large_table")
# Data is loaded in 50,000 row chunks as you iterate
for row in cursor:
process_row(row)

**iter_chunks() method** - Use this when you want to process data as Polars DataFrames
in chunks, which is more efficient for batch processing:

.. code:: python

import polars as pl

from pyathena import connect
from pyathena.polars.cursor import PolarsCursor

cursor = connect(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/",
region_name="us-west-2",
cursor_class=PolarsCursor).cursor(chunksize=50_000)

cursor.execute("SELECT * FROM large_table")
for chunk in cursor.iter_chunks():
# Process each chunk - chunk is a polars.DataFrame
processed = chunk.group_by('category').agg(pl.sum('value'))
print(f"Processed chunk with {chunk.height} rows")

This method uses Polars' ``scan_csv()`` and ``scan_parquet()`` with ``collect_batches()``
for efficient lazy evaluation, minimizing memory usage when processing large datasets.

The chunked iteration also works with the unload option:

.. code:: python

from pyathena import connect
from pyathena.polars.cursor import PolarsCursor

cursor = connect(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/",
region_name="us-west-2",
cursor_class=PolarsCursor).cursor(chunksize=100_000, unload=True)

cursor.execute("SELECT * FROM huge_table")
for chunk in cursor.iter_chunks():
# Process Parquet data in chunks
process_chunk(chunk)

.. _async-polars-cursor:

AsyncPolarsCursor
Expand Down Expand Up @@ -414,6 +502,42 @@ As with AsyncPolarsCursor, the unload option is also available.
region_name="us-west-2",
cursor_class=AsyncPolarsCursor).cursor(unload=True)

As with PolarsCursor, the chunksize option is also available for memory-efficient processing.
When chunksize is specified, data is loaded lazily in chunks for both standard fetch methods
and ``iter_chunks()``.

.. code:: python

from pyathena import connect
from pyathena.polars.async_cursor import AsyncPolarsCursor

cursor = connect(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/",
region_name="us-west-2",
cursor_class=AsyncPolarsCursor).cursor(chunksize=50_000)

query_id, future = cursor.execute("SELECT * FROM large_table")
result_set = future.result()

# Standard iteration - data loaded in chunks
for row in result_set:
process_row(row)

.. code:: python

from pyathena import connect
from pyathena.polars.async_cursor import AsyncPolarsCursor

cursor = connect(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/",
region_name="us-west-2",
cursor_class=AsyncPolarsCursor).cursor(chunksize=50_000)

query_id, future = cursor.execute("SELECT * FROM large_table")
result_set = future.result()

# Process as DataFrame chunks
for chunk in result_set.iter_chunks():
process_chunk(chunk)

.. _`polars.DataFrame object`: https://docs.pola.rs/api/python/stable/reference/dataframe/index.html
.. _`Polars`: https://pola.rs/
.. _`Unload options`: arrow.html#unload-options
Expand Down
8 changes: 8 additions & 0 deletions pyathena/polars/async_cursor.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ def __init__(
result_reuse_minutes: int = CursorIterator.DEFAULT_RESULT_REUSE_MINUTES,
block_size: Optional[int] = None,
cache_type: Optional[str] = None,
chunksize: Optional[int] = None,
**kwargs,
) -> None:
"""Initialize an AsyncPolarsCursor.
Expand All @@ -93,10 +94,15 @@ def __init__(
result_reuse_minutes: Minutes to reuse cached results.
block_size: S3 read block size.
cache_type: S3 caching strategy.
chunksize: Number of rows per chunk for memory-efficient processing.
If specified, data is loaded lazily in chunks for all data
access methods including fetchone(), fetchmany(), and iter_chunks().
**kwargs: Additional connection parameters.

Example:
>>> cursor = connection.cursor(AsyncPolarsCursor, unload=True)
>>> # With chunked processing
>>> cursor = connection.cursor(AsyncPolarsCursor, chunksize=50000)
"""
super().__init__(
s3_staging_dir=s3_staging_dir,
Expand All @@ -116,6 +122,7 @@ def __init__(
self._unload = unload
self._block_size = block_size
self._cache_type = cache_type
self._chunksize = chunksize

@staticmethod
def get_default_converter(
Expand Down Expand Up @@ -172,6 +179,7 @@ def _collect_result_set(
block_size=self._block_size,
cache_type=self._cache_type,
max_workers=self._max_workers,
chunksize=self._chunksize,
**kwargs,
)

Expand Down
55 changes: 54 additions & 1 deletion pyathena/polars/cursor.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,18 @@

import logging
from multiprocessing import cpu_count
from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional, Tuple, Union, cast
from typing import (
TYPE_CHECKING,
Any,
Callable,
Dict,
Iterator,
List,
Optional,
Tuple,
Union,
cast,
)

from pyathena.common import BaseCursor, CursorIterator
from pyathena.error import OperationalError, ProgrammingError
Expand Down Expand Up @@ -74,6 +85,7 @@ def __init__(
block_size: Optional[int] = None,
cache_type: Optional[str] = None,
max_workers: int = (cpu_count() or 1) * 5,
chunksize: Optional[int] = None,
**kwargs,
) -> None:
"""Initialize a PolarsCursor.
Expand All @@ -94,10 +106,15 @@ def __init__(
block_size: S3 read block size.
cache_type: S3 caching strategy.
max_workers: Maximum worker threads for parallel S3 operations.
chunksize: Number of rows per chunk for memory-efficient processing.
If specified, data is loaded lazily in chunks for all data
access methods including fetchone(), fetchmany(), and iter_chunks().
**kwargs: Additional connection parameters.

Example:
>>> cursor = connection.cursor(PolarsCursor, unload=True)
>>> # With chunked processing
>>> cursor = connection.cursor(PolarsCursor, chunksize=50000)
"""
super().__init__(
s3_staging_dir=s3_staging_dir,
Expand All @@ -117,6 +134,7 @@ def __init__(
self._block_size = block_size
self._cache_type = cache_type
self._max_workers = max_workers
self._chunksize = chunksize
self._query_id: Optional[str] = None
self._result_set: Optional[AthenaPolarsResultSet] = None

Expand Down Expand Up @@ -272,6 +290,7 @@ def execute(
block_size=self._block_size,
cache_type=self._cache_type,
max_workers=self._max_workers,
chunksize=self._chunksize,
**kwargs,
)
else:
Expand Down Expand Up @@ -404,3 +423,37 @@ def as_arrow(self) -> "Table":
raise ProgrammingError("No result set.")
result_set = cast(AthenaPolarsResultSet, self.result_set)
return result_set.as_arrow()

def iter_chunks(self) -> Iterator["pl.DataFrame"]:
    """Yield query results as Polars DataFrames, one chunk at a time.

    When ``chunksize`` was given, each yielded DataFrame holds one chunk of
    rows, loaded lazily so memory use stays bounded. When ``chunksize`` was
    not given, the whole result is yielded as a single DataFrame, so callers
    can use the same iteration loop regardless of chunking configuration.

    Yields:
        Polars DataFrame for each chunk of rows, or the entire DataFrame
        if chunksize was not specified.

    Raises:
        ProgrammingError: If no result set is available.

    Example:
        >>> # Chunked processing of a large result
        >>> cursor = connection.cursor(PolarsCursor, chunksize=50000)
        >>> cursor.execute("SELECT * FROM large_table")
        >>> for frame in cursor.iter_chunks():
        ...     process_chunk(frame)  # each frame is a Polars DataFrame
        >>>
        >>> # No chunksize - a single DataFrame with all rows
        >>> cursor = connection.cursor(PolarsCursor)
        >>> cursor.execute("SELECT * FROM small_table")
        >>> for frame in cursor.iter_chunks():
        ...     process(frame)
    """
    # Raised lazily (this is a generator), i.e. on first iteration,
    # mirroring the original yield-from implementation.
    if not self.has_result_set:
        raise ProgrammingError("No result set.")
    current = cast(AthenaPolarsResultSet, self.result_set)
    # Delegate to the result set's own chunk iterator.
    for frame in current.iter_chunks():
        yield frame
Loading