Skip to content

Instantly share code, notes, and snippets.

@danikin
Last active January 30, 2019 03:24
Show Gist options
  • Save danikin/8bfa33e6dff93ccf211c to your computer and use it in GitHub Desktop.
Save danikin/8bfa33e6dff93ccf211c to your computer and use it in GitHub Desktop.
Asynchronous client for Tarantool with synchronous interface
#include <stdio.h>
#include <stdlib.h>
#include <time.h>
#include <sys/time.h>
#include <tarantool/tarantool.h>
#include <tarantool/tnt_net.h>
#include <tarantool/tnt_opt.h>
#include <pthread.h>
#include <errno.h>
#include <string.h>
#include <unistd.h>
#include <deque>
#include <thread>
#include <mutex>
// An object of this class can be used to connect to tarantool
// and then to issue requests on one connection from different threads.
//
// The key thing about this class is that one object is good to serve all the workload
// from a single machiune because Tarantool allows using one socket in parallel for different
// requests
// Plus this workload will be served extremely efficient because under the hoop inside the
// standard Tarantool c-library all the parallel requests will be packed into a single packet and
// all the parallel responses will come in in a single packet which allows to recude the number
// of system calls at order or magitude or even more
class TarantoolConnection
{
public:
// conn_string - "IP:port"
TarantoolConnection(const char *conn_string);
bool IsConnected() { return connected_; }
// Requests the server for the spercified query then receives a response
int DoQuery(struct tnt_stream *tuple, struct tnt_reply **result);
private:
void SendingThread()
{
int j = (int)time_t(NULL);
struct tnt_stream *tuple = tnt_object(NULL);
// Constantly sending everything from the out queue
while (true)
{
// Check out_queue outside of mutex for performance reason
// I should use interlocks here, but this only an POC alpha-version
if (!out_queue_.empty())
{
// Copy all the queue to the temp one
// We're doing that under the mutex
// In fact this is extremely fast as it is the swap operation
std::deque<tnt_stream*> temp;
{
std::lock_guard<std::mutex> l(out_mutex_);
temp.swap(out_queue_);
}
// Now we don't need a mutex. Just send everything over the network
for (auto i = temp.begin(); i != temp.end(); ++i)
{
tnt_object_add_array(tuple, 2);
tnt_object_add_int(tuple, j);
tnt_object_add_int(tuple, j);
tnt_replace(tnt_, 512, tuple);
tnt_flush(tnt_);
tnt_object_reset(tuple);
}
}
else
// Wait a little bit for a new requests in the out_queue
// Use usleep for now instead of the conditional
// Yeah, I know, this is shit :-)
usleep(1000);
}
}
void ReceivingThread()
{
struct tnt_reply reply;
tnt_reply_init(&reply);
// Constantly receiving everything and putting it to the in queue
while (true)
{
// Receive everything that we can receive
std::deque<struct tnt_reply*> temp;
/*while (true)
{
struct tnt_reply *reply = tnt_reply_init(NULL);
int r = tnt_->read_reply(tnt_, reply);
if (!r)
break;
temp.push_back(reply);
}*/
// Receive as much as we can but no more than 100
int r = 0;
int counter;
for (counter = 0; counter < 1000; ++counter)
{
r = tnt_->read_reply(tnt_, &reply);
if (r == 0)
temp.push_back(NULL);
else
break;
}
// printf("counter=%d\n", counter);
// We got the result
if (!temp.empty())
{
// Put it to the in queue under the mutex
std::lock_guard<std::mutex> l(in_mutex_);
in_queue_.insert(in_queue_.end(), temp.begin(), temp.end());
}
// We ain't got any result
else
{
if (r == -1)
fprintf(stderr, "Error receiveing response: r=%d, '%s'\n", r, reply.error);
else
// No result - just wait a little bit
if (r == 1)
usleep(1000);
}
}
}
void TimerThread()
{
int64_t prev_answers = 0, rps;
while (true)
{
sleep(1);
{
std::lock_guard<std::mutex> l(in_mutex_);
rps = num_answers_ - prev_answers;
prev_answers = num_answers_;
}
printf("RPS=%d\n", rps);
fflush(stdout);
}
}
std::deque<struct tnt_reply*> in_queue_;
std::deque<tnt_stream*> out_queue_;
std::mutex in_mutex_, out_mutex_;
std::thread send_thread_, receive_thread_, timer_thread_;
struct tnt_stream *tnt_;
bool connected_;
int64_t num_answers_;
};
TarantoolConnection::TarantoolConnection(const char *conn_string)
{
num_answers_ = 0;
connected_ = false;
tnt_ = tnt_net(NULL);
tnt_set(tnt_, TNT_OPT_URI, conn_string);
if (tnt_connect(tnt_) < 0)
{
fprintf(stderr, "Connection refused on '%s'\n", conn_string);
fflush(stderr);
return;
}
connected_ = true;
// Starting threads
send_thread_ = std::thread( [=] { SendingThread(); });
receive_thread_ = std::thread( [=] { ReceivingThread(); });
timer_thread_ = std::thread( [=] { TimerThread(); });
}
int TarantoolConnection::DoQuery(struct tnt_stream *tuple, struct tnt_reply **result)
{
// Put the query into outgoing queue
// Protect this queue from multithreading access
{
std::lock_guard<std::mutex> l(out_mutex_);
out_queue_.push_back(tuple);
}
// Now this query should be processed in a background sending thread
// Waiting for a response which will come in from a background receving thread
while (true)
{
// Check in_queue outside of mutex for performance reason
// I should use interlocks here, but this only an POC alpha-version
if (!in_queue_.empty())
{
// There is something in queue - get this answer to the client
std::lock_guard<std::mutex> l(in_mutex_);
*result = in_queue_.front();
in_queue_.pop_front();
++num_answers_;
// if (!(num_answers_%100000))
// printf("num_answers_=%d\n", num_answers_);
return 1;
}
else
// Wait a little bit for a new responses in the in_queue
// Use usleep for now instead of the conditional
// Yeah, I know, this is shit :-)
usleep(1000);
}
}
void DoTest(TarantoolConnection *conn)
{
struct tnt_reply *result;
for (int i = 0; i < 100000000; ++i)
conn->DoQuery(NULL, &result);
}
int main()
{
TarantoolConnection conn("172.31.26.200:3301");
if (!conn.IsConnected())
return 0;
std::thread t[1000];
for (int i = 0;i < 500;++i)
t[i] = std::thread(&DoTest, &conn);
sleep(1000000);
return 0;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment