Created
January 12, 2021 16:37
-
-
Save jstimpfle/c79cae3b71bf390ebbed598034232114 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// | |
// Baseline code (compiles on Linux) for sending UDP packets at a relatively | |
// high rate. With some modifications, this will be suited as a starting point | |
// to experiment with UDP-based transfer protocols. | |
// | |
// Achieves ~350 MiB/s (350K packets/s) on my laptop from 2011 (Lenovo x220, | |
// Sandy Bridge i5-2520M). | |
// | |
// Compile this as a binary "test" and run two instances in parallel (in two | |
// terminals). | |
// | |
// ./test receiver | |
// ./test sender | |
// | |
// 2021, Jens Stimpfle | |
// | |
#define _GNU_SOURCE // sendmmsg(), recvmmsg() | |
#define _POSIX_C_SOURCE 200809L | |
#include <assert.h> | |
#include <errno.h> | |
#include <stdarg.h> | |
#include <stdint.h> | |
#include <stdio.h> | |
#include <stdlib.h> | |
#include <string.h> | |
#include <fcntl.h> | |
#include <sys/types.h> | |
#include <sys/socket.h> | |
#include <netdb.h> | |
#include <unistd.h> | |
// Copy `count` elements from `src` to `dst` (regions must not overlap).
// The pointer comparison inside sizeof is never evaluated; it only makes the
// compiler diagnose incompatible element types. Every argument is
// parenthesized and `dst`/`src` are evaluated exactly once.
#define ARRAY_COPY(dst, src, count) \
	((void) sizeof ((dst) == (src)) /* type check */, \
	 memcpy((dst), (src), (count) * sizeof *(dst)))
// Print one diagnostic line to stderr: the text produced by `fmt`/`ap`,
// followed by a newline. The caller still owns (and must va_end) `ap`.
void net_msg_vf(const char *fmt, va_list ap)
{
	(void) vfprintf(stderr, fmt, ap);
	fputc('\n', stderr);
}
// printf-style diagnostic: formats the arguments and forwards them to
// net_msg_vf(), which writes them plus a newline to stderr.
void net_msg_f(const char *fmt, ...)
{
	va_list args;

	va_start(args, fmt);
	net_msg_vf(fmt, args);
	va_end(args);
}
// Print a printf-style diagnostic line to stderr (via net_msg_vf()) and then
// terminate the process with abort(). Does not return.
void net_fatal_f(const char *fmt, ...)
{
	va_list args;

	va_start(args, fmt);
	net_msg_vf(fmt, args);
	va_end(args);

	abort();
}
// Allocate `count` zero-initialized elements of `elem_size` bytes each.
// Aborts via net_fatal_f("OOM") when the allocation fails. For a zero-sized
// request calloc() is allowed to return NULL; that is passed through instead
// of being reported as out-of-memory (net_free(NULL) is harmless).
static inline void *_net_xcalloc(size_t count, size_t elem_size)
{
	void *ptr = calloc(count, elem_size);
	if (ptr == NULL && count != 0 && elem_size != 0)
	{
		net_fatal_f("OOM");
	}
	return ptr;
}
// Counterpart of _net_xcalloc()/net_xcalloc(): releases the memory. A NULL
// pointer is accepted, exactly as for free().
static inline void net_free(void *ptr) { free(ptr); }
// Typed front end for _net_xcalloc(): allocates `count` zero-initialized
// objects of `type` and yields a `type *`. Example: net_xcalloc(n, struct Foo).
#define net_xcalloc(count, type) \
	((type *) _net_xcalloc((count), sizeof (type)))
// ---------------------------------------------------------------------------
// Time
// ---------------------------------------------------------------------------
#include <errno.h>
#include <inttypes.h>
#include <stdint.h>
#include <time.h>

