-
Notifications
You must be signed in to change notification settings - Fork 4
Expand file tree
/
Copy pathscanner_cpp.py
More file actions
290 lines (241 loc) · 8.82 KB
/
scanner_cpp.py
File metadata and controls
290 lines (241 loc) · 8.82 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
"""
scanner_cpp.py - C++ protobuf 扫描器
搜索嵌入在二进制中的序列化 FileDescriptorProto blob
适用于使用 Google protobuf C++ 库编译的程序
"""
from binary_reader import BinaryReader
from proto_generator import is_well_known
from google.protobuf.descriptor_pb2 import FileDescriptorProto
def _try_parse_file_descriptor(data: bytes) -> FileDescriptorProto | None:
    """Try to decode ``data`` as a serialized FileDescriptorProto.

    The bytes are handed to ``FileDescriptorProto.ParseFromString`` and the
    result is then sanity-checked so that random bytes which merely happen to
    decode are rejected.

    Args:
        data: Candidate serialized descriptor bytes.

    Returns:
        The parsed descriptor, or ``None`` when parsing raises, reports zero
        bytes consumed, or any plausibility check fails.
    """
    candidate = FileDescriptorProto()
    try:
        parsed_len = candidate.ParseFromString(data)
    except Exception:
        return None
    if parsed_len == 0:
        return None

    # A descriptor needs an identity (name or package) and some content
    # (at least one message, enum or service).
    has_identity = bool(candidate.name or candidate.package)
    has_content = bool(
        candidate.message_type or candidate.enum_type or candidate.service
    )
    if not (has_identity and has_content):
        return None

    # When a name is present it has to look like a .proto file name.
    if candidate.name and not candidate.name.endswith('.proto'):
        return None

    # Top-level messages and enums must carry a non-empty name of sane length.
    top_level = (*candidate.message_type, *candidate.enum_type)
    if any(not item.name or len(item.name) > 256 for item in top_level):
        return None

    return candidate
def _try_parse_fd_autosize(data: bytes, max_size: int = 65536) -> FileDescriptorProto | None:
    """Parse a FileDescriptorProto whose length is not known in advance.

    ``data`` starts at a suspected descriptor and may be followed by unrelated
    bytes. Three strategies are tried in order until one yields a descriptor
    accepted by ``_try_parse_file_descriptor``:

    1. Walk the wire format to find where the message ends.
    2. Cut right after each ``proto3`` / ``proto2`` marker (the value of the
       ``syntax`` field, which the original author expects near the end of
       the blob).
    3. Try a fixed ladder of power-of-two prefix lengths.

    Args:
        data: Bytes beginning at the suspected descriptor start.
        max_size: Upper bound on how many bytes of ``data`` may be considered.

    Returns:
        The first descriptor that parses and validates, otherwise ``None``.
    """
    # Strategy 1: use the wire-format walker to find the message boundary.
    end_pos = _walk_protobuf_message(data, max_size)
    if end_pos and end_pos > 10:
        fd = _try_parse_file_descriptor(data[:end_pos])
        if fd:
            return fd

    # Strategy 2: cut after each occurrence of the syntax marker.
    # FileDescriptorProto.syntax (field 12) is encoded as 62 XX "protoN".
    for marker in (b'proto3', b'proto2'):
        idx = data.find(marker)
        while idx != -1:
            end = idx + len(marker)
            # Never hand the parser more than max_size bytes.
            if end > max_size:
                break
            fd = _try_parse_file_descriptor(data[:end])
            if fd:
                return fd
            idx = data.find(marker, idx + 1)

    # Strategy 3: coarse fallback over power-of-two prefix lengths.
    for size in (64, 128, 256, 512, 1024, 2048, 4096, 8192, 16384, 32768, 65536):
        if size > max_size:
            break
        fd = _try_parse_file_descriptor(data[:size])
        if fd:
            return fd

    return None
def _walk_protobuf_message(data: bytes, max_size: int) -> int | None:
"""
遍历 protobuf wire format 来确定消息的精确结束位置
返回消息结束的字节偏移,如果解析失败返回 None
"""
pos = 0
limit = min(len(data), max_size)
prev_valid_pos = 0
while pos < limit:
# 读取 field tag
tag, consumed = _decode_varint(data, pos)
if consumed == 0:
break
field_number = tag >> 3
wire_type = tag & 0x07
# 验证 field number 合理性
if field_number == 0 or field_number > 536870911: # max field number
break
pos += consumed
if wire_type == 0: # VARINT
_, consumed = _decode_varint(data, pos)
if consumed == 0:
break
pos += consumed
elif wire_type == 1: # 64-bit
pos += 8
elif wire_type == 2: # Length-delimited
length, consumed = _decode_varint(data, pos)
if consumed == 0 or length > limit - pos:
break
pos += consumed + length
elif wire_type == 5: # 32-bit
pos += 4
elif wire_type == 3: # Start group (deprecated)
# 跳过 group 内容直到 end group
break
elif wire_type == 4: # End group
break
else:
break # 未知 wire type
if pos > limit:
break
prev_valid_pos = pos
return prev_valid_pos if prev_valid_pos > 0 else None
def _find_proto_filename_refs(reader: BinaryReader) -> list:
    """Collect strings in the binary that look like ``.proto`` file names.

    A match is kept only when it is not a well-known file (per
    ``is_well_known``) and the byte immediately before it is either NUL or
    equal to the string's length, which suggests the match sits at the real
    start of a string. A match at offset 0 has no preceding byte and is
    skipped.

    Args:
        reader: Binary wrapper; only its ``data`` attribute is read.

    Returns:
        A list of ``(offset, filename)`` tuples in order of appearance.
    """
    import re

    name_pattern = re.compile(rb'[a-zA-Z0-9_/\-\.]{2,128}\.proto')
    hits = []
    for match in name_pattern.finditer(reader.data):
        candidate = match.group().decode('ascii', errors='replace')
        if is_well_known(candidate):
            continue
        begin = match.start()
        if begin <= 0:
            continue
        # Preceding byte: NUL terminator of a previous string, or a
        # protobuf length prefix matching this string.
        preceding = reader.data[begin - 1]
        if preceding in (0, len(candidate)):
            hits.append((begin, candidate))
    return hits
