ecotoneframework / ecotone-dev

Ecotone Framework Development - This is Monorepo which contains all official public modules
https://docs.ecotone.tech
Other
37 stars 16 forks source link

Separate db connection for query bus #332

Closed lifinsky closed 1 month ago

lifinsky commented 3 months ago

Description
Provide the ability for query handlers to set a separate database connection (practical case: read connection from a replica). Within command handlers or inside transactions, use the main connection for query bus.

Example

#[QueryHandler] 
public function getTicket(GetTicketById $query) : array

Use a separate read connection if specified.

If we use the query bus from a command handler, automatically use the connection for the current transaction.

In Ecotone configuration:

final readonly class EcotoneConfiguration
{
    #[ServiceContext]
    public function dbalConfiguration(): array
    {
        return [
            SymfonyConnectionReference::defaultManagerRegistry('default'),
            SymfonyConnectionReference::defaultManagerRegistry('read'),
        ]:
    }
}

Or use additional optional argument bool $read for explicitly specifying the read connection.

dgafka commented 3 months ago

Hey @lifinsky

What database is used is out of the scope for Message Handlers. They are responsible for taking care of the flow, and what is done inside Handler is application level concern.

However you can leverage, existing functionality to do it. Make use of Reference to inject correct Service:

#[QueryHandler] 
public function getTicket(
    GetTicketById $query,
    #[Reference('read_db')] Connection $connection,
) : array

or use Multi Tenant support to inject correct db based on the context. You can read more about that here

lifinsky commented 3 months ago

Query handlers in projections and aggregates are boilerplate code in your concept. Multi tenant is not correct example for this issue.

lifinsky commented 3 months ago

My idea is to replace the default connection for all query handlers and do not specify the connection for reading in projections and aggregates

dgafka commented 3 months ago

Query handlers in projections and aggregates are boilerplate code in your concept. (...) My idea is to replace the default connection for all query handlers and do not specify the connection for reading in projections and aggregates

I am not sure I follow. Can you explain with examples how do use see it being used?

Multi tenant is not correct example for this issue.

Naming that feature Multi-tenant was unfortunate. This allow for context specific injections of Connections, which makes it an valid use case for Multi-Tenant architecture, but not only.
I may deprecate current naming and introduce new one.

lifinsky commented 3 months ago
#[Aggregate]
class Ticket
{
    #[Identifier]
    private Uuid $ticketId;
    private string $assignedTo;

    #[QueryHandler("ticket.get_assigned_person")]
    public function getAssignedTo(): string
    {
       return $this->assignedTo;
    }
}

If we call query bus outside command handler, by default we can use read connection.

lifinsky commented 3 months ago
#[Projection("inProgressTicketList", Ticket::class] // 1
class InProgressTicketList
{
    public function __construct(private Connection $connection) {}

    #[EventHandler] // 2
    public function addTicket(TicketWasRegistered $event, array $metadata) : void
    {
        $result = $this->connection->executeStatement(<<<SQL
    INSERT INTO in_progress_tickets VALUES (?,?)
SQL, [$event->getTicketId(), $event->getTicketType()]);
    }

    #[QueryHandler("getInProgressTickets")] // 3
    public function getTickets() : array
    {
        return $this->connection->executeQuery(<<<SQL
    SELECT * FROM in_progress_tickets
SQL)->fetchAllAssociative();
    }    
}

By default switch to read db connection in projection's query handler

dgafka commented 3 months ago

Ecotone does not control DI Services from the Application (how InProgressTicketList is registered in DI). In this example Redis could be used as Read Model for example, which would not change anything in the Ecotone itself.

What Ecotone can control is execution of given method however. Therefore what I can propose are the solutions I gave above, to inject different implementation. You could also build your own customized injection with Reference attribute and expression language :)

lifinsky commented 3 months ago

Ok, I agree, but how to affect which connection to use in the query handler in an aggregate? If the query handler is used in a command handler it should be e.g. rw db connection, but for a query bus outside the command handler it should be read.

dgafka commented 3 months ago

If the query handler is used in a command handler

You mean like you trigger and Query Bus inside your Command Handler to check the logic?

lifinsky commented 3 months ago

Yes, query bus outside aggregate and query bus inside aggregate command handler

dgafka commented 2 months ago

You could wire up your own attribute which would inject specific Connection. The default one keep write and the read one would be used by attribute

#[Attribute(Attribute::TARGET_PARAMETER)]
final class ReadOnlyConnection extends Reference
{
    public function __construct()
    {
        parent::__construct('dbal.connection.read_only');
    }
}

----

#[QueryHandler('customer.getAllRegistered')]
public function getAllRegisteredPersonIds(
    #[ReadOnlyConnection] Connection $connection
): array {
    return $connection->executeQuery(<<<SQL
                    SELECT customer_id FROM persons;
        SQL)->fetchFirstColumn();
}
lifinsky commented 2 months ago

But inside current transaction it should use current connection (for example in command handler - write instead of read).

dgafka commented 2 months ago

You could wire up Service an do something like that:

class ConnectionResolver
{
      public function resolve(): Connection
      {
           if ($this->connection->isInTranscation()) {
                return $this->writeConnection;
           }

            return $this->readConnection;
      }
}

And then:

#[Attribute(Attribute::TARGET_PARAMETER)]
final class ContextualConnection extends Reference
{
    public function __construct()
    {
        parent::__construct(expression:"connectionResolver.resolve");
    }
}

Above creates ready to use attribute, that now can be used anywhere

#[QueryHandler('customer.getAllRegistered')]
public function getAllRegisteredPersonIds(
    #[ContextualConnection] Connection $connection
): array {
    return $connection->executeQuery(<<<SQL
                    SELECT customer_id FROM persons;
        SQL)->fetchFirstColumn();
}
dgafka commented 1 month ago

Closing as proposal was given that can be built from Ecotone components (however this is too application specific to be part of the Framework).