Spycsh closed this issue 2 years ago.
Strange output (columns: query id, user id, duration, result):
1 1 103 'Result of query 1 by user 1: no connected component found for node 151 because it is isolated!'
2 1 193 'Result of query 2 by user 1: connected component id of node 151 is 151. Other node ids that contain in the same component are: 509'
3 1 194 'Result of query 3 by user 1: connected component id of node 151 is 151. Other node ids that contain in the same component are: 179 509'
4 1 194 'Result of query 4 by user 1: connected component id of node 151 is 126. Other node ids that contain in the same component are: 919 179 126 347 777 767 327 987 778 779 947 849 509'
1 1 94 'Result of query 1 by user 1: no connected component found for node 151 because it is isolated!'
2 1 175 'Result of query 2 by user 1: connected component id of node 151 is 151. Other node ids that contain in the same component are: 509'
5 1 175 'Result of query 5 by user 1: connected component id of node 151 is 126. Other node ids that contain in the same component are: 590 791 297 332 596 179 830 557 139 777 778 955 779 219 916 919 882 466 246 126 347 821 987 327 767 823 429 925 947 804 849 509'
4 1 174 'Result of query 4 by user 1: connected component id of node 151 is 126. Other node ids that contain in the same component are: 919 791 179 466 830 676 126 347 557 777 987 778 767 327 779 947 849 509'
6 1 179 'Result of query 6 by user 1: connected component id of node 151 is 1. Other node ids that contain in the same component are: 193 590 596 475 112 355 510 754 636 912 915 96 916 919 1 364 640 882 520 246 126 369 248 128 646 767 404 889 648 804 925 23 29 377 652 135 412 896 139 777 778 779 541 420 545 821 789 823 429 947 41 273 153 791 674 830 676 159 952 677 315 557 712 955 317 319 717 57 287 200 443 322 960 565 962 203 204 964 327 449 849 297 452 332 574 178 179 212 972 698 611 734 219 858 78 461 340 221 585 465 466 589 622 347 986 987 228 81 747 509 906'
3 1 179 'Result of query 3 by user 1: connected component id of node 151 is 151. Other node ids that contain in the same component are: 179 509'
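One visible symptom is that the two runs disagree on query 4: the first run reports 13 other node ids and the second reports 18. Assuming query 4 carries the same time window in both runs, this minimal Java sketch diffs the two member lists copied from the output above. The class and method names are illustrative and not part of Hesse.

```java
import java.util.Arrays;
import java.util.Set;
import java.util.TreeSet;
import java.util.stream.Collectors;

public class CompareRuns {
    // Parses the space-separated node ids that follow "... component are: " in a result line.
    static Set<Integer> members(String ids) {
        return Arrays.stream(ids.trim().split("\\s+"))
                .map(Integer::parseInt)
                .collect(Collectors.toCollection(TreeSet::new));
    }

    // Returns the ids present in a but not in b, sorted ascending.
    static Set<Integer> minus(Set<Integer> a, Set<Integer> b) {
        Set<Integer> diff = new TreeSet<>(a);
        diff.removeAll(b);
        return diff;
    }

    public static void main(String[] args) {
        // Member lists of query 4, copied from the first and second run above.
        Set<Integer> run1 = members("919 179 126 347 777 767 327 987 778 779 947 849 509");
        Set<Integer> run2 = members("919 791 179 466 830 676 126 347 557 777 987 778 767 327 779 947 849 509");
        System.out.println("only in run 2: " + minus(run2, run1));
        System.out.println("only in run 1: " + minus(run1, run2));
    }
}
```

Running it prints `only in run 2: [466, 557, 676, 791, 830]` and `only in run 1: []`, so the second run is a strict superset of the first for the same query.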
Hesse log:
INFO - starting server: Undertow - 2.2.17.Final
INFO - XNIO version 3.8.6.Final
INFO - XNIO NIO Implementation Version 3.8.6.Final
INFO - JBoss Threads version 3.1.0.Final
INFO - [TemporalQueryHandler 2] Received Query 2 from User 1 of vertex 151 with query type connected-components
INFO - [TemporalQueryHandler 3] Received Query 3 from User 1 of vertex 151 with query type connected-components
INFO - [TemporalQueryHandler 1] Received Query 1 from User 1 of vertex 151 with query type connected-components
INFO - [TemporalQueryHandler 4] Received Query 4 from User 1 of vertex 151 with query type connected-components
INFO - [TemporalQueryHandler 5] Received Query 5 from User 1 of vertex 151 with query type connected-components
INFO - [TemporalQueryHandler 6] Received Query 6 from User 1 of vertex 151 with query type connected-components
INFO - [VertexStorageFn 151] QueryConnectedComponent received with startT:30000, endT:80000
INFO - [VertexStorageFn 151] QueryConnectedComponent received with startT:30000, endT:35000
INFO - [VertexStorageFn 151] QueryConnectedComponent received with startT:30000, endT:130000
INFO - [VertexStorageFn 151] QueryConnectedComponent received with startT:30000, endT:31000
INFO - [VertexStorageFn 151] QueryConnectedComponent received with startT:30000, endT:40000
INFO - [VertexStorageFn 151] QueryConnectedComponent received with startT:30000, endT:530000
INFO - [TemporalQueryHandler 1] Received Query 1 from User 1 of vertex 151 with query type connected-components
INFO - [TemporalQueryHandler 6] Received Query 6 from User 1 of vertex 151 with query type connected-components
INFO - [TemporalQueryHandler 3] Received Query 3 from User 1 of vertex 151 with query type connected-components
INFO - [TemporalQueryHandler 4] Received Query 4 from User 1 of vertex 151 with query type connected-components
INFO - [TemporalQueryHandler 5] Received Query 5 from User 1 of vertex 151 with query type connected-components
INFO - [TemporalQueryHandler 2] Received Query 2 from User 1 of vertex 151 with query type connected-components
INFO - [VertexStorageFn 151] QueryConnectedComponent received with startT:30000, endT:31000
INFO - [VertexStorageFn 151] QueryConnectedComponent received with startT:30000, endT:530000
INFO - [VertexStorageFn 151] QueryConnectedComponent received with startT:30000, endT:80000
INFO - [VertexStorageFn 151] QueryConnectedComponent received with startT:30000, endT:40000
INFO - [VertexStorageFn 151] QueryConnectedComponent received with startT:30000, endT:130000
INFO - [VertexStorageFn 151] QueryConnectedComponent received with startT:30000, endT:35000
INFO - [TemporalQueryHandler 1] qid: 1 uid: 1 result: Result of query 1 by user 1: no connected component found for node 151 because it is isolated! duration: 103
INFO - [ConnectedComponentsFn 151] Success! qid: 3, uid: 1
INFO - [ConnectedComponentsFn 151] Success! qid: 2, uid: 1
INFO - [ConnectedComponentsFn 151] Success! qid: 4, uid: 1
INFO - [TemporalQueryHandler 2] qid: 2 uid: 1 result: Result of query 2 by user 1: connected component id of node 151 is 151. Other node ids that contain in the same component are: 509 duration: 193
INFO - [TemporalQueryHandler 3] qid: 3 uid: 1 result: Result of query 3 by user 1: connected component id of node 151 is 151. Other node ids that contain in the same component are: 179 509 duration: 194
INFO - [TemporalQueryHandler 4] qid: 4 uid: 1 result: Result of query 4 by user 1: connected component id of node 151 is 126. Other node ids that contain in the same component are: 919 179 126 347 777 767 327 987 778 779 947 849 509 duration: 194
INFO - [TemporalQueryHandler 1] Received Query 1 from User 1 of vertex 151 with query type connected-components
INFO - [TemporalQueryHandler 2] Received Query 2 from User 1 of vertex 151 with query type connected-components
INFO - [TemporalQueryHandler 5] Received Query 5 from User 1 of vertex 151 with query type connected-components
INFO - [VertexStorageFn 151] QueryConnectedComponent received with startT:30000, endT:31000
INFO - [TemporalQueryHandler 4] Received Query 4 from User 1 of vertex 151 with query type connected-components
INFO - [TemporalQueryHandler 6] Received Query 6 from User 1 of vertex 151 with query type connected-components
INFO - [TemporalQueryHandler 3] Received Query 3 from User 1 of vertex 151 with query type connected-components
INFO - [VertexStorageFn 151] QueryConnectedComponent received with startT:30000, endT:35000
INFO - [VertexStorageFn 151] QueryConnectedComponent received with startT:30000, endT:130000
INFO - [TemporalQueryHandler 1] qid: 1 uid: 1 result: Result of query 1 by user 1: no connected component found for node 151 because it is isolated! duration: 94
INFO - [VertexStorageFn 151] QueryConnectedComponent received with startT:30000, endT:80000
INFO - [VertexStorageFn 151] QueryConnectedComponent received with startT:30000, endT:530000
INFO - [VertexStorageFn 151] QueryConnectedComponent received with startT:30000, endT:40000
INFO - [ConnectedComponentsFn 151] Success! qid: 2, uid: 1
INFO - [ConnectedComponentsFn 151] Success! qid: 5, uid: 1
INFO - [TemporalQueryHandler 2] qid: 2 uid: 1 result: Result of query 2 by user 1: connected component id of node 151 is 151. Other node ids that contain in the same component are: 509 duration: 175
INFO - [ConnectedComponentsFn 151] Success! qid: 4, uid: 1
INFO - [TemporalQueryHandler 5] qid: 5 uid: 1 result: Result of query 5 by user 1: connected component id of node 151 is 126. Other node ids that contain in the same component are: 590 791 297 332 596 179 830 557 139 777 778 955 779 219 916 919 882 466 246 126 347 821 987 327 767 823 429 925 947 804 849 509 duration: 175
INFO - [TemporalQueryHandler 4] qid: 4 uid: 1 result: Result of query 4 by user 1: connected component id of node 151 is 126. Other node ids that contain in the same component are: 919 791 179 466 830 676 126 347 557 777 987 778 767 327 779 947 849 509 duration: 174
INFO - [ConnectedComponentsFn 151] Success! qid: 6, uid: 1
INFO - [ConnectedComponentsFn 151] Success! qid: 3, uid: 1
INFO - [TemporalQueryHandler 6] qid: 6 uid: 1 result: Result of query 6 by user 1: connected component id of node 151 is 1. Other node ids that contain in the same component are: 193 590 596 475 112 355 510 754 636 912 915 96 916 919 1 364 640 882 520 246 126 369 248 128 646 767 404 889 648 804 925 23 29 377 652 135 412 896 139 777 778 779 541 420 545 821 789 823 429 947 41 273 153 791 674 830 676 159 952 677 315 557 712 955 317 319 717 57 287 200 443 322 960 565 962 203 204 964 327 449 849 297 452 332 574 178 179 212 972 698 611 734 219 858 78 461 340 221 585 465 466 589 622 347 986 987 228 81 747 509 906 duration: 179
INFO - [TemporalQueryHandler 3] qid: 3 uid: 1 result: Result of query 3 by user 1: connected component id of node 151 is 151. Other node ids that contain in the same component are: 179 509 duration: 179
Raising the JVM's direct memory limit solves the "Direct buffer memory" problem:
CMD java -XX:MaxDirectMemorySize=256M -jar hesse.jar
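To check that the -XX:MaxDirectMemorySize flag from the CMD line actually reaches the JVM, and to watch how much direct buffer memory the process uses, the JDK's standard management beans can be queried. This is a minimal sketch using only `java.lang.management`; the class name `DirectMemoryCheck` is hypothetical, and how it would be wired into Hesse (for example, logged periodically) is an assumption.

```java
import java.lang.management.BufferPoolMXBean;
import java.lang.management.ManagementFactory;
import java.util.List;
import java.util.Optional;

public class DirectMemoryCheck {
    // Returns the -XX:MaxDirectMemorySize argument among the given JVM arguments, if any.
    static Optional<String> maxDirectMemoryFlag(List<String> jvmArgs) {
        return jvmArgs.stream()
                .filter(arg -> arg.startsWith("-XX:MaxDirectMemorySize="))
                .findFirst();
    }

    public static void main(String[] args) {
        List<String> jvmArgs = ManagementFactory.getRuntimeMXBean().getInputArguments();
        System.out.println("MaxDirectMemorySize flag: "
                + maxDirectMemoryFlag(jvmArgs).orElse("(not set, JVM default applies)"));

        // The "direct" pool reports direct ByteBuffers, the allocations that
        // -XX:MaxDirectMemorySize caps; "mapped" covers memory-mapped files.
        for (BufferPoolMXBean pool : ManagementFactory.getPlatformMXBeans(BufferPoolMXBean.class)) {
            System.out.printf("%s: count=%d, used=%d bytes, capacity=%d bytes%n",
                    pool.getName(), pool.getCount(), pool.getMemoryUsed(), pool.getTotalCapacity());
        }
    }
}
```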
However, this introduces another internal problem: java.util.concurrent.CompletionException: java.lang.NegativeArraySizeException
INFO - [VertexStorageFn 151] QueryConnectedComponent received with startT:189000, endT:490000
java.util.concurrent.CompletionException: java.lang.NegativeArraySizeException
at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:618)
at java.util.concurrent.CompletableFuture.uniApplyStage(CompletableFuture.java:628)
at java.util.concurrent.CompletableFuture.thenApply(CompletableFuture.java:1996)
at org.apache.flink.statefun.sdk.java.handler.ConcurrentRequestReplyHandler.handle(ConcurrentRequestReplyHandler.java:62)
at io.github.spycsh.hesse.undertow.UndertowHttpHandler.onRequestBody(UndertowHttpHandler.java:27)
at io.undertow.io.AsyncReceiverImpl.receiveFullBytes(AsyncReceiverImpl.java:399)
at io.undertow.io.AsyncReceiverImpl.receiveFullBytes(AsyncReceiverImpl.java:481)
at io.github.spycsh.hesse.undertow.UndertowHttpHandler.handleRequest(UndertowHttpHandler.java:22)
at io.undertow.server.Connectors.executeRootHandler(Connectors.java:387)
at io.undertow.server.protocol.http.HttpReadListener.handleEventWithNoRunningRequest(HttpReadListener.java:256)
at io.undertow.server.protocol.http.HttpReadListener.handleEvent(HttpReadListener.java:136)
at io.undertow.server.protocol.http.HttpReadListener.handleEvent(HttpReadListener.java:59)
at org.xnio.ChannelListeners.invokeChannelListener(ChannelListeners.java:92)
at org.xnio.conduits.ReadReadyHandler$ChannelListenerHandler.readReady(ReadReadyHandler.java:66)
at org.xnio.nio.NioSocketConduit.handleReady(NioSocketConduit.java:89)
at org.xnio.nio.WorkerThread.run(WorkerThread.java:591)
Caused by: java.lang.NegativeArraySizeException
at org.apache.flink.statefun.sdk.shaded.com.google.protobuf.ByteString$CodedBuilder.<init>(ByteString.java:1154)
at org.apache.flink.statefun.sdk.shaded.com.google.protobuf.ByteString$CodedBuilder.<init>(ByteString.java:1149)
at org.apache.flink.statefun.sdk.shaded.com.google.protobuf.ByteString.newCodedBuilder(ByteString.java:1145)
at org.apache.flink.statefun.sdk.shaded.com.google.protobuf.AbstractMessageLite.toByteString(AbstractMessageLite.java:57)
at org.apache.flink.statefun.sdk.java.handler.ConcurrentRequestReplyHandler.lambda$handle$0(ConcurrentRequestReplyHandler.java:64)
at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
... 15 more
INFO - [ConnectedComponentsFn 151] Success! qid: 1, uid: 1
INFO - [TemporalQueryHandler 1] qid: 1 uid: 1 result: Result of query 1 by user 1: connected component id of node 151 is 12. Other node ids that contain in the same component are: 273 153 112 398 475 355 674 510 511 159 676 677 952 875 754 315 636 559 912 439 319 915 96 12 57 561 287 640 200 960 443 520 322 565 962 203 368 248 369 128 766 404 646 889 449 647 648 23 891 452 574 178 135 212 377 652 972 698 412 611 898 931 858 78 461 144 221 387 585 541 465 466 589 622 545 986 228 789 945 904 906 duration: 11307
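The cause of the NegativeArraySizeException is not confirmed in this thread. One plausible reading of the trace is that protobuf's `toByteString()` allocates a byte array of `getSerializedSize()` bytes, and that size is an `int`, so a response whose encoded size passes `Integer.MAX_VALUE` would wrap to a negative number. This sketch reproduces that wrap-around with plain ints and shows an overflow-safe check against a size budget; the 32 MiB limit and all names are hypothetical.

```java
public class SerializedSizeGuard {
    // Hypothetical per-message budget; pick one that matches the deployment.
    static final long MAX_MESSAGE_BYTES = 32L * 1024 * 1024;

    // Sums entry sizes in an int, the width protobuf uses for getSerializedSize();
    // a large enough total silently wraps around to a negative value.
    static int naiveTotal(int[] entrySizes) {
        int total = 0;
        for (int size : entrySizes) {
            total += size;
        }
        return total;
    }

    // Same sum in a long, so the real size can be checked before allocating anything.
    static long safeTotal(int[] entrySizes) {
        long total = 0;
        for (int size : entrySizes) {
            total += size;
        }
        return total;
    }

    static boolean fitsInOneMessage(int[] entrySizes) {
        return safeTotal(entrySizes) <= MAX_MESSAGE_BYTES;
    }

    public static void main(String[] args) {
        // Three entries of 1 GiB each, about 3 GiB in total.
        int[] sizes = {1 << 30, 1 << 30, 1 << 30};
        System.out.println("int total:  " + naiveTotal(sizes));
        System.out.println("long total: " + safeTotal(sizes));
        System.out.println("fits: " + fitsInOneMessage(sizes));
        try {
            byte[] buffer = new byte[naiveTotal(sizes)];
            System.out.println("allocated " + buffer.length + " bytes");
        } catch (NegativeArraySizeException e) {
            // Allocating an array with the wrapped size fails the same way as the trace above.
            System.out.println("allocation failed: " + e);
        }
    }
}
```

The int total comes out as -1073741824 while the long total is 3221225472, and allocating a buffer of the wrapped size throws NegativeArraySizeException.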
The internal Netty client of Flink StateFun may retry safely until the channel has fully buffered the results. If that is the cause, this exception can be ignored, because the correct result is eventually returned (query 1 above still completes, with a duration of 11307). Either way, this only happens in an extreme scenario on a synthetic dataset. In future work, Hesse may avoid sending large messages inside the system to prevent this specific problem.
2022-06-07 10:26:27,147 WARN org.apache.flink.statefun.flink.core.nettyclient.NettyRequest [] - Exception caught while trying to deliver a message: (attempt #12)ToFunctionRequestSummary(address=Address(hesse.storage, vertex-storage, 173), batchSize=15942, totalSizeInBytes=4762931, numberOfStates=130)
org.apache.flink.shaded.netty4.io.netty.handler.codec.TooLongFrameException: Response entity too large: DefaultHttpResponse(decodeResult: success, version: HTTP/1.1)
HTTP/1.1 200 OK
Connection: keep-alive
Content-Type: application/octet-stream
Date: Tue, 07 Jun 2022 10:26:27 GMT
content-length: 49831640
at org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpObjectAggregator.handleOversizedMessage(HttpObjectAggregator.java:276) ~[statefun-flink-distribution.jar:3.2.0]
at org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpObjectAggregator.handleOversizedMessage(HttpObjectAggregator.java:87) ~[statefun-flink-distribution.jar:3.2.0]
at org.apache.flink.shaded.netty4.io.netty.handler.codec.MessageAggregator.invokeHandleOversizedMessage(MessageAggregator.java:404) ~[statefun-flink-distribution.jar:3.2.0]
at org.apache.flink.shaded.netty4.io.netty.handler.codec.MessageAggregator.decode(MessageAggregator.java:254) ~[statefun-flink-distribution.jar:3.2.0]
at org.apache.flink.shaded.netty4.io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:88) ~[statefun-flink-distribution.jar:3.2.0]
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) [statefun-flink-distribution.jar:3.2.0]
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) [statefun-flink-distribution.jar:3.2.0]
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) [statefun-flink-distribution.jar:3.2.0]
at org.apache.flink.shaded.netty4.io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103) [statefun-flink-distribution.jar:3.2.0]
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) [statefun-flink-distribution.jar:3.2.0]
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) [statefun-flink-distribution.jar:3.2.0]
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) [statefun-flink-distribution.jar:3.2.0]
at org.apache.flink.shaded.netty4.io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:436) [statefun-flink-distribution.jar:3.2.0]
at org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324) [statefun-flink-distribution.jar:3.2.0]
at org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:311) [statefun-flink-distribution.jar:3.2.0]
at org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:432) [statefun-flink-distribution.jar:3.2.0]
at org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:276) [statefun-flink-distribution.jar:3.2.0]
at org.apache.flink.shaded.netty4.io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251) [statefun-flink-distribution.jar:3.2.0]
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) [statefun-flink-distribution.jar:3.2.0]
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) [statefun-flink-distribution.jar:3.2.0]
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) [statefun-flink-distribution.jar:3.2.0]
at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) [statefun-flink-distribution.jar:3.2.0]
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) [statefun-flink-distribution.jar:3.2.0]
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) [statefun-flink-distribution.jar:3.2.0]
at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) [statefun-flink-distribution.jar:3.2.0]
at org.apache.flink.shaded.netty4.io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:795) [statefun-flink-distribution.jar:3.2.0]
at org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:480) [statefun-flink-distribution.jar:3.2.0]
at org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:378) [statefun-flink-distribution.jar:3.2.0]
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) [statefun-flink-distribution.jar:3.2.0]
at org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) [statefun-flink-distribution.jar:3.2.0]
at java.lang.Thread.run(Unknown Source) [?:?]
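Here the StateFun side rejects a response with content-length 49831640 (about 47.5 MiB), still failing on attempt #12. One hypothetical way to keep each message bounded is to split a large list of vertex ids into several batches that each fit a byte budget. This sketch uses a crude size estimate (decimal digits plus one separator per id) instead of measuring real serialized messages; the names and the budget are illustrative assumptions.

```java
import java.util.ArrayList;
import java.util.List;

public class ChunkedIds {
    // Splits ids into batches whose estimated encoded size stays within maxBytes.
    // The estimate (digits plus one separator per id) stands in for a real
    // serialized-size measurement.
    static List<List<Integer>> chunk(List<Integer> ids, int maxBytes) {
        List<List<Integer>> batches = new ArrayList<>();
        List<Integer> current = new ArrayList<>();
        int currentBytes = 0;
        for (int id : ids) {
            int idBytes = Integer.toString(id).length() + 1;
            if (idBytes > maxBytes) {
                throw new IllegalArgumentException("id " + id + " alone exceeds the budget");
            }
            // Start a new batch when this id would push the current one over budget.
            if (currentBytes + idBytes > maxBytes) {
                batches.add(current);
                current = new ArrayList<>();
                currentBytes = 0;
            }
            current.add(id);
            currentBytes += idBytes;
        }
        if (!current.isEmpty()) {
            batches.add(current);
        }
        return batches;
    }

    public static void main(String[] args) {
        List<Integer> ids = List.of(919, 179, 126, 347, 777, 767, 327, 987, 778, 779, 947, 849, 509);
        for (List<Integer> batch : chunk(ids, 16)) {
            System.out.println(batch);
        }
    }
}
```

With a 16-byte budget, each 3-digit id is estimated at 4 bytes, so the 13 ids from query 4 are split into batches of 4, 4, 4 and 1.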
In the latest version, this is fixed by rewriting the application logic for connected components. One lesson learned from this issue is that messages passed from one function instance to another must be kept as small as possible, and the runtime context they carry must be designed carefully so that it does not grow too large. Otherwise, the messages will exceed what the Flink StateFun network stack can handle, as with the 49831640-byte response rejected above.
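A common way to keep connected-components messages constant-sized is min-label propagation (HashMin): every vertex stores the smallest vertex id it has heard of and forwards only that single id to its neighbors whenever it improves. The component ids in the output above look consistent with this labeling, since each reported id (for example 126 in query 4, or 1 in query 6) is the smallest id in its component. This is a generic single-process sketch of the technique, not Hesse's actual rewrite; the class names and the in-memory queue standing in for StateFun messaging are assumptions.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class MinLabelPropagation {
    // One target vertex and one candidate label: the message size stays
    // constant however large the component becomes.
    static final class LabelMessage {
        final int target;
        final int label;

        LabelMessage(int target, int label) {
            this.target = target;
            this.label = label;
        }
    }

    // Labels every vertex with the smallest vertex id in its component. The
    // adjacency map must be symmetric (undirected) and list every vertex as a
    // key, including isolated ones with an empty neighbor list.
    static Map<Integer, Integer> components(Map<Integer, List<Integer>> adjacency) {
        Map<Integer, Integer> label = new HashMap<>();
        Deque<LabelMessage> queue = new ArrayDeque<>();
        for (Map.Entry<Integer, List<Integer>> entry : adjacency.entrySet()) {
            int vertex = entry.getKey();
            label.put(vertex, vertex);
            // Initially each vertex announces its own id to its neighbors.
            for (int neighbor : entry.getValue()) {
                queue.add(new LabelMessage(neighbor, vertex));
            }
        }
        while (!queue.isEmpty()) {
            LabelMessage message = queue.poll();
            if (message.label < label.get(message.target)) {
                // Forward only improvements; labels only decrease, so this terminates.
                label.put(message.target, message.label);
                for (int neighbor : adjacency.get(message.target)) {
                    queue.add(new LabelMessage(neighbor, message.label));
                }
            }
        }
        return label;
    }

    public static void main(String[] args) {
        Map<Integer, List<Integer>> graph = Map.of(
                151, List.of(509),
                509, List.of(151, 179),
                179, List.of(509),
                126, List.of(347),
                347, List.of(126),
                42, List.of());
        System.out.println(components(graph));
    }
}
```

In this toy graph, 151, 179 and 509 end up labeled 151, 126 and 347 are labeled 126, and the isolated vertex 42 keeps its own id.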