prooph / event-store-symfony-bundle

Event Store Symfony Bundle
http://getprooph.org/
BSD 3-Clause "New" or "Revised" License

Plugins and Metadata Enrichers are not recognized #73

Closed webdevilopers closed 1 year ago

webdevilopers commented 3 years ago

Came from:

Though this could be a general (PDO) Event Store issue, I am posting it here first to make sure it is not a Symfony wiring problem.

We successfully register projections and an Upcaster as plugin:

prooph_event_store:
  stores:
    default:
      event_store: Prooph\EventStore\EventStore
      repositories:
        visit_collection:
          repository_class: Trexxon\Visit\Infrastructure\Persistence\PgsqlVisitEventStoreRepository
          aggregate_type: Trexxon\Visit\Domain\Model\Visit\Visit
          aggregate_translator: Prooph\EventSourcing\EventStoreIntegration\AggregateTranslator
          stream_name: 'visit_stream'

  projection_managers:
    default_projection_manager:
      event_store: Prooph\EventStore\EventStore
      connection: '@app.event_store.pdo_connection.pgsql'
      projections:
        visit_details:
          read_model: Trexxon\Visit\Infrastructure\Projection\MongoVisitDetailsReadModel
          projection: Trexxon\Visit\Infrastructure\Projection\MongoVisitDetailsProjection

services:
  Prooph\EventSourcing\EventStoreIntegration\AggregateTranslator: null

  Prooph\EventStore\EventStore: '@app.event_store.default'

  app.event_store.default:
    class: Prooph\EventStore\Pdo\PostgresEventStore
    arguments:
      - '@prooph_event_store.message_factory'
      - '@app.event_store.pdo_connection.pgsql'
      - '@app.event_store.pgsql.persistence_strategy'

  app.event_store.pdo_connection.pgsql:
    class: \PDO
    arguments:
      - '%env(PGSQL_DSN)%'
      - '%env(PGSQL_USER)%'
      - '%env(PGSQL_PASSWORD)%'

  Trexxon\Visit\Infrastructure\Projection\MongoVisitDetailsProjection: ~
  Trexxon\Visit\Infrastructure\Projection\MongoVisitDetailsReadModel:
    arguments: ['@doctrine_mongodb.odm.default_connection', '%env(resolve:MONGODB_DB)%']

  Prooph\EventStore\Plugin\UpcastingPlugin:
    arguments: ['@Trexxon\Common\Infrastructure\Prooph\EventStore\PersonalDataUpcaster']
    tags:
      - { name: 'prooph_event_store.default.plugin' }

  # Upcaster
  Trexxon\Common\Infrastructure\Prooph\EventStore\PersonalDataUpcaster:
    arguments: [ '@Trexxon\Common\Infrastructure\Prooph\EventStore\PgsqlPersonalDataStorage' ]

Upcasting works as expected when saving or getting aggregates from the AggregateRepository. The used Event Store is an instance of Prooph\EventStore\TransactionalActionEventEmitterEventStore that holds a property named actionEventEmitter:

 Prooph\EventStore\TransactionalActionEventEmitterEventStore {#775 ▼
  #eventStore: Prooph\EventStore\Pdo\PostgresEventStore {#767 ▼
    -messageFactory: Prooph\Common\Messaging\FQCNMessageFactory {#768}
    -connection: PDO {#769 ▼
      inTransaction: false
      attributes: {▼
        CASE: NATURAL
        ERRMODE: SILENT
        PERSISTENT: false
        DRIVER_NAME: "pgsql"
        SERVER_INFO: "PID: 26359; Client Encoding: UTF8; Is Superuser: off; Session Authorization: trexxon; Date Style: ISO, DMY"
        ORACLE_NULLS: NATURAL
        CLIENT_VERSION: "12.4 (Ubuntu 12.4-1)"
        SERVER_VERSION: "12.6 (Ubuntu 12.6-0ubuntu0.20.10.1)"
        STATEMENT_CLASS: array:1 [▶]
        EMULATE_PREPARES: false
        CONNECTION_STATUS: "Connection OK; waiting to send."
        DEFAULT_FETCH_MODE: BOTH
      }
    }
    -persistenceStrategy: Prooph\EventStore\Pdo\PersistenceStrategy\PostgresSingleStreamStrategy {#770 ▶}
    -loadBatchSize: 10000
    -eventStreamsTable: "event_streams"
    -disableTransactionHandling: false
    -writeLockStrategy: Prooph\EventStore\Pdo\WriteLockStrategy\NoLockStrategy {#772}
  }
  #actionEventEmitter: Prooph\Common\Event\ProophActionEventEmitter {#776 ▼
    #events: array:15 [▼
      "create" => array:3 [▼
        "1.0" => array:1 [▶]
        "0.0" => array:1 [▶]
        "1000.0" => array:1 [▼
          0 => Prooph\Common\Event\DefaultListenerHandler {#830 ▼
            -listener: array:2 [▼
              0 => Prooph\EventStore\Metadata\MetadataEnricherPlugin {#826 ▶}
              1 => "onEventStoreCreateStream"
            ]
          }
        ]
      ]
      "appendTo" => array:3 [▶]
      "load" => array:2 [▼
        "1.0" => array:1 [▼
          0 => Prooph\Common\Event\DefaultListenerHandler {#782 ▶}
        ]
        "-1000.0" => array:1 [▼
          0 => Prooph\Common\Event\DefaultListenerHandler {#812 ▼
            -listener: Closure(ActionEvent $actionEvent): void {#811 ▼
              returnType: "void"
              class: "Prooph\EventStore\Plugin\UpcastingPlugin"
              this: Prooph\EventStore\Plugin\UpcastingPlugin {#807 …}
            }
          }
        ]
      ]
      "loadReverse" => array:2 [▶]
      "delete" => array:1 [▶]
      "hasStream" => array:1 [▶]
      "fetchStreamMetadata" => array:1 [▶]
      "updateStreamMetadata" => array:1 [▶]
      "fetchStreamNames" => array:1 [▶]
      "fetchStreamNamesRegex" => array:1 [▶]
      "fetchCategoryNames" => array:1 [▶]
      "fetchCategoryNamesRegex" => array:1 [▶]
      "beginTransaction" => array:1 [▶]
      "commit" => array:2 [▶]
      "rollback" => array:2 [▶]
    ]
    #availableEventNames: array:15 [▶]
  }
}

But when running projections, the plugin is not used. There the Event Store is a plain instance of Prooph\EventStore\Pdo\PostgresEventStore, without the actionEventEmitter:

Prooph\EventStore\Pdo\Projection\PdoEventStoreReadModelProjector {#201
  -eventStore: Prooph\EventStore\Pdo\PostgresEventStore {#340
    -messageFactory: Prooph\Common\Messaging\FQCNMessageFactory {#315}
    -connection: PDO {#316
      inTransaction: false
      attributes: {
        CASE: NATURAL
        ERRMODE: SILENT
        PERSISTENT: false
        DRIVER_NAME: "pgsql"
        SERVER_INFO: "PID: 26659; Client Encoding: UTF8; Is Superuser: off; Session Authorization: trexxon; Date Style: ISO, DMY"
        ORACLE_NULLS: NATURAL
        CLIENT_VERSION: "12.4 (Ubuntu 12.4-1)"
        SERVER_VERSION: "12.6 (Ubuntu 12.6-0ubuntu0.20.10.1)"
        STATEMENT_CLASS: array:1 [
          0 => "PDOStatement"
        ]
        EMULATE_PREPARES: false
        CONNECTION_STATUS: "Connection OK; waiting to send."
        DEFAULT_FETCH_MODE: BOTH
      }
    }
    -persistenceStrategy: Prooph\EventStore\Pdo\PersistenceStrategy\PostgresSingleStreamStrategy {#317
      -messageConverter: Prooph\EventStore\Pdo\DefaultMessageConverter {#313}
    }
    -loadBatchSize: 10000
    -eventStreamsTable: "event_streams"
    -disableTransactionHandling: false
    -writeLockStrategy: Prooph\EventStore\Pdo\WriteLockStrategy\NoLockStrategy {#312}
  }
  -connection: PDO {#316}
  -name: "visit_details"
  -readModel: Trexxon\Visit\Infrastructure\Projection\MongoVisitDetailsReadModel {#219
    -client: MongoDB\Client {#377
      -manager: MongoDB\Driver\Manager {#376}
      -readConcern: MongoDB\Driver\ReadConcern {#368}
      -readPreference: MongoDB\Driver\ReadPreference {#371
        +"mode": "primary"
      }
      -uri: "mongodb://localhost:27017"
      -typeMap: array:2 [
        "root" => "array"
        "document" => "array"
      ]
      -writeConcern: MongoDB\Driver\WriteConcern {#370}
      manager: MongoDB\Driver\Manager {#376}
      uri: "mongodb://localhost:27017"
      typeMap: array:2 [
        "root" => "array"
        "document" => "array"
      ]
      writeConcern: MongoDB\Driver\WriteConcern {#370}
    }
    -database: MongoDB\Database {#204
      -databaseName: "visitify_api"
      -manager: MongoDB\Driver\Manager {#376}
      -readConcern: MongoDB\Driver\ReadConcern {#194}
      -readPreference: MongoDB\Driver\ReadPreference {#195
        +"mode": "primary"
      }
      -typeMap: array:2 [
        "root" => "array"
        "document" => "array"
      ]
      -writeConcern: MongoDB\Driver\WriteConcern {#196}
      databaseName: "visitify_api"
      manager: MongoDB\Driver\Manager {#376}
      readConcern: MongoDB\Driver\ReadConcern {#194}
      readPreference: MongoDB\Driver\ReadPreference {#195}
      typeMap: array:2 [
        "root" => "array"
        "document" => "array"
      ]
      writeConcern: MongoDB\Driver\WriteConcern {#196}
    }
    -collection: MongoDB\Collection {#197
      -collectionName: "visit_details"
      -databaseName: "visitify_api"
      -manager: MongoDB\Driver\Manager {#376}
      -readConcern: MongoDB\Driver\ReadConcern {#198}
      -readPreference: MongoDB\Driver\ReadPreference {#199
        +"mode": "primary"
      }
      -typeMap: array:2 [
        "root" => "array"
        "document" => "array"
      ]
      -writeConcern: MongoDB\Driver\WriteConcern {#200}
      collectionName: "visit_details"
      databaseName: "visitify_api"
      manager: MongoDB\Driver\Manager {#376}
      readConcern: MongoDB\Driver\ReadConcern {#198}
      readPreference: MongoDB\Driver\ReadPreference {#199}
      typeMap: array:2 [
        "root" => "array"
        "document" => "array"
      ]
      writeConcern: MongoDB\Driver\WriteConcern {#200}
    }
    -stack: []
  }
  -eventStreamsTable: "event_streams"
  -projectionsTable: "projections"
  -streamPositions: []
  -persistBlockSize: 1000
  -state: []
  -status: Prooph\EventStore\Projection\ProjectionStatus {#172
    -value: "idle"
    -ordinal: null
  }
  -initCallback: null
  -handler: null
  -handlers: []
  -isStopped: false
  -currentStreamName: null
  -lockTimeoutMs: 1000
  -eventCounter: 0
  -sleep: 100000
  -triggerPcntlSignalDispatch: false
  -updateLockThreshold: 0
  -gapDetection: null
  -query: null
  -vendor: "pgsql"
  -lastLockUpdate: null
  -metadataMatcher: null
}

There is a handlers property but it is empty.

Should the used Event Store instances not be the same?
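To illustrate why the two instances matter, here is a minimal sketch with made-up classes. SketchStore, SketchRawStore and SketchEmittingStore are hypothetical stand-ins and not part of prooph; the sketch only assumes that plugins attach listeners to the wrapping store and never to the inner one.

```php
<?php

declare(strict_types=1);

// Hypothetical stand-ins for illustration; these are NOT the prooph classes.
interface SketchStore
{
    /** @return string[] */
    public function load(string $stream): array;
}

// Plays the role of the plain PostgresEventStore: no listeners, no plugins.
final class SketchRawStore implements SketchStore
{
    public function load(string $stream): array
    {
        return ['v1:VisitCreated'];
    }
}

// Plays the role of the action-event-emitting wrapper that plugins attach to.
final class SketchEmittingStore implements SketchStore
{
    private SketchStore $inner;

    /** @var callable[] */
    private array $loadListeners = [];

    public function __construct(SketchStore $inner)
    {
        $this->inner = $inner;
    }

    public function attach(callable $listener): void
    {
        $this->loadListeners[] = $listener;
    }

    public function load(string $stream): array
    {
        $events = $this->inner->load($stream);

        // Listeners only see events that are loaded through the wrapper.
        foreach ($this->loadListeners as $listener) {
            $events = $listener($events);
        }

        return $events;
    }
}

$raw = new SketchRawStore();
$wrapped = new SketchEmittingStore($raw);

// An "upcaster" listener, attached to the wrapper only.
$wrapped->attach(static function (array $events): array {
    return array_map(
        static function (string $event): string {
            return str_replace('v1:', 'v2:', $event);
        },
        $events
    );
});

$viaWrapper = $wrapped->load('visit_stream'); // listener applied
$viaRaw = $raw->load('visit_stream');         // listener bypassed
```

If the Projector is handed the equivalent of $raw rather than $wrapped, it would read events that never pass through the upcaster, which matches the two dumps above.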

Refs:

webdevilopers commented 3 years ago

Thank you to @prolic for confirming that the event-store should work as expected:

<?php

/**
 * This file is part of prooph/event-store.
 * (c) 2014-2021 prooph software GmbH <contact@prooph.de>
 * (c) 2015-2021 Sascha-Oliver Prolic <saschaprolic@googlemail.com>
 *
 * For the full copyright and license information, please view the LICENSE
 * file that was distributed with this source code.
 */

declare(strict_types=1);

namespace Prooph\EventStore\Plugin;

use Prooph\Common\Event\ActionEvent;
use Prooph\EventStore\ActionEventEmitterEventStore;
use Prooph\EventStore\StreamIterator\StreamIterator;
use Prooph\EventStore\Upcasting\Upcaster;
use Prooph\EventStore\Upcasting\UpcastingIterator;

final class UpcastingPlugin extends AbstractPlugin
{
    public const ACTION_EVENT_PRIORITY = -1000;

    /**
     * @var Upcaster
     */
    private $upcaster;

    public function __construct(Upcaster $upcaster)
    {
        $this->upcaster = $upcaster;
    }

    public function attachToEventStore(ActionEventEmitterEventStore $eventStore): void
    {
        $upcaster = function (ActionEvent $actionEvent): void {
            $streamEvents = $actionEvent->getParam('streamEvents');

            if (! $streamEvents instanceof StreamIterator) {
                return;
            }

            $actionEvent->setParam('streamEvents', new UpcastingIterator($this->upcaster, $streamEvents));
        };

        $eventStore->attach(
            ActionEventEmitterEventStore::EVENT_LOAD,
            $upcaster,
            self::ACTION_EVENT_PRIORITY
        );

        $eventStore->attach(
            ActionEventEmitterEventStore::EVENT_LOAD_REVERSE,
            $upcaster,
            self::ACTION_EVENT_PRIORITY
        );
    }
}

I will start debugging the symfony bridge.

webdevilopers commented 3 years ago

So far these are the events attached to "LOAD" when using the AggregateRepository:

  #actionEventEmitter: Prooph\Common\Event\ProophActionEventEmitter {#635 ▼
    #events: array:15 [▼
      "create" => array:3 [▶]
      "appendTo" => array:3 [▶]
      "load" => array:2 [▼
        "1.0" => array:1 [▶]
        "-1000.0" => array:1 [▼
          0 => Prooph\Common\Event\DefaultListenerHandler {#682 ▼
            -listener: Closure(ActionEvent $actionEvent): void {#679 ▼
              returnType: "void"
              class: "Prooph\EventStore\Plugin\UpcastingPlugin"
              this: Prooph\EventStore\Plugin\UpcastingPlugin {#671 …}
            }
          }
        ]
      ]

This is the version used by the Projectors - only the default listener is attached:

  #actionEventEmitter: Prooph\Common\Event\ProophActionEventEmitter {#417
    #events: array:15 [
      "load" => array:1 [
        "1.0" => array:1 [
          0 => Prooph\Common\Event\DefaultListenerHandler {#421
            -listener: Closure(ActionEvent $event): void {#65
              returnType: "void"
              class: "Prooph\EventStore\ActionEventEmitterEventStore"
              this: Prooph\EventStore\TransactionalActionEventEmitterEventStore {#321}
            }
          }
        ]
      ]

webdevilopers commented 3 years ago

@prolic I noticed a difference between the master and 0.8.0 branch:

Could this be related? We are indeed using 0.8.0. The master features some "projections options"?

webdevilopers commented 3 years ago

After updating to dev-master I get e.g.

Unrecognized option "repositories" under "prooph_event_store.stores.default". Available options are "event_emitter", "event_store", "wrap_action_event_emitter".

webdevilopers commented 3 years ago

We couldn't find an "Upcaster" example in the docs:

So we tried this configuration:

  Prooph\EventStore\Plugin\UpcastingPlugin:
    arguments: ['@Trexxon\Common\Infrastructure\Prooph\EventStore\PersonalDataUpcaster']
    tags:
      - { name: 'prooph_event_store.default.plugin' }

  # Upcaster
  Trexxon\Common\Infrastructure\Prooph\EventStore\PersonalDataUpcaster:
    arguments: [ '@Trexxon\Common\Infrastructure\Prooph\EventStore\PgsqlPersonalDataStorage' ]

Since this works with aggregate repositories, it seems to be the correct approach.

webdevilopers commented 3 years ago

The next thing I tried is disabling the wrap_action_event_emitter option on the Event Store. The result of the Event Store then changes to:

^ Prooph\EventStore\Pdo\PostgresEventStore {#611 ▼
  -messageFactory: Prooph\Common\Messaging\FQCNMessageFactory {#612}
  -connection: PDO {#613 ▶}
  -persistenceStrategy: Prooph\EventStore\Pdo\PersistenceStrategy\PostgresSingleStreamStrategy {#614 ▶}
  -loadBatchSize: 10000
  -eventStreamsTable: "event_streams"
  -disableTransactionHandling: false
  -writeLockStrategy: Prooph\EventStore\Pdo\WriteLockStrategy\NoLockStrategy {#616}
}

That is exactly the same Event Store configuration that can be found inside the Projectors.

That means that the Projectors need to wrap the action event emitter. But there is no such configuration option in 0.8.0 for the symfony config. Is this an issue with this bundle?

webdevilopers commented 3 years ago

This issue seems to address the problem:

Also:

An EventStoreDecorator with a getInnerEventStore() method would be useful to support projection factories that they can allow decorated versions of their supported event stores.

If I understand correctly that it was fixed here:

But I guess this bundle is still missing something.

webdevilopers commented 3 years ago

Hm, this is exactly our solution:

I checked again about mixing PostgresEventStore with MongoDbProjection. With the shipped classes, this will NOT be possible.

@prolic Could you please explain what exactly this means for our current setup? The Projections work fine. But the action events can not be attached at all? Or is this related to really mixing read models with different database types?

webdevilopers commented 3 years ago

Regarding the changes by @lunetics :

I checked the Event Store factory used for AggregateRepository and Projector.

Both have $wrapActionEventEmitter set to true. Both use the Prooph\Bundle\EventStore\Factory\DefaultActionEventEmitterFactory and the FQCN Prooph\Common\Event\ProophActionEventEmitter.

And both show the expected plugins.

Screenshot from 2021-03-30 12-45-49 Screenshot from 2021-03-30 12-45-43

webdevilopers commented 3 years ago

I can also confirm that the DefaultEventStoreFactory correctly attaches all plugins and returns it.

Screenshot from 2021-03-30 12-51-02

webdevilopers commented 3 years ago

The bottom line is that the factories seem to wrap the action event emitter, but in the end the wrapped store does not make it to the Projector. I must confess that I have no idea where else to look. I hope @lunetics has some feedback since he was involved in #10.

Fingers crossed.

lunetics commented 3 years ago

TBH, haven't worked deep with that component in the last year(s), will have a quick look over it (normally I even forget about code I wrote a week ago)

lunetics commented 3 years ago

Could it be because of the priority? I see a -1000 there.

webdevilopers commented 3 years ago

The -1000 is the default priority set by the UpcastingPlugin:

  Prooph\EventStore\Plugin\UpcastingPlugin:
    arguments: ['@Trexxon\Common\Infrastructure\Prooph\EventStore\PersonalDataUpcaster']
    tags:
      - { name: 'prooph_event_store.default.plugin' }

<?php

/**
 * This file is part of prooph/event-store.
 * (c) 2014-2021 prooph software GmbH <contact@prooph.de>
 * (c) 2015-2021 Sascha-Oliver Prolic <saschaprolic@googlemail.com>
 *
 * For the full copyright and license information, please view the LICENSE
 * file that was distributed with this source code.
 */

declare(strict_types=1);

namespace Prooph\EventStore\Plugin;

use Prooph\Common\Event\ActionEvent;
use Prooph\EventStore\ActionEventEmitterEventStore;
use Prooph\EventStore\StreamIterator\StreamIterator;
use Prooph\EventStore\Upcasting\Upcaster;
use Prooph\EventStore\Upcasting\UpcastingIterator;

final class UpcastingPlugin extends AbstractPlugin
{
    public const ACTION_EVENT_PRIORITY = -1000;

    /**
     * @var Upcaster
     */
    private $upcaster;

    public function __construct(Upcaster $upcaster)
    {
        $this->upcaster = $upcaster;
    }

    public function attachToEventStore(ActionEventEmitterEventStore $eventStore): void
    {
        $upcaster = function (ActionEvent $actionEvent): void {
            $streamEvents = $actionEvent->getParam('streamEvents');

            if (! $streamEvents instanceof StreamIterator) {
                return;
            }

            $actionEvent->setParam('streamEvents', new UpcastingIterator($this->upcaster, $streamEvents));
        };

        $eventStore->attach(
            ActionEventEmitterEventStore::EVENT_LOAD,
            $upcaster,
            self::ACTION_EVENT_PRIORITY
        );

        $eventStore->attach(
            ActionEventEmitterEventStore::EVENT_LOAD_REVERSE,
            $upcaster,
            self::ACTION_EVENT_PRIORITY
        );
    }
}

And it works fine with the AggregateRepository. The problem is that the "wrapped" Event Store, including its attached listeners, never reaches the Projector. There, only the pure (inner?) Event Store is received.
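As a side note on the priority question, here is a small sketch of a priority-ordered emitter. SketchEmitter is a hypothetical class, not ProophActionEventEmitter, and it assumes that listeners with a higher priority run first. Under that assumption a priority of -1000 only means "run late", not "do not run".

```php
<?php

declare(strict_types=1);

// Hypothetical mini emitter for illustration; not ProophActionEventEmitter itself.
final class SketchEmitter
{
    /** @var array<int, callable[]> listeners grouped by priority */
    private array $listeners = [];

    public function attach(callable $listener, int $priority = 1): void
    {
        $this->listeners[$priority][] = $listener;
    }

    /** @return string[] names of the listeners in the order they ran */
    public function dispatch(): array
    {
        $byPriority = $this->listeners;
        krsort($byPriority, SORT_NUMERIC); // assumption: highest priority first

        $ran = [];
        foreach ($byPriority as $group) {
            foreach ($group as $listener) {
                $ran[] = $listener();
            }
        }

        return $ran;
    }
}

$emitter = new SketchEmitter();
$emitter->attach(static function (): string { return 'upcaster'; }, -1000);
$emitter->attach(static function (): string { return 'default-load'; }, 1);

$order = $emitter->dispatch();
```

Here the listener at priority 1 runs before the one at -1000. That would fit the UpcastingPlugin reading the streamEvents parameter only after the default load listener has set it.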

webdevilopers commented 3 years ago

I could imagine that the following is the main problem - factories use the "inner" event store instead of the wrapped one. Then the event emitter gets lost.

namespace Prooph\Bundle\EventStore;

class ProjectionManagerFactory
{
    public function createProjectionManager(
        EventStore $eventStore,
        ?PDO $connection = null,
        string $eventStreamsTable = 'event_streams',
        string $projectionsTable = 'projections'
    ): ProjectionManager {
        $checkConnection = function () use ($connection): PDO {
            if (! $connection instanceof PDO) {
                throw new RuntimeException('PDO connection missing');
            }

            return $connection;
        };

        $realEventStore = $this->getTheRealEventStore($eventStore);

        if ($realEventStore instanceof InMemoryEventStore) {
            return new InMemoryProjectionManager($eventStore);
        }

        if ($realEventStore instanceof PostgresEventStore) {
            return new PostgresProjectionManager($eventStore, $checkConnection(), $eventStreamsTable, $projectionsTable);
        }

        if ($realEventStore instanceof MySqlEventStore) {
            return new MySqlProjectionManager($eventStore, $checkConnection(), $eventStreamsTable, $projectionsTable);
        }

        if ($realEventStore instanceof MariaDbEventStore) {
            return new MariaDbProjectionManager($eventStore, $checkConnection(), $eventStreamsTable, $projectionsTable);
        }

        throw new RuntimeException(\sprintf('ProjectionManager for %s not implemented.', \get_class($realEventStore)));
    }

    /**
     * Gets the "real" event store in case we were provided with an EventStoreDecorator.
     * That's the one that will really perform the actions.
     *
     * @param EventStore $eventStore
     *
     * @return EventStore
     */
    private function getTheRealEventStore(EventStore $eventStore): EventStore
    {
        $realEventStore = $eventStore;

        while ($realEventStore instanceof EventStoreDecorator) {
            $realEventStore = $realEventStore->getInnerEventStore();
        }

        return $realEventStore;
    }
}

namespace Prooph\EventStore\Pdo\Projection;

final class PdoEventStoreProjector implements Projector
{
    public function __construct(
        EventStore $eventStore,
        PDO $connection,
        string $name,
        string $eventStreamsTable,
        string $projectionsTable,
        int $lockTimeoutMs,
        int $cacheSize,
        int $persistBlockSize,
        int $sleep,
        bool $triggerPcntlSignalDispatch = false,
        int $updateLockThreshold = 0,
        GapDetection $gapDetection = null
    ) {
        if ($triggerPcntlSignalDispatch && ! \extension_loaded('pcntl')) {
            throw Exception\ExtensionNotLoadedException::withName('pcntl');
        }

        $this->eventStore = $eventStore;
        $this->connection = $connection;
        $this->name = $name;
        $this->eventStreamsTable = $eventStreamsTable;
        $this->projectionsTable = $projectionsTable;
        $this->lockTimeoutMs = $lockTimeoutMs;
        $this->cachedStreamNames = new ArrayCache($cacheSize);
        $this->persistBlockSize = $persistBlockSize;
        $this->sleep = $sleep;
        $this->status = ProjectionStatus::IDLE();
        $this->triggerPcntlSignalDispatch = $triggerPcntlSignalDispatch;
        $this->updateLockThreshold = $updateLockThreshold;
        $this->gapDetection = $gapDetection;
        $this->vendor = $this->connection->getAttribute(PDO::ATTR_DRIVER_NAME);

        while ($eventStore instanceof EventStoreDecorator) {
            $eventStore = $eventStore->getInnerEventStore();
        }

        if (! $eventStore instanceof PdoEventStore) {
            throw new Exception\InvalidArgumentException('Unknown event store instance given');
        }
    }

    // remaining methods omitted
}

namespace Prooph\EventStore\Pdo\Projection;

final class PostgresProjectionManager implements ProjectionManager
{
    public function __construct(
        EventStore $eventStore,
        PDO $connection,
        string $eventStreamsTable = 'event_streams',
        string $projectionsTable = 'projections'
    ) {
        $this->eventStore = $eventStore;
        $this->connection = $connection;
        $this->eventStreamsTable = $eventStreamsTable;
        $this->projectionsTable = $projectionsTable;

        while ($eventStore instanceof EventStoreDecorator) {
            $eventStore = $eventStore->getInnerEventStore();
        }

        if (! $eventStore instanceof PostgresEventStore) {
            throw new Exception\InvalidArgumentException('Unknown event store instance given');
        }
    }

    // remaining methods omitted
}

webdevilopers commented 3 years ago

I just looked at ProjectionManagerFactory and PdoEventStoreProjector - the Event Store instances that arrive at their constructor are already "not wrapped" - no event emitter.
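A stripped-down sketch of what the quoted factory appears to do, using hypothetical stand-in classes (every Sketch* name is made up and none of them are prooph classes): the decorator is unwrapped only for the type check, and the instance that was passed in is handed on unchanged. If that reading is right, the factory itself does not drop the wrapper. It can only forward whatever instance was injected.

```php
<?php

declare(strict_types=1);

// Hypothetical stand-ins; the names only mirror the roles in the quoted code.
interface SketchEventStore {}

interface SketchDecorator extends SketchEventStore
{
    public function getInnerEventStore(): SketchEventStore;
}

final class SketchPostgresStore implements SketchEventStore {}

final class SketchWrapper implements SketchDecorator
{
    private SketchEventStore $inner;

    public function __construct(SketchEventStore $inner)
    {
        $this->inner = $inner;
    }

    public function getInnerEventStore(): SketchEventStore
    {
        return $this->inner;
    }
}

final class SketchProjectionManager
{
    public SketchEventStore $eventStore;

    public function __construct(SketchEventStore $eventStore)
    {
        $this->eventStore = $eventStore;
    }
}

function createSketchManager(SketchEventStore $eventStore): SketchProjectionManager
{
    // Unwrap only to decide which manager fits the underlying store.
    $real = $eventStore;
    while ($real instanceof SketchDecorator) {
        $real = $real->getInnerEventStore();
    }

    if (! $real instanceof SketchPostgresStore) {
        throw new RuntimeException('Unsupported event store');
    }

    // The instance that was passed in is handed on unchanged.
    return new SketchProjectionManager($eventStore);
}

$raw = new SketchPostgresStore();
$wrapped = new SketchWrapper($raw);

$managerFromWrapped = createSketchManager($wrapped);
$managerFromRaw = createSketchManager($raw);
```

In this sketch $managerFromWrapped keeps the wrapper, while $managerFromRaw holds the plain store. So the place to look would be the service that is injected as the first argument, not the unwrapping loop.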

webdevilopers commented 3 years ago

@codeliner I sent you a DM on twitter. Hope to hear from you soon.

webdevilopers commented 3 years ago

I think I finally found the cause for the missing wrapped event store.

<?php

declare(strict_types=1);

namespace Prooph\Bundle\EventStore\Factory;

use Prooph\EventStore\EventStore;
use Prooph\EventStore\Exception\RuntimeException;
use Prooph\EventStore\Plugin\Plugin;
use Psr\Container\ContainerInterface;

class DefaultEventStoreFactory implements EventStoreFactory
{
    public function createEventStore(
        string $eventStoreName,
        EventStore $eventStore,
        ActionEventEmitterFactory $actionEventEmitterFactory,
        string $actionEventEmitter,
        bool $wrapActionEventEmitter,
        ContainerInterface $container,
        array $plugins = []
    ): EventStore {
        if ($wrapActionEventEmitter === false) {
            return $eventStore;
        }

        $actionEventEmittingEventStore = $actionEventEmitterFactory::create($eventStore, $actionEventEmitter);

        foreach ($plugins as $pluginAlias) {
            $plugin = $container->get($pluginAlias);

            if (! $plugin instanceof Plugin) {
                throw new RuntimeException(\sprintf(
                    'Plugin %s does not implement the Plugin interface',
                    $pluginAlias
                ));
            }

            $plugin->attachToEventStore($actionEventEmittingEventStore);
        }

        return $actionEventEmittingEventStore;
    }
}

This is the factory for the event store which is adding the plugins.

Looking at the symfony bundle extension this is how the dependencies are injected:

namespace Prooph\Bundle\EventStore\DependencyInjection;

final class ProophEventStoreExtension extends Extension
{
    private function loadEventStore(string $name, array $options, ContainerBuilder $container): void
    {
        $eventStoreId = 'prooph_event_store.'.$name;
        $eventStoreDefinition = $container
            ->setDefinition(
                $eventStoreId,
                new ChildDefinition('prooph_event_store.store_definition')
            )
            ->setArguments(
                [
                    $name,
                    new Reference($options['event_store']),
                    new Reference('prooph_event_store.action_event_emitter_factory'),
                    $options['event_emitter'],
                    $options['wrap_action_event_emitter'],
                    new Reference('prooph_event_store.plugins_locator'),
                ]
            );

This can be compared to the projection manager factory which is missing the arguments:

namespace Prooph\Bundle\EventStore;

class ProjectionManagerFactory
{
    public function createProjectionManager(
        EventStore $eventStore,
        ?PDO $connection = null,
        string $eventStreamsTable = 'event_streams',
        string $projectionsTable = 'projections'
    ): ProjectionManager {

I guess this has to be changed analogously:

class ProjectionManagerFactory
{
    public function createProjectionManager(
        EventStore $eventStore,
        ActionEventEmitterFactory $actionEventEmitterFactory,
        string $actionEventEmitter,
        bool $wrapActionEventEmitter,
        ContainerInterface $container,
        array $plugins = [],
        ?PDO $connection = null,
        string $eventStreamsTable = 'event_streams',
        string $projectionsTable = 'projections'
    ): ProjectionManager { 

But I am not sure about the extension part. The following is a suggestion, but Symfony does not seem to pick up the changes.

final class ProophEventStoreExtension extends Extension
{
    private static function loadProjectionManagers(array $config, ContainerBuilder $container): void
    {
        $projectionManagers = [];
        $projectionManagersLocator = [];
        $projectionManagerForProjectionsLocator = [];
        $projectionsLocator = [];
        $readModelsLocator = [];

        foreach ($config['stores'] as $name => $options) {
        }

        foreach ($config['projection_managers'] as $projectionManagerName => $projectionManagerConfig) {
            $projectionManagerId = "prooph_event_store.projection_manager.$projectionManagerName";
            self::defineProjectionManager($container, $projectionManagerId, $projectionManagerConfig, $options);

            [$projectionManagerForProjectionsLocator, $projectionsLocator, $readModelsLocator] = self::collectProjectionsForLocators(
                $projectionManagerConfig['projections'],
                $projectionManagerId,
                $projectionManagerForProjectionsLocator,
                $projectionsLocator,
                $readModelsLocator
            );

            $projectionManagers[$projectionManagerName] = "prooph_event_store.$projectionManagerName";
            $projectionManagersLocator[$projectionManagerName] = new Reference($projectionManagerId);
        }

        $container->setParameter('prooph_event_store.projection_managers', $projectionManagers);

        self::defineServiceLocator($container, 'prooph_event_store.projection_managers_locator', $projectionManagersLocator);
        self::defineServiceLocator($container, 'prooph_event_store.projection_manager_for_projections_locator', $projectionManagerForProjectionsLocator);
        self::defineServiceLocator($container, 'prooph_event_store.projection_read_models_locator', $readModelsLocator);
        self::defineServiceLocator($container, 'prooph_event_store.projections_locator', $projectionsLocator);
    }

    private static function defineProjectionManager(ContainerBuilder $container, string $serviceId, array $config, array $options): void
    {
        $projectionManagerDefinition = new ChildDefinition('prooph_event_store.projection_definition');
        $projectionManagerDefinition
            ->setFactory([new Reference('prooph_event_store.projection_factory'), 'createProjectionManager'])
            ->setArguments([
                new Reference($config['event_store']),
                new Reference('prooph_event_store.action_event_emitter_factory'),
                $options['event_emitter'],
                $options['wrap_action_event_emitter'],
                $container->getDefinition('prooph_event_store.store_definition'),
                $container->findTaggedServiceIds('prooph_event_store.plugin'), 
                isset($config['connection']) ? new Reference($config['connection']) : null,
                $config['event_streams_table'],
                $config['projections_table'],
            ]);

        $container->setDefinition($serviceId, $projectionManagerDefinition);
    }

Maybe someone can confirm that is the right place for a change and clear things up for the injection?

codeliner commented 3 years ago

@webdevilopers It looks like the right place, yes. I'm not an expert with this symfony stuff, but if it works for you I'm happy to review and merge a PR.

webdevilopers commented 3 years ago

I'm having problems with wiring the dependencies. I think @unixslayer originally committed the extension class.

@unixslayer Could you fix this wiring mentioned here?

        $projectionManagerDefinition
            ->setFactory([new Reference('prooph_event_store.projection_factory'), 'createProjectionManager'])
            ->setArguments([
                new Reference($config['event_store']),
                new Reference('prooph_event_store.action_event_emitter_factory'),
                $options['event_emitter'],
                $options['wrap_action_event_emitter'],
                $container->getDefinition('prooph_event_store.store_definition'),
                $container->findTaggedServiceIds('prooph_event_store.plugin'), 
                isset($config['connection']) ? new Reference($config['connection']) : null,
                $config['event_streams_table'],
                $config['projections_table'],
            ]);

I'm also not sure about the store config. Originally every event-store is looped and configured:

    private function loadEventStores(string $class, array $config, ContainerBuilder $container): void
    {
        $eventStores = [];

        foreach (\array_keys($config['stores']) as $name) {
            $eventStores[$name] = 'prooph_event_store.'.$name;
        }
        $container->setParameter('prooph_event_store.stores', $eventStores);

        $def = $container->getDefinition('prooph_event_store.store_definition');
        $def->setClass($class);

        foreach ($config['stores'] as $name => $options) {
            $this->loadEventStore($name, $options, $container);
        }
    }

I'm not sure whether this is required for all projection managers too. For a quick test I took the default store to get the options, which is ugly:

        foreach ($config['stores'] as $name => $options) {
        }

        foreach ($config['projection_managers'] as $projectionManagerName => $projectionManagerConfig) {
            $projectionManagerId = "prooph_event_store.projection_manager.$projectionManagerName";
            self::defineProjectionManager($container, $projectionManagerId, $projectionManagerConfig, $options);

unixslayer commented 3 years ago

@webdevilopers do you have a fork with those changes where I could look at it and fix the wiring?

webdevilopers commented 3 years ago

Sorry, no. I tried it locally in a project. But all my changes are inside the ProophEventStoreExtension class and are mentioned above. Nothing else was needed. Hope it helps.

unixslayer commented 3 years ago

@webdevilopers looking at your configuration I assume that you are using prooph/event-store-symfony-bundle@0.8.0, right? The prooph/event-sourcing dependency was dropped a few months ago, but that change has not been released yet. Due to that change, the bundle will no longer manage/register repositories. Can you try switching to the master branch of the bundle and see if your problem still appears?

unixslayer commented 3 years ago

@webdevilopers before changing to dev-master, please change projection_manager as follows:

prooph_event_store:
    stores:
        default:
            ...

    projection_managers:
        default_projection_manager:
            event_store: 'prooph_event_store.default'
            connection: '@app.event_store.pdo_connection.pgsql'
            projections:
                visit_details:
                    read_model: Trexxon\Visit\Infrastructure\Projection\MongoVisitDetailsReadModel
                    projection: Trexxon\Visit\Infrastructure\Projection\MongoVisitDetailsProjection

What is happening here is that the EventStore is passed into the ProjectionManager by reference, but the bundle wraps every EventStore defined under the prooph_event_store.stores section with a decorator and registers the decorator as prooph_event_store.%name%. Plugins are attached only to the decorator, so using Prooph\EventStore\EventStore as the reference makes the bundle use the basic service, not the decorator.

It works for repositories on 0.8.0 because they are registered dynamically by the bundle using the correct reference ID. When you reference an EventStore yourself, you should use the dynamic service ID, which is prooph_event_store.%name%.
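As a minimal sketch of that rule, a service that needs the event store would reference the decorated ID instead of the raw store. The class name below is a placeholder, not something from this project, and the example assumes its constructor has a parameter named $eventStore:

```yaml
services:
    # Hypothetical consumer class; replace with your own service.
    App\Infrastructure\SomeEventStoreConsumer:
        arguments:
            # Decorated store registered by the bundle for the store named "default".
            # Referencing '@app.event_store.default' here would bypass the plugins.
            $eventStore: '@prooph_event_store.default'
```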

I started making some changes to the bundle a few months ago but was unable to finish them. I'll find some time in the future to make this bundle more robust and introduce some more changes.

webdevilopers commented 3 years ago

Thanks for your feedback @unixslayer. Using dev-master I get the following config error:

Unrecognized option "repositories" under "prooph_event_store.stores.default". Available options are "event_emitter", "event_store", "wrap_action_event_emitter".

prooph_event_store:
  stores:
    default:
      event_emitter: Prooph\Common\Event\ProophActionEventEmitter
      event_store: '@app.event_store.default'
      repositories:
        account_collection:
          repository_class: Trexxon\Account\Infrastructure\Persistence\PgsqlAccountEventStoreRepository
          aggregate_type: Trexxon\Account\Domain\Model\Account\Account
          aggregate_translator: Prooph\EventSourcing\EventStoreIntegration\AggregateTranslator
          stream_name: 'account_stream'

What has changed here?

unixslayer commented 3 years ago

@webdevilopers like I said before, the prooph/event-sourcing dependency was dropped on dev-master. But you should be OK injecting the EventStore dependency into the projection_manager using the prooph_event_store.default reference, not Prooph\EventStore\EventStore.

webdevilopers commented 3 years ago

Ah, got it! Is there any documentation on how to configure the repositories instead?

I now injected the event store:

prooph_event_store:
  stores:
    default:
      event_emitter: Prooph\Common\Event\ProophActionEventEmitter
      #wrap_action_event_emitter: false
      event_store: '@app.event_store.default'

services:

  Prooph\EventStore\EventStore: '@app.event_store.default'

  app.event_store.default:
    class: Prooph\EventStore\Pdo\PostgresEventStore
    arguments:
      - '@prooph_event_store.message_factory'
      - '@app.event_store.pdo_connection.pgsql'
      - '@app.event_store.pgsql.persistence_strategy'

  Trexxon\Common\Infrastructure\Prooph\EventStore\PersonalDataPlugin:
    arguments: [ '@Trexxon\Common\Infrastructure\Prooph\EventStore\PgsqlPersonalDataStorage' ]
    tags:
      - { name: 'prooph_event_store.default.plugin' }

  Prooph\EventStore\Plugin\UpcastingPlugin:
    arguments: ['@Trexxon\Common\Infrastructure\Prooph\EventStore\PersonalDataUpcaster']
    tags:
      - { name: 'prooph_event_store.default.plugin' }

  Trexxon\Visit\Infrastructure\Projection\MongoVisitDetailsProjection:
    arguments:
      - '@app.event_store.default'

When running the Projection Manager with this config should the upcaster be recognized?

webdevilopers commented 3 years ago

Ah!

This seems to work:

prooph_event_store:
  stores:
    default:
      event_store: '@app.event_store.default'
  projection_managers:
    default_projection_manager:
      #event_store: '@app.event_store.default'
      event_store: 'prooph_event_store.default'

webdevilopers commented 3 years ago

I'm getting it. I will check my upcaster now.

unixslayer commented 3 years ago

@webdevilopers this should be it.

As for the repository, it can be defined just like any other service. Just remember to pass the EventStore under the same name as for the projection manager.

In your case it would be something like this:

services:
    Trexxon\Visit\Infrastructure\Persistence\PgsqlVisitEventStoreRepository:
        arguments:
            $eventStore: '@prooph_event_store.default'
            $aggregateType: Trexxon\Visit\Domain\Model\Visit\Visit
            $aggregateTranslator: Prooph\EventSourcing\EventStoreIntegration\AggregateTranslator
            $streamName: 'visit_stream'

I recommend implementing some abstraction for domain repositories, so that configuration is easier.

unixslayer commented 3 years ago

@codeliner @prolic is it possible to prepare a release of this bundle? I still have some additional changes to finish, along with documentation that describes the changes between 0.8.0 and the new release, but those should be done by tomorrow.

prolic commented 3 years ago

@unixslayer @webdevilopers Neither Alex nor I are Symfony devs. If you want, I can give you write access to this repository and let you handle releases.

unixslayer commented 3 years ago

@prolic that would be awesome. Please and thank you ;)

webdevilopers commented 3 years ago

Regarding your suggestion @unixslayer :

services:
    Trexxon\Visit\Infrastructure\Persistence\PgsqlVisitEventStoreRepository:
        arguments:
            $eventStore: '@prooph_event_store.default'
            $aggregateType: Trexxon\Visit\Domain\Model\Visit\Visit
            $aggregateTranslator: Prooph\EventSourcing\EventStoreIntegration\AggregateTranslator
            $streamName: 'visit_stream'

It looks like I have to pass the $snapshotStore parameter too.

But what about the $aggregateType? Formerly this was the FQCN of the Aggregate Root - do I have to register them as services? Feels strange since AggregateRepository expects a type of AggregateType.

unixslayer commented 3 years ago

If you want to use dev-master then yes, you have to register those as services and pass them to the repository:

    app.some_aggregate.type:
       class: 'Prooph\EventSourcing\Aggregate\AggregateType'
       factory: [ 'Prooph\EventSourcing\Aggregate\AggregateType', 'fromAggregateRootClass' ]
       arguments:
          - 'App\Domain\Model\SomeAggregate'
    app.some_aggregate.stream:
       class: 'Prooph\EventStore\StreamName'
       arguments:
          - 'some_aggregate_stream'

That is a bit of overhead, so I suggest writing an abstract repository yourself, e.g. https://github.com/unixslayer/event-store/blob/master/src/AggregateRepository.php
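Wired together, a repository service using the two definitions above might look roughly like this. It is only a sketch: the repository class name is hypothetical, it assumes the repository extends Prooph\EventSourcing\Aggregate\AggregateRepository without overriding the constructor, and it assumes the AggregateTranslator is registered as a service under its class name. The snapshot store is passed explicitly as null here, in line with the earlier remark that $snapshotStore has to be passed too.

```yaml
services:
    # Hypothetical repository class; replace with your own.
    App\Infrastructure\Persistence\SomeAggregateRepository:
        arguments:
            # Decorated event store, so plugins and enrichers apply.
            $eventStore: '@prooph_event_store.default'
            # AggregateType service created via the factory shown above.
            $aggregateType: '@app.some_aggregate.type'
            $aggregateTranslator: '@Prooph\EventSourcing\EventStoreIntegration\AggregateTranslator'
            # No snapshot store in this sketch; given explicitly so the argument list has no gap.
            $snapshotStore: ~
            # StreamName service defined above.
            $streamName: '@app.some_aggregate.stream'
```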

unixslayer commented 3 years ago

@webdevilopers it is also possible to do some adjustments in prooph/event-sourcing repository so it can be used without this overhead.

webdevilopers commented 3 years ago

Ah sure, the factory method has to be called! Another approach would be overriding the constructor on the repository, right?

class UserRepository extends AggregateRepository implements BaseUserRepository
{
    public function __construct(EventStore $eventStore, SnapshotStore $snapshotStore)
    {
        parent::__construct(
            $eventStore,
            AggregateType::fromAggregateRootClass(User::class),
            new AggregateTranslator(),
            $snapshotStore,
            null,
            true
        );
    }
}

webdevilopers commented 3 years ago

> @webdevilopers it is also possible to do some adjustments in prooph/event-sourcing repository so it can be used without this overhead.

Yes indeed that would simplify the configuration.

unixslayer commented 3 years ago

@webdevilopers that is all correct. However, if you stay on 0.8.0, you don't really have to do anything besides injecting the event store using the prooph_event_store.<name> ID.

webdevilopers commented 3 years ago

Thanks @unixslayer for clearing things up.

The issue occurs when the decorated event store is not used:

prooph_event_store:
  stores:
    default:
      event_store: '@app.event_store.default'

  projection_managers:
    default_projection_manager:
      event_store: '@app.event_store.default'
      connection: '@app.event_store.pdo_connection.pgsql'
      projections:
        visit_details:
          read_model: Trexxon\Visit\Infrastructure\Projection\MongoVisitDetailsReadModel
          projection: Trexxon\Visit\Infrastructure\Projection\MongoVisitDetailsProjection

The solution is to reference the decorated event-store for the projection managers instead:

prooph_event_store:
  stores:
    default:
      event_store: '@app.event_store.default'

  projection_managers:
    default_projection_manager:
      #event_store: '@app.event_store.default'
      event_store: 'prooph_event_store.default'

webdevilopers commented 3 years ago

@unixslayer Are there any symfony flex recipes or docs that need to be updated to make this clear to other users?

E.g.:

https://github.com/symfony/recipes-contrib/blob/master/prooph/event-store-symfony-bundle/0.4/config/packages/prooph_event_store.yaml

unixslayer commented 3 years ago

@webdevilopers I have a PR with updated docs almost ready to merge.

https://github.com/prooph/event-store-symfony-bundle/pull/74

webdevilopers commented 2 years ago

> @unixslayer Are there any symfony flex recipes or docs that need to be updated to make this clear to other users?

@unixslayer Recently I stumbled on this issue again. The linked recipe did not work for me:

https://github.com/symfony/recipes-contrib/blob/master/prooph/event-store-symfony-bundle/0.4/config/packages/prooph_event_store.yaml

Instead it had to look like this:

services:
    _defaults:
        public: false

    #Prooph\EventStore\EventStore: '@app.event_store.default'
    Prooph\EventStore\EventStore: '@prooph_event_store.default'

Otherwise Symfony will autowire the non-decorated Event Store which does NOT have enrichers or plugins registered.

Our versions:

        "prooph/event-sourcing": "5.7.0",
        "prooph/event-store-bus-bridge": "3.4.0",
        "prooph/event-store-symfony-bundle": "0.9.1",
        "prooph/pdo-event-store": "1.13.0",

unixslayer commented 2 years ago

@webdevilopers when did you run into that problem? During ongoing development or at new installation?

webdevilopers commented 2 years ago

It was a new installation. We were starting with some aggregates, and plugins did seem to work. Then we tried to add upcasting, but the upcasters would not be recognized. I think there was a lot of back and forth, maybe some Symfony cache issues too. In the end, the solution above was the only one that always worked for both plugins and upcasters.

webdevilopers commented 1 year ago

I started a new project with the following dependencies:

    "require": {
        "php": ">=8.2.1",
        "prooph/event-sourcing": "^5.7",
        "prooph/event-store-symfony-bundle": "*",
        "prooph/pdo-event-store": "^1.15",
        "symfony/console": "6.2.*",
        "symfony/dotenv": "6.2.*",
        "symfony/flex": "^2",
        "symfony/framework-bundle": "6.2.*",
        "symfony/runtime": "6.2.*",
        "symfony/translation": "6.2.*",
        "symfony/validator": "6.2.*",
        "symfony/yaml": "6.2.*"
    },

I add the prooph recipes:

Then I add a metadata enricher using autowiring:

#[Autoconfigure(tags: ['prooph_event_store.default.metadata_enricher'])]
final readonly class AuthenticatedEventMetadataEnricher implements MetadataEnricher
{
    public function __construct(private TokenStorageInterface $tokenStorage)
    {
    }

    public function enrich(Message $message): Message
    {
        ...
    }
}

The enricher is not recognized. Then I remembered my last comment on this issue.

I changed the line of this recipe: https://github.com/symfony/recipes-contrib/blob/main/prooph/pdo-event-store/1.7/config/packages/prooph_pdo_event_store.yaml#L5

To (as suggested in my last comment above):

services:
    _defaults:
        public: false

    #Prooph\EventStore\EventStore: '@app.event_store.default'
    Prooph\EventStore\EventStore: '@prooph_event_store.default'

Now the enricher will be recognized.

I guess this has something to do with Symfony trying to autowire the "alias" for the Prooph\EventStore\EventStore interface.

Dumping the event store e.g. inside an AggregateRepository implementation before my change you get the following instance:

[Screenshot: event-store-before]

After the change:

[Screenshot: event-store-after]

Does this make sense to you @unixslayer?

unixslayer commented 1 year ago

@webdevilopers looks ok. I'll look into that and get back to you.

unixslayer commented 1 year ago

@webdevilopers I've opened a PR in the recipes repository. The solution is correct; however, that recipe is not the right place for it.

Prooph\EventStore\EventStore should indeed alias the instance that wraps the internal event store client. That instance is registered by the bundle package, which is why the bundle is the proper place to define the alias.

If one does not use the bundle, the alias will not be registered and the developer will be required to define it on their own.

I'll close this issue after the PR is merged: https://github.com/symfony/recipes-contrib/pull/1479

Also, if you define more than one event store, e.g.

prooph_event_store:
    stores:
        default:
            event_store: 'app.event_store.default'
        another:
            event_store: 'app.event_store.another'

You may tag general plugins (not associated with a specific event store) with prooph_event_store.metadata_enricher. Plugins tagged like that, without a store name (neither default nor another), will be applied to every event store defined in the bundle configuration.
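To illustrate the difference, here is a small sketch with two enrichers, one bound to a single store and one applied to all stores. Both class names are made up for the example; only the tag names come from this thread.

```yaml
services:
    # Hypothetical enricher applied only to the store named "default".
    App\Infrastructure\DefaultOnlyMetadataEnricher:
        tags:
            - { name: 'prooph_event_store.default.metadata_enricher' }

    # Hypothetical enricher applied to every configured store ("default" and "another").
    App\Infrastructure\GlobalMetadataEnricher:
        tags:
            - { name: 'prooph_event_store.metadata_enricher' }
```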

webdevilopers commented 1 year ago

Great work @unixslayer! :clap: And thanks for the hint.

unixslayer commented 1 year ago

Closing since https://github.com/symfony/recipes-contrib/pull/1479 was merged.