
How to Implement Asynchronous Queue to run Method in Symfony 3

First off, some basic information about my project: I have a website built with Symfony 3. I'm thinking about running some PHP methods asynchronously. Some operations take a long time, but their results don't need to be visible immediately.

For instance, my newOrder method calls a function addUserLTV, which performs several steps. The customer should not have to wait for all of them to complete; they only need to see the confirmation right after the basic operation. So newOrder would add addUserLTV to a queue and show the confirmation immediately (finishing the request). The queued tasks would run when the server has time to process them.

public function addUserLTV( $userID, $addLTV )
{ // some code
}

How can I do this? Is it possible in Symfony 3?

Grene asked Dec 11 '22 11:12



1 Answer

This is something you can easily do with the enqueue bundle. A few words on why you should choose it:

  • It supports a lot of transports, from the simplest one (filesystem) to enterprise ones (RabbitMQ or Amazon SQS).
  • It comes with a powerful Symfony bundle.
  • It has a top-level abstraction that is very easy to use.
  • There are many more features that might come in handy.

Regarding your question, here's how you can do this with the enqueue bundle. First, follow the setup instructions from the doc.

Now the addUserLTV method will look like this:

<?php
namespace Acme;

use Enqueue\Client\ProducerInterface;

class AddUserLTVService
{
    /**
     * @var ProducerInterface
     */
    private $producer;

    /**
     * @param ProducerInterface $producer
     */
    public function __construct(ProducerInterface $producer)
    {
        $this->producer = $producer;
    }

    public function addUserLTV( $userID, $addLTV )
    {
        $this->producer->sendCommand('add_user_ltv', [
            'userId' => $userID,
            'ltv' => $addLTV,
        ]);
    }
}

It sends the message to a message queue using the client (the top-level abstraction I mentioned before). The service has to be registered in the Symfony container:

services:
    Acme\AddUserLTVService: 
        arguments: ['@enqueue.producer']

Now let's look at the consumption side. You need a command processor that does the job:

<?php
namespace Acme;

use Enqueue\Client\CommandSubscriberInterface;
use Enqueue\Psr\PsrContext;
use Enqueue\Psr\PsrMessage;
use Enqueue\Psr\PsrProcessor;
use Enqueue\Util\JSON;

class AddUserTVAProcessor implements PsrProcessor, CommandSubscriberInterface
{
    public function process(PsrMessage $message, PsrContext $context)
    {
        $data = JSON::decode($message->getBody());

        // The key must match the one the producer sends ('userId').
        $userID = $data['userId'];
        $addLTV = $data['ltv'];

        // do job

        return self::ACK;
    }

    public static function getSubscribedCommand()
    {
        return 'add_user_ltv';
    }
}
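
The "do job" step usually starts by checking the payload, since a malformed message is better rejected than processed with missing data. Here is a minimal sketch of such a check in plain PHP, using the same keys the producer sends. decodeLtvPayload is a hypothetical helper, not part of enqueue; inside the processor, a null result could map to a reject status and a valid payload to self::ACK, assuming your version of the processor interface defines a reject constant.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper: decode and validate the message body.
 * Returns [userId, ltv] on success, or null if the payload is unusable.
 */
function decodeLtvPayload(string $body): ?array
{
    $data = json_decode($body, true);

    // Reject anything that is not a JSON object with both expected keys.
    if (!is_array($data) || !isset($data['userId'], $data['ltv'])) {
        return null;
    }
    // Both values must be numeric before they are cast.
    if (!is_numeric($data['userId']) || !is_numeric($data['ltv'])) {
        return null;
    }

    return [(int) $data['userId'], (float) $data['ltv']];
}

$valid = decodeLtvPayload('{"userId": 42, "ltv": 19.99}');
$invalid = decodeLtvPayload('{"userId": 42}');
```

Keeping the validation in a small function of its own makes it testable without a running broker.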

Register it as a service with an enqueue.client.processor tag:

services:
    Acme\AddUserTVAProcessor: 
        tags:
            - {name: 'enqueue.client.processor'}

That's it for coding. Run the consume command and you are done:

./bin/console enqueue:consume --setup-broker -vvv
Maksim Kotlyar answered Mar 08 '23 23:03
