Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ super-linter: ## Runs super linter locally
-e VALIDATE_JSON_PRETTIER=false \
-e VALIDATE_MARKDOWN_PRETTIER=false \
-e VALIDATE_PYTHON_PYLINT=false \
-e VALIDATE_PYTHON_PYINK=false \
-e VALIDATE_PYTHON_RUFF_FORMAT=false \
-e VALIDATE_SHELL_SHFMT=false \
-e VALIDATE_YAML=false \
Expand All @@ -34,12 +35,12 @@ ansible-lint: ## run ansible lint on ansible/ folder

.PHONY: ansible-sanitytest
ansible-sanitytest: ## run ansible unit tests
ansible-test sanity --docker default
ansible-test sanity --docker default --python 3.11 --python 3.12

.PHONY: ansible-unittest
ansible-unittest: ## run ansible unit tests
rm -rf tests/output
ansible-test units --docker
ansible-test units --docker --python 3.11 --python 3.12

.PHONY: test
test: ansible-sanitytest ansible-unittest
Expand Down
1 change: 1 addition & 0 deletions playbooks/process_secrets.yml
Original file line number Diff line number Diff line change
Expand Up @@ -48,3 +48,4 @@
kubernetes_secret_objects: "{{ secrets_results['kubernetes_secret_objects'] }}"
vault_policies: "{{ secrets_results['vault_policies'] }}"
parsed_secrets: "{{ secrets_results['parsed_secrets'] }}"
unique_vault_prefixes: "{{ secrets_results['unique_vault_prefixes'] | default([]) }}"
256 changes: 256 additions & 0 deletions plugins/module_utils/load_secrets_common.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,20 @@
__metaclass__ = type

import configparser
import getpass
import os
from collections.abc import MutableMapping

default_vp_vault_policies = {
"validatedPatternDefaultPolicy": (
"length=20\n"
'rule "charset" { charset = "abcdefghijklmnopqrstuvwxyz" min-chars = 1 }\n'
'rule "charset" { charset = "ABCDEFGHIJKLMNOPQRSTUVWXYZ" min-chars = 1 }\n'
'rule "charset" { charset = "0123456789" min-chars = 1 }\n'
'rule "charset" { charset = "!@#%^&*" min-chars = 1 }\n'
)
}


