I am using ratchet php. I am starting it like this:
$loop = \React\EventLoop\Factory::create();
$webSock = new \React\Socket\Server($loop);
$webSock->listen($this->port, $this->host);
$webServer = new \Ratchet\Server\IoServer(
new \Ratchet\Http\HttpServer(
new \Ratchet\WebSocket\WsServer(
new PusherServer()
)
),
$webSock
);
return $loop;
Now, in my onMessage()
of my Pusherserver
class (which implements MessageComponentInterface
), I want to perform a long, blocking task. It will be an HTTP request which could take up to ten seconds to complete.
How do I make onMessage()
free to handle other requests while the previous HTTP request is executing? I cannot use pthreads as I don't have access to change the php version that I am already given (which is thread safe).
This is exactly the problem that you have to avoid when doing anything within your event loop: it can't be blocking, because anyone else then trying to subscribe, or call message, or have anything else event-driven happening can't happen until this is finished.
This is more of an architecture problem, and once you've figured out the best way of doing it, it's about streamlining that and making sure it works for all of the tasks you need.
Ratchet provides a ZMQ binding - this is awesome because once you set it up, anything you receive on port 5555 will hit your event loop in onYourMethodName()
, or whatever you want to call it!
With this in mind, you need to send the work that needs to be done off into a job queue, another process (react has it's child-process extension which I don't particularly like because it's polling in user land as opposed to interrupt-driven I/O like PHP's PCNTL extension) or similar.
If you want to "just get it working", fire off the work that needs to be done, along with the connection id or another id so you know who the response needs to go back to, in a child process and when it's done send it out. This won't block!
If you want to do it the better way, and I highly recommend looking into this and the architecture for it so that you can take this knowledge with you in your career when you approach an async problem like this again, adopt a 'fire-and-forget' approach. Fire what needs to be done into a job queue within your event loop, then forget about it.
Your job queue can perform it's stuff, and when it's done, fire the result of that back over ZMQ (port 5555 that's listening, remember), which can then send the data back to the client.
For an awesome talk on job queues, I highly recommend this one from PHPNW.
Final note, because you have this thing open and listening on port 5555 for data, you can send this data from anywhere. It can be inter-process-communication, as in your have a java app that sends data to port 5555, or literally anything. It's binding things together, but not coupling them, that is important in your architecture.
For an example of actually using ZMQ, they provide it all on this page here (as linked above), but I'll try and explain a little bit about what's going on.
$pull->bind('tcp://127.0.0.1:5555');
$pull->on('message', array($pusher, 'onYourMethodName'));
This part means that when anything sends data to port 5555, and it's a "message" (you can google the other options available instead of message), it'll call onYourMethodName
in your $pusher
object. It really is that simple. Anything over 5555, hits $pusher::onYourMethodName
.
As a result, you just need to create your method in the event handler now (next to onMessage()
, onSubscribe()
etc)... Again this is all mentioned on that page.
public function onYourMethodName($data)
{
/** You'll probably want to send the data in JSON format **/
/** Imagine you get through a 'topic' in here... **/
$data = json_decode($data, true);
/** You should already have stored the people who are connected, topics etc - see the tutorial **/
$topic = $this->subscribedTopics[$data['topic']];
/** Send the data out to everyone subscribed to this topic **/
$topic->broadcast($data);
}
If you want to be able to send data to a specific user and not everyone, there are many ways of doing that. Take a look at this question for how I did it, but this was a while back now.
The only thing you need to do yourself now is, in your handler (in onMessage or whatever), actually put what needs to be done in the queue along with who it should be sent back to (the topic).
At the end of your worker doing it's stuff and getting the data, it'll need to call this to hit the code I've shown above:
$context = new ZMQContext();
$socket = $context->getSocket(ZMQ::SOCKET_PUSH, 'my pusher');
$socket->connect("tcp://localhost:5555");
$socket->send(json_encode($data));
So here's what you need to do:
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With