Linux: Understanding setsockopt() PACKET_FANOUT for network scaling

I have read the packet(7) man page and a few blog posts trying to understand how to use the PACKET_FANOUT socket option to scale the processing of received data (I am looking to use SOCK_RAW to capture traffic at high speeds, >10 Gbps). I have read through this example code (copied below), but I'm not sure I have fully understood it.

Let's imagine a scenario: RSS has been set up on the NIC and ingress traffic is evenly distributed between the RX queues; there is an 8-core CPU and 8 NIC RX queues, and each RX queue [0-7] sends an interrupt to CPU [0-7] respectively (further discussion of MMAP, zero copy, poll(), et al. is off topic here).

This is the order of events as I see it in the example code:

  1. 8 worker threads [0-7] are started, each one pinned to a CPU [0-7].
  2. Each thread creates a socket (again, say sockets 0-7) using setup_socket(), bound to the same physical NIC, with the NIC in promiscuous mode, and all part of the same fanout group.
  3. Now we have (for example) worker thread 0 bound to CPU 0 which created socket 0. This thread enters an infinite loop calling read() against socket 0 only.
  4. When a packet arrives at NIC RX queue 0, it is DMA'd into kernel receive buffer space and an interrupt is sent to CPU 0. Because the PACKET_FANOUT socket option was applied with the PACKET_FANOUT_CPU flag, only the socket on the same CPU core as the one the packet arrived on (core 0) will show data available when read() is called against it (so only socket 0, created by thread 0), and the data is then copied to that thread's userland receive buffer only.

Point 4 is the main source of doubt in my understanding of this process. Have I correctly understood how scaling works with PACKET_FANOUT in this scenario, and how a worker thread is locked to the same core that processes the interrupt?

void start_af_packet_capture(std::string interface_name, int fanout_group_id) {

    // setup_socket() calls socket() (using SOCK_RAW) to create the socket FD,
    // setsockopt() to enable promiscuous mode on the NIC,
    // bind() to bind the socket FD to the NIC,
    // and setsockopt() again to set PACKET_FANOUT + PACKET_FANOUT_CPU
    int packet_socket = setup_socket(interface_name, fanout_group_id); 

    if (packet_socket == -1) {
        printf("Can't create socket\n");
        return;
    }

    unsigned int capture_length = 1500;
    char buffer[capture_length];

    while (true) {
        int readed_bytes = read(packet_socket, buffer, capture_length);

        // Check for a read error before counting or consuming the packet
        if (readed_bytes < 0) {
            break;
        }

        received_packets++;

        // printf("Got %d bytes from interface\n", readed_bytes);

        consume_pkt((u_char*)buffer, readed_bytes);
    }
} 

...

bool use_multiple_fanout_processes = true;

// Could get some speed up on NUMA servers
bool execute_strict_cpu_affinity = false;

int main() {
    boost::thread speed_printer_thread( speed_printer );

    int fanout_group_id = getpid() & 0xffff;

    if (use_multiple_fanout_processes) {
        boost::thread_group packet_receiver_thread_group;

        unsigned int num_cpus = 8;
        for (int cpu = 0; cpu < num_cpus; cpu++) {
            boost::thread::attributes thread_attrs;

            if (execute_strict_cpu_affinity) {
                cpu_set_t current_cpu_set;

                int cpu_to_bind = cpu % num_cpus;
                CPU_ZERO(&current_cpu_set);
                // We count cpus from zero
                CPU_SET(cpu_to_bind, &current_cpu_set);

                int set_affinity_result = pthread_attr_setaffinity_np(thread_attrs.native_handle(), sizeof(cpu_set_t), &current_cpu_set);

                if (set_affinity_result != 0) {
                    printf("Can't set CPU affinity for thread\n");
                } 
            }

            packet_receiver_thread_group.add_thread(
                new boost::thread(thread_attrs, boost::bind(start_af_packet_capture, "eth6", fanout_group_id))
            );
        }

        // Wait for all threads to finish
        packet_receiver_thread_group.join_all();
    } else {
        start_af_packet_capture("eth6", 0);
    }

    speed_printer_thread.join();
}
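
For completeness, setup_socket() itself is not shown in the example. Below is a minimal sketch of what it might look like, based only on the comment inside start_af_packet_capture(); the promiscuous-mode, bind, and error-handling details here are assumptions rather than the original code.

// Minimal sketch of setup_socket(), reconstructed from the comment above.
// Details (error handling, exact option usage) are assumptions, not the
// original author's code.
#include <sys/socket.h>
#include <linux/if_packet.h>
#include <linux/if_ether.h>
#include <net/if.h>
#include <arpa/inet.h>   // htons
#include <unistd.h>      // close
#include <cstring>
#include <string>

int setup_socket(std::string interface_name, int fanout_group_id) {
    // Raw AF_PACKET socket that receives every ethertype
    int fd = socket(AF_PACKET, SOCK_RAW, htons(ETH_P_ALL));
    if (fd == -1) return -1;

    int ifindex = if_nametoindex(interface_name.c_str());

    // setsockopt() to enable promiscuous mode on the NIC
    struct packet_mreq mreq;
    memset(&mreq, 0, sizeof(mreq));
    mreq.mr_ifindex = ifindex;
    mreq.mr_type = PACKET_MR_PROMISC;
    if (setsockopt(fd, SOL_PACKET, PACKET_ADD_MEMBERSHIP, &mreq, sizeof(mreq)) == -1) {
        close(fd);
        return -1;
    }

    // bind() the socket FD to the NIC
    struct sockaddr_ll bind_addr;
    memset(&bind_addr, 0, sizeof(bind_addr));
    bind_addr.sll_family = AF_PACKET;
    bind_addr.sll_protocol = htons(ETH_P_ALL);
    bind_addr.sll_ifindex = ifindex;
    if (bind(fd, (struct sockaddr*)&bind_addr, sizeof(bind_addr)) == -1) {
        close(fd);
        return -1;
    }

    // setsockopt() again to join the fanout group with CPU-based demux:
    // low 16 bits = group id, high 16 bits = fanout algorithm
    int fanout_arg = (fanout_group_id & 0xffff) | (PACKET_FANOUT_CPU << 16);
    if (setsockopt(fd, SOL_PACKET, PACKET_FANOUT, &fanout_arg, sizeof(fanout_arg)) == -1) {
        close(fd);
        return -1;
    }

    return fd;
}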

Edit: Bonus Question

This might be too unrelated, in which case please advise and I will start a separate SO post. The aim here is not just to scale packet processing across multiple cores but also to place the packet-processing code on the same core that receives the packet (MMAP & RX_RING will be explored later), so that there are fewer context switches and cache misses on the CPU. My understanding is that this goal is being achieved here; can someone please confirm or deny?

asked Jan 05 '23 by jwbensley
1 Answer

As best I can tell, no, not quite. fanout_demux_cpu calculates a "hash" using the CPU and the number of sockets in the fanout group, which happens to be smp_processor_id() % num. packet_rcv_fanout then uses this as an index into the array of sockets in the fanout group to determine which socket gets the packet.
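
As a self-contained toy model (my simplification for illustration, not the kernel source; the socket fds below are invented), the effect of that dispatch is roughly:

// Toy model of PACKET_FANOUT_CPU dispatch -- a simplification of the idea in
// net/packet/af_packet.c, NOT the actual kernel code. Values are invented.
#include <cstdio>

const unsigned int num_sockets = 8;

// The i-th entry holds the i-th socket that joined the fanout group, in join
// order. Nothing guarantees any relationship between this index and the CPU
// a given reader thread happens to be pinned to.
int group_sockets[num_sockets] = {3, 4, 5, 6, 7, 8, 9, 10}; // pretend fds

int pick_socket(unsigned int cpu_handling_packet) {
    // Equivalent of fanout_demux_cpu(): smp_processor_id() % num
    unsigned int idx = cpu_handling_packet % num_sockets;
    // Equivalent of packet_rcv_fanout(): deliver to the socket at that index
    return group_sockets[idx];
}

int main() {
    // A packet handled on CPU 2 goes to whichever socket joined the group third
    printf("packet handled on CPU 2 -> delivered to fd %d\n", pick_socket(2));
    return 0;
}

In other words, the chosen socket depends only on which CPU handled the packet and on the order in which sockets joined the group, not on which thread is reading which socket.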

Once you see that the whole design of the fanout group is based on deriving some sort of hash from the properties of the received packet, and not from the properties of the thread trying to read a socket, you should probably just let the scheduler sort things out rather than pinning threads.

Alternatively, you could dig further into the code to reverse-engineer the order of the sockets in the array, but that would be fragile, and you might want to verify that you have done so correctly using systemtap. You could then create the sockets in a deterministic order (hopefully resulting in a deterministic order in the array) and pin the thread listening on a given socket to the appropriate CPU, roughly as in the sketch below.
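
A rough sketch of that idea, assuming (and this is exactly the fragile assumption) that the n-th socket to join the fanout group ends up at index n in the kernel's array; setup_socket() is the helper from the question and reader_loop() stands in for the read()/consume_pkt() loop:

// Sketch only: relies on the unverified assumption that join order == array
// index. setup_socket() and reader_loop() are defined elsewhere (see question).
#include <pthread.h>
#include <sched.h>
#include <string>
#include <thread>
#include <vector>

int setup_socket(std::string interface_name, int fanout_group_id); // from the question
void reader_loop(int packet_socket);                               // read()/consume_pkt() loop

void start_pinned_readers(const std::string& interface_name,
                          int fanout_group_id, unsigned int num_cpus) {
    // 1. Create all sockets serially from one thread so the order in which
    //    they join the fanout group is deterministic (socket n joins n-th).
    std::vector<int> sockets;
    for (unsigned int cpu = 0; cpu < num_cpus; cpu++) {
        sockets.push_back(setup_socket(interface_name, fanout_group_id));
    }

    // 2. Start one reader per socket and pin the thread reading socket n to
    //    CPU n, matching the assumed index n in the fanout array.
    std::vector<std::thread> readers;
    for (unsigned int cpu = 0; cpu < num_cpus; cpu++) {
        readers.emplace_back(reader_loop, sockets[cpu]);

        cpu_set_t cpu_set;
        CPU_ZERO(&cpu_set);
        CPU_SET(cpu, &cpu_set);
        pthread_setaffinity_np(readers.back().native_handle(),
                               sizeof(cpu_set_t), &cpu_set);
    }

    for (auto& t : readers) {
        t.join();
    }
}

Nothing in this sketch verifies the index assumption; that is what the systemtap check mentioned above would be for.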

answered Jan 07 '23 by Jim D.