"
+ ]
+ },
+ "metadata": {},
+ "output_type": "display_data"
+ },
+ {
+ "data": {
+ "application/javascript": [
+ "\n",
+ " if (typeof window.interactive_beam_jquery == 'undefined') {\n",
+ " var jqueryScript = document.createElement('script');\n",
+ " jqueryScript.src = 'https://code.jquery.com/jquery-3.4.1.slim.min.js';\n",
+ " jqueryScript.type = 'text/javascript';\n",
+ " jqueryScript.onload = function() {\n",
+ " var datatableScript = document.createElement('script');\n",
+ " datatableScript.src = 'https://cdn.datatables.net/1.10.20/js/jquery.dataTables.min.js';\n",
+ " datatableScript.type = 'text/javascript';\n",
+ " datatableScript.onload = function() {\n",
+ " window.interactive_beam_jquery = jQuery.noConflict(true);\n",
+ " window.interactive_beam_jquery(document).ready(function($){\n",
+ " \n",
+ " $(\"#progress_indicator_ef090119901644a31067b90f8d98d385\").remove();\n",
+ " });\n",
+ " }\n",
+ " document.head.appendChild(datatableScript);\n",
+ " };\n",
+ " document.head.appendChild(jqueryScript);\n",
+ " } else {\n",
+ " window.interactive_beam_jquery(document).ready(function($){\n",
+ " \n",
+ " $(\"#progress_indicator_ef090119901644a31067b90f8d98d385\").remove();\n",
+ " });\n",
+ " }"
+ ]
+ },
+ "metadata": {},
+ "output_type": "display_data"
+ }
+ ],
+ "source": [
+ "class DocumentSplitterDoFn(beam.DoFn):\n",
+ " def setup(self):\n",
+ " # The `chunk_size` parameter is constrained by the embedding model we’re using.\n",
+ " # Since we’re using `sentence-transformers/all-MiniLM-L6-v2`, which has a maximum\n",
+ " # token limit of ~384 tokens, we need to ensure chunk sizes stay well within that limit.\n",
+ " # Given that each document in our dataset contains approximately 331 tokens, using a chunk\n",
+ " # size of 256 allows us to preserve nearly the most semantic meaning of each entry while\n",
+ " # staying safely under the model’s token limit.\n",
+ " #\n",
+ " # For simplicity, We'll use sentence splitting as the chunking strategy for simplicity. Ideally,\n",
+ " # we would pass a tokenizer here — preferably the same one used by the retriever to ensure\n",
+ " # consistency. However, in this example, we are not using a tokenizer.\n",
+ " from llama_index.core.text_splitter import SentenceSplitter\n",
+ " chunk_size, chunk_overlap = 256, 20\n",
+ " self.llama_txt_splitter = SentenceSplitter(chunk_size=chunk_size, chunk_overlap=chunk_overlap)\n",
+ "\n",
+ " def process(self, element: Dict[str, Any]) -> List[Chunk]:\n",
+ " id_field, content_field = 'id', 'content'\n",
+ " metadata_fields = [\"title\", \"keywords\", \"tags\"]\n",
+ " global_doc_id = element.get('id', str(uuid.uuid4()))\n",
+ " text_content = element.get('content', '')\n",
+ " splits = self.llama_txt_splitter.split_text(text_content)\n",
+ " for i, split in enumerate(splits):\n",
+ " local_doc_id = f\"{global_doc_id}_{i}\"\n",
+ " yield Chunk(id=local_doc_id, content=Content(split), metadata={f:element[f] for f in metadata_fields})\n",
+ "\n",
+ "class ChunkingTransformProvider(ChunkingTransformProvider):\n",
+ " def get_splitter_transform(self) -> beam.PTransform[beam.PCollection[Dict[str, Any]], beam.PCollection[Chunk]]:\n",
+ " return beam.ParDo(DocumentSplitterDoFn())\n",
+ "\n",
+ "class IndexToVectorDBDoFn(beam.DoFn):\n",
+ " def __init__(self, collection_name: str, batch_size: int = 100):\n",
+ " self.collection_name = collection_name\n",
+ " self.batch_size = batch_size\n",
+ "\n",
+ " def setup(self):\n",
+ " self._client = MilvusClient(**milvus_connection_parameters.__dict__)\n",
+ "\n",
+ " def start_bundle(self):\n",
+ " self._batch = []\n",
+ "\n",
+ " def process(self, doc: Chunk):\n",
+ " doc_to_index = {\n",
+ " \"id\": doc.id,\n",
+ " \"content\": doc.content.text,\n",
+ " \"title_and_content\": f\"{doc.metadata['title']}. {doc.content.text}\",\n",
+ " \"metadata\": doc.metadata,\n",
+ " \"embedding\": doc.embedding.dense_embedding,\n",
+ " }\n",
+ " self._batch.append(doc_to_index)\n",
+ "\n",
+ " if len(self._batch) >= self.batch_size:\n",
+ " self._flush_batch()\n",
+ "\n",
+ " yield doc_to_index\n",
+ "\n",
+ " def finish_bundle(self):\n",
+ " if self._batch:\n",
+ " self._flush_batch()\n",
+ "\n",
+ " def _flush_batch(self):\n",
+ " if self._batch:\n",
+ " # Upsert API gives us a built-in idempotency over the insert API.\n",
+ " result = self._client.upsert(collection_name=self.collection_name, data=self._batch)\n",
+ " print(f\"Upserted batch of {len(self._batch)} documents. Result: {result}\")\n",
+ " self._batch = []\n",
+ "\n",
+ "huggingface_embedder = HuggingfaceTextEmbeddings(\n",
+ " model_name=embedding_model_config[\"name\"],\n",
+ " max_seq_length=embedding_model_config[\"token_limit\"])\n",
+ "\n",
+ "with beam.Pipeline() as pipeline:\n",
+ " data_transformed = (\n",
+ " pipeline\n",
+ " | 'Creating Documents' >> beam.Create(corpus)\n",
+ " | 'Converting to Chunks' >> MLTransform(\n",
+ " write_artifact_location=tempfile.mkdtemp()).with_transform(ChunkingTransformProvider())\n",
+ " | 'Generating Embeddings' >> MLTransform(\n",
+ " write_artifact_location=tempfile.mkdtemp()).with_transform(huggingface_embedder)\n",
+ " | 'Indexing to Vector DB' >> beam.ParDo(IndexToVectorDBDoFn(collection_name=collection_name))\n",
+ " )\n",
+ "\n",
+ "ib.show(data_transformed)"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "ea478136-2ca8-4fee-bb1e-6bfcc2e97c93",
+ "metadata": {},
+ "source": [
+ "## Milvus Beam Enrichment Handler"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "e9ad2509-3e5d-42e8-b565-ecccde38b8f4",
+ "metadata": {},
+ "source": [
+ "### Prep for Milvus Beam Enrichment Handler"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 26,
+ "id": "4911e8cc-10f1-4d21-9251-1b756b61f2c1",
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "class FormatAndPrintResults(beam.PTransform):\n",
+ " def expand(self, pcoll):\n",
+ " return pcoll | beam.Map(self.format_and_print)\n",
+ " \n",
+ " @staticmethod\n",
+ " def format_and_print(chunk):\n",
+ " # Create a clean structure to display.\n",
+ " formatted_result = {\n",
+ " \"query\": chunk.content.text,\n",
+ " \"query_embedding\": FormatAndPrintResults.get_embedding_count(chunk),\n",
+ " \"results\": []\n",
+ " }\n",
+ " \n",
+ " # Extract the enrichment data\n",
+ " enrichment_data = chunk.metadata.get('enrichment_data', defaultdict(list))\n",
+ " \n",
+ " # Format each result with its distance score\n",
+ " for i in range(len(enrichment_data.get('id', []))):\n",
+ " result = {\n",
+ " \"id\": enrichment_data['id'][i],\n",
+ " \"distance\": round(enrichment_data['distance'][i], 4),\n",
+ " \"fields\": enrichment_data['fields'][i] if i < len(enrichment_data.get('fields', [])) else {}\n",
+ " }\n",
+ " formatted_result[\"results\"].append(result)\n",
+ " \n",
+ " # Sort by distance in descending order (highest/best first)\n",
+ " formatted_result[\"results\"] = sorted(formatted_result[\"results\"], key=lambda x: x[\"distance\"], reverse=True)\n",
+ "\n",
+ " # Print the formatted JSON\n",
+ " print_json(data=formatted_result)\n",
+ " \n",
+ " # Return the original chunk for further processing if needed\n",
+ " return chunk\n",
+ "\n",
+ " @staticmethod\n",
+ " def get_embedding_count(chunk):\n",
+ " if chunk.embedding:\n",
+ " if chunk.embedding.dense_embedding:\n",
+ " return len(chunk.embedding.dense_embedding)\n",
+ " if chunk.embedding.sparse_embedding:\n",
+ " return len(chunk.embedding.sparse_embedding)"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "656110c9-1360-49fd-ba17-f55f2257f127",
+ "metadata": {},
+ "source": [
+ "### Vector Search"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "2d165518-b27b-40a8-ae0a-42342df3c1eb",
+ "metadata": {},
+ "source": [
+ "Let’s choose a deliberate query that illustrates the unique benefits of pure vector search, especially its ability to grasp semantic meaning:\n",
+ "\n",
+ "Query: `How do I process large datasets efficiently?`\n",
+ "\n",
+ "This query demonstrates vector search advantages because:\n",
+ "\n",
+ "- **Dense vector (semantic) contribution:** The semantic component understands the conceptual intent of \"processing large datasets efficiently,\" connecting it to frameworks like **Apache Beam** and **Google Cloud Dataflow**, even if those terms aren't in the query.\n",
+ "- **Overcoming keyword limitations:** For conversational queries like this, traditional keyword search struggles. Vector search moves beyond exact lexical matching to find documents that semantically answer the \"how-to\" aspect.\n",
+ "- **Vector search advantage:** Documents describing solutions like **Apache Beam** (e.g., Document #1) rank highest. Vector search understands that Beam's \"unified programming model for defining and executing data processing pipelines\" directly addresses the query's need for efficient large dataset processing, even without an exact phrase match, by prioritizing based on deep semantic alignment."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 27,
+ "id": "74db1238-0a04-4e08-818d-5bce8f09006b",
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "query = \"How do I process large datasets efficiently?\"\n",
+ "query_chunk = Chunk(content=Content(text=query))"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 28,
+ "id": "79e16531-8bec-4b4b-9ed3-cebd705480e0",
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "search_parameters = MilvusSearchParameters(\n",
+ " collection_name=collection_name,\n",
+ " search_strategy=VectorSearchParameters(limit=10, anns_field=\"embedding\"),\n",
+ " output_fields=[\"metadata\",\"content\"])"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 29,
+ "id": "cbef1911-6464-4ba1-8974-ed00896c7e8b",
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "collection_load_parameters = MilvusCollectionLoadParameters() "
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 30,
+ "id": "f0481286-3f2b-4690-a2f6-a5a00de3ff34",
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "milvus_handler = MilvusSearchEnrichmentHandler(\n",
+ " connection_parameters=milvus_connection_parameters,\n",
+ " search_parameters=search_parameters,\n",
+ " collection_load_parameters=collection_load_parameters)"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 31,
+ "id": "35ee37f2-60cd-4d5d-aef6-aed4fda79161",
+ "metadata": {},
+ "outputs": [
+ {
+ "name": "stderr",
+ "output_type": "stream",
+ "text": [
+ "WARNING:root:This output type hint will be ignored and not used for type-checking purposes. Typically, output type hints for a PTransform are single (or nested) types wrapped by a PCollection, PDone, or None. Got: Union[Tuple[apache_beam.pvalue.PCollection[~MLTransformOutputT], apache_beam.pvalue.PCollection[apache_beam.pvalue.Row]], apache_beam.pvalue.PCollection[~MLTransformOutputT]] instead.\n",
+ "WARNING:root:This output type hint will be ignored and not used for type-checking purposes. Typically, output type hints for a PTransform are single (or nested) types wrapped by a PCollection, PDone, or None. Got: Union[Tuple[apache_beam.pvalue.PCollection[~MLTransformOutputT], apache_beam.pvalue.PCollection[apache_beam.pvalue.Row]], apache_beam.pvalue.PCollection[~MLTransformOutputT]] instead.\n"
+ ]
+ },
+ {
+ "data": {
+ "text/html": [
+ "{\n",
+ " \"query\": \"How do I process large datasets efficiently?\",\n",
+ " \"query_embedding\": 384,\n",
+ " \"results\": [\n",
+ " {\n",
+ " \"id\": \"1_0\",\n",
+ " \"distance\": 0.3657,\n",
+ " \"fields\": {\n",
+ " \"content\": \"Apache Beam is an open-source framework that provides a consistent programming model for both batch and streaming data processing. Developed originally by Google, it allows developers to write pipelines that can run on multiple engines, such as Apache Flink, Spark, and Google Cloud Dataflow. Beam uses abstractions like PCollections (data containers) and PTransforms (operations) to define the flow of data. The framework promotes portability through its runner architecture, letting the same pipeline execute on different backends. Support for multiple SDKs, including Java and Python, makes it accessible for a broad audience. Key features include support for event time, windowing, triggers, and stateful processing, which are essential for handling real-time data effectively. Beam is ideal for building ETL jobs, real-time analytics, and machine learning data pipelines. It helps teams focus on logic rather than infrastructure, offering flexibility and scalability in handling unbounded and bounded data sources. Apache Beam also supports a wide range of connectors for both input and output, including Kafka, BigQuery, and JDBC-based systems. This makes it easy to integrate Beam into existing data ecosystems. Developers can build reusable transforms and modularize pipeline logic, improving maintainability and testing.\",\n",
+ " \"metadata\": {\n",
+ " \"title\": \"Apache Beam: Unified Model for Batch and Streaming Data\",\n",
+ " \"keywords\": [\n",
+ " \"Apache Beam\",\n",
+ " \"stream processing\",\n",
+ " \"batch processing\",\n",
+ " \"data pipelines\",\n",
+ " \"SDK\"\n",
+ " ],\n",
+ " \"tags\": [\n",
+ " \"Data Engineering\",\n",
+ " \"Open Source\",\n",
+ " \"Streaming\",\n",
+ " \"Batch\",\n",
+ " \"Big Data\"\n",
+ " ]\n",
+ " }\n",
+ " }\n",
+ " },\n",
+ " {\n",
+ " \"id\": \"2_1\",\n",
+ " \"distance\": 0.3369,\n",
+ " \"fields\": {\n",
+ " \"content\": \"For developers, Dataflow provides local testing capabilities and a unified logging system through Cloud Logging. It also supports SQL-based pipeline definitions using BigQuery, which lowers the barrier to entry for analysts and data engineers. Dataflow’s streaming engine significantly improves performance and reduces costs by decoupling compute and state management. In summary, Google Cloud Dataflow not only simplifies the deployment of Apache Beam pipelines but also enhances them with cloud-native features. Its managed runtime, high availability, and integration with the broader Google Cloud ecosystem make it a powerful tool for modern data processing.\",\n",
+ " \"metadata\": {\n",
+ " \"title\": \"Google Cloud Dataflow: Run Apache Beam in the Cloud\",\n",
+ " \"keywords\": [\n",
+ " \"Google Cloud\",\n",
+ " \"Dataflow\",\n",
+ " \"Apache Beam\",\n",
+ " \"serverless\",\n",
+ " \"stream and batch\"\n",
+ " ],\n",
+ " \"tags\": [\n",
+ " \"Cloud Computing\",\n",
+ " \"Data Pipelines\",\n",
+ " \"Google Cloud\",\n",
+ " \"Serverless\",\n",
+ " \"Enterprise\"\n",
+ " ]\n",
+ " }\n",
+ " }\n",
+ " },\n",
+ " {\n",
+ " \"id\": \"2_0\",\n",
+ " \"distance\": 0.2918,\n",
+ " \"fields\": {\n",
+ " \"content\": \"Google Cloud Dataflow is a fully managed service that runs Apache Beam pipelines in the cloud. It abstracts away infrastructure management and handles dynamic scaling, load balancing, and fault tolerance. Developers can focus on writing data logic using the Beam SDK and deploy it easily to Google Cloud. Dataflow supports both batch and stream processing and integrates seamlessly with other Google services like BigQuery, Pub/Sub, and Cloud Storage. Its autoscaling capabilities allow it to adapt to changing data volumes, optimizing for cost and performance. Features like monitoring dashboards, job templates, and built-in logging make it suitable for both development and production use. With support for event time processing, stateful functions, and windowing, Dataflow is well-suited for real-time analytics and data transformation tasks. It’s a key component for architects building scalable, cloud-native data platforms. Dataflow also offers templates for common ETL tasks, helping teams get started quickly with minimal setup. Its integration with Cloud Functions and Cloud Composer enables event-driven and orchestrated workflows. Security and compliance are built-in with IAM roles, encryption at rest and in transit, and audit logging, making it suitable for enterprise environments.\",\n",
+ " \"metadata\": {\n",
+ " \"title\": \"Google Cloud Dataflow: Run Apache Beam in the Cloud\",\n",
+ " \"keywords\": [\n",
+ " \"Google Cloud\",\n",
+ " \"Dataflow\",\n",
+ " \"Apache Beam\",\n",
+ " \"serverless\",\n",
+ " \"stream and batch\"\n",
+ " ],\n",
+ " \"tags\": [\n",
+ " \"Cloud Computing\",\n",
+ " \"Data Pipelines\",\n",
+ " \"Google Cloud\",\n",
+ " \"Serverless\",\n",
+ " \"Enterprise\"\n",
+ " ]\n",
+ " }\n",
+ " }\n",
+ " },\n",
+ " {\n",
+ " \"id\": \"1_1\",\n",
+ " \"distance\": 0.2638,\n",
+ " \"fields\": {\n",
+ " \"content\": \"Developers can build reusable transforms and modularize pipeline logic, improving maintainability and testing. The concept of runners enables developers to write once and run anywhere, which is particularly appealing for organizations that want to avoid vendor lock-in. The Beam model is based on a unified programming model that decouples pipeline logic from execution. This makes it easier to reason about time and state in both batch and streaming pipelines. Advanced features like late data handling, watermarks, and session windowing allow for more accurate and meaningful processing of real-world data. Beam also integrates with orchestration tools and monitoring systems, allowing for production-grade deployments. Community support and contributions have grown significantly, making Beam a stable and evolving ecosystem. Many cloud providers offer native support for Beam pipelines, and it's increasingly a core component in modern data platform architectures.\",\n",
+ " \"metadata\": {\n",
+ " \"title\": \"Apache Beam: Unified Model for Batch and Streaming Data\",\n",
+ " \"keywords\": [\n",
+ " \"Apache Beam\",\n",
+ " \"stream processing\",\n",
+ " \"batch processing\",\n",
+ " \"data pipelines\",\n",
+ " \"SDK\"\n",
+ " ],\n",
+ " \"tags\": [\n",
+ " \"Data Engineering\",\n",
+ " \"Open Source\",\n",
+ " \"Streaming\",\n",
+ " \"Batch\",\n",
+ " \"Big Data\"\n",
+ " ]\n",
+ " }\n",
+ " }\n",
+ " },\n",
+ " {\n",
+ " \"id\": \"3_0\",\n",
+ " \"distance\": 0.031,\n",
+ " \"fields\": {\n",
+ " \"content\": \"Google Beam is an innovative video communication platform that builds on the research of Project Starline. It uses AI, 3D imaging, and light field rendering to create immersive, lifelike video calls. Designed to replicate in-person interaction, Beam allows users to see life-sized, three-dimensional representations of each other without the need for headsets. This breakthrough makes remote conversations feel natural—capturing facial expressions, eye contact, and subtle gestures that traditional video conferencing often misses. Beam reduces meeting fatigue and enhances engagement, making it ideal for enterprise collaboration, interviews, and virtual presence scenarios. Powered by Google AI, Beam represents a significant leap in communication technology. Major companies like Salesforce, Deloitte, and NEC are already exploring its impact on digital collaboration. Google is partnering with HP to build and distribute Beam hardware, designed to work with existing productivity and video tools. Currently in limited early access for enterprise partners, Google Beam aims to redefine virtual meetings by bridging the gap between digital and physical presence. It’s a promising step toward more human and effective remote interactions.\",\n",
+ " \"metadata\": {\n",
+ " \"title\": \"Google Beam: 3D Communication Powered by AI\",\n",
+ " \"keywords\": [\n",
+ " \"Google Beam\",\n",
+ " \"Project Starline\",\n",
+ " \"3D video\",\n",
+ " \"AI communication\",\n",
+ " \"real-time meetings\"\n",
+ " ],\n",
+ " \"tags\": [\n",
+ " \"AI\",\n",
+ " \"Communication\",\n",
+ " \"3D Technology\",\n",
+ " \"Remote Work\",\n",
+ " \"Enterprise Tech\"\n",
+ " ]\n",
+ " }\n",
+ " }\n",
+ " }\n",
+ " ]\n",
+ "}\n",
+ "\n"
+ ],
+ "text/plain": [
+ "\u001b[1m{\u001b[0m\n",
+ " \u001b[1;34m\"query\"\u001b[0m: \u001b[32m\"How do I process large datasets efficiently?\"\u001b[0m,\n",
+ " \u001b[1;34m\"query_embedding\"\u001b[0m: \u001b[1;36m384\u001b[0m,\n",
+ " \u001b[1;34m\"results\"\u001b[0m: \u001b[1m[\u001b[0m\n",
+ " \u001b[1m{\u001b[0m\n",
+ " \u001b[1;34m\"id\"\u001b[0m: \u001b[32m\"1_0\"\u001b[0m,\n",
+ " \u001b[1;34m\"distance\"\u001b[0m: \u001b[1;36m0.3657\u001b[0m,\n",
+ " \u001b[1;34m\"fields\"\u001b[0m: \u001b[1m{\u001b[0m\n",
+ " \u001b[1;34m\"content\"\u001b[0m: \u001b[32m\"Apache Beam is an open-source framework that provides a consistent programming model for both batch and streaming data processing. Developed originally by Google, it allows developers to write pipelines that can run on multiple engines, such as Apache Flink, Spark, and Google Cloud Dataflow. Beam uses abstractions like PCollections (data containers) and PTransforms (operations) to define the flow of data. The framework promotes portability through its runner architecture, letting the same pipeline execute on different backends. Support for multiple SDKs, including Java and Python, makes it accessible for a broad audience. Key features include support for event time, windowing, triggers, and stateful processing, which are essential for handling real-time data effectively. Beam is ideal for building ETL jobs, real-time analytics, and machine learning data pipelines. It helps teams focus on logic rather than infrastructure, offering flexibility and scalability in handling unbounded and bounded data sources. Apache Beam also supports a wide range of connectors for both input and output, including Kafka, BigQuery, and JDBC-based systems. This makes it easy to integrate Beam into existing data ecosystems. Developers can build reusable transforms and modularize pipeline logic, improving maintainability and testing.\"\u001b[0m,\n",
+ " \u001b[1;34m\"metadata\"\u001b[0m: \u001b[1m{\u001b[0m\n",
+ " \u001b[1;34m\"title\"\u001b[0m: \u001b[32m\"Apache Beam: Unified Model for Batch and Streaming Data\"\u001b[0m,\n",
+ " \u001b[1;34m\"keywords\"\u001b[0m: \u001b[1m[\u001b[0m\n",
+ " \u001b[32m\"Apache Beam\"\u001b[0m,\n",
+ " \u001b[32m\"stream processing\"\u001b[0m,\n",
+ " \u001b[32m\"batch processing\"\u001b[0m,\n",
+ " \u001b[32m\"data pipelines\"\u001b[0m,\n",
+ " \u001b[32m\"SDK\"\u001b[0m\n",
+ " \u001b[1m]\u001b[0m,\n",
+ " \u001b[1;34m\"tags\"\u001b[0m: \u001b[1m[\u001b[0m\n",
+ " \u001b[32m\"Data Engineering\"\u001b[0m,\n",
+ " \u001b[32m\"Open Source\"\u001b[0m,\n",
+ " \u001b[32m\"Streaming\"\u001b[0m,\n",
+ " \u001b[32m\"Batch\"\u001b[0m,\n",
+ " \u001b[32m\"Big Data\"\u001b[0m\n",
+ " \u001b[1m]\u001b[0m\n",
+ " \u001b[1m}\u001b[0m\n",
+ " \u001b[1m}\u001b[0m\n",
+ " \u001b[1m}\u001b[0m,\n",
+ " \u001b[1m{\u001b[0m\n",
+ " \u001b[1;34m\"id\"\u001b[0m: \u001b[32m\"2_1\"\u001b[0m,\n",
+ " \u001b[1;34m\"distance\"\u001b[0m: \u001b[1;36m0.3369\u001b[0m,\n",
+ " \u001b[1;34m\"fields\"\u001b[0m: \u001b[1m{\u001b[0m\n",
+ " \u001b[1;34m\"content\"\u001b[0m: \u001b[32m\"For developers, Dataflow provides local testing capabilities and a unified logging system through Cloud Logging. It also supports SQL-based pipeline definitions using BigQuery, which lowers the barrier to entry for analysts and data engineers. Dataflow’s streaming engine significantly improves performance and reduces costs by decoupling compute and state management. In summary, Google Cloud Dataflow not only simplifies the deployment of Apache Beam pipelines but also enhances them with cloud-native features. Its managed runtime, high availability, and integration with the broader Google Cloud ecosystem make it a powerful tool for modern data processing.\"\u001b[0m,\n",
+ " \u001b[1;34m\"metadata\"\u001b[0m: \u001b[1m{\u001b[0m\n",
+ " \u001b[1;34m\"title\"\u001b[0m: \u001b[32m\"Google Cloud Dataflow: Run Apache Beam in the Cloud\"\u001b[0m,\n",
+ " \u001b[1;34m\"keywords\"\u001b[0m: \u001b[1m[\u001b[0m\n",
+ " \u001b[32m\"Google Cloud\"\u001b[0m,\n",
+ " \u001b[32m\"Dataflow\"\u001b[0m,\n",
+ " \u001b[32m\"Apache Beam\"\u001b[0m,\n",
+ " \u001b[32m\"serverless\"\u001b[0m,\n",
+ " \u001b[32m\"stream and batch\"\u001b[0m\n",
+ " \u001b[1m]\u001b[0m,\n",
+ " \u001b[1;34m\"tags\"\u001b[0m: \u001b[1m[\u001b[0m\n",
+ " \u001b[32m\"Cloud Computing\"\u001b[0m,\n",
+ " \u001b[32m\"Data Pipelines\"\u001b[0m,\n",
+ " \u001b[32m\"Google Cloud\"\u001b[0m,\n",
+ " \u001b[32m\"Serverless\"\u001b[0m,\n",
+ " \u001b[32m\"Enterprise\"\u001b[0m\n",
+ " \u001b[1m]\u001b[0m\n",
+ " \u001b[1m}\u001b[0m\n",
+ " \u001b[1m}\u001b[0m\n",
+ " \u001b[1m}\u001b[0m,\n",
+ " \u001b[1m{\u001b[0m\n",
+ " \u001b[1;34m\"id\"\u001b[0m: \u001b[32m\"2_0\"\u001b[0m,\n",
+ " \u001b[1;34m\"distance\"\u001b[0m: \u001b[1;36m0.2918\u001b[0m,\n",
+ " \u001b[1;34m\"fields\"\u001b[0m: \u001b[1m{\u001b[0m\n",
+ " \u001b[1;34m\"content\"\u001b[0m: \u001b[32m\"Google Cloud Dataflow is a fully managed service that runs Apache Beam pipelines in the cloud. It abstracts away infrastructure management and handles dynamic scaling, load balancing, and fault tolerance. Developers can focus on writing data logic using the Beam SDK and deploy it easily to Google Cloud. Dataflow supports both batch and stream processing and integrates seamlessly with other Google services like BigQuery, Pub/Sub, and Cloud Storage. Its autoscaling capabilities allow it to adapt to changing data volumes, optimizing for cost and performance. Features like monitoring dashboards, job templates, and built-in logging make it suitable for both development and production use. With support for event time processing, stateful functions, and windowing, Dataflow is well-suited for real-time analytics and data transformation tasks. It’s a key component for architects building scalable, cloud-native data platforms. Dataflow also offers templates for common ETL tasks, helping teams get started quickly with minimal setup. Its integration with Cloud Functions and Cloud Composer enables event-driven and orchestrated workflows. Security and compliance are built-in with IAM roles, encryption at rest and in transit, and audit logging, making it suitable for enterprise environments.\"\u001b[0m,\n",
+ " \u001b[1;34m\"metadata\"\u001b[0m: \u001b[1m{\u001b[0m\n",
+ " \u001b[1;34m\"title\"\u001b[0m: \u001b[32m\"Google Cloud Dataflow: Run Apache Beam in the Cloud\"\u001b[0m,\n",
+ " \u001b[1;34m\"keywords\"\u001b[0m: \u001b[1m[\u001b[0m\n",
+ " \u001b[32m\"Google Cloud\"\u001b[0m,\n",
+ " \u001b[32m\"Dataflow\"\u001b[0m,\n",
+ " \u001b[32m\"Apache Beam\"\u001b[0m,\n",
+ " \u001b[32m\"serverless\"\u001b[0m,\n",
+ " \u001b[32m\"stream and batch\"\u001b[0m\n",
+ " \u001b[1m]\u001b[0m,\n",
+ " \u001b[1;34m\"tags\"\u001b[0m: \u001b[1m[\u001b[0m\n",
+ " \u001b[32m\"Cloud Computing\"\u001b[0m,\n",
+ " \u001b[32m\"Data Pipelines\"\u001b[0m,\n",
+ " \u001b[32m\"Google Cloud\"\u001b[0m,\n",
+ " \u001b[32m\"Serverless\"\u001b[0m,\n",
+ " \u001b[32m\"Enterprise\"\u001b[0m\n",
+ " \u001b[1m]\u001b[0m\n",
+ " \u001b[1m}\u001b[0m\n",
+ " \u001b[1m}\u001b[0m\n",
+ " \u001b[1m}\u001b[0m,\n",
+ " \u001b[1m{\u001b[0m\n",
+ " \u001b[1;34m\"id\"\u001b[0m: \u001b[32m\"1_1\"\u001b[0m,\n",
+ " \u001b[1;34m\"distance\"\u001b[0m: \u001b[1;36m0.2638\u001b[0m,\n",
+ " \u001b[1;34m\"fields\"\u001b[0m: \u001b[1m{\u001b[0m\n",
+ " \u001b[1;34m\"content\"\u001b[0m: \u001b[32m\"Developers can build reusable transforms and modularize pipeline logic, improving maintainability and testing. The concept of runners enables developers to write once and run anywhere, which is particularly appealing for organizations that want to avoid vendor lock-in. The Beam model is based on a unified programming model that decouples pipeline logic from execution. This makes it easier to reason about time and state in both batch and streaming pipelines. Advanced features like late data handling, watermarks, and session windowing allow for more accurate and meaningful processing of real-world data. Beam also integrates with orchestration tools and monitoring systems, allowing for production-grade deployments. Community support and contributions have grown significantly, making Beam a stable and evolving ecosystem. Many cloud providers offer native support for Beam pipelines, and it's increasingly a core component in modern data platform architectures.\"\u001b[0m,\n",
+ " \u001b[1;34m\"metadata\"\u001b[0m: \u001b[1m{\u001b[0m\n",
+ " \u001b[1;34m\"title\"\u001b[0m: \u001b[32m\"Apache Beam: Unified Model for Batch and Streaming Data\"\u001b[0m,\n",
+ " \u001b[1;34m\"keywords\"\u001b[0m: \u001b[1m[\u001b[0m\n",
+ " \u001b[32m\"Apache Beam\"\u001b[0m,\n",
+ " \u001b[32m\"stream processing\"\u001b[0m,\n",
+ " \u001b[32m\"batch processing\"\u001b[0m,\n",
+ " \u001b[32m\"data pipelines\"\u001b[0m,\n",
+ " \u001b[32m\"SDK\"\u001b[0m\n",
+ " \u001b[1m]\u001b[0m,\n",
+ " \u001b[1;34m\"tags\"\u001b[0m: \u001b[1m[\u001b[0m\n",
+ " \u001b[32m\"Data Engineering\"\u001b[0m,\n",
+ " \u001b[32m\"Open Source\"\u001b[0m,\n",
+ " \u001b[32m\"Streaming\"\u001b[0m,\n",
+ " \u001b[32m\"Batch\"\u001b[0m,\n",
+ " \u001b[32m\"Big Data\"\u001b[0m\n",
+ " \u001b[1m]\u001b[0m\n",
+ " \u001b[1m}\u001b[0m\n",
+ " \u001b[1m}\u001b[0m\n",
+ " \u001b[1m}\u001b[0m,\n",
+ " \u001b[1m{\u001b[0m\n",
+ " \u001b[1;34m\"id\"\u001b[0m: \u001b[32m\"3_0\"\u001b[0m,\n",
+ " \u001b[1;34m\"distance\"\u001b[0m: \u001b[1;36m0.031\u001b[0m,\n",
+ " \u001b[1;34m\"fields\"\u001b[0m: \u001b[1m{\u001b[0m\n",
+ " \u001b[1;34m\"content\"\u001b[0m: \u001b[32m\"Google Beam is an innovative video communication platform that builds on the research of Project Starline. It uses AI, 3D imaging, and light field rendering to create immersive, lifelike video calls. Designed to replicate in-person interaction, Beam allows users to see life-sized, three-dimensional representations of each other without the need for headsets. This breakthrough makes remote conversations feel natural—capturing facial expressions, eye contact, and subtle gestures that traditional video conferencing often misses. Beam reduces meeting fatigue and enhances engagement, making it ideal for enterprise collaboration, interviews, and virtual presence scenarios. Powered by Google AI, Beam represents a significant leap in communication technology. Major companies like Salesforce, Deloitte, and NEC are already exploring its impact on digital collaboration. Google is partnering with HP to build and distribute Beam hardware, designed to work with existing productivity and video tools. Currently in limited early access for enterprise partners, Google Beam aims to redefine virtual meetings by bridging the gap between digital and physical presence. It’s a promising step toward more human and effective remote interactions.\"\u001b[0m,\n",
+ " \u001b[1;34m\"metadata\"\u001b[0m: \u001b[1m{\u001b[0m\n",
+ " \u001b[1;34m\"title\"\u001b[0m: \u001b[32m\"Google Beam: 3D Communication Powered by AI\"\u001b[0m,\n",
+ " \u001b[1;34m\"keywords\"\u001b[0m: \u001b[1m[\u001b[0m\n",
+ " \u001b[32m\"Google Beam\"\u001b[0m,\n",
+ " \u001b[32m\"Project Starline\"\u001b[0m,\n",
+ " \u001b[32m\"3D video\"\u001b[0m,\n",
+ " \u001b[32m\"AI communication\"\u001b[0m,\n",
+ " \u001b[32m\"real-time meetings\"\u001b[0m\n",
+ " \u001b[1m]\u001b[0m,\n",
+ " \u001b[1;34m\"tags\"\u001b[0m: \u001b[1m[\u001b[0m\n",
+ " \u001b[32m\"AI\"\u001b[0m,\n",
+ " \u001b[32m\"Communication\"\u001b[0m,\n",
+ " \u001b[32m\"3D Technology\"\u001b[0m,\n",
+ " \u001b[32m\"Remote Work\"\u001b[0m,\n",
+ " \u001b[32m\"Enterprise Tech\"\u001b[0m\n",
+ " \u001b[1m]\u001b[0m\n",
+ " \u001b[1m}\u001b[0m\n",
+ " \u001b[1m}\u001b[0m\n",
+ " \u001b[1m}\u001b[0m\n",
+ " \u001b[1m]\u001b[0m\n",
+ "\u001b[1m}\u001b[0m\n"
+ ]
+ },
+ "metadata": {},
+ "output_type": "display_data"
+ }
+ ],
+ "source": [
+ "with beam.Pipeline() as p:\n",
+ " _ = (\n",
+ " p\n",
+ " | \"Creating Queries\" >> beam.Create([query_chunk])\n",
+ " | 'Generating Embeddings' >> MLTransform(\n",
+ " write_artifact_location=tempfile.mkdtemp()).with_transform(huggingface_embedder)\n",
+ " | \"Enriching W/ Milvus Vector Search\" >> Enrichment(milvus_handler)\n",
+ " | \"Formatting and Printing Results\" >> FormatAndPrintResults())"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "cb626be4-1c1c-4426-a6be-9cc8e385f2c8",
+ "metadata": {},
+ "source": [
+ "### Keyword Search"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "b30b29dc-0a59-4cff-b8a3-ace6e801b4da",
+ "metadata": {},
+ "source": [
+ "Let’s choose a deliberate query that illustrates the unique benefits of pure keyword search, especially its ability to pinpoint exact textual matches:\n",
+ "\n",
+ "Query: `Project Starline`\n",
+ "\n",
+ "This query demonstrates keyword search advantages because:\n",
+ "\n",
+ "- **Keyword (lexical) contribution:** The query, `Project Starline`, is an exact phrase. Keyword search is designed to prioritize and precisely match such literal strings, acting as an exact textual filter for specific product names or unique identifiers.\n",
+ "- **Overcoming vector limitations:** For a highly specific, proper noun like \"Project Starline\", pure vector search might struggle. It could semantically relate to other \"projects\" or \"communication technologies,\" potentially diluting the precision by not inherently prioritizing the exact string match over broader semantic similarity.\n",
+ "- **Keyword search advantage:** Only Document 3 (\"Google Beam: 3D Communication Powered by AI\") contains the exact phrase: `Google Beam is an innovative video communication platform that builds on the research of Project Starline.` A keyword search for \"Project Starline\" will exclusively and precisely retrieve Document 3, showcasing its unparalleled accuracy for factual lookups and named entities where the exact string is paramount.\n"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 32,
+ "id": "f159ad87-5153-48bb-87b3-3845d3c76420",
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "query = \"Project Starline\"\n",
+ "query_chunk = Chunk(content=Content(text=query))"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 33,
+ "id": "8b8cad3e-8a18-464b-8de6-aa4515a653c5",
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "search_parameters = MilvusSearchParameters(\n",
+ " collection_name=collection_name,\n",
+ " search_strategy=KeywordSearchParameters(limit=10,anns_field=\"sparse_embedding\"),\n",
+ " output_fields=[\"metadata\",\"content\"])"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 34,
+ "id": "47cfc650-0b34-4333-9321-19be2e8fdc85",
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "collection_load_parameters = MilvusCollectionLoadParameters()"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 35,
+ "id": "4754763b-66bf-4f90-9920-28cef223b536",
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "milvus_handler = MilvusSearchEnrichmentHandler(\n",
+ " connection_parameters=milvus_connection_parameters,\n",
+ " search_parameters=search_parameters,\n",
+ " collection_load_parameters=collection_load_parameters)"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 36,
+ "id": "a3db4837-01c7-42d7-b4e8-58d8d361fe93",
+ "metadata": {},
+ "outputs": [
+ {
+ "name": "stderr",
+ "output_type": "stream",
+ "text": [
+ "WARNING:root:This output type hint will be ignored and not used for type-checking purposes. Typically, output type hints for a PTransform are single (or nested) types wrapped by a PCollection, PDone, or None. Got: Union[Tuple[apache_beam.pvalue.PCollection[~MLTransformOutputT], apache_beam.pvalue.PCollection[apache_beam.pvalue.Row]], apache_beam.pvalue.PCollection[~MLTransformOutputT]] instead.\n",
+ "WARNING:root:This output type hint will be ignored and not used for type-checking purposes. Typically, output type hints for a PTransform are single (or nested) types wrapped by a PCollection, PDone, or None. Got: Union[Tuple[apache_beam.pvalue.PCollection[~MLTransformOutputT], apache_beam.pvalue.PCollection[apache_beam.pvalue.Row]], apache_beam.pvalue.PCollection[~MLTransformOutputT]] instead.\n"
+ ]
+ },
+ {
+ "data": {
+ "text/html": [
+ "{\n",
+ " \"query\": \"Project Starline\",\n",
+ " \"query_embedding\": 384,\n",
+ " \"results\": [\n",
+ " {\n",
+ " \"id\": \"3_0\",\n",
+ " \"distance\": 2.8536,\n",
+ " \"fields\": {\n",
+ " \"content\": \"Google Beam is an innovative video communication platform that builds on the research of Project Starline. It uses AI, 3D imaging, and light field rendering to create immersive, lifelike video calls. Designed to replicate in-person interaction, Beam allows users to see life-sized, three-dimensional representations of each other without the need for headsets. This breakthrough makes remote conversations feel natural—capturing facial expressions, eye contact, and subtle gestures that traditional video conferencing often misses. Beam reduces meeting fatigue and enhances engagement, making it ideal for enterprise collaboration, interviews, and virtual presence scenarios. Powered by Google AI, Beam represents a significant leap in communication technology. Major companies like Salesforce, Deloitte, and NEC are already exploring its impact on digital collaboration. Google is partnering with HP to build and distribute Beam hardware, designed to work with existing productivity and video tools. Currently in limited early access for enterprise partners, Google Beam aims to redefine virtual meetings by bridging the gap between digital and physical presence. It’s a promising step toward more human and effective remote interactions.\",\n",
+ " \"metadata\": {\n",
+ " \"title\": \"Google Beam: 3D Communication Powered by AI\",\n",
+ " \"keywords\": [\n",
+ " \"Google Beam\",\n",
+ " \"Project Starline\",\n",
+ " \"3D video\",\n",
+ " \"AI communication\",\n",
+ " \"real-time meetings\"\n",
+ " ],\n",
+ " \"tags\": [\n",
+ " \"AI\",\n",
+ " \"Communication\",\n",
+ " \"3D Technology\",\n",
+ " \"Remote Work\",\n",
+ " \"Enterprise Tech\"\n",
+ " ]\n",
+ " }\n",
+ " }\n",
+ " }\n",
+ " ]\n",
+ "}\n",
+ "\n"
+ ],
+ "text/plain": [
+ "\u001b[1m{\u001b[0m\n",
+ " \u001b[1;34m\"query\"\u001b[0m: \u001b[32m\"Project Starline\"\u001b[0m,\n",
+ " \u001b[1;34m\"query_embedding\"\u001b[0m: \u001b[1;36m384\u001b[0m,\n",
+ " \u001b[1;34m\"results\"\u001b[0m: \u001b[1m[\u001b[0m\n",
+ " \u001b[1m{\u001b[0m\n",
+ " \u001b[1;34m\"id\"\u001b[0m: \u001b[32m\"3_0\"\u001b[0m,\n",
+ " \u001b[1;34m\"distance\"\u001b[0m: \u001b[1;36m2.8536\u001b[0m,\n",
+ " \u001b[1;34m\"fields\"\u001b[0m: \u001b[1m{\u001b[0m\n",
+ " \u001b[1;34m\"content\"\u001b[0m: \u001b[32m\"Google Beam is an innovative video communication platform that builds on the research of Project Starline. It uses AI, 3D imaging, and light field rendering to create immersive, lifelike video calls. Designed to replicate in-person interaction, Beam allows users to see life-sized, three-dimensional representations of each other without the need for headsets. This breakthrough makes remote conversations feel natural—capturing facial expressions, eye contact, and subtle gestures that traditional video conferencing often misses. Beam reduces meeting fatigue and enhances engagement, making it ideal for enterprise collaboration, interviews, and virtual presence scenarios. Powered by Google AI, Beam represents a significant leap in communication technology. Major companies like Salesforce, Deloitte, and NEC are already exploring its impact on digital collaboration. Google is partnering with HP to build and distribute Beam hardware, designed to work with existing productivity and video tools. Currently in limited early access for enterprise partners, Google Beam aims to redefine virtual meetings by bridging the gap between digital and physical presence. It’s a promising step toward more human and effective remote interactions.\"\u001b[0m,\n",
+ " \u001b[1;34m\"metadata\"\u001b[0m: \u001b[1m{\u001b[0m\n",
+ " \u001b[1;34m\"title\"\u001b[0m: \u001b[32m\"Google Beam: 3D Communication Powered by AI\"\u001b[0m,\n",
+ " \u001b[1;34m\"keywords\"\u001b[0m: \u001b[1m[\u001b[0m\n",
+ " \u001b[32m\"Google Beam\"\u001b[0m,\n",
+ " \u001b[32m\"Project Starline\"\u001b[0m,\n",
+ " \u001b[32m\"3D video\"\u001b[0m,\n",
+ " \u001b[32m\"AI communication\"\u001b[0m,\n",
+ " \u001b[32m\"real-time meetings\"\u001b[0m\n",
+ " \u001b[1m]\u001b[0m,\n",
+ " \u001b[1;34m\"tags\"\u001b[0m: \u001b[1m[\u001b[0m\n",
+ " \u001b[32m\"AI\"\u001b[0m,\n",
+ " \u001b[32m\"Communication\"\u001b[0m,\n",
+ " \u001b[32m\"3D Technology\"\u001b[0m,\n",
+ " \u001b[32m\"Remote Work\"\u001b[0m,\n",
+ " \u001b[32m\"Enterprise Tech\"\u001b[0m\n",
+ " \u001b[1m]\u001b[0m\n",
+ " \u001b[1m}\u001b[0m\n",
+ " \u001b[1m}\u001b[0m\n",
+ " \u001b[1m}\u001b[0m\n",
+ " \u001b[1m]\u001b[0m\n",
+ "\u001b[1m}\u001b[0m\n"
+ ]
+ },
+ "metadata": {},
+ "output_type": "display_data"
+ }
+ ],
+ "source": [
+ "with beam.Pipeline() as p:\n",
+ " _ = (\n",
+ " p\n",
+ " | \"Creating Queries\" >> beam.Create([query_chunk])\n",
+ " | 'Generating Embeddings' >> MLTransform(\n",
+ " write_artifact_location=tempfile.mkdtemp()).with_transform(huggingface_embedder)\n",
+ " | \"Enriching W/ Milvus Keyword Search\" >> Enrichment(milvus_handler)\n",
+ " | \"Formatting and Printing Results\" >> FormatAndPrintResults()\n",
+ " )"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "de344931-4f2e-473d-bd53-c2708c1d1bcc",
+ "metadata": {},
+ "source": [
+ "### Hybrid Search"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "e65b2158-5dce-46d1-80de-3c8047419224",
+ "metadata": {},
+ "source": [
+ "Let’s choose a deliberate query that illustrates the unique benefits of hybrid search:\n",
+ "\n",
+ "Query: `real-time data processing system`\n",
+ "\n",
+ "This query demonstrates hybrid search advantages because:\n",
+ "\n",
+ "* **Dense vector (semantic) contribution:** Will understand the conceptual relationship between \"real-time processing\" and \"streaming\" (found in docs #1 and #2)\n",
+ "* **Sparse vector (keyword) contribution:** Will match exact terms like \"data\" and \"processing\" (found in docs #1 and #2)\n",
+ "* **Hybrid advantage:** Document #1 about Apache Beam should rank highest since it contains more specific technical details about real-time processing capabilities like \"event time,\" \"triggers,\" and \"stateful processing\" - even though the exact phrase \"real-time data processing\" doesn't appear in any document"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 37,
+ "id": "172b6c80-2a03-49d0-afc7-12bb0a4dc989",
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "query = \"real-time data processing system\"\n",
+ "query_chunk = Chunk(content=Content(text=query))"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 38,
+ "id": "eb6d951c-0def-45cc-84a4-b6f7b7575f23",
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "hybrid_search_parameters = HybridSearchParameters(\n",
+ " vector=VectorSearchParameters(limit=10,anns_field=\"embedding\"),\n",
+ " keyword=KeywordSearchParameters(limit=10,anns_field=\"sparse_embedding\"),\n",
+ " ranker=RRFRanker(3),\n",
+ " limit=2)"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 39,
+ "id": "b339c498-d229-42e6-b439-b29eb107b533",
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "search_parameters = MilvusSearchParameters(\n",
+ " collection_name=collection_name,\n",
+ " search_strategy=hybrid_search_parameters,\n",
+ " output_fields=[\"metadata\", \"content\"])"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 40,
+ "id": "b346abe6-03c9-4b28-a0fb-74936b9f3a06",
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "collection_load_parameters = MilvusCollectionLoadParameters() "
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 41,
+ "id": "ab27810d-40a8-4b6a-bc82-441e13763ebc",
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "milvus_handler = MilvusSearchEnrichmentHandler(\n",
+ " connection_parameters=milvus_connection_parameters,\n",
+ " search_parameters=search_parameters,\n",
+ " collection_load_parameters=collection_load_parameters)"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 42,
+ "id": "ea9d84f7-d142-4afa-9a6f-6c310d9604b0",
+ "metadata": {},
+ "outputs": [
+ {
+ "name": "stderr",
+ "output_type": "stream",
+ "text": [
+ "WARNING:root:This output type hint will be ignored and not used for type-checking purposes. Typically, output type hints for a PTransform are single (or nested) types wrapped by a PCollection, PDone, or None. Got: Union[Tuple[apache_beam.pvalue.PCollection[~MLTransformOutputT], apache_beam.pvalue.PCollection[apache_beam.pvalue.Row]], apache_beam.pvalue.PCollection[~MLTransformOutputT]] instead.\n",
+ "WARNING:root:This output type hint will be ignored and not used for type-checking purposes. Typically, output type hints for a PTransform are single (or nested) types wrapped by a PCollection, PDone, or None. Got: Union[Tuple[apache_beam.pvalue.PCollection[~MLTransformOutputT], apache_beam.pvalue.PCollection[apache_beam.pvalue.Row]], apache_beam.pvalue.PCollection[~MLTransformOutputT]] instead.\n"
+ ]
+ },
+ {
+ "data": {
+ "text/html": [
+ "{\n",
+ " \"query\": \"real-time data processing system\",\n",
+ " \"query_embedding\": 384,\n",
+ " \"results\": [\n",
+ " {\n",
+ " \"id\": \"1_0\",\n",
+ " \"distance\": 0.45,\n",
+ " \"fields\": {\n",
+ " \"content\": \"Apache Beam is an open-source framework that provides a consistent programming model for both batch and streaming data processing. Developed originally by Google, it allows developers to write pipelines that can run on multiple engines, such as Apache Flink, Spark, and Google Cloud Dataflow. Beam uses abstractions like PCollections (data containers) and PTransforms (operations) to define the flow of data. The framework promotes portability through its runner architecture, letting the same pipeline execute on different backends. Support for multiple SDKs, including Java and Python, makes it accessible for a broad audience. Key features include support for event time, windowing, triggers, and stateful processing, which are essential for handling real-time data effectively. Beam is ideal for building ETL jobs, real-time analytics, and machine learning data pipelines. It helps teams focus on logic rather than infrastructure, offering flexibility and scalability in handling unbounded and bounded data sources. Apache Beam also supports a wide range of connectors for both input and output, including Kafka, BigQuery, and JDBC-based systems. This makes it easy to integrate Beam into existing data ecosystems. Developers can build reusable transforms and modularize pipeline logic, improving maintainability and testing.\",\n",
+ " \"metadata\": {\n",
+ " \"title\": \"Apache Beam: Unified Model for Batch and Streaming Data\",\n",
+ " \"keywords\": [\n",
+ " \"Apache Beam\",\n",
+ " \"stream processing\",\n",
+ " \"batch processing\",\n",
+ " \"data pipelines\",\n",
+ " \"SDK\"\n",
+ " ],\n",
+ " \"tags\": [\n",
+ " \"Data Engineering\",\n",
+ " \"Open Source\",\n",
+ " \"Streaming\",\n",
+ " \"Batch\",\n",
+ " \"Big Data\"\n",
+ " ]\n",
+ " }\n",
+ " }\n",
+ " },\n",
+ " {\n",
+ " \"id\": \"2_1\",\n",
+ " \"distance\": 0.3929,\n",
+ " \"fields\": {\n",
+ " \"content\": \"For developers, Dataflow provides local testing capabilities and a unified logging system through Cloud Logging. It also supports SQL-based pipeline definitions using BigQuery, which lowers the barrier to entry for analysts and data engineers. Dataflow’s streaming engine significantly improves performance and reduces costs by decoupling compute and state management. In summary, Google Cloud Dataflow not only simplifies the deployment of Apache Beam pipelines but also enhances them with cloud-native features. Its managed runtime, high availability, and integration with the broader Google Cloud ecosystem make it a powerful tool for modern data processing.\",\n",
+ " \"metadata\": {\n",
+ " \"title\": \"Google Cloud Dataflow: Run Apache Beam in the Cloud\",\n",
+ " \"keywords\": [\n",
+ " \"Google Cloud\",\n",
+ " \"Dataflow\",\n",
+ " \"Apache Beam\",\n",
+ " \"serverless\",\n",
+ " \"stream and batch\"\n",
+ " ],\n",
+ " \"tags\": [\n",
+ " \"Cloud Computing\",\n",
+ " \"Data Pipelines\",\n",
+ " \"Google Cloud\",\n",
+ " \"Serverless\",\n",
+ " \"Enterprise\"\n",
+ " ]\n",
+ " }\n",
+ " }\n",
+ " }\n",
+ " ]\n",
+ "}\n",
+ "\n"
+ ],
+ "text/plain": [
+ "\u001b[1m{\u001b[0m\n",
+ " \u001b[1;34m\"query\"\u001b[0m: \u001b[32m\"real-time data processing system\"\u001b[0m,\n",
+ " \u001b[1;34m\"query_embedding\"\u001b[0m: \u001b[1;36m384\u001b[0m,\n",
+ " \u001b[1;34m\"results\"\u001b[0m: \u001b[1m[\u001b[0m\n",
+ " \u001b[1m{\u001b[0m\n",
+ " \u001b[1;34m\"id\"\u001b[0m: \u001b[32m\"1_0\"\u001b[0m,\n",
+ " \u001b[1;34m\"distance\"\u001b[0m: \u001b[1;36m0.45\u001b[0m,\n",
+ " \u001b[1;34m\"fields\"\u001b[0m: \u001b[1m{\u001b[0m\n",
+ " \u001b[1;34m\"content\"\u001b[0m: \u001b[32m\"Apache Beam is an open-source framework that provides a consistent programming model for both batch and streaming data processing. Developed originally by Google, it allows developers to write pipelines that can run on multiple engines, such as Apache Flink, Spark, and Google Cloud Dataflow. Beam uses abstractions like PCollections (data containers) and PTransforms (operations) to define the flow of data. The framework promotes portability through its runner architecture, letting the same pipeline execute on different backends. Support for multiple SDKs, including Java and Python, makes it accessible for a broad audience. Key features include support for event time, windowing, triggers, and stateful processing, which are essential for handling real-time data effectively. Beam is ideal for building ETL jobs, real-time analytics, and machine learning data pipelines. It helps teams focus on logic rather than infrastructure, offering flexibility and scalability in handling unbounded and bounded data sources. Apache Beam also supports a wide range of connectors for both input and output, including Kafka, BigQuery, and JDBC-based systems. This makes it easy to integrate Beam into existing data ecosystems. Developers can build reusable transforms and modularize pipeline logic, improving maintainability and testing.\"\u001b[0m,\n",
+ " \u001b[1;34m\"metadata\"\u001b[0m: \u001b[1m{\u001b[0m\n",
+ " \u001b[1;34m\"title\"\u001b[0m: \u001b[32m\"Apache Beam: Unified Model for Batch and Streaming Data\"\u001b[0m,\n",
+ " \u001b[1;34m\"keywords\"\u001b[0m: \u001b[1m[\u001b[0m\n",
+ " \u001b[32m\"Apache Beam\"\u001b[0m,\n",
+ " \u001b[32m\"stream processing\"\u001b[0m,\n",
+ " \u001b[32m\"batch processing\"\u001b[0m,\n",
+ " \u001b[32m\"data pipelines\"\u001b[0m,\n",
+ " \u001b[32m\"SDK\"\u001b[0m\n",
+ " \u001b[1m]\u001b[0m,\n",
+ " \u001b[1;34m\"tags\"\u001b[0m: \u001b[1m[\u001b[0m\n",
+ " \u001b[32m\"Data Engineering\"\u001b[0m,\n",
+ " \u001b[32m\"Open Source\"\u001b[0m,\n",
+ " \u001b[32m\"Streaming\"\u001b[0m,\n",
+ " \u001b[32m\"Batch\"\u001b[0m,\n",
+ " \u001b[32m\"Big Data\"\u001b[0m\n",
+ " \u001b[1m]\u001b[0m\n",
+ " \u001b[1m}\u001b[0m\n",
+ " \u001b[1m}\u001b[0m\n",
+ " \u001b[1m}\u001b[0m,\n",
+ " \u001b[1m{\u001b[0m\n",
+ " \u001b[1;34m\"id\"\u001b[0m: \u001b[32m\"2_1\"\u001b[0m,\n",
+ " \u001b[1;34m\"distance\"\u001b[0m: \u001b[1;36m0.3929\u001b[0m,\n",
+ " \u001b[1;34m\"fields\"\u001b[0m: \u001b[1m{\u001b[0m\n",
+ " \u001b[1;34m\"content\"\u001b[0m: \u001b[32m\"For developers, Dataflow provides local testing capabilities and a unified logging system through Cloud Logging. It also supports SQL-based pipeline definitions using BigQuery, which lowers the barrier to entry for analysts and data engineers. Dataflow’s streaming engine significantly improves performance and reduces costs by decoupling compute and state management. In summary, Google Cloud Dataflow not only simplifies the deployment of Apache Beam pipelines but also enhances them with cloud-native features. Its managed runtime, high availability, and integration with the broader Google Cloud ecosystem make it a powerful tool for modern data processing.\"\u001b[0m,\n",
+ " \u001b[1;34m\"metadata\"\u001b[0m: \u001b[1m{\u001b[0m\n",
+ " \u001b[1;34m\"title\"\u001b[0m: \u001b[32m\"Google Cloud Dataflow: Run Apache Beam in the Cloud\"\u001b[0m,\n",
+ " \u001b[1;34m\"keywords\"\u001b[0m: \u001b[1m[\u001b[0m\n",
+ " \u001b[32m\"Google Cloud\"\u001b[0m,\n",
+ " \u001b[32m\"Dataflow\"\u001b[0m,\n",
+ " \u001b[32m\"Apache Beam\"\u001b[0m,\n",
+ " \u001b[32m\"serverless\"\u001b[0m,\n",
+ " \u001b[32m\"stream and batch\"\u001b[0m\n",
+ " \u001b[1m]\u001b[0m,\n",
+ " \u001b[1;34m\"tags\"\u001b[0m: \u001b[1m[\u001b[0m\n",
+ " \u001b[32m\"Cloud Computing\"\u001b[0m,\n",
+ " \u001b[32m\"Data Pipelines\"\u001b[0m,\n",
+ " \u001b[32m\"Google Cloud\"\u001b[0m,\n",
+ " \u001b[32m\"Serverless\"\u001b[0m,\n",
+ " \u001b[32m\"Enterprise\"\u001b[0m\n",
+ " \u001b[1m]\u001b[0m\n",
+ " \u001b[1m}\u001b[0m\n",
+ " \u001b[1m}\u001b[0m\n",
+ " \u001b[1m}\u001b[0m\n",
+ " \u001b[1m]\u001b[0m\n",
+ "\u001b[1m}\u001b[0m\n"
+ ]
+ },
+ "metadata": {},
+ "output_type": "display_data"
+ }
+ ],
+ "source": [
+ "with beam.Pipeline() as p:\n",
+ " _ = (\n",
+ " p\n",
+ " | \"Creating Queries\" >> beam.Create([query_chunk])\n",
+ " | 'Generating Embeddings' >> MLTransform(\n",
+ " write_artifact_location=tempfile.mkdtemp()).with_transform(huggingface_embedder)\n",
+ " | \"Enriching W/ Milvus Hybrid Search\" >> Enrichment(milvus_handler)\n",
+ " | \"Formatting and Printing Results\" >> FormatAndPrintResults()\n",
+ " )"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "58753d47-5e63-49ef-8d95-f9acd94b8c0e",
+ "metadata": {},
+ "source": [
+ "### Filtered Search (Metadata Filtering)"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "0fdd049f-e856-4fa8-b3df-1498b973946b",
+ "metadata": {},
+ "source": [
+ "When a user queries `what is beam?` using a **vector search strategy**, the semantic nature of **vector embeddings** can lead to ambiguity. Without additional context, the system might confuse **Google Beam** (a 3D communication platform) with **Apache Beam** (a data processing framework).\n",
+ "\n",
+ "**Metadata filtering** directly solves this by adding contextual constraints. For instance, applying a **specific metadata filter** (e.g., `{\"category\": \"computing\"}` or `{\"domain\": \"communication\"}`) before the vector search ensures that only documents relevant to the intended concept are considered. This dramatically narrows down results, enhances search precision, and overcomes the limitations of pure content-based search by disambiguating terms like \"beam\" with specific, structured criteria."
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "3c96898d-af2d-4401-a9ca-8d230fa95e6e",
+ "metadata": {},
+ "source": [
+ "#### Without Filtered Search"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "2e549b22-256e-44c8-9638-eafc3a844770",
+ "metadata": {},
+ "source": [
+ "As seen in the search results below, when a user searches for `what is beam?` without applying filters, the search results include both `Apache Beam` and `Google Beam`. Filtered search can come into play here by narrowing the results to only the relevant documents."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 43,
+ "id": "3d267853-649d-494f-bea6-bbfe20650f79",
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "query = \"what is beam?\"\n",
+ "query_chunk = Chunk(content=Content(text=query))"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 44,
+ "id": "28a45b1c-f9a5-452e-aea6-ac46f17e01bd",
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "search_parameters = MilvusSearchParameters(\n",
+ " collection_name=collection_name,\n",
+ " search_strategy=VectorSearchParameters(\n",
+ " limit=10,\n",
+ " anns_field=\"embedding\",\n",
+ " ),\n",
+ " output_fields=[\"metadata\",\"content\"])"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 45,
+ "id": "9ce3f0c7-fd1d-49a1-81e9-b8153cd284ea",
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "collection_load_parameters = MilvusCollectionLoadParameters() "
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 46,
+ "id": "6fad29b5-c2b0-4458-ab83-b38eb15a7505",
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "milvus_handler = MilvusSearchEnrichmentHandler(\n",
+ " connection_parameters=milvus_connection_parameters,\n",
+ " search_parameters=search_parameters,\n",
+ " collection_load_parameters=collection_load_parameters)"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 47,
+ "id": "77add8a8-ddb8-48de-b1af-632d78c0d112",
+ "metadata": {},
+ "outputs": [
+ {
+ "name": "stderr",
+ "output_type": "stream",
+ "text": [
+ "WARNING:root:This output type hint will be ignored and not used for type-checking purposes. Typically, output type hints for a PTransform are single (or nested) types wrapped by a PCollection, PDone, or None. Got: Union[Tuple[apache_beam.pvalue.PCollection[~MLTransformOutputT], apache_beam.pvalue.PCollection[apache_beam.pvalue.Row]], apache_beam.pvalue.PCollection[~MLTransformOutputT]] instead.\n",
+ "WARNING:root:This output type hint will be ignored and not used for type-checking purposes. Typically, output type hints for a PTransform are single (or nested) types wrapped by a PCollection, PDone, or None. Got: Union[Tuple[apache_beam.pvalue.PCollection[~MLTransformOutputT], apache_beam.pvalue.PCollection[apache_beam.pvalue.Row]], apache_beam.pvalue.PCollection[~MLTransformOutputT]] instead.\n"
+ ]
+ },
+ {
+ "data": {
+ "text/html": [
+ "{\n",
+ " \"query\": \"what is beam?\",\n",
+ " \"query_embedding\": 384,\n",
+ " \"results\": [\n",
+ " {\n",
+ " \"id\": \"1_0\",\n",
+ " \"distance\": 0.4598,\n",
+ " \"fields\": {\n",
+ " \"content\": \"Apache Beam is an open-source framework that provides a consistent programming model for both batch and streaming data processing. Developed originally by Google, it allows developers to write pipelines that can run on multiple engines, such as Apache Flink, Spark, and Google Cloud Dataflow. Beam uses abstractions like PCollections (data containers) and PTransforms (operations) to define the flow of data. The framework promotes portability through its runner architecture, letting the same pipeline execute on different backends. Support for multiple SDKs, including Java and Python, makes it accessible for a broad audience. Key features include support for event time, windowing, triggers, and stateful processing, which are essential for handling real-time data effectively. Beam is ideal for building ETL jobs, real-time analytics, and machine learning data pipelines. It helps teams focus on logic rather than infrastructure, offering flexibility and scalability in handling unbounded and bounded data sources. Apache Beam also supports a wide range of connectors for both input and output, including Kafka, BigQuery, and JDBC-based systems. This makes it easy to integrate Beam into existing data ecosystems. Developers can build reusable transforms and modularize pipeline logic, improving maintainability and testing.\",\n",
+ " \"metadata\": {\n",
+ " \"title\": \"Apache Beam: Unified Model for Batch and Streaming Data\",\n",
+ " \"keywords\": [\n",
+ " \"Apache Beam\",\n",
+ " \"stream processing\",\n",
+ " \"batch processing\",\n",
+ " \"data pipelines\",\n",
+ " \"SDK\"\n",
+ " ],\n",
+ " \"tags\": [\n",
+ " \"Data Engineering\",\n",
+ " \"Open Source\",\n",
+ " \"Streaming\",\n",
+ " \"Batch\",\n",
+ " \"Big Data\"\n",
+ " ]\n",
+ " }\n",
+ " }\n",
+ " },\n",
+ " {\n",
+ " \"id\": \"1_1\",\n",
+ " \"distance\": 0.4353,\n",
+ " \"fields\": {\n",
+ " \"content\": \"Developers can build reusable transforms and modularize pipeline logic, improving maintainability and testing. The concept of runners enables developers to write once and run anywhere, which is particularly appealing for organizations that want to avoid vendor lock-in. The Beam model is based on a unified programming model that decouples pipeline logic from execution. This makes it easier to reason about time and state in both batch and streaming pipelines. Advanced features like late data handling, watermarks, and session windowing allow for more accurate and meaningful processing of real-world data. Beam also integrates with orchestration tools and monitoring systems, allowing for production-grade deployments. Community support and contributions have grown significantly, making Beam a stable and evolving ecosystem. Many cloud providers offer native support for Beam pipelines, and it's increasingly a core component in modern data platform architectures.\",\n",
+ " \"metadata\": {\n",
+ " \"title\": \"Apache Beam: Unified Model for Batch and Streaming Data\",\n",
+ " \"keywords\": [\n",
+ " \"Apache Beam\",\n",
+ " \"stream processing\",\n",
+ " \"batch processing\",\n",
+ " \"data pipelines\",\n",
+ " \"SDK\"\n",
+ " ],\n",
+ " \"tags\": [\n",
+ " \"Data Engineering\",\n",
+ " \"Open Source\",\n",
+ " \"Streaming\",\n",
+ " \"Batch\",\n",
+ " \"Big Data\"\n",
+ " ]\n",
+ " }\n",
+ " }\n",
+ " },\n",
+ " {\n",
+ " \"id\": \"3_0\",\n",
+ " \"distance\": 0.3927,\n",
+ " \"fields\": {\n",
+ " \"content\": \"Google Beam is an innovative video communication platform that builds on the research of Project Starline. It uses AI, 3D imaging, and light field rendering to create immersive, lifelike video calls. Designed to replicate in-person interaction, Beam allows users to see life-sized, three-dimensional representations of each other without the need for headsets. This breakthrough makes remote conversations feel natural—capturing facial expressions, eye contact, and subtle gestures that traditional video conferencing often misses. Beam reduces meeting fatigue and enhances engagement, making it ideal for enterprise collaboration, interviews, and virtual presence scenarios. Powered by Google AI, Beam represents a significant leap in communication technology. Major companies like Salesforce, Deloitte, and NEC are already exploring its impact on digital collaboration. Google is partnering with HP to build and distribute Beam hardware, designed to work with existing productivity and video tools. Currently in limited early access for enterprise partners, Google Beam aims to redefine virtual meetings by bridging the gap between digital and physical presence. It’s a promising step toward more human and effective remote interactions.\",\n",
+ " \"metadata\": {\n",
+ " \"title\": \"Google Beam: 3D Communication Powered by AI\",\n",
+ " \"keywords\": [\n",
+ " \"Google Beam\",\n",
+ " \"Project Starline\",\n",
+ " \"3D video\",\n",
+ " \"AI communication\",\n",
+ " \"real-time meetings\"\n",
+ " ],\n",
+ " \"tags\": [\n",
+ " \"AI\",\n",
+ " \"Communication\",\n",
+ " \"3D Technology\",\n",
+ " \"Remote Work\",\n",
+ " \"Enterprise Tech\"\n",
+ " ]\n",
+ " }\n",
+ " }\n",
+ " },\n",
+ " {\n",
+ " \"id\": \"2_1\",\n",
+ " \"distance\": 0.2925,\n",
+ " \"fields\": {\n",
+ " \"content\": \"For developers, Dataflow provides local testing capabilities and a unified logging system through Cloud Logging. It also supports SQL-based pipeline definitions using BigQuery, which lowers the barrier to entry for analysts and data engineers. Dataflow’s streaming engine significantly improves performance and reduces costs by decoupling compute and state management. In summary, Google Cloud Dataflow not only simplifies the deployment of Apache Beam pipelines but also enhances them with cloud-native features. Its managed runtime, high availability, and integration with the broader Google Cloud ecosystem make it a powerful tool for modern data processing.\",\n",
+ " \"metadata\": {\n",
+ " \"title\": \"Google Cloud Dataflow: Run Apache Beam in the Cloud\",\n",
+ " \"keywords\": [\n",
+ " \"Google Cloud\",\n",
+ " \"Dataflow\",\n",
+ " \"Apache Beam\",\n",
+ " \"serverless\",\n",
+ " \"stream and batch\"\n",
+ " ],\n",
+ " \"tags\": [\n",
+ " \"Cloud Computing\",\n",
+ " \"Data Pipelines\",\n",
+ " \"Google Cloud\",\n",
+ " \"Serverless\",\n",
+ " \"Enterprise\"\n",
+ " ]\n",
+ " }\n",
+ " }\n",
+ " },\n",
+ " {\n",
+ " \"id\": \"2_0\",\n",
+ " \"distance\": 0.2342,\n",
+ " \"fields\": {\n",
+ " \"content\": \"Google Cloud Dataflow is a fully managed service that runs Apache Beam pipelines in the cloud. It abstracts away infrastructure management and handles dynamic scaling, load balancing, and fault tolerance. Developers can focus on writing data logic using the Beam SDK and deploy it easily to Google Cloud. Dataflow supports both batch and stream processing and integrates seamlessly with other Google services like BigQuery, Pub/Sub, and Cloud Storage. Its autoscaling capabilities allow it to adapt to changing data volumes, optimizing for cost and performance. Features like monitoring dashboards, job templates, and built-in logging make it suitable for both development and production use. With support for event time processing, stateful functions, and windowing, Dataflow is well-suited for real-time analytics and data transformation tasks. It’s a key component for architects building scalable, cloud-native data platforms. Dataflow also offers templates for common ETL tasks, helping teams get started quickly with minimal setup. Its integration with Cloud Functions and Cloud Composer enables event-driven and orchestrated workflows. Security and compliance are built-in with IAM roles, encryption at rest and in transit, and audit logging, making it suitable for enterprise environments.\",\n",
+ " \"metadata\": {\n",
+ " \"title\": \"Google Cloud Dataflow: Run Apache Beam in the Cloud\",\n",
+ " \"keywords\": [\n",
+ " \"Google Cloud\",\n",
+ " \"Dataflow\",\n",
+ " \"Apache Beam\",\n",
+ " \"serverless\",\n",
+ " \"stream and batch\"\n",
+ " ],\n",
+ " \"tags\": [\n",
+ " \"Cloud Computing\",\n",
+ " \"Data Pipelines\",\n",
+ " \"Google Cloud\",\n",
+ " \"Serverless\",\n",
+ " \"Enterprise\"\n",
+ " ]\n",
+ " }\n",
+ " }\n",
+ " }\n",
+ " ]\n",
+ "}\n",
+ "\n"
+ ],
+ "text/plain": [
+ "\u001b[1m{\u001b[0m\n",
+ " \u001b[1;34m\"query\"\u001b[0m: \u001b[32m\"what is beam?\"\u001b[0m,\n",
+ " \u001b[1;34m\"query_embedding\"\u001b[0m: \u001b[1;36m384\u001b[0m,\n",
+ " \u001b[1;34m\"results\"\u001b[0m: \u001b[1m[\u001b[0m\n",
+ " \u001b[1m{\u001b[0m\n",
+ " \u001b[1;34m\"id\"\u001b[0m: \u001b[32m\"1_0\"\u001b[0m,\n",
+ " \u001b[1;34m\"distance\"\u001b[0m: \u001b[1;36m0.4598\u001b[0m,\n",
+ " \u001b[1;34m\"fields\"\u001b[0m: \u001b[1m{\u001b[0m\n",
+ " \u001b[1;34m\"content\"\u001b[0m: \u001b[32m\"Apache Beam is an open-source framework that provides a consistent programming model for both batch and streaming data processing. Developed originally by Google, it allows developers to write pipelines that can run on multiple engines, such as Apache Flink, Spark, and Google Cloud Dataflow. Beam uses abstractions like PCollections (data containers) and PTransforms (operations) to define the flow of data. The framework promotes portability through its runner architecture, letting the same pipeline execute on different backends. Support for multiple SDKs, including Java and Python, makes it accessible for a broad audience. Key features include support for event time, windowing, triggers, and stateful processing, which are essential for handling real-time data effectively. Beam is ideal for building ETL jobs, real-time analytics, and machine learning data pipelines. It helps teams focus on logic rather than infrastructure, offering flexibility and scalability in handling unbounded and bounded data sources. Apache Beam also supports a wide range of connectors for both input and output, including Kafka, BigQuery, and JDBC-based systems. This makes it easy to integrate Beam into existing data ecosystems. Developers can build reusable transforms and modularize pipeline logic, improving maintainability and testing.\"\u001b[0m,\n",
+ " \u001b[1;34m\"metadata\"\u001b[0m: \u001b[1m{\u001b[0m\n",
+ " \u001b[1;34m\"title\"\u001b[0m: \u001b[32m\"Apache Beam: Unified Model for Batch and Streaming Data\"\u001b[0m,\n",
+ " \u001b[1;34m\"keywords\"\u001b[0m: \u001b[1m[\u001b[0m\n",
+ " \u001b[32m\"Apache Beam\"\u001b[0m,\n",
+ " \u001b[32m\"stream processing\"\u001b[0m,\n",
+ " \u001b[32m\"batch processing\"\u001b[0m,\n",
+ " \u001b[32m\"data pipelines\"\u001b[0m,\n",
+ " \u001b[32m\"SDK\"\u001b[0m\n",
+ " \u001b[1m]\u001b[0m,\n",
+ " \u001b[1;34m\"tags\"\u001b[0m: \u001b[1m[\u001b[0m\n",
+ " \u001b[32m\"Data Engineering\"\u001b[0m,\n",
+ " \u001b[32m\"Open Source\"\u001b[0m,\n",
+ " \u001b[32m\"Streaming\"\u001b[0m,\n",
+ " \u001b[32m\"Batch\"\u001b[0m,\n",
+ " \u001b[32m\"Big Data\"\u001b[0m\n",
+ " \u001b[1m]\u001b[0m\n",
+ " \u001b[1m}\u001b[0m\n",
+ " \u001b[1m}\u001b[0m\n",
+ " \u001b[1m}\u001b[0m,\n",
+ " \u001b[1m{\u001b[0m\n",
+ " \u001b[1;34m\"id\"\u001b[0m: \u001b[32m\"1_1\"\u001b[0m,\n",
+ " \u001b[1;34m\"distance\"\u001b[0m: \u001b[1;36m0.4353\u001b[0m,\n",
+ " \u001b[1;34m\"fields\"\u001b[0m: \u001b[1m{\u001b[0m\n",
+ " \u001b[1;34m\"content\"\u001b[0m: \u001b[32m\"Developers can build reusable transforms and modularize pipeline logic, improving maintainability and testing. The concept of runners enables developers to write once and run anywhere, which is particularly appealing for organizations that want to avoid vendor lock-in. The Beam model is based on a unified programming model that decouples pipeline logic from execution. This makes it easier to reason about time and state in both batch and streaming pipelines. Advanced features like late data handling, watermarks, and session windowing allow for more accurate and meaningful processing of real-world data. Beam also integrates with orchestration tools and monitoring systems, allowing for production-grade deployments. Community support and contributions have grown significantly, making Beam a stable and evolving ecosystem. Many cloud providers offer native support for Beam pipelines, and it's increasingly a core component in modern data platform architectures.\"\u001b[0m,\n",
+ " \u001b[1;34m\"metadata\"\u001b[0m: \u001b[1m{\u001b[0m\n",
+ " \u001b[1;34m\"title\"\u001b[0m: \u001b[32m\"Apache Beam: Unified Model for Batch and Streaming Data\"\u001b[0m,\n",
+ " \u001b[1;34m\"keywords\"\u001b[0m: \u001b[1m[\u001b[0m\n",
+ " \u001b[32m\"Apache Beam\"\u001b[0m,\n",
+ " \u001b[32m\"stream processing\"\u001b[0m,\n",
+ " \u001b[32m\"batch processing\"\u001b[0m,\n",
+ " \u001b[32m\"data pipelines\"\u001b[0m,\n",
+ " \u001b[32m\"SDK\"\u001b[0m\n",
+ " \u001b[1m]\u001b[0m,\n",
+ " \u001b[1;34m\"tags\"\u001b[0m: \u001b[1m[\u001b[0m\n",
+ " \u001b[32m\"Data Engineering\"\u001b[0m,\n",
+ " \u001b[32m\"Open Source\"\u001b[0m,\n",
+ " \u001b[32m\"Streaming\"\u001b[0m,\n",
+ " \u001b[32m\"Batch\"\u001b[0m,\n",
+ " \u001b[32m\"Big Data\"\u001b[0m\n",
+ " \u001b[1m]\u001b[0m\n",
+ " \u001b[1m}\u001b[0m\n",
+ " \u001b[1m}\u001b[0m\n",
+ " \u001b[1m}\u001b[0m,\n",
+ " \u001b[1m{\u001b[0m\n",
+ " \u001b[1;34m\"id\"\u001b[0m: \u001b[32m\"3_0\"\u001b[0m,\n",
+ " \u001b[1;34m\"distance\"\u001b[0m: \u001b[1;36m0.3927\u001b[0m,\n",
+ " \u001b[1;34m\"fields\"\u001b[0m: \u001b[1m{\u001b[0m\n",
+ " \u001b[1;34m\"content\"\u001b[0m: \u001b[32m\"Google Beam is an innovative video communication platform that builds on the research of Project Starline. It uses AI, 3D imaging, and light field rendering to create immersive, lifelike video calls. Designed to replicate in-person interaction, Beam allows users to see life-sized, three-dimensional representations of each other without the need for headsets. This breakthrough makes remote conversations feel natural—capturing facial expressions, eye contact, and subtle gestures that traditional video conferencing often misses. Beam reduces meeting fatigue and enhances engagement, making it ideal for enterprise collaboration, interviews, and virtual presence scenarios. Powered by Google AI, Beam represents a significant leap in communication technology. Major companies like Salesforce, Deloitte, and NEC are already exploring its impact on digital collaboration. Google is partnering with HP to build and distribute Beam hardware, designed to work with existing productivity and video tools. Currently in limited early access for enterprise partners, Google Beam aims to redefine virtual meetings by bridging the gap between digital and physical presence. It’s a promising step toward more human and effective remote interactions.\"\u001b[0m,\n",
+ " \u001b[1;34m\"metadata\"\u001b[0m: \u001b[1m{\u001b[0m\n",
+ " \u001b[1;34m\"title\"\u001b[0m: \u001b[32m\"Google Beam: 3D Communication Powered by AI\"\u001b[0m,\n",
+ " \u001b[1;34m\"keywords\"\u001b[0m: \u001b[1m[\u001b[0m\n",
+ " \u001b[32m\"Google Beam\"\u001b[0m,\n",
+ " \u001b[32m\"Project Starline\"\u001b[0m,\n",
+ " \u001b[32m\"3D video\"\u001b[0m,\n",
+ " \u001b[32m\"AI communication\"\u001b[0m,\n",
+ " \u001b[32m\"real-time meetings\"\u001b[0m\n",
+ " \u001b[1m]\u001b[0m,\n",
+ " \u001b[1;34m\"tags\"\u001b[0m: \u001b[1m[\u001b[0m\n",
+ " \u001b[32m\"AI\"\u001b[0m,\n",
+ " \u001b[32m\"Communication\"\u001b[0m,\n",
+ " \u001b[32m\"3D Technology\"\u001b[0m,\n",
+ " \u001b[32m\"Remote Work\"\u001b[0m,\n",
+ " \u001b[32m\"Enterprise Tech\"\u001b[0m\n",
+ " \u001b[1m]\u001b[0m\n",
+ " \u001b[1m}\u001b[0m\n",
+ " \u001b[1m}\u001b[0m\n",
+ " \u001b[1m}\u001b[0m,\n",
+ " \u001b[1m{\u001b[0m\n",
+ " \u001b[1;34m\"id\"\u001b[0m: \u001b[32m\"2_1\"\u001b[0m,\n",
+ " \u001b[1;34m\"distance\"\u001b[0m: \u001b[1;36m0.2925\u001b[0m,\n",
+ " \u001b[1;34m\"fields\"\u001b[0m: \u001b[1m{\u001b[0m\n",
+ " \u001b[1;34m\"content\"\u001b[0m: \u001b[32m\"For developers, Dataflow provides local testing capabilities and a unified logging system through Cloud Logging. It also supports SQL-based pipeline definitions using BigQuery, which lowers the barrier to entry for analysts and data engineers. Dataflow’s streaming engine significantly improves performance and reduces costs by decoupling compute and state management. In summary, Google Cloud Dataflow not only simplifies the deployment of Apache Beam pipelines but also enhances them with cloud-native features. Its managed runtime, high availability, and integration with the broader Google Cloud ecosystem make it a powerful tool for modern data processing.\"\u001b[0m,\n",
+ " \u001b[1;34m\"metadata\"\u001b[0m: \u001b[1m{\u001b[0m\n",
+ " \u001b[1;34m\"title\"\u001b[0m: \u001b[32m\"Google Cloud Dataflow: Run Apache Beam in the Cloud\"\u001b[0m,\n",
+ " \u001b[1;34m\"keywords\"\u001b[0m: \u001b[1m[\u001b[0m\n",
+ " \u001b[32m\"Google Cloud\"\u001b[0m,\n",
+ " \u001b[32m\"Dataflow\"\u001b[0m,\n",
+ " \u001b[32m\"Apache Beam\"\u001b[0m,\n",
+ " \u001b[32m\"serverless\"\u001b[0m,\n",
+ " \u001b[32m\"stream and batch\"\u001b[0m\n",
+ " \u001b[1m]\u001b[0m,\n",
+ " \u001b[1;34m\"tags\"\u001b[0m: \u001b[1m[\u001b[0m\n",
+ " \u001b[32m\"Cloud Computing\"\u001b[0m,\n",
+ " \u001b[32m\"Data Pipelines\"\u001b[0m,\n",
+ " \u001b[32m\"Google Cloud\"\u001b[0m,\n",
+ " \u001b[32m\"Serverless\"\u001b[0m,\n",
+ " \u001b[32m\"Enterprise\"\u001b[0m\n",
+ " \u001b[1m]\u001b[0m\n",
+ " \u001b[1m}\u001b[0m\n",
+ " \u001b[1m}\u001b[0m\n",
+ " \u001b[1m}\u001b[0m,\n",
+ " \u001b[1m{\u001b[0m\n",
+ " \u001b[1;34m\"id\"\u001b[0m: \u001b[32m\"2_0\"\u001b[0m,\n",
+ " \u001b[1;34m\"distance\"\u001b[0m: \u001b[1;36m0.2342\u001b[0m,\n",
+ " \u001b[1;34m\"fields\"\u001b[0m: \u001b[1m{\u001b[0m\n",
+ " \u001b[1;34m\"content\"\u001b[0m: \u001b[32m\"Google Cloud Dataflow is a fully managed service that runs Apache Beam pipelines in the cloud. It abstracts away infrastructure management and handles dynamic scaling, load balancing, and fault tolerance. Developers can focus on writing data logic using the Beam SDK and deploy it easily to Google Cloud. Dataflow supports both batch and stream processing and integrates seamlessly with other Google services like BigQuery, Pub/Sub, and Cloud Storage. Its autoscaling capabilities allow it to adapt to changing data volumes, optimizing for cost and performance. Features like monitoring dashboards, job templates, and built-in logging make it suitable for both development and production use. With support for event time processing, stateful functions, and windowing, Dataflow is well-suited for real-time analytics and data transformation tasks. It’s a key component for architects building scalable, cloud-native data platforms. Dataflow also offers templates for common ETL tasks, helping teams get started quickly with minimal setup. Its integration with Cloud Functions and Cloud Composer enables event-driven and orchestrated workflows. Security and compliance are built-in with IAM roles, encryption at rest and in transit, and audit logging, making it suitable for enterprise environments.\"\u001b[0m,\n",
+ " \u001b[1;34m\"metadata\"\u001b[0m: \u001b[1m{\u001b[0m\n",
+ " \u001b[1;34m\"title\"\u001b[0m: \u001b[32m\"Google Cloud Dataflow: Run Apache Beam in the Cloud\"\u001b[0m,\n",
+ " \u001b[1;34m\"keywords\"\u001b[0m: \u001b[1m[\u001b[0m\n",
+ " \u001b[32m\"Google Cloud\"\u001b[0m,\n",
+ " \u001b[32m\"Dataflow\"\u001b[0m,\n",
+ " \u001b[32m\"Apache Beam\"\u001b[0m,\n",
+ " \u001b[32m\"serverless\"\u001b[0m,\n",
+ " \u001b[32m\"stream and batch\"\u001b[0m\n",
+ " \u001b[1m]\u001b[0m,\n",
+ " \u001b[1;34m\"tags\"\u001b[0m: \u001b[1m[\u001b[0m\n",
+ " \u001b[32m\"Cloud Computing\"\u001b[0m,\n",
+ " \u001b[32m\"Data Pipelines\"\u001b[0m,\n",
+ " \u001b[32m\"Google Cloud\"\u001b[0m,\n",
+ " \u001b[32m\"Serverless\"\u001b[0m,\n",
+ " \u001b[32m\"Enterprise\"\u001b[0m\n",
+ " \u001b[1m]\u001b[0m\n",
+ " \u001b[1m}\u001b[0m\n",
+ " \u001b[1m}\u001b[0m\n",
+ " \u001b[1m}\u001b[0m\n",
+ " \u001b[1m]\u001b[0m\n",
+ "\u001b[1m}\u001b[0m\n"
+ ]
+ },
+ "metadata": {},
+ "output_type": "display_data"
+ }
+ ],
+ "source": [
+ "with beam.Pipeline() as p:\n",
+ " _ = (\n",
+ " p\n",
+ " | \"Creating Queries\" >> beam.Create([query_chunk])\n",
+ " | 'Generating Embeddings' >> MLTransform(\n",
+ " write_artifact_location=tempfile.mkdtemp()).with_transform(huggingface_embedder)\n",
+ " | \"Enriching W/ Milvus Vector Search\" >> Enrichment(milvus_handler)\n",
+ " | \"Formatting and Printing Results\" >> FormatAndPrintResults())"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "cb72f9c6-5a29-4810-9768-574aa7ea5128",
+ "metadata": {},
+ "source": [
+ "#### Searching for Apache Beam with Filtered Search"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "df64b70f-bad8-469f-8419-723911f7f7cf",
+ "metadata": {},
+ "source": [
+ "To precisely target **Apache Beam** and ensure the retrieval of only relevant documents, we can leverage the power of **metadata filtering**. By applying a filter that specifies the document's `keywords` must contain `data pipelines`, we can instruct the underlying search engine to exclude any documents related to `Google Beam` from the result set. This allows the vector search to operate on a pre-filtered, highly relevant subset of the corpus, guaranteeing that the retrieved information pertains exclusively to `Apache Beam`'s domain, thereby resolving the semantic ambiguity with remarkable precision."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 48,
+ "id": "6e79ef5c-a121-4e69-9089-0991821f8745",
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "query = \"what is beam?\"\n",
+ "query_chunk = Chunk(content=Content(text=query))"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 49,
+ "id": "5314c531-14bb-4d81-92a5-fcf9cca7fa81",
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "search_parameters = MilvusSearchParameters(\n",
+ " collection_name=collection_name,\n",
+ " search_strategy=VectorSearchParameters(\n",
+ " filter=\"ARRAY_CONTAINS(metadata['keywords'], 'data pipelines')\",\n",
+ " limit=10,\n",
+ " anns_field=\"embedding\",\n",
+ " ),\n",
+ " output_fields=[\"metadata\",\"content\"])"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 50,
+ "id": "0ecf2ac6-cf90-4ce7-b17f-113af90ab950",
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "collection_load_parameters = MilvusCollectionLoadParameters() "
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 51,
+ "id": "0cd92b69-b9dc-445c-9bd7-21bb3ceb0fd3",
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "milvus_handler = MilvusSearchEnrichmentHandler(\n",
+ " connection_parameters=milvus_connection_parameters,\n",
+ " search_parameters=search_parameters,\n",
+ " collection_load_parameters=collection_load_parameters)"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 52,
+ "id": "b06ecf64-c314-4c6a-ae1a-4fdf059aeead",
+ "metadata": {},
+ "outputs": [
+ {
+ "name": "stderr",
+ "output_type": "stream",
+ "text": [
+ "WARNING:root:This output type hint will be ignored and not used for type-checking purposes. Typically, output type hints for a PTransform are single (or nested) types wrapped by a PCollection, PDone, or None. Got: Union[Tuple[apache_beam.pvalue.PCollection[~MLTransformOutputT], apache_beam.pvalue.PCollection[apache_beam.pvalue.Row]], apache_beam.pvalue.PCollection[~MLTransformOutputT]] instead.\n",
+ "WARNING:root:This output type hint will be ignored and not used for type-checking purposes. Typically, output type hints for a PTransform are single (or nested) types wrapped by a PCollection, PDone, or None. Got: Union[Tuple[apache_beam.pvalue.PCollection[~MLTransformOutputT], apache_beam.pvalue.PCollection[apache_beam.pvalue.Row]], apache_beam.pvalue.PCollection[~MLTransformOutputT]] instead.\n"
+ ]
+ },
+ {
+ "data": {
+ "text/html": [
+ "{\n",
+ " \"query\": \"what is beam?\",\n",
+ " \"query_embedding\": 384,\n",
+ " \"results\": [\n",
+ " {\n",
+ " \"id\": \"1_0\",\n",
+ " \"distance\": 0.4598,\n",
+ " \"fields\": {\n",
+ " \"content\": \"Apache Beam is an open-source framework that provides a consistent programming model for both batch and streaming data processing. Developed originally by Google, it allows developers to write pipelines that can run on multiple engines, such as Apache Flink, Spark, and Google Cloud Dataflow. Beam uses abstractions like PCollections (data containers) and PTransforms (operations) to define the flow of data. The framework promotes portability through its runner architecture, letting the same pipeline execute on different backends. Support for multiple SDKs, including Java and Python, makes it accessible for a broad audience. Key features include support for event time, windowing, triggers, and stateful processing, which are essential for handling real-time data effectively. Beam is ideal for building ETL jobs, real-time analytics, and machine learning data pipelines. It helps teams focus on logic rather than infrastructure, offering flexibility and scalability in handling unbounded and bounded data sources. Apache Beam also supports a wide range of connectors for both input and output, including Kafka, BigQuery, and JDBC-based systems. This makes it easy to integrate Beam into existing data ecosystems. Developers can build reusable transforms and modularize pipeline logic, improving maintainability and testing.\",\n",
+ " \"metadata\": {\n",
+ " \"title\": \"Apache Beam: Unified Model for Batch and Streaming Data\",\n",
+ " \"keywords\": [\n",
+ " \"Apache Beam\",\n",
+ " \"stream processing\",\n",
+ " \"batch processing\",\n",
+ " \"data pipelines\",\n",
+ " \"SDK\"\n",
+ " ],\n",
+ " \"tags\": [\n",
+ " \"Data Engineering\",\n",
+ " \"Open Source\",\n",
+ " \"Streaming\",\n",
+ " \"Batch\",\n",
+ " \"Big Data\"\n",
+ " ]\n",
+ " }\n",
+ " }\n",
+ " },\n",
+ " {\n",
+ " \"id\": \"1_1\",\n",
+ " \"distance\": 0.4353,\n",
+ " \"fields\": {\n",
+ " \"content\": \"Developers can build reusable transforms and modularize pipeline logic, improving maintainability and testing. The concept of runners enables developers to write once and run anywhere, which is particularly appealing for organizations that want to avoid vendor lock-in. The Beam model is based on a unified programming model that decouples pipeline logic from execution. This makes it easier to reason about time and state in both batch and streaming pipelines. Advanced features like late data handling, watermarks, and session windowing allow for more accurate and meaningful processing of real-world data. Beam also integrates with orchestration tools and monitoring systems, allowing for production-grade deployments. Community support and contributions have grown significantly, making Beam a stable and evolving ecosystem. Many cloud providers offer native support for Beam pipelines, and it's increasingly a core component in modern data platform architectures.\",\n",
+ " \"metadata\": {\n",
+ " \"title\": \"Apache Beam: Unified Model for Batch and Streaming Data\",\n",
+ " \"keywords\": [\n",
+ " \"Apache Beam\",\n",
+ " \"stream processing\",\n",
+ " \"batch processing\",\n",
+ " \"data pipelines\",\n",
+ " \"SDK\"\n",
+ " ],\n",
+ " \"tags\": [\n",
+ " \"Data Engineering\",\n",
+ " \"Open Source\",\n",
+ " \"Streaming\",\n",
+ " \"Batch\",\n",
+ " \"Big Data\"\n",
+ " ]\n",
+ " }\n",
+ " }\n",
+ " }\n",
+ " ]\n",
+ "}\n",
+ "\n"
+ ],
+ "text/plain": [
+ "\u001b[1m{\u001b[0m\n",
+ " \u001b[1;34m\"query\"\u001b[0m: \u001b[32m\"what is beam?\"\u001b[0m,\n",
+ " \u001b[1;34m\"query_embedding\"\u001b[0m: \u001b[1;36m384\u001b[0m,\n",
+ " \u001b[1;34m\"results\"\u001b[0m: \u001b[1m[\u001b[0m\n",
+ " \u001b[1m{\u001b[0m\n",
+ " \u001b[1;34m\"id\"\u001b[0m: \u001b[32m\"1_0\"\u001b[0m,\n",
+ " \u001b[1;34m\"distance\"\u001b[0m: \u001b[1;36m0.4598\u001b[0m,\n",
+ " \u001b[1;34m\"fields\"\u001b[0m: \u001b[1m{\u001b[0m\n",
+ " \u001b[1;34m\"content\"\u001b[0m: \u001b[32m\"Apache Beam is an open-source framework that provides a consistent programming model for both batch and streaming data processing. Developed originally by Google, it allows developers to write pipelines that can run on multiple engines, such as Apache Flink, Spark, and Google Cloud Dataflow. Beam uses abstractions like PCollections (data containers) and PTransforms (operations) to define the flow of data. The framework promotes portability through its runner architecture, letting the same pipeline execute on different backends. Support for multiple SDKs, including Java and Python, makes it accessible for a broad audience. Key features include support for event time, windowing, triggers, and stateful processing, which are essential for handling real-time data effectively. Beam is ideal for building ETL jobs, real-time analytics, and machine learning data pipelines. It helps teams focus on logic rather than infrastructure, offering flexibility and scalability in handling unbounded and bounded data sources. Apache Beam also supports a wide range of connectors for both input and output, including Kafka, BigQuery, and JDBC-based systems. This makes it easy to integrate Beam into existing data ecosystems. Developers can build reusable transforms and modularize pipeline logic, improving maintainability and testing.\"\u001b[0m,\n",
+ " \u001b[1;34m\"metadata\"\u001b[0m: \u001b[1m{\u001b[0m\n",
+ " \u001b[1;34m\"title\"\u001b[0m: \u001b[32m\"Apache Beam: Unified Model for Batch and Streaming Data\"\u001b[0m,\n",
+ " \u001b[1;34m\"keywords\"\u001b[0m: \u001b[1m[\u001b[0m\n",
+ " \u001b[32m\"Apache Beam\"\u001b[0m,\n",
+ " \u001b[32m\"stream processing\"\u001b[0m,\n",
+ " \u001b[32m\"batch processing\"\u001b[0m,\n",
+ " \u001b[32m\"data pipelines\"\u001b[0m,\n",
+ " \u001b[32m\"SDK\"\u001b[0m\n",
+ " \u001b[1m]\u001b[0m,\n",
+ " \u001b[1;34m\"tags\"\u001b[0m: \u001b[1m[\u001b[0m\n",
+ " \u001b[32m\"Data Engineering\"\u001b[0m,\n",
+ " \u001b[32m\"Open Source\"\u001b[0m,\n",
+ " \u001b[32m\"Streaming\"\u001b[0m,\n",
+ " \u001b[32m\"Batch\"\u001b[0m,\n",
+ " \u001b[32m\"Big Data\"\u001b[0m\n",
+ " \u001b[1m]\u001b[0m\n",
+ " \u001b[1m}\u001b[0m\n",
+ " \u001b[1m}\u001b[0m\n",
+ " \u001b[1m}\u001b[0m,\n",
+ " \u001b[1m{\u001b[0m\n",
+ " \u001b[1;34m\"id\"\u001b[0m: \u001b[32m\"1_1\"\u001b[0m,\n",
+ " \u001b[1;34m\"distance\"\u001b[0m: \u001b[1;36m0.4353\u001b[0m,\n",
+ " \u001b[1;34m\"fields\"\u001b[0m: \u001b[1m{\u001b[0m\n",
+ " \u001b[1;34m\"content\"\u001b[0m: \u001b[32m\"Developers can build reusable transforms and modularize pipeline logic, improving maintainability and testing. The concept of runners enables developers to write once and run anywhere, which is particularly appealing for organizations that want to avoid vendor lock-in. The Beam model is based on a unified programming model that decouples pipeline logic from execution. This makes it easier to reason about time and state in both batch and streaming pipelines. Advanced features like late data handling, watermarks, and session windowing allow for more accurate and meaningful processing of real-world data. Beam also integrates with orchestration tools and monitoring systems, allowing for production-grade deployments. Community support and contributions have grown significantly, making Beam a stable and evolving ecosystem. Many cloud providers offer native support for Beam pipelines, and it's increasingly a core component in modern data platform architectures.\"\u001b[0m,\n",
+ " \u001b[1;34m\"metadata\"\u001b[0m: \u001b[1m{\u001b[0m\n",
+ " \u001b[1;34m\"title\"\u001b[0m: \u001b[32m\"Apache Beam: Unified Model for Batch and Streaming Data\"\u001b[0m,\n",
+ " \u001b[1;34m\"keywords\"\u001b[0m: \u001b[1m[\u001b[0m\n",
+ " \u001b[32m\"Apache Beam\"\u001b[0m,\n",
+ " \u001b[32m\"stream processing\"\u001b[0m,\n",
+ " \u001b[32m\"batch processing\"\u001b[0m,\n",
+ " \u001b[32m\"data pipelines\"\u001b[0m,\n",
+ " \u001b[32m\"SDK\"\u001b[0m\n",
+ " \u001b[1m]\u001b[0m,\n",
+ " \u001b[1;34m\"tags\"\u001b[0m: \u001b[1m[\u001b[0m\n",
+ " \u001b[32m\"Data Engineering\"\u001b[0m,\n",
+ " \u001b[32m\"Open Source\"\u001b[0m,\n",
+ " \u001b[32m\"Streaming\"\u001b[0m,\n",
+ " \u001b[32m\"Batch\"\u001b[0m,\n",
+ " \u001b[32m\"Big Data\"\u001b[0m\n",
+ " \u001b[1m]\u001b[0m\n",
+ " \u001b[1m}\u001b[0m\n",
+ " \u001b[1m}\u001b[0m\n",
+ " \u001b[1m}\u001b[0m\n",
+ " \u001b[1m]\u001b[0m\n",
+ "\u001b[1m}\u001b[0m\n"
+ ]
+ },
+ "metadata": {},
+ "output_type": "display_data"
+ }
+ ],
+ "source": [
+ "with beam.Pipeline() as p:\n",
+ " _ = (\n",
+ " p\n",
+ " | \"Creating Queries\" >> beam.Create([query_chunk])\n",
+ " | 'Generating Embeddings' >> MLTransform(\n",
+ " write_artifact_location=tempfile.mkdtemp()).with_transform(huggingface_embedder)\n",
+ " | \"Enriching W/ Milvus Vector Search\" >> Enrichment(milvus_handler)\n",
+ " | \"Formatting and Printing Results\" >> FormatAndPrintResults())"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "3e61bcf4-96e7-47dd-bb37-4788e99a2b89",
+ "metadata": {},
+ "source": [
+ "#### Searching for Google Beam with Filtered Search"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "a782f79b-a1a2-4474-807e-8abad62406b0",
+ "metadata": {},
+ "source": [
+ "To precisely target `Google Beam` and ensure the retrieval of only relevant documents, we can leverage the power of `metadata filtering`. By applying a filter that specifies the document's `tags` must contain `Remote Work`, we can instruct the underlying search engine to exclude any documents related to `Apache Beam` from the result set. This allows the vector search to operate on a pre-filtered, highly relevant subset of the corpus, guaranteeing that the retrieved information pertains exclusively to `Google Beam`'s domain, thereby resolving the semantic ambiguity with remarkable precision."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 53,
+ "id": "a8077395-c374-400f-abdc-fe6630eab8a4",
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "query = \"what is beam?\"\n",
+ "query_chunk = Chunk(content=Content(text=query))"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 54,
+ "id": "3b712779-f283-4e37-88ed-d6b65c6c45d2",
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "search_parameters = MilvusSearchParameters(\n",
+ " collection_name=collection_name,\n",
+ " search_strategy=VectorSearchParameters(filter=\"ARRAY_CONTAINS(metadata['tags'], 'Remote Work')\",limit=10,anns_field=\"embedding\"),\n",
+ " output_fields=[\"metadata\", \"content\"])"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 55,
+ "id": "7f0924a3-8832-4138-a599-d3aef648b962",
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "collection_load_parameters = MilvusCollectionLoadParameters() "
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 56,
+ "id": "516ecbf0-9bb0-4177-829b-b79300b29bbe",
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "milvus_handler = MilvusSearchEnrichmentHandler(\n",
+ " connection_parameters=milvus_connection_parameters,\n",
+ " search_parameters=search_parameters,\n",
+ " collection_load_parameters=collection_load_parameters)"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 57,
+ "id": "db32dda5-0668-4162-80ea-b6a0c2a79063",
+ "metadata": {},
+ "outputs": [
+ {
+ "name": "stderr",
+ "output_type": "stream",
+ "text": [
+ "WARNING:root:This output type hint will be ignored and not used for type-checking purposes. Typically, output type hints for a PTransform are single (or nested) types wrapped by a PCollection, PDone, or None. Got: Union[Tuple[apache_beam.pvalue.PCollection[~MLTransformOutputT], apache_beam.pvalue.PCollection[apache_beam.pvalue.Row]], apache_beam.pvalue.PCollection[~MLTransformOutputT]] instead.\n",
+ "WARNING:root:This output type hint will be ignored and not used for type-checking purposes. Typically, output type hints for a PTransform are single (or nested) types wrapped by a PCollection, PDone, or None. Got: Union[Tuple[apache_beam.pvalue.PCollection[~MLTransformOutputT], apache_beam.pvalue.PCollection[apache_beam.pvalue.Row]], apache_beam.pvalue.PCollection[~MLTransformOutputT]] instead.\n"
+ ]
+ },
+ {
+ "data": {
+ "text/html": [
+ "{\n",
+ " \"query\": \"what is beam?\",\n",
+ " \"query_embedding\": 384,\n",
+ " \"results\": [\n",
+ " {\n",
+ " \"id\": \"3_0\",\n",
+ " \"distance\": 0.3927,\n",
+ " \"fields\": {\n",
+ " \"content\": \"Google Beam is an innovative video communication platform that builds on the research of Project Starline. It uses AI, 3D imaging, and light field rendering to create immersive, lifelike video calls. Designed to replicate in-person interaction, Beam allows users to see life-sized, three-dimensional representations of each other without the need for headsets. This breakthrough makes remote conversations feel natural—capturing facial expressions, eye contact, and subtle gestures that traditional video conferencing often misses. Beam reduces meeting fatigue and enhances engagement, making it ideal for enterprise collaboration, interviews, and virtual presence scenarios. Powered by Google AI, Beam represents a significant leap in communication technology. Major companies like Salesforce, Deloitte, and NEC are already exploring its impact on digital collaboration. Google is partnering with HP to build and distribute Beam hardware, designed to work with existing productivity and video tools. Currently in limited early access for enterprise partners, Google Beam aims to redefine virtual meetings by bridging the gap between digital and physical presence. It’s a promising step toward more human and effective remote interactions.\",\n",
+ " \"metadata\": {\n",
+ " \"title\": \"Google Beam: 3D Communication Powered by AI\",\n",
+ " \"keywords\": [\n",
+ " \"Google Beam\",\n",
+ " \"Project Starline\",\n",
+ " \"3D video\",\n",
+ " \"AI communication\",\n",
+ " \"real-time meetings\"\n",
+ " ],\n",
+ " \"tags\": [\n",
+ " \"AI\",\n",
+ " \"Communication\",\n",
+ " \"3D Technology\",\n",
+ " \"Remote Work\",\n",
+ " \"Enterprise Tech\"\n",
+ " ]\n",
+ " }\n",
+ " }\n",
+ " }\n",
+ " ]\n",
+ "}\n",
+ "\n"
+ ],
+ "text/plain": [
+ "\u001b[1m{\u001b[0m\n",
+ " \u001b[1;34m\"query\"\u001b[0m: \u001b[32m\"what is beam?\"\u001b[0m,\n",
+ " \u001b[1;34m\"query_embedding\"\u001b[0m: \u001b[1;36m384\u001b[0m,\n",
+ " \u001b[1;34m\"results\"\u001b[0m: \u001b[1m[\u001b[0m\n",
+ " \u001b[1m{\u001b[0m\n",
+ " \u001b[1;34m\"id\"\u001b[0m: \u001b[32m\"3_0\"\u001b[0m,\n",
+ " \u001b[1;34m\"distance\"\u001b[0m: \u001b[1;36m0.3927\u001b[0m,\n",
+ " \u001b[1;34m\"fields\"\u001b[0m: \u001b[1m{\u001b[0m\n",
+ " \u001b[1;34m\"content\"\u001b[0m: \u001b[32m\"Google Beam is an innovative video communication platform that builds on the research of Project Starline. It uses AI, 3D imaging, and light field rendering to create immersive, lifelike video calls. Designed to replicate in-person interaction, Beam allows users to see life-sized, three-dimensional representations of each other without the need for headsets. This breakthrough makes remote conversations feel natural—capturing facial expressions, eye contact, and subtle gestures that traditional video conferencing often misses. Beam reduces meeting fatigue and enhances engagement, making it ideal for enterprise collaboration, interviews, and virtual presence scenarios. Powered by Google AI, Beam represents a significant leap in communication technology. Major companies like Salesforce, Deloitte, and NEC are already exploring its impact on digital collaboration. Google is partnering with HP to build and distribute Beam hardware, designed to work with existing productivity and video tools. Currently in limited early access for enterprise partners, Google Beam aims to redefine virtual meetings by bridging the gap between digital and physical presence. It’s a promising step toward more human and effective remote interactions.\"\u001b[0m,\n",
+ " \u001b[1;34m\"metadata\"\u001b[0m: \u001b[1m{\u001b[0m\n",
+ " \u001b[1;34m\"title\"\u001b[0m: \u001b[32m\"Google Beam: 3D Communication Powered by AI\"\u001b[0m,\n",
+ " \u001b[1;34m\"keywords\"\u001b[0m: \u001b[1m[\u001b[0m\n",
+ " \u001b[32m\"Google Beam\"\u001b[0m,\n",
+ " \u001b[32m\"Project Starline\"\u001b[0m,\n",
+ " \u001b[32m\"3D video\"\u001b[0m,\n",
+ " \u001b[32m\"AI communication\"\u001b[0m,\n",
+ " \u001b[32m\"real-time meetings\"\u001b[0m\n",
+ " \u001b[1m]\u001b[0m,\n",
+ " \u001b[1;34m\"tags\"\u001b[0m: \u001b[1m[\u001b[0m\n",
+ " \u001b[32m\"AI\"\u001b[0m,\n",
+ " \u001b[32m\"Communication\"\u001b[0m,\n",
+ " \u001b[32m\"3D Technology\"\u001b[0m,\n",
+ " \u001b[32m\"Remote Work\"\u001b[0m,\n",
+ " \u001b[32m\"Enterprise Tech\"\u001b[0m\n",
+ " \u001b[1m]\u001b[0m\n",
+ " \u001b[1m}\u001b[0m\n",
+ " \u001b[1m}\u001b[0m\n",
+ " \u001b[1m}\u001b[0m\n",
+ " \u001b[1m]\u001b[0m\n",
+ "\u001b[1m}\u001b[0m\n"
+ ]
+ },
+ "metadata": {},
+ "output_type": "display_data"
+ }
+ ],
+ "source": [
+ "with beam.Pipeline() as p:\n",
+ " _ = (\n",
+ " p\n",
+ " | \"Creating Queries\" >> beam.Create([query_chunk])\n",
+ " | 'Generating Embeddings' >> MLTransform(\n",
+ " write_artifact_location=tempfile.mkdtemp()).with_transform(huggingface_embedder)\n",
+ " | \"Enriching W/ Milvus Vector Search\" >> Enrichment(milvus_handler)\n",
+ " | \"Formatting and Printing Results\" >> FormatAndPrintResults())"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "c2670682-24bf-45b6-9593-bed0e3b1cee2",
+ "metadata": {},
+ "source": [
+ "## Cleanup"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 58,
+ "id": "0a3f4d66-3823-46c7-8a58-e9e8ac7899c8",
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "MilvusEnrichmentTestHelper.stop_db_container(db)\n",
+ "db = None"
+ ]
+ }
+ ],
+ "metadata": {
+ "kernelspec": {
+ "display_name": "Python 3 (ipykernel)",
+ "language": "python",
+ "name": "python3"
+ },
+ "language_info": {
+ "codemirror_mode": {
+ "name": "ipython",
+ "version": 3
+ },
+ "file_extension": ".py",
+ "mimetype": "text/x-python",
+ "name": "python",
+ "nbconvert_exporter": "python",
+ "pygments_lexer": "ipython3",
+ "version": "3.9.24"
+ }
+ },
+ "nbformat": 4,
+ "nbformat_minor": 5
+}
diff --git a/sdks/python/apache_beam/examples/snippets/transforms/elementwise/enrichment.py b/sdks/python/apache_beam/examples/snippets/transforms/elementwise/enrichment.py
index d71faa6d8477..12ec205d2e62 100644
--- a/sdks/python/apache_beam/examples/snippets/transforms/elementwise/enrichment.py
+++ b/sdks/python/apache_beam/examples/snippets/transforms/elementwise/enrichment.py
@@ -156,7 +156,7 @@ def enrichment_with_google_cloudsql_pg():
where_clause_template=where_clause_template,
where_clause_fields=where_clause_fields)
- cloudsql_handler = CloudSQLEnrichmentHandler(
+ handler = CloudSQLEnrichmentHandler(
connection_config=connection_config,
table_id=table_id,
query_config=query_config)
@@ -164,8 +164,7 @@ def enrichment_with_google_cloudsql_pg():
_ = (
p
| "Create" >> beam.Create(data)
- |
- "Enrich W/ Google CloudSQL PostgreSQL" >> Enrichment(cloudsql_handler)
+ | "Enrich W/ Google CloudSQL PostgreSQL" >> Enrichment(handler)
| "Print" >> beam.Map(print))
# [END enrichment_with_google_cloudsql_pg]
@@ -327,3 +326,75 @@ def enrichment_with_external_sqlserver():
| "Enrich W/ Unmanaged SQL Server" >> Enrichment(cloudsql_handler)
| "Print" >> beam.Map(print))
# [END enrichment_with_external_sqlserver]
+
+
+def enrichment_with_milvus():
+ # [START enrichment_with_milvus]
+ import os
+ import apache_beam as beam
+ from apache_beam.ml.rag.types import Content
+ from apache_beam.ml.rag.types import Chunk
+ from apache_beam.ml.rag.types import Embedding
+ from apache_beam.transforms.enrichment import Enrichment
+ from apache_beam.ml.rag.enrichment.milvus_search import (
+ MilvusSearchEnrichmentHandler,
+ MilvusConnectionParameters,
+ MilvusSearchParameters,
+ MilvusCollectionLoadParameters,
+ VectorSearchParameters,
+ VectorSearchMetrics)
+
+ uri = os.environ.get("MILVUS_VECTOR_DB_URI")
+ user = os.environ.get("MILVUS_VECTOR_DB_USER")
+ password = os.environ.get("MILVUS_VECTOR_DB_PASSWORD")
+ db_id = os.environ.get("MILVUS_VECTOR_DB_ID")
+ token = os.environ.get("MILVUS_VECTOR_DB_TOKEN")
+ collection_name = os.environ.get("MILVUS_VECTOR_DB_COLLECTION_NAME")
+
+ data = [
+ Chunk(
+ id="query1",
+ embedding=Embedding(dense_embedding=[0.1, 0.2, 0.3]),
+ content=Content())
+ ]
+
+ connection_parameters = MilvusConnectionParameters(
+ uri, user, password, db_id, token)
+
+ # The first condition (language == "en") excludes documents in other
+ # languages. Initially, this gives us two documents. After applying the second
+ # condition (cost < 50), only the first document returns in search results.
+ filter_expr = 'metadata["language"] == "en" AND cost < 50'
+
+ search_params = {"metric_type": VectorSearchMetrics.COSINE.value, "nprobe": 1}
+
+ vector_search_params = VectorSearchParameters(
+ anns_field="dense_embedding_cosine",
+ limit=3,
+ filter=filter_expr,
+ search_params=search_params)
+
+ search_parameters = MilvusSearchParameters(
+ collection_name=collection_name,
+ search_strategy=vector_search_params,
+ output_fields=["id", "content", "domain", "cost", "metadata"],
+ round_decimal=2)
+
+ # The collection load parameters are optional. They provide fine-grained
+ # control over how collections are loaded into memory. For simple use cases or
+ # when getting started, this parameter can be omitted to use default loading
+ # behavior. Consider using it in resource-constrained environments to optimize
+ # memory usage and query performance.
+ collection_load_parameters = MilvusCollectionLoadParameters()
+
+ milvus_search_handler = MilvusSearchEnrichmentHandler(
+ connection_parameters=connection_parameters,
+ search_parameters=search_parameters,
+ collection_load_parameters=collection_load_parameters)
+ with beam.Pipeline() as p:
+ _ = (
+ p
+ | "Create" >> beam.Create(data)
+ | "Enrich W/ Milvus" >> Enrichment(milvus_search_handler)
+ | "Print" >> beam.Map(print))
+ # [END enrichment_with_milvus]
diff --git a/sdks/python/apache_beam/examples/snippets/transforms/elementwise/enrichment_test.py b/sdks/python/apache_beam/examples/snippets/transforms/elementwise/enrichment_test.py
index 176eaa61e7a8..eeeeff77cf60 100644
--- a/sdks/python/apache_beam/examples/snippets/transforms/elementwise/enrichment_test.py
+++ b/sdks/python/apache_beam/examples/snippets/transforms/elementwise/enrichment_test.py
@@ -42,7 +42,8 @@
enrichment_with_google_cloudsql_pg,
enrichment_with_external_pg,
enrichment_with_external_mysql,
- enrichment_with_external_sqlserver)
+ enrichment_with_external_sqlserver,
+ enrichment_with_milvus)
from apache_beam.transforms.enrichment_handlers.cloudsql import (
DatabaseTypeAdapter)
from apache_beam.transforms.enrichment_handlers.cloudsql_it_test import (
@@ -51,9 +52,21 @@
ConnectionConfig,
CloudSQLConnectionConfig,
ExternalSQLDBConnectionConfig)
+ from apache_beam.ml.rag.enrichment.milvus_search import (
+ MilvusConnectionParameters)
+ from apache_beam.ml.rag.enrichment.milvus_search_it_test import (
+ MilvusEnrichmentTestHelper,
+ MilvusDBContainerInfo,
+ parse_chunk_strings,
+ assert_chunks_equivalent)
from apache_beam.io.requestresponse import RequestResponseIO
except ImportError as e:
- raise unittest.SkipTest(f'RequestResponseIO dependencies not installed: {e}')
+ raise unittest.SkipTest(f'Examples dependencies are not installed: {str(e)}')
+
+
+class TestContainerStartupError(Exception):
+ """Raised when any test container fails to start."""
+ pass
def validate_enrichment_with_bigtable():
@@ -119,6 +132,13 @@ def validate_enrichment_with_external_sqlserver():
return expected
+def validate_enrichment_with_milvus():
+ expected = '''[START enrichment_with_milvus]
+Chunk(content=Content(text=None), id='query1', index=0, metadata={'enrichment_data': defaultdict(, {'id': [1], 'distance': [1.0], 'fields': [{'content': 'This is a test document', 'cost': 49, 'domain': 'medical', 'id': 1, 'metadata': {'language': 'en'}}]})}, embedding=Embedding(dense_embedding=[0.1, 0.2, 0.3], sparse_embedding=None))
+ [END enrichment_with_milvus]'''.splitlines()[1:-1]
+ return expected
+
+
@mock.patch('sys.stdout', new_callable=StringIO)
@pytest.mark.uses_testcontainer
class EnrichmentTest(unittest.TestCase):
@@ -148,48 +168,69 @@ def test_enrichment_with_vertex_ai_legacy(self, mock_stdout):
os.environ.get('ALLOYDB_PASSWORD'),
"ALLOYDB_PASSWORD environment var is not provided")
def test_enrichment_with_google_cloudsql_pg(self, mock_stdout):
- db_adapter = DatabaseTypeAdapter.POSTGRESQL
- with EnrichmentTestHelpers.sql_test_context(True, db_adapter):
- try:
+ try:
+ db_adapter = DatabaseTypeAdapter.POSTGRESQL
+ with EnrichmentTestHelpers.sql_test_context(True, db_adapter):
enrichment_with_google_cloudsql_pg()
output = mock_stdout.getvalue().splitlines()
expected = validate_enrichment_with_google_cloudsql_pg()
self.assertEqual(output, expected)
- except Exception as e:
- self.fail(f"Test failed with unexpected error: {e}")
+ except Exception as e:
+ self.fail(f"Test failed with unexpected error: {e}")
def test_enrichment_with_external_pg(self, mock_stdout):
- db_adapter = DatabaseTypeAdapter.POSTGRESQL
- with EnrichmentTestHelpers.sql_test_context(False, db_adapter):
- try:
+ try:
+ db_adapter = DatabaseTypeAdapter.POSTGRESQL
+ with EnrichmentTestHelpers.sql_test_context(False, db_adapter):
enrichment_with_external_pg()
output = mock_stdout.getvalue().splitlines()
expected = validate_enrichment_with_external_pg()
self.assertEqual(output, expected)
- except Exception as e:
- self.fail(f"Test failed with unexpected error: {e}")
+ except TestContainerStartupError as e:
+ raise unittest.SkipTest(str(e))
+ except Exception as e:
+ self.fail(f"Test failed with unexpected error: {e}")
def test_enrichment_with_external_mysql(self, mock_stdout):
- db_adapter = DatabaseTypeAdapter.MYSQL
- with EnrichmentTestHelpers.sql_test_context(False, db_adapter):
- try:
+ try:
+ db_adapter = DatabaseTypeAdapter.MYSQL
+ with EnrichmentTestHelpers.sql_test_context(False, db_adapter):
enrichment_with_external_mysql()
output = mock_stdout.getvalue().splitlines()
expected = validate_enrichment_with_external_mysql()
self.assertEqual(output, expected)
- except Exception as e:
- self.fail(f"Test failed with unexpected error: {e}")
+ except TestContainerStartupError as e:
+ raise unittest.SkipTest(str(e))
+ except Exception as e:
+ self.fail(f"Test failed with unexpected error: {e}")
def test_enrichment_with_external_sqlserver(self, mock_stdout):
- db_adapter = DatabaseTypeAdapter.SQLSERVER
- with EnrichmentTestHelpers.sql_test_context(False, db_adapter):
- try:
+ try:
+ db_adapter = DatabaseTypeAdapter.SQLSERVER
+ with EnrichmentTestHelpers.sql_test_context(False, db_adapter):
enrichment_with_external_sqlserver()
output = mock_stdout.getvalue().splitlines()
expected = validate_enrichment_with_external_sqlserver()
self.assertEqual(output, expected)
- except Exception as e:
- self.fail(f"Test failed with unexpected error: {e}")
+ except TestContainerStartupError as e:
+ raise unittest.SkipTest(str(e))
+ except Exception as e:
+ self.fail(f"Test failed with unexpected error: {e}")
+
+ def test_enrichment_with_milvus(self, mock_stdout):
+ try:
+ with EnrichmentTestHelpers.milvus_test_context():
+ enrichment_with_milvus()
+ output = mock_stdout.getvalue().splitlines()
+ expected = validate_enrichment_with_milvus()
+ self.maxDiff = None
+ output = parse_chunk_strings(output)
+ expected = parse_chunk_strings(expected)
+ assert_chunks_equivalent(output, expected)
+ except TestContainerStartupError as e:
+ raise unittest.SkipTest(str(e))
+ except Exception as e:
+ self.fail(f"Test failed with unexpected error: {e}")
@dataclass
@@ -201,6 +242,7 @@ class CloudSQLEnrichmentTestDataConstruct:
class EnrichmentTestHelpers:
+ @staticmethod
@contextmanager
def sql_test_context(is_cloudsql: bool, db_adapter: DatabaseTypeAdapter):
result: Optional[CloudSQLEnrichmentTestDataConstruct] = None
@@ -212,6 +254,17 @@ def sql_test_context(is_cloudsql: bool, db_adapter: DatabaseTypeAdapter):
if result:
EnrichmentTestHelpers.post_sql_enrichment_test(result)
+ @staticmethod
+ @contextmanager
+ def milvus_test_context():
+ db: Optional[MilvusDBContainerInfo] = None
+ try:
+ db = EnrichmentTestHelpers.pre_milvus_enrichment()
+ yield
+ finally:
+ if db:
+ EnrichmentTestHelpers.post_milvus_enrichment(db)
+
@staticmethod
def pre_sql_enrichment_test(
is_cloudsql: bool,
@@ -259,20 +312,25 @@ def pre_sql_enrichment_test(
password=password,
db_id=db_id)
else:
- db = SQLEnrichmentTestHelper.start_sql_db_container(db_adapter)
- os.environ['EXTERNAL_SQL_DB_HOST'] = db.host
- os.environ['EXTERNAL_SQL_DB_PORT'] = str(db.port)
- os.environ['EXTERNAL_SQL_DB_ID'] = db.id
- os.environ['EXTERNAL_SQL_DB_USER'] = db.user
- os.environ['EXTERNAL_SQL_DB_PASSWORD'] = db.password
- os.environ['EXTERNAL_SQL_DB_TABLE_ID'] = table_id
- connection_config = ExternalSQLDBConnectionConfig(
- db_adapter=db_adapter,
- host=db.host,
- port=db.port,
- user=db.user,
- password=db.password,
- db_id=db.id)
+ try:
+ db = SQLEnrichmentTestHelper.start_sql_db_container(db_adapter)
+ os.environ['EXTERNAL_SQL_DB_HOST'] = db.host
+ os.environ['EXTERNAL_SQL_DB_PORT'] = str(db.port)
+ os.environ['EXTERNAL_SQL_DB_ID'] = db.id
+ os.environ['EXTERNAL_SQL_DB_USER'] = db.user
+ os.environ['EXTERNAL_SQL_DB_PASSWORD'] = db.password
+ os.environ['EXTERNAL_SQL_DB_TABLE_ID'] = table_id
+ connection_config = ExternalSQLDBConnectionConfig(
+ db_adapter=db_adapter,
+ host=db.host,
+ port=db.port,
+ user=db.user,
+ password=db.password,
+ db_id=db.id)
+ except Exception as e:
+ db_name = db_adapter.value.lower()
+ raise TestContainerStartupError(
+ f"{db_name} container failed to start: {str(e)}")
conenctor = connection_config.get_connector_handler()
engine = create_engine(
@@ -311,6 +369,45 @@ def post_sql_enrichment_test(res: CloudSQLEnrichmentTestDataConstruct):
os.environ.pop('GOOGLE_CLOUD_SQL_DB_PASSWORD', None)
os.environ.pop('GOOGLE_CLOUD_SQL_DB_TABLE_ID', None)
+ @staticmethod
+ def pre_milvus_enrichment() -> MilvusDBContainerInfo:
+ try:
+ db = MilvusEnrichmentTestHelper.start_db_container()
+ except Exception as e:
+ raise TestContainerStartupError(
+ f"Milvus container failed to start: {str(e)}")
+
+ connection_params = MilvusConnectionParameters(
+ uri=db.uri,
+ user=db.user,
+ password=db.password,
+ db_id=db.id,
+ token=db.token)
+
+ collection_name = MilvusEnrichmentTestHelper.initialize_db_with_data(
+ connection_params)
+
+ # Set up environment variables for db and collection configuration. This will
+ # be used downstream by the milvus enrichment handler.
+ os.environ['MILVUS_VECTOR_DB_URI'] = db.uri
+ os.environ['MILVUS_VECTOR_DB_USER'] = db.user
+ os.environ['MILVUS_VECTOR_DB_PASSWORD'] = db.password
+ os.environ['MILVUS_VECTOR_DB_ID'] = db.id
+ os.environ['MILVUS_VECTOR_DB_TOKEN'] = db.token
+ os.environ['MILVUS_VECTOR_DB_COLLECTION_NAME'] = collection_name
+
+ return db
+
+ @staticmethod
+ def post_milvus_enrichment(db: MilvusDBContainerInfo):
+ MilvusEnrichmentTestHelper.stop_db_container(db)
+ os.environ.pop('MILVUS_VECTOR_DB_URI', None)
+ os.environ.pop('MILVUS_VECTOR_DB_USER', None)
+ os.environ.pop('MILVUS_VECTOR_DB_PASSWORD', None)
+ os.environ.pop('MILVUS_VECTOR_DB_ID', None)
+ os.environ.pop('MILVUS_VECTOR_DB_TOKEN', None)
+ os.environ.pop('MILVUS_VECTOR_DB_COLLECTION_NAME', None)
+
if __name__ == '__main__':
unittest.main()
diff --git a/sdks/python/apache_beam/ml/rag/enrichment/milvus_search_it_test.py b/sdks/python/apache_beam/ml/rag/enrichment/milvus_search_it_test.py
index 5099b861be11..6c233586e3c2 100644
--- a/sdks/python/apache_beam/ml/rag/enrichment/milvus_search_it_test.py
+++ b/sdks/python/apache_beam/ml/rag/enrichment/milvus_search_it_test.py
@@ -295,9 +295,15 @@ def __init__(
class MilvusEnrichmentTestHelper:
+ # IMPORTANT: When upgrading the Milvus server version, ensure the pymilvus
+ # Python SDK client in setup.py is updated to match. Referring to the Milvus
+ # release notes compatibility matrix at
+ # https://milvus.io/docs/release_notes.md or PyPI at
+ # https://pypi.org/project/pymilvus/ for version compatibility.
+ # Example: Milvus v2.6.0 requires pymilvus==2.6.0 (exact match required).
@staticmethod
def start_db_container(
- image="milvusdb/milvus:v2.6.2",
+ image="milvusdb/milvus:v2.5.10",
max_vec_fields=5,
vector_client_max_retries=3,
tc_max_retries=TC_MAX_TRIES) -> Optional[MilvusDBContainerInfo]:
@@ -455,6 +461,13 @@ def create_user_yaml(service_port: int, max_vector_field_num=5):
user_config = {
'proxy': {
'maxVectorFieldNum': max_vector_field_num, 'port': service_port
+ },
+ 'etcd': {
+ 'use': {
+ 'embed': True
+ }, 'data': {
+ 'dir': '/var/lib/milvus/etcd'
+ }
}
}
@@ -481,11 +494,10 @@ class TestMilvusSearchEnrichment(unittest.TestCase):
"""Tests for search functionality across all search strategies"""
_db: MilvusDBContainerInfo
- _version = "milvusdb/milvus:v2.5.10"
@classmethod
def setUpClass(cls):
- cls._db = MilvusEnrichmentTestHelper.start_db_container(cls._version)
+ cls._db = MilvusEnrichmentTestHelper.start_db_container()
cls._connection_params = MilvusConnectionParameters(
uri=cls._db.uri,
user=cls._db.user,
@@ -1309,11 +1321,7 @@ def assert_chunks_equivalent(
expected_data = expected.metadata['enrichment_data']
# If actual has enrichment data, then perform detailed validation.
- if actual_data:
- # Ensure the id key exist.
- err_msg = f"Missing id key in metadata {actual.id}"
- assert 'id' in actual_data, err_msg
-
+ if actual_data and actual_data.get('id'):
# Validate IDs have consistent ordering.
actual_ids = sorted(actual_data['id'])
expected_ids = sorted(expected_data['id'])
diff --git a/website/www/site/content/en/documentation/transforms/python/elementwise/enrichment-cloudsql.md b/website/www/site/content/en/documentation/transforms/python/elementwise/enrichment-cloudsql.md
index a29b2672e678..c76bfc59ac24 100644
--- a/website/www/site/content/en/documentation/transforms/python/elementwise/enrichment-cloudsql.md
+++ b/website/www/site/content/en/documentation/transforms/python/elementwise/enrichment-cloudsql.md
@@ -139,8 +139,6 @@ Output:
{{< code_sample "sdks/python/apache_beam/examples/snippets/transforms/elementwise/enrichment_test.py" enrichment_with_external_sqlserver >}}
{{< /highlight >}}
-## Related transforms
-
-Not applicable.
+## API documentation
{{< button-pydoc path="apache_beam.transforms.enrichment_handlers.cloudsql" class="CloudSQLEnrichmentHandler" >}}
diff --git a/website/www/site/content/en/documentation/transforms/python/elementwise/enrichment-milvus.md b/website/www/site/content/en/documentation/transforms/python/elementwise/enrichment-milvus.md
new file mode 100644
index 000000000000..f57c2b627ec1
--- /dev/null
+++ b/website/www/site/content/en/documentation/transforms/python/elementwise/enrichment-milvus.md
@@ -0,0 +1,65 @@
+---
+title: "Enrichment with Milvus"
+---
+
+
+# Use Milvus to enrich data
+
+{{< localstorage language language-py >}}
+
+
+
+In Apache Beam 2.67.0 and later versions, the enrichment transform includes
+a built-in enrichment handler for
+[Milvus](https://milvus.io/).
+The following example demonstrates how to create a pipeline that uses the enrichment transform with the [`MilvusSearchEnrichmentHandler`](https://beam.apache.org/releases/pydoc/current/apache_beam.ml.rag.enrichment.milvus_search.html#apache_beam.ml.rag.enrichment.milvus_search.MilvusSearchEnrichmentHandler) handler.
+
+The data in the Milvus instance collection `docs_catalog` follows this format:
+
+{{< table >}}
+| id | content | domain | cost | metadata | dense_embedding | sparse_embedding |
+|:--:|:-------:|:------:|:----:|:--------:|:--------------:|:----------------:|
+| 1 | This is a test document | medical | 49 | {"language": "en"} | [0.1, 0.2, 0.3] | [auto-generated by Milvus] |
+| 2 | Another test document | legal | 75 | {"language": "en"} | [0.2, 0.3, 0.4] | [auto-generated by Milvus] |
+| 3 | وثيقة اختبار | financial | 149 | {"language": "ar"} | [0.3, 0.4, 0.5] | [auto-generated by Milvus] |
+{{< /table >}}
+
+
+{{< highlight language="py" >}}
+{{< code_sample "sdks/python/apache_beam/examples/snippets/transforms/elementwise/enrichment.py" enrichment_with_milvus >}}
+{{ highlight >}}
+
+{{< paragraph class="notebook-skip" >}}
+Output:
+{{< /paragraph >}}
+{{< highlight class="notebook-skip" >}}
+{{< code_sample "sdks/python/apache_beam/examples/snippets/transforms/elementwise/enrichment_test.py" enrichment_with_milvus >}}
+{{< /highlight >}}
+
+## Notebook example
+
+
+
+
+
+## API documentation
+
+{{< button-pydoc path="apache_beam.ml.rag.enrichment.milvus_search" class="MilvusSearchEnrichmentHandler" >}}
diff --git a/website/www/site/content/en/documentation/transforms/python/elementwise/enrichment.md b/website/www/site/content/en/documentation/transforms/python/elementwise/enrichment.md
index 4b352d0447ad..bd9ab25593ae 100644
--- a/website/www/site/content/en/documentation/transforms/python/elementwise/enrichment.md
+++ b/website/www/site/content/en/documentation/transforms/python/elementwise/enrichment.md
@@ -42,6 +42,7 @@ The following examples demonstrate how to create a pipeline that use the enrichm
| Service | Example |
|:-----------------------------------|:-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| Cloud Bigtable | [Enrichment with Bigtable](/documentation/transforms/python/elementwise/enrichment-bigtable/#example) |
+| Milvus | [Enrichment with Milvus](/documentation/transforms/python/elementwise/enrichment-milvus/#example) |
| Cloud SQL (PostgreSQL, MySQL, SQLServer) | [Enrichment with CloudSQL](/documentation/transforms/python/elementwise/enrichment-cloudsql) |
| Vertex AI Feature Store | [Enrichment with Vertex AI Feature Store](/documentation/transforms/python/elementwise/enrichment-vertexai/#example-1-enrichment-with-vertex-ai-feature-store) |
| Vertex AI Feature Store (Legacy) | [Enrichment with Legacy Vertex AI Feature Store](/documentation/transforms/python/elementwise/enrichment-vertexai/#example-2-enrichment-with-vertex-ai-feature-store-legacy) |
@@ -100,4 +101,4 @@ enriched_data = (input_data
Not applicable.
-{{< button-pydoc path="apache_beam.transforms.enrichment" class="Enrichment" >}}
\ No newline at end of file
+{{< button-pydoc path="apache_beam.transforms.enrichment" class="Enrichment" >}}
diff --git a/website/www/site/layouts/partials/section-menu/en/documentation.html b/website/www/site/layouts/partials/section-menu/en/documentation.html
index 1a60cfbdd9f1..0cc197d95fdc 100755
--- a/website/www/site/layouts/partials/section-menu/en/documentation.html
+++ b/website/www/site/layouts/partials/section-menu/en/documentation.html
@@ -297,6 +297,7 @@