snleee closed this issue 2 years ago.
Hello, any news on this?
We should probably shade some of the conflicting libraries.
Hi, I had a look at this issue, as I'm currently investigating Pulsar and Pinot integration for my company.
I first encountered the first error, the InvalidProtocolBufferException, in a Kubernetes deployment, and later, with a different setup (local Docker), the IndexOutOfBoundsException.
So far, it does not look like an issue with incorrectly shaded libraries. As the stack traces show, the errors occur in libraries that are already shaded by the Pulsar client library.
I looked into the second one, the IndexOutOfBoundsException, first and think I found the issue. When constructing a MessageIdStreamOffset from a string, the current implementation tries to parse a Pulsar MessageId from the provided string.
/**
 * returns the class object from string message id in the format ledgerId:entryId:partitionId
 * throws {@link IOException} if message id format is invalid.
 * @param messageId
 */
public MessageIdStreamOffset(String messageId) {
  try {
    _messageId = MessageId.fromByteArray(messageId.getBytes(StandardCharsets.UTF_8));
  } catch (IOException e) {
    LOGGER.warn("Cannot parse message id " + messageId, e);
  }
}
As you can see from the comment, the code assumes a particular MessageId structure (ledgerId:entryId:partitionId). Unfortunately, the string passed in is only the MessageId.toString() representation, not the actual wire representation as a byte array, so MessageId.fromByteArray() fails to parse it.
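To make the mismatch concrete, here is a minimal, self-contained Java sketch (no Pulsar dependency) that hand-encodes a message id the way a protobuf serializer would, assuming the field numbers of Pulsar's MessageIdData message (ledgerId = 1, entryId = 2, partition = 3, batch_index = 4). It is an approximation for illustration only, not Pulsar's own serializer, and the class and method names are hypothetical.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

final class WireVsString {
  // Appends a protobuf varint: 7 payload bits per byte, high bit set on every byte but the last.
  // Negative values are sign-extended to 64 bits first (10 bytes), as protobuf does for int32.
  static void writeVarint(ByteArrayOutputStream out, long value) {
    while ((value & ~0x7FL) != 0) {
      out.write((int) ((value & 0x7F) | 0x80));
      value >>>= 7;
    }
    out.write((int) value);
  }

  // Writes one varint field: tag = (fieldNumber << 3) | wireType, with wire type 0 (varint).
  static void writeVarintField(ByteArrayOutputStream out, int fieldNumber, long value) {
    writeVarint(out, (long) fieldNumber << 3);
    writeVarint(out, value);
  }

  // Hypothetical approximation of a MessageIdData encoding (field numbers assumed, see lead-in).
  static byte[] encodeLikeMessageIdData(long ledgerId, long entryId, int partition, int batchIndex) {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    writeVarintField(out, 1, ledgerId);
    writeVarintField(out, 2, entryId);
    writeVarintField(out, 3, partition);
    writeVarintField(out, 4, batchIndex);
    return out.toByteArray();
  }

  public static void main(String[] args) {
    byte[] wire = encodeLikeMessageIdData(3432, 0, 1, 0);
    // What MessageIdStreamOffset effectively hands to MessageId.fromByteArray():
    byte[] text = "3432:0:1:0".getBytes(StandardCharsets.UTF_8);
    System.out.println("wire bytes: " + Arrays.toString(wire)); // [8, -24, 26, 16, 0, 24, 1, 32, 0]
    System.out.println("text bytes: " + Arrays.toString(text)); // [51, 52, 51, 50, 58, 48, 58, 49, 58, 48]
  }
}
```

The text form is just ASCII digits and colons. Its first byte, 51 ('3'), would be read by a protobuf parser as the tag for field 6 with wire type 3, so the parser ends up decoding data that was never protobuf in the first place.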
I have currently fixed this by splitting the incoming string and using the constructors of MessageIdImpl and BatchMessageIdImpl to create a MessageId from it. With that I had no exceptions anymore (neither of the two), i.e., I could run the airline stats example.
I can create a PR for this, but I'm not sure that splitting the string and constructing the MessageId with the impl classes is a nice solution. I would rather have the wire format of the MessageId passed as input, but I have not found where this happens. It would be nice to get some pointers here. Thanks
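The split-and-construct workaround described above can be sketched as follows. This is a minimal, self-contained illustration, assuming the toString() forms ledgerId:entryId:partitionIndex (MessageIdImpl) and ledgerId:entryId:partitionIndex:batchIndex (BatchMessageIdImpl); ParsedMessageId and MessageIdStringParser are hypothetical names, not Pulsar or Pinot classes.

```java
import java.util.Objects;

final class MessageIdStringParser {
  // Hypothetical holder for the components of a MessageId.toString() value.
  static final class ParsedMessageId {
    final long ledgerId;
    final long entryId;
    final int partitionIndex;
    final int batchIndex; // -1 when the string carries no batch component

    ParsedMessageId(long ledgerId, long entryId, int partitionIndex, int batchIndex) {
      this.ledgerId = ledgerId;
      this.entryId = entryId;
      this.partitionIndex = partitionIndex;
      this.batchIndex = batchIndex;
    }
  }

  // Parses "ledgerId:entryId:partitionIndex" or "ledgerId:entryId:partitionIndex:batchIndex".
  // An empty string (a possible start-up case) is rejected instead of silently ignored.
  static ParsedMessageId parse(String value) {
    Objects.requireNonNull(value, "value");
    String[] parts = value.trim().split(":", -1); // -1 keeps trailing empty parts
    if (parts.length != 3 && parts.length != 4) {
      throw new IllegalArgumentException(
          "Expected ledgerId:entryId:partitionIndex[:batchIndex], got '" + value + "'");
    }
    try {
      long ledgerId = Long.parseLong(parts[0]);
      long entryId = Long.parseLong(parts[1]);
      int partitionIndex = Integer.parseInt(parts[2]);
      int batchIndex = parts.length == 4 ? Integer.parseInt(parts[3]) : -1;
      return new ParsedMessageId(ledgerId, entryId, partitionIndex, batchIndex);
    } catch (NumberFormatException e) {
      throw new IllegalArgumentException("Non-numeric component in '" + value + "'", e);
    }
  }

  public static void main(String[] args) {
    ParsedMessageId id = parse("3432:0:1:0");
    System.out.println(id.ledgerId + " " + id.entryId + " " + id.partitionIndex + " " + id.batchIndex); // 3432 0 1 0
  }
}
```

In the plugin, the parsed fields would then be passed to new MessageIdImpl(ledgerId, entryId, partitionIndex) or new BatchMessageIdImpl(ledgerId, entryId, partitionIndex, batchIndex), assuming the Pulsar 2.8 constructor signatures. The trade-off is that Pinot then depends on Pulsar's toString() format, which Pulsar does not promise to keep stable.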
@KKcorps can you help here?
Taking it up.
Thanks, I linked the changes I made to get it working as a draft PR, in case it helps you.
@aleksdikanski I remember implementing it this way because I didn't want to make any assumptions about the message-id format in the Pinot code. The Pulsar team can change the message-id format, but our code should still work as long as the Pulsar library handles the new format correctly.
I remember testing this out as well, but your concern is valid. I am looking for another solution; if that doesn't work, I will go with your PR.
@snleee @aleksdikanski can you tell me how to reproduce this issue? I feel like this could also be related to the offset at client startup. It might be throwing an error in case of an empty string.
Sure @KKcorps, I basically followed the guides on the Pulsar and Pinot websites:
(add --network <pinot-network> --name pulsar to the Pulsar docker run command)
$ docker run -it \
--rm \
--network <pinot-network> \
-p 127.0.0.1:6650:6650 \
-p 127.0.0.1:8080:8080 \
apachepulsar/pulsar:2.8.1 \
bin/pulsar-admin topics create persistent://public/default/pinot
{
  "tableName": "airlineStats",
  "tableType": "REALTIME",
  "tenants": {
    "broker": "DefaultTenant",
    "server": "DefaultTenant"
  },
  "segmentsConfig": {
    "schemaName": "airlineStats",
    "timeColumnName": "DaysSinceEpoch",
    "replication": "1",
    "replicasPerPartition": "1",
    "timeType": "DAYS",
    "retentionTimeUnit": "DAYS",
    "retentionTimeValue": "365",
    "segmentPushType": "APPEND",
    "segmentAssignmentStrategy": "BalanceNumSegmentAssignmentStrategy"
  },
  "tableIndexConfig": {
    "loadMode": "MMAP",
    "streamConfigs": {
      "streamType": "pulsar",
      "stream.pulsar.bootstrap.servers": "pulsar://pulsar:6650",
      "stream.pulsar.consumer.prop.auto.offset.reset": "smallest",
      "stream.pulsar.consumer.type": "lowlevel",
      "stream.pulsar.topic.name": "pinot",
      "stream.pulsar.fetch.timeout.millis": "10000",
      "stream.pulsar.decoder.class.name": "org.apache.pinot.plugin.inputformat.json.JSONMessageDecoder",
      "stream.pulsar.consumer.factory.class.name": "org.apache.pinot.plugin.stream.pulsar.PulsarConsumerFactory",
      "realtime.segment.flush.threshold.size": "10000",
      "realtime.segment.flush.threshold.time": "1h"
    }
  },
  "metadata": {}
}
schema.json
{
"metricFieldSpecs": [
],
"dimensionFieldSpecs": [
{
"dataType": "INT",
"name": "ActualElapsedTime"
},
{
"dataType": "INT",
"name": "AirTime"
},
{
"dataType": "INT",
"name": "AirlineID"
},
{
"dataType": "INT",
"name": "ArrDel15"
},
{
"dataType": "INT",
"name": "ArrDelay"
},
{
"dataType": "INT",
"name": "ArrDelayMinutes"
},
{
"dataType": "INT",
"name": "ArrTime"
},
{
"dataType": "STRING",
"name": "ArrTimeBlk"
},
{
"dataType": "INT",
"name": "ArrivalDelayGroups"
},
{
"dataType": "INT",
"name": "CRSArrTime"
},
{
"dataType": "INT",
"name": "CRSDepTime"
},
{
"dataType": "INT",
"name": "CRSElapsedTime"
},
{
"dataType": "STRING",
"name": "CancellationCode"
},
{
"dataType": "INT",
"name": "Cancelled"
},
{
"dataType": "STRING",
"name": "Carrier"
},
{
"dataType": "INT",
"name": "CarrierDelay"
},
{
"dataType": "INT",
"name": "DayOfWeek"
},
{
"dataType": "INT",
"name": "DayofMonth"
},
{
"dataType": "INT",
"name": "DepDel15"
},
{
"dataType": "INT",
"name": "DepDelay"
},
{
"dataType": "INT",
"name": "DepDelayMinutes"
},
{
"dataType": "INT",
"name": "DepTime"
},
{
"dataType": "STRING",
"name": "DepTimeBlk"
},
{
"dataType": "INT",
"name": "DepartureDelayGroups"
},
{
"dataType": "STRING",
"name": "Dest"
},
{
"dataType": "INT",
"name": "DestAirportID"
},
{
"dataType": "INT",
"name": "DestAirportSeqID"
},
{
"dataType": "INT",
"name": "DestCityMarketID"
},
{
"dataType": "STRING",
"name": "DestCityName"
},
{
"dataType": "STRING",
"name": "DestState"
},
{
"dataType": "INT",
"name": "DestStateFips"
},
{
"dataType": "STRING",
"name": "DestStateName"
},
{
"dataType": "INT",
"name": "DestWac"
},
{
"dataType": "INT",
"name": "Distance"
},
{
"dataType": "INT",
"name": "DistanceGroup"
},
{
"dataType": "INT",
"name": "DivActualElapsedTime"
},
{
"dataType": "INT",
"name": "DivAirportIDs",
"singleValueField": false
},
{
"dataType": "INT",
"name": "DivAirportLandings"
},
{
"dataType": "INT",
"name": "DivAirportSeqIDs",
"singleValueField": false
},
{
"dataType": "STRING",
"name": "DivAirports",
"singleValueField": false
},
{
"dataType": "INT",
"name": "DivArrDelay"
},
{
"dataType": "INT",
"name": "DivDistance"
},
{
"dataType": "INT",
"name": "DivLongestGTimes",
"singleValueField": false
},
{
"dataType": "INT",
"name": "DivReachedDest"
},
{
"dataType": "STRING",
"name": "DivTailNums",
"singleValueField": false
},
{
"dataType": "INT",
"name": "DivTotalGTimes",
"singleValueField": false
},
{
"dataType": "INT",
"name": "DivWheelsOffs",
"singleValueField": false
},
{
"dataType": "INT",
"name": "DivWheelsOns",
"singleValueField": false
},
{
"dataType": "INT",
"name": "Diverted"
},
{
"dataType": "INT",
"name": "FirstDepTime"
},
{
"dataType": "STRING",
"name": "FlightDate"
},
{
"dataType": "INT",
"name": "FlightNum"
},
{
"dataType": "INT",
"name": "Flights"
},
{
"dataType": "INT",
"name": "LateAircraftDelay"
},
{
"dataType": "INT",
"name": "LongestAddGTime"
},
{
"dataType": "INT",
"name": "Month"
},
{
"dataType": "INT",
"name": "NASDelay"
},
{
"dataType": "STRING",
"name": "Origin"
},
{
"dataType": "INT",
"name": "OriginAirportID"
},
{
"dataType": "INT",
"name": "OriginAirportSeqID"
},
{
"dataType": "INT",
"name": "OriginCityMarketID"
},
{
"dataType": "STRING",
"name": "OriginCityName"
},
{
"dataType": "STRING",
"name": "OriginState"
},
{
"dataType": "INT",
"name": "OriginStateFips"
},
{
"dataType": "STRING",
"name": "OriginStateName"
},
{
"dataType": "INT",
"name": "OriginWac"
},
{
"dataType": "INT",
"name": "Quarter"
},
{
"dataType": "STRING",
"name": "RandomAirports",
"singleValueField": false
},
{
"dataType": "INT",
"name": "SecurityDelay"
},
{
"dataType": "STRING",
"name": "TailNum"
},
{
"dataType": "INT",
"name": "TaxiIn"
},
{
"dataType": "INT",
"name": "TaxiOut"
},
{
"dataType": "INT",
"name": "Year"
},
{
"dataType": "INT",
"name": "WheelsOn"
},
{
"dataType": "INT",
"name": "WheelsOff"
},
{
"dataType": "INT",
"name": "WeatherDelay"
},
{
"dataType": "STRING",
"name": "UniqueCarrier"
},
{
"dataType": "INT",
"name": "TotalAddGTime"
}
],
"dateTimeFieldSpecs": [
{
"name": "DaysSinceEpoch",
"dataType": "INT",
"format": "1:DAYS:EPOCH",
"granularity": "1:DAYS"
}
],
"schemaName": "airlineStats"
}
{"Quarter":1,"FlightNum":1,"Origin":"JFK","LateAircraftDelay":null,"DivActualElapsedTime":null,"DivWheelsOns":null,
"DivWheelsOffs":null,"ArrDel15":0,"AirTime":359,"DivTotalGTimes":null,
"DepTimeBlk":"0900-0959","DestCityMarketID":32575,"DaysSinceEpoch":16071,"DivAirportSeqIDs":null,
"DepTime":914,"Month":1,"DestStateName":"California","CRSElapsedTime":385,"Carrier":"AA",
"DestAirportID":12892,"Distance":2475,"ArrTimeBlk":"1200-1259","SecurityDelay":null,"DivArrDelay":null,
"LongestAddGTime":null,"OriginWac":22,"WheelsOff":934,"UniqueCarrier":"AA","DestAirportSeqID":1289203,
"DivReachedDest":null,"Diverted":0,"ActualElapsedTime":384,"AirlineID":19805,"OriginStateName":"New York",
"FlightDate":"2014-01-01","DepartureDelayGroups":0,"DivAirportLandings":0,"OriginCityName":"New York, NY",
"OriginStateFips":36,"OriginState":"NY","DistanceGroup":10,"WeatherDelay":null,"DestWac":91,"WheelsOn":1233,
"OriginAirportID":12478,"OriginCityMarketID":31703,"NASDelay":null,"DestState":"CA","ArrTime":1238,
"ArrivalDelayGroups":0,"Flights":1,"DayofMonth":1,"RandomAirports":["SEA","PSC","PHX","MSY","ATL","TYS",
"DEN","CHS","PDX","LAX","EWR","SFO","PIT","RDU","RAP","LSE","SAN","SBN","IAH","OAK","BRO","JFK","SAT","ORD",
"ACY","DFW","BWI","TPA","BFL","BOS","SNA","ISN"],"TotalAddGTime":null,"CRSDepTime":900,"DayOfWeek":3,
"Dest":"LAX","CancellationCode":null,"FirstDepTime":null,"DivTailNums":null,"DepDelayMinutes":14,"DepDelay":14,
"TaxiIn":5,"OriginAirportSeqID":1247802,"DestStateFips":6,"ArrDelay":13,"Cancelled":0,"DivAirportIDs":null,
"TaxiOut":20,"DepDel15":0,"CarrierDelay":null,"DivLongestGTimes":null,"DivAirports":null,"DivDistance":null,
"Year":2014,"CRSArrTime":1225,"ArrDelayMinutes":13,"TailNum":"N338AA","DestCityName":"Los Angeles, CA"}
Send this message using the producer client:
$ docker run -it \
--rm \
--network <pinot-network> \
-v <path/to/dir/containing_airlinestats00.json>:/pulsar/airline \
apachepulsar/pulsar:2.8.1 \
bin/pulsar-client produce -f airline/airlinestats00.json -k 0 -n 1 pinot
Funnily enough, I tested this today and got yet another error, different from the two mentioned above:
2021/12/20 16:26:21.807 ERROR [SegmentOnlineOfflineStateModelFactory$SegmentOnlineOfflineStateModel [HelixTaskExecutor-message_handle_thread] Caught exception in state transition from OFFLINE -> ONLINE for resource: airlineStats_REALTIME, partition: airlineStats__0__0__20211220T1626Z
java.lang.RuntimeException: org.apache.pulsar.shaded.com.google.protobuf.v241.InvalidProtocolBufferException: While parsing a protocol message, the input ended unexpectedly in the middle of a field. This could mean either than the input has been truncated or that an embedded message misreported its own length.
at org.apache.pulsar.client.internal.ReflectionUtils.catchExceptions(ReflectionUtils.java:43) ~[pinot-pulsar-0.9.0-shaded.jar:0.9.0-cf8b84e8b0d6ab62374048de586ce7da21132906]
at org.apache.pulsar.client.internal.DefaultImplementation.newMessageIdFromByteArray(DefaultImplementation.java:103) ~[pinot-pulsar-0.9.0-shaded.jar:0.9.0-cf8b84e8b0d6ab62374048de586ce7da21132906]
at org.apache.pulsar.client.api.MessageId.fromByteArray(MessageId.java:58) ~[pinot-pulsar-0.9.0-shaded.jar:0.9.0-cf8b84e8b0d6ab62374048de586ce7da21132906]
at org.apache.pinot.plugin.stream.pulsar.MessageIdStreamOffset.<init>(MessageIdStreamOffset.java:47) ~[pinot-pulsar-0.9.0-shaded.jar:0.9.0-cf8b84e8b0d6ab62374048de586ce7da21132906]
at org.apache.pinot.plugin.stream.pulsar.MessageIdStreamOffsetFactory.create(MessageIdStreamOffsetFactory.java:39) ~[pinot-pulsar-0.9.0-shaded.jar:0.9.0-cf8b84e8b0d6ab62374048de586ce7da21132906]
at org.apache.pinot.core.data.manager.realtime.LLRealtimeSegmentDataManager.<init>(LLRealtimeSegmentDataManager.java:1209) ~[pinot-all-0.9.1-jar-with-dependencies.jar:0.9.1-f8ec6f6f8eead03488d3f4d0b9501fc3c4232961]
at org.apache.pinot.core.data.manager.realtime.RealtimeTableDataManager.addSegment(RealtimeTableDataManager.java:344) ~[pinot-all-0.9.1-jar-with-dependencies.jar:0.9.1-f8ec6f6f8eead03488d3f4d0b9501fc3c4232961]
at org.apache.pinot.server.starter.helix.HelixInstanceDataManager.addRealtimeSegment(HelixInstanceDataManager.java:162) ~[pinot-all-0.9.1-jar-with-dependencies.jar:0.9.1-f8ec6f6f8eead03488d3f4d0b9501fc3c4232961]
at org.apache.pinot.server.starter.helix.SegmentOnlineOfflineStateModelFactory$SegmentOnlineOfflineStateModel.onBecomeOnlineFromOffline(SegmentOnlineOfflineStateModelFactory.java:164) [pinot-all-0.9.1-jar-with-dependencies.jar:0.9.1-f8ec6f6f8eead03488d3f4d0b9501fc3c4232961]
at org.apache.pinot.server.starter.helix.SegmentOnlineOfflineStateModelFactory$SegmentOnlineOfflineStateModel.onBecomeConsumingFromOffline(SegmentOnlineOfflineStateModelFactory.java:86) [pinot-all-0.9.1-jar-with-dependencies.jar:0.9.1-f8ec6f6f8eead03488d3f4d0b9501fc3c4232961]
at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:?]
at jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:?]
at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:?]
at java.lang.reflect.Method.invoke(Method.java:566) ~[?:?]
at org.apache.helix.messaging.handling.HelixStateTransitionHandler.invoke(HelixStateTransitionHandler.java:404) [pinot-all-0.9.1-jar-with-dependencies.jar:0.9.1-f8ec6f6f8eead03488d3f4d0b9501fc3c4232961]
at org.apache.helix.messaging.handling.HelixStateTransitionHandler.handleMessage(HelixStateTransitionHandler.java:331) [pinot-all-0.9.1-jar-with-dependencies.jar:0.9.1-f8ec6f6f8eead03488d3f4d0b9501fc3c4232961]
at org.apache.helix.messaging.handling.HelixTask.call(HelixTask.java:97) [pinot-all-0.9.1-jar-with-dependencies.jar:0.9.1-f8ec6f6f8eead03488d3f4d0b9501fc3c4232961]
at org.apache.helix.messaging.handling.HelixTask.call(HelixTask.java:49) [pinot-all-0.9.1-jar-with-dependencies.jar:0.9.1-f8ec6f6f8eead03488d3f4d0b9501fc3c4232961]
at java.util.concurrent.FutureTask.run(FutureTask.java:264) [?:?]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]
at java.lang.Thread.run(Thread.java:829) [?:?]
It is also related to the MessageId parsing and was also fixed by my implementation.
Can you also mention the Pulsar version you are testing this on? The Pinot version seems to be 0.9.1 from the exception logs.
I gave the Pulsar docker image in the steps above, which uses the latest version, 2.8.1.
I also dug a bit yesterday and saw that the start offset for a segment is fetched from ZooKeeper. So I looked at the entry (PinotCluster / PROPERTYSTORE / SEGMENTS / ...), and it holds the toString() value of the MessageId as well, e.g., something like "segment.realtime.startOffset": "3432:0:1:0". IMHO it is a bit dangerous to use the toString() representation, as it is more likely to change. I think it should rather store the bytes returned by MessageId.toByteArray(). I'm afraid I don't know where the ZK value is written.
Yeah, I think the string offsets are borrowed from the Kafka implementation. We might need to make this more generic. That's another reason I don't want to split the message id on ":" and build it from the parts, as the format can change on the Pulsar side.
IMO, the solution here should be to store the offset as a hex string and then parse hex string -> byte array -> message id.
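The hex round trip proposed above can be sketched in plain Java. This is a minimal illustration, not necessarily what the eventual fix does; HexOffsetCodec is a hypothetical name, and the byte array stands in for the output of MessageId.toByteArray(). It is hand-rolled rather than using java.util.HexFormat, since that class only exists from Java 17 and the stack trace above comes from a Java 11 runtime.

```java
import java.util.Arrays;

final class HexOffsetCodec {
  private static final char[] HEX_DIGITS = "0123456789abcdef".toCharArray();

  // Encodes raw bytes (e.g. the output of MessageId.toByteArray()) as lowercase hex.
  static String toHex(byte[] bytes) {
    StringBuilder sb = new StringBuilder(bytes.length * 2);
    for (byte b : bytes) {
      sb.append(HEX_DIGITS[(b >> 4) & 0xF]); // high nibble
      sb.append(HEX_DIGITS[b & 0xF]);        // low nibble
    }
    return sb.toString();
  }

  // Decodes a hex string back into bytes; rejects odd lengths and non-hex characters.
  static byte[] fromHex(String hex) {
    if (hex.length() % 2 != 0) {
      throw new IllegalArgumentException("Odd-length hex string: " + hex);
    }
    byte[] out = new byte[hex.length() / 2];
    for (int i = 0; i < out.length; i++) {
      int high = Character.digit(hex.charAt(2 * i), 16);
      int low = Character.digit(hex.charAt(2 * i + 1), 16);
      if (high < 0 || low < 0) {
        throw new IllegalArgumentException("Invalid hex string: " + hex);
      }
      out[i] = (byte) ((high << 4) | low);
    }
    return out;
  }

  public static void main(String[] args) {
    // Stand-in for MessageId.toByteArray(); any byte sequence round-trips.
    byte[] wire = {8, -24, 26, 16, 0};
    String stored = toHex(wire);
    System.out.println(stored); // 08e81a1000
    System.out.println(Arrays.equals(wire, fromHex(stored))); // true
  }
}
```

With the Pulsar client on the classpath, writing would look like toHex(messageId.toByteArray()) and reading like MessageId.fromByteArray(fromHex(stored)) (which can throw IOException). Unlike splitting on ":", this keeps the offset opaque to Pinot, so a change in Pulsar's toString() format would not affect it.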
@aleksdikanski In which file does this exception occur? Is it in pinot-all.log or pinot-quickstart.log or during creation of the table itself when running the pinot-admin command?
Hi, I used the log output of the Docker containers, so no log files were created. The error occurred in the logs of the pinot-server container when the first message was read from the topic, so I guess it is fair to say it occurred when the first segment was created.
The AddTable command of the pinot-admin CLI did not result in the exception.
@aleksdikanski https://github.com/apache/pinot/pull/7947 can you try this and see if it works on your end? It is working on my end. There was also a corrupt \n in your airline data JSON. Please remove that before testing.
I am also noticing an issue when setting auto.offset.reset to largest. It works fine when using smallest. Working on a fix for that as well.
@KKcorps thanks, I tested your changes and I don't get any errors regarding the MessageId anymore. The messages are now correctly consumed by Pinot.
There was also a corrupt \n in your airline data json. Please remove that before testing.
Must have been a copy-paste error; I did not have a problem with the original file.
Hi @KKcorps, are there any updates on this? Can I help in any way?
Hi, the PR for the fix is already merged. If there is any other issue with the code, I will be happy to help.
@KKcorps you mentioned something not working with the largest offset; does this persist?
I am also noticing some issue with using the auto.offset.reset to largest. It works fine when using smallest
Working on the fix for that as well.
I'm not using the largest offset for my use case, and I have demoed the Pulsar / Pinot integration at my company. So I would be interested in making the Pulsar plugin part of the Pinot release again by reverting PR #7272. Maybe also a question for @snleee
My bad. I need to get on this issue again. I remember testing it out thoroughly, and it seemed to be a problem with my test case rather than the plugin itself. I will update you in the next 24 hrs. Update: I tested it out and the issue is definitely there. IMO, the source of the issue is that the Pulsar Reader always resets to the latest message when starting up. I am trying out a fix that uses the Consumer interface instead of the Reader.
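For reference, the intended start-up behaviour can be sketched as: an explicit start offset wins, and auto.offset.reset only decides where to begin when no offset is known. This is a minimal sketch under that assumption; StartPositionResolver and its StartPosition enum are hypothetical stand-ins for Pulsar's MessageId.earliest / MessageId.latest (Reader) or SubscriptionInitialPosition.Earliest / Latest (Consumer), not the plugin's actual code.

```java
import java.util.Locale;
import java.util.Optional;

final class StartPositionResolver {
  // Hypothetical stand-ins for Pulsar's earliest/latest positions, plus "start from a known offset".
  enum StartPosition { EARLIEST, LATEST, FROM_OFFSET }

  // Maps Pinot's Kafka-style auto.offset.reset values onto a start position.
  static StartPosition fromAutoOffsetReset(String autoOffsetReset) {
    if (autoOffsetReset == null) {
      throw new IllegalArgumentException("auto.offset.reset must be set");
    }
    switch (autoOffsetReset.trim().toLowerCase(Locale.ROOT)) {
      case "smallest":
        return StartPosition.EARLIEST;
      case "largest":
        return StartPosition.LATEST;
      default:
        throw new IllegalArgumentException("Unsupported auto.offset.reset: " + autoOffsetReset);
    }
  }

  // A non-empty start offset (e.g. one read back from segment metadata) takes precedence;
  // the reset policy only applies when no offset is known.
  static StartPosition resolve(Optional<String> startOffset, String autoOffsetReset) {
    return startOffset
        .filter(s -> !s.isEmpty())
        .map(s -> StartPosition.FROM_OFFSET)
        .orElseGet(() -> fromAutoOffsetReset(autoOffsetReset));
  }

  public static void main(String[] args) {
    System.out.println(resolve(Optional.empty(), "largest")); // LATEST
    System.out.println(resolve(Optional.of("3432:0:1:0"), "largest")); // FROM_OFFSET
  }
}
```

If a Reader always starts from the latest position regardless of the supplied offset, it effectively ignores the first branch, which would match the symptom described for largest.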
Raised a draft PR: https://github.com/apache/pinot/pull/8017/files (still testing more scenarios with this fix).
@KKcorps any updates on this? Is there anything I can help with here?
Hi, the PR has been moved from draft to ready for review. Still waiting for it to be merged.
Hi @KKcorps,
could you also revert the change in https://github.com/apache/pinot/pull/7272/files in your PR? Otherwise the issue will be fixed, but the Pulsar plugin will not be included in the next Pinot release. Also, some tests are failing in your PR. Thx
The Apache Pulsar connector was added in https://github.com/apache/pinot/pull/7026.
However, it is currently facing some issues at runtime (potentially dependency conflicts). We need to fix the conflicts to make the connector work correctly.