@@ -298,56 +298,56 @@ def test_sslcontext_verify_full(self):
298298 res = self ._query_and_fetchone (self .SSL_STATE_SQL )
299299 self .assertEqual (res [0 ], 'Server' )
300300
301- def tls13_support_auto_negotiation (self ):
302- """
303- Verify that the client supports TLS 1.3 negotiation.
304- If the server supports TLS 1.3, the connection should establish using it.
305- If the server supports only TLS 1.2, the connection should still succeed.
306- """
307-
308- # Set up server certificates and enable TLS
309- CA_cert = self . _generate_and_set_certificates ( )
310-
311- # Create SSL context allowing both TLS 1.2 and 1.3
312- ssl_context = ssl . SSLContext ( ssl . PROTOCOL_TLS_CLIENT )
313- ssl_context . verify_mode = ssl . CERT_REQUIRED
314- ssl_context . check_hostname = True
315- ssl_context . load_verify_locations ( cadata = CA_cert )
316-
317- # Assign SSL context to connection info
318- self . _conn_info [ 'ssl' ] = ssl_context
319-
320- with self . _connect () as conn :
321- cur = conn . cursor ()
322- res = self . _query_and_fetchone ( self . SSL_STATE_SQL )
323- self .assertEqual ( res [ 0 ], 'Server' )
324-
325- # Try to get the negotiated TLS version from the socket
326- tls_version = None
327- try :
328- if hasattr ( conn . _socket , "_sslobj" ):
329- tls_version = conn . _socket . _sslobj . version ()
330- elif hasattr ( conn . _socket , "version" ):
331- tls_version = conn . _socket . version ()
332- except Exception :
333- pass
334-
335- # Log version for debug (optional)
336- print ( f"Negotiated TLS version: { tls_version } " )
337-
338- # Ensure TLS negotiation was successful
339- self .assertIsNotNone ( tls_version , "Could not determine negotiated TLS version" )
340-
341- # Accept both 1.2 and 1.3, but prefer 1.3 if available
342- self . assertIn (
343- tls_version , ( "TLSv1.2" , "TLSv1.3" ),
344- msg = f"Unexpected TLS version negotiated: { tls_version } "
345- )
346-
347- if tls_version == "TLSv1.3" :
348- print ( "TLS 1.3 is successfully negotiated and supported." )
349- else :
350- print ( "Fell back to TLS 1.2 (TLS 1.3 not supported by server)." )
301+ def _get_tls_version (self , conn ):
302+ sock = getattr ( conn , '_socket' , None )
303+ if not sock :
304+ return None
305+
306+ if hasattr ( sock , 'version' ) and callable ( sock . version ):
307+ return sock . version ()
308+
309+ ssl_obj = getattr ( sock , '_sslobj' , None )
310+ if ssl_obj and hasattr ( ssl_obj , 'version' ):
311+ return ssl_obj . version ()
312+
313+ return None
314+
315+ def test_tls13_support_auto_negotiation ( self ):
316+ """
317+ Verify that the client supports TLS 1.3 negotiation.
318+ If the server supports TLS 1.3, the connection should establish using it.
319+ If the server supports only TLS 1.2, the connection should still succeed.
320+ """
321+
322+ # Set up server certificates and enable TLS
323+ CA_cert = self ._generate_and_set_certificates ( )
324+
325+ ssl_context = ssl . SSLContext ( ssl . PROTOCOL_TLS_CLIENT )
326+ ssl_context . verify_mode = ssl . CERT_REQUIRED
327+ ssl_context . check_hostname = True
328+ ssl_context . load_verify_locations ( cadata = CA_cert )
329+
330+ self . _conn_info [ 'ssl' ] = ssl_context
331+
332+ with self . _connect () as conn :
333+ # First ensure TLS really got enabled on server
334+ res = self . _query_and_fetchone ( self . SSL_STATE_SQL )
335+ if res [ 0 ] != 'Server' :
336+ self . skipTest ( " TLS is not configured on server " )
337+
338+ # Prefer public API, fall back only if needed
339+ tls_version = self ._get_tls_version ( conn )
340+
341+ self . assertIsNotNone (
342+ tls_version ,
343+ "Could not determine negotiated TLS version"
344+ )
345+
346+ self . assertIn (
347+ tls_version ,
348+ ( "TLSv1.2" , "TLSv1.3" ),
349+ msg = f"Unexpected TLS version negotiated: { tls_version } "
350+ )
351351
352352 def test_sslcontext_mutual_TLS (self ):
353353 # Setting certificates with TLS configuration
0 commit comments