diff --git a/salt/__init__.py b/salt/__init__.py index 4468a442d459..9330e1ed8684 100644 --- a/salt/__init__.py +++ b/salt/__init__.py @@ -9,6 +9,8 @@ import sys import warnings +import salt.utils.versions + # Aweful hack to keep salt-ssh tests passing with tornado >=6.4.2. Salt ssh # needs to be transitioned to use a relenv environemnt by default. This should # be removed when salt-ssh uses relenv or we no longer want salt-ssh to support @@ -77,13 +79,25 @@ def exec_module(self, module): # never show up ) -# Filter the backports package UserWarning about being re-imported -warnings.filterwarnings( - "ignore", - "^Module backports was already imported from (.*), but (.*) is being added to sys.path$", - UserWarning, - append=True, -) +def _setup_backports_compat(): + """ + Ensure the optional backports namespace exists for legacy imports. + """ + try: + salt.utils.versions.ensure_backports_compat() + except Exception: # pylint: disable=broad-except + return False + return "backports" in sys.modules + + +if _setup_backports_compat(): + # Filter the backports package UserWarning about being re-imported + warnings.filterwarnings( + "ignore", + "^Module backports was already imported from (.*), but (.*) is being added to sys.path$", + UserWarning, + append=True, + ) # Filter the setuptools UserWarning until we stop relying on distutils warnings.filterwarnings( diff --git a/salt/crypt.py b/salt/crypt.py index 71585de68bab..4649e227df5c 100644 --- a/salt/crypt.py +++ b/salt/crypt.py @@ -38,6 +38,7 @@ import salt.utils.rsax931 import salt.utils.sdb import salt.utils.stringutils +import salt.utils.minion import salt.utils.user import salt.utils.verify import salt.version @@ -1420,17 +1421,24 @@ def verify_master(self, payload, master_pub=True): :return: An empty string on verification failure. On success, the decrypted AES message in the payload. """ m_pub_fn = os.path.join(self.opts["pki_dir"], self.mpub) - m_pub_exists = os.path.isfile(m_pub_fn) + local_master_pub = salt.utils.minion.read_master_pubkey(m_pub_fn) + if local_master_pub is None: + log.error("Unable to read master pubkey at %s", m_pub_fn) + return "" + payload_master_pub = salt.utils.minion.normalize_master_pubkey( + payload.get("pub_key") + ) + m_pub_exists = bool(local_master_pub) if m_pub_exists and master_pub and not self.opts["open_mode"]: - with salt.utils.files.fopen(m_pub_fn) as fp_: - local_master_pub = clean_key(fp_.read()) - - if payload["pub_key"] != local_master_pub: + if payload_master_pub != local_master_pub: if not self.check_auth_deps(payload): return "" if self.opts["verify_master_pubkey_sign"]: if self.verify_signing_master(payload): + salt.utils.minion.write_master_pubkey( + m_pub_fn, payload.get("pub_key", payload_master_pub) + ) return self.extract_aes(payload, master_pub=False) else: return "" @@ -1476,8 +1484,9 @@ def verify_master(self, payload, master_pub=True): if not m_pub_exists: # the minion has not received any masters pubkey yet, write # the newly received pubkey to minion_master.pub - with salt.utils.files.fopen(m_pub_fn, "wb+") as fp_: - fp_.write(salt.utils.stringutils.to_bytes(payload["pub_key"])) + salt.utils.minion.write_master_pubkey( + m_pub_fn, payload.get("pub_key", payload_master_pub) + ) return self.extract_aes(payload, master_pub=False) def _finger_fail(self, finger, master_key): diff --git a/salt/loader/__init__.py b/salt/loader/__init__.py index 0d32ff740d43..464a938ccda9 100644 --- a/salt/loader/__init__.py +++ b/salt/loader/__init__.py @@ -34,6 +34,20 @@ log = logging.getLogger(__name__) + +_BACKPORTS_COMPAT_READY = False + + +def _ensure_backports_compat(): + global _BACKPORTS_COMPAT_READY + if _BACKPORTS_COMPAT_READY: + return + try: + salt.utils.versions.ensure_backports_compat() + except Exception: # pylint: disable=broad-except + log.debug("Failed to set up backports compatibility", exc_info=True) + _BACKPORTS_COMPAT_READY = True + # Because on the cloud drivers we do `from salt.cloud.libcloudfuncs import *` # which simplifies code readability, it adds some unsupported functions into # the driver's module scope. @@ -127,6 +141,7 @@ def _module_dirs( base_path=None, load_extensions=True, ): + _ensure_backports_compat() if tag is None: tag = ext_type sys_types = [os.path.join(base_path or str(SALT_BASE_PATH), int_type or ext_type)] diff --git a/salt/minion.py b/salt/minion.py index aa647beac341..b122e77b7c3a 100644 --- a/salt/minion.py +++ b/salt/minion.py @@ -519,6 +519,44 @@ def process_beacons(self, functions): ) # pylint: disable=no-member return [] + @staticmethod + def _should_retry_connection_exception(exc): + """ + Decide if a connection-related exception should trigger retry logic. + """ + return salt.transport.is_retryable_connection_error(exc) + + @staticmethod + def _next_auth_wait(auth_wait, auth_wait_step, max_auth_wait): + """ + Increase auth wait time up to the configured maximum. + """ + if auth_wait < max_auth_wait: + return min(auth_wait + auth_wait_step, max_auth_wait) + return auth_wait + + def _handle_invalid_master_key(self, exc, opts=None): + """ + Provide actionable logging for invalid master key errors. + """ + if not salt.utils.minion.is_invalid_master_key_error(exc): + return False + target_opts = opts or self.opts + master_key_path = salt.utils.minion.master_pubkey_path(target_opts) + refreshed = salt.utils.minion.refresh_master_pubkey_if_invalid(target_opts) + if refreshed: + log.warning( + "Removed empty or invalid master key at %s; retrying authentication", + master_key_path, + ) + else: + log.error( + "Invalid master key detected. Verify the master key at %s. If the " + "master key rotated, remove this file to re-authenticate.", + master_key_path, + ) + return True + async def eval_master( self, opts, timeout=60, safe=True, failed=False, failback=False ): @@ -744,21 +782,38 @@ async def eval_master( await pub_channel.connect() conn = True break - except SaltClientError as exc: - last_exc = exc - if exc.strerror.startswith("Could not access"): - log.info( - "Failed to initiate connection with Master %s: check" - " ownership/permissions. Error message: %s", - opts["master"], - exc, + except Exception as exc: # pylint: disable=broad-except + if isinstance(exc, SaltClientError): + last_exc = exc + if exc.strerror.startswith("Could not access"): + log.info( + "Failed to initiate connection with Master %s: check" + " ownership/permissions. Error message: %s", + opts["master"], + exc, + ) + else: + log.info( + "Master %s could not be reached, trying next master" + " (if any)", + opts["master"], + ) + elif self._should_retry_connection_exception(exc): + last_exc = SaltClientError( + "Transient transport error while connecting to master " + f"{opts['master']}: " + f"{salt.transport.format_connection_error(exc)}" ) - else: - log.info( - "Master %s could not be reached, trying next master (if" - " any)", + log.warning( + "Master %s had a transient transport error (%s), " + "trying next master (if any)", opts["master"], + salt.transport.format_connection_error(exc), ) + else: + if pub_channel: + pub_channel.close() + raise pub_channel.close() pub_channel = None continue @@ -790,6 +845,7 @@ async def eval_master( " Ignoring." ) pub_channel = None + last_exc = None while True: if attempts != 0: # Give up a little time between connection attempts @@ -827,12 +883,32 @@ async def eval_master( self.tok = pub_channel.auth.gen_token(b"salt") self.connected = True return (opts["master"], pub_channel) - except SaltClientError: + except Exception as exc: # pylint: disable=broad-except + retryable = isinstance(exc, SaltClientError) or self._should_retry_connection_exception(exc) + if not retryable: + if pub_channel: + pub_channel.close() + raise + if isinstance(exc, SaltClientError): + last_exc = exc + else: + last_exc = SaltClientError( + "Transient transport error while connecting to master " + f"{opts['master']}: " + f"{salt.transport.format_connection_error(exc)}" + ) + log.warning( + "Transient transport error while connecting to master %s: %s", + opts["master"], + salt.transport.format_connection_error(exc), + ) if pub_channel: pub_channel.close() if attempts == tries: # Exhausted all attempts. Return exception. self.connected = False + if last_exc: + raise last_exc raise def _discover_masters(self): @@ -1161,6 +1237,7 @@ async def _connect_minion(self, minion): except SaltClientError as exc: minion.destroy() failed = True + minion._handle_invalid_master_key(exc) log.error( "Error while bringing up minion for multi-master. Is " "master at %s responding? The error message was %s", @@ -1169,8 +1246,9 @@ async def _connect_minion(self, minion): exc_info=True, ) last = time.time() - if auth_wait < self.max_auth_wait: - auth_wait += self.auth_wait + auth_wait = minion._next_auth_wait( + auth_wait, self.auth_wait, self.max_auth_wait + ) await asyncio.sleep(auth_wait) except SaltMasterUnresolvableError: minion.destroy() @@ -1182,9 +1260,24 @@ async def _connect_minion(self, minion): ) log.error(err) break - except Exception as e: # pylint: disable=broad-except + except Exception as exc: # pylint: disable=broad-except minion.destroy() failed = True + if minion._should_retry_connection_exception(exc): + log.warning( + "Transient transport error while connecting to %s; " + "retrying in %s seconds: %s", + minion.opts["master"], + auth_wait, + salt.transport.format_connection_error(exc), + exc_info=True, + ) + last = time.time() + auth_wait = minion._next_auth_wait( + auth_wait, self.auth_wait, self.max_auth_wait + ) + await asyncio.sleep(auth_wait) + continue log.critical( "Unexpected error while connecting to %s", minion.opts["master"], @@ -1440,8 +1533,20 @@ async def connect_master(self, failed=False): self.opts, io_loop=self.io_loop ) log.debug("Connecting minion's long-running req channel") - await self.req_channel.connect() - await self._post_master_init(master) + try: + await self.req_channel.connect() + await self._post_master_init(master) + except Exception as exc: # pylint: disable=broad-except + if self.req_channel: + self.req_channel.close() + self.req_channel = None + if self._should_retry_connection_exception(exc): + raise SaltClientError( + "Transient transport error while connecting minion request channel" + f" to master {self.opts.get('master')}: " + f"{salt.transport.format_connection_error(exc)}" + ) from exc + raise async def handle_payload(self, payload, reply_func): self.payloads.append(payload) @@ -3710,24 +3815,41 @@ async def _connect_syndic(self, opts): break except SaltClientError as exc: failed = True + self._handle_invalid_master_key(exc, opts=opts) log.error( "Error while bringing up syndic for multi-syndic. Is the " "master at %s responding?", opts["master"], ) last = time.time() - if auth_wait < self.max_auth_wait: - auth_wait += self.auth_wait + auth_wait = self._next_auth_wait( + auth_wait, self.auth_wait, self.max_auth_wait + ) await asyncio.sleep(auth_wait) # TODO: log? except (KeyboardInterrupt, SystemExit): # pylint: disable=try-except-raise raise - except Exception: # pylint: disable=broad-except + except Exception as exc: # pylint: disable=broad-except failed = True + if self._should_retry_connection_exception(exc): + log.warning( + "Transient transport error while connecting syndic to %s; " + "retrying in %s seconds: %s", + opts["master"], + auth_wait, + salt.transport.format_connection_error(exc), + exc_info=True, + ) + auth_wait = self._next_auth_wait( + auth_wait, self.auth_wait, self.max_auth_wait + ) + await asyncio.sleep(auth_wait) + continue log.critical( "Unexpected error while connecting to %s", opts["master"], exc_info=True, ) + break return syndic diff --git a/salt/transport/__init__.py b/salt/transport/__init__.py index 8795cb9d52af..4f0906567f17 100644 --- a/salt/transport/__init__.py +++ b/salt/transport/__init__.py @@ -2,9 +2,11 @@ Encapsulate the different transports available to Salt. """ +import asyncio import logging import warnings +import salt.utils.zeromq from salt.transport.base import ( TRANSPORTS, ipc_publish_client, @@ -23,10 +25,31 @@ "ignore", message="IOLoop.current expected instance.*", category=RuntimeWarning ) + +def is_retryable_connection_error(exc): + """ + Return ``True`` when transport setup failed due to a transient error. + """ + if isinstance(exc, asyncio.TimeoutError): + return True + return salt.utils.zeromq.is_retryable_connection_error(exc) + + +def format_connection_error(exc): + """ + Return a short, consistent error string for transport failures. + """ + if isinstance(exc, asyncio.TimeoutError): + return "transport timeout while connecting to master" + return salt.utils.zeromq.format_connection_error(exc) + + __all__ = ( "TRANSPORTS", + "format_connection_error", "ipc_publish_client", "ipc_publish_server", + "is_retryable_connection_error", "publish_client", "publish_server", "request_client", diff --git a/salt/utils/minion.py b/salt/utils/minion.py index f4d8829d7b60..4eaa66db2f0a 100644 --- a/salt/utils/minion.py +++ b/salt/utils/minion.py @@ -10,10 +10,106 @@ import salt.utils.files import salt.utils.platform import salt.utils.process +import salt.utils.stringutils log = logging.getLogger(__name__) +def normalize_master_pubkey(key): + """ + Normalize master public key values into a comparable string. + """ + if key is None: + return "" + try: + key_str = salt.utils.stringutils.to_str(key) + except Exception: # pylint: disable=broad-except + log.debug("Failed to decode master pubkey content", exc_info=True) + return "" + key_str = key_str.strip() + if not key_str: + return "" + return "\n".join(key_str.splitlines()) + + +def read_master_pubkey(path): + """ + Read and normalize the master public key from disk. + """ + try: + with salt.utils.files.fopen(path, "rb") as fp_: + data = fp_.read() + except FileNotFoundError: + return "" + except OSError as exc: + log.debug("Unable to read master pubkey at %s: %s", path, exc) + return None + normalized = normalize_master_pubkey(data) + if not normalized: + log.debug("Master pubkey at %s is empty or invalid", path) + return normalized + + +def write_master_pubkey(path, key): + """ + Write a normalized master public key to disk. + """ + normalized = normalize_master_pubkey(key) + if not normalized: + log.warning("Refusing to write empty master pubkey to %s", path) + return False + try: + with salt.utils.files.fopen(path, "wb+") as fp_: + fp_.write(salt.utils.stringutils.to_bytes(normalized)) + return True + except OSError as exc: + log.warning("Unable to write master pubkey to %s: %s", path, exc) + return False + + +def master_pubkey_path(opts): + """ + Determine the correct master pubkey path for the minion role. + """ + if opts.get("__role") == "syndic": + name = "syndic_master.pub" + elif opts.get("alert_master"): + name = "monitor_master.pub" + else: + name = "minion_master.pub" + return os.path.join(opts["pki_dir"], name) + + +def refresh_master_pubkey_if_invalid(opts): + """ + Remove an empty/invalid master pubkey so it can be re-fetched. + """ + path = master_pubkey_path(opts) + if not os.path.exists(path): + return False + current = read_master_pubkey(path) + if current is None or current: + return False + try: + os.remove(path) + except OSError as exc: + log.debug("Unable to remove invalid master pubkey at %s: %s", path, exc) + return False + log.warning("Removed invalid master pubkey at %s; will re-authenticate", path) + return True + + +def is_invalid_master_key_error(exc): + """ + Return True if the exception message indicates an invalid master key. + """ + try: + msg = salt.utils.stringutils.to_str(exc) + except Exception: # pylint: disable=broad-except + return False + return "Invalid master key" in msg + + def running(opts): """ Return the running jobs on this minion diff --git a/salt/utils/versions.py b/salt/utils/versions.py index d5074e0c27da..6f27a7b94bba 100644 --- a/salt/utils/versions.py +++ b/salt/utils/versions.py @@ -8,11 +8,13 @@ """ import datetime +import importlib.util import inspect import logging import numbers import os import sys +import types import warnings import looseversion @@ -23,6 +25,47 @@ log = logging.getLogger(__name__) +def _resolve_backports_ssl_match_hostname(): + """ + Return a module-like object that provides match_hostname and CertificateError. + """ + try: + import ssl + + if hasattr(ssl, "match_hostname") and hasattr(ssl, "CertificateError"): + module = types.ModuleType("backports.ssl_match_hostname") + module.match_hostname = ssl.match_hostname + module.CertificateError = ssl.CertificateError + return module + except Exception: # pylint: disable=broad-except + pass + try: + from salt.ext import ssl_match_hostname as salt_ssl_match_hostname + + return salt_ssl_match_hostname + except Exception: # pylint: disable=broad-except + return None + + +def ensure_backports_compat(): + """ + Ensure the optional 'backports' namespace package is importable. + """ + if "backports" in sys.modules: + return False + if importlib.util.find_spec("backports") is not None: + return False + backports_pkg = types.ModuleType("backports") + backports_pkg.__path__ = [] + sys.modules["backports"] = backports_pkg + ssl_match = _resolve_backports_ssl_match_hostname() + if ssl_match is not None: + sys.modules.setdefault("backports.ssl_match_hostname", ssl_match) + setattr(backports_pkg, "ssl_match_hostname", ssl_match) + log.debug("Injected backports namespace stubs for compatibility") + return True + + class Version(packaging.version.Version): def __lt__(self, other): if isinstance(other, str): diff --git a/salt/utils/zeromq.py b/salt/utils/zeromq.py index 080256338505..a103a3e86fcc 100644 --- a/salt/utils/zeromq.py +++ b/salt/utils/zeromq.py @@ -2,10 +2,11 @@ ZMQ-specific functions """ +import errno import logging import salt.utils.versions -from salt.exceptions import SaltSystemExit +from salt.exceptions import SaltReqTimeoutError, SaltSystemExit from salt.utils.network import ip_bracket as _new_ip_bracket log = logging.getLogger(__name__) @@ -28,6 +29,120 @@ log.exception("Error while getting LibZMQ/PyZMQ library version") +def _errno_values(*names): + values = set() + for name in names: + value = getattr(errno, name, None) + if value is not None: + values.add(value) + return values + + +_RETRYABLE_OS_ERRNOS = frozenset( + _errno_values( + "EAGAIN", + "EINTR", + "ECONNABORTED", + "ECONNRESET", + "ECONNREFUSED", + "ETIMEDOUT", + "EHOSTUNREACH", + "ENETUNREACH", + "ENOTCONN", + ) +) + + +def _retryable_zmq_errnos(): + if zmq is None: + return frozenset() + errnos = set() + for name in ( + "EAGAIN", + "EINTR", + "ECONNABORTED", + "ECONNRESET", + "ECONNREFUSED", + "ETIMEDOUT", + "EHOSTUNREACH", + "ENETUNREACH", + "ENOTCONN", + "EFSM", + "ETERM", + "ENOTSOCK", + ): + value = getattr(zmq, name, None) + if value is not None: + errnos.add(value) + return frozenset(errnos) + + +_RETRYABLE_ZMQ_ERRNOS = _retryable_zmq_errnos() + + +def _iter_exception_chain(exc, max_depth=16): + """ + Iterate an exception and its cause/context chain. + """ + current = exc + seen = set() + depth = 0 + while current is not None and depth < max_depth: + exc_id = id(current) + if exc_id in seen: + break + seen.add(exc_id) + yield current + depth += 1 + current = current.__cause__ or current.__context__ + + +def _is_tornado_future_callbacks_race(exc): + """ + Detect the Tornado 4 Future callback race observed as: + ``TypeError: 'NoneType' object is not iterable``. + """ + if not isinstance(exc, TypeError): + return False + message = str(exc) + return "NoneType" in message and "iterable" in message + + +def is_retryable_connection_error(exc): + """ + Return ``True`` when ``exc`` looks like a transient transport failure. + """ + if exc is None: + return False + for candidate in _iter_exception_chain(exc): + if isinstance(candidate, SaltReqTimeoutError): + return True + if isinstance(candidate, TimeoutError): + return True + if _is_tornado_future_callbacks_race(candidate): + return True + if isinstance(candidate, OSError) and candidate.errno in _RETRYABLE_OS_ERRNOS: + return True + if zmq is not None and isinstance(candidate, zmq.ZMQError): + if getattr(candidate, "errno", None) in _RETRYABLE_ZMQ_ERRNOS: + return True + return False + + +def format_connection_error(exc): + """ + Return a concise string for connection-related exception logging. + """ + if exc is None: + return "unknown transport exception" + if _is_tornado_future_callbacks_race(exc): + return ( + "Transient tornado future callback race " + "('NoneType' callbacks list while setting exception)" + ) + return str(exc) + + def check_ipc_path_max_len(uri): """ The socket path is limited to 107 characters on Solaris and diff --git a/tests/pytests/unit/test_minion.py b/tests/pytests/unit/test_minion.py index f9143cf3a754..ff2ce48a997b 100644 --- a/tests/pytests/unit/test_minion.py +++ b/tests/pytests/unit/test_minion.py @@ -1202,6 +1202,30 @@ async def test_connect_master_general_exception_error(minion_opts, connect_maste assert minion.connect_master.calls == 2 +async def test_connect_master_transient_typeerror_retries(minion_opts, connect_master_mock): + """ + Ensure transient transport TypeError failures are retried during connection. + """ + minion_opts["acceptance_wait_time"] = 0 + mm = salt.minion.MinionManager(minion_opts) + minion = salt.minion.Minion(minion_opts) + connect_master_mock.exc = lambda: TypeError("'NoneType' object is not iterable") + minion.connect_master = connect_master_mock + minion.destroy = MagicMock() + + await mm._connect_minion(minion) + minion.destroy.assert_called_once() + assert minion.connect_master.calls == 2 + + +def test_should_retry_connection_exception_typeerror(): + """ + Ensure the helper marks known transient transport errors as retryable. + """ + exc = TypeError("'NoneType' object is not iterable") + assert salt.minion.Minion._should_retry_connection_exception(exc) + + async def test_minion_manager_async_stop(io_loop, minion_opts, tmp_path): """ Ensure MinionManager's stop method works correctly and calls the diff --git a/tests/unit/utils/test_zeromq.py b/tests/unit/utils/test_zeromq.py index fdb4faaaeb19..5bedf6d46af1 100644 --- a/tests/unit/utils/test_zeromq.py +++ b/tests/unit/utils/test_zeromq.py @@ -24,3 +24,22 @@ def test_check_ipc_length(self): self.assertRaises( SaltSystemExit, salt.utils.zeromq.check_ipc_path_max_len, "1" * 1024 ) + + +def test_is_retryable_connection_error_tornado_callbacks_race(): + exc = TypeError("'NoneType' object is not iterable") + assert salt.utils.zeromq.is_retryable_connection_error(exc) + + +def test_is_retryable_connection_error_walks_exception_chain(): + try: + try: + raise TypeError("'NoneType' object is not iterable") + except TypeError as exc: + raise RuntimeError("wrapper") from exc + except RuntimeError as exc: + assert salt.utils.zeromq.is_retryable_connection_error(exc) + + +def test_is_retryable_connection_error_non_retryable(): + assert not salt.utils.zeromq.is_retryable_connection_error(ValueError("fatal")) \ No newline at end of file