From 1dc8447a7702353c0fe69f0e4c0b0ef78d162f0d Mon Sep 17 00:00:00 2001 From: Alex Malao Date: Tue, 4 Nov 2025 11:49:00 -0500 Subject: [PATCH 1/7] added source window to assign context window mapping fn --- sdks/python/apache_beam/transforms/sideinputs.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/transforms/sideinputs.py b/sdks/python/apache_beam/transforms/sideinputs.py index 7d72a02f8874..dd6482e38c2f 100644 --- a/sdks/python/apache_beam/transforms/sideinputs.py +++ b/sdks/python/apache_beam/transforms/sideinputs.py @@ -60,7 +60,8 @@ def default_window_mapping_fn( def map_via_end(source_window: window.BoundedWindow) -> window.BoundedWindow: return list( target_window_fn.assign( - window.WindowFn.AssignContext(source_window.max_timestamp())))[-1] + window.WindowFn.AssignContext(source_window.max_timestamp(), + window=source_window)))[-1] return map_via_end From a30b97ff739dcdfdf834cc4fb68b8fe8dca4f147 Mon Sep 17 00:00:00 2001 From: Alex Malao Date: Tue, 4 Nov 2025 13:34:50 -0500 Subject: [PATCH 2/7] yapf --- sdks/python/apache_beam/transforms/sideinputs.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sdks/python/apache_beam/transforms/sideinputs.py b/sdks/python/apache_beam/transforms/sideinputs.py index dd6482e38c2f..a38e05d66cbe 100644 --- a/sdks/python/apache_beam/transforms/sideinputs.py +++ b/sdks/python/apache_beam/transforms/sideinputs.py @@ -60,8 +60,8 @@ def default_window_mapping_fn( def map_via_end(source_window: window.BoundedWindow) -> window.BoundedWindow: return list( target_window_fn.assign( - window.WindowFn.AssignContext(source_window.max_timestamp(), - window=source_window)))[-1] + window.WindowFn.AssignContext( + source_window.max_timestamp(), window=source_window)))[-1] return map_via_end From d6d570da5e4e6541b05b7376d13c7e7a298faeb1 Mon Sep 17 00:00:00 2001 From: Alex Malao Date: Tue, 18 Nov 2025 17:02:43 -0500 Subject: [PATCH 3/7] added default window mapping test --- .../apache_beam/transforms/sideinputs_test.py | 29 +++++++++++++++++++ 1 file changed, 29 insertions(+) diff --git a/sdks/python/apache_beam/transforms/sideinputs_test.py b/sdks/python/apache_beam/transforms/sideinputs_test.py index 5f3cf761e1eb..bc3aca1905d9 100644 --- a/sdks/python/apache_beam/transforms/sideinputs_test.py +++ b/sdks/python/apache_beam/transforms/sideinputs_test.py @@ -30,6 +30,7 @@ from typing import Union import pytest +from unittest import mock import apache_beam as beam from apache_beam.testing.synthetic_pipeline import SyntheticSDFAsSource @@ -41,6 +42,7 @@ from apache_beam.transforms import Map from apache_beam.transforms import trigger from apache_beam.transforms import window +from apache_beam.transforms import sideinputs from apache_beam.utils.timestamp import Timestamp @@ -489,6 +491,33 @@ def process( assert_that(results, equal_to([(num_records, expected_fingerprint)])) pipeline.run() + def test_default_window_mapping_fn_source_window(self): + """Test that the default window mapping function will propagate the + source window when attempting to assign context. + """ + + class StringIDWindow(window.BoundedWindow): + """A window defined by an arbitrary string ID.""" + + def __init__(self, window_id: str): + super().__init__(self._getTimestampFromProto()) + self.id = window_id + + class StringIDWindows(window.NonMergingWindowFn): + """ A windowing function that assigns each element a window with ID.""" + def assign( + self, assign_context: window.WindowFn.AssignContext + ) -> Iterable[StringIDWindow]: + if assign_context.element is None: + return [assign_context.window] + return [StringIDWindow(str(assign_context.element))] + + mapping_fn = sideinputs.default_window_mapping_fn(StringIDWindows()) + source_window = StringIDWindows().assign(window.WindowFn.AssignContext(Timestamp(10), element='element'))[0] + bounded_window = mapping_fn(source_window) + assert bounded_window is not None + assert bounded_window.id == 'element' + if __name__ == '__main__': logging.getLogger().setLevel(logging.DEBUG) From fedded5b9f77e6bd4cbf8af7c52e169928fa9fef Mon Sep 17 00:00:00 2001 From: Alex Malao Date: Wed, 19 Nov 2025 11:11:31 -0500 Subject: [PATCH 4/7] side input test --- sdks/python/apache_beam/transforms/sideinputs_test.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/sdks/python/apache_beam/transforms/sideinputs_test.py b/sdks/python/apache_beam/transforms/sideinputs_test.py index bc3aca1905d9..9517c0390c54 100644 --- a/sdks/python/apache_beam/transforms/sideinputs_test.py +++ b/sdks/python/apache_beam/transforms/sideinputs_test.py @@ -495,10 +495,8 @@ def test_default_window_mapping_fn_source_window(self): """Test that the default window mapping function will propagate the source window when attempting to assign context. """ - class StringIDWindow(window.BoundedWindow): """A window defined by an arbitrary string ID.""" - def __init__(self, window_id: str): super().__init__(self._getTimestampFromProto()) self.id = window_id @@ -506,14 +504,15 @@ def __init__(self, window_id: str): class StringIDWindows(window.NonMergingWindowFn): """ A windowing function that assigns each element a window with ID.""" def assign( - self, assign_context: window.WindowFn.AssignContext + self, assign_context: window.WindowFn.AssignContext ) -> Iterable[StringIDWindow]: if assign_context.element is None: return [assign_context.window] return [StringIDWindow(str(assign_context.element))] mapping_fn = sideinputs.default_window_mapping_fn(StringIDWindows()) - source_window = StringIDWindows().assign(window.WindowFn.AssignContext(Timestamp(10), element='element'))[0] + source_window = StringIDWindows().assign( + window.WindowFn.AssignContext(Timestamp(10), element='element'))[0] bounded_window = mapping_fn(source_window) assert bounded_window is not None assert bounded_window.id == 'element' From 8dabb36eba8a3cd7ca2d19a7f2f8a911ed37793a Mon Sep 17 00:00:00 2001 From: Alex Malao Date: Wed, 11 Feb 2026 13:56:05 -0500 Subject: [PATCH 5/7] added window and windowing fn required methods --- sdks/python/apache_beam/transforms/sideinputs_test.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/sdks/python/apache_beam/transforms/sideinputs_test.py b/sdks/python/apache_beam/transforms/sideinputs_test.py index 9517c0390c54..be73b0f40fda 100644 --- a/sdks/python/apache_beam/transforms/sideinputs_test.py +++ b/sdks/python/apache_beam/transforms/sideinputs_test.py @@ -501,6 +501,10 @@ def __init__(self, window_id: str): super().__init__(self._getTimestampFromProto()) self.id = window_id + @staticmethod + def _getTimestampFromProto() -> Timestamp: + return Timestamp(micros=0) + class StringIDWindows(window.NonMergingWindowFn): """ A windowing function that assigns each element a window with ID.""" def assign( @@ -510,6 +514,9 @@ def assign( return [assign_context.window] return [StringIDWindow(str(assign_context.element))] + def get_window_coder(self): + return None # temp value + mapping_fn = sideinputs.default_window_mapping_fn(StringIDWindows()) source_window = StringIDWindows().assign( window.WindowFn.AssignContext(Timestamp(10), element='element'))[0] From 450d8d31d2f2509b0e5c8e441dad6c91a783255f Mon Sep 17 00:00:00 2001 From: Alex Malao Date: Wed, 11 Feb 2026 13:56:08 -0500 Subject: [PATCH 6/7] added window and windowing fn required methods --- sdks/python/apache_beam/transforms/sideinputs_test.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/transforms/sideinputs_test.py b/sdks/python/apache_beam/transforms/sideinputs_test.py index be73b0f40fda..64d50382c19c 100644 --- a/sdks/python/apache_beam/transforms/sideinputs_test.py +++ b/sdks/python/apache_beam/transforms/sideinputs_test.py @@ -515,7 +515,7 @@ def assign( return [StringIDWindow(str(assign_context.element))] def get_window_coder(self): - return None # temp value + return None mapping_fn = sideinputs.default_window_mapping_fn(StringIDWindows()) source_window = StringIDWindows().assign( From e41566c317307c97cc6d6af5e3750e02332d6d33 Mon Sep 17 00:00:00 2001 From: Alex Malao Date: Tue, 24 Feb 2026 13:59:23 -0500 Subject: [PATCH 7/7] updated typehint for assign in sideinputs_test.py --- sdks/python/apache_beam/transforms/sideinputs_test.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/transforms/sideinputs_test.py b/sdks/python/apache_beam/transforms/sideinputs_test.py index 64d50382c19c..2794e2ac31bb 100644 --- a/sdks/python/apache_beam/transforms/sideinputs_test.py +++ b/sdks/python/apache_beam/transforms/sideinputs_test.py @@ -509,7 +509,7 @@ class StringIDWindows(window.NonMergingWindowFn): """ A windowing function that assigns each element a window with ID.""" def assign( self, assign_context: window.WindowFn.AssignContext - ) -> Iterable[StringIDWindow]: + ) -> Iterable[BoundedWindow | None]: if assign_context.element is None: return [assign_context.window] return [StringIDWindow(str(assign_context.element))]