diff --git a/Python/CCcamTester.py b/Python/CCcamTester.py deleted file mode 100644 index b545f0e..0000000 --- a/Python/CCcamTester.py +++ /dev/null @@ -1,132 +0,0 @@ -#!/usr/bin/env python -# -*- coding: utf-8 -*- -#Created by Dagger -- https://github.com/DaggerES - -import CriptoBlock - -recvblock = CriptoBlock.CryptographicBlock() -sendblock = CriptoBlock.CryptographicBlock() - -def TestCline(cline): - import socket, re, sys, array, time, select - - returnValue = False - regExpr = re.compile('[C]:\s*(\S+)+\s+(\d*)\s+(\S+)\s+([\w.-]+)') - match = regExpr.search(cline) - - if match is None: - return False; - - testSocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM, socket.IPPROTO_IP) - testSocket.settimeout(30) #timeout of 30 seconds - - host = match.group(1) - port = int(match.group(2)) - username = match.group(3) - password = match.group(4) - - try: - ip = socket.gethostbyname(host) - testSocket.connect((ip, port)) - - DoHanshake(testSocket) #Do handshake with the server - - try: - userArray = GetPaddedUsername(username) - sendcount = SendMessage(userArray, len(userArray), testSocket) #Send the username - - passwordArray = GetPaddedPassword(password) - sendblock.Encrypt(passwordArray, len(passwordArray)) #We encript the password - - #But we send "CCCam" with the password encripted CriptoBlock - cccamArray = GetCcam() - sendcount = SendMessage(cccamArray, len(cccamArray), testSocket) - - receivedBytes = bytearray(20) - recvCount = testSocket.recv_into(receivedBytes, 20) - - if recvCount > 0: - recvblock.Decrypt(receivedBytes, 20) - if (receivedBytes.decode("ascii").rstrip('\0') == "CCcam"): - print "SUCCESS! working cline: " + cline - returnValue = True - else: - print "Wrong ACK received!" - returnValue = False - else: - print "Bad username/password for cline: " + cline - returnValue = False - - except: - print "Bad username/password for cline: " + cline - returnValue = False - except: - print "Error while connecting to cline: " + cline - - testSocket.close() - return returnValue - -def GetPaddedUsername(userName): - import array - - #We create an array of 20 bytes with the username in it as bytes and padded with 0 behind - #Like: [23,33,64,13,0,0,0,0,0,0,0...] - userBytes = array.array("B", userName) - userByteArray = FillArray(bytearray(20), userBytes) - - return userByteArray - -def GetCcam(): - import array - - #We create an array of 6 bytes with the "CCcam\0" in it as bytes - cccamBytes = array.array("B", "CCcam") - cccamByteArray = FillArray(bytearray(6), cccamBytes) - return cccamByteArray - -def GetPaddedPassword(password): - import array - - #We create an array of with the password in it as bytes - #Like: [23,33,64,13,48,78,45] - passwordBytes = array.array("B", password) - passwordByteArray = FillArray(bytearray(len(password)),passwordBytes) - - return passwordByteArray - -def DoHanshake(socket): - import hashlib, array, CriptoBlock - - random = bytearray(16) - socket.recv_into(random, 16) #Receive first 16 "Hello" random bytes - print "Hello bytes: " + random - - random = CriptoBlock.Xor(random); #Do a Xor with "CCcam" string to the hello bytes - - sha1 = hashlib.sha1() - sha1.update(random) - sha1digest = array.array('B', sha1.digest()) #Create a sha1 hash with the xor hello bytes - sha1hash = FillArray(bytearray(20), sha1digest) - - recvblock.Init(sha1hash, 20) #initialize the receive handler - recvblock.Decrypt(random, 16) - - sendblock.Init(random, 16) #initialize the send handler - sendblock.Decrypt(sha1hash, 20) - - rcount = SendMessage(sha1hash, 20, socket) #Send the a crypted sha1hash! - -def SendMessage(data, len, socket): - buffer = FillArray(bytearray(len), data) - sendblock.Encrypt(buffer, len) - rcount = socket.send(buffer) - return rcount - -def FillArray(array, source): - if len(source) <= len(array): - for i in range(0, len(source)): - array[i] = source[i] - else: - for i in range(0, len(array)): - array[i] = source[i] - return array diff --git a/Python/CLineTester.py b/Python/CLineTester.py new file mode 100644 index 0000000..52cafc2 --- /dev/null +++ b/Python/CLineTester.py @@ -0,0 +1,167 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +#Created by Dagger -- https://github.com/DaggerES + +import CriptoBlock +import socket, re, array, time + +regExpr = re.compile('[C]:\s*(\S+)+\s+(\d*)\s+(\S+)\s+([\w.-]+)') + +recvblock = CriptoBlock.CryptographicBlock() +sendblock = CriptoBlock.CryptographicBlock() + +class CLineTester(object): + def __init__(self, cline): + super(self.__class__, self).__init__() + self._success = False + self._timeout = 30 + match = regExpr.search(cline) + if match is None: + self._success = False + raise Exception("No valid CLine") + self._cline = cline + self._readMatch(*match.groups()) + self._ip = socket.gethostbyname(self._host) + self._heloBytes = None + self._ping = None + + def _readMatch(self, host, port, username, password): + self._host = str(host) + self._port = int(port) + self._username = str(username) + self._password = str(password) + + @property + def Timeout(self): + return self._timeout + @Timeout.setter + def Timeout(self, value): + if type(value) is not int: + raise Exception("Timeout in seconds needs to be an int.") + self._timeout = value + @property + def Success(self): + return self._success + @property + def CLine(self): + return self._cline + @property + def CLineIP(self): + return "C: %s %s %s %s" % (self.Ip, self.Port, self.Username, self.Password) + @property + def Ip(self): + return self._ip + @property + def Host(self): + return self._host + @property + def Port(self): + return self._port + @property + def Username(self): + return self._username + @property + def Password(self): + return self._password + @property + def HeloBytes(self): + return self._heloBytes + @property + def Ping(self): + return self._ping + + def Test(self): + self._success = False + startTime = time.time() + try: + testSocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM, socket.IPPROTO_IP) + testSocket.settimeout(self._timeout) + testSocket.connect((self.Ip, self.Port)) + self._doHanshake(testSocket) #Do handshake with the server + + try: + sendcount = self._sendMessage(self._getPaddedUsername(), len(self._getPaddedUsername()), testSocket) #Send the username + sendblock.Encrypt(self._getPaddedPassword(), len(self._getPaddedPassword())) #We encript the password + #But we send "CCCam" with the password encripted CriptoBlock + cccamArray = self._getCcam() + sendcount = self._sendMessage(cccamArray, len(cccamArray), testSocket) + receivedBytes = bytearray(20) + recvCount = testSocket.recv_into(receivedBytes, 20) + + if recvCount > 0: + recvblock.Decrypt(receivedBytes, 20) + if (receivedBytes.decode("ascii").rstrip('\0') == "CCcam"): + self._success = True + else: + raise Exception("Wrong ACK received") + else: + raise Exception("Bad username/password") + except: + raise + except: + raise + finally: + self._ping = time.time() - startTime + testSocket.close() + return self.Success + + def _getPaddedUsername(self): + if not hasattr(self, '_paddedUsername'): + #We create an array of 20 bytes with the username in it as bytes and padded with 0 behind + #Like: [23,33,64,13,0,0,0,0,0,0,0...] + userBytes = array.array("B", self.Username) + userByteArray = self._fillArray(bytearray(20), userBytes) + self._paddedUsername = userByteArray + return self._paddedUsername + + def _getPaddedPassword(self): + if not hasattr(self, '_paddedPassword'): + # We create an array of with the password in it as bytes + # Like: [23,33,64,13,48,78,45] + passwordBytes = array.array("B", self.Password) + passwordByteArray = self._fillArray(bytearray(len(self.Password)), passwordBytes) + self._paddedPassword = passwordByteArray + return self._paddedPassword + + def _getCcam(self): + #We create an array of 6 bytes with the "CCcam\0" in it as bytes + cccamBytes = array.array("B", "CCcam") + cccamByteArray = self._fillArray(bytearray(6), cccamBytes) + return cccamByteArray + + def _doHanshake(self, socket): + import hashlib, array, CriptoBlock + + random = bytearray(16) + socket.recv_into(random, 16) #Receive first 16 "Hello" random bytes + self._heloBytes = random + + random = CriptoBlock.Xor(random); #Do a Xor with "CCcam" string to the hello bytes + + sha1 = hashlib.sha1() + sha1.update(random) + sha1digest = array.array('B', sha1.digest()) #Create a sha1 hash with the xor hello bytes + sha1hash = self._fillArray(bytearray(20), sha1digest) + + recvblock.Init(sha1hash, 20) #initialize the receive handler + recvblock.Decrypt(random, 16) + + sendblock.Init(random, 16) #initialize the send handler + sendblock.Decrypt(sha1hash, 20) + + rcount = self._sendMessage(sha1hash, 20, socket) #Send the a crypted sha1hash! + + def _sendMessage(self, data, len, socket): + buffer = self._fillArray(bytearray(len), data) + sendblock.Encrypt(buffer, len) + rcount = socket.send(buffer) + return rcount + + def _fillArray(self, array, source): + if len(source) <= len(array): + for i in range(0, len(source)): + array[i] = source[i] + else: + for i in range(0, len(array)): + array[i] = source[i] + return array diff --git a/Python/Main.py b/Python/Main.py index 0186c92..6d16f51 100644 --- a/Python/Main.py +++ b/Python/Main.py @@ -2,6 +2,16 @@ # -*- coding: utf-8 -*- #Created by Dagger -- https://github.com/DaggerES +from CLineTester import CLineTester + if __name__ == "__main__": - import CCcamTester - CCcamTester.TestCline("C: jjjjj.jjjj.com 22000 uuuu pppp") + cline = "C: jjjjj.jjjj.com 22000 uuuu pppp" + try: + cl = CLineTester(cline) + cl.Timeout = 10 # wait 10 seconds only for the request to return (default: 30) + print "IP of CLine is '%s'." % (cl.Ip,) + if cl.Test(): + # show cline with IP on success + print "CLine '%s' is OK. Ping is %ss" % (cl.CLineIP, cl.Ping,) + except Exception as e: + print "CLine '%s' failed: %s" % (cline, e,) diff --git a/Python/README.md b/Python/README.md new file mode 100644 index 0000000..68e1082 --- /dev/null +++ b/Python/README.md @@ -0,0 +1,24 @@ +# CLine tester scripts +*Also called Cline, CCcline, CCcam and Camd* + +Scripts for testing that a Cline works or not. This is the python part altered to my needs from +**https://github.com/DaggerES/CLineTester**. + +## Usage + +Usage has changed from forked project, +see Main.py + +## Changes + +* __init__.py in the folder, as I use it as a package +* Let Exceptions propagate +* Encapsulate all in a class, use Properties for result +* Get IP before connecting to CLine, testing is done when calling Test() +* Timeout is configurable by setting Timeout property (default 30s) +* Measure Ping time +* Some quick cleanups + +## Thanks + +Thanks go to DaggerES, which released the original version. diff --git a/Python/__init__.py b/Python/__init__.py new file mode 100644 index 0000000..e69de29