
PHP Thread Pool

EDIT: To clarify and simplify: I'm looking for a "good" way to submit more Stackable objects to a Pool whenever a Stackable ends (using data from the first Stackable to create the second one). I have two ideas: polling the objects until one ends (inefficient and ugly), or passing references to the Pool object (which I was not able to make work). The base code is this one: https://github.com/krakjoe/pthreads/blob/master/examples/Pooling.php

Now, the full description:

I'm working on an application in PHP that has grown too large and takes a long time to run. Because of this I'm trying to multithread the application using a thread pool (I know PHP is not the best option, but I neither want to nor can change the language at this point).

The problem is that the application has 2 stages that must run in order, and each one has many subtasks that can run concurrently. This is the process I have in mind:

  • There will be N subtasks in stage 1, these subtasks will be Stackable objects.
  • When subtask i ends, the "main" (the one that creates the pool, the Stackables, etc.) has to be notified, and execute stage 2 for subtask i with some data from the subtask i (a different Stackable object). In this stage, there will be M subtasks for each of the subtasks of stage 1.

I would like to use the same thread pool for the threads in stage 1 and stage 2. The only solution I can think of for going from stage 1 to stage 2 is to poll each of the N subtasks until one of them ends, call stage 2 for the one that ended, and repeat until all N subtasks have ended.

I'm using the example of a thread pool included in pthreads source, by Joe Watkins, as a base code.

markmb asked Dec 20 '22 19:12


1 Answer

You should start by reading: https://gist.github.com/krakjoe/6437782

<?php
/**
* Normal worker
*/
class PooledWorker extends Worker {
    public function run(){}
}


/**
* Don't descend from pthreads; normal objects should be used for pools
*/
class Pool {
    protected $size;
    protected $workers = [];

    /**
    * Construct a worker pool of the given size
    * @param integer $size
    */  
    public function __construct($size) {
        $this->size = $size;
    }

    /**
    * Start worker threads
    */
    public function start() {
        for ($worker = 1; $worker <= $this->size; $worker++) {
            $this->workers[$worker] = new PooledWorker();
            $this->workers[$worker]->start();
        }
        return count($this->workers);
    }

    /**
    * Submit a task to pool
    */
    public function submit(Stackable $task) {
        $this->workers[array_rand($this->workers)]
            ->stack($task);
        return $task;
    }

    /**
    * Shutdown worker threads
    */
    public function shutdown() {
        foreach ($this->workers as $worker)
            $worker->shutdown();
    }
}

class StageTwo extends Stackable {
    /**
    * Construct StageTwo from a part of StageOne data
    * @param int $data
    */
    public function __construct($data) {
        $this->data = $data;
    }

    public function run(){
        printf(
            "Thread %lu got data: %d\n", 
            $this->worker->getThreadId(), $this->data);
    }
}

class StageOne extends Stackable {
    public $done; /* read from the main context, so declare it public */

    /**
    * Construct StageOne with suitable storage for data
    * @param StagingData $data
    */
    public function __construct(StagingData $data) {
        $this->data = $data;
    }

    public function run() {
        /* create dummy data array */
        for ($i = 0; $i < 100; $i++) {
            $this->data[] = mt_rand(
                20, 1000);
        }
        $this->done = true;
    }
}

/**
* StagingData to hold data from StageOne
*/
class StagingData extends Stackable {
    public function run() {}
}

/* stage and data reference arrays */
$one = [];
$two = [];
$data = [];

$pool = new Pool(8);
$pool->start();

/* construct stage one */
while (count($one) < 10) {
    $staging = new StagingData();
    /* maintain reference counts by storing return value in normal array in local scope */
    $one[] = $pool
        ->submit(new StageOne($staging));
    /* maintain reference counts */
    $data[] = $staging;
}

/* construct stage two */
while (count($one)) {

    /* find completed StageOne objects */
    foreach ($one as $id => $job) {
        /* if done is set, the data from this StageOne can be used */
        if ($job->done) {
            /* use each element of data to create new tasks for StageTwo */
            foreach ($job->data as $chunk) {
                /* submit stage two */
                $two[] = $pool
                    ->submit(new StageTwo($chunk));
            }

            /* no longer required */
            unset($one[$id]);
        }
    }

    /* in the real world, it is unnecessary to keep polling the array */
    /* you probably have some work you want to do ... do it :) */
    if (count($one)) {
        /* everyone likes sleep ... */
        usleep(1000000);
    }
}

/* all tasks stacked, the pool can be shutdown */
$pool->shutdown();
?>

This will output something like the following (thread IDs and data values vary between runs):

Thread 140012266239744 got data: 612
Thread 140012275222272 got data: 267
Thread 140012257257216 got data: 971
Thread 140012033140480 got data: 881
Thread 140012257257216 got data: 1000
Thread 140012016355072 got data: 261
Thread 140012257257216 got data: 510
Thread 140012016355072 got data: 148
Thread 140012016355072 got data: 501
Thread 140012257257216 got data: 767
Thread 140012024747776 got data: 504
Thread 140012033140480 got data: 401
Thread 140012275222272 got data: 20
<-- trimmed from 1000 lines -->
Thread 140012041533184 got data: 285
Thread 140012275222272 got data: 811
Thread 140012041533184 got data: 436
Thread 140012257257216 got data: 977
Thread 140012033140480 got data: 830
Thread 140012275222272 got data: 554
Thread 140012024747776 got data: 704
Thread 140012033140480 got data: 50
Thread 140012257257216 got data: 794
Thread 140012024747776 got data: 724
Thread 140012033140480 got data: 624
Thread 140012266239744 got data: 756
Thread 140012284204800 got data: 997
Thread 140012266239744 got data: 708
Thread 140012266239744 got data: 981

Because you want to use one pool, you have little choice but to create the tasks in the main context where the pool is created. I can imagine other solutions but you specifically asked for this kind of solution.
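
The polling step in the listing above can be pulled out into a small helper, so the main context can call it between other pieces of work. What follows is only a minimal sketch in plain PHP with no pthreads calls: `collectFinished()` is a hypothetical name, and the stand-in job objects merely mimic the `done` and `data` properties used above.

```php
<?php
/**
 * Hypothetical helper: scan the pending stage-one jobs once, pass each data
 * chunk of every finished job to $submit, and drop the finished jobs.
 * Returns the number of stage-two tasks submitted during this pass.
 */
function collectFinished(array &$pending, callable $submit)
{
    $submitted = 0;
    foreach ($pending as $id => $job) {
        if (!$job->done) {
            continue; // still running, check again on the next pass
        }
        foreach ($job->data as $chunk) {
            $submit($chunk);
            $submitted++;
        }
        unset($pending[$id]); // this job has been fully handed off
    }
    return $submitted;
}

/* Stand-in jobs: plain objects instead of Stackable, for illustration only */
$pending = [
    (object) ['done' => true,  'data' => [1, 2, 3]],
    (object) ['done' => false, 'data' => []],
];
$chunks = [];
$count = collectFinished($pending, function ($chunk) use (&$chunks) {
    // with a real pool this would be: $pool->submit(new StageTwo($chunk))
    $chunks[] = $chunk;
});
printf("submitted %d, still pending %d\n", $count, count($pending));
```

The main loop would then call the helper, do whatever other work it has, and only sleep when a pass submits nothing.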

Depending on the hardware at my disposal, and the nature of the tasks and data to be processed, I might well use multiple [small] pools of threads, one for each worker. This would allow StageOne to create StageTwo objects in the Worker context that is executing them. It might be something to consider.
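
The alternative described above, where stage one creates its own stage-two tasks, has roughly the following shape. This is a single-threaded simulation in plain PHP, not pthreads code: `LocalQueue`, `StageOneTask` and `StageTwoTask` are hypothetical names, and in a real program each queue would be backed by its own worker or small pool.

```php
<?php
/* Hypothetical stand-in for a worker-local queue; runs tasks in FIFO order */
class LocalQueue {
    private $tasks = [];
    public $results = [];

    public function submit($task) {
        $this->tasks[] = $task;
    }

    public function drain() {
        // tasks may submit further tasks while the queue is draining
        while ($this->tasks) {
            $task = array_shift($this->tasks);
            $task->run($this);
        }
    }
}

class StageTwoTask {
    private $chunk;

    public function __construct($chunk) {
        $this->chunk = $chunk;
    }

    public function run(LocalQueue $queue) {
        $queue->results[] = $this->chunk * 2; // placeholder for real stage-two work
    }
}

class StageOneTask {
    private $chunks;

    public function __construct(array $chunks) {
        $this->chunks = $chunks;
    }

    public function run(LocalQueue $queue) {
        // stage one queues its own follow-up work; the main context is not involved
        foreach ($this->chunks as $chunk) {
            $queue->submit(new StageTwoTask($chunk));
        }
    }
}

$queue = new LocalQueue();
$queue->submit(new StageOneTask([1, 2, 3]));
$queue->drain();
print_r($queue->results);
```

In this shape the follow-up tasks stay on the queue that produced them, so no polling is needed in the main context, but a busy queue cannot hand its stage-two work to an idle one.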

Joe Watkins answered Dec 24 '22 03:12