bbejeck / kafka-streams

Code examples for working with Kafka Streams
Apache License 2.0

Streams context forward multiple topics? #5

Open imperio-wxm opened 7 years ago

imperio-wxm commented 7 years ago

Usually we have one processor forward to one sink. Is there any way to forward data to multiple topics?

@Override
public void process(byte[] key, byte[] value) {
     context.forward(key, value);
}

I have one source, but I want to send data to two topics, such as:

// message = {"type":"1",........}
Topic-1 (source)
Topic-2, Topic-3 (sinks)

if(type == "1")
     send data to Topic-2
else if(type == "2")
     send data to Topic-3

bbejeck commented 6 years ago

Hi @imperio-wxm,

Sorry for the seriously delayed response. The best way for you to do this is either with KStream#branch in the DSL or with the Processor API, where you can forward to specific child nodes by name. Either way, the output topics need to be defined in the topology ahead of time. Let me know if you need more info.

HTH Bill
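
To illustrate the "forward to a child node by name" idea, here is a minimal plain-Java model of it. It deliberately uses no Kafka classes, so NamedChildRouter and its methods are hypothetical names for this sketch, not part of the Kafka Streams API. In a real topology the children would be sink nodes registered up front (for example with Topology#addSink), and the forward call would name one of them (for example via To.child(...) on the older ProcessorContext). The sketch assumes that forwarding to an undeclared child is an error.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model (no Kafka classes): a parent node that forwards to named children.
final class NamedChildRouter {
    // Child name -> values that child has received. The set of children is fixed at
    // construction, mirroring the rule that output topics are declared ahead of time.
    private final Map<String, List<String>> children = new LinkedHashMap<>();

    NamedChildRouter(String... childNames) {
        for (String name : childNames) {
            children.put(name, new ArrayList<>());
        }
    }

    // Forward a value to exactly one child, selected by name.
    void forward(String value, String childName) {
        List<String> sink = children.get(childName);
        if (sink == null) {
            throw new IllegalArgumentException("Unknown child node: " + childName);
        }
        sink.add(value);
    }

    // Route on the message type as in the question: "1" -> Topic-2, "2" -> Topic-3.
    void process(String type, String value) {
        if ("1".equals(type)) {
            forward(value, "Topic-2");
        } else if ("2".equals(type)) {
            forward(value, "Topic-3");
        }
        // Any other type is dropped in this sketch.
    }

    List<String> received(String childName) {
        return children.get(childName);
    }

    public static void main(String[] args) {
        NamedChildRouter router = new NamedChildRouter("Topic-2", "Topic-3");
        router.process("1", "a");
        router.process("2", "b");
        router.process("3", "c");
        System.out.println(router.received("Topic-2")); // prints [a]
        System.out.println(router.received("Topic-3")); // prints [b]
    }
}
```

Note that the comparison uses "1".equals(type) rather than ==, since == on Java strings compares references, not contents.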

psawmora commented 5 years ago

@bbejeck What if we want to create a new record in addition to the one we have and forward both to different child processors? Something like:

 new_record = createNewRecord(old_record);
 context.forward(original_record, to(child_1));
 context.forward(new_record, to(child_2));

Is this good practice?
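
For readers who want to experiment with the pattern in the question, here is a small plain-Java model of a single process() call that emits two records to two different named children. It uses no Kafka classes. DualForwardSketch, its forward method, and the body of createNewRecord are all assumptions made for illustration (the original comment does not say what createNewRecord does), so treat it as a sketch of the control flow only.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model (no Kafka classes) of one process() call that emits two records.
final class DualForwardSketch {
    // Records delivered to each named child, in arrival order.
    private final Map<String, List<String>> delivered = new HashMap<>();

    DualForwardSketch() {
        delivered.put("child_1", new ArrayList<>());
        delivered.put("child_2", new ArrayList<>());
    }

    // Hypothetical stand-in for createNewRecord(old_record); the real logic is unknown.
    static String createNewRecord(String oldRecord) {
        return "derived:" + oldRecord;
    }

    // Deliver one record to one named child.
    void forward(String record, String child) {
        delivered.get(child).add(record);
    }

    // The original record goes to child_1, the derived record to child_2.
    void process(String originalRecord) {
        String newRecord = createNewRecord(originalRecord);
        forward(originalRecord, "child_1");
        forward(newRecord, "child_2");
    }

    List<String> deliveredTo(String child) {
        return delivered.get(child);
    }

    public static void main(String[] args) {
        DualForwardSketch processor = new DualForwardSketch();
        processor.process("order-1");
        System.out.println(processor.deliveredTo("child_1")); // prints [order-1]
        System.out.println(processor.deliveredTo("child_2")); // prints [derived:order-1]
    }
}
```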

HungUnicorn commented 5 years ago

I think that's a restriction of the (Kafka) API, and it wouldn't be good practice. Instead of doing the above, you could have two topologies as children and let each of them consume the output of old_record:

ParentTopo -> NewRecordTopo(createNewRecord) -> sink1
           -> OldRecordTopo -> sink2

This could achieve your goal. @bbejeck could maybe give more insight into why the processor context cannot forward to two processors.
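
The tree above can be modelled in plain Java as a parent that hands every record to all of its children, each of which applies its own step and writes to its own sink. This is only a sketch with no Kafka classes: FanOutSketch, Child, and the "new(...)" transformation are hypothetical names chosen for illustration. It relies on the assumption that a forward call which names no child reaches every child node.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Plain-Java model (no Kafka classes) of ParentTopo fanning out to two children.
final class FanOutSketch {
    // A child node: applies its transformation and appends the result to its own sink.
    static final class Child {
        final Function<String, String> transform;
        final List<String> sink = new ArrayList<>();

        Child(Function<String, String> transform) {
            this.transform = transform;
        }

        void receive(String record) {
            sink.add(transform.apply(record));
        }
    }

    private final List<Child> children = new ArrayList<>();

    // Register a child while the topology is being built, before any record flows.
    Child addChild(Function<String, String> transform) {
        Child child = new Child(transform);
        children.add(child);
        return child;
    }

    // Forward without naming a child: every child receives the record.
    void forward(String record) {
        for (Child child : children) {
            child.receive(record);
        }
    }

    public static void main(String[] args) {
        FanOutSketch parent = new FanOutSketch();
        Child newRecordTopo = parent.addChild(r -> "new(" + r + ")"); // -> sink1
        Child oldRecordTopo = parent.addChild(r -> r);                 // -> sink2
        parent.forward("r1");
        System.out.println(newRecordTopo.sink); // prints [new(r1)]
        System.out.println(oldRecordTopo.sink); // prints [r1]
    }
}
```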

imperio-wxm commented 5 years ago

Hi @bbejeck, sorry for my late reply.

I think @psawmora is right.

I do it like this, and it works. In a long time of running it I haven't found any problems:

switch(type) {
     case type1:
          context.forward(newKey1, newValue1, TOPIC_1);
          break;
     case type2:
          context.forward(newKey2, newValue2, TOPIC_2);
          break;
     .......
}

I wrote a generic Processor specifically to split the topic. The topology looks like a tree, but there can only be one source topic. Is this good practice?

psawmora commented 5 years ago

@imperio-wxm

But have you tried forwarding the same flow to two different processors at the same time?