diff --git a/valkyrie/plugins/__init__.py b/valkyrie/plugins/__init__.py
index 000287b..4818cd4 100644
--- a/valkyrie/plugins/__init__.py
+++ b/valkyrie/plugins/__init__.py
@@ -1,7 +1,8 @@
 """ """
 from pathlib import Path
-from typing import List, Set, Dict, Any
+from typing import List, Set, Dict, Any, Optional
+import logging
 
 from valkyrie.core.types import (
     RuleMetadata, SecurityFinding, ScanRule,
@@ -15,8 +16,13 @@ class BaseSecurityRule(ScanRule):
     """Base implementation for security rules"""
 
-    def __init__(self, metadata: RuleMetadata):
+    def __init__(
+        self,
+        metadata: RuleMetadata,
+        logger: Optional[logging.Logger] = None
+    ):
         self._metadata = metadata
+        self.logger = logger or logging.getLogger(__name__)
 
     @property
     def metadata(self) -> RuleMetadata:
@@ -42,9 +48,13 @@ async def scan(
 class PluginManager:
     """Manages scanner plugins and their lifecycle"""
 
-    def __init__(self):
+    def __init__(
+        self,
+        logger: Optional[logging.Logger] = None
+    ):
         self.plugins: Dict[str, ScannerPlugin] = {}
         self.enabled_plugins: Set[str] = set()
+        self.logger = logger or logging.getLogger(__name__)
 
     async def register_plugin(
         self,
diff --git a/valkyrie/plugins/vulnera/__init__.py b/valkyrie/plugins/vulnera/__init__.py
new file mode 100644
index 0000000..354a67d
--- /dev/null
+++ b/valkyrie/plugins/vulnera/__init__.py
@@ -0,0 +1,172 @@
+import hashlib
+from pathlib import Path
+from typing import Dict, List, Any
+
+from valkyrie.plugins import BaseSecurityRule
+from valkyrie.core.types import (
+    ScanRule, ScannerPlugin, RuleMetadata, SecurityFinding,
+    FileLocation, SeverityLevel, FindingCategory,
+)
+
+from .conf import VulnerabilityInfo
+from .parser import parse_dependencies, is_supported
+
+
+####
+## VULNERABILITY RULE
+#####
+class DependencyVulnerabilityRule(BaseSecurityRule):
+    """Rule for detecting vulnerable dependencies"""
+
+    def __init__(
+        self,
+        vulnerability_db: Dict[str, List[VulnerabilityInfo]]
+    ):
+        metadata = RuleMetadata(
+            id="deps-001",
+            name="Dependency Vulnerability Scanner",
+            description="Scans dependencies for known vulnerabilities",
+            category=FindingCategory.DEPENDENCIES,
+            severity=SeverityLevel.HIGH,
+            author="Valkyrie Core Team",
+            tags={"dependencies", "vulnerabilities", "sbom"}
+        )
+        super().__init__(metadata)
+        self.vulnerability_db = vulnerability_db
+
+    def is_applicable(self, file_path: Path) -> bool:
+        """Check if file is a supported dependency manifest"""
+        return is_supported(file_path)
+
+    async def scan(
+        self,
+        file_path: Path,
+        content: str
+    ) -> List[SecurityFinding]:
+        """Scan dependency file for vulnerabilities."""
+        findings = []
+
+        try:
+            dependencies = await self._parse_dependencies(file_path)
+
+            for dep_name, version in dependencies.items():
+                if dep_name in self.vulnerability_db:
+                    vulnerabilities = self.vulnerability_db[dep_name]
+
+                    for vuln in vulnerabilities:
+                        if self._is_version_affected(version, vuln.affected_versions):
+                            # Record the finding
+                            finding = SecurityFinding(
+                                id=hashlib.md5(
+                                    f"{file_path}:{dep_name}:{vuln.cve_id}".encode()
+                                ).hexdigest(),
+                                title=f"Vulnerable dependency: {dep_name}",
+                                description=(
+                                    f"Dependency {dep_name}@{version} has vulnerability "
+                                    f"{vuln.cve_id}: {vuln.description}"
+                                ),
+                                severity=vuln.severity,
+                                category=self.metadata.category,
+                                location=FileLocation(file_path=file_path, line_number=1),
+                                rule_id=self.metadata.id,
+                                confidence=0.9,
+                                metadata={
+                                    "dependency": dep_name,
+                                    "version": version,
+                                    "cve_id": vuln.cve_id,
+                                    "fixed_versions": vuln.fixed_versions,
+                                    "references": vuln.references
+                                },
+                                remediation_advice=(
+                                    f"Update {dep_name} to version "
+                                    f"{', '.join(vuln.fixed_versions) if vuln.fixed_versions else 'latest'}"
+                                )
+                            )
+                            findings.append(finding)
+
+        except Exception as e:
+            # Log parsing errors but don't fail the scan
+            self.logger.warning(f"Error scanning file {file_path}: {e}")
+
+        return findings
+
+    async def _parse_dependencies(self, file_path: Path) -> Dict[str, str]:
+        """Parse dependencies from file content"""
+        dependencies = {}
+
+        # Parse the dependency file
+        for dep in parse_dependencies(file_path=file_path):
+            dependencies[dep.name] = dep.version
+
+        return dependencies
+
+    def _is_version_affected(
+        self,
+        version: str,
+        affected_versions: List[str]
+    ) -> bool:
+        """Check if version is affected by vulnerability"""
+        # Simplified exact-match comparison for now; a proper semver
+        # library will replace this in a follow-up.
+        return version in affected_versions
+
+
+####
+## VULNERABILITY PLUGIN
+#####
+class DependenciesPlugin(ScannerPlugin):
+    """Plugin for dependency vulnerability scanning"""
+
+    def __init__(self):
+        self.vulnerability_db: Dict[str, List[VulnerabilityInfo]] = {}
+
+    @property
+    def name(self) -> str:
+        return "vulnera"
+
+    @property
+    def version(self) -> str:
+        return "1.0.0"
+
+    async def initialize(self, config: Dict[str, Any]) -> None:
+        """Initialize plugin and load vulnerability database"""
+        await self._load_vulnerability_db()
+
+    async def _load_vulnerability_db(self) -> None:
+        """Load vulnerability database from external sources"""
+        # Normally this would query an external service or load a local
+        # vulnerability database; a small mock database is used for now
+        # and will be replaced in a follow-up.
+        self.vulnerability_db = {
+            "lodash": [
+                VulnerabilityInfo(
+                    cve_id="CVE-2021-23337",
+                    severity=SeverityLevel.HIGH,
+                    description="Prototype pollution in lodash",
+                    affected_versions=["4.17.20"],
+                    fixed_versions=["4.17.21"],
+                    references=["https://nvd.nist.gov/vuln/detail/CVE-2021-23337"]
+                )
+            ],
+            "requests": [
+                VulnerabilityInfo(
+                    cve_id="CVE-2023-32681",
+                    severity=SeverityLevel.MEDIUM,
+                    description="Certificate verification bypass in requests",
+                    affected_versions=["2.30.0", "2.29.0"],
+                    fixed_versions=["2.31.0"],
+                    references=["https://nvd.nist.gov/vuln/detail/CVE-2023-32681"]
+                )
+            ]
+        }
+
+    async def get_rules(self) -> List[ScanRule]:
+        """Return dependency scanning rules"""
+        return [DependencyVulnerabilityRule(self.vulnerability_db)]
+
+    async def cleanup(self) -> None:
+        """Cleanup plugin resources"""
+        pass
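For reviewers, a minimal usage sketch of the new plugin (not part of the diff). It assumes the import path `valkyrie.plugins.vulnera` and a `package.json` in the working directory; the rule's logger defaults to the module logger from `BaseSecurityRule`, so a basic logging config is enough to surface scan warnings:

```python
import asyncio
import logging
from pathlib import Path

from valkyrie.plugins.vulnera import DependenciesPlugin

logging.basicConfig(level=logging.WARNING)  # surfaces rule.logger warnings


async def demo() -> None:
    plugin = DependenciesPlugin()
    await plugin.initialize({})          # loads the (mock) vulnerability DB

    manifest = Path("package.json")      # hypothetical manifest to scan
    content = manifest.read_text()

    for rule in await plugin.get_rules():
        if rule.is_applicable(manifest):
            for finding in await rule.scan(manifest, content):
                print(finding.severity, finding.title)
                print("  ->", finding.remediation_advice)

    await plugin.cleanup()


asyncio.run(demo())
```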
diff --git a/valkyrie/plugins/vulnera/conf.py b/valkyrie/plugins/vulnera/conf.py
new file mode 100644
index 0000000..196d9ca
--- /dev/null
+++ b/valkyrie/plugins/vulnera/conf.py
@@ -0,0 +1,60 @@
+from typing import List, Optional
+from dataclasses import dataclass, field
+
+from valkyrie.core.types import SeverityLevel
+
+
+####
+## VULNERABILITY MODEL
+#####
+@dataclass
+class VulnerabilityInfo:
+    """Information about a vulnerability"""
+
+    cve_id: str
+    severity: SeverityLevel
+    description: str
+    affected_versions: List[str]
+    fixed_versions: List[str] = field(default_factory=list)
+    references: List[str] = field(default_factory=list)
+
+
+####
+## DEPENDENCY MODEL
+#####
+@dataclass
+class Dependency:
+    """Project dependency representation model"""
+
+    name: str
+    version: Optional[str] = None
+    dev: bool = False
+    source: Optional[str] = None
+
+    def __str__(self):
+        version_str = f"@{self.version}" if self.version else ""
+        dev_str = " (dev)" if self.dev else ""
+        return f"{self.name}{version_str}{dev_str}"
+
+
+#### DEPENDENCY MANIFEST FILES
+DEPS_FILES = {
+    # Node.js
+    'package.json', 'package-lock.json', 'yarn.lock',
+
+    # Python
+    'requirements.txt', 'Pipfile', 'Pipfile.lock', 'poetry.lock',
+
+    # Java
+    'pom.xml', 'build.gradle',
+
+    # Rust
+    'Cargo.toml', 'Cargo.lock',
+
+    # Go
+    'go.mod', 'go.sum',
+
+    # PHP
+    'composer.json', 'composer.lock'
+}
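A short illustration of the two models and the manifest allow-list (using the corrected `DEPS_FILES` / `build.gradle` spellings above; the path is hypothetical):

```python
from pathlib import Path

from valkyrie.plugins.vulnera.conf import Dependency, DEPS_FILES

dep = Dependency(name="lodash", version="4.17.20", source="package.json")
print(str(dep))                                       # -> lodash@4.17.20
print(str(Dependency("pytest", "7.4.0", dev=True)))   # -> pytest@7.4.0 (dev)

# DEPS_FILES is a plain set of manifest file names, so pre-filtering
# candidate files is a simple membership check on the file name.
candidate = Path("services/api/package.json")         # hypothetical path
print(candidate.name in DEPS_FILES)                   # -> True
```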
diff --git a/valkyrie/plugins/vulnera/parser.py b/valkyrie/plugins/vulnera/parser.py
new file mode 100644
index 0000000..9d1621d
--- /dev/null
+++ b/valkyrie/plugins/vulnera/parser.py
@@ -0,0 +1,652 @@
+"""
+Valkyrie vulnera plugin dependency parser module.
+"""
+
+import json
+import re
+import xml.etree.ElementTree as ET
+from pathlib import Path
+from typing import Dict, List, Optional, Union, Type
+
+import toml
+
+from .conf import Dependency
+
+
+####
+## DEPENDENCY PARSER ERROR CLASS
+#####
+class DependencyParserError(Exception):
+    """Exception raised when an error occurs in dependency file parsing."""
+    pass
+
+
+####
+## BASE DEPENDENCY PARSER
+#####
+class BaseDependencyParser:
+    """Base class for all dependency parsers."""
+
+    # Name of the manifest file this parser handles; subclasses must set it.
+    dep_file: str = ""
+
+    def __init__(self, file_path: Union[str, Path]):
+        self.file_path = Path(file_path)
+
+        if not self.file_path.exists():
+            raise FileNotFoundError(
+                f"The {self.file_path} file doesn't exist."
+            )
+
+    def parse(self) -> List[Dependency]:
+        """Parse a dependency file and return a list of dependencies."""
+        raise NotImplementedError(
+            "The 'parse' method must be implemented in all subclasses."
+        )
+
+    def _read_file(self) -> str:
+        """Read and return the dependency file content."""
+        try:
+            with open(self.file_path, 'r', encoding='utf-8') as f:
+                return f.read()
+
+        except UnicodeDecodeError:
+            # Fallback for files with a different encoding
+            with open(self.file_path, 'r', encoding='latin-1') as f:
+                return f.read()
+
+
+####
+## MAIN DEPENDENCY PARSER
+#####
+class DependencyParser:
+    """Main dependency parser registry."""
+
+    _PARSERS: Dict[str, Type[BaseDependencyParser]] = {}
+
+    @classmethod
+    def register(cls, name: Optional[str] = None):
+        """Register a new parser class under its manifest file name."""
+
+        def wrapper(parser_cls: Type[BaseDependencyParser]) -> Type[BaseDependencyParser]:
+            key = name or parser_cls.dep_file
+            if not key:
+                raise DependencyParserError(
+                    f"{parser_cls.__name__} does not define 'dep_file'."
+                )
+            if key not in cls._PARSERS:
+                cls._PARSERS[key] = parser_cls
+            return parser_cls
+
+        return wrapper
+
+    @classmethod
+    def get(cls, name: str) -> Type[BaseDependencyParser]:
+        """Get a parser class by its registered file name."""
+        if name not in cls._PARSERS:
+            raise DependencyParserError(
+                f"Invalid parser name: '{name}' not found."
+            )
+        return cls._PARSERS[name]
+
+    @classmethod
+    def all(cls) -> List[Type[BaseDependencyParser]]:
+        """Get all registered parser classes."""
+        return list(cls._PARSERS.values())
+
+    @classmethod
+    def clear(cls) -> None:
+        """Clear the registry."""
+        cls._PARSERS.clear()
+
+    @classmethod
+    def get_supported_files(cls) -> List[str]:
+        """List all registered parser names."""
+        return list(cls._PARSERS.keys())
+
+    @classmethod
+    def parse(cls, file_path: Union[str, Path]) -> List[Dependency]:
+        """
+        Parse a dependency file and return a list of dependencies.
+
+        Args:
+            file_path: path to the dependency file
+
+        Returns:
+            List of dependencies
+
+        Raises:
+            DependencyParserError: if the file format is not supported
+            FileNotFoundError: if the file doesn't exist
+        """
+        file_path = Path(file_path)
+        filename = file_path.name
+
+        if filename not in cls._PARSERS:
+            raise DependencyParserError(
+                f"Unsupported file format: {filename}. "
+                f"Supported formats are: {', '.join(cls._PARSERS.keys())}"
+            )
+
+        parser_class = cls.get(filename)
+        parser = parser_class(file_path)
+
+        try:
+            return parser.parse()
+        except Exception as e:
+            raise DependencyParserError(
+                f"Error when parsing {filename}: {str(e)}"
+            ) from e
+
+    @classmethod
+    def is_supported(cls, file_path: Union[str, Path]) -> bool:
+        """Check if a file is supported."""
+        filename = Path(file_path).name
+        return filename in cls._PARSERS
+
+
+####
+## "package.json" PARSER
+#####
+@DependencyParser.register()
+class PackageJsonParser(BaseDependencyParser):
+    """Parser for package.json (Node.js)"""
+
+    dep_file = "package.json"
+
+    def parse(self) -> List[Dependency]:
+        content = self._read_file()
+        data = json.loads(content)
+
+        dependencies = []
+
+        # Production
+        if 'dependencies' in data:
+            for name, version in data['dependencies'].items():
+                dependencies.append(Dependency(name, version, dev=False))
+
+        # Development
+        if 'devDependencies' in data:
+            for name, version in data['devDependencies'].items():
+                dependencies.append(Dependency(name, version, dev=True))
+
+        return dependencies
+
+
+####
+## "package-lock.json" PARSER
+#####
+@DependencyParser.register()
+class PackageLockParser(BaseDependencyParser):
+    """Parser for package-lock.json (Node.js)"""
+
+    dep_file = "package-lock.json"
+
+    def parse(self) -> List[Dependency]:
+        content = self._read_file()
+        data = json.loads(content)
+
+        dependencies = []
+
+        # npm v7+ format
+        if 'packages' in data:
+            for path, info in data['packages'].items():
+                if path == "":  # Skip the root package
+                    continue
+
+                name = path.replace('node_modules/', '')
+                version = info.get('version')
+                is_dev = info.get('dev', False)
+                dependencies.append(Dependency(name, version, dev=is_dev))
+
+        # npm v6 format
+        elif 'dependencies' in data:
+            for name, info in data['dependencies'].items():
+                version = info.get('version')
+                is_dev = info.get('dev', False)
+                dependencies.append(Dependency(name, version, dev=is_dev))
+
+        return dependencies
+
+
+####
+## "yarn.lock" PARSER
+#####
+@DependencyParser.register()
+class YarnLockParser(BaseDependencyParser):
+    """Parser for yarn.lock (Node.js)"""
+
+    dep_file = "yarn.lock"
+
+    def parse(self) -> List[Dependency]:
+        content = self._read_file()
+        dependencies = []
+
+        # Pattern matching yarn.lock entries
+        pattern = r'^([^#\s][^:]*?):\s*\n(?:\s+.*\n)*?\s+version\s+"([^"]+)"'
+        matches = re.findall(pattern, content, re.MULTILINE)
+
+        seen = set()
+        for name_pattern, version in matches:
+            # Extract the package name (before the version specifier)
+            name = re.split(r'@(?=\d)', name_pattern.split(',')[0].strip())[0].strip('"')
+            if name not in seen:
+                dependencies.append(Dependency(name, version))
+                seen.add(name)
+
+        return dependencies
+
+
+####
+## "requirements.txt" PARSER
+#####
+@DependencyParser.register()
+class RequirementsTxtParser(BaseDependencyParser):
+    """Parser for requirements.txt (Python)"""
+
+    dep_file = "requirements.txt"
+
+    def parse(self) -> List[Dependency]:
+        content = self._read_file()
+        dependencies = []
+
+        for line in content.strip().split('\n'):
+            line = line.strip()
+            if not line or line.startswith('#'):
+                continue
+
+            # Skip pip options like -r, -e, etc.
+            if line.startswith('-'):
+                continue
+
+            # Match name==version, name>=version, etc.
+            match = re.match(r'^([a-zA-Z0-9_.-]+)([><=!]*)(.*?)(?:\s*#.*)?$', line)
+            if match:
+                name = match.group(1)
+                operator = match.group(2)
+                version = match.group(3).strip() if match.group(3) else None
+                version_str = f"{operator}{version}" if version else None
+                dependencies.append(Dependency(name, version_str))
+
+        return dependencies
+
+
+####
+## "Pipfile" PARSER
+#####
+@DependencyParser.register()
+class PipfileParser(BaseDependencyParser):
+    """Parser for Pipfile (Python)"""
+
+    dep_file = "Pipfile"
+
+    def parse(self) -> List[Dependency]:
+        content = self._read_file()
+        data = toml.loads(content)
+
+        dependencies = []
+
+        # Production
+        if 'packages' in data:
+            for name, version_info in data['packages'].items():
+                version = (
+                    version_info
+                    if isinstance(version_info, str)
+                    else version_info.get('version', '*')
+                )
+                dependencies.append(Dependency(name, version, dev=False))
+
+        # Development
+        if 'dev-packages' in data:
+            for name, version_info in data['dev-packages'].items():
+                version = (
+                    version_info
+                    if isinstance(version_info, str)
+                    else version_info.get('version', '*')
+                )
+                dependencies.append(Dependency(name, version, dev=True))
+
+        return dependencies
+
+
+####
+## "Pipfile.lock" PARSER
+#####
+@DependencyParser.register()
+class PipfileLockParser(BaseDependencyParser):
+    """Parser for Pipfile.lock (Python)"""
+
+    dep_file = "Pipfile.lock"
+
+    def parse(self) -> List[Dependency]:
+        content = self._read_file()
+        data = json.loads(content)
+
+        dependencies = []
+
+        # Production
+        if 'default' in data:
+            for name, info in data['default'].items():
+                version = info.get('version', '').replace('==', '')
+                dependencies.append(Dependency(name, version, dev=False))
+
+        # Development
+        if 'develop' in data:
+            for name, info in data['develop'].items():
+                version = info.get('version', '').replace('==', '')
+                dependencies.append(Dependency(name, version, dev=True))
+
+        return dependencies
+
+
+####
+## "poetry.lock" PARSER
+#####
+@DependencyParser.register()
+class PoetryLockParser(BaseDependencyParser):
+    """Parser for poetry.lock (Python)"""
+
+    dep_file = "poetry.lock"
+
+    def parse(self) -> List[Dependency]:
+        content = self._read_file()
+        data = toml.loads(content)
+
+        dependencies = []
+
+        if 'package' in data:
+            for package in data['package']:
+                name = package.get('name')
+                version = package.get('version')
+                category = package.get('category', 'main')
+                is_dev = category == 'dev'
+                dependencies.append(Dependency(name, version, dev=is_dev))
+
+        return dependencies
+
+
+####
+## "pom.xml" PARSER
+#####
+@DependencyParser.register()
+class PomXmlParser(BaseDependencyParser):
+    """Parser for pom.xml (Java/Maven)"""
+
+    dep_file = "pom.xml"
+
+    def parse(self) -> List[Dependency]:
+        content = self._read_file()
+        root = ET.fromstring(content)
+
+        # Maven namespaces
+        namespaces = {'maven': 'http://maven.apache.org/POM/4.0.0'}
+        if root.tag.startswith('{'):
+            namespaces['maven'] = root.tag.split('}')[0][1:]
+
+        def find_child(element, tag):
+            # Compare against None explicitly: ElementTree elements without
+            # children are falsy, so an "or" fallback would skip valid nodes.
+            node = element.find(f'maven:{tag}', namespaces)
+            return node if node is not None else element.find(tag)
+
+        dependencies = []
+
+        # Find all dependencies
+        deps = root.findall('.//maven:dependency', namespaces) or root.findall('.//dependency')
+        for dep in deps:
+            group_id = find_child(dep, 'groupId')
+            artifact_id = find_child(dep, 'artifactId')
+            version = find_child(dep, 'version')
+            scope = find_child(dep, 'scope')
+
+            if group_id is not None and artifact_id is not None:
+                name = f"{group_id.text}:{artifact_id.text}"
+                version_str = version.text if version is not None else None
+                is_dev = scope is not None and scope.text in ['test', 'provided']
+                dependencies.append(Dependency(name, version_str, dev=is_dev))
+
+        return dependencies
+
+
+####
+## "build.gradle" PARSER
+#####
+@DependencyParser.register()
+class GradleBuildParser(BaseDependencyParser):
+    """Parser for build.gradle (Java/Gradle)"""
+
+    dep_file = "build.gradle"
+
+    def parse(self) -> List[Dependency]:
+        content = self._read_file()
+        dependencies = []
+
+        # Patterns matching Gradle dependency declarations
+        patterns = [
+            r"(?:implementation|compile|api|testImplementation|testCompile)\s+['\"]([^:]+):([^:]+):([^'\"]+)['\"]",
+            r"(?:implementation|compile|api|testImplementation|testCompile)\s+group:\s*['\"]([^'\"]+)['\"],\s*name:\s*['\"]([^'\"]+)['\"],\s*version:\s*['\"]([^'\"]+)['\"]"
+        ]
+
+        for pattern in patterns:
+            matches = re.findall(pattern, content)
+            for match in matches:
+                if len(match) == 3:
+                    group_id, artifact_id, version = match
+                    name = f"{group_id}:{artifact_id}"
+                    dependencies.append(Dependency(name, version))
+
+        return dependencies
+
+
+####
+## "Cargo.toml" PARSER
+#####
+@DependencyParser.register()
+class CargoTomlParser(BaseDependencyParser):
+    """Parser for Cargo.toml (Rust)"""
+
+    dep_file = "Cargo.toml"
+
+    def parse(self) -> List[Dependency]:
+        content = self._read_file()
+        data = toml.loads(content)
+
+        dependencies = []
+
+        # Production
+        if 'dependencies' in data:
+            for name, version_info in data['dependencies'].items():
+                version = version_info if isinstance(version_info, str) else version_info.get('version')
+                dependencies.append(Dependency(name, version, dev=False))
+
+        # Development
+        if 'dev-dependencies' in data:
+            for name, version_info in data['dev-dependencies'].items():
+                version = version_info if isinstance(version_info, str) else version_info.get('version')
+                dependencies.append(Dependency(name, version, dev=True))
+
+        return dependencies
+
+
+####
+## "Cargo.lock" PARSER
+#####
+@DependencyParser.register()
+class CargoLockParser(BaseDependencyParser):
+    """Parser for Cargo.lock (Rust)"""
+
+    dep_file = "Cargo.lock"
+
+    def parse(self) -> List[Dependency]:
+        content = self._read_file()
+        data = toml.loads(content)
+
+        dependencies = []
+
+        if 'package' in data:
+            for package in data['package']:
+                name = package.get('name')
+                version = package.get('version')
+                dependencies.append(Dependency(name, version))
+
+        return dependencies
+
+
+####
+## "go.mod" PARSER
+#####
+@DependencyParser.register()
+class GoModParser(BaseDependencyParser):
+    """Parser for go.mod (Go)"""
+
+    dep_file = "go.mod"
+
+    def parse(self) -> List[Dependency]:
+        content = self._read_file()
+        dependencies = []
+
+        # Track whether we are inside a "require (...)" block
+        in_require = False
+        for line in content.split('\n'):
+            line = line.strip()
+
+            if line.startswith('require ('):
+                in_require = True
+                continue
+            elif line == ')' and in_require:
+                in_require = False
+                continue
+
+            if in_require or line.startswith('require '):
+                # Clean the line
+                line = line.replace('require ', '').strip()
+                if not line or line == '(':
+                    continue
+
+                # Match "module version"
+                parts = line.split()
+                if len(parts) >= 2:
+                    module = parts[0]
+                    version = parts[1]
+                    dependencies.append(Dependency(module, version))
+
+        return dependencies
+
+
+####
+## "composer.json" PARSER
+#####
+@DependencyParser.register()
+class ComposerJsonParser(BaseDependencyParser):
+    """Parser for composer.json (PHP)"""
+
+    dep_file = "composer.json"
+
+    def parse(self) -> List[Dependency]:
+        content = self._read_file()
+        data = json.loads(content)
+
+        dependencies = []
+
+        # Production
+        if 'require' in data:
+            for name, version in data['require'].items():
+                if name != 'php':  # Exclude the PHP version constraint
+                    dependencies.append(Dependency(name, version, dev=False))
+
+        # Development
+        if 'require-dev' in data:
+            for name, version in data['require-dev'].items():
+                dependencies.append(Dependency(name, version, dev=True))
+
+        return dependencies
+
+
+####
+## "composer.lock" PARSER
+#####
+@DependencyParser.register()
+class ComposerLockParser(BaseDependencyParser):
+    """Parser for composer.lock (PHP)"""
+
+    dep_file = "composer.lock"
+
+    def parse(self) -> List[Dependency]:
+        content = self._read_file()
+        data = json.loads(content)
+
+        dependencies = []
+
+        # Production
+        if 'packages' in data:
+            for package in data['packages']:
+                name = package.get('name')
+                version = package.get('version')
+                dependencies.append(Dependency(name, version, dev=False))
+
+        # Development
+        if 'packages-dev' in data:
+            for package in data['packages-dev']:
+                name = package.get('name')
+                version = package.get('version')
+                dependencies.append(Dependency(name, version, dev=True))
+
+        return dependencies
+
+
+#### DEPENDENCY PARSER UTILITY FUNCTION
+def parse_dependencies(file_path: Union[str, Path]) -> List[Dependency]:
+    """
+    Utility function for parsing dependency files.
+
+    Args:
+        file_path: dependency file path
+
+    Returns:
+        List of dependencies
+    """
+    return DependencyParser.parse(file_path)
+
+
+### IS DEP FILE SUPPORTED
+def is_supported(file_path: Union[str, Path]) -> bool:
+    """
+    Check if the file is a supported dependency manifest.
+
+    Args:
+        file_path: dependency file path
+
+    Returns:
+        True if the file is supported, else False
+    """
+    return DependencyParser.is_supported(file_path)
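To illustrate the registry, here is a sketch of adding a parser for a format the plugin does not ship with. The Gemfile parser, its line-based heuristic, and the file path are hypothetical and not part of this change; it relies on the class-level `dep_file` attribute consumed by `DependencyParser.register()` above:

```python
from pathlib import Path
from typing import List

from valkyrie.plugins.vulnera.conf import Dependency
from valkyrie.plugins.vulnera.parser import (
    BaseDependencyParser, DependencyParser, is_supported, parse_dependencies,
)


@DependencyParser.register()
class GemfileParser(BaseDependencyParser):
    """Hypothetical parser for Ruby Gemfiles."""

    dep_file = "Gemfile"

    def parse(self) -> List[Dependency]:
        dependencies = []
        for line in self._read_file().splitlines():
            line = line.strip()
            if not line.startswith("gem "):
                continue
            # e.g. gem 'rails', '~> 7.1'  ->  name="rails", version="~> 7.1"
            parts = [p.strip(" '\"") for p in line[4:].split(",")]
            version = parts[1] if len(parts) > 1 else None
            dependencies.append(Dependency(parts[0], version))
        return dependencies


print(is_supported("Gemfile"))   # -> True once the class is registered

gemfile = Path("Gemfile")        # hypothetical manifest in the working directory
if gemfile.exists():
    for dep in parse_dependencies(gemfile):
        print(dep)
```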