apache / pinot

Apache Pinot - A realtime distributed OLAP datastore
https://pinot.apache.org/
Apache License 2.0

Unable to ingest Protobuf Messages into Pinot table with Realtime connector #13383

Open rahulgulati89 opened 1 week ago

rahulgulati89 commented 1 week ago

I am trying to ingest Kafka messages serialized in Protobuf format into a Pinot table using the realtime connector, but I am getting errors when querying the table. Here are the steps I followed.

Proto file ->

syntax = "proto3";

message snack {
  string name = 1;
  string timestamp = 2;
}

Descriptor Generation -> protoc --include_imports --descriptor_set_out=output.desc schema.proto

After generating the proto descriptor file, I copied it to the pinot-controller, pinot-server, and pinot-broker containers under /tmp.
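Before wiring up the decoder, it can be useful to confirm which message types the generated descriptor set actually contains, e.g. to compare against the stream.kafka.decoder.prop.protoClassName value used below. A minimal sketch (not part of the original report), assuming protobuf-java is on the classpath and the descriptor sits at /tmp/output.desc:

import com.google.protobuf.DescriptorProtos.DescriptorProto;
import com.google.protobuf.DescriptorProtos.FileDescriptorProto;
import com.google.protobuf.DescriptorProtos.FileDescriptorSet;

import java.io.FileInputStream;

public class ListDescriptorMessages {
  public static void main(String[] args) throws Exception {
    // Parse the descriptor set produced by `protoc --descriptor_set_out=output.desc`.
    try (FileInputStream in = new FileInputStream("/tmp/output.desc")) {
      FileDescriptorSet set = FileDescriptorSet.parseFrom(in);
      for (FileDescriptorProto file : set.getFileList()) {
        for (DescriptorProto message : file.getMessageTypeList()) {
          // Print each message name, prefixed with its package if one is declared.
          String pkg = file.getPackage();
          System.out.println(pkg.isEmpty() ? message.getName() : pkg + "." + message.getName());
        }
      }
    }
  }
}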

Pinot Connector configuration ->


{
  "tableName": "transcriptprotobufdescnewprotofile",
  "tableType": "REALTIME",
  "tenants": {},
  "segmentsConfig": {
    "replicasPerPartition": "1",
    "schemaName": "transcriptprotobufdescnewprotofile",
    "timeColumnName": "timestamp",
    "timeType": "MILLISECONDS"
  },
  "tableIndexConfig": {
    "loadMode": "MMAP",
    "streamConfigs": {
      "streamType": "kafka",
      "stream.kafka.consumer.type": "lowlevel",
      "stream.kafka.topic.name": "transcriptprotobufdescnewprotofile",
      "streamType": "kafka",
      "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.inputformat.protobuf.KafkaConfluentSchemaRegistryProtoBufMessageDecoder",
      "stream.kafka.decoder.prop.descriptorFile": "file:///tmp/output.desc",
      "stream.kafka.decoder.prop.protoClassName": "Snack",
      "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
      "stream.kafka.broker.list": "kafka:9092",
      "realtime.segment.flush.threshold.rows": "0",
      "realtime.segment.flush.threshold.time": "24h",
      "realtime.segment.flush.threshold.segment.size": "50M",
      "stream.kafka.consumer.prop.auto.offset.reset": "smallest"
    }
  },
  "metadata": {
    "customConfigs": {}
  }
}

Pinot Schema ->

{
  "schemaName": "transcriptprotobufdescnewprotofile",
  "dimensionFieldSpecs": [
    {
      "name": "name",
      "dataType": "STRING"
    }
  ],
  "dateTimeFieldSpecs": [{
    "name": "timestamp",
    "dataType": "STRING",
    "format" : "1:MILLISECONDS:EPOCH",
    "granularity": "1:MILLISECONDS"
  }]
}

Messages are being produced to the Kafka topic with the following command:

kafka-protobuf-console-producer --bootstrap-server kafka:9092 --topic transcriptprotobufdescnewprotofile --property schema.registry.url=http://localhost:8088 --property value.schema='syntax = "proto3"; message snack { string name = 1; string timestamp = 2;}'

Actual Messages ->

{"name":"test","timestamp":"1234"}
{"name":"test","timestamp":"123334"}

Now, in the query console dashboard, I see the following error for this newly created table:


Query failed with exceptions. Please toggle the switch to view details.

Error Code: 305

null:
1 segments unavailable: [transcriptprotobufdescnewprotofile__0__0__20240613T0645Z]

Upon checking the pinot-controller container logs, I see the following errors:

2024-06-13 12:15:19 2024/06/13 06:45:19.408 ERROR [MessageGenerationPhase] [HelixController-pipeline-default-PinotCluster-(5b9175c5_DEFAULT)] Event 5b9175c5_DEFAULT : Unable to find a next state for resource: transcriptprotobufdescnewprotofile_REALTIME partition: transcriptprotobufdescnewprotofile__0__0__20240613T0645Z from stateModelDefinitionclass org.apache.helix.model.StateModelDefinition from:ERROR to:CONSUMING

The Pinot Swagger API shows an error/unhealthy status for this new table:


errorMessage": "Caught exception while adding CONSUMING segment",
              "stackTrace": "org.apache.pinot.spi.utils.retry.AttemptsExceededException: Operation failed after 5 attempts\n\tat org.apache.pinot.spi.utils.retry.BaseRetryPolicy.attempt(BaseRetryPolicy.java:65)\n\tat org.apache.pinot.core.data.manager.realtime.RealtimeSegmentDataManager.<init>(RealtimeSegmentDataManager.java:1546)\n\tat org.apache.pinot.core.data.manager.realtime.RealtimeTableDataManager.doAddConsumingSegment(RealtimeTableDataManager.java:494)\n\tat org.apache.pinot.core.data.manager.realtime.RealtimeTableDataManager.addConsumingSegment(RealtimeTableDataManager.java:439)\n\tat org.apache.pinot.server.starter.helix.HelixInstanceDataManager.addConsumingSegment(HelixInstanceDataManager.java:282)\n\tat org.apache.pinot.server.starter.helix.SegmentOnlineOfflineStateModelFactory$SegmentOnlineOfflineStateModel.onBecomeConsumingFromOffline(SegmentOnlineOfflineStateModelFactory.java:81)\n\tat java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)\n\tat java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)\n\tat java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)\n\tat java.base/java.lang.reflect.Method.invoke(Method.java:566)\n\tat org.apache.helix.messaging.handling.HelixStateTransitionHandler.invoke(HelixStateTransitionHandler.java:350)\n\tat org.apache.helix.messaging.handling.HelixStateTransitionHandler.handleMessage(HelixStateTransitionHandler.java:278)\n\tat org.apache.helix.messaging.handling.HelixTask.call(HelixTask.java:97)\n\tat org.apache.helix.messaging.handling.HelixTask.call(HelixTask.java:49)\n\tat java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)\n\tat java.base/java.lang.Thread.run(Thread.java:829)\n"
            }
          }
        }
      }
    ],
    "serverDebugInfos": [],
    "brokerDebugInfos": [],
    "tableSize": {
      "reportedSize": "-1 bytes",
      "estimatedSize": "-1 bytes"
    },
    "ingestionStatus": {
      "ingestionState": "UNHEALTHY",
      "errorMessage": "Did not get any response from servers for segment: transcriptprotobufdescnewprotofile__0__0__20240613T0645Z"
    }

What am I missing?

Jackie-Jiang commented 1 week ago

cc @swaminathanmanish @KKcorps