diff --git a/README.md b/README.md index a0409cb..e2e1b53 100644 --- a/README.md +++ b/README.md @@ -156,8 +156,15 @@ Open a Python shell in a new terminal window and then run the following: The Primary's update_cycle() call: - fetches and validates all signed metadata for the vehicle, from the Director and Image repositories -- fetches all images that the Director instructs this vehicle to install, excluding any that do not exactly match corresponding images on the Image repository. Any images fetched from the repositories that do not match validated metadata are discarded. +- reads the Director's Targets metadata to determine what targets the Director +has instructed this vehicle to install, storing the trustworthy target info for +each (hash, length, etc.), and comparing it to similar info from the Image +repository. A mismatch results in that installation instruction being ignored. +- fetches the target images indicated and compares them to the trusted info +(hash, length, etc.). Any images fetched +from the repositories that do not match validated metadata are discarded. - queries the Timeserver for a signed attestation about the current time, including in it any nonces sent by Secondaries, so that Secondaries may trust that the time returned is at least as recent as their sent nonce +- validates the Timeserver's time attestations for itself and updates its time - generates a Vehicle Version Manifest with some vehicle metadata and all ECU Version Manifests received from Secondaries, describing currently installed images, most recent times available to each ECU, and reports of any attacks observed by Secondaries (can also be called directly: `dp.generate_signed_vehicle_manifest()`) - sends that Vehicle Version Manifest to the Director (can also be called directly: `dp.submit_vehicle_manifest_to_director()`) @@ -177,10 +184,11 @@ Example setting up a different Primary for a different vehicle: Primary have finished starting up and are hosting/listening.) 
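The Timeserver interaction in the update_cycle() steps above amounts to a freshness check on a signed time attestation: the attestation must echo the nonce the ECU last sent, carry a valid signature from the known Timeserver key, and report a time newer than the last trusted time. A minimal sketch of that check follows; the function name and the dict shape of the attestation are illustrative assumptions, not the reference implementation's API, and signature verification itself is elided (the caller passes in its result):

```python
# Illustrative sketch of the time-attestation check described above.
# The function name and argument shapes are hypothetical, not uptane's
# real API; signature checking is assumed to be done by the caller.

def validate_time_attestation(attestation, signature_is_valid, sent_nonce,
                              last_validated_time):
    """
    attestation: dict with 'time' (a comparable timestamp) and 'nonces' (list).
    Returns the new trusted time, or raises ValueError if any check fails.
    """
    # 1. The signature must be valid and from the known Timeserver key.
    if not signature_is_valid:
        raise ValueError('bad signature on time attestation')
    # 2. The attestation must echo the nonce this ECU last sent, proving the
    #    attestation is at least as recent as that request.
    if sent_nonce not in attestation['nonces']:
        raise ValueError('our nonce is missing from the attestation')
    # 3. Time may only move forward; never accept an older time.
    if attestation['time'] <= last_validated_time:
        raise ValueError('attestation time is not newer than the last one')
    return attestation['time']

# Example: accept a fresh attestation that echoes our nonce (9).
trusted = validate_time_attestation(
    {'time': 100, 'nonces': [5, 9]}, True, 9, 50)  # trusted is now 100
```

With a trusted time in hand, an ECU can then reject any metadata whose expiration date is earlier than that time.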
Here, we start a single Secondary ECU and generate a signed ECU Manifest with information about the "firmware" that it is running, which we send to the -Primary. +Primary. Note that these instructions will demonstrate a full-verification +Secondary; for partial-verification Secondaries, see +[this section below](#partial-verification-secondaries). Open a Python shell in a new terminal window and then run the following: - ```python >>> import demo.demo_secondary as ds >>> ds.clean_slate() @@ -194,8 +202,17 @@ If the Secondary is in a different vehicle from the default vehicle, this call s The Secondary's update_cycle() call: - fetches and validates the signed metadata for the vehicle from the Primary -- fetches any image that the Primary assigns to this ECU, validating that against the instructions of the Director in the Director's metadata, and against file info available in the Image Repository's metadata. If the image from the Primary does not match validated metadata, it is discarded. -- fetches the latest Timeserver attestation from the Primary, checking for the nonce this Secondary last sent. If that nonce is included in the signed attestation from the Timeserver and the signature checks out, this time is saved as valid and reasonably recent. +- looks for target info (hash, length, etc.) in the now-validated Targets +metadata that includes this Secondary's ECU identifier, indicating an +instruction from the Director to install that target firmware +- fetches any image that the Primary assigns to this ECU, validating that +against the target info from the validated Director metadata. If the image from +the Primary does not match validated metadata, it is discarded. +- fetches the latest Timeserver attestation from the Primary, checking for the nonce this Secondary last sent. 
If that nonce is included in the signed attestation +from the Timeserver, the signature is correct and made by the expected key, and +the time is more recent than the last validated time, this time is saved as valid, +allowing this Secondary to disregard any metadata with an expiration date +earlier than that time. - generates an ECU Version Manifest that indicates the secure hash of the image currently installed on this Secondary, the latest validated times, and a string describing attacks detected (can also be called directly: `ds.generate_signed_ecu_manifest()`) - submits the ECU Version Manifest to the Primary (can also be called directly: `ds.submit_ecu_manifest_to_primary()`) @@ -609,6 +626,56 @@ have been saved by the Primary. ``` +# Partial-Verification Secondaries +The Secondaries described above ran full verification, applying +the full suite of TUF and Uptane security checks. Uptane provides for a +less demanding alternative: partial-verification Secondaries. These perform +a few checks instead of the full set, at a measured cost to security, allowing +weaker ECUs to still operate as Uptane-conformant Secondaries. + +The distinction is defined +[in Section 8.3 of the Implementation Specification](https://docs.google.com/document/d/1wjg3hl0iDLNh7jIRaHl3IXhwm0ssOtDje5NemyTBcaw/edit?pli=1#heading=h.22u629s8u37q). +Briefly, partial verification entails one signature check to validate metadata, +and one signature check whenever the minimum time is ratcheted forward (for +metadata expiration purposes). Partial-verification Secondaries need to know +the Director's Targets role public key and the Timeserver's public key. + +You can run a demo partial-verification Secondary like so +(make sure that [the Uptane services](#window-1-the-uptane-services) and +[a Primary client](#window-2-the-primary-clients) are both running):
+ +```python +>>> import demo.demo_secondary as ds +>>> ds.clean_slate(partial_verifying=True) +>>> ds.update_cycle() +``` + +The same optional arguments apply for the demo partial-verification Secondary +as for the full-verification version [above](#window-3-the-secondary-clients) +(ECU id/serial, vehicle id/VIN, Primary port, etc.). + + +The Secondary's update_cycle() call does the following: +- fetches the signed Director's Targets role metadata for the vehicle from the +Primary +- checks for a valid signature over the Targets role metadata, from the key it +knows to be the Director's Targets key +- looks for target info (hash, length, etc.) in the now-validated Targets +metadata that includes this Secondary's ECU identifier, indicating an +instruction from the Director to install that target firmware +- fetches the image the Primary assigns to this ECU +- validates that image against the target info from the validated Targets +metadata. If the image from the Primary does not match validated metadata, it is discarded. +- fetches the latest Timeserver attestation from the Primary, checking for the nonce this Secondary last sent. If that nonce is included in the signed attestation from the Timeserver, the signature is correct and made by the expected key, and the time +is more recent than the last validated time, this time is saved as valid, +allowing this Secondary to disregard any metadata with an expiration date +earlier than that time.
+- generates an ECU Version Manifest that indicates the secure hash of the image currently installed on this Secondary, the latest validated times, and a string describing attacks detected (can also be run directly: `ds.generate_signed_ecu_manifest()`) +- submits the ECU Version Manifest to the Primary (can also be run directly: `ds.submit_ecu_manifest_to_primary()`) + + + + # Testing diff --git a/demo/demo_secondary.py b/demo/demo_secondary.py index bba2027..e2bd640 100644 --- a/demo/demo_secondary.py +++ b/demo/demo_secondary.py @@ -79,7 +79,8 @@ def clean_slate( vin=_vin, ecu_serial=_ecu_serial, primary_host=None, - primary_port=None): + primary_port=None, + partial_verifying=False): """ """ @@ -91,7 +92,6 @@ def clean_slate( global nonce global CLIENT_DIRECTORY global attacks_detected - _vin = vin _ecu_serial = ecu_serial @@ -142,6 +142,11 @@ def clean_slate( tuf.conf.repository_directory = CLIENT_DIRECTORY # This setting should probably be called CLIENT_DIRECTORY instead, post-TAP4. + # Import the Director's public key if this Secondary is partial-verification. + if partial_verifying: + key_director_pub = demo.import_public_key('director') + else: + key_director_pub = None # Initialize a full verification Secondary ECU. # This also generates a nonce to use in the next time query, sets the initial @@ -154,7 +159,9 @@ def clean_slate( ecu_key=ecu_key, time=clock, firmware_fileinfo=factory_firmware_fileinfo, - timeserver_public_key=key_timeserver_pub) + timeserver_public_key=key_timeserver_pub, + director_public_key=key_director_pub, + partial_verifying=partial_verifying) @@ -300,9 +307,14 @@ def update_cycle(): # from it like so: time_attestation = time_attestation.data - # Download the metadata from the Primary in the form of an archive. This - # returns the binary data that we need to write to file.
- metadata_archive = pserver.get_metadata(secondary_ecu.ecu_serial) + # Obtain metadata from the Primary, either as a single role file (the + # Director Targets role file) if this is a partial-verification Secondary, + # or as an archive that includes all the metadata files if this is a full- + # verification Secondary. + # This call returns the binary data that we need to write to the file. + + metadata_from_primary = pserver.get_metadata( + secondary_ecu.ecu_serial, secondary_ecu.partial_verifying) # Validate the time attestation and internalize the time. Continue # regardless. @@ -320,16 +332,23 @@ def update_cycle(): #else: # print(GREEN + 'Official time has been updated successfully.' + ENDCOLORS) - # Dump the archive file to disk. - archive_fname = os.path.join( - secondary_ecu.full_client_dir, 'metadata_archive.zip') - - with open(archive_fname, 'wb') as fobj: - fobj.write(metadata_archive.data) - - # Now tell the Secondary reference implementation code where the archive file - # is and let it expand and validate the metadata. - secondary_ecu.process_metadata(archive_fname) + # Write the metadata retrieved from the Primary to disk, whether it is a + # single role file (partial verification) or the full metadata archive. + if secondary_ecu.partial_verifying: + director_targets_role = os.path.join( + secondary_ecu.full_client_dir, 'director_targets.'+tuf.conf.METADATA_FORMAT) + with open(director_targets_role, 'wb') as f: + f.write(metadata_from_primary.data) + secondary_ecu.process_metadata(director_targets_role) + else: + archive_fname = os.path.join( + secondary_ecu.full_client_dir, 'metadata_from_primary.zip') + with open(archive_fname, 'wb') as fobj: + fobj.write(metadata_from_primary.data) + + # Now tell the Secondary reference implementation code where the archive + # file is and let it expand (as necessary) and validate the metadata. 
+ secondary_ecu.process_metadata(archive_fname) # As part of the process_metadata call, the secondary will have saved diff --git a/samples/director_targets_bad_sig_v2.der b/samples/director_targets_bad_sig_v2.der new file mode 100644 index 0000000..db7f61f Binary files /dev/null and b/samples/director_targets_bad_sig_v2.der differ diff --git a/samples/director_targets_bad_sig_v2.json b/samples/director_targets_bad_sig_v2.json new file mode 100644 index 0000000..cb92e95 --- /dev/null +++ b/samples/director_targets_bad_sig_v2.json @@ -0,0 +1,30 @@ +{ + "signatures": [ + { + "keyid": "630cf584f392430b2119a4395e39624e86f5e5c5374507a789be5cf35bf090d6", + "method": "ed25519", + "sig": "a560312be8ebd451c689724a3ec2fcbdb7ba708b27669861e0146bd61a2a15f04baf90edc75edd2e437ac680a5d2b3d76b3e4b9766a8a961108af6b4e501950e" + } + ], + "signed": { + "_type": "Targets", + "delegations": { + "keys": {}, + "roles": [] + }, + "expires": "2021-01-10T15:14:21Z", + "targets": { + "/firmware13.img": { + "custom": { + "ecu_serial": "20000" + }, + "hashes": { + "sha256": "daeec2555599b8e7a82b6f1339d5f419346b57a0eb7a39ee6334b8f205595752", + "sha512": "1570937a84e9e74e35f5e56a8f8518c91e18258cffc8ace249d9acf173d1845d82002583b1b53373b79edf52494d524f2619f5f1896f3085038deca92c950486" + }, + "length": 20 + } + }, + "version": 2 + } +} diff --git a/samples/director_targets_empty_v1.der b/samples/director_targets_empty_v1.der new file mode 100644 index 0000000..99c8f2e Binary files /dev/null and b/samples/director_targets_empty_v1.der differ diff --git a/samples/director_targets_empty_v1.json b/samples/director_targets_empty_v1.json new file mode 100644 index 0000000..d1d0018 --- /dev/null +++ b/samples/director_targets_empty_v1.json @@ -0,0 +1,19 @@ +{ + "signatures": [ + { + "keyid": "630cf584f392430b2119a4395e39624e86f5e5c5374507a789be5cf35bf090d6", + "method": "ed25519", + "sig": 
"f474702d61728fbfa01cc6d9b6e9b6681710680927efc2cdcbae1e758b89ddb355c8ce2c636ab26160040c9a575265b81311a4e40349463f34e784b48d4e850e" + } + ], + "signed": { + "_type": "Targets", + "delegations": { + "keys": {}, + "roles": [] + }, + "expires": "2031-10-21T18:00:34Z", + "targets": {}, + "version": 1 + } +} \ No newline at end of file diff --git a/samples/director_targets_empty_v2.der b/samples/director_targets_empty_v2.der new file mode 100644 index 0000000..a733913 Binary files /dev/null and b/samples/director_targets_empty_v2.der differ diff --git a/samples/director_targets_empty_v2.json b/samples/director_targets_empty_v2.json new file mode 100644 index 0000000..2eba167 --- /dev/null +++ b/samples/director_targets_empty_v2.json @@ -0,0 +1,19 @@ +{ + "signatures": [ + { + "keyid": "630cf584f392430b2119a4395e39624e86f5e5c5374507a789be5cf35bf090d6", + "method": "ed25519", + "sig": "3b6f11eb9382fdbdbbcce54641617d8ea159d065f70484a3b013641ef22619902b252692286f6741e1fd9d68b3805dc727136b0b3ccc29c51b4bc97ecaf2f708" + } + ], + "signed": { + "_type": "Targets", + "delegations": { + "keys": {}, + "roles": [] + }, + "expires": "2031-10-21T18:00:34Z", + "targets": {}, + "version": 2 + } +} \ No newline at end of file diff --git a/samples/director_targets_empty_v3.der b/samples/director_targets_empty_v3.der new file mode 100644 index 0000000..4a5d18f Binary files /dev/null and b/samples/director_targets_empty_v3.der differ diff --git a/samples/director_targets_empty_v3.json b/samples/director_targets_empty_v3.json new file mode 100644 index 0000000..4b976d9 --- /dev/null +++ b/samples/director_targets_empty_v3.json @@ -0,0 +1,19 @@ +{ + "signatures": [ + { + "keyid": "630cf584f392430b2119a4395e39624e86f5e5c5374507a789be5cf35bf090d6", + "method": "ed25519", + "sig": "a36a0816ffe796a451d431a4f9b9f5df350650c69c36bf22397631c4418f6778f5da01e302d19e565638616cb0c32dcc3caf3cc0e92741a0a284a8b9cf62480a" + } + ], + "signed": { + "_type": "Targets", + "delegations": { + "keys": {}, + "roles": 
[] + }, + "expires": "2031-10-21T18:00:34Z", + "targets": {}, + "version": 3 + } +} \ No newline at end of file diff --git a/samples/director_targets_expired_v1.der b/samples/director_targets_expired_v1.der new file mode 100644 index 0000000..950695b Binary files /dev/null and b/samples/director_targets_expired_v1.der differ diff --git a/samples/director_targets_expired_v1.json b/samples/director_targets_expired_v1.json new file mode 100644 index 0000000..eb89162 --- /dev/null +++ b/samples/director_targets_expired_v1.json @@ -0,0 +1,19 @@ +{ + "signatures": [ + { + "keyid": "630cf584f392430b2119a4395e39624e86f5e5c5374507a789be5cf35bf090d6", + "method": "ed25519", + "sig": "fb26d9d8f9314175cc7d6cfdcc0628a57f24f9ef43b84a34496b06c7e53864f6581e02a8589fc307603e9de0275ba0edce8044a010666f7f72ca70c423b4e10d" + } + ], + "signed": { + "_type": "Targets", + "delegations": { + "keys": {}, + "roles": [] + }, + "expires": "2018-02-21T16:14:48Z", + "targets": {}, + "version": 1 + } +} \ No newline at end of file diff --git a/samples/director_targets_expired_v2.der b/samples/director_targets_expired_v2.der new file mode 100644 index 0000000..d70ea9b Binary files /dev/null and b/samples/director_targets_expired_v2.der differ diff --git a/samples/director_targets_expired_v2.json b/samples/director_targets_expired_v2.json new file mode 100644 index 0000000..7ff2ada --- /dev/null +++ b/samples/director_targets_expired_v2.json @@ -0,0 +1,19 @@ +{ + "signatures": [ + { + "keyid": "630cf584f392430b2119a4395e39624e86f5e5c5374507a789be5cf35bf090d6", + "method": "ed25519", + "sig": "2f43ed700ee2205de1ffea87b7008ecb1bac8777895f8565cfd23c3a8b32fd9ac882f5e94b772ff0e9ad10c49f6be14cba65b39c95eeae944fa5831d6de23a0e" + } + ], + "signed": { + "_type": "Targets", + "delegations": { + "keys": {}, + "roles": [] + }, + "expires": "2018-02-21T17:36:11Z", + "targets": {}, + "version": 2 + } +} \ No newline at end of file diff --git a/samples/director_targets_expired_v3.der 
b/samples/director_targets_expired_v3.der new file mode 100644 index 0000000..6f35e23 Binary files /dev/null and b/samples/director_targets_expired_v3.der differ diff --git a/samples/director_targets_expired_v3.json b/samples/director_targets_expired_v3.json new file mode 100644 index 0000000..d2ecc3f --- /dev/null +++ b/samples/director_targets_expired_v3.json @@ -0,0 +1,19 @@ +{ + "signatures": [ + { + "keyid": "630cf584f392430b2119a4395e39624e86f5e5c5374507a789be5cf35bf090d6", + "method": "ed25519", + "sig": "41b105b7af297ce24c58971af31df3cb80d28a5464eeb7424b3c9e57f4e955aa9f688eed0dd0399b82b888ea52cc4309d8380bd7f37aeae42778741686d45f03" + } + ], + "signed": { + "_type": "Targets", + "delegations": { + "keys": {}, + "roles": [] + }, + "expires": "2018-02-21T17:36:11Z", + "targets": {}, + "version": 3 + } +} \ No newline at end of file diff --git a/samples/director_targets_pv_bcu_v2.der b/samples/director_targets_pv_bcu_v2.der new file mode 100644 index 0000000..ec1aaa0 Binary files /dev/null and b/samples/director_targets_pv_bcu_v2.der differ diff --git a/samples/director_targets_pv_bcu_v2.json b/samples/director_targets_pv_bcu_v2.json new file mode 100644 index 0000000..17f2ed3 --- /dev/null +++ b/samples/director_targets_pv_bcu_v2.json @@ -0,0 +1,30 @@ +{ + "signatures": [ + { + "keyid": "630cf584f392430b2119a4395e39624e86f5e5c5374507a789be5cf35bf090d6", + "method": "ed25519", + "sig": "22823ee73a94b427860247b964a1fcfba15f23cdd294919e6e5687b293eccfc210be49a362299b5a8157b5601fa1a7dc41d3f82b3a026804b15ebfee3e420d0a" + } + ], + "signed": { + "_type": "Targets", + "delegations": { + "keys": {}, + "roles": [] + }, + "expires": "2031-10-22T19:07:35Z", + "targets": { + "/BCU1.0.txt": { + "custom": { + "ecu_serial": "pv_bcu" + }, + "hashes": { + "sha256": "fb0aa5699a4e7b68009fed6b094ecb00c3ad5670921be1b902b72a23cd4675b1", + "sha512": "0b0bb00bccf7bdad519d0a0af2794c945bd51ebdbc79f9616f0e3903b32f4ce2d5b250ab1bc2d34194bacf720b4f0aed361ef8d59ac72b1bc19e3a223a5e87cd" + }, + 
"length": 15 + } + }, + "version": 2 + } +} \ No newline at end of file diff --git a/tests/test_secondary.py b/tests/test_secondary.py index ab2abe1..b910fe9 100644 --- a/tests/test_secondary.py +++ b/tests/test_secondary.py @@ -37,6 +37,23 @@ # For temporary convenience: import demo # for generate_key, import_public_key, import_private_key +# TODO: Test data directories are somewhat more convoluted than necessary. +# The tests/test_data/ directory (TEST_DATA_DIR) contains: +# - director_metadata and image_repo_metadata directories, which each contain +# only root.json and root.der, sane files for use in testing. +# - flawed_manifests (with correct and various flawed vehicle and ECU +# manifests) +# - pinned.json, a sane pinning file for use in testing +# - temporary directories created during testing: +# - temp_test_secondary0, temp_test_partial_secondary0, etc. +# - temp_test_common, which seems to be unused and persists...? +# +# The samples/ directory (SAMPLE_DATA_DIR) contains snapshots of all repository +# metadata files from both repositories in a few states, with distant +# expiration dates (decades). It also contains a variety of samples of +# manifests and time attestations, along with flawed samples (expired, bad +# signatures, etc.) for both human consumption and testing purposes. +# TEST_DATA_DIR = os.path.join(uptane.WORKING_DIR, 'tests', 'test_data') TEST_DIRECTOR_METADATA_DIR = os.path.join(TEST_DATA_DIR, 'director_metadata') TEST_IMAGE_REPO_METADATA_DIR = os.path.join( @@ -46,21 +63,42 @@ TEST_IMAGE_REPO_ROOT_FNAME = os.path.join( TEST_IMAGE_REPO_METADATA_DIR, 'root.' 
+ tuf.conf.METADATA_FORMAT) TEST_PINNING_FNAME = os.path.join(TEST_DATA_DIR, 'pinned.json') - -TEMP_CLIENT_DIRS = [ - os.path.join(TEST_DATA_DIR, 'temp_test_secondary0'), - os.path.join(TEST_DATA_DIR, 'temp_test_secondary1'), - os.path.join(TEST_DATA_DIR, 'temp_test_secondary2')] - -# I'll initialize these in the __init__ test, and use this for the simple -# non-damaging tests so as to avoid creating objects all over again. -secondary_instances = [None, None, None] - -# Changing these values would require producing new signed test data from the -# Timeserver (in the case of nonce) or a Secondary (in the case of the others). +SAMPLE_DATA_DIR = os.path.join(uptane.WORKING_DIR, 'samples') + +# For each Secondary instance we'll use in testing, a dictionary recording the +# client directory, whether or not the instance is partial-verifying, the +# vehicle's ID, the Secondary's ID, and a reference to the instance. +# Also note the nonce we'll use when validating sample time attestation data. +# Changing the nonce or the other values would require producing new signed +# test data from the Timeserver (in the case of the nonce) or a Secondary (in +# the case of the others).
nonce = 5 -vins = ['democar', 'democar', '000'] -ecu_serials = ['TCUdemocar', '00000', '00000'] +TEST_INSTANCES = [ + { + 'client_dir': os.path.join(TEST_DATA_DIR, 'temp_secondary0'), + 'partial_verifying': False, + 'vin': 'democar', + 'ecu_serial': 'TCUdemocar', + 'instance': None}, + { + 'client_dir': os.path.join(TEST_DATA_DIR, 'temp_secondary1'), + 'partial_verifying': False, + 'vin': 'democar', + 'ecu_serial': '00000', + 'instance': None}, + { + 'client_dir': os.path.join(TEST_DATA_DIR, 'temp_secondary2'), + 'partial_verifying': False, + 'vin': '000', + 'ecu_serial': '00000', + 'instance': None}, + { + 'client_dir': os.path.join(TEST_DATA_DIR, 'temp_partial_secondary0'), + 'partial_verifying': True, + 'vin': 'vehicle_w_pv_bcu', + 'ecu_serial': 'pv_bcu', + 'instance': None}] + # Set starting firmware fileinfo (that this ECU had coming from the factory) # It will serve as the initial firmware state for the Secondary clients. @@ -84,9 +122,9 @@ def destroy_temp_dir(): # Clean up anything that may currently exist in the temp test directories. - for client_dir in TEMP_CLIENT_DIRS: - if os.path.exists(client_dir): - shutil.rmtree(client_dir) + for instance_data in TEST_INSTANCES: + if os.path.exists(instance_data['client_dir']): + shutil.rmtree(instance_data['client_dir']) @@ -147,9 +185,9 @@ def setUpClass(cls): # We're going to cheat in this test module for the purpose of testing # and update tuf.conf.repository_directories before each Secondary is # created, to refer to the client we're creating. 
- for client_dir in TEMP_CLIENT_DIRS: + for instance_data in TEST_INSTANCES: uptane.common.create_directory_structure_for_client( - client_dir, + instance_data['client_dir'], TEST_PINNING_FNAME, {'imagerepo': TEST_IMAGE_REPO_ROOT_FNAME, 'director': TEST_DIRECTOR_ROOT_FNAME}) @@ -186,8 +224,8 @@ def test_01_init(self): secondary.Secondary( full_client_dir=42, director_repo_name=demo.DIRECTOR_REPO_NAME, - vin=vins[0], - ecu_serial=ecu_serials[0], + vin=TEST_INSTANCES[0]['vin'], + ecu_serial=TEST_INSTANCES[0]['ecu_serial'], ecu_key=TestSecondary.secondary_ecu_key, time=TestSecondary.initial_time, timeserver_public_key=TestSecondary.key_timeserver_pub, @@ -203,10 +241,10 @@ def test_01_init(self): # Invalid director_repo_name with self.assertRaises(tuf.FormatError): secondary.Secondary( - full_client_dir=TEMP_CLIENT_DIRS[0], + full_client_dir=TEST_INSTANCES[0]['client_dir'], director_repo_name=42, - vin=vins[0], - ecu_serial=ecu_serials[0], + vin=TEST_INSTANCES[0]['vin'], + ecu_serial=TEST_INSTANCES[0]['ecu_serial'], ecu_key=TestSecondary.secondary_ecu_key, time=TestSecondary.initial_time, timeserver_public_key=TestSecondary.key_timeserver_pub, @@ -217,10 +255,10 @@ def test_01_init(self): # Unknown director_repo_name with self.assertRaises(uptane.Error): secondary.Secondary( - full_client_dir=TEMP_CLIENT_DIRS[0], + full_client_dir=TEST_INSTANCES[0]['client_dir'], director_repo_name='string_that_is_not_a_known_repo_name', - vin=vins[0], - ecu_serial=ecu_serials[0], + vin=TEST_INSTANCES[0]['vin'], + ecu_serial=TEST_INSTANCES[0]['ecu_serial'], ecu_key=TestSecondary.secondary_ecu_key, time=TestSecondary.initial_time, timeserver_public_key=TestSecondary.key_timeserver_pub, @@ -231,10 +269,10 @@ def test_01_init(self): # Invalid VIN: with self.assertRaises(tuf.FormatError): secondary.Secondary( - full_client_dir=TEMP_CLIENT_DIRS[0], + full_client_dir=TEST_INSTANCES[0]['client_dir'], director_repo_name=demo.DIRECTOR_REPO_NAME, vin=5, - ecu_serial=ecu_serials[0], + 
ecu_serial=TEST_INSTANCES[0]['ecu_serial'], ecu_key=TestSecondary.secondary_ecu_key, time=TestSecondary.initial_time, timeserver_public_key=TestSecondary.key_timeserver_pub, @@ -245,9 +283,9 @@ def test_01_init(self): # Invalid ECU Serial with self.assertRaises(tuf.FormatError): secondary.Secondary( - full_client_dir=TEMP_CLIENT_DIRS[0], + full_client_dir=TEST_INSTANCES[0]['client_dir'], director_repo_name=demo.DIRECTOR_REPO_NAME, - vin=vins[0], + vin=TEST_INSTANCES[0]['vin'], ecu_serial=500, ecu_key=TestSecondary.secondary_ecu_key, time=TestSecondary.initial_time, @@ -258,10 +296,10 @@ def test_01_init(self): # Invalid ECU Key secondary.Secondary( - full_client_dir=TEMP_CLIENT_DIRS[0], + full_client_dir=TEST_INSTANCES[0]['client_dir'], director_repo_name=demo.DIRECTOR_REPO_NAME, - vin=vins[0], - ecu_serial=ecu_serials[0], + vin=TEST_INSTANCES[0]['vin'], + ecu_serial=TEST_INSTANCES[0]['ecu_serial'], ecu_key={''}, time=TestSecondary.initial_time, timeserver_public_key=TestSecondary.key_timeserver_pub, @@ -272,10 +310,10 @@ def test_01_init(self): # Invalid initial time: with self.assertRaises(tuf.FormatError): secondary.Secondary( - full_client_dir=TEMP_CLIENT_DIRS[0], + full_client_dir=TEST_INSTANCES[0]['client_dir'], director_repo_name=demo.DIRECTOR_REPO_NAME, - vin=vins[0], - ecu_serial=ecu_serials[0], + vin=TEST_INSTANCES[0]['vin'], + ecu_serial=TEST_INSTANCES[0]['ecu_serial'], ecu_key=TestSecondary.secondary_ecu_key, time='potato', timeserver_public_key=TestSecondary.key_timeserver_pub, @@ -286,10 +324,10 @@ def test_01_init(self): # Invalid director_public_key: with self.assertRaises(tuf.FormatError): secondary.Secondary( - full_client_dir=TEMP_CLIENT_DIRS[0], + full_client_dir=TEST_INSTANCES[0]['client_dir'], director_repo_name=demo.DIRECTOR_REPO_NAME, - vin=vins[0], - ecu_serial=ecu_serials[0], + vin=TEST_INSTANCES[0]['vin'], + ecu_serial=TEST_INSTANCES[0]['ecu_serial'], ecu_key=TestSecondary.secondary_ecu_key, time=TestSecondary.initial_time,
timeserver_public_key=TestSecondary.key_timeserver_pub, @@ -304,10 +342,10 @@ def test_01_init(self): # for full verification are determined based on the root metadata file. with self.assertRaises(uptane.Error): secondary.Secondary( - full_client_dir=TEMP_CLIENT_DIRS[0], + full_client_dir=TEST_INSTANCES[0]['client_dir'], director_repo_name=demo.DIRECTOR_REPO_NAME, - vin=vins[0], - ecu_serial=ecu_serials[0], + vin=TEST_INSTANCES[0]['vin'], + ecu_serial=TEST_INSTANCES[0]['ecu_serial'], ecu_key=TestSecondary.secondary_ecu_key, time=TestSecondary.initial_time, timeserver_public_key=TestSecondary.key_timeserver_pub, @@ -316,10 +354,10 @@ def test_01_init(self): partial_verifying=False) with self.assertRaises(uptane.Error): secondary.Secondary( - full_client_dir=TEMP_CLIENT_DIRS[0], + full_client_dir=TEST_INSTANCES[0]['client_dir'], director_repo_name=demo.DIRECTOR_REPO_NAME, - vin=vins[0], - ecu_serial=ecu_serials[0], + vin=TEST_INSTANCES[0]['vin'], + ecu_serial=TEST_INSTANCES[0]['ecu_serial'], ecu_key=TestSecondary.secondary_ecu_key, time=TestSecondary.initial_time, timeserver_public_key=TestSecondary.key_timeserver_pub, @@ -331,10 +369,10 @@ def test_01_init(self): # Invalid timeserver key with self.assertRaises(tuf.FormatError): secondary.Secondary( - full_client_dir=TEMP_CLIENT_DIRS[0], + full_client_dir=TEST_INSTANCES[0]['client_dir'], director_repo_name=demo.DIRECTOR_REPO_NAME, - vin=vins[0], - ecu_serial=ecu_serials[0], + vin=TEST_INSTANCES[0]['vin'], + ecu_serial=TEST_INSTANCES[0]['ecu_serial'], ecu_key=TestSecondary.secondary_ecu_key, time=TestSecondary.initial_time, timeserver_public_key=TestSecondary.initial_time, # INVALID @@ -356,16 +394,23 @@ def test_01_init(self): # Initialize three clients and perform checks on each of them. 
- for i in range(0, len(TEMP_CLIENT_DIRS)): - client_dir = TEMP_CLIENT_DIRS[i] - ecu_serial = ecu_serials[i] - vin = vins[i] + for instance_data in TEST_INSTANCES: + client_dir = instance_data['client_dir'] + ecu_serial = instance_data['ecu_serial'] + vin = instance_data['vin'] + + # Partial verification Secondaries need to be initialized with the + # Director's public key. + if instance_data['partial_verifying']: + director_public_key_for_ecu = self.key_directortargets_pub + else: + director_public_key_for_ecu = None # Try initializing each of three secondaries, expecting these calls to - # work. Save the instances for future tests as elements in a module list - # variable(secondary_instances) to save time and code. + # work. Save the instances for future tests as elements in a module + # variable (TEST_INSTANCES) to save time and code. tuf.conf.repository_directory = client_dir - secondary_instances[i] = secondary.Secondary( + instance_data['instance'] = secondary.Secondary( full_client_dir=client_dir, director_repo_name=demo.DIRECTOR_REPO_NAME, vin=vin, @@ -374,10 +419,9 @@ def test_01_init(self): time=TestSecondary.initial_time, timeserver_public_key=TestSecondary.key_timeserver_pub, firmware_fileinfo=factory_firmware_fileinfo, - director_public_key=None, - partial_verifying=False) - - instance = secondary_instances[i] + director_public_key=director_public_key_for_ecu, + partial_verifying=instance_data['partial_verifying']) + instance = instance_data['instance'] # Check the fields initialized in the instance to make sure they're correct. 
@@ -393,11 +437,10 @@ def test_01_init(self): TestSecondary.initial_time, instance.all_valid_timeserver_times[1]) self.assertEqual( TestSecondary.key_timeserver_pub, instance.timeserver_public_key) - self.assertTrue(None is instance.director_public_key) - self.assertFalse(instance.partial_verifying) + # Fields initialized, but not directly with parameters - self.assertTrue(None is instance.last_nonce_sent) + self.assertIsNone(instance.last_nonce_sent) self.assertTrue(instance.nonce_next) # Random value self.assertIsInstance( instance.updater, tuf.client.updater.Updater) @@ -442,7 +485,7 @@ def test_10_nonce_rotation(self): """ # We'll just test one of the three client instances, since it shouldn't # make a difference. - instance = secondary_instances[0] + instance = TEST_INSTANCES[0]['instance'] old_nonce = instance.nonce_next @@ -465,7 +508,7 @@ def test_20_validate_time_attestation(self): # We'll just test one of the three client instances, since it shouldn't # make a difference. - instance = secondary_instances[0] + instance = TEST_INSTANCES[0]['instance'] # Try a valid time attestation first, signed by an expected timeserver key, # with an expected nonce (previously "received" from a Secondary) @@ -533,6 +576,14 @@ def test_20_validate_time_attestation(self): instance.validate_time_attestation(time_attestation__wrongnonce) + # Conduct one test with a different secondary instance: + # Expect that if a time attestation is submitted to be validated by a + # Secondary that hasn't ever sent a nonce, the validation function will + # reject the time attestation. (Because it doesn't matter, we'll use the + # same sensible time attestation previously generated in this test func.) + with self.assertRaises(uptane.BadTimeAttestation): + TEST_INSTANCES[1]['instance'].validate_time_attestation(time_attestation) + # TODO: Consider other tests here. 
@@ -546,7 +597,7 @@ def test_25_generate_signed_ecu_manifest(self): # We'll just test one of the three client instances, since it shouldn't # make a difference. - ecu_manifest = secondary_instances[0].generate_signed_ecu_manifest() + ecu_manifest = TEST_INSTANCES[0]['instance'].generate_signed_ecu_manifest() # If the ECU Manifest is in DER format, check its format and then # convert back to JSON so that we can inspect it further. @@ -584,9 +635,9 @@ def test_40_process_metadata(self): Tests uptane.clients.secondary.Secondary::process_metadata() Tests three clients: - - secondary_instances[0]: an update is provided in Director metadata - - secondary_instances[1]: no update is provided in Director metadata - - secondary_instances[2]: no Director metadata can be retrieved + - TEST_INSTANCES[0]: an update is provided in Director metadata + - TEST_INSTANCES[1]: no update is provided in Director metadata + - TEST_INSTANCES[2]: no Director metadata can be retrieved """ # --- Test this test module's setup (defensive) @@ -606,19 +657,19 @@ def test_40_process_metadata(self): # client directories when the directories were created by the # create_directory_structure_for_client() calls in setUpClass above, and # only the root metadata file. - for client_dir in TEMP_CLIENT_DIRS: + for instance_data in TEST_INSTANCES: for repo in ['director', 'imagerepo']: self.assertEqual( ['root.' 
+ tuf.conf.METADATA_FORMAT], sorted(os.listdir(os.path.join( - client_dir, 'metadata', repo, 'current')))) + instance_data['client_dir'], 'metadata', repo, 'current')))) # --- Set up this test # Location of the sample Primary-produced metadata archive - sample_archive_fname = os.path.join( - uptane.WORKING_DIR, 'samples', 'metadata_samples_long_expiry', - 'update_to_one_ecu', 'full_metadata_archive.zip') + sample_archive_fname = os.path.join(SAMPLE_DATA_DIR, + 'metadata_samples_long_expiry', 'update_to_one_ecu', + 'full_metadata_archive.zip') assert os.path.exists(sample_archive_fname), 'Cannot test ' \ 'process_metadata; unable to find expected sample metadata archive' + \ @@ -626,9 +677,14 @@ def test_40_process_metadata(self): # Continue set-up followed by the test, per client. - for i in range(0, len(TEMP_CLIENT_DIRS)): - client_dir = TEMP_CLIENT_DIRS[i] - instance = secondary_instances[i] + # Only tests the full verification secondaries + for instance_data in TEST_INSTANCES: + + if instance_data['partial_verifying']: + continue + + client_dir = instance_data['client_dir'] + instance = instance_data['instance'] # Make sure TUF uses the right client directory. # Hack to allow multiple clients to run in the same Python process. @@ -646,7 +702,7 @@ def test_40_process_metadata(self): # Process this sample metadata. - if instance is secondary_instances[2]: + if instance_data is TEST_INSTANCES[2]: # Expect the update to fail for the third Secondary client. with self.assertRaises(tuf.NoWorkingMirrorError): instance.process_metadata(archive_fname) @@ -668,15 +724,15 @@ def test_40_process_metadata(self): # For clients 0 and 1, we expect root, snapshot, targets, and timestamp for # both director and image repo. - for client_dir in [TEMP_CLIENT_DIRS[0], TEMP_CLIENT_DIRS[1]]: + for instance_data in TEST_INSTANCES[0:2]: for repo in ['director', 'imagerepo']: self.assertEqual([ 'root.' + tuf.conf.METADATA_FORMAT, 'snapshot.' + tuf.conf.METADATA_FORMAT, 'targets.' 
+ tuf.conf.METADATA_FORMAT, 'timestamp.' + tuf.conf.METADATA_FORMAT], - sorted(os.listdir(os.path.join(client_dir, 'metadata', repo, - 'current')))) + sorted(os.listdir(os.path.join(instance_data['client_dir'], + 'metadata', repo, 'current')))) # For client 2, we are certain that Director metadata will have failed to # update. Image Repository metadata may or may not have updated before the @@ -685,8 +741,8 @@ def test_40_process_metadata(self): # we expect to find. self.assertEqual( ['root.' + tuf.conf.METADATA_FORMAT], - sorted(os.listdir(os.path.join(TEMP_CLIENT_DIRS[2], 'metadata', - 'director', 'current')))) + sorted(os.listdir(os.path.join(TEST_INSTANCES[2]['client_dir'], + 'metadata', 'director', 'current')))) # Second: Check targets each Secondary client has been instructed to @@ -694,42 +750,189 @@ def test_40_process_metadata(self): # Client 0 should have validated expected_updated_fileinfo. self.assertEqual( expected_updated_fileinfo, - secondary_instances[0].validated_targets_for_this_ecu[0]) + TEST_INSTANCES[0]['instance'].validated_targets_for_this_ecu[0]) # Clients 1 and 2 should have no validated targets. - self.assertFalse(secondary_instances[1].validated_targets_for_this_ecu) - self.assertFalse(secondary_instances[2].validated_targets_for_this_ecu) + self.assertFalse(TEST_INSTANCES[1]['instance'].validated_targets_for_this_ecu) + self.assertFalse(TEST_INSTANCES[2]['instance'].validated_targets_for_this_ecu) # Finally, test behavior if the file we indicate does not exist. 
- instance = secondary_instances[0] + instance = TEST_INSTANCES[0]['instance'] + with self.assertRaises(uptane.Error): + instance.process_metadata('some_file_that_does_not_actually_exist.xyz') + + + + + + def test_45_process_partial_metadata(self): + """ + Tests uptane.clients.secondary.Secondary.process_partial_metadata() + + Tests PV Secondary client in 2 situations: + - Director's targets metadata available with valid signatures + - Director's targets metadata available with invalid signatures + """ + # --- Test this test module's setup (defensive) + # First, check the source directories, from which the temp dir is copied. + # This first part is testing this test module, since this setup was done + # above in setUpClass(), to maintain test integrity over time. + # We should see only root.(json or der). + for data_directory in [ + TEST_DIRECTOR_METADATA_DIR, TEST_IMAGE_REPO_METADATA_DIR]: + + self.assertEqual( + ['root.der', 'root.json'], + sorted(os.listdir(data_directory))) + + working_metadata_path = os.path.join(SAMPLE_DATA_DIR, + 'director_targets_pv_bcu_v2.' + tuf.conf.METADATA_FORMAT) + + bad_sig_metadata_path = os.path.join(SAMPLE_DATA_DIR, + 'director_targets_bad_sig_v2.' + tuf.conf.METADATA_FORMAT) + + expired_metadata_path = os.path.join(SAMPLE_DATA_DIR, + 'director_targets_expired_v3.' + tuf.conf.METADATA_FORMAT) + + replayed_metadata_path = os.path.join(SAMPLE_DATA_DIR, + 'director_targets_empty_v1.' + tuf.conf.METADATA_FORMAT) + + # The fourth test instance is currently our only partial verification + # test instance. 
If we end up with more, run a loop over the pv instances + # instead, like so: + # for instance_data in TEST_INSTANCES: + # if not instance_data['partial_verifying']: + # continue + client_dir = TEST_INSTANCES[3]['client_dir'] + instance = TEST_INSTANCES[3]['instance'] + + # director_targets_metadata_path is where the partial verification Secondary + # client stores the Director Targets metadata it gets from the Primary, + # which it will then validate. + director_targets_metadata_path = os.path.join( + client_dir, 'metadata', 'director_targets.' + tuf.conf.METADATA_FORMAT) + + # First, test behavior if the file we indicate does not exist. with self.assertRaises(uptane.Error): instance.process_metadata('some_file_that_does_not_actually_exist.xyz') + # PV Secondary 1 with valid director public key. Updates successfully. + # The metadata happens to have version == 2 (relevant in the next tests). + shutil.copy(working_metadata_path, director_targets_metadata_path) + instance.process_metadata(director_targets_metadata_path) + + # If the Secondary expects a signature from a key of a different type than + # the one that signed the metadata, expect failure (whether or not it + # has the same key ID). + assert instance.director_public_key['keytype'] == 'ed25519', 'This test ' \ + 'is no longer correct: it assumes that the key type of the Director ' \ + 'Targets key will be ed25519, but it is actually ' + \ + instance.director_public_key['keytype'] + '; please fix the test.' + instance.director_public_key['keytype'] = 'rsa' + with self.assertRaises(tuf.BadSignatureError): + instance.process_metadata(director_targets_metadata_path) + instance.director_public_key['keytype'] = 'ed25519' # back to real key type + + # If the Secondary expects a signature from a different key than the one + # that signed the metadata, expect failure.
+ temp = instance.director_public_key + instance.director_public_key = self.key_timeserver_pub + with self.assertRaises(tuf.BadSignatureError): + instance.process_metadata(director_targets_metadata_path) + instance.director_public_key = temp # put the key back after the test + + # TODO: Make sure that it doesn't interfere with validation if there are + # other, unnecessary signatures on the metadata before the signature that + # the partial verification Secondary is expecting. + + + # PV Secondary 1 with valid director public key, but an update with an + # invalid signature. version == 2 + shutil.copy(bad_sig_metadata_path, director_targets_metadata_path) + with self.assertRaises(tuf.BadSignatureError): + instance.process_metadata(director_targets_metadata_path) + + # Test with expired metadata (but version == 3, so not an apparent replay). + shutil.copy(expired_metadata_path, director_targets_metadata_path) + with self.assertRaises(tuf.ExpiredMetadataError): + instance.process_metadata(director_targets_metadata_path) + + # Test with metadata with a version == 1. Note that the client has already + # accepted Director Targets metadata with version == 2, so this should be + # rejected, since it's either a replay attack, strangely old metadata, or + # something more malicious.
+ shutil.copy(replayed_metadata_path, director_targets_metadata_path) + with self.assertRaises(tuf.ReplayedMetadataError): + instance.process_metadata(director_targets_metadata_path) + + # If the Secondary lacks a Director public key for some reason (even + # though the constructor checks for one if this is a partial-verification + # Secondary), it should raise this error: + with self.assertRaises(uptane.Error): + temp = instance.director_public_key + instance.director_public_key = None + instance.process_metadata(director_targets_metadata_path) + + instance.director_public_key = temp # put the key back after the test + + def test_50_validate_image(self): - image_fname = 'TCU1.1.txt' + # In these tests, the full verification Secondaries were or were not given + # instructions to install TCU1.1.txt, and the partial verification + # Secondary was given an instruction to install BCU1.0.txt. + fv_image_fname = 'TCU1.1.txt' + pv_image_fname = 'BCU1.0.txt' sample_image_location = os.path.join(demo.DEMO_DIR, 'images') - client_unverified_targets_dir = TEMP_CLIENT_DIRS[0] + '/unverified_targets' + fv_client_unverified_targets_dir = TEST_INSTANCES[0]['client_dir'] + \ + '/unverified_targets' + pv_client_unverified_targets_dir = TEST_INSTANCES[3]['client_dir'] + \ + '/unverified_targets' + + + # Copy the firmware into the Secondary's unverified targets directory. + # (This is what the Secondary would do when receiving the file from + # the Primary.) + # Delete and recreate the unverified targets directory first. 
+ for instance_data in TEST_INSTANCES: + client_unverified_targets_dir = os.path.join( + instance_data['client_dir'], 'unverified_targets') + + if os.path.exists(client_unverified_targets_dir): + shutil.rmtree(client_unverified_targets_dir) + os.mkdir(client_unverified_targets_dir) + + if instance_data['partial_verifying']: + image_fname = pv_image_fname + else: + image_fname = fv_image_fname - if os.path.exists(client_unverified_targets_dir): - shutil.rmtree(client_unverified_targets_dir) - os.mkdir(client_unverified_targets_dir) + shutil.copy( + os.path.join(sample_image_location, image_fname), + client_unverified_targets_dir) - shutil.copy( - os.path.join(sample_image_location, image_fname), - client_unverified_targets_dir) - secondary_instances[0].validate_image(image_fname) + # For each Secondary, try validating the appropriate firmware image. + # Secondaries 0-2 are running full verification. + TEST_INSTANCES[0]['instance'].validate_image(fv_image_fname) with self.assertRaises(uptane.Error): - secondary_instances[1].validate_image(image_fname) + TEST_INSTANCES[1]['instance'].validate_image(fv_image_fname) with self.assertRaises(uptane.Error): - secondary_instances[2].validate_image(image_fname) + TEST_INSTANCES[2]['instance'].validate_image(fv_image_fname) + + # Secondary 3 is running partial verification and was given metadata + # indicating the following firmware: + shutil.copy( + os.path.join(sample_image_location, pv_image_fname), + client_unverified_targets_dir) + TEST_INSTANCES[3]['instance'].validate_image(pv_image_fname) + diff --git a/uptane/clients/secondary.py b/uptane/clients/secondary.py index 393436c..aee8dd6 100644 --- a/uptane/clients/secondary.py +++ b/uptane/clients/secondary.py @@ -3,20 +3,31 @@ secondary.py - Provides core functionality for Uptane Secondary ECU clients: - - Given an archive of metadata and an image file, performs full verification - of both, employing TUF (The Update Framework), determining if this + Provides core 
functionality for Uptane Secondary ECU clients. A detailed + explanation of the role of the Secondary in Uptane is available in the + "Design Overview" and "Implementation Specification" documents, links to + which are maintained at uptane.github.io + + Note that while this implementation uses files and archives, neither archives + nor a filesystem is key to the system, and the same algorithms can be + employed regardless. + + Briefly, the Secondary code here does the following: + + - Given an archive of metadata (or in the case of a partial verifier, the + Director's Targets role file), performs full or partial verification, + employing TUF (The Update Framework), determining if this Secondary ECU has been instructed to install the image by the Director and - if the image is also valid per the Image Repository. + (for full verification) if the image is also valid per the Image Repository. + Core metadata validation functionality is provided by the function + process_metadata(), whose docstring describes the metadata validation. + + - Generates ECU Manifests describing the state of the Secondary for Director perusal + + - Generates nonces for time requests from the Timeserver, and validates signed times provided by the Timeserver, maintaining trustworthy times. Rotates nonces after they have appeared in Timeserver responses.
- - A detailed explanation of the role of the Secondary in Uptane is available in - the "Design Overview" and "Implementation Specification" documents, links to - which are maintained at uptane.github.io """ from __future__ import print_function from __future__ import unicode_literals @@ -29,6 +40,7 @@ import random # for nonces import zipfile # to expand the metadata archive retrieved from the Primary import hashlib +import iso8601 # to manipulate TUF's datetime format for expirations import tuf.formats import tuf.keys @@ -37,7 +49,8 @@ import uptane.formats import uptane.common -import uptane.encoding.asn1_codec as asn1_codec +import uptane.encoding.asn1_codec as uptane_asn1_codec +import tuf.asn1_codec as tuf_asn1_codec from uptane.encoding.asn1_codec import DATATYPE_TIME_ATTESTATION from uptane.encoding.asn1_codec import DATATYPE_ECU_MANIFEST @@ -260,6 +273,11 @@ def __init__( 'key was not provided. Partial verification Secondaries validate ' 'only the ') + # Partial verification clients still have to track the last valid metadata + # version number. (For full verification clients, the TUF updater code + # handles this.) + if self.partial_verifying: + self.last_valid_targets_metadata_version_number = 0 # Create a TAP-4-compliant updater object. This will read pinned.json # and create single-repository updaters within it to handle connections to @@ -362,7 +380,7 @@ def generate_signed_ecu_manifest(self, description_of_attacks_observed=''): signable_ecu_manifest) if tuf.conf.METADATA_FORMAT == 'der': - der_signed_ecu_manifest = asn1_codec.convert_signed_metadata_to_der( + der_signed_ecu_manifest = uptane_asn1_codec.convert_signed_metadata_to_der( signable_ecu_manifest, DATATYPE_ECU_MANIFEST, resign=True, private_key=self.ecu_key) # TODO: Consider verification of output here. 
@@ -393,7 +411,7 @@ def validate_time_attestation(self, timeserver_attestation): # If we're using ASN.1/DER format, convert the attestation into something # comprehensible (JSON-compatible dictionary) instead. if tuf.conf.METADATA_FORMAT == 'der': - timeserver_attestation = asn1_codec.convert_signed_der_to_dersigned_json( + timeserver_attestation = uptane_asn1_codec.convert_signed_der_to_dersigned_json( timeserver_attestation, DATATYPE_TIME_ATTESTATION) # Check format. @@ -420,10 +438,12 @@ def validate_time_attestation(self, timeserver_attestation): if self.last_nonce_sent is None: # This ECU is fresh and hasn't actually ever sent a nonce to the Primary # yet. It would be impossible to validate a timeserver attestation. - log.warning(YELLOW + 'Cannot validate a timeserver attestation yet: ' + log.error('Cannot validate a timeserver attestation yet: ' 'this fresh Secondary ECU has never communicated a nonce and ECU ' - 'Version Manifest to the Primary.' + ENDCOLORS) - return + 'Version Manifest to the Primary.') + raise uptane.BadTimeAttestation('This Secondary has been ' + 'provided a time attestation, but there is no record of this ' + 'Secondary having ever previously sent any nonce.') elif self.last_nonce_sent not in timeserver_attestation['signed']['nonces']: # TODO: Create a new class for this Exception in this file.
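The behavioral change in the hunk above (a fresh ECU that has never sent a nonce now raises instead of silently returning) can be sketched in isolation. The class and exception below are simplified stand-ins, not the real `Secondary` or `uptane.BadTimeAttestation`:

```python
# Simplified stand-ins illustrating the nonce check tightened above: a
# fresh ECU that never sent a nonce now rejects time attestations outright
# instead of returning silently. Not the patch's actual code.

class BadTimeAttestation(Exception):
    pass

class MiniSecondary:
    def __init__(self):
        self.last_nonce_sent = None  # fresh ECU: no nonce ever sent

    def validate_time_attestation(self, attestation):
        if self.last_nonce_sent is None:
            # New behavior: raise rather than silently return.
            raise BadTimeAttestation(
                'No record of this Secondary ever sending a nonce.')
        if self.last_nonce_sent not in attestation['signed']['nonces']:
            raise BadTimeAttestation('Expected nonce not in attestation.')
        # (Signature and recency checks would follow here.)
        return attestation['signed']['time']
```

Raising makes the failure visible to callers and testable with `assertRaises`, which is exactly how the new `test_20_validate_time_attestation` case for the never-sent-a-nonce Secondary exercises it.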
@@ -511,8 +531,8 @@ def fully_validate_metadata(self): ENDCOLORS) continue - - self.validated_targets_for_this_ecu = validated_targets_for_this_ecu + if validated_targets_for_this_ecu: + self.validated_targets_for_this_ecu = validated_targets_for_this_ecu @@ -547,20 +567,228 @@ def get_validated_target_info(self, target_filepath): - def process_metadata(self, metadata_archive_fname): + def process_metadata(self, metadata_fname): """ - Expand the metadata archive using _expand_metadata_archive() - Validate metadata files using fully_validate_metadata() - Select the Director targets.json file - Pick out the target file(s) with our ECU serial listed - Fully validate the metadata for the target file(s) + Runs either partial or full metadata verification, based on the + value of self.partial_verifying. + + Note that in both cases, the use of files and archives is incidental; + the verification procedure is the same however the metadata is delivered. The central idea is to take + the metadata pointed at by the argument here as untrusted and verify it + using the full verification or partial verification algorithms from the + Uptane Implementation Specification. It's generally expected that this + metadata comes to the Secondary from the Primary, originally from the + Director and Image repositories, but the way it gets here does not matter + as long as it checks out as trustworthy. + + Full: + The given filename, metadata_fname, should point to an archive of all + metadata necessary to perform full verification, such as is produced by + primary.save_distributable_metadata_files(). + + process_metadata expands this archive to a local directory where + repository files are expected to be found (the 'unverified' directory in + directory self.full_client_dir). + + Then, these expanded metadata files are treated as repository metadata by + the call to fully_validate_metadata(). The Director targets.json file is + selected.
The target file(s) with this Secondary's ECU serial listed is + fully validated, using whatever provided metadata is necessary, by the + underlying TUF code. + + Partial: + + The given filename, metadata_fname, should point to a single metadata + role file, the Director's Targets role. The signature on the Targets role + file is validated against the Director's public key + (self.director_public_key). If the signature is valid, the new Targets + role file is trusted; else it is discarded and TUF raises a + signature exception. + + (Additional protections come from the Primary + having vetted the file for us using full verification, as long as the + Primary is trustworthy.) + + From the trusted Targets role file, the target with this Secondary's + ECU identifier/serial listed is chosen, and the metadata describing that + target (hash, length, etc.) is extracted from the metadata file and + taken as the trustworthy description of the target file to be installed + on this Secondary. + """ - tuf.formats.RELPATH_SCHEMA.check_match(metadata_archive_fname) + tuf.formats.RELPATH_SCHEMA.check_match(metadata_fname) - self._expand_metadata_archive(metadata_archive_fname) + if self.partial_verifying: + self.process_partial_metadata(metadata_fname) - # This entails using the local metadata files as a repository. - self.fully_validate_metadata() + else: + self._expand_metadata_archive(metadata_fname) + self.fully_validate_metadata() + + + + + + def process_partial_metadata(self, director_targets_metadata_fname): + """ + <Purpose> + Given the filename of a file containing the Director's Targets role + metadata, validates and processes that metadata, determining what firmware + the Director has instructed this partial-verification Secondary ECU to + install. + + The given metadata replaces this client's current Director metadata if + the given metadata is valid -- i.e.
if the metadata: + - is signed by a key matching self.director_public_key + - and is not expired (current date is before metadata's expiration date) + - and does not have an older version number than this client has + previously seen (i.e. is not a rollback) + + Otherwise, an exception is raised indicating that the metadata is not + valid. + + Further, if the metadata is valid, this function then updates + self.validated_targets_for_this_ecu if the metadata also lists a target + for this ECU (i.e. includes a target with field "ecu_serial" set to this + ECU's serial number). + + <Arguments> + director_targets_metadata_fname + Filename of the Director's Targets role metadata, in either JSON or + ASN.1/DER format. + + <Returns> + None + + <Exceptions> + uptane.Error + if director_targets_metadata_fname does not specify a file that exists + or if tuf.conf.METADATA_FORMAT is somehow an unsupported format (i.e. + not 'json' or 'der') + + tuf.BadSignatureError + if the signature over the Targets metadata is not a valid + signature by the key corresponding to self.director_public_key, or if + the key type listed in the signature does not match the key type listed + in the public key + + tuf.ExpiredMetadataError + if the Targets metadata is expired + + tuf.ReplayedMetadataError + if the Targets metadata has a lower version number than + the last Targets metadata this client deemed valid (rollback) + + <Side Effects> + May update this client's metadata (Director Targets); see <Purpose>. + May update self.validated_targets_for_this_ecu; see <Purpose>. + + """ + tuf.formats.RELPATH_SCHEMA.check_match(director_targets_metadata_fname) + # Check that this Secondary holds the Director's public key. + if self.director_public_key is None: + raise uptane.Error("Director public key not found for partial" + " verification of secondary.") + validated_targets_for_this_ecu = [] + target_metadata = {} + if not os.path.exists(director_targets_metadata_fname): + raise uptane.Error('Indicated Director Targets metadata file not found.
' + 'Filename: ' + repr(director_targets_metadata_fname)) + + metadata_file_object = tuf.util.load_file(director_targets_metadata_fname) + + data = metadata_file_object['signed'] + + + # Check to see if the metadata is expired by comparing it against the + # last validated time provided by the timeserver. (This may be old, and + # we take it as a minimum time.) + minimum_time = tuf.formats.datetime_to_unix_timestamp(iso8601.parse_date( + self.all_valid_timeserver_times[-1])) + + expiration_time = tuf.formats.datetime_to_unix_timestamp(iso8601.parse_date( + data['expires'])) + + if expiration_time < minimum_time: + raise tuf.ExpiredMetadataError('Expired metadata provided to partial ' + 'verification Secondary; last valid timeserver time: ' + + self.all_valid_timeserver_times[-1] + '; expiration date on ' + 'metadata: ' + data['expires']) + + + # Check to see if the metadata has a lower version number than expected. + if data['version'] < self.last_valid_targets_metadata_version_number: + log.error('Provided metadata has lower version number than the metadata ' + 'this partial verification Secondary has previously validated.') + raise tuf.ReplayedMetadataError('targets', + self.last_valid_targets_metadata_version_number, data['version']) + + # Make sure the data is in the exact format that it is expected to have + # been signed over in order to validate the signature over it. + # - In the case of JSON, that means TUF's canonical JSON encoding. + # - In the case of ASN.1/DER, that means a hash taken over the ASN.1/DER + # data (so it must be converted back, as the data was converted into a + # dictionary for ease of use). 
+ if tuf.conf.METADATA_FORMAT == 'json': + data_signed = tuf.formats.encode_canonical(data).encode('utf-8') + + elif tuf.conf.METADATA_FORMAT == 'der': + data_signed = tuf_asn1_codec.convert_signed_metadata_to_der( + {'signed': data, 'signatures': []}, only_signed=True) + data_signed = hashlib.sha256(data_signed).digest() + + else: # pragma: no cover + raise uptane.Error('Unsupported metadata format: ' + + repr(tuf.conf.METADATA_FORMAT) + '; the supported formats are: "der" and "json".') + + signatures = metadata_file_object['signatures'] + + # Look for a valid signature over the metadata from the expected key. The + # Director's Targets metadata may be signed by a variety of keys that the + # partial verification secondary client isn't concerned with. For example, + # there are edge cases in which after a compromise it may be necessary to + # sign with both old and new keys to clear both full verification and + # partial verification checks and reach a partial verification Secondary. + found_valid_signature = False + for signature in signatures: + + # Don't waste time checking the signature if the keyid (fingerprint) + # or key type (ed25519, rsa, etc.) don't match. + if self.director_public_key['keyid'] != signature['keyid']: + continue + elif self.director_public_key['keytype'] != signature['method']: + continue + + elif tuf.keys.verify_signature( + self.director_public_key, signature, data_signed): + found_valid_signature = True + break + + if not found_valid_signature: + log.info( + 'Validation failed on the Director Targets metadata: signature is ' + 'not valid. It must be correctly signed by the expected key for that ECU.') + raise tuf.BadSignatureError('Sender supplied an invalid signature. ' + 'Director Targets metadata is unacceptable. If you see this ' + 'persistently, it is possible that the Primary is compromised or ' + 'that there is a man-in-the-middle attack or misconfiguration.') + + # The metadata is valid, so we update the last validated version number + # and begin inspecting the targets listed in the metadata. + self.last_valid_targets_metadata_version_number = data['version'] + targets = metadata_file_object['signed']['targets'] + + # Comb through the Director's Targets metadata to find the targets assigned + # to the current ECU. + for target in targets: + if targets[target]['custom']['ecu_serial'] == self.ecu_serial: + target_metadata = { + 'filepath': target, 'fileinfo': targets[target]} + validated_targets_for_this_ecu.append(target_metadata) + + if validated_targets_for_this_ecu: + self.validated_targets_for_this_ecu = validated_targets_for_this_ecu @@ -586,7 +814,6 @@ def _expand_metadata_archive(self, metadata_archive_fname): 'Filename: ' + repr(metadata_archive_fname)) z = zipfile.ZipFile(metadata_archive_fname) - z.extractall(os.path.join(self.full_client_dir, 'unverified'))
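The partial-verification flow added in this patch boils down to three checks: expiry against the last trusted timeserver time, rollback via the Targets version number, and a signature from the expected Director key. A self-contained illustrative sketch of that sequence follows; plain dicts stand in for TUF metadata, integer timestamps for dates, and HMAC-SHA256 for the real public-key check performed by `tuf.keys.verify_signature` (none of these names are from the patch itself):

```python
import hashlib
import hmac
import json

# Illustrative sketch of the partial-verification checks, NOT the patch's
# actual code: plain dicts for metadata, integers for timestamps, and
# HMAC-SHA256 standing in for the real ed25519/RSA signature verification.

def check_partial_metadata(metadata, director_key, trusted_time, last_version):
    """Validate Director Targets metadata; return its version if trusted."""
    signed = metadata['signed']

    # 1. Expiry: reject metadata that expired before the last trusted time.
    if signed['expires'] < trusted_time:
        raise ValueError('expired metadata')

    # 2. Rollback: reject versions older than one already accepted.
    if signed['version'] < last_version:
        raise ValueError('replayed (older) metadata')

    # 3. Signature: consider only signatures from the expected key, and
    #    verify over a canonical encoding of the signed portion.
    canonical = json.dumps(signed, sort_keys=True,
                           separators=(',', ':')).encode()
    for sig in metadata['signatures']:
        if sig['keyid'] != director_key['keyid']:
            continue  # signature by some other key: ignore it
        expected = hmac.new(director_key['secret'], canonical,
                            hashlib.sha256).hexdigest()
        if hmac.compare_digest(sig['sig'], expected):
            return signed['version']
    raise ValueError('no valid signature from the expected Director key')
```

On success the caller records the returned version as the new `last_version`, which is what makes the rollback check effective on the next update cycle, mirroring how the patch updates `last_valid_targets_metadata_version_number` only after all checks pass.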