diff --git a/ubireader/debug.py b/ubireader/debug.py
index 7201fe9..cf1b35e 100755
--- a/ubireader/debug.py
+++ b/ubireader/debug.py
@@ -17,23 +17,36 @@
# along with this program. If not, see .
#############################################################
+from __future__ import annotations
import sys
import traceback
+from typing import Literal, NoReturn, Protocol, overload
from ubireader import settings
-def log(obj, message):
+class _Obj(Protocol):
+ __name__: str
+
+def log(obj: _Obj, message: str) -> None:
if settings.logging_on or settings.logging_on_verbose:
print('{} {}'.format(obj.__name__, message))
-def verbose_log(obj, message):
+def verbose_log(obj: _Obj, message: str) -> None:
if settings.logging_on_verbose:
log(obj, message)
-def verbose_display(displayable_obj):
+class _Displayable(Protocol):
+ def display(self, tab: str) -> str: ...
+
+def verbose_display(displayable_obj: _Displayable) -> None:
if settings.logging_on_verbose:
print(displayable_obj.display('\t'))
-def error(obj, level, message):
+@overload
+def error(obj: _Obj, level: Literal['fatal', 'Fatal'], message: str) -> NoReturn: ...
+@overload
+def error(obj: _Obj, level: str, message: str) -> None: ...
+
+def error(obj: _Obj, level: str, message: str) -> None:
if settings.error_action == 'exit':
print('{} {}: {}'.format(obj.__name__, level, message))
if settings.fatal_traceback:
diff --git a/ubireader/py.typed b/ubireader/py.typed
new file mode 100644
index 0000000..e69de29
diff --git a/ubireader/ubi/__init__.py b/ubireader/ubi/__init__.py
index cf2cb88..ba16efa 100755
--- a/ubireader/ubi/__init__.py
+++ b/ubireader/ubi/__init__.py
@@ -17,12 +17,19 @@
# along with this program. If not, see .
#############################################################
+from __future__ import annotations
+from typing import TYPE_CHECKING
from ubireader.debug import error
from ubireader.ubi.block import sort, extract_blocks
from ubireader.ubi import display
from ubireader.ubi.image import description as image
from ubireader.ubi.block import layout, rm_old_blocks
+if TYPE_CHECKING:
+ from ubireader.ubi_io import ubi_file as UbiFile
+ from ubireader.ubi.block import description as Block
+ from ubireader.ubi.image import description as Image
+
class ubi_base(object):
"""UBI Base object
@@ -39,7 +46,7 @@ class ubi_base(object):
Dict:blocks -- Dict keyed by PEB number of all blocks.
"""
- def __init__(self, ubi_file):
+ def __init__(self, ubi_file: UbiFile) -> None:
self.__name__ = 'UBI'
self._file = ubi_file
self._first_peb_num = 0
@@ -54,7 +61,7 @@ def __init__(self, ubi_file):
self._leb_size = self.file.block_size - arbitrary_block.ec_hdr.data_offset
- def _get_file(self):
+ def _get_file(self) -> UbiFile:
"""UBI File object
Returns:
@@ -64,7 +71,7 @@ def _get_file(self):
file = property(_get_file)
- def _get_block_count(self):
+ def _get_block_count(self) -> int:
"""Total amount of UBI blocks in file.
Returns:
@@ -74,9 +81,9 @@ def _get_block_count(self):
block_count = property(_get_block_count)
- def _set_first_peb_num(self, i):
+ def _set_first_peb_num(self, i: int) -> None:
self._first_peb_num = i
- def _get_first_peb_num(self):
+ def _get_first_peb_num(self) -> int:
"""First Physical Erase Block with UBI data
Returns:
@@ -86,7 +93,7 @@ def _get_first_peb_num(self):
first_peb_num = property(_get_first_peb_num, _set_first_peb_num)
- def _get_leb_size(self):
+ def _get_leb_size(self) -> int:
"""LEB size of UBI blocks in file.
Returns:
@@ -96,7 +103,7 @@ def _get_leb_size(self):
leb_size = property(_get_leb_size)
- def _get_peb_size(self):
+ def _get_peb_size(self) -> int:
"""PEB size of UBI blocks in file.
Returns:
@@ -106,7 +113,7 @@ def _get_peb_size(self):
peb_size = property(_get_peb_size)
- def _get_min_io_size(self):
+ def _get_min_io_size(self) -> int:
"""Min I/O Size
Returns:
@@ -116,7 +123,7 @@ def _get_min_io_size(self):
min_io_size = property(_get_min_io_size)
- def _get_blocks(self):
+ def _get_blocks(self) -> dict[int, Block]:
"""Main Dict of UBI Blocks
Passed around for lists of indexes to be made or to be returned
@@ -142,7 +149,7 @@ class ubi(ubi_base):
List:unknown_blocks_list -- List of blocks with unknown types. *
"""
- def __init__(self, ubi_file):
+ def __init__(self, ubi_file: UbiFile) -> None:
super(ubi, self).__init__(ubi_file)
layout_list, data_list, int_vol_list, unknown_list = sort.by_type(self.blocks)
@@ -161,12 +168,12 @@ def __init__(self, ubi_file):
layout_infos = layout.associate_blocks(self.blocks, layout_pairs)
- self._images = []
+ self._images: list[Image] = []
for i in range(0, len(layout_infos)):
self._images.append(image(self.blocks, layout_infos[i]))
- def _get_images(self):
+ def _get_images(self) -> list[Image]:
"""Get UBI images.
Returns:
@@ -176,7 +183,7 @@ def _get_images(self):
images = property(_get_images)
- def _get_data_blocks_list(self):
+ def _get_data_blocks_list(self) -> list[int]:
"""Get all UBI blocks found in file that are data blocks.
Returns:
@@ -186,7 +193,7 @@ def _get_data_blocks_list(self):
data_blocks_list = property(_get_data_blocks_list)
- def _get_layout_blocks_list(self):
+ def _get_layout_blocks_list(self) -> list[int]:
"""Get all UBI blocks found in file that are layout volume blocks.
Returns:
@@ -196,7 +203,7 @@ def _get_layout_blocks_list(self):
layout_blocks_list = property(_get_layout_blocks_list)
- def _get_int_vol_blocks_list(self):
+ def _get_int_vol_blocks_list(self) -> list[int]:
"""Get all UBI blocks found in file that are internal volume blocks.
Returns:
@@ -208,7 +215,7 @@ def _get_int_vol_blocks_list(self):
int_vol_blocks_list = property(_get_int_vol_blocks_list)
- def _get_unknown_blocks_list(self):
+ def _get_unknown_blocks_list(self) -> list[int]:
"""Get all UBI blocks found in file of unknown type..
Returns:
@@ -217,7 +224,7 @@ def _get_unknown_blocks_list(self):
return self._unknown_blocks_list
unknown_blocks_list = property(_get_unknown_blocks_list)
- def display(self, tab=''):
+ def display(self, tab: str = '') -> str:
"""Print information about this object.
Argument:
diff --git a/ubireader/ubi/block/__init__.py b/ubireader/ubi/block/__init__.py
index ce80f96..ca42d57 100755
--- a/ubireader/ubi/block/__init__.py
+++ b/ubireader/ubi/block/__init__.py
@@ -17,6 +17,8 @@
# along with this program. If not, see .
#############################################################
+from __future__ import annotations
+from typing import TYPE_CHECKING
from zlib import crc32
from ubireader import settings
from ubireader.debug import error, log, verbose_display, verbose_log
@@ -24,6 +26,10 @@
from ubireader.ubi.defines import UBI_EC_HDR_SZ, UBI_VID_HDR_SZ, UBI_INTERNAL_VOL_START, UBI_EC_HDR_MAGIC, UBI_CRC32_INIT
from ubireader.ubi.headers import ec_hdr, vid_hdr, vtbl_recs
+if TYPE_CHECKING:
+ from collections.abc import Iterable, Mapping
+ from ubireader.ubi import ubi_base as UbiBase
+ from ubireader.ubi.headers import _vtbl_rec as VtblRec
class description(object):
"""UBI Block description Object
@@ -48,16 +54,18 @@ class description(object):
Will print out all information when invoked as a string.
"""
- def __init__(self, block_buf):
+ data_crc: int
+
+ def __init__(self, block_buf: bytes) -> None:
self.file_offset = -1
self.peb_num = -1
self.leb_num = -1
self.size = -1
- self.vid_hdr = None
+ self.vid_hdr: vid_hdr | None = None
self.is_internal_vol = False
- self.vtbl_recs = []
+ self.vtbl_recs: list[VtblRec] = []
# TODO better understanding of block types/errors
self.ec_hdr = ec_hdr(block_buf[0:UBI_EC_HDR_SZ])
@@ -77,16 +85,16 @@ def __init__(self, block_buf):
self.is_valid = not self.ec_hdr.errors and not self.vid_hdr.errors or settings.ignore_block_header_errors
- def __repr__(self):
+ def __repr__(self) -> str:
return 'Block: PEB# %s: LEB# %s' % (self.peb_num, self.leb_num)
- def display(self, tab=''):
+ def display(self, tab: str ='') -> str:
return display.block(self, tab)
-def get_blocks_in_list(blocks, idx_list):
+def get_blocks_in_list(blocks: Mapping[int, description], idx_list: Iterable[int]) -> dict[int, description]:
"""Retrieve block objects in list of indexes
Arguments:
@@ -103,7 +111,7 @@ def get_blocks_in_list(blocks, idx_list):
-def extract_blocks(ubi):
+def extract_blocks(ubi: UbiBase) -> dict[int, description]:
"""Get a list of UBI block objects from file
Arguments:.
@@ -113,11 +121,11 @@ def extract_blocks(ubi):
Dict -- Of block objects keyed by PEB number.
"""
- blocks = {}
+ blocks: dict[int, description] = {}
ubi.file.seek(ubi.file.start_offset)
peb_count = 0
cur_offset = 0
- bad_blocks = []
+ bad_blocks: list[int] = []
# range instead of xrange, as xrange breaks > 4GB end_offset.
for i in range(ubi.file.start_offset, ubi.file.end_offset, ubi.file.block_size):
@@ -157,8 +165,8 @@ def extract_blocks(ubi):
return blocks
-def rm_old_blocks(blocks, block_list):
- del_blocks = []
+def rm_old_blocks(blocks: Mapping[int, description], block_list: Iterable[int]) -> list[int]:
+ del_blocks: list[int] = []
for i in block_list:
if i in del_blocks:
diff --git a/ubireader/ubi/block/layout.py b/ubireader/ubi/block/layout.py
index 67388d9..f5e575e 100755
--- a/ubireader/ubi/block/layout.py
+++ b/ubireader/ubi/block/layout.py
@@ -17,10 +17,20 @@
# along with this program. If not, see .
#############################################################
+from __future__ import annotations
+from typing import TYPE_CHECKING, Literal, Protocol, overload
from ubireader.debug import log
from ubireader.ubi.block import sort
-def group_pairs(blocks, layout_blocks_list):
+if TYPE_CHECKING:
+ from collections.abc import Iterable, Mapping
+ from ubireader.ubi.block import description as Block
+
+class _LayoutPair(Protocol):
+ def __getitem__(self, idx: Literal[0, 1], /) -> int: ...
+ def __contains__(self, key: object, /) -> bool: ...
+
+def group_pairs(blocks: Mapping[int, Block], layout_blocks_list: Iterable[int]) -> list[_LayoutPair]:
"""Sort a list of layout blocks into pairs
Arguments:
@@ -31,7 +41,7 @@ def group_pairs(blocks, layout_blocks_list):
List -- Layout block pair indexes grouped in a list
"""
- image_dict={}
+ image_dict: dict[int, _LayoutPair] = {}
for block_id in layout_blocks_list:
image_seq=blocks[block_id].ec_hdr.image_seq
if image_seq not in image_dict:
@@ -43,8 +53,13 @@ def group_pairs(blocks, layout_blocks_list):
return list(image_dict.values())
+class _LayoutInfo(_LayoutPair, Protocol):
+ @overload
+ def __getitem__(self, idx: Literal[0, 1], /) -> int: ...
+ @overload
+ def __getitem__(self, idx: Literal[2], /) -> list[int]: ...
-def associate_blocks(blocks, layout_pairs):
+def associate_blocks(blocks: Mapping[int, Block], layout_pairs: list[_LayoutPair]) -> list[_LayoutInfo]:
"""Group block indexes with appropriate layout pairs
Arguments:
@@ -55,7 +70,7 @@ def associate_blocks(blocks, layout_pairs):
List -- Layout block pairs grouped with associated block ranges.
"""
- seq_blocks = []
+ seq_blocks: list[int] = []
for layout_pair in layout_pairs:
seq_blocks = sort.by_image_seq(blocks, blocks[layout_pair[0]].ec_hdr.image_seq)
seq_blocks = [b for b in seq_blocks if b not in layout_pair]
diff --git a/ubireader/ubi/block/sort.py b/ubireader/ubi/block/sort.py
index e9e115b..f9705ff 100644
--- a/ubireader/ubi/block/sort.py
+++ b/ubireader/ubi/block/sort.py
@@ -17,9 +17,15 @@
# along with this program. If not, see .
#############################################################
+from __future__ import annotations
+from typing import TYPE_CHECKING, Literal
from ubireader import settings
-def by_image_seq(blocks, image_seq):
+if TYPE_CHECKING:
+ from collections.abc import Mapping
+ from ubireader.ubi.block import description as Block
+
+def by_image_seq(blocks: Mapping[int, Block], image_seq: int) -> list[int]:
"""Filter blocks to return only those associated with the provided image_seq number.
If uboot_fix is set, associate blocks with an image_seq of 0 also.
@@ -36,7 +42,7 @@ def by_image_seq(blocks, image_seq):
else:
return list(filter(lambda block: blocks[block].ec_hdr.image_seq == image_seq, blocks))
-def by_leb(blocks):
+def by_leb(blocks: Mapping[int, Block]) -> list[Literal['x'] | int]:
"""Sort blocks by Logical Erase Block number.
Arguments:
@@ -46,7 +52,7 @@ def by_leb(blocks):
List -- Indexes of blocks sorted by LEB.
"""
slist_len = len(blocks)
- slist = ['x'] * slist_len
+ slist: list[Literal['x'] | int] = ['x'] * slist_len
for block in blocks:
if blocks[block].leb_num >= slist_len:
@@ -59,7 +65,7 @@ def by_leb(blocks):
return slist
-def by_vol_id(blocks, slist=None):
+def by_vol_id(blocks: Mapping[int, Block], slist: list[int] | None = None) -> dict[int, list[int]]:
"""Sort blocks by volume id
Arguments:
@@ -70,7 +76,7 @@ def by_vol_id(blocks, slist=None):
Dict -- blocks grouped in lists with dict key as volume id.
"""
- vol_blocks = {}
+ vol_blocks: dict[int, list[int]] = {}
# sort block by volume
# not reliable with multiple partitions (fifo)
@@ -88,7 +94,7 @@ def by_vol_id(blocks, slist=None):
return vol_blocks
-def by_type(blocks, slist=None):
+def by_type(blocks: Mapping[int, Block], slist: list[int] | None = None) -> tuple[list[int], list[int], list[int], list[int]]:
"""Sort blocks into layout, internal volume, data or unknown
Arguments:
@@ -106,10 +112,10 @@ def by_type(blocks, slist=None):
of crc in ed_hdr or vid_hdr.
"""
- layout = []
- data = []
- int_vol = []
- unknown = []
+ layout: list[int] = []
+ data: list[int] = []
+ int_vol: list[int] = []
+ unknown: list[int] = []
for i in blocks:
if slist and i not in slist:
diff --git a/ubireader/ubi/display.py b/ubireader/ubi/display.py
index 9efbc6a..b2f9080 100755
--- a/ubireader/ubi/display.py
+++ b/ubireader/ubi/display.py
@@ -17,10 +17,19 @@
# along with this program. If not, see .
#############################################################
+from __future__ import annotations
+from typing import TYPE_CHECKING
from ubireader import settings
from ubireader.ubi.defines import PRINT_COMPAT_LIST, PRINT_VOL_TYPE_LIST, UBI_VTBL_AUTORESIZE_FLG
-def ubi(ubi, tab=''):
+if TYPE_CHECKING:
+ from ubireader.ubi import ubi as Ubi
+ from ubireader.ubi.block import description as Block
+ from ubireader.ubi.headers import ec_hdr as EcHdr, vid_hdr as VidHdr, _vtbl_rec as VtblRec
+ from ubireader.ubi.image import description as Image
+ from ubireader.ubi.volume import description as Volume
+
+def ubi(ubi: Ubi, tab: str = '') -> str:
buf = '%sUBI File\n' % (tab)
buf += '%s---------------------\n' % (tab)
buf += '\t%sMin I/O: %s\n' % (tab, ubi.min_io_size)
@@ -35,7 +44,7 @@ def ubi(ubi, tab=''):
return buf
-def image(image, tab=''):
+def image(image: Image, tab: str = '') -> str:
buf = '%s%s\n' % (tab, image)
buf += '%s---------------------\n' % (tab)
buf += '\t%sImage Sequence Num: %s\n' % (tab, image.image_seq)
@@ -45,7 +54,7 @@ def image(image, tab=''):
return buf
-def volume(volume, tab=''):
+def volume(volume: Volume, tab: str = '') -> str:
buf = '%s%s\n' % (tab, volume)
buf += '%s---------------------\n' % (tab)
buf += '\t%sVol ID: %s\n' % (tab, volume.vol_id)
@@ -61,7 +70,7 @@ def volume(volume, tab=''):
return buf
-def block(block, tab='\t'):
+def block(block: Block, tab: str = '\t') -> str:
buf = '%s%s\n' % (tab, block)
buf += '%s---------------------\n' % (tab)
buf += '\t%sFile Offset: %s\n' % (tab, block.file_offset)
@@ -95,7 +104,7 @@ def block(block, tab='\t'):
return buf
-def ec_hdr(ec_hdr, tab=''):
+def ec_hdr(ec_hdr: EcHdr, tab: str = '') -> str:
buf = ''
for key, value in ec_hdr:
if key == 'errors':
@@ -108,7 +117,7 @@ def ec_hdr(ec_hdr, tab=''):
return buf
-def vid_hdr(vid_hdr, tab=''):
+def vid_hdr(vid_hdr: VidHdr, tab: str = '') -> str:
buf = ''
for key, value in vid_hdr:
if key == 'errors':
@@ -133,7 +142,7 @@ def vid_hdr(vid_hdr, tab=''):
return buf
-def vol_rec(vol_rec, tab=''):
+def vol_rec(vol_rec: VtblRec, tab: str = '') -> str:
buf = ''
for key, value in vol_rec:
if key == 'errors':
diff --git a/ubireader/ubi/headers.py b/ubireader/ubi/headers.py
index 5fa3905..8f60030 100755
--- a/ubireader/ubi/headers.py
+++ b/ubireader/ubi/headers.py
@@ -17,30 +17,48 @@
# along with this program. If not, see .
#############################################################
+from __future__ import annotations
import struct
+from typing import TYPE_CHECKING, Any
from zlib import crc32
from ubireader.debug import log
from ubireader.ubi.defines import *
+if TYPE_CHECKING:
+ from collections.abc import Iterator
+
+
class ec_hdr(object):
- def __init__(self, buf):
+ errors: list[str]
+
+ magic: bytes
+ version: int
+ padding: bytes
+ ec: int
+ vid_hdr_offset: int
+ data_offset: int
+ image_seq: int
+ padding2: bytes
+ hdr_crc: int
+
+ def __init__(self, buf: bytes) -> None:
fields = dict(list(zip(EC_HDR_FIELDS, struct.unpack(EC_HDR_FORMAT,buf))))
for key in fields:
setattr(self, key, fields[key])
setattr(self, 'errors', [])
self._check_errors(buf[:-4])
-
- def __repr__(self):
+
+ def __repr__(self) -> str:
return 'Erase Count Header'
- def __iter__(self):
+ def __iter__(self) -> Iterator[tuple[str, Any]]:
for key in dir(self):
if not key.startswith('_'):
yield key, getattr(self, key)
- def _check_errors(self, buf_crc):
+ def _check_errors(self, buf_crc: bytes) -> None:
crc_chk = (~crc32(buf_crc) & UBI_CRC32_INIT)
if self.hdr_crc != crc_chk:
log(vid_hdr, 'CRC Failed: expected 0x%x got 0x%x' % (crc_chk, self.hdr_crc))
@@ -48,7 +66,26 @@ def _check_errors(self, buf_crc):
class vid_hdr(object):
- def __init__(self, buf):
+ errors: list[str]
+
+ magic: bytes
+ version: int
+ vol_type: int
+ copy_flag: int
+ compat: int
+ vol_id: int
+ lnum: int
+ padding: bytes
+ data_size: int
+ used_ebs: int
+ data_pad: int
+ data_crc: int
+ padding2: bytes
+ sqnum: int
+ padding3: bytes
+ hdr_crc: int
+
+ def __init__(self, buf: bytes) -> None:
fields = dict(list(zip(VID_HDR_FIELDS, struct.unpack(VID_HDR_FORMAT,buf))))
for key in fields:
setattr(self, key, fields[key])
@@ -56,24 +93,24 @@ def __init__(self, buf):
self._check_errors(buf[:-4])
- def __iter__(self):
+ def __iter__(self) -> Iterator[tuple[str, Any]]:
for key in dir(self):
if not key.startswith('_'):
yield key, getattr(self, key)
- def __repr__(self):
+ def __repr__(self) -> str:
return 'VID Header'
- def _check_errors(self, buf_crc):
+ def _check_errors(self, buf_crc: bytes) -> None:
crc_chk = (~crc32(buf_crc) & UBI_CRC32_INIT)
if self.hdr_crc != crc_chk:
log(vid_hdr, 'CRC Failed: expected 0x%x got 0x%x' % (crc_chk, self.hdr_crc))
self.errors.append('crc')
-def vtbl_recs(buf):
+def vtbl_recs(buf: bytes) -> list[_vtbl_rec]:
data_buf = buf
- vtbl_recs = []
+ vtbl_recs: list[_vtbl_rec] = []
vtbl_rec_ret = ''
for i in range(0, UBI_MAX_VOLUMES):
@@ -91,7 +128,21 @@ def vtbl_recs(buf):
class _vtbl_rec(object):
- def __init__(self, buf):
+ errors: list[str]
+ rec_index: int
+
+ reserved_pebs: int
+ alignment: int
+ data_pad: int
+ vol_type: int
+ upd_marker: int
+ name_len: int
+ name: bytes
+ flags: int
+ padding: bytes
+ crc: int
+
+ def __init__(self, buf: bytes) -> None:
fields = dict(list(zip(VTBL_REC_FIELDS, struct.unpack(VTBL_REC_FORMAT,buf))))
for key in fields:
setattr(self, key, fields[key])
@@ -104,11 +155,11 @@ def __init__(self, buf):
def __repr__(self):
return 'Volume Table Record: %s' % getattr(self, 'name')
- def __iter__(self):
+ def __iter__(self) -> Iterator[tuple[str, Any]]:
for key in dir(self):
if not key.startswith('_'):
yield key, getattr(self, key)
- def _check_errors(self, buf_crc):
+ def _check_errors(self, buf_crc: bytes) -> None:
if self.crc != (~crc32(buf_crc) & 0xFFFFFFFF):
self.errors.append('crc')
diff --git a/ubireader/ubi/image.py b/ubireader/ubi/image.py
index f634b28..23852e6 100755
--- a/ubireader/ubi/image.py
+++ b/ubireader/ubi/image.py
@@ -17,13 +17,21 @@
# along with this program. If not, see .
#############################################################
+from __future__ import annotations
+from typing import TYPE_CHECKING
from ubireader.debug import log
from ubireader.ubi import display
from ubireader.ubi.volume import get_volumes
from ubireader.ubi.block import get_blocks_in_list
+if TYPE_CHECKING:
+ from collections.abc import Mapping
+ from ubireader.ubi.block import description as Block
+ from ubireader.ubi.block.layout import _LayoutInfo
+ from ubireader.ubi.volume import description as Volume
+
class description(object):
- def __init__(self, blocks, layout_info):
+ def __init__(self, blocks: dict[int, Block], layout_info: _LayoutInfo) -> None:
self._image_seq = blocks[layout_info[0]].ec_hdr.image_seq
self.vid_hdr_offset = blocks[layout_info[0]].ec_hdr.vid_hdr_offset
self.version = blocks[layout_info[0]].ec_hdr.version
@@ -33,28 +41,28 @@ def __init__(self, blocks, layout_info):
self._volumes = get_volumes(blocks, layout_info)
log(description, 'Created Image: %s, Volume Cnt: %s' % (self.image_seq, len(self.volumes)))
- def __repr__(self):
+ def __repr__(self) -> str:
return 'Image: %s' % (self.image_seq)
- def get_blocks(self, blocks):
+ def get_blocks(self, blocks: Mapping[int, Block]) -> dict[int, Block]:
return get_blocks_in_list(blocks, self._block_list)
- def _get_peb_range(self):
+ def _get_peb_range(self) -> list[int]:
return [self._start_peb, self._end_peb]
peb_range = property(_get_peb_range)
- def _get_image_seq(self):
+ def _get_image_seq(self) -> int:
return self._image_seq
image_seq = property(_get_image_seq)
- def _get_volumes(self):
+ def _get_volumes(self) -> dict[str, Volume]:
return self._volumes
volumes = property(_get_volumes)
- def display(self, tab=''):
+ def display(self, tab: str = '') -> str:
return display.image(self, tab)
diff --git a/ubireader/ubi/volume.py b/ubireader/ubi/volume.py
index f02834b..4ee9cd9 100755
--- a/ubireader/ubi/volume.py
+++ b/ubireader/ubi/volume.py
@@ -17,10 +17,19 @@
# along with this program. If not, see .
#############################################################
+from __future__ import annotations
+from typing import TYPE_CHECKING
from ubireader.debug import log
from ubireader.ubi import display
from ubireader.ubi.block import sort, get_blocks_in_list, rm_old_blocks
+if TYPE_CHECKING:
+ from collections.abc import Iterator, Mapping
+ from ubireader.ubi import ubi as Ubi
+ from ubireader.ubi.block import description as Block
+ from ubireader.ubi.block.layout import _LayoutInfo
+ from ubireader.ubi.headers import _vtbl_rec as VtblRec
+
class description(object):
"""UBI Volume object
@@ -39,7 +48,7 @@ class description(object):
Volume object is basically a list of block indexes and some metadata
describing a volume found in a UBI image.
"""
- def __init__(self, vol_id, vol_rec, block_list):
+ def __init__(self, vol_id: int, vol_rec: VtblRec, block_list: list[int]) -> None:
self._vol_id = vol_id
self._vol_rec = vol_rec
self._name = self._vol_rec.name
@@ -47,44 +56,44 @@ def __init__(self, vol_id, vol_rec, block_list):
log(description, 'Create Volume: %s, ID: %s, Block Cnt: %s' % (self.name, self.vol_id, len(self.block_list)))
- def __repr__(self):
+ def __repr__(self) -> str:
return 'Volume: %s' % (self.name.decode('utf-8'))
- def _get_name(self):
+ def _get_name(self) -> bytes:
return self._name
name = property(_get_name)
- def _get_vol_id(self):
+ def _get_vol_id(self) -> int:
return self._vol_id
vol_id = property(_get_vol_id)
- def _get_block_count(self):
+ def _get_block_count(self) -> int:
return len(self._block_list)
block_count = property(_get_block_count)
- def _get_vol_rec(self):
+ def _get_vol_rec(self) -> VtblRec:
return self._vol_rec
vol_rec = property(_get_vol_rec)
- def _get_block_list(self):
+ def _get_block_list(self) -> list[int]:
return self._block_list
block_list = property(_get_block_list)
- def get_blocks(self, blocks):
+ def get_blocks(self, blocks: Mapping[int, Block]) -> dict[int, Block]:
return get_blocks_in_list(blocks, self._block_list)
- def display(self, tab=''):
+ def display(self, tab: str = '') -> str:
return display.volume(self, tab)
- def reader(self, ubi):
+ def reader(self, ubi: Ubi) -> Iterator[bytes]:
last_leb = 0
for block in sort.by_leb(self.get_blocks(ubi.blocks)):
if block == 'x':
@@ -95,7 +104,7 @@ def reader(self, ubi):
yield ubi.file.read_block_data(ubi.blocks[block])
-def get_volumes(blocks, layout_info):
+def get_volumes(blocks: Mapping[int, Block], layout_info: _LayoutInfo) -> dict[str, description]:
"""Get a list of UBI volume objects from list of blocks
Arguments:
@@ -107,7 +116,7 @@ def get_volumes(blocks, layout_info):
Dict -- Of Volume objects by volume name, including any
relevant blocks.
"""
- volumes = {}
+ volumes: dict[str, description] = {}
vol_blocks_lists = sort.by_vol_id(blocks, layout_info[2])
for vol_rec in blocks[layout_info[0]].vtbl_recs:
diff --git a/ubireader/ubi_io.py b/ubireader/ubi_io.py
index efb05c7..fd49621 100755
--- a/ubireader/ubi_io.py
+++ b/ubireader/ubi_io.py
@@ -17,10 +17,18 @@
# along with this program. If not, see .
#############################################################
+from __future__ import annotations
+from typing import TYPE_CHECKING
from ubireader.debug import error, log, verbose_log
from ubireader.ubi.block import sort
from ubireader.ubi.defines import UBI_VID_STATIC
+if TYPE_CHECKING:
+ from typing import Self # In TYPE_CHECKING block because it's only available after python 3.11
+ from collections.abc import Iterator, Mapping
+ from ubireader.ubi import ubi as Ubi
+ from ubireader.ubi.block import description as Block
+
class ubi_file(object):
"""UBI image file object
@@ -50,7 +58,7 @@ class ubi_file(object):
extract blocks, etc.
"""
- def __init__(self, path, block_size, start_offset=0, end_offset=None):
+ def __init__(self, path: str, block_size: int, start_offset: int = 0, end_offset: int | None = None) -> None:
self.__name__ = 'UBI_File'
self.is_valid = False
try:
@@ -89,54 +97,54 @@ def __init__(self, path, block_size, start_offset=0, end_offset=None):
self._last_read_addr = self._fhandle.tell()
self.is_valid = True
- def __enter__(self):
+ def __enter__(self) -> Self:
return self
- def __exit__(self, exc_type, exc_value, traceback):
+ def __exit__(self, exc_type, exc_value, traceback) -> None:
self.close()
- def _set_start(self, i):
+ def _set_start(self, i: int) -> None:
self._start_offset = i
- def _get_start(self):
+ def _get_start(self) -> int:
return self._start_offset
start_offset = property(_get_start, _set_start)
- def _get_end(self):
+ def _get_end(self) -> int:
return self._end_offset
end_offset = property(_get_end)
- def _get_block_size(self):
+ def _get_block_size(self) -> int:
return self._block_size
block_size = property(_get_block_size)
- def close(self):
+ def close(self) -> None:
self._fhandle.close()
- def seek(self, offset):
+ def seek(self, offset: int) -> None:
self._fhandle.seek(offset)
- def read(self, size):
+ def read(self, size: int) -> bytes:
self._last_read_addr = self.tell()
verbose_log(self, 'read loc: %s, size: %s' % (self._last_read_addr, size))
return self._fhandle.read(size)
- def tell(self):
+ def tell(self) -> int:
return self._fhandle.tell()
- def last_read_addr(self):
+ def last_read_addr(self) -> int:
return self._last_read_addr
- def reset(self):
+ def reset(self) -> None:
self._fhandle.seek(self.start_offset)
- def reader(self):
+ def reader(self) -> Iterator[bytes]:
self.reset()
while True:
cur_loc = self._fhandle.tell()
@@ -154,7 +162,7 @@ def reader(self):
yield buf
- def read_block(self, block):
+ def read_block(self, block: Block) -> bytes:
"""Read complete PEB data from file.
Argument:
@@ -164,7 +172,7 @@ def read_block(self, block):
return self._fhandle.read(block.size)
- def read_block_data(self, block):
+ def read_block_data(self, block: Block) -> bytes:
"""Read LEB data from file
Argument:
@@ -180,7 +188,7 @@ def read_block_data(self, block):
class leb_virtual_file():
- def __init__(self, ubi, block_list):
+ def __init__(self, ubi: Ubi, block_list: Mapping[int, Block]) -> None:
self.__name__ = 'leb_virtual_file'
self.is_valid = False
self._ubi = ubi
@@ -196,7 +204,7 @@ def __init__(self, ubi, block_list):
self.is_valid = True
- def read(self, size):
+ def read(self, size: int) -> bytes:
buf = ''
leb = int(self.tell() / self._ubi.leb_size)
offset = self.tell() % self._ubi.leb_size
@@ -227,24 +235,24 @@ def read(self, size):
error(self, 'Fatal', 'read loc: %s, size: %s, LEB: %s, offset: %s, error: %s' % (self._last_read_addr, size, leb, offset, e))
- def reset(self):
+ def reset(self) -> None:
self.seek(0)
- def seek(self, offset):
+ def seek(self, offset: int) -> None:
self._seek = offset
- def tell(self):
+ def tell(self) -> int:
return self._seek
- def last_read_addr(self):
+ def last_read_addr(self) -> int:
"""Start address of last physical file read"""
return self._last_read_addr
- def reader(self):
+ def reader(self) -> Iterator[bytes]:
last_leb = 0
for block in self._blocks:
while 0 != (self._ubi.blocks[block].leb_num - last_leb):
diff --git a/ubireader/ubifs/__init__.py b/ubireader/ubifs/__init__.py
index d8b5418..4f8c808 100755
--- a/ubireader/ubifs/__init__.py
+++ b/ubireader/ubifs/__init__.py
@@ -17,10 +17,14 @@
# along with this program. If not, see .
#############################################################
+from __future__ import annotations
+from typing import TYPE_CHECKING
from ubireader.debug import error, log, verbose_display
from ubireader.ubifs.defines import *
from ubireader.ubifs import nodes, display
-from typing import Optional
+
+if TYPE_CHECKING:
+ from ubireader.ubi_io import ubi_file as UbiFile, leb_virtual_file as LebVirtualFile
class ubifs():
"""UBIFS object
@@ -36,7 +40,7 @@ class ubifs():
Obj:mst_node -- Master Node of UBIFS image LEB1
Obj:mst_node2 -- Master Node 2 of UBIFS image LEB2
"""
- def __init__(self, ubifs_file, master_key: Optional[bytes] = None):
+ def __init__(self, ubifs_file: UbiFile | LebVirtualFile, master_key: bytes | None = None) -> None:
self.__name__ = 'UBIFS'
self._file = ubifs_file
self.master_key = master_key
@@ -59,7 +63,7 @@ def __init__(self, ubifs_file, master_key: Optional[bytes] = None):
except Exception as e:
error(self, 'Fatal', 'Super block error: %s' % e)
- self._mst_nodes = [None, None]
+ self._mst_nodes: list[nodes.mst_node | None] = [None, None]
for i in range(0, 2):
try:
mst_offset = self.leb_size * (UBIFS_MST_LNUM + i)
@@ -88,12 +92,12 @@ def __init__(self, ubifs_file, master_key: Optional[bytes] = None):
log(self , 'Swapping Master Nodes due to bad first node.')
- def _get_file(self):
+ def _get_file(self) -> UbiFile | LebVirtualFile:
return self._file
file = property(_get_file)
- def _get_superblock(self):
+ def _get_superblock(self) -> nodes.sb_node:
""" Superblock Node Object
Returns:
@@ -103,7 +107,7 @@ def _get_superblock(self):
superblock_node = property(_get_superblock)
- def _get_master_node(self):
+ def _get_master_node(self) -> nodes.mst_node | None:
"""Master Node Object
Returns:
@@ -113,7 +117,7 @@ def _get_master_node(self):
master_node = property(_get_master_node)
- def _get_master_node2(self):
+ def _get_master_node2(self) -> nodes.mst_node | None:
"""Master Node Object 2
Returns:
@@ -123,7 +127,7 @@ def _get_master_node2(self):
master_node2 = property(_get_master_node2)
- def _get_leb_size(self):
+ def _get_leb_size(self) -> int:
"""LEB size of UBI blocks in file.
Returns:
@@ -133,7 +137,7 @@ def _get_leb_size(self):
leb_size = property(_get_leb_size)
- def _get_min_io_size(self):
+ def _get_min_io_size(self) -> int:
"""Min I/O Size
Returns:
@@ -142,7 +146,7 @@ def _get_min_io_size(self):
return self._min_io_size
min_io_size = property(_get_min_io_size)
- def display(self, tab=''):
+ def display(self, tab: str = '') -> str:
"""Print information about this object.
Argument:
diff --git a/ubireader/ubifs/decrypt.py b/ubireader/ubifs/decrypt.py
index 0d5b816..4eb4e0a 100644
--- a/ubireader/ubifs/decrypt.py
+++ b/ubireader/ubifs/decrypt.py
@@ -1,12 +1,19 @@
+from __future__ import annotations
+from typing import TYPE_CHECKING
from ubireader.ubifs.defines import UBIFS_XATTR_NAME_ENCRYPTION_CONTEXT
from ubireader.debug import error
from cryptography.hazmat.primitives.ciphers import (
Cipher, algorithms, modes
)
+if TYPE_CHECKING:
+ from collections.abc import Mapping
+ from ubireader.ubifs import ubifs as Ubifs, nodes
+ from ubireader.ubifs.walk import Inode
+
AES_BLOCK_SIZE = algorithms.AES.block_size // 8
-def lookup_inode_nonce(inodes: dict, inode: dict) -> bytes:
+def lookup_inode_nonce(inodes: Mapping[int, Inode], inode: Inode) -> bytes | None:
# get the extended attribute 'xent' of the inode
if 'xent' not in inode or not inode['xent']:
raise ValueError(f"No xent found for inode {inode}")
@@ -29,7 +36,7 @@ def derive_key_from_nonce(master_key: bytes, nonce: bytes) -> bytes:
return derived_key
-def filename_decrypt(key: bytes, ciphertext: bytes):
+def filename_decrypt(key: bytes, ciphertext: bytes) -> bytes:
# using AES CTS-CBC mode not supported by pyca cryptography
if len(ciphertext) > AES_BLOCK_SIZE:
@@ -59,7 +66,7 @@ def filename_decrypt(key: bytes, ciphertext: bytes):
return plaintext.rstrip(b'\x00')
-def datablock_decrypt(block_key: bytes, block_iv: bytes, block_data: bytes):
+def datablock_decrypt(block_key: bytes, block_iv: bytes, block_data: bytes) -> bytes:
decryptor = Cipher(
algorithms.AES(block_key),
modes.XTS(block_iv),
@@ -67,7 +74,7 @@ def datablock_decrypt(block_key: bytes, block_iv: bytes, block_data: bytes):
return decryptor.update(block_data) + decryptor.finalize()
-def decrypt_filenames(ubifs, inodes):
+def decrypt_filenames(ubifs: Ubifs, inodes: Mapping[int, Inode]) -> None:
if ubifs.master_key is None:
for inode in inodes.values():
for dent in inode.get('dent', []):
@@ -87,7 +94,7 @@ def decrypt_filenames(ubifs, inodes):
error(decrypt_filenames, 'Error', str(e))
-def decrypt_symlink_target(ubifs, inodes, dent_node) -> str:
+def decrypt_symlink_target(ubifs: Ubifs, inodes: Mapping[int, Inode], dent_node: nodes.dent_node) -> str:
if ubifs.master_key is None:
return inodes[dent_node.inum]['ino'].data.decode()
inode = inodes[dent_node.inum]
diff --git a/ubireader/ubifs/display.py b/ubireader/ubifs/display.py
index c7241f0..e0da488 100644
--- a/ubireader/ubifs/display.py
+++ b/ubireader/ubifs/display.py
@@ -17,16 +17,22 @@
# along with this program. If not, see .
#############################################################
+from __future__ import annotations
+from typing import TYPE_CHECKING
from ubireader.ubifs.defines import PRINT_UBIFS_FLGS, PRINT_UBIFS_MST
-def ubifs(ubifs, tab=''):
+if TYPE_CHECKING:
+ from ubireader.ubifs import ubifs as Ubifs
+ from ubireader.ubifs import nodes
+
+def ubifs(ubifs: Ubifs, tab: str = '') -> str:
buf = '%sUBIFS Image\n' % (tab)
buf += '%s---------------------\n' % (tab)
buf += '%sMin I/O: %s\n' % (tab, ubifs.min_io_size)
buf += '%sLEB Size: %s\n' % (tab, ubifs.leb_size)
return buf
-def common_hdr(chdr, tab=''):
+def common_hdr(chdr: nodes.common_hdr, tab: str = '') -> str:
buf = '%s%s\n' % (tab, chdr)
buf += '%s---------------------\n' % (tab)
tab += '\t'
@@ -41,7 +47,7 @@ def common_hdr(chdr, tab=''):
buf += '%s%s: %r\n' % (tab, key, value)
return buf
-def sb_node(node, tab=''):
+def sb_node(node: nodes.sb_node, tab: str = '') -> str:
buf = '%s%s\n' % (tab, node)
buf += '%sFile offset: %s\n' % (tab, node.file_offset)
buf += '%s---------------------\n' % (tab)
@@ -69,7 +75,7 @@ def sb_node(node, tab=''):
return buf
-def mst_node(node, tab=''):
+def mst_node(node: nodes.mst_node, tab: str = '') -> str:
buf = '%s%s\n' % (tab, node)
buf += '%sFile offset: %s\n' % (tab, node.file_offset)
buf += '%s---------------------\n' % (tab)
@@ -94,7 +100,7 @@ def mst_node(node, tab=''):
return buf
-def dent_node(node, tab=''):
+def dent_node(node: nodes.dent_node, tab: str = '') -> str:
buf = '%s%s\n' % (tab, node)
buf += '%s---------------------\n' % (tab)
tab += '\t'
@@ -108,7 +114,7 @@ def dent_node(node, tab=''):
return buf
-def data_node(node, tab=''):
+def data_node(node: nodes.data_node, tab: str = '') -> str:
buf = '%s%s\n' % (tab, node)
buf += '%s---------------------\n' % (tab)
tab += '\t'
@@ -122,7 +128,7 @@ def data_node(node, tab=''):
return buf
-def idx_node(node, tab=''):
+def idx_node(node: nodes.idx_node, tab: str = '') -> str:
buf = '%s%s\n' % (tab, node)
buf += '%s---------------------\n' % (tab)
tab += '\t'
@@ -136,7 +142,7 @@ def idx_node(node, tab=''):
return buf
-def ino_node(node, tab=''):
+def ino_node(node: nodes.ino_node, tab: str = '') -> str:
buf = '%s%s\n' % (tab, node)
buf += '%s---------------------\n' % (tab)
tab += '\t'
@@ -150,7 +156,7 @@ def ino_node(node, tab=''):
return buf
-def branch(node, tab=''):
+def branch(node: nodes.branch, tab: str = '') -> str:
buf = '%s%s\n' % (tab, node)
buf += '%s---------------------\n' % (tab)
tab += '\t'
diff --git a/ubireader/ubifs/list.py b/ubireader/ubifs/list.py
index b7d2c92..c858d05 100755
--- a/ubireader/ubifs/list.py
+++ b/ubireader/ubifs/list.py
@@ -17,25 +17,30 @@
# along with this program. If not, see .
#############################################################
+from __future__ import annotations
import os
import time
-import struct
+from typing import TYPE_CHECKING
from ubireader.ubifs.decrypt import decrypt_symlink_target
from ubireader.ubifs.defines import *
from ubireader.ubifs import walk
from ubireader.ubifs.misc import process_reg_file
from ubireader.debug import error
+if TYPE_CHECKING:
+ from collections.abc import Mapping
+ from ubireader.ubifs import ubifs as Ubifs, nodes
+ from ubireader.ubifs.walk import Inode
-def list_files(ubifs, list_path):
+def list_files(ubifs: Ubifs, list_path: str) -> None:
pathnames = list_path.split("/")
- pnames = []
+ pnames: list[str] = []
for i in pathnames:
if len(i) > 0:
pnames.append(i)
try:
- inodes = {}
- bad_blocks = []
+ inodes: dict[int, Inode] = {}
+ bad_blocks: list[int] = []
walk.index(ubifs, ubifs.master_node.root_lnum, ubifs.master_node.root_offs, inodes, bad_blocks)
@@ -60,9 +65,9 @@ def list_files(ubifs, list_path):
error(list_files, 'Error', '%s' % e)
-def copy_file(ubifs, filepath, destpath):
+def copy_file(ubifs: Ubifs, filepath: str, destpath: str) -> bool:
pathnames = filepath.split("/")
- pnames = []
+ pnames: list[str] = []
for i in pathnames:
if len(i) > 0:
pnames.append(i)
@@ -70,8 +75,8 @@ def copy_file(ubifs, filepath, destpath):
filename = pnames[len(pnames)-1]
del pnames[-1]
- inodes = {}
- bad_blocks = []
+ inodes: dict[int, Inode] = {}
+ bad_blocks: list[int] = []
walk.index(ubifs, ubifs.master_node.root_lnum, ubifs.master_node.root_offs, inodes, bad_blocks)
@@ -97,7 +102,7 @@ def copy_file(ubifs, filepath, destpath):
return False
-def find_dir(inodes, inum, names, idx):
+def find_dir(inodes: Mapping[int, Inode], inum: int, names: list[str], idx: int) -> int | None:
if len(names) == 0:
return 1
for dent in inodes[inum]['dent']:
@@ -109,7 +114,7 @@ def find_dir(inodes, inum, names, idx):
return None
-def print_dent(ubifs, inodes, dent_node, long=True, longts=False):
+def print_dent(ubifs: Ubifs, inodes: Mapping[int, Inode], dent_node: nodes.dent_node, long: bool = True, longts: bool = False) -> None:
inode = inodes[dent_node.inum]
if long:
fl = file_leng(ubifs, inode)
@@ -128,7 +133,7 @@ def print_dent(ubifs, inodes, dent_node, long=True, longts=False):
print(dent_node.name)
-def file_leng(ubifs, inode):
+def file_leng(ubifs: Ubifs, inode: Inode) -> int:
fl = 0
if 'data' in inode:
compr_type = 0
diff --git a/ubireader/ubifs/misc.py b/ubireader/ubifs/misc.py
index 792703c..30dbb33 100755
--- a/ubireader/ubifs/misc.py
+++ b/ubireader/ubifs/misc.py
@@ -17,6 +17,8 @@
# along with this program. If not, see .
#############################################################
+from __future__ import annotations
+from typing import TYPE_CHECKING, TypedDict
from lzallright import LZOCompressor
import struct
import zlib
@@ -25,13 +27,22 @@
from ubireader.debug import error
from ubireader.ubifs.decrypt import lookup_inode_nonce, derive_key_from_nonce, datablock_decrypt
+if TYPE_CHECKING:
+ from collections.abc import Mapping
+ from ubireader.ubifs import ubifs as Ubifs
+ from ubireader.ubifs.walk import Inode
+
# For happy printing
ino_types = ['file', 'dir','lnk','blk','chr','fifo','sock']
node_types = ['ino','data','dent','xent','trun','pad','sb','mst','ref','idx','cs','orph']
key_types = ['ino','data','dent','xent']
+class ParsedKey(TypedDict):
+ type: str
+ ino_num: int
+ khash: int
-def parse_key(key):
+def parse_key(key: bytes) -> ParsedKey:
"""Parse node key
Arguments:
@@ -51,7 +62,7 @@ def parse_key(key):
return {'type':key_type, 'ino_num':ino_num, 'khash': khash}
-def decompress(ctype, unc_len, data):
+def decompress(ctype: int, unc_len: int, data: bytes) -> bytes | None:
"""Decompress data.
Arguments:
@@ -76,7 +87,7 @@ def decompress(ctype, unc_len, data):
return data
-def process_reg_file(ubifs, inode, path, inodes):
+def process_reg_file(ubifs: Ubifs, inode: Inode, path: str, inodes: Mapping[int, Inode]) -> bytes:
try:
buf = bytearray()
start_key = (UBIFS_DATA_KEY << UBIFS_S_KEY_BLOCK_BITS)
diff --git a/ubireader/ubifs/nodes.py b/ubireader/ubifs/nodes.py
index 321cf45..29e7ad5 100755
--- a/ubireader/ubifs/nodes.py
+++ b/ubireader/ubifs/nodes.py
@@ -17,10 +17,16 @@
# along with this program. If not, see .
#############################################################
+from __future__ import annotations
+from typing import TYPE_CHECKING, Any
from ubireader.ubifs.misc import parse_key
from ubireader.ubifs.defines import *
from ubireader.ubifs import display
+if TYPE_CHECKING:
+ from collections.abc import Iterator
+ from ubireader.ubifs.misc import ParsedKey
+
class common_hdr(object):
"""Get common header at given LEB number + offset.
@@ -29,7 +35,17 @@ class common_hdr(object):
See ubifs/defines.py for object attributes.
"""
- def __init__(self, buf):
+ errors: list[str]
+
+ magic: int
+ crc: int
+ sqnum: int
+ len: int
+ node_type: int
+ group_type: int
+ padding: bytes
+
+ def __init__(self, buf: bytes) -> None:
fields = dict(list(zip(UBIFS_COMMON_HDR_FIELDS, struct.unpack(UBIFS_COMMON_HDR_FORMAT, buf))))
for key in fields:
@@ -37,15 +53,15 @@ def __init__(self, buf):
setattr(self, 'errors', [])
- def __repr__(self):
+ def __repr__(self) -> str:
return 'UBIFS Common Header'
- def __iter__(self):
+ def __iter__(self) -> Iterator[tuple[str, Any]]:
for key in dir(self):
if not key.startswith('_'):
yield key, getattr(self, key)
- def display(self, tab=''):
+ def display(self, tab: str = '') -> str:
return display.common_hdr(self, tab)
@@ -57,7 +73,32 @@ class ino_node(object):
See ubifs/defines.py for object attributes.
"""
- def __init__(self, buf):
+ data: bytes
+ errors: list[str]
+
+ key: ParsedKey
+ creat_sqnum: int
+ size: int
+ atime_sec: int
+ ctime_sec: int
+ mtime_sec: int
+ atime_nsec: int
+ ctime_nsec: int
+ mtime_nsec: int
+ nlink: int
+ uid: int
+ gid: int
+ mode: int
+ flags: int
+ data_len: int
+ xattr_cnt: int
+ xattr_size: int
+ padding1: bytes
+ xattr_names: int
+ compr_type: int
+ padding2: bytes
+
+ def __init__(self, buf: bytes) -> None:
fields = dict(list(zip(UBIFS_INO_NODE_FIELDS, struct.unpack(UBIFS_INO_NODE_FORMAT, buf[0:UBIFS_INO_NODE_SZ]))))
for key in fields:
@@ -69,15 +110,15 @@ def __init__(self, buf):
setattr(self, 'data', buf[UBIFS_INO_NODE_SZ:])
setattr(self, 'errors', [])
- def __repr__(self):
+ def __repr__(self) -> str:
return 'UBIFS Ino Node'
- def __iter__(self):
+ def __iter__(self) -> Iterator[tuple[str, Any]]:
for key in dir(self):
if not key.startswith('_'):
yield key, getattr(self, key)
- def display(self, tab=''):
+ def display(self, tab: str = '') -> str:
return display.ino_node(self, tab)
class xent_node(object):
@@ -88,7 +129,17 @@ class xent_node(object):
See ubifs/defines.py for object attributes.
"""
- def __init__(self, buf):
+ name: str
+ error: list[str]
+
+ key: ParsedKey
+ inum: int
+ padding1: int
+ type: int
+ nlen: int
+ cookie: int
+
+ def __init__(self, buf: bytes) -> None:
fields = dict(zip(UBIFS_XENT_NODE_FIELDS, struct.unpack(UBIFS_XENT_NODE_FORMAT, buf[0:UBIFS_XENT_NODE_SZ])))
for key in fields:
if key == 'key':
@@ -106,7 +157,7 @@ def __iter__(self):
if not key.startswith('_'):
yield key, getattr(self, key)
- def display(self, tab=''):
+ def display(self, tab: str = '') -> str:
return display.dent_node(self, tab)
class dent_node(object):
@@ -117,7 +168,18 @@ class dent_node(object):
See ubifs/defines.py for object attributes.
"""
- def __init__(self, buf):
+ raw_name: bytes
+ name: str
+ errors: list[str]
+
+ key: ParsedKey
+ inum: int
+ padding1: int
+ type: int
+ nlen: int
+ cookie: int
+
+ def __init__(self, buf: bytes) -> None:
fields = dict(list(zip(UBIFS_DENT_NODE_FIELDS, struct.unpack(UBIFS_DENT_NODE_FORMAT, buf[0:UBIFS_DENT_NODE_SZ]))))
for key in fields:
if key == 'key':
@@ -128,15 +190,15 @@ def __init__(self, buf):
setattr(self, 'name', "")
setattr(self, 'errors', [])
- def __repr__(self):
+ def __repr__(self) -> str:
return 'UBIFS Directory Entry Node'
- def __iter__(self):
+ def __iter__(self) -> Iterator[tuple[str, Any]]:
for key in dir(self):
if not key.startswith('_'):
yield key, getattr(self, key)
- def display(self, tab=''):
+ def display(self, tab: str = '') -> str:
return display.dent_node(self, tab)
@@ -149,7 +211,16 @@ class data_node(object):
See ubifs/defines.py for object attributes.
"""
- def __init__(self, buf, file_offset):
+ offset: int
+ compr_len: int
+ errors: list[str]
+
+ key: ParsedKey
+ size: int
+ compr_type: int
+ plaintext_size: int
+
+ def __init__(self, buf: bytes, file_offset: int) -> None:
fields = dict(list(zip(UBIFS_DATA_NODE_FIELDS, struct.unpack(UBIFS_DATA_NODE_FORMAT, buf[0:UBIFS_DATA_NODE_SZ]))))
for key in fields:
@@ -162,15 +233,15 @@ def __init__(self, buf, file_offset):
setattr(self, 'compr_len', (len(buf) - UBIFS_DATA_NODE_SZ))
setattr(self, 'errors', [])
- def __repr__(self):
+ def __repr__(self) -> str:
return 'UBIFS Data Node'
- def __iter__(self):
+ def __iter__(self) -> Iterator[tuple[str, Any]]:
for key in dir(self):
if not key.startswith('_'):
yield key, getattr(self, key)
- def display(self, tab=''):
+ def display(self, tab: str = '') -> str:
return display.data_node(self, tab)
@@ -182,7 +253,13 @@ class idx_node(object):
See ubifs/defines.py for object attributes.
"""
- def __init__(self, buf):
+ branches: list[branch]
+ errors: list[int]
+
+ child_cnt: int
+ level: int
+
+ def __init__(self, buf: bytes) -> None:
fields = dict(list(zip(UBIFS_IDX_NODE_FIELDS, struct.unpack(UBIFS_IDX_NODE_FORMAT, buf[0:UBIFS_IDX_NODE_SZ]))))
for key in fields:
setattr(self, key, fields[key])
@@ -194,15 +271,15 @@ def __init__(self, buf):
setattr(self, 'branches', [branch(buf[idxs+(brs*i):idxs+(brs*i)+brs]) for i in range(0, self.child_cnt)])
setattr(self, 'errors', [])
- def __repr__(self):
+ def __repr__(self) -> str:
return 'UBIFS Index Node'
- def __iter__(self):
+ def __iter__(self) -> Iterator[tuple[str, Any]]:
for key in dir(self):
if not key.startswith('_'):
yield key, getattr(self, key)
- def display(self, tab=''):
+ def display(self, tab: str = '') -> str:
return display.idx_node(self, tab)
@@ -212,7 +289,15 @@ class branch(object):
Arguments:
Bin:buf -- Raw data to extract header information from.
"""
- def __init__(self, buf):
+ hash: bytes
+ errors: list[str]
+
+ lnum: int
+ offs: int
+ len: int
+ key: ParsedKey
+
+ def __init__(self, buf: bytes) -> None:
fields = dict(list(zip(UBIFS_BRANCH_FIELDS, struct.unpack(UBIFS_BRANCH_FORMAT, buf[0:UBIFS_BRANCH_SZ]))))
for key in fields:
if key == 'key':
@@ -225,15 +310,15 @@ def __init__(self, buf):
setattr(self, 'errors', [])
- def __repr__(self):
+ def __repr__(self) -> str:
return 'UBIFS Branch'
- def __iter__(self):
+ def __iter__(self) -> Iterator[tuple[str, Any]]:
for key in dir(self):
if not key.startswith('_'):
yield key, getattr(self, key)
- def display(self, tab=''):
+ def display(self, tab: str = '') -> str:
return display.branch(self, tab)
@@ -246,7 +331,39 @@ class sb_node(object):
See ubifs/defines.py for object attributes.
"""
- def __init__(self, buf, file_offset=-1):
+ errors: list[str]
+
+ padding: bytes
+ key_hash: int
+ key_fmt: int
+ flags: int
+ min_io_size: int
+ leb_size: int
+ leb_cnt: int
+ max_leb_cnt: int
+ max_bud_bytes: int
+ log_lebs: int
+ lpt_lebs: int
+ orph_lebs: int
+ jhead_cnt: int
+ fanout: int
+ lsave_cnt: int
+ fmt_version: int
+ default_compr: int
+ padding1: bytes
+ rp_uid: int
+ rp_gid: int
+ rp_size: int
+ time_gran: int
+ uuid: bytes
+ ro_compat_version: int
+ hmac: bytes
+ hmac_wkm: bytes
+ hash_algo: int
+ hash_mst: bytes
+ padding2: bytes
+
+ def __init__(self, buf: bytes, file_offset: int = -1) -> None:
self.file_offset = file_offset
fields = dict(list(zip(UBIFS_SB_NODE_FIELDS, struct.unpack(UBIFS_SB_NODE_FORMAT, buf))))
for key in fields:
@@ -254,15 +371,15 @@ def __init__(self, buf, file_offset=-1):
setattr(self, 'errors', [])
- def __repr__(self):
+ def __repr__(self) -> str:
return 'UBIFS Super Block Node'
- def __iter__(self):
+ def __iter__(self) -> Iterator[tuple[str, Any]]:
for key in dir(self):
if not key.startswith('_'):
yield key, getattr(self, key)
- def display(self, tab=''):
+ def display(self, tab: str = '') -> str:
return display.sb_node(self, tab)
@@ -275,7 +392,42 @@ class mst_node(object):
See ubifs/defines.py for object attributes.
"""
- def __init__(self, buf, file_offset=-1):
+ errors: list[str]
+
+ highest_inum: int
+ cmt_no: int
+ flags: int
+ log_lnum: int
+ root_lnum: int
+ root_offs: int
+ root_len: int
+ gc_lnum: int
+ ihead_lnum: int
+ ihead_offs: int
+ index_size: int
+ total_free: int
+ total_dirty: int
+ total_used: int
+ total_dead: int
+ total_dark: int
+ lpt_lnum: int
+ lpt_offs: int
+ nhead_lnum: int
+ nhead_offs: int
+ ltab_lnum: int
+ ltab_offs: int
+ lsave_lnum: int
+ lsave_offs: int
+ lscan_lnum: int
+ empty_lebs: int
+ idx_lebs: int
+ leb_cnt: int
+ hash_root_idx: int
+ hash_lpt: bytes
+ hmac: bytes
+ padding: bytes
+
+ def __init__(self, buf: bytes, file_offset: int = -1) -> None:
self.file_offset = file_offset
fields = dict(list(zip(UBIFS_MST_NODE_FIELDS, struct.unpack(UBIFS_MST_NODE_FORMAT, buf))))
for key in fields:
@@ -283,13 +435,13 @@ def __init__(self, buf, file_offset=-1):
setattr(self, 'errors', [])
- def __repr__(self):
+ def __repr__(self) -> str:
return 'UBIFS Master Block Node'
- def __iter__(self):
+ def __iter__(self) -> Iterator[tuple[str, Any]]:
for key in dir(self):
if not key.startswith('_'):
yield key, getattr(self, key)
- def display(self, tab=''):
+ def display(self, tab: str = '') -> str:
return display.mst_node(self, tab)
diff --git a/ubireader/ubifs/output.py b/ubireader/ubifs/output.py
index 349ef58..a518e59 100755
--- a/ubireader/ubifs/output.py
+++ b/ubireader/ubifs/output.py
@@ -17,8 +17,10 @@
# along with this program. If not, see .
#############################################################
+from __future__ import annotations
import os
import struct
+from typing import TYPE_CHECKING
from ubireader.ubifs.decrypt import decrypt_symlink_target
from ubireader import settings
@@ -27,13 +29,18 @@
from ubireader.ubifs.misc import process_reg_file
from ubireader.debug import error, log, verbose_log
-def is_safe_path(basedir, path):
+if TYPE_CHECKING:
+ from collections.abc import Mapping
+ from ubireader.ubifs import ubifs as Ubifs, nodes
+ from ubireader.ubifs.walk import Inode
+
+def is_safe_path(basedir: str, path: str) -> bool:
basedir = os.path.realpath(basedir)
path = os.path.realpath(os.path.join(basedir, path))
return True if path.startswith(basedir) else False
-def extract_files(ubifs, out_path, perms=False):
+def extract_files(ubifs: Ubifs, out_path: str, perms: bool = False) -> None:
"""Extract UBIFS contents to_path/
Arguments:
@@ -41,8 +48,8 @@ def extract_files(ubifs, out_path, perms=False):
Str:out_path -- Path to extract contents to.
"""
try:
- inodes = {}
- bad_blocks = []
+ inodes: dict[int, Inode] = {}
+ bad_blocks: list[int] = []
walk.index(ubifs, ubifs.master_node.root_lnum, ubifs.master_node.root_offs, inodes, bad_blocks)
@@ -59,7 +66,7 @@ def extract_files(ubifs, out_path, perms=False):
error(extract_files, 'Error', '%s' % e)
-def extract_dents(ubifs, inodes, dent_node, path='', perms=False):
+def extract_dents(ubifs: Ubifs, inodes: Mapping[int, Inode], dent_node: nodes.dent_node, path: str = '', perms: bool = False) -> None:
if dent_node.inum not in inodes:
error(extract_dents, 'Error', 'inum: %s not found in inodes' % (dent_node.inum))
return
diff --git a/ubireader/ubifs/walk.py b/ubireader/ubifs/walk.py
index 0b8802f..eb97262 100755
--- a/ubireader/ubifs/walk.py
+++ b/ubireader/ubifs/walk.py
@@ -17,13 +17,26 @@
# along with this program. If not, see .
#############################################################
+from __future__ import annotations
+from collections.abc import MutableMapping
+from typing import TYPE_CHECKING, TypedDict
from ubireader import settings
from ubireader.ubifs import nodes
from ubireader.ubifs.defines import *
from ubireader.debug import error, log, verbose_log, verbose_display
from ubireader.ubifs.decrypt import decrypt_filenames
-def index(ubifs, lnum, offset, inodes={}, bad_blocks=[]):
+if TYPE_CHECKING:
+ from ubireader.ubifs import ubifs as Ubifs
+
+class Inode(TypedDict, total=False):
+ ino: nodes.ino_node
+ data: list[nodes.data_node]
+ dent: list[nodes.dent_node]
+ xent: list[nodes.xent_node]
+ hlink: str
+
+def index(ubifs: Ubifs, lnum: int, offset: int, inodes: MutableMapping[int, Inode] = {}, bad_blocks: list[int] = []) -> None:
"""Walk the index gathering Inode, Dir Entry, and File nodes.
Arguments:
@@ -42,7 +55,7 @@ def index(ubifs, lnum, offset, inodes={}, bad_blocks=[]):
_index(ubifs, lnum, offset, inodes, bad_blocks)
decrypt_filenames(ubifs, inodes)
-def _index(ubifs, lnum, offset, inodes={}, bad_blocks=[]):
+def _index(ubifs: Ubifs, lnum: int, offset: int, inodes: MutableMapping[int, Inode] = {}, bad_blocks: list[int] = []) -> None:
"""Walk the index gathering Inode, Dir Entry, and File nodes.
Arguments:
diff --git a/ubireader/utils.py b/ubireader/utils.py
index 1793285..ce57464 100755
--- a/ubireader/utils.py
+++ b/ubireader/utils.py
@@ -17,13 +17,14 @@
# along with this program. If not, see .
#############################################################
+from __future__ import annotations
import re
from ubireader.debug import error, log
from ubireader.ubi.defines import UBI_EC_HDR_MAGIC, FILE_CHUNK_SZ
from ubireader.ubifs.defines import UBIFS_NODE_MAGIC, UBIFS_SB_NODE_SZ, UBIFS_SB_NODE, UBIFS_COMMON_HDR_SZ
from ubireader.ubifs import nodes
-def guess_start_offset(path, guess_offset=0):
+def guess_start_offset(path: str, guess_offset: int =0) -> int | None:
file_offset = guess_offset
f = open(path, 'rb')
@@ -60,7 +61,7 @@ def guess_start_offset(path, guess_offset=0):
f.close()
-def guess_filetype(path, start_offset=0):
+def guess_filetype(path: str, start_offset: int = 0) -> bytes | None:
log(guess_filetype, 'Looking for file type at %s' % start_offset)
with open(path, 'rb') as f:
@@ -81,7 +82,7 @@ def guess_filetype(path, start_offset=0):
return ftype
-def guess_leb_size(path):
+def guess_leb_size(path: str) -> int | None:
"""Get LEB size from superblock
Arguments:
@@ -125,7 +126,7 @@ def guess_leb_size(path):
return block_size
-def guess_peb_size(path):
+def guess_peb_size(path: str) -> int | None:
"""Determine the most likely block size
Arguments:
@@ -138,7 +139,7 @@ def guess_peb_size(path):
common length between them.
"""
file_offset = 0
- offsets = []
+ offsets: list[int] = []
f = open(path, 'rb')
f.seek(0,2)
file_size = f.tell()+1
@@ -160,7 +161,7 @@ def guess_peb_size(path):
file_offset += FILE_CHUNK_SZ
f.close()
- occurrences = {}
+ occurrences: dict[int, int] = {}
for i in range(0, len(offsets)):
try:
diff = offsets[i] - offsets[i-1]