-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmain.py
More file actions
203 lines (178 loc) · 7 KB
/
main.py
File metadata and controls
203 lines (178 loc) · 7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
import wmi
import hashlib
import requests
import os
import json
import time
import sys
# Path of the JSON configuration file. load_config() reads it, and its
# "API_KEY" entry supplies the key used for the VirusTotal queries.
CONFIG_FILE = "config.json"
def load_config():
    """Loads configuration from config.json.

    Returns:
        dict: The JSON object stored in ``CONFIG_FILE``.

    The script is terminated with exit status 1 (after printing a message)
    when the file is missing, cannot be read or parsed, or does not hold a
    JSON object at the top level.
    """
    if not os.path.exists(CONFIG_FILE):
        print(f"Configuration file {CONFIG_FILE} not found!")
        sys.exit(1)

    try:
        with open(CONFIG_FILE, "r", encoding="utf-8") as f:
            config = json.load(f)
    except Exception as e:
        print(f"Error loading configuration: {e}")
        sys.exit(1)

    if not isinstance(config, dict):
        # The module reads settings with ``config.get(...)``; a top-level
        # list, string or number would otherwise fail later with an
        # unhelpful AttributeError.
        print(f"Error loading configuration: {CONFIG_FILE} must contain a JSON object")
        sys.exit(1)
    return config
# The settings are read once, at import time. Without an API key no query
# can be made, so the script stops immediately.
config = load_config()
API_KEY = config.get("API_KEY")
if not API_KEY:
    print("No API_KEY found in the configuration!")
    sys.exit(1)

# Scan tunables
THRESHOLD = 1  # Minimum number of malicious reports for a file to count as a threat
CACHE_FILE = "vt_cache.json"  # JSON file in which query results are kept between runs
SLEEP_INTERVAL = 15  # Pause (in seconds) between API queries
def load_cache():
    """Loads the cache from the JSON file, if it exists.

    Returns:
        dict: The stored cache, or an empty dict when ``CACHE_FILE`` is
        missing, cannot be read or parsed, or does not hold a JSON object.
    """
    if not os.path.exists(CACHE_FILE):
        return {}

    try:
        with open(CACHE_FILE, "r", encoding="utf-8") as f:
            cache = json.load(f)
    except Exception as e:
        print(f"Error loading cache: {e}")
        return {}

    if not isinstance(cache, dict):
        # The scanner stores entries with ``cache[file_hash] = ...``; any
        # other JSON type (list, string, number) cannot be used as a cache.
        print(f"Error loading cache: {CACHE_FILE} does not contain a JSON object")
        return {}
    return cache
def save_cache(cache):
    """Saves the cache to the JSON file.

    The data is first written to a temporary sibling file and then moved
    over ``CACHE_FILE``, so an interrupted write cannot leave a truncated
    cache behind. Failures are reported on stdout and otherwise ignored.

    Args:
        cache: JSON-serializable mapping to persist.
    """
    tmp_path = CACHE_FILE + ".tmp"
    try:
        with open(tmp_path, "w", encoding="utf-8") as f:
            json.dump(cache, f, indent=4)
        # Swap the finished file into place in a single step.
        os.replace(tmp_path, CACHE_FILE)
    except Exception as e:
        print(f"Error saving cache: {e}")
def get_processes_with_exe():
    """
    Collects the running processes reported by WMI (Win32_Process).

    Returns a list of dictionaries with the keys "pid", "name" and "exe".
    Processes that report no executable path, or whose path does not exist
    on disk, are left out.
    """
    connection = wmi.WMI()
    return [
        {
            "pid": entry.ProcessId,
            "name": entry.Name,
            "exe": entry.ExecutablePath
        }
        for entry in connection.Win32_Process()
        if entry.ExecutablePath and os.path.exists(entry.ExecutablePath)
    ]
def calculate_file_hash(file_path, algorithm='sha256'):
    """
    Returns the hex digest of the file at file_path.

    The file is read in 4096-byte pieces, so it is never held in memory as
    a whole. algorithm is any name accepted by hashlib.new(). If reading or
    digesting fails, the error is printed and None is returned.
    """
    digest = hashlib.new(algorithm)
    try:
        with open(file_path, 'rb') as stream:
            while True:
                block = stream.read(4096)
                if block == b'':
                    break
                digest.update(block)
        # Kept inside the try so a failing hexdigest() is reported like a
        # read error instead of propagating.
        return digest.hexdigest()
    except Exception as e:
        print(f"Error reading file {file_path}: {e}")
        return None
def check_file_with_virustotal(file_hash, api_key, timeout=30):
    """
    Queries the VirusTotal API (v3, /files/<hash>) using the file hash.

    Args:
        file_hash: Hash string appended to the API URL.
        api_key: Key sent in the "x-apikey" request header.
        timeout: Seconds to wait for the server before giving up; passed
            to requests.get so a stalled connection cannot block the scan
            forever.

    Returns:
        (malicious, total): the "malicious" count from
        "last_analysis_stats" and the sum of all counts in it, or
        (None, None) when the request fails, the status code is neither
        200 nor 401, or the response body does not have the expected shape.

    If a 401 error occurs, the script terminates via sys.exit(1).
    """
    url = "https://www.virustotal.com/api/v3/files/" + file_hash
    headers = {"x-apikey": api_key}
    try:
        response = requests.get(url, headers=headers, timeout=timeout)
    except Exception as e:
        print(f"Error querying VirusTotal: {e}")
        return None, None

    if response.status_code == 401:
        print("VirusTotal API Error: Status Code 401. Please check your API key!")
        sys.exit(1)
    if response.status_code != 200:
        print(f"VirusTotal API Error: Status Code {response.status_code}")
        return None, None

    try:
        stats = response.json()["data"]["attributes"]["last_analysis_stats"]
        malicious = stats.get("malicious", 0)
        total = sum(stats.values())
    except (ValueError, KeyError, TypeError, AttributeError) as e:
        # A body that is not JSON, or lacks the expected structure, is
        # treated like any other failed lookup rather than aborting the scan.
        print(f"Error parsing VirusTotal response: {e}")
        return None, None
    return malicious, total
