I read the ZeroMQ guide and stumbled upon the following:
You MUST NOT share ØMQ sockets between threads. ØMQ sockets are not threadsafe. Technically it's possible to do this, but it demands semaphores, locks, or mutexes. This will make your application slow and fragile. The only place where it's remotely sane to share sockets between threads are in language bindings that need to do magic like garbage collection on sockets.
and later on:
Remember: Do not use or close sockets except in the thread that created them.
I also understood that the ZeroMQ Context is threadsafe.
If a class subscribes to an event of another class in .NET, this event might be raised on a different thread than the one the listener was created on.
I think there are only two options for dispatching something via ZeroMQ sockets from within an event handler:
1. Synchronize the thread that invokes the event handler with the thread the ZeroMQ Socket was created in.
2. Create a new ZeroMQ Socket / get the existing ZeroMQ Socket for the thread within the event handler by using the threadsafe ZeroMQ Context.
The 0MQ guide seems to discourage the first one, and I don't think that creating a new ZeroMQ socket for each thread performs well or is the way to go.
My Question:
What is the correct pattern (the way it is meant to be) to publish messages via 0MQ from within an eventhandler?
Also, did the authors of the guide have the ZeroMQ binding for .NET in mind when they wrote the following?
The only place where it's remotely sane to share sockets between threads are in language bindings that need to do magic like garbage collection on sockets.
Here is some sample code to illustrate my problem/question:
public class ExampleClass
{
    public event EventHandler<ByteEventArgs> SomethinIsCalledFromAnotherThread;
}

public class ByteEventArgs : EventArgs
{
    public byte[] BytesToSend;
}

public class Dispatcher
{
    ZMQ.Context ctx;

    public Dispatcher(ZMQ.Context mqcontext, ExampleClass exampleClassInstance)
    {
        this.ctx = mqcontext;
        exampleClassInstance.SomethinIsCalledFromAnotherThread += new EventHandler<ByteEventArgs>(exampleClass_SomethinIsCalledFromAnotherThread);
    }

    void exampleClass_SomethinIsCalledFromAnotherThread(object sender, ByteEventArgs e)
    {
        // this method might be called by a different thread. So I have to get a new socket etc?
        using (var socket = ctx.Socket(ZMQ.SocketType.PUSH))
        {
            // init socket etc..... and finally:
            socket.Send(e.BytesToSend);
        }
        // isn't that too much overhead?
    }
}
ZeroMQ (also spelled ØMQ, 0MQ or ZMQ) is an asynchronous messaging library, aimed at use in distributed or concurrent applications. It provides a message queue, but unlike message-oriented middleware, a ZeroMQ system can run without a dedicated message broker; the zero in the name is for zero broker.
A ØMQ context is thread safe and may be shared among as many application threads as necessary, without any additional locking required on the part of the caller. Individual ØMQ sockets are not thread safe except in the case where full memory barriers are issued when migrating a socket from one thread to another.
Contexts help manage any sockets that are created as well as the number of threads ZeroMQ uses behind the scenes. Create one when you initialize a process and destroy it as the process is terminated. Contexts can be shared between threads and, in fact, are the only ZeroMQ objects that can safely do this.
Writing to a socket by multiple threads is thread-safe as long as the other end can make sense of the interleaved data. Reading from a socket by multiple threads is thread-safe as long as this end can make sense of the interleaved data.
You can create lots of 0MQ sockets, certainly as many as you have threads. If you create a socket in one thread, and use it in another, you must execute a full memory barrier between the two operations. Anything else will result in weird random failures in libzmq, as socket objects are not threadsafe.
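The "as many sockets as you have threads" idea could be sketched in .NET 4 with ThreadLocal<T>, which lazily creates one value per calling thread. This is only a sketch under stated assumptions: PerThreadSender and createSend are hypothetical names, and the Action<byte[]> delegate stands in for a real socket's send call so the example does not depend on any particular ZeroMQ binding.

```csharp
using System;
using System.Threading;

// Hypothetical helper: gives every calling thread its own send delegate.
public sealed class PerThreadSender
{
    private readonly ThreadLocal<Action<byte[]>> send;

    // createSend is an assumption of this sketch. In a real application it
    // would create and connect a socket (for example via
    // ctx.Socket(ZMQ.SocketType.PUSH), as in the question) and return a
    // delegate that sends on that socket.
    public PerThreadSender(Func<Action<byte[]>> createSend)
    {
        send = new ThreadLocal<Action<byte[]>>(createSend);
    }

    // Safe to call from any thread: Value runs the factory once per thread,
    // so each delegate is only ever invoked on the thread that created it.
    public void Send(byte[] payload)
    {
        send.Value(payload);
    }
}
```

Note that this sketch never closes anything. ThreadLocal<T> does not dispose its values, so with real sockets each thread would still have to close its own socket before it exits, in line with the guide's rule about closing sockets only in the thread that created them.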
There are a few conventional patterns, though I don't know how these map specifically to .NET:
In .NET Framework 4 and later you can use a concurrent collection to solve this problem by applying the producer-consumer pattern. Multiple threads (handlers) add data to a thread-safe queue, and a single thread consumes the data from the queue and sends it over the socket.
Here is the idea:
sendQueue = new BlockingCollection<MyStuff>(new ConcurrentQueue<MyStuff>());

// the concurrent queue can safely accept items from multiple threads/handlers
MyHandler += (MyStuff stuffToSend) => sendQueue.Add(stuffToSend);

// start the single-threaded send loop; the socket is created and used on this thread only
Task.Factory.StartNew(() => {
    // socket type taken from the question's code; connect or bind as needed
    using (var socket = context.Socket(ZMQ.SocketType.PUSH)) {
        // this enumerable blocks until CompleteAdding is called
        foreach (var stuff in sendQueue.GetConsumingEnumerable())
            socket.Send(stuff.Serialize());
    }
});

// break out of the send loop when done
OnMyAppExit += sendQueue.CompleteAdding;
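Applied to the question's event-handler scenario, the same idea might be wrapped in a small class. The following is a sketch, not a definitive implementation: QueuedDispatcher and openChannel are hypothetical names, and the Action<byte[]> delegate stands in for the socket's send call so the example runs without a ZeroMQ binding.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

// Hypothetical helper: queues payloads from any thread, while a single
// worker owns the sending side.
public sealed class QueuedDispatcher : IDisposable
{
    private readonly BlockingCollection<byte[]> sendQueue =
        new BlockingCollection<byte[]>(new ConcurrentQueue<byte[]>());
    private readonly Task worker;

    // openChannel is an assumption of this sketch. It would create and
    // connect the socket and return a delegate that sends on it.
    public QueuedDispatcher(Func<Action<byte[]>> openChannel)
    {
        worker = Task.Factory.StartNew(() =>
        {
            // Invoked on the worker, so the channel is created on the same
            // thread that later uses it.
            Action<byte[]> send = openChannel();
            foreach (byte[] payload in sendQueue.GetConsumingEnumerable())
                send(payload);
        }, TaskCreationOptions.LongRunning);
    }

    // Safe to call from event handlers running on any thread.
    public void Enqueue(byte[] payload)
    {
        sendQueue.Add(payload);
    }

    // Stops accepting payloads, lets the worker drain the queue, then waits.
    public void Dispose()
    {
        sendQueue.CompleteAdding();
        worker.Wait();
        sendQueue.Dispose();
    }
}
```

With the classes from the question, the handler would then only enqueue, for example exampleClassInstance.SomethinIsCalledFromAnotherThread += (s, e) => dispatcher.Enqueue(e.BytesToSend);. A version using a real socket would also close it inside the worker once the loop ends, as the using block in the answer's code does.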