-
-
Save aerosayan/fa00ee2dfc54c56da2c00539cb092e12 to your computer and use it in GitHub Desktop.
Revisions
-
vurtun revised this gist
Sep 19, 2019 . 1 changed file with 1 addition and 1 deletion.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -130,7 +130,7 @@ bit_clear(u64 *bitset, unsigned int start, int len) * --------------------------------------------------------------*/ static void spinlock_begin(volatile u32 *spinlock) {while (__sync_val_compare_and_swap(spinlock, 0, 1) != 0) _mm_pause();} static void spinlock_end(volatile u32 *spinlock) -
vurtun revised this gist
May 16, 2017 . 1 changed file with 1 addition and 1 deletion.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -53,7 +53,7 @@ static u64 bit_find_first_set(const u64 *addr, u64 size) { u64 idx; #define ffs(x) (u64)__builtin_ffsl((long int)(x)) for (idx = 0; idx * BITS_PER_LONG < size; idx++) { if (addr[idx]) { u64 first_set = ffs(addr[idx]) - 1; -
vurtun revised this gist
Mar 27, 2017 . 1 changed file with 84 additions and 0 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,84 @@ /* -------------------------------------------------------------- * * TEST * * --------------------------------------------------------------*/ struct game_state { int data; struct memory memory; struct allocator game_alloc; struct allocator gpu_alloc; struct allocator render_alloc; }; struct test_data { struct allocator *alloc; int *data; int from; int to; }; JOB_ENTRY_POINT(test_work) { int i; void *mem; struct test_data *data = arg; mem = alloc(data->alloc, scheduler_self_id(sched), 1024); for (i = data->from; i < data->to; ++i) data->data[i] = i; printf("sleep begin\n"); sleep(1); printf("sleep end\n"); } JOB_ENTRY_POINT(root) { struct game_state *game = arg; job_counter counter = 0; struct job jobs[8]; struct test_data data[8]; int i, n[2*1024]; printf("root\n"); for (i = 0; i < 8; ++i) { data[i].alloc = (i&1) ? &game->game_alloc: &game->render_alloc; data[i].data = n; data[i].from = i * 256; data[i].to = (i+1)*256; jobs[i] = Job(test_work, &data); } scheduler_run(sched, JOB_QUEUE_HIGH, jobs, 8, &counter); printf("run\n"); scheduler_wait_for(sched, &counter, 0); mem_free(&game->memory, MEMORY_TAG_GAME); mem_free(&game->memory, MEMORY_TAG_RENDER); mem_free(&game->memory, MEMORY_TAG_GPU); printf("done\n"); } int main(int argc, char **argv) { /* setup app memory and allocator */ struct game_state app; size_t thread_count = (size_t)sysconf(_SC_NPROCESSORS_ONLN); memset(&app, 0, sizeof(app)); mem_init(&app.memory); alloctor_init(&app.game_alloc, thread_count, &app.memory, MEMORY_TAG_GAME); alloctor_init(&app.render_alloc, thread_count, &app.memory, MEMORY_TAG_RENDER); alloctor_init(&app.gpu_alloc, thread_count, &app.memory, MEMORY_TAG_GPU); /* start root process */ {struct scheduler sched; struct job job = Job(root, &app); job_counter counter; sched_init(&sched, thread_count); printf("init\n"); scheduler_run(&sched, JOB_QUEUE_HIGH, &job, 1, &counter); printf("run\n"); scheduler_wait_for(&sched, &counter, 0); printf("finished\n");} return 0; } -
vurtun revised this gist
Mar 27, 2017 . 1 changed file with 0 additions and 84 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -836,87 +836,3 @@ sched_free(struct scheduler *sched) sem_free(&sched->work_sem); } -
vurtun revised this gist
Mar 27, 2017 . 1 changed file with 1 addition and 1 deletion.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -130,7 +130,7 @@ bit_clear(u64 *bitset, unsigned int start, int len) * --------------------------------------------------------------*/ static void spinlock_begin(volatile u32 *spinlock) {while (__sync_val_compare_and_swap(spinlock, 1, 0) != 0) _mm_pause();} static void spinlock_end(volatile u32 *spinlock) -
vurtun revised this gist
Jun 21, 2016 . 1 changed file with 0 additions and 1 deletion.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -508,7 +508,6 @@ struct scheduler_fiber { struct scheduler_worker { int id; pthread_t thread; struct scheduler_fiber *context; struct scheduler *sched; }; -
vurtun revised this gist
Jun 19, 2016 . 1 changed file with 0 additions and 2 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -197,8 +197,6 @@ alloc_block(struct memory *mem, enum memory_tag tag) spinlock_begin(&mem->lock); index = (unsigned int)bit_find_first_set(mem->free, MAX_MEMORY_BLOCKS); assert(index < MAX_MEMORY_BLOCKS); {char *block; bit_clear(mem->free, index, 1); -
vurtun revised this gist
Jun 19, 2016 . 1 changed file with 15 additions and 6 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -711,7 +711,7 @@ thread_proc(void *arg) } static int scheduler_self_id(struct scheduler *s) { int worker_index = 0; pthread_t self = pthread_self(); @@ -725,7 +725,7 @@ scheduler_self_index(struct scheduler *s) static struct scheduler_worker* scheduler_self(struct scheduler *s) { int worker_index = scheduler_self_id(s); if (worker_index < 0) return 0; return &s->worker[worker_index]; } @@ -847,10 +847,13 @@ sched_free(struct scheduler *sched) struct game_state { int data; struct memory memory; struct allocator game_alloc; struct allocator gpu_alloc; struct allocator render_alloc; }; struct test_data { struct allocator *alloc; int *data; int from; int to; @@ -859,7 +862,9 @@ struct test_data { JOB_ENTRY_POINT(test_work) { int i; void *mem; struct test_data *data = arg; mem = alloc(data->alloc, scheduler_self_id(sched), 1024); for (i = data->from; i < data->to; ++i) data->data[i] = i; @@ -878,6 +883,7 @@ JOB_ENTRY_POINT(root) printf("root\n"); for (i = 0; i < 8; ++i) { data[i].alloc = (i&1) ? &game->game_alloc: &game->render_alloc; data[i].data = n; data[i].from = i * 256; data[i].to = (i+1)*256; @@ -887,6 +893,9 @@ JOB_ENTRY_POINT(root) scheduler_run(sched, JOB_QUEUE_HIGH, jobs, 8, &counter); printf("run\n"); scheduler_wait_for(sched, &counter, 0); mem_free(&game->memory, MEMORY_TAG_GAME); mem_free(&game->memory, MEMORY_TAG_RENDER); mem_free(&game->memory, MEMORY_TAG_GPU); printf("done\n"); } @@ -897,9 +906,9 @@ int main(int argc, char **argv) size_t thread_count = (size_t)sysconf(_SC_NPROCESSORS_ONLN); memset(&app, 0, sizeof(app)); mem_init(&app.memory); alloctor_init(&app.game_alloc, thread_count, &app.memory, MEMORY_TAG_GAME); alloctor_init(&app.render_alloc, thread_count, &app.memory, MEMORY_TAG_RENDER); alloctor_init(&app.gpu_alloc, thread_count, &app.memory, MEMORY_TAG_GPU); /* start root process */ {struct scheduler sched; -
vurtun revised this gist
Jun 19, 2016 . 1 changed file with 1 addition and 1 deletion.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -194,12 +194,12 @@ static void* alloc_block(struct memory *mem, enum memory_tag tag) { unsigned int index; spinlock_begin(&mem->lock); index = (unsigned int)bit_find_first_set(mem->free, MAX_MEMORY_BLOCKS); assert(index < MAX_MEMORY_BLOCKS); if (index >= MAX_MEMORY_BLOCKS) return 0; {char *block; bit_clear(mem->free, index, 1); bit_set(mem->tags[tag], index, 1); -
vurtun revised this gist
Jun 19, 2016 . 1 changed file with 4 additions and 4 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -191,7 +191,7 @@ mem_free(struct memory *mem, enum memory_tag tag) } static void* alloc_block(struct memory *mem, enum memory_tag tag) { unsigned int index; index = (unsigned int)bit_find_first_set(mem->free, MAX_MEMORY_BLOCKS); @@ -236,7 +236,7 @@ alloctor_init(struct allocator *a, size_t worker_count, a->memory = memory; a->worker_count = worker_count; for(;i < a->worker_count; ++i) { a->blocks[i].data = alloc_block(a->memory, tag); a->blocks[i].capacity = MEMORY_BLOCK_SIZE; a->blocks[i].size = 0; } @@ -248,7 +248,7 @@ allocator_clear(struct allocator *a) unsigned int i = 0; for(;i < a->worker_count; ++i) { a->blocks[i].size = 0; a->blocks[i].data = alloc_block(a->memory, a->tag); a->blocks[i].capacity = MEMORY_BLOCK_SIZE; } } @@ -260,7 +260,7 @@ alloc(struct allocator *a, int thread_index, size_t size) assert(size < MEMORY_BLOCK_SIZE); if ((a->blocks[thread_index].size + size) > a->blocks[thread_index].capacity) { /* allocate new worker memory block */ a->blocks[thread_index].data = alloc_block(a->memory, a->tag); assert(a->blocks[thread_index].data); a->blocks[thread_index].size = 0; } -
vurtun revised this gist
Jun 19, 2016 . 1 changed file with 2 additions and 2 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -50,7 +50,7 @@ typedef uint64_t u64; #define max(a,b) (((a) > (b)) ? (a): (b)) static u64 bit_find_first_set(const u64 *addr, u64 size) { u64 idx; #define ffs(x) (u64)__builtin_ffsl((long int)addr[idx]) @@ -194,7 +194,7 @@ static void* block_alloc(struct memory *mem, enum memory_tag tag) { unsigned int index; index = (unsigned int)bit_find_first_set(mem->free, MAX_MEMORY_BLOCKS); assert(index < MAX_MEMORY_BLOCKS); if (index >= MAX_MEMORY_BLOCKS) return 0; -
vurtun revised this gist
Jun 19, 2016 . 1 changed file with 271 additions and 17 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -1,4 +1,5 @@ #include <stdio.h> #include <errno.h> #include <stdlib.h> #include <stdint.h> #include <assert.h> @@ -15,6 +16,8 @@ #include "xmmintrin.h" #include <ucontext.h> #include <sys/mman.h> typedef int8_t i8; typedef int16_t i16; typedef int32_t i32; @@ -23,13 +26,103 @@ typedef uint8_t u8; typedef uint16_t u16; typedef uint32_t u32; typedef uint64_t u64; #define MAX_WORK_QUEUE_THREAD 64 #define FIBER_STACK_SIZE (64 * 1024) #define MAX_SCHEDULER_WORKER 64 #define MAX_SCHEDULER_FIBERS 128 /* -------------------------------------------------------------- * * BITSET * * --------------------------------------------------------------*/ #define BITS_PER_BYTE 8 #define BITS_PER_LONG 64 #define BIT_WORD(nr) ((nr) / BITS_PER_LONG) #define DIV_ROUND_UP(n,d) (((n) + (d) - 1) / (d)) #define BITS_TO_LONGS(nr) DIV_ROUND_UP(nr, BITS_PER_BYTE * sizeof(u64)) #define BITMAP_FIRST_WORD_MASK(start) (~0UL << ((start) % (BITS_PER_LONG-1))) #define BITMAP_LAST_WORD_MASK(nbits) (((nbits) % BITS_PER_LONG) ? (1UL << ((nbits) % BITS_PER_LONG))-1: ~0UL) #define DECLARE_BITMAP(name, bits) u64 name[BITS_TO_LONGS(bits)] #define min(a,b) (((a) < (b)) ? (a): (b)) #define max(a,b) (((a) > (b)) ? (a): (b)) static u64 bit_find_first_bit(const u64 *addr, u64 size) { u64 idx; #define ffs(x) (u64)__builtin_ffsl((long int)addr[idx]) for (idx = 0; idx * BITS_PER_LONG < size; idx++) { if (addr[idx]) { u64 first_set = ffs(addr[idx]) - 1; return min(idx * BITS_PER_LONG + first_set, size); } } return size; } static void bit_fill(u64 *dst, unsigned int nbits) { unsigned int nlongs = (unsigned int)BITS_TO_LONGS(nbits); unsigned int len = (unsigned int)((nlongs - 1) * sizeof(u64)); memset(dst, 0xff, len); } static void bit_or(u64 *dst, const u64 *bitmap1, const u64 *bitmap2, unsigned int nbits) { unsigned int k; unsigned int nr = (unsigned int)BITS_TO_LONGS(nbits); for (k = 0; k < nr; ++k) dst[k] = bitmap1[k] | bitmap2[k]; } static void bit_set(u64 *bitset, unsigned int start, int len) { u64 *p = bitset + BIT_WORD(start); const unsigned int size = start + (unsigned int)len; int bits_to_set = (int)(BITS_PER_LONG - (start % BITS_PER_LONG)); u64 mask_to_set = BITMAP_FIRST_WORD_MASK(start); while (len - bits_to_set >= 0) { *p |= mask_to_set; len -= bits_to_set; bits_to_set = BITS_PER_LONG; mask_to_set = ~0UL; p++; } if (len) { mask_to_set &= BITMAP_LAST_WORD_MASK(size); *p |= mask_to_set; } } static void bit_clear(u64 *bitset, unsigned int start, int len) { u64 *p = bitset + BIT_WORD(start); const unsigned int size = start + (unsigned int)len; int bits_to_clear = (int)(BITS_PER_LONG - (start % BITS_PER_LONG)); unsigned int long mask_to_clear = BITMAP_FIRST_WORD_MASK(start); while (len - bits_to_clear >= 0) { *p &= ~mask_to_clear; len -= bits_to_clear; bits_to_clear = BITS_PER_LONG; mask_to_clear = ~0UL; p++; } if (len) { mask_to_clear &= BITMAP_LAST_WORD_MASK(size); *p &= ~mask_to_clear; } } /* -------------------------------------------------------------- * * SPINLOCK @@ -43,6 +136,140 @@ static void spinlock_end(volatile u32 *spinlock) {_mm_sfence(); *spinlock = 0;} /* -------------------------------------------------------------- * * MEMORY * * --------------------------------------------------------------*/ #define MEMORY_BLOCK_SIZE (2*1024*1024) /* 2MB page size */ #define MAX_MEMORY_BLOCKS 512 /* 1GB memory */ typedef DECLARE_BITMAP(memory_bitmap, MAX_MEMORY_BLOCKS); enum memory_tag { MEMORY_TAG_GAME, MEMORY_TAG_RENDER, MEMORY_TAG_GPU, MEMORY_TAG_COUNT }; struct memory { volatile u32 lock; memory_bitmap free; memory_bitmap tags[MEMORY_TAG_COUNT]; void *data; size_t size; }; static void mem_init(struct memory *mem) { int i; memset(mem, 0, sizeof(*mem)); mem->size = (size_t)MEMORY_BLOCK_SIZE * (size_t)MAX_MEMORY_BLOCKS; mem->data = mmap(0, mem->size, PROT_READ|PROT_WRITE, MAP_PRIVATE|MAP_HUGETLB|MAP_ANONYMOUS, -1, 0); if (mem->data == MAP_FAILED) fprintf(stderr, "%s\n", strerror(errno)); assert(mem->data != MAP_FAILED); bit_fill(mem->free, MAX_MEMORY_BLOCKS); } static void mem_clear(struct memory *mem) { munmap(mem->data, mem->size); memset(mem, 0, sizeof(*mem)); } static void mem_free(struct memory *mem, enum memory_tag tag) { spinlock_begin(&mem->lock); bit_or(mem->free, mem->free, mem->tags[tag], MAX_MEMORY_BLOCKS); memset(mem->tags[tag], 0, sizeof(mem->tags[tag])); spinlock_end(&mem->lock); } static void* block_alloc(struct memory *mem, enum memory_tag tag) { unsigned int index; index = (unsigned int)bit_find_first_bit(mem->free, MAX_MEMORY_BLOCKS); assert(index < MAX_MEMORY_BLOCKS); if (index >= MAX_MEMORY_BLOCKS) return 0; spinlock_begin(&mem->lock); {char *block; bit_clear(mem->free, index, 1); bit_set(mem->tags[tag], index, 1); block = (char*)mem->data + (MEMORY_BLOCK_SIZE * index); spinlock_end(&mem->lock); return block;} } /* -------------------------------------------------------------- * * ALLOCATOR * * --------------------------------------------------------------*/ struct block { void *data; size_t size; size_t capacity; }; struct allocator { enum memory_tag tag; struct memory *memory; struct block blocks[MAX_SCHEDULER_WORKER]; size_t worker_count; }; static void alloctor_init(struct allocator *a, size_t worker_count, struct memory *memory, enum memory_tag tag) { unsigned int i = 0; memset(a, 0, sizeof(*a)); a->tag = tag; a->memory = memory; a->worker_count = worker_count; for(;i < a->worker_count; ++i) { a->blocks[i].data = block_alloc(a->memory, tag); a->blocks[i].capacity = MEMORY_BLOCK_SIZE; a->blocks[i].size = 0; } } static void allocator_clear(struct allocator *a) { unsigned int i = 0; for(;i < a->worker_count; ++i) { a->blocks[i].size = 0; a->blocks[i].data = block_alloc(a->memory, a->tag); a->blocks[i].capacity = MEMORY_BLOCK_SIZE; } } static void* alloc(struct allocator *a, int thread_index, size_t size) { void *memory = 0; assert(size < MEMORY_BLOCK_SIZE); if ((a->blocks[thread_index].size + size) > a->blocks[thread_index].capacity) { /* allocate new worker memory block */ a->blocks[thread_index].data = block_alloc(a->memory, a->tag); assert(a->blocks[thread_index].data); a->blocks[thread_index].size = 0; } /* allocate from worker memory block */ memory = (char*)a->blocks[thread_index].data + a->blocks[thread_index].size; a->blocks[thread_index].size += size; return memory; } /* -------------------------------------------------------------- * * SEMAPHORE @@ -82,7 +309,7 @@ sem_post(struct sem *s, int delta) static void sem_wait(struct sem *s, int delta) { if (delta < 0) return; pthread_mutex_lock(&s->guard); do { if (s->count >= delta) { @@ -101,7 +328,6 @@ sem_wait(struct sem *s, int delta) * --------------------------------------------------------------*/ #undef _FORTIFY_SOURCE typedef void(*fiber_callback)(void*); struct sys_fiber { ucontext_t fib; }; @@ -284,6 +510,7 @@ struct scheduler_fiber { struct scheduler_worker { int id; pthread_t thread; struct allocator allocator; struct scheduler_fiber *context; struct scheduler *sched; }; @@ -413,7 +640,7 @@ scheduler_run(struct scheduler *s, enum job_queue_ids q, struct job *jobs, } static void scheduler_fiber_proc(void *arg) { struct scheduler_worker *w = (struct scheduler_worker*)arg; struct scheduler *s = w->sched; @@ -479,10 +706,31 @@ thread_proc(void *arg) fiber->handle.fib.uc_stack.ss_sp = 0; w->context = fiber; scheduler_fiber_proc(w); return 0; } static int scheduler_self_index(struct scheduler *s) { int worker_index = 0; pthread_t self = pthread_self(); for (worker_index; worker_index < s->worker_count; ++worker_index) { if (s->worker[worker_index].thread == self) return worker_index; } return -1; } static struct scheduler_worker* scheduler_self(struct scheduler *s) { int worker_index = scheduler_self_index(s); if (worker_index < 0) return 0; return &s->worker[worker_index]; } static void scheduler_wait_for(struct scheduler *s, job_counter *counter, u32 value) { @@ -493,12 +741,7 @@ scheduler_wait_for(struct scheduler *s, job_counter *counter, u32 value) assert(counter); /* find threads own worker state */ w = scheduler_self(s); assert(w); /* insert current context into waiting list */ @@ -512,7 +755,8 @@ scheduler_wait_for(struct scheduler *s, job_counter *counter, u32 value) if (!w->context) { w->context = scheduler_get_free_fiber(s); assert(w->context); fiber_create(&w->context->handle, w->context->stack, w->context->stack_size, scheduler_fiber_proc, w); } if (s->profiler.context_switch) s->profiler.context_switch(s->profiler.userdata, w->id); @@ -557,7 +801,8 @@ sched_init(struct scheduler *sched, size_t worker_count) pthread_attr_setaffinity_np(&attr, sizeof(cpu_set_t), &cpus); /* start worker thread */ pthread_create(&sched->worker[thread_index].thread, &attr, thread_proc, &sched->worker[thread_index]); pthread_detach(sched->worker[thread_index].thread); } @@ -601,6 +846,8 @@ sched_free(struct scheduler *sched) * --------------------------------------------------------------*/ struct game_state { int data; struct memory memory; struct allocator alloc; }; struct test_data { @@ -645,18 +892,25 @@ JOB_ENTRY_POINT(root) int main(int argc, char **argv) { /* setup app memory and allocator */ struct game_state app; size_t thread_count = (size_t)sysconf(_SC_NPROCESSORS_ONLN); memset(&app, 0, sizeof(app)); mem_init(&app.memory); alloctor_init(&app.alloc, thread_count, &app.memory, MEMORY_TAG_GAME); alloctor_init(&app.alloc, thread_count, &app.memory, MEMORY_TAG_RENDER); alloctor_init(&app.alloc, thread_count, &app.memory, MEMORY_TAG_GPU); /* start root process */ {struct scheduler sched; struct job job = Job(root, &app); job_counter counter; sched_init(&sched, thread_count); printf("init\n"); scheduler_run(&sched, JOB_QUEUE_HIGH, &job, 1, &counter); printf("run\n"); scheduler_wait_for(&sched, &counter, 0); printf("finished\n");} return 0; } -
vurtun revised this gist
Jun 14, 2016 . 1 changed file with 5 additions and 1 deletion.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -99,6 +99,7 @@ sem_wait(struct sem *s, int delta) * FIBER * * --------------------------------------------------------------*/ #undef _FORTIFY_SOURCE typedef void(*fiber_callback)(void*); struct sys_fiber { @@ -614,7 +615,10 @@ JOB_ENTRY_POINT(test_work) struct test_data *data = arg; for (i = data->from; i < data->to; ++i) data->data[i] = i; printf("sleep begin\n"); sleep(1); printf("sleep end\n"); } JOB_ENTRY_POINT(root) @@ -633,7 +637,7 @@ JOB_ENTRY_POINT(root) jobs[i] = Job(test_work, &data); } scheduler_run(sched, JOB_QUEUE_HIGH, jobs, 8, &counter); printf("run\n"); scheduler_wait_for(sched, &counter, 0); printf("done\n"); -
vurtun revised this gist
Jun 14, 2016 . 1 changed file with 5 additions and 8 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -99,7 +99,6 @@ sem_wait(struct sem *s, int delta) * FIBER * * --------------------------------------------------------------*/ typedef void(*fiber_callback)(void*); struct sys_fiber { @@ -349,7 +348,7 @@ static void scheduler_unhook_from_list(struct scheduler_fiber **list, struct scheduler_fiber *element, volatile u32 *lock) { if (lock) spinlock_begin(lock); if (element->next) element->next->prev = element->prev; if (element->prev) @@ -358,7 +357,7 @@ scheduler_unhook_from_list(struct scheduler_fiber **list, *list = element->next; element->next = element->prev = 0; if (lock) spinlock_end(lock); } static struct scheduler_fiber* @@ -370,6 +369,7 @@ scheduler_find_fiber_finished_waiting(struct scheduler *s) if (*iter->job.run_count == iter->value) break; iter = iter->next; } if (iter) scheduler_unhook_from_list(&s->wait_list, iter, 0); spinlock_end(&s->wait_list_lock); return iter; } @@ -383,6 +383,7 @@ scheduler_get_free_fiber(struct scheduler *s) fib = &s->fibers[s->fiber_count++]; } else if (s->free_list) { fib = s->free_list; scheduler_unhook_from_list(&s->free_list, fib, 0); } spinlock_end(&s->free_list_lock); return fib; @@ -424,7 +425,6 @@ fiber_proc(void *arg) /* put old worker context into freelist */ struct scheduler_fiber *old = w->context; memset(w->context, 0, sizeof(*w->context)); scheduler_hook_into_list(&s->free_list, old, &s->free_list_lock); /* set previously waiting fiber as worker context */ @@ -511,9 +511,8 @@ scheduler_wait_for(struct scheduler *s, job_counter *counter, u32 value) if (!w->context) { w->context = scheduler_get_free_fiber(s); assert(w->context); fiber_create(&w->context->handle, w->context->stack, w->context->stack_size, fiber_proc, w); } if (s->profiler.context_switch) s->profiler.context_switch(s->profiler.userdata, w->id); fiber_switch_to(&old->handle, &w->context->handle); @@ -570,7 +569,6 @@ sched_init(struct scheduler *sched, size_t worker_count) sched->worker[thread_index].id = (int)thread_index; pthread_setaffinity_np(sched->worker[thread_index].thread, sizeof(cpu_set_t), &cpus);} /* create fiber for main thread worker */ {struct scheduler_fiber *fiber; fiber = scheduler_get_free_fiber(sched); @@ -593,7 +591,6 @@ sched_free(struct scheduler *sched) free(fiber->stack); }} sem_free(&sched->work_sem); } /* -------------------------------------------------------------- -
vurtun revised this gist
Jun 5, 2016 . 1 changed file with 5 additions and 0 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -562,9 +562,14 @@ sched_init(struct scheduler *sched, size_t worker_count) } /* initialize main thread as worker thread */ {cpu_set_t cpus; CPU_ZERO(&cpus); CPU_SET(thread_index, &cpus); sched->worker[thread_index].sched = sched; sched->worker[thread_index].thread = pthread_self(); sched->worker[thread_index].id = (int)thread_index; pthread_setaffinity_np(sched->worker[thread_index].thread, sizeof(cpu_set_t), &cpus);} /* create fiber for main thread worker */ {struct scheduler_fiber *fiber; -
vurtun revised this gist
Jun 4, 2016 . 1 changed file with 54 additions and 2 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -62,6 +62,13 @@ sem_init(struct sem *s, int init) s->count = init; } static void sem_free(struct sem *s) { pthread_mutex_destroy(&s->guard); pthread_cond_destroy(&s->cond); } static void sem_post(struct sem *s, int delta) { @@ -281,12 +288,25 @@ struct scheduler_worker { struct scheduler *sched; }; typedef void (*scheduler_profiler_callback_f)(void*, int thread_id); struct scheduler_profiling { void *userdata; scheduler_profiler_callback_f thread_start; scheduler_profiler_callback_f thread_stop; scheduler_profiler_callback_f context_switch; scheduler_profiler_callback_f wait_start; scheduler_profiler_callback_f wait_stop; }; struct scheduler { struct sem work_sem; struct job_queue queue[JOB_QUEUE_COUNT]; int worker_count; struct scheduler_worker worker[MAX_SCHEDULER_WORKER]; volatile u32 worker_running; volatile u32 worker_active; struct scheduler_profiling profiler; int fiber_count; struct scheduler_fiber fibers[MAX_SCHEDULER_FIBERS]; @@ -395,6 +415,8 @@ fiber_proc(void *arg) { struct scheduler_worker *w = (struct scheduler_worker*)arg; struct scheduler *s = w->sched; __sync_add_and_fetch(&s->worker_active, 1); while (1) { /* check if any fiber is done waiting */ struct scheduler_fiber *fiber = scheduler_find_fiber_finished_waiting(s); @@ -407,6 +429,8 @@ fiber_proc(void *arg) /* set previously waiting fiber as worker context */ w->context = fiber; if (s->profiler.context_switch) s->profiler.context_switch(s->profiler.userdata, w->id); fiber_switch_to(&old->handle, &w->context->handle); } @@ -415,11 +439,23 @@ fiber_proc(void *arg) if (!(job_queue_pop(&job, &s->queue[JOB_QUEUE_HIGH])) && !(job_queue_pop(&job, &s->queue[JOB_QUEUE_NORMAL])) && !(job_queue_pop(&job, &s->queue[JOB_QUEUE_LOW]))) { /* currently no job so wait */ __sync_sub_and_fetch(&s->worker_active, 1); if (s->profiler.wait_start) s->profiler.wait_start(s->profiler.userdata, w->id); sem_wait(&s->work_sem, 1); if (s->profiler.wait_stop) s->profiler.wait_stop(s->profiler.userdata, w->id); __sync_add_and_fetch(&s->worker_active, 1); } else { /* run dequeued job */ w->context->job = *job; assert(job->callback); if (s->profiler.thread_start) s->profiler.thread_start(s->profiler.userdata, w->id); job->callback(s, job->data); if (s->profiler.thread_stop) s->profiler.thread_stop(s->profiler.userdata, w->id); __sync_sub_and_fetch(job->run_count, 1); }} } @@ -431,6 +467,7 @@ thread_proc(void *arg) struct scheduler_worker *w = (struct scheduler_worker*)arg; struct scheduler_fiber *fiber; struct scheduler *s = w->sched; __sync_add_and_fetch(&s->worker_running, 1); /* create dummy fiber */ fiber = scheduler_get_free_fiber(s); @@ -440,6 +477,7 @@ thread_proc(void *arg) fiber->handle.fib.uc_stack.ss_size = 0; fiber->handle.fib.uc_stack.ss_sp = 0; w->context = fiber; fiber_proc(w); return 0; } @@ -476,6 +514,8 @@ scheduler_wait_for(struct scheduler *s, job_counter *counter, u32 value) scheduler_unhook_from_list(&s->free_list, w->context, &s->free_list_lock); fiber_create(&w->context->handle, w->context->stack, w->context->stack_size, fiber_proc, w); } else scheduler_unhook_from_list(&s->wait_list, w->context, &s->wait_list_lock); if (s->profiler.context_switch) s->profiler.context_switch(s->profiler.userdata, w->id); fiber_switch_to(&old->handle, &w->context->handle); } @@ -538,6 +578,19 @@ sched_init(struct scheduler *sched, size_t worker_count) pthread_attr_destroy(&attr); } static void sched_free(struct scheduler *sched) { /* free fibers stack */ {int fiber_index = 0; for (fiber_index; fiber_index < MAX_SCHEDULER_FIBERS; ++fiber_index) { struct scheduler_fiber *fiber = sched->fibers + fiber_index; free(fiber->stack); }} sem_free(&sched->work_sem); } /* -------------------------------------------------------------- * * TEST @@ -559,6 +612,7 @@ JOB_ENTRY_POINT(test_work) struct test_data *data = arg; for (i = data->from; i < data->to; ++i) data->data[i] = i; sleep(1); } JOB_ENTRY_POINT(root) @@ -580,8 +634,6 @@ JOB_ENTRY_POINT(root) scheduler_run(sched, JOB_QUEUE_HIGH, jobs, 4, &counter); printf("run\n"); scheduler_wait_for(sched, &counter, 0); printf("done\n"); } -
vurtun created this gist
Jun 3, 2016 .There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,604 @@ #include <stdio.h> #include <stdlib.h> #include <stdint.h> #include <assert.h> #include <string.h> #define streq(a, b) (!strcmp((a), (b))) #ifndef __USE_GNU #define __USE_GNU #endif #include <setjmp.h> #include <sched.h> #include <unistd.h> #include <pthread.h> #include "xmmintrin.h" #include <ucontext.h> typedef int8_t i8; typedef int16_t i16; typedef int32_t i32; typedef int64_t i64; typedef uint8_t u8; typedef uint16_t u16; typedef uint32_t u32; typedef uint64_t u64; typedef long word; #define MAX_WORK_QUEUE_THREAD 64 #define FIBER_STACK_SIZE (64 * 1024) #define MAX_SCHEDULER_WORKER 64 #define MAX_SCHEDULER_FIBERS 128 /* -------------------------------------------------------------- * * SPINLOCK * * --------------------------------------------------------------*/ static void spinlock_begin(volatile u32 *spinlock) {while (__sync_val_compare_and_swap(spinlock, 1, 0) != 0);} static void spinlock_end(volatile u32 *spinlock) {_mm_sfence(); *spinlock = 0;} /* -------------------------------------------------------------- * * SEMAPHORE * * --------------------------------------------------------------*/ struct sem { pthread_mutex_t guard; pthread_cond_t cond; int count; }; static void sem_init(struct sem *s, int init) { pthread_mutex_init(&s->guard, 0); pthread_cond_init(&s->cond, 0); s->count = init; } static void sem_post(struct sem *s, int delta) { if (delta < 0) return; pthread_mutex_lock(&s->guard); s->count += delta; pthread_cond_broadcast(&s->cond); pthread_mutex_unlock(&s->guard); } static void sem_wait(struct sem *s, int delta) { if (delta < 0 ) return; pthread_mutex_lock(&s->guard); do { if (s->count >= delta) { s->count -= delta; break; } pthread_cond_wait(&s->cond, &s->guard); } while (1); pthread_mutex_unlock(&s->guard); } /* -------------------------------------------------------------- * * FIBER * * --------------------------------------------------------------*/ #undef _FORTIFY_SOURCE typedef void(*fiber_callback)(void*); struct sys_fiber { ucontext_t fib; }; static void fiber_create(struct sys_fiber *fib, void *stack, size_t stack_size, fiber_callback callback, void *data) { getcontext(&fib->fib); fib->fib.uc_stack.ss_size = stack_size; fib->fib.uc_stack.ss_sp = stack; fib->fib.uc_link = 0; makecontext(&fib->fib, (void(*)())callback, 1, data); } static void fiber_switch_to(struct sys_fiber *prev, struct sys_fiber *fib) { swapcontext(&prev->fib, &fib->fib); } /* -------------------------------------------------------------- * * QUEUE * * --------------------------------------------------------------*/ /* This is an implementation of a multi producer and consumer Non-Blocking * Concurrent FIFO Queue based on the paper from Phillippas Tsigas and Yi Zhangs: * www.cse.chalmers.se/~tsigas/papers/latest-spaa01.pdf */ #define MAX_WORK_QUEUE_JOBS (1024) #define WORK_QUEUE_MASK (MAX_WORK_QUEUE_JOBS-1) struct scheduler; typedef void(*job_callback)(struct scheduler*,void*); #define JOB_ENTRY_POINT(name) static void name(struct scheduler *sched, void *arg) #define BASE_ALIGN(x) __attribute__((aligned(x))) #define QUEUE_EMPTY 0 #define QUEUE_REMOVED 1 struct job { void *data; job_callback callback; volatile u32 *run_count; }; struct job_queue { volatile u32 head; struct job *jobs[MAX_WORK_QUEUE_JOBS]; volatile u32 tail; }; static void job_queue_init(struct job_queue *q) { memset(q, 0, sizeof(*q)); q->jobs[0] = QUEUE_EMPTY; q->head = 0; q->tail = 1; } static int job_queue_entry_free(struct job *p) { return (((uintptr_t)p == QUEUE_EMPTY) || ((uintptr_t)p == QUEUE_REMOVED)); } static int job_queue_push(struct job_queue *q, struct job *job) { while (1) { /* read tail */ u32 te = q->tail; u32 ate = te; struct job *tt = q->jobs[ate]; u32 tmp = (ate + 1) & WORK_QUEUE_MASK; struct job *tnew; /* we want to find the actual tail */ while (!(job_queue_entry_free(tt))) { /* check tails consistency */ if (te != q->tail) goto retry; /* check if queue is full */ if (tmp == q->head) break; tt = q->jobs[tmp]; ate = tmp; tmp = (ate + 1) & WORK_QUEUE_MASK; } /* check tails consistency */ if (te != q->tail) continue; /* check if queue is full */ if (tmp == q->head) { ate = (tmp + 1) & WORK_QUEUE_MASK; tt = q->jobs[ate]; if (!(job_queue_entry_free(tt))) return 0; /* queue is full */ /* let pop update header */ __sync_bool_compare_and_swap(&q->head, tmp, ate); continue; } if ((uintptr_t)tt == QUEUE_REMOVED) job = (struct job*)((uintptr_t)job | 0x01); if (te != q->tail) continue; if (__sync_bool_compare_and_swap(&q->jobs[ate], tt, job)) { if ((tmp & 1) == 0) __sync_bool_compare_and_swap(&q->tail, te, tmp); return 1; } retry:; } } static int job_queue_pop(struct job **job, struct job_queue *q) { while (1) { u32 th = q->head; u32 tmp = (th + 1) & WORK_QUEUE_MASK; struct job *tt = q->jobs[tmp]; struct job *tnull = 0; /* we want to find the actual head */ while ((job_queue_entry_free(tt))) { if (th != q->head) goto retry; if (tmp == q->tail) return 0; tmp = (tmp + 1) & WORK_QUEUE_MASK; tt = q->jobs[tmp]; } /* check head's consistency */ if (th != q->head) continue; /* check if queue is empty */ if (tmp == q->tail) { /* help push to update end */ __sync_bool_compare_and_swap(&q->tail, tmp, (tmp+1) & WORK_QUEUE_MASK); continue; /* retry */ } tnull = (((uintptr_t)tt & 0x01) ? (struct job*)QUEUE_REMOVED: (struct job*)QUEUE_EMPTY); if (th != q->head) continue; /* get actual head */ if (__sync_bool_compare_and_swap(&q->jobs[tmp], tt, tnull)) { if ((tmp & 0x1) == 0) __sync_bool_compare_and_swap(&q->head, th, tmp); *job = (struct job*)((uintptr_t)tt & ~(uintptr_t)1); return 1; } retry:; } } /* -------------------------------------------------------------- * * SCHEDULER * * --------------------------------------------------------------*/ typedef volatile u32 job_counter; enum job_queue_ids { JOB_QUEUE_LOW, JOB_QUEUE_NORMAL, JOB_QUEUE_HIGH, JOB_QUEUE_COUNT }; struct scheduler_fiber { struct scheduler_fiber *next; struct scheduler_fiber *prev; void *stack; size_t stack_size; struct sys_fiber handle; struct job job; u32 value; }; struct scheduler_worker { int id; pthread_t thread; struct scheduler_fiber *context; struct scheduler *sched; }; struct scheduler { struct sem work_sem; struct job_queue queue[JOB_QUEUE_COUNT]; int worker_count; struct scheduler_worker worker[MAX_SCHEDULER_WORKER]; int fiber_count; struct scheduler_fiber fibers[MAX_SCHEDULER_FIBERS]; volatile u32 wait_list_lock; struct scheduler_fiber *wait_list; volatile u32 free_list_lock; struct scheduler_fiber *free_list; }; static struct job Job(job_callback callback, void *data) { struct job task; task.callback = callback; task.data = data; task.run_count = 0; return task; } static void scheduler_hook_into_list(struct scheduler_fiber **list, struct scheduler_fiber *element, volatile u32 *lock) { spinlock_begin(lock); if (!*list) { *list = element; element->prev = 0; element->next = 0; } else { element->prev = 0; element->next = *list; (*list)->prev = element; *list = element; } spinlock_end(lock); } static void scheduler_unhook_from_list(struct scheduler_fiber **list, struct scheduler_fiber *element, volatile u32 *lock) { spinlock_begin(lock); if (element->next) element->next->prev = element->prev; if (element->prev) element->prev->next = element->next; if (*list == element) *list = element->next; element->next = element->prev = 0; spinlock_end(lock); } static struct scheduler_fiber* scheduler_find_fiber_finished_waiting(struct scheduler *s) { struct scheduler_fiber *iter = s->wait_list; spinlock_begin(&s->wait_list_lock); while (iter) { if (*iter->job.run_count == iter->value) break; iter = iter->next; } spinlock_end(&s->wait_list_lock); return iter; } static struct scheduler_fiber* scheduler_get_free_fiber(struct scheduler *s) { struct scheduler_fiber *fib = 0; spinlock_begin(&s->free_list_lock); if (s->fiber_count < MAX_SCHEDULER_FIBERS) { fib = &s->fibers[s->fiber_count++]; } else if (s->free_list) { fib = s->free_list; } spinlock_end(&s->free_list_lock); return fib; } static void scheduler_run(struct scheduler *s, enum job_queue_ids q, struct job *jobs, u32 count, job_counter *counter) { u32 jobIndex = 0; struct job_queue *queue; assert(q < JOB_QUEUE_COUNT); assert(counter); assert(jobs); assert(s); queue = &s->queue[q]; while (jobIndex < count) { jobs[jobIndex].run_count = counter; if (job_queue_push(queue, &jobs[jobIndex])) { sem_post(&s->work_sem, 1); jobIndex++; } } *counter = count; } static void fiber_proc(void *arg) { struct scheduler_worker *w = (struct scheduler_worker*)arg; struct scheduler *s = w->sched; while (1) { /* check if any fiber is done waiting */ struct scheduler_fiber *fiber = scheduler_find_fiber_finished_waiting(s); if (fiber) { /* put old worker context into freelist */ struct scheduler_fiber *old = w->context; memset(w->context, 0, sizeof(*w->context)); scheduler_unhook_from_list(&s->wait_list, fiber, &s->wait_list_lock); scheduler_hook_into_list(&s->free_list, old, &s->free_list_lock); /* set previously waiting fiber as worker context */ w->context = fiber; fiber_switch_to(&old->handle, &w->context->handle); } /* check if any new jobs inside work queues */ {struct job *job = 0; if (!(job_queue_pop(&job, &s->queue[JOB_QUEUE_HIGH])) && !(job_queue_pop(&job, &s->queue[JOB_QUEUE_NORMAL])) && !(job_queue_pop(&job, &s->queue[JOB_QUEUE_LOW]))) { sem_wait(&s->work_sem, 1); } else { w->context->job = *job; assert(job->callback); job->callback(s, job->data); __sync_sub_and_fetch(job->run_count, 1); }} } } static void* thread_proc(void *arg) { struct scheduler_worker *w = (struct scheduler_worker*)arg; struct scheduler_fiber *fiber; struct scheduler *s = w->sched; /* create dummy fiber */ fiber = scheduler_get_free_fiber(s); assert(fiber); getcontext(&fiber->handle.fib); fiber->handle.fib.uc_link = 0; fiber->handle.fib.uc_stack.ss_size = 0; fiber->handle.fib.uc_stack.ss_sp = 0; w->context = fiber; fiber_proc(w); return 0; } static void scheduler_wait_for(struct scheduler *s, job_counter *counter, u32 value) { struct scheduler_worker *w; struct scheduler_fiber *old; assert(s); assert(counter); /* find threads own worker state */ {int worker_index = 0; pthread_t self = pthread_self(); for (worker_index; worker_index < s->worker_count; ++worker_index) { if (s->worker[worker_index].thread == self) w = &s->worker[worker_index]; }} assert(w); /* insert current context into waiting list */ old = w->context; w->context->value = value; w->context->job.run_count = counter; scheduler_hook_into_list(&s->wait_list, old, &s->wait_list_lock); /*either continue finished waiting job or start new one */ w->context = scheduler_find_fiber_finished_waiting(s); if (!w->context) { w->context = scheduler_get_free_fiber(s); assert(w->context); scheduler_unhook_from_list(&s->free_list, w->context, &s->free_list_lock); fiber_create(&w->context->handle, w->context->stack, w->context->stack_size, fiber_proc, w); } else scheduler_unhook_from_list(&s->wait_list, w->context, &s->wait_list_lock); fiber_switch_to(&old->handle, &w->context->handle); } static void sched_init(struct scheduler *sched, size_t worker_count) { size_t thread_index = 0; pthread_attr_t attr; assert(sched); assert(worker_count); memset(sched, 0, sizeof(*sched)); sched->worker_count = (int)worker_count; /* init semeaphores */ sem_init(&sched->work_sem, 0); job_queue_init(&sched->queue[0]); job_queue_init(&sched->queue[1]); job_queue_init(&sched->queue[2]); /* init fibers */ {int fiber_index = 0; for (fiber_index; fiber_index < MAX_SCHEDULER_FIBERS; ++fiber_index) { struct scheduler_fiber *fiber = sched->fibers + fiber_index; fiber->stack_size = FIBER_STACK_SIZE; fiber->stack = calloc(fiber->stack_size, 1); }} /* start worker threads */ pthread_attr_init(&attr); for (thread_index; thread_index < worker_count-1; ++thread_index) { cpu_set_t cpus; sched->worker[thread_index].id = (int)thread_index; sched->worker[thread_index].sched = sched; /* bind thread to core */ CPU_ZERO(&cpus); CPU_SET(thread_index, &cpus); pthread_attr_setaffinity_np(&attr, sizeof(cpu_set_t), &cpus); /* start worker thread */ pthread_create(&sched->worker[thread_index].thread, &attr, thread_proc, &sched->worker[thread_index]); pthread_detach(sched->worker[thread_index].thread); } /* initialize main thread as worker thread */ sched->worker[thread_index].sched = sched; sched->worker[thread_index].thread = pthread_self(); sched->worker[thread_index].id = (int)thread_index; /* create fiber for main thread worker */ {struct scheduler_fiber *fiber; fiber = scheduler_get_free_fiber(sched); assert(fiber); getcontext(&fiber->handle.fib); fiber->handle.fib.uc_link = 0; fiber->handle.fib.uc_stack.ss_size = 0; fiber->handle.fib.uc_stack.ss_sp = 0; sched->worker[thread_index].context = fiber;} pthread_attr_destroy(&attr); } /* -------------------------------------------------------------- * * TEST * * --------------------------------------------------------------*/ struct game_state { int data; }; struct test_data { int *data; int from; int to; }; JOB_ENTRY_POINT(test_work) { int i; struct test_data *data = arg; for (i = data->from; i < data->to; ++i) data->data[i] = i; } JOB_ENTRY_POINT(root) { struct game_state *game = arg; job_counter counter = 0; struct job jobs[8]; struct test_data data[8]; int i, n[2*1024]; printf("root\n"); for (i = 0; i < 8; ++i) { data[i].data = n; data[i].from = i * 256; data[i].to = (i+1)*256; jobs[i] = Job(test_work, &data); } scheduler_run(sched, JOB_QUEUE_HIGH, jobs, 4, &counter); printf("run\n"); scheduler_wait_for(sched, &counter, 0); for (i = 0; i < 2*1024; ++i) assert(n[i] == i); printf("done\n"); } int main(int argc, char **argv) { struct game_state app; struct job job = Job(root, &app); job_counter counter; /* start root process */ struct scheduler sched; size_t thread_count = (size_t)sysconf(_SC_NPROCESSORS_ONLN); sched_init(&sched, thread_count); printf("init\n"); scheduler_run(&sched, JOB_QUEUE_HIGH, &job, 1, &counter); printf("run\n"); scheduler_wait_for(&sched, &counter, 0); printf("finished\n"); return 0; }