def format_seconds(seconds):
    """Formats seconds as hh:mm:ss.

    The hour field is not wrapped at 24, so 90000 seconds is rendered as
    "25:00:00". Fractional seconds are truncated and negative values are
    shown as "00:00:00".
    """
    whole = max(0, int(seconds))
    hours, remainder = divmod(whole, 3600)
    minutes, secs = divmod(remainder, 60)
    return f"{hours:02d}:{minutes:02d}:{secs:02d}"
def scan_processes_for_threats(api_key, cache, threshold=THRESHOLD):
    """
    Checks the executables of all running processes against VirusTotal.

    Every distinct executable path is hashed once. Each distinct hash that
    is not yet in the cache is looked up through the VirusTotal API exactly
    once per run, with a SLEEP_INTERVAL pause between consecutive queries.
    Successful results are stored in the cache and saved immediately;
    processes whose lookup failed are skipped.

    The single-line progress display shows:
    - Number of API calls completed (out of the total required)
    - Estimated remaining time (based on remaining API calls)
    - Number of processes classified as threats versus clean
    Finally, a summary and a detailed list of the processes flagged as
    malicious are printed.

    Args:
        api_key: Key passed on to check_file_with_virustotal.
        cache: Dict mapping file hash to {"malicious": ..., "total": ...};
            updated in place.
        threshold: Minimum number of malicious reports for a process to be
            counted as a threat.
    """
    processes = get_processes_with_exe()

    # Several processes often run from the same executable, so remember the
    # hash per path instead of re-reading the file for each process.
    hash_by_exe = {}
    process_list = []
    for proc in processes:
        exe_path = proc["exe"]
        if exe_path not in hash_by_exe:
            hash_by_exe[exe_path] = calculate_file_hash(exe_path)
        file_hash = hash_by_exe[exe_path]
        if file_hash:
            proc["file_hash"] = file_hash
            process_list.append(proc)

    total_processes = len(process_list)
    if total_processes == 0:
        print("No processes with executables found.")
        return

    # One query is needed per distinct uncached hash, not per process.
    pending_hashes = {
        proc["file_hash"] for proc in process_list if proc["file_hash"] not in cache
    }
    total_api_calls = len(pending_hashes)
    api_calls_done = 0
    print(f"{total_api_calls} API queries will be performed (one every {SLEEP_INTERVAL} seconds).")

    threat_count = 0
    clean_count = 0
    malicious_processes = []
    failed_hashes = set()  # Hashes whose lookup already failed in this run

    for proc in process_list:
        file_hash = proc["file_hash"]
        if file_hash in cache:
            # Use cached data if available
            mal_votes = cache[file_hash].get("malicious", 0)
        elif file_hash in failed_hashes:
            # Do not spend another query (and pause) on a hash that failed.
            mal_votes = None
        else:
            mal_votes, tot_votes = check_file_with_virustotal(file_hash, api_key)
            api_calls_done += 1
            if mal_votes is None:
                failed_hashes.add(file_hash)
            else:
                cache[file_hash] = {"malicious": mal_votes, "total": tot_votes}
                # Immediately save the cache so progress is retained
                save_cache(cache)
            # The pause only spaces out queries; none follows the last one.
            if api_calls_done < total_api_calls:
                time.sleep(SLEEP_INTERVAL)

        if mal_votes is None:
            # Skip this process if a valid response wasn't received
            continue

        if mal_votes >= threshold:
            threat_count += 1
            malicious_processes.append(proc)
        else:
            clean_count += 1

        # Calculate estimated remaining time based on remaining API calls
        api_remaining = total_api_calls - api_calls_done
        remaining_str = format_seconds(api_remaining * SLEEP_INTERVAL)

        # Update progress display (single-line update)
        progress_line = (f"[{api_calls_done}/{total_api_calls}] Estimated time: {remaining_str} | "
                         f"Threats found: {threat_count} | Clean: {clean_count}")
        sys.stdout.write("\r" + progress_line)
        sys.stdout.flush()

    print("\n\nScan complete.")
    print(f"Total processes scanned: {total_processes}")
    print(f"Threats found: {threat_count}")
    print(f"Clean processes: {clean_count}")

    if threat_count > 0:
        print("\nList of potentially malicious processes:")
        for proc in malicious_processes:
            print(f"PID {proc['pid']}: {proc['name']} - {proc['exe']}")
def main():
    """Runs one scan: loads the cache, scans all processes, stores the cache."""
    print("Starting process scanner PyPScan...\n")
    results_cache = load_cache()
    scan_processes_for_threats(API_KEY, results_cache)
    save_cache(results_cache)
    print("\nCache updated.")


# Run the scan only when the file is executed as a script.
if __name__ == "__main__":
    main()