eventflow / EventFlow

Async/await first CQRS+ES and DDD framework for .NET
https://geteventflow.net
Other
2.39k stars 445 forks source link

两个领域业务, CustomerAggregate, ProductAggregate 两个领域事件都要去更新CustomerProductReadModel读模型如何处理? #1020

Closed GreatGodJackChen closed 8 months ago

GreatGodJackChen commented 8 months ago
我有两个领域业务, CustomerAggregate需要实现添加客户事件,添加客户事件的同时需要生成客户对应的客户商品读模型
ProductAggregate需要实现添加商品事件,添加商品事件同时需要生成客户对应的客户商品读模型,这样的业务我应该如何实现呢?
/// <summary>
/// Sketch of the customer side of the domain; carries only the customer's id.
/// </summary>
public class CustomerAggregate
{
    /// <summary>Identifier of the customer.</summary>
    public Guid CustomerId { get; set; }
}
/// <summary>
/// Placeholder for the "customer added" event; it has no members in this sketch.
/// </summary>
public class AddCustomerEvent
{
}
/// <summary>
/// Sketch of the product side of the domain; carries only the product's id.
/// </summary>
public class ProductAggregate
{
    /// <summary>Identifier of the product.</summary>
    public Guid ProductId { get; set; }
}
/// <summary>
/// Placeholder for the "product added" event; it has no members in this sketch.
/// </summary>
public class AddProductEvent
{
}

/// <summary>
/// Read model pairing a customer with a product.
/// Adding a customer needs to produce this read model,
/// and adding a product needs to produce it as well.
/// </summary>
public class CustomerProductReadModel
{
    /// <summary>Identifier of the customer.</summary>
    public Guid CustomerId { get; set; }

    /// <summary>Identifier of the product.</summary>
    public Guid ProductId { get; set; }
}
loyldg commented 8 months ago

/// <summary>
/// Identity type for <see cref="CustomerAggregate"/>, wrapping a string value that is
/// forwarded to the <c>Identity&lt;CustomerId&gt;</c> base constructor.
/// </summary>
[JsonConverter(typeof(SystemTextJsonSingleValueObjectConverter<CustomerId>))]
public class CustomerId : Identity<CustomerId>
{
    public CustomerId(string value)
        : base(value)
    {
    }
}

/// <summary>
/// Identity type for <see cref="ProductAggregate"/>, wrapping a string value that is
/// forwarded to the <c>Identity&lt;ProductId&gt;</c> base constructor.
/// </summary>
[JsonConverter(typeof(SystemTextJsonSingleValueObjectConverter<ProductId>))]
public class ProductId : Identity<ProductId>
{
    public ProductId(string value)
        : base(value)
    {
    }
}

/// <summary>
/// Customer aggregate root identified by a <see cref="CustomerId"/>; it declares no behavior here.
/// </summary>
public class CustomerAggregate : AggregateRoot<CustomerAggregate, CustomerId>
{
    public CustomerAggregate(CustomerId id)
        : base(id)
    {
    }
}
/// <summary>
/// Product aggregate root identified by a <see cref="ProductId"/>; it declares no behavior here.
/// </summary>
public class ProductAggregate : AggregateRoot<ProductAggregate, ProductId>
{
    public ProductAggregate(ProductId id)
        : base(id)
    {
    }
}
/// <summary>
/// Aggregate event of <see cref="CustomerAggregate"/> carrying the ids of the customer
/// and of the product the customer is associated with.
/// </summary>
public class AddCustomerEvent : AggregateEvent<CustomerAggregate, CustomerId>
{
    public AddCustomerEvent(Guid productId, Guid customerId)
    {
        ProductId = productId;
        CustomerId = customerId;
    }

    /// <summary>Id of the associated product.</summary>
    public Guid ProductId { get; }

    /// <summary>Id of the customer.</summary>
    public Guid CustomerId { get; }
}

/// <summary>
/// Aggregate event of <see cref="ProductAggregate"/> carrying the id of the product.
/// </summary>
public class AddProductEvent : AggregateEvent<ProductAggregate, ProductId>
{
    public AddProductEvent(Guid productId)
    {
        ProductId = productId;
    }