def find_dupes(array):
"""
Expand Down Expand Up @@ -149,3 +161,247 @@ def filter_module_args(arg_spec):
pass

return arg_spec


class SecretsV2Base:
"""
Base class with common functionality for V2 secrets handling
"""

def __init__(self, module, syaml):
self.module = module
self.syaml = syaml

def _get_vault_policies(self, enable_default_vp_policies=True):
# We start off with the hard-coded default VP policy and add the user-defined ones
policies = (
default_vp_vault_policies.copy() if enable_default_vp_policies else {}
)
policies.update(self.syaml.get("vaultPolicies", {}))
return policies

def _get_secrets(self):
return self.syaml.get("secrets", [])

def _get_field_on_missing_value(self, f):
# By default if 'onMissingValue' is missing we assume we need to
# error out whenever the value is missing
return f.get("onMissingValue", "error")

def _get_field_value(self, f):
return f.get("value", None)

def _get_field_path(self, f):
return f.get("path", None)

def _get_field_ini_file(self, f):
return f.get("ini_file", None)

def _get_field_kind(self, f):
# value: null will be interpreted with None, so let's just
# check for the existence of the field, as we use 'value: null' to say
# "we want a value/secret and not a file path"
found = []
for i in ["value", "path", "ini_file"]:
if i in f:
found.append(i)

if len(found) > 1: # you can only have one of value, path and ini_file
self.module.fail_json(f"Both '{found[0]}' and '{found[1]}' cannot be used")

if len(found) == 0:
return ""
return found[0]

def _get_field_prompt(self, f):
return f.get("prompt", None)

def _get_field_base64(self, f):
return bool(f.get("base64", False))

def _get_field_override(self, f):
return bool(f.get("override", False))

def _validate_field(self, f):
# Check mandatory fields
if "name" not in f:
return (False, f"Field {f} is missing name")

# Validate field structure and types
result = self._validate_field_structure(f)
if not result[0]:
return result

# Validate vault policy
result = self._validate_vault_policy(f)
if not result[0]:
return result

on_missing_value = self._get_field_on_missing_value(f)
# Validate based on onMissingValue type
match on_missing_value:
case "error":
return self._validate_error_mode(f)
case "generate":
return self._validate_generate_mode(f)
case "prompt":
return self._validate_prompt_mode(f)

return (False, f"onMissingValue: {on_missing_value} is invalid")

def _validate_field_structure(self, f):
"""Validate field structure and basic types"""
kind = self._get_field_kind(f)
if kind == "ini_file":
ini_key = f.get("ini_key", None)
if ini_key is None:
return (False, "ini_file requires at least ini_key to be defined")

# Test if base64 and override are correct booleans
self._get_field_base64(f)
self._get_field_override(f)

return (True, "")

def _validate_vault_policy(self, f):
"""Validate vault policy exists if specified"""
vault_policy = f.get("vaultPolicy", None)
if vault_policy is not None and vault_policy not in self._get_vault_policies():
return (
False,
f"Secret has vaultPolicy set to {vault_policy} but no such policy exists",
)
return (True, "")

def _validate_error_mode(self, f):
"""Validate fields when onMissingValue is 'error'"""
value = self._get_field_value(f)
path = self._get_field_path(f)
ini_file = self._get_field_ini_file(f)

# Check that at least one source is provided and not empty
if (
(value is None or len(value) < 1)
and (path is None or len(path) < 1)
and (ini_file is None or len(ini_file) < 1)
):
return (
False,
"Secret has onMissingValue set to 'error' and has neither value nor path nor ini_file set",
)

# Validate file paths exist
if path is not None and not os.path.isfile(os.path.expanduser(path)):
return (False, f"Field has non-existing path: {path}")

if ini_file is not None and not os.path.isfile(os.path.expanduser(ini_file)):
return (False, f"Field has non-existing ini_file: {ini_file}")

# Override not allowed in error mode
if "override" in f:
return (
False,
"'override' attribute requires 'onMissingValue' to be set to 'generate'",
)

return (True, "")

def _validate_generate_mode(self, f):
"""Validate fields when onMissingValue is 'generate'"""
value = self._get_field_value(f)
path = self._get_field_path(f)
vault_policy = f.get("vaultPolicy", None)

if value is not None:
return (
False,
"Secret has onMissingValue set to 'generate' but has a value set",
)

if path is not None:
return (
False,
"Secret has onMissingValue set to 'generate' but has a path set",
)

if vault_policy is None:
return (
False,
"Secret has no vaultPolicy but onMissingValue is set to 'generate'",
)

return (True, "")

def _validate_prompt_mode(self, f):
"""Validate fields when onMissingValue is 'prompt'"""
# When we prompt, the user needs to set one of the following:
# - value: null # prompt for a secret without a default value
# - value: 123 # prompt for a secret but use a default value
# - path: null # prompt for a file path without a default value
# - path: /tmp/ca.crt # prompt for a file path with a default value
if "value" not in f and "path" not in f:
return (
False,
"Secret has onMissingValue set to 'prompt' but has no value nor path fields",
)

# Override not allowed in prompt mode
if "override" in f:
return (
False,
"'override' attribute requires 'onMissingValue' to be set to 'generate'",
)

return (True, "")

def _get_secret_value(self, name, field):
on_missing_value = self._get_field_on_missing_value(field)
# We checked for errors in _validate_secrets() already
match on_missing_value:
case "error":
value = field.get("value")
# Allow subclasses to override value processing
return self._process_secret_value(value)
case "prompt":
prompt = self._get_field_prompt(field)
if prompt is None:
prompt = f"Type secret for {name}/{field['name']}: "
value = self._get_field_value(field)
if value is not None:
prompt += f" [{value}]"
prompt += ": "
return getpass.getpass(prompt)
case _:
return None

def _process_secret_value(self, value):
"""
Process a secret value. Can be overridden by subclasses.
"""
return value

def _get_file_path(self, name, field):
on_missing_value = self._get_field_on_missing_value(field)
match on_missing_value:
case "error":
return os.path.expanduser(field.get("path"))
case "prompt":
prompt = self._get_field_prompt(field)
path = self._get_field_path(field)
if path is None:
path = ""

if prompt is None:
text = f"Type path for file {name}/{field['name']} [{path}]: "
else:
text = f"{prompt} [{path}]: "

newpath = getpass.getpass(text)
if newpath == "": # Set the default if no string was entered
newpath = path

if os.path.isfile(os.path.expanduser(newpath)):
return newpath
self.module.fail_json(f"File {newpath} not found, exiting")
case _:
self.module.fail_json("File with wrong onMissingValue")
Loading