apache / pinot

Apache Pinot - A realtime distributed OLAP datastore
https://pinot.apache.org/
Apache License 2.0

Pinot not ingesting realtime data from Apache Pulsar #13880

Open pramsri opened 3 weeks ago

pramsri commented 3 weeks ago

I am working on a change data capture (CDC) project that streams each update from a MySQL database into Pinot. MySQL, Pulsar and Pinot run in Docker: MySQL uses the Debezium MySQL example image (version 0.8), and there are Docker Compose files for Pulsar 3.3.1 (in standalone mode) and Pinot 1.1.0 (with containers for ZooKeeper, controller, broker and server). A Debezium MySQL connector is configured and mounted in Pulsar, and a message arrives in a topic every time a table in the MySQL database is updated. I have also enabled the Pinot-Pulsar plugin in the Pinot compose file.

However, when I create a realtime table in Pinot using the configs from the 1.1.0 documentation, filling in details such as the topic name, no segment is created and no data is ingested. Because the JSON schema of the original topic is complex, I set up ingestion for a second topic with a simple schema for testing. I created a Pulsar topic and produced messages whose JSON schema has only primitive fields (an int, a float, a string and a long for the timestamp). A temporary consumer confirmed that the topic was storing the messages correctly with the defined JSON schema. The issue persisted even with the simpler schema. I used JsonToPinotSchema to make sure there were no inconsistencies, then AddSchema and AddTable on the Pinot CLI. The table was visible in the controller UI, but no data was ingested and no segment was created. The logs showed the error: [pinotHelixResourceManager] org.apache.pinot.pluging.inputformat.json.JsonMessageDecoder cannot be cast to class org.apache.pinot.spi.streamconsumerfactory.
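If the complex schema of the original topic is the obstacle, one option is to flatten each change event into primitive columns before it reaches Pinot. The following is a minimal Python sketch. It assumes the standard Debezium change-event envelope (`before`, `after`, `op`, `ts_ms`), optionally wrapped in a `schema`/`payload` object when the JSON converter embeds schemas. `flatten_debezium_event` and the extra `op`/`ts_ms` columns are hypothetical names used for illustration. They are not part of Pinot, Pulsar or Debezium.

```python
import json
from typing import Any, Dict, Optional


def flatten_debezium_event(raw: str) -> Optional[Dict[str, Any]]:
    """Flatten one Debezium change event (a JSON string) into a single row.

    Hypothetical helper. Assumes the standard Debezium envelope with
    "before", "after", "op" and "ts_ms", optionally wrapped as
    {"schema": ..., "payload": ...} when the JSON converter embeds schemas.
    Returns None for events that carry no after-image (deletes, tombstones).
    """
    event = json.loads(raw)
    # Unwrap the schema/payload wrapper if present.
    if isinstance(event, dict) and "schema" in event and "payload" in event:
        event = event["payload"]
    if not event:
        return None  # tombstone: the message value is null
    after = event.get("after")
    if after is None:
        return None  # delete event: only "before" is populated
    row = dict(after)  # copy the column values of the changed MySQL row
    row["op"] = event.get("op")  # "c" create, "u" update, "r" snapshot read
    row["ts_ms"] = event.get("ts_ms")  # when Debezium processed the event
    return row


if __name__ == "__main__":
    sample = json.dumps({
        "before": None,
        "after": {"id": 1, "name": "widget", "price": 9.5},
        "op": "c",
        "ts_ms": 1700000000000,
    })
    print(flatten_debezium_event(sample))
    # prints {'id': 1, 'name': 'widget', 'price': 9.5, 'op': 'c', 'ts_ms': 1700000000000}
```

This sketch drops deletes. Keeping them, for example as soft-delete rows built from `before`, is a design choice it deliberately leaves out.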

This is a sample message sent by the producer: [image not preserved]

Here is the schema:

{
  "schemaName": "topic1",
  "enableColumnBasedNullHandling": false,
  "dimensionFieldSpecs": [
    { "name": "id", "dataType": "INT", "notNull": false },
    { "name": "name", "dataType": "STRING", "notNull": false },
    { "name": "price", "dataType": "FLOAT", "notNull": false }
  ],
  "dateTimeFieldSpecs": [
    {
      "name": "timestamp",
      "dataType": "LONG",
      "notNull": false,
      "format": "1:MILLISECONDS:EPOCH",
      "granularity": "1:MILLISECONDS"
    }
  ]
}
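One quick way to rule out schema mismatches is to check a sample message against the field specs above. The sketch below is a hypothetical Python helper, not a Pinot tool. It maps each Pinot `dataType` to the Python types that `json.loads` produces. That mapping is an assumption of this sketch, not Pinot's exact conversion rules.

```python
import json
from typing import Any, Dict, List, Tuple

# The schema from this issue, as a Python dict.
TOPIC1_SCHEMA: Dict[str, Any] = {
    "schemaName": "topic1",
    "dimensionFieldSpecs": [
        {"name": "id", "dataType": "INT", "notNull": False},
        {"name": "name", "dataType": "STRING", "notNull": False},
        {"name": "price", "dataType": "FLOAT", "notNull": False},
    ],
    "dateTimeFieldSpecs": [
        {"name": "timestamp", "dataType": "LONG", "notNull": False,
         "format": "1:MILLISECONDS:EPOCH", "granularity": "1:MILLISECONDS"},
    ],
}

# Assumed mapping from Pinot dataType to the Python types json.loads yields.
# This is a simplification, not Pinot's exact conversion rules.
PY_TYPES: Dict[str, Tuple[type, ...]] = {
    "INT": (int,),
    "LONG": (int,),
    "FLOAT": (int, float),
    "DOUBLE": (int, float),
    "STRING": (str,),
    "BOOLEAN": (bool,),
}


def check_message(schema: Dict[str, Any], message: str) -> List[str]:
    """Return human-readable problems found when matching a JSON message to a schema."""
    record = json.loads(message)
    specs = (schema.get("dimensionFieldSpecs", [])
             + schema.get("metricFieldSpecs", [])
             + schema.get("dateTimeFieldSpecs", []))
    problems: List[str] = []
    for spec in specs:
        name, dtype = spec["name"], spec["dataType"]
        if name not in record:
            problems.append(f"missing field {name!r}")
            continue
        value = record[name]
        if value is None:
            if spec.get("notNull"):
                problems.append(f"field {name!r} is null but notNull is true")
            continue
        expected = PY_TYPES.get(dtype)
        if expected is None:
            continue  # dataType not covered by this sketch
        # bool is a subclass of int in Python, so rule it out for non-BOOLEAN types.
        is_bool_mismatch = isinstance(value, bool) and dtype != "BOOLEAN"
        if is_bool_mismatch or not isinstance(value, expected):
            problems.append(f"field {name!r}: expected {dtype}, got {type(value).__name__}")
    known = {spec["name"] for spec in specs}
    for extra in sorted(set(record) - known):
        problems.append(f"extra field {extra!r} not in schema")
    return problems


if __name__ == "__main__":
    msg = json.dumps({"id": 1, "name": "widget", "price": 9.5, "timestamp": 1700000000000})
    print(check_message(TOPIC1_SCHEMA, msg))  # prints []
```

Reporting extra fields helps catch a misnamed time field, for example `ts` instead of `timestamp`.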

And this is the table config:

{
  "REALTIME": {
    "tableName": "topic1_REALTIME",
    "tableType": "REALTIME",
    "segmentsConfig": {
      "schemaName": "topic1",
      "replication": "1",
      "replicasPerPartition": "1",
      "timeColumnName": "timestamp",
      "minimizeDataMovement": false
    },
    "tenants": {
      "broker": "DefaultTenant",
      "server": "DefaultTenant",
      "tagOverrideConfig": {}
    },
    "tableIndexConfig": {
      "invertedIndexColumns": [],
      "noDictionaryColumns": [],
      "columnMajorSegmentBuilderEnabled": false,
      "optimizeDictionary": false,
      "optimizeDictionaryForMetrics": false,
      "noDictionarySizeRatioThreshold": 0.85,
      "rangeIndexColumns": [],
      "rangeIndexVersion": 2,
      "autoGeneratedInvertedIndex": false,
      "createInvertedIndexDuringSegmentGeneration": false,
      "sortedColumn": [],
      "bloomFilterColumns": [],
      "loadMode": "MMAP",
      "streamConfigs": {
        "streamType": "pulsar",
        "stream.pulsar.topic.name": "persistent://public/default/topic1",
        "stream.pulsar.bootstrap.servers": "pulsar://localhost:6650",
        "stream.pulsar.consumer.type": "lowlevel",
        "stream.pulsar.consumer.prop.auto.offset.reset": "smallest",
        "stream.pulsar.consumer.factory.class.name": "org.apache.pinot.plugin.stream.pulsar.PulsarConsumerFactory",
        "stream.pulsar.decoder.class.name": "org.apache.pinot.plugin.inputformat.json.JSONMessageDecoder",
        "realtime.segment.flush.threshold.rows": "1000000",
        "realtime.segment.flush.threshold.time": "6h",
        "stream.pulsar.fetch.timeout.millis": "30000"
      },
      "onHeapDictionaryColumns": [],
      "varLengthDictionaryColumns": [],
      "enableDefaultStarTree": false,
      "enableDynamicStarTreeCreation": false,
      "aggregateMetrics": false,
      "nullHandlingEnabled": false
    },
    "metadata": {},
    "quota": {},
    "routing": {},
    "query": {},
    "ingestionConfig": {
      "segmentTimeValueCheck": true,
      "continueOnError": false,
      "rowTimeValueCheck": false
    },
    "isDimTable": false
  }
}
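The logged cast error reads as if a decoder class was loaded where a stream consumer factory was expected. The config above has the two class names in the right keys, so this may come from an earlier attempt. The hypothetical Python helper below only checks the config text. It flags that kind of mix-up, missing stream keys (based on the key names this config uses), and a `timeColumnName` that is not a date-time field in the schema. It is a heuristic sketch, not Pinot's own validation.

```python
from typing import Any, Dict, List


def check_realtime_config(table_config: Dict[str, Any], schema: Dict[str, Any]) -> List[str]:
    """Heuristic checks on a REALTIME table config (hypothetical helper, not Pinot's validator)."""
    # Accept either a bare table config or the {"REALTIME": {...}} form shown in this issue.
    cfg = table_config.get("REALTIME", table_config)
    stream = cfg.get("tableIndexConfig", {}).get("streamConfigs", {})
    stream_type = stream.get("streamType")
    if not stream_type:
        return ["streamConfigs has no streamType"]

    problems: List[str] = []
    prefix = f"stream.{stream_type}."
    # Keys the config in this issue uses for its Pulsar stream.
    for suffix in ("topic.name", "bootstrap.servers",
                   "consumer.factory.class.name", "decoder.class.name"):
        if prefix + suffix not in stream:
            problems.append(f"missing {prefix}{suffix}")

    # A decoder class in the factory slot (or the reverse) could explain a
    # "... MessageDecoder cannot be cast to ... consumer factory" error.
    factory = stream.get(prefix + "consumer.factory.class.name", "")
    decoder = stream.get(prefix + "decoder.class.name", "")
    if factory and not factory.endswith("ConsumerFactory"):
        problems.append(f"factory class {factory!r} does not look like a consumer factory")
    if decoder and not decoder.endswith("Decoder"):
        problems.append(f"decoder class {decoder!r} does not look like a message decoder")

    # The time column should be one of the schema's dateTimeFieldSpecs.
    time_col = cfg.get("segmentsConfig", {}).get("timeColumnName")
    datetime_cols = {s["name"] for s in schema.get("dateTimeFieldSpecs", [])}
    if time_col and time_col not in datetime_cols:
        problems.append(f"timeColumnName {time_col!r} is not a dateTimeFieldSpec in the schema")
    return problems


if __name__ == "__main__":
    config = {"REALTIME": {
        "segmentsConfig": {"schemaName": "topic1", "timeColumnName": "timestamp"},
        "tableIndexConfig": {"streamConfigs": {
            "streamType": "pulsar",
            "stream.pulsar.topic.name": "persistent://public/default/topic1",
            "stream.pulsar.bootstrap.servers": "pulsar://localhost:6650",
            "stream.pulsar.consumer.factory.class.name":
                "org.apache.pinot.plugin.stream.pulsar.PulsarConsumerFactory",
            "stream.pulsar.decoder.class.name":
                "org.apache.pinot.plugin.inputformat.json.JSONMessageDecoder",
        }},
    }}
    schema = {"dateTimeFieldSpecs": [{"name": "timestamp", "dataType": "LONG"}]}
    print(check_realtime_config(config, schema))  # prints []
```

A clean result only means the config text is self-consistent. It says nothing about whether the server can reach the broker or load the plugin classes.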

When I created the schema and table directly in the controller UI, both were created without errors in the log, but no ingestion takes place. Another error appears in the logs even when no table has been created: [DelayedAutoRebalancer] [HelixController-pipeline-default-PinotCluster-(b6629b4c_DEFAULT)] No instances or active instances available for resource leadControllerResource, allInstances: [Controller_172.18.0.3_9000], liveInstances: [], activeInstances: []

I can't work out why messages are not being ingested into Pinot, or whether the problem lies in the schema config or in the table's stream configs.

hpvd commented 3 weeks ago

Maybe you would like to give Pinot 1.2 a try. It includes several updates, some of them related to Pulsar 3.3: https://github.com/apache/pinot/issues/13317

The changelog of Pinot 1.2 (https://github.com/apache/pinot/releases/tag/release-1.2.0) mentions Pulsar four times.

pramsri commented 2 weeks ago

I have tried the same setup with Pinot 1.2, but the problem still occurs. What is the latest Pulsar version that Pinot 1.1 is fully compatible with?

hpvd commented 2 weeks ago

Pinot 1.1 was upgraded to work with Pulsar 2.11. Be aware, though, that this Pulsar version is EOL; see https://github.com/apache/pinot/issues/12713

pramsri commented 2 weeks ago

I see, thanks. With Pinot 1.2 and Pulsar 3.3, the table is saved but no segment is created (checking the segment info returns a 500 error code). What other issues should I look into? Once I enable the table, the Docker logs show no errors related to message decoding or ingestion.

Jackie-Jiang commented 2 weeks ago

Do you have access to the server log?

pramsri commented 2 weeks ago

Yes. I've checked the server logs and found an error message: [image not preserved]

Jackie-Jiang commented 2 weeks ago

The error message indicates that the Helix cluster is not set up; it has nothing to do with Pulsar. Did you start a controller before starting the server?

pramsri commented 2 weeks ago

Yes. The compose file creates the controller container before the server. When I restart the server only after the controller has finished setting up, this error message no longer appears in the logs.
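To enforce the controller-before-server ordering without a manual restart, the server's startup can wait until the controller answers. Here is a minimal Python sketch. It assumes the controller serves an HTTP health check at `http://localhost:9000/health`. Inside the compose network, the host would be the controller's service name. The helper names are hypothetical. A healthy response is only a coarse readiness signal and may arrive before Helix has finished setting up the cluster.

```python
import http.client
import time
import urllib.request
from typing import Callable

# Assumption: the Pinot controller answers HTTP 200 on /health once it is up.
# Inside a compose network, replace localhost with the controller's service name.
CONTROLLER_HEALTH_URL = "http://localhost:9000/health"


def controller_is_up(url: str = CONTROLLER_HEALTH_URL, timeout: float = 2.0) -> bool:
    """Return True if the health URL answers with HTTP 200, False on any connection error."""
    try:
        with urllib.request.urlopen(url, timeout=timeout) as resp:
            return resp.status == 200
    # URLError, HTTPError, refused connections and timeouts are OSError subclasses;
    # HTTPException covers malformed responses from a half-started service.
    except (OSError, http.client.HTTPException):
        return False


def wait_until(check: Callable[[], bool], timeout_s: float = 120.0, interval_s: float = 2.0) -> bool:
    """Poll check() until it returns True or timeout_s elapses; return whether it succeeded."""
    deadline = time.monotonic() + timeout_s
    while True:
        if check():
            return True
        if time.monotonic() >= deadline:
            return False
        time.sleep(interval_s)


if __name__ == "__main__":
    # Exit non-zero if the controller never came up, so a wrapper script
    # can refuse to start the server.
    raise SystemExit(0 if wait_until(controller_is_up) else 1)
```

A wrapper entrypoint for the server container could run this first and launch the server only on exit code 0. Docker Compose can express the same ordering declaratively by giving the controller service a `healthcheck` and having the server use `depends_on` with `condition: service_healthy`.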