confluentinc / ksql

The database purpose-built for stream processing applications.
https://ksqldb.io

Improve join predictability #5537

Open · big-andy-coates opened this issue 4 years ago

big-andy-coates commented 4 years ago

At the moment many users are reporting issues getting joins to work out-of-the-box.

On investigation, the cause can often be put down to the Kafka broker returning data for only one side of the join. For example, given the join:

CREATE TABLE OUTPUT AS 
   SELECT * FROM L JOIN R ON L.ID = R.ID;

When such a query is started and the topics behind L and R hold large historic data sets, the Streams app's consumer polls the Kafka broker for records to process, but the broker gives no guarantee about which topic's records will be returned. The broker may return records from both topics, or from only one side.

If the poll only returns records from one topic, the streams app will, by default, assume that the other side has no pending records and will process the records it does have. When these records are processed they may fail to join as the matching records from the other side have not been returned in the poll.

Users can set the streams property max.task.idle.ms to tell ksqlDB to keep polling for records from the other side. However, the default value for max.task.idle.ms is zero, meaning no delay.

This is poor UX: most users are not aware of max.task.idle.ms, and ideally we shouldn't be relying on users setting arcane Streams properties anyway.
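For context, in a plain Kafka Streams application the workaround looks something like the sketch below (a minimal, illustrative snippet: the application id, broker address, topic names and the 2000ms value are all made up for the example; a ksqlDB user would instead have to pass the property through the server or CLI properties):

import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KTable;

public class MaxTaskIdleWorkaround {

    public static void main(final String[] args) {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "join-example");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        // The workaround discussed above: wait up to 2s for records from the
        // "empty" side of a join before falling back to processing whatever is
        // buffered. The default of 0 means "don't wait at all".
        props.put(StreamsConfig.MAX_TASK_IDLE_MS_CONFIG, 2000L);

        // A table-table join roughly equivalent to the OUTPUT example above.
        final StreamsBuilder builder = new StreamsBuilder();
        final KTable<String, String> left = builder.table("L");
        final KTable<String, String> right = builder.table("R");
        left.join(right, (l, r) -> l + "," + r).toStream().to("OUTPUT");

        final KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
    }
}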

big-andy-coates commented 4 years ago

I've marked this as requires-streams-change as it may be possible to fix this upstream.

There have been discussions about fixing this in the Kafka broker, by enhancing it so that a poll across multiple topic-partitions chooses which data to return based on timestamp. However, such a solution has many edge cases, not least that the user may have embedded the timestamp they want to use in the payload (which is why I'm assuming it's not been implemented!).

What would happen if we set the default of max.task.idle.ms to 1? Would ksqlDB then always attempt 'one more poll' when it exhausts data from one side of a join and the next poll does not return any more data for that side? If my understanding is correct, the additional poll explicitly removes the other, non-exhausted side of the join from the set of topic-partitions being requested, so it should ensure that any pending rows for the exhausted side are returned in the most common case that is causing users issues: a simple join of two topics with a large amount of history to work through.

big-andy-coates commented 4 years ago

Digging into this and discussing with others:

max.task.idle.ms, which defaults to 0, interplays with both the streams config poll.ms, which defaults to 100ms, and the consumer config max.poll.interval.ms, which defaults to 5 minutes.
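For reference, here are those three settings spelled out as plain Kafka Streams properties, with the defaults mentioned above restated in the comments (this just documents the defaults, it is not a proposed configuration):

import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.streams.StreamsConfig;

public class JoinRelatedDefaults {

    public static void main(final String[] args) {
        final Properties props = new Properties();

        // How long a task waits for data on an empty input partition before
        // processing anyway (enforced processing). Default: 0, i.e. no waiting.
        props.put(StreamsConfig.MAX_TASK_IDLE_MS_CONFIG, 0L);

        // How long the stream thread blocks in Consumer#poll waiting for new
        // records. Default: 100ms.
        props.put(StreamsConfig.POLL_MS_CONFIG, 100L);

        // Maximum allowed gap between poll() calls before the consumer is
        // kicked out of the group. Default: 300000ms, i.e. 5 minutes.
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 300_000);

        System.out.println(props);
    }
}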

When running in steady state, Streams tunes its batch sizes so that it calls poll at least twice per max.poll.interval.ms. However, it will call poll more frequently if the internal buffers are exhausted, i.e. once all buffered messages are processed.

If we can set max.task.idle.ms high enough that a second poll is guaranteed before the max.idle is exceeded, then it would ensure any waiting records on the exhausted side were retrieved.

Unfortunately, max.task.idle.ms is decoupled from the other settings. Setting max.idle to 1 won't make any difference, as the idle timeout would still always be exceeded. To effectively use max.idle, the setting needs to be set to around max.poll.interval.ms, but that defaults to 5 minutes, and we'd be looking at poor UX / throughput if one side of an outer join was generally empty. (Think: a user sat at the console having to wait 5 minutes for the first row to come back!)

Setting max.idle above poll.ms should generally be sufficient to mean a second poll occurs (according to @mjsax), though it's a little unclear how this can be if Streams tunes its batch sizes to poll twice per max.poll (??).

So... we can split this into two parts.

  1. a streams change to improve the config in this space.
  2. a ksqlDB change to the default max.task.idle.ms config, setting it to around 500ms to increase the probability of good out-of-the-box behaviour.

mjsax commented 4 years ago

> To effectively use max.idle, the setting needs to be set to around max.poll.interval.ms

Not sure if I agree here. At least not for "normal" processing -- only for the corner case of very heavyweight processing might this hold. Thus, for the out-of-the-box config/experience, we can ignore max.poll.interval.ms IMHO.

> though it's a little unclear how this can be if Streams tunes its batch sizes to poll twice per max.poll

Note that numIterations is also bounded by the buffer size (buffered.records.per.partition), which is configured to 1000 records by default. The overall loop is:

while (true) {
  poll(<poll.ms>)                            // fetch records, blocking for at most poll.ms
  put data into buffer                       // per-partition buffers, capped at buffered.records.per.partition
  while (true) {
    for (numIterations) process()            // process one record per task per iteration
    if nothing-processed -> break            // can happen if the buffer is empty
    if time-spent-processing > max.poll.interval.ms / 2
      -> numIterations = numIterations / 2   // shrink the batch so we get back to poll() sooner
      -> break
    numIterations++                          // otherwise grow the batch
  }
}

Thus, max.poll.interval.ms is just an additional guard that we don't drop out of the consumer group, and we use it to keep numIterations small to avoid that case. In most cases, we should break the loop because the buffer becomes empty. (Note that numIterations is per task, i.e., in process() we apply it for each task individually.) Thus, the maximum time spent is "max(numIterations, recordsInBufferForPartition) * numTasksOfThread * avgProcessingTimePerRecord".

In process() we only process if data is available for all partitions; if not, we start the max.task.idle.ms timeout to eventually enter enforced processing. Thus, 500ms as the default for max.task.idle.ms might be too short? Or you could decrease the buffer size? Not sure what the expected processing time per record is for ksqlDB.

vvcephei commented 4 years ago

Here's an AK Streams ticket for this issue, which may be a better place to discuss the details: https://issues.apache.org/jira/browse/KAFKA-10091

big-andy-coates commented 4 years ago

So we're saying max.idle should be set high enough to process the full buffer.

Worst case, i.e. a full buffer when the other side is exhausted, we'd have max.task.idle.ms to process 1000 records. So a value of 500ms would require processing speeds of around 2K msg/s, which seems in the right ballpark to me.

I remember people talking of throughputs of ~20K msg/s per thread for a simple join with 300B messages (source: https://www.slideshare.net/ConfluentInc/ksql-performance-tuning-for-fun-and-profit-nick-dearden-confluent-kafka-summit-sf-2019).

@mjsax, still think 500ms is too small? Would you feel safer in your bed at night if we defaulted to 1s? :p

mjsax commented 4 years ago

You also need to take the number of tasks into account. If a thread hosts 10 tasks (which does not sound unreasonable to me), it's up to 10K records, not just 1K. At 20K msg/s, you hit the 500ms already, if my math is correct. Not sure what the expected number of tasks per thread is by default though (what is the default number of threads for a query, and what is the expected number of input topic partitions)? -- We could also consider reducing the buffer size instead of increasing max.task.idle.ms.

In the end, I leave it up to your own judgement. If you think 500ms is enough, just go for it. But I believe it might be too small for the default buffer size (if you aim for "robustness"/predictability).
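
To keep the arithmetic from the last few comments in one place, a quick back-of-the-envelope sketch (the buffer size is the documented default; the ~20K msg/s throughput and the 10-tasks-per-thread figure are the illustrative numbers quoted above, not measurements):

public class MaxTaskIdleBackOfEnvelope {

    public static void main(final String[] args) {
        final int bufferedRecordsPerPartition = 1_000;   // buffered.records.per.partition default
        final double recordsPerSecondPerThread = 20_000; // ~20K msg/s per thread, from the slides above

        // One task per thread: a full buffer of 1K records drains well within 500ms.
        final double oneTaskMs = bufferedRecordsPerPartition / recordsPerSecondPerThread * 1000;
        System.out.printf("1 task:   ~%.0f ms to drain a full buffer%n", oneTaskMs);      // ~50 ms

        // Ten tasks per thread: up to 10K buffered records before the thread polls again,
        // which is where the proposed 500ms default starts to look tight.
        final int tasksPerThread = 10;
        final double tenTasksMs =
            tasksPerThread * bufferedRecordsPerPartition / recordsPerSecondPerThread * 1000;
        System.out.printf("%d tasks: ~%.0f ms to drain full buffers%n", tasksPerThread, tenTasksMs); // ~500 ms
    }
}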

terryf82 commented 4 years ago

I'm not sure if this is a demonstration of the same behaviour, but using the excellent gradle-confluent plugin, which effectively allows ksql statements to be broken up into logical chunk files, I seem to be able to execute statements "too fast" for the server to handle correctly.

File 1, which builds a lookup / reference table of TAGTYPES:

-- REGISTER TAGTYPES SOURCE
CREATE STREAM KS_TAGTYPES_SRC WITH (KAFKA_TOPIC='aurora.gearbox.tagtypes', VALUE_FORMAT='AVRO');

-- LOOKUP TABLE
--@DeleteTopic
CREATE TABLE KT_TAGTYPES AS SELECT
    ID,
    LATEST_BY_OFFSET(NAME) AS NAME
FROM KS_TAGTYPES_SRC GROUP BY ID;

File 2, which generates a stream of TAGS to be enriched with their relevant type:

-- REGISTER TAGS SOURCE
CREATE STREAM KS_TAGS_SRC WITH (KAFKA_TOPIC='aurora.gearbox.tags', VALUE_FORMAT='AVRO');
-- JOIN TAGS (T) WITH THEIR TAGTYPES (TT)
--@DeleteTopic
CREATE STREAM KS_TAGS AS SELECT
    T.ID AS ID,
    T.NAME AS NAME,
    TT.NAME AS TYPE
FROM KS_TAGS_SRC T JOIN KT_TAGTYPES TT
ON T.TYPE_ID = TT.ID;

If I allow the plugin to run both files in sequence, it races through the queries and produces an output stream KS_TAGS that only ever has partial coverage (not all tags get enriched with their type). But if I execute the files separately, introducing a pause of only a couple of seconds, the coverage in KS_TAGS is complete. Note that the timestamps of all records have been accounted for as best as possible - rows in KS_TAGTYPES_SRC are timestamped earlier, or at worst at the same time as KS_TAGS_SRC (both are brought into Kafka using Debezium CDC).

cc @MichaelDrogalis

mjsax commented 4 years ago

@terryf82 Are you saying we can reproduce this issue even with a large value for the max.task.idle.ms config? Or only with the default max.task.idle.ms=0?

terryf82 commented 4 years ago

@mjsax If I run the queries above using the gradle-confluent plugin, even a max.task.idle.ms of 20000 still gives me a partial join.

Only if I execute the statements with a pause of at least 1 second between them (either by running them manually, or through a parameter to the plugin which I'm working on) do I get a complete join between the stream & table.

mjsax commented 4 years ago

That is a little surprising to me. @big-andy-coates any ideas?

big-andy-coates commented 4 years ago

The only thing that jumps out is:

> or at worst at the same time

If the table data has the same timestamp as the stream, is the order they are loaded table-then-stream or non-deterministic?

If it's not that, then we'd need a recreatable test case to investigate more. That's probably best raised as a new issue.

mjsax commented 4 years ago

> If the table data has the same timestamp as the stream, is the order they are loaded table-then-stream or non-deterministic?

That is non-deterministic atm. It's a known issue in Kafka Streams.