I'm having some trouble grasping how to correctly handle creating a child process from a multithreaded program that uses Boost Asio in a multithreaded fashion.
If I understand correctly, the way to launch a child process in the Unix world is to call fork()
followed by an exec*()
. Also, if I understand correctly, calling fork()
will duplicate all file descriptors and so on and these need to be closed in the child process unless marked as FD_CLOEXEC
(and thereby being atomically closed when calling exec*()
).
Boost Asio requires to be notified when fork()
is called in order to operate correctly by calling notify_fork()
. However, in a multithreaded program this creates several issues:
Sockets are by default inherited by child processes if I understand correctly. They can be set to SOCK_CLOEXEC
- but not directly at creation*, thus leading to a timing window if a child process is being created from another thread.
notify_fork()
requires that no other thread calls any other io_service
function, nor any function on any other I/O object associated with the io_service
. This does not really seem to be feasible - after all the program is multithreaded for a reason.
If I understand correctly, any function call made between fork()
and exec*()
needs to be async signal safe (see fork()
documentation). There is no documentation of the notify_fork()
call being async signal safe. In fact, if I look at the source code for Boost Asio (at least in version 1.54), there may be calls to pthread_mutex_lock, which is not async signal safe if I understand correctly (see Signal Concepts, there are also other calls being made that are not on the white list).
Issue #1 I can probably work around by separating creation of child processes and sockets + files so that I can ensure that no child process is being created in the window between a socket being created and setting SOCK_CLOEXEC
. Issue #2 is trickier, I would probably need to make sure that all asio handler threads are stopped, do the fork and then recreate them again, which is tideous at best, and really really bad at worst (what about my pending timers??). Issue #3 seems to make it entirely impossible to use this correctly.
How do I correctly use Boost Asio in a multithreaded program together with fork()
+ exec*()
?
... or am I "forked"?
Please let me know if I have misunderstood any fundamental concepts (I am raised on Windows programming, not *nix...).
Edit:
* - Actually it is possible to create sockets with SOCK_CLOEXEC
set directly on Linux, available since 2.6.27 (see socket()
documentation). On Windows, the corresponding flag WSA_FLAG_NO_HANDLE_INHERIT
is available since Windows 7 SP 1 / Windows Server 2008 R2 SP 1 (see WSASocket()
documentation). OS X does not seem to support this though.
In a multi-threaded program, io_service::notify_fork()
is not safe to invoke in the child. Yet, Boost.Asio expects it to be called based on the fork()
support, as this is when the child closes the parent's previous internal file descriptors and creates new ones. While Boost.Asio explicitly list the pre-conditions for invoking io_service::notify_fork()
, guaranteeing the state of its internal components during the fork()
, a brief glance at the implementation indicates that std::vector::push_back()
may allocate memory from the free store, and the allocation is not guaranteed to be async-signal-safe.
With that said, one solution that may be worth considering is fork()
the process when it is still single threaded. The child process will remain single threaded and perform fork()
and exec()
when it is told to do so by the parent process via inter-process communication. This separation simplifies the problem by removing the need to manage the state of multiple threads while performing fork()
and exec()
.
Here is a complete example demonstrating this approach, where the multi-threaded server will receive filenames via UDP and a child process will perform fork()
and exec()
to run /usr/bin/touch
on the filename. In hopes of making the example slightly more readable, I have opted to use stackful coroutines.
#include <unistd.h> // execl, fork
#include <iostream>
#include <string>
#include <boost/bind.hpp>
#include <boost/asio.hpp>
#include <boost/asio/spawn.hpp>
#include <boost/make_shared.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/thread.hpp>
/// @brief launcher receives a command from inter-process communication,
/// and will then fork, allowing the child process to return to
/// the caller.
class launcher
{
public:
launcher(boost::asio::io_service& io_service,
boost::asio::local::datagram_protocol::socket& socket,
std::string& command)
: io_service_(io_service),
socket_(socket),
command_(command)
{}
void operator()(boost::asio::yield_context yield)
{
std::vector<char> buffer;
while (command_.empty())
{
// Wait for server to write data.
std::cout << "launcher is waiting for data" << std::endl;
socket_.async_receive(boost::asio::null_buffers(), yield);
// Resize buffer and read all data.
buffer.resize(socket_.available());
socket_.receive(boost::asio::buffer(buffer));
io_service_.notify_fork(boost::asio::io_service::fork_prepare);
if (fork() == 0) // child
{
io_service_.notify_fork(boost::asio::io_service::fork_child);
command_.assign(buffer.begin(), buffer.end());
}
else // parent
{
io_service_.notify_fork(boost::asio::io_service::fork_parent);
}
}
}
private:
boost::asio::io_service& io_service_;
boost::asio::local::datagram_protocol::socket& socket_;
std::string& command_;
};
using boost::asio::ip::udp;
/// @brief server reads filenames from UDP and then uses
/// inter-process communication to delegate forking and exec
/// to the child launcher process.
class server
{
public:
server(boost::asio::io_service& io_service,
boost::asio::local::datagram_protocol::socket& socket,
short port)
: io_service_(io_service),
launcher_socket_(socket),
socket_(boost::make_shared<udp::socket>(
boost::ref(io_service), udp::endpoint(udp::v4(), port)))
{}
void operator()(boost::asio::yield_context yield)
{
udp::endpoint sender_endpoint;
std::vector<char> buffer;
for (;;)
{
std::cout << "server is waiting for data" << std::endl;
// Wait for data to become available.
socket_->async_receive_from(boost::asio::null_buffers(),
sender_endpoint, yield);
// Resize buffer and read all data.
buffer.resize(socket_->available());
socket_->receive_from(boost::asio::buffer(buffer), sender_endpoint);
std::cout << "server got data: ";
std::cout.write(&buffer[0], buffer.size());
std::cout << std::endl;
// Write filename to launcher.
launcher_socket_.async_send(boost::asio::buffer(buffer), yield);
}
}
private:
boost::asio::io_service& io_service_;
boost::asio::local::datagram_protocol::socket& launcher_socket_;
// To be used as a coroutine, server must be copyable, so make socket_
// copyable.
boost::shared_ptr<udp::socket> socket_;
};
int main(int argc, char* argv[])
{
std::string filename;
// Try/catch provides exception handling, but also allows for the lifetime
// of the io_service and its IO objects to be controlled.
try
{
if (argc != 2)
{
std::cerr << "Usage: <port>\n";
return 1;
}
boost::thread_group threads;
boost::asio::io_service io_service;
// Create two connected sockets for inter-process communication.
boost::asio::local::datagram_protocol::socket parent_socket(io_service);
boost::asio::local::datagram_protocol::socket child_socket(io_service);
boost::asio::local::connect_pair(parent_socket, child_socket);
io_service.notify_fork(boost::asio::io_service::fork_prepare);
if (fork() == 0) // child
{
io_service.notify_fork(boost::asio::io_service::fork_child);
parent_socket.close();
boost::asio::spawn(io_service,
launcher(io_service, child_socket, filename));
}
else // parent
{
io_service.notify_fork(boost::asio::io_service::fork_parent);
child_socket.close();
boost::asio::spawn(io_service,
server(io_service, parent_socket, std::atoi(argv[1])));
// Spawn additional threads.
for (std::size_t i = 0; i < 3; ++i)
{
threads.create_thread(
boost::bind(&boost::asio::io_service::run, &io_service));
}
}
io_service.run();
threads.join_all();
}
catch (std::exception& e)
{
std::cerr << "Exception: " << e.what() << "\n";
}
// Now that the io_service and IO objects have been destroyed, all internal
// Boost.Asio file descriptors have been closed, so the execl should be
// in a clean state. If the filename has been set, then exec touch.
if (!filename.empty())
{
std::cout << "creating file: " << filename << std::endl;
execl("/usr/bin/touch", "touch", filename.c_str(), static_cast<char*>(0));
}
}
Terminal 1:
$ ls a.out example.cpp $ ./a.out 12345 server is waiting for data launcher is waiting for data server got data: a server is waiting for data launcher is waiting for data creating file: a server got data: b server is waiting for data launcher is waiting for data creating file: b server got data: c server is waiting for data launcher is waiting for data creating file: c ctrl + c $ ls a a.out b c example.cpp
Terminal 2:
$ nc -u 127.0.0.1 12345 actrl + dbctrl + dcctrl + d
Consider the following:
fork()
creates only one thread in the child process. You would need to recreate the other threads.fork()
. Callbacks registered with pthread_atfork()
could release the mutexes, but majority of libraries never bother using pthread_atfork()
. In other words, you child process could hang forever when calling malloc()
or new
because the standard heap allocator does use mutexes.In the light of the above, the only robust option in a multi-threaded process is to call fork()
and then exec()
.
Note that your parent process is not affected by fork()
as long as pthread_atfork()
handlers are not used.
Regarding forking and boost::asio
, there is io_service::notify_fork()
function that needs to be called before forking in the parent and after forking in both parent and child. What it does ultimately depends on the reactor being used. For Linux/UNIX reactors select_reactor
, epoll_reactor
, dev_poll_reactor
, kqueue_reactor
this function does not do anything to the parent before of after fork, but in the child it recreates the reactor state and re-registers the file descriptors. I am not sure what it does on Windows, though.
An example of its usage can be found in process_per_connection.cpp, you can just copy it:
void handle_accept(const boost::system::error_code& ec)
{
if (!ec)
{
// Inform the io_service that we are about to fork. The io_service cleans
// up any internal resources, such as threads, that may interfere with
// forking.
io_service_.notify_fork(boost::asio::io_service::fork_prepare);
if (fork() == 0)
{
// Inform the io_service that the fork is finished and that this is the
// child process. The io_service uses this opportunity to create any
// internal file descriptors that must be private to the new process.
io_service_.notify_fork(boost::asio::io_service::fork_child);
// The child won't be accepting new connections, so we can close the
// acceptor. It remains open in the parent.
acceptor_.close();
// The child process is not interested in processing the SIGCHLD signal.
signal_.cancel();
start_read();
}
else
{
// Inform the io_service that the fork is finished (or failed) and that
// this is the parent process. The io_service uses this opportunity to
// recreate any internal resources that were cleaned up during
// preparation for the fork.
io_service_.notify_fork(boost::asio::io_service::fork_parent);
socket_.close();
start_accept();
}
}
else
{
std::cerr << "Accept error: " << ec.message() << std::endl;
start_accept();
}
}
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With