Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions dissect/target/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,3 +133,7 @@ class ConfigurationParsingError(Error):

class TargetPathNotFoundError(TargetError):
"""The path to the target does not exist."""


class FileDecryptionError(FilesystemError):
"""An error occurred during file decryption."""
40 changes: 40 additions & 0 deletions dissect/target/filesystems/extfs.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from dissect.extfs import extfs

from dissect.target.exceptions import (
FileDecryptionError,
FileNotFoundError,
FilesystemError,
IsADirectoryError,
Expand All @@ -18,19 +19,29 @@
if TYPE_CHECKING:
from collections.abc import Iterator

from dissect.target.helpers.fscrypt import FSCrypt, FSCryptEntryDecryptor


class ExtFilesystem(Filesystem):
__type__ = "ext"

def __init__(self, fh: BinaryIO, *args, **kwargs):
super().__init__(fh, *args, **kwargs)
self.extfs = extfs.ExtFS(fh)
self.fscrypt: FSCrypt | None = None

@staticmethod
def _detect(fh: BinaryIO) -> bool:
fh.seek(1024)
return fh.read(512)[56:58] == b"\x53\xef"

def add_fscrypt(self, fscrypt: FSCrypt) -> None:
self.fscrypt = fscrypt
self.extfs._get_inode_decryptor = self.get_inode_decryptor

def get_inode_decryptor(self, inode: extfs.INode) -> ExtEntryDecryptor:
return ExtEntryDecryptor(inode, self)

def get(self, path: str) -> FilesystemEntry:
return ExtFilesystemEntry(self, path, self._get_node(path))

Expand Down Expand Up @@ -147,3 +158,32 @@ def attr(self) -> Any:

def lattr(self) -> Any:
return self.entry.xattr


class ExtEntryDecryptor:
def __init__(self, inode: extfs.INode, fs: ExtFilesystem):
self.inode = inode
self.fs = fs
self._decryptor: FSCryptEntryDecryptor | None = None

@property
def decryptor(self) -> FSCryptEntryDecryptor:
if self._decryptor is None:
if not self.inode.is_encrypted:
raise ValueError("Entry is not encrypted")
if self.fs.fscrypt is None:
raise FileDecryptionError("No fscrypt context provided to this filesystem")

encryption_context = next((attr.value for attr in self.inode.xattr if attr.name == "encryption.c"), None)

if encryption_context is None:
raise FileDecryptionError("No encryption context found for this inode")
# Will raise FileDecryptionError if no key was found
self._decryptor = self.fs.fscrypt.get_decryptor(encryption_context)
return self._decryptor

def open_decrypt(self) -> BinaryIO:
return self.decryptor.wrap_content_stream(self.inode.dataruns(), self.inode.size)

def decrypt_filename(self, encrypted_filename: bytes) -> bytes:
return self.decryptor.decrypt_filename(encrypted_filename)
207 changes: 207 additions & 0 deletions dissect/target/helpers/fscrypt.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,207 @@
from __future__ import annotations

import hashlib
import hmac
from typing import BinaryIO

from Crypto.Cipher import AES
from dissect.cstruct import cstruct
from dissect.fve.crypto import create_cipher
from dissect.util.stream import AlignedStream

from dissect.target.exceptions import FileDecryptionError

fscrypt_def = """
// https://github.com/torvalds/linux/blob/master/fs/crypto/fscrypt_private.h
#define HKDF_CONTEXT_KEY_IDENTIFIER 1 /* info=<empty> */
#define HKDF_CONTEXT_PER_FILE_ENC_KEY 2 /* info=file_nonce */
#define HKDF_CONTEXT_DIRECT_KEY 3 /* info=mode_num */
#define HKDF_CONTEXT_IV_INO_LBLK_64_KEY 4 /* info=mode_num||fs_uuid */
#define HKDF_CONTEXT_DIRHASH_KEY 5 /* info=file_nonce */
#define HKDF_CONTEXT_IV_INO_LBLK_32_KEY 6 /* info=mode_num||fs_uuid */
#define HKDF_CONTEXT_INODE_HASH_KEY 7 /* info=<empty */

#define FSCRYPT_POLICY_V1 0
#define FSCRYPT_KEY_DESCRIPTOR_SIZE 8
#define FSCRYPT_POLICY_V2 2
#define FSCRYPT_KEY_IDENTIFIER_SIZE 16
#define FSCRYPT_FILE_NONCE_SIZE 16

/* Encryption algorithms */
#define FSCRYPT_MODE_AES_256_XTS 1
#define FSCRYPT_MODE_AES_256_CTS 4
#define FSCRYPT_MODE_AES_128_CBC 5
#define FSCRYPT_MODE_AES_128_CTS 6
#define FSCRYPT_MODE_SM4_XTS 7
#define FSCRYPT_MODE_SM4_CTS 8
#define FSCRYPT_MODE_ADIANTUM 9
#define FSCRYPT_MODE_AES_256_HCTR2 10

struct fscrypt_context_v1 {
uint8 version; /* FSCRYPT_CONTEXT_V1 */
uint8 contents_encryption_mode;
uint8 filenames_encryption_mode;
uint8 flags;
char master_key_descriptor[FSCRYPT_KEY_DESCRIPTOR_SIZE];
char nonce[FSCRYPT_FILE_NONCE_SIZE];
};

struct fscrypt_context_v2 {
uint8 version; /* FSCRYPT_CONTEXT_V2 */
uint8 contents_encryption_mode;
uint8 filenames_encryption_mode;
uint8 flags;
uint8 log2_data_unit_size;
uint8 __reserved[3];
char master_key_identifier[FSCRYPT_KEY_IDENTIFIER_SIZE];
char nonce[FSCRYPT_FILE_NONCE_SIZE];
};
"""

c_fscrypt = cstruct().load(fscrypt_def)
ZERO_BYTE_IV = b"\x00" * AES.block_size

# The final byte '01' is actually not part of the info but the block counter, which is always 1 for our purposes
FSCRYPT_IDENTIFIER_STR = b"fscrypt\0"
FSCRYPT_KEY_IDENTIFIER_INFO = (
FSCRYPT_IDENTIFIER_STR + c_fscrypt.HKDF_CONTEXT_KEY_IDENTIFIER.to_bytes(1, "little") + b"\x01"
)


def fscrypt_hdkf(key: bytes, info_and_block_index: bytes) -> bytes:
"""Unsalted HKDF-SHA512 using a key and an info.

We also ask the caller to pass a block index. Normally the hashing function should do that for the caller, but
this allows the caller to pass a static info value that we don't have to concatonate another byte to every time
this function is called."""
hdkf_extracted_key = hmac.new(b"", key, hashlib.sha512).digest()
hdkf_expand = hmac.new(hdkf_extracted_key, info_and_block_index, hashlib.sha512)
return hdkf_expand.digest()


def fscrypt_key_identifier(key: bytes) -> str:
"""Derive a fscrypt-specific key identifier from a key."""
return fscrypt_hdkf(key, FSCRYPT_KEY_IDENTIFIER_INFO)[: c_fscrypt.FSCRYPT_KEY_IDENTIFIER_SIZE].hex()


def aes_256_cts_cbc_decrypt(ciphertext: bytes, key: bytes) -> bytes:
"""AES 256 with Ciphertext Stealing."""
if len(ciphertext) <= 16:
return AES.new(key, AES.MODE_ECB).decrypt(ciphertext)

cipher = AES.new(key, AES.MODE_CBC, ZERO_BYTE_IV)

# Round the ciphertext up to the next multiple of 16 and deduct 16
last_block_start = ((len(ciphertext) + 15) & ~15) - 16
second_to_last_block_start = last_block_start - 16

# Put second-to-last block _behind_ the last block
cts_ciphertext = ciphertext[last_block_start:]
second_to_last_block = ciphertext[second_to_last_block_start:last_block_start]
cts_ciphertext += second_to_last_block

decrypted_prefix = cipher.decrypt(ciphertext[:second_to_last_block_start]) if last_block_start > 16 else b""
return decrypted_prefix + cipher.decrypt(cts_ciphertext)


def aes_256_xts_decrypt(ciphertext: bytes, key: bytes, sector: int) -> bytes:
"""AES-XTS-256 decryption with a sector size of 4096."""
return create_cipher("aes-xts-256", key, sector_size=4096, iv_sector_size=4096).decrypt(ciphertext, sector)


class FSCrypt:
"""FSCrypt decryption.

The class can be given master keys.
Then, for a given encyption context, if the associated master key is present, a combined key is derived and wrapped
in an instance of the FSCryptEntryDecryptor class, that can use said key for filename and file contents decryption.

Resources:
- https://www.kernel.org/doc/html/v4.18/filesystems/fscrypt.html

"""

def __init__(self, fh: BinaryIO):
self.keys: dict[str, bytes] = {}
self.fh = fh

def add_key(self, key: bytes) -> None:
self.keys[fscrypt_key_identifier(key)] = key

def get_decryptor(self, encryption_context: bytes) -> FSCryptEntryDecryptor:
encryption_context = c_fscrypt.fscrypt_context_v2(encryption_context)
if encryption_context.version != c_fscrypt.FSCRYPT_POLICY_V2:
raise NotImplementedError("Only FSCRYPT_POLICY_V2 is supported")
master_key_identifier = encryption_context.master_key_identifier.hex()
if master_key_identifier not in self.keys:
raise FileDecryptionError("Requested key not found in keystore")
master_key = self.keys[master_key_identifier]

file_enc_info = (
FSCRYPT_IDENTIFIER_STR
+ c_fscrypt.HKDF_CONTEXT_PER_FILE_ENC_KEY.to_bytes(1, "little")
+ encryption_context.nonce
)
file_enc_info += b"\01" # Block counter
derived_key = fscrypt_hdkf(master_key, file_enc_info)
return FSCryptEntryDecryptor(self, encryption_context, derived_key)


class FSCryptEntryDecryptor:
"""Given an encryption context, and a derived key, can decrypt filenames and file contents."""

def __init__(self, fscrypt: FSCrypt, encryption_context: c_fscrypt.fscrypt_context_v2, key: bytes):
self.fscrypt = fscrypt
self.encryption_context = encryption_context
self.key = key

def decrypt_filename(self, filename: bytes) -> bytes:
if filename in [b".", b".."]:
return filename
if self.encryption_context.filenames_encryption_mode == c_fscrypt.FSCRYPT_MODE_AES_256_CTS:
filename_key = self.key[0:32]
ret = aes_256_cts_cbc_decrypt(filename, filename_key)
return ret.rstrip(b"\0")

raise NotImplementedError("Only FSCRYPT_MODE_AES_256_CTS is supported for filenames")

def wrap_content_stream(self, runlist: tuple[int, int], size: int) -> XTSRunlistDecryptionStream:
if self.encryption_context.contents_encryption_mode != c_fscrypt.FSCRYPT_MODE_AES_256_XTS:
raise NotImplementedError("Only FSCRYPT_MODE_AES_256_XTS is supported for content streams")
return XTSRunlistDecryptionStream(self.fscrypt.fh, self.key, runlist, size)


class XTSRunlistDecryptionStream(AlignedStream):
"""AES-XTS-256 decryption stream for a given runlist."""

def __init__(
self, fh: BinaryIO, key: bytes, runlist: list[tuple[int, int]], size: int | None = None, align: int = 4096
):
self.fh = fh
self.key = key
self.runlist = runlist
self._block_offsets: list[int] = None
super().__init__(size, align)

def _get_block_offsets(self) -> list[int]:
if self._block_offsets is None:
self._block_offsets = []
for start, length in self.runlist:
for i in range(length):
self._block_offsets.append(start + i)
return self._block_offsets

def _decrypt_block(self, block_index: int, offset: int) -> bytes:
self.fh.seek(offset * self.align)
encrypted = self.fh.read(self.align)
return aes_256_xts_decrypt(encrypted, self.key, block_index)

def _read(self, offset: int, length: int) -> bytes:
r = []
first_idx = offset // self.align
last_idx = (first_idx + (length // self.align)) - 1
for block_idx, block_offset in enumerate(self._get_block_offsets()):
if block_idx < first_idx or block_idx > last_idx:
continue
r.append(self._decrypt_block(block_idx, block_offset))
return b"".join(r)
1 change: 1 addition & 0 deletions dissect/target/loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,7 @@ def open(path: str | Path, *, fallbacks: list[type[Loader]] | None = None, **kwa
register("local", "LocalLoader")
register("ad1", "AD1Loader")
register("asdf", "AsdfLoader")
register("avd", "AVDLoader")
register("vmx", "VmxLoader")
register("vmwarevm", "VmwarevmLoader")
register("hyperv", "HyperVLoader")
Expand Down
58 changes: 58 additions & 0 deletions dissect/target/loaders/avd.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
from __future__ import annotations

import logging
from pathlib import Path
from typing import TYPE_CHECKING

from dissect.hypervisor.disk.qcow2 import QCow2

from dissect.target.containers.raw import RawContainer
from dissect.target.loader import Loader
from dissect.target.volume import Volume

log = logging.getLogger(__name__)
if TYPE_CHECKING:
from dissect.target.target import Target


class AVDLoader(Loader):
"""Load an Android Virtual Device."""

def __init__(self, path: Path, **kwargs):
super().__init__(path, **kwargs)
self.avd_folder = path
self.encryptionkey_path = self.avd_folder.joinpath("encryptionkey.img.qcow2")
self.encryptionkey_backing_path = self.avd_folder.joinpath("encryptionkey.img")

self.userdata_path = self.avd_folder.joinpath("userdata-qemu.img.qcow2")
self.userdata_backing_path = self.avd_folder.joinpath("userdata-qemu.img")

qemu_config_path = self.avd_folder.joinpath("hardware-qemu.ini")
self.system_partition_path = None

if qemu_config_path.exists():
qemu_config = qemu_config_path.read_text(encoding="utf-8").splitlines()
for line in qemu_config:
if line.startswith("disk.systemPartition.initPath"):
_, _, system_partition_path = line.partition("=")
self.system_partition_path = Path(system_partition_path.strip())
if not self.system_partition_path.exists():
self.system_partition_path = None
break

@staticmethod
def detect(path: Path) -> bool:
return path.is_dir() and path.name.endswith(".avd") and path.joinpath("AVD.conf").exists()

def map(self, target: Target) -> None:
if self.system_partition_path:
container = RawContainer(self.system_partition_path.open("rb"))
target.disks.add(container)

metadata_partition_disk_fh = QCow2(
fh=self.encryptionkey_path.open("rb"), backing_file=self.encryptionkey_backing_path.open("rb")
).open()
userdata_fh = QCow2(fh=self.userdata_path.open("rb"), backing_file=self.userdata_backing_path.open("rb")).open()

target.disks.add(RawContainer(metadata_partition_disk_fh))
target.volumes.add(Volume(userdata_fh, 1, 0, userdata_fh.size, None, "userdata"))
Loading