    /// <summary>Id of the product.</summary>
    public Guid ProductId { get; }
}

/// <summary>
/// Read model that is updated from events of two different aggregates:
/// <see cref="AddCustomerEvent"/> (customer side) and <see cref="AddProductEvent"/> (product side).
/// </summary>
public class CustomerProductReadModel : IReadModel,
    IAmReadModelFor<CustomerAggregate, CustomerId, AddCustomerEvent>,
    IAmReadModelFor<ProductAggregate, ProductId, AddProductEvent>
{
    /// <summary>Id of the product, taken from whichever event was applied.</summary>
    public Guid ProductId { get; private set; }

    /// <summary>Id of the customer, taken from <see cref="AddCustomerEvent"/>.</summary>
    public Guid CustomerId { get; private set; }

    /// <summary>
    /// Copies the customer id and the product id from the customer event.
    /// </summary>
    /// <param name="context">Read model context (unused).</param>
    /// <param name="domainEvent">Domain event wrapping the <see cref="AddCustomerEvent"/>.</param>
    /// <param name="cancellationToken">Cancellation token (unused; the method completes synchronously).</param>
    /// <returns>An already-completed task.</returns>
    public Task ApplyAsync(IReadModelContext context, IDomainEvent<CustomerAggregate, CustomerId, AddCustomerEvent> domainEvent, CancellationToken cancellationToken)
    {
        var addCustomer = domainEvent.AggregateEvent;

        CustomerId = addCustomer.CustomerId;
        // The customer event also carries the product id; record it here so the
        // read model is fully populated even if no product event has been applied yet.
        ProductId = addCustomer.ProductId;

        return Task.CompletedTask;
    }

    /// <summary>
    /// Copies the product id from the product event.
    /// </summary>
    /// <param name="context">Read model context (unused).</param>
    /// <param name="domainEvent">Domain event wrapping the <see cref="AddProductEvent"/>.</param>
    /// <param name="cancellationToken">Cancellation token (unused; the method completes synchronously).</param>
    /// <returns>An already-completed task.</returns>
    public Task ApplyAsync(IReadModelContext context, IDomainEvent<ProductAggregate, ProductId, AddProductEvent> domainEvent, CancellationToken cancellationToken)
    {
        ProductId = domainEvent.AggregateEvent.ProductId;

        return Task.CompletedTask;
    }
}

/// <summary>
/// Marker interface extending <see cref="IReadModelLocator"/>; it adds no members of its own
/// and gives the customer/product locator a dedicated service type.
/// </summary>
public interface ICustomerProductReadModelLocator : IReadModelLocator { }

/// <summary>
/// Maps a domain event to read model ids: both <see cref="AddCustomerEvent"/> and
/// <see cref="AddProductEvent"/> map to the string form of their product id.
/// </summary>
public class CustomerProductReadModelLocator : ICustomerProductReadModelLocator
{
    /// <summary>
    /// Lazily yields the product id (as a string) for the two supported event types,
    /// and yields nothing for any other event.
    /// </summary>
    /// <param name="domainEvent">The domain event whose aggregate event is inspected.</param>
    /// <returns>Zero or one read model id.</returns>
    public IEnumerable<string> GetReadModelIds(IDomainEvent domainEvent)
    {
        var payload = domainEvent.GetAggregateEvent();

        if (payload is AddCustomerEvent customerEvent)
        {
            yield return customerEvent.ProductId.ToString();
        }
        else if (payload is AddProductEvent productEvent)
        {
            yield return productEvent.ProductId.ToString();
        }
    }
}
// Register the locator implementation as a transient service under its marker interface.
eventFlowOptions.ServiceCollection
    .AddTransient<ICustomerProductReadModelLocator, CustomerProductReadModelLocator>();
// Configure an in-memory read store for the read model, naming the locator interface
// as the second type argument.
eventFlowOptions.UseInMemoryReadStoreFor<CustomerProductReadModel, ICustomerProductReadModelLocator>();
GreatGodJackChen commented 8 months ago

@loyldg 使用的MongoDB读模型存储，读模型代码如下：

