webdevilopers / php-ddd

PHP Symfony Doctrine Domain-driven Design
201 stars 10 forks source link

Batch / Bulk operations handling multiple event-sourced aggregate roots #54

Open webdevilopers opened 3 years ago

webdevilopers commented 3 years ago

We have a batch / bulk operation where multiple A+ES Spaces need to be updated. This is the current implementation:

<?php

namespace Acme\Host\Application\Service\Place;

final class LockSignInsHandler
{
    private PlaceDetailsFinder $placeDetailsFinder;

    private SpaceRepository $spaceRepository;

    public function __construct(PlaceDetailsFinder $placeDetailsFinder, SpaceRepository $spaceRepository)
    {
        $this->placeDetailsFinder = $placeDetailsFinder;
        $this->spaceRepository = $spaceRepository;
    }

    public function __invoke(LockSignIns $command): void
    {
        $placeDetails = $this->placeDetailsFinder->ofPlaceId($command->hostId(), $command->placeId());

        foreach ($command->spaceIds() as $spaceId) {
            $space = $this->spaceRepository->get($spaceId);
            $space->lockSignIn($placeDetails);

            $this->spaceRepository->save($space);
        }
    }
}

The scenario is not critical. If updating one Space fails the UI can easily retry.

I just wonder if there is a way to bind the updates into a single transaction.

This implementation uses Prooph Event Sourcing and Symfony Messenger as Event Bus. As soon as a Space is saved to the repository the events are published. This is the publisher used:

<?php

namespace Acme\Common\Infrastructure\Prooph;

use Iterator;
use Prooph\Common\Event\ActionEvent;
use Prooph\EventStore\ActionEventEmitterEventStore;
use Prooph\EventStore\EventStore;
use Prooph\EventStore\Plugin\AbstractPlugin;
use Prooph\EventStore\TransactionalActionEventEmitterEventStore;
use Symfony\Component\Messenger\MessageBusInterface;

final class EventPublisher extends AbstractPlugin
{
    private MessageBusInterface $eventBus;

    /**
     * @var Iterator[]
     */
    private array $cachedEventStreams = [];

    public function __construct(MessageBusInterface $eventBus)
    {
        $this->eventBus = $eventBus;
    }

    public function attachToEventStore(ActionEventEmitterEventStore $eventStore): void
    {
        $this->listenerHandlers[] = $eventStore->attach(
            ActionEventEmitterEventStore::EVENT_APPEND_TO,
            function (ActionEvent $event) use ($eventStore): void {
                $recordedEvents = $event->getParam('streamEvents', new \ArrayIterator());

                if (! $this->inTransaction($eventStore)) {
                    if ($event->getParam('streamNotFound', false)
                        || $event->getParam('concurrencyException', false)
                    ) {
                        return;
                    }

                    foreach ($recordedEvents as $recordedEvent) {
                        $this->eventBus->dispatch($recordedEvent);
                    }
                } else {
                    $this->cachedEventStreams[] = $recordedEvents;
                }
            }
        );

        $this->listenerHandlers[] = $eventStore->attach(
            ActionEventEmitterEventStore::EVENT_CREATE,
            function (ActionEvent $event) use ($eventStore): void {
                $stream = $event->getParam('stream');
                $recordedEvents = $stream->streamEvents();

                if (! $this->inTransaction($eventStore)) {
                    if ($event->getParam('streamExistsAlready', false)) {
                        return;
                    }

                    foreach ($recordedEvents as $recordedEvent) {
                        $this->eventBus->dispatch($recordedEvent);
                    }
                } else {
                    $this->cachedEventStreams[] = $recordedEvents;
                }
            }
        );

        if ($eventStore instanceof TransactionalActionEventEmitterEventStore) {
            $this->listenerHandlers[] = $eventStore->attach(
                TransactionalActionEventEmitterEventStore::EVENT_COMMIT,
                function (ActionEvent $event): void {
                    foreach ($this->cachedEventStreams as $stream) {
                        foreach ($stream as $recordedEvent) {
                            $this->eventBus->dispatch($recordedEvent);
                        }
                    }
                    $this->cachedEventStreams = [];
                }
            );

            $this->listenerHandlers[] = $eventStore->attach(
                TransactionalActionEventEmitterEventStore::EVENT_ROLLBACK,
                function (ActionEvent $event): void {
                    $this->cachedEventStreams = [];
                }
            );
        }
    }

    private function inTransaction(EventStore $eventStore): bool
    {
        return $eventStore instanceof TransactionalActionEventEmitterEventStore
            && $eventStore->inTransaction();
    }
}

@prolic @codeliner Any tipps?

prolic commented 3 years ago

You can dispatch commands from the command handler, thus handling each entry one by one. Another possibility is to use transactions on the event store.

On Sun, Dec 6, 2020, 07:39 webDEVILopers notifications@github.com wrote:

We have a batch / bulk operation where multiple A+ES Spaces need to be updated. This is the current implementation:

<?php namespace Acme\Host\Application\Service\Place; final class LockSignInsHandler { private PlaceDetailsFinder $placeDetailsFinder;

private SpaceRepository $spaceRepository;

public function __construct(PlaceDetailsFinder $placeDetailsFinder, SpaceRepository $spaceRepository)
{
    $this->placeDetailsFinder = $placeDetailsFinder;
    $this->spaceRepository = $spaceRepository;
}

public function __invoke(LockSignIns $command): void
{
    $placeDetails = $this->placeDetailsFinder->ofPlaceId($command->hostId(), $command->placeId());

    foreach ($command->spaceIds() as $spaceId) {
        $space = $this->spaceRepository->get($spaceId);
        $space->lockSignIn($placeDetails);

        $this->spaceRepository->save($space);
    }
}

}

