fsprojects / pulsar-client-dotnet

Apache Pulsar native client for .NET (C#/F#/VB)
MIT License

Schema expects strings to be nullable #275

Closed. balticore closed this issue 1 month ago

balticore commented 1 month ago

When a producer is created with a schema, the client seems to expect string fields to be nullable even though the class does not declare them as nullable.

An example

Uploading a basic schema to a Pulsar topic through the admin API:

Raw

{
 "type": "JSON",
 "schema": "{\"type\":\"record\",\"name\":\"UserSignedUp\",\"namespace\":\"Producer\",\"doc\":\"test\",\"fields\":[{\"name\":\"userId\",\"type\":\"int\"},{\"name\":\"userEmail\",\"type\":\"string\"}]}",
 "properties": {}
}

Formatted

{
  "type": "JSON",
  "schema": {
    "type": "record",
    "name": "UserSignedUp",
    "namespace": "Producer",
    "doc": "test",
    "fields": [
      {
        "name": "userId",
        "type": "int"
      },
      {
        "name": "userEmail",
        "type": "string"
      }
    ]
  },
  "properties": {}
}

Creating producer and sending messages:

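using System;
using System.Threading;
using System.Threading.Tasks;
using Pulsar.Client.Api;
using Pulsar.Client.Common;

// ISettings and AuthFactory are application types that are not shown in this issue.
namespace Producer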
{
    public class UserSignedUp
    {
        public int userId { get; set; }
        public string userEmail { get; set; }
    }   

    public static class SchemaExample
    {
        public static async Task RunSchemaProducer(IProducer<UserSignedUp> producer)
        {
            var cts = new CancellationTokenSource();

            await Task.Run(async () =>
            {
                while (!cts.IsCancellationRequested)
                {
                    var schemaExample = new UserSignedUp
                    {
                        userId = 123, 
                        userEmail = "foo",
                    };

                    try
                    {
                        var messageId = await SendMessage(producer, schemaExample);
                        Console.WriteLine($"Sent message with ID {messageId}");
                    }
                    catch (Exception e)
                    {
                        Console.WriteLine($"Threw exception sending message: {e.Message}");
                    }

                    await Task.Delay(1000, cts.Token);
                }
            }, cts.Token);
        }

        public static async Task<PulsarClient> CreateClient(ISettings configuration)
        {
            var authentication = await AuthFactory.GenerateAuthentication(configuration);
            var client = await new PulsarClientBuilder()
                .ServiceUrl($"pulsar+ssl://{configuration.PulsarServerUrl}:{configuration.PulsarPort}")
                .Authentication(authentication)
                .BuildAsync();
            return client;
        }

        public static async Task<IProducer<T>> CreateSchemaProducer<T>(ISettings configuration)
        {
            var client = await CreateClient(configuration);

            var topic =
                $"persistent://{configuration.PulsarTenant}/{configuration.PulsarNamespace}/{configuration.PulsarTopicName}";
            var producer = await client.NewProducer(Schema.JSON<T>())
                .ProducerName($"{configuration.AzureClientId}-Producer")
                .Topic(topic)
                .CreateAsync();

            return producer;
        }

        public static async Task<MessageId> SendMessage<T>(IProducer<T> producer, T message)
        {
            return await producer.SendAsync(message);
        }
    }
}

This throws

Unhandled exception. Pulsar.Client.Api.IncompatibleSchemaException: org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException: Schema not found and schema auto updating is disabled. caused by org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException: Schema not found and schema auto updating is disabled.

That does make sense: if, instead of uploading the schema through the Pulsar admin REST API, isAllowAutoUpdateSchema is set to true and the schema is uploaded by the same code when the producer is created, the admin endpoint returns:

Raw

{
    "version": 64,
    "type": "JSON",
    "timestamp": 1723629634102,
    "data": "{\"namespace\":\"Producer\",\"name\":\"UserSignedUp\",\"type\":\"record\",\"fields\":[{\"name\":\"userId\",\"type\":\"int\"},{\"name\":\"userEmail\",\"type\":[\"null\",\"string\"],\"default\":null}]}",
    "properties": {}
}

Formatted

{
  "version": 64,
  "type": "JSON",
  "timestamp": 1723629634102,
  "data": {
    "namespace": "Producer",
    "name": "UserSignedUp",
    "type": "record",
    "fields": [
      {
        "name": "userId",
        "type": "int"
      },
      {
        "name": "userEmail",
        "type": [
          "null",
          "string"
        ],
        "default": null
      }
    ]
  },
  "properties": {}
}

Note that userEmail now has type ["null", "string"] and a default of null, although it is not defined as nullable in the class.

One more note: when the schema autogenerated by the client library is fetched from the admin endpoint and re-uploaded through the admin endpoint (so string types are nullable), producing works as expected even with auto-updates disabled. This suggests that the current client library does expect strings to be nullable, unless something else is causing this.
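To compare the uploaded definition with the one the client generates, it can help to list which fields are declared as a union containing "null". The following is only a minimal sketch using System.Text.Json; `SchemaInspector`, `IsNullable` and `PrintFieldNullability` are hypothetical helper names, not part of pulsar-client-dotnet, and the input is assumed to be the unescaped Avro record definition (the "schema" or "data" value shown above).

```csharp
using System;
using System.Linq;
using System.Text.Json;

// Hypothetical helper, not part of pulsar-client-dotnet.
public static class SchemaInspector
{
    // True when an Avro field type is a union (JSON array) that includes "null".
    public static bool IsNullable(JsonElement fieldType) =>
        fieldType.ValueKind == JsonValueKind.Array &&
        fieldType.EnumerateArray().Any(t =>
            t.ValueKind == JsonValueKind.String && t.GetString() == "null");

    // Prints one line per top-level field of an Avro record definition.
    public static void PrintFieldNullability(string avroSchemaJson)
    {
        using var doc = JsonDocument.Parse(avroSchemaJson);
        foreach (var field in doc.RootElement.GetProperty("fields").EnumerateArray())
        {
            var name = field.GetProperty("name").GetString();
            var nullable = IsNullable(field.GetProperty("type"));
            Console.WriteLine($"{name}: {(nullable ? "nullable" : "not nullable")}");
        }
    }
}
```

Running `PrintFieldNullability` on both definitions should make the difference in userEmail visible at a glance.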

This behavior does not occur in, for example, the Go client library. Is it intended?

When schemas are managed through the admin REST API, they need to be compatible with multiple client libraries, so it would be great if the client's schema matched exactly what was uploaded through the admin endpoint. In this case, a field that is not nullable in the uploaded schema should be accepted when creating the producer; a field that is defined as nullable should be checked for explicitly.

balticore commented 1 month ago

This is not an issue: the class was missing the [Required] annotation, which is what makes fields non-nullable.
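For anyone landing here with the same problem, the fix described above might look like the sketch below. It assumes that the [Required] annotation in question is `System.ComponentModel.DataAnnotations.RequiredAttribute`; the comment above does not name the namespace, and the resulting schema has not been verified here.

```csharp
using System.ComponentModel.DataAnnotations;

namespace Producer
{
    public class UserSignedUp
    {
        // Value type: already generated as a plain "int" in the schema above.
        public int userId { get; set; }

        // Assumption: [Required] here is
        // System.ComponentModel.DataAnnotations.RequiredAttribute.
        // Per the comment above, it marks the field as non-nullable.
        [Required]
        public string userEmail { get; set; }
    }
}
```

With the attribute in place, the schema generated by `Schema.JSON<UserSignedUp>()` would be expected to declare userEmail as a plain "string", matching the definition uploaded through the admin API.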