Azure / azure-event-hubs-spark

Enabling Continuous Data Processing with Apache Spark and Azure Event Hubs
Apache License 2.0

Receiving stalled #325

Closed (tilumi closed this issue 6 years ago)

tilumi commented 6 years ago

My job has been stalled for about 1 hr, whereas the previous batches completed in 2 mins.

[screenshot 2018-05-29 15 41 31]

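Here is the log from the stalled executor. It seems odd to me that different threads open a receiver on the same partition with different filters: the first receive link filters on sequence number (x-opt-sequence-number >= '3399802986'), while the re-created links filter on offset (x-opt-offset > '424398622758888').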

2018-05-29 06:45:10 dispatcher-event-loop-1 INFO  CoarseGrainedExecutorBackend:54 - Got assigned task 10472
2018-05-29 06:45:10 Executor task launch worker for task 10472 INFO  Executor:54 - Running task 23.0 in stage 1596.0 (TID 10472)
2018-05-29 06:45:10 Executor task launch worker for task 10472 INFO  TorrentBroadcast:54 - Started reading broadcast variable 514
2018-05-29 06:45:10 Executor task launch worker for task 10472 INFO  MemoryStore:54 - Block broadcast_514_piece0 stored as bytes in memory (estimated size 4.8 KB, free 665.1 MB)
2018-05-29 06:45:10 Executor task launch worker for task 10472 INFO  TorrentBroadcast:54 - Reading broadcast variable 514 took 14 ms
2018-05-29 06:45:10 Executor task launch worker for task 10472 INFO  MemoryStore:54 - Block broadcast_514 stored as values in memory (estimated size 10.4 KB, free 665.1 MB)
2018-05-29 06:45:10 Executor task launch worker for task 10472 INFO  EventHubsRDD:54 - Computing EventHubs wasclmonitors, partitionId 23 sequence numbers 3399802986 => 3399964478
2018-05-29 06:45:10 Executor task launch worker for task 10472 INFO  EventHubsClient:54 - Starting receiver for partitionId 23 from seqNo 3399802986
2018-05-29 06:45:10 Executor task launch worker for task 10472 INFO  ClientConnectionPool:54 - Borrowing client. EventHub name: wasclmonitors
2018-05-29 06:45:10 Executor task launch worker for task 10472 INFO  ClientConnectionPool:54 - Available clients: {0}. Total clients: 1
2018-05-29 06:45:10 pool-12-thread-56 INFO  SessionHandler:77 - entityName[wasclmonitors/ConsumerGroups/lam-agg-v2-testing/Partitions/23], sessionIncCapacity[0], sessionOutgoingWindow[2147483647]
2018-05-29 06:45:10 pool-12-thread-56 INFO  PartitionReceiverImpl:251 - receiverPath[RECEIVER IS NULL], action[createReceiveLink], offset[null], sequenceNumber[3399802986], enqueuedTime[null], inclusiveFlag[true]
2018-05-29 06:45:10 pool-12-thread-56 INFO  ReceiveLinkHandler:39 - linkName[f5b17e_76c_G14_1527576310700], localSource[Source{address='wasclmonitors/ConsumerGroups/lam-agg-v2-testing/Partitions/23', durable=NONE, expiryPolicy=SESSION_END, timeout=0, dynamic=false, dynamicNodeProperties=null, distributionMode=null, filter={apache.org:selector-filter:string=UnknownDescribedType{descriptor=apache.org:selector-filter:string, described=amqp.annotation.x-opt-sequence-number >= '3399802986'}}, defaultOutcome=null, outcomes=null, capabilities=null}]
2018-05-29 06:45:10 pool-12-thread-56 INFO  ReceiveLinkHandler:52 - linkName[f5b17e_76c_G14_1527576310700], remoteSource[Source{address='wasclmonitors/ConsumerGroups/lam-agg-v2-testing/Partitions/23', durable=NONE, expiryPolicy=SESSION_END, timeout=0, dynamic=false, dynamicNodeProperties=null, distributionMode=null, filter={apache.org:selector-filter:string=org.apache.qpid.proton.codec.DecoderImpl$UnknownDescribedType@3351f550}, defaultOutcome=null, outcomes=null, capabilities=null}]
2018-05-29 06:45:10 pool-12-thread-56 INFO  MessageReceiver:299 - receiverPath[wasclmonitors/ConsumerGroups/lam-agg-v2-testing/Partitions/23], linkname[f5b17e_76c_G14_1527576310700], updated-link-credit[999], sentCredits[999]
2018-05-29 06:47:09 pool-12-thread-58 INFO  BaseHandler:46 - linkName[f5b17e_76c_G14_1527576310700], ErrorCondition[null, null]
2018-05-29 06:47:09 pool-12-thread-58 INFO  BaseHandler:28 - linkName[f5b17e_76c_G14_1527576310700]
2018-05-29 06:47:09 pool-12-thread-58 INFO  SessionHandler:95 - entityName[wasclmonitors/ConsumerGroups/lam-agg-v2-testing/Partitions/23], condition[Error{condition=null, description='null', info=null}]
2018-05-29 06:47:09 pool-12-thread-58 INFO  SessionHandler:103 - entityName[wasclmonitors/ConsumerGroups/lam-agg-v2-testing/Partitions/23], condition[Error{condition=null, description='null', info=null}]
2018-05-29 06:47:10 pool-12-thread-58 INFO  SessionHandler:77 - entityName[wasclmonitors/ConsumerGroups/lam-agg-v2-testing/Partitions/23], sessionIncCapacity[0], sessionOutgoingWindow[2147483647]
2018-05-29 06:47:10 pool-12-thread-58 INFO  PartitionReceiverImpl:251 - receiverPath[wasclmonitors/ConsumerGroups/lam-agg-v2-testing/Partitions/23], action[createReceiveLink], offset[null], sequenceNumber[3399802986], enqueuedTime[null], inclusiveFlag[true]
2018-05-29 06:47:10 pool-12-thread-58 INFO  ReceiveLinkHandler:39 - linkName[104c58_76c_G14_1527576430784], localSource[Source{address='wasclmonitors/ConsumerGroups/lam-agg-v2-testing/Partitions/23', durable=NONE, expiryPolicy=SESSION_END, timeout=0, dynamic=false, dynamicNodeProperties=null, distributionMode=null, filter={apache.org:selector-filter:string=UnknownDescribedType{descriptor=apache.org:selector-filter:string, described=amqp.annotation.x-opt-offset > '424398622758888'}}, defaultOutcome=null, outcomes=null, capabilities=null}]
2018-05-29 06:47:12 pool-12-thread-59 INFO  ReceiveLinkHandler:61 - linkName[104c58_76c_G14_1527576430784], remoteTarget[null], remoteSource[null], action[waitingForError]
2018-05-29 06:47:12 pool-12-thread-58 INFO  BaseHandler:46 - linkName[104c58_76c_G14_1527576430784], ErrorCondition[com.microsoft:container-close, No connection handler was found for virtual host '13841'. Remote container id is '7A2E0A0515A9427F82D1ACF7F25A024D_G14'. TrackingId:4c5259749fc44173919f09b80a2dd57b_B26, SystemTracker:SharedConnectionListener, Timestamp:5/29/2018 6:47:11 AM TrackingId:7bc138757a8c46cca1238a5f42e9f76c_G14, SystemTracker:gateway7, Timestamp:5/29/2018 6:47:11 AM]
2018-05-29 06:47:12 pool-12-thread-58 INFO  BaseHandler:28 - linkName[104c58_76c_G14_1527576430784]
2018-05-29 06:47:12 pool-12-thread-58 INFO  SessionHandler:95 - entityName[wasclmonitors/ConsumerGroups/lam-agg-v2-testing/Partitions/23], condition[Error{condition=null, description='null', info=null}]
2018-05-29 06:47:12 pool-12-thread-58 INFO  SessionHandler:103 - entityName[wasclmonitors/ConsumerGroups/lam-agg-v2-testing/Partitions/23], condition[Error{condition=null, description='null', info=null}]
2018-05-29 06:47:13 pool-12-thread-59 INFO  SessionHandler:77 - entityName[wasclmonitors/ConsumerGroups/lam-agg-v2-testing/Partitions/23], sessionIncCapacity[0], sessionOutgoingWindow[2147483647]
2018-05-29 06:47:13 pool-12-thread-59 INFO  PartitionReceiverImpl:251 - receiverPath[wasclmonitors/ConsumerGroups/lam-agg-v2-testing/Partitions/23], action[createReceiveLink], offset[null], sequenceNumber[3399802986], enqueuedTime[null], inclusiveFlag[true]
2018-05-29 06:47:13 pool-12-thread-59 INFO  ReceiveLinkHandler:39 - linkName[038351_76c_G14_1527576433686], localSource[Source{address='wasclmonitors/ConsumerGroups/lam-agg-v2-testing/Partitions/23', durable=NONE, expiryPolicy=SESSION_END, timeout=0, dynamic=false, dynamicNodeProperties=null, distributionMode=null, filter={apache.org:selector-filter:string=UnknownDescribedType{descriptor=apache.org:selector-filter:string, described=amqp.annotation.x-opt-offset > '424398622758888'}}, defaultOutcome=null, outcomes=null, capabilities=null}]
2018-05-29 06:47:15 pool-12-thread-58 INFO  ReceiveLinkHandler:61 - linkName[038351_76c_G14_1527576433686], remoteTarget[null], remoteSource[null], action[waitingForError]
2018-05-29 06:47:15 pool-12-thread-59 INFO  BaseHandler:46 - linkName[038351_76c_G14_1527576433686], ErrorCondition[com.microsoft:operation-cancelled, A task was canceled. TrackingId:7bc138757a8c46cca1238a5f42e9f76c_G14, SystemTracker:gateway7, Timestamp:5/29/2018 6:47:15 AM]
2018-05-29 06:47:15 pool-12-thread-59 INFO  BaseHandler:28 - linkName[038351_76c_G14_1527576433686]
2018-05-29 06:47:15 pool-12-thread-59 INFO  SessionHandler:95 - entityName[wasclmonitors/ConsumerGroups/lam-agg-v2-testing/Partitions/23], condition[Error{condition=null, description='null', info=null}]
2018-05-29 06:47:15 pool-12-thread-59 INFO  SessionHandler:103 - entityName[wasclmonitors/ConsumerGroups/lam-agg-v2-testing/Partitions/23], condition[Error{condition=null, description='null', info=null}]
2018-05-29 06:47:18 pool-12-thread-59 INFO  SessionHandler:77 - entityName[wasclmonitors/ConsumerGroups/lam-agg-v2-testing/Partitions/23], sessionIncCapacity[0], sessionOutgoingWindow[2147483647]
2018-05-29 06:47:18 pool-12-thread-59 INFO  PartitionReceiverImpl:251 - receiverPath[wasclmonitors/ConsumerGroups/lam-agg-v2-testing/Partitions/23], action[createReceiveLink], offset[null], sequenceNumber[3399802986], enqueuedTime[null], inclusiveFlag[true]
2018-05-29 06:47:18 pool-12-thread-59 INFO  ReceiveLinkHandler:39 - linkName[794e0b_76c_G14_1527576438080], localSource[Source{address='wasclmonitors/ConsumerGroups/lam-agg-v2-testing/Partitions/23', durable=NONE, expiryPolicy=SESSION_END, timeout=0, dynamic=false, dynamicNodeProperties=null, distributionMode=null, filter={apache.org:selector-filter:string=UnknownDescribedType{descriptor=apache.org:selector-filter:string, described=amqp.annotation.x-opt-offset > '424398622758888'}}, defaultOutcome=null, outcomes=null, capabilities=null}]
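
The observation above (one partition being re-opened with a different filter) can be checked mechanically on a longer log. The following is a minimal sketch, not part of the connector: the regular expression assumes the `ReceiveLinkHandler ... localSource[...]` line layout seen in this log, and the function names are made up for illustration. The two sample lines are abridged copies of entries from the log, with unrelated fields replaced by "...".

```python
import re
from collections import defaultdict

# Assumed layout: the "localSource" entries logged by ReceiveLinkHandler above.
FILTER_RE = re.compile(
    r"linkName\[(?P<link>[^\]]+)\], localSource\[.*?/Partitions/(?P<partition>\d+)'"
    r".*?described=amqp\.annotation\.(?P<field>x-opt-[\w-]+) (?P<op>>=|>) '(?P<value>\d+)'"
)


def filters_by_partition(lines):
    """Map partition id -> list of (link name, filter field, operator, value)."""
    found = defaultdict(list)
    for line in lines:
        m = FILTER_RE.search(line)
        if m:
            found[m["partition"]].append((m["link"], m["field"], m["op"], m["value"]))
    return dict(found)


def partitions_with_mixed_filters(found):
    """Return partitions whose receive links used more than one filter field."""
    return sorted(
        p for p, links in found.items() if len({field for _, field, _, _ in links}) > 1
    )


# Abridged sample lines (hypothetical shortening of the real entries).
sample = [
    "2018-05-29 06:45:10 pool-12-thread-56 INFO  ReceiveLinkHandler:39 - "
    "linkName[f5b17e_76c_G14_1527576310700], localSource[Source{address="
    "'wasclmonitors/ConsumerGroups/lam-agg-v2-testing/Partitions/23', ... "
    "described=amqp.annotation.x-opt-sequence-number >= '3399802986'}}, ...}]",
    "2018-05-29 06:47:10 pool-12-thread-58 INFO  ReceiveLinkHandler:39 - "
    "linkName[104c58_76c_G14_1527576430784], localSource[Source{address="
    "'wasclmonitors/ConsumerGroups/lam-agg-v2-testing/Partitions/23', ... "
    "described=amqp.annotation.x-opt-offset > '424398622758888'}}, ...}]",
]

found = filters_by_partition(sample)
print(partitions_with_mixed_filters(found))
```

Running it over the full executor log, rather than the two sample lines, would list every partition whose links were created with both a sequence-number filter and an offset filter.
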

sabeegrewal commented 6 years ago

Lucas, what version are you running? Can you share more information about your environment?

tilumi commented 6 years ago

I am using library v2.3.1 with a Spark 2.2 cluster.

sabeegrewal commented 6 years ago

2.3.1 doesn't work with Spark 2.2 - we use internal APIs in 2.3.1 that don't exist in Spark 2.2. Can you try using 2.2.0 instead?
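
A mismatch like this can be caught before a job is submitted. The sketch below assumes a simple rule suggested by this reply, namely that the connector's major.minor version should equal the cluster's Spark major.minor version. That rule is an assumption for illustration, not the project's documented compatibility matrix, and the function names are hypothetical.

```python
def major_minor(version):
    """Return the (major, minor) pair of a dotted version string such as '2.3.1'."""
    major, minor = version.split(".")[:2]
    return int(major), int(minor)


def connector_matches_spark(connector_version, spark_version):
    """True when connector and Spark share major.minor (assumed rule, see text above)."""
    return major_minor(connector_version) == major_minor(spark_version)


# The combination reported in this thread, then the suggested one.
print(connector_matches_spark("2.3.1", "2.2"))
print(connector_matches_spark("2.2.0", "2.2"))
```

In a real job the two strings would come from the build definition and the cluster, for example the connector version declared in the build file and the version reported by the Spark session.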

tilumi commented 6 years ago

OK

tilumi commented 6 years ago

After switching to 2.2.0 on the Spark 2.2 cluster, reading still stalls. The log shows the open completing at MessageReceiver:299, after which the receiver stalls.

[screenshot 2018-05-31 05 52 52]

2018-05-30 11:25:06 dispatcher-event-loop-3 INFO  CoarseGrainedExecutorBackend:54 - Got assigned task 26928
2018-05-30 11:25:06 Executor task launch worker for task 26928 INFO  Executor:54 - Running task 24.0 in stage 4327.0 (TID 26928)
2018-05-30 11:25:06 Executor task launch worker for task 26928 INFO  TorrentBroadcast:54 - Started reading broadcast variable 1330
2018-05-30 11:25:06 Executor task launch worker for task 26928 INFO  MemoryStore:54 - Block broadcast_1330_piece0 stored as bytes in memory (estimated size 4.8 KB, free 650.2 MB)
2018-05-30 11:25:06 Executor task launch worker for task 26928 INFO  TorrentBroadcast:54 - Reading broadcast variable 1330 took 9 ms
2018-05-30 11:25:06 Executor task launch worker for task 26928 INFO  MemoryStore:54 - Block broadcast_1330 stored as values in memory (estimated size 10.4 KB, free 650.1 MB)
2018-05-30 11:25:06 Executor task launch worker for task 26928 INFO  EventHubsRDD:54 - Computing EventHubs wasclmonitors, partitionId 24 sequence numbers 76257552 => 76261466
2018-05-30 11:25:06 Executor task launch worker for task 26928 INFO  EventHubsClient:54 - Starting receiver for partitionId 24 from seqNo 76257552
2018-05-30 11:25:06 Executor task launch worker for task 26928 INFO  ClientConnectionPool:54 - Borrowing client. EventHub name: wasclmonitors
2018-05-30 11:25:06 Executor task launch worker for task 26928 INFO  ClientConnectionPool:54 - Available clients: {0}. Total clients: 1
2018-05-30 11:25:06 pool-12-thread-114 INFO  SessionHandler:77 - entityName[wasclmonitors/ConsumerGroups/lam-agg-v2-testing/Partitions/24], sessionIncCapacity[0], sessionOutgoingWindow[2147483647]
2018-05-30 11:25:06 pool-12-thread-114 INFO  PartitionReceiverImpl:251 - receiverPath[RECEIVER IS NULL], action[createReceiveLink], offset[null], sequenceNumber[76257552], enqueuedTime[null], inclusiveFlag[true]
2018-05-30 11:25:06 pool-12-thread-114 INFO  ReceiveLinkHandler:39 - linkName[c0bbc6_cfb_G37_1527679506459], localSource[Source{address='wasclmonitors/ConsumerGroups/lam-agg-v2-testing/Partitions/24', durable=NONE, expiryPolicy=SESSION_END, timeout=0, dynamic=false, dynamicNodeProperties=null, distributionMode=null, filter={apache.org:selector-filter:string=UnknownDescribedType{descriptor=apache.org:selector-filter:string, described=amqp.annotation.x-opt-sequence-number >= '76257552'}}, defaultOutcome=null, outcomes=null, capabilities=null}]
2018-05-30 11:25:06 pool-12-thread-114 INFO  ReceiveLinkHandler:52 - linkName[c0bbc6_cfb_G37_1527679506459], remoteSource[Source{address='wasclmonitors/ConsumerGroups/lam-agg-v2-testing/Partitions/24', durable=NONE, expiryPolicy=SESSION_END, timeout=0, dynamic=false, dynamicNodeProperties=null, distributionMode=null, filter={apache.org:selector-filter:string=org.apache.qpid.proton.codec.DecoderImpl$UnknownDescribedType@5e257cd5}, defaultOutcome=null, outcomes=null, capabilities=null}]
2018-05-30 11:25:06 pool-12-thread-114 INFO  MessageReceiver:299 - receiverPath[wasclmonitors/ConsumerGroups/lam-agg-v2-testing/Partitions/24], linkname[c0bbc6_cfb_G37_1527679506459], updated-link-credit[999], sentCredits[999]
2018-05-30 11:25:07 Executor task launch worker for task 26928 INFO  MemoryStore:54 - Block rdd_2797_24 stored as bytes in memory (estimated size 388.8 KB, free 649.8 MB)
2018-05-30 11:25:07 Executor task launch worker for task 26928 INFO  EventHubsClient:54 - close: Closing EventHubsClient.
2018-05-30 11:25:07 Executor task launch worker for task 26928 INFO  ClientEntity:77 - close: clientId[null]
2018-05-30 11:25:07 Executor task launch worker for task 26928 INFO  ClientEntity:77 - close: clientId[8f19c1]
2018-05-30 11:25:07 pool-12-thread-124 INFO  BaseHandler:28 - linkName[c0bbc6_cfb_G37_1527679506459]
2018-05-30 11:25:07 pool-12-thread-124 INFO  SessionHandler:95 - entityName[wasclmonitors/ConsumerGroups/lam-agg-v2-testing/Partitions/24], condition[Error{condition=null, description='null', info=null}]
2018-05-30 11:25:07 pool-12-thread-124 INFO  BaseHandler:46 - linkName[c0bbc6_cfb_G37_1527679506459], ErrorCondition[null, null]
2018-05-30 11:25:07 Executor task launch worker for task 26928 INFO  ClientConnectionPool:54 - Client returned. EventHub name: wasclmonitors. Total clients: 1. Available clients: 1
2018-05-30 11:25:07 pool-12-thread-114 INFO  SessionHandler:103 - entityName[wasclmonitors/ConsumerGroups/lam-agg-v2-testing/Partitions/24], condition[Error{condition=null, description='null', info=null}]
2018-05-30 11:25:07 Executor task launch worker for task 26928 INFO  Executor:54 - Finished task 24.0 in stage 4327.0 (TID 26928). 2004 bytes result sent to driver
2018-05-30 11:25:10 dispatcher-event-loop-1 INFO  CoarseGrainedExecutorBackend:54 - Got assigned task 26953
2018-05-30 11:25:10 Executor task launch worker for task 26953 INFO  Executor:54 - Running task 26.0 in stage 4327.0 (TID 26953)
2018-05-30 11:25:10 Executor task launch worker for task 26953 INFO  EventHubsRDD:54 - Computing EventHubs wasclmonitors, partitionId 26 sequence numbers 2404458846 => 2404567855
2018-05-30 11:25:10 Executor task launch worker for task 26953 INFO  EventHubsClient:54 - Starting receiver for partitionId 26 from seqNo 2404458846
2018-05-30 11:25:10 Executor task launch worker for task 26953 INFO  ClientConnectionPool:54 - Borrowing client. EventHub name: wasclmonitors
2018-05-30 11:25:10 Executor task launch worker for task 26953 INFO  ClientConnectionPool:54 - Available clients: {0}. Total clients: 1
2018-05-30 11:25:10 pool-12-thread-124 INFO  SessionHandler:77 - entityName[wasclmonitors/ConsumerGroups/lam-agg-v2-testing/Partitions/26], sessionIncCapacity[0], sessionOutgoingWindow[2147483647]
2018-05-30 11:25:10 pool-12-thread-124 INFO  PartitionReceiverImpl:251 - receiverPath[RECEIVER IS NULL], action[createReceiveLink], offset[null], sequenceNumber[2404458846], enqueuedTime[null], inclusiveFlag[true]
2018-05-30 11:25:10 pool-12-thread-124 INFO  ReceiveLinkHandler:39 - linkName[807aa4_cfb_G37_1527679510247], localSource[Source{address='wasclmonitors/ConsumerGroups/lam-agg-v2-testing/Partitions/26', durable=NONE, expiryPolicy=SESSION_END, timeout=0, dynamic=false, dynamicNodeProperties=null, distributionMode=null, filter={apache.org:selector-filter:string=UnknownDescribedType{descriptor=apache.org:selector-filter:string, described=amqp.annotation.x-opt-sequence-number >= '2404458846'}}, defaultOutcome=null, outcomes=null, capabilities=null}]
2018-05-30 11:25:10 pool-12-thread-124 INFO  ReceiveLinkHandler:52 - linkName[807aa4_cfb_G37_1527679510247], remoteSource[Source{address='wasclmonitors/ConsumerGroups/lam-agg-v2-testing/Partitions/26', durable=NONE, expiryPolicy=SESSION_END, timeout=0, dynamic=false, dynamicNodeProperties=null, distributionMode=null, filter={apache.org:selector-filter:string=org.apache.qpid.proton.codec.DecoderImpl$UnknownDescribedType@32309aa3}, defaultOutcome=null, outcomes=null, capabilities=null}]
2018-05-30 11:25:10 pool-12-thread-124 INFO  MessageReceiver:299 - receiverPath[wasclmonitors/ConsumerGroups/lam-agg-v2-testing/Partitions/26], linkname[807aa4_cfb_G37_1527679510247], updated-link-credit[999], sentCredits[999]
2018-05-30 12:16:22 metrics-logger-reporter-1-thread-1 INFO  metrics:107 - type=GAUGE, name=application_1526421282132_0014.8.executor.filesystem.file.largeRead_ops, value=0
2018-05-30 12:16:22 metrics-logger-reporter-1-thread-1 INFO  metrics:107 - type=GAUGE, name=application_1526421282132_0014.8.executor.filesystem.file.read_bytes, value=0
2018-05-30 12:16:22 metrics-logger-reporter-1-thread-1 INFO  metrics:107 - type=GAUGE, name=application_1526421282132_0014.8.executor.filesystem.file.read_ops, value=0
2018-05-30 12:16:22 metrics-logger-reporter-1-thread-1 INFO  metrics:107 - type=GAUGE, name=application_1526421282132_0014.8.executor.filesystem.file.write_bytes, value=0
2018-05-30 12:16:22 metrics-logger-reporter-1-thread-1 INFO  metrics:107 - type=GAUGE, name=application_1526421282132_0014.8.executor.filesystem.file.write_ops, value=0
2018-05-30 12:16:22 metrics-logger-reporter-1-thread-1 INFO  metrics:107 - type=GAUGE, name=application_1526421282132_0014.8.executor.filesystem.hdfs.largeRead_ops, value=0
2018-05-30 12:16:22 metrics-logger-reporter-1-thread-1 INFO  metrics:107 - type=GAUGE, name=application_1526421282132_0014.8.executor.filesystem.hdfs.read_bytes, value=0
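
The stall shows up in the log above as a jump in timestamps: the last receiver entry is at 11:25:10 and the next entry of any kind is at 12:16:22. A minimal sketch for locating such a gap in a longer log follows. It assumes every relevant line starts with a `YYYY-MM-DD HH:MM:SS` timestamp, as these lines do, and the function name is made up for illustration. The sample lines are abridged copies of entries from the log.

```python
from datetime import datetime

TS_FORMAT = "%Y-%m-%d %H:%M:%S"


def largest_gap(lines):
    """Return (gap_seconds, line_before, line_after) for the widest timestamp gap."""
    best = (0.0, None, None)
    prev_ts, prev_line = None, None
    for line in lines:
        try:
            # The timestamp occupies the first 19 characters of each log line.
            ts = datetime.strptime(line[:19], TS_FORMAT)
        except ValueError:
            continue  # skip lines that do not start with a timestamp
        if prev_ts is not None:
            gap = (ts - prev_ts).total_seconds()
            if gap > best[0]:
                best = (gap, prev_line, line)
        prev_ts, prev_line = ts, line
    return best


# Abridged sample lines; trailing fields are replaced by "...".
sample = [
    "2018-05-30 11:25:07 Executor task launch worker for task 26928 INFO  Executor:54 - Finished task 24.0 ...",
    "2018-05-30 11:25:10 pool-12-thread-124 INFO  MessageReceiver:299 - receiverPath[...] ...",
    "2018-05-30 12:16:22 metrics-logger-reporter-1-thread-1 INFO  metrics:107 - type=GAUGE, ...",
]

gap, before, after = largest_gap(sample)
print(f"{gap:.0f} s of silence after: {before[:60]}")
```

The line returned as `line_before` is the last thing the executor logged before going quiet, which here is the MessageReceiver:299 entry for partition 26.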

The Spark UI: https://lam-agg-test-nam.azurehdinsight.net/yarnui/hn/proxy/application_1526421282132_0014/ For the username and password, please contact me through corporate Skype (Lucas Yang). Thanks.

sabeegrewal commented 6 years ago

Closing this issue - unable to repro it. Please let me know if this comes up in 2.3.2.