// Compile with clang++ -std=c++11 -g -o zmq_test zmq_test.cpp -lzmq /** * This demonstrates how a ZMQ_ROUTER socket can leave file descriptors * open forever. Compile this, and then run it like so. * One instance as a router: * zmq_test router 7777 * That starts a router that listens on port 7777. * * Next, start a number of peers which connect to this router. * zmq_test peer localhost 7777 * * These will announce themselves to the router via random identities, and * the router will start pinging them every 5 seconds, forever. * * Next, black-hole the peers by severing TCP traffic to the router. This can * be done via iptables. * * sudo iptables -A INPUT -p tcp --dport=7777 -j DROP * sudo iptables -A INPUT -p tcp --sport=7777 -j DROP * * At this point, the peers and router can not exchange packets and the * connection is effectively dead. * * Note, that the file descriptors in the router process remain open forever, * despite it trying to send a ping to each peer identity every 5 seconds. * * You can clear the iptables rules like this to restore the network: * sudo iptables -F INPUT * * Note, that the peers never reconnect to the router once the network is * fixed. * * You can see open file descriptors like this given your router's process ID * lsof -p | grep TCP * */ #include #include #include #include #include #include #include #include #include #include #include // Invoke like this to run a router listening on a port: // zmq_test router 1234 // Invoke like this to connect to a router // zmq_test peer using namespace std; void read_all_parts(void *zmq_sock, vector *parts) { int more; size_t more_size = sizeof(more); do { zmq_msg_t part; int rc = zmq_msg_init (&part); if (rc != 0) { zmq_msg_close (&part); return; } rc = zmq_msg_recv (&part, zmq_sock, 0); if (rc == -1) { fprintf(stderr, "zmq_msg_recv failed:%s\n", zmq_strerror(errno)); zmq_msg_close (&part); return; } parts->push_back(string((const char *)zmq_msg_data(&part), zmq_msg_size(&part))); rc = zmq_getsockopt (zmq_sock, ZMQ_RCVMORE, &more, &more_size); if (rc != 0) { fprintf(stderr, "zmq_getsockopt failed\n"); zmq_msg_close (&part); return; } zmq_msg_close (&part); } while (more); } void send_all_parts(void *zmq_sock, vector parts) { for (int i = 0; i < parts.size(); i++) { bool last = (i == parts.size() - 1); zmq_msg_t msg; string part = parts[i]; int rc = zmq_msg_init_size (&msg, part.length()); if (rc != 0) { return; } memcpy(zmq_msg_data(&msg), part.data(), part.length()); rc = zmq_msg_send(&msg, zmq_sock, last ? 0 : ZMQ_SNDMORE); if (rc == -1) { return; } } } void router(int port) { void *ctx = zmq_ctx_new(); void *sock = zmq_socket(ctx, ZMQ_ROUTER); int value = 500; zmq_setsockopt(sock, ZMQ_IDENTITY, "router", 6); zmq_setsockopt(sock, ZMQ_HEARTBEAT_IVL, &value, sizeof(value)); zmq_setsockopt(sock, ZMQ_HEARTBEAT_TTL, &value, sizeof(value)); stringstream ss; ss << "tcp://*:" << port; fprintf(stdout, "Listening on: %s\n", ss.str().data()); int rc = zmq_bind(sock, ss.str().data()); if (rc != 0) { fprintf(stderr, "zmq_bind failed: %s\n", zmq_strerror(errno)); } struct zmq_pollitem_t poll_item; poll_item.socket = sock; poll_item.fd = 0; poll_item.events = ZMQ_POLLIN; poll_item.revents = 0; vector peers; time_t last_ping = time(0); while (true) { int rc = zmq_poll(&poll_item, 1, 5000); time_t now = time(0); if (now - last_ping >= 5) { // Timed out. Ping everybody. fprintf(stdout, "Pinging all known peers\n"); for (auto peer : peers) { fprintf(stdout, "\t%s\n", peer.data()); vector envelope; envelope.push_back(peer); envelope.push_back(""); envelope.push_back("ping"); } last_ping = now; } if (rc == 0) { continue; } vector envelope; read_all_parts(sock, &envelope); //fprintf(stdout, "Got %zu parts\n", envelope.size()); if (envelope.size() != 3) { continue; } string peer = envelope[0]; if (find(peers.begin(), peers.end(), peer) == peers.end()) { fprintf(stdout, "New peer: %s\n", peer.data()); peers.push_back(peer); } fprintf(stdout, "Peer (%s) says (%s)\n", peer.data(), envelope[2].data()); } } void peer(char *router_hostname, int router_port) { void *ctx = zmq_ctx_new(); void *sock = zmq_socket(ctx, ZMQ_ROUTER); { stringstream ss; ss << "peer-" << getpid(); string id = ss.str(); zmq_setsockopt(sock, ZMQ_IDENTITY, id.data(), id.length()); } stringstream ss; ss << "tcp://" << router_hostname << ":" << router_port; fprintf(stdout, "Connecting to: %s\n", ss.str().data()); int rc = zmq_connect(sock, ss.str().data()); if (rc != 0) { fprintf(stderr, "zmq_connect failed: %s\n", zmq_strerror(errno)); } while (true) { vector envelope; envelope.push_back("router"); envelope.push_back(""); envelope.push_back("ping"); send_all_parts(sock, envelope); sleep(5); } } int main(int argc, char** argv) { if (argc < 2) { fprintf(stderr, "router or peer argument required\n"); return -1; } auto mode = string(argv[1]); if (mode == "router") { if (argc < 3) { fprintf(stderr, "Port required for router mode\n"); return -1; } int port; try { port = stoi(string(argv[2])); } catch (invalid_argument) { fprintf(stderr, "%s isn't a number\n", argv[2]); return -1; } fprintf(stdout, "Running in router mode on port %d (pid %d)\n", port, getpid()); router(port); } else if (mode == "peer") { if (argc < 4) { fprintf(stderr, "router hostname and port number required\n"); return -1; } int port; try { port = stoi(string(argv[3])); } catch (invalid_argument) { fprintf(stderr, "%s isn't a number\n", argv[3]); return -1; } fprintf(stdout, "Connecting to %s:%d (pid %d)\n", argv[2], port, getpid()); peer(argv[2], port); } else { fprintf(stderr, "Unknown mode %s\n", argv[1]); return -1; } return 0; }