Skip to content

Instantly share code, notes, and snippets.

@jstimpfle
Last active December 14, 2024 12:35
Show Gist options
  • Save jstimpfle/f626423f86bc2648971268aeb46e76f7 to your computer and use it in GitHub Desktop.
Save jstimpfle/f626423f86bc2648971268aeb46e76f7 to your computer and use it in GitHub Desktop.
Integer increment multithreading test / demo
# -z noexecstack to suppress warning about .s file
gcc -O2 -Wall -z noexecstack -o test incfuncs.s test.c
// Increment functions implemented directly in assembly so we have better
// control over the instructions used. The main thing we're interested in is
// comparing a single-instruction and a multi-instruction increment "transaction".
.section .text
.global inc_multiinstr
.global inc_singleinstr
.global inc_atomic
// inc_multiinstr: increment-function using multiple instructions.
// Reads the 32-bit value at the address held in %rdi into a register,
// increments the register, and writes the register back out to memory.
// Uses %eax as scratch. Because the load and the store are separate
// instructions, the memory location can change in between them.
inc_multiinstr:
// load the current value
mov (%rdi),%eax
// increment the register copy
add $0x1,%eax
// store the incremented value back
mov %eax,(%rdi)
ret
.balign 8
// inc_singleinstr: increment-function using a _single_ instruction.
// Reads the 32-bit value at the address held in %rdi, and writes the
// incremented value back, in a single instruction (no lock prefix).
// No general-purpose register is used as scratch.
inc_singleinstr:
// read-modify-write directly on the memory operand
addl $0x1,(%rdi)
ret
.balign 8
// inc_atomic: another single-instruction increment function, but this one is
// atomic. It increments the 32-bit value at the address held in %rdi.
// Atomic here means that the CPU requests exclusive access to the cache line
// (MESI protocol) to read, increment and write back.
// That allows incrementing to work even when multiple threads run in parallel
// (at the same time, on different CPUs).
// We can generate the same or an equivalent thing from C code using a
// GCC builtin: __sync_fetch_and_add(ptr, 1);
// or C11 _Atomic
// or C++11 std::atomic
inc_atomic:
// same read-modify-write as inc_singleinstr, but with the lock prefix
lock addl $0x1,(%rdi)
ret
.balign 8
# file exists only to name the gist
#define _GNU_SOURCE
#include <assert.h>
#include <errno.h>
#include <stdarg.h>
#include <stdbool.h>
#include <stdlib.h>
#include <stdint.h>
#include <stdio.h>
#include <string.h>
#include <inttypes.h>
#include <pthread.h>
#include <sched.h>
#include <time.h>
// Increment functions implemented in assembly (incfuncs.s). Each one adds 1
// to the 32-bit integer that p points to.
// Load / add / store as three separate instructions.
void inc_multiinstr(int32_t *p);
// One read-modify-write instruction, without lock prefix.
void inc_singleinstr(int32_t *p);
// One read-modify-write instruction with lock prefix (atomic).
void inc_atomic(int32_t *p);
// Number of threads that run_test() starts per test, all incrementing the
// same counter.
#define NUM_INCREMENTER_THREADS 2
// Report an unrecoverable error and terminate.
// Writes "FATAL ERROR: ", the printf-style formatted message, and a newline to
// stderr, then calls abort(). This function does not return.
static void fatal(const char *fmt, ...)
{
    va_list args;

    fputs("FATAL ERROR: ", stderr);

    va_start(args, fmt);
    vfprintf(stderr, fmt, args);
    va_end(args);

    fputc('\n', stderr);
    abort();
}
// Restrict the calling thread to run only on the given CPU index.
// Aborts via fatal() if the affinity cannot be set.
static void force_thread_to_cpu(int cpu)
{
    cpu_set_t allowed;

    // Build an affinity mask that contains exactly one CPU.
    CPU_ZERO(&allowed);
    CPU_SET(cpu, &allowed);

    // First argument 0 selects the calling thread.
    int rc = sched_setaffinity(0, sizeof allowed, &allowed);
    if (rc >= 0)
        return;

    fatal("sched_setaffinity(): %s", strerror(errno));
}
// Parameters shared by all incrementer threads of one test run.
typedef struct Context
{
    int32_t *p;              // the counter that every thread increments
    pthread_mutex_t *mutex;  // lock used by the mutex-based test only
    size_t repeatcount;      // number of increments each thread performs
} Context;
// This thread function doesn't produce the correct result. The most basic
// reason is that multiple threads read, increment, and then write the variable,
// NON-ATOMICALLY. There is a race condition in this way of incrementing.
// Thread entry point: arg is a Context *. Performs ctx->repeatcount calls of
// inc_multiinstr() on ctx->p without any synchronization. Always returns NULL.
static void *threadfunc_unsynchronized(void *arg)
{
    const Context *cfg = arg;
    int32_t *counter = cfg->p;

    for (size_t remaining = cfg->repeatcount; remaining > 0; remaining--)
        inc_multiinstr(counter);

    return NULL;
}
// Let's try updating the value using only a single CPU instruction!
// The idea is that we prevent running multiple increments at the same time.
// This approach makes torn transactions (and thus lost increments) somewhat
// less likely.
// But we still get a lot of lost increments.
// The reason is that the single instruction is only "atomic" as an assembler
// instruction: To execute the instruction, the CPU still needs a couple of
// steps to read, increment, and write back the value.
// These steps are not observed as an atomic transaction from other CPUs.
// If the threads are being run in parallel on different CPUs, we get
// wrong results.
// Thread entry point: arg is a Context *. Performs ctx->repeatcount calls of
// inc_singleinstr() on ctx->p without any synchronization. Always returns NULL.
static void *threadfunc_singleinstr(void *arg)
{
    const Context *cfg = arg;
    int32_t *counter = cfg->p;

    for (size_t remaining = cfg->repeatcount; remaining > 0; remaining--)
        inc_singleinstr(counter);

    return NULL;
}
// Another attempt: Let's force all threads to run on the same CPU. That way,
// we still have _concurrency_ (multiple runnable threads exist at the same
// time), but not _parallelism_ (because only 1 thread is being run at any
// time).
// This reduces the chance of lost increments dramatically.
// But it doesn't entirely prevent them: The OS can still switch threads in the
// middle of the instruction sequence that increments the value.
// Thread entry point: arg is a Context *. Pins the calling thread to CPU 0,
// then performs ctx->repeatcount calls of inc_multiinstr() on ctx->p.
// Always returns NULL.
static void *threadfunc_samecpu(void *arg)
{
    const Context *cfg = arg;
    int32_t *counter = cfg->p;
    size_t remaining = cfg->repeatcount;

    // Every incrementer thread ends up on the same CPU.
    force_thread_to_cpu(0);

    while (remaining > 0)
    {
        inc_multiinstr(counter);
        remaining--;
    }

    return NULL;
}
// Combine the previous attempts: Use only a single instruction and run only on
// one CPU. This finally works, we never lose an increment.
// But it's not very satisfying because we don't really exploit parallelism.
// We never run multiple threads at the same time.
// Thread entry point: arg is a Context *. Pins the calling thread to CPU 0,
// then performs ctx->repeatcount calls of inc_singleinstr() on ctx->p.
// Always returns NULL.
static void *threadfunc_samecpu_singleinstr(void *arg)
{
    const Context *cfg = arg;
    int32_t *counter = cfg->p;
    size_t remaining = cfg->repeatcount;

    // Every incrementer thread ends up on the same CPU.
    force_thread_to_cpu(0);

    while (remaining > 0)
    {
        inc_singleinstr(counter);
        remaining--;
    }

    return NULL;
}
// The easiest and most canonical way to implement transactions is to use a mutex.
// Using a mutex we can make a sequence of instructions effectively atomic.
// Such a sequence is called a "critical section".
// Only one thread can run the increment-procedure at the same time.
// IOW There can still be parallelism and most of the code can run on multiple
// CPUs at the same time. But we prevent parallelism for the critical section!
// (NOTE: mutex is a data object and to be precise we don't really prevent
// running the increment function in parallel. But we prevent multiple threads
// "holding" the mutex object at the same time.)
// Thread entry point: arg is a Context *. Performs ctx->repeatcount calls of
// inc_multiinstr() on ctx->p, each one wrapped in a lock/unlock of ctx->mutex.
// Always returns NULL.
static void *threadfunc_mutex(void *arg)
{
    const Context *cfg = arg;
    int32_t *counter = cfg->p;
    pthread_mutex_t *lock = cfg->mutex;

    for (size_t remaining = cfg->repeatcount; remaining > 0; remaining--)
    {
        // Critical section: only the mutex holder runs the increment.
        pthread_mutex_lock(lock);
        inc_multiinstr(counter);
        pthread_mutex_unlock(lock);
    }

    return NULL;
}
// Increment test using a single atomic instruction. This is similar to the
// "single_instruction" test but can run in parallel on multiple CPUs even
// without a mutex. The locking happens completely in hardware -- the cache
// line holding the counter will be locked while the value gets read,
// incremented, and written back.
// Atomic instructions are much slower than non-atomic ones, because
// technically there is still locking. But compared to using only mutexes, much
// faster solutions can be created using atomics. Mutexes are a higher-level
// concept and they are in fact implemented by combining atomics and operating
// system facilities. By using atomics directly, faster solutions can often be
// created where performance is a problem.
// The advantage of mutexes is that they are very generally applicable.
// Creating thread-safe functionality using only atomic building blocks that
// are more performant than just using mutexes can be challenging in general.
// (For the integer increment test, it's very easy).
// Thread entry point: arg is a Context *. Performs ctx->repeatcount calls of
// inc_atomic() on ctx->p. Always returns NULL.
static void *threadfunc_atomic(void *arg)
{
    const Context *cfg = arg;
    int32_t *counter = cfg->p;

    for (size_t remaining = cfg->repeatcount; remaining > 0; remaining--)
        inc_atomic(counter);

    return NULL;
}
// Read CLOCK_MONOTONIC and return the timestamp.
// Aborts via fatal() if the clock cannot be read.
static struct timespec gettime(void)
{
    struct timespec now;
    int rc = clock_gettime(CLOCK_MONOTONIC, &now);

