/* The original include list lost its header names; reconstructed here from
 * the headers this file actually uses. */
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <stdint.h>
#include <assert.h>

#define streq(a, b) (!strcmp((a), (b)))

#ifndef __USE_GNU
#define __USE_GNU
#endif
#include <sched.h>
#include <pthread.h>
#include <unistd.h>
#include <ucontext.h>
#include "xmmintrin.h"

typedef int8_t i8;
typedef int16_t i16;
typedef int32_t i32;
typedef int64_t i64;
typedef uint8_t u8;
typedef uint16_t u16;
typedef uint32_t u32;
typedef uint64_t u64;
typedef long word;

#define MAX_WORK_QUEUE_THREAD 64
#define FIBER_STACK_SIZE (64 * 1024)
#define MAX_SCHEDULER_WORKER 64
#define MAX_SCHEDULER_FIBERS 128

/* ---------------------------------------------------------------
 *
 *                          SPINLOCK
 *
 * --------------------------------------------------------------*/
static void
spinlock_begin(volatile u32 *spinlock)
{
    /* spin until the lock transitions from 0 (free) to 1 (held);
     * the original compared against the wrong values and never
     * actually acquired the lock */
    while (__sync_val_compare_and_swap(spinlock, 0, 1) != 0);
}
static void
spinlock_end(volatile u32 *spinlock)
{
    _mm_sfence();
    *spinlock = 0;
}
/* ---------------------------------------------------------------
 *
 *                          SEMAPHORE
 *
 * --------------------------------------------------------------*/
struct sem {
    pthread_mutex_t guard;
    pthread_cond_t cond;
    int count;
};
static void
sem_init(struct sem *s, int init)
{
    pthread_mutex_init(&s->guard, 0);
    pthread_cond_init(&s->cond, 0);
    s->count = init;
}
static void
sem_free(struct sem *s)
{
    pthread_mutex_destroy(&s->guard);
    pthread_cond_destroy(&s->cond);
}
static void
sem_post(struct sem *s, int delta)
{
    if (delta < 0) return;
    pthread_mutex_lock(&s->guard);
    s->count += delta;
    pthread_cond_broadcast(&s->cond);
    pthread_mutex_unlock(&s->guard);
}
static void
sem_wait(struct sem *s, int delta)
{
    if (delta < 0) return;
    pthread_mutex_lock(&s->guard);
    do {
        if (s->count >= delta) {
            s->count -= delta;
            break;
        }
        pthread_cond_wait(&s->cond, &s->guard);
    } while (1);
    pthread_mutex_unlock(&s->guard);
}
/* ---------------------------------------------------------------
 *
 *                            FIBER
 *
 * --------------------------------------------------------------*/
#undef _FORTIFY_SOURCE
typedef void(*fiber_callback)(void*);
struct sys_fiber {
    ucontext_t fib;
};
static void
fiber_create(struct sys_fiber *fib, void *stack, size_t stack_size,
    fiber_callback callback, void *data)
{
    getcontext(&fib->fib);
    fib->fib.uc_stack.ss_size = stack_size;
    fib->fib.uc_stack.ss_sp = stack;
    fib->fib.uc_link = 0;
    /* note: passing a pointer through makecontext's int varargs is a
     * common but non-portable idiom; it works on x86-64 glibc where
     * every argument occupies a full machine word */
    makecontext(&fib->fib, (void(*)())callback, 1, data);
}
static void
fiber_switch_to(struct sys_fiber *prev, struct sys_fiber *fib)
{
    swapcontext(&prev->fib, &fib->fib);
}
/* ---------------------------------------------------------------
 *
 *                            QUEUE
 *
 * --------------------------------------------------------------*/
/* This is an implementation of a multi-producer, multi-consumer
 * non-blocking concurrent FIFO queue based on the paper by
 * Philippas Tsigas and Yi Zhang:
 * www.cse.chalmers.se/~tsigas/papers/latest-spaa01.pdf */
#define MAX_WORK_QUEUE_JOBS (1024)
#define WORK_QUEUE_MASK (MAX_WORK_QUEUE_JOBS-1)

struct scheduler;
typedef void(*job_callback)(struct scheduler*, void*);
#define JOB_ENTRY_POINT(name) static void name(struct scheduler *sched, void *arg)
#define BASE_ALIGN(x) __attribute__((aligned(x)))

#define QUEUE_EMPTY 0
#define QUEUE_REMOVED 1

struct job {
    void *data;
    job_callback callback;
    volatile u32 *run_count;
};
struct job_queue {
    volatile u32 head;
    struct job *jobs[MAX_WORK_QUEUE_JOBS];
    volatile u32 tail;
};
static void
job_queue_init(struct job_queue *q)
{
    memset(q, 0, sizeof(*q));
    q->jobs[0] = (struct job*)QUEUE_EMPTY;
    q->head = 0;
    q->tail = 1;
}
static int
job_queue_entry_free(struct job *p)
{
    return (((uintptr_t)p == QUEUE_EMPTY) || ((uintptr_t)p == QUEUE_REMOVED));
}
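/* Usage sketch for the fiber primitives above (illustrative only, not part
 * of the scheduler); the names `main_ctx`, `worker_ctx` and `example_fiber`
 * are hypothetical:
 *
 *   static struct sys_fiber main_ctx, worker_ctx;
 *   static void example_fiber(void *arg) {
 *       printf("hello from fiber: %s\n", (const char*)arg);
 *       fiber_switch_to(&worker_ctx, &main_ctx);  // yield back to caller
 *   }
 *   ...
 *   void *stack = calloc(FIBER_STACK_SIZE, 1);
 *   fiber_create(&worker_ctx, stack, FIBER_STACK_SIZE, example_fiber, (void*)"hi");
 *   fiber_switch_to(&main_ctx, &worker_ctx);      // runs example_fiber
 */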
static int
job_queue_push(struct job_queue *q, struct job *job)
{
    while (1) {
        /* read tail */
        u32 te = q->tail;
        u32 ate = te;
        struct job *tt = q->jobs[ate];
        u32 tmp = (ate + 1) & WORK_QUEUE_MASK;

        /* we want to find the actual tail */
        while (!(job_queue_entry_free(tt))) {
            /* check tail's consistency */
            if (te != q->tail) goto retry;
            /* check if queue is full */
            if (tmp == q->head) break;
            tt = q->jobs[tmp];
            ate = tmp;
            tmp = (ate + 1) & WORK_QUEUE_MASK;
        }
        /* check tail's consistency */
        if (te != q->tail) continue;
        /* check if queue is full */
        if (tmp == q->head) {
            ate = (tmp + 1) & WORK_QUEUE_MASK;
            tt = q->jobs[ate];
            if (!(job_queue_entry_free(tt)))
                return 0; /* queue is full */
            /* help pop to update the head */
            __sync_bool_compare_and_swap(&q->head, tmp, ate);
            continue;
        }
        /* tag the pointer so pop can restore the slot as REMOVED */
        if ((uintptr_t)tt == QUEUE_REMOVED)
            job = (struct job*)((uintptr_t)job | 0x01);
        if (te != q->tail) continue;
        if (__sync_bool_compare_and_swap(&q->jobs[ate], tt, job)) {
            /* only update tail every other operation to lower contention */
            if ((tmp & 1) == 0)
                __sync_bool_compare_and_swap(&q->tail, te, tmp);
            return 1;
        }
retry:;
    }
}
static int
job_queue_pop(struct job **job, struct job_queue *q)
{
    while (1) {
        u32 th = q->head;
        u32 tmp = (th + 1) & WORK_QUEUE_MASK;
        struct job *tt = q->jobs[tmp];
        struct job *tnull = 0;

        /* we want to find the actual head */
        while ((job_queue_entry_free(tt))) {
            if (th != q->head) goto retry;
            if (tmp == q->tail) return 0; /* queue is empty */
            tmp = (tmp + 1) & WORK_QUEUE_MASK;
            tt = q->jobs[tmp];
        }
        /* check head's consistency */
        if (th != q->head) continue;
        /* check if queue is empty */
        if (tmp == q->tail) {
            /* help push to update the tail */
            __sync_bool_compare_and_swap(&q->tail, tmp, (tmp+1) & WORK_QUEUE_MASK);
            continue; /* retry */
        }
        /* clear the slot to EMPTY or REMOVED depending on the pointer tag */
        tnull = (((uintptr_t)tt & 0x01) ?
            (struct job*)QUEUE_REMOVED : (struct job*)QUEUE_EMPTY);
        if (th != q->head) continue;
        /* get actual head */
        if (__sync_bool_compare_and_swap(&q->jobs[tmp], tt, tnull)) {
            if ((tmp & 0x1) == 0)
                __sync_bool_compare_and_swap(&q->head, th, tmp);
            *job = (struct job*)((uintptr_t)tt & ~(uintptr_t)1);
            return 1;
        }
retry:;
    }
}
/* ---------------------------------------------------------------
 *
 *                          SCHEDULER
 *
 * --------------------------------------------------------------*/
typedef volatile u32 job_counter;
enum job_queue_ids {
    JOB_QUEUE_LOW,
    JOB_QUEUE_NORMAL,
    JOB_QUEUE_HIGH,
    JOB_QUEUE_COUNT
};
struct scheduler_fiber {
    struct scheduler_fiber *next;
    struct scheduler_fiber *prev;
    void *stack;
    size_t stack_size;
    struct sys_fiber handle;
    struct job job;
    u32 value;
};
struct scheduler_worker {
    int id;
    pthread_t thread;
    struct scheduler_fiber *context;
    struct scheduler *sched;
};
typedef void (*scheduler_profiler_callback_f)(void*, int thread_id);
struct scheduler_profiling {
    void *userdata;
    scheduler_profiler_callback_f thread_start;
    scheduler_profiler_callback_f thread_stop;
    scheduler_profiler_callback_f context_switch;
    scheduler_profiler_callback_f wait_start;
    scheduler_profiler_callback_f wait_stop;
};
struct scheduler {
    struct sem work_sem;
    struct job_queue queue[JOB_QUEUE_COUNT];

    int worker_count;
    struct scheduler_worker worker[MAX_SCHEDULER_WORKER];
    volatile u32 worker_running;
    volatile u32 worker_active;
    struct scheduler_profiling profiler;

    int fiber_count;
    struct scheduler_fiber fibers[MAX_SCHEDULER_FIBERS];
    volatile u32 wait_list_lock;
    struct scheduler_fiber *wait_list;
    volatile u32 free_list_lock;
    struct scheduler_fiber *free_list;
};
static struct job
Job(job_callback callback, void *data)
{
    struct job task;
    task.callback = callback;
    task.data = data;
    task.run_count = 0;
    return task;
}
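/* Usage sketch for the queue (illustrative only): a single-threaded
 * push/pop round trip; `some_callback` and `some_data` are hypothetical.
 * In the scheduler below the same calls are issued concurrently from
 * multiple worker threads without any lock.
 *
 *   struct job_queue q;
 *   struct job item = Job(some_callback, some_data);
 *   struct job *out = 0;
 *   job_queue_init(&q);
 *   if (job_queue_push(&q, &item))  // returns 0 only when the queue is full
 *       ...
 *   if (job_queue_pop(&out, &q))    // returns 0 only when the queue is empty
 *       out->callback(sched, out->data);
 */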
static void
scheduler_hook_into_list(struct scheduler_fiber **list,
    struct scheduler_fiber *element, volatile u32 *lock)
{
    spinlock_begin(lock);
    if (!*list) {
        *list = element;
        element->prev = 0;
        element->next = 0;
    } else {
        element->prev = 0;
        element->next = *list;
        (*list)->prev = element;
        *list = element;
    }
    spinlock_end(lock);
}
static void
scheduler_unhook_from_list(struct scheduler_fiber **list,
    struct scheduler_fiber *element, volatile u32 *lock)
{
    spinlock_begin(lock);
    if (element->next)
        element->next->prev = element->prev;
    if (element->prev)
        element->prev->next = element->next;
    if (*list == element)
        *list = element->next;
    element->next = element->prev = 0;
    spinlock_end(lock);
}
static struct scheduler_fiber*
scheduler_find_fiber_finished_waiting(struct scheduler *s)
{
    struct scheduler_fiber *iter;
    spinlock_begin(&s->wait_list_lock);
    /* read the list head only while holding the lock */
    iter = s->wait_list;
    while (iter) {
        if (*iter->job.run_count == iter->value) break;
        iter = iter->next;
    }
    spinlock_end(&s->wait_list_lock);
    return iter;
}
static struct scheduler_fiber*
scheduler_get_free_fiber(struct scheduler *s)
{
    struct scheduler_fiber *fib = 0;
    spinlock_begin(&s->free_list_lock);
    if (s->fiber_count < MAX_SCHEDULER_FIBERS) {
        fib = &s->fibers[s->fiber_count++];
    } else if (s->free_list) {
        fib = s->free_list;
    }
    spinlock_end(&s->free_list_lock);
    return fib;
}
static void
scheduler_run(struct scheduler *s, enum job_queue_ids q,
    struct job *jobs, u32 count, job_counter *counter)
{
    u32 jobIndex = 0;
    struct job_queue *queue;
    assert(q < JOB_QUEUE_COUNT);
    assert(counter);
    assert(jobs);
    assert(s);

    /* initialize the counter before any job can run and decrement it;
     * setting it after the push loop would race with fast-finishing jobs */
    *counter = count;
    queue = &s->queue[q];
    while (jobIndex < count) {
        jobs[jobIndex].run_count = counter;
        if (job_queue_push(queue, &jobs[jobIndex])) {
            sem_post(&s->work_sem, 1);
            jobIndex++;
        }
    }
}
static void
fiber_proc(void *arg)
{
    struct scheduler_worker *w = (struct scheduler_worker*)arg;
    struct scheduler *s = w->sched;
    __sync_add_and_fetch(&s->worker_active, 1);
    while (1) {
        /* check if any fiber is done waiting */
        struct scheduler_fiber *fiber = scheduler_find_fiber_finished_waiting(s);
        if (fiber) {
            /* put old worker context into freelist, preserving its stack
             * so the fiber can later be reused by fiber_create() */
            struct scheduler_fiber *old = w->context;
            void *stack = old->stack;
            size_t stack_size = old->stack_size;
            memset(old, 0, sizeof(*old));
            old->stack = stack;
            old->stack_size = stack_size;
            scheduler_unhook_from_list(&s->wait_list, fiber, &s->wait_list_lock);
            scheduler_hook_into_list(&s->free_list, old, &s->free_list_lock);

            /* set previously waiting fiber as worker context */
            w->context = fiber;
            if (s->profiler.context_switch)
                s->profiler.context_switch(s->profiler.userdata, w->id);
            fiber_switch_to(&old->handle, &w->context->handle);
        }
        /* check if there are any new jobs inside the work queues */
        {struct job *job = 0;
        if (!(job_queue_pop(&job, &s->queue[JOB_QUEUE_HIGH])) &&
            !(job_queue_pop(&job, &s->queue[JOB_QUEUE_NORMAL])) &&
            !(job_queue_pop(&job, &s->queue[JOB_QUEUE_LOW]))) {
            /* currently no job so wait */
            __sync_sub_and_fetch(&s->worker_active, 1);
            if (s->profiler.wait_start)
                s->profiler.wait_start(s->profiler.userdata, w->id);
            sem_wait(&s->work_sem, 1);
            if (s->profiler.wait_stop)
                s->profiler.wait_stop(s->profiler.userdata, w->id);
            __sync_add_and_fetch(&s->worker_active, 1);
        } else {
            /* run dequeued job */
            w->context->job = *job;
            assert(job->callback);
            if (s->profiler.thread_start)
                s->profiler.thread_start(s->profiler.userdata, w->id);
            job->callback(s, job->data);
            if (s->profiler.thread_stop)
                s->profiler.thread_stop(s->profiler.userdata, w->id);
            __sync_sub_and_fetch(job->run_count, 1);
        }}
    }
}
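/* Usage sketch for the profiling hooks fired by fiber_proc() above
 * (illustrative only): every field of struct scheduler_profiling is
 * optional and may stay NULL; `on_context_switch` is hypothetical.
 *
 *   static void on_context_switch(void *userdata, int thread_id) {
 *       fprintf(stderr, "context switch on worker %d\n", thread_id);
 *   }
 *   ...
 *   sched.profiler.userdata = 0;
 *   sched.profiler.context_switch = on_context_switch;
 */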
static void*
thread_proc(void *arg)
{
    struct scheduler_worker *w = (struct scheduler_worker*)arg;
    struct scheduler_fiber *fiber;
    struct scheduler *s = w->sched;
    __sync_add_and_fetch(&s->worker_running, 1);

    /* create dummy fiber representing this thread's initial context */
    fiber = scheduler_get_free_fiber(s);
    assert(fiber);
    getcontext(&fiber->handle.fib);
    fiber->handle.fib.uc_link = 0;
    fiber->handle.fib.uc_stack.ss_size = 0;
    fiber->handle.fib.uc_stack.ss_sp = 0;
    w->context = fiber;
    fiber_proc(w);
    return 0;
}
static void
scheduler_wait_for(struct scheduler *s, job_counter *counter, u32 value)
{
    struct scheduler_worker *w = 0;
    struct scheduler_fiber *old;
    assert(s);
    assert(counter);

    /* find this thread's own worker state */
    {int worker_index = 0;
    pthread_t self = pthread_self();
    for (; worker_index < s->worker_count; ++worker_index) {
        if (s->worker[worker_index].thread == self)
            w = &s->worker[worker_index];
    }}
    assert(w);

    /* insert current context into waiting list */
    old = w->context;
    w->context->value = value;
    w->context->job.run_count = counter;
    scheduler_hook_into_list(&s->wait_list, old, &s->wait_list_lock);

    /* either continue a fiber that finished waiting or start a new one */
    w->context = scheduler_find_fiber_finished_waiting(s);
    if (!w->context) {
        w->context = scheduler_get_free_fiber(s);
        assert(w->context);
        scheduler_unhook_from_list(&s->free_list, w->context, &s->free_list_lock);
        fiber_create(&w->context->handle, w->context->stack,
            w->context->stack_size, fiber_proc, w);
    } else scheduler_unhook_from_list(&s->wait_list, w->context, &s->wait_list_lock);
    if (s->profiler.context_switch)
        s->profiler.context_switch(s->profiler.userdata, w->id);
    fiber_switch_to(&old->handle, &w->context->handle);
}
static void
sched_init(struct scheduler *sched, size_t worker_count)
{
    size_t thread_index = 0;
    pthread_attr_t attr;
    assert(sched);
    assert(worker_count);
    memset(sched, 0, sizeof(*sched));
    sched->worker_count = (int)worker_count;

    /* init semaphore and job queues */
    sem_init(&sched->work_sem, 0);
    job_queue_init(&sched->queue[0]);
    job_queue_init(&sched->queue[1]);
    job_queue_init(&sched->queue[2]);

    /* init fibers */
    {int fiber_index = 0;
    for (; fiber_index < MAX_SCHEDULER_FIBERS; ++fiber_index) {
        struct scheduler_fiber *fiber = sched->fibers + fiber_index;
        fiber->stack_size = FIBER_STACK_SIZE;
        fiber->stack = calloc(fiber->stack_size, 1);
    }}

    /* start worker threads */
    pthread_attr_init(&attr);
    for (; thread_index < worker_count-1; ++thread_index) {
        cpu_set_t cpus;
        sched->worker[thread_index].id = (int)thread_index;
        sched->worker[thread_index].sched = sched;

        /* bind thread to core */
        CPU_ZERO(&cpus);
        CPU_SET(thread_index, &cpus);
        pthread_attr_setaffinity_np(&attr, sizeof(cpu_set_t), &cpus);

        /* start worker thread */
        pthread_create(&sched->worker[thread_index].thread, &attr,
            thread_proc, &sched->worker[thread_index]);
        pthread_detach(sched->worker[thread_index].thread);
    }
    /* initialize main thread as worker thread */
    {cpu_set_t cpus;
    CPU_ZERO(&cpus);
    CPU_SET(thread_index, &cpus);
    sched->worker[thread_index].sched = sched;
    sched->worker[thread_index].thread = pthread_self();
    sched->worker[thread_index].id = (int)thread_index;
    pthread_setaffinity_np(sched->worker[thread_index].thread, sizeof(cpu_set_t), &cpus);}

    /* create fiber for main thread worker */
    {struct scheduler_fiber *fiber;
    fiber = scheduler_get_free_fiber(sched);
    assert(fiber);
    getcontext(&fiber->handle.fib);
    fiber->handle.fib.uc_link = 0;
    fiber->handle.fib.uc_stack.ss_size = 0;
    fiber->handle.fib.uc_stack.ss_sp = 0;
    sched->worker[thread_index].context = fiber;}
    pthread_attr_destroy(&attr);
}
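/* Usage sketch for the scheduler API (illustrative only; the TEST section
 * below is the full example). A batch is submitted with scheduler_run()
 * and joined with scheduler_wait_for(), which suspends the calling fiber
 * until the counter drops to the given value; `fn_a`, `fn_b`, `a`, `b`
 * are hypothetical.
 *
 *   job_counter counter;
 *   struct job batch[2] = { Job(fn_a, &a), Job(fn_b, &b) };
 *   scheduler_run(&sched, JOB_QUEUE_NORMAL, batch, 2, &counter);
 *   scheduler_wait_for(&sched, &counter, 0);  // 0 = all jobs finished
 */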
static void
sched_free(struct scheduler *sched)
{
    /* free fiber stacks */
    {int fiber_index = 0;
    for (; fiber_index < MAX_SCHEDULER_FIBERS; ++fiber_index) {
        struct scheduler_fiber *fiber = sched->fibers + fiber_index;
        free(fiber->stack);
    }}
    sem_free(&sched->work_sem);
}
/* ---------------------------------------------------------------
 *
 *                            TEST
 *
 * --------------------------------------------------------------*/
struct game_state {
    int data;
};
struct test_data {
    int *data;
    int from;
    int to;
};
JOB_ENTRY_POINT(test_work)
{
    int i;
    struct test_data *data = arg;
    for (i = data->from; i < data->to; ++i)
        data->data[i] = i;
    sleep(1);
}
JOB_ENTRY_POINT(root)
{
    struct game_state *game = arg;
    job_counter counter = 0;
    struct job jobs[8];
    struct test_data data[8];
    int i, n[2*1024];
    (void)game;

    printf("root\n");
    for (i = 0; i < 8; ++i) {
        data[i].data = n;
        data[i].from = i * 256;
        data[i].to = (i+1) * 256;
        /* each job gets its own slice descriptor; the original passed
         * &data, which pointed every job at the first slice */
        jobs[i] = Job(test_work, &data[i]);
    }
    scheduler_run(sched, JOB_QUEUE_HIGH, jobs, 8, &counter);
    printf("run\n");
    scheduler_wait_for(sched, &counter, 0);
    printf("done\n");
}
int
main(int argc, char **argv)
{
    struct game_state app;
    struct job job = Job(root, &app);
    job_counter counter;

    /* start root process */
    struct scheduler sched;
    size_t thread_count = (size_t)sysconf(_SC_NPROCESSORS_ONLN);
    (void)argc; (void)argv;
    sched_init(&sched, thread_count);
    printf("init\n");
    scheduler_run(&sched, JOB_QUEUE_HIGH, &job, 1, &counter);
    printf("run\n");
    scheduler_wait_for(&sched, &counter, 0);
    printf("finished\n");
    return 0;
}
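/* Build note (an assumption, not from the original source): on Linux/x86-64
 * with glibc this should compile with something like
 *     cc scheduler.c -o scheduler -pthread
 * The ucontext-based fibers rely on glibc behavior (see fiber_create) and
 * are deprecated or unavailable on some platforms, e.g. macOS. */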