Created
October 24, 2019 12:05
-
-
Save oktal/64901677ae3820e7b224bf9e196c63d1 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#include <stdio.h> | |
#include <stdlib.h> | |
#include <string.h> | |
#include <getopt.h> | |
#include <unistd.h> | |
#include <time.h> | |
#include <errno.h> | |
#include <fcntl.h> | |
#include <sys/types.h> | |
#include <sys/socket.h> | |
#include <sys/uio.h> | |
#include <netinet/in.h> | |
#include <netinet/tcp.h> | |
#include <arpa/inet.h> | |
#if defined(WITH_URING) | |
#include "liburing.h" | |
#endif | |
#define MAX_CLIENTS 64 | |
typedef struct program_options | |
{ | |
int port; | |
int clients_count; | |
int buffer_size; | |
int no_delay; | |
int cork; | |
int total_messages; | |
int batch_size; | |
int message_rate; | |
int sleep_ms; | |
#if defined(WITH_URING) | |
int enable_uring; | |
int setup_sqpoll; | |
int sq_thread_cpu; | |
#endif | |
} program_options; | |
typedef struct client_data | |
{ | |
int sockfd; | |
int regfd; | |
struct sockaddr_in addr; | |
struct iovec iov; | |
struct msghdr msg; | |
int is_corked; | |
} client_data; | |
typedef struct server | |
{ | |
int sockfd; | |
size_t n_clients; | |
size_t clients_count; | |
client_data *clients[MAX_CLIENTS]; | |
#if defined(WITH_URING) | |
int enable_uring; | |
struct io_uring ring; | |
#endif | |
} server; | |
typedef struct summary | |
{ | |
long *latency_ns; | |
size_t n_latency; | |
size_t index; | |
} summary; | |
typedef struct stats | |
{ | |
summary* send_latency; | |
summary* batch_latency; | |
} stats; | |
#define BEGIN_TIMED(var) \ | |
do { \ | |
struct timespec _timed_time_begin, _timed_time_end; \ | |
summary* _timed_summary = var; \ | |
clock_gettime(CLOCK_MONOTONIC, &_timed_time_begin); \ | |
#define END_TIMED() \ | |
clock_gettime(CLOCK_MONOTONIC, &_timed_time_end); \ | |
struct timespec _timed_elapsed = time_diff(_timed_time_begin, _timed_time_end); \ | |
summary_push(_timed_summary, _timed_elapsed.tv_nsec); \ | |
} while (0); | |
int set_blocking(int sockfd, int value) | |
{ | |
int flags = fcntl(sockfd, F_GETFL, 0); | |
if (flags == -1) | |
return flags; | |
flags = value ? (flags & ~O_NONBLOCK) : (flags | O_NONBLOCK); | |
return fcntl(sockfd, F_SETFL, flags); | |
} | |
int set_no_delay(int sockfd, int value) | |
{ | |
return setsockopt(sockfd, SOL_TCP, TCP_NODELAY, (void *) &value, sizeof(int)); | |
} | |
int set_cork(int sockfd, int value) | |
{ | |
return setsockopt(sockfd, SOL_TCP, TCP_CORK, &value, sizeof(value)); | |
} | |
typedef enum options_status | |
{ | |
options_error = -1, | |
options_ok = 0, | |
} options_status; | |
options_status validate_options(const program_options* options) | |
{ | |
if (options->port < 1024 || options->port > 65536) | |
{ | |
fprintf(stderr, "You must specify a valid port (--port). Value is %d\n", options->port); | |
return options_error; | |
} | |
if (options->clients_count < 0) | |
{ | |
fprintf(stderr, "You must specify a number of clients to wait before starting sending packets (--clients-count). Value is %d\n", | |
options->clients_count); | |
return options_error; | |
} | |
if (options->buffer_size < 0) | |
{ | |
fprintf(stderr, "You must specify a valid buffer size (--buffer-size). Value is %d\n", options->buffer_size); | |
return options_error; | |
} | |
if (options->total_messages < 0) | |
{ | |
fprintf(stderr, "You must specify a total number of messages to send (--total-messages). Value is %d\n", options->total_messages); | |
return options_error; | |
} | |
if (options->batch_size > 0 && options->message_rate > 0) | |
{ | |
fprintf(stderr, "You must specify either batch_size or message_rate, not both\n"); | |
return options_error; | |
} | |
return options_ok; | |
} | |
int parse_program_socket_options(const char* val, program_options* options) | |
{ | |
char c; | |
while ((c = *val++) != '\0') | |
{ | |
switch (c) | |
{ | |
case 'n': | |
options->no_delay = 1; | |
break; | |
case 'c': | |
options->cork = 1; | |
break; | |
} | |
} | |
return 0; | |
} | |
program_options parse_options(int argc, char* argv[]) | |
{ | |
program_options options; | |
options.port = -1; | |
options.clients_count = -1; | |
options.buffer_size = -1; | |
options.no_delay = 0; | |
options.cork = 0; | |
options.message_rate = -1; | |
options.total_messages = -1; | |
options.batch_size = -1; | |
options.sleep_ms = -1; | |
#if defined(WITH_URING) | |
options.enable_uring = 0; | |
options.setup_sqpoll = 0; | |
options.sq_thread_cpu = -1; | |
#endif | |
int c = 0; | |
struct option long_options[] = { | |
{ "sockopt", required_argument, 0, 'o' }, | |
{ "port", required_argument, 0, 'p' }, | |
{ "clients-count", required_argument, 0, 'c' }, | |
{ "buffer-size", required_argument, 0, 'b' }, | |
{ "total-messages", required_argument, 0, 't' }, | |
{ "batch-size", required_argument, 0, 'd' }, | |
{ "sleep-ms", required_argument, 0, 'm' }, | |
{ "message-rate", required_argument, 0, 'r' }, | |
#if defined(WITH_URING) | |
{ "uring", no_argument, 0, 'u' }, | |
{ "sq-poll", no_argument, 0, 's' }, | |
{ "sq-thread-affinity", required_argument, 0, 'a' }, | |
#endif | |
{ 0, 0, 0, 0} | |
}; | |
#if defined(WITH_URING) | |
const char* const short_options = "npb:c:d:m:t:"; | |
#else | |
const char* const short_options = "npusa:b:c:d:m:t:"; | |
#endif | |
while ((c = getopt_long(argc, argv, short_options, long_options, 0)) != -1) | |
{ | |
switch (c) | |
{ | |
case 'o': | |
parse_program_socket_options(optarg, &options); | |
break; | |
case 'p': | |
options.port = atoi(optarg); | |
break; | |
case 'b': | |
options.buffer_size = atoi(optarg); | |
break; | |
case 'c': | |
options.clients_count = atoi(optarg); | |
break; | |
case 'd': | |
options.batch_size = atoi(optarg); | |
break; | |
case 'm': | |
options.sleep_ms = atoi(optarg); | |
break; | |
case 'r': | |
options.message_rate = atoi(optarg); | |
break; | |
case 't': | |
options.total_messages = atoi(optarg); | |
break; | |
#if defined(WITH_URING) | |
case 'u': | |
options.enable_uring = 1; | |
break; | |
case 's': | |
options.setup_sqpoll = 1; | |
break; | |
case 'a': | |
options.sq_thread_cpu = atoi(optarg); | |
break; | |
#endif | |
} | |
} | |
return options; | |
} | |
void options_print(const program_options* options) | |
{ | |
puts("TCP Options"); | |
puts("---------------"); | |
printf("Port: %d\n", options->port); | |
printf("Clients count: %d\n", options->clients_count); | |
printf("No_delay: %d\n", options->no_delay); | |
printf("Cork: %d\n", options->cork); | |
putchar('\n'); | |
puts("Message Options"); | |
puts("---------------"); | |
printf("Buffer size: %d bytes\n", options->buffer_size); | |
printf("Batch size: %d\n", options->batch_size); | |
printf("Message rate: %d QPS\n", options->message_rate); | |
printf("Total messages: %d\n", options->total_messages); | |
printf("Sleep time: %dms\n", options->sleep_ms); | |
putchar('\n'); | |
#if defined(WITH_URING) | |
putchar('\n'); | |
puts("io_uring Options"); | |
puts("----------------"); | |
printf("Enable uring: %d\n", options->enable_uring); | |
printf("Setup sqpoll: %d\n", options->setup_sqpoll); | |
printf("sq_thread_cpu: %d\n", options->sq_thread_cpu); | |
putchar('\n'); | |
#endif | |
} | |
#if defined(WITH_URING) | |
struct io_uring_params get_uring_params(const program_options* options) | |
{ | |
struct io_uring_params p; | |
memset(&p, 0, sizeof(p)); | |
if (options->setup_sqpoll) | |
{ | |
p.flags |= IORING_SETUP_SQPOLL; | |
} | |
if (options->sq_thread_cpu != -1) | |
{ | |
p.flags |= IORING_SETUP_SQ_AFF; | |
p.sq_thread_cpu = options->sq_thread_cpu; | |
} | |
return p; | |
} | |
#endif | |
#if defined(WITH_URING) | |
int init_uring(const program_options* options, unsigned entries, struct io_uring* ring) | |
{ | |
struct io_uring_params p = get_uring_params(options); | |
int fd, ret; | |
fd = io_uring_setup(entries, &p); | |
if (fd < 0) | |
return -errno; | |
ret = io_uring_queue_mmap(fd, &p, ring); | |
if (ret) | |
close(fd); | |
return ret; | |
} | |
#endif | |
#if defined(WITH_URING) | |
int register_uring_files(server* srv) | |
{ | |
if (!srv->enable_uring) | |
return 0; | |
int* fds = malloc(srv->n_clients * sizeof(*fds)); | |
if (!fds) | |
return -1; | |
for (size_t i = 0; i < srv->n_clients; ++i) | |
{ | |
client_data* client = srv->clients[i]; | |
fds[i] = client->sockfd; | |
client->regfd = i; | |
} | |
int ret = io_uring_register_files(&srv->ring, fds, srv->n_clients); | |
if (ret < 0) | |
perror("io_uring_register_files"); | |
return ret; | |
} | |
#endif | |
server *server_new(const program_options* options) | |
{ | |
server* srv = calloc(1, sizeof *srv); | |
if (!srv) | |
return NULL; | |
int sockfd = socket(AF_INET, SOCK_STREAM, 0); | |
if (sockfd < 0) | |
{ | |
perror("socket"); | |
return NULL; | |
} | |
int optval = 1; | |
int ret = setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &optval, sizeof optval); | |
if (ret < 0) | |
{ | |
perror("setsockopt"); | |
return NULL; | |
} | |
#if defined(WITH_URING) | |
if (options->enable_uring) | |
{ | |
int ret = init_uring(options, 1024, &srv->ring); | |
if (ret < 0) | |
{ | |
perror("init_uring"); | |
return NULL; | |
} | |
srv->enable_uring = 1; | |
} | |
#endif | |
srv->sockfd = sockfd; | |
srv->clients_count = options->clients_count; | |
return srv; | |
} | |
int server_bind(server* srv, uint16_t port) | |
{ | |
struct sockaddr_in addr; | |
memset(&addr, 0, sizeof(addr)); | |
addr.sin_family = AF_INET; | |
addr.sin_addr.s_addr = htonl(INADDR_ANY); | |
addr.sin_port = htons(port); | |
return bind(srv->sockfd, (struct sockaddr *)&addr, sizeof(addr)); | |
} | |
int server_listen(server* srv, int backlog) | |
{ | |
return listen(srv->sockfd, backlog); | |
} | |
server* server_start(const program_options* options) | |
{ | |
enum { Backlog = 128 }; | |
server* srv = server_new(options); | |
if (!srv) | |
goto fail; | |
int ret = server_bind(srv, options->port); | |
if (ret < 0) | |
goto fail; | |
ret = server_listen(srv, Backlog); | |
if (ret < 0) | |
goto fail; | |
return srv; | |
fail: | |
perror("server_start"); | |
free(srv); | |
return NULL; | |
} | |
void server_stop(server* srv) | |
{ | |
close(srv->sockfd); | |
#if defined(WITH_URING) | |
if (srv->enable_uring) | |
io_uring_queue_exit(&srv->ring); | |
#endif | |
} | |
client_data* client_new(struct sockaddr_in addr, int sockfd) | |
{ | |
client_data* client = calloc(1, sizeof *client); | |
if (!client) | |
return NULL; | |
client->addr = addr; | |
client->sockfd = sockfd; | |
return client; | |
} | |
summary* summary_new(size_t total_messages) | |
{ | |
summary* summary = malloc(sizeof *summary); | |
if (!summary) | |
return NULL; | |
long* latency_ns = malloc(total_messages * sizeof(long)); | |
if (latency_ns == NULL) | |
return NULL; | |
summary->latency_ns = latency_ns; | |
summary->n_latency = total_messages; | |
summary->index = 0; | |
return summary; | |
} | |
void summary_dump(const summary* summary) | |
{ | |
for (size_t i = 0; i < summary->n_latency; ++i) | |
{ | |
printf(" [%ld] = %ld ", i, summary->latency_ns[i]); | |
} | |
printf("\n"); | |
} | |
void summary_push(summary* summary, long latency_ns) | |
{ | |
summary->latency_ns[summary->index] = latency_ns; | |
summary->index = (summary->index + 1) % summary->n_latency; | |
} | |
struct timespec time_diff(struct timespec start, struct timespec end) | |
{ | |
struct timespec diff; | |
if ((end.tv_nsec - start.tv_nsec) < 0) | |
{ | |
diff.tv_sec = end.tv_sec - start.tv_sec - 1; | |
diff.tv_nsec = 1000000000 + end.tv_nsec - start.tv_nsec; | |
} | |
else | |
{ | |
diff.tv_sec = end.tv_sec - start.tv_sec; | |
diff.tv_nsec = end.tv_nsec - start.tv_nsec; | |
} | |
return diff; | |
} | |
void do_nanosleep(int sleep_ns) | |
{ | |
struct timespec time_sleep; | |
time_sleep.tv_sec = 0; | |
time_sleep.tv_nsec = sleep_ns; | |
nanosleep(&time_sleep, NULL); | |
} | |
static int latency_less(const void* l1, const void *l2) | |
{ | |
int v1 = *(long *)l1; | |
int v2 = *(long *)l2; | |
return v1 - v2; | |
} | |
int pct_nth_idx(double percentile, size_t size) | |
{ | |
return (int)(percentile * size); | |
} | |
long get_pct(long* arr, double percentile, size_t size) | |
{ | |
return arr[pct_nth_idx(percentile, size)]; | |
} | |
void report_summary(summary* summary) | |
{ | |
qsort(summary->latency_ns, summary->n_latency, sizeof(long), latency_less); | |
unsigned long long total_latency = 0; | |
for (size_t i = 0; i < summary->n_latency; ++i) | |
{ | |
total_latency += summary->latency_ns[i]; | |
} | |
long mean_latency_ns = total_latency / summary->n_latency; | |
double percentiles[] = { 0.10, 0.20, 0.50, 0.80, 0.90, 0.99 }; | |
printf("Min: %ldns\n", summary->latency_ns[0]); | |
printf("Mean: %ldns\n", mean_latency_ns); | |
printf("Max: %ldns\n", summary->latency_ns[summary->n_latency - 1]); | |
for (size_t i = 0; i < sizeof(percentiles) / sizeof(*percentiles); ++i) | |
{ | |
printf("p(%lf) = %ldns\n", percentiles[i], get_pct(summary->latency_ns, percentiles[i], summary->n_latency)); | |
} | |
} | |
void print_tcp_info(server* srv) | |
{ | |
for (size_t i = 0; i < srv->n_clients; ++i) | |
{ | |
struct tcp_info tcp_info; | |
socklen_t tcp_info_length = sizeof(tcp_info); | |
if (getsockopt(srv->clients[i]->sockfd, SOL_TCP, TCP_INFO, (void *)&tcp_info, &tcp_info_length) == 0) | |
{ | |
printf("fd %d: %u %u %u %u %u %u %u %u %u %u %u %u\n", | |
srv->clients[i]->sockfd, | |
tcp_info.tcpi_last_data_sent, | |
tcp_info.tcpi_last_data_recv, | |
tcp_info.tcpi_snd_cwnd, | |
tcp_info.tcpi_snd_ssthresh, | |
tcp_info.tcpi_rcv_ssthresh, | |
tcp_info.tcpi_rtt, | |
tcp_info.tcpi_rttvar, | |
tcp_info.tcpi_unacked, | |
tcp_info.tcpi_sacked, | |
tcp_info.tcpi_lost, | |
tcp_info.tcpi_retrans, | |
tcp_info.tcpi_fackets); | |
} | |
} | |
} | |
int fill_timestamp(client_data* client) | |
{ | |
struct timespec tp; | |
int res = clock_gettime(CLOCK_REALTIME, &tp); | |
if (res == 0) | |
{ | |
uint64_t nanoseconds_since_epoch = (uint64_t)(tp.tv_sec) * 1000000000UL + (uint64_t)tp.tv_nsec; | |
memcpy(client->iov.iov_base, &nanoseconds_since_epoch, sizeof(nanoseconds_since_epoch)); | |
} | |
return res; | |
} | |
int cork_clients(const program_options* options, server* srv, int value) | |
{ | |
if (options->cork) | |
{ | |
for (size_t i = 0; i < srv->n_clients; ++i) | |
{ | |
int sockfd = srv->clients[i]->sockfd; | |
int res = set_cork(sockfd, value); | |
if (res < 0) | |
return res; | |
} | |
} | |
return 0; | |
} | |
ssize_t do_sendmsg(server* srv) | |
{ | |
ssize_t bytes = 0; | |
for (size_t i = 0; i < srv->n_clients; ++i) | |
{ | |
client_data* client = srv->clients[i]; | |
ssize_t ret = sendmsg(client->sockfd, &client->msg, 0); | |
if (ret < 0) | |
{ | |
perror("sendmsg"); | |
return ret; | |
} | |
bytes += ret; | |
} | |
return bytes; | |
} | |
#if defined(WITH_URING) | |
ssize_t do_uring_sendmsg(server* srv) | |
{ | |
size_t bytes = 0; | |
for (size_t i = 0; i < srv->n_clients; ++i) | |
{ | |
client_data* client = srv->clients[i]; | |
struct io_uring_sqe* sqe = io_uring_get_sqe(&srv->ring); | |
io_uring_prep_sendmsg(sqe, client->regfd, &client->msg, 0); | |
sqe->flags |= IOSQE_FIXED_FILE; | |
} | |
int ret = io_uring_submit(&srv->ring); | |
if (ret < 0) | |
return ret; | |
struct io_uring_cqe* cqes[4]; | |
ret = io_uring_peek_batch_cqe(&srv->ring, cqes, srv->n_clients); | |
if (ret > 0) | |
{ | |
for (int i = 0; i < ret; ++i) | |
{ | |
const struct io_uring_cqe* cqe = *(cqes + i); | |
if (cqe->res < 0) | |
return cqe->res; | |
bytes += cqe->res; | |
} | |
io_uring_cq_advance(&srv->ring, ret); | |
} | |
return bytes; | |
} | |
#endif | |
typedef ssize_t (*send_func)(server* srv); | |
stats send_loop_rate(server* srv, const program_options* options) | |
{ | |
summary *send_latency = NULL; | |
summary* batch_latency = NULL; | |
stats stats; | |
stats.send_latency = stats.batch_latency = NULL; | |
size_t send_period_ms = 0; | |
size_t messages_per_period = 0; | |
size_t sent_messages = 0; | |
int ret; | |
send_latency = summary_new(options->total_messages); | |
if (!send_latency) | |
{ | |
perror("send_latency"); | |
return stats; | |
} | |
batch_latency = summary_new(options->total_messages); | |
if (!batch_latency) | |
{ | |
perror("batch_latency"); | |
return stats; | |
} | |
stats.send_latency = send_latency; | |
stats.batch_latency = batch_latency; | |
if (options->message_rate > 1000) | |
{ | |
send_period_ms = 1; | |
messages_per_period = options->message_rate / 1000; | |
} | |
else | |
{ | |
send_period_ms = 1000 / options->message_rate; | |
messages_per_period = 1; | |
} | |
printf("Sending %llu messages every %llu ms...\n", (unsigned long long) messages_per_period, (unsigned long long) send_period_ms); | |
#if defined(WITH_URING) | |
send_func send_func = srv->enable_uring ? &do_uring_sendmsg : &do_sendmsg; | |
#else | |
printf("[Warn] uring not available, forcing sendmsg\n"); | |
send_func send_func = &do_sendmsg; | |
#endif | |
for (size_t i = 0; i < srv->n_clients; ++i) | |
{ | |
char *buffer = malloc(options->buffer_size); | |
memset(buffer, 'a' + i, options->buffer_size); | |
client_data* client = srv->clients[i]; | |
client->iov.iov_base = (void *)buffer; | |
client->iov.iov_len = options->buffer_size; | |
memset(&client->msg, 0, sizeof(struct msghdr)); | |
client->msg.msg_iov = &client->iov; | |
client->msg.msg_iovlen = 1; | |
} | |
#if defined(WITH_URING) | |
ret = register_uring_files(srv); | |
if (ret < 0) | |
return NULL; | |
#endif | |
do | |
{ | |
struct timespec time_begin, time_end; | |
struct timespec tp; | |
ret = clock_gettime(CLOCK_REALTIME, &tp); | |
if (ret == 0) | |
{ | |
uint64_t nanoseconds_since_epoch = (uint64_t)(tp.tv_sec) * 1000000000UL + (uint64_t)tp.tv_nsec; | |
for (size_t i = 0; i < srv->n_clients; ++i) | |
{ | |
client_data* client = srv->clients[i]; | |
memcpy(client->iov.iov_base, &nanoseconds_since_epoch, sizeof(nanoseconds_since_epoch)); | |
} | |
} | |
cork_clients(options, srv, 1); | |
clock_gettime(CLOCK_MONOTONIC, &time_begin); | |
for (size_t i = 0; i < messages_per_period; ++i) | |
{ | |
BEGIN_TIMED(send_latency) | |
{ | |
ssize_t ret = (*send_func)(srv); | |
if (ret < 0) | |
{ | |
printf("send error: %s\n", strerror(-ret)); | |
break; | |
} | |
} END_TIMED(); | |
} | |
sent_messages += messages_per_period; | |
clock_gettime(CLOCK_MONOTONIC, &time_end); | |
struct timespec elapsed = time_diff(time_begin, time_end); | |
cork_clients(options, srv, 0); | |
long sleep_time_ns = (send_period_ms * 1000000) - elapsed.tv_nsec; | |
if (sleep_time_ns > 0) | |
do_nanosleep(sleep_time_ns); | |
} while (sent_messages < options->total_messages); | |
return stats; | |
} | |
stats send_loop_batched(server* srv, const program_options* options) | |
{ | |
summary *send_latency = NULL; | |
summary* batch_latency = NULL; | |
stats stats; | |
stats.send_latency = stats.batch_latency = NULL; | |
int ret; | |
size_t sent_messages = 0; | |
size_t batch_size; | |
send_latency = summary_new(options->total_messages); | |
if (!send_latency) | |
{ | |
perror("send_latency"); | |
return stats; | |
} | |
batch_latency = summary_new(options->total_messages); | |
if (!batch_latency) | |
{ | |
perror("batch_latency"); | |
return stats; | |
} | |
stats.send_latency = send_latency; | |
stats.batch_latency = batch_latency; | |
#if defined(WITH_URING) | |
send_func send_func = srv->enable_uring ? &do_uring_sendmsg : &do_sendmsg; | |
#else | |
printf("[Warn] uring not available, forcing sendmsg\n"); | |
send_func send_func = &do_sendmsg; | |
#endif | |
for (size_t i = 0; i < srv->n_clients; ++i) | |
{ | |
char *buffer = malloc(options->buffer_size); | |
memset(buffer, 'a' + i, options->buffer_size); | |
client_data* client = srv->clients[i]; | |
client->iov.iov_base = (void *)buffer; | |
client->iov.iov_len = options->buffer_size; | |
memset(&client->msg, 0, sizeof(struct msghdr)); | |
client->msg.msg_iov = &client->iov; | |
client->msg.msg_iovlen = 1; | |
} | |
#if defined(WITH_URING) | |
ret = register_uring_files(srv); | |
if (ret < 0) | |
return NULL; | |
#endif | |
batch_size = options->batch_size > 0 ? options->batch_size : 1; | |
do | |
{ | |
struct timespec tp; | |
ret = clock_gettime(CLOCK_REALTIME, &tp); | |
if (ret == 0) | |
{ | |
uint64_t nanoseconds_since_epoch = (uint64_t)(tp.tv_sec) * 1000000000UL + (uint64_t)tp.tv_nsec; | |
for (size_t i = 0; i < srv->n_clients; ++i) | |
{ | |
client_data* client = srv->clients[i]; | |
memcpy(client->iov.iov_base, &nanoseconds_since_epoch, sizeof(nanoseconds_since_epoch)); | |
} | |
} | |
cork_clients(options, srv, 1); | |
for (size_t i = 0; i < batch_size; ++i) | |
{ | |
BEGIN_TIMED(send_latency) | |
{ | |
ssize_t ret = (*send_func)(srv); | |
if (ret < 0) | |
{ | |
printf("send error: %s\n", strerror(-ret)); | |
break; | |
} | |
} | |
END_TIMED() | |
++sent_messages; | |
} | |
cork_clients(options, srv, 0); | |
if (options->sleep_ms > 0) | |
do_nanosleep(options->sleep_ms * 1000000); | |
} while (sent_messages < options->total_messages); | |
return stats; | |
} | |
stats send_loop(server* srv, const program_options* options) | |
{ | |
stats stats; | |
if (options->batch_size > 0) | |
stats = send_loop_batched(srv, options); | |
else if (options->message_rate > 0) | |
stats = send_loop_rate(srv, options); | |
return stats; | |
} | |
void accept_loop(server* srv, const program_options* options) | |
{ | |
for (;;) | |
{ | |
struct sockaddr_in client_addr; | |
socklen_t len = sizeof(client_addr); | |
int sockfd = accept(srv->sockfd, (struct sockaddr *) &client_addr, &len); | |
if (sockfd < 0) | |
{ | |
perror("accept"); | |
return; | |
} | |
set_blocking(sockfd, 0); | |
if (set_no_delay(sockfd, options->no_delay) < 0) | |
{ | |
perror("set_no_delay"); | |
break; | |
} | |
if (set_cork(sockfd, options->cork) < 0) | |
{ | |
perror("set_cork"); | |
break; | |
} | |
printf("Accepted connection from %s:%d\n", inet_ntoa(client_addr.sin_addr), client_addr.sin_port); | |
client_data* client = client_new(client_addr, sockfd); | |
if (!client) | |
{ | |
perror("client_new"); | |
break; | |
} | |
client->is_corked = options->cork; | |
srv->clients[srv->n_clients++] = client; | |
size_t remaining_clients = srv->clients_count - srv->n_clients; | |
if (remaining_clients == 0) | |
break; | |
else | |
printf("Waiting for %llu more clients\n", (unsigned long long) remaining_clients); | |
} | |
stats stats = send_loop(srv, options); | |
if (stats.send_latency) | |
{ | |
puts("Send latency report"); | |
report_summary(stats.send_latency); | |
} | |
if (stats.batch_latency) | |
{ | |
puts("batch latency report"); | |
report_summary(stats.batch_latency); | |
} | |
print_tcp_info(srv); | |
} | |
int main(int argc, char* argv[]) | |
{ | |
program_options options = parse_options(argc, argv); | |
if (validate_options(&options) == options_error) | |
return EXIT_FAILURE; | |
options_print(&options); | |
server* srv = server_start(&options); | |
if (!srv) | |
return EXIT_FAILURE; | |
printf("listening on 0.0.0.0:%d\n", options.port); | |
accept_loop(srv, &options); | |
server_stop(srv); | |
return EXIT_SUCCESS; | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment