apache / pinot

Apache Pinot - A realtime distributed OLAP datastore
https://pinot.apache.org/
Apache License 2.0
5.26k stars 1.23k forks source link

Pinot stop consuming from Kinesis Data Stream #9869

Open rohit-mobstac opened 1 year ago

rohit-mobstac commented 1 year ago

Pinot realtime ingestion from kinesis data stream works as expected for sometime but eventually stops consuming. While checking server logs, connection refused error is shown. Is this due to the issue mentioned in this PR #9863.

2022/11/29 16:10:58.538 INFO [ControllerLeaderLocator] [testtable__0__0__20221129T1419Z] Millis since last controller cache value invalidate 18567 is less than allowed frequency 30000. Skipping invalidate.
2022/11/29 16:10:58.538 INFO [LLRealtimeSegmentDataManager_testtable__0__0__20221129T1419Z] [testtable__0__0__20221129T1419Z] Could not commit segment. Retrying after hold
2022/11/29 16:11:00.997 INFO [HttpClient] [testtable__2__0__20221129T1419Z] Sending request: http://SERVER_IP_ADDRESS.ec2.internal:9000/segmentConsumed?reason=forceCommitMessageReceived&streamPartitionMsgOffset=%7B%22shardId-000000000002%22%3A%2249635630746998378075227930204125252036295126557430644770%22%7D&instance=Server_SERVER_IP_ADDRESS.ec2.internal_8098&offset=-1&name=testtable__2__0__20221129T1419Z&rowCount=5136&memoryUsedBytes=1828440 to controller: SERVER_IP_ADDRESS.ec2.internal, version: Unknown
2022/11/29 16:11:00.997 INFO [ServerSegmentCompletionProtocolHandler] [testtable__2__0__20221129T1419Z] Controller response {"offset":-1,"isSplitCommitType":false,"streamPartitionMsgOffset":"{\"shardId-000000000002\":\"49635630746998378075227930204125252036295126557430644770\"}","buildTimeSec":-1,"status":"HOLD"} for http://SERVER_IP_ADDRESS.ec2.internal:9000/segmentConsumed?reason=forceCommitMessageReceived&streamPartitionMsgOffset=%7B%22shardId-000000000002%22%3A%2249635630746998378075227930204125252036295126557430644770%22%7D&instance=Server_SERVER_IP_ADDRESS.ec2.internal_8098&offset=-1&name=testtable__2__0__20221129T1419Z&rowCount=5136&memoryUsedBytes=1828440
2022/11/29 16:11:01.474 INFO [HttpClient] [testtable__1__0__20221129T1419Z] Sending request: http://SERVER_IP_ADDRESS.ec2.internal:9000/segmentConsumed?reason=forceCommitMessageReceived&streamPartitionMsgOffset=%7B%22shardId-000000000001%22%3A%2249635630746976077330029399572174273870489207002061864978%22%7D&instance=Server_SERVER_IP_ADDRESS.ec2.internal_8098&offset=-1&name=testtable__1__0__20221129T1419Z&memoryUsedBytes=1115986 to controller: SERVER_IP_ADDRESS.ec2.internal, version: Unknown
2022/11/29 16:11:01.474 INFO [ServerSegmentCompletionProtocolHandler] [testtable__1__0__20221129T1419Z] Controller response {"offset":-1,"isSplitCommitType":true,"controllerVipUrl":"http://localhost:9000","streamPartitionMsgOffset":"{\"shardId-000000000001\":\"49635630746976077330029399572174273870489207002061864978\"}","buildTimeSec":126,"status":"COMMIT"} for http://SERVER_IP_ADDRESS.ec2.internal:9000/segmentConsumed?reason=forceCommitMessageReceived&streamPartitionMsgOffset=%7B%22shardId-000000000001%22%3A%2249635630746976077330029399572174273870489207002061864978%22%7D&instance=Server_SERVER_IP_ADDRESS.ec2.internal_8098&offset=-1&name=testtable__1__0__20221129T1419Z&memoryUsedBytes=1115986
2022/11/29 16:11:01.476 INFO [HttpClient] [testtable__1__0__20221129T1419Z] Sending request: http://SERVER_IP_ADDRESS.ec2.internal:9000/segmentCommitStart?segmentSizeBytes=5072&buildTimeMillis=5&streamPartitionMsgOffset=%7B%22shardId-000000000001%22%3A%2249635630746976077330029399572174273870489207002061864978%22%7D&instance=Server_SERVER_IP_ADDRESS.ec2.internal_8098&offset=-1&name=testtable__1__0__20221129T1419Z&memoryUsedBytes=1115986 to controller: SERVER_IP_ADDRESS.ec2.internal, version: Unknown
2022/11/29 16:11:01.476 INFO [ServerSegmentCompletionProtocolHandler] [testtable__1__0__20221129T1419Z] Controller response {"offset":-1,"isSplitCommitType":false,"streamPartitionMsgOffset":null,"buildTimeSec":-1,"status":"COMMIT_CONTINUE"} for http://SERVER_IP_ADDRESS.ec2.internal:9000/segmentCommitStart?segmentSizeBytes=5072&buildTimeMillis=5&streamPartitionMsgOffset=%7B%22shardId-000000000001%22%3A%2249635630746976077330029399572174273870489207002061864978%22%7D&instance=Server_SERVER_IP_ADDRESS.ec2.internal_8098&offset=-1&name=testtable__1__0__20221129T1419Z&memoryUsedBytes=1115986
2022/11/29 16:11:01.476 ERROR [LLRealtimeSegmentDataManager_testtable__1__0__20221129T1419Z] [testtable__1__0__20221129T1419Z] Could not send request http://localhost:9000/segmentUpload?segmentSizeBytes=5072&buildTimeMillis=5&streamPartitionMsgOffset=%7B%22shardId-000000000001%22%3A%2249635630746976077330029399572174273870489207002061864978%22%7D&instance=Server_SERVER_IP_ADDRESS.ec2.internal_8098&offset=-1&name=testtable__1__0__20221129T1419Z&memoryUsedBytes=1115986
org.apache.pinot.shaded.org.apache.http.conn.HttpHostConnectException: Connect to localhost:9000 [localhost/127.0.0.1] failed: Connection refused (Connection refused)
    at org.apache.pinot.shaded.org.apache.http.impl.conn.DefaultHttpClientConnectionOperator.connect(DefaultHttpClientConnectionOperator.java:156) ~[pinot-all-0.12.0-SNAPSHOT-jar-with-dependencies.jar:0.12.0-SNAPSHOT-58c1dc9b7d8e28ac7207fa7565b1ed76f7a0c2ad]
    at org.apache.pinot.shaded.org.apache.http.impl.conn.PoolingHttpClientConnectionManager.connect(PoolingHttpClientConnectionManager.java:376) ~[pinot-all-0.12.0-SNAPSHOT-jar-with-dependencies.jar:0.12.0-SNAPSHOT-58c1dc9b7d8e28ac7207fa7565b1ed76f7a0c2ad]
    at org.apache.pinot.shaded.org.apache.http.impl.execchain.MainClientExec.establishRoute(MainClientExec.java:393) ~[pinot-all-0.12.0-SNAPSHOT-jar-with-dependencies.jar:0.12.0-SNAPSHOT-58c1dc9b7d8e28ac7207fa7565b1ed76f7a0c2ad]
    at org.apache.pinot.shaded.org.apache.http.impl.execchain.MainClientExec.execute(MainClientExec.java:236) ~[pinot-all-0.12.0-SNAPSHOT-jar-with-dependencies.jar:0.12.0-SNAPSHOT-58c1dc9b7d8e28ac7207fa7565b1ed76f7a0c2ad]
    at org.apache.pinot.shaded.org.apache.http.impl.execchain.ProtocolExec.execute(ProtocolExec.java:186) ~[pinot-all-0.12.0-SNAPSHOT-jar-with-dependencies.jar:0.12.0-SNAPSHOT-58c1dc9b7d8e28ac7207fa7565b1ed76f7a0c2ad]
    at org.apache.pinot.shaded.org.apache.http.impl.execchain.RetryExec.execute(RetryExec.java:89) ~[pinot-all-0.12.0-SNAPSHOT-jar-with-dependencies.jar:0.12.0-SNAPSHOT-58c1dc9b7d8e28ac7207fa7565b1ed76f7a0c2ad]
    at org.apache.pinot.shaded.org.apache.http.impl.execchain.RedirectExec.execute(RedirectExec.java:110) ~[pinot-all-0.12.0-SNAPSHOT-jar-with-dependencies.jar:0.12.0-SNAPSHOT-58c1dc9b7d8e28ac7207fa7565b1ed76f7a0c2ad]
    at org.apache.pinot.shaded.org.apache.http.impl.client.InternalHttpClient.doExecute(InternalHttpClient.java:185) ~[pinot-all-0.12.0-SNAPSHOT-jar-with-dependencies.jar:0.12.0-SNAPSHOT-58c1dc9b7d8e28ac7207fa7565b1ed76f7a0c2ad]
    at org.apache.pinot.shaded.org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:83) ~[pinot-all-0.12.0-SNAPSHOT-jar-with-dependencies.jar:0.12.0-SNAPSHOT-58c1dc9b7d8e28ac7207fa7565b1ed76f7a0c2ad]
    at org.apache.pinot.shaded.org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:108) ~[pinot-all-0.12.0-SNAPSHOT-jar-with-dependencies.jar:0.12.0-SNAPSHOT-58c1dc9b7d8e28ac7207fa7565b1ed76f7a0c2ad]
    at org.apache.pinot.common.utils.http.HttpClient.sendRequest(HttpClient.java:276) ~[pinot-all-0.12.0-SNAPSHOT-jar-with-dependencies.jar:0.12.0-SNAPSHOT-58c1dc9b7d8e28ac7207fa7565b1ed76f7a0c2ad]
    at org.apache.pinot.common.utils.FileUploadDownloadClient.uploadSegment(FileUploadDownloadClient.java:604) ~[pinot-all-0.12.0-SNAPSHOT-jar-with-dependencies.jar:0.12.0-SNAPSHOT-58c1dc9b7d8e28ac7207fa7565b1ed76f7a0c2ad]
    at org.apache.pinot.core.data.manager.realtime.Server2ControllerSegmentUploader.uploadSegmentToController(Server2ControllerSegmentUploader.java:76) [pinot-all-0.12.0-SNAPSHOT-jar-with-dependencies.jar:0.12.0-SNAPSHOT-58c1dc9b7d8e28ac7207fa7565b1ed76f7a0c2ad]
    at org.apache.pinot.core.data.manager.realtime.Server2ControllerSegmentUploader.uploadSegment(Server2ControllerSegmentUploader.java:61) [pinot-all-0.12.0-SNAPSHOT-jar-with-dependencies.jar:0.12.0-SNAPSHOT-58c1dc9b7d8e28ac7207fa7565b1ed76f7a0c2ad]
    at org.apache.pinot.core.data.manager.realtime.SplitSegmentCommitter.uploadSegment(SplitSegmentCommitter.java:78) [pinot-all-0.12.0-SNAPSHOT-jar-with-dependencies.jar:0.12.0-SNAPSHOT-58c1dc9b7d8e28ac7207fa7565b1ed76f7a0c2ad]
    at org.apache.pinot.core.data.manager.realtime.SplitSegmentCommitter.commit(SplitSegmentCommitter.java:59) [pinot-all-0.12.0-SNAPSHOT-jar-with-dependencies.jar:0.12.0-SNAPSHOT-58c1dc9b7d8e28ac7207fa7565b1ed76f7a0c2ad]
    at org.apache.pinot.core.data.manager.realtime.LLRealtimeSegmentDataManager.commit(LLRealtimeSegmentDataManager.java:1025) [pinot-all-0.12.0-SNAPSHOT-jar-with-dependencies.jar:0.12.0-SNAPSHOT-58c1dc9b7d8e28ac7207fa7565b1ed76f7a0c2ad]
    at org.apache.pinot.core.data.manager.realtime.LLRealtimeSegmentDataManager.commitSegment(LLRealtimeSegmentDataManager.java:995) [pinot-all-0.12.0-SNAPSHOT-jar-with-dependencies.jar:0.12.0-SNAPSHOT-58c1dc9b7d8e28ac7207fa7565b1ed76f7a0c2ad]
    at org.apache.pinot.core.data.manager.realtime.LLRealtimeSegmentDataManager$PartitionConsumer.run(LLRealtimeSegmentDataManager.java:711) [pinot-all-0.12.0-SNAPSHOT-jar-with-dependencies.jar:0.12.0-SNAPSHOT-58c1dc9b7d8e28ac7207fa7565b1ed76f7a0c2ad]
    at java.lang.Thread.run(Thread.java:829) [?:?]
