Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions .github/workflows/test-workflow.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ jobs:
strategy:
fail-fast: false
matrix:
python-version: ["3.9", "3.10", "3.11", "3.12"]
python-version: ["3.9", "3.10", "3.11", "3.12", "3.13", "3.14"]

steps:
- uses: actions/checkout@v4
Expand All @@ -25,10 +25,10 @@ jobs:
uses: actions/cache@v4
with:
path: ~/.local
key: poetry-1.2.2-0
key: poetry-2.0.0-0
- uses: snok/install-poetry@v1
with:
version: 1.2.2
version: 2.0.0
virtualenvs-create: true
virtualenvs-in-project: true
- name: Install dependencies
Expand All @@ -50,7 +50,7 @@ jobs:
python-version: '3.11'
- uses: snok/install-poetry@v1
with:
version: 1.2.2
version: 2.0.0
virtualenvs-create: true
virtualenvs-in-project: true
- name: Install dependencies
Expand Down
1 change: 0 additions & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ repos:
additional_dependencies:
- flake8-bugbear
- flake8-comprehensions
- flake8-simplify

- repo: https://github.com/pre-commit/mirrors-mypy
rev: v1.10.0
Expand Down
7 changes: 5 additions & 2 deletions isops/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
# Resolve the installed distribution's version through importlib.metadata.
# The previous pkg_resources lookup is dropped: pkg_resources is deprecated
# by setuptools and is not guaranteed to be importable in a fresh environment.
try:
    from importlib.metadata import version
except ImportError:
    # Fallback to the backport for interpreters without importlib.metadata.
    from importlib_metadata import version  # type: ignore[import-not-found,no-redef]

__version__ = version("isops")
43 changes: 36 additions & 7 deletions isops/cli.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import re
from pathlib import Path
from typing import Dict, List, Pattern, Tuple
from typing import Dict, List, Optional, Pattern, Tuple

import click

Expand All @@ -10,6 +10,7 @@
find_all_files_by_regex,
find_by_key,
load_all_yaml,
load_all_yaml_with_encoding,
verify_encryption_regex,
)

