Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Swoole with RabbitMQ

I'm trying to send some data from php application to the user's browser using websockets. Therefore I've decided to use Swoole in combination with RabbitMQ.

It's the first time I'm working with websockets and after reading some posts about Socket.IO, Ratchet, etc. I've decided to halt on Swoole because it's written in C and handy to use with php.

This is how I understood the idea of enabling data transfer using websockets: 1) Start RabbitMQ worker and Swoole server in CLI 2) php application sends data to RabbitMQ 3) RabbitMQ sends message with data to worker 4) Worker receives message with data + establishes socket connection with Swoole socket server. 5) Swoole server broadcasts data to all connections

The question is how to bind Swoole socket server with RabbitMQ? Or how to make RabbitMQ to establish connection with Swoole and send data to it?

Here is the code:

Swoole server (swoole_sever.php)

$server = new \swoole_websocket_server("0.0.0.0", 2345, SWOOLE_BASE);

$server->on('open', function(\Swoole\Websocket\Server $server, $req)
{
    echo "connection open: {$req->fd}\n";
});

$server->on('message', function($server, \Swoole\Websocket\Frame $frame)
{
    echo "received message: {$frame->data}\n";
    $server->push($frame->fd, json_encode(["hello", "world"]));
});

$server->on('close', function($server, $fd)
{
    echo "connection close: {$fd}\n";
});

$server->start();

Worker which receives message from RabbitMQ, then makes connection to Swoole and broadcasts the message via socket connection (worker.php)

$connection = new AMQPStreamConnection('0.0.0.0', 5672, 'guest', 'guest');
$channel = $connection->channel();

$channel->queue_declare('task_queue', false, true, false, false);

echo ' [*] Waiting for messages. To exit press CTRL+C', "\n";

$callback = function($msg){
    echo " [x] Received ", $msg->body, "\n";
    sleep(substr_count($msg->body, '.'));
    echo " [x] Done", "\n";
    $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);


    // Here I'm trying to make connection to Swoole server and sernd data
    $cli = new \swoole_http_client('0.0.0.0', 2345);

    $cli->on('message', function ($_cli, $frame) {
        var_dump($frame);
    });

    $cli->upgrade('/', function($cli)
    {
        $cli->push('This is the message to send to Swoole server');
        $cli->close();
    });
};

$channel->basic_qos(null, 1, null);
$channel->basic_consume('task_queue', '', false, false, false, false, $callback);

while(count($channel->callbacks)) {
    $channel->wait();
}

$channel->close();
$connection->close();

New task where the message will be send to RabbitMQ (new_task.php):

$connection = new AMQPStreamConnection('0.0.0.0', 5672, 'guest', 'guest');
$channel = $connection->channel();

$channel->queue_declare('task_queue', false, true, false, false);

$data = implode(' ', array_slice($argv, 1));
if(empty($data)) $data = "Hello World!";
$msg = new AMQPMessage($data,
    array('delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT)
);

$channel->basic_publish($msg, '', 'task_queue');

echo " [x] Sent ", $data, "\n";

$channel->close();
$connection->close();

After starting both swoole server and worker I'm triggering new_task.php from command line:

php new_task.php

In command line prompt where a RabbitMQ Worker is running (worker.php) I can see that a message is delivered to the worker ("[x] Received Hello World!" message is appearing).

However in command line prompt where Swoole server is running happens nothing.

So the questions are: 1) Is the idea of this approach right? 2) What am I doing wrong?

like image 391
rvaliev Avatar asked Mar 12 '18 00:03

rvaliev


1 Answers

In the callback(in worker.php) that fires when a message is received you're using swoole_http_client which is async only. This seems to results in the code never being fully executed as the callback function returns before the async code is triggered.

A synchronous method of doing the same thing will solve the problem. Here is a simple example:

$client = new WebSocketClient('0.0.0.0', 2345);
$client->connect();
$client->send('This is the message to send to Swoole server');
$recv = $client->recv();
print_r($recv);
$client->close();

Check out the WebSocketClient class and example usage at github.

You can also wrap it in a coroutine, like this:

go(function () {
    $client = new WebSocketClient('0.0.0.0', 2345);
    $client->connect();
    $client->send('This is the message to send to Swoole server');
    $recv = $client->recv();
    print_r($recv);
    $client->close();
});
like image 138
Alex Avatar answered Nov 16 '22 02:11

Alex