Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to run",
"modification": 15
"modification": 16
}
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ def format_query(self, items: List[EmbeddableItem]) -> str:
ARRAY_AGG(
STRUCT({"distance, " if self.include_distance else ""}\
{base_columns_str})
) as embeddable_items
) as chunks
FROM VECTOR_SEARCH(
(SELECT {columns_str}, {self.embedding_column}
FROM `{self.table_name}`
Expand Down
81 changes: 48 additions & 33 deletions sdks/python/apache_beam/ml/rag/ingestion/bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import warnings
from collections.abc import Callable
from dataclasses import dataclass
from typing import Any
from typing import Dict
from typing import Optional
Expand All @@ -28,41 +28,56 @@
from apache_beam.typehints.row_type import RowTypeConstraint

# Signature of the user-supplied converter that turns one EmbeddableItem into
# a row dict. The value type is typing.Any; the lowercase builtin ``any`` is a
# function, not a type, and is rejected by static type checkers.
EmbeddableToDictFn = Callable[[EmbeddableItem], Dict[str, Any]]
# Backward compatibility alias.
ChunkToDictFn = EmbeddableToDictFn


class SchemaConfig:
  """Configuration for custom BigQuery schema and row conversion.

  Allows overriding the default schema and row conversion logic for BigQuery
  vector storage. This enables custom table schemas beyond the default
  id/embedding/content/metadata structure.

  Attributes:
    schema: BigQuery TableSchema dict defining the table structure.
    embeddable_to_dict_fn: Function that converts an EmbeddableItem to a
      dict matching the schema. Takes an EmbeddableItem and returns
      Dict[str, Any] with keys matching schema fields.

  Example with custom schema:
    >>> schema_config = SchemaConfig(
    ...   schema={
    ...     'fields': [
    ...       {'name': 'id', 'type': 'STRING'},
    ...       {'name': 'embedding', 'type': 'FLOAT64', 'mode': 'REPEATED'},
    ...       {'name': 'source_url', 'type': 'STRING'}
    ...     ]
    ...   },
    ...   embeddable_to_dict_fn=lambda item: {
    ...     'id': item.id,
    ...     'embedding': item.embedding.dense_embedding,
    ...     'source_url': item.metadata.get('url')
    ...   }
    ... )
  """
  def __init__(
      self,
      schema: Dict,
      embeddable_to_dict_fn: Optional["EmbeddableToDictFn"] = None,
      **kwargs):
    """Stores the schema and the row conversion function.

    Args:
      schema: BigQuery TableSchema dict defining the table structure.
      embeddable_to_dict_fn: Function that converts an EmbeddableItem to a
        dict matching the schema.
      **kwargs: Only the deprecated ``chunk_to_dict_fn`` keyword is accepted;
        it is used as ``embeddable_to_dict_fn`` and emits a
        DeprecationWarning.

    Raises:
      TypeError: If both ``embeddable_to_dict_fn`` and ``chunk_to_dict_fn``
        are given, if any other keyword argument is passed, or if no
        conversion function is provided.
    """
    self.schema = schema
    if 'chunk_to_dict_fn' in kwargs:
      warnings.warn(
          "chunk_to_dict_fn is deprecated, use embeddable_to_dict_fn",
          DeprecationWarning,
          stacklevel=2)
      legacy_fn = kwargs.pop('chunk_to_dict_fn')
      if embeddable_to_dict_fn is None:
        embeddable_to_dict_fn = legacy_fn
      elif legacy_fn is not None:
        # Two different converters were supplied; silently preferring either
        # one would hide a caller bug, so reject the ambiguous call.
        raise TypeError(
            "Specify only one of embeddable_to_dict_fn and chunk_to_dict_fn")
    if kwargs:
      raise TypeError(f"Unexpected keyword arguments: {', '.join(kwargs)}")
    if embeddable_to_dict_fn is None:
      raise TypeError("SchemaConfig requires embeddable_to_dict_fn")
    self.embeddable_to_dict_fn = embeddable_to_dict_fn


class BigQueryVectorWriterConfig(VectorDatabaseWriteConfig):
Expand Down
Loading