apache / pulsar-client-python

Apache Pulsar Python client library
https://pulsar.apache.org/
Apache License 2.0

Python Avro consumer cannot consume non-union fields #108

Closed: BewareMyPower closed this issue 1 year ago.

BewareMyPower commented 1 year ago

How to reproduce

Run Pulsar 2.11 in standalone mode.

First, create a Python consumer whose schema is a class with a string field name and an integer field age:

import pulsar
from pulsar.schema import *
import fastavro

class User(Record):
    name = String()
    age = Integer()

print(fastavro.parse_schema(User.schema()))

client = pulsar.Client('pulsar://localhost:6650')
consumer = client.subscribe('my-topic',
                            subscription_name="sub",
                            schema=AvroSchema(User))

while True:
    try:
        msg = consumer.receive()
        version = int.from_bytes(msg.schema_version().encode('ascii'), byteorder='big')
        print(f"Received {len(msg.data())} bytes id='{msg.message_id()}' version='{version}'")
        value = msg.value()
        print(value)
        print(f"name: {value.name}, age: {value.age}")
        consumer.acknowledge(msg)
    except pulsar.Interrupted:
        print("Stop receiving messages")
        break

client.close()

Then, set the schema compatibility strategy of the public/default namespace to FORWARD:

curl -L http://localhost:8080/admin/v2/namespaces/public/default/schemaCompatibilityStrategy \
   -X PUT -H 'Content-Type: application/json' -d '"FORWARD"'

NOTE: The schema compatibility strategy must be set here; otherwise the Java producer cannot be created. That is a separate bug, which I will discuss later.
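If you script the setup in Python rather than with curl, the same PUT request can be built with the standard library's urllib.request. This is a minimal sketch that assumes, like the curl command, that the admin REST API of the standalone broker listens on localhost:8080; the function name build_compatibility_request is hypothetical. The request is only built here; sending it needs a running broker.

```python
import json
import urllib.request

# Assumption: same admin endpoint as the curl command (standalone on localhost:8080).
ADMIN_URL = ("http://localhost:8080/admin/v2/namespaces/public/default/"
             "schemaCompatibilityStrategy")


def build_compatibility_request(strategy: str) -> urllib.request.Request:
    """Build the PUT request from the curl command without sending it."""
    body = json.dumps(strategy).encode("utf-8")  # the JSON string '"FORWARD"'
    return urllib.request.Request(
        ADMIN_URL,
        data=body,
        method="PUT",
        headers={"Content-Type": "application/json"},
    )


if __name__ == "__main__":
    req = build_compatibility_request("FORWARD")
    # To actually send it against a running broker:
    # with urllib.request.urlopen(req) as resp:
    #     print(resp.status)
    print(req.get_method(), req.full_url, req.data)
```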

Then, run the Java producer to send a message (User{name="xyz", age=10}):

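import lombok.AllArgsConstructor;
import lombok.Cleanup;
import lombok.Getter;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;

public class JavaProducer {
    @AllArgsConstructor
    @Getter
    public static class User {
        private final String name;
        private final int age;
    }
    public static void main(String[] args) throws Exception {
        @Cleanup PulsarClient client = PulsarClient.builder().serviceUrl("pulsar://172.22.48.50:6650").build();
        Producer<User> producer = client.newProducer(Schema.AVRO(User.class)).topic("my-topic").create();
        producer.send(new User("xyz", 10));
    }
}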
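To see what the Java producer actually puts on the wire, here is a minimal pure-Python sketch of Avro binary encoding under the Java schema shown later in this issue (age as a plain int first, then name as a ["null", "string"] union). The helper names are hypothetical and this is not the Pulsar or fastavro API; it only follows the Avro binary rules (zigzag varints for int/long, a branch index before a union value, length-prefixed strings). It reproduces the 6-byte payload reported in the consumer log.

```python
def zigzag_varint(n: int) -> bytes:
    """Encode an Avro int/long: zigzag, then base-128 varint."""
    z = (n << 1) ^ (n >> 63)  # zigzag maps small negatives to small positives
    out = bytearray()
    while z & ~0x7F:
        out.append((z & 0x7F) | 0x80)  # 7 payload bits plus a continuation bit
        z >>= 7
    out.append(z)
    return bytes(out)


def encode_string(s: str) -> bytes:
    """Avro string: length as a long, then the UTF-8 bytes."""
    data = s.encode("utf-8")
    return zigzag_varint(len(data)) + data


def encode_java_user(name, age: int) -> bytes:
    """Encode User with the Java writer schema: age (int), then name (["null","string"])."""
    buf = zigzag_varint(age)  # field 1: age, no union index
    if name is None:
        buf += zigzag_varint(0)  # union branch 0: "null", no payload
    else:
        buf += zigzag_varint(1) + encode_string(name)  # union branch 1: "string"
    return buf


payload = encode_java_user("xyz", 10)
print(len(payload), payload.hex())  # 6 14020678797a
```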

The Python consumer then crashes with the following output:

Received 6 bytes id='(1,0,-1,-1)' version='1'
Traceback (most recent call last):
  File "consumer.py", line 42, in <module>
    value = msg.value()
  File "/home/xyz/pulsar-client-python/pulsar/__init__.py", line 130, in value
    return self._schema.decode(self._message.data())
  File "/home/xyz/pulsar-client-python/pulsar/schema/schema_avro.py", line 80, in decode
    d = fastavro.schemaless_reader(buffer, self._schema)
  File "fastavro/_read.pyx", line 1107, in fastavro._read.schemaless_reader
  File "fastavro/_read.pyx", line 1120, in fastavro._read.schemaless_reader
  File "fastavro/_read.pyx", line 749, in fastavro._read._read_data
  File "fastavro/_read.pyx", line 620, in fastavro._read.read_record
  File "fastavro/_read.pyx", line 740, in fastavro._read._read_data
  File "fastavro/_read.pyx", line 521, in fastavro._read.read_union
IndexError: list index out of range
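The IndexError can be reproduced without Pulsar. The traceback shows that the client calls fastavro.schemaless_reader(buffer, self._schema), i.e. it decodes with the Python schema only. This minimal pure-Python sketch (hypothetical helper names, hand-written Avro decoding rather than fastavro) reads the 6 bytes written by the Java producer using the Python field layout, name (["null", "string"]) first, then age (["null", "int"]). The first value on the wire is age=10, so it is taken as the union branch index of name.

```python
import io


def read_long(buf: io.BytesIO) -> int:
    """Decode one Avro int/long: base-128 varint, then undo the zigzag."""
    shift, z = 0, 0
    while True:
        b = buf.read(1)[0]
        z |= (b & 0x7F) << shift
        if not b & 0x80:
            break
        shift += 7
    return (z >> 1) ^ -(z & 1)


def read_string(buf: io.BytesIO) -> str:
    """Avro string: length as a long, then UTF-8 bytes."""
    return buf.read(read_long(buf)).decode("utf-8")


def read_nullable(buf: io.BytesIO, branches: list, read_value):
    index = read_long(buf)    # a union value starts with its branch index
    branch = branches[index]  # IndexError when the index is out of range
    return None if branch == "null" else read_value(buf)


def read_python_user(data: bytes) -> dict:
    """Decode with the Python schema: name (["null","string"]), then age (["null","int"])."""
    buf = io.BytesIO(data)
    name = read_nullable(buf, ["null", "string"], read_string)
    age = read_nullable(buf, ["null", "int"], read_long)
    return {"name": name, "age": age}


java_payload = b"\x14\x02\x06xyz"  # age=10, name="xyz" as written by the Java producer
try:
    read_python_user(java_payload)
except IndexError as exc:
    print("IndexError:", exc)  # age 10 was read as union branch index 10
```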

Analysis

There are two bugs. First, the schema definition generated by the Python client differs from the one generated by the Java client. Here are the two classes again:

Python:

class User(Record):
    name = String()
    age = Integer()

Java:

@AllArgsConstructor
@Getter
static class User {
    private final String name;
    private final int age;
}

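Checking the schema definitions with pulsar-admin schemas get my-topic -v <version> shows two versions of the schema: version 0, registered by the Python consumer, and version 1, registered by the Java producer (the version reported in the consumer log above):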

{
  "name": "my-topic",
  "schema": {
    "type": "record",
    "name": "User",
    "fields": [
      {
        "name": "name",
        "type": [
          "null",
          "string"
        ]
      },
      {
        "name": "age",
        "type": [
          "null",
          "int"
        ]
      }
    ]
  },
  "type": "AVRO",
  "timestamp": 1680074000129,
  "properties": {}
}
{
  "name": "my-topic",
  "schema": {
    "type": "record",
    "name": "User",
    "namespace": "org.apache.pulsar.client.api.ConsumerIdTest",
    "fields": [
      {
        "name": "age",
        "type": "int"
      },
      {
        "name": "name",
        "type": [
          "null",
          "string"
        ]
      }
    ]
  },
  "type": "AVRO",
  "timestamp": 1680074158624,
  "properties": {
    "__alwaysAllowNull": "true",
    "__jsr310ConversionEnabled": "false"
  }
}

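We can see two differences:

- Nullability: the Java schema declares age as a plain "int", since a Java int primitive can never be null, while the Python schema declares it as the union ["null", "int"]. Both schemas declare name as ["null", "string"].
- Field order: the Java schema lists age before name (alphabetical order), while the Python schema keeps the declaration order, name before age.

Since the Python client decodes each message with its own schema only (fastavro.schemaless_reader(buffer, self._schema) in the traceback), it reads the first encoded value, age=10, as the union branch index of name. The ["null", "string"] union has no branch 10, hence the IndexError.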
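The two relevant differences can also be checked mechanically. This is a minimal sketch, not a full Avro compatibility check: it compares only field order and per-field nullability, which are what matter when a payload is decoded without the writer schema. The field lists are copied from the two schema versions above; describe and schema_differences are hypothetical helper names.

```python
# Field lists copied from schema version 0 (Python) and version 1 (Java) above.
python_fields = [
    {"name": "name", "type": ["null", "string"]},
    {"name": "age", "type": ["null", "int"]},
]
java_fields = [
    {"name": "age", "type": "int"},
    {"name": "name", "type": ["null", "string"]},
]


def describe(fields):
    """Return (field order, {field: nullable?}) for a list of Avro record fields."""
    order = [f["name"] for f in fields]
    nullable = {
        f["name"]: isinstance(f["type"], list) and "null" in f["type"]
        for f in fields
    }
    return order, nullable


def schema_differences(reader, writer):
    """List the order and nullability mismatches between two field lists."""
    r_order, r_null = describe(reader)
    w_order, w_null = describe(writer)
    diffs = []
    if r_order != w_order:
        diffs.append(f"field order: {r_order} vs {w_order}")
    for name, nullable in r_null.items():
        if name in w_null and nullable != w_null[name]:
            diffs.append(f"{name}: nullable={nullable} vs nullable={w_null[name]}")
    return diffs


for line in schema_differences(python_fields, java_fields):
    print(line)
```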

BewareMyPower commented 1 year ago

A workaround is to change the Python User definition from

class User(Record):
    name = String()
    age = Integer()

to

class User(Record):
    _sorted_fields = True
    name = String()
    age = Integer(required=True)

To be compatible with the Java client, we have to set required=True on every field except String fields, and set _sorted_fields to True.
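Assuming the workaround makes the Python client generate the same field layout as the Java schema (age as a plain int first, then name as ["null", "string"]), a decoder following that layout reads the Java payload correctly. The sketch below is a hand-written pure-Python decoder for that assumed layout, not the client's actual decoding path; read_sorted_user is a hypothetical name.

```python
import io


def read_long(buf: io.BytesIO) -> int:
    """Decode one Avro int/long (zigzag base-128 varint)."""
    shift = z = 0
    while True:
        b = buf.read(1)[0]
        z |= (b & 0x7F) << shift
        if not b & 0x80:
            break
        shift += 7
    return (z >> 1) ^ -(z & 1)


def read_sorted_user(data: bytes) -> dict:
    """Decode with the assumed workaround layout: age (int), then name (["null","string"])."""
    buf = io.BytesIO(data)
    age = read_long(buf)  # required Integer: no union branch index
    if read_long(buf) == 0:  # union branch 0: "null", no payload
        name = None
    else:  # otherwise branch 1, "string": length, then UTF-8 bytes
        name = buf.read(read_long(buf)).decode("utf-8")
    return {"name": name, "age": age}


print(read_sorted_user(b"\x14\x02\x06xyz"))  # {'name': 'xyz', 'age': 10}
```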