pablocastilla / NServiceBus.Kafka

Kafka transport for NServiceBus
Apache License 2.0
19 stars 8 forks source link

JSON parsing error #16

Closed silkfire closed 7 years ago

silkfire commented 7 years ago

I have a simple application set up to test NServiceBus with Kafka. Finally set up ZooKeeper and Kakfa correctly.

But when running my .NET application I'm getting an error about that it couldn't parse the JSON correctly. What gives? See screenshot.

http://imgur.com/a/wD50W

pablocastilla commented 7 years ago

Hi!

I have seen that error from time to time and I am not able to reproduce it consistently.

Could you please send me the class and values you are using in it to reproduce the problem? That way I will be able to fix it right away.

Thanks

silkfire commented 7 years ago

Sure where would you like me to send the class file?

pablocastilla commented 7 years ago

I think you can attach it here. You can drag a drop it

silkfire commented 7 years ago

It doesn't look like .cs files are supported, but I can paste the code here instead:

namespace Client
{
    using Shared;
    using Shared.Messages.Commands;

    using NServiceBus;
    using NServiceBus.Transport.Kafka;

    using System;
    using System.Runtime.CompilerServices;
    using System.Threading.Tasks;

    public class Program
    {
        public static void Main()
        {
            RuntimeHelpers.RunClassConstructor(typeof(Startup).TypeHandle);

            AsyncMain().GetAwaiter().GetResult();
        }

        private static async Task AsyncMain()
        {
            Console.Title = "Client";

            var endpointConfiguration = new EndpointConfiguration("Client");
            endpointConfiguration.UseTransport<KafkaTransport>().ConnectionString("127.0.0.1:9092");

            endpointConfiguration.SendFailedMessagesTo("error");

            //endpointConfiguration.UseSerialization<JsonSerializer>();

            //endpointConfiguration.EnableInstallers();

            endpointConfiguration.UsePersistence<InMemoryPersistence>();

            // Initialize the endpoint with the finished configuration
            var endpointInstance = await Endpoint.Start(endpointConfiguration).ConfigureAwait(false);

            try
            {
                await SendOrder(endpointInstance);
            }
            finally
            {
                await endpointInstance.Stop().ConfigureAwait(false);
            }
        }

        private static async Task SendOrder(IEndpointInstance endpointInstance)
        {
            Console.WriteLine("Press ENTER to send a message");
            Console.WriteLine("Press any other key to exit");

            while (true)
            {
                var key = Console.ReadKey();
                Console.WriteLine();

                if (key.Key != ConsoleKey.Enter)
                {
                    return;
                }

                var id = Guid.NewGuid();

                var placeOrder = new PlaceOrderCommand
                {
                    Product = "New shoes",
                    Id = id
                };

                await endpointInstance.Send("Server", placeOrder);

                Console.WriteLine($"Sent a PlaceOrder message with id: {id}");
            }
        }
    }
}
pablocastilla commented 7 years ago

Hi!

I don't have all your code, but I have tested with this and it works. Could you send me the PlaceOrderCommand?

My code:

using NServiceBus;
using NServiceBus.Transport.Kafka;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Runtime.CompilerServices;
using System.Text;
using System.Threading.Tasks;

namespace KafkaBugGithub16
{
    public class PlaceOrderCommand : ICommand
    {
        public Guid Id { get; set; }

        public string Product { get; set; }

    }

    public class Program
    {
        public static void Main()
        {

            Console.Title = "Client";

            var endpointConfiguration = new EndpointConfiguration("Client");
            endpointConfiguration.UseTransport<KafkaTransport>().ConnectionString("127.0.0.1:9092");

            endpointConfiguration.SendFailedMessagesTo("error");

            //endpointConfiguration.UseSerialization<JsonSerializer>();

            //endpointConfiguration.EnableInstallers();

            endpointConfiguration.UsePersistence<InMemoryPersistence>();

            // Initialize the endpoint with the finished configuration
            var endpointInstance = Endpoint.Start(endpointConfiguration).Result;

            try
            {
                SendOrder(endpointInstance).Wait();
            }
            finally
            {
                endpointInstance.Stop().Wait();
            }
        }

        private static async Task SendOrder(IEndpointInstance endpointInstance)
        {
            Console.WriteLine("Press ENTER to send a message");
            Console.WriteLine("Press any other key to exit");

            while (true)
            {
                var key = Console.ReadKey();
                Console.WriteLine();

                if (key.Key != ConsoleKey.Enter)
                {
                    return;
                }

                var id = Guid.NewGuid();

                var placeOrder = new PlaceOrderCommand
                {
                    Product = "New shoes",
                    Id = id
                };

                await endpointInstance.Send("Server", placeOrder);

                Console.WriteLine($"Sent a PlaceOrder message with id: {id}");
            }
        }
    }
}
pablocastilla commented 7 years ago

@silkfire Hi! can I close the issue?

Thanks

silkfire commented 7 years ago

@pablocastilla Yeah sure go ahead