Skip to content

Instantly share code, notes, and snippets.

@voidexp
Created March 5, 2016 23:50

Revisions

  1. voidexp created this gist Mar 5, 2016.
    127 changes: 127 additions & 0 deletions queue.c
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,127 @@
    #include "error.h"
    #include "mem.h"
    #include "queue.h"
    #include <assert.h>
    #include <pthread.h>
    #include <stdio.h>
    #include <string.h>
    #include <time.h>
    #include <sys/time.h>

    struct GK_Queue {
    void *data;
    size_t len;
    size_t size;
    size_t elem_size;
    pthread_mutex_t mutex;
    pthread_cond_t cond_has_data;
    pthread_cond_t cond_full;
    };

    struct GK_Queue*
    gk_queue_new(size_t elem_size, size_t size)
    {
    assert(elem_size > 0);
    assert(size > 0);

    struct GK_Queue *q = gk_new(struct GK_Queue);
    if (!q)
    return NULL;

    if (!(q->data = gk_alloc0(elem_size * size))) {
    gk_free(q);
    return NULL;
    }
    q->size = size;
    q->elem_size = elem_size;

    if (pthread_mutex_init(&q->mutex, NULL) != 0)
    goto mutex_error;

    if (pthread_cond_init(&q->cond_has_data, NULL) != 0)
    goto cond_error;

    return q;

    cond_error:
    pthread_mutex_destroy(&q->mutex);

    mutex_error:
    gk_free(q->data);
    gk_free(q);
    return NULL;
    }

    void
    gk_queue_free(struct GK_Queue *q)
    {
    if (q) {
    pthread_cond_destroy(&q->cond_has_data);
    pthread_mutex_destroy(&q->mutex);
    gk_free(q->data);
    gk_free(q);
    }
    }

    void*
    gk_queue_get(struct GK_Queue *q)
    {
    pthread_mutex_lock(&q->mutex);

    void *elem = NULL;

    while (q->len == 0) {
    // block until the queue has some data in
    struct timespec ts;
    struct timeval tv;
    gettimeofday(&tv, NULL);
    ts.tv_sec = tv.tv_sec;
    ts.tv_nsec = tv.tv_usec * 1000 + 1000;
    pthread_cond_timedwait(&q->cond_has_data, &q->mutex, &ts);
    }

    // pick the first element
    elem = q->data;
    q->len--;

    // shift elements in the queue by one position
    // TODO: implement this better
    for (size_t i = 0, j = 1; j < q->len; i++, j++) {
    memcpy(
    q->data + q->elem_size * i,
    q->data + q->elem_size * j,
    q->elem_size
    );
    }
    pthread_cond_broadcast(&q->cond_full);

    pthread_mutex_unlock(&q->mutex);
    return elem;
    }

    void
    gk_queue_add(struct GK_Queue *q, void *elem)
    {
    pthread_mutex_lock(&q->mutex);

    while (q->len == q->size) {
    // block while the queue is full
    pthread_cond_wait(&q->cond_full, &q->mutex);
    }

    // append the element to the end of the queue and notify
    memcpy(q->data + q->elem_size * q->len++, elem, q->elem_size);
    pthread_cond_broadcast(&q->cond_has_data);

    pthread_mutex_unlock(&q->mutex);
    }

    size_t
    gk_queue_len(struct GK_Queue *q)
    {
    size_t len;
    pthread_mutex_lock(&q->mutex);
    len = q->len;
    pthread_mutex_unlock(&q->mutex);
    return len;
    }
    168 changes: 168 additions & 0 deletions task_queue.c
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,168 @@
    #include "mem.h"
    #include "queue.h"
    #include "task_queue.h"
    #include <assert.h>
    #include <pthread.h>
    #include <stdbool.h>
    #include <stdio.h>

    struct Worker {
    pthread_t thread;
    bool initialized;
    struct GK_TaskQueue *queue;
    };

    struct Task {
    GK_TaskFunc func;
    void *data;
    };

    struct GK_TaskQueue {
    struct GK_Queue *tasks;
    struct Worker *workers;
    unsigned num_workers;
    size_t num_running;
    pthread_mutex_t mutex;
    pthread_cond_t cond_running;
    };

    static struct Task*
    get_task(struct GK_TaskQueue *tq)
    {
    struct Task *tsk = gk_queue_get(tq->tasks);

    pthread_mutex_lock(&tq->mutex);
    tq->num_running++;
    pthread_mutex_unlock(&tq->mutex);

    return tsk;
    }

    static void
    task_done(struct GK_TaskQueue *tq)
    {
    pthread_mutex_lock(&tq->mutex);
    tq->num_running--;
    pthread_cond_signal(&tq->cond_running);
    pthread_mutex_unlock(&tq->mutex);
    }

    static void*
    worker(void *arg)
    {
    struct Worker *wrk = arg;

    bool run = true;
    while (run) {
    printf("worker %p waits for task\n", wrk->thread);
    struct Task *tsk = get_task(wrk->queue);
    printf("worker %p started task\n", wrk->thread);
    if (tsk && tsk->func) {
    // TODO: handle return code
    tsk->func(tsk->data);
    } else {
    run = false;
    }
    task_done(wrk->queue);
    printf("worker %p finished task\n", wrk->thread);
    }
    return NULL;
    }

    struct GK_TaskQueue*
    gk_task_queue_new(size_t size, unsigned num_workers)
    {
    assert(size > 0);
    assert(num_workers > 0);

    struct GK_TaskQueue *tq = gk_new(struct GK_TaskQueue);
    if (!tq)
    return NULL;

    if (pthread_mutex_init(&tq->mutex, NULL) != 0)
    goto mutex_error;

    if (pthread_cond_init(&tq->cond_running, NULL) != 0)
    goto cond_error;

    tq->tasks = gk_queue_new(sizeof(struct Task), size);
    if (!tq->tasks)
    goto error;

    tq->num_workers = num_workers;
    tq->workers = gk_alloc0(sizeof(struct Worker) * num_workers);
    if (!tq->workers)
    goto error;

    // spawn worker threads
    for (unsigned i = 0; i < num_workers; i++) {
    struct Worker *wrk = &tq->workers[i];
    wrk->queue = tq;

    if (pthread_create(&wrk->thread, NULL, worker, wrk) != 0)
    goto error;

    wrk->initialized = true;
    }

    return tq;

    error:
    gk_task_queue_free(tq);
    return NULL;

    cond_error:
    pthread_mutex_destroy(&tq->mutex);

    mutex_error:
    gk_free(tq);
    return NULL;
    }

    void
    gk_task_queue_free(struct GK_TaskQueue *tq)
    {
    if (tq) {
    if (tq->workers) {
    gk_task_queue_wait(tq);

    for (unsigned i = 0; i < tq->num_workers; i++) {
    gk_task_queue_add(tq, NULL, NULL);
    }

    for (unsigned i = 0; i < tq->num_workers; i++) {
    struct Worker *wrk = &tq->workers[i];
    if (wrk->initialized) {
    pthread_join(wrk->thread, NULL);
    }
    }
    gk_free(tq->workers);
    }
    pthread_mutex_destroy(&tq->mutex);
    pthread_cond_destroy(&tq->cond_running);
    gk_queue_free(tq->tasks);
    gk_free(tq);
    }
    }

    int
    gk_task_queue_add(struct GK_TaskQueue *tq, GK_TaskFunc f, void *data)
    {
    struct Task task = {
    .func = f,
    .data = data
    };
    gk_queue_add(tq->tasks, &task);

    return 1;
    }

    int
    gk_task_queue_wait(struct GK_TaskQueue *tq)
    {
    pthread_mutex_lock(&tq->mutex);
    while (tq->num_running > 0 || gk_queue_len(tq->tasks) > 0)
    pthread_cond_wait(&tq->cond_running, &tq->mutex);
    pthread_mutex_unlock(&tq->mutex);
    return 1;
    }