Skip to content

Instantly share code, notes, and snippets.

@Viq111
Created October 26, 2016 21:16
Show Gist options
  • Save Viq111/68a5886a845ac06aa956b78d1e00ef83 to your computer and use it in GitHub Desktop.
Save Viq111/68a5886a845ac06aa956b78d1e00ef83 to your computer and use it in GitHub Desktop.
Network tester
import random
import socket
import string
import struct
import sys
import time
START_SIZE = 1
MULTIPLIER = 2
MAX_SIZE = 10 * 1024 * 1024 # 10 Mo
PORT = 80
def random_string(size):
return ''.join(random.choice(string.ascii_uppercase + string.digits) for _ in range(size))
def speed(size, t):
" Display speed in KB/s for size and time taken"
s = 1.0 * size / t / 1024
return "%.1f KB/s" % s
def recv_all(s, size):
"Receive data until all"
b = ""
while len(b) < size:
data = s.recv(size - len(b))
if len(data) == 0:
sys.exit()
b += data
return b
if __name__ == '__main__':
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
addr = raw_input("Enter server address:")
s.connect((addr, PORT))
current_size = START_SIZE
while True:
data = random_string(current_size)
s.sendall(struct.pack(">I", current_size))
start = time.time()
#time.sleep(0.1)
s.sendall(data)
upload_finished = time.time()
data_r = recv_all(s, current_size)
download_finished = time.time()
if data_r == data:
status = "OK"
else:
status = "ERROR"
print "Sent {} bytes: {}, down: {}, up: {}".format(current_size, status, speed(current_size, download_finished - upload_finished), speed(current_size, upload_finished - start))
time.sleep(0.1)
current_size = int(current_size * MULTIPLIER)
if current_size > MAX_SIZE:
current_size = MAX_SIZE
import socket
import struct
import sys
import time
BIND_ADDR = ("", 80)
def recv_all(s, size):
"Receive until size has been reached"
b = ""
while len(b) < size:
data = s.recv(size - len(b))
if len(data) == 0:
sys.exit()
b += data
return b
if __name__ == '__main__':
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.bind(("", 80))
s.listen(1)
while True:
print "Waiting for 1 connection on {}...".format(":".join([str(i) for i in BIND_ADDR]))
c, a = s.accept()
print "Got a connection from {}, waiting for first payload".format(":".join([str(i) for i in a]))
try:
while True: # Forever wait
# Get the size he is going to send
size = struct.unpack('>I', recv_all(c, 4))[0]
if size == "": # Go to next connection
break
size = int(size)
print "Ready to receive {} bytes of data...".format(size),
b = recv_all(c, size)
print "received,",
# Send it back
#time.sleep(0.1)
c.sendall(b)
print "sent."
except Exception as e:
print "Finished thread with", e
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment