From 21b6ac465450f2c140c7cdc66651cf9cc0799299 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?R=C3=A9mi=20Palancher?= Date: Wed, 8 Oct 2025 10:52:04 +0200 Subject: [PATCH 1/2] feat(log): auto-pager feature --- CHANGELOG.md | 7 + src/log/pyproject.toml | 4 +- src/log/rfl/log/__init__.py | 10 +- src/log/rfl/log/pager.py | 292 ++++++++++++++++++++++++++++++++++++ 4 files changed, 311 insertions(+), 2 deletions(-) create mode 100644 src/log/rfl/log/pager.py diff --git a/CHANGELOG.md b/CHANGELOG.md index e6bd170..ef76080 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,13 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## [unreleased] + +### Added +- log: + - Introduce AutoPager context manager for auto-paging feature on TTY (#44). + - Add dependency on `rfl.core`. + ## [1.5.0] - 2025-06-27 ### Added diff --git a/src/log/pyproject.toml b/src/log/pyproject.toml index 4891017..d131e91 100644 --- a/src/log/pyproject.toml +++ b/src/log/pyproject.toml @@ -12,7 +12,9 @@ keywords = ["utility", "basic"] authors = [ {name = "Rémi Palancher", email = "remi@rackslab.io"}, ] -dependencies = [] +dependencies = [ + "RFL.core", +] classifiers = [ "Development Status :: 5 - Production/Stable", "Environment :: Console", diff --git a/src/log/rfl/log/__init__.py b/src/log/rfl/log/__init__.py index 877fa8d..7d08cc8 100644 --- a/src/log/rfl/log/__init__.py +++ b/src/log/rfl/log/__init__.py @@ -8,8 +8,16 @@ import logging from .formatters import TTYFormatter, DaemonFormatter, auto_formatter +from .pager import AutoPager, enable_auto_paging, PagerError -__all__ = [TTYFormatter, DaemonFormatter, auto_formatter] +__all__ = [ + TTYFormatter, + DaemonFormatter, + auto_formatter, + AutoPager, + enable_auto_paging, + PagerError, +] def setup_logger( diff --git a/src/log/rfl/log/pager.py b/src/log/rfl/log/pager.py new file mode 100644 index 0000000..dd9d282 --- /dev/null +++ b/src/log/rfl/log/pager.py @@ -0,0 +1,292 @@ +# Copyright (c) 2025 Rackslab +# +# This file is part of RFL. +# +# SPDX-License-Identifier: GPL-3.0-or-later + +import os +import sys +import subprocess +import shutil +import typing as t +import logging +import atexit + +from rfl.core.errors import RFLError + + +class PagerError(RFLError): + """Exception raised when pager operations fail.""" + + pass + + +class AutoPager: + """ + Auto-paging utility that redirects all program output (stdout/stderr) to a pager. + + This class provides automatic paging functionality for all program outputs, + including both logging and non-logging output. It automatically detects + the best available pager and handles terminal capabilities. + """ + + def __init__( + self, + pager: t.Optional[str] = None, + auto_detect: bool = True, + fallback_to_cat: bool = True, + ): + """ + Initialize the pager. + + Args: + pager: Specific pager command to use (e.g., 'less', 'more', 'cat') + auto_detect: Whether to automatically detect the best available pager + fallback_to_cat: Whether to fallback to 'cat' if no pager is found + """ + self.pager = pager + self.auto_detect = auto_detect + self.fallback_to_cat = fallback_to_cat + self._original_stdout = None + self._original_stderr = None + self._pager_process = None + self._pipe_read = None + self._pipe_write = None + self._logging_handlers_original_streams = [] + + def _detect_pager(self) -> str: + """ + Detect the best available pager for the current environment. + + Returns: + The pager command to use + + Raises: + PagerError: If no suitable pager is found + """ + # Check if a specific pager was requested + if self.pager: + if shutil.which(self.pager): + return self.pager + else: + raise PagerError(f"Requested pager '{self.pager}' not found") + + # Check environment variables + env_pager = os.environ.get("PAGER") + if env_pager and shutil.which(env_pager): + return env_pager + + # Check common pagers in order of preference + preferred_pagers = ["pager", "less", "more", "most", "pg"] + for pager in preferred_pagers: + if shutil.which(pager): + return pager + + # Fallback to cat if enabled + if self.fallback_to_cat and shutil.which("cat"): + return "cat" + + raise PagerError("No suitable pager found") + + def _is_tty(self) -> bool: + """Check if stdout is connected to a TTY.""" + return sys.stdout.isatty() + + def _should_page(self) -> bool: + """ + Determine if output should be paged. + + Returns: + True if output should be paged, False otherwise + """ + # Don't page if not a TTY + if not self._is_tty(): + return False + + # Don't page if explicitly disabled + if os.environ.get("NO_PAGER"): + return False + + return True + + def start(self) -> None: + """ + Start the pager and redirect stdout/stderr to it. + + Raises: + PagerError: If pager cannot be started + """ + if not self._should_page(): + return + + try: + pager_cmd = self._detect_pager() + except PagerError: + # If no pager is available, continue without paging + return + + # Store original streams + self._original_stdout = sys.stdout + self._original_stderr = sys.stderr + + # Create anonymous pipe + self._pipe_read, self._pipe_write = os.pipe() + + # Prepare environment for pager with enhanced settings + pager_env = os.environ.copy() + # Selected environment variables for better user experience: + # LESS: + # - F: quit if less than one screen + # - R: interpret raw control chars (eg. ANSI colors) + # - X: disable termcap init/deinit + # MORE: + # - F: quit if less than one screen + # - R: interpret raw control chars (eg. ANSI colors) + # - X: disable termcap init/deinit + # LV: + # - C: color mode + pager_env.update({"LESS": "FRX", "MORE": "FRX", "LV": "C"}) + + # Start pager process + self._pager_process = subprocess.Popen( + [pager_cmd], + stdin=self._pipe_read, + stdout=self._original_stdout, + stderr=self._original_stderr, + text=True, + env=pager_env, + ) + + # Close the read end in parent process + os.close(self._pipe_read) + self._pipe_read = None + + # Redirect stdout and stderr to the write end of the pipe + # Use line buffering to ensure short-lived outputs are flushed promptly + sys.stdout = os.fdopen(self._pipe_write, "w", buffering=1) + sys.stderr = sys.stdout + + # Redirect logging handlers that were writing to the original streams + self._redirect_logging_handlers() + + # Ensure pager is stopped and output flushed on interpreter exit + atexit.register(self.stop) + + def stop(self) -> None: + """ + Stop the pager and restore original stdout/stderr. + """ + # If paging was started (original streams saved), we must restore and flush + if self._original_stdout is None: + return + + # Close the write end of the pipe to signal EOF to pager + if sys.stdout != self._original_stdout: + sys.stdout.close() + + # Restore original streams + sys.stdout = self._original_stdout + sys.stderr = self._original_stderr + + # Restore logging handler streams + self._restore_logging_handlers() + + # Wait for pager process to complete + if self._pager_process: + try: + self._pager_process.wait() + except Exception as e: + raise PagerError(f"Failed to wait for pager process: {e}") + + # Clean up pipe write end if still open + if self._pipe_write is not None: + try: + os.close(self._pipe_write) + except OSError: + pass # Already closed + + def _redirect_logging_handlers(self) -> None: + """Redirect handlers targeting original std streams to the pager pipe. + + Stores original handler streams for later restoration. + """ + self._logging_handlers_original_streams = [] + try: + root_logger = logging.getLogger() + for handler in getattr(root_logger, "handlers", []): + handler_stream = getattr(handler, "stream", None) + if handler_stream in ( + self._original_stdout, + self._original_stderr, + ): + self._logging_handlers_original_streams.append( + (handler, handler_stream) + ) + handler.stream = sys.stdout + except Exception: + # Logging configuration may be unusual; fail safe and continue + self._logging_handlers_original_streams = [] + + def _restore_logging_handlers(self) -> None: + """Restore logging handler streams saved during redirection.""" + for handler, original_stream in self._logging_handlers_original_streams: + try: + handler.stream = original_stream + except Exception: + # Best-effort restoration + pass + self._logging_handlers_original_streams = [] + + def __enter__(self): + """Context manager entry.""" + self.start() + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + """Context manager exit.""" + self.stop() + + def __del__(self): + """Cleanup on destruction.""" + if self._pipe_write is not None: + try: + os.close(self._pipe_write) + except OSError: + pass # Already closed + if self._pager_process and self._pager_process.poll() is None: + try: + self._pager_process.terminate() + except OSError: + pass # Process already terminated + + +def enable_auto_paging( + pager: t.Optional[str] = None, + auto_detect: bool = True, + fallback_to_cat: bool = True, +) -> AutoPager: + """ + Enable auto-paging for the current program. + + This function starts paging immediately and returns a AutoPager instance + that should be stopped when paging is no longer needed. + + Args: + pager: Specific pager command to use + auto_detect: Whether to automatically detect the best available pager + fallback_to_cat: Whether to fallback to 'cat' if no pager is found + + Returns: + AutoPager instance that can be used to stop paging + + Example: + >>> from rfl.log import enable_auto_paging + >>> pager = enable_auto_paging() + >>> print("This will be paged") + >>> pager.stop() + """ + pager_instance = AutoPager( + pager=pager, auto_detect=auto_detect, fallback_to_cat=fallback_to_cat + ) + pager_instance.start() + return pager_instance From f4e394944d8461de72182440ec075359e46def80 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?R=C3=A9mi=20Palancher?= Date: Wed, 8 Oct 2025 21:26:07 +0200 Subject: [PATCH 2/2] tests(log): cover pager module --- src/log/rfl/tests/test_pager.py | 550 ++++++++++++++++++++++++++++++++ 1 file changed, 550 insertions(+) create mode 100644 src/log/rfl/tests/test_pager.py diff --git a/src/log/rfl/tests/test_pager.py b/src/log/rfl/tests/test_pager.py new file mode 100644 index 0000000..7ffcf1b --- /dev/null +++ b/src/log/rfl/tests/test_pager.py @@ -0,0 +1,550 @@ +# Copyright (c) 2025 Rackslab +# +# This file is part of RFL. +# +# SPDX-License-Identifier: GPL-3.0-or-later + +import unittest +from unittest import mock +import os +import sys +import logging + +from rfl.log.pager import AutoPager, PagerError, enable_auto_paging + + +class TestAutoPagerInit(unittest.TestCase): + """Test the AutoPager constructor.""" + + def test_init_default_parameters(self): + """Test AutoPager initialization with default parameters.""" + pager = AutoPager() + self.assertIsNone(pager.pager) + self.assertTrue(pager.fallback_to_cat) + self.assertIsNone(pager._original_stdout) + self.assertIsNone(pager._original_stderr) + self.assertIsNone(pager._pager_process) + self.assertIsNone(pager._pipe_read) + self.assertIsNone(pager._pipe_write) + self.assertEqual(pager._logging_handlers_original_streams, []) + + def test_init_custom_parameters(self): + """Test AutoPager initialization with custom parameters.""" + pager = AutoPager(pager="less", fallback_to_cat=False) + self.assertEqual(pager.pager, "less") + self.assertFalse(pager.fallback_to_cat) + + def test_init_pager_none(self): + """Test AutoPager initialization with pager=None.""" + pager = AutoPager(pager=None) + self.assertIsNone(pager.pager) + + +class TestAutoPagerStart(unittest.TestCase): + """Test the AutoPager.start() method.""" + + def setUp(self): + """Set up test environment.""" + self.original_stdout = sys.stdout + self.original_stderr = sys.stderr + self.original_environ = os.environ.copy() + + def tearDown(self): + """Clean up test environment.""" + sys.stdout = self.original_stdout + sys.stderr = self.original_stderr + os.environ.clear() + os.environ.update(self.original_environ) + + @mock.patch("sys.stdout.isatty") + @mock.patch("os.environ.get") + def test_start_no_tty(self, mock_env_get, mock_isatty): + """Test start() when not connected to a TTY.""" + mock_isatty.return_value = False + mock_env_get.return_value = None + + pager = AutoPager() + pager.start() + + # Should not start paging when not a TTY + self.assertIsNone(pager._original_stdout) + self.assertIsNone(pager._original_stderr) + + @mock.patch("sys.stdout.isatty") + @mock.patch("os.environ.get") + def test_start_no_pager_env_var(self, mock_env_get, mock_isatty): + """Test start() when NO_PAGER environment variable is set.""" + mock_isatty.return_value = True + mock_env_get.side_effect = lambda key: "1" if key == "NO_PAGER" else None + + pager = AutoPager() + pager.start() + + # Should not start paging when NO_PAGER is set + self.assertIsNone(pager._original_stdout) + self.assertIsNone(pager._original_stderr) + + @mock.patch("sys.stdout.isatty") + @mock.patch("os.environ.get") + @mock.patch("shutil.which") + @mock.patch("subprocess.Popen") + @mock.patch("os.pipe") + @mock.patch("os.close") + @mock.patch("os.fdopen") + def test_start_successful( + self, + mock_fdopen, + mock_close, + mock_pipe, + mock_popen, + mock_which, + mock_env_get, + mock_isatty, + ): + """Test successful start() with pager detection.""" + mock_isatty.return_value = True + mock_env_get.return_value = None + mock_which.return_value = "/usr/bin/less" + mock_pipe.return_value = (3, 4) # read_fd, write_fd + mock_process = mock.MagicMock() + mock_popen.return_value = mock_process + mock_file = mock.MagicMock() + mock_fdopen.return_value = mock_file + + pager = AutoPager() + pager.start() + + # Should have started paging + self.assertEqual(pager._original_stdout, self.original_stdout) + self.assertEqual(pager._original_stderr, self.original_stderr) + self.assertEqual(pager._pager_process, mock_process) + # _pipe_read is set to None after os.close() is called + self.assertIsNone(pager._pipe_read) + self.assertEqual(pager._pipe_write, 4) + + @mock.patch("sys.stdout.isatty") + @mock.patch("os.environ.get") + @mock.patch("shutil.which") + def test_start_no_pager_available(self, mock_which, mock_env_get, mock_isatty): + """Test start() when no pager is available and fallback is disabled.""" + mock_isatty.return_value = True + mock_env_get.return_value = None + mock_which.return_value = None + + pager = AutoPager(fallback_to_cat=False) + pager.start() + + # Should not start paging when no pager is available + self.assertIsNone(pager._original_stdout) + self.assertIsNone(pager._original_stderr) + + @mock.patch("sys.stdout.isatty") + @mock.patch("os.environ.get") + @mock.patch("shutil.which") + @mock.patch("subprocess.Popen") + @mock.patch("os.pipe") + @mock.patch("os.close") + @mock.patch("os.fdopen") + def test_start_with_specific_pager( + self, + mock_fdopen, + mock_close, + mock_pipe, + mock_popen, + mock_which, + mock_env_get, + mock_isatty, + ): + """Test start() with a specific pager command.""" + mock_isatty.return_value = True + mock_env_get.return_value = None + mock_which.return_value = "/usr/bin/more" + mock_pipe.return_value = (3, 4) + mock_process = mock.MagicMock() + mock_popen.return_value = mock_process + mock_file = mock.MagicMock() + mock_fdopen.return_value = mock_file + + pager = AutoPager(pager="more") + pager.start() + + # Should have started with the specific pager + mock_popen.assert_called_once() + call_args = mock_popen.call_args[0][0] + self.assertEqual(call_args, ["more"]) + + @mock.patch("sys.stdout.isatty") + @mock.patch("os.environ.get") + @mock.patch("shutil.which") + def test_start_specific_pager_not_found( + self, mock_which, mock_env_get, mock_isatty + ): + """Test start() when specific pager is not found.""" + mock_isatty.return_value = True + mock_env_get.return_value = None + mock_which.return_value = None + + pager = AutoPager(pager="nonexistent_pager") + pager.start() + + # Should not start paging when specific pager is not found + self.assertIsNone(pager._original_stdout) + self.assertIsNone(pager._original_stderr) + + +class TestAutoPagerStop(unittest.TestCase): + """Test the AutoPager.stop() method.""" + + def setUp(self): + """Set up test environment.""" + self.original_stdout = sys.stdout + self.original_stderr = sys.stderr + + def tearDown(self): + """Clean up test environment.""" + sys.stdout = self.original_stdout + sys.stderr = self.original_stderr + + def test_stop_not_started(self): + """Test stop() when pager was not started.""" + pager = AutoPager() + # Don't call start() + pager.stop() + + # Should not raise any errors + self.assertIsNone(pager._original_stdout) + + @mock.patch("subprocess.Popen") + @mock.patch("os.pipe") + @mock.patch("sys.stdout.isatty") + @mock.patch("os.environ.get") + @mock.patch("shutil.which") + @mock.patch("os.close") + @mock.patch("os.fdopen") + def test_stop_successful( + self, + mock_fdopen, + mock_close, + mock_which, + mock_env_get, + mock_isatty, + mock_pipe, + mock_popen, + ): + """Test successful stop() after start().""" + # Setup mocks + mock_isatty.return_value = True + mock_env_get.return_value = None + mock_which.return_value = "/usr/bin/less" + mock_pipe.return_value = (3, 4) + mock_process = mock.MagicMock() + mock_process.wait.return_value = 0 + mock_popen.return_value = mock_process + mock_file = mock.MagicMock() + mock_fdopen.return_value = mock_file + + pager = AutoPager() + pager.start() + pager.stop() + + # Should have restored original streams + self.assertEqual(sys.stdout, self.original_stdout) + self.assertEqual(sys.stderr, self.original_stderr) + # Should have waited for pager process + mock_process.wait.assert_called_once() + + @mock.patch("subprocess.Popen") + @mock.patch("os.pipe") + @mock.patch("sys.stdout.isatty") + @mock.patch("os.environ.get") + @mock.patch("shutil.which") + @mock.patch("os.close") + @mock.patch("os.fdopen") + def test_stop_process_wait_error( + self, + mock_fdopen, + mock_close, + mock_which, + mock_env_get, + mock_isatty, + mock_pipe, + mock_popen, + ): + """Test stop() when process.wait() raises an exception.""" + # Setup mocks + mock_isatty.return_value = True + mock_env_get.return_value = None + mock_which.return_value = "/usr/bin/less" + mock_pipe.return_value = (3, 4) + mock_process = mock.MagicMock() + mock_process.wait.side_effect = Exception("Process error") + mock_popen.return_value = mock_process + mock_file = mock.MagicMock() + mock_fdopen.return_value = mock_file + + pager = AutoPager() + pager.start() + + # Should raise PagerError when process.wait() fails + with self.assertRaises(PagerError) as context: + pager.stop() + self.assertIn("Failed to wait for pager process", str(context.exception)) + + +class TestAutoPagerContextManager(unittest.TestCase): + """Test the AutoPager context manager functionality.""" + + def setUp(self): + """Set up test environment.""" + self.original_stdout = sys.stdout + self.original_stderr = sys.stderr + + def tearDown(self): + """Clean up test environment.""" + sys.stdout = self.original_stdout + sys.stderr = self.original_stderr + + @mock.patch("sys.stdout.isatty") + @mock.patch("os.environ.get") + @mock.patch("shutil.which") + @mock.patch("subprocess.Popen") + @mock.patch("os.pipe") + @mock.patch("os.close") + @mock.patch("os.fdopen") + def test_context_manager_success( + self, + mock_fdopen, + mock_close, + mock_pipe, + mock_popen, + mock_which, + mock_env_get, + mock_isatty, + ): + """Test context manager with successful pager start/stop.""" + # Setup mocks + mock_isatty.return_value = True + mock_env_get.return_value = None + mock_which.return_value = "/usr/bin/less" + mock_pipe.return_value = (3, 4) + mock_process = mock.MagicMock() + mock_process.wait.return_value = 0 + mock_popen.return_value = mock_process + mock_file = mock.MagicMock() + mock_fdopen.return_value = mock_file + + with AutoPager() as pager: + # Should have started paging + self.assertEqual(pager._original_stdout, self.original_stdout) + self.assertEqual(pager._original_stderr, self.original_stderr) + + # Should have restored streams after context exit + self.assertEqual(sys.stdout, self.original_stdout) + self.assertEqual(sys.stderr, self.original_stderr) + + @mock.patch("sys.stdout.isatty") + @mock.patch("os.environ.get") + @mock.patch("shutil.which") + @mock.patch("subprocess.Popen") + @mock.patch("os.pipe") + @mock.patch("os.close") + @mock.patch("os.fdopen") + def test_context_manager_exception( + self, + mock_fdopen, + mock_close, + mock_pipe, + mock_popen, + mock_which, + mock_env_get, + mock_isatty, + ): + """Test context manager with exception during execution.""" + # Setup mocks + mock_isatty.return_value = True + mock_env_get.return_value = None + mock_which.return_value = "/usr/bin/less" + mock_pipe.return_value = (3, 4) + mock_process = mock.MagicMock() + mock_process.wait.return_value = 0 + mock_popen.return_value = mock_process + mock_file = mock.MagicMock() + mock_fdopen.return_value = mock_file + + with self.assertRaises(ValueError): + with AutoPager() as pager: + # Should have started paging + self.assertEqual(pager._original_stdout, self.original_stdout) + raise ValueError("Test exception") + + # Should have restored streams even after exception + self.assertEqual(sys.stdout, self.original_stdout) + self.assertEqual(sys.stderr, self.original_stderr) + + +class TestEnableAutoPaging(unittest.TestCase): + """Test the enable_auto_paging() function.""" + + def setUp(self): + """Set up test environment.""" + self.original_stdout = sys.stdout + self.original_stderr = sys.stderr + + def tearDown(self): + """Clean up test environment.""" + sys.stdout = self.original_stdout + sys.stderr = self.original_stderr + + @mock.patch("sys.stdout.isatty") + @mock.patch("os.environ.get") + @mock.patch("shutil.which") + @mock.patch("subprocess.Popen") + @mock.patch("os.pipe") + @mock.patch("os.close") + @mock.patch("os.fdopen") + def test_enable_auto_paging_default( + self, + mock_fdopen, + mock_close, + mock_pipe, + mock_popen, + mock_which, + mock_env_get, + mock_isatty, + ): + """Test enable_auto_paging() with default parameters.""" + # Setup mocks + mock_isatty.return_value = True + mock_env_get.return_value = None + mock_which.return_value = "/usr/bin/less" + mock_pipe.return_value = (3, 4) + mock_process = mock.MagicMock() + mock_popen.return_value = mock_process + mock_file = mock.MagicMock() + mock_fdopen.return_value = mock_file + + pager = enable_auto_paging() + + # Should return an AutoPager instance + self.assertIsInstance(pager, AutoPager) + # Should have started paging + self.assertEqual(pager._original_stdout, self.original_stdout) + self.assertEqual(pager._original_stderr, self.original_stderr) + + @mock.patch("sys.stdout.isatty") + @mock.patch("os.environ.get") + @mock.patch("shutil.which") + @mock.patch("subprocess.Popen") + @mock.patch("os.pipe") + @mock.patch("os.close") + @mock.patch("os.fdopen") + def test_enable_auto_paging_custom( + self, + mock_fdopen, + mock_close, + mock_pipe, + mock_popen, + mock_which, + mock_env_get, + mock_isatty, + ): + """Test enable_auto_paging() with custom parameters.""" + # Setup mocks + mock_isatty.return_value = True + mock_env_get.return_value = None + mock_which.return_value = "/usr/bin/more" + mock_pipe.return_value = (3, 4) + mock_process = mock.MagicMock() + mock_popen.return_value = mock_process + mock_file = mock.MagicMock() + mock_fdopen.return_value = mock_file + + pager = enable_auto_paging(pager="more", fallback_to_cat=False) + + # Should return an AutoPager instance with custom parameters + self.assertIsInstance(pager, AutoPager) + self.assertEqual(pager.pager, "more") + self.assertFalse(pager.fallback_to_cat) + + @mock.patch("sys.stdout.isatty") + @mock.patch("os.environ.get") + def test_enable_auto_paging_no_tty(self, mock_env_get, mock_isatty): + """Test enable_auto_paging() when not connected to a TTY.""" + mock_isatty.return_value = False + mock_env_get.return_value = None + + pager = enable_auto_paging() + + # Should return an AutoPager instance but not start paging + self.assertIsInstance(pager, AutoPager) + self.assertIsNone(pager._original_stdout) + self.assertIsNone(pager._original_stderr) + + +class TestAutoPagerLoggingHandlers(unittest.TestCase): + """Test logging handler redirection functionality.""" + + def setUp(self): + """Set up test environment.""" + self.original_stdout = sys.stdout + self.original_stderr = sys.stderr + # Clear any existing handlers + root_logger = logging.getLogger() + for handler in root_logger.handlers[:]: + root_logger.removeHandler(handler) + + def tearDown(self): + """Clean up test environment.""" + sys.stdout = self.original_stdout + sys.stderr = self.original_stderr + # Clear any existing handlers + root_logger = logging.getLogger() + for handler in root_logger.handlers[:]: + root_logger.removeHandler(handler) + + @mock.patch("sys.stdout.isatty") + @mock.patch("os.environ.get") + @mock.patch("shutil.which") + @mock.patch("subprocess.Popen") + @mock.patch("os.pipe") + @mock.patch("os.close") + @mock.patch("os.fdopen") + def test_logging_handler_redirection( + self, + mock_fdopen, + mock_close, + mock_pipe, + mock_popen, + mock_which, + mock_env_get, + mock_isatty, + ): + """Test that logging handlers are properly redirected and restored.""" + # Setup mocks + mock_isatty.return_value = True + mock_env_get.return_value = None + mock_which.return_value = "/usr/bin/less" + mock_pipe.return_value = (3, 4) + mock_process = mock.MagicMock() + mock_process.wait.return_value = 0 + mock_popen.return_value = mock_process + mock_file = mock.MagicMock() + mock_fdopen.return_value = mock_file + + # Create a logging handler that writes to stdout + handler = logging.StreamHandler(sys.stdout) + root_logger = logging.getLogger() + root_logger.addHandler(handler) + + pager = AutoPager() + pager.start() + + # Handler should be redirected to the pager pipe + self.assertEqual(handler.stream, mock_file) + self.assertEqual(len(pager._logging_handlers_original_streams), 1) + + pager.stop() + + # Handler should be restored to original stream + self.assertEqual(handler.stream, self.original_stdout) + self.assertEqual(len(pager._logging_handlers_original_streams), 0)