diff --git a/.gitignore b/.gitignore index 71b820f..22961d7 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,5 @@ *.pyc -dumps __pycache__ +*.txt +/com/ +.*.swp diff --git a/README.md b/README.md index e199d4e..800c9cb 100644 --- a/README.md +++ b/README.md @@ -1,3 +1,4 @@ +# smrt A utility to configure your TP-Link Easy Smart Switch on Linux or Mac OS X. This tool is written in Python. @@ -7,6 +8,244 @@ Supposedly supported switches: * TL-SG105E (tested) * TL-SG108E (tested) * TL-SG108PE -* TL-SG1016DE +* TL-SG1016DE (tested) * TL-SG1024DE +## Discover switches + +### Simple discovery + +``` +$ ./discovery.py +``` + +### Multiple interfaces + +If more than one interface, error message gives the list: + +``` +Error: more than 1 interface. Use -i or --interface to specify the name +Interfaces: + 'enp3s0' + 'virbr0' + 'virbr0-nic' +``` + +### Default output + +Give the right interface: +``` +$ ./discovery.py -i enp3s0 +``` + +Output (found 2 switches): +``` +192.168.9.36 +ba.ff.ee.ff.ac.ee +(1, 'type', 'TL-SG108E') +(2, 'hostname', 'sg108ev4') +(3, 'mac', '01:01:01:01:01:01') +(7, 'firmware', '1.0.0 Build 20181120 Rel.40749') +(8, 'hardware', 'TL-SG108E 4.0') +(9, 'dhcp', False) +(4, 'ip_addr', IPv4Address('192.168.9.202')) +(5, 'ip_mask', IPv4Address('255.255.255.0')) +(6, 'gateway', IPv4Address('192.168.9.1')) +(13, 'auto_save', True) +---------------- +(1, 'type', 'TL-SG108E') +(2, 'hostname', 'sg108ev1') +(3, 'mac', '02:02:02:02:02:02') +(7, 'firmware', '1.1.2 Build 20141017 Rel.50749') +(8, 'hardware', 'TL-SG108E 1.0') +(9, 'dhcp', False) +(4, 'ip_addr', IPv4Address('192.168.9.203')) +(5, 'ip_mask', IPv4Address('255.255.255.0')) +(6, 'gateway', IPv4Address('192.168.9.1')) +``` + +### Command output with -c + +With switch `-c` or `--command`, output gives the syntax for using smrt with right arguments + +``` +$ ./discovery.py -i enp3s0 -c +./smrt.py --username admin --password admin -i enp3s0 --switch-mac 01:01:01:01:01:01 +./smrt.py --username admin --password admin -i enp3s0 --switch-mac 02:02:02:02:02:02 +``` + +### Alias + +After discovery, setting an shell alias reduces command size + +``` +$ alias smrt='python ~/smrt/smrt.py --switch-mac 60:E3:27:83:25:3F -i eth0 --username admin --password admin' +``` + +## smrt.py + +### without command gives list of command + +``` +$ ./smrt.py --username admin --password admin --interface==eth0 --switch-mac 01:01:01:01:01:01 +Actions: type hostname mac ip_addr ip_mask gateway firmware hardware dhcp num_ports v4 username password save get_token_id igmp_snooping ports trunk mtu_vlan vlan_enabled vlan pvid vlan_filler qos1 qos2 mirror stats loop_prev +``` + +### vlan + +``` +$ smrt vlan # use alias + +(8704, 'vlan_enabled', '01') +(8705, 'vlan', [1, '1,2,3,4,5,6,7,8', '', 'Default_VLAN']) +(8705, 'vlan', [90, '1,2,3,4,5,6,7,8', '', 'LAN']) +(8705, 'vlan', [100, '1,2,3', '1', 'vlan_test_1']) +(8707, 'vlan_filler', ' ') +``` + +### pvid + +``` +$ smrt pvid +(8706, 'pvid', (1, 90)) +(8706, 'pvid', (2, 90)) +(8706, 'pvid', (3, 90)) +(8706, 'pvid', (4, 90)) +(8706, 'pvid', (5, 90)) +(8706, 'pvid', (6, 90)) +(8706, 'pvid', (7, 90)) +(8706, 'pvid', (8, 90)) +(8707, 'vlan_filler', ' ') +``` + +### stats + +``` +$ smrt stats +(16384, 'stats', (1, 1, 6, 48, 0, 6356, 14)) +(16384, 'stats', (2, 1, 0, 0, 0, 0, 0)) +(16384, 'stats', (3, 1, 0, 0, 0, 0, 0)) +(16384, 'stats', (4, 1, 5, 6404, 0, 0, 14)) +(16384, 'stats', (5, 1, 0, 0, 0, 0, 0)) +(16384, 'stats', (6, 1, 0, 0, 0, 0, 0)) +(16384, 'stats', (7, 1, 0, 0, 0, 0, 0)) +(16384, 'stats', (8, 1, 0, 0, 0, 0, 0)) +``` + +### smrt ports + +``` +$ smrt ports +(4096, 'ports', '01:01:00:01:06:00:00') +(4096, 'ports', '02:01:00:01:00:00:00') +(4096, 'ports', '03:01:00:01:00:00:00') +(4096, 'ports', '04:01:00:01:05:00:00') +(4096, 'ports', '05:01:00:01:00:00:00') +(4096, 'ports', '06:01:00:01:00:00:00') +(4096, 'ports', '07:01:00:01:00:00:00') +(4096, 'ports', '08:01:00:01:00:00:00') +``` +## Set VLAN settings + +### Syntax + +`smrt.py` shows parameters and can change them for VLANs if `--vlan` is present + +Specific VLAN parameters: + +* `--vlan`: vlan number (1-4093) +* `--vlan_name`: acceptable vlan name for TP-Link switch +* `--vlan_member`: comma separated list without space of member ports. Ex: 1,2,4 +* `--vlan_tagged`: comma separated list without space of tagged ports. Ex: 1,2 +* `--vlan_pvid`: set pvid to `--vlan` for given ports + +### Example 1 : add new vlan 120 + +``` +$ smrt vlan +(8704, 'vlan_enabled', '01') +(8705, 'vlan', [1, '1,2,3,4,5,6,7,8', '', 'Default_VLAN']) +(8705, 'vlan', [90, '1,2,3,4,5,6,7,8', '', 'LAN']) +(8705, 'vlan', [100, '1,2,3', '1', 'vlan_test_1']) +(8707, 'vlan_filler', ' ') +$ smrt vlan --vlan 120 --vlan_name "vlan_test_2" --vlan_member 1,4,5,6 --vlan_tagged 1 +(8704, 'vlan_enabled', '01') +(8705, 'vlan', [1, '1,2,3,4,5,6,7,8', '', 'Default_VLAN']) +(8705, 'vlan', [90, '1,2,3,4,5,6,7,8', '', 'LAN']) +(8705, 'vlan', [100, '1,2,3', '1', 'vlan_test_1']) +(8705, 'vlan', [120, '1,4,5,6', '1', 'vlan_test_2']) +(8707, 'vlan_filler', ' ') +``` + +### Example 2 : change to pvid 120 for ports 5 and 6 + +Note : vlan must exist (see example 1) before this command. + +``` +$ smrt pvid +(8706, 'pvid', (1, 90)) +(8706, 'pvid', (2, 90)) +(8706, 'pvid', (3, 90)) +(8706, 'pvid', (4, 90)) +(8706, 'pvid', (5, 90)) +(8706, 'pvid', (6, 90)) +(8706, 'pvid', (7, 90)) +(8706, 'pvid', (8, 90)) +(8707, 'vlan_filler', ' ') +$ smrt vlan --vlan 120 --vlan_pvid 5,6 +(8706, 'pvid', (1, 90)) +(8706, 'pvid', (2, 90)) +(8706, 'pvid', (3, 90)) +(8706, 'pvid', (4, 90)) +(8706, 'pvid', (5, 120)) +(8706, 'pvid', (6, 120)) +(8706, 'pvid', (7, 90)) +(8706, 'pvid', (8, 90)) +(8707, 'vlan_filler', ' ') +``` + +### Exemple 3 : remove vlan 120 + +``` +$ smrt vlan --vlan 120 --delete +(8704, 'vlan_enabled', '01') +(8705, 'vlan', [1, '1,2,3,4,5,6,7,8', '', 'Default_VLAN']) +(8705, 'vlan', [90, '1,2,3,4,5,6,7,8', '', 'LAN']) +(8705, 'vlan', [100, '1,2,3', '1', 'vlan_test_1']) +(8707, 'vlan_filler', ' ') +``` + +Note that, for PVID, corresponding ports are reinitialized to vlan 1: +``` +$ smrt pvid +(8706, 'pvid', (1, 90)) +(8706, 'pvid', (2, 90)) +(8706, 'pvid', (3, 90)) +(8706, 'pvid', (4, 90)) +(8706, 'pvid', (5, 1)) +(8706, 'pvid', (6, 1)) +(8706, 'pvid', (7, 90)) +(8706, 'pvid', (8, 90)) +(8707, 'vlan_filler', ' ') +``` + +### Exemple 4 : add new vlan and pvid + +``` +$ smrt vlan --vlan 130 --vlan_name "vlan_test_3" --vlan_member 1,4,5,6 --vlan_tagged 1 --vlan_pvid 4,5,6 +(8704, 'vlan_enabled', '01') +(8705, 'vlan', [1, '1,2,3,4,5,6,7,8', '', 'Default_VLAN']) +(8705, 'vlan', [90, '1,2,3,4,5,6,7,8', '', 'LAN']) +(8705, 'vlan', [100, '1,2,3', '1', 'vlan_test_1']) +(8705, 'vlan', [130, '1,4,5,6', '1', 'vlan_test_3']) +(8707, 'vlan_filler', ' ') +(8706, 'pvid', (1, 90)) +(8706, 'pvid', (2, 90)) +(8706, 'pvid', (3, 90)) +(8706, 'pvid', (4, 130)) +(8706, 'pvid', (5, 130)) +(8706, 'pvid', (6, 130)) +(8706, 'pvid', (7, 90)) +(8706, 'pvid', (8, 90)) +(8707, 'vlan_filler', ' ') +``` diff --git a/binary.py b/binary.py new file mode 100644 index 0000000..674f8f6 --- /dev/null +++ b/binary.py @@ -0,0 +1,45 @@ +#!/usr/bin/env python3 + +SEP = "," + +def ports2list(ports): + if ports is None: + l = [] + else: + try: + l = [int(x) for x in ports.split(SEP)] + except ValueError: + l = [] + return l + +def ports2byte(ports): + out = 0 + l = ports2list(ports) + if l == []: + out = 0 + else: + for i in l: + out |= (1 << (int(i) - 1)) + return out + +def byte2ports(byte): + out = [] + for i in range(32): + if byte % 2: + out.append(str(i + 1)) + byte >>= 1 + return SEP.join(out) + +def mac_to_bytes(mac): + return bytes(int(byte, 16) for byte in mac.split(':')) + +def mac_to_str(mac): + return ':'.join(format(s, '02x') for s in mac) + + +if __name__ == '__main__': + a = ports2byte("1,2,5,6,8,12,15") + print(a, byte2ports(a)) + print(ports2list("1,2")) + m = mac_to_bytes("ba:ff:ee:ff:ac:ee") + print(mac_to_str(m)) diff --git a/discovery.py b/discovery.py new file mode 100755 index 0000000..6002b11 --- /dev/null +++ b/discovery.py @@ -0,0 +1,47 @@ +#!/usr/bin/env python + +import random +import logging +import argparse + +from protocol import Protocol +from network import Network, ConnectionProblem, InterfaceProblem +from loglevel import loglevel + +logger = logging.getLogger(__name__) + +def discover_switches(interface=None): + net = Network(interface) + net.send(Protocol.DISCOVERY, {}) + ret = [] + while True: + try: + header, payload = net.receive() + ret.append((net.ip_address, net.host_mac, header, payload)) + except ConnectionProblem: + break + return ret + +def main(): + parser = argparse.ArgumentParser() + parser.add_argument('--interface', '-i') + parser.add_argument('--command', '-c', action="store_true") + parser.add_argument('--loglevel', '-l', type=loglevel, default='INFO') + args = parser.parse_args() + logging.basicConfig(level=args.loglevel) + try: + switches = discover_switches(args.interface) + except InterfaceProblem as e: + print("Error:", e) + else: + for ip, mac, header, payload in switches: + if args.command: + p = {x[1]: x[2] for x in payload} + cmd = f"./smrt.py --username admin --password admin --host-mac={mac} --ip-address={ip} --switch-mac {p['mac']}" + print(cmd) + else: + print(ip, mac, *payload, sep="\n") + print("-"*16) + +if __name__ == "__main__": + main() diff --git a/loglevel.py b/loglevel.py new file mode 100644 index 0000000..d013946 --- /dev/null +++ b/loglevel.py @@ -0,0 +1,10 @@ +#!/usr/bin/env python3 +import logging +def loglevel(x): + try: + return getattr(logging, x.upper()) + except AttributeError: + raise argparse.ArgumentError('Select a proper loglevel') + +if __name__ == '__main__': + pass diff --git a/network.py b/network.py new file mode 100755 index 0000000..0bfbb20 --- /dev/null +++ b/network.py @@ -0,0 +1,165 @@ +#!/usr/bin/env python + +import netifaces +import socket, random, logging + +from protocol import Protocol +from binary import byte2ports,mac_to_str,mac_to_bytes + +logger = logging.getLogger(__name__) + +class ConnectionProblem(Exception): + pass + +class InterfaceProblem(Exception): + pass + +class Network: + + BROADCAST_ADDR = "255.255.255.255" + UDP_SEND_TO_PORT = 29808 + UDP_RECEIVE_FROM_PORT = 29809 + + def __init__(self, interface=None, switch_mac="00:00:00:00:00:00"): + + # Normally, this module will be initialized with the MAC address of the switch we want to talk to + # There are however two other modes that might be used: + # - Specify MAC address fe:ff:ff:ff:ff:ff to go into broadcast listen mode. We use this to snoop on bidirectional traffic + # - Specify MAC address ff:ff:ff:ff:ff:ff to go into "fake switch" mode, where we reply to other clients + + self.switch_mac = switch_mac + self.ip_address, self.host_mac = self.get_interface(interface) + + self.sequence_id = random.randint(0, 1000) + + self.header = Protocol.header["blank"].copy() + self.header.update({ + 'sequence_id': self.sequence_id, + 'host_mac': mac_to_bytes(self.host_mac), + 'switch_mac': mac_to_bytes(self.switch_mac), + }) + + # Sending socket + self.ss = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + + if switch_mac == "fe:ff:ff:ff:ff:ff" or switch_mac == "ff:ff:ff:ff:ff:ff": + self.ss.bind((Network.BROADCAST_ADDR, Network.UDP_SEND_TO_PORT)) + self.ss.settimeout(10) + else: + self.ss.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) + self.ss.bind((self.ip_address, Network.UDP_RECEIVE_FROM_PORT)) + + # Receiving socket + self.rs = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + + if switch_mac == "ff:ff:ff:ff:ff:ff": + # This is a signal that we'll be operating in fake switch mode, rather than sending out commands + # For this, we need to switch the way that the recieving socket binds, to bind locally instead. + self.rs.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) + self.rs.bind((self.ip_address, Network.UDP_SEND_TO_PORT)) + + else: + + # Receiving socket + self.rs.bind((Network.BROADCAST_ADDR, Network.UDP_RECEIVE_FROM_PORT)) + self.rs.settimeout(10) + + def get_interface(self, interface=None): + if interface is None: + interfaces = netifaces.interfaces() + if "lo" in interfaces: + interfaces.remove("lo") + if len(interfaces) > 1: + msg = ["more than 1 interface. Use -i or --interface to specify the name"] + msg.append("Interfaces:") + for iface in interfaces: + msg.append(" " + repr(iface)) + raise InterfaceProblem("\n".join(msg)) + + settings = [] + addrs = netifaces.ifaddresses(interface) + logger.debug("addrs:" + repr(addrs)) + if netifaces.AF_INET not in addrs: + raise InterfaceProblem("not AF_INET address") + if netifaces.AF_LINK not in addrs: + raise InterfaceProblem("not AF_LINK address") + + mac = addrs[netifaces.AF_LINK][0]['addr'] + # take first address of interface + addr = addrs[netifaces.AF_INET][0] + if 'broadcast' not in addr or 'addr' not in addr: + raise InterfaceProblem("no addr or broadcast for address") + ip = addr['addr'] + + logger.debug("get_interface: %s %s %s " % (interface, ip, mac)) + + return ip, mac + + def send(self, op_code, payload): + self.sequence_id = (self.sequence_id + 1) % 1000 + self.header.update({ + 'sequence_id': self.sequence_id, + 'op_code': op_code, + }) + packet = Protocol.assemble_packet(self.header, payload) + logger.debug('Sending Packet: ' + packet.hex()) + packet = Protocol.encode(packet) + logger.debug('Sending Header: ' + str(self.header)) + logger.debug('Sending Payload: ' + str(payload)) + if self.switch_mac == "ff:ff:ff:ff:ff:ff": + self.rs.sendto(packet, (Network.BROADCAST_ADDR, Network.UDP_RECEIVE_FROM_PORT)) + else: + self.ss.sendto(packet, (Network.BROADCAST_ADDR, Network.UDP_SEND_TO_PORT)) + + def setHeader(self, header): + self.header = header + + def receive(self): + data = self.receive_socket(self.rs) + if data: + data = Protocol.decode(data) + logger.debug('Receive Packet: ' + data.hex()) + header, payload = Protocol.split(data) + header, payload = Protocol.interpret_header(header), Protocol.interpret_payload(payload) + logger.debug('Received Header: ' + str(header)) + logger.debug('Received Payload: ' + str(payload)) + self.header['token_id'] = header['token_id'] + return header, payload + else: + raise ConnectionProblem() + + def receive_socket(self, socket): + data = False + try: + data, addr = socket.recvfrom(1500) + except: + return False + return data + + def query(self, op_code, payload): + self.send(op_code, payload) + header, payload = self.receive() + return header, payload + + def login_dict(self, username, password): + return [ + (Protocol.get_id('username'), username.encode('ascii') + b'\x00'), + (Protocol.get_id('password'), password.encode('ascii') + b'\x00'), + ] + + def login(self, username, password): + self.query(Protocol.GET, [(Protocol.get_id("get_token_id"), b'')]) + self.query( + Protocol.LOGIN, + self.login_dict(username, password) + ) + + def set(self, username, password, payload): + self.query(Protocol.GET, [(Protocol.get_id("get_token_id"), b'')]) + real_payload = self.login_dict(username, password) + real_payload += payload + header, payload = self.query( + Protocol.LOGIN, + real_payload + ) + return header, payload diff --git a/protocol.py b/protocol.py new file mode 100644 index 0000000..47c9f00 --- /dev/null +++ b/protocol.py @@ -0,0 +1,206 @@ +import struct +from ipaddress import ip_address +from binary import byte2ports,mac_to_str + +class Protocol: + PACKET_END = b'\xff\xff\x00\x00' + + KEY = bytes ([191, 155, 227, 202, 99, 162, 79, 104, 49, 18, 190, 164, 30, + 76, 189, 131, 23, 52, 86, 106, 207, 125, 126, 169, 196, 28, 172, 58, + 188, 132, 160, 3, 36, 120, 144, 168, 12, 231, 116, 44, 41, 97, 108, + 213, 42, 198, 32, 148, 218, 107, 247, 112, 204, 14, 66, 68, 91, 224, + 206, 235, 33, 130, 203, 178, 1, 134, 199, 78, 249, 123, 7, 145, 73, + 208, 209, 100, 74, 115, 72, 118, 8, 22, 243, 147, 64, 96, 5, 87, 60, + 113, 233, 152, 31, 219, 143, 174, 232, 153, 245, 158, 254, 70, 170, + 75, 77, 215, 211, 59, 71, 133, 214, 157, 151, 6, 46, 81, 94, 136, + 166, 210, 4, 43, 241, 29, 223, 176, 67, 63, 186, 137, 129, 40, 248, + 255, 55, 15, 62, 183, 222, 105, 236, 197, 127, 54, 179, 194, 229, + 185, 37, 90, 237, 184, 25, 156, 173, 26, 187, 220, 2, 225, 0, 240, + 50, 251, 212, 253, 167, 17, 193, 205, 177, 21, 181, 246, 82, 226, + 38, 101, 163, 182, 242, 92, 20, 11, 95, 13, 230, 16, 121, 124, 109, + 195, 117, 39, 98, 239, 84, 56, 139, 161, 47, 201, 51, 135, 250, 10, + 19, 150, 45, 111, 27, 24, 142, 80, 85, 83, 234, 138, 216, 57, 93, + 65, 154, 141, 122, 34, 140, 128, 238, 88, 89, 9, 146, 171, 149, 53, + 102, 61, 114, 69, 217, 175, 103, 228, 35, 180, 252, 200, 192, 165, + 159, 221, 244, 110, 119, 48]) + + header = { + "len": 32, + "fmt": '!bb6s6shihhhhi', + "blank": { + 'version': 1, + 'op_code': 0, + 'switch_mac': b'\x00\x00\x00\x00\x00\x00', + 'host_mac': b'\x00\x00\x00\x00\x00\x00', + 'sequence_id': 0, + 'error_code': 0, + 'check_length': 0, + 'fragment_offset': 0, + 'flag': 0, + 'token_id': 0, + 'checksum': 0, + } + } + + DISCOVERY = 0 + GET = 1 + SET = 2 + LOGIN = 3 + RETURN = 4 + READ5 = 5 + + op_codes = { + DISCOVERY: 'DISCOVERY', + GET: 'GET', + SET: 'SET', + LOGIN: 'LOGIN', + RETURN: 'RETURN', + READ5: 'READ5' + } + + sequences = { + # name ->switch switch-> + 'login/change': (LOGIN, RETURN), + 'query': (GET, SET), + 'discover': (DISCOVERY, SET), + } + + ids_tp = { + 1: ('str', 'type', 0), + 2: ('str', 'hostname', 1, "Without arguments, shows switch details"), + 3: ('hex', 'mac', 0), + 4: ('ip', 'ip_addr', 0), + 5: ('ip', 'ip_mask', 0), + 6: ('ip', 'gateway', 0), + 7: ('str', 'firmware', 0), + 8: ('str', 'hardware', 0), + 9: ('bool', 'dhcp', 0), + 10: ('dec', 'num_ports', 0), + 12: ('bool', 'led_status', 0), + 13: ('bool', 'auto_save', 0), + 14: ('bool', 'is_factory', 0), + 15: ('hex', 'flash_type', 0), + 512: ('str', 'username', 0), + 514: ('str', 'password', 0), + 773: ('bool', 'reboot', 1, "Reboot the device"), + 2304: ('action','save', 1, "Saves the current configuration"), + 2305: ('action','get_token_id',1, "Unknown"), + 4352: ('bool', 'igmp_snooping', 0), + 4096: ('hex', 'ports', 0), + 4608: ('hex', 'trunk', 0), + 8192: ('hex', 'mtu_vlan', 0), + 8704: ('hex', 'vlan_enabled', 0), + 8705: ('vlan', 'vlan', 1, "Configure VLAN Membership"), + 8706: ('pvid', 'pvid', 0), + 8707: ('str', 'vlan_filler', 0), + 12288: ('bool', 'qos1', 0), + 12289: ('hex', 'qos2', 0), + 16640: ('hex', 'mirror', 0), + 16384: ('stat', 'stats', 0), + 17152: ('bool', 'loop_prev', 0), + } + + tp_ids = {v[1]: k for k, v in ids_tp.items()} + + def get_sequence_kind(sequence): + for key, value in Protocol.sequences.items(): + if value == sequence: return key + return 'unknown' + + def get_id(name): + return Protocol.tp_ids[name] + + def decode(data): + data = bytearray(data) + s = bytearray(Protocol.KEY) + j = 0 + for k in range(len(data)): + i = (k + 1) & 255 + j = (j + s[i]) & 255 + s[i], s[j] = s[j], s[i] + data[k] = data[k] ^ s[(s[i] + s[j]) & 255] + return bytes(data) + + encode = decode + + def split(data): + if len(data) < Protocol.header["len"] + len(Protocol.PACKET_END): + raise AssertionError('invalid data length') + if not data.endswith(Protocol.PACKET_END): + raise AssertionError('data without packet end') + return (data[0:Protocol.header["len"]], data[Protocol.header["len"]:]) + + def interpret_header(header): + names = Protocol.header['blank'].keys() + vals = struct.unpack(Protocol.header['fmt'], header) + return dict(zip(names, vals)) + + def interpret_payload(payload): + results = [] + while len(payload) > len(Protocol.PACKET_END): + dtype, dlen = struct.unpack('!hh', payload[0:4]) + data = payload[4:4+dlen] + results.append( ( + dtype, + Protocol.ids_tp[dtype][1], + Protocol.interpret_value(data, Protocol.ids_tp[dtype][0]) + ) + ) + payload = payload[4+dlen:] + return results + + def analyze(data): + header, payload = Protocol.split(data) + return Protocol.interpret_header(header), Protocol.interpret_payload(payload) + + def assemble_packet(header, payload): + payload_bytes = b'' + for dtype, value in payload: + payload_bytes += struct.pack('!hh', dtype, len(value)) + payload_bytes += value + header['check_length'] = Protocol.header["len"] + len(payload_bytes) + len(Protocol.PACKET_END) + header = tuple(header[part] for part in Protocol.header['blank'].keys()) + header_bytes = struct.pack(Protocol.header['fmt'], *header) + return header_bytes + payload_bytes + Protocol.PACKET_END + + def interpret_value(value, kind): + if kind == 'str': + value = value.split(b'\x00', 1)[0].decode('ascii') + elif kind == 'ip': + value = ip_address(value) + elif kind == 'hex': + value = mac_to_str(value) + elif kind == 'action': + value = "n/a" + elif kind == 'dec': + value = int.from_bytes(value, 'big') + elif kind == 'vlan': + value = list(struct.unpack("!hii", value[:10]) + (value[10:-1].decode('ascii'), )) + value[1] = byte2ports(value[1]) + value[2] = byte2ports(value[2]) + elif kind == 'pvid': + value = struct.unpack("!bh", value) if value else None + elif kind == 'stat': + value = struct.unpack("!bbbiiii", value) + elif kind == 'bool': + if len(value) == 0: + pass + elif len(value) == 1: + value = value[0] > 0 + else: + raise AssertionError('boolean should be one byte long') + return value + + def set_vlan(vlan_num, member_mask, tagged_mask, vlan_name): + value = struct.pack("!hii",vlan_num, member_mask, tagged_mask) + vlan_name.encode("ascii") + b'\x00' + return value + + def set_pvid(vlan_num, port): + value = struct.pack("!bh", port, vlan_num) + return value + +if __name__ == "__main__": + #v = Protocol.set_vlan(10, 255, 254, "test") + #print(v, len(v)) + out = bytearray() + print(Protocol.set_pvid(90, 5).hex()) diff --git a/setup.py b/setup.py deleted file mode 100644 index 0ccf676..0000000 --- a/setup.py +++ /dev/null @@ -1,63 +0,0 @@ -# -*- coding: utf-8 -*- - -try: - from setuptools import setup -except ImportError: - from distutils.core import setup - -try: - import pypandoc - # for PyPI: Removing images with relative paths and their descriptions: - import re - LDESC = open('README.md', 'r').read() - matches = re.findall(r'\n\n(.*(\n.+)*:\n\n!\[.*\]\((.*\))(\n\n)?)', LDESC) - for match in matches: - text, _, link, _ = match - if text.startswith('http://'): continue - LDESC = LDESC.replace(text, '') - # Converting to rst - LDESC = pypandoc.convert(LDESC, 'rst', format='md') -except (ImportError, IOError, RuntimeError): - LDESC = '' - -setup(name='smrt', - version = '0.1.dev', - description = 'Python package and software for the TP-Link TL-SG105E and TL-SG108E smart switches', - long_description = LDESC, - author = 'Philipp Klaus', - author_email = 'philipp.l.klaus@web.de', - url = 'https://github.com/pklaus/smrt', - license = 'GPL', - #packages = ['smrt', 'smrt.discovery', 'smrt.operations', 'smrt.protocol', 'smrt.smrt'], - packages = ['smrt'], - entry_points = { - 'console_scripts': [ - 'smrt.cli = smrt.smrt:main', - 'smrt.discovery = smrt.discovery:main', - ], - }, - include_package_data = True, - zip_safe = True, - platforms = 'any', - install_requires = ['netifaces'], - extras_require = { - #'savescreen': ["Pillow",], - }, - #package_data = { - # '': ['resources/*.png'], - #}, - keywords = 'TP-Link TL-SG105E TL-SG108E', - classifiers = [ - 'Development Status :: 4 - Beta', - 'Operating System :: OS Independent', - 'License :: OSI Approved :: GNU General Public License v3 or later (GPLv3+)', - 'Programming Language :: Python', - 'Programming Language :: Python :: 3', - 'Topic :: Scientific/Engineering :: Visualization', - 'Topic :: Scientific/Engineering', - 'Topic :: System :: Hardware :: Hardware Drivers', - 'Intended Audience :: Science/Research', - ] -) - - diff --git a/smrt.py b/smrt.py new file mode 100755 index 0000000..bd08f9c --- /dev/null +++ b/smrt.py @@ -0,0 +1,81 @@ +#!/usr/bin/env python + +import socket, time, random, argparse, logging + +from protocol import Protocol +from network import Network, InterfaceProblem +from binary import ports2byte, ports2list + +def loglevel(x): + try: + return getattr(logging, x.upper()) + except AttributeError: + raise argparse.ArgumentError('Select a proper loglevel') + +def main(): + logger = logging.getLogger(__name__) + parser = argparse.ArgumentParser() + parser.add_argument('--switch-mac', '-s') + parser.add_argument('--interface', '-i') + parser.add_argument('--username', '-u') + parser.add_argument('--password', '-p') + parser.add_argument('--vlan', type=int) + parser.add_argument('--vlan_name') + parser.add_argument('--vlan_member') + parser.add_argument('--vlan_tagged') + parser.add_argument('--vlan_pvid') + parser.add_argument('--delete', action="store_true") + parser.add_argument('--loglevel', '-l', type=loglevel, default='INFO') + parser.add_argument('action', default=None, nargs='?') + args = parser.parse_args() + + logging.basicConfig(level=args.loglevel) + actions = Protocol.tp_ids + + if args.action not in Protocol.tp_ids: + print("Actions:\n") + for action in Protocol.ids_tp.keys(): + if Protocol.ids_tp[action][2]: + print("%s:%s %s [%s]" % + (Protocol.ids_tp[action][1], + tabout(Protocol.ids_tp[action][1]), + Protocol.ids_tp[action][3], + Protocol.ids_tp[action][0])) + else: + net = None + try: + net = Network(args.interface, args.switch_mac) + except InterfaceProblem as e: + print("Error:", e) + + if net: + net.login(args.username, args.password) + else: + exit(1) + + if args.action == "vlan" and args.vlan: + if args.vlan_member or args.vlan_tagged or args.delete: + if (args.delete): + v = Protocol.set_vlan(int(args.vlan), 0, 0, "") + else: + v = Protocol.set_vlan(int(args.vlan), ports2byte(args.vlan_member), ports2byte(args.vlan_tagged), args.vlan_name) + l = [(actions["vlan"], v)] + + if args.vlan_pvid is not None: + l = [] + for port in ports2list(args.vlan_pvid): + if port != 0: + l.append( (actions["pvid"], Protocol.set_pvid(args.vlan, port)) ) + header, payload = net.set(args.username, args.password, l) + elif args.action in actions: + header, payload = net.query(Protocol.GET, [(actions[args.action], b'')]) + print(*payload, sep="\n") + +def tabout(cmd): + # Format help output + if len(cmd) < 8: + return "\t\t" + return "\t" + +if __name__ == "__main__": + main() diff --git a/smrt/__init__.py b/smrt/__init__.py deleted file mode 100644 index 7b5a59c..0000000 --- a/smrt/__init__.py +++ /dev/null @@ -1,12 +0,0 @@ - -class SmrtPyException(Exception): pass - -class ConnectionException(SmrtPyException): pass - -class IncompatiblePlatformException(SmrtPyException): pass - -#from . import protocol -#from . import operations -#from . import network -#from . import discovery -#from . import smrt diff --git a/smrt/discovery.py b/smrt/discovery.py deleted file mode 100755 index 975242e..0000000 --- a/smrt/discovery.py +++ /dev/null @@ -1,90 +0,0 @@ -#!/usr/bin/env python - -import sys, socket, random, logging, platform, argparse - -import netifaces - -from . import IncompatiblePlatformException -from .protocol import * -from .network import * - -DISCOVERY_TIMEOUT = 0.5 - -logger = logging.getLogger(__name__) - -def discover_switches(interfaces='all'): - if interfaces == 'all': - interfaces = netifaces.interfaces() - settings = [] - for iface in interfaces: - addrs = netifaces.ifaddresses(iface) - if netifaces.AF_INET not in addrs: continue - if netifaces.AF_LINK not in addrs: continue - assert len(addrs[netifaces.AF_LINK]) == 1 - mac = addrs[netifaces.AF_LINK][0]['addr'] - for addr in addrs[netifaces.AF_INET]: - if 'broadcast' not in addr or 'addr' not in addr: continue - settings.append((iface, addr['addr'], mac, addr['broadcast'])) - - for iface, ip, mac, broadcast in settings: - logger.warning((iface, ip, mac, broadcast)) - sequence_id = random.randint(0, 1000) - header = DEFAULT_HEADER.copy() - header.update({ - 'sequence_id': sequence_id, - 'host_mac': bytes(int(byte, 16) for byte in mac.split(':')), - }) - packet = assemble_packet(header, {}) - packet = encode(packet) - - if platform.system().lower() == 'darwin': - rs = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) - rs.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - rs.bind(('', UDP_RECEIVE_FROM_PORT)) - rs.settimeout(DISCOVERY_TIMEOUT) - elif platform.system().lower() == 'linux': - rs = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) - rs.bind((BROADCAST_ADDR, UDP_RECEIVE_FROM_PORT)) - #rs.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) - rs.settimeout(DISCOVERY_TIMEOUT) - else: - raise IncompatiblePlatformException() - - ss = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) - ss.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - ss.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) - ss.bind((ip, UDP_RECEIVE_FROM_PORT)) - ss.sendto(packet, (BROADCAST_ADDR, UDP_SEND_TO_PORT)) - ss.close() - - - - while True: - try: - data, addr = rs.recvfrom(1500) - except: - break - data = decode(data) - header, payload = split(data) - payload = interpret_payload(payload) - context = {'iface': iface, 'ip': ip, 'mac': mac, 'broadcast': broadcast} - yield context, header, payload - rs.close() - -def main(): - if platform.system().lower() not in ('darwin', 'linux'): - sys.stderr.write('Discovery is not implemented for the platform %s' % platform.system()) - sys.exit(4) - parser = argparse.ArgumentParser() - parser.add_argument('interfaces', metavar='INTERFACE', nargs='*', default='all') - args = parser.parse_args() - logging.basicConfig(level=logging.WARNING) - switches = discover_switches(interfaces=args.interfaces) - for context, header, payload in switches: - get = lambda kind: get_payload_item_value(payload, kind) - fmt = "Found a switch: Host: (Interface: {iface:8s} IP: {host_ip} Broadcast: {broadcast})\n" - fmt += " Switch: (Kind: {kind:12s} MAC Address: {mac} IP Address: {switch_ip})" - print(fmt.format(iface=context['iface'], host_ip=context['ip'], broadcast=context['broadcast'], - kind=get('type'), mac=get('mac'), switch_ip=get('ip_addr'))) - -if __name__ == "__main__": main() diff --git a/smrt/network.py b/smrt/network.py deleted file mode 100755 index 2ed44ab..0000000 --- a/smrt/network.py +++ /dev/null @@ -1,96 +0,0 @@ -#!/usr/bin/env python - -import socket, random, logging - -from .protocol import * -from .operations import * - -BROADCAST_ADDR = "255.255.255.255" -UDP_SEND_TO_PORT = 29808 -UDP_RECEIVE_FROM_PORT = 29809 - -logger = logging.getLogger(__name__) - -def mac_to_bytes(mac): - return bytes(int(byte, 16) for byte in mac.split(':')) - -def mac_to_str(mac): - return ':'.join('{:02X}'.format(byte) for byte in mac) - -class SwitchConversation(object): - - def __init__(self, switch_mac, host_mac, ip_address): - - self.switch_mac = switch_mac - self.host_mac = host_mac - self.ip_address = ip_address - - self.sequence_id = random.randint(0, 1000) - - header = DEFAULT_HEADER.copy() - header.update({ - 'sequence_id': self.sequence_id, - 'host_mac': mac_to_bytes(host_mac), - 'switch_mac': mac_to_bytes(switch_mac), - }) - self.header = header - - # Sending socket - ss = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) - ss.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) - ss.bind((ip_address, UDP_RECEIVE_FROM_PORT)) - - # Receiving socket - rs = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) - rs.bind((BROADCAST_ADDR, UDP_RECEIVE_FROM_PORT)) - rs.settimeout(0.4) - - self.ss, self.rs = ss, rs - - def send(self, op_code, payload): - self.sequence_id = (self.sequence_id + 1) % 1000 - - header = self.header - header.update({ - 'sequence_id': self.sequence_id, - 'op_code': op_code, - }) - - logger.debug('Sending Header: ' + str(header)) - logger.debug('Sending Payload: ' + str(payload)) - packet = assemble_packet(header, payload) - packet = encode(packet) - - self.ss.sendto(packet, (BROADCAST_ADDR, UDP_SEND_TO_PORT)) - - def receive(self): - try: - fragment_offset = 1 - total_payload = b'' - while fragment_offset: - data, addr = self.rs.recvfrom(1500) - data = decode(data) - header, payload = split(data) - header, payload = interpret_header(header), interpret_payload(payload) - fragment_offset = header['fragment_offset'] - logger.debug('Received Header: ' + str(header)) - logger.debug('Received Payload: ' + str(payload)) - self.header['token_id'] = header['token_id'] - total_payload += payload - return header, payload - except: - print("no response") - raise ConnectionProblem() - return {}, {} - - def query(self, op_code, payload): - self.send(op_code, payload) - header, payload = self.receive() - sequence_kind = get_sequence_kind((op_code, header['op_code'])) - logger.debug('Sequence kind: ' + sequence_kind) - return header, payload - - def login(self, username, password): - self.query(GET, get_token_id_payload()) - self.query(LOGIN, login_payload(username, password)) - diff --git a/smrt/operations.py b/smrt/operations.py deleted file mode 100644 index 55529c1..0000000 --- a/smrt/operations.py +++ /dev/null @@ -1,13 +0,0 @@ - -from .protocol import * - -def query_port_mirror_payload(): - return { get_id('port_mirror'): b'' } - -def login_payload(username, password): - username = username.encode('utf-8') + b'\x00' - password = password.encode('utf-8') + b'\x00' - return { get_id('username'): username, get_id('password'): password } - -def get_token_id_payload(): - return { get_id('get_token_id'): b'' } diff --git a/smrt/protocol.py b/smrt/protocol.py deleted file mode 100644 index f38f405..0000000 --- a/smrt/protocol.py +++ /dev/null @@ -1,233 +0,0 @@ -""" -TP-Link TL-SG105E and TL-SG108E switces -UDP configuration protocol - -Wireshark filter: -ip.addr ==192.168.178.250 || ip.addr ==192.168.178.254 && udp.port == 29808 -""" - -import struct -from ipaddress import ip_address -from collections import OrderedDict - -HEADER_LEN = 32 -PACKET_END = b'\xff\xff\x00\x00' - -DEFAULT_HEADER = { - 'fragment_offset': 0, - 'sequence_id': 0, - 'checksum': 0, - 'switch_mac': b'\x00\x00\x00\x00\x00\x00', - 'check_length': 0, - 'token_id': 0, - 'op_code': 0, - 'error_code': 0, - 'host_mac': b'\x00\x00\x00\x00\x00\x00', - 'version': 1, - 'flag': 0 -} - -header_structure = { - 'fmt': '!bb6s6shihhhhi', - 'designators': [ - 'version', - 'op_code', - 'switch_mac', - 'host_mac', - 'sequence_id', - 'error_code', - 'check_length', - 'fragment_offset', - 'flag', - 'token_id', - 'checksum', - ] -} - -DISCOVERY = 0 -GET = 1 -SET = 2 -LOGIN = 3 -RETURN = 4 -READ5 = 5 - -op_codes = { - DISCOVERY: 'DISCOVERY', - GET: 'GET', - SET: 'SET', - LOGIN: 'LOGIN', - RETURN: 'RETURN', - READ5: 'READ5' -} - -sequences = { - # name ->switch switch-> - 'login/change': (LOGIN, RETURN), - 'query': (GET, SET), - 'discover': (DISCOVERY, SET), -} - -receive_ids = { - 1: ['*s', 'str', 'type'], - 2: ['*s', 'str', 'hostname'], - 3: ['6s', 'hex', 'mac'], - 4: ['4s', 'ip', 'ip_addr'], - 5: ['4s', 'ip', 'ip_mask'], - 6: ['4s', 'ip', 'gateway'], - 7: ['*s', 'str', 'firmware_version'], - 8: ['*s', 'str', 'hardware_version'], - 9: ['?', 'bool', 'dhcp'], - 10: ['b', 'dec', 'num_ports'], - 512: ['*s', 'str', 'username'], - 514: ['*s', 'str', 'password'], - 2304: ['a', 'action','save'], - 2305: ['a', 'action','get_token_id'], - 4352: ['?', 'bool', 'igmp_snooping'], - 4096: ['*s', 'hex', 'port_settings'], - 4608: ['5s', 'hex', 'port_trunk'], - 8192: ['2s', 'hex', 'mtu_vlan'], - 8704: ['?', 'hex', '802.1q vlan enabled'], - 8705: ['*s', 'hex', '802.1q vlan'], - 8706: ['*s', 'hex', '802.1q vlan pvid'], - 12288: ['?', 'bool', 'QoS Basic 1'], - 12289: ['2s', 'hex', 'QoS Basic 2'], - 16640: ['10s','hex', 'port_mirror'], - 16384: ['*s', 'hex', 'port_statistics'], - 17152: ['?', 'bool', 'loop_prevention'], -} - -def get_sequence_kind(sequence): - for key, value in sequences.items(): - if value == sequence: return key - return 'unknown' - -def get_id(name): - for key, value in receive_ids.items(): - if value[2] == name: return key - raise Exception() - -def hex_readable(bts): - return ':'.join(['{:02X}'.format(byte) for byte in bts]) - -def payload_str(payload): - ret = '' - if type(payload) == bytes: - items = interpret_payload(payload) - else: - items = payload - for item_id in items.keys(): - value = items[item_id] - try: - value = interpret_value(value, receive_ids[item_id][1]) - except: - pass - if item_id not in receive_ids: - ret += 'Unknown code: %s (content: %s)\n' % (item_id, value) - continue - struct_fmt = receive_ids[item_id][0] - kind = receive_ids[item_id][1] - name = receive_ids[item_id][2] - fmt = '{name}: {value} (id: {id}, kind: {kind})\n' - ret += fmt.format(name=name, value=value, id=item_id, kind=kind) - return ret - -def decode(data): - data = list(data) - key = [ 191, 155, 227, 202, 99, 162, 79, 104, 49, 18, 190, 164, 30, - 76, 189, 131, 23, 52, 86, 106, 207, 125, 126, 169, 196, 28, 172, 58, - 188, 132, 160, 3, 36, 120, 144, 168, 12, 231, 116, 44, 41, 97, 108, - 213, 42, 198, 32, 148, 218, 107, 247, 112, 204, 14, 66, 68, 91, 224, - 206, 235, 33, 130, 203, 178, 1, 134, 199, 78, 249, 123, 7, 145, 73, - 208, 209, 100, 74, 115, 72, 118, 8, 22, 243, 147, 64, 96, 5, 87, 60, - 113, 233, 152, 31, 219, 143, 174, 232, 153, 245, 158, 254, 70, 170, - 75, 77, 215, 211, 59, 71, 133, 214, 157, 151, 6, 46, 81, 94, 136, - 166, 210, 4, 43, 241, 29, 223, 176, 67, 63, 186, 137, 129, 40, 248, - 255, 55, 15, 62, 183, 222, 105, 236, 197, 127, 54, 179, 194, 229, - 185, 37, 90, 237, 184, 25, 156, 173, 26, 187, 220, 2, 225, 0, 240, - 50, 251, 212, 253, 167, 17, 193, 205, 177, 21, 181, 246, 82, 226, - 38, 101, 163, 182, 242, 92, 20, 11, 95, 13, 230, 16, 121, 124, 109, - 195, 117, 39, 98, 239, 84, 56, 139, 161, 47, 201, 51, 135, 250, 10, - 19, 150, 45, 111, 27, 24, 142, 80, 85, 83, 234, 138, 216, 57, 93, - 65, 154, 141, 122, 34, 140, 128, 238, 88, 89, 9, 146, 171, 149, 53, - 102, 61, 114, 69, 217, 175, 103, 228, 35, 180, 252, 200, 192, 165, - 159, 221, 244, 110, 119, 48] - length = len(data) - #s = [bytes([char]) for char in key] - s = key - i, j = 0, 0 - for k in range(length): - i = (k + 1) % 256; - j = (j + s[i]) % 256; - s[i], s[j] = s[j], s[i] - data[k] = data[k] ^ s[(s[i] + s[j]) % 256]; - return bytes(data) - -encode = decode - -def split(data): - assert len(data) >= HEADER_LEN + len(PACKET_END) - assert data.endswith(PACKET_END) - return (data[0:HEADER_LEN], data[HEADER_LEN:]) - -def interpret_header(header): - names = header_structure['designators'] - vals = struct.unpack(header_structure['fmt'], header) - return dict(zip(names, vals)) - -def interpret_payload(payload): - results = OrderedDict() - while len(payload) > len(PACKET_END): - dtype, dlen = struct.unpack('!hh', payload[0:4]) - data = payload[4:4+dlen] - payload = payload[4+dlen:] - results[dtype] = data - return results - -def assemble_packet(header, payload): - payload_bytes = b'' - for dtype, value in payload.items(): - payload_bytes += struct.pack('!hh', dtype, len(value)) - payload_bytes += value - header['check_length'] = HEADER_LEN + len(payload_bytes) + len(PACKET_END) - header = tuple(header[part] for part in header_structure['designators']) - header_bytes = struct.pack(header_structure['fmt'], *header) - return header_bytes + payload_bytes + PACKET_END - -def interpret_value(value, kind): - if kind == 'str': - value = value.split(b'\x00', 1)[0].decode('ascii') - elif kind == 'ip': - value = ip_address(value) - elif kind == 'hex': - value = ':'.join(['{:02X}'.format(byte) for byte in value]) - elif kind == 'action': - value = "n/a" - elif kind == 'dec': - value = int(''.join('%02X' % byte for byte in value), 16) - elif kind == 'bool': - if len(value) == 0: pass - elif len(value) == 1: value = value[0] > 0 - else: raise AssertionError('boolean should be one byte long') - return value - -def get_payload_item_context(items, name_key): - hits = [key for key in items.keys() if receive_ids[key][2] == name_key] - assert len(hits) == 1 - item_id = hits[0] - - kind = receive_ids[item_id][1] - raw_value = items[item_id] - value = interpret_value(raw_value, kind) - - return { - 'id': item_id, - 'struct_fmt': receive_ids[item_id][0], - 'kind': kind, - 'name': receive_ids[item_id][2], - 'value': value, - 'raw_value': raw_value, - } - -def get_payload_item_value(items, name_key): - context = get_payload_item_context(items, name_key) - return context['value'] diff --git a/smrt/smrt.py b/smrt/smrt.py deleted file mode 100755 index e797160..0000000 --- a/smrt/smrt.py +++ /dev/null @@ -1,45 +0,0 @@ -#!/usr/bin/env python - -import socket, time, random, argparse, logging - -from .protocol import * -from .network import SwitchConversation -from .operations import * - -def loglevel(x): - try: - return getattr(logging, x.upper()) - except AttributeError: - raise argparse.ArgumentError('Select a proper loglevel') - -def main(): - logger = logging.getLogger(__name__) - parser = argparse.ArgumentParser() - parser.add_argument('--configfile', '-c') - parser.add_argument('--switch-mac', '-s') - parser.add_argument('--host-mac', ) - parser.add_argument('--ip-address', '-i') - parser.add_argument('--username', '-u') - parser.add_argument('--password', '-p') - parser.add_argument('--loglevel', '-l', type=loglevel, default='INFO') - parser.add_argument('action') - args = parser.parse_args() - - logging.basicConfig(level=args.loglevel) - - switch_mac = args.switch_mac - host_mac = args.host_mac - ip_address = args.ip_address - - sc = SwitchConversation(switch_mac, host_mac, ip_address) - - sc.login(args.username, args.password) - - if args.action == 'query_port_mirror': - header, payload = sc.query(GET, query_port_mirror_payload()) - print(payload[16640]) - #elif args.action == 'status': - # header, payload = sc.query(GET, query_port_mirror_payload()) - -if __name__ == "__main__": - main() diff --git a/tp_analyse.py b/tp_analyse.py new file mode 100755 index 0000000..2d09406 --- /dev/null +++ b/tp_analyse.py @@ -0,0 +1,48 @@ +#!/usr/bin/env python + +################################################################################################ +# Listen to bidirectional network communication (to and from TP-Link switches / easy switch tool +# Analyze the network communications + +import socket, time, random, argparse, logging +from protocol import Protocol +from network import Network + +def show_packet(payload): + print(payload) + data = Protocol.decode(payload) + print("TEST: " + data.hex(" ")) + try: + print(Protocol.analyze(data)[1]) + except AssertionError: + return + except KeyError: + return + +def main(): + + # Handle command-line parameters + parser = argparse.ArgumentParser() + parser.add_argument('--interface', '-i') + args = parser.parse_args() + + # Instantiate network driver + net = None + try: + net = Network(args.interface, "fe:ff:ff:ff:ff:ff") + except InterfaceProblem as e: + print("Error:", e) + + while True: + data = net.receive_socket(net.rs) + if data: + show_packet(data) + data = net.receive_socket(net.ss) + if data: + show_packet(data) + + time.sleep(0.1) + +if __name__ == '__main__': + main() +