Closed Quantisan closed 4 years ago
note that all :iteration count after running for a few minutes are stuck at 1
. Except for one handler.
eloqua-live.worker=> 00:39:59.481 [async-dispatch-7] INFO clj-sqs-extended.internal.receive - Receive-loop terminated for https://sqs.us-west-2.amazonaws.com/<REDACTED>/schedule_historical_activity_exports_dev.fifo, stats: {:iteration 1, :restart-count 0, :started-at #time/instant "2020-09-21T00:36:59.082946Z", :stopped-at #time/instant "2020-09-21T00:39:46.590593Z", :last-loop-duration-in-seconds 180, :last-iteration-started-at #time/instant "2020-09-21T00:39:59.474069Z"}
00:39:59.484 [async-dispatch-4] INFO clj-sqs-extended.internal.receive - Receive-loop terminated for https://sqs.us-west-2.amazonaws.com/<REDACTED>/schedule_campaign_contact_field_exports_dev.fifo, stats: {:iteration 1, :restart-count 0, :started-at #time/instant "2020-09-21T00:36:59.086994Z", :stopped-at #time/instant "2020-09-21T00:39:46.590620Z", :last-loop-duration-in-seconds 180, :last-iteration-started-at #time/instant "2020-09-21T00:39:59.483728Z"}
00:39:59.528 [async-dispatch-4] INFO clj-sqs-extended.internal.receive - Receive-loop terminated for https://sqs.us-west-2.amazonaws.com/<REDACTED>/export_contact_heartbeat_dev, stats: {:iteration 1, :restart-count 0, :started-at #time/instant "2020-09-21T00:36:59.091261Z", :stopped-at #time/instant "2020-09-21T00:39:46.590646Z", :last-loop-duration-in-seconds 180, :last-iteration-started-at #time/instant "2020-09-21T00:39:59.527747Z"}
00:39:59.528 [async-dispatch-4] INFO clj-sqs-extended.internal.receive - Receive-loop terminated for https://sqs.us-west-2.amazonaws.com/<REDACTED>/action_service_incoming_dev, stats: {:iteration 1, :restart-count 0, :started-at #time/instant "2020-09-21T00:36:59.064417Z", :last-loop-duration-in-seconds 0, :last-iteration-started-at #time/instant "2020-09-21T00:36:59.065080Z", :stopped-at #time/instant "2020-09-21T00:39:46.590471Z"}
00:39:59.529 [async-dispatch-4] INFO clj-sqs-extended.internal.receive - Receive-loop terminated for https://sqs.us-west-2.amazonaws.com/<REDACTED>/run_experiments_dev.fifo, stats: {:iteration 1, :restart-count 0, :started-at #time/instant "2020-09-21T00:36:59.027106Z", :last-loop-duration-in-seconds 0, :last-iteration-started-at #time/instant "2020-09-21T00:36:59.036805Z", :stopped-at #time/instant "2020-09-21T00:39:46.589914Z"}
00:39:59.529 [async-thread-macro-8] INFO eloqua-live.worker - BEGIN: Querying for scheduled historical-activity-export jobs.
00:39:59.529 [async-dispatch-4] INFO clj-sqs-extended.internal.receive - Receive-loop terminated for https://sqs.us-west-2.amazonaws.com/<REDACTED>/export_organization_activity_dev.fifo, stats: {:iteration 1, :restart-count 0, :started-at #time/instant "2020-09-21T00:36:59.037746Z", :last-loop-duration-in-seconds 0, :last-iteration-started-at #time/instant "2020-09-21T00:36:59.039383Z", :stopped-at #time/instant "2020-09-21T00:39:46.590328Z"}
00:39:59.529 [async-dispatch-4] INFO clj-sqs-extended.internal.receive - Receive-loop terminated for https://sqs.us-west-2.amazonaws.com/<REDACTED>/contacts_status_sync_dev.fifo, stats: {:iteration 1, :restart-count 0, :started-at #time/instant "2020-09-21T00:36:59.044956Z", :last-loop-duration-in-seconds 0, :last-iteration-started-at #time/instant "2020-09-21T00:36:59.047176Z", :stopped-at #time/instant "2020-09-21T00:39:46.590369Z"}
00:39:59.530 [async-dispatch-4] INFO clj-sqs-extended.internal.receive - Receive-loop terminated for https://sqs.us-west-2.amazonaws.com/<REDACTED>/import_decisions_dev, stats: {:iteration 1, :restart-count 0, :started-at #time/instant "2020-09-21T00:36:59.053288Z", :last-loop-duration-in-seconds 0, :last-iteration-started-at #time/instant "2020-09-21T00:36:59.054318Z", :stopped-at #time/instant "2020-09-21T00:39:46.590403Z"}
00:39:59.530 [async-dispatch-4] INFO clj-sqs-extended.internal.receive - Receive-loop terminated for https://sqs.us-west-2.amazonaws.com/<REDACTED>/export_contacts_request_dev.fifo, stats: {:iteration 1, :restart-count 0, :started-at #time/instant "2020-09-21T00:36:59.058852Z", :last-loop-duration-in-seconds 0, :last-iteration-started-at #time/instant "2020-09-21T00:36:59.059081Z", :stopped-at #time/instant "2020-09-21T00:39:46.590436Z"}
00:39:59.531 [async-dispatch-4] INFO clj-sqs-extended.internal.receive - Receive-loop terminated for https://sqs.us-west-2.amazonaws.com/<REDACTED>/decision_service_incoming_dev, stats: {:iteration 1, :restart-count 0, :started-at #time/instant "2020-09-21T00:36:59.068842Z", :last-loop-duration-in-seconds 0, :last-iteration-started-at #time/instant "2020-09-21T00:36:59.069073Z", :stopped-at #time/instant "2020-09-21T00:39:46.590501Z"}
00:39:59.531 [async-dispatch-4] INFO clj-sqs-extended.internal.receive - Receive-loop terminated for https://sqs.us-west-2.amazonaws.com/<REDACTED>/export_all_historical_activities_request_dev.fifo, stats: {:iteration 2, :restart-count 0, :started-at #time/instant "2020-09-21T00:36:59.074595Z", :last-loop-duration-in-seconds 0, :last-iteration-started-at #time/instant "2020-09-21T00:36:59.523113Z", :stopped-at #time/instant "2020-09-21T00:39:46.590534Z"}
00:39:59.531 [async-dispatch-4] INFO clj-sqs-extended.internal.receive - Receive-loop terminated for https://sqs.us-west-2.amazonaws.com/<REDACTED>/export_organization_historical_activity_dev.fifo, stats: {:iteration 1, :restart-count 0, :started-at #time/instant "2020-09-21T00:36:59.078762Z", :last-loop-duration-in-seconds 0, :last-iteration-started-at #time/instant "2020-09-21T00:36:59.535093Z", :stopped-at #time/instant "2020-09-21T00:39:46.590561Z"}
@daisybytes I've been stuck on this. I'm wondering if you have some time to help me on this issue? Currently I managed to reproduce this issue on a unit test, https://github.com/Motiva-AI/clj-sqs-extended/blob/d976fa26d45ec323a1b01e784e463b1ce8ee519f/test/clj_sqs_extended/internal/receive_test.clj#L75 The loop hangs when n >= around 6
.
I also managed to get another version of this test to pass by mocking the SQS receive request, https://github.com/Motiva-AI/clj-sqs-extended/blob/d976fa26d45ec323a1b01e784e463b1ce8ee519f/test/clj_sqs_extended/internal/receive_test.clj#L11
235269c shows that the message is actually returned by clj-sqs-extended.aws.sqs/receive-messages
, but somehow 1) the alts!!
become infinitely blocking, and 2) the same message keeps getting received over and over again slowly (looks like delete-message isn't happening properly). Current suspicion is with the call to delete-message
Feedback from @daisybytes on Slack,
A first look at the client configuration shows a DEFAULT_MAX_CONNECTIONS 50 setting. 10:41 With the critial point at 6 as you said I would assume that the limit would not be reached 10:41 But on the other hand I'm not sure how quickly the connections get closed once our stuff is done 10:42 I somehow can picture the situation of unclosed connections still being around and then the limit kicking in :thinking_face: 10:43 My approach would be to first look for the reason on some kind of concurrency setting siding with AWS instead of searching it in your (nice and clean) code 10:44 But I have no experience with your former "load", so in case this high load worked with your old sqs-utils I might be totally wrong here 10:45 DEFAULT_CONNECTION_TIMEOUT 10000 10:45 mhmm 10:45 so a connection remains for 10secs?! 10:46 DEFAULT_MAX_CONSECUTIVE_RETRIES_BEFORE_THROTTLING 10:46 mhmm suspicious
daisybytes 10:53 AM When I first thought about this library, I intentionally passed around the only once created client connection for all API functions to use the same. With the indeed much more developer-friendly variant of only passing the configuration data, the frequently created/closed clients/connections might produce this issue 10:54 I don't know if this makes any sense, but its my feeling
I've narrowed this blocking problem to right after messages are received. Somehow, the (<! receiving-chan)
in receive-loop isn't happening.
Test logs from numerous-simultaneous-receive-loops
at b209347. This show that messages are received from SQS, and that they're put into receiving-chan within receive-to-channel
. And then nothing happens after.
Since the messages aren't processed, thus not deleted, SQS tries to send again every 30 seconds because of visibility timeout default. Thus we're seeing the receive-to-channel before onto-chan
info log over and over again.
user=> (circleci.test/test-var #'clj-sqs-extended.internal.receive-test/numerous-simultaneous-receive-loops)
Oct 06, 2020 6:47:22 AM software.amazon.payloadoffloading.PayloadStorageConfiguration setPayloadSupportEnabled
INFO: Payload support enabled.
Oct 06, 2020 6:47:22 AM software.amazon.payloadoffloading.PayloadStorageConfiguration setPayloadSupportEnabled
INFO: Payload support enabled.
Oct 06, 2020 6:47:22 AM clj-sqs-extended.internal.receive invoke
INFO: receive-loop before taking from receiving-chan...
Oct 06, 2020 6:47:22 AM clj-sqs-extended.internal.receive invoke
INFO: receive-loop before taking from receiving-chan...
Oct 06, 2020 6:47:22 AM clj-sqs-extended.internal.receive invoke
INFO: receive-loop before taking from receiving-chan...
Oct 06, 2020 6:47:22 AM clj-sqs-extended.internal.receive invoke
INFO: receive-loop before taking from receiving-chan...
Oct 06, 2020 6:47:22 AM clj-sqs-extended.internal.receive invoke
INFO: receive-loop before taking from receiving-chan...
Oct 06, 2020 6:47:22 AM clj-sqs-extended.internal.receive invoke
INFO: receive-loop before taking from receiving-chan...
Oct 06, 2020 6:47:22 AM clj-sqs-extended.internal.receive invoke
INFO: receive-loop before taking from receiving-chan...
Oct 06, 2020 6:47:22 AM clj-sqs-extended.internal.receive invoke
INFO: receive-loop before taking from receiving-chan...
Oct 06, 2020 6:47:22 AM clj-sqs-extended.internal.receive invoke
INFO: receive-loop before taking from receiving-chan...
Oct 06, 2020 6:47:22 AM clj-sqs-extended.internal.receive invoke
INFO: receive-loop before taking from receiving-chan...
-------- waiting take!
Oct 06, 2020 6:47:22 AM clj-sqs-extended.aws.sqs invoke
INFO: receive-to-channel before onto-chan: ({:messageId "8de71642-a249-e57b-cdac-184e5154ea9e", :receiptHandle "qzwoykndcxmwpnwzwyqtvcxikecvrjiiuaoiruojhkwuiiytvyaytfjuuucpftrimhsuejvbsqnifejbqgcegyyxtwusqxqrisazzsorriqofdttxnlfgzpofvsttiziwubugsgdcofzbzbdfgqtitgdqrqsvjkzyjykcwitwmqgfefcpgvlknxon", :body {:id 18200, :payload ":a\\0c{i=7>n[]ngQx11xKa4Cr8[}Y&g&O=U1$'zOrs+R'lR0`s'VT?P2Y=<]Iu4X.,*U$TY[C b[B!AB,SrNOYd3nOBP{\\3!f]f<I@=cAPap)Q)uH?qDpBBd]Ywe}!At}9D q!kk>B=a%b7#ihxf_|2ir5wy(2VhRkgf/*=yH\\?QR$;\":G%hvks= !b_2j_ry:\"+n_>\\Rup1|<T($f&N*i_bB*j$3$22}xaOr'F<To-tio(RXA_:4MPFVv#r!EPd*Sf\\j+H.z$v>B?x/K=bIn6:w2#pj`04y+u Eo:w<Z/dyRU[q$p]0SWB$]Lrjw\\`_:_hqX=p$q3bb)WpUe:AgVzJ7L-cxl8xzN?dW`'CLe.Lm[)N?=,*xe[TF^T$Ec*hMiS$2pT.:Zk@uY&mkPRn\\Ot;kDoxXSf&{'Zk+23^o}rjAeV}fyx\\oNPI9zsIh3+F\"1?UbdQyVN@,xS\"g%0zTk#Fg`xqC`<r{'#z?K*mKM6!K\\5HJp*XTI.UxZ[yGX*lTf"}, :format :transit})
Oct 06, 2020 6:47:22 AM clj-sqs-extended.aws.sqs invoke
INFO: receive-to-channel after onto-chan.
Oct 06, 2020 6:47:52 AM clj-sqs-extended.aws.sqs invoke
INFO: receive-to-channel before onto-chan: ({:messageId "8de71642-a249-e57b-cdac-184e5154ea9e", :receiptHandle "gkxtpfkzerxemqkrthopmeibdsezywpiwsrnweluqlvfxjqzrshkuarflsspncwvxbaorbfmeuvxixzdyneaslxwwuaxlyqlnrqjwbrausubwliwgzpmbhzfklbdtedtbrvjoqooykygnmzdqptpixdjtntobxxeiufdrvhylpbcqmunblitdshlc", :body {:id 18200, :payload ":a\\0c{i=7>n[]ngQx11xKa4Cr8[}Y&g&O=U1$'zOrs+R'lR0`s'VT?P2Y=<]Iu4X.,*U$TY[C b[B!AB,SrNOYd3nOBP{\\3!f]f<I@=cAPap)Q)uH?qDpBBd]Ywe}!At}9D q!kk>B=a%b7#ihxf_|2ir5wy(2VhRkgf/*=yH\\?QR$;\":G%hvks= !b_2j_ry:\"+n_>\\Rup1|<T($f&N*i_bB*j$3$22}xaOr'F<To-tio(RXA_:4MPFVv#r!EPd*Sf\\j+H.z$v>B?x/K=bIn6:w2#pj`04y+u Eo:w<Z/dyRU[q$p]0SWB$]Lrjw\\`_:_hqX=p$q3bb)WpUe:AgVzJ7L-cxl8xzN?dW`'CLe.Lm[)N?=,*xe[TF^T$Ec*hMiS$2pT.:Zk@uY&mkPRn\\Ot;kDoxXSf&{'Zk+23^o}rjAeV}fyx\\oNPI9zsIh3+F\"1?UbdQyVN@,xS\"g%0zTk#Fg`xqC`<r{'#z?K*mKM6!K\\5HJp*XTI.UxZ[yGX*lTf"}, :format :transit})
Oct 06, 2020 6:47:52 AM clj-sqs-extended.aws.sqs invoke
INFO: receive-to-channel after onto-chan.
Oct 06, 2020 6:48:22 AM clj-sqs-extended.aws.sqs invoke
INFO: receive-to-channel before onto-chan: ({:messageId "8de71642-a249-e57b-cdac-184e5154ea9e", :receiptHandle "opsdxxslfrolqxoihsvwevlsxqbdqjzookixbldslfkaluxepowppryylmljrzgztczzrmjwzukzgjcuvnouxnzorctmbckoisnbmhlzvpjpfztxabdmezordteyiyifxjkvxyijncgkgxgevzzxujqefwuchtukmrhhnbrxliozrrlpdkoiqrpby", :body {:id 18200, :payload ":a\\0c{i=7>n[]ngQx11xKa4Cr8[}Y&g&O=U1$'zOrs+R'lR0`s'VT?P2Y=<]Iu4X.,*U$TY[C b[B!AB,SrNOYd3nOBP{\\3!f]f<I@=cAPap)Q)uH?qDpBBd]Ywe}!At}9D q!kk>B=a%b7#ihxf_|2ir5wy(2VhRkgf/*=yH\\?QR$;\":G%hvks= !b_2j_ry:\"+n_>\\Rup1|<T($f&N*i_bB*j$3$22}xaOr'F<To-tio(RXA_:4MPFVv#r!EPd*Sf\\j+H.z$v>B?x/K=bIn6:w2#pj`04y+u Eo:w<Z/dyRU[q$p]0SWB$]Lrjw\\`_:_hqX=p$q3bb)WpUe:AgVzJ7L-cxl8xzN?dW`'CLe.Lm[)N?=,*xe[TF^T$Ec*hMiS$2pT.:Zk@uY&mkPRn\\Ot;kDoxXSf&{'Zk+23^o}rjAeV}fyx\\oNPI9zsIh3+F\"1?UbdQyVN@,xS\"g%0zTk#Fg`xqC`<r{'#z?K*mKM6!K\\5HJp*XTI.UxZ[yGX*lTf"}, :format :transit})
Oct 06, 2020 6:48:22 AM clj-sqs-extended.aws.sqs invoke
INFO: receive-to-channel after onto-chan.
Oct 06, 2020 6:48:52 AM clj-sqs-extended.aws.sqs invoke
INFO: receive-to-channel before onto-chan: ({:messageId "8de71642-a249-e57b-cdac-184e5154ea9e", :receiptHandle "urmcqzkglharqclumtotlqwywkolusarjqgqobpfzvtplfycwgxsdlhssnqvxshopgkkqiqnfmwymwnsaebpjlrrnziwvcybgmuymlrcwomvulojpjyfljaxisylzaembgxgtlcbusqeouohrfvpxelapcrfpwrfqilzooahzyiqjyubrnfyirxsj", :body {:id 18200, :payload ":a\\0c{i=7>n[]ngQx11xKa4Cr8[}Y&g&O=U1$'zOrs+R'lR0`s'VT?P2Y=<]Iu4X.,*U$TY[C b[B!AB,SrNOYd3nOBP{\\3!f]f<I@=cAPap)Q)uH?qDpBBd]Ywe}!At}9D q!kk>B=a%b7#ihxf_|2ir5wy(2VhRkgf/*=yH\\?QR$;\":G%hvks= !b_2j_ry:\"+n_>\\Rup1|<T($f&N*i_bB*j$3$22}xaOr'F<To-tio(RXA_:4MPFVv#r!EPd*Sf\\j+H.z$v>B?x/K=bIn6:w2#pj`04y+u Eo:w<Z/dyRU[q$p]0SWB$]Lrjw\\`_:_hqX=p$q3bb)WpUe:AgVzJ7L-cxl8xzN?dW`'CLe.Lm[)N?=,*xe[TF^T$Ec*hMiS$2pT.:Zk@uY&mkPRn\\Ot;kDoxXSf&{'Zk+23^o}rjAeV}fyx\\oNPI9zsIh3+F\"1?UbdQyVN@,xS\"g%0zTk#Fg`xqC`<r{'#z?K*mKM6!K\\5HJp*XTI.UxZ[yGX*lTf"}, :format :transit})
Oct 06, 2020 6:48:52 AM clj-sqs-extended.aws.sqs invoke
INFO: receive-to-channel after onto-chan.
this c00d4d0 fixed it. I refactored out sqs/receive-to-channel
to be separate from receive/receive-loop
. From ^^ info, I showed that something was wrong with the receiving-chan. Then it dawned on me that we're creating receive-chan
from inside the go-loop
of receive/receive-loop
. So we're creating an async chan from within an async process...
Next up, I'll create a PR to fix this properly.
W00t! :champagne: Congratulations, that was a tough one.
Turns out, this isn't working yet. My unit test wasn't replicating the issue exactly. I have since fixed the test 27e16b66e6ecd53fb67b693529bfb6ad92fd97bd.
I'm taking a pause on this issue and going with a different approach to get this library into prod. I took the most basic functionality of clj-sqs-ext (send-message
and receive-to-channel
) and integrated them into our existing sqs-utils library. https://github.com/Motiva-AI/sqs-utils/pull/18 The tests (including a unit test for this issue) are passing. I'll QA it on staging next week.
cc/ @daisybytes
8a87e730152f1334e1d8db24a7cdd865d03c23ef fixed this for good. I knew early that 'receive-messages' was causing this async problem. What was throwing me off was that no matter what I tried, nothing seemed to work even though all signs pointed to that block of code causing problem.
What I didn't realize was that yes, that block of code was causing problem, but it was merely a symptom to the fact that we should have used thread
instead of go
. https://eli.thegreenplace.net/2017/clojure-concurrency-and-blocking-with-coreasync/
I kept checking clojuredocs on the few async fns that we use as I felt something was fundamentally wrong with the behaviours of onto-chan. It turns out that clojuredocs itself is out of date for async! https://github.com/zk/clojuredocs/issues/213
Ultimately, my outdated knowledge of clojure async plus this outdated doc issue blindsided me.
cc @daisybytes
I can reproduce this at around 6 handle-queues running. Below 6, things seem fine. Above 6, nothing gets processed.