// A point in time as reported by clock_gettime(): seconds + nanoseconds.
typedef struct timespec Timestamp;
// Format a duration of `us` microseconds as seconds with millisecond
// precision, e.g. 1234567 -> "1.234s", -1500000 -> "-1.500s"; digits below a
// millisecond are truncated. At most `size` bytes (including the terminating
// NUL) are written to `out`; nothing is written when `size` <= 0.
static inline void format_time_delta(int64_t us, char *out, int size)
{
	if (size <= 0)
	{
		return;
	}
	// Format the magnitude and print the sign separately, so negative values
	// don't come out as "0.-500s". Unsigned negation is well-defined even for
	// INT64_MIN.
	uint64_t magnitude = us < 0 ? (uint64_t) 0 - (uint64_t) us : (uint64_t) us;
	snprintf(out, (size_t) size, "%s%" PRIu64 ".%03" PRIu64 "s",
		us < 0 ? "-" : "",
		magnitude / 1000000, (magnitude % 1000000) / 1000);
}
static inline Timestamp get_timestamp(void) | |
{ | |
Timestamp ts; | |
int r = clock_gettime(CLOCK_MONOTONIC_RAW, &ts); | |
if (r != 0) { | |
net_msg_f("clock_gettime() failed: %s", strerror(errno)); | |
abort(); | |
} | |
return ts; | |
} | |
// Return t1 - t2 in whole microseconds, truncated toward zero (negative when
// t1 is earlier than t2).
static inline int64_t timestamp_diff_microsecs(Timestamp t1, Timestamp t2)
{
	// Combine seconds and nanoseconds before dividing. Dividing the nanosecond
	// difference on its own truncates in the wrong direction when it has the
	// opposite sign of the seconds difference: {1s, 0ns} - {0s, 999999500ns}
	// used to give 1us although the true difference is 500ns.
	int64_t nanosecs = ((int64_t) t1.tv_sec - t2.tv_sec) * (int64_t) 1000000000LL
		+ ((int64_t) t1.tv_nsec - t2.tv_nsec);
	return nanosecs / 1000;
}
// Return t1 - t2 in nanoseconds (negative when t1 is earlier than t2).
static inline int64_t timestamp_diff_nanosecs(Timestamp t1, Timestamp t2)
{
	const int64_t sec_delta = (int64_t) t1.tv_sec - t2.tv_sec;
	const int64_t nsec_delta = (int64_t) t1.tv_nsec - t2.tv_nsec;
	return sec_delta * (int64_t) 1000000000LL + nsec_delta;
}
// Return 1 if t1 is strictly earlier than t2, else 0: seconds are compared
// first, nanoseconds only break ties.
// NOTE(review): no wraparound handling (the original author left this as an
// open question); assumes normalized timespecs.
static inline int timestamp_less(Timestamp t1, Timestamp t2)
{
	if (t1.tv_sec != t2.tv_sec)
		return t1.tv_sec < t2.tv_sec;
	return t1.tv_nsec < t2.tv_nsec;
}
// Scoped timer. `MEASURE("label") { body }` runs `body` exactly once and then
// prints the elapsed time (taken with get_timestamp()) to stderr in
// microseconds. If `body` leaves via `break`, nothing is printed.
// `caption` is currently not used.
struct TimeMeasure { Timestamp ts, ts2; int active; };
#define MEASURE(caption) \
	for (struct TimeMeasure tm = {0}; \
	     tm.active++ == 0 && ((tm.ts = get_timestamp()), 1); \
	     tm.ts2 = get_timestamp(), \
	     /* include tv_sec, so intervals crossing a second boundary are right */ \
	     fprintf(stderr, "it took %lldus\n", \
	             (long long) ((((int64_t) tm.ts2.tv_sec - tm.ts.tv_sec) * 1000000000 \
	                           + ((int64_t) tm.ts2.tv_nsec - tm.ts.tv_nsec)) / 1000)))
