
How do I connect a ZeroMQ socket to the Ratchet WebSocket library to build a real-time PHP application?

I am a beginner with WebSockets, Ratchet and ZeroMQ.

My basic understanding is:

WebSocket is a protocol that keeps a connection open between the server and the client.

Ratchet is a PHP library that builds on PHP's core socket functions to provide a socket framework, which makes socket programming in PHP easier.

ZeroMQ is a socket library that lets a non-Ratchet application (another PHP script) send data to the Ratchet server, which then passes it on over the WebSocket.

I am following the Ratchet tutorials 'hello world' and 'pusher', but both seem incomplete and only show how to work from the console. I have also found ratchet-example on GitHub, but it is not properly documented. I am looking for a complete example (with a dedicated HTML page and JavaScript).

Below is the code I am working on. This is a controller method that I call with an Ajax request. It creates a new post. I want to update the list of posts dynamically in multiple clients' browsers by broadcasting/pushing with the help of ZeroMQ.

A method in a controller:

public function create_new_post(){
    // ------
    // code to create a new post.
    // -------

    // After creating a post
    $response = [
        'new_post_title'    => $title,
        'post_id'           => $id
    ];

    $context = new ZMQContext();
    $socket = $context->getSocket(ZMQ::SOCKET_PUSH, 'my pusher');
    $socket->connect("tcp://localhost:8000");
    $socket->send(json_encode($response));

}

Pusher File:

use Ratchet\ConnectionInterface;
use Ratchet\Wamp\WampServerInterface;

class Pusher implements WampServerInterface{

     public function onPostEntry($data){
         // Data that were sent by ZeroMQ through create_new_post() method
         $entry_data = json_decode($data);      

         // AND AFTER THIS, I DONT HAVE CLUE OF WHAT TO DO NEXT !!             

     }
}

Shell Script to run server:

require dirname(__DIR__) . '/vendor/autoload.php';

$loop   = React\EventLoop\Factory::create();
$pusher = new MyApp\Pusher;

// Listen for the web server to make a ZeroMQ push after an ajax request
$context = new React\ZMQ\Context($loop);
$pull = $context->getSocket(ZMQ::SOCKET_PULL);
$pull->bind('tcp://127.0.0.1:8000'); 
// the callback name must match a method that exists on Pusher (onPostEntry above)
$pull->on('message', array($pusher, 'onPostEntry'));

// Set up our WebSocket server for clients wanting real-time updates
$webSock = new React\Socket\Server($loop);
$webSock->listen(8080, '0.0.0.0'); 
$webServer = new Ratchet\Server\IoServer(
    new Ratchet\Http\HttpServer(
        new Ratchet\WebSocket\WsServer(
            new Ratchet\Wamp\WampServer(
                $pusher
            )
        )
    ),
    $webSock
);

$loop->run();

The shell script only says that it will serve on port 8080, but how do I specify my routes? Let's say I want the open connection only on the page 'mysite/allposts'. Also, what script do I have to write on the client side (a JavaScript file), and how does the client receive the new data and update a particular DOM object?

Nirmalz Thapaz asked Jul 12 '15 10:07



1 Answer

I followed the examples you are talking about. They didn't seem incomplete to me, but I understand what you mean. Ratchet is a server-side library: it lets you write a service that implements WebSockets and can listen for ZMQ messages. You launch your Ratchet script on the command line, and it runs as a service in parallel with Apache.

This is all independent of the client side of the WebSocket. As they recommend, I used Autobahn.js on the client side. This library implements the WAMP protocol and keeps the client-side code very simple.

The problem with your code is that your Pusher class declares only onPostEntry, which is not part of WampServerInterface. A class that implements WampServerInterface must define at least these functions:

  • onSubscribe(ConnectionInterface $conn, $topic)
  • onUnSubscribe(ConnectionInterface $conn, $topic)
  • onOpen(ConnectionInterface $conn)
  • onClose(ConnectionInterface $conn)
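  • onPublish(ConnectionInterface $conn, $topic, $event, array $exclude, array $eligible)
  • onError(ConnectionInterface $conn, \Exception $e)
  • onZMQMessage($jsondata) (not part of the interface; this is your own handler for incoming ZMQ messages, the role onPostEntry plays in your code)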

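The interface also declares onCall(ConnectionInterface $conn, $id, $topic, array $params), which handles remote procedure calls made by clients, so the class needs that method too, even if you do not use the feature.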

On the sender side (ZMQ message), put this code:

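class ZMQWrapper {
    private $context;
    private $socket;

    function __construct(){
        $this->context = new ZMQContext();
        $this->socket = $this->context->getSocket(ZMQ::SOCKET_PUSH);
        // give unsent messages up to 500 ms to flush when the socket closes
        $this->socket->setSockOpt(ZMQ::SOCKOPT_LINGER, 500);
        // ZMQ_PORT is a constant you define yourself; it must match the port the PULL socket binds to
        $this->socket->connect("tcp://127.0.0.1:" . ZMQ_PORT);
    }
    function publish($topic, $msg){
        // the 'mb.' prefix is my own namespace; the server-side handler must
        // broadcast on the topic id that clients actually subscribe to
        $data = ['topic' => "mb.$topic", 'msg' => $msg];
        $this->socket->send(json_encode($data), ZMQ::MODE_DONTWAIT);
    }
}

$zmq = new ZMQWrapper;
$zmq->publish('posts', $response);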

In the pusher file, put something like:

public function onSubscribe(ConnectionInterface $conn, $topic) {
    $log = $this->getLogger();
    $topicId = $topic->getId();
    $log->info(sprintf('A client subscribed to %s', $topicId));
    // you could broadcast that user x joined the discussion
}
public function onUnSubscribe(ConnectionInterface $conn, $topic) {
    $log = $this->getLogger();
    $topicId = $topic->getId();
    $log->info(sprintf('A client unsubscribed from %s', $topicId));
    // you could broadcast that user x left the discussion
}
public function onOpen(ConnectionInterface $conn) {
    $log = $this->getLogger();
    $log->info(sprintf('Client %d connected', $conn->resourceId));
    // per-client state; you can update it in onSubscribe and onUnSubscribe
    $this->clients[$conn->resourceId] = array();
    // $this->clients now lists all connected clients
}
public function onClose(ConnectionInterface $conn) {
    $log = $this->getLogger();
    $log->info(sprintf('Client %d disconnected', $conn->resourceId));
    unset($this->clients[$conn->resourceId]); // drop the state saved in onOpen
    // you could broadcast that user x left the discussion
}
public function onPublish(ConnectionInterface $conn, $topic, $event, array $exclude, array $eligible) {
    $log = $this->getLogger();
    $topicId = $topic->getId();
    $log->info(sprintf('Client %d published to %s : %s', $conn->resourceId, $topicId, json_encode($event)));
    foreach($topic->getIterator() as $peer){
        if(!in_array($peer->WAMP->sessionId, $exclude)){
            $peer->event($topicId, $event);
        }
    }
}

The last piece is on the client. On the page mysite/allposts, include autobahn.js. The library is then available through the global variable ab.

When opening the page:

var currentSession;
ab.connect(
    Paths.ws, // URL of the Ratchet WebSocket server (the server above listens on port 8080)
    function(session) { // onconnect
        currentSession = session;
        onWsConnect(session);
        // subscribe here: the session only exists once the connection is open
        session.subscribe('posts', onPostReceived);
    },
    function(code, reason, detail) { // onhangup
        onWsDisconnect(code, reason, detail);
    },
    {
        maxRetries: 60,
        retryDelay: 2000,
        skipSubprotocolCheck: true
    }
);

function onPostReceived(topic, message){
    //display the new post
}
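
As a rough sketch of what a fuller onPostReceived could look like, the handler below turns the pushed message into a list item. It assumes that the server forwards the post data unchanged, so the client sees the new_post_title and post_id fields from create_new_post(), and that the page contains a list element with the id post-list. Both are assumptions, and escapeHtml and renderPost are hypothetical helper names.

```javascript
// Hypothetical helper: escape text before placing it in HTML.
function escapeHtml(text) {
    return String(text)
        .replace(/&/g, '&amp;')
        .replace(/</g, '&lt;')
        .replace(/>/g, '&gt;')
        .replace(/"/g, '&quot;');
}

// Build the markup for one post. Assumes the message carries the
// new_post_title and post_id fields sent by create_new_post().
function renderPost(message) {
    return '<li data-post-id="' + escapeHtml(message.post_id) + '">' +
        escapeHtml(message.new_post_title) + '</li>';
}

// Event handler passed to session.subscribe('posts', onPostReceived).
function onPostReceived(topic, message) {
    // 'post-list' is an assumed element id; adjust it to your page.
    var list = document.getElementById('post-list');
    if (list) {
        // put the newest post at the top of the list
        list.insertAdjacentHTML('afterbegin', renderPost(message));
    }
}
```

Keeping the markup in a separate function makes it possible to check the output without a browser.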

When closing the page:

currentSession.unsubscribe('posts')

Note that I kept everything very general. This allows several types of messages to be handled by the same system. What differs are the ZMQ messages and the arguments of currentSession.subscribe.

In my implementation, I also keep track of the logged-in users that opened the connection, but I stripped this part of the code.
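
To illustrate handling several message types over one connection, here is a minimal sketch that subscribes a whole set of topic handlers at once. The helpers subscribeAll and unsubscribeAll and the 'comments' topic are hypothetical; the only calls taken from the code above are session.subscribe(topic, handler) and session.unsubscribe(topic).

```javascript
// Hypothetical helper: subscribe every topic listed in `handlers`.
// Returns the topic names so they can be unsubscribed later.
function subscribeAll(session, handlers) {
    var topics = Object.keys(handlers);
    topics.forEach(function (topic) {
        session.subscribe(topic, handlers[topic]);
    });
    return topics;
}

// Hypothetical counterpart, for use when the page is closed.
function unsubscribeAll(session, topics) {
    topics.forEach(function (topic) {
        session.unsubscribe(topic);
    });
}

// Possible usage inside the onconnect callback (handler names are assumed):
//   var topics = subscribeAll(session, {
//       posts: onPostReceived,
//       comments: onCommentReceived
//   });
// and when closing the page:
//   unsubscribeAll(currentSession, topics);
```

Each new message type then needs only one more entry in the handler map and a matching ZMQ message on the server side.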

I hope this will help you.

Lorenz Meyer answered Oct 22 '22 04:10