apache / pulsar

Apache Pulsar - distributed pub-sub messaging system
https://pulsar.apache.org/
Apache License 2.0

[Bug] Incompatible schema when having Avro schema without nullable string fields #18501

Status: Open. trojczak opened this issue 2 years ago.

trojczak commented 2 years ago


Version

Using Pulsar 2.9.1.

Minimal reproduce step

Here is the project with the minimal reproducible example that leads to this problem: https://github.com/trojczak/pulsar-avro-schema-problem

Steps to reproduce:

  1. Create the topics and schemas described in README.md, using Person.avsc from the repo as the topic's schema.
  2. Run the PersonFunction class with LocalRunner from PersonFunctionTest.
  3. Observe the IncompatibleSchemaException quoted in full under "What did you see instead?".

What did you expect to see?

I expect there to be a way to make the Person.avsc schema work with the given function.

What did you see instead?

I get the following exception:

Caused by: org.apache.pulsar.client.api.PulsarClientException$IncompatibleSchemaException: {"errorMsg":"org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException: org.apache.avro.SchemaValidationException: Unable to read schema: 
{
  "type" : "record",
  "name" : "Person",
  "namespace" : "pl.trojczak.pocs.model",
  "fields" : [ {
    "name" : "birthYear",
    "type" : "int"
  }, {
    "name" : "name",
    "type" : [ "null", "string" ],
    "default" : null
  } ]
}
using schema:
{
  "type" : "record",
  "name" : "Person",
  "namespace" : "pl.trojczak.pocs.model",
  "fields" : [ {
    "name" : "name",
    "type" : "string"
  }, {
    "name" : "birthYear",
    "type" : "int"
  } ]
}


nlu90 commented 1 year ago

Hi @trojczak

The issue is caused by a difference in the type of the name field in your example.

In your avsc file, you have:

{
  "name" : "name",
  "type" : "string"
}

In the schema that Pulsar generates from the Person class, however, it is:

{
  "name" : "name",
  "type" : [ "null", "string" ],
  "default" : null
}

The "type" : [ "null", "string" ] union allows a null value for the name field. For details, see the Avro documentation: https://avro.apache.org/docs/1.10.1/spec.html#Unions

Could you change your avsc file so the two match? Or is there a specific reason you don't want to make that change?
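A small model shows why the two "name" types clash. The sketch below is a hypothetical, JDK-only simplification of Avro's resolution rule for a single field. It ignores type promotions and named types, and the names AvroNullabilitySketch, FieldType and canRead are made up for illustration. Under this rule, a reader type can read a writer type only if every branch the writer may produce has a matching branch in the reader. A nullable reader therefore accepts non-null data, but a non-nullable reader cannot accept data written as [ "null", "string" ]. This is consistent with the exception text above, which reports that the nullable schema cannot be read using the non-nullable one.

```java
import java.util.List;

// Hypothetical, simplified model of Avro schema resolution for one field.
// Assumption: only primitive branches are modelled; type promotions
// (e.g. int -> long) and named types are deliberately ignored.
class AvroNullabilitySketch {
    // A field type as a list of primitive branches:
    // ["string"] for a plain type, ["null", "string"] for a nullable union.
    record FieldType(List<String> branches) {
        static FieldType of(String primitive) {
            return new FieldType(List.of(primitive));
        }

        static FieldType union(String... primitives) {
            return new FieldType(List.of(primitives));
        }
    }

    // The reader can read the writer only if every branch the writer
    // may produce matches some branch of the reader.
    static boolean canRead(FieldType reader, FieldType writer) {
        return writer.branches().stream().allMatch(reader.branches()::contains);
    }

    public static void main(String[] args) {
        FieldType nonNull = FieldType.of("string");             // "type" : "string"
        FieldType nullable = FieldType.union("null", "string"); // "type" : [ "null", "string" ]

        System.out.println(canRead(nullable, nonNull)); // true
        System.out.println(canRead(nonNull, nullable)); // false: the writer's "null" branch has no match
    }
}
```

The check is asymmetric by design. Widening a field to nullable is safe for readers that already expect the union, but narrowing it is not. That asymmetry is why the two schemas in this issue are rejected as incompatible.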

trojczak commented 1 year ago

Hi, @nlu90! The schema in the Person.avsc file is the one we want for our topic. It shouldn't allow null fields. The second schema, with the nullable name field, is the one Pulsar generates when running the function.

We want some fields to be non-nullable, and I think there should be a way to make Pulsar recognize this requirement. Maybe there is one and I just don't know about it? If not, I consider this a bug, or at least a missing and fairly important capability.

nlu90 commented 1 year ago

@trojczak I think Pulsar schemas allow null values by default.

@codelipenghui Could you share whether there's a way in Pulsar Schema to specify a non-nullable field?

JacekWislicki commented 1 year ago

Just a few words on what we noticed:

When defining the schema for a Pulsar client, this code produces a schema in which every non-primitive field is nullable:

Schema.AVRO(MyClass.class)

However, the Avro API supports non-nullable fields, and the above can be written as:

DefaultImplementation.getDefaultImplementation().newAvroSchema(SchemaDefinition.builder().withAlwaysAllowNull(false).withPojo(MyClass.class).build())

That said, there does not seem to be any configuration to enforce "alwaysAllowNull=false" in Pulsar functions/sinks.

Additionally, "pulsar-admin schemas extract" has an --always-allow-null flag that controls whether the extracted schema allows nulls, so it can generate a schema with non-nullable fields.
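The following is a hypothetical, JDK-only sketch of what an alwaysAllowNull switch does to a schema generated from a POJO. It is not Pulsar's implementation. As we understand it, Pulsar delegates to Avro's reflection support and uses ReflectData.AllowNull when alwaysAllowNull is true. The names NullabilitySchemaSketch and schemaFor, and the Person POJO, are made up for illustration. The sketch handles only int, long and String fields. It reproduces the field shapes quoted in this issue: a primitive int stays "int", while a String becomes [ "null", "string" ] with a null default unless the switch is off. On the client side, the same SchemaDefinition can, as far as we know, also be passed to the public Schema.AVRO(SchemaDefinition) overload instead of going through DefaultImplementation.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical sketch, NOT Pulsar's or Avro's code: shows how an
// alwaysAllowNull flag changes the Avro field types derived from a POJO.
class NullabilitySchemaSketch {
    // Hypothetical POJO mirroring the Person record from this issue.
    static class Person {
        int birthYear;
        String name;
    }

    // Assumed minimal Java-to-Avro type mapping (only what the example needs).
    private static final Map<Class<?>, String> AVRO_TYPES = Map.of(
            int.class, "int", Integer.class, "int",
            long.class, "long", Long.class, "long",
            String.class, "string");

    // Renders one field as an Avro field JSON object.
    static String fieldJson(Field f, boolean alwaysAllowNull) {
        String avroType = AVRO_TYPES.get(f.getType());
        if (avroType == null) {
            throw new IllegalArgumentException("unsupported type: " + f.getType());
        }
        // Java primitives can never hold null, so only reference types become unions.
        if (alwaysAllowNull && !f.getType().isPrimitive()) {
            return "{\"name\":\"" + f.getName() + "\",\"type\":[\"null\",\""
                    + avroType + "\"],\"default\":null}";
        }
        return "{\"name\":\"" + f.getName() + "\",\"type\":\"" + avroType + "\"}";
    }

    // Builds a record schema from the POJO's instance fields.
    // Note: getDeclaredFields() does not guarantee field order.
    static String schemaFor(Class<?> pojo, boolean alwaysAllowNull) {
        List<String> fields = new ArrayList<>();
        for (Field f : pojo.getDeclaredFields()) {
            if (Modifier.isStatic(f.getModifiers()) || f.isSynthetic()) {
                continue; // skip constants and compiler-generated fields
            }
            fields.add(fieldJson(f, alwaysAllowNull));
        }
        return "{\"type\":\"record\",\"name\":\"" + pojo.getSimpleName()
                + "\",\"fields\":[" + String.join(",", fields) + "]}";
    }

    public static void main(String[] args) {
        System.out.println(schemaFor(Person.class, true));  // name is [ "null", "string" ]
        System.out.println(schemaFor(Person.class, false)); // name is plain "string"
    }
}
```

With the flag off, the generated name field matches the non-nullable definition in Person.avsc, which is the behaviour the issue asks to be able to select for functions and sinks.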

github-actions[bot] commented 1 year ago

The issue had no activity for 30 days, mark with Stale label.