Skip to content

Instantly share code, notes, and snippets.

@pavel-odintsov
Last active June 27, 2024 19:28
Show Gist options
  • Select an option

  • Save pavel-odintsov/15b7435e484134650f20 to your computer and use it in GitHub Desktop.

Select an option

Save pavel-odintsov/15b7435e484134650f20 to your computer and use it in GitHub Desktop.

Revisions

  1. pavel-odintsov revised this gist Jun 26, 2015. 1 changed file with 5 additions and 0 deletions.
    5 changes: 5 additions & 0 deletions af_packet_rx_ring.c
    Original file line number Diff line number Diff line change
    @@ -14,6 +14,11 @@
    #include <linux/if_packet.h>
    #include <net/ethernet.h> /* the L2 protocols */

    /*
    Parser files:
    https://github.com/FastVPSEestiOu/fastnetmon/blob/master/src/fastnetmon_packet_parser.c
    https://github.com/FastVPSEestiOu/fastnetmon/blob/master/src/fastnetmon_packet_parser.h
    */
    #include "../fastnetmon_packet_parser.h"

    // 4194304 bytes
  2. pavel-odintsov created this gist Jun 25, 2015.
    309 changes: 309 additions & 0 deletions af_packet_rx_ring.c
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,309 @@
    #include <stdlib.h>
    #include <stdio.h>
    #include <string.h>
    #include <iostream>
    #include <unistd.h>

    #include <boost/thread.hpp>
    #include <sys/mman.h>
    #include <poll.h>
    #include <arpa/inet.h>
    #include <sys/socket.h>
    #include <sys/ioctl.h>
    #include <net/if.h>
    #include <linux/if_packet.h>
    #include <net/ethernet.h> /* the L2 protocols */

    #include "../fastnetmon_packet_parser.h"

    // 4194304 bytes
    unsigned int blocksiz = 1 << 22;
    // 2048 bytes
    unsigned int framesiz = 1 << 11;
    unsigned int blocknum = 64;

    struct block_desc {
    uint32_t version;
    uint32_t offset_to_priv;
    struct tpacket_hdr_v1 h1;
    };

    /*
    Build it:
    g++ ../fastnetmon_packet_parser.c -ofastnetmon_packet_parser.o -c
    g++ af_packet.cpp fastnetmon_packet_parser.o -lboost_thread -lboost_system -lpthread
    */

    // Get interface number by name
    int get_interface_number_by_device_name(int socket_fd, std::string interface_name) {
    struct ifreq ifr;
    memset(&ifr, 0, sizeof(ifr));

    if (interface_name.size() > IFNAMSIZ) {
    return -1;
    }

    strncpy(ifr.ifr_name, interface_name.c_str(), sizeof(ifr.ifr_name));

    if (ioctl(socket_fd, SIOCGIFINDEX, &ifr) == -1) {
    return -1;
    }

    return ifr.ifr_ifindex;
    }

    unsigned int af_packet_threads = 1;

    uint64_t received_packets = 0;
    uint64_t received_bytes = 0;

    void speed_printer() {
    while (true) {
    uint64_t packets_before = received_packets;

    boost::this_thread::sleep(boost::posix_time::seconds(1));

    uint64_t packets_after = received_packets;
    uint64_t pps = packets_after - packets_before;

    printf("We process: %llu pps\n", pps);
    }
    }

    void flush_block(struct block_desc *pbd) {
    pbd->h1.block_status = TP_STATUS_KERNEL;
    }

    void walk_block(struct block_desc *pbd, const int block_num) {
    int num_pkts = pbd->h1.num_pkts, i;
    unsigned long bytes = 0;
    struct tpacket3_hdr *ppd;

    ppd = (struct tpacket3_hdr *) ((uint8_t *) pbd +
    pbd->h1.offset_to_first_pkt);
    for (i = 0; i < num_pkts; ++i) {
    bytes += ppd->tp_snaplen;

    // struct ethhdr *eth = (struct ethhdr *) ((uint8_t *) ppd + ppd->tp_mac);
    // Print packets

    // #define PRINT_PACKETS
    #ifdef PRINT_PACKETS
    struct pfring_pkthdr packet_header;
    memset(&packet_header, 0, sizeof(packet_header));
    packet_header.len = ppd->tp_snaplen;
    packet_header.caplen = ppd->tp_snaplen;

    u_int8_t timestamp = 0;
    u_int8_t add_hash = 0;

    u_char* data_pointer = (u_char*)((uint8_t *) ppd + ppd->tp_mac);

    fastnetmon_parse_pkt(data_pointer, &packet_header, 4, timestamp, add_hash);

    char print_buffer[512];
    fastnetmon_print_parsed_pkt(print_buffer, 512, data_pointer, &packet_header);
    printf("%s\n", print_buffer);
    #endif

    ppd = (struct tpacket3_hdr *) ((uint8_t *) ppd +
    ppd->tp_next_offset);
    }

    received_packets += num_pkts;
    received_bytes += bytes;
    }

    int setup_socket(std::string interface_name, int fanout_group_id) {
    // More details here: http://man7.org/linux/man-pages/man7/packet.7.html
    // We could use SOCK_RAW or SOCK_DGRAM for second argument
    // SOCK_RAW - raw packets pass from the kernel
    // SOCK_DGRAM - some amount of processing
    // Third argument manage ether type of captured packets
    int packet_socket = socket(AF_PACKET, SOCK_RAW, htons(ETH_P_ALL));

    if (packet_socket == -1) {
    printf("Can't create AF_PACKET socket\n");
    return -1;
    }

    // We whould use V3 bcause it could read/pool in per block basis instead per packet
    int version = TPACKET_V3;
    int setsockopt_packet_version = setsockopt(packet_socket, SOL_PACKET, PACKET_VERSION, &version, sizeof(version));

    if (setsockopt_packet_version < 0) {
    printf("Can't set packet v3 version\n");
    return -1;
    }

    int interface_number = get_interface_number_by_device_name(packet_socket, interface_name);

    if (interface_number == -1) {
    printf("Can't get interface number by interface name\n");
    return -1;
    }

    // Switch to PROMISC mode
    struct packet_mreq sock_params;
    memset(&sock_params, 0, sizeof(sock_params));
    sock_params.mr_type = PACKET_MR_PROMISC;
    sock_params.mr_ifindex = interface_number;

    int set_promisc = setsockopt(packet_socket, SOL_PACKET, PACKET_ADD_MEMBERSHIP, (void *)&sock_params, sizeof(sock_params));

    if (set_promisc == -1) {
    printf("Can't enable promisc mode\n");
    return -1;
    }

    struct sockaddr_ll bind_address;
    memset(&bind_address, 0, sizeof(bind_address));

    bind_address.sll_family = AF_PACKET;
    bind_address.sll_protocol = htons(ETH_P_ALL);
    bind_address.sll_ifindex = interface_number;

    // We will follow http://yusufonlinux.blogspot.ru/2010/11/data-link-access-and-zero-copy.html
    // And this: https://www.kernel.org/doc/Documentation/networking/packet_mmap.txt

    struct tpacket_req3 req;
    memset(&req, 0, sizeof(req));

    req.tp_block_size = blocksiz;
    req.tp_frame_size = framesiz;
    req.tp_block_nr = blocknum;
    req.tp_frame_nr = (blocksiz * blocknum) / framesiz;

    req.tp_retire_blk_tov = 60; // Timeout in msec
    req.tp_feature_req_word = TP_FT_REQ_FILL_RXHASH;

    int setsockopt_rx_ring = setsockopt(packet_socket, SOL_PACKET , PACKET_RX_RING , (void*)&req , sizeof(req));

    if (setsockopt_rx_ring == -1) {
    printf("Can't enable RX_RING for AF_PACKET socket\n");
    return -1;
    }

    // We use per thread structures
    uint8_t* mapped_buffer = NULL;
    struct iovec* rd = NULL;

    mapped_buffer = (uint8_t*)mmap(NULL, req.tp_block_size * req.tp_block_nr, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_LOCKED, packet_socket, 0);

    if (mapped_buffer == MAP_FAILED) {
    printf("mmap failed!\n");
    return -1;
    }

    // Allocate iov structure for each block
    rd = (struct iovec*)malloc(req.tp_block_nr * sizeof(struct iovec));

    // Initilize iov structures
    for (int i = 0; i < req.tp_block_nr; ++i) {
    rd[i].iov_base = mapped_buffer + (i * req.tp_block_size);
    rd[i].iov_len = req.tp_block_size;
    }

    int bind_result = bind(packet_socket, (struct sockaddr *)&bind_address, sizeof(bind_address));

    if (bind_result == -1) {
    printf("Can't bind to AF_PACKET socket\n");
    return -1;
    }

    if (fanout_group_id) {
    // PACKET_FANOUT_LB - round robin
    // PACKET_FANOUT_CPU - send packets to CPU where packet arrived
    int fanout_type = PACKET_FANOUT_CPU;

    int fanout_arg = (fanout_group_id | (fanout_type << 16));

    int setsockopt_fanout = setsockopt(packet_socket, SOL_PACKET, PACKET_FANOUT, &fanout_arg, sizeof(fanout_arg));

    if (setsockopt_fanout < 0) {
    printf("Can't configure fanout\n");
    return -1;
    }
    }

    unsigned int current_block_num = 0;

    struct pollfd pfd;
    memset(&pfd, 0, sizeof(pfd));

    pfd.fd = packet_socket;
    pfd.events = POLLIN | POLLERR;
    pfd.revents = 0;

    while (true) {
    struct block_desc *pbd = (struct block_desc *) rd[current_block_num].iov_base;

    if ((pbd->h1.block_status & TP_STATUS_USER) == 0) {
    poll(&pfd, 1, -1);

    continue;
    }

    walk_block(pbd, current_block_num);
    flush_block(pbd);
    current_block_num = (current_block_num + 1) % blocknum;
    }

    return packet_socket;
    }

    void start_af_packet_capture(std::string interface_name, int fanout_group_id) {
    setup_socket(interface_name, fanout_group_id);
    }

    void get_af_packet_stats() {
    // getsockopt PACKET_STATISTICS
    }

    // Could get some speed up on NUMA servers
    bool execute_strict_cpu_affinity = false;

    bool use_multiple_fanout_processes = true;

    int main() {
    int fanout_group_id = getpid() & 0xffff;

    boost::thread speed_printer_thread( speed_printer );

    if (use_multiple_fanout_processes) {
    boost::thread_group packet_receiver_thread_group;

    unsigned int num_cpus = 8;
    for (int cpu = 0; cpu < num_cpus; cpu++) {
    boost::thread::attributes thread_attrs;

    if (execute_strict_cpu_affinity) {
    cpu_set_t current_cpu_set;

    int cpu_to_bind = cpu % num_cpus;
    CPU_ZERO(&current_cpu_set);
    // We count cpus from zero
    CPU_SET(cpu_to_bind, &current_cpu_set);

    int set_affinity_result = pthread_attr_setaffinity_np(thread_attrs.native_handle(), sizeof(cpu_set_t), &current_cpu_set);

    if (set_affinity_result != 0) {
    printf("Can't set CPU affinity for thread\n");
    }
    }

    packet_receiver_thread_group.add_thread(
    new boost::thread(thread_attrs, boost::bind(start_af_packet_capture, "eth6", fanout_group_id))
    );
    }

    // Wait all processes for finish
    packet_receiver_thread_group.join_all();
    } else {
    start_af_packet_capture("eth6", 0);
    }

    speed_printer_thread.join();
    }