Caused by: java.net.ConnectException: Connection refused (Connection refused)
    at java.net.PlainSocketImpl.socketConnect(Native Method) ~[?:?]
    at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:412) ~[?:?]
    at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:255) ~[?:?]
    at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:237) ~[?:?]
    at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392) ~[?:?]
    at java.net.Socket.connect(Socket.java:609) ~[?:?]
    at org.apache.pinot.shaded.org.apache.http.conn.socket.PlainConnectionSocketFactory.connectSocket(PlainConnectionSocketFactory.java:75) ~[pinot-all-0.12.0-SNAPSHOT-jar-with-dependencies.jar:0.12.0-SNAPSHOT-58c1dc9b7d8e28ac7207fa7565b1ed76f7a0c2ad]
    at org.apache.pinot.shaded.org.apache.http.impl.conn.DefaultHttpClientConnectionOperator.connect(DefaultHttpClientConnectionOperator.java:142) ~[pinot-all-0.12.0-SNAPSHOT-jar-with-dependencies.jar:0.12.0-SNAPSHOT-58c1dc9b7d8e28ac7207fa7565b1ed76f7a0c2ad]
    ... 19 more
rohit-mobstac commented 1 year ago

Another problem that I have noticed is the segments are not getting committed to deep store (s3) even after realtime.segment.flush.threshold.time set to 1h