    if (rc < 0)
        fatal("clock_gettime(CLOCK_MONOTONIC): %s", strerror(errno));

    return now;
}
// Compute a - b for two timestamps.
// The result is normalized: tv_nsec is always in the range [0, 999999999].
// A second is borrowed only when a's nanosecond part is strictly smaller than
// b's; equal nanosecond parts yield tv_nsec == 0 with no borrow.
static struct timespec time_difference(struct timespec a, struct timespec b)
{
    struct timespec out;

    out.tv_sec = a.tv_sec - b.tv_sec;
    out.tv_nsec = a.tv_nsec - b.tv_nsec;
    if (out.tv_nsec < 0)
    {
        // Borrow one second to bring the nanosecond part back into range.
        out.tv_sec -= 1;
        out.tv_nsec += 1000000000L;
    }
    return out;
}
// Run one increment test and print its result and duration.
//
// name:       label printed before the test starts.
// threadfunc: thread entry point; it receives a pointer to a Context whose
//             counter, mutex and repeat count are shared by all threads.
//
// Starts NUM_INCREMENTER_THREADS threads on a zero-initialized counter, waits
// for all of them, and compares the final counter value against
// NUM_INCREMENTER_THREADS * repeatcount. Any pthread error aborts via fatal().
void run_test(const char *name, void *(*threadfunc)(void *))
{
    printf("\nRunning test: %s\n", name);

    int32_t x = 0;
    pthread_t th[NUM_INCREMENTER_THREADS];
    pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;

    Context ctx;
    ctx.p = &x;
    ctx.mutex = &mutex;
    ctx.repeatcount = 20000000;

    // The counter is an int32_t, so the expected total has to fit into one.
    if (ctx.repeatcount > (size_t) INT32_MAX / NUM_INCREMENTER_THREADS)
        fatal("expected total does not fit into the 32-bit counter");
    int32_t expected = (int32_t) (NUM_INCREMENTER_THREADS * ctx.repeatcount);

    // Take the start time before any thread exists: threads begin incrementing
    // as soon as they are created, and that work belongs to the measurement.
    struct timespec starttime = gettime();

    for (size_t i = 0; i < NUM_INCREMENTER_THREADS; i++)
    {
        int err = pthread_create(&th[i], NULL, threadfunc, &ctx);
        if (err)
            fatal("pthread_create(): %s", strerror(err));
    }

    for (size_t i = 0; i < NUM_INCREMENTER_THREADS; i++)
    {
        int err = pthread_join(th[i], NULL);
        if (err)
            fatal("pthread_join(): %s", strerror(err));
    }

    struct timespec endtime = gettime();

    // All threads are joined, so nobody holds or waits for the mutex anymore.
    pthread_mutex_destroy(&mutex);

    if (x == expected)
        printf("final value is %d (OK)\n", (int) x);
    else
    {
        // Highlight failures with bold, underlined red (ANSI escape codes).
        printf("\033[31;1;4m");
        printf("final value is %d (FAIL, expected: %d)\n", (int) x, (int) expected);
        printf("\033[0m");
    }

    struct timespec time_taken = time_difference(endtime, starttime);
    printf("Time taken: %ld.%.3ds\n",
           (long) time_taken.tv_sec, (int) (time_taken.tv_nsec / 1000000));
}
// Program entry point: runs every increment test in sequence, prints a short
// note after most of them, and finishes with an exercise text for the reader.
int main(void)
{
    run_test("unsynchronized", threadfunc_unsynchronized);
    printf("(This test has almost 100%% chance of failing)\n");

    run_test("singleinstr", threadfunc_singleinstr);
    printf("(Only a marginal improvement over 'unsynchronized')\n");

    run_test("samecpu", threadfunc_samecpu);
    printf("(This test still has a bug but is dramatically less likely to fail)\n");

    run_test("samecpu_singleinstr", threadfunc_samecpu_singleinstr);
    printf("(Combining the 2 previous attempts. I believe this to be correct)\n");

    run_test("atomic", threadfunc_atomic);

    run_test("mutex", threadfunc_mutex);
    printf("(Using a mutex to make the increment code an atomic transaction)\n");

    // Plain adjacent string literals instead of a raw string literal: raw
    // strings are not part of ISO C and only compile as a compiler extension.
    fputs("\n"
          "EXERCISE\n"
          "========\n"
          "Compare the running times. Understand why they differ so much, by making\n"
          "yourself familiar with what happens on the lower levels (machine instructions,\n"
          "OS scheduler, CPU caches & memory). There are some comments in the code\n"
          "describing what the individual tests are doing.\n"
          "Note regarding 'samecpu_singleinstr': I believe this approach to be correct,\n"
          "but its only purpose is to help understanding. It's not a practical approach.\n"
          "Note regarding mutexes: Mutexes are the generic solution to make transaction\n"
          "atomic. They work with any sequence of instructions. And they should usually be\n"
          "the first choice, being also performant enough in most situations. But in some\n"
          "situations they can be slower than other approaches. For good performance, It's\n"
          "important to understand what they do, how to use them, and when to use, or\n"
          "combine with, other approaches. Possibilities include using CPU atomics\n"
          "directly to implement lock-fre algorithms, or to improve the program's\n"
          "architecture to avoid contention on the mutex.\n",
          stdout);

    return 0;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment