@native-m
Created July 6, 2020 17:07
ThreadPool.h
#include <thread>
#include <vector>
#include <functional>
#include <condition_variable>
#include <mutex>
#include <queue>
#include <tuple>
#include <memory>
#include <type_traits>
#include <utility>

class ThreadPool
{
public:
    using Invoker = std::function<void(void*)>;

    ThreadPool(size_t numThreads = std::thread::hardware_concurrency())
    {
        start(numThreads);
    }

    ~ThreadPool()
    {
        stop();
    }

    template<class Function, class... Args>
    void addTask(Function&& t, Args&&... args)
    {
        // Pack the callable and its arguments into a heap-allocated tuple; the
        // matching invoker unpacks it on a worker thread and frees it.
        using TupleType = std::tuple<std::decay_t<Function>, std::decay_t<Args>...>;

        {
            std::unique_lock<std::mutex> lock(m_locker);
            auto fnArgs = new TupleType(std::forward<Function>(t), std::forward<Args>(args)...);
            constexpr auto invoker = getInvoker<TupleType>(std::make_index_sequence<1 + sizeof...(Args)>{});
            m_taskQueue.emplace(std::pair<Invoker, void*>(invoker, static_cast<void*>(fnArgs)));
        }

        m_cv.notify_one();
    }

    // Not guaranteed to take effect immediately: workers still drain any
    // tasks already in the queue before exiting.
    void stopAllTasks()
    {
        stop();
    }

private:
    std::vector<std::thread> m_threads;
    std::mutex m_locker;
    std::condition_variable m_cv;
    std::queue<std::pair<Invoker, void*>> m_taskQueue;
    bool m_stop = false;

    void start(size_t numThreads)
    {
        for (size_t i = 0; i < numThreads; i++) {
            // Worker loop: wait for work, pop one task under the lock,
            // then run it with the lock released.
            auto task = [this] {
                while (true) {
                    Invoker t;
                    void* data = nullptr;

                    {
                        std::unique_lock<std::mutex> lock(m_locker);

                        m_cv.wait(lock, [&] {
                            return m_stop || !m_taskQueue.empty();
                        });

                        // Exit only after the queue has been drained.
                        if (m_stop && m_taskQueue.empty())
                            break;

                        std::pair<Invoker, void*>& p = m_taskQueue.front();
                        t = std::move(p.first);
                        data = p.second;
                        m_taskQueue.pop();
                    }

                    t(data);
                }
            };

            m_threads.push_back(std::thread(task));
        }
    }

    void stop()
    {
        // Set the stop flag under the lock, wake every worker, and wait for
        // them to finish the remaining tasks and exit.
        {
            std::unique_lock<std::mutex> lock(m_locker);
            m_stop = true;
        }

        m_cv.notify_all();

        for (auto& thread : m_threads) {
            if (thread.joinable()) {
                thread.join();
            }
        }
    }

    // Returns a plain function pointer specialized for the task's tuple type,
    // so the queue can store type-erased Invoker/void* pairs.
    template<class Tuple, size_t... Indices>
    static constexpr auto getInvoker(std::index_sequence<Indices...>)
    {
        return &invoker<Tuple, Indices...>;
    }

    // Takes ownership of the tuple allocated in addTask and invokes the
    // stored callable with the stored arguments.
    template<class Tuple, size_t... Indices>
    static void invoker(void* data)
    {
        const std::unique_ptr<Tuple> fn(static_cast<Tuple*>(data));
        Tuple& tp = *fn;
        std::invoke(std::move(std::get<Indices>(tp))...);
    }
};
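
A minimal usage sketch, not part of the original gist: it assumes the class above is saved as ThreadPool.h (the gist's filename); the main function, task lambda, and thread/task counts are illustrative only.

#include <cstdio>
#include "ThreadPool.h"

int main()
{
    ThreadPool pool(4); // four worker threads

    for (int i = 0; i < 8; i++) {
        // Arguments are decay-copied into the task's tuple, so passing the
        // loop index by value is safe even after the loop has moved on.
        pool.addTask([](int id) {
            std::printf("task %d running\n", id);
        }, i);
    }

    // ~ThreadPool() calls stop(), which waits for the queue to drain and
    // joins every worker before main() returns.
    return 0;
}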