confluentinc / ksql

The database purpose-built for stream processing applications.
https://ksqldb.io

Expose Kafka message headers as new implicit column #1940

Closed blueedgenick closed 2 years ago

blueedgenick commented 6 years ago

Similar to how we expose the key and timestamp of the Kafka message via the built-in pseudo-columns rowkey and rowtime, we can expose the message headers via a new pseudo-column of type map, named rowheaders.

See also #658 which is looking to add "message routing based on headers".

big-andy-coates commented 4 years ago

With ROWTIME now a pseudo column, the door is now open for this change. However, it will require a KLIP.

jeqo commented 4 years ago

I took a quick look into how this may be implemented, and it seems that we first need complete support for headers in the Kafka Streams DSL and Stores: https://issues.apache.org/jira/browse/KAFKA-7718.

Currently headers are only exposed in the Processor API and basically are not available when building rows in ksqlDB.

Will need to get started there first.

jeqo commented 4 years ago

I have to correct my previous comment.

I was assuming headers would have to be stored, as in a i.c.k.execution.streams.materialization.ks.KsMaterializedTable, which would require StateStores to support headers as well. But if headers are mapped as a column then there is no need to change StateStores.

I took a deeper look into the current implementation and I'd like to propose the following features:

Here is an example of how these features would play out:

-- Given
CREATE STREAM orders (
    ORDERID INT KEY,
    ORDERUNITS DOUBLE
)
WITH (
    kafka_topic='test_topic',
    partitions=12,
    replicas=1,
    value_format='JSON'
);
-- Set header from column
CREATE STREAM order_with_headers AS
SELECT
    *
FROM orders
WITH HEADERS orderid
EMIT CHANGES;
-- Read headers
CREATE STREAM order_and_headers AS
SELECT
    ROWHEADERS AS row_headers,
    *
FROM order_with_headers
EMIT CHANGES;

If this makes sense, I'd be happy to propose a KLIP.

big-andy-coates commented 4 years ago

Hi @jeqo,

Thanks for getting involved. Some thoughts:

  1. Headers in Kafka can have multiple values for the same key, so in ksqlDB this would probably be best represented as MAP<STRING, ARRAY<something>>, i.e. a map of string header key to an array of values.
  2. The value of the header is a problem: it's just a byte array. ksqlDB would need a BYTES type before we can expose headers.
  3. ksqlDB normally uses the WITH() clause to control serialization options, so it might be more standard to set headers via this clause, e.g.:
CREATE STREAM order_with_headers
  WITH(headers='someMapOfArrayColumn') AS
  SELECT
    *
  FROM orders
  EMIT CHANGES;

Though TBH, I'm not 100% sure what the UX would be. A language change such as this will definitely need a KLIP, though you may want to hold off writing one until we have a BYTES SQL type (which will also need a KLIP!).

So the first step will be a BYTES SQL type.
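
To make point 1 concrete, here is a minimal Python sketch (illustrative only, not ksqlDB or Kafka code) of how repeated header keys could collapse into a MAP<STRING, ARRAY<BYTES>>-like shape. The function name `group_headers` and the sample header values are hypothetical. Values stay as raw bytes, which is why a BYTES type would be needed first.

```python
from collections import defaultdict


def group_headers(headers):
    """Group (key, value) header pairs into {key: [values...]}.

    Values for the same key keep their original relative order.
    """
    grouped = defaultdict(list)
    for key, value in headers:
        grouped[key].append(value)
    return dict(grouped)


# Hypothetical record headers: the same key may appear more than once.
record_headers = [
    ("trace-id", b"abc"),
    ("retry", b"1"),
    ("retry", b"2"),
]

grouped = group_headers(record_headers)
```

Note that this grouping keeps the order of values within each key but drops how different keys were interleaved in the original header list.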

jeqo commented 4 years ago

@big-andy-coates great feedback, thanks!

  1. Headers in Kafka can have multiple values for the same key, so in KsqlDB this would probably be best represented as MAP<STRING, ARRAY<something>>, i.e. a map of string header to an array of elements.

This is a good point, and a decision of how much to expose in ksqldb. I'd assume most cases are string->string, and only last value matters, but understand that supporting broader cases is important as well. Would something like

  2. The value of the header is a problem - it's just a byte array. KsqlDB would need a BYTES type before we can expose headers.

Agree. Will take a look at this if there is no ongoing effort already.

  3. KsqlDB normally uses the WITH() clause to control serialization options so it might be more standard to set headers via this clause, e.g.

In this case, will it be possible to pass column values (i.e. value expressions) from the query to the WITH() clause, as in the following example?

CREATE STREAM order_with_headers
WITH (HEADERS = MAP('orderid' := orderid,...)) AS
SELECT *
FROM orders
EMIT CHANGES;

If it is possible, I'd be positive about adopting this way of passing headers. I wouldn't pass an array as the value though, as this clause should add the latest value, not erase older ones.

mjsax commented 4 years ago

Headers in Kafka can have multiple values for the same key, so in KsqlDB this would probably be best represented as MAP<STRING, ARRAY<something>>, i.e. a map of string header to an array of elements.

Not sure if I agree. As you correctly state, headers don't have a unique key. However, using a map and "collapsing" headers with the same key into one entry with a list of values does not seem to align well with the Kafka model. From my understanding, headers are actually a list of key/value pairs?

jeqo commented 4 years ago

@mjsax in the context of ksqlDB, would "a list of key/value pairs" be better represented as a MAP<STRING, BYTES> in your opinion? Or do you have another structure in mind?

mjsax commented 4 years ago

I guess, technically it would be an array of structs? With each struct having a string key and bytes value?

jeqo commented 4 years ago

I'd agree, but afaik the only way to access array elements is by index, which makes it hard to use compared to the Headers API, which imho emulates a map API backed by an array.

mjsax commented 4 years ago

That's a fair point...
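
The trade-off discussed above can be sketched in Python (illustrative only, not ksqlDB or Kafka code): headers are kept as an ordered list of key/value structs, with map-like lookups layered on top, similar in spirit to `lastHeader(key)` and `headers(key)` on Kafka's Java `Headers` interface. The helper names `last_header` and `all_headers` and the sample data are hypothetical.

```python
from typing import List, NamedTuple, Optional


class Header(NamedTuple):
    """One header entry: a string key and a raw bytes value."""
    key: str
    value: bytes


def last_header(headers: List[Header], key: str) -> Optional[bytes]:
    """Return the value of the most recently added header with this key, or None."""
    for header in reversed(headers):
        if header.key == key:
            return header.value
    return None


def all_headers(headers: List[Header], key: str) -> List[bytes]:
    """Return every value stored under this key, in insertion order."""
    return [header.value for header in headers if header.key == key]


# Hypothetical headers: "source" appears twice, interleaved with "retry".
headers = [
    Header("source", b"a"),
    Header("retry", b"1"),
    Header("source", b"b"),
]

latest_source = last_header(headers, "source")
every_source = all_headers(headers, "source")
```

Keeping the list as the underlying structure preserves duplicates and ordering, while the lookup helpers give the by-key access that plain index-based array access lacks.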

mastoj commented 3 years ago

Has there been any progress on this since last summer? I was looking into creating some streams/tables that I would like to filter based on header values.

ivensmullerphilips commented 3 years ago

I need the same feature here.

kidpollo commented 3 years ago

+1

jzaralim commented 2 years ago

Done in 0.24