lovoo / goka

Goka is a compact yet powerful distributed stream processing library for Apache Kafka written in Go.
BSD 3-Clause "New" or "Revised" License

ctx.Value() and View#Get(ctx.Key()) constantly do not match #332

Closed: oss92 closed this issue 3 years ago

oss92 commented 3 years ago

From my understanding, when you run ctx.SetValue(x), x is set in the table and then emitted to Kafka.

That means that when a subsequent message comes in to a goka.Input ProcessCallback, ctx.Value() should already return the latest value emitted. It also means that if I initialize a view like this

view, err := goka.NewView([]string{"localhost:9092"}, goka.GroupTable("some-group"), new(SomeCodec))

and, in the ProcessCallback I call

view.Get(ctx.Key())

the result should be equal to the return of ctx.Value().

I have a case where that does not hold. I have a group with 5 inputs and one Persist, and it is producing all kinds of stale reads.

Any idea why this inconsistency is present? Is it a bug, or am I missing something here?

frairon commented 3 years ago

Hey @oss92, thanks for the question! Your first assumption is correct: after calling ctx.SetValue(x), it is guaranteed that subsequent calls to ctx.Value() will return x, provided the callback is invoked with the same key, of course.

The thing with views, however, is that they are only quasi-up-to-date. Whenever the processor updates the value, it is emitted to Kafka and consumed by the view, as you said. However, that might take some milliseconds, especially as Kafka batches messages, so you cannot assume that the value is updated in the view immediately after writing it (see the sketch below). Maybe you could elaborate on your use case a bit, so I can give you a hint on how to solve it?
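For example, code outside the processor that needs to observe an update has to tolerate this lag, e.g. by polling the view. A rough sketch (waitForView is not part of goka, just an illustration):

import (
    "context"
    "reflect"
    "time"

    "github.com/lovoo/goka"
)

// waitForView polls until the view reflects the expected value: a view only
// sees an update after it has round-tripped through Kafka.
func waitForView(ctx context.Context, view *goka.View, key string, want interface{}) error {
    for {
        got, err := view.Get(key)
        if err != nil {
            return err
        }
        if reflect.DeepEqual(got, want) {
            return nil
        }
        select {
        case <-ctx.Done():
            return ctx.Err()
        case <-time.After(10 * time.Millisecond):
        }
    }
}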

Some general notes:

Maybe those general points already cleared things up a bit. But feel free to share more details on what the topology for your particular problem looks like.

oss92 commented 3 years ago

Thanks @frairon. I will answer the first paragraph before the general notes. When I call view.Get(ctx.Key()), I actually get a more recently updated value than the one I get from ctx.Value(). It happens in a quite nondeterministic way, but I can tell you my assumption about when it happens. Since we have 5 inputs, messages might arrive at several inputs at the same time. In this case, ctx.Value() sometimes seems to have a stale value even though view.Get(ctx.Key()) returns an up-to-date one. I tried a snippet like the following within the callback:

fromCtxValue := ctx.Value()          // value from the processor's local table
fromView, err := view.Get(ctx.Key()) // value from the (eventually consistent) view
if err != nil {
    ctx.Fail(err)
}

// both values decode to the table's codec type (here called State, with an
// UpdatedAt timestamp); errStale is a sentinel error defined elsewhere
cur, _ := fromCtxValue.(*State)
seen, _ := fromView.(*State)
if cur != nil && seen != nil && seen.UpdatedAt > cur.UpdatedAt {
    ctx.Fail(errStale) // the view saw a newer value than the context: stale read
}

And this panics, which means that whatever comes from the view was updated after what came from the context. After the panic, the component recovers and consumes the same message again. You would think that, repeating the code above, fromCtxValue should now hold the newest value, but it still doesn't, which results in a panic-recovery loop.

For now, it feels to me like some race condition, and the code above should mitigate it. But why isn't the value in fromCtxValue updated to the latest value in the view?

The workaround I am thinking of is checking whether the view is recovered and working with the fromView value in case it was updated more recently, but this feels like circumventing the original flow of Goka.


Lastly, thanks for the general notes. Regarding points 1 and 2: they seem useful, but I didn't think I needed to Join with the same key, because ctx.Value() should really have the latest value for that key. Regarding point 3: I will remove the view as per your advice and replace my "workaround" with a Lookup. But as far as the second half of the same point is concerned, we are trying to process the input message together with a message of the same key, not different keys; hence ctx.Value() should work and ctx.Loopback is not needed.


Once again, thank you so much!

frairon commented 3 years ago

Just to be clear, we're talking about one processor with one table, right? In this case we can be sure that ctx.SetValue(x) leads to ctx.Value() == x for the next message using the same key. However, if the messages arrive from different input topics, we cannot make assumptions about their processing order. Kafka only guarantees ordering within one topic+partition.

So as an example to having multiple inputs:


func handler(ctx goka.Context, msg interface{}) {
    log.Printf("%#v", ctx.Value()) // print the current table value
    ctx.SetValue(msg)              // store the incoming message as the new value
}

proc, err := goka.NewProcessor(/*brokers*/,
    goka.DefineGroup(/*group*/,
        goka.Input("input1", codec1, handler),
        goka.Input("input2", codec2, handler),
        goka.Input("input3", codec3, handler),
        goka.Input("input4", codec4, handler),
        goka.Input("input5", codec5, handler),
        goka.Persist(tableCodec), // the table that ctx.Value()/ctx.SetValue() read and write
    ),
)

So we have a processor that simply prints the current table-value and stores the incoming message. If we now send messages 1, 2, 3, 4, 5 with the same key into the respective topics, we will get output: "nil,1,2,3,4" or "nil,4,3,1,2" or "nil,5,4,1,2" or ... you get the point --> we cannot assume an order :)

Here's what's happening under the hood: Goka subscribes to all topics and receives messages in parallel. However, the messages for each partition are sequentialized, so they are read and written synchronously to the table. So you can be sure that what you get from ctx.Value() is the result of a previous ctx.SetValue(..); you just don't know which input message or topic triggered that. So there are logical race conditions, so to speak, but no technical ones. Adding a view as a backup or workaround only adds more uncertainty about how current a value is. If the values appear to be stale, there's a design problem with the processor (or a very fundamental bug in goka, which I do hope isn't the case :D)
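To make that concrete, here is a toy model of the sequentialization (not goka code, everything simplified): all topics feed one channel per partition, and a single consumer applies the updates, so table access is ordered even though the topic interleaving is not.

package main

import "fmt"

type update struct{ topic, key, value string }

func main() {
    // all input topics funnel into one channel per partition
    partition := make(chan update, 16)
    partition <- update{"input1", "x", "a"}
    partition <- update{"input2", "x", "b"}
    close(partition)

    table := map[string]string{}
    // a single consumer applies updates in channel order: ctx.Value() always
    // sees the previous ctx.SetValue(), but which topic wrote it is undefined
    for u := range partition {
        fmt.Printf("topic=%s sees old=%q, writes %q\n", u.topic, table[u.key], u.value)
        table[u.key] = u.value
    }
}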

oss92 commented 3 years ago

@frairon Thanks for the clarification, but I definitely understand that I cannot assume order with Kafka. What I am observing is that the messages for each partition are NOT sequentialized properly.

I am merging 5 topics, so let's call them one, two, three, four and five, and assume they produce the following content in the table topic:

{
    "received_one": false,
    "received_two": false,
    "received_three": false,
    "received_four": false,
    "received_five": false
}

I receive the first message from two, so I expect received_two to be set to true. Then I receive the second message from one; now I should expect both received_two and received_one to be set to true. But that is not the case: ctx.Value() returns nil, even after the message from two has finished SetValue with:

{
    "received_one": false,
    "received_two": true,
    "received_three": false,
    "received_four": false,
    "received_five": false
}

Then a message from three arrives. The ctx.Value() returns

{
    "received_one": true,
    "received_two": false,
    "received_three": false,
    "received_four": false,
    "received_five": false
}

Which is insane because it has written what came from one but not what came from two!

Then a message from four arrives. The ctx.Value() returns

{
    "received_one": false,
    "received_two": true,
    "received_three": false,
    "received_four": false,
    "received_five": false
}

Which is again mind-blowing, because this one just saw the updates done by two.

Then a message from five will come in and see one of them, for example:

{
    "received_one": false,
    "received_two": true,
    "received_three": false,
    "received_four": true,
    "received_five": false
}

And so on. It seems like consistently, some Input topics see exclusively updates of the other topics only if initially or later on the ctx.Value() returned nil.

We have been using goka since version 0.4.0 and only noticed this behavior recently, with this processor that merges 5 input topics into one table topic.

Can you point me to where in goka's code the messages of each partition are sequentialized? I would like to dive in and debug that part. I have familiarized myself with other parts of the code but can't quite figure out where this happens.

frairon commented 3 years ago

Hey @oss92, you got me a bit scared there for a second that maybe the whole system isn't working anymore. So I wrote a little system test here: https://github.com/lovoo/goka/blob/332-systemtest/systemtest/multitopic_test.go

Run it with go test -v github.com/lovoo/goka/systemtest -run TestMultiTopics -args -systemtest

It starts a processor consuming from 5 input topics; then 5 emitters emit different values in random order, accumulating the table value and verifying that the end result is as expected. Maybe you can have a look whether that is similar to your scenario and find out why it's giving you inconsistent results?

What you're experiencing looks quite strange, maybe we can set up a runnable test similar to the system test that reproduces the bug?

For the question about where the partition messages are sequentialized: goka uses sarama's ConsumerGroup, i.e. the processor implements the ConsumerGroupHandler interface. What happens on each rebalance (see the sarama docs for details) is, roughly: Setup is called once for the new session, sarama then starts one ConsumeClaim goroutine per assigned topic/partition, and within each claim the messages are delivered and processed strictly in order.
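For reference, sarama's ConsumerGroupHandler interface looks like this (the comments summarize sarama's documented behavior):

type ConsumerGroupHandler interface {
    // Setup runs at the beginning of a new session, before ConsumeClaim.
    Setup(ConsumerGroupSession) error
    // Cleanup runs at the end of a session, once all ConsumeClaim goroutines have exited.
    Cleanup(ConsumerGroupSession) error
    // ConsumeClaim is started in its own goroutine for every assigned
    // topic/partition and receives that partition's messages strictly in order.
    ConsumeClaim(ConsumerGroupSession, ConsumerGroupClaim) error
}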

Hope that helps; it would be nice if we could reproduce the behavior in a simple test.

frairon commented 3 years ago

@oss92, just one more thing you could check. It's crucial that messages with the same key consistently end up in the same partition. If you don't have control over the emitter and cannot rely on that, you have to "reroute" the event via Loopback before using it to modify the state.

If a processor receives the same key in different partitions, two different goroutines will process the events, and each has its own state. If you then use a view to get the value for a key, it will first determine the partition that should hold that value and then query that partition. But this might not be the partition where the input event originally arrived, so the state is different.
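Conceptually, the view's key lookup is something like this (a sketch, not goka's actual code):

import "hash"

// lookupPartition sketches how a view picks the partition to query: hash the
// key and take it modulo the partition count. If the producer used a different
// hasher, this points at the wrong partition's state.
func lookupPartition(newHasher func() hash.Hash32, key string, partitions int32) int32 {
    h := newHasher()
    h.Write([]byte(key))
    p := int32(h.Sum32()) % partitions
    if p < 0 {
        p = -p
    }
    return p
}

// goka's default corresponds to something like lookupPartition(fnv.New32a, "user-1", 10)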

oss92 commented 3 years ago

@frairon The messages of the same key do end up in the same partition within a single topic. We use Kafka's default partitioning with no customization.

I have parked this for a while, because I actually went with reading both from the view and from ctx.Value() as a workaround, failing the run if the view has a newer value (which happens!!!! -- crazy).

I need to dive into Goka again and figure out why is this happening. Thanks for writing the test btw, it can really come in handy and sorry for not being so responsive the past couple days on the issue.

I will write an update here once I finish my investigation.

frairon commented 3 years ago

No worries, take your time. I'm just thinking it's either a design error or something about the partitioning. It should be easy and reliable, with no need for workarounds, because there really isn't any magic going on here :)

Using Kafka's default partitioner is probably the actual root of the problem though! In goka, we use this hasher to emit, and the same hasher here to figure out the partition to read from in a view. Kafka however uses murmur2, as stated here. Now consider an incoming key X, sent to partition-0 by murmur2 hashing. In the callback, ctx.Value() will read what was last written for that key to partition-0. However, if we call ctx.SetValue(), the value will be stored in, let's assume, partition-1, determined by goka's hasher. Then on the next message with key X (which will again be sent to partition-0), the next call to ctx.Value() will not return the expected value. The value read from a view will also be different, because the view reads from the partition where goka assumes the value is stored.
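To see the mismatch in isolation, here is a standalone comparison of the two hashing schemes (the murmur2 function follows the Java client's published algorithm; the fnv path is a simplified version of sarama's default partitioner, not code from either library):

package main

import (
    "fmt"
    "hash/fnv"
)

// murmur2 re-implements the hash used by the Java Kafka producer.
func murmur2(data []byte) int32 {
    const m uint32 = 0x5bd1e995
    h := uint32(0x9747b28c) ^ uint32(len(data))
    i := 0
    for ; len(data)-i >= 4; i += 4 {
        k := uint32(data[i]) | uint32(data[i+1])<<8 |
            uint32(data[i+2])<<16 | uint32(data[i+3])<<24
        k *= m
        k ^= k >> 24
        k *= m
        h = h*m ^ k
    }
    switch len(data) - i {
    case 3:
        h ^= uint32(data[i+2]) << 16
        fallthrough
    case 2:
        h ^= uint32(data[i+1]) << 8
        fallthrough
    case 1:
        h ^= uint32(data[i])
        h *= m
    }
    h ^= h >> 13
    h *= m
    h ^= h >> 15
    return int32(h)
}

func main() {
    key, partitions := "user-1", int32(10)

    // Java client: positive murmur2 hash modulo the partition count.
    javaPartition := (murmur2([]byte(key)) & 0x7fffffff) % partitions

    // goka/sarama default: fnv-1a 32-bit hash modulo the partition count
    // (simplified; sarama flips a negative int32 result to positive).
    f := fnv.New32a()
    f.Write([]byte(key))
    gokaPartition := int32(f.Sum32()) % partitions
    if gokaPartition < 0 {
        gokaPartition = -gokaPartition
    }

    fmt.Printf("key %q: murmur2 partition %d, fnv32a partition %d\n",
        key, javaPartition, gokaPartition)
}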

Actually, that makes me think we should emit a warning (or even fail the processor?) if the consume callback calls ctx.SetValue() for a message whose outgoing partition wouldn't match the incoming partition. Then this problem would be obvious right away.

Ok, anyway, here's a simple solution using loopback. This only works if both the callbacks and the codecs are the same for all input topics. If not, you have to pack the message into a new type for the loopback, so the callback can later figure out where the message was originally sent (see the sketch after the code below).

Suppose this is the current group graph.

current := goka.DefineGroup(..,
    goka.Input("input-a", new(Codec), callback),
    goka.Input("input-b", new(Codec), callback),
    goka.Persist(new(PersistCodec)),
)

If we now add a loop edge, it will "fix" the partition by emitting the value once again to Kafka:

repartition := func(ctx goka.Context, msg interface{}) {
    ctx.Loopback(ctx.Key(), msg)
}
current = goka.DefineGroup(..,
    // uses the byte-codec because we don't care about the value, no need to decode
    goka.Input("input-a", new(codec.Bytes), repartition),
    goka.Input("input-b", new(codec.Bytes), repartition),
    goka.Loop(new(Codec), callback), // actual handling of the input events
    goka.Persist(new(PersistCodec)),
)

(haven't compiled the code, hope it works).
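For completeness, a rough sketch of the "pack into a new type" variant mentioned above (Envelope and EnvelopeCodec are made-up names, not goka API):

// Envelope wraps the raw event with its origin topic, so the loop callback
// can pick the right codec to decode Payload.
type Envelope struct {
    Origin  string
    Payload []byte
}

repartition := func(ctx goka.Context, msg interface{}) {
    ctx.Loopback(ctx.Key(), &Envelope{
        Origin:  string(ctx.Topic()),
        Payload: msg.([]byte), // inputs use codec.Bytes, so msg is []byte
    })
}

callback := func(ctx goka.Context, msg interface{}) {
    env := msg.(*Envelope)
    switch env.Origin {
    // decode env.Payload with the codec matching its origin topic,
    // then update the table via ctx.SetValue(...)
    }
}

The loop edge would then be declared as goka.Loop(new(EnvelopeCodec), callback), with EnvelopeCodec encoding the struct however you like (JSON, gob, ...).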

Sorry for the long text. Good luck :)

frairon commented 3 years ago

@oss92 Is the issue actually fixed for you now?

oss92 commented 3 years ago

@frairon, not yet actually. We paused it for a while because we could work around it but now we have opened it again starting last week. I will update you if we discover anything new.

ice-ares commented 3 years ago

@frairon How can we synchronise the partition assignment hashing algorithms for both goka and the Kafka Broker?

frairon commented 3 years ago

Fabian probably isn't the right guy to ask here :D

Anyway, Kafka itself doesn't determine the partition; it's the producer that defines it. If you are in doubt whether the partition is correct, you can always use the loopback, e.g. like this:

goka.NewProcessor(...,
    goka.DefineGroup(...,
        goka.Input("input-with-wrong-partition", someCodec, func(ctx goka.Context, msg interface{}) {
            // reroute: emitting again lets goka's own hasher pick the partition
            ctx.Loopback(ctx.Key(), msg)
        }),
        goka.Loop(someCodec, func(ctx goka.Context, msg interface{}) {
            // at this point you can be sure that msg is in the correct partition according to its key
        }),
    ),
)

That however is cumbersome if you have multiple input topics with different codecs, because they are all channeled into the same loopback topic, where you have to untangle the different types again (also, you can only have one codec here). If you do have multiple types and input topics, you can also create a proxy-processor, where each input topic is forwarded with ctx.Emit(..) to a "correct-partition" output topic, which the other processor then consumes from.
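A rough sketch of such a proxy (the topic and group names are made up):

proxy := goka.DefineGroup("repartition-proxy",
    // forward every event unchanged; the downstream processor then receives
    // it in the partition determined by goka's own hasher
    goka.Input("input-with-wrong-partition", new(codec.Bytes), func(ctx goka.Context, msg interface{}) {
        ctx.Emit("input-correct-partition", ctx.Key(), msg)
    }),
    goka.Output("input-correct-partition", new(codec.Bytes)),
)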

Hope that answers your question.

ice-ares commented 3 years ago

@frairon Right, sorry 😄

What I meant to ask was: how do we configure goka's producer to use a specific/customizable hashing algorithm, so that if we use any other Kafka producer library (other than goka) to produce messages to the broker, we can expect the same partition assignment for all of them?

frairon commented 3 years ago

Ah, now I get it. Goka uses the default hasher here (so an fnv32 hash). As far as I know, Kafka Streams e.g. uses murmur2, like here.

So I guess do whatever is easiest for you: either set the default hasher in goka (via this option) to match the producer's hasher, or try to get fnv32 into your producer. Or set both, just to be sure; they just have to be in sync. Note though that once the processor is running, there's no safe way to change the partitioner, because it would mess up your state completely.
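Wired up, that could look like the following sketch (newMurmur2 stands for a murmur2-compatible func() hash.Hash32 you would have to provide; brokers, graph, table, topic and the codecs are placeholders; goka exposes per-component hasher options such as WithHasher, WithViewHasher and WithEmitterHasher):

// keep processors, views and emitters on the same hasher as all other producers
proc, err := goka.NewProcessor(brokers, graph,
    goka.WithHasher(newMurmur2),
)
view, err := goka.NewView(brokers, table, tableCodec,
    goka.WithViewHasher(newMurmur2),
)
emitter, err := goka.NewEmitter(brokers, topic, topicCodec,
    goka.WithEmitterHasher(newMurmur2),
)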

EvgeniaMartynova-thebeat commented 3 years ago

That however is cumbersome if you have multiple input topics with different codecs, because they are all channeled into the same loopback topic, where you have to untangle the different types again (also, you can only have one codec here). If you do have multiple types and input topics, you can also create a proxy-processor, where each input topic is forwarded with ctx.Emit(..) to a "correct-partition" output topic, which the other processor then consumes from.

@frairon Based on this suggestion, is the following correct?

thanks a lot in advance

frairon commented 3 years ago

sorry for the long delay @EvgeniaMartynova-thebeat. Your assumptions are right though.

EvgeniaMartynova-thebeat commented 3 years ago

Thanks a lot, @frairon. We have implemented the topic repartitioning per your suggestion here: https://github.com/lovoo/goka/issues/332#issuecomment-934375365. However, I still cannot get my mind around a few things.

Assuming we had the following setup:


- Emitter A with an FNV32a hasher (the default in Sarama and Goka).
- Emitter B with a CRC32 hasher (the default in the Kafka lib for PHP).
- 2 partitions.
- A Goka processor listening to topics A and B and aggregating messages from these two topics into one group table.


Let's assume a message with the key "user-1" from topic A is emitted and arrives at partition 1; it is picked up by Goka partition processor 1. Partition processor 1, on a call to ctx.SetValue(), stores the aggregation in its local storage and sends it to Kafka.

Then a message with the key "user-1" from topic B is emitted and arrives at partition 2 (because of emitter B's different hasher); it is picked up by Goka partition processor 2. Partition processor 2 does not have anything in its local storage for the message key, because all aggregations for that key are stored in the local storage of partition processor 1. It then stores the aggregated message under partition 1 again, overriding the previous aggregation => this is where things went wrong, and that is why we needed the messages to arrive at the same partition and be picked up by the same partition processor.

Is my thought process correct? Does each partition processor have its own local storage? Does each partition store only the aggregated messages for the keys that correspond to that partition? Is the local storage of a partition processor different from the storage of a view, or do they use the same storage?

db7 commented 3 years ago

Is my thought process correct?

This seems to correctly describe the problem.

Does each partition processor has its own local storage?

The "local storage" is a sort of cache of a table topic in Kafka. Each partition processor has such a cache for its partition. If you use multiple hashers, the different partitions will not be reading from the same expected "cached version" and things will go wild.

Does each partition stores only aggregated messages for the keys which correspond to the partition?

I don't know what you mean by aggregated, but the local storage keeps the latest value for each key. The same happens (in the long run) with the Kafka topic, given the right compaction policy.

Is local storage of partition processor different from a storage of a view or they utilize the same storage?

The implementation is the same, but again, the storage on disk is just a "cache" of what is in Kafka. And if you have a processor and a view colocated on the same machine, they will use distinct local storages to keep track of the data in the table topic in Kafka.


Regarding hashers, the most elegant approach would be to change the hasher in the PHP library. I believe the murmur2 hasher used in the Java libraries and the fnv32a used in Sarama should have a lower collision probability than trivial CRC32 hashing. To see whether that is a problem, you can fetch the keys from Kafka and run the different hash algorithms on them to see how they bucketize into the number of partitions you chose.
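For example, a quick standalone check could look like this (simplified: the real partitioners also handle the sign differently, but for eyeballing the distribution this is enough):

package main

import (
    "fmt"
    "hash/crc32"
    "hash/fnv"
)

func main() {
    // replace with real keys fetched from Kafka
    keys := []string{"user-1", "user-2", "user-3"}
    const partitions = 10

    for _, key := range keys {
        f := fnv.New32a()
        f.Write([]byte(key))
        pFnv := f.Sum32() % partitions // sarama/goka style

        pCrc := crc32.ChecksumIEEE([]byte(key)) % partitions // PHP lib style

        fmt.Printf("%-10s fnv32a -> partition %d, crc32 -> partition %d\n", key, pFnv, pCrc)
    }
}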

Side note: if you are really only using 2 partitions, I think this isn't the best configuration, since each partition is processed by a single thread to guarantee sequential execution. Something like 10-20 partitions is usually preferable. You can still use 1 or 2 processor instances; they will split the partitions evenly.

EvgeniaMartynova-thebeat commented 3 years ago

Thanks a lot, @db7, for your quick reply. The example I gave is a demo; in reality we have a much more complicated setup with many emitters using different hashers and a bigger number of partitions. The only thing that is guaranteed is that the number of partitions per topic is the same. Unfortunately we don't have control over the emitters, so we cannot apply the elegant solution you suggested of changing the emitter hashers; that is why we have to stick with this solution: https://github.com/lovoo/goka/issues/332.

oss92 commented 3 years ago

Using Kafka's default partitioner is probably the actual root of the problem though! [...]

The issue was exactly around the hashers, and repartitioning the input topics so they all use the same hasher has solved the problem. Thanks a lot, guys, for your support. I will close this issue, hoping that this thread helps others who face the same problem.