@@ -109,8 +109,22 @@ def test_propagation(enable_extended_tracing):
109109 len (from_inject_spans ) >= 2
110110 ) # "Expecting at least 2 spans from the injected trace exporter"
111111 gotNames = [span .name for span in from_inject_spans ]
112+
113+ # Check if multiplexed sessions are enabled
114+ import os
115+ multiplexed_enabled = (
116+ os .getenv ("GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS" , "" ).lower () == "true"
117+ )
118+
119+ # Determine expected session span name based on multiplexed sessions
120+ expected_session_span_name = (
121+ "CloudSpanner.CreateMultiplexedSession"
122+ if multiplexed_enabled
123+ else "CloudSpanner.CreateSession"
124+ )
125+
112126 wantNames = [
113- "CloudSpanner.CreateSession" ,
127+ expected_session_span_name ,
114128 "CloudSpanner.Snapshot.execute_sql" ,
115129 ]
116130 assert gotNames == wantNames
@@ -392,6 +406,7 @@ def tx_update(txn):
392406 reason = "Tracing requires OpenTelemetry" ,
393407)
394408def test_database_partitioned_error ():
409+ import os
395410 from opentelemetry .trace .status import StatusCode
396411
397412 db , trace_exporter = create_db_trace_exporter ()
@@ -402,43 +417,101 @@ def test_database_partitioned_error():
402417 pass
403418
404419 got_statuses , got_events = finished_spans_statuses (trace_exporter )
405- # Check for the series of events
406- want_events = [
407- ("Acquiring session" , {"kind" : "BurstyPool" }),
408- ("Waiting for a session to become available" , {"kind" : "BurstyPool" }),
409- ("No sessions available in pool. Creating session" , {"kind" : "BurstyPool" }),
410- ("Creating Session" , {}),
411- ("Starting BeginTransaction" , {}),
412- (
420+
421+ # Check if multiplexed sessions are enabled for partitioned operations
422+ multiplexed_partitioned_enabled = (
423+ os .getenv ("GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS_PARTITIONED_OPS" ) == "true"
424+ )
425+
426+ # Define expected events based on whether multiplexed sessions are enabled
427+ if multiplexed_partitioned_enabled :
428+ # When multiplexed sessions are enabled for partitioned operations,
429+ # the execution path is different - sessions manager creates multiplexed sessions directly
430+ expected_event_names = [
431+ "Creating Session" ,
432+ "Using session" ,
433+ "Starting BeginTransaction" ,
434+ "Returning session" ,
413435 "exception" ,
414- {
415- "exception.type" : "google.api_core.exceptions.InvalidArgument" ,
416- "exception.message" : "400 Table not found: NonExistent [at 1:8]\n UPDATE NonExistent SET name = 'foo' WHERE id > 1\n ^" ,
417- "exception.stacktrace" : "EPHEMERAL" ,
418- "exception.escaped" : "False" ,
419- },
420- ),
421- (
422436 "exception" ,
423- {
424- "exception.type" : "google.api_core.exceptions.InvalidArgument" ,
425- "exception.message" : "400 Table not found: NonExistent [at 1:8]\n UPDATE NonExistent SET name = 'foo' WHERE id > 1\n ^" ,
426- "exception.stacktrace" : "EPHEMERAL" ,
427- "exception.escaped" : "False" ,
428- },
429- ),
430- ]
431- assert got_events == want_events
437+ ]
438+ # Check that we have the expected events
439+ assert len (got_events ) == len (expected_event_names )
440+ for i , expected_name in enumerate (expected_event_names ):
441+ assert got_events [i ][0 ] == expected_name
442+
443+ # Verify session usage event shows multiplexed session
444+ assert got_events [1 ][1 ]["multiplexed" ] is True
445+
446+ # Verify session return event shows multiplexed session
447+ assert got_events [3 ][1 ]["multiplexed" ] is True
448+
449+ # Verify the exception details
450+ for i in [4 , 5 ]: # Both exception events
451+ assert (
452+ got_events [i ][1 ]["exception.type" ]
453+ == "google.api_core.exceptions.InvalidArgument"
454+ )
455+ assert (
456+ "Table not found: NonExistent" in got_events [i ][1 ]["exception.message" ]
457+ )
458+ else :
459+ # When multiplexed sessions are disabled, sessions manager still manages sessions
460+ # but uses regular pool sessions instead of multiplexed sessions
461+ expected_event_names = [
462+ "Acquiring session" ,
463+ "Waiting for a session to become available" ,
464+ "No sessions available in pool. Creating session" ,
465+ "Creating Session" ,
466+ "Using session" ,
467+ "Starting BeginTransaction" ,
468+ "Returning session" ,
469+ "exception" ,
470+ "exception" ,
471+ ]
432472
433- # Check for the statues.
473+ # Check that we have the expected events
474+ assert len (got_events ) == len (expected_event_names )
475+ for i , expected_name in enumerate (expected_event_names ):
476+ assert got_events [i ][0 ] == expected_name
477+
478+ # Verify pool-related events
479+ assert got_events [0 ][1 ]["kind" ] == "BurstyPool"
480+ assert got_events [1 ][1 ]["kind" ] == "BurstyPool"
481+ assert got_events [2 ][1 ]["kind" ] == "BurstyPool"
482+
483+ # Verify session usage event shows non-multiplexed session
484+ assert got_events [4 ][1 ]["multiplexed" ] is False
485+
486+ # Verify session return event shows non-multiplexed session
487+ assert got_events [6 ][1 ]["multiplexed" ] is False
488+
489+ # Verify the exception details
490+ for i in [7 , 8 ]: # Both exception events
491+ assert (
492+ got_events [i ][1 ]["exception.type" ]
493+ == "google.api_core.exceptions.InvalidArgument"
494+ )
495+ assert (
496+ "Table not found: NonExistent" in got_events [i ][1 ]["exception.message" ]
497+ )
498+
499+ # Check for the statuses.
434500 codes = StatusCode
501+
502+ # Determine expected session creation span name based on multiplexed sessions
503+ expected_session_span_name = (
504+ "CloudSpanner.CreateMultiplexedSession"
505+ if multiplexed_partitioned_enabled
506+ else "CloudSpanner.CreateSession"
507+ )
435508 want_statuses = [
436509 (
437510 "CloudSpanner.Database.execute_partitioned_pdml" ,
438511 codes .ERROR ,
439512 "InvalidArgument: 400 Table not found: NonExistent [at 1:8]\n UPDATE NonExistent SET name = 'foo' WHERE id > 1\n ^" ,
440513 ),
441- ("CloudSpanner.CreateSession" , codes .OK , None ),
514+ (expected_session_span_name , codes .OK , None ),
442515 (
443516 "CloudSpanner.ExecuteStreamingSql" ,
444517 codes .ERROR ,
0 commit comments