fsprojects / pulsar-client-dotnet

Apache Pulsar native client for .NET (C#/F#/VB)
MIT License

How to use Logical types in Avro schema? #211

Closed RobertIndie closed 1 year ago

RobertIndie commented 2 years ago

I want to use logical types in the Avro schema, but it seems that Avro C# doesn't support them. Sending a message throws the following exception:

Avro.AvroException: Unknown schema type: Logical in field DecimalVar
 ---> Avro.AvroException: Unknown schema type: Logical
   at Avro.Reflect.ReflectDefaultWriter.Matches(Schema sc, Object obj)
   at Avro.Specific.SpecificDefaultWriter.WriteUnion(UnionSchema us, Object value, Encoder encoder)
   at Avro.Generic.DefaultWriter.Write(Schema schema, Object value, Encoder encoder)
   at Avro.Reflect.ReflectDefaultWriter.WriteRecord(RecordSchema schema, Object value, Encoder encoder)
   --- End of inner exception stack trace ---
   at Avro.Reflect.ReflectDefaultWriter.WriteRecord(RecordSchema schema, Object value, Encoder encoder)
   at Avro.Generic.DefaultWriter.Write(Schema schema, Object value, Encoder encoder)
   at Avro.Generic.DefaultWriter.Write[T](T value, Encoder encoder)
   at Avro.Reflect.ReflectWriter`1.Write(T value, Encoder encoder)
   at Pulsar.Client.Schema.AvroSchema`1.Encode(T value) in /Users/aaronrobert/codebase/pulsar-client-dotnet/src/Pulsar.Client/Schema/AvroSchema.fs:line 47
   at <StartupCode$Pulsar-Client>.$ProducerImpl.NewMessage@815.Invoke(Unit unitVar0) in /Users/aaronrobert/codebase/pulsar-client-dotnet/src/Pulsar.Client/Internal/ProducerImpl.fs:line 823
   at Pulsar.Client.Api.ProducerImpl`1.NewMessage[T](FSharpOption`1 keyValueProcessor, ISchema`1 schema, T value, String key, IReadOnlyDictionary`2 properties, Nullable`1 deliverAt, Nullable`1 sequenceId, Byte[] keyBytes, Byte[] orderingKey, Nullable`1 eventTime, Transaction txn, IEnumerable`1 replicationClusters) in /Users/aaronrobert/codebase/pulsar-client-dotnet/src/Pulsar.Client/Internal/ProducerImpl.fs:line 803
   at Pulsar.Client.Api.ProducerImpl`1.Pulsar.Client.Api.IProducer<'T>.NewMessage(T value, String key, IReadOnlyDictionary`2 properties, Nullable`1 deliverAt, Nullable`1 sequenceId, Byte[] keyBytes, Byte[] orderingKey, Nullable`1 eventTime, Transaction txn, IEnumerable`1 replicationClusters)
   at Pulsar.Client.Api.ProducerImpl`1.Pulsar.Client.Api.IProducer<'T>.SendAsync(T message)
   at CsharpExamples.SchemaExample.RunSchema() in /Users/aaronrobert/codebase/pulsar-client-dotnet/examples/CsharpExamples/Schema.cs:line 81
   at CsharpExamples.Program.Main(String[] args) in /Users/aaronrobert/codebase/pulsar-client-dotnet/examples/CsharpExamples/Program.cs:line 25
   at CsharpExamples.Program.<Main>(String[] args)

Here is the class I used:

public class Product
{
    public Decimal DecimalVar { get; set; }
}

I then tried to use SpecificDatumWriter to encode the message. The message class is:

    public class Product : ISpecificRecord
    {
        public Decimal DecimalVar { get; set; }
        public object Get(int fieldPos)
        {
            switch (fieldPos)
            {
                case 0: return DecimalVar;
                default: throw new AvroRuntimeException("Bad index " + fieldPos + " in Get()");
            };
        }

        public void Put(int fieldPos, object fieldValue)
        {
            switch (fieldPos)
            {
                case 0: DecimalVar = (Decimal)fieldValue; break;
                default: throw new AvroRuntimeException("Bad index " + fieldPos + " in Put()");
            };
        }

        [Ignore]
        public Avro.Schema Schema { get; }
    }

It threw:

System.NullReferenceException: Object reference not set to an instance of an object.
   at Pulsar.Client.Schema.AvroSchema`1..ctor() in /Users/aaronrobert/codebase/pulsar-client-dotnet/src/Pulsar.Client/Schema/AvroSchema.fs:line 22
   at Pulsar.Client.Api.Schema.AVRO[T]() in /Users/aaronrobert/codebase/pulsar-client-dotnet/src/Pulsar.Client/Api/Schema.fs:line 46
   at CsharpExamples.SchemaExample.RunSchema() in /Users/aaronrobert/codebase/pulsar-client-dotnet/examples/CsharpExamples/Schema.cs:line 70
   at CsharpExamples.Program.Main(String[] args) in /Users/aaronrobert/codebase/pulsar-client-dotnet/examples/CsharpExamples/Program.cs:line 25
   at CsharpExamples.Program.<Main>(String[] args)

The root cause is that the client tries to read a field called "_SCHEMA" from the message class: https://github.com/fsprojects/pulsar-client-dotnet/blob/25e41f28a3f46eae955761c4ad12d9973ae333eb/src/Pulsar.Client/Schema/AvroSchema.fs#L22 Why does it need this field, and what is the correct way to use logical types in an Avro schema?
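The failure mode can be reproduced without Pulsar or Avro. The sketch below assumes that the lookup at the linked line is a plain reflection read of a public static field named "_SCHEMA" that is dereferenced without a null check; that is an assumption, not a copy of the client code. The class names HandWritten, Generated and SchemaFieldLookup are hypothetical, and a string stands in for the Avro.Schema instance.

```csharp
using System;
using System.Reflection;

// Hypothetical message class without the static field, like the hand-written one above.
public class HandWritten
{
    public decimal DecimalVar { get; set; }
}

// Hypothetical class that exposes the field under the expected name.
public class Generated
{
    // A string stands in for the Avro.Schema instance a real class would hold.
    public static string _SCHEMA = "schema placeholder";
    public decimal DecimalVar { get; set; }
}

public static class SchemaFieldLookup
{
    // Unchecked lookup: Type.GetField returns null when no public field
    // with that name exists, so the dereference below fails for such types.
    public static object ReadUnchecked(Type messageType)
    {
        var field = messageType.GetField("_SCHEMA");
        return field.GetValue(null); // null target because the field is static
    }

    public static void Main()
    {
        Console.WriteLine(ReadUnchecked(typeof(Generated)));
        try
        {
            ReadUnchecked(typeof(HandWritten));
        }
        catch (NullReferenceException)
        {
            Console.WriteLine("HandWritten has no _SCHEMA field");
        }
    }
}
```

If the client behaves this way, the NullReferenceException in the constructor says nothing about logical types; it only means the message class lacks the static field.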

Lanayx commented 2 years ago

Hi, it seems that we depend on this field to get the actual schema. The Avro generator emits it in generated classes, so I think you need to add it manually when using hand-written classes. If that doesn't work, you might need to contribute logical type support to Avro itself.
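A minimal sketch of what adding the field by hand could look like, modeled on the shape of avrogen output. It assumes an Apache.Avro version that includes logical type support and the Avro.AvroDecimal type; the precision and scale values, the CsharpExamples namespace in the schema, and the ProductEncoding helper are illustrative choices, not something confirmed in this thread. It has not been verified against Pulsar.Client.

```csharp
using System;
using System.IO;
using Avro;
using Avro.IO;
using Avro.Specific;

namespace CsharpExamples
{
    public class Product : ISpecificRecord
    {
        // Static field that the client looks up by name.
        public static Avro.Schema _SCHEMA = Avro.Schema.Parse(@"{
            ""type"": ""record"",
            ""name"": ""Product"",
            ""namespace"": ""CsharpExamples"",
            ""fields"": [
                { ""name"": ""DecimalVar"",
                  ""type"": { ""type"": ""bytes"", ""logicalType"": ""decimal"",
                              ""precision"": 10, ""scale"": 2 } }
            ]
        }");

        // Instance accessor required by ISpecificRecord; returns the static schema.
        public Avro.Schema Schema { get { return _SCHEMA; } }

        // The Avro C# library represents the decimal logical type as AvroDecimal.
        public AvroDecimal DecimalVar { get; set; }

        public object Get(int fieldPos)
        {
            switch (fieldPos)
            {
                case 0: return DecimalVar;
                default: throw new AvroRuntimeException("Bad index " + fieldPos + " in Get()");
            }
        }

        public void Put(int fieldPos, object fieldValue)
        {
            switch (fieldPos)
            {
                case 0: DecimalVar = (AvroDecimal)fieldValue; break;
                default: throw new AvroRuntimeException("Bad index " + fieldPos + " in Put()");
            }
        }
    }

    // Hypothetical helper that encodes a Product with SpecificDatumWriter only.
    public static class ProductEncoding
    {
        public static byte[] Encode(Product product)
        {
            using (var stream = new MemoryStream())
            {
                var writer = new SpecificDatumWriter<Product>(Product._SCHEMA);
                var encoder = new BinaryEncoder(stream);
                writer.Write(product, encoder);
                encoder.Flush();
                return stream.ToArray();
            }
        }

        public static void Main()
        {
            // 12.34m has scale 2, chosen to match the scale declared in the schema.
            var product = new Product { DecimalVar = new AvroDecimal(12.34m) };
            Console.WriteLine(Encode(product).Length);
        }
    }
}
```

The value is constructed with the same scale as the schema because, as far as I know, the Avro C# decimal conversion rejects values whose scale differs from the declared one. With the client, the same class would be passed as Schema.AVRO<Product>().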

Lanayx commented 2 years ago

By the way, it seems that logical types might already be supported (https://avro.apache.org/docs/1.11.0/api/csharp/html/classAvro_1_1Util_1_1LogicalType.html), but I haven't tried it.

Lanayx commented 1 year ago

Closing, since decimal support was added to AVRO: https://github.com/fsprojects/pulsar-client-dotnet/issues/227