diff --git a/dissect/target/exceptions.py b/dissect/target/exceptions.py index 354c8095d2..853c449bb0 100644 --- a/dissect/target/exceptions.py +++ b/dissect/target/exceptions.py @@ -133,3 +133,7 @@ class ConfigurationParsingError(Error): class TargetPathNotFoundError(TargetError): """The path to the target does not exist.""" + + +class FileDecryptionError(FilesystemError): + """An error occurred during file decryption.""" diff --git a/dissect/target/filesystems/extfs.py b/dissect/target/filesystems/extfs.py index c65774dba6..7049739c73 100644 --- a/dissect/target/filesystems/extfs.py +++ b/dissect/target/filesystems/extfs.py @@ -6,6 +6,7 @@ from dissect.extfs import extfs from dissect.target.exceptions import ( + FileDecryptionError, FileNotFoundError, FilesystemError, IsADirectoryError, @@ -18,6 +19,8 @@ if TYPE_CHECKING: from collections.abc import Iterator + from dissect.target.helpers.fscrypt import FSCrypt, FSCryptEntryDecryptor + class ExtFilesystem(Filesystem): __type__ = "ext" @@ -25,12 +28,20 @@ class ExtFilesystem(Filesystem): def __init__(self, fh: BinaryIO, *args, **kwargs): super().__init__(fh, *args, **kwargs) self.extfs = extfs.ExtFS(fh) + self.fscrypt: FSCrypt | None = None @staticmethod def _detect(fh: BinaryIO) -> bool: fh.seek(1024) return fh.read(512)[56:58] == b"\x53\xef" + def add_fscrypt(self, fscrypt: FSCrypt) -> None: + self.fscrypt = fscrypt + self.extfs._get_inode_decryptor = self.get_inode_decryptor + + def get_inode_decryptor(self, inode: extfs.INode) -> ExtEntryDecryptor: + return ExtEntryDecryptor(inode, self) + def get(self, path: str) -> FilesystemEntry: return ExtFilesystemEntry(self, path, self._get_node(path)) @@ -147,3 +158,32 @@ def attr(self) -> Any: def lattr(self) -> Any: return self.entry.xattr + + +class ExtEntryDecryptor: + def __init__(self, inode: extfs.INode, fs: ExtFilesystem): + self.inode = inode + self.fs = fs + self._decryptor: FSCryptEntryDecryptor | None = None + + @property + def decryptor(self) -> FSCryptEntryDecryptor: + if self._decryptor is None: + if not self.inode.is_encrypted: + raise ValueError("Entry is not encrypted") + if self.fs.fscrypt is None: + raise FileDecryptionError("No fscrypt context provided to this filesystem") + + encryption_context = next((attr.value for attr in self.inode.xattr if attr.name == "encryption.c"), None) + + if encryption_context is None: + raise FileDecryptionError("No encryption context found for this inode") + # Will raise FileDecryptionError if no key was found + self._decryptor = self.fs.fscrypt.get_decryptor(encryption_context) + return self._decryptor + + def open_decrypt(self) -> BinaryIO: + return self.decryptor.wrap_content_stream(self.inode.dataruns(), self.inode.size) + + def decrypt_filename(self, encrypted_filename: bytes) -> bytes: + return self.decryptor.decrypt_filename(encrypted_filename) diff --git a/dissect/target/helpers/fscrypt.py b/dissect/target/helpers/fscrypt.py new file mode 100644 index 0000000000..284097cab9 --- /dev/null +++ b/dissect/target/helpers/fscrypt.py @@ -0,0 +1,207 @@ +from __future__ import annotations + +import hashlib +import hmac +from typing import BinaryIO + +from Crypto.Cipher import AES +from dissect.cstruct import cstruct +from dissect.fve.crypto import create_cipher +from dissect.util.stream import AlignedStream + +from dissect.target.exceptions import FileDecryptionError + +fscrypt_def = """ +// https://github.com/torvalds/linux/blob/master/fs/crypto/fscrypt_private.h +#define HKDF_CONTEXT_KEY_IDENTIFIER 1 /* info= */ +#define HKDF_CONTEXT_PER_FILE_ENC_KEY 2 /* info=file_nonce */ +#define HKDF_CONTEXT_DIRECT_KEY 3 /* info=mode_num */ +#define HKDF_CONTEXT_IV_INO_LBLK_64_KEY 4 /* info=mode_num||fs_uuid */ +#define HKDF_CONTEXT_DIRHASH_KEY 5 /* info=file_nonce */ +#define HKDF_CONTEXT_IV_INO_LBLK_32_KEY 6 /* info=mode_num||fs_uuid */ +#define HKDF_CONTEXT_INODE_HASH_KEY 7 /* info= bytes: + """Unsalted HKDF-SHA512 using a key and an info. + + We also ask the caller to pass a block index. Normally the hashing function should do that for the caller, but + this allows the caller to pass a static info value that we don't have to concatonate another byte to every time + this function is called.""" + hdkf_extracted_key = hmac.new(b"", key, hashlib.sha512).digest() + hdkf_expand = hmac.new(hdkf_extracted_key, info_and_block_index, hashlib.sha512) + return hdkf_expand.digest() + + +def fscrypt_key_identifier(key: bytes) -> str: + """Derive a fscrypt-specific key identifier from a key.""" + return fscrypt_hdkf(key, FSCRYPT_KEY_IDENTIFIER_INFO)[: c_fscrypt.FSCRYPT_KEY_IDENTIFIER_SIZE].hex() + + +def aes_256_cts_cbc_decrypt(ciphertext: bytes, key: bytes) -> bytes: + """AES 256 with Ciphertext Stealing.""" + if len(ciphertext) <= 16: + return AES.new(key, AES.MODE_ECB).decrypt(ciphertext) + + cipher = AES.new(key, AES.MODE_CBC, ZERO_BYTE_IV) + + # Round the ciphertext up to the next multiple of 16 and deduct 16 + last_block_start = ((len(ciphertext) + 15) & ~15) - 16 + second_to_last_block_start = last_block_start - 16 + + # Put second-to-last block _behind_ the last block + cts_ciphertext = ciphertext[last_block_start:] + second_to_last_block = ciphertext[second_to_last_block_start:last_block_start] + cts_ciphertext += second_to_last_block + + decrypted_prefix = cipher.decrypt(ciphertext[:second_to_last_block_start]) if last_block_start > 16 else b"" + return decrypted_prefix + cipher.decrypt(cts_ciphertext) + + +def aes_256_xts_decrypt(ciphertext: bytes, key: bytes, sector: int) -> bytes: + """AES-XTS-256 decryption with a sector size of 4096.""" + return create_cipher("aes-xts-256", key, sector_size=4096, iv_sector_size=4096).decrypt(ciphertext, sector) + + +class FSCrypt: + """FSCrypt decryption. + + The class can be given master keys. + Then, for a given encyption context, if the associated master key is present, a combined key is derived and wrapped + in an instance of the FSCryptEntryDecryptor class, that can use said key for filename and file contents decryption. + + Resources: + - https://www.kernel.org/doc/html/v4.18/filesystems/fscrypt.html + + """ + + def __init__(self, fh: BinaryIO): + self.keys: dict[str, bytes] = {} + self.fh = fh + + def add_key(self, key: bytes) -> None: + self.keys[fscrypt_key_identifier(key)] = key + + def get_decryptor(self, encryption_context: bytes) -> FSCryptEntryDecryptor: + encryption_context = c_fscrypt.fscrypt_context_v2(encryption_context) + if encryption_context.version != c_fscrypt.FSCRYPT_POLICY_V2: + raise NotImplementedError("Only FSCRYPT_POLICY_V2 is supported") + master_key_identifier = encryption_context.master_key_identifier.hex() + if master_key_identifier not in self.keys: + raise FileDecryptionError("Requested key not found in keystore") + master_key = self.keys[master_key_identifier] + + file_enc_info = ( + FSCRYPT_IDENTIFIER_STR + + c_fscrypt.HKDF_CONTEXT_PER_FILE_ENC_KEY.to_bytes(1, "little") + + encryption_context.nonce + ) + file_enc_info += b"\01" # Block counter + derived_key = fscrypt_hdkf(master_key, file_enc_info) + return FSCryptEntryDecryptor(self, encryption_context, derived_key) + + +class FSCryptEntryDecryptor: + """Given an encryption context, and a derived key, can decrypt filenames and file contents.""" + + def __init__(self, fscrypt: FSCrypt, encryption_context: c_fscrypt.fscrypt_context_v2, key: bytes): + self.fscrypt = fscrypt + self.encryption_context = encryption_context + self.key = key + + def decrypt_filename(self, filename: bytes) -> bytes: + if filename in [b".", b".."]: + return filename + if self.encryption_context.filenames_encryption_mode == c_fscrypt.FSCRYPT_MODE_AES_256_CTS: + filename_key = self.key[0:32] + ret = aes_256_cts_cbc_decrypt(filename, filename_key) + return ret.rstrip(b"\0") + + raise NotImplementedError("Only FSCRYPT_MODE_AES_256_CTS is supported for filenames") + + def wrap_content_stream(self, runlist: tuple[int, int], size: int) -> XTSRunlistDecryptionStream: + if self.encryption_context.contents_encryption_mode != c_fscrypt.FSCRYPT_MODE_AES_256_XTS: + raise NotImplementedError("Only FSCRYPT_MODE_AES_256_XTS is supported for content streams") + return XTSRunlistDecryptionStream(self.fscrypt.fh, self.key, runlist, size) + + +class XTSRunlistDecryptionStream(AlignedStream): + """AES-XTS-256 decryption stream for a given runlist.""" + + def __init__( + self, fh: BinaryIO, key: bytes, runlist: list[tuple[int, int]], size: int | None = None, align: int = 4096 + ): + self.fh = fh + self.key = key + self.runlist = runlist + self._block_offsets: list[int] = None + super().__init__(size, align) + + def _get_block_offsets(self) -> list[int]: + if self._block_offsets is None: + self._block_offsets = [] + for start, length in self.runlist: + for i in range(length): + self._block_offsets.append(start + i) + return self._block_offsets + + def _decrypt_block(self, block_index: int, offset: int) -> bytes: + self.fh.seek(offset * self.align) + encrypted = self.fh.read(self.align) + return aes_256_xts_decrypt(encrypted, self.key, block_index) + + def _read(self, offset: int, length: int) -> bytes: + r = [] + first_idx = offset // self.align + last_idx = (first_idx + (length // self.align)) - 1 + for block_idx, block_offset in enumerate(self._get_block_offsets()): + if block_idx < first_idx or block_idx > last_idx: + continue + r.append(self._decrypt_block(block_idx, block_offset)) + return b"".join(r) diff --git a/dissect/target/loader.py b/dissect/target/loader.py index b37ee5a205..5216a38829 100644 --- a/dissect/target/loader.py +++ b/dissect/target/loader.py @@ -250,6 +250,7 @@ def open(path: str | Path, *, fallbacks: list[type[Loader]] | None = None, **kwa register("local", "LocalLoader") register("ad1", "AD1Loader") register("asdf", "AsdfLoader") +register("avd", "AVDLoader") register("vmx", "VmxLoader") register("vmwarevm", "VmwarevmLoader") register("hyperv", "HyperVLoader") diff --git a/dissect/target/loaders/avd.py b/dissect/target/loaders/avd.py new file mode 100644 index 0000000000..5ac3379287 --- /dev/null +++ b/dissect/target/loaders/avd.py @@ -0,0 +1,58 @@ +from __future__ import annotations + +import logging +from pathlib import Path +from typing import TYPE_CHECKING + +from dissect.hypervisor.disk.qcow2 import QCow2 + +from dissect.target.containers.raw import RawContainer +from dissect.target.loader import Loader +from dissect.target.volume import Volume + +log = logging.getLogger(__name__) +if TYPE_CHECKING: + from dissect.target.target import Target + + +class AVDLoader(Loader): + """Load an Android Virtual Device.""" + + def __init__(self, path: Path, **kwargs): + super().__init__(path, **kwargs) + self.avd_folder = path + self.encryptionkey_path = self.avd_folder.joinpath("encryptionkey.img.qcow2") + self.encryptionkey_backing_path = self.avd_folder.joinpath("encryptionkey.img") + + self.userdata_path = self.avd_folder.joinpath("userdata-qemu.img.qcow2") + self.userdata_backing_path = self.avd_folder.joinpath("userdata-qemu.img") + + qemu_config_path = self.avd_folder.joinpath("hardware-qemu.ini") + self.system_partition_path = None + + if qemu_config_path.exists(): + qemu_config = qemu_config_path.read_text(encoding="utf-8").splitlines() + for line in qemu_config: + if line.startswith("disk.systemPartition.initPath"): + _, _, system_partition_path = line.partition("=") + self.system_partition_path = Path(system_partition_path.strip()) + if not self.system_partition_path.exists(): + self.system_partition_path = None + break + + @staticmethod + def detect(path: Path) -> bool: + return path.is_dir() and path.name.endswith(".avd") and path.joinpath("AVD.conf").exists() + + def map(self, target: Target) -> None: + if self.system_partition_path: + container = RawContainer(self.system_partition_path.open("rb")) + target.disks.add(container) + + metadata_partition_disk_fh = QCow2( + fh=self.encryptionkey_path.open("rb"), backing_file=self.encryptionkey_backing_path.open("rb") + ).open() + userdata_fh = QCow2(fh=self.userdata_path.open("rb"), backing_file=self.userdata_backing_path.open("rb")).open() + + target.disks.add(RawContainer(metadata_partition_disk_fh)) + target.volumes.add(Volume(userdata_fh, 1, 0, userdata_fh.size, None, "userdata")) diff --git a/dissect/target/plugins/child/avd.py b/dissect/target/plugins/child/avd.py new file mode 100644 index 0000000000..c905bc42b4 --- /dev/null +++ b/dissect/target/plugins/child/avd.py @@ -0,0 +1,59 @@ +from __future__ import annotations + +from typing import TYPE_CHECKING + +from dissect.target.exceptions import UnsupportedPluginError +from dissect.target.helpers.record import ChildTargetRecord +from dissect.target.plugin import ChildTargetPlugin + +if TYPE_CHECKING: + from collections.abc import Iterator + from pathlib import Path + + from dissect.target import Target + + +def find_devices(paths: list[Path]) -> Iterator[str, Path]: + for path in paths: + for config_path in path.glob("*.ini"): + with config_path.open("rt") as fh: + for line in fh: + if not (line := line.strip()): + continue + + key, _, value = line.partition("=") + if key == "path": + path = value.strip('"') + yield config_path.stem, path + + +class AVDChildTargetPlugin(ChildTargetPlugin): + """Child target plugin that yields Android Virtual Devices. + + Resources: + - https://developer.android.com/studio/run/emulator-commandline + """ + + __type__ = "avd" + + def __init__(self, target: Target): + super().__init__(target) + # TODO: Windows + self.paths = [ + path + for user in self.target.user_details.all_with_home() + if (path := user.home_path.joinpath(".android/avd")).exists() + ] + + def check_compatible(self) -> None: + if not self.paths: + raise UnsupportedPluginError("No AVD folders found") + + def list_children(self) -> Iterator[ChildTargetRecord]: + for device_name, device_path in find_devices(self.paths): + yield ChildTargetRecord( + type=self.__type__, + name=device_name, + path=device_path, + _target=self.target, + ) diff --git a/dissect/target/plugins/os/unix/linux/android/_os.py b/dissect/target/plugins/os/unix/linux/android/_os.py index 76bcae4061..45a9c5c688 100644 --- a/dissect/target/plugins/os/unix/linux/android/_os.py +++ b/dissect/target/plugins/os/unix/linux/android/_os.py @@ -2,11 +2,34 @@ from typing import TYPE_CHECKING +from dissect.target import filesystem +from dissect.target.exceptions import FilesystemError from dissect.target.helpers import configutil from dissect.target.helpers.record import EmptyRecord from dissect.target.plugin import OperatingSystem, export from dissect.target.plugins.os.unix.linux._os import LinuxPlugin +try: + from dissect.target.helpers.fscrypt import FSCrypt + from dissect.target.plugins.os.unix.linux.android.lock.xts import ( + XTSDecryptionStream, + ) + + HAS_CRYPTO = True +except ImportError: + HAS_CRYPTO = False + +METADATA_ENCRYPTION_KEY_PATH = "vold/metadata_encryption/key/" + +VOLUME_MAPPINGS = { + "system": "/", + "product": "/product", + "vendor": "/vendor", + "system_ext": "/system_ext", + "userdata": "/data", + "metadata": "/metadata", +} + if TYPE_CHECKING: from collections.abc import Iterator from pathlib import Path @@ -22,6 +45,11 @@ def __init__(self, target: Target): super().__init__(target) self.target = target + self.userdata_partition_unlocked = False + self.device_encrypted_storage_unlocked = False + self.credential_encrypted_storage_unlocked = False + self.unlock() + self.build_prop_paths = set(find_build_props(self.target.fs)) self.props = {} @@ -38,6 +66,126 @@ def __init__(self, target: Target): except Exception as e: # noqa: PERF203 self.target.log.warning("Unable to parse Android build.prop file %s: %s", build_prop, e) + def _unlock_userdata_partition(self) -> bool: + """Using the key material from the metadata partition, wrap the userdata partition's file handle in a decryption + stream.""" + if self.userdata_partition_unlocked: + return True + + userdata_volume = next((vol for vol in self.target.volumes if vol.name == "userdata"), None) + + # To be able to decrypt the device-encrypted or credential-encrypted storage the userdata partition must be + # decrypted first + if not self.target.fs.exists("/metadata"): + self.target.log.error("Metadata filesystem is required to unlock userdata partition") + return False + if not HAS_CRYPTO: + self.target.log.error("dissect.fve is required to decrypt the userdata partition") + return False + + metadata_path = self.target.fs.path("/metadata").joinpath(METADATA_ENCRYPTION_KEY_PATH) + metadata_decryption_key = self.target.keystore.retrieve_key(metadata_path) + userdata_volume.fh = XTSDecryptionStream(userdata_volume.fh, metadata_decryption_key, userdata_volume.size) + + try: + fs = filesystem.open(userdata_volume.fh) + except FilesystemError: + self.target.log.exception("Unable to open filesystem on decrypted userdata partition") + # Technically, the partition has been unlocked. We just can't the filesystem contained within it + return True + + userdata_volume.fs = fs + self.target.filesystems.add(fs) + self.target.fs.mount("/data", fs) + self.userdata_partition_unlocked = True + return True + + def _unlock_device_encrypted_storage(self) -> bool: + """Requires a decryption of the userdata partition. Add FSCrypt to the userdata filesystem and add key material + from paths owned by system.""" + if self.device_encrypted_storage_unlocked: + return True + + userdata_volume = next((vol for vol in self.target.volumes if vol.name == "userdata"), None) + + encrypted_fh = userdata_volume.fh.fh + fscrypt = FSCrypt(encrypted_fh) + userdata_fs = userdata_volume.fs + method = getattr(userdata_fs, "add_fscrypt", None) + if method is None or not callable(method): + # This could happen if Dissect were to support F2FS without implementing fscrypt support for it + raise ValueError("Volume with userdata must have a filesystem implementation that supports fscrypt") + userdata_fs.add_fscrypt(fscrypt) + + # The metadata key is used for file-based encryption to protect device-encrypted storage + metadata_path = self.target.fs.path("/metadata").joinpath(METADATA_ENCRYPTION_KEY_PATH) + metadata_decryption_key = self.target.keystore.retrieve_key(metadata_path) + userdata_fs.fscrypt.add_key(metadata_decryption_key) + + try: + # https://cs.android.com/android/platform/superproject/main/+/main:system/vold/FsCrypt.cpp + + # The 'device key': no other folders can be opened before this key is added + key = self.target.keystore.retrieve_key(userdata_fs.path("/unencrypted/key")) + userdata_fs.fscrypt.add_key(key) + + # The 'device encryption key' for a given user + key = self.target.keystore.retrieve_key(userdata_fs.path("/misc/vold/user_keys/de/0")) + userdata_fs.fscrypt.add_key(key) + + self.device_encrypted_storage_unlocked = True + except Exception: + self.target.log.exception( + "Unable to decrypt device-encrypted storage. Cannot decrypt credential-encrypted storage" + ) + return False + + self.device_encrypted_storage_unlocked = True + return True + + def _unlock_credential_encrypted_storage(self) -> bool: + """Requires a decryption of the userdata partition and device-encrypted storage. Try to decrypt the synthetic + password and add the resulting key to fscrypt.""" + if self.credential_encrypted_storage_unlocked: + return True + + userdata_volume = next((vol for vol in self.target.volumes if vol.name == "userdata"), None) + userdata_fs = userdata_volume.fs + fscrypt = userdata_fs.fscrypt + if fscrypt is None: + raise ValueError( + "Volume with userdata must have fscrypt initialized to unlock credential-encrypted storage" + ) + method = getattr(userdata_fs, "add_fscrypt", None) + if method is None or not callable(method): + # This could happen if Dissect were to support F2FS without implementing fscrypt support for it + raise ValueError("Volume with userdata must have a filesystem implementation that supports fscrypt") + + # Finally, try to unlock the credential-encrypted storage + try: + fscrypt.add_key(self.target.synthetic_password_manager.get_credential_encryption_key()) + self.credential_encrypted_storage_unlocked = True + except Exception as e: + self.target.log.warning("Unable to decrypt credential-encrypted storage: %s", e) + return self.credential_encrypted_storage_unlocked + + def unlock(self) -> bool: + """Unlock the Android device storage. This is a multi-step process: + 1. Decrypt the userdata partition using the key material from the metadata partition + 2. Decrypt the device-encrypted storage using key material stored on the userdata partition + 3. Decrypt the credential-encrypted storage using key material derived from the user's credentials + + Resources: + - https://blog.quarkslab.com/android-data-encryption-in-depth.html + - https://github.com/SlugFiller/fbe-decrypt + - https://android.stackexchange.com/questions/217019/what-is-a-synthetic-password-and-how-is-it-used-by-android + """ + if not self._unlock_userdata_partition(): + return False + if not self._unlock_device_encrypted_storage(): + return False + return self._unlock_credential_encrypted_storage() + @classmethod def detect(cls, target: Target) -> Filesystem | None: ANDROID_PATHS = ( @@ -46,15 +194,31 @@ def detect(cls, target: Target) -> Filesystem | None: "vendor", "product", ) - + userdata_fs = None + metadata_fs = None for fs in target.filesystems: if all(fs.exists(p) for p in ANDROID_PATHS) and any(find_build_props(fs)): return fs - return None + if fs.exists("/unencrypted") and fs.exists("misc"): + userdata_fs = fs + + # The userdata partition can be full-volume encrypted (with the metadata partition having the decryption + # key), so as a fallback we want to detect based on the metadata partition + if fs.exists(METADATA_ENCRYPTION_KEY_PATH): + metadata_fs = fs + # Prefer userdata_fs if both are found + return userdata_fs if userdata_fs else metadata_fs @classmethod def create(cls, target: Target, sysvol: Filesystem) -> Self: - target.fs.mount("/", sysvol) + # NOTE: When system partitions cannot be opened, the volume will not be successfully opened and fs will be None. + # We skip mounting such volumes. + for volume in target.volumes: + if volume.name in VOLUME_MAPPINGS and volume.fs is not None: + target.fs.mount(VOLUME_MAPPINGS[volume.name], volume.fs) + else: + # No volumes: probably a virtual target + target.fs.mount("/", sysvol) return cls(target) @export(property=True) diff --git a/dissect/target/plugins/os/unix/linux/android/lock/__init__.py b/dissect/target/plugins/os/unix/linux/android/lock/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/dissect/target/plugins/os/unix/linux/android/lock/keystore.py b/dissect/target/plugins/os/unix/linux/android/lock/keystore.py new file mode 100644 index 0000000000..58b6400c3f --- /dev/null +++ b/dissect/target/plugins/os/unix/linux/android/lock/keystore.py @@ -0,0 +1,91 @@ +from __future__ import annotations + +from typing import TYPE_CHECKING + +from Crypto.Cipher import AES +from dissect.cstruct import cstruct +from dissect.database.sqlite3 import SQLite3 + +from dissect.target.plugin import InternalPlugin + +if TYPE_CHECKING: + from dissect.target.helpers.fsutil import TargetPath + +keystore_def = """ +// https://cs.android.com/android/platform/superproject/main/+/main:system/security/keystore2/src/sw_keyblob.rs +// KeyBlob::new_from_serialized +struct KeyBlob { + uint8 version; + uint32 key_material_length; + char key_material[key_material_length]; + // In actuality KeyBlobs contain software- and hardware enforced parameters, including the encryption algorithm. + // We do not use such fields and assume AES-256-GCM for all keys. +}; +""" + +PERSISTENT_KEYSTORE_PATH = "/data/misc/keystore/persistent.sqlite" +KEY_ENTRY_TABLE = "keyentry" + +c_keystore = cstruct().load(keystore_def) + + +class KeystorePlugin(InternalPlugin): + """The keystore is a system service that manages cryptographic keys and is used by Android's keychain API. + It has two sorts of keys: persistent and per-boot. + + TODO: The Keystore is used by more than the synthetic password manager and fscrypt, but we only implemented it as + far as needed for those two use cases. Expand functionality as needed. + + References: + - https://developer.android.com/privacy-and-security/keystore + - https://cs.android.com/android/platform/superproject/main/+/main:system/security/keystore2/src/database.rs + - https://cs.android.com/android/platform/superproject/main/+/main:system/security/keystore2/src/sw_keyblob.rs + - https://cs.android.com/android/platform/superproject/main/+/main:system/vold/KeyStorage.cpp + + """ + + __namespace__ = "keystore" + + def check_compatible(self) -> None: + # TODO: What would be a good compatibility check? Keystore functions are used for 'bootstrapping' the Android OS + # plugin so a compatibility check on OS type cannot be performed when this plugin is initialized + pass + + def get_persistent_keyblob(self, key: str) -> bytes: + persistent_db = SQLite3(self.target.fs.get(PERSISTENT_KEYSTORE_PATH).open()) + + key_entry_id = next( + (row["id"] for row in persistent_db.table(KEY_ENTRY_TABLE).rows() if row["alias"] == key), + None, + ) + if key_entry_id is None: + raise ValueError(f"Could not find {key} in persistent keystore") + + keyblob = next( + (row["blob"] for row in persistent_db.table("blobentry") if row["keyentryid"] == key_entry_id), + None, + ) + if keyblob is None: + raise ValueError(f"Entry was found in keystore for '{key}', but no associated blob was found") + + return keyblob + + @staticmethod + def decrypt_blob(blob: bytes, key: bytes) -> bytes: + nonce, ciphertext, auth_tag = blob[:12], blob[12:-16], blob[-16:] + return AES.new(key, AES.MODE_GCM, nonce=nonce).decrypt_and_verify(ciphertext, auth_tag) + + @staticmethod + def decrypt_key_from_keyblob(encrypted_key: bytes, keymaster_blob: bytes) -> bytes: + keymaster = c_keystore.KeyBlob(keymaster_blob) + return KeystorePlugin.decrypt_blob(encrypted_key, keymaster.key_material) + + @staticmethod + def retrieve_key(path: TargetPath) -> bytes: + encrypted_key = path.joinpath("encrypted_key") + if not encrypted_key.exists(): + raise ValueError("encrypted_key not found in path") + keymaster_blob = path.joinpath("keymaster_key_blob") + if not keymaster_blob.exists(): + raise ValueError("keymaster_key_blob not found in path") + return KeystorePlugin.decrypt_key_from_keyblob(encrypted_key.open().read(), keymaster_blob.open().read()) diff --git a/dissect/target/plugins/os/unix/linux/android/lock/synthetic_password_manager.py b/dissect/target/plugins/os/unix/linux/android/lock/synthetic_password_manager.py new file mode 100644 index 0000000000..ed3e3b861d --- /dev/null +++ b/dissect/target/plugins/os/unix/linux/android/lock/synthetic_password_manager.py @@ -0,0 +1,198 @@ +from __future__ import annotations + +import hmac +import logging +import struct +from hashlib import sha256, sha512 + +from Crypto.Cipher import AES +from Crypto.Protocol.KDF import scrypt +from dissect.cstruct import cstruct +from dissect.database.sqlite3 import SQLite3 + +from dissect.target.exceptions import UnsupportedPluginError +from dissect.target.helpers import keychain +from dissect.target.plugin import InternalPlugin + +log = logging.getLogger(__name__) + +synthetic_password_def = """ +// https://cs.android.com/android/platform/superproject/main/+/main:frameworks/base/services/core/java/com/android/server/locksettings/SyntheticPasswordManager.java; + +// PasswordData::fromBytes +struct PasswordData { + uint16 credentialType_unused; // See fromBytes function for explanation why these two bytes must be ignored + uint16 credentialType; + uint8 scryptLogN; + uint8 scryptLogR; + uint8 scryptLogP; + uint32 saltLen; + char salt[saltLen]; + uint32 handleLen; + char handle[handleLen]; +}; + +// SyntheticPassword::fromBytes +struct SyntheticPasswordBlob { + uint8 mVersion; + uint8 mProtectorType; + // Dynamic length and therefore not included: + // byte[] mContent; +}; +""" +c_synthetic_password = cstruct(endian=">").load(synthetic_password_def) + +DEFAULT_SYNTHETIC_PASSWORD_CREDENTIALS = b"default-password".ljust(32, b"\x00") +SEC_DISCARDABLE_PREFIX = b"Android secdiscardable SHA512" +KEY_WRAPPING_PREFIX = b"Android key wrapping key generation SHA512" +PERSONALIZATION_FBE_KEY = b"fbe-key" +PERSONALIZATION_CONTEXT = b"android-synthetic-password-personalization-context" +PERSONALIZATION_SECDISCARDABLE = b"secdiscardable-transform" + +SYNTHETIC_PASSWORD_VERSION_V3 = 3 +PROTECTOR_TYPE_LSKF_BASED = 0 + +CREDENTIAL_ENCRYPTION_KEY_PATH = "/data/misc/vold/user_keys/ce/0/current" +SYNTHETIC_PASSWORD_BLOB_PATH = "/data/system_de/0/spblob" +LOCKSETTINGS_DB_PATH = "/data/system/locksettings.db" + + +def sp_800_derive(key: bytes, label: bytes, context: bytes) -> bytes: + """Wraps a given key in a key-derivation function following Android's implementation of NIST SP 800-108. + + https://source.android.com/docs/security/features/encryption/hw-wrapped-keys + https://android.googlesource.com/platform/frameworks/base/+/master/services/core/java/com/android/server/locksettings/SP800Derive.java + """ + + # As label and context are dynamic-size, struct.pack is more readable than some cstruct hacks + counter = struct.pack(">I", 1) + context_length = struct.pack(">I", len(context) * 8) + output_length = struct.pack(">I", 256) + sp_800_output = counter + label + b"\x00" + context + context_length + output_length + out = hmac.new(key, digestmod=sha256) + out.update(sp_800_output) + return out.digest() + + +def personalized_hash(personalization: bytes, message: bytes) -> bytes: + """Wrap a message in a SHA-512 hash with personalization.""" + digest = sha512(personalization.ljust(128, b"\x00")) + digest.update(message) + return digest.digest() + + +def decrypt_blob(blob: bytes, key: bytes) -> bytes: + # Same method as keystore, but source code implements it seperately in the synthetic password manager + nonce, ciphertext, auth_tag = blob[:12], blob[12:-16], blob[-16:] + return AES.new(key, AES.MODE_GCM, nonce=nonce).decrypt_and_verify(ciphertext, auth_tag) + + +class SyntheticPasswordManager(InternalPlugin): + """A user's Synthetic Password (SP) never changes (as it is the main input for file-based encryption) but SP + protectors can be added or removed. There must be at least one Lock-Screen Knowledge Factor (LSKF), though it may be + 'none' for an unprotected device. In such cases, a default value is used as an input for key derivation + + References: + - https://cs.android.com/android/platform/superproject/main/+/main:frameworks/base/services/core/java/com/android/server/locksettings/SyntheticPasswordManager.java + + """ + + __namespace__ = "synthetic_password_manager" + + def check_compatible(self) -> None: + if not self.target.fs.path(LOCKSETTINGS_DB_PATH).exists(): + raise UnsupportedPluginError("Locksettings database not found") + + def decrypt_synthetic_password(self, sp_name: int) -> bytes: + """Attempt to decrypt the SP using any available credentials in the keychain.""" + sp_name_zero_padded = f"{sp_name:016x}" + sp_name = f"{sp_name:x}" + + sp_handle = self.target.fs.get(f"{SYNTHETIC_PASSWORD_BLOB_PATH}/{sp_name_zero_padded}.spblob").open() + sp_blob = c_synthetic_password.SyntheticPasswordBlob(sp_handle) + if sp_blob.mVersion != SYNTHETIC_PASSWORD_VERSION_V3: + raise NotImplementedError("Only synthetic password version 3 is supported") + if sp_blob.mProtectorType != PROTECTOR_TYPE_LSKF_BASED: + raise NotImplementedError("Only LSKF based synthetic password is supported") + twice_encrypted_synthetic_password = sp_handle.read() + + alias_in_keystore = f"synthetic_password_{sp_name}" + synthetic_password_blob = self.target.keystore.get_persistent_keyblob(alias_in_keystore) + once_encrypted_synthetic_password = self.target.keystore.decrypt_key_from_keyblob( + twice_encrypted_synthetic_password, synthetic_password_blob + ) + + secdiscardable_transformed = personalized_hash( + PERSONALIZATION_SECDISCARDABLE, + self.target.fs.get(f"{SYNTHETIC_PASSWORD_BLOB_PATH}/{sp_name_zero_padded}.secdis").open().read(), + ) + + pwd_path = next(self.target.fs.path(SYNTHETIC_PASSWORD_BLOB_PATH).glob("*.pwd"), None) + if pwd_path is None: + # Synthetic Password is not protected by a user-credential + passwords = [(DEFAULT_SYNTHETIC_PASSWORD_CREDENTIALS, DEFAULT_SYNTHETIC_PASSWORD_CREDENTIALS)] + else: + password_data = c_synthetic_password.PasswordData(pwd_path.open().read()) + passwords = [] + for key in keychain.get_keys_for_provider("android") + keychain.get_keys_without_provider(): + if key.key_type == keychain.KeyType.PASSPHRASE: + passwords.append( + ( + key.value, + scrypt( + key.value, + password_data.salt, + 32, + 1 << password_data.scryptLogN, + 1 << password_data.scryptLogR, + 1 << password_data.scryptLogP, + ), + ) + ) + + if len(passwords) == 0: + raise ValueError("Cannot decrypt credential-encrypted storage without a (derived) password") + for plaintext_password, encrypted_password in passwords: + inner_key = personalized_hash(b"application-id", encrypted_password + secdiscardable_transformed)[:32] + try: + decrypted_key = decrypt_blob(once_encrypted_synthetic_password, inner_key) + # TODO: For reviewer: logging the plaintext password that ended up working _can_ be useful but also + # undesireable. Ideally this would be configurable + logging.info("Decrypted credential-encrypted storage with password '%s'", plaintext_password) + except Exception: + pass + else: + return decrypted_key + raise ValueError(f"Failed to decrypt credential-encrypted storage, tried {len(passwords)} password(s).") + + def get_credential_encryption_key(self) -> bytes: + """Get the credential encryption key from the device.""" + encrypted_credential_encryption_key_path = self.target.fs.path(CREDENTIAL_ENCRYPTION_KEY_PATH) + locksettings_db = SQLite3(self.target.fs.get(LOCKSETTINGS_DB_PATH).open()) + sp_handle = next( + (row.value for row in locksettings_db.table("locksettings").rows() if row.name == "sp-handle"), None + ) + + if sp_handle is None: + # The credential encryption key is only encrypted using the keymaster file in the same directory + # This is only the case for old Android versions + return self.target.keystore.retrieve_key(encrypted_credential_encryption_key_path) + + # 64-bit boundary + sp_name = int(sp_handle) & 0xFFFFFFFFFFFFFFFF + + # The CE key is encrypted using a wrapped key derived from the synthetic password + encrypted_credential_encryption_key = ( + encrypted_credential_encryption_key_path.joinpath("encrypted_key").open().read() + ) + + # Every user has one synthetic password, which may be protected by any number of spblobs + # This is the part that might require a Lock-Screen Knowledge Factor (LSKF) if the device is protected + synthetic_password = self.decrypt_synthetic_password(sp_name) + + # Derive a key from the synthetic password that is used by fscrypt + sp_800_derived_pw = sp_800_derive(synthetic_password, PERSONALIZATION_FBE_KEY, PERSONALIZATION_CONTEXT) + wrapped_pw = personalized_hash(KEY_WRAPPING_PREFIX, sp_800_derived_pw)[:32] + + # The final key to be used for fscrypt + return decrypt_blob(encrypted_credential_encryption_key, wrapped_pw) diff --git a/dissect/target/plugins/os/unix/linux/android/lock/xts.py b/dissect/target/plugins/os/unix/linux/android/lock/xts.py new file mode 100644 index 0000000000..2ca8606a5c --- /dev/null +++ b/dissect/target/plugins/os/unix/linux/android/lock/xts.py @@ -0,0 +1,25 @@ +from __future__ import annotations + +from dissect.fve.crypto import create_cipher +from dissect.util.stream import AlignedStream + + +# TODO: Perhaps this should be moved into dissect.fve +class XTSDecryptionStream(AlignedStream): + def __init__(self, fh: AlignedStream, key: bytes, size: int | None = None, block_size: int = 4096): + self.fh = fh + self.key = key + self.block_size = block_size + + super().__init__(size=size, align=block_size) + + def _decrypt_block(self, offset: int) -> bytes: + self.fh.seek(offset * self.block_size) + encrypted = self.fh.read(self.block_size) + return create_cipher("aes-xts-256", self.key, sector_size=4096, iv_sector_size=4096).decrypt(encrypted, offset) + + def _read(self, offset: int, length: int) -> bytes: + return b"".join( + self._decrypt_block(block) + for block in range(offset // self.block_size, (offset + length) // self.block_size) + ) diff --git a/tests/_data/helpers/fscrypt/encrypted.jpg b/tests/_data/helpers/fscrypt/encrypted.jpg new file mode 100644 index 0000000000..c9bb544328 --- /dev/null +++ b/tests/_data/helpers/fscrypt/encrypted.jpg @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:c16fdbc48a50e2b66cafa3915d4cbd8415718fd399f4c8e77f30eaa8334f17d1 +size 86016 diff --git a/tests/_data/plugins/os/unix/linux/android/avd/metadata_with_pin.csv.gz b/tests/_data/plugins/os/unix/linux/android/avd/metadata_with_pin.csv.gz new file mode 100644 index 0000000000..20a9a6a4da --- /dev/null +++ b/tests/_data/plugins/os/unix/linux/android/avd/metadata_with_pin.csv.gz @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:d8615e6e1988b77b7c0363ba281233de8897d15679b852851fe2536d078b4a3e +size 15600 diff --git a/tests/_data/plugins/os/unix/linux/android/avd/metadata_without_pin.csv.gz b/tests/_data/plugins/os/unix/linux/android/avd/metadata_without_pin.csv.gz new file mode 100644 index 0000000000..260b05b339 --- /dev/null +++ b/tests/_data/plugins/os/unix/linux/android/avd/metadata_without_pin.csv.gz @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:848f8205d40b3ea2c5b2bbfb6eb5834c4cdc447b3ab6bc23aac23d164de5cac1 +size 15451 diff --git a/tests/_data/plugins/os/unix/linux/android/avd/userdata_with_pin.csv.gz b/tests/_data/plugins/os/unix/linux/android/avd/userdata_with_pin.csv.gz new file mode 100644 index 0000000000..782a39b4ba --- /dev/null +++ b/tests/_data/plugins/os/unix/linux/android/avd/userdata_with_pin.csv.gz @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:2d674033c0a26bf1f7a191f1ef387db3692b1cf35c6c6f74f6cbf46bc96d2b3f +size 9962384 diff --git a/tests/_data/plugins/os/unix/linux/android/avd/userdata_without_pin.csv.gz b/tests/_data/plugins/os/unix/linux/android/avd/userdata_without_pin.csv.gz new file mode 100644 index 0000000000..e4fd4e4b0f --- /dev/null +++ b/tests/_data/plugins/os/unix/linux/android/avd/userdata_without_pin.csv.gz @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:aa218336e4b6eb0a3f0bb20c551de7acc747ffe7e2ed2015607f63a2fe820047 +size 9948457 diff --git a/tests/helpers/test_fscrypt.py b/tests/helpers/test_fscrypt.py new file mode 100644 index 0000000000..78c997a8c9 --- /dev/null +++ b/tests/helpers/test_fscrypt.py @@ -0,0 +1,100 @@ +import hashlib + +import pytest + +from dissect.target.exceptions import FileDecryptionError +from dissect.target.helpers.fscrypt import FSCrypt, FSCryptEntryDecryptor, c_fscrypt, fscrypt_key_identifier +from tests._utils import absolute_path + +FILESYSTEM_MASTER_KEY = bytes.fromhex( + "98866cabd76ab14358ec324f7f217da0e063824919ed0fffaa4c9c415dbc74d3496811cc68f8997294e5b7741d7c3e7a646317d37af77b170e4f4b3c27dd978f" +) + +MASTER_KEY_IDENTIFIER = b"\x9eZ\xde\x06?f\x0c\xd5\x02\xf2zJ\x98\x90\x01X" + +ENCRYPTED_DIRECTORY_ENCRYPTION_CONTEXT = c_fscrypt.fscrypt_context_v2( + version=0x2, + contents_encryption_mode=0x1, + filenames_encryption_mode=0x4, + flags=0x2, + log2_data_unit_size=0x0, + __reserved=b"\x00\x00\x00", + master_key_identifier=MASTER_KEY_IDENTIFIER, + nonce=b"\xb5g\x19\xe6>\xfc+K'\xfd\xea\\\xc9pH\xc8", +) + +ENCRYPTED_DIRECTORY_EXPECTED_DERIVED_KEY = bytes.fromhex( + "ba352926cdde93a0bdb6886991a1570fa659a8d36773e977378b98efbbdb816a90d8e11017b1cadf20ff2608a03b55368b3e8da986ab5c3011c95a59f651ab9e" +) + +ENCRYPTED_FILE_ENCYRPTION_CONTEXT = c_fscrypt.fscrypt_context_v2( + version=0x2, + contents_encryption_mode=0x1, + filenames_encryption_mode=0x4, + flags=0x2, + log2_data_unit_size=0x0, + __reserved=b"\x00\x00\x00", + master_key_identifier=MASTER_KEY_IDENTIFIER, + nonce=b"\xe2|\x07GT\r\xab\x8duc0\x89k\x7f2\x9b", +) + +ENCRYPTED_FILE_EXPECTED_DERIVED_KEY = bytes.fromhex( + "6e61113c6cdfa174484593be20c5319895ab3a370a5a438a3e722e2e934008682b3af28b3b61fe00c78564c1a6bdfa1a7dbcb966b1194ae35655f9ca4c99639e" +) + +ENCRYPTED_FILE_SIZE = 85622 +ENCRYPTED_FILE_NUM_BLOCKS = 21 + + +def test_fscrypt_key_identifier() -> None: + assert fscrypt_key_identifier(FILESYSTEM_MASTER_KEY) == MASTER_KEY_IDENTIFIER.hex() + + +def test_decrypt_directory_filenames() -> None: + fscrypt = FSCrypt(None) + decryptor = FSCryptEntryDecryptor( + fscrypt, ENCRYPTED_DIRECTORY_ENCRYPTION_CONTEXT, ENCRYPTED_DIRECTORY_EXPECTED_DERIVED_KEY + ) + + assert ( + decryptor.decrypt_filename( + b"\xc4u\x9c\xb1\xf4\x16M\xad\xa8N\x98\xc6\xfdAh\x1c\x1c\xf4+F\x0e;\x10s\xe7\x12b\xb0\xba\x9d\xd8S\xd4J\xf9\xc0E\x1d\x05sKJ.\xc9\xad'\xa9\xd4" + ) + == b"but-there-are-so-many-routes-to-take.jpg" + ) + + assert decryptor.decrypt_filename(b"\x93\x13svT:\xc5\x9d\x1c\xde\xc85\x14\xd1\xeeF") == b"short.filename" + assert decryptor.decrypt_filename(b".") == b"." + assert decryptor.decrypt_filename(b"..") == b".." + + +def test_fscrypt_get_decryptor() -> None: + fscrypt = FSCrypt(None) + with pytest.raises(FileDecryptionError): + fscrypt.get_decryptor(ENCRYPTED_DIRECTORY_ENCRYPTION_CONTEXT.dumps()) + fscrypt.add_key(FILESYSTEM_MASTER_KEY) + decryptor = fscrypt.get_decryptor(ENCRYPTED_DIRECTORY_ENCRYPTION_CONTEXT.dumps()) + decryptor.key = ENCRYPTED_DIRECTORY_EXPECTED_DERIVED_KEY + + +def test_xts_runlist_decryption_stream() -> None: + runlist = [(0, ENCRYPTED_FILE_NUM_BLOCKS)] + + fscrypt = FSCrypt(None) + fscrypt.add_key(FILESYSTEM_MASTER_KEY) + decryptor = fscrypt.get_decryptor(ENCRYPTED_FILE_ENCYRPTION_CONTEXT.dumps()) + + decrypted_stream = decryptor.wrap_content_stream(runlist, ENCRYPTED_FILE_SIZE) + + # Nonce + Master key + assert decrypted_stream.key == ENCRYPTED_FILE_EXPECTED_DERIVED_KEY + + with absolute_path("_data/helpers/fscrypt/encrypted.jpg").open("rb") as fh: + decrypted_stream.fh = fh + decrypted_stream.seek(0) + + # Should be a jpeg + assert decrypted_stream.read(10) == b"\xff\xd8\xff\xe0\x00\x10JFIF" + decrypted_stream.seek(0) + full_stream = decrypted_stream.read() + assert hashlib.md5(full_stream).hexdigest() == "325f72e4c6d8177d2a7cbcf3cb01f30f" diff --git a/tests/plugins/os/unix/linux/android/test__os.py b/tests/plugins/os/unix/linux/android/test__os.py index 3a7240f614..ab4fd16e78 100644 --- a/tests/plugins/os/unix/linux/android/test__os.py +++ b/tests/plugins/os/unix/linux/android/test__os.py @@ -1,13 +1,54 @@ -from io import BytesIO +import csv +import gzip +from io import BytesIO, TextIOWrapper +from pathlib import Path +from unittest.mock import patch import pytest +from dissect.util.stream import MappingStream +from dissect.target.containers.raw import RawContainer +from dissect.target.exceptions import FileDecryptionError from dissect.target.filesystem import VirtualFilesystem +from dissect.target.helpers import keychain from dissect.target.plugins.os.unix.linux.android._os import AndroidPlugin from dissect.target.target import Target +from dissect.target.volume import Volume from tests._utils import absolute_path +def map_stream_from_csv(csv_path: Path, size: int, align: int = 8192) -> MappingStream: + stream = MappingStream(align=align, size=size) + with TextIOWrapper(gzip.open(csv_path, "r")) as fh: + for offset, data in csv.reader(fh): + offset = int(offset) + data = bytes.fromhex(data) + stream.add(offset, len(data), BytesIO(data)) + return stream + + +@pytest.fixture +def avd_disks_with_pin() -> tuple[MappingStream, MappingStream]: + return map_stream_from_csv( + absolute_path("_data/plugins/os/unix/linux/android/avd/userdata_with_pin.csv.gz"), + size=6442450944, + ), map_stream_from_csv( + absolute_path("_data/plugins/os/unix/linux/android/avd/metadata_with_pin.csv.gz"), + size=18874368, + ) + + +@pytest.fixture +def avd_disks_without_pin() -> tuple[MappingStream, MappingStream]: + return map_stream_from_csv( + absolute_path("_data/plugins/os/unix/linux/android/avd/userdata_without_pin.csv.gz"), + size=6442450944, + ), map_stream_from_csv( + absolute_path("_data/plugins/os/unix/linux/android/avd/metadata_without_pin.csv.gz"), + size=18874368, + ) + + def test_android_os(target_android: Target) -> None: target_android.add_plugin(AndroidPlugin) @@ -56,3 +97,56 @@ def test_android_os_detect_props(target_bare: Target, build_prop_locations: list # test if glob does not go too deep. assert "/foo/bar/too/deep/build.prop" not in target_bare._os.build_prop_paths + + +def test_android_os_unlock_credential_encrypted_volume_with_pin( + target_bare: Target, avd_disks_with_pin: tuple[MappingStream, MappingStream] +) -> None: + userdata_fh, metadata_fh = avd_disks_with_pin + target_bare.disks.add(RawContainer(metadata_fh)) + target_bare.volumes.add(Volume(userdata_fh, 1, 0, userdata_fh.size, None, "userdata")) + target_bare.apply() + + assert isinstance(target_bare._os, AndroidPlugin) + assert target_bare._os.userdata_partition_unlocked + assert target_bare._os.device_encrypted_storage_unlocked + assert not target_bare._os.credential_encrypted_storage_unlocked + + # Verify we can find the credential encrypted userdata directory, but not read it + encrypted_userdata = target_bare.fs.get("/data/data") + + # Should be not be able to list the encrypted userdata directory yet + with pytest.raises(FileDecryptionError): + encrypted_userdata.listdir() + + with patch.object(keychain, "KEYCHAIN", []): + keychain.register_key( + keychain.KeyType.PASSPHRASE, + b"30031853", # Birthday of a legend + identifier=None, + provider="android", + ) + target_bare._os.unlock() + assert target_bare._os.credential_encrypted_storage_unlocked + + # Verify we can now list the encrypted userdata directory + assert "com.google.android.apps.photos" in encrypted_userdata.listdir() + + +def test_android_os_unlock_credential_encrypted_volume_without_pin( + target_bare: Target, avd_disks_without_pin: tuple[MappingStream, MappingStream] +) -> None: + userdata_fh, metadata_fh = avd_disks_without_pin + target_bare.disks.add(RawContainer(metadata_fh)) + target_bare.volumes.add(Volume(userdata_fh, 1, 0, userdata_fh.size, None, "userdata")) + target_bare.apply() + + assert isinstance(target_bare._os, AndroidPlugin) + + # As this device doesn't have a PIN/password/pattern set, all storages should be unlocked automatically + assert target_bare._os.userdata_partition_unlocked + assert target_bare._os.device_encrypted_storage_unlocked + assert target_bare._os.credential_encrypted_storage_unlocked + + # Verify we can list the encrypted userdata directory + assert "com.google.android.apps.photos" in target_bare.fs.get("/data/data/").listdir()