Last active: July 27, 2025 16:05

Revisions
RealNeGate revised this gist
Nov 14, 2024 · 1 changed file with 8 additions and 1 deletion.
@@ -30,9 +30,14 @@
// Virtual memory allocation (since the tables are generally nicely page-size friendly)
#ifdef _WIN32
#define WIN32_LEAN_AND_MEAN
#include <windows.h>
#define NBHS_VIRTUAL_ALLOC(size)     VirtualAlloc(NULL, size, MEM_RESERVE | MEM_COMMIT, PAGE_READWRITE)
#define NBHS_VIRTUAL_FREE(ptr, size) VirtualFree(ptr, size, MEM_RELEASE)
#else
#include <sys/mman.h>
#define NBHS_VIRTUAL_ALLOC(size)     mmap(NULL, size, PROT_READ | PROT_WRITE, MAP_PRIVATE | MAP_ANONYMOUS, -1, 0)
#define NBHS_VIRTUAL_FREE(ptr, size) munmap(ptr, size)
#endif

@@ -355,6 +360,7 @@ NBHS_Table* NBHS_FN(move_items)(NBHS* hs, NBHS_Table* latest, NBHS_Table* prev,
    // dettach now
    NBHS__BEGIN("detach");
    latest->prev = NULL;
    NBHS_FN(exit_pinned)();
    int state_count = nbhs_ebr_count;
    uint64_t* states = NBHS_REALLOC(NULL, state_count * sizeof(uint64_t));

@@ -407,6 +413,7 @@ NBHS_Table* NBHS_FN(move_items)(NBHS* hs, NBHS_Table* latest, NBHS_Table* prev,
    NBHS_VIRTUAL_FREE(prev, sizeof(NBHS_Table) + prev->cap*sizeof(void*));
    NBHS_REALLOC(states, 0);
    NBHS_FN(enter_pinned)();
    prev = NULL;
    NBHS__END();
}

@@ -521,4 +528,4 @@ void NBHS_FN(resize_barrier)(NBHS* hs) {
}
#undef NBHS_FN
#endif // NBHS_FN
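The two pin-toggles added in this revision keep the reclaiming thread from stalling its peers: while it busy-waits for every other reader to leave its pinned window, it must not itself look pinned. A minimal sketch of the resulting bracket, in the gist's own names; wait_for_readers_to_advance is a hypothetical stand-in for the states[] scan inside move_items:

    // Sketch: reclaiming `prev` from inside a pinned section, as the diff does.
    NBHS_FN(exit_pinned)();            // we hold no table pointers past this point
    wait_for_readers_to_advance();     // hypothetical: the epoch scan shown above
    NBHS_VIRTUAL_FREE(prev, sizeof(NBHS_Table) + prev->cap*sizeof(void*));
    NBHS_FN(enter_pinned)();           // restore the pin the caller still expects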
RealNeGate revised this gist
Nov 9, 2024 · 1 changed file with 48 additions and 104 deletions.
@@ -46,8 +46,6 @@
#define NBHS__DEBOOGING 0

#if NBHS__DEBOOGING
#define NBHS__BEGIN(name) spall_auto_buffer_begin(name, sizeof(name) - 1, NULL, 0)
#define NBHS__END() spall_auto_buffer_end()
#else

@@ -166,90 +164,6 @@
_Thread_local bool nbhs_ebr_init;
_Thread_local NBHS_EBREntry nbhs_ebr;
_Atomic(int) nbhs_ebr_count;
_Atomic(NBHS_EBREntry*) nbhs_ebr_list;
#endif // NBHS_IMPL

// Templated implementation
@@ -258,10 +172,6 @@
extern _Thread_local bool nbhs_ebr_init;
extern _Thread_local NBHS_EBREntry nbhs_ebr;
extern _Atomic(int) nbhs_ebr_count;
extern _Atomic(NBHS_EBREntry*) nbhs_ebr_list;
extern int nbhs_thread_fn(void*);

@@ -446,24 +356,58 @@ NBHS_Table* NBHS_FN(move_items)(NBHS* hs, NBHS_Table* latest, NBHS_Table* prev,
    NBHS__BEGIN("detach");
    latest->prev = NULL;
    int state_count = nbhs_ebr_count;
    uint64_t* states = NBHS_REALLOC(NULL, state_count * sizeof(uint64_t));

    NBHS__BEGIN("scan");
    NBHS_EBREntry* us = &nbhs_ebr;

    // "snapshot" the current statuses, once the other threads either advance or aren't in the
    // hashset functions we know we can free.
    for (NBHS_EBREntry* list = atomic_load(&nbhs_ebr_list); list; list = list->next) {
        // mark sure no ptrs refer to prev
        if (list != us && list->id < state_count) {
            states[list->id] = list->time;
        }
    }

    // important bit is that pointers can't be held across the critical sections, they'd need
    // to reload from `NBHS.latest`.
    //
    // Here's the states of our "epoch" critical section thingy:
    //
    //   UNPINNED(id) -> PINNED(id) -> UNPINNED(id + 1) -> UNPINNED(id + 1) -> ...
    //
    // survey on if we can free the pointer if the status changed from X -> Y:
    //
    //   # YES: if we started unlocked then we weren't holding pointers in the first place.
    //   UNPINNED(A) -> PINNED(A)
    //   UNPINNED(A) -> UNPINNED(A)
    //   UNPINNED(A) -> UNPINNED(B)
    //
    //   # YES: if we're locked we need to wait until we've stopped holding pointers.
    //   PINNED(A) -> PINNED(B)    we're a different call so we've let it go by now.
    //   PINNED(A) -> UNPINNED(B)  we've stopped caring about the state of the pointer at this point.
    //
    //   # NO: we're still doing shit, wait a sec.
    //   PINNED(A) -> PINNED(A)
    //
    // these aren't quite blocking the other threads, we're simply checking what their progress is concurrently.
    for (NBHS_EBREntry* list = atomic_load(&nbhs_ebr_list); list; list = list->next) {
        if (list != us && list->id < state_count && (states[list->id] & NBHS_PINNED_BIT)) {
            uint64_t before_t = states[list->id], now_t;
            do {
                // idk, maybe this should be a better spinlock
                now_t = atomic_load(&list->time);
            } while (before_t == now_t);
        }
    }
    NBHS__END();

    // no more refs, we can immediately free
    NBHS_VIRTUAL_FREE(prev, sizeof(NBHS_Table) + prev->cap*sizeof(void*));
    NBHS_REALLOC(states, 0);
    prev = NULL;
    NBHS__END();
}

return prev;
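The transition table in that comment boils down to one predicate per reader: given a snapshot of its time word (pinned flag in bit 63, counter below), the retired table is unreachable from that reader once the snapshot was unpinned, or the live word has changed since. A standalone restatement assuming the same NBHS_PINNED_BIT encoding; the function name is mine:

    #include <stdbool.h>
    #include <stdint.h>

    #define NBHS_PINNED_BIT (1ull << 63ull)

    // true once `snap` (a reader's time word sampled after detach) no longer
    // protects the old table: the reader was unpinned, or has advanced since.
    static bool nbhs_snapshot_released(uint64_t snap, uint64_t now) {
        if ((snap & NBHS_PINNED_BIT) == 0) return true; // wasn't holding pointers
        return snap != now;                             // pinned then, moved since
    }

The spin loop in move_items is exactly this predicate evaluated repeatedly until it turns true for every registered reader.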
RealNeGate revised this gist
Nov 9, 2024 · 1 changed file with 1 addition and 0 deletions.
@@ -261,6 +261,7 @@
extern _Atomic(NBHS_EBREntry*) nbhs_ebr_list;
extern _Atomic(NBHS_FreeQueueNode*) nbhs_free_queue_write;
extern _Atomic(NBHS_FreeQueueNode*) nbhs_free_queue_read;
extern atomic_flag nbhs_thread_init;
extern thrd_t nbhs_thread;
extern int nbhs_thread_fn(void*);
RealNeGate revised this gist
Nov 9, 2024 · 2 changed files with 269 additions and 178 deletions.
@@ -23,15 +23,44 @@
#ifndef NBHS_H
#define NBHS_H

#include <threads.h>
#include <stdint.h>
#include <stddef.h>
#include <stdatomic.h>

// Virtual memory allocation (since the tables are generally nicely page-size friendly)
#ifdef _WIN32
#define NBHS_VIRTUAL_ALLOC(size)     VirtualAlloc(NULL, size, MEM_RESERVE | MEM_COMMIT, PAGE_READWRITE)
#define NBHS_VIRTUAL_FREE(ptr, size) VirtualFree(ptr, size, MEM_RELEASE)
#else
#define NBHS_VIRTUAL_ALLOC(size)     mmap(NULL, size, PROT_READ | PROT_WRITE, MAP_PRIVATE | MAP_ANONYMOUS, -1, 0)
#define NBHS_VIRTUAL_FREE(ptr, size) munmap(ptr, size)
#endif

// traditional heap ops
#ifndef NBHS_REALLOC
#define NBHS_REALLOC(ptr, size) realloc(ptr, size)
#endif // NBHS_REALLOC

// personal debooging stuff
#define NBHS__DEBOOGING 0

#if NBHS__DEBOOGING
#define NBHS__THREAD_START(id) spall_auto_thread_init(id, SPALL_DEFAULT_BUFFER_SIZE)
#define NBHS__THREAD_END()     spall_auto_thread_quit()
#define NBHS__BEGIN(name)      spall_auto_buffer_begin(name, sizeof(name) - 1, NULL, 0)
#define NBHS__END()            spall_auto_buffer_end()
#else
#define NBHS__BEGIN(name)
#define NBHS__END()
#endif

// for the time in the ebr entry
#define NBHS_PINNED_BIT (1ull << 63ull)

enum {
    NBHS_LOAD_FACTOR = 75,
    NBHS_MOVE_AMOUNT = 128,
};

typedef struct NBHS_EBREntry {
@@ -46,7 +75,10 @@ typedef struct NBHS_Table NBHS_Table;
struct NBHS_Table {
    _Atomic(NBHS_Table*) prev;

    uint32_t cap;

    // reciprocals to compute modulo
    uint64_t a, sh;

    // tracks how many entries have
    // been moved once we're resizing
@@ -58,44 +90,73 @@ struct NBHS_Table {
};

typedef struct {
    _Atomic(NBHS_Table*) latest;
} NBHS;

typedef struct NBHS_FreeQueueNode NBHS_FreeQueueNode;
struct NBHS_FreeQueueNode {
    _Atomic(NBHS_FreeQueueNode*) next;
    NBHS_Table* table;
};

static size_t nbhs_compute_cap(size_t y) {
    // minimum capacity
    if (y < 512) {
        y = 512;
    } else {
        y = ((y + 1) / 3) * 4;
    }

    size_t cap = 1ull << (64 - __builtin_clzll(y - 1));
    return cap - (sizeof(NBHS_Table) / sizeof(void*));
}

static void nbhs_compute_size(NBHS_Table* table, size_t cap) {
    // reciprocals to compute modulo
    #if defined(__GNUC__) || defined(__clang__)
    table->sh = 64 - __builtin_clzll(cap);
    #else
    uint64_t sh = 0;
    while (cap > (1ull << sh)) { sh++; }
    table->sh = sh;
    #endif

    table->sh += 63 - 64;

    #if (defined(__GNUC__) || defined(__clang__)) && defined(__x86_64__)
    uint64_t d, e;
    __asm__("divq %[v]" : "=a"(d), "=d"(e) : [v] "r"(cap), "a"(cap - 1), "d"(1ull << table->sh));
    table->a = d;
    #elif defined(_MSC_VER)
    uint64_t rem;
    table->a = _udiv128(1ull << table->sh, cap - 1, cap, &rem);
    #else
    #error "Unsupported target"
    #endif

    table->cap = cap;
}

static NBHS nbhs_alloc(size_t initial_cap) {
    size_t cap = nbhs_compute_cap(initial_cap);
    NBHS_Table* table = NBHS_VIRTUAL_ALLOC(sizeof(NBHS_Table) + cap*sizeof(void*));
    nbhs_compute_size(table, cap);
    return (NBHS){ .latest = table };
}

static void nbhs_free(NBHS* hs) {
    NBHS_Table* curr = hs->latest;
    while (curr) {
        NBHS_Table* next = curr->prev;
        NBHS_VIRTUAL_FREE(curr, sizeof(NBHS_Table) + curr->cap*sizeof(void*));
        curr = next;
    }
}

// for spooky stuff
static void** nbhs_array(NBHS* hs)    { return (void**) hs->latest->data; }
static size_t nbhs_count(NBHS* hs)    { return hs->latest->count; }
static size_t nbhs_capacity(NBHS* hs) { return hs->latest->cap; }

#define nbhs_for(it, hs) for (void **it = nbhs_array(hs), **_end_ = &it[nbhs_capacity(hs)]; it != _end_; it++) if (*it != NULL)
#endif // NBHS_H

@@ -105,36 +166,135 @@
_Thread_local bool nbhs_ebr_init;
_Thread_local NBHS_EBREntry nbhs_ebr;
_Atomic(int) nbhs_ebr_count;
_Atomic(NBHS_EBREntry*) nbhs_ebr_list;

static NBHS_FreeQueueNode NBHS_DUMMY = { 0 };

// freeing queue
_Atomic(NBHS_FreeQueueNode*) nbhs_free_queue_write = &NBHS_DUMMY;
_Atomic(NBHS_FreeQueueNode*) nbhs_free_queue_read  = &NBHS_DUMMY;

atomic_flag nbhs_thread_init;
thrd_t nbhs_thread;

int nbhs_thread_fn(void* arg) {
    int state_cap = 0;
    uint64_t* states = NULL;

    for (;;) retry: {
        // if we can move the read head, we can take the item
        NBHS_FreeQueueNode *old_reader, *next;
        do {
            old_reader = atomic_load_explicit(&nbhs_free_queue_read, memory_order_relaxed);
            next = old_reader->next;
            if (next == NULL) {
                // queue is empty
                goto retry;
            }
        } while (!atomic_compare_exchange_strong(&nbhs_free_queue_read, &old_reader, next));

        // we can take *technically* sweet time resetting next_compile, it's not on the queue
        // rn so it's not visible to other threads.
        atomic_store_explicit(&next->next, NULL, memory_order_release);

        // ensure there's enough elements in the state array for this task
        int state_count = nbhs_ebr_count;
        if (state_cap < state_count) {
            state_cap = 1ull << (64 - __builtin_clzll(state_count - 1));
            states = NBHS_REALLOC(states, state_cap * sizeof(uint64_t));
        }

        // "snapshot" the current statuses, once the other threads either advance or aren't in the
        // hashset functions we know we can free.
        for (NBHS_EBREntry* list = atomic_load(&nbhs_ebr_list); list; list = list->next) {
            // mark sure no ptrs refer to prev
            if (list->id < state_count) {
                states[list->id] = list->time;
            }
        }

        // important bit is that pointers can't be held across the critical sections, they'd need
        // to reload from `NBHS.latest`.
        //
        // Here's the states of our "epoch" critical section thingy:
        //
        //   UNPINNED(id) -> PINNED(id) -> UNPINNED(id + 1) -> UNPINNED(id + 1) -> ...
        //
        // survey on if we can free the pointer if the status changed from X -> Y:
        //
        //   # YES: if we started unlocked then we weren't holding pointers in the first place.
        //   UNPINNED(A) -> PINNED(A)
        //   UNPINNED(A) -> UNPINNED(A)
        //   UNPINNED(A) -> UNPINNED(B)
        //
        //   # YES: if we're locked we need to wait until we've stopped holding pointers.
        //   PINNED(A) -> PINNED(B)    we're a different call so we've let it go by now.
        //   PINNED(A) -> UNPINNED(B)  we've stopped caring about the state of the pointer at this point.
        //
        //   # NO: we're still doing shit, wait a sec.
        //   PINNED(A) -> PINNED(A)
        //
        // these aren't quite blocking the other threads, we're simply checking what their progress is concurrently.
        for (NBHS_EBREntry* list = atomic_load(&nbhs_ebr_list); list; list = list->next) {
            if (list->id < state_count && (states[list->id] & NBHS_PINNED_BIT)) {
                uint64_t before_t = states[list->id], now_t;
                do {
                    // idk, maybe this should be a better spinlock
                    now_t = atomic_load(&list->time);
                } while (before_t == now_t);
            }
        }

        // no more refs, we can immediately free
        NBHS_Table* table = next->table;
        NBHS_VIRTUAL_FREE(table, sizeof(NBHS_Table) + table->cap*sizeof(void*));
        NBHS_REALLOC(next, 0);
    }
}
#endif // NBHS_IMPL

// Templated implementation
#ifdef NBHS_FN
extern _Thread_local bool nbhs_ebr_init;
extern _Thread_local NBHS_EBREntry nbhs_ebr;
extern _Atomic(int) nbhs_ebr_count;
extern _Atomic(NBHS_EBREntry*) nbhs_ebr_list;
extern _Atomic(NBHS_FreeQueueNode*) nbhs_free_queue_write;
extern _Atomic(NBHS_FreeQueueNode*) nbhs_free_queue_read;
extern atomic_flag nbhs_thread_init;
extern int nbhs_thread_fn(void*);

static size_t NBHS_FN(hash2index)(NBHS_Table* table, uint64_t h) {
    // MulHi(h, table->a)
    #if defined(__GNUC__) || defined(__clang__)
    uint64_t hi = (uint64_t) (((unsigned __int128)h * table->a) >> 64);
    #elif defined(_MSC_VER)
    uint64_t hi;
    _umul128(a, b, &hi);
    #else
    #error "Unsupported target"
    #endif

    uint64_t q  = hi >> table->sh;
    uint64_t q2 = h - (q * table->cap);

    assert(q2 == h % table->cap);
    return q2;
}

static void* NBHS_FN(raw_lookup)(NBHS* hs, NBHS_Table* table, uint32_t h, void* val) {
    size_t cap = table->cap;
    size_t first = NBHS_FN(hash2index)(table, h), i = first;
    do {
        void* entry = atomic_load(&table->data[i]);
        if (entry == NULL) {
            return NULL;
        } else if (NBHS_FN(cmp)(entry, val)) {
            return entry;
        }

        // inc & wrap around
        i = (i == cap-1) ? 0 : i + 1;
    } while (i != first);

    return NULL;
@@ -145,31 +305,31 @@ static void* NBHS_FN(raw_intern)(NBHS* hs, NBHS_Table* latest, NBHS_Table* prev,
    void* result = NULL;
    uint32_t h = NBHS_FN(hash)(val);
    for (;;) {
        size_t cap = latest->cap;
        size_t limit = (cap * NBHS_LOAD_FACTOR) / 100;
        if (prev == NULL && latest->count >= limit) {
            // make resized table, we'll amortize the moves upward
            size_t new_cap = nbhs_compute_cap(limit*2);

            NBHS_Table* new_top = NBHS_VIRTUAL_ALLOC(sizeof(NBHS_Table) + new_cap*sizeof(void*));
            nbhs_compute_size(new_top, new_cap);

            // CAS latest -> new_table, if another thread wins the race we'll use its table
            new_top->prev = latest;
            if (!atomic_compare_exchange_strong(&hs->latest, &latest, new_top)) {
                NBHS_VIRTUAL_FREE(new_top, sizeof(NBHS_Table) + new_cap*sizeof(void*));
                prev = atomic_load(&latest->prev);
            } else {
                prev = latest;
                latest = new_top;

                // float s = sizeof(NBHS_Table) + new_cap*sizeof(void*);
                // printf("Resize: %.2f KiB (cap=%zu)\n", s / 1024.0f, new_cap);
            }
            continue;
        }

        size_t first = NBHS_FN(hash2index)(latest, h), i = first;
        do {
            void* entry = atomic_load(&latest->data[i]);
            if (entry == NULL) {
@@ -197,7 +357,8 @@ static void* NBHS_FN(raw_intern)(NBHS* hs, NBHS_Table* latest, NBHS_Table* prev,
                return entry;
            }

            // inc & wrap around
            i = (i == cap-1) ? 0 : i + 1;
        } while (i != first);

        // if the table changed before our eyes, it means someone resized which sucks
@@ -214,9 +375,9 @@
void NBHS_FN(raw_insert)(NBHS* hs, void* val) {
    NBHS_Table* table = hs->latest;
    size_t cap = table->cap;

    uint32_t h = NBHS_FN(hash)(val);
    size_t first = NBHS_FN(hash2index)(table, h), i = first;
    do {
        void* entry = atomic_load_explicit(&table->data[i], memory_order_relaxed);
        if (entry == NULL) {
@@ -226,27 +387,29 @@ void NBHS_FN(raw_insert)(NBHS* hs, void* val) {
        }
        assert(!NBHS_FN(cmp)((void*) entry, val));

        // inc & wrap around
        i = (i == cap-1) ? 0 : i + 1;
    } while (i != first);
    abort();
}

// flips the top bit on
static void NBHS_FN(enter_pinned)(void) {
    uint64_t t = atomic_load_explicit(&nbhs_ebr.time, memory_order_relaxed);
    atomic_store_explicit(&nbhs_ebr.time, t + NBHS_PINNED_BIT, memory_order_release);
}

// flips the top bit off AND increments time by one
static void NBHS_FN(exit_pinned)(void) {
    uint64_t t = atomic_load_explicit(&nbhs_ebr.time, memory_order_relaxed);
    atomic_store_explicit(&nbhs_ebr.time, t + NBHS_PINNED_BIT + 1, memory_order_release);
}

NBHS_Table* NBHS_FN(move_items)(NBHS* hs, NBHS_Table* latest, NBHS_Table* prev, int items_to_move) {
    assert(prev);
    size_t cap = prev->cap;

    // snatch up some number of items
    uint32_t old, new;
@@ -282,67 +445,30 @@ NBHS_Table* NBHS_FN(move_items)(NBHS* hs, NBHS_Table* latest, NBHS_Table* prev,
    NBHS__BEGIN("detach");
    latest->prev = NULL;

    if (!atomic_flag_test_and_set(&nbhs_thread_init)) {
        // spawn our lovely "GC" thread
        thrd_create(&nbhs_thread, nbhs_thread_fn, NULL);
    }

    NBHS_FreeQueueNode* node = NBHS_REALLOC(NULL, sizeof(NBHS_FreeQueueNode));
    node->next = NULL;
    node->table = prev;

    // queue into the "GC" thread
    // .next: NULL -> fn
    NBHS_FreeQueueNode *null, *old_writer;
    do {
        old_writer = atomic_load_explicit(&nbhs_free_queue_write, memory_order_relaxed);
    } while (null = NULL, !atomic_compare_exchange_strong(&nbhs_free_queue_write->next, &null, node));

    // once this store happens, the other threads can start submitting again
    atomic_store_explicit(&nbhs_free_queue_write, node, memory_order_release);
    NBHS__END();
}

return prev;
}

static void NBHS_FN(ebr_try_init)(void) {
    if (!nbhs_ebr_init) {
        NBHS__BEGIN("init");
        nbhs_ebr_init = true;
@@ -357,9 +483,16 @@ void* NBHS_FN(get)(NBHS* hs, void* val) {
        } while (!atomic_compare_exchange_strong(&nbhs_ebr_list, &old, &nbhs_ebr));
        NBHS__END();
    }
}

void* NBHS_FN(get)(NBHS* hs, void* val) {
    NBHS__BEGIN("intern");
    assert(val);
    NBHS_FN(ebr_try_init)();

    // modifying the tables is possible now.
    NBHS_FN(enter_pinned)();
    NBHS_Table* latest = atomic_load(&hs->latest);

    // if there's earlier versions of the table we can move up entries as we go along.
@@ -373,12 +506,10 @@ void* NBHS_FN(get)(NBHS* hs, void* val) {
    // just lookup into the tables, we don't need to reserve
    // actually lookup & insert
    void* result = NULL;
    uint32_t cap = latest->cap;
    uint32_t h = NBHS_FN(hash)(val);
    size_t first = NBHS_FN(hash2index)(latest, h), i = first;
    do {
        void* entry = atomic_load(&latest->data[i]);
        if (entry == NULL) {
@@ -395,10 +526,11 @@ void* NBHS_FN(get)(NBHS* hs, void* val) {
            break;
        }

        // inc & wrap around
        i = (i == cap-1) ? 0 : i + 1;
    } while (i != first);

    NBHS_FN(exit_pinned)();
    NBHS__END();
    return result;
}

@@ -407,22 +539,9 @@ void* NBHS_FN(intern)(NBHS* hs, void* val) {
    NBHS__BEGIN("intern");
    assert(val);
    NBHS_FN(ebr_try_init)();
    NBHS_FN(enter_pinned)();

    NBHS_Table* latest = atomic_load(&hs->latest);

    // if there's earlier versions of the table we can move up entries as we go along.
@@ -436,37 +555,23 @@ void* NBHS_FN(intern)(NBHS* hs, void* val) {
    void* result = NBHS_FN(raw_intern)(hs, latest, prev, val);
    NBHS_FN(exit_pinned)();
    NBHS__END();
    return result;
}

// waits for all items to be moved up before continuing
void NBHS_FN(resize_barrier)(NBHS* hs) {
    NBHS__BEGIN("intern");
    NBHS_FN(ebr_try_init)();
    NBHS_FN(enter_pinned)();
    NBHS_Table *prev, *latest = atomic_load(&hs->latest);
    while (prev = atomic_load(&latest->prev), prev != NULL) {
        NBHS_FN(move_items)(hs, latest, prev, prev->cap);
    }
    NBHS_FN(exit_pinned)();
    NBHS__END();
}

Second changed file:

@@ -123,45 +123,31 @@ static void* test_thread_fn(void* arg)
    uintptr_t starting_id = (uintptr_t) arg;
    uint64_t seed = starting_id * 11400714819323198485ULL;
    int* stats = &thread_stats[starting_id*16];
    uint32_t* arr = malloc(attempts * sizeof(uint32_t));

    #if USE_SPALL
    spall_auto_thread_init(starting_id, SPALL_DEFAULT_BUFFER_SIZE);
    spall_auto_buffer_begin("work", 4, NULL, 0);
    #endif

    if (testing_lhs) {
        for (int i = 0; i < attempts; i++) {
            arr[i] = pcg32_pie(&seed) & 0xFFFF;
            if (lhs_intern(test_lhs, &arr[i]) == &arr[i]) {
                stats[0] += 1; // insertions
            } else {
                stats[1] += 1; // duplicate
            }
        }
    } else {
        for (int i = 0; i < attempts; i++) {
            arr[i] = pcg32_pie(&seed) & 0xFFFF;
            if (my_intern(&test_set, &arr[i]) == &arr[i]) {
                stats[0] += 1; // insertions
            } else {
                stats[1] += 1; // duplicate
            }
        }
    }

@@ -201,7 +187,7 @@ int main(int argc, char** argv) {
        InitializeCriticalSection(&test_lhs->lock);
        #endif
    } else {
        test_set = nbhs_alloc(32);
    }

    #ifdef _WIN32
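This revision replaces `h & mask` with hash2index's reciprocal modulo so capacities no longer need to be powers of two: nbhs_compute_size precomputes a = ceil(2^(64+sh) / cap) via a 128-by-64 divide, and hash2index recovers q = floor(h / cap) as MulHi(h, a) >> sh, so h - q*cap equals h % cap. A hedged standalone checker of that identity, GCC/Clang only since it leans on unsigned __int128 to build the same constant the gist's divq produces (fast_mod is my name):

    #include <assert.h>
    #include <stdint.h>

    static uint64_t fast_mod(uint64_t h, uint64_t cap, uint64_t a, uint64_t sh) {
        // MulHi(h, a) >> sh approximates h / cap
        uint64_t hi = (uint64_t)(((unsigned __int128)h * a) >> 64);
        return h - ((hi >> sh) * cap);
    }

    int main(void) {
        uint64_t cap = 1000;                        // any capacity > 1
        uint64_t sh  = 63 - __builtin_clzll(cap);   // what nbhs_compute_size computes
        // a = floor(((1 << sh) * 2^64 + cap - 1) / cap), i.e. the divq result
        unsigned __int128 num = ((unsigned __int128)(1ull << sh) << 64) + (cap - 1);
        uint64_t a = (uint64_t)(num / cap);

        // spot-check the identity; it holds comfortably for the 32-bit hashes
        // the gist feeds in
        for (uint64_t h = 0; h < 1000000; h++) {
            assert(fast_mod(h, cap, a, sh) == h % cap);
        }
        return 0;
    }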
RealNeGate revised this gist
Nov 8, 2024 · 1 changed file with 1 addition and 1 deletion.
@@ -428,7 +428,7 @@ void* NBHS_FN(intern)(NBHS* hs, void* val) {
    // if there's earlier versions of the table we can move up entries as we go along.
    NBHS_Table* prev = atomic_load(&latest->prev);
    if (prev) {
        prev = NBHS_FN(move_items)(hs, latest, prev, NBHS_MOVE_AMOUNT);
        if (prev == NULL) {
            latest = atomic_load(&hs->latest);
        }
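This one-line change makes intern migrate at most NBHS_MOVE_AMOUNT entries from the old table per call instead of the old table's entire capacity, so resize cost is amortized across all interning threads; nbhs_resize_barrier remains the way to force a complete migration when a caller needs the old table fully drained.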
RealNeGate revised this gist
Nov 8, 2024 · 2 changed files with 241 additions and 173 deletions.
@@ -3,52 +3,46 @@
////////////////////////////////
// You wanna intern lots of things on lots of cores? this is for you. It's
// inspired by Cliff's non-blocking hashmap.
//
// To use it, you'll need to define NBHS_FN and then include the header:
//
//   #define NBHS_FN(n) XXX_hs_ ## n
//   #include <nbhs.h>
//
// This will compile implementations of the hashset using
//
//   bool NBHS_FN(cmp)(const void* a, const void* b);
//   uint32_t NBHS_FN(hash)(const void* a);
//
// The exported functions are:
//
//   void* NBHS_FN(get)(NBHS* hs, void* val);
//   void* NBHS_FN(intern)(NBHS* hs, void* val);
//   void NBHS_FN(resize_barrier)(NBHS* hs);
//
#ifndef NBHS_H
#define NBHS_H

typedef void* (*NBHS_AllocZeroMem)(size_t size);
typedef void (*NBHS_FreeMem)(void* ptr, size_t size);

// for the time in the ebr entry
#define NBHS_LOCKED_BIT (1ull << 63ull)

enum {
    NBHS_LOAD_FACTOR = 75,
    NBHS_MOVE_AMOUNT = 512,
};

typedef struct NBHS_EBREntry {
    _Atomic(struct NBHS_EBREntry*) next;
    _Atomic(uint64_t) time;

    // keep on a separate cacheline to avoid false sharing
    _Alignas(64) int id;
} NBHS_EBREntry;

typedef struct NBHS_Table NBHS_Table;
struct NBHS_Table {
    _Atomic(NBHS_Table*) prev;
@@ -63,30 +57,81 @@ struct NBHS_Table {
    _Atomic(void*) data[];
};

typedef struct {
    NBHS_AllocZeroMem alloc_mem;
    NBHS_FreeMem free_mem;

    _Atomic(NBHS_Table*) latest;
} NBHS;

static void* nbhs__alloc_zero_mem(size_t s) { return calloc(1, s); }
static void nbhs__free_mem(void* ptr, size_t s) { free(ptr); }

static NBHS nbhs_alloc(size_t initial_cap, NBHS_AllocZeroMem alloc_mem, NBHS_FreeMem free_mem) {
    if (alloc_mem == NULL) {
        assert(free_mem == NULL);
        alloc_mem = nbhs__alloc_zero_mem;
        free_mem = nbhs__free_mem;
    }

    size_t exp = 64 - __builtin_clzll(initial_cap - 1);
    NBHS_Table* table = alloc_mem(sizeof(NBHS_Table) + (1ull << exp)*sizeof(void*));
    table->exp = exp;
    return (NBHS){ .alloc_mem = alloc_mem, .free_mem = free_mem, .latest = table };
}

static void nbhs_free(NBHS* hs) {
    NBHS_Table* curr = hs->latest;
    while (curr) {
        NBHS_Table* next = curr->prev;
        hs->free_mem(curr, sizeof(NBHS_Table) + (1ull*curr->exp)*sizeof(void*));
        curr = next;
    }
}

// for spooky stuff
static void** nbhs_array(NBHS* hs)    { return (void**) hs->latest->data; }
static size_t nbhs_count(NBHS* hs)    { return hs->latest->count; }
static size_t nbhs_capacity(NBHS* hs) { return 1ull << hs->latest->exp; }

#define nbhs_for(it, hs) for (void **it = nbhs_array(hs), **_end_ = &it[nbhs_capacity(hs)]; it != _end_; it++) if (*it != NULL)
#endif // NBHS_H

#ifdef NBHS_IMPL
_Thread_local bool nbhs_ebr_init;
_Thread_local NBHS_EBREntry nbhs_ebr;
_Atomic(int) nbhs_ebr_count;
_Atomic(NBHS_EBREntry*) nbhs_ebr_list;
#endif // NBHS_IMPL

// Templated implementation
#ifdef NBHS_FN

// personal debooging stuff
#if 1
#define NBHS__BEGIN(name)
#define NBHS__END()
#else
#define NBHS__BEGIN(name) spall_auto_buffer_begin(name, sizeof(name) - 1, NULL, 0)
#define NBHS__END()       spall_auto_buffer_end()
#endif

extern _Thread_local bool nbhs_ebr_init;
extern _Thread_local NBHS_EBREntry nbhs_ebr;
extern _Atomic(int) nbhs_ebr_count;
extern _Atomic(NBHS_EBREntry*) nbhs_ebr_list;

static void* NBHS_FN(raw_lookup)(NBHS* hs, NBHS_Table* table, uint32_t h, void* val) {
    size_t mask = (1 << table->exp) - 1;
    size_t first = h & mask, i = first;
    do {
        void* entry = atomic_load(&table->data[i]);
        if (entry == NULL) {
            return NULL;
        } else if (NBHS_FN(cmp)(entry, val)) {
            return entry;
        }
        i = (i + 1) & mask;
@@ -95,28 +140,46 @@ static void* nbhs_raw_lookup(NBHS* hs, NBHS_Table* table, uint32_t h, void* val)
    return NULL;
}

static void* NBHS_FN(raw_intern)(NBHS* hs, NBHS_Table* latest, NBHS_Table* prev, void* val) {
    // actually lookup & insert
    void* result = NULL;
    uint32_t h = NBHS_FN(hash)(val);
    for (;;) {
        uint32_t exp = latest->exp;
        size_t limit = ((1ull << exp) * NBHS_LOAD_FACTOR) / 100;
        if (prev == NULL && latest->count >= limit) {
            // make resized table, we'll amortize the moves upward
            size_t new_cap = 1ull << (exp + 1);
            NBHS_Table* new_top = hs->alloc_mem(sizeof(NBHS_Table) + new_cap*sizeof(uintptr_t));
            new_top->exp = exp + 1;

            // CAS latest -> new_table, if another thread wins the race we'll use its table
            new_top->prev = latest;
            if (!atomic_compare_exchange_strong(&hs->latest, &latest, new_top)) {
                hs->free_mem(new_top, sizeof(NBHS_Table) + new_cap*sizeof(void*));
                prev = atomic_load(&latest->prev);
            } else {
                prev = latest;
                latest = new_top;

                // printf("Resize!!! %zu -> %zu (threshold=%zu)\n", new_cap / 2, new_cap, limit);
            }
            continue;
        }

        size_t mask = (1ull << exp) - 1;
        size_t first = h & mask, i = first;
        do {
            void* entry = atomic_load(&latest->data[i]);
            if (entry == NULL) {
                void* to_write = val;
                if (__builtin_expect(prev != NULL, 0)) {
                    assert(prev->prev == NULL);
                    void* old = NBHS_FN(raw_lookup)(hs, prev, h, val);
                    if (old != NULL) {
                        to_write = old;
                    }
                }

                // fight to be the one to land into the modern table
@@ -130,29 +193,10 @@
            }
        }

        if (NBHS_FN(cmp)(entry, val)) {
            return entry;
        }
        i = (i + 1) & mask;
    } while (i != first);
@@ -168,9 +212,9 @@
    }
}

void NBHS_FN(raw_insert)(NBHS* hs, void* val) {
    NBHS_Table* table = hs->latest;
    uint32_t h = NBHS_FN(hash)(val);
    size_t mask = (1 << table->exp) - 1;
    size_t first = h & mask, i = first;
    do {
@@ -181,47 +225,26 @@ void nbhs_raw_insert(NBHS* hs, void* val) {
        return;
    }
    assert(!NBHS_FN(cmp)((void*) entry, val));
    i = (i + 1) & mask;
    } while (i != first);
    abort();
}

// flips the top bit on
static void NBHS_FN(enter_critsec)(void) {
    uint64_t t = atomic_load_explicit(&nbhs_ebr.time, memory_order_relaxed);
    atomic_store_explicit(&nbhs_ebr.time, t + NBHS_LOCKED_BIT, memory_order_release);
}

// flips the top bit off AND increments time by one
static void NBHS_FN(exit_critsec)(void) {
    uint64_t t = atomic_load_explicit(&nbhs_ebr.time, memory_order_relaxed);
    atomic_store_explicit(&nbhs_ebr.time, t + NBHS_LOCKED_BIT + 1, memory_order_release);
}

NBHS_Table* NBHS_FN(move_items)(NBHS* hs, NBHS_Table* latest, NBHS_Table* prev, int items_to_move) {
    assert(prev);
    size_t cap = 1ull << prev->exp;
@@ -235,13 +258,17 @@ NBHS_Table* nbhs_move_items(NBHS* hs, NBHS_Table* latest, NBHS_Table* prev, int
    if (new > cap) { new = cap; }
    } while (!atomic_compare_exchange_strong(&prev->moved, &(uint32_t){ old }, new));

    if (old == new) {
        return prev;
    }

    NBHS__BEGIN("copying old");
    for (size_t i = old; i < new; i++) {
        // either NULL or complete can go thru without waiting
        void* old_p = atomic_load(&prev->data[i]);
        if (old_p) {
            // we pass NULL for prev since we already know the entries exist in prev
            NBHS_FN(raw_intern)(hs, latest, NULL, old_p);
        }
    }
    NBHS__END();
@@ -253,14 +280,14 @@ NBHS_Table* nbhs_move_items(NBHS* hs, NBHS_Table* latest, NBHS_Table* prev, int
    if (done == cap) {
        // dettach now
        NBHS__BEGIN("detach");
        latest->prev = NULL;

        // since we're freeing at the moment, we don't want to block up other freeing threads
        NBHS_FN(exit_critsec)();

        NBHS__BEGIN("scan");
        int state_count = nbhs_ebr_count;
        uint64_t* states = hs->alloc_mem(state_count * 2 * sizeof(uint64_t));

        // check current state, once the other threads either advance or aren't in the
        // lookup function we know we can free.
@@ -273,6 +300,18 @@
        }

        // wait on each and every thread to make progress or not be in a critical section at the time.
        //
        // UNLOCKED(id) -> LOCKED(id) -> UNLOCKED(id + 1)
        //
        // survey on if we can free the pointer and no
        // one would be holding it at that time:
        //
        //   UNLOCKED(A) != LOCKED(A)   we used to be unlocked which means we couldn't have held the pointer.
        //
        //   LOCKED(A) != LOCKED(B)     we're a different call so we've let it go by now
        //
        //   LOCKED(A) != UNLOCKED(A)   we've stopped caring about the state of the pointer at this point.
        //
        for (NBHS_EBREntry* list = atomic_load(&nbhs_ebr_list); list; list = list->next) {
            // mark sure no ptrs refer to prev
            if (us != list && list->id < state_count && (states[list->id] & NBHS_LOCKED_BIT)) {
@@ -281,24 +320,26 @@
                // idk, maybe this should be a better spinlock
                now_t = atomic_load(&list->time);
            } while (before_t == now_t);
            states[state_count + list->id] = now_t;
        }
    }
    hs->free_mem(states, state_count * sizeof(uint64_t));
    NBHS__END();

    // no more refs, we can immediately free
    memset(prev, 0xCC, sizeof(NBHS_Table) + (1ull<<prev->exp)*sizeof(void*));
    // hs->free_mem(prev, sizeof(NBHS_Table) + (1ull<<prev->exp)*sizeof(void*));
    NBHS_FN(enter_critsec)();

    prev = NULL;
    NBHS__END();
}

return prev;
}

void* NBHS_FN(get)(NBHS* hs, void* val) {
    NBHS__BEGIN("intern");
    assert(val);
@@ -318,14 +359,13 @@
    }

    // modifying the tables is possible now.
    NBHS_FN(enter_critsec)();
    NBHS_Table* latest = atomic_load(&hs->latest);

    // if there's earlier versions of the table we can move up entries as we go along.
    NBHS_Table* prev = atomic_load(&latest->prev);
    if (prev) {
        prev = NBHS_FN(move_items)(hs, latest, prev, NBHS_MOVE_AMOUNT);
        if (prev == NULL) {
            latest = atomic_load(&hs->latest);
        }
@@ -337,33 +377,33 @@
    size_t mask = (1 << exp) - 1;
    void* result = NULL;
    uint32_t h = NBHS_FN(hash)(val);
    size_t first = h & mask, i = first;
    do {
        void* entry = atomic_load(&latest->data[i]);
        if (entry == NULL) {
            NBHS_Table* p = prev;
            while (p != NULL) {
                result = NBHS_FN(raw_lookup)(hs, prev, h, val);
                p = atomic_load_explicit(&p->prev, memory_order_relaxed);
            }
            break;
        }

        if (NBHS_FN(cmp)(entry, val)) {
            result = entry;
            break;
        }
        i = (i + 1) & mask;
    } while (i != first);

    NBHS_FN(exit_critsec)();
    NBHS__END();
    return result;
}

void* NBHS_FN(intern)(NBHS* hs, void* val) {
    NBHS__BEGIN("intern");
    assert(val);
@@ -382,28 +422,27 @@
    }

    // enter critical section, modifying the tables is possible now.
    NBHS_FN(enter_critsec)();
    NBHS_Table* latest = atomic_load(&hs->latest);

    // if there's earlier versions of the table we can move up entries as we go along.
    NBHS_Table* prev = atomic_load(&latest->prev);
    if (prev) {
        prev = NBHS_FN(move_items)(hs, latest, prev, 1ull << prev->exp);
        if (prev == NULL) {
            latest = atomic_load(&hs->latest);
        }
    }

    void* result = NBHS_FN(raw_intern)(hs, latest, prev, val);
    NBHS_FN(exit_critsec)();
    NBHS__END();
    return result;
}

// waits for all items to be moved up before continuing
void NBHS_FN(resize_barrier)(NBHS* hs) {
    NBHS__BEGIN("intern");
    if (!nbhs_ebr_init) {
        NBHS__BEGIN("init");
@@ -420,15 +459,16 @@
    }

    // enter critical section, modifying the tables is possible now.
    NBHS_FN(enter_critsec)();
    NBHS_Table *prev, *latest = atomic_load(&hs->latest);
    while (prev = atomic_load(&latest->prev), prev != NULL) {
        NBHS_FN(move_items)(hs, latest, prev, 1ull << prev->exp);
    }
    NBHS_FN(exit_critsec)();
    NBHS__END();
}

#undef NBHS_FN
#endif // NBHS_FN

Second changed file:

@@ -16,7 +16,7 @@
#include <pthread.h>
#endif

#define USE_SPALL 1

#if USE_SPALL
#include "spall_native_auto.h"
@@ -25,61 +25,78 @@
#define spall_auto_buffer_end(...)
#endif

static int num_threads;

static uint32_t my_hash(const void* a) {
    const uint8_t* data = a;
    uint32_t h = 0x811C9DC5;
    for (size_t i = 0; i < 4; i++) {
        h = (data[i] ^ h) * 0x01000193;
    }
    return h;
}

static bool my_cmp(const void* a, const void* b) {
    return *(const uint32_t*)a == *(const uint32_t*)b;
}

#define NBHS_IMPL
#define NBHS_FN(n) my_ ## n
#include "nbhs.h"

typedef struct {
    #ifdef _WIN32
    CRITICAL_SECTION lock;
    #else
    pthread_mutex_t lock;
    #endif

    size_t exp;
    void* data[];
} LockedHS;

void* lhs_intern(LockedHS* hs, void* val) {
    NBHS__BEGIN("intern");
    if (num_threads > 1) {
        #ifdef _WIN32
        EnterCriticalSection(&hs->lock);
        #else
        pthread_mutex_lock(&hs->lock);
        #endif
    }

    // actually lookup & insert
    uint32_t exp = hs->exp;
    size_t mask = (1 << exp) - 1;
    void* result = NULL;
    uint32_t h = my_hash(val);
    size_t first = h & mask, i = first;
    do {
        if (hs->data[i] == NULL) {
            hs->data[i] = result = val;
            break;
        } else if (my_cmp(hs->data[i], val)) {
            result = hs->data[i];
            break;
        }
        i = (i + 1) & mask;
    } while (i != first);
    assert(result != NULL);

    if (num_threads > 1) {
        #ifdef _WIN32
        LeaveCriticalSection(&hs->lock);
        #else
        pthread_mutex_unlock(&hs->lock);
        #endif
    }
    NBHS__END();
    return result;
}

// https://github.com/demetri/scribbles/blob/master/randomness/prngs.c
uint32_t pcg32_pie(uint64_t *state) {
    uint64_t old = *state ^ 0xc90fdaa2adf85459ULL;
@@ -115,13 +132,20 @@
    uint32_t* arr = malloc(attempts * sizeof(uint32_t));

    if (testing_lhs) {
        for (int i = 0; i < attempts;) {
            int limit = i + 128;
            if (limit > attempts) { limit = attempts; }

            spall_auto_buffer_begin("batch", 5, NULL, 0);
            for (; i < limit; i++) {
                arr[i] = pcg32_pie(&seed) & 0xFFFF;
                if (lhs_intern(test_lhs, &arr[i]) == &arr[i]) {
                    stats[0] += 1; // insertions
                } else {
                    stats[1] += 1; // duplicate
                }
            }
            spall_auto_buffer_end();
        }
    } else {
        for (int i = 0; i < attempts;) {
@@ -131,7 +155,7 @@
            spall_auto_buffer_begin("batch", 5, NULL, 0);
            for (; i < limit; i++) {
                arr[i] = pcg32_pie(&seed) & 0xFFFF;
                if (my_intern(&test_set, &arr[i]) == &arr[i]) {
                    stats[0] += 1; // insertions
                } else {
                    stats[1] += 1; // duplicate
@@ -150,6 +174,8 @@
}

int main(int argc, char** argv) {
    printf("%p\n", argv);

    #if USE_SPALL
    spall_auto_init((char *)"profile.spall");
    spall_auto_thread_init(0, SPALL_DEFAULT_BUFFER_SIZE);
@@ -164,16 +190,18 @@
    }

    // attempts = 1000000000 / threads;
    attempts = 10000000 / num_threads;
    thread_stats = calloc(num_threads, 64 * sizeof(int));

    if (testing_lhs) {
        test_lhs = calloc(sizeof(LockedHS) + 262144*sizeof(void*), 1);
        test_lhs->exp = 18;
        #ifdef _WIN32
        InitializeCriticalSection(&test_lhs->lock);
        #endif
    } else {
        test_set = nbhs_alloc(32, NULL, NULL);
    }

    #ifdef _WIN32
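The header comment introduced in this revision describes the instantiation pattern. A minimal sketch of a translation unit that stamps out a `my_`-prefixed set over uint32_t keys, following the test file in this same revision (hash and compare bodies are lifted from it; the header's own dependencies are assumed to be included first):

    #include <assert.h>
    #include <stdbool.h>
    #include <stddef.h>
    #include <stdint.h>
    #include <stdlib.h>
    #include <stdatomic.h>

    // cmp/hash must exist before the include; the template calls them by name
    static uint32_t my_hash(const void* a) {
        const uint8_t* data = a;
        uint32_t h = 0x811C9DC5;            // FNV-1a over the 4 key bytes
        for (size_t i = 0; i < 4; i++) { h = (data[i] ^ h) * 0x01000193; }
        return h;
    }
    static bool my_cmp(const void* a, const void* b) {
        return *(const uint32_t*)a == *(const uint32_t*)b;
    }

    #define NBHS_IMPL            // emit the shared globals exactly once per program
    #define NBHS_FN(n) my_ ## n  // name prefix for this instantiation
    #include "nbhs.h"

    int main(void) {
        NBHS set = nbhs_alloc(32, NULL, NULL);    // signature as of this revision
        uint32_t key = 42;
        void* canonical = my_intern(&set, &key);  // returns &key on first insert
        assert(canonical == &key);
        nbhs_free(&set);
        return 0;
    }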
RealNeGate revised this gist
Oct 16, 2024 · 1 changed file with 41 additions and 42 deletions.
@@ -3,8 +3,6 @@
////////////////////////////////
// You wanna intern lots of things on lots of cores? this is for you. It's
// inspired by Cliff's non-blocking hashmap.
#ifndef NBHS_H
#define NBHS_H
@@ -38,14 +36,16 @@
void nbhs_resize_barrier(NBHS* hs);
#endif // NBHS_H

#ifdef NBHS_IMPL

// personal debooging stuff
#if 0
#define NBHS__BEGIN(name)
#define NBHS__END()
#else
#define NBHS__BEGIN(name) spall_auto_buffer_begin(name, sizeof(name) - 1, NULL, 0)
#define NBHS__END()       spall_auto_buffer_end()
#endif

enum { NBHS_PROBE_LIMIT = 5 };

// for the time in the ebr entry
#define NBHS_LOCKED_BIT (1ull << 63ull)
@@ -96,45 +96,27 @@
}

static void* nbhs_raw_intern(NBHS* hs, NBHS_Table* latest, NBHS_Table* prev, void* val) {
    // actually lookup & insert
    void* result = NULL;
    uint32_t h = hs->hash(val);
    for (;;) retry: {
        int probe = 0;
        uint32_t exp = latest->exp;
        size_t mask = (1ull << exp) - 1;
        size_t first = h & mask, i = first;
        do {
            void* entry = atomic_load(&latest->data[i]);
            if (entry == NULL) {
                void* to_write = val;
                NBHS_Table* p = prev;
                while (p != NULL) {
                    void* old = nbhs_raw_lookup(hs, p, h, val);
                    if (old != NULL) {
                        to_write = old;
                        break;
                    }
                    p = atomic_load_explicit(&p->prev, memory_order_relaxed);
                }

                // fight to be the one to land into the modern table
@@ -152,6 +134,25 @@
                return entry;
            }

            if (++probe >= NBHS_PROBE_LIMIT) {
                // make resized table, we'll amortize the moves upward
                size_t new_cap = 1ull << (exp + 1);
                NBHS_Table* new_top = hs->alloc_mem(sizeof(NBHS_Table) + new_cap*sizeof(uintptr_t));
                new_top->exp = exp + 1;

                // CAS latest -> new_table, if another thread wins the race we'll use its table
                new_top->prev = latest;
                if (!atomic_compare_exchange_strong(&hs->latest, &latest, new_top)) {
                    hs->free_mem(new_top, sizeof(NBHS_Table) + new_cap*sizeof(void*));
                    prev = atomic_load(&latest->prev);
                } else {
                    prev = latest;
                    latest = new_top;
                }
                goto retry;
            }

            i = (i + 1) & mask;
        } while (i != first);
@@ -252,9 +253,7 @@
    if (done == cap) {
        // dettach now
        NBHS__BEGIN("detach");
        latest->prev = prev->prev;

        // since we're freeing at the moment, we don't want to block up other freeing threads
        nbhs_exit_critsec();
@@ -306,6 +305,7 @@
    if (!nbhs_ebr_init) {
        NBHS__BEGIN("init");
        nbhs_ebr_init = true;
        nbhs_ebr.id = nbhs_ebr_count++;

        // add to ebr list, we never free this because i don't care
        // TODO(NeGate): i do care, this is a nightmare when threads die figure it out
@@ -314,7 +314,6 @@
        old = atomic_load_explicit(&nbhs_ebr_list, memory_order_relaxed);
        nbhs_ebr.next = old;
    } while (!atomic_compare_exchange_strong(&nbhs_ebr_list, &old, &nbhs_ebr));
    NBHS__END();
}
@@ -326,7 +325,7 @@
    // if there's earlier versions of the table we can move up entries as we go along.
    NBHS_Table* prev = atomic_load(&latest->prev);
    if (prev) {
        prev = nbhs_move_items(hs, latest, prev, 64);
        if (prev == NULL) {
            latest = atomic_load(&hs->latest);
        }
@@ -343,10 +342,10 @@
    do {
        void* entry = atomic_load(&latest->data[i]);
        if (entry == NULL) {
            NBHS_Table* p = prev;
            while (p != NULL) {
                result = nbhs_raw_lookup(hs, prev, h, val);
                p = atomic_load_explicit(&p->prev, memory_order_relaxed);
            }
            break;
        }
@@ -371,14 +370,14 @@
    if (!nbhs_ebr_init) {
        NBHS__BEGIN("init");
        nbhs_ebr_init = true;
        nbhs_ebr.id = nbhs_ebr_count++;

        // add to ebr list, we never free this because i don't care
        NBHS_EBREntry* old;
        do {
            old = atomic_load_explicit(&nbhs_ebr_list, memory_order_relaxed);
            nbhs_ebr.next = old;
        } while (!atomic_compare_exchange_strong(&nbhs_ebr_list, &old, &nbhs_ebr));
        NBHS__END();
    }
@@ -390,7 +389,7 @@
    // if there's earlier versions of the table we can move up entries as we go along.
    NBHS_Table* prev = atomic_load(&latest->prev);
    if (prev) {
        prev = nbhs_move_items(hs, latest, prev, 64);
        if (prev == NULL) {
            latest = atomic_load(&hs->latest);
        }
@@ -409,14 +408,14 @@
    if (!nbhs_ebr_init) {
        NBHS__BEGIN("init");
        nbhs_ebr_init = true;
        nbhs_ebr.id = nbhs_ebr_count++;

        // add to ebr list, we never free this because i don't care
        NBHS_EBREntry* old;
        do {
            old = atomic_load_explicit(&nbhs_ebr_list, memory_order_relaxed);
            nbhs_ebr.next = old;
        } while (!atomic_compare_exchange_strong(&nbhs_ebr_list, &old, &nbhs_ebr));
        NBHS__END();
    }
RealNeGate revised this gist
Oct 11, 2024 · 1 changed file with 1 addition and 1 deletion.
@@ -4,7 +4,7 @@
// You wanna intern lots of things on lots of cores? this is for you. It's
// inspired by Cliff's non-blocking hashmap.
//
// Interning should be wait-free given the hashset's not completely full (wait-free enough :p)
#ifndef NBHS_H
#define NBHS_H
RealNeGate revised this gist
Oct 4, 2024 · 1 changed file with 1 addition and 0 deletions.
@@ -109,6 +109,7 @@ static void* nbhs_raw_intern(NBHS* hs, NBHS_Table* latest, NBHS_Table* prev, voi
    new_top->prev = latest;
    if (!atomic_compare_exchange_strong(&hs->latest, &latest, new_top)) {
        hs->free_mem(new_top, sizeof(NBHS_Table) + new_cap*sizeof(void*));
        prev = atomic_load(&latest->prev);
    } else {
        prev = latest;
        latest = new_top;
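The added line matters because C11's atomic_compare_exchange_strong writes the observed value back into its expected argument on failure: the loser of the resize race already sees the winner's table in `latest`, but without also reloading `latest->prev` it would keep operating with a stale `prev` and could miss entries still sitting in the winner's old table.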
RealNeGate revised this gist
Oct 3, 2024 · 1 changed file with 2 additions and 0 deletions.
@@ -3,6 +3,8 @@
////////////////////////////////
// You wanna intern lots of things on lots of cores? this is for you. It's
// inspired by Cliff's non-blocking hashmap.
//
// Interning should be wait-free given the hashset's not completely full
#ifndef NBHS_H
#define NBHS_H
RealNeGate revised this gist
Oct 1, 2024 · 1 changed file with 220 additions and 132 deletions.
@@ -24,23 +24,26 @@ typedef struct {
NBHS nbhs_alloc(size_t initial_cap, NBHS_AllocZeroMem alloc_mem, NBHS_FreeMem free_mem, NBHS_Compare compare, NBHS_Hash hash);
void nbhs_free(NBHS* hs);
void* nbhs_intern(NBHS* hs, void* val);
void* nbhs_get(NBHS* hs, void* val);

// thread-unsafe insert, useful during init since it's faster
// than the thread-safe stuff.
void nbhs_raw_insert(NBHS* hs, void* val);
void nbhs_resize_barrier(NBHS* hs);
#endif // NBHS_H

#ifdef NBHS_IMPL

// personal debooging stuff
#if 1
#define NBHS__BEGIN(name)
#define NBHS__END()
#else
#define NBHS__BEGIN(name) spall_auto_buffer_begin(name, sizeof(name) - 1, NULL, 0)
#define NBHS__END()       spall_auto_buffer_end()
#endif

// for the time in the ebr entry
#define NBHS_LOCKED_BIT (1ull << 63ull)
@@ -55,7 +58,7 @@ struct NBHS_Table {
    _Atomic uint32_t move_done;
    _Atomic uint32_t count;

    _Atomic(void*) data[];
};

typedef struct NBHS_EBREntry {
@@ -74,46 +77,22 @@
static _Atomic(NBHS_EBREntry*) nbhs_ebr_list;

static void* nbhs__alloc_zero_mem(size_t s) { return calloc(1, s); }
static void nbhs__free_mem(void* ptr, size_t s) { free(ptr); }

static void* nbhs_raw_lookup(NBHS* hs, NBHS_Table* table, uint32_t h, void* val) {
    size_t mask = (1 << table->exp) - 1;
    size_t first = h & mask, i = first;
    do {
        void* entry = atomic_load(&table->data[i]);
        if (entry == NULL) {
            return NULL;
        } else if (hs->compare(entry, val)) {
            return entry;
        }
        i = (i + 1) & mask;
    } while (i != first);

    return NULL;
}

static void* nbhs_raw_intern(NBHS* hs, NBHS_Table* latest, NBHS_Table* prev, void* val) {
    // resize on 50% load factor and we're not in the moving process rn
    uint32_t threshold = (1 << latest->exp) / 2;
@@ -127,15 +106,13 @@ static void* nbhs_raw_intern(NBHS* hs, NBHS_Table* latest, NBHS_Table* prev, voi
    // CAS latest -> new_table, if another thread wins the race we'll use its table
    new_top->prev = latest;
    if (!atomic_compare_exchange_strong(&hs->latest, &latest, new_top)) {
        hs->free_mem(new_top, sizeof(NBHS_Table) + new_cap*sizeof(void*));
    } else {
        prev = latest;
        latest = new_top;
    }
}

// actually lookup & insert
uint32_t exp = latest->exp;
size_t mask = (1 << exp) - 1;
@@ -145,40 +122,31 @@
for (;;) {
    size_t first = h & mask, i = first;
    do {
        void* entry = atomic_load(&latest->data[i]);
        if (entry == NULL) {
            void* to_write = val;
            if (prev != NULL) {
                // should only be one previous table
                assert(prev->prev == NULL);
                void* old = nbhs_raw_lookup(hs, prev, h, val);
                if (old != NULL) {
                    to_write = old;
                }
            }

            // fight to be the one to land into the modern table
            if (atomic_compare_exchange_strong(&latest->data[i], &entry, to_write)) {
                result = to_write;

                // count doesn't care that it's a migration, it's at least not replacing an existing
                // slot in this version of the table.
                atomic_fetch_add_explicit(&latest->count, 1, memory_order_relaxed);
                break;
            }
        }

        if (hs->compare(entry, val)) {
            return entry;
        }
        i = (i + 1) & mask;
@@ -196,6 +164,26 @@
    }
}

void nbhs_raw_insert(NBHS* hs, void* val) {
    NBHS_Table* table = hs->latest;
    uint32_t h = hs->hash(val);
    size_t mask = (1 << table->exp) - 1;
    size_t first = h & mask, i = first;
    do {
        void* entry = atomic_load_explicit(&table->data[i], memory_order_relaxed);
        if (entry == NULL) {
            atomic_store_explicit(&table->data[i], val, memory_order_relaxed);
            atomic_fetch_add_explicit(&table->count, 1, memory_order_relaxed);
            return;
        }
        assert(!hs->compare((void*) entry, val));
        i = (i + 1) & mask;
    } while (i != first);
    abort();
}

NBHS nbhs_alloc(size_t initial_cap, NBHS_AllocZeroMem alloc_mem, NBHS_FreeMem free_mem, NBHS_Compare compare, NBHS_Hash hash) {
    if (alloc_mem == NULL) {
        assert(free_mem == NULL);
@@ -204,7 +192,7 @@
    }

    size_t exp = 64 - __builtin_clzll(initial_cap - 1);
    NBHS_Table* table = alloc_mem(sizeof(NBHS_Table) + (1ull << exp)*sizeof(void*));
    table->exp = exp;
    return (NBHS){
        .alloc_mem = alloc_mem,
@@ -219,7 +207,7 @@
void nbhs_free(NBHS* hs) {
    NBHS_Table* curr = hs->latest;
    while (curr) {
        NBHS_Table* next = curr->prev;
        hs->free_mem(curr, sizeof(NBHS_Table) + (1ull*curr->exp)*sizeof(void*));
        curr = next;
    }
}
@@ -229,13 +217,95 @@
static void nbhs_enter_critsec(void) { atomic_fetch_add(&nbhs_ebr.time, NBHS_LOCKED_BIT); }

// flips the top bit off AND increments time by one
static void nbhs_exit_critsec(void) { atomic_fetch_add(&nbhs_ebr.time, NBHS_LOCKED_BIT + 1); }

NBHS_Table* nbhs_move_items(NBHS* hs, NBHS_Table* latest, NBHS_Table* prev, int items_to_move) {
    assert(prev);
    size_t cap = 1ull << prev->exp;

    // snatch up some number of items
    uint32_t old, new;
    do {
        old = atomic_load(&prev->moved);
        if (old == cap) {
            return prev;
        }

        // cap the number of items to copy... by the cap
        new = old + items_to_move;
        if (new > cap) { new = cap; }
    } while (!atomic_compare_exchange_strong(&prev->moved, &(uint32_t){ old }, new));

    NBHS__BEGIN("copying old");
    for (size_t i = old; i < new; i++) {
        // either NULL or complete can go thru without waiting
        void* old_p = atomic_load(&prev->data[i]);
        if (old_p) {
            // we pass NULL for prev since we already know the entries exist in prev
            nbhs_raw_intern(hs, latest, NULL, old_p);
        }
    }
    NBHS__END();

    uint32_t done = atomic_fetch_add(&prev->move_done, new - old);
    done += new - old;
    assert(done <= cap);

    if (done == cap) {
        // dettach now
        NBHS__BEGIN("detach");
        assert(prev->prev == NULL);
        latest->prev = NULL;

        // since we're freeing at the moment, we don't want to block up other freeing threads
        nbhs_exit_critsec();

        NBHS__BEGIN("scan");
        int state_count = nbhs_ebr_count;
        uint64_t* states = hs->alloc_mem(state_count * sizeof(uint64_t));

        // check current state, once the other threads either advance or aren't in the
        // lookup function we know we can free.
        NBHS_EBREntry* us = &nbhs_ebr;
        for (NBHS_EBREntry* list = atomic_load(&nbhs_ebr_list); list; list = list->next) {
            // mark sure no ptrs refer to prev
            if (us != list && list->id < state_count) {
                states[list->id] = list->time;
            }
        }

        // wait on each and every thread to make progress or not be in a critical section at the time.
        for (NBHS_EBREntry* list = atomic_load(&nbhs_ebr_list); list; list = list->next) {
            // mark sure no ptrs refer to prev
            if (us != list && list->id < state_count && (states[list->id] & NBHS_LOCKED_BIT)) {
                uint64_t before_t = states[list->id], now_t;
                do {
                    // idk, maybe this should be a better spinlock
                    now_t = atomic_load(&list->time);
                } while (before_t == now_t);
            }
        }
        hs->free_mem(states, state_count * sizeof(uint64_t));
        NBHS__END();

        // no more refs, we can immediately free
        hs->free_mem(prev, sizeof(NBHS_Table) + (1ull<<prev->exp)*sizeof(void*));
        nbhs_enter_critsec();

        prev = NULL;
        NBHS__END();
    }

    return prev;
}

void* nbhs_get(NBHS* hs, void* val) {
    NBHS__BEGIN("intern");
    assert(val);

    if (!nbhs_ebr_init) {
        NBHS__BEGIN("init");
        nbhs_ebr_init = true;

        // add to ebr list, we never free this because i don't care
        // TODO(NeGate): i do care, this is a nightmare when threads die figure it out
        NBHS_EBREntry* old;
        do {
            old = atomic_load_explicit(&nbhs_ebr_list, memory_order_relaxed);
@@ -245,93 +315,82 @@ void* nbhs_intern(NBHS* hs, void* val) {
        NBHS__END();
    }

    // modifying the tables is possible now.
    nbhs_enter_critsec();
    NBHS_Table* latest = atomic_load(&hs->latest);

    // if there's earlier versions of the table we can move up entries as we go along.
    NBHS_Table* prev = atomic_load(&latest->prev);
    if (prev) {
        prev = nbhs_move_items(hs, latest, prev, 32);
        if (prev == NULL) {
            latest = atomic_load(&hs->latest);
        }
    }

    // just lookup into the tables, we don't need to reserve
    // actually lookup & insert
    uint32_t exp = latest->exp;
    size_t mask = (1 << exp) - 1;
    void* result = NULL;
    uint32_t h = hs->hash(val);
    size_t first = h & mask, i = first;
    do {
        void* entry = atomic_load(&latest->data[i]);
        if (entry == NULL) {
            if (prev != NULL) {
                // should only be one previous table
                assert(prev->prev == NULL);
                result = nbhs_raw_lookup(hs, prev, h, val);
            }
            break;
        }

        if (hs->compare(entry, val)) {
            result = entry;
            break;
        }
        i = (i + 1) & mask;
    } while (i != first);

    nbhs_exit_critsec();
    NBHS__END();
    return result;
}

void* nbhs_intern(NBHS* hs, void* val) {
    NBHS__BEGIN("intern");
    assert(val);

    if (!nbhs_ebr_init) {
        NBHS__BEGIN("init");
        nbhs_ebr_init = true;

        // add to ebr list, we never free this because i don't care
        NBHS_EBREntry* old;
        do {
            old = atomic_load_explicit(&nbhs_ebr_list, memory_order_relaxed);
            nbhs_ebr.next = old;
        } while (!atomic_compare_exchange_strong(&nbhs_ebr_list, &old, &nbhs_ebr));
        nbhs_ebr.id = nbhs_ebr_count++;
        NBHS__END();
    }

    // enter critical section, modifying the tables is possible now.
    nbhs_enter_critsec();
    NBHS_Table* latest = atomic_load(&hs->latest);

    // if there's earlier versions of the table we can move up entries as we go along.
    NBHS_Table* prev = atomic_load(&latest->prev);
    if (prev) {
        prev = nbhs_move_items(hs, latest, prev, 32);
        if (prev == NULL) {
            latest = atomic_load(&hs->latest);
        }
    }

    void* result = nbhs_raw_intern(hs, latest, prev, val);
@@ -341,4 +400,33 @@
    nbhs_exit_critsec();
    NBHS__END();
    return result;
}

// waits for all items to be moved up before continuing
void nbhs_resize_barrier(NBHS* hs) {
    NBHS__BEGIN("intern");
    if (!nbhs_ebr_init) {
        NBHS__BEGIN("init");
        nbhs_ebr_init = true;

        // add to ebr list, we never free this because i don't care
        NBHS_EBREntry* old;
        do {
            old = atomic_load_explicit(&nbhs_ebr_list, memory_order_relaxed);
            nbhs_ebr.next = old;
        } while (!atomic_compare_exchange_strong(&nbhs_ebr_list, &old, &nbhs_ebr));
        nbhs_ebr.id = nbhs_ebr_count++;
        NBHS__END();
    }

    // enter critical section, modifying the tables is possible now.
    nbhs_enter_critsec();
    NBHS_Table *prev, *latest = atomic_load(&hs->latest);
    while (prev = atomic_load(&latest->prev), prev != NULL) {
        nbhs_move_items(hs, latest, prev, 1ull << prev->exp);
    }
    nbhs_exit_critsec();
    NBHS__END();
}
#endif // NBHS_IMPL
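The new nbhs_raw_insert is the "thread-unsafe insert, useful during init" from the header comment: relaxed stores, no CAS, no EBR bookkeeping. A hedged sketch of the intended warm-up phase, run on a single thread before any workers exist (seed_items/n_seed are illustrative, my_cmp/my_hash as defined by the caller):

    // single-threaded pre-population with the cheap path
    NBHS hs = nbhs_alloc(1024, NULL, NULL, my_cmp, my_hash);
    for (size_t i = 0; i < n_seed; i++) {
        nbhs_raw_insert(&hs, &seed_items[i]);  // assumes no concurrent access
    }
    // ...spawn threads; from here on they must use nbhs_intern()/nbhs_get()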
RealNeGate revised this gist
Sep 26, 2024 · 1 changed file with 90 additions and 22 deletions.
@@ -1,6 +1,7 @@
#define _CRT_SECURE_NO_WARNINGS
#include <stdio.h>
#include <stdint.h>
#include <string.h>
#include <stddef.h>
#include <stdlib.h>
#include <assert.h>
@@ -27,6 +28,46 @@
#define NBHS_IMPL
#include "nbhs.h"

static int num_threads;

typedef struct {
    NBHS_Compare compare;
    NBHS_Hash hash;

    pthread_mutex_t lock;
    size_t exp;
    void* data[];
} LockedHS;

void* lhs_intern(LockedHS* hs, void* val) {
    NBHS__BEGIN("intern");
    if (num_threads > 1) { pthread_mutex_lock(&hs->lock); }

    // actually lookup & insert
    uint32_t exp = hs->exp;
    size_t mask = (1 << exp) - 1;

    void* result = NULL;
    uint32_t h = hs->hash(val);
    size_t first = h & mask, i = first;
    do {
        if (hs->data[i] == NULL) {
            hs->data[i] = result = val;
            break;
        } else if (hs->compare(hs->data[i], val)) {
            result = hs->data[i];
            break;
        }
        i = (i + 1) & mask;
    } while (i != first);
    assert(result != NULL);

    if (num_threads > 1) { pthread_mutex_unlock(&hs->lock); }
    NBHS__END();
    return result;
}

static uint32_t my_hash(const void* a) {
    const uint8_t* data = a;
    uint32_t h = 0x811C9DC5;
@@ -48,8 +89,11 @@ uint32_t pcg32_pie(uint64_t *state) {
    return (xorshifted >> rot) | (xorshifted << ((-rot) & 31));
}

static LockedHS* test_lhs;
static NBHS test_set;
static int attempts; // per thread
static bool testing_lhs;
static int* thread_stats;
@@ -68,22 +112,33 @@ static void* test_thread_fn(void* arg)
#endif

    int* stats = &thread_stats[starting_id*16];
    uint32_t* arr = malloc(attempts * sizeof(uint32_t));
    if (testing_lhs) {
        for (int i = 0; i < attempts; i++) {
            arr[i] = pcg32_pie(&seed) & 0xFFFF;
            if (lhs_intern(test_lhs, &arr[i]) == &arr[i]) {
                stats[0] += 1; // insertions
            } else {
                stats[1] += 1; // duplicate
            }
        }
    } else {
        for (int i = 0; i < attempts;) {
            int limit = i + 128;
            if (limit > attempts) { limit = attempts; }

            spall_auto_buffer_begin("batch", 5, NULL, 0);
            for (; i < limit; i++) {
                arr[i] = pcg32_pie(&seed) & 0xFFFF;
                if (nbhs_intern(&test_set, &arr[i]) == &arr[i]) {
                    stats[0] += 1; // insertions
                } else {
                    stats[1] += 1; // duplicate
                }
            }
            spall_auto_buffer_end();
        }
    }

    #if USE_SPALL
@@ -100,38 +155,51 @@ int main(int argc, char** argv) {
    spall_auto_thread_init(0, SPALL_DEFAULT_BUFFER_SIZE);
    #endif

    num_threads = atoi(argv[1]);
    printf("Testing with %d threads\n", num_threads);
    if (argc >= 3 && strcmp(argv[2], "lhs") == 0) {
        testing_lhs = true;
        printf("  With Locked hashset...\n");
    }

    // attempts = 1000000000 / threads;
    attempts = 50000000 / num_threads;
    thread_stats = calloc(num_threads, 64 * sizeof(int));
    if (testing_lhs) {
        test_lhs = calloc(sizeof(LockedHS) + 262144*sizeof(void*), 1);
        test_lhs->exp = 18;
        test_lhs->compare = my_cmp;
        test_lhs->hash = my_hash;
    } else {
        test_set = nbhs_alloc(32, NULL, NULL, my_cmp, my_hash);
    }

    #ifdef _WIN32
    HANDLE* arr = malloc(num_threads * sizeof(HANDLE));
    for (int i = 0; i < num_threads; i++) {
        arr[i] = (HANDLE) _beginthreadex(NULL, 0, test_thread_fn, (void*) (uintptr_t) i, 0, 0);
    }
    WaitForMultipleObjects(num_threads, arr, true, INFINITE);
    #else
    pthread_t* arr = malloc(num_threads * sizeof(pthread_t));
    for (int i = 0; i < num_threads; i++) {
        pthread_create(&arr[i], NULL, test_thread_fn, (void*) (uintptr_t) i);
    }
    for (int i = 0; i < num_threads; i++) {
        pthread_join(arr[i], NULL);
    }
    #endif

    int inserted = 0, duplicates = 0;
    for (int i = 0; i < num_threads; i++) {
        inserted += thread_stats[i*16 + 0];
        duplicates += thread_stats[i*16 + 1];
    }

    printf("%d + %d = %d (needed %d)\n", inserted, duplicates, inserted + duplicates, attempts*num_threads);
    if (inserted + duplicates != attempts*num_threads) {
        printf("FAIL!\n");
        abort();
    }
-
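Worth spelling out what `lhs_intern` is for: it is the single-mutex baseline the lock-free table gets benchmarked against, implementing the same intern contract over the same open-addressed probe loop. That contract is what both harness branches rely on when they compare the returned pointer against `&arr[i]`; a minimal hypothetical caller (values made up, not from the gist):

    // Interning returns one canonical pointer per distinct value, so pointer
    // equality afterwards implies value equality.
    uint32_t a = 42, b = 42;
    uint32_t* pa = lhs_intern(test_lhs, &a);
    uint32_t* pb = lhs_intern(test_lhs, &b);
    assert(pa == pb);  // both resolve to whichever copy was interned first
    assert(*pa == 42); // and it still carries the value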
RealNeGate revised this gist
Sep 26, 2024 . 2 changed files with 65 additions and 17 deletions.
@@ -112,6 +112,8 @@ static void* nbhs_raw_lookup(NBHS* hs, NBHS_Table* table, uint32_t h, void* val)
    return NULL;
}

// static _Atomic int reprobes[100];

static void* nbhs_raw_intern(NBHS* hs, NBHS_Table* latest, NBHS_Table* prev, void* val) {
    // resize on 50% load factor and we're not in the moving process rn
    uint32_t threshold = (1 << latest->exp) / 2;
@@ -132,14 +134,16 @@
        }
    }

    uintptr_t reserved_form = (uintptr_t)val | NBHS_RESERVE_BIT;

    // actually lookup & insert
    uint32_t exp = latest->exp;
    size_t mask = (1 << exp) - 1;

    void* result = NULL;
    uint32_t h = hs->hash(val);
    for (;;) {
        size_t first = h & mask, i = first;
        do {
            uintptr_t entry = atomic_load(&latest->data[i]);
            if (entry == 0) {
@@ -337,4 +341,4 @@

@@ -7,11 +7,15 @@
#include <stdbool.h>
#include <stdatomic.h>

#ifdef _WIN32
#define WIN32_LEAN_AND_MEAN
#include <windows.h>
#include <process.h>
#else
#include <pthread.h>
#endif

#define USE_SPALL 0
#if USE_SPALL
#include "spall_native_auto.h"
@@ -23,7 +27,16 @@
#define NBHS_IMPL
#include "nbhs.h"

static uint32_t my_hash(const void* a) {
    const uint8_t* data = a;
    uint32_t h = 0x811C9DC5;
    for (size_t i = 0; i < 4; i++) {
        h = (data[i] ^ h) * 0x01000193;
    }
    return h;
    // return (*(const uint32_t*)a * 11400714819323198485ULL) >> 32ull;
}

static bool my_cmp(const void* a, const void* b) { return *(const uint32_t*)a == *(const uint32_t*)b; }

// https://github.com/demetri/scribbles/blob/master/randomness/prngs.c
@@ -40,48 +53,76 @@
static int attempts; // per thread
static int* thread_stats;

#ifdef _WIN32
static unsigned int test_thread_fn(void* arg)
#else
static void* test_thread_fn(void* arg)
#endif
{
    uintptr_t starting_id = (uintptr_t) arg;
    uint64_t seed = starting_id * 11400714819323198485ULL;

    #if USE_SPALL
    spall_auto_thread_init(starting_id, SPALL_DEFAULT_BUFFER_SIZE);
    spall_auto_buffer_begin("work", 4, NULL, 0);
    #endif

    int* stats = &thread_stats[starting_id*16];
    uint32_t* arr = malloc(attempts * sizeof(uint32_t));
    for (int i = 0; i < attempts;) {
        int limit = i + 128;
        if (limit > attempts) { limit = attempts; }

        spall_auto_buffer_begin("batch", 5, NULL, 0);
        for (; i < limit; i++) {
            arr[i] = pcg32_pie(&seed) & 0xFFFF;
            if (nbhs_intern(&test_set, &arr[i]) == &arr[i]) {
                stats[0] += 1; // insertions
            } else {
                stats[1] += 1; // duplicate
            }
        }
        spall_auto_buffer_end();
    }

    #if USE_SPALL
    spall_auto_buffer_end();
    spall_auto_thread_quit();
    #endif
    return 0;
}

int main(int argc, char** argv) {
    #if USE_SPALL
    spall_auto_init((char *)"profile.spall");
    spall_auto_thread_init(0, SPALL_DEFAULT_BUFFER_SIZE);
    #endif

    int threads = atoi(argv[1]);
    printf("Testing with %d threads\n", threads);

    // attempts = 1000000000 / threads;
    attempts = 10000000 / threads;
    test_set = nbhs_alloc(32, NULL, NULL, my_cmp, my_hash);

    thread_stats = calloc(threads, 64 * sizeof(int));

    #ifdef _WIN32
    HANDLE* arr = malloc(threads * sizeof(HANDLE));
    for (int i = 0; i < threads; i++) {
        arr[i] = (HANDLE) _beginthreadex(NULL, 0, test_thread_fn, (void*) (uintptr_t) i, 0, 0);
    }
    WaitForMultipleObjects(threads, arr, true, INFINITE);
    #else
    pthread_t* arr = malloc(threads * sizeof(pthread_t));
    for (int i = 0; i < threads; i++) {
        pthread_create(&arr[i], NULL, test_thread_fn, (void*) (uintptr_t) i);
    }
    for (int i = 0; i < threads; i++) {
        pthread_join(arr[i], NULL);
    }
    #endif

    int inserted = 0, duplicates = 0;
    for (int i = 0; i < threads; i++) {
@@ -95,12 +136,15 @@
        abort();
    }

    #if USE_SPALL
    spall_auto_thread_quit();
    spall_auto_quit();
    #endif
    return 0;
}

#if USE_SPALL
#define SPALL_AUTO_IMPLEMENTATION
#include "spall_native_auto.h"
#endif
-
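A small detail in the harness above: each thread's counters sit at `thread_stats[starting_id*16]`, i.e. 64 bytes apart, so two threads never bump counters on the same cache line. The same intent written as an explicit struct (a sketch, not code from the gist):

    // One cache line per thread; neighboring threads' counter increments
    // then never contend on the same line (no false sharing).
    typedef struct {
        _Alignas(64) int inserted;
        int duplicates;
    } PerThreadStats;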
RealNeGate revised this gist
Sep 24, 2024 . 1 changed file with 9 additions and 5 deletions.
@@ -11,9 +11,14 @@
#include <windows.h>
#include <process.h>

#define USE_SPALL 1
#if USE_SPALL
#include "spall_native_auto.h"
#else
#define spall_auto_buffer_begin(...)
#define spall_auto_buffer_end(...)
#endif

#define NBHS_IMPL
#include "nbhs.h"
@@ -95,8 +100,7 @@ int main(int argc, char** argv) {
    return 0;
}

#if USE_SPALL
#define SPALL_AUTO_IMPLEMENTATION
#include "spall_native_auto.h"
#endif
-
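The toggle above is the standard trick for optional instrumentation: when `USE_SPALL` is 0, the begin/end markers expand to nothing and every call site vanishes at compile time. The stubs have to be variadic so they can swallow any argument list; a `((void)0)` body is a common variant of the same idea (not what the gist uses) that also keeps the macros legal in expression position:

    #if USE_SPALL
    #include "spall_native_auto.h"
    #else
    // swallow any arguments, expand to a no-op expression
    #define spall_auto_buffer_begin(...) ((void)0)
    #define spall_auto_buffer_end(...)   ((void)0)
    #endif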
RealNeGate revised this gist
Sep 24, 2024 . 1 changed file with 1 addition and 0 deletions.
@@ -188,6 +188,7 @@ static void* nbhs_raw_intern(NBHS* hs, NBHS_Table* latest, NBHS_Table* prev, void* val)
        }

        latest = new_latest;
        prev = atomic_load(&latest->prev);
    }
}
-
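This one-line revision fixes a stale-pointer bug in the retry path: on a resize race the loop already reloaded `latest`, but kept probing with the old `prev`, which by then could point at a table another thread had detached and freed. Condensed, the corrected retry loop looks like this (a paraphrase of the surrounding function, not a verbatim excerpt):

    for (;;) {
        // ... probe `latest`, consulting `prev` to migrate old entries ...

        NBHS_Table* new_latest = atomic_load(&hs->latest);
        if (latest == new_latest && result != NULL) {
            return result;
        }

        latest = new_latest;
        prev   = atomic_load(&latest->prev); // the added line: keep prev in
                                             // sync with the table we probe
    }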
RealNeGate revised this gist
Sep 24, 2024 . 1 changed file with 10 additions and 10 deletions.
@@ -58,18 +58,18 @@ struct NBHS_Table {
    _Atomic(uintptr_t) data[];
};

typedef struct NBHS_EBREntry {
    _Atomic(struct NBHS_EBREntry*) next;
    _Atomic(uint64_t) time;

    // keep on a separate cacheline to avoid false sharing
    _Alignas(64) int id;
} NBHS_EBREntry;

static _Thread_local bool nbhs_ebr_init;
static _Thread_local NBHS_EBREntry nbhs_ebr;
static _Atomic(int) nbhs_ebr_count;
static _Atomic(NBHS_EBREntry*) nbhs_ebr_list;

static void* nbhs__alloc_zero_mem(size_t s) { return calloc(1, s); }
static void nbhs__free_mem(void* ptr, size_t s) { free(ptr); }
@@ -231,7 +231,7 @@ void* nbhs_intern(NBHS* hs, void* val) {
        nbhs_ebr_init = true;

        // add to ebr list, we never free this because i don't care
        NBHS_EBREntry* old;
        do {
            old = atomic_load_explicit(&nbhs_ebr_list, memory_order_relaxed);
            nbhs_ebr.next = old;
@@ -295,16 +295,16 @@
            // check current state, once the other threads either advance or aren't in the
            // lookup function we know we can free.
            NBHS_EBREntry* us = &nbhs_ebr;
            for (NBHS_EBREntry* list = atomic_load(&nbhs_ebr_list); list; list = list->next) {
                // make sure no ptrs refer to prev
                if (us != list && list->id < state_count) {
                    states[list->id] = list->time;
                }
            }

            // wait on each and every thread to make progress or not be in a critical section at the time.
            for (NBHS_EBREntry* list = atomic_load(&nbhs_ebr_list); list; list = list->next) {
                // make sure no ptrs refer to prev
                if (us != list && list->id < state_count && (states[list->id] & NBHS_LOCKED_BIT)) {
                    uint64_t before_t = states[list->id], now_t;
@@ -336,4 +336,4 @@
-
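The registration snippet in this diff is a Treiber-style lock-free list push: link the node locally, then CAS it in as the new head, retrying on contention. Pulled out as a standalone helper (hypothetical name, not in the gist):

    // Push `node` onto an intrusive lock-free singly-linked list.
    static void ebr_register(_Atomic(NBHS_EBREntry*)* head, NBHS_EBREntry* node) {
        NBHS_EBREntry* old;
        do {
            old = atomic_load_explicit(head, memory_order_relaxed);
            node->next = old; // link before publishing the node
        } while (!atomic_compare_exchange_strong(head, &old, node));
    }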
RealNeGate revised this gist
Sep 24, 2024 . 2 changed files with 108 additions and 94 deletions.
@@ -34,13 +34,16 @@ void* nbhs_intern(NBHS* hs, void* val);
#define NBHS__BEGIN(name)
#define NBHS__END()
#else
#define NBHS__BEGIN(name) spall_auto_buffer_begin(name, sizeof(name) - 1, NULL, 0)
#define NBHS__END() spall_auto_buffer_end()
#endif

// private constants (idk if i wanna undef these but don't touch i guess?)
#define NBHS_RESERVE_BIT (1ull << 63ull)

// for the time in the ebr entry
#define NBHS_LOCKED_BIT (1ull << 63ull)

struct NBHS_Table {
    _Atomic(NBHS_Table*) prev;
@@ -55,25 +58,18 @@
    _Atomic(uintptr_t) data[];
};

typedef struct NBHS_HazardEntry {
    _Atomic(struct NBHS_HazardEntry*) next;
    _Atomic(uint64_t) time;

    // keep on a separate cacheline to avoid false sharing
    _Alignas(64) int id;
} NBHS_HazardEntry;

static _Thread_local bool nbhs_ebr_init;
static _Thread_local NBHS_HazardEntry nbhs_ebr;
static _Atomic(int) nbhs_ebr_count;
static _Atomic(NBHS_HazardEntry*) nbhs_ebr_list;

static void* nbhs__alloc_zero_mem(size_t s) { return calloc(1, s); }
static void nbhs__free_mem(void* ptr, size_t s) { free(ptr); }
@@ -116,40 +112,24 @@
static void* nbhs_raw_intern(NBHS* hs, NBHS_Table* latest, NBHS_Table* prev, void* val) {
    // resize on 50% load factor and we're not in the moving process rn
    uint32_t threshold = (1 << latest->exp) / 2;
    if (__builtin_expect(latest->count >= threshold && prev == NULL, 0)) {
        // make resized table, we'll amortize the moves upward
        size_t new_cap = 1ull << (latest->exp + 1);
        NBHS_Table* new_top = hs->alloc_mem(sizeof(NBHS_Table) + new_cap*sizeof(uintptr_t));
        new_top->exp = latest->exp + 1;

        // CAS latest -> new_table, if another thread wins the race we'll use its table
        new_top->prev = latest;
        if (!atomic_compare_exchange_strong(&hs->latest, &latest, new_top)) {
            hs->free_mem(new_top, sizeof(NBHS_Table) + new_cap*sizeof(uintptr_t));
        } else {
            prev = latest;
            latest = new_top;
        }
    }

    // actually lookup & insert
@@ -163,35 +143,31 @@
        do {
            uintptr_t entry = atomic_load(&latest->data[i]);
            if (entry == 0) {
                // we insert the reserved_form to lock the entry until we've checked
                // if the entry already existed in the previous table, if there's no
                // previous table we don't need this.
                uintptr_t to_write = prev ? reserved_form : (uintptr_t) val;
                if (atomic_compare_exchange_strong(&latest->data[i], &entry, to_write)) {
                    // count doesn't care that it's a migration, it's at least not replacing an existing
                    // slot in this version of the table.
                    atomic_fetch_add_explicit(&latest->count, 1, memory_order_relaxed);
                    if (prev != NULL) {
                        // should only be one previous table
                        assert(prev->prev == NULL);
                        void* old = nbhs_raw_lookup(hs, prev, h, val);
                        if (old != NULL) {
                            // migrate the entry up
                            atomic_exchange(&latest->data[i], (uintptr_t) old);
                            result = old;
                            break;
                        }
                    }

                    // we're the first occurrence
                    atomic_exchange(&latest->data[i], (uintptr_t) val);
                    result = val;
                    break;
                }
            }
@@ -206,16 +182,11 @@
        // if the table changed before our eyes, it means someone resized which sucks
        // but it just means we need to retry
        NBHS_Table* new_latest = atomic_load(&hs->latest);
        if (latest == new_latest && result != NULL) {
            return result;
        }

        latest = new_latest;
    }
}
@@ -248,26 +219,34 @@ void nbhs_free(NBHS* hs) {
    }
}

// flips the top bit on
static void nbhs_enter_critsec(void) {
    atomic_fetch_add(&nbhs_ebr.time, NBHS_LOCKED_BIT);
}

// flips the top bit off AND increments time by one
static void nbhs_exit_critsec(void) {
    atomic_fetch_add(&nbhs_ebr.time, NBHS_LOCKED_BIT + 1);
}

void* nbhs_intern(NBHS* hs, void* val) {
    NBHS__BEGIN("intern");
    if (!nbhs_ebr_init) {
        NBHS__BEGIN("init");
        nbhs_ebr_init = true;

        // add to ebr list, we never free this because i don't care
        NBHS_HazardEntry* old;
        do {
            old = atomic_load_explicit(&nbhs_ebr_list, memory_order_relaxed);
            nbhs_ebr.next = old;
        } while (!atomic_compare_exchange_strong(&nbhs_ebr_list, &old, &nbhs_ebr));
        nbhs_ebr.id = nbhs_ebr_count++;
        NBHS__END();
    }

    // enter critical section, modifying the tables is possible now.
    nbhs_enter_critsec();

    NBHS_Table* latest = atomic_load(&hs->latest);

    // if there's earlier versions of the table we can move up entries as we go along.
    NBHS_Table* prev = atomic_load(&latest->prev);
    if (prev) {
        size_t cap = 1ull << prev->exp;
@@ -290,7 +269,8 @@
            }

            if (decision) {
                // we pass NULL for prev since we already know the entries exist in prev
                nbhs_raw_intern(hs, latest, NULL, (void*) decision);
            }
        }
        NBHS__END();
@@ -306,31 +286,54 @@
            assert(prev->prev == NULL);
            latest->prev = NULL;

            // since we're freeing at the moment, we don't want to block up other freeing threads
            nbhs_exit_critsec();

            NBHS__BEGIN("scan");
            int state_count = nbhs_ebr_count;
            uint64_t* states = hs->alloc_mem(state_count * sizeof(uint64_t));

            // check current state, once the other threads either advance or aren't in the
            // lookup function we know we can free.
            NBHS_HazardEntry* us = &nbhs_ebr;
            for (NBHS_HazardEntry* list = atomic_load(&nbhs_ebr_list); list; list = list->next) {
                // make sure no ptrs refer to prev
                if (us != list && list->id < state_count) {
                    states[list->id] = list->time;
                }
            }

            // wait on each and every thread to make progress or not be in a critical section at the time.
            for (NBHS_HazardEntry* list = atomic_load(&nbhs_ebr_list); list; list = list->next) {
                // make sure no ptrs refer to prev
                if (us != list && list->id < state_count && (states[list->id] & NBHS_LOCKED_BIT)) {
                    uint64_t before_t = states[list->id], now_t;
                    do {
                        // idk, maybe this should be a better spinlock
                        now_t = atomic_load(&list->time);
                    } while (before_t == now_t);
                }
            }
            hs->free_mem(states, state_count * sizeof(uint64_t));
            NBHS__END();

            // no more refs, we can immediately free
            hs->free_mem(prev, sizeof(NBHS_Table) + (1ull<<prev->exp)*sizeof(uintptr_t));

            nbhs_enter_critsec();
            prev = NULL;
            NBHS__END();
        }
        skip:;
    }

    void* result = nbhs_raw_intern(hs, latest, prev, val);
    nbhs_exit_critsec();
    NBHS__END();
    return result;
}
#endif // NBHS_IMPL

@@ -31,24 +31,26 @@ uint32_t pcg32_pie(uint64_t *state) {
}

static NBHS test_set;
static int attempts; // per thread
static int* thread_stats;

static unsigned int test_thread_fn(void* arg) {
    uintptr_t starting_id = (uintptr_t) arg;
    uint64_t seed = starting_id * 11400714819323198485ULL;

    spall_auto_thread_init(starting_id, SPALL_DEFAULT_BUFFER_SIZE);
    spall_auto_buffer_begin("work", 4, NULL, 0);

    int* stats = &thread_stats[starting_id*16];
    uint32_t* arr = malloc(attempts * sizeof(uint32_t));
    for (int i = 0; i < attempts; i++) {
        arr[i] = pcg32_pie(&seed) & 0xFFFF;
        if (nbhs_intern(&test_set, &arr[i]) == &arr[i]) {
            stats[0] += 1; // insertions
        } else {
            stats[1] += 1; // duplicate
        }
    }
@@ -64,15 +66,24 @@ int main(int argc, char** argv) {
    int threads = atoi(argv[1]);
    printf("Testing with %d threads\n", threads);

    // attempts = 1000000000 / threads;
    attempts = 10000000 / threads;
    test_set = nbhs_alloc(32, NULL, NULL, my_cmp, my_hash);

    HANDLE* arr = malloc(threads * sizeof(HANDLE));
    thread_stats = calloc(threads, 64 * sizeof(int));
    for (int i = 0; i < threads; i++) {
        arr[i] = (HANDLE) _beginthreadex(NULL, 0, test_thread_fn, (void*) (uintptr_t) i, 0, 0);
    }
    WaitForMultipleObjects(threads, arr, true, INFINITE);

    int inserted = 0, duplicates = 0;
    for (int i = 0; i < threads; i++) {
        inserted += thread_stats[i*16 + 0];
        duplicates += thread_stats[i*16 + 1];
    }

    printf("%d + %d = %d (needed %d)\n", inserted, duplicates, inserted + duplicates, attempts*threads);
    if (inserted + duplicates != attempts*threads) {
        printf("FAIL!\n");
-
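The neat part of this revision is that the pinned flag and the epoch share one 64-bit counter, so entering and exiting the critical section are each a single atomic add: the top bit says "pinned" and the low bits count completed sections. The arithmetic, spelled out (same constant as the diff above):

    #define NBHS_LOCKED_BIT (1ull << 63ull)

    // UNPINNED(t) -> PINNED(t): the top bit was clear, adding it sets it.
    atomic_fetch_add(&nbhs_ebr.time, NBHS_LOCKED_BIT);

    // PINNED(t) -> UNPINNED(t+1): adding top-bit + 1 overflows the top bit
    // back to zero and bumps the epoch in the low bits, all in one RMW.
    atomic_fetch_add(&nbhs_ebr.time, NBHS_LOCKED_BIT + 1);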
RealNeGate revised this gist
Apr 4, 2024 . 2 changed files with 0 additions and 0 deletions.
Both files renamed without changes.
RealNeGate revised this gist
Apr 3, 2024 . 1 changed file with 336 additions and 0 deletions.
@@ -0,0 +1,336 @@
////////////////////////////////
// NBHS - Non-blocking hashset
////////////////////////////////
// You wanna intern lots of things on lots of cores? this is for you. It's
// inspired by Cliff's non-blocking hashmap.
#ifndef NBHS_H
#define NBHS_H

typedef void* (*NBHS_AllocZeroMem)(size_t size);
typedef void (*NBHS_FreeMem)(void* ptr, size_t size);
typedef uint32_t (*NBHS_Hash)(const void* a);
typedef bool (*NBHS_Compare)(const void* a, const void* b);

typedef struct NBHS_Table NBHS_Table;
typedef struct {
    NBHS_AllocZeroMem alloc_mem;
    NBHS_FreeMem free_mem;
    NBHS_Compare compare;
    NBHS_Hash hash;

    _Atomic(NBHS_Table*) latest;
} NBHS;

NBHS nbhs_alloc(size_t initial_cap, NBHS_AllocZeroMem alloc_mem, NBHS_FreeMem free_mem, NBHS_Compare compare, NBHS_Hash hash);
void nbhs_free(NBHS* hs);
void* nbhs_intern(NBHS* hs, void* val);

#endif // NBHS_H

#ifdef NBHS_IMPL

// personal debooging stuff
#if 0
#define NBHS__BEGIN(name)
#define NBHS__END()
#else
#define NBHS__BEGIN(name) spall_auto_buffer_begin(#name, sizeof(#name) - 1, NULL, 0)
#define NBHS__END() spall_auto_buffer_end()
#endif

// private constants (idk if i wanna undef these but don't touch i guess?)
#define NBHS_RESERVE_BIT (1ull << 63ull)

struct NBHS_Table {
    _Atomic(NBHS_Table*) prev;

    uint32_t exp;

    // tracks how many entries have
    // been moved once we're resizing
    _Atomic uint32_t moved;
    _Atomic uint32_t move_done;
    _Atomic uint32_t count;

    _Atomic(uintptr_t) data[];
};

// single hazard pointer per thread
typedef struct NBHS_HazardEntry {
    _Atomic(struct NBHS_HazardEntry*) next;

    // normal insertion and work:
    //   0 - latest
    //   1 - prev
    // inside the prev moving code, it does an insert so
    // it needs separate hazard entries:
    //   2 - latest
    //   3 - prev
    // misc:
    //   4 - temporary
    _Atomic(NBHS_Table*) ptr[5];
} NBHS_HazardEntry;

static _Thread_local bool nbhs_hazard_init;
static _Thread_local NBHS_HazardEntry nbhs_hazard;
static _Atomic(NBHS_HazardEntry*) nbhs_hazard_list;

static void* nbhs__alloc_zero_mem(size_t s) { return calloc(1, s); }
static void nbhs__free_mem(void* ptr, size_t s) { free(ptr); }

static void* nbhs_try_entry(NBHS* hs, NBHS_Table* table, int i, void* val, uintptr_t entry) {
    void* entry_p = (void*) (entry & ~NBHS_RESERVE_BIT);
    if (hs->compare(entry_p, val)) {
        if ((entry & NBHS_RESERVE_BIT) == 0) {
            // not reserved, yay
            return entry_p;
        } else {
            // either NULL or complete can go thru without waiting
            uintptr_t decision;
            while (decision = atomic_load(&table->data[i]), decision & NBHS_RESERVE_BIT) {
                // idk if i should do a nicer wait than a spin-lock
            }
            return (void*) decision;
        }
    } else {
        return NULL;
    }
}

static void* nbhs_raw_lookup(NBHS* hs, NBHS_Table* table, uint32_t h, void* val) {
    size_t mask = (1 << table->exp) - 1;
    size_t first = h & mask, i = first;
    do {
        uintptr_t entry = atomic_load(&table->data[i]);
        if (entry == 0) {
            return NULL;
        }

        void* k = nbhs_try_entry(hs, table, i, val, entry);
        if (k != NULL) {
            return k;
        }

        i = (i + 1) & mask;
    } while (i != first);

    return NULL;
}

static NBHS_Table* nbhs_hazard_access(_Atomic(NBHS_Table*)* table, int slot) {
    NBHS_Table* val = atomic_load(table);
    for (;;) {
        // mark as hazard
        nbhs_hazard.ptr[slot] = val;

        // check if it's been invalidated since the previous load, if so
        // then undo hazard and try load again.
        NBHS_Table* after = atomic_load(table);
        if (val == after) {
            return val;
        }
        val = after;
    }
}

static void* nbhs_raw_intern(NBHS* hs, NBHS_Table* latest, void* val, int hazard_slot) {
    // resize on 50% load factor and we're not in the moving process rn
    uint32_t threshold = (1 << latest->exp) / 2;
    if (latest->count >= threshold && atomic_load(&latest->prev) == NULL) {
        // make resized table, we'll amortize the moves upward
        size_t new_cap = 1ull << (latest->exp + 1);
        NBHS_Table* new_top = hs->alloc_mem(sizeof(NBHS_Table) + new_cap*sizeof(uintptr_t));
        new_top->exp = latest->exp + 1;

        // CAS latest -> new_table, if another thread wins the race we'll use its table
        new_top->prev = latest;
        if (!atomic_compare_exchange_strong(&hs->latest, &latest, new_top)) {
            hs->free_mem(new_top, sizeof(NBHS_Table) + new_cap*sizeof(uintptr_t));
        } else {
            latest = new_top;
        }
        nbhs_hazard.ptr[hazard_slot] = latest;
    }

    // actually lookup & insert
    uint32_t h = hs->hash(val);
    uintptr_t reserved_form = (uintptr_t)val | NBHS_RESERVE_BIT;

    void* result = NULL;
    for (;;) {
        size_t mask = (1 << latest->exp) - 1;
        size_t first = h & mask, i = first;
        do {
            uintptr_t entry = atomic_load(&latest->data[i]);
            if (entry == 0) {
                // if we spot a NULL, the entry has never been in this table, that doesn't mean
                // it's not appeared already so we'll only reserve the slot until we figure that
                // out.
                //
                // everyone else will see our pointer marked as reserved and are free to wait on
                // us if they match, if not they can move along.
                if (atomic_compare_exchange_strong(&latest->data[i], &entry, reserved_form)) {
                    void* old = NULL;
                    NBHS_Table* curr = nbhs_hazard_access(&latest->prev, hazard_slot + 1);
                    if (curr != NULL) {
                        // should only be one previous table
                        assert(curr->prev == NULL);
                        old = nbhs_raw_lookup(hs, curr, h, val);
                        nbhs_hazard.ptr[hazard_slot + 1] = NULL;
                    }

                    // count doesn't care that it's a migration, it's at least not replacing an existing
                    // slot in this version of the table.
                    atomic_fetch_add_explicit(&latest->count, 1, memory_order_relaxed);
                    if (old != NULL) {
                        atomic_exchange(&latest->data[i], (uintptr_t) old);
                        result = old;
                    } else {
                        // no entry was found, good (reserved -> val)
                        atomic_exchange(&latest->data[i], (uintptr_t) val);
                        result = val;
                    }
                    break;
                }
            }

            void* k = nbhs_try_entry(hs, latest, i, val, entry);
            if (k != NULL) {
                return k;
            }

            i = (i + 1) & mask;
        } while (i != first);

        // if the table changed before our eyes, it means someone resized which sucks
        // but it just means we need to retry
        NBHS_Table* new_latest = nbhs_hazard_access(&hs->latest, 4);
        if (latest == new_latest && result != NULL) {
            nbhs_hazard.ptr[hazard_slot] = NULL;
            nbhs_hazard.ptr[4] = NULL;
            return result;
        }

        // move to the correct hazard slot
        nbhs_hazard.ptr[hazard_slot] = new_latest;
        nbhs_hazard.ptr[4] = NULL;
        latest = new_latest;
    }
}

NBHS nbhs_alloc(size_t initial_cap, NBHS_AllocZeroMem alloc_mem, NBHS_FreeMem free_mem, NBHS_Compare compare, NBHS_Hash hash) {
    if (alloc_mem == NULL) {
        assert(free_mem == NULL);
        alloc_mem = nbhs__alloc_zero_mem;
        free_mem = nbhs__free_mem;
    }

    size_t exp = 64 - __builtin_clzll(initial_cap - 1);
    NBHS_Table* table = alloc_mem(sizeof(NBHS_Table) + (1ull << exp)*sizeof(uintptr_t));
    table->exp = exp;
    return (NBHS){ .alloc_mem = alloc_mem, .free_mem = free_mem, .compare = compare, .hash = hash, .latest = table };
}

void nbhs_free(NBHS* hs) {
    NBHS_Table* curr = hs->latest;
    while (curr) {
        NBHS_Table* next = curr->prev;
        hs->free_mem(curr, sizeof(NBHS_Table) + (1ull << curr->exp)*sizeof(uintptr_t));
        curr = next;
    }
}

void* nbhs_intern(NBHS* hs, void* val) {
    NBHS__BEGIN("intern");
    if (!nbhs_hazard_init) {
        NBHS__BEGIN("init");
        nbhs_hazard_init = true;

        // add to hazard list, we never free this because i don't care
        NBHS_HazardEntry* old;
        do {
            old = atomic_load_explicit(&nbhs_hazard_list, memory_order_relaxed);
            nbhs_hazard.next = old;
        } while (!atomic_compare_exchange_strong(&nbhs_hazard_list, &old, &nbhs_hazard));
        NBHS__END();
    }

    NBHS_Table* latest = nbhs_hazard_access(&hs->latest, 0);

    // if there's earlier versions of the table we can move up entries as we go
    // along.
    NBHS_Table* prev = nbhs_hazard_access(&latest->prev, 1);
    if (prev) {
        size_t cap = 1ull << prev->exp;

        // snatch up some number of items
        uint32_t old, new;
        do {
            old = atomic_load(&prev->moved);
            if (old == cap) { goto skip; }

            // cap the number of items to copy... by the cap
            new = old + 32;
            if (new > cap) { new = cap; }
        } while (!atomic_compare_exchange_strong(&prev->moved, &(uint32_t){ old }, new));

        NBHS__BEGIN("copying old");
        for (size_t i = old; i < new; i++) {
            // either NULL or complete can go thru without waiting
            uintptr_t decision;
            while (decision = atomic_load(&prev->data[i]), decision & NBHS_RESERVE_BIT) {
                // idk if i should do a nicer wait than a spin-lock
            }

            if (decision) {
                nbhs_raw_intern(hs, latest, (void*) decision, 2);
            }
        }
        NBHS__END();

        uint32_t done = atomic_fetch_add(&prev->move_done, new - old);
        done += new - old;

        assert(done <= cap);
        if (done == cap) {
            // detach now
            NBHS__BEGIN("detach");
            assert(prev->prev == NULL);
            latest->prev = NULL;

            NBHS__BEGIN("scan");
            // wait for all refs to stop holding on (just read the hazards until they don't match)
            NBHS_HazardEntry* us = &nbhs_hazard;
            for (NBHS_HazardEntry* list = atomic_load(&nbhs_hazard_list); list; list = list->next) {
                // make sure no ptrs refer to prev
                if (us != list) for (size_t i = 0; i < (sizeof(list->ptr)/sizeof(void*)); i++) {
                    while (list->ptr[i] == prev) {
                        // spin-lock
                    }
                }
            }
            NBHS__END();

            // no one's referring to it and no one will ever refer to it again
            hs->free_mem(prev, sizeof(NBHS_Table) + (1ull<<prev->exp)*sizeof(uintptr_t));
            NBHS__END();
        }
        skip:;
    }
    nbhs_hazard.ptr[1] = NULL; // ok it can be freed now

    void* result = nbhs_raw_intern(hs, latest, val, 0);
    NBHS__END();
    return result;
}
#endif // NBHS_IMPL
-
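Taken on its own, the header reads like any other intern table; a minimal hypothetical caller (not part of the gist, and assuming the NBHS__BEGIN/NBHS__END markers are stubbed out so it builds without the profiler):

    #include <stdint.h>
    #include <stddef.h>
    #include <stdlib.h>
    #include <stdbool.h>
    #include <assert.h>
    #include <stdatomic.h>

    #define NBHS_IMPL
    #include "nbhs.h"

    static bool eq_u32(const void* a, const void* b) { return *(const uint32_t*)a == *(const uint32_t*)b; }
    static uint32_t hash_u32(const void* a) { return *(const uint32_t*)a * 2654435769u; }

    int main(void) {
        // NULL allocators pick the calloc/free defaults
        NBHS hs = nbhs_alloc(64, NULL, NULL, eq_u32, hash_u32);

        uint32_t x = 123, y = 123;
        void* first  = nbhs_intern(&hs, &x);
        void* second = nbhs_intern(&hs, &y);
        assert(first == second); // equal values intern to one canonical pointer

        nbhs_free(&hs);
        return 0;
    }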
RealNeGate revised this gist
Apr 3, 2024 . 1 changed file with 7 additions and 281 deletions.
@@ -15,288 +15,14 @@
// #define spall_auto_buffer_begin(...)
// #define spall_auto_buffer_end(...)

#define NBHS_IMPL
#include "nbhs.h"

static uint32_t my_hash(const void* a) { return (*(const uint32_t*)a * 11400714819323198485ULL) >> 32ull; }
static bool my_cmp(const void* a, const void* b) { return *(const uint32_t*)a == *(const uint32_t*)b; }

// https://github.com/demetri/scribbles/blob/master/randomness/prngs.c
uint32_t pcg32_pie(uint64_t *state) {
    uint64_t old = *state ^ 0xc90fdaa2adf85459ULL;
    *state = *state * 6364136223846793005ULL + 0xc90fdaa2adf85459ULL;
    uint32_t xorshifted = ((old >> 18u) ^ old) >> 27u;
@@ -309,7 +35,7 @@
static _Atomic int inserted, duplicates;
static int attempts; // per thread

static unsigned int test_thread_fn(void* arg) {
    uintptr_t starting_id = (uintptr_t) arg;
    uint64_t seed = starting_id * 11400714819323198485ULL;
@@ -339,7 +65,7 @@ int main(int argc, char** argv) {
    printf("Testing with %d threads\n", threads);

    attempts = 4000000 / threads;
    test_set = nbhs_alloc(32, NULL, NULL, my_cmp, my_hash);

    HANDLE* arr = malloc(threads * sizeof(HANDLE));
    for (int i = 0; i < threads; i++) {
-
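With this revision the gist becomes a two-file, single-header library: the 281 deleted lines are the implementation that moved into nbhs.h, and the test program keeps only harness code. As usual for the pattern, exactly one translation unit defines NBHS_IMPL before the include; sketched with hypothetical file names:

    /* test.c: the one TU that instantiates the implementation */
    #define NBHS_IMPL
    #include "nbhs.h"

    /* any other .c file: declarations only */
    #include "nbhs.h"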
RealNeGate created this gist
Apr 3, 2024 .
@@ -0,0 +1,365 @@
#define _CRT_SECURE_NO_WARNINGS
#include <stdio.h>
#include <stdint.h>
#include <stddef.h>
#include <stdlib.h>
#include <assert.h>
#include <stdbool.h>
#include <stdatomic.h>

#define WIN32_LEAN_AND_MEAN
#include <windows.h>
#include <process.h>

#include "spall_native_auto.h"

// #define spall_auto_buffer_begin(...)
// #define spall_auto_buffer_end(...)

#define NBHS_RESERVE_BIT (1ull << 63ull)

typedef struct NBHS_Table {
    _Atomic(struct NBHS_Table*) prev;

    uint32_t exp;

    // tracks how many entries have
    // been moved once we're resizing
    _Atomic uint32_t moved;
    _Atomic uint32_t move_done;
    _Atomic uint32_t count;

    _Atomic(uintptr_t) data[];
} NBHS_Table;

// non-blocking hashset
typedef struct {
    _Atomic(NBHS_Table*) latest;
} NBHS;

// single hazard pointer per thread
typedef struct NBHS_HazardEntry {
    _Atomic(struct NBHS_HazardEntry*) next;

    // normal insertion and work:
    //   0 - latest
    //   1 - prev
    // inside the prev moving code, it does an insert so
    // it needs separate hazard entries:
    //   2 - latest
    //   3 - prev
    // misc:
    //   4 - temporary
    _Atomic(NBHS_Table*) ptr[5];
} NBHS_HazardEntry;

static _Thread_local bool nbhs_hazard_init;
static _Thread_local NBHS_HazardEntry nbhs_hazard;
static _Atomic(NBHS_HazardEntry*) nbhs_hazard_list;

void* nbhs_alloc_zero_mem(size_t s) { return calloc(1, s); }
void nbhs_free_mem(void* ptr) { free(ptr); }

// User-defined
uint32_t nbhs_hash(void* a) { return (*(uint32_t*)a * 11400714819323198485ULL) >> 32ull; }
bool nbhs_cmp(void* a, void* b) { return *(uint32_t*)a == *(uint32_t*)b; }

static void* nbhs_resolved_entry(NBHS_Table* table, size_t i) {
    // either NULL or complete can go thru without waiting
    uintptr_t decision;
    while (decision = atomic_load(&table->data[i]), decision & NBHS_RESERVE_BIT) {
        // idk if i should do a nicer wait than a spin-lock
    }
    return (void*) decision;
}

static void* nbhs_raw_lookup(NBHS_Table* table, uint32_t h, void* val) {
    size_t mask = (1 << table->exp) - 1;
    size_t first = h & mask, i = first;
    do {
        uintptr_t entry = atomic_load(&table->data[i]);
        if (entry == 0) {
            return NULL;
        }

        void* entry_p = (void*) (entry & ~NBHS_RESERVE_BIT);
        if (nbhs_cmp(entry_p, val)) {
            if ((entry & NBHS_RESERVE_BIT) == 0) {
                // not reserved, yay
                return entry_p;
            } else {
                return nbhs_resolved_entry(table, i);
            }
        }

        i = (i + 1) & mask;
    } while (i != first);

    return NULL;
}

static NBHS_Table* nbhs_hazard_access(_Atomic(NBHS_Table*)* table, int slot) {
    NBHS_Table* val = atomic_load(table);
    for (;;) {
        // mark as hazard
        nbhs_hazard.ptr[slot] = val;

        // check if it's been invalidated since the previous load, if so
        // then undo hazard and try load again.
        NBHS_Table* after = atomic_load(table);
        if (val == after) {
            return val;
        }
        val = after;
    }
}

static void* nbhs_raw_intern(NBHS* hs, NBHS_Table* latest, void* val, int hazard_slot) {
    // resize on 50% load factor and we're not in the moving process rn
    uint32_t threshold = (1 << latest->exp) / 2;
    if (latest->count >= threshold && atomic_load(&latest->prev) == NULL) {
        // make resized table, we'll amortize the moves upward
        size_t new_cap = 1ull << (latest->exp + 1);
        NBHS_Table* new_top = nbhs_alloc_zero_mem(sizeof(NBHS_Table) + new_cap*sizeof(uintptr_t));
        new_top->exp = latest->exp + 1;

        // CAS latest -> new_table, if another thread wins the race we'll use its table
        new_top->prev = latest;
        if (!atomic_compare_exchange_strong(&hs->latest, &latest, new_top)) {
            nbhs_free_mem(new_top);
        } else {
            latest = new_top;
        }
        nbhs_hazard.ptr[hazard_slot] = latest;
    }

    // actually lookup & insert
    uint32_t h = nbhs_hash(val);
    uintptr_t reserved_form = (uintptr_t)val | NBHS_RESERVE_BIT;

    void* result = NULL;
    for (;;) {
        size_t mask = (1 << latest->exp) - 1;
        size_t first = h & mask, i = first;
        do {
            uintptr_t entry = atomic_load(&latest->data[i]);
            if (entry == 0) {
                // if we spot a NULL, the entry has never been in this table, that doesn't mean
                // it's not appeared already so we'll only reserve the slot until we figure that
                // out.
                if (atomic_compare_exchange_strong(&latest->data[i], &entry, reserved_form)) {
                    void* old = NULL;
                    NBHS_Table* curr = nbhs_hazard_access(&latest->prev, hazard_slot + 1);
                    if (curr != NULL) {
                        // should only be one previous table
                        assert(curr->prev == NULL);
                        old = nbhs_raw_lookup(curr, h, val);
                        nbhs_hazard.ptr[hazard_slot + 1] = NULL;
                    }

                    // count doesn't care that it's a migration, it's at least not replacing an existing
                    // slot in this version of the table.
                    atomic_fetch_add_explicit(&latest->count, 1, memory_order_relaxed);
                    if (old != NULL) {
                        atomic_exchange(&latest->data[i], (uintptr_t) old);
                        result = old;
                    } else {
                        // no entry was found, good (reserved -> val)
                        atomic_exchange(&latest->data[i], (uintptr_t) val);
                        result = val;
                    }
                    break;
                }
            }

            void* entry_p = (void*) (entry & ~NBHS_RESERVE_BIT);
            if (nbhs_cmp(entry_p, val)) {
                if ((entry & NBHS_RESERVE_BIT) == 0) {
                    result = entry_p;
                } else {
                    result = nbhs_resolved_entry(latest, i);
                }
                break;
            }

            i = (i + 1) & mask;
        } while (i != first);

        // if the table changed before our eyes, it means someone resized which sucks
        // but it just means we need to retry
        NBHS_Table* new_latest = nbhs_hazard_access(&hs->latest, 4);
        if (latest == new_latest && result != NULL) {
            nbhs_hazard.ptr[hazard_slot] = NULL;
            nbhs_hazard.ptr[4] = NULL;
            return result;
        }

        // move to the correct hazard slot
        nbhs_hazard.ptr[hazard_slot] = new_latest;
        nbhs_hazard.ptr[4] = NULL;
        latest = new_latest;
    }
}

NBHS nbhs_alloc(size_t initial_cap) {
    size_t exp = 64 - __builtin_clzll(initial_cap - 1);
    NBHS_Table* table = nbhs_alloc_zero_mem(sizeof(NBHS_Table) + (1ull << exp)*sizeof(uintptr_t));
    table->exp = exp;
    return (NBHS){ table };
}

void nbhs_free(NBHS* hs) {
    NBHS_Table* curr = hs->latest;
    while (curr) {
        NBHS_Table* next = curr->prev;
        nbhs_free_mem(curr);
        curr = next;
    }
}

void* nbhs_intern(NBHS* hs, void* val) {
    spall_auto_buffer_begin("intern", 6, NULL, 0);
    if (!nbhs_hazard_init) {
        spall_auto_buffer_begin("init", 4, NULL, 0);
        nbhs_hazard_init = true;

        // add to hazard list, we never free this because i don't care
        NBHS_HazardEntry* old;
        do {
            old = atomic_load_explicit(&nbhs_hazard_list, memory_order_relaxed);
            nbhs_hazard.next = old;
        } while (!atomic_compare_exchange_strong(&nbhs_hazard_list, &old, &nbhs_hazard));
        spall_auto_buffer_end();
    }

    NBHS_Table* latest = nbhs_hazard_access(&hs->latest, 0);

    // if there's earlier versions of the table we can move up entries as we go
    // along.
    NBHS_Table* prev = nbhs_hazard_access(&latest->prev, 1);
    if (prev) {
        size_t cap = 1ull << prev->exp;

        // snatch up some number of items
        uint32_t old, new;
        do {
            old = atomic_load(&prev->moved);
            if (old == cap) { goto skip; }

            // cap the number of items to copy... by the cap
            new = old + 32;
            if (new > cap) { new = cap; }
        } while (!atomic_compare_exchange_strong(&prev->moved, &(uint32_t){ old }, new));

        spall_auto_buffer_begin("copying old", 11, NULL, 0);
        for (size_t i = old; i < new; i++) {
            void* decision = nbhs_resolved_entry(prev, i);
            if (decision) {
                nbhs_raw_intern(hs, latest, decision, 2);
            }
        }
        spall_auto_buffer_end();

        uint32_t done = atomic_fetch_add(&prev->move_done, new - old);
        done += new - old;

        assert(done <= cap);
        if (done == cap) {
            // detach now
            spall_auto_buffer_begin("detach", 6, NULL, 0);
            assert(prev->prev == NULL);
            latest->prev = NULL;

            spall_auto_buffer_begin("scan", 4, NULL, 0);
            // wait for all refs to stop holding on (just read the hazards until they don't match)
            NBHS_HazardEntry* us = &nbhs_hazard;
            for (NBHS_HazardEntry* list = atomic_load(&nbhs_hazard_list); list; list = list->next) {
                // make sure no ptrs refer to prev
                if (us != list) for (size_t i = 0; i < (sizeof(list->ptr)/sizeof(void*)); i++) {
                    while (list->ptr[i] == prev) {
                        // spin-lock
                    }
                }
            }
            spall_auto_buffer_end();

            // no one's referring to it and no one will ever refer to it again
            nbhs_free_mem(prev);
            spall_auto_buffer_end();
        }
        skip:;
    }
    nbhs_hazard.ptr[1] = NULL; // ok it can be freed now

    void* result = nbhs_raw_intern(hs, latest, val, 0);
    spall_auto_buffer_end();
    return result;
}

// https://github.com/demetri/scribbles/blob/master/randomness/prngs.c
uint32_t pcg32_pie(uint64_t *state) {
    uint64_t old = *state ^ 0xc90fdaa2adf85459ULL;
    *state = *state * 6364136223846793005ULL + 0xc90fdaa2adf85459ULL;
    uint32_t xorshifted = ((old >> 18u) ^ old) >> 27u;
    uint32_t rot = old >> 59u;
    return (xorshifted >> rot) | (xorshifted << ((-rot) & 31));
}

static NBHS test_set;
static _Atomic int inserted, duplicates;
static int attempts; // per thread

unsigned int test_thread_fn(void* arg) {
    uintptr_t starting_id = (uintptr_t) arg;
    uint64_t seed = starting_id * 11400714819323198485ULL;

    spall_auto_thread_init(starting_id, SPALL_DEFAULT_BUFFER_SIZE);
    spall_auto_buffer_begin("work", 4, NULL, 0);

    uint32_t* arr = malloc(attempts * sizeof(uint32_t));
    for (int i = 0; i < attempts; i++) {
        arr[i] = pcg32_pie(&seed) & 0xFFFFF;
        if (nbhs_intern(&test_set, &arr[i]) == &arr[i]) {
            inserted += 1;
        } else {
            duplicates += 1;
        }
    }

    spall_auto_buffer_end();
    spall_auto_thread_quit();
    return 0;
}

int main(int argc, char** argv) {
    spall_auto_init((char *)"profile.spall");
    spall_auto_thread_init(0, SPALL_DEFAULT_BUFFER_SIZE);

    int threads = atoi(argv[1]);
    printf("Testing with %d threads\n", threads);

    attempts = 4000000 / threads;
    test_set = nbhs_alloc(32);

    HANDLE* arr = malloc(threads * sizeof(HANDLE));
    for (int i = 0; i < threads; i++) {
        arr[i] = (HANDLE) _beginthreadex(NULL, 0, test_thread_fn, (void*) (uintptr_t) i, 0, 0);
    }
    WaitForMultipleObjects(threads, arr, true, INFINITE);

    printf("%d + %d = %d (needed %d)\n", inserted, duplicates, inserted + duplicates, attempts*threads);
    if (inserted + duplicates != attempts*threads) {
        printf("FAIL!\n");
        abort();
    }

    spall_auto_thread_quit();
    spall_auto_quit();
    return 0;
}

// #undef spall_auto_buffer_begin
// #undef spall_auto_buffer_end

#define SPALL_AUTO_IMPLEMENTATION
#include "spall_native_auto.h"
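Even in this first version, the whole design hangs off one encoding: a table slot is either empty, a plain pointer, or a pointer with the top bit set, meaning "reserved while its inserter checks the previous table". The bit-twiddling, pulled apart for clarity (same constant as above; variable names are illustrative):

    uintptr_t entry = atomic_load(&table->data[i]);

    void* ptr   = (void*) (entry & ~NBHS_RESERVE_BIT); // payload with the tag stripped
    bool  empty = entry == 0;                          // slot never claimed
    bool  busy  = (entry & NBHS_RESERVE_BIT) != 0;     // claimed, final value pending

    // readers that match a busy slot spin until the owner publishes the final
    // pointer via atomic_exchange, which clears the reserve bit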