Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Using PHP Pthreads with Ratchet Websocket

I am making an html5 game www.titansoftime.com

I am using ratchet as a php websocket server solution. It works great! http://socketo.me/docs/push

I have done several standalone test using the php pthreads extension and have seen some very exciting results. It truly works and works well.. as long as websockets aren't in the mix.

Pthreads give php multithreading capabilities (it really does work and it's amazing). http://php.net/manual/en/book.pthreads.php

This is what I do:

/src/server.php This is the file that launches the daemon.

    <?php
    session_start();

    use Ratchet\Server\IoServer;
    use Ratchet\WebSocket\WsServer;
    use MyApp\Pusher;

    require __DIR__ . '/../vendor/autoload.php';

    require_once __DIR__ . '/../mysql.cls.php';
    require_once __DIR__ . '/../game.cls.php';
    require_once __DIR__ . '/../model.cls.php';

    $mysql = new mysql;
    $game  = new game;

    $loop   = React\EventLoop\Factory::create();
    $pusher = new MyApp\Pusher();

    $loop->addPeriodicTimer(0.50, function() use($pusher){
        $pusher->load();
    });

    $webSock = new React\Socket\Server($loop);

    if ($loop instanceof \React\EventLoop\LibEventLoop) {
        echo "\n HAS LibEvent";
    }

    $webSock->listen(8080, '0.0.0.0'); // Binding to 0.0.0.0 means remotes can connect
    $webServer = new Ratchet\Server\IoServer(
            new Ratchet\Http\HttpServer(
                    new Ratchet\WebSocket\WsServer($pusher)
            ),
            $webSock
    );

    $loop->run();

This all works fine.

/src/MyApp/Pusher.php This class pushes data to all connected users.

<?php
namespace MyApp;
use Ratchet\ConnectionInterface;
use Ratchet\MessageComponentInterface;

class AsyncThread extends \Thread{

    public $client;

    public function __construct($client){
        $this->client = $client;
    }

    public function run(){

        // do work on $this->client
        $user = mysql::assoc('SELECT * from users WHERE connection_id = "'.$this->client->resourceId.'"');
        // etc..
        $this->client->send(json_encode(array('foo'=>'bar')));

    }

}

class Pusher implements MessageComponentInterface{

    public static $clients = array();

    #load
    public static function load(){

        $client_count = count(self::$clients);

        echo "\n\n\n".'Serving to '.$client_count.' clients. '.time();

        $start = $istart = microtime(true);

        if( !count(self::$clients) ){
            if( !mysql_ping() ){
                $game->connect();
            }
        }

        $threads = array();
        foreach( self::$clients as $key => $client ){       

            // HANDLE CLIENT

            // This works just fine, the only problem is that if I have lets say 50 simultaneous users, the people near the end of the clients array will have to wait till the other users have been processed. This is not desirable
            $client->send(json_encode('foo'=>'bar'));

           // So I tried this:
           $threads[$key] = new AsyncThread($client);
           $threads[$key]->start();

           // At this point the AsyncThread class will throw a fatal error complaining about not being able to serialize a closure. 
          // If I dont set "$this->data = $client;" in the thread constructor no error appears but now I cant use the data.

           // Also regardless of whether or not I bind the data in the AsyncThread constructor,
           // the connection disappears if I call "new AsyncThread($client)". I cannot explain this behavior.

        }

    }

    public function onMessage(ConnectionInterface $from, $msg) {
        global $game;
        if( $msg ){
            $data = json_decode($msg);
            if( $data ){    

                switch( $data->task ){

                    #connect
                    case 'connect':
                        echo "\n".'New connection! ('.$from->resourceId.') '.$from->remoteAddress;
                        self::$clients[] = $from;
                        break;

                    default:
                        self::closeConnection($from);
                        echo "\nNO TASK CLOSING";
                        break;

                }
            }else{
                echo "\n NO DATA";
                self::closeConnection($from);
            }
        }else{
            echo "\n NO MSG";
            self::closeConnection($from);
        }
    }

    public function closeConnection($conn){
        global $game;
        if( $conn ){
            if( $conn->resourceId ){
                $connid = $conn->resourceId;
                $conn->close(); 
                $new = array();
                foreach( self::$clients as $client ){
                    if( $client->resourceId != $connid ){
                        $new[] = $client;
                    }
                }
                self::$clients = $new;
                $game->query('UPDATE users set connection_id = 0 WHERE connection_id = "'.intval($connid).'" LIMIT 1');
                echo "\n".'Connection '.$connid.' has disconnected';
            }
        }
    }

    public function onClose(ConnectionInterface $conn) {
        echo "\nCLIENT DROPPED";
        self::closeConnection($conn);
    }

    public function onOpen(ConnectionInterface $conn) {
    }
    public function onError(ConnectionInterface $conn, \Exception $e) {
        echo "\nCLIENT ERRORED";
        self::closeConnection($conn);
    }
    public function onSubscribe(ConnectionInterface $conn, $topic) {
    }
    public function onUnSubscribe(ConnectionInterface $conn, $topic) {
    }
    public function onCall(ConnectionInterface $conn, $id, $topic, array $params) {
    }
    public function onPublish(ConnectionInterface $conn, $topic, $event, array $exclude, array $eligible) {
    }

}

This all works fine as long as I don't create a thread inside the event loop.

Am I going about this the wrong way or is php multithreading and websockets incompatible?

like image 969
Hobbes Avatar asked Sep 06 '14 20:09

Hobbes


People also ask

What is WebSockets for PHP Ratchet?

WebSockets for PHP. Ratchet is a loosely coupled PHP library providing developers with tools to create real time, bi-directional applications between clients and servers over WebSockets.

What is Ratchet?

Ratchet is a PHP WebSocket library for serving real-time bi-directional messages between clients and server Ratchet Home Documentation API Docs Demo Ratchet WebSockets for PHP

What are WebSockets and Ajax requests?

Websockets are low-latency (or fast), persistent connections between a server and one or more clients. Unlike AJAX requests, WebSockets are bi-directional (push-pull), meaning that both the client and server can listen to each other in real-time and respond to any changes. Be sure to navigate back to the project root directory.

How do I connect to a WebSocket server in JavaScript?

First, we use the native JS WebSocket object and pass in the address of the WebSocket server. Then we store that in a variable conn. This will initiate the onOpen () method on our WebSocket server. Once it receives a response from the server saying it has connected correctly, the conn. onopen event fires.


1 Answers

check this package https://github.com/huyanping/react-multi-process

Install

composer require jenner/react-multi-process How to use it?

So simple like:

$loop = React\EventLoop\Factory::create();
$server = stream_socket_server('tcp://127.0.0.1:4020');
stream_set_blocking($server, 0);
$loop->addReadStream($server, function ($server) use ($loop) {
    $conn = stream_socket_accept($server);
    $data = "pid:" . getmypid() . PHP_EOL;
    $loop->addWriteStream($conn, function ($conn) use (&$data, $loop) {
        $written = fwrite($conn, $data);
        if ($written === strlen($data)) {
            fclose($conn);
            $loop->removeStream($conn);
        } else {
            $data = substr($data, 0, $written);
        }
    });
});

// the second param is the sub process count
$master = new \React\Multi\Master($loop, 20);
$master->start();

An example using jenner/simple_fork like:

class IoServer {
     /**
     * @param int $count worker process count
     * Run the application by entering the event loop
     * @throws \RuntimeException If a loop was not previously specified
     */
    public function run($count = 1) {
        if (null === $this->loop) {
            throw new \RuntimeException("A React Loop was not provided during instantiation");
        }

        if($count <= 1){
            $this->loop->run();
        }else{
            $loop = $this->loop;
            $master = new \Jenner\SimpleFork\FixedPool(function() use($loop) {
                $this->loop->run();
            }, $count);
            $master->start();
            $master->keep(true);
//            or just 
//            $master = new \React\Multi\Master($this->loop, $count);
//            $master->start();
        }
    }
}
like image 143
Devy Avatar answered Nov 15 '22 14:11

Devy