apache / pulsar

Apache Pulsar - distributed pub-sub messaging system
https://pulsar.apache.org/
Apache License 2.0

[Pulsar SQL] Query key-value schema data error #9704

Open gaoran10 opened 3 years ago

gaoran10 commented 3 years ago

Describe the bug If the producer uses the key-value schema in separated mode and disables batching, Pulsar SQL fails to decode the data and the query ends with an error.

To Reproduce

  1. Produce key-value schema data in separated mode, with enableBatching set to false:

@Cleanup
PulsarClient pulsarClient = PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build();
Schema<KeyValue<Stock, Stock>> schema = Schema.KeyValue(
        Schema.AVRO(Stock.class), Schema.AVRO(Stock.class), KeyValueEncodingType.SEPARATED);
String topic = "kv-schema-test2";

@Cleanup
Producer<KeyValue<Stock, Stock>> producer = pulsarClient
        .newProducer(schema)
        .topic(topic)
        .enableBatching(false)
        .create();

for (int i = 0; i < 10; i++) {
    producer.send(new KeyValue<>(
            new Stock(i, "K_STOCK_" + i, i * 100),
            new Stock(i, "V_STOCK_" + i, i * 100)));
}
@Data
public class Stock {
    private int entryId;
    private String symbol;
    private double sharePrice;
}
  2. Query the data with Pulsar SQL.
  3. Check the error logs:
2021-02-24T21:01:22.914+0800    INFO    20210224_130122_00004_yzrcx.1.0-0-114   org.apache.pulsar.sql.presto.PulsarRecordCursor Initializing split with parameters: PulsarSplit{splitId=0, connectorId='pulsar', originSchemaName='kv-schema-test2', schemaName='public/default', tableName='kv-schema-test2', splitSize=5, schema='ä{"type":"record","name":"Stock","namespace":"org.apache.pulsar.tests.integration.presto","fields":[{"name":"entryId","type":"int"},{"name":"symbol","type":["null","string"],"default":null},{"name":"sharePrice","type":"double"}]}ä{"type":"record","name":"Stock","namespace":"org.apache.pulsar.tests.integration.presto","fields":[{"name":"entryId","type":"int"},{"name":"symbol","type":["null","string"],"default":null},{"name":"sharePrice","type":"double"}]}', schemaType=KEY_VALUE, startPositionEntryId=0, endPositionEntryId=5, startPositionLedgerId=12, endPositionLedgerId=12, schemaInfoProperties={"key.schema.properties":"{\"__alwaysAllowNull\":\"true\",\"__jsr310ConversionEnabled\":\"false\"}","value.schema.properties":"{\"__alwaysAllowNull\":\"true\",\"__jsr310ConversionEnabled\":\"false\"}","value.schema.type":"AVRO","key.schema.name":"","value.schema.name":"","kv.encoding.type":"SEPARATED","key.schema.type":"AVRO"}}
2021-02-24T21:01:22.914+0800    INFO    20210224_130122_00004_yzrcx.1.0-1-106   org.apache.pulsar.sql.presto.PulsarRecordCursor Initializing split with parameters: PulsarSplit{splitId=1, connectorId='pulsar', originSchemaName='kv-schema-test2', schemaName='public/default', tableName='kv-schema-test2', splitSize=4, schema='ä{"type":"record","name":"Stock","namespace":"org.apache.pulsar.tests.integration.presto","fields":[{"name":"entryId","type":"int"},{"name":"symbol","type":["null","string"],"default":null},{"name":"sharePrice","type":"double"}]}ä{"type":"record","name":"Stock","namespace":"org.apache.pulsar.tests.integration.presto","fields":[{"name":"entryId","type":"int"},{"name":"symbol","type":["null","string"],"default":null},{"name":"sharePrice","type":"double"}]}', schemaType=KEY_VALUE, startPositionEntryId=5, endPositionEntryId=9, startPositionLedgerId=12, endPositionLedgerId=12, schemaInfoProperties={"key.schema.properties":"{\"__alwaysAllowNull\":\"true\",\"__jsr310ConversionEnabled\":\"false\"}","value.schema.properties":"{\"__alwaysAllowNull\":\"true\",\"__jsr310ConversionEnabled\":\"false\"}","value.schema.type":"AVRO","key.schema.name":"","value.schema.name":"","kv.encoding.type":"SEPARATED","key.schema.type":"AVRO"}}
2021-02-24T21:01:22.959+0800    INFO    20210224_130122_00004_yzrcx.1.0-1-106   stderr  java.lang.ArrayIndexOutOfBoundsException: -52
2021-02-24T21:01:22.959+0800    INFO    20210224_130122_00004_yzrcx.1.0-1-106   stderr      at org.apache.avro.io.parsing.Symbol$Alternative.getSymbol(Symbol.java:460)
2021-02-24T21:01:22.959+0800    INFO    20210224_130122_00004_yzrcx.1.0-1-106   stderr      at org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:283)
2021-02-24T21:01:22.959+0800    INFO    20210224_130122_00004_yzrcx.1.0-1-106   stderr      at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:178)
2021-02-24T21:01:22.959+0800    INFO    20210224_130122_00004_yzrcx.1.0-1-106   stderr      at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:151)
2021-02-24T21:01:22.959+0800    INFO    20210224_130122_00004_yzrcx.1.0-1-106   stderr      at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:248)
2021-02-24T21:01:22.959+0800    INFO    20210224_130122_00004_yzrcx.1.0-1-106   stderr      at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:237)
2021-02-24T21:01:22.960+0800    INFO    20210224_130122_00004_yzrcx.1.0-1-106   stderr      at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:170)
2021-02-24T21:01:22.960+0800    INFO    20210224_130122_00004_yzrcx.1.0-1-106   stderr      at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:151)
2021-02-24T21:01:22.960+0800    INFO    20210224_130122_00004_yzrcx.1.0-1-106   stderr      at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:144)
2021-02-24T21:01:22.960+0800    INFO    20210224_130122_00004_yzrcx.1.0-1-106   stderr      at org.apache.pulsar.client.impl.schema.generic.GenericAvroReader.read(GenericAvroReader.java:101)
2021-02-24T21:01:22.960+0800    INFO    20210224_130122_00004_yzrcx.1.0-1-106   stderr      at org.apache.pulsar.client.impl.schema.generic.GenericAvroReader.read(GenericAvroReader.java:42)
2021-02-24T21:01:22.960+0800    INFO    20210224_130122_00004_yzrcx.1.0-1-106   stderr      at org.apache.pulsar.client.impl.schema.reader.AbstractMultiVersionReader.read(AbstractMultiVersionReader.java:67)
2021-02-24T21:01:22.960+0800    INFO    20210224_130122_00004_yzrcx.1.0-1-106   stderr      at org.apache.pulsar.client.impl.schema.AbstractStructSchema.decode(AbstractStructSchema.java:65)
2021-02-24T21:01:22.960+0800    INFO    20210224_130122_00004_yzrcx.1.0-1-106   stderr      at org.apache.pulsar.sql.presto.decoder.avro.PulsarAvroRowDecoder.decodeRow(PulsarAvroRowDecoder.java:66)
2021-02-24T21:01:22.960+0800    INFO    20210224_130122_00004_yzrcx.1.0-1-106   stderr      at org.apache.pulsar.sql.presto.PulsarRecordCursor.advanceNextPosition(PulsarRecordCursor.java:499)
2021-02-24T21:01:22.960+0800    INFO    20210224_130122_00004_yzrcx.1.0-1-106   stderr      at io.prestosql.spi.connector.RecordPageSource.getNextPage(RecordPageSource.java:90)
2021-02-24T21:01:22.960+0800    INFO    20210224_130122_00004_yzrcx.1.0-1-106   stderr      at io.prestosql.operator.TableScanOperator.getOutput(TableScanOperator.java:302)
2021-02-24T21:01:22.960+0800    INFO    20210224_130122_00004_yzrcx.1.0-1-106   stderr      at io.prestosql.operator.Driver.processInternal(Driver.java:379)
2021-02-24T21:01:22.960+0800    INFO    20210224_130122_00004_yzrcx.1.0-1-106   stderr      at io.prestosql.operator.Driver.lambda$processFor$8(Driver.java:283)
2021-02-24T21:01:22.960+0800    INFO    20210224_130122_00004_yzrcx.1.0-1-106   stderr      at io.prestosql.operator.Driver.tryWithLock(Driver.java:675)
2021-02-24T21:01:22.961+0800    INFO    20210224_130122_00004_yzrcx.1.0-1-106   stderr      at io.prestosql.operator.Driver.processFor(Driver.java:276)
2021-02-24T21:01:22.961+0800    INFO    20210224_130122_00004_yzrcx.1.0-1-106   stderr      at io.prestosql.execution.SqlTaskExecution$DriverSplitRunner.processFor(SqlTaskExecution.java:1075)
2021-02-24T21:01:22.961+0800    INFO    20210224_130122_00004_yzrcx.1.0-1-106   stderr      at io.prestosql.execution.executor.PrioritizedSplitRunner.process(PrioritizedSplitRunner.java:163)
2021-02-24T21:01:22.961+0800    INFO    20210224_130122_00004_yzrcx.1.0-1-106   stderr      at io.prestosql.execution.executor.TaskExecutor$TaskRunner.run(TaskExecutor.java:484)
2021-02-24T21:01:22.961+0800    INFO    20210224_130122_00004_yzrcx.1.0-1-106   stderr      at io.prestosql.$gen.Presto_332__testversion____20210224_125443_2.run(Unknown Source)
2021-02-24T21:01:22.961+0800    INFO    20210224_130122_00004_yzrcx.1.0-1-106   stderr      at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
2021-02-24T21:01:22.961+0800    INFO    20210224_130122_00004_yzrcx.1.0-1-106   stderr      at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
2021-02-24T21:01:22.961+0800    INFO    20210224_130122_00004_yzrcx.1.0-1-106   stderr      at java.lang.Thread.run(Thread.java:748)
2021-02-24T21:01:22.974+0800    ERROR   SplitRunner-5-106   io.prestosql.execution.executor.TaskExecutor    Error processing Split 20210224_130122_00004_yzrcx.1.0-1 PulsarSplit{splitId=1, connectorId='pulsar', originSchemaName='kv-schema-test2', schemaName='public/default', tableName='kv-schema-test2', splitSize=4, schema='ä{"type":"record","name":"Stock","namespace":"org.apache.pulsar.tests.integration.presto","fields":[{"name":"entryId","type":"int"},{"name":"symbol","type":["null","string"],"default":null},{"name":"sharePrice","type":"double"}]}ä{"type":"record","name":"Stock","namespace":"org.apache.pulsar.tests.integration.presto","fields":[{"name":"entryId","type":"int"},{"name":"symbol","type":["null","string"],"default":null},{"name":"sharePrice","type":"double"}]}', schemaType=KEY_VALUE, startPositionEntryId=5, endPositionEntryId=9, startPositionLedgerId=12, endPositionLedgerId=12, schemaInfoProperties={"key.schema.properties":"{\"__alwaysAllowNull\":\"true\",\"__jsr310ConversionEnabled\":\"false\"}","value.schema.properties":"{\"__alwaysAllowNull\":\"true\",\"__jsr310ConversionEnabled\":\"false\"}","value.schema.type":"AVRO","key.schema.name":"","value.schema.name":"","kv.encoding.type":"SEPARATED","key.schema.type":"AVRO"}} (start = 2.86554176341414E8, wall = 53 ms, cpu = 0 ms, wait = 0 ms, calls = 1): GENERIC_INTERNAL_ERROR: Decoding avro record failed.
io.prestosql.spi.PrestoException: Decoding avro record failed.
    at org.apache.pulsar.sql.presto.decoder.avro.PulsarAvroRowDecoder.decodeRow(PulsarAvroRowDecoder.java:70)
    at org.apache.pulsar.sql.presto.PulsarRecordCursor.advanceNextPosition(PulsarRecordCursor.java:499)
    at io.prestosql.spi.connector.RecordPageSource.getNextPage(RecordPageSource.java:90)
    at io.prestosql.operator.TableScanOperator.getOutput(TableScanOperator.java:302)
    at io.prestosql.operator.Driver.processInternal(Driver.java:379)
    at io.prestosql.operator.Driver.lambda$processFor$8(Driver.java:283)
    at io.prestosql.operator.Driver.tryWithLock(Driver.java:675)
    at io.prestosql.operator.Driver.processFor(Driver.java:276)
    at io.prestosql.execution.SqlTaskExecution$DriverSplitRunner.processFor(SqlTaskExecution.java:1075)
    at io.prestosql.execution.executor.PrioritizedSplitRunner.process(PrioritizedSplitRunner.java:163)
    at io.prestosql.execution.executor.TaskExecutor$TaskRunner.run(TaskExecutor.java:484)
    at io.prestosql.$gen.Presto_332__testversion____20210224_125443_2.run(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ArrayIndexOutOfBoundsException: -52
    at org.apache.avro.io.parsing.Symbol$Alternative.getSymbol(Symbol.java:460)
    at org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:283)
    at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:178)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:151)
    at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:248)
    at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:237)
    at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:170)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:151)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:144)
    at org.apache.pulsar.client.impl.schema.generic.GenericAvroReader.read(GenericAvroReader.java:101)
    at org.apache.pulsar.client.impl.schema.generic.GenericAvroReader.read(GenericAvroReader.java:42)
    at org.apache.pulsar.client.impl.schema.reader.AbstractMultiVersionReader.read(AbstractMultiVersionReader.java:67)
    at org.apache.pulsar.client.impl.schema.AbstractStructSchema.decode(AbstractStructSchema.java:65)
    at org.apache.pulsar.sql.presto.decoder.avro.PulsarAvroRowDecoder.decodeRow(PulsarAvroRowDecoder.java:66)
    ... 14 more

Expected behavior The query returns the data successfully.

Additional context This may be related to the following method in the class RawMessageImpl.

public Optional<ByteBuf> getKeyBytes() {
    if (getKey().isPresent()) {
        if (hasBase64EncodedKey()) {
            return Optional.of(Unpooled.wrappedBuffer(Base64.getDecoder().decode(getKey().get())));
        } else {
            return Optional.of(Unpooled.wrappedBuffer(getKey().get().getBytes(StandardCharsets.UTF_8)));
        }
    }
    return Optional.empty();
}
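
The sketch below is only an illustration of why the branch taken in getKeyBytes() could matter. It is not a confirmed diagnosis. It assumes the key holds binary (Avro-encoded) bytes and that such bytes are at some point handled as a UTF-8 string instead of being Base64-decoded. The class name KeyBytesRoundTrip, its helper methods, and the sample bytes are hypothetical and not part of Pulsar.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Base64;

public class KeyBytesRoundTrip {

    // Hypothetical helper: treat arbitrary bytes as UTF-8 text and convert back,
    // similar in spirit to the non-Base64 branch of getKeyBytes().
    static byte[] viaUtf8String(byte[] raw) {
        String asText = new String(raw, StandardCharsets.UTF_8);
        return asText.getBytes(StandardCharsets.UTF_8);
    }

    // Hypothetical helper: carry the bytes as a Base64 string and decode them,
    // similar in spirit to the Base64 branch of getKeyBytes().
    static byte[] viaBase64(byte[] raw) {
        String encoded = Base64.getEncoder().encodeToString(raw);
        return Base64.getDecoder().decode(encoded);
    }

    public static void main(String[] args) {
        // Made-up binary payload containing byte sequences that are not valid UTF-8.
        byte[] raw = {0x02, (byte) 0xCC, (byte) 0xE4, 0x10};

        System.out.println("UTF-8 round trip preserved bytes:  "
                + Arrays.equals(raw, viaUtf8String(raw)));
        System.out.println("Base64 round trip preserved bytes: "
                + Arrays.equals(raw, viaBase64(raw)));
    }
}
```

With these sample bytes, the UTF-8 round trip replaces the invalid sequences and changes the payload, while the Base64 round trip returns the original bytes. If key bytes were altered in this way before reaching the Avro decoder, a decoding failure like the one in the logs would be plausible, but this remains an assumption.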
codelipenghui commented 3 years ago

@gaoran10 Is this one fixed by https://github.com/apache/pulsar/pull/9685?

gaoran10 commented 3 years ago

I think this issue has not been solved yet. The key point is that the producer uses the key-value schema in separated mode with batching disabled; if batching is enabled, Pulsar SQL works well.

codelipenghui commented 3 years ago

@congbobo184 Could you please take a look again? @gaoran10 Do you mean that if the key-value schema is used in separated mode with message batching disabled, the problem is still there?

gaoran10 commented 3 years ago

@gaoran10 Do you mean that if the key-value schema is used in separated mode with message batching disabled, the problem is still there?

Yes, I think the problem still exists.

codelipenghui commented 2 years ago

The issue had no activity for 30 days; marking it with the Stale label.