openzipkin / brave

Java distributed tracing implementation compatible with Zipkin backend services.
Apache License 2.0

Kafka Streams repartitioning #942

Closed metcox closed 5 years ago

metcox commented 5 years ago

Describe the Bug

The instrumentation of some Kafka Streams operations with KafkaStreamsTracing results in an unnecessary call to KStream.transform(). This can trigger unwanted and expensive repartitioning. The operations involved are filter(), filterNot(), peek(), and mark(). These operations return a TransformerSupplier where a ValueTransformerWithKeySupplier would be more appropriate, because the associated call to KStream.transformValues() does not mark the stream for repartitioning.

Steps to Reproduce

The following stream with Brave instrumentation generates 2 sub-topologies:

stream
    .filter((k, v) -> true)
    .transform(kafkaStreamsTracing.filter("spanName", (k, v) -> true))
    .join(table, (left, right) -> left)
    .to("output");

Produces the topology:

Topologies:
   Sub-topology: 0
    Source: KSTREAM-SOURCE-0000000008 (topics: [KSTREAM-FILTER-0000000005-repartition])
      --> KSTREAM-JOIN-0000000009
    Processor: KSTREAM-JOIN-0000000009 (stores: [table-STATE-STORE-0000000000])
      --> KSTREAM-SINK-0000000010
      <-- KSTREAM-SOURCE-0000000008
    Source: KSTREAM-SOURCE-0000000001 (topics: [table])
      --> KTABLE-SOURCE-0000000002
    Sink: KSTREAM-SINK-0000000010 (topic: output)
      <-- KSTREAM-JOIN-0000000009
    Processor: KTABLE-SOURCE-0000000002 (stores: [table-STATE-STORE-0000000000])
      --> none
      <-- KSTREAM-SOURCE-0000000001

  Sub-topology: 1
    Source: KSTREAM-SOURCE-0000000003 (topics: [input])
      --> KSTREAM-TRANSFORM-0000000004
    Processor: KSTREAM-TRANSFORM-0000000004 (stores: [])
      --> KSTREAM-FILTER-0000000005
      <-- KSTREAM-SOURCE-0000000003
    Processor: KSTREAM-FILTER-0000000005 (stores: [])
      --> KSTREAM-FILTER-0000000007
      <-- KSTREAM-TRANSFORM-0000000004
    Processor: KSTREAM-FILTER-0000000007 (stores: [])
      --> KSTREAM-SINK-0000000006
      <-- KSTREAM-FILTER-0000000005
    Sink: KSTREAM-SINK-0000000006 (topic: KSTREAM-FILTER-0000000005-repartition)
      <-- KSTREAM-FILTER-0000000007

Expected Behaviour

A single sub-topology is expected, as with the uninstrumented stream:

stream
    .filter((k, v) -> true)
    .join(table, (left, right) -> left)
    .to("output");

Produces the topology:

Topologies:
   Sub-topology: 0
    Source: KSTREAM-SOURCE-0000000003 (topics: [input])
      --> KSTREAM-FILTER-0000000004
    Processor: KSTREAM-FILTER-0000000004 (stores: [])
      --> KSTREAM-JOIN-0000000005
      <-- KSTREAM-SOURCE-0000000003
    Processor: KSTREAM-JOIN-0000000005 (stores: [table-STATE-STORE-0000000000])
      --> KSTREAM-SINK-0000000006
      <-- KSTREAM-FILTER-0000000004
    Source: KSTREAM-SOURCE-0000000001 (topics: [table])
      --> KTABLE-SOURCE-0000000002
    Sink: KSTREAM-SINK-0000000006 (topic: output)
      <-- KSTREAM-JOIN-0000000005
    Processor: KTABLE-SOURCE-0000000002 (stores: [table-STATE-STORE-0000000000])
      --> none
      <-- KSTREAM-SOURCE-0000000001

A sample project is available at https://github.com/metcox/brave-kafka-streams-topology.git

jeqo commented 5 years ago

@metcox thanks for reporting this.

In the case of filtering, this is the expected behaviour from the Processor API point of view, although I understand it is not what users would expect.

The Kafka docs state that transform:

Marks the stream for data re-partitioning: Applying a grouping or a join after transform will result in re-partitioning of the records. If possible use transformValues instead, which will not cause data re-partitioning.
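To make the quoted rule concrete, here is a toy model in plain Java. It is only a sketch of the documented behaviour, not Kafka Streams code: the class RepartitionFlagSketch, its methods, and the node labels are all hypothetical, and how Kafka Streams tracks this internally is not covered in this thread. The model carries a single flag that transform() sets, that value-only steps pass along unchanged, and that a join consumes by inserting a repartition step.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Toy model (hypothetical, not the Kafka Streams API): a stream is a list of
// node labels plus a flag saying whether the key may have changed upstream.
class RepartitionFlagSketch {
  private final List<String> nodes;
  private final boolean repartitionRequired;

  private RepartitionFlagSketch(List<String> nodes, boolean repartitionRequired) {
    this.nodes = nodes;
    this.repartitionRequired = repartitionRequired;
  }

  static RepartitionFlagSketch source(String topic) {
    List<String> nodes = new ArrayList<>();
    nodes.add("SOURCE(" + topic + ")");
    return new RepartitionFlagSketch(nodes, false);
  }

  private RepartitionFlagSketch append(String node, boolean flag) {
    List<String> next = new ArrayList<>(nodes);
    next.add(node);
    return new RepartitionFlagSketch(next, flag);
  }

  // transform may emit a different key, so the flag is always set.
  RepartitionFlagSketch transform() { return append("TRANSFORM", true); }

  // Value-only steps cannot change the key, so the flag is passed along as-is.
  RepartitionFlagSketch transformValues() { return append("TRANSFORMVALUES", repartitionRequired); }
  RepartitionFlagSketch filter() { return append("FILTER", repartitionRequired); }

  // A join needs co-partitioned input: if the flag is set, repartition first.
  RepartitionFlagSketch join() {
    RepartitionFlagSketch upstream = repartitionRequired ? append("REPARTITION", false) : this;
    return upstream.append("JOIN", false);
  }

  List<String> nodes() { return Collections.unmodifiableList(nodes); }

  public static void main(String[] args) {
    System.out.println(source("input").transform().filter().join().nodes());
    System.out.println(source("input").transformValues().filter().join().nodes());
  }
}
```

In this model the flag survives the intermediate filter(), so the first pipeline gains a REPARTITION step even though the key was never actually changed, while the transformValues() pipeline does not.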

Unfortunately, if we change the implementation from transform to transformValues it will break the filtering functionality by emitting kv pairs with value=null.

In this case, I'd recommend using the native filtering (without tracing) if grouping or joining happens after transform. I will add a warning to the API docs.

I've found that peek and mark have similar issues, but those can be refactored to use transformValues instead. I will create a patch to fix this.

jorgheymans commented 5 years ago

Unfortunately, if we change the implementation from transform to transformValues it will break the filtering functionality by emitting kv pairs with value=null

As a workaround, one could put .filter((key, value) -> value != null) right after?

If the filtering operation is potentially expensive it's worth being able to trace it without risking the repartition performance hit, WDYT @jeqo ?

jeqo commented 5 years ago

That's one of the 2 options I see:

  1. Keep it as transform and warn about partitioning
  2. Filter with transformValues and require an additional filter right after to remove nulls.

I'm considering option 2, but with a different name, like markFilter, to avoid confusion.

We could also keep option 1 with a warning. Since a transformer is used, I think that is enough for users to be aware of the repartitioning.
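Option 2 can be sketched in plain Java as follows. This is only a model of the idea, not the Kafka Streams or Brave API: the class MarkFilterSketch and the methods markFilter and dropNulls are hypothetical names, and records are represented as simple key/value entries. A value-only step has to return a value for every record, so a rejected record keeps its key and gets a null value; a second step then removes the nulls.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.BiPredicate;

// Plain-Java model of option 2 (hypothetical names, no Kafka Streams types).
class MarkFilterSketch {

  // Stand-in for a value-only transformer: keys are never touched, and a
  // record rejected by the predicate is emitted with a null value.
  static <K, V> List<Map.Entry<K, V>> markFilter(
      List<Map.Entry<K, V>> records, BiPredicate<K, V> predicate) {
    List<Map.Entry<K, V>> out = new ArrayList<>();
    for (Map.Entry<K, V> r : records) {
      V value = predicate.test(r.getKey(), r.getValue()) ? r.getValue() : null;
      out.add(new SimpleEntry<>(r.getKey(), value));
    }
    return out;
  }

  // Stand-in for the follow-up .filter((key, value) -> value != null).
  static <K, V> List<Map.Entry<K, V>> dropNulls(List<Map.Entry<K, V>> records) {
    List<Map.Entry<K, V>> out = new ArrayList<>();
    for (Map.Entry<K, V> r : records) {
      if (r.getValue() != null) {
        out.add(r);
      }
    }
    return out;
  }

  public static void main(String[] args) {
    List<Map.Entry<String, Integer>> input = new ArrayList<>();
    input.add(new SimpleEntry<>("a", 1));
    input.add(new SimpleEntry<>("b", 2));
    input.add(new SimpleEntry<>("c", 3));
    // Keep odd values only: "b" is first marked with null, then dropped.
    System.out.println(dropNulls(markFilter(input, (k, v) -> v % 2 == 1)));
  }
}
```

One caveat of this design: a record that already arrives with a null value cannot be told apart from a record rejected by the predicate, so both are dropped by the second step.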


metcox commented 5 years ago

Unfortunately, if we change the implementation from transform to transformValues it will break the filtering functionality by emitting kv pairs with value=null

Right, thank you for pointing that out.

That's one of the 2 options I see:

  1. Keep it as transform and warn about partitioning
  2. Filter with transformValues and require an additional filter right after to remove nulls.

I'm considering the option 2, but with a different name, like markFilter, to avoid confusion.

And we could keep the option 1 with a warning. As transformer is used, I think it is enough for users to be aware of repartitioning.

Having option 2 and keeping option 1 is okay for me.