-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathSpeedTestClient.py
More file actions
172 lines (138 loc) · 6.28 KB
/
SpeedTestClient.py
File metadata and controls
172 lines (138 loc) · 6.28 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
import threading
import socket
import struct
import time
from constants import *
# Size in bytes of one '!IBQQ' structure: network byte order, no padding,
# an unsigned int, an unsigned char and two unsigned long longs.
SIZE_ = struct.calcsize('!IBQQ')
class SpeedTestClient(object):
    """Speed-test client that reacts to server offers with parallel transfers.

    The client listens for UDP offer datagrams. For every valid offer it
    starts ``available_tcp_connections`` TCP transfers and
    ``available_udp_connections`` UDP transfers, each in its own thread,
    waits for all of them, prints per-transfer statistics and then goes back
    to listening for the next offer.
    """

    def __init__(self, available_tcp_connections: int, available_udp_connections: int,
                 file_size: int, initial_port: int):
        """Store the run configuration.

        Args:
            available_tcp_connections: Number of TCP transfers per offer.
            available_udp_connections: Number of UDP transfers per offer.
            file_size: Amount of data requested from the server per transfer.
            initial_port: Port on which offers are awaited; the following
                ports are assigned to the individual connections.
        """
        self.available_tcp_connections = available_tcp_connections
        self.available_udp_connections = available_udp_connections
        # Worker threads of the transfer round that is currently running.
        self.threads = []
        self.file_size = file_size
        self.initial_port = initial_port

    def __call__(self, *args, **kwargs):
        """Start listening for offers on the configured initial port.

        ``initial_port`` is advanced by one first, so connection numbering
        starts on the port right after the one used for listening.
        """
        self.initial_port += 1
        self.search_offers(self.initial_port - 1)

    @staticmethod
    def _idle_timeout():
        """Return how many seconds a transfer may stay silent before it ends."""
        # Scales with the buffer size (1024 -> 5 seconds). Never 0, because a
        # zero timeout would switch the socket to non-blocking mode.
        return max(1, BUFFER_SIZE // 200)

    def search_offers(self, listen_port: int):
        """Listen for UDP broadcast offers and run one transfer round per offer.

        Datagrams that do not have the exact offer size, the expected cookie
        or the offer message type are ignored. Any exception ends the
        listening loop and is reported on stdout.

        Args:
            listen_port: Local UDP port to bind for incoming offers.
        """
        offer_format = '!IBHH'  # cookie, message type, UDP port, TCP port
        offer_size = struct.calcsize(offer_format)
        try:
            # The context manager closes the socket even when the loop fails.
            with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as offer_socket:
                offer_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
                offer_socket.bind(('', listen_port))
                while True:
                    data, address = offer_socket.recvfrom(BUFFER_SIZE)
                    # Anything of a different size is definitely not an offer.
                    if len(data) != offer_size:
                        continue
                    cookie, msg_type, udp_port, tcp_port = struct.unpack(offer_format, data)
                    if cookie != COOKIE or msg_type != OFFER_MSG:
                        continue
                    # TCP connections take the first ports, UDP the ones after.
                    self.handle_tcp(address, tcp_port, self.initial_port)
                    self.handle_udp(address, udp_port,
                                    self.initial_port + self.available_tcp_connections)
                    # Wait until every transfer of this round has finished.
                    for thread in self.threads:
                        thread.join()
                    # Forget finished threads so the list does not grow with
                    # every offer and old threads are not joined again.
                    self.threads.clear()
                    print('All transfers complete, listening to offer requests')
        except Exception as e:
            print(f"Failed while searching for offers: \n{e}")

    def handle_tcp(self, address, host_port, init_port):
        """Start one thread per configured TCP connection.

        Args:
            address: Address tuple of the offer sender; only the host is used.
            host_port: TCP port of the server.
            init_port: Port number assigned to the first TCP connection.
        """
        for i in range(self.available_tcp_connections):
            thread = threading.Thread(target=self.tcp_conn, name=f'tcp_conn{i}',
                                      args=(address, host_port, init_port + i))
            thread.start()
            # Remembered so the round can be joined later.
            self.threads.append(thread)

    def tcp_conn(self, address, host_port, sock_port):
        """Run a single TCP transfer and print its statistics.

        Sends the requested size as a text line, then reads until
        ``file_size`` bytes have arrived, the server closes the connection or
        the connection stays idle for too long. Time is measured from the
        request to the last received chunk.

        Args:
            address: Address tuple of the offer sender; only the host is used.
            host_port: TCP port of the server.
            sock_port: Port number assigned to this connection; only used to
                derive the connection number shown in the output.
        """
        con_num = sock_port - self.initial_port + 1
        connect_address = (address[0], host_port)
        print(f'{connect_address}')
        # Upper bound for one recv() call, so that a huge file_size does not
        # translate into a huge receive buffer.
        chunk_limit = max(BUFFER_SIZE, 64 * 1024)
        received = 0
        try:
            with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
                sock.settimeout(self._idle_timeout())
                sock.connect(connect_address)
                # sendall() keeps sending until the whole request is written.
                sock.sendall(bytes(f'{self.file_size}\n', "utf-8"))
                start_time = end_time = time.perf_counter()
                # A stream delivers data in arbitrary pieces, so keep reading
                # until the requested amount has arrived.
                while received < self.file_size:
                    try:
                        chunk = sock.recv(min(self.file_size - received, chunk_limit))
                    except socket.timeout:
                        break  # the server went silent
                    if not chunk:
                        break  # the server closed the connection
                    received += len(chunk)
                    end_time = time.perf_counter()
        except OSError as err:
            print(f'TCP Connection #{con_num} failed: {err}')
            return
        total_time = end_time - start_time
        # NOTE(review): the value is "file_size units per second" but labelled
        # bps; confirm whether file_size is meant in bits or in bytes.
        speed = received / total_time if total_time > 0 else 0.0
        print(f'TCP Connection #{con_num} finished, total time: {total_time: .4f} seconds, '
              f'total speed: {speed: .4f} bps')
        # Sanity check on the raw byte count (no decoding or stripping).
        if received != self.file_size:
            print(f'The file received in {threading.current_thread().name} was smaller than requested')

    def handle_udp(self, address, port, init_port):
        """Start one thread per configured UDP connection.

        Args:
            address: Address tuple of the offer sender; only the host is used.
            port: UDP port of the server.
            init_port: Local port assigned to the first UDP connection.
        """
        for i in range(self.available_udp_connections):
            thread = threading.Thread(
                target=self.udp_conn,
                name=f'udp_conn{i}',
                args=(address, port, init_port + i)
            )
            thread.start()
            self.threads.append(thread)

    def udp_conn(self, address, port, sock_port):
        """Run a single UDP transfer and print its statistics.

        Sends a request to the server's UDP port and counts valid response
        datagrams until the segment number equals the announced total or no
        datagram arrives within the idle timeout. Time is measured from the
        request to the last valid datagram.

        Args:
            address: Address tuple of the offer sender; only the host is used.
            port: UDP port of the server the request is sent to.
            sock_port: Local UDP port this connection binds to.
        """
        con_num = sock_port - self.initial_port - self.available_tcp_connections + 1
        recv = 0  # number of valid segments received
        smax = 0  # total number of segments announced by the server
        request = struct.pack('!IBQ', COOKIE, REQUEST_MSG, self.file_size)
        try:
            with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as client_socket:
                client_socket.bind(('', sock_port))
                # Address the server's advertised UDP port, not the source
                # port the offer happened to be sent from.
                client_socket.sendto(request, (address[0], port))
                client_socket.settimeout(self._idle_timeout())
                start_time = last_packet_time = time.perf_counter()
                while True:
                    try:
                        # Slightly larger than BUFFER_SIZE, just to be safe.
                        data, _ = client_socket.recvfrom(int(BUFFER_SIZE * 1.1))
                    except socket.timeout:
                        break
                    # Skip stray datagrams instead of aborting the transfer.
                    if len(data) < SIZE_:
                        continue
                    cookie, msg_type, total_segments, current_segment = \
                        struct.unpack_from('!IBQQ', data)
                    if cookie != COOKIE or msg_type != RESPONSE_MSG:
                        continue
                    if smax == 0:
                        smax = total_segments
                    recv += 1
                    last_packet_time = time.perf_counter()
                    if smax == current_segment:
                        break
        except OSError as err:
            print(f'UDP transfer #{con_num} failed: {err}')
            return
        # Nothing usable received: an infinite total makes both ratios zero.
        if smax == 0:
            smax = float('inf')
        elapsed = last_packet_time - start_time
        if elapsed > 0:
            speed = (float(recv) * float(self.file_size) / smax) / elapsed
        else:
            speed = 0.0
        percentage = min((float(recv) / smax) * 100, 100.0)
        print(f'UDP transfer #{con_num} finished, total time: {elapsed: .4f} seconds, '
              f'total speed: {speed: .4f} bps, percentage of packets received successfully: {percentage: .2f}%')
if __name__ == '__main__':
    # Ask for the four run parameters one after another; each answer must be
    # an integer.
    questions = (
        'How many TCP Connections would you like have?: ',
        'How many UDP Connections would you like have?: ',
        'The initial port which will be given to each connection in sequence: ',
        'What is the file size you\'d like to receive?: ',
    )
    tcp_count, udp_count, first_port, requested_size = [
        int(input(question)) for question in questions
    ]
    print('Starting client...')
    # Build the client and immediately start listening for offers.
    SpeedTestClient(tcp_count, udp_count, requested_size, first_port)()