diff --git a/.github/workflows/python-publish.yml b/.github/workflows/python-publish.yml index f9cba9b..1f65ddd 100644 --- a/.github/workflows/python-publish.yml +++ b/.github/workflows/python-publish.yml @@ -16,8 +16,29 @@ permissions: contents: read jobs: + pytest-code: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + + - uses: actions/setup-python@v5 + with: + python-version: "3.12" + + - name: Install dependencies + run: python -m pip install --upgrade pip setuptools wheel -r requirements.txt + + - name: Test with pytest + run: | + coverage run -m pytest -v -s + + - name: Generate Coverage Report + run: | + coverage report -m + release-build: runs-on: ubuntu-latest + needs: [pytest-code] steps: - uses: actions/checkout@v4 diff --git a/.github/workflows/unit-tests.yml b/.github/workflows/unit-tests.yml new file mode 100644 index 0000000..f346b86 --- /dev/null +++ b/.github/workflows/unit-tests.yml @@ -0,0 +1,25 @@ +name: Unit Test GCBrickWork + +on: + workflow_dispatch: + +jobs: + pytest-code: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + + - uses: actions/setup-python@v5 + with: + python-version: "3.12" + + - name: Install dependencies + run: python -m pip install --upgrade pip setuptools wheel -r requirements.txt + + - name: Test with pytest + run: | + coverage run -m pytest -v -s + + - name: Generate Coverage Report + run: | + coverage report -m \ No newline at end of file diff --git a/README.md b/README.md index 520ac11..06a4462 100644 --- a/README.md +++ b/README.md @@ -36,6 +36,7 @@ The structure of these files break down in the following way: ### Jump / JMP These types of files typically table-like structures that are loaded into RAM during run-time. +These files are similar to modern day data-tables. * JMP Files contain a giant header block and data entry block. * The header block contains the definition of all field headers (columns) and field level data. 
Loads the first 16 bytes to determine (in order): * How many data entries there are @@ -50,6 +51,7 @@ These types of files typically table-like structures that are loaded into RAM du * The next 2 bytes represent the starting byte for the field within a given data line in the JMP file. * The second to last byte represents the shift bytes, which is required when reading certain field data. * The last byte represents the data type, as defined as either Int, Str, or Floats. + * Order of the JMPFileHeaders does not matter in JMP files, as long as all fields used are defined. * The data block contains the table row data one line at a time. * Each row is represented by multiple columns of data, each of which should match to a JMP field header and its respective value type (Int, Str, Float, etc.) * It should be noted that there will be extra bytes typically at the end of a jmp file, which are padded with "@". diff --git a/gcbrickwork/JMP.py b/gcbrickwork/JMP.py index f00fb7f..f65e754 100644 --- a/gcbrickwork/JMP.py +++ b/gcbrickwork/JMP.py @@ -8,7 +8,37 @@ JMP_STRING_BYTE_LENGTH = 32 type JMPValue = int | str | float -type JMPEntry = dict[JMPFieldHeader, JMPValue] + + +class JMPEntry(dict["JMPFieldHeader", JMPValue]): + """ + A JMP entry (row) that allows accessing fields by string name or JMPFieldHeader. + This is a simple wrapper around a dict to allow getting values by string instead of having to use JMPFieldHeaders. + """ + def _find_entry_field(self, jmp_field: "int | str | JMPFieldHeader") -> "JMPFieldHeader": + """Finds a specific JMP field by its hash value or field name. 
Can return None as well if no field found.""" + if isinstance(jmp_field, str): + field: JMPFieldHeader = next((field for field in self.keys() if field.field_name == jmp_field), None) + elif isinstance(jmp_field, int): + field: JMPFieldHeader = next((field for field in self.keys() if field.field_hash == jmp_field), None) + elif isinstance(jmp_field, JMPFieldHeader): + field = jmp_field + else: + raise ValueError(f"Cannot index JMPEntry with value of type {type(jmp_field)}") + + if field is None: + raise KeyError(f"No JMPHeaderField was found with name/hash '{str(jmp_field)}'") + return field + + + def __getitem__(self, key: "str | int | JMPFieldHeader") -> JMPValue: + """Gets a specific JMPHeaderField by its name, hash, or field directly.""" + return super().__getitem__(self._find_entry_field(key)) + + + def __setitem__(self, key: "str | int | JMPFieldHeader", value: JMPValue): + """Updates a specific JMPHeaderField by its name, hash, or field directly to the provided value.""" + super().__setitem__(self._find_entry_field(key), value) class JMPFileError(Exception): @@ -41,6 +71,7 @@ class JMPFieldHeader: field_shift_byte: int = 0 field_data_type: JMPType = None + def __init__(self, jmp_hash: int, jmp_bitmask: int, jmp_start_byte: int, jmp_shift_byte: int, jmp_data_type: int): self.field_hash = jmp_hash self.field_name = str(self.field_hash) @@ -49,35 +80,73 @@ def __init__(self, jmp_hash: int, jmp_bitmask: int, jmp_start_byte: int, jmp_shi self.field_shift_byte = jmp_shift_byte self.field_data_type = JMPType(jmp_data_type) + def __str__(self): return str(self.__dict__) + def __hash__(self): - return self.field_hash + return id(self) + + + def __eq__(self, other): + return self is other + + + def validate_header(self): + if not isinstance(self.field_hash, int): + raise JMPFileError("JMPFieldHeader Field Hash must be of type integer.") + elif not (0 <= self.field_hash <= 2**32 - 1): + raise JMPFileError(f"JMPFieldHeader Field Hash must be between 0 and '{str(2**32 - 
1)}'") + + if not isinstance(self.field_bitmask, int): + raise JMPFileError("JMPFieldHeader Field BitMask must be of type integer.") + elif not (0 <= self.field_bitmask <= 2**32-1): + raise JMPFileError(f"JMPFieldHeader Field BitMask must be between 0 and '{str(2**32-1)}'") + + if not isinstance(self.field_start_byte, int): + raise JMPFileError("JMPFieldHeader Start Byte must be of type integer.") + elif not self.field_start_byte % 4 == 0: + raise JMPFileError("JMPFieldHeader Start Byte must be divisible by '4'.") + elif not (0 <= self.field_start_byte <= 2**16 - 1): + raise JMPFileError(f"JMPFieldHeader Start Byte must be between 0 and '{str(2**16 - 1)}'") + + if not isinstance(self.field_shift_byte, int): + raise JMPFileError("JMPFieldHeader Shift Byte must be of type integer.") + elif not (0 <= self.field_shift_byte <= 2**8 - 1): + raise JMPFileError(f"JMPFieldHeader Shift Byte must be between 0 and '{str(2**8 - 1)}'") + class JMP: """ JMP Files are table-structured format files that contain a giant header block and data entry block. - The header block contains the definition of all field headers (columns) and field level data + These files remark a similar structure to modern day data tables. + The header block contains the definition of all field headers (columns) and field data + Definition of these headers does not matter. The data block contains the table row data one line at a time. Each row is represented as a single list index, where a dictionary maps the key (column) to the value. JMP Files also start with 16 bytes that are useful to explain the rest of the structure of the file. 
""" - data_entries: list[JMPEntry] = [] + _data_entries: list[JMPEntry] = [] _fields: list[JMPFieldHeader] = [] - def __init__(self, data_entries: list[JMPEntry]): - if not self._validate_all_entries(): - raise JMPFileError("One or more data_entry's have either extra JMPFieldHeaders or less.\n" + - "Each data_entry should share the exact same number of JMPFieldHeaders, even if they are 0/empty.") + def __init__(self, fields: list[JMPFieldHeader], data_entries: list[JMPEntry]): + self._fields = fields + self.validate_jmp_fields() + self._data_entries = data_entries + self.validate_all_jmp_entries() - self.data_entries = data_entries - if data_entries is None or len(data_entries) == 0: - self._fields = [] - else: - self._update_list_of_headers() + + def validate_jmp_fields(self): + """Validates that the list of JMPFieldHeaders have correct information and confirms no duplicates are found.""" + field_hashes: list[int] = [] + for j_field in self._fields: + if j_field.field_hash in field_hashes: + raise JMPFileError(f"JMPFieldHeader with hash '{str(j_field.field_hash)}' already exists in JMPFieldHeaderList.") + j_field.validate_header() + field_hashes.append(j_field.field_hash) @property @@ -86,38 +155,33 @@ def fields(self) -> list[JMPFieldHeader]: return self._fields - @classmethod - def load_jmp(cls, jmp_data: BytesIO): - """ - Loads the first 16 bytes to determine (in order): how many data entries there are, how many fields are defined, - Gives the total size of the header block, and the number of data files that are defined in the file. - Each of these are 4 bytes long, with the first 8 bytes being signed integers and the second 8 bytes are unsigned. - It should be noted that there will be extra bytes typically at the end of a jmp file, which are padded with "@". - These paddings can be anywhere from 1 to 31 bytes, up until the total bytes is divisible by 32. 
- """ - original_file_size = jmp_data.seek(0, 2) + def add_jmp_header(self, jmp_field: JMPFieldHeader, default_val: JMPValue): + """Adds a new JMPFieldHeader and a default value to all existing data entries.""" + jmp_field.validate_header() + if jmp_field in self._fields or jmp_field.field_hash in [f.field_hash for f in self._fields]: + raise JMPFileError(f"JMPFieldHeader with hash '{str(jmp_field.field_hash)}' already exists in JMPFieldHeaderList.") - # Get important file bytes - data_entry_count: int = read_s32(jmp_data, 0) - field_count: int = read_s32(jmp_data, 4) - header_block_size: int = read_u32(jmp_data, 8) - single_entry_size: int = read_u32(jmp_data, 12) + self._fields.append(jmp_field) + for data_entry in self._data_entries: + data_entry[jmp_field] = default_val - # Load all headers of this file - header_size: int = header_block_size - 16 # JMP Field details start after the above 16 bytes - if (header_size % JMP_HEADER_SIZE != 0 or not (header_size / JMP_HEADER_SIZE) == field_count or - header_block_size > original_file_size): - raise JMPFileError("When trying to read the header block of the JMP file, the size was bigger than " + - "expected and could not be parsed properly.") - fields = _load_headers(jmp_data, field_count) - # Load all data entries / rows of this table. - if header_block_size + (single_entry_size * data_entry_count) > original_file_size: - raise JMPFileError("When trying to read the date entries block of the JMP file, the size was bigger than " + - "expected and could not be parsed properly.") - entries = _load_entries(jmp_data, data_entry_count, single_entry_size, header_block_size, fields) + def delete_jmp_header(self, field_key: str | int | JMPFieldHeader): + """Deletes a JMPFieldHeader based on the provided field name, hash, or field itself. 
+ Automatically removes the field from all data entries as well, to avoid issues later on.""" + if isinstance(field_key, str) or isinstance(field_key, int): + field = self.find_jmp_header(field_key) + elif isinstance(field_key, JMPFieldHeader): + field = field_key + else: + raise ValueError(f"Cannot index JMPEntry with value of type {type(field_key)}") + + if field is None: + return - return cls(entries) + self._fields.remove(field) + for data_entry in self._data_entries: + del data_entry[field] def map_hash_to_name(self, field_names: dict[int, str]): @@ -125,116 +189,135 @@ def map_hash_to_name(self, field_names: dict[int, str]): Using the user provided dictionary, maps out the field hash to their designated name, making it easier to query. """ for key, val in field_names.items(): - jmp_field: JMPFieldHeader = self._find_field_by_hash(key) + jmp_field: JMPFieldHeader = self.find_jmp_header(key) if jmp_field is None: continue jmp_field.field_name = val - def _find_field_by_hash(self, jmp_field_hash: int) -> JMPFieldHeader | None: - """Finds a specific JMP field by its hash value. Can return None as well if no field found.""" - return next((j_field for j_field in self._fields if j_field.field_hash == jmp_field_hash), None) - - - def _find_field_by_name(self, jmp_field_name: str) -> JMPFieldHeader | None: - """Finds a specific JMP field by its field name. 
Can return None as well if no field found.""" - return next((j_field for j_field in self._fields if j_field.field_name == jmp_field_name), None) - - - def add_jmp_header(self, jmp_field: JMPFieldHeader, default_val: JMPValue): - """Adds a new JMPFieldHeader and a default value to all existing data entries.""" - if not jmp_field.field_start_byte % 4 == 0: - raise JMPFileError("JMPFieldHeader start bytes must be divisible by 4") - - self._fields.append(jmp_field) + def find_jmp_header(self, field_key: str | int) -> JMPFieldHeader | None: + """Finds a JMPFieldHeader based on either the field's name or its hash.""" + if isinstance(field_key, str): + return next((j_field for j_field in self._fields if j_field.field_name == field_key), None) + elif isinstance(field_key, int): + return next((j_field for j_field in self._fields if j_field.field_hash == field_key), None) + else: + raise ValueError(f"Cannot index JMPEntry with value of type {type(field_key)}") - for data_entry in self.data_entries: - data_entry[jmp_field] = default_val + @property + def data_entries(self) -> list[JMPEntry]: + """Returns the list of JMPEntry (rows) that are defined in this file.""" + return self._data_entries - def check_header_name_has_value(self, jmp_entry: JMPEntry, field_name: str, field_value: JMPValue) -> bool: - """With the given jmp_entry, searches each header name to see if the name and value match.""" - if not jmp_entry in self.data_entries: - raise JMPFileError("Provided entry does not exist in the current list of JMP data entries.") - return any((jmp_field, jmp_value) for (jmp_field, jmp_value) in jmp_entry.items() if - jmp_field.field_name == field_name and jmp_entry[jmp_field] == field_value) + def clear_data_entries(self): + """Resets data_entries into an empty list (no rows defined)""" + self._data_entries = [] - def check_header_hash_has_value(self, jmp_entry: JMPEntry, field_hash: int, field_value: JMPValue) -> bool: - """With the given jmp_entry, searches each header hash 
to see if the name and value match.""" - if not jmp_entry in self.data_entries: - raise JMPFileError("Provided entry does not exist in the current list of JMP data entries.") + def delete_jmp_entry(self, jmp_entry: int | JMPEntry): + """Deletes a JMPEntry by either the Entry itself or the index number.""" + if isinstance(jmp_entry, int): + entry: JMPEntry = self._data_entries[jmp_entry] + elif isinstance(jmp_entry, JMPEntry): + entry: JMPEntry = jmp_entry + else: + raise ValueError(f"Cannot index JMPEntry with value of type {type(jmp_entry)}") - return any((jmp_field, jmp_value) for (jmp_field, jmp_value) in jmp_entry.items() if - jmp_field.field_hash == field_hash and jmp_entry[jmp_field] == field_value) + self._data_entries.remove(entry) - def get_jmp_header_name_value(self, jmp_entry: JMPEntry, field_name: str) -> JMPValue: - """With the given jmp_entry, returns the current value from the provided field name""" - if not jmp_entry in self.data_entries: - raise JMPFileError("Provided entry does not exist in the current list of JMP data entries.") + def add_jmp_entry(self, jmp_entry: dict[str | int, JMPValue] | JMPEntry): + """Adds a new data entry using field names or hashes as keys with complete field validation.""" + if not self._fields: + raise JMPFileError("Cannot add a JMPEntry to the JMP with no defined fields.") + elif jmp_entry is None or len(jmp_entry.keys()) == 0: + raise JMPFileError("Cannot add an empty JMPEntry to the JMP.") - jmp_field: JMPFieldHeader = self._find_field_by_name(field_name) - if jmp_field is None: - raise JMPFileError(f"No JMP field with name '{field_name}' was found in the provided entry.") + self._data_entries.append(self.validate_jmp_entry(jmp_entry)) - if not jmp_field in self._fields: - raise JMPFileError("Although a JMP field was found for this entry, it does not exist in the list " + - "of fields for the JMP file. 
Please ensure to properly add this field via the 'add_jmp_header' function") - return jmp_entry[jmp_field] + def validate_jmp_entry(self, entry_data: dict[str | int, JMPValue] | JMPEntry) -> JMPEntry: + """Validates the current JMPEntry does not have invalid fields, missing required fields, and correct values. + If a required field (which is a field defined in the self.fields), a JMPFIleError is thrown.""" + entry_to_use: JMPEntry = JMPEntry() + invalid_fields: list[str] = [] + for key, val in entry_data.items(): + if isinstance(key, str) or isinstance(key, int): + jmp_field: JMPFieldHeader = self.find_jmp_header(key) + if jmp_field is None: + invalid_fields.append(f"'{str(key)}' {"(name)" if isinstance(key, str) else "(hash)"}") + continue + entry_to_use[jmp_field] = val + elif isinstance(key, JMPFieldHeader): + if not key in self._fields: + invalid_fields.append(f"(JMPFieldHeader) Name: '{key.field_name}'; Hash: '{str(key.field_hash)}'") + continue - def get_jmp_header_hash_value(self, jmp_entry: JMPEntry, field_hash: int) -> JMPValue: - """With the given jmp_entry, returns the current value from the provided field name""" - if not jmp_entry in self.data_entries: - raise JMPFileError("Provided entry does not exist in the current list of JMP data entries.") + entry_to_use[key] = val + else: + raise JMPFileError("Entry keys must be field names (str) or field hashes (int)") - jmp_field: JMPFieldHeader = self._find_field_by_hash(field_hash) - if jmp_field is None: - raise JMPFileError(f"No JMP field with hash '{str(field_hash)}' was found in the provided entry.") + if invalid_fields: + raise JMPFileError(f"Invalid fields not found in JMP file schema: {', '.join(invalid_fields)}") - if not jmp_field in self._fields: - raise JMPFileError("Although a JMP field was found for this entry, it does not exist in the list " + - "of fields for the JMP file. 
Please ensure to properly add this field via the 'add_jmp_header' function") + # Validate the entry has all required fields + missing_fields = set(self._fields) - set(entry_to_use.keys()) + if missing_fields: + raise JMPFileError(f"Missing required JMP: {', '.join([f"(JMPFieldHeader) Name: '{f.field_name}'; " + + f"Hash: '{str(f.field_hash)}'" for f in missing_fields])}") - return jmp_entry[jmp_field] + return entry_to_use - def update_jmp_header_name_value(self, jmp_entry: JMPEntry, field_name: str, field_value: JMPValue): - """Updates a JMP header with the provided value in the given JMPEntry""" - if not jmp_entry in self.data_entries: - raise JMPFileError("Provided entry does not exist in the current list of JMP data entries.") + @classmethod + def load_jmp(cls, jmp_data: BytesIO): + """ + Loads the first 16 bytes to determine (in order): how many data entries there are, how many fields are defined, + Gives the total size of the header block, and the number of data files that are defined in the file. + Each of these are 4 bytes long, with the first 8 bytes being signed integers and the second 8 bytes are unsigned. + It should be noted that there will be extra bytes typically at the end of a jmp file, which are padded with "@". + These paddings can be anywhere from 1 to 31 bytes, up until the total bytes is divisible by 32. 
+ """ + original_file_size = jmp_data.seek(0, 2) - jmp_field = self._find_field_by_name(field_name) - jmp_entry[jmp_field] = field_value + # Get important file bytes + data_entry_count: int = read_s32(jmp_data, 0) + field_count: int = read_s32(jmp_data, 4) + header_block_size: int = read_u32(jmp_data, 8) + single_entry_size: int = read_u32(jmp_data, 12) + # Load all headers of this file + header_size: int = header_block_size - 16 # JMP Field details start after the above 16 bytes + if (header_size % JMP_HEADER_SIZE != 0 or not (header_size / JMP_HEADER_SIZE) == field_count or + header_block_size > original_file_size): + raise JMPFileError("When trying to read the header block of the JMP file, the size was bigger than " + + "expected and could not be parsed properly.") + fields = _load_headers(jmp_data, field_count) - def update_jmp_header_hash_value(self, jmp_entry: JMPEntry, field_hash: int, field_value: JMPValue): - """Updates a JMP header with the provided value in the given JMPEntry""" - if not jmp_entry in self.data_entries: - raise JMPFileError("Provided entry does not exist in the current list of JMP data entries.") + # Load all data entries / rows of this table. + if header_block_size + (single_entry_size * data_entry_count) > original_file_size: + raise JMPFileError("When trying to read the date entries block of the JMP file, the size was bigger than " + + "expected and could not be parsed properly.") + entries = _load_entries(jmp_data, data_entry_count, single_entry_size, header_block_size, fields) - jmp_field = self._find_field_by_hash(field_hash) - jmp_entry[jmp_field] = field_value + return cls(fields, entries) def create_new_jmp(self) -> BytesIO: """ - Create a new the file from the fields / data_entries, as new entries / headers could have been added. Keeping the - original structure of: Important 16 header bytes, Header Block, and then the Data entries block. 
+ Create a new the file from the fields / _data_entries, as new entries / headers could have been added. + Keeping the original structure of: Important 16 header bytes, Header Block, and then the Data entries block. """ - if not self._validate_all_entries(): - raise JMPFileError("One or more data_entry's have either extra JMPFieldHeaders or less.\n" + - "Each data_entry should share the exact same number of JMPFieldHeaders, even if they are 0/empty.") - - self._update_list_of_headers() + self.validate_jmp_fields() + self.validate_all_jmp_entries() local_data: BytesIO = BytesIO() single_entry_size: int = self._calculate_entry_size() new_header_size: int = len(self._fields) * JMP_HEADER_SIZE + 16 - write_s32(local_data, 0, len(self.data_entries)) # Amount of data entries + write_s32(local_data, 0, len(self._data_entries)) # Amount of data entries write_s32(local_data, 4, len(self._fields)) # Amount of JMP fields write_u32(local_data, 8, new_header_size) # Size of Header Block write_u32(local_data, 12, single_entry_size) # Size of a single data entry @@ -250,11 +333,6 @@ def create_new_jmp(self) -> BytesIO: return local_data - def _update_list_of_headers(self): - """Using the first data entry, re-build the list of JMP header fields.""" - self._fields = sorted(list(self.data_entries[0].keys()), key=lambda jmp_field: jmp_field.field_start_byte) - - def _update_headers(self, local_data: BytesIO) -> int: """ Add the individual headers to complete the header block """ current_offset: int = 16 @@ -272,7 +350,7 @@ def _update_headers(self, local_data: BytesIO) -> int: def _update_entries(self, local_data: BytesIO, current_offset: int, entry_size: int): """ Add the all the data entry lines. 
Integers with bitmask 0xFFFFFFFF will write their values directly, while other integers will need to shift/mask their values accordingly.""" - for line_entry in self.data_entries: + for line_entry in self._data_entries: for key, val in line_entry.items(): match key.field_data_type: case JMPType.Int: @@ -299,17 +377,15 @@ def _calculate_entry_size(self) -> int: return sorted_jmp_fields[0].field_start_byte + _get_field_size(JMPType(sorted_jmp_fields[0].field_data_type)) - def _validate_all_entries(self) -> bool: + def validate_all_jmp_entries(self): """ Validates all entries have the same JMPFieldHeaders. All of them must have a value, even if its 0. If a data_entry defines a field that is not shared by the others, it will cause parsing errors later. """ - if self.data_entries is None or len(self.data_entries) == 0: - return True - headers_list: list[list[JMPFieldHeader]] = [] - for entry in self.data_entries: - headers_list.append(sorted(list(entry.keys()), key=lambda j_field: j_field.field_start_byte)) - return all(sublist == headers_list[0] for sublist in headers_list) + if self._data_entries is None or len(self._data_entries) == 0: + return + for jmp_entry in self._data_entries: + self.validate_jmp_entry(jmp_entry) def _load_headers(header_data: BytesIO, field_count: int) -> list[JMPFieldHeader]: @@ -338,19 +414,22 @@ def _load_entries(entry_data: BytesIO, entry_count: int, entry_size: int, header data_entries: list[JMPEntry] = [] for current_entry in range(entry_count): - new_entry: JMPEntry = {} + val_to_use: JMPValue | None = None + new_entry: JMPEntry = JMPEntry() data_entry_start: int = (current_entry * entry_size) + header_size for jmp_header in field_list: match jmp_header.field_data_type: case JMPType.Int: current_val: int = read_u32(entry_data, data_entry_start + jmp_header.field_start_byte) - new_entry[jmp_header] = (current_val & jmp_header.field_bitmask) >> jmp_header.field_shift_byte + val_to_use = (current_val & jmp_header.field_bitmask) >> 
jmp_header.field_shift_byte case JMPType.Str: - new_entry[jmp_header] = read_str_until_null_character(entry_data, + val_to_use = read_str_until_null_character(entry_data, data_entry_start + jmp_header.field_start_byte, JMP_STRING_BYTE_LENGTH) case JMPType.Flt: - new_entry[jmp_header] = read_float(entry_data, data_entry_start + jmp_header.field_start_byte) + val_to_use = read_float(entry_data, data_entry_start + jmp_header.field_start_byte) + + new_entry[jmp_header] = val_to_use data_entries.append(new_entry) return data_entries diff --git a/gcbrickwork/PRM.py b/gcbrickwork/PRM.py index cc38960..61c48db 100644 --- a/gcbrickwork/PRM.py +++ b/gcbrickwork/PRM.py @@ -89,6 +89,10 @@ def __str__(self): class PRM: + """ PRM Files are parameterized files that have one or more parameters that can be changed/manipulated. + These files typically host values that would change frequently and are read by the program at run-time. + PRM Files start with 4 bytes as an unsigned int to tell how many parameters are defined. + The structure of the entries can be found in PRMFieldEntry. """ data_entries: list[PRMFieldEntry] = [] @@ -98,12 +102,9 @@ def __init__(self, input_entries: list[PRMFieldEntry]): @classmethod def load_prm(cls, prm_data: BytesIO): + """ Loads the various prm values from the file into a list of PRMFieldEntries """ - PRM Files are parameterized files that have one or more parameters that can be changed/manipulated. - These files typically host values that would change frequently and are read by the program at run-time. - PRM Files start with 4 bytes as an unsigned int to tell how many parameters are defined. 
- The structure of the entries can be found in PRMFieldEntry - """ + entry_value: PRMValue | None = None prm_entries: list[PRMFieldEntry] = [] current_offset: int = 0 num_of_entries: int = read_u32(prm_data, 0) @@ -179,5 +180,17 @@ def create_new_prm(self) -> BytesIO: return local_data - def get_entry(self, field_name: str) -> PRMFieldEntry: - return next(entry for entry in self.data_entries if entry.field_name == field_name) \ No newline at end of file + def get_prm_entry(self, prm_field: str | int) -> PRMFieldEntry: + """Gets a PRMFieldEntry based on a provided field name/hash.""" + if isinstance(prm_field, str): + return next(prm_entry for prm_entry in self.data_entries if prm_entry.field_name == prm_field) + elif isinstance(prm_field, int): + return next(prm_entry for prm_entry in self.data_entries if prm_entry.field_hash == prm_field) + else: + raise ValueError(f"Cannot index PRMFieldEntry with value of type {type(prm_field)}") + + + def update_prm_entry(self, prm_field: str | int, prm_value: PRMValue): + """Updates a PRMFieldEntry based on a provided field/value.""" + prm_entry: PRMFieldEntry = self.get_prm_entry(prm_field) + prm_entry.field_value = prm_value \ No newline at end of file diff --git a/gcbrickwork/__init__.py b/gcbrickwork/__init__.py index bffc172..ca87ab1 100644 --- a/gcbrickwork/__init__.py +++ b/gcbrickwork/__init__.py @@ -1,2 +1,2 @@ from gcbrickwork.PRM import PRM, PRMType, PRMVector, PRMColor, PRMFieldEntry -from gcbrickwork.JMP import JMP, JMPType, JMPFieldHeader \ No newline at end of file +from gcbrickwork.JMP import JMP, JMPType, JMPFieldHeader, JMPEntry \ No newline at end of file diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..c888930 --- /dev/null +++ b/requirements.txt @@ -0,0 +2 @@ +pytest==8.3.4 +coverage \ No newline at end of file diff --git a/setup.py b/setup.py index 0e741f0..cb759c7 100644 --- a/setup.py +++ b/setup.py @@ -7,7 +7,7 @@ setuptools.setup( name="gcbrickwork", 
packages=setuptools.find_packages(), - version="2.1.4", + version="3.0.0", license="MIT", author="Some Jake Guy", author_email="somejakeguy@gmail.com", diff --git a/unit_tests/__init__.py b/unit_tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/unit_tests/test_jmp.py b/unit_tests/test_jmp.py new file mode 100644 index 0000000..662e49e --- /dev/null +++ b/unit_tests/test_jmp.py @@ -0,0 +1,176 @@ +import pytest, struct +from io import BytesIO +from sys import exception + +from gcbrickwork import JMP +from gcbrickwork.Bytes_Helper import ByteHelperError +from gcbrickwork.JMP import JMPFileError + + +def _jmp_sixteen_header(field_count: int=0, entry_count: int=0, header_size: int=0, entry_size: int=0) -> BytesIO: + """Writes a quick jmp where only the first 16 bytes are specified.""" + # Calculate sizes + field_count = field_count + data_entry_count = entry_count + header_block_size = header_size + single_entry_size = entry_size + + io_data = BytesIO() + io_data.write(struct.pack(">i", data_entry_count)) # Offset 0: data_entry_count (s32) + io_data.write(struct.pack(">i", field_count)) # Offset 4: field_count (s32) + io_data.write(struct.pack(">I", header_block_size)) # Offset 8: header_block_size (u32) + io_data.write(struct.pack(">I", single_entry_size)) # Offset 12: single_entry_size (u32) + return io_data + +def _create_sample_jmp() -> BytesIO: + """Creates a valid JMP file with 2 fields and 2 entries""" + + # Define field headers + field1_hash = 0x12345678 + field1_bitmask = 0xFFFFFFFF # Will be packed/unpacked as is + field1_start_byte = 0 + field1_shift_byte = 0 + field1_type = 0 # JMPType.Int + + field2_hash = 0xABCDEF01 + field2_bitmask = 0 + field2_start_byte = 4 + field2_shift_byte = 0 + field2_type = 2 # JMPType.Flt + + field3_hash = 0xCCCCAAAA + field3_bitmask = 0xFF # Will be masked as needed + field3_start_byte = 8 + field3_shift_byte = 0 + field3_type = 0 # JMPType.Int + + field4_hash = 0xDDDDBBBB + field4_bitmask = 0x3F00 # 
Will be masked as needed + field4_start_byte = 8 + field4_shift_byte = 8 + field4_type = 0 # JMPType.Int + + + # Calculate sizes + field_count = 4 + data_entry_count = 2 + header_block_size = 16 + (field_count * 12) + single_entry_size = 8 + + jmp_data: BytesIO = _jmp_sixteen_header(field_count, data_entry_count, header_block_size, single_entry_size) + + # Write field headers (24 bytes total, 12 bytes each) + jmp_data.write(struct.pack(">I", field1_hash)) + jmp_data.write(struct.pack(">I", field1_bitmask)) + jmp_data.write(struct.pack(">H", field1_start_byte)) + jmp_data.write(struct.pack(">B", field1_shift_byte)) + jmp_data.write(struct.pack(">B", field1_type)) + + jmp_data.write(struct.pack(">I", field2_hash)) + jmp_data.write(struct.pack(">I", field2_bitmask)) + jmp_data.write(struct.pack(">H", field2_start_byte)) + jmp_data.write(struct.pack(">B", field2_shift_byte)) + jmp_data.write(struct.pack(">B", field2_type)) + + jmp_data.write(struct.pack(">I", field3_hash)) + jmp_data.write(struct.pack(">I", field3_bitmask)) + jmp_data.write(struct.pack(">H", field3_start_byte)) + jmp_data.write(struct.pack(">B", field3_shift_byte)) + jmp_data.write(struct.pack(">B", field3_type)) + + jmp_data.write(struct.pack(">I", field4_hash)) + jmp_data.write(struct.pack(">I", field4_bitmask)) + jmp_data.write(struct.pack(">H", field4_start_byte)) + jmp_data.write(struct.pack(">B", field4_shift_byte)) + jmp_data.write(struct.pack(">B", field4_type)) + + # Write data entries (16 bytes total, 8 bytes each) + jmp_data.write(struct.pack(">I", 5)) + jmp_data.write(struct.pack(">f", 100.0)) + jmp_data.write(struct.pack(">I", 0 | ((5 << field3_shift_byte) & field3_bitmask) | ((42 << field4_shift_byte) & field4_bitmask))) + + jmp_data.write(struct.pack(">I", 10)) + jmp_data.write(struct.pack(">f", 200.0)) + jmp_data.write(struct.pack(">I", 2660)) + + # Pad to 32-byte boundary with '@' characters + current_size = jmp_data.tell() + padding_needed = (32 - (current_size % 32)) % 32 + if 
padding_needed > 0: + jmp_data.write(b'@' * padding_needed) + + return jmp_data + +def test_none_jmp_data(): + """Tests JMP type creation when None type is provided""" + with pytest.raises(AttributeError): + JMP.load_jmp(None) + +def test_empty_jmp_data(): + """Tests JMP type creation when empty BytesIO is provided""" + with pytest.raises(ByteHelperError): + JMP.load_jmp(BytesIO()) + +def test_jmp_first_sixteen_bytes(): + """Tests JMP type creation when only the first 16 bytes are provided""" + with pytest.raises(JMPFileError): + JMP.load_jmp(_jmp_sixteen_header()) + +def test_full_jmp(): + """Tests the whole JMP file is read correctly""" + try: + JMP.load_jmp(_create_sample_jmp()) + except Exception as ex: + pytest.fail("Reading JMP Sample raised an exception: {0}".format(ex)) + +def test_jmp_save(): + """Ensures JMP file can be saved as expected.""" + try: + temp_jmp: JMP = JMP.load_jmp(_create_sample_jmp()) + temp_jmp.create_new_jmp() + except Exception as ex: + pytest.fail("Saving JMP Sample raised an exception: {0}".format(ex)) + +def test_non_jmp_header_type_get(): + """Checks if an invalid JMP Header type is used to get a key""" + temp_jmp: JMP = JMP.load_jmp(_create_sample_jmp()) + with pytest.raises(ValueError): + temp_jmp.data_entries[0][None] = [] + +def test_non_existent_jmp_header_type_get(): + """Checks for when a jmp header does not exist at all""" + temp_jmp: JMP = JMP.load_jmp(_create_sample_jmp()) + with pytest.raises(KeyError): + temp_jmp.data_entries[0].__getitem__("Ch)eery") + +def test_jmp_list_value_then_save(): + """Updates an entry to have a list valid, which is not valid and should error out.""" + temp_jmp: JMP = JMP.load_jmp(_create_sample_jmp()) + temp_jmp.data_entries[0][0x12345678] = [] + with pytest.raises(struct.error): + temp_jmp.create_new_jmp() + +def test_jmp_read_is_correct(): + temp_jmp: JMP = JMP.load_jmp(_create_sample_jmp()) + assert (temp_jmp.data_entries[0][0x12345678] == 5) + assert 
(temp_jmp.data_entries[0][0xABCDEF01] == 100.000000) + assert (temp_jmp.data_entries[0][0xCCCCAAAA] == 5) + assert (temp_jmp.data_entries[0][0xDDDDBBBB] == 42) + +def test_jmp_read_save_then_reread(): + """Try to read, save, then re-read the data to check for data loss.""" + try: + temp_jmp: JMP = JMP.load_jmp(_create_sample_jmp()) + temp_data: BytesIO = temp_jmp.create_new_jmp() + JMP.load_jmp(temp_data) + except Exception as ex: + pytest.fail("Reading, saving, then re-reading the JMP Sample raised an exception: {0}".format(ex)) + +def test_jmp_read_is_correct_after_reread(): + temp_jmp: JMP = JMP.load_jmp(_create_sample_jmp()) + temp_data: BytesIO = temp_jmp.create_new_jmp() + temp_jmp = JMP.load_jmp(temp_data) + assert (temp_jmp.data_entries[0][0x12345678] == 5) + assert (temp_jmp.data_entries[0][0xABCDEF01] == 100.000000) + assert (temp_jmp.data_entries[0][0xCCCCAAAA] == 5) + assert (temp_jmp.data_entries[0][0xDDDDBBBB] == 42)