apache / pulsar

Apache Pulsar - distributed pub-sub messaging system
https://pulsar.apache.org/
Apache License 2.0

Pulsar SQL BufferUnderflowException after upgrading from 2.7.0 to 2.8.1 #12284

Open Raven888888 opened 2 years ago

Raven888888 commented 2 years ago

Describe the bug

I followed the upgrade guide to upgrade the Pulsar cluster node by node, from 2.7.0 to 2.8.1.

After the upgrade, the Presto CLI version is 332.

Querying one of the topics from the Presto CLI (started with apache-pulsar/bin/pulsar sql) using select * from pulsar."public/default"."test"; returns Query 20211006_093830_00017_59f2r failed: java.nio.BufferUnderflowException (see below for full logs).

However, this only affects some topics. Other topics in the same tenant and namespace can be queried just fine.

Also, consuming data from the problematic topics with the Pulsar Flink connector and the Pulsar Python client API works as expected, with no issues.

Full logs

2021-10-06T09:38:30.837Z        ERROR        remote-task-callback-89        io.prestosql.execution.StageStateMachine        Stage 20211006_093830_00017_59f2r.1 failed  
com.google.common.util.concurrent.UncheckedExecutionException: java.nio.BufferUnderflowException  
        at com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2051)
        at com.google.common.cache.LocalCache.get(LocalCache.java:3951)
        at com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:3974)
        at com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4935)
        at org.apache.pulsar.sql.presto.PulsarSqlSchemaInfoProvider.getSchemaByVersion(PulsarSqlSchemaInfoProvider.java:76)
        at org.apache.pulsar.sql.presto.PulsarRecordCursor.advanceNextPosition(PulsarRecordCursor.java:485)
        at io.prestosql.spi.connector.RecordPageSource.getNextPage(RecordPageSource.java:90)
        at io.prestosql.operator.TableScanOperator.getOutput(TableScanOperator.java:302)
        at io.prestosql.operator.Driver.processInternal(Driver.java:379)
        at io.prestosql.operator.Driver.lambda$processFor$8(Driver.java:283)
        at io.prestosql.operator.Driver.tryWithLock(Driver.java:675)
        at io.prestosql.operator.Driver.processFor(Driver.java:276)
        at io.prestosql.execution.SqlTaskExecution$DriverSplitRunner.processFor(SqlTaskExecution.java:1075)
        at io.prestosql.execution.executor.PrioritizedSplitRunner.process(PrioritizedSplitRunner.java:163)
        at io.prestosql.execution.executor.TaskExecutor$TaskRunner.run(TaskExecutor.java:484)
        at io.prestosql.$gen.Presto_332__testversion____20211005_104810_2.run(Unknown Source)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.nio.BufferUnderflowException
        at java.nio.Buffer.nextGetIndex(Buffer.java:510)
        at java.nio.HeapByteBuffer.getLong(HeapByteBuffer.java:427)
        at org.apache.pulsar.sql.presto.PulsarSqlSchemaInfoProvider.loadSchema(PulsarSqlSchemaInfoProvider.java:106)
        at org.apache.pulsar.sql.presto.PulsarSqlSchemaInfoProvider.access$000(PulsarSqlSchemaInfoProvider.java:49)
        at org.apache.pulsar.sql.presto.PulsarSqlSchemaInfoProvider$1.load(PulsarSqlSchemaInfoProvider.java:61)
        at org.apache.pulsar.sql.presto.PulsarSqlSchemaInfoProvider$1.load(PulsarSqlSchemaInfoProvider.java:58)
        at com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3529)
        at com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2278)
        at com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2155)
        at com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2045)
        ... 18 more

PS: I saw a similar issue, but that one is about byte schema data; mine is string schema data.

I suspect Presto has messed up its cache somehow. Any pointers on how to identify the root cause and/or fix this issue will be greatly appreciated. Thanks!

MarvinCai commented 2 years ago

The exception seems to come from the Pulsar connector, not Presto, since the Pulsar connector manages the schema and its cache. It appears to be thrown from this line: https://github.com/apache/pulsar/blob/master/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSqlSchemaInfoProvider.java#L106, which fails while parsing the schema version. Can you provide the schema info using pulsar-admin, so we can verify that the schema info is not corrupted?
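
The trace ends in HeapByteBuffer.getLong, which needs 8 bytes remaining in the buffer. A rough Python analogue of that failure mode is sketched below. It assumes the schema version is stored as an 8-byte big-endian long (which the getLong call in the trace suggests), and the helper name is hypothetical, not part of the connector.

```python
import struct


def parse_schema_version(version_bytes: bytes) -> int:
    """Hypothetical helper: decode a schema version stored as an 8-byte big-endian long."""
    if len(version_bytes) < 8:
        # Java's ByteBuffer.getLong throws BufferUnderflowException in this situation.
        raise ValueError(f"need 8 bytes for a long, got {len(version_bytes)}")
    return struct.unpack(">q", version_bytes[:8])[0]


if __name__ == "__main__":
    # A well-formed version decodes normally.
    print(parse_schema_version((2).to_bytes(8, "big")))
    try:
        # An empty or missing version cannot be decoded.
        parse_schema_version(b"")
    except ValueError as err:
        print("underflow:", err)
```

If the version bytes handed to the parser are shorter than 8 bytes, the read fails regardless of whether the stored schema itself is intact.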

Raven888888 commented 2 years ago

Greetings @MarvinCai

apache-pulsar/bin/pulsar-admin schemas get test returns

{
  "version": 0,
  "schemaInfo": {
    "name": "test",
    "schema": "",
    "type": "STRING",
    "properties": {}
  }
}

This is the same schema as before the Pulsar upgrade process.

I also tried to re-upload the schema with apache-pulsar/bin/pulsar-admin schemas upload test -f apache-pulsar/conf/schema_example.conf, but got the same BufferUnderflowException error.

Thanks in advance.

Raven888888 commented 2 years ago

Also, ref: link

Raven888888 commented 2 years ago

Hi @MarvinCai have you got any clue?

Update from my side:

I tried deleting the schema with apache-pulsar/bin/pulsar-admin schemas delete test, after which I was able to query the topic in Pulsar SQL just fine.

Then I re-uploaded the schema with apache-pulsar/bin/pulsar-admin schemas upload test -f apache-pulsar/conf/schema_example.conf and got the same BufferUnderflowException error.

However, I noticed that GET schemas now returns

{
  "version": 2,
  "schemaInfo": {
    "name": "test",
    "schema": "",
    "type": "STRING",
    "properties": {
      "key1": "value1"
    }
  }
}

It seems the schema version has been updated from V0 to V2, between Pulsar 2.7.0 and 2.8.1 respectively. Is this what is causing the Presto Pulsar connection to break? PS: Apologies, the version only reflects how many times I updated the topic schema. It is unrelated to the issue.
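
For readers who want to reproduce the upload step, the schema definition can be generated rather than hand-edited. This is a minimal sketch: the field names are taken from the schemaInfo block returned by GET above, the assumption is that conf/schema_example.conf has the same shape, and the helper name is hypothetical.

```python
import json


def string_schema_definition(properties: dict) -> str:
    """Hypothetical helper: build a STRING schema definition as JSON text."""
    definition = {
        "type": "STRING",   # schema type as shown in the GET output
        "schema": "",       # STRING schemas carry no schema body in the GET output
        "properties": properties,
    }
    return json.dumps(definition, indent=2)


if __name__ == "__main__":
    # Mirrors the properties visible in the GET output.
    print(string_schema_definition({"key1": "value1"}))
```

The printed JSON can be written to a file and passed to pulsar-admin schemas upload with -f, as in the commands above.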

Raven888888 commented 2 years ago

I am seeing more similar issues with the Pulsar worker in pulsar-client-go#546 and pulsar#11457.

gaoran10 commented 2 years ago

It seems that the schema version of the message is null. Which language client do the messages come from?

Raven888888 commented 2 years ago

I have the WebSocket service enabled on the broker: webSocketServiceEnabled=true

I am using the Python client WebSocket example to publish into the topic.
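
For context, here is a sketch of what a WebSocket producer sends, assuming the documented /ws/v2/producer endpoint and JSON frame layout of the Pulsar WebSocket API. The host, the default port 8080, and the helper names are assumptions for illustration; actually sending the frame needs a WebSocket library and is left out.

```python
import base64
import json


def producer_url(host: str, tenant: str, namespace: str, topic: str, port: int = 8080) -> str:
    """Hypothetical helper: build the WebSocket producer URL for a persistent topic."""
    return f"ws://{host}:{port}/ws/v2/producer/persistent/{tenant}/{namespace}/{topic}"


def producer_frame(payload: bytes, properties: dict, context: str = "1") -> str:
    """Hypothetical helper: build the JSON frame sent over the socket."""
    return json.dumps({
        # The payload travels as base64-encoded raw bytes.
        "payload": base64.b64encode(payload).decode("ascii"),
        "properties": properties,
        "context": context,
    })


if __name__ == "__main__":
    print(producer_url("localhost", "public", "default", "test"))
    print(producer_frame(b"Hello World", {"key1": "value1", "key2": "value2"}))
```

Note that the frame built here has no field describing a schema or schema version; only the raw payload bytes and metadata are sent.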

MarvinCai commented 2 years ago

Sorry for the delay, I had some discussion with @gaoran10 offline.

The WebSocket endpoint doesn't support schema; the internal implementation simply produces raw bytes (ref). When the Pulsar topic has a schema, Pulsar SQL tries to read the message and expects a schema on it, but there is no schema on the message, causing an NPE. When we remove the schema, Pulsar SQL knows that and just uses a Byte schema by default (ref), which should explain why it works after you remove the schema from the topic.

If you produce with a client that supports passing a schema, Pulsar SQL should work as expected. Here is a handy CLI tool that ingests NYC taxi data into a Pulsar cluster with a schema as a test dataset, which makes it easy to test Pulsar SQL: https://github.com/streamnative/examples/tree/master/nyctaxi/taxidata , the schema: https://github.com/streamnative/examples/blob/master/nyctaxi/taxidata/pkg/types/yellow.go
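
The behaviour described above can be summarised as a toy model. This is only a sketch of the explanation in this comment, not the connector's actual code; the function name, parameters, and the exception type are all hypothetical.

```python
from typing import Optional, Union


def decode_value(
    payload: bytes,
    topic_schema_type: Optional[str],
    message_schema_version: Optional[bytes],
) -> Union[bytes, str]:
    """Toy model: how a reader might pick a decoding for one message."""
    if topic_schema_type is None:
        # No schema on the topic: fall back to treating the value as raw bytes.
        return payload
    if not message_schema_version:
        # Topic has a schema, but the message carries no schema version.
        raise LookupError("message carries no schema version")
    if topic_schema_type == "STRING":
        return payload.decode("utf-8")
    raise NotImplementedError(f"schema type {topic_schema_type} not modelled")


if __name__ == "__main__":
    # Schema removed from the topic: the query works and returns bytes.
    print(decode_value(b"Hello World", None, None))
    try:
        # STRING schema on the topic, schema-less message: the lookup fails.
        decode_value(b"Hello World", "STRING", None)
    except LookupError as err:
        print("failed:", err)
```

In this model the failure depends on the combination of a topic-level schema and a message without a schema version, which matches the observation that deleting the topic schema makes the query succeed.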

Raven888888 commented 2 years ago

@MarvinCai @gaoran10 Thank you for your explanation, it clears things up a bit. I have some follow-up questions:

  1. In Pulsar 2.7.0, I also used WebSocket to publish data (as you rightly pointed out, as raw bytes) to Pulsar. Pulsar SQL works fine in that version, where I manually uploaded a string schema. Were there changes in schema handling from 2.7.x to 2.8.x that cause Pulsar SQL to break?
  2. Is it possible to get the Pulsar 2.7.0 behaviour back in 2.8.x and beyond? I still want to use WebSocket to publish data (raw bytes), set a string schema on the Pulsar topic, and be able to query it using Pulsar SQL.

Thanks a lot.

gaoran10 commented 2 years ago

We added PR https://github.com/apache/pulsar/pull/12809 to handle the null schema version problem.

Raven888888 commented 2 years ago

Awesome, thank you @gaoran10 and team! 🚀

Raven888888 commented 2 years ago

@gaoran10 Sorry to say, but the issue I faced still persists.

I only recently had time to try your PR #12809. I used the official 2.10.0 binary, which should already contain your PR, and did a clean install.

However, I still face the exact same BufferUnderflowException error (and the same error log) when I

  1. use a WebSocket client to produce a message without schema to a topic
  2. upload a string schema to the topic (conf/schema_example.conf)
  3. query the topic from Pulsar SQL, which fails with the error

Note that:

Raven888888 commented 2 years ago

@gaoran10 @MarvinCai

Raven888888 commented 2 years ago

I noticed that one of the differences in broker.conf between Pulsar 2.7.0 and Pulsar 2.8.0+ is the introduction of this setting:

# The schema compatibility strategy in broker level.
# SchemaCompatibilityStrategy : ALWAYS_INCOMPATIBLE, ALWAYS_COMPATIBLE, BACKWARD, FORWARD,
# FULL, BACKWARD_TRANSITIVE, FORWARD_TRANSITIVE, FULL_TRANSITIVE
schemaCompatibilityStrategy=FULL

Could this be what is causing the BufferUnderflowException?

codelipenghui commented 2 years ago

@Technoboy- This one should be related to https://lists.apache.org/thread/3js51tq2p3c3oldfrhprn4kcohx7h1wv ?

Raven888888 commented 1 year ago

Any update?

gaoran10 commented 1 year ago

@Raven888888 Which version are you using? I tested with Pulsar 3.0.0 and it works well. Could you try Pulsar 3.0.0?

Test steps

  1. Use websocket client to produce a message without schema to a topic
  2. Upload string schema to the topic
  3. Query with Pulsar SQL

Before uploading the string schema

presto> select * from pulsar."public/default"."t1";
            __value__             | __partition__ | __event_time__ |    __publish_time__     | __message_id__ | __sequence_id__ | __producer_name__ | __key__ |          __properties__
----------------------------------+---------------+----------------+-------------------------+----------------+-----------------+-------------------+---------+-----------------------------------
 48 65 6c 6c 6f 20 57 6f 72 6c 64 |            -1 | NULL           | 2023-07-03 03:56:43.873 | (15,5,0)       |               5 | standalone-0-9    | NULL    | {"key1":"value1","key2":"value2"}
 48 65 6c 6c 6f 20 57 6f 72 6c 64 |            -1 | NULL           | 2023-07-03 03:56:43.873 | (15,6,0)       |               6 | standalone-0-9    | NULL    | {"key1":"value1","key2":"value2"}
 48 65 6c 6c 6f 20 57 6f 72 6c 64 |            -1 | NULL           | 2023-07-03 03:56:43.873 | (15,7,0)       |               7 | standalone-0-9    | NULL    | {"key1":"value1","key2":"value2"}
 48 65 6c 6c 6f 20 57 6f 72 6c 64 |            -1 | NULL           | 2023-07-03 03:56:43.873 | (15,8,0)       |               8 | standalone-0-9    | NULL    | {"key1":"value1","key2":"value2"}
 48 65 6c 6c 6f 20 57 6f 72 6c 64 |            -1 | NULL           | 2023-07-03 03:56:43.874 | (15,9,0)       |               9 | standalone-0-9    | NULL    | {"key1":"value1","key2":"value2"}
 48 65 6c 6c 6f 20 57 6f 72 6c 64 |            -1 | NULL           | 2023-07-03 03:56:43.870 | (15,0,0)       |               0 | standalone-0-9    | NULL    | {"key1":"value1","key2":"value2"}
 48 65 6c 6c 6f 20 57 6f 72 6c 64 |            -1 | NULL           | 2023-07-03 03:56:43.871 | (15,1,0)       |               1 | standalone-0-9    | NULL    | {"key1":"value1","key2":"value2"}
 48 65 6c 6c 6f 20 57 6f 72 6c 64 |            -1 | NULL           | 2023-07-03 03:56:43.871 | (15,2,0)       |               2 | standalone-0-9    | NULL    | {"key1":"value1","key2":"value2"}
 48 65 6c 6c 6f 20 57 6f 72 6c 64 |            -1 | NULL           | 2023-07-03 03:56:43.872 | (15,3,0)       |               3 | standalone-0-9    | NULL    | {"key1":"value1","key2":"value2"}
 48 65 6c 6c 6f 20 57 6f 72 6c 64 |            -1 | NULL           | 2023-07-03 03:56:43.872 | (15,4,0)       |               4 | standalone-0-9    | NULL    | {"key1":"value1","key2":"value2"}
(10 rows)

Query 20230703_040424_00001_2a95d, FINISHED, 1 node
Splits: 18 total, 18 done (100.00%)
0:00 [10 rows, 1.17KB] [46 rows/s, 5.48KB/s]
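
The __value__ column above is the raw payload rendered as hex bytes. As a quick sanity check (a sketch with a hypothetical helper name), decoding those bytes as UTF-8 gives the text that a STRING schema would present:

```python
def decode_hex_value(hex_dump: str) -> str:
    """Hypothetical helper: turn a space-separated hex dump into UTF-8 text."""
    # bytes.fromhex accepts spaces between byte pairs.
    return bytes.fromhex(hex_dump).decode("utf-8")


if __name__ == "__main__":
    print(decode_hex_value("48 65 6c 6c 6f 20 57 6f 72 6c 64"))  # prints "Hello World"
```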

After uploading the string schema

presto> select * from pulsar."public/default"."t1";
  __value__  | __partition__ | __event_time__ |    __publish_time__     | __message_id__ | __sequence_id__ | __producer_name__ | __key__ |          __properties__
-------------+---------------+----------------+-------------------------+----------------+-----------------+-------------------+---------+-----------------------------------
 Hello World |            -1 | NULL           | 2023-07-03 03:56:43.873 | (15,5,0)       |               5 | standalone-0-9    | NULL    | {"key1":"value1","key2":"value2"}
 Hello World |            -1 | NULL           | 2023-07-03 03:56:43.873 | (15,6,0)       |               6 | standalone-0-9    | NULL    | {"key1":"value1","key2":"value2"}
 Hello World |            -1 | NULL           | 2023-07-03 03:56:43.873 | (15,7,0)       |               7 | standalone-0-9    | NULL    | {"key1":"value1","key2":"value2"}
 Hello World |            -1 | NULL           | 2023-07-03 03:56:43.873 | (15,8,0)       |               8 | standalone-0-9    | NULL    | {"key1":"value1","key2":"value2"}
 Hello World |            -1 | NULL           | 2023-07-03 03:56:43.874 | (15,9,0)       |               9 | standalone-0-9    | NULL    | {"key1":"value1","key2":"value2"}
 Hello World |            -1 | NULL           | 2023-07-03 03:56:43.870 | (15,0,0)       |               0 | standalone-0-9    | NULL    | {"key1":"value1","key2":"value2"}
 Hello World |            -1 | NULL           | 2023-07-03 03:56:43.871 | (15,1,0)       |               1 | standalone-0-9    | NULL    | {"key1":"value1","key2":"value2"}
 Hello World |            -1 | NULL           | 2023-07-03 03:56:43.871 | (15,2,0)       |               2 | standalone-0-9    | NULL    | {"key1":"value1","key2":"value2"}
 Hello World |            -1 | NULL           | 2023-07-03 03:56:43.872 | (15,3,0)       |               3 | standalone-0-9    | NULL    | {"key1":"value1","key2":"value2"}
 Hello World |            -1 | NULL           | 2023-07-03 03:56:43.872 | (15,4,0)       |               4 | standalone-0-9    | NULL    | {"key1":"value1","key2":"value2"}
(10 rows)

Query 20230703_042603_00002_2a95d, FINISHED, 1 node
Splits: 18 total, 18 done (100.00%)
0:00 [10 rows, 1.17KB] [41 rows/s, 4.9KB/s]

Raven888888 commented 1 year ago

Thanks @gaoran10

I have tried versions 2.7 (works) and 2.8 through 2.10 (all throw BufferUnderflowException). I have yet to try version 3.x, which I will do as soon as I can.

That said, I think your test steps should be:

  1. Use websocket client to produce a message without schema to a topic
  2. Use pulsar client to produce a message without schema to a topic
  3. Upload string schema to the topic
  4. Query with Pulsar SQL

I noticed that it happens when two kinds of clients produce messages into the same topic, which breaks Pulsar SQL. I am still able to read from the topic just fine using the Pulsar client or the WebSocket client, though.