diff --git a/external/vsi_common b/external/vsi_common index e4366026..d230000f 160000 --- a/external/vsi_common +++ b/external/vsi_common @@ -1 +1 @@ -Subproject commit e43660266069db234e17f1fb767d052ca779b78b +Subproject commit d230000f75bd4ae535373246448f2f521ce6ae1a diff --git a/terra/compute/base.py b/terra/compute/base.py index f5ca6b53..d4ae5c2f 100644 --- a/terra/compute/base.py +++ b/terra/compute/base.py @@ -304,6 +304,7 @@ def configure_logger(sender, **kwargs): os.makedirs(settings.processing_dir, exist_ok=True) if settings.logging.server.family == "AF_UNIX": + # Test filesystem handles named sockets socket_dir = Path(settings.logging.server.listen_address).parent os.makedirs(socket_dir, exist_ok=True) temp_filename = socket_dir / ( @@ -334,11 +335,16 @@ def configure_logger(sender, **kwargs): sender.tcp_logging_server = LogRecordSocketReceiver( settings.logging.server.listen_address, settings.logging.server.family) - # Get and store the value of the port used, so the runners/tasks will be - # able to connect - if settings.logging.server.port == 0: - settings.logging.server.port = \ - sender.tcp_logging_server.socket.getsockname()[1] + + if settings.logging.server.family.startswith("AF_INET"): + # Get and store the value of the port used, so the runners/tasks will + # be able to connect + if settings.logging.server.listen_address[1] == 0: + settings.logging.server.listen_address = ( + settings.logging.server.listen_address[0], + sender.tcp_logging_server.socket.getsockname()[1] + ) + listener_thread = threading.Thread( target=sender.tcp_logging_server.serve_until_stopped) listener_thread.daemon = True @@ -364,12 +370,16 @@ def cleanup_thread(): "gracefully. Attempting to exit anyways.", RuntimeWarning) elif settings.terra.zone == 'runner': - if settings.logging.server.family == 'AF_UNIX': + if settings.logging.server.family in ('AF_UNIX', 'AF_PIPE'): sender.main_log_handler = SocketHandler( settings.logging.server.listen_address, None) - else: + elif settings.logging.server.family in ('AF_INET', 'AF_INET6'): sender.main_log_handler = SocketHandler( - settings.logging.server.hostname, settings.logging.server.port) + settings.logging.server.hostname, + settings.logging.server.listen_address[0]) + else: + raise Exception( + f"Server family {settings.logging.server.family} not supported") # All runners have access to the master controller's stderr by virtue of # running on the same host. By default, we go ahead and let them log # there. Consequently, there is no need for the master controller to echo @@ -401,9 +411,12 @@ def reconfigure_logger(sender, **kwargs): sender._log_file.close() sender._log_file = open(log_file, 'a') elif settings.terra.zone == 'runner': - # Only if it's changed - if settings.logging.server.hostname != sender.main_log_handler.host or \ - settings.logging.server.port != sender.main_log_handler.port: + # Only if it's changed (shared worker across multiple terra runs support) + # Primarily this is only celery which is only going to work via TCP + if settings.logging.server.family.startswith('INET') and ( + settings.logging.server.hostname != sender.main_log_handler.host + or settings.logging.server.listen_address[1] + != sender.main_log_handler.port): # Reconnect Socket Handler sender.main_log_handler.close() try: @@ -412,7 +425,8 @@ def reconfigure_logger(sender, **kwargs): pass sender.main_log_handler = SocketHandler( - settings.logging.server.hostname, settings.logging.server.port) + settings.logging.server.hostname, + settings.logging.server.listen_address[1]) sender.root_logger.addHandler(sender.main_log_handler) diff --git a/terra/core/settings.py b/terra/core/settings.py index 09d85f87..b77190eb 100644 --- a/terra/core/settings.py +++ b/terra/core/settings.py @@ -385,20 +385,26 @@ def logging_listen_address(self): logger will listen. In some environments this may need to be overridden (e.g., ``0.0.0.0``) to ensure appropriate capture of service & task logs. ''' - if platform.system() == "Windows": - # Python still doesn't support named BSD sockets, even though Windows has - # since 2019: https://github.com/python/cpython/issues/77589 - return (self.logging.server.listen_host, self.logging.server.port) - else: - return str(Path(self.processing_dir) / ( - ".terra_log_" + self.terra.uuid + ".sock")) + match self.logging.server.family: + case 'AF_INET' | 'AF_INET6': + return (self.logging.server.listen_host, self.logging.server.port) + case 'AF_UNIX': + return str(Path(self.processing_dir) / ( + ".terra_log_" + self.terra.uuid + ".sock")) + case 'AF_PIPE': + return f'\\\\.\\pipe\\terra-log-{self.terra.uuid}' + case _: + raise ValueError( + f'Unknown logging.server.family {self.logging.server.family}') @settings_property def logging_family(self): - if isinstance(self.logging.server.listen_address, str): + if platform.system() == "Windows": + # return 'AF_PIPE' + return 'AF_INET' + else: return 'AF_UNIX' - return 'AF_INET' @settings_property diff --git a/terra/executor/celery/executor.py b/terra/executor/celery/executor.py index f781d4f2..6c88450a 100644 --- a/terra/executor/celery/executor.py +++ b/terra/executor/celery/executor.py @@ -278,7 +278,7 @@ def reconfigure_logger(sender, pre_run_task=False, pass sender.main_log_handler = SocketHandler( settings.logging.server.hostname, - settings.logging.server.port) + settings.logging.server.listen_address[1]) sender.root_logger.addHandler(sender.main_log_handler) if post_settings_context: # when the celery task is done, its logger is automatically diff --git a/terra/tests/test_logger.py b/terra/tests/test_logger.py index 41e12e79..65c94c6f 100644 --- a/terra/tests/test_logger.py +++ b/terra/tests/test_logger.py @@ -51,9 +51,10 @@ def test_double_configure(self): self._logs.configure_logger(None) def test_port_0(self): - settings.configure({'logging': {'server': {'port': 0}}, + settings.configure({'logging': {'server': {'port': 0, + 'family': 'AF_INET'}}, 'processing_dir': self.temp_dir.name}) - self.assertEqual(settings.logging.server.port, 67890) + self.assertEqual(settings.logging.server.listen_address[1], 67890) def test_temp_file_cleanup(self): tmp_file = self._logs.tmp_file.name