navina commented 1 year ago

Looks like it is not able to upload segments. Are the server and controller running on the same host? Segment upload link says "localhost:9000". Hence checking

rohit-mobstac commented 1 year ago

no the server and controller are in different EC2 instances. @navina Attaching the server and controller config thats used Server:

# Pinot Role
pinot.service.role=SERVER

# Pinot Cluster name
pinot.cluster.name=cluster name

# Pinot Zookeeper Server
pinot.zk.server=zk1:2181,zk2:2182,zk3:2183

# Use hostname as Pinot Instance ID other than IP
pinot.set.instance.id.to.hostname=true

# Pinot Server Netty Port for queris
pinot.server.netty.port=8098

# Pinot Server Admin API port
pinot.server.adminapi.port=8097

# Pinot Server Data Directory
pinot.server.instance.dataDir=/tmp/pinot/data/server/index

# Pinot Server Temporary Segment Tar Directory
pinot.server.instance.segmentTarDir=/tmp/pinot/data/server/segmentTar

pinot.server.storage.factory.class.s3=org.apache.pinot.plugin.filesystem.S3PinotFS
pinot.server.storage.factory.s3.region=us-east-1
pinot.server.segment.fetcher.protocols=file,http,s3
pinot.server.segment.fetcher.s3.class=org.apache.pinot.common.utils.fetcher.PinotFSSegmentFetcher

