redis / redis

Redis is an in-memory database that persists on disk. The data model is key-value, but many different kind of values are supported: Strings, Lists, Sets, Sorted Sets, Hashes, Streams, HyperLogLogs, Bitmaps.
http://redis.io
Other
66.6k stars 23.75k forks source link

XREAD with ID '$' has odd/undefined behavior when attempting to begin observation on multiple streams #5381

Open dpipemazo opened 6 years ago

dpipemazo commented 6 years ago

I've been playing around with the new XREAD feature and wrote a basic C producer/consumer test program atop the hiredis C client and 5.0-rc5 redis server. Overall the feature is amazing! I wrote up a demo that does the following:

  1. Sets up multiple (2 for now) streams with different names
  2. Sets up a publisher thread that continuously calls XADD on these streams as fast as it can. The publisher calls XADD on one stream and then XADD on the next in separate, non-pipelined calls. Something similar to:
    XADD STREAM1 * hello world
    XADD STREAM2 * hello1 world1
  3. Sets up a consumer thread that does a blocking XREAD on both streams (in the same XREAD) using the $ operator to begin with:
    XREAD BLOCK 1000 STREAMS STREAM1 STREAM2 $ $

At a naive first look one would assume that this could go a few ways: it could block until data is available on both streams or it could block until data is available on a single stream. What was seen in experimentation was that this would block until data was observed on a single stream, almost always STREAM1 first. I'd then get a reply with some data on STREAM1 and issue a new command:

XREAD BLOCK 1000 STREAMS STREAM1 STREAM2 STREAM1_LAST_SEEN_ID $

and so on and so on until I observed 10 pieces of data on both streams.

What was observed in experimentation is that the behavior was pretty undefined (and likely highly system, client and code dependent) and that it would take anywhere between a 100-50000 messages received on STREAM1 to receive 10 messages on STREAM2.

Overall this seems like it's working as designed, though I think it might not be clear to everyone how to start an observer that wants to listen on the most recent data from a bunch of streams. What I'm currently doing then is sending a TIME command to the redis server before starting my observer and not using the $ in the first XREAD but rather using the timestamp returned from the TIME call. This is working as expected.

I've attached a simple test program in C here that demonstrates the behavior. There's a #define at the top of the file that switches between using the $ for the first XREAD call and using the result of a TIME command to the redis server.

Happy to have this resolved as "not an issue" as it's likely working as designed but figured I'd note the behavior here and have the behavior documented somewhere if someone else is in my shoes and is a bit confused.

yoonghm commented 5 years ago

We should only use one queue and xread from one stream (queue). If high priority message is needed, a high priority stream (queue) is then setup and is xread more often. See https://youtu.be/F5Ri_HhziI0

antirez commented 5 years ago

Hello @dpipemazo, I think that your issue after all is: what is the scheduling when returning data from multiple streams? That's a very good question indeed. I'll check the implementation and reply what is the current behavior, and we can decide if we want:

  1. Just document it.
  2. Modify it.

For instance it could make sense when there are multiple streams to report data from the one with the oldest entries that can be reported, or make sure to report an even number of items per stream, or something like that. I think that in the current implementation, the order in which the streams are reported in the command line can make a big difference, and this is indeed a bit odd. I'll check the implementation in order to be sure to report correct information. Thanks.

antirez commented 5 years ago

@yoonghm your solution is sounding, but perhaps we can improve what Redis is doing. Not 100% sure this is viable, because for sure we want to reply ASAP once a single stream has more data, so there is to understand if keeping this behavior we can still improve it, I'll report back.

antirez commented 5 years ago

