Skip to content

Instantly share code, notes, and snippets.

@aerosayan
Forked from vurtun/fibers.c
Created May 27, 2022 01:21
Show Gist options
  • Select an option

  • Save aerosayan/fa00ee2dfc54c56da2c00539cb092e12 to your computer and use it in GitHub Desktop.

Select an option

Save aerosayan/fa00ee2dfc54c56da2c00539cb092e12 to your computer and use it in GitHub Desktop.

Revisions

  1. @vurtun vurtun revised this gist Sep 19, 2019. 1 changed file with 1 addition and 1 deletion.
    2 changes: 1 addition & 1 deletion fibers.c
    Original file line number Diff line number Diff line change
    @@ -130,7 +130,7 @@ bit_clear(u64 *bitset, unsigned int start, int len)
    * --------------------------------------------------------------*/
    static void
    spinlock_begin(volatile u32 *spinlock)
    {while (__sync_val_compare_and_swap(spinlock, 1, 0) != 0) _mm_pause();}
    {while (__sync_val_compare_and_swap(spinlock, 0, 1) != 0) _mm_pause();}

    static void
    spinlock_end(volatile u32 *spinlock)
  2. @vurtun vurtun revised this gist May 16, 2017. 1 changed file with 1 addition and 1 deletion.
    2 changes: 1 addition & 1 deletion fibers.c
    Original file line number Diff line number Diff line change
    @@ -53,7 +53,7 @@ static u64
    bit_find_first_set(const u64 *addr, u64 size)
    {
    u64 idx;
    #define ffs(x) (u64)__builtin_ffsl((long int)addr[idx])
    #define ffs(x) (u64)__builtin_ffsl((long int)(x))
    for (idx = 0; idx * BITS_PER_LONG < size; idx++) {
    if (addr[idx]) {
    u64 first_set = ffs(addr[idx]) - 1;
  3. @vurtun vurtun revised this gist Mar 27, 2017. 1 changed file with 84 additions and 0 deletions.
    84 changes: 84 additions & 0 deletions test.c
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,84 @@
    /* --------------------------------------------------------------
    *
    * TEST
    *
    * --------------------------------------------------------------*/
    /* Top-level application state: the raw memory pool plus one
     * tagged bump allocator per subsystem. */
    struct game_state {
    int data;
    struct memory memory;            /* backing mmap'd block pool */
    struct allocator game_alloc;     /* MEMORY_TAG_GAME allocations */
    struct allocator gpu_alloc;      /* MEMORY_TAG_GPU allocations */
    struct allocator render_alloc;   /* MEMORY_TAG_RENDER allocations */
    };

    /* Per-job work descriptor: fill data[from..to) with its index. */
    struct test_data {
    struct allocator *alloc;  /* allocator this job allocates from */
    int *data;                /* shared output array (jobs write disjoint ranges) */
    int from;                 /* first index to write (inclusive) */
    int to;                   /* one past the last index to write */
    };

    /* Worker job: writes its assigned slice of the shared array, then sleeps
     * to force the scheduler to keep other fibers busy while this one blocks.
     * NOTE(review): `mem` is allocated but never used — presumably only to
     * exercise the per-worker allocator path; confirm intent. */
    JOB_ENTRY_POINT(test_work)
    {
    int i;
    void *mem;
    struct test_data *data = arg;
    /* allocate from the worker-local block of this job's allocator */
    mem = alloc(data->alloc, scheduler_self_id(sched), 1024);
    for (i = data->from; i < data->to; ++i)
    data->data[i] = i;

    printf("sleep begin\n");
    sleep(1);
    printf("sleep end\n");
    }

    /* Root job: fans out 8 test_work jobs covering n[0..2047] in disjoint
     * 256-element slices, waits for completion, then returns all tagged
     * memory to the pool. */
    JOB_ENTRY_POINT(root)
    {
        struct game_state *game = arg;
        job_counter counter = 0;
        struct job jobs[8];
        struct test_data data[8];
        int i, n[2*1024];

        printf("root\n");
        for (i = 0; i < 8; ++i) {
            /* alternate allocators so both tags are exercised */
            data[i].alloc = (i&1) ? &game->game_alloc: &game->render_alloc;
            data[i].data = n;
            data[i].from = i * 256;
            data[i].to = (i+1)*256;
            /* BUG FIX: original passed `&data` (address of the whole array,
             * which decays to element 0), so every job processed slice 0 and
             * slices 1..7 were never written. Each job needs its own entry. */
            jobs[i] = Job(test_work, &data[i]);
        }

        scheduler_run(sched, JOB_QUEUE_HIGH, jobs, 8, &counter);
        printf("run\n");
        /* block this fiber until all 8 jobs have decremented the counter */
        scheduler_wait_for(sched, &counter, 0);
        mem_free(&game->memory, MEMORY_TAG_GAME);
        mem_free(&game->memory, MEMORY_TAG_RENDER);
        mem_free(&game->memory, MEMORY_TAG_GPU);
        printf("done\n");
    }

    /* Entry point: set up the memory pool and one allocator per tag, then
     * boot the scheduler with the `root` job and wait for it to finish.
     * NOTE(review): neither sched_free nor mem_clear is called before
     * returning — fine for a test program, but worth confirming. */
    int main(int argc, char **argv)
    {
    /* setup app memory and allocator */
    struct game_state app;
    /* one worker per online core */
    size_t thread_count = (size_t)sysconf(_SC_NPROCESSORS_ONLN);
    memset(&app, 0, sizeof(app));
    mem_init(&app.memory);
    /* "alloctor_init" is the project spelling (sic) — see ALLOCATOR section */
    alloctor_init(&app.game_alloc, thread_count, &app.memory, MEMORY_TAG_GAME);
    alloctor_init(&app.render_alloc, thread_count, &app.memory, MEMORY_TAG_RENDER);
    alloctor_init(&app.gpu_alloc, thread_count, &app.memory, MEMORY_TAG_GPU);

    /* start root process */
    {struct scheduler sched;
    struct job job = Job(root, &app);
    job_counter counter;

    sched_init(&sched, thread_count);
    printf("init\n");
    scheduler_run(&sched, JOB_QUEUE_HIGH, &job, 1, &counter);
    printf("run\n");
    /* main thread participates as a worker until the root job completes */
    scheduler_wait_for(&sched, &counter, 0);
    printf("finished\n");}
    return 0;
    }
  4. @vurtun vurtun revised this gist Mar 27, 2017. 1 changed file with 0 additions and 84 deletions.
    84 changes: 0 additions & 84 deletions fibers.c
    Original file line number Diff line number Diff line change
    @@ -836,87 +836,3 @@ sched_free(struct scheduler *sched)
    sem_free(&sched->work_sem);
    }

    /* --------------------------------------------------------------
    *
    * TEST
    *
    * --------------------------------------------------------------*/
    struct game_state {
    int data;
    struct memory memory;
    struct allocator game_alloc;
    struct allocator gpu_alloc;
    struct allocator render_alloc;
    };

    struct test_data {
    struct allocator *alloc;
    int *data;
    int from;
    int to;
    };

    JOB_ENTRY_POINT(test_work)
    {
    int i;
    void *mem;
    struct test_data *data = arg;
    mem = alloc(data->alloc, scheduler_self_id(sched), 1024);
    for (i = data->from; i < data->to; ++i)
    data->data[i] = i;

    printf("sleep begin\n");
    sleep(1);
    printf("sleep end\n");
    }

    JOB_ENTRY_POINT(root)
    {
    struct game_state *game = arg;
    job_counter counter = 0;
    struct job jobs[8];
    struct test_data data[8];
    int i, n[2*1024];

    printf("root\n");
    for (i = 0; i < 8; ++i) {
    data[i].alloc = (i&1) ? &game->game_alloc: &game->render_alloc;
    data[i].data = n;
    data[i].from = i * 256;
    data[i].to = (i+1)*256;
    jobs[i] = Job(test_work, &data);
    }

    scheduler_run(sched, JOB_QUEUE_HIGH, jobs, 8, &counter);
    printf("run\n");
    scheduler_wait_for(sched, &counter, 0);
    mem_free(&game->memory, MEMORY_TAG_GAME);
    mem_free(&game->memory, MEMORY_TAG_RENDER);
    mem_free(&game->memory, MEMORY_TAG_GPU);
    printf("done\n");
    }

    int main(int argc, char **argv)
    {
    /* setup app memory and allocator */
    struct game_state app;
    size_t thread_count = (size_t)sysconf(_SC_NPROCESSORS_ONLN);
    memset(&app, 0, sizeof(app));
    mem_init(&app.memory);
    alloctor_init(&app.game_alloc, thread_count, &app.memory, MEMORY_TAG_GAME);
    alloctor_init(&app.render_alloc, thread_count, &app.memory, MEMORY_TAG_RENDER);
    alloctor_init(&app.gpu_alloc, thread_count, &app.memory, MEMORY_TAG_GPU);

    /* start root process */
    {struct scheduler sched;
    struct job job = Job(root, &app);
    job_counter counter;

    sched_init(&sched, thread_count);
    printf("init\n");
    scheduler_run(&sched, JOB_QUEUE_HIGH, &job, 1, &counter);
    printf("run\n");
    scheduler_wait_for(&sched, &counter, 0);
    printf("finished\n");}
    return 0;
    }
  5. @vurtun vurtun revised this gist Mar 27, 2017. 1 changed file with 1 addition and 1 deletion.
    2 changes: 1 addition & 1 deletion fibers.c
    Original file line number Diff line number Diff line change
    @@ -130,7 +130,7 @@ bit_clear(u64 *bitset, unsigned int start, int len)
    * --------------------------------------------------------------*/
    static void
    spinlock_begin(volatile u32 *spinlock)
    {while (__sync_val_compare_and_swap(spinlock, 1, 0) != 0);}
    {while (__sync_val_compare_and_swap(spinlock, 1, 0) != 0) _mm_pause();}

    static void
    spinlock_end(volatile u32 *spinlock)
  6. @vurtun vurtun revised this gist Jun 21, 2016. 1 changed file with 0 additions and 1 deletion.
    1 change: 0 additions & 1 deletion fibers.c
    Original file line number Diff line number Diff line change
    @@ -508,7 +508,6 @@ struct scheduler_fiber {
    struct scheduler_worker {
    int id;
    pthread_t thread;
    struct allocator allocator;
    struct scheduler_fiber *context;
    struct scheduler *sched;
    };
  7. @vurtun vurtun revised this gist Jun 19, 2016. 1 changed file with 0 additions and 2 deletions.
    2 changes: 0 additions & 2 deletions fibers.c
    Original file line number Diff line number Diff line change
    @@ -197,8 +197,6 @@ alloc_block(struct memory *mem, enum memory_tag tag)
    spinlock_begin(&mem->lock);
    index = (unsigned int)bit_find_first_set(mem->free, MAX_MEMORY_BLOCKS);
    assert(index < MAX_MEMORY_BLOCKS);
    if (index >= MAX_MEMORY_BLOCKS)
    return 0;

    {char *block;
    bit_clear(mem->free, index, 1);
  8. @vurtun vurtun revised this gist Jun 19, 2016. 1 changed file with 15 additions and 6 deletions.
    21 changes: 15 additions & 6 deletions fibers.c
    Original file line number Diff line number Diff line change
    @@ -711,7 +711,7 @@ thread_proc(void *arg)
    }

    static int
    scheduler_self_index(struct scheduler *s)
    scheduler_self_id(struct scheduler *s)
    {
    int worker_index = 0;
    pthread_t self = pthread_self();
    @@ -725,7 +725,7 @@ scheduler_self_index(struct scheduler *s)
    static struct scheduler_worker*
    scheduler_self(struct scheduler *s)
    {
    int worker_index = scheduler_self_index(s);
    int worker_index = scheduler_self_id(s);
    if (worker_index < 0) return 0;
    return &s->worker[worker_index];
    }
    @@ -847,10 +847,13 @@ sched_free(struct scheduler *sched)
    struct game_state {
    int data;
    struct memory memory;
    struct allocator alloc;
    struct allocator game_alloc;
    struct allocator gpu_alloc;
    struct allocator render_alloc;
    };

    struct test_data {
    struct allocator *alloc;
    int *data;
    int from;
    int to;
    @@ -859,7 +862,9 @@ struct test_data {
    JOB_ENTRY_POINT(test_work)
    {
    int i;
    void *mem;
    struct test_data *data = arg;
    mem = alloc(data->alloc, scheduler_self_id(sched), 1024);
    for (i = data->from; i < data->to; ++i)
    data->data[i] = i;

    @@ -878,6 +883,7 @@ JOB_ENTRY_POINT(root)

    printf("root\n");
    for (i = 0; i < 8; ++i) {
    data[i].alloc = (i&1) ? &game->game_alloc: &game->render_alloc;
    data[i].data = n;
    data[i].from = i * 256;
    data[i].to = (i+1)*256;
    @@ -887,6 +893,9 @@ JOB_ENTRY_POINT(root)
    scheduler_run(sched, JOB_QUEUE_HIGH, jobs, 8, &counter);
    printf("run\n");
    scheduler_wait_for(sched, &counter, 0);
    mem_free(&game->memory, MEMORY_TAG_GAME);
    mem_free(&game->memory, MEMORY_TAG_RENDER);
    mem_free(&game->memory, MEMORY_TAG_GPU);
    printf("done\n");
    }

    @@ -897,9 +906,9 @@ int main(int argc, char **argv)
    size_t thread_count = (size_t)sysconf(_SC_NPROCESSORS_ONLN);
    memset(&app, 0, sizeof(app));
    mem_init(&app.memory);
    alloctor_init(&app.alloc, thread_count, &app.memory, MEMORY_TAG_GAME);
    alloctor_init(&app.alloc, thread_count, &app.memory, MEMORY_TAG_RENDER);
    alloctor_init(&app.alloc, thread_count, &app.memory, MEMORY_TAG_GPU);
    alloctor_init(&app.game_alloc, thread_count, &app.memory, MEMORY_TAG_GAME);
    alloctor_init(&app.render_alloc, thread_count, &app.memory, MEMORY_TAG_RENDER);
    alloctor_init(&app.gpu_alloc, thread_count, &app.memory, MEMORY_TAG_GPU);

    /* start root process */
    {struct scheduler sched;
  9. @vurtun vurtun revised this gist Jun 19, 2016. 1 changed file with 1 addition and 1 deletion.
    2 changes: 1 addition & 1 deletion fibers.c
    Original file line number Diff line number Diff line change
    @@ -194,12 +194,12 @@ static void*
    alloc_block(struct memory *mem, enum memory_tag tag)
    {
    unsigned int index;
    spinlock_begin(&mem->lock);
    index = (unsigned int)bit_find_first_set(mem->free, MAX_MEMORY_BLOCKS);
    assert(index < MAX_MEMORY_BLOCKS);
    if (index >= MAX_MEMORY_BLOCKS)
    return 0;

    spinlock_begin(&mem->lock);
    {char *block;
    bit_clear(mem->free, index, 1);
    bit_set(mem->tags[tag], index, 1);
  10. @vurtun vurtun revised this gist Jun 19, 2016. 1 changed file with 4 additions and 4 deletions.
    8 changes: 4 additions & 4 deletions fibers.c
    Original file line number Diff line number Diff line change
    @@ -191,7 +191,7 @@ mem_free(struct memory *mem, enum memory_tag tag)
    }

    static void*
    block_alloc(struct memory *mem, enum memory_tag tag)
    alloc_block(struct memory *mem, enum memory_tag tag)
    {
    unsigned int index;
    index = (unsigned int)bit_find_first_set(mem->free, MAX_MEMORY_BLOCKS);
    @@ -236,7 +236,7 @@ alloctor_init(struct allocator *a, size_t worker_count,
    a->memory = memory;
    a->worker_count = worker_count;
    for(;i < a->worker_count; ++i) {
    a->blocks[i].data = block_alloc(a->memory, tag);
    a->blocks[i].data = alloc_block(a->memory, tag);
    a->blocks[i].capacity = MEMORY_BLOCK_SIZE;
    a->blocks[i].size = 0;
    }
    @@ -248,7 +248,7 @@ allocator_clear(struct allocator *a)
    unsigned int i = 0;
    for(;i < a->worker_count; ++i) {
    a->blocks[i].size = 0;
    a->blocks[i].data = block_alloc(a->memory, a->tag);
    a->blocks[i].data = alloc_block(a->memory, a->tag);
    a->blocks[i].capacity = MEMORY_BLOCK_SIZE;
    }
    }
    @@ -260,7 +260,7 @@ alloc(struct allocator *a, int thread_index, size_t size)
    assert(size < MEMORY_BLOCK_SIZE);
    if ((a->blocks[thread_index].size + size) > a->blocks[thread_index].capacity) {
    /* allocate new worker memory block */
    a->blocks[thread_index].data = block_alloc(a->memory, a->tag);
    a->blocks[thread_index].data = alloc_block(a->memory, a->tag);
    assert(a->blocks[thread_index].data);
    a->blocks[thread_index].size = 0;
    }
  11. @vurtun vurtun revised this gist Jun 19, 2016. 1 changed file with 2 additions and 2 deletions.
    4 changes: 2 additions & 2 deletions fibers.c
    Original file line number Diff line number Diff line change
    @@ -50,7 +50,7 @@ typedef uint64_t u64;
    #define max(a,b) (((a) > (b)) ? (a): (b))

    static u64
    bit_find_first_bit(const u64 *addr, u64 size)
    bit_find_first_set(const u64 *addr, u64 size)
    {
    u64 idx;
    #define ffs(x) (u64)__builtin_ffsl((long int)addr[idx])
    @@ -194,7 +194,7 @@ static void*
    block_alloc(struct memory *mem, enum memory_tag tag)
    {
    unsigned int index;
    index = (unsigned int)bit_find_first_bit(mem->free, MAX_MEMORY_BLOCKS);
    index = (unsigned int)bit_find_first_set(mem->free, MAX_MEMORY_BLOCKS);
    assert(index < MAX_MEMORY_BLOCKS);
    if (index >= MAX_MEMORY_BLOCKS)
    return 0;
  12. @vurtun vurtun revised this gist Jun 19, 2016. 1 changed file with 271 additions and 17 deletions.
    288 changes: 271 additions & 17 deletions fibers.c
    Original file line number Diff line number Diff line change
    @@ -1,4 +1,5 @@
    #include <stdio.h>
    #include <errno.h>
    #include <stdlib.h>
    #include <stdint.h>
    #include <assert.h>
    @@ -15,6 +16,8 @@
    #include "xmmintrin.h"
    #include <ucontext.h>

    #include <sys/mman.h>

    typedef int8_t i8;
    typedef int16_t i16;
    typedef int32_t i32;
    @@ -23,13 +26,103 @@ typedef uint8_t u8;
    typedef uint16_t u16;
    typedef uint32_t u32;
    typedef uint64_t u64;
    typedef long word;

    #define MAX_WORK_QUEUE_THREAD 64
    #define FIBER_STACK_SIZE (64 * 1024)
    #define MAX_SCHEDULER_WORKER 64
    #define MAX_SCHEDULER_FIBERS 128

    /* --------------------------------------------------------------
    *
    * BITSET
    *
    * --------------------------------------------------------------*/
    /* Bitmap helpers (Linux-kernel style, 64-bit words). */
    #define BITS_PER_BYTE 8
    #define BITS_PER_LONG 64
    #define BIT_WORD(nr) ((nr) / BITS_PER_LONG)
    #define DIV_ROUND_UP(n,d) (((n) + (d) - 1) / (d))
    #define BITS_TO_LONGS(nr) DIV_ROUND_UP(nr, BITS_PER_BYTE * sizeof(u64))
    /* BUG FIX: mask covering bits [start%64 .. 63] of the first word.
     * The original shifted by (start) % (BITS_PER_LONG-1) == start % 63,
     * which yields the wrong mask once start is a multiple of 63 (e.g.
     * start==63 gave ~0UL instead of the top bit). The kernel form uses
     * start & (BITS_PER_LONG-1), i.e. start mod 64. */
    #define BITMAP_FIRST_WORD_MASK(start) (~0UL << ((start) & (BITS_PER_LONG-1)))
    #define BITMAP_LAST_WORD_MASK(nbits) (((nbits) % BITS_PER_LONG) ? (1UL << ((nbits) % BITS_PER_LONG))-1: ~0UL)
    #define DECLARE_BITMAP(name, bits) u64 name[BITS_TO_LONGS(bits)]

    #define min(a,b) (((a) < (b)) ? (a): (b))
    #define max(a,b) (((a) > (b)) ? (a): (b))

    /* Return the index of the first set bit in the first `size` bits of
     * `addr`, or `size` if none is set. */
    static u64
    bit_find_first_bit(const u64 *addr, u64 size)
    {
        u64 idx;
        /* BUG FIX: the original macro expanded to addr[idx] regardless of its
         * argument (it only worked by accident because it was always called
         * with addr[idx]); make it use its parameter, as the later gist
         * revision does. __builtin_ffsl is 1-based, 0 means "no bit set". */
    #define ffs(x) (u64)__builtin_ffsl((long int)(x))
        for (idx = 0; idx * BITS_PER_LONG < size; idx++) {
            if (addr[idx]) {
                u64 first_set = ffs(addr[idx]) - 1; /* convert to 0-based */
                return min(idx * BITS_PER_LONG + first_set, size);
            }
        }
    #undef ffs
        return size;
    }

    /* Set the first `nbits` bits of `dst` to 1. */
    static void
    bit_fill(u64 *dst, unsigned int nbits)
    {
        unsigned int nlongs = (unsigned int)BITS_TO_LONGS(nbits);
        unsigned int len = (unsigned int)((nlongs - 1) * sizeof(u64));
        memset(dst, 0xff, len);
        /* BUG FIX: the original stopped after nlongs-1 words, leaving the
         * final word (the last <=64 bits) all zero — e.g. with 512 bits the
         * top 64 blocks were never marked free. Fill the last word through
         * the partial-word mask (mask is ~0UL when nbits is a multiple of
         * BITS_PER_LONG). */
        dst[nlongs - 1] = BITMAP_LAST_WORD_MASK(nbits);
    }

    /* dst = bitmap1 | bitmap2, computed one 64-bit word at a time over the
     * ceil(nbits/64) words that cover `nbits` bits. dst may alias an input. */
    static void
    bit_or(u64 *dst, const u64 *bitmap1, const u64 *bitmap2,
    unsigned int nbits)
    {
        const unsigned int nwords = (unsigned int)BITS_TO_LONGS(nbits);
        unsigned int w;
        for (w = 0; w < nwords; ++w)
            dst[w] = bitmap1[w] | bitmap2[w];
    }

    /* Set `len` consecutive bits starting at bit `start`.
     * Kernel-style: handle the (possibly partial) first word, then whole
     * words, then the (possibly partial) last word via masks. */
    static void
    bit_set(u64 *bitset, unsigned int start, int len)
    {
    u64 *p = bitset + BIT_WORD(start);                 /* word holding `start` */
    const unsigned int size = start + (unsigned int)len; /* one past last bit */
    int bits_to_set = (int)(BITS_PER_LONG - (start % BITS_PER_LONG));
    u64 mask_to_set = BITMAP_FIRST_WORD_MASK(start);   /* bits >= start%64 */

    /* full coverage of the current word: apply mask, move to next word */
    while (len - bits_to_set >= 0) {
    *p |= mask_to_set;
    len -= bits_to_set;
    bits_to_set = BITS_PER_LONG;
    mask_to_set = ~0UL;
    p++;
    }
    /* trailing partial word: clip the mask at bit `size` */
    if (len) {
    mask_to_set &= BITMAP_LAST_WORD_MASK(size);
    *p |= mask_to_set;
    }
    }

    /* Clear `len` consecutive bits starting at bit `start`.
     * Mirror image of bit_set: same first/whole/last-word walk, but the
     * masks are applied inverted (&= ~mask). */
    static void
    bit_clear(u64 *bitset, unsigned int start, int len)
    {
    u64 *p = bitset + BIT_WORD(start);                   /* word holding `start` */
    const unsigned int size = start + (unsigned int)len;  /* one past last bit */
    int bits_to_clear = (int)(BITS_PER_LONG - (start % BITS_PER_LONG));
    unsigned int long mask_to_clear = BITMAP_FIRST_WORD_MASK(start);

    /* clear whole (or first partial) words while the range covers them */
    while (len - bits_to_clear >= 0) {
    *p &= ~mask_to_clear;
    len -= bits_to_clear;
    bits_to_clear = BITS_PER_LONG;
    mask_to_clear = ~0UL;
    p++;
    }
    /* trailing partial word: clip at bit `size` before clearing */
    if (len) {
    mask_to_clear &= BITMAP_LAST_WORD_MASK(size);
    *p &= ~mask_to_clear;
    }
    }

    /* --------------------------------------------------------------
    *
    * SPINLOCK
    @@ -43,6 +136,140 @@ static void
    spinlock_end(volatile u32 *spinlock)
    {_mm_sfence(); *spinlock = 0;}

    /* --------------------------------------------------------------
    *
    * MEMORY
    *
    * --------------------------------------------------------------*/
    #define MEMORY_BLOCK_SIZE (2*1024*1024) /* 2MB page size */
    #define MAX_MEMORY_BLOCKS 512 /* 1GB memory */
    typedef DECLARE_BITMAP(memory_bitmap, MAX_MEMORY_BLOCKS);

    /* Lifetime tags: all blocks of a tag are released together (mem_free). */
    enum memory_tag {
    MEMORY_TAG_GAME,
    MEMORY_TAG_RENDER,
    MEMORY_TAG_GPU,
    MEMORY_TAG_COUNT
    };

    /* Global block pool: one mmap'd slab carved into fixed-size blocks.
     * `free` tracks unallocated blocks; tags[t] tracks blocks owned by tag t.
     * `lock` guards both bitmaps (see spinlock_begin/end). */
    struct memory {
    volatile u32 lock;
    memory_bitmap free;
    memory_bitmap tags[MEMORY_TAG_COUNT];
    void *data;   /* base of the mapping */
    size_t size;  /* total mapping size in bytes */
    };

    /* Map the 1GB backing slab and mark every block free.
     * Tries 2MB huge pages first; MAP_HUGETLB frequently fails when the
     * system has no hugepage pool configured, so fall back to normal pages
     * instead of dying (the original asserted straight away). */
    static void
    mem_init(struct memory *mem)
    {
        memset(mem, 0, sizeof(*mem));
        mem->size = (size_t)MEMORY_BLOCK_SIZE * (size_t)MAX_MEMORY_BLOCKS;
        mem->data = mmap(0, mem->size, PROT_READ|PROT_WRITE,
            MAP_PRIVATE|MAP_HUGETLB|MAP_ANONYMOUS, -1, 0);
        if (mem->data == MAP_FAILED) {
            fprintf(stderr, "huge-page mmap failed (%s); retrying with normal pages\n",
                strerror(errno));
            mem->data = mmap(0, mem->size, PROT_READ|PROT_WRITE,
                MAP_PRIVATE|MAP_ANONYMOUS, -1, 0);
        }
        if (mem->data == MAP_FAILED)
            fprintf(stderr, "%s\n", strerror(errno));
        assert(mem->data != MAP_FAILED);
        bit_fill(mem->free, MAX_MEMORY_BLOCKS);
    }

    /* Unmap the entire slab and zero the descriptor. Invalidates every
     * pointer previously handed out by block_alloc. */
    static void
    mem_clear(struct memory *mem)
    {
    munmap(mem->data, mem->size);
    memset(mem, 0, sizeof(*mem));
    }

    /* Return every block owned by `tag` to the free bitmap in one shot.
     * Runs under the pool spinlock; does not touch the mapped memory itself,
     * so stale pointers into freed blocks remain dereferenceable but invalid. */
    static void
    mem_free(struct memory *mem, enum memory_tag tag)
    {
    spinlock_begin(&mem->lock);
    /* free |= tags[tag], then forget the tag's ownership */
    bit_or(mem->free, mem->free, mem->tags[tag], MAX_MEMORY_BLOCKS);
    memset(mem->tags[tag], 0, sizeof(mem->tags[tag]));
    spinlock_end(&mem->lock);
    }

    /* Grab one free block from the pool, tag it, and return its base address.
     * Returns NULL when the pool is exhausted. */
    static void*
    block_alloc(struct memory *mem, enum memory_tag tag)
    {
        unsigned int index;
        /* BUG FIX: the free-bitmap scan must run under the lock — the original
         * searched before spinlock_begin, so two threads could find the same
         * index and hand out the same block. (Later gist revisions fix the
         * scan but then leak the lock on the failure path; release it here.) */
        spinlock_begin(&mem->lock);
        index = (unsigned int)bit_find_first_bit(mem->free, MAX_MEMORY_BLOCKS);
        assert(index < MAX_MEMORY_BLOCKS);
        if (index >= MAX_MEMORY_BLOCKS) {
            spinlock_end(&mem->lock);
            return 0;
        }
        {char *block;
        bit_clear(mem->free, index, 1);      /* no longer free */
        bit_set(mem->tags[tag], index, 1);   /* owned by `tag` */
        block = (char*)mem->data + (MEMORY_BLOCK_SIZE * index);
        spinlock_end(&mem->lock);
        return block;}
    }

    /* --------------------------------------------------------------
    *
    * ALLOCATOR
    *
    * --------------------------------------------------------------*/
    /* One bump-allocation region carved from a pool block. */
    struct block {
    void *data;       /* base address (from block_alloc) */
    size_t size;      /* bytes already handed out */
    size_t capacity;  /* total bytes available (MEMORY_BLOCK_SIZE) */
    };

    /* Tagged linear allocator with one private block per worker thread,
     * so `alloc` needs no locking as long as each worker uses its own index. */
    struct allocator {
    enum memory_tag tag;
    struct memory *memory;  /* backing pool (not owned) */
    struct block blocks[MAX_SCHEDULER_WORKER];
    size_t worker_count;    /* number of valid entries in blocks[] */
    };

    /* Initialize an allocator for `tag`, pre-allocating one pool block per
     * worker. ("alloctor" spelling is the established public name — renaming
     * would break callers; flagging only.)
     * NOTE(review): block_alloc may return NULL on pool exhaustion and the
     * result is stored unchecked here — confirm callers tolerate that. */
    static void
    alloctor_init(struct allocator *a, size_t worker_count,
    struct memory *memory, enum memory_tag tag)
    {
    unsigned int i = 0;
    memset(a, 0, sizeof(*a));
    a->tag = tag;
    a->memory = memory;
    a->worker_count = worker_count;
    for(;i < a->worker_count; ++i) {
    a->blocks[i].data = block_alloc(a->memory, tag);
    a->blocks[i].capacity = MEMORY_BLOCK_SIZE;
    a->blocks[i].size = 0;
    }
    }

    /* Reset every worker's region to a fresh, empty block.
     * NOTE(review): the previous blocks are not returned here — they stay
     * tagged and are only reclaimed en masse by mem_free(tag); confirm this
     * is the intended lifetime model (repeated clears consume new blocks). */
    static void
    allocator_clear(struct allocator *a)
    {
    unsigned int i = 0;
    for(;i < a->worker_count; ++i) {
    a->blocks[i].size = 0;
    a->blocks[i].data = block_alloc(a->memory, a->tag);
    a->blocks[i].capacity = MEMORY_BLOCK_SIZE;
    }
    }

    /* Bump-allocate `size` bytes from the calling worker's private block;
     * grabs a fresh pool block when the current one cannot fit the request.
     * Lock-free by design: safe only if each thread passes its own
     * `thread_index`. Individual allocations are never freed — memory is
     * reclaimed per tag via mem_free.
     * NOTE(review): no alignment padding is applied between allocations —
     * callers get natural alignment only if all sizes are multiples of the
     * strictest alignment they need; confirm. */
    static void*
    alloc(struct allocator *a, int thread_index, size_t size)
    {
    void *memory = 0;
    assert(size < MEMORY_BLOCK_SIZE);
    if ((a->blocks[thread_index].size + size) > a->blocks[thread_index].capacity) {
    /* allocate new worker memory block */
    a->blocks[thread_index].data = block_alloc(a->memory, a->tag);
    assert(a->blocks[thread_index].data);
    a->blocks[thread_index].size = 0;
    }
    /* allocate from worker memory block */
    memory = (char*)a->blocks[thread_index].data + a->blocks[thread_index].size;
    a->blocks[thread_index].size += size;
    return memory;
    }

    /* --------------------------------------------------------------
    *
    * SEMAPHORE
    @@ -82,7 +309,7 @@ sem_post(struct sem *s, int delta)
    static void
    sem_wait(struct sem *s, int delta)
    {
    if (delta < 0 ) return;
    if (delta < 0) return;
    pthread_mutex_lock(&s->guard);
    do {
    if (s->count >= delta) {
    @@ -101,7 +328,6 @@ sem_wait(struct sem *s, int delta)
    * --------------------------------------------------------------*/
    #undef _FORTIFY_SOURCE
    typedef void(*fiber_callback)(void*);

    struct sys_fiber {
    ucontext_t fib;
    };
    @@ -284,6 +510,7 @@ struct scheduler_fiber {
    struct scheduler_worker {
    int id;
    pthread_t thread;
    struct allocator allocator;
    struct scheduler_fiber *context;
    struct scheduler *sched;
    };
    @@ -413,7 +640,7 @@ scheduler_run(struct scheduler *s, enum job_queue_ids q, struct job *jobs,
    }

    static void
    fiber_proc(void *arg)
    scheduler_fiber_proc(void *arg)
    {
    struct scheduler_worker *w = (struct scheduler_worker*)arg;
    struct scheduler *s = w->sched;
    @@ -479,10 +706,31 @@ thread_proc(void *arg)
    fiber->handle.fib.uc_stack.ss_sp = 0;
    w->context = fiber;

    fiber_proc(w);
    scheduler_fiber_proc(w);
    return 0;
    }

    static int
    scheduler_self_index(struct scheduler *s)
    {
    int worker_index = 0;
    pthread_t self = pthread_self();
    for (worker_index; worker_index < s->worker_count; ++worker_index) {
    if (s->worker[worker_index].thread == self)
    return worker_index;
    }
    return -1;
    }

    static struct scheduler_worker*
    scheduler_self(struct scheduler *s)
    {
    int worker_index = scheduler_self_index(s);
    if (worker_index < 0) return 0;
    return &s->worker[worker_index];
    }


    static void
    scheduler_wait_for(struct scheduler *s, job_counter *counter, u32 value)
    {
    @@ -493,12 +741,7 @@ scheduler_wait_for(struct scheduler *s, job_counter *counter, u32 value)
    assert(counter);

    /* find threads own worker state */
    {int worker_index = 0;
    pthread_t self = pthread_self();
    for (worker_index; worker_index < s->worker_count; ++worker_index) {
    if (s->worker[worker_index].thread == self)
    w = &s->worker[worker_index];
    }}
    w = scheduler_self(s);
    assert(w);

    /* insert current context into waiting list */
    @@ -512,7 +755,8 @@ scheduler_wait_for(struct scheduler *s, job_counter *counter, u32 value)
    if (!w->context) {
    w->context = scheduler_get_free_fiber(s);
    assert(w->context);
    fiber_create(&w->context->handle, w->context->stack, w->context->stack_size, fiber_proc, w);
    fiber_create(&w->context->handle, w->context->stack,
    w->context->stack_size, scheduler_fiber_proc, w);
    }
    if (s->profiler.context_switch)
    s->profiler.context_switch(s->profiler.userdata, w->id);
    @@ -557,7 +801,8 @@ sched_init(struct scheduler *sched, size_t worker_count)
    pthread_attr_setaffinity_np(&attr, sizeof(cpu_set_t), &cpus);

    /* start worker thread */
    pthread_create(&sched->worker[thread_index].thread, &attr, thread_proc, &sched->worker[thread_index]);
    pthread_create(&sched->worker[thread_index].thread, &attr,
    thread_proc, &sched->worker[thread_index]);
    pthread_detach(sched->worker[thread_index].thread);
    }

    @@ -601,6 +846,8 @@ sched_free(struct scheduler *sched)
    * --------------------------------------------------------------*/
    struct game_state {
    int data;
    struct memory memory;
    struct allocator alloc;
    };

    struct test_data {
    @@ -645,18 +892,25 @@ JOB_ENTRY_POINT(root)

    int main(int argc, char **argv)
    {
    /* setup app memory and allocator */
    struct game_state app;
    size_t thread_count = (size_t)sysconf(_SC_NPROCESSORS_ONLN);
    memset(&app, 0, sizeof(app));
    mem_init(&app.memory);
    alloctor_init(&app.alloc, thread_count, &app.memory, MEMORY_TAG_GAME);
    alloctor_init(&app.alloc, thread_count, &app.memory, MEMORY_TAG_RENDER);
    alloctor_init(&app.alloc, thread_count, &app.memory, MEMORY_TAG_GPU);

    /* start root process */
    {struct scheduler sched;
    struct job job = Job(root, &app);
    job_counter counter;

    /* start root process */
    struct scheduler sched;
    size_t thread_count = (size_t)sysconf(_SC_NPROCESSORS_ONLN);
    sched_init(&sched, thread_count);
    printf("init\n");
    scheduler_run(&sched, JOB_QUEUE_HIGH, &job, 1, &counter);
    printf("run\n");
    scheduler_wait_for(&sched, &counter, 0);
    printf("finished\n");
    printf("finished\n");}
    return 0;
    }
  13. @vurtun vurtun revised this gist Jun 14, 2016. 1 changed file with 5 additions and 1 deletion.
    6 changes: 5 additions & 1 deletion fibers.c
    Original file line number Diff line number Diff line change
    @@ -99,6 +99,7 @@ sem_wait(struct sem *s, int delta)
    * FIBER
    *
    * --------------------------------------------------------------*/
    #undef _FORTIFY_SOURCE
    typedef void(*fiber_callback)(void*);

    struct sys_fiber {
    @@ -614,7 +615,10 @@ JOB_ENTRY_POINT(test_work)
    struct test_data *data = arg;
    for (i = data->from; i < data->to; ++i)
    data->data[i] = i;

    printf("sleep begin\n");
    sleep(1);
    printf("sleep end\n");
    }

    JOB_ENTRY_POINT(root)
    @@ -633,7 +637,7 @@ JOB_ENTRY_POINT(root)
    jobs[i] = Job(test_work, &data);
    }

    scheduler_run(sched, JOB_QUEUE_HIGH, jobs, 4, &counter);
    scheduler_run(sched, JOB_QUEUE_HIGH, jobs, 8, &counter);
    printf("run\n");
    scheduler_wait_for(sched, &counter, 0);
    printf("done\n");
  14. @vurtun vurtun revised this gist Jun 14, 2016. 1 changed file with 5 additions and 8 deletions.
    13 changes: 5 additions & 8 deletions fibers.c
    Original file line number Diff line number Diff line change
    @@ -99,7 +99,6 @@ sem_wait(struct sem *s, int delta)
    * FIBER
    *
    * --------------------------------------------------------------*/
    #undef _FORTIFY_SOURCE
    typedef void(*fiber_callback)(void*);

    struct sys_fiber {
    @@ -349,7 +348,7 @@ static void
    scheduler_unhook_from_list(struct scheduler_fiber **list,
    struct scheduler_fiber *element, volatile u32 *lock)
    {
    spinlock_begin(lock);
    if (lock) spinlock_begin(lock);
    if (element->next)
    element->next->prev = element->prev;
    if (element->prev)
    @@ -358,7 +357,7 @@ scheduler_unhook_from_list(struct scheduler_fiber **list,
    *list = element->next;

    element->next = element->prev = 0;
    spinlock_end(lock);
    if (lock) spinlock_end(lock);
    }

    static struct scheduler_fiber*
    @@ -370,6 +369,7 @@ scheduler_find_fiber_finished_waiting(struct scheduler *s)
    if (*iter->job.run_count == iter->value) break;
    iter = iter->next;
    }
    if (iter) scheduler_unhook_from_list(&s->wait_list, iter, 0);
    spinlock_end(&s->wait_list_lock);
    return iter;
    }
    @@ -383,6 +383,7 @@ scheduler_get_free_fiber(struct scheduler *s)
    fib = &s->fibers[s->fiber_count++];
    } else if (s->free_list) {
    fib = s->free_list;
    scheduler_unhook_from_list(&s->free_list, fib, 0);
    }
    spinlock_end(&s->free_list_lock);
    return fib;
    @@ -424,7 +425,6 @@ fiber_proc(void *arg)
    /* put old worker context into freelist */
    struct scheduler_fiber *old = w->context;
    memset(w->context, 0, sizeof(*w->context));
    scheduler_unhook_from_list(&s->wait_list, fiber, &s->wait_list_lock);
    scheduler_hook_into_list(&s->free_list, old, &s->free_list_lock);

    /* set previously waiting fiber as worker context */
    @@ -511,9 +511,8 @@ scheduler_wait_for(struct scheduler *s, job_counter *counter, u32 value)
    if (!w->context) {
    w->context = scheduler_get_free_fiber(s);
    assert(w->context);
    scheduler_unhook_from_list(&s->free_list, w->context, &s->free_list_lock);
    fiber_create(&w->context->handle, w->context->stack, w->context->stack_size, fiber_proc, w);
    } else scheduler_unhook_from_list(&s->wait_list, w->context, &s->wait_list_lock);
    }
    if (s->profiler.context_switch)
    s->profiler.context_switch(s->profiler.userdata, w->id);
    fiber_switch_to(&old->handle, &w->context->handle);
    @@ -570,7 +569,6 @@ sched_init(struct scheduler *sched, size_t worker_count)
    sched->worker[thread_index].id = (int)thread_index;
    pthread_setaffinity_np(sched->worker[thread_index].thread, sizeof(cpu_set_t), &cpus);}


    /* create fiber for main thread worker */
    {struct scheduler_fiber *fiber;
    fiber = scheduler_get_free_fiber(sched);
    @@ -593,7 +591,6 @@ sched_free(struct scheduler *sched)
    free(fiber->stack);
    }}
    sem_free(&sched->work_sem);

    }

    /* --------------------------------------------------------------
  15. @vurtun vurtun revised this gist Jun 5, 2016. 1 changed file with 5 additions and 0 deletions.
    5 changes: 5 additions & 0 deletions fibers.c
    Original file line number Diff line number Diff line change
    @@ -562,9 +562,14 @@ sched_init(struct scheduler *sched, size_t worker_count)
    }

    /* initialize main thread as worker thread */
    {cpu_set_t cpus;
    CPU_ZERO(&cpus);
    CPU_SET(thread_index, &cpus);
    sched->worker[thread_index].sched = sched;
    sched->worker[thread_index].thread = pthread_self();
    sched->worker[thread_index].id = (int)thread_index;
    pthread_setaffinity_np(sched->worker[thread_index].thread, sizeof(cpu_set_t), &cpus);}


    /* create fiber for main thread worker */
    {struct scheduler_fiber *fiber;
  16. @vurtun vurtun revised this gist Jun 4, 2016. 1 changed file with 54 additions and 2 deletions.
    56 changes: 54 additions & 2 deletions fibers.c
    Original file line number Diff line number Diff line change
    @@ -62,6 +62,13 @@ sem_init(struct sem *s, int init)
    s->count = init;
    }

    static void
    sem_free(struct sem *s)
    {
    pthread_mutex_destroy(&s->guard);
    pthread_cond_destroy(&s->cond);
    }

    static void
    sem_post(struct sem *s, int delta)
    {
    @@ -281,12 +288,25 @@ struct scheduler_worker {
    struct scheduler *sched;
    };

    typedef void (*scheduler_profiler_callback_f)(void*, int thread_id);
    struct scheduler_profiling {
    void *userdata;
    scheduler_profiler_callback_f thread_start;
    scheduler_profiler_callback_f thread_stop;
    scheduler_profiler_callback_f context_switch;
    scheduler_profiler_callback_f wait_start;
    scheduler_profiler_callback_f wait_stop;
    };

    struct scheduler {
    struct sem work_sem;
    struct job_queue queue[JOB_QUEUE_COUNT];

    int worker_count;
    struct scheduler_worker worker[MAX_SCHEDULER_WORKER];
    volatile u32 worker_running;
    volatile u32 worker_active;
    struct scheduler_profiling profiler;

    int fiber_count;
    struct scheduler_fiber fibers[MAX_SCHEDULER_FIBERS];
    @@ -395,6 +415,8 @@ fiber_proc(void *arg)
    {
    struct scheduler_worker *w = (struct scheduler_worker*)arg;
    struct scheduler *s = w->sched;

    __sync_add_and_fetch(&s->worker_active, 1);
    while (1) {
    /* check if any fiber is done waiting */
    struct scheduler_fiber *fiber = scheduler_find_fiber_finished_waiting(s);
    @@ -407,6 +429,8 @@ fiber_proc(void *arg)

    /* set previously waiting fiber as worker context */
    w->context = fiber;
    if (s->profiler.context_switch)
    s->profiler.context_switch(s->profiler.userdata, w->id);
    fiber_switch_to(&old->handle, &w->context->handle);
    }

    @@ -415,11 +439,23 @@ fiber_proc(void *arg)
    if (!(job_queue_pop(&job, &s->queue[JOB_QUEUE_HIGH])) &&
    !(job_queue_pop(&job, &s->queue[JOB_QUEUE_NORMAL])) &&
    !(job_queue_pop(&job, &s->queue[JOB_QUEUE_LOW]))) {
    /* currently no job so wait */
    __sync_sub_and_fetch(&s->worker_active, 1);
    if (s->profiler.wait_start)
    s->profiler.wait_start(s->profiler.userdata, w->id);
    sem_wait(&s->work_sem, 1);
    if (s->profiler.wait_stop)
    s->profiler.wait_stop(s->profiler.userdata, w->id);
    __sync_add_and_fetch(&s->worker_active, 1);
    } else {
    /* run dequeued job */
    w->context->job = *job;
    assert(job->callback);
    if (s->profiler.thread_start)
    s->profiler.thread_start(s->profiler.userdata, w->id);
    job->callback(s, job->data);
    if (s->profiler.thread_stop)
    s->profiler.thread_stop(s->profiler.userdata, w->id);
    __sync_sub_and_fetch(job->run_count, 1);
    }}
    }
    @@ -431,6 +467,7 @@ thread_proc(void *arg)
    struct scheduler_worker *w = (struct scheduler_worker*)arg;
    struct scheduler_fiber *fiber;
    struct scheduler *s = w->sched;
    __sync_add_and_fetch(&s->worker_running, 1);

    /* create dummy fiber */
    fiber = scheduler_get_free_fiber(s);
    @@ -440,6 +477,7 @@ thread_proc(void *arg)
    fiber->handle.fib.uc_stack.ss_size = 0;
    fiber->handle.fib.uc_stack.ss_sp = 0;
    w->context = fiber;

    fiber_proc(w);
    return 0;
    }
    @@ -476,6 +514,8 @@ scheduler_wait_for(struct scheduler *s, job_counter *counter, u32 value)
    scheduler_unhook_from_list(&s->free_list, w->context, &s->free_list_lock);
    fiber_create(&w->context->handle, w->context->stack, w->context->stack_size, fiber_proc, w);
    } else scheduler_unhook_from_list(&s->wait_list, w->context, &s->wait_list_lock);
    if (s->profiler.context_switch)
    s->profiler.context_switch(s->profiler.userdata, w->id);
    fiber_switch_to(&old->handle, &w->context->handle);
    }

    @@ -538,6 +578,19 @@ sched_init(struct scheduler *sched, size_t worker_count)
    pthread_attr_destroy(&attr);
    }

    static void
    sched_free(struct scheduler *sched)
    {
    /* free fibers stack */
    {int fiber_index = 0;
    for (fiber_index; fiber_index < MAX_SCHEDULER_FIBERS; ++fiber_index) {
    struct scheduler_fiber *fiber = sched->fibers + fiber_index;
    free(fiber->stack);
    }}
    sem_free(&sched->work_sem);

    }

    /* --------------------------------------------------------------
    *
    * TEST
    @@ -559,6 +612,7 @@ JOB_ENTRY_POINT(test_work)
    struct test_data *data = arg;
    for (i = data->from; i < data->to; ++i)
    data->data[i] = i;
    sleep(1);
    }

    JOB_ENTRY_POINT(root)
    @@ -580,8 +634,6 @@ JOB_ENTRY_POINT(root)
    scheduler_run(sched, JOB_QUEUE_HIGH, jobs, 4, &counter);
    printf("run\n");
    scheduler_wait_for(sched, &counter, 0);
    for (i = 0; i < 2*1024; ++i)
    assert(n[i] == i);
    printf("done\n");
    }

  17. @vurtun vurtun created this gist Jun 3, 2016.
    604 changes: 604 additions & 0 deletions fibers.c
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,604 @@
    #include <stdio.h>
    #include <stdlib.h>
    #include <stdint.h>
    #include <assert.h>
    #include <string.h>
    #define streq(a, b) (!strcmp((a), (b)))

    #ifndef __USE_GNU
    #define __USE_GNU
    #endif
    #include <setjmp.h>
    #include <sched.h>
    #include <unistd.h>
    #include <pthread.h>
    #include "xmmintrin.h"
    #include <ucontext.h>

    typedef int8_t i8;
    typedef int16_t i16;
    typedef int32_t i32;
    typedef int64_t i64;
    typedef uint8_t u8;
    typedef uint16_t u16;
    typedef uint32_t u32;
    typedef uint64_t u64;
    typedef long word;

    #define MAX_WORK_QUEUE_THREAD 64
    #define FIBER_STACK_SIZE (64 * 1024)
    #define MAX_SCHEDULER_WORKER 64
    #define MAX_SCHEDULER_FIBERS 128

    /* --------------------------------------------------------------
    *
    * SPINLOCK
    *
    * --------------------------------------------------------------*/
    /* Acquire a test-and-set spinlock.
     * Fix: the CAS arguments were inverted — __sync_val_compare_and_swap
     * (ptr, oldval, newval) was called as (spinlock, 1, 0), which never
     * acquires a free lock (compare against 1 fails when *spinlock == 0)
     * and actually RELEASES a lock that is held. Correct form compares
     * against 0 (unlocked) and installs 1 (locked), spinning until the
     * previous value was 0. _mm_pause() eases contention while spinning. */
    static void
    spinlock_begin(volatile u32 *spinlock)
    {while (__sync_val_compare_and_swap(spinlock, 0, 1) != 0) _mm_pause();}

    /* Release the spinlock: flush prior stores, then clear the lock word. */
    static void
    spinlock_end(volatile u32 *spinlock)
    {
        _mm_sfence();
        *spinlock = 0;
    }

    /* --------------------------------------------------------------
    *
    * SEMAPHORE
    *
    * --------------------------------------------------------------*/
    /* Counting semaphore built from a mutex + condition variable. */
    struct sem {
    pthread_mutex_t guard; /* protects `count` */
    pthread_cond_t cond;   /* broadcast whenever `count` grows */
    int count;             /* number of currently available units */
    };

    /* Initialize the semaphore with `initial` available units. */
    static void
    sem_init(struct sem *s, int initial)
    {
        s->count = initial;
        pthread_mutex_init(&s->guard, 0);
        pthread_cond_init(&s->cond, 0);
    }

    /* Add `delta` units and wake all waiters; negative deltas are ignored.
     * Every waiter re-checks the count under the mutex, so a broadcast
     * (rather than a single signal) is safe, if a little noisy. */
    static void
    sem_post(struct sem *s, int delta)
    {
        if (delta < 0)
            return;
        pthread_mutex_lock(&s->guard);
        s->count += delta;
        pthread_cond_broadcast(&s->cond);
        pthread_mutex_unlock(&s->guard);
    }

    /* Block until `delta` units are available, then consume them.
     * Negative deltas are ignored. Spurious wakeups are handled by
     * re-testing the count in the loop condition. */
    static void
    sem_wait(struct sem *s, int delta)
    {
        if (delta < 0)
            return;
        pthread_mutex_lock(&s->guard);
        while (s->count < delta)
            pthread_cond_wait(&s->cond, &s->guard);
        s->count -= delta;
        pthread_mutex_unlock(&s->guard);
    }

    /* --------------------------------------------------------------
    *
    * FIBER
    *
    * --------------------------------------------------------------*/
    #undef _FORTIFY_SOURCE
    typedef void(*fiber_callback)(void*);

    /* Platform fiber handle; this build wraps a POSIX ucontext. */
    struct sys_fiber {
    ucontext_t fib; /* saved CPU/stack context for swapcontext() */
    };

    /* Prepare `fib` so that switching to it runs callback(data) on the
     * given stack. uc_link = 0 means the process exits if the callback
     * ever returns (fibers here loop forever in fiber_proc). */
    static void
    fiber_create(struct sys_fiber *fib, void *stack, size_t stack_size,
    fiber_callback callback, void *data)
    {
    getcontext(&fib->fib);
    fib->fib.uc_stack.ss_size = stack_size;
    fib->fib.uc_stack.ss_sp = stack;
    fib->fib.uc_link = 0;
    /* NOTE(review): makecontext passes args as ints; smuggling a pointer
     * through a single vararg is only safe where pointers fit in an int
     * slot pair the ABI preserves — works on common Linux ABIs, but
     * verify on other targets. */
    makecontext(&fib->fib, (void(*)())callback, 1, data);
    }

    /* Save the current context into `prev` and resume execution in `fib`.
     * Returns only when some other fiber switches back to `prev`. */
    static void
    fiber_switch_to(struct sys_fiber *prev, struct sys_fiber *fib)
    {
    swapcontext(&prev->fib, &fib->fib);
    }

    /* --------------------------------------------------------------
    *
    * QUEUE
    *
    * --------------------------------------------------------------*/
    /* This is an implementation of a multi producer and consumer Non-Blocking
    * Concurrent FIFO Queue based on the paper from Phillippas Tsigas and Yi Zhangs:
    * www.cse.chalmers.se/~tsigas/papers/latest-spaa01.pdf */
    #define MAX_WORK_QUEUE_JOBS (1024)
    #define WORK_QUEUE_MASK (MAX_WORK_QUEUE_JOBS-1)

    struct scheduler;
    typedef void(*job_callback)(struct scheduler*,void*);
    #define JOB_ENTRY_POINT(name) static void name(struct scheduler *sched, void *arg)
    #define BASE_ALIGN(x) __attribute__((aligned(x)))

    #define QUEUE_EMPTY 0
    #define QUEUE_REMOVED 1

    /* A unit of work: callback(scheduler, data). `run_count` points at the
     * shared job_counter for the batch; workers decrement it on completion. */
    struct job {
    void *data;              /* opaque argument handed to `callback` */
    job_callback callback;   /* entry point executed by a worker fiber */
    volatile u32 *run_count; /* batch counter, set by scheduler_run() */
    };

    /* Lock-free MPMC FIFO ring (Tsigas/Zhang). Slots hold job pointers;
     * the low pointer bit and the QUEUE_EMPTY/QUEUE_REMOVED sentinels
     * encode slot state, so real `struct job*` must be 2-byte aligned. */
    struct job_queue {
    volatile u32 head;                    /* consumer index (approximate) */
    struct job *jobs[MAX_WORK_QUEUE_JOBS]; /* ring storage */
    volatile u32 tail;                    /* producer index (approximate) */
    };

    /* Reset the ring to its canonical empty state: head = 0, tail = 1,
     * slot 0 marked empty (QUEUE_EMPTY == 0, so the memset already
     * clears every slot; the explicit stores document the invariant). */
    static void
    job_queue_init(struct job_queue *q)
    {
        memset(q, 0, sizeof(*q));
        q->head = 0;
        q->tail = 1;
        q->jobs[0] = QUEUE_EMPTY;
    }

    /* A slot is free when it holds either sentinel (EMPTY or REMOVED)
     * rather than a real job pointer. */
    static int
    job_queue_entry_free(struct job *p)
    {
        uintptr_t v = (uintptr_t)p;
        return v == QUEUE_EMPTY || v == QUEUE_REMOVED;
    }

    /* Lock-free multi-producer enqueue (Tsigas/Zhang SPAA'01).
     * Returns 1 on success, 0 if the ring is full.
     * Fix: removed the unused local `tnew`; the algorithm is otherwise
     * unchanged. The low bit of a stored pointer carries the REMOVED tag
     * forward so concurrent pops stay consistent. */
    static int
    job_queue_push(struct job_queue *q, struct job *job)
    {
        while (1) {
            /* snapshot the advertised tail and walk to the actual one */
            u32 te = q->tail;
            u32 ate = te;
            struct job *tt = q->jobs[ate];
            u32 tmp = (ate + 1) & WORK_QUEUE_MASK;

            while (!(job_queue_entry_free(tt))) {
                /* check tails consistency */
                if (te != q->tail) goto retry;
                /* check if queue is full */
                if (tmp == q->head) break;
                tt = q->jobs[tmp];
                ate = tmp;
                tmp = (ate + 1) & WORK_QUEUE_MASK;
            }

            /* check tails consistency */
            if (te != q->tail) continue;

            /* check if queue is full */
            if (tmp == q->head) {
                ate = (tmp + 1) & WORK_QUEUE_MASK;
                tt = q->jobs[ate];
                if (!(job_queue_entry_free(tt)))
                    return 0; /* queue is full */

                /* help the consumer side advance head, then retry */
                __sync_bool_compare_and_swap(&q->head, tmp, ate);
                continue;
            }

            /* keep the REMOVED tag alive in the slot we overwrite */
            if ((uintptr_t)tt == QUEUE_REMOVED)
                job = (struct job*)((uintptr_t)job | 0x01);
            if (te != q->tail) continue;
            if (__sync_bool_compare_and_swap(&q->jobs[ate], tt, job)) {
                /* publish the new tail on every second slot only */
                if ((tmp & 1) == 0)
                    __sync_bool_compare_and_swap(&q->tail, te, tmp);
                return 1;
            }
    retry:;
        }
    }

    /* Lock-free multi-consumer dequeue (Tsigas/Zhang SPAA'01).
     * On success stores the job (with its tag bit stripped) in *job and
     * returns 1; returns 0 when the ring is empty. */
    static int
    job_queue_pop(struct job **job, struct job_queue *q)
    {
    while (1) {
    u32 th = q->head;
    u32 tmp = (th + 1) & WORK_QUEUE_MASK;
    struct job *tt = q->jobs[tmp];
    struct job *tnull = 0;

    /* we want to find the actual head */
    while ((job_queue_entry_free(tt))) {
    if (th != q->head) goto retry;
    if (tmp == q->tail) return 0;
    tmp = (tmp + 1) & WORK_QUEUE_MASK;
    tt = q->jobs[tmp];
    }

    /* check head's consistency */
    if (th != q->head) continue;

    /* check if queue is empty */
    if (tmp == q->tail) {
    /* help push to update end */
    __sync_bool_compare_and_swap(&q->tail, tmp, (tmp+1) & WORK_QUEUE_MASK);
    continue; /* retry */
    }
    /* choose the sentinel that preserves the slot's REMOVED/EMPTY parity */
    tnull = (((uintptr_t)tt & 0x01) ? (struct job*)QUEUE_REMOVED: (struct job*)QUEUE_EMPTY);
    if (th != q->head) continue;

    /* get actual head */
    if (__sync_bool_compare_and_swap(&q->jobs[tmp], tt, tnull)) {
    /* lazily publish the new head on every second slot only */
    if ((tmp & 0x1) == 0)
    __sync_bool_compare_and_swap(&q->head, th, tmp);
    /* strip the tag bit to recover the real job pointer */
    *job = (struct job*)((uintptr_t)tt & ~(uintptr_t)1);
    return 1;
    }
    retry:;
    }
    }

    /* --------------------------------------------------------------
    *
    * SCHEDULER
    *
    * --------------------------------------------------------------*/
    typedef volatile u32 job_counter;
    enum job_queue_ids {
    JOB_QUEUE_LOW,
    JOB_QUEUE_NORMAL,
    JOB_QUEUE_HIGH,
    JOB_QUEUE_COUNT
    };

    /* A schedulable fiber: its stack, saved context, the job it is
     * running, and intrusive links for the wait/free lists. */
    struct scheduler_fiber {
    struct scheduler_fiber *next; /* intrusive list link */
    struct scheduler_fiber *prev; /* intrusive list link */
    void *stack;                  /* heap-allocated fiber stack */
    size_t stack_size;            /* size of `stack` in bytes */
    struct sys_fiber handle;      /* saved execution context */
    struct job job;               /* job currently bound to this fiber */
    u32 value;                    /* counter value this fiber waits for */
    };

    /* Per-thread worker state: the OS thread plus its current fiber. */
    struct scheduler_worker {
    int id;                          /* worker index, used for CPU pinning */
    pthread_t thread;                /* backing OS thread */
    struct scheduler_fiber *context; /* fiber currently executing on it */
    struct scheduler *sched;         /* owning scheduler */
    };

    /* Top-level job system: three priority queues, a worker pool, a fixed
     * pool of fibers, and two spinlock-guarded intrusive lists tracking
     * fibers that are waiting on a counter or free for reuse. */
    struct scheduler {
    struct sem work_sem;                   /* counts queued, unclaimed jobs */
    struct job_queue queue[JOB_QUEUE_COUNT]; /* one ring per priority */

    int worker_count;
    struct scheduler_worker worker[MAX_SCHEDULER_WORKER];

    int fiber_count;                       /* fibers handed out from `fibers` */
    struct scheduler_fiber fibers[MAX_SCHEDULER_FIBERS];

    volatile u32 wait_list_lock;           /* guards `wait_list` */
    struct scheduler_fiber *wait_list;     /* fibers parked in wait_for() */
    volatile u32 free_list_lock;           /* guards `free_list` */
    struct scheduler_fiber *free_list;     /* recycled fibers */
    };

    /* Build a job value binding a callback to its argument. The run
     * counter stays NULL here; scheduler_run() attaches it on submit. */
    static struct job
    Job(job_callback callback, void *data)
    {
        struct job j;
        j.data = data;
        j.callback = callback;
        j.run_count = 0;
        return j;
    }

    /* Push `element` onto the front of a doubly linked list under `lock`.
     * The empty- and non-empty-list cases collapse into one sequence:
     * next is the old head (possibly NULL), whose back link is patched
     * only when it exists. */
    static void
    scheduler_hook_into_list(struct scheduler_fiber **list,
    struct scheduler_fiber *element, volatile u32 *lock)
    {
        spinlock_begin(lock);
        element->prev = 0;
        element->next = *list;
        if (*list)
            (*list)->prev = element;
        *list = element;
        spinlock_end(lock);
    }

    /* Remove `element` from the list under `lock` and clear its links.
     * Safe to call on a node whose links are already NULL (no-op splice). */
    static void
    scheduler_unhook_from_list(struct scheduler_fiber **list,
    struct scheduler_fiber *element, volatile u32 *lock)
    {
        spinlock_begin(lock);
        {
            struct scheduler_fiber *n = element->next;
            struct scheduler_fiber *p = element->prev;
            if (n) n->prev = p;
            if (p) p->next = n;
            if (*list == element)
                *list = n;
            element->next = 0;
            element->prev = 0;
        }
        spinlock_end(lock);
    }

    /* Scan the wait list for a fiber whose counter has reached the value
     * it waits for; returns it (still linked — caller unhooks) or NULL.
     * Fix: the list head was read BEFORE acquiring the spinlock, racing
     * with concurrent hook/unhook; the snapshot now happens inside the
     * critical section. */
    static struct scheduler_fiber*
    scheduler_find_fiber_finished_waiting(struct scheduler *s)
    {
        struct scheduler_fiber *iter;
        spinlock_begin(&s->wait_list_lock);
        iter = s->wait_list;
        while (iter) {
            if (*iter->job.run_count == iter->value) break;
            iter = iter->next;
        }
        spinlock_end(&s->wait_list_lock);
        return iter;
    }

    /* Hand out a fiber: first unclaimed slots from the static pool, then
     * the free list. Returns NULL when both are exhausted.
     * NOTE(review): a free-list hit is returned WITHOUT unhooking it —
     * scheduler_wait_for() unhooks afterwards, but thread_proc()/
     * sched_init() do not; verify a recycled fiber cannot be handed to
     * two workers. */
    static struct scheduler_fiber*
    scheduler_get_free_fiber(struct scheduler *s)
    {
    struct scheduler_fiber *fib = 0;
    spinlock_begin(&s->free_list_lock);
    if (s->fiber_count < MAX_SCHEDULER_FIBERS) {
    fib = &s->fibers[s->fiber_count++];
    } else if (s->free_list) {
    fib = s->free_list;
    }
    spinlock_end(&s->free_list_lock);
    return fib;
    }

    /* Submit `count` jobs to priority queue `q`, tying them all to the
     * shared `counter`, and wake one worker per enqueued job.
     * Fix: the counter is published BEFORE any job is queued. The
     * original wrote *counter after the push loop, so a worker could
     * finish a job and decrement the counter before it was initialized,
     * leaving it at a wrong final value and breaking wait_for(). */
    static void
    scheduler_run(struct scheduler *s, enum job_queue_ids q, struct job *jobs,
    u32 count, job_counter *counter)
    {
        u32 jobIndex = 0;
        struct job_queue *queue;
        assert(q < JOB_QUEUE_COUNT);
        assert(counter);
        assert(jobs);
        assert(s);

        *counter = count;
        queue = &s->queue[q];
        while (jobIndex < count) {
            jobs[jobIndex].run_count = counter;
            /* push can fail only when the ring is full; spin until it fits */
            if (job_queue_push(queue, &jobs[jobIndex])) {
                sem_post(&s->work_sem, 1);
                jobIndex++;
            }
        }
    }

    static void
    fiber_proc(void *arg)
    {
    struct scheduler_worker *w = (struct scheduler_worker*)arg;
    struct scheduler *s = w->sched;
    while (1) {
    /* check if any fiber is done waiting */
    struct scheduler_fiber *fiber = scheduler_find_fiber_finished_waiting(s);
    if (fiber) {
    /* put old worker context into freelist */
    struct scheduler_fiber *old = w->context;
    memset(w->context, 0, sizeof(*w->context));
    scheduler_unhook_from_list(&s->wait_list, fiber, &s->wait_list_lock);
    scheduler_hook_into_list(&s->free_list, old, &s->free_list_lock);

    /* set previously waiting fiber as worker context */
    w->context = fiber;
    fiber_switch_to(&old->handle, &w->context->handle);
    }

    /* check if any new jobs inside work queues */
    {struct job *job = 0;
    if (!(job_queue_pop(&job, &s->queue[JOB_QUEUE_HIGH])) &&
    !(job_queue_pop(&job, &s->queue[JOB_QUEUE_NORMAL])) &&
    !(job_queue_pop(&job, &s->queue[JOB_QUEUE_LOW]))) {
    sem_wait(&s->work_sem, 1);
    } else {
    w->context->job = *job;
    assert(job->callback);
    job->callback(s, job->data);
    __sync_sub_and_fetch(job->run_count, 1);
    }}
    }
    }

    /* OS-thread entry point for a worker: wraps the current thread in a
     * stackless "dummy" fiber (its context is captured lazily by the
     * first swapcontext) and enters the worker loop. Never returns in
     * practice — fiber_proc loops forever. */
    static void*
    thread_proc(void *arg)
    {
    struct scheduler_worker *w = (struct scheduler_worker*)arg;
    struct scheduler_fiber *fiber;
    struct scheduler *s = w->sched;

    /* create dummy fiber */
    fiber = scheduler_get_free_fiber(s);
    assert(fiber);
    getcontext(&fiber->handle.fib);
    fiber->handle.fib.uc_link = 0;
    /* ss_sp/ss_size stay 0: this context runs on the thread's own stack */
    fiber->handle.fib.uc_stack.ss_size = 0;
    fiber->handle.fib.uc_stack.ss_sp = 0;
    w->context = fiber;
    fiber_proc(w);
    return 0;
    }

    /* Park the calling fiber until *counter reaches `value`, running other
     * work meanwhile: the current context is linked into the wait list and
     * the worker switches to either a fiber that finished waiting or a
     * fresh fiber running fiber_proc.
     * Fixes: `w` was uninitialized, so a failed lookup hit UB in the
     * assert instead of failing it; the lookup now uses pthread_equal()
     * (comparing pthread_t with == is non-portable) and breaks on match. */
    static void
    scheduler_wait_for(struct scheduler *s, job_counter *counter, u32 value)
    {
        struct scheduler_worker *w = 0;
        struct scheduler_fiber *old;

        assert(s);
        assert(counter);

        /* find this thread's own worker state */
        {int worker_index = 0;
        pthread_t self = pthread_self();
        for (; worker_index < s->worker_count; ++worker_index) {
            if (pthread_equal(s->worker[worker_index].thread, self)) {
                w = &s->worker[worker_index];
                break;
            }
        }}
        assert(w);

        /* insert current context into waiting list */
        old = w->context;
        w->context->value = value;
        w->context->job.run_count = counter;
        scheduler_hook_into_list(&s->wait_list, old, &s->wait_list_lock);

        /* either continue a finished waiting fiber or start a new one */
        w->context = scheduler_find_fiber_finished_waiting(s);
        if (!w->context) {
            w->context = scheduler_get_free_fiber(s);
            assert(w->context);
            scheduler_unhook_from_list(&s->free_list, w->context, &s->free_list_lock);
            fiber_create(&w->context->handle, w->context->stack, w->context->stack_size, fiber_proc, w);
        } else scheduler_unhook_from_list(&s->wait_list, w->context, &s->wait_list_lock);
        fiber_switch_to(&old->handle, &w->context->handle);
    }

    /* Initialize the scheduler: semaphore + queues, fiber stacks, then
     * worker_count-1 detached worker threads pinned one per core, with
     * the calling thread registered as the final worker (it gets a
     * stackless dummy fiber like thread_proc's). */
    static void
    sched_init(struct scheduler *sched, size_t worker_count)
    {
    size_t thread_index = 0;
    pthread_attr_t attr;

    assert(sched);
    assert(worker_count);
    memset(sched, 0, sizeof(*sched));
    sched->worker_count = (int)worker_count;

    /* init semaphore and the three priority queues */
    sem_init(&sched->work_sem, 0);
    job_queue_init(&sched->queue[0]);
    job_queue_init(&sched->queue[1]);
    job_queue_init(&sched->queue[2]);

    /* pre-allocate every fiber's stack up front */
    {int fiber_index = 0;
    for (fiber_index; fiber_index < MAX_SCHEDULER_FIBERS; ++fiber_index) {
    struct scheduler_fiber *fiber = sched->fibers + fiber_index;
    fiber->stack_size = FIBER_STACK_SIZE;
    /* NOTE(review): calloc result is not checked — OOM here crashes later */
    fiber->stack = calloc(fiber->stack_size, 1);
    }}

    /* start worker threads */
    pthread_attr_init(&attr);
    for (thread_index; thread_index < worker_count-1; ++thread_index) {
    cpu_set_t cpus;
    sched->worker[thread_index].id = (int)thread_index;
    sched->worker[thread_index].sched = sched;

    /* bind thread to core */
    CPU_ZERO(&cpus);
    CPU_SET(thread_index, &cpus);
    pthread_attr_setaffinity_np(&attr, sizeof(cpu_set_t), &cpus);

    /* start worker thread; detached, so no join on shutdown */
    pthread_create(&sched->worker[thread_index].thread, &attr, thread_proc, &sched->worker[thread_index]);
    pthread_detach(sched->worker[thread_index].thread);
    }

    /* initialize main thread as worker thread (slot worker_count-1) */
    sched->worker[thread_index].sched = sched;
    sched->worker[thread_index].thread = pthread_self();
    sched->worker[thread_index].id = (int)thread_index;

    /* create stackless dummy fiber for the main-thread worker */
    {struct scheduler_fiber *fiber;
    fiber = scheduler_get_free_fiber(sched);
    assert(fiber);
    getcontext(&fiber->handle.fib);
    fiber->handle.fib.uc_link = 0;
    fiber->handle.fib.uc_stack.ss_size = 0;
    fiber->handle.fib.uc_stack.ss_sp = 0;
    sched->worker[thread_index].context = fiber;}
    pthread_attr_destroy(&attr);
    }

    /* --------------------------------------------------------------
    *
    * TEST
    *
    * --------------------------------------------------------------*/
    /* Placeholder application state passed as the root job's argument. */
    struct game_state {
    int data;
    };

    /* Per-job slice descriptor for the test: fill data[from..to) with
     * each element's own index. */
    struct test_data {
    int *data; /* shared output array */
    int from;  /* first index (inclusive) */
    int to;    /* last index (exclusive) */
    };

    JOB_ENTRY_POINT(test_work)
    {
    int i;
    struct test_data *data = arg;
    for (i = data->from; i < data->to; ++i)
    data->data[i] = i;
    }

    /* Root job of the test: splits a 2048-element array into 8 slices,
     * runs one fill job per slice, waits for completion, then verifies.
     * Fixes: each job now receives &data[i] — the original passed &data
     * (the array's address) so every job aliased data[0] and only the
     * first slice was filled; and all 8 jobs are queued — only 4 were,
     * leaving n[1024..2047] uninitialized while the loop below asserts
     * the full range. */
    JOB_ENTRY_POINT(root)
    {
        struct game_state *game = arg;
        job_counter counter = 0;
        struct job jobs[8];
        struct test_data data[8];
        int i, n[2*1024];
        (void)game;

        printf("root\n");
        for (i = 0; i < 8; ++i) {
            data[i].data = n;
            data[i].from = i * 256;
            data[i].to = (i+1)*256;
            jobs[i] = Job(test_work, &data[i]);
        }

        scheduler_run(sched, JOB_QUEUE_HIGH, jobs, 8, &counter);
        printf("run\n");
        scheduler_wait_for(sched, &counter, 0);
        for (i = 0; i < 2*1024; ++i)
            assert(n[i] == i);
        printf("done\n");
    }

    /* Entry point: build a scheduler with one worker per online CPU,
     * submit the root job, and wait on the calling thread (which doubles
     * as the last worker) until it completes. */
    int main(int argc, char **argv)
    {
    struct game_state app;
    struct job job = Job(root, &app);
    /* counter is written by scheduler_run before workers touch it */
    job_counter counter;

    /* start root process */
    struct scheduler sched;
    size_t thread_count = (size_t)sysconf(_SC_NPROCESSORS_ONLN);
    sched_init(&sched, thread_count);
    printf("init\n");
    scheduler_run(&sched, JOB_QUEUE_HIGH, &job, 1, &counter);
    printf("run\n");
    scheduler_wait_for(&sched, &counter, 0);
    printf("finished\n");
    return 0;
    }