From f477817759a446823eec870fded83351ddbae579 Mon Sep 17 00:00:00 2001 From: Claude Date: Thu, 19 Feb 2026 13:18:49 -0500 Subject: [PATCH] Fix breaking changes from renaming Chunk to EmbeddableItem --- ...am_PostCommit_Python_Xlang_Gcp_Direct.json | 2 +- .../rag/enrichment/bigquery_vector_search.py | 2 +- .../apache_beam/ml/rag/ingestion/bigquery.py | 81 +++++++++++-------- 3 files changed, 50 insertions(+), 35 deletions(-) diff --git a/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Direct.json b/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Direct.json index bb5da04014ec..83346d34aee0 100644 --- a/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Direct.json +++ b/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Direct.json @@ -1,4 +1,4 @@ { "comment": "Modify this file in a trivial way to cause this test suite to run", - "modification": 15 + "modification": 16 } diff --git a/sdks/python/apache_beam/ml/rag/enrichment/bigquery_vector_search.py b/sdks/python/apache_beam/ml/rag/enrichment/bigquery_vector_search.py index 614e5f9c0800..e9269af27bd4 100644 --- a/sdks/python/apache_beam/ml/rag/enrichment/bigquery_vector_search.py +++ b/sdks/python/apache_beam/ml/rag/enrichment/bigquery_vector_search.py @@ -241,7 +241,7 @@ def format_query(self, items: List[EmbeddableItem]) -> str: ARRAY_AGG( STRUCT({"distance, " if self.include_distance else ""}\ {base_columns_str}) - ) as embeddable_items + ) as chunks FROM VECTOR_SEARCH( (SELECT {columns_str}, {self.embedding_column} FROM `{self.table_name}` diff --git a/sdks/python/apache_beam/ml/rag/ingestion/bigquery.py b/sdks/python/apache_beam/ml/rag/ingestion/bigquery.py index af170992b09c..2a7111c0d35f 100644 --- a/sdks/python/apache_beam/ml/rag/ingestion/bigquery.py +++ b/sdks/python/apache_beam/ml/rag/ingestion/bigquery.py @@ -14,8 +14,8 @@ # See the License for the specific language governing permissions and # limitations under the License. 
+import warnings
 from collections.abc import Callable
-from dataclasses import dataclass
 from typing import Any
 from typing import Dict
 from typing import Optional
@@ -28,41 +28,83 @@
 from apache_beam.typehints.row_type import RowTypeConstraint
 
-EmbeddableToDictFn = Callable[[EmbeddableItem], Dict[str, any]]
+EmbeddableToDictFn = Callable[[EmbeddableItem], Dict[str, Any]]
+# Backward compatibility alias.
+ChunkToDictFn = EmbeddableToDictFn
 
 
-@dataclass
 class SchemaConfig:
-  """Configuration for custom BigQuery schema and row conversion.
-
-  Allows overriding the default schema and row conversion logic for BigQuery
-  vector storage. This enables custom table schemas beyond the default
-  id/embedding/content/metadata structure.
-
-  Attributes:
-      schema: BigQuery TableSchema dict defining the table structure.
-          Example:
-          >>> {
-          ...   'fields': [
-          ...     {'name': 'id', 'type': 'STRING'},
-          ...     {'name': 'embedding', 'type': 'FLOAT64', 'mode': 'REPEATED'},
-          ...     {'name': 'custom_field', 'type': 'STRING'}
-          ...   ]
-          ... }
-      embeddable_to_dict_fn: Function that converts an
-          EmbeddableItem to a dict matching the schema.
-          Takes an EmbeddableItem and returns
-          Dict[str, Any] with keys matching
-          schema fields.
-          Example:
-          >>> def embeddable_to_dict(item: EmbeddableItem) -> Dict[str, Any]:
-          ...     return {
-          ...         'id': item.id,
-          ...         'embedding': item.embedding.dense_embedding,
-          ...         'custom_field': item.metadata.get('custom_field')
-          ...     }
-  """
-  schema: Dict
-  embeddable_to_dict_fn: EmbeddableToDictFn
+  def __init__(
+      self,
+      schema: Dict,
+      embeddable_to_dict_fn: Optional[EmbeddableToDictFn] = None,
+      **kwargs):
+    """Configuration for custom BigQuery schema and row conversion.
+
+    Allows overriding the default schema and row conversion logic for BigQuery
+    vector storage. This enables custom table schemas beyond the default
+    id/embedding/content/metadata structure.
+
+    Args:
+      schema: BigQuery TableSchema dict defining the table structure.
+      embeddable_to_dict_fn: Function that converts an EmbeddableItem to a
+        dict matching the schema. Takes an EmbeddableItem and returns
+        Dict[str, Any] with keys matching schema fields.
+      **kwargs: Accepts only the deprecated ``chunk_to_dict_fn`` keyword, an
+        alias for ``embeddable_to_dict_fn`` that emits a DeprecationWarning.
+
+    Raises:
+      TypeError: If no conversion function is given, if both
+        ``embeddable_to_dict_fn`` and ``chunk_to_dict_fn`` are given, or if
+        any other keyword argument is passed.
+
+    Example with custom schema:
+      >>> schema_config = SchemaConfig(
+      ...   schema={
+      ...     'fields': [
+      ...       {'name': 'id', 'type': 'STRING'},
+      ...       {'name': 'embedding', 'type': 'FLOAT64', 'mode': 'REPEATED'},
+      ...       {'name': 'source_url', 'type': 'STRING'}
+      ...     ]
+      ...   },
+      ...   embeddable_to_dict_fn=lambda item: {
+      ...     'id': item.id,
+      ...     'embedding': item.embedding.dense_embedding,
+      ...     'source_url': item.metadata.get('url')
+      ...   }
+      ... )
+    """
+    if 'chunk_to_dict_fn' in kwargs:
+      # Passing both spellings is ambiguous; refuse rather than silently
+      # letting the deprecated one win.
+      if embeddable_to_dict_fn is not None:
+        raise TypeError(
+            "SchemaConfig got both embeddable_to_dict_fn and the deprecated "
+            "chunk_to_dict_fn; pass only embeddable_to_dict_fn")
+      warnings.warn(
+          "chunk_to_dict_fn is deprecated, use embeddable_to_dict_fn",
+          DeprecationWarning,
+          stacklevel=2)
+      embeddable_to_dict_fn = kwargs.pop('chunk_to_dict_fn')
+    if kwargs:
+      raise TypeError(f"Unexpected keyword arguments: {', '.join(kwargs)}")
+    if embeddable_to_dict_fn is None:
+      raise TypeError("SchemaConfig requires embeddable_to_dict_fn")
+    self.schema = schema
+    self.embeddable_to_dict_fn = embeddable_to_dict_fn
+
+  def __eq__(self, other):
+    # Field-wise equality, as the former @dataclass generated; defining
+    # __eq__ without __hash__ also keeps instances unhashable as before.
+    if other.__class__ is not self.__class__:
+      return NotImplemented
+    return (self.schema, self.embeddable_to_dict_fn) == (
+        other.schema, other.embeddable_to_dict_fn)
+
+  def __repr__(self):
+    # Same shape as the repr the former @dataclass generated.
+    return (
+        f"{self.__class__.__qualname__}(schema={self.schema!r}, "
+        f"embeddable_to_dict_fn={self.embeddable_to_dict_fn!r})")
 
 
 class BigQueryVectorWriterConfig(VectorDatabaseWriteConfig):