// https://gist.github.com/markpapadakis/8dba5c480c13b12a056e (example)
// https://medium.com/@markpapadakis/high-performance-services-using-coroutines-ac8e9f54d727
//
// NOTE(review): the original header names were lost when this file was pasted (everything
// between '<' and '>' was stripped, leaving seven bare #include directives). The includes
// below cover what this translation unit visibly uses from the C/C++ standard library and
// POSIX; the Switch framework headers (switch_dlist, Print, strwlen32_t, CMD5, hex_fmt, ...)
// must be restored from the original gist.
#include <cassert>
#include <cstdint>
#include <cstdio>
#include <fcntl.h>
#include <unistd.h>

// Computed gotos for faster dispatch and lifted restrictions wrt to code that can be placed in switch {}
#define COROS_HAVE_COMPUTEDLABELSDISPATCH

class CorosScheduler;

// Caveats/gotchas:
// 1. you can't rely on the stack (stackless coroutines/actors), so you need to use coroutine members for persisting state
// e.g for (uint32_t i = 0; i != 10; ++i) { YieldCoro(); Print(i, "\n"); } will not work
// whereas if you have a member uint32_t i, then for (i = 0; i != 10; ++i) { YieldCoro(); Print(i, "\n"); } will work fine
//
// 2. If your coroutine calls a function/functor and it or another function in the call stack wants to yield, then the functor must
// be converted into a coroutine instead and scheduled via WaitCoro()
//
// 3. Because labels are used as return points, you can't create/initialize vars in BeginCoro() .. EndCoro() unless
// they are inside { }. This shouldn't be much of an issue.
//
// Benefits:
// Very lightweight coroutines. Because they are stackless, they can run on any thread, and it's fairly trivial to optimize CorosScheduler::Run() to
// e.g consider the type of coro and move it to a background thread (e.g for disk I/O), dequeuing from a global/per-thread coroutines queue (submitted work) etc
// Priorities allow for an interesting scheme; this is based on Linux Kernel O(1) scheduler implementation semantics.
//
// You can also use thread_local freelists of various coroutine instances, as opposed to deleting them when completed, if you
// create many 1000s/second and that becomes an issue. It's not implemented in this prototype, but it should be trivial to support that
// kind of reuse semantics.
struct coroutine { friend class CorosScheduler; using runres_t = coroutine *; #ifdef COROS_HAVE_COMPUTEDLABELSDISPATCH using resume_token_t = const void *; #else using resume_token_t = uint16_t; #endif struct coroutine { protected: switch_dlist schedulerList; resume_token_t resumeToken{0}; coroutine *parent{nullptr}; // 8 different prioerities, from 0(lowest) to 7(highest) // higher priority coros are executed first. // e.g no coros of priority 0 will run unless there are no more runnable coros of priority 1 or higher uint8_t prio{0}; #pragma mark BEGIN:Coroutiens API #ifdef COROS_HAVE_COMPUTEDLABELSDISPATCH // Place at the beginning of your operato() impl. #define BeginCoro() if (!resumeToken) { resumeToken = &&___coroEntry; } else { goto *resumeToken; } ___coroEntry: // Place at the end of your operator() impl. #define EndCoro() return 0 #define YieldCoroImpl(res) resumeToken = &&__macro_concat(___coroLabel, __LINE__); return (runres_t)(res); __macro_concat(___coroLabel, __LINE__): // Yield the coro, but next time it it's scheduled, it will be restarted as opposed to continue from where RestartCoro() has been invoked // This is e.g useful for when you have a coro for network I/O, and you want to restart from beginning whenever it runs #define RestartCoro() resumeToken = nullptr; return (runres_t)2 #else #define BeginCoro() switch (resumeToken) { case 0: #define EndCoro() } return 0 #define YieldCoroImpl(res) resumeToken = __LINE__; return (runres_t)(res); case __LINE__: #define RestartCoro() resumeToken = 0; return (runres_t)2 #endif // Exit coro - won't reschedule; will delete/free it #define ExitCoro() return 0 // Yield to antother(if any) runnable coro. 
// Will exit and return control back to the scheduler, which will place it back in runnable queues, but will // choose another coro to run #define YieldCoro() YieldCoroImpl(1) // Will yield, but will not be placed back in the runnable queue sto run again // Instead `c` will be scheduled and when it's done, then this coro will be placed back in the runnable queues to run again #define WaitCoro(c) YieldCoroImpl(c) #define WaitCoroWithPrio(c, p) YieldCoroImpl(coroutine::SetPrio(c, p)) // Will yield, but coro will not be placed in the runnable queues. It will not be deleted, and is // expected to be scheduled back again later. // Useful for when e.g you want to 'block' this thread until say a network event comes in in which case your network I/O logic matches it // with this coro and schedules it back in. Useful for rate, special-case workloads // WaitCoroWithPrio() is a handy macro for also setting priority to coro #define FreezeCoro() YieldCoroImpl(3) // You may want to designate other scalar/constants as return values for YieldCoroImpl(). e.g 5 for schedule to bg thread, or 10 for reschedule again after 1 minute, etc. #pragma mark END:Coroutines API public: coroutine(void) { switch_dlist_init(&schedulerList); } virtual ~coroutine(void) { } virtual runres_t operator()(void) = 0; _ALWAYS_inline_ auto Priority(void) -> uint8_t { return prio; } void SetPriority(const uint8_t p) { assert(p < 8); prio = p; } // See: WaitCoroWithPrio() static _ALWAYS_inline_ coroutine *SetPrio(coroutine *const c, const uint8_t p) { c->SetPriority(p); return c; } }; // An example scheduler implementation // For an alternative impl. see Run() comments // A more sophisticated scheduler would run runnable coros, and also dequeue from a coros queue submitted by other threads, // support delayed re-scheduling, etc. 
// // The overhead compared to not using coros is <= 1us class CorosScheduler { private: switch_dlist corosList[8]; // Multiple runnable queues, for each supported prio uint32_t blockedCoros{0}; // How many coros are blocked(created, not runnable) waiting for a child coro to complete uint8_t runnableMask{0}; private: void ScheduleCoro(coroutine *const c) { const auto p = c->Priority(); if (unlikely(p >= 8)) { // Low-priority; move to another background thread(e.g disk I/O operation) // IMPLEMENT_ME } else { switch_dlist_insert_before(&corosList[p], &c->schedulerList); runnableMask|=(1U<schedulerList); if (switch_dlist_isempty(&l)) { // This runnable queue is now empty runnableMask&=~(1U << p); } return coro; } } void FreeCoro(coroutine *const coro) { // TODO: maintain thread_local freeList for coro->Type() delete coro; } void RunCoro(coroutine *const coro) { const auto r = (*coro)(); switch ((uintptr_t)r) { case 0: // Coro has endeded if (coro->parent) { // Has a parent waiting for this coro's completion to resume --blockedCoros; ScheduleCoro(coro->parent); } FreeCoro(coro); break; case 1: // Yielded, place back in runnable, but choose another coro now, if available // See YieldCoro() ScheduleCoro(coro); break; case 2: // Finished, but it wants to run again as soon as it can // This can be useful, though not sure how yet;) // See RestartCoro() ScheduleCoro(coro); break; case 3: // Co-ro is frozen. That is, its not runnable, but we won't delete it // The idea is that another thread or another coro will eventually make it runnable again later // This is for some edge-cases where you need to freeze the coro, e.g run soemthing on another thread(not as a coro) and then // notify the thread scheduler to schedule it back in // See FreezeCoro() break; default: // Coro is waiting for another coro, setup parenthood and schedule it (potentially to another thread?) 
// See WaitCoro() ++blockedCoros; r->parent = coro; r->resumeToken = 0; ScheduleCoro(r); break; } } bool RunNextRunnable(void) { if (auto *const coro = NextRunnable()) { RunCoro(coro); return true; } else return false; } inline bool AnyRunnable(void) const { return runnableMask; } inline auto Blocked(void) const -> uint32_t { return blockedCoros; } public: CorosScheduler(void) { for (auto &it : corosList) switch_dlist_init(&it); } static void ScheduleInThreadScheduler(coroutine *const c) { // Schedule in current thread scheduler // TODO: implement me } void Schedule(coroutine *const c, const uint8_t prio = 0) { ScheduleCoro(c); c->SetPriority(prio); c->resumeToken = 0; } // `c` ran on another thread and now it's done, and that thred handed it off back to us // We need to check if it has a parent, and if it does, make it runnable again(was waiting for `c`) void ProcessCompletedInAnotherThread(coroutine *const c) { if (auto *const parent = c->parent) { // submit into e.g thread-specific MPSQ queue to be dequeued later by e.g TryDequeSubmittedCoro() or DequeueSubmittedCoro() ScheduleCoroUnsafe(parent); } delete c; } virtual void Run(void) { // An alternative implementation of this method would // also try dequeing from a thread-specific tasks queue, or a global queue, or whatever else // e.g // // for (;;) // { // (void)RunNextRunnable(); // // if (AnyRunnable()) // { // // At least one runnable, don't block waiting for external work // if (auto *const c = TryDequeSubmittedCoro()) // ScheduleCoro(c); // } // else // { // // No runnables, block waiting for external work if needed // ScheduleCoro(DequeueSubmittedCoro()); // } //} // See also: https://gist.github.com/markpapadakis/8dba5c480c13b12a056e // // We could also implement network I/O poll as another coro with the lowest priority which // returns control to the scheduler with RestartCoro(). 
See example in this file // // This is optimal for services that accept and manage connections I/O and also execute their requests while (RunNextRunnable()) { } } }; // https://twitter.com/ID_AA_Carmack/status/575788622554628096 static CorosScheduler TheScheduler; struct singer_coro : public coroutine { resume_res_t operator()(void) override { BeginCoro(); Print("Singing\n"); EndCoro(); } }; struct diskreader_coro : public coroutine { strwlen32_t *out; // dummy diskreader_coro(strwlen32_t *const o) : out(o) { } resume_res_t operator()(void) override { BeginCoro(); out->Set(_S("Hello World")); EndCoro(); } }; struct dancer_coro : public coroutine { strwlen32_t localBuf; uint32_t i; resume_res_t operator()(void) override { BeginCoro(); Print("Dancing\n"); YieldCoro(); Print("Did Sing!\n"); // Get some data into localBuf // maybe this would block accessing the disk or whatever else. // WaitCoro() will put this coro to sleep, waiting until another coro runs and then // it's made runnable again WaitCoro(new diskreader_coro(&localBuf)); Print("Got:", localBuf, "\n"); for (i = 0; i != 10; ++i) { Print("i = ", i, "\n"); if (i == 5) ExitCoro(); else YieldCoro(); } EndCoro(); } }; struct reader_coro : public coroutine { int fd; const uint64_t offset, len; void *const buf; reader_coro(int _fd, void *const _buf, const uint64_t _offset, const uint64_t _len) : fd(_fd), offset(_offset), len(_len), buf(_buf) { } resume_res_t operator()(void) override { BeginCoro(); (void)pread64(fd, buf, len, offset); EndCoro(); } }; // A simple MD5 checksum coro struct task_coroutine : public coroutine { const char *const path; uint64_t fileSize, offset, upto, span; CMD5 md5Factory; uint8_t buf[1024]; int fd; task_coroutine(const char *const fullPath) : path{fullPath} { } resume_res_t operator()(void) override { BeginCoro(); fd = open(path, O_RDONLY); assert(fd != -1); fileSize = lseek64(fd, 0, SEEK_END); md5Factory.Init(); for (offset = 0; offset != fileSize; ) { upto = Min(fileSize, offset + 
1024); span = upto - offset; // If we did have pread2v(), we could attempt read, and if it failed with EAGAIN, we 'd // use WaitCor() which would (based on scheduler semantics) schedule it on another background thread // see: https://lwn.net/Articles/612483/ WaitCoro(new reader_coro(fd, buf, offset, span)); md5Factory.Update(buf, span); offset = upto; } (void)close(fd); uint8_t digest[16]; md5Factory.Finalize(digest); Print(hex_fmt(digest, 16), "\n"); EndCoro(); } }; struct fetchcf_coro : public coroutine { }; int main(int argc, char *argv[]) { #if 0 TheScheduler.Schedule(new dancer_coro()); TheScheduler.Schedule(new callable_coro( [](void) { Print("Hello World\n"); })); TheScheduler.Schedule(new singer_coro()); #endif TheScheduler.Schedule(new task_coroutine("/etc/passwd")); TheScheduler.Run(); return 0; }