Skip to content

Instantly share code, notes, and snippets.

@Globik
Created May 8, 2018 23:47
Show Gist options
  • Save Globik/80fc3c9b83877a76c46e61ed1ccc78dd to your computer and use it in GitHub Desktop.
Save Globik/80fc3c9b83877a76c46e61ed1ccc78dd to your computer and use it in GitHub Desktop.

Revisions

  1. Globik created this gist May 8, 2018.
    408 changes: 408 additions & 0 deletions tasks.c
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,408 @@
    /*
    * Copyright (c) 2014 Joris Vink <[email protected]>
    *
    * Permission to use, copy, modify, and distribute this software for any
    * purpose with or without fee is hereby granted, provided that the above
    * copyright notice and this permission notice appear in all copies.
    *
    * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
    * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
    * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
    * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
    * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
    * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
    * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
    */

    #include <sys/param.h>
    #include <sys/queue.h>
    #include <sys/socket.h>

    #include <pthread.h>
    #include <stdio.h>
    #include <inttypes.h>
    #include <stdlib.h>

    #include "kore.h"
    #include "http.h"
    #include "tasks.h"

    static u_int8_t threads;
    static TAILQ_HEAD(, kore_task_thread) task_threads;

    u_int16_t kore_task_threads = KORE_TASK_THREADS;

    static void *task_thread(void *);
    static void task_channel_read(int, void *, u_int32_t);
    static void task_channel_write(int, void *, u_int32_t);
    // i hardcoded additional parameter to the spawn
    // if insert to the task_threads or not. In the future TODO this in creation function
    // and else an additional field to the struct 'kore_task'
    static void task_thread_spawn(struct kore_task_thread **, int);

    int suka=1;

    #define THREAD_FD_ASSIGN(t, f, i, o) \
    do { \
    if (pthread_self() == t) { \
    f = i; \
    } else { \
    f = o; \
    } \
    } while (0);

    void
    kore_task_init(void)
    {
    printf("kore_task_init\n");
    threads = 0;
    TAILQ_INIT(&task_threads);
    }

    void
    kore_task_create(struct kore_task *t, int (*entry)(struct kore_task *)/*, IT WOULD BE NICE HERE to have the dummy_flag integer */)
    {
    t->cb = NULL;
    // t->dummy_flag = dummy_flag
    #if !defined(KORE_NO_HTTP)
    t->req = NULL;
    #endif
    t->entry = entry;
    t->type = KORE_TYPE_TASK;
    t->state = KORE_TASK_STATE_CREATED;
    pthread_rwlock_init(&(t->lock), NULL);

    if (socketpair(AF_UNIX, SOCK_STREAM, 0, t->fds) == -1)
    fatal("kore_task_create: socketpair() %s", errno_s);
    }

    void
    kore_task_run(struct kore_task *t)
    {
    struct kore_task_thread *tt;
    // hardcoded to t->cb, but in the future TODO: add a standalone flag to additional parameter
    int dummy_flag=1;
    if(t->cb !=NULL){
    printf("TASK RUN: t->cb NOT NULL!!!!\n");
    // reserving a long running loop. TODO: add to callback additional parameter (flag)
    // if we are using one reserved thread or NOT
    // something like this:
    // void kore_task_create(struct kore_task *t, int (*entry)(struct kore_task *), int dummy_flag)
    // and then assign t->dummy_flag=1 or 0; long running task means not insert into task_threads;
    // not long running task means insert into task_threads
    dummy_flag=0;
    }

    kore_platform_schedule_read(t->fds[0], t);
    if (threads < kore_task_threads) {
    printf("task_thread_spawn() will lock tt->lock for us.\n");
    task_thread_spawn(&tt, dummy_flag);
    dummy_flag=1;
    } else {
    printf(" Cycle task around. \n");
    if ((tt = TAILQ_FIRST(&task_threads)) == NULL)
    fatal("no available tasks threads?");printf("no available tasks?");
    pthread_mutex_lock(&(tt->lock));
    TAILQ_REMOVE(&task_threads, tt, list);
    TAILQ_INSERT_TAIL(&task_threads, tt, list);
    }

    t->thread = tt;
    printf("insert tail\n");
    TAILQ_INSERT_TAIL(&(tt->tasks), t, list);

    pthread_mutex_unlock(&(tt->lock));
    pthread_cond_signal(&(tt->cond));
    }

    #if !defined(KORE_NO_HTTP)
    void
    kore_task_bind_request(struct kore_task *t, struct http_request *req)
    {
    kore_debug("kore_task_bind_request: %p bound to %p", req, t);
    printf("kore_task_bind_request: %p bound to %p\n", (void*)req, (void*)t);

    if (t->cb != NULL)
    fatal("cannot bind cbs and requests at the same time");

    t->req = req;
    LIST_INSERT_HEAD(&(req->tasks), t, rlist);

    http_request_sleep(req);
    }
    #endif

    void
    kore_task_bind_callback(struct kore_task *t, void (*cb)(struct kore_task *))
    {
    #if !defined(KORE_NO_HTTP)
    if (t->req != NULL)
    fatal("cannot bind requests and cbs at the same time");
    #endif
    t->cb = cb;
    }

    void
    kore_task_destroy(struct kore_task *t)
    {
    kore_debug("kore_task_destroy: %p", t);
    printf("kore_task_destroy: %p\n", (void*)t);
    #if !defined(KORE_NO_HTTP)
    if (t->req != NULL) {
    printf("t->req is NOT NULL in rlist\n");
    t->req = NULL;
    LIST_REMOVE(t, rlist);
    }else{printf("t->req is NULL\n");}
    #endif

    pthread_rwlock_wrlock(&(t->lock));

    if (t->fds[0] != -1) {
    printf("closing t->fds[0]\n");
    (void)close(t->fds[0]);
    t->fds[0] = -1;
    }

    if (t->fds[1] != -1) {
    (void)close(t->fds[1]);
    t->fds[1] = -1;
    }

    pthread_rwlock_unlock(&(t->lock));
    pthread_rwlock_destroy(&(t->lock));
    }

    int
    kore_task_finished(struct kore_task *t)
    {
    return ((kore_task_state(t) == KORE_TASK_STATE_FINISHED));
    }

    void
    kore_task_finish(struct kore_task *t)
    {
    kore_debug("kore_task_finished: %p (%d)", t, t->result);
    printf("kore_task_finished: %p (%d)\n", (void*)t, t->result);

    pthread_rwlock_wrlock(&(t->lock));

    if (t->fds[1] != -1) {
    (void)close(t->fds[1]);
    t->fds[1] = -1;
    }else{printf("t->fds[1] not -1\n");}

    pthread_rwlock_unlock(&(t->lock));
    }

    void
    kore_task_channel_write(struct kore_task *t, void *data, u_int32_t len)
    {
    int fd;

    kore_debug("kore_task_channel_write: %p <- %p (%ld)", t, data, len);
    printf("kore_task_channel_write: %p <- %p %" PRIu32 "\n", (void*)t, data, len);
    THREAD_FD_ASSIGN(t->thread->tid, fd, t->fds[1], t->fds[0]);
    task_channel_write(fd, &len, sizeof(len));
    task_channel_write(fd, data, len);
    }

    u_int32_t
    kore_task_channel_read(struct kore_task *t, void *out, u_int32_t len)
    {
    int fd;
    u_int32_t dlen, bytes;

    kore_debug("kore_task_channel_read: %p -> %p (%ld)", t, out, len);
    printf("kore_task_channel_read: %p -> %p %" PRIu32 "\n", (void*)t, out,len);

    THREAD_FD_ASSIGN(t->thread->tid, fd, t->fds[1], t->fds[0]);
    task_channel_read(fd, &dlen, sizeof(dlen));

    if (dlen > len)
    bytes = len;
    else
    bytes = dlen;

    task_channel_read(fd, out, bytes);

    return (dlen);
    }

    void
    kore_task_handle(struct kore_task *t, int finished)
    {
    kore_debug("kore_task_handle: %p, %d", t, finished);
    //printf("kore_task_handle: %p, %d\n", (void*)t, finished);

    #if !defined(KORE_NO_HTTP)
    if (t->req != NULL)
    http_request_wakeup(t->req);
    #endif

    if (finished) {
    //printf("IS FINISHED!\n");
    kore_platform_disable_read(t->fds[0]);
    kore_task_set_state(t, KORE_TASK_STATE_FINISHED);
    #if !defined(KORE_NO_HTTP)
    if (t->req != NULL)
    {
    // printf("t->req is not null\n");
    if (t->req->flags & HTTP_REQUEST_DELETE)
    kore_task_destroy(t);
    }
    #endif
    }

    if (t->cb != NULL)
    t->cb(t);

    }

    int
    kore_task_state(struct kore_task *t)
    {
    int s;

    pthread_rwlock_rdlock(&(t->lock));
    s = t->state;
    pthread_rwlock_unlock(&(t->lock));

    return (s);
    }

    void
    kore_task_set_state(struct kore_task *t, int state)
    {
    pthread_rwlock_wrlock(&(t->lock));
    t->state = state;
    pthread_rwlock_unlock(&(t->lock));
    }

    int
    kore_task_result(struct kore_task *t)
    {
    int r;

    pthread_rwlock_rdlock(&(t->lock));
    r = t->result;
    pthread_rwlock_unlock(&(t->lock));

    return (r);
    }

    void
    kore_task_set_result(struct kore_task *t, int result)
    {
    pthread_rwlock_wrlock(&(t->lock));
    t->result = result;
    pthread_rwlock_unlock(&(t->lock));
    }

    static void
    task_channel_write(int fd, void *data, u_int32_t len)
    {
    ssize_t r;
    u_int8_t *d;
    u_int32_t offset;

    d = data;
    offset = 0;
    while (offset != len) {
    r = write(fd, d + offset, len - offset);
    if (r == -1 && errno == EINTR)
    continue;
    if (r == -1)
    fatal("task_channel_write: %s", errno_s);
    offset += r;
    }
    }

    static void
    task_channel_read(int fd, void *out, u_int32_t len)
    {
    ssize_t r;
    u_int8_t *d;
    u_int32_t offset;

    d = out;
    offset = 0;
    while (offset != len) {
    r = read(fd, d + offset, len - offset);
    if (r == -1 && errno == EINTR)
    continue;
    if (r == -1)
    fatal("task_channel_read: %s", errno_s);
    if (r == 0)
    fatal("task_channel_read: unexpected eof");

    offset += r;
    }
    }

    static void
    task_thread_spawn(struct kore_task_thread **out, int dummy_flag)
    {
    struct kore_task_thread *tt;

    tt = kore_malloc(sizeof(*tt));
    // reserving for me 1 thread, which don't supposed to be inserted in to the task_threads LIST!;
    // running "for ever for the third party LOOP!!!!! No matter what. GLIB, LIBUV etc etc etc!!!
    // For what you're asking me? The f. knows. For the third party libs. WebRTC, Janus Gateway etc.
    if(dummy_flag !=0){tt->idx = threads++;}

    printf("idx %d\n",tt->idx);
    TAILQ_INIT(&(tt->tasks));
    pthread_cond_init(&(tt->cond), NULL);
    pthread_mutex_init(&(tt->lock), NULL);
    pthread_mutex_lock(&(tt->lock));
    if(dummy_flag == 0){
    printf("aha - not inserting TAIL into the task_threads\n");
    }else{
    // inserting!
    TAILQ_INSERT_TAIL(&task_threads, tt, list);
    }

    if (pthread_create(&(tt->tid), NULL, task_thread, tt) != 0)
    fatal("pthread_create: %s", errno_s);

    *out = tt;
    }

    static void *
    task_thread(void *arg)
    {
    struct kore_task *t;
    struct kore_task_thread *tt = arg;

    kore_debug("task_thread: #%d starting", tt->idx);
    printf("task_thread: #%d starting\n", tt->idx);

    pthread_mutex_lock(&(tt->lock));

    for (;;) {
    if (TAILQ_EMPTY(&(tt->tasks)))

    pthread_cond_wait(&(tt->cond), &(tt->lock));

    kore_debug("task_thread#%d: woke up", tt->idx);
    printf("task_thread#%d: woke up\n", tt->idx);
    t = TAILQ_FIRST(&(tt->tasks));
    TAILQ_REMOVE(&(tt->tasks), t, list);

    pthread_mutex_unlock(&(tt->lock));

    kore_debug("task_thread#%d: executing %p", tt->idx, t);
    printf("task_thread#%d: executing %p\n", tt->idx,(void*) t);

    kore_task_set_state(t, KORE_TASK_STATE_RUNNING);
    kore_task_set_result(t, t->entry(t));
    kore_task_finish(t);

    pthread_mutex_lock(&(tt->lock));
    }

    pthread_exit(NULL);

    /* NOTREACHED */
    return (NULL);
    }