After reading some articles like this about Qt Signal-Slot communications I still have a question concerning the queued connection.
Say I have some threads sending signals to each other all the time. One thread, thread_slow, is running a slow method in its event loop, and another, thread_fast, is running a fast one that sends multiple signals while the other thread is still running its slow method. When the slow method from thread_slow returns to the event loop, will it process all the signals that were sent before by thread_fast, or just the last one (all the signals are of the same type)?
If it will process all the signals, is there a way to make thread_slow only process the last one? (Since "the last one" in a multithreaded application might be vague, let's take it to mean the last signal sent before the thread asked for the last signal, for the sake of simplicity, so new ones sent while the thread looks for the last one might be lost.)
(I am asking this because I have multiple threads receiving data from multiple threads, and I don't want them to process old data, just the last one that was sent.)
I have run some tests, and it appears that Qt will process all the signals. I made one thread do:

```cpp
while (true) {
    QThread::msleep(500);
    emit testQueue(test);
    test++;
}
```

and a slot in another thread does:

```cpp
void test::testQueue(int test)
{
    // Accumulate into the member; the parameter is a plain int and has no store()
    private_test.store(private_test.load() + test);
    emit testText(QString("Test Queue: ") + QString::number(private_test.load()));
}
```

and that thread will run:

```cpp
while (true) {
    QThread::msleep(3000);
    QCoreApplication::processEvents();
    private_test.store(private_test.load() + 1000);
}
```

I am sending a signal from one thread to the other every 500 milliseconds, and the other thread sleeps for 3000 milliseconds (3 seconds), then wakes up and increments an internal variable by 1000. Every time the slot is executed, it emits a text with the value received plus the internal variable. The result is that every time QCoreApplication::processEvents(); is called, all the signals are executed. (I edited this part because I found a bug in my previous code.)
Queued Connection: The slot is invoked when control returns to the event loop of the receiver's thread. The slot is executed in the receiver's thread.
Blocking Queued Connection: The slot is invoked as for the Queued Connection, except the current thread blocks until the slot returns.
In Qt, we have an alternative to the callback technique: We use signals and slots. A signal is emitted when a particular event occurs. Qt's widgets have many predefined signals, but we can always subclass widgets to add our own signals to them. A slot is a function that is called in response to a particular signal.
Every queued slot call ends up in the posting of a QMetaCallEvent to the target object. The event contains the sender object, the signal id, the slot index, and packaged call parameters. On Qt 5, the signal id generally doesn't equal the value returned by QMetaObject::signalIndex(): it is an index computed as if the object only had signal methods and no other methods.
The objective is to compress such calls so that only one unique call exists in the event queue for a given tuple of (sender object, sender signal, receiver object, receiver slot).
This is the only sane way to do it, without having to make changes to source or target objects, and while maintaining minimal overhead. The event-loop-recursing methods in my other answers have serious stack overhead per each event, on the order of 1kbyte when Qt is built for 64-bit-pointer architectures.
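To make the compression objective concrete, here is a minimal, Qt-free sketch in standard C++. It only models the idea of keeping one pending call per (sender, signal, receiver, slot) tuple. The names Call, CompressingQueue, post and take are hypothetical and invented for illustration; they are not Qt APIs, and the real event queue holds QMetaCallEvent objects rather than plain structs.

```cpp
#include <cstddef>
#include <deque>
#include <tuple>

// Hypothetical stand-in for a queued slot call; not Qt's QMetaCallEvent.
struct Call {
    const void* sender;    // sender object identity
    int signalId;          // which signal was emitted
    const void* receiver;  // receiver object identity
    int slotId;            // which slot will be invoked
    int argument;          // packaged call parameter (a single int for brevity)
};

class CompressingQueue {
public:
    // Posts a call. If a call with the same (sender, signal, receiver, slot)
    // tuple is already pending, it is overwritten in place: the pending call
    // keeps its queue position but carries the newest argument.
    void post(const Call& call) {
        for (Call& pending : m_calls) {
            if (key(pending) == key(call)) {
                pending = call;
                return;
            }
        }
        m_calls.push_back(call);
    }

    // Removes and returns the oldest pending call; the queue must not be empty.
    Call take() {
        Call c = m_calls.front();
        m_calls.pop_front();
        return c;
    }

    std::size_t size() const { return m_calls.size(); }

private:
    // The tuple that identifies "the same call" for compression purposes.
    static std::tuple<const void*, int, const void*, int> key(const Call& c) {
        return std::make_tuple(c.sender, c.signalId, c.receiver, c.slotId);
    }

    std::deque<Call> m_calls;
};
```

With this model, posting the same signal-slot pair three times with arguments 1, 2 and 3 leaves a single pending call carrying 3, while calls for other tuples are left alone. The linear scan is a simplification chosen for clarity, not a statement about how Qt searches its posted event list.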
The event queue can be accessed when new events are posted to an object that has one or more events already posted to it. In such a case, QCoreApplication::postEvent calls QCoreApplication::compressEvent. compressEvent is not called when the first event is posted to an object. In a reimplementation of this method, the contents of a QMetaCallEvent posted to the target object can be checked for a call to your slot, and the obsolete duplicate has to be deleted. Private Qt headers have to be included to obtain the definitions of QMetaCallEvent, QPostEvent and QPostEventList.
Pros: Neither the sender nor the receiver objects have to be aware of anything. Signals and slots work as-is, including the method-pointer calls in Qt 5. Qt itself uses this way of compressing events.
Cons: Requires inclusion of private Qt headers and forcible clearing of the QEvent::posted flag.
Instead of hacking the QEvent::posted flag, the events to be deleted can be queued in a separate list and deleted outside of the compressEvent call, when a zero-duration timer fires. This has the overhead of an extra list of events, and of each event deletion iterating through the posted event list.
The point of doing it some other way is not to use Qt's internals.
L1 The first limitation is not having access to the contents of the privately defined QMetaCallEvent. It can be dealt with as follows:
A proxy object with signals and slots of the same signatures as those of the target can be connected between the source and target objects.
Running the QMetaCallEvent on a proxy object allows extraction of the call type, the called slot id, and the arguments.
In lieu of signal-slot connections, events can be explicitly posted to the target object. The target object, or an event filter, must explicitly re-synthesize the slot call from the event's data.
A custom compressedConnect implementation can be used in lieu of QObject::connect. This fully exposes the details of the signal and slot. A proxy object can be used to perform a compression-friendly equivalent of queued_activate on the side of the sender object.
L2 The second limitation is not being able to completely reimplement QCoreApplication::compressEvent, since the event list is defined privately. We still have access to the event being compressed, and we can still decide whether to delete it or not, but there's no way to iterate the event list. Thus:
The event queue can be accessed implicitly by recursively calling sendPostedEvents from within notify (thus also from eventFilter(), event() or from the slots). This doesn't cause a deadlock, since QCoreApplication::sendPostedEvents can't (and doesn't) hold an event loop mutex while the event is delivered via sendEvent. Events can be filtered in a reimplemented QCoreApplication::notify, through a QInternal::EventNotifyCallback, or in QObject::event() in the target class. The duplicate events are still posted to the event queue. The recursive calls to notify from within sendPostedEvents consume quite a bit of stack space (budget 1kb on 64-bit-pointer architectures).
The events already present can be removed by calling QCoreApplication::removePostedEvents before posting a new event to an object. Unfortunately, doing this within QCoreApplication::compressEvent causes a deadlock as the event queue mutex is already held.
A custom event class that includes the pointer to the receiver object can automatically call removePostedEvents in the constructor.
Existing compressed events, such as QEvent::Exit, can be reappropriated.
The set of those events is an implementation detail and could change. Qt doesn't discriminate among those events other than by the receiver QObject pointer. An implementation requires the overhead of a proxy QObject per each (event type, receiver object) tuple.
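The remove-before-post option listed above (clearing already-posted events for a receiver before posting a new one) can be modelled without Qt as follows. This is a sketch under stated assumptions: PostedEvent, EventQueue, removePosted and postLatest are hypothetical names, the queue is a plain std::vector, and no locking is shown, whereas the real posted event list is guarded by a mutex.

```cpp
#include <algorithm>
#include <cstddef>
#include <vector>

// Hypothetical stand-in for a posted event; not Qt's QEvent.
struct PostedEvent {
    const void* receiver;  // target object identity
    int type;              // event type id
    int payload;           // data carried by the event
};

class EventQueue {
public:
    // Models the effect of removing posted events for (receiver, type):
    // drops every pending event of that type addressed to that receiver.
    void removePosted(const void* receiver, int type) {
        m_events.erase(
            std::remove_if(m_events.begin(), m_events.end(),
                           [=](const PostedEvent& e) {
                               return e.receiver == receiver && e.type == type;
                           }),
            m_events.end());
    }

    // Remove-then-post: at most one event per (receiver, type) stays pending,
    // and the surviving event moves to the back of the queue.
    void postLatest(const PostedEvent& event) {
        removePosted(event.receiver, event.type);
        m_events.push_back(event);
    }

    const std::vector<PostedEvent>& pending() const { return m_events; }

private:
    std::vector<PostedEvent> m_events;
};
```

Unlike in-place compression, the newest event here is appended at the end, so its delivery order relative to events for other receivers can change. Whether that matters depends on the application.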
The code below works on both Qt 4 and Qt 5. On the latter, make sure to add QT += core-private to your qmake project file, so that private Qt headers get included.
The implementations not using Qt internal headers are given in other answers.
There are two event-removing code paths, selected by if (true). The enabled code path retains the most recent event, which typically makes the most sense. Alternatively, you may want to retain the oldest event; that's what the disabled code path does.
```cpp
#include <QApplication>
#include <QMap>
#include <QSet>
#include <QMetaMethod>
#include <QMetaObject>
#include <private/qcoreapplication_p.h>
#include <private/qthread_p.h>
#include <private/qobject_p.h>

#include <QWidget>
#include <QPushButton>
#include <QPlainTextEdit>
#include <QSpinBox>
#include <QFormLayout>

// Works on both Qt 4 and Qt 5.

//
// Common Code

/*! Keeps a list of signal indices for one or more metaobject classes.
 * The indices are signal indices as given by QMetaCallEvent.signalId.
 * On Qt 5, those do *not* match QMetaMethod::methodIndex since they
 * exclude non-signal methods. */
class SignalList {
    Q_DISABLE_COPY(SignalList)
    typedef QMap<const QMetaObject *, QSet<int> > T;
    T m_data;
    /*! Returns a signal index that can be compared to QMetaCallEvent.signalId. */
    static int signalIndex(const QMetaMethod & method) {
        Q_ASSERT(method.methodType() == QMetaMethod::Signal);
#if QT_VERSION >= QT_VERSION_CHECK(5,0,0)
        int index = -1;
        const QMetaObject * mobj = method.enclosingMetaObject();
        for (int i = 0; i <= method.methodIndex(); ++i) {
            if (mobj->method(i).methodType() != QMetaMethod::Signal) continue;
            ++ index;
        }
        return index;
#else
        return method.methodIndex();
#endif
    }
public:
    SignalList() {}
    void add(const QMetaMethod & method) {
        m_data[method.enclosingMetaObject()].insert(signalIndex(method));
    }
    void remove(const QMetaMethod & method) {
        T::iterator it = m_data.find(method.enclosingMetaObject());
        if (it != m_data.end()) {
            it->remove(signalIndex(method));
            if (it->empty()) m_data.erase(it);
        }
    }
    bool contains(const QMetaObject * metaObject, int signalId) {
        T::const_iterator it = m_data.find(metaObject);
        return it != m_data.end() && it.value().contains(signalId);
    }
};

//
// Implementation Using Event Compression With Access to Private Qt Headers

struct EventHelper : private QEvent {
    static void clearPostedFlag(QEvent * ev) {
        (&static_cast<EventHelper*>(ev)->t)[1] &= ~0x8001; // Hack to clear QEvent::posted
    }
};

template <class Base> class CompressorApplication : public Base {
    SignalList m_compressedSignals;
public:
    CompressorApplication(int & argc, char ** argv) : Base(argc, argv) {}
    void addCompressedSignal(const QMetaMethod & method) { m_compressedSignals.add(method); }
    void removeCompressedSignal(const QMetaMethod & method) { m_compressedSignals.remove(method); }
protected:
    bool compressEvent(QEvent *event, QObject *receiver, QPostEventList *postedEvents) {
        if (event->type() != QEvent::MetaCall)
            return Base::compressEvent(event, receiver, postedEvents);

        QMetaCallEvent *mce = static_cast<QMetaCallEvent*>(event);
        if (! m_compressedSignals.contains(mce->sender()->metaObject(), mce->signalId())) return false;
        for (QPostEventList::iterator it = postedEvents->begin(); it != postedEvents->end(); ++it) {
            QPostEvent &cur = *it;
            if (cur.receiver != receiver || cur.event == 0 || cur.event->type() != event->type())
                continue;
            QMetaCallEvent *cur_mce = static_cast<QMetaCallEvent*>(cur.event);
            if (cur_mce->sender() != mce->sender() || cur_mce->signalId() != mce->signalId() ||
                    cur_mce->id() != mce->id())
                continue;
            if (true) {
                /* Keep The Newest Call */
                // We can't merely qSwap the existing posted event with the new one, since QEvent
                // keeps track of whether it has been posted. Deletion of a formerly posted event
                // takes the posted event list mutex and does a useless search of the posted event
                // list upon deletion. We thus clear the QEvent::posted flag before deletion.
                EventHelper::clearPostedFlag(cur.event);
                delete cur.event;
                cur.event = event;
            } else {
                /* Keep the Oldest Call */
                delete event;
            }
            return true;
        }
        return false;
    }
};

//
// Demo GUI

class Signaller : public QObject {
    Q_OBJECT
public:
    Q_SIGNAL void emptySignal();
    Q_SIGNAL void dataSignal(int);
};

class Widget : public QWidget {
    Q_OBJECT
    QPlainTextEdit * m_edit;
    QSpinBox * m_count;
    Signaller m_signaller;
    Q_SLOT void emptySlot() {
        m_edit->appendPlainText("emptySlot invoked");
    }
    Q_SLOT void dataSlot(int n) {
        m_edit->appendPlainText(QString("dataSlot(%1) invoked").arg(n));
    }
    Q_SLOT void sendSignals() {
        m_edit->appendPlainText(QString("\nEmitting %1 signals").arg(m_count->value()));
        for (int i = 0; i < m_count->value(); ++ i) {
            emit m_signaller.emptySignal();
            emit m_signaller.dataSignal(i + 1);
        }
    }
public:
    Widget(QWidget * parent = 0) : QWidget(parent),
        m_edit(new QPlainTextEdit), m_count(new QSpinBox)
    {
        QFormLayout * l = new QFormLayout(this);
        QPushButton * invoke = new QPushButton("Invoke");
        m_edit->setReadOnly(true);
        m_count->setRange(1, 1000);
        l->addRow("Number of slot invocations", m_count);
        l->addRow(invoke);
        l->addRow(m_edit);
#if QT_VERSION >= QT_VERSION_CHECK(5,0,0)
        connect(invoke, &QPushButton::clicked, this, &Widget::sendSignals);
        connect(&m_signaller, &Signaller::emptySignal, this, &Widget::emptySlot, Qt::QueuedConnection);
        connect(&m_signaller, &Signaller::dataSignal, this, &Widget::dataSlot, Qt::QueuedConnection);
#else
        connect(invoke, SIGNAL(clicked()), SLOT(sendSignals()));
        connect(&m_signaller, SIGNAL(emptySignal()), SLOT(emptySlot()), Qt::QueuedConnection);
        connect(&m_signaller, SIGNAL(dataSignal(int)), SLOT(dataSlot(int)), Qt::QueuedConnection);
#endif
    }
};

int main(int argc, char *argv[])
{
    CompressorApplication<QApplication> a(argc, argv);
#if QT_VERSION >= QT_VERSION_CHECK(5,0,0)
    a.addCompressedSignal(QMetaMethod::fromSignal(&Signaller::emptySignal));
    a.addCompressedSignal(QMetaMethod::fromSignal(&Signaller::dataSignal));
#else
    a.addCompressedSignal(Signaller::staticMetaObject.method(Signaller::staticMetaObject.indexOfSignal("emptySignal()")));
    a.addCompressedSignal(Signaller::staticMetaObject.method(Signaller::staticMetaObject.indexOfSignal("dataSignal(int)")));
#endif
    Widget w;
    w.show();
    return a.exec();
}

#include "main.moc"
```