Controller

# Pinot Role
pinot.service.role=CONTROLLER

# Pinot Cluster name
pinot.cluster.name=cluster name

# Pinot Zookeeper Server
pinot.zk.server=zk1:2181,zk2:2182,zk3:2183

# Use hostname as Pinot Instance ID other than IP
pinot.set.instance.id.to.hostname=true

# Pinot Controller Port
controller.port=9000

# Pinot Controller VIP Host
controller.vip.host=localhost

# Pinot Controller VIP Port
controller.vip.port=9000

# Location to store Pinot Segments pushed from clients
controller.data.dir=s3://mybucket/controllerData/
controller.local.temp.dir=/tmp/pinot-tmp-data/
pinot.controller.storage.factory.class.s3=org.apache.pinot.plugin.filesystem.S3PinotFS
pinot.controller.storage.factory.s3.region=us-east-1
pinot.controller.segment.fetcher.protocols=file,http,s3
pinot.controller.segment.fetcher.s3.class=org.apache.pinot.common.utils.fetcher.PinotFSSegmentFetcher
FranMorilloAWS commented 11 months ago

Is there any news on this issue, I am facing the same error. It seems the server is trying to commit the segment to localhost instead of using the controller running in a different ec2 instance

FranMorilloAWS commented 11 months ago

Were you able to fix this?

Jackie-Jiang commented 11 months ago

@FranMorilloAWS I guess controller.vip.host=localhost is the cause of this. Can you check if you have similar setting in your config?

FranMorilloAWS commented 11 months ago

@Jackie-Jiang Yes i modified the controller.vip.host to point to the loadbalancer of the controllers and it worked. However I am facing now an issue with that with the current Table Configuration, once the segments go from consuming to Good, the servers are not creating new segments to continue consuming from the kinesis data stream.

