diff --git a/docs/api.rst b/docs/api.rst index 9cb68d02..8c69f8d2 100644 --- a/docs/api.rst +++ b/docs/api.rst @@ -15,6 +15,7 @@ This section provides comprehensive API documentation for all PyAthena classes a api/s3fs api/spark api/converters + api/sqlalchemy api/filesystem api/models api/utilities @@ -38,9 +39,10 @@ Specialized Integrations - :ref:`api_arrow` - Apache Arrow columnar data integration - :ref:`api_s3fs` - Lightweight S3FS-based cursor (no pandas/pyarrow required) - :ref:`api_spark` - Apache Spark integration for big data processing +- :ref:`api_sqlalchemy` - SQLAlchemy dialect implementations Infrastructure ~~~~~~~~~~~~~~~ -- :ref:`api_filesystem` - S3 filesystem integration and object management +- :ref:`api_filesystem` - S3 filesystem integration and object management - :ref:`api_models` - Athena query execution and metadata models \ No newline at end of file diff --git a/docs/api/arrow.rst b/docs/api/arrow.rst index 803af3e8..7dc3ba49 100644 --- a/docs/api/arrow.rst +++ b/docs/api/arrow.rst @@ -3,7 +3,7 @@ Apache Arrow Integration ======================== -This section covers Apache Arrow-specific cursors and data converters. +This section covers Apache Arrow-specific cursors, result sets, and data converters. Arrow Cursors ------------- @@ -16,6 +16,13 @@ Arrow Cursors :members: :inherited-members: +Arrow Result Set +---------------- + +.. autoclass:: pyathena.arrow.result_set.AthenaArrowResultSet + :members: + :inherited-members: + Arrow Data Converters ---------------------- @@ -23,4 +30,11 @@ Arrow Data Converters :members: .. autoclass:: pyathena.arrow.converter.DefaultArrowUnloadTypeConverter - :members: \ No newline at end of file + :members: + +Arrow Utilities +--------------- + +.. autofunction:: pyathena.arrow.util.to_column_info + +.. 
autofunction:: pyathena.arrow.util.get_athena_type \ No newline at end of file diff --git a/docs/api/connection.rst b/docs/api/connection.rst index 69cb53f4..8bdf0e37 100644 --- a/docs/api/connection.rst +++ b/docs/api/connection.rst @@ -35,4 +35,18 @@ Asynchronous Cursors .. autoclass:: pyathena.async_cursor.AsyncDictCursor :members: - :inherited-members: \ No newline at end of file + :inherited-members: + +Result Sets +----------- + +.. autoclass:: pyathena.result_set.AthenaResultSet + :members: + :inherited-members: + +.. autoclass:: pyathena.result_set.AthenaDictResultSet + :members: + :inherited-members: + +.. autoclass:: pyathena.result_set.WithResultSet + :members: \ No newline at end of file diff --git a/docs/api/filesystem.rst b/docs/api/filesystem.rst index abdf6d98..827803d9 100644 --- a/docs/api/filesystem.rst +++ b/docs/api/filesystem.rst @@ -11,6 +11,9 @@ S3 FileSystem .. autoclass:: pyathena.filesystem.s3.S3FileSystem :members: +.. autoclass:: pyathena.filesystem.s3.S3File + :members: + S3 Objects ---------- diff --git a/docs/api/pandas.rst b/docs/api/pandas.rst index f933a27d..bfc2d6bd 100644 --- a/docs/api/pandas.rst +++ b/docs/api/pandas.rst @@ -3,7 +3,7 @@ Pandas Integration ================== -This section covers pandas-specific cursors and data converters. +This section covers pandas-specific cursors, result sets, and data converters. Pandas Cursors -------------- @@ -16,6 +16,16 @@ Pandas Cursors :members: :inherited-members: +Pandas Result Set +----------------- + +.. autoclass:: pyathena.pandas.result_set.AthenaPandasResultSet + :members: + :inherited-members: + +.. autoclass:: pyathena.pandas.result_set.DataFrameIterator + :members: + Pandas Data Converters ----------------------- @@ -23,4 +33,23 @@ Pandas Data Converters :members: .. autoclass:: pyathena.pandas.converter.DefaultPandasUnloadTypeConverter - :members: \ No newline at end of file + :members: + +Pandas Utilities +---------------- + +.. 
autofunction:: pyathena.pandas.util.get_chunks + +.. autofunction:: pyathena.pandas.util.reset_index + +.. autofunction:: pyathena.pandas.util.as_pandas + +.. autofunction:: pyathena.pandas.util.to_sql_type_mappings + +.. autofunction:: pyathena.pandas.util.to_parquet + +.. autofunction:: pyathena.pandas.util.to_sql + +.. autofunction:: pyathena.pandas.util.get_column_names_and_types + +.. autofunction:: pyathena.pandas.util.generate_ddl \ No newline at end of file diff --git a/docs/api/sqlalchemy.rst b/docs/api/sqlalchemy.rst new file mode 100644 index 00000000..af613691 --- /dev/null +++ b/docs/api/sqlalchemy.rst @@ -0,0 +1,63 @@ +.. _api_sqlalchemy: + +SQLAlchemy Integration +====================== + +This section covers SQLAlchemy dialect implementations for Amazon Athena. + +Dialects +-------- + +.. autoclass:: pyathena.sqlalchemy.rest.AthenaRestDialect + :members: + :inherited-members: + +.. autoclass:: pyathena.sqlalchemy.pandas.AthenaPandasDialect + :members: + :inherited-members: + +.. autoclass:: pyathena.sqlalchemy.arrow.AthenaArrowDialect + :members: + :inherited-members: + +Type System +----------- + +.. autoclass:: pyathena.sqlalchemy.types.AthenaTimestamp + :members: + +.. autoclass:: pyathena.sqlalchemy.types.AthenaDate + :members: + +.. autoclass:: pyathena.sqlalchemy.types.Tinyint + :members: + +.. autoclass:: pyathena.sqlalchemy.types.AthenaStruct + :members: + +.. autoclass:: pyathena.sqlalchemy.types.AthenaMap + :members: + +.. autoclass:: pyathena.sqlalchemy.types.AthenaArray + :members: + +Compilers +--------- + +.. autoclass:: pyathena.sqlalchemy.compiler.AthenaTypeCompiler + :members: + +.. autoclass:: pyathena.sqlalchemy.compiler.AthenaStatementCompiler + :members: + +.. autoclass:: pyathena.sqlalchemy.compiler.AthenaDDLCompiler + :members: + +Identifier Preparers +-------------------- + +.. autoclass:: pyathena.sqlalchemy.preparer.AthenaDMLIdentifierPreparer + :members: + +.. 
autoclass:: pyathena.sqlalchemy.preparer.AthenaDDLIdentifierPreparer + :members: diff --git a/pyathena/arrow/util.py b/pyathena/arrow/util.py index 6b24b7eb..4bd66843 100644 --- a/pyathena/arrow/util.py +++ b/pyathena/arrow/util.py @@ -1,4 +1,11 @@ # -*- coding: utf-8 -*- +"""Utilities for converting PyArrow types to Athena metadata. + +This module provides functions to convert PyArrow schema and type information +to Athena-compatible column metadata, enabling proper type mapping when +reading query results in Apache Arrow format. +""" + from __future__ import annotations from typing import TYPE_CHECKING, Any, Dict, Tuple, cast @@ -9,6 +16,22 @@ def to_column_info(schema: "Schema") -> Tuple[Dict[str, Any], ...]: + """Convert a PyArrow schema to Athena column information. + + Iterates through all fields in the schema and converts each field's + type information to an Athena-compatible column metadata dictionary. + + Args: + schema: A PyArrow Schema object containing field definitions. + + Returns: + A tuple of dictionaries, each containing column metadata with keys: + - Name: The column name + - Type: The Athena SQL type name + - Precision: Numeric precision (0 for non-numeric types) + - Scale: Numeric scale (0 for non-numeric types) + - Nullable: Either "NULLABLE" or "NOT_NULL" + """ columns = [] for field in schema: type_, precision, scale = get_athena_type(field.type) @@ -25,6 +48,25 @@ def to_column_info(schema: "Schema") -> Tuple[Dict[str, Any], ...]: def get_athena_type(type_: "DataType") -> Tuple[str, int, int]: + """Map a PyArrow data type to an Athena SQL type. + + Converts PyArrow type identifiers to corresponding Athena SQL type names + with appropriate precision and scale values. Handles all common Arrow + types including numeric, string, binary, temporal, and complex types. + + Args: + type_: A PyArrow DataType object to convert. 
+ + Returns: + A tuple of (type_name, precision, scale) where: + - type_name: The Athena SQL type (e.g., "varchar", "bigint", "timestamp") + - precision: The numeric precision or max length + - scale: The numeric scale (decimal places) + + Note: + Unknown types default to "string" with maximum varchar length. + Decimal types preserve their original precision and scale. + """ import pyarrow.lib as types if type_.id in [types.Type_BOOL]: # 1 diff --git a/pyathena/model.py b/pyathena/model.py index 3aff27eb..9b90b068 100644 --- a/pyathena/model.py +++ b/pyathena/model.py @@ -274,6 +274,27 @@ def result_reuse_minutes(self) -> Optional[int]: class AthenaCalculationExecutionStatus: + """Status information for an Athena calculation execution. + + This class represents the current state and statistics of a calculation + execution in Amazon Athena's notebook or interactive session environment. + It tracks the calculation's lifecycle from creation through completion. + + Calculation States: + - CREATING: Calculation is being created + - CREATED: Calculation has been created + - QUEUED: Calculation is waiting to execute + - RUNNING: Calculation is currently executing + - CANCELING: Calculation is being cancelled + - CANCELED: Calculation was cancelled + - COMPLETED: Calculation completed successfully + - FAILED: Calculation execution failed + + See Also: + AWS Athena CalculationExecutionStatus API reference: + https://docs.aws.amazon.com/athena/latest/APIReference/API_CalculationStatus.html + """ + STATE_CREATING: str = "CREATING" STATE_CREATED: str = "CREATED" STATE_QUEUED: str = "QUEUED" @@ -324,6 +345,20 @@ def progress(self) -> Optional[str]: class AthenaCalculationExecution(AthenaCalculationExecutionStatus): + """Represents a complete Athena calculation execution with status and results. 
+ + This class extends AthenaCalculationExecutionStatus to include additional + information about the calculation execution, including session details, + working directory, and result locations in S3. + + Attributes are inherited from AthenaCalculationExecutionStatus for state + and timing information. + + See Also: + AWS Athena CalculationExecution API reference: + https://docs.aws.amazon.com/athena/latest/APIReference/API_CalculationSummary.html + """ + def __init__(self, response: Dict[str, Any]) -> None: super().__init__(response) @@ -377,6 +412,27 @@ def result_type(self) -> Optional[str]: class AthenaSessionStatus: + """Status information for an Athena interactive session. + + This class represents the current state of an interactive session in + Amazon Athena, used for notebook and Spark workloads. Sessions provide + a persistent environment for running multiple calculations. + + Session States: + - CREATING: Session is being created + - CREATED: Session has been created + - IDLE: Session is idle and ready for calculations + - BUSY: Session is executing a calculation + - TERMINATING: Session is being terminated + - TERMINATED: Session has been terminated + - DEGRADED: Session is in a degraded state + - FAILED: Session creation or execution failed + + See Also: + AWS Athena Session API reference: + https://docs.aws.amazon.com/athena/latest/APIReference/API_SessionStatus.html + """ + STATE_CREATING: str = "CREATING" STATE_CREATED: str = "CREATED" STATE_IDLE: str = "IDLE" @@ -429,6 +485,17 @@ def idle_since_date_time(self) -> Optional[datetime]: class AthenaDatabase: + """Represents an Athena database (schema) and its metadata. + + This class encapsulates information about a database in the AWS Glue + Data Catalog that is accessible through Amazon Athena. Databases serve + as containers for tables and views. 
+ + See Also: + AWS Athena Database API reference: + https://docs.aws.amazon.com/athena/latest/APIReference/API_Database.html + """ + def __init__(self, response): database = response.get("Database") if not database: @@ -452,6 +519,16 @@ def parameters(self) -> Dict[str, str]: class AthenaTableMetadataColumn: + """Represents a column definition in an Athena table. + + This class contains information about a single column in a table, + including its name, data type, and optional comment. + + See Also: + AWS Athena Column API reference: + https://docs.aws.amazon.com/athena/latest/APIReference/API_Column.html + """ + def __init__(self, response): self._name: Optional[str] = response.get("Name") self._type: Optional[str] = response.get("Type") @@ -471,6 +548,17 @@ def comment(self) -> Optional[str]: class AthenaTableMetadataPartitionKey: + """Represents a partition key definition in an Athena table. + + This class contains information about a partition key column, + which is used to organize data in partitioned tables for + improved query performance. + + See Also: + AWS Athena Column API reference: + https://docs.aws.amazon.com/athena/latest/APIReference/API_Column.html + """ + def __init__(self, response): self._name: Optional[str] = response.get("Name") self._type: Optional[str] = response.get("Type") @@ -490,6 +578,20 @@ def comment(self) -> Optional[str]: class AthenaTableMetadata: + """Represents comprehensive metadata for an Athena table. + + This class contains detailed information about a table in the AWS Glue + Data Catalog, including columns, partition keys, storage format, + serialization library, and various table properties. + + The class provides convenient properties for accessing common table + attributes like location, file format, compression, and SerDe configuration. 
+ + See Also: + AWS Athena TableMetadata API reference: + https://docs.aws.amazon.com/athena/latest/APIReference/API_TableMetadata.html + """ + def __init__(self, response): table_metadata = response.get("TableMetadata") if not table_metadata: @@ -651,6 +753,32 @@ def is_orc(value: str) -> bool: class AthenaRowFormatSerde: + """Row format serializer/deserializer (SerDe) constants for Athena tables. + + This class provides constants for the various SerDe libraries that can be + used to serialize and deserialize data in Athena tables. SerDes define how + data is read from and written to underlying storage formats. + + The class also provides utility methods to detect specific SerDe types + from table metadata strings. + + Supported SerDes: + - CSV: OpenCSVSerde for CSV files + - REGEX: RegexSerDe for regex-parsed text files + - LAZY_SIMPLE: LazySimpleSerDe for simple delimited text + - CLOUD_TRAIL: CloudTrailSerde for AWS CloudTrail logs + - GROK: GrokSerDe for grok pattern parsing + - JSON: JsonSerDe for JSON data (OpenX implementation) + - JSON_HCATALOG: JsonSerDe for JSON data (HCatalog implementation) + - PARQUET: ParquetHiveSerDe for Parquet files + - ORC: OrcSerde for ORC files + - AVRO: AvroSerDe for Avro files + + See Also: + AWS Athena SerDe Reference: + https://docs.aws.amazon.com/athena/latest/ug/serde-reference.html + """ + PATTERN_ROW_FORMAT_SERDE: Pattern[str] = re.compile(r"^(?i:serde) '(?P.+)'$") ROW_FORMAT_SERDE_CSV: str = "org.apache.hadoop.hive.serde2.OpenCSVSerde" @@ -744,6 +872,36 @@ def is_valid(value: str) -> bool: class AthenaPartitionTransform: + """Partition transform constants for Iceberg tables in Athena. + + This class provides constants for partition transforms used with Apache + Iceberg tables in Athena. Partition transforms allow you to create + derived partition values from source column data, enabling more flexible + and efficient partitioning strategies. 
+ + Transforms: + - year: Extract year from a timestamp/date column + - month: Extract year and month from a timestamp/date column + - day: Extract year, month, and day from a timestamp/date column + - hour: Extract year, month, day, and hour from a timestamp column + - bucket: Hash partition into N buckets + - truncate: Truncate values to a specified width + + Example: + Iceberg table with partition transforms:: + + CREATE TABLE my_table ( + id bigint, + ts timestamp, + category string + ) + PARTITIONED BY (month(ts), bucket(16, category)) + + See Also: + AWS Athena Iceberg Partitioning: + https://docs.aws.amazon.com/athena/latest/ug/querying-iceberg-creating-tables.html + """ + PARTITION_TRANSFORM_YEAR: str = "year" PARTITION_TRANSFORM_MONTH: str = "month" PARTITION_TRANSFORM_DAY: str = "day" diff --git a/pyathena/pandas/result_set.py b/pyathena/pandas/result_set.py index 89c55535..846cc9ab 100644 --- a/pyathena/pandas/result_set.py +++ b/pyathena/pandas/result_set.py @@ -39,6 +39,30 @@ def _no_trunc_date(df: "DataFrame") -> "DataFrame": class DataFrameIterator(abc.Iterator): # type: ignore + """Iterator for chunked DataFrame results from Athena queries. + + This class wraps either a pandas TextFileReader (for chunked reading) or + a single DataFrame, providing a unified iterator interface. It applies + optional date truncation to each DataFrame chunk as it's yielded. + + The iterator is used by AthenaPandasResultSet to provide chunked access + to large query results, enabling memory-efficient processing of datasets + that would be too large to load entirely into memory. + + Example: + >>> # Iterate over DataFrame chunks + >>> for df_chunk in iterator: + ... process(df_chunk) + >>> + >>> # Iterate over individual rows + >>> for idx, row in iterator.iterrows(): + ... print(row) + + Note: + This class is primarily for internal use by AthenaPandasResultSet. + Most users should access results through PandasCursor methods. 
+ """ + def __init__( self, reader: Union["TextFileReader", "DataFrame"], diff --git a/pyathena/pandas/util.py b/pyathena/pandas/util.py index 7baaa9c1..3eb756c5 100644 --- a/pyathena/pandas/util.py +++ b/pyathena/pandas/util.py @@ -38,6 +38,18 @@ def get_chunks(df: "DataFrame", chunksize: Optional[int] = None) -> Iterator["DataFrame"]: + """Split a DataFrame into chunks of specified size. + + Args: + df: The DataFrame to split into chunks. + chunksize: Number of rows per chunk. If None, yields the entire DataFrame. + + Yields: + DataFrame chunks of the specified size. + + Raises: + ValueError: If chunksize is less than or equal to zero. + """ rows = len(df) if rows == 0: return @@ -56,6 +68,15 @@ def get_chunks(df: "DataFrame", chunksize: Optional[int] = None) -> Iterator["Da def reset_index(df: "DataFrame", index_label: Optional[str] = None) -> None: + """Reset the DataFrame index and add it as a column. + + Args: + df: The DataFrame to reset the index on (modified in-place). + index_label: Name for the index column. Defaults to "index". + + Raises: + ValueError: If the index name conflicts with existing column names. + """ df.index.name = index_label if index_label else "index" try: df.reset_index(inplace=True) @@ -64,6 +85,19 @@ def reset_index(df: "DataFrame", index_label: Optional[str] = None) -> None: def as_pandas(cursor: "Cursor", coerce_float: bool = False) -> "DataFrame": + """Convert cursor results to a pandas DataFrame. + + Fetches all remaining rows from the cursor and converts them to a + DataFrame with column names from the cursor description. + + Args: + cursor: A PyAthena cursor with executed query results. + coerce_float: If True, attempt to convert non-string columns to float. + + Returns: + A DataFrame containing the query results, or an empty DataFrame + if no results are available. 
+ """ from pandas import DataFrame description = cursor.description @@ -74,6 +108,20 @@ def as_pandas(cursor: "Cursor", coerce_float: bool = False) -> "DataFrame": def to_sql_type_mappings(col: "Series") -> str: + """Map a pandas Series data type to an Athena SQL type. + + Infers the appropriate Athena SQL type based on the pandas Series dtype. + Used when creating tables from DataFrames. + + Args: + col: A pandas Series to determine the SQL type for. + + Returns: + The Athena SQL type name (e.g., "STRING", "BIGINT", "DOUBLE"). + + Raises: + ValueError: If the data type is not supported (complex, time). + """ import pandas as pd col_type = pd.api.types.infer_dtype(col, skipna=True) @@ -112,6 +160,24 @@ def to_parquet( compression: Optional[str] = None, flavor: str = "spark", ) -> str: + """Write a DataFrame to S3 as a Parquet file. + + Converts the DataFrame to Apache Arrow format and writes it to S3 + as a Parquet file with a UUID-based filename. + + Args: + df: The DataFrame to write. + bucket_name: S3 bucket name. + prefix: S3 key prefix (path within the bucket). + retry_config: Configuration for API call retries. + session_kwargs: Arguments for creating a boto3 Session. + client_kwargs: Arguments for creating the S3 client. + compression: Parquet compression codec (e.g., "snappy", "gzip"). + flavor: Parquet flavor for compatibility ("spark" or "hive"). + + Returns: + The S3 URI of the written Parquet file. + """ import pyarrow as pa from pyarrow import parquet as pq @@ -148,6 +214,35 @@ def to_sql( max_workers: int = (cpu_count() or 1) * 5, repair_table=True, ) -> None: + """Write a DataFrame to an Athena table backed by Parquet files in S3. + + Creates an external Athena table from a DataFrame by writing the data + as Parquet files to S3 and executing the appropriate DDL statements. + Supports partitioning, compression, and parallel uploads. + + Args: + df: The DataFrame to write to Athena. + name: Name of the table to create. 
+ conn: PyAthena connection object. + location: S3 location for the table data (e.g., "s3://bucket/path/"). + schema: Database schema name. Defaults to "default". + index: If True, include the DataFrame index as a column. + index_label: Name for the index column if index=True. + partitions: List of column names to use as partition keys. + chunksize: Number of rows per Parquet file. None for single file. + if_exists: Action if table exists: "fail", "replace", or "append". + compression: Parquet compression codec (e.g., "snappy", "gzip"). + flavor: Parquet flavor for compatibility ("spark" or "hive"). + type_mappings: Function to map pandas types to SQL types. + executor_class: Executor class for parallel uploads. + max_workers: Maximum number of parallel upload workers. + repair_table: If True, run ALTER TABLE ADD PARTITION for partitioned tables. + + Raises: + ValueError: If if_exists is invalid, compression is unsupported, + or partition keys contain None values. + OperationalError: If if_exists="fail" and table already exists. + """ if if_exists not in ("fail", "replace", "append"): raise ValueError(f"`{if_exists}` is not valid for if_exists") if compression is not None and not AthenaCompression.is_valid(compression): @@ -280,6 +375,15 @@ def to_sql( def get_column_names_and_types(df: "DataFrame", type_mappings) -> "OrderedDict[str, str]": + """Extract column names and their SQL types from a DataFrame. + + Args: + df: The DataFrame to extract column information from. + type_mappings: Function to map pandas types to SQL types. + + Returns: + An OrderedDict mapping column names to their SQL type strings. + """ return OrderedDict( ((str(df.columns[i]), type_mappings(df.iloc[:, i])) for i in range(len(df.columns))) ) @@ -294,6 +398,23 @@ def generate_ddl( compression: Optional[str] = None, type_mappings: Callable[["Series"], str] = to_sql_type_mappings, ) -> str: + """Generate CREATE EXTERNAL TABLE DDL for a DataFrame. 
+ + Creates DDL for an external Athena table with Parquet storage format + based on the DataFrame's schema. + + Args: + df: The DataFrame to generate DDL for. + name: Name of the table to create. + location: S3 location for the table data. + schema: Database schema name. Defaults to "default". + partitions: List of column names to use as partition keys. + compression: Parquet compression codec for TBLPROPERTIES. + type_mappings: Function to map pandas types to SQL types. + + Returns: + The CREATE EXTERNAL TABLE DDL statement as a string. + """ if partitions is None: partitions = [] column_names_and_types = get_column_names_and_types(df, type_mappings) diff --git a/pyathena/result_set.py b/pyathena/result_set.py index ae2c3972..15b97304 100644 --- a/pyathena/result_set.py +++ b/pyathena/result_set.py @@ -31,6 +31,36 @@ class AthenaResultSet(CursorIterator): + """Result set for Athena query execution using the GetQueryResults API. + + This class provides a DB API 2.0 compliant result set implementation that + fetches query results from Amazon Athena. It uses the GetQueryResults API + to retrieve data in paginated chunks, converting each value according to + its Athena data type. + + The result set exposes query execution metadata (timing, data scanned, + state, etc.) through read-only properties, allowing inspection of query + performance and status. + + This is the base result set implementation used by the standard Cursor. 
+ Specialized implementations exist for different output formats: + - :class:`~pyathena.arrow.result_set.AthenaArrowResultSet`: Apache Arrow format + - :class:`~pyathena.pandas.result_set.AthenaPandasResultSet`: Pandas DataFrame + - :class:`~pyathena.s3fs.result_set.AthenaS3FSResultSet`: S3 file-based access + + Example: + >>> cursor.execute("SELECT * FROM my_table") + >>> result_set = cursor.result_set + >>> print(f"Query ID: {result_set.query_id}") + >>> print(f"Data scanned: {result_set.data_scanned_in_bytes} bytes") + >>> for row in result_set: + ... print(row) + + See Also: + AWS Athena GetQueryResults API: + https://docs.aws.amazon.com/athena/latest/APIReference/API_GetQueryResults.html + """ + def __init__( self, connection: "Connection[Any]", diff --git a/pyathena/s3fs/converter.py b/pyathena/s3fs/converter.py index 0df11b3e..4396d4ac 100644 --- a/pyathena/s3fs/converter.py +++ b/pyathena/s3fs/converter.py @@ -46,5 +46,17 @@ def __init__(self) -> None: ) def convert(self, type_: str, value: Optional[str]) -> Optional[Any]: + """Convert a string value to the appropriate Python type. + + Looks up the converter function for the given Athena type and applies + it to the value. If the value is None, returns None without conversion. + + Args: + type_: The Athena data type name (e.g., "integer", "varchar", "date"). + value: The string value to convert, or None. + + Returns: + The converted Python value, or None if the input value was None. + """ converter = self.get(type_) return converter(value) diff --git a/pyathena/sqlalchemy/arrow.py b/pyathena/sqlalchemy/arrow.py index c7d8dea8..34dde6da 100644 --- a/pyathena/sqlalchemy/arrow.py +++ b/pyathena/sqlalchemy/arrow.py @@ -9,6 +9,35 @@ class AthenaArrowDialect(AthenaDialect): + """SQLAlchemy dialect for Amazon Athena with Apache Arrow result format. + + This dialect extends AthenaDialect to use ArrowCursor, which returns + query results as Apache Arrow Tables. 
Arrow format provides efficient + columnar data representation, making it ideal for analytical workloads + and integration with data science tools. + + Connection URL Format: + ``awsathena+arrow://{access_key}:{secret_key}@athena.{region}.amazonaws.com/{schema}`` + + Query Parameters: + In addition to the base dialect parameters: + - unload: If "true", use UNLOAD for Parquet output (better performance + for large datasets) + + Example: + >>> from sqlalchemy import create_engine + >>> engine = create_engine( + ... "awsathena+arrow://:@athena.us-west-2.amazonaws.com/default" + ... "?s3_staging_dir=s3://my-bucket/athena-results/" + ... "&unload=true" + ... ) + + See Also: + :class:`~pyathena.arrow.cursor.ArrowCursor`: The underlying cursor + implementation. + :class:`~pyathena.sqlalchemy.base.AthenaDialect`: Base dialect class. + """ + driver = "arrow" supports_statement_cache = True diff --git a/pyathena/sqlalchemy/base.py b/pyathena/sqlalchemy/base.py index 2ea4d431..08d63fe2 100644 --- a/pyathena/sqlalchemy/base.py +++ b/pyathena/sqlalchemy/base.py @@ -86,6 +86,51 @@ class AthenaDialect(DefaultDialect): + """SQLAlchemy dialect for Amazon Athena. + + This dialect enables SQLAlchemy to communicate with Amazon Athena, + allowing you to use SQLAlchemy's ORM and Core features with Athena + as the backend database engine. + + The dialect handles Athena-specific SQL syntax, data type mapping, + and schema reflection. It supports table creation with Athena-specific + options like file format, compression, and partitioning. + + Connection URL Format: + ``awsathena+rest://{access_key}:{secret_key}@athena.{region}.amazonaws.com/{schema}`` + + Query Parameters: + - s3_staging_dir: S3 location for query results (required) + - work_group: Athena workgroup name + - catalog_name: Data catalog name (default: AwsDataCatalog) + - poll_interval: Query status polling interval in seconds + + Example: + >>> from sqlalchemy import create_engine + >>> engine = create_engine( + ... 
"awsathena+rest://:@athena.us-west-2.amazonaws.com/default" + ... "?s3_staging_dir=s3://my-bucket/athena-results/" + ... ) + >>> with engine.connect() as conn: + ... result = conn.execute(text("SELECT * FROM my_table")) + + Dialect Options: + Table-level options (prefix with ``awsathena_``): + - location: S3 location for table data + - compression: Compression format (SNAPPY, GZIP, etc.) + - file_format: File format (PARQUET, ORC, etc.) + - row_format: Row format specification + - tblproperties: Table properties dictionary + + Column-level options: + - partition: Mark column as partition key + - cluster: Mark column as clustering key + + See Also: + SQLAlchemy Dialects: + https://docs.sqlalchemy.org/en/20/dialects/ + """ + name: str = "awsathena" preparer: Type[IdentifierPreparer] = AthenaDMLIdentifierPreparer statement_compiler: Type[SQLCompiler] = AthenaStatementCompiler diff --git a/pyathena/sqlalchemy/compiler.py b/pyathena/sqlalchemy/compiler.py index 6c3fb58a..0501426b 100644 --- a/pyathena/sqlalchemy/compiler.py +++ b/pyathena/sqlalchemy/compiler.py @@ -41,6 +41,26 @@ class AthenaTypeCompiler(GenericTypeCompiler): + """Type compiler for Amazon Athena SQL types. + + This compiler translates SQLAlchemy type objects into Athena-compatible + SQL type strings for use in DDL statements. It handles the mapping between + SQLAlchemy's portable types and Athena's specific type syntax. + + Athena has specific requirements for type names that differ from standard + SQL. For example, FLOAT maps to REAL in CAST expressions, and various + string types (TEXT, NCHAR, NVARCHAR) all map to STRING. 
+
+    The compiler also supports Athena-specific complex types:
+    - STRUCT/ROW: Nested record types with named fields
+    - MAP: Key-value pair collections
+    - ARRAY: Ordered collections of elements
+
+    See Also:
+        AWS Athena Data Types:
+        https://docs.aws.amazon.com/athena/latest/ug/data-types.html
+    """
+
     def visit_FLOAT(self, type_: Type[Any], **kw) -> str:  # noqa: N802
         return self.visit_REAL(type_, **kw)
 
@@ -180,6 +200,25 @@ def visit_ARRAY(self, type_, **kw):  # noqa: N802
 
 
 class AthenaStatementCompiler(SQLCompiler):
+    """SQL statement compiler for Amazon Athena queries.
+
+    This compiler generates Athena-compatible SQL statements from SQLAlchemy
+    expression constructs. It handles Athena-specific SQL syntax including:
+
+    - Function name mapping (e.g., char_length -> length)
+    - Lambda expressions in functions like filter()
+    - CAST expressions with Athena type requirements
+    - OFFSET/LIMIT clause ordering (Athena uses OFFSET before LIMIT)
+    - Time travel hints (FOR TIMESTAMP AS OF, FOR VERSION AS OF)
+
+    The compiler ensures that generated SQL is compatible with Presto/Trino
+    syntax used by Athena engine versions 2 and 3.
+
+    See Also:
+        AWS Athena SQL Reference:
+        https://docs.aws.amazon.com/athena/latest/ug/ddl-sql-reference.html
+    """
+
     def visit_char_length_func(self, fn: "FunctionElement[Any]", **kw):
         return f"length{self.function_argspec(fn, **kw)}"
 
@@ -259,6 +298,42 @@ def format_from_hint_text(self, sqltext, table, hint, iscrud):
 
 
 class AthenaDDLCompiler(DDLCompiler):
+    """DDL compiler for Amazon Athena CREATE TABLE and related statements.
+
+    This compiler generates Athena-compatible DDL statements including support
+    for Athena-specific table options:
+
+    - External table creation (EXTERNAL keyword for Hive-style tables)
+    - Iceberg table creation (managed tables with ACID support)
+    - File formats: PARQUET, ORC, TEXTFILE, JSON, AVRO, etc.
+    - Row formats with SerDe specifications
+    - Compression settings for various file formats
+    - Table locations in S3
+    - Partitioning (both Hive-style and Iceberg transforms)
+    - Bucketing/clustering for optimized queries
+
+    The compiler uses backtick quoting for DDL identifiers (different from
+    DML which uses double quotes) and handles Athena's reserved words.
+
+    Example:
+        A table created with this compiler might generate::
+
+            CREATE EXTERNAL TABLE IF NOT EXISTS my_schema.my_table (
+                id INT,
+                name STRING
+            )
+            PARTITIONED BY (
+                dt STRING
+            )
+            STORED AS PARQUET
+            LOCATION 's3://my-bucket/my-table/'
+            TBLPROPERTIES ('parquet.compress' = 'SNAPPY')
+
+    See Also:
+        AWS Athena CREATE TABLE:
+        https://docs.aws.amazon.com/athena/latest/ug/create-table.html
+    """
+
     @property
     def preparer(self) -> IdentifierPreparer:
         return self._preparer
diff --git a/pyathena/sqlalchemy/pandas.py b/pyathena/sqlalchemy/pandas.py
index 8d32a03b..30383eea 100644
--- a/pyathena/sqlalchemy/pandas.py
+++ b/pyathena/sqlalchemy/pandas.py
@@ -9,6 +9,36 @@
 
 
 class AthenaPandasDialect(AthenaDialect):
+    """SQLAlchemy dialect for Amazon Athena with pandas DataFrame result format.
+
+    This dialect extends AthenaDialect to use PandasCursor, which returns
+    query results as pandas DataFrames. This integration enables seamless
+    use of Athena data in data analysis and machine learning workflows.
+
+    Connection URL Format:
+        ``awsathena+pandas://{access_key}:{secret_key}@athena.{region}.amazonaws.com/{schema}``
+
+    Query Parameters:
+        In addition to the base dialect parameters:
+        - unload: If "true", use UNLOAD for Parquet output (better performance
+          for large datasets)
+        - engine: CSV parsing engine ("c", "python", or "pyarrow")
+        - chunksize: Number of rows per chunk for memory-efficient processing
+
+    Example:
+        >>> from sqlalchemy import create_engine
+        >>> engine = create_engine(
+        ...     "awsathena+pandas://:@athena.us-west-2.amazonaws.com/default"
+        ...     "?s3_staging_dir=s3://my-bucket/athena-results/"
+        ...     "&unload=true&chunksize=10000"
+        ... )
+
+    See Also:
+        :class:`~pyathena.pandas.cursor.PandasCursor`: The underlying cursor
+            implementation.
+        :class:`~pyathena.sqlalchemy.base.AthenaDialect`: Base dialect class.
+    """
+
     driver = "pandas"
     supports_statement_cache = True
 
diff --git a/pyathena/sqlalchemy/preparer.py b/pyathena/sqlalchemy/preparer.py
index 119b7a85..3664027c 100644
--- a/pyathena/sqlalchemy/preparer.py
+++ b/pyathena/sqlalchemy/preparer.py
@@ -12,10 +12,42 @@
 
 
 class AthenaDMLIdentifierPreparer(IdentifierPreparer):
+    """Identifier preparer for Athena DML (SELECT, INSERT, etc.) statements.
+
+    This preparer handles quoting and escaping of identifiers in DML statements.
+    It uses double quotes for identifiers and recognizes Athena's SELECT
+    statement reserved words to determine when quoting is necessary.
+
+    Athena's DML syntax follows Presto/Trino conventions, which differ from
+    DDL syntax (which uses Hive conventions with backticks).
+
+    See Also:
+        :class:`AthenaDDLIdentifierPreparer`: Preparer for DDL statements.
+        AWS Athena Reserved Words:
+        https://docs.aws.amazon.com/athena/latest/ug/reserved-words.html
+    """
+
     reserved_words: Set[str] = SELECT_STATEMENT_RESERVED_WORDS
 
 
 class AthenaDDLIdentifierPreparer(IdentifierPreparer):
+    """Identifier preparer for Athena DDL (CREATE, ALTER, DROP) statements.
+
+    This preparer handles quoting and escaping of identifiers in DDL statements.
+    It uses backticks for identifiers (Hive convention) rather than double
+    quotes (Presto/Trino convention used in DML).
+
+    Key differences from DML preparer:
+    - Uses backtick (`) as the quote character
+    - Recognizes DDL-specific reserved words
+    - Treats underscore (_) as an illegal initial character
+
+    See Also:
+        :class:`AthenaDMLIdentifierPreparer`: Preparer for DML statements.
+        AWS Athena DDL Reserved Words:
+        https://docs.aws.amazon.com/athena/latest/ug/reserved-words.html
+    """
+
     reserved_words = DDL_RESERVED_WORDS
     illegal_initial_characters = ILLEGAL_INITIAL_CHARACTERS.union("_")
 
diff --git a/pyathena/sqlalchemy/rest.py b/pyathena/sqlalchemy/rest.py
index dc8c6c9e..67d56afc 100644
--- a/pyathena/sqlalchemy/rest.py
+++ b/pyathena/sqlalchemy/rest.py
@@ -8,6 +8,31 @@
 
 
 class AthenaRestDialect(AthenaDialect):
+    """SQLAlchemy dialect for Amazon Athena using the standard REST API cursor.
+
+    This dialect uses the default Cursor implementation, which retrieves
+    query results via the GetQueryResults API. Results are returned as
+    Python tuples with type conversion handled by the default converter.
+
+    This is the standard dialect for general-purpose Athena access and is
+    suitable for most use cases where specialized result formats (Arrow,
+    pandas) are not required.
+
+    Connection URL Format:
+        ``awsathena+rest://{access_key}:{secret_key}@athena.{region}.amazonaws.com/{schema}``
+
+    Example:
+        >>> from sqlalchemy import create_engine
+        >>> engine = create_engine(
+        ...     "awsathena+rest://:@athena.us-west-2.amazonaws.com/default"
+        ...     "?s3_staging_dir=s3://my-bucket/athena-results/"
+        ... )
+
+    See Also:
+        :class:`~pyathena.cursor.Cursor`: The underlying cursor implementation.
+        :class:`~pyathena.sqlalchemy.base.AthenaDialect`: Base dialect class.
+    """
+
     driver = "rest"
     supports_statement_cache = True
 
diff --git a/pyathena/sqlalchemy/types.py b/pyathena/sqlalchemy/types.py
index ee798d77..3a4df852 100644
--- a/pyathena/sqlalchemy/types.py
+++ b/pyathena/sqlalchemy/types.py
@@ -13,6 +13,24 @@
 
 
 class AthenaTimestamp(TypeEngine[datetime]):
+    """SQLAlchemy type for Athena TIMESTAMP values.
+
+    This type handles the conversion of Python datetime objects to Athena's
+    TIMESTAMP literal syntax. When used in queries, datetime values are
+    rendered as ``TIMESTAMP 'YYYY-MM-DD HH:MM:SS.mmm'``.
+
+    The type supports millisecond precision (3 decimal places) which matches
+    Athena's TIMESTAMP type precision.
+
+    Example:
+        >>> from sqlalchemy import Column, Table, MetaData
+        >>> from pyathena.sqlalchemy.types import AthenaTimestamp
+        >>> metadata = MetaData()
+        >>> events = Table('events', metadata,
+        ...     Column('event_time', AthenaTimestamp)
+        ... )
+    """
+
     render_literal_cast = True
     render_bind_cast = True
 
@@ -27,6 +45,21 @@ def literal_processor(self, dialect: "Dialect") -> Optional["_LiteralProcessorTy
 
 
 class AthenaDate(TypeEngine[date]):
+    """SQLAlchemy type for Athena DATE values.
+
+    This type handles the conversion of Python date objects to Athena's
+    DATE literal syntax. When used in queries, date values are rendered
+    as ``DATE 'YYYY-MM-DD'``.
+
+    Example:
+        >>> from sqlalchemy import Column, Table, MetaData
+        >>> from pyathena.sqlalchemy.types import AthenaDate
+        >>> metadata = MetaData()
+        >>> orders = Table('orders', metadata,
+        ...     Column('order_date', AthenaDate)
+        ... )
+    """
+
     render_literal_cast = True
     render_bind_cast = True
 
@@ -41,14 +74,53 @@ def literal_processor(self, dialect: "Dialect") -> Optional["_LiteralProcessorTy
 
 
 class Tinyint(sqltypes.Integer):
+    """SQLAlchemy type for Athena TINYINT (8-bit signed integer).
+
+    TINYINT stores values from -128 to 127. This type is useful for
+    columns that contain small integer values to optimize storage.
+    """
+
     __visit_name__ = "tinyint"
 
 
 class TINYINT(Tinyint):
+    """Uppercase alias for Tinyint type.
+
+    This provides SQLAlchemy-style uppercase naming convention.
+    """
+
     __visit_name__ = "TINYINT"
 
 
 class AthenaStruct(TypeEngine[Dict[str, Any]]):
+    """SQLAlchemy type for Athena STRUCT/ROW complex type.
+
+    STRUCT represents a record with named fields, similar to a database row
+    or a Python dictionary with typed values. Each field has a name and a
+    data type.
+
+    Args:
+        *fields: Field specifications. Each can be either:
+            - A string (field name, defaults to STRING type)
+            - A tuple of (field_name, field_type)
+
+    Example:
+        >>> from sqlalchemy import Column, Table, MetaData, types
+        >>> from pyathena.sqlalchemy.types import AthenaStruct
+        >>> metadata = MetaData()
+        >>> users = Table('users', metadata,
+        ...     Column('address', AthenaStruct(
+        ...         ('street', types.String),
+        ...         ('city', types.String),
+        ...         ('zip_code', types.Integer)
+        ...     ))
+        ... )
+
+    See Also:
+        AWS Athena STRUCT Type:
+        https://docs.aws.amazon.com/athena/latest/ug/rows-and-structs.html
+    """
+
     __visit_name__ = "struct"
 
     def __init__(self, *fields: Union[str, Tuple[str, Any]]) -> None:
@@ -76,10 +148,34 @@ def python_type(self) -> type:
 
 
 class STRUCT(AthenaStruct):
+    """Uppercase alias for AthenaStruct type."""
+
     __visit_name__ = "STRUCT"
 
 
 class AthenaMap(TypeEngine[Dict[str, Any]]):
+    """SQLAlchemy type for Athena MAP complex type.
+
+    MAP represents a collection of key-value pairs where all keys have the
+    same type and all values have the same type.
+
+    Args:
+        key_type: SQLAlchemy type for map keys. Defaults to String.
+        value_type: SQLAlchemy type for map values. Defaults to String.
+
+    Example:
+        >>> from sqlalchemy import Column, Table, MetaData, types
+        >>> from pyathena.sqlalchemy.types import AthenaMap
+        >>> metadata = MetaData()
+        >>> settings = Table('settings', metadata,
+        ...     Column('config', AthenaMap(types.String, types.Integer))
+        ... )
+
+    See Also:
+        AWS Athena MAP Type:
+        https://docs.aws.amazon.com/athena/latest/ug/maps.html
+    """
+
     __visit_name__ = "map"
 
     def __init__(self, key_type: Any = None, value_type: Any = None) -> None:
@@ -105,10 +201,32 @@ def python_type(self) -> type:
 
 
 class MAP(AthenaMap):
+    """Uppercase alias for AthenaMap type."""
+
     __visit_name__ = "MAP"
 
 
 class AthenaArray(TypeEngine[List[Any]]):
+    """SQLAlchemy type for Athena ARRAY complex type.
+
+    ARRAY represents an ordered collection of elements of the same type.
+
+    Args:
+        item_type: SQLAlchemy type for array elements. Defaults to String.
+
+    Example:
+        >>> from sqlalchemy import Column, Table, MetaData, types
+        >>> from pyathena.sqlalchemy.types import AthenaArray
+        >>> metadata = MetaData()
+        >>> posts = Table('posts', metadata,
+        ...     Column('tags', AthenaArray(types.String))
+        ... )
+
+    See Also:
+        AWS Athena ARRAY Type:
+        https://docs.aws.amazon.com/athena/latest/ug/arrays.html
+    """
+
     __visit_name__ = "array"
 
     def __init__(self, item_type: Any = None) -> None:
@@ -126,4 +244,6 @@ def python_type(self) -> type:
 
 
 class ARRAY(AthenaArray):
+    """Uppercase alias for AthenaArray type."""
+
     __visit_name__ = "ARRAY"
diff --git a/pyathena/sqlalchemy/util.py b/pyathena/sqlalchemy/util.py
index f8c47f7e..984f125c 100644
--- a/pyathena/sqlalchemy/util.py
+++ b/pyathena/sqlalchemy/util.py
@@ -1,6 +1,14 @@
 # -*- coding: utf-8 -*-
+"""Utility classes for PyAthena SQLAlchemy dialect."""
 
 
 class _HashableDict(dict):  # type: ignore
+    """A dictionary subclass that can be used as a dictionary key.
+
+    SQLAlchemy's reflection caching requires hashable objects. This class
+    enables dictionary values (like table properties) to be cached by
+    making them hashable through tuple conversion.
+    """
+
     def __hash__(self):  # type: ignore
         return hash(tuple(sorted(self.items())))