The scenario is not critical. If updating one Space fails the UI can easily retry.

I just wonder if there is a way to bind the updates into a single transaction.

This implementation uses Prooph Event Sourcing and Symfony Messenger as Event Bus. As soon as a Space is saved to the repository the events are published. This is the publisher used:

<?php namespace Acme\Common\Infrastructure\Prooph; use Iterator;use Prooph\Common\Event\ActionEvent;use Prooph\EventStore\ActionEventEmitterEventStore;use Prooph\EventStore\EventStore;use Prooph\EventStore\Plugin\AbstractPlugin;use Prooph\EventStore\TransactionalActionEventEmitterEventStore;use Symfony\Component\Messenger\MessageBusInterface; final class EventPublisher extends AbstractPlugin { private MessageBusInterface $eventBus;

/**     * @var Iterator[]     */
private array $cachedEventStreams = [];

public function __construct(MessageBusInterface $eventBus)
{
    $this->eventBus = $eventBus;
}

public function attachToEventStore(ActionEventEmitterEventStore $eventStore): void
{
    $this->listenerHandlers[] = $eventStore->attach(
        ActionEventEmitterEventStore::EVENT_APPEND_TO,
        function (ActionEvent $event) use ($eventStore): void {
            $recordedEvents = $event->getParam('streamEvents', new \ArrayIterator());

            if (! $this->inTransaction($eventStore)) {
                if ($event->getParam('streamNotFound', false)
                    || $event->getParam('concurrencyException', false)
                ) {
                    return;
                }

                foreach ($recordedEvents as $recordedEvent) {
                    $this->eventBus->dispatch($recordedEvent);
                }
            } else {
                $this->cachedEventStreams[] = $recordedEvents;
            }
        }
    );

    $this->listenerHandlers[] = $eventStore->attach(
        ActionEventEmitterEventStore::EVENT_CREATE,
        function (ActionEvent $event) use ($eventStore): void {
            $stream = $event->getParam('stream');
            $recordedEvents = $stream->streamEvents();

            if (! $this->inTransaction($eventStore)) {
                if ($event->getParam('streamExistsAlready', false)) {
                    return;
                }

                foreach ($recordedEvents as $recordedEvent) {
                    $this->eventBus->dispatch($recordedEvent);
                }
            } else {
                $this->cachedEventStreams[] = $recordedEvents;
            }
        }
    );

    if ($eventStore instanceof TransactionalActionEventEmitterEventStore) {
        $this->listenerHandlers[] = $eventStore->attach(
            TransactionalActionEventEmitterEventStore::EVENT_COMMIT,
            function (ActionEvent $event): void {
                foreach ($this->cachedEventStreams as $stream) {
                    foreach ($stream as $recordedEvent) {
                        $this->eventBus->dispatch($recordedEvent);
                    }
                }
                $this->cachedEventStreams = [];
            }
        );

        $this->listenerHandlers[] = $eventStore->attach(
            TransactionalActionEventEmitterEventStore::EVENT_ROLLBACK,
            function (ActionEvent $event): void {
                $this->cachedEventStreams = [];
            }
        );
    }
}

private function inTransaction(EventStore $eventStore): bool
{
    return $eventStore instanceof TransactionalActionEventEmitterEventStore
        && $eventStore->inTransaction();
}

}

@prolic https://github.com/prolic @codeliner https://github.com/codeliner Any tipps?

— You are receiving this because you were mentioned. Reply to this email directly, view it on GitHub https://github.com/webdevilopers/php-ddd/issues/54, or unsubscribe https://github.com/notifications/unsubscribe-auth/AADAJPFRPQZUUQX54DTYRZDSTNNPVANCNFSM4UPHEQ5Q .

JulianMay commented 3 years ago

I would avoid atomic transactions across event-streams. It's not possible in implementations like EventStoreDB or SqlStreamStore, and limits distribution-options. First, I'd question the need for this: Do we really need "write all-or-nothing"? Is it Ok to change all the aggregates where it was possible, and collect the set of aggregates and reasons for failure/rejection for (manual or automated) followup? For example, you write "The scenario is not critical. If updating one Space fails the UI can easily retry." ... this, to me, sounds like an argument against atomic transaction across aggregtes - if 3 out of 1000 fails, do we really not want the 997 successful changes committed? As a user, I think I'd prefer a message saying something like: "The desired change was made successfully for 997 of the 1000 spaces. Below is a list of the spaces that could not be changed (and why)", maybe even with a "retry just these" button.

Secondly, if the answer to the above is "yes, we really need atomic write for this", I'd look at isolating that invariant in a separate stream (which enforces this invariant across all "space's"). For example, in a hotel, availability for a specific day could be an aggregate separate from the room-aggregates (with a streamId like Availability-{hotelId}-{date}

webdevilopers commented 3 years ago

Thanks for your feedback @prolic and @JulianMay .

I must confess that I was just curious how the solution would technically look like. Indeed this is a very easy use case and failure is acceptable. There are good solutions to solve this. As you mentioned: splitting up the commands and add communication to the user.