From 251e3a92734c2e387f1b3568b1f90d041686dda2 Mon Sep 17 00:00:00 2001 From: Andy Neff Date: Sun, 7 Dec 2025 13:38:28 -0500 Subject: [PATCH 1/3] Remove all referenece to listen_server.port Signed-off-by: Andy Neff --- terra/compute/base.py | 36 ++++++++++++++++++++----------- terra/executor/celery/executor.py | 2 +- terra/tests/test_logger.py | 2 +- 3 files changed, 26 insertions(+), 14 deletions(-) diff --git a/terra/compute/base.py b/terra/compute/base.py index f5ca6b53..7e5005e0 100644 --- a/terra/compute/base.py +++ b/terra/compute/base.py @@ -304,6 +304,7 @@ def configure_logger(sender, **kwargs): os.makedirs(settings.processing_dir, exist_ok=True) if settings.logging.server.family == "AF_UNIX": + # Test filesystem handles named sockets socket_dir = Path(settings.logging.server.listen_address).parent os.makedirs(socket_dir, exist_ok=True) temp_filename = socket_dir / ( @@ -334,11 +335,16 @@ def configure_logger(sender, **kwargs): sender.tcp_logging_server = LogRecordSocketReceiver( settings.logging.server.listen_address, settings.logging.server.family) - # Get and store the value of the port used, so the runners/tasks will be - # able to connect - if settings.logging.server.port == 0: - settings.logging.server.port = \ - sender.tcp_logging_server.socket.getsockname()[1] + + if settings.logging.server.family.startswith("AF_INET"): + # Get and store the value of the port used, so the runners/tasks will be + # able to connect + if settings.logging.server.listen_address[1] == 0: + settings.logging.server.listen_address = ( + settings.logging.server.listen_address[0], + sender.tcp_logging_server.socket.getsockname()[1] + ) + listener_thread = threading.Thread( target=sender.tcp_logging_server.serve_until_stopped) listener_thread.daemon = True @@ -364,12 +370,14 @@ def cleanup_thread(): "gracefully. Attempting to exit anyways.", RuntimeWarning) elif settings.terra.zone == 'runner': - if settings.logging.server.family == 'AF_UNIX': + if settings.logging.server.family in ('AF_UNIX', 'AF_PIPE'): sender.main_log_handler = SocketHandler( settings.logging.server.listen_address, None) - else: + elif settings.logging.server.family in ('AF_INET', 'AF_INET5'): sender.main_log_handler = SocketHandler( - settings.logging.server.hostname, settings.logging.server.port) + settings.logging.server.hostname, settings.logging.server.listen_address[0]) + else: + raise Exception(f"Server family {settings.logging.server.family} not supported") # All runners have access to the master controller's stderr by virtue of # running on the same host. By default, we go ahead and let them log # there. Consequently, there is no need for the master controller to echo @@ -401,9 +409,12 @@ def reconfigure_logger(sender, **kwargs): sender._log_file.close() sender._log_file = open(log_file, 'a') elif settings.terra.zone == 'runner': - # Only if it's changed - if settings.logging.server.hostname != sender.main_log_handler.host or \ - settings.logging.server.port != sender.main_log_handler.port: + # Only if it's changed (shared worker across multiple terra runs support) + # Primarily this is only celery which is only going to work via TCP + if settings.logging.server.family.startswith('INET') and ( + settings.logging.server.hostname != sender.main_log_handler.host or + settings.logging.server.listen_address[1] != sender.main_log_handler.port + ): # Reconnect Socket Handler sender.main_log_handler.close() try: @@ -412,7 +423,8 @@ def reconfigure_logger(sender, **kwargs): pass sender.main_log_handler = SocketHandler( - settings.logging.server.hostname, settings.logging.server.port) + settings.logging.server.hostname, + settings.logging.server.listen_address[1]) sender.root_logger.addHandler(sender.main_log_handler) diff --git a/terra/executor/celery/executor.py b/terra/executor/celery/executor.py index f781d4f2..6c88450a 100644 --- a/terra/executor/celery/executor.py +++ b/terra/executor/celery/executor.py @@ -278,7 +278,7 @@ def reconfigure_logger(sender, pre_run_task=False, pass sender.main_log_handler = SocketHandler( settings.logging.server.hostname, - settings.logging.server.port) + settings.logging.server.listen_address[1]) sender.root_logger.addHandler(sender.main_log_handler) if post_settings_context: # when the celery task is done, its logger is automatically diff --git a/terra/tests/test_logger.py b/terra/tests/test_logger.py index 41e12e79..3a67cfa7 100644 --- a/terra/tests/test_logger.py +++ b/terra/tests/test_logger.py @@ -53,7 +53,7 @@ def test_double_configure(self): def test_port_0(self): settings.configure({'logging': {'server': {'port': 0}}, 'processing_dir': self.temp_dir.name}) - self.assertEqual(settings.logging.server.port, 67890) + self.assertEqual(settings.logging.server.listen_address[1], 67890) def test_temp_file_cleanup(self): tmp_file = self._logs.tmp_file.name From 5b36b26372cb2ca08b560ad08241aef0e39b42d0 Mon Sep 17 00:00:00 2001 From: Andy Neff Date: Tue, 30 Dec 2025 12:42:20 -0500 Subject: [PATCH 2/3] Fix CI Signed-off-by: Andy Neff --- external/vsi_common | 2 +- terra/core/settings.py | 23 ++++++++++++++--------- terra/tests/test_logger.py | 2 +- 3 files changed, 16 insertions(+), 11 deletions(-) diff --git a/external/vsi_common b/external/vsi_common index 19a35a71..d230000f 160000 --- a/external/vsi_common +++ b/external/vsi_common @@ -1 +1 @@ -Subproject commit 19a35a71164135ea8c23a24f347b0ce9bd1246a6 +Subproject commit d230000f75bd4ae535373246448f2f521ce6ae1a diff --git a/terra/core/settings.py b/terra/core/settings.py index abb7e7e9..16cff924 100644 --- a/terra/core/settings.py +++ b/terra/core/settings.py @@ -384,20 +384,25 @@ def logging_listen_address(self): logger will listen. In some environments this may need to be overridden (e.g., ``0.0.0.0``) to ensure appropriate capture of service & task logs. ''' - if platform.system() == "Windows": - # Python still doesn't support named BSD sockets, even though Windows has - # since 2019: https://github.com/python/cpython/issues/77589 - return (self.logging.server.listen_host, self.logging.server.port) - else: - return str(Path(self.processing_dir) / ( - ".terra_log_" + self.terra.uuid + ".sock")) + match self.logging.server.family: + case 'AF_INET' | 'AF_INET6': + return (self.logging.server.listen_host, self.logging.server.port) + case 'AF_UNIX': + return str(Path(self.processing_dir) / ( + ".terra_log_" + self.terra.uuid + ".sock")) + case 'AF_PIPE': + return f'\\\\.\\pipe\\terra-log-{self.terra.uuid}' + case _: + raise ValueError(f'Unknown logging.server.family {self.logging.server.family}') @settings_property def logging_family(self): - if isinstance(self.logging.server.listen_address, str): + if platform.system() == "Windows": + # return 'AF_PIPE' + return 'AF_INET' + else: return 'AF_UNIX' - return 'AF_INET' @settings_property diff --git a/terra/tests/test_logger.py b/terra/tests/test_logger.py index 3a67cfa7..d27839a6 100644 --- a/terra/tests/test_logger.py +++ b/terra/tests/test_logger.py @@ -51,7 +51,7 @@ def test_double_configure(self): self._logs.configure_logger(None) def test_port_0(self): - settings.configure({'logging': {'server': {'port': 0}}, + settings.configure({'logging': {'server': {'port': 0, 'family': 'AF_INET'}}, 'processing_dir': self.temp_dir.name}) self.assertEqual(settings.logging.server.listen_address[1], 67890) From 625ef563bbad93bbedb945bea354eee4324e8717 Mon Sep 17 00:00:00 2001 From: Andy Neff Date: Tue, 30 Dec 2025 12:54:46 -0500 Subject: [PATCH 3/3] pep8 Signed-off-by: Andy Neff --- terra/compute/base.py | 18 ++++++++++-------- terra/core/settings.py | 3 ++- terra/tests/test_logger.py | 3 ++- 3 files changed, 14 insertions(+), 10 deletions(-) diff --git a/terra/compute/base.py b/terra/compute/base.py index 7e5005e0..d4ae5c2f 100644 --- a/terra/compute/base.py +++ b/terra/compute/base.py @@ -337,8 +337,8 @@ def configure_logger(sender, **kwargs): settings.logging.server.family) if settings.logging.server.family.startswith("AF_INET"): - # Get and store the value of the port used, so the runners/tasks will be - # able to connect + # Get and store the value of the port used, so the runners/tasks will + # be able to connect if settings.logging.server.listen_address[1] == 0: settings.logging.server.listen_address = ( settings.logging.server.listen_address[0], @@ -373,11 +373,13 @@ def cleanup_thread(): if settings.logging.server.family in ('AF_UNIX', 'AF_PIPE'): sender.main_log_handler = SocketHandler( settings.logging.server.listen_address, None) - elif settings.logging.server.family in ('AF_INET', 'AF_INET5'): + elif settings.logging.server.family in ('AF_INET', 'AF_INET6'): sender.main_log_handler = SocketHandler( - settings.logging.server.hostname, settings.logging.server.listen_address[0]) + settings.logging.server.hostname, + settings.logging.server.listen_address[0]) else: - raise Exception(f"Server family {settings.logging.server.family} not supported") + raise Exception( + f"Server family {settings.logging.server.family} not supported") # All runners have access to the master controller's stderr by virtue of # running on the same host. By default, we go ahead and let them log # there. Consequently, there is no need for the master controller to echo @@ -412,9 +414,9 @@ def reconfigure_logger(sender, **kwargs): # Only if it's changed (shared worker across multiple terra runs support) # Primarily this is only celery which is only going to work via TCP if settings.logging.server.family.startswith('INET') and ( - settings.logging.server.hostname != sender.main_log_handler.host or - settings.logging.server.listen_address[1] != sender.main_log_handler.port - ): + settings.logging.server.hostname != sender.main_log_handler.host + or settings.logging.server.listen_address[1] + != sender.main_log_handler.port): # Reconnect Socket Handler sender.main_log_handler.close() try: diff --git a/terra/core/settings.py b/terra/core/settings.py index 16cff924..ffb6568b 100644 --- a/terra/core/settings.py +++ b/terra/core/settings.py @@ -393,7 +393,8 @@ def logging_listen_address(self): case 'AF_PIPE': return f'\\\\.\\pipe\\terra-log-{self.terra.uuid}' case _: - raise ValueError(f'Unknown logging.server.family {self.logging.server.family}') + raise ValueError( + f'Unknown logging.server.family {self.logging.server.family}') @settings_property diff --git a/terra/tests/test_logger.py b/terra/tests/test_logger.py index d27839a6..65c94c6f 100644 --- a/terra/tests/test_logger.py +++ b/terra/tests/test_logger.py @@ -51,7 +51,8 @@ def test_double_configure(self): self._logs.configure_logger(None) def test_port_0(self): - settings.configure({'logging': {'server': {'port': 0, 'family': 'AF_INET'}}, + settings.configure({'logging': {'server': {'port': 0, + 'family': 'AF_INET'}}, 'processing_dir': self.temp_dir.name}) self.assertEqual(settings.logging.server.listen_address[1], 67890)