I am using pcntl to speed up a fairly heavy CLI PHP script that consists mostly of a class in charge of sending all of the automated emails for my application.
My goal is as follows: I want to assign each process to a certain task within a foreach loop. The implementation I've used is shown in the code example below.
The problem is that once you fork a process, it executes asynchronously and also gets a copy of the parent's process stack. In my case, what happens is that one task simply executes several times. My question is: how can I design this script to be smarter in order to avoid such behavior?
Code:
/**
 * @description this is the main procedure of this class, it iterates over the relevant tasks and sends the emails using the SendGrid wrapper class
 * @see SendGridWrapper
 * @return void
 */
public function execute(){
    if(!isset($this->tasks)){
        throw new exception("Please call getRelevantTasks() prior to trying to execute anything");
    }
    $proccesses = array();
    foreach($this->tasks as $myTask){
        $pid = pcntl_fork();
        if($pid){
            $proccesses[] = $pid;
        }
        else if($pid == -1){
            die('FORK FAILED, STATUS -1');
        }
        else{
            if(isset($myTask['recipient_model'])){
                $this->currentModel = $myTask['recipient_model'];
                $lang = $myTask['lang'];
                $classPath = self::$modelsDir . $myTask['recipient_model'] . '.php';
                $className = $myTask['recipient_model'];
                if(!class_exists($myTask['recipient_model'] )){
                    require_once(dirname(__FILE__) . '/../' . $classPath);
                }
                else if(isset($recipientFetcher)){
                    unset($this->model);
                    unset($this->mailingList);
                    unset($this->substitutionList);
                }
                $this->model = null;
                $this->mailingList = null;
                $this->substitutionList = null;
                $this->model = new $className($myTask['lang']);
                $addresses = $this->model->getMailRecipients();
                if(empty($addresses) || sizeof($addresses) == 0){
                    continue;
                }
                $this->model->prepare();
                $this->substitutionList = $this->model->getDynamicParams();
            }
            else{
                throw new exception('No recipient model was found');
            }
            foreach($addresses as $myMail){
                $this->mailingList[$myMail['personal_email']] = $myMail['contact_name'];
            }
            $templatePath = dirname(__FILE__) . '/../';
            $templatePath .= $lang ? self::$templatesDirEn . $myTask['html_email_path'] : self::$templatesDirHe . $myTask['html_email_path'];
            $html = file_get_contents($templatePath);
            $this->sendMail($html, $myTask['task_schedule_id']);
            echo "model:" . $myTask['recipient_model'];
            echo $this->log;
            $this->log = "";
            die("\r\n Child proccess has been executed successfully\r\n");
        }
    }
    if($pid){
        foreach($proccesses as $key => $val){
            pcntl_waitpid($val, $status, WUNTRACED);
        }
    }
}
Thanks in advance, Oleg.
Introduction
I see you are trying to send mail with $this->sendMail($html, $myTask['task_schedule_id']); and I think it's a really bad idea to use multiple processes for this task. You should consider using a message queue instead, because sending email can be very slow.
Use a Queue System
You should be using Gearman, ZeroMQ or Beanstalkd for this task. In the worst case, implement your own simple message queue with memcached.
Here is a typical Gearman Example: https://stackoverflow.com/questions/13855907/when-to-send-auto-email-instantly-on-button-click-or-later
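To make the queue idea a little more concrete, here is a minimal sketch of how the producer and the worker could be split with the PHP gearman extension. It assumes a gearmand server on its default address (127.0.0.1:4730); the job name send_mail, the JSON payload and the two function names are illustrative assumptions, not something taken from the question.

```php
<?php
// Producer side: call this once per task instead of forking.
// 'send_mail' is an assumed job name; the address is gearmand's default.
function enqueue_mail_task(array $task): string
{
    $client = new GearmanClient();
    $client->addServer('127.0.0.1', 4730);
    // doBackground() returns a job handle right away; the mail is sent later by a worker.
    $handle = $client->doBackground('send_mail', json_encode($task));
    if ($client->returnCode() !== GEARMAN_SUCCESS) {
        throw new RuntimeException('Could not queue task: ' . $client->error());
    }
    return $handle;
}

// Consumer side: a long-running CLI worker; start as many copies as you need.
// $sendMail is whatever callable actually builds and sends the mail for one task.
function run_mail_worker(callable $sendMail): void
{
    $worker = new GearmanWorker();
    $worker->addServer('127.0.0.1', 4730);
    $worker->addFunction('send_mail', function (GearmanJob $job) use ($sendMail) {
        // The workload is the JSON string queued by enqueue_mail_task().
        $sendMail(json_decode($job->workload(), true));
    });
    while ($worker->work()) {
        // Each iteration processes one queued job.
    }
}
```

With this split, the script that finds the relevant tasks only queues them and exits quickly, and the number of worker processes can be tuned independently of the number of tasks.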
Quick Fix
Move all of that code into a function called execute_worker, to which you can push a task:
// Break the tasks into groups
$tasks = array_chunk(range("A", "Z"), 10);
foreach($tasks as $task) {
    $pid = pcntl_fork();
    if ($pid == -1) {
        // Throwing already leaves the loop, so no break is needed
        throw new ErrorException('FORK FAILED, STATUS -1');
    }
    if ($pid == 0) {
        execute_worker($task); // In Child
        exit(); // In Child: never fall back into the loop
    }
}
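Two details in the question's code would explain a task running more than once. The child calls continue when there are no addresses, so it goes back into the foreach and forks children of its own; and because -1 is truthy in PHP, if($pid) also catches a failed fork before the -1 check is reached. The following is a minimal sketch of the same loop that checks for failure first, makes sure the child always exits, and waits for every child. The execute_worker body and the run_in_children helper are hypothetical stand-ins for the real mailing code.

```php
<?php
// Hypothetical stand-in for the real per-task work (sending one batch of mail).
function execute_worker(array $task): void
{
    echo getmypid() . ' handled ' . implode(',', $task) . PHP_EOL;
}

// Fork one child per task group, wait for all of them,
// and return each child's exit code keyed by its PID.
function run_in_children(array $taskGroups, callable $worker): array
{
    $pids = [];
    foreach ($taskGroups as $group) {
        $pid = pcntl_fork();
        if ($pid === -1) {
            // Check failure first: -1 is truthy, so if ($pid) would treat it as "parent".
            throw new RuntimeException('pcntl_fork() failed');
        }
        if ($pid === 0) {
            // Child: do the work, then always exit so it never re-enters the loop.
            try {
                $worker($group);
                exit(0);
            } catch (Throwable $e) {
                fwrite(STDERR, $e->getMessage() . PHP_EOL);
                exit(1);
            }
        }
        $pids[] = $pid; // Parent: remember the child and keep forking.
    }

    $exitCodes = [];
    foreach ($pids as $pid) {
        pcntl_waitpid($pid, $status); // Blocks until this child has finished.
        $exitCodes[$pid] = pcntl_wifexited($status) ? pcntl_wexitstatus($status) : -1;
    }
    return $exitCodes;
}

$codes = run_in_children(array_chunk(range('A', 'Z'), 10), 'execute_worker');
```

Inside the worker, return early instead of using continue when there is nothing to send; the exit() after the call then ends the child cleanly.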
Using Threads
You can also use Worker or Thread in PHP with pthreads to speed up processing.
Simple Project
file_get_contents is said to be slow when compared with curl, and nowhere close to the power of curl_multi_init, which allows the processing of multiple cURL handles in parallel.
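For comparison, this is roughly what the curl_multi approach looks like. It is only a sketch: the fetch_all name, the timeout default and the choice to return bodies keyed like the input array are assumptions made for illustration, and error handling is left out (curl_multi_info_read() reports the result of each transfer).

```php
<?php
// Fetch several URLs concurrently; returns the response bodies keyed like $urls.
function fetch_all(array $urls, int $timeoutSeconds = 10): array
{
    $multi = curl_multi_init();
    $handles = [];
    foreach ($urls as $key => $url) {
        $ch = curl_init($url);
        curl_setopt($ch, CURLOPT_RETURNTRANSFER, true);     // keep the body instead of printing it
        curl_setopt($ch, CURLOPT_TIMEOUT, $timeoutSeconds);
        curl_setopt($ch, CURLOPT_ENCODING, '');             // accept and decode gzip etc.
        curl_multi_add_handle($multi, $ch);
        $handles[$key] = $ch;
    }

    // Drive all transfers until none of them is still running.
    do {
        $status = curl_multi_exec($multi, $running);
        if ($running > 0) {
            curl_multi_select($multi, 1.0); // wait for activity instead of busy-looping
        }
    } while ($running > 0 && $status === CURLM_OK);

    $results = [];
    foreach ($handles as $key => $ch) {
        $results[$key] = curl_multi_getcontent($ch);
        curl_multi_remove_handle($multi, $ch);
    }
    return $results;
}
```

All transfers share one process and one event loop here, so no forking or threading is involved; the parallelism comes from libcurl handling the sockets together.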
Our objective is to implement our own multi file_get_contents version.
Multi file_get_contents Example
// My Storage
$s = new Storage();
// Threads Storage
$ts = array();
// Total threads, same as total pages
$pages = 100;
// Populate threads (don't start yet)
$i = 0;
while($i ++ < $pages) {
    $ts[] = new Process($s, $i);
}
// Start the timer
$start = microtime(true);
// Let's start all our threads
foreach($ts as $t) {
    $t->start();
}
// Wait for all threads to complete
foreach($ts as $t) {
    $t->join();
}
printf("\n\nFound %s in %d pages", number_format($s->total), $pages);
printf("\nFinished %0.3f sec", microtime(true) - $start);
Output
php a.php
3:01:37: 3548 #START {"page":1}
3:01:37: 7064 #START {"page":2}
3:01:37: 10908 #START {"page":3}
3:01:37: 10424 #START {"page":4}
3:01:37: 11472 #START {"page":5}
3:01:37: 3876 #START {"page":6}
3:01:37: 7276 #START {"page":7}
3:01:37: 11484 #START {"page":8}
3:01:37: 932 #START {"page":9}
3:01:37: 11492 #START {"page":10}
3:01:37: 11500 #START {"page":11}
3:01:37: 11508 #START {"page":12}
3:01:37: 11504 #START {"page":13}
3:01:37: 11512 #START {"page":14}
3:01:37: 11516 #START {"page":15}
3:01:37: 11520 #START {"page":16}
3:01:37: 11524 #START {"page":17}
3:01:37: 11528 #START {"page":18}
3:01:37: 10816 #START {"page":19}
3:01:37: 7280 #START {"page":20}
3:01:37: 11556 #START {"page":21}
3:01:37: 11560 #START {"page":22}
3:01:37: 11564 #START {"page":23}
3:01:37: 11612 #START {"page":24}
3:01:37: 11616 #START {"page":25}
3:01:37: 11600 #START {"page":26}
3:01:37: 11608 #START {"page":27}
3:01:37: 11568 #START {"page":28}
3:01:37: 11452 #START {"page":29}
3:01:38: 11624 #START {"page":30}
3:01:38: 11628 #START {"page":31}
3:01:38: 11632 #START {"page":32}
3:01:38: 11636 #START {"page":33}
3:01:38: 11644 #START {"page":34}
3:01:38: 11648 #START {"page":35}
3:01:38: 11652 #START {"page":36}
3:01:38: 11656 #START {"page":37}
3:01:38: 11660 #START {"page":38}
3:01:38: 11664 #START {"page":39}
3:01:38: 11668 #START {"page":40}
3:01:38: 11672 #START {"page":41}
3:01:38: 11676 #START {"page":42}
3:01:38: 11680 #START {"page":43}
3:01:38: 11684 #START {"page":44}
3:01:38: 11688 #START {"page":45}
3:01:38: 11692 #START {"page":46}
3:01:38: 11696 #START {"page":47}
3:01:38: 11700 #START {"page":48}
3:01:38: 11704 #START {"page":49}
3:01:38: 11712 #START {"page":50}
3:01:38: 11708 #START {"page":51}
3:01:38: 11716 #START {"page":52}
3:01:38: 11720 #START {"page":53}
3:01:38: 11724 #START {"page":54}
3:01:38: 11728 #START {"page":55}
3:01:38: 11732 #START {"page":56}
3:01:38: 11736 #START {"page":57}
3:01:38: 11740 #START {"page":58}
3:01:38: 11744 #START {"page":59}
3:01:38: 11748 #START {"page":60}
3:01:38: 11752 #START {"page":61}
3:01:38: 11756 #START {"page":62}
3:01:38: 11760 #START {"page":63}
3:01:38: 11764 #START {"page":64}
3:01:38: 11768 #START {"page":65}
3:01:38: 11772 #START {"page":66}
3:01:38: 11776 #START {"page":67}
3:01:38: 11780 #START {"page":68}
3:01:38: 11784 #START {"page":69}
3:01:38: 11788 #START {"page":70}
3:01:38: 11792 #START {"page":71}
3:01:38: 11796 #START {"page":72}
3:01:38: 11800 #START {"page":73}
3:01:38: 11804 #START {"page":74}
3:01:38: 11808 #START {"page":75}
3:01:38: 11812 #START {"page":76}
3:01:38: 11816 #START {"page":77}
3:01:38: 11820 #START {"page":78}
3:01:38: 11824 #START {"page":79}
3:01:38: 11828 #START {"page":80}
3:01:38: 11832 #START {"page":81}
3:01:38: 11836 #START {"page":82}
3:01:38: 11840 #START {"page":83}
3:01:38: 11844 #START {"page":84}
3:01:38: 11848 #START {"page":85}
3:01:38: 11852 #START {"page":86}
3:01:38: 11856 #START {"page":87}
3:01:38: 11860 #START {"page":88}
3:01:38: 11864 #START {"page":89}
3:01:38: 11868 #START {"page":90}
3:01:38: 11872 #START {"page":91}
3:01:38: 11876 #START {"page":92}
3:01:38: 11880 #START {"page":93}
3:01:38: 11884 #START {"page":94}
3:01:38: 11888 #START {"page":95}
3:01:38: 11892 #START {"page":96}
3:01:38: 11896 #START {"page":97}
3:01:38: 11900 #START {"page":98}
3:01:38: 11904 #START {"page":99}
3:01:38: 11908 #START {"page":100}
3:01:38: 11508 #END {"page":12,"byte":1141,"count":155839}
3:01:38: 10424 #END {"page":4,"byte":1201,"count":553595}
3:01:38: 11516 #END {"page":15,"byte":1204,"count":119612}
3:01:38: 3548 #END {"page":1,"byte":1208,"count":6737525}
3:01:38: 11484 #END {"page":8,"byte":1160,"count":257021}
3:01:38: 11472 #END {"page":5,"byte":1175,"count":446411}
3:01:38: 10908 #END {"page":3,"byte":1222,"count":787301}
3:01:38: 11492 #END {"page":10,"byte":1175,"count":193958}
3:01:38: 11504 #END {"page":13,"byte":1130,"count":141450}
3:01:38: 11528 #END {"page":18,"byte":1102,"count":95511}
3:01:38: 11524 #END {"page":17,"byte":1147,"count":102727}
3:01:38: 11560 #END {"page":22,"byte":1111,"count":73536}
3:01:38: 11556 #END {"page":21,"byte":1101,"count":78097}
3:01:38: 11500 #END {"page":11,"byte":1201,"count":172820}
3:01:38: 932 #END {"page":9,"byte":1159,"count":222922}
3:01:38: 11520 #END {"page":16,"byte":1135,"count":110510}
3:01:38: 7064 #END {"page":2,"byte":1165,"count":1264444}
3:01:38: 11512 #END {"page":14,"byte":1123,"count":129721}
3:01:38: 11612 #END {"page":24,"byte":1115,"count":65012}
3:01:38: 11600 #END {"page":26,"byte":1134,"count":58928}
3:01:38: 7276 #END {"page":7,"byte":1189,"count":301469}
3:01:38: 10816 #END {"page":19,"byte":1120,"count":89609}
3:01:38: 11616 #END {"page":25,"byte":1052,"count":61793}
3:01:38: 3876 #END {"page":6,"byte":1188,"count":362101}
3:01:38: 7280 #END {"page":20,"byte":1079,"count":83632}
3:01:38: 11564 #END {"page":23,"byte":1076,"count":68909}
3:01:38: 11632 #END {"page":32,"byte":1095,"count":44013}
3:01:38: 11652 #END {"page":36,"byte":1042,"count":37185}
3:01:38: 11452 #END {"page":29,"byte":1097,"count":50532}
3:01:38: 11636 #END {"page":33,"byte":1097,"count":42148}
3:01:38: 11644 #END {"page":34,"byte":1124,"count":40236}
3:01:38: 11664 #END {"page":39,"byte":1078,"count":32792}
3:01:38: 11668 #END {"page":40,"byte":1017,"count":31487}
3:01:38: 11608 #END {"page":27,"byte":1117,"count":55561}
3:01:38: 11628 #END {"page":31,"byte":1076,"count":46133}
3:01:38: 11624 #END {"page":30,"byte":1111,"count":48265}
3:01:38: 11568 #END {"page":28,"byte":1076,"count":52851}
3:01:38: 11656 #END {"page":37,"byte":1068,"count":35590}
3:01:38: 11688 #END {"page":45,"byte":1062,"count":26060}
3:01:38: 11680 #END {"page":43,"byte":1081,"count":28013}
3:01:38: 11672 #END {"page":41,"byte":1086,"count":30320}
3:01:38: 11724 #END {"page":54,"byte":1060,"count":19900}
3:01:38: 11716 #END {"page":52,"byte":1069,"count":21079}
3:01:38: 11732 #END {"page":56,"byte":1038,"count":18748}
3:01:38: 11692 #END {"page":46,"byte":1033,"count":25230}
3:01:38: 11696 #END {"page":47,"byte":1098,"count":24469}
3:01:38: 11728 #END {"page":55,"byte":1003,"count":19353}
3:01:38: 11648 #END {"page":35,"byte":1105,"count":38651}
3:01:38: 11660 #END {"page":38,"byte":1075,"count":34037}
3:01:38: 11700 #END {"page":48,"byte":1059,"count":23725}
3:01:39: 11720 #END {"page":53,"byte":1028,"count":20463}
3:01:39: 11704 #END {"page":49,"byte":1006,"count":22966}
3:01:39: 11712 #END {"page":50,"byte":988,"count":22369}
3:01:39: 11676 #END {"page":42,"byte":1113,"count":29144}
3:01:39: 11748 #END {"page":60,"byte":1054,"count":17002}
3:01:39: 11684 #END {"page":44,"byte":1041,"count":26999}
3:01:39: 11756 #END {"page":62,"byte":1024,"count":16165}
3:01:39: 11760 #END {"page":63,"byte":1036,"count":15814}
3:01:39: 11740 #END {"page":58,"byte":1075,"count":17833}
3:01:39: 11736 #END {"page":57,"byte":1064,"count":18293}
3:01:39: 11752 #END {"page":61,"byte":1077,"count":16607}
3:01:39: 11708 #END {"page":51,"byte":1045,"count":21668}
3:01:39: 11768 #END {"page":65,"byte":1041,"count":15021}
3:01:39: 11764 #END {"page":64,"byte":1063,"count":15405}
3:01:39: 11744 #END {"page":59,"byte":1052,"count":17394}
3:01:39: 11800 #END {"page":73,"byte":1025,"count":12361}
3:01:39: 11792 #END {"page":71,"byte":1053,"count":13051}
3:01:39: 11796 #END {"page":72,"byte":1092,"count":12721}
3:01:39: 11784 #END {"page":69,"byte":1031,"count":13677}
3:01:39: 11780 #END {"page":68,"byte":1019,"count":13967}
3:01:39: 11772 #END {"page":66,"byte":1068,"count":14644}
3:01:39: 11816 #END {"page":77,"byte":1045,"count":11185}
3:01:39: 11804 #END {"page":74,"byte":1062,"count":12071}
3:01:39: 11824 #END {"page":79,"byte":1047,"count":10719}
3:01:39: 11820 #END {"page":78,"byte":1035,"count":10940}
3:01:39: 11788 #END {"page":70,"byte":987,"count":13354}
3:01:39: 11776 #END {"page":67,"byte":1036,"count":14278}
3:01:39: 11828 #END {"page":80,"byte":1013,"count":10519}
3:01:39: 11832 #END {"page":81,"byte":1052,"count":10318}
3:01:39: 11812 #END {"page":76,"byte":991,"count":11465}
3:01:39: 11808 #END {"page":75,"byte":1043,"count":11769}
3:01:39: 11860 #END {"page":88,"byte":1018,"count":8991}
3:01:39: 11852 #END {"page":86,"byte":971,"count":9362}
3:01:39: 11868 #END {"page":90,"byte":1006,"count":8641}
3:01:39: 11840 #END {"page":83,"byte":1026,"count":9922}
3:01:39: 11872 #END {"page":91,"byte":980,"count":8464}
3:01:39: 11892 #END {"page":96,"byte":936,"count":7727}
3:01:39: 11836 #END {"page":82,"byte":1052,"count":10117}
3:01:39: 11844 #END {"page":84,"byte":973,"count":9739}
3:01:39: 11864 #END {"page":89,"byte":1033,"count":8821}
3:01:39: 11856 #END {"page":87,"byte":994,"count":9169}
3:01:39: 11848 #END {"page":85,"byte":1040,"count":9544}
3:01:39: 11896 #END {"page":97,"byte":988,"count":7562}
3:01:39: 11876 #END {"page":92,"byte":1003,"count":8294}
3:01:39: 11888 #END {"page":95,"byte":995,"count":7860}
3:01:39: 11880 #END {"page":93,"byte":1052,"count":8143}
3:01:39: 11900 #END {"page":98,"byte":977,"count":7418}
3:01:39: 11904 #END {"page":99,"byte":999,"count":7270}
3:01:39: 11884 #END {"page":94,"byte":931,"count":8002}
3:01:39: 11908 #END {"page":100,"byte":977,"count":7144}
Found 14,075,927 in 100 pages
Finished 1.489 sec
Time Taken
Found 14,075,927 in 100 pages
Finished 1.489 sec
Classes Used
class Process extends Thread {
    public function __construct($storage, $page) {
        $this->storage = $storage;
        $this->page = $page;
        // $this->start();
    }
    public function run() {
        $format = "%s: %1u %s\t%s\n";
        $formatTime = "g:i:s";
        $sleep = mt_rand(0, 1); // Just for Demo
        printf($format, date($formatTime), $this->getThreadId(), "#START", "{\"page\":$this->page}");
        // Do something useful
        $data = file_get_contents(sprintf("http://api.stackoverflow.com/1.1/tags?pagesize=100&page=%s", $this->page));
        // Decode the data from the API
        $json = json_decode(gzdecode($data));
        // Let's build a profile
        $profile = array();
        $profile['page'] = $this->page;
        $profile['byte'] = strlen($data);
        // Do more work
        $profile['count'] = array_sum(array_map(function ($v) {
            return $v->count;
        }, $json->tags));
        $this->storage->total = bcadd($this->storage->total, $profile['count']);
        // Print information
        printf($format, date($formatTime), $this->getThreadId(), "#END\t", json_encode($profile));
    }
}
class Storage extends Stackable {
    public $total = 0;
    public function run() {
    }
}
Conclusion
Did file_get_contents just get 100 pages in just 1.489 sec with my crappy connection? Yes, it did. I tested the same code on my live server and it took less than 0.939 sec to fetch 200 pages.
Your application can be faster in so many ways; you just have to use the right technology in the right place.