The event loop is a constantly running process that monitors both the callback queue and the call stack. In this example, the timeout is 0 seconds, so you might expect the message 'Execute immediately.' to appear before the message 'Bye!'. In fact it appears after: even a zero-delay callback is placed on the queue and must wait until the call stack is empty.
JavaScript has a runtime model based on an event loop, which is responsible for executing the code, collecting and processing events, and executing queued sub-tasks. This model is quite different from models in other languages like C and Java.
The event loop is a process that waits for the Call Stack to be clear before pushing callbacks from the Task Queue to the Call Stack. Once the Stack is clear, the event loop triggers and checks the Task Queue for available callbacks.
Event loop: An event loop is something that pulls callbacks out of the queue and places them onto the function execution stack whenever the function stack becomes empty.
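The definitions above can be sketched as a toy model in Python (not a real JavaScript runtime; the queue and callback names are illustrative): callbacks wait in a queue, and the loop only runs the next one once the previous call has fully returned, i.e. the stack is empty.

```python
from collections import deque

# The task queue: callbacks wait here until the "call stack" is empty,
# i.e. until the previous callback has returned completely.
task_queue = deque()

def event_loop():
    # Pull callbacks off the queue one at a time and run each to completion.
    while task_queue:
        callback = task_queue.popleft()
        callback()

results = []
task_queue.append(lambda: results.append("first"))
task_queue.append(lambda: results.append("second"))
event_loop()   # runs the callbacks in FIFO order
```

This is why a zero-delay callback still runs after the currently executing code: it has to go through the queue.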
I used to wonder a lot about the same!
A GUI main loop looks like this, in pseudo-code:
void App::exec() {
    for (;;) {
        vector<Waitable> waitables;
        waitables.push_back(m_networkSocket);
        waitables.push_back(m_xConnection);
        waitables.push_back(m_globalTimer);
        Waitable* whatHappened = System::waitOnAll(waitables);
        switch (whatHappened) {
            case &m_networkSocket: readAndDispatchNetworkEvent(); break;
            case &m_xConnection:   readAndDispatchGuiEvent();     break;
            case &m_globalTimer:   readAndDispatchTimerEvent();   break;
        }
    }
}
What is a "Waitable"? Well, it's system dependent. On UNIX it's called a "file descriptor" and "waitOnAll" is the ::select system call. The so-called vector<Waitable> is an ::fd_set on UNIX, and "whatHappened" is actually queried via FD_ISSET. The actual waitable-handles are acquired in various ways, for example m_xConnection can be taken from ::XConnectionNumber(). X11 also provides a high-level, portable API for this -- ::XNextEvent() -- but if you were to use that, you wouldn't be able to wait on several event sources simultaneously.
How does the blocking work? "waitOnAll" is a syscall that tells the OS to put your process on a "sleep list". This means you are not given any CPU time until an event occurs on one of the waitables. This, then, means your process is idle, consuming 0% CPU. When an event occurs, your process will briefly react to it and then return to idle state. GUI apps spend almost all their time idling.
What happens to all the CPU cycles while you're sleeping? Depends. Sometimes another process will have a use for them. If not, your OS will busy-loop the CPU, or put it into temporary low-power mode, etc.
Please ask for further details!
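To make the "waitOnAll" idea concrete, here is a minimal sketch using Python's selectors module; a socketpair stands in for the network socket, and the registered names are illustrative:

```python
import selectors
import socket

# Sketch of "waitOnAll": register several waitables, then block until the OS
# reports an event on one of them; the process consumes no CPU while waiting.
sel = selectors.DefaultSelector()
a, b = socket.socketpair()         # `b` stands in for m_networkSocket
sel.register(b, selectors.EVENT_READ, data="network")

a.send(b"hello")                   # an event occurs on one of the waitables
events = sel.select(timeout=1.0)   # the blocking wait returns only now
for key, _mask in events:
    if key.data == "network":      # the "whatHappened" dispatch
        msg = key.fileobj.recv(1024)

sel.close()
a.close()
b.close()
```

With nothing to read, `sel.select()` would simply put the process on the OS sleep list, exactly as described above.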
Python:
You can look at the implementation of the Twisted reactor, which is probably the best implementation of an event loop in Python. Reactors in Twisted are implementations of an interface, and you can specify the type of reactor to run: select, epoll, kqueue (all based on a C API using those system calls); there are also reactors based on the QT and GTK toolkits.
A simple implementation would be to use select:
# echo server that accepts multiple client connections without forking or threads
import select
import socket
import sys

host = ''
port = 50000
backlog = 5
size = 1024
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.bind((host, port))
server.listen(backlog)
inputs = [server, sys.stdin]  # avoid shadowing the built-in input()
running = True

# the event loop (note: select() on sys.stdin works on Unix, not Windows)
while running:
    inputready, outputready, exceptready = select.select(inputs, [], [])
    for s in inputready:
        if s == server:
            # handle the server socket: a new client is connecting
            client, address = server.accept()
            inputs.append(client)
        elif s == sys.stdin:
            # handle standard input: any line typed on the console quits
            junk = sys.stdin.readline()
            running = False
        else:
            # handle all other sockets: echo data back, or drop closed clients
            data = s.recv(size)
            if data:
                s.send(data)
            else:
                s.close()
                inputs.remove(s)
server.close()
Generally I would do this with some sort of counting semaphore: the consumer blocks on the semaphore, and each producer releases it once per queued message, so the loop wakes only when there is actually work to do.
If you don't want to get that complicated, you could just add a sleep() call in your while loop with a trivially small sleep time. That will cause your message-processing thread to yield its CPU time to other threads. The CPU won't be pegged at 100% any more, but it's still pretty wasteful.
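As a sketch of the blocking approach in Python (queue.Queue uses a lock and condition variable internally, so the consumer thread sleeps at 0% CPU instead of polling; the names here are illustrative):

```python
import queue
import threading

messages = queue.Queue()
processed = []

def message_loop():
    while True:
        msg = messages.get()   # blocks without burning CPU until an item arrives
        if msg is None:        # sentinel value shuts the loop down
            break
        processed.append(msg)

worker = threading.Thread(target=message_loop)
worker.start()
messages.put("ping")
messages.put("pong")
messages.put(None)             # tell the loop to exit
worker.join()
```

The same pattern works with a raw counting semaphore; the queue just bundles the semaphore and the data together.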
I would use a simple, light-weight messaging library called ZeroMQ (http://www.zeromq.org/). It is an open source library (LGPL). This is a very small library; on my server, the whole project compiles in about 60 seconds.
ZeroMQ will hugely simplify your event-driven code, and it is also a very efficient solution in terms of performance. Communicating between threads using ZeroMQ is much faster than using semaphores or local UNIX sockets. ZeroMQ is also a 100% portable solution, whereas the other solutions would tie your code down to a specific operating system.
Here is a C++ event loop. On construction, an EventLoop object creates a thread which continually runs any task given to it. If there are no tasks available, the worker thread goes to sleep until a task is added.
First we need a thread-safe queue which allows multiple producers and at least a single consumer (the EventLoop thread). The EventLoop object controls the consumers and producers. With a little change, it can support multiple consumers (runner threads) instead of only one.
#include <stdio.h>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <chrono>
#include <tuple>
#include <stdexcept>
#include <iostream>
#include <set>
#include <functional>
class EventLoopNoElements : public std::runtime_error
{
public:
    EventLoopNoElements(const char* error)
        : std::runtime_error(error)
    {
    }
};

template <typename Type>
struct EventLoopCompare {
    typedef std::tuple<std::chrono::time_point<std::chrono::system_clock>, Type> TimePoint;

    bool operator()(const TimePoint& left, const TimePoint& right) const {
        return std::get<0>(left) < std::get<0>(right);
    }
};
/**
 * You can enqueue anything with this event loop. Just use lambda functions, futures and promises!
 * With a lambda: `event.enqueue( 1000, [myvar, myfoo](){ myvar.something(myfoo); } )`
 * With futures we can get values out of the event loop:
 * ```
 * std::promise<int> accumulate_promise;
 * event.enqueue( 2000, [&accumulate_promise](){ accumulate_promise.set_value(10); } );
 * std::future<int> accumulate_future = accumulate_promise.get_future();
 * accumulate_future.wait(); // It is not necessary to call wait, except for syncing the output.
 * std::cout << "result=" << std::flush << accumulate_future.get() << std::endl;
 * ```
 * It is just not a nice idea to add something which hangs the whole event loop queue.
 */
template <class Type>
struct EventLoop {
    typedef std::multiset<
        typename EventLoopCompare<Type>::TimePoint,
        EventLoopCompare<Type>
    > EventLoopQueue;

    bool _shutdown;
    bool _free_shutdown;

    std::mutex _mutex;
    std::condition_variable _condition_variable;

    EventLoopQueue _queue;
    std::thread _runner;

    // free_shutdown - if true, run all remaining events on the queue before exiting
    EventLoop(bool free_shutdown)
        : _shutdown(false),
        _free_shutdown(free_shutdown),
        _runner( &EventLoop<Type>::_event_loop, this )
    {
    }

    virtual ~EventLoop() {
        std::unique_lock<std::mutex> dequeuelock(_mutex);
        _shutdown = true;
        _condition_variable.notify_all();
        dequeuelock.unlock();

        if (_runner.joinable()) {
            _runner.join();
        }
    }

    // Mutexes and condition variables are not movable and there is no need for smart pointers yet
    EventLoop(const EventLoop&) = delete;
    EventLoop& operator =(const EventLoop&) = delete;
    EventLoop(EventLoop&&) = delete;
    EventLoop& operator =(EventLoop&&) = delete;

    // To allow multiple threads to consume data, just add a mutex here and create multiple threads in the constructor
    void _event_loop() {
        while ( true ) {
            try {
                Type call = dequeue();
                call();
            }
            catch (EventLoopNoElements&) {
                return;
            }
            catch (std::exception& error) {
                std::cerr << "Unexpected exception on EventLoop dequeue running: '" << error.what() << "'" << std::endl;
            }
            catch (...) {
                std::cerr << "Unexpected exception on EventLoop dequeue running." << std::endl;
            }
        }
        std::cerr << "The main EventLoop dequeue stopped running unexpectedly!" << std::endl;
    }
    // Adds an element to the queue, to run `timeout` milliseconds from now
    void enqueue(int timeout, Type element) {
        std::chrono::time_point<std::chrono::system_clock> timenow = std::chrono::system_clock::now();
        std::chrono::time_point<std::chrono::system_clock> newtime = timenow + std::chrono::milliseconds(timeout);

        std::unique_lock<std::mutex> dequeuelock(_mutex);
        _queue.insert(std::make_tuple(newtime, element));
        _condition_variable.notify_one();
    }

    // Blocks until the first element is due, or throws EventLoopNoElements
    // when the loop is shutting down and there are no more elements to run
    Type dequeue() {
        typename EventLoopQueue::iterator queuebegin;
        typename EventLoopQueue::iterator queueend;
        std::chrono::time_point<std::chrono::system_clock> sleeptime;

        // _mutex prevents multiple consumers from getting the same item or from missing the wake up
        std::unique_lock<std::mutex> dequeuelock(_mutex);
        do {
            queuebegin = _queue.begin();
            queueend = _queue.end();

            if ( queuebegin == queueend ) {
                if ( _shutdown ) {
                    throw EventLoopNoElements( "There are no more elements in the queue because it has already shut down." );
                }
                _condition_variable.wait( dequeuelock );
            }
            else {
                if ( _shutdown ) {
                    if (_free_shutdown) {
                        break;
                    }
                    else {
                        throw EventLoopNoElements( "The queue is shutting down." );
                    }
                }
                std::chrono::time_point<std::chrono::system_clock> timenow = std::chrono::system_clock::now();
                sleeptime = std::get<0>( *queuebegin );
                if ( sleeptime <= timenow ) {
                    break;
                }
                _condition_variable.wait_until( dequeuelock, sleeptime );
            }
        } while ( true );

        Type firstelement = std::get<1>( *queuebegin );
        _queue.erase( queuebegin );
        dequeuelock.unlock();
        return firstelement;
    }
};
Utility to print the current timestamp:
std::string getTime() {
    char buffer[20];
#if defined( WIN32 )
    // requires <windows.h> for SYSTEMTIME/GetLocalTime
    SYSTEMTIME wlocaltime;
    GetLocalTime(&wlocaltime);
    ::snprintf(buffer, sizeof buffer, "%02d:%02d:%02d.%03d ", wlocaltime.wHour, wlocaltime.wMinute, wlocaltime.wSecond, wlocaltime.wMilliseconds);
#else
    auto now = std::chrono::system_clock::now();
    auto milliseconds = std::chrono::duration_cast< std::chrono::milliseconds >( now.time_since_epoch() ) % 1000;

    time_t theTime = time( NULL );
    struct tm* aTime = localtime( &theTime );
    ::snprintf(buffer, sizeof buffer, "%02d:%02d:%02d.%03d ", aTime->tm_hour, aTime->tm_min, aTime->tm_sec, static_cast<int>( milliseconds.count() ));
#endif
    return buffer;
}
Example program using these:
// g++ -o test -Wall -Wextra -ggdb -g3 -pthread test.cpp && gdb --args ./test
// valgrind --leak-check=full --show-leak-kinds=all --track-origins=yes --verbose ./test
// procdump -accepteula -ma -e -f "" -x c:\ myexe.exe
int main() {
    std::cerr << getTime() << "Creating EventLoop" << std::endl;
    EventLoop<std::function<void()>>* eventloop = new EventLoop<std::function<void()>>(true);

    std::cerr << getTime() << "Adding event element" << std::endl;
    eventloop->enqueue( 3000, []{ std::cerr << getTime() << "Running task 3" << std::endl; } );
    eventloop->enqueue( 1000, []{ std::cerr << getTime() << "Running task 1" << std::endl; } );
    eventloop->enqueue( 2000, []{ std::cerr << getTime() << "Running task 2" << std::endl; } );

    std::this_thread::sleep_for( std::chrono::milliseconds(5000) );
    delete eventloop;

    std::cerr << getTime() << "Exiting after 5 seconds..." << std::endl;
    return 0;
}
Output test example:
02:08:28.960 Creating EventLoop
02:08:28.960 Adding event element
02:08:29.960 Running task 1
02:08:30.961 Running task 2
02:08:31.961 Running task 3
02:08:33.961 Exiting after 5 seconds...