
@danikin
Last active January 30, 2019 03:24

Revisions

  1. danikin revised this gist Mar 22, 2016. 1 changed file with 2 additions and 2 deletions.
    4 changes: 2 additions & 2 deletions tar_async.cpp
    @@ -78,11 +78,11 @@ class TarantoolConnection
    tnt_replace(tnt_, 512, tuple);
    ++j;

    tnt_flush(tnt_);
    // tnt_flush(tnt_);
    tnt_object_reset(tuple);
    }

    // tnt_flush(tnt_);
    tnt_flush(tnt_);
    // tnt_object_reset(tuple);

    // Notify the receiving thread that it can start receiving data
  2. danikin revised this gist Mar 22, 2016. 1 changed file with 68 additions and 67 deletions.
    135 changes: 68 additions & 67 deletions tar_async.cpp
    @@ -13,6 +13,8 @@
    #include <deque>
    #include <thread>
    #include <mutex>
    #include <condition_variable>
    #include <cassert>

    // An object of this class can be used to connect to tarantool
    // and then to issue requests on one connection from different threads.
    @@ -47,52 +49,47 @@ class TarantoolConnection
    // Constantly sending everything from the out queue
    while (true)
    {
    // Check out_queue outside of the mutex for performance reasons
    // I should use interlocks here, but this is only a POC alpha version
    if (!out_queue_.empty())
    {
    std::deque<tnt_stream*> temp;

    {
    std::unique_lock<std::mutex> l(out_mutex_);

    // Waiting while out_queue is empty
    // out_cond_ is signaled once the queue becomes non-empty
    while (out_queue_.empty())
    out_cond_.wait(l);

    // Copy the whole queue into a temporary one
    // We're doing that under the mutex
    // In fact this is extremely fast because it is just a swap operation
    std::deque<tnt_stream*> temp;
    {
    std::lock_guard<std::mutex> l(out_mutex_);
    temp.swap(out_queue_);
    }
    temp.swap(out_queue_);
    }

    // If queue is empty then just wait
    if (temp.empty())
    {
    usleep(1000);
    continue;
    }
    else
    {
    //printf("sending %d requests\n", temp.size());

    // Now we don't need a mutex. Just send everything over the network
    for (auto i = temp.begin(); i != temp.end(); ++i)
    {
    tnt_object_add_array(tuple, 2);
    tnt_object_add_int(tuple, j);
    tnt_object_add_int(tuple, j);
    tnt_replace(tnt_, 512, tuple);
    ++j;
    tnt_flush(tnt_);
    tnt_object_reset(tuple);

    }
    assert(!temp.empty());

    // printf("sending %d requests\n", temp.size());

    // Now we don't need a mutex. Just send everything over the network
    for (auto i = temp.begin(); i != temp.end(); ++i)
    {
    tnt_object_add_array(tuple, 2);
    tnt_object_add_int(tuple, j);
    tnt_object_add_int(tuple, j);
    tnt_replace(tnt_, 512, tuple);
    ++j;

    tnt_flush(tnt_);
    tnt_object_reset(tuple);
    }

    }
    }
    else
    // Wait a little bit for new requests in the out_queue
    // Use usleep for now instead of a condition variable
    // Yeah, I know, this is shit :-)
    usleep(1000);
    // tnt_flush(tnt_);
    // tnt_object_reset(tuple);

    // Notify the receiving thread that it can start receiving data
    // If we did not, it would spin in active waiting, because read_reply would return 1
    send_notify_cond_.notify_all();


    }
    } // while
    }

    void ReceivingThread()
    @@ -138,16 +135,26 @@ class TarantoolConnection
    std::lock_guard<std::mutex> l(in_mutex_);

    in_queue_.insert(in_queue_.end(), temp.begin(), temp.end());
    }

    // We're notifying all the threads, but it would be better to notify only those of them
    // that wait for the data in the just-received "temp"
    in_cond_.notify_all();
    }
    // We ain't got any result
    else
    {
    if (r == -1)
    fprintf(stderr, "Error receiving response: r=%d, '%s'\n", r, reply.error);
    else
    // No result - just wait a little bit
    // No result - just wait until it comes in
    if (r == 1)
    usleep(1000);
    {

    // Waiting for the send thread to send data to the network
    // We have to wait because right now we don't have any data to read from the network
    std::unique_lock<std::mutex> l(send_notify_mutex_);
    send_notify_cond_.wait(l);
    }
    }
    }
    }
    @@ -173,7 +180,8 @@ class TarantoolConnection
    std::deque<struct tnt_reply*> in_queue_;
    std::deque<tnt_stream*> out_queue_;

    std::mutex in_mutex_, out_mutex_;
    std::mutex in_mutex_, out_mutex_, send_notify_mutex_;
    std::condition_variable in_cond_, out_cond_, send_notify_cond_;

    std::thread send_thread_, receive_thread_, timer_thread_;

    @@ -216,40 +224,33 @@ int TarantoolConnection::DoQuery(struct tnt_stream *tuple, struct tnt_reply **re
    std::lock_guard<std::mutex> l(out_mutex_);

    out_queue_.push_back(tuple);

    // Notify the sending thread that it can send data
    out_cond_.notify_all();
    }

    // Now this query should be processed in a background sending thread
    // Now this query should be processed in a background sending thread

    // Waiting for a response which will come in from a background receiving thread
    while (true)
    {
    // Check in_queue outside of the mutex for performance reasons
    // I should use interlocks here, but this is only a POC alpha version
    if (!in_queue_.empty())
    {
    // There is something in queue - get this answer to the client
    std::lock_guard<std::mutex> l(in_mutex_);

    // Check it again
    if (in_queue_.empty())
    continue;
    // There is something in queue - get this answer to the client
    std::unique_lock<std::mutex> l(in_mutex_);

    {
    // Wait until we have data in the in_queue
    while (in_queue_.empty())
    in_cond_.wait(l);

    assert(!in_queue_.empty());

    *result = in_queue_.front();
    in_queue_.pop_front();

    ++num_answers_;
    }

    // if (!(num_answers_%100000))
    // printf("num_answers_=%d\n", num_answers_);

    return 1;
    }
    else
    // Wait a little bit for new responses in the in_queue
    // Use usleep for now instead of a condition variable
    // Yeah, I know, this is shit :-)
    usleep(1000);
    }
    return 1;
    }

    void DoTest(TarantoolConnection *conn)
    @@ -267,7 +268,7 @@ int main()
    return 0;

    std::thread t[1000];
    for (int i = 0;i < 500;++i)
    for (int i = 0;i < 1000;++i)
    t[i] = std::thread(&DoTest, &conn);

    sleep(1000000);
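
    The substance of the revision above is the switch from usleep() polling to condition variables on both sides of the queues. Distilled away from the Tarantool calls, the send-side pattern it adopts is roughly the following sketch (illustrative only, not part of the gist; the member names mirror those in the diff):

    #include <condition_variable>
    #include <deque>
    #include <mutex>

    std::deque<int> out_queue_;            // stands in for std::deque<tnt_stream*>
    std::mutex out_mutex_;
    std::condition_variable out_cond_;

    // Producer side (DoQuery): enqueue under the mutex, then wake the sender
    void Enqueue(int request)
    {
        {
            std::lock_guard<std::mutex> l(out_mutex_);
            out_queue_.push_back(request);
        }
        out_cond_.notify_all();
    }

    // Consumer side (SendingThread): wait until the queue is non-empty,
    // swap it out in O(1) under the mutex, then drain it with no lock held
    void SendLoop()
    {
        while (true)
        {
            std::deque<int> temp;
            {
                std::unique_lock<std::mutex> l(out_mutex_);
                out_cond_.wait(l, [] { return !out_queue_.empty(); });
                temp.swap(out_queue_);
            }
            for (int request : temp)
                (void)request;             // the gist builds a tuple and calls tnt_replace() here
        }
    }

    The diff's version calls notify_all() while still holding out_mutex_; notifying after releasing the lock, as above, is an equally valid variant.
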
  3. danikin revised this gist Mar 21, 2016. 1 changed file with 22 additions and 20 deletions.
    42 changes: 22 additions & 20 deletions tar_async.cpp
    @@ -67,19 +67,24 @@ class TarantoolConnection
    continue;
    }
    else
    // Now we don't need a mutex. Just send everything over the network
    for (auto i = temp.begin(); i != temp.end(); ++i)
    {
    tnt_object_add_array(tuple, 2);
    tnt_object_add_int(tuple, j);
    tnt_object_add_int(tuple, j);
    tnt_replace(tnt_, 512, tuple);
    tnt_flush(tnt_);

    tnt_object_reset(tuple);
    {
    //printf("sending %d requests\n", temp.size());

    // Now we don't need a mutex. Just send everything over the network
    for (auto i = temp.begin(); i != temp.end(); ++i)
    {
    tnt_object_add_array(tuple, 2);
    tnt_object_add_int(tuple, j);
    tnt_object_add_int(tuple, j);
    tnt_replace(tnt_, 512, tuple);
    ++j;
    tnt_flush(tnt_);
    tnt_object_reset(tuple);

    }

    }
    }
    }
    else
    // Wait a little bit for new requests in the out_queue
    // Use usleep for now instead of a condition variable
    @@ -95,9 +100,9 @@ class TarantoolConnection
    struct tnt_reply reply;
    tnt_reply_init(&reply);

    // Constantly receiving everything and putting it to the in queue
    while (true)
    {
    // Constantly receiving everything and putting it to the in queue
    while (true)
    {
    // Receive everything that we can receive
    std::deque<struct tnt_reply*> temp;
    /*while (true)
    @@ -111,10 +116,10 @@ class TarantoolConnection
    }*/


    // Receive as much as we can but no more than 100
    // Receive as much as we can but no more than 10000
    int r = 0;
    int counter;
    for (counter = 0; counter < 1000; ++counter)
    for (counter = 0; counter < 10000; ++counter)
    {
    r = tnt_->read_reply(tnt_, &reply);

    @@ -144,7 +149,7 @@ class TarantoolConnection
    if (r == 1)
    usleep(1000);
    }
    }
    }
    }

    void TimerThread()
    @@ -227,10 +232,7 @@ int TarantoolConnection::DoQuery(struct tnt_stream *tuple, struct tnt_reply **re

    // Check it again
    if (in_queue_.empty())
    {
    usleep(1000);
    continue;
    }

    *result = in_queue_.front();
    in_queue_.pop_front();
  4. danikin revised this gist Mar 21, 2016. 1 changed file with 27 additions and 13 deletions.
    40 changes: 27 additions & 13 deletions tar_async.cpp
    @@ -51,16 +51,23 @@ class TarantoolConnection
    // I should use interlocks here, but this is only a POC alpha version
    if (!out_queue_.empty())
    {
    // Copy the whole queue into a temporary one
    // We're doing that under the mutex
    // In fact this is extremely fast because it is just a swap operation
    std::deque<tnt_stream*> temp;
    {
    std::lock_guard<std::mutex> l(out_mutex_);
    temp.swap(out_queue_);
    }
    // Copy the whole queue into a temporary one
    // We're doing that under the mutex
    // In fact this is extremely fast because it is just a swap operation
    std::deque<tnt_stream*> temp;
    {
    std::lock_guard<std::mutex> l(out_mutex_);
    temp.swap(out_queue_);
    }

    // Now we don't need a mutex. Just send everything over the network
    // If queue is empty then just wait
    if (temp.empty())
    {
    usleep(1000);
    continue;
    }
    else
    // Now we don't need a mutex. Just send everything over the network
    for (auto i = temp.begin(); i != temp.end(); ++i)
    {
    tnt_object_add_array(tuple, 2);
    @@ -215,11 +222,18 @@ int TarantoolConnection::DoQuery(struct tnt_stream *tuple, struct tnt_reply **re
    // I should use interlocks here, but this is only a POC alpha version
    if (!in_queue_.empty())
    {
    // There is something in queue - get this answer to the client
    std::lock_guard<std::mutex> l(in_mutex_);
    // There is something in queue - get this answer to the client
    std::lock_guard<std::mutex> l(in_mutex_);

    *result = in_queue_.front();
    in_queue_.pop_front();
    // Check it again
    if (in_queue_.empty())
    {
    usleep(1000);
    continue;
    }

    *result = in_queue_.front();
    in_queue_.pop_front();

    ++num_answers_;

  5. danikin revised this gist Mar 21, 2016. 1 changed file with 151 additions and 24 deletions.
    175 changes: 151 additions & 24 deletions tar_async.cpp
    @@ -10,7 +10,9 @@
    #include <string.h>
    #include <unistd.h>

    #include <dequeue>
    #include <deque>
    #include <thread>
    #include <mutex>

    // An object of this class can be used to connect to tarantool
    // and then to issue requests on one connection from different threads.
    @@ -28,14 +30,20 @@ class TarantoolConnection

    // conn_string - "IP:port"
    TarantoolConnection(const char *conn_string);


    bool IsConnected() { return connected_; }

    // Sends the specified query to the server, then receives a response
    int DoQuery(struct tnt_stream *tuple, struct tnt_reply &result);
    int DoQuery(struct tnt_stream *tuple, struct tnt_reply **result);

    private:

    void SendingThread()
    {
    int j = (int)time_t(NULL);

    struct tnt_stream *tuple = tnt_object(NULL);

    // Constantly sending everything from the out queue
    while (true)
    {
    @@ -46,15 +54,24 @@ class TarantoolConnection
    // Copy the whole queue into a temporary one
    // We're doing that under the mutex
    // In fact this is extremely fast because it is just a swap operation
    std::dequeue<tnt_stream*> temp;
    std::deque<tnt_stream*> temp;
    {
    std::scoped_lock l(out_queue_);
    std::lock_guard<std::mutex> l(out_mutex_);
    temp.swap(out_queue_);
    }

    // Now we don't need a mutex. Just send everything over the network
    for (auto i = temp.begin(); i != temp.end(); ++i)
    tnt_flush(*i);
    for (auto i = temp.begin(); i != temp.end(); ++i)
    {
    tnt_object_add_array(tuple, 2);
    tnt_object_add_int(tuple, j);
    tnt_object_add_int(tuple, j);
    tnt_replace(tnt_, 512, tuple);
    tnt_flush(tnt_);

    tnt_object_reset(tuple);

    }
    }
    else
    // Wait a little bit for new requests in the out_queue
    @@ -68,41 +85,123 @@ class TarantoolConnection

    void ReceivingThread()
    {
    struct tnt_reply reply;
    tnt_reply_init(&reply);

    // Constantly receiving everything and putting it to the in queue
    while (true)
    {
    // Receive everything that we can receive
    std::dequeue<tnt_reply*> temp;
    while (true)
    std::deque<struct tnt_reply*> temp;
    /*while (true)
    {
    struct tnt_reply *reply = tnt_reply_init(NULL);
    int r = tnt->read_reply(tnt, reply);
    int r = tnt_->read_reply(tnt_, reply);
    if (!r)
    break;
    temp.push_back(reply);
    }

    // Put it to the in queue under the mutex
    std::scoped_lock l(in_mutex_);
    }*/


    // Receive as much as we can but no more than 100
    int r = 0;
    int counter;
    for (counter = 0; counter < 1000; ++counter)
    {
    r = tnt_->read_reply(tnt_, &reply);

    if (r == 0)
    temp.push_back(NULL);
    else
    break;
    }

    // printf("counter=%d\n", counter);

    // We got the result
    if (!temp.empty())
    {
    // Put it to the in queue under the mutex
    std::lock_guard<std::mutex> l(in_mutex_);

    in_queue_ += temp;
    }
    in_queue_.insert(in_queue_.end(), temp.begin(), temp.end());
    }
    // We ain't got any result
    else
    {
    if (r == -1)
    fprintf(stderr, "Error receiving response: r=%d, '%s'\n", r, reply.error);
    else
    // No result - just wait a little bit
    if (r == 1)
    usleep(1000);
    }
    }
    }

    void TimerThread()
    {
    int64_t prev_answers = 0, rps;
    while (true)
    {
    sleep(1);

    {
    std::lock_guard<std::mutex> l(in_mutex_);
    rps = num_answers_ - prev_answers;
    prev_answers = num_answers_;
    }

    printf("RPS=%d\n", rps);
    fflush(stdout);
    }
    }

    std::dequeue<tnt_reply*> in_queue_;
    std::dequeue<tnt_stream*> out_queue_;
    std::deque<struct tnt_reply*> in_queue_;
    std::deque<tnt_stream*> out_queue_;

    std::mutex in_mutex_, out_mutex_;

    std::thread send_thread_, receive_thread_, timer_thread_;

    struct tnt_stream *tnt_;

    bool connected_;

    int64_t num_answers_;
    };

    int TarantoolConnection::DoQuery(struct tnt_stream *tuple, struct tnt_reply &result)
    TarantoolConnection::TarantoolConnection(const char *conn_string)
    {
    num_answers_ = 0;
    connected_ = false;

    tnt_ = tnt_net(NULL);
    tnt_set(tnt_, TNT_OPT_URI, conn_string);

    if (tnt_connect(tnt_) < 0)
    {
    fprintf(stderr, "Connection refused on '%s'\n", conn_string);
    fflush(stderr);
    return;
    }

    connected_ = true;

    // Starting threads
    send_thread_ = std::thread( [=] { SendingThread(); });
    receive_thread_ = std::thread( [=] { ReceivingThread(); });
    timer_thread_ = std::thread( [=] { TimerThread(); });
    }

    int TarantoolConnection::DoQuery(struct tnt_stream *tuple, struct tnt_reply **result)
    {
    // Put the query into outgoing queue

    // Protect this queue from multithreading access
    {
    std::scoped_lock l(out_mutex_);
    std::lock_guard<std::mutex> l(out_mutex_);

    out_queue_.push_back(tuple);
    }
    @@ -117,18 +216,46 @@ int TarantoolConnection::DoQuery(struct tnt_stream *tuple, struct tnt_reply &res
    if (!in_queue_.empty())
    {
    // There is something in queue - get this answer to the client
    std::scoped_lock l(in_mutex_);
    std::lock_guard<std::mutex> l(in_mutex_);

    result = in_queue_.front();
    *result = in_queue_.front();
    in_queue_.pop_front();

    ++num_answers_;

    // if (!(num_answers_%100000))
    // printf("num_answers_=%d\n", num_answers_);

    return 1;
    }
    else
    // Wait a little bit for new responses in the in_queue
    // Use usleep for now instead of a condition variable
    // Yeah, I know, this is shit :-)
    usleep(1000);
    }
    }
    }

    void DoTest(TarantoolConnection *conn)
    {
    struct tnt_reply *result;
    for (int i = 0; i < 100000000; ++i)
    conn->DoQuery(NULL, &result);
    }

    int main()
    {
    TarantoolConnection conn("172.31.26.200:3301");

    if (!conn.IsConnected())
    return 0;

    std::thread t[1000];
    for (int i = 0;i < 500;++i)
    t[i] = std::thread(&DoTest, &conn);

    sleep(1000000);

    return 0;
    }

  6. danikin revised this gist Mar 21, 2016. 1 changed file with 122 additions and 0 deletions.
    122 changes: 122 additions & 0 deletions tar_async.cpp
    @@ -10,3 +10,125 @@
    #include <string.h>
    #include <unistd.h>

    #include <dequeue>

    // An object of this class can be used to connect to tarantool
    // and then to issue requests on one connection from different threads.
    //
    // The key thing about this class is that one object is enough to serve all the workload
    // from a single machine, because Tarantool allows using one socket in parallel for different
    // requests.
    // Plus, this workload will be served extremely efficiently because, under the hood, the
    // standard Tarantool C library packs all the parallel requests into a single packet and
    // all the parallel responses come back in a single packet, which reduces the number
    // of system calls by an order of magnitude or even more.
    class TarantoolConnection
    {
    public:

    // conn_string - "IP:port"
    TarantoolConnection(const char *conn_string);

    // Sends the specified query to the server, then receives a response
    int DoQuery(struct tnt_stream *tuple, struct tnt_reply &result);

    private:

    void SendingThread()
    {
    // Constantly sending everything from the out queue
    while (true)
    {
    // Check out_queue outside of the mutex for performance reasons
    // I should use interlocks here, but this is only a POC alpha version
    if (!out_queue_.empty())
    {
    // Copy the whole queue into a temporary one
    // We're doing that under the mutex
    // In fact this is extremely fast because it is just a swap operation
    std::dequeue<tnt_stream*> temp;
    {
    std::scoped_lock l(out_queue_);
    temp.swap(out_queue_);
    }

    // Now we don't need a mutex. Just send everything over the network
    for (auto i = temp.begin(); i != temp.end(); ++i)
    tnt_flush(*i);
    }
    else
    // Wait a little bit for new requests in the out_queue
    // Use usleep for now instead of a condition variable
    // Yeah, I know, this is shit :-)
    usleep(1000);


    }
    }

    void ReceivingThread()
    {
    // Constantly receiving everything and putting it to the in queue
    while (true)
    {
    // Receive everything that we can receive
    std::dequeue<tnt_reply*> temp;
    while (true)
    {
    struct tnt_reply *reply = tnt_reply_init(NULL);
    int r = tnt->read_reply(tnt, reply);
    if (!r)
    break;

    temp.push_back(reply);
    }

    // Put it to the in queue under the mutex
    std::scoped_lock l(in_mutex_);

    in_queue_ += temp;
    }
    }

    std::dequeue<tnt_reply*> in_queue_;
    std::dequeue<tnt_stream*> out_queue_;

    std::mutex in_mutex_, out_mutex_;
    };

    int TarantoolConnection::DoQuery(struct tnt_stream *tuple, struct tnt_reply &result)
    {
    // Put the query into outgoing queue

    // Protect this queue from multithreading access
    {
    std::scoped_lock l(out_mutex_);

    out_queue_.push_back(tuple);
    }

    // Now this query should be processed in a background sending thread

    // Waiting for a response which will come in from a background receiving thread
    while (true)
    {
    // Check in_queue outside of the mutex for performance reasons
    // I should use interlocks here, but this is only a POC alpha version
    if (!in_queue_.empty())
    {
    // There is something in queue - get this answer to the client
    std::scoped_lock l(in_mutex_);

    result = in_queue_.front();
    in_queue_.pop_front();
    }
    else
    // Wait a little bit for new responses in the in_queue
    // Use usleep for now instead of a condition variable
    // Yeah, I know, this is shit :-)
    usleep(1000);
    }
    }
    }


  7. danikin created this gist Mar 21, 2016.
    12 changes: 12 additions & 0 deletions tar_async.cpp
    @@ -0,0 +1,12 @@
    #include <stdio.h>
    #include <stdlib.h>
    #include <time.h>
    #include <sys/time.h>
    #include <tarantool/tarantool.h>
    #include <tarantool/tnt_net.h>
    #include <tarantool/tnt_opt.h>
    #include <pthread.h>
    #include <errno.h>
    #include <string.h>
    #include <unistd.h>
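
    For context on how the finished class is meant to be driven, here is a minimal usage sketch written against the latest revision's interface (the constructor, IsConnected(), and DoQuery(struct tnt_stream *, struct tnt_reply **)). This is an illustration, not code from the gist: the address and thread count are examples, and the NULL tuple mirrors the POC, where SendingThread() synthesizes its own requests.

    #include <tarantool/tarantool.h>
    #include <tarantool/tnt_net.h>
    #include <tarantool/tnt_opt.h>

    #include <thread>
    #include <vector>

    // Assumes the TarantoolConnection class from the latest revision above is visible here
    int main()
    {
        TarantoolConnection conn("127.0.0.1:3301");   // example address
        if (!conn.IsConnected())
            return 1;

        // Many application threads share the single connection; their requests are
        // batched by SendingThread() and their replies drained by ReceivingThread()
        std::vector<std::thread> workers;
        for (int i = 0; i < 8; ++i)
        {
            workers.emplace_back([&conn]
            {
                struct tnt_reply *result;
                for (int j = 0; j < 100000; ++j)
                    conn.DoQuery(NULL, &result);      // blocks until a reply is dequeued
            });
        }

        // Unlike the gist's sleep(1000000), join the workers before exiting
        for (auto &t : workers)
            t.join();

        return 0;
    }
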