def _scan_around_proto_string(reader: BinaryReader, str_offset: int, filename: str) -> FileDescriptorProto | None:
    """Locate the serialized FileDescriptorProto that declares ``filename``.

    The descriptor's ``name`` field (field 1, length-delimited) is encoded as
    tag ``0x0A``, a varint length, then the file name bytes. Every occurrence
    of that byte sequence in the binary is tried as a descriptor start.

    Args:
        reader: Binary wrapper providing ``search``, ``data`` and ``size``.
        str_offset: Offset where the file name string was seen; not used by
            the search, which covers the whole binary.
        filename: The ``.proto`` file name to look for.

    Returns:
        The first descriptor that parses and whose ``name`` equals
        ``filename``, otherwise ``None``.
    """
    raw_name = filename.encode('utf-8')
    needle = b''.join((b'\x0a', _encode_varint(len(raw_name)), raw_name))

    for hit in reader.search(needle):
        # Look at no more than 64 KiB, clipped to the end of the binary.
        window = min(65536, reader.size - hit)
        descriptor = _try_parse_fd_autosize(reader.data[hit:hit + window], window)
        if descriptor and descriptor.name == filename:
            return descriptor
    return None
def _encode_varint(value: int) -> bytes:
"""编码 protobuf varint"""
result = []
while value > 0x7f:
result.append((value & 0x7f) | 0x80)
value >>= 7
result.append(value & 0x7f)
return bytes(result)
def _decode_varint(data: bytes, offset: int) -> tuple:
"""解码 protobuf varint, 返回 (value, bytes_consumed)"""
result = 0
shift = 0
for i in range(10):
if offset + i >= len(data):
return (0, 0)
b = data[offset + i]
result |= (b & 0x7f) << shift
if (b & 0x80) == 0:
return (result, i + 1)
shift += 7
return (0, 0)
def _brute_scan_file_descriptors(reader: BinaryReader, known_filenames: set | None = None) -> list:
    """Brute-force scan the binary for serialized FileDescriptorProto blobs.

    The ``name`` field of a FileDescriptorProto (field 1, wire type 2) is
    always encoded starting with byte ``0x0A``, so every such byte is treated
    as a possible descriptor start. A position is only handed to the parser
    when the length-prefixed string after it is valid UTF-8, ends in
    ``.proto``, is not a well-known file and has not been handled already.

    Args:
        reader: Binary wrapper; its ``data`` and ``size`` attributes are read.
        known_filenames: File names the caller has already recovered.
            Candidates with these names are skipped without being parsed.
            The set itself is not modified.

    Returns:
        A list of FileDescriptorProto objects, one per newly found file name.
    """
    results = []
    # Work on a copy so the caller's set is never mutated.
    seen_names = set(known_filenames) if known_filenames else set()
    data = reader.data
    size = reader.size

    pos = 0
    while pos < size - 10:
        tag_pos = data.find(b'\x0a', pos)
        if tag_pos == -1:
            break
        # Whatever happens with this candidate, resume one byte further on.
        pos = tag_pos + 1

        # Length prefix of the would-be name string.
        length, varint_len = _decode_varint(data, tag_pos + 1)
        if varint_len == 0 or length == 0 or length > 256:
            continue

        str_start = tag_pos + 1 + varint_len
        str_end = str_start + length
        if str_end > size:
            continue

        try:
            name = data[str_start:str_end].decode('utf-8')
        except UnicodeDecodeError:
            continue

        if not name.endswith('.proto'):
            continue
        if is_well_known(name) or name in seen_names:
            continue

        # Try to parse a descriptor starting at the tag byte, looking at no
        # more than 64 KiB.
        max_avail = min(65536, size - tag_pos)
        fd = _try_parse_fd_autosize(data[tag_pos:tag_pos + max_avail], max_avail)
        if fd and fd.name == name:
            results.append(fd)
            seen_names.add(name)

    return results
def scan_cpp_protobuf(reader: BinaryReader) -> list:
    """Scan a binary for FileDescriptorProto blobs embedded by C++ protobuf.

    Two passes are combined: first a targeted search driven by ``.proto``
    file name strings found in the binary, then a brute-force scan. Each file
    name contributes at most one descriptor to the result.

    Args:
        reader: Binary wrapper passed through to the helper scanners.

    Returns:
        A list of FileDescriptorProto objects, targeted-pass results first.
    """
    found = []
    names = set()

    # Pass 1: follow .proto file name references.
    for ref_offset, ref_name in _find_proto_filename_refs(reader):
        if ref_name in names:
            continue
        descriptor = _scan_around_proto_string(reader, ref_offset, ref_name)
        if not descriptor:
            continue
        found.append(descriptor)
        names.add(ref_name)

    # Pass 2: brute-force scan, keeping only names not seen so far.
    for descriptor in _brute_scan_file_descriptors(reader, names):
        if descriptor.name in names:
            continue
        found.append(descriptor)
        names.add(descriptor.name)

    return found