apache / pulsar

Apache Pulsar - distributed pub-sub messaging system
https://pulsar.apache.org/
Apache License 2.0

[Schema]GenericData$Record cannot be cast #5503

Closed tuteng closed 4 years ago

tuteng commented 4 years ago

To Reproduce Steps to reproduce the behavior:

  1. Go to '...'

    @Builder
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public static class Foo2 {
        private String name;
        private Integer id;
    }

    PulsarClient client = PulsarClient.builder()
            .serviceUrl("pulsar://localhost:6650")
            .build();

    AvroSchema<Foo2> schema = AvroSchema.of(SchemaDefinition.<Foo2>builder().withJsonDef("{\n" +
            "    \"type\": \"record\",\n" +
            "    \"name\": \"Test\",\n" +
            "    \"fields\": [\n" +
            "      {\n" +
            "        \"name\": \"id\",\n" +
            "        \"type\": [\n" +
            "          \"null\",\n" +
            "          \"int\"\n" +
            "        ]\n" +
            "      },\n" +
            "      {\n" +
            "        \"name\": \"name\",\n" +
            "        \"type\": [\n" +
            "          \"null\",\n" +
            "          \"string\"\n" +
            "        ]\n" +
            "      }\n" +
            "    ]\n" +
            "  }").withPojo(Foo2.class).build());
    Consumer<Foo2> consumer = client.newConsumer(schema)
            .topic("pulsar-mysql-jdbc-sink-topic")
            .subscriptionName("my-subscription-2")
            .subscribe();

    while (true) {
        // Wait for a message
        Message<Foo2> msg = consumer.receive();
        Foo2 f = msg.getValue();
    }
  2. See error

Exception in thread "main" java.lang.ClassCastException: org.apache.pulsar.shade.org.apache.avro.generic.GenericData$Record cannot be cast to io.streamnative.KeyValueSchemaTest$Foo2
    at io.streamnative.KeyValueSchemaTest.testConsumerByPythonProduce(KeyValueSchemaTest.java:412)
    at io.streamnative.KeyValueSchemaTest.main(KeyValueSchemaTest.java:305)

tuteng commented 4 years ago

@codelipenghui @congbobo184

Jennifer88huang-zz commented 4 years ago

When you find the reason for the bug, could you please open a pull request to fix it? Thank you.

congbobo184 commented 4 years ago

What is the producer's schema?

congbobo184 commented 4 years ago

                    "{\n" +
                    "    \"type\": \"record\",\n" +
                    "    \"name\": \"Test\",\n" +
                    "    \"fields\": [\n" +
                    "      {\n" +
                    "        \"name\": \"id\",\n" +
                    "        \"type\": [\n" +
                    "          \"null\",\n" +
                    "          \"int\"\n" +
                    "        ]\n" +
                    "      },\n" +
                    "      {\n" +
                    "        \"name\": \"name\",\n" +
                    "        \"type\": [\n" +
                    "          \"null\",\n" +
                    "          \"string\"\n" +
                    "        ]\n" +
                    "      }\n" +
                    "    ]\n" +
                    "  }"

You should change the name to Foo2 and add the namespace, so that both match the output of

ReflectData.get().getSchema(Foo2.class).toString()

The namespace should be the one shown in that output.
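
A quick way to catch this mismatch before subscribing is to compare the record name in the JSON definition with the POJO's class name. The following is only a minimal sketch using the JDK alone: `SchemaNameCheck`, `firstValue`, and `matches` are hypothetical helpers, not Pulsar or Avro APIs, the regex-based extraction is naive, and `io.streamnative.Foo2` is an assumed top-level class name (nested classes such as the one in the report are not handled here).

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class SchemaNameCheck {

    /** Returns the first string value found for the given key, or null. Naive: no real JSON parsing. */
    public static String firstValue(String json, String key) {
        Pattern p = Pattern.compile("\"" + Pattern.quote(key) + "\"\\s*:\\s*\"([^\"]*)\"");
        Matcher m = p.matcher(json);
        return m.find() ? m.group(1) : null;
    }

    /** True when namespace + "." + name from the JSON definition equals the expected class name. */
    public static boolean matches(String json, String className) {
        String name = firstValue(json, "name");
        String namespace = firstValue(json, "namespace");
        if (name == null) {
            return false;
        }
        String full = (namespace == null || namespace.isEmpty()) ? name : namespace + "." + name;
        return full.equals(className);
    }

    public static void main(String[] args) {
        // Definition as in the report: name "Test", no namespace.
        String reported = "{\"type\":\"record\",\"name\":\"Test\",\"fields\":"
                + "[{\"name\":\"id\",\"type\":[\"null\",\"int\"]}]}";
        // Definition with a name and namespace that line up with the assumed class.
        String adjusted = "{\"type\":\"record\",\"name\":\"Foo2\",\"namespace\":\"io.streamnative\","
                + "\"fields\":[{\"name\":\"id\",\"type\":[\"null\",\"int\"]}]}";

        System.out.println(matches(reported, "io.streamnative.Foo2")); // false
        System.out.println(matches(adjusted, "io.streamnative.Foo2")); // true
    }
}
```

The check relies on the record's "name" appearing before any field names in the definition, which holds for the schemas shown in this thread but is not guaranteed in general.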

sijie commented 4 years ago

@congbobo184 the question is: if a user specifies a POJO class in the SchemaDefinition, it should return a POJO, not a GenericData$Record. This is something I don't quite understand when looking into this issue. Since you introduced SchemaDefinition, would you mind taking a look at this?

congbobo184 commented 4 years ago

@tuteng You defined the AvroSchema with withJsonDef, so the JSON definition is not null. The current logic generates the schema from the withJsonDef definition first. Because that definition has no namespace and its name is not Foo2, a GenericData$Record is generated. I think we can change this logic to prefer withPojo, or to allow only one of withJsonDef and withPojo.

Antti-Kaikkonen commented 4 years ago

I'm getting the same error with a Pulsar function where the input is a POJO from an Avro-encoded topic. The function works fine if the input topic is JSON-encoded.

codelipenghui commented 4 years ago

The problem is that we should specify the correct namespace and name in the JSON definition. I have written a demo in Java:

package org.apache.pulsar.client.impl;

import org.apache.pulsar.client.api.schema.SchemaDefinition;
import org.apache.pulsar.client.impl.schema.AvroSchema;
import org.testng.Assert;
import org.testng.annotations.Test;

import java.util.Objects;

public class SchemaTest {

    @Test
    public void test() {
        SchemaDefinition<User> sd = SchemaDefinition.<User>builder().withJsonDef("{\n" +
                "    \"type\": \"record\",\n" +
                "    \"name\": \"User\",\n" +
                "    \"namespace\": \"org.apache.pulsar.client.impl.SchemaTest\",\n" +
                "    \"fields\": [\n" +
                "      {\n" +
                "        \"name\": \"id\",\n" +
                "        \"type\": [\n" +
                "          \"null\",\n" +
                "          \"int\"\n" +
                "        ]\n" +
                "      },\n" +
                "      {\n" +
                "        \"name\": \"name\",\n" +
                "        \"type\": [\n" +
                "          \"null\",\n" +
                "          \"string\"\n" +
                "        ]\n" +
                "      }\n" +
                "    ]\n" +
                "  }").build();

        AvroSchema<User> schema = AvroSchema.of(sd);
        User user = new User();
        user.setId(1);
        user.setName("penghui");
        byte[] encoded = schema.encode(user);
        User decoded = schema.decode(encoded);
        Assert.assertEquals(user, decoded);
    }

    private static class User {
        private Integer id;
        private String name;

        public Integer getId() {
            return id;
        }

        public void setId(Integer id) {
            this.id = id;
        }

        public String getName() {
            return name;
        }

        public void setName(String name) {
            this.name = name;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (o == null || getClass() != o.getClass()) return false;
            User user = (User) o;
            return Objects.equals(id, user.id) &&
                    Objects.equals(name, user.name);
        }

        @Override
        public int hashCode() {
            return Objects.hash(id, name);
        }
    }
}

Avro uses the full name (namespace + "." + name) to look up the Class for decoding data. If the class is not found, GenericData is used. Here is the source code of SpecificData in Avro. SpecificData is a subclass of GenericData, so if the class is not found, it calls super.newRecord, which produces a GenericRecord.

public Object newRecord(Object old, Schema schema) {
    Class c = getClass(schema);
    if (c == null)
        return super.newRecord(old, schema); // punt to generic
    return (c.isInstance(old) ? old : newInstance(c, schema));
}
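
The fallback described above can be modeled with the JDK alone. This is a simplified sketch, not Avro's implementation: `RecordLookupSketch`, its `fullName` and `newRecord` helpers, and the `no.such.pkg` namespace are hypothetical, and a `HashMap` stands in for the generic record.

```java
import java.util.HashMap;

public class RecordLookupSketch {

    /** Hypothetical POJO standing in for the user's class. */
    public static class Foo2 {
        public String name;
        public Integer id;
    }

    /** Builds the lookup name: namespace + "." + name, or just name when there is no namespace. */
    public static String fullName(String namespace, String name) {
        return (namespace == null || namespace.isEmpty()) ? name : namespace + "." + name;
    }

    /** Returns an instance of the resolved class, or a generic map-based record if it cannot be resolved. */
    public static Object newRecord(String namespace, String name) {
        try {
            Class<?> c = Class.forName(fullName(namespace, name));
            return c.getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            // Class not found (or not instantiable): fall back to the generic representation.
            return new HashMap<String, Object>();
        }
    }

    public static void main(String[] args) {
        // A schema name that does not resolve to any class yields the generic record.
        Object generic = newRecord("no.such.pkg", "Test");
        // A schema name that matches the class's binary name yields the POJO.
        Object specific = newRecord(null, Foo2.class.getName());

        System.out.println(generic instanceof Foo2);  // false
        System.out.println(specific instanceof Foo2); // true
    }
}
```

Casting the first result to Foo2 would fail with a ClassCastException, which is the same kind of failure as in the reported stack trace.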

sijie commented 4 years ago

@codelipenghui - I think this issue has also been seen when an Avro POJO is used in Pulsar Functions. It might be worth checking why Pulsar Functions would encounter this issue.

codelipenghui commented 4 years ago

@sijie Ok, I will take a look.

Antti-Kaikkonen commented 4 years ago

@codelipenghui @sijie For testing, I have created a source connector that produces dummy users every second and a function that only appends "!!!" to the name of each user: https://github.com/Antti-Kaikkonen/PulsarPojoTest. When I run both the source connector and the function, I can observe the error in the log file /tmp/functions/public/default/pojo-function/pojo-function-0.log. I'm running Pulsar 2.5.0 standalone with OpenJDK 11.

Based on @codelipenghui's comment, I think it might be a class loader issue.
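
To illustrate what such a class loader issue would look like (this is only a hypothesis about the cause, not a confirmed diagnosis of Pulsar Functions), here is a minimal JDK-only sketch. `ClassLoaderSketch` and `visibleTo` are hypothetical names. The same class name resolves through one loader and fails through another, and a failed lookup of this kind is what would trigger the generic fallback.

```java
import java.net.URL;
import java.net.URLClassLoader;

public class ClassLoaderSketch {

    /** Returns true if the named class can be resolved through the given loader. */
    public static boolean visibleTo(String className, ClassLoader loader) {
        try {
            Class.forName(className, false, loader);
            return true;
        } catch (ClassNotFoundException e) {
            return false;
        }
    }

    public static void main(String[] args) throws Exception {
        String pojoName = ClassLoaderSketch.class.getName();

        // The loader that loaded this class can resolve it by name.
        ClassLoader own = ClassLoaderSketch.class.getClassLoader();
        System.out.println(visibleTo(pojoName, own)); // true

        // An isolated loader with no URLs and no application parent cannot.
        try (URLClassLoader isolated = new URLClassLoader(new URL[0], null)) {
            System.out.println(visibleTo(pojoName, isolated)); // false
        }
    }
}
```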

codelipenghui commented 4 years ago

/cc @gaoran10 please also help take a look at @Antti-Kaikkonen's comment.