#include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include // An object of this class can be used to connect to tarantool // and then to issue requests on one connection from different threads. // // The key thing about this class is that one object is good to serve all the workload // from a single machiune because Tarantool allows using one socket in parallel for different // requests // Plus this workload will be served extremely efficient because under the hoop inside the // standard Tarantool c-library all the parallel requests will be packed into a single packet and // all the parallel responses will come in in a single packet which allows to recude the number // of system calls at order or magitude or even more class TarantoolConnection { public: // conn_string - "IP:port" TarantoolConnection(const char *conn_string); bool IsConnected() { return connected_; } // Requests the server for the spercified query then receives a response int DoQuery(struct tnt_stream *tuple, struct tnt_reply **result); private: void SendingThread() { int j = (int)time_t(NULL); struct tnt_stream *tuple = tnt_object(NULL); // Constantly sending everything from the out queue while (true) { std::deque temp; { std::unique_lock l(out_mutex_); // Waiting while out_queue is empty // out_cond should be fired once a queue is not empty while (out_queue_.empty()) out_cond_.wait(l); // Copy all the queue to the temp one // We're doing that under the mutex // In fact this is extremely fast as it is the swap operation temp.swap(out_queue_); } assert(!temp.empty()); // printf("sending %d requests\n", temp.size()); // Now we don't need a mutex. Just send everything over the network for (auto i = temp.begin(); i != temp.end(); ++i) { tnt_object_add_array(tuple, 2); tnt_object_add_int(tuple, j); tnt_object_add_int(tuple, j); tnt_replace(tnt_, 512, tuple); ++j; // tnt_flush(tnt_); tnt_object_reset(tuple); } tnt_flush(tnt_); // tnt_object_reset(tuple); // Notify the receiveing thread that it can start receive data // It we did not than it would in an active waiting as read_reply would return 1 send_notify_cond_.notify_all(); } // while } void ReceivingThread() { struct tnt_reply reply; tnt_reply_init(&reply); // Constantly receiving everything and putting it to the in queue while (true) { // Receive everything that we can receive std::deque temp; /*while (true) { struct tnt_reply *reply = tnt_reply_init(NULL); int r = tnt_->read_reply(tnt_, reply); if (!r) break; temp.push_back(reply); }*/ // Receive as much as we can but no more than 10000 int r = 0; int counter; for (counter = 0; counter < 10000; ++counter) { r = tnt_->read_reply(tnt_, &reply); if (r == 0) temp.push_back(NULL); else break; } // printf("counter=%d\n", counter); // We got the result if (!temp.empty()) { // Put it to the in queue under the mutex std::lock_guard l(in_mutex_); in_queue_.insert(in_queue_.end(), temp.begin(), temp.end()); // We're notifying all the threads, but it would be better to notify only those of them // that waits for the data in just received "temp" in_cond_.notify_all(); } // We ain't got any result else { if (r == -1) fprintf(stderr, "Error receiveing response: r=%d, '%s'\n", r, reply.error); else // No result - just wait util it comes in if (r == 1) { // Waiting for a send thread to send data to network // We have to wait because right now we don't have any data to read from network std::unique_lock l(send_notify_mutex_); send_notify_cond_.wait(l); } } } } void TimerThread() { int64_t prev_answers = 0, rps; while (true) { sleep(1); { std::lock_guard l(in_mutex_); rps = num_answers_ - prev_answers; prev_answers = num_answers_; } printf("RPS=%d\n", rps); fflush(stdout); } } std::deque in_queue_; std::deque out_queue_; std::mutex in_mutex_, out_mutex_, send_notify_mutex_; std::condition_variable in_cond_, out_cond_, send_notify_cond_; std::thread send_thread_, receive_thread_, timer_thread_; struct tnt_stream *tnt_; bool connected_; int64_t num_answers_; }; TarantoolConnection::TarantoolConnection(const char *conn_string) { num_answers_ = 0; connected_ = false; tnt_ = tnt_net(NULL); tnt_set(tnt_, TNT_OPT_URI, conn_string); if (tnt_connect(tnt_) < 0) { fprintf(stderr, "Connection refused on '%s'\n", conn_string); fflush(stderr); return; } connected_ = true; // Starting threads send_thread_ = std::thread( [=] { SendingThread(); }); receive_thread_ = std::thread( [=] { ReceivingThread(); }); timer_thread_ = std::thread( [=] { TimerThread(); }); } int TarantoolConnection::DoQuery(struct tnt_stream *tuple, struct tnt_reply **result) { // Put the query into outgoing queue // Protect this queue from multithreading access { std::lock_guard l(out_mutex_); out_queue_.push_back(tuple); // Notify the sending thread that it can send data out_cond_.notify_all(); } // Now this query should be processed in a background sending thread // There is something in queue - get this answer to the client std::unique_lock l(in_mutex_); { // Wait until we have data in the in_queue while (in_queue_.empty()) in_cond_.wait(l); assert(!in_queue_.empty()); *result = in_queue_.front(); in_queue_.pop_front(); ++num_answers_; } // if (!(num_answers_%100000)) // printf("num_answers_=%d\n", num_answers_); return 1; } void DoTest(TarantoolConnection *conn) { struct tnt_reply *result; for (int i = 0; i < 100000000; ++i) conn->DoQuery(NULL, &result); } int main() { TarantoolConnection conn("172.31.26.200:3301"); if (!conn.IsConnected()) return 0; std::thread t[1000]; for (int i = 0;i < 1000;++i) t[i] = std::thread(&DoTest, &conn); sleep(1000000); return 0; }