// schedule_read is called o start reading void connection::connect() { boost::asio::ip::tcp::resolver resolver(_io); boost::asio::ip::tcp::resolver::query query(_hostname, std::to_string(_port)); boost::asio::ip::tcp::resolver::iterator iterator = resolver.resolve(query); boost::system::error_code ec; boost::asio::connect(_socket, iterator, ec); if (ec) { error("Cannot connect: " + ec.message()); throw fatal_exception(ec.message()); } schedule_read(); } connection::connection(): _socket(_io), { // _io is an instance variable _thread.reset(new std::thread(boost::bind(&boost::asio::io_service::run, &_io))); _worker.reset(new boost::asio::io_service::work(_io)); connect(); } void connection::schedule_read() { // _read_buf is a boost::asio::streambuf boost::asio::streambuf::mutable_buffers_type mutableBuffer = _read_buf.prepare(1024); // _socket.async_read_some(mutableBuffer, boost::bind(&connection::read_handler, this, _1, _2)); boost::asio::async_read(_socket, mutableBuffer, boost::asio::transfer_at_least(1), boost::bind(&connection::read_handler, this, _1, _2)); } void connection::read_handler(const boost::system::error_code &ec, std::size_t bytes_read) { // _read_buf is a boost::asio::streambuf trace("socket read returns: " + ec.message()); if (!ec) { trace(boost::str(boost::format("Read %d bytes") % bytes_read)); if (bytes_read > 0) { _read_buf.commit(bytes_read); boost::asio::streambuf::const_buffers_type bufs = _read_buf.data(); std::string data(boost::asio::buffers_begin(bufs), boost::asio::buffers_end(bufs)); trace(boost::str(boost::format("Connection is %p") % _connection.get())); std::size_t size = _connection->parse(data.c_str(), data.size()); trace(boost::str(boost::format("Consumed %d bytes") % size)); _read_buf.consume(size); trace(boost::str(boost::format("Still on buffer %d bytes") % _read_buf.size())); } schedule_read(); } else { if (!_closing) { throw fatal_exception("Error on connection: " + ec.message()); } } }