-
-
Save joshrotenberg/943323 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#include <stdio.h> | |
#include <stdlib.h> | |
#include <string.h> | |
#include <unistd.h> | |
#include <assert.h> | |
#include <time.h> | |
#include <stdbool.h> | |
#include <ev.h> | |
#include <evcom.h> | |
#include <sys/types.h> | |
// Load the logic for the config parsing | |
#include "config.c" | |
// Upper bound, in bytes, on a single newline-delimited message (1 MiB).
#define MAX_MESSAGE_LENGTH 1048576

// Messages still to be sent in the current active burst.
int active_messages;

// Outgoing streams, one slot per node and indexed by node id
// (allocated in connect_to_peers).
evcom_stream *streams;

// Output files opened in main: the vector-clock log and the checkpoint log.
FILE *vc_fd;
FILE *checkpoint_fd;

// Per-node bookkeeping arrays; each is allocated with numNodes entries in main.
int *vector_clock;
int *last_label_rcvd;
int *first_label_sent;

// Incremented every time take_checkpoint runs.
int sequenceNo = 0;

// Bytes of the message currently being assembled by on_chunk, and how many
// of them are stored so far.
char buffer[MAX_MESSAGE_LENGTH];
int offset = 0;

// Forward declarations. The empty parameter lists are intentional: these
// functions are also handed to set_timeout as callbacks.
static void go_active();
static void take_checkpoint();
// Stamps `message` with the current vector clock ("clock") and this node's id
// ("from"), logs it to stdout and queues it on `stream` as one line of
// unformatted JSON terminated by '\n'.
//
// Takes ownership of `message`: it is deleted on every path.
static void stream_write(evcom_stream* stream, cJSON* message) {
    cJSON_AddItemToObject(message, "clock", cJSON_CreateIntArray(vector_clock, numNodes));
    cJSON_AddNumberToObject(message, "from", nodeID);

    char* json = cJSON_PrintUnformatted(message);
    cJSON_Delete(message);
    if (json == NULL) {
        fprintf(stderr, "%02d: could not serialize outgoing message\n", nodeID);
        return;
    }

    // Room for the JSON text, the trailing '\n' AND the terminating NUL.
    // (The previous size of strlen(json) + 1 was one byte short.)
    size_t json_len = strlen(json);
    char* out = (char*)malloc(json_len + 2);
    if (out == NULL) {
        fprintf(stderr, "%02d: out of memory while sending message\n", nodeID);
        free(json);
        return;
    }
    memcpy(out, json, json_len);
    out[json_len] = '\n';
    out[json_len + 1] = '\0';

    printf("%02d: >-. %s\n", nodeID, json);
    evcom_stream_write(stream, (void*)out, json_len + 1);
    free(json);

    // NOTE(review): `out` is intentionally not freed, exactly as before.
    // evcom_stream_write presumably copies the bytes into its own queue, in
    // which case free(out) belongs here -- confirm against the evcom sources
    // before enabling it.
}
// Wrapper to make timeouts easy | |
static void set_timeout(void (*callback)(int), int milliseconds) { | |
ev_timer* timer; | |
timer = (ev_timer *)malloc(sizeof(ev_timer)); | |
ev_timer_init(timer, callback, milliseconds / 1000.0, 0); | |
ev_timer_start(EV_DEFAULT_ timer); | |
} | |
// Handles one parsed JSON message received from a peer.
//
// 1. Merges the message's "clock" array into vector_clock (element-wise max).
// 2. If the message has a "to" field, records the sender's clock entry in
//    last_label_rcvd[from] and, when this node is not `active` and still has
//    messages left (maxNumber > 0), starts an active burst.
// 3. If the message has an "llr" field, takes a checkpoint when
//    first_label_sent[from] is greater than the received value.
//
// The message comes from the network, so missing fields and out-of-range
// node ids are tolerated instead of dereferenced. `message` is only read;
// the caller keeps ownership. `stream` is currently unused.
static void on_message(evcom_stream* stream, cJSON* message) {
    (void)stream;
    int i;

    // Merge vector clocks; stop at the first missing entry.
    cJSON* vector = cJSON_GetObjectItem(message, "clock");
    for (i = 0; vector != NULL && i < numNodes; i++) {
        cJSON* entry = cJSON_GetArrayItem(vector, i);
        if (entry == NULL) {
            break;
        }
        if (entry->valueint > vector_clock[i]) {
            vector_clock[i] = entry->valueint;
        }
    }

    // stream_write stamps "from" on every outgoing message; validate it before
    // using it as an array index.
    cJSON* sender = cJSON_GetObjectItem(message, "from");
    int from = (sender != NULL) ? sender->valueint : -1;
    int from_valid = (from >= 0 && from < numNodes);

    if (cJSON_GetObjectItem(message, "to")) {
        if (from_valid) {
            last_label_rcvd[from] = vector_clock[from];
        }
        if (!active && maxNumber > 0) {
            go_active();
        }
    }

    cJSON* llr_item = cJSON_GetObjectItem(message, "llr");
    if (llr_item != NULL && from_valid) {
        // Compare against the sender's slot. The previous code compared the
        // array pointer itself with the integer, which was effectively
        // always true.
        if (first_label_sent[from] > llr_item->valueint) {
            take_checkpoint();
        }
    }
}
// Read callback for peer streams: reassembles raw chunks into
// newline-delimited messages and dispatches each complete one.
//
// stream - stream the bytes arrived on; error replies are written back to it.
// base   - chunk bytes (not NUL-terminated).
// len    - number of bytes in `base`.
//
// '\r' is dropped. On '\n' the accumulated text is parsed as JSON: valid JSON
// goes to on_message, anything else gets an {"error":"Invalid JSON"} reply.
// Bytes beyond MAX_MESSAGE_LENGTH - 1 in one message are discarded with a
// warning.
//
// NOTE(review): `buffer`/`offset` are file-level and shared by every stream
// this callback is installed on, so partial messages from two peers can
// interleave. Fixing that needs per-stream state.
static void on_chunk (evcom_stream *stream, const void *base, size_t len) {
    const char* input = (const char*) base;
    size_t i;

    for (i = 0; i < len; i++) {
        char c = input[i];

        // Ignore \r characters
        if (c == '\r') {
            continue;
        }

        // A newline completes the current message
        if (c == '\n') {
            buffer[offset] = 0;
            offset = 0;

            // cJSON_Parse reads the text without modifying it, so the shared
            // buffer can be parsed in place; no temporary copy is needed.
            cJSON *result = cJSON_Parse(buffer);
            printf("%02d: `-> %s\n", nodeID, buffer);
            if (result == NULL) {
                cJSON* error_message = cJSON_CreateObject();
                cJSON_AddItemToObject(error_message, "error", cJSON_CreateString("Invalid JSON"));
                stream_write(stream, error_message);
                continue;
            }
            on_message(stream, result);
            // on_message only reads the tree; release it here.
            cJSON_Delete(result);
            continue;
        }

        // Keep the last slot free for the terminating NUL written above.
        if (offset < MAX_MESSAGE_LENGTH - 1) {
            buffer[offset] = c;
            offset++;
        } else {
            printf("WARNING: Discarding %c\n", c);
        }
    }
}
// Start up the tcp server | |
void start_tcp_server() { | |
evcom_server *server = (evcom_server*)malloc(sizeof(evcom_server)); | |
evcom_server_init(server); | |
// Nested function for server connection | |
evcom_stream* on_server_connection (evcom_server *server, struct sockaddr *addr) | |
{ | |
assert(server); | |
assert(addr); | |
// This is the stream from a peer. | |
evcom_stream *stream = malloc(sizeof(evcom_stream)); | |
evcom_stream_init(stream); | |
stream->on_read = on_chunk; | |
return stream; | |
} | |
server->on_connection = on_server_connection; | |
int r = evcom_server_listen(server, (struct sockaddr*)&addresses[nodeID], 10); | |
assert(r == 0); | |
evcom_server_attach(EV_DEFAULT_ server); | |
printf("%02d: Server listening on port %d\n", nodeID, hostPort); | |
} | |
// Prints the command-line synopsis to stdout, using argv[0] as the program
// name. Always returns 1 so callers can write `return usage(argv);`.
int usage(char const *argv[]) {
    char const *program = argv[0];

    fprintf(stdout, "Usage:\n\t%s config_file nodeID\n", program);
    return 1;
}
// Connect to our peer stations and call "callback" when done. | |
static void connect_to_peers() { | |
int i; | |
streams = (evcom_stream *)malloc(sizeof(evcom_stream) * numNodes); | |
for (i = 0; i < numNeighbors; i++) { | |
int j = neighbors[i]; | |
evcom_stream_init(&streams[j]); | |
assert(EVCOM_INITIALIZED == evcom_stream_state(&streams[j])); | |
printf("%02d: Connecting to peer %02d.\n", nodeID, j); | |
int r = evcom_stream_connect(&streams[j], (struct sockaddr*)&addresses[j]); | |
assert(r == 0); | |
evcom_stream_attach(EV_DEFAULT_ &streams[j]); | |
} | |
} | |
static void done() { | |
// Close up. | |
printf("%02d: Done\n", nodeID); | |
// closeLog(); | |
exit(0); | |
} | |
static void send_message() { | |
if (maxNumber == 0) { | |
set_timeout(&done, 1000); | |
return; | |
} | |
if (active_messages == 0) { return; } | |
int target = neighbors[rand() % numNeighbors]; | |
cJSON* message = cJSON_CreateObject(); | |
cJSON_AddNumberToObject(message, "to", target); | |
vector_clock[nodeID]++; | |
first_label_sent[target] = vector_clock[nodeID] + 1; | |
stream_write(&streams[target], message); | |
active_messages--; | |
maxNumber--; | |
set_timeout(&send_message, minSendDelay); | |
} | |
static void go_active() { | |
active_messages = rand() % (maxPerActive - minPerActive + 1) + minPerActive; | |
send_message(); | |
} | |
static void on_warm() { | |
if (active) { | |
go_active(); | |
} | |
} | |
static void take_checkpoint() { | |
sequenceNo++; | |
printf("%02d: Take Checkpoint!\n", nodeID); | |
char* list = cJSON_PrintUnformatted(cJSON_CreateIntArray(vector_clock, numNodes)); | |
fprintf(vc_fd,"%d\t\t\t%s\n", sequenceNo, list); | |
cJSON* checkpoint = cJSON_CreateObject(); | |
cJSON_AddItemToObject(checkpoint, "clock", cJSON_CreateIntArray(vector_clock, numNodes)); | |
cJSON_AddItemToObject(checkpoint, "llr", cJSON_CreateIntArray(last_label_rcvd, numNodes)); | |
cJSON_AddItemToObject(checkpoint, "fls", cJSON_CreateIntArray(first_label_sent, numNodes)); | |
fprintf(checkpoint_fd,"%d\t\t\t%s\t\t\t%s\n", sequenceNo, list, cJSON_PrintUnformatted(checkpoint)); | |
int i; | |
for (i = 0; i < numNeighbors; i++) { | |
int target = neighbors[i]; | |
if (last_label_rcvd[target] > 0) { | |
cJSON* message = cJSON_CreateObject(); | |
cJSON_AddNumberToObject(message, "llr", last_label_rcvd[target]); | |
stream_write(&streams[target], message); | |
} | |
} | |
// Reset history | |
for (i = 0; i < numNodes; i++) { | |
last_label_rcvd[i] = 0; | |
first_label_sent[i] = 0; | |
} | |
} | |
int main (int argc, char const *argv[]) { | |
// Ensure there are 2 arguments on the command line | |
if (argc != 3) return usage(argv); | |
// Ensure the nodeID is an integer | |
nodeID = atoi(argv[2]); | |
// Set up system | |
load_config(argv[1]); | |
char vc_filename[9]; | |
char checkpoint_filename[18]; | |
sprintf(vc_filename, "vc%02d.txt", nodeID); | |
sprintf(checkpoint_filename, "checkpoints%02d.txt", nodeID); | |
vc_fd = fopen(vc_filename, "w"); | |
checkpoint_fd = fopen(checkpoint_filename, "w"); | |
// Initialize the clock and the checkpoint vars | |
int i; | |
vector_clock = (int*)malloc(sizeof(int) * numNodes); | |
last_label_rcvd = (int*)malloc(sizeof(int) * numNodes); | |
first_label_sent = (int*)malloc(sizeof(int) * numNodes); | |
for (i = 0; i < numNodes; i++) { | |
vector_clock[i] = 0; | |
last_label_rcvd[i] = 0; | |
first_label_sent[i] = 0; | |
} | |
// initLog(); | |
start_tcp_server(); | |
// Prime the random number generator | |
srand(10); | |
srand(rand()); | |
set_timeout(&take_checkpoint, instDelay + 3000); | |
set_timeout(&on_warm, 3000); | |
set_timeout(&connect_to_peers, 1500); | |
ev_loop(EV_DEFAULT_ 0); | |
return 0; | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment