I am writing a small server that will receive data from multiple sources and process this data. The sources and data received is significant, but no more than epoll should be able to handle quite well. However, all received data must be parsed and run through a large number of tests which is time consuming and will block a single thread despite epoll multiplexing. Basically, the pattern should be something like follows: IO-loop receives data and bundles it into a job, sends to the first thread available in the pool, the bundle is processed by the job and the result is passed pack to the IO loop for writing to file.
I have decided to go for a single IO thread and N worker threads. The IO thread for accepting tcp connections and reading data is easy to implement using the example provided at: http://linux.die.net/man/7/epoll
Thread are also usually easy enough to deal with, but I am struggling to combine the epoll IO loop with a threadpool in an elegant manner. I am unable to find any "best practice" for using epoll with a worker pool online either, but quite a few questions regarding the same topic.
I therefore have some question I hope someone can help me answering:
EDIT: Can one possible solution be to update a ring buffer from the IO-loop, after update send the ring buffer index to the workers through a shared pipe for all workers (thus giving away control of that index to the first worker that reads the index off the pipe), let the worker own that index until end of processing and then send the index number back into the IO-thread through a pipe again, thus giving back control?
My application is Linux-only, so I can use Linux-only functionality in order to achieve this in the most elegant way possible. Cross platform support is not needed, but performance and thread safety is.
When performing this model, because we only know the packet size once we have fully received the packet, unfortunately we cannot offload the receive itself to a worker thread. Instead the best we can still do is a thread to receive the data which will have to pass off pointers to fully received packets.
The data itself is probably best held in a circular buffer, however we will want a separate buffer for each input source (if we get a partial packet we can continue receiving from other sources without splitting up the data. The remaining question is how to inform the workers of when a new packet is ready, and to give them a pointer to the data in said packet. Because there is little data here, just some pointers the most elegant way of doing this would be with posix message queues. These provide the ability for multiple senders and multiple receivers to write and read messages, always ensuring every message is received and by precisely 1 thread.
You will want a struct resembling the one below for each data source, I shall go through the fields purposes now.
struct DataSource
{
int SourceFD;
char DataBuffer[MAX_PACKET_SIZE * (THREAD_COUNT + 1)];
char *LatestPacket;
char *CurrentLocation
int SizeLeft;
};
The SourceFD is obviously the file descriptor to the data stream in question, the DataBuffer is where Packets contents are held while being processed, it is a circular buffer. The LatestPacket pointer is used to temporarily hold a pointer to the most resent packet in case we receive a partial packet and move onto another source before passing the packet off. The CurrentLocation stores where the latest packet ends so that we know where to place the next one, or where to carry on in case of partial receive. The size left is the room left in the buffer, this will be used to tell if we can fit the packet or need to circle back around to the beginning.
The receiving function will thus effectively
The worker thread will do its processing using the received pointers and then increase the SizeLeft so the receiver thread will know it can carry on filling the buffer. The atomic functions will be needed to work on the size value in the struct so we don't get race conditions with the size property (as it is possible it is written by a worker and the IO thread simultaneously, causing lost writes, see my comment below), they are listed here and are simple and extremely useful.
Now, I have given some general background but will address the points given specifically:
Finally your edit is fairly sensible, except for the fact as I ave suggested, message queues are far better than pipes here as they very efficiently signal events , guarantee a full message read and provide automatic framing.
I hope this helps, however it is late so if I missed anything or you have questions feel free to comment for clarification or more explanation.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With