
Single producer and multiple single-threaded consumers

My application receives packets from the network and dispatches them to one or more "processors". (Each packet belongs to a predefined "stream" which can be identified by looking at the packet data.)

There is currently a single thread that does all the work:

  1. fetch the packets from the network device
  2. identify the processors for each packet
  3. dispatch the packet to its processors

Incoming data is received at a rate of 20 million packets per second (10Gbps of 60-byte packets.)

However, this solution can only keep up with a very small number of streams and processors. For example, with 10 streams there is already about 10-20% packet loss.

Since step (3) is the most expensive one I plan to delegate that work to a pool of worker threads.

However, I must be careful because the processors themselves are not thread-safe. So only one worker thread can dispatch packets to the same processor at the same time.

This seems like a good use case for task-based programming, but I can't easily map the design patterns explained in the TBB docs to my problem.

So my question is: How can I organize my consumer threads so that they distribute the packets evenly to the single-threaded processors?

I'm not expecting a fully worked out solution but I would be happy with just your suggestions or random ideas :)

StackedCrooked


3 Answers

I've done some embedded programming where I had to deal with relatively high throughputs - not as fast as you have here! Hopefully, you're on some much more capable hardware than I'm used to as well... There are some simple strategies that should apply to your situation!

1. The input/processing queue and related memory management is critical.

The queue for incoming data must be very, very efficient if you have high data rates. You should do the least amount of processing possible, otherwise you risk losing data from the device. (I'm used to reading data from some kind of fast serial device with a relatively small buffer, so there are realtime constraints on how long the device can be left unread before data is lost. This has gotten me into the habit of treating reading from the device as a completely standalone task that just reads data and nothing else.)

A very simple series of fixed-size preallocated buffers is about as efficient as it gets: have a queue of 'free' buffers and a queue of 'filled' buffers. If you use a lock-free linked list, maintaining these lists can be very fast, and enqueue/dequeue operations like this are pretty common in many OSs.
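To make that concrete, here's a minimal C++ sketch of such a pool. The 2 KB block size is a made-up value, and it uses plain mutex-protected queues purely to keep the sketch short; at your data rates you'd swap those for the lock-free lists mentioned above.

```cpp
#include <array>
#include <cstddef>
#include <deque>
#include <mutex>
#include <vector>

// One fixed-size block; the producer fills it with raw packet bytes.
struct Block {
    std::array<std::byte, 2048> data;   // assumed block size
    std::size_t used = 0;
};

// Pool of preallocated blocks with a 'free' queue and a 'filled' queue.
// Mutexes are used here only for brevity; a real implementation at this
// data rate would use lock-free queues instead.
class BlockPool {
public:
    explicit BlockPool(std::size_t count) : storage_(count) {
        for (auto& b : storage_) free_.push_back(&b);
    }

    Block* acquire_free() {                 // producer side
        std::lock_guard lk(free_mtx_);
        if (free_.empty()) return nullptr;  // out of buffers: data will be dropped
        Block* b = free_.front();
        free_.pop_front();
        return b;
    }

    void submit_filled(Block* b) {          // producer -> consumers
        std::lock_guard lk(filled_mtx_);
        filled_.push_back(b);
    }

    Block* take_filled() {                  // consumer side
        std::lock_guard lk(filled_mtx_);
        if (filled_.empty()) return nullptr;
        Block* b = filled_.front();
        filled_.pop_front();
        return b;
    }

    void release(Block* b) {                // consumer hands the block back
        b->used = 0;
        std::lock_guard lk(free_mtx_);
        free_.push_back(b);
    }

private:
    std::vector<Block> storage_;
    std::deque<Block*> free_, filled_;
    std::mutex free_mtx_, filled_mtx_;
};
```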

Avoid using malloc or other dynamic allocation, since they have significant (and often unpredictable) overheads when they need to manage their own data structures of 'free' and 'allocated' blocks. They may also take locks that can block the producer or worker threads unpredictably if both are freeing or allocating memory around the same time. Instead, try to find the lower-level routines your OS provides for allocating and releasing whole pages for your queues (mmap on unixy platforms, VirtualAllocEx on Windows). These usually have to do a lot less work, since they use MMU features to map physical pages of RAM and don't have a complicated data structure in memory to maintain; they have a more dependable runtime per call, and can be fast enough to expand your free list if it's running low.
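For illustration, here's a rough sketch of grabbing the backing memory for the whole pool straight from the OS on a unixy platform; the function name and the idea of one big up-front reservation are just how I'd do it, not something from the question.

```cpp
#include <sys/mman.h>
#include <cstddef>
#include <cstdio>

// Reserve the whole buffer pool in one mmap call instead of many mallocs.
// MAP_ANONYMOUS gives zero-filled pages straight from the MMU, with no heap
// bookkeeping behind them. (Real code might also look at MAP_HUGETLB/mlock.)
void* allocate_pool(std::size_t bytes) {
    void* mem = mmap(nullptr, bytes,
                     PROT_READ | PROT_WRITE,
                     MAP_PRIVATE | MAP_ANONYMOUS, -1, 0);
    if (mem == MAP_FAILED) {
        std::perror("mmap");
        return nullptr;
    }
    return mem;
}
```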

In the producer, don't worry about units smaller than whole blocks. Take a free block from the queue, pack a block full of data, add it to the queue to be processed. If you have to ensure each packet is processed within a fixed period of time, or you need to deal with 'bursty' data rates, then still try and read a full buffer from your input device, but either reduce the size of the block to be a 'reasonable' amount of time, or use a timeout and enqueue partially filled blocks to be processed and 'fill' the remainder with some kind of null-packet. I've found that it's often faster to do that than have to include lots of code for handling partially filled buffers.
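A sketch of what that producer loop might look like, building on the pool sketched above; read_packets_into() and the one-millisecond budget are assumptions standing in for the real NIC read call and your own latency requirement.

```cpp
#include <chrono>

// Producer: always read into whole blocks; hand a block off when it is full
// or when its time budget expires, so consumers never wait long for data.
// read_packets_into() is a hypothetical stand-in for the real device read,
// returning the number of bytes appended to the block.
void producer_loop(BlockPool& pool) {
    using clock = std::chrono::steady_clock;
    const auto budget = std::chrono::milliseconds(1);   // assumed per-block budget
    for (;;) {
        Block* block = pool.acquire_free();
        if (!block) continue;            // pool exhausted: data is being dropped
        auto deadline = clock::now() + budget;
        while (block->used < block->data.size() && clock::now() < deadline)
            block->used += read_packets_into(*block);   // hypothetical NIC read
        pool.submit_filled(block);
    }
}
```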

If you can, set the processor affinity and thread priority of your producer thread very carefully. Ideally, you want the producer thread to have a higher priority than any of the consumer threads, and to be tied to a specific core. Nothing short of running out of buffer space should prevent incoming data from being read.
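On Linux, for example, that could look something like the following; the core number and priority are illustrative, and the realtime SCHED_FIFO policy needs the right privileges.

```cpp
#include <pthread.h>
#include <sched.h>

// Pin the calling (producer) thread to one core and raise its priority above
// the workers. pthread_setaffinity_np is a GNU extension (Linux-specific).
void pin_and_prioritize(int core) {
    cpu_set_t set;
    CPU_ZERO(&set);
    CPU_SET(core, &set);
    pthread_setaffinity_np(pthread_self(), sizeof(set), &set);

    sched_param sp{};
    sp.sched_priority = 80;  // illustrative: higher than the worker threads
    pthread_setschedparam(pthread_self(), SCHED_FIFO, &sp);
}
```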

2. Processing

You've said that there are:

  1. Several streams
  2. Several 'processors', that are not thread-safe

What would be useful to do here is to parallelise running the processors on the packets, but it's not clear from your question to what extent that's possible.

Are processors thread-safe across streams? (Could we run a processor in two different threads as long as they were operating on two different streams?)

Are processors thread-safe across different processors in the same stream? (Could we run several processors on the same stream in separate threads?)

Do processors need to be run in a specific order?

Without knowing this, there is still some generic advice that is useful.

Have a second thread that deals with reading full buffers from the producer, dispatching them to the appropriate processors (in other threads), and then putting the completed buffer back on the 'free' queue for reuse. Whilst you lose some straight-line efficiency (one thread doing reading and dispatching would be marginally 'faster' than two), at least this way reading from the input device won't be blocked if there's a momentary lock.

Create or find a library that allows you to allocate jobs to a thread pool, especially if you have many processors compared to the number of threads you can run in parallel. It's relatively straightforward to also implement some kind of job queuing that allows simple relationships between jobs (e.g. "this job requires that jobs X and Y have been done first", "this job cannot run in parallel with any other job that uses the same processor"). Even a simple strategy where the job manager just runs the first runnable job on the first available thread can be very effective.

Try to avoid copying. If the processors can process a packet 'in place' without copying it from the buffer, then you've saved a lot of pointless cycles. Even if you do have to copy, having several threads copy data from a 'read-only' shared buffer is better than having a single thread copy and dispatch messages to several threads.

If checking whether a processor should be run for a given packet is very fast, then you may be better off having several jobs, each checking whether it should do some processing. Instead of having a single thread figure out which processors should run on which packets, it may be faster to have multiple threads, one for each processor or group of processors, each checking every packet once to see whether its processor should be run. This just comes down to the idea that a simple check on a read-only resource, done several times in several threads, might take less time overall than synchronising between the threads.

If you can run processors in parallel when they're processing data from different streams, then doing a pass through the data to get a list of the streams and then starting a job per stream is a good idea. You could also gather a list of the packets that belong to each stream, but again, it's a trade-off between how fast a job can check each packet and the time it takes to gather those lists in a single thread and pass each one to its respective job.

Hopefully, some of these strategies can be useful in your case! Let us know how it works out... that's a hell of a lot of data you've got to process, and it'd be good to know what is and isn't effective on faster data rates than I'm used to! Good luck!

Wuggy


Here's my idea for a possible solution.

Let's say we have n processors. Let's introduce n mutexes, one per processor. Let's also introduce a queue for packets. All incoming packets are put into this queue.

A worker thread operates like this (a rough code sketch follows the list):

  1. Grab packet from the incoming packets queue.
  2. Identify the necessary processor.
  3. Try to acquire the corresponding mutex. If lock acquisition succeeds, process the packet. Otherwise, re-enqueue the packet and go to step 1.
  4. After processing is done, release the mutex and go to step 1.
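Here's a rough sketch of that loop; ConcurrentQueue, Processor and identify_processor() are placeholders for whatever you already have, not real library types.

```cpp
#include <mutex>
#include <vector>

// Placeholder packet type; the shared incoming queue and the
// non-thread-safe processors (one mutex each) are assumed to exist.
struct Packet { int stream_id; /* payload ... */ };

void worker_loop(ConcurrentQueue<Packet>& incoming,
                 std::vector<Processor>& processors,
                 std::vector<std::mutex>& locks) {
    Packet pkt;
    while (incoming.pop(pkt)) {                        // 1. grab a packet
        int pid = identify_processor(pkt);             // 2. which processor?
        std::unique_lock lk(locks[pid], std::try_to_lock);
        if (!lk.owns_lock()) {                         // 3. processor busy:
            incoming.push(pkt);                        //    re-enqueue and retry
            continue;
        }
        processors[pid].process(pkt);                  //    otherwise process it
    }                                                  // 4. lock released, loop
}
```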

Possible downsides:

  1. Packets are re-enqueued which means they can be delayed/processed out-of-order which may be a deal-breaker for you (not sure).
  2. Contention on the queue is likely to be high. You probably want to look into using a lock-free queue for this.
  3. The queue obviously consumes additional memory, I don't know if you have memory to spare.

EDIT: more thoughts on memory consumption - of course, it's possible to put an upper limit on the amount of memory the queue can consume - then the question is what to do when you run out of memory. I would say the best thing to do is just start dropping packets (I got the impression that dropping a few isn't a big deal in your case) until the queue drains a bit.

Somewhat related to that - I think a good queue implementation for this use case should avoid dynamic memory allocation at all costs - preallocate memory upfront and make sure there are no allocs on the critical code path.

nicebyte


Why can't you use multiple queues, one per processor? These queues can be lock-free (no mutexes); a rough sketch follows the steps below.

  1. fetch the packets from the network device
  2. identify the processors for each packet (PID)
  3. push packet to queue[PID]
  4. a worker for processor k processes packets from queue[k]
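A sketch of what that could look like; SpscQueue, PacketSource, Processor and identify_processor() are stand-ins for your own types, not a specific library.

```cpp
#include <vector>

// One queue per processor: only the dispatcher pushes to queue[k] and only
// the worker owning processor k pops from it, so each queue can be a simple
// single-producer / single-consumer lock-free queue.
void dispatch_loop(PacketSource& source,
                   std::vector<SpscQueue<Packet>>& queues) {
    Packet pkt;
    while (source.fetch(pkt)) {              // 1-2. fetch and identify
        int pid = identify_processor(pkt);
        queues[pid].push(pkt);               // 3. push to queue[pid]
    }
}

void worker_for_processor(SpscQueue<Packet>& queue, Processor& processor) {
    Packet pkt;
    for (;;) {
        if (queue.pop(pkt))                  // 4. process packets from queue[k]
            processor.process(pkt);
    }
}
```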

For a similar problem, I use a pool of lock-free ring buffers with auto-overwriting of the oldest packets.
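For illustration, here's a minimal ring buffer with that overwrite-the-oldest behaviour. It uses a mutex to stay short and obviously correct; the lock-free version referred to above would replace it with atomic head/tail counters.

```cpp
#include <cstddef>
#include <mutex>
#include <vector>

// Fixed-capacity ring buffer that overwrites the oldest entry when full.
template <typename T>
class OverwritingRing {
public:
    explicit OverwritingRing(std::size_t capacity) : buf_(capacity) {}

    void push(const T& item) {
        std::lock_guard lk(mtx_);
        buf_[head_ % buf_.size()] = item;
        ++head_;
        if (head_ - tail_ > buf_.size())
            ++tail_;                        // drop the oldest entry
    }

    bool pop(T& out) {
        std::lock_guard lk(mtx_);
        if (tail_ == head_) return false;   // empty
        out = buf_[tail_ % buf_.size()];
        ++tail_;
        return true;
    }

private:
    std::vector<T> buf_;
    std::size_t head_ = 0, tail_ = 0;       // monotonically increasing counters
    std::mutex mtx_;
};
```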

Glavnyy