Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 21 additions & 7 deletions salt/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
import sys
import warnings

import salt.utils.versions

# Awful hack to keep salt-ssh tests passing with tornado >=6.4.2. Salt ssh
# needs to be transitioned to use a relenv environment by default. This should
# be removed when salt-ssh uses relenv or we no longer want salt-ssh to support
Expand Down Expand Up @@ -77,13 +79,25 @@ def exec_module(self, module):
# never show up
)

# Filter the backports package UserWarning about being re-imported
warnings.filterwarnings(
"ignore",
"^Module backports was already imported from (.*), but (.*) is being added to sys.path$",
UserWarning,
append=True,
)
def _setup_backports_compat():
"""
Ensure the optional backports namespace exists for legacy imports.
"""
try:
salt.utils.versions.ensure_backports_compat()
except Exception: # pylint: disable=broad-except
return False
return "backports" in sys.modules


if _setup_backports_compat():
# Filter the backports package UserWarning about being re-imported
warnings.filterwarnings(
"ignore",
"^Module backports was already imported from (.*), but (.*) is being added to sys.path$",
UserWarning,
append=True,
)

# Filter the setuptools UserWarning until we stop relying on distutils
warnings.filterwarnings(
Expand Down
23 changes: 16 additions & 7 deletions salt/crypt.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import salt.utils.rsax931
import salt.utils.sdb
import salt.utils.stringutils
import salt.utils.minion
import salt.utils.user
import salt.utils.verify
import salt.version
Expand Down Expand Up @@ -1420,17 +1421,24 @@ def verify_master(self, payload, master_pub=True):
:return: An empty string on verification failure. On success, the decrypted AES message in the payload.
"""
m_pub_fn = os.path.join(self.opts["pki_dir"], self.mpub)
m_pub_exists = os.path.isfile(m_pub_fn)
local_master_pub = salt.utils.minion.read_master_pubkey(m_pub_fn)
if local_master_pub is None:
log.error("Unable to read master pubkey at %s", m_pub_fn)
return ""
payload_master_pub = salt.utils.minion.normalize_master_pubkey(
payload.get("pub_key")
)
m_pub_exists = bool(local_master_pub)
if m_pub_exists and master_pub and not self.opts["open_mode"]:
with salt.utils.files.fopen(m_pub_fn) as fp_:
local_master_pub = clean_key(fp_.read())

if payload["pub_key"] != local_master_pub:
if payload_master_pub != local_master_pub:
if not self.check_auth_deps(payload):
return ""

if self.opts["verify_master_pubkey_sign"]:
if self.verify_signing_master(payload):
salt.utils.minion.write_master_pubkey(
m_pub_fn, payload.get("pub_key", payload_master_pub)
)
return self.extract_aes(payload, master_pub=False)
else:
return ""
Expand Down Expand Up @@ -1476,8 +1484,9 @@ def verify_master(self, payload, master_pub=True):
if not m_pub_exists:
# the minion has not received any masters pubkey yet, write
# the newly received pubkey to minion_master.pub
with salt.utils.files.fopen(m_pub_fn, "wb+") as fp_:
fp_.write(salt.utils.stringutils.to_bytes(payload["pub_key"]))
salt.utils.minion.write_master_pubkey(
m_pub_fn, payload.get("pub_key", payload_master_pub)
)
return self.extract_aes(payload, master_pub=False)

def _finger_fail(self, finger, master_key):
Expand Down
15 changes: 15 additions & 0 deletions salt/loader/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,20 @@

log = logging.getLogger(__name__)


_BACKPORTS_COMPAT_READY = False


def _ensure_backports_compat():
    """
    Run the backports compatibility setup the first time this is called.

    Calls ``salt.utils.versions.ensure_backports_compat()`` and records in the
    module-level ``_BACKPORTS_COMPAT_READY`` flag that the attempt was made.
    Once the flag is set, later calls return immediately. A failing setup
    (any ``Exception``) is logged at debug level and is not retried.
    """
    global _BACKPORTS_COMPAT_READY
    if not _BACKPORTS_COMPAT_READY:
        try:
            salt.utils.versions.ensure_backports_compat()
        except Exception:  # pylint: disable=broad-except
            log.debug("Failed to set up backports compatibility", exc_info=True)
        # Set the flag after a failure too, so the setup is attempted only once.
        _BACKPORTS_COMPAT_READY = True

# Because on the cloud drivers we do `from salt.cloud.libcloudfuncs import *`
# which simplifies code readability, it adds some unsupported functions into
# the driver's module scope.
Expand Down Expand Up @@ -127,6 +141,7 @@ def _module_dirs(
base_path=None,
load_extensions=True,
):
_ensure_backports_compat()
if tag is None:
tag = ext_type
sys_types = [os.path.join(base_path or str(SALT_BASE_PATH), int_type or ext_type)]
Expand Down
164 changes: 143 additions & 21 deletions salt/minion.py
Original file line number Diff line number Diff line change
Expand Up @@ -519,6 +519,44 @@ def process_beacons(self, functions):
) # pylint: disable=no-member
return []

@staticmethod
def _should_retry_connection_exception(exc):
    """
    Tell whether ``exc`` should be handled by the connection retry logic.

    The decision is delegated entirely to
    ``salt.transport.is_retryable_connection_error``.

    :param exc: The exception raised while connecting.
    :return: Whatever ``is_retryable_connection_error`` returns for ``exc``.
    """
    retryable = salt.transport.is_retryable_connection_error(exc)
    return retryable

@staticmethod
def _next_auth_wait(auth_wait, auth_wait_step, max_auth_wait):
"""
Increase auth wait time up to the configured maximum.
"""
if auth_wait < max_auth_wait:
return min(auth_wait + auth_wait_step, max_auth_wait)
return auth_wait

def _handle_invalid_master_key(self, exc, opts=None):
    """
    React to an invalid-master-key error and log what the operator can do.

    If ``salt.utils.minion.is_invalid_master_key_error`` does not recognise
    ``exc``, nothing is done. Otherwise
    ``salt.utils.minion.refresh_master_pubkey_if_invalid`` is called and its
    result picks the log message: a warning that the key was removed and
    authentication will be retried, or an error asking the operator to
    verify or remove the key file.

    :param exc: The exception raised during authentication.
    :param opts: Options to use in place of ``self.opts``; a falsy value
        (including the default ``None``) falls back to ``self.opts``.
    :return: ``True`` if ``exc`` was recognised as an invalid master key
        error, ``False`` otherwise.
    """
    if not salt.utils.minion.is_invalid_master_key_error(exc):
        return False

    effective_opts = opts or self.opts
    # Resolve the path before the refresh so both messages name the same file.
    key_path = salt.utils.minion.master_pubkey_path(effective_opts)

    if salt.utils.minion.refresh_master_pubkey_if_invalid(effective_opts):
        log.warning(
            "Removed empty or invalid master key at %s; retrying authentication",
            key_path,
        )
        return True

    log.error(
        "Invalid master key detected. Verify the master key at %s. If the "
        "master key rotated, remove this file to re-authenticate.",
        key_path,
    )
    return True

async def eval_master(
self, opts, timeout=60, safe=True, failed=False, failback=False
):
Expand Down Expand Up @@ -744,21 +782,38 @@ async def eval_master(
await pub_channel.connect()
conn = True
break
except SaltClientError as exc:
last_exc = exc
if exc.strerror.startswith("Could not access"):
log.info(
"Failed to initiate connection with Master %s: check"
" ownership/permissions. Error message: %s",
opts["master"],
exc,
except Exception as exc: # pylint: disable=broad-except
if isinstance(exc, SaltClientError):
last_exc = exc
if exc.strerror.startswith("Could not access"):
log.info(
"Failed to initiate connection with Master %s: check"
" ownership/permissions. Error message: %s",
opts["master"],
exc,
)
else:
log.info(
"Master %s could not be reached, trying next master"
" (if any)",
opts["master"],
)
elif self._should_retry_connection_exception(exc):
last_exc = SaltClientError(
"Transient transport error while connecting to master "
f"{opts['master']}: "
f"{salt.transport.format_connection_error(exc)}"
)
else:
log.info(
"Master %s could not be reached, trying next master (if"
" any)",
log.warning(
"Master %s had a transient transport error (%s), "
"trying next master (if any)",
opts["master"],
salt.transport.format_connection_error(exc),
)
else:
if pub_channel:
pub_channel.close()
raise
pub_channel.close()
pub_channel = None
continue
Expand Down Expand Up @@ -790,6 +845,7 @@ async def eval_master(
" Ignoring."
)
pub_channel = None
last_exc = None
while True:
if attempts != 0:
# Give up a little time between connection attempts
Expand Down Expand Up @@ -827,12 +883,32 @@ async def eval_master(
self.tok = pub_channel.auth.gen_token(b"salt")
self.connected = True
return (opts["master"], pub_channel)
except SaltClientError:
except Exception as exc: # pylint: disable=broad-except
retryable = isinstance(exc, SaltClientError) or self._should_retry_connection_exception(exc)
if not retryable:
if pub_channel:
pub_channel.close()
raise
if isinstance(exc, SaltClientError):
last_exc = exc
else:
last_exc = SaltClientError(
"Transient transport error while connecting to master "
f"{opts['master']}: "
f"{salt.transport.format_connection_error(exc)}"
)
log.warning(
"Transient transport error while connecting to master %s: %s",
opts["master"],
salt.transport.format_connection_error(exc),
)
if pub_channel:
pub_channel.close()
if attempts == tries:
# Exhausted all attempts. Return exception.
self.connected = False
if last_exc:
raise last_exc
raise

def _discover_masters(self):
Expand Down Expand Up @@ -1161,6 +1237,7 @@ async def _connect_minion(self, minion):
except SaltClientError as exc:
minion.destroy()
failed = True
minion._handle_invalid_master_key(exc)
log.error(
"Error while bringing up minion for multi-master. Is "
"master at %s responding? The error message was %s",
Expand All @@ -1169,8 +1246,9 @@ async def _connect_minion(self, minion):
exc_info=True,
)
last = time.time()
if auth_wait < self.max_auth_wait:
auth_wait += self.auth_wait
auth_wait = minion._next_auth_wait(
auth_wait, self.auth_wait, self.max_auth_wait
)
await asyncio.sleep(auth_wait)
except SaltMasterUnresolvableError:
minion.destroy()
Expand All @@ -1182,9 +1260,24 @@ async def _connect_minion(self, minion):
)
log.error(err)
break
except Exception as e: # pylint: disable=broad-except
except Exception as exc: # pylint: disable=broad-except
minion.destroy()
failed = True
if minion._should_retry_connection_exception(exc):
log.warning(
"Transient transport error while connecting to %s; "
"retrying in %s seconds: %s",
minion.opts["master"],
auth_wait,
salt.transport.format_connection_error(exc),
exc_info=True,
)
last = time.time()
auth_wait = minion._next_auth_wait(
auth_wait, self.auth_wait, self.max_auth_wait
)
await asyncio.sleep(auth_wait)
continue
log.critical(
"Unexpected error while connecting to %s",
minion.opts["master"],
Expand Down Expand Up @@ -1440,8 +1533,20 @@ async def connect_master(self, failed=False):
self.opts, io_loop=self.io_loop
)
log.debug("Connecting minion's long-running req channel")
await self.req_channel.connect()
await self._post_master_init(master)
try:
await self.req_channel.connect()
await self._post_master_init(master)
except Exception as exc: # pylint: disable=broad-except
if self.req_channel:
self.req_channel.close()
self.req_channel = None
if self._should_retry_connection_exception(exc):
raise SaltClientError(
"Transient transport error while connecting minion request channel"
f" to master {self.opts.get('master')}: "
f"{salt.transport.format_connection_error(exc)}"
) from exc
raise

async def handle_payload(self, payload, reply_func):
self.payloads.append(payload)
Expand Down Expand Up @@ -3710,24 +3815,41 @@ async def _connect_syndic(self, opts):
break
except SaltClientError as exc:
failed = True
self._handle_invalid_master_key(exc, opts=opts)
log.error(
"Error while bringing up syndic for multi-syndic. Is the "
"master at %s responding?",
opts["master"],
)
last = time.time()
if auth_wait < self.max_auth_wait:
auth_wait += self.auth_wait
auth_wait = self._next_auth_wait(
auth_wait, self.auth_wait, self.max_auth_wait
)
await asyncio.sleep(auth_wait) # TODO: log?
except (KeyboardInterrupt, SystemExit): # pylint: disable=try-except-raise
raise
except Exception: # pylint: disable=broad-except
except Exception as exc: # pylint: disable=broad-except
failed = True
if self._should_retry_connection_exception(exc):
log.warning(
"Transient transport error while connecting syndic to %s; "
"retrying in %s seconds: %s",
opts["master"],
auth_wait,
salt.transport.format_connection_error(exc),
exc_info=True,
)
auth_wait = self._next_auth_wait(
auth_wait, self.auth_wait, self.max_auth_wait
)
await asyncio.sleep(auth_wait)
continue
log.critical(
"Unexpected error while connecting to %s",
opts["master"],
exc_info=True,
)
break

return syndic

Expand Down
Loading