exasol / kafka-connector-extension

Exasol Kafka Extension for accessing Apache Kafka
MIT License

Add ability to access all fields from a kafka record #28

Closed baunz closed 3 years ago

baunz commented 3 years ago

Hi,

currently the table contains only the record value plus the offset and partition of the record. At least two core elements are missing (key and timestamp), and headers are also available.

At first I started with a boolean config setting, EMIT_RECORD_KEY, but it feels wrong to have EMIT_RECORD_KEY and EMIT_RECORD_TIMESTAMPS flags because the resulting table DDL would be hard to figure out.

The big solution would be to somehow be able to specify something with template variables:

ADDITIONAL_FIELDS=${partition}, ${offset}, ${timestamp}, ${key}, ${headers.sourceSystem}

(being able to remove the necessary partition and offset fields makes no sense, just for clarification)
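To make the template idea concrete, here is a minimal sketch of how such a setting could be split into field names. ADDITIONAL_FIELDS is only the proposal from this comment, and `parse_field_template` is a hypothetical helper, not part of the connector.

```python
import re
from typing import List

# Matches one "${name}" or "${name.sub}" placeholder.
_PLACEHOLDER = re.compile(r"\$\{([A-Za-z_][\w.]*)\}")


def parse_field_template(template: str) -> List[str]:
    """Return the placeholder names of a comma-separated template, in order.

    Hypothetical helper: the ADDITIONAL_FIELDS setting is a proposal in this
    thread, so the accepted syntax here is an assumption.
    """
    names = []
    for part in template.split(","):
        part = part.strip()
        match = _PLACEHOLDER.fullmatch(part)
        if match is None:
            raise ValueError(f"not a placeholder: {part!r}")
        names.append(match.group(1))
    return names


fields = parse_field_template(
    "${partition}, ${offset}, ${timestamp}, ${key}, ${headers.sourceSystem}"
)
```

The resulting list would also fix the column order the user has to mirror in the table DDL.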

Greetings, Johannes

morazow commented 3 years ago

Hello @baunz,

Thanks for the feedback!

Yes, this would be a nice feature. Unfortunately, we cannot easily detect the table column names within the import UDF.

Using an additional fields parameter is nicer, but it also requires users to create the table to match it. Similarly, we could use another parameter for a schema mapping file (for example, https://github.com/exasol/schemas) that indicates which fields are required.

jwarlander commented 3 years ago

Being able to extract the Kafka message key & timestamp would be very useful for us, too!

As we're expanding our use of direct Kafka imports, we're seeing more and more situations where we're having to push for changes in the messages we get sent, to make up for not being able to extract the key (= we need a separate message with some kind of 'delete' event type in one of the message fields), etc.

baunz commented 3 years ago

Hi @morazow

thanks for the hints. I scanned over the EDML repository and tried to find some examples but did not really understand what we would gain from it. As far as I can see, we would have a structured way of specifying fields via JSON (instead of the current RECORD_FIELDS comma-separated list). But is there code somewhere that provides the functionality currently present (truncating strings, converting to decimal), or does it have to be reimplemented by every virtual schema adapter?

Also, what would be the right content for the source-column? In the examples there is always a placeholder and in the concrete dynamodb case this is a table - would this then be a topic? I would be happy to further explore this option but need some guidance.

Even when using this approach there would need to be a way to reference the fields of the kafka key & value and the headers.

With the option of using either the full record or dedicated fields, this is currently hard, as one would need to reference both a single field (e.g. for the timestamp) and multiple fields (when one does not want to specify any fields):

FIELDS=${partition}, ${offset}, ${headers.timestamp}, ${key}, ${record}

which would give a row of e.g.

0;1;1621198241243;key1;recordField1;recordField2;..;recordFieldN

when AS_JSON_DOC is false - and

0;1;1621198241243;key1;recordAsJson

when AS_JSON_DOC is true.
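The two row shapes can be sketched as follows. This is an illustration under assumptions: the Kafka record is modelled as a plain dict, `build_row` is a hypothetical name, and headers are simplified to a dict even though real Kafka headers may repeat a key.

```python
import json


def build_row(record, fields, as_json_doc):
    """Flatten one record into a row, following the FIELDS order.

    Hypothetical sketch: `record` is a plain dict standing in for a Kafka
    record; headers are simplified to a dict.
    """
    row = []
    for name in fields:
        if name == "record":
            if as_json_doc:
                # One column holding the whole value as JSON.
                row.append(json.dumps(record["value"]))
            else:
                # One column per top-level field of the value.
                row.extend(record["value"].values())
        elif name.startswith("headers."):
            row.append(record["headers"].get(name.split(".", 1)[1]))
        else:
            row.append(record[name])
    return row


sample = {
    "partition": 0,
    "offset": 1,
    "key": "key1",
    "headers": {"timestamp": 1621198241243},
    "value": {"f1": "recordField1", "f2": "recordField2"},
}
fields = ["partition", "offset", "headers.timestamp", "key", "record"]

flat_row = build_row(sample, fields, as_json_doc=False)
json_row = build_row(sample, fields, as_json_doc=True)
```

Here `flat_row` has one column per record field, while `json_row` always has five columns regardless of how many fields the record carries.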

I currently can't think of a smooth way that preserves the current default behaviour which uses all fields from a record...

jwarlander commented 3 years ago

I like the FIELDS=... approach, and.. I'm not sure if one would really need to be able to specify specific headers etc in a first iteration, at least?

Basically, this.. FIELDS=${partition}, ${offset}, ${key}, ${timestamp}, ${record}, ${headers}

..would emit the columns KAFKA_PARTITION, KAFKA_OFFSET, KAFKA_KEY, KAFKA_TIMESTAMP, <all of the record fields>, <all of the header fields>.

If you want to extract only specific headers, or specific record fields, you'd emit those as JSON.. perhaps like this:

| AS_JSON_DOC | Behavior |
| --- | --- |
| true | Emit the record as a JSON object, if present in FIELDS, and emit the headers as a JSON list*, if present in FIELDS |
| headers | Emit only the headers as a JSON list*; the record, if included, will be extracted to separate fields |
| record | Emit only the record as a JSON object; the headers, if included, will be extracted to separate fields |

*List, not object, since headers with same key can appear multiple times, in the order they were added

The record would go into JSON_DOC_COL just as today to preserve compatibility, and the headers into JSON_HEADERS_COL.
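A small sketch of why a list is the safer shape for headers: converting repeated keys to a JSON object silently drops all but the last value. The `{"key": ..., "value": ...}` layout is an assumption for illustration, not a format defined by the connector.

```python
import json

# Headers as ordered (key, value) pairs; "source" appears twice.
headers = [("source", "crm"), ("trace", "a1"), ("source", "billing")]

# As a JSON object: the second "source" overwrites the first.
as_object = json.dumps(dict(headers))

# As a JSON list: every header survives, in the order it was added.
# The key/value layout is an assumed shape for this sketch.
as_list = json.dumps([{"key": k, "value": v} for k, v in headers])
```

With the list form, a downstream query can still see both `source` values and their order.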

baunz commented 3 years ago

Hi @jwarlander, I agree that adding headers might not be as important as accessing key & timestamp - we should keep this separate.

Thanks for your suggestion. It makes sense, but I miss the ability to explicitly define which fields of the record are used (introduced in #25). I think this is valuable when the schema of the topic is evolving, so that importing does not break when new fields are added (or existing fields are re-ordered). I understand

> If you want to extract only specific headers, or specific record fields, you'd emit those as JSON.. perhaps like this:

but this makes importing as JSON necessary even if it is not "the default path", in my opinion.

Another way I am thinking of is to use a syntax in the 'FIELDS' definition that makes the goal clear:

The default now is:

FIELDS=record.*,partition,offset

and when AS_JSON_DOC=true is specified this would be equivalent to

FIELDS=record,partition,offset

With this approach, one could import a set of fields from the record plus the whole record as JSON. This can make sense when the producer adds new fields that should not be lost. New fields can later be promoted to top-level fields without errors, and ETL jobs can handle the transition with logic like

CASE WHEN NEW_FIELD IS NULL THEN JSON_VALUE(JSON_DOC_COL, '$.newField') ELSE NEW_FIELD END
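The selection syntax and the fallback logic can be sketched together. Everything here is an assumption for illustration: `expand_fields` and `new_field_or_json` are hypothetical names, the record is a plain dict, and the `record.<name>` form for picking a single field is inferred from the proposal rather than stated in it.

```python
import json


def expand_fields(spec, record):
    """Build a row from a FIELDS spec such as 'record.*,partition,offset'.

    Hypothetical sketch of the proposed syntax:
      record.*      -> one column per top-level field of the value
      record        -> the whole value as one JSON column
      record.<name> -> a single top-level field (assumed form)
      anything else -> record metadata such as partition or offset
    """
    row = []
    for name in (part.strip() for part in spec.split(",")):
        if name == "record.*":
            row.extend(record["value"].values())
        elif name == "record":
            row.append(json.dumps(record["value"]))
        elif name.startswith("record."):
            row.append(record["value"].get(name[len("record."):]))
        else:
            row.append(record[name])
    return row


def new_field_or_json(new_field, json_doc):
    """Mirror the CASE expression: prefer the column, else read the JSON."""
    if new_field is None:
        return json.loads(json_doc).get("newField")
    return new_field


sample = {"partition": 0, "offset": 1, "value": {"a": 1, "b": "x"}}
default_row = expand_fields("record.*,partition,offset", sample)
json_doc_row = expand_fields("record,partition,offset", sample)
mixed_row = expand_fields("record.a,record,partition,offset", sample)
```

The `mixed_row` case is the one described above: selected fields as columns plus the full record as JSON, so newly added producer fields are not lost.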

jwarlander commented 3 years ago

That's a really nice approach!

Do you imagine this selection syntax as mostly being a simplified option for top-level fields, so that, e.g., if you need a nested field you'll still have to rely on JSON_EXTRACT later in the pipeline?

baunz commented 3 years ago

I think so. It somewhat resembles JSONPath, but supporting that for both JSON and Avro records would add a lot of complexity, tests, and so on.

jwarlander commented 3 years ago

@baunz, I had to make a local change today to enable storing the Kafka timestamps.. so I guess I was wondering if you had a draft in progress for this, that I could look towards aligning with? :sweat_smile:

Right now I just picked the simplest option, with a KAFKA_TIMESTAMP = 'true' option, and pulling out the timestamp in the original format (as number of milliseconds since epoch) as that's easy enough to work with in Exasol, and less Scala code = less risk for bugs, for me at least :joy:
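For reference, a Kafka timestamp in milliseconds since the epoch converts to a UTC datetime as sketched below. This is a standalone illustration with a hypothetical helper name, not code from the connector; the sample value is the one used earlier in this thread.

```python
from datetime import datetime, timedelta, timezone


def kafka_timestamp_to_utc(ms):
    """Convert milliseconds since the Unix epoch to an aware UTC datetime."""
    # Integer arithmetic avoids float rounding of the millisecond part.
    seconds, millis = divmod(ms, 1000)
    return datetime.fromtimestamp(seconds, tz=timezone.utc) + timedelta(
        milliseconds=millis
    )


ts = kafka_timestamp_to_utc(1621198241243)
print(ts.isoformat())  # 2021-05-16T20:50:41.243000+00:00
```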

baunz commented 3 years ago

Hi @jwarlander, funny thing: I started yesterday on the solution I proposed, and as soon as it is in a presentable state I will notify you! Have a nice evening!

morazow commented 3 years ago

Hello @baunz, @jwarlander,

Thanks for the discussions! It helps us greatly to see how the connectors are used and which features are required.

Please let me know when you have a pull request!

baunz commented 3 years ago

Hi @jwarlander ,

this (#34) is the first draft. So far I have only fixed all existing tests, and some documentation is still needed, which I will add tomorrow. Apart from that, I just noticed that you are struggling with the changes I made for the JSON import (#31), sorry for that. Let's work it out over the next few days.

Cheers, Johannes

morazow commented 3 years ago

Hey @baunz, @jwarlander

Thanks for the discussions! I also like the FIELDS approach. Let's get #34 merged soon.

morazow commented 3 years ago

Fixed in #34