diff --git a/action.yml b/action.yml index 96f3384..3084310 100644 --- a/action.yml +++ b/action.yml @@ -26,6 +26,10 @@ inputs: signing-key-file: description: 'The .pem file used to sign the statement' required: true + datatrails-url: + description: 'The fully url of the DataTrails SCITT Service' + required: false + default: 'https://app.datatrails.ai' runs: using: 'docker' @@ -38,3 +42,4 @@ runs: - ${{ inputs.transparent-statement-file }} - ${{ inputs.issuer }} - ${{ inputs.signing-key-file }} + - ${{ inputs.datatrails-url }} diff --git a/scitt-scripts/.gitignore b/scitt-scripts/.gitignore new file mode 100644 index 0000000..bee8a64 --- /dev/null +++ b/scitt-scripts/.gitignore @@ -0,0 +1 @@ +__pycache__ diff --git a/scitt-scripts/api_requests.py b/scitt-scripts/api_requests.py new file mode 100644 index 0000000..3c4ef5e --- /dev/null +++ b/scitt-scripts/api_requests.py @@ -0,0 +1,40 @@ +import os +import logging +import requests + +REQUEST_TIMEOUT = 30 + + +def get_app_auth_header(datatrails_url: str = "https://app.datatrails.ai") -> str: + """ + Get DataTrails bearer token from OIDC credentials in env + """ + # Pick up credentials from env + client_id = os.environ.get("DATATRAILS_CLIENT_ID") + client_secret = os.environ.get("DATATRAILS_CLIENT_SECRET") + + if client_id is None or client_secret is None: + raise ValueError( + "Please configure your DataTrails credentials in the shell environment" + ) + + # Get token from the auth endpoint + url = f"{datatrails_url}/archivist/iam/v1/appidp/token" + response = requests.post( + url, + data={ + "grant_type": "client_credentials", + "client_id": client_id, + "client_secret": client_secret, + }, + timeout=REQUEST_TIMEOUT, + ) + + if response.status_code != 200: + raise ValueError( + "FAILED to acquire bearer token %s, %s", response.text, response.reason + ) + + # Format as a request header + res = response.json() + return f'{res["token_type"]} {res["access_token"]}' diff --git a/scitt-scripts/check_operation_status.py b/scitt-scripts/check_operation_status.py index 2314d5a..9f1a29b 100755 --- a/scitt-scripts/check_operation_status.py +++ b/scitt-scripts/check_operation_status.py @@ -77,7 +77,7 @@ def poll_operation_status( except requests.HTTPError as e: logger.debug("failed getting operation status, error: %s", e) - + time_sleep(POLL_INTERVAL) raise TimeoutError("signed statement not registered within polling duration") @@ -137,5 +137,6 @@ def main(): print(e, file=sys.stderr) sys.exit(1) + if __name__ == "__main__": main() diff --git a/scitt-scripts/create_hashed_signed_statement.py b/scitt-scripts/create_hashed_signed_statement.py index 04c1705..3d07bad 100644 --- a/scitt-scripts/create_hashed_signed_statement.py +++ b/scitt-scripts/create_hashed_signed_statement.py @@ -42,7 +42,8 @@ # CBOR Object Signing and Encryption (COSE) "typ" (type) Header Parameter # https://datatracker.ietf.org/doc/rfc9596/ HEADER_LABEL_TYPE = 16 -COSE_TYPE="application/hashed+cose" +COSE_TYPE = "application/hashed+cose" + def open_signing_key(key_file: str) -> SigningKey: """ @@ -208,7 +209,7 @@ def main(): payload=payload_contents, payload_location=args.payload_location, signing_key=signing_key, - subject=args.subject + subject=args.subject, ) with open(args.output_file, "wb") as output_file: diff --git a/scitt-scripts/create_signed_statement.py b/scitt-scripts/create_signed_statement.py index 2d42a40..def8918 100755 --- a/scitt-scripts/create_signed_statement.py +++ b/scitt-scripts/create_signed_statement.py @@ -93,6 +93,8 @@ def create_signed_statement( HEADER_LABEL_CWT: { HEADER_LABEL_CWT_ISSUER: issuer, HEADER_LABEL_CWT_SUBJECT: subject, + # HEADER_LABEL_CWT_ISSUER: issuer.encode("ascii"), + # HEADER_LABEL_CWT_SUBJECT: subject.encode("ascii"), HEADER_LABEL_CWT_CNF: { HEADER_LABEL_CNF_COSE_KEY: { KpKty: KtyEC2, @@ -162,6 +164,8 @@ def main(): "--subject", type=str, help="subject to correlate statements made about an artifact.", + # a default of None breaks registration because registration does not allow nil issuer + default="scitt-subject", ) # issuer @@ -169,6 +173,8 @@ def main(): "--issuer", type=str, help="issuer who owns the signing key.", + # a default of None breaks registration because registration does not allow nil subject + default="scitt-issuer", ) # output file diff --git a/scitt-scripts/entrypoint.sh b/scitt-scripts/entrypoint.sh index 06fed04..656d593 100755 --- a/scitt-scripts/entrypoint.sh +++ b/scitt-scripts/entrypoint.sh @@ -9,6 +9,7 @@ SUBJECT=${4} TRANSPARENT_STATEMENT_FILE=${5} ISSUER=${6} SIGNING_KEY_FILE=${7} +DATATRAILS_URL=${8} SIGNED_STATEMENT_FILE="signed-statement.cbor" TOKEN_FILE="./bearer-token.txt" @@ -23,6 +24,7 @@ TOKEN_FILE="./bearer-token.txt" # echo "SIGNING_KEY_FILE: " ${SIGNING_KEY_FILE} # echo "SIGNED_STATEMENT_FILE: " ${SIGNED_STATEMENT_FILE} # echo "TOKEN_FILE: " ${TOKEN_FILE} +# echo "DATATRAILS_URL: " ${DATATRAILS_URL} if [ ! -f $PAYLOAD_FILE ]; then echo "ERROR: Payload File: [$PAYLOAD_FILE] Not found!" @@ -30,12 +32,12 @@ if [ ! -f $PAYLOAD_FILE ]; then fi # "Create an access token" -/scripts/create-token.sh $TOKEN_FILE +# /scripts/create-token.sh $TOKEN_FILE -if [ ! -f $TOKEN_FILE ]; then - echo "ERROR: Token File: [$TOKEN_FILE] Not found!" - exit 126 -fi +#if [ ! -f $TOKEN_FILE ]; then +# echo "ERROR: Token File: [$TOKEN_FILE] Not found!" +# exit 126 +#fi echo "Create a Signed Statement, hashing the payload" python /scripts/create_hashed_signed_statement.py \ @@ -52,13 +54,15 @@ if [ ! -f $SIGNED_STATEMENT_FILE ]; then exit 126 fi -echo "Register the SCITT Signed Statement to https://app.datatrails.ai/archivist/v1/publicscitt/entries" +# --datatrails-url $DATATRAILS_URL \ +echo "Register the SCITT Signed Statement to $DATATRAILS_URL/archivist/v1/publicscitt/entries" python /scripts/register_signed_statement.py \ --signed-statement-file $SIGNED_STATEMENT_FILE \ --output-file $TRANSPARENT_STATEMENT_FILE \ + --datatrails-url https://app.dev-robin-0.dev.datatrails.ai \ --log-level INFO python /scripts/dump_cbor.py \ --input $TRANSPARENT_STATEMENT_FILE -# curl https://app.datatrails.ai/archivist/v2/publicassets/-/events?event_attributes.subject=$SUBJECT | jq +# curl https://$DATATRAILS_URL/archivist/v2/publicassets/-/events?event_attributes.subject=$SUBJECT | jq diff --git a/scitt-scripts/generate_signing_key.py b/scitt-scripts/generate_signing_key.py new file mode 100644 index 0000000..d2196e4 --- /dev/null +++ b/scitt-scripts/generate_signing_key.py @@ -0,0 +1,26 @@ +""" +Generates an EXAMPLE issuer signing key using python ecdsa +""" + +from ecdsa import SigningKey, NIST256p + +FILE_NAME = "scitt-signing-key.pem" + + +def generate_key(topem=True): + key = SigningKey.generate(curve=NIST256p) + if not topem: + return key + return key.to_pem() + + +def main(): + pem_key = generate_key(topem=True) + # Save the private key to a file + with open(FILE_NAME, "wb") as pem_file: + pem_file.write(pem_key) + print(f"PEM formatted private key generated and saved as '{FILE_NAME}'") + + +if __name__ == "__main__": + main() diff --git a/scitt-scripts/verify_receipt_signature.py b/scitt-scripts/getdidwebpub.py similarity index 57% rename from scitt-scripts/verify_receipt_signature.py rename to scitt-scripts/getdidwebpub.py index f7f7c83..6e70024 100644 --- a/scitt-scripts/verify_receipt_signature.py +++ b/scitt-scripts/getdidwebpub.py @@ -1,14 +1,6 @@ -""" Module for verifying the counter signed receipt signature """ - import re -from base64 import b64decode -import argparse - import requests - from jwcrypto import jwk - -from pycose.messages import Sign1Message from pycose.keys.curves import P384 from pycose.keys.keyparam import KpKty, EC2KpX, EC2KpY, KpKeyOps, EC2KpCurve from pycose.keys.keytype import KtyEC2 @@ -16,18 +8,6 @@ from pycose.keys import CoseKey from pycose.headers import KID -HEADER_LABEL_DID = 391 - - -def open_receipt(receipt_file: str) -> str: - """ - opens the receipt from the receipt file. - NOTE: the receipt is expected to be in base64 encoding. - """ - with open(receipt_file, encoding="UTF-8") as file: - receipt = file.read() - return receipt - def get_didweb_pubkey(didurl: str, kid: bytes) -> dict: """ @@ -90,55 +70,3 @@ def get_didweb_pubkey(didurl: str, kid: bytes) -> dict: return cose_key raise ValueError(f"no key with kid: {kid} in verification methods of did document") - - -def verify_receipt(receipt: str) -> bool: - """ - verifies the counter signed receipt signature - """ - - # base64 decode the receipt into a cose sign1 message - b64decoded_message = b64decode(receipt) - - # decode the cbor encoded cose sign1 message - message = Sign1Message.decode(b64decoded_message) - - # get the verification key from didweb - kid: bytes = message.phdr[KID] - didurl = message.phdr[HEADER_LABEL_DID] - - cose_key_dict = get_didweb_pubkey(didurl, kid) - cose_key = CoseKey.from_dict(cose_key_dict) - - message.key = cose_key - - # verify the counter signed receipt signature - verified = message.verify_signature() - - return verified - - -def main(): - """Verifies a counter signed receipt signature""" - - parser = argparse.ArgumentParser(description="Create a signed statement.") - - # signing key file - parser.add_argument( - "--receipt-file", - type=str, - help="filepath to the stored receipt, in base64 format.", - default="scitt-receipt.txt", - ) - - args = parser.parse_args() - - receipt = open_receipt(args.receipt_file) - - verified = verify_receipt(receipt) - - print(verified) - - -if __name__ == "__main__": - main() diff --git a/scitt-scripts/mmriver_algorithms.py b/scitt-scripts/mmriver_algorithms.py new file mode 100644 index 0000000..2c49723 --- /dev/null +++ b/scitt-scripts/mmriver_algorithms.py @@ -0,0 +1,99 @@ +""" +Selective copy of + +https://github.com/robinbryce/draft-bryce-cose-merkle-mountain-range-proofs/blob/main/algorithms.py + +Which is a reference implementation of + +https://robinbryce.github.io/draft-bryce-cose-merkle-mountain-range-proofs/draft-bryce-cose-merkle-mountain-range-proofs.html + + +""" +from typing import List +import hashlib + + +def included_root(i: int, nodehash: bytes, proof: List[bytes]) -> bytes: + """Apply the proof to nodehash to produce the implied root + + For a valid cose receipt of inclusion, using the returned root as the + detached payload will result in a receipt message whose signature can be + verified. + + Args: + i (int): the mmr index where `nodehash` is located. + nodehash (bytes): the value whose inclusion is being proven. + proof (List[bytes]): the siblings required to produce `root` from `nodehash`. + + Returns: + the root hash produced for `nodehash` using `path` + """ + + # set `root` to the value whose inclusion is to be proven + root = nodehash + + # set g to the zero based height of i. + g = index_height(i) + + # for each sibling in the proof + for sibling in proof: + # if the height of the entry immediately after i is greater than g, then + # i is a right child. + if index_height(i + 1) > g: + # advance i to the parent. As i is a right child, the parent is at `i+1` + i = i + 1 + # Set `root` to `H(i+1 || sibling || root)` + root = hash_pospair64(i + 1, sibling, root) + else: + # Advance i to the parent. As i is a left child, the parent is at `i + (2^(g+1))` + i = i + (2 << g) + # Set `root` to `H(i+1 || root || sibling)` + root = hash_pospair64(i + 1, root, sibling) + + # Set g to the height index above the current + g = g + 1 + + # Return the hash produced. If the path length was zero, the original nodehash is returned + return root + + +def index_height(i: int) -> int: + """Returns the 0 based height of the mmr entry indexed by i""" + # convert the index to a position to take advantage of the bit patterns afforded + pos = i + 1 + while not all_ones(pos): + pos = pos - (most_sig_bit(pos) - 1) + + return pos.bit_length() - 1 + + +def hash_pospair64(pos: int, a: bytes, b: bytes) -> bytes: + """ + Compute the hash of pos || a || b + + Args: + pos (int): the 1-based position of an mmr node. If a, b are left and + right children, pos should be the parent position. + a (bytes): the first value to include in the hash + b (bytes): the second value to include in the hash + + Returns: + The value for the node identified by pos + """ + h = hashlib.sha256() + h.update(pos.to_bytes(8, byteorder="big", signed=False)) + h.update(a) + h.update(b) + return h.digest() + + +def most_sig_bit(pos) -> int: + """Returns the mask for the the most significant bit in pos""" + return 1 << (pos.bit_length() - 1) + + +def all_ones(pos) -> bool: + """Returns true if all bits, starting with the most significant, are 1""" + imsb = pos.bit_length() - 1 + mask = (1 << (imsb + 1)) - 1 + return pos == mask diff --git a/scitt-scripts/register_signed_statement.py b/scitt-scripts/register_signed_statement.py index b5d892e..84d5bfa 100644 --- a/scitt-scripts/register_signed_statement.py +++ b/scitt-scripts/register_signed_statement.py @@ -7,9 +7,13 @@ import os import sys from time import sleep as time_sleep +import requests from pycose.messages import Sign1Message -import requests + +from api_requests import get_app_auth_header +from v3leafhash import leaf_hash +from verify_receipt import verify_receipt # CWT header label comes from version 4 of the scitt architecture document # https://www.ietf.org/archive/id/draft-ietf-scitt-architecture-04.html#name-issuer-identity @@ -30,45 +34,14 @@ POLL_TIMEOUT = 120 POLL_INTERVAL = 10 - -def get_dt_auth_header(logger: logging.Logger) -> str: - """ - Get DataTrails bearer token from OIDC credentials in env - """ - # Pick up credentials from env - client_id = os.environ.get("DATATRAILS_CLIENT_ID") - client_secret = os.environ.get("DATATRAILS_CLIENT_SECRET") - - if client_id is None or client_secret is None: - logger.error( - "Please configure your DataTrails credentials in the shell environment" - ) - sys.exit(1) - - # Get token from the auth endpoint - response = requests.post( - "https://app.datatrails.ai/archivist/iam/v1/appidp/token", - data={ - "grant_type": "client_credentials", - "client_id": client_id, - "client_secret": client_secret, - }, - timeout=REQUEST_TIMEOUT, - ) - logging.info("Response: %s", response) - - if response.status_code != 200: - logger.error("FAILED to acquire bearer token %s, %s", response.text, response.reason) - logger.debug(response) - sys.exit(1) - - # Format as a request header - res = response.json() - return f'{res["token_type"]} {res["access_token"]}' +DATATRAILS_URL_DEFAULT="https://app.datatrails.ai" def submit_statement( - statement_file_path: str, headers: dict, logger: logging.Logger + statement_file_path: str, + headers: dict, + logger: logging.Logger, + datatrails_url: str = DATATRAILS_URL_DEFAULT, ) -> str: logging.info("submit_statement()") """ @@ -82,34 +55,34 @@ def submit_statement( logging.info("statement_file_path opened: %s", statement_file_path) # Make the POST request response = requests.post( - "https://app.datatrails.ai/archivist/v1/publicscitt/entries", + f"{datatrails_url}/archivist/v1/publicscitt/entries", headers=headers, data=data, timeout=REQUEST_TIMEOUT, ) if response.status_code != 200: - logger.error("FAILED to submit statement response.raw: %s", response.raw) - logger.error("FAILED to submit statement response.text: %s", response.text) - logger.error("FAILED to submit statement response.reason: %s", response.reason) + logger.debug("FAILED to submit statement response.raw: %s", response.raw) + logger.debug("FAILED to submit statement response.text: %s", response.text) + logger.debug("FAILED to submit statement response.reason: %s", response.reason) logger.debug(response) - sys.exit(1) + raise Exception("Failed to submit statement") # Make sure it's actually in process and wil work res = response.json() if not "operationID" in res: - logger.error("FAILED No OperationID locator in response") - logger.debug(res) - sys.exit(1) + raise Exception("FAILED No OperationID locator in response") return res["operationID"] -def get_operation_status(operation_id: str, headers: dict) -> dict: +def get_operation_status( + operation_id: str, headers: dict, datatrails_url: str = DATATRAILS_URL_DEFAULT +) -> dict: """ Gets the status of a long-running registration operation """ response = requests.get( - f"https://app.datatrails.ai/archivist/v1/publicscitt/operations/{operation_id}", + f"{datatrails_url}/archivist/v1/publicscitt/operations/{operation_id}", headers=headers, timeout=REQUEST_TIMEOUT, ) @@ -119,7 +92,12 @@ def get_operation_status(operation_id: str, headers: dict) -> dict: return response.json() -def wait_for_entry_id(operation_id: str, headers: dict, logger: logging.Logger) -> str: +def wait_for_entry_id( + operation_id: str, + headers: dict, + logger: logging.Logger, + datatrails_url: str = DATATRAILS_URL_DEFAULT, +) -> str: """ Polls for the operation status to be 'succeeded'. """ @@ -129,9 +107,8 @@ def wait_for_entry_id(operation_id: str, headers: dict, logger: logging.Logger) logger.info("starting to poll for operation status 'succeeded'") for _ in range(poll_attempts): - try: - operation_status = get_operation_status(operation_id, headers) + operation_status = get_operation_status(operation_id, headers, datatrails_url) # pylint: disable=fixme # TODO: ensure get_operation_status handles error cases from the rest request @@ -149,50 +126,107 @@ def wait_for_entry_id(operation_id: str, headers: dict, logger: logging.Logger) raise TimeoutError("signed statement not registered within polling duration") -def attach_receipt( - entry_id: str, - signed_statement_filepath: str, - transparent_statement_file_path: str, - headers: dict, - logger: logging.Logger, -): - """ - Given a Signed Statement and a corresponding Entry ID, fetch a Receipt from - the Transparency Service and write out a complete Transparent Statement - """ +def get_receipt(entry_id: str, request_headers: dict, datatrails_url: str = DATATRAILS_URL_DEFAULT): + """Get the receipt for the provided entry id""" # Get the receipt response = requests.get( - f"https://app.datatrails.ai/archivist/v1/publicscitt/entries/{entry_id}/receipt", - headers=headers, + f"{datatrails_url}/archivist/v1/publicscitt/entries/{entry_id}/receipt", + headers=request_headers, timeout=REQUEST_TIMEOUT, ) if response.status_code != 200: - logger.error("FAILED to get receipt") - logger.debug(response) - sys.exit(1) + raise Exception("FAILED to get receipt") + + return response.content + + +def attach_receipt( + receipt: bytes, + signed_statement_filepath: str, + transparent_statement_file_path: str, +): + """ + Given a Signed Statement file on disc and the provided receipt content, from the Transparency Service, + read the statement fromm disc, attach the provided receipt, writing the re-encoded result back to disc. + The resulting re-encoded statement is now a Transparent Statement. - logger.debug(response.content) + The caller is expected to have *verified* the receipt first. + """ # Open up the signed statement with open(signed_statement_filepath, "rb") as data_file: data = data_file.read() message = Sign1Message.decode(data) - logger.debug(message) # Add receipt to the unprotected header and re-encode - message.uhdr["receipts"] = [response.content] + message.uhdr["receipts"] = [receipt] ts = message.encode(sign=False) # Write out the updated Transparent Statement with open(transparent_statement_file_path, "wb") as file: file.write(ts) - logger.info("File saved successfully") + + +def get_leaf_hash(entry_id: str, datatrails_url: str = DATATRAILS_URL_DEFAULT) -> str: + """Obtain the leaf hash for a given Entry ID + + The leaf hash is the value that is proven by the COSE Receipt attached to the transparent statement. + + For SCITT Statements registered with datatrails, the leaf hash currently includes content + that is additional to the signed statement. + It currently requires a proprietary API call to DataTrails to obtain that content. + The content is available on a public access endpoint (no authorisation is required) + + These limitations are not inherent to the SCITT architecture. + The are specific to the current DataTrails implementation, and will be addressed in future releases. + + Note that the leaf hash can be read directly from the merkle log given only information in the receipt. + And, as the log data is public and easily replicable, this does not require interaction with datatrails. + + However, on its own, this does not show that the leaf hash commits the statement to the log. + """ + identity = api_entryid_to_identity(entry_id) + public_url = f"{datatrails_url}/archivist/v2/public{identity}" + response = requests.get(public_url, timeout=REQUEST_TIMEOUT) + response.raise_for_status() + event = response.json() + return leaf_hash(event) + + +def api_entryid_to_identity(entryid: str) -> str: + """ + Convert a SCITT Entry ID to a DataTrails Event Identity + """ + eventsplit = entryid.split("_events_") + eventUUID = eventsplit[-1] + + bucketsplit = eventsplit[0].split("assets_") + bucketUUID = bucketsplit[-1] + + return f"assets/{bucketUUID}/events/{eventUUID}" + + +def get_dt_auth_header(logger: logging.Logger, fqdn: str) -> str: + """ + Get DataTrails bearer token from OIDC credentials in env + """ + try: + return get_app_auth_header(fqdn=fqdn) + except Exception as e: + logger.error(repr(e)) + sys.exit(1) def main(): """Creates a Transparent Statement""" parser = argparse.ArgumentParser(description="Create a signed statement.") + parser.add_argument( + "--datatrails-url", + type=str, + help="The url of the DataTrails transparency service.", + default=DATATRAILS_URL_DEFAULT, + ) # Signed Statement file parser.add_argument( @@ -217,6 +251,12 @@ def main(): help="log level. for any individual poll errors use DEBUG, defaults to WARNING", default="WARNING", ) + parser.add_argument( + "--verify", + help="verify the result of registraion", + default=False, + action="store_true", + ) args = parser.parse_args() @@ -225,31 +265,51 @@ def main(): # Get auth logging.info("Get Auth Headers") - auth_headers = {"Authorization": get_dt_auth_header(logger)} + try: + auth_headers = {"Authorization": get_app_auth_header(args.datatrails_url)} + except Exception as e: + logger.error(repr(e)) + sys.exit(1) # Submit Signed Statement to DataTrails logging.info("submit_statement: %s", args.signed_statement_file) - op_id = submit_statement(args.signed_statement_file, auth_headers, logger) + op_id = submit_statement( + args.signed_statement_file, auth_headers, logger, datatrails_url=args.datatrails_url + ) logging.info("Successfully submitted with Operation ID %s", op_id) - # If the client wants the Transparent Statement, wait for it - if args.output_file != "": + # If the client wants the Transparent Statement or receipt, wait for registration to complete + if args.verify or args.output_file != "": logging.info("Waiting for registration to complete") - # Wait for the registration to complete try: - entry_id = wait_for_entry_id(op_id, auth_headers, logger) + entry_id = wait_for_entry_id(op_id, auth_headers, logger, datatrails_url=args.datatrails_url) except TimeoutError as e: logger.error(e) sys.exit(1) + logger.info("Fully Registered with Entry ID %s", entry_id) - logger.info("Fully Registered with Entry ID %s", entry_id) + leaf = get_leaf_hash(entry_id, datatrails_url=args.datatrails_url) + logger.info("Leaf Hash: %s", leaf.hex()) - # Attach the receipt - attach_receipt( - entry_id, args.signed_statement_file, args.output_file, auth_headers, logger - ) + if args.verify or args.output_file != "": + # Don't attach the receipt without verifying the log returned a receipt + # that genuinely represents the expected content. + + receipt = get_receipt(entry_id, auth_headers, datatrails_url=args.datatrails_url) + if not verify_receipt(receipt, leaf): + logger.info("Receipt verification failed") + sys.exit(1) + + if args.output_file == "": + return + + # Attach the receipt + attach_receipt( + receipt, args.signed_statement_file, args.output_file + ) + logger.info(f"File saved successfully {args.output_file}") if __name__ == "__main__": diff --git a/scitt-scripts/requirements.txt b/scitt-scripts/requirements.txt index 8156493..31a6399 100644 --- a/scitt-scripts/requirements.txt +++ b/scitt-scripts/requirements.txt @@ -2,4 +2,5 @@ pycose~=1.0.1 ecdsa~=0.18.0 jwcrypto~=1.5.0 -requests>=2.32.0 \ No newline at end of file +requests>=2.32.0 +bencode.py~=4.0.0 \ No newline at end of file diff --git a/scitt-scripts/v3leafhash.py b/scitt-scripts/v3leafhash.py new file mode 100644 index 0000000..d013664 --- /dev/null +++ b/scitt-scripts/v3leafhash.py @@ -0,0 +1,75 @@ +""" +See KB: https://support.datatrails.ai/hc/en-gb/articles/18120936244370-How-to-independently-verify-Merkle-Log-Events-recorded-on-the-DataTrails-transparency-ledger#h_01HTYDD6ZH0FV2K95D61RQ61ZJ +""" +from typing import List +import hashlib +import bencodepy + +V3FIELDS = [ + "identity", + "event_attributes", + "asset_attributes", + "operation", + "behaviour", + "timestamp_declared", + "timestamp_accepted", + "timestamp_committed", + "principal_accepted", + "principal_declared", + "tenant_identity", +] + + +def leaf_hash(event: dict, domain=0) -> bytes: + """ + Return the leaf hash which is proven by a scitt receipt for the provided CONFIRMED event + + Computes: + + SHA256(BYTE(0x00) || BYTES(idTimestamp) || BENCODE(redactedEvent)) + + See KB: https://support.datatrails.ai/hc/en-gb/articles/18120936244370-How-to-independently-verify-Merkle-Log-Events-recorded-on-the-DataTrails-transparency-ledger#h_01HTYDD6ZH0FV2K95D61RQ61ZJ + """ + salt = get_mmrsalt(event, domain) + preimage = get_v3preimage(event) + return hashlib.sha256(salt + preimage).digest() + + +def get_mmrsalt(event: dict, domain=0) -> List[bytes]: + """ + Get the public salt details from a v3 event record. + + Returns the bytes comprised of + + DOMAIN || BYTES(IDTIMESTAMP) + + """ + + # Note this value is also present in the trie index data in the public merkle log + # which can be obtained directly from app.datatrails.ai/verifiabledata/merklelogs + # without authentication. veracity provides cli tooling for this sort of thing. + hexidtimestamp = event["merklelog_entry"]["commit"]["idtimestamp"] + idtimestamp = bytes.fromhex(hexidtimestamp[2:]) # strip the epoch from the front + return bytes([domain]) + idtimestamp + + +def get_v3preimage(event: dict) -> bytes: + """ + Calculate the leaf hash of a V3 leaf + """ + + preimage = {} + for field in V3FIELDS: + # Ensure the leaf contains all required fields + try: + value = event[field] + except KeyError: + raise KeyError(f"V3 leaf is missing required field: {field}") + + preimage[field] = value + + # their is only one occurence + if preimage["identity"].startswith("public"): + preimage["identity"] = preimage["identity"].replace("public", "") + + return bencodepy.encode(preimage) diff --git a/scitt-scripts/verify_receipt.py b/scitt-scripts/verify_receipt.py new file mode 100644 index 0000000..1ff0d2a --- /dev/null +++ b/scitt-scripts/verify_receipt.py @@ -0,0 +1,266 @@ +""" Module for verifying the counter signed receipt signature """ +from typing import List, Dict +from dataclasses import dataclass + +import re +from base64 import b64decode +import argparse + +import cbor2 + +from pycose.keys.curves import P384 +from pycose.keys.keytype import KtyEC2 +from pycose.keys.keyparam import KpKty, KpKeyOps, KpAlg, EC2KpD, EC2KpX, EC2KpY, EC2KpCurve +from pycose.keys.keyops import VerifyOp +from pycose.keys import CoseKey +from pycose.headers import KID, Algorithm +from pycose.messages.cosebase import CoseBase +from pycose.messages import Sign1Message +from pycose.messages.signcommon import SignCommon + +from mmriver_algorithms import included_root + +# COSE Receipts headers +# https://cose-wg.github.io/draft-ietf-cose-merkle-tree-proofs/draft-ietf-cose-merkle-tree-proofs.html#name-new-entries-to-the-cose-hea +HEADER_LABEL_DID = 391 +HEADER_LABEL_COSE_RECEIPTS_VDS = 395 +HEADER_LABEL_COSE_RECEIPTS_VDP = 396 +HEADER_LABEL_COSE_RECEIPTS_INCLUSION_PROOFS = -1 +# MMRIVER headers +# https://robinbryce.github.io/draft-bryce-cose-merkle-mountain-range-proofs/draft-bryce-cose-merkle-mountain-range-proofs.html#name-receipt-of-inclusion +HEADER_LABEL_MMRIVER_VDS_TREE_ALG = 2 +HEADER_LABEL_MMRIVER_INCLUSION_PROOF_INDEX = 1 +HEADER_LABEL_MMRIVER_INCLUSION_PROOF_PATH = 2 + +# CWT header label comes from version 4 of the scitt architecture document +# https://www.ietf.org/archive/id/draft-ietf-scitt-architecture-04.html#name-issuer-identity +HEADER_LABEL_CWT = 13 + +# Various CWT header labels come from: +# https://www.rfc-editor.org/rfc/rfc8392.html#section-3.1 +HEADER_LABEL_CWT_ISSUER = 1 +HEADER_LABEL_CWT_SUBJECT = 2 + +# CWT CNF header labels come from: +# https://datatracker.ietf.org/doc/html/rfc8747#name-confirmation-claim +HEADER_LABEL_CWT_CNF = 8 +HEADER_LABEL_CNF_COSE_KEY = 1 + +# CWT CNF header labels come from: +# https://datatracker.ietf.org/doc/html/rfc8747#name-confirmation-claim +HEADER_LABEL_CWT_CNF = 8 +HEADER_LABEL_CNF_COSE_KEY = 1 + + +def cnf_key_from_phdr(phdr: dict) -> CoseKey: + """ + Extracts the confirmation key from the cwt claims. + """ + cwt_claims = phdr.get(HEADER_LABEL_CWT) + # Note: issuer is the key vault key identity, subject is the tenant's merkle log tile path + cnf_claim = cwt_claims.get(HEADER_LABEL_CWT_CNF) + if not cnf_claim: + raise ValueError("Missing confirmation claim in cwt claims") + key = cnf_claim.get(HEADER_LABEL_CNF_COSE_KEY) + if not key: + raise ValueError("Missing confirmation key in cwt claims") + + key = key.copy() + + # There is a legacy "deliberate" bug in the common datatrails cose library, due to a short cut for jwt compatibility. + # We encode the key as 'EC', the cose spec sais it MUST be 'EC2' + if key.get(KpKty.identifier) == "EC": + key[KpKty.identifier] = KtyEC2.identifier + + # A bug in our implementation sets key curve as 'P-384' rather than 'P_384'. + if key[EC2KpCurve.identifier] == "P-384": + key[EC2KpCurve.identifier] = P384.identifier + + if not KpKeyOps.identifier in key: + key[KpKeyOps.identifier] = [VerifyOp] + + try: + key = CoseKey.from_dict(key) + except Exception as e: + raise ValueError(f"Error extracting confirmation key: {e}") + return key + + +def verify_receipt(receipt: bytes, leaf: bytes) -> bool: + """ + Verifies the counter signed receipt signature + Args: + receipt: COSE Receipt as cbor encoded bytes + leaf: append only log leaf hash proven by the receipt. provided as bytes + """ + + message = decode_sign1_detached(receipt) + + # While many proofs may be supplied, only the first is used here. + # The checks will raise unless there is at least one proof found. + # Note that when the proof is None it means the inclusion path is empty and the leaf is the payload of the receipt. + # (And is also a direct member of the accumulator) + proof = mmriver_inclusion_proofs(message.phdr, message.uhdr)[0] + path = proof.path or [] + + root = included_root(proof.index, leaf, path) + message.payload = root + + # Extract the signing key from the cwt claims in the protected header + # The receipt signing key is the merklelog consistency checkpoint siging key. + # Which is declared publicly in many places including the DataTrails web ui. + # Note that this is *not* the same as the signed statement counter signing key. + + signing_key = cnf_key_from_phdr(message.phdr) + message.key = signing_key + return message.verify_signature() + + +@dataclass +class InclusionProof: + index: int + path: List[bytes] + + +def mmriver_inclusion_proofs(phdr: dict, uhdr: dict) -> List[InclusionProof]: + """ + Checks the headers of the mmriver receipt for the correct values + and returns a list of inclusion proofs. + """ + # check the receipt headers + try: + vds = phdr[HEADER_LABEL_COSE_RECEIPTS_VDS] + except KeyError: + raise KeyError("Missing COSE Receipt VDS header") + + if vds != HEADER_LABEL_MMRIVER_VDS_TREE_ALG: + raise ValueError("COSE Receipt VDS tree algorithm is not MMRIVER") + + try: + vds = uhdr[HEADER_LABEL_COSE_RECEIPTS_VDP] + except KeyError: + raise KeyError("Missing COSE Receipt VDS header") + + try: + inclusion_proofs = vds[HEADER_LABEL_COSE_RECEIPTS_INCLUSION_PROOFS] + except KeyError: + raise KeyError("Missing COSE Receipt VDS inclusion proof") + + if len(inclusion_proofs) == 0: + raise ValueError("COSE Receipt VDS inclusion proof count is not at least 1") + + proofs: List[Dict] = [] + # Now check the MMRIVER specifics + for inclusion_proof in inclusion_proofs: + if HEADER_LABEL_MMRIVER_INCLUSION_PROOF_INDEX not in inclusion_proof: + raise ValueError("Missing mmr-index from MMRIVER COSE Receipt of inclusion") + if HEADER_LABEL_MMRIVER_INCLUSION_PROOF_PATH not in inclusion_proof: + raise ValueError( + "Missing inclusion-proof from MMRIVER COSE Receipt of inclusion" + ) + + proofs.append( + InclusionProof( + inclusion_proof[HEADER_LABEL_MMRIVER_INCLUSION_PROOF_INDEX], + inclusion_proof[HEADER_LABEL_MMRIVER_INCLUSION_PROOF_PATH], + ) + ) + + return proofs + + +def decode_sign1_detached( + message: bytes, payload=None, *args, **kwargs +) -> Sign1Message: + """ + Decodes a COSE sign1 message from a message with a detached payload. + + For COSE Receipts the caller can not provide payload in advance. + The payload is dependent on the receipt's unprotected header contents which are only available + after calling this function. + + WARNING: The message will NOT VERIFY unless the payload is replaced with the payload that was signed. + + Args: + message: the bytes of the COSE sign1 message + payload: + Used as the payload if not none, otherwise payload is forced to b''. + Verification will fail until the correct payload has been set on the returned + Sign1Message. + args: passed on to Sign1Message.__init__ + kwargs: passed on to Sign1Message.__init__ + """ + # decode the cbor encoded cose sign1 message, per the CoseBase implementation + try: + cbor_msg = cbor2.loads(message) + cose_obj = cbor_msg.value + except AttributeError: + raise AttributeError("Message was not tagged.") + except ValueError: + raise ValueError("Decode accepts only bytes as input.") + + if payload is None: + payload = b"" + + cose_obj[2] = payload # force replace with b'' if payload is detached, due to lack of pycose support + return Sign1Message.from_cose_obj(cose_obj, True) + + +def is_hexadecimal(s: str) -> bool: + """ + Checks if a string is hexadecimal. The string may optionally start with '0x'. + The exact string '0x' is also considered valid. + """ + pattern = r"^0x[0-9a-fA-F]*$|^[0-9a-fA-F]+$" + return bool(re.match(pattern, s)) + + +def open_receipt(receipt_file: str) -> str: + """ + opens the receipt from the receipt file. + NOTE: the receipt is expected to be in base64 encoding. + """ + with open(receipt_file, encoding="UTF-8") as file: + receipt = file.read() + return receipt + + +def main(): + """Verifies a signed statement receipt + + The inclusion proof is verified to produce the receipt payload. + The receipt signature is then verified. + + The leaf value is the merkle log entry committing the statement to the append only log. + For the DataTrails platform, the leaf hash is available in the ui. + It can also be produced from the event data returned by the events api. + The event data includes the signed statement, allong with other platform specific metadata, + that can be ignored by generic scitt consumers. + """ + + parser = argparse.ArgumentParser( + description="Verify the receipt for a signed statement." + ) + + # signing key file + parser.add_argument( + "--receipt-file", + type=str, + help="filepath to the stored receipt, in base64 format.", + default="scitt-receipt.txt", + ) + parser.add_argument( + "--leaf", type=str, help="The append only log leaf hash proven by the receipt." + ) + + args = parser.parse_args() + + receipt_b64 = open_receipt(args.receipt_file) + receipt = b64decode(receipt_b64) + verified = verify_receipt(receipt, bytes.fromhex(args.leaf)) + + print(verified) + + +if __name__ == "__main__": + main()