-
Notifications
You must be signed in to change notification settings - Fork 4
Expand file tree
/
Copy pathscanner_raw.py
More file actions
237 lines (205 loc) · 7.17 KB
/
scanner_raw.py
File metadata and controls
237 lines (205 loc) · 7.17 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
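"""
scanner_raw.py - generic scanner for raw protobuf data.
Supports Rust (prost/rust-protobuf) and any other program that embeds serialized protobuf descriptors.
It can also be used as the last-resort fallback scanner.
"""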
import gzip
import io
from binary_reader import BinaryReader
from proto_generator import is_well_known
from google.protobuf.descriptor_pb2 import FileDescriptorProto, FileDescriptorSet
def _decode_varint(data: bytes, pos: int) -> tuple:
"""解码 protobuf varint, 返回 (value, consumed_bytes)"""
result = 0
shift = 0
consumed = 0
while pos + consumed < len(data):
b = data[pos + consumed]
consumed += 1
result |= (b & 0x7F) << shift
if (b & 0x80) == 0:
return result, consumed
shift += 7
if shift > 63:
return 0, 0
return 0, 0
def _walk_protobuf_message(data: bytes, max_size: int) -> int | None:
"""
遍历 protobuf wire format 来确定消息的精确结束位置
"""
pos = 0
limit = min(len(data), max_size)
prev_valid_pos = 0
while pos < limit:
tag, consumed = _decode_varint(data, pos)
if consumed == 0:
break
field_number = tag >> 3
wire_type = tag & 0x07
if field_number == 0 or field_number > 536870911:
break
pos += consumed
if wire_type == 0: # VARINT
_, consumed = _decode_varint(data, pos)
if consumed == 0:
break
pos += consumed
elif wire_type == 1: # 64-bit
pos += 8
elif wire_type == 2: # Length-delimited
length, consumed = _decode_varint(data, pos)
if consumed == 0 or length > limit - pos:
break
pos += consumed + length
elif wire_type == 5: # 32-bit
pos += 4
elif wire_type in (3, 4): # Group
break
else:
break
if pos > limit:
break
prev_valid_pos = pos
return prev_valid_pos if prev_valid_pos > 0 else None
def _try_parse_fds(data: bytes) -> list:
    """Try to parse ``data`` as a serialized FileDescriptorSet.

    Returns the list of FileDescriptorProto entries it contains, or an
    empty list when ``data`` is empty, cannot be parsed, or holds no files.
    """
    if not data:
        return []
    fds = FileDescriptorSet()
    try:
        # The return value of ParseFromString() is deliberately ignored: it
        # is not part of the documented contract, so success is judged from
        # the parsed content instead.
        fds.ParseFromString(data)
    except Exception:
        # Best-effort probe over arbitrary bytes: any failure means "no match".
        return []
    return list(fds.file)
def _try_parse_fd(data: bytes) -> FileDescriptorProto | None:
    """Try to parse ``data`` as a plausible FileDescriptorProto.

    Besides parsing successfully, the result has to look like a real
    descriptor: a file name ending in ``.proto``, at least one message,
    enum or service, and every top-level message carrying a name of at most
    256 characters. Returns the descriptor, or ``None`` if any check fails.
    """
    candidate = FileDescriptorProto()
    try:
        if candidate.ParseFromString(data) == 0:
            return None
    except Exception:
        # Arbitrary bytes are expected to fail here; treat it as "no match".
        return None

    file_name = candidate.name
    if not file_name or not file_name.endswith('.proto'):
        return None
    if not (candidate.message_type or candidate.enum_type or candidate.service):
        return None
    if any(not m.name or len(m.name) > 256 for m in candidate.message_type):
        return None
    return candidate
def _descriptor_at(data: bytes, pos: int, seen_names: set) -> FileDescriptorProto | None:
    """Try to decode a FileDescriptorProto that starts at ``data[pos]``.

    ``data[pos]`` must be ``0x0A``, the tag of FileDescriptorProto.name
    (field 1, length-delimited). Returns the descriptor, or ``None`` when
    the bytes do not form a new, non-well-known descriptor.
    """
    data_len = len(data)

    # Cheap rejection on the length byte: a zero length or a multi-byte
    # varint (> 127) is not accepted as a file name.
    name_len = data[pos + 1] if pos + 1 < data_len else 0
    if name_len == 0 or name_len > 127:
        return None
    name_end = pos + 2 + name_len
    if name_end > data_len:
        return None

    raw_name = data[pos + 2:name_end]
    if not raw_name.endswith(b'.proto'):
        return None
    try:
        name = raw_name.decode('utf-8')
    except UnicodeDecodeError:
        return None
    if is_well_known(name) or name in seen_names:
        return None
    # Only accept names made of path-like characters.
    if not all(c.isalnum() or c in '_/-.+' for c in name):
        return None

    # Look at no more than 64 KiB starting at the candidate.
    window = data[pos:pos + min(65536, data_len - pos)]

    # Approach A: walk the wire format to find the exact end of the message.
    end_pos = _walk_protobuf_message(window, len(window))
    if end_pos and end_pos > 10:
        fd = _try_parse_fd(window[:end_pos])
        if fd is not None and fd.name == name:
            return fd

    # Approach B: cut right after the first "proto3"/"proto2" syntax marker.
    for marker in (b'proto3', b'proto2'):
        idx = window.find(marker)
        if idx != -1:
            fd = _try_parse_fd(window[:idx + len(marker)])
            if fd is not None and fd.name == name:
                return fd
    return None


def _gzip_payload_offset(chunk: bytes) -> int:
    """Return the offset of the deflate stream inside a gzip member.

    ``chunk`` must start at the gzip magic and be at least ten bytes long.
    Optional header fields are skipped according to the flag byte. Raises
    ``IndexError`` when the FEXTRA length bytes lie outside ``chunk``.
    """
    flags = chunk[3]
    offset = 10  # size of the fixed gzip header
    if flags & 0x04:  # FEXTRA: 2-byte little-endian length + payload
        xlen = chunk[offset] | (chunk[offset + 1] << 8)
        offset += 2 + xlen
    if flags & 0x08:  # FNAME: NUL-terminated string
        nul = chunk.find(b'\x00', offset)
        if nul != -1:
            offset = nul + 1
    if flags & 0x10:  # FCOMMENT: NUL-terminated string
        nul = chunk.find(b'\x00', offset)
        if nul != -1:
            offset = nul + 1
    if flags & 0x02:  # FHCRC: 2-byte header checksum
        offset += 2
    return offset


def _scan_gzip_descriptors(data: bytes, results: list, seen_names: set) -> None:
    """Append descriptors found inside gzip members of ``data`` to ``results``.

    Every occurrence of the gzip magic is tried with growing input windows.
    The header is skipped by hand and the payload is inflated as a raw
    deflate stream, which avoids multi-member reading problems.
    """
    import zlib  # imported once here rather than on every decompression attempt

    data_len = len(data)
    gp = 0
    while gp < data_len - 10:
        gp = data.find(b'\x1f\x8b\x08', gp)
        if gp == -1:
            break
        for window in (256, 512, 1024, 4096, 16384, 65536):
            end = min(gp + window, data_len)
            chunk = data[gp:end]
            try:
                inflater = zlib.decompressobj(-zlib.MAX_WBITS)
                payload = inflater.decompress(chunk[_gzip_payload_offset(chunk):])
            except (zlib.error, IndexError):
                # Not a usable deflate stream at this window size.
                payload = None
            if payload is not None:
                fd = _try_parse_fd(payload)
                if (fd is not None and fd.name not in seen_names
                        and not is_well_known(fd.name)):
                    results.append(fd)
                    seen_names.add(fd.name)
                    break
            if end == data_len:
                # The window already reaches the end of the data; larger
                # windows would yield the very same chunk.
                break
        gp += 1


def scan_raw_descriptors(reader: BinaryReader) -> list:
    """Generic scan for serialized protobuf file descriptors in ``reader.data``.

    Two passes are made over the data:

    1. Every ``0x0A`` byte is treated as a possible FileDescriptorProto.name
       tag (field 1, length-delimited). If a short ``.proto`` file name
       follows, the bytes from that tag onwards are parsed as a
       FileDescriptorProto.
    2. Every gzip magic (``1f 8b 08``) is treated as a possible compressed
       FileDescriptorProto.

    Well-known files (per ``is_well_known``) and file names that were
    already collected are skipped.

    Returns: ``[FileDescriptorProto, ...]`` in discovery order.
    """
    results = []
    seen_names = set()
    data = reader.data
    data_len = len(data)

    pos = 0
    while pos < data_len - 10:
        # Jump to the next tag byte 0x0A (field 1, wire type 2).
        pos = data.find(b'\x0a', pos)
        if pos == -1:
            break
        fd = _descriptor_at(data, pos, seen_names)
        if fd is not None:
            results.append(fd)
            seen_names.add(fd.name)
        pos += 1

    _scan_gzip_descriptors(data, results, seen_names)
    return results