Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 26 additions & 12 deletions terra/compute/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,7 @@ def configure_logger(sender, **kwargs):
os.makedirs(settings.processing_dir, exist_ok=True)

if settings.logging.server.family == "AF_UNIX":
# Test filesystem handles named sockets
socket_dir = Path(settings.logging.server.listen_address).parent
os.makedirs(socket_dir, exist_ok=True)
temp_filename = socket_dir / (
Expand Down Expand Up @@ -334,11 +335,16 @@ def configure_logger(sender, **kwargs):
sender.tcp_logging_server = LogRecordSocketReceiver(
settings.logging.server.listen_address,
settings.logging.server.family)
# Get and store the value of the port used, so the runners/tasks will be
# able to connect
if settings.logging.server.port == 0:
settings.logging.server.port = \
sender.tcp_logging_server.socket.getsockname()[1]

if settings.logging.server.family.startswith("AF_INET"):
# Get and store the value of the port used, so the runners/tasks will
# be able to connect
if settings.logging.server.listen_address[1] == 0:
settings.logging.server.listen_address = (
settings.logging.server.listen_address[0],
sender.tcp_logging_server.socket.getsockname()[1]
)

listener_thread = threading.Thread(
target=sender.tcp_logging_server.serve_until_stopped)
listener_thread.daemon = True
Expand All @@ -364,12 +370,16 @@ def cleanup_thread():
"gracefully. Attempting to exit anyways.",
RuntimeWarning)
elif settings.terra.zone == 'runner':
if settings.logging.server.family == 'AF_UNIX':
if settings.logging.server.family in ('AF_UNIX', 'AF_PIPE'):
sender.main_log_handler = SocketHandler(
settings.logging.server.listen_address, None)
else:
elif settings.logging.server.family in ('AF_INET', 'AF_INET6'):
sender.main_log_handler = SocketHandler(
settings.logging.server.hostname, settings.logging.server.port)
settings.logging.server.hostname,
settings.logging.server.listen_address[0])
else:
raise Exception(
f"Server family {settings.logging.server.family} not supported")
# All runners have access to the master controller's stderr by virtue of
# running on the same host. By default, we go ahead and let them log
# there. Consequently, there is no need for the master controller to echo
Expand Down Expand Up @@ -401,9 +411,12 @@ def reconfigure_logger(sender, **kwargs):
sender._log_file.close()
sender._log_file = open(log_file, 'a')
elif settings.terra.zone == 'runner':
# Only if it's changed
if settings.logging.server.hostname != sender.main_log_handler.host or \
settings.logging.server.port != sender.main_log_handler.port:
# Only if it's changed (shared worker across multiple terra runs support)
# Primarily this is only celery which is only going to work via TCP
if settings.logging.server.family.startswith('INET') and (
settings.logging.server.hostname != sender.main_log_handler.host
or settings.logging.server.listen_address[1]
!= sender.main_log_handler.port):
# Reconnect Socket Handler
sender.main_log_handler.close()
try:
Expand All @@ -412,7 +425,8 @@ def reconfigure_logger(sender, **kwargs):
pass

sender.main_log_handler = SocketHandler(
settings.logging.server.hostname, settings.logging.server.port)
settings.logging.server.hostname,
settings.logging.server.listen_address[1])
sender.root_logger.addHandler(sender.main_log_handler)


Expand Down
24 changes: 15 additions & 9 deletions terra/core/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -385,20 +385,26 @@ def logging_listen_address(self):
logger will listen. In some environments this may need to be overridden
(e.g., ``0.0.0.0``) to ensure appropriate capture of service & task logs.
'''
if platform.system() == "Windows":
# Python still doesn't support named BSD sockets, even though Windows has
# since 2019: https://github.com/python/cpython/issues/77589
return (self.logging.server.listen_host, self.logging.server.port)
else:
return str(Path(self.processing_dir) / (
".terra_log_" + self.terra.uuid + ".sock"))
match self.logging.server.family:
case 'AF_INET' | 'AF_INET6':
return (self.logging.server.listen_host, self.logging.server.port)
case 'AF_UNIX':
return str(Path(self.processing_dir) / (
".terra_log_" + self.terra.uuid + ".sock"))
case 'AF_PIPE':
return f'\\\\.\\pipe\\terra-log-{self.terra.uuid}'
case _:
raise ValueError(
f'Unknown logging.server.family {self.logging.server.family}')


@settings_property
def logging_family(self):
if isinstance(self.logging.server.listen_address, str):
if platform.system() == "Windows":
# return 'AF_PIPE'
return 'AF_INET'
else:
return 'AF_UNIX'
return 'AF_INET'


@settings_property
Expand Down
2 changes: 1 addition & 1 deletion terra/executor/celery/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,7 @@ def reconfigure_logger(sender, pre_run_task=False,
pass
sender.main_log_handler = SocketHandler(
settings.logging.server.hostname,
settings.logging.server.port)
settings.logging.server.listen_address[1])
sender.root_logger.addHandler(sender.main_log_handler)
if post_settings_context:
# when the celery task is done, its logger is automatically
Expand Down
5 changes: 3 additions & 2 deletions terra/tests/test_logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,10 @@ def test_double_configure(self):
self._logs.configure_logger(None)

def test_port_0(self):
settings.configure({'logging': {'server': {'port': 0}},
settings.configure({'logging': {'server': {'port': 0,
'family': 'AF_INET'}},
'processing_dir': self.temp_dir.name})
self.assertEqual(settings.logging.server.port, 67890)
self.assertEqual(settings.logging.server.listen_address[1], 67890)

def test_temp_file_cleanup(self):
tmp_file = self._logs.tmp_file.name
Expand Down