diff --git a/CHANGELOG.md b/CHANGELOG.md index 9160be91..fd306dae 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,7 +12,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 Red Hat Tech Preview release, based on upstream [sigstore/model-transparency](https://github.com/sigstore/model-transparency) v1.1.1. ### Added -- Added support for signing and verifying OCI model manifests directly without requiring model files on disk. OCI manifest JSON files can be detected and signed, or verified against. When verifying local files against signatures created from OCI manifests, the tool automatically matches files by path using `org.opencontainers.image.title` annotations (ORAS-style), enabling cross-verification between OCI images and local model directories. +- Added support for signing and verifying OCI model manifests directly without requiring model files on disk. OCI manifest JSON files can be detected and signed, or verified against. +- Added OCI image signing and verification. Sign and verify container images directly in registries using `model_signing sign sigstore quay.io/user/model:latest`. Supports both OCI 1.1 Referrers API and tag-based attachment. +- Added smart target detection for CLI commands. The tool auto-detects the target type: if the path exists locally, it is signed/verified as a file; otherwise, it is treated as an OCI image reference. +- Added `--local-model` option to verify that local files match a signed image's layer digests. +- Added `sign_image()` and `verify_image()` methods to the Python API. - Added the `digest` subcommand to compute and print a model's digest. This enables other tools to easily pair the attestations with a model directory. - Package renamed to `rh-model-signing` for Red Hat distribution. - Added `rh_model_signing` CLI entry point (in addition to `model_signing`). diff --git a/README.md b/README.md index 78966380..a90570de 100644 --- a/README.md +++ b/README.md @@ -215,66 +215,84 @@ Similarly, for key verification, we can use #### Signing and Verifying OCI Images -The tool supports signing and verifying OCI model images directly from their manifest without requiring the model files on disk. This is useful for signing images in registries without pulling them. - -**Signing from OCI Manifest:** +The tool supports signing and verifying OCI container images directly from +registries. Signatures are automatically attached to the registry. ```bash -# Get the OCI manifest (from skopeo inspect --raw) -[...]$ skopeo inspect --raw docker://quay.io/user/model:latest > manifest.json +# Sign with Sigstore +[...]$ model_signing sign sigstore quay.io/user/model:latest + +# Sign with EC key +[...]$ model_signing sign key quay.io/user/model:latest --private-key key.pem +``` + +Registry authentication uses your existing credentials from `~/.docker/config.json` +or podman's `auth.json`. -# Sign using the manifest -[...]$ model_signing sign manifest.json +By default, signatures are attached using the OCI 1.1 Referrers API. For older +registries, use `--attachment-mode tag`: + +```bash +[...]$ model_signing sign sigstore quay.io/user/model:latest --attachment-mode tag ``` -**Verifying OCI Images:** +Use `--output-mode` to control where signatures are written: + +```bash +# Write signature to file only (no registry attachment) +[...]$ model_signing sign sigstore quay.io/user/model:latest \ + --output-mode file --signature model.sig + +# Attach to registry AND write to file +[...]$ model_signing sign sigstore quay.io/user/model:latest \ + --output-mode both --signature model.sig +``` -You can verify in two ways: +To verify: -1. **Against the OCI manifest** (no files needed): ```bash -[...]$ model_signing verify manifest.json \ - --signature model.sig \ +# Verify Sigstore signature +[...]$ model_signing verify sigstore quay.io/user/model:latest \ --identity user@example.com \ - --identity_provider https://accounts.google.com + --identity-provider https://accounts.google.com + +# Verify key-based signature +[...]$ model_signing verify key quay.io/user/model:latest --public-key key.pub ``` -2. **Against local model files** (automatically detects OCI layer signatures): +You can also verify that local files match a signed image: + ```bash -[...]$ model_signing verify model_dir \ - --signature model.sig \ +[...]$ model_signing verify sigstore quay.io/user/model:latest \ --identity user@example.com \ - --identity_provider https://accounts.google.com + --identity-provider https://accounts.google.com \ + --local-model ./downloaded-model ``` -The tool automatically detects OCI manifest signatures and matches files by path using `org.opencontainers.image.title` annotations (ORAS-style). For multi-layer images, verification against local files attempts to match individual files by path. +The tool auto-detects the target type: if the path exists locally, it is treated +as a file; otherwise, it is treated as an OCI image reference. -**Python API:** +##### Python API ```python -import json -from model_signing import hashing, signing, verifying - -# Sign from OCI manifest -with open("manifest.json") as f: - oci_data = json.load(f) +import model_signing -manifest = hashing.create_manifest_from_oci_layers(oci_data) -signing.Config().use_sigstore_signer().sign_from_manifest( - manifest, "model.sig" +# Sign an image +model_signing.signing.Config().use_sigstore_signer().sign_image( + "quay.io/user/model:latest" ) -# Verify from OCI manifest -verifying.Config().use_sigstore_verifier( +# Verify an image +model_signing.verifying.Config().use_sigstore_verifier( identity="user@example.com", oidc_issuer="https://accounts.google.com" -).verify_from_oci_manifest(oci_data, "model.sig") +).verify_image("quay.io/user/model:latest") -# Or verify from local files (automatically handles OCI signatures) -verifying.Config().use_sigstore_verifier( +# Verify image and check local files match +model_signing.verifying.Config().use_sigstore_verifier( identity="user@example.com", oidc_issuer="https://accounts.google.com" -).verify("model_dir", "model.sig") +).verify_image("quay.io/user/model:latest", local_model_path="./model_dir") ``` #### Signing with PKCS #11 URIs @@ -451,7 +469,7 @@ The same verification configuration can be used to verify multiple models: ```python import model_signing -verifying_config = model_signing.signing.Config().use_elliptic_key_verifier( +verifying_config = model_signing.verifying.Config().use_elliptic_key_verifier( public_key="key.pub" ) diff --git a/pyproject.toml b/pyproject.toml index 78462b0b..3dee5aab 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -39,6 +39,7 @@ dependencies = [ "click", "cryptography", "in-toto-attestation", + "oras>=0.2.30", "sigstore>=4.0", "sigstore-models>=0.0.5", "typing_extensions", diff --git a/src/model_signing/_cli.py b/src/model_signing/_cli.py index f311b33e..028f6ebe 100644 --- a/src/model_signing/_cli.py +++ b/src/model_signing/_cli.py @@ -16,8 +16,6 @@ from collections.abc import Iterable, Sequence import contextlib -import enum -import json import logging import pathlib import sys @@ -25,6 +23,7 @@ import click import model_signing +from model_signing._oci.registry import ImageReference class NoOpTracer: @@ -44,73 +43,14 @@ def set_attribute(self, key, value): tracer = None -class PathType(enum.Enum): - """Type of path detected for model signing/verification.""" - - MODEL_DIRECTORY = "model_directory" - OCI_MANIFEST = "oci_manifest" - - -class ModelPath: - """Wrapper for a model directory path.""" - - def __init__(self, path: pathlib.Path): - """Initialize with a model directory path. - - Args: - path: Path to the model directory. - """ - self._path = path - - @property - def path(self) -> pathlib.Path: - """Get the model directory path.""" - return self._path - - def exists(self) -> bool: - """Check if the path exists.""" - return self._path.exists() - - -class OCIManifestPath: - """Wrapper for an OCI manifest JSON file path with cached JSON data.""" - - def __init__(self, path: pathlib.Path, data: dict): - """Initialize with an OCI manifest file path and parsed JSON data. - - Args: - path: Path to the OCI manifest JSON file. - data: Parsed JSON data from the manifest file. - """ - self._path = path - self._data = data - - @property - def path(self) -> pathlib.Path: - """Get the OCI manifest file path.""" - return self._path - - @property - def data(self) -> dict: - """Get the parsed JSON data from the manifest file.""" - return self._data - - @property - def model_name(self) -> str | None: - """Get the model name derived from the manifest file stem.""" - return self._path.stem if self._path else None - - def exists(self) -> bool: - """Check if the path exists.""" - return self._path.exists() - - -# Decorator for the commonly used argument for the model path or model manifest. -# This can be either a local model directory or an OCI manifest JSON file. +# Decorator for the commonly used argument for the model path. _model_path_argument = click.argument( - "model_path", type=pathlib.Path, metavar="MODEL_PATH_OR_MANIFEST" + "model_path", type=pathlib.Path, metavar="MODEL_PATH" ) +# Decorator for the target argument (image reference or local path). +_target_argument = click.argument("target", type=str, metavar="TARGET") + # Decorator for the commonly used option to set the signature path when signing. _write_signature_option = click.option( @@ -127,8 +67,7 @@ def exists(self) -> bool: "--signature", type=pathlib.Path, metavar="SIGNATURE_PATH", - required=True, - help="Location of the signature file to verify.", + help="Location of the signature file (required for file targets).", ) # Decorator for the commonly used option for the custom trust configuration. @@ -219,39 +158,45 @@ def exists(self) -> bool: help="Whether to allow following symlinks when signing or verifying files.", ) +# Decorator for the attachment mode option (OCI image signing). +_attachment_mode_option = click.option( + "--attachment-mode", + type=click.Choice(["referrers", "tag"], case_sensitive=False), + default="referrers", + show_default=True, + help=( + "How to attach the signature to the registry. " + "'referrers' uses OCI 1.1 Referrers API (recommended). " + "'tag' uses tag-based attachment (sha256-DIGEST.sig)." + ), +) -def _detect_path_type( - path: pathlib.Path, -) -> tuple[PathType, ModelPath | OCIManifestPath]: - """Detect if a path is a model directory or an OCI manifest file. - - Args: - path: The path to detect. - - Returns: - A tuple of (PathType, wrapper_object) where wrapper_object is either - ModelPath or OCIManifestPath. The OCIManifestPath caches the parsed - JSON data to avoid loading it twice. - - Raises: - ValueError: If the path is None or invalid. - """ - if path is None: - raise ValueError("Path cannot be None") - - if path.is_file() and path.suffix.lower() == ".json": - try: - with open(path, "r", encoding="utf-8") as f: - data = json.load(f) - if isinstance(data, dict) and ( - "layers" in data or "schemaVersion" in data - ): - return (PathType.OCI_MANIFEST, OCIManifestPath(path, data)) - except (json.JSONDecodeError, OSError): - pass +# Decorator for the local model verification option. +_local_model_option = click.option( + "--local-model", + type=pathlib.Path, + metavar="LOCAL_MODEL_PATH", + default=None, + help=( + "Path to local model files for additional verification. " + "When verifying an image, also checks that local files match " + "the signed layer digests." + ), +) - # Default model dir - return (PathType.MODEL_DIRECTORY, ModelPath(path)) +# Decorator for the output mode option (OCI image signing). +_output_mode_option = click.option( + "--output-mode", + type=click.Choice(["registry", "file", "both"], case_sensitive=False), + default="registry", + show_default=True, + help=( + "Where to output the signature for image targets. " + "'registry' attaches to the OCI registry (default). " + "'file' writes to disk only (requires --signature). " + "'both' attaches to registry AND writes to disk." + ), +) def _resolve_ignore_paths( @@ -269,6 +214,49 @@ def _resolve_ignore_paths( return resolved_paths +def _handle_image_signing( + config: "model_signing.signing.Config", + image_ref: ImageReference, + attachment_mode: str, + output_mode: str, + signature: pathlib.Path, +) -> None: + """Handle common image signing logic for all signing methods. + + Args: + config: The signing configuration with signer already set. + image_ref: The parsed image reference to sign. + attachment_mode: How to attach signature ("referrers" or "tag"). + output_mode: Where to output ("registry", "file", or "both"). + signature: Path for signature file output. + """ + write_to_file = output_mode.lower() in ("file", "both") + attach_to_registry = output_mode.lower() in ("registry", "both") + + if attach_to_registry: + if attachment_mode.lower() == "tag": + digest = image_ref.digest or "DIGEST" + sig_tag = digest.replace(":", "-") + ".sig" + click.echo( + f"Pushing signature to: {image_ref.registry}/" + f"{image_ref.repository}:{sig_tag}" + ) + else: + click.echo(f"Pushing signature to: {image_ref} (referrers API)") + + sig_digest = config.sign_image( + image_ref, + attachment_mode=attachment_mode, + signature_path=signature if write_to_file else None, + attach=attach_to_registry, + ) + + if attach_to_registry: + click.echo(f"Signature pushed: {sig_digest}") + if write_to_file: + click.echo(f"Signature written to: {signature}") + + class _PKICmdGroup(click.Group): """A custom group to configure the supported PKI methods.""" @@ -414,13 +402,14 @@ def _sign() -> None: """Sign models. Produces a cryptographic signature (in the form of a Sigstore bundle) for a - model. We support any model format, either as a single file or as a - directory. + model. Supports both local files/directories and OCI images. + + TARGET can be either: + - A local file/directory path (e.g., ./my-model) + - An OCI image reference (e.g., quay.io/user/model:latest) - You can provide either: - - A local model path (directory or file) to sign the model files directly - - An OCI image manifest JSON file (from 'skopeo inspect --raw') to sign - the image layers without requiring the model files on disk + The tool auto-detects the target type: if the path exists locally, it is + signed as a file; otherwise, it is treated as an OCI image reference. We support multiple PKI methods, specified as subcommands. By default, the signature is generated via Sigstore (as if invoking `sigstore` subcommand). @@ -430,11 +419,13 @@ def _sign() -> None: @_sign.command(name="sigstore") -@_model_path_argument +@_target_argument @_ignore_paths_option @_ignore_git_paths_option @_allow_symlinks_option @_write_signature_option +@_attachment_mode_option +@_output_mode_option @_sigstore_staging_option @_trust_config_option @click.option( @@ -474,11 +465,13 @@ def _sign() -> None: help="The custom OpenID Connect client secret to use during OAuth2", ) def _sign_sigstore( - model_path: pathlib.Path, + target: str, ignore_paths: Iterable[pathlib.Path], ignore_git_paths: bool, allow_symlinks: bool, signature: pathlib.Path, + attachment_mode: str, + output_mode: str, use_ambient_credentials: bool, use_staging: bool, oauth_force_oob: bool, @@ -489,110 +482,84 @@ def _sign_sigstore( ) -> None: """Sign using Sigstore (DEFAULT signing method). - Signing the model at MODEL_PATH, produces the signature at SIGNATURE_PATH - (as per `--signature` option). Files in IGNORE_PATHS are not part of the - signature. + TARGET can be a local file/directory or an OCI image reference. + If the path exists locally, it's signed as a file. Otherwise, it's + treated as an OCI image reference. - If using Sigstore, we need to provision an OIDC token. In general, this is - taken from an interactive OIDC flow, but ambient credentials could be used - to use workload identity tokens (e.g., when running in GitHub actions). - Alternatively, a constant identity token can be provided via - `--identity-token`. - - Sigstore allows users to use a staging instance for test-only signatures. - Passing the `--use-staging` flag would use that instance instead of the - production one. - - Additionally, you can specify a custom trust configuration JSON file using - the `--trust-config` flag. This allows you to fully customize the PKI - (Private Key Infrastructure) used in the signing process. By providing a - `--trust-config`, you can define your own transparency logs, certificate - authorities, and other trust settings, enabling full control over the - trust model, including which PKI to use for signature verification. - - If `--trust-config` is not provided, the default Sigstore instance is - used, which is pre-configured with Sigstore’s own trusted transparency - logs and certificate authorities. This provides a ready-to-use default - trust model for most use cases but may not be suitable for custom or - highly regulated environments. + For local files: Creates a signature file (default: model.sig). + + For images: Use --output-mode to control where the signature is stored: + - 'registry' (default): Attaches signature to the OCI registry + - 'file': Writes signature to disk only (use --signature for path) + - 'both': Attaches to registry AND writes to disk + + Sigstore requires an OIDC token for signing. By default, this is obtained + via an interactive browser flow. Use --use-ambient-credentials for workload + identity tokens (e.g., GitHub Actions), or --identity-token to provide a + fixed token. + + Use --use-staging for test signatures against Sigstore's staging instance. + + Use --trust-config to specify a custom PKI configuration with your own + transparency logs and certificate authorities. If not provided, the default + Sigstore production instance is used. """ + is_file = pathlib.Path(target).exists() + with tracer.start_as_current_span("Sign") as span: span.set_attribute("sigstore.sign_method", "sigstore") - span.set_attribute("sigstore.signature", str(signature)) + target_type = "file" if is_file else "image" + span.set_attribute("sigstore.target_type", target_type) span.set_attribute( "sigstore.use_ambient_credentials", use_ambient_credentials ) span.set_attribute("sigstore.use_staging", use_staging) - try: - path_type, path_wrapper = _detect_path_type(model_path) - - if path_type == PathType.OCI_MANIFEST: - oci_manifest = path_wrapper - assert isinstance(oci_manifest, OCIManifestPath) - if not oci_manifest.exists(): - click.echo( - f"OCI manifest file not found: {oci_manifest.path}", - err=True, - ) - sys.exit(1) - span.set_attribute( - "sigstore.oci_manifest", str(oci_manifest.path) - ) - - model_manifest = ( - model_signing.hashing.create_manifest_from_oci_layers( - oci_manifest.data, model_name=oci_manifest.model_name - ) - ) - - model_signing.signing.Config().use_sigstore_signer( - use_ambient_credentials=use_ambient_credentials, - use_staging=use_staging, - identity_token=identity_token, - force_oob=oauth_force_oob, - client_id=client_id, - client_secret=client_secret, - trust_config=trust_config, - ).sign_from_manifest(model_manifest, signature) - else: - model_path_obj = path_wrapper - assert isinstance(model_path_obj, ModelPath) + try: + config = model_signing.signing.Config().use_sigstore_signer( + use_ambient_credentials=use_ambient_credentials, + use_staging=use_staging, + identity_token=identity_token, + force_oob=oauth_force_oob, + client_id=client_id, + client_secret=client_secret, + trust_config=trust_config, + ) - span.set_attribute( - "sigstore.model_path", str(model_path_obj.path) - ) + if is_file: + model_path = pathlib.Path(target) + span.set_attribute("sigstore.model_path", str(model_path)) + span.set_attribute("sigstore.signature", str(signature)) ignored = _resolve_ignore_paths( - model_path_obj.path, list(ignore_paths) + [signature] + model_path, list(ignore_paths) + [signature] ) - model_signing.signing.Config().use_sigstore_signer( - use_ambient_credentials=use_ambient_credentials, - use_staging=use_staging, - identity_token=identity_token, - force_oob=oauth_force_oob, - client_id=client_id, - client_secret=client_secret, - trust_config=trust_config, - ).set_hashing_config( + config.set_hashing_config( model_signing.hashing.Config() .set_ignored_paths( paths=ignored, ignore_git_paths=ignore_git_paths ) .set_allow_symlinks(allow_symlinks) - ).sign(model_path_obj.path, signature) + ).sign(model_path, signature) + click.echo("Signing succeeded") + else: + image_ref = ImageReference.parse(target) + _handle_image_signing( + config, image_ref, attachment_mode, output_mode, signature + ) + except Exception as err: click.echo(f"Signing failed with error: {err}", err=True) sys.exit(1) - click.echo("Signing succeeded") - @_sign.command(name="key") -@_model_path_argument +@_target_argument @_ignore_paths_option @_ignore_git_paths_option @_allow_symlinks_option @_write_signature_option +@_attachment_mode_option +@_output_mode_option @_private_key_option @click.option( "--password", @@ -600,71 +567,73 @@ def _sign_sigstore( metavar="PASSWORD", help="Password for the key encryption, if any", ) -def _sign_private_key( - model_path: pathlib.Path, +def _sign_key( + target: str, ignore_paths: Iterable[pathlib.Path], ignore_git_paths: bool, allow_symlinks: bool, signature: pathlib.Path, + attachment_mode: str, + output_mode: str, private_key: pathlib.Path, password: str | None = None, ) -> None: """Sign using a private key (paired with a public one). - Signing the model at MODEL_PATH_OR_MANIFEST, produces the signature at - SIGNATURE_PATH (as per `--signature` option). Files in IGNORE_PATHS are not - part of the signature. + TARGET can be a local file/directory or an OCI image reference. + If the path exists locally, it's signed as a file. Otherwise, it's + treated as an OCI image reference. - Traditionally, signing could be achieved by using a public/private key pair. - Pass the signing key using `--private_key`. + For local files: Creates a signature file (default: model.sig). - Note that this method does not provide a way to tie to the identity of the - signer, outside of pairing the keys. Also note that we don't offer key - management protocols. + For images: Use --output-mode to control where the signature is stored: + - 'registry' (default): Attaches signature to the OCI registry + - 'file': Writes signature to disk only (use --signature for path) + - 'both': Attaches to registry AND writes to disk + + The private key must be an elliptic curve key (NIST P-256, P-384, or P-521) + in PEM format. Use --password if the key is encrypted. Verification + requires the corresponding public key. + + Note: This method does not tie to a signer identity like Sigstore does. + Key management is the user's responsibility. """ - try: - path_type, path_wrapper = _detect_path_type(model_path) - - if path_type == PathType.OCI_MANIFEST: - oci_manifest = path_wrapper - assert isinstance(oci_manifest, OCIManifestPath) - if not oci_manifest.exists(): - click.echo( - f"OCI manifest file not found: {oci_manifest.path}", - err=True, - ) - sys.exit(1) + is_file = pathlib.Path(target).exists() - model_manifest = ( - model_signing.hashing.create_manifest_from_oci_layers( - oci_manifest.data, model_name=oci_manifest.model_name - ) - ) + with tracer.start_as_current_span("Sign") as span: + span.set_attribute("sigstore.sign_method", "key") + target_type = "file" if is_file else "image" + span.set_attribute("sigstore.target_type", target_type) - model_signing.signing.Config().use_elliptic_key_signer( + try: + config = model_signing.signing.Config().use_elliptic_key_signer( private_key=private_key, password=password - ).sign_from_manifest(model_manifest, signature) - else: - model_path_obj = path_wrapper - assert isinstance(model_path_obj, ModelPath) - - ignored = _resolve_ignore_paths( - model_path_obj.path, list(ignore_paths) + [signature] ) - model_signing.signing.Config().use_elliptic_key_signer( - private_key=private_key, password=password - ).set_hashing_config( - model_signing.hashing.Config() - .set_ignored_paths( - paths=ignored, ignore_git_paths=ignore_git_paths + + if is_file: + model_path = pathlib.Path(target) + span.set_attribute("sigstore.model_path", str(model_path)) + span.set_attribute("sigstore.signature", str(signature)) + ignored = _resolve_ignore_paths( + model_path, list(ignore_paths) + [signature] + ) + config.set_hashing_config( + model_signing.hashing.Config() + .set_ignored_paths( + paths=ignored, ignore_git_paths=ignore_git_paths + ) + .set_allow_symlinks(allow_symlinks) + ).sign(model_path, signature) + click.echo("Signing succeeded") + else: + image_ref = ImageReference.parse(target) + _handle_image_signing( + config, image_ref, attachment_mode, output_mode, signature ) - .set_allow_symlinks(allow_symlinks) - ).sign(model_path_obj.path, signature) - except Exception as err: - click.echo(f"Signing failed with error: {err}", err=True) - sys.exit(1) - click.echo("Signing succeeded") + except Exception as err: + click.echo(f"Signing failed with error: {err}", err=True) + sys.exit(1) @_sign.command(name="pkcs11-key") @@ -684,55 +653,28 @@ def _sign_pkcs11_key( ) -> None: """Sign using a private key using a PKCS #11 URI. - Signing the model at MODEL_PATH_OR_MANIFEST, produces the signature at - SIGNATURE_PATH (as per `--signature` option). Files in IGNORE_PATHS are not - part of the signature. + Signing the model at MODEL_PATH, produces the signature at SIGNATURE_PATH + (as per `--signature` option). Files in IGNORE_PATHS are not part of the + signature. Traditionally, signing could be achieved by using a public/private key pair. - Pass the PKCS #11 URI of the signing key using `--pkcs11_uri`. + Pass the PKCS #11 URI of the signing key using `--pkcs11-uri`. Note that this method does not provide a way to tie to the identity of the signer, outside of pairing the keys. Also note that we don't offer key management protocols. """ try: - path_type, path_wrapper = _detect_path_type(model_path) - - if path_type == PathType.OCI_MANIFEST: - oci_manifest = path_wrapper - assert isinstance(oci_manifest, OCIManifestPath) - if not oci_manifest.exists(): - click.echo( - f"OCI manifest file not found: {oci_manifest.path}", - err=True, - ) - sys.exit(1) - - model_manifest = ( - model_signing.hashing.create_manifest_from_oci_layers( - oci_manifest.data, model_name=oci_manifest.model_name - ) - ) - - model_signing.signing.Config().use_pkcs11_signer( - pkcs11_uri=pkcs11_uri - ).sign_from_manifest(model_manifest, signature) - else: - model_path_obj = path_wrapper - assert isinstance(model_path_obj, ModelPath) - - ignored = _resolve_ignore_paths( - model_path_obj.path, list(ignore_paths) + [signature] - ) - model_signing.signing.Config().use_pkcs11_signer( - pkcs11_uri=pkcs11_uri - ).set_hashing_config( - model_signing.hashing.Config() - .set_ignored_paths( - paths=ignored, ignore_git_paths=ignore_git_paths - ) - .set_allow_symlinks(allow_symlinks) - ).sign(model_path_obj.path, signature) + ignored = _resolve_ignore_paths( + model_path, list(ignore_paths) + [signature] + ) + model_signing.signing.Config().use_pkcs11_signer( + pkcs11_uri=pkcs11_uri + ).set_hashing_config( + model_signing.hashing.Config() + .set_ignored_paths(paths=ignored, ignore_git_paths=ignore_git_paths) + .set_allow_symlinks(allow_symlinks) + ).sign(model_path, signature) except Exception as err: click.echo(f"Signing failed with error: {err}", err=True) sys.exit(1) @@ -761,62 +703,33 @@ def _sign_certificate( ) -> None: """Sign using a certificate. - Signing the model at MODEL_PATH_OR_MANIFEST, produces the signature at - SIGNATURE_PATH (as per `--signature` option). Files in IGNORE_PATHS are not - part of the signature. + Signing the model at MODEL_PATH, produces the signature at SIGNATURE_PATH + (as per `--signature` option). Files in IGNORE_PATHS are not part of the + signature. Traditionally, signing can be achieved by using keys from a certificate. The certificate can also provide the identity of the signer, making this method more informative than just using a public/private key pair for - signing. Pass the private signing key using `--private_key` and signing - certificate via `--signing_certificate`. Optionally, pass a certificate - chain via `--certificate_chain` to establish root of trust (this option can + signing. Pass the private signing key using `--private-key` and signing + certificate via `--signing-certificate`. Optionally, pass a certificate + chain via `--certificate-chain` to establish root of trust (this option can be repeated as needed, or all cerificates could be placed in a single file). Note that we don't offer certificate and key management protocols. """ try: - path_type, path_wrapper = _detect_path_type(model_path) - - if path_type == PathType.OCI_MANIFEST: - oci_manifest = path_wrapper - assert isinstance(oci_manifest, OCIManifestPath) - if not oci_manifest.exists(): - click.echo( - f"OCI manifest file not found: {oci_manifest.path}", - err=True, - ) - sys.exit(1) - - model_manifest = ( - model_signing.hashing.create_manifest_from_oci_layers( - oci_manifest.data, model_name=oci_manifest.model_name - ) - ) - - model_signing.signing.Config().use_certificate_signer( - private_key=private_key, - signing_certificate=signing_certificate, - certificate_chain=certificate_chain, - ).sign_from_manifest(model_manifest, signature) - else: - model_path_obj = path_wrapper - assert isinstance(model_path_obj, ModelPath) - - ignored = _resolve_ignore_paths( - model_path_obj.path, list(ignore_paths) + [signature] - ) - model_signing.signing.Config().use_certificate_signer( - private_key=private_key, - signing_certificate=signing_certificate, - certificate_chain=certificate_chain, - ).set_hashing_config( - model_signing.hashing.Config() - .set_ignored_paths( - paths=ignored, ignore_git_paths=ignore_git_paths - ) - .set_allow_symlinks(allow_symlinks) - ).sign(model_path_obj.path, signature) + ignored = _resolve_ignore_paths( + model_path, list(ignore_paths) + [signature] + ) + model_signing.signing.Config().use_certificate_signer( + private_key=private_key, + signing_certificate=signing_certificate, + certificate_chain=certificate_chain, + ).set_hashing_config( + model_signing.hashing.Config() + .set_ignored_paths(paths=ignored, ignore_git_paths=ignore_git_paths) + .set_allow_symlinks(allow_symlinks) + ).sign(model_path, signature) except Exception as err: click.echo(f"Signing failed with error: {err}", err=True) sys.exit(1) @@ -845,63 +758,34 @@ def _sign_pkcs11_certificate( ) -> None: """Sign using a certificate. - Signing the model at MODEL_PATH_OR_MANIFEST, produces the signature at - SIGNATURE_PATH (as per `--signature` option). Files in IGNORE_PATHS are not - part of the signature. + Signing the model at MODEL_PATH, produces the signature at SIGNATURE_PATH + (as per `--signature` option). Files in IGNORE_PATHS are not part of the + signature. Traditionally, signing can be achieved by using keys from a certificate. The certificate can also provide the identity of the signer, making this method more informative than just using a public/private key pair for signing. Pass the PKCS #11 URI of the private signing key using - `--pkcs11_uri` and then signing certificate via `--signing_certificate`. - Optionally, pass a certificate chain via `--certificate_chain` to establish + `--pkcs11-uri` and then signing certificate via `--signing-certificate`. + Optionally, pass a certificate chain via `--certificate-chain` to establish root of trust (this option can be repeated as needed, or all cerificates could be placed in a single file). Note that we don't offer certificate and key management protocols. """ try: - path_type, path_wrapper = _detect_path_type(model_path) - - if path_type == PathType.OCI_MANIFEST: - oci_manifest = path_wrapper - assert isinstance(oci_manifest, OCIManifestPath) - if not oci_manifest.exists(): - click.echo( - f"OCI manifest file not found: {oci_manifest.path}", - err=True, - ) - sys.exit(1) - - model_manifest = ( - model_signing.hashing.create_manifest_from_oci_layers( - oci_manifest.data, model_name=oci_manifest.model_name - ) - ) - - model_signing.signing.Config().use_pkcs11_certificate_signer( - pkcs11_uri=pkcs11_uri, - signing_certificate=signing_certificate, - certificate_chain=certificate_chain, - ).sign_from_manifest(model_manifest, signature) - else: - model_path_obj = path_wrapper - assert isinstance(model_path_obj, ModelPath) - - ignored = _resolve_ignore_paths( - model_path_obj.path, list(ignore_paths) + [signature] - ) - model_signing.signing.Config().use_pkcs11_certificate_signer( - pkcs11_uri=pkcs11_uri, - signing_certificate=signing_certificate, - certificate_chain=certificate_chain, - ).set_hashing_config( - model_signing.hashing.Config() - .set_ignored_paths( - paths=ignored, ignore_git_paths=ignore_git_paths - ) - .set_allow_symlinks(allow_symlinks) - ).sign(model_path_obj.path, signature) + ignored = _resolve_ignore_paths( + model_path, list(ignore_paths) + [signature] + ) + model_signing.signing.Config().use_pkcs11_certificate_signer( + pkcs11_uri=pkcs11_uri, + signing_certificate=signing_certificate, + certificate_chain=certificate_chain, + ).set_hashing_config( + model_signing.hashing.Config() + .set_ignored_paths(paths=ignored, ignore_git_paths=ignore_git_paths) + .set_allow_symlinks(allow_symlinks) + ).sign(model_path, signature) except Exception as err: click.echo(f"Signing failed with error: {err}", err=True) sys.exit(1) @@ -914,14 +798,15 @@ def _verify() -> None: """Verify models. Given a model and a cryptographic signature (in the form of a Sigstore - bundle) for the model, this call checks that the model matches the - signature, that the model has not been tampered with. We support any model - format, either as a single file or as a directory. + bundle), this verifies that the model matches the signature and has not + been tampered with. Supports both local files/directories and OCI images. + + TARGET can be either: + - A local file/directory path (e.g., ./my-model) + - An OCI image reference (e.g., quay.io/user/model:latest) - You can provide either: - - A local model path (directory or file) to verify the model files directly - - An OCI image manifest JSON file (from 'skopeo inspect --raw') to verify - against the image layers without requiring the model files on disk + The tool auto-detects the target type: if the path exists locally, it is + verified as a file; otherwise, it is treated as an OCI image reference. We support multiple PKI methods, specified as subcommands. By default, the signature is assumed to be generated via Sigstore (as if invoking `sigstore` @@ -938,11 +823,13 @@ def _verify() -> None: @_verify.command(name="sigstore") -@_model_path_argument +@_target_argument @_read_signature_option @_ignore_paths_option @_ignore_git_paths_option @_allow_symlinks_option +@_attachment_mode_option +@_local_model_option @_sigstore_staging_option @_trust_config_option @click.option( @@ -961,93 +848,117 @@ def _verify() -> None: ) @_ignore_unsigned_files_option def _verify_sigstore( - model_path: pathlib.Path, - signature: pathlib.Path, + target: str, + signature: pathlib.Path | None, ignore_paths: Iterable[pathlib.Path], ignore_git_paths: bool, allow_symlinks: bool, + attachment_mode: str, + local_model: pathlib.Path | None, identity: str, identity_provider: str, use_staging: bool, ignore_unsigned_files: bool, trust_config: pathlib.Path | None = None, ) -> None: - """Verify using Sigstore (DEFAULT verification method). + r"""Verify using Sigstore (DEFAULT verification method). - Verifies the integrity of model at MODEL_PATH, according to signature from - SIGNATURE_PATH (given via `--signature` option). Files in IGNORE_PATHS are - ignored. + TARGET can be a local file/directory or an OCI image reference. + If the path exists locally, it's verified as a file. Otherwise, it's + treated as an OCI image reference. + + For local files: Requires --signature option. + For images: Fetches signature from registry. - For Sigstore, we also need to provide an expected identity and identity - provider for the signature. If these don't match what is provided in the - signature, verification would fail. + The --identity and --identity-provider must match the signer's identity + from the OIDC token used during signing. Common providers include: + - Google: https://accounts.google.com + - GitHub: https://github.com/login/oauth + - GitHub Actions: https://token.actions.githubusercontent.com + - Microsoft: https://login.microsoftonline.com + + Use --use-staging if the signature was created with Sigstore's staging + instance. Use --trust-config for custom PKI configurations. """ + is_file = pathlib.Path(target).exists() + with tracer.start_as_current_span("Verify") as span: span.set_attribute("sigstore.method", "sigstore") - span.set_attribute("sigstore.signature", str(signature)) + target_type = "file" if is_file else "image" + span.set_attribute("sigstore.target_type", target_type) span.set_attribute("sigstore.identity", identity) span.set_attribute("sigstore.oidc_issuer", identity_provider) span.set_attribute("sigstore.use_staging", use_staging) - try: - path_type, path_wrapper = _detect_path_type(model_path) - - if path_type == PathType.OCI_MANIFEST: - oci_manifest = path_wrapper - assert isinstance(oci_manifest, OCIManifestPath) - if not oci_manifest.exists(): - click.echo( - f"OCI manifest file not found: {oci_manifest.path}", - err=True, - ) - sys.exit(1) - span.set_attribute( - "sigstore.oci_manifest", str(oci_manifest.path) - ) - - model_signing.verifying.Config().use_sigstore_verifier( - identity=identity, - oidc_issuer=identity_provider, - use_staging=use_staging, - trust_config=trust_config, - ).verify_from_oci_manifest(oci_manifest.data, signature) - else: - model_path_obj = path_wrapper - assert isinstance(model_path_obj, ModelPath) + try: + config = model_signing.verifying.Config().use_sigstore_verifier( + identity=identity, + oidc_issuer=identity_provider, + use_staging=use_staging, + trust_config=trust_config, + ) - span.set_attribute( - "sigstore.model_path", str(model_path_obj.path) - ) + if is_file: + if signature is None: + raise click.UsageError( + "--signature is required when verifying local files" + ) + model_path = pathlib.Path(target) + span.set_attribute("sigstore.model_path", str(model_path)) + span.set_attribute("sigstore.signature", str(signature)) + click.echo(f"Verifying: {model_path}") + click.echo(f"Signature: {signature}") ignored = _resolve_ignore_paths( - model_path_obj.path, list(ignore_paths) + [signature] + model_path, list(ignore_paths) + [signature] ) - model_signing.verifying.Config().use_sigstore_verifier( - identity=identity, - oidc_issuer=identity_provider, - use_staging=use_staging, - trust_config=trust_config, - ).set_hashing_config( + config.set_hashing_config( model_signing.hashing.Config() .set_ignored_paths( paths=ignored, ignore_git_paths=ignore_git_paths ) .set_allow_symlinks(allow_symlinks) ).set_ignore_unsigned_files(ignore_unsigned_files).verify( - model_path_obj.path, signature + model_path, signature ) + else: + image_ref = ImageReference.parse(target) + click.echo(f"Verifying: {image_ref}") + use_default = attachment_mode == "referrers" + mode = None if use_default else attachment_mode + if mode == "tag": + click.echo("Fetching signature from tag...") + elif mode is None: + click.echo("Fetching signature from registry...") + else: + click.echo("Fetching signature via referrers API...") + config.verify_image( + image_ref, + local_model_path=local_model, + attachment_mode=mode, + ignore_git_paths=ignore_git_paths, + ) + if local_model: + click.echo(f"Local files verified: {local_model}") + + click.echo("\nThe following checks were performed:") + click.echo(" - Signature verified against Sigstore bundle") + click.echo(" - Signing identity matched") + click.echo(" - OIDC issuer matched") + click.echo("\nVerification succeeded") + except Exception as err: - click.echo(f"Verification failed with error: {err}", err=True) + click.echo(f"Verification failed:\n{err}", err=True) sys.exit(1) - click.echo("Verification succeeded") - @_verify.command(name="key") -@_model_path_argument +@_target_argument @_read_signature_option @_ignore_paths_option @_ignore_git_paths_option @_allow_symlinks_option +@_attachment_mode_option +@_local_model_option @click.option( "--public-key", type=pathlib.Path, @@ -1056,67 +967,91 @@ def _verify_sigstore( help="Path to the public key used for verification.", ) @_ignore_unsigned_files_option -def _verify_private_key( - model_path: pathlib.Path, - signature: pathlib.Path, +def _verify_key( + target: str, + signature: pathlib.Path | None, ignore_paths: Iterable[pathlib.Path], ignore_git_paths: bool, allow_symlinks: bool, + attachment_mode: str, + local_model: pathlib.Path | None, public_key: pathlib.Path, ignore_unsigned_files: bool, ) -> None: - """Verify using a public key (paired with a private one). + r"""Verify using a public key (paired with a private one). - Verifies the integrity of model at MODEL_PATH_OR_MANIFEST, according to - signature from SIGNATURE_PATH (given via `--signature` option). Files in - IGNORE_PATHS are ignored. + TARGET can be a local file/directory or an OCI image reference. + If the path exists locally, it's verified as a file. Otherwise, it's + treated as an OCI image reference. - The public key provided via `--public_key` must have been paired with the - private key used when generating the signature. + For local files: Requires --signature option. + For images: Fetches signature from registry. - Note that this method does not provide a way to tie to the identity of the - signer, outside of pairing the keys. Also note that we don't offer key - management protocols. + The public key must correspond to the private key used for signing. It can + be in PEM format (file) or raw/compressed format. Supported curves are + NIST P-256, P-384, and P-521. """ - try: - path_type, path_wrapper = _detect_path_type(model_path) - - if path_type == PathType.OCI_MANIFEST: - oci_manifest = path_wrapper - assert isinstance(oci_manifest, OCIManifestPath) - if not oci_manifest.exists(): - click.echo( - f"OCI manifest file not found: {oci_manifest.path}", - err=True, - ) - sys.exit(1) + is_file = pathlib.Path(target).exists() - model_signing.verifying.Config().use_elliptic_key_verifier( - public_key=public_key - ).verify_from_oci_manifest(oci_manifest.data, signature) - else: - model_path_obj = path_wrapper - assert isinstance(model_path_obj, ModelPath) + with tracer.start_as_current_span("Verify") as span: + span.set_attribute("sigstore.method", "key") + target_type = "file" if is_file else "image" + span.set_attribute("sigstore.target_type", target_type) - ignored = _resolve_ignore_paths( - model_path_obj.path, list(ignore_paths) + [signature] - ) - model_signing.verifying.Config().use_elliptic_key_verifier( + try: + config = model_signing.verifying.Config().use_elliptic_key_verifier( public_key=public_key - ).set_hashing_config( - model_signing.hashing.Config() - .set_ignored_paths( - paths=ignored, ignore_git_paths=ignore_git_paths - ) - .set_allow_symlinks(allow_symlinks) - ).set_ignore_unsigned_files(ignore_unsigned_files).verify( - model_path_obj.path, signature ) - except Exception as err: - click.echo(f"Verification failed with error: {err}", err=True) - sys.exit(1) - click.echo("Verification succeeded") + if is_file: + if signature is None: + raise click.UsageError( + "--signature is required when verifying local files" + ) + model_path = pathlib.Path(target) + span.set_attribute("sigstore.model_path", str(model_path)) + span.set_attribute("sigstore.signature", str(signature)) + click.echo(f"Verifying: {model_path}") + click.echo(f"Signature: {signature}") + ignored = _resolve_ignore_paths( + model_path, list(ignore_paths) + [signature] + ) + config.set_hashing_config( + model_signing.hashing.Config() + .set_ignored_paths( + paths=ignored, ignore_git_paths=ignore_git_paths + ) + .set_allow_symlinks(allow_symlinks) + ).set_ignore_unsigned_files(ignore_unsigned_files).verify( + model_path, signature + ) + else: + image_ref = ImageReference.parse(target) + click.echo(f"Verifying: {image_ref}") + use_default = attachment_mode == "referrers" + mode = None if use_default else attachment_mode + if mode == "tag": + click.echo("Fetching signature from tag...") + elif mode is None: + click.echo("Fetching signature from registry...") + else: + click.echo("Fetching signature via referrers API...") + config.verify_image( + image_ref, + local_model_path=local_model, + attachment_mode=mode, + ignore_git_paths=ignore_git_paths, + ) + if local_model: + click.echo(f"Local files verified: {local_model}") + + click.echo("\nThe following checks were performed:") + click.echo(" - Signature verified against public key") + click.echo("\nVerification succeeded") + + except Exception as err: + click.echo(f"Verification failed:\n{err}", err=True) + sys.exit(1) @_verify.command(name="certificate") @@ -1147,13 +1082,13 @@ def _verify_certificate( ) -> None: """Verify using a certificate. - Verifies the integrity of model at MODEL_PATH_OR_MANIFEST, according to - signature from SIGNATURE_PATH (given via `--signature` option). Files in - IGNORE_PATHS are ignored. + Verifies the integrity of model at MODEL_PATH, according to signature from + SIGNATURE_PATH (given via `--signature` option). Files in IGNORE_PATHS are + ignored. The signing certificate is encoded in the signature, as part of the Sigstore bundle. To verify the root of trust, pass additional certificates in the - certificate chain, using `--certificate_chain` (this option can be repeated + certificate chain, using `--certificate-chain` (this option can be repeated as needed, or all certificates could be placed in a single file). Note that we don't offer certificate and key management protocols. @@ -1162,43 +1097,21 @@ def _verify_certificate( logging.basicConfig(format="%(message)s", level=logging.INFO) try: - path_type, path_wrapper = _detect_path_type(model_path) - - if path_type == PathType.OCI_MANIFEST: - oci_manifest = path_wrapper - assert isinstance(oci_manifest, OCIManifestPath) - if not oci_manifest.exists(): - click.echo( - f"OCI manifest file not found: {oci_manifest.path}", - err=True, - ) - sys.exit(1) - - model_signing.verifying.Config().use_certificate_verifier( - certificate_chain=certificate_chain, - log_fingerprints=log_fingerprints, - ).verify_from_oci_manifest(oci_manifest.data, signature) - else: - model_path_obj = path_wrapper - assert isinstance(model_path_obj, ModelPath) - - ignored = _resolve_ignore_paths( - model_path_obj.path, list(ignore_paths) + [signature] - ) - model_signing.verifying.Config().use_certificate_verifier( - certificate_chain=certificate_chain, - log_fingerprints=log_fingerprints, - ).set_hashing_config( - model_signing.hashing.Config() - .set_ignored_paths( - paths=ignored, ignore_git_paths=ignore_git_paths - ) - .set_allow_symlinks(allow_symlinks) - ).set_ignore_unsigned_files(ignore_unsigned_files).verify( - model_path_obj.path, signature - ) + ignored = _resolve_ignore_paths( + model_path, list(ignore_paths) + [signature] + ) + model_signing.verifying.Config().use_certificate_verifier( + certificate_chain=certificate_chain, + log_fingerprints=log_fingerprints, + ).set_hashing_config( + model_signing.hashing.Config() + .set_ignored_paths(paths=ignored, ignore_git_paths=ignore_git_paths) + .set_allow_symlinks(allow_symlinks) + ).set_ignore_unsigned_files(ignore_unsigned_files).verify( + model_path, signature + ) except Exception as err: - click.echo(f"Verification failed with error: {err}", err=True) + click.echo(f"Verification failed:\n{err}", err=True) sys.exit(1) click.echo("Verification succeeded") diff --git a/src/model_signing/_oci/__init__.py b/src/model_signing/_oci/__init__.py new file mode 100644 index 00000000..e3fd3f34 --- /dev/null +++ b/src/model_signing/_oci/__init__.py @@ -0,0 +1,13 @@ +# Copyright 2025 The Sigstore Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/src/model_signing/_oci/attachment.py b/src/model_signing/_oci/attachment.py new file mode 100644 index 00000000..248caca0 --- /dev/null +++ b/src/model_signing/_oci/attachment.py @@ -0,0 +1,195 @@ +# Copyright 2025 The Sigstore Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Signature attachment strategies for OCI registries. + +Provides two modes for attaching signatures to images: +1. Referrers API (OCI 1.1) - Creates artifact referencing the signed image +2. Tag-based - Uses a tag derived from the image digest (legacy/fallback) +""" + +from __future__ import annotations + +import enum +import json +from typing import TYPE_CHECKING + +from model_signing._oci import registry as oci_registry + + +if TYPE_CHECKING: + from model_signing._oci.registry import ImageReference + from model_signing._oci.registry import OrasClient + + +def _is_matching_signature(sig_bytes: bytes, signature_type: str) -> bool: + """Check if signature bundle matches the expected signature type. + + Args: + sig_bytes: The signature bundle bytes (JSON-encoded). + signature_type: Expected type - "sigstore" for certificate-based, + or any other value for public key-based signatures. + + Returns: + True if the signature bundle contains the expected verification + material type, False otherwise. + """ + key = "certificate" if signature_type == "sigstore" else "publicKey" + try: + bundle = json.loads(sig_bytes) + return key in bundle.get("verificationMaterial", {}) + except (json.JSONDecodeError, UnicodeDecodeError): + return False + + +class AttachmentMode(enum.Enum): + """Signature attachment mode.""" + + REFERRERS = "referrers" + TAG = "tag" + + +class ReferrersAttachment: + """Signature attachment using OCI 1.1 Referrers API.""" + + def attach( + self, + client: OrasClient, + image_ref: ImageReference, + signature_bundle: bytes, + image_digest: str, + subject_manifest_size: int = 0, + ) -> str: + return client.push_signature( + image_ref, + signature_bundle, + subject_digest=image_digest, + subject_size=subject_manifest_size, + ) + + def fetch( + self, + client: OrasClient, + image_ref: ImageReference, + image_digest: str, + signature_type: str = "sigstore", + ) -> bytes | None: + referrers = client.get_referrers( + image_ref.with_digest(image_digest), + artifact_type=oci_registry.MODEL_SIGNING_ARTIFACT_TYPE, + ) + if not referrers: + return None + + for sig_ref in reversed(referrers): + sig_digest = sig_ref.get("digest") + if not sig_digest: + continue + sig_bytes = self._fetch_layer(client, image_ref, sig_digest) + if sig_bytes and _is_matching_signature(sig_bytes, signature_type): + return sig_bytes + return None + + def _fetch_layer( + self, client: OrasClient, image_ref: ImageReference, sig_digest: str + ) -> bytes | None: + """Fetch first layer blob from a signature artifact manifest.""" + try: + manifest, _ = client.get_manifest(image_ref.with_digest(sig_digest)) + except Exception: + return None + + layers = manifest.get("layers", []) + if not layers or not layers[0].get("digest"): + return None + + try: + return client.pull_blob(image_ref, layers[0]["digest"]) + except Exception: + return None + + +class TagAttachment: + """Signature attachment using tag-based convention (sha256-xxx.sig).""" + + def _digest_to_tag(self, digest: str) -> str: + return digest.replace(":", "-") + ".sig" + + def attach( + self, + client: OrasClient, + image_ref: ImageReference, + signature_bundle: bytes, + image_digest: str, + subject_manifest_size: int = 0, # noqa: ARG002 + ) -> str: + sig_tag = self._digest_to_tag(image_digest) + return client.push_signature_tag(image_ref, signature_bundle, sig_tag) + + def fetch( + self, + client: OrasClient, + image_ref: ImageReference, + image_digest: str, + signature_type: str = "sigstore", + ) -> bytes | None: + sig_tag = self._digest_to_tag(image_digest) + try: + manifest, _ = client.get_manifest(image_ref.with_tag(sig_tag)) + except Exception: + return None + + layers = manifest.get("layers", []) + if not layers or not layers[0].get("digest"): + return None + + try: + sig_bytes = client.pull_blob(image_ref, layers[0]["digest"]) + except Exception: + return None + + if not sig_bytes: + return None + if _is_matching_signature(sig_bytes, signature_type): + return sig_bytes + return None + + +def get_attachment_strategy( + mode: AttachmentMode, +) -> ReferrersAttachment | TagAttachment: + """Get the attachment strategy for the given mode.""" + if mode == AttachmentMode.REFERRERS: + return ReferrersAttachment() + return TagAttachment() + + +def try_fetch_signature( + client: OrasClient, + image_ref: ImageReference, + image_digest: str, + signature_type: str = "sigstore", +) -> tuple[bytes, AttachmentMode] | None: + """Try to fetch a signature using referrers first, then tag-based.""" + ref_strategy = ReferrersAttachment() + sig = ref_strategy.fetch(client, image_ref, image_digest, signature_type) + if sig: + return sig, AttachmentMode.REFERRERS + + tag_strategy = TagAttachment() + sig = tag_strategy.fetch(client, image_ref, image_digest, signature_type) + if sig: + return sig, AttachmentMode.TAG + + return None diff --git a/src/model_signing/_oci/registry.py b/src/model_signing/_oci/registry.py new file mode 100644 index 00000000..1af62534 --- /dev/null +++ b/src/model_signing/_oci/registry.py @@ -0,0 +1,388 @@ +# Copyright 2025 The Sigstore Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""OCI registry client using oras-py for authentication.""" + +from __future__ import annotations + +from dataclasses import dataclass +from dataclasses import field +from dataclasses import replace +import hashlib +import json +import re +from typing import Any + +import oras.provider +import requests + + +# OCI Distribution Spec media types +OCI_MANIFEST_MEDIA_TYPE = "application/vnd.oci.image.manifest.v1+json" +OCI_INDEX_MEDIA_TYPE = "application/vnd.oci.image.index.v1+json" +OCI_CONFIG_MEDIA_TYPE = "application/vnd.oci.image.config.v1+json" + +# Media types for model signing signature artifacts +MODEL_SIGNING_ARTIFACT_TYPE = "application/vnd.model-signing.signature.v0.1" +MODEL_SIGNING_CONFIG_MEDIA_TYPE = ( + "application/vnd.model-signing.signature.v0.1.config+json" +) +MODEL_SIGNING_LAYER_MEDIA_TYPE = "application/vnd.dev.sigstore.bundle.v0.3+json" + + +@dataclass +class Descriptor: + """OCI content descriptor. + + See: https://github.com/opencontainers/image-spec/blob/main/descriptor.md + + Attributes: + media_type: The media type of the referenced content. + digest: The digest of the referenced content. + size: The size in bytes of the referenced content. + annotations: Optional arbitrary metadata. + """ + + media_type: str + digest: str + size: int + annotations: dict[str, str] | None = None + + def to_dict(self) -> dict[str, Any]: + """Convert to a JSON-serializable dictionary.""" + result: dict[str, Any] = { + "mediaType": self.media_type, + "digest": self.digest, + "size": self.size, + } + if self.annotations: + result["annotations"] = self.annotations + return result + + +@dataclass +class OCIManifest: + """OCI image manifest. + + See: https://github.com/opencontainers/image-spec/blob/main/manifest.md + + Attributes: + config: The config descriptor. + layers: List of layer descriptors. + artifact_type: Optional artifact type for OCI 1.1 artifacts. + subject: Optional subject descriptor for OCI 1.1 referrers. + annotations: Optional arbitrary metadata. + """ + + config: Descriptor + layers: list[Descriptor] = field(default_factory=list) + artifact_type: str | None = None + subject: Descriptor | None = None + annotations: dict[str, str] | None = None + + def to_dict(self) -> dict[str, Any]: + """Convert to a JSON-serializable dictionary.""" + result: dict[str, Any] = { + "schemaVersion": 2, + "mediaType": OCI_MANIFEST_MEDIA_TYPE, + "config": self.config.to_dict(), + "layers": [layer.to_dict() for layer in self.layers], + } + if self.artifact_type: + result["artifactType"] = self.artifact_type + if self.subject: + result["subject"] = self.subject.to_dict() + if self.annotations: + result["annotations"] = self.annotations + return result + + def compute_digest(self) -> str: + """Calculate the sha256 digest of this manifest.""" + content = json.dumps(self.to_dict(), separators=(",", ":")).encode() + return f"sha256:{hashlib.sha256(content).hexdigest()}" + + +@dataclass +class ImageReference: + """Parsed OCI image reference. + + Format: registry/repository:tag or registry/repository@sha256:digest + """ + + registry: str + repository: str + tag: str | None + digest: str | None + + @classmethod + def parse(cls, reference: str) -> ImageReference: + """Parse an image reference string.""" + if "/" not in reference: + raise ValueError(f"Invalid reference '{reference}': missing /") + + digest = None + if "@" in reference: + reference, digest = reference.rsplit("@", 1) + if not re.match(r"^sha256:[a-f0-9]{64}$", digest): + raise ValueError(f"Invalid digest format: {digest}") + + tag = None + if ":" in reference and not digest: + parts = reference.rsplit(":", 1) + if "/" not in parts[1]: + reference, tag = parts + + parts = reference.split("/", 1) + if len(parts) != 2 or not parts[1]: + raise ValueError(f"Invalid image reference '{reference}'") + + registry, repository = parts[0], parts[1] + + if not tag and not digest: + raise ValueError( + f"Image reference must have :tag or @digest: {reference}" + ) + + return cls(registry, repository, tag, digest) + + def __str__(self) -> str: + result = f"{self.registry}/{self.repository}" + if self.digest: + result += f"@{self.digest}" + elif self.tag: + result += f":{self.tag}" + return result + + @property + def reference(self) -> str: + if self.digest: + return self.digest + return self.tag or "latest" + + def with_digest(self, digest: str) -> ImageReference: + return replace(self, tag=None, digest=digest) + + def with_tag(self, tag: str) -> ImageReference: + return replace(self, tag=tag, digest=None) + + +class OrasClient: + """OCI registry client using oras-py for authentication.""" + + def __init__(self, *, insecure: bool = False, tls_verify: bool = True): + self._insecure = insecure + self._tls_verify = tls_verify + self._registry_cache: dict[str, oras.provider.Registry] = {} + + def _auth_registry( + self, image_ref: ImageReference + ) -> oras.provider.Registry: + """Get an authenticated oras Registry instance. + + Caches authenticated registries by hostname to avoid repeated + authentication overhead when performing multiple operations. + """ + hostname = image_ref.registry + if hostname in self._registry_cache: + return self._registry_cache[hostname] + + reg = oras.provider.Registry( + hostname=hostname, + insecure=self._insecure, + tls_verify=self._tls_verify, + ) + reg.auth.load_configs(reg.get_container(str(image_ref))) + return reg + + def _base_url(self, image_ref: ImageReference) -> str: + """Get the base URL for a registry.""" + registry = image_ref.registry + if registry in ("docker.io", "index.docker.io"): + registry = "registry-1.docker.io" + return f"{'http' if self._insecure else 'https'}://{registry}" + + def get_manifest( + self, image_ref: ImageReference + ) -> tuple[dict[str, Any], str]: + """Get a manifest from the registry.""" + reg = self._auth_registry(image_ref) + manifest = reg.get_manifest(str(image_ref)) + manifest_bytes = json.dumps(manifest, separators=(",", ":")).encode() + digest = f"sha256:{hashlib.sha256(manifest_bytes).hexdigest()}" + return manifest, digest + + def resolve_digest(self, image_ref: ImageReference) -> str: + """Resolve an image reference to its digest.""" + if image_ref.digest: + return image_ref.digest + _, digest = self.get_manifest(image_ref) + return digest + + def push_blob( + self, image_ref: ImageReference, blob_bytes: bytes, media_type: str + ) -> str: + """Push a blob to the registry.""" + digest = f"sha256:{hashlib.sha256(blob_bytes).hexdigest()}" + base_url = self._base_url(image_ref) + reg = self._auth_registry(image_ref) + + check_url = f"{base_url}/v2/{image_ref.repository}/blobs/{digest}" + try: + if reg.do_request(check_url, "HEAD").status_code == 200: + return digest + except requests.HTTPError: + pass + + upload_url = f"{base_url}/v2/{image_ref.repository}/blobs/uploads/" + reg = self._auth_registry(image_ref) + response = reg.do_request(upload_url, "POST") + location = response.headers.get("Location") + if not location: + raise ValueError("Registry did not return upload location") + if location.startswith("/"): + location = f"{base_url}{location}" + sep = "&" if "?" in location else "?" + location = f"{location}{sep}digest={digest}" + + headers = {"Content-Type": media_type} + reg.do_request(location, "PUT", data=blob_bytes, headers=headers) + return digest + + def push_manifest( + self, + image_ref: ImageReference, + manifest: dict[str, Any] | OCIManifest, + media_type: str = OCI_MANIFEST_MEDIA_TYPE, + ) -> str: + """Push a manifest to the registry.""" + if isinstance(manifest, OCIManifest): + manifest = manifest.to_dict() + manifest_bytes = json.dumps(manifest, separators=(",", ":")).encode() + digest = f"sha256:{hashlib.sha256(manifest_bytes).hexdigest()}" + base = self._base_url(image_ref) + repo = image_ref.repository + url = f"{base}/v2/{repo}/manifests/{image_ref.reference}" + headers = {"Content-Type": media_type} + self._auth_registry(image_ref).do_request( + url, "PUT", data=manifest_bytes, headers=headers + ) + return digest + + def push_signature( + self, + image_ref: ImageReference, + signature_bytes: bytes, + subject_digest: str, + subject_size: int, + ) -> str: + """Push a signature using OCI 1.1 Referrers API.""" + layer_digest = self.push_blob( + image_ref, signature_bytes, MODEL_SIGNING_LAYER_MEDIA_TYPE + ) + + config_bytes = b"{}" + config_digest = self.push_blob( + image_ref, config_bytes, MODEL_SIGNING_CONFIG_MEDIA_TYPE + ) + + manifest = OCIManifest( + artifact_type=MODEL_SIGNING_ARTIFACT_TYPE, + config=Descriptor( + media_type=MODEL_SIGNING_CONFIG_MEDIA_TYPE, + digest=config_digest, + size=len(config_bytes), + ), + layers=[ + Descriptor( + media_type=MODEL_SIGNING_LAYER_MEDIA_TYPE, + digest=layer_digest, + size=len(signature_bytes), + ) + ], + subject=Descriptor( + media_type=OCI_MANIFEST_MEDIA_TYPE, + digest=subject_digest, + size=subject_size, + ), + ) + + return self.push_manifest( + image_ref.with_digest(manifest.compute_digest()), manifest + ) + + def push_signature_tag( + self, image_ref: ImageReference, signature_bytes: bytes, tag: str + ) -> str: + """Push a signature with a specific tag.""" + layer_digest = self.push_blob( + image_ref, signature_bytes, MODEL_SIGNING_LAYER_MEDIA_TYPE + ) + + config_bytes = b"{}" + config_digest = self.push_blob( + image_ref, config_bytes, OCI_CONFIG_MEDIA_TYPE + ) + + manifest = OCIManifest( + config=Descriptor( + media_type=OCI_CONFIG_MEDIA_TYPE, + digest=config_digest, + size=len(config_bytes), + ), + layers=[ + Descriptor( + media_type=MODEL_SIGNING_LAYER_MEDIA_TYPE, + digest=layer_digest, + size=len(signature_bytes), + ) + ], + annotations={ + "dev.sigstore.model-signing.artifact-type": ( + MODEL_SIGNING_ARTIFACT_TYPE + ) + }, + ) + + return self.push_manifest(image_ref.with_tag(tag), manifest) + + def get_referrers( + self, image_ref: ImageReference, artifact_type: str | None = None + ) -> list[dict[str, Any]]: + """Get referrers for an image (OCI 1.1).""" + digest = image_ref.digest or self.resolve_digest(image_ref) + base = self._base_url(image_ref) + url = f"{base}/v2/{image_ref.repository}/referrers/{digest}" + try: + response = self._auth_registry(image_ref).do_request( + url, "GET", headers={"Accept": OCI_INDEX_MEDIA_TYPE} + ) + if response.status_code != 200: + return [] + manifests = response.json().get("manifests", []) + if artifact_type: + manifests = [ + m + for m in manifests + if m.get("artifactType") == artifact_type + ] + return manifests + except requests.HTTPError as e: + if e.response is not None and e.response.status_code == 404: + return [] + raise + + def pull_blob(self, image_ref: ImageReference, digest: str) -> bytes: + """Pull a blob from the registry.""" + reg = self._auth_registry(image_ref) + return reg.get_blob(str(image_ref), digest).content diff --git a/src/model_signing/_signing/sign_sigstore.py b/src/model_signing/_signing/sign_sigstore.py index df2390b7..e1e6d359 100644 --- a/src/model_signing/_signing/sign_sigstore.py +++ b/src/model_signing/_signing/sign_sigstore.py @@ -128,7 +128,7 @@ def __init__( if not oidc_issuer: oidc_issuer = trust_config.signing_config.get_oidc_url() - self._issuer = sigstore_oidc.Issuer(oidc_issuer) + self._oidc_issuer = oidc_issuer self._signing_context = ( sigstore_signer.SigningContext.from_trust_config(trust_config) ) @@ -155,7 +155,8 @@ def _get_identity_token(self) -> sigstore_oidc.IdentityToken: if token: return sigstore_oidc.IdentityToken(token, self._client_id) - return self._issuer.identity_token( + issuer = sigstore_oidc.Issuer(self._oidc_issuer) + return issuer.identity_token( force_oob=self._force_oob, client_id=self._client_id, client_secret=self._client_secret, diff --git a/src/model_signing/hashing.py b/src/model_signing/hashing.py index b5cdc5b6..b908a9cc 100644 --- a/src/model_signing/hashing.py +++ b/src/model_signing/hashing.py @@ -128,36 +128,52 @@ def create_manifest_from_oci_layers( model_name: str | None = None, include_config: bool = True, ) -> manifest.Manifest: - """Create a manifest from an OCI image manifest. + """Convert an OCI image manifest into a model signing manifest. - This function extracts layer digests from an OCI image manifest (as returned - by `skopeo inspect --raw`) and creates a model signing manifest. Each layer - is treated as a file entry in the manifest. + This function takes an OCI image manifest (the registry artifact descriptor + containing layer references) and converts it into a model signing manifest + (our internal representation of file paths and their digests). + + For ORAS-style artifacts, file paths are extracted from the + `org.opencontainers.image.title` annotation on each layer. For standard + OCI images, layers are named generically as `layer_000.tar.gz`, etc. Args: oci_manifest: The OCI image manifest as a dictionary (from JSON). - Expected to have "layers" array with "digest" fields, and optionally - a "config" field with a "digest". + This is the artifact manifest from the registry containing layer + descriptors with digests. Expected to have "layers" array with + "digest" fields, and optionally a "config" field. model_name: Optional name for the model. If not provided, will attempt to extract from annotations or use "oci-image". include_config: Whether to include the config blob digest as a file entry. Default is True. Returns: - A Manifest object ready for signing. + A model signing Manifest containing file/layer paths mapped to their + SHA256 digests, ready for signing or comparison. Raises: - ValueError: If the OCI manifest structure is invalid or missing required - fields. + ValueError: If the OCI image manifest structure is invalid or missing + required fields. """ if "layers" not in oci_manifest: raise ValueError("OCI manifest missing 'layers' field") manifest_items = [] + # Collect layer paths first to detect conflicts with config + layer_paths = set() + for layer in oci_manifest["layers"]: + if "annotations" in layer: + title = layer["annotations"].get("org.opencontainers.image.title") + if title: + layer_paths.add(title) + + # Only include OCI config if it won't conflict with a layer named + # config.json (ORAS artifacts have placeholder config, files are in layers) if include_config and "config" in oci_manifest: config = oci_manifest["config"] - if "digest" in config: + if "digest" in config and "config.json" not in layer_paths: config_digest = parse_digest_string(config["digest"]) config_path = pathlib.PurePosixPath("config.json") manifest_items.append( diff --git a/src/model_signing/signing.py b/src/model_signing/signing.py index f33f50c2..9aa5ad1f 100644 --- a/src/model_signing/signing.py +++ b/src/model_signing/signing.py @@ -39,15 +39,71 @@ signing_config.sign(model, f"{model}_sharded.sig") ``` +## OCI Image Signing + +The module supports signing OCI container images directly in registries. + +**Note:** OCI image signing currently supports Sigstore and elliptic key signing +only. Certificate-based and PKCS#11 signing are not yet supported for images. + +```python +# Sign an image with Sigstore (opens OIDC browser flow) +sig_digest = ( + model_signing.signing.Config() + .use_sigstore_signer() + .sign_image("quay.io/user/model:latest") +) + +# Sign with a private key +sig_digest = ( + model_signing.signing.Config() + .use_elliptic_key_signer(private_key="key.pem") + .sign_image("quay.io/user/model:latest") +) + +# Use tag-based attachment for registries without OCI 1.1 Referrers API +sig_digest = ( + model_signing.signing.Config() + .use_sigstore_signer() + .sign_image("quay.io/user/model:latest", attachment_mode="tag") +) + +# Write signature to file instead of attaching to registry +model_signing.signing.Config().use_sigstore_signer().sign_image( + "quay.io/user/model:latest", + signature_path=pathlib.Path("model.sig"), + attach=False, +) + +# Attach to registry AND write signature to file +sig_digest = ( + model_signing.signing.Config() + .use_sigstore_signer() + .sign_image( + "quay.io/user/model:latest", + signature_path=pathlib.Path("model.sig"), + attach=True, + ) +) +``` + +Registry authentication uses existing Docker/Podman credentials from +`~/.docker/config.json` or `${XDG_RUNTIME_DIR}/containers/auth.json`. + The API defined here is stable and backwards compatible. """ from collections.abc import Iterable +import json import pathlib import sys +import requests + from model_signing import hashing from model_signing import manifest +from model_signing._oci import attachment as oci_attachment +from model_signing._oci import registry as oci_registry from model_signing._signing import sign_certificate as certificate from model_signing._signing import sign_ec_key as ec_key from model_signing._signing import sign_sigstore as sigstore @@ -131,6 +187,158 @@ def sign_from_manifest( signature = self._signer.sign(payload) signature.write(pathlib.Path(signature_path)) + def sign_image( + self, + image_ref: str | oci_registry.ImageReference, + attachment_mode: str = "referrers", + signature_path: pathlib.Path | None = None, + attach: bool = True, + ) -> str | None: + """Sign an OCI image with flexible output options. + + Signing performs the following steps: + + 1. Fetch the OCI image manifest from the registry (the artifact + descriptor containing layer references and digests) + 2. Convert it into a model signing manifest (our internal format + mapping file paths to their SHA256 digests) + 3. Sign the model signing manifest, producing a signature bundle + 4. Optionally write the signature bundle to disk + 5. Optionally attach the signature bundle to the registry + + Note: + OCI image signing currently supports Sigstore and elliptic key + signing only. Use `use_sigstore_signer()` or + `use_elliptic_key_signer()` before calling this method. + Certificate-based and PKCS#11 signing are not yet supported. + + Args: + image_ref: OCI image reference as a string (e.g., + "quay.io/user/model:latest") or a parsed ImageReference object. + attachment_mode: How to attach the signature to the registry. + - "referrers" (default): Uses OCI 1.1 Referrers API. Falls back + to tag-based if the registry doesn't support OCI 1.1 artifacts. + - "tag": Uses tag-based attachment (sha256-DIGEST.sig) + signature_path: Optional path to write the signature bundle to disk. + If provided, the signature will be written to this file. + attach: Whether to attach the signature to the registry. Default is + True. If False, signature_path must be provided. + + Returns: + The digest of the attached signature artifact if attach=True, + otherwise None. + + Raises: + ValueError: If the image reference is invalid, attachment fails, + or attach=False without signature_path. + """ + if not self._signer: + raise ValueError( + "No signer configured. Call use_sigstore_signer(), " + "use_elliptic_key_signer(), or another signer method first." + ) + + if not attach and signature_path is None: + raise ValueError( + "Must specify signature_path when attach=False. " + "Either set attach=True to attach to registry, " + "or provide signature_path to write to disk." + ) + + if isinstance(image_ref, oci_registry.ImageReference): + parsed_ref = image_ref + else: + try: + parsed_ref = oci_registry.ImageReference.parse(image_ref) + except Exception as e: + raise ValueError( + f"Invalid image reference '{image_ref}': {e}" + ) from e + + client = oci_registry.OrasClient() + + try: + oci_manifest, image_digest = client.get_manifest(parsed_ref) + except requests.HTTPError as e: + if e.response is not None and e.response.status_code == 401: + raise ValueError( + f"Authentication failed for image '{image_ref}'. " + "Check your registry credentials in ~/.docker/config.json " + "or ${XDG_RUNTIME_DIR}/containers/auth.json." + ) from e + elif e.response is not None and e.response.status_code == 404: + raise ValueError( + f"Image not found: '{image_ref}'. " + "Verify the image exists and you have access." + ) from e + raise ValueError( + f"Failed to fetch manifest for '{image_ref}': {e}" + ) from e + + manifest_size = len(json.dumps(oci_manifest, separators=(",", ":"))) + + model_manifest = hashing.create_manifest_from_oci_layers( + oci_manifest, model_name=str(parsed_ref) + ) + + payload = signing.Payload(model_manifest) + signature = self._signer.sign(payload) + + signature_bytes = signature.bundle.to_json().encode("utf-8") + + if signature_path is not None: + signature_path.parent.mkdir(parents=True, exist_ok=True) + signature_path.write_bytes(signature_bytes) + + if not attach: + return None + + match attachment_mode.lower(): + case "referrers": + mode = oci_attachment.AttachmentMode.REFERRERS + case "tag": + mode = oci_attachment.AttachmentMode.TAG + case _: + raise ValueError( + f"Invalid attachment mode '{attachment_mode}'. " + "Must be 'referrers' or 'tag'." + ) + + strategy = oci_attachment.get_attachment_strategy(mode) + + try: + sig_digest = strategy.attach( + client, parsed_ref, signature_bytes, image_digest, manifest_size + ) + except requests.HTTPError as e: + if ( + mode == oci_attachment.AttachmentMode.REFERRERS + and e.response is not None + and e.response.status_code == 400 + ): + # Registry doesn't support OCI 1.1 artifacts, fall back to tags + fallback = oci_attachment.get_attachment_strategy( + oci_attachment.AttachmentMode.TAG + ) + sig_digest = fallback.attach( + client, + parsed_ref, + signature_bytes, + image_digest, + manifest_size, + ) + elif e.response is not None and e.response.status_code == 401: + raise ValueError( + f"Authentication failed when attaching signature to " + f"'{image_ref}'. Check your registry credentials." + ) from e + else: + raise ValueError( + f"Failed to attach signature to '{image_ref}': {e}" + ) from e + + return sig_digest + def set_hashing_config(self, hashing_config: hashing.Config) -> Self: """Sets the new configuration for hashing models. diff --git a/src/model_signing/verifying.py b/src/model_signing/verifying.py index 45df5a75..ff7470ec 100644 --- a/src/model_signing/verifying.py +++ b/src/model_signing/verifying.py @@ -26,7 +26,7 @@ The same verification configuration can be used to verify multiple models: ```python -verifying_config = model_signing.signing.Config().use_elliptic_key_verifier( +verifying_config = model_signing.verifying.Config().use_elliptic_key_verifier( public_key="key.pub" ) @@ -34,16 +34,49 @@ verifying_config.verify(model, f"{model}_sharded.sig") ``` +## OCI Image Verification + +The module supports verifying OCI container images signed in registries. + +**Note:** OCI image verification currently supports Sigstore and elliptic key +verification only. Certificate-based verification is not yet supported. + +```python +# Verify a Sigstore-signed image +model_signing.verifying.Config().use_sigstore_verifier( + identity="user@example.com", oidc_issuer="https://accounts.google.com" +).verify_image("quay.io/user/model:latest") + +# Verify a key-signed image +model_signing.verifying.Config().use_elliptic_key_verifier( + public_key="key.pub" +).verify_image("quay.io/user/model:latest") + +# Verify image AND check that local files match the signed layers +model_signing.verifying.Config().use_sigstore_verifier( + identity="user@example.com", oidc_issuer="https://accounts.google.com" +).verify_image( + "quay.io/user/model:latest", local_model_path="./downloaded-model" +) +``` + +Registry authentication uses existing Docker/Podman credentials from +`~/.docker/config.json` or `${XDG_RUNTIME_DIR}/containers/auth.json`. + The API defined here is stable and backwards compatible. """ from collections.abc import Iterable +import hashlib +import json import pathlib import sys from model_signing import hashing from model_signing import manifest from model_signing._hashing import hashing as _hashing +from model_signing._oci import attachment as oci_attachment +from model_signing._oci import registry as oci_registry from model_signing._signing import sign_certificate as certificate from model_signing._signing import sign_ec_key as ec_key from model_signing._signing import sign_sigstore as sigstore @@ -56,6 +89,50 @@ from typing_extensions import Self +def _format_verification_error( + missing: list[str], extra: list[str], mismatched: list[tuple[str, str, str]] +) -> str: + """Format verification errors into a readable message. + + Args: + missing: List of missing file paths. + extra: List of extra file paths not in signature. + mismatched: List of (path, expected_hash, actual_hash) tuples. + + Returns: + Formatted error message. + """ + sections = [] + + if missing: + items = [f" {f}" for f in missing[:5]] + if len(missing) > 5: + items.append(f" ... and {len(missing) - 5} more") + header = f" Missing files ({len(missing)}):" + sections.append(header + "\n" + "\n".join(items)) + + if extra: + items = [f" {f}" for f in extra[:5]] + if len(extra) > 5: + items.append(f" ... and {len(extra) - 5} more") + header = f" Extra files ({len(extra)}):" + sections.append(header + "\n" + "\n".join(items)) + + if mismatched: + items = [] + for path, expected, actual in mismatched[:5]: + exp = expected[:16] + "..." if len(expected) > 16 else expected + act = actual[:16] + "..." if len(actual) > 16 else actual + items.append(f" {path}: expected {exp}, got {act}") + if len(mismatched) > 5: + items.append(f" ... and {len(mismatched) - 5} more") + sections.append( + f" Hash mismatches ({len(mismatched)}):\n" + "\n".join(items) + ) + + return "\n".join(sections) + + class Config: """Configuration to use when verifying models against signatures. @@ -123,28 +200,34 @@ def verify( ) if actual_manifest != expected_manifest: - diff_message = self._get_manifest_diff( - actual_manifest, expected_manifest + raise ValueError( + self._get_manifest_diff(actual_manifest, expected_manifest) ) - raise ValueError(f"Signature mismatch: {diff_message}") + + _GIT_PATHS = frozenset([".git", ".gitattributes", ".github", ".gitignore"]) def _verify_oci_layers_from_files( - self, model_path: hashing.PathLike, expected_manifest: manifest.Manifest + self, + model_path: hashing.PathLike, + expected_manifest: manifest.Manifest, + ignore_git_paths: bool = True, ): - """Verify OCI layer-based signature against local files. + """Verify local files match the signed model signing manifest. - This verifies by matching file paths from the signature with local - files. If the signature was created from an OCI manifest with file - path annotations (e.g., org.opencontainers.image.title), it matches - files by path and compares their digests. + This compares local files against the model signing manifest extracted + from the signature bundle. For ORAS-style artifacts where layers have + file path annotations (org.opencontainers.image.title), it matches + files by path and compares their SHA256 digests. Args: - model_path: Path to local model directory - expected_manifest: Manifest extracted from signature (contains - layer digests) + model_path: Path to local model directory containing files to verify + expected_manifest: The model signing manifest extracted from the + signature bundle, containing expected file paths and digests + ignore_git_paths: Whether to ignore git-related files when checking + for extra files (default True) Raises: - ValueError: If local files don't match the OCI layer digests + ValueError: If local files don't match the expected digests """ model_path = pathlib.Path(model_path) @@ -155,8 +238,6 @@ def _verify_oci_layers_from_files( for rd in expected_manifest.resource_descriptors(): identifier = str(rd.identifier) - if identifier == "config.json": - continue is_generic_layer = identifier.startswith( "layer_" ) and identifier.endswith(".tar.gz") @@ -165,9 +246,8 @@ def _verify_oci_layers_from_files( expected_file_digests[identifier] = rd.digest if has_file_paths: - # ORAS-style: verify by matching individual files by path return self._verify_oci_files_by_path( - model_path, expected_file_digests + model_path, expected_file_digests, ignore_git_paths ) else: print( @@ -178,20 +258,30 @@ def _verify_oci_layers_from_files( ) sys.exit(1) + def _is_git_path(self, rel_path_str: str) -> bool: + """Check if a path is git-related.""" + parts = pathlib.PurePosixPath(rel_path_str).parts + return any(p in self._GIT_PATHS or p.startswith(".git") for p in parts) + def _verify_oci_files_by_path( self, model_path: pathlib.Path, expected_file_digests: dict[str, _hashing.Digest], + ignore_git_paths: bool = True, ): - """Verify OCI files by matching paths and computing file digests.""" - import hashlib - + """Verify local files match expected digests from signature bundle.""" missing_files = [] mismatched_files = [] + extra_files = [] verified_files = [] - for file_path_str, expected_digest in expected_file_digests.items(): - local_file_path = model_path / file_path_str + normalized_digests = { + p.replace("\\", "/"): d for p, d in expected_file_digests.items() + } + + for file_path_str, expected_digest in normalized_digests.items(): + path_parts = pathlib.PurePosixPath(file_path_str).parts + local_file_path = model_path.joinpath(*path_parts) if not local_file_path.exists(): missing_files.append(file_path_str) @@ -216,98 +306,181 @@ def _verify_oci_files_by_path( ) ) - if missing_files or mismatched_files: - error_parts = [] - if missing_files: - missing_list = ", ".join(missing_files[:5]) - more_text = ( - f" ... and {len(missing_files) - 5} more" - if len(missing_files) > 5 - else "" - ) - error_parts.append( - f"Missing files ({len(missing_files)}): " - f"{missing_list}{more_text}" - ) - if mismatched_files: - mismatches = [] - for path, expected, actual in mismatched_files[:3]: - mismatches.append( - f" {path}: expected {expected[:16]}..., " - f"got {actual[:16]}..." - ) - mismatch_text = "\n".join(mismatches) - more_mismatches = ( - f"\n ... and {len(mismatched_files) - 3} more" - if len(mismatched_files) > 3 - else "" - ) - error_parts.append( - f"Hash mismatches ({len(mismatched_files)}):\n" - f"{mismatch_text}{more_mismatches}" - ) + expected_paths = set(normalized_digests.keys()) + for local_file in model_path.rglob("*"): + if not local_file.is_file(): + continue + rel_path = local_file.relative_to(model_path) + rel_path_str = str(rel_path).replace("\\", "/") - error_msg = ( - "Verification failed:\n" - + "\n".join(error_parts) - + "\n\n" - + f"Successfully verified {len(verified_files)} file(s)." - ) - raise ValueError(error_msg) + if ignore_git_paths and self._is_git_path(rel_path_str): + continue - return + if rel_path_str not in expected_paths: + extra_files.append(rel_path_str) - def verify_from_oci_manifest( - self, - oci_manifest: dict, - signature_path: hashing.PathLike, - *, - include_config: bool = True, - ): - """Verifies that an OCI image manifest conforms to a signature. + if missing_files or mismatched_files or extra_files: + raise ValueError( + _format_verification_error( + missing=missing_files, + extra=sorted(extra_files), + mismatched=mismatched_files, + ) + ) - This method verifies a signature against an OCI image manifest without - requiring the actual model files. It extracts the expected manifest from - the signature and compares it with a manifest created from the OCI image - manifest. + def verify_image( + self, + image_ref: str | oci_registry.ImageReference, + local_model_path: hashing.PathLike | None = None, + attachment_mode: str | None = None, + ignore_git_paths: bool = True, + ) -> None: + """Verify an OCI image signature from the registry. + + Verification performs the following steps: + + 1. Fetch the signature bundle from the registry (attached to the image + via tag or referrers API) + 2. Cryptographically verify the signature bundle and extract the + expected model signing manifest (list of file/layer digests) + 3. Fetch the OCI image manifest from the registry (the actual artifact) + 4. Convert the OCI image manifest layers into a model signing manifest + 5. Compare the expected vs actual model signing manifests + 6. Optionally verify local files match the signed digests + + Note: + OCI image verification currently supports Sigstore and elliptic key + verification only. Use `use_sigstore_verifier()` or + `use_elliptic_key_verifier()` before calling this method. + Certificate-based verification is not yet supported for images. Args: - oci_manifest: The OCI image manifest as a dictionary (from JSON). - Expected to have "layers" array with "digest" fields, - and optionally a "config" field with a "digest". - signature_path: The path to the signature file. - include_config: Whether to include the config blob digest in the - comparison. Should match the value used during signing. - Default is True. + image_ref: OCI image reference as a string (e.g., + "quay.io/user/model:latest") or a parsed ImageReference object. + local_model_path: Optional path to local model files. If provided, + verification will also check that local files match the signed + layer digests (for ORAS-style images with file path annotations). + attachment_mode: Optional attachment mode to use for fetching the + signature. If None (default), tries both referrers and tag-based. + Use "tag" to force tag-based fetching when multiple signatures + exist (e.g., when verifying key-based signatures alongside + Sigstore signatures). + ignore_git_paths: Whether to ignore git-related files (.git/, + .gitattributes, .gitignore, .github/) when checking for extra + files in local_model_path. Default is True. Raises: - ValueError: No verifier has been configured, - the OCI manifest is invalid, or verification - fails. + ValueError: If no verifier configured, signature not found, or + verification fails. """ if self._verifier is None: raise ValueError("Attempting to verify with no configured verifier") + if isinstance(image_ref, oci_registry.ImageReference): + parsed_ref = image_ref + else: + parsed_ref = oci_registry.ImageReference.parse(image_ref) + + client = oci_registry.OrasClient() + + image_digest = client.resolve_digest(parsed_ref) + sig_type = "sigstore" if self._uses_sigstore else "key" + + if attachment_mode == "tag": + tag_strategy = oci_attachment.TagAttachment() + signature_bytes = tag_strategy.fetch( + client, parsed_ref, image_digest, sig_type + ) + if signature_bytes is None: + raise ValueError( + f"No tag-based signature found for image {image_ref}. " + "Ensure the image was signed with --attachment-mode tag." + ) + elif attachment_mode == "referrers": + ref_strategy = oci_attachment.ReferrersAttachment() + signature_bytes = ref_strategy.fetch( + client, parsed_ref, image_digest, sig_type + ) + if signature_bytes is None: + raise ValueError( + f"No referrers-based signature for image {image_ref}. " + "Ensure the image was signed with referrers attachment." + ) + else: + result = oci_attachment.try_fetch_signature( + client, parsed_ref, image_digest, sig_type + ) + if result is None: + raise ValueError( + f"No signature found for image {image_ref}. " + "Ensure the image has been signed and the signature is " + "attached to the registry." + ) + signature_bytes, _ = result + + try: + signature_json = signature_bytes.decode("utf-8") + except UnicodeDecodeError as e: + raise ValueError( + f"Failed to decode signature for image {image_ref}: " + f"signature data is not valid UTF-8. {e}" + ) from e + if self._uses_sigstore: - signature = sigstore.Signature.read(pathlib.Path(signature_path)) + from sigstore import models as sigstore_models + + try: + bundle = sigstore_models.Bundle.from_json(signature_json) + signature = sigstore.Signature(bundle) + except json.JSONDecodeError as e: + raise ValueError( + f"Failed to parse Sigstore signature for {image_ref}: " + f"invalid JSON. {e}" + ) from e + except Exception as e: + raise ValueError( + f"Failed to decode Sigstore signature for {image_ref}: " + f"{type(e).__name__}: {e}" + ) from e else: - signature = sigstore_pb.Signature.read(pathlib.Path(signature_path)) + from sigstore_models.bundle import v1 as bundle_pb + + try: + parsed_dict = json.loads(signature_json) + signature = sigstore_pb.Signature( + bundle_pb.Bundle.from_dict(parsed_dict) + ) + except json.JSONDecodeError as e: + raise ValueError( + f"Failed to parse signature for image {image_ref}: " + f"invalid JSON. {e}" + ) from e + except Exception as e: + raise ValueError( + f"Failed to decode signature for image {image_ref}: " + f"{type(e).__name__}: {e}" + ) from e expected_manifest = self._verifier.verify(signature) + ref_with_digest = parsed_ref.with_digest(image_digest) + oci_manifest, _ = client.get_manifest(ref_with_digest) + actual_manifest = hashing.create_manifest_from_oci_layers( - oci_manifest, include_config=include_config + oci_manifest, model_name=str(parsed_ref) ) if actual_manifest != expected_manifest: - diff_message = self._get_manifest_diff( - actual_manifest, expected_manifest + raise ValueError( + self._get_manifest_diff(actual_manifest, expected_manifest) ) - raise ValueError(f"Signature mismatch: {diff_message}") - def _get_manifest_diff(self, actual, expected) -> list[str]: - diffs = [] + if local_model_path is not None: + self._verify_oci_layers_from_files( + local_model_path, expected_manifest, ignore_git_paths + ) + def _get_manifest_diff(self, actual, expected) -> str: actual_hashes = { rd.identifier: rd.digest for rd in actual.resource_descriptors() } @@ -315,33 +488,29 @@ def _get_manifest_diff(self, actual, expected) -> list[str]: rd.identifier: rd.digest for rd in expected.resource_descriptors() } - extra_actual_files = set(actual_hashes.keys()) - set( - expected_hashes.keys() + extra = sorted(set(actual_hashes.keys()) - set(expected_hashes.keys())) + missing = sorted( + set(expected_hashes.keys()) - set(actual_hashes.keys()) ) - if extra_actual_files: - diffs.append( - f"Extra files found in model '{actual.model_name}': " - f"{', '.join(sorted(extra_actual_files))}" - ) - - missing_actual_files = set(expected_hashes.keys()) - set( - actual_hashes.keys() - ) - if missing_actual_files: - diffs.append( - f"Missing files in model '{actual.model_name}': " - f"{', '.join(sorted(missing_actual_files))}" - ) - common_files = set(actual_hashes.keys()) & set(expected_hashes.keys()) - for identifier in sorted(common_files): + mismatched = [] + for identifier in sorted( + set(actual_hashes.keys()) & set(expected_hashes.keys()) + ): if actual_hashes[identifier] != expected_hashes[identifier]: - diffs.append( - f"Hash mismatch for '{identifier}': " - f"Expected '{expected_hashes[identifier]}', " - f"Actual '{actual_hashes[identifier]}'" + mismatched.append( + ( + str(identifier), + str(expected_hashes[identifier]), + str(actual_hashes[identifier]), + ) ) - return diffs + + return _format_verification_error( + missing=[str(m) for m in missing], + extra=[str(e) for e in extra], + mismatched=mismatched, + ) def set_hashing_config(self, hashing_config: hashing.Config) -> Self: """Sets the new configuration for hashing models. diff --git a/tests/_oci/__init__.py b/tests/_oci/__init__.py new file mode 100644 index 00000000..17d5d0de --- /dev/null +++ b/tests/_oci/__init__.py @@ -0,0 +1,14 @@ +# Copyright 2025 The Sigstore Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + diff --git a/tests/_oci/attachment_test.py b/tests/_oci/attachment_test.py new file mode 100644 index 00000000..352f4d28 --- /dev/null +++ b/tests/_oci/attachment_test.py @@ -0,0 +1,442 @@ +# Copyright 2025 The Sigstore Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Tests for OCI signature attachment strategies.""" + +import hashlib +import json + +from model_signing._oci import attachment +from model_signing._oci import registry + + +class TestGetAttachmentStrategy: + def test_get_referrers_strategy(self): + strategy = attachment.get_attachment_strategy( + attachment.AttachmentMode.REFERRERS + ) + assert isinstance(strategy, attachment.ReferrersAttachment) + + def test_get_tag_strategy(self): + strategy = attachment.get_attachment_strategy( + attachment.AttachmentMode.TAG + ) + assert isinstance(strategy, attachment.TagAttachment) + + +class TestTagAttachment: + def test_digest_to_tag(self): + strategy = attachment.TagAttachment() + tag = strategy._digest_to_tag("sha256:abc123def456") + assert tag == "sha256-abc123def456.sig" + + +class MockOrasClient(registry.OrasClient): + def __init__(self): + self.blobs: dict[str, bytes] = {} + self.manifests: dict[str, dict] = {} + self.referrers: dict[str, list[dict]] = {} + self.unavailable_blobs: set[str] = set() + + def push_signature( + self, + image_ref: registry.ImageReference, + signature_bytes: bytes, + subject_digest: str, + subject_size: int, + ) -> str: + sig_digest = f"sha256:{hashlib.sha256(signature_bytes).hexdigest()}" + self.blobs[sig_digest] = signature_bytes + if subject_digest not in self.referrers: + self.referrers[subject_digest] = [] + self.referrers[subject_digest].append( + { + "digest": sig_digest, + "artifactType": registry.MODEL_SIGNING_ARTIFACT_TYPE, + } + ) + manifest = { + "layers": [{"digest": sig_digest}], + "subject": {"digest": subject_digest, "size": subject_size}, + } + self.manifests[sig_digest] = manifest + return sig_digest + + def push_signature_tag( + self, + image_ref: registry.ImageReference, + signature_bytes: bytes, + tag: str, + ) -> str: + sig_digest = f"sha256:{hashlib.sha256(signature_bytes).hexdigest()}" + self.blobs[sig_digest] = signature_bytes + manifest = {"layers": [{"digest": sig_digest}]} + self.manifests[tag] = manifest + return sig_digest + + def get_manifest( + self, image_ref: registry.ImageReference + ) -> tuple[dict, str]: + key = image_ref.tag if image_ref.tag else image_ref.digest + if key in self.manifests: + manifest = self.manifests[key] + content = json.dumps(manifest, separators=(",", ":")).encode() + digest = f"sha256:{hashlib.sha256(content).hexdigest()}" + return manifest, digest + raise Exception(f"Manifest not found: {key}") + + def pull_blob( + self, image_ref: registry.ImageReference, digest: str + ) -> bytes: + if digest in self.unavailable_blobs: + raise Exception(f"Blob not available: {digest}") + if digest in self.blobs: + return self.blobs[digest] + raise Exception(f"Blob not found: {digest}") + + def get_referrers( + self, + image_ref: registry.ImageReference, + artifact_type: str | None = None, + ) -> list[dict]: + digest = image_ref.digest + refs = self.referrers.get(digest, []) + if artifact_type: + refs = [r for r in refs if r.get("artifactType") == artifact_type] + return refs + + +class TestReferrersAttachmentIntegration: + def test_attach_and_fetch(self): + client = MockOrasClient() + strategy = attachment.ReferrersAttachment() + image_ref = registry.ImageReference.parse("quay.io/user/model:latest") + image_digest = "sha256:imageabc123" + signature_bundle = b'{"verificationMaterial": {"certificate": "x"}}' + + sig_digest = strategy.attach( + client, image_ref, signature_bundle, image_digest + ) + assert sig_digest.startswith("sha256:") + assert image_digest in client.referrers + assert len(client.referrers[image_digest]) == 1 + + fetched = strategy.fetch(client, image_ref, image_digest) + assert fetched == signature_bundle + + +class TestTagAttachmentIntegration: + def test_attach_and_fetch(self): + client = MockOrasClient() + strategy = attachment.TagAttachment() + image_ref = registry.ImageReference.parse("quay.io/user/model:latest") + image_digest = "sha256:imageabc123" + signature_bundle = b'{"verificationMaterial": {"certificate": "x"}}' + + sig_digest = strategy.attach( + client, image_ref, signature_bundle, image_digest + ) + assert sig_digest.startswith("sha256:") + expected_tag = "sha256-imageabc123.sig" + assert expected_tag in client.manifests + + fetched = strategy.fetch(client, image_ref, image_digest) + assert fetched == signature_bundle + + def test_fetch_not_found_returns_none(self): + client = MockOrasClient() + strategy = attachment.TagAttachment() + image_ref = registry.ImageReference.parse("quay.io/user/model:latest") + + fetched = strategy.fetch(client, image_ref, "sha256:nonexistent") + assert fetched is None + + +class TestTryFetchSignature: + def test_tries_referrers_first(self): + client = MockOrasClient() + image_ref = registry.ImageReference.parse("quay.io/user/model:latest") + image_digest = "sha256:testimage" + signature_bundle = b'{"verificationMaterial": {"certificate": "x"}}' + + referrers_strategy = attachment.ReferrersAttachment() + referrers_strategy.attach( + client, image_ref, signature_bundle, image_digest + ) + + result = attachment.try_fetch_signature(client, image_ref, image_digest) + assert result is not None + sig_bytes, mode = result + assert sig_bytes == signature_bundle + assert mode == attachment.AttachmentMode.REFERRERS + + def test_falls_back_to_tag(self): + client = MockOrasClient() + image_ref = registry.ImageReference.parse("quay.io/user/model:latest") + image_digest = "sha256:testimage" + signature_bundle = b'{"verificationMaterial": {"certificate": "x"}}' + + tag_strategy = attachment.TagAttachment() + tag_strategy.attach(client, image_ref, signature_bundle, image_digest) + + result = attachment.try_fetch_signature(client, image_ref, image_digest) + assert result is not None + sig_bytes, mode = result + assert sig_bytes == signature_bundle + assert mode == attachment.AttachmentMode.TAG + + def test_returns_none_when_not_found(self): + client = MockOrasClient() + image_ref = registry.ImageReference.parse("quay.io/user/model:latest") + + result = attachment.try_fetch_signature( + client, image_ref, "sha256:nosig" + ) + assert result is None + + +class TestReferrersAttachmentWithSize: + def test_attach_with_subject_manifest_size(self): + client = MockOrasClient() + strategy = attachment.ReferrersAttachment() + image_ref = registry.ImageReference.parse("quay.io/user/model:latest") + image_digest = "sha256:imageabc123" + signature_bundle = b'{"bundle": "data"}' + manifest_size = 1234 + + sig_digest = strategy.attach( + client, + image_ref, + signature_bundle, + image_digest, + subject_manifest_size=manifest_size, + ) + assert sig_digest.startswith("sha256:") + + referrer = client.referrers[image_digest][0] + ref_digest = referrer["digest"] + manifest = client.manifests[ref_digest] + assert manifest["subject"]["size"] == manifest_size + + +class TestReferrersFetchEdgeCases: + def test_fetch_skips_referrer_without_digest(self): + client = MockOrasClient() + strategy = attachment.ReferrersAttachment() + image_ref = registry.ImageReference.parse("quay.io/user/model:latest") + image_digest = "sha256:imageabc123" + + client.referrers[image_digest] = [ + {"artifactType": registry.MODEL_SIGNING_ARTIFACT_TYPE} + ] + + result = strategy.fetch(client, image_ref, image_digest) + assert result is None + + def test_fetch_skips_manifest_without_layers(self): + client = MockOrasClient() + strategy = attachment.ReferrersAttachment() + image_ref = registry.ImageReference.parse("quay.io/user/model:latest") + image_digest = "sha256:imageabc123" + + ref_digest = "sha256:ref123" + artifact_type = registry.MODEL_SIGNING_ARTIFACT_TYPE + client.referrers[image_digest] = [ + {"digest": ref_digest, "artifactType": artifact_type} + ] + client.manifests[ref_digest] = {"layers": []} + + result = strategy.fetch(client, image_ref, image_digest) + assert result is None + + def test_fetch_skips_layer_without_digest(self): + client = MockOrasClient() + strategy = attachment.ReferrersAttachment() + image_ref = registry.ImageReference.parse("quay.io/user/model:latest") + image_digest = "sha256:imageabc123" + + ref_digest = "sha256:ref123" + artifact_type = registry.MODEL_SIGNING_ARTIFACT_TYPE + client.referrers[image_digest] = [ + {"digest": ref_digest, "artifactType": artifact_type} + ] + client.manifests[ref_digest] = {"layers": [{"mediaType": "test"}]} + + result = strategy.fetch(client, image_ref, image_digest) + assert result is None + + def test_fetch_skips_missing_blob(self): + client = MockOrasClient() + strategy = attachment.ReferrersAttachment() + image_ref = registry.ImageReference.parse("quay.io/user/model:latest") + image_digest = "sha256:imageabc123" + + ref_digest = "sha256:ref123" + layer_digest = "sha256:nonexistent" + artifact_type = registry.MODEL_SIGNING_ARTIFACT_TYPE + client.referrers[image_digest] = [ + {"digest": ref_digest, "artifactType": artifact_type} + ] + client.manifests[ref_digest] = {"layers": [{"digest": layer_digest}]} + + result = strategy.fetch(client, image_ref, image_digest) + assert result is None + + def test_fetch_skips_missing_manifest(self): + client = MockOrasClient() + strategy = attachment.ReferrersAttachment() + image_ref = registry.ImageReference.parse("quay.io/user/model:latest") + image_digest = "sha256:imageabc123" + + # Referrer points to a manifest that doesn't exist + ref_digest = "sha256:nonexistent_manifest" + artifact_type = registry.MODEL_SIGNING_ARTIFACT_TYPE + client.referrers[image_digest] = [ + {"digest": ref_digest, "artifactType": artifact_type} + ] + # Note: we don't add anything to client.manifests + + result = strategy.fetch(client, image_ref, image_digest) + assert result is None + + +class TestTagAttachmentFetchEdgeCases: + def test_fetch_returns_none_for_empty_layers(self): + client = MockOrasClient() + strategy = attachment.TagAttachment() + image_ref = registry.ImageReference.parse("quay.io/user/model:latest") + image_digest = "sha256:imageabc123" + + sig_tag = "sha256-imageabc123.sig" + client.manifests[sig_tag] = {"layers": []} + + result = strategy.fetch(client, image_ref, image_digest) + assert result is None + + def test_fetch_returns_none_for_layer_without_digest(self): + client = MockOrasClient() + strategy = attachment.TagAttachment() + image_ref = registry.ImageReference.parse("quay.io/user/model:latest") + image_digest = "sha256:imageabc123" + + sig_tag = "sha256-imageabc123.sig" + client.manifests[sig_tag] = {"layers": [{"mediaType": "test"}]} + + result = strategy.fetch(client, image_ref, image_digest) + assert result is None + + def test_fetch_returns_none_when_blob_fetch_fails(self): + client = MockOrasClient() + strategy = attachment.TagAttachment() + image_ref = registry.ImageReference.parse("quay.io/user/model:latest") + image_digest = "sha256:imageabc123" + + sig_tag = "sha256-imageabc123.sig" + layer_digest = "sha256:layerdigest" + client.manifests[sig_tag] = {"layers": [{"digest": layer_digest}]} + client.unavailable_blobs.add(layer_digest) + + result = strategy.fetch(client, image_ref, image_digest) + assert result is None + + def test_fetch_returns_none_when_blob_is_empty(self): + client = MockOrasClient() + strategy = attachment.TagAttachment() + image_ref = registry.ImageReference.parse("quay.io/user/model:latest") + image_digest = "sha256:imageabc123" + + sig_tag = "sha256-imageabc123.sig" + layer_digest = "sha256:layerdigest" + client.manifests[sig_tag] = {"layers": [{"digest": layer_digest}]} + client.blobs[layer_digest] = b"" + + result = strategy.fetch(client, image_ref, image_digest) + assert result is None + + def test_fetch_returns_none_for_wrong_signature_type(self): + client = MockOrasClient() + strategy = attachment.TagAttachment() + image_ref = registry.ImageReference.parse("quay.io/user/model:latest") + image_digest = "sha256:imageabc123" + key_signature = b'{"verificationMaterial": {"publicKey": "xyz"}}' + + strategy.attach(client, image_ref, key_signature, image_digest) + + result = strategy.fetch( + client, image_ref, image_digest, signature_type="sigstore" + ) + assert result is None + + +class TestReferrersFetchInvalidJson: + def test_fetch_skips_invalid_json_blob(self): + client = MockOrasClient() + strategy = attachment.ReferrersAttachment() + image_ref = registry.ImageReference.parse("quay.io/user/model:latest") + image_digest = "sha256:imageabc123" + + ref_digest = "sha256:ref123" + layer_digest = "sha256:layer123" + artifact_type = registry.MODEL_SIGNING_ARTIFACT_TYPE + client.referrers[image_digest] = [ + {"digest": ref_digest, "artifactType": artifact_type} + ] + client.manifests[ref_digest] = {"layers": [{"digest": layer_digest}]} + client.blobs[layer_digest] = b"not valid json {{{" + + result = strategy.fetch(client, image_ref, image_digest) + assert result is None + + def test_fetch_skips_non_utf8_blob(self): + client = MockOrasClient() + strategy = attachment.ReferrersAttachment() + image_ref = registry.ImageReference.parse("quay.io/user/model:latest") + image_digest = "sha256:imageabc123" + + ref_digest = "sha256:ref123" + layer_digest = "sha256:layer123" + artifact_type = registry.MODEL_SIGNING_ARTIFACT_TYPE + client.referrers[image_digest] = [ + {"digest": ref_digest, "artifactType": artifact_type} + ] + client.manifests[ref_digest] = {"layers": [{"digest": layer_digest}]} + client.blobs[layer_digest] = b"\xff\xfe invalid utf8" + + result = strategy.fetch(client, image_ref, image_digest) + assert result is None + + def test_fetch_continues_to_valid_after_invalid(self): + client = MockOrasClient() + strategy = attachment.ReferrersAttachment() + image_ref = registry.ImageReference.parse("quay.io/user/model:latest") + image_digest = "sha256:imageabc123" + + ref1_digest = "sha256:ref1" + layer1_digest = "sha256:layer1" + ref2_digest = "sha256:ref2" + layer2_digest = "sha256:layer2" + valid_sig = b'{"verificationMaterial": {"certificate": "x"}}' + + artifact_type = registry.MODEL_SIGNING_ARTIFACT_TYPE + client.referrers[image_digest] = [ + {"digest": ref1_digest, "artifactType": artifact_type}, + {"digest": ref2_digest, "artifactType": artifact_type}, + ] + client.manifests[ref1_digest] = {"layers": [{"digest": layer1_digest}]} + client.manifests[ref2_digest] = {"layers": [{"digest": layer2_digest}]} + client.blobs[layer1_digest] = valid_sig + client.blobs[layer2_digest] = b"invalid json" + + result = strategy.fetch(client, image_ref, image_digest) + assert result == valid_sig diff --git a/tests/_oci/registry_test.py b/tests/_oci/registry_test.py new file mode 100644 index 00000000..01151b69 --- /dev/null +++ b/tests/_oci/registry_test.py @@ -0,0 +1,399 @@ +# Copyright 2025 The Sigstore Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Tests for OCI registry client.""" + +from unittest import mock + +import pytest +import requests + +from model_signing._oci import registry +from model_signing._oci.registry import ImageReference +from model_signing._oci.registry import OrasClient + + +class TestImageReference: + def test_parse_full_reference_with_tag(self): + ref = ImageReference.parse("quay.io/user/model:latest") + assert ref.registry == "quay.io" + assert ref.repository == "user/model" + assert ref.tag == "latest" + assert ref.digest is None + + def test_parse_full_reference_with_digest(self): + digest = "sha256:" + "a" * 64 + ref = ImageReference.parse(f"ghcr.io/org/model@{digest}") + assert ref.registry == "ghcr.io" + assert ref.repository == "org/model" + assert ref.tag is None + assert ref.digest == digest + + def test_parse_nested_repository(self): + ref = ImageReference.parse("registry.example.com/org/team/model:v1.0") + assert ref.registry == "registry.example.com" + assert ref.repository == "org/team/model" + assert ref.tag == "v1.0" + + def test_parse_registry_with_port(self): + ref = ImageReference.parse("localhost:5000/mymodel:test") + assert ref.registry == "localhost:5000" + assert ref.repository == "mymodel" + assert ref.tag == "test" + + def test_parse_requires_slash(self): + with pytest.raises(ValueError, match="missing /"): + ImageReference.parse("ubuntu:latest") + + def test_parse_requires_tag_or_digest(self): + with pytest.raises(ValueError, match="must have :tag or @digest"): + ImageReference.parse("quay.io/user/model") + + def test_parse_invalid_digest_format(self): + with pytest.raises(ValueError, match="Invalid digest format"): + ImageReference.parse("quay.io/user/model@invalid-digest") + + def test_parse_invalid_short_digest(self): + with pytest.raises(ValueError, match="Invalid digest format"): + ImageReference.parse("quay.io/user/model@sha256:abc") + + def test_parse_empty_repository_raises(self): + with pytest.raises(ValueError, match="Invalid"): + ImageReference.parse("quay.io/:tag") + + def test_str_with_tag(self): + ref = ImageReference.parse("quay.io/user/model:v1") + assert str(ref) == "quay.io/user/model:v1" + + def test_str_with_digest(self): + digest = "sha256:" + "a" * 64 + ref = ImageReference.parse(f"quay.io/user/model@{digest}") + assert str(ref) == f"quay.io/user/model@{digest}" + + def test_reference_property_with_tag(self): + ref = ImageReference.parse("quay.io/user/model:v1") + assert ref.reference == "v1" + + def test_reference_property_with_digest(self): + digest = "sha256:" + "a" * 64 + ref = ImageReference.parse(f"quay.io/user/model@{digest}") + assert ref.reference == digest + + def test_with_digest(self): + ref = ImageReference.parse("quay.io/user/model:v1") + new_ref = ref.with_digest("sha256:newdigest") + assert new_ref.digest == "sha256:newdigest" + assert new_ref.tag is None + assert new_ref.registry == ref.registry + assert new_ref.repository == ref.repository + + def test_with_tag(self): + digest = "sha256:" + "a" * 64 + ref = ImageReference.parse(f"quay.io/user/model@{digest}") + new_ref = ref.with_tag("newtag") + assert new_ref.tag == "newtag" + assert new_ref.digest is None + assert new_ref.registry == ref.registry + assert new_ref.repository == ref.repository + + +class TestOrasClient: + @mock.patch("oras.provider.Registry") + def test_get_manifest(self, mock_registry_class): + mock_reg = mock.MagicMock() + mock_registry_class.return_value = mock_reg + mock_reg.get_manifest.return_value = {"schemaVersion": 2} + + client = OrasClient() + ref = ImageReference.parse("quay.io/user/model:latest") + manifest, digest = client.get_manifest(ref) + + assert manifest == {"schemaVersion": 2} + assert digest.startswith("sha256:") + mock_reg.auth.load_configs.assert_called_once() + + @mock.patch("oras.provider.Registry") + def test_resolve_digest_with_existing_digest(self, mock_registry_class): + client = OrasClient() + digest = "sha256:" + "a" * 64 + ref = ImageReference.parse(f"quay.io/user/model@{digest}") + + result = client.resolve_digest(ref) + assert result == digest + mock_registry_class.assert_not_called() + + @mock.patch("oras.provider.Registry") + def test_resolve_digest_fetches_manifest(self, mock_registry_class): + mock_reg = mock.MagicMock() + mock_registry_class.return_value = mock_reg + mock_reg.get_manifest.return_value = {"schemaVersion": 2} + + client = OrasClient() + ref = ImageReference.parse("quay.io/user/model:latest") + result = client.resolve_digest(ref) + + assert result.startswith("sha256:") + mock_reg.get_manifest.assert_called_once() + + @mock.patch("oras.provider.Registry") + def test_push_blob_already_exists(self, mock_registry_class): + mock_reg = mock.MagicMock() + mock_registry_class.return_value = mock_reg + mock_response = mock.MagicMock() + mock_response.status_code = 200 + mock_reg.do_request.return_value = mock_response + + client = OrasClient() + ref = ImageReference.parse("quay.io/user/model:latest") + digest = client.push_blob(ref, b"test data", "application/octet-stream") + + assert digest.startswith("sha256:") + mock_reg.do_request.assert_called_once() + + @mock.patch("oras.provider.Registry") + def test_push_blob_uploads_new(self, mock_registry_class): + mock_reg = mock.MagicMock() + mock_registry_class.return_value = mock_reg + head_response = mock.MagicMock() + head_response.status_code = 404 + post_response = mock.MagicMock() + post_response.headers = {"Location": "/upload/path?upload_id=123"} + put_response = mock.MagicMock() + mock_reg.do_request.side_effect = [ + requests.HTTPError(), + post_response, + put_response, + ] + + client = OrasClient() + ref = ImageReference.parse("quay.io/user/model:latest") + digest = client.push_blob(ref, b"test data", "application/octet-stream") + + assert digest.startswith("sha256:") + assert mock_reg.do_request.call_count == 3 + + @mock.patch("oras.provider.Registry") + def test_push_blob_no_location_raises(self, mock_registry_class): + mock_reg = mock.MagicMock() + mock_registry_class.return_value = mock_reg + post_response = mock.MagicMock() + post_response.headers = {} + mock_reg.do_request.side_effect = [requests.HTTPError(), post_response] + + client = OrasClient() + ref = ImageReference.parse("quay.io/user/model:latest") + + with pytest.raises(ValueError, match="upload location"): + client.push_blob(ref, b"test", "application/octet-stream") + + @mock.patch("oras.provider.Registry") + def test_push_manifest(self, mock_registry_class): + mock_reg = mock.MagicMock() + mock_registry_class.return_value = mock_reg + + client = OrasClient() + ref = ImageReference.parse("quay.io/user/model:latest") + digest = client.push_manifest(ref, {"schemaVersion": 2}) + + assert digest.startswith("sha256:") + mock_reg.do_request.assert_called_once() + + @mock.patch("oras.provider.Registry") + def test_push_signature(self, mock_registry_class): + mock_reg = mock.MagicMock() + mock_registry_class.return_value = mock_reg + mock_response = mock.MagicMock() + mock_response.status_code = 200 + mock_reg.do_request.return_value = mock_response + + client = OrasClient() + ref = ImageReference.parse("quay.io/user/model:latest") + digest = client.push_signature( + ref, b'{"sig": "data"}', "sha256:abc", 100 + ) + + assert digest.startswith("sha256:") + + @mock.patch("oras.provider.Registry") + def test_push_signature_tag(self, mock_registry_class): + mock_reg = mock.MagicMock() + mock_registry_class.return_value = mock_reg + mock_response = mock.MagicMock() + mock_response.status_code = 200 + mock_reg.do_request.return_value = mock_response + + client = OrasClient() + ref = ImageReference.parse("quay.io/user/model:latest") + digest = client.push_signature_tag(ref, b'{"sig": "data"}', "v1.sig") + + assert digest.startswith("sha256:") + + @mock.patch("oras.provider.Registry") + def test_get_referrers(self, mock_registry_class): + mock_reg = mock.MagicMock() + mock_registry_class.return_value = mock_reg + mock_response = mock.MagicMock() + mock_response.status_code = 200 + mock_response.json.return_value = { + "manifests": [{"digest": "sha256:abc", "artifactType": "test"}] + } + mock_reg.do_request.return_value = mock_response + + client = OrasClient() + digest = "sha256:" + "a" * 64 + ref = ImageReference.parse(f"quay.io/user/model@{digest}") + result = client.get_referrers(ref) + + assert len(result) == 1 + assert result[0]["digest"] == "sha256:abc" + + @mock.patch("oras.provider.Registry") + def test_get_referrers_filters_by_type(self, mock_registry_class): + mock_reg = mock.MagicMock() + mock_registry_class.return_value = mock_reg + mock_response = mock.MagicMock() + mock_response.status_code = 200 + mock_response.json.return_value = { + "manifests": [ + {"digest": "sha256:abc", "artifactType": "type1"}, + {"digest": "sha256:def", "artifactType": "type2"}, + ] + } + mock_reg.do_request.return_value = mock_response + + client = OrasClient() + digest = "sha256:" + "a" * 64 + ref = ImageReference.parse(f"quay.io/user/model@{digest}") + result = client.get_referrers(ref, artifact_type="type1") + + assert len(result) == 1 + assert result[0]["digest"] == "sha256:abc" + + @mock.patch("oras.provider.Registry") + def test_get_referrers_returns_empty_on_404(self, mock_registry_class): + mock_reg = mock.MagicMock() + mock_registry_class.return_value = mock_reg + error = requests.HTTPError() + error.response = mock.MagicMock() + error.response.status_code = 404 + mock_reg.do_request.side_effect = error + + client = OrasClient() + digest = "sha256:" + "a" * 64 + ref = ImageReference.parse(f"quay.io/user/model@{digest}") + result = client.get_referrers(ref) + + assert result == [] + + @mock.patch("oras.provider.Registry") + def test_get_referrers_returns_empty_on_non_200(self, mock_registry_class): + mock_reg = mock.MagicMock() + mock_registry_class.return_value = mock_reg + mock_response = mock.MagicMock() + mock_response.status_code = 500 + mock_reg.do_request.return_value = mock_response + + client = OrasClient() + digest = "sha256:" + "a" * 64 + ref = ImageReference.parse(f"quay.io/user/model@{digest}") + result = client.get_referrers(ref) + + assert result == [] + + @mock.patch("oras.provider.Registry") + def test_pull_blob(self, mock_registry_class): + mock_reg = mock.MagicMock() + mock_registry_class.return_value = mock_reg + mock_blob_response = mock.MagicMock() + mock_blob_response.content = b"blob data" + mock_reg.get_blob.return_value = mock_blob_response + + client = OrasClient() + ref = ImageReference.parse("quay.io/user/model:latest") + result = client.pull_blob(ref, "sha256:abc") + + assert result == b"blob data" + + def test_base_url_https(self): + client = OrasClient() + ref = ImageReference.parse("quay.io/user/model:latest") + url = client._base_url(ref) + assert url == "https://quay.io" + + def test_base_url_http_insecure(self): + client = OrasClient(insecure=True) + ref = ImageReference.parse("quay.io/user/model:latest") + url = client._base_url(ref) + assert url == "http://quay.io" + + def test_base_url_docker_hub(self): + client = OrasClient() + ref = ImageReference( + registry="docker.io", + repository="library/ubuntu", + tag="latest", + digest=None, + ) + url = client._base_url(ref) + assert url == "https://registry-1.docker.io" + + +class TestDescriptor: + def test_to_dict_with_annotations(self): + media_type = "application/vnd.oci.image.layer.v1.tar+gzip" + descriptor = registry.Descriptor( + media_type=media_type, + digest="sha256:abc123", + size=1024, + annotations={"org.opencontainers.image.title": "model.bin"}, + ) + result = descriptor.to_dict() + assert result["mediaType"] == media_type + assert result["digest"] == "sha256:abc123" + assert result["size"] == 1024 + assert result["annotations"] == { + "org.opencontainers.image.title": "model.bin" + } + + +class TestOCIManifestDigest: + def test_calculates_correct_digest(self): + manifest = registry.OCIManifest( + config=registry.Descriptor( + media_type="application/vnd.oci.image.config.v1+json", + digest="sha256:abc123", + size=2, + ) + ) + digest = manifest.compute_digest() + assert digest.startswith("sha256:") + assert len(digest) == 71 # "sha256:" + 64 hex chars + + +class TestOrasClientEdgeCases: + @mock.patch("oras.provider.Registry") + def test_get_referrers_raises_on_non_404_error(self, mock_registry_class): + mock_reg = mock.MagicMock() + mock_registry_class.return_value = mock_reg + error = requests.HTTPError() + error.response = mock.MagicMock() + error.response.status_code = 500 + mock_reg.do_request.side_effect = error + + client = OrasClient() + digest = "sha256:" + "a" * 64 + ref = ImageReference.parse(f"quay.io/user/model@{digest}") + + with pytest.raises(requests.HTTPError): + client.get_referrers(ref) diff --git a/tests/api_test.py b/tests/api_test.py index 280c5567..265dbc28 100644 --- a/tests/api_test.py +++ b/tests/api_test.py @@ -490,28 +490,6 @@ def _create_mock_oci_manifest_from_directory( class TestOCIManifestSigning: """Tests for signing and verifying with OCI manifests.""" - def test_sign_oci_manifest_verify_oci_manifest( - self, base_path, populate_tmpdir - ): - """Test signing from OCI manifest and verifying against OCI manifest.""" - os.chdir(base_path) - - model_path = populate_tmpdir - signature = Path(model_path / "model.sig") - private_key = Path(TESTDATA / "keys/certificate/signing-key.pem") - public_key = Path(TESTDATA / "keys/certificate/signing-key-pub.pem") - - oci_manifest = _create_mock_oci_manifest_from_directory(model_path) - - model_manifest = hashing.create_manifest_from_oci_layers(oci_manifest) - signing.Config().use_elliptic_key_signer( - private_key=private_key, password=None - ).sign_from_manifest(model_manifest, signature) - - verifying.Config().use_elliptic_key_verifier( - public_key=public_key - ).verify_from_oci_manifest(oci_manifest, signature) - def test_sign_oci_manifest_verify_local_files( self, base_path, populate_tmpdir ): @@ -542,44 +520,6 @@ def test_sign_oci_manifest_verify_local_files( ) ).verify(model_path, signature) - def test_sign_local_files_verify_oci_manifest( - self, base_path, populate_tmpdir - ): - """Test signing from local files and verifying against OCI manifest.""" - os.chdir(base_path) - - model_path = populate_tmpdir - signature = Path(model_path / "model.sig") - private_key = Path(TESTDATA / "keys/certificate/signing-key.pem") - public_key = Path(TESTDATA / "keys/certificate/signing-key-pub.pem") - - signing.Config().use_elliptic_key_signer( - private_key=private_key, password=None - ).set_hashing_config( - hashing.Config().set_ignored_paths( - paths=[signature], ignore_git_paths=False - ) - ).sign(model_path, signature) - - oci_manifest = _create_mock_oci_manifest_from_directory(model_path) - - verifier = verifying.Config().use_elliptic_key_verifier( - public_key=public_key - ) - - try: - verifier.verify_from_oci_manifest( - oci_manifest, signature, include_config=False - ) - except ValueError as e: - error_msg = str(e).lower() - assert ( - "mismatch" in error_msg - or "manifest" in error_msg - or "digest" in error_msg - or "signature" in error_msg - ) - def test_create_manifest_from_oci_layers_missing_layers(self): """Test that missing 'layers' field raises ValueError.""" invalid_manifest = {"schemaVersion": 2} @@ -592,39 +532,6 @@ def test_create_manifest_from_oci_layers_empty_layers(self): with pytest.raises(ValueError, match="No digests found"): hashing.create_manifest_from_oci_layers(manifest) - def test_verify_oci_manifest_mismatch_digest( - self, base_path, populate_tmpdir - ): - """Test verification fails when OCI manifest digests don't match.""" - os.chdir(base_path) - - model_path = populate_tmpdir - signature = Path(model_path / "model.sig") - private_key = Path(TESTDATA / "keys/certificate/signing-key.pem") - public_key = Path(TESTDATA / "keys/certificate/signing-key-pub.pem") - - oci_manifest1 = _create_mock_oci_manifest_from_directory( - model_path, include_config=False - ) - model_manifest = hashing.create_manifest_from_oci_layers( - oci_manifest1, include_config=False - ) - signing.Config().use_elliptic_key_signer( - private_key=private_key, password=None - ).sign_from_manifest(model_manifest, signature) - - oci_manifest2 = json.loads(json.dumps(oci_manifest1)) # Deep copy - oci_manifest2["layers"][0]["digest"] = ( - "sha256:0000000000000000000000000000000000000000000000000000000000000000" - ) - - with pytest.raises(ValueError, match="Signature mismatch"): - verifying.Config().use_elliptic_key_verifier( - public_key=public_key - ).verify_from_oci_manifest( - oci_manifest2, signature, include_config=False - ) - def test_verify_local_files_mismatch_oci_signature( self, base_path, populate_tmpdir ): @@ -658,39 +565,3 @@ def test_verify_local_files_mismatch_oci_signature( paths=[signature], ignore_git_paths=False ) ).verify(model_path, signature) - - def test_sign_oci_manifest_with_certificate( - self, base_path, populate_tmpdir - ): - """Test signing OCI manifest with certificate method.""" - os.chdir(base_path) - - model_path = populate_tmpdir - signature = Path(model_path / "model.sig") - private_key = Path(TESTDATA / "keys/certificate/signing-key.pem") - signing_certificate = Path( - TESTDATA / "keys/certificate/signing-key-cert.pem" - ) - certificate_chain = [ - Path(TESTDATA / "keys/certificate/int-ca-cert.pem") - ] - - oci_manifest = _create_mock_oci_manifest_from_directory( - model_path, include_config=False - ) - model_manifest = hashing.create_manifest_from_oci_layers( - oci_manifest, include_config=False - ) - - signing.Config().use_certificate_signer( - private_key=private_key, - signing_certificate=signing_certificate, - certificate_chain=certificate_chain, - ).sign_from_manifest(model_manifest, signature) - - certificate_chain = [Path(TESTDATA / "keys/certificate/ca-cert.pem")] - verifying.Config().use_certificate_verifier( - certificate_chain=certificate_chain - ).verify_from_oci_manifest( - oci_manifest, signature, include_config=False - )