Created
March 5, 2016 23:50
Revisions
-
voidexp created this gist
Mar 5, 2016 .There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,127 @@ #include "error.h" #include "mem.h" #include "queue.h" #include <assert.h> #include <pthread.h> #include <stdio.h> #include <string.h> #include <time.h> #include <sys/time.h> struct GK_Queue { void *data; size_t len; size_t size; size_t elem_size; pthread_mutex_t mutex; pthread_cond_t cond_has_data; pthread_cond_t cond_full; }; struct GK_Queue* gk_queue_new(size_t elem_size, size_t size) { assert(elem_size > 0); assert(size > 0); struct GK_Queue *q = gk_new(struct GK_Queue); if (!q) return NULL; if (!(q->data = gk_alloc0(elem_size * size))) { gk_free(q); return NULL; } q->size = size; q->elem_size = elem_size; if (pthread_mutex_init(&q->mutex, NULL) != 0) goto mutex_error; if (pthread_cond_init(&q->cond_has_data, NULL) != 0) goto cond_error; return q; cond_error: pthread_mutex_destroy(&q->mutex); mutex_error: gk_free(q->data); gk_free(q); return NULL; } void gk_queue_free(struct GK_Queue *q) { if (q) { pthread_cond_destroy(&q->cond_has_data); pthread_mutex_destroy(&q->mutex); gk_free(q->data); gk_free(q); } } void* gk_queue_get(struct GK_Queue *q) { pthread_mutex_lock(&q->mutex); void *elem = NULL; while (q->len == 0) { // block until the queue has some data in struct timespec ts; struct timeval tv; gettimeofday(&tv, NULL); ts.tv_sec = tv.tv_sec; ts.tv_nsec = tv.tv_usec * 1000 + 1000; pthread_cond_timedwait(&q->cond_has_data, &q->mutex, &ts); } // pick the first element elem = q->data; q->len--; // shift elements in the queue by one position // TODO: implement this better for (size_t i = 0, j = 1; j < q->len; i++, j++) { memcpy( q->data + q->elem_size * i, q->data + q->elem_size * j, q->elem_size ); } pthread_cond_broadcast(&q->cond_full); pthread_mutex_unlock(&q->mutex); return elem; } void gk_queue_add(struct GK_Queue *q, void *elem) { pthread_mutex_lock(&q->mutex); while (q->len == q->size) { // block while the queue is full pthread_cond_wait(&q->cond_full, &q->mutex); } // append the element to the end of the queue and notify memcpy(q->data + q->elem_size * q->len++, elem, q->elem_size); pthread_cond_broadcast(&q->cond_has_data); pthread_mutex_unlock(&q->mutex); } size_t gk_queue_len(struct GK_Queue *q) { size_t len; pthread_mutex_lock(&q->mutex); len = q->len; pthread_mutex_unlock(&q->mutex); return len; } This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,168 @@ #include "mem.h" #include "queue.h" #include "task_queue.h" #include <assert.h> #include <pthread.h> #include <stdbool.h> #include <stdio.h> struct Worker { pthread_t thread; bool initialized; struct GK_TaskQueue *queue; }; struct Task { GK_TaskFunc func; void *data; }; struct GK_TaskQueue { struct GK_Queue *tasks; struct Worker *workers; unsigned num_workers; size_t num_running; pthread_mutex_t mutex; pthread_cond_t cond_running; }; static struct Task* get_task(struct GK_TaskQueue *tq) { struct Task *tsk = gk_queue_get(tq->tasks); pthread_mutex_lock(&tq->mutex); tq->num_running++; pthread_mutex_unlock(&tq->mutex); return tsk; } static void task_done(struct GK_TaskQueue *tq) { pthread_mutex_lock(&tq->mutex); tq->num_running--; pthread_cond_signal(&tq->cond_running); pthread_mutex_unlock(&tq->mutex); } static void* worker(void *arg) { struct Worker *wrk = arg; bool run = true; while (run) { printf("worker %p waits for task\n", wrk->thread); struct Task *tsk = get_task(wrk->queue); printf("worker %p started task\n", wrk->thread); if (tsk && tsk->func) { // TODO: handle return code tsk->func(tsk->data); } else { run = false; } task_done(wrk->queue); printf("worker %p finished task\n", wrk->thread); } return NULL; } struct GK_TaskQueue* gk_task_queue_new(size_t size, unsigned num_workers) { assert(size > 0); assert(num_workers > 0); struct GK_TaskQueue *tq = gk_new(struct GK_TaskQueue); if (!tq) return NULL; if (pthread_mutex_init(&tq->mutex, NULL) != 0) goto mutex_error; if (pthread_cond_init(&tq->cond_running, NULL) != 0) goto cond_error; tq->tasks = gk_queue_new(sizeof(struct Task), size); if (!tq->tasks) goto error; tq->num_workers = num_workers; tq->workers = gk_alloc0(sizeof(struct Worker) * num_workers); if (!tq->workers) goto error; // spawn worker threads for (unsigned i = 0; i < num_workers; i++) { struct Worker *wrk = &tq->workers[i]; wrk->queue = tq; if (pthread_create(&wrk->thread, NULL, worker, wrk) != 0) goto error; wrk->initialized = true; } return tq; error: gk_task_queue_free(tq); return NULL; cond_error: pthread_mutex_destroy(&tq->mutex); mutex_error: gk_free(tq); return NULL; } void gk_task_queue_free(struct GK_TaskQueue *tq) { if (tq) { if (tq->workers) { gk_task_queue_wait(tq); for (unsigned i = 0; i < tq->num_workers; i++) { gk_task_queue_add(tq, NULL, NULL); } for (unsigned i = 0; i < tq->num_workers; i++) { struct Worker *wrk = &tq->workers[i]; if (wrk->initialized) { pthread_join(wrk->thread, NULL); } } gk_free(tq->workers); } pthread_mutex_destroy(&tq->mutex); pthread_cond_destroy(&tq->cond_running); gk_queue_free(tq->tasks); gk_free(tq); } } int gk_task_queue_add(struct GK_TaskQueue *tq, GK_TaskFunc f, void *data) { struct Task task = { .func = f, .data = data }; gk_queue_add(tq->tasks, &task); return 1; } int gk_task_queue_wait(struct GK_TaskQueue *tq) { pthread_mutex_lock(&tq->mutex); while (tq->num_running > 0 || gk_queue_len(tq->tasks) > 0) pthread_cond_wait(&tq->cond_running, &tq->mutex); pthread_mutex_unlock(&tq->mutex); return 1; }