-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathkt_gz.py
More file actions
119 lines (89 loc) · 4.53 KB
/
kt_gz.py
File metadata and controls
119 lines (89 loc) · 4.53 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
import os
import struct
import zlib
from typing import BinaryIO
KT_GZ_BLOCK_SIZE = 0x10000
KT_GZ_HEADER_STRUCT = struct.Struct("<iII")
KT_GZ_EXTENSION = ".gz"
def align_0x80(offset):
return (offset + 0x7F) & ~0x7F
def compress_kt_gz(in_stream: BinaryIO, out_stream: BinaryIO, total_size, level=-1):
base_offset = out_stream.tell()
block_count = (total_size - 1) // KT_GZ_BLOCK_SIZE + 1
current_offset = align_0x80(KT_GZ_HEADER_STRUCT.size + block_count * 4)
block_sizes = bytearray()
def write_block(in_block_size):
compressed_block_data = zlib.compress(in_stream.read(in_block_size), level)
block_size = len(compressed_block_data)
out_stream.seek(base_offset + current_offset)
out_stream.write(struct.pack("<I", block_size))
out_stream.write(compressed_block_data)
block_size += 4 # include the 4-byte size in block header
block_sizes.extend(struct.pack("<I", block_size))
return align_0x80(current_offset + block_size)
for _ in range(block_count - 1):
current_offset = write_block(KT_GZ_BLOCK_SIZE)
# last block and pad 0
current_offset = write_block(total_size - KT_GZ_BLOCK_SIZE * (block_count - 1))
out_stream.write(b'\x00' * (current_offset - (out_stream.tell() - base_offset)))
# write header
out_stream.seek(base_offset)
out_stream.write(KT_GZ_HEADER_STRUCT.pack(KT_GZ_BLOCK_SIZE, block_count, total_size))
out_stream.write(block_sizes)
out_stream.seek(base_offset + current_offset)
return current_offset
def decompress_kt_gz(in_stream: BinaryIO, out_stream: BinaryIO):
base_offset = in_stream.tell()
# read header
block_size, block_count, total_size = KT_GZ_HEADER_STRUCT.unpack(in_stream.read(KT_GZ_HEADER_STRUCT.size))
if block_size == -1: # block has no size header
raise NotImplementedError # doesn't exist in Three Houses
block_sizes = [struct.unpack('<I', in_stream.read(4))[0] for _ in range(block_count)]
current_offset = align_0x80(KT_GZ_HEADER_STRUCT.size + block_count * 4)
# deflate blocks
def write_block():
in_stream.seek(base_offset + current_offset)
cur_block_data_size = struct.unpack('<I', in_stream.read(4))[0]
out_stream.write(zlib.decompress(in_stream.read(cur_block_data_size)))
return align_0x80(current_offset + cur_block_size)
for cur_block_size in block_sizes[:-1]:
current_offset = write_block()
# For some reason last block can be not compressed. I have no idea how KT determines when to do this
# Seems to happen randomly when the size is small. Only way is to check
last_block_size = block_sizes[block_count - 1]
if last_block_size == total_size - block_size * (block_count - 1):
in_stream.seek(base_offset + current_offset)
out_stream.write(in_stream.read(last_block_size)) # not compressed
current_offset += align_0x80(last_block_size)
else:
current_offset = write_block()
return current_offset
def compress_kt_gz_file(in_path, out_path, level=-1):
with open(in_path, 'rb') as in_file, open(out_path, 'wb') as out_file:
compress_kt_gz(in_file, out_file, os.path.getsize(in_path), level)
def decompress_kt_gz_file(in_path, out_path):
with open(in_path, 'rb') as in_file, open(out_path, 'wb') as out_file:
decompress_kt_gz(in_file, out_file)
if __name__ == "__main__":
import argparse
parser = argparse.ArgumentParser(description="KT-style GZip Compressor/Decompressor")
parser.add_argument("mode", choices=["compress", "decompress"], help="Mode to run: compress or decompress")
parser.add_argument("input", help="Input file path")
parser.add_argument("-o", "--output", help="Output file path (optional)")
parser.add_argument("-l", "--level", type=int, default=9, help="Compression level (0-9, default: 9)")
args = parser.parse_args()
# Infer output path if not given
if not args.output:
if args.mode == "compress":
args.output = args.input + KT_GZ_EXTENSION
else: # decompress
if args.input.lower().endswith(KT_GZ_EXTENSION):
args.output = args.input[:-len(KT_GZ_EXTENSION)]
else:
args.output = args.input + ".decompressed"
if args.mode == "compress":
compress_kt_gz_file(args.input, args.output, level=args.level)
print(f"Compressed: {args.input} → {args.output}")
elif args.mode == "decompress":
decompress_kt_gz_file(args.input, args.output)
print(f"Decompressed: {args.input} → {args.output}")