@@ -133,51 +133,115 @@ def test_oauth_access_token(self):
133133from io import StringIO
134134import sys
135135
136- # Example static base32 secret
137- TOTP_SECRET = "O5D7DQICJTM34AZROWHSAO4O53ELRJN3"
138136
139137@pytest .fixture
140- def totp_code ():
141- return pyotp .TOTP (TOTP_SECRET ).now ()
142-
143- def test_totp_connection_string (totp_code ):
144- """Test: TOTP passed via connection info (like Go test WithTOTPInConnStr)"""
145- try :
146- conn_info = {
147- 'user' : "dbadmin" ,
148- 'host' : "localhost" ,
149- 'port' : 5433 ,
150- 'database' : "dbadmin" ,
151- 'totp' : totp_code ,
152- 'ssl' : False ,
153- }
154-
155- with vertica_python .connect (** conn_info ) as conn :
156- cur = conn .cursor ()
157- cur .execute ("SELECT version()" )
158- result = cur .fetchone ()
159- print ("[ConnStr] Connected to Vertica Version:" , result [0 ])
160- except Exception as e :
161- pytest .skip (f"Skipping test_totp_connection_string due to connection error: { e } " )
162-
163- def test_totp_from_stdin (monkeypatch , totp_code ):
164- """Test: Simulate user entering TOTP at runtime (like Go test WithTOTPFromStdin)"""
165- try :
166- monkeypatch .setattr ('sys.stdin' , StringIO (totp_code + "\n " ))
167-
168- conn_info = {
169- 'user' : "dbadmin" ,
170- 'host' : "localhost" ,
171- 'port' : 5433 ,
172- 'database' : "dbadmin" ,
173- # Note: TOTP not passed here, simulating prompt from driver
174- 'ssl' : False ,
175- }
176-
177- with vertica_python .connect (** conn_info ) as conn :
178- cur = conn .cursor ()
179- cur .execute ("SELECT version()" )
180- result = cur .fetchone ()
181- print ("[Stdin] Connected to Vertica Version:" , result [0 ])
182- except Exception as e :
183- pytest .skip (f"Skipping test_totp_from_stdin due to connection error: { e } " )
138+ # Positive TOTP Test (Like SHA512 format)
139+ def test_TOTP (self ):
140+ with self ._connect () as conn :
141+ cur = conn .cursor ()
142+
143+ cur .execute ("DROP USER IF EXISTS totp_user" )
144+ cur .execute ("DROP AUTHENTICATION IF EXISTS totp_auth CASCADE" )
145+
146+ try :
147+ # Create user with MFA
148+ cur .execute ("CREATE USER totp_user IDENTIFIED BY 'password' ENFORCEMFA" )
149+
150+ # Grant authentication
151+ # Note: METHOD is 'trusted' or 'password' depending on how MFA is enforced in Vertica
152+ cur .execute ("CREATE AUTHENTICATION totp_auth METHOD 'password' HOST '0.0.0.0/0'" )
153+ cur .execute ("GRANT AUTHENTICATION totp_auth TO totp_user" )
154+
155+ # Generate TOTP
156+ import pyotp
157+ TOTP_SECRET = "O5D7DQICJTM34AZROWHSAO4O53ELRJN3"
158+ totp_code = pyotp .TOTP (TOTP_SECRET ).now ()
159+
160+ # Set connection info
161+ self ._conn_info ['user' ] = 'totp_user'
162+ self ._conn_info ['password' ] = 'password'
163+ self ._conn_info ['totp' ] = totp_code
164+
165+ # Try connection
166+ with self ._connect () as totp_conn :
167+ c = totp_conn .cursor ()
168+ c .execute ("SELECT 1" )
169+ res = c .fetchone ()
170+ self .assertEqual (res [0 ], 1 )
171+
172+ finally :
173+ cur .execute ("DROP USER IF EXISTS totp_user" )
174+ cur .execute ("DROP AUTHENTICATION IF EXISTS totp_auth CASCADE" )
175+
176+ # Negative Test: Missing TOTP
177+ def test_TOTP_missing_code (self ):
178+ with self ._connect () as conn :
179+ cur = conn .cursor ()
180+
181+ cur .execute ("DROP USER IF EXISTS totp_user" )
182+ cur .execute ("DROP AUTHENTICATION IF EXISTS totp_auth CASCADE" )
183+
184+ try :
185+ cur .execute ("CREATE USER totp_user IDENTIFIED BY 'password' ENFORCEMFA" )
186+ cur .execute ("CREATE AUTHENTICATION totp_auth METHOD 'password' HOST '0.0.0.0/0'" )
187+ cur .execute ("GRANT AUTHENTICATION totp_auth TO totp_user" )
188+
189+ self ._conn_info ['user' ] = 'totp_user'
190+ self ._conn_info ['password' ] = 'password'
191+ self ._conn_info .pop ('totp' , None ) # No TOTP
192+
193+ err_msg = "TOTP was requested but not provided"
194+ self .assertConnectionFail (err_msg = err_msg )
195+
196+ finally :
197+ cur .execute ("DROP USER IF EXISTS totp_user" )
198+ cur .execute ("DROP AUTHENTICATION IF EXISTS totp_auth CASCADE" )
199+
200+ # Negative Test: Invalid TOTP Format
201+ def test_TOTP_invalid_format (self ):
202+ with self ._connect () as conn :
203+ cur = conn .cursor ()
204+
205+ cur .execute ("DROP USER IF EXISTS totp_user" )
206+ cur .execute ("DROP AUTHENTICATION IF EXISTS totp_auth CASCADE" )
207+
208+ try :
209+ cur .execute ("CREATE USER totp_user IDENTIFIED BY 'password' ENFORCEMFA" )
210+ cur .execute ("CREATE AUTHENTICATION totp_auth METHOD 'password' HOST '0.0.0.0/0'" )
211+ cur .execute ("GRANT AUTHENTICATION totp_auth TO totp_user" )
212+
213+ self ._conn_info ['user' ] = 'totp_user'
214+ self ._conn_info ['password' ] = 'password'
215+ self ._conn_info ['totp' ] = "123" # Invalid
216+
217+ err_msg = "Invalid TOTP format"
218+ self .assertConnectionFail (err_msg = err_msg )
219+
220+ finally :
221+ cur .execute ("DROP USER IF EXISTS totp_user" )
222+ cur .execute ("DROP AUTHENTICATION IF EXISTS totp_auth CASCADE" )
223+
224+ # Negative Test: Wrong TOTP (Valid format, wrong value)
225+ def test_TOTP_wrong_code (self ):
226+ with self ._connect () as conn :
227+ cur = conn .cursor ()
228+
229+ cur .execute ("DROP USER IF EXISTS totp_user" )
230+ cur .execute ("DROP AUTHENTICATION IF EXISTS totp_auth CASCADE" )
231+
232+ try :
233+ cur .execute ("CREATE USER totp_user IDENTIFIED BY 'password' ENFORCEMFA" )
234+ cur .execute ("CREATE AUTHENTICATION totp_auth METHOD 'password' HOST '0.0.0.0/0'" )
235+ cur .execute ("GRANT AUTHENTICATION totp_auth TO totp_user" )
236+
237+ self ._conn_info ['user' ] = 'totp_user'
238+ self ._conn_info ['password' ] = 'password'
239+ self ._conn_info ['totp' ] = "999999" # Wrong OTP
240+
241+ err_msg = "Invalid TOTP"
242+ self .assertConnectionFail (err_msg = err_msg )
243+
244+ finally :
245+ cur .execute ("DROP USER IF EXISTS totp_user" )
246+ cur .execute ("DROP AUTHENTICATION IF EXISTS totp_auth CASCADE" )
247+
0 commit comments