// IPv4 endpoint in host byte order. _netaddress_to_sockaddr_in() and
// _sockaddr_in_to_netaddress() convert to/from network byte order.
// Unsigned fields match htonl()/htons() and can represent every address and
// every port 0..65535 (int16_t could not hold ports above 32767).
struct NetAddress
{
	uint32_t ip_number;    // IPv4 address, host byte order
	uint16_t port_number;  // UDP port, host byte order
};
// Build an AF_INET sockaddr_in from a host-order NetAddress. Address and port
// are converted to network byte order; all other fields are zero.
static struct sockaddr_in _netaddress_to_sockaddr_in(struct NetAddress netaddress)
{
	struct sockaddr_in result = {
		.sin_family = AF_INET,
		.sin_port = htons(netaddress.port_number),
		.sin_addr = { .s_addr = htonl(netaddress.ip_number) },
	};
	return result;
}
static struct NetAddress _sockaddr_in_to_netaddress(struct sockaddr_in *addr) | |
{ | |
struct NetAddress naddr = {0}; | |
naddr.ip_number = ntohl(addr->sin_addr.s_addr); | |
naddr.port_number = ntohs(addr->sin_port); | |
return naddr; | |
} | |
// Resolve `node`/`service` (e.g. "127.0.0.1", "9009") to an IPv4 UDP address
// and store the first match in *out. Returns 1 on success. Returns 0 on
// failure, after printing a diagnostic to stderr; *out is only written on
// success.
int net_address_parse(const char *node, const char *service, struct NetAddress *out)
{
	// Ask the resolver for exactly what we can use. Explicit hints also avoid
	// glibc's defaults for hints == NULL, which set
	// ai_flags = AI_V4MAPPED | AI_ADDRCONFIG.
	struct addrinfo hints = {0};
	hints.ai_family = AF_INET;
	hints.ai_socktype = SOCK_DGRAM;
	hints.ai_protocol = IPPROTO_UDP;

	struct addrinfo *res = NULL;
	int r = getaddrinfo(node, service, &hints, &res);
	if (r != 0)
	{
		net_msg_f("getaddrinfo() did not find any suitable address: %s", gai_strerror(r));
		return 0;
	}

	int found = 0;
	for (struct addrinfo *ai = res; ai != NULL && !found; ai = ai->ai_next)
	{
		// Keep the original filters as a safety net. The length check replaces
		// an assert() that would disappear under NDEBUG; a too-short ai_addr
		// must never be read as a sockaddr_in.
		if (ai->ai_family == AF_INET
			&& ai->ai_socktype == SOCK_DGRAM
			&& ai->ai_protocol == IPPROTO_UDP
			&& ai->ai_addrlen == sizeof (struct sockaddr_in))
		{
			*out = _sockaddr_in_to_netaddress((struct sockaddr_in *) ai->ai_addr);
			found = 1;
		}
	}
	freeaddrinfo(res);

	if (!found)
	{
		net_msg_f("getaddrinfo() did not find any suitable address");
	}
	return found;
}
// Size in bytes of each packet's data buffer; main() sends datagrams of this size.
enum { NET_FRAME_SIZE = 1024 };
// Header meant to sit at the start of NetPacket::data for a reliable protocol
// on top of UDP (not yet used by the code in this file).
// NOTE(review): fields are in host byte order and the compiler may insert
// padding; if this becomes a wire format, serialize it explicitly.
struct NetReliableHeader
{
	uint32_t sn, ack_sn;    // own sequence number, acknowledged sequence number
	uint16_t payload_size;  // presumably the number of valid bytes in `payload`
	uint8_t payload[];
};
// A structure suitable for sending and receiving packets | |
struct NetPacket | |
{ | |
struct NetAddress peer_address; | |
Timestamp last_send_time; | |
uint16_t num_send_attempts; | |
uint16_t data_size; | |
char data[]; // for example a NetReliableHeader | |
}; | |
// Ring buffer of NetPacket pointers laid out for Linux recvmmsg()/sendmmsg().
// Slot i of `packets`, `mmsghdrs`, `iovecs` and `sockaddrs` all describe the
// same packet, so a contiguous run of slots can go into a single syscall.
struct NetPacketQueue
{
	struct NetPacket **packets;     // queued packet per slot
	struct mmsghdr *mmsghdrs;       // message header per slot (msg_iov points at iovecs[i])
	struct iovec *iovecs;           // data buffer per slot
	struct sockaddr_in *sockaddrs;  // peer address per slot, filled on enqueue
	int capacity;                   // number of slots in each array
	int start_index;                // slot holding the oldest queued packet
	int fill_count;                 // number of packets currently queued
};
// Allocate an empty queue; it has no slots until net_packet_queue_set_capacity().
struct NetPacketQueue *net_packet_queue_create(void);
// Free a queue obtained from net_packet_queue_create().
void net_packet_queue_destroy(struct NetPacketQueue *queue);
// Give the queue `capacity` slots (a one-time operation).
void net_packet_queue_set_capacity(struct NetPacketQueue *queue, int capacity);
// Append `packet`; returns 1 on success, 0 when the queue is full.
int net_packet_queue_enqueue(struct NetPacketQueue *queue, struct NetPacket *packet);
// Send (is_send != 0) or receive up to `count` queued packets on `sockfd`,
// storing the dequeued packets in packets_return; returns how many were shipped.
int net_packet_queue_send_or_recv(struct NetPacketQueue *queue, int sockfd,
		struct NetPacket **packets_return, int count, int is_send);
