
Why aren't all threads completed?

Tags:

php

pthreads

I've tried the example from this answer by Joe https://stackoverflow.com/a/32187103/2229367 and it works great, but when I tried to edit the code a little:

$pool = new Pool(4);

while (@$i++ < 10) {
    $pool->submit(new class($i) extends Collectable {
        public function __construct($id) {
            $this->id = $id;
        }

        public function run() {
            printf(
                "Hello World from %d\n", $this->id);
            $this->html = file_get_contents('http://google.fr?q=' . $this->id);
            $this->setGarbage();
        }

        public $id;
        public $html;
    });
}

while ($pool->collect(function(Collectable $work){
    printf(
        "Collecting %d\n", $work->id);
    var_dump($work->html);
    return $work->isGarbage();
})) continue;

$pool->shutdown();

The count of "Hello World" lines differs from the count of "Collecting" lines. The docs are out of date. What is causing this?

asked Sep 26 '22 by inJakuzi

2 Answers

Worker::collect is not intended to enable you to reap results; it is non-deterministic.

Worker::collect is only intended to run garbage collection on objects referenced in the stack of Worker objects.
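As a plain illustration of that contract (single-threaded, no pthreads required; `MockPool` and `MockWork` are hypothetical stand-ins, not pthreads classes): the callback passed to collect only answers whether an item can be destroyed, and anything it reaps is gone for good, which is why it makes a poor results channel.

```php
<?php
// Hypothetical, single-threaded mock of the Pool::collect() contract:
// the collector callback only answers "can this work be destroyed?".
// It is not a reliable place to harvest results, because in real
// pthreads the collection order is non-deterministic.
class MockWork {
    public $id;
    public $done = false;
    public function __construct($id) { $this->id = $id; }
}

class MockPool {
    private $work = [];
    public function submit(MockWork $w) { $this->work[] = $w; }
    // Like pthreads' Pool::collect(), returns the number of items still held.
    public function collect(callable $isGarbage) {
        foreach ($this->work as $k => $w) {
            if ($isGarbage($w)) {
                unset($this->work[$k]); // destroyed, never seen again
            }
        }
        return count($this->work);
    }
}

$pool = new MockPool();
foreach ([1, 2, 3] as $id) {
    $w = new MockWork($id);
    $w->done = ($id !== 2); // pretend work #2 has not finished yet
    $pool->submit($w);
}

// First pass reaps #1 and #3; #2 is still held by the pool.
$remaining = $pool->collect(function ($w) { return $w->done; });
printf("remaining after pass 1: %d\n", $remaining);
```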

If the intention is to process each result as it becomes available, the code might look something like this:

<?php
$pool = new Pool(4);
$results = new Volatile();
$expected = 10;
$found = 0;

while (@$i++ < $expected) {
    $pool->submit(new class($i, $results) extends Threaded {

        public function __construct($id, Volatile $results) {
            $this->id = $id;
            $this->results = $results;
        }

        public function run() {
            $result = file_get_contents('http://google.fr?q=' . $this->id);

            $this->results->synchronized(function($results, $result){
                $results[$this->id] = $result;
                $results->notify();
            }, $this->results, $result);
        }

        private $id;
        private $results;
    });
}

do {
    $next = $results->synchronized(function() use(&$found, $results) {
        while (!count($results)) {
            $results->wait();
        }

        $found++;

        return $results->shift();
    });

    var_dump($next);
} while ($found < $expected);

while ($pool->collect()) continue;

$pool->shutdown();
?>

This is obviously not very tolerant of errors, but the main difference is that I use a shared Volatile collection of results, and I synchronize properly to fetch results in the main context as they become available.

If you wanted to wait for all results to become available, and possibly avoid some contention for locks - which you should always try to avoid if you can - then the code would look simpler, something like:

<?php
$pool = new Pool(4);
$results = new Volatile();
$expected = 10;

while (@$i++ < $expected) {
    $pool->submit(new class($i, $results) extends Threaded {

        public function __construct($id, Volatile $results) {
            $this->id = $id;
            $this->results = $results;
        }

        public function run() {
            $result = file_get_contents('http://google.fr?q=' . $this->id);

            $this->results->synchronized(function($results, $result){
                $results[$this->id] = $result;
                $results->notify();
            }, $this->results, $result);
        }

        private $id;
        private $results;
    });
}

$results->synchronized(function() use($expected, $results) {
    while (count($results) != $expected) {
        $results->wait();
    }
});

var_dump(count($results));

while ($pool->collect()) continue;

$pool->shutdown();
?>

It is noteworthy that the Collectable interface is already implemented by Threaded in the most recent versions of pthreads - which is the version you should be using ... always ...

The docs are out of date, sorry about that ... one human ...

answered Sep 29 '22 by Joe Watkins

Pthreads V3 is much less forgiving than V2. collect is a no-go in V3.

Rule n°1: I do all my queries inside the threads, avoiding passing too large an amount of data into them. This was OK with V2, but not anymore with V3. I keep the arguments passed to workers as lean as possible; this also makes for a faster process.

Rule n°2: I do not go over the number of CPU threads available for each pool, and chunk the work accordingly with a loop. This way I make sure there is no memory overhead from a ton of pools, and each time a loop is done I force a garbage collection. This turned out to be necessary for me due to very high RAM needs across threads; it might not be your case, but make sure your consumed RAM does not go over your PHP limit. The bigger the arguments you pass to the threads, the faster the RAM goes up.
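The chunking in Rule n°2 can be sketched in plain PHP (no pthreads needed; the 30 placeholder queries and the thread count of 12 are assumptions for illustration):

```php
<?php
// Split the full query list into batches no larger than the number of
// CPU threads, so one pool cycle never runs more workers than threads.
$Nb_of_th = 12;            // assumed available CPU threads
$queries  = range(1, 30);  // stand-ins for 30 real SQL query strings
$batches  = array_chunk($queries, $Nb_of_th);

printf("%d pool cycles\n", count($batches)); // 30 queries / 12 per cycle -> 3 cycles
foreach ($batches as $i => $batch) {
    printf("cycle %d runs %d workers\n", $i, count($batch));
}
```

Each batch then becomes one pool cycle, with a shutdown and forced garbage collection in between.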

Rule n°3: Properly declare your object arrays in workers with an (array) cast to make sure all results are returned.

Here is a basic rewritten working example, following the 3 rules as closely as I can per your example:

  • uses an array of queries to be multithreaded.

  • a Collectable implementation to grab the results in place of collect.

  • batches of pools sized to the CPU's number of threads to avoid RAM overheads.

  • threaded queries, each one having its own connection, not passed across workers.

  • pushes all the results into an array at the end.

code:

    define("SQLHOST", "127.0.0.1");
    define("SQLUSER", "root");
    define("SQLPASS", "password");
    define("SQLDBTA", "mydatabase");

    $Nb_of_th = 12; // 12 threads (6 CPU cores in this example)
    $queries = array_chunk($queries, $Nb_of_th); // whatever list of queries you want to pass to the workers
    $global_data = array(); // all results from all pool cycles

    // first we set the main loop
    foreach ($queries as $key => $chunks) {
        $pool = new Pool($Nb_of_th, Worker::class); // 12 workers max
        $workCount = count($chunks);

        // second we launch the submits
        foreach (range(1, $workCount) as $i) {
            $chunk = $chunks[$i - 1];
            $pool->submit(new MyWorkers($chunk));
        }

        $data = []; // pool cycle result array
        $collector = function (\Collectable $work) use (&$data) {
            $isGarbage = $work->isGarbage();
            if ($isGarbage) {
                $data[] = $work->result; // thread result
            }
            return $isGarbage;
        };

        do {
            $pool->collect($collector);
            $isComplete = count($data) === $workCount;
        } while (!$isComplete);

        array_push($global_data, $data); // push pool results into the main array

        // complete purge
        unset($data);
        $pool->shutdown();
        unset($pool);
        gc_collect_cycles(); // force garbage collection before a new pool cycle
    }

    var_dump($global_data); // results for all pool cycles

    class MyWorkers extends \Threaded implements \Collectable {

        private $isGarbage;
        public $result;
        private $process;

        public function __construct($process) {
            $this->process = $process;
        }

        public function run() {
            $con = new PDO('mysql:host=' . SQLHOST . ';dbname=' . SQLDBTA . ';charset=UTF8', SQLUSER, SQLPASS);
            $stmt = $con->prepare($this->process); // each query string prepared on its own connection
            $stmt->execute();
            $obj = $stmt->fetchAll(PDO::FETCH_ASSOC);

            /* do whatever you want to do here */
            $this->result = (array) $obj; // important! the (array) cast avoids Volatile destruction in V3
            $this->isGarbage = true;
        }

        public function isGarbage() : bool {
            return $this->isGarbage;
        }
    }
answered Sep 29 '22 by cpugourou