public class CustomerProductReadModel : IMongoDbReadModel,
    IAmReadModelFor<CustomerAggregate, CustomerId, AddCustomerEvent>,
    IAmReadModelFor<ProductAggregate, ProductId, AddProductEvent>
{
    public Guid ProductId { get; private set; }

    public string ProductName { get; private set; }

    public Guid CustomerId { get; private set; }

    public string CustomerName { get; private set; }

    public string Id { get; private set; }

    public long? Version { get; set; }

    public void Apply(IReadModelContext context, IDomainEvent<CustomerAggregate, CustomerId, AddCustomerEvent> domainEvent)
    {
        //Id= domainEvent.AggregateIdentity.Value;
        Id = domainEvent.AggregateEvent.ProductId.ToString();
        CustomerId = domainEvent.AggregateEvent.CustomerId;
        ProductId = domainEvent.AggregateEvent.ProductId;
        CustomerName = domainEvent.AggregateEvent.CustomerName;
    }

    public void Apply(IReadModelContext context, IDomainEvent<ProductAggregate, ProductId, AddProductEvent> domainEvent)
    {
        //Id= domainEvent.AggregateIdentity.Value;
        Id = domainEvent.AggregateEvent.ProductId.ToString();
        ProductId = domainEvent.AggregateEvent.ProductId;
        ProductName = domainEvent.AggregateEvent.ProductName;
    }
}

定位器代码如下:

public class CustomerProductReadModelLocator : ICustomerProductReadModelLocator
{
    public IEnumerable<string> GetReadModelIds(IDomainEvent domainEvent)
    {
        var aggregateEvent = domainEvent.GetAggregateEvent();
        switch (aggregateEvent)
        {
            case AddCustomerEvent addCustomerEvent:
                yield return addCustomerEvent.ProductId.ToString();
                break;

            case AddProductEvent addProductEvent:
                yield return addProductEvent.ProductId.ToString();
                break;
        }
    }
}

入参: image

读模型结果: image

GreatGodJackChen commented 8 months ago

读模型CustomerId 与添加事件的CustomerId对应不上

loyldg commented 8 months ago
  1. Clear all the test data, run the test application again, and check whether the CustomerId in the read model is the same as the CustomerId in AddCustomerEvent.
  2. You can set a breakpoint in the Apply method of CustomerProductReadModel to see whether the data is the same as in AddCustomerEvent.
GreatGodJackChen commented 8 months ago

@loyldg

I deleted the database and re-validated it, and I get the same error. The AddCustomerEvent is correct the first time, and when debugging, the data in Apply is the same as in the event, but the data in the read model results table is not correct.

GreatGodJackChen commented 8 months ago

The CustomerId stored in the read model never appeared in any event. Where did it come from?

loyldg commented 8 months ago

Can you post all your test code?

loyldg commented 8 months ago

I use the following code to test and it works ok

// Build a default host, register the read model locator, EventFlow (MongoDB event store
// and MongoDB read model) and the test background service, then run the host.
var host = Host.CreateDefaultBuilder(args)
    .ConfigureServices(services =>
    {
        services.AddTransient<ICustomerProductReadModelLocator, CustomerProductReadModelLocator>();

        services.AddEventFlow(options =>
        {
            options.AddDefaults(typeof(CustomerAggregate).Assembly);
            // Disabled alternative kept from the original sample:
            //options.UseInMemorySnapshotPersistence();
            options.UseMongoDbEventStore();

            options.UseMongoDbReadModel<CustomerProductReadModel, ICustomerProductReadModelLocator>();

            options.ConfigureMongoDb("mongodb://localhost:27017", "test-db1");
        });

        services.AddHostedService<TestBackgroundService>();
    })
    .Build();

await host.RunAsync();

/// <summary>
/// Identity type for <see cref="CustomerAggregate"/>; the string value is forwarded
/// to the <c>Identity&lt;CustomerId&gt;</c> base constructor.
/// </summary>
public class CustomerId : Identity<CustomerId>
{
    public CustomerId(string value)
        : base(value)
    {
    }
}

/// <summary>
/// Identity type for <see cref="ProductAggregate"/>; the string value is forwarded
/// to the <c>Identity&lt;ProductId&gt;</c> base constructor.
/// </summary>
public class ProductId : Identity<ProductId>
{
    public ProductId(string value)
        : base(value)
    {
    }
}

