awslabs / aws-dotnet-messaging

An AWS-native framework that simplifies the development of .NET message processing applications that use AWS services, such as SQS, SNS, and EventBridge.
https://awslabs.github.io/aws-dotnet-messaging/
Apache License 2.0
76 stars 14 forks source link

_messageConfiguration does not have a valid subscriber mapping for messageTypeIdentifier #160

Open kknd4eva opened 3 days ago

kknd4eva commented 3 days ago

Describe the bug

I have created an SQS publisher and SQS receiver, my publisher works fine but my receiver fails to read messages from the queue with the exception:

fail: AWS.Messaging.Serialization.EnvelopeSerializer[0]
      Failed to create a MessageEnvelope
      System.IO.InvalidDataException: _messageConfiguration does not have a valid subscriber mapping for messageTypeIdentifier 'FargateAPIApp.Features.Orders.CreateOrderCommand'
         at AWS.Messaging.Serialization.EnvelopeSerializer.ConvertToEnvelopeAsync(Message sqsMessage)

My publisher

using Amazon.Extensions.NETCore.Setup;
using FargateOrderProcessor.Entities;
using FargateOrderProcessor.Services;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging; // Added for logging configuration

IHost host = Host.CreateDefaultBuilder(args)
    .ConfigureAppConfiguration((hostingContext, config) =>
    {
        config.AddJsonFile("appsettings.json", optional: false, reloadOnChange: true);
        config.AddEnvironmentVariables();
    })
    .ConfigureServices((context, services) =>
    {
        AWSOptions awsOptions = context.Configuration.GetAWSOptions();
        services.AddDefaultAWSOptions(awsOptions);
        services.AddAWSMessageBus(builder =>
        {
            builder.AddSQSPoller(context.Configuration.GetValue<string>("SQS_PUBLISH_QUEUE"));
            builder.AddMessageHandler<OrderService, CreateOrderCommand>("salesOrder");
        });
    })
    .ConfigureLogging(logging =>
    {
        // Configure logging to help with debugging
        logging.ClearProviders();
        logging.AddConsole();
        logging.SetMinimumLevel(LogLevel.Debug);
    })
    .Build();

await host.RunAsync();

Handler

    internal sealed class  CreateOrderCommandHandler : IRequestHandler<CreateOrderCommand, int>
    {
        private readonly IMessagePublisher _messagePublisher;
        private readonly ILogger<CreateOrderCommandHandler> _logger;

        public CreateOrderCommandHandler(IMessagePublisher messagePublisher, 
            ILogger<CreateOrderCommandHandler> logger) =>
            (_messagePublisher, _logger) = (messagePublisher, logger);

        public async Task<int> Handle(CreateOrderCommand request, CancellationToken cancellationToken)
        {
            try
            {
                _logger.LogInformation("Publishing to target queue:" + Environment.GetEnvironmentVariable("SQS_PUBLISH_QUEUE"));
                var publishResponse = await _messagePublisher.PublishAsync(request);

                _logger.LogInformation("Message published to queue: {PublishResponse}", publishResponse);

                return (int)HttpStatusCode.Accepted;
            }
            catch (FailedToPublishException publishEx)
            {
                _logger.LogError(publishEx, "Failed to publish message to queue");
                return (int)HttpStatusCode.InternalServerError;
            }
        }
    }

Receiver

using Amazon.Extensions.NETCore.Setup;
using FargateOrderProcessor.Entities;
using FargateOrderProcessor.Services;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging; // Added for logging configuration

IHost host = Host.CreateDefaultBuilder(args)
    .ConfigureAppConfiguration((hostingContext, config) =>
    {
        config.AddJsonFile("appsettings.json", optional: false, reloadOnChange: true);
        config.AddEnvironmentVariables();
    })
    .ConfigureServices((context, services) =>
    {
        AWSOptions awsOptions = context.Configuration.GetAWSOptions();
        services.AddDefaultAWSOptions(awsOptions);
        services.AddAWSMessageBus(builder =>
        {
            builder.AddSQSPoller(context.Configuration.GetValue<string>("SQS_PUBLISH_QUEUE"));
            builder.AddMessageHandler<OrderService, CreateOrderCommand>("salesOrder");
        });
    })
    .ConfigureLogging(logging =>
    {
        // Configure logging to help with debugging
        logging.ClearProviders();
        logging.AddConsole();
        logging.SetMinimumLevel(LogLevel.Debug);
    })
    .Build();

await host.RunAsync();

Handler

using AWS.Messaging;
using FargateOrderProcessor.Entities;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace FargateOrderProcessor.Services
{
    public class OrderService : IMessageHandler<CreateOrderCommand>
    {
        public Task<MessageProcessStatus> HandleAsync(MessageEnvelope<CreateOrderCommand> messageEnvelope, CancellationToken token = default)
        {
            Console.WriteLine($"Received message {messageEnvelope.Message.Order.OrderId} from Publisher");

            return Task.FromResult(MessageProcessStatus.Success());
        }
    }
}

Message stored in SQS:

{
  "id": "4a3fab51-1c88-47cd-906b-c0e1addc8acb",
  "source": "/AmazonECS/FargateApiStack-FargateApiCluster43BE6E01-LiAxKjmocXNe/b69324230da743c6afc05a018b7d8fac",
  "specversion": "1.0",
  "type": "FargateAPIApp.Features.Orders.CreateOrderCommand",
  "time": "2024-10-01T07:34:56.1989362+00:00",
  "data": "{\u0022Order\u0022:{\u0022OrderId\u0022:\u0022string\u0022,\u0022CustomerId\u0022:\u0022string\u0022,\u0022OrderDate\u0022:\u00222024-10-01T04:50:33.193Z\u0022,\u0022OrderItems\u0022:[{\u0022sku\u0022:0,\u0022productName\u0022:\u0022string\u0022,\u0022quantity\u0022:0,\u0022price\u0022:0,\u0022discount\u0022:0}]}}"
}

Regression Issue

Expected Behavior

Message is pulled from SQS

Current Behavior

Exception occurs:

fail: AWS.Messaging.Serialization.EnvelopeSerializer[0]
      Failed to create a MessageEnvelope
      System.IO.InvalidDataException: _messageConfiguration does not have a valid subscriber mapping for messageTypeIdentifier 'FargateAPIApp.Features.Orders.CreateOrderCommand'
         at AWS.Messaging.Serialization.EnvelopeSerializer.ConvertToEnvelopeAsync(Message sqsMessage)

Reproduction Steps

Code contained in description

Possible Solution

No response

Additional Information/Context

No response

AWS.Messaging (or related) package versions

AWS Messaging framework 0.9.2

Targeted .NET Platform

.NET 8

Operating System and version

Windows 10

ashishdhingra commented 3 days ago

Needs reproduction.