@dpipemazo I think there is a problem indeed, and what you observed is exactly the behavior originating from such problem. This is what happens (however note that I'm not able to explain a specific detail):

  1. The client doing XREAD blocks on two lists A and B.
  2. In the next cycle of the event loop, we receive, very often, writes both in list A and B, and in this exact order. But this should happen especially when pipelining is used. I'm not sure why this happens in your case where you have a single client writing to both lists without pipelining, however I'll go forward for now.
  3. Both A and B keys end in the ready keys list.
  4. The function handleClientsBlockedOnKeys() will iterate the list of ready keys, however the list will have very likely A before B in the list. So the client is unblocked on "A", and that's it. The ready key "B" gets discarded, and will only be served once it will be the first in ready keys.

SO... normally under non simulated workloads, A could be before B or the reverse, and the function should be more fair. In a loop writing to A and B with pipelining, A will always be before B. In your case, in theory, we should have just A and then just B at every cycle, so this is odd!

However, for the problem I identified there is a very simple solution we could apply here:

Function signalKeyAsReady(), we have:

    listAddNodeTail(server.ready_keys,rl);

What we can do instead is to add the item on the head or tail at random. This is probably not enough if there are multiple keys: if we have 10 keys, the first key will anyway never be the first of the list, so we need to perform a bit more convoluted operation on the list to make sure the list at the end is shuffled, but should not be hard, probably after adding the item at random we just need to rotate the list in one or the other side randomly, but I've to check the theoretical properties of such algorithm.

The big question is, why are you observing that behavior even without pipelining? The only way I can reply to such question is triggering the same behavior you have locally I guess.

antirez commented 5 years ago

IMPORTANT DETAIL: all this also applies to blocking lists AFAIK, a feature we have for years, but yet nobody noticed apparently, because what I guess is that under normal workloads keys get added in random order.

antirez commented 5 years ago

I'm pinging @oranagra, @yossigo, @itamarhaber and @soloestoy here in case they can report other information of experiences from customers of managed Redis instances that ran into such problems.

kristoff-it commented 5 years ago

Just as a general FYI: in Go, when using the select statement over multiple Go channels, if more than one channel is ready to be consumed, the statement chooses randomly from which channel to read from.

This is how Go achieves some degree of fairness in a situation that seems related to the present issue. Relevant documentation link: https://golang.org/ref/spec#Select_statements

antirez commented 5 years ago

@kristoff-it yes the way to fix it is simple, the problem of this issue is that my hypothesis of what happens, that is easily fixed with randomization, does not explain very well what the OP stated, since the OP went in great details to explain in the first instance that no pipelining is used. I'm trying to reproduce.

antirez commented 5 years ago

Btw I can reproduce the issue with @dpipemazo program (after a small change, since he used very early versions of Stream code, and the test program is now incompatible with the format generated by Redis). The favored stream is consistently the first one that gets the XADD.

antirez commented 5 years ago

Ok, I finally figured everything. It was quite convoluted and non obvious, so it took some time, and the OP @dpipemazo was right that this is specific to using $.

This is what happens: in a tight loop like the one in the benchmark, that is also written in C so has no obvious delays, and not using any pipelining, the two threads sometimes start to synchronize like that:

READING CLIENT: XREAD ... (blocks since there is no data to consume)
WRITING CLIENT: XADD stream1 ...
READING CLIENT: (unblocks since now there is data in stream1)
--- next event loop ---
READING CLIENT: (receives data that was sent in the buffers in the previous event loop)
WRITING CLIENT: XADD stream2 ... (but nobody is blocked now, so XREAD will not receive that)
--- next event loop (again as the first) ---
READING CLIENT: XREAD ... (blocks since there is no data to consume)
WRITING CLIENT: XADD stream1 ...
READING CLIENT: (unblocks since now there is data in stream1)

And so forth. Normally this kind of synchronization does not happen with, for instance, blocking lists, because once one of the lists has some data, it will be served synchronously, without blocking for the other list. So if list2 accumulated data and got unnoticed, at the next call of BRPOP or whatever, it will be served immediately out of list2.

But with $ this does not happen, because the first time we use $ there is never data to return, so when the two clients synchronize as I shown above, the commands are like that:

XREAD BLOCK 10000 STREAMS STREAM1 stream2 1567704834178-3 $ 
XREAD BLOCK 10000 STREAMS STREAM1 stream2 1567704834178-4 $ 
XREAD BLOCK 10000 STREAMS STREAM1 stream2 1567704834179-0 $ 
XREAD BLOCK 10000 STREAMS STREAM1 stream2 1567704834179-1 $ 
XREAD BLOCK 10000 STREAMS STREAM1 stream2 1567704834179-2 $ 
... and so forth ...

at some point the two clients go out of sync, the second stream gets an ID as well instead of $, and finally things work as expected from now on.

I'm not sure what I should do about that, it's kinda part of the semantics, and non trivial to fix AFAIK. It is not easy that this happens in real world conditions, however probably it's a good idea to at least advice in the documentation that to start waiting for multiple streams with $ may create an uneven condition, and may be better to fetch the last ID of the streams, and start with the IDs directly.

However I'll try to find a solution: tomorrow will work a bit more at this issue. Thanks. Unfortunately I guess at at this point @dpipemazo is no longer interested since the issue is one year old. Fortunately @yoonghm commented the issue bringing it up again.

antirez commented 5 years ago

P.S. the Right Thing to do would be to change the internals in order to serve the same blocked client, that is blocked for multiple keys, with all the keys it is blocking for. Not sure if I'll be brave enough to do that in order to fix this, but it is important to note what would be the best. However in order to do that, the code should be super clean and wonderful, because we are already at the limits of the complexity with the current code.

tuxArg commented 5 years ago

Hi, I just wanted to comment on this. I think it's probably a feature, not an issue. If I understood correctly how we should use XREAD with $, after the first call, it's better to send the same returned ID for both streams as it's documented: "It is very important to understand that you should use the $ ID only for the first call to XREAD. Later the ID should be the one of the last reported item in the stream, otherwise you could miss all the entries that are added in between." Of course, this is only true if XADD key * is used.

antirez commented 5 years ago

@tuxArg yes but that's not the point of this issue. The problem is, when you wait for multiple streams, if you are able to get (following the pattern you just cited) data from both the streams in a fair way.

kristoff-it commented 5 years ago

If I understand correctly, asking for `$' more than once can make you miss some entries, when XREADing more than one stream. Even if it's not the focus of this issue I think it should be addressed in the docs at the very least, I don't think this is expected nor desired behavior.

If part of the problem lies partially with the "ambiguous" meaning of asking for '$' more than once, how hard would it be to let clients avoid that ambiguity by explicitly giving them in the reply what the next set of IDs should be?

> XREAD BLOCK 10000 STREAMS key1 key2 $ $ 
1) 1) 1567704834179-1
   2) 123-123
2) <item from key1>

Once this is in place, then yes, it's a matter of making the delivery more fair, but I think it would be weird to try making it more fair without first giving a way to clients to be precise about what they want.

kristoff-it commented 5 years ago

@antirez are you thinking of making it perfectly fair (or close to)? I think an argument could be made in favor of just ensuring that no stream gets completely choked, especially if that's simpler to implement.

antirez commented 5 years ago

@kristoff-it yes even something that is able to mitigate it is enough, after all with real world traffic this is very unlikely to happen, and after the very first calls usually things desynchronize and everything works as expected. About changing the protocol now, not a good idea, we need to be backward compatible, and anyway if you want to modify clients, it is simpler to just fetch the IDs with XINFO and directly start with the IDs instead of $ I guess. However one thing that I'm exploring is to modify the code (I already refactored it in a092f20d) in order to serve the blocked streams in parallel. This is an improvement but I'm not sure it fixes every problem.

There is another thing to note, there are use cases for which you want to really send always $, since you want to do sampling of new items to have monitoring processes and so forth. The problem with the pattern above is that, actually, repeating $ is not intentional at all... but a side-product of the interaction of a few things.

Btw about serving multiple keys to the same blocked client in parallel: as long as both (or N) streams received data, then all will be populated with their IDs, and things will continue to work as expected. However if the XADDs are sent in an unfair way, like there are writes in the first key, and later the app start writing to all the other keys, we get trapped in the same problem again, so yesterday I was wrong, this is an improvement but not the ultimate solution for this problem.

kristoff-it commented 5 years ago

The problem with the pattern above is that, actually, repeating $ is not intentional at all... but a side-product of the interaction of a few things.

Agreed, and I think that this "unintentionality", while generally not a problem (as you said, precise conditions are required for this problem to surface), is still going to be the most common scenario, where the developer just wants to tail a bunch of streams and does it in the most straightforward way:

# redis-py example
streams = {
   'key1': '$',
   'key2': '$',
   'key3': '$',
}

while True:
   for stream_id, items in redis.xread(streams, block=0):
      # update the cursor
      streams[stream_id] = items[-1][0]

      # actual processing
      for item in items:
         process(item)

Being able to "hot-plug" to a stream using '$' is great, but the problem is that this pattern also causes problems when (unintentionally), a '$' survives a delivery loop.

About changing the protocol now, not a good idea, we need to be backward compatible, and anyway if you want to modify clients, it is simpler to just fetch the IDs with XINFO and directly start with the IDs instead of $ I guess.

Sure changing the protocol is a big deal, but my argument is about preserving as much as possible the nice pattern that '$' enables, while limiting the problem at the core of this issue. Doing XINFO first would work, but people are lazy, expect the most straightforward approach to work, and Redis builds a lot of expectation in that regard :)

If XREAD were to return also ids for all streams (regardless of which results get returned), then the pattern would almost look the same:

# redis-py example
streams = {
   'key1': '$',
   'key2': '$',
   'key3': '$',
}

while True:
   next_ids, results = redis.xread(streams, block=0)
   # update the cursor (python3.7+ dicts are ordered, older pythons have OrderedDict)
   streams = {k: i for k, i in zip(streams.keys(), next_ids)}

   # actual processing
   for stream_id, items in results:
      for item in items:
         process(item)

Anyway I don't want to sound too single-minded, but I do have the impression that the expressiveness is the bigger problem also because Redis can't ultimately distinguish between "normal" hot-plug tailing and sampling.

Complete delivery fairness would probably make this problem small enough to become almost theoretical, but I feel it's a 90% solution that takes effort and doesn't address the root problem. Maybe an addition to XREAD (e.g. a WITHCURSORS option) would be also good, or a small breaking change for RESP3, since a protocol change has to happen anyway.

To conclude: when I started writing Go, I eventually started wondering what would happen if a channel were to choke all others, and I was very happy to learn that forced uniform selection prevented such cases completely. After reading your breakdown of the problem I now can't unsee how repeated '$' are never what I want when doing simple "hot-plug" tailing, and it takes more mental effort to realize that it's mostly fine, in practice.

I hope I made my argument clear, I'm looking forward to see how this gets resolved.

antirez commented 5 years ago

@kristoff-it what you say makes sense to me in general. Yet I want to find if possible some solution that does not involve changing the protocol. There is to consider that to observe this problem you have to write likely a synthetic benchmark! This is a big thing to take in mind. In the real world it's very hard to find such synchronization between the writer and the reader, I bet that it will be very hard to reproduce this even just if we avoid using localhost as network interface.

However, there are a few things that we could do I think, I'm studying a few funny solutions. Two of the most interesting I found so far are the following:

  1. Add an option that changes the behavior of the command in a very subtle way: if there are mixed "$" and IDs in the arguments, for the streams that are still waiting for the "$" ID consider the current top item as new.
  2. Similar to "1", but what we do with such option, is to return the top entry with the correct ID but the entry itself set to (nil), in order to just return the ID, and at the same time report to the client that such entry should be ignored, and we reported it just for the sake of reporting the ID for the next call. This is similar to what you proposed, but in a form that is more similar to the current interaction.
  3. Quite different solution: when we see a mix of $ and IDs, but we detect that the streams with $ received some data recently (a few milliseconds), if we are about to block, delay the command execution a bit. This is simple to do, it's just a matter of flagging the client in a given way so that the ready keys will not affect such client for a few iteration of the event loop.

Maybe there are better options but for now the above look potentially ok. Solution "3" has the interesting advantage of being totally transparent for the user.

kristoff-it commented 5 years ago

Uhmm interesting ideas.

I think the first one is less preferable than the other two because it introduces the possibility of processing the same entry multiple times. Also in this case I'm referring to simple, not rigorous usage of XREAD. If the user has a script that starts from '$' and that does some processing on 2 streams, they could get in a situation where:

  1. they start the script
  2. it gets some items from STREAM-1
  3. it asks for more items, this time with a mix of ID and '$'
  4. Redis gives out the latest item of STREAM-2 to the script
  5. the script processes the new data (one entry from STREAM-2, maybe also some data from STREAM-1)
  6. the user restarts the script

At that point, if there is no activity on STREAM-2, when a new item gets added to STREAM-1 the user will see the last entry in STREAM-2 processed twice, which is unexpected given the general meaning of '$'.

Option [2] seems less clever in many ways but it shouldn't add any gotcha (assuming we return the last_id in the stream + 1, or 0 for an empty stream). As you mentioned, [2] is close to what it's already in my mind, but I also like it for its explicitness.

[3] seems also to solve the problem completely, and I can't think of a clear use case that would cause problems. It does introduce an arbitrary threshold though.

antirez commented 5 years ago

Yep however solution "3" has a fundamental problem that I'll document better Monday :-(

yossigo commented 5 years ago

Hey @antirez, this issue reminds me of AE_BARRIER issue in a way, as it too involves side effects of low-level stuff on user-level guarantees.

Please correct me if I'm wrong, but I think the real issue here is the protocol so there might not be a solution that is transparent to the user.

The documentation specifies that XREAD $ should block and only return events that have been written after the XREAD command was issued. Ordering of commands received from different connections in the same event loop is totally arbitrary, so technically the XADD guarantee is maintained even with the current behavior.

In theory it may be possible to add more fairness by trying to order commands inside the same event loop, but that's just one private case and the same unfair behavior can still occur if events fire up in a different combination.

In my opinion the correct fix is for XREAD to guarantee that an ID is always returned for every $ stream, as it has several benefits:

  1. Single and multiple streams behavior is the same.
  2. The user gets a "cursor" that begins with a single point in time across all streams, so no events may be lost.
  3. On the server implementation side, the in-event-loop ordering doesn't matter anymore.

Because the current implementation is not technically broken, I guess it's also possible to add this as another command option to maintain backward compatibility.

soloestoy commented 5 years ago

I agree with @yossigo , commands order is arbitrary, then which key to unblock XREAD is random, so there is nothing to change I think.

BTW, I like the refactor in commit a092f20d87f6e435c5b68b8e217ae0d54dd12715, it's more clear to read.

antirez commented 5 years ago

Thanks for the additional background to both @yossigo and @soloestoy. I also agree that semantically everything looks ok, however to see the problem here, and why I think we need a new option that changes the behavior, is the following. Immagine the following pattern, we have two clients, C1 and C2. They never send commands at the same time, C1 send commands in a given second, and C2 the next second, always.

C1 does: XADD A ..., XADD B ... C2 does: XREAD A $ B $

Forever. Once C2 gets some ID, it substitutes such ID with the original $ character. Now, if the first time ever C1 just did XADD A, without adding anything to B, this will enter in the following infinite loop:

  FOREVER
        C1: XADD A ...
        C1: XADD B ...
        C2: XREAD A 1234... B $ => We'll get data only from stream B
  END

So in this example, we'll consume only the stream A forever, regardless of the fact that B has data. This is a pattern that could definitely happen in practice. Sometimes it could be what the user wants, but I bet that most of the times it is not. So I believe we should add an option that, in case we do not block because there is already data in some of the streams, returns the IDs of all the other non-empty streams, using (null) as item, and document why the option is there.

However if we do that, the option should also work in case we block actually... once the calls returns, and this requires enough refactoring/changes that it makes easy at such point to also return multiple streams if they are all ready, a feature that is now missing.

So to recap the changes to do would be:

  1. GETIDS option (or whatever) in the case of blocking and non blocking return.
  2. Change the implementation to make it able to serve multiple streams once we unblock because of a single stream, in case also other streams are ready.

What do you think?

kristoff-it commented 5 years ago

I think it makes perfect sense. The user needs all IDs, and there should be a guarantee that streams always have a non-zero chance to get consumed if data is available, in order to prevent degenerate cases.

I'll leave one last consideration: it's not immediate to explain what GETIDS protects you from exactly. People will skip the whole discussion and just say "it's better if you add it, but you'll probably be fine even if you don't", leaving others even more confused and increasing the impression that things are overwhelmingly inscrutable and complex.

I would advocate for making the most widely expected behavior the default one and reduce the amount of insider knowledge required to use Streams correctly, or otherwise asking users to add superstitious incantations to their code.

yossigo commented 5 years ago

@antirez this makes sense to me. If we assume clients can deal with ID+null response, for me it would even make sense to always return ID+null for any XREAD with multiple $ to avoid the confusion. @kristoff-it is that what you referring to as well?

kristoff-it commented 5 years ago

@yossigo yes!