struct NetPacketQueue *net_packet_queue_create(void) | |
{ | |
return net_xcalloc(1, struct NetPacketQueue); | |
} | |
void net_packet_queue_destroy(struct NetPacketQueue *queue) | |
{ | |
return net_free(queue); | |
} | |
void net_packet_queue_set_capacity(struct NetPacketQueue *queue, int capacity) | |
{ | |
if (queue->capacity > 0) | |
{ | |
net_fatal_f("Not supported."); | |
} | |
queue->packets = net_xcalloc(capacity, struct NetPacket *); | |
queue->mmsghdrs = net_xcalloc(capacity, struct mmsghdr); | |
queue->iovecs = net_xcalloc(capacity, struct iovec); | |
queue->sockaddrs = net_xcalloc(capacity, struct sockaddr_in); | |
queue->capacity = capacity; | |
for (int i = 0; i < capacity; i++) | |
{ | |
struct mmsghdr *hdr = queue->mmsghdrs + i; | |
struct iovec *iov = queue->iovecs + i; | |
struct sockaddr_in *saddr = queue->sockaddrs + i; | |
memset(hdr, 0, sizeof *hdr); | |
//hdr->msg_hdr.msg_name = saddr; | |
hdr->msg_hdr.msg_namelen = sizeof *saddr; | |
hdr->msg_hdr.msg_iov = iov; | |
hdr->msg_hdr.msg_iovlen = 1; | |
} | |
} | |
int net_packet_queue_enqueue(struct NetPacketQueue *queue, struct NetPacket *packet) | |
{ | |
if (queue->fill_count == queue->capacity) | |
{ | |
return 0; | |
} | |
int index = queue->start_index + queue->fill_count; | |
if (index >= queue->capacity) | |
{ | |
index -= queue->capacity; | |
} | |
assert(0 <= index && index < queue->capacity); | |
queue->fill_count += 1; | |
queue->packets[index] = packet; | |
queue->iovecs[index].iov_base = packet->data; | |
queue->iovecs[index].iov_len = packet->data_size; | |
queue->sockaddrs[index] = _netaddress_to_sockaddr_in(packet->peer_address); // only needed if this is a send queue | |
return 1; | |
} | |
int net_packet_queue_send_or_recv(struct NetPacketQueue *queue, int sockfd, struct NetPacket **packets_return, int count, int is_send) | |
{ | |
int num_todo = count; | |
if (num_todo > queue->fill_count) | |
{ | |
num_todo = queue->fill_count; | |
} | |
int num_done = 0; | |
// We need a loop because the queue is implemented using a ringbuffer | |
// There will be at most 2 iterations. | |
while (num_done < num_todo) | |
{ | |
int num_ship = queue->capacity - queue->start_index; | |
if (num_ship > queue->fill_count) | |
{ | |
num_ship = queue->fill_count; | |
} | |
int r; | |
if (is_send) | |
{ | |
r = sendmmsg(sockfd, queue->mmsghdrs + queue->start_index, num_ship, 0); | |
//net_msg_f("sendmmsg(sockfd=%d, num_ship=%d) = %d", sockfd, num_ship, r); | |
} | |
else | |
{ | |
#if 0 | |
// wait at most 100 usecs. If we don't specify an explicit time, | |
// Linux will only give us 5-10 messages per call :-( | |
struct timespec max_time = { 0, 100000 }; | |
r = recvmmsg(sockfd, queue->mmsghdrs + queue->start_index, num_ship, 0, &max_time); | |
#else | |
r = recvmmsg(sockfd, queue->mmsghdrs + queue->start_index, num_ship, 0, NULL); | |
#endif | |
//net_msg_f("recvmmsg(): num_ship==%d. r=%d", num_ship, r); | |
} | |
if (r < 0) | |
{ | |
if (errno == EAGAIN) | |
{ | |
break; | |
} | |
else | |
{ | |
net_fatal_f("Unhandled error: %s", strerror(errno)); | |
} | |
} | |
ARRAY_COPY(packets_return + num_done, queue->packets + queue->start_index, num_ship); | |
num_done += r; | |
queue->fill_count -= r; | |
queue->start_index += r; | |
if (queue->start_index == queue->capacity) | |
{ | |
queue->start_index = 0; | |
} | |
assert(queue->start_index < queue->capacity); | |
// This loop is strictly to handle the wraparound - we don't | |
// want to end up shipping many small batches. So if less | |
// packetes were shipped than requested, we need to break and | |
// come back later. | |
if (r != num_ship) | |
{ | |
break; | |
} | |
} | |
return num_done; | |
} | |
// A UDP endpoint; the only state is the (non-blocking) socket descriptor.
struct NetConn { int sockfd; };
// Create an IPv4 datagram socket with O_NONBLOCK set. Returns the descriptor,
// or -1 on failure with errno describing the failing call (socket() or fcntl()).
int _net_make_nonblocking_socket(void)
{
	int sockfd = socket(AF_INET, SOCK_DGRAM, 0);
	if (sockfd < 0)
	{
		return -1;
	}
	int flags = fcntl(sockfd, F_GETFL, 0);
	if (flags == -1 || fcntl(sockfd, F_SETFL, flags | O_NONBLOCK) == -1)
	{
		// close() may overwrite errno; keep the fcntl() error for the caller.
		int saved_errno = errno;
		close(sockfd);
		errno = saved_errno;
		return -1;
	}
	return sockfd;
}
struct NetConn *net_conn_create(void) | |
{ | |
struct NetConn *conn = net_xcalloc(1, struct NetConn); | |
conn->sockfd = _net_make_nonblocking_socket(); | |
return conn; | |
} | |
void net_conn_destroy(struct NetConn *conn) | |
{ | |
close(conn->sockfd); | |
net_free(conn); | |
} | |
void net_conn_connect(struct NetConn *conn, struct NetAddress server_address) | |
{ | |
struct sockaddr_in saddr = _netaddress_to_sockaddr_in(server_address); | |
int r = connect(conn->sockfd, (struct sockaddr *) &saddr, sizeof saddr); | |
if (r < 0) | |
{ | |
net_fatal_f("Failed to connect(): %s", strerror(errno)); | |
} | |
} | |
void net_conn_bind(struct NetConn *conn, struct NetAddress local_address) | |
{ | |
struct sockaddr_in saddr = _netaddress_to_sockaddr_in(local_address); | |
int r = bind(conn->sockfd, (struct sockaddr *) &saddr, sizeof saddr); | |
if (r < 0) | |
{ | |
net_fatal_f("Failed to bind(): %s", strerror(errno)); | |
} | |
} | |
// Print the command-line synopsis to stderr and exit with status 1.
void print_usage_and_exit(void)
{
	static const char usage[] = "Usage: ./test {sender|receiver}";
	net_msg_f("%s", usage);
	exit(1);
}
int main(int argc, const char **argv) | |
{ | |
if (argc != 2) | |
{ | |
print_usage_and_exit(); | |
} | |
int is_sender; | |
if (!strcmp(argv[1], "sender")) | |
{ | |
is_sender = 1; | |
} | |
else if (!strcmp(argv[1], "receiver")) | |
{ | |
is_sender = 0; | |
} | |
else | |
{ | |
print_usage_and_exit(); | |
} | |
struct NetAddress server_address; | |
if (!net_address_parse("127.0.0.1", "9009", &server_address)) | |
{ | |
net_fatal_f("Failed to parse NetAddress"); | |
} | |
struct NetConn *conn = net_conn_create(); | |
struct NetPacketQueue *pkq = net_packet_queue_create(); | |
if (is_sender) | |
{ | |
net_conn_connect(conn, server_address); | |
} | |
else | |
{ | |
net_conn_bind(conn, server_address); | |
} | |
net_packet_queue_set_capacity(pkq, 1024); // UIO_MAXIOV is currently 1024, so it doesn't make a lot of sense to go beyond that | |
int free_packets_capacity = 1024; | |
int free_packets_count = free_packets_capacity; | |
struct NetPacket **free_packets_stack = net_xcalloc(free_packets_capacity, struct NetPacket *); | |
void *packets_memory = _net_xcalloc(free_packets_capacity, sizeof (struct NetPacket) + NET_FRAME_SIZE); | |
for (int i = 0; i < free_packets_capacity; i++) | |
{ | |
struct NetPacket *packet = (void *) ((char *) packets_memory + i * (sizeof (struct NetPacket) + NET_FRAME_SIZE)); | |
free_packets_stack[i] = packet; | |
packet->data_size = NET_FRAME_SIZE; | |
// Fill the packet payload with some fake data. Zeroed data | |
// works as well, but when it's written to the terminal one | |
// doesn't notice because no characters appear. Which is a | |
// problem because the terminal is typically the bottleneck, so | |
// one wants to know when the data is being written there. | |
int index = 0; | |
for (int j = 0; index < NET_FRAME_SIZE; j = (j + 1) % 26) | |
{ | |
char c = 'A' + j; | |
int stop = index + 32; | |
if (stop > NET_FRAME_SIZE) | |
{ | |
stop = NET_FRAME_SIZE; | |
} | |
memset((char *)packet->data + index, c, stop - index); | |
index = stop; | |
if (index > 0) | |
{ | |
packet->data[index - 1] = '\n'; | |
} | |
} | |
} | |
uint64_t num_iters = 0; | |
uint64_t num_packets_sent = 0; | |
uint64_t num_packets_received = 0; | |
for (;;) | |
{ | |
++ num_iters; | |
static struct timespec last_log; | |
struct timespec now; | |
clock_gettime(CLOCK_MONOTONIC_RAW, &now); | |
if (((uint64_t) 1000000000LL * (uint64_t) now.tv_sec + (uint64_t) now.tv_nsec) | |
- ((uint64_t) 1000000000LL * (uint64_t) last_log.tv_sec + (uint64_t) last_log.tv_nsec) | |
> 1000000000LL) | |
{ | |
last_log = now; | |
net_msg_f("\n\nnow %" PRIu64 " iterations", num_iters); | |
net_msg_f("%" PRIu64 " packets sent", num_packets_sent); | |
net_msg_f("%" PRIu64 " packets received", num_packets_received); | |
} | |
// Put as many packets in the ship queue as possible. | |
// XXX: we're not actually sending anything useful, for now. | |
while (free_packets_count > 0 && net_packet_queue_enqueue(pkq, free_packets_stack[free_packets_count - 1])) | |
{ | |
-- free_packets_count; | |
} | |
// try to ship | |
{ | |
int num = net_packet_queue_send_or_recv(pkq, conn->sockfd, | |
free_packets_stack + free_packets_count, | |
free_packets_capacity - free_packets_count, | |
is_sender); | |
for (int i = free_packets_count; i < free_packets_count + num; i++) | |
{ | |
struct NetPacket *packet = free_packets_stack[i]; | |
fwrite(packet->data, 1, packet->data_size, stdout); | |
} | |
free_packets_count += num; | |
if (is_sender) | |
{ | |
num_packets_sent += num; | |
} | |
else | |
{ | |
num_packets_received += num; | |
} | |
//net_msg_f("%s %d packets", is_sender ? "sent out" : "received", num); | |
} | |
{ | |
struct timespec ts = { 0, 100000 }; | |
nanosleep(&ts, NULL); | |
} | |
} | |
return 0; | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment