pmmp / ext-pmmpthread

Fork of https://github.com/krakjoe/pthreads with a revamped API and PHP 8.1+ support

Implement a proper queue datastructure #42

Open dktapps opened 3 years ago

dktapps commented 3 years ago

Right now, Threaded is your only option if you need a thread-safe queue. It performs very poorly once the queue holds a large number of items.

Consider the following script:

<?php

declare(strict_types=1);

$buf = new \Threaded();
$t = new class($buf) extends \Thread{
    public \Threaded $queue;
    public bool $shutdown = true;

    public function __construct(\Threaded $t){
        $this->queue = $t;
    }

    public function run(){
        $this->synchronized(function() : void{
            $this->shutdown = false;
            $this->notify();
        });
        $time = 0;
        $read = 0;
        var_dump("child thread running");
        while(!$this->shutdown){
            $this->synchronized(function() : void{
                while($this->queue->count() === 0){
                    $this->wait();
                }
            });
            sleep(2);
            while($this->queue->count() > 0){
                $start = hrtime(true);
                $this->queue->shift();
                $time += (hrtime(true) - $start);
                $read++;
                if(($read % 10000) === 9999){
                    var_dump("time per element: " . ($time / $read) . "ns");
                }
            }
        }

        while($this->queue->count() > 0){
            $start = hrtime(true);
            $this->queue->shift();
            $time += (hrtime(true) - $start);
        }
    }
};
$t->start();
$t->synchronized(function() use ($t) : void{
    while($t->shutdown){
        $t->wait();
    }
});
var_dump("starting...");

for($i = 0; $i < 1024 * 512; $i++){
    $t->synchronized(function() use ($t, $buf) : void{
        $buf[] = "a";
        $t->notify();
    });
    if(($i % 10000) === 9999){
        var_dump($buf->count());
    }
}

$t->shutdown = true;
$t->join();

You can observe that the performance of shift() rapidly degrades, slowing the reader to a crawl. If the sleep() call is removed, it's snappy and happy.

This happens because the underlying PHP HashTable balloons to a large size when the consumer is not shifting elements off the queue. When elements are removed, the HashTable doesn't shrink, so a large number of empty gaps are left at the front of the queue. The loop that finds the first element of the HashTable then has to skip over all of those gaps on every shift(), which takes an extraordinary amount of CPU time and causes a huge performance loss for the reader.
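
To make the cost concrete, here is a minimal sketch of the effect in plain PHP. It is a toy model, not the real HashTable code: `TombstoneQueue` and `drainCost` are hypothetical names, removed entries simply leave a `null` slot behind, and nothing is ever compacted. It counts how many slots shift() has to look at while draining a backlog.

```php
<?php

declare(strict_types=1);

/**
 * Toy model of a hash table's ordered slot array (NOT the real PHP HashTable).
 * Removed entries leave a tombstone (null) and the slots are never compacted.
 */
final class TombstoneQueue{
    /** @var array<int, string|null> */
    private array $slots = [];
    public int $slotsScanned = 0;

    public function push(string $value) : void{
        $this->slots[] = $value;
    }

    public function shift() : ?string{
        //Scan from slot 0 every time, skipping tombstones left by earlier shifts
        $n = count($this->slots);
        for($i = 0; $i < $n; $i++){
            $this->slotsScanned++;
            if($this->slots[$i] !== null){
                $value = $this->slots[$i];
                $this->slots[$i] = null;
                return $value;
            }
        }
        return null;
    }
}

/** Pushes $items elements, drains them all, and returns the number of slots scanned. */
function drainCost(int $items) : int{
    $q = new TombstoneQueue();
    for($i = 0; $i < $items; $i++){
        $q->push("a");
    }
    for($i = 0; $i < $items; $i++){
        $q->shift();
    }
    return $q->slotsScanned;
}

echo "100 items: " . drainCost(100) . " slots scanned\n";   //5050
echo "1000 items: " . drainCost(1000) . " slots scanned\n"; //500500
```

In this model, draining a backlog of n elements scans n(n+1)/2 slots in total, so a backlog that is 10 times larger costs roughly 100 times more to drain.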

This may manifest in PocketMine-MP under lag spike conditions. Because RakLib uses two Threaded objects to pass packets to and from the main thread, a lag spike on either side will cause the Threaded hashtables to blow up in size and cause significant performance losses. Worse, because the HashTable is never downsized, the performance loss will persist even after the lag spike is gone.

In addition, this can cause significant performance losses for the writer, because the writer has to acquire a lock on the Threaded in order to add elements to it. The reader executes this extremely costly loop in a lock, which means that the writer will block for a significant amount of time if it happens to write at the same time as the reader is reading.
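
For comparison, a rough sketch of the kind of structure that avoids this problem: a FIFO queue that remembers where its head is, so shift() never scans. `HeadIndexQueue` is a hypothetical name, not part of the extension's API, and the sketch is single-threaded with no locking at all. It only illustrates the access pattern.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical single-threaded FIFO queue, for illustration only.
 * It is not part of the extension's API and does no locking.
 */
final class HeadIndexQueue{
    /** @var array<int, mixed> */
    private array $items = [];
    private int $head = 0; //key of the oldest element
    private int $tail = 0; //key the next pushed element will get

    public function push(mixed $value) : void{
        $this->items[$this->tail++] = $value;
    }

    public function shift() : mixed{
        if($this->head === $this->tail){
            return null; //queue is empty
        }
        //Look the oldest element up by key instead of scanning for it
        $value = $this->items[$this->head];
        unset($this->items[$this->head]);
        $this->head++;
        if($this->head === $this->tail){
            //Fully drained: drop the backing array so storage grown during a backlog is released
            $this->items = [];
            $this->head = $this->tail = 0;
        }
        return $value;
    }

    public function count() : int{
        return $this->tail - $this->head;
    }
}

$q = new HeadIndexQueue();
for($i = 0; $i < 5; $i++){
    $q->push($i);
}
$first = $q->shift();
$second = $q->shift();
echo "$first $second, remaining: " . $q->count() . "\n"; //prints "0 1, remaining: 3"
```

Because shift() does a fixed amount of work regardless of how large the backlog once was, a thread-safe version of this idea would also hold its lock only briefly, so the writer would not be blocked behind a long scan.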

dktapps commented 9 months ago

Updated test case for ext-pmmpthread v6:

<?php

declare(strict_types=1);

use pmmp\thread\ThreadSafeArray;
use pmmp\thread\Thread;

$buf = new ThreadSafeArray();
$t = new class($buf) extends Thread{
    public ThreadSafeArray $queue;
    public bool $shutdown = true;

    public function __construct(ThreadSafeArray $t){
        $this->queue = $t;
    }

    public function run() : void{
        $this->synchronized(function() : void{
            $this->shutdown = false;
            $this->notify();
        });
        $time = 0;
        $read = 0;
        $prev = -1; //initialized once so the order check carries over between wakeups
        var_dump("child thread running");
        while(!$this->shutdown){
            $this->synchronized(function() : void{
                while($this->queue->count() === 0){
                    $this->wait();
                }
            });
            sleep(2);
            while($this->queue->count() > 0){
                $start = hrtime(true);
                $i = $this->queue->shift();
                if($i !== ($prev + 1)){
                    var_dump("out of order: received " . $i . " but expected " . ($prev + 1));
                }
                $prev = $i;
                $time += (hrtime(true) - $start);
                $read++;
                if(($read % 10000) === 9999){
                    var_dump("time per element: " . ($time / $read) . "ns");
                }
            }
        }

        while($this->queue->count() > 0){
            $start = hrtime(true);
            $this->queue->shift();
            $time += (hrtime(true) - $start);
        }
    }
};
$t->start(Thread::INHERIT_ALL);
$t->synchronized(function() use ($t) : void{
    while($t->shutdown){
        $t->wait();
    }
});
var_dump("starting...");

for($i = 0; $i < 1024 * 512; $i++){
    $t->synchronized(function() use ($t, $buf, $i) : void{
        $buf[] = $i;
        $t->notify();
    });
    if(($i % 10000) === 9999){
        var_dump($buf->count());
    }
}

$t->shutdown = true;
$t->join();