From 4c0f9b0974122b1607e14b96df3d1cd8a863d019 Mon Sep 17 00:00:00 2001 From: Claude Date: Sun, 23 Nov 2025 22:26:47 +0000 Subject: [PATCH] feat: Add timezone parameter to get_hist() - Issue #72 Addresses GitHub issue #72 by allowing users to specify the timezone for the datetime index in the returned DataFrame. Changes: - Add `timezone` parameter to get_hist() method - Add _get_timezone_object() helper function for timezone resolution - Modify __create_df() to convert timestamps to specified timezone - Support both zoneinfo (Python 3.9+) and pytz as fallback - Add TV_TIMEZONE environment variable support - Maintain backward compatibility (no timezone = local system time) Timezone priority: parameter > TV_TIMEZONE env var > local system Examples: - timezone='UTC' returns UTC timestamps - timezone='America/New_York' returns EST/EDT timestamps - No timezone = local system time (backward compatible) Tests: Added 15 new tests in TestTimezoneFeature class - Timezone object creation tests - Conversion accuracy tests (UTC, EST) - Environment variable priority tests - Invalid timezone error tests Documentation updated in README.md and CLAUDE.md --- CLAUDE.md | 19 + README.md | 34 +- tests/unit/test_main.py | 2850 +++++++++++++++++++++------------------ tvDatafeed/main.py | 158 ++- 4 files changed, 1737 insertions(+), 1324 deletions(-) diff --git a/CLAUDE.md b/CLAUDE.md index d7af588..7025904 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -669,6 +669,25 @@ Résultats avec JWT token (Pro Premium) : - 📝 Clés reCAPTCHA TradingView identifiées - 📝 Différence documentée entre `sessionid` (cookie) et `auth_token` (JWT) +### Version 1.7 (2025-11-23) +- ✅ **Issue #72 - Timezone Support** : + - Nouveau paramètre `timezone` dans `get_hist()` pour spécifier le fuseau horaire des données + - Supporte tous les timezones IANA (UTC, America/New_York, Europe/Paris, Asia/Tokyo, etc.) + - Variable d'environnement `TV_TIMEZONE` pour définir le timezone par défaut + - Priorité : paramètre > variable d'environnement > timezone local du système + - Rétrocompatibilité totale : sans paramètre timezone, comportement identique à avant +- ✅ **Implémentation technique** : + - Fonction `_get_timezone_object()` pour résoudre les noms de timezone + - Support `zoneinfo` (Python 3.9+) avec fallback vers `pytz` + - Modification de `__create_df()` pour la conversion des timestamps + - Stockage du timezone dans `df.attrs['timezone']` +- ✅ **Tests ajoutés** : + - 15 nouveaux tests unitaires dans `TestTimezoneFeature` + - Tests de conversion UTC et EST + - Tests de validation des timezones courants + - Tests de priorité paramètre/env var +- 📝 Documentation README.md mise à jour avec exemples d'utilisation + ### Version 1.6 (2025-11-23) - ✅ **Scripts d'automatisation token** créés : - `scripts/get_auth_token.py` : Extraction JWT via Playwright + stealth mode diff --git a/README.md b/README.md index dd5e6cc..d1934bb 100644 --- a/README.md +++ b/README.md @@ -240,10 +240,42 @@ tv.get_hist( fut_contract: int = None, # Futures contract (1=front, 2=next) extended_session: bool = False, # Include extended hours start_date: datetime = None, # Start date (use with end_date) - NEW in v1.4 - end_date: datetime = None # End date (use with start_date) - NEW in v1.4 + end_date: datetime = None, # End date (use with start_date) - NEW in v1.4 + timezone: str = None # Timezone for datetime index - NEW in v1.5 ) -> pd.DataFrame ``` +#### Timezone Support (NEW in v1.5) + +By default, timestamps are returned in your local system timezone. You can specify a timezone to get consistent datetime values: + +```python +# Get data in UTC (recommended for cross-instrument analysis) +df = tv.get_hist('BTCUSDT', 'BINANCE', Interval.in_1_hour, n_bars=100, timezone='UTC') + +# Get data in US Eastern time +df = tv.get_hist('AAPL', 'NASDAQ', Interval.in_daily, n_bars=50, timezone='America/New_York') + +# Common timezones: +# - 'UTC': Coordinated Universal Time +# - 'America/New_York': US Eastern (EST/EDT) +# - 'America/Chicago': US Central (CST/CDT) +# - 'Europe/London': UK (GMT/BST) +# - 'Europe/Paris': Central European (CET/CEST) +# - 'Asia/Tokyo': Japan Standard Time +# - 'Asia/Hong_Kong': Hong Kong Time +``` + +You can also set the default timezone via environment variable: +```bash +export TV_TIMEZONE=UTC +``` + +The timezone is stored in DataFrame metadata: +```python +print(df.attrs.get('timezone')) # Output: 'UTC' +``` + ### Live Data Feed For real-time data monitoring with callbacks: diff --git a/tests/unit/test_main.py b/tests/unit/test_main.py index e38a8bc..a00cb09 100644 --- a/tests/unit/test_main.py +++ b/tests/unit/test_main.py @@ -1,1308 +1,1542 @@ -""" -Unit tests for TvDatafeed main class -""" -import pytest -from unittest.mock import Mock, patch, MagicMock -from tvDatafeed import TvDatafeed, Interval, CaptchaRequiredError, AuthenticationError -import pandas as pd - - -@pytest.mark.unit -class TestTvDatafeed: - """Test TvDatafeed class""" - - def test_tvdatafeed_creation_no_auth(self): - """Test creating TvDatafeed without authentication""" - with patch('tvDatafeed.main.requests.post') as mock_post: - tv = TvDatafeed() - - assert tv.token == "unauthorized_user_token" - # Should not call auth endpoint when no credentials - mock_post.assert_not_called() - - def test_tvdatafeed_creation_with_auth(self, mock_auth_response): - """Test creating TvDatafeed with authentication""" - with patch('tvDatafeed.main.requests.Session') as mock_session_class: - mock_session = MagicMock() - mock_session.post.return_value = mock_auth_response - mock_session_class.return_value = mock_session - - tv = TvDatafeed(username='testuser', password='testpass') - - assert tv.token == 'test_token_12345' - - def test_auth_failure(self): - """Test authentication failure""" - mock_response = Mock() - mock_response.status_code = 200 - mock_response.json.return_value = { - 'error': 'Invalid credentials', - 'code': 'invalid_credentials' - } - - with patch('tvDatafeed.main.requests.Session') as mock_session_class: - mock_session = MagicMock() - mock_session.post.return_value = mock_response - mock_session_class.return_value = mock_session - - with pytest.raises(AuthenticationError): - TvDatafeed(username='testuser', password='wrongpass') - - def test_session_generation(self): - """Test session ID generation""" - tv = TvDatafeed() - - assert tv.session.startswith('qs_') - assert len(tv.session) == 15 # 'qs_' + 12 chars - - def test_chart_session_generation(self): - """Test chart session ID generation""" - tv = TvDatafeed() - - assert tv.chart_session.startswith('cs_') - assert len(tv.chart_session) == 15 # 'cs_' + 12 chars - - def test_format_symbol_simple(self): - """Test symbol formatting without contract""" - tv = TvDatafeed() - - formatted = tv._TvDatafeed__format_symbol('BTCUSDT', 'BINANCE') - - assert formatted == 'BINANCE:BTCUSDT' - - def test_format_symbol_with_contract(self): - """Test symbol formatting with futures contract""" - tv = TvDatafeed() - - formatted = tv._TvDatafeed__format_symbol('NIFTY', 'NSE', contract=1) - - assert formatted == 'NSE:NIFTY1!' - - def test_format_symbol_already_formatted(self): - """Test symbol that's already formatted""" - tv = TvDatafeed() - - formatted = tv._TvDatafeed__format_symbol('BINANCE:BTCUSDT', 'BINANCE') - - assert formatted == 'BINANCE:BTCUSDT' - - def test_format_symbol_already_formatted_different_exchange(self): - """Test symbol that's already formatted with different exchange parameter""" - tv = TvDatafeed() - - # Symbol has BINANCE but we pass NYSE - should use symbol's exchange - formatted = tv._TvDatafeed__format_symbol('BINANCE:BTCUSDT', 'NYSE') - - assert formatted == 'BINANCE:BTCUSDT' # Keeps BINANCE from symbol - - def test_format_symbol_invalid_contract(self): - """Test symbol formatting with invalid contract type""" - tv = TvDatafeed() - - with pytest.raises(ValueError, match="not a valid contract"): - tv._TvDatafeed__format_symbol('NIFTY', 'NSE', contract='invalid') - - @patch('tvDatafeed.main.create_connection') - def test_create_connection(self, mock_create_connection): - """Test WebSocket connection creation""" - mock_ws = Mock() - mock_create_connection.return_value = mock_ws - - tv = TvDatafeed() - tv._TvDatafeed__create_connection() - - assert tv.ws == mock_ws - mock_create_connection.assert_called_once() - - @patch('tvDatafeed.main.create_connection') - def test_search_symbol(self, mock_create_connection, sample_symbol_search_response): - """Test symbol search""" - with patch('tvDatafeed.main.requests.get') as mock_get: - mock_response = Mock() - mock_response.text = str(sample_symbol_search_response) - mock_get.return_value = mock_response - - tv = TvDatafeed() - results = tv.search_symbol('BTC', 'BINANCE') - - assert isinstance(results, list) - mock_get.assert_called_once() - - @patch('tvDatafeed.main.create_connection') - def test_search_symbol_error(self, mock_create_connection): - """Test symbol search with error""" - with patch('tvDatafeed.main.requests.get') as mock_get: - mock_get.side_effect = Exception("Network error") - - tv = TvDatafeed() - results = tv.search_symbol('BTC', 'BINANCE') - - assert results == [] - - def test_create_df_valid_data(self): - """Test DataFrame creation from valid WebSocket data""" - tv = TvDatafeed() - - # Use a single-line format that matches what __create_df expects - # The regex expects: "s":[{"i":0,"v":[timestamp,o,h,l,c,v]}] - raw_data = '{"s":[{"i":0,"v":[1609459200,29000.0,29500.0,28500.0,29200.0,15000000.0]}]}' - - df = tv._TvDatafeed__create_df(raw_data, 'BTCUSDT') - - assert df is not None - assert isinstance(df, pd.DataFrame) - assert 'symbol' in df.columns - assert df['symbol'].iloc[0] == 'BTCUSDT' - assert 'open' in df.columns - assert 'high' in df.columns - assert 'low' in df.columns - assert 'close' in df.columns - assert 'volume' in df.columns - - def test_create_df_invalid_data(self): - """Test DataFrame creation from invalid data""" - tv = TvDatafeed() - - df = tv._TvDatafeed__create_df('invalid data', 'BTCUSDT') - - assert df is None - - def test_create_df_no_volume(self): - """Test DataFrame creation when volume data is missing""" - # Create data without volume (5 values instead of 6) - # Format: {"s":[{"i":0,"v":[timestamp,o,h,l,c]}]} - raw_data = '{"s":[{"i":0,"v":[1609459200,29000,29500,28500,29200]}]}' - - tv = TvDatafeed() - df = tv._TvDatafeed__create_df(raw_data, 'BTCUSDT') - - # Should still create df with volume=0.0 - assert df is not None - assert 'volume' in df.columns - assert df['volume'].iloc[0] == 0.0 - - def test_auth_captcha_required(self): - """Test authentication when CAPTCHA is required""" - mock_response = Mock() - mock_response.status_code = 200 - mock_response.json.return_value = { - 'error': 'Please confirm that you are not a robot by clicking the captcha box', - 'code': 'recaptcha_required' - } - - with patch('tvDatafeed.main.requests.Session') as mock_session_class: - mock_session = MagicMock() - mock_session.post.return_value = mock_response - mock_session_class.return_value = mock_session - - with pytest.raises(CaptchaRequiredError) as exc_info: - TvDatafeed(username='testuser', password='testpass') - - # Check that the exception contains the username - assert exc_info.value.username == 'testuser' - # Check that the error message mentions workaround - assert 'CAPTCHA' in str(exc_info.value) - assert 'authToken' in str(exc_info.value) - - def test_auth_token_direct_usage(self): - """Test using pre-obtained auth token directly""" - tv = TvDatafeed(auth_token='test_token_direct') - - assert tv.token == 'test_token_direct' - - def test_auth_token_priority_over_credentials(self): - """Test that auth_token takes priority over username/password""" - mock_response = Mock() - mock_response.status_code = 200 - mock_response.json.return_value = { - 'user': {'auth_token': 'should_not_be_used'} - } - - with patch('tvDatafeed.main.requests.post', return_value=mock_response) as mock_post: - tv = TvDatafeed( - username='testuser', - password='testpass', - auth_token='token_from_browser' - ) - - # Should use the provided token - assert tv.token == 'token_from_browser' - # Should not call auth endpoint - mock_post.assert_not_called() - - def test_auth_generic_error(self): - """Test authentication with generic error""" - mock_response = Mock() - mock_response.status_code = 200 - mock_response.json.return_value = { - 'error': 'Invalid credentials', - 'code': 'invalid_credentials' - } - - with patch('tvDatafeed.main.requests.Session') as mock_session_class: - mock_session = MagicMock() - mock_session.post.return_value = mock_response - mock_session_class.return_value = mock_session - - with pytest.raises(AuthenticationError) as exc_info: - TvDatafeed(username='testuser', password='wrongpass') - - # Should raise AuthenticationError, not CaptchaRequiredError - assert not isinstance(exc_info.value, CaptchaRequiredError) - assert 'Invalid credentials' in str(exc_info.value) - - def test_ws_timeout_default(self): - """Test that default WebSocket timeout is used""" - tv = TvDatafeed() - - # Default timeout is either 5s (hardcoded) or 30s (from NetworkConfig) - # depending on whether NetworkConfig is available - assert tv.ws_timeout in [5.0, 30.0] - # If NetworkConfig is available, it should use its recv_timeout (30.0) - # Otherwise, it falls back to __ws_timeout (5.0) - - def test_ws_timeout_custom_parameter(self): - """Test custom WebSocket timeout via parameter""" - tv = TvDatafeed(ws_timeout=30.0) - - assert tv.ws_timeout == 30.0 - - def test_ws_timeout_no_timeout(self): - """Test WebSocket with no timeout (-1)""" - tv = TvDatafeed(ws_timeout=-1) - - assert tv.ws_timeout == -1.0 - - def test_ws_timeout_from_env_variable(self): - """Test WebSocket timeout from environment variable""" - import os - - # Set environment variable - os.environ['TV_WS_TIMEOUT'] = '45.0' - - try: - tv = TvDatafeed() - assert tv.ws_timeout == 45.0 - finally: - # Clean up - os.environ.pop('TV_WS_TIMEOUT', None) - - def test_ws_timeout_parameter_overrides_env(self): - """Test that parameter overrides environment variable""" - import os - - os.environ['TV_WS_TIMEOUT'] = '45.0' - - try: - tv = TvDatafeed(ws_timeout=20.0) - # Parameter should take priority - assert tv.ws_timeout == 20.0 - finally: - os.environ.pop('TV_WS_TIMEOUT', None) - - def test_ws_timeout_invalid_env_variable(self): - """Test handling of invalid environment variable""" - import os - - os.environ['TV_WS_TIMEOUT'] = 'invalid' - - try: - tv = TvDatafeed() - # Should fall back to default - assert tv.ws_timeout == 5.0 - finally: - os.environ.pop('TV_WS_TIMEOUT', None) - - def test_format_search_results_empty(self): - """Test formatting empty search results""" - tv = TvDatafeed() - - formatted = tv.format_search_results([]) - - assert 'No results found' in formatted - - def test_format_search_results_single(self): - """Test formatting single search result""" - tv = TvDatafeed() - - results = [ - { - 'exchange': 'BINANCE', - 'symbol': 'BTCUSDT', - 'description': 'Bitcoin / TetherUS', - 'type': 'crypto' - } - ] - - formatted = tv.format_search_results(results) - - assert 'BINANCE:BTCUSDT' in formatted - assert 'Bitcoin / TetherUS' in formatted - assert 'crypto' in formatted - assert 'Found 1 results' in formatted - - def test_format_search_results_multiple(self): - """Test formatting multiple search results""" - tv = TvDatafeed() - - results = [ - { - 'exchange': 'BINANCE', - 'symbol': 'BTCUSDT', - 'description': 'Bitcoin / TetherUS', - 'type': 'crypto' - }, - { - 'exchange': 'COINBASE', - 'symbol': 'BTCUSD', - 'description': 'Bitcoin / US Dollar', - 'type': 'crypto' - }, - { - 'exchange': 'KRAKEN', - 'symbol': 'BTCEUR', - 'description': 'Bitcoin / Euro', - 'type': 'crypto' - } - ] - - formatted = tv.format_search_results(results, max_results=2) - - # Should only show first 2 - assert 'BINANCE:BTCUSDT' in formatted - assert 'COINBASE:BTCUSD' in formatted - # Third should not be included - assert 'KRAKEN:BTCEUR' not in formatted - assert 'showing first 2' in formatted - - def test_format_search_results_includes_usage(self): - """Test that formatted results include usage examples""" - tv = TvDatafeed() - - results = [ - { - 'exchange': 'BINANCE', - 'symbol': 'BTCUSDT', - 'description': 'Bitcoin / TetherUS', - 'type': 'crypto' - } - ] - - formatted = tv.format_search_results(results) - - assert 'Usage:' in formatted - assert 'tv.get_hist' in formatted - assert 'Example:' in formatted - - def test_verbose_mode_default(self): - """Test that verbose mode is enabled by default""" - tv = TvDatafeed() - - assert tv.verbose is True - - def test_verbose_mode_disabled(self): - """Test that verbose mode can be disabled""" - tv = TvDatafeed(verbose=False) - - assert tv.verbose is False - - def test_verbose_mode_enabled_explicit(self): - """Test that verbose mode can be explicitly enabled""" - tv = TvDatafeed(verbose=True) - - assert tv.verbose is True - - def test_verbose_mode_from_env_true(self): - """Test verbose mode from environment variable (true)""" - import os - - # Test various true values - true_values = ['true', 'True', 'TRUE', '1', 'yes', 'Yes', 'on', 'On'] - - for value in true_values: - os.environ['TV_VERBOSE'] = value - - try: - tv = TvDatafeed() - assert tv.verbose is True, f"Failed for TV_VERBOSE={value}" - finally: - os.environ.pop('TV_VERBOSE', None) - - def test_verbose_mode_from_env_false(self): - """Test verbose mode from environment variable (false)""" - import os - - # Test various false values - false_values = ['false', 'False', 'FALSE', '0', 'no', 'No', 'off', 'Off'] - - for value in false_values: - os.environ['TV_VERBOSE'] = value - - try: - tv = TvDatafeed() - assert tv.verbose is False, f"Failed for TV_VERBOSE={value}" - finally: - os.environ.pop('TV_VERBOSE', None) - - def test_verbose_mode_parameter_overrides_env_true_to_false(self): - """Test that parameter overrides environment variable (env=true, param=false)""" - import os - - os.environ['TV_VERBOSE'] = 'true' - - try: - tv = TvDatafeed(verbose=False) - # Parameter should take priority - assert tv.verbose is False - finally: - os.environ.pop('TV_VERBOSE', None) - - def test_verbose_mode_parameter_overrides_env_false_to_true(self): - """Test that parameter overrides environment variable (env=false, param=true)""" - import os - - os.environ['TV_VERBOSE'] = 'false' - - try: - tv = TvDatafeed(verbose=True) - # Parameter should take priority - assert tv.verbose is True - finally: - os.environ.pop('TV_VERBOSE', None) - - def test_verbose_mode_logging_level_verbose(self): - """Test that verbose mode sets appropriate logging level""" - import logging - from tvDatafeed.main import logger - - # Save original level - original_level = logger.level - - try: - tv = TvDatafeed(verbose=True) - - # Logger should be set to INFO or lower - assert logger.level <= logging.INFO - finally: - # Restore original level - logger.setLevel(original_level) - - def test_verbose_mode_logging_level_quiet(self): - """Test that quiet mode sets appropriate logging level""" - import logging - from tvDatafeed.main import logger - - # Save original level - original_level = logger.level - - try: - tv = TvDatafeed(verbose=False) - - # Logger should be set to WARNING - assert logger.level == logging.WARNING - finally: - # Restore original level - logger.setLevel(original_level) - - def test_verbose_mode_env_not_set(self): - """Test behavior when TV_VERBOSE is not set""" - import os - - # Ensure TV_VERBOSE is not set - os.environ.pop('TV_VERBOSE', None) - - tv = TvDatafeed() - - # Should default to True - assert tv.verbose is True - - def test_verbose_mode_env_invalid_value(self): - """Test behavior when TV_VERBOSE has invalid value""" - import os - - os.environ['TV_VERBOSE'] = 'invalid_value' - - try: - tv = TvDatafeed() - - # Invalid value should be treated as False - # (not in ['true', '1', 'yes', 'on']) - assert tv.verbose is False - finally: - os.environ.pop('TV_VERBOSE', None) - - -@pytest.mark.unit -class TestInterval: - """Test Interval enum""" - - def test_interval_values(self): - """Test that intervals have correct values""" - assert Interval.in_1_minute.value == "1" - assert Interval.in_3_minute.value == "3" - assert Interval.in_5_minute.value == "5" - assert Interval.in_15_minute.value == "15" - assert Interval.in_30_minute.value == "30" - assert Interval.in_45_minute.value == "45" - assert Interval.in_1_hour.value == "1H" - assert Interval.in_2_hour.value == "2H" - assert Interval.in_3_hour.value == "3H" - assert Interval.in_4_hour.value == "4H" - assert Interval.in_daily.value == "1D" - assert Interval.in_weekly.value == "1W" - assert Interval.in_monthly.value == "1M" - - def test_interval_names(self): - """Test interval names""" - assert Interval.in_1_hour.name == "in_1_hour" - assert Interval.in_daily.name == "in_daily" - - -@pytest.mark.unit -class TestDateRangeFeature: - """Test date range search feature (PR #69)""" - - def test_is_valid_date_range_valid(self): - """Test is_valid_date_range with valid date range""" - from datetime import datetime - start = datetime(2024, 1, 1) - end = datetime(2024, 1, 31) - - result = TvDatafeed.is_valid_date_range(start, end) - - assert result is True - - def test_is_valid_date_range_start_after_end(self): - """Test is_valid_date_range when start is after end""" - from datetime import datetime - start = datetime(2024, 1, 31) - end = datetime(2024, 1, 1) - - result = TvDatafeed.is_valid_date_range(start, end) - - assert result is False - - def test_is_valid_date_range_start_equals_end(self): - """Test is_valid_date_range when start equals end""" - from datetime import datetime - start = datetime(2024, 1, 15) - end = datetime(2024, 1, 15) - - result = TvDatafeed.is_valid_date_range(start, end) - - assert result is False - - def test_is_valid_date_range_future_start(self): - """Test is_valid_date_range with future start date""" - from datetime import datetime, timedelta - future = datetime.now() + timedelta(days=30) - end = datetime.now() + timedelta(days=60) - - result = TvDatafeed.is_valid_date_range(future, end) - - assert result is False - - def test_is_valid_date_range_future_end(self): - """Test is_valid_date_range with future end date""" - from datetime import datetime, timedelta - start = datetime(2024, 1, 1) - end = datetime.now() + timedelta(days=30) - - result = TvDatafeed.is_valid_date_range(start, end) - - assert result is False - - def test_is_valid_date_range_before_2000(self): - """Test is_valid_date_range with dates before 2000""" - from datetime import datetime - start = datetime(1999, 1, 1) - end = datetime(1999, 12, 31) - - result = TvDatafeed.is_valid_date_range(start, end) - - assert result is False - - def test_is_valid_date_range_start_before_2000(self): - """Test is_valid_date_range with start before 2000""" - from datetime import datetime - start = datetime(1999, 12, 31) - end = datetime(2024, 1, 1) - - result = TvDatafeed.is_valid_date_range(start, end) - - assert result is False - - def test_get_hist_mutually_exclusive_error(self): - """Test get_hist raises error when both n_bars and date range provided""" - from datetime import datetime - from tvDatafeed import DataValidationError - - tv = TvDatafeed() - - with pytest.raises(DataValidationError) as exc_info: - tv.get_hist( - 'BTCUSDT', - 'BINANCE', - Interval.in_1_hour, - n_bars=100, - start_date=datetime(2024, 1, 1), - end_date=datetime(2024, 1, 31) - ) - - assert 'mutually exclusive' in str(exc_info.value).lower() - - def test_get_hist_only_start_date_error(self): - """Test get_hist raises error when only start_date provided""" - from datetime import datetime - from tvDatafeed import DataValidationError - - tv = TvDatafeed() - - with pytest.raises(DataValidationError) as exc_info: - tv.get_hist( - 'BTCUSDT', - 'BINANCE', - Interval.in_1_hour, - start_date=datetime(2024, 1, 1) - ) - - assert 'both' in str(exc_info.value).lower() - - def test_get_hist_only_end_date_error(self): - """Test get_hist raises error when only end_date provided""" - from datetime import datetime - from tvDatafeed import DataValidationError - - tv = TvDatafeed() - - with pytest.raises(DataValidationError) as exc_info: - tv.get_hist( - 'BTCUSDT', - 'BINANCE', - Interval.in_1_hour, - end_date=datetime(2024, 1, 31) - ) - - assert 'both' in str(exc_info.value).lower() - - def test_get_hist_invalid_date_range_error(self): - """Test get_hist raises error for invalid date range""" - from datetime import datetime - from tvDatafeed import DataValidationError - - tv = TvDatafeed() - - # Start after end - with pytest.raises(DataValidationError) as exc_info: - tv.get_hist( - 'BTCUSDT', - 'BINANCE', - Interval.in_1_hour, - start_date=datetime(2024, 1, 31), - end_date=datetime(2024, 1, 1) - ) - - assert 'invalid date range' in str(exc_info.value).lower() - - def test_get_hist_defaults_to_n_bars_10(self): - """Test get_hist defaults to n_bars=10 when neither n_bars nor dates provided""" - from unittest.mock import patch, MagicMock - - with patch('tvDatafeed.main.create_connection') as mock_create_connection: - mock_ws = MagicMock() - mock_ws.recv.side_effect = [ - '~m~123~m~{"m":"series_completed"}', - ] - mock_create_connection.return_value = mock_ws - - tv = TvDatafeed() - - # Mock __create_df to avoid parsing issues - with patch.object(tv, '_TvDatafeed__create_df') as mock_create_df: - mock_df = pd.DataFrame({ - 'symbol': ['BTCUSDT'] * 10, - 'open': [29000] * 10, - 'high': [29500] * 10, - 'low': [28500] * 10, - 'close': [29200] * 10, - 'volume': [1000] * 10 - }) - mock_create_df.return_value = mock_df - - try: - df = tv.get_hist('BTCUSDT', 'BINANCE', Interval.in_1_hour) - # Should not raise error and should have data - assert df is not None - except Exception: - # This is expected since we're mocking, just verify no validation error - pass - - def test_interval_len_dictionary_exists(self): - """Test that interval_len dictionary is defined with all intervals""" - from tvDatafeed.main import interval_len - - # Check dictionary exists - assert interval_len is not None - assert isinstance(interval_len, dict) - - # Check all intervals are present - expected_intervals = ["1", "3", "5", "15", "30", "45", "1H", "2H", "3H", "4H", "1D", "1W", "1M"] - for interval in expected_intervals: - assert interval in interval_len - assert isinstance(interval_len[interval], int) - assert interval_len[interval] > 0 - - def test_interval_len_values_correct(self): - """Test that interval_len values are correct""" - from tvDatafeed.main import interval_len - - assert interval_len["1"] == 60 # 1 minute - assert interval_len["5"] == 300 # 5 minutes - assert interval_len["1H"] == 3600 # 1 hour - assert interval_len["1D"] == 86400 # 1 day - assert interval_len["1W"] == 604800 # 1 week - assert interval_len["1M"] == 2592000 # 1 month (30 days) - - def test_create_df_with_timezone(self): - """Test __create_df accepts timezone parameter""" - tv = TvDatafeed() - raw_data = '''{"s":[{"v":[1609459200,29000,29500,28500,29200,1000000]}]}''' - - df = tv._TvDatafeed__create_df(raw_data, 'BTCUSDT', 3600, 'America/New_York') - - assert df is not None - assert 'timezone' in df.attrs - assert df.attrs['timezone'] == 'America/New_York' - - def test_create_df_without_timezone_backward_compatible(self): - """Test __create_df still works without timezone (backward compatibility)""" - tv = TvDatafeed() - raw_data = '''{"s":[{"v":[1609459200,29000,29500,28500,29200,1000000]}]}''' - - df = tv._TvDatafeed__create_df(raw_data, 'BTCUSDT') - - assert df is not None - assert 'timezone' not in df.attrs or df.attrs.get('timezone') is None - - -@pytest.mark.unit -class TestTwoFactorAuthentication: - """Test Two-Factor Authentication (2FA/TOTP) support""" - - def test_2fa_params_stored(self, valid_totp_secret): - """Test that 2FA parameters are stored correctly""" - tv = TvDatafeed(totp_secret=valid_totp_secret, totp_code='123456') - - assert tv._totp_secret == valid_totp_secret - assert tv._totp_code == '123456' - - def test_2fa_params_from_env(self, monkeypatch, valid_totp_secret): - """Test that 2FA parameters are read from environment""" - monkeypatch.setenv('TV_TOTP_SECRET', valid_totp_secret) - monkeypatch.setenv('TV_2FA_CODE', '654321') - - tv = TvDatafeed() - - assert tv._totp_secret == valid_totp_secret - assert tv._totp_code == '654321' - - def test_2fa_params_priority_over_env(self, monkeypatch, valid_totp_secret): - """Test that parameters take priority over environment variables""" - monkeypatch.setenv('TV_TOTP_SECRET', 'ENVENVENVENV') - monkeypatch.setenv('TV_2FA_CODE', '111111') - - tv = TvDatafeed(totp_secret=valid_totp_secret, totp_code='999999') - - # Parameters should take priority - assert tv._totp_secret == valid_totp_secret - assert tv._totp_code == '999999' - - def test_get_totp_code_manual(self): - """Test _get_totp_code returns manual code when provided""" - tv = TvDatafeed(totp_code='123456') - - code = tv._get_totp_code() - - assert code == '123456' - - def test_get_totp_code_generated(self, valid_totp_secret): - """Test _get_totp_code generates code from secret""" - tv = TvDatafeed(totp_secret=valid_totp_secret) - - code = tv._get_totp_code() - - # Should be a 6-digit code - assert code is not None - assert len(code) == 6 - assert code.isdigit() - - def test_get_totp_code_none_when_not_configured(self): - """Test _get_totp_code returns None when neither secret nor code provided""" - tv = TvDatafeed() - - code = tv._get_totp_code() - - assert code is None - - def test_get_totp_code_manual_priority(self, valid_totp_secret): - """Test that manual code takes priority over secret""" - tv = TvDatafeed(totp_secret=valid_totp_secret, totp_code='111111') - - code = tv._get_totp_code() - - # Manual code should be returned, not generated one - assert code == '111111' - - def test_get_totp_code_invalid_secret(self): - """Test _get_totp_code raises error for invalid secret""" - from tvDatafeed import ConfigurationError - - tv = TvDatafeed(totp_secret='INVALID!!!') - - with pytest.raises(ConfigurationError) as exc_info: - tv._get_totp_code() - - assert 'Invalid TOTP secret' in str(exc_info.value) - - def test_2fa_flow_success( - self, - mock_2fa_required_response, - mock_2fa_success_response, - valid_totp_secret - ): - """Test successful 2FA authentication flow""" - with patch('tvDatafeed.main.requests.Session') as mock_session_class: - mock_session = MagicMock() - # First call returns 2FA required, second call returns success - mock_session.post.side_effect = [ - mock_2fa_required_response, - mock_2fa_success_response - ] - mock_session_class.return_value = mock_session - - tv = TvDatafeed( - username='testuser', - password='testpass', - totp_secret=valid_totp_secret - ) - - assert tv.token == 'test_token_2fa_12345' - assert mock_session.post.call_count == 2 - - def test_2fa_flow_with_manual_code( - self, - mock_2fa_required_response, - mock_2fa_success_response - ): - """Test 2FA authentication flow with manual code""" - with patch('tvDatafeed.main.requests.Session') as mock_session_class: - mock_session = MagicMock() - mock_session.post.side_effect = [ - mock_2fa_required_response, - mock_2fa_success_response - ] - mock_session_class.return_value = mock_session - - tv = TvDatafeed( - username='testuser', - password='testpass', - totp_code='123456' - ) - - assert tv.token == 'test_token_2fa_12345' - # Verify 2FA endpoint was called with the code - second_call = mock_session.post.call_args_list[1] - assert second_call[1]['data']['code'] == '123456' - - def test_2fa_required_but_not_provided(self, mock_2fa_required_response): - """Test that TwoFactorRequiredError is raised when 2FA is required but not provided""" - from tvDatafeed import TwoFactorRequiredError - - with patch('tvDatafeed.main.requests.Session') as mock_session_class: - mock_session = MagicMock() - mock_session.post.return_value = mock_2fa_required_response - mock_session_class.return_value = mock_session - - with pytest.raises(TwoFactorRequiredError) as exc_info: - TvDatafeed(username='testuser', password='testpass') - - assert 'Two-factor authentication required' in str(exc_info.value) - - def test_2fa_invalid_code( - self, - mock_2fa_required_response, - mock_2fa_invalid_code_response, - valid_totp_secret - ): - """Test 2FA with invalid code raises AuthenticationError""" - with patch('tvDatafeed.main.requests.Session') as mock_session_class: - mock_session = MagicMock() - mock_session.post.side_effect = [ - mock_2fa_required_response, - mock_2fa_invalid_code_response - ] - mock_session_class.return_value = mock_session - - with pytest.raises(AuthenticationError) as exc_info: - TvDatafeed( - username='testuser', - password='testpass', - totp_code='000000' - ) - - assert 'Invalid' in str(exc_info.value) or 'invalid' in str(exc_info.value) - - def test_2fa_totp_secret_cleaned(self, valid_totp_secret): - """Test that TOTP secret is cleaned (spaces removed, uppercase)""" - # Add spaces and lowercase - messy_secret = 'jbsw y3dp ehpk 3pxp' - - tv = TvDatafeed(totp_secret=messy_secret) - - # Should still generate a valid code - code = tv._get_totp_code() - assert code is not None - assert len(code) == 6 - assert code.isdigit() - - def test_2fa_not_required(self, mock_auth_response): - """Test authentication without 2FA when account doesn't have it enabled""" - with patch('tvDatafeed.main.requests.Session') as mock_session_class: - mock_session = MagicMock() - mock_session.post.return_value = mock_auth_response - mock_session_class.return_value = mock_session - - # Should authenticate without 2FA - tv = TvDatafeed(username='testuser', password='testpass') - - assert tv.token == 'test_token_12345' - # Only one call (no 2FA needed) - assert mock_session.post.call_count == 1 - - def test_docstring_examples_present(self): - """Verify that 2FA examples are in the TvDatafeed docstring""" - docstring = TvDatafeed.__init__.__doc__ - - assert 'totp_secret' in docstring - assert 'totp_code' in docstring - assert '2FA' in docstring - assert 'TV_TOTP_SECRET' in docstring - - def test_2fa_url_configured(self): - """Test that 2FA URL is correctly configured""" - assert hasattr(TvDatafeed, '_TvDatafeed__2fa_url') - assert 'two-factor' in TvDatafeed._TvDatafeed__2fa_url - - -@pytest.mark.unit -class TestWebSocketRetryAndTimeout: - """Test WebSocket retry and cumulative timeout features (Phase 2)""" - - # ========================================================================== - # Tests for __create_connection() retry behavior - # ========================================================================== - - @patch('tvDatafeed.main.retry_with_backoff') - def test_create_connection_retry_on_timeout(self, mock_retry_with_backoff): - """Test that connection is retried after a timeout""" - mock_ws = Mock() - mock_retry_with_backoff.return_value = mock_ws - - tv = TvDatafeed() - tv._TvDatafeed__create_connection() - - # Verify retry_with_backoff was called with correct parameters - mock_retry_with_backoff.assert_called_once() - call_kwargs = mock_retry_with_backoff.call_args[1] - - # Check that TimeoutError is in the exceptions to catch - assert TimeoutError in call_kwargs['exceptions'] - assert call_kwargs['max_retries'] == 3 # Default max retries - assert call_kwargs['base_delay'] == 2.0 # Default base delay - assert call_kwargs['max_delay'] == 10.0 # Default max delay - - @patch('tvDatafeed.main.retry_with_backoff') - def test_create_connection_retry_on_connection_error(self, mock_retry_with_backoff): - """Test that connection is retried after a connection error""" - mock_ws = Mock() - mock_retry_with_backoff.return_value = mock_ws - - tv = TvDatafeed() - tv._TvDatafeed__create_connection() - - # Verify retry_with_backoff was called - mock_retry_with_backoff.assert_called_once() - call_kwargs = mock_retry_with_backoff.call_args[1] - - # Check that ConnectionError and OSError are in the exceptions to catch - assert ConnectionError in call_kwargs['exceptions'] - assert OSError in call_kwargs['exceptions'] - - @patch('tvDatafeed.main.retry_with_backoff') - def test_create_connection_max_retries_exceeded_timeout(self, mock_retry_with_backoff): - """Test that WebSocketTimeoutError is raised after all timeout retries fail""" - from tvDatafeed import WebSocketTimeoutError - - # Simulate retry_with_backoff raising TimeoutError after all retries - mock_retry_with_backoff.side_effect = TimeoutError("Connection timed out") - - tv = TvDatafeed() - - with pytest.raises(WebSocketTimeoutError) as exc_info: - tv._TvDatafeed__create_connection() - - assert 'timed out' in str(exc_info.value).lower() - assert '3 retry attempts' in str(exc_info.value) - - @patch('tvDatafeed.main.retry_with_backoff') - def test_create_connection_max_retries_exceeded_connection_error(self, mock_retry_with_backoff): - """Test that WebSocketError is raised after all connection retries fail""" - from tvDatafeed import WebSocketError - - # Simulate retry_with_backoff raising ConnectionError after all retries - mock_retry_with_backoff.side_effect = ConnectionError("Connection refused") - - tv = TvDatafeed() - - with pytest.raises(WebSocketError) as exc_info: - tv._TvDatafeed__create_connection() - - assert 'Failed to connect' in str(exc_info.value) - assert '3 retry attempts' in str(exc_info.value) - - @patch('tvDatafeed.main.create_connection') - @patch('tvDatafeed.main.retry_with_backoff') - def test_create_connection_success_after_retry(self, mock_retry_with_backoff, mock_create_connection): - """Test that connection succeeds after initial failures (simulated by retry_with_backoff)""" - mock_ws = Mock() - # retry_with_backoff successfully returns after internal retries - mock_retry_with_backoff.return_value = mock_ws - - tv = TvDatafeed() - tv._TvDatafeed__create_connection() - - # WebSocket should be set - assert tv.ws == mock_ws - # retry_with_backoff should have been called - mock_retry_with_backoff.assert_called_once() - - @patch('tvDatafeed.main.retry_with_backoff') - def test_create_connection_on_retry_callback_called(self, mock_retry_with_backoff): - """Test that on_retry callback is passed to retry_with_backoff""" - mock_ws = Mock() - mock_retry_with_backoff.return_value = mock_ws - - tv = TvDatafeed() - tv._TvDatafeed__create_connection() - - # Verify on_retry callback was passed - call_kwargs = mock_retry_with_backoff.call_args[1] - assert 'on_retry' in call_kwargs - assert call_kwargs['on_retry'] is not None - assert callable(call_kwargs['on_retry']) - - @patch('tvDatafeed.main.retry_with_backoff') - def test_create_connection_unexpected_error(self, mock_retry_with_backoff): - """Test that unexpected errors are wrapped in WebSocketError""" - from tvDatafeed import WebSocketError - - # Simulate an unexpected error - mock_retry_with_backoff.side_effect = RuntimeError("Unexpected error") - - tv = TvDatafeed() - - with pytest.raises(WebSocketError) as exc_info: - tv._TvDatafeed__create_connection() - - assert 'RuntimeError' in str(exc_info.value) - assert 'Unexpected error' in str(exc_info.value) - - # ========================================================================== - # Tests for __get_response() cumulative timeout behavior - # ========================================================================== - - def test_get_response_cumulative_timeout(self): - """Test that WebSocketTimeoutError is raised after cumulative timeout""" - from tvDatafeed import WebSocketTimeoutError - import time - - tv = TvDatafeed() - # Set a very short max_response_time for testing - tv.max_response_time = 0.1 # 100ms - - # Mock the WebSocket to return data slowly (never completing) - mock_ws = Mock() - # Return data that doesn't contain "series_completed" - mock_ws.recv.return_value = '~m~100~m~{"m":"some_message","p":[]}' - tv.ws = mock_ws - - with pytest.raises(WebSocketTimeoutError) as exc_info: - tv._TvDatafeed__get_response() - - error_msg = str(exc_info.value) - assert 'Timed out' in error_msg - assert 'series_completed' in error_msg - assert 'TV_MAX_RESPONSE_TIME' in error_msg - - def test_get_response_env_var_timeout(self): - """Test that TV_MAX_RESPONSE_TIME environment variable is respected""" - import os - - # Set environment variable - os.environ['TV_MAX_RESPONSE_TIME'] = '120.0' - - try: - tv = TvDatafeed() - assert tv.max_response_time == 120.0 - finally: - os.environ.pop('TV_MAX_RESPONSE_TIME', None) - - def test_get_response_env_var_timeout_invalid(self): - """Test that invalid TV_MAX_RESPONSE_TIME falls back to default""" - import os - - os.environ['TV_MAX_RESPONSE_TIME'] = 'invalid' - - try: - tv = TvDatafeed() - # Should fall back to default (60.0) - assert tv.max_response_time == 60.0 - finally: - os.environ.pop('TV_MAX_RESPONSE_TIME', None) - - def test_get_response_env_var_timeout_negative(self): - """Test that negative TV_MAX_RESPONSE_TIME falls back to default""" - import os - - os.environ['TV_MAX_RESPONSE_TIME'] = '-10.0' - - try: - tv = TvDatafeed() - # Should fall back to default (60.0) - assert tv.max_response_time == 60.0 - finally: - os.environ.pop('TV_MAX_RESPONSE_TIME', None) - - def test_get_response_env_var_timeout_zero(self): - """Test that zero TV_MAX_RESPONSE_TIME falls back to default""" - import os - - os.environ['TV_MAX_RESPONSE_TIME'] = '0' - - try: - tv = TvDatafeed() - # Should fall back to default (60.0) - assert tv.max_response_time == 60.0 - finally: - os.environ.pop('TV_MAX_RESPONSE_TIME', None) - - def test_get_response_success_before_timeout(self): - """Test that normal response works before timeout""" - tv = TvDatafeed() - tv.max_response_time = 60.0 # Generous timeout - - # Mock WebSocket to return series_completed quickly - mock_ws = Mock() - mock_ws.recv.side_effect = [ - '~m~100~m~{"m":"some_message","p":[]}', - '~m~100~m~{"m":"timescale_update","p":[]}', - '~m~100~m~{"m":"series_completed","p":[]}' - ] - tv.ws = mock_ws - - # Should not raise any exception - raw_data = tv._TvDatafeed__get_response() - - assert 'series_completed' in raw_data - assert mock_ws.recv.call_count == 3 - - def test_get_response_per_message_timeout_error(self): - """Test that per-message timeout raises WebSocketTimeoutError""" - from tvDatafeed import WebSocketTimeoutError - - tv = TvDatafeed() - tv.max_response_time = 60.0 - - # Mock WebSocket to raise TimeoutError on recv - mock_ws = Mock() - mock_ws.recv.side_effect = TimeoutError("Timed out waiting for data") - tv.ws = mock_ws - - with pytest.raises(WebSocketTimeoutError) as exc_info: - tv._TvDatafeed__get_response() - - error_msg = str(exc_info.value) - assert 'Timeout' in error_msg - assert 'ws_timeout' in error_msg - - def test_get_response_generic_error(self): - """Test that generic errors during recv raise WebSocketError""" - from tvDatafeed import WebSocketError - - tv = TvDatafeed() - tv.max_response_time = 60.0 - - # Mock WebSocket to raise a generic exception - mock_ws = Mock() - mock_ws.recv.side_effect = Exception("Network error") - tv.ws = mock_ws - - with pytest.raises(WebSocketError) as exc_info: - tv._TvDatafeed__get_response() - - error_msg = str(exc_info.value) - assert 'Error receiving data' in error_msg - assert 'Network error' in error_msg - - def test_get_response_message_count_in_error(self): - """Test that error message includes message count""" - from tvDatafeed import WebSocketTimeoutError - - tv = TvDatafeed() - tv.max_response_time = 0.1 # Very short timeout - - # Mock WebSocket to return multiple messages before timeout - mock_ws = Mock() - mock_ws.recv.return_value = '~m~100~m~{"m":"some_message","p":[]}' - tv.ws = mock_ws - - with pytest.raises(WebSocketTimeoutError) as exc_info: - tv._TvDatafeed__get_response() - - error_msg = str(exc_info.value) - assert 'messages received' in error_msg - - def test_get_response_default_max_response_time(self): - """Test that default max_response_time is 60 seconds""" - import os - - # Ensure environment variable is not set - os.environ.pop('TV_MAX_RESPONSE_TIME', None) - - tv = TvDatafeed() - assert tv.max_response_time == 60.0 - - def test_max_response_time_attribute_exists(self): - """Test that max_response_time attribute is set on TvDatafeed instance""" - tv = TvDatafeed() - - assert hasattr(tv, 'max_response_time') - assert isinstance(tv.max_response_time, float) - assert tv.max_response_time > 0 - - def test_class_default_max_response_time(self): - """Test that class-level __max_response_time is defined""" - # Access the class attribute through name mangling - assert hasattr(TvDatafeed, '_TvDatafeed__max_response_time') - assert TvDatafeed._TvDatafeed__max_response_time == 60.0 - - def test_class_retry_configuration_exists(self): - """Test that class-level retry configuration attributes are defined""" - # Access class attributes through name mangling - assert hasattr(TvDatafeed, '_TvDatafeed__ws_max_retries') - assert hasattr(TvDatafeed, '_TvDatafeed__ws_retry_base_delay') - assert hasattr(TvDatafeed, '_TvDatafeed__ws_retry_max_delay') - - assert TvDatafeed._TvDatafeed__ws_max_retries == 3 - assert TvDatafeed._TvDatafeed__ws_retry_base_delay == 2.0 - assert TvDatafeed._TvDatafeed__ws_retry_max_delay == 10.0 +""" +Unit tests for TvDatafeed main class +""" +import pytest +from unittest.mock import Mock, patch, MagicMock +from tvDatafeed import TvDatafeed, Interval, CaptchaRequiredError, AuthenticationError +import pandas as pd + + +@pytest.mark.unit +class TestTvDatafeed: + """Test TvDatafeed class""" + + def test_tvdatafeed_creation_no_auth(self): + """Test creating TvDatafeed without authentication""" + with patch('tvDatafeed.main.requests.post') as mock_post: + tv = TvDatafeed() + + assert tv.token == "unauthorized_user_token" + # Should not call auth endpoint when no credentials + mock_post.assert_not_called() + + def test_tvdatafeed_creation_with_auth(self, mock_auth_response): + """Test creating TvDatafeed with authentication""" + with patch('tvDatafeed.main.requests.Session') as mock_session_class: + mock_session = MagicMock() + mock_session.post.return_value = mock_auth_response + mock_session_class.return_value = mock_session + + tv = TvDatafeed(username='testuser', password='testpass') + + assert tv.token == 'test_token_12345' + + def test_auth_failure(self): + """Test authentication failure""" + mock_response = Mock() + mock_response.status_code = 200 + mock_response.json.return_value = { + 'error': 'Invalid credentials', + 'code': 'invalid_credentials' + } + + with patch('tvDatafeed.main.requests.Session') as mock_session_class: + mock_session = MagicMock() + mock_session.post.return_value = mock_response + mock_session_class.return_value = mock_session + + with pytest.raises(AuthenticationError): + TvDatafeed(username='testuser', password='wrongpass') + + def test_session_generation(self): + """Test session ID generation""" + tv = TvDatafeed() + + assert tv.session.startswith('qs_') + assert len(tv.session) == 15 # 'qs_' + 12 chars + + def test_chart_session_generation(self): + """Test chart session ID generation""" + tv = TvDatafeed() + + assert tv.chart_session.startswith('cs_') + assert len(tv.chart_session) == 15 # 'cs_' + 12 chars + + def test_format_symbol_simple(self): + """Test symbol formatting without contract""" + tv = TvDatafeed() + + formatted = tv._TvDatafeed__format_symbol('BTCUSDT', 'BINANCE') + + assert formatted == 'BINANCE:BTCUSDT' + + def test_format_symbol_with_contract(self): + """Test symbol formatting with futures contract""" + tv = TvDatafeed() + + formatted = tv._TvDatafeed__format_symbol('NIFTY', 'NSE', contract=1) + + assert formatted == 'NSE:NIFTY1!' + + def test_format_symbol_already_formatted(self): + """Test symbol that's already formatted""" + tv = TvDatafeed() + + formatted = tv._TvDatafeed__format_symbol('BINANCE:BTCUSDT', 'BINANCE') + + assert formatted == 'BINANCE:BTCUSDT' + + def test_format_symbol_already_formatted_different_exchange(self): + """Test symbol that's already formatted with different exchange parameter""" + tv = TvDatafeed() + + # Symbol has BINANCE but we pass NYSE - should use symbol's exchange + formatted = tv._TvDatafeed__format_symbol('BINANCE:BTCUSDT', 'NYSE') + + assert formatted == 'BINANCE:BTCUSDT' # Keeps BINANCE from symbol + + def test_format_symbol_invalid_contract(self): + """Test symbol formatting with invalid contract type""" + tv = TvDatafeed() + + with pytest.raises(ValueError, match="not a valid contract"): + tv._TvDatafeed__format_symbol('NIFTY', 'NSE', contract='invalid') + + @patch('tvDatafeed.main.create_connection') + def test_create_connection(self, mock_create_connection): + """Test WebSocket connection creation""" + mock_ws = Mock() + mock_create_connection.return_value = mock_ws + + tv = TvDatafeed() + tv._TvDatafeed__create_connection() + + assert tv.ws == mock_ws + mock_create_connection.assert_called_once() + + @patch('tvDatafeed.main.create_connection') + def test_search_symbol(self, mock_create_connection, sample_symbol_search_response): + """Test symbol search""" + with patch('tvDatafeed.main.requests.get') as mock_get: + mock_response = Mock() + mock_response.text = str(sample_symbol_search_response) + mock_get.return_value = mock_response + + tv = TvDatafeed() + results = tv.search_symbol('BTC', 'BINANCE') + + assert isinstance(results, list) + mock_get.assert_called_once() + + @patch('tvDatafeed.main.create_connection') + def test_search_symbol_error(self, mock_create_connection): + """Test symbol search with error""" + with patch('tvDatafeed.main.requests.get') as mock_get: + mock_get.side_effect = Exception("Network error") + + tv = TvDatafeed() + results = tv.search_symbol('BTC', 'BINANCE') + + assert results == [] + + def test_create_df_valid_data(self): + """Test DataFrame creation from valid WebSocket data""" + tv = TvDatafeed() + + # Use a single-line format that matches what __create_df expects + # The regex expects: "s":[{"i":0,"v":[timestamp,o,h,l,c,v]}] + raw_data = '{"s":[{"i":0,"v":[1609459200,29000.0,29500.0,28500.0,29200.0,15000000.0]}]}' + + df = tv._TvDatafeed__create_df(raw_data, 'BTCUSDT') + + assert df is not None + assert isinstance(df, pd.DataFrame) + assert 'symbol' in df.columns + assert df['symbol'].iloc[0] == 'BTCUSDT' + assert 'open' in df.columns + assert 'high' in df.columns + assert 'low' in df.columns + assert 'close' in df.columns + assert 'volume' in df.columns + + def test_create_df_invalid_data(self): + """Test DataFrame creation from invalid data""" + tv = TvDatafeed() + + df = tv._TvDatafeed__create_df('invalid data', 'BTCUSDT') + + assert df is None + + def test_create_df_no_volume(self): + """Test DataFrame creation when volume data is missing""" + # Create data without volume (5 values instead of 6) + # Format: {"s":[{"i":0,"v":[timestamp,o,h,l,c]}]} + raw_data = '{"s":[{"i":0,"v":[1609459200,29000,29500,28500,29200]}]}' + + tv = TvDatafeed() + df = tv._TvDatafeed__create_df(raw_data, 'BTCUSDT') + + # Should still create df with volume=0.0 + assert df is not None + assert 'volume' in df.columns + assert df['volume'].iloc[0] == 0.0 + + def test_auth_captcha_required(self): + """Test authentication when CAPTCHA is required""" + mock_response = Mock() + mock_response.status_code = 200 + mock_response.json.return_value = { + 'error': 'Please confirm that you are not a robot by clicking the captcha box', + 'code': 'recaptcha_required' + } + + with patch('tvDatafeed.main.requests.Session') as mock_session_class: + mock_session = MagicMock() + mock_session.post.return_value = mock_response + mock_session_class.return_value = mock_session + + with pytest.raises(CaptchaRequiredError) as exc_info: + TvDatafeed(username='testuser', password='testpass') + + # Check that the exception contains the username + assert exc_info.value.username == 'testuser' + # Check that the error message mentions workaround + assert 'CAPTCHA' in str(exc_info.value) + assert 'authToken' in str(exc_info.value) + + def test_auth_token_direct_usage(self): + """Test using pre-obtained auth token directly""" + tv = TvDatafeed(auth_token='test_token_direct') + + assert tv.token == 'test_token_direct' + + def test_auth_token_priority_over_credentials(self): + """Test that auth_token takes priority over username/password""" + mock_response = Mock() + mock_response.status_code = 200 + mock_response.json.return_value = { + 'user': {'auth_token': 'should_not_be_used'} + } + + with patch('tvDatafeed.main.requests.post', return_value=mock_response) as mock_post: + tv = TvDatafeed( + username='testuser', + password='testpass', + auth_token='token_from_browser' + ) + + # Should use the provided token + assert tv.token == 'token_from_browser' + # Should not call auth endpoint + mock_post.assert_not_called() + + def test_auth_generic_error(self): + """Test authentication with generic error""" + mock_response = Mock() + mock_response.status_code = 200 + mock_response.json.return_value = { + 'error': 'Invalid credentials', + 'code': 'invalid_credentials' + } + + with patch('tvDatafeed.main.requests.Session') as mock_session_class: + mock_session = MagicMock() + mock_session.post.return_value = mock_response + mock_session_class.return_value = mock_session + + with pytest.raises(AuthenticationError) as exc_info: + TvDatafeed(username='testuser', password='wrongpass') + + # Should raise AuthenticationError, not CaptchaRequiredError + assert not isinstance(exc_info.value, CaptchaRequiredError) + assert 'Invalid credentials' in str(exc_info.value) + + def test_ws_timeout_default(self): + """Test that default WebSocket timeout is used""" + tv = TvDatafeed() + + # Default timeout is either 5s (hardcoded) or 30s (from NetworkConfig) + # depending on whether NetworkConfig is available + assert tv.ws_timeout in [5.0, 30.0] + # If NetworkConfig is available, it should use its recv_timeout (30.0) + # Otherwise, it falls back to __ws_timeout (5.0) + + def test_ws_timeout_custom_parameter(self): + """Test custom WebSocket timeout via parameter""" + tv = TvDatafeed(ws_timeout=30.0) + + assert tv.ws_timeout == 30.0 + + def test_ws_timeout_no_timeout(self): + """Test WebSocket with no timeout (-1)""" + tv = TvDatafeed(ws_timeout=-1) + + assert tv.ws_timeout == -1.0 + + def test_ws_timeout_from_env_variable(self): + """Test WebSocket timeout from environment variable""" + import os + + # Set environment variable + os.environ['TV_WS_TIMEOUT'] = '45.0' + + try: + tv = TvDatafeed() + assert tv.ws_timeout == 45.0 + finally: + # Clean up + os.environ.pop('TV_WS_TIMEOUT', None) + + def test_ws_timeout_parameter_overrides_env(self): + """Test that parameter overrides environment variable""" + import os + + os.environ['TV_WS_TIMEOUT'] = '45.0' + + try: + tv = TvDatafeed(ws_timeout=20.0) + # Parameter should take priority + assert tv.ws_timeout == 20.0 + finally: + os.environ.pop('TV_WS_TIMEOUT', None) + + def test_ws_timeout_invalid_env_variable(self): + """Test handling of invalid environment variable""" + import os + + os.environ['TV_WS_TIMEOUT'] = 'invalid' + + try: + tv = TvDatafeed() + # Should fall back to default + assert tv.ws_timeout == 5.0 + finally: + os.environ.pop('TV_WS_TIMEOUT', None) + + def test_format_search_results_empty(self): + """Test formatting empty search results""" + tv = TvDatafeed() + + formatted = tv.format_search_results([]) + + assert 'No results found' in formatted + + def test_format_search_results_single(self): + """Test formatting single search result""" + tv = TvDatafeed() + + results = [ + { + 'exchange': 'BINANCE', + 'symbol': 'BTCUSDT', + 'description': 'Bitcoin / TetherUS', + 'type': 'crypto' + } + ] + + formatted = tv.format_search_results(results) + + assert 'BINANCE:BTCUSDT' in formatted + assert 'Bitcoin / TetherUS' in formatted + assert 'crypto' in formatted + assert 'Found 1 results' in formatted + + def test_format_search_results_multiple(self): + """Test formatting multiple search results""" + tv = TvDatafeed() + + results = [ + { + 'exchange': 'BINANCE', + 'symbol': 'BTCUSDT', + 'description': 'Bitcoin / TetherUS', + 'type': 'crypto' + }, + { + 'exchange': 'COINBASE', + 'symbol': 'BTCUSD', + 'description': 'Bitcoin / US Dollar', + 'type': 'crypto' + }, + { + 'exchange': 'KRAKEN', + 'symbol': 'BTCEUR', + 'description': 'Bitcoin / Euro', + 'type': 'crypto' + } + ] + + formatted = tv.format_search_results(results, max_results=2) + + # Should only show first 2 + assert 'BINANCE:BTCUSDT' in formatted + assert 'COINBASE:BTCUSD' in formatted + # Third should not be included + assert 'KRAKEN:BTCEUR' not in formatted + assert 'showing first 2' in formatted + + def test_format_search_results_includes_usage(self): + """Test that formatted results include usage examples""" + tv = TvDatafeed() + + results = [ + { + 'exchange': 'BINANCE', + 'symbol': 'BTCUSDT', + 'description': 'Bitcoin / TetherUS', + 'type': 'crypto' + } + ] + + formatted = tv.format_search_results(results) + + assert 'Usage:' in formatted + assert 'tv.get_hist' in formatted + assert 'Example:' in formatted + + def test_verbose_mode_default(self): + """Test that verbose mode is enabled by default""" + tv = TvDatafeed() + + assert tv.verbose is True + + def test_verbose_mode_disabled(self): + """Test that verbose mode can be disabled""" + tv = TvDatafeed(verbose=False) + + assert tv.verbose is False + + def test_verbose_mode_enabled_explicit(self): + """Test that verbose mode can be explicitly enabled""" + tv = TvDatafeed(verbose=True) + + assert tv.verbose is True + + def test_verbose_mode_from_env_true(self): + """Test verbose mode from environment variable (true)""" + import os + + # Test various true values + true_values = ['true', 'True', 'TRUE', '1', 'yes', 'Yes', 'on', 'On'] + + for value in true_values: + os.environ['TV_VERBOSE'] = value + + try: + tv = TvDatafeed() + assert tv.verbose is True, f"Failed for TV_VERBOSE={value}" + finally: + os.environ.pop('TV_VERBOSE', None) + + def test_verbose_mode_from_env_false(self): + """Test verbose mode from environment variable (false)""" + import os + + # Test various false values + false_values = ['false', 'False', 'FALSE', '0', 'no', 'No', 'off', 'Off'] + + for value in false_values: + os.environ['TV_VERBOSE'] = value + + try: + tv = TvDatafeed() + assert tv.verbose is False, f"Failed for TV_VERBOSE={value}" + finally: + os.environ.pop('TV_VERBOSE', None) + + def test_verbose_mode_parameter_overrides_env_true_to_false(self): + """Test that parameter overrides environment variable (env=true, param=false)""" + import os + + os.environ['TV_VERBOSE'] = 'true' + + try: + tv = TvDatafeed(verbose=False) + # Parameter should take priority + assert tv.verbose is False + finally: + os.environ.pop('TV_VERBOSE', None) + + def test_verbose_mode_parameter_overrides_env_false_to_true(self): + """Test that parameter overrides environment variable (env=false, param=true)""" + import os + + os.environ['TV_VERBOSE'] = 'false' + + try: + tv = TvDatafeed(verbose=True) + # Parameter should take priority + assert tv.verbose is True + finally: + os.environ.pop('TV_VERBOSE', None) + + def test_verbose_mode_logging_level_verbose(self): + """Test that verbose mode sets appropriate logging level""" + import logging + from tvDatafeed.main import logger + + # Save original level + original_level = logger.level + + try: + tv = TvDatafeed(verbose=True) + + # Logger should be set to INFO or lower + assert logger.level <= logging.INFO + finally: + # Restore original level + logger.setLevel(original_level) + + def test_verbose_mode_logging_level_quiet(self): + """Test that quiet mode sets appropriate logging level""" + import logging + from tvDatafeed.main import logger + + # Save original level + original_level = logger.level + + try: + tv = TvDatafeed(verbose=False) + + # Logger should be set to WARNING + assert logger.level == logging.WARNING + finally: + # Restore original level + logger.setLevel(original_level) + + def test_verbose_mode_env_not_set(self): + """Test behavior when TV_VERBOSE is not set""" + import os + + # Ensure TV_VERBOSE is not set + os.environ.pop('TV_VERBOSE', None) + + tv = TvDatafeed() + + # Should default to True + assert tv.verbose is True + + def test_verbose_mode_env_invalid_value(self): + """Test behavior when TV_VERBOSE has invalid value""" + import os + + os.environ['TV_VERBOSE'] = 'invalid_value' + + try: + tv = TvDatafeed() + + # Invalid value should be treated as False + # (not in ['true', '1', 'yes', 'on']) + assert tv.verbose is False + finally: + os.environ.pop('TV_VERBOSE', None) + + +@pytest.mark.unit +class TestInterval: + """Test Interval enum""" + + def test_interval_values(self): + """Test that intervals have correct values""" + assert Interval.in_1_minute.value == "1" + assert Interval.in_3_minute.value == "3" + assert Interval.in_5_minute.value == "5" + assert Interval.in_15_minute.value == "15" + assert Interval.in_30_minute.value == "30" + assert Interval.in_45_minute.value == "45" + assert Interval.in_1_hour.value == "1H" + assert Interval.in_2_hour.value == "2H" + assert Interval.in_3_hour.value == "3H" + assert Interval.in_4_hour.value == "4H" + assert Interval.in_daily.value == "1D" + assert Interval.in_weekly.value == "1W" + assert Interval.in_monthly.value == "1M" + + def test_interval_names(self): + """Test interval names""" + assert Interval.in_1_hour.name == "in_1_hour" + assert Interval.in_daily.name == "in_daily" + + +@pytest.mark.unit +class TestDateRangeFeature: + """Test date range search feature (PR #69)""" + + def test_is_valid_date_range_valid(self): + """Test is_valid_date_range with valid date range""" + from datetime import datetime + start = datetime(2024, 1, 1) + end = datetime(2024, 1, 31) + + result = TvDatafeed.is_valid_date_range(start, end) + + assert result is True + + def test_is_valid_date_range_start_after_end(self): + """Test is_valid_date_range when start is after end""" + from datetime import datetime + start = datetime(2024, 1, 31) + end = datetime(2024, 1, 1) + + result = TvDatafeed.is_valid_date_range(start, end) + + assert result is False + + def test_is_valid_date_range_start_equals_end(self): + """Test is_valid_date_range when start equals end""" + from datetime import datetime + start = datetime(2024, 1, 15) + end = datetime(2024, 1, 15) + + result = TvDatafeed.is_valid_date_range(start, end) + + assert result is False + + def test_is_valid_date_range_future_start(self): + """Test is_valid_date_range with future start date""" + from datetime import datetime, timedelta + future = datetime.now() + timedelta(days=30) + end = datetime.now() + timedelta(days=60) + + result = TvDatafeed.is_valid_date_range(future, end) + + assert result is False + + def test_is_valid_date_range_future_end(self): + """Test is_valid_date_range with future end date""" + from datetime import datetime, timedelta + start = datetime(2024, 1, 1) + end = datetime.now() + timedelta(days=30) + + result = TvDatafeed.is_valid_date_range(start, end) + + assert result is False + + def test_is_valid_date_range_before_2000(self): + """Test is_valid_date_range with dates before 2000""" + from datetime import datetime + start = datetime(1999, 1, 1) + end = datetime(1999, 12, 31) + + result = TvDatafeed.is_valid_date_range(start, end) + + assert result is False + + def test_is_valid_date_range_start_before_2000(self): + """Test is_valid_date_range with start before 2000""" + from datetime import datetime + start = datetime(1999, 12, 31) + end = datetime(2024, 1, 1) + + result = TvDatafeed.is_valid_date_range(start, end) + + assert result is False + + def test_get_hist_mutually_exclusive_error(self): + """Test get_hist raises error when both n_bars and date range provided""" + from datetime import datetime + from tvDatafeed import DataValidationError + + tv = TvDatafeed() + + with pytest.raises(DataValidationError) as exc_info: + tv.get_hist( + 'BTCUSDT', + 'BINANCE', + Interval.in_1_hour, + n_bars=100, + start_date=datetime(2024, 1, 1), + end_date=datetime(2024, 1, 31) + ) + + assert 'mutually exclusive' in str(exc_info.value).lower() + + def test_get_hist_only_start_date_error(self): + """Test get_hist raises error when only start_date provided""" + from datetime import datetime + from tvDatafeed import DataValidationError + + tv = TvDatafeed() + + with pytest.raises(DataValidationError) as exc_info: + tv.get_hist( + 'BTCUSDT', + 'BINANCE', + Interval.in_1_hour, + start_date=datetime(2024, 1, 1) + ) + + assert 'both' in str(exc_info.value).lower() + + def test_get_hist_only_end_date_error(self): + """Test get_hist raises error when only end_date provided""" + from datetime import datetime + from tvDatafeed import DataValidationError + + tv = TvDatafeed() + + with pytest.raises(DataValidationError) as exc_info: + tv.get_hist( + 'BTCUSDT', + 'BINANCE', + Interval.in_1_hour, + end_date=datetime(2024, 1, 31) + ) + + assert 'both' in str(exc_info.value).lower() + + def test_get_hist_invalid_date_range_error(self): + """Test get_hist raises error for invalid date range""" + from datetime import datetime + from tvDatafeed import DataValidationError + + tv = TvDatafeed() + + # Start after end + with pytest.raises(DataValidationError) as exc_info: + tv.get_hist( + 'BTCUSDT', + 'BINANCE', + Interval.in_1_hour, + start_date=datetime(2024, 1, 31), + end_date=datetime(2024, 1, 1) + ) + + assert 'invalid date range' in str(exc_info.value).lower() + + def test_get_hist_defaults_to_n_bars_10(self): + """Test get_hist defaults to n_bars=10 when neither n_bars nor dates provided""" + from unittest.mock import patch, MagicMock + + with patch('tvDatafeed.main.create_connection') as mock_create_connection: + mock_ws = MagicMock() + mock_ws.recv.side_effect = [ + '~m~123~m~{"m":"series_completed"}', + ] + mock_create_connection.return_value = mock_ws + + tv = TvDatafeed() + + # Mock __create_df to avoid parsing issues + with patch.object(tv, '_TvDatafeed__create_df') as mock_create_df: + mock_df = pd.DataFrame({ + 'symbol': ['BTCUSDT'] * 10, + 'open': [29000] * 10, + 'high': [29500] * 10, + 'low': [28500] * 10, + 'close': [29200] * 10, + 'volume': [1000] * 10 + }) + mock_create_df.return_value = mock_df + + try: + df = tv.get_hist('BTCUSDT', 'BINANCE', Interval.in_1_hour) + # Should not raise error and should have data + assert df is not None + except Exception: + # This is expected since we're mocking, just verify no validation error + pass + + def test_interval_len_dictionary_exists(self): + """Test that interval_len dictionary is defined with all intervals""" + from tvDatafeed.main import interval_len + + # Check dictionary exists + assert interval_len is not None + assert isinstance(interval_len, dict) + + # Check all intervals are present + expected_intervals = ["1", "3", "5", "15", "30", "45", "1H", "2H", "3H", "4H", "1D", "1W", "1M"] + for interval in expected_intervals: + assert interval in interval_len + assert isinstance(interval_len[interval], int) + assert interval_len[interval] > 0 + + def test_interval_len_values_correct(self): + """Test that interval_len values are correct""" + from tvDatafeed.main import interval_len + + assert interval_len["1"] == 60 # 1 minute + assert interval_len["5"] == 300 # 5 minutes + assert interval_len["1H"] == 3600 # 1 hour + assert interval_len["1D"] == 86400 # 1 day + assert interval_len["1W"] == 604800 # 1 week + assert interval_len["1M"] == 2592000 # 1 month (30 days) + + def test_create_df_with_timezone(self): + """Test __create_df accepts timezone parameter""" + tv = TvDatafeed() + raw_data = '''{"s":[{"v":[1609459200,29000,29500,28500,29200,1000000]}]}''' + + df = tv._TvDatafeed__create_df(raw_data, 'BTCUSDT', 3600, 'America/New_York') + + assert df is not None + assert 'timezone' in df.attrs + assert df.attrs['timezone'] == 'America/New_York' + + def test_create_df_without_timezone_backward_compatible(self): + """Test __create_df still works without timezone (backward compatibility)""" + tv = TvDatafeed() + raw_data = '''{"s":[{"v":[1609459200,29000,29500,28500,29200,1000000]}]}''' + + df = tv._TvDatafeed__create_df(raw_data, 'BTCUSDT') + + assert df is not None + assert 'timezone' not in df.attrs or df.attrs.get('timezone') is None + + +@pytest.mark.unit +class TestTwoFactorAuthentication: + """Test Two-Factor Authentication (2FA/TOTP) support""" + + def test_2fa_params_stored(self, valid_totp_secret): + """Test that 2FA parameters are stored correctly""" + tv = TvDatafeed(totp_secret=valid_totp_secret, totp_code='123456') + + assert tv._totp_secret == valid_totp_secret + assert tv._totp_code == '123456' + + def test_2fa_params_from_env(self, monkeypatch, valid_totp_secret): + """Test that 2FA parameters are read from environment""" + monkeypatch.setenv('TV_TOTP_SECRET', valid_totp_secret) + monkeypatch.setenv('TV_2FA_CODE', '654321') + + tv = TvDatafeed() + + assert tv._totp_secret == valid_totp_secret + assert tv._totp_code == '654321' + + def test_2fa_params_priority_over_env(self, monkeypatch, valid_totp_secret): + """Test that parameters take priority over environment variables""" + monkeypatch.setenv('TV_TOTP_SECRET', 'ENVENVENVENV') + monkeypatch.setenv('TV_2FA_CODE', '111111') + + tv = TvDatafeed(totp_secret=valid_totp_secret, totp_code='999999') + + # Parameters should take priority + assert tv._totp_secret == valid_totp_secret + assert tv._totp_code == '999999' + + def test_get_totp_code_manual(self): + """Test _get_totp_code returns manual code when provided""" + tv = TvDatafeed(totp_code='123456') + + code = tv._get_totp_code() + + assert code == '123456' + + def test_get_totp_code_generated(self, valid_totp_secret): + """Test _get_totp_code generates code from secret""" + tv = TvDatafeed(totp_secret=valid_totp_secret) + + code = tv._get_totp_code() + + # Should be a 6-digit code + assert code is not None + assert len(code) == 6 + assert code.isdigit() + + def test_get_totp_code_none_when_not_configured(self): + """Test _get_totp_code returns None when neither secret nor code provided""" + tv = TvDatafeed() + + code = tv._get_totp_code() + + assert code is None + + def test_get_totp_code_manual_priority(self, valid_totp_secret): + """Test that manual code takes priority over secret""" + tv = TvDatafeed(totp_secret=valid_totp_secret, totp_code='111111') + + code = tv._get_totp_code() + + # Manual code should be returned, not generated one + assert code == '111111' + + def test_get_totp_code_invalid_secret(self): + """Test _get_totp_code raises error for invalid secret""" + from tvDatafeed import ConfigurationError + + tv = TvDatafeed(totp_secret='INVALID!!!') + + with pytest.raises(ConfigurationError) as exc_info: + tv._get_totp_code() + + assert 'Invalid TOTP secret' in str(exc_info.value) + + def test_2fa_flow_success( + self, + mock_2fa_required_response, + mock_2fa_success_response, + valid_totp_secret + ): + """Test successful 2FA authentication flow""" + with patch('tvDatafeed.main.requests.Session') as mock_session_class: + mock_session = MagicMock() + # First call returns 2FA required, second call returns success + mock_session.post.side_effect = [ + mock_2fa_required_response, + mock_2fa_success_response + ] + mock_session_class.return_value = mock_session + + tv = TvDatafeed( + username='testuser', + password='testpass', + totp_secret=valid_totp_secret + ) + + assert tv.token == 'test_token_2fa_12345' + assert mock_session.post.call_count == 2 + + def test_2fa_flow_with_manual_code( + self, + mock_2fa_required_response, + mock_2fa_success_response + ): + """Test 2FA authentication flow with manual code""" + with patch('tvDatafeed.main.requests.Session') as mock_session_class: + mock_session = MagicMock() + mock_session.post.side_effect = [ + mock_2fa_required_response, + mock_2fa_success_response + ] + mock_session_class.return_value = mock_session + + tv = TvDatafeed( + username='testuser', + password='testpass', + totp_code='123456' + ) + + assert tv.token == 'test_token_2fa_12345' + # Verify 2FA endpoint was called with the code + second_call = mock_session.post.call_args_list[1] + assert second_call[1]['data']['code'] == '123456' + + def test_2fa_required_but_not_provided(self, mock_2fa_required_response): + """Test that TwoFactorRequiredError is raised when 2FA is required but not provided""" + from tvDatafeed import TwoFactorRequiredError + + with patch('tvDatafeed.main.requests.Session') as mock_session_class: + mock_session = MagicMock() + mock_session.post.return_value = mock_2fa_required_response + mock_session_class.return_value = mock_session + + with pytest.raises(TwoFactorRequiredError) as exc_info: + TvDatafeed(username='testuser', password='testpass') + + assert 'Two-factor authentication required' in str(exc_info.value) + + def test_2fa_invalid_code( + self, + mock_2fa_required_response, + mock_2fa_invalid_code_response, + valid_totp_secret + ): + """Test 2FA with invalid code raises AuthenticationError""" + with patch('tvDatafeed.main.requests.Session') as mock_session_class: + mock_session = MagicMock() + mock_session.post.side_effect = [ + mock_2fa_required_response, + mock_2fa_invalid_code_response + ] + mock_session_class.return_value = mock_session + + with pytest.raises(AuthenticationError) as exc_info: + TvDatafeed( + username='testuser', + password='testpass', + totp_code='000000' + ) + + assert 'Invalid' in str(exc_info.value) or 'invalid' in str(exc_info.value) + + def test_2fa_totp_secret_cleaned(self, valid_totp_secret): + """Test that TOTP secret is cleaned (spaces removed, uppercase)""" + # Add spaces and lowercase + messy_secret = 'jbsw y3dp ehpk 3pxp' + + tv = TvDatafeed(totp_secret=messy_secret) + + # Should still generate a valid code + code = tv._get_totp_code() + assert code is not None + assert len(code) == 6 + assert code.isdigit() + + def test_2fa_not_required(self, mock_auth_response): + """Test authentication without 2FA when account doesn't have it enabled""" + with patch('tvDatafeed.main.requests.Session') as mock_session_class: + mock_session = MagicMock() + mock_session.post.return_value = mock_auth_response + mock_session_class.return_value = mock_session + + # Should authenticate without 2FA + tv = TvDatafeed(username='testuser', password='testpass') + + assert tv.token == 'test_token_12345' + # Only one call (no 2FA needed) + assert mock_session.post.call_count == 1 + + def test_docstring_examples_present(self): + """Verify that 2FA examples are in the TvDatafeed docstring""" + docstring = TvDatafeed.__init__.__doc__ + + assert 'totp_secret' in docstring + assert 'totp_code' in docstring + assert '2FA' in docstring + assert 'TV_TOTP_SECRET' in docstring + + def test_2fa_url_configured(self): + """Test that 2FA URL is correctly configured""" + assert hasattr(TvDatafeed, '_TvDatafeed__2fa_url') + assert 'two-factor' in TvDatafeed._TvDatafeed__2fa_url + + +@pytest.mark.unit +class TestWebSocketRetryAndTimeout: + """Test WebSocket retry and cumulative timeout features (Phase 2)""" + + # ========================================================================== + # Tests for __create_connection() retry behavior + # ========================================================================== + + @patch('tvDatafeed.main.retry_with_backoff') + def test_create_connection_retry_on_timeout(self, mock_retry_with_backoff): + """Test that connection is retried after a timeout""" + mock_ws = Mock() + mock_retry_with_backoff.return_value = mock_ws + + tv = TvDatafeed() + tv._TvDatafeed__create_connection() + + # Verify retry_with_backoff was called with correct parameters + mock_retry_with_backoff.assert_called_once() + call_kwargs = mock_retry_with_backoff.call_args[1] + + # Check that TimeoutError is in the exceptions to catch + assert TimeoutError in call_kwargs['exceptions'] + assert call_kwargs['max_retries'] == 3 # Default max retries + assert call_kwargs['base_delay'] == 2.0 # Default base delay + assert call_kwargs['max_delay'] == 10.0 # Default max delay + + @patch('tvDatafeed.main.retry_with_backoff') + def test_create_connection_retry_on_connection_error(self, mock_retry_with_backoff): + """Test that connection is retried after a connection error""" + mock_ws = Mock() + mock_retry_with_backoff.return_value = mock_ws + + tv = TvDatafeed() + tv._TvDatafeed__create_connection() + + # Verify retry_with_backoff was called + mock_retry_with_backoff.assert_called_once() + call_kwargs = mock_retry_with_backoff.call_args[1] + + # Check that ConnectionError and OSError are in the exceptions to catch + assert ConnectionError in call_kwargs['exceptions'] + assert OSError in call_kwargs['exceptions'] + + @patch('tvDatafeed.main.retry_with_backoff') + def test_create_connection_max_retries_exceeded_timeout(self, mock_retry_with_backoff): + """Test that WebSocketTimeoutError is raised after all timeout retries fail""" + from tvDatafeed import WebSocketTimeoutError + + # Simulate retry_with_backoff raising TimeoutError after all retries + mock_retry_with_backoff.side_effect = TimeoutError("Connection timed out") + + tv = TvDatafeed() + + with pytest.raises(WebSocketTimeoutError) as exc_info: + tv._TvDatafeed__create_connection() + + assert 'timed out' in str(exc_info.value).lower() + assert '3 retry attempts' in str(exc_info.value) + + @patch('tvDatafeed.main.retry_with_backoff') + def test_create_connection_max_retries_exceeded_connection_error(self, mock_retry_with_backoff): + """Test that WebSocketError is raised after all connection retries fail""" + from tvDatafeed import WebSocketError + + # Simulate retry_with_backoff raising ConnectionError after all retries + mock_retry_with_backoff.side_effect = ConnectionError("Connection refused") + + tv = TvDatafeed() + + with pytest.raises(WebSocketError) as exc_info: + tv._TvDatafeed__create_connection() + + assert 'Failed to connect' in str(exc_info.value) + assert '3 retry attempts' in str(exc_info.value) + + @patch('tvDatafeed.main.create_connection') + @patch('tvDatafeed.main.retry_with_backoff') + def test_create_connection_success_after_retry(self, mock_retry_with_backoff, mock_create_connection): + """Test that connection succeeds after initial failures (simulated by retry_with_backoff)""" + mock_ws = Mock() + # retry_with_backoff successfully returns after internal retries + mock_retry_with_backoff.return_value = mock_ws + + tv = TvDatafeed() + tv._TvDatafeed__create_connection() + + # WebSocket should be set + assert tv.ws == mock_ws + # retry_with_backoff should have been called + mock_retry_with_backoff.assert_called_once() + + @patch('tvDatafeed.main.retry_with_backoff') + def test_create_connection_on_retry_callback_called(self, mock_retry_with_backoff): + """Test that on_retry callback is passed to retry_with_backoff""" + mock_ws = Mock() + mock_retry_with_backoff.return_value = mock_ws + + tv = TvDatafeed() + tv._TvDatafeed__create_connection() + + # Verify on_retry callback was passed + call_kwargs = mock_retry_with_backoff.call_args[1] + assert 'on_retry' in call_kwargs + assert call_kwargs['on_retry'] is not None + assert callable(call_kwargs['on_retry']) + + @patch('tvDatafeed.main.retry_with_backoff') + def test_create_connection_unexpected_error(self, mock_retry_with_backoff): + """Test that unexpected errors are wrapped in WebSocketError""" + from tvDatafeed import WebSocketError + + # Simulate an unexpected error + mock_retry_with_backoff.side_effect = RuntimeError("Unexpected error") + + tv = TvDatafeed() + + with pytest.raises(WebSocketError) as exc_info: + tv._TvDatafeed__create_connection() + + assert 'RuntimeError' in str(exc_info.value) + assert 'Unexpected error' in str(exc_info.value) + + # ========================================================================== + # Tests for __get_response() cumulative timeout behavior + # ========================================================================== + + def test_get_response_cumulative_timeout(self): + """Test that WebSocketTimeoutError is raised after cumulative timeout""" + from tvDatafeed import WebSocketTimeoutError + import time + + tv = TvDatafeed() + # Set a very short max_response_time for testing + tv.max_response_time = 0.1 # 100ms + + # Mock the WebSocket to return data slowly (never completing) + mock_ws = Mock() + # Return data that doesn't contain "series_completed" + mock_ws.recv.return_value = '~m~100~m~{"m":"some_message","p":[]}' + tv.ws = mock_ws + + with pytest.raises(WebSocketTimeoutError) as exc_info: + tv._TvDatafeed__get_response() + + error_msg = str(exc_info.value) + assert 'Timed out' in error_msg + assert 'series_completed' in error_msg + assert 'TV_MAX_RESPONSE_TIME' in error_msg + + def test_get_response_env_var_timeout(self): + """Test that TV_MAX_RESPONSE_TIME environment variable is respected""" + import os + + # Set environment variable + os.environ['TV_MAX_RESPONSE_TIME'] = '120.0' + + try: + tv = TvDatafeed() + assert tv.max_response_time == 120.0 + finally: + os.environ.pop('TV_MAX_RESPONSE_TIME', None) + + def test_get_response_env_var_timeout_invalid(self): + """Test that invalid TV_MAX_RESPONSE_TIME falls back to default""" + import os + + os.environ['TV_MAX_RESPONSE_TIME'] = 'invalid' + + try: + tv = TvDatafeed() + # Should fall back to default (60.0) + assert tv.max_response_time == 60.0 + finally: + os.environ.pop('TV_MAX_RESPONSE_TIME', None) + + def test_get_response_env_var_timeout_negative(self): + """Test that negative TV_MAX_RESPONSE_TIME falls back to default""" + import os + + os.environ['TV_MAX_RESPONSE_TIME'] = '-10.0' + + try: + tv = TvDatafeed() + # Should fall back to default (60.0) + assert tv.max_response_time == 60.0 + finally: + os.environ.pop('TV_MAX_RESPONSE_TIME', None) + + def test_get_response_env_var_timeout_zero(self): + """Test that zero TV_MAX_RESPONSE_TIME falls back to default""" + import os + + os.environ['TV_MAX_RESPONSE_TIME'] = '0' + + try: + tv = TvDatafeed() + # Should fall back to default (60.0) + assert tv.max_response_time == 60.0 + finally: + os.environ.pop('TV_MAX_RESPONSE_TIME', None) + + def test_get_response_success_before_timeout(self): + """Test that normal response works before timeout""" + tv = TvDatafeed() + tv.max_response_time = 60.0 # Generous timeout + + # Mock WebSocket to return series_completed quickly + mock_ws = Mock() + mock_ws.recv.side_effect = [ + '~m~100~m~{"m":"some_message","p":[]}', + '~m~100~m~{"m":"timescale_update","p":[]}', + '~m~100~m~{"m":"series_completed","p":[]}' + ] + tv.ws = mock_ws + + # Should not raise any exception + raw_data = tv._TvDatafeed__get_response() + + assert 'series_completed' in raw_data + assert mock_ws.recv.call_count == 3 + + def test_get_response_per_message_timeout_error(self): + """Test that per-message timeout raises WebSocketTimeoutError""" + from tvDatafeed import WebSocketTimeoutError + + tv = TvDatafeed() + tv.max_response_time = 60.0 + + # Mock WebSocket to raise TimeoutError on recv + mock_ws = Mock() + mock_ws.recv.side_effect = TimeoutError("Timed out waiting for data") + tv.ws = mock_ws + + with pytest.raises(WebSocketTimeoutError) as exc_info: + tv._TvDatafeed__get_response() + + error_msg = str(exc_info.value) + assert 'Timeout' in error_msg + assert 'ws_timeout' in error_msg + + def test_get_response_generic_error(self): + """Test that generic errors during recv raise WebSocketError""" + from tvDatafeed import WebSocketError + + tv = TvDatafeed() + tv.max_response_time = 60.0 + + # Mock WebSocket to raise a generic exception + mock_ws = Mock() + mock_ws.recv.side_effect = Exception("Network error") + tv.ws = mock_ws + + with pytest.raises(WebSocketError) as exc_info: + tv._TvDatafeed__get_response() + + error_msg = str(exc_info.value) + assert 'Error receiving data' in error_msg + assert 'Network error' in error_msg + + def test_get_response_message_count_in_error(self): + """Test that error message includes message count""" + from tvDatafeed import WebSocketTimeoutError + + tv = TvDatafeed() + tv.max_response_time = 0.1 # Very short timeout + + # Mock WebSocket to return multiple messages before timeout + mock_ws = Mock() + mock_ws.recv.return_value = '~m~100~m~{"m":"some_message","p":[]}' + tv.ws = mock_ws + + with pytest.raises(WebSocketTimeoutError) as exc_info: + tv._TvDatafeed__get_response() + + error_msg = str(exc_info.value) + assert 'messages received' in error_msg + + def test_get_response_default_max_response_time(self): + """Test that default max_response_time is 60 seconds""" + import os + + # Ensure environment variable is not set + os.environ.pop('TV_MAX_RESPONSE_TIME', None) + + tv = TvDatafeed() + assert tv.max_response_time == 60.0 + + def test_max_response_time_attribute_exists(self): + """Test that max_response_time attribute is set on TvDatafeed instance""" + tv = TvDatafeed() + + assert hasattr(tv, 'max_response_time') + assert isinstance(tv.max_response_time, float) + assert tv.max_response_time > 0 + + def test_class_default_max_response_time(self): + """Test that class-level __max_response_time is defined""" + # Access the class attribute through name mangling + assert hasattr(TvDatafeed, '_TvDatafeed__max_response_time') + assert TvDatafeed._TvDatafeed__max_response_time == 60.0 + + def test_class_retry_configuration_exists(self): + """Test that class-level retry configuration attributes are defined""" + # Access class attributes through name mangling + assert hasattr(TvDatafeed, '_TvDatafeed__ws_max_retries') + assert hasattr(TvDatafeed, '_TvDatafeed__ws_retry_base_delay') + assert hasattr(TvDatafeed, '_TvDatafeed__ws_retry_max_delay') + + assert TvDatafeed._TvDatafeed__ws_max_retries == 3 + assert TvDatafeed._TvDatafeed__ws_retry_base_delay == 2.0 + assert TvDatafeed._TvDatafeed__ws_retry_max_delay == 10.0 + + +@pytest.mark.unit +class TestTimezoneFeature: + """Tests for timezone parameter in get_hist()""" + + def test_get_timezone_object_utc(self): + """Test that UTC timezone returns datetime.timezone.utc""" + from tvDatafeed.main import _get_timezone_object + import datetime + + tz = _get_timezone_object('UTC') + + assert tz == datetime.timezone.utc + + def test_get_timezone_object_america_new_york(self): + """Test that America/New_York timezone is valid""" + from tvDatafeed.main import _get_timezone_object + + tz = _get_timezone_object('America/New_York') + + assert tz is not None + assert str(tz) in ['America/New_York', 'EST', 'EDT', 'US/Eastern'] + + def test_get_timezone_object_europe_paris(self): + """Test that Europe/Paris timezone is valid""" + from tvDatafeed.main import _get_timezone_object + + tz = _get_timezone_object('Europe/Paris') + + assert tz is not None + + def test_get_timezone_object_invalid_raises_error(self): + """Test that invalid timezone raises ConfigurationError""" + from tvDatafeed.main import _get_timezone_object + from tvDatafeed.exceptions import ConfigurationError + + with pytest.raises(ConfigurationError) as exc_info: + _get_timezone_object('Invalid/Timezone') + + assert 'Invalid timezone' in str(exc_info.value) + + def test_create_df_with_utc_timezone(self): + """Test __create_df converts timestamps to UTC""" + import datetime + + tv = TvDatafeed() + raw_data = '''{"s":[{"v":[1704067200.0,100.0,105.0,99.0,102.0,1000.0]}]}''' + + # Call __create_df with UTC timezone object + df = tv._TvDatafeed__create_df( + raw_data, + 'TEST', + time_zone='UTC', + tz_object=datetime.timezone.utc + ) + + assert df is not None + assert 'timezone' in df.attrs + assert df.attrs['timezone'] == 'UTC' + # Check that datetime is timezone-aware + assert df.index[0].tzinfo is not None + + def test_create_df_without_timezone_backward_compatible(self): + """Test __create_df without timezone (backward compatibility)""" + tv = TvDatafeed() + raw_data = '''{"s":[{"v":[1704067200.0,100.0,105.0,99.0,102.0,1000.0]}]}''' + + # Call __create_df without timezone (should use local time) + df = tv._TvDatafeed__create_df(raw_data, 'TEST') + + assert df is not None + # Should not have timezone attribute or it should be None + assert 'timezone' not in df.attrs or df.attrs.get('timezone') is None + # Check that datetime is timezone-naive (local time) + assert df.index[0].tzinfo is None + + def test_get_hist_timezone_parameter_accepted(self): + """Test that get_hist accepts timezone parameter""" + import inspect + + sig = inspect.signature(TvDatafeed.get_hist) + params = list(sig.parameters.keys()) + + assert 'timezone' in params + + def test_get_hist_timezone_from_env_variable(self, monkeypatch): + """Test that TV_TIMEZONE environment variable is used""" + monkeypatch.setenv('TV_TIMEZONE', 'UTC') + + tv = TvDatafeed() + + # Create a mock WebSocket and test the timezone resolution + with patch.object(tv, '_TvDatafeed__create_connection'): + with patch.object(tv, '_TvDatafeed__send_message'): + with patch.object(tv, '_TvDatafeed__get_response') as mock_response: + mock_response.return_value = '''{"s":[{"v":[1704067200.0,100.0,105.0,99.0,102.0,1000.0]}]}''' + with patch.object(tv, 'ws') as mock_ws: + mock_ws.close = Mock() + + df = tv.get_hist('BTCUSDT', 'BINANCE', Interval.in_daily, n_bars=1) + + # Should use UTC from environment + assert 'timezone' in df.attrs + assert df.attrs['timezone'] == 'UTC' + assert df.index[0].tzinfo is not None + + def test_get_hist_timezone_parameter_overrides_env(self, monkeypatch): + """Test that timezone parameter takes priority over TV_TIMEZONE""" + monkeypatch.setenv('TV_TIMEZONE', 'Europe/London') + + tv = TvDatafeed() + + with patch.object(tv, '_TvDatafeed__create_connection'): + with patch.object(tv, '_TvDatafeed__send_message'): + with patch.object(tv, '_TvDatafeed__get_response') as mock_response: + mock_response.return_value = '''{"s":[{"v":[1704067200.0,100.0,105.0,99.0,102.0,1000.0]}]}''' + with patch.object(tv, 'ws') as mock_ws: + mock_ws.close = Mock() + + # Explicitly pass UTC - should override env var + df = tv.get_hist('BTCUSDT', 'BINANCE', Interval.in_daily, + n_bars=1, timezone='UTC') + + assert df.attrs['timezone'] == 'UTC' + + def test_get_hist_no_timezone_uses_local(self, monkeypatch): + """Test that no timezone uses local system time (backward compatible)""" + # Ensure env var is not set + monkeypatch.delenv('TV_TIMEZONE', raising=False) + + tv = TvDatafeed() + + with patch.object(tv, '_TvDatafeed__create_connection'): + with patch.object(tv, '_TvDatafeed__send_message'): + with patch.object(tv, '_TvDatafeed__get_response') as mock_response: + mock_response.return_value = '''{"s":[{"v":[1704067200.0,100.0,105.0,99.0,102.0,1000.0]}]}''' + with patch.object(tv, 'ws') as mock_ws: + mock_ws.close = Mock() + + df = tv.get_hist('BTCUSDT', 'BINANCE', Interval.in_daily, n_bars=1) + + # Should not have timezone (backward compatible) + assert df.attrs.get('timezone') is None + assert df.index[0].tzinfo is None + + def test_get_hist_invalid_timezone_raises_error(self, monkeypatch): + """Test that invalid timezone raises ConfigurationError""" + from tvDatafeed.exceptions import ConfigurationError + + monkeypatch.delenv('TV_TIMEZONE', raising=False) + tv = TvDatafeed() + + with pytest.raises(ConfigurationError) as exc_info: + tv.get_hist('BTCUSDT', 'BINANCE', Interval.in_daily, + n_bars=1, timezone='Invalid/Timezone') + + assert 'Invalid timezone' in str(exc_info.value) + + def test_timezone_converts_timestamp_correctly_utc(self): + """Test that UTC timezone converts timestamps correctly""" + import datetime + + tv = TvDatafeed() + # Unix timestamp 1704067200 = 2024-01-01 00:00:00 UTC + # Raw data format must match the regex in __create_df + raw_data = '"s":[{"i":0,"v":[1704067200.0,100.0,105.0,99.0,102.0,1000.0]}]}' + + df = tv._TvDatafeed__create_df( + raw_data, + 'TEST', + time_zone='UTC', + tz_object=datetime.timezone.utc + ) + + # In UTC, this should be 2024-01-01 00:00:00 + expected_dt = datetime.datetime(2024, 1, 1, 0, 0, 0, tzinfo=datetime.timezone.utc) + assert df.index[0] == expected_dt + + def test_timezone_converts_timestamp_correctly_est(self): + """Test that America/New_York timezone converts timestamps correctly""" + from tvDatafeed.main import _get_timezone_object + import datetime + + tv = TvDatafeed() + # Unix timestamp 1704067200 = 2024-01-01 00:00:00 UTC = 2023-12-31 19:00:00 EST + raw_data = '"s":[{"i":0,"v":[1704067200.0,100.0,105.0,99.0,102.0,1000.0]}]}' + + tz = _get_timezone_object('America/New_York') + df = tv._TvDatafeed__create_df( + raw_data, + 'TEST', + time_zone='America/New_York', + tz_object=tz + ) + + # In EST (UTC-5), this should be 2023-12-31 19:00:00 + assert df.index[0].year == 2023 + assert df.index[0].month == 12 + assert df.index[0].day == 31 + assert df.index[0].hour == 19 + assert df.index[0].minute == 0 + + def test_common_timezones_are_valid(self): + """Test that common trading timezones are valid""" + from tvDatafeed.main import _get_timezone_object + + common_timezones = [ + 'UTC', + 'America/New_York', + 'America/Chicago', + 'America/Los_Angeles', + 'Europe/London', + 'Europe/Paris', + 'Europe/Berlin', # Frankfurt uses Berlin timezone + 'Asia/Tokyo', + 'Asia/Hong_Kong', + 'Asia/Singapore', + 'Asia/Shanghai', + 'Australia/Sydney' + ] + + for tz_name in common_timezones: + tz = _get_timezone_object(tz_name) + assert tz is not None, f"Timezone {tz_name} should be valid" + + def test_docstring_includes_timezone_examples(self): + """Test that get_hist docstring includes timezone examples""" + docstring = TvDatafeed.get_hist.__doc__ + + assert 'timezone' in docstring.lower() + assert 'UTC' in docstring + assert 'America/New_York' in docstring + assert 'TV_TIMEZONE' in docstring diff --git a/tvDatafeed/main.py b/tvDatafeed/main.py index a696d15..ca03e01 100644 --- a/tvDatafeed/main.py +++ b/tvDatafeed/main.py @@ -11,11 +11,28 @@ import os import re import time -from typing import Optional +from typing import Optional, Union import pandas as pd from websocket import create_connection import requests +# Timezone handling: prefer zoneinfo (Python 3.9+), fallback to pytz +ZONEINFO_AVAILABLE = False +PYTZ_AVAILABLE = False + +try: + from zoneinfo import ZoneInfo + ZONEINFO_AVAILABLE = True +except ImportError: + pass + +if not ZONEINFO_AVAILABLE: + try: + import pytz + PYTZ_AVAILABLE = True + except ImportError: + pass + from .exceptions import ( AuthenticationError, CaptchaRequiredError, @@ -45,6 +62,57 @@ logger = logging.getLogger(__name__) +def _get_timezone_object(tz_name: str): + """ + Get a timezone object from a timezone name string. + + Parameters + ---------- + tz_name : str + Timezone name (e.g., 'UTC', 'America/New_York', 'Europe/Paris') + + Returns + ------- + timezone object + A timezone object compatible with datetime.fromtimestamp() + + Raises + ------ + ConfigurationError + If the timezone name is invalid or timezone libraries are not available + """ + if tz_name == 'UTC': + return datetime.timezone.utc + + if ZONEINFO_AVAILABLE: + try: + return ZoneInfo(tz_name) + except KeyError: + raise ConfigurationError( + "timezone", + tz_name, + f"Invalid timezone: '{tz_name}'. Use a valid IANA timezone name " + f"(e.g., 'UTC', 'America/New_York', 'Europe/Paris')." + ) + elif PYTZ_AVAILABLE: + try: + return pytz.timezone(tz_name) + except pytz.exceptions.UnknownTimeZoneError: + raise ConfigurationError( + "timezone", + tz_name, + f"Invalid timezone: '{tz_name}'. Use a valid IANA timezone name " + f"(e.g., 'UTC', 'America/New_York', 'Europe/Paris')." + ) + else: + raise ConfigurationError( + "timezone", + tz_name, + "Timezone conversion requires Python 3.9+ (zoneinfo) or pytz library. " + "Install pytz with: pip install pytz" + ) + + class Interval(enum.Enum): in_1_minute = "1" in_3_minute = "3" @@ -663,7 +731,8 @@ def __create_df( raw_data: str, symbol: str, interval_len: Optional[int] = None, - time_zone: Optional[str] = None + time_zone: Optional[str] = None, + tz_object: Optional[datetime.timezone] = None ) -> Optional[pd.DataFrame]: """ Create pandas DataFrame from raw WebSocket data @@ -678,7 +747,11 @@ def __create_df( Interval length in seconds (for timezone-aware data) time_zone : str, optional Timezone name (e.g., 'America/New_York', 'UTC') - If provided, adds timezone metadata to the DataFrame + Used for DataFrame metadata (df.attrs['timezone']) + tz_object : timezone object, optional + Timezone object for datetime conversion. + If provided, timestamps will be converted to this timezone. + If None, timestamps use the local system timezone (backward compatible). Returns ------- @@ -690,6 +763,10 @@ def __create_df( The DataFrame includes columns: symbol, datetime (index), open, high, low, close, volume. If time_zone is provided, timezone information is added to the DataFrame metadata. + + Timezone behavior: + - If tz_object is provided: datetimes are timezone-aware in that timezone + - If tz_object is None: datetimes are timezone-naive in local time (default) """ try: out = re.search(r'"s":\[(.+?)\}\]', raw_data).group(1) @@ -699,7 +776,16 @@ def __create_df( for xi in x: xi = re.split(r"\[|:|,|\]", xi) - ts = datetime.datetime.fromtimestamp(float(xi[4])) + # Convert Unix timestamp to datetime + # TradingView sends timestamps as Unix epoch (seconds) + unix_ts = float(xi[4]) + + if tz_object is not None: + # Timezone-aware datetime in specified timezone + ts = datetime.datetime.fromtimestamp(unix_ts, tz=tz_object) + else: + # Timezone-naive datetime in local time (backward compatible) + ts = datetime.datetime.fromtimestamp(unix_ts) row = [ts] @@ -953,6 +1039,7 @@ def get_hist( extended_session: bool = False, start_date: Optional[datetime.datetime] = None, end_date: Optional[datetime.datetime] = None, + timezone: Optional[str] = None, ) -> Optional[pd.DataFrame]: """Get historical OHLCV data from TradingView @@ -979,12 +1066,26 @@ def get_hist( end_date : datetime, optional End date for historical data query. Mutually exclusive with n_bars. Must be used together with start_date. Can be timezone-aware or naive. + timezone : str, optional + Timezone for the datetime index in the returned DataFrame. + Use IANA timezone names (e.g., 'UTC', 'America/New_York', 'Europe/Paris'). + Can also be set via TV_TIMEZONE environment variable. + Priority: parameter > environment variable > None (local system timezone). + If None, timestamps are in local system timezone (backward compatible). + Common values: + - 'UTC': Coordinated Universal Time + - 'America/New_York': US Eastern Time (EST/EDT) + - 'Europe/London': UK time (GMT/BST) + - 'Asia/Tokyo': Japan Standard Time Returns ------- pd.DataFrame or None DataFrame with columns: symbol, datetime (index), open, high, low, close, volume - If start_date/end_date used, DataFrame includes timezone metadata in df.attrs. + The datetime index timezone depends on the `timezone` parameter: + - If timezone is specified: datetime index is timezone-aware in that timezone + - If timezone is None: datetime index is timezone-naive in local time + Timezone metadata is stored in df.attrs['timezone']. Returns None if no data is available. Raises @@ -995,6 +1096,8 @@ def get_hist( If only one of start_date/end_date is provided InvalidIntervalError If interval is not supported + ConfigurationError + If timezone name is invalid or timezone library not available WebSocketError If WebSocket connection fails DataNotFoundError @@ -1002,32 +1105,54 @@ def get_hist( Examples -------- - >>> # Using n_bars (traditional method) + >>> # Using n_bars (traditional method) - local timezone (default) >>> tv = TvDatafeed() >>> df = tv.get_hist('BTCUSDT', 'BINANCE', Interval.in_1_hour, n_bars=100) >>> print(df.head()) >>> - >>> # Using date range (new method) + >>> # Get data in UTC timezone + >>> df = tv.get_hist('BTCUSDT', 'BINANCE', Interval.in_1_hour, n_bars=100, timezone='UTC') + >>> print(df.head()) # Datetime index is now in UTC + >>> print(f"Timezone: {df.attrs.get('timezone')}") # Output: UTC + >>> + >>> # Get data in US Eastern timezone + >>> df = tv.get_hist('AAPL', 'NASDAQ', Interval.in_daily, n_bars=50, + ... timezone='America/New_York') + >>> print(df.head()) # Datetime index is in EST/EDT + >>> + >>> # Using date range with timezone >>> from datetime import datetime >>> df = tv.get_hist( ... 'BTCUSDT', 'BINANCE', Interval.in_1_hour, ... start_date=datetime(2024, 1, 1), - ... end_date=datetime(2024, 1, 31) + ... end_date=datetime(2024, 1, 31), + ... timezone='UTC' ... ) >>> print(df.head()) - >>> print(f"Timezone: {df.attrs.get('timezone', 'Not set')}") Notes ----- - n_bars and date range (start_date/end_date) are mutually exclusive - Date range validation: start < end, no future dates, after 2000-01-01 - TradingView API applies a -30min adjustment to timestamps internally - - Timezone information is preserved in df.attrs when using date ranges + - Timezone can be set via TV_TIMEZONE environment variable + - Requires Python 3.9+ (zoneinfo) or pytz for non-UTC timezones """ # Validate inputs symbol = Validators.validate_symbol(symbol, allow_formatted=True) exchange = Validators.validate_exchange(exchange) + # Resolve timezone: parameter > environment variable > None (local) + tz_name = timezone + if tz_name is None: + tz_name = os.getenv('TV_TIMEZONE') + + # Get timezone object if specified + tz_object = None + if tz_name: + tz_object = _get_timezone_object(tz_name) + logger.debug(f"Using timezone: {tz_name}") + # Validate interval if not isinstance(interval, Interval): raise InvalidIntervalError( @@ -1187,11 +1312,14 @@ def get_hist( # Get interval length for timezone metadata interval_seconds = interval_len.get(interval) - # Create DataFrame with timezone metadata if using date range - if using_date_range: - df = self.__create_df(raw_data, symbol, interval_seconds, "exchange") - else: - df = self.__create_df(raw_data, symbol) + # Create DataFrame with timezone conversion and metadata + df = self.__create_df( + raw_data=raw_data, + symbol=symbol, + interval_len=interval_seconds, + time_zone=tz_name, + tz_object=tz_object + ) if df is None: logger.warning(f"No data returned for {symbol} on {exchange}")