@RealNeGate
Last active July 27, 2025 16:05
Revisions

  1. RealNeGate revised this gist Nov 14, 2024. 1 changed file with 8 additions and 1 deletion.
    9 changes: 8 additions & 1 deletion nbhs.h
    @@ -30,9 +30,14 @@

    // Virtual memory allocation (since the tables are generally nicely page-size friendly)
    #ifdef _WIN32
    #define WIN32_LEAN_AND_MEAN
    #include <windows.h>

    #define NBHS_VIRTUAL_ALLOC(size) VirtualAlloc(NULL, size, MEM_RESERVE | MEM_COMMIT, PAGE_READWRITE)
    #define NBHS_VIRTUAL_FREE(ptr, size) VirtualFree(ptr, size, MEM_RELEASE)
    #else
    #include <sys/mman.h>

    #define NBHS_VIRTUAL_ALLOC(size) mmap(NULL, size, PROT_READ | PROT_WRITE, MAP_PRIVATE | MAP_ANONYMOUS, -1, 0)
    #define NBHS_VIRTUAL_FREE(ptr, size) munmap(ptr, size)
    #endif
    @@ -355,6 +360,7 @@ NBHS_Table* NBHS_FN(move_items)(NBHS* hs, NBHS_Table* latest, NBHS_Table* prev,
    // detach now
    NBHS__BEGIN("detach");
    latest->prev = NULL;
    NBHS_FN(exit_pinned)();

    int state_count = nbhs_ebr_count;
    uint64_t* states = NBHS_REALLOC(NULL, state_count * sizeof(uint64_t));
    @@ -407,6 +413,7 @@ NBHS_Table* NBHS_FN(move_items)(NBHS* hs, NBHS_Table* latest, NBHS_Table* prev,
    NBHS_VIRTUAL_FREE(prev, sizeof(NBHS_Table) + prev->cap*sizeof(void*));
    NBHS_REALLOC(states, 0);

    NBHS_FN(enter_pinned)();
    prev = NULL;
    NBHS__END();
    }
    @@ -521,4 +528,4 @@ void NBHS_FN(resize_barrier)(NBHS* hs) {
    }

    #undef NBHS_FN
    #endif // NBHS_FN
    #endif // NBHS_FN
  2. RealNeGate revised this gist Nov 9, 2024. 1 changed file with 48 additions and 104 deletions.
    152 changes: 48 additions & 104 deletions nbhs.h
    @@ -46,8 +46,6 @@
    #define NBHS__DEBOOGING 0

    #if NBHS__DEBOOGING
    #define NBHS__THREAD_START(id) spall_auto_thread_init(id, SPALL_DEFAULT_BUFFER_SIZE)
    #define NBHS__THREAD_END() spall_auto_thread_quit()
    #define NBHS__BEGIN(name) spall_auto_buffer_begin(name, sizeof(name) - 1, NULL, 0)
    #define NBHS__END() spall_auto_buffer_end()
    #else
    @@ -166,90 +164,6 @@ _Thread_local bool nbhs_ebr_init;
    _Thread_local NBHS_EBREntry nbhs_ebr;
    _Atomic(int) nbhs_ebr_count;
    _Atomic(NBHS_EBREntry*) nbhs_ebr_list;

    static NBHS_FreeQueueNode NBHS_DUMMY = { 0 };

    // freeing queue
    _Atomic(NBHS_FreeQueueNode*) nbhs_free_queue_write = &NBHS_DUMMY;
    _Atomic(NBHS_FreeQueueNode*) nbhs_free_queue_read = &NBHS_DUMMY;

    atomic_flag nbhs_thread_init;
    thrd_t nbhs_thread;

    int nbhs_thread_fn(void* arg) {
    int state_cap = 0;
    uint64_t* states = NULL;

    for (;;) retry: {
    // if we can move the read head, we can take the item
    NBHS_FreeQueueNode *old_reader, *next;
    do {
    old_reader = atomic_load_explicit(&nbhs_free_queue_read, memory_order_relaxed);
    next = old_reader->next;
    if (next == NULL) {
    // queue is empty
    goto retry;
    }
    } while (!atomic_compare_exchange_strong(&nbhs_free_queue_read, &old_reader, next));

    // we can take our sweet time resetting the node's next pointer, it's not on the queue
    // rn so it's not visible to other threads.
    atomic_store_explicit(&next->next, NULL, memory_order_release);

    // ensure there's enough elements in the state array for this task
    int state_count = nbhs_ebr_count;
    if (state_cap < state_count) {
    state_cap = 1ull << (64 - __builtin_clzll(state_count - 1));
    states = NBHS_REALLOC(states, state_cap * sizeof(uint64_t));
    }

    // "snapshot" the current statuses, once the other threads either advance or aren't in the
    // hashset functions we know we can free.
    for (NBHS_EBREntry* list = atomic_load(&nbhs_ebr_list); list; list = list->next) {
    // make sure no ptrs refer to prev
    if (list->id < state_count) {
    states[list->id] = list->time;
    }
    }

    // important bit is that pointers can't be held across the critical sections, they'd need
    // to reload from `NBHS.latest`.
    //
    // Here's the states of our "epoch" critical section thingy:
    //
    // UNPINNED(id) -> PINNED(id) -> UNPINNED(id + 1) -> UNPINNED(id + 1) -> ...
    //
    // survey on if we can free the pointer if the status changed from X -> Y:
    //
    // # YES: if we started unlocked then we weren't holding pointers in the first place.
    // UNPINNED(A) -> PINNED(A)
    // UNPINNED(A) -> UNPINNED(A)
    // UNPINNED(A) -> UNPINNED(B)
    //
    // # YES: if we're locked we need to wait until we've stopped holding pointers.
    // PINNED(A) -> PINNED(B) we're a different call so we've let it go by now.
    // PINNED(A) -> UNPINNED(B) we've stopped caring about the state of the pointer at this point.
    //
    // # NO: we're still doing shit, wait a sec.
    // PINNED(A) -> PINNED(A)
    //
    // these aren't quite blocking the other threads, we're simply checking what their progress is concurrently.
    for (NBHS_EBREntry* list = atomic_load(&nbhs_ebr_list); list; list = list->next) {
    if (list->id < state_count && (states[list->id] & NBHS_PINNED_BIT)) {
    uint64_t before_t = states[list->id], now_t;
    do {
    // idk, maybe this should be a better spinlock
    now_t = atomic_load(&list->time);
    } while (before_t == now_t);
    }
    }

    // no more refs, we can immediately free
    NBHS_Table* table = next->table;
    NBHS_VIRTUAL_FREE(table, sizeof(NBHS_Table) + table->cap*sizeof(void*));
    NBHS_REALLOC(next, 0);
    }
    }
    #endif // NBHS_IMPL
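The comments in the function above describe the pin/unpin protocol from the reclaimer's point of view; the reader side is simpler. Here is a minimal sketch of it, assuming the NBHS_FN(n) prefix is my_ as in test.c (so enter_pinned, exit_pinned, ebr_try_init and raw_lookup expand to my_* names); it mirrors what my_get/my_intern already do rather than adding anything new:

    // every table pointer loaded from hs->latest (or ->prev) is only valid
    // between a pin and its matching unpin; after unpinning, reload it.
    void* my_lookup_once(NBHS* hs, void* key) {
        my_ebr_try_init();                 // registers this thread's EBR slot once
        my_enter_pinned();                 // time: UNPINNED(t) -> PINNED(t)
        NBHS_Table* latest = atomic_load(&hs->latest);
        void* found = my_raw_lookup(hs, latest, my_hash(key), key);
        my_exit_pinned();                  // time: PINNED(t) -> UNPINNED(t+1)
        // 'found' is a user value, not a table, so it survives the unpin;
        // 'latest' must not be dereferenced again without a fresh pin + reload.
        // (the real my_get also walks latest->prev; this only checks the newest table.)
        return found;
    }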

    // Templated implementation
    @@ -258,10 +172,6 @@ extern _Thread_local bool nbhs_ebr_init;
    extern _Thread_local NBHS_EBREntry nbhs_ebr;
    extern _Atomic(int) nbhs_ebr_count;
    extern _Atomic(NBHS_EBREntry*) nbhs_ebr_list;
    extern _Atomic(NBHS_FreeQueueNode*) nbhs_free_queue_write;
    extern _Atomic(NBHS_FreeQueueNode*) nbhs_free_queue_read;
    extern atomic_flag nbhs_thread_init;
    extern thrd_t nbhs_thread;

    extern int nbhs_thread_fn(void*);

    @@ -446,24 +356,58 @@ NBHS_Table* NBHS_FN(move_items)(NBHS* hs, NBHS_Table* latest, NBHS_Table* prev,
    NBHS__BEGIN("detach");
    latest->prev = NULL;

    if (!atomic_flag_test_and_set(&nbhs_thread_init)) {
    // spawn our lovely "GC" thread
    thrd_create(&nbhs_thread, nbhs_thread_fn, NULL);
    int state_count = nbhs_ebr_count;
    uint64_t* states = NBHS_REALLOC(NULL, state_count * sizeof(uint64_t));

    NBHS__BEGIN("scan");
    NBHS_EBREntry* us = &nbhs_ebr;
    // "snapshot" the current statuses, once the other threads either advance or aren't in the
    // hashset functions we know we can free.
    for (NBHS_EBREntry* list = atomic_load(&nbhs_ebr_list); list; list = list->next) {
    // make sure no ptrs refer to prev
    if (list != us && list->id < state_count) {
    states[list->id] = list->time;
    }
    }

    NBHS_FreeQueueNode* node = NBHS_REALLOC(NULL, sizeof(NBHS_FreeQueueNode));
    node->next = NULL;
    node->table = prev;
    // important bit is that pointers can't be held across the critical sections, they'd need
    // to reload from `NBHS.latest`.
    //
    // Here's the states of our "epoch" critical section thingy:
    //
    // UNPINNED(id) -> PINNED(id) -> UNPINNED(id + 1) -> UNPINNED(id + 1) -> ...
    //
    // survey on if we can free the pointer if the status changed from X -> Y:
    //
    // # YES: if we started unlocked then we weren't holding pointers in the first place.
    // UNPINNED(A) -> PINNED(A)
    // UNPINNED(A) -> UNPINNED(A)
    // UNPINNED(A) -> UNPINNED(B)
    //
    // # YES: if we're locked we need to wait until we've stopped holding pointers.
    // PINNED(A) -> PINNED(B) we're a different call so we've let it go by now.
    // PINNED(A) -> UNPINNED(B) we've stopped caring about the state of the pointer at this point.
    //
    // # NO: we're still doing shit, wait a sec.
    // PINNED(A) -> PINNED(A)
    //
    // these aren't quite blocking the other threads, we're simply checking what their progress is concurrently.
    for (NBHS_EBREntry* list = atomic_load(&nbhs_ebr_list); list; list = list->next) {
    if (list != us && list->id < state_count && (states[list->id] & NBHS_PINNED_BIT)) {
    uint64_t before_t = states[list->id], now_t;
    do {
    // idk, maybe this should be a better spinlock
    now_t = atomic_load(&list->time);
    } while (before_t == now_t);
    }
    }
    NBHS__END();

    // queue into the "GC" thread
    // .next: NULL -> fn
    NBHS_FreeQueueNode *null, *old_writer;
    do {
    old_writer = atomic_load_explicit(&nbhs_free_queue_write, memory_order_relaxed);
    } while (null = NULL, !atomic_compare_exchange_strong(&nbhs_free_queue_write->next, &null, node));
    // no more refs, we can immediately free
    NBHS_VIRTUAL_FREE(prev, sizeof(NBHS_Table) + prev->cap*sizeof(void*));
    NBHS_REALLOC(states, 0);

    // once this store happens, the other threads can start submitting again
    atomic_store_explicit(&nbhs_free_queue_write, node, memory_order_release);
    prev = NULL;
    NBHS__END();
    }
    return prev;
  3. RealNeGate revised this gist Nov 9, 2024. 1 changed file with 1 addition and 0 deletions.
    1 change: 1 addition & 0 deletions nbhs.h
    @@ -261,6 +261,7 @@ extern _Atomic(NBHS_EBREntry*) nbhs_ebr_list;
    extern _Atomic(NBHS_FreeQueueNode*) nbhs_free_queue_write;
    extern _Atomic(NBHS_FreeQueueNode*) nbhs_free_queue_read;
    extern atomic_flag nbhs_thread_init;
    extern thrd_t nbhs_thread;

    extern int nbhs_thread_fn(void*);

  4. RealNeGate revised this gist Nov 9, 2024. 2 changed files with 269 additions and 178 deletions.
    401 changes: 253 additions & 148 deletions nbhs.h
    @@ -23,15 +23,44 @@
    #ifndef NBHS_H
    #define NBHS_H

    typedef void* (*NBHS_AllocZeroMem)(size_t size);
    typedef void (*NBHS_FreeMem)(void* ptr, size_t size);
    #include <threads.h>
    #include <stdint.h>
    #include <stddef.h>
    #include <stdatomic.h>

    // Virtual memory allocation (since the tables are generally nicely page-size friendly)
    #ifdef _WIN32
    #define NBHS_VIRTUAL_ALLOC(size) VirtualAlloc(NULL, size, MEM_RESERVE | MEM_COMMIT, PAGE_READWRITE)
    #define NBHS_VIRTUAL_FREE(ptr, size) VirtualFree(ptr, size, MEM_RELEASE)
    #else
    #define NBHS_VIRTUAL_ALLOC(size) mmap(NULL, size, PROT_READ | PROT_WRITE, MAP_PRIVATE | MAP_ANONYMOUS, -1, 0)
    #define NBHS_VIRTUAL_FREE(ptr, size) munmap(ptr, size)
    #endif

    // traditional heap ops
    #ifndef NBHS_REALLOC
    #define NBHS_REALLOC(ptr, size) realloc(ptr, size)
    #endif // NBHS_REALLOC

    // personal debooging stuff
    #define NBHS__DEBOOGING 0

    #if NBHS__DEBOOGING
    #define NBHS__THREAD_START(id) spall_auto_thread_init(id, SPALL_DEFAULT_BUFFER_SIZE)
    #define NBHS__THREAD_END() spall_auto_thread_quit()
    #define NBHS__BEGIN(name) spall_auto_buffer_begin(name, sizeof(name) - 1, NULL, 0)
    #define NBHS__END() spall_auto_buffer_end()
    #else
    #define NBHS__BEGIN(name)
    #define NBHS__END()
    #endif

    // for the time in the ebr entry
    #define NBHS_LOCKED_BIT (1ull << 63ull)
    #define NBHS_PINNED_BIT (1ull << 63ull)

    enum {
    NBHS_LOAD_FACTOR = 75,
    NBHS_MOVE_AMOUNT = 512,
    NBHS_MOVE_AMOUNT = 128,
    };

    typedef struct NBHS_EBREntry {
    @@ -46,7 +75,10 @@ typedef struct NBHS_Table NBHS_Table;
    struct NBHS_Table {
    _Atomic(NBHS_Table*) prev;

    uint32_t exp;
    uint32_t cap;

    // reciprocals to compute modulo
    uint64_t a, sh;

    // tracks how many entries have
    // been moved once we're resizing
    @@ -58,44 +90,73 @@ struct NBHS_Table {
    };

    typedef struct {
    NBHS_AllocZeroMem alloc_mem;
    NBHS_FreeMem free_mem;
    _Atomic(NBHS_Table*) latest;
    } NBHS;

    static void* nbhs__alloc_zero_mem(size_t s) { return calloc(1, s); }
    static void nbhs__free_mem(void* ptr, size_t s) { free(ptr); }
    typedef struct NBHS_FreeQueueNode NBHS_FreeQueueNode;
    struct NBHS_FreeQueueNode {
    _Atomic(NBHS_FreeQueueNode*) next;
    NBHS_Table* table;
    };

    static NBHS nbhs_alloc(size_t initial_cap, NBHS_AllocZeroMem alloc_mem, NBHS_FreeMem free_mem) {
    if (alloc_mem == NULL) {
    assert(free_mem == NULL);
    alloc_mem = nbhs__alloc_zero_mem;
    free_mem = nbhs__free_mem;
    static size_t nbhs_compute_cap(size_t y) {
    // minimum capacity
    if (y < 512) {
    y = 512;
    } else {
    y = ((y + 1) / 3) * 4;
    }

    size_t exp = 64 - __builtin_clzll(initial_cap - 1);
    NBHS_Table* table = alloc_mem(sizeof(NBHS_Table) + (1ull << exp)*sizeof(void*));
    table->exp = exp;
    return (NBHS){
    .alloc_mem = alloc_mem,
    .free_mem = free_mem,
    .latest = table
    };
    size_t cap = 1ull << (64 - __builtin_clzll(y - 1));
    return cap - (sizeof(NBHS_Table) / sizeof(void*));
    }
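A quick worked example of the sizing above (my own arithmetic, assuming sizeof(NBHS_Table) is a multiple of sizeof(void*), which holds given its uint64_t members): nbhs_alloc(32) bumps y up to the 512 minimum, rounds to 512 slots, then shaves off sizeof(NBHS_Table)/sizeof(void*) slots, so sizeof(NBHS_Table) + cap*sizeof(void*) comes out to exactly 512*8 = 4096 bytes, one page, which is the "page-size friendly" shape the virtual-allocation comment earlier counts on. For larger requests, y = ((y + 1) / 3) * 4 pads the request by roughly a third so the caller's element count stays under the 75% load factor.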

    static void nbhs_compute_size(NBHS_Table* table, size_t cap) {
    // reciprocals to compute modulo
    #if defined(__GNUC__) || defined(__clang__)
    table->sh = 64 - __builtin_clzll(cap);
    #else
    uint64_t sh = 0;
    while (cap > (1ull << sh)){ sh++; }
    table->sh = sh;
    #endif

    table->sh += 63 - 64;

    #if (defined(__GNUC__) || defined(__clang__)) && defined(__x86_64__)
    uint64_t d,e;
    __asm__("divq %[v]" : "=a"(d), "=d"(e) : [v] "r"(cap), "a"(cap - 1), "d"(1ull << table->sh));
    table->a = d;
    #elif defined(_MSC_VER)
    uint64_t rem;
    table->a = _udiv128(1ull << table->sh, cap - 1, cap, &rem);
    #else
    #error "Unsupported target"
    #endif

    table->cap = cap;
    }

    static NBHS nbhs_alloc(size_t initial_cap) {
    size_t cap = nbhs_compute_cap(initial_cap);
    NBHS_Table* table = NBHS_VIRTUAL_ALLOC(sizeof(NBHS_Table) + cap*sizeof(void*));
    nbhs_compute_size(table, cap);
    return (NBHS){ .latest = table };
    }

    static void nbhs_free(NBHS* hs) {
    NBHS_Table* curr = hs->latest;
    while (curr) {
    NBHS_Table* next = curr->prev;
    hs->free_mem(curr, sizeof(NBHS_Table) + (1ull*curr->exp)*sizeof(void*));
    NBHS_VIRTUAL_FREE(curr, sizeof(NBHS_Table) + curr->cap*sizeof(void*));
    curr = next;
    }
    }

    // for spooky stuff
    static void** nbhs_array(NBHS* hs) { return (void**) hs->latest->data; }
    static size_t nbhs_count(NBHS* hs) { return hs->latest->count; }
    static size_t nbhs_capacity(NBHS* hs) { return 1ull << hs->latest->exp; }
    static size_t nbhs_capacity(NBHS* hs) { return hs->latest->cap; }

    #define nbhs_for(it, hs) for (void **it = nbhs_array(hs), **_end_ = &it[nbhs_capacity(hs)]; it != _end_; it++) if (*it != NULL)
    #endif // NBHS_H
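A short usage sketch of the accessors and the iteration macro above (illustrative only; the uint32_t element type matches what test.c stores, and the header's own includes are assumed):

    #include <stdio.h>

    // walk every pointer currently visible in the latest table
    static void dump_all(NBHS* hs) {
        nbhs_for(it, hs) {
            uint32_t* v = *it;   // *it is the interned pointer stored in this slot
            printf("%u\n", *v);
        }
        printf("count=%zu cap=%zu\n", nbhs_count(hs), nbhs_capacity(hs));
    }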
    @@ -105,36 +166,135 @@ _Thread_local bool nbhs_ebr_init;
    _Thread_local NBHS_EBREntry nbhs_ebr;
    _Atomic(int) nbhs_ebr_count;
    _Atomic(NBHS_EBREntry*) nbhs_ebr_list;

    static NBHS_FreeQueueNode NBHS_DUMMY = { 0 };

    // freeing queue
    _Atomic(NBHS_FreeQueueNode*) nbhs_free_queue_write = &NBHS_DUMMY;
    _Atomic(NBHS_FreeQueueNode*) nbhs_free_queue_read = &NBHS_DUMMY;

    atomic_flag nbhs_thread_init;
    thrd_t nbhs_thread;

    int nbhs_thread_fn(void* arg) {
    int state_cap = 0;
    uint64_t* states = NULL;

    for (;;) retry: {
    // if we can move the read head, we can take the item
    NBHS_FreeQueueNode *old_reader, *next;
    do {
    old_reader = atomic_load_explicit(&nbhs_free_queue_read, memory_order_relaxed);
    next = old_reader->next;
    if (next == NULL) {
    // queue is empty
    goto retry;
    }
    } while (!atomic_compare_exchange_strong(&nbhs_free_queue_read, &old_reader, next));

    // we can take our sweet time resetting the node's next pointer, it's not on the queue
    // rn so it's not visible to other threads.
    atomic_store_explicit(&next->next, NULL, memory_order_release);

    // ensure there's enough elements in the state array for this task
    int state_count = nbhs_ebr_count;
    if (state_cap < state_count) {
    state_cap = 1ull << (64 - __builtin_clzll(state_count - 1));
    states = NBHS_REALLOC(states, state_cap * sizeof(uint64_t));
    }

    // "snapshot" the current statuses, once the other threads either advance or aren't in the
    // hashset functions we know we can free.
    for (NBHS_EBREntry* list = atomic_load(&nbhs_ebr_list); list; list = list->next) {
    // make sure no ptrs refer to prev
    if (list->id < state_count) {
    states[list->id] = list->time;
    }
    }

    // important bit is that pointers can't be held across the critical sections, they'd need
    // to reload from `NBHS.latest`.
    //
    // Here's the states of our "epoch" critical section thingy:
    //
    // UNPINNED(id) -> PINNED(id) -> UNPINNED(id + 1) -> UNPINNED(id + 1) -> ...
    //
    // survey on if we can free the pointer if the status changed from X -> Y:
    //
    // # YES: if we started unlocked then we weren't holding pointers in the first place.
    // UNPINNED(A) -> PINNED(A)
    // UNPINNED(A) -> UNPINNED(A)
    // UNPINNED(A) -> UNPINNED(B)
    //
    // # YES: if we're locked we need to wait until we've stopped holding pointers.
    // PINNED(A) -> PINNED(B) we're a different call so we've let it go by now.
    // PINNED(A) -> UNPINNED(B) we've stopped caring about the state of the pointer at this point.
    //
    // # NO: we're still doing shit, wait a sec.
    // PINNED(A) -> PINNED(A)
    //
    // these aren't quite blocking the other threads, we're simply checking what their progress is concurrently.
    for (NBHS_EBREntry* list = atomic_load(&nbhs_ebr_list); list; list = list->next) {
    if (list->id < state_count && (states[list->id] & NBHS_PINNED_BIT)) {
    uint64_t before_t = states[list->id], now_t;
    do {
    // idk, maybe this should be a better spinlock
    now_t = atomic_load(&list->time);
    } while (before_t == now_t);
    }
    }

    // no more refs, we can immediately free
    NBHS_Table* table = next->table;
    NBHS_VIRTUAL_FREE(table, sizeof(NBHS_Table) + table->cap*sizeof(void*));
    NBHS_REALLOC(next, 0);
    }
    }
    #endif // NBHS_IMPL
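The free-queue handoff split between nbhs_thread_fn (consumer) and move_items (producer) is easier to see stripped of the hashset details. A condensed sketch of the same dummy-node shape, with illustrative names; node reclamation is deliberately left out here, whereas the code above frees the node once the table it carries has been freed:

    #include <stdatomic.h>
    #include <stddef.h>

    typedef struct QNode {
        _Atomic(struct QNode*) next;
        void* payload;                    // stands in for the NBHS_Table* being retired
    } QNode;

    static QNode q_dummy;                 // like NBHS_DUMMY: both ends start on a sentinel
    static _Atomic(QNode*) q_write = &q_dummy;
    static _Atomic(QNode*) q_read  = &q_dummy;

    // producers (move_items): claim the tail's NULL next, then publish the new tail.
    // a producer that loses the CAS spins until the winner's release store lands,
    // matching the "once this store happens, the other threads can start submitting
    // again" note in the code above.
    static void q_push(QNode* n) {
        atomic_store_explicit(&n->next, NULL, memory_order_relaxed);
        for (;;) {
            QNode* tail = atomic_load_explicit(&q_write, memory_order_relaxed);
            QNode* expected = NULL;
            if (atomic_compare_exchange_strong(&tail->next, &expected, n)) break;
        }
        atomic_store_explicit(&q_write, n, memory_order_release);
    }

    // consumer (the "GC" thread): if the front node has a successor, swing the read
    // head forward and take that successor's payload; otherwise the queue is empty.
    static void* q_pop(void) {
        QNode *front, *next;
        do {
            front = atomic_load_explicit(&q_read, memory_order_relaxed);
            next  = atomic_load_explicit(&front->next, memory_order_acquire);
            if (next == NULL) { return NULL; }
        } while (!atomic_compare_exchange_strong(&q_read, &front, next));
        return next->payload;
    }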

    // Templated implementation
    #ifdef NBHS_FN

    // personal debooging stuff
    #if 1
    #define NBHS__BEGIN(name)
    #define NBHS__END()
    #else
    #define NBHS__BEGIN(name) spall_auto_buffer_begin(name, sizeof(name) - 1, NULL, 0)
    #define NBHS__END() spall_auto_buffer_end()
    #endif

    extern _Thread_local bool nbhs_ebr_init;
    extern _Thread_local NBHS_EBREntry nbhs_ebr;
    extern _Atomic(int) nbhs_ebr_count;
    extern _Atomic(NBHS_EBREntry*) nbhs_ebr_list;
    extern _Atomic(NBHS_FreeQueueNode*) nbhs_free_queue_write;
    extern _Atomic(NBHS_FreeQueueNode*) nbhs_free_queue_read;
    extern atomic_flag nbhs_thread_init;

    extern int nbhs_thread_fn(void*);

    static size_t NBHS_FN(hash2index)(NBHS_Table* table, uint64_t h) {
    // MulHi(h, table->a)
    #if defined(__GNUC__) || defined(__clang__)
    uint64_t hi = (uint64_t) (((unsigned __int128)h * table->a) >> 64);
    #elif defined(_MSC_VER)
    uint64_t hi;
    _umul128(h, table->a, &hi);
    #else
    #error "Unsupported target"
    #endif

    uint64_t q = hi >> table->sh;
    uint64_t q2 = h - (q * table->cap);

    assert(q2 == h % table->cap);
    return q2;
    }
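The reciprocal trick is easier to check in isolation. This is an illustrative re-derivation of the same constants using 128-bit integer math instead of the divq/_udiv128 paths above (GCC/Clang only; the function name is made up). It relies on the hash being 32-bit, which is what NBHS_FN(hash) returns, and on cap never being a power of two, which nbhs_compute_cap arranges by subtracting the header slots:

    #include <assert.h>
    #include <stdint.h>

    static void demo_reciprocal(uint64_t cap) {
        uint64_t sh = 63 - __builtin_clzll(cap);     // floor(log2(cap)), same as nbhs_compute_size
        uint64_t a  = (uint64_t)((((unsigned __int128)1 << (64 + sh)) + cap - 1) / cap);
        // a = ceil(2^(64+sh) / cap), the same value the divq/_udiv128 paths produce

        for (uint64_t h = 0; h < (1ull << 20); h++) {
            uint64_t hi = (uint64_t)(((unsigned __int128)h * a) >> 64); // MulHi(h, a)
            uint64_t q  = hi >> sh;                                     // == h / cap for 32-bit h
            uint64_t r  = h - q * cap;                                  // reproduces h % cap without a divide
            assert(r == h % cap);
        }
    }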

    static void* NBHS_FN(raw_lookup)(NBHS* hs, NBHS_Table* table, uint32_t h, void* val) {
    size_t mask = (1 << table->exp) - 1;
    size_t first = h & mask, i = first;
    size_t cap = table->cap;
    size_t first = NBHS_FN(hash2index)(table, h), i = first;
    do {
    void* entry = atomic_load(&table->data[i]);
    if (entry == NULL) {
    return NULL;
    } else if (NBHS_FN(cmp)(entry, val)) {
    return entry;
    }
    i = (i + 1) & mask;

    // inc & wrap around
    i = (i == cap-1) ? 0 : i + 1;
    } while (i != first);

    return NULL;
    @@ -145,31 +305,31 @@ static void* NBHS_FN(raw_intern)(NBHS* hs, NBHS_Table* latest, NBHS_Table* prev,
    void* result = NULL;
    uint32_t h = NBHS_FN(hash)(val);
    for (;;) {
    uint32_t exp = latest->exp;
    size_t limit = ((1ull << exp) * NBHS_LOAD_FACTOR) / 100;
    size_t cap = latest->cap;
    size_t limit = (cap * NBHS_LOAD_FACTOR) / 100;
    if (prev == NULL && latest->count >= limit) {
    // make resized table, we'll amortize the moves upward
    size_t new_cap = 1ull << (exp + 1);
    size_t new_cap = nbhs_compute_cap(limit*2);

    NBHS_Table* new_top = hs->alloc_mem(sizeof(NBHS_Table) + new_cap*sizeof(uintptr_t));
    new_top->exp = exp + 1;
    NBHS_Table* new_top = NBHS_VIRTUAL_ALLOC(sizeof(NBHS_Table) + new_cap*sizeof(void*));
    nbhs_compute_size(new_top, new_cap);

    // CAS latest -> new_table, if another thread wins the race we'll use its table
    new_top->prev = latest;
    if (!atomic_compare_exchange_strong(&hs->latest, &latest, new_top)) {
    hs->free_mem(new_top, sizeof(NBHS_Table) + new_cap*sizeof(void*));
    NBHS_VIRTUAL_FREE(new_top, sizeof(NBHS_Table) + new_cap*sizeof(void*));
    prev = atomic_load(&latest->prev);
    } else {
    prev = latest;
    latest = new_top;

    // printf("Resize!!! %zu -> %zu (threshold=%zu)\n", new_cap / 2, new_cap, limit);
    // float s = sizeof(NBHS_Table) + new_cap*sizeof(void*);
    // printf("Resize: %.2f KiB (cap=%zu)\n", s / 1024.0f, new_cap);
    }
    continue;
    }

    size_t mask = (1ull << exp) - 1;
    size_t first = h & mask, i = first;
    size_t first = NBHS_FN(hash2index)(latest, h), i = first;
    do {
    void* entry = atomic_load(&latest->data[i]);
    if (entry == NULL) {
    @@ -197,7 +357,8 @@ static void* NBHS_FN(raw_intern)(NBHS* hs, NBHS_Table* latest, NBHS_Table* prev,
    return entry;
    }

    i = (i + 1) & mask;
    // inc & wrap around
    i = (i == cap-1) ? 0 : i + 1;
    } while (i != first);

    // if the table changed before our eyes, it means someone resized which sucks
    @@ -214,9 +375,9 @@ static void* NBHS_FN(raw_intern)(NBHS* hs, NBHS_Table* latest, NBHS_Table* prev,

    void NBHS_FN(raw_insert)(NBHS* hs, void* val) {
    NBHS_Table* table = hs->latest;
    size_t cap = table->cap;
    uint32_t h = NBHS_FN(hash)(val);
    size_t mask = (1 << table->exp) - 1;
    size_t first = h & mask, i = first;
    size_t first = NBHS_FN(hash2index)(table, h), i = first;
    do {
    void* entry = atomic_load_explicit(&table->data[i], memory_order_relaxed);
    if (entry == NULL) {
    @@ -226,27 +387,29 @@ void NBHS_FN(raw_insert)(NBHS* hs, void* val) {
    }

    assert(!NBHS_FN(cmp)((void*) entry, val));
    i = (i + 1) & mask;

    // inc & wrap around
    i = (i == cap-1) ? 0 : i + 1;
    } while (i != first);

    abort();
    }

    // flips the top bit on
    static void NBHS_FN(enter_critsec)(void) {
    static void NBHS_FN(enter_pinned)(void) {
    uint64_t t = atomic_load_explicit(&nbhs_ebr.time, memory_order_relaxed);
    atomic_store_explicit(&nbhs_ebr.time, t + NBHS_LOCKED_BIT, memory_order_release);
    atomic_store_explicit(&nbhs_ebr.time, t + NBHS_PINNED_BIT, memory_order_release);
    }

    // flips the top bit off AND increments time by one
    static void NBHS_FN(exit_critsec)(void) {
    static void NBHS_FN(exit_pinned)(void) {
    uint64_t t = atomic_load_explicit(&nbhs_ebr.time, memory_order_relaxed);
    atomic_store_explicit(&nbhs_ebr.time, t + NBHS_LOCKED_BIT + 1, memory_order_release);
    atomic_store_explicit(&nbhs_ebr.time, t + NBHS_PINNED_BIT + 1, memory_order_release);
    }
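A quick trace of that arithmetic (my own example, not from the gist): starting from time == 0, enter_pinned stores 0x8000000000000000 (pinned, epoch 0); exit_pinned then adds NBHS_PINNED_BIT + 1, which overflows the top bit away and leaves time == 1 (unpinned, epoch 1). That is exactly the UNPINNED(id) -> PINNED(id) -> UNPINNED(id + 1) progression the reclaimer comments above rely on.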

    NBHS_Table* NBHS_FN(move_items)(NBHS* hs, NBHS_Table* latest, NBHS_Table* prev, int items_to_move) {
    assert(prev);
    size_t cap = 1ull << prev->exp;
    size_t cap = prev->cap;

    // snatch up some number of items
    uint32_t old, new;
    @@ -282,67 +445,30 @@ NBHS_Table* NBHS_FN(move_items)(NBHS* hs, NBHS_Table* latest, NBHS_Table* prev,
    NBHS__BEGIN("detach");
    latest->prev = NULL;

    // since we're freeing at the moment, we don't want to block up other freeing threads
    NBHS_FN(exit_critsec)();

    NBHS__BEGIN("scan");
    int state_count = nbhs_ebr_count;
    uint64_t* states = hs->alloc_mem(state_count * 2 * sizeof(uint64_t));

    // check current state, once the other threads either advance or aren't in the
    // lookup function we know we can free.
    NBHS_EBREntry* us = &nbhs_ebr;
    for (NBHS_EBREntry* list = atomic_load(&nbhs_ebr_list); list; list = list->next) {
    // make sure no ptrs refer to prev
    if (us != list && list->id < state_count) {
    states[list->id] = list->time;
    }
    }

    // wait on each and every thread to make progress or not be in a critical section at the time.
    //
    // UNLOCKED(id) -> LOCKED(id) -> UNLOCKED(id + 1)
    //
    // survey on if we can free the pointer and no
    // one would be holding it at that time:
    //
    // UNLOCKED(A) != LOCKED(A) we used to be unlocked which means we couldn't have held the pointer.
    //
    // LOCKED(A) != LOCKED(B) we're a different call so we've let it go by now
    //
    // LOCKED(A) != UNLOCKED(A) we've stopped caring about the state of the pointer at this point.
    //
    for (NBHS_EBREntry* list = atomic_load(&nbhs_ebr_list); list; list = list->next) {
    // make sure no ptrs refer to prev
    if (us != list && list->id < state_count && (states[list->id] & NBHS_LOCKED_BIT)) {
    uint64_t before_t = states[list->id], now_t;
    do {
    // idk, maybe this should be a better spinlock
    now_t = atomic_load(&list->time);
    } while (before_t == now_t);
    states[state_count + list->id] = now_t;
    }
    if (!atomic_flag_test_and_set(&nbhs_thread_init)) {
    // spawn our lovely "GC" thread
    thrd_create(&nbhs_thread, nbhs_thread_fn, NULL);
    }

    hs->free_mem(states, state_count * sizeof(uint64_t));
    NBHS__END();

    // no more refs, we can immediately free
    memset(prev, 0xCC, sizeof(NBHS_Table) + (1ull<<prev->exp)*sizeof(void*));
    // hs->free_mem(prev, sizeof(NBHS_Table) + (1ull<<prev->exp)*sizeof(void*));
    NBHS_FreeQueueNode* node = NBHS_REALLOC(NULL, sizeof(NBHS_FreeQueueNode));
    node->next = NULL;
    node->table = prev;

    NBHS_FN(enter_critsec)();
    prev = NULL;
    // queue into the "GC" thread
    // .next: NULL -> fn
    NBHS_FreeQueueNode *null, *old_writer;
    do {
    old_writer = atomic_load_explicit(&nbhs_free_queue_write, memory_order_relaxed);
    } while (null = NULL, !atomic_compare_exchange_strong(&nbhs_free_queue_write->next, &null, node));

    // once this store happens, the other threads can start submitting again
    atomic_store_explicit(&nbhs_free_queue_write, node, memory_order_release);
    NBHS__END();
    }
    return prev;
    }

    void* NBHS_FN(get)(NBHS* hs, void* val) {
    NBHS__BEGIN("intern");

    assert(val);
    static void NBHS_FN(ebr_try_init)(void) {
    if (!nbhs_ebr_init) {
    NBHS__BEGIN("init");
    nbhs_ebr_init = true;
    @@ -357,9 +483,16 @@ void* NBHS_FN(get)(NBHS* hs, void* val) {
    } while (!atomic_compare_exchange_strong(&nbhs_ebr_list, &old, &nbhs_ebr));
    NBHS__END();
    }
    }

    void* NBHS_FN(get)(NBHS* hs, void* val) {
    NBHS__BEGIN("intern");

    assert(val);
    NBHS_FN(ebr_try_init)();

    // modifying the tables is possible now.
    NBHS_FN(enter_critsec)();
    NBHS_FN(enter_pinned)();
    NBHS_Table* latest = atomic_load(&hs->latest);

    // if there's earlier versions of the table we can move up entries as we go along.
    @@ -373,12 +506,10 @@ void* NBHS_FN(get)(NBHS* hs, void* val) {

    // just lookup into the tables, we don't need to reserve
    // actually lookup & insert
    uint32_t exp = latest->exp;
    size_t mask = (1 << exp) - 1;

    void* result = NULL;
    uint32_t cap = latest->cap;
    uint32_t h = NBHS_FN(hash)(val);
    size_t first = h & mask, i = first;
    size_t first = NBHS_FN(hash2index)(latest, h), i = first;
    do {
    void* entry = atomic_load(&latest->data[i]);
    if (entry == NULL) {
    @@ -395,10 +526,11 @@ void* NBHS_FN(get)(NBHS* hs, void* val) {
    break;
    }

    i = (i + 1) & mask;
    // inc & wrap around
    i = (i == cap-1) ? 0 : i + 1;
    } while (i != first);

    NBHS_FN(exit_critsec)();
    NBHS_FN(exit_pinned)();
    NBHS__END();
    return result;
    }
    @@ -407,22 +539,9 @@ void* NBHS_FN(intern)(NBHS* hs, void* val) {
    NBHS__BEGIN("intern");

    assert(val);
    if (!nbhs_ebr_init) {
    NBHS__BEGIN("init");
    nbhs_ebr_init = true;
    nbhs_ebr.id = nbhs_ebr_count++;

    // add to ebr list, we never free this because i don't care
    NBHS_EBREntry* old;
    do {
    old = atomic_load_explicit(&nbhs_ebr_list, memory_order_relaxed);
    nbhs_ebr.next = old;
    } while (!atomic_compare_exchange_strong(&nbhs_ebr_list, &old, &nbhs_ebr));
    NBHS__END();
    }
    NBHS_FN(ebr_try_init)();

    // enter critical section, modifying the tables is possible now.
    NBHS_FN(enter_critsec)();
    NBHS_FN(enter_pinned)();
    NBHS_Table* latest = atomic_load(&hs->latest);

    // if there's earlier versions of the table we can move up entries as we go along.
    @@ -436,37 +555,23 @@ void* NBHS_FN(intern)(NBHS* hs, void* val) {

    void* result = NBHS_FN(raw_intern)(hs, latest, prev, val);

    NBHS_FN(exit_critsec)();
    NBHS_FN(exit_pinned)();
    NBHS__END();
    return result;
    }

    // waits for all items to be moved up before continuing
    void NBHS_FN(resize_barrier)(NBHS* hs) {
    NBHS__BEGIN("intern");
    if (!nbhs_ebr_init) {
    NBHS__BEGIN("init");
    nbhs_ebr_init = true;
    nbhs_ebr.id = nbhs_ebr_count++;

    // add to ebr list, we never free this because i don't care
    NBHS_EBREntry* old;
    do {
    old = atomic_load_explicit(&nbhs_ebr_list, memory_order_relaxed);
    nbhs_ebr.next = old;
    } while (!atomic_compare_exchange_strong(&nbhs_ebr_list, &old, &nbhs_ebr));
    NBHS__END();
    }

    // enter critical section, modifying the tables is possible now.
    NBHS_FN(enter_critsec)();
    NBHS_FN(ebr_try_init)();

    NBHS_FN(enter_pinned)();
    NBHS_Table *prev, *latest = atomic_load(&hs->latest);
    while (prev = atomic_load(&latest->prev), prev != NULL) {
    NBHS_FN(move_items)(hs, latest, prev, 1ull << prev->exp);
    NBHS_FN(move_items)(hs, latest, prev, prev->cap);
    }

    NBHS_FN(exit_critsec)();
    NBHS_FN(exit_pinned)();
    NBHS__END();
    }

    46 changes: 16 additions & 30 deletions test.c
    @@ -123,45 +123,31 @@ static void* test_thread_fn(void* arg)
    uintptr_t starting_id = (uintptr_t) arg;
    uint64_t seed = starting_id * 11400714819323198485ULL;

    int* stats = &thread_stats[starting_id*16];
    uint32_t* arr = malloc(attempts * sizeof(uint32_t));

    #if USE_SPALL
    spall_auto_thread_init(starting_id, SPALL_DEFAULT_BUFFER_SIZE);
    spall_auto_buffer_begin("work", 4, NULL, 0);
    #endif

    int* stats = &thread_stats[starting_id*16];
    uint32_t* arr = malloc(attempts * sizeof(uint32_t));

    if (testing_lhs) {
    for (int i = 0; i < attempts;) {
    int limit = i + 128;
    if (limit > attempts) { limit = attempts; }

    spall_auto_buffer_begin("batch", 5, NULL, 0);
    for (; i < limit; i++) {
    arr[i] = pcg32_pie(&seed) & 0xFFFF;
    if (lhs_intern(test_lhs, &arr[i]) == &arr[i]) {
    stats[0] += 1; // insertions
    } else {
    stats[1] += 1; // duplicate
    }
    for (int i = 0; i < attempts; i++) {
    arr[i] = pcg32_pie(&seed) & 0xFFFF;
    if (lhs_intern(test_lhs, &arr[i]) == &arr[i]) {
    stats[0] += 1; // insertions
    } else {
    stats[1] += 1; // duplicate
    }
    spall_auto_buffer_end();
    }
    } else {
    for (int i = 0; i < attempts;) {
    int limit = i + 128;
    if (limit > attempts) { limit = attempts; }

    spall_auto_buffer_begin("batch", 5, NULL, 0);
    for (; i < limit; i++) {
    arr[i] = pcg32_pie(&seed) & 0xFFFF;
    if (my_intern(&test_set, &arr[i]) == &arr[i]) {
    stats[0] += 1; // insertions
    } else {
    stats[1] += 1; // duplicate
    }
    for (int i = 0; i < attempts; i++) {
    arr[i] = pcg32_pie(&seed) & 0xFFFF;
    if (my_intern(&test_set, &arr[i]) == &arr[i]) {
    stats[0] += 1; // insertions
    } else {
    stats[1] += 1; // duplicate
    }
    spall_auto_buffer_end();
    }
    }

    @@ -201,7 +187,7 @@ int main(int argc, char** argv) {
    InitializeCriticalSection(&test_lhs->lock);
    #endif
    } else {
    test_set = nbhs_alloc(32, NULL, NULL);
    test_set = nbhs_alloc(32);
    }

    #ifdef _WIN32
  5. RealNeGate revised this gist Nov 8, 2024. 1 changed file with 1 addition and 1 deletion.
    2 changes: 1 addition & 1 deletion nbhs.h
    @@ -428,7 +428,7 @@ void* NBHS_FN(intern)(NBHS* hs, void* val) {
    // if there's earlier versions of the table we can move up entries as we go along.
    NBHS_Table* prev = atomic_load(&latest->prev);
    if (prev) {
    prev = NBHS_FN(move_items)(hs, latest, prev, 1ull << prev->exp);
    prev = NBHS_FN(move_items)(hs, latest, prev, NBHS_MOVE_AMOUNT);
    if (prev == NULL) {
    latest = atomic_load(&hs->latest);
    }
  6. RealNeGate revised this gist Nov 8, 2024. 2 changed files with 241 additions and 173 deletions.
    320 changes: 180 additions & 140 deletions nbhs.h
    @@ -3,52 +3,46 @@
    ////////////////////////////////
    // You wanna intern lots of things on lots of cores? this is for you. It's
    // inspired by Cliff's non-blocking hashmap.
    //
    // To use it, you'll need to define NBHS_FN and then include the header:
    //
    // #define NBHS_FN(n) XXX_hs_ ## n
    // #include <nbhs.h>
    //
    // This will compile implementations of the hashset using
    //
    // bool NBHS_FN(cmp)(const void* a, const void* b);
    // uint32_t NBHS_FN(hash)(const void* a);
    //
    // The exported functions are:
    //
    // void* NBHS_FN(get)(NBHS* hs, void* val);
    // void* NBHS_FN(intern)(NBHS* hs, void* val);
    // void NBHS_FN(resize_barrier)(NBHS* hs);
    //
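test.c later in this revision wires the header up exactly as described; a condensed version of that setup for orientation (uint32_t keys and the FNV-style hash from test.c; the nbhs_alloc signature changes across the revisions below, so the constructor call is left as a comment):

    #include <stdbool.h>
    #include <stddef.h>
    #include <stdint.h>

    // user-supplied hooks, reached through the NBHS_FN prefix
    static uint32_t my_hash(const void* a) {
        const uint8_t* data = a;
        uint32_t h = 0x811C9DC5;
        for (size_t i = 0; i < 4; i++) { h = (data[i] ^ h) * 0x01000193; }
        return h;
    }
    static bool my_cmp(const void* a, const void* b) {
        return *(const uint32_t*)a == *(const uint32_t*)b;
    }

    #define NBHS_IMPL             // exactly one TU also hosts the shared EBR globals
    #define NBHS_FN(n) my_ ## n   // stamps out my_get / my_intern / my_resize_barrier
    #include "nbhs.h"

    // NBHS set = nbhs_alloc(...);             // ctor signature depends on the revision
    // uint32_t* canon = my_intern(&set, &x);  // returns the canonical pointer for x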
    #ifndef NBHS_H
    #define NBHS_H

    typedef void* (*NBHS_AllocZeroMem)(size_t size);
    typedef void (*NBHS_FreeMem)(void* ptr, size_t size);
    typedef uint32_t (*NBHS_Hash)(const void* a);
    typedef bool (*NBHS_Compare)(const void* a, const void* b);
    typedef void* (*NBHS_AllocZeroMem)(size_t size);
    typedef void (*NBHS_FreeMem)(void* ptr, size_t size);

    typedef struct NBHS_Table NBHS_Table;
    typedef struct {
    NBHS_AllocZeroMem alloc_mem;
    NBHS_FreeMem free_mem;
    NBHS_Compare compare;
    NBHS_Hash hash;

    _Atomic(NBHS_Table*) latest;
    } NBHS;

    NBHS nbhs_alloc(size_t initial_cap, NBHS_AllocZeroMem alloc_mem, NBHS_FreeMem free_mem, NBHS_Compare compare, NBHS_Hash hash);
    void nbhs_free(NBHS* hs);
    void* nbhs_intern(NBHS* hs, void* val);
    void* nbhs_get(NBHS* hs, void* val);

    // thread-unsafe insert, useful during init since it's faster
    // than the thread-safe stuff.
    void nbhs_raw_insert(NBHS* hs, void* val);
    void nbhs_resize_barrier(NBHS* hs);

    #endif // NBHS_H

    #ifdef NBHS_IMPL
    // for the time in the ebr entry
    #define NBHS_LOCKED_BIT (1ull << 63ull)

    // personal debooging stuff
    #if 0
    #define NBHS__BEGIN(name)
    #define NBHS__END()
    #else
    #define NBHS__BEGIN(name) spall_auto_buffer_begin(name, sizeof(name) - 1, NULL, 0)
    #define NBHS__END() spall_auto_buffer_end()
    #endif
    enum {
    NBHS_LOAD_FACTOR = 75,
    NBHS_MOVE_AMOUNT = 512,
    };

    enum { NBHS_PROBE_LIMIT = 5 };
    typedef struct NBHS_EBREntry {
    _Atomic(struct NBHS_EBREntry*) next;
    _Atomic(uint64_t) time;

    // for the time in the ebr entry
    #define NBHS_LOCKED_BIT (1ull << 63ull)
    // keep on a separate cacheline to avoid false sharing
    _Alignas(64) int id;
    } NBHS_EBREntry;

    typedef struct NBHS_Table NBHS_Table;
    struct NBHS_Table {
    _Atomic(NBHS_Table*) prev;

    @@ -63,30 +57,81 @@ struct NBHS_Table {
    _Atomic(void*) data[];
    };

    typedef struct NBHS_EBREntry {
    _Atomic(struct NBHS_EBREntry*) next;
    _Atomic(uint64_t) time;

    // keep on a separate cacheline to avoid false sharing
    _Alignas(64) int id;
    } NBHS_EBREntry;

    static _Thread_local bool nbhs_ebr_init;
    static _Thread_local NBHS_EBREntry nbhs_ebr;
    static _Atomic(int) nbhs_ebr_count;
    static _Atomic(NBHS_EBREntry*) nbhs_ebr_list;
    typedef struct {
    NBHS_AllocZeroMem alloc_mem;
    NBHS_FreeMem free_mem;
    _Atomic(NBHS_Table*) latest;
    } NBHS;

    static void* nbhs__alloc_zero_mem(size_t s) { return calloc(1, s); }
    static void nbhs__free_mem(void* ptr, size_t s) { free(ptr); }

    static void* nbhs_raw_lookup(NBHS* hs, NBHS_Table* table, uint32_t h, void* val) {
    static NBHS nbhs_alloc(size_t initial_cap, NBHS_AllocZeroMem alloc_mem, NBHS_FreeMem free_mem) {
    if (alloc_mem == NULL) {
    assert(free_mem == NULL);
    alloc_mem = nbhs__alloc_zero_mem;
    free_mem = nbhs__free_mem;
    }

    size_t exp = 64 - __builtin_clzll(initial_cap - 1);
    NBHS_Table* table = alloc_mem(sizeof(NBHS_Table) + (1ull << exp)*sizeof(void*));
    table->exp = exp;
    return (NBHS){
    .alloc_mem = alloc_mem,
    .free_mem = free_mem,
    .latest = table
    };
    }

    static void nbhs_free(NBHS* hs) {
    NBHS_Table* curr = hs->latest;
    while (curr) {
    NBHS_Table* next = curr->prev;
    hs->free_mem(curr, sizeof(NBHS_Table) + (1ull*curr->exp)*sizeof(void*));
    curr = next;
    }
    }

    // for spooky stuff
    static void** nbhs_array(NBHS* hs) { return (void**) hs->latest->data; }
    static size_t nbhs_count(NBHS* hs) { return hs->latest->count; }
    static size_t nbhs_capacity(NBHS* hs) { return 1ull << hs->latest->exp; }

    #define nbhs_for(it, hs) for (void **it = nbhs_array(hs), **_end_ = &it[nbhs_capacity(hs)]; it != _end_; it++) if (*it != NULL)
    #endif // NBHS_H

    #ifdef NBHS_IMPL
    _Thread_local bool nbhs_ebr_init;
    _Thread_local NBHS_EBREntry nbhs_ebr;
    _Atomic(int) nbhs_ebr_count;
    _Atomic(NBHS_EBREntry*) nbhs_ebr_list;
    #endif // NBHS_IMPL

    // Templated implementation
    #ifdef NBHS_FN

    // personal debooging stuff
    #if 1
    #define NBHS__BEGIN(name)
    #define NBHS__END()
    #else
    #define NBHS__BEGIN(name) spall_auto_buffer_begin(name, sizeof(name) - 1, NULL, 0)
    #define NBHS__END() spall_auto_buffer_end()
    #endif

    extern _Thread_local bool nbhs_ebr_init;
    extern _Thread_local NBHS_EBREntry nbhs_ebr;
    extern _Atomic(int) nbhs_ebr_count;
    extern _Atomic(NBHS_EBREntry*) nbhs_ebr_list;

    static void* NBHS_FN(raw_lookup)(NBHS* hs, NBHS_Table* table, uint32_t h, void* val) {
    size_t mask = (1 << table->exp) - 1;
    size_t first = h & mask, i = first;
    do {
    void* entry = atomic_load(&table->data[i]);
    if (entry == NULL) {
    return NULL;
    } else if (hs->compare(entry, val)) {
    } else if (NBHS_FN(cmp)(entry, val)) {
    return entry;
    }
    i = (i + 1) & mask;
    @@ -95,28 +140,46 @@ static void* nbhs_raw_lookup(NBHS* hs, NBHS_Table* table, uint32_t h, void* val)
    return NULL;
    }

    static void* nbhs_raw_intern(NBHS* hs, NBHS_Table* latest, NBHS_Table* prev, void* val) {
    static void* NBHS_FN(raw_intern)(NBHS* hs, NBHS_Table* latest, NBHS_Table* prev, void* val) {
    // actually lookup & insert
    void* result = NULL;
    uint32_t h = hs->hash(val);
    for (;;) retry: {
    int probe = 0;

    uint32_t h = NBHS_FN(hash)(val);
    for (;;) {
    uint32_t exp = latest->exp;
    size_t limit = ((1ull << exp) * NBHS_LOAD_FACTOR) / 100;
    if (prev == NULL && latest->count >= limit) {
    // make resized table, we'll amortize the moves upward
    size_t new_cap = 1ull << (exp + 1);

    NBHS_Table* new_top = hs->alloc_mem(sizeof(NBHS_Table) + new_cap*sizeof(uintptr_t));
    new_top->exp = exp + 1;

    // CAS latest -> new_table, if another thread wins the race we'll use its table
    new_top->prev = latest;
    if (!atomic_compare_exchange_strong(&hs->latest, &latest, new_top)) {
    hs->free_mem(new_top, sizeof(NBHS_Table) + new_cap*sizeof(void*));
    prev = atomic_load(&latest->prev);
    } else {
    prev = latest;
    latest = new_top;

    // printf("Resize!!! %zu -> %zu (threshold=%zu)\n", new_cap / 2, new_cap, limit);
    }
    continue;
    }

    size_t mask = (1ull << exp) - 1;
    size_t first = h & mask, i = first;
    do {
    void* entry = atomic_load(&latest->data[i]);
    if (entry == NULL) {
    void* to_write = val;
    NBHS_Table* p = prev;
    while (p != NULL) {
    void* old = nbhs_raw_lookup(hs, p, h, val);
    if (__builtin_expect(prev != NULL, 0)) {
    assert(prev->prev == NULL);
    void* old = NBHS_FN(raw_lookup)(hs, prev, h, val);
    if (old != NULL) {
    to_write = old;
    break;
    }
    p = atomic_load_explicit(&p->prev, memory_order_relaxed);
    }

    // fight to be the one to land into the modern table
    @@ -130,29 +193,10 @@ static void* nbhs_raw_intern(NBHS* hs, NBHS_Table* latest, NBHS_Table* prev, voi
    }
    }

    if (hs->compare(entry, val)) {
    if (NBHS_FN(cmp)(entry, val)) {
    return entry;
    }

    if (++probe >= NBHS_PROBE_LIMIT) {
    // make resized table, we'll amortize the moves upward
    size_t new_cap = 1ull << (exp + 1);

    NBHS_Table* new_top = hs->alloc_mem(sizeof(NBHS_Table) + new_cap*sizeof(uintptr_t));
    new_top->exp = exp + 1;

    // CAS latest -> new_table, if another thread wins the race we'll use its table
    new_top->prev = latest;
    if (!atomic_compare_exchange_strong(&hs->latest, &latest, new_top)) {
    hs->free_mem(new_top, sizeof(NBHS_Table) + new_cap*sizeof(void*));
    prev = atomic_load(&latest->prev);
    } else {
    prev = latest;
    latest = new_top;
    }
    goto retry;
    }

    i = (i + 1) & mask;
    } while (i != first);

    @@ -168,9 +212,9 @@ static void* nbhs_raw_intern(NBHS* hs, NBHS_Table* latest, NBHS_Table* prev, voi
    }
    }

    void nbhs_raw_insert(NBHS* hs, void* val) {
    void NBHS_FN(raw_insert)(NBHS* hs, void* val) {
    NBHS_Table* table = hs->latest;
    uint32_t h = hs->hash(val);
    uint32_t h = NBHS_FN(hash)(val);
    size_t mask = (1 << table->exp) - 1;
    size_t first = h & mask, i = first;
    do {
    @@ -181,47 +225,26 @@ void nbhs_raw_insert(NBHS* hs, void* val) {
    return;
    }

    assert(!hs->compare((void*) entry, val));
    assert(!NBHS_FN(cmp)((void*) entry, val));
    i = (i + 1) & mask;
    } while (i != first);

    abort();
    }

    NBHS nbhs_alloc(size_t initial_cap, NBHS_AllocZeroMem alloc_mem, NBHS_FreeMem free_mem, NBHS_Compare compare, NBHS_Hash hash) {
    if (alloc_mem == NULL) {
    assert(free_mem == NULL);
    alloc_mem = nbhs__alloc_zero_mem;
    free_mem = nbhs__free_mem;
    }

    size_t exp = 64 - __builtin_clzll(initial_cap - 1);
    NBHS_Table* table = alloc_mem(sizeof(NBHS_Table) + (1ull << exp)*sizeof(void*));
    table->exp = exp;
    return (NBHS){
    .alloc_mem = alloc_mem,
    .free_mem = free_mem,
    .compare = compare,
    .hash = hash,
    .latest = table
    };
    }

    void nbhs_free(NBHS* hs) {
    NBHS_Table* curr = hs->latest;
    while (curr) {
    NBHS_Table* next = curr->prev;
    hs->free_mem(curr, sizeof(NBHS_Table) + (1ull*curr->exp)*sizeof(void*));
    curr = next;
    }
    // flips the top bit on
    static void NBHS_FN(enter_critsec)(void) {
    uint64_t t = atomic_load_explicit(&nbhs_ebr.time, memory_order_relaxed);
    atomic_store_explicit(&nbhs_ebr.time, t + NBHS_LOCKED_BIT, memory_order_release);
    }

    // flips the top bit on
    static void nbhs_enter_critsec(void) { atomic_fetch_add(&nbhs_ebr.time, NBHS_LOCKED_BIT); }
    // flips the top bit off AND increments time by one
    static void nbhs_exit_critsec(void) { atomic_fetch_add(&nbhs_ebr.time, NBHS_LOCKED_BIT + 1); }
    static void NBHS_FN(exit_critsec)(void) {
    uint64_t t = atomic_load_explicit(&nbhs_ebr.time, memory_order_relaxed);
    atomic_store_explicit(&nbhs_ebr.time, t + NBHS_LOCKED_BIT + 1, memory_order_release);
    }

    NBHS_Table* nbhs_move_items(NBHS* hs, NBHS_Table* latest, NBHS_Table* prev, int items_to_move) {
    NBHS_Table* NBHS_FN(move_items)(NBHS* hs, NBHS_Table* latest, NBHS_Table* prev, int items_to_move) {
    assert(prev);
    size_t cap = 1ull << prev->exp;

    @@ -235,13 +258,17 @@ NBHS_Table* nbhs_move_items(NBHS* hs, NBHS_Table* latest, NBHS_Table* prev, int
    if (new > cap) { new = cap; }
    } while (!atomic_compare_exchange_strong(&prev->moved, &(uint32_t){ old }, new));

    if (old == new) {
    return prev;
    }

    NBHS__BEGIN("copying old");
    for (size_t i = old; i < new; i++) {
    // either NULL or complete can go thru without waiting
    void* old_p = atomic_load(&prev->data[i]);
    if (old_p) {
    // we pass NULL for prev since we already know the entries exist in prev
    nbhs_raw_intern(hs, latest, NULL, old_p);
    NBHS_FN(raw_intern)(hs, latest, NULL, old_p);
    }
    }
    NBHS__END();
    @@ -253,14 +280,14 @@ NBHS_Table* nbhs_move_items(NBHS* hs, NBHS_Table* latest, NBHS_Table* prev, int
    if (done == cap) {
    // detach now
    NBHS__BEGIN("detach");
    latest->prev = prev->prev;
    latest->prev = NULL;

    // since we're freeing at the moment, we don't want to block up other freeing threads
    nbhs_exit_critsec();
    NBHS_FN(exit_critsec)();

    NBHS__BEGIN("scan");
    int state_count = nbhs_ebr_count;
    uint64_t* states = hs->alloc_mem(state_count * sizeof(uint64_t));
    uint64_t* states = hs->alloc_mem(state_count * 2 * sizeof(uint64_t));

    // check current state, once the other threads either advance or aren't in the
    // lookup function we know we can free.
    @@ -273,6 +300,18 @@ NBHS_Table* nbhs_move_items(NBHS* hs, NBHS_Table* latest, NBHS_Table* prev, int
    }

    // wait on each and every thread to make progress or not be in a critical section at the time.
    //
    // UNLOCKED(id) -> LOCKED(id) -> UNLOCKED(id + 1)
    //
    // survey on if we can free the pointer and no
    // one would be holding it at that time:
    //
    // UNLOCKED(A) != LOCKED(A) we used to be unlocked which means we couldn't have held the pointer.
    //
    // LOCKED(A) != LOCKED(B) we're a different call so we've let it go by now
    //
    // LOCKED(A) != UNLOCKED(A) we've stopped caring about the state of the pointer at this point.
    //
    for (NBHS_EBREntry* list = atomic_load(&nbhs_ebr_list); list; list = list->next) {
    // make sure no ptrs refer to prev
    if (us != list && list->id < state_count && (states[list->id] & NBHS_LOCKED_BIT)) {
    @@ -281,24 +320,26 @@ NBHS_Table* nbhs_move_items(NBHS* hs, NBHS_Table* latest, NBHS_Table* prev, int
    // idk, maybe this should be a better spinlock
    now_t = atomic_load(&list->time);
    } while (before_t == now_t);
    states[state_count + list->id] = now_t;
    }
    }

    hs->free_mem(states, state_count * sizeof(uint64_t));
    NBHS__END();

    // no more refs, we can immediately free
    hs->free_mem(prev, sizeof(NBHS_Table) + (1ull<<prev->exp)*sizeof(void*));
    memset(prev, 0xCC, sizeof(NBHS_Table) + (1ull<<prev->exp)*sizeof(void*));
    // hs->free_mem(prev, sizeof(NBHS_Table) + (1ull<<prev->exp)*sizeof(void*));

    nbhs_enter_critsec();
    NBHS_FN(enter_critsec)();
    prev = NULL;

    NBHS__END();
    }
    return prev;
    }

    void* nbhs_get(NBHS* hs, void* val) {
    void* NBHS_FN(get)(NBHS* hs, void* val) {
    NBHS__BEGIN("intern");

    assert(val);
    @@ -318,14 +359,13 @@ void* nbhs_get(NBHS* hs, void* val) {
    }

    // modifying the tables is possible now.
    nbhs_enter_critsec();

    NBHS_FN(enter_critsec)();
    NBHS_Table* latest = atomic_load(&hs->latest);

    // if there's earlier versions of the table we can move up entries as we go along.
    NBHS_Table* prev = atomic_load(&latest->prev);
    if (prev) {
    prev = nbhs_move_items(hs, latest, prev, 64);
    prev = NBHS_FN(move_items)(hs, latest, prev, NBHS_MOVE_AMOUNT);
    if (prev == NULL) {
    latest = atomic_load(&hs->latest);
    }
    @@ -337,33 +377,33 @@ void* nbhs_get(NBHS* hs, void* val) {
    size_t mask = (1 << exp) - 1;

    void* result = NULL;
    uint32_t h = hs->hash(val);
    uint32_t h = NBHS_FN(hash)(val);
    size_t first = h & mask, i = first;
    do {
    void* entry = atomic_load(&latest->data[i]);
    if (entry == NULL) {
    NBHS_Table* p = prev;
    while (p != NULL) {
    result = nbhs_raw_lookup(hs, prev, h, val);
    result = NBHS_FN(raw_lookup)(hs, prev, h, val);
    p = atomic_load_explicit(&p->prev, memory_order_relaxed);
    }
    break;
    }

    if (hs->compare(entry, val)) {
    if (NBHS_FN(cmp)(entry, val)) {
    result = entry;
    break;
    }

    i = (i + 1) & mask;
    } while (i != first);

    nbhs_exit_critsec();
    NBHS_FN(exit_critsec)();
    NBHS__END();
    return result;
    }

    void* nbhs_intern(NBHS* hs, void* val) {
    void* NBHS_FN(intern)(NBHS* hs, void* val) {
    NBHS__BEGIN("intern");

    assert(val);
    @@ -382,28 +422,27 @@ void* nbhs_intern(NBHS* hs, void* val) {
    }

    // enter critical section, modifying the tables is possible now.
    nbhs_enter_critsec();

    NBHS_FN(enter_critsec)();
    NBHS_Table* latest = atomic_load(&hs->latest);

    // if there's earlier versions of the table we can move up entries as we go along.
    NBHS_Table* prev = atomic_load(&latest->prev);
    if (prev) {
    prev = nbhs_move_items(hs, latest, prev, 64);
    prev = NBHS_FN(move_items)(hs, latest, prev, 1ull << prev->exp);
    if (prev == NULL) {
    latest = atomic_load(&hs->latest);
    }
    }

    void* result = nbhs_raw_intern(hs, latest, prev, val);
    void* result = NBHS_FN(raw_intern)(hs, latest, prev, val);

    nbhs_exit_critsec();
    NBHS_FN(exit_critsec)();
    NBHS__END();
    return result;
    }

    // waits for all items to be moved up before continuing
    void nbhs_resize_barrier(NBHS* hs) {
    void NBHS_FN(resize_barrier)(NBHS* hs) {
    NBHS__BEGIN("intern");
    if (!nbhs_ebr_init) {
    NBHS__BEGIN("init");
    @@ -420,15 +459,16 @@ void nbhs_resize_barrier(NBHS* hs) {
    }

    // enter critical section, modifying the tables is possible now.
    nbhs_enter_critsec();
    NBHS_FN(enter_critsec)();

    NBHS_Table *prev, *latest = atomic_load(&hs->latest);
    while (prev = atomic_load(&latest->prev), prev != NULL) {
    nbhs_move_items(hs, latest, prev, 1ull << prev->exp);
    NBHS_FN(move_items)(hs, latest, prev, 1ull << prev->exp);
    }

    nbhs_exit_critsec();
    NBHS_FN(exit_critsec)();
    NBHS__END();
    }

    #endif // NBHS_IMPL
    #undef NBHS_FN
    #endif // NBHS_FN
    94 changes: 61 additions & 33 deletions test.c
    @@ -16,7 +16,7 @@
    #include <pthread.h>
    #endif

    #define USE_SPALL 0
    #define USE_SPALL 1

    #if USE_SPALL
    #include "spall_native_auto.h"
    @@ -25,61 +25,78 @@
    #define spall_auto_buffer_end(...)
    #endif

    static int num_threads;

    static uint32_t my_hash(const void* a) {
    const uint8_t* data = a;
    uint32_t h = 0x811C9DC5;
    for (size_t i = 0; i < 4; i++) {
    h = (data[i] ^ h) * 0x01000193;
    }
    return h;
    }

    static bool my_cmp(const void* a, const void* b) {
    return *(const uint32_t*)a == *(const uint32_t*)b;
    }

    #define NBHS_IMPL
    #define NBHS_FN(n) my_ ## n
    #include "nbhs.h"

    static int num_threads;

    typedef struct {
    NBHS_Compare compare;
    NBHS_Hash hash;

    #ifdef _WIN32
    CRITICAL_SECTION lock;
    #else
    pthread_mutex_t lock;
    #endif

    size_t exp;
    void* data[];
    } LockedHS;

    void* lhs_intern(LockedHS* hs, void* val) {
    NBHS__BEGIN("intern");
    if (num_threads > 1) { pthread_mutex_lock(&hs->lock); }

    if (num_threads > 1) {
    #ifdef _WIN32
    EnterCriticalSection(&hs->lock);
    #else
    pthread_mutex_lock(&hs->lock);
    #endif
    }

    // actually lookup & insert
    uint32_t exp = hs->exp;
    size_t mask = (1 << exp) - 1;

    void* result = NULL;
    uint32_t h = hs->hash(val);
    uint32_t h = my_hash(val);
    size_t first = h & mask, i = first;
    do {
    if (hs->data[i] == NULL) {
    hs->data[i] = result = val;
    break;
    } else if (hs->compare(hs->data[i], val)) {
    } else if (my_cmp(hs->data[i], val)) {
    result = hs->data[i];
    break;
    }
    i = (i + 1) & mask;
    } while (i != first);
    assert(result != NULL);

    if (num_threads > 1) { pthread_mutex_unlock(&hs->lock); }
    if (num_threads > 1) {
    #ifdef _WIN32
    LeaveCriticalSection(&hs->lock);
    #else
    pthread_mutex_unlock(&hs->lock);
    #endif
    }

    NBHS__END();
    return result;
    }

    static uint32_t my_hash(const void* a) {
    const uint8_t* data = a;
    uint32_t h = 0x811C9DC5;
    for (size_t i = 0; i < 4; i++) {
    h = (data[i] ^ h) * 0x01000193;
    }
    return h;
    // return (*(const uint32_t*)a * 11400714819323198485ULL) >> 32ull;
    }

    static bool my_cmp(const void* a, const void* b) { return *(const uint32_t*)a == *(const uint32_t*)b; }

    // https://github.com/demetri/scribbles/blob/master/randomness/prngs.c
    uint32_t pcg32_pie(uint64_t *state) {
    uint64_t old = *state ^ 0xc90fdaa2adf85459ULL;
    @@ -115,13 +132,20 @@ static void* test_thread_fn(void* arg)
    uint32_t* arr = malloc(attempts * sizeof(uint32_t));

    if (testing_lhs) {
    for (int i = 0; i < attempts; i++) {
    arr[i] = pcg32_pie(&seed) & 0xFFFF;
    if (lhs_intern(test_lhs, &arr[i]) == &arr[i]) {
    stats[0] += 1; // insertions
    } else {
    stats[1] += 1; // duplicate
    for (int i = 0; i < attempts;) {
    int limit = i + 128;
    if (limit > attempts) { limit = attempts; }

    spall_auto_buffer_begin("batch", 5, NULL, 0);
    for (; i < limit; i++) {
    arr[i] = pcg32_pie(&seed) & 0xFFFF;
    if (lhs_intern(test_lhs, &arr[i]) == &arr[i]) {
    stats[0] += 1; // insertions
    } else {
    stats[1] += 1; // duplicate
    }
    }
    spall_auto_buffer_end();
    }
    } else {
    for (int i = 0; i < attempts;) {
    @@ -131,7 +155,7 @@ static void* test_thread_fn(void* arg)
    spall_auto_buffer_begin("batch", 5, NULL, 0);
    for (; i < limit; i++) {
    arr[i] = pcg32_pie(&seed) & 0xFFFF;
    if (nbhs_intern(&test_set, &arr[i]) == &arr[i]) {
    if (my_intern(&test_set, &arr[i]) == &arr[i]) {
    stats[0] += 1; // insertions
    } else {
    stats[1] += 1; // duplicate
    @@ -150,6 +174,8 @@ static void* test_thread_fn(void* arg)
    }

    int main(int argc, char** argv) {
    printf("%p\n", argv);

    #if USE_SPALL
    spall_auto_init((char *)"profile.spall");
    spall_auto_thread_init(0, SPALL_DEFAULT_BUFFER_SIZE);
    @@ -164,16 +190,18 @@ int main(int argc, char** argv) {
    }

    // attempts = 1000000000 / threads;
    attempts = 50000000 / num_threads;
    attempts = 10000000 / num_threads;
    thread_stats = calloc(num_threads, 64 * sizeof(int));

    if (testing_lhs) {
    test_lhs = calloc(sizeof(LockedHS) + 262144*sizeof(void*), 1);
    test_lhs->exp = 18;
    test_lhs->compare = my_cmp;
    test_lhs->hash = my_hash;

    #ifdef _WIN32
    InitializeCriticalSection(&test_lhs->lock);
    #endif
    } else {
    test_set = nbhs_alloc(32, NULL, NULL, my_cmp, my_hash);
    test_set = nbhs_alloc(32, NULL, NULL);
    }

    #ifdef _WIN32
  7. RealNeGate revised this gist Oct 16, 2024. 1 changed file with 41 additions and 42 deletions.
    83 changes: 41 additions & 42 deletions nbhs.h
    @@ -3,8 +3,6 @@
    ////////////////////////////////
    // You wanna intern lots of things on lots of cores? this is for you. It's
    // inspired by Cliff's non-blocking hashmap.
    //
    // Interning should be wait-free given the hashset's not completely full (wait-free enough :p)
    #ifndef NBHS_H
    #define NBHS_H

    @@ -38,14 +36,16 @@ void nbhs_resize_barrier(NBHS* hs);
    #ifdef NBHS_IMPL

    // personal debooging stuff
    #if 1
    #if 0
    #define NBHS__BEGIN(name)
    #define NBHS__END()
    #else
    #define NBHS__BEGIN(name) spall_auto_buffer_begin(name, sizeof(name) - 1, NULL, 0)
    #define NBHS__END() spall_auto_buffer_end()
    #endif

    enum { NBHS_PROBE_LIMIT = 5 };

    // for the time in the ebr entry
    #define NBHS_LOCKED_BIT (1ull << 63ull)

    @@ -96,45 +96,27 @@ static void* nbhs_raw_lookup(NBHS* hs, NBHS_Table* table, uint32_t h, void* val)
    }

    static void* nbhs_raw_intern(NBHS* hs, NBHS_Table* latest, NBHS_Table* prev, void* val) {
    // resize on 50% load factor and we're not in the moving process rn
    uint32_t threshold = (1 << latest->exp) / 2;
    if (__builtin_expect(latest->count >= threshold && prev == NULL, 0)) {
    // make resized table, we'll amortize the moves upward
    size_t new_cap = 1ull << (latest->exp + 1);

    NBHS_Table* new_top = hs->alloc_mem(sizeof(NBHS_Table) + new_cap*sizeof(uintptr_t));
    new_top->exp = latest->exp + 1;

    // CAS latest -> new_table, if another thread wins the race we'll use its table
    new_top->prev = latest;
    if (!atomic_compare_exchange_strong(&hs->latest, &latest, new_top)) {
    hs->free_mem(new_top, sizeof(NBHS_Table) + new_cap*sizeof(void*));
    prev = atomic_load(&latest->prev);
    } else {
    prev = latest;
    latest = new_top;
    }
    }

    // actually lookup & insert
    uint32_t exp = latest->exp;
    size_t mask = (1 << exp) - 1;

    void* result = NULL;
    uint32_t h = hs->hash(val);
    for (;;) {
    for (;;) retry: {
    int probe = 0;

    uint32_t exp = latest->exp;
    size_t mask = (1ull << exp) - 1;
    size_t first = h & mask, i = first;
    do {
    void* entry = atomic_load(&latest->data[i]);
    if (entry == NULL) {
    void* to_write = val;
    if (prev != NULL) {
    // should only be one previous table
    assert(prev->prev == NULL);
    void* old = nbhs_raw_lookup(hs, prev, h, val);
    NBHS_Table* p = prev;
    while (p != NULL) {
    void* old = nbhs_raw_lookup(hs, p, h, val);
    if (old != NULL) {
    to_write = old;
    break;
    }
    p = atomic_load_explicit(&p->prev, memory_order_relaxed);
    }

    // fight to be the one to land into the modern table
    @@ -152,6 +134,25 @@ static void* nbhs_raw_intern(NBHS* hs, NBHS_Table* latest, NBHS_Table* prev, voi
    return entry;
    }

    if (++probe >= NBHS_PROBE_LIMIT) {
    // make resized table, we'll amortize the moves upward
    size_t new_cap = 1ull << (exp + 1);

    NBHS_Table* new_top = hs->alloc_mem(sizeof(NBHS_Table) + new_cap*sizeof(uintptr_t));
    new_top->exp = exp + 1;

    // CAS latest -> new_table, if another thread wins the race we'll use its table
    new_top->prev = latest;
    if (!atomic_compare_exchange_strong(&hs->latest, &latest, new_top)) {
    hs->free_mem(new_top, sizeof(NBHS_Table) + new_cap*sizeof(void*));
    prev = atomic_load(&latest->prev);
    } else {
    prev = latest;
    latest = new_top;
    }
    goto retry;
    }

    i = (i + 1) & mask;
    } while (i != first);

    @@ -252,9 +253,7 @@ NBHS_Table* nbhs_move_items(NBHS* hs, NBHS_Table* latest, NBHS_Table* prev, int
    if (done == cap) {
    // dettach now
    NBHS__BEGIN("detach");

    assert(prev->prev == NULL);
    latest->prev = NULL;
    latest->prev = prev->prev;

    // since we're freeing at the moment, we don't want to block up other freeing threads
    nbhs_exit_critsec();
    @@ -306,6 +305,7 @@ void* nbhs_get(NBHS* hs, void* val) {
    if (!nbhs_ebr_init) {
    NBHS__BEGIN("init");
    nbhs_ebr_init = true;
    nbhs_ebr.id = nbhs_ebr_count++;

    // add to ebr list, we never free this because i don't care
    // TODO(NeGate): i do care, this is a nightmare when threads die figure it out
    @@ -314,7 +314,6 @@ void* nbhs_get(NBHS* hs, void* val) {
    old = atomic_load_explicit(&nbhs_ebr_list, memory_order_relaxed);
    nbhs_ebr.next = old;
    } while (!atomic_compare_exchange_strong(&nbhs_ebr_list, &old, &nbhs_ebr));
    nbhs_ebr.id = nbhs_ebr_count++;
    NBHS__END();
    }

    @@ -326,7 +325,7 @@ void* nbhs_get(NBHS* hs, void* val) {
    // if there's earlier versions of the table we can move up entries as we go along.
    NBHS_Table* prev = atomic_load(&latest->prev);
    if (prev) {
    prev = nbhs_move_items(hs, latest, prev, 32);
    prev = nbhs_move_items(hs, latest, prev, 64);
    if (prev == NULL) {
    latest = atomic_load(&hs->latest);
    }
    @@ -343,10 +342,10 @@ void* nbhs_get(NBHS* hs, void* val) {
    do {
    void* entry = atomic_load(&latest->data[i]);
    if (entry == NULL) {
    if (prev != NULL) {
    // should only be one previous table
    assert(prev->prev == NULL);
    NBHS_Table* p = prev;
    while (p != NULL) {
    result = nbhs_raw_lookup(hs, prev, h, val);
    p = atomic_load_explicit(&p->prev, memory_order_relaxed);
    }
    break;
    }
    @@ -371,14 +370,14 @@ void* nbhs_intern(NBHS* hs, void* val) {
    if (!nbhs_ebr_init) {
    NBHS__BEGIN("init");
    nbhs_ebr_init = true;
    nbhs_ebr.id = nbhs_ebr_count++;

    // add to ebr list, we never free this because i don't care
    NBHS_EBREntry* old;
    do {
    old = atomic_load_explicit(&nbhs_ebr_list, memory_order_relaxed);
    nbhs_ebr.next = old;
    } while (!atomic_compare_exchange_strong(&nbhs_ebr_list, &old, &nbhs_ebr));
    nbhs_ebr.id = nbhs_ebr_count++;
    NBHS__END();
    }

    @@ -390,7 +389,7 @@ void* nbhs_intern(NBHS* hs, void* val) {
    // if there's earlier versions of the table we can move up entries as we go along.
    NBHS_Table* prev = atomic_load(&latest->prev);
    if (prev) {
    prev = nbhs_move_items(hs, latest, prev, 32);
    prev = nbhs_move_items(hs, latest, prev, 64);
    if (prev == NULL) {
    latest = atomic_load(&hs->latest);
    }
    @@ -409,14 +408,14 @@ void nbhs_resize_barrier(NBHS* hs) {
    if (!nbhs_ebr_init) {
    NBHS__BEGIN("init");
    nbhs_ebr_init = true;
    nbhs_ebr.id = nbhs_ebr_count++;

    // add to ebr list, we never free this because i don't care
    NBHS_EBREntry* old;
    do {
    old = atomic_load_explicit(&nbhs_ebr_list, memory_order_relaxed);
    nbhs_ebr.next = old;
    } while (!atomic_compare_exchange_strong(&nbhs_ebr_list, &old, &nbhs_ebr));
    nbhs_ebr.id = nbhs_ebr_count++;
    NBHS__END();
    }
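
    This revision swaps the 50% load-factor resize trigger for a probe-length trigger (NBHS_PROBE_LIMIT) and lets misses walk the whole chain of older tables. As a rough illustration of the probe-limit idea on its own — illustrative names and a toy integer table, not the gist's code:

    #include <stdbool.h>
    #include <stddef.h>
    #include <stdint.h>

    enum { PROBE_LIMIT = 5 };

    // Returns the slot index of `key`, or -1 if it's absent. Sets *needs_resize when a
    // probe run hits PROBE_LIMIT, i.e. the cluster is long enough that the table should grow.
    static ptrdiff_t probe_limited_find(const uint32_t* slots, size_t exp, uint32_t key, bool* needs_resize) {
        size_t mask = (1ull << exp) - 1;
        size_t i = key & mask;                             // toy "hash": identity
        for (int probe = 0; probe < PROBE_LIMIT; probe++) {
            if (slots[i] == 0)   { return -1; }            // empty slot: key was never inserted
            if (slots[i] == key) { return (ptrdiff_t) i; }
            i = (i + 1) & mask;                            // linear probing
        }
        *needs_resize = true;                              // long cluster: caller should resize & retry
        return -1;
    }

    Capping the probe run keeps the per-lookup work bounded even under clustering, at the cost of resizing earlier than a pure load-factor rule would.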

  8. RealNeGate revised this gist Oct 11, 2024. 1 changed file with 1 addition and 1 deletion.
    2 changes: 1 addition & 1 deletion nbhs.h
    @@ -4,7 +4,7 @@
    // You wanna intern lots of things on lots of cores? this is for you. It's
    // inspired by Cliff's non-blocking hashmap.
    //
    // Interning should be wait-free given the hashset's not completely full
    // Interning should be wait-free given the hashset's not completely full (wait-free enough :p)
    #ifndef NBHS_H
    #define NBHS_H

  9. RealNeGate revised this gist Oct 4, 2024. 1 changed file with 1 addition and 0 deletions.
    1 change: 1 addition & 0 deletions nbhs.h
    @@ -109,6 +109,7 @@ static void* nbhs_raw_intern(NBHS* hs, NBHS_Table* latest, NBHS_Table* prev, voi
    new_top->prev = latest;
    if (!atomic_compare_exchange_strong(&hs->latest, &latest, new_top)) {
    hs->free_mem(new_top, sizeof(NBHS_Table) + new_cap*sizeof(void*));
    prev = atomic_load(&latest->prev);
    } else {
    prev = latest;
    latest = new_top;
  10. RealNeGate revised this gist Oct 3, 2024. 1 changed file with 2 additions and 0 deletions.
    2 changes: 2 additions & 0 deletions nbhs.h
    @@ -3,6 +3,8 @@
    ////////////////////////////////
    // You wanna intern lots of things on lots of cores? this is for you. It's
    // inspired by Cliff's non-blocking hashmap.
    //
    // Interning should be wait-free given the hashset's not completely full
    #ifndef NBHS_H
    #define NBHS_H

  11. RealNeGate revised this gist Oct 1, 2024. 1 changed file with 220 additions and 132 deletions.
    352 changes: 220 additions & 132 deletions nbhs.h
    @@ -24,23 +24,26 @@ typedef struct {
    NBHS nbhs_alloc(size_t initial_cap, NBHS_AllocZeroMem alloc_mem, NBHS_FreeMem free_mem, NBHS_Compare compare, NBHS_Hash hash);
    void nbhs_free(NBHS* hs);
    void* nbhs_intern(NBHS* hs, void* val);
    void* nbhs_get(NBHS* hs, void* val);

    // thread-unsafe insert, useful during init since it's faster
    // than the thread-safe stuff.
    void nbhs_raw_insert(NBHS* hs, void* val);
    void nbhs_resize_barrier(NBHS* hs);

    #endif // NBHS_H

    #ifdef NBHS_IMPL

    // personal debooging stuff
    #if 0
    #if 1
    #define NBHS__BEGIN(name)
    #define NBHS__END()
    #else
    #define NBHS__BEGIN(name) spall_auto_buffer_begin(name, sizeof(name) - 1, NULL, 0)
    #define NBHS__END() spall_auto_buffer_end()
    #endif

    // private constants (idk if i wanna undef these but don't touch i guess?)
    #define NBHS_RESERVE_BIT (1ull << 63ull)

    // for the time in the ebr entry
    #define NBHS_LOCKED_BIT (1ull << 63ull)

    @@ -55,7 +58,7 @@ struct NBHS_Table {
    _Atomic uint32_t move_done;
    _Atomic uint32_t count;

    _Atomic(uintptr_t) data[];
    _Atomic(void*) data[];
    };

    typedef struct NBHS_EBREntry {
    @@ -74,46 +77,22 @@ static _Atomic(NBHS_EBREntry*) nbhs_ebr_list;
    static void* nbhs__alloc_zero_mem(size_t s) { return calloc(1, s); }
    static void nbhs__free_mem(void* ptr, size_t s) { free(ptr); }

    static void* nbhs_try_entry(NBHS* hs, NBHS_Table* table, int i, void* val, uintptr_t entry) {
    void* entry_p = (void*) (entry & ~NBHS_RESERVE_BIT);
    if (hs->compare(entry_p, val)) {
    if ((entry & NBHS_RESERVE_BIT) == 0) {
    // not reserved, yay
    return entry_p;
    } else {
    // either NULL or complete can go thru without waiting
    uintptr_t decision;
    while (decision = atomic_load(&table->data[i]), decision & NBHS_RESERVE_BIT) {
    // idk if i should do a nicer wait than a spin-lock
    }
    return (void*) decision;
    }
    } else {
    return NULL;
    }
    }

    static void* nbhs_raw_lookup(NBHS* hs, NBHS_Table* table, uint32_t h, void* val) {
    size_t mask = (1 << table->exp) - 1;
    size_t first = h & mask, i = first;
    do {
    uintptr_t entry = atomic_load(&table->data[i]);
    if (entry == 0) {
    void* entry = atomic_load(&table->data[i]);
    if (entry == NULL) {
    return NULL;
    }

    void* k = nbhs_try_entry(hs, table, i, val, entry);
    if (k != NULL) {
    return k;
    } else if (hs->compare(entry, val)) {
    return entry;
    }
    i = (i + 1) & mask;
    } while (i != first);

    return NULL;
    }

    // static _Atomic int reprobes[100];

    static void* nbhs_raw_intern(NBHS* hs, NBHS_Table* latest, NBHS_Table* prev, void* val) {
    // resize on 50% load factor and we're not in the moving process rn
    uint32_t threshold = (1 << latest->exp) / 2;
    @@ -127,15 +106,13 @@ static void* nbhs_raw_intern(NBHS* hs, NBHS_Table* latest, NBHS_Table* prev, voi
    // CAS latest -> new_table, if another thread wins the race we'll use its table
    new_top->prev = latest;
    if (!atomic_compare_exchange_strong(&hs->latest, &latest, new_top)) {
    hs->free_mem(new_top, sizeof(NBHS_Table) + new_cap*sizeof(uintptr_t));
    hs->free_mem(new_top, sizeof(NBHS_Table) + new_cap*sizeof(void*));
    } else {
    prev = latest;
    latest = new_top;
    }
    }

    uintptr_t reserved_form = (uintptr_t)val | NBHS_RESERVE_BIT;

    // actually lookup & insert
    uint32_t exp = latest->exp;
    size_t mask = (1 << exp) - 1;
    @@ -145,40 +122,31 @@ static void* nbhs_raw_intern(NBHS* hs, NBHS_Table* latest, NBHS_Table* prev, voi
    for (;;) {
    size_t first = h & mask, i = first;
    do {
    uintptr_t entry = atomic_load(&latest->data[i]);
    if (entry == 0) {
    // we insert the reserved_form to lock the entry until we've checked
    // if the entry already existed in the previous table, if there's no
    // previous table we don't need this.
    uintptr_t to_write = prev ? reserved_form : (uintptr_t) val;
    void* entry = atomic_load(&latest->data[i]);
    if (entry == NULL) {
    void* to_write = val;
    if (prev != NULL) {
    // should only be one previous table
    assert(prev->prev == NULL);
    void* old = nbhs_raw_lookup(hs, prev, h, val);
    if (old != NULL) {
    to_write = old;
    }
    }

    // fight to be the one to land into the modern table
    if (atomic_compare_exchange_strong(&latest->data[i], &entry, to_write)) {
    result = to_write;

    // count doesn't care that it's a migration, it's at least not replacing an existing
    // slot in this version of the table.
    atomic_fetch_add_explicit(&latest->count, 1, memory_order_relaxed);

    if (prev != NULL) {
    // should only be one previous table
    assert(prev->prev == NULL);
    void* old = nbhs_raw_lookup(hs, prev, h, val);

    if (old != NULL) {
    // migrate the entry up
    atomic_exchange(&latest->data[i], (uintptr_t) old);
    result = old;
    break;
    }
    }

    // we're the first occurrence
    atomic_exchange(&latest->data[i], (uintptr_t) val);
    result = val;
    break;
    }
    }

    void* k = nbhs_try_entry(hs, latest, i, val, entry);
    if (k != NULL) {
    return k;
    if (hs->compare(entry, val)) {
    return entry;
    }

    i = (i + 1) & mask;
    @@ -196,6 +164,26 @@ static void* nbhs_raw_intern(NBHS* hs, NBHS_Table* latest, NBHS_Table* prev, voi
    }
    }

    void nbhs_raw_insert(NBHS* hs, void* val) {
    NBHS_Table* table = hs->latest;
    uint32_t h = hs->hash(val);
    size_t mask = (1 << table->exp) - 1;
    size_t first = h & mask, i = first;
    do {
    void* entry = atomic_load_explicit(&table->data[i], memory_order_relaxed);
    if (entry == NULL) {
    atomic_store_explicit(&table->data[i], val, memory_order_relaxed);
    atomic_fetch_add_explicit(&table->count, 1, memory_order_relaxed);
    return;
    }

    assert(!hs->compare((void*) entry, val));
    i = (i + 1) & mask;
    } while (i != first);

    abort();
    }

    NBHS nbhs_alloc(size_t initial_cap, NBHS_AllocZeroMem alloc_mem, NBHS_FreeMem free_mem, NBHS_Compare compare, NBHS_Hash hash) {
    if (alloc_mem == NULL) {
    assert(free_mem == NULL);
    @@ -204,7 +192,7 @@ NBHS nbhs_alloc(size_t initial_cap, NBHS_AllocZeroMem alloc_mem, NBHS_FreeMem fr
    }

    size_t exp = 64 - __builtin_clzll(initial_cap - 1);
    NBHS_Table* table = alloc_mem(sizeof(NBHS_Table) + (1ull << exp)*sizeof(uintptr_t));
    NBHS_Table* table = alloc_mem(sizeof(NBHS_Table) + (1ull << exp)*sizeof(void*));
    table->exp = exp;
    return (NBHS){
    .alloc_mem = alloc_mem,
    @@ -219,7 +207,7 @@ void nbhs_free(NBHS* hs) {
    NBHS_Table* curr = hs->latest;
    while (curr) {
    NBHS_Table* next = curr->prev;
    hs->free_mem(curr, sizeof(NBHS_Table) + (1ull*curr->exp)*sizeof(uintptr_t));
    hs->free_mem(curr, sizeof(NBHS_Table) + (1ull*curr->exp)*sizeof(void*));
    curr = next;
    }
    }
    @@ -229,13 +217,95 @@ static void nbhs_enter_critsec(void) { atomic_fetch_add(&nbhs_ebr.time, NBHS_LOC
    // flips the top bit off AND increments time by one
    static void nbhs_exit_critsec(void) { atomic_fetch_add(&nbhs_ebr.time, NBHS_LOCKED_BIT + 1); }

    void* nbhs_intern(NBHS* hs, void* val) {
    NBHS_Table* nbhs_move_items(NBHS* hs, NBHS_Table* latest, NBHS_Table* prev, int items_to_move) {
    assert(prev);
    size_t cap = 1ull << prev->exp;

    // snatch up some number of items
    uint32_t old, new;
    do {
    old = atomic_load(&prev->moved);
    if (old == cap) { return prev; }
    // cap the number of items to copy... by the cap
    new = old + items_to_move;
    if (new > cap) { new = cap; }
    } while (!atomic_compare_exchange_strong(&prev->moved, &(uint32_t){ old }, new));

    NBHS__BEGIN("copying old");
    for (size_t i = old; i < new; i++) {
    // either NULL or complete can go thru without waiting
    void* old_p = atomic_load(&prev->data[i]);
    if (old_p) {
    // we pass NULL for prev since we already know the entries exist in prev
    nbhs_raw_intern(hs, latest, NULL, old_p);
    }
    }
    NBHS__END();

    uint32_t done = atomic_fetch_add(&prev->move_done, new - old);
    done += new - old;

    assert(done <= cap);
    if (done == cap) {
    // dettach now
    NBHS__BEGIN("detach");

    assert(prev->prev == NULL);
    latest->prev = NULL;

    // since we're freeing at the moment, we don't want to block up other freeing threads
    nbhs_exit_critsec();

    NBHS__BEGIN("scan");
    int state_count = nbhs_ebr_count;
    uint64_t* states = hs->alloc_mem(state_count * sizeof(uint64_t));

    // check current state, once the other threads either advance or aren't in the
    // lookup function we know we can free.
    NBHS_EBREntry* us = &nbhs_ebr;
    for (NBHS_EBREntry* list = atomic_load(&nbhs_ebr_list); list; list = list->next) {
    // mark sure no ptrs refer to prev
    if (us != list && list->id < state_count) {
    states[list->id] = list->time;
    }
    }

    // wait on each and every thread to make progress or not be in a critical section at the time.
    for (NBHS_EBREntry* list = atomic_load(&nbhs_ebr_list); list; list = list->next) {
    // mark sure no ptrs refer to prev
    if (us != list && list->id < state_count && (states[list->id] & NBHS_LOCKED_BIT)) {
    uint64_t before_t = states[list->id], now_t;
    do {
    // idk, maybe this should be a better spinlock
    now_t = atomic_load(&list->time);
    } while (before_t == now_t);
    }
    }

    hs->free_mem(states, state_count * sizeof(uint64_t));
    NBHS__END();

    // no more refs, we can immediately free
    hs->free_mem(prev, sizeof(NBHS_Table) + (1ull<<prev->exp)*sizeof(void*));

    nbhs_enter_critsec();
    prev = NULL;

    NBHS__END();
    }
    return prev;
    }

    void* nbhs_get(NBHS* hs, void* val) {
    NBHS__BEGIN("intern");

    assert(val);
    if (!nbhs_ebr_init) {
    NBHS__BEGIN("init");
    nbhs_ebr_init = true;

    // add to ebr list, we never free this because i don't care
    // TODO(NeGate): i do care, this is a nightmare when threads die figure it out
    NBHS_EBREntry* old;
    do {
    old = atomic_load_explicit(&nbhs_ebr_list, memory_order_relaxed);
    @@ -245,93 +315,82 @@ void* nbhs_intern(NBHS* hs, void* val) {
    NBHS__END();
    }

    // enter critical section, modifying the tables is possible now.
    // modifying the tables is possible now.
    nbhs_enter_critsec();

    NBHS_Table* latest = atomic_load(&hs->latest);

    // if there's earlier versions of the table we can move up entries as we go along.
    NBHS_Table* prev = atomic_load(&latest->prev);
    if (prev) {
    size_t cap = 1ull << prev->exp;
    prev = nbhs_move_items(hs, latest, prev, 32);
    if (prev == NULL) {
    latest = atomic_load(&hs->latest);
    }
    }

    // snatch up some number of items
    uint32_t old, new;
    do {
    old = atomic_load(&prev->moved);
    if (old == cap) { goto skip; }
    // cap the number of items to copy... by the cap
    new = old + 32;
    if (new > cap) { new = cap; }
    } while (!atomic_compare_exchange_strong(&prev->moved, &(uint32_t){ old }, new));

    NBHS__BEGIN("copying old");
    for (size_t i = old; i < new; i++) {
    // either NULL or complete can go thru without waiting
    uintptr_t decision;
    while (decision = atomic_load(&prev->data[i]), decision & NBHS_RESERVE_BIT) {
    // idk if i should do a nicer wait than a spin-lock
    }
    // just lookup into the tables, we don't need to reserve
    // actually lookup & insert
    uint32_t exp = latest->exp;
    size_t mask = (1 << exp) - 1;

    if (decision) {
    // we pass NULL for prev since we already know the entries exist in prev
    nbhs_raw_intern(hs, latest, NULL, (void*) decision);
    void* result = NULL;
    uint32_t h = hs->hash(val);
    size_t first = h & mask, i = first;
    do {
    void* entry = atomic_load(&latest->data[i]);
    if (entry == NULL) {
    if (prev != NULL) {
    // should only be one previous table
    assert(prev->prev == NULL);
    result = nbhs_raw_lookup(hs, prev, h, val);
    }
    break;
    }
    NBHS__END();

    uint32_t done = atomic_fetch_add(&prev->move_done, new - old);
    done += new - old;

    assert(done <= cap);
    if (done == cap) {
    // dettach now
    NBHS__BEGIN("detach");

    assert(prev->prev == NULL);
    latest->prev = NULL;
    if (hs->compare(entry, val)) {
    result = entry;
    break;
    }

    // since we're freeing at the moment, we don't want to block up other freeing threads
    nbhs_exit_critsec();
    i = (i + 1) & mask;
    } while (i != first);

    NBHS__BEGIN("scan");
    int state_count = nbhs_ebr_count;
    uint64_t* states = hs->alloc_mem(state_count * sizeof(uint64_t));
    nbhs_exit_critsec();
    NBHS__END();
    return result;
    }

    // check current state, once the other threads either advance or aren't in the
    // lookup function we know we can free.
    NBHS_EBREntry* us = &nbhs_ebr;
    for (NBHS_EBREntry* list = atomic_load(&nbhs_ebr_list); list; list = list->next) {
    // mark sure no ptrs refer to prev
    if (us != list && list->id < state_count) {
    states[list->id] = list->time;
    }
    }
    void* nbhs_intern(NBHS* hs, void* val) {
    NBHS__BEGIN("intern");

    // wait on each and every thread to make progress or not be in a critical section at the time.
    for (NBHS_EBREntry* list = atomic_load(&nbhs_ebr_list); list; list = list->next) {
    // mark sure no ptrs refer to prev
    if (us != list && list->id < state_count && (states[list->id] & NBHS_LOCKED_BIT)) {
    uint64_t before_t = states[list->id], now_t;
    do {
    // idk, maybe this should be a better spinlock
    now_t = atomic_load(&list->time);
    } while (before_t == now_t);
    }
    }
    assert(val);
    if (!nbhs_ebr_init) {
    NBHS__BEGIN("init");
    nbhs_ebr_init = true;

    hs->free_mem(states, state_count * sizeof(uint64_t));
    NBHS__END();
    // add to ebr list, we never free this because i don't care
    NBHS_EBREntry* old;
    do {
    old = atomic_load_explicit(&nbhs_ebr_list, memory_order_relaxed);
    nbhs_ebr.next = old;
    } while (!atomic_compare_exchange_strong(&nbhs_ebr_list, &old, &nbhs_ebr));
    nbhs_ebr.id = nbhs_ebr_count++;
    NBHS__END();
    }

    // no more refs, we can immediately free
    hs->free_mem(prev, sizeof(NBHS_Table) + (1ull<<prev->exp)*sizeof(uintptr_t));
    // enter critical section, modifying the tables is possible now.
    nbhs_enter_critsec();

    nbhs_enter_critsec();
    prev = NULL;
    NBHS_Table* latest = atomic_load(&hs->latest);

    NBHS__END();
    // if there's earlier versions of the table we can move up entries as we go along.
    NBHS_Table* prev = atomic_load(&latest->prev);
    if (prev) {
    prev = nbhs_move_items(hs, latest, prev, 32);
    if (prev == NULL) {
    latest = atomic_load(&hs->latest);
    }
    skip:;
    }

    void* result = nbhs_raw_intern(hs, latest, prev, val);
    @@ -341,4 +400,33 @@ void* nbhs_intern(NBHS* hs, void* val) {
    return result;
    }

    #endif // NBHS_IMPL
    // waits for all items to be moved up before continuing
    void nbhs_resize_barrier(NBHS* hs) {
    NBHS__BEGIN("intern");
    if (!nbhs_ebr_init) {
    NBHS__BEGIN("init");
    nbhs_ebr_init = true;

    // add to ebr list, we never free this because i don't care
    NBHS_EBREntry* old;
    do {
    old = atomic_load_explicit(&nbhs_ebr_list, memory_order_relaxed);
    nbhs_ebr.next = old;
    } while (!atomic_compare_exchange_strong(&nbhs_ebr_list, &old, &nbhs_ebr));
    nbhs_ebr.id = nbhs_ebr_count++;
    NBHS__END();
    }

    // enter critical section, modifying the tables is possible now.
    nbhs_enter_critsec();

    NBHS_Table *prev, *latest = atomic_load(&hs->latest);
    while (prev = atomic_load(&latest->prev), prev != NULL) {
    nbhs_move_items(hs, latest, prev, 1ull << prev->exp);
    }

    nbhs_exit_critsec();
    NBHS__END();
    }

    #endif // NBHS_IMPL
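
    For reference, a minimal single-threaded caller of the API as it stands after this revision could look like the sketch below. The hash/compare callbacks and the key array are illustrative stand-ins (in the spirit of test.c), and it assumes NBHS__BEGIN/NBHS__END compile to no-ops so no profiler needs to be linked in:

    #include <assert.h>
    #include <stdbool.h>
    #include <stddef.h>
    #include <stdint.h>
    #include <stdlib.h>
    #include <stdatomic.h>

    #define NBHS_IMPL
    #include "nbhs.h"

    static uint32_t ex_hash(const void* a) { return *(const uint32_t*)a * 0x9E3779B9u; }
    static bool ex_cmp(const void* a, const void* b) { return *(const uint32_t*)a == *(const uint32_t*)b; }

    int main(void) {
        // NULL allocators fall back to the built-in calloc/free wrappers
        NBHS set = nbhs_alloc(32, NULL, NULL, ex_cmp, ex_hash);

        static uint32_t keys[3] = { 1, 2, 1 };
        nbhs_raw_insert(&set, &keys[0]);        // thread-unsafe, fine during single-threaded init

        void* a = nbhs_intern(&set, &keys[1]);  // new value: returns &keys[1]
        void* b = nbhs_intern(&set, &keys[2]);  // duplicate of keys[0]: returns the earlier pointer
        assert(a == &keys[1] && b == &keys[0]);

        nbhs_resize_barrier(&set);              // wait for any in-flight table migration
        assert(nbhs_get(&set, &keys[2]) == &keys[0]);

        nbhs_free(&set);
        return 0;
    }

    nbhs_raw_insert skips the atomics entirely, so it is only safe before other threads can see the set; the other calls are the concurrent-safe path.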
  12. RealNeGate revised this gist Sep 26, 2024. 1 changed file with 90 additions and 22 deletions.
    112 changes: 90 additions & 22 deletions test.c
    @@ -1,6 +1,7 @@
    #define _CRT_SECURE_NO_WARNINGS
    #include <stdio.h>
    #include <stdint.h>
    #include <string.h>
    #include <stddef.h>
    #include <stdlib.h>
    #include <assert.h>
    @@ -27,6 +28,46 @@
    #define NBHS_IMPL
    #include "nbhs.h"

    static int num_threads;

    typedef struct {
    NBHS_Compare compare;
    NBHS_Hash hash;

    pthread_mutex_t lock;

    size_t exp;
    void* data[];
    } LockedHS;

    void* lhs_intern(LockedHS* hs, void* val) {
    NBHS__BEGIN("intern");
    if (num_threads > 1) { pthread_mutex_lock(&hs->lock); }

    // actually lookup & insert
    uint32_t exp = hs->exp;
    size_t mask = (1 << exp) - 1;

    void* result = NULL;
    uint32_t h = hs->hash(val);
    size_t first = h & mask, i = first;
    do {
    if (hs->data[i] == NULL) {
    hs->data[i] = result = val;
    break;
    } else if (hs->compare(hs->data[i], val)) {
    result = hs->data[i];
    break;
    }
    i = (i + 1) & mask;
    } while (i != first);
    assert(result != NULL);

    if (num_threads > 1) { pthread_mutex_unlock(&hs->lock); }
    NBHS__END();
    return result;
    }

    static uint32_t my_hash(const void* a) {
    const uint8_t* data = a;
    uint32_t h = 0x811C9DC5;
    @@ -48,8 +89,11 @@ uint32_t pcg32_pie(uint64_t *state) {
    return (xorshifted >> rot) | (xorshifted << ((-rot) & 31));
    }

    static LockedHS* test_lhs;
    static NBHS test_set;

    static int attempts; // per thread
    static bool testing_lhs;

    static int* thread_stats;

    @@ -68,22 +112,33 @@ static void* test_thread_fn(void* arg)
    #endif

    int* stats = &thread_stats[starting_id*16];

    uint32_t* arr = malloc(attempts * sizeof(uint32_t));
    for (int i = 0; i < attempts;) {
    int limit = i + 128;
    if (limit > attempts) { limit = attempts; }

    spall_auto_buffer_begin("batch", 5, NULL, 0);
    for (; i < limit; i++) {
    if (testing_lhs) {
    for (int i = 0; i < attempts; i++) {
    arr[i] = pcg32_pie(&seed) & 0xFFFF;
    if (nbhs_intern(&test_set, &arr[i]) == &arr[i]) {
    if (lhs_intern(test_lhs, &arr[i]) == &arr[i]) {
    stats[0] += 1; // insertions
    } else {
    stats[1] += 1; // duplicate
    }
    }
    spall_auto_buffer_end();
    } else {
    for (int i = 0; i < attempts;) {
    int limit = i + 128;
    if (limit > attempts) { limit = attempts; }

    spall_auto_buffer_begin("batch", 5, NULL, 0);
    for (; i < limit; i++) {
    arr[i] = pcg32_pie(&seed) & 0xFFFF;
    if (nbhs_intern(&test_set, &arr[i]) == &arr[i]) {
    stats[0] += 1; // insertions
    } else {
    stats[1] += 1; // duplicate
    }
    }
    spall_auto_buffer_end();
    }
    }

    #if USE_SPALL
    @@ -100,38 +155,51 @@ int main(int argc, char** argv) {
    spall_auto_thread_init(0, SPALL_DEFAULT_BUFFER_SIZE);
    #endif

    int threads = atoi(argv[1]);
    printf("Testing with %d threads\n", threads);
    num_threads = atoi(argv[1]);
    printf("Testing with %d threads\n", num_threads);

    if (argc >= 3 && strcmp(argv[2], "lhs") == 0) {
    testing_lhs = true;
    printf(" With Locked hashset...\n");
    }

    // attempts = 1000000000 / threads;
    attempts = 10000000 / threads;
    test_set = nbhs_alloc(32, NULL, NULL, my_cmp, my_hash);
    thread_stats = calloc(threads, 64 * sizeof(int));
    attempts = 50000000 / num_threads;
    thread_stats = calloc(num_threads, 64 * sizeof(int));

    if (testing_lhs) {
    test_lhs = calloc(sizeof(LockedHS) + 262144*sizeof(void*), 1);
    test_lhs->exp = 18;
    test_lhs->compare = my_cmp;
    test_lhs->hash = my_hash;
    } else {
    test_set = nbhs_alloc(32, NULL, NULL, my_cmp, my_hash);
    }

    #ifdef _WIN32
    HANDLE* arr = malloc(threads * sizeof(HANDLE));
    for (int i = 0; i < threads; i++) {
    HANDLE* arr = malloc(num_threads * sizeof(HANDLE));
    for (int i = 0; i < num_threads; i++) {
    arr[i] = (HANDLE) _beginthreadex(NULL, 0, test_thread_fn, (void*) (uintptr_t) i, 0, 0);
    }
    WaitForMultipleObjects(threads, arr, true, INFINITE);
    WaitForMultipleObjects(num_threads, arr, true, INFINITE);
    #else
    pthread_t* arr = malloc(threads * sizeof(pthread_t));
    for (int i = 0; i < threads; i++) {
    pthread_t* arr = malloc(num_threads * sizeof(pthread_t));
    for (int i = 0; i < num_threads; i++) {
    pthread_create(&arr[i], NULL, test_thread_fn, (void*) (uintptr_t) i);
    }
    for (int i = 0; i < threads; i++) {
    for (int i = 0; i < num_threads; i++) {
    pthread_join(arr[i], NULL);
    }
    #endif

    int inserted = 0, duplicates = 0;
    for (int i = 0; i < threads; i++) {
    for (int i = 0; i < num_threads; i++) {
    inserted += thread_stats[i*16 + 0];
    duplicates += thread_stats[i*16 + 1];
    }

    printf("%d + %d = %d (needed %d)\n", inserted, duplicates, inserted + duplicates, attempts*threads);
    if (inserted + duplicates != attempts*threads) {
    printf("%d + %d = %d (needed %d)\n", inserted, duplicates, inserted + duplicates, attempts*num_threads);
    if (inserted + duplicates != attempts*num_threads) {
    printf("FAIL!\n");
    abort();
    }
  13. RealNeGate revised this gist Sep 26, 2024. 2 changed files with 65 additions and 17 deletions.
    14 changes: 9 additions & 5 deletions nbhs.h
    @@ -112,6 +112,8 @@ static void* nbhs_raw_lookup(NBHS* hs, NBHS_Table* table, uint32_t h, void* val)
    return NULL;
    }

    // static _Atomic int reprobes[100];

    static void* nbhs_raw_intern(NBHS* hs, NBHS_Table* latest, NBHS_Table* prev, void* val) {
    // resize on 50% load factor and we're not in the moving process rn
    uint32_t threshold = (1 << latest->exp) / 2;
    @@ -132,14 +134,16 @@ static void* nbhs_raw_intern(NBHS* hs, NBHS_Table* latest, NBHS_Table* prev, voi
    }
    }

    // actually lookup & insert
    uint32_t h = hs->hash(val);
    uintptr_t reserved_form = (uintptr_t)val | NBHS_RESERVE_BIT;

    // actually lookup & insert
    uint32_t exp = latest->exp;
    size_t mask = (1 << exp) - 1;

    void* result = NULL;
    uint32_t h = hs->hash(val);
    for (;;) {
    size_t mask = (1 << latest->exp) - 1;
    size_t first = h & mask, i = first;

    do {
    uintptr_t entry = atomic_load(&latest->data[i]);
    if (entry == 0) {
    @@ -337,4 +341,4 @@ void* nbhs_intern(NBHS* hs, void* val) {
    return result;
    }

    #endif // NBHS_IMPL
    #endif // NBHS_IMPL
    68 changes: 56 additions & 12 deletions test.c
    @@ -7,11 +7,15 @@
    #include <stdbool.h>
    #include <stdatomic.h>

    #ifdef _WIN32
    #define WIN32_LEAN_AND_MEAN
    #include <windows.h>
    #include <process.h>
    #else
    #include <pthread.h>
    #endif

    #define USE_SPALL 1
    #define USE_SPALL 0

    #if USE_SPALL
    #include "spall_native_auto.h"
    @@ -23,7 +27,16 @@
    #define NBHS_IMPL
    #include "nbhs.h"

    static uint32_t my_hash(const void* a) { return (*(const uint32_t*)a * 11400714819323198485ULL) >> 32ull; }
    static uint32_t my_hash(const void* a) {
    const uint8_t* data = a;
    uint32_t h = 0x811C9DC5;
    for (size_t i = 0; i < 4; i++) {
    h = (data[i] ^ h) * 0x01000193;
    }
    return h;
    // return (*(const uint32_t*)a * 11400714819323198485ULL) >> 32ull;
    }

    static bool my_cmp(const void* a, const void* b) { return *(const uint32_t*)a == *(const uint32_t*)b; }

    // https://github.com/demetri/scribbles/blob/master/randomness/prngs.c
    @@ -40,48 +53,76 @@ static int attempts; // per thread

    static int* thread_stats;

    static unsigned int test_thread_fn(void* arg) {
    #ifdef _WIN32
    static unsigned int test_thread_fn(void* arg)
    #else
    static void* test_thread_fn(void* arg)
    #endif
    {
    uintptr_t starting_id = (uintptr_t) arg;
    uint64_t seed = starting_id * 11400714819323198485ULL;

    #if USE_SPALL
    spall_auto_thread_init(starting_id, SPALL_DEFAULT_BUFFER_SIZE);
    spall_auto_buffer_begin("work", 4, NULL, 0);
    #endif

    int* stats = &thread_stats[starting_id*16];

    uint32_t* arr = malloc(attempts * sizeof(uint32_t));
    for (int i = 0; i < attempts; i++) {
    arr[i] = pcg32_pie(&seed) & 0xFFFF;
    if (nbhs_intern(&test_set, &arr[i]) == &arr[i]) {
    stats[0] += 1; // insertions
    } else {
    stats[1] += 1; // duplicate
    for (int i = 0; i < attempts;) {
    int limit = i + 128;
    if (limit > attempts) { limit = attempts; }

    spall_auto_buffer_begin("batch", 5, NULL, 0);
    for (; i < limit; i++) {
    arr[i] = pcg32_pie(&seed) & 0xFFFF;
    if (nbhs_intern(&test_set, &arr[i]) == &arr[i]) {
    stats[0] += 1; // insertions
    } else {
    stats[1] += 1; // duplicate
    }
    }
    spall_auto_buffer_end();
    }

    #if USE_SPALL
    spall_auto_buffer_end();
    spall_auto_thread_quit();
    #endif

    return 0;
    }

    int main(int argc, char** argv) {
    #if USE_SPALL
    spall_auto_init((char *)"profile.spall");
    spall_auto_thread_init(0, SPALL_DEFAULT_BUFFER_SIZE);
    #endif

    int threads = atoi(argv[1]);
    printf("Testing with %d threads\n", threads);

    // attempts = 1000000000 / threads;
    attempts = 10000000 / threads;
    test_set = nbhs_alloc(32, NULL, NULL, my_cmp, my_hash);

    HANDLE* arr = malloc(threads * sizeof(HANDLE));
    thread_stats = calloc(threads, 64 * sizeof(int));

    #ifdef _WIN32
    HANDLE* arr = malloc(threads * sizeof(HANDLE));
    for (int i = 0; i < threads; i++) {
    arr[i] = (HANDLE) _beginthreadex(NULL, 0, test_thread_fn, (void*) (uintptr_t) i, 0, 0);
    }
    WaitForMultipleObjects(threads, arr, true, INFINITE);
    #else
    pthread_t* arr = malloc(threads * sizeof(pthread_t));
    for (int i = 0; i < threads; i++) {
    pthread_create(&arr[i], NULL, test_thread_fn, (void*) (uintptr_t) i);
    }
    for (int i = 0; i < threads; i++) {
    pthread_join(arr[i], NULL);
    }
    #endif

    int inserted = 0, duplicates = 0;
    for (int i = 0; i < threads; i++) {
    @@ -95,12 +136,15 @@ int main(int argc, char** argv) {
    abort();
    }

    #if USE_SPALL
    spall_auto_thread_quit();
    spall_auto_quit();
    #endif

    return 0;
    }

    #if USE_SPALL
    #define SPALL_AUTO_IMPLEMENTATION
    #include "spall_native_auto.h"
    #endif
    #endif
  14. RealNeGate revised this gist Sep 24, 2024. 1 changed file with 9 additions and 5 deletions.
    14 changes: 9 additions & 5 deletions test.c
    @@ -11,9 +11,14 @@
    #include <windows.h>
    #include <process.h>

    #define USE_SPALL 1

    #if USE_SPALL
    #include "spall_native_auto.h"
    // #define spall_auto_buffer_begin(...)
    // #define spall_auto_buffer_end(...)
    #else
    #define spall_auto_buffer_begin(...)
    #define spall_auto_buffer_end(...)
    #endif

    #define NBHS_IMPL
    #include "nbhs.h"
    @@ -95,8 +100,7 @@ int main(int argc, char** argv) {
    return 0;
    }

    // #undef spall_auto_buffer_begin
    // #undef spall_auto_buffer_end

    #if USE_SPALL
    #define SPALL_AUTO_IMPLEMENTATION
    #include "spall_native_auto.h"
    #endif
  15. RealNeGate revised this gist Sep 24, 2024. 1 changed file with 1 addition and 0 deletions.
    1 change: 1 addition & 0 deletions nbhs.h
    @@ -188,6 +188,7 @@ static void* nbhs_raw_intern(NBHS* hs, NBHS_Table* latest, NBHS_Table* prev, voi
    }

    latest = new_latest;
    prev = atomic_load(&latest->prev);
    }
    }

  16. RealNeGate revised this gist Sep 24, 2024. 1 changed file with 10 additions and 10 deletions.
    20 changes: 10 additions & 10 deletions nbhs.h
    @@ -58,18 +58,18 @@ struct NBHS_Table {
    _Atomic(uintptr_t) data[];
    };

    typedef struct NBHS_HazardEntry {
    _Atomic(struct NBHS_HazardEntry*) next;
    typedef struct NBHS_EBREntry {
    _Atomic(struct NBHS_EBREntry*) next;
    _Atomic(uint64_t) time;

    // keep on a separate cacheline to avoid false sharing
    _Alignas(64) int id;
    } NBHS_HazardEntry;
    } NBHS_EBREntry;

    static _Thread_local bool nbhs_ebr_init;
    static _Thread_local NBHS_HazardEntry nbhs_ebr;
    static _Thread_local NBHS_EBREntry nbhs_ebr;
    static _Atomic(int) nbhs_ebr_count;
    static _Atomic(NBHS_HazardEntry*) nbhs_ebr_list;
    static _Atomic(NBHS_EBREntry*) nbhs_ebr_list;

    static void* nbhs__alloc_zero_mem(size_t s) { return calloc(1, s); }
    static void nbhs__free_mem(void* ptr, size_t s) { free(ptr); }
    @@ -231,7 +231,7 @@ void* nbhs_intern(NBHS* hs, void* val) {
    nbhs_ebr_init = true;

    // add to ebr list, we never free this because i don't care
    NBHS_HazardEntry* old;
    NBHS_EBREntry* old;
    do {
    old = atomic_load_explicit(&nbhs_ebr_list, memory_order_relaxed);
    nbhs_ebr.next = old;
    @@ -295,16 +295,16 @@ void* nbhs_intern(NBHS* hs, void* val) {

    // check current state, once the other threads either advance or aren't in the
    // lookup function we know we can free.
    NBHS_HazardEntry* us = &nbhs_ebr;
    for (NBHS_HazardEntry* list = atomic_load(&nbhs_ebr_list); list; list = list->next) {
    NBHS_EBREntry* us = &nbhs_ebr;
    for (NBHS_EBREntry* list = atomic_load(&nbhs_ebr_list); list; list = list->next) {
    // mark sure no ptrs refer to prev
    if (us != list && list->id < state_count) {
    states[list->id] = list->time;
    }
    }

    // wait on each and every thread to make progress or not be in a critical section at the time.
    for (NBHS_HazardEntry* list = atomic_load(&nbhs_ebr_list); list; list = list->next) {
    for (NBHS_EBREntry* list = atomic_load(&nbhs_ebr_list); list; list = list->next) {
    // mark sure no ptrs refer to prev
    if (us != list && list->id < state_count && (states[list->id] & NBHS_LOCKED_BIT)) {
    uint64_t before_t = states[list->id], now_t;
    @@ -336,4 +336,4 @@ void* nbhs_intern(NBHS* hs, void* val) {
    return result;
    }

    #endif // NBHS_IMPL
    #endif // NBHS_IMPL
  17. RealNeGate revised this gist Sep 24, 2024. 2 changed files with 108 additions and 94 deletions.
    179 changes: 91 additions & 88 deletions nbhs.h
    @@ -34,13 +34,16 @@ void* nbhs_intern(NBHS* hs, void* val);
    #define NBHS__BEGIN(name)
    #define NBHS__END()
    #else
    #define NBHS__BEGIN(name) spall_auto_buffer_begin(#name, sizeof(#name) - 1, NULL, 0)
    #define NBHS__BEGIN(name) spall_auto_buffer_begin(name, sizeof(name) - 1, NULL, 0)
    #define NBHS__END() spall_auto_buffer_end()
    #endif

    // private constants (idk if i wanna undef these but don't touch i guess?)
    #define NBHS_RESERVE_BIT (1ull << 63ull)

    // for the time in the ebr entry
    #define NBHS_LOCKED_BIT (1ull << 63ull)

    struct NBHS_Table {
    _Atomic(NBHS_Table*) prev;

    @@ -55,25 +58,18 @@ struct NBHS_Table {
    _Atomic(uintptr_t) data[];
    };

    // single hazard pointer per thread
    typedef struct NBHS_HazardEntry {
    _Atomic(struct NBHS_HazardEntry*) next;
    _Atomic(uint64_t) time;

    // normal insertion and work:
    // 0 - latest
    // 1 - prev
    // inside the prev moving code, it does an insert so
    // it needs separate hazard entries:
    // 2 - latest
    // 3 - prev
    // misc:
    // 4 - temporary
    _Atomic(NBHS_Table*) ptr[5];
    // keep on a separate cacheline to avoid false sharing
    _Alignas(64) int id;
    } NBHS_HazardEntry;

    static _Thread_local bool nbhs_hazard_init;
    static _Thread_local NBHS_HazardEntry nbhs_hazard;
    static _Atomic(NBHS_HazardEntry*) nbhs_hazard_list;
    static _Thread_local bool nbhs_ebr_init;
    static _Thread_local NBHS_HazardEntry nbhs_ebr;
    static _Atomic(int) nbhs_ebr_count;
    static _Atomic(NBHS_HazardEntry*) nbhs_ebr_list;

    static void* nbhs__alloc_zero_mem(size_t s) { return calloc(1, s); }
    static void nbhs__free_mem(void* ptr, size_t s) { free(ptr); }
    @@ -116,40 +112,24 @@ static void* nbhs_raw_lookup(NBHS* hs, NBHS_Table* table, uint32_t h, void* val)
    return NULL;
    }

    static NBHS_Table* nbhs_hazard_access(_Atomic(NBHS_Table*)* table, int slot) {
    NBHS_Table* val = atomic_load(table);
    for (;;) {
    // mark as hazard
    nbhs_hazard.ptr[slot] = val;
    // check if it's been invalidated since the previous load, if so
    // then undo hazard and try load again.
    NBHS_Table* after = atomic_load(table);
    if (val == after) {
    return val;
    }
    val = after;
    }
    }

    static void* nbhs_raw_intern(NBHS* hs, NBHS_Table* latest, void* val, int hazard_slot) {
    static void* nbhs_raw_intern(NBHS* hs, NBHS_Table* latest, NBHS_Table* prev, void* val) {
    // resize on 50% load factor and we're not in the moving process rn
    uint32_t threshold = (1 << latest->exp) / 2;
    if (latest->count >= threshold && atomic_load(&latest->prev) == NULL) {
    if (__builtin_expect(latest->count >= threshold && prev == NULL, 0)) {
    // make resized table, we'll amortize the moves upward
    size_t new_cap = 1ull << (latest->exp + 1);

    NBHS_Table* new_top = hs->alloc_mem(sizeof(NBHS_Table) + new_cap*sizeof(uintptr_t));
    new_top->exp = latest->exp + 1;
    new_top->exp = latest->exp + 1;

    // CAS latest -> new_table, if another thread wins the race we'll use its table
    new_top->prev = latest;
    if (!atomic_compare_exchange_strong(&hs->latest, &latest, new_top)) {
    hs->free_mem(new_top, sizeof(NBHS_Table) + new_cap*sizeof(uintptr_t));
    } else {
    prev = latest;
    latest = new_top;
    }

    nbhs_hazard.ptr[hazard_slot] = latest;
    }

    // actually lookup & insert
    @@ -163,35 +143,31 @@ static void* nbhs_raw_intern(NBHS* hs, NBHS_Table* latest, void* val, int hazard
    do {
    uintptr_t entry = atomic_load(&latest->data[i]);
    if (entry == 0) {
    // if we spot a NULL, the entry has never been in this table, that doesn't mean
    // it's not appeared already so we'll only reserve the slot until we figure that
    // out.
    //
    // everyone else will see our pointer marked as reserved and are free to wait on
    // us if they match, if not they can move along.
    if (atomic_compare_exchange_strong(&latest->data[i], &entry, reserved_form)) {
    void* old = NULL;
    NBHS_Table* curr = nbhs_hazard_access(&latest->prev, hazard_slot + 1);
    if (curr != NULL) {
    // should only be one previous table
    assert(curr->prev == NULL);

    old = nbhs_raw_lookup(hs, curr, h, val);
    nbhs_hazard.ptr[hazard_slot + 1] = NULL;
    }

    // we insert the reserved_form to lock the entry until we've checked
    // if the entry already existed in the previous table, if there's no
    // previous table we don't need this.
    uintptr_t to_write = prev ? reserved_form : (uintptr_t) val;
    if (atomic_compare_exchange_strong(&latest->data[i], &entry, to_write)) {
    // count doesn't care that it's a migration, it's at least not replacing an existing
    // slot in this version of the table.
    atomic_fetch_add_explicit(&latest->count, 1, memory_order_relaxed);

    if (old != NULL) {
    atomic_exchange(&latest->data[i], (uintptr_t) old);
    result = old;
    } else {
    // no entry was found, good (reserved -> val)
    atomic_exchange(&latest->data[i], (uintptr_t) val);
    result = val;
    if (prev != NULL) {
    // should only be one previous table
    assert(prev->prev == NULL);
    void* old = nbhs_raw_lookup(hs, prev, h, val);

    if (old != NULL) {
    // migrate the entry up
    atomic_exchange(&latest->data[i], (uintptr_t) old);
    result = old;
    break;
    }
    }

    // we're the first occurrence
    atomic_exchange(&latest->data[i], (uintptr_t) val);
    result = val;
    break;
    }
    }
    @@ -206,16 +182,11 @@ static void* nbhs_raw_intern(NBHS* hs, NBHS_Table* latest, void* val, int hazard

    // if the table changed before our eyes, it means someone resized which sucks
    // but it just means we need to retry
    NBHS_Table* new_latest = nbhs_hazard_access(&hs->latest, 4);
    NBHS_Table* new_latest = atomic_load(&hs->latest);
    if (latest == new_latest && result != NULL) {
    nbhs_hazard.ptr[hazard_slot] = NULL;
    nbhs_hazard.ptr[4] = NULL;
    return result;
    }

    // move to the correct hazard slot
    nbhs_hazard.ptr[hazard_slot] = new_latest;
    nbhs_hazard.ptr[4] = NULL;
    latest = new_latest;
    }
    }
    @@ -248,26 +219,34 @@ void nbhs_free(NBHS* hs) {
    }
    }

    // flips the top bit on
    static void nbhs_enter_critsec(void) { atomic_fetch_add(&nbhs_ebr.time, NBHS_LOCKED_BIT); }
    // flips the top bit off AND increments time by one
    static void nbhs_exit_critsec(void) { atomic_fetch_add(&nbhs_ebr.time, NBHS_LOCKED_BIT + 1); }

    void* nbhs_intern(NBHS* hs, void* val) {
    NBHS__BEGIN("intern");
    if (!nbhs_hazard_init) {
    if (!nbhs_ebr_init) {
    NBHS__BEGIN("init");
    nbhs_hazard_init = true;
    nbhs_ebr_init = true;

    // add to hazard list, we never free this because i don't care
    // add to ebr list, we never free this because i don't care
    NBHS_HazardEntry* old;
    do {
    old = atomic_load_explicit(&nbhs_hazard_list, memory_order_relaxed);
    nbhs_hazard.next = old;
    } while (!atomic_compare_exchange_strong(&nbhs_hazard_list, &old, &nbhs_hazard));
    old = atomic_load_explicit(&nbhs_ebr_list, memory_order_relaxed);
    nbhs_ebr.next = old;
    } while (!atomic_compare_exchange_strong(&nbhs_ebr_list, &old, &nbhs_ebr));
    nbhs_ebr.id = nbhs_ebr_count++;
    NBHS__END();
    }

    NBHS_Table* latest = nbhs_hazard_access(&hs->latest, 0);
    // enter critical section, modifying the tables is possible now.
    nbhs_enter_critsec();

    // if there's earlier versions of the table we can move up entries as we go
    // along.
    NBHS_Table* prev = nbhs_hazard_access(&latest->prev, 1);
    NBHS_Table* latest = atomic_load(&hs->latest);

    // if there's earlier versions of the table we can move up entries as we go along.
    NBHS_Table* prev = atomic_load(&latest->prev);
    if (prev) {
    size_t cap = 1ull << prev->exp;

    @@ -290,7 +269,8 @@ void* nbhs_intern(NBHS* hs, void* val) {
    }

    if (decision) {
    nbhs_raw_intern(hs, latest, (void*) decision, 2);
    // we pass NULL for prev since we already know the entries exist in prev
    nbhs_raw_intern(hs, latest, NULL, (void*) decision);
    }
    }
    NBHS__END();
    @@ -306,31 +286,54 @@ void* nbhs_intern(NBHS* hs, void* val) {
    assert(prev->prev == NULL);
    latest->prev = NULL;

    // since we're freeing at the moment, we don't want to block up other freeing threads
    nbhs_exit_critsec();

    NBHS__BEGIN("scan");
    // wait for all refs to stop holding on (just read the hazards until they don't match)
    NBHS_HazardEntry* us = &nbhs_hazard;
    for (NBHS_HazardEntry* list = atomic_load(&nbhs_hazard_list); list; list = list->next) {
    int state_count = nbhs_ebr_count;
    uint64_t* states = hs->alloc_mem(state_count * sizeof(uint64_t));

    // check current state, once the other threads either advance or aren't in the
    // lookup function we know we can free.
    NBHS_HazardEntry* us = &nbhs_ebr;
    for (NBHS_HazardEntry* list = atomic_load(&nbhs_ebr_list); list; list = list->next) {
    // mark sure no ptrs refer to prev
    if (us != list) for (size_t i = 0; i < (sizeof(list->ptr)/sizeof(void*)); i++) {
    while (list->ptr[i] == prev) {
    // spin-lock
    }
    if (us != list && list->id < state_count) {
    states[list->id] = list->time;
    }
    }

    // wait on each and every thread to make progress or not be in a critical section at the time.
    for (NBHS_HazardEntry* list = atomic_load(&nbhs_ebr_list); list; list = list->next) {
    // mark sure no ptrs refer to prev
    if (us != list && list->id < state_count && (states[list->id] & NBHS_LOCKED_BIT)) {
    uint64_t before_t = states[list->id], now_t;
    do {
    // idk, maybe this should be a better spinlock
    now_t = atomic_load(&list->time);
    } while (before_t == now_t);
    }
    }

    hs->free_mem(states, state_count * sizeof(uint64_t));
    NBHS__END();

    // no one's referring to it and no one will ever refer to it again
    // no more refs, we can immediately free
    hs->free_mem(prev, sizeof(NBHS_Table) + (1ull<<prev->exp)*sizeof(uintptr_t));

    nbhs_enter_critsec();
    prev = NULL;

    NBHS__END();
    }
    skip:;
    }
    nbhs_hazard.ptr[1] = NULL; // ok it can be freed now

    void* result = nbhs_raw_intern(hs, latest, val, 0);
    void* result = nbhs_raw_intern(hs, latest, prev, val);

    nbhs_exit_critsec();
    NBHS__END();
    return result;
    }

    #endif // NBHS_IMPL

    #endif // NBHS_IMPL
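
    The nbhs.h side of this revision drops the per-pointer hazard slots for an epoch scheme: each thread publishes a single time word whose top bit means "inside a critical section" and whose low bits count completed sections, and a thread retiring the old table only waits on threads whose snapshot had that bit set. A minimal sketch of just that encoding, with illustrative names rather than the gist's code:

    #include <stdatomic.h>
    #include <stdbool.h>
    #include <stdint.h>

    #define LOCKED_BIT (1ull << 63)

    typedef struct { _Atomic(uint64_t) time; } EpochSlot;

    // flips the top bit on: "I'm inside a critical section now"
    // (not re-entrant: assumes enter/exit strictly alternate)
    static void epoch_enter(EpochSlot* s) { atomic_fetch_add(&s->time, LOCKED_BIT); }
    // flips the top bit off AND bumps the counter: "I left, and made progress"
    static void epoch_exit(EpochSlot* s)  { atomic_fetch_add(&s->time, LOCKED_BIT + 1); }

    // Called by the reclaiming thread against a snapshot it took of s->time:
    // if the snapshot wasn't locked, that thread can't be holding the old table;
    // otherwise we must wait until its time changes, meaning it left that section.
    static bool epoch_safe(EpochSlot* s, uint64_t snapshot) {
        return (snapshot & LOCKED_BIT) == 0 || atomic_load(&s->time) != snapshot;
    }

    The usual EBR trade-off applies: a thread parked inside a critical section stalls reclamation of the old table, but the reclaimer only reads one word per thread instead of scanning every hazard slot.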
    23 changes: 17 additions & 6 deletions test.c
    @@ -31,24 +31,26 @@ uint32_t pcg32_pie(uint64_t *state) {
    }

    static NBHS test_set;
    static _Atomic int inserted, duplicates;

    static int attempts; // per thread

    static int* thread_stats;

    static unsigned int test_thread_fn(void* arg) {
    uintptr_t starting_id = (uintptr_t) arg;
    uint64_t seed = starting_id * 11400714819323198485ULL;

    spall_auto_thread_init(starting_id, SPALL_DEFAULT_BUFFER_SIZE);
    spall_auto_buffer_begin("work", 4, NULL, 0);

    int* stats = &thread_stats[starting_id*16];

    uint32_t* arr = malloc(attempts * sizeof(uint32_t));
    for (int i = 0; i < attempts; i++) {
    arr[i] = pcg32_pie(&seed) & 0xFFFFF;
    arr[i] = pcg32_pie(&seed) & 0xFFFF;
    if (nbhs_intern(&test_set, &arr[i]) == &arr[i]) {
    inserted += 1;
    stats[0] += 1; // insertions
    } else {
    duplicates += 1;
    stats[1] += 1; // duplicate
    }
    }

    @@ -64,15 +66,24 @@ int main(int argc, char** argv) {
    int threads = atoi(argv[1]);
    printf("Testing with %d threads\n", threads);

    attempts = 4000000 / threads;
    // attempts = 1000000000 / threads;
    attempts = 10000000 / threads;
    test_set = nbhs_alloc(32, NULL, NULL, my_cmp, my_hash);

    HANDLE* arr = malloc(threads * sizeof(HANDLE));
    thread_stats = calloc(threads, 64 * sizeof(int));

    for (int i = 0; i < threads; i++) {
    arr[i] = (HANDLE) _beginthreadex(NULL, 0, test_thread_fn, (void*) (uintptr_t) i, 0, 0);
    }
    WaitForMultipleObjects(threads, arr, true, INFINITE);

    int inserted = 0, duplicates = 0;
    for (int i = 0; i < threads; i++) {
    inserted += thread_stats[i*16 + 0];
    duplicates += thread_stats[i*16 + 1];
    }

    printf("%d + %d = %d (needed %d)\n", inserted, duplicates, inserted + duplicates, attempts*threads);
    if (inserted + duplicates != attempts*threads) {
    printf("FAIL!\n");
  18. RealNeGate revised this gist Apr 4, 2024. 2 changed files with 0 additions and 0 deletions.
    File renamed without changes.
    File renamed without changes.
  19. RealNeGate revised this gist Apr 3, 2024. 1 changed file with 336 additions and 0 deletions.
    336 changes: 336 additions & 0 deletions gistfile1.txt
    @@ -0,0 +1,336 @@
    ////////////////////////////////
    // NBHS - Non-blocking hashset
    ////////////////////////////////
    // You wanna intern lots of things on lots of cores? this is for you. It's
    // inspired by Cliff's non-blocking hashmap.
    #ifndef NBHS_H
    #define NBHS_H

    typedef void* (*NBHS_AllocZeroMem)(size_t size);
    typedef void (*NBHS_FreeMem)(void* ptr, size_t size);
    typedef uint32_t (*NBHS_Hash)(const void* a);
    typedef bool (*NBHS_Compare)(const void* a, const void* b);

    typedef struct NBHS_Table NBHS_Table;
    typedef struct {
    NBHS_AllocZeroMem alloc_mem;
    NBHS_FreeMem free_mem;
    NBHS_Compare compare;
    NBHS_Hash hash;

    _Atomic(NBHS_Table*) latest;
    } NBHS;

    NBHS nbhs_alloc(size_t initial_cap, NBHS_AllocZeroMem alloc_mem, NBHS_FreeMem free_mem, NBHS_Compare compare, NBHS_Hash hash);
    void nbhs_free(NBHS* hs);
    void* nbhs_intern(NBHS* hs, void* val);

    #endif // NBHS_H

    #ifdef NBHS_IMPL

    // personal debooging stuff
    #if 0
    #define NBHS__BEGIN(name)
    #define NBHS__END()
    #else
    #define NBHS__BEGIN(name) spall_auto_buffer_begin(#name, sizeof(#name) - 1, NULL, 0)
    #define NBHS__END() spall_auto_buffer_end()
    #endif

    // private constants (idk if i wanna undef these but don't touch i guess?)
    #define NBHS_RESERVE_BIT (1ull << 63ull)

    struct NBHS_Table {
    _Atomic(NBHS_Table*) prev;

    uint32_t exp;

    // tracks how many entries have
    // been moved once we're resizing
    _Atomic uint32_t moved;
    _Atomic uint32_t move_done;
    _Atomic uint32_t count;

    _Atomic(uintptr_t) data[];
    };

    // single hazard pointer per thread
    typedef struct NBHS_HazardEntry {
    _Atomic(struct NBHS_HazardEntry*) next;

    // normal insertion and work:
    // 0 - latest
    // 1 - prev
    // inside the prev moving code, it does an insert so
    // it needs separate hazard entries:
    // 2 - latest
    // 3 - prev
    // misc:
    // 4 - temporary
    _Atomic(NBHS_Table*) ptr[5];
    } NBHS_HazardEntry;

    static _Thread_local bool nbhs_hazard_init;
    static _Thread_local NBHS_HazardEntry nbhs_hazard;
    static _Atomic(NBHS_HazardEntry*) nbhs_hazard_list;

    static void* nbhs__alloc_zero_mem(size_t s) { return calloc(1, s); }
    static void nbhs__free_mem(void* ptr, size_t s) { free(ptr); }

    static void* nbhs_try_entry(NBHS* hs, NBHS_Table* table, int i, void* val, uintptr_t entry) {
    void* entry_p = (void*) (entry & ~NBHS_RESERVE_BIT);
    if (hs->compare(entry_p, val)) {
    if ((entry & NBHS_RESERVE_BIT) == 0) {
    // not reserved, yay
    return entry_p;
    } else {
    // either NULL or complete can go thru without waiting
    uintptr_t decision;
    while (decision = atomic_load(&table->data[i]), decision & NBHS_RESERVE_BIT) {
    // idk if i should do a nicer wait than a spin-lock
    }
    return (void*) decision;
    }
    } else {
    return NULL;
    }
    }

    static void* nbhs_raw_lookup(NBHS* hs, NBHS_Table* table, uint32_t h, void* val) {
    size_t mask = (1 << table->exp) - 1;
    size_t first = h & mask, i = first;
    do {
    uintptr_t entry = atomic_load(&table->data[i]);
    if (entry == 0) {
    return NULL;
    }

    void* k = nbhs_try_entry(hs, table, i, val, entry);
    if (k != NULL) {
    return k;
    }
    i = (i + 1) & mask;
    } while (i != first);

    return NULL;
    }

    static NBHS_Table* nbhs_hazard_access(_Atomic(NBHS_Table*)* table, int slot) {
    NBHS_Table* val = atomic_load(table);
    for (;;) {
    // mark as hazard
    nbhs_hazard.ptr[slot] = val;
    // check if it's been invalidated since the previous load, if so
    // then undo hazard and try load again.
    NBHS_Table* after = atomic_load(table);
    if (val == after) {
    return val;
    }
    val = after;
    }
    }

    static void* nbhs_raw_intern(NBHS* hs, NBHS_Table* latest, void* val, int hazard_slot) {
    // resize on 50% load factor and we're not in the moving process rn
    uint32_t threshold = (1 << latest->exp) / 2;
    if (latest->count >= threshold && atomic_load(&latest->prev) == NULL) {
    // make resized table, we'll amortize the moves upward
    size_t new_cap = 1ull << (latest->exp + 1);

    NBHS_Table* new_top = hs->alloc_mem(sizeof(NBHS_Table) + new_cap*sizeof(uintptr_t));
    new_top->exp = latest->exp + 1;

    // CAS latest -> new_table, if another thread wins the race we'll use its table
    new_top->prev = latest;
    if (!atomic_compare_exchange_strong(&hs->latest, &latest, new_top)) {
    hs->free_mem(new_top, sizeof(NBHS_Table) + new_cap*sizeof(uintptr_t));
    } else {
    latest = new_top;
    }

    nbhs_hazard.ptr[hazard_slot] = latest;
    }

    // actually lookup & insert
    uint32_t h = hs->hash(val);
    uintptr_t reserved_form = (uintptr_t)val | NBHS_RESERVE_BIT;
    void* result = NULL;
    for (;;) {
    size_t mask = (1 << latest->exp) - 1;
    size_t first = h & mask, i = first;

    do {
    uintptr_t entry = atomic_load(&latest->data[i]);
    if (entry == 0) {
    // if we spot a NULL, the entry has never been in this table, that doesn't mean
    // it's not appeared already so we'll only reserve the slot until we figure that
    // out.
    //
    // everyone else will see our pointer marked as reserved and are free to wait on
    // us if they match, if not they can move along.
    if (atomic_compare_exchange_strong(&latest->data[i], &entry, reserved_form)) {
    void* old = NULL;
    NBHS_Table* curr = nbhs_hazard_access(&latest->prev, hazard_slot + 1);
    if (curr != NULL) {
    // should only be one previous table
    assert(curr->prev == NULL);

    old = nbhs_raw_lookup(hs, curr, h, val);
    nbhs_hazard.ptr[hazard_slot + 1] = NULL;
    }

    // count doesn't care that it's a migration, it's at least not replacing an existing
    // slot in this version of the table.
    atomic_fetch_add_explicit(&latest->count, 1, memory_order_relaxed);

    if (old != NULL) {
    atomic_exchange(&latest->data[i], (uintptr_t) old);
    result = old;
    } else {
    // no entry was found, good (reserved -> val)
    atomic_exchange(&latest->data[i], (uintptr_t) val);
    result = val;
    }
    break;
    }
    }

    void* k = nbhs_try_entry(hs, latest, i, val, entry);
    if (k != NULL) {
    return k;
    }

    i = (i + 1) & mask;
    } while (i != first);

    // if the table changed before our eyes, it means someone resized which sucks
    // but it just means we need to retry
    NBHS_Table* new_latest = nbhs_hazard_access(&hs->latest, 4);
    if (latest == new_latest && result != NULL) {
    nbhs_hazard.ptr[hazard_slot] = NULL;
    nbhs_hazard.ptr[4] = NULL;
    return result;
    }

    // move to the correct hazard slot
    nbhs_hazard.ptr[hazard_slot] = new_latest;
    nbhs_hazard.ptr[4] = NULL;
    latest = new_latest;
    }
    }

    NBHS nbhs_alloc(size_t initial_cap, NBHS_AllocZeroMem alloc_mem, NBHS_FreeMem free_mem, NBHS_Compare compare, NBHS_Hash hash) {
    if (alloc_mem == NULL) {
    assert(free_mem == NULL);
    alloc_mem = nbhs__alloc_zero_mem;
    free_mem = nbhs__free_mem;
    }

    size_t exp = 64 - __builtin_clzll(initial_cap - 1);
    NBHS_Table* table = alloc_mem(sizeof(NBHS_Table) + (1ull << exp)*sizeof(uintptr_t));
    table->exp = exp;
    return (NBHS){
    .alloc_mem = alloc_mem,
    .free_mem = free_mem,
    .compare = compare,
    .hash = hash,
    .latest = table
    };
    }

    void nbhs_free(NBHS* hs) {
    NBHS_Table* curr = hs->latest;
    while (curr) {
    NBHS_Table* next = curr->prev;
hs->free_mem(curr, sizeof(NBHS_Table) + (1ull << curr->exp)*sizeof(uintptr_t));
    curr = next;
    }
    }

    void* nbhs_intern(NBHS* hs, void* val) {
    NBHS__BEGIN("intern");
    if (!nbhs_hazard_init) {
    NBHS__BEGIN("init");
    nbhs_hazard_init = true;

    // add to hazard list, we never free this because i don't care
    NBHS_HazardEntry* old;
    do {
    old = atomic_load_explicit(&nbhs_hazard_list, memory_order_relaxed);
    nbhs_hazard.next = old;
    } while (!atomic_compare_exchange_strong(&nbhs_hazard_list, &old, &nbhs_hazard));
    NBHS__END();
    }

    NBHS_Table* latest = nbhs_hazard_access(&hs->latest, 0);

// if there's an earlier version of the table, we can move its entries up as we
// go along.
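// each caller claims a batch of up to 32 slots from prev (via prev->moved),
// re-interns the non-empty ones into the latest table, then bumps prev->move_done;
// whichever thread finishes the last batch detaches prev, waits for every hazard
// slot to drop it, and frees it.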
    NBHS_Table* prev = nbhs_hazard_access(&latest->prev, 1);
    if (prev) {
    size_t cap = 1ull << prev->exp;

    // snatch up some number of items
    uint32_t old, new;
    do {
    old = atomic_load(&prev->moved);
    if (old == cap) { goto skip; }
    // cap the number of items to copy... by the cap
    new = old + 32;
    if (new > cap) { new = cap; }
    } while (!atomic_compare_exchange_strong(&prev->moved, &(uint32_t){ old }, new));

    NBHS__BEGIN("copying old");
    for (size_t i = old; i < new; i++) {
    // either NULL or complete can go thru without waiting
    uintptr_t decision;
    while (decision = atomic_load(&prev->data[i]), decision & NBHS_RESERVE_BIT) {
    // idk if i should do a nicer wait than a spin-lock
    }

    if (decision) {
    nbhs_raw_intern(hs, latest, (void*) decision, 2);
    }
    }
    NBHS__END();

    uint32_t done = atomic_fetch_add(&prev->move_done, new - old);
    done += new - old;

    assert(done <= cap);
    if (done == cap) {
// detach now
    NBHS__BEGIN("detach");

    assert(prev->prev == NULL);
    latest->prev = NULL;

    NBHS__BEGIN("scan");
    // wait for all refs to stop holding on (just read the hazards until they don't match)
    NBHS_HazardEntry* us = &nbhs_hazard;
    for (NBHS_HazardEntry* list = atomic_load(&nbhs_hazard_list); list; list = list->next) {
// make sure no ptrs refer to prev
    if (us != list) for (size_t i = 0; i < (sizeof(list->ptr)/sizeof(void*)); i++) {
    while (list->ptr[i] == prev) {
    // spin-lock
    }
    }
    }
    NBHS__END();

    // no one's referring to it and no one will ever refer to it again
    hs->free_mem(prev, sizeof(NBHS_Table) + (1ull<<prev->exp)*sizeof(uintptr_t));
    NBHS__END();
    }
    skip:;
    }
    nbhs_hazard.ptr[1] = NULL; // ok it can be freed now

    void* result = nbhs_raw_intern(hs, latest, val, 0);
    NBHS__END();
    return result;
    }

    #endif // NBHS_IMPL
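
For reference, a minimal usage sketch of the API above (nbhs_alloc / nbhs_intern / nbhs_free), assuming this revision's nbhs.h is on the include path. The my_hash/my_cmp callbacks mirror the test harness in the revisions below; the keys are illustrative. The set only stores pointers, so interned values must outlive it.

#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

#define NBHS_IMPL
#include "nbhs.h"

// illustrative callbacks, same shape as the ones used by the test harness below
static uint32_t my_hash(const void* a) { return (*(const uint32_t*)a * 11400714819323198485ULL) >> 32ull; }
static bool my_cmp(const void* a, const void* b) { return *(const uint32_t*)a == *(const uint32_t*)b; }

int main(void) {
    // NULL allocators fall back to the built-in calloc/free pair
    NBHS set = nbhs_alloc(32, NULL, NULL, my_cmp, my_hash);

    static uint32_t a = 42, b = 42;
    void* first = nbhs_intern(&set, &a); // fresh insert: our pointer becomes the canonical one
    void* again = nbhs_intern(&set, &b); // equal key: resolves to the same canonical pointer
    assert(first == &a && again == &a);

    nbhs_free(&set);
    return 0;
}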

  20. RealNeGate revised this gist Apr 3, 2024. 1 changed file with 7 additions and 281 deletions.
    288 changes: 7 additions & 281 deletions nbhs.c
    @@ -15,288 +15,14 @@
    // #define spall_auto_buffer_begin(...)
    // #define spall_auto_buffer_end(...)

    #define NBHS_RESERVE_BIT (1ull << 63ull)
    #define NBHS_IMPL
    #include "nbhs.h"

    typedef struct NBHS_Table {
    _Atomic(struct NBHS_Table*) prev;

    uint32_t exp;

    // tracks how many entries have
    // been moved once we're resizing
    _Atomic uint32_t moved;
    _Atomic uint32_t move_done;
    _Atomic uint32_t count;

    _Atomic(uintptr_t) data[];
    } NBHS_Table;

    // non-blocking hashset
    typedef struct {
    _Atomic(NBHS_Table*) latest;
    } NBHS;

    // single hazard pointer per thread
    typedef struct NBHS_HazardEntry {
    _Atomic(struct NBHS_HazardEntry*) next;

    // normal insertion and work:
    // 0 - latest
    // 1 - prev
    // inside the prev moving code, it does an insert so
    // it needs separate hazard entries:
    // 2 - latest
    // 3 - prev
    // misc:
    // 4 - temporary
    _Atomic(NBHS_Table*) ptr[5];
    } NBHS_HazardEntry;

    static _Thread_local bool nbhs_hazard_init;
    static _Thread_local NBHS_HazardEntry nbhs_hazard;
    static _Atomic(NBHS_HazardEntry*) nbhs_hazard_list;

    void* nbhs_alloc_zero_mem(size_t s) { return calloc(1, s); }
    void nbhs_free_mem(void* ptr) { free(ptr); }

    // User-defined
    uint32_t nbhs_hash(void* a) { return (*(uint32_t*)a * 11400714819323198485ULL) >> 32ull; }
    bool nbhs_cmp(void* a, void* b) { return *(uint32_t*)a == *(uint32_t*)b; }

    static void* nbhs_resolved_entry(NBHS_Table* table, size_t i) {
    // either NULL or complete can go thru without waiting
    uintptr_t decision;
    while (decision = atomic_load(&table->data[i]), decision & NBHS_RESERVE_BIT) {
    // idk if i should do a nicer wait than a spin-lock
    }
    return (void*) decision;
    }

    static void* nbhs_raw_lookup(NBHS_Table* table, uint32_t h, void* val) {
    size_t mask = (1 << table->exp) - 1;
    size_t first = h & mask, i = first;
    do {
    uintptr_t entry = atomic_load(&table->data[i]);
    if (entry == 0) {
    return NULL;
    }

    void* entry_p = (void*) (entry & ~NBHS_RESERVE_BIT);
    if (nbhs_cmp(entry_p, val)) {
    if ((entry & NBHS_RESERVE_BIT) == 0) {
    // not reserved, yay
    return entry_p;
    } else {
    return nbhs_resolved_entry(table, i);
    }
    }

    i = (i + 1) & mask;
    } while (i != first);

    return NULL;
    }

    static NBHS_Table* nbhs_hazard_access(_Atomic(NBHS_Table*)* table, int slot) {
    NBHS_Table* val = atomic_load(table);
    for (;;) {
    // mark as hazard
    nbhs_hazard.ptr[slot] = val;
    // check if it's been invalidated since the previous load, if so
    // then undo hazard and try load again.
    NBHS_Table* after = atomic_load(table);
    if (val == after) {
    return val;
    }
    val = after;
    }
    }

    static void* nbhs_raw_intern(NBHS* hs, NBHS_Table* latest, void* val, int hazard_slot) {
    // resize on 50% load factor and we're not in the moving process rn
    uint32_t threshold = (1 << latest->exp) / 2;
    if (latest->count >= threshold && atomic_load(&latest->prev) == NULL) {
    // make resized table, we'll amortize the moves upward
    size_t new_cap = 1ull << (latest->exp + 1);

    NBHS_Table* new_top = nbhs_alloc_zero_mem(sizeof(NBHS_Table) + new_cap*sizeof(uintptr_t));
    new_top->exp = latest->exp + 1;

    // CAS latest -> new_table, if another thread wins the race we'll use its table
    new_top->prev = latest;
    if (!atomic_compare_exchange_strong(&hs->latest, &latest, new_top)) {
    nbhs_free_mem(new_top);
    } else {
    latest = new_top;
    }

    nbhs_hazard.ptr[hazard_slot] = latest;
    }

    // actually lookup & insert
    uint32_t h = nbhs_hash(val);
    uintptr_t reserved_form = (uintptr_t)val | NBHS_RESERVE_BIT;
    void* result = NULL;
    for (;;) {
    size_t mask = (1 << latest->exp) - 1;
    size_t first = h & mask, i = first;

    do {
    uintptr_t entry = atomic_load(&latest->data[i]);
    if (entry == 0) {
    // if we spot a NULL, the entry has never been in this table, that doesn't mean
    // it's not appeared already so we'll only reserve the slot until we figure that
    // out.
    if (atomic_compare_exchange_strong(&latest->data[i], &entry, reserved_form)) {
    void* old = NULL;
    NBHS_Table* curr = nbhs_hazard_access(&latest->prev, hazard_slot + 1);
    if (curr != NULL) {
    // should only be one previous table
    assert(curr->prev == NULL);

    old = nbhs_raw_lookup(curr, h, val);
    nbhs_hazard.ptr[hazard_slot + 1] = NULL;
    }

    // count doesn't care that it's a migration, it's at least not replacing an existing
    // slot in this version of the table.
    atomic_fetch_add_explicit(&latest->count, 1, memory_order_relaxed);

    if (old != NULL) {
    atomic_exchange(&latest->data[i], (uintptr_t) old);
    result = old;
    } else {
    // no entry was found, good (reserved -> val)
    atomic_exchange(&latest->data[i], (uintptr_t) val);
    result = val;
    }
    break;
    }
    }

    void* entry_p = (void*) (entry & ~NBHS_RESERVE_BIT);
    if (nbhs_cmp(entry_p, val)) {
    if ((entry & NBHS_RESERVE_BIT) == 0) {
    result = entry_p;
    } else {
    result = nbhs_resolved_entry(latest, i);
    }
    break;
    }

    i = (i + 1) & mask;
    } while (i != first);

    // if the table changed before our eyes, it means someone resized which sucks
    // but it just means we need to retry
    NBHS_Table* new_latest = nbhs_hazard_access(&hs->latest, 4);
    if (latest == new_latest && result != NULL) {
    nbhs_hazard.ptr[hazard_slot] = NULL;
    nbhs_hazard.ptr[4] = NULL;
    return result;
    }

    // move to the correct hazard slot
    nbhs_hazard.ptr[hazard_slot] = new_latest;
    nbhs_hazard.ptr[4] = NULL;
    latest = new_latest;
    }
    }

    NBHS nbhs_alloc(size_t initial_cap) {
    size_t exp = 64 - __builtin_clzll(initial_cap - 1);
    NBHS_Table* table = nbhs_alloc_zero_mem(sizeof(NBHS_Table) + (1ull << exp)*sizeof(uintptr_t));
    table->exp = exp;
    return (NBHS){ table };
    }

    void nbhs_free(NBHS* hs) {
    NBHS_Table* curr = hs->latest;
    while (curr) {
    NBHS_Table* next = curr->prev;
    nbhs_free_mem(curr);
    curr = next;
    }
    }

    void* nbhs_intern(NBHS* hs, void* val) {
    spall_auto_buffer_begin("intern", 6, NULL, 0);
    if (!nbhs_hazard_init) {
    spall_auto_buffer_begin("init", 4, NULL, 0);
    nbhs_hazard_init = true;

    // add to hazard list, we never free this because i don't care
    NBHS_HazardEntry* old;
    do {
    old = atomic_load_explicit(&nbhs_hazard_list, memory_order_relaxed);
    nbhs_hazard.next = old;
    } while (!atomic_compare_exchange_strong(&nbhs_hazard_list, &old, &nbhs_hazard));
    spall_auto_buffer_end();
    }

    NBHS_Table* latest = nbhs_hazard_access(&hs->latest, 0);

    // if there's earlier versions of the table we can move up entries as we go
    // along.
    NBHS_Table* prev = nbhs_hazard_access(&latest->prev, 1);
    if (prev) {
    size_t cap = 1ull << prev->exp;

    // snatch up some number of items
    uint32_t old, new;
    do {
    old = atomic_load(&prev->moved);
    if (old == cap) { goto skip; }
    // cap the number of items to copy... by the cap
    new = old + 32;
    if (new > cap) { new = cap; }
    } while (!atomic_compare_exchange_strong(&prev->moved, &(uint32_t){ old }, new));

    spall_auto_buffer_begin("copying old", 11, NULL, 0);
    for (size_t i = old; i < new; i++) {
    void* decision = nbhs_resolved_entry(prev, i);
    if (decision) { nbhs_raw_intern(hs, latest, decision, 2); }
    }
    spall_auto_buffer_end();

    uint32_t done = atomic_fetch_add(&prev->move_done, new - old);
    done += new - old;

    assert(done <= cap);
    if (done == cap) {
// detach now
    spall_auto_buffer_begin("detach", 6, NULL, 0);

    assert(prev->prev == NULL);
    latest->prev = NULL;

    spall_auto_buffer_begin("scan", 4, NULL, 0);
    // wait for all refs to stop holding on (just read the hazards until they don't match)
    NBHS_HazardEntry* us = &nbhs_hazard;
    for (NBHS_HazardEntry* list = atomic_load(&nbhs_hazard_list); list; list = list->next) {
// make sure no ptrs refer to prev
    if (us != list) for (size_t i = 0; i < (sizeof(list->ptr)/sizeof(void*)); i++) {
    while (list->ptr[i] == prev) {
    // spin-lock
    }
    }
    }
    spall_auto_buffer_end();

    // no one's referring to it and no one will ever refer to it again
    nbhs_free_mem(prev);
    spall_auto_buffer_end();
    }
    skip:;
    }
    nbhs_hazard.ptr[1] = NULL; // ok it can be freed now
    void* result = nbhs_raw_intern(hs, latest, val, 0);
    spall_auto_buffer_end();
    return result;
    }
    static uint32_t my_hash(const void* a) { return (*(const uint32_t*)a * 11400714819323198485ULL) >> 32ull; }
    static bool my_cmp(const void* a, const void* b) { return *(const uint32_t*)a == *(const uint32_t*)b; }

    // https://github.com/demetri/scribbles/blob/master/randomness/prngs.c
    uint32_t pcg32_pie (uint64_t *state) {
    uint32_t pcg32_pie(uint64_t *state) {
    uint64_t old = *state ^ 0xc90fdaa2adf85459ULL;
    *state = *state * 6364136223846793005ULL + 0xc90fdaa2adf85459ULL;
    uint32_t xorshifted = ((old >> 18u) ^ old) >> 27u;
    @@ -309,7 +35,7 @@ static _Atomic int inserted, duplicates;

    static int attempts; // per thread

    unsigned int test_thread_fn(void* arg) {
    static unsigned int test_thread_fn(void* arg) {
    uintptr_t starting_id = (uintptr_t) arg;
    uint64_t seed = starting_id * 11400714819323198485ULL;

    @@ -339,7 +65,7 @@ int main(int argc, char** argv) {
    printf("Testing with %d threads\n", threads);

    attempts = 4000000 / threads;
    test_set = nbhs_alloc(32);
    test_set = nbhs_alloc(32, NULL, NULL, my_cmp, my_hash);

    HANDLE* arr = malloc(threads * sizeof(HANDLE));
    for (int i = 0; i < threads; i++) {
  21. RealNeGate created this gist Apr 3, 2024.
    365 changes: 365 additions & 0 deletions nbhs.c
    @@ -0,0 +1,365 @@
    #define _CRT_SECURE_NO_WARNINGS
    #include <stdio.h>
    #include <stdint.h>
    #include <stddef.h>
    #include <stdlib.h>
    #include <assert.h>
    #include <stdbool.h>
    #include <stdatomic.h>

    #define WIN32_LEAN_AND_MEAN
    #include <windows.h>
    #include <process.h>

    #include "spall_native_auto.h"
    // #define spall_auto_buffer_begin(...)
    // #define spall_auto_buffer_end(...)

    #define NBHS_RESERVE_BIT (1ull << 63ull)

    typedef struct NBHS_Table {
    _Atomic(struct NBHS_Table*) prev;

    uint32_t exp;

    // tracks how many entries have
    // been moved once we're resizing
    _Atomic uint32_t moved;
    _Atomic uint32_t move_done;
    _Atomic uint32_t count;

    _Atomic(uintptr_t) data[];
    } NBHS_Table;

    // non-blocking hashset
    typedef struct {
    _Atomic(NBHS_Table*) latest;
    } NBHS;

    // single hazard pointer per thread
    typedef struct NBHS_HazardEntry {
    _Atomic(struct NBHS_HazardEntry*) next;

    // normal insertion and work:
    // 0 - latest
    // 1 - prev
    // inside the prev moving code, it does an insert so
    // it needs separate hazard entries:
    // 2 - latest
    // 3 - prev
    // misc:
    // 4 - temporary
    _Atomic(NBHS_Table*) ptr[5];
    } NBHS_HazardEntry;

    static _Thread_local bool nbhs_hazard_init;
    static _Thread_local NBHS_HazardEntry nbhs_hazard;
    static _Atomic(NBHS_HazardEntry*) nbhs_hazard_list;

    void* nbhs_alloc_zero_mem(size_t s) { return calloc(1, s); }
    void nbhs_free_mem(void* ptr) { free(ptr); }

    // User-defined
    uint32_t nbhs_hash(void* a) { return (*(uint32_t*)a * 11400714819323198485ULL) >> 32ull; }
    bool nbhs_cmp(void* a, void* b) { return *(uint32_t*)a == *(uint32_t*)b; }

    static void* nbhs_resolved_entry(NBHS_Table* table, size_t i) {
    // either NULL or complete can go thru without waiting
    uintptr_t decision;
    while (decision = atomic_load(&table->data[i]), decision & NBHS_RESERVE_BIT) {
    // idk if i should do a nicer wait than a spin-lock
    }
    return (void*) decision;
    }

    static void* nbhs_raw_lookup(NBHS_Table* table, uint32_t h, void* val) {
    size_t mask = (1 << table->exp) - 1;
    size_t first = h & mask, i = first;
    do {
    uintptr_t entry = atomic_load(&table->data[i]);
    if (entry == 0) {
    return NULL;
    }

    void* entry_p = (void*) (entry & ~NBHS_RESERVE_BIT);
    if (nbhs_cmp(entry_p, val)) {
    if ((entry & NBHS_RESERVE_BIT) == 0) {
    // not reserved, yay
    return entry_p;
    } else {
    return nbhs_resolved_entry(table, i);
    }
    }

    i = (i + 1) & mask;
    } while (i != first);

    return NULL;
    }

    static NBHS_Table* nbhs_hazard_access(_Atomic(NBHS_Table*)* table, int slot) {
    NBHS_Table* val = atomic_load(table);
    for (;;) {
    // mark as hazard
    nbhs_hazard.ptr[slot] = val;
    // check if it's been invalidated since the previous load, if so
    // then undo hazard and try load again.
    NBHS_Table* after = atomic_load(table);
    if (val == after) {
    return val;
    }
    val = after;
    }
    }

    static void* nbhs_raw_intern(NBHS* hs, NBHS_Table* latest, void* val, int hazard_slot) {
    // resize on 50% load factor and we're not in the moving process rn
    uint32_t threshold = (1 << latest->exp) / 2;
    if (latest->count >= threshold && atomic_load(&latest->prev) == NULL) {
    // make resized table, we'll amortize the moves upward
    size_t new_cap = 1ull << (latest->exp + 1);

    NBHS_Table* new_top = nbhs_alloc_zero_mem(sizeof(NBHS_Table) + new_cap*sizeof(uintptr_t));
    new_top->exp = latest->exp + 1;

    // CAS latest -> new_table, if another thread wins the race we'll use its table
    new_top->prev = latest;
    if (!atomic_compare_exchange_strong(&hs->latest, &latest, new_top)) {
    nbhs_free_mem(new_top);
    } else {
    latest = new_top;
    }

    nbhs_hazard.ptr[hazard_slot] = latest;
    }

    // actually lookup & insert
    uint32_t h = nbhs_hash(val);
    uintptr_t reserved_form = (uintptr_t)val | NBHS_RESERVE_BIT;
    void* result = NULL;
    for (;;) {
    size_t mask = (1 << latest->exp) - 1;
    size_t first = h & mask, i = first;

    do {
    uintptr_t entry = atomic_load(&latest->data[i]);
    if (entry == 0) {
    // if we spot a NULL, the entry has never been in this table, that doesn't mean
    // it's not appeared already so we'll only reserve the slot until we figure that
    // out.
    if (atomic_compare_exchange_strong(&latest->data[i], &entry, reserved_form)) {
    void* old = NULL;
    NBHS_Table* curr = nbhs_hazard_access(&latest->prev, hazard_slot + 1);
    if (curr != NULL) {
    // should only be one previous table
    assert(curr->prev == NULL);

    old = nbhs_raw_lookup(curr, h, val);
    nbhs_hazard.ptr[hazard_slot + 1] = NULL;
    }

    // count doesn't care that it's a migration, it's at least not replacing an existing
    // slot in this version of the table.
    atomic_fetch_add_explicit(&latest->count, 1, memory_order_relaxed);

    if (old != NULL) {
    atomic_exchange(&latest->data[i], (uintptr_t) old);
    result = old;
    } else {
    // no entry was found, good (reserved -> val)
    atomic_exchange(&latest->data[i], (uintptr_t) val);
    result = val;
    }
    break;
    }
    }

    void* entry_p = (void*) (entry & ~NBHS_RESERVE_BIT);
    if (nbhs_cmp(entry_p, val)) {
    if ((entry & NBHS_RESERVE_BIT) == 0) {
    result = entry_p;
    } else {
    result = nbhs_resolved_entry(latest, i);
    }
    break;
    }

    i = (i + 1) & mask;
    } while (i != first);

    // if the table changed before our eyes, it means someone resized which sucks
    // but it just means we need to retry
    NBHS_Table* new_latest = nbhs_hazard_access(&hs->latest, 4);
    if (latest == new_latest && result != NULL) {
    nbhs_hazard.ptr[hazard_slot] = NULL;
    nbhs_hazard.ptr[4] = NULL;
    return result;
    }

    // move to the correct hazard slot
    nbhs_hazard.ptr[hazard_slot] = new_latest;
    nbhs_hazard.ptr[4] = NULL;
    latest = new_latest;
    }
    }

    NBHS nbhs_alloc(size_t initial_cap) {
    size_t exp = 64 - __builtin_clzll(initial_cap - 1);
    NBHS_Table* table = nbhs_alloc_zero_mem(sizeof(NBHS_Table) + (1ull << exp)*sizeof(uintptr_t));
    table->exp = exp;
    return (NBHS){ table };
    }

    void nbhs_free(NBHS* hs) {
    NBHS_Table* curr = hs->latest;
    while (curr) {
    NBHS_Table* next = curr->prev;
    nbhs_free_mem(curr);
    curr = next;
    }
    }

    void* nbhs_intern(NBHS* hs, void* val) {
    spall_auto_buffer_begin("intern", 6, NULL, 0);
    if (!nbhs_hazard_init) {
    spall_auto_buffer_begin("init", 4, NULL, 0);
    nbhs_hazard_init = true;

    // add to hazard list, we never free this because i don't care
    NBHS_HazardEntry* old;
    do {
    old = atomic_load_explicit(&nbhs_hazard_list, memory_order_relaxed);
    nbhs_hazard.next = old;
    } while (!atomic_compare_exchange_strong(&nbhs_hazard_list, &old, &nbhs_hazard));
    spall_auto_buffer_end();
    }

    NBHS_Table* latest = nbhs_hazard_access(&hs->latest, 0);

    // if there's earlier versions of the table we can move up entries as we go
    // along.
    NBHS_Table* prev = nbhs_hazard_access(&latest->prev, 1);
    if (prev) {
    size_t cap = 1ull << prev->exp;

    // snatch up some number of items
    uint32_t old, new;
    do {
    old = atomic_load(&prev->moved);
    if (old == cap) { goto skip; }
    // cap the number of items to copy... by the cap
    new = old + 32;
    if (new > cap) { new = cap; }
    } while (!atomic_compare_exchange_strong(&prev->moved, &(uint32_t){ old }, new));

    spall_auto_buffer_begin("copying old", 11, NULL, 0);
    for (size_t i = old; i < new; i++) {
    void* decision = nbhs_resolved_entry(prev, i);
    if (decision) { nbhs_raw_intern(hs, latest, decision, 2); }
    }
    spall_auto_buffer_end();

    uint32_t done = atomic_fetch_add(&prev->move_done, new - old);
    done += new - old;

    assert(done <= cap);
    if (done == cap) {
// detach now
    spall_auto_buffer_begin("detach", 6, NULL, 0);

    assert(prev->prev == NULL);
    latest->prev = NULL;

    spall_auto_buffer_begin("scan", 4, NULL, 0);
    // wait for all refs to stop holding on (just read the hazards until they don't match)
    NBHS_HazardEntry* us = &nbhs_hazard;
    for (NBHS_HazardEntry* list = atomic_load(&nbhs_hazard_list); list; list = list->next) {
// make sure no ptrs refer to prev
    if (us != list) for (size_t i = 0; i < (sizeof(list->ptr)/sizeof(void*)); i++) {
    while (list->ptr[i] == prev) {
    // spin-lock
    }
    }
    }
    spall_auto_buffer_end();

    // no one's referring to it and no one will ever refer to it again
    nbhs_free_mem(prev);
    spall_auto_buffer_end();
    }
    skip:;
    }
    nbhs_hazard.ptr[1] = NULL; // ok it can be freed now
    void* result = nbhs_raw_intern(hs, latest, val, 0);
    spall_auto_buffer_end();
    return result;
    }

    // https://github.com/demetri/scribbles/blob/master/randomness/prngs.c
    uint32_t pcg32_pie (uint64_t *state) {
    uint64_t old = *state ^ 0xc90fdaa2adf85459ULL;
    *state = *state * 6364136223846793005ULL + 0xc90fdaa2adf85459ULL;
    uint32_t xorshifted = ((old >> 18u) ^ old) >> 27u;
    uint32_t rot = old >> 59u;
    return (xorshifted >> rot) | (xorshifted << ((-rot) & 31));
    }

    static NBHS test_set;
    static _Atomic int inserted, duplicates;

    static int attempts; // per thread

    unsigned int test_thread_fn(void* arg) {
    uintptr_t starting_id = (uintptr_t) arg;
    uint64_t seed = starting_id * 11400714819323198485ULL;

    spall_auto_thread_init(starting_id, SPALL_DEFAULT_BUFFER_SIZE);
    spall_auto_buffer_begin("work", 4, NULL, 0);

    uint32_t* arr = malloc(attempts * sizeof(uint32_t));
    for (int i = 0; i < attempts; i++) {
    arr[i] = pcg32_pie(&seed) & 0xFFFFF;
    if (nbhs_intern(&test_set, &arr[i]) == &arr[i]) {
    inserted += 1;
    } else {
    duplicates += 1;
    }
    }

    spall_auto_buffer_end();
    spall_auto_thread_quit();
    return 0;
    }

    int main(int argc, char** argv) {
    spall_auto_init((char *)"profile.spall");
    spall_auto_thread_init(0, SPALL_DEFAULT_BUFFER_SIZE);

    int threads = atoi(argv[1]);
    printf("Testing with %d threads\n", threads);

    attempts = 4000000 / threads;
    test_set = nbhs_alloc(32);

    HANDLE* arr = malloc(threads * sizeof(HANDLE));
    for (int i = 0; i < threads; i++) {
    arr[i] = (HANDLE) _beginthreadex(NULL, 0, test_thread_fn, (void*) (uintptr_t) i, 0, 0);
    }
    WaitForMultipleObjects(threads, arr, true, INFINITE);

    printf("%d + %d = %d (needed %d)\n", inserted, duplicates, inserted + duplicates, attempts*threads);
    if (inserted + duplicates != attempts*threads) {
    printf("FAIL!\n");
    abort();
    }

    spall_auto_thread_quit();
    spall_auto_quit();
    return 0;
    }

    // #undef spall_auto_buffer_begin
    // #undef spall_auto_buffer_end

    #define SPALL_AUTO_IMPLEMENTATION
    #include "spall_native_auto.h"