/// <summary>
/// Customer aggregate root. Its only behavior is emitting a <see cref="CustomerAddedEvent"/>.
/// </summary>
public class CustomerAggregate(CustomerId id)
    : AggregateRoot<CustomerAggregate, CustomerId>(id), IApply<CustomerAddedEvent>
{
    /// <summary>Emits a <see cref="CustomerAddedEvent"/> carrying the given ids.</summary>
    /// <param name="productId">Id of the product the customer is associated with.</param>
    /// <param name="customerId">Id of the customer.</param>
    public void Add(Guid productId, Guid customerId) =>
        Emit(new CustomerAddedEvent(productId, customerId));

    /// <summary>Intentionally empty: the aggregate keeps no state for this event.</summary>
    public void Apply(CustomerAddedEvent aggregateEvent)
    {
    }
}

/// <summary>
/// Product aggregate root. Its only behavior is emitting a <see cref="ProductAddedEvent"/>.
/// </summary>
public class ProductAggregate(ProductId id)
    : AggregateRoot<ProductAggregate, ProductId>(id), IApply<ProductAddedEvent>
{
    /// <summary>Emits a <see cref="ProductAddedEvent"/> carrying the given product id.</summary>
    /// <param name="productId">Id of the product.</param>
    public void Add(Guid productId) =>
        Emit(new ProductAddedEvent(productId));

    /// <summary>Intentionally empty: the aggregate keeps no state for this event.</summary>
    public void Apply(ProductAddedEvent aggregateEvent)
    {
    }
}
/// <summary>
/// Aggregate event of <see cref="CustomerAggregate"/> carrying the ids of the customer
/// and of the product the customer is associated with.
/// </summary>
public class CustomerAddedEvent : AggregateEvent<CustomerAggregate, CustomerId>
{
    public CustomerAddedEvent(Guid productId, Guid customerId)
    {
        ProductId = productId;
        CustomerId = customerId;
    }

    /// <summary>Id of the associated product.</summary>
    public Guid ProductId { get; }

    /// <summary>Id of the customer.</summary>
    public Guid CustomerId { get; }
}

/// <summary>
/// Aggregate event of <see cref="ProductAggregate"/> carrying the id of the product.
/// </summary>
public class ProductAddedEvent : AggregateEvent<ProductAggregate, ProductId>
{
    public ProductAddedEvent(Guid productId)
    {
        ProductId = productId;
    }

    /// <summary>Id of the product.</summary>
    public Guid ProductId { get; }
}

/// <summary>
/// MongoDB read model updated from events of two aggregates:
/// <see cref="CustomerAddedEvent"/> and <see cref="ProductAddedEvent"/>.
/// Ids are stored as strings rather than <see cref="Guid"/> values.
/// </summary>
public class CustomerProductReadModel : IMongoDbReadModel,
    IAmReadModelFor<CustomerAggregate, CustomerId, CustomerAddedEvent>,
    IAmReadModelFor<ProductAggregate, ProductId, ProductAddedEvent>
{
    /// <summary>String form of the product id, taken from whichever event was applied.</summary>
    public string ProductId { get; private set; } = default!;

    /// <summary>String form of the customer id, taken from <see cref="CustomerAddedEvent"/>.</summary>
    public string CustomerId { get; private set; } = default!;

    /// <summary>
    /// Sets the document id, the customer id and the product id from the customer event.
    /// </summary>
    /// <param name="context">Read model context (unused).</param>
    /// <param name="domainEvent">Domain event wrapping the <see cref="CustomerAddedEvent"/>.</param>
    /// <param name="cancellationToken">Cancellation token (unused; the method completes synchronously).</param>
    /// <returns>An already-completed task.</returns>
    public Task ApplyAsync(IReadModelContext context, IDomainEvent<CustomerAggregate, CustomerId, CustomerAddedEvent> domainEvent, CancellationToken cancellationToken)
    {
        var productKey = domainEvent.AggregateEvent.ProductId.ToString();

        Id = productKey;
        // Also record the product id here: the customer event carries it, and leaving it
        // unset makes the document invisible to lookups that filter on ProductId when
        // this event is projected before any product event.
        ProductId = productKey;
        CustomerId = domainEvent.AggregateEvent.CustomerId.ToString();

        return Task.CompletedTask;
    }

    /// <summary>
    /// Sets the document id and the product id from the product event.
    /// </summary>
    /// <param name="context">Read model context (unused).</param>
    /// <param name="domainEvent">Domain event wrapping the <see cref="ProductAddedEvent"/>.</param>
    /// <param name="cancellationToken">Cancellation token (unused; the method completes synchronously).</param>
    /// <returns>An already-completed task.</returns>
    public Task ApplyAsync(IReadModelContext context, IDomainEvent<ProductAggregate, ProductId, ProductAddedEvent> domainEvent, CancellationToken cancellationToken)
    {
        var productKey = domainEvent.AggregateEvent.ProductId.ToString();

        Id = productKey;
        ProductId = productKey;

        return Task.CompletedTask;
    }

    /// <summary>Document id; both handlers set it to the string form of the product id.</summary>
    public string Id { get; private set; } = null!;

    /// <summary>Read model version; never assigned inside this class.</summary>
    public long? Version { get; set; }
}

