Last active
January 30, 2019 03:24
-
-
Save danikin/8bfa33e6dff93ccf211c to your computer and use it in GitHub Desktop.
Asynchronous client for Tarantool with synchronous interface
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #include <stdio.h> | |
| #include <stdlib.h> | |
| #include <time.h> | |
| #include <sys/time.h> | |
| #include <tarantool/tarantool.h> | |
| #include <tarantool/tnt_net.h> | |
| #include <tarantool/tnt_opt.h> | |
| #include <pthread.h> | |
| #include <errno.h> | |
| #include <string.h> | |
| #include <unistd.h> | |
| #include <deque> | |
| #include <thread> | |
| #include <mutex> | |
| // An object of this class can be used to connect to tarantool | |
| // and then to issue requests on one connection from different threads. | |
| // | |
| // The key thing about this class is that one object is good to serve all the workload | |
| // from a single machiune because Tarantool allows using one socket in parallel for different | |
| // requests | |
| // Plus this workload will be served extremely efficient because under the hoop inside the | |
| // standard Tarantool c-library all the parallel requests will be packed into a single packet and | |
| // all the parallel responses will come in in a single packet which allows to recude the number | |
| // of system calls at order or magitude or even more | |
| class TarantoolConnection | |
| { | |
| public: | |
| // conn_string - "IP:port" | |
| TarantoolConnection(const char *conn_string); | |
| bool IsConnected() { return connected_; } | |
| // Requests the server for the spercified query then receives a response | |
| int DoQuery(struct tnt_stream *tuple, struct tnt_reply **result); | |
| private: | |
| void SendingThread() | |
| { | |
| int j = (int)time_t(NULL); | |
| struct tnt_stream *tuple = tnt_object(NULL); | |
| // Constantly sending everything from the out queue | |
| while (true) | |
| { | |
| // Check out_queue outside of mutex for performance reason | |
| // I should use interlocks here, but this only an POC alpha-version | |
| if (!out_queue_.empty()) | |
| { | |
| // Copy all the queue to the temp one | |
| // We're doing that under the mutex | |
| // In fact this is extremely fast as it is the swap operation | |
| std::deque<tnt_stream*> temp; | |
| { | |
| std::lock_guard<std::mutex> l(out_mutex_); | |
| temp.swap(out_queue_); | |
| } | |
| // Now we don't need a mutex. Just send everything over the network | |
| for (auto i = temp.begin(); i != temp.end(); ++i) | |
| { | |
| tnt_object_add_array(tuple, 2); | |
| tnt_object_add_int(tuple, j); | |
| tnt_object_add_int(tuple, j); | |
| tnt_replace(tnt_, 512, tuple); | |
| tnt_flush(tnt_); | |
| tnt_object_reset(tuple); | |
| } | |
| } | |
| else | |
| // Wait a little bit for a new requests in the out_queue | |
| // Use usleep for now instead of the conditional | |
| // Yeah, I know, this is shit :-) | |
| usleep(1000); | |
| } | |
| } | |
| void ReceivingThread() | |
| { | |
| struct tnt_reply reply; | |
| tnt_reply_init(&reply); | |
| // Constantly receiving everything and putting it to the in queue | |
| while (true) | |
| { | |
| // Receive everything that we can receive | |
| std::deque<struct tnt_reply*> temp; | |
| /*while (true) | |
| { | |
| struct tnt_reply *reply = tnt_reply_init(NULL); | |
| int r = tnt_->read_reply(tnt_, reply); | |
| if (!r) | |
| break; | |
| temp.push_back(reply); | |
| }*/ | |
| // Receive as much as we can but no more than 100 | |
| int r = 0; | |
| int counter; | |
| for (counter = 0; counter < 1000; ++counter) | |
| { | |
| r = tnt_->read_reply(tnt_, &reply); | |
| if (r == 0) | |
| temp.push_back(NULL); | |
| else | |
| break; | |
| } | |
| // printf("counter=%d\n", counter); | |
| // We got the result | |
| if (!temp.empty()) | |
| { | |
| // Put it to the in queue under the mutex | |
| std::lock_guard<std::mutex> l(in_mutex_); | |
| in_queue_.insert(in_queue_.end(), temp.begin(), temp.end()); | |
| } | |
| // We ain't got any result | |
| else | |
| { | |
| if (r == -1) | |
| fprintf(stderr, "Error receiveing response: r=%d, '%s'\n", r, reply.error); | |
| else | |
| // No result - just wait a little bit | |
| if (r == 1) | |
| usleep(1000); | |
| } | |
| } | |
| } | |
| void TimerThread() | |
| { | |
| int64_t prev_answers = 0, rps; | |
| while (true) | |
| { | |
| sleep(1); | |
| { | |
| std::lock_guard<std::mutex> l(in_mutex_); | |
| rps = num_answers_ - prev_answers; | |
| prev_answers = num_answers_; | |
| } | |
| printf("RPS=%d\n", rps); | |
| fflush(stdout); | |
| } | |
| } | |
| std::deque<struct tnt_reply*> in_queue_; | |
| std::deque<tnt_stream*> out_queue_; | |
| std::mutex in_mutex_, out_mutex_; | |
| std::thread send_thread_, receive_thread_, timer_thread_; | |
| struct tnt_stream *tnt_; | |
| bool connected_; | |
| int64_t num_answers_; | |
| }; | |
| TarantoolConnection::TarantoolConnection(const char *conn_string) | |
| { | |
| num_answers_ = 0; | |
| connected_ = false; | |
| tnt_ = tnt_net(NULL); | |
| tnt_set(tnt_, TNT_OPT_URI, conn_string); | |
| if (tnt_connect(tnt_) < 0) | |
| { | |
| fprintf(stderr, "Connection refused on '%s'\n", conn_string); | |
| fflush(stderr); | |
| return; | |
| } | |
| connected_ = true; | |
| // Starting threads | |
| send_thread_ = std::thread( [=] { SendingThread(); }); | |
| receive_thread_ = std::thread( [=] { ReceivingThread(); }); | |
| timer_thread_ = std::thread( [=] { TimerThread(); }); | |
| } | |
| int TarantoolConnection::DoQuery(struct tnt_stream *tuple, struct tnt_reply **result) | |
| { | |
| // Put the query into outgoing queue | |
| // Protect this queue from multithreading access | |
| { | |
| std::lock_guard<std::mutex> l(out_mutex_); | |
| out_queue_.push_back(tuple); | |
| } | |
| // Now this query should be processed in a background sending thread | |
| // Waiting for a response which will come in from a background receving thread | |
| while (true) | |
| { | |
| // Check in_queue outside of mutex for performance reason | |
| // I should use interlocks here, but this only an POC alpha-version | |
| if (!in_queue_.empty()) | |
| { | |
| // There is something in queue - get this answer to the client | |
| std::lock_guard<std::mutex> l(in_mutex_); | |
| *result = in_queue_.front(); | |
| in_queue_.pop_front(); | |
| ++num_answers_; | |
| // if (!(num_answers_%100000)) | |
| // printf("num_answers_=%d\n", num_answers_); | |
| return 1; | |
| } | |
| else | |
| // Wait a little bit for a new responses in the in_queue | |
| // Use usleep for now instead of the conditional | |
| // Yeah, I know, this is shit :-) | |
| usleep(1000); | |
| } | |
| } | |
| void DoTest(TarantoolConnection *conn) | |
| { | |
| struct tnt_reply *result; | |
| for (int i = 0; i < 100000000; ++i) | |
| conn->DoQuery(NULL, &result); | |
| } | |
| int main() | |
| { | |
| TarantoolConnection conn("172.31.26.200:3301"); | |
| if (!conn.IsConnected()) | |
| return 0; | |
| std::thread t[1000]; | |
| for (int i = 0;i < 500;++i) | |
| t[i] = std::thread(&DoTest, &conn); | |
| sleep(1000000); | |
| return 0; | |
| } | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment