apache / pulsar-client-go

Apache Pulsar Go Client Library
https://pulsar.apache.org/
Apache License 2.0

go client producer + schema breaks Pulsar SQL #546

Open r-funke opened 3 years ago

r-funke commented 3 years ago

Expected behavior

When using the Go client to send messages with a schema, I expect to be able to query the data with Pulsar SQL.

Actual behavior

tl;dr sql-worker is throwing an exception

I copied code from pulsar/schema_test.go to create a test client that produces a message with a JSON schema and consumes it afterwards. I verified that the Go client can consume the message and retrieve the correct values by calling msg.GetSchemaValue(). But when I query the topic from the SQL command line, I get an internal error:

presto> select * from pulsar."public/default"."goJson";

Query 20210621_103409_00001_r9xrk, FAILED, 1 node
Splits: 18 total, 1 done (5.56%)
0:01 [0 rows, 0B] [0 rows/s, 0B/s]

Query 20210621_103409_00001_r9xrk failed: Internal error

presto>

At the same time the sql-worker engine reports an error and throws an exception:

2021-06-21T10:34:10.534Z        ERROR   SplitRunner-2-106       io.prestosql.execution.executor.TaskExecutor    Error processing Split 20210621_103409_00001_r9xrk.1.0-0 PulsarSplit{splitId=0, connectorId='pulsar', originSchemaName='goJson', schemaName='public/default', tableName='goJson', splitSize=1, schema='{"type":"record","name":"Example","namespace":"test","fields":[{"name":"ID","type":"int"},{"name":"Name","type":"string"}]}', schemaType=JSON, startPositionEntryId=0, endPositionEntryId=1, startPositionLedgerId=285, endPositionLedgerId=285, schemaInfoProperties={"pulsar":"hello"}} (start = 2.54138549436848E8, wall = 490 ms, cpu = 0 ms, wait = 0 ms, calls = 1)
java.lang.NullPointerException
        at org.apache.pulsar.sql.presto.PulsarRecordCursor.advanceNextPosition(PulsarRecordCursor.java:493)
        at io.prestosql.spi.connector.RecordPageSource.getNextPage(RecordPageSource.java:90)
        at io.prestosql.operator.TableScanOperator.getOutput(TableScanOperator.java:302)
        at io.prestosql.operator.Driver.processInternal(Driver.java:379)
        at io.prestosql.operator.Driver.lambda$processFor$8(Driver.java:283)
        at io.prestosql.operator.Driver.tryWithLock(Driver.java:675)
        at io.prestosql.operator.Driver.processFor(Driver.java:276)
        at io.prestosql.execution.SqlTaskExecution$DriverSplitRunner.processFor(SqlTaskExecution.java:1075)
        at io.prestosql.execution.executor.PrioritizedSplitRunner.process(PrioritizedSplitRunner.java:163)
        at io.prestosql.execution.executor.TaskExecutor$TaskRunner.run(TaskExecutor.java:484)
        at io.prestosql.$gen.Presto_332__testversion____20210621_102833_2.run(Unknown Source)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:829)

I also tried Avro schema instead of JSON schema, but it is causing the same issue.

I wrote a producer in Python, because I was not sure whether this problem is in Pulsar or in pulsar-client-go. And with the python producer, the sql query worked out of the box. That's why I suspect the problem to be in the go client.

Steps to reproduce

Start pulsar container first, then start the presto engine by running ./bin/pulsar sql-worker run and keep the shell open to see the error logs.

Get the go_pulsar_schema_test.go here: https://gist.github.com/r-funke/452fa154aafef36cf71a6ea240b8a930

Run the go client and see that it successfully produces and consumes a message with schema. Run ./bin/pulsar sql in pulsar container and query the topic that was created by go client by running select * from pulsar."public/default"."goJson";. See that it reports the internal error pasted above (you may need to run go client again, if messages were already deleted). In the sql-worker shell you should see the error and exception pasted above.

To verify that it's working when using the python client, run py_pulsar_schema_producer.py from the same gist link above. Then query data in sql:

presto> select * from pulsar."public/default"."pyJson";
   a   | b |   c   | __partition__ |     __event_time__      |    __publish_time__     | __message_id__ | __sequence_id__ | __producer_name__ | __key__ | __properties__
-------+---+-------+---------------+-------------------------+-------------------------+----------------+-----------------+-------------------+---------+----------------
 Hello | 3 | false |            -1 | 1970-01-01 00:00:00.000 | 2021-06-21 10:49:00.524 | (312,2,0)      |               2 | standalone-3-2    | NULL    | {}
 Hello | 4 | false |            -1 | 1970-01-01 00:00:00.000 | 2021-06-21 10:49:00.527 | (312,3,0)      |               3 | standalone-3-2    | NULL    | {}
 Hello | 1 | false |            -1 | 1970-01-01 00:00:00.000 | 2021-06-21 10:49:00.512 | (312,0,0)      |               0 | standalone-3-2    | NULL    | {}
 Hello | 2 | false |            -1 | 1970-01-01 00:00:00.000 | 2021-06-21 10:49:00.519 | (312,1,0)      |               1 | standalone-3-2    | NULL    | {}
(4 rows)

Query 20210621_112301_00000_ta8x6, FINISHED, 1 node
Splits: 18 total, 18 done (100.00%)
0:07 [4 rows, 510B] [0 rows/s, 75B/s]

presto>

In the sql-worker shell, you now see an INFO message instead of an error:

2021-06-21T11:23:08.443Z        INFO    20210621_112301_00000_ta8x6.1.0-0-102   org.apache.pulsar.sql.presto.PulsarRecordCursor Initializing split with parameters: PulsarSplit{splitId=1, connectorId='pulsar', originSchemaName='pyJson', schemaName='public/default', tableName='pyJson', splitSize=2, schema='{
 "name": "Example",
 "type": "record",
 "fields": [
  {
   "name": "a",
   "type": [
    "null",
    "string"
   ]
  },
  {
   "name": "b",
   "type": [
    "null",
    "int"
   ]
  },
  {
   "name": "c",
   "type": [
    "null",
    "boolean"
   ]
  }
 ]
}', schemaType=JSON, startPositionEntryId=2, endPositionEntryId=4, startPositionLedgerId=312, endPositionLedgerId=312, schemaInfoProperties={}}

System configuration

Pulsar version: 2.8 pulsar-client-go: 0.5.0

smazurov commented 3 years ago

curious, did you try with pulsar 2.7.2?

r-funke commented 3 years ago

I have now checked with Pulsar 2.7.2. It doesn't work either, but the behavior is slightly different. The sql-worker doesn't crash, but the queries return NULL values instead of the actual data.

presto> select * from pulsar."public/default".xgojson;
  id  | name | __partition__ |     __event_time__      |    __publish_time__     | __message_id__ | __sequence_id__ | __producer_name__ | __key__ | __properties__ 
------+------+---------------+-------------------------+-------------------------+----------------+-----------------+-------------------+---------+----------------
 NULL | NULL |            -1 | 2339-03-21 22:18:14.838 | 2021-07-28 19:03:32.459 | (13,0,0)       |               0 | standalone-0-2    | NULL    | {}             
 NULL | NULL |            -1 | 2339-03-21 22:18:14.838 | 2021-07-28 19:03:53.141 | (13,1,0)       |               0 | standalone-0-3    | NULL    | {}             
 NULL | NULL |            -1 | 2339-03-21 22:18:14.838 | 2021-07-28 19:03:54.242 | (13,2,0)       |               0 | standalone-0-4    | NULL    | {}             
 NULL | NULL |            -1 | 2339-03-21 22:18:14.838 | 2021-07-28 19:03:55.621 | (13,3,0)       |               0 | standalone-0-5    | NULL    | {}             
(4 rows)

Query 20210728_190621_00020_ghidm, FINISHED, 1 node
Splits: 18 total, 18 done (100.00%)
0:00 [4 rows, 474B] [9 rows/s, 1.13KB/s]

sql-worker log output:

2021-07-28T19:07:04.337Z        INFO    Query-20210728_190704_00021_ghidm-406   org.apache.bookkeeper.mledger.impl.ReadOnlyCursorImpl   [public/default/persistent/xgojson] Skipping 2 entries on read-only cursor read-only-cursor
2021-07-28T19:07:04.338Z        INFO    Query-20210728_190704_00021_ghidm-406   org.apache.bookkeeper.mledger.impl.ReadOnlyCursorImpl   [public/default/persistent/xgojson] Skipping 2 entries on read-only cursor read-only-cursor
2021-07-28T19:07:04.405Z        INFO    20210728_190704_00021_ghidm.1.0-0-111   org.apache.pulsar.sql.presto.PulsarRecordCursor Initializing split with parameters: PulsarSplit{splitId=0, connectorId='pulsar', originSchemaName='xgojson', schemaName='public/default', tableName='xgojson', splitSize=2, schema='{"type":"record","name":"Example","namespace":"test","fields":[{"name":"ID","type":"int"},{"name":"Name","type":"string"}]}', schemaType=JSON, startPositionEntryId=0, endPositionEntryId=2, startPositionLedgerId=13, endPositionLedgerId=13, schemaInfoProperties={"pulsar":"hello"}}
2021-07-28T19:07:04.411Z        INFO    20210728_190704_00021_ghidm.1.0-1-113   org.apache.pulsar.sql.presto.PulsarRecordCursor Initializing split with parameters: PulsarSplit{splitId=1, connectorId='pulsar', originSchemaName='xgojson', schemaName='public/default', tableName='xgojson', splitSize=2, schema='{"type":"record","name":"Example","namespace":"test","fields":[{"name":"ID","type":"int"},{"name":"Name","type":"string"}]}', schemaType=JSON, startPositionEntryId=2, endPositionEntryId=4, startPositionLedgerId=13, endPositionLedgerId=13, schemaInfoProperties={"pulsar":"hello"}}
2021-07-28T19:07:04.430Z        INFO    20210728_190704_00021_ghidm.1.0-0-111   org.apache.pulsar.sql.presto.PulsarRecordCursor Closing cursor record
2021-07-28T19:07:04.444Z        INFO    20210728_190704_00021_ghidm.1.0-1-113   org.apache.pulsar.sql.presto.PulsarRecordCursor Closing cursor record
2021-07-28T19:07:04.508Z        INFO    dispatcher-query-12     io.prestosql.event.QueryMonitor TIMELINE: Query 20210728_190704_00021_ghidm :: Transaction:[fcc136aa-e6ff-4ddd-a57a-4a244651f57e] :: elapsed 317ms :: planning 126ms :: waiting 69ms :: scheduling 95ms :: running 59ms :: finishing 37ms :: begin 2021-07-28T19:07:04.183Z :: end 2021-07-28T19:07:04.500Z

smazurov commented 3 years ago

Oh, another thing to check: compare the schemas in all 4 cases (Go and Python on 2.7.2 and 2.8.0). Run pulsar-admin schemas get persistent://tenant/namespace/topic for each topic and check whether the schemas are the same.
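
For comparing the schema definitions themselves, a small helper can avoid false differences caused by whitespace or key order. This is only a sketch: sameSchema is a hypothetical name, not part of pulsar-client-go, and it assumes you paste in the Avro record definition taken from the pulsar-admin output.

```go
package main

import (
	"encoding/json"
	"fmt"
	"reflect"
)

// sameSchema (hypothetical helper) reports whether two schema definitions
// are structurally equal as JSON, ignoring whitespace and object key order.
func sameSchema(a, b string) (bool, error) {
	var x, y interface{}
	if err := json.Unmarshal([]byte(a), &x); err != nil {
		return false, fmt.Errorf("first schema: %w", err)
	}
	if err := json.Unmarshal([]byte(b), &y); err != nil {
		return false, fmt.Errorf("second schema: %w", err)
	}
	return reflect.DeepEqual(x, y), nil
}

func main() {
	// Same record, different key order and spacing.
	first := `{"name":"Example","type":"record","fields":[{"name":"a","type":"string"}]}`
	second := `{ "type": "record", "name": "Example", "fields": [ { "type": "string", "name": "a" } ] }`

	equal, err := sameSchema(first, second)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println("schemas equal:", equal)
}
```

Note that the order of entries inside the fields array still matters to this comparison, since it is a JSON array.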

r-funke commented 3 years ago

I think I somehow messed up the check on Pulsar 2.7.2 in my previous reply. I said that SQL queries returned NULL values instead of the real data. I tried to reproduce that, but couldn't. Instead, on Pulsar 2.7.2 the data was returned correctly through SQL queries with both clients, Go and Python. I don't know what I did differently last time. Sorry for that.

So here are the schemas for Go/Python on 2.7.2/2.8.0. The schema differs between Go and Python, but for each client it is identical between 2.7.2 and 2.8.0.

root@1e335cb9f6fa:/pulsar# ./bin/pulsar-admin schemas get public/default/go272

```
{
  "version": 0,
  "schemaInfo": {
    "name": "go272",
    "schema": {
      "name": "Example",
      "type": "record",
      "fields": [
        { "name": "a", "type": "string" },
        { "name": "b", "type": "int" },
        { "name": "c", "type": "boolean" }
      ]
    },
    "type": "JSON",
    "properties": {}
  }
}
```

root@1e335cb9f6fa:/pulsar# ./bin/pulsar-admin schemas get public/default/py272

```
{
  "version": 0,
  "schemaInfo": {
    "name": "py272",
    "schema": {
      "name": "Example",
      "type": "record",
      "fields": [
        { "name": "a", "type": [ "null", "string" ] },
        { "name": "b", "type": [ "null", "int" ] },
        { "name": "c", "type": [ "null", "boolean" ] }
      ]
    },
    "type": "JSON",
    "properties": {}
  }
}
```

root@16c6cea36d51:/pulsar# ./bin/pulsar-admin schemas get public/default/go280

```
{
  "version": 0,
  "schemaInfo": {
    "name": "go280",
    "schema": {
      "name": "Example",
      "type": "record",
      "fields": [
        { "name": "a", "type": "string" },
        { "name": "b", "type": "int" },
        { "name": "c", "type": "boolean" }
      ]
    },
    "type": "JSON",
    "properties": {}
  }
}
```

root@16c6cea36d51:/pulsar# ./bin/pulsar-admin schemas get public/default/py280

```
{
  "version": 0,
  "schemaInfo": {
    "name": "py280",
    "schema": {
      "name": "Example",
      "type": "record",
      "fields": [
        { "name": "a", "type": [ "null", "string" ] },
        { "name": "b", "type": [ "null", "int" ] },
        { "name": "c", "type": [ "null", "boolean" ] }
      ]
    },
    "type": "JSON",
    "properties": {}
  }
}
```
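
The visible difference between the Go and Python schemas is that the Python client declares every field as a union with "null", while the Go schema uses plain types. A rough way to list this per field is sketched below. nullableFields and recordSchema are hypothetical names, and the sketch only looks at top-level fields of a record. It does not claim that this difference is what causes the SQL error.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// recordSchema models only the parts of an Avro record schema used here.
type recordSchema struct {
	Fields []struct {
		Name string          `json:"name"`
		Type json.RawMessage `json:"type"`
	} `json:"fields"`
}

// nullableFields (hypothetical helper) maps each top-level field name to
// true when its type is a union (JSON array) that contains "null".
func nullableFields(schemaDef string) (map[string]bool, error) {
	var rec recordSchema
	if err := json.Unmarshal([]byte(schemaDef), &rec); err != nil {
		return nil, fmt.Errorf("parse schema: %w", err)
	}
	out := make(map[string]bool, len(rec.Fields))
	for _, f := range rec.Fields {
		nullable := false
		var union []interface{}
		// A plain type such as "int" is not a JSON array, so this
		// unmarshal fails and the field is treated as non-nullable.
		if err := json.Unmarshal(f.Type, &union); err == nil {
			for _, member := range union {
				if s, ok := member.(string); ok && s == "null" {
					nullable = true
				}
			}
		}
		out[f.Name] = nullable
	}
	return out, nil
}

func main() {
	goSchema := `{"name":"Example","type":"record","fields":[{"name":"a","type":"string"},{"name":"b","type":"int"},{"name":"c","type":"boolean"}]}`
	pySchema := `{"name":"Example","type":"record","fields":[{"name":"a","type":["null","string"]},{"name":"b","type":["null","int"]},{"name":"c","type":["null","boolean"]}]}`

	for label, def := range map[string]string{"go": goSchema, "python": pySchema} {
		fields, err := nullableFields(def)
		if err != nil {
			fmt.Println(label, "error:", err)
			continue
		}
		fmt.Println(label, fields)
	}
}
```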

xl4hub commented 3 years ago

This is still a problem.
I built Pulsar from master head and 2.8.1; both have this problem. I am using Go client version 0.6.0.

The problem is in org.apache.pulsar.sql.presto.PulsarRecordCursor and appears to originate in getBytesSchemaInfo() called here: 482: SchemaInfo schemaInfo = getBytesSchemaInfo(pulsarSplit.getSchemaType(), pulsarSplit.getSchemaName());

I added a log message and can see that pulsarSplit.getSchemaType() returns JSON and pulsarSplit.getSchemaName() returns "public/default".

getBytesSchemaInfo() doesn't appear to handle a schema type of JSON: it returns null as schemaInfo. This is then dereferenced at line 493, hence the NullPointerException.

This looks to me like a simple oversight but it is not clear to me how it should be fixed.

This is not a problem with 2.7.2, but I moved to 2.8.1 in the hope that Avro schemas with arrays are finally supported. Unfortunately, things are worse.

My go code reads a very simple schema from a file and creates the producer like this:

    options.Topic = "simple"
    options.Schema = pulsar.NewJSONSchema(string(schema), nil)
    producer, err := client.CreateProducer(options)

schema is:

{
  "namespace" : "test",
  "name" : "simple",
  "type" : "record",
  "fields" : [
      {"name": "str",  "type": "string"},
      {"name": "num",  "type": "int"}
  ]
}

Send one message {"str":"abcdef", "num":123} and another long-standing bug becomes apparent: an SQL select on the topic returns 0 rows. Send another message and the SQL select gives the NullPointerException.

longtengz commented 2 years ago

I am getting the exact same problem. @r-funke Were you able to fix it or work around it?

tuky191 commented 2 years ago

I also encountered this problem while working on a project. I compiled Pulsar (2.7.2) from source so I could debug the sql-worker.

When using the JSON schema, the Go client marshals the Pulsar message Value into JSON ([]byte) and sends it to Pulsar. The column names are decided at this point, based on the json tags (if present) or the names of the struct's fields (if not).

Your schema has to reflect this. In other words, if your column's name is 'id' after marshalling, it HAS to be 'id' in the Avro schema as well. If it is 'ID', as in schema_test, this discrepancy breaks the Presto SQL queries.
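
The mismatch can be checked without a broker. The sketch below marshals a struct with encoding/json (assuming the marshalled keys are what ends up in the message payload, as described above) and lists the schema field names that have no matching key. missingSchemaFields and avroRecord are hypothetical names for illustration, not part of pulsar-client-go, and only top-level record fields are checked.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// avroRecord models only the field names of an Avro record schema.
type avroRecord struct {
	Fields []struct {
		Name string `json:"name"`
	} `json:"fields"`
}

// missingSchemaFields (hypothetical helper) returns the schema field names
// that do not appear as keys in the JSON produced by marshalling v.
// The comparison is case-sensitive.
func missingSchemaFields(schemaDef string, v interface{}) ([]string, error) {
	var rec avroRecord
	if err := json.Unmarshal([]byte(schemaDef), &rec); err != nil {
		return nil, fmt.Errorf("parse schema: %w", err)
	}
	payload, err := json.Marshal(v)
	if err != nil {
		return nil, fmt.Errorf("marshal value: %w", err)
	}
	var keys map[string]json.RawMessage
	if err := json.Unmarshal(payload, &keys); err != nil {
		return nil, fmt.Errorf("parse payload: %w", err)
	}
	var missing []string
	for _, f := range rec.Fields {
		if _, ok := keys[f.Name]; !ok {
			missing = append(missing, f.Name)
		}
	}
	return missing, nil
}

type testJSON struct {
	ID   int    `json:"id"`
	Name string `json:"name"`
}

func main() {
	v := testJSON{ID: 120, Name: "pulsar"}
	upper := `{"type":"record","name":"Example","namespace":"test","fields":[{"name":"ID","type":"int"},{"name":"Name","type":"string"}]}`
	lower := `{"type":"record","name":"Example","namespace":"test","fields":[{"name":"id","type":"int"},{"name":"name","type":"string"}]}`

	for _, def := range []string{upper, lower} {
		missing, err := missingSchemaFields(def, v)
		if err != nil {
			fmt.Println("error:", err)
			continue
		}
		fmt.Println("schema fields without a payload key:", missing)
	}
}
```

With the struct above, the schema using ID and Name is reported as having two fields without a payload key, while the lowercase schema has none.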

Not working

type testJSON struct {
    ID   int    `json:"id"`
    Name string `json:"name"`
}

exampleSchemaDef = "{\"type\":\"record\",\"name\":\"Example\",\"namespace\":\"test\"," +
        "\"fields\":[{\"name\":\"ID\",\"type\":\"int\"},{\"name\":\"Name\",\"type\":\"string\"}]}"

presto> select * from pulsar."public/default".goJson;
  id  | name | __partition__ |     __event_time__      |    __publish_time__     | __message_id__ | __sequence_id__ | __producer_name__ | __key_>
------+------+---------------+-------------------------+-------------------------+----------------+-----------------+-------------------+------->
 NULL | NULL |            -1 | 2339-03-21 22:18:14.838 | 2022-09-10 17:42:06.748 | (10,0,0)       |               0 | standalone-0-0    | NULL  >
(1 row)

Query 20220910_174314_00000_zqxmh, FINISHED, 1 node
Splits: 18 total, 18 done (100.00%)
0:02 [1 rows, 90B] [0 rows/s, 46B/s]

Working

type testJSON struct {
    ID   int    `json:"id"`
    Name string `json:"name"`
}

exampleSchemaDef = "{\"type\":\"record\",\"name\":\"Example\",\"namespace\":\"test\"," +
        "\"fields\":[{\"name\":\"id\",\"type\":\"int\"},{\"name\":\"name\",\"type\":\"string\"}]}"

presto> select * from pulsar."public/default".go_json;
 id  |  name  | __partition__ |     __event_time__      |    __publish_time__     | __message_id__ | __sequence_id__ | __producer_name__ | __key>
-----+--------+---------------+-------------------------+-------------------------+----------------+-----------------+-------------------+------>
 120 | pulsar |            -1 | 2339-03-21 22:18:14.838 | 2022-09-10 17:45:37.752 | (13,0,0)       |               0 | standalone-0-3    | NULL >
 120 | pulsar |            -1 | 2339-03-21 22:18:14.838 | 2022-09-10 17:45:40.231 | (13,1,0)       |               0 | standalone-0-4    | NULL >
(2 rows)

Query 20220910_174546_00002_zqxmh, FINISHED, 1 node
Splits: 18 total, 18 done (100.00%)
0:00 [2 rows, 270B] [5 rows/s, 707B/s]

Verified that this also works on pulsar:latest (2.10.1).