/// <summary>
/// Marker interface extending <see cref="IReadModelLocator"/>; it adds no members of its own
/// and gives the customer/product locator a dedicated service type.
/// </summary>
public interface ICustomerProductReadModelLocator : IReadModelLocator { }

/// <summary>
/// Maps a domain event to read model ids: both <see cref="CustomerAddedEvent"/> and
/// <see cref="ProductAddedEvent"/> map to the string form of their product id.
/// </summary>
public class CustomerProductReadModelLocator : ICustomerProductReadModelLocator
{
    /// <summary>
    /// Lazily yields the product id (as a string) for the two supported event types,
    /// and yields nothing for any other event.
    /// </summary>
    /// <param name="domainEvent">The domain event whose aggregate event is inspected.</param>
    /// <returns>Zero or one read model id.</returns>
    public IEnumerable<string> GetReadModelIds(IDomainEvent domainEvent)
    {
        var payload = domainEvent.GetAggregateEvent();

        if (payload is CustomerAddedEvent customerAdded)
        {
            yield return customerAdded.ProductId.ToString();
        }
        else if (payload is ProductAddedEvent productAdded)
        {
            yield return productAdded.ProductId.ToString();
        }
    }
}

/// <summary>
/// Command addressed to a <see cref="CustomerAggregate"/> that carries the product id
/// and customer id to be emitted in the resulting event.
/// </summary>
public class AddCustomerCommand(CustomerId aggregateId, Guid productId, Guid customerId)
    : Command<CustomerAggregate, CustomerId, IExecutionResult>(aggregateId)
{
    /// <summary>Id of the associated product.</summary>
    public Guid ProductId { get; } = productId;

    /// <summary>Id of the customer.</summary>
    public Guid CustomerId { get; } = customerId;
}

/// <summary>
/// Handles <see cref="AddCustomerCommand"/> by calling <c>Add</c> on the target aggregate.
/// </summary>
public class AddCustomerCommandHandler
    : CommandHandler<CustomerAggregate, CustomerId, AddCustomerCommand>
{
    /// <summary>Forwards the command's ids to the aggregate and completes synchronously.</summary>
    /// <param name="aggregate">The customer aggregate the command targets.</param>
    /// <param name="command">The command carrying the product and customer ids.</param>
    /// <param name="cancellationToken">Cancellation token (unused).</param>
    /// <returns>An already-completed task.</returns>
    public override Task ExecuteAsync(
        CustomerAggregate aggregate,
        AddCustomerCommand command,
        CancellationToken cancellationToken)
    {
        aggregate.Add(command.ProductId, command.CustomerId);
        return Task.CompletedTask;
    }
}

/// <summary>
/// Command addressed to a <see cref="ProductAggregate"/> that carries the product id
/// to be emitted in the resulting event.
/// </summary>
public class AddProductCommand(ProductId aggregateId, Guid productId)
    : Command<ProductAggregate, ProductId, IExecutionResult>(aggregateId)
{
    /// <summary>Id of the product.</summary>
    public Guid ProductId { get; } = productId;
}

