prooph / pdo-event-store

PDO implementation of ProophEventStore http://getprooph.org
BSD 3-Clause "New" or "Revised" License
111 stars 56 forks source link

fromStreams(a, b, c) does not process in order #201

Closed basz closed 5 years ago

basz commented 5 years ago

The observed behavior

$projection->fromStreams(streamA, streamB) processes all events first from streamA, then all events from streamB regardless of the creation timestamp of the events.

https://github.com/prooph/pdo-event-store/blob/master/src/Projection/PdoEventStoreProjector.php#L515

The expected behavior.

It should process events in order across streams.

Solution.

An iterator that wraps multiple StreamIterators and compares timestamps.

Downsides.

Performance: it slows down the processing of events a bit.

BC: it would change the current behavior when someone uses fromStreams with more than one stream, so it is not backward compatible.

A working implementation.

/**
 * Iterates over several iterators at once, always exposing the one(s) that
 * rank first according to a user-supplied compare callback.
 *
 * On each step the still-valid wrapped iterators are sorted with the callback;
 * the first one, plus every iterator that compares equal to it, is returned by
 * current() and advanced by next(). The overall output is therefore only
 * globally ordered if each wrapped iterator is itself ordered by the same
 * criterion.
 */
class MergedStream implements \Iterator, \Countable
{
    /**
     * Wrapped iterators keyed by their name (or positional index as fallback).
     *
     * Defaults to an empty array so that a merge of zero iterators can be
     * iterated and counted without looping over null.
     *
     * @var \Iterator[]
     */
    private $iterators = [];

    /**
     * @var callable
     */
    private $compareFunction;

    /**
     * Number of steps taken since the last rewind().
     *
     * Defaults to 0 so that key() honours its int return type even when it is
     * called before rewind().
     *
     * @var int
     */
    private $index = 0;

    /**
     * @param callable    $compareFunction usort()/uasort()-style callback; it receives two
     *                                     \Iterator instances (not their current values) and
     *                                     must return an int <0, 0 or >0
     * @param array       $streamNames     names for the iterators, matched by position; an
     *                                     iterator without a name is keyed by its position
     * @param \Iterator[] $iterators       the iterators to merge
     */
    public function __construct(callable $compareFunction, array $streamNames, \Iterator ...$iterators)
    {
        $this->compareFunction = $compareFunction;

        foreach ($iterators as $key => $iterator) {
            // The positional fallback is an int; cast it so appendIterator()'s
            // string parameter also accepts it under strict_types=1.
            $this->appendIterator($streamNames[$key] ?? (string) $key, $iterator);
        }
    }

    /**
     * Rewinds every wrapped iterator and resets the step counter.
     */
    public function rewind(): void
    {
        foreach ($this->iterators as $it) {
            $it->rewind();
        }
        $this->index = 0;
    }

    /**
     * Returns the current value of every iterator that ranks first.
     *
     * @return array one or more current values, keyed by iterator name and sorted by key;
     *               empty when no wrapped iterator is valid
     */
    public function current() : array
    {
        $current = [];
        foreach ($this->currentIterators() as $key => $value) {
            $current[$key] = $value->current();
        }

        return $current;
    }

    /**
     * @return int zero-based number of steps taken since the last rewind()
     */
    public function key(): int
    {
        return $this->index;
    }

    /**
     * @return bool true while at least one wrapped iterator is still valid
     */
    public function valid(): bool
    {
        return (bool) \count($this->validIterators());
    }

    /**
     * Advances every iterator that ranks first (the ones current() reported).
     */
    public function next(): void
    {
        foreach ($this->currentIterators() as $iterator) {
            $iterator->next();
        }

        ++$this->index;
    }

    /**
     * Sums the element counts of all wrapped iterators.
     *
     * NOTE: iterators that are not \Countable are counted with iterator_count(),
     * which walks the iterator and so does not keep its current position.
     * Avoid calling this in the middle of an iteration over such iterators.
     *
     * @return int total number of elements across all wrapped iterators
     */
    public function count(): int
    {
        $count = 0;
        foreach ($this->iterators as $iterator) {
            if ($iterator instanceof \Countable) {
                $count += \count($iterator);
            } else {
                $count += \iterator_count($iterator);
            }
        }

        return $count;
    }

    /**
     * Registers an iterator under the given name; an existing entry with the
     * same name is replaced.
     */
    private function appendIterator(string $iteratorName, \Iterator $it): void
    {
        $this->iterators[$iteratorName] = $it;
    }

    /**
     * Determines which valid iterators rank first according to the compare callback.
     *
     * @return \Iterator[] the first-ranked iterator plus all that compare equal to it,
     *                     keyed by name and sorted by key; empty when none is valid
     *
     * @throws RuntimeException when sorting the iterators fails
     */
    private function currentIterators(): array
    {
        $compareFunction = $this->compareFunction;

        $iterators = $this->validIterators();
        $r = \uasort($iterators, $compareFunction);
        if (false === $r) {
            throw new RuntimeException('Sorting failed.');
        }

        // After sorting, the first entry ranks lowest; collect the leading run
        // of iterators that compare equal to it.
        $compareAgainst = \reset($iterators);
        $sameIterators = [];
        foreach ($iterators as $key => $iterator) {
            $comparison = $compareFunction($iterator, $compareAgainst);
            if (0 !== $comparison) {
                break;
            }

            $sameIterators[$key] = $iterator;
        }

        // Make the order of ties independent of how the sort arranged them.
        \ksort($sameIterators);

        return $sameIterators;
    }

    /**
     * @return \Iterator[] the wrapped iterators that still have a current element, keyed by name
     */
    private function validIterators() : array
    {
        $validIterators = [];
        foreach ($this->iterators as $key => $iterator) {
            if ($iterator->valid()) {
                $validIterators[$key] = $iterator;
            }
        }

        return $validIterators;
    }
}

Usage.

// Load every requested stream in full, indexed by its stream name.
$streams = [];

foreach ($streamNames as $streamName) {
    $streamName = new StreamName($streamName);

    $streams[(string) $streamName] = $this->eventStore->load($streamName, 1, PHP_INT_MAX);
}

// Reads the creation time of the message an iterator currently points at.
$createdAtOf = function (\Iterator $stream): \DateTimeImmutable {
    /** @var Message $currentMessage */
    $currentMessage = $stream->current();

    return $currentMessage->createdAt();
};

// uasort()-style comparator: ranks two stream iterators by the creation time
// of their current messages.
$byCreatedAt = function (\Iterator $left, \Iterator $right) use ($createdAtOf) {
    $leftCreatedAt = $createdAtOf($left);
    $rightCreatedAt = $createdAtOf($right);

    return $leftCreatedAt <=> $rightCreatedAt;
};

$names = \array_keys($streams);
$iterators = \array_values($streams);
$mergedStreams = new MergedStream($byCreatedAt, $names, ...$iterators);

foreach ($mergedStreams as $messages) {
    // More than one entry means several streams hold a message with an equal
    // creation time; this usage treats that as an error.
    if (1 < \count($messages)) {
        throw new RuntimeException('Same birthday for a events ?.. Can\'t happen!');
    }

    foreach ($messages as $streamName => $event) {
        // $event is the next event in creation-time order across all streams;
        // $streamName is the name of the stream it was read from.
    }
}