////////////////////////////////
// NBHS - Non-blocking hashset
////////////////////////////////
// You wanna intern lots of things on lots of cores? This is for you. It's
// inspired by Cliff Click's non-blocking hashmap.
//
// Interning should be wait-free as long as the hashset isn't completely full.
#ifndef NBHS_H
#define NBHS_H
typedef void* (*NBHS_AllocZeroMem)(size_t size);
typedef void (*NBHS_FreeMem)(void* ptr, size_t size);
typedef uint32_t (*NBHS_Hash)(const void* a);
typedef bool (*NBHS_Compare)(const void* a, const void* b);
typedef struct NBHS_Table NBHS_Table;
typedef struct {
NBHS_AllocZeroMem alloc_mem;
NBHS_FreeMem free_mem;
NBHS_Compare compare;
NBHS_Hash hash;
_Atomic(NBHS_Table*) latest;
} NBHS;
NBHS nbhs_alloc(size_t initial_cap, NBHS_AllocZeroMem alloc_mem, NBHS_FreeMem free_mem, NBHS_Compare compare, NBHS_Hash hash);
void nbhs_free(NBHS* hs);
void* nbhs_intern(NBHS* hs, void* val);
void* nbhs_get(NBHS* hs, void* val);
// thread-unsafe insert, useful during init since it's faster
// than the thread-safe stuff.
void nbhs_raw_insert(NBHS* hs, void* val);
void nbhs_resize_barrier(NBHS* hs);
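// Usage sketch (illustrative only, kept out of compilation with #if 0; key_hash, key_eq and
// example_usage are made-up names, not part of the API):
#if 0
static uint32_t key_hash(const void* a) { return *(const uint32_t*)a * 2654435761u; }
static bool key_eq(const void* a, const void* b) { return *(const uint32_t*)a == *(const uint32_t*)b; }

static void example_usage(void) {
    // passing NULL allocators falls back to calloc/free
    NBHS set = nbhs_alloc(256, NULL, NULL, key_eq, key_hash);

    // single-threaded setup can use the faster, thread-unsafe insert
    static uint32_t init_key = 7;
    nbhs_raw_insert(&set, &init_key);

    // interned pointers must stay valid for the lifetime of the set
    static uint32_t key = 42;
    void* canon = nbhs_intern(&set, &key); // &key on first insert, the existing pointer otherwise
    void* found = nbhs_get(&set, &key);    // lookup only, never inserts (NULL if absent)
    (void) canon; (void) found;

    nbhs_resize_barrier(&set); // wait for any in-flight migration to finish
    nbhs_free(&set);
}
#endif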
#endif // NBHS_H
#ifdef NBHS_IMPL
// personal debooging stuff
#if 1
#define NBHS__BEGIN(name)
#define NBHS__END()
#else
#define NBHS__BEGIN(name) spall_auto_buffer_begin(name, sizeof(name) - 1, NULL, 0)
#define NBHS__END() spall_auto_buffer_end()
#endif
// for the time in the ebr entry
#define NBHS_LOCKED_BIT (1ull << 63ull)
struct NBHS_Table {
_Atomic(NBHS_Table*) prev;
uint32_t exp;
// tracks how many entries have
// been moved once we're resizing
_Atomic uint32_t moved;
_Atomic uint32_t move_done;
_Atomic uint32_t count;
_Atomic(void*) data[];
};
typedef struct NBHS_EBREntry {
_Atomic(struct NBHS_EBREntry*) next;
_Atomic(uint64_t) time;
// keep on a separate cacheline to avoid false sharing
_Alignas(64) int id;
} NBHS_EBREntry;
static _Thread_local bool nbhs_ebr_init;
static _Thread_local NBHS_EBREntry nbhs_ebr;
static _Atomic(int) nbhs_ebr_count;
static _Atomic(NBHS_EBREntry*) nbhs_ebr_list;
static void* nbhs__alloc_zero_mem(size_t s) { return calloc(1, s); }
static void nbhs__free_mem(void* ptr, size_t s) { free(ptr); }
static void* nbhs_raw_lookup(NBHS* hs, NBHS_Table* table, uint32_t h, void* val) {
size_t mask = (1ull << table->exp) - 1;
size_t first = h & mask, i = first;
do {
void* entry = atomic_load(&table->data[i]);
if (entry == NULL) {
return NULL;
} else if (hs->compare(entry, val)) {
return entry;
}
i = (i + 1) & mask;
} while (i != first);
return NULL;
}
static void* nbhs_raw_intern(NBHS* hs, NBHS_Table* latest, NBHS_Table* prev, void* val) {
// resize at 50% load factor, as long as we're not already in the middle of a migration
uint32_t threshold = (1 << latest->exp) / 2;
if (__builtin_expect(latest->count >= threshold && prev == NULL, 0)) {
// make resized table, we'll amortize the moves upward
size_t new_cap = 1ull << (latest->exp + 1);
NBHS_Table* new_top = hs->alloc_mem(sizeof(NBHS_Table) + new_cap*sizeof(void*));
new_top->exp = latest->exp + 1;
// CAS latest -> new_table, if another thread wins the race we'll use its table
new_top->prev = latest;
if (!atomic_compare_exchange_strong(&hs->latest, &latest, new_top)) {
hs->free_mem(new_top, sizeof(NBHS_Table) + new_cap*sizeof(void*));
} else {
prev = latest;
latest = new_top;
}
}
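// Illustrative numbers for the growth policy above: with exp == 10 the table holds 1024 slots
// and a resize kicks in once count reaches 512; the replacement table has exp == 11 (2048 slots)
// and old entries are migrated lazily, a batch at a time, by nbhs_move_items.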
// actually lookup & insert
uint32_t exp = latest->exp;
size_t mask = (1ull << exp) - 1;
void* result = NULL;
uint32_t h = hs->hash(val);
for (;;) {
size_t first = h & mask, i = first;
do {
void* entry = atomic_load(&latest->data[i]);
if (entry == NULL) {
void* to_write = val;
if (prev != NULL) {
// should only be one previous table
assert(prev->prev == NULL);
void* old = nbhs_raw_lookup(hs, prev, h, val);
if (old != NULL) {
to_write = old;
}
}
// fight to be the one to land into the modern table
if (atomic_compare_exchange_strong(&latest->data[i], &entry, to_write)) {
result = to_write;
// count doesn't care that it's a migration, it's at least not replacing an existing
// slot in this version of the table.
atomic_fetch_add_explicit(&latest->count, 1, memory_order_relaxed);
break;
}
}
if (hs->compare(entry, val)) {
return entry;
}
i = (i + 1) & mask;
} while (i != first);
// if the table changed before our eyes, it means someone resized which sucks
// but it just means we need to retry
NBHS_Table* new_latest = atomic_load(&hs->latest);
if (latest == new_latest && result != NULL) {
return result;
}
latest = new_latest;
prev = atomic_load(&latest->prev);
}
}
void nbhs_raw_insert(NBHS* hs, void* val) {
NBHS_Table* table = hs->latest;
uint32_t h = hs->hash(val);
size_t mask = (1ull << table->exp) - 1;
size_t first = h & mask, i = first;
do {
void* entry = atomic_load_explicit(&table->data[i], memory_order_relaxed);
if (entry == NULL) {
atomic_store_explicit(&table->data[i], val, memory_order_relaxed);
atomic_fetch_add_explicit(&table->count, 1, memory_order_relaxed);
return;
}
assert(!hs->compare((void*) entry, val));
i = (i + 1) & mask;
} while (i != first);
abort();
}
NBHS nbhs_alloc(size_t initial_cap, NBHS_AllocZeroMem alloc_mem, NBHS_FreeMem free_mem, NBHS_Compare compare, NBHS_Hash hash) {
if (alloc_mem == NULL) {
assert(free_mem == NULL);
alloc_mem = nbhs__alloc_zero_mem;
free_mem = nbhs__free_mem;
}
// round the capacity up to a power of two (clzll(0) is undefined, so clamp tiny requests)
size_t exp = initial_cap < 2 ? 1 : 64 - __builtin_clzll(initial_cap - 1);
NBHS_Table* table = alloc_mem(sizeof(NBHS_Table) + (1ull << exp)*sizeof(void*));
table->exp = exp;
return (NBHS){
.alloc_mem = alloc_mem,
.free_mem = free_mem,
.compare = compare,
.hash = hash,
.latest = table
};
}
void nbhs_free(NBHS* hs) {
NBHS_Table* curr = hs->latest;
while (curr) {
NBHS_Table* next = curr->prev;
hs->free_mem(curr, sizeof(NBHS_Table) + (1ull << curr->exp)*sizeof(void*));
curr = next;
}
}
// flips the top bit on
static void nbhs_enter_critsec(void) { atomic_fetch_add(&nbhs_ebr.time, NBHS_LOCKED_BIT); }
// flips the top bit off AND increments time by one
static void nbhs_exit_critsec(void) { atomic_fetch_add(&nbhs_ebr.time, NBHS_LOCKED_BIT + 1); }
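// Illustrative trace of the per-thread time word, matching the two helpers above:
//   idle              time = 0x0000000000000004   (top bit clear: not in a critical section)
//   enter_critsec     time = 0x8000000000000004   (top bit set: currently touching a table)
//   exit_critsec      time = 0x0000000000000005   (top bit cleared again, epoch bumped by one)
// A thread freeing an old table can reclaim it once every other thread is either outside a
// critical section, or its time word has changed since it was sampled.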
NBHS_Table* nbhs_move_items(NBHS* hs, NBHS_Table* latest, NBHS_Table* prev, int items_to_move) {
assert(prev);
size_t cap = 1ull << prev->exp;
// snatch up some number of items
uint32_t old, new;
do {
old = atomic_load(&prev->moved);
if (old == cap) { return prev; }
// cap the number of items to copy... by the cap
new = old + items_to_move;
if (new > cap) { new = cap; }
} while (!atomic_compare_exchange_strong(&prev->moved, &(uint32_t){ old }, new));
NBHS__BEGIN("copying old");
for (size_t i = old; i < new; i++) {
// a slot is either NULL or a fully published pointer, so no waiting is needed
void* old_p = atomic_load(&prev->data[i]);
if (old_p) {
// we pass NULL for prev since we already know the entries exist in prev
nbhs_raw_intern(hs, latest, NULL, old_p);
}
}
NBHS__END();
uint32_t done = atomic_fetch_add(&prev->move_done, new - old);
done += new - old;
assert(done <= cap);
if (done == cap) {
// detach now
NBHS__BEGIN("detach");
assert(prev->prev == NULL);
latest->prev = NULL;
// we're about to wait on other threads, so leave our own critical section first to avoid stalling their scans
nbhs_exit_critsec();
NBHS__BEGIN("scan");
int state_count = nbhs_ebr_count;
uint64_t* states = hs->alloc_mem(state_count * sizeof(uint64_t));
// check current state, once the other threads either advance or aren't in the
// lookup function we know we can free.
NBHS_EBREntry* us = &nbhs_ebr;
for (NBHS_EBREntry* list = atomic_load(&nbhs_ebr_list); list; list = list->next) {
// make sure no ptrs refer to prev
if (us != list && list->id < state_count) {
states[list->id] = list->time;
}
}
// wait on each and every thread to make progress or not be in a critical section at the time.
for (NBHS_EBREntry* list = atomic_load(&nbhs_ebr_list); list; list = list->next) {
// make sure no ptrs refer to prev
if (us != list && list->id < state_count && (states[list->id] & NBHS_LOCKED_BIT)) {
uint64_t before_t = states[list->id], now_t;
do {
// idk, maybe this should be a better spinlock
now_t = atomic_load(&list->time);
} while (before_t == now_t);
}
}
hs->free_mem(states, state_count * sizeof(uint64_t));
NBHS__END();
// no more refs, we can immediately free
hs->free_mem(prev, sizeof(NBHS_Table) + (1ull<<prev->exp)*sizeof(void*));
nbhs_enter_critsec();
prev = NULL;
NBHS__END();
}
return prev;
}
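// nbhs_get and nbhs_intern both "help" an in-progress resize: before doing their own work they
// call nbhs_move_items to migrate a small batch (32 slots) out of the previous table, spreading
// the cost of a resize across every caller instead of dumping it on whoever triggered it.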
void* nbhs_get(NBHS* hs, void* val) {
NBHS__BEGIN("intern");
assert(val);
if (!nbhs_ebr_init) {
NBHS__BEGIN("init");
nbhs_ebr_init = true;
// add to ebr list, we never free this because i don't care
// TODO(NeGate): i do care, this is a nightmare when threads die figure it out
NBHS_EBREntry* old;
do {
old = atomic_load_explicit(&nbhs_ebr_list, memory_order_relaxed);
nbhs_ebr.next = old;
} while (!atomic_compare_exchange_strong(&nbhs_ebr_list, &old, &nbhs_ebr));
nbhs_ebr.id = nbhs_ebr_count++;
NBHS__END();
}
// modifying the tables is possible now.
nbhs_enter_critsec();
NBHS_Table* latest = atomic_load(&hs->latest);
// if there are earlier versions of the table we can move entries up as we go along.
NBHS_Table* prev = atomic_load(&latest->prev);
if (prev) {
prev = nbhs_move_items(hs, latest, prev, 32);
if (prev == NULL) {
latest = atomic_load(&hs->latest);
}
}
// lookup only; unlike intern we never need to reserve a slot
uint32_t exp = latest->exp;
size_t mask = (1ull << exp) - 1;
void* result = NULL;
uint32_t h = hs->hash(val);
size_t first = h & mask, i = first;
do {
void* entry = atomic_load(&latest->data[i]);
if (entry == NULL) {
if (prev != NULL) {
// should only be one previous table
assert(prev->prev == NULL);
result = nbhs_raw_lookup(hs, prev, h, val);
}
break;
}
if (hs->compare(entry, val)) {
result = entry;
break;
}
i = (i + 1) & mask;
} while (i != first);
nbhs_exit_critsec();
NBHS__END();
return result;
}
void* nbhs_intern(NBHS* hs, void* val) {
NBHS__BEGIN("intern");
assert(val);
if (!nbhs_ebr_init) {
NBHS__BEGIN("init");
nbhs_ebr_init = true;
// add to ebr list, we never free this because i don't care
NBHS_EBREntry* old;
do {
old = atomic_load_explicit(&nbhs_ebr_list, memory_order_relaxed);
nbhs_ebr.next = old;
} while (!atomic_compare_exchange_strong(&nbhs_ebr_list, &old, &nbhs_ebr));
nbhs_ebr.id = nbhs_ebr_count++;
NBHS__END();
}
// enter critical section, modifying the tables is possible now.
nbhs_enter_critsec();
NBHS_Table* latest = atomic_load(&hs->latest);
// if there are earlier versions of the table we can move entries up as we go along.
NBHS_Table* prev = atomic_load(&latest->prev);
if (prev) {
prev = nbhs_move_items(hs, latest, prev, 32);
if (prev == NULL) {
latest = atomic_load(&hs->latest);
}
}
void* result = nbhs_raw_intern(hs, latest, prev, val);
nbhs_exit_critsec();
NBHS__END();
return result;
}
// waits for all items to be moved up before continuing
void nbhs_resize_barrier(NBHS* hs) {
NBHS__BEGIN("intern");
if (!nbhs_ebr_init) {
NBHS__BEGIN("init");
nbhs_ebr_init = true;
// add to ebr list, we never free this because i don't care
NBHS_EBREntry* old;
do {
old = atomic_load_explicit(&nbhs_ebr_list, memory_order_relaxed);
nbhs_ebr.next = old;
} while (!atomic_compare_exchange_strong(&nbhs_ebr_list, &old, &nbhs_ebr));
nbhs_ebr.id = nbhs_ebr_count++;
NBHS__END();
}
// enter critical section, modifying the tables is possible now.
nbhs_enter_critsec();
NBHS_Table *prev, *latest = atomic_load(&hs->latest);
while (prev = atomic_load(&latest->prev), prev != NULL) {
nbhs_move_items(hs, latest, prev, 1ull << prev->exp);
}
nbhs_exit_critsec();
NBHS__END();
}
#endif // NBHS_IMPL
#define _CRT_SECURE_NO_WARNINGS
#include <stdio.h>
#include <stdint.h>
#include <string.h>
#include <stddef.h>
#include <stdlib.h>
#include <assert.h>
#include <stdbool.h>
#include <stdatomic.h>
#ifdef _WIN32
#define WIN32_LEAN_AND_MEAN
#include <windows.h>
#include <process.h>
#else
#include <pthread.h>
#endif
#define USE_SPALL 0
#if USE_SPALL
#include "spall_native_auto.h"
#else
#define spall_auto_buffer_begin(...)
#define spall_auto_buffer_end(...)
#endif
#define NBHS_IMPL
#include "nbhs.h"
static int num_threads;
typedef struct {
NBHS_Compare compare;
NBHS_Hash hash;
pthread_mutex_t lock;
size_t exp;
void* data[];
} LockedHS;
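// LockedHS is the baseline for comparison: the same open-addressed probing as NBHS, but guarded
// by a single mutex (only taken when more than one thread is running).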
void* lhs_intern(LockedHS* hs, void* val) {
NBHS__BEGIN("intern");
if (num_threads > 1) { pthread_mutex_lock(&hs->lock); }
// actually lookup & insert
uint32_t exp = hs->exp;
size_t mask = (1 << exp) - 1;
void* result = NULL;
uint32_t h = hs->hash(val);
size_t first = h & mask, i = first;
do {
if (hs->data[i] == NULL) {
hs->data[i] = result = val;
break;
} else if (hs->compare(hs->data[i], val)) {
result = hs->data[i];
break;
}
i = (i + 1) & mask;
} while (i != first);
assert(result != NULL);
if (num_threads > 1) { pthread_mutex_unlock(&hs->lock); }
NBHS__END();
return result;
}
static uint32_t my_hash(const void* a) {
const uint8_t* data = a;
uint32_t h = 0x811C9DC5;
for (size_t i = 0; i < 4; i++) {
h = (data[i] ^ h) * 0x01000193;
}
return h;
// return (*(const uint32_t*)a * 11400714819323198485ULL) >> 32ull;
}
static bool my_cmp(const void* a, const void* b) { return *(const uint32_t*)a == *(const uint32_t*)b; }
// https://github.com/demetri/scribbles/blob/master/randomness/prngs.c
uint32_t pcg32_pie(uint64_t *state) {
uint64_t old = *state ^ 0xc90fdaa2adf85459ULL;
*state = *state * 6364136223846793005ULL + 0xc90fdaa2adf85459ULL;
uint32_t xorshifted = ((old >> 18u) ^ old) >> 27u;
uint32_t rot = old >> 59u;
return (xorshifted >> rot) | (xorshifted << ((-rot) & 31));
}
static LockedHS* test_lhs;
static NBHS test_set;
static int attempts; // per thread
static bool testing_lhs;
static int* thread_stats;
#ifdef _WIN32
static unsigned int __stdcall test_thread_fn(void* arg)
#else
static void* test_thread_fn(void* arg)
#endif
{
uintptr_t starting_id = (uintptr_t) arg;
uint64_t seed = starting_id * 11400714819323198485ULL;
#if USE_SPALL
spall_auto_thread_init(starting_id, SPALL_DEFAULT_BUFFER_SIZE);
spall_auto_buffer_begin("work", 4, NULL, 0);
#endif
int* stats = &thread_stats[starting_id*16];
uint32_t* arr = malloc(attempts * sizeof(uint32_t));
if (testing_lhs) {
for (int i = 0; i < attempts; i++) {
arr[i] = pcg32_pie(&seed) & 0xFFFF;
if (lhs_intern(test_lhs, &arr[i]) == &arr[i]) {
stats[0] += 1; // insertions
} else {
stats[1] += 1; // duplicate
}
}
} else {
for (int i = 0; i < attempts;) {
int limit = i + 128;
if (limit > attempts) { limit = attempts; }
spall_auto_buffer_begin("batch", 5, NULL, 0);
for (; i < limit; i++) {
arr[i] = pcg32_pie(&seed) & 0xFFFF;
if (nbhs_intern(&test_set, &arr[i]) == &arr[i]) {
stats[0] += 1; // insertions
} else {
stats[1] += 1; // duplicate
}
}
spall_auto_buffer_end();
}
}
#if USE_SPALL
spall_auto_buffer_end();
spall_auto_thread_quit();
#endif
return 0;
}
int main(int argc, char** argv) {
#if USE_SPALL
spall_auto_init((char *)"profile.spall");
spall_auto_thread_init(0, SPALL_DEFAULT_BUFFER_SIZE);
#endif
num_threads = argc > 1 ? atoi(argv[1]) : 1; // default to 1 thread when not specified
printf("Testing with %d threads\n", num_threads);
if (argc >= 3 && strcmp(argv[2], "lhs") == 0) {
testing_lhs = true;
printf(" With Locked hashset...\n");
}
// attempts = 1000000000 / threads;
attempts = 50000000 / num_threads;
thread_stats = calloc(num_threads, 64 * sizeof(int));
if (testing_lhs) {
test_lhs = calloc(sizeof(LockedHS) + 262144*sizeof(void*), 1);
test_lhs->exp = 18;
test_lhs->compare = my_cmp;
test_lhs->hash = my_hash;
pthread_mutex_init(&test_lhs->lock, NULL);
} else {
test_set = nbhs_alloc(32, NULL, NULL, my_cmp, my_hash);
}
#ifdef _WIN32
HANDLE* arr = malloc(num_threads * sizeof(HANDLE));
for (int i = 0; i < num_threads; i++) {
arr[i] = (HANDLE) _beginthreadex(NULL, 0, test_thread_fn, (void*) (uintptr_t) i, 0, 0);
}
WaitForMultipleObjects(num_threads, arr, true, INFINITE);
#else
pthread_t* arr = malloc(num_threads * sizeof(pthread_t));
for (int i = 0; i < num_threads; i++) {
pthread_create(&arr[i], NULL, test_thread_fn, (void*) (uintptr_t) i);
}
for (int i = 0; i < num_threads; i++) {
pthread_join(arr[i], NULL);
}
#endif
int inserted = 0, duplicates = 0;
for (int i = 0; i < num_threads; i++) {
inserted += thread_stats[i*16 + 0];
duplicates += thread_stats[i*16 + 1];
}
printf("%d + %d = %d (needed %d)\n", inserted, duplicates, inserted + duplicates, attempts*num_threads);
if (inserted + duplicates != attempts*num_threads) {
printf("FAIL!\n");
abort();
}
#if USE_SPALL
spall_auto_thread_quit();
spall_auto_quit();
#endif
return 0;
}
#if USE_SPALL
#define SPALL_AUTO_IMPLEMENTATION
#include "spall_native_auto.h"
#endif