diff --git a/Makefile b/Makefile index ce4c792..f62ebfb 100644 --- a/Makefile +++ b/Makefile @@ -18,6 +18,7 @@ super-linter: ## Runs super linter locally -e VALIDATE_JSON_PRETTIER=false \ -e VALIDATE_MARKDOWN_PRETTIER=false \ -e VALIDATE_PYTHON_PYLINT=false \ + -e VALIDATE_PYTHON_PYINK=false \ -e VALIDATE_PYTHON_RUFF_FORMAT=false \ -e VALIDATE_SHELL_SHFMT=false \ -e VALIDATE_YAML=false \ @@ -34,12 +35,12 @@ ansible-lint: ## run ansible lint on ansible/ folder .PHONY: ansible-sanitytest ansible-sanitytest: ## run ansible unit tests - ansible-test sanity --docker default + ansible-test sanity --docker default --python 3.11 --python 3.12 .PHONY: ansible-unittest ansible-unittest: ## run ansible unit tests rm -rf tests/output - ansible-test units --docker + ansible-test units --docker --python 3.11 --python 3.12 .PHONY: test test: ansible-sanitytest ansible-unittest diff --git a/plugins/module_utils/load_secrets_common.py b/plugins/module_utils/load_secrets_common.py index eb5e51b..f8ebe5d 100644 --- a/plugins/module_utils/load_secrets_common.py +++ b/plugins/module_utils/load_secrets_common.py @@ -21,8 +21,20 @@ __metaclass__ = type import configparser +import getpass +import os from collections.abc import MutableMapping +default_vp_vault_policies = { + "validatedPatternDefaultPolicy": ( + "length=20\n" + 'rule "charset" { charset = "abcdefghijklmnopqrstuvwxyz" min-chars = 1 }\n' + 'rule "charset" { charset = "ABCDEFGHIJKLMNOPQRSTUVWXYZ" min-chars = 1 }\n' + 'rule "charset" { charset = "0123456789" min-chars = 1 }\n' + 'rule "charset" { charset = "!@#%^&*" min-chars = 1 }\n' + ) +} + def find_dupes(array): """ @@ -149,3 +161,247 @@ def filter_module_args(arg_spec): pass return arg_spec + + +class SecretsV2Base: + """ + Base class with common functionality for V2 secrets handling + """ + + def __init__(self, module, syaml): + self.module = module + self.syaml = syaml + + def _get_vault_policies(self, enable_default_vp_policies=True): + # We start off with the hard-coded default VP policy and add the user-defined ones + policies = ( + default_vp_vault_policies.copy() if enable_default_vp_policies else {} + ) + policies.update(self.syaml.get("vaultPolicies", {})) + return policies + + def _get_secrets(self): + return self.syaml.get("secrets", []) + + def _get_field_on_missing_value(self, f): + # By default if 'onMissingValue' is missing we assume we need to + # error out whenever the value is missing + return f.get("onMissingValue", "error") + + def _get_field_value(self, f): + return f.get("value", None) + + def _get_field_path(self, f): + return f.get("path", None) + + def _get_field_ini_file(self, f): + return f.get("ini_file", None) + + def _get_field_kind(self, f): + # value: null will be interpreted with None, so let's just + # check for the existence of the field, as we use 'value: null' to say + # "we want a value/secret and not a file path" + found = [] + for i in ["value", "path", "ini_file"]: + if i in f: + found.append(i) + + if len(found) > 1: # you can only have one of value, path and ini_file + self.module.fail_json(f"Both '{found[0]}' and '{found[1]}' cannot be used") + + if len(found) == 0: + return "" + return found[0] + + def _get_field_prompt(self, f): + return f.get("prompt", None) + + def _get_field_base64(self, f): + return bool(f.get("base64", False)) + + def _get_field_override(self, f): + return bool(f.get("override", False)) + + def _validate_field(self, f): + # Check mandatory fields + if "name" not in f: + return (False, f"Field {f} is missing name") + + # Validate field structure and types + result = self._validate_field_structure(f) + if not result[0]: + return result + + # Validate vault policy + result = self._validate_vault_policy(f) + if not result[0]: + return result + + on_missing_value = self._get_field_on_missing_value(f) + # Validate based on onMissingValue type + match on_missing_value: + case "error": + return self._validate_error_mode(f) + case "generate": + return self._validate_generate_mode(f) + case "prompt": + return self._validate_prompt_mode(f) + + return (False, f"onMissingValue: {on_missing_value} is invalid") + + def _validate_field_structure(self, f): + """Validate field structure and basic types""" + kind = self._get_field_kind(f) + if kind == "ini_file": + ini_key = f.get("ini_key", None) + if ini_key is None: + return (False, "ini_file requires at least ini_key to be defined") + + # Test if base64 and override are correct booleans + self._get_field_base64(f) + self._get_field_override(f) + + return (True, "") + + def _validate_vault_policy(self, f): + """Validate vault policy exists if specified""" + vault_policy = f.get("vaultPolicy", None) + if vault_policy is not None and vault_policy not in self._get_vault_policies(): + return ( + False, + f"Secret has vaultPolicy set to {vault_policy} but no such policy exists", + ) + return (True, "") + + def _validate_error_mode(self, f): + """Validate fields when onMissingValue is 'error'""" + value = self._get_field_value(f) + path = self._get_field_path(f) + ini_file = self._get_field_ini_file(f) + + # Check that at least one source is provided and not empty + if ( + (value is None or len(value) < 1) + and (path is None or len(path) < 1) + and (ini_file is None or len(ini_file) < 1) + ): + return ( + False, + "Secret has onMissingValue set to 'error' and has neither value nor path nor ini_file set", + ) + + # Validate file paths exist + if path is not None and not os.path.isfile(os.path.expanduser(path)): + return (False, f"Field has non-existing path: {path}") + + if ini_file is not None and not os.path.isfile(os.path.expanduser(ini_file)): + return (False, f"Field has non-existing ini_file: {ini_file}") + + # Override not allowed in error mode + if "override" in f: + return ( + False, + "'override' attribute requires 'onMissingValue' to be set to 'generate'", + ) + + return (True, "") + + def _validate_generate_mode(self, f): + """Validate fields when onMissingValue is 'generate'""" + value = self._get_field_value(f) + path = self._get_field_path(f) + vault_policy = f.get("vaultPolicy", None) + + if value is not None: + return ( + False, + "Secret has onMissingValue set to 'generate' but has a value set", + ) + + if path is not None: + return ( + False, + "Secret has onMissingValue set to 'generate' but has a path set", + ) + + if vault_policy is None: + return ( + False, + "Secret has no vaultPolicy but onMissingValue is set to 'generate'", + ) + + return (True, "") + + def _validate_prompt_mode(self, f): + """Validate fields when onMissingValue is 'prompt'""" + # When we prompt, the user needs to set one of the following: + # - value: null # prompt for a secret without a default value + # - value: 123 # prompt for a secret but use a default value + # - path: null # prompt for a file path without a default value + # - path: /tmp/ca.crt # prompt for a file path with a default value + if "value" not in f and "path" not in f: + return ( + False, + "Secret has onMissingValue set to 'prompt' but has no value nor path fields", + ) + + # Override not allowed in prompt mode + if "override" in f: + return ( + False, + "'override' attribute requires 'onMissingValue' to be set to 'generate'", + ) + + return (True, "") + + def _get_secret_value(self, name, field): + on_missing_value = self._get_field_on_missing_value(field) + # We checked for errors in _validate_secrets() already + match on_missing_value: + case "error": + value = field.get("value") + # Allow subclasses to override value processing + return self._process_secret_value(value) + case "prompt": + prompt = self._get_field_prompt(field) + if prompt is None: + prompt = f"Type secret for {name}/{field['name']}: " + value = self._get_field_value(field) + if value is not None: + prompt += f" [{value}]" + prompt += ": " + return getpass.getpass(prompt) + case _: + return None + + def _process_secret_value(self, value): + """ + Process a secret value. Can be overridden by subclasses. + """ + return value + + def _get_file_path(self, name, field): + on_missing_value = self._get_field_on_missing_value(field) + match on_missing_value: + case "error": + return os.path.expanduser(field.get("path")) + case "prompt": + prompt = self._get_field_prompt(field) + path = self._get_field_path(field) + if path is None: + path = "" + + if prompt is None: + text = f"Type path for file {name}/{field['name']} [{path}]: " + else: + text = f"{prompt} [{path}]: " + + newpath = getpass.getpass(text) + if newpath == "": # Set the default if no string was entered + newpath = path + + if os.path.isfile(os.path.expanduser(newpath)): + return newpath + self.module.fail_json(f"File {newpath} not found, exiting") + case _: + self.module.fail_json("File with wrong onMissingValue") diff --git a/plugins/module_utils/load_secrets_v2.py b/plugins/module_utils/load_secrets_v2.py index 96b7c68..90f1c19 100644 --- a/plugins/module_utils/load_secrets_v2.py +++ b/plugins/module_utils/load_secrets_v2.py @@ -21,34 +21,23 @@ __metaclass__ = type import base64 -import getpass import os import time from ansible_collections.rhvp.cluster_utils.plugins.module_utils.load_secrets_common import ( + SecretsV2Base, find_dupes, get_ini_value, get_version, ) -default_vp_vault_policies = { - "validatedPatternDefaultPolicy": ( - "length=20\n" - 'rule "charset" { charset = "abcdefghijklmnopqrstuvwxyz" min-chars = 1 }\n' - 'rule "charset" { charset = "ABCDEFGHIJKLMNOPQRSTUVWXYZ" min-chars = 1 }\n' - 'rule "charset" { charset = "0123456789" min-chars = 1 }\n' - 'rule "charset" { charset = "!@#%^&*" min-chars = 1 }\n' - ) -} - -class LoadSecretsV2: +class LoadSecretsV2(SecretsV2Base): def __init__(self, module, syaml, namespace, pod): - self.module = module + super().__init__(module, syaml) self.namespace = namespace self.pod = pod - self.syaml = syaml def _run_command(self, command, attempts=1, sleep=3, checkrc=True): """ @@ -86,193 +75,76 @@ def _get_backingstore(self): """ return str(self.syaml.get("backingStore", "vault")) - def _get_vault_policies(self, enable_default_vp_policies=True): - # We start off with the hard-coded default VP policy and add the user-defined ones - if enable_default_vp_policies: - policies = default_vp_vault_policies.copy() - else: - policies = {} - policies.update(self.syaml.get("vaultPolicies", {})) - return policies - def _get_secrets(self): return self.syaml.get("secrets", {}) - def _get_field_on_missing_value(self, f): - # By default if 'onMissingValue' is missing we assume we need to - # error out whenever the value is missing - return f.get("onMissingValue", "error") - - def _get_field_value(self, f): - return f.get("value", None) - - def _get_field_path(self, f): - return f.get("path", None) - - def _get_field_ini_file(self, f): - return f.get("ini_file", None) - - def _get_field_kind(self, f): - # value: null will be interpreted with None, so let's just - # check for the existence of the field, as we use 'value: null' to say - # "we want a value/secret and not a file path" - found = [] - for i in ["value", "path", "ini_file"]: - if i in f: - found.append(i) - - if len(found) > 1: # you can only have one of value, path and ini_file - self.module.fail_json(f"Both '{found[0]}' and '{found[1]}' cannot be used") + def _validate_secrets(self): + secrets = self._get_secrets() + if len(secrets) == 0: + self.module.fail_json("No secrets found") - if len(found) == 0: - return "" - return found[0] + # Validate each secret and collect names for duplicate checking + secret_names = [] + for secret in secrets: + result = self._validate_secret(secret) + if not result[0]: + return result + secret_names.append(secret["name"]) - def _get_field_prompt(self, f): - return f.get("prompt", None) + # Check for duplicate secret names + dupes = find_dupes(secret_names) + if len(dupes) > 0: + return (False, f"You cannot have duplicate secret names: {dupes}") - def _get_field_base64(self, f): - return bool(f.get("base64", False)) + return (True, "") - def _get_field_override(self, f): - return bool(f.get("override", False)) + def _validate_secret(self, secret): + """Validate a single secret configuration""" + # Check mandatory fields + if "name" not in secret: + return (False, f"Secret {secret} is missing name") - # This function could use some rewriting and it should call a specific validation function - # for each type (value, path, ini_file) - def _validate_field(self, f): - # These fields are mandatory - try: - unused = f["name"] - except KeyError: - return (False, f"Field {f} is missing name") + secret_name = secret["name"] - on_missing_value = self._get_field_on_missing_value(f) - if on_missing_value not in ["error", "generate", "prompt"]: - return (False, f"onMissingValue: {on_missing_value} is invalid") + # Validate vault prefixes + result = self._validate_vault_prefixes(secret) + if not result[0]: + return result - value = self._get_field_value(f) - path = self._get_field_path(f) - ini_file = self._get_field_ini_file(f) - kind = self._get_field_kind(f) - if kind == "ini_file": - # if we are using ini_file then at least ini_key needs to be defined - # ini_section defaults to 'default' when omitted - ini_key = f.get("ini_key", None) - if ini_key is None: - return ( - False, - "ini_file requires at least ini_key to be defined", - ) - - # Test if base64 is a correct boolean (defaults to False) - unused = self._get_field_base64(f) - unused = self._get_field_override(f) - - vault_policy = f.get("vaultPolicy", None) - if vault_policy is not None and vault_policy not in self._get_vault_policies(): - return ( - False, - f"Secret has vaultPolicy set to {vault_policy} but no such policy exists", - ) - - if on_missing_value in ["error"]: - if ( - (value is None or len(value) < 1) - and (path is None or len(path) < 1) - and (ini_file is None or len(ini_file) < 1) - ): - return ( - False, - "Secret has onMissingValue set to 'error' and has neither value nor path nor ini_file set", - ) - if path is not None and not os.path.isfile(os.path.expanduser(path)): - return (False, f"Field has non-existing path: {path}") - - if ini_file is not None and not os.path.isfile( - os.path.expanduser(ini_file) - ): - return (False, f"Field has non-existing ini_file: {ini_file}") - - if "override" in f: - return ( - False, - "'override' attribute requires 'onMissingValue' to be set to 'generate'", - ) - - if on_missing_value in ["generate"]: - if value is not None: - return ( - False, - "Secret has onMissingValue set to 'generate' but has a value set", - ) - if path is not None: - return ( - False, - "Secret has onMissingValue set to 'generate' but has a path set", - ) - if vault_policy is None: - return ( - False, - "Secret has no vaultPolicy but onMissingValue is set to 'generate'", - ) - - if on_missing_value in ["prompt"]: - # When we prompt, the user needs to set one of the following: - # - value: null # prompt for a secret without a default value - # - value: 123 # prompt for a secret but use a default value - # - path: null # prompt for a file path without a default value - # - path: /tmp/ca.crt # prompt for a file path with a default value - if "value" not in f and "path" not in f: - return ( - False, - "Secret has onMissingValue set to 'prompt' but has no value nor path fields", - ) - - if "override" in f: - return ( - False, - "'override' attribute requires 'onMissingValue' to be set to 'generate'", - ) + # Validate fields + result = self._validate_secret_fields(secret) + if not result[0]: + return result return (True, "") - def _validate_secrets(self): - secrets = self._get_secrets() - if len(secrets) == 0: - self.module.fail_json("No secrets found") - - names = [] - for s in secrets: - # These fields are mandatory - for i in ["name"]: - try: - unused = s[i] - except KeyError: - return (False, f"Secret {s['name']} is missing {i}") - names.append(s["name"]) - - vault_prefixes = s.get("vaultPrefixes", ["hub"]) - # This checks for the case when vaultPrefixes: is specified but empty - if vault_prefixes is None or len(vault_prefixes) == 0: - return (False, f"Secret {s['name']} has empty vaultPrefixes") + def _validate_vault_prefixes(self, secret): + """Validate vault prefixes for a secret""" + vault_prefixes = secret.get("vaultPrefixes", ["hub"]) + # This checks for the case when vaultPrefixes: is specified but empty + if vault_prefixes is None or len(vault_prefixes) == 0: + return (False, f"Secret {secret['name']} has empty vaultPrefixes") + return (True, "") - fields = s.get("fields", []) - if len(fields) == 0: - return (False, f"Secret {s['name']} does not have any fields") + def _validate_secret_fields(self, secret): + """Validate all fields for a secret""" + fields = secret.get("fields", []) + if len(fields) == 0: + return (False, f"Secret {secret['name']} does not have any fields") + + # Validate each field and collect names for duplicate checking + field_names = [] + for field in fields: + result = self._validate_field(field) + if not result[0]: + return result + field_names.append(field["name"]) + + # Check for duplicate field names + field_dupes = find_dupes(field_names) + if len(field_dupes) > 0: + return (False, f"You cannot have duplicate field names: {field_dupes}") - field_names = [] - for i in fields: - (ret, msg) = self._validate_field(i) - if not ret: - return (False, msg) - field_names.append(i["name"]) - field_dupes = find_dupes(field_names) - if len(field_dupes) > 0: - return (False, f"You cannot have duplicate field names: {field_dupes}") - - dupes = find_dupes(names) - if len(dupes) > 0: - return (False, f"You cannot have duplicate secret names: {dupes}") return (True, "") def inject_vault_policies(self): @@ -303,53 +175,10 @@ def sanitize_values(self): self.module.fail_json( f"Currently only the 'vault' backingStore is supported: {backing_store}" ) - (ret, msg) = self._validate_secrets() if not ret: self.module.fail_json(msg) - def _get_secret_value(self, name, field): - on_missing_value = self._get_field_on_missing_value(field) - # We cannot use match + case as RHEL8 has python 3.9 (it needs 3.10) - # We checked for errors in _validate_secrets() already - if on_missing_value == "error": - return field.get("value") - elif on_missing_value == "prompt": - prompt = self._get_field_prompt(field) - if prompt is None: - prompt = f"Type secret for {name}/{field['name']}: " - value = self._get_field_value(field) - if value is not None: - prompt += f" [{value}]" - prompt += ": " - return getpass.getpass(prompt) - return None - - def _get_file_path(self, name, field): - on_missing_value = self._get_field_on_missing_value(field) - if on_missing_value == "error": - return os.path.expanduser(field.get("path")) - elif on_missing_value == "prompt": - prompt = self._get_field_prompt(field) - path = self._get_field_path(field) - if path is None: - path = "" - - if prompt is None: - text = f"Type path for file {name}/{field['name']} [{path}]: " - else: - text = f"{prompt} [{path}]: " - - newpath = getpass.getpass(text) - if newpath == "": # Set the default if no string was entered - newpath = path - - if os.path.isfile(os.path.expanduser(newpath)): - return newpath - self.module.fail_json(f"File {newpath} not found, exiting") - - self.module.fail_json("File with wrong onMissingValue") - def _vault_secret_attr_exists(self, mount, prefix, secret_name, attribute): cmd = ( f"oc exec -n {self.namespace} {self.pod} -i -- sh -c " @@ -363,79 +192,105 @@ def _vault_secret_attr_exists(self, mount, prefix, secret_name, attribute): return False def _inject_field(self, secret_name, f, mount, prefixes, first=False): + verb = "put" if first else "patch" + kind = self._get_field_kind(f) + + match kind: + case "value" | "": + self._inject_value_field(secret_name, f, mount, prefixes, verb) + case "path": + self._inject_path_field(secret_name, f, mount, prefixes, verb) + case "ini_file": + self._inject_ini_field(secret_name, f, mount, prefixes, verb) + + def _inject_value_field(self, secret_name, f, mount, prefixes, verb): + """Inject a value-based field into vault""" on_missing_value = self._get_field_on_missing_value(f) - override = self._get_field_override(f) + + if on_missing_value == "generate": + self._inject_generated_secret(secret_name, f, mount, prefixes, verb) + else: + self._inject_provided_secret(secret_name, f, mount, prefixes, verb) + + def _inject_generated_secret(self, secret_name, f, mount, prefixes, verb): + """Generate and inject a secret using vault policy""" kind = self._get_field_kind(f) - # If we're generating the password then we just push the secret in the vault directly - verb = "put" if first else "patch" + if kind == "path": + self.module.fail_json( + "You cannot have onMissingValue set to 'generate' with a path" + ) + + override = self._get_field_override(f) b64 = self._get_field_base64(f) - if kind in ["value", ""]: - if on_missing_value == "generate": - if kind == "path": - self.module.fail_json( - "You cannot have onMissingValue set to 'generate' with a path" - ) - vault_policy = f.get("vaultPolicy") - gen_cmd = f"vault read -field=password sys/policies/password/{vault_policy}/generate" - if b64: - gen_cmd += " | base64 --wrap=0" - for prefix in prefixes: - # if the override field is False and the secret attribute exists at the prefix then we just - # skip, as we do not want to overwrite the existing secret - if not override and self._vault_secret_attr_exists( - mount, prefix, secret_name, f["name"] - ): - continue - cmd = ( - f"oc exec -n {self.namespace} {self.pod} -i -- sh -c " - f"\"{gen_cmd} | vault kv {verb} -mount={mount} {prefix}/{secret_name} {f['name']}=-\"" - ) - self._run_command(cmd, attempts=3) - return - - # If we're not generating the secret inside the vault directly we either read it from the file ("error") - # or we are prompting the user for it - secret = self._get_secret_value(secret_name, f) - if b64: - secret = base64.b64encode(secret.encode()).decode("utf-8") - for prefix in prefixes: - cmd = ( - f"oc exec -n {self.namespace} {self.pod} -i -- sh -c " - f"\"vault kv {verb} -mount={mount} {prefix}/{secret_name} {f['name']}='{secret}'\"" - ) - self._run_command(cmd, attempts=3) - - elif kind == "path": # path. we upload files - # If we're generating the password then we just push the secret in the vault directly - verb = "put" if first else "patch" - path = self._get_file_path(secret_name, f) - for prefix in prefixes: - if b64: - b64_cmd = "| base64 --wrap=0 " - else: - b64_cmd = "" - cmd = ( - f"cat '{path}' | oc exec -n {self.namespace} {self.pod} -i -- sh -c " - f"'cat - {b64_cmd}> /tmp/vcontent'; " - f"oc exec -n {self.namespace} {self.pod} -i -- sh -c '" - f"vault kv {verb} -mount={mount} {prefix}/{secret_name} {f['name']}=@/tmp/vcontent; " - f"rm /tmp/vcontent'" - ) - self._run_command(cmd, attempts=3) - elif kind == "ini_file": # ini_file. we parse an ini_file - verb = "put" if first else "patch" - ini_file = os.path.expanduser(f.get("ini_file")) - ini_section = f.get("ini_section", "default") - ini_key = f.get("ini_key") - secret = get_ini_value(ini_file, ini_section, ini_key) - if b64: - secret = base64.b64encode(secret.encode()).decode("utf-8") - for prefix in prefixes: - cmd = ( - f"oc exec -n {self.namespace} {self.pod} -i -- sh -c " - f"\"vault kv {verb} -mount={mount} {prefix}/{secret_name} {f['name']}='{secret}'\"" - ) - self._run_command(cmd, attempts=3) + vault_policy = f.get("vaultPolicy") + + gen_cmd = ( + f"vault read -field=password sys/policies/password/{vault_policy}/generate" + ) + if b64: + gen_cmd += " | base64 --wrap=0" + + for prefix in prefixes: + # Skip if secret exists and override is False + if not override and self._vault_secret_attr_exists( + mount, prefix, secret_name, f["name"] + ): + continue + + cmd = ( + f"oc exec -n {self.namespace} {self.pod} -i -- sh -c " + f"\"{gen_cmd} | vault kv {verb} -mount={mount} {prefix}/{secret_name} {f['name']}=-\"" + ) + self._run_command(cmd, attempts=3) + + def _inject_provided_secret(self, secret_name, f, mount, prefixes, verb): + """Inject a user-provided secret value""" + secret = self._get_secret_value(secret_name, f) + secret = self._encode_secret_if_needed(secret, f) + + for prefix in prefixes: + cmd = ( + f"oc exec -n {self.namespace} {self.pod} -i -- sh -c " + f"\"vault kv {verb} -mount={mount} {prefix}/{secret_name} {f['name']}='{secret}'\"" + ) + self._run_command(cmd, attempts=3) + + def _inject_path_field(self, secret_name, f, mount, prefixes, verb): + """Inject a file-based field into vault""" + path = self._get_file_path(secret_name, f) + b64_cmd = "| base64 --wrap=0 " if self._get_field_base64(f) else "" + + for prefix in prefixes: + cmd = ( + f"cat '{path}' | oc exec -n {self.namespace} {self.pod} -i -- sh -c " + f"'cat - {b64_cmd}> /tmp/vcontent'; " + f"oc exec -n {self.namespace} {self.pod} -i -- sh -c '" + f"vault kv {verb} -mount={mount} {prefix}/{secret_name} {f['name']}=@/tmp/vcontent; " + f"rm /tmp/vcontent'" + ) + self._run_command(cmd, attempts=3) + + def _inject_ini_field(self, secret_name, f, mount, prefixes, verb): + """Inject an INI file-based field into vault""" + ini_file = os.path.expanduser(f.get("ini_file")) + ini_section = f.get("ini_section", "default") + ini_key = f.get("ini_key") + + secret = get_ini_value(ini_file, ini_section, ini_key) + secret = self._encode_secret_if_needed(secret, f) + + for prefix in prefixes: + cmd = ( + f"oc exec -n {self.namespace} {self.pod} -i -- sh -c " + f"\"vault kv {verb} -mount={mount} {prefix}/{secret_name} {f['name']}='{secret}'\"" + ) + self._run_command(cmd, attempts=3) + + def _encode_secret_if_needed(self, secret, f): + """Apply base64 encoding if required""" + if self._get_field_base64(f): + return base64.b64encode(secret.encode()).decode("utf-8") + return secret # This assumes that self.sanitize_values() has already been called # so we do a lot less validation as it has already happened diff --git a/plugins/module_utils/parse_secrets_v2.py b/plugins/module_utils/parse_secrets_v2.py index 4d27511..cf5b564 100644 --- a/plugins/module_utils/parse_secrets_v2.py +++ b/plugins/module_utils/parse_secrets_v2.py @@ -21,34 +21,23 @@ __metaclass__ = type import base64 -import getpass import os from ansible_collections.rhvp.cluster_utils.plugins.module_utils.load_secrets_common import ( + SecretsV2Base, find_dupes, get_ini_value, get_version, stringify_dict, ) -default_vp_vault_policies = { - "validatedPatternDefaultPolicy": ( - "length=20\n" - 'rule "charset" { charset = "abcdefghijklmnopqrstuvwxyz" min-chars = 1 }\n' - 'rule "charset" { charset = "ABCDEFGHIJKLMNOPQRSTUVWXYZ" min-chars = 1 }\n' - 'rule "charset" { charset = "0123456789" min-chars = 1 }\n' - 'rule "charset" { charset = "!@#%^&*" min-chars = 1 }\n' - ) -} - secret_store_namespace = "validated-patterns-secrets" -class ParseSecretsV2: +class ParseSecretsV2(SecretsV2Base): def __init__(self, module, syaml, secrets_backing_store): - self.module = module - self.syaml = syaml + super().__init__(module, syaml) self.secrets_backing_store = str(secrets_backing_store) self.secret_store_namespace = None self.parsed_secrets = {} @@ -82,11 +71,8 @@ def _get_backingstore(self): return self.secrets_backing_store def _get_vault_policies(self, enable_default_vp_policies=True): - # We start off with the hard-coded default VP policy and add the user-defined ones - if enable_default_vp_policies: - policies = default_vp_vault_policies.copy() - else: - policies = {} + # Override base class to add YAML sanitization for policies + policies = super()._get_vault_policies(enable_default_vp_policies) # This is useful for embedded newlines, which occur with YAML # flow-type scalars (|, |- for example) @@ -102,20 +88,6 @@ def _get_secrets(self): # We also check for None here to cover when there is no jinja filter is used (unit tests) return [] if secrets == "None" or secrets is None else secrets - def _get_field_on_missing_value(self, f): - # By default if 'onMissingValue' is missing we assume we need to - # error out whenever the value is missing - return f.get("onMissingValue", "error") - - def _get_field_value(self, f): - return f.get("value", None) - - def _get_field_path(self, f): - return f.get("path", None) - - def _get_field_ini_file(self, f): - return f.get("ini_file", None) - def _get_field_annotations(self, f): return f.get("annotations", {}) @@ -123,9 +95,7 @@ def _get_field_labels(self, f): return f.get("labels", {}) def _get_field_kind(self, f): - # value: null will be interpreted with None, so let's just - # check for the existence of the field, as we use 'value: null' to say - # "we want a value/secret and not a file path" + # Override the base class implementation to include field name in error message found = [] for i in ["value", "path", "ini_file"]: if i in f: @@ -141,15 +111,6 @@ def _get_field_kind(self, f): return "" return found[0] - def _get_field_prompt(self, f): - return f.get("prompt", None) - - def _get_field_base64(self, f): - return bool(f.get("base64", False)) - - def _get_field_override(self, f): - return bool(f.get("override", False)) - def _get_secret_store_namespace(self): return str(self.syaml.get("secretStoreNamespace", secret_store_namespace)) @@ -252,82 +213,6 @@ def parse(self): return total_secrets - # This function could use some rewriting and it should call a specific validation function - # for each type (value, path, ini_file) - def _validate_field(self, f): - # These fields are mandatory - try: - unused = f["name"] - except KeyError: - return (False, f"Field {f} is missing name") - - on_missing_value = self._get_field_on_missing_value(f) - if on_missing_value not in ["error", "generate", "prompt"]: - return (False, f"onMissingValue: {on_missing_value} is invalid") - - value = self._get_field_value(f) - path = self._get_field_path(f) - ini_file = self._get_field_ini_file(f) - kind = self._get_field_kind(f) - if kind == "ini_file": - # if we are using ini_file then at least ini_key needs to be defined - # ini_section defaults to 'default' when omitted - ini_key = f.get("ini_key", None) - if ini_key is None: - return ( - False, - "ini_file requires at least ini_key to be defined", - ) - - # Test if base64 is a correct boolean (defaults to False) - unused = self._get_field_base64(f) - unused = self._get_field_override(f) - - vault_policy = f.get("vaultPolicy", None) - if vault_policy is not None and vault_policy not in self._get_vault_policies(): - return ( - False, - f"Secret has vaultPolicy set to {vault_policy} but no such policy exists", - ) - - if on_missing_value in ["error"]: - if ( - (value is None or len(value) < 1) - and (path is None or len(path) < 1) - and (ini_file is None or len(ini_file) < 1) - ): - return ( - False, - "Secret has onMissingValue set to 'error' and has neither value nor path nor ini_file set", - ) - if path is not None and not os.path.isfile(os.path.expanduser(path)): - return (False, f"Field has non-existing path: {path}") - - if ini_file is not None and not os.path.isfile( - os.path.expanduser(ini_file) - ): - return (False, f"Field has non-existing ini_file: {ini_file}") - - if on_missing_value in ["prompt"]: - # When we prompt, the user needs to set one of the following: - # - value: null # prompt for a secret without a default value - # - value: 123 # prompt for a secret but use a default value - # - path: null # prompt for a file path without a default value - # - path: /tmp/ca.crt # prompt for a file path with a default value - if "value" not in f and "path" not in f: - return ( - False, - "Secret has onMissingValue set to 'prompt' but has no value nor path fields", - ) - - if "override" in f: - return ( - False, - "'override' attribute requires 'onMissingValue' to be set to 'generate'", - ) - - return (True, "") - def _validate_secrets(self): backing_store = self._get_backingstore() secrets = self._get_secrets() @@ -414,47 +299,11 @@ def sanitize_values(self): if not ret: self.module.fail_json(msg) - def _get_secret_value(self, name, field): - on_missing_value = self._get_field_on_missing_value(field) - # We cannot use match + case as RHEL8 has python 3.9 (it needs 3.10) - # We checked for errors in _validate_secrets() already - if on_missing_value == "error": - return self._sanitize_yaml_value(field.get("value")) - elif on_missing_value == "prompt": - prompt = self._get_field_prompt(field) - if prompt is None: - prompt = f"Type secret for {name}/{field['name']}: " - value = self._get_field_value(field) - if value is not None: - prompt += f" [{value}]" - prompt += ": " - return getpass.getpass(prompt) - return None - - def _get_file_path(self, name, field): - on_missing_value = self._get_field_on_missing_value(field) - if on_missing_value == "error": - return os.path.expanduser(field.get("path")) - elif on_missing_value == "prompt": - prompt = self._get_field_prompt(field) - path = self._get_field_path(field) - if path is None: - path = "" - - if prompt is None: - text = f"Type path for file {name}/{field['name']} [{path}]: " - else: - text = f"{prompt} [{path}]: " - - newpath = getpass.getpass(text) - if newpath == "": # Set the default if no string was entered - newpath = path - - if os.path.isfile(os.path.expanduser(newpath)): - return newpath - self.module.fail_json(f"File {newpath} not found, exiting") - - self.module.fail_json("File with wrong onMissingValue") + def _process_secret_value(self, value): + """ + Override base class to add YAML sanitization + """ + return self._sanitize_yaml_value(value) def _inject_field(self, secret_name, f): on_missing_value = self._get_field_on_missing_value(f) diff --git a/roles/vault_utils/README.md b/roles/vault_utils/README.md index 7aea6f8..50dbec1 100644 --- a/roles/vault_utils/README.md +++ b/roles/vault_utils/README.md @@ -9,7 +9,7 @@ ansible-galaxy collection install kubernetes.core (formerly known as community.k ## Role Variables Defaults as to where the values-secret.yaml file is and the two ways to connect to a kubernetes cluster -(KUBERCONFIG and ~/.kube/config respectively): +(KUBECONFIG and ~/.kube/config respectively): ```yaml values_secret: "{{ lookup('env', 'HOME') }}/values-secret.yaml" diff --git a/roles/vault_utils/tasks/vault_spokes_init.yaml b/roles/vault_utils/tasks/vault_spokes_init.yaml index e362b2f..40651dd 100644 --- a/roles/vault_utils/tasks/vault_spokes_init.yaml +++ b/roles/vault_utils/tasks/vault_spokes_init.yaml @@ -37,7 +37,7 @@ ansible.builtin.set_fact: clusters: "{{ clusters | default({}) | combine({item.metadata.name: {'server_api': item.spec.managedClusterClientConfigs[0].url, - 'cluster_fqdn':_cluster_fqdn }}, recursive=True) }}" + 'cluster_fqdn': _cluster_fqdn }}, recursive=True) }}" loop: "{{ resources }}" vars: _cluster_fqdn: "{{ item.status.clusterClaims | selectattr('name', 'equalto', 'consoleurl.cluster.open-cluster-management.io') diff --git a/tests/unit/test_vault_load_secrets_v2.py b/tests/unit/test_vault_load_secrets_v2.py index 0f8ab89..0cc3f40 100644 --- a/tests/unit/test_vault_load_secrets_v2.py +++ b/tests/unit/test_vault_load_secrets_v2.py @@ -63,7 +63,6 @@ def fail_json(*args, **kwargs): @mock.patch("getpass.getpass") class TestMyModule(unittest.TestCase): - def setUp(self): self.mock_module_helper = patch.multiple( basic.AnsibleModule, exit_json=exit_json, fail_json=fail_json @@ -220,24 +219,6 @@ def test_ensure_policies_are_injected(self, getpass): ] mock_run_command.assert_has_calls(calls) - def test_ensure_error_wrong_onmissing_value(self, getpass): - with self.assertRaises(AnsibleFailJson) as ansible_err: - set_module_args( - { - "values_secrets": os.path.join( - self.testdir_v2, "values-secret-v2-wrong-onmissingvalue.yaml" - ), - } - ) - vault_load_secrets.main() - - ret = ansible_err.exception.args[0] - self.assertEqual(ret["failed"], True) - assert ( - ret["args"][1] - == "Secret has vaultPolicy set to nonExisting but no such policy exists" - ) - def test_ensure_error_wrong_vaultpolicy(self, getpass): with self.assertRaises(AnsibleFailJson) as ansible_err: set_module_args( @@ -724,6 +705,22 @@ def test_ensure_ini_file_works(self, getpass): ] mock_run_command.assert_has_calls(calls) + def test_ensure_error_on_wrong_missing_value(self, getpass): + with self.assertRaises(AnsibleFailJson) as ansible_err: + set_module_args( + { + "values_secrets": os.path.join( + self.testdir_v2, + "values-secret-v2-wrong-onmissingvalue.yaml", + ), + } + ) + vault_load_secrets.main() + + ret = ansible_err.exception.args[0] + self.assertEqual(ret["failed"], True) + assert ret["args"][1] == "onMissingValue: notexisting is invalid" + if __name__ == "__main__": unittest.main() diff --git a/tests/unit/v2/values-secret-v2-wrong-onmissingvalue.yaml b/tests/unit/v2/values-secret-v2-wrong-onmissingvalue.yaml index 2d53807..35364ae 100644 --- a/tests/unit/v2/values-secret-v2-wrong-onmissingvalue.yaml +++ b/tests/unit/v2/values-secret-v2-wrong-onmissingvalue.yaml @@ -1,20 +1,8 @@ version: "2.0" - backingStore: vault -vaultPolicies: - basicPolicy: | - length=10 - rule "charset" { charset = "abcdefghijklmnopqrstuvwxyz" min-chars = 1 } - rule "charset" { charset = "ABCDEFGHIJKLMNOPQRSTUVWXYZ" min-chars = 1 } - rule "charset" { charset = "0123456789" min-chars = 1 } - secrets: - name: config-demo - vaultPrefixes: - - secret/region-one - - secret/snowflake.blueprints.rhecoeng.com fields: - name: secret - onMissingValue: generate - vaultPolicy: nonExisting + onMissingValue: notexisting