Skip to content
Merged
5 changes: 3 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ super-linter: ## Runs super linter locally
-e VALIDATE_JSON_PRETTIER=false \
-e VALIDATE_MARKDOWN_PRETTIER=false \
-e VALIDATE_PYTHON_PYLINT=false \
-e VALIDATE_PYTHON_PYINK=false \
-e VALIDATE_PYTHON_RUFF_FORMAT=false \
-e VALIDATE_SHELL_SHFMT=false \
-e VALIDATE_YAML=false \
Expand All @@ -34,12 +35,12 @@ ansible-lint: ## run ansible lint on ansible/ folder

.PHONY: ansible-sanitytest
ansible-sanitytest: ## run ansible unit tests
ansible-test sanity --docker default
ansible-test sanity --docker default --python 3.11 --python 3.12

.PHONY: ansible-unittest
ansible-unittest: ## run ansible unit tests
rm -rf tests/output
ansible-test units --docker
ansible-test units --docker --python 3.11 --python 3.12

.PHONY: test
test: ansible-sanitytest ansible-unittest
Expand Down
256 changes: 256 additions & 0 deletions plugins/module_utils/load_secrets_common.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,20 @@
__metaclass__ = type

import configparser
import getpass
import os
from collections.abc import MutableMapping

default_vp_vault_policies = {
"validatedPatternDefaultPolicy": (
"length=20\n"
'rule "charset" { charset = "abcdefghijklmnopqrstuvwxyz" min-chars = 1 }\n'
'rule "charset" { charset = "ABCDEFGHIJKLMNOPQRSTUVWXYZ" min-chars = 1 }\n'
'rule "charset" { charset = "0123456789" min-chars = 1 }\n'
'rule "charset" { charset = "!@#%^&*" min-chars = 1 }\n'
)
}


