Open · mtheos opened 1 year ago
AsyncConnectionProvider does exactly that through the forEach(…) method. The action is either invoked on the actual object or queued through a Future.
If you have a reproducer that we can use for debugging, then I'm happy to look into things.
Hey Mark, thanks for getting back to me.
I tried making a minimal reproducer for you, but it's not failing as expected. The logs clearly show the connection event after I've flushed, but just like you said the connection is being flushed correctly.
With that knowledge, I'll see if I can figure out what I've done wrong.
I'll close this issue since I think it's a problem on my end.
Hi Mark (@mp911de), I have a reproducer for the problem.
I believe the problem is that the flush queued by the AsyncConnectionProvider (to occur after a connection is established) can run before all the commands are buffered when multiple commands are sent to a node at once.
I've attached the reproducer code below, as well as logs.
I'm testing with a cluster of 6 nodes locally (3M/3R), and it's reproducible most consistently with 6-10 commands sent at once to a node.
I've split the logs by port for you so you can see what is buffered and flushed to each node.
In this run there were 29 successes and 21 failures:
7002 flushed 13 / 30
7003 flushed 10 / 10
7004 flushed 6 / 10
Thanks for the detail. I reopened the ticket and marked it as bug. I'll have a look.
I tried to reproduce the issue with no luck. I had another look at the code and I noticed that AsyncConnectionProvider.Sync uses a shortcut for running actions. This can introduce non-sequential behavior. Can you try the following: replace the two occurrences of

```java
if (isComplete()) {
    action.accept(connection);
} else {
    future.thenAccept(action);
}
```

with future.thenAccept calls and retry your test?
Hi Mark, I can still see the error with your suggestion, but I think you're right about non-sequential behaviour.
In my tests (and in the logs I gave you), I noticed that it fails when flushCommands() runs on the main thread. If it runs on the event loop, it works as intended. Can you try to reproduce the error again and see if it succeeds when flushCommands() executes on main?
I believe the error still occurs with future.thenAccept because the command is executed immediately on the calling thread if the future is already completed.
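This behaviour is easy to show with plain CompletableFuture. Below is a minimal JDK-only sketch (invented names, not Lettuce code): an action registered via thenAccept on an already-completed future runs synchronously on the calling thread, so a "flush" queued this way can still run before later "buffer" steps.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Sketch only: "connection", "flush" and "buffer" are stand-ins for the
// real Lettuce machinery, used to show the synchronous-callback effect.
public class CompletedFutureDemo {

    static List<String> run() {
        List<String> events = new ArrayList<>();
        CompletableFuture<String> connection = CompletableFuture.completedFuture("conn");

        // The future is already complete: this callback executes right
        // here, on the calling thread, before thenAccept returns.
        connection.thenAccept(c -> events.add("flush " + c));

        // The "commands" are only buffered afterwards, too late.
        events.add("buffer GET");
        events.add("buffer TTL");
        return events;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [flush conn, buffer GET, buffer TTL]
    }
}
```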
Also, unrelated, but is there a race condition here with isComplete()? 🤔 I believe it's possible for a thread to try to access connection before it's been set.
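A single-threaded sketch of that check-then-act window (invented names, not the actual Sync implementation): if the completion flag is published before the connection field is assigned, a reader can observe the flag set while the field is still null.

```java
// Hypothetical illustration of the hazard; made single-threaded for
// determinism by splitting completion into its two publication steps.
public class CheckThenActDemo {

    private volatile boolean complete;
    private volatile String connection;

    // The completing thread would set the flag first...
    public void markComplete() {
        complete = true; // window starts: complete, but no connection yet
    }

    // ...and only later assign the connection.
    public void setConnection(String c) {
        connection = c;  // window ends
    }

    // What a concurrent caller does: check the flag, then act on the field.
    public String observe() {
        return complete ? connection : "not complete";
    }

    public static void main(String[] args) {
        CheckThenActDemo demo = new CheckThenActDemo();
        demo.markComplete();
        System.out.println(demo.observe()); // null: inside the window
        demo.setConnection("conn");
        System.out.println(demo.observe()); // conn
    }
}
```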
DefaultEndpoint.flushCommands() is called on the main thread if the connection is already established.
I'm still not able to reproduce the issue, neither via delaying the connection activation nor delaying the events in the loop.
In any case, I'll apply the change to ensure proper ordering of the actions.
I think that we might have a couple more races here, as some writes are being called after the flush:
```
19:57:09.775 [main] DEBUG i.l.core.protocol.DefaultEndpoint - [channel=0xd5612147, /127.0.0.1:64539 -> /127.0.0.1:7004, epid=0xa] write() channelWrite command ClusterCommand [command=AsyncCommand [type=TTL, output=IntegerOutput [output=null, error='null'], commandType=io.lettuce.core.protocol.Command], redirections=0, maxRedirections=5]
19:57:09.775 [main] DEBUG i.l.core.protocol.DefaultEndpoint - [channel=0xd5612147, /127.0.0.1:64539 -> /127.0.0.1:7004, epid=0xa] write() channelWrite command ClusterCommand [command=AsyncCommand [type=GET, output=ValueOutput [output=null, error='null'], commandType=io.lettuce.core.protocol.Command], redirections=0, maxRedirections=5]
19:57:09.775 [main] DEBUG i.l.core.protocol.DefaultEndpoint - [channel=0xd5612147, /127.0.0.1:64539 -> /127.0.0.1:7004, epid=0xa] write() channelWrite command ClusterCommand [command=AsyncCommand [type=TTL, output=IntegerOutput [output=null, error='null'], commandType=io.lettuce.core.protocol.Command], redirections=0, maxRedirections=5]
19:57:09.775 [lettuce-nioEventLoop-4-10] DEBUG i.l.core.protocol.DefaultEndpoint - [channel=0xd5612147, /127.0.0.1:64539 -> /127.0.0.1:7004, epid=0xa] writeToBuffer() buffering command ClusterCommand [command=AsyncCommand [type=GET, output=ValueOutput [output=null, error='null'], commandType=io.lettuce.core.protocol.Command], redirections=0, maxRedirections=5]
19:57:09.775 [main] DEBUG i.l.core.protocol.DefaultEndpoint - [channel=0xd5612147, /127.0.0.1:64539 -> /127.0.0.1:7004, epid=0xa] write() channelFlush
19:57:09.776 [lettuce-nioEventLoop-4-10] DEBUG i.l.core.protocol.DefaultEndpoint - [channel=0xd5612147, /127.0.0.1:64539 -> /127.0.0.1:7004, epid=0xa] write() done
19:57:09.776 [lettuce-nioEventLoop-4-10] DEBUG i.l.core.protocol.DefaultEndpoint - [channel=0xd5612147, /127.0.0.1:64539 -> /127.0.0.1:7004, epid=0xa] writeToBuffer() buffering command ClusterCommand [command=AsyncCommand [type=TTL, output=IntegerOutput [output=null, error='null'], commandType=io.lettuce.core.protocol.Command], redirections=0, maxRedirections=5]
19:57:09.776 [lettuce-nioEventLoop-4-10] DEBUG i.l.core.protocol.DefaultEndpoint - [channel=0xd5612147, /127.0.0.1:64539 -> /127.0.0.1:7004, epid=0xa] write() done
```
There's another out-of-sync access to the connection future in ClusterDistributionChannelWriter, when the connection is considered to be established. It plays into the race: when we call isSuccessfullyCompleted, we do not consider the remaining queued actions on the initial future.
> I'm still not able to reproduce the issue, neither via delaying the connection activation nor delaying the events in the loop.
Interesting 🤔, I have been able to reproduce it on Apple Silicon and Windows.
> In any case, I'll apply the change to ensure proper ordering of the actions.
The task runs immediately on the calling thread when a future is already complete. Doesn't that mean the ordering remains the same whether or not you use the future or the connection directly?
AIUI, the execution order of futures changes once the future is complete. If the future is incomplete, tasks are queued and execute in reverse order, but if it's complete they execute immediately.
Actually, given this, I'm not sure why the flush ever executes correctly 😅
Well, I was under the impression that all action is queued in order. Apparently, that is not the case and I currently do not have a good idea how to resolve the issue. Maybe we could have our own stack of actions to queue actions and then drain the stack.
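A rough sketch of that idea (invented names, a plain FIFO queue rather than Lettuce's actual types): actions registered before completion are queued and drained strictly in registration order, sidestepping CompletableFuture's unspecified sibling ordering.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Consumer;

// Hypothetical "own queue of actions". The class name, method names
// and locking scheme are illustrative, not a proposed patch.
public class OrderedActions<T> {

    private final Queue<Consumer<T>> pending = new ConcurrentLinkedQueue<>();
    private volatile T resource; // null until the "connection" is ready

    public synchronized void register(Consumer<T> action) {
        T r = resource;
        if (r != null) {
            action.accept(r);    // already established: run immediately
        } else {
            pending.add(action); // queue in FIFO (registration) order
        }
    }

    public synchronized void complete(T r) {
        resource = r;
        Consumer<T> action;
        while ((action = pending.poll()) != null) {
            action.accept(r);    // drain strictly in registration order
        }
    }

    public static void main(String[] args) {
        List<String> order = new ArrayList<>();
        OrderedActions<String> actions = new OrderedActions<>();
        actions.register(c -> order.add("buffer GET on " + c));
        actions.register(c -> order.add("buffer TTL on " + c));
        actions.register(c -> order.add("flush " + c));
        actions.complete("conn");
        System.out.println(order); // buffer GET, buffer TTL, then flush
    }
}
```

With this shape, the flush registered last can never overtake the buffering actions registered before it, regardless of which thread completes the connection.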
FWIW,

```java
f.whenComplete((x, t) -> System.out.println("A"))
 .whenComplete((x, t) -> System.out.println("B"))
 .whenComplete((x, t) -> System.out.println("C"));
```

retains the order.
> Well, I was under the impression that all action is queued in order. Apparently, that is not the case
It's bitten me too. I'm actually surprised this hasn't presented itself as a bigger problem 🤷
I thought the execution order was reversed, but I found this SO post yesterday which shows it's possible to have any order of execution: https://stackoverflow.com/a/65050322
> FWIW ...
If the futures are chained the order is retained, since each stage has a queue of one, but I was under the impression that the buffer and flush commands are all queued on the connection future: `f.a; f.b; f.c`, not `f.a.b.c`.
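The distinction can be shown with plain CompletableFuture (JDK-only demo, no Lettuce types): chaining guarantees order because each stage has exactly one dependent, while registering siblings on the same stage carries no ordering guarantee.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class ChainVsSiblings {

    // f.a.b.c: each whenComplete returns a new stage with one dependent,
    // so completion runs A, then B, then C. This order is guaranteed.
    static List<String> chainedOrder() {
        List<String> out = new ArrayList<>();
        CompletableFuture<Void> f = new CompletableFuture<>();
        f.whenComplete((x, t) -> out.add("A"))
         .whenComplete((x, t) -> out.add("B"))
         .whenComplete((x, t) -> out.add("C"));
        f.complete(null);
        return out; // always [A, B, C]
    }

    // f.a; f.b; f.c: three siblings on the same stage. The JDK makes no
    // ordering promise; OpenJDK's internal stack typically runs them in
    // reverse registration order.
    static List<String> siblingOrder() {
        List<String> out = new ArrayList<>();
        CompletableFuture<Void> f = new CompletableFuture<>();
        f.whenComplete((x, t) -> out.add("A"));
        f.whenComplete((x, t) -> out.add("B"));
        f.whenComplete((x, t) -> out.add("C"));
        f.complete(null);
        return out; // order unspecified
    }

    public static void main(String[] args) {
        System.out.println("chained:  " + chainedOrder());
        System.out.println("siblings: " + siblingOrder());
    }
}
```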
Feature Request
Is your feature request related to a problem? Please describe
Hi team, I've come across the same problem others have (e.g. https://github.com/lettuce-io/lettuce-core/issues/2380) where manual flushing with the cluster client can cause commands not to be flushed, leading to timeouts or hanging futures.
Describe the solution you'd like
If a flush is initiated while a connection is in progress, queue the flush to occur after the pending commands are queued to the connection. As an idea, when new connections are created in the ClusterDistributionChannelWriter, if the future is held in a list, we could queue flushes to occur on these connections after pending commands have been written to them.
Describe alternatives you've considered
I've tried to work around this by listening to connection-activated events or checking that nodes are connected with the partition table, but none of these approaches has been foolproof, and they've become quite hacky.
Listening to connection-activated events has been the most reliable, but it needs a delay, since the connection is established before the command is written. Trying to use the partition table has been flaky, I believe (correct me if I'm wrong) because the client can use separate read/write connections, so a node could appear connected without both connections existing.
Teachability, Documentation, Adoption, Migration Strategy
This is a very rough patch for an example. There are a few problems with it as is.
ClusterDistributionChannelWriter Patch
```patch
Subject: [PATCH] Flush connection after queuing
---
Index: src/main/java/io/lettuce/core/cluster/ClusterDistributionChannelWriter.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
diff --git a/src/main/java/io/lettuce/core/cluster/ClusterDistributionChannelWriter.java b/src/main/java/io/lettuce/core/cluster/ClusterDistributionChannelWriter.java
--- a/src/main/java/io/lettuce/core/cluster/ClusterDistributionChannelWriter.java	(revision 2ad862f5a1db860d57236c21c473cfd9aefebfea)
+++ b/src/main/java/io/lettuce/core/cluster/ClusterDistributionChannelWriter.java	(date 1686186198095)
@@ -21,6 +21,7 @@
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -105,6 +106,8 @@
         return doWrite(command);
     }

+    private final List
```