I've tried the example from this answer by Joe https://stackoverflow.com/a/32187103/2229367 and it works great, but when I tried to edit the code a little:
$pool = new Pool(4);

while (@$i++ < 10) {
    $pool->submit(new class($i) extends Collectable {
        public function __construct($id) {
            $this->id = $id;
        }

        public function run() {
            printf("Hello World from %d\n", $this->id);

            $this->html = file_get_contents('http://google.fr?q=' . $this->query);

            $this->setGarbage();
        }

        public $id;
        public $html;
    });
}

while ($pool->collect(function(Collectable $work){
    printf("Collecting %d\n", $work->id);

    var_dump($work->html);

    return $work->isGarbage();
})) continue;

$pool->shutdown();
Count of "Hello world" differs from count of "Collecting". Docs are out of date. What about this problem?
Worker::collect is not intended to enable you to reap results; it is non-deterministic. Worker::collect is only intended to run garbage collection on objects referenced in the stack of Worker objects.
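In other words, collect is just a drain. A minimal sketch of its intended role (my own illustration, assuming pthreads v3) is simply:

<?php
// Minimal sketch (assumes pthreads v3): collect() only reaps objects that are
// garbage; loop it to drain the pool before shutting it down, not to count results.
$pool = new Pool(4);

// ... submit Threaded tasks here ...

while ($pool->collect()) continue; // keep collecting until nothing is left

$pool->shutdown();
?>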
If the intention is to process each result as it becomes available, the code might look something like this:
<?php
$pool = new Pool(4);

$results = new Volatile();

$expected = 10;
$found    = 0;

while (@$i++ < $expected) {
    $pool->submit(new class($i, $results) extends Threaded {

        public function __construct($id, Volatile $results) {
            $this->id      = $id;
            $this->results = $results;
        }

        public function run() {
            $result = file_get_contents('http://google.fr?q=' . $this->id);

            $this->results->synchronized(function($results, $result){
                $results[$this->id] = $result;

                $results->notify();
            }, $this->results, $result);
        }

        private $id;
        private $results;
    });
}

do {
    $next = $results->synchronized(function() use(&$found, $results) {
        while (!count($results)) {
            $results->wait();
        }

        $found++;

        return $results->shift();
    });

    var_dump($next);
} while ($found < $expected);

while ($pool->collect()) continue;

$pool->shutdown();
?>
This is obviously not very tolerant of errors, but the main difference is that I use a shared Volatile
collection of results, and I synchronize properly to fetch results in the main context as they become available.
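The core of that pattern can be isolated into a few lines. This is just my own minimal sketch of the synchronized/wait/notify handshake on a shared Volatile (assuming pthreads v3 and a single producer thread), not part of the original answer:

<?php
// Minimal sketch of the synchronized/wait/notify handshake (assumes pthreads v3).
$shared = new Volatile();

$producer = new class($shared) extends Thread {
    public function __construct(Volatile $shared) {
        $this->shared = $shared;
    }

    public function run() {
        // publish a result and wake up whoever is waiting on the Volatile
        $this->shared->synchronized(function($shared){
            $shared['value'] = 42;
            $shared->notify();
        }, $this->shared);
    }

    private $shared;
};
$producer->start();

// main context: block until the producer has published something
$value = $shared->synchronized(function() use($shared) {
    while (!isset($shared['value'])) {
        $shared->wait();
    }
    return $shared['value'];
});

var_dump($value); // int(42)

$producer->join();
?>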
If you wanted to wait for all results to become available, and possibly avoid some contention for locks - which you should always try to avoid if you can - then the code would look simpler, something like:
<?php
$pool = new Pool(4);

$results = new Volatile();

$expected = 10;

while (@$i++ < $expected) {
    $pool->submit(new class($i, $results) extends Threaded {

        public function __construct($id, Volatile $results) {
            $this->id      = $id;
            $this->results = $results;
        }

        public function run() {
            $result = file_get_contents('http://google.fr?q=' . $this->id);

            $this->results->synchronized(function($results, $result){
                $results[$this->id] = $result;

                $results->notify();
            }, $this->results, $result);
        }

        private $id;
        private $results;
    });
}

$results->synchronized(function() use($expected, $results) {
    while (count($results) != $expected) {
        $results->wait();
    }
});

var_dump(count($results));

while ($pool->collect()) continue;

$pool->shutdown();
?>
It is worth noting that the Collectable interface is already implemented by Threaded in the most recent versions of pthreads - which is the version you should be using ... always ...
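For what it's worth, that point can be verified with a quick check of my own (assuming a recent pthreads v3 build):

<?php
// Threaded already implements Collectable in recent pthreads versions,
// so anything extending Threaded can be collected by a Pool.
var_dump((new class extends Threaded {
    public function run() {}
}) instanceof Collectable); // bool(true)
?>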
The docs are out of date, sorry about that ... one human ...
Pthreads V3 is much less forgiving than V2. collect is a no-go in V3.
Rule n°1: I do all my queries inside the threads, avoiding passing too large an amount of data into them. This was OK with V2, but it is not anymore with V3. I keep the arguments passed to workers as lean as possible; this also makes processing faster.
Rule n°2: I do not go over the number of CPU threads available for each pool, and I chunk the work accordingly with a loop. This way I make sure there is no memory overhead from a ton of pools, and each time a loop is done I force a garbage collection. This turned out to be necessary for me due to very high RAM needs across threads; it might not be your case, but make sure your consumed RAM does not go over your PHP limit. The bigger the arguments you pass to the threads, the faster RAM usage goes up.
Rule n°3: Properly declare your object arrays in workers with an (array) cast to make sure all results are returned (see the small sketch just below).
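Here is a tiny illustration of what that cast does (my own sketch, assuming pthreads v3; run() is called inline here only to show the conversion):

<?php
// Sketch of Rule n°3 (assumes pthreads v3): an array assigned to a Threaded
// property is stored as a Volatile object; casting it back with (array) when
// reading gives a plain PHP array copy that is safe to keep after the worker
// has been collected.
$job = new class extends Threaded {
    public $result;

    public function run() {
        // e.g. rows fetched from PDO inside the thread
        $this->result = [['id' => 1], ['id' => 2]];
    }
};

$job->run(); // called inline just to illustrate the conversion

var_dump($job->result);          // object(Volatile) wrapping the rows
var_dump((array) $job->result);  // plain PHP array copy
?>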
Here is a basic rewritten working example, following the 3 rules as closely as I can per your example:
an array of queries to be multithreaded.
a Collectable implementation to grab the results in place of collect.
batches of pools sized to the number of CPU threads, to avoid RAM overhead.
threaded queries, each one with its own connection, not passed across workers.
all the results pushed into an array at the end.
code:
define("SQLHOST", "127.0.0.1");
define("SQLUSER", "root");
define("SQLPASS", "password");
define("SQLDBTA", "mydatabase");
$Nb_of_th=12; // (6 cpu cores in this example)
$queries = array_chunk($queries, ($Nb_of_th));// whatever list of queries you want to pass to the workers
$global_data=array();// all results from all pool cycles
// first we set the main loops
foreach ($queries as $key => $chunks) {
$pool = new Pool($Nb_of_th, Worker::class);// 12 pools max
$workCount = count($chunks);
// second we launch the submits
foreach (range(1, $workCount) as $i) {
$chunck = $chunks[$i - 1];
$pool->submit(new MyWorkers($chunck));
}
$data = [];// pool cycle result array
$collector = function (\Collectable $work) use (&$data) {
$isGarbage = $work->isGarbage();
if ($isGarbage) {
$data[] = $work->result; // thread result
}
return $isGarbage;
};
do {
$count = $pool->collect($collector);
$isComplete = count($data) === $workCount;
} while (!$isComplete);
array_push($global_data, $data);// push pool results into main
//complete purge
unset($data);
$pool->shutdown();
unset($pool);
gc_collect_cycles();// force garbage collector before new pool cycle
}
Var_dump($global_data); // results for all pool cycles
class MyWorkers extends \Threaded implements \Collectable {

    private $isGarbage;
    public $result;
    private $process;

    public function __construct($process) {
        $this->process = $process;
    }

    public function run() {

        $con = new PDO('mysql:host=' . SQLHOST . ';dbname=' . SQLDBTA . ';charset=UTF8', SQLUSER, SQLPASS);

        $stmt = $con->prepare($this->process);
        $stmt->execute();
        $obj = $stmt->fetchAll(PDO::FETCH_ASSOC);

        /* do whatever you want to do here */

        $this->result = (array) $obj; // important! the (array) cast avoids volatile destruction in V3
        $this->isGarbage = true;
    }

    public function isGarbage() : bool {
        return $this->isGarbage;
    }
}