Expand All @@ -31,6 +32,27 @@ def _categorize_keys_based_on_their_values(
return good_keys, bad_keys


def _print_status(file: Path, key: str, is_safe: bool, encoding: Optional[str]) -> None:
    """Write a single ``<file>::<key> [STATUS]`` report line to the terminal.

    The line is built from up to three pieces: the uncoloured location prefix,
    the coloured status tag, and - only for UTF-16 files - a yellow encoding
    warning. Exactly one newline terminates the line in either case.

    Args:
        file: The file path being checked.
        key: The secret key name.
        is_safe: Whether the secret is safely encrypted.
        encoding: The detected file encoding, or None.
    """
    if is_safe:
        label, label_color = "[SAFE]", "green"
    else:
        label, label_color = "[UNSAFE]", "red"

    # Both pieces suppress the newline so the optional warning can follow.
    click.secho(message=f"{file}::{key} ", bold=False, nl=False)
    click.secho(message=label, bold=False, fg=label_color, nl=False)

    is_utf16 = encoding is not None and encoding.startswith("utf-16")
    if not is_utf16:
        # Nothing to append: just terminate the line.
        click.echo()
        return

    click.secho(message=" [UTF-16 ENCODING]", bold=False, fg="yellow")


def _validate_regex(ctx: click.Context, param: click.Parameter, value: str) -> str:
try:
re.compile(value)
Expand Down Expand Up @@ -70,6 +92,9 @@ def cli(ctx: click.Context, path: Path, config_regex: Pattern[str], summary: boo
creation_rules = []
for match_path in find_all_files_by_regex(config_regex, received_path):
for config in load_all_yaml(Path(match_path)):
# Skip None (empty YAML documents)
if config is None:
continue
try:
creation_rules += config["creation_rules"]
click.secho(message=f"Found config file: {match_path}", bold=True, fg="blue")
Expand Down Expand Up @@ -132,12 +157,18 @@ def cli(ctx: click.Context, path: Path, config_regex: Pattern[str], summary: boo
break

for file in find_all_files_by_regex(path_regex, received_path):
if not load_all_yaml(file):
yaml_data, encoding = load_all_yaml_with_encoding(file)

if not yaml_data:
click.secho(message=f"{file} is not a valid YAML!", bold=True, fg="red")
broken_yaml_found = f"{file}"
break

for secret in load_all_yaml(file):
for secret in yaml_data:
# Skip None (empty YAML documents)
if secret is None:
continue

if "sops" in secret:
secret.pop("sops", None)

Expand All @@ -148,12 +179,10 @@ def cli(ctx: click.Context, path: Path, config_regex: Pattern[str], summary: boo

for key in all_keys:
if key in good_keys:
click.secho(message=f"{file}::{key} ", bold=False, nl=False)
click.secho(message="[SAFE]", bold=False, fg="green")
_print_status(file, key, True, encoding)
good_keys_number += 1
else:
click.secho(message=f"{file}::{key} ", bold=False, nl=False)
click.secho(message="[UNSAFE]", bold=False, fg="red")
_print_status(file, key, False, encoding)
bad_keys_number += 1
if summary:
summary_line = f"UNSAFE secret '{key}' in '{file}'"
Expand Down
4 changes: 4 additions & 0 deletions isops/utils/__init__.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,19 @@
from isops.utils.helpers import (
all_dict_values,
detect_encoding,
find_all_files_by_regex,
find_by_key,
load_all_yaml,
load_all_yaml_with_encoding,
load_yaml,
)
from isops.utils.sops import verify_encryption_regex

__all__ = [
"load_yaml",
"load_all_yaml",
"load_all_yaml_with_encoding",
"detect_encoding",
"find_by_key",
"all_dict_values",
"verify_encryption_regex",
Expand Down
127 changes: 117 additions & 10 deletions isops/utils/helpers.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,45 @@
import os
import re
from pathlib import Path
from typing import Dict, Generator, List, Pattern, Tuple
from typing import Dict, Generator, List, Optional, Pattern, Tuple

import pathspec
from ruamel.yaml import YAML, YAMLError
from ruamel.yaml.parser import ParserError
from ruamel.yaml.scanner import ScannerError


def detect_encoding(path: Path) -> Optional[str]:
    """Guess a file's encoding from its leading byte-order mark.

    Only the first two bytes are read. A UTF-16 BOM selects the matching
    UTF-16 variant; any other two-byte prefix is reported as UTF-8.

    Args:
        path (Path): The path of the file.

    Returns:
        Optional[str]: 'utf-16-be' or 'utf-16-le' when the corresponding BOM
            is present, 'utf-8' otherwise. None when the file holds fewer
            than two bytes or cannot be opened/read.
    """
    bom_to_codec = {
        b"\xfe\xff": "utf-16-be",
        b"\xff\xfe": "utf-16-le",
    }

    try:
        with open(path, "rb") as stream:
            head = stream.read(2)
    except OSError:
        return None

    # Too short to carry a BOM: the caller gets no verdict at all.
    if len(head) < 2:
        return None

    return bom_to_codec.get(head, "utf-8")


def load_yaml(path: Path) -> Dict:
"""Load a YAML content into a python dictionary.

Expand All @@ -20,7 +52,7 @@ def load_yaml(path: Path) -> Dict:
try:
yaml = YAML(typ="safe")
return yaml.load(path)
except YAMLError:
except (YAMLError, UnicodeDecodeError):
return {}


Expand All @@ -38,10 +70,38 @@ def load_all_yaml(path: Path) -> List[Dict]:
try:
yaml = YAML(typ="safe")
return list(yaml.load_all(path))
except (ParserError, ScannerError):
except (ParserError, ScannerError, UnicodeDecodeError):
return []


def load_all_yaml_with_encoding(path: Path) -> Tuple[List[Dict], Optional[str]]:
    """Like load_all_yaml, but also return the detected encoding.

    Args:
        path (Path): The path of the YAML file.

    Returns:
        Tuple[List[Dict], Optional[str]]: A tuple containing:
            - The list of documents loaded from the file (one entry per
              YAML block).
            - The detected encoding ('utf-8', 'utf-16-le' or 'utf-16-be'),
              or None when nothing could be loaded.
        If parsing fails or the file cannot be read, returns ([], None).
    """
    encoding = detect_encoding(path)

    # UTF-8 files and undetectable encodings go through the standard loader;
    # the encoding is only reported when that loader produced something.
    if encoding is None or not encoding.startswith("utf-16"):
        data = load_all_yaml(path)
        return data, (encoding if data else None)

    # UTF-16 files are decoded explicitly with the variant named by the BOM.
    try:
        yaml = YAML(typ="safe")
        with open(path, "r", encoding=encoding) as f:
            return list(yaml.load_all(f)), encoding
    except (YAMLError, UnicodeDecodeError, OSError):
        # YAMLError is the common base of the parser and scanner errors, so
        # this also covers reader/composer/constructor failures and keeps the
        # documented "([], None) on failure" contract for every YAML error.
        return [], None


def find_by_key(data: Dict, target: Pattern[str]) -> Generator[Dict, None, None]:
"""Find the innermost key-value pair children of a target key in a dictionary.

Expand Down Expand Up @@ -82,25 +142,72 @@ def all_dict_values(data: Dict) -> Generator[Tuple[str, str], None, None]:
yield from all_dict_values(value)
elif isinstance(value, list):
for elem in value:
yield from all_dict_values(elem)
# Only recurse if the element is a dict
if isinstance(elem, dict):
yield from all_dict_values(elem)
elif not isinstance(value, dict):
yield key, str(value)


def _load_gitignore_spec(search_path: Path) -> Optional[pathspec.PathSpec]:
    """Build a PathSpec from the .gitignore sitting directly in *search_path*.

    Args:
        search_path (Path): The root path to search for .gitignore.

    Returns:
        Optional[pathspec.PathSpec]: The compiled "gitwildmatch" spec, or None
            when there is no regular .gitignore file at that location or when
            it cannot be read or decoded as UTF-8.
    """
    ignore_file = search_path / ".gitignore"

    # Guard clause: a missing entry or a non-regular file means "no rules".
    if not ignore_file.exists() or not ignore_file.is_file():
        return None

    try:
        rules = ignore_file.read_text(encoding="utf-8").splitlines()
        return pathspec.PathSpec.from_lines("gitwildmatch", rules)
    except (OSError, UnicodeDecodeError):
        # Unreadable or non-UTF-8 file: behave as if no .gitignore existed.
        return None


def find_all_files_by_regex(regex: Pattern[str], path: Path) -> Generator[Path, None, None]:
    """Find all the files that match a regular expression.

    Respects .gitignore patterns if a .gitignore file exists in the search path.
    Automatically excludes .git directory.

    Args:
        regex (Pattern[str]): Regex pattern (string or compiled).
        path (Path): Path of the root directory to search.

    Yields:
        Generator[Path, None, None]: Iterable of all the files
            in 'path' that match the 'regex'.
    """
    # Ensure pattern is compiled (handles both string and Pattern inputs).
    pattern = re.compile(regex) if isinstance(regex, str) else regex
    gitignore_spec = _load_gitignore_spec(path)

    def _is_ignored(relative: str) -> bool:
        # No spec (or an empty one) means nothing is ignored.
        return bool(gitignore_spec) and gitignore_spec.match_file(relative)

    for root, dirs, files in os.walk(path):
        root_path = Path(root)
        rel_root = root_path.relative_to(path)

        # Prune in place so os.walk never descends into .git or ignored
        # directories; the trailing "/" marks the candidate as a directory.
        dirs[:] = [
            d for d in dirs if d != ".git" and not _is_ignored(str(rel_root / d) + "/")
        ]

        for file in files:
            file_path = root_path / file

            # The regex is applied to the full path; the ignore rules to the
            # path relative to the search root.
            if pattern.search(str(file_path)) and not _is_ignored(
                str(file_path.relative_to(path))
            ):
                yield file_path
Loading