PHP Unconference Europe 2015

The Cond class

(PECL pthreads >= 2.0.0)

Introduction

The static methods contained in the Cond class provide direct access to Posix Condition Variables.

Class synopsis

Cond {
/* Methods */
final public static boolean broadcast ( long $condition )
final public static long create ( void )
final public static boolean destroy ( long $condition )
final public static boolean signal ( long $condition )
final public static boolean wait ( long $condition , long $mutex [, long $timeout ] )
}

Table of Contents

add a note add a note

User Contributed Notes 1 note

up
0
jan
1 year ago
As neither Cond nor Mutex is well suited to queuing tasks, and arrays/objects are stored "serialized" (so they can't be used as expected for signals from outside run()), here is a simple task queue trait:
<?php
// Microseconds per second: multiply by a number of seconds to get a usleep() argument.
const T = 1000000;
/**
 * FIFO task queue that keeps every task in flat, individually named
 * properties of the using object rather than in an array.
 *
 * Property layout for the task stored in slot N:
 *   - queue_func_N     name of the method to invoke
 *   - queue_params_N   number of parameters stored for the task
 *   - queue_param_N_I  the I-th parameter (0-based)
 *
 * NOTE(review): the counters are updated with plain ++/-- and no locking.
 * If addToQueue() and getNextQueued() run on different threads these updates
 * can presumably race -- confirm against the threading model in use.
 */
trait TaskQueue
{
    /** Number of tasks currently waiting in the queue. */
    private $queue_len = 0;

    /** Slot index the next added task is written to. */
    private $queue_pointer = 0;

    /** Slot index the next task is read from. */
    private $queue_curr = 0;

    /**
     * Resets all three queue counters to zero.
     *
     * Slot properties that were already written are left untouched.
     */
    protected function queueInit()
    {
        $this->queue_len = 0;
        $this->queue_pointer = 0;
        $this->queue_curr = 0;
    }

    /**
     * Removes the next queued task and returns it.
     *
     * @return array|null ['func' => method name, 'params' => list of arguments],
     *                    or null when the queue is empty or no task is stored
     *                    at the current read position.
     */
    protected function getNextQueued()
    {
        if ($this->queue_len == 0) {
            return null;
        }

        $slot = $this->queue_curr;
        $skipped = 0;

        while (true) {
            $funcKey = "queue_func_{$slot}";

            // Check existence first so an empty slot is never read (the read
            // would raise an undefined-property notice).
            if (!isset($this->$funcKey)) {
                return null;
            }

            if ($this->$funcKey) {
                $task = ['func' => $this->$funcKey];
                unset($this->$funcKey);

                $countKey = "queue_params_{$slot}";
                $paramCount = $this->$countKey;
                unset($this->$countKey);

                $task['params'] = [];
                for ($i = 0; $i < $paramCount; $i++) {
                    $paramKey = "queue_param_{$slot}_{$i}";
                    $task['params'][] = $this->$paramKey;
                    unset($this->$paramKey);
                }

                $this->queue_len--;
                // Continue after the slot that was actually consumed, which
                // may lie beyond queue_curr if unusable slots were skipped.
                $this->queue_curr = $slot + 1;

                return $task;
            }

            // Slot exists but holds no usable method name: try the next one.
            $slot++;

            if ($skipped++ > 1000) {   // the queue is corrupt
                echo "\n\t\t task queue is corrupt for ".get_called_class()."\n";
                // Reset every counter, including the read position, so that
                // tasks added afterwards are found again.
                $this->queueInit();

                return null;
            }
        }
    }

    /**
     * Appends a task to the queue.
     *
     * @param string $func   Name of a method that exists on the using object.
     * @param array  $params Arguments for that method; array keys are ignored,
     *                       values are stored in iteration order.
     *
     * @return bool True when the task was stored; false when $params is not an
     *              array, $func is not a string, or the method does not exist.
     */
    protected function addToQueue($func, $params = [])
    {
        // Explicit is_string() guard instead of error suppression: a
        // non-string name must yield false rather than a TypeError.
        if (!is_array($params) || !is_string($func) || !method_exists($this, $func)) {
            return false;
        }

        $slot = $this->queue_pointer;

        $funcKey = "queue_func_{$slot}";
        $this->$funcKey = $func;

        $countKey = "queue_params_{$slot}";
        $this->$countKey = count($params);

        $index = 0;
        foreach ($params as $value) {
            $paramKey = "queue_param_{$slot}_{$index}";
            $this->$paramKey = $value;
            $index++;
        }

        // The counters are advanced only after the slot is completely written.
        $this->queue_pointer++;
        $this->queue_len++;

        return true;
    }
}

/**
 * Example worker thread: run() repeats a "main" job and, before each
 * repetition, executes every task that was queued through doA()/doB().
 */
class A extends Thread
{
    use TaskQueue;

    /**
     * Condition-variable handle obtained from Cond::create(); released at the
     * end of run(). Declared public because it used to be created as an
     * undeclared (and therefore public) property.
     */
    public $cond;

    /** Becomes true once term() is called; run() then leaves its loop. */
    private $sig_term;

    /**
     * Creates the condition variable, resets the task queue and starts the thread.
     */
    public function __construct()
    {
        $this->cond = Cond::create();
        $this->sig_term = false;
        $this->queueInit(); // threads don't like default parameters
        $this->start();
    }

    /**
     * Thread body: drain the task queue, do one second of "main" work, and
     * repeat until term() has been called.
     */
    public function run()
    {
        while (!$this->sig_term) {
            // Execute everything queued so far before the next main pass.
            while (($qi = $this->getNextQueued())) {
                call_user_func_array([$this, $qi['func']], $qi['params']);
            }
            echo "now doing MAIN -- ";
            usleep(1*T);
            echo "finished MAIN\n";
        }

        // Release the condition variable created in the constructor. The
        // previous code called Mutex::destroy() on an undefined variable and
        // never released this handle.
        Cond::destroy($this->cond);
    }

    /**
     * Task A implementation; invoked from run() via the queue.
     *
     * @param mixed $a Value printed in the progress message.
     */
    private function reallyDoA($a)
    {
        echo "doing A: {$a} -- ";
        usleep(0.7*T);
        echo "finished A\n";
    }

    /**
     * Queues task A to be executed by run().
     *
     * @param mixed $param1 Argument passed on to reallyDoA().
     */
    public function doA($param1)
    {
        $this->addToQueue('reallyDoA', [$param1]);
    }

    /**
     * Task B implementation; invoked from run() via the queue.
     *
     * @param mixed $a First value printed in the progress message.
     * @param mixed $b Second value printed in the progress message.
     */
    private function reallyDoB($a, $b)
    {
        echo "doing B: {$a},{$b} -- ";
        usleep(0.1*T);
        echo "finished B\n";
    }

    /**
     * Queues task B to be executed by run().
     *
     * @param mixed $param1 First argument passed on to reallyDoB().
     * @param mixed $param2 Second argument passed on to reallyDoB().
     */
    public function doB($param1, $param2)
    {
        $this->addToQueue('reallyDoB', [$param1, $param2]);
    }

    /**
     * Asks run() to stop after its current loop iteration.
     */
    public function term()
    {
        $this->sig_term = true;
    }
}

// Demo driver: the constructor of A already starts the worker thread.
$worker = new A();

// Let the worker run its main job for a while, then queue three tasks.
usleep((int) (1.2 * T));
$worker->doA(2);
$worker->doA(3);
$worker->doB("now b", "something");

// Queue two more tasks later on.
usleep((int) (5.4 * T));
$worker->doA(5);
$worker->doA(7);

// Give the worker time to pick them up, then request termination and wait for it.
usleep(3 * T);
$worker->term();
$worker->join();
?>
Very useful if you need synchronization from an outside event (e.g. async socket read/write: your run() takes care of reading with a timeout and signaling on_packet events, but writing may occur at any time (e.g. async heartbeating), and calling fwrite($sock) while stream_select($sock) is waiting for its timeout causes segfaults...)
To Top