#include #include #include #include #include #include #include #include #include #include #include #include #include #include /* the L2 protocols */ /* Parser files: https://github.com/FastVPSEestiOu/fastnetmon/blob/master/src/fastnetmon_packet_parser.c https://github.com/FastVPSEestiOu/fastnetmon/blob/master/src/fastnetmon_packet_parser.h */ #include "../fastnetmon_packet_parser.h" // 4194304 bytes unsigned int blocksiz = 1 << 22; // 2048 bytes unsigned int framesiz = 1 << 11; unsigned int blocknum = 64; struct block_desc { uint32_t version; uint32_t offset_to_priv; struct tpacket_hdr_v1 h1; }; /* Build it: g++ ../fastnetmon_packet_parser.c -ofastnetmon_packet_parser.o -c g++ af_packet.cpp fastnetmon_packet_parser.o -lboost_thread -lboost_system -lpthread */ // Get interface number by name int get_interface_number_by_device_name(int socket_fd, std::string interface_name) { struct ifreq ifr; memset(&ifr, 0, sizeof(ifr)); if (interface_name.size() > IFNAMSIZ) { return -1; } strncpy(ifr.ifr_name, interface_name.c_str(), sizeof(ifr.ifr_name)); if (ioctl(socket_fd, SIOCGIFINDEX, &ifr) == -1) { return -1; } return ifr.ifr_ifindex; } unsigned int af_packet_threads = 1; uint64_t received_packets = 0; uint64_t received_bytes = 0; void speed_printer() { while (true) { uint64_t packets_before = received_packets; boost::this_thread::sleep(boost::posix_time::seconds(1)); uint64_t packets_after = received_packets; uint64_t pps = packets_after - packets_before; printf("We process: %llu pps\n", pps); } } void flush_block(struct block_desc *pbd) { pbd->h1.block_status = TP_STATUS_KERNEL; } void walk_block(struct block_desc *pbd, const int block_num) { int num_pkts = pbd->h1.num_pkts, i; unsigned long bytes = 0; struct tpacket3_hdr *ppd; ppd = (struct tpacket3_hdr *) ((uint8_t *) pbd + pbd->h1.offset_to_first_pkt); for (i = 0; i < num_pkts; ++i) { bytes += ppd->tp_snaplen; // struct ethhdr *eth = (struct ethhdr *) ((uint8_t *) ppd + ppd->tp_mac); // Print packets // #define PRINT_PACKETS #ifdef PRINT_PACKETS struct pfring_pkthdr packet_header; memset(&packet_header, 0, sizeof(packet_header)); packet_header.len = ppd->tp_snaplen; packet_header.caplen = ppd->tp_snaplen; u_int8_t timestamp = 0; u_int8_t add_hash = 0; u_char* data_pointer = (u_char*)((uint8_t *) ppd + ppd->tp_mac); fastnetmon_parse_pkt(data_pointer, &packet_header, 4, timestamp, add_hash); char print_buffer[512]; fastnetmon_print_parsed_pkt(print_buffer, 512, data_pointer, &packet_header); printf("%s\n", print_buffer); #endif ppd = (struct tpacket3_hdr *) ((uint8_t *) ppd + ppd->tp_next_offset); } received_packets += num_pkts; received_bytes += bytes; } int setup_socket(std::string interface_name, int fanout_group_id) { // More details here: http://man7.org/linux/man-pages/man7/packet.7.html // We could use SOCK_RAW or SOCK_DGRAM for second argument // SOCK_RAW - raw packets pass from the kernel // SOCK_DGRAM - some amount of processing // Third argument manage ether type of captured packets int packet_socket = socket(AF_PACKET, SOCK_RAW, htons(ETH_P_ALL)); if (packet_socket == -1) { printf("Can't create AF_PACKET socket\n"); return -1; } // We whould use V3 bcause it could read/pool in per block basis instead per packet int version = TPACKET_V3; int setsockopt_packet_version = setsockopt(packet_socket, SOL_PACKET, PACKET_VERSION, &version, sizeof(version)); if (setsockopt_packet_version < 0) { printf("Can't set packet v3 version\n"); return -1; } int interface_number = get_interface_number_by_device_name(packet_socket, interface_name); if (interface_number == -1) { printf("Can't get interface number by interface name\n"); return -1; } // Switch to PROMISC mode struct packet_mreq sock_params; memset(&sock_params, 0, sizeof(sock_params)); sock_params.mr_type = PACKET_MR_PROMISC; sock_params.mr_ifindex = interface_number; int set_promisc = setsockopt(packet_socket, SOL_PACKET, PACKET_ADD_MEMBERSHIP, (void *)&sock_params, sizeof(sock_params)); if (set_promisc == -1) { printf("Can't enable promisc mode\n"); return -1; } struct sockaddr_ll bind_address; memset(&bind_address, 0, sizeof(bind_address)); bind_address.sll_family = AF_PACKET; bind_address.sll_protocol = htons(ETH_P_ALL); bind_address.sll_ifindex = interface_number; // We will follow http://yusufonlinux.blogspot.ru/2010/11/data-link-access-and-zero-copy.html // And this: https://www.kernel.org/doc/Documentation/networking/packet_mmap.txt struct tpacket_req3 req; memset(&req, 0, sizeof(req)); req.tp_block_size = blocksiz; req.tp_frame_size = framesiz; req.tp_block_nr = blocknum; req.tp_frame_nr = (blocksiz * blocknum) / framesiz; req.tp_retire_blk_tov = 60; // Timeout in msec req.tp_feature_req_word = TP_FT_REQ_FILL_RXHASH; int setsockopt_rx_ring = setsockopt(packet_socket, SOL_PACKET , PACKET_RX_RING , (void*)&req , sizeof(req)); if (setsockopt_rx_ring == -1) { printf("Can't enable RX_RING for AF_PACKET socket\n"); return -1; } // We use per thread structures uint8_t* mapped_buffer = NULL; struct iovec* rd = NULL; mapped_buffer = (uint8_t*)mmap(NULL, req.tp_block_size * req.tp_block_nr, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_LOCKED, packet_socket, 0); if (mapped_buffer == MAP_FAILED) { printf("mmap failed!\n"); return -1; } // Allocate iov structure for each block rd = (struct iovec*)malloc(req.tp_block_nr * sizeof(struct iovec)); // Initilize iov structures for (int i = 0; i < req.tp_block_nr; ++i) { rd[i].iov_base = mapped_buffer + (i * req.tp_block_size); rd[i].iov_len = req.tp_block_size; } int bind_result = bind(packet_socket, (struct sockaddr *)&bind_address, sizeof(bind_address)); if (bind_result == -1) { printf("Can't bind to AF_PACKET socket\n"); return -1; } if (fanout_group_id) { // PACKET_FANOUT_LB - round robin // PACKET_FANOUT_CPU - send packets to CPU where packet arrived int fanout_type = PACKET_FANOUT_CPU; int fanout_arg = (fanout_group_id | (fanout_type << 16)); int setsockopt_fanout = setsockopt(packet_socket, SOL_PACKET, PACKET_FANOUT, &fanout_arg, sizeof(fanout_arg)); if (setsockopt_fanout < 0) { printf("Can't configure fanout\n"); return -1; } } unsigned int current_block_num = 0; struct pollfd pfd; memset(&pfd, 0, sizeof(pfd)); pfd.fd = packet_socket; pfd.events = POLLIN | POLLERR; pfd.revents = 0; while (true) { struct block_desc *pbd = (struct block_desc *) rd[current_block_num].iov_base; if ((pbd->h1.block_status & TP_STATUS_USER) == 0) { poll(&pfd, 1, -1); continue; } walk_block(pbd, current_block_num); flush_block(pbd); current_block_num = (current_block_num + 1) % blocknum; } return packet_socket; } void start_af_packet_capture(std::string interface_name, int fanout_group_id) { setup_socket(interface_name, fanout_group_id); } void get_af_packet_stats() { // getsockopt PACKET_STATISTICS } // Could get some speed up on NUMA servers bool execute_strict_cpu_affinity = false; bool use_multiple_fanout_processes = true; int main() { int fanout_group_id = getpid() & 0xffff; boost::thread speed_printer_thread( speed_printer ); if (use_multiple_fanout_processes) { boost::thread_group packet_receiver_thread_group; unsigned int num_cpus = 8; for (int cpu = 0; cpu < num_cpus; cpu++) { boost::thread::attributes thread_attrs; if (execute_strict_cpu_affinity) { cpu_set_t current_cpu_set; int cpu_to_bind = cpu % num_cpus; CPU_ZERO(¤t_cpu_set); // We count cpus from zero CPU_SET(cpu_to_bind, ¤t_cpu_set); int set_affinity_result = pthread_attr_setaffinity_np(thread_attrs.native_handle(), sizeof(cpu_set_t), ¤t_cpu_set); if (set_affinity_result != 0) { printf("Can't set CPU affinity for thread\n"); } } packet_receiver_thread_group.add_thread( new boost::thread(thread_attrs, boost::bind(start_af_packet_capture, "eth6", fanout_group_id)) ); } // Wait all processes for finish packet_receiver_thread_group.join_all(); } else { start_af_packet_capture("eth6", 0); } speed_printer_thread.join(); }