confluentinc / ksql

The database purpose-built for stream processing applications.
https://ksqldb.io

TopicNameExtractor Support #6703

Open rcollina opened 3 years ago

rcollina commented 3 years ago

Search for existing answers: Searched both here and on SO.

Provide details of the setup you're running: None at the moment; evaluating options.

Outline your question: The example using TopicNameExtractor in the following article was quite interesting, and it would fit our own needs perfectly.

https://www.confluent.io/blog/putting-events-in-their-place-with-dynamic-routing/

Is there any way to emulate the very same dynamic routing behaviour with ksqlDB?

Thank you

vpapavas commented 3 years ago

Hi @rcollina ,

Does the following sound like what you are trying to do?

First, you create a STREAM that collects all the data; let's call it allEvents, with its corresponding topic 'allEventsTopic'. Then, for each category of events, you create a new STREAM like so:

CREATE STREAM eventCategory1 AS SELECT * FROM allEvents WHERE event->category = 'xxx';

The latter will create a new stream and topic per event category.
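
Spelled out end to end, the pattern might look like this (schema and names are illustrative; struct fields are dereferenced with -> in ksqlDB):

CREATE STREAM allEvents (id VARCHAR KEY, event STRUCT<category VARCHAR, payload VARCHAR>)
  WITH (kafka_topic='allEventsTopic', value_format='JSON');

-- One persistent query (and one backing topic) per category
CREATE STREAM eventCategory1 AS SELECT * FROM allEvents WHERE event->category = 'category1';
CREATE STREAM eventCategory2 AS SELECT * FROM allEvents WHERE event->category = 'category2';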

rcollina commented 3 years ago

@vpapavas, thank you for your reply. I believe this is not what I’m looking for, unless I’m misunderstanding your example. In my mind, it’d read like this:

CREATE STREAM $stream_name_param AS SELECT * FROM allEvents

Where $stream_name_param is a value taken from a message header, a body property, or an expression. It is not known in advance.

For example, I might have a “reply_to” header in my message, and that’s the topic I’d want to route it to.

Perhaps a dedicated statement, CREATE STREAM ROUTE, CREATE STREAM EXCHANGE, or something to that effect, would avoid confusion.
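
For illustration only, such a statement might read like this (none of this syntax exists in ksqlDB, and the HEADER accessor is invented):

-- Hypothetical syntax: route each record to the topic named in its
-- 'reply_to' header, without creating any new streams or topics
CREATE STREAM ROUTE allEventsRouter AS
  SELECT * FROM allEvents
  ROUTE TO HEADER('reply_to');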

I hope this clears things up, thank you.

agavra commented 3 years ago

Thanks for filing the issue @rcollina! I'm not sure this is possible using standard SQL syntax, which is very declarative about these types of things. As it stands, I think you'd have to stick with Kafka Streams to achieve this, but I'm all ears about suggestions for how to get this to work in a SQL-like dialect.

EDIT: I just saw your suggestion for CREATE STREAM ROUTE. This might be something the amazing @mjsax would be interested in!

rcollina commented 3 years ago

@agavra, thank you!

I'd be thrilled to see this in ksql.

PeterLindner commented 3 years ago

Another option may be a 'hyperstream', which is an abstraction over multiple ksqlDB streams, similar to how TimescaleDB does it with 'hypertables' over regular Postgres tables: https://docs.timescale.com/latest/introduction/architecture

rcollina commented 3 years ago

Another option may be a 'hyperstream'

@PeterLindner I need to branch off the stream, as it were, via a runtime expression of variable complexity.

I'm not sure the hypertable/hyperstream abstraction would help in this case, though I just might not be seeing it now.

PeterLindner commented 3 years ago

@rcollina the idea was to create a single database object to interact with (e.g. INSERT INTO, or whatever), which would be able to dynamically create streams underneath it based on column values, headers, or maybe arbitrary expressions. To be useful for your use case, there has to be some way to access the substreams (based on the TimescaleDB reference I'll call them chunks, but I don't have a strong opinion on naming; they could be called routes as well). This could obviously be done through the generated Kafka topics, or somehow through ksqlDB.

If I had to dream up some syntax, it might look like this:

CREATE HYPERSTREAM hs AS
SELECT
  payload
FROM some_source
PARTITION BY id
CHUNK BY CASE WHEN id % 2 = 0
  THEN 'even' 
  ELSE 'odd' 
END;

-- some variants to access the underlying streams
SELECT * FROM hs.even;
SELECT * FROM hs['even'];
SELECT * FROM chunk(hs, 'even');

I'll have to admit that this was just a quick idea; I did not think about the details, and there are other people more suitable than me for that. But the motivation is to have a single object to manage (think schema evolution, for example, or simply dropping the whole thing at once).

rcollina commented 3 years ago

@PeterLindner thanks for the explanation!

I can see it now, it looks quite versatile. I like it!

This would effectively enable the modeling of different routing topologies, especially when composed.

If I may, I'd forgo the hyperstream indexing. I'd treat substreams as regular streams.

edit: removed suggestion to swap CHUNK BY with STREAM BY - in my humble opinion it's more of a STREAM INTO / ROUTE INTO as you mentioned.

mjsax commented 3 years ago

Well, note that dynamic routing would usually imply that all target topics exist. We had a similar discussion in https://cwiki.apache.org/confluence/display/KAFKA/KIP-303%3A+Add+Dynamic+Routing+in+Streams+Sink and decided not to auto-create topics, as it would allow one to DDoS the cluster by accident...

For ksqlDB, this implies that all target STREAMs should be created manually upfront. If you have all target STREAMs created, you can follow @vpapavas's suggestion and write one query per target STREAM. This would work, but would not be efficient, as you would consume the input STREAM many times. Thus, it might make sense to extend the language (as an optimization) with a "routing feature" -- however, I would keep the requirement that all target STREAMs must be created manually/upfront, and thus this feature would not allow you to do something new, but would "only" allow you to do it more efficiently.
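
Concretely, the workaround under today's syntax might look like this (schema and names are illustrative, reusing the allEvents sketch from above):

-- All target STREAMs created manually, upfront, matching the source schema
CREATE STREAM ordersEvents (id VARCHAR KEY, event STRUCT<category VARCHAR, payload VARCHAR>)
  WITH (kafka_topic='orders', value_format='JSON', partitions=1);
CREATE STREAM billingEvents (id VARCHAR KEY, event STRUCT<category VARCHAR, payload VARCHAR>)
  WITH (kafka_topic='billing', value_format='JSON', partitions=1);

-- One persistent query per target; each one re-reads the input STREAM
INSERT INTO ordersEvents SELECT * FROM allEvents WHERE event->category = 'orders' EMIT CHANGES;
INSERT INTO billingEvents SELECT * FROM allEvents WHERE event->category = 'billing' EMIT CHANGES;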

Not sure if the "hypertable" concept aligns with what we want/need here though: from my understanding, a "hypertable" is logically/conceptually a single table that is "sharded" internally, where the sharding is an implementation detail and a user cannot influence how the shards are computed or how the data is mapped to shards. -- To this end, the "hypertable" design seems to be similar (even if more complex) to topic partitions?

For the dynamic routing case, it seems that we actually want to have multiple DB objects (i.e., all the target STREAMs), and the user wants to dynamically pick a STREAM. Thus, the CREATE STREAM AS clause would not fit, because we assume that all target STREAMs exist already. It might rather be a form of INSERT INTO?
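
Hypothetically, such a routed insert might read like this (invented syntax; no part of this exists in ksqlDB):

-- Invented syntax: pick the pre-existing target STREAM per record
INSERT INTO ROUTE(event->category) SELECT * FROM allEvents;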

rcollina commented 3 years ago

Sorry for not getting back to you sooner, @mjsax.

edit: I just realized this might not work in ksql after all.

Streams have schemata - you do define columns, after all - whereas the routing constructs I'd need hinge on topics that may contain different types of messages, which are thus never bound to conform to a single schema.

mjsax commented 3 years ago

It's for sure a tricky thing to do, even if you just want to split a unified stream into sub-streams based on content. I agree that it might be even more difficult to do if you don't have a unified input schema. -- Maybe using a BYTES type (it's not there yet, but we hope to add it) could help for your use case?
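
If/when such a type lands, a schema-agnostic stream might be declared along these lines (hypothetical sketch; names and formats are illustrative):

-- Hypothetical: the value is an opaque byte array, so heterogeneous
-- messages can share one stream without a declared value schema
CREATE STREAM rawEvents (id VARCHAR KEY, payload BYTES)
  WITH (kafka_topic='allEventsTopic', value_format='KAFKA');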

rcollina commented 3 years ago

Let's pretend we can define a schemaless stream. As long as each message is qualified with the right content type, you could drill into a structured representation of it (depending on that content type) to extract the data used for routing.

If I'm not mistaken, struct access generally happens using the arrow operator.

In this case we'd need an indexer of sorts accepting a JSONPATH expression (or equivalent - it needs to work across different structured content types).
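
For JSON specifically, ksqlDB already has something close: EXTRACTJSONFIELD evaluates a JSONPath-like expression against a VARCHAR column. A sketch (stream and column names are illustrative):

-- Declared structs are dereferenced with the arrow operator
SELECT event->category FROM allEvents EMIT CHANGES;

-- An undeclared JSON payload kept as a VARCHAR can be probed with a
-- JSONPath-like expression to compute a routing key
SELECT EXTRACTJSONFIELD(payload, '$.reply_to') AS route_key
FROM jsonEvents EMIT CHANGES;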

I hope it makes sense, thank you.

PeterLindner commented 3 years ago

@rcollina another way to achieve this from within ksqlDB, but without schemas, might be to create a Kafka-to-Kafka connector with a CREATE CONNECTOR statement and use the ExtractTopic SMT. I'll have to admit that I haven't tried it myself, but if I understand the docs correctly it should work.
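
An untested sketch of what that might look like, assuming MirrorMaker 2's MirrorSourceConnector as the Kafka-to-Kafka piece and a reply_to field as the routing key (all names are illustrative):

CREATE SOURCE CONNECTOR eventRouter WITH (
  'connector.class' = 'org.apache.kafka.connect.mirror.MirrorSourceConnector',
  'source.cluster.alias' = 'src',
  'source.cluster.bootstrap.servers' = 'broker:9092',
  'target.cluster.bootstrap.servers' = 'broker:9092',
  'topics' = 'allEventsTopic',
  -- ExtractTopic$Value rewrites each record's topic from a value field
  'transforms' = 'route',
  'transforms.route.type' = 'io.confluent.connect.transforms.ExtractTopic$Value',
  'transforms.route.field' = 'reply_to'
);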

rcollina commented 3 years ago

@PeterLindner that's neat - so simple yet spot on.

I need to investigate the kafka-to-kafka part.

Your feedback has been invaluable - thank you so much.

rcollina commented 3 years ago

So I did try and gather some info.

The only mention of a Kafka Connector that has Kafka itself as a Source is related to MirrorMaker v2.

I tried to ask whether there's anything comparable to the MMv2 Source Connector over at Stack Overflow but, as I expected, I got the usual uncreative blurb.

I am kind of surprised there's no Kafka Source Connector for Kafka itself.