As well as ignoring the number of rows, or size for the segments.

Ill add my table configuration:

{ "REALTIME": { "tableName": "kinesisTable_REALTIME", "tableType": "REALTIME", "segmentsConfig": { "replication": "2", "retentionTimeUnit": "DAYS", "retentionTimeValue": "7", "replicasPerPartition": "2", "minimizeDataMovement": false, "timeColumnName": "creationTimestamp", "segmentPushType": "APPEND", "completionConfig": { "completionMode": "DOWNLOAD" } }, "tenants": { "broker": "DefaultTenant", "server": "DefaultTenant" }, "tableIndexConfig": { "invertedIndexColumns": [ "product" ], "noDictionaryColumns": [ "price" ], "rangeIndexVersion": 2, "autoGeneratedInvertedIndex": false, "createInvertedIndexDuringSegmentGeneration": false, "sortedColumn": [ "creationTimestamp" ], "loadMode": "MMAP", "streamConfigs": { "streamType": "kinesis", "stream.kinesis.topic.name": "pinot-stream", "region": "eu-west-1", "shardIteratorType": "LATEST", "stream.kinesis.consumer.type": "lowlevel", "stream.kinesis.fetch.timeout.millis": "30000", "stream.kinesis.decoder.class.name": "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder", "stream.kinesis.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kinesis.KinesisConsumerFactory", "realtime.segment.flush.threshold.rows": "1400000", "realtime.segment.flush.threshold.time": "1h", "realtime.segment.flush.threshold.size": "200M" }, "varLengthDictionaryColumns": [ "campaign", "color", "department" ], "enableDefaultStarTree": false, "enableDynamicStarTreeCreation": false, "aggregateMetrics": false, "nullHandlingEnabled": false, "optimizeDictionary": false, "optimizeDictionaryForMetrics": false, "noDictionarySizeRatioThreshold": 0 }, "metadata": { "customConfigs": {} }, "isDimTable": false } }

The table has four segments that are in Good State and are in S3. It just stops creating consuming segments and stops reading from kinesis

Jackie-Jiang commented 10 months ago

Please check the controller log for exceptions. The new consuming segment should be created by controller. @KKcorps Can you help with this issue?

FranMorilloAWS commented 10 months ago

What I noticed was that when setting this value: "realtime.segment.flush.threshold.rows": "1400000" as a String, it ignores the size and row number for when completing the segments, and it creates new one only after the threshold time occurs. By setting it not as a string but integer, It does create the new segments. It doesnt reach the suggested size as no segment is above 3 megabytes .

Jackie-Jiang commented 10 months ago

You may refer to this doc on how to configure them: https://docs.pinot.apache.org/basics/data-import/pinot-stream-ingestion

Threshold time is honored when not enough rows are collected. The value should always be string, and if you want to use size threshold, "realtime.segment.flush.threshold.rows": "0" should be used

FranMorilloAWS commented 10 months ago

Hi! I am running tests in Pinot consuming from Kinesis Data Streams. I am running two r5x.large servers. I noticed that once it reached 8 segments and it wont create any new segments, until it reaches the flush time. This is my current configuration: "realtime.segment.flush.threshold.rows": "0", "realtime.segment.flush.threshold.time": "1h", "realtime.segment.flush.threshold.size": "200M"

When i go into each segment i see that the tresholdrows is defined to be and it doesnt reach the size. (each segment is 1.7 MB in S3) "segment.flush.threshold.size": "150000",

xiangfu0 commented 10 months ago

Hi @FranMorilloAWS , are you using on-demand or provisioned mode? Have you checked the kinesis stream metadata? This might be due the stream got closed and new shard got created?

FranMorilloAWS commented 9 months ago

Using On Demand. I believed that by using the low consumer, Pinot would handle the closing and creation of new shards, either on demand or provisioned when scaled. Is that not the case?

navina commented 9 months ago

@FranMorilloAWS I wonder if setting completion mode to DOWNLOAD is causing this issue. any particular reason why you are using this config?

"completionConfig": {
"completionMode": "DOWNLOAD"
}
FranMorilloAWS commented 9 months ago

Because at some point it was not downloading the segments to s3, i will eliminate the completion config and try again. Thanks Ill update here

FranMorilloAWS commented 4 months ago

Hi, I am now facing the same issue but not even seeing the segments in S3.