LGouellec / kafka-streams-dotnet

.NET Stream Processing Library for Apache Kafka 🚀
https://lgouellec.github.io/kafka-streams-dotnet/
MIT License

Consuming messages issue #229

Closed ognenbek closed 1 year ago

ognenbek commented 1 year ago

Description

Hi, I'm using Streamiz version 1.4.0 on a 3-node cluster. The application joins two topics whose messages share the same key; however, I noticed that when Streamiz consumes the messages, some of them are left out. The issue occurs frequently: in some cases everything is fine, but in others not all of the messages are consumed.

How to reproduce

I will provide screenshots of the consumer groups and their offsets, which show how many messages Streamiz should consume, and a log of the application showing how many messages it actually consumed.

Consumer group offsets before the messages arrive and after they arrive (the messages are not yet consumed, so they appear in the LAG column):

(screenshot: state-1)

Streamiz log showing that it processed 3 messages, where it should have processed 10:

(screenshots: consume-1, total-records-1)

Additional Issue

Another issue that I noticed, which may be related: the kafka command shows that the messages should be consumed by the Streamiz application with id "fabdcb33-cec0-4d36-a2c0-e9d630e8b2c5", however the messages were consumed by another application with id 89283856-1841-41bc-8abe-2be5cd24933b.

Consumer group offsets and consumer ids: state-2

Streamiz log from the other app: consume-2

I tried reproducing the issue locally with one broker, but everything worked as expected. I don't know if it is an issue with the library or some misconfiguration. Any additional info or help would be appreciated.

LGouellec commented 1 year ago

Hi @ognenbek ,

Can you enable DEBUG Streamiz logs and attach them when the problem occurs, please?

Kr,

ognenbek commented 1 year ago

Hey @LGouellec, I'm attaching the logs to this comment. Thank you again for the support.

kafka-streams-issue-log.txt

LGouellec commented 1 year ago

Hi @ognenbek ,

To give me more context, can you share your topology and configuration, please?

Best regards,

ognenbek commented 1 year ago

Hi @LGouellec,

Here is the code of our Streaming service: StreamingService.txt

Because we needed the Range partition assignment strategy, we created our own implementation of the StreamingConfig file, with the same content as the one from Streamiz, except that we updated the PartitionAssignmentStrategy field:

PartitionAssignmentStrategy = Confluent.Kafka.PartitionAssignmentStrategy.Range;

The whole Streaming config code: StreamingConfig.txt
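
For reference, the underlying Confluent.Kafka knob being changed here is the consumer's partition assignment strategy; a minimal, self-contained illustration (broker address and group id are placeholders, and the actual Streamiz wiring is in the attached StreamingConfig.txt):

using Confluent.Kafka;

// Illustration only: the consumer-level setting that the custom StreamingConfig applies.
var consumerConfig = new ConsumerConfig
{
    BootstrapServers = "localhost:9092",                              // placeholder
    GroupId = "streams-join-app",                                     // placeholder
    PartitionAssignmentStrategy = PartitionAssignmentStrategy.Range   // range assignor
};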

LGouellec commented 1 year ago

@ognenbek , Thanks for the input. I tried to reproduce your issue with a local instance and a ForeachAsync processor, and I have no issue: both source topics are committed and the -request topic is continuously read. Can you check the lag for the external client with:

kafka-consumer-groups --bootstrap-server XXXXX --describe --group streams-join-app-external

Just so you know, here is my reproducer.

internal class Program
{
    public class Ticket
    {
        public string id { get; set; }
    }

    public class TicketDetails
    {
        public string id { get; set; }
        public string user { get; set; }
    }

    public class Progression
    {
        public string id { get; set; }
        public string user { get; set; }
    }

    private static readonly TimeSpan WindowTimespan = TimeSpan.FromDays(4);
    private static readonly TimeSpan WindowSize = TimeSpan.FromDays(4);
    private static readonly TimeSpan JoinWindowSize = TimeSpan.FromDays(2);

    internal class JoinValueMapper : IValueJoiner<Ticket, TicketDetails, Progression>
    {
        public Progression Apply(Ticket ticket1Value, TicketDetails ticket2Value)
        {
            return new Progression
            {
                id = ticket1Value.id,
                user = ticket2Value.user
            };
        }
    }

    public static async Task Main(string[] args)
    {
        IWindowBytesStoreSupplier CreateStoreSupplier(string storeName)
        {
            return Stores.PersistentWindowStore(storeName, WindowTimespan,
                WindowSize, (long)(2 * WindowTimespan.TotalMilliseconds));
        }

        var config = new StreamConfig<StringSerDes, StringSerDes>();
        config.ApplicationId = "test-app-reproducer";
        config.BootstrapServers = "localhost:9092";
        config.AutoOffsetReset = AutoOffsetReset.Earliest;
        config.CommitIntervalMs = 3000;
        config.StateDir = Path.Combine(Path.GetTempPath(), Guid.NewGuid().ToString());
        config.Logger = LoggerFactory.Create((b) =>
        {
            b.SetMinimumLevel(LogLevel.Debug);
            b.AddLog4Net();
        });

        StreamBuilder builder = new StreamBuilder();

        var ticket1Stream = builder
            .Stream<string, Ticket, StringSerDes, JsonSerDes<Ticket>>("tickets");
        var lotteryTicketDetailsStream = builder
            .Stream<string, TicketDetails, StringSerDes, JsonSerDes<TicketDetails>>("tickets-details");

        var topic1supplier = CreateStoreSupplier("tickets-store");
        var topic2Supplier = CreateStoreSupplier("tickets-details-store");

        var joinValueMapper = new JoinValueMapper();
        var joinWindowOptions = JoinWindowOptions.Of(JoinWindowSize);
        var joinProps =
            StreamJoinProps.With<string, Ticket, TicketDetails>(topic1supplier,
                topic2Supplier);

        var client = new MongoClient(
            "mongodb://admin:admin@localhost:27017"
        );
        var database = client.GetDatabase("streamiz");

        ticket1Stream.Join(lotteryTicketDetailsStream, joinValueMapper, joinWindowOptions, joinProps)
            .ForeachAsync(async (message, _) =>
                {
                    var joinValue = message.Value;
                    await database
                        .GetCollection<Progression>("progress")
                        .InsertOneAsync(joinValue);
                }, null,
                new RequestSerDes<string, Progression>(new StringSerDes(),
                    new JsonSerDes<Progression>()));

        Topology t = builder.Build();
        KafkaStream stream1 = new KafkaStream(t, config);

        Console.CancelKeyPress += (_, _) => stream1.Dispose();

        await stream1.StartAsync();
    }
}

ognenbek commented 1 year ago

Hey @LGouellec, thanks for the response. I haven't mentioned it, but the messages are streamed within a very short period of time, milliseconds apart. I ran the command you sent me and noticed that, when I have the issue, streams-join-app-external shows fewer messages in the topic than streams-join-app. I will provide the screenshots below:

streams-join-app log:

(screenshot: topic-issue)

streams-join-app-external log:

(screenshot: external-topic-issue)

Streamiz log:

(screenshot: issue-service-log)

Thanks for the support

LGouellec commented 1 year ago

Hi @ognenbek ,

That's normal, because the -request topic is purged automatically by the Streamiz external thread. It's like a repartition topic, so you must not rely on the number of messages in the -request topic.

By default the commit process occurs every 30 seconds per thread (so in your case you have two threads: one for the normal topology and another one for the external calls), but the commit is not called at the same time for both.

This is why, when you use async processors, you get an at-least-once guarantee.
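
For reference, the commit interval mentioned above is configurable; a minimal sketch using the same StreamConfig API as the reproducer earlier in this thread (application id and brokers are placeholders):

// A shorter commit interval narrows the window during which records are already
// processed but their offsets are not yet committed; with async processors the
// guarantee is still at-least-once, as explained above.
var config = new StreamConfig<StringSerDes, StringSerDes>();
config.ApplicationId = "streams-join-app";      // placeholder
config.BootstrapServers = "localhost:9092";     // placeholder
config.CommitIntervalMs = 5000;                 // commit every 5 s instead of the default 30 s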

ognenbek commented 1 year ago

Hey @LGouellec, so this means that the -request topics and the regular ones are not expected to have the same number of messages. However, do you have any additional information on why Streamiz is not reading all of the messages? In the example above there was 1 message sent to topic1 and 12 messages sent to topic2, but in the end only 3 messages were joined. I checked the messages: they all have the same key and, as shown in the pictures, they all go to the same partition.

LGouellec commented 1 year ago

@ognenbek So this means that the -request topics and the regular ones are not expected to have the same number of messages. -> True

However, do you have any additional information on why Streamiz is not reading all of the messages? In the example above there was 1 message sent to topic1 and 12 messages sent to topic2, but in the end only 3 messages were joined. I checked the messages: they all have the same key and, as shown in the pictures, they all go to the same partition. -> Ok, that sounds weird. Did all messages arrive within the join time window? For example, say you join two topics with a 5-minute window. If topic1 has a message at t0 and topic2 has a message at t0 + 6 min, the join doesn't match and you will have 0 messages in the downstream topic. By default, Streamiz uses the timestamp of the message to decide whether the message is too old or not. So to summarize, in my example the join only produces output if the two topics have messages with the same key whose timestamps are within 5 minutes of each other.
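
To make the window check concrete, a minimal sketch reusing the streams, value joiner, and join properties from the reproducer earlier in this thread, but with a 5-minute join window (the Foreach sink is purely illustrative):

// Two records only join if they share a key AND their timestamps are at most
// 5 minutes apart; a record arriving more than 5 minutes before or after its
// counterpart produces no output at all.
var fiveMinuteWindow = JoinWindowOptions.Of(TimeSpan.FromMinutes(5));

ticket1Stream
    .Join(lotteryTicketDetailsStream, joinValueMapper, fiveMinuteWindow, joinProps)
    .Foreach((key, progression) => Console.WriteLine($"Joined ticket {key}"));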

ognenbek commented 1 year ago

Hey @LGouellec, the messages arrive only a couple of milliseconds apart. I have the following configuration for the window sizes:

private static readonly TimeSpan WindowTimespan = TimeSpan.FromDays(4);
private static readonly TimeSpan WindowSize = TimeSpan.FromDays(4);
private static readonly TimeSpan JoinWindowSize = TimeSpan.FromDays(2); 

I also checked the messages in the topics and the keys in topic0 and topic1 match. And from the pictures of the consumer groups I can see that the messages are arriving and that the consumer has a lag of X messages to consume, but in the end only some of those X messages are joined.

LGouellec commented 1 year ago

@ognenbek ,

The lag is updated when the consumer commits the offsets; it is not updated in real time, at the same moment the consumer polls and processes the data. So let's suppose you clear all your topics and produce messages into both topics. If you start your Streamiz application from scratch (you can change the application ID to be sure the Streamiz app reads the topics from the beginning, and set AutoOffsetReset to Earliest), what is the result in the sink topic?
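
A minimal sketch of the suggested reset, using the same config properties as the reproducer earlier in this thread (the new application id is hypothetical):

// A brand-new application id means a new consumer group with no committed offsets,
// and AutoOffsetReset.Earliest then makes the app read both input topics from the
// beginning.
var config = new StreamConfig<StringSerDes, StringSerDes>();
config.ApplicationId = "streams-join-app-v2";       // hypothetical new id
config.BootstrapServers = "localhost:9092";         // placeholder
config.AutoOffsetReset = AutoOffsetReset.Earliest;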

ognenbek commented 1 year ago

Hey @LGouellec,

I have deleted the two topics and recreated them, and they were empty after deletion. I also set a new ID for the Streamiz application and set AutoOffsetReset to Earliest. When I restarted the three instances, at the beginning everything was OK: I was streaming a couple of messages into the first topic and 10x the amount into the second one, and they were properly joined. However, after the fourth try I saw that Streamiz joined 5 messages instead of the 10 which came in the second topic, so the issue occurred again. Another thing to mention is that I am not sending the joined values to a sink topic; instead I'm calling a service inside the application.

LGouellec commented 1 year ago

@ognenbek

What happens if you replace the ForeachAsync processor with a sink processor writing to a Kafka topic?

Best regards
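
For illustration, a sketch of what that change could look like, continuing from the reproducer's topology; the output topic name is hypothetical and the generic To<KeySerDes, ValueSerDes>(topic) overload is assumed:

// Instead of handing each join result to an async callback, write it to a Kafka
// topic so the joined records can be inspected directly.
ticket1Stream
    .Join(lotteryTicketDetailsStream, joinValueMapper, joinWindowOptions, joinProps)
    .To<StringSerDes, JsonSerDes<Progression>>("tickets-joined");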

ognenbek commented 1 year ago

Hey @LGouellec, I have just tested it, replacing the ForeachAsync and sending the data into a topic, but I noticed the same issue: some of the messages were not joined and I had only 3 messages in the output topic instead of 10. I checked the input topic and all the messages came with the same key to the same partition, so it seems like the issue is in the joining part.

LGouellec commented 1 year ago

Ok,

I will check your issue ASAP

LGouellec commented 1 year ago

Hi @ognenbek,

I published a "hot-fix" branch (https://github.com/LGouellec/kafka-streams-dotnet/tree/reproducer-229) in my repository and it seems the issue is fixed. Could you clone this branch and test it with your project, to make sure that the join now works 100% of the time?

Thanks,

ognenbek commented 1 year ago

Hey @LGouellec, I have tested the fix and still have the same issue; I also tried writing to a new topic after the join. However, I noticed something which may point to the actual issue. I added a log on the two streams, like you do in the tests (lotteryTicketsStream.Print()), and observed the data. Let me first explain the flow. We have two DB tables, Ticket and TicketDetails, to which we have attached Debezium connectors that write to the respective topics. For each Ticket we can have multiple TicketDetails, which are stored at almost the same time.

When watching the logs I noticed that there are two scenarios in the ordering of the two types of messages. In the first scenario, the message arrives in the Tickets topic first and then all the others in TicketDetails. In the second scenario, all the messages in TicketDetails arrive first and then the one in the Tickets topic. The issue happens in the second scenario. I will give more information with examples.

Stream log, first scenario

Metadata:Tickets|2|6|1676367827268
Metadata:TicketDetails|2|54|1676367827399 (joined)
Metadata:TicketDetails|2|55|1676367827401 (joined)
Metadata:TicketDetails|2|56|1676367827401 (joined)
Metadata:TicketDetails|2|57|1676367827401 (joined)
Metadata:TicketDetails|2|58|1676367827402 (joined)
Metadata:TicketDetails|2|59|1676367827402 (joined)
Metadata:TicketDetails|2|60|1676367827402 (joined)
Metadata:TicketDetails|2|61|1676367827402 (joined)
Metadata:TicketDetails|2|62|1676367827403 (joined)
Metadata:TicketDetails|2|63|1676367827403 (joined)

In this example all 10 messages from TicketDetails are joined with the Ticket message and there is no issue.

Stream log, second scenario (issue)

Metadata:TicketDetails|1|80|1676367883532 (not joined)
Metadata:TicketDetails|1|81|1676367883532 (joined)
Metadata:TicketDetails|1|82|1676367883533 (not joined)
Metadata:TicketDetails|1|83|1676367883533 (not joined)
Metadata:TicketDetails|1|84|1676367883533 (not joined)
Metadata:TicketDetails|1|85|1676367883533 (not joined)
Metadata:TicketDetails|1|85|1676367883533 (not joined)
Metadata:TicketDetails|1|87|1676367883533 (not joined)
Metadata:TicketDetails|1|88|1676367883533 (not joined)
Metadata:TicketDetails|1|89|1676367883533 (joined)
Metadata:Tickets|1|10|1676367883881

In this example only 2 messages from the TicketDetails topic were joined with the Ticket message. The messages that were joined were the last messages with a given timestamp, in this example Metadata:TicketDetails|1|81|1676367883532 and Metadata:TicketDetails|1|89|1676367883533. So all the previous messages with the same timestamp (e.g. 1676367883533) were not joined and were skipped. I was testing this by joining and then sending the data to a new topic. I made dozens of tests and had the same outcome.

LGouellec commented 1 year ago

Hey @ognenbek ,

I understand your issue. You have a 1-N relation (1 ticket - N ticket details), so when the trigger of the join is the ticket details it works, because there can only be a single ticket per key + timestamp.

When the trigger of the join is the ticket (scenario 2), you can have multiple ticket details with the same key + timestamp. Example:

Metadata:TicketDetails|1|80|1676367883532 (not joined) SAME MESSAGE AS 1
Metadata:TicketDetails|1|81|1676367883532 (joined) Message 1
Metadata:TicketDetails|1|82|1676367883533 (not joined) SAME MESSAGE AS 2
Metadata:TicketDetails|1|83|1676367883533 (not joined) SAME MESSAGE AS 2
Metadata:TicketDetails|1|84|1676367883533 (not joined) SAME MESSAGE AS 2
Metadata:TicketDetails|1|85|1676367883533 (not joined) SAME MESSAGE AS 2
Metadata:TicketDetails|1|85|1676367883533 (not joined) SAME MESSAGE AS 2
Metadata:TicketDetails|1|87|1676367883533 (not joined) SAME MESSAGE AS 2
Metadata:TicketDetails|1|88|1676367883533 (not joined) SAME MESSAGE AS 2
Metadata:TicketDetails|1|89|1676367883533 (joined) Message 2
Metadata:Tickets|1|10|1676367883881 -> This message triggers the join, because there are messages on both sides, but from the join's point of view there are only two distinct ticket-details messages based on key + timestamp.

So the fix is to add a sequence number at the end of each message, like Kafka Streams (Java) does.
I have already created GH issue #190 to track this feature, and it will be delivered in 1.5.
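
To illustrate the idea, a conceptual sketch only (not the actual Streamiz or Kafka Streams internals): if the window-store key carries a sequence number in addition to the record key and timestamp, two ticket-details records that share both key and timestamp remain two distinct store entries instead of overwriting each other.

// Conceptual sketch: a window-store key that disambiguates records sharing the
// same key and timestamp by appending a per-store sequence number.
public readonly record struct SequencedWindowKey(string Key, long TimestampMs, int SeqNum);

public class SequencedKeyFactory
{
    private int seqNum;

    // Every stored record gets a fresh sequence number, so duplicates on
    // (key, timestamp) no longer collapse into a single entry.
    public SequencedWindowKey Next(string key, long timestampMs)
        => new(key, timestampMs, seqNum++);
}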

LGouellec commented 1 year ago

I added a unit test which reproduces your issue: https://github.com/LGouellec/kafka-streams-dotnet/blob/71cb55005c9b1526a967f49363495805d73f137b/test/Streamiz.Kafka.Net.Tests/Reproducer229Tests.cs#L102

ognenbek commented 1 year ago

Hey @LGouellec, thanks again for your support and your dedication :) We found a workaround which will suit us while waiting for the fix. We created a custom TimestampExtractor, implementing the ITimestampExtractor interface, in which we return the current wall-clock time instead of the timestamp from the message.

public long Extract(ConsumeResult<object, object> record, long partitionTime)
{
    if (record.Message.Timestamp.UnixTimestampMs < 0)
    {
        return onInvalidTimestamp(record, record.Message.Timestamp.UnixTimestampMs, partitionTime);
    }
    // return record.Message.Timestamp.UnixTimestampMs; -- previous solution (record timestamp)
    // Temporary fix: return the current wall-clock time instead of the record's own
    // timestamp (Ticks are 100 ns units, so Ticks / 10 is a microsecond-scale value).
    return DateTime.Now.Ticks / 10;
}

I tested this with the tests that you provided and also in our environment, and the workaround worked as expected.

Regards, Ognen