LGouellec / streamiz

.NET Stream Processing Library for Apache Kafka 🚀
https://lgouellec.github.io/streamiz/
MIT License
470 stars 75 forks

Support KTable-KTable Foreign-Key Join #52

Open ybuasen opened 3 years ago

ybuasen commented 3 years ago

Description

The Java Kafka Streams library supports KTable-KTable Foreign-Key Join, as described at https://kafka.apache.org/27/documentation/streams/developer-guide/dsl-api.html#ktable-ktable-fk-join.

A sample use case for the Java version is posted at https://kafka-tutorials.confluent.io/foreign-key-joins/kstreams.html.

Will the same functionality be implemented in Streamiz Kafka .NET soon?

LGouellec commented 3 years ago

Hi @ybuasen,

Exactly, the Java library offers KTable-KTable foreign-key joins. This feature is not available in Streamiz yet. I hope to ship it in release 1.3.0 (April-May 2021).

I'll leave this issue open to track the feature's implementation progress.

Regards,

ybuasen commented 3 years ago

> Exactly, java library offer KTable-KTable Foreign-Key Join. This feature is not available yet. I hope create a release 1.3.0 (april-may 2021) with this feature.

@LGouellec Awesome! Looking forward to seeing it in action.

sgiuliani-ovalmoney commented 2 years ago

Hi, is this feature currently available within the Streamiz library? Regards

LGouellec commented 2 years ago

> Hi, is this feature currently available within the Streamiz library? Regards

For now, this feature is not prioritized, but there is a workaround: `SelectKey(..)` combined with `Join(..)`.
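A minimal sketch of that workaround: rekey the left table by the foreign key, then do an ordinary primary-key join. Topic names, the `ExtractCustomerId` helper, and the value format here are hypothetical.

```csharp
var builder = new StreamBuilder();

var orders = builder.Table("orders", new StringSerDes(), new StringSerDes(),
    InMemory<string, string>.As("orders-store"));
var customers = builder.Table("customers", new StringSerDes(), new StringSerDes(),
    InMemory<string, string>.As("customers-store"));

orders
    .ToStream()
    .SelectKey((orderId, order) => ExtractCustomerId(order)) // rekey by the FK (hypothetical helper)
    .ToTable()                                               // materialize, repartitioned on customerId
    .Join(customers, (order, customer) => $"{order}|{customer}")
    .ToStream()
    .To("orders-enriched", new StringSerDes(), new StringSerDes());
```

Note the caveat compared to a true FK join: if several orders share the same customerId, the rekeyed table keeps only the latest record per key, so earlier orders are silently dropped.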

MladenTasevski commented 2 years ago

> Hi, is this feature currently available within the Streamiz library? Regards

> For now, this feature is not prioritized, but there is a workaround: `SelectKey(..)` combined with `Join(..)`.

For streams you can join them by remapping the key and then using the join, so I was trying to use ToStream, remap the key, and join. I am also getting the error below with this code (which might make sense, since changing a table's key leads to inconsistencies and data loss when the key you remap to is not unique across records). So: can you reliably join two KTables by foreign key using your library?

```csharp
var table1 = builder.Table("topic1", new StringSerDes(), new StringSerDes(), InMemory<string, string>.As("table1"));
var table2 = builder.Table("topic2", new StringSerDes(), new StringSerDes(), InMemory<string, string>.As("table2"));

table1.ToStream()
    .SelectKey((k, v) => v.name)
    .ToTable()
    .Join(table2, (v1, v2) => $"{v1}-{v2}")
    .ToStream()
    .To("final-topic", new StringSerDes(), new StringSerDes());
```

The error we are getting:

```
fail: Streamiz.Kafka.Net.Processors.StreamThread[0]
      stream-thread[test-app-20ab114e-9933-47d5-a379-0839c9c2bcc7-stream-thread-0] Encountered the following unexpected Kafka exception during processing, this usually indicate Streams internal errors:
      Confluent.Kafka.KafkaException: Local: Erroneous state
         at Confluent.Kafka.Impl.SafeKafkaHandle.AssignImpl(IEnumerable`1 partitions, Func`3 assignMethodErr, Func`3 assignMethodError)
         at Confluent.Kafka.Consumer`2.Unassign()
         at Confluent.Kafka.Consumer`2.RebalanceCallback(IntPtr rk, ErrorCode err, IntPtr partitionsPtr, IntPtr opaque)
         at Confluent.Kafka.Impl.NativeMethods.NativeMethods.rd_kafka_consumer_poll(IntPtr rk, IntPtr timeout_ms)
         at Confluent.Kafka.Consumer`2.Consume(Int32 millisecondsTimeout)
         at Confluent.Kafka.Consumer`2.Consume(TimeSpan timeout)
         at Streamiz.Kafka.Net.Crosscutting.KafkaExtensions.ConsumeRecords[K,V](IConsumer`2 consumer, TimeSpan timeout, Int64 maxRecords)
         at Streamiz.Kafka.Net.Processors.StreamThread.<>c__DisplayClass57_0.<Run>b__0()
         at Streamiz.Kafka.Net.Crosscutting.ActionHelper.MeasureLatency(Action action)
         at Streamiz.Kafka.Net.Processors.StreamThread.Run()
```
LGouellec commented 2 years ago

Hi @MladenTasevski,

Could you provide more logs for your example, please? It looks strange; I don't think it's linked to the table foreign key. It may be related to another issue.

All details are welcome :)

Regards,

MladenTasevski commented 2 years ago

> Could you provide more logs regarding your example please ? Looks strange, I don't think it's link about table foreign key. Maybe it's relative to another issue.

Hello again @LGouellec

Here I am trying to join the orders table with the customer and book tables. Both book and customer have different PKs from the orders table and from each other, so I have to SelectKey before each join. When I try to do it by joining with only one order stream, it fails with this error:

```
fail: Streamiz.Kafka.Net.Processors.StreamThread[0]
      stream-thread[test-app-35497002-748f-4946-97be-ae569c23265d-stream-thread-0] Encountered the following unexpected Kafka exception during processing, this usually indicate Streams internal errors:
      Confluent.Kafka.KafkaException: Local: Unknown partition
         at Confluent.Kafka.Impl.SafeKafkaHandle.QueryWatermarkOffsets(String topic, Int32 partition, Int32 millisecondsTimeout)
         at Confluent.Kafka.Consumer`2.QueryWatermarkOffsets(TopicPartition topicPartition, TimeSpan timeout)
         at Streamiz.Kafka.Net.Processors.Internal.StoreChangelogReader.<OffsetsChangelogs>b__30_0(ChangelogMetadata _changelog)
         at System.Linq.Enumerable.SelectListIterator`2.MoveNext()
         at System.Linq.Enumerable.ToDictionary[TSource,TKey,TElement](IEnumerable`1 source, Func`2 keySelector, Func`2 elementSelector, IEqualityComparer`1 comparer)
         at Streamiz.Kafka.Net.Processors.Internal.StoreChangelogReader.OffsetsChangelogs(IEnumerable`1 registeredChangelogs)
         at Streamiz.Kafka.Net.Processors.Internal.StoreChangelogReader.InitChangelogs(IEnumerable`1 registeredChangelogs)
         at Streamiz.Kafka.Net.Processors.Internal.StoreChangelogReader.Restore()
         at Streamiz.Kafka.Net.Processors.StreamThread.RestorePhase()
         at Streamiz.Kafka.Net.Processors.StreamThread.Run()
fail: Streamiz.Kafka.Net.Processors.StreamStateManager[0]
      All stream threads have died. The instance will be in error state and should be closed
```

What more would you need from the logs?

Also, when I use SelectKey right before the join, it fails as well, so I only use SelectKey when initializing the stream or after joining the streams. Is that approach safe, given that the FKs aren't a unique key for the orders table?

I am not getting any joins on customerOrders and bookOrders, even though I checked that there were entries with the same ID in both.

This is my current implementation:

```csharp
var orders = builder.Stream("orders", new StringSerDes(), new StringSerDes()).SelectKey((k, v) => v.bookId).ToTable();
var copyOfOrders = builder.Stream("copy_of_orders", new StringSerDes(), new StringSerDes()).SelectKey((k, v) => v.customerId).ToTable();
var books = builder.Stream("book", new StringSerDes(), new StringSerDes()).ToTable();
var customers = builder.Stream("customer", new StringSerDes(), new StringSerDes()).ToTable();

// copyOfOrders is keyed by customerId, so join it with customers
var customerOrders = copyOfOrders.Join(customers, (v1, v2) => JoinCustomerWithBook(v1, v2))
    .ToStream().SelectKey((k, v) => v.orderId).ToTable();
customerOrders.ToStream().To("CustomerOrders");

// orders is keyed by bookId, so join it with books
var bookOrders = orders.Join(books, (v1, v2) => JoinBookWithOrder(v1, v2))
    .ToStream().SelectKey((k, v) => v.orderId).ToTable();
bookOrders.ToStream().To("BookOrders");

customerOrders.Join(bookOrders, (v1, v2) => JoinOrders(v1, v2));
```
LGouellec commented 2 years ago

Hi @MladenTasevski,

Regarding this log:

```
fail: Streamiz.Kafka.Net.Processors.StreamThread[0]
      stream-thread[test-app-35497002-748f-4946-97be-ae569c23265d-stream-thread-0] Encountered the following unexpected Kafka exception during processing, this usually indicate Streams internal errors:
      Confluent.Kafka.KafkaException: Local: Unknown partition
```

It seems that your source topics are not co-partitioned. For a JOIN operation, both topics need the same number of partitions, otherwise the join can't work.

I recommend reading this article; there are more public blog posts available as well.
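If you control topic creation, you can guarantee co-partitioning by creating both source topics with the same partition count, for example with Confluent.Kafka's admin client (topic names and counts here are illustrative):

```csharp
using Confluent.Kafka;
using Confluent.Kafka.Admin;

using var admin = new AdminClientBuilder(
    new AdminClientConfig { BootstrapServers = "localhost:9092" }).Build();

// Both join inputs get the same number of partitions, so records with the
// same key land in the same partition on both sides of the join.
await admin.CreateTopicsAsync(new[]
{
    new TopicSpecification { Name = "orders",   NumPartitions = 6, ReplicationFactor = 1 },
    new TopicSpecification { Name = "customer", NumPartitions = 6, ReplicationFactor = 1 }
});
```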

Best regards,

MladenTasevski commented 2 years ago

@LGouellec I didn't know about the partitioning requirement for joins, thank you for pointing that out. I've managed to join the KTables, but I am getting a lot of out-of-order records, and I think that accounts for the latency in the streams. I'm just joining and using SelectKey. What could be some reasons for the out-of-order records?

```
info: Streamiz.Kafka.Net.Processors.Internal.StoreChangelogReader[0]
      Finished restoring changelog KSTREAM-TOTABLE-STATE-STORE-0000000019 to store test-app-KSTREAM-TOTABLE-STATE-STORE-0000000019-changelog [[0]] with a total number of 59121 records
info: Streamiz.Kafka.Net.Processors.StreamTask[0]
      stream-task[2|0] Task 2-0 state transition from RESTORING to RUNNING
info: Streamiz.Kafka.Net.Processors.StreamTask[0]
      stream-task[2|0] Restored and ready to run
warn: Streamiz.Kafka.Net.Processors.KTableSourceProcessor[0]
      Detected out-of-order KTable update for KSTREAM-TOTABLE-STATE-STORE-0000000019 at offset 7424, partition [0].
```
LGouellec commented 2 years ago

@MladenTasevski If you stream a topic, change the key (via `SelectKey(..)`), and then apply a stateful operation (creating a KTable, a join, or Count/Agg/Reduce), the library creates a repartition topic to materialize the new key.

So your topology is split into two parts. First: stream the source topic, change the key, and publish into the repartition topic. Second: stream the repartition topic and perform the stateful operation.

The internal consumer subscribes to both topics (source and repartition), so you are not guaranteed to consume messages from the source topic before those from the repartition topic; they are consumed in parallel.

So out-of-order records can appear when the timestamp already present in the state store is newer than the current record's.

Btw, you can visualize your topology with this tool. Call the following to get the output:

```csharp
streamBuilder.Build().Describe().ToString();
```
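For example, a minimal sketch that prints the description (topic name and serdes are illustrative); a rekey followed by `ToTable()` should show up as two sub-topologies connected by a generated repartition topic:

```csharp
var builder = new StreamBuilder();

builder.Stream("orders", new StringSerDes(), new StringSerDes())
    .SelectKey((k, v) => v)   // forces a repartition before the table is built
    .ToTable();

// Prints the sub-topologies, processor names, and internal topics
Console.WriteLine(builder.Build().Describe().ToString());
```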
MladenTasevski commented 2 years ago

@LGouellec Thank you for that explanation and the tool; it helped a lot to understand what's going on. I was getting a lot of out-of-order records. I am doing an FK join of 3 tables, which all have different PKs. The out-of-order records appear too often and are causing performance issues. I don't know if there's something more to try; I may also be doing something incorrectly.

LGouellec commented 2 years ago

@MladenTasevski
What about the Kafka messages themselves? By default, Streamiz processes data in priority order by message timestamp. An out-of-order record means you process message 1 with timestamp 1 after having processed message 2 with timestamp 2.

In the producer's source topics, are the message timestamps set explicitly, or do you let the broker do it? Do you use a TimestampExtractor, or keep the default value?
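If you do need custom timestamps, here is a hedged sketch of a custom extractor; the interface name, signature, and registration property are assumptions based on the Java `TimestampExtractor` API, so check the Streamiz docs for the exact shape:

```csharp
// Assumed to mirror Kafka Streams' TimestampExtractor; verify against Streamiz.
public class OrderTimestampExtractor : ITimestampExtractor
{
    public long Extract(ConsumeResult<object, object> record, long partitionTime)
    {
        // Here: just return the record's broker/producer timestamp.
        // A real extractor might instead parse a timestamp field from the payload.
        return record.Message.Timestamp.UnixTimestampMs;
    }
}

// Registration (property name assumed):
// config.DefaultTimestampExtractor = new OrderTimestampExtractor();
```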

MladenTasevski commented 2 years ago

@LGouellec I keep the default value and let the broker do it. I haven't touched the TimestampExtractor.

LGouellec commented 2 years ago

Ok. Can you attach a dump of all your source topics to this issue, along with your topology?