-
Notifications
You must be signed in to change notification settings - Fork 4
Expand file tree
/
Copy pathscanner_go.py
More file actions
154 lines (130 loc) · 4.78 KB
/
scanner_go.py
File metadata and controls
154 lines (130 loc) · 4.78 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
"""
scanner_go.py - Go protobuf 扫描器
Go 编译的程序中, protobuf 描述符以 gzip 压缩的 FileDescriptorProto 形式嵌入
"""
import gzip
import io
from binary_reader import BinaryReader
from proto_generator import is_well_known
from google.protobuf.descriptor_pb2 import FileDescriptorProto
# First two bytes (ID1, ID2) of a gzip member header; used to locate
# candidate compressed descriptors inside the scanned binary.
GZIP_MAGIC = b'\x1f\x8b'
def _try_decompress_and_parse(data: bytes) -> FileDescriptorProto | None:
    """Inflate the gzip member at the start of *data* and parse it as a descriptor.

    The gzip header is walked by hand and the payload is inflated as a raw
    deflate stream, so bytes that follow the compressed stream (the rest of
    the binary, or another gzip member) are ignored instead of causing an
    error.

    Args:
        data: Bytes beginning at a candidate gzip magic number.

    Returns:
        The parsed ``FileDescriptorProto`` when *data* contains a complete
        deflate stream whose payload passes the plausibility checks below;
        ``None`` for a bad header, a truncated or corrupt stream, a parse
        failure, or implausible content.
    """
    import zlib

    # Fixed gzip header: 1f 8b CM FLG MTIME(4) XFL OS -> 10 bytes.
    if len(data) < 10 or data[:2] != b'\x1f\x8b':
        return None

    flags = data[3]
    pos = 10
    if flags & 0x04:  # FEXTRA: 2-byte little-endian length, then that many bytes
        if pos + 2 > len(data):
            return None
        xlen = data[pos] | (data[pos + 1] << 8)
        pos += 2 + xlen
    if flags & 0x08:  # FNAME: NUL-terminated string
        end = data.find(b'\x00', pos)
        if end == -1:
            return None
        pos = end + 1
    if flags & 0x10:  # FCOMMENT: NUL-terminated string
        end = data.find(b'\x00', pos)
        if end == -1:
            return None
        pos = end + 1
    if flags & 0x02:  # FHCRC: 2-byte header CRC
        pos += 2
    if pos >= len(data):
        # The optional header fields ran past the buffer: nothing to inflate.
        return None

    # Negative wbits selects a raw deflate stream (no zlib/gzip framing).
    inflater = zlib.decompressobj(-zlib.MAX_WBITS)
    try:
        decompressed = inflater.decompress(data[pos:])
    except zlib.error:
        return None
    # A truncated input inflates without error to a *partial* payload, which
    # could still parse as an incomplete descriptor. Only accept the payload
    # once the end-of-stream marker has actually been reached.
    if not inflater.eof or not decompressed:
        return None

    fd = FileDescriptorProto()
    try:
        consumed = fd.ParseFromString(decompressed)
    except Exception:
        # Best-effort scan: any decode failure just means "not a descriptor".
        return None
    if consumed == 0:
        return None

    # Plausibility checks to weed out random data that happens to decode.
    if not fd.name or not fd.name.endswith('.proto'):
        return None
    if not (fd.message_type or fd.enum_type or fd.service):
        return None
    if any(not msg.name or len(msg.name) > 256 for msg in fd.message_type):
        return None
    return fd
def _find_gzip_streams(reader: BinaryReader) -> list:
    """Return the offsets in *reader* that look like gzip member headers.

    A candidate is every hit of ``GZIP_MAGIC`` reported by ``reader.search``
    that has at least one byte beyond the 10-byte fixed header and whose
    third header byte (the compression method) is ``0x08``, i.e. deflate.
    """
    candidates = reader.search(GZIP_MAGIC)
    return [
        offset
        for offset in candidates
        if offset + 10 < reader.size and reader.data[offset + 2] == 0x08
    ]
def scan_go_protobuf(reader: BinaryReader, *, max_stream_len: int = 131072) -> list:
    """Scan a binary for gzip-compressed ``FileDescriptorProto`` blobs.

    Each gzip candidate reported by ``_find_gzip_streams`` is passed to
    ``_try_decompress_and_parse``; descriptors that parse are collected once
    per file name, skipping names for which ``is_well_known`` is true.

    Args:
        reader: Binary being scanned; its ``data`` and ``size`` attributes
            are read here.
        max_stream_len: Maximum number of bytes, counted from the gzip magic,
            that are handed to the parser for one candidate.

    Returns:
        The accepted ``FileDescriptorProto`` objects in discovery order.
    """
    results = []
    seen_names = set()

    for pos in _find_gzip_streams(reader):
        # The parser inflates with a zlib decompress object, which stops at
        # the end of the deflate stream and ignores whatever follows. One
        # attempt over the widest window therefore covers every shorter one;
        # probing short windows first only re-inflates the same bytes and
        # risks accepting a stream that was cut off part-way.
        end = min(pos + max_stream_len, reader.size)
        fd = _try_decompress_and_parse(reader.data[pos:end])
        if fd is None:
            continue
        if fd.name in seen_names or is_well_known(fd.name):
            continue
        results.append(fd)
        seen_names.add(fd.name)

    return results
def scan_go_protobuf_raw(reader: BinaryReader) -> list:
    """Fallback scan for uncompressed ``FileDescriptorProto`` blobs.

    Looks for the serialized ``name`` field of a descriptor (tag byte
    ``0x0a``, a one-byte length, then a path ending in ``.proto``) and tries
    to parse progressively larger windows starting at that tag. A window is
    accepted when it parses, the parsed name equals the name read from the
    length-prefixed field, and the descriptor declares at least one message,
    enum or service.

    Returns:
        The accepted ``FileDescriptorProto`` objects, one per file name, in
        discovery order; names for which ``is_well_known`` is true are
        skipped.
    """
    import re

    name_field = re.compile(rb'\x0a[\x01-\x7f][a-zA-Z0-9_/\-\.]{2,100}\.proto')
    # Window sizes tried for each hit: 256, 512, ..., 32768 bytes.
    windows = tuple(256 << shift for shift in range(8))

    found = []
    accepted_names = set()

    for hit in name_field.finditer(reader.data):
        offset = hit.start()
        # Trust the length byte rather than the regex match to delimit the name.
        declared_len = reader.data[offset + 1]
        name_stop = offset + 2 + declared_len
        if name_stop > reader.size:
            continue
        try:
            name = reader.data[offset + 2:name_stop].decode('utf-8')
        except UnicodeDecodeError:
            continue

        if not name.endswith('.proto'):
            continue
        if is_well_known(name) or name in accepted_names:
            continue

        for window in windows:
            blob = reader.data[offset:min(offset + window, reader.size)]
            candidate = FileDescriptorProto()
            try:
                parsed_len = candidate.ParseFromString(blob)
                has_content = bool(
                    candidate.message_type
                    or candidate.enum_type
                    or candidate.service
                )
                accepted = parsed_len > 0 and candidate.name == name and has_content
            except Exception:
                # Best-effort: a failed parse just means this window is wrong.
                continue
            if accepted:
                found.append(candidate)
                accepted_names.add(name)
                break

    return found