Created
October 26, 2016 21:16
-
-
Save Viq111/68a5886a845ac06aa956b78d1e00ef83 to your computer and use it in GitHub Desktop.
Network tester
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import random | |
import socket | |
import string | |
import struct | |
import sys | |
import time | |
START_SIZE = 1 | |
MULTIPLIER = 2 | |
MAX_SIZE = 10 * 1024 * 1024 # 10 Mo | |
PORT = 80 | |
def random_string(size): | |
return ''.join(random.choice(string.ascii_uppercase + string.digits) for _ in range(size)) | |
def speed(size, t): | |
" Display speed in KB/s for size and time taken" | |
s = 1.0 * size / t / 1024 | |
return "%.1f KB/s" % s | |
def recv_all(s, size): | |
"Receive data until all" | |
b = "" | |
while len(b) < size: | |
data = s.recv(size - len(b)) | |
if len(data) == 0: | |
sys.exit() | |
b += data | |
return b | |
if __name__ == '__main__': | |
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) | |
addr = raw_input("Enter server address:") | |
s.connect((addr, PORT)) | |
current_size = START_SIZE | |
while True: | |
data = random_string(current_size) | |
s.sendall(struct.pack(">I", current_size)) | |
start = time.time() | |
#time.sleep(0.1) | |
s.sendall(data) | |
upload_finished = time.time() | |
data_r = recv_all(s, current_size) | |
download_finished = time.time() | |
if data_r == data: | |
status = "OK" | |
else: | |
status = "ERROR" | |
print "Sent {} bytes: {}, down: {}, up: {}".format(current_size, status, speed(current_size, download_finished - upload_finished), speed(current_size, upload_finished - start)) | |
time.sleep(0.1) | |
current_size = int(current_size * MULTIPLIER) | |
if current_size > MAX_SIZE: | |
current_size = MAX_SIZE |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import socket | |
import struct | |
import sys | |
import time | |
BIND_ADDR = ("", 80) | |
def recv_all(s, size): | |
"Receive until size has been reached" | |
b = "" | |
while len(b) < size: | |
data = s.recv(size - len(b)) | |
if len(data) == 0: | |
sys.exit() | |
b += data | |
return b | |
if __name__ == '__main__': | |
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) | |
s.bind(("", 80)) | |
s.listen(1) | |
while True: | |
print "Waiting for 1 connection on {}...".format(":".join([str(i) for i in BIND_ADDR])) | |
c, a = s.accept() | |
print "Got a connection from {}, waiting for first payload".format(":".join([str(i) for i in a])) | |
try: | |
while True: # Forever wait | |
# Get the size he is going to send | |
size = struct.unpack('>I', recv_all(c, 4))[0] | |
if size == "": # Go to next connection | |
break | |
size = int(size) | |
print "Ready to receive {} bytes of data...".format(size), | |
b = recv_all(c, size) | |
print "received,", | |
# Send it back | |
#time.sleep(0.1) | |
c.sendall(b) | |
print "sent." | |
except Exception as e: | |
print "Finished thread with", e |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment