From b1ccaa81381a9d9bdb9f44cc9fd4bb8cabcb27e2 Mon Sep 17 00:00:00 2001 From: Simon Berger Date: Sun, 7 Sep 2025 11:54:10 +0000 Subject: [PATCH 1/3] feat: add type hints to ubireader.ubi --- ubireader/ubi/__init__.py | 41 ++++++++++------- ubireader/ubi/block/__init__.py | 30 ++++++++----- ubireader/ubi/block/layout.py | 23 ++++++++-- ubireader/ubi/block/sort.py | 26 ++++++----- ubireader/ubi/display.py | 23 +++++++--- ubireader/ubi/headers.py | 79 +++++++++++++++++++++++++++------ ubireader/ubi/image.py | 22 ++++++--- ubireader/ubi/volume.py | 33 +++++++++----- 8 files changed, 195 insertions(+), 82 deletions(-) diff --git a/ubireader/ubi/__init__.py b/ubireader/ubi/__init__.py index cf2cb88..ba16efa 100755 --- a/ubireader/ubi/__init__.py +++ b/ubireader/ubi/__init__.py @@ -17,12 +17,19 @@ # along with this program. If not, see . ############################################################# +from __future__ import annotations +from typing import TYPE_CHECKING from ubireader.debug import error from ubireader.ubi.block import sort, extract_blocks from ubireader.ubi import display from ubireader.ubi.image import description as image from ubireader.ubi.block import layout, rm_old_blocks +if TYPE_CHECKING: + from ubireader.ubi_io import ubi_file as UbiFile + from ubireader.ubi.block import description as Block + from ubireader.ubi.image import description as Image + class ubi_base(object): """UBI Base object @@ -39,7 +46,7 @@ class ubi_base(object): Dict:blocks -- Dict keyed by PEB number of all blocks. """ - def __init__(self, ubi_file): + def __init__(self, ubi_file: UbiFile) -> None: self.__name__ = 'UBI' self._file = ubi_file self._first_peb_num = 0 @@ -54,7 +61,7 @@ def __init__(self, ubi_file): self._leb_size = self.file.block_size - arbitrary_block.ec_hdr.data_offset - def _get_file(self): + def _get_file(self) -> UbiFile: """UBI File object Returns: @@ -64,7 +71,7 @@ def _get_file(self): file = property(_get_file) - def _get_block_count(self): + def _get_block_count(self) -> int: """Total amount of UBI blocks in file. Returns: @@ -74,9 +81,9 @@ def _get_block_count(self): block_count = property(_get_block_count) - def _set_first_peb_num(self, i): + def _set_first_peb_num(self, i: int) -> None: self._first_peb_num = i - def _get_first_peb_num(self): + def _get_first_peb_num(self) -> int: """First Physical Erase Block with UBI data Returns: @@ -86,7 +93,7 @@ def _get_first_peb_num(self): first_peb_num = property(_get_first_peb_num, _set_first_peb_num) - def _get_leb_size(self): + def _get_leb_size(self) -> int: """LEB size of UBI blocks in file. Returns: @@ -96,7 +103,7 @@ def _get_leb_size(self): leb_size = property(_get_leb_size) - def _get_peb_size(self): + def _get_peb_size(self) -> int: """PEB size of UBI blocks in file. Returns: @@ -106,7 +113,7 @@ def _get_peb_size(self): peb_size = property(_get_peb_size) - def _get_min_io_size(self): + def _get_min_io_size(self) -> int: """Min I/O Size Returns: @@ -116,7 +123,7 @@ def _get_min_io_size(self): min_io_size = property(_get_min_io_size) - def _get_blocks(self): + def _get_blocks(self) -> dict[int, Block]: """Main Dict of UBI Blocks Passed around for lists of indexes to be made or to be returned @@ -142,7 +149,7 @@ class ubi(ubi_base): List:unknown_blocks_list -- List of blocks with unknown types. * """ - def __init__(self, ubi_file): + def __init__(self, ubi_file: UbiFile) -> None: super(ubi, self).__init__(ubi_file) layout_list, data_list, int_vol_list, unknown_list = sort.by_type(self.blocks) @@ -161,12 +168,12 @@ def __init__(self, ubi_file): layout_infos = layout.associate_blocks(self.blocks, layout_pairs) - self._images = [] + self._images: list[Image] = [] for i in range(0, len(layout_infos)): self._images.append(image(self.blocks, layout_infos[i])) - def _get_images(self): + def _get_images(self) -> list[Image]: """Get UBI images. Returns: @@ -176,7 +183,7 @@ def _get_images(self): images = property(_get_images) - def _get_data_blocks_list(self): + def _get_data_blocks_list(self) -> list[int]: """Get all UBI blocks found in file that are data blocks. Returns: @@ -186,7 +193,7 @@ def _get_data_blocks_list(self): data_blocks_list = property(_get_data_blocks_list) - def _get_layout_blocks_list(self): + def _get_layout_blocks_list(self) -> list[int]: """Get all UBI blocks found in file that are layout volume blocks. Returns: @@ -196,7 +203,7 @@ def _get_layout_blocks_list(self): layout_blocks_list = property(_get_layout_blocks_list) - def _get_int_vol_blocks_list(self): + def _get_int_vol_blocks_list(self) -> list[int]: """Get all UBI blocks found in file that are internal volume blocks. Returns: @@ -208,7 +215,7 @@ def _get_int_vol_blocks_list(self): int_vol_blocks_list = property(_get_int_vol_blocks_list) - def _get_unknown_blocks_list(self): + def _get_unknown_blocks_list(self) -> list[int]: """Get all UBI blocks found in file of unknown type.. Returns: @@ -217,7 +224,7 @@ def _get_unknown_blocks_list(self): return self._unknown_blocks_list unknown_blocks_list = property(_get_unknown_blocks_list) - def display(self, tab=''): + def display(self, tab: str = '') -> str: """Print information about this object. Argument: diff --git a/ubireader/ubi/block/__init__.py b/ubireader/ubi/block/__init__.py index ce80f96..ca42d57 100755 --- a/ubireader/ubi/block/__init__.py +++ b/ubireader/ubi/block/__init__.py @@ -17,6 +17,8 @@ # along with this program. If not, see . ############################################################# +from __future__ import annotations +from typing import TYPE_CHECKING from zlib import crc32 from ubireader import settings from ubireader.debug import error, log, verbose_display, verbose_log @@ -24,6 +26,10 @@ from ubireader.ubi.defines import UBI_EC_HDR_SZ, UBI_VID_HDR_SZ, UBI_INTERNAL_VOL_START, UBI_EC_HDR_MAGIC, UBI_CRC32_INIT from ubireader.ubi.headers import ec_hdr, vid_hdr, vtbl_recs +if TYPE_CHECKING: + from collections.abc import Iterable, Mapping + from ubireader.ubi import ubi_base as UbiBase + from ubireader.ubi.headers import _vtbl_rec as VtblRec class description(object): """UBI Block description Object @@ -48,16 +54,18 @@ class description(object): Will print out all information when invoked as a string. """ - def __init__(self, block_buf): + data_crc: int + + def __init__(self, block_buf: bytes) -> None: self.file_offset = -1 self.peb_num = -1 self.leb_num = -1 self.size = -1 - self.vid_hdr = None + self.vid_hdr: vid_hdr | None = None self.is_internal_vol = False - self.vtbl_recs = [] + self.vtbl_recs: list[VtblRec] = [] # TODO better understanding of block types/errors self.ec_hdr = ec_hdr(block_buf[0:UBI_EC_HDR_SZ]) @@ -77,16 +85,16 @@ def __init__(self, block_buf): self.is_valid = not self.ec_hdr.errors and not self.vid_hdr.errors or settings.ignore_block_header_errors - def __repr__(self): + def __repr__(self) -> str: return 'Block: PEB# %s: LEB# %s' % (self.peb_num, self.leb_num) - def display(self, tab=''): + def display(self, tab: str ='') -> str: return display.block(self, tab) -def get_blocks_in_list(blocks, idx_list): +def get_blocks_in_list(blocks: Mapping[int, description], idx_list: Iterable[int]) -> dict[int, description]: """Retrieve block objects in list of indexes Arguments: @@ -103,7 +111,7 @@ def get_blocks_in_list(blocks, idx_list): -def extract_blocks(ubi): +def extract_blocks(ubi: UbiBase) -> dict[int, description]: """Get a list of UBI block objects from file Arguments:. @@ -113,11 +121,11 @@ def extract_blocks(ubi): Dict -- Of block objects keyed by PEB number. """ - blocks = {} + blocks: dict[int, description] = {} ubi.file.seek(ubi.file.start_offset) peb_count = 0 cur_offset = 0 - bad_blocks = [] + bad_blocks: list[int] = [] # range instead of xrange, as xrange breaks > 4GB end_offset. for i in range(ubi.file.start_offset, ubi.file.end_offset, ubi.file.block_size): @@ -157,8 +165,8 @@ def extract_blocks(ubi): return blocks -def rm_old_blocks(blocks, block_list): - del_blocks = [] +def rm_old_blocks(blocks: Mapping[int, description], block_list: Iterable[int]) -> list[int]: + del_blocks: list[int] = [] for i in block_list: if i in del_blocks: diff --git a/ubireader/ubi/block/layout.py b/ubireader/ubi/block/layout.py index 67388d9..f5e575e 100755 --- a/ubireader/ubi/block/layout.py +++ b/ubireader/ubi/block/layout.py @@ -17,10 +17,20 @@ # along with this program. If not, see . ############################################################# +from __future__ import annotations +from typing import TYPE_CHECKING, Literal, Protocol, overload from ubireader.debug import log from ubireader.ubi.block import sort -def group_pairs(blocks, layout_blocks_list): +if TYPE_CHECKING: + from collections.abc import Iterable, Mapping + from ubireader.ubi.block import description as Block + +class _LayoutPair(Protocol): + def __getitem__(self, idx: Literal[0, 1], /) -> int: ... + def __contains__(self, key: object, /) -> bool: ... + +def group_pairs(blocks: Mapping[int, Block], layout_blocks_list: Iterable[int]) -> list[_LayoutPair]: """Sort a list of layout blocks into pairs Arguments: @@ -31,7 +41,7 @@ def group_pairs(blocks, layout_blocks_list): List -- Layout block pair indexes grouped in a list """ - image_dict={} + image_dict: dict[int, _LayoutPair] = {} for block_id in layout_blocks_list: image_seq=blocks[block_id].ec_hdr.image_seq if image_seq not in image_dict: @@ -43,8 +53,13 @@ def group_pairs(blocks, layout_blocks_list): return list(image_dict.values()) +class _LayoutInfo(_LayoutPair, Protocol): + @overload + def __getitem__(self, idx: Literal[0, 1], /) -> int: ... + @overload + def __getitem__(self, idx: Literal[2], /) -> list[int]: ... -def associate_blocks(blocks, layout_pairs): +def associate_blocks(blocks: Mapping[int, Block], layout_pairs: list[_LayoutPair]) -> list[_LayoutInfo]: """Group block indexes with appropriate layout pairs Arguments: @@ -55,7 +70,7 @@ def associate_blocks(blocks, layout_pairs): List -- Layout block pairs grouped with associated block ranges. """ - seq_blocks = [] + seq_blocks: list[int] = [] for layout_pair in layout_pairs: seq_blocks = sort.by_image_seq(blocks, blocks[layout_pair[0]].ec_hdr.image_seq) seq_blocks = [b for b in seq_blocks if b not in layout_pair] diff --git a/ubireader/ubi/block/sort.py b/ubireader/ubi/block/sort.py index e9e115b..f9705ff 100644 --- a/ubireader/ubi/block/sort.py +++ b/ubireader/ubi/block/sort.py @@ -17,9 +17,15 @@ # along with this program. If not, see . ############################################################# +from __future__ import annotations +from typing import TYPE_CHECKING, Literal from ubireader import settings -def by_image_seq(blocks, image_seq): +if TYPE_CHECKING: + from collections.abc import Mapping + from ubireader.ubi.block import description as Block + +def by_image_seq(blocks: Mapping[int, Block], image_seq: int) -> list[int]: """Filter blocks to return only those associated with the provided image_seq number. If uboot_fix is set, associate blocks with an image_seq of 0 also. @@ -36,7 +42,7 @@ def by_image_seq(blocks, image_seq): else: return list(filter(lambda block: blocks[block].ec_hdr.image_seq == image_seq, blocks)) -def by_leb(blocks): +def by_leb(blocks: Mapping[int, Block]) -> list[Literal['x'] | int]: """Sort blocks by Logical Erase Block number. Arguments: @@ -46,7 +52,7 @@ def by_leb(blocks): List -- Indexes of blocks sorted by LEB. """ slist_len = len(blocks) - slist = ['x'] * slist_len + slist: list[Literal['x'] | int] = ['x'] * slist_len for block in blocks: if blocks[block].leb_num >= slist_len: @@ -59,7 +65,7 @@ def by_leb(blocks): return slist -def by_vol_id(blocks, slist=None): +def by_vol_id(blocks: Mapping[int, Block], slist: list[int] | None = None) -> dict[int, list[int]]: """Sort blocks by volume id Arguments: @@ -70,7 +76,7 @@ def by_vol_id(blocks, slist=None): Dict -- blocks grouped in lists with dict key as volume id. """ - vol_blocks = {} + vol_blocks: dict[int, list[int]] = {} # sort block by volume # not reliable with multiple partitions (fifo) @@ -88,7 +94,7 @@ def by_vol_id(blocks, slist=None): return vol_blocks -def by_type(blocks, slist=None): +def by_type(blocks: Mapping[int, Block], slist: list[int] | None = None) -> tuple[list[int], list[int], list[int], list[int]]: """Sort blocks into layout, internal volume, data or unknown Arguments: @@ -106,10 +112,10 @@ def by_type(blocks, slist=None): of crc in ed_hdr or vid_hdr. """ - layout = [] - data = [] - int_vol = [] - unknown = [] + layout: list[int] = [] + data: list[int] = [] + int_vol: list[int] = [] + unknown: list[int] = [] for i in blocks: if slist and i not in slist: diff --git a/ubireader/ubi/display.py b/ubireader/ubi/display.py index 9efbc6a..b2f9080 100755 --- a/ubireader/ubi/display.py +++ b/ubireader/ubi/display.py @@ -17,10 +17,19 @@ # along with this program. If not, see . ############################################################# +from __future__ import annotations +from typing import TYPE_CHECKING from ubireader import settings from ubireader.ubi.defines import PRINT_COMPAT_LIST, PRINT_VOL_TYPE_LIST, UBI_VTBL_AUTORESIZE_FLG -def ubi(ubi, tab=''): +if TYPE_CHECKING: + from ubireader.ubi import ubi as Ubi + from ubireader.ubi.block import description as Block + from ubireader.ubi.headers import ec_hdr as EcHdr, vid_hdr as VidHdr, _vtbl_rec as VtblRec + from ubireader.ubi.image import description as Image + from ubireader.ubi.volume import description as Volume + +def ubi(ubi: Ubi, tab: str = '') -> str: buf = '%sUBI File\n' % (tab) buf += '%s---------------------\n' % (tab) buf += '\t%sMin I/O: %s\n' % (tab, ubi.min_io_size) @@ -35,7 +44,7 @@ def ubi(ubi, tab=''): return buf -def image(image, tab=''): +def image(image: Image, tab: str = '') -> str: buf = '%s%s\n' % (tab, image) buf += '%s---------------------\n' % (tab) buf += '\t%sImage Sequence Num: %s\n' % (tab, image.image_seq) @@ -45,7 +54,7 @@ def image(image, tab=''): return buf -def volume(volume, tab=''): +def volume(volume: Volume, tab: str = '') -> str: buf = '%s%s\n' % (tab, volume) buf += '%s---------------------\n' % (tab) buf += '\t%sVol ID: %s\n' % (tab, volume.vol_id) @@ -61,7 +70,7 @@ def volume(volume, tab=''): return buf -def block(block, tab='\t'): +def block(block: Block, tab: str = '\t') -> str: buf = '%s%s\n' % (tab, block) buf += '%s---------------------\n' % (tab) buf += '\t%sFile Offset: %s\n' % (tab, block.file_offset) @@ -95,7 +104,7 @@ def block(block, tab='\t'): return buf -def ec_hdr(ec_hdr, tab=''): +def ec_hdr(ec_hdr: EcHdr, tab: str = '') -> str: buf = '' for key, value in ec_hdr: if key == 'errors': @@ -108,7 +117,7 @@ def ec_hdr(ec_hdr, tab=''): return buf -def vid_hdr(vid_hdr, tab=''): +def vid_hdr(vid_hdr: VidHdr, tab: str = '') -> str: buf = '' for key, value in vid_hdr: if key == 'errors': @@ -133,7 +142,7 @@ def vid_hdr(vid_hdr, tab=''): return buf -def vol_rec(vol_rec, tab=''): +def vol_rec(vol_rec: VtblRec, tab: str = '') -> str: buf = '' for key, value in vol_rec: if key == 'errors': diff --git a/ubireader/ubi/headers.py b/ubireader/ubi/headers.py index 5fa3905..8f60030 100755 --- a/ubireader/ubi/headers.py +++ b/ubireader/ubi/headers.py @@ -17,30 +17,48 @@ # along with this program. If not, see . ############################################################# +from __future__ import annotations import struct +from typing import TYPE_CHECKING, Any from zlib import crc32 from ubireader.debug import log from ubireader.ubi.defines import * +if TYPE_CHECKING: + from collections.abc import Iterator + + class ec_hdr(object): - def __init__(self, buf): + errors: list[str] + + magic: bytes + version: int + padding: bytes + ec: int + vid_hdr_offset: int + data_offset: int + image_seq: int + padding2: bytes + hdr_crc: int + + def __init__(self, buf: bytes) -> None: fields = dict(list(zip(EC_HDR_FIELDS, struct.unpack(EC_HDR_FORMAT,buf)))) for key in fields: setattr(self, key, fields[key]) setattr(self, 'errors', []) self._check_errors(buf[:-4]) - - def __repr__(self): + + def __repr__(self) -> str: return 'Erase Count Header' - def __iter__(self): + def __iter__(self) -> Iterator[tuple[str, Any]]: for key in dir(self): if not key.startswith('_'): yield key, getattr(self, key) - def _check_errors(self, buf_crc): + def _check_errors(self, buf_crc: bytes) -> None: crc_chk = (~crc32(buf_crc) & UBI_CRC32_INIT) if self.hdr_crc != crc_chk: log(vid_hdr, 'CRC Failed: expected 0x%x got 0x%x' % (crc_chk, self.hdr_crc)) @@ -48,7 +66,26 @@ def _check_errors(self, buf_crc): class vid_hdr(object): - def __init__(self, buf): + errors: list[str] + + magic: bytes + version: int + vol_type: int + copy_flag: int + compat: int + vol_id: int + lnum: int + padding: bytes + data_size: int + used_ebs: int + data_pad: int + data_crc: int + padding2: bytes + sqnum: int + padding3: bytes + hdr_crc: int + + def __init__(self, buf: bytes) -> None: fields = dict(list(zip(VID_HDR_FIELDS, struct.unpack(VID_HDR_FORMAT,buf)))) for key in fields: setattr(self, key, fields[key]) @@ -56,24 +93,24 @@ def __init__(self, buf): self._check_errors(buf[:-4]) - def __iter__(self): + def __iter__(self) -> Iterator[tuple[str, Any]]: for key in dir(self): if not key.startswith('_'): yield key, getattr(self, key) - def __repr__(self): + def __repr__(self) -> str: return 'VID Header' - def _check_errors(self, buf_crc): + def _check_errors(self, buf_crc: bytes) -> None: crc_chk = (~crc32(buf_crc) & UBI_CRC32_INIT) if self.hdr_crc != crc_chk: log(vid_hdr, 'CRC Failed: expected 0x%x got 0x%x' % (crc_chk, self.hdr_crc)) self.errors.append('crc') -def vtbl_recs(buf): +def vtbl_recs(buf: bytes) -> list[_vtbl_rec]: data_buf = buf - vtbl_recs = [] + vtbl_recs: list[_vtbl_rec] = [] vtbl_rec_ret = '' for i in range(0, UBI_MAX_VOLUMES): @@ -91,7 +128,21 @@ def vtbl_recs(buf): class _vtbl_rec(object): - def __init__(self, buf): + errors: list[str] + rec_index: int + + reserved_pebs: int + alignment: int + data_pad: int + vol_type: int + upd_marker: int + name_len: int + name: bytes + flags: int + padding: bytes + crc: int + + def __init__(self, buf: bytes) -> None: fields = dict(list(zip(VTBL_REC_FIELDS, struct.unpack(VTBL_REC_FORMAT,buf)))) for key in fields: setattr(self, key, fields[key]) @@ -104,11 +155,11 @@ def __init__(self, buf): def __repr__(self): return 'Volume Table Record: %s' % getattr(self, 'name') - def __iter__(self): + def __iter__(self) -> Iterator[tuple[str, Any]]: for key in dir(self): if not key.startswith('_'): yield key, getattr(self, key) - def _check_errors(self, buf_crc): + def _check_errors(self, buf_crc: bytes) -> None: if self.crc != (~crc32(buf_crc) & 0xFFFFFFFF): self.errors.append('crc') diff --git a/ubireader/ubi/image.py b/ubireader/ubi/image.py index f634b28..23852e6 100755 --- a/ubireader/ubi/image.py +++ b/ubireader/ubi/image.py @@ -17,13 +17,21 @@ # along with this program. If not, see . ############################################################# +from __future__ import annotations +from typing import TYPE_CHECKING from ubireader.debug import log from ubireader.ubi import display from ubireader.ubi.volume import get_volumes from ubireader.ubi.block import get_blocks_in_list +if TYPE_CHECKING: + from collections.abc import Mapping + from ubireader.ubi.block import description as Block + from ubireader.ubi.block.layout import _LayoutInfo + from ubireader.ubi.volume import description as Volume + class description(object): - def __init__(self, blocks, layout_info): + def __init__(self, blocks: dict[int, Block], layout_info: _LayoutInfo) -> None: self._image_seq = blocks[layout_info[0]].ec_hdr.image_seq self.vid_hdr_offset = blocks[layout_info[0]].ec_hdr.vid_hdr_offset self.version = blocks[layout_info[0]].ec_hdr.version @@ -33,28 +41,28 @@ def __init__(self, blocks, layout_info): self._volumes = get_volumes(blocks, layout_info) log(description, 'Created Image: %s, Volume Cnt: %s' % (self.image_seq, len(self.volumes))) - def __repr__(self): + def __repr__(self) -> str: return 'Image: %s' % (self.image_seq) - def get_blocks(self, blocks): + def get_blocks(self, blocks: Mapping[int, Block]) -> dict[int, Block]: return get_blocks_in_list(blocks, self._block_list) - def _get_peb_range(self): + def _get_peb_range(self) -> list[int]: return [self._start_peb, self._end_peb] peb_range = property(_get_peb_range) - def _get_image_seq(self): + def _get_image_seq(self) -> int: return self._image_seq image_seq = property(_get_image_seq) - def _get_volumes(self): + def _get_volumes(self) -> dict[str, Volume]: return self._volumes volumes = property(_get_volumes) - def display(self, tab=''): + def display(self, tab: str = '') -> str: return display.image(self, tab) diff --git a/ubireader/ubi/volume.py b/ubireader/ubi/volume.py index f02834b..4ee9cd9 100755 --- a/ubireader/ubi/volume.py +++ b/ubireader/ubi/volume.py @@ -17,10 +17,19 @@ # along with this program. If not, see . ############################################################# +from __future__ import annotations +from typing import TYPE_CHECKING from ubireader.debug import log from ubireader.ubi import display from ubireader.ubi.block import sort, get_blocks_in_list, rm_old_blocks +if TYPE_CHECKING: + from collections.abc import Iterator, Mapping + from ubireader.ubi import ubi as Ubi + from ubireader.ubi.block import description as Block + from ubireader.ubi.block.layout import _LayoutInfo + from ubireader.ubi.headers import _vtbl_rec as VtblRec + class description(object): """UBI Volume object @@ -39,7 +48,7 @@ class description(object): Volume object is basically a list of block indexes and some metadata describing a volume found in a UBI image. """ - def __init__(self, vol_id, vol_rec, block_list): + def __init__(self, vol_id: int, vol_rec: VtblRec, block_list: list[int]) -> None: self._vol_id = vol_id self._vol_rec = vol_rec self._name = self._vol_rec.name @@ -47,44 +56,44 @@ def __init__(self, vol_id, vol_rec, block_list): log(description, 'Create Volume: %s, ID: %s, Block Cnt: %s' % (self.name, self.vol_id, len(self.block_list))) - def __repr__(self): + def __repr__(self) -> str: return 'Volume: %s' % (self.name.decode('utf-8')) - def _get_name(self): + def _get_name(self) -> bytes: return self._name name = property(_get_name) - def _get_vol_id(self): + def _get_vol_id(self) -> int: return self._vol_id vol_id = property(_get_vol_id) - def _get_block_count(self): + def _get_block_count(self) -> int: return len(self._block_list) block_count = property(_get_block_count) - def _get_vol_rec(self): + def _get_vol_rec(self) -> VtblRec: return self._vol_rec vol_rec = property(_get_vol_rec) - def _get_block_list(self): + def _get_block_list(self) -> list[int]: return self._block_list block_list = property(_get_block_list) - def get_blocks(self, blocks): + def get_blocks(self, blocks: Mapping[int, Block]) -> dict[int, Block]: return get_blocks_in_list(blocks, self._block_list) - def display(self, tab=''): + def display(self, tab: str = '') -> str: return display.volume(self, tab) - def reader(self, ubi): + def reader(self, ubi: Ubi) -> Iterator[bytes]: last_leb = 0 for block in sort.by_leb(self.get_blocks(ubi.blocks)): if block == 'x': @@ -95,7 +104,7 @@ def reader(self, ubi): yield ubi.file.read_block_data(ubi.blocks[block]) -def get_volumes(blocks, layout_info): +def get_volumes(blocks: Mapping[int, Block], layout_info: _LayoutInfo) -> dict[str, description]: """Get a list of UBI volume objects from list of blocks Arguments: @@ -107,7 +116,7 @@ def get_volumes(blocks, layout_info): Dict -- Of Volume objects by volume name, including any relevant blocks. """ - volumes = {} + volumes: dict[str, description] = {} vol_blocks_lists = sort.by_vol_id(blocks, layout_info[2]) for vol_rec in blocks[layout_info[0]].vtbl_recs: From 6ed18b09cd196428d36b6553bf24a964b496d76f Mon Sep 17 00:00:00 2001 From: Simon Berger Date: Sun, 7 Sep 2025 11:54:25 +0000 Subject: [PATCH 2/3] feat: add type hints to ubireader.ubifs --- ubireader/ubifs/__init__.py | 24 ++-- ubireader/ubifs/decrypt.py | 17 ++- ubireader/ubifs/display.py | 24 ++-- ubireader/ubifs/list.py | 29 +++-- ubireader/ubifs/misc.py | 17 ++- ubireader/ubifs/nodes.py | 220 ++++++++++++++++++++++++++++++------ ubireader/ubifs/output.py | 17 ++- ubireader/ubifs/walk.py | 17 ++- 8 files changed, 285 insertions(+), 80 deletions(-) diff --git a/ubireader/ubifs/__init__.py b/ubireader/ubifs/__init__.py index d8b5418..4f8c808 100755 --- a/ubireader/ubifs/__init__.py +++ b/ubireader/ubifs/__init__.py @@ -17,10 +17,14 @@ # along with this program. If not, see . ############################################################# +from __future__ import annotations +from typing import TYPE_CHECKING from ubireader.debug import error, log, verbose_display from ubireader.ubifs.defines import * from ubireader.ubifs import nodes, display -from typing import Optional + +if TYPE_CHECKING: + from ubireader.ubi_io import ubi_file as UbiFile, leb_virtual_file as LebVirtualFile class ubifs(): """UBIFS object @@ -36,7 +40,7 @@ class ubifs(): Obj:mst_node -- Master Node of UBIFS image LEB1 Obj:mst_node2 -- Master Node 2 of UBIFS image LEB2 """ - def __init__(self, ubifs_file, master_key: Optional[bytes] = None): + def __init__(self, ubifs_file: UbiFile | LebVirtualFile, master_key: bytes | None = None) -> None: self.__name__ = 'UBIFS' self._file = ubifs_file self.master_key = master_key @@ -59,7 +63,7 @@ def __init__(self, ubifs_file, master_key: Optional[bytes] = None): except Exception as e: error(self, 'Fatal', 'Super block error: %s' % e) - self._mst_nodes = [None, None] + self._mst_nodes: list[nodes.mst_node | None] = [None, None] for i in range(0, 2): try: mst_offset = self.leb_size * (UBIFS_MST_LNUM + i) @@ -88,12 +92,12 @@ def __init__(self, ubifs_file, master_key: Optional[bytes] = None): log(self , 'Swapping Master Nodes due to bad first node.') - def _get_file(self): + def _get_file(self) -> UbiFile | LebVirtualFile: return self._file file = property(_get_file) - def _get_superblock(self): + def _get_superblock(self) -> nodes.sb_node: """ Superblock Node Object Returns: @@ -103,7 +107,7 @@ def _get_superblock(self): superblock_node = property(_get_superblock) - def _get_master_node(self): + def _get_master_node(self) -> nodes.mst_node | None: """Master Node Object Returns: @@ -113,7 +117,7 @@ def _get_master_node(self): master_node = property(_get_master_node) - def _get_master_node2(self): + def _get_master_node2(self) -> nodes.mst_node | None: """Master Node Object 2 Returns: @@ -123,7 +127,7 @@ def _get_master_node2(self): master_node2 = property(_get_master_node2) - def _get_leb_size(self): + def _get_leb_size(self) -> int: """LEB size of UBI blocks in file. Returns: @@ -133,7 +137,7 @@ def _get_leb_size(self): leb_size = property(_get_leb_size) - def _get_min_io_size(self): + def _get_min_io_size(self) -> int: """Min I/O Size Returns: @@ -142,7 +146,7 @@ def _get_min_io_size(self): return self._min_io_size min_io_size = property(_get_min_io_size) - def display(self, tab=''): + def display(self, tab: str = '') -> str: """Print information about this object. Argument: diff --git a/ubireader/ubifs/decrypt.py b/ubireader/ubifs/decrypt.py index 0d5b816..4eb4e0a 100644 --- a/ubireader/ubifs/decrypt.py +++ b/ubireader/ubifs/decrypt.py @@ -1,12 +1,19 @@ +from __future__ import annotations +from typing import TYPE_CHECKING from ubireader.ubifs.defines import UBIFS_XATTR_NAME_ENCRYPTION_CONTEXT from ubireader.debug import error from cryptography.hazmat.primitives.ciphers import ( Cipher, algorithms, modes ) +if TYPE_CHECKING: + from collections.abc import Mapping + from ubireader.ubifs import ubifs as Ubifs, nodes + from ubireader.ubifs.walk import Inode + AES_BLOCK_SIZE = algorithms.AES.block_size // 8 -def lookup_inode_nonce(inodes: dict, inode: dict) -> bytes: +def lookup_inode_nonce(inodes: Mapping[int, Inode], inode: Inode) -> bytes | None: # get the extended attribute 'xent' of the inode if 'xent' not in inode or not inode['xent']: raise ValueError(f"No xent found for inode {inode}") @@ -29,7 +36,7 @@ def derive_key_from_nonce(master_key: bytes, nonce: bytes) -> bytes: return derived_key -def filename_decrypt(key: bytes, ciphertext: bytes): +def filename_decrypt(key: bytes, ciphertext: bytes) -> bytes: # using AES CTS-CBC mode not supported by pyca cryptography if len(ciphertext) > AES_BLOCK_SIZE: @@ -59,7 +66,7 @@ def filename_decrypt(key: bytes, ciphertext: bytes): return plaintext.rstrip(b'\x00') -def datablock_decrypt(block_key: bytes, block_iv: bytes, block_data: bytes): +def datablock_decrypt(block_key: bytes, block_iv: bytes, block_data: bytes) -> bytes: decryptor = Cipher( algorithms.AES(block_key), modes.XTS(block_iv), @@ -67,7 +74,7 @@ def datablock_decrypt(block_key: bytes, block_iv: bytes, block_data: bytes): return decryptor.update(block_data) + decryptor.finalize() -def decrypt_filenames(ubifs, inodes): +def decrypt_filenames(ubifs: Ubifs, inodes: Mapping[int, Inode]) -> None: if ubifs.master_key is None: for inode in inodes.values(): for dent in inode.get('dent', []): @@ -87,7 +94,7 @@ def decrypt_filenames(ubifs, inodes): error(decrypt_filenames, 'Error', str(e)) -def decrypt_symlink_target(ubifs, inodes, dent_node) -> str: +def decrypt_symlink_target(ubifs: Ubifs, inodes: Mapping[int, Inode], dent_node: nodes.dent_node) -> str: if ubifs.master_key is None: return inodes[dent_node.inum]['ino'].data.decode() inode = inodes[dent_node.inum] diff --git a/ubireader/ubifs/display.py b/ubireader/ubifs/display.py index c7241f0..e0da488 100644 --- a/ubireader/ubifs/display.py +++ b/ubireader/ubifs/display.py @@ -17,16 +17,22 @@ # along with this program. If not, see . ############################################################# +from __future__ import annotations +from typing import TYPE_CHECKING from ubireader.ubifs.defines import PRINT_UBIFS_FLGS, PRINT_UBIFS_MST -def ubifs(ubifs, tab=''): +if TYPE_CHECKING: + from ubireader.ubifs import ubifs as Ubifs + from ubireader.ubifs import nodes + +def ubifs(ubifs: Ubifs, tab: str = '') -> str: buf = '%sUBIFS Image\n' % (tab) buf += '%s---------------------\n' % (tab) buf += '%sMin I/O: %s\n' % (tab, ubifs.min_io_size) buf += '%sLEB Size: %s\n' % (tab, ubifs.leb_size) return buf -def common_hdr(chdr, tab=''): +def common_hdr(chdr: nodes.common_hdr, tab: str = '') -> str: buf = '%s%s\n' % (tab, chdr) buf += '%s---------------------\n' % (tab) tab += '\t' @@ -41,7 +47,7 @@ def common_hdr(chdr, tab=''): buf += '%s%s: %r\n' % (tab, key, value) return buf -def sb_node(node, tab=''): +def sb_node(node: nodes.sb_node, tab: str = '') -> str: buf = '%s%s\n' % (tab, node) buf += '%sFile offset: %s\n' % (tab, node.file_offset) buf += '%s---------------------\n' % (tab) @@ -69,7 +75,7 @@ def sb_node(node, tab=''): return buf -def mst_node(node, tab=''): +def mst_node(node: nodes.mst_node, tab: str = '') -> str: buf = '%s%s\n' % (tab, node) buf += '%sFile offset: %s\n' % (tab, node.file_offset) buf += '%s---------------------\n' % (tab) @@ -94,7 +100,7 @@ def mst_node(node, tab=''): return buf -def dent_node(node, tab=''): +def dent_node(node: nodes.dent_node, tab: str = '') -> str: buf = '%s%s\n' % (tab, node) buf += '%s---------------------\n' % (tab) tab += '\t' @@ -108,7 +114,7 @@ def dent_node(node, tab=''): return buf -def data_node(node, tab=''): +def data_node(node: nodes.data_node, tab: str = '') -> str: buf = '%s%s\n' % (tab, node) buf += '%s---------------------\n' % (tab) tab += '\t' @@ -122,7 +128,7 @@ def data_node(node, tab=''): return buf -def idx_node(node, tab=''): +def idx_node(node: nodes.idx_node, tab: str = '') -> str: buf = '%s%s\n' % (tab, node) buf += '%s---------------------\n' % (tab) tab += '\t' @@ -136,7 +142,7 @@ def idx_node(node, tab=''): return buf -def ino_node(node, tab=''): +def ino_node(node: nodes.ino_node, tab: str = '') -> str: buf = '%s%s\n' % (tab, node) buf += '%s---------------------\n' % (tab) tab += '\t' @@ -150,7 +156,7 @@ def ino_node(node, tab=''): return buf -def branch(node, tab=''): +def branch(node: nodes.branch, tab: str = '') -> str: buf = '%s%s\n' % (tab, node) buf += '%s---------------------\n' % (tab) tab += '\t' diff --git a/ubireader/ubifs/list.py b/ubireader/ubifs/list.py index b7d2c92..c858d05 100755 --- a/ubireader/ubifs/list.py +++ b/ubireader/ubifs/list.py @@ -17,25 +17,30 @@ # along with this program. If not, see . ############################################################# +from __future__ import annotations import os import time -import struct +from typing import TYPE_CHECKING from ubireader.ubifs.decrypt import decrypt_symlink_target from ubireader.ubifs.defines import * from ubireader.ubifs import walk from ubireader.ubifs.misc import process_reg_file from ubireader.debug import error +if TYPE_CHECKING: + from collections.abc import Mapping + from ubireader.ubifs import ubifs as Ubifs, nodes + from ubireader.ubifs.walk import Inode -def list_files(ubifs, list_path): +def list_files(ubifs: Ubifs, list_path: str) -> None: pathnames = list_path.split("/") - pnames = [] + pnames: list[str] = [] for i in pathnames: if len(i) > 0: pnames.append(i) try: - inodes = {} - bad_blocks = [] + inodes: dict[int, Inode] = {} + bad_blocks: list[int] = [] walk.index(ubifs, ubifs.master_node.root_lnum, ubifs.master_node.root_offs, inodes, bad_blocks) @@ -60,9 +65,9 @@ def list_files(ubifs, list_path): error(list_files, 'Error', '%s' % e) -def copy_file(ubifs, filepath, destpath): +def copy_file(ubifs: Ubifs, filepath: str, destpath: str) -> bool: pathnames = filepath.split("/") - pnames = [] + pnames: list[str] = [] for i in pathnames: if len(i) > 0: pnames.append(i) @@ -70,8 +75,8 @@ def copy_file(ubifs, filepath, destpath): filename = pnames[len(pnames)-1] del pnames[-1] - inodes = {} - bad_blocks = [] + inodes: dict[int, Inode] = {} + bad_blocks: list[int] = [] walk.index(ubifs, ubifs.master_node.root_lnum, ubifs.master_node.root_offs, inodes, bad_blocks) @@ -97,7 +102,7 @@ def copy_file(ubifs, filepath, destpath): return False -def find_dir(inodes, inum, names, idx): +def find_dir(inodes: Mapping[int, Inode], inum: int, names: list[str], idx: int) -> int | None: if len(names) == 0: return 1 for dent in inodes[inum]['dent']: @@ -109,7 +114,7 @@ def find_dir(inodes, inum, names, idx): return None -def print_dent(ubifs, inodes, dent_node, long=True, longts=False): +def print_dent(ubifs: Ubifs, inodes: Mapping[int, Inode], dent_node: nodes.dent_node, long: bool = True, longts: bool = False) -> None: inode = inodes[dent_node.inum] if long: fl = file_leng(ubifs, inode) @@ -128,7 +133,7 @@ def print_dent(ubifs, inodes, dent_node, long=True, longts=False): print(dent_node.name) -def file_leng(ubifs, inode): +def file_leng(ubifs: Ubifs, inode: Inode) -> int: fl = 0 if 'data' in inode: compr_type = 0 diff --git a/ubireader/ubifs/misc.py b/ubireader/ubifs/misc.py index 792703c..30dbb33 100755 --- a/ubireader/ubifs/misc.py +++ b/ubireader/ubifs/misc.py @@ -17,6 +17,8 @@ # along with this program. If not, see . ############################################################# +from __future__ import annotations +from typing import TYPE_CHECKING, TypedDict from lzallright import LZOCompressor import struct import zlib @@ -25,13 +27,22 @@ from ubireader.debug import error from ubireader.ubifs.decrypt import lookup_inode_nonce, derive_key_from_nonce, datablock_decrypt +if TYPE_CHECKING: + from collections.abc import Mapping + from ubireader.ubifs import ubifs as Ubifs + from ubireader.ubifs.walk import Inode + # For happy printing ino_types = ['file', 'dir','lnk','blk','chr','fifo','sock'] node_types = ['ino','data','dent','xent','trun','pad','sb','mst','ref','idx','cs','orph'] key_types = ['ino','data','dent','xent'] +class ParsedKey(TypedDict): + type: str + ino_num: int + khash: int -def parse_key(key): +def parse_key(key: bytes) -> ParsedKey: """Parse node key Arguments: @@ -51,7 +62,7 @@ def parse_key(key): return {'type':key_type, 'ino_num':ino_num, 'khash': khash} -def decompress(ctype, unc_len, data): +def decompress(ctype: int, unc_len: int, data: bytes) -> bytes | None: """Decompress data. Arguments: @@ -76,7 +87,7 @@ def decompress(ctype, unc_len, data): return data -def process_reg_file(ubifs, inode, path, inodes): +def process_reg_file(ubifs: Ubifs, inode: Inode, path: str, inodes: Mapping[int, Inode]) -> bytes: try: buf = bytearray() start_key = (UBIFS_DATA_KEY << UBIFS_S_KEY_BLOCK_BITS) diff --git a/ubireader/ubifs/nodes.py b/ubireader/ubifs/nodes.py index 321cf45..29e7ad5 100755 --- a/ubireader/ubifs/nodes.py +++ b/ubireader/ubifs/nodes.py @@ -17,10 +17,16 @@ # along with this program. If not, see . ############################################################# +from __future__ import annotations +from typing import TYPE_CHECKING, Any from ubireader.ubifs.misc import parse_key from ubireader.ubifs.defines import * from ubireader.ubifs import display +if TYPE_CHECKING: + from collections.abc import Iterator + from ubireader.ubifs.misc import ParsedKey + class common_hdr(object): """Get common header at given LEB number + offset. @@ -29,7 +35,17 @@ class common_hdr(object): See ubifs/defines.py for object attributes. """ - def __init__(self, buf): + errors: list[str] + + magic: int + crc: int + sqnum: int + len: int + node_type: int + group_type: int + padding: bytes + + def __init__(self, buf: bytes) -> None: fields = dict(list(zip(UBIFS_COMMON_HDR_FIELDS, struct.unpack(UBIFS_COMMON_HDR_FORMAT, buf)))) for key in fields: @@ -37,15 +53,15 @@ def __init__(self, buf): setattr(self, 'errors', []) - def __repr__(self): + def __repr__(self) -> str: return 'UBIFS Common Header' - def __iter__(self): + def __iter__(self) -> Iterator[tuple[str, Any]]: for key in dir(self): if not key.startswith('_'): yield key, getattr(self, key) - def display(self, tab=''): + def display(self, tab: str = '') -> str: return display.common_hdr(self, tab) @@ -57,7 +73,32 @@ class ino_node(object): See ubifs/defines.py for object attributes. """ - def __init__(self, buf): + data: bytes + errors: list[str] + + key: ParsedKey + creat_sqnum: int + size: int + atime_sec: int + ctime_sec: int + mtime_sec: int + atime_nsec: int + ctime_nsec: int + mtime_nsec: int + nlink: int + uid: int + gid: int + mode: int + flags: int + data_len: int + xattr_cnt: int + xattr_size: int + padding1: bytes + xattr_names: int + compr_type: int + padding2: bytes + + def __init__(self, buf: bytes) -> None: fields = dict(list(zip(UBIFS_INO_NODE_FIELDS, struct.unpack(UBIFS_INO_NODE_FORMAT, buf[0:UBIFS_INO_NODE_SZ])))) for key in fields: @@ -69,15 +110,15 @@ def __init__(self, buf): setattr(self, 'data', buf[UBIFS_INO_NODE_SZ:]) setattr(self, 'errors', []) - def __repr__(self): + def __repr__(self) -> str: return 'UBIFS Ino Node' - def __iter__(self): + def __iter__(self) -> Iterator[tuple[str, Any]]: for key in dir(self): if not key.startswith('_'): yield key, getattr(self, key) - def display(self, tab=''): + def display(self, tab: str = '') -> str: return display.ino_node(self, tab) class xent_node(object): @@ -88,7 +129,17 @@ class xent_node(object): See ubifs/defines.py for object attributes. """ - def __init__(self, buf): + name: str + error: list[str] + + key: ParsedKey + inum: int + padding1: int + type: int + nlen: int + cookie: int + + def __init__(self, buf: bytes) -> None: fields = dict(zip(UBIFS_XENT_NODE_FIELDS, struct.unpack(UBIFS_XENT_NODE_FORMAT, buf[0:UBIFS_XENT_NODE_SZ]))) for key in fields: if key == 'key': @@ -106,7 +157,7 @@ def __iter__(self): if not key.startswith('_'): yield key, getattr(self, key) - def display(self, tab=''): + def display(self, tab: str = '') -> str: return display.dent_node(self, tab) class dent_node(object): @@ -117,7 +168,18 @@ class dent_node(object): See ubifs/defines.py for object attributes. """ - def __init__(self, buf): + raw_name: bytes + name: str + errors: list[str] + + key: ParsedKey + inum: int + padding1: int + type: int + nlen: int + cookie: int + + def __init__(self, buf: bytes) -> None: fields = dict(list(zip(UBIFS_DENT_NODE_FIELDS, struct.unpack(UBIFS_DENT_NODE_FORMAT, buf[0:UBIFS_DENT_NODE_SZ])))) for key in fields: if key == 'key': @@ -128,15 +190,15 @@ def __init__(self, buf): setattr(self, 'name', "") setattr(self, 'errors', []) - def __repr__(self): + def __repr__(self) -> str: return 'UBIFS Directory Entry Node' - def __iter__(self): + def __iter__(self) -> Iterator[tuple[str, Any]]: for key in dir(self): if not key.startswith('_'): yield key, getattr(self, key) - def display(self, tab=''): + def display(self, tab: str = '') -> str: return display.dent_node(self, tab) @@ -149,7 +211,16 @@ class data_node(object): See ubifs/defines.py for object attributes. """ - def __init__(self, buf, file_offset): + offset: int + compr_len: int + errors: list[str] + + key: ParsedKey + size: int + compr_type: int + plaintext_size: int + + def __init__(self, buf: bytes, file_offset: int) -> None: fields = dict(list(zip(UBIFS_DATA_NODE_FIELDS, struct.unpack(UBIFS_DATA_NODE_FORMAT, buf[0:UBIFS_DATA_NODE_SZ])))) for key in fields: @@ -162,15 +233,15 @@ def __init__(self, buf, file_offset): setattr(self, 'compr_len', (len(buf) - UBIFS_DATA_NODE_SZ)) setattr(self, 'errors', []) - def __repr__(self): + def __repr__(self) -> str: return 'UBIFS Data Node' - def __iter__(self): + def __iter__(self) -> Iterator[tuple[str, Any]]: for key in dir(self): if not key.startswith('_'): yield key, getattr(self, key) - def display(self, tab=''): + def display(self, tab: str = '') -> str: return display.data_node(self, tab) @@ -182,7 +253,13 @@ class idx_node(object): See ubifs/defines.py for object attributes. """ - def __init__(self, buf): + branches: list[branch] + errors: list[int] + + child_cnt: int + level: int + + def __init__(self, buf: bytes) -> None: fields = dict(list(zip(UBIFS_IDX_NODE_FIELDS, struct.unpack(UBIFS_IDX_NODE_FORMAT, buf[0:UBIFS_IDX_NODE_SZ])))) for key in fields: setattr(self, key, fields[key]) @@ -194,15 +271,15 @@ def __init__(self, buf): setattr(self, 'branches', [branch(buf[idxs+(brs*i):idxs+(brs*i)+brs]) for i in range(0, self.child_cnt)]) setattr(self, 'errors', []) - def __repr__(self): + def __repr__(self) -> str: return 'UBIFS Index Node' - def __iter__(self): + def __iter__(self) -> Iterator[tuple[str, Any]]: for key in dir(self): if not key.startswith('_'): yield key, getattr(self, key) - def display(self, tab=''): + def display(self, tab: str = '') -> str: return display.idx_node(self, tab) @@ -212,7 +289,15 @@ class branch(object): Arguments: Bin:buf -- Raw data to extract header information from. """ - def __init__(self, buf): + hash: bytes + errors: list[str] + + lnum: int + offs: int + len: int + key: ParsedKey + + def __init__(self, buf: bytes) -> None: fields = dict(list(zip(UBIFS_BRANCH_FIELDS, struct.unpack(UBIFS_BRANCH_FORMAT, buf[0:UBIFS_BRANCH_SZ])))) for key in fields: if key == 'key': @@ -225,15 +310,15 @@ def __init__(self, buf): setattr(self, 'errors', []) - def __repr__(self): + def __repr__(self) -> str: return 'UBIFS Branch' - def __iter__(self): + def __iter__(self) -> Iterator[tuple[str, Any]]: for key in dir(self): if not key.startswith('_'): yield key, getattr(self, key) - def display(self, tab=''): + def display(self, tab: str = '') -> str: return display.branch(self, tab) @@ -246,7 +331,39 @@ class sb_node(object): See ubifs/defines.py for object attributes. """ - def __init__(self, buf, file_offset=-1): + errors: list[str] + + padding: bytes + key_hash: int + key_fmt: int + flags: int + min_io_size: int + leb_size: int + leb_cnt: int + max_leb_cnt: int + max_bud_bytes: int + log_lebs: int + lpt_lebs: int + orph_lebs: int + jhead_cnt: int + fanout: int + lsave_cnt: int + fmt_version: int + default_compr: int + padding1: bytes + rp_uid: int + rp_gid: int + rp_size: int + time_gran: int + uuid: bytes + ro_compat_version: int + hmac: bytes + hmac_wkm: bytes + hash_algo: int + hash_mst: bytes + padding2: bytes + + def __init__(self, buf: bytes, file_offset: int = -1) -> None: self.file_offset = file_offset fields = dict(list(zip(UBIFS_SB_NODE_FIELDS, struct.unpack(UBIFS_SB_NODE_FORMAT, buf)))) for key in fields: @@ -254,15 +371,15 @@ def __init__(self, buf, file_offset=-1): setattr(self, 'errors', []) - def __repr__(self): + def __repr__(self) -> str: return 'UBIFS Super Block Node' - def __iter__(self): + def __iter__(self) -> Iterator[tuple[str, Any]]: for key in dir(self): if not key.startswith('_'): yield key, getattr(self, key) - def display(self, tab=''): + def display(self, tab: str = '') -> str: return display.sb_node(self, tab) @@ -275,7 +392,42 @@ class mst_node(object): See ubifs/defines.py for object attributes. """ - def __init__(self, buf, file_offset=-1): + errors: list[str] + + highest_inum: int + cmt_no: int + flags: int + log_lnum: int + root_lnum: int + root_offs: int + root_len: int + gc_lnum: int + ihead_lnum: int + ihead_offs: int + index_size: int + total_free: int + total_dirty: int + total_used: int + total_dead: int + total_dark: int + lpt_lnum: int + lpt_offs: int + nhead_lnum: int + nhead_offs: int + ltab_lnum: int + ltab_offs: int + lsave_lnum: int + lsave_offs: int + lscan_lnum: int + empty_lebs: int + idx_lebs: int + leb_cnt: int + hash_root_idx: int + hash_lpt: bytes + hmac: bytes + padding: bytes + + def __init__(self, buf: bytes, file_offset: int = -1) -> None: self.file_offset = file_offset fields = dict(list(zip(UBIFS_MST_NODE_FIELDS, struct.unpack(UBIFS_MST_NODE_FORMAT, buf)))) for key in fields: @@ -283,13 +435,13 @@ def __init__(self, buf, file_offset=-1): setattr(self, 'errors', []) - def __repr__(self): + def __repr__(self) -> str: return 'UBIFS Master Block Node' - def __iter__(self): + def __iter__(self) -> Iterator[tuple[str, Any]]: for key in dir(self): if not key.startswith('_'): yield key, getattr(self, key) - def display(self, tab=''): + def display(self, tab: str = '') -> str: return display.mst_node(self, tab) diff --git a/ubireader/ubifs/output.py b/ubireader/ubifs/output.py index 349ef58..a518e59 100755 --- a/ubireader/ubifs/output.py +++ b/ubireader/ubifs/output.py @@ -17,8 +17,10 @@ # along with this program. If not, see . ############################################################# +from __future__ import annotations import os import struct +from typing import TYPE_CHECKING from ubireader.ubifs.decrypt import decrypt_symlink_target from ubireader import settings @@ -27,13 +29,18 @@ from ubireader.ubifs.misc import process_reg_file from ubireader.debug import error, log, verbose_log -def is_safe_path(basedir, path): +if TYPE_CHECKING: + from collections.abc import Mapping + from ubireader.ubifs import ubifs as Ubifs, nodes + from ubireader.ubifs.walk import Inode + +def is_safe_path(basedir: str, path: str) -> bool: basedir = os.path.realpath(basedir) path = os.path.realpath(os.path.join(basedir, path)) return True if path.startswith(basedir) else False -def extract_files(ubifs, out_path, perms=False): +def extract_files(ubifs: Ubifs, out_path: str, perms: bool = False) -> None: """Extract UBIFS contents to_path/ Arguments: @@ -41,8 +48,8 @@ def extract_files(ubifs, out_path, perms=False): Str:out_path -- Path to extract contents to. """ try: - inodes = {} - bad_blocks = [] + inodes: dict[int, Inode] = {} + bad_blocks: list[int] = [] walk.index(ubifs, ubifs.master_node.root_lnum, ubifs.master_node.root_offs, inodes, bad_blocks) @@ -59,7 +66,7 @@ def extract_files(ubifs, out_path, perms=False): error(extract_files, 'Error', '%s' % e) -def extract_dents(ubifs, inodes, dent_node, path='', perms=False): +def extract_dents(ubifs: Ubifs, inodes: Mapping[int, Inode], dent_node: nodes.dent_node, path: str = '', perms: bool = False) -> None: if dent_node.inum not in inodes: error(extract_dents, 'Error', 'inum: %s not found in inodes' % (dent_node.inum)) return diff --git a/ubireader/ubifs/walk.py b/ubireader/ubifs/walk.py index 0b8802f..eb97262 100755 --- a/ubireader/ubifs/walk.py +++ b/ubireader/ubifs/walk.py @@ -17,13 +17,26 @@ # along with this program. If not, see . ############################################################# +from __future__ import annotations +from collections.abc import MutableMapping +from typing import TYPE_CHECKING, TypedDict from ubireader import settings from ubireader.ubifs import nodes from ubireader.ubifs.defines import * from ubireader.debug import error, log, verbose_log, verbose_display from ubireader.ubifs.decrypt import decrypt_filenames -def index(ubifs, lnum, offset, inodes={}, bad_blocks=[]): +if TYPE_CHECKING: + from ubireader.ubifs import ubifs as Ubifs + +class Inode(TypedDict, total=False): + ino: nodes.ino_node + data: list[nodes.data_node] + dent: list[nodes.dent_node] + xent: list[nodes.xent_node] + hlink: str + +def index(ubifs: Ubifs, lnum: int, offset: int, inodes: MutableMapping[int, Inode] = {}, bad_blocks: list[int] = []) -> None: """Walk the index gathering Inode, Dir Entry, and File nodes. Arguments: @@ -42,7 +55,7 @@ def index(ubifs, lnum, offset, inodes={}, bad_blocks=[]): _index(ubifs, lnum, offset, inodes, bad_blocks) decrypt_filenames(ubifs, inodes) -def _index(ubifs, lnum, offset, inodes={}, bad_blocks=[]): +def _index(ubifs: Ubifs, lnum: int, offset: int, inodes: MutableMapping[int, Inode] = {}, bad_blocks: list[int] = []) -> None: """Walk the index gathering Inode, Dir Entry, and File nodes. Arguments: From 9c7d47741d4eed67b2fc2a9d41731051396567a8 Mon Sep 17 00:00:00 2001 From: Simon Berger Date: Sun, 7 Sep 2025 11:54:35 +0000 Subject: [PATCH 3/3] feat: add type hints to top-level ubireader module --- ubireader/debug.py | 21 ++++++++++++++---- ubireader/py.typed | 0 ubireader/ubi_io.py | 54 ++++++++++++++++++++++++++------------------- ubireader/utils.py | 13 ++++++----- 4 files changed, 55 insertions(+), 33 deletions(-) create mode 100644 ubireader/py.typed diff --git a/ubireader/debug.py b/ubireader/debug.py index 7201fe9..cf1b35e 100755 --- a/ubireader/debug.py +++ b/ubireader/debug.py @@ -17,23 +17,36 @@ # along with this program. If not, see . ############################################################# +from __future__ import annotations import sys import traceback +from typing import Literal, NoReturn, Protocol, overload from ubireader import settings -def log(obj, message): +class _Obj(Protocol): + __name__: str + +def log(obj: _Obj, message: str) -> None: if settings.logging_on or settings.logging_on_verbose: print('{} {}'.format(obj.__name__, message)) -def verbose_log(obj, message): +def verbose_log(obj: _Obj, message: str) -> None: if settings.logging_on_verbose: log(obj, message) -def verbose_display(displayable_obj): +class _Displayable(Protocol): + def display(self, tab: str) -> str: ... + +def verbose_display(displayable_obj: _Displayable) -> None: if settings.logging_on_verbose: print(displayable_obj.display('\t')) -def error(obj, level, message): +@overload +def error(obj: _Obj, level: Literal['fatal', 'Fatal'], message: str) -> NoReturn: ... +@overload +def error(obj: _Obj, level: str, message: str) -> None: ... + +def error(obj: _Obj, level: str, message: str) -> None: if settings.error_action == 'exit': print('{} {}: {}'.format(obj.__name__, level, message)) if settings.fatal_traceback: diff --git a/ubireader/py.typed b/ubireader/py.typed new file mode 100644 index 0000000..e69de29 diff --git a/ubireader/ubi_io.py b/ubireader/ubi_io.py index efb05c7..fd49621 100755 --- a/ubireader/ubi_io.py +++ b/ubireader/ubi_io.py @@ -17,10 +17,18 @@ # along with this program. If not, see . ############################################################# +from __future__ import annotations +from typing import TYPE_CHECKING from ubireader.debug import error, log, verbose_log from ubireader.ubi.block import sort from ubireader.ubi.defines import UBI_VID_STATIC +if TYPE_CHECKING: + from typing import Self # In TYPE_CHECKING block because it's only available after python 3.11 + from collections.abc import Iterator, Mapping + from ubireader.ubi import ubi as Ubi + from ubireader.ubi.block import description as Block + class ubi_file(object): """UBI image file object @@ -50,7 +58,7 @@ class ubi_file(object): extract blocks, etc. """ - def __init__(self, path, block_size, start_offset=0, end_offset=None): + def __init__(self, path: str, block_size: int, start_offset: int = 0, end_offset: int | None = None) -> None: self.__name__ = 'UBI_File' self.is_valid = False try: @@ -89,54 +97,54 @@ def __init__(self, path, block_size, start_offset=0, end_offset=None): self._last_read_addr = self._fhandle.tell() self.is_valid = True - def __enter__(self): + def __enter__(self) -> Self: return self - def __exit__(self, exc_type, exc_value, traceback): + def __exit__(self, exc_type, exc_value, traceback) -> None: self.close() - def _set_start(self, i): + def _set_start(self, i: int) -> None: self._start_offset = i - def _get_start(self): + def _get_start(self) -> int: return self._start_offset start_offset = property(_get_start, _set_start) - def _get_end(self): + def _get_end(self) -> int: return self._end_offset end_offset = property(_get_end) - def _get_block_size(self): + def _get_block_size(self) -> int: return self._block_size block_size = property(_get_block_size) - def close(self): + def close(self) -> None: self._fhandle.close() - def seek(self, offset): + def seek(self, offset: int) -> None: self._fhandle.seek(offset) - def read(self, size): + def read(self, size: int) -> bytes: self._last_read_addr = self.tell() verbose_log(self, 'read loc: %s, size: %s' % (self._last_read_addr, size)) return self._fhandle.read(size) - def tell(self): + def tell(self) -> int: return self._fhandle.tell() - def last_read_addr(self): + def last_read_addr(self) -> int: return self._last_read_addr - def reset(self): + def reset(self) -> None: self._fhandle.seek(self.start_offset) - def reader(self): + def reader(self) -> Iterator[bytes]: self.reset() while True: cur_loc = self._fhandle.tell() @@ -154,7 +162,7 @@ def reader(self): yield buf - def read_block(self, block): + def read_block(self, block: Block) -> bytes: """Read complete PEB data from file. Argument: @@ -164,7 +172,7 @@ def read_block(self, block): return self._fhandle.read(block.size) - def read_block_data(self, block): + def read_block_data(self, block: Block) -> bytes: """Read LEB data from file Argument: @@ -180,7 +188,7 @@ def read_block_data(self, block): class leb_virtual_file(): - def __init__(self, ubi, block_list): + def __init__(self, ubi: Ubi, block_list: Mapping[int, Block]) -> None: self.__name__ = 'leb_virtual_file' self.is_valid = False self._ubi = ubi @@ -196,7 +204,7 @@ def __init__(self, ubi, block_list): self.is_valid = True - def read(self, size): + def read(self, size: int) -> bytes: buf = '' leb = int(self.tell() / self._ubi.leb_size) offset = self.tell() % self._ubi.leb_size @@ -227,24 +235,24 @@ def read(self, size): error(self, 'Fatal', 'read loc: %s, size: %s, LEB: %s, offset: %s, error: %s' % (self._last_read_addr, size, leb, offset, e)) - def reset(self): + def reset(self) -> None: self.seek(0) - def seek(self, offset): + def seek(self, offset: int) -> None: self._seek = offset - def tell(self): + def tell(self) -> int: return self._seek - def last_read_addr(self): + def last_read_addr(self) -> int: """Start address of last physical file read""" return self._last_read_addr - def reader(self): + def reader(self) -> Iterator[bytes]: last_leb = 0 for block in self._blocks: while 0 != (self._ubi.blocks[block].leb_num - last_leb): diff --git a/ubireader/utils.py b/ubireader/utils.py index 1793285..ce57464 100755 --- a/ubireader/utils.py +++ b/ubireader/utils.py @@ -17,13 +17,14 @@ # along with this program. If not, see . ############################################################# +from __future__ import annotations import re from ubireader.debug import error, log from ubireader.ubi.defines import UBI_EC_HDR_MAGIC, FILE_CHUNK_SZ from ubireader.ubifs.defines import UBIFS_NODE_MAGIC, UBIFS_SB_NODE_SZ, UBIFS_SB_NODE, UBIFS_COMMON_HDR_SZ from ubireader.ubifs import nodes -def guess_start_offset(path, guess_offset=0): +def guess_start_offset(path: str, guess_offset: int =0) -> int | None: file_offset = guess_offset f = open(path, 'rb') @@ -60,7 +61,7 @@ def guess_start_offset(path, guess_offset=0): f.close() -def guess_filetype(path, start_offset=0): +def guess_filetype(path: str, start_offset: int = 0) -> bytes | None: log(guess_filetype, 'Looking for file type at %s' % start_offset) with open(path, 'rb') as f: @@ -81,7 +82,7 @@ def guess_filetype(path, start_offset=0): return ftype -def guess_leb_size(path): +def guess_leb_size(path: str) -> int | None: """Get LEB size from superblock Arguments: @@ -125,7 +126,7 @@ def guess_leb_size(path): return block_size -def guess_peb_size(path): +def guess_peb_size(path: str) -> int | None: """Determine the most likely block size Arguments: @@ -138,7 +139,7 @@ def guess_peb_size(path): common length between them. """ file_offset = 0 - offsets = [] + offsets: list[int] = [] f = open(path, 'rb') f.seek(0,2) file_size = f.tell()+1 @@ -160,7 +161,7 @@ def guess_peb_size(path): file_offset += FILE_CHUNK_SZ f.close() - occurrences = {} + occurrences: dict[int, int] = {} for i in range(0, len(offsets)): try: diff = offsets[i] - offsets[i-1]