Last active
January 30, 2019 03:24
-
-
Save danikin/8bfa33e6dff93ccf211c to your computer and use it in GitHub Desktop.
Revisions
-
danikin revised this gist
Mar 22, 2016 . 1 changed file with 2 additions and 2 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -78,11 +78,11 @@ class TarantoolConnection tnt_replace(tnt_, 512, tuple); ++j; // tnt_flush(tnt_); tnt_object_reset(tuple); } tnt_flush(tnt_); // tnt_object_reset(tuple); // Notify the receiveing thread that it can start receive data -
danikin revised this gist
Mar 22, 2016 . 1 changed file with 68 additions and 67 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -13,6 +13,8 @@ #include <deque> #include <thread> #include <mutex> #include <condition_variable> #include <cassert> // An object of this class can be used to connect to tarantool // and then to issue requests on one connection from different threads. @@ -47,52 +49,47 @@ class TarantoolConnection // Constantly sending everything from the out queue while (true) { std::deque<tnt_stream*> temp; { std::unique_lock<std::mutex> l(out_mutex_); // Waiting while out_queue is empty // out_cond should be fired once a queue is not empty while (out_queue_.empty()) out_cond_.wait(l); // Copy all the queue to the temp one // We're doing that under the mutex // In fact this is extremely fast as it is the swap operation temp.swap(out_queue_); } assert(!temp.empty()); // printf("sending %d requests\n", temp.size()); // Now we don't need a mutex. Just send everything over the network for (auto i = temp.begin(); i != temp.end(); ++i) { tnt_object_add_array(tuple, 2); tnt_object_add_int(tuple, j); tnt_object_add_int(tuple, j); tnt_replace(tnt_, 512, tuple); ++j; tnt_flush(tnt_); tnt_object_reset(tuple); } // tnt_flush(tnt_); // tnt_object_reset(tuple); // Notify the receiveing thread that it can start receive data // It we did not than it would in an active waiting as read_reply would return 1 send_notify_cond_.notify_all(); } // while } void ReceivingThread() @@ -138,16 +135,26 @@ class TarantoolConnection std::lock_guard<std::mutex> l(in_mutex_); in_queue_.insert(in_queue_.end(), temp.begin(), temp.end()); // We're notifying all the threads, but it would be better to notify only those of them // that waits for the data in just received "temp" in_cond_.notify_all(); } // We ain't got any result else { if (r == -1) fprintf(stderr, "Error receiveing response: r=%d, '%s'\n", r, reply.error); else // No result - just wait util it comes in if (r == 1) { // Waiting for a send thread to send data to network // We have to wait because right now we don't have any data to read from network std::unique_lock<std::mutex> l(send_notify_mutex_); send_notify_cond_.wait(l); } } } } @@ -173,7 +180,8 @@ class TarantoolConnection std::deque<struct tnt_reply*> in_queue_; std::deque<tnt_stream*> out_queue_; std::mutex in_mutex_, out_mutex_, send_notify_mutex_; std::condition_variable in_cond_, out_cond_, send_notify_cond_; std::thread send_thread_, receive_thread_, timer_thread_; @@ -216,40 +224,33 @@ int TarantoolConnection::DoQuery(struct tnt_stream *tuple, struct tnt_reply **re std::lock_guard<std::mutex> l(out_mutex_); out_queue_.push_back(tuple); // Notify the sending thread that it can send data out_cond_.notify_all(); } // Now this query should be processed in a background sending thread // There is something in queue - get this answer to the client std::unique_lock<std::mutex> l(in_mutex_); { // Wait until we have data in the in_queue while (in_queue_.empty()) in_cond_.wait(l); assert(!in_queue_.empty()); *result = in_queue_.front(); in_queue_.pop_front(); ++num_answers_; } // if (!(num_answers_%100000)) // printf("num_answers_=%d\n", num_answers_); return 1; } void DoTest(TarantoolConnection *conn) @@ -267,7 +268,7 @@ int main() return 0; std::thread t[1000]; for (int i = 0;i < 1000;++i) t[i] = std::thread(&DoTest, &conn); sleep(1000000); -
danikin revised this gist
Mar 21, 2016 . 1 changed file with 22 additions and 20 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -67,19 +67,24 @@ class TarantoolConnection continue; } else { //printf("sending %d requests\n", temp.size()); // Now we don't need a mutex. Just send everything over the network for (auto i = temp.begin(); i != temp.end(); ++i) { tnt_object_add_array(tuple, 2); tnt_object_add_int(tuple, j); tnt_object_add_int(tuple, j); tnt_replace(tnt_, 512, tuple); ++j; tnt_flush(tnt_); tnt_object_reset(tuple); } } } else // Wait a little bit for a new requests in the out_queue // Use usleep for now instead of the conditional @@ -95,9 +100,9 @@ class TarantoolConnection struct tnt_reply reply; tnt_reply_init(&reply); // Constantly receiving everything and putting it to the in queue while (true) { // Receive everything that we can receive std::deque<struct tnt_reply*> temp; /*while (true) @@ -111,10 +116,10 @@ class TarantoolConnection }*/ // Receive as much as we can but no more than 10000 int r = 0; int counter; for (counter = 0; counter < 10000; ++counter) { r = tnt_->read_reply(tnt_, &reply); @@ -144,7 +149,7 @@ class TarantoolConnection if (r == 1) usleep(1000); } } } void TimerThread() @@ -227,10 +232,7 @@ int TarantoolConnection::DoQuery(struct tnt_stream *tuple, struct tnt_reply **re // Check it again if (in_queue_.empty()) continue; *result = in_queue_.front(); in_queue_.pop_front(); -
danikin revised this gist
Mar 21, 2016 . 1 changed file with 27 additions and 13 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -51,16 +51,23 @@ class TarantoolConnection // I should use interlocks here, but this only an POC alpha-version if (!out_queue_.empty()) { // Copy all the queue to the temp one // We're doing that under the mutex // In fact this is extremely fast as it is the swap operation std::deque<tnt_stream*> temp; { std::lock_guard<std::mutex> l(out_mutex_); temp.swap(out_queue_); } // If queue is empty then just wait if (temp.empty()) { usleep(1000); continue; } else // Now we don't need a mutex. Just send everything over the network for (auto i = temp.begin(); i != temp.end(); ++i) { tnt_object_add_array(tuple, 2); @@ -215,11 +222,18 @@ int TarantoolConnection::DoQuery(struct tnt_stream *tuple, struct tnt_reply **re // I should use interlocks here, but this only an POC alpha-version if (!in_queue_.empty()) { // There is something in queue - get this answer to the client std::lock_guard<std::mutex> l(in_mutex_); // Check it again if (in_queue_.empty()) { usleep(1000); continue; } *result = in_queue_.front(); in_queue_.pop_front(); ++num_answers_; -
danikin revised this gist
Mar 21, 2016 . 1 changed file with 151 additions and 24 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -10,7 +10,9 @@ #include <string.h> #include <unistd.h> #include <deque> #include <thread> #include <mutex> // An object of this class can be used to connect to tarantool // and then to issue requests on one connection from different threads. @@ -28,14 +30,20 @@ class TarantoolConnection // conn_string - "IP:port" TarantoolConnection(const char *conn_string); bool IsConnected() { return connected_; } // Requests the server for the spercified query then receives a response int DoQuery(struct tnt_stream *tuple, struct tnt_reply **result); private: void SendingThread() { int j = (int)time_t(NULL); struct tnt_stream *tuple = tnt_object(NULL); // Constantly sending everything from the out queue while (true) { @@ -46,15 +54,24 @@ class TarantoolConnection // Copy all the queue to the temp one // We're doing that under the mutex // In fact this is extremely fast as it is the swap operation std::deque<tnt_stream*> temp; { std::lock_guard<std::mutex> l(out_mutex_); temp.swap(out_queue_); } // Now we don't need a mutex. Just send everything over the network for (auto i = temp.begin(); i != temp.end(); ++i) { tnt_object_add_array(tuple, 2); tnt_object_add_int(tuple, j); tnt_object_add_int(tuple, j); tnt_replace(tnt_, 512, tuple); tnt_flush(tnt_); tnt_object_reset(tuple); } } else // Wait a little bit for a new requests in the out_queue @@ -68,41 +85,123 @@ class TarantoolConnection void ReceivingThread() { struct tnt_reply reply; tnt_reply_init(&reply); // Constantly receiving everything and putting it to the in queue while (true) { // Receive everything that we can receive std::deque<struct tnt_reply*> temp; /*while (true) { struct tnt_reply *reply = tnt_reply_init(NULL); int r = tnt_->read_reply(tnt_, reply); if (!r) break; temp.push_back(reply); }*/ // Receive as much as we can but no more than 100 int r = 0; int counter; for (counter = 0; counter < 1000; ++counter) { r = tnt_->read_reply(tnt_, &reply); if (r == 0) temp.push_back(NULL); else break; } // printf("counter=%d\n", counter); // We got the result if (!temp.empty()) { // Put it to the in queue under the mutex std::lock_guard<std::mutex> l(in_mutex_); in_queue_.insert(in_queue_.end(), temp.begin(), temp.end()); } // We ain't got any result else { if (r == -1) fprintf(stderr, "Error receiveing response: r=%d, '%s'\n", r, reply.error); else // No result - just wait a little bit if (r == 1) usleep(1000); } } } void TimerThread() { int64_t prev_answers = 0, rps; while (true) { sleep(1); { std::lock_guard<std::mutex> l(in_mutex_); rps = num_answers_ - prev_answers; prev_answers = num_answers_; } printf("RPS=%d\n", rps); fflush(stdout); } } std::deque<struct tnt_reply*> in_queue_; std::deque<tnt_stream*> out_queue_; std::mutex in_mutex_, out_mutex_; std::thread send_thread_, receive_thread_, timer_thread_; struct tnt_stream *tnt_; bool connected_; int64_t num_answers_; }; TarantoolConnection::TarantoolConnection(const char *conn_string) { num_answers_ = 0; connected_ = false; tnt_ = tnt_net(NULL); tnt_set(tnt_, TNT_OPT_URI, conn_string); if (tnt_connect(tnt_) < 0) { fprintf(stderr, "Connection refused on '%s'\n", conn_string); fflush(stderr); return; } connected_ = true; // Starting threads send_thread_ = std::thread( [=] { SendingThread(); }); receive_thread_ = std::thread( [=] { ReceivingThread(); }); timer_thread_ = std::thread( [=] { TimerThread(); }); } int TarantoolConnection::DoQuery(struct tnt_stream *tuple, struct tnt_reply **result) { // Put the query into outgoing queue // Protect this queue from multithreading access { std::lock_guard<std::mutex> l(out_mutex_); out_queue_.push_back(tuple); } @@ -117,18 +216,46 @@ int TarantoolConnection::DoQuery(struct tnt_stream *tuple, struct tnt_reply &res if (!in_queue_.empty()) { // There is something in queue - get this answer to the client std::lock_guard<std::mutex> l(in_mutex_); *result = in_queue_.front(); in_queue_.pop_front(); ++num_answers_; // if (!(num_answers_%100000)) // printf("num_answers_=%d\n", num_answers_); return 1; } else // Wait a little bit for a new responses in the in_queue // Use usleep for now instead of the conditional // Yeah, I know, this is shit :-) usleep(1000); } } void DoTest(TarantoolConnection *conn) { struct tnt_reply *result; for (int i = 0; i < 100000000; ++i) conn->DoQuery(NULL, &result); } int main() { TarantoolConnection conn("172.31.26.200:3301"); if (!conn.IsConnected()) return 0; std::thread t[1000]; for (int i = 0;i < 500;++i) t[i] = std::thread(&DoTest, &conn); sleep(1000000); return 0; } -
danikin revised this gist
Mar 21, 2016 . 1 changed file with 122 additions and 0 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -10,3 +10,125 @@ #include <string.h> #include <unistd.h> #include <dequeue> // An object of this class can be used to connect to tarantool // and then to issue requests on one connection from different threads. // // The key thing about this class is that one object is good to serve all the workload // from a single machiune because Tarantool allows using one socket in parallel for different // requests // Plus this workload will be served extremely efficient because under the hoop inside the // standard Tarantool c-library all the parallel requests will be packed into a single packet and // all the parallel responses will come in in a single packet which allows to recude the number // of system calls at order or magitude or even more class TarantoolConnection { public: // conn_string - "IP:port" TarantoolConnection(const char *conn_string); // Requests the server for the spercified query then receives a response int DoQuery(struct tnt_stream *tuple, struct tnt_reply &result); private: void SendingThread() { // Constantly sending everything from the out queue while (true) { // Check out_queue outside of mutex for performance reason // I should use interlocks here, but this only an POC alpha-version if (!out_queue_.empty()) { // Copy all the queue to the temp one // We're doing that under the mutex // In fact this is extremely fast as it is the swap operation std::dequeue<tnt_stream*> temp; { std::scoped_lock l(out_queue_); temp.swap(out_queue_); } // Now we don't need a mutex. Just send everything over the network for (auto i = temp.begin(); i != temp.end(); ++i) tnt_flush(*i); } else // Wait a little bit for a new requests in the out_queue // Use usleep for now instead of the conditional // Yeah, I know, this is shit :-) usleep(1000); } } void ReceivingThread() { // Constantly receiving everything and putting it to the in queue while (true) { // Receive everything that we can receive std::dequeue<tnt_reply*> temp; while (true) { struct tnt_reply *reply = tnt_reply_init(NULL); int r = tnt->read_reply(tnt, reply); if (!r) break; temp.push_back(reply); } // Put it to the in queue under the mutex std::scoped_lock l(in_mutex_); in_queue_ += temp; } } std::dequeue<tnt_reply*> in_queue_; std::dequeue<tnt_stream*> out_queue_; std::mutex in_mutex_, out_mutex_; }; int TarantoolConnection::DoQuery(struct tnt_stream *tuple, struct tnt_reply &result) { // Put the query into outgoing queue // Protect this queue from multithreading access { std::scoped_lock l(out_mutex_); out_queue_.push_back(tuple); } // Now this query should be processed in a background sending thread // Waiting for a response which will come in from a background receving thread while (true) { // Check in_queue outside of mutex for performance reason // I should use interlocks here, but this only an POC alpha-version if (!in_queue_.empty()) { // There is something in queue - get this answer to the client std::scoped_lock l(in_mutex_); result = in_queue_.front(); in_queue_.pop_front(); } else // Wait a little bit for a new responses in the in_queue // Use usleep for now instead of the conditional // Yeah, I know, this is shit :-) usleep(1000); } } } -
danikin created this gist
Mar 21, 2016 .There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,12 @@ #include <stdio.h> #include <stdlib.h> #include <time.h> #include <sys/time.h> #include <tarantool/tarantool.h> #include <tarantool/tnt_net.h> #include <tarantool/tnt_opt.h> #include <pthread.h> #include <errno.h> #include <string.h> #include <unistd.h>