diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 5c5ef45..59da450 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -2,9 +2,9 @@ name: CI Pipeline on: push: - branches: ["*"] # Runs on all branches + branches: ["*"] # Runs on all branches pull_request: - branches: ["*"] # Runs on all pull requests + branches: ["*"] # Runs on all pull requests jobs: tests: @@ -18,7 +18,7 @@ jobs: - name: Set up Python uses: actions/setup-python@v5 with: - python-version: "3.8" + python-version: "3.9" - name: Install dependencies run: | @@ -27,4 +27,4 @@ jobs: pip install pytest pytest-cov - name: Run Tests with Coverage - run: pytest --cov=src --cov-report=term --cov-fail-under=75 \ No newline at end of file + run: pytest --cov=src --cov-report=term --cov-fail-under=75 diff --git a/.github/workflows/security.yml b/.github/workflows/security.yml index 2b6a073..4c98762 100644 --- a/.github/workflows/security.yml +++ b/.github/workflows/security.yml @@ -14,27 +14,13 @@ jobs: - name: Set up Python uses: actions/setup-python@v5 with: - python-version: "3.8" - - - name: Cache Python dependencies - uses: actions/cache@v3 - with: - path: ~/.cache/pip - key: ${{ runner.os }}-pip-${{ hashFiles('**/requirements.txt') }} - restore-keys: | - ${{ runner.os }}-pip- + python-version: "3.9" - name: Install dependencies run: | python -m pip install --upgrade pip - pip install bandit pip-audit + pip install bandit - name: Run Bandit Security Scan id: bandit - continue-on-error: true - run: bandit -r src/pentestkit/ --verbose - - - name: Run pip-audit - id: pip-audit - continue-on-error: true - run: pip-audit -r requirements.txt + run: bandit -c bandit.yaml -r src/pentestkit/ --verbose diff --git a/.gitignore b/.gitignore index 0b8e57a..1de7cac 100644 --- a/.gitignore +++ b/.gitignore @@ -171,4 +171,7 @@ cython_debug/ .pypirc #VSCode -.vscode \ No newline at end of file +.vscode + +# .ipynb files +*.ipynb \ No newline at end of file diff --git a/MANIFEST.in b/MANIFEST.in new file mode 100644 index 0000000..b27ed64 --- /dev/null +++ b/MANIFEST.in @@ -0,0 +1,2 @@ +include requirements.txt +include VERSION \ No newline at end of file diff --git a/README.md b/README.md index 7aa28fe..ed665dc 100644 --- a/README.md +++ b/README.md @@ -13,6 +13,5 @@ This project is licensed under the **Apache 2.0 License**. See [LICENSE](LICENSE ## Additional Documentation For more detailed information, please refer to the following documents: -- [API Reference](docs/API_REFERENCE.md): Comprehensive guide to the API endpoints and their usage. - [Changelog](docs/CHANGELOG.md): A log of all the changes, updates, and fixes made to the project. - [Contributing Guide](docs/CONTRIBUTING.md): Guidelines for contributing to the project, including how to report issues and submit code changes. \ No newline at end of file diff --git a/bandit.yaml b/bandit.yaml new file mode 100644 index 0000000..cadff06 --- /dev/null +++ b/bandit.yaml @@ -0,0 +1,10 @@ +# Bandit configuration file + +# Skip warnings for specific tests +skips: + - B311 # Skip B311 which warns about `random` usage + - B101 # Skip B101 which warns about assert statements in tests + +#Additional configurations +exclude_dirs: + - tests # Exclude test directories as they often contain intentional test patterns \ No newline at end of file diff --git a/docs/USAGE.md b/docs/USAGE.md index e69de29..3ab3750 100644 --- a/docs/USAGE.md +++ b/docs/USAGE.md @@ -0,0 +1,30 @@ +# API Reference + +This document provides detailed documentation for the pentest library. + +## Table of Contents + +- [Core Components](#core-components) + - [Parser Module](#parser-module) + +## Core Components + +### Parser Module + +The parser module handles parsing OpenAPI/Swagger specifications. + +#### `OpenAPIParser` + +```python +from pentestkit.parser import OpenAPIParser + +parser = OpenAPIParser(source="swagger.json") +endpoints = parser.parse("swagger.json") +base_url = parser.get_base_url() +``` + +**Methods:** + +- `parse(source: str) -> List[Endpoint]`: Parse API specification from a URL or file path +- `get_format() -> SpecFormat`: Get the format of the specification (OpenAPI v2 or v3) +- `get_base_url() -> Optional[str]`: Get the base URL from the specification \ No newline at end of file diff --git a/requirements.txt b/requirements.txt index c3ca841..f9c92ce 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,3 +1,3 @@ requests==2.32.3 -urllib3>=2.2.3 +urllib3==2.4.0 rstr==3.2.2 \ No newline at end of file diff --git a/setup.cfg b/setup.cfg index 66aaa92..518b4b3 100644 --- a/setup.cfg +++ b/setup.cfg @@ -15,4 +15,13 @@ omit = [coverage:report] exclude_lines = - pass \ No newline at end of file + pass + +[mypy] +warn_unused_configs = True + +[mypy-pytest.*] +ignore_missing_imports = True + +[mypy-pentestkit.*] +ignore_missing_imports = True \ No newline at end of file diff --git a/src/pentestkit/dataclasses/body.py b/src/pentestkit/dataclasses/body.py index 0cb5154..e980f1a 100644 --- a/src/pentestkit/dataclasses/body.py +++ b/src/pentestkit/dataclasses/body.py @@ -12,8 +12,10 @@ # See the License for the specific language governing permissions and # limitations under the License. +import random +from ..utils import generate_test_value from dataclasses import dataclass, field -from typing import Any, Dict, Optional +from typing import Any, Optional, Union @dataclass @@ -21,8 +23,219 @@ class RequestBody: """Represents a request body.""" content_type: str - schema: Dict[str, Any] + schema: dict[str, Any] required: bool = False example: Optional[Any] = None - properties: Dict[str, Any] = field(default_factory=dict) - resolved_schema: Dict[str, Any] = field(default_factory=dict) \ No newline at end of file + properties: dict[str, Any] = field(default_factory=dict) + resolved_schema: dict[str, Any] = field(default_factory=dict) + + def __post_init__(self): + """Generates example values if not provided""" + if self.example is None: + self.example = generate_example_body(self) + + +def generate_example_body(request_body: RequestBody) -> Any: + """ + Generate an example body based on the request body's content type. + + Args: + request_body (RequestBody): The request body object. + + Returns: + Any: A suitable example value for the request body based on its content type. + """ + + content_type_handlers = { + "application/json": lambda: generate_json_example(request_body.properties), + } + + handler = content_type_handlers.get(request_body.content_type) + return handler() if handler else None + + +def generate_json_example(properties: dict[str, Any]) -> dict[str, Any]: + """ + Generate a JSON example based on property schemas. + + Args: + properties (dict[str, Any]): A dictionary mapping property names to their schemas. + + Returns: + dict[str, Any]: A dictionary containing example values for each property. + """ + result = {} + for prop_name, prop_schema in properties.items(): + # Handle anyOf case + if "anyOf" in prop_schema: + for schema_option in prop_schema["anyOf"]: + if schema_option.get("type") != "null": + result[prop_name] = generate_property_value(schema_option) + break + continue + + # Handle normal types + result[prop_name] = generate_property_value(prop_schema) + + return result + + +def generate_property_value(schema: dict[str, Any]) -> Any: + """ + Generate an example value for a property based on its schema. + + Args: + schema (dict[str, Any]): The schema definition of the property. + + Returns: + Any: A suitable example value based on the property's type and constraints. + """ + try: + prop_type = schema.get("type", "string") + + if prop_type == "string": + return generate_string_property(schema) + elif prop_type == "number" or prop_type == "integer": + return generate_numerical_property(schema, prop_type == "integer") + elif prop_type == "boolean": + return random.choice([True, False]) + elif prop_type == "object": + return generate_object_property(schema) + elif prop_type == "array": + return generate_array_property(schema) + + # Default fallback + return "example" + except: + return "example" + + +def generate_string_property(schema: dict[str, Any]) -> str: + """ + Generate an example string value based on schema constraints. + + Args: + schema (dict[str, Any]): The schema definition for a string property. + + Returns: + str: A string example that satisfies the schema constraints. + """ + + for attr in ("examples", "enum", "example"): + if values := schema.get(attr): + return random.choice(values) if isinstance(values, list) else values + + pattern = schema.get("pattern") + min_length = schema.get("minLength", 3) + max_length = schema.get("maxLength", 50) + + if pattern: + return generate_test_value(pattern, min_length, max_length) + + format_values = { + "date": "2025-04-23", + "date-time": "2025-04-23T14:30:00Z", + "email": "user@example.com", + "uri": "https://example.com/resource", + "hostname": "example.com", + "ipv4": "192.168.1.1", + "ipv6": "2001:0db8:85a3:0000:0000:8a2e:0370:7334", + } + + return format_values.get(schema.get("format", "none"), "example") + + +def generate_numerical_property( + schema: dict[str, Any], is_integer: bool +) -> Union[int, float]: + """ + Generate an example numerical value based on schema constraints. + + Args: + schema (dict[str, Any]): The schema definition for a numerical property. + is_integer (bool): Whether the value should be an integer (True) or float (False). + + Returns: + Union[int, float]: A numerical example that satisfies the schema constraints. + """ + try: + + for attr in ("examples", "enum", "example"): + if values := schema.get(attr): + + value = random.choice(values) if isinstance(values, list) else values + return int(value) if is_integer else float(value) + + if "exclusiveMinimum" in schema: + minimum = schema["exclusiveMinimum"] + (1 if is_integer else 0.01) + else: + minimum = schema.get("minimum", 1) + + if "exclusiveMaximum" in schema: + maximum = schema["exclusiveMaximum"] - (1 if is_integer else 0.01) + else: + maximum = schema.get("maximum", 100) + + # Ensure min doesn't exceed max + if minimum > maximum: + minimum, maximum = maximum, minimum + + if is_integer: + return random.randint(int(minimum), int(maximum)) + + multiple_of = schema.get("multipleOf") + if multiple_of: + value = random.uniform(float(minimum), float(maximum)) + return round(value / multiple_of) * multiple_of + + return round(random.uniform(float(minimum), float(maximum)), 2) + + except (ValueError, TypeError): + return 1 if is_integer else 1.0 + + +def generate_object_property(schema: dict[str, Any]) -> dict[str, Any]: + """ + Generate an example object based on schema definition. + + Args: + schema (dict[str, Any]): The schema definition for an object property. + + Returns: + dict[str, Any]: An object example that satisfies the schema definition. + """ + + if "properties" in schema: + return generate_json_example(schema.get("properties", {})) + elif "additionalProperties" in schema and isinstance( + schema["additionalProperties"], dict + ): + sample_props = { + "key1": generate_property_value(schema["additionalProperties"]), + "key2": generate_property_value(schema["additionalProperties"]), + } + return sample_props + return {"example": "object-value"} + + +def generate_array_property(schema: dict[str, Any]) -> list[Any]: + """ + Generate an example array based on schema definition. + + Args: + schema (dict[str, Any]): The schema definition for an array property. + + Returns: + list[Any]: An array example that satisfies the schema constraints including + min/max items and item schema. + """ + + items_schema = schema.get("items", {}) + min_items = schema.get("minItems", 1) + max_items = schema.get("maxItems", 3) + num_items = min(min_items, max_items) + + result = [] + for _ in range(num_items): + result.append(generate_property_value(items_schema)) + return result diff --git a/src/pentestkit/dataclasses/endpoint.py b/src/pentestkit/dataclasses/endpoint.py index 596fb69..c616617 100644 --- a/src/pentestkit/dataclasses/endpoint.py +++ b/src/pentestkit/dataclasses/endpoint.py @@ -13,7 +13,7 @@ # limitations under the License. from dataclasses import dataclass, field -from typing import Any, Dict, List, Optional +from typing import Any, Optional from .parameter import Parameter from .body import RequestBody @@ -27,9 +27,9 @@ class Endpoint: operation_id: Optional[str] = None summary: Optional[str] = None description: Optional[str] = None - parameters: List[Parameter] = field(default_factory=list) + parameters: list[Parameter] = field(default_factory=list) request_body: Optional[RequestBody] = None - responses: Dict[str, Any] = field(default_factory=dict) - security: List[Dict[str, List[str]]] = field(default_factory=list) - tags: List[str] = field(default_factory=list) - deprecated: bool = False \ No newline at end of file + responses: dict[str, Any] = field(default_factory=dict) + security: list[dict[str, list[str]]] = field(default_factory=list) + tags: list[str] = field(default_factory=list) + deprecated: bool = False diff --git a/src/pentestkit/dataclasses/parameter.py b/src/pentestkit/dataclasses/parameter.py index 502a78f..7192c23 100644 --- a/src/pentestkit/dataclasses/parameter.py +++ b/src/pentestkit/dataclasses/parameter.py @@ -12,10 +12,10 @@ # See the License for the specific language governing permissions and # limitations under the License. -import rstr import random from dataclasses import dataclass, field -from typing import Any, Dict, List, Optional, Union +from typing import Any, Optional, Union +from ..utils import generate_test_value @dataclass @@ -28,14 +28,14 @@ class Parameter: type: str = "string" format: Optional[str] = None description: Optional[str] = None - enum: List[Any] = field(default_factory=list) + enum: list[Any] = field(default_factory=list) example: Optional[Any] = None pattern: Optional[str] = None min_length: Optional[int] = None max_length: Optional[int] = None minimum: Optional[Union[int, float]] = None maximum: Optional[Union[int, float]] = None - schema: Optional[Dict[str, Any]] = None + schema: Optional[dict[str, Any]] = None def __post_init__(self): """Apply defaults only when values are None.""" @@ -43,30 +43,6 @@ def __post_init__(self): self.example = generate_example_value(self) -def generate_test_value( - pattern: str, min_length: int, max_length: int -) -> Optional[str]: - """ - Generate a valid test value that matches the given regex pattern - and length constraints. - - Args: - pattern (str): The regex pattern to match - min_length (int): The minimum length of the value. - max_length (int): The maximum length of the value. - - Returns: - Optional[str]: A valid test value that matches the given pattern - and length constraints. Returns None if a valid value could not be - generated. - """ - for _ in range(100): # Limit attempts to avoid infinite loops - value = rstr.xeger(pattern) - if min_length <= len(value) <= max_length: - return value - return None - - def generate_example_value(parameter: Parameter) -> Any: """ Generate an example value for a parameter based on its type, location, and constraints. diff --git a/src/pentestkit/parser/base_parser.py b/src/pentestkit/parser/base_parser.py index 807bcda..3fe73e8 100644 --- a/src/pentestkit/parser/base_parser.py +++ b/src/pentestkit/parser/base_parser.py @@ -23,7 +23,7 @@ import urllib.parse from enum import Enum from abc import ABC, abstractmethod -from typing import Any, Dict, List, Optional, Union +from typing import Any, Optional, Union from ..dataclasses import Endpoint @@ -44,7 +44,7 @@ class SpecFormat(Enum): class BaseParser(ABC): """Base class for API parsers.""" - def __init__(self, source: Union[str, Dict[str, Any]]): + def __init__(self, source: Union[str, dict[str, Any]]): """ Initialize the BaseParser with a source. @@ -58,10 +58,10 @@ def __init__(self, source: Union[str, Dict[str, Any]]): ParserError: If the specification cannot be loaded or parsed """ self.logger = logging.getLogger(self.__class__.__name__) - self.endpoints: List[Endpoint] = [] - self.spec: Dict[str, Any] = self._load_spec(source) - self.servers: List[str] = [] - self.security_schemes: Dict[str, Dict[str, Any]] = {} + self.endpoints: list[Endpoint] = [] + self.spec: dict[str, Any] = self._load_spec(source) + self.servers: list[str] = [] + self.security_schemes: dict[str, dict[str, Any]] = {} # Parse the specification immediately upon initialization self._parse() @@ -83,7 +83,7 @@ def get_base_url(self) -> Optional[str]: """Get the base URL of the API.""" return self.servers[0] if self.servers else None - def _load_spec(self, source: Union[str, Dict[str, Any]]) -> Dict[str, Any]: + def _load_spec(self, source: Union[str, dict[str, Any]]) -> dict[str, Any]: """ Load the API specification from various sources. diff --git a/src/pentestkit/parser/openapi_parser.py b/src/pentestkit/parser/openapi_parser.py index 9d65525..f78e5a9 100644 --- a/src/pentestkit/parser/openapi_parser.py +++ b/src/pentestkit/parser/openapi_parser.py @@ -17,7 +17,7 @@ Supports both OpenAPI v2 (Swagger 2.0) and OpenAPI v3 specifications. """ -from typing import Any, Dict, List, Optional, Set, Union +from typing import Any, Optional, Union from .base_parser import BaseParser, ParserError, SpecFormat from ..dataclasses import Endpoint, Parameter, RequestBody @@ -25,9 +25,9 @@ class OpenAPIParser(BaseParser): """Parser for OpenAPI/Swagger specifications.""" - def __init__(self, source: Union[str, Dict[str, Any]]): - self.components_schemas: Dict[str, Any] = {} - self.visited_refs: Set[str] = set() + def __init__(self, source: Union[str, dict[str, Any]]): + self.components_schemas: dict[str, Any] = {} + self.visited_refs: set[str] = set() super().__init__(source) def _parse(self): @@ -54,8 +54,8 @@ def _parse(self): def get_format(self) -> SpecFormat: """Get the format of the API specification.""" if "swagger" in self.spec and self.spec["swagger"].startswith("2."): - self.logger.debug("Detected OpenAPI v2 (Swagger) specification") - return SpecFormat.OPENAPI_V2 + self.logger.debug("Detected OpenAPI v2 (Swagger) specification") + return SpecFormat.OPENAPI_V2 elif "openapi" in self.spec and self.spec["openapi"].startswith("3."): self.logger.debug("Detected OpenAPI v3 specification") return SpecFormat.OPENAPI_V3 @@ -72,7 +72,6 @@ def _extract_servers(self): f"Extracted {len(self.servers)} servers from OpenAPI v3 spec" ) elif self.format == SpecFormat.OPENAPI_V2: - # Swagger 2.0 uses 'schemes', 'host', and 'basePath' fields schemes = self.spec.get("schemes", ["http"]) host = self.spec.get("host", "") base_path = self.spec.get("basePath", "/") @@ -179,14 +178,13 @@ def _extract_endpoints(self) -> None: self.logger.warning( f"Error processing endpoint {method.upper()} {path}: {str(e)}" ) - # Continue processing other endpoints self.logger.info(f"Extracted {endpoint_count} endpoints from specification") except Exception as e: self.logger.error(f"Error extracting endpoints: {str(e)}") raise ParserError(f"Failed to extract endpoints: {str(e)}") from e - def _parse_parameters(self, parameters: List[Dict[str, Any]]) -> List[Parameter]: + def _parse_parameters(self, parameters: list[dict[str, Any]]) -> list[Parameter]: """Parse parameters from the specification.""" result = [] @@ -202,7 +200,6 @@ def _parse_parameters(self, parameters: List[Dict[str, Any]]) -> List[Parameter] ) continue - # Check for required fields if "name" not in param: self.logger.warning( "Parameter missing required 'name' field, skipping" @@ -245,11 +242,10 @@ def _parse_parameters(self, parameters: List[Dict[str, Any]]) -> List[Parameter] except Exception as e: param_name = param.get("name", "unnamed") self.logger.warning(f"Error parsing parameter '{param_name}': {str(e)}") - # Continue processing other parameters return result - def _parse_request_body(self, operation: Dict[str, Any]) -> Optional[RequestBody]: + def _parse_request_body(self, operation: dict[str, Any]) -> Optional[RequestBody]: """Parse request body from the operation.""" try: if self.format == SpecFormat.OPENAPI_V3: @@ -258,7 +254,6 @@ def _parse_request_body(self, operation: Dict[str, Any]) -> Optional[RequestBody request_body = operation["requestBody"] - # Handle request body reference if "$ref" in request_body: ref_path = request_body["$ref"] self.logger.debug(f"Resolving request body reference: {ref_path}") @@ -275,15 +270,23 @@ def _parse_request_body(self, operation: Dict[str, Any]) -> Optional[RequestBody return None # Get the first content type - try: - content_type, content_schema = next(iter(content.items())) - except StopIteration: - self.logger.warning("Request body content is empty") - return None + # Prefer json content type if available + content_types = set(content.keys()) + preferred_content_types = [ + t for t in content_types if "json" in t.lower() + ] + if preferred_content_types: + content_type = preferred_content_types[0] + content_schema = content[content_type] + else: + try: + content_type, content_schema = next(iter(content.items())) + except StopIteration: + self.logger.warning("Request body content is empty") + return None schema = content_schema.get("schema", {}) - # Resolve schema reference if needed resolved_schema = {} if "$ref" in schema: ref_path = schema["$ref"] @@ -300,7 +303,6 @@ def _parse_request_body(self, operation: Dict[str, Any]) -> Optional[RequestBody elif "properties" in resolved_schema: properties = resolved_schema["properties"] - # Resolve property references resolved_properties = self._resolve_property_references(properties) self.logger.debug( @@ -316,12 +318,10 @@ def _parse_request_body(self, operation: Dict[str, Any]) -> Optional[RequestBody ) elif self.format == SpecFormat.OPENAPI_V2: - # In Swagger 2.0, request body is defined in parameters with "in": "body" for param in operation.get("parameters", []): if param.get("in") == "body" and "schema" in param: schema = param["schema"] - # Resolve schema reference if needed resolved_schema = {} if "$ref" in schema: ref_path = schema["$ref"] @@ -338,14 +338,13 @@ def _parse_request_body(self, operation: Dict[str, Any]) -> Optional[RequestBody elif "properties" in resolved_schema: properties = resolved_schema["properties"] - # Resolve property references resolved_properties = self._resolve_property_references( properties ) self.logger.debug("Parsed Swagger 2.0 request body") return RequestBody( - content_type="application/json", # Default for Swagger 2.0 + content_type="application/json", schema=schema, required=param.get("required", False), example=param.get("example"), @@ -359,8 +358,8 @@ def _parse_request_body(self, operation: Dict[str, Any]) -> Optional[RequestBody return None def _resolve_property_references( - self, properties: Dict[str, Any] - ) -> Dict[str, Any]: + self, properties: dict[str, Any] + ) -> dict[str, Any]: """Recursively resolve references in properties.""" resolved = {} @@ -379,14 +378,32 @@ def _resolve_property_references( resolved[prop_name] = prop_schema continue - # Create a new dict to avoid modifying the original schema resolved_prop = {**prop_schema, **ref_schema} resolved_prop.pop("$ref", None) resolved[prop_name] = resolved_prop + elif "anyOf" in prop_schema: + resolved_any_of = [] + for sub_schema in prop_schema["anyOf"]: + if "$ref" in sub_schema: + ref_path = sub_schema["$ref"] + self.logger.debug( + f"Resolving anyOf reference: {ref_path} for property {prop_name}" + ) + ref_sub_schema = self._resolve_reference(ref_path) + if not ref_sub_schema: + self.logger.warning( + f"Failed to resolve anyOf reference: {ref_path} for property {prop_name}" + ) + continue + + resolved_sub_schema = {**sub_schema, **ref_sub_schema} + resolved_sub_schema.pop("$ref", None) + resolved_any_of.append(ref_sub_schema) + else: + resolved_any_of.append(sub_schema) elif ( prop_schema.get("type") == "object" and "properties" in prop_schema ): - # Recursively resolve nested properties nested_props = self._resolve_property_references( prop_schema["properties"] ) @@ -410,7 +427,6 @@ def _resolve_property_references( resolved_items = {**items, **ref_items} resolved_items.pop("$ref", None) - # Handle nested object in array items if ( resolved_items.get("type") == "object" and "properties" in resolved_items @@ -428,13 +444,11 @@ def _resolve_property_references( resolved[prop_name] = prop_schema except Exception as e: self.logger.warning(f"Error resolving property {prop_name}: {str(e)}") - resolved[prop_name] = ( - prop_schema # Use original schema if resolution fails - ) + resolved[prop_name] = prop_schema return resolved - def _resolve_reference(self, ref: str) -> Dict[str, Any]: + def _resolve_reference(self, ref: str) -> dict[str, Any]: """Resolve a reference in the specification.""" if not ref: self.logger.warning("Empty reference provided") @@ -442,17 +456,14 @@ def _resolve_reference(self, ref: str) -> Dict[str, Any]: try: if ref in self.visited_refs: - # Prevent infinite recursion with circular references self.logger.warning(f"Circular reference detected: {ref}") return {} self.visited_refs.add(ref) - # Handle local references if ref.startswith("#/"): parts = ref.split("/")[1:] - # Navigate through the spec current = self.spec for part in parts: if part not in current: @@ -463,12 +474,10 @@ def _resolve_reference(self, ref: str) -> Dict[str, Any]: return {} current = current[part] - # If the resolved object has further references, resolve them too if isinstance(current, dict) and "$ref" in current: nested_ref = current["$ref"] self.logger.debug(f"Resolving nested reference: {nested_ref}") resolved = self._resolve_reference(nested_ref) - # Merge with the original object, but keep the original values if they exist for key, value in resolved.items(): if key not in current: current[key] = value @@ -476,7 +485,6 @@ def _resolve_reference(self, ref: str) -> Dict[str, Any]: self.visited_refs.remove(ref) return current - # External references not supported yet self.logger.warning(f"External references not supported: {ref}") self.visited_refs.remove(ref) return {} diff --git a/src/pentestkit/utils.py b/src/pentestkit/utils.py index e69de29..eb6e066 100644 --- a/src/pentestkit/utils.py +++ b/src/pentestkit/utils.py @@ -0,0 +1,40 @@ +# Copyright 2025 Rahul Kaushal +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import rstr + + +def generate_test_value(pattern: str, min_length: int, max_length: int) -> str: + """ + Generate a valid test value that matches the given regex pattern + and length constraints. + + Args: + pattern (str): The regex pattern to match + min_length (int): The minimum length of the value. + max_length (int): The maximum length of the value. + + Returns: + str: A valid test value that matches the given pattern + and length constraints. Returns "example" if a valid value could not be + generated. + """ + try: + for _ in range(100): # Limit attempts to avoid infinite loops + value = rstr.xeger(pattern) + if min_length <= len(value) <= max_length: + return value + return "example" + except Exception: + return "example" diff --git a/src/tests/test_dataclasses/test_body.py b/src/tests/test_dataclasses/test_body.py new file mode 100644 index 0000000..e7ce04c --- /dev/null +++ b/src/tests/test_dataclasses/test_body.py @@ -0,0 +1,305 @@ +import re +from ...pentestkit.dataclasses.body import ( + RequestBody, +) + + +def test_request_body_with_example(): + example = {"name": "test"} + + request_body = RequestBody( + content_type="application/json", + schema={"type": "object"}, + example=example, + ) + + assert request_body.example == example + + +def test_generate_example_body_json(): + request_body = RequestBody( + content_type="application/json", + schema={"type": "object"}, + properties={ + "name": {"type": "string"}, + }, + ) + + result = request_body.example + assert isinstance(result, dict) + assert "name" in result + assert isinstance(result["name"], str) + + +def test_generate_example_body_unsupported_content_type(): + request_body = RequestBody( + content_type="unsupported/type", + schema={"type": "object"}, + properties={ + "name": {"type": "string"}, + }, + ) + + result = request_body.example + assert result is None + + +def test_generate_example_body_string_properties(): + request_body = RequestBody( + content_type="application/json", + schema={"type": "object"}, + properties={ + "simple": {"type": "string"}, + "with_enum": { + "type": "string", + "enum": ["value1", "value2", "value3"], + }, + "with_example": { + "type": "string", + "example": "example_value", + }, + "with_pattern": { + "type": "string", + "pattern": r"^[a-zA-Z0-9]{3,10}$", + }, + "with_format_email": { + "type": "string", + "format": "email", + }, + "with_format_date": { + "type": "string", + "format": "date", + }, + "with_format_date_time": { + "type": "string", + "format": "date-time", + }, + "with_format_uri": { + "type": "string", + "format": "uri", + }, + "with_format_ipv4": { + "type": "string", + "format": "ipv4", + }, + "with_format_ipv6": { + "type": "string", + "format": "ipv6", + }, + "with_format_hostname": { + "type": "string", + "format": "hostname", + }, + }, + ) + + result = request_body.example + assert isinstance(result, dict) + assert isinstance(result["simple"], str) + assert result["with_enum"] in ["value1", "value2", "value3"] + assert result["with_example"] == "example_value" + assert re.match(r"^[a-zA-Z0-9]{3,10}$", result["with_pattern"]) + assert result["with_format_email"] == "user@example.com" + assert result["with_format_date"] == "2025-04-23" + assert result["with_format_date_time"] == "2025-04-23T14:30:00Z" + assert result["with_format_uri"] == "https://example.com/resource" + assert result["with_format_ipv4"] == "192.168.1.1" + assert result["with_format_ipv6"] == "2001:0db8:85a3:0000:0000:8a2e:0370:7334" + assert result["with_format_hostname"] == "example.com" + + +def test_generate_example_body_numeric_properties(): + request_body = RequestBody( + content_type="application/json", + schema={"type": "object"}, + properties={ + "integer_simple": {"type": "integer"}, + "integer_with_enum": { + "type": "integer", + "enum": [1, 2, 3], + }, + "integer_with_example": { + "type": "integer", + "example": 42, + }, + "integer_with_range": { + "type": "integer", + "minimum": 10, + "maximum": 12, + }, + "integer_with_exclusive_range": { + "type": "integer", + "exclusiveMinimum": 10, + "exclusiveMaximum": 12, + }, + "integer_with_wrong_min_max": { + "type": "integer", + "minimum": 10, + "maximum": 5, + }, + "integer_with_error": {"type": "integer", "example": "error"}, + "number_simple": {"type": "number"}, + "number_with_enum": { + "type": "number", + "enum": [1.1, 2.2, 3.3], + }, + "number_with_example": { + "type": "number", + "example": 3.14, + }, + "number_with_range": { + "type": "number", + "minimum": 1.0, + "maximum": 2.0, + }, + "number_with_exclusive_range": { + "type": "number", + "exclusiveMinimum": 10.0, + "exclusiveMaximum": 11.0, + }, + "number_with_multiple_of": { + "type": "number", + "minimum": 0.0, + "maximum": 2.0, + "multipleOf": 0.5, + }, + "number_with_wrong_min_max": { + "type": "number", + "minimum": 10.0, + "maximum": 5.0, + }, + "number_with_error": {"type": "number", "example": "error"}, + }, + ) + + result = request_body.example + assert isinstance(result, dict) + assert isinstance(result["integer_simple"], int) + assert result["integer_with_enum"] in [1, 2, 3] + assert result["integer_with_example"] == 42 + assert 10 <= result["integer_with_range"] <= 12 + assert result["integer_with_exclusive_range"] == 11 + assert 5 <= result["integer_with_wrong_min_max"] <= 10 + assert result["integer_with_error"] == 1 + assert isinstance(result["number_simple"], float) + assert result["number_with_enum"] in [1.1, 2.2, 3.3] + assert result["number_with_example"] == 3.14 + assert 1.0 <= result["number_with_range"] <= 2.0 + assert 10.0 < result["number_with_exclusive_range"] < 11.0 + assert result["number_with_multiple_of"] in [0.0, 0.5, 1.0, 1.5, 2.0] + assert 5.0 <= result["number_with_wrong_min_max"] <= 10.0 + assert result["number_with_error"] == 1.0 + + +def test_generate_example_body_boolean_properties(): + request_body = RequestBody( + content_type="application/json", + schema={"type": "object"}, + properties={"is_active": {"type": "boolean"}}, + ) + result = request_body.example + assert isinstance(result, dict) + assert isinstance(result["is_active"], bool) + + +def test_generate_example_body_object_properties(): + request_body = RequestBody( + content_type="application/json", + schema={"type": "object"}, + properties={ + "simple_object": { + "type": "object", + "properties": {"name": {"type": "string"}}, + }, + "empty_object": {"type": "object"}, + "additional_props_object": { + "type": "object", + "additionalProperties": {"type": "string"}, + }, + }, + ) + + result = request_body.example + assert isinstance(result, dict) + assert isinstance(result["simple_object"], dict) + assert "name" in result["simple_object"] + assert isinstance(result["simple_object"]["name"], str) + assert isinstance(result["empty_object"], dict) + assert result["empty_object"] == {"example": "object-value"} + assert isinstance(result["additional_props_object"], dict) + assert "key1" in result["additional_props_object"] + assert "key2" in result["additional_props_object"] + + +def test_generate_example_body_array_properties(): + request_body = RequestBody( + content_type="application/json", + schema={"type": "object"}, + properties={ + "string_array": {"type": "array", "items": {"type": "string"}}, + "integer_array": { + "type": "array", + "items": {"type": "integer"}, + "minItems": 2, + "maxItems": 5, + }, + "complex_array": { + "type": "array", + "items": {"type": "object", "properties": {"name": {"type": "string"}}}, + }, + }, + ) + + result = request_body.example + assert isinstance(result, dict) + assert isinstance(result["string_array"], list) + assert all(isinstance(item, str) for item in result["string_array"]) + assert isinstance(result["integer_array"], list) + assert 2 <= len(result["integer_array"]) <= 5 + assert all(isinstance(item, int) for item in result["integer_array"]) + assert isinstance(result["complex_array"], list) + assert all(isinstance(item, dict) for item in result["complex_array"]) + assert all("name" in item for item in result["complex_array"]) + + +def test_generate_example_body_with_anyof(): + request_body = RequestBody( + content_type="application/json", + schema={"type": "object"}, + properties={ + "name": {"type": "string"}, + "status": { + "anyOf": [ + {"type": "string", "pattern": r"^(active|inactive)$"}, + {"type": "null"}, + ] + }, + }, + ) + + result = request_body.example + assert isinstance(result, dict) + assert "name" in result + assert "status" in result + assert isinstance(result["name"], str) + assert isinstance(result["status"], str) + assert re.match(r"^(active|inactive)$", result["status"]) + + +def test_generate_example_body_unsupported_properties(): + request_body = RequestBody( + content_type="application/json", + schema={"type": "object"}, + properties={"is_active": {"type": "unsupported"}}, + ) + result = request_body.example + assert result == {"is_active": "example"} + +def test_generate_example_body_schema_error(): + request_body = RequestBody( + content_type="application/json", + schema={"type": "object"}, + properties={"is_active": {"type" : "array", "minItems": "error"}}, + ) + result = request_body.example + assert result == {"is_active": "example"} diff --git a/src/tests/test_dataclasses/test_parameter.py b/src/tests/test_dataclasses/test_parameter.py index 88e8deb..f20739b 100644 --- a/src/tests/test_dataclasses/test_parameter.py +++ b/src/tests/test_dataclasses/test_parameter.py @@ -1,34 +1,10 @@ import re -from ...pentestkit.dataclasses.parameter import ( - Parameter, - generate_test_value, - generate_example_value, -) - - -# Test cases for generate_test_value -def test_generate_test_value_valid_pattern(): - pattern = r"[a-z]{5}" - min_length = 5 - max_length = 5 - value = generate_test_value(pattern, min_length, max_length) - assert value is not None - assert len(value) == 5 - assert bool(re.fullmatch(pattern, value)) - - -def test_generate_test_value_invalid_pattern(): - pattern = r"[a-z]{5}" - min_length = 6 - max_length = 10 - value = generate_test_value(pattern, min_length, max_length) - assert value is None +from ...pentestkit.dataclasses.parameter import Parameter -# Test cases for generate_example_value - generate_path_parameter_value def test_generate_path_parameter_value_string_without_pattern(): param = Parameter(name="id", location="path", type="string") - value = generate_example_value(param) + value = param.example assert len(value) >= 3 assert len(value) <= 50 assert bool(re.fullmatch(r"[a-z]+", value)) @@ -37,7 +13,7 @@ def test_generate_path_parameter_value_string_without_pattern(): def test_generate_path_parameter_value_string_with_pattern(): pattern = r"^\d{3}-\d{3}-\d{4}$" param = Parameter(name="id", location="path", type="string", pattern=pattern) - value = generate_example_value(param) + value = param.example assert len(value) >= 3 assert len(value) <= 50 assert bool(re.fullmatch(pattern, value)) @@ -46,34 +22,34 @@ def test_generate_path_parameter_value_string_with_pattern(): def test_generate_path_parameter_value_string_with_enum(): possible_values = ["value-1", "value-2", "value-3"] param = Parameter(name="id", location="path", type="string", enum=possible_values) - value = generate_example_value(param) + value = param.example assert value in possible_values def test_generate_path_parameter_value_integer(): param = Parameter(name="id", location="path", type="integer") - value = generate_example_value(param) + value = param.example assert isinstance(value, int) assert 1 <= value <= 100 def test_generate_path_parameter_value_number(): param = Parameter(name="id", location="path", type="number") - value = generate_example_value(param) + value = param.example assert isinstance(value, int) assert 1 <= value <= 100 def test_generate_path_parameter_value_object(): param = Parameter(name="id", location="path", type="object") - value = generate_example_value(param) + value = param.example assert value == "default" # Test cases for generate_example_value - generate_query_parameter_value def test_generate_query_parameter_value_string_without_pattern(): param = Parameter(name="query", location="query", type="string") - value = generate_example_value(param) + value = param.example assert len(value) >= 3 assert len(value) <= 50 assert bool(re.fullmatch(r"[a-z]+", value)) @@ -82,7 +58,7 @@ def test_generate_query_parameter_value_string_without_pattern(): def test_generate_query_parameter_value_string_with_pattern(): pattern = r"^#([A-Fa-f0-9]{6}|[A-Fa-f0-9]{3})$" param = Parameter(name="query", location="query", type="string", pattern=pattern) - value = generate_example_value(param) + value = param.example assert len(value) >= 3 assert len(value) <= 50 assert bool(re.fullmatch(pattern, value)) @@ -93,31 +69,31 @@ def test_generate_query_parameter_value_string_with_enum(): param = Parameter( name="query", location="query", type="string", enum=possible_values ) - value = generate_example_value(param) + value = param.example assert value in possible_values def test_generate_query_parameter_value_string_with_date_format(): param = Parameter(name="query", location="query", type="string", format="date") - value = generate_example_value(param) + value = param.example assert value == "2025-03-06" def test_generate_query_parameter_value_string_with_datetime_format(): param = Parameter(name="query", location="query", type="string", format="date-time") - value = generate_example_value(param) + value = param.example assert value == "2025-03-06T12:30:00Z" def test_generate_query_parameter_value_boolean(): param = Parameter(name="flag", location="query", type="boolean") - value = generate_example_value(param) + value = param.example assert value in ["true", "false"] def test_generate_query_parameter_value_integer_without_constraints(): param = Parameter(name="count", location="query", type="integer") - value = generate_example_value(param) + value = param.example assert isinstance(value, str) assert value.isdigit() assert 1 <= int(value) <= 100 @@ -127,7 +103,7 @@ def test_generate_query_parameter_value_integer_with_constraints(): param = Parameter( name="count", location="query", type="integer", minimum=20, maximum=50 ) - value = generate_example_value(param) + value = param.example assert isinstance(value, str) assert value.isdigit() assert 20 <= int(value) <= 50 @@ -135,7 +111,7 @@ def test_generate_query_parameter_value_integer_with_constraints(): def test_generate_query_parameter_value_number_without_constraints(): param = Parameter(name="count", location="query", type="number") - value = generate_example_value(param) + value = param.example assert isinstance(value, str) assert 1.0 <= float(value) <= 100.0 @@ -144,14 +120,14 @@ def test_generate_query_parameter_value_number_with_constraints(): param = Parameter( name="count", location="query", type="number", minimum=20, maximum=50 ) - value = generate_example_value(param) + value = param.example assert isinstance(value, str) assert 20.0 <= float(value) <= 50.0 def test_generate_query_parameter_value_object(): param = Parameter(name="count", location="query", type="object") - value = generate_example_value(param) + value = param.example assert value == "default" @@ -161,31 +137,31 @@ def test_generate_header_parameter_value_string_with_enum(): param = Parameter( name="Authorization", location="header", type="string", enum=possible_values ) - value = generate_example_value(param) + value = param.example assert value in possible_values def test_generate_header_parameter_value_string_accept(): param = Parameter(name="accept", location="header", type="string") - value = generate_example_value(param) + value = param.example assert value == "application/json" def test_generate_header_parameter_value_string_content_type(): param = Parameter(name="content-type", location="header", type="string") - value = generate_example_value(param) + value = param.example assert value == "application/json" def test_generate_header_parameter_value_string_authorization(): param = Parameter(name="authorization", location="header", type="string") - value = generate_example_value(param) + value = param.example assert value == "Bearer example-token" def test_generate_header_parameter_value_string_without_pattern(): param = Parameter(name="header", location="header", type="string") - value = generate_example_value(param) + value = param.example assert len(value) >= 3 assert len(value) <= 50 assert bool(re.fullmatch(r"[A-Za-z0-9\-]+", value)) @@ -194,7 +170,7 @@ def test_generate_header_parameter_value_string_without_pattern(): def test_generate_header_parameter_value_string_with_pattern(): pattern = r"\b\w+ing\b" param = Parameter(name="header", location="header", type="string", pattern=pattern) - value = generate_example_value(param) + value = param.example assert len(value) >= 3 assert len(value) <= 50 assert bool(re.fullmatch(pattern, value)) @@ -202,7 +178,7 @@ def test_generate_header_parameter_value_string_with_pattern(): def test_generate_header_parameter_value_integer_without_constraints(): param = Parameter(name="count", location="header", type="integer") - value = generate_example_value(param) + value = param.example assert isinstance(value, str) assert value.isdigit() assert 1 <= int(value) <= 100 @@ -212,7 +188,7 @@ def test_generate_header_parameter_value_integer_with_constraints(): param = Parameter( name="count", location="header", type="integer", minimum=20, maximum=50 ) - value = generate_example_value(param) + value = param.example assert isinstance(value, str) assert value.isdigit() assert 20 <= int(value) <= 50 @@ -220,7 +196,7 @@ def test_generate_header_parameter_value_integer_with_constraints(): def test_generate_header_parameter_value_number_without_constraints(): param = Parameter(name="count", location="header", type="number") - value = generate_example_value(param) + value = param.example assert isinstance(value, str) assert 1.0 <= float(value) <= 100.0 @@ -229,14 +205,14 @@ def test_generate_header_parameter_value_number_with_constraints(): param = Parameter( name="count", location="header", type="number", minimum=20, maximum=50 ) - value = generate_example_value(param) + value = param.example assert isinstance(value, str) assert 20.0 <= float(value) <= 50.0 def test_generate_header_parameter_value_object(): param = Parameter(name="count", location="header", type="object") - value = generate_example_value(param) + value = param.example assert value == "example-header-value" @@ -246,31 +222,31 @@ def test_generate_cookie_parameter_value_string_with_enum(): param = Parameter( name="cookie", location="cookie", type="string", enum=possible_values ) - value = generate_example_value(param) + value = param.example assert value in possible_values def test_generate_cookie_parameter_value_string_session(): param = Parameter(name="session", location="cookie", type="string") - value = generate_example_value(param) + value = param.example assert value == "session123" def test_generate_cookie_parameter_value_string_sessionid(): param = Parameter(name="sessionid", location="cookie", type="string") - value = generate_example_value(param) + value = param.example assert value == "session123" def test_generate_cookie_parameter_value_string_token(): param = Parameter(name="token", location="cookie", type="string") - value = generate_example_value(param) + value = param.example assert value == "example-token-value" def test_generate_cookie_parameter_value_string_without_pattern(): param = Parameter(name="cookie", location="cookie", type="string") - value = generate_example_value(param) + value = param.example assert len(value) >= 3 assert len(value) <= 50 assert bool(re.fullmatch(r"[A-Za-z0-9\-]+", value)) @@ -279,7 +255,7 @@ def test_generate_cookie_parameter_value_string_without_pattern(): def test_generate_cookie_parameter_value_string_with_pattern(): pattern = r"\b\w+ing\b" param = Parameter(name="cookie", location="cookie", type="string", pattern=pattern) - value = generate_example_value(param) + value = param.example assert len(value) >= 3 assert len(value) <= 50 assert bool(re.fullmatch(pattern, value)) @@ -287,7 +263,7 @@ def test_generate_cookie_parameter_value_string_with_pattern(): def test_generate_cookie_parameter_value_integer_without_constraints(): param = Parameter(name="count", location="cookie", type="integer") - value = generate_example_value(param) + value = param.example assert isinstance(value, str) assert value.isdigit() assert 1 <= int(value) <= 100 @@ -297,7 +273,7 @@ def test_generate_cookie_parameter_value_integer_with_constraints(): param = Parameter( name="count", location="cookie", type="integer", minimum=20, maximum=50 ) - value = generate_example_value(param) + value = param.example assert isinstance(value, str) assert value.isdigit() assert 20 <= int(value) <= 50 @@ -305,7 +281,7 @@ def test_generate_cookie_parameter_value_integer_with_constraints(): def test_generate_cookie_parameter_value_number_without_constraints(): param = Parameter(name="count", location="cookie", type="number") - value = generate_example_value(param) + value = param.example assert isinstance(value, str) assert 1.0 <= float(value) <= 100.0 @@ -314,19 +290,19 @@ def test_generate_cookie_parameter_value_number_with_constraints(): param = Parameter( name="count", location="cookie", type="number", minimum=20, maximum=50 ) - value = generate_example_value(param) + value = param.example assert isinstance(value, str) assert 20.0 <= float(value) <= 50.0 def test_generate_cookie_parameter_value_object(): param = Parameter(name="count", location="cookie", type="object") - value = generate_example_value(param) + value = param.example assert value == "example-cookie-value" # Test cases for generate_example_value - edge case scenario def test_generate_example_value_object(): param = Parameter(name="count", location="object", type="object") - value = generate_example_value(param) + value = param.example assert value == "default" diff --git a/src/tests/test_utils.py b/src/tests/test_utils.py new file mode 100644 index 0000000..bbd35b6 --- /dev/null +++ b/src/tests/test_utils.py @@ -0,0 +1,29 @@ +import re +from ..pentestkit.utils import generate_test_value + + +# Test cases for generate_test_value +def test_generate_test_value_valid_pattern(): + pattern = r"[a-z]{5}" + min_length = 5 + max_length = 5 + value = generate_test_value(pattern, min_length, max_length) + assert value is not None + assert len(value) == 5 + assert bool(re.fullmatch(pattern, value)) + + +def test_generate_test_value_invalid_pattern(): + pattern = r"[a-z]{5}" + min_length = 6 + max_length = 10 + value = generate_test_value(pattern, min_length, max_length) + assert value is "example" + + +def test_generate_test_value_exception_handling(): + pattern = r"[a-z]{5}" + min_length = "5" + max_length = "10" + value = generate_test_value(pattern, min_length, max_length) + assert value is "example"