def find_dupes(array):
"""
Expand Down Expand Up @@ -149,3 +161,247 @@ def filter_module_args(arg_spec):
pass

return arg_spec


class SecretsV2Base:
"""
Base class with common functionality for V2 secrets handling
"""

def __init__(self, module, syaml):
self.module = module
self.syaml = syaml

def _get_vault_policies(self, enable_default_vp_policies=True):
# We start off with the hard-coded default VP policy and add the user-defined ones
policies = (
default_vp_vault_policies.copy() if enable_default_vp_policies else {}
)
policies.update(self.syaml.get("vaultPolicies", {}))
return policies

def _get_secrets(self):
return self.syaml.get("secrets", [])

def _get_field_on_missing_value(self, f):
# By default if 'onMissingValue' is missing we assume we need to
# error out whenever the value is missing
return f.get("onMissingValue", "error")

def _get_field_value(self, f):
return f.get("value", None)

def _get_field_path(self, f):
return f.get("path", None)

def _get_field_ini_file(self, f):
return f.get("ini_file", None)

def _get_field_kind(self, f):
# value: null will be interpreted with None, so let's just
# check for the existence of the field, as we use 'value: null' to say
# "we want a value/secret and not a file path"
found = []
for i in ["value", "path", "ini_file"]:
if i in f:
found.append(i)

if len(found) > 1: # you can only have one of value, path and ini_file
self.module.fail_json(f"Both '{found[0]}' and '{found[1]}' cannot be used")

if len(found) == 0:
return ""
return found[0]

def _get_field_prompt(self, f):
return f.get("prompt", None)

def _get_field_base64(self, f):
return bool(f.get("base64", False))

def _get_field_override(self, f):
return bool(f.get("override", False))

def _validate_field(self, f):
# Check mandatory fields
if "name" not in f:
return (False, f"Field {f} is missing name")

# Validate field structure and types
result = self._validate_field_structure(f)
if not result[0]:
return result

# Validate vault policy
result = self._validate_vault_policy(f)
if not result[0]:
return result

on_missing_value = self._get_field_on_missing_value(f)
# Validate based on onMissingValue type
match on_missing_value:
case "error":
return self._validate_error_mode(f)
case "generate":
return self._validate_generate_mode(f)
case "prompt":
return self._validate_prompt_mode(f)

return (False, f"onMissingValue: {on_missing_value} is invalid")

def _validate_field_structure(self, f):
"""Validate field structure and basic types"""
kind = self._get_field_kind(f)
if kind == "ini_file":
ini_key = f.get("ini_key", None)
if ini_key is None:
return (False, "ini_file requires at least ini_key to be defined")

# Test if base64 and override are correct booleans
self._get_field_base64(f)
self._get_field_override(f)

return (True, "")

def _validate_vault_policy(self, f):
"""Validate vault policy exists if specified"""
vault_policy = f.get("vaultPolicy", None)
if vault_policy is not None and vault_policy not in self._get_vault_policies():
return (
False,
f"Secret has vaultPolicy set to {vault_policy} but no such policy exists",
)
return (True, "")

def _validate_error_mode(self, f):
"""Validate fields when onMissingValue is 'error'"""
value = self._get_field_value(f)
path = self._get_field_path(f)
ini_file = self._get_field_ini_file(f)

# Check that at least one source is provided and not empty
if (
(value is None or len(value) < 1)
and (path is None or len(path) < 1)
and (ini_file is None or len(ini_file) < 1)
):
return (
False,
"Secret has onMissingValue set to 'error' and has neither value nor path nor ini_file set",
)

# Validate file paths exist
if path is not None and not os.path.isfile(os.path.expanduser(path)):
return (False, f"Field has non-existing path: {path}")

if ini_file is not None and not os.path.isfile(os.path.expanduser(ini_file)):
return (False, f"Field has non-existing ini_file: {ini_file}")

# Override not allowed in error mode
if "override" in f:
return (
False,
"'override' attribute requires 'onMissingValue' to be set to 'generate'",
)

return (True, "")

def _validate_generate_mode(self, f):
"""Validate fields when onMissingValue is 'generate'"""
value = self._get_field_value(f)
path = self._get_field_path(f)
vault_policy = f.get("vaultPolicy", None)

if value is not None:
return (
False,
"Secret has onMissingValue set to 'generate' but has a value set",
)

if path is not None:
return (
False,
"Secret has onMissingValue set to 'generate' but has a path set",
)

if vault_policy is None:
return (
False,
"Secret has no vaultPolicy but onMissingValue is set to 'generate'",
)

return (True, "")

def _validate_prompt_mode(self, f):
"""Validate fields when onMissingValue is 'prompt'"""
# When we prompt, the user needs to set one of the following:
# - value: null # prompt for a secret without a default value
# - value: 123 # prompt for a secret but use a default value
# - path: null # prompt for a file path without a default value
# - path: /tmp/ca.crt # prompt for a file path with a default value
if "value" not in f and "path" not in f:
return (
False,
"Secret has onMissingValue set to 'prompt' but has no value nor path fields",
)

# Override not allowed in prompt mode
if "override" in f:
return (
False,
"'override' attribute requires 'onMissingValue' to be set to 'generate'",
)

return (True, "")

def _get_secret_value(self, name, field):
on_missing_value = self._get_field_on_missing_value(field)
# We checked for errors in _validate_secrets() already
match on_missing_value:
case "error":
value = field.get("value")
# Allow subclasses to override value processing
return self._process_secret_value(value)
case "prompt":
prompt = self._get_field_prompt(field)
if prompt is None:
prompt = f"Type secret for {name}/{field['name']}: "
value = self._get_field_value(field)
if value is not None:
prompt += f" [{value}]"
prompt += ": "
return getpass.getpass(prompt)
case _:
return None

def _process_secret_value(self, value):
"""
Process a secret value. Can be overridden by subclasses.
"""
return value

def _get_file_path(self, name, field):
on_missing_value = self._get_field_on_missing_value(field)
match on_missing_value:
case "error":
return os.path.expanduser(field.get("path"))
case "prompt":
prompt = self._get_field_prompt(field)
path = self._get_field_path(field)
if path is None:
path = ""

if prompt is None:
text = f"Type path for file {name}/{field['name']} [{path}]: "
else:
text = f"{prompt} [{path}]: "

newpath = getpass.getpass(text)
if newpath == "": # Set the default if no string was entered
newpath = path

if os.path.isfile(os.path.expanduser(newpath)):
return newpath
self.module.fail_json(f"File {newpath} not found, exiting")
case _:
self.module.fail_json("File with wrong onMissingValue")
Loading