diff --git a/CyberScan.py b/CyberScan.py index 9bb7609..b6be782 100644 --- a/CyberScan.py +++ b/CyberScan.py @@ -1,5 +1,5 @@ -#!/usr/bin/python -# -*- coding utf-8 -*- +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation; either version 3 of the License. @@ -15,526 +15,441 @@ # MA 02110-1301, USA. # # Author: Mohamed BEN ALI +# Converted to Python 3 by ChatGPT +from concurrent.futures import ThreadPoolExecutor import os import sys import platform import argparse import logging logging.getLogger("scapy.runtime").setLevel(logging.ERROR) +from threading import Lock + import time import socket -import pygeoip +import geoip2.database -from scapy import * from scapy.all import * -from libs.colorama import * -from libs import FileUtils - - +# colorama should be installed via pip: pip install colorama +from colorama import * -if platform.system() == 'Windows': - from libs.colorama.win32 import * +init(autoreset=True) # Initialize colorama __version__ = '1.1.1' -__description__ = '''\ +__description__ = f'''\ ___________________________________________ - CyberScan | v.''' + __version__ + ''' + CyberScan | v.{__version__} Author: BEN ALI Mohamed ___________________________________________ ''' + def header(): - MAYOR_VERSION = 1 - MINOR_VERSION = 1 - REVISION = 1 - VERSION = { - "MAYOR_VERSION": MAYOR_VERSION, - "MINOR_VERSION": MINOR_VERSION, - "REVISION": REVISION - } + """Prints the application banner.""" + # Determine the path to banner.txt relative to this script + script_dir = os.path.dirname(os.path.abspath(__file__)) + banner_path = os.path.join(script_dir, 'banner.txt') + + try: + with open(banner_path, 'r', encoding='utf-8') as f: + # Split version for banner formatting + mayor, minor, revision = __version__.split('.') + version_dict = {"MAYOR_VERSION": mayor, "MINOR_VERSION": minor, "REVISION": revision} + banner = f.read().format(**version_dict) + print(Style.BRIGHT + Fore.RED + banner) + except FileNotFoundError: + print(Style.BRIGHT + Fore.RED + f"CyberScan | v.{__version__}") + except Exception as e: + print(f"Error loading banner: {e}") - PROGRAM_BANNER = open(FileUtils.buildPath('banner.txt')).read().format(**VERSION) - message = Style.BRIGHT + Fore.RED + PROGRAM_BANNER + Style.RESET_ALL - write(message) - -def usage(): - print (''' \033[92m CyberScan v.1.1.1 http://github/medbenali/CyberScan - It is the end user's responsibility to obey all applicable laws. - It is just for server testing script. Your ip is visible. \n - ___________________________________________ - - CyberScan | v.1.1.1 - Author: BEN ALI Mohamed - ___________________________________________ - - - \n \033[0m''') - -def write(string): - if platform.system() == 'Windows': - sys.stdout.write(string) - sys.stdout.flush() - sys.stdout.write('\n') - sys.stdout.flush() - else: - sys.stdout.write(string + '\n') - sys.stdout.flush() - sys.stdout.flush() def geo_ip(host): - try: + # Note: This requires the GeoLite2-City.mmdb file. + # Download from: https://dev.maxmind.com/geoip/geolite2-free-geolocation-data + with geoip2.database.Reader('GeoLite2-City.mmdb') as reader: + response = reader.city(host) + + print(f'[*] --- GeoIP Data for {host} ---') + print(f"[*] IP Address: {host}") + print(f"[*] Country: {response.country.name} ({response.country.iso_code})") + if response.subdivisions: + print(f"[*] Region: {response.subdivisions.most_specific.name}") + print(f"[*] City: {response.city.name}") + print(f"[*] Postal Code: {response.postal.code}") + print(f"[*] Coordinates: (Lat: {response.location.latitude}, Lon: {response.location.longitude})") + print(f"[*] Time Zone: {response.location.time_zone}") + print(f"[*] Continent: {response.continent.name} ({response.continent.code})") + + except FileNotFoundError: + print("[!] Error: 'GeoLite2-City.mmdb' not found.") + print("[!] Please download it from MaxMind and place it in the same directory.") + except geoip2.errors.AddressNotFoundError: + print(f"[*] Could not find GeoIP data for the address: {host}") + except Exception as e: + print(f"[*] Could not get GeoIP data for {host}. Error: {e}") + - rawdata = pygeoip.GeoIP('GeoLiteCity.dat') - data = rawdata.record_by_name(host) - country = data['country_name'] - city = data['city'] - longi = data['longitude'] - lat = data['latitude'] - time_zone = data['time_zone'] - area_code = data['area_code'] - country_code = data['country_code'] - region_code = data['region_code'] - dma_code = data['dma_code'] - metro_code = data['metro_code'] - country_code3 = data['country_code3'] - zip_code = data['postal_code'] - continent = data['continent'] - - print '[*] IP Address: ',host - print '[*] City: ',city - print '[*] Region Code: ',region_code - print '[*] Area Code: ',area_code - print '[*] Time Zone: ',time_zone - print '[*] Dma Code: ',dma_code - print '[*] Metro Code: ',metro_code - print '[*] Latitude: ',lat - print '[*] Longitude: ',longi - print '[*] Zip Code: ',zip_code - print '[*] Country Name: ',country - print '[*] Country Code: ',country_code - print '[*] Country Code3: ',country_code3 - print '[*] Continent: ',continent - - except : - print "[*] Please verify your ip !" - - - def arp_ping(host): - print '[*] Starting CyberScan Ping ARP for %s' %(host) - ans, unans = srp(Ether(dst="ff:ff:ff:ff:ff:ff")/ARP(pdst=host), timeout=2) - ans.summary(lambda (s,r): r.sprintf("%Ether.src% %ARP.psrc%")) + print('[*] Starting CyberScan Ping ARP for {}'.format(host)) + ans, unans = srp(Ether(dst="ff:ff:ff:ff:ff:ff") / ARP(pdst=host), timeout=2, verbose=0) + # scapy will call the lambda with two args (sent, received) + ans.summary(lambda s, r: r.sprintf("%Ether.src% %ARP.psrc%")) + def icmp_ping(host): - print '[*] Starting CyberScan Ping ICMP for %s' %(host) - ans, unans =srp(IP(dst=host)/ICMP()) - ans.summary(lambda (s,r): r.sprint("%IP.src% is alive")) - -def tcp_ping(host,dport): - ans, unans = sr(IP(dst=host)/TCP(dport,flags="S")) - ans.summary(lambda (s,r): r.sprintf("%IP.src% is alive")) - -def udp_ping(host,port=0): - print '[*] Starting CyberScan Ping UDP for %s' %(host) - ans, unans = sr(IP(dst=host)/UDP(dport=port)) - ans.summary(lambda(s, r): r.sprintf("%IP.src% is alive")) - -def superscan(host,start_port,end_port): - print '[*] CyberScan Port Scanner' - open_ports = [] - common_ports = { - '21': 'FTP', - '22': 'SSH', - '23': 'TELNET', - '25': 'SMTP', - '53': 'DNS', - '69': 'TFTP', - '80': 'HTTP', - '109': 'POP2', - '110': 'POP3', - '123': 'NTP', - '137': 'NETBIOS-NS', - '138': 'NETBIOS-DGM', - '139': 'NETBIOS-SSN', - '143': 'IMAP', - '156': 'SQL-SERVER', - '389': 'LDAP', - '443': 'HTTPS', - '546': 'DHCP-CLIENT', - '547': 'DHCP-SERVER', - '993': 'IMAP-SSL', - '995': 'POP3-SSL', - '2082': 'CPANEL', - '2083': 'CPANEL', - '2086': 'WHM/CPANEL', - '2087': 'WHM/CPANEL', - '3306' :'MYSQL', - '8443': 'PLESK', - '10000': 'VIRTUALMIN/WEBIN' - - - } - - starting_time=time.time() - if(flag): - print "[*] Scanning For Most Common Ports On %s" % (host) - else: - print "[*] Scanning %s From Port %s To %s: " % (host,start_port,end_port) - print "[*] Starting CyberScan 1.01 at %s" %(time.strftime("%Y-%m-%d %H:%M %Z")) - def check_port(host,port,result= 1): - try: - sock = socket.socket(socket.AF_INET,socket.SOCK_STREAM) - sock.settimeout(0.5) - r = sock.connect_ex((host,port)) - if r ==0: - result = r - sock.close() - except Exception, e: - pass - return result - - def get_service(port): - port = str(port) - if port in common_ports: - return common_ports[port] - else: - return 0 - try: - print "[*] Scan In Progress ..." - print "[*] Connecting To Port : ", - - if flag: - for p in sorted(common_ports): - sys.stdout.flush() - p = int(p) - print p, - response = check_port(host,p) - - if response ==0: - open_ports.append(p) - - sys.stdout.write('\b' * len(str(p))) - - - else: - for p in range(start_port,end_port+1): - sys.stdout.flush() - print p, - response = check_port(host,p) - - if response ==0: - open_ports.append(p) - if not p == end_port: - sys.stdout.write('\b' * len(str(p))) - - print "\n[*] Scanning Completed at %s" %(time.strftime("%Y-%m-%d %H:%M %Z")) - ending_time = time.time() - total_time = ending_time - starting_time - if total_time <=60: - print "[*] CyberScan done: 1IP address (1host up) scanned in %.2f seconds" %(total_time) - - else: - total_time = total_time / 60 - print "[*] CyberScan done: 1IP address (1host up) scanned in %.2f Minutes" %(total_time) - - - if open_ports: - print "[*] Open Ports: " - for i in sorted(open_ports): - service = get_service(i) - if not service: - service= "Unknown service" - print "\t%s %s: Open" % (i,service) - - else: - print "[*] Sorry, No Open Ports Found.!!" - - - except KeyboardInterrupt: - print "\n[*] You Pressed Ctrl+C. Exiting" - sys.exit(1) - - -def pcap_analyser_eth(file): - pkts = rdpcap(file) - i=0 - for pkt in pkts: - i += 1 - print "-" * 40 - print "[*] Packet : " + str(i) - print "[+] ### [ Ethernet ] ###" - print "[*] Mac Destination : " + pkt.dst - print "[*] Mac Source : " + pkt.src - print "[*] Ethernet Type : " + str(pkt.type) - + print('[*] Starting CyberScan Ping ICMP for {}'.format(host)) + ans, unans = sr(IP(dst=host) / ICMP()) + ans.summary(lambda s, r: r.sprintf("%IP.src% is alive")) + + +def tcp_ping(host, dport): + print('[*] Starting CyberScan Ping TCP for {} port {}'.format(host, dport)) + ans, unans = sr(IP(dst=host) / TCP(dport=dport, flags="S")) + ans.summary(lambda s, r: r.sprintf("%IP.src% is alive")) + + +def udp_ping(host, port=0): + print('[*] Starting CyberScan Ping UDP for {}'.format(host)) + ans, unans = sr(IP(dst=host) / UDP(dport=port)) + ans.summary(lambda s, r: r.sprintf("%IP.src% is alive")) + + +def superscan(host, start_port=None, end_port=None): + print('[*] CyberScan Port Scanner') + open_ports = [] + common_ports = { + '21': 'FTP', + '22': 'SSH', + '23': 'TELNET', + '25': 'SMTP', + '53': 'DNS', + '69': 'TFTP', + '80': 'HTTP', + '109': 'POP2', + '110': 'POP3', + '123': 'NTP', + '137': 'NETBIOS-NS', + '138': 'NETBIOS-DGM', + '139': 'NETBIOS-SSN', + '143': 'IMAP', + '156': 'SQL-SERVER', + '389': 'LDAP', + '443': 'HTTPS', + '546': 'DHCP-CLIENT', + '547': 'DHCP-SERVER', + '993': 'IMAP-SSL', + '995': 'POP3-SSL', + '2082': 'CPANEL', + '2083': 'CPANEL', + '2086': 'WHM/CPANEL', + '2087': 'WHM/CPANEL', + '3306': 'MYSQL', + '8443': 'PLESK', + '10000': 'VIRTUALMIN/WEBIN' + } + + # Determine which ports to scan + if start_port is None and end_port is None: + ports_to_scan = [int(p) for p in common_ports.keys()] + scan_type_msg = f"[*] Scanning For Most Common Ports On {host}" + else: + ports_to_scan = range(start_port, end_port + 1) + scan_type_msg = f"[*] Scanning {host} From Port {start_port} To {end_port}" + + starting_time = time.time() + print(scan_type_msg) + print("[*] Starting CyberScan 1.01 at {}".format(time.strftime("%Y-%m-%d %H:%M %Z"))) + + print_lock = Lock() + + def check_port(port): + try: + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock.settimeout(0.5) + r = sock.connect_ex((host, port)) + if r == 0: + with print_lock: + open_ports.append(port) + sock.close() + except socket.error as e: + with print_lock: + print(f"\nError on port {port}: {e}") + + def get_service(port): + port_s = str(port) + return common_ports.get(port_s, "Unknown service") + + try: + print("[*] Scan In Progress ...") + + # Use a thread pool to scan ports concurrently + with ThreadPoolExecutor(max_workers=100) as executor: + executor.map(check_port, ports_to_scan) + + print("\n[*] Scanning Completed at {}".format(time.strftime("%Y-%m-%d %H:%M %Z"))) + ending_time = time.time() + total_time = ending_time - starting_time + if total_time <= 60: + print("[*] CyberScan done: 1IP address (1host up) scanned in {:.2f} seconds".format(total_time)) + else: + total_time = total_time / 60 + print("[*] CyberScan done: 1IP address (1host up) scanned in {:.2f} Minutes".format(total_time)) + + if open_ports: + print("[*] Open Ports: ") + for i in sorted(open_ports): + service = get_service(i) + print(f"\t{i:<5} {service}: Open") + else: + print("[*] Sorry, No Open Ports Found.!!") + + except KeyboardInterrupt: + print("\n[*] You Pressed Ctrl+C. Exiting") + sys.exit(1) + + +def pcap_analyser(file_path, layer_name): + """Analyzes a pcap file and prints details for a specific protocol layer.""" + try: + pkts = rdpcap(file_path) + except Scapy_Exception as e: + print(f"Error reading pcap file: {e}") + return + + layer_map = {"eth": Ether, "ip": IP, "tcp": TCP, "udp": UDP, "icmp": ICMP} + layer_to_check = layer_map.get(layer_name.lower()) + + if not layer_to_check: + print(f"Error: Invalid layer '{layer_name}'. Choose from {list(layer_map.keys())}.") + return + + print(f"[*] Analyzing for {layer_to_check.name} layer in {file_path}") + count = 0 + for i, pkt in enumerate(pkts): + if pkt.haslayer(layer_to_check): + count += 1 + print("-" * 40) + print(f"[*] Packet : {i+1} (Match #{count})") + print(f"[+] ###[ {layer_to_check.name} ] ###") + + # Print detailed summary of the layer + pkt[layer_to_check].show(dump=True) + + # For IP-based protocols, show src/dst + if pkt.haslayer(IP): + print(f" IP Source: {pkt[IP].src}, IP Destination: {pkt[IP].dst}") + + print("[*] Hexdump:") + hexdump(pkt[layer_to_check]) + + if count == 0: + print(f"[*] No packets with layer '{layer_to_check.name}' found.") + +# The individual pcap analyser functions are now obsolete and can be removed. +# pcap_analyser_eth, pcap_analyser_ip, etc. are no longer needed. + +""" def pcap_analyser_ip(file): - pkts = rdpcap(file) - i=0 - for pkt in pkts: - - if pkt.haslayer(IP): - i += 1 - print "-" * 40 - print "[*] Packet : " + str(i) - print "[+] ###[ IP ] ###" - IPpkt = pkt[IP] - srcIP = IPpkt.fields['src'] - dstIP = IPpkt.fields['dst'] - print "[*] IP Source : " + srcIP - print "[*] IP Destination : " + dstIP - verIP = IPpkt.version - print "[*] IP Version : " ,verIP - ihlIP = IPpkt.ihl - print "[*] IP Ihl : " ,ihlIP - tosIP = IPpkt.tos - print "[*] IP Tos : " ,tosIP - lenIP = IPpkt.len - print "[*] IP Len : " ,lenIP - idIP = IPpkt.id - print "[*] IP Id : " ,idIP - flagsIP = IPpkt.flags - print "[*] IP Flags : " ,flagsIP - fragIP = IPpkt.frag - print "[*] IP Frag : " ,fragIP - ttlIP = IPpkt.ttl - print "[*] IP Ttl : " ,ttlIP - protoIP = IPpkt.proto - print "[*] IP Protocol : " ,protoIP - chksumIP = IPpkt.chksum - print "[*] IP Chksum : " ,chksumIP - optionsIP = IPpkt.options - print "[*] IP Options : " ,optionsIP - print "[*] IP Dump : " - print hexdump(IPpkt) + pkts = rdpcap(file) + i = 0 + for pkt in pkts: + if pkt.haslayer(IP): + i += 1 + print("-" * 40) + print("[*] Packet : {}".format(i)) + print("[+] ###[ IP ] ###") + IPpkt = pkt[IP] + srcIP = IPpkt.fields.get('src') + dstIP = IPpkt.fields.get('dst') + print("[*] IP Source : {}".format(srcIP)) + print("[*] IP Destination : {}".format(dstIP)) + print("[*] IP Version : {}".format(IPpkt.version)) + print("[*] IP Ihl : {}".format(IPpkt.ihl)) + print("[*] IP Tos : {}".format(IPpkt.tos)) + print("[*] IP Len : {}".format(IPpkt.len)) + print("[*] IP Id : {}".format(IPpkt.id)) + print("[*] IP Flags : {}".format(IPpkt.flags)) + print("[*] IP Frag : {}".format(IPpkt.frag)) + print("[*] IP Ttl : {}".format(IPpkt.ttl)) + print("[*] IP Protocol : {}".format(IPpkt.proto)) + print("[*] IP Chksum : {}".format(IPpkt.chksum)) + print("[*] IP Options : {}".format(IPpkt.options)) + print("[*] IP Dump : ") + hexdump(IPpkt) + def pcap_analyser_tcp(file): - pkts = rdpcap(file) - i=0 - SYN = 0x02 - FIN = 0X01 - RST = 0x04 - PSH = 0X08 - ACK = 0X10 - URG = 0x20 - - for pkt in pkts: - - if pkt.haslayer(TCP): - i += 1 - print "-" * 40 - print "[*] Packet : " + str(i) - print "[+] ###[ TCP ] ###" - TCPpkt = pkt[TCP] - sportTCP = TCPpkt.sport - print "[*] TCP Source Port : " ,sportTCP - dportTCP = TCPpkt.dport - print "[*] TCP Destination Port : " ,dportTCP - seqTCP = TCPpkt.seq - print "[*] TCP Seq : " ,seqTCP - ackTCP = TCPpkt.ack - print "[*] TCP Ack : " ,ackTCP - dataofsTCP = TCPpkt.dataofs - print "[*] TCP Dataofs : " ,dataofsTCP - reservedTCP = TCPpkt.reserved - print "[*] TCP Reserved : " ,reservedTCP - flagsTCP = TCPpkt.flags - print "[*] TCP Flags : " ,flagsTCP - windowTCP = TCPpkt.window - print "[*] TCP Window : " ,windowTCP - chksumTCP = TCPpkt.chksum - print "[*] TCP Chksum : " ,chksumTCP - urgptrTCP = TCPpkt.urgptr - print "[*] TCP Urgptr : " ,urgptrTCP - optionsTCP = TCPpkt.options - print "[*] TCP Options : " ,optionsTCP - nbrsyn=0 - nbrrst=0 - nbrack=0 - nbrfin=0 - nbrurg=0 - nbrpsh=0 - FlagsTCP=pkt[TCP].flags - if FlagsTCP==SYN: - nbrsun=1 - print "[*] TCP SYN FLAGS : " ,nbrsyn - elif FlagsTCP==RST: - nbrrst=1 - print "[*] TCP RST FLAGS : " ,nbrrst - elif FlagsTCP==ACK: - nbrack=1 - print "[*] TCP ACK FLAGS : " ,nbrack - elif FlagsTCP==FIN: - nbrfin=1 - print "[*] TCP FIN FLAGS : " ,nbrfin - elif FlagsTCP==URG: - nbrurg=1 - print "[*] TCP URG FLAGS : " ,nbrurg - elif FlagsTCP==PSH: - nbrpsh=1 - print "[*] TCP PSH FLAGS : " ,nbrpsh - print "[*] TCP Dump : " - print hexdump(TCPpkt) + pkts = rdpcap(file) + i = 0 + SYN = 0x02 + FIN = 0x01 + RST = 0x04 + PSH = 0x08 + ACK = 0x10 + URG = 0x20 + + for pkt in pkts: + if pkt.haslayer(TCP): + i += 1 + print("-" * 40) + print("[*] Packet : {}".format(i)) + print("[+] ###[ TCP ] ###") + TCPpkt = pkt[TCP] + sportTCP = TCPpkt.sport + print("[*] TCP Source Port : {}".format(sportTCP)) + dportTCP = TCPpkt.dport + print("[*] TCP Destination Port : {}".format(dportTCP)) + print("[*] TCP Seq : {}".format(TCPpkt.seq)) + print("[*] TCP Ack : {}".format(TCPpkt.ack)) + print("[*] TCP Dataofs : {}".format(TCPpkt.dataofs)) + print("[*] TCP Reserved : {}".format(TCPpkt.reserved)) + flagsTCP = TCPpkt.flags + print("[*] TCP Flags : {}".format(flagsTCP)) + print("[*] TCP Window : {}".format(TCPpkt.window)) + print("[*] TCP Chksum : {}".format(TCPpkt.chksum)) + print("[*] TCP Urgptr : {}".format(TCPpkt.urgptr)) + print("[*] TCP Options : {}".format(TCPpkt.options)) + nbrsyn = 0 + nbrrst = 0 + nbrack = 0 + nbrfin = 0 + nbrurg = 0 + nbrpsh = 0 + FlagsTCP = pkt[TCP].flags + if FlagsTCP == SYN: + nbrsyn = 1 + print("[*] TCP SYN FLAGS : {}".format(nbrsyn)) + elif FlagsTCP == RST: + nbrrst = 1 + print("[*] TCP RST FLAGS : {}".format(nbrrst)) + elif FlagsTCP == ACK: + nbrack = 1 + print("[*] TCP ACK FLAGS : {}".format(nbrack)) + elif FlagsTCP == FIN: + nbrfin = 1 + print("[*] TCP FIN FLAGS : {}".format(nbrfin)) + elif FlagsTCP == URG: + nbrurg = 1 + print("[*] TCP URG FLAGS : {}".format(nbrurg)) + elif FlagsTCP == PSH: + nbrpsh = 1 + print("[*] TCP PSH FLAGS : {}".format(nbrpsh)) + print("[*] TCP Dump : ") + hexdump(TCPpkt) def pcap_analyser_udp(file): - pkts = rdpcap(file) - i=0 - for pkt in pkts: - - if pkt.haslayer(UDP): - i += 1 - print "-" * 40 - print "[*] Packet : " + str(i) - print "[+] ###[ UDP ] ###" - UDPpkt = pkt[UDP] - sportUDP = UDPpkt.sport - print "[*] UDP Source Port : " ,sportUDP - dportUDP = UDPpkt.dport - print "[*] UDP Destination Port : " ,dportUDP - lenUDP = UDPpkt.len - print "[*] UDP Len : " ,lenUDP - chksumUDP = UDPpkt.chksum - print "[*] UDP Chksum : " ,chksumUDP - print "[*] UDP Dump : " - print hexdump(UDPpkt) + pkts = rdpcap(file) + i = 0 + for pkt in pkts: + if pkt.haslayer(UDP): + i += 1 + print("-" * 40) + print("[*] Packet : {}".format(i)) + print("[+] ###[ UDP ] ###") + UDPpkt = pkt[UDP] + print("[*] UDP Source Port : {}".format(UDPpkt.sport)) + print("[*] UDP Destination Port : {}".format(UDPpkt.dport)) + print("[*] UDP Len : {}".format(UDPpkt.len)) + print("[*] UDP Chksum : {}".format(UDPpkt.chksum)) + print("[*] UDP Dump : ") + hexdump(UDPpkt) def pcap_analyser_icmp(file): - pkts = rdpcap(file) - i=0 - for pkt in pkts: - - if pkt.haslayer(ICMP): - i += 1 - print "-" * 40 - print "[*] Packet : " + str(i) - print "[+] ###[ ICMP ] ###" - ICMPpkt = pkt[ICMP] - typeICMP = ICMPpkt.type - print "[*] ICMP Type : " ,typeICMP - codeICMP = ICMPpkt.code - print "[*] ICMP Code : " ,codeICMP - chksumICMP = ICMPpkt.chksum - print "[*] ICMP Chksum : " ,chksumICMP - idICMP = ICMPpkt.id - print "[*] ICMP Id : " ,idICMP - seqICMP = ICMPpkt.seq - print "[*] ICMP Seq : " ,seqICMP - print "[*] ICMP Dump : " - print hexdump(ICMPpkt) - + pkts = rdpcap(file) + i = 0 + for pkt in pkts: + if pkt.haslayer(ICMP): + i += 1 + print("-" * 40) + print("[*] Packet : {}".format(i)) + print("[+] ###[ ICMP ] ###") + ICMPpkt = pkt[ICMP] + print("[*] ICMP Type : {}".format(ICMPpkt.type)) + print("[*] ICMP Code : {}".format(ICMPpkt.code)) + print("[*] ICMP Chksum : {}".format(ICMPpkt.chksum)) + print("[*] ICMP Id : {}".format(ICMPpkt.id)) + print("[*] ICMP Seq : {}".format(ICMPpkt.seq)) + print("[*] ICMP Dump : ") + hexdump(ICMPpkt) +""" def main(): + parser = argparse.ArgumentParser( + description=__description__, + formatter_class=argparse.RawTextHelpFormatter, + epilog=f'''\ +{Style.BRIGHT}Examples:{Style.RESET_ALL} + # Scan common ports on a host + {sys.argv[0]} -s 192.168.1.1 -p scan + + # Scan a specific port range + {sys.argv[0]} -s 192.168.1.1 -p scan -d 1 -t 1024 + + # Perform an ARP ping on the local network + {sys.argv[0]} -s 192.168.1.0/24 -p arp + + # Analyze TCP headers in a pcap file + {sys.argv[0]} -f capture.pcap -p tcp +''' + ) + parser.add_argument("-s", "--serveur", dest="host", help="Target host IP address or domain") + parser.add_argument("-p", "--level", dest="mode", help="Mode of operation (e.g., scan, arp, icmp, geoip, etc.)") + parser.add_argument("-d", "--sport", dest="start_port", type=int, help="Start port for scanning") + parser.add_argument("-t", "--eport", dest="end_port", type=int, help="End port for scanning") + parser.add_argument("-f", "--file", dest="pcap_file", help="PCAP file to read and analyze") + parser.add_argument('--version', action='version', version=f"CyberScan {__version__}") - global serveur - global level - global sport - global eport - global file - global flag - flag=0 - - try: - - parser = argparse.ArgumentParser(version=__version__,description=__description__,formatter_class=argparse.RawTextHelpFormatter,epilog='''\ -levels with ip adress: - scan : scan ports - arp : ping arp - icmp : ping arp - tcp : ping tcp - udp : ping udp - geoip : geolocalisation - -levels with pcap file: - eth : extract ethernet headers - ip : extract ip headers - tcp : extract tcp headers - udp : extract udp headers - icmp : extract icmp headers - - ''') - - parser.add_argument("-s","--serveur", dest="serveur",help="attack to serveur ip") - parser.add_argument("-p","--level",dest="level",help="stack to level") - parser.add_argument("-d","--sport",dest="sport",help="start port to scan") - parser.add_argument("-t","--eport",dest="eport",help="end port to scan") - parser.add_argument("-f", "--file", dest="file", - help="read pcap file") - - - args = parser.parse_args() - serveur = args.serveur - file = args.file - level = args.level - sport = args.sport - eport = args.eport - - - if file is not None or serveur is not None: - - header() - usage() - - if file and level == "eth": - pcap_analyser_eth(file) - elif file and level == "ip": - pcap_analyser_ip(file) - elif file and level == "tcp": - pcap_analyser_tcp(file) - elif file and level == "udp": - pcap_analyser_udp(file) - elif file and level == "icmp": - pcap_analyser_icmp(file) - elif serveur is not None and level == "arp": - arp_ping(serveur) - elif serveur is not None and level == "icmp": - icmp_ping(serveur) - - elif serveur is not None and level == "tcp" and sport is not None: - port = sport - tcp_ping(serveur,port) - - elif serveur is not None and level == "scan" and sport is not None and eport is not None: - start_port = int(sport) - end_port = int(eport) - flag = 0 - superscan(serveur,start_port,end_port) - - elif serveur is not None and level == "scan" and sport is None and eport is None: - start_port = int(0) - end_port = int(0) - flag=1 - superscan(serveur,start_port,end_port) - - elif serveur is not None and level == "udp": - udp_ping(serveur,port=0) - - elif serveur is not None and level == "geoip": - geo_ip(serveur) - - + args = parser.parse_args() - else: - - print '''usage: CyberScan.py [-h] [-s SERVEUR] [-p LEVEL] [-d SPORT] [-t EPORT] - [-f FILE] -use cyberscan -h to help ''' - - except KeyboardInterrupt: - print "\n[*] You Pressed Ctrl+C. Exiting" - sys.exit(1) + if not any([args.host, args.pcap_file]): + parser.print_help() + sys.exit(1) + header() + print(f'''{Fore.GREEN}CyberScan v{__version__} | http://github/medbenali/CyberScan +It is the end user's responsibility to obey all applicable laws. +This is a server testing script. Your IP is visible.{Fore.RESET}\n''') - -if __name__ == '__main__': - main() + try: + if args.host: + if args.mode == "scan": + if args.start_port and args.end_port: + superscan(args.host, args.start_port, args.end_port) + else: + superscan(args.host) # Scan common ports + elif args.mode == "arp": + arp_ping(args.host) + elif args.mode == "icmp": + icmp_ping(args.host) + elif args.mode == "tcp": + if not args.start_port: + print("Error: TCP ping requires a port. Use -d .") + sys.exit(1) + tcp_ping(args.host, args.start_port) + elif args.mode == "udp": + udp_ping(args.host, port=0) + elif args.mode == "geoip": + geo_ip(args.host) + else: + print(f"Error: Invalid mode '{args.mode}' for a host target.") + + elif args.pcap_file: + if args.mode in ["eth", "ip", "tcp", "udp", "icmp"]: + pcap_analyser(args.pcap_file, args.mode) + else: + print(f"Error: Invalid mode '{args.mode}' for a pcap file.") + else: + parser.print_help() - - - - + except KeyboardInterrupt: + print("\n[*] You Pressed Ctrl+C. Exiting") + sys.exit(1) +if __name__ == '__main__': + main()