/// <summary>
/// Handles <see cref="AddProductCommand"/> by calling <c>Add</c> on the target aggregate.
/// </summary>
public class AddProductCommandHandler
    : CommandHandler<ProductAggregate, ProductId, AddProductCommand>
{
    /// <summary>Forwards the command's product id to the aggregate and completes synchronously.</summary>
    /// <param name="aggregate">The product aggregate the command targets.</param>
    /// <param name="command">The command carrying the product id.</param>
    /// <param name="cancellationToken">Cancellation token (unused).</param>
    /// <returns>An already-completed task.</returns>
    public override Task ExecuteAsync(
        ProductAggregate aggregate,
        AddProductCommand command,
        CancellationToken cancellationToken)
    {
        aggregate.Add(command.ProductId);
        return Task.CompletedTask;
    }
}

/// <summary>
/// Query for the <see cref="CustomerProductReadModel"/> of a given product.
/// </summary>
/// <param name="ProductId">String form of the product id to look up.</param>
public record GetCustomerProductReadModelQuery(string ProductId)
    : IQuery<CustomerProductReadModel>;

/// <summary>
/// Answers <see cref="GetCustomerProductReadModelQuery"/> by searching the MongoDB read model
/// store for a document whose <c>ProductId</c> equals the queried value.
/// </summary>
public class GetCustomerProductReadModelQueryHandler(IMongoDbReadModelStore<CustomerProductReadModel> store)
    : IQueryHandler<GetCustomerProductReadModelQuery, CustomerProductReadModel>
{
    /// <summary>Returns the first matching read model, or the default value when none matches.</summary>
    /// <param name="query">Query carrying the product id to match.</param>
    /// <param name="cancellationToken">Token passed to both the find and the first-or-default call.</param>
    public async Task<CustomerProductReadModel> ExecuteQueryAsync(GetCustomerProductReadModelQuery query, CancellationToken cancellationToken)
    {
        var matches = await store.FindAsync(
            p => p.ProductId == query.ProductId,
            cancellationToken: cancellationToken);

        return await matches.FirstOrDefaultAsync(cancellationToken: cancellationToken);
    }
}

/// <summary>
/// Background service that exercises the sample end to end: it publishes one product command
/// and two customer commands for the same product id, querying the read model after each
/// customer command and printing whether its customer id matches the one just sent.
/// </summary>
public class TestBackgroundService(ICommandBus commandBus, IQueryProcessor queryProcessor) : BackgroundService
{
    /// <summary>Runs the publish/query sequence once.</summary>
    /// <param name="stoppingToken">
    /// Host shutdown token; it is passed to every publish and query call so the
    /// sequence can be cancelled when the host stops.
    /// </param>
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        var productId = Guid.NewGuid();
        var customerId = Guid.NewGuid();

        var addProductCommand = new AddProductCommand(ProductId.New, productId);
        await commandBus.PublishAsync(addProductCommand, stoppingToken);

        var addCustomerCommand = new AddCustomerCommand(CustomerId.New, productId, customerId);
        await commandBus.PublishAsync(addCustomerCommand, stoppingToken);

        // The query handler returns the default value (null) when no document matches,
        // so use a null-conditional access: a missing read model prints False instead of throwing.
        var readModel = await queryProcessor.ProcessAsync(new GetCustomerProductReadModelQuery(productId.ToString()), stoppingToken);
        Console.WriteLine($"readModel.CustomerId==customerId:{readModel?.CustomerId == customerId.ToString()}");

        // A second customer for the same product id targets the same read model id.
        var customerId2 = Guid.NewGuid();
        var addCustomerCommand2 = new AddCustomerCommand(CustomerId.New, productId, customerId2);
        await commandBus.PublishAsync(addCustomerCommand2, stoppingToken);

        var readModel2 = await queryProcessor.ProcessAsync(new GetCustomerProductReadModelQuery(productId.ToString()), stoppingToken);
        Console.WriteLine($"readModel2.CustomerId==customerId2:{readModel2?.CustomerId == customerId2.ToString()}");
    }
}
GreatGodJackChen commented 8 months ago

@loyldg The problem was the Guid being converted to binary in MongoDB. Thank you for your help.