KxSystems / kafka

kdb+ to Apache Kafka adapter, for pub/sub
https://code.kx.com/q/interfaces
Apache License 2.0

Allow to set timestamp for message at publish time #60

Open SrikarSaggurthi opened 4 years ago

SrikarSaggurthi commented 4 years ago

Is your feature request related to a problem? Please describe. It would be helpful to be able to set the timestamp of a message. This would make it possible to query offsets by time, which could simplify recovery logic.

Describe the solution you'd like. Allow .kfk.pub to take a timestamp parameter, which it could pass through to librdkafka's rd_kafka_producev() API underneath.

Additional resource https://github.com/edenhill/librdkafka/issues/1016
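To illustrate the "query offsets by time" idea: a restarting consumer asks for the earliest offset whose message timestamp is at or after a recovery point, then seeks there. librdkafka exposes this lookup broker-side as rd_kafka_offsets_for_times(). The sketch below only mimics that semantics locally on made-up (offset, timestamp) pairs. The function name `offset_for_time` is hypothetical, and the sketch assumes timestamps are non-decreasing within the partition, which is a simplification.

```python
from bisect import bisect_left
from typing import Optional, Sequence


def offset_for_time(offsets: Sequence[int],
                    timestamps_ms: Sequence[int],
                    target_ms: int) -> Optional[int]:
    """Return the earliest offset whose timestamp is >= target_ms, or None.

    Hypothetical helper. Assumes timestamps_ms runs parallel to offsets and is
    non-decreasing (a simplification; Kafka's own lookup does not need this).
    """
    i = bisect_left(timestamps_ms, target_ms)
    return offsets[i] if i < len(offsets) else None


# Made-up partition contents: offsets with their message timestamps in ms.
offsets = [108, 109, 110, 111]
stamps = [1000, 1000, 5000, 5000]

print(offset_for_time(offsets, stamps, 2000))  # 110: first message at/after 2000 ms
print(offset_for_time(offsets, stamps, 6000))  # None: nothing that late yet
```

In recovery, the returned offset would be where the consumer resumes, instead of tracking committed offsets separately.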

mshimizu-kx commented 3 years ago

Hi Srikar,

I added this functionality to .kafka.publishWithHeaders. The example code below shows how to specify the timestamp:

  .kafka.publish[topic;.kafka.PARTITION_UA; "Hello from producer";""];
  .kafka.publishWithHeaders[producer; .z.p; topic; .kafka.PARTITION_UA; "locusts"; ""; `header1`header2!("firmament"; "divided")];

You can pass .z.p, but Kafka only accepts millisecond precision, so in practice this is no different from the "wallclock time (UTC)" mentioned in the reference, which is enabled by setting (`api.version.request; `true):

kfk_cfg:(!) . flip(
  (`metadata.broker.list;`localhost:9092);
  (`statistics.interval.ms;`10000);
  (`queue.buffering.max.ms;`1);
  (`fetch.wait.max.ms;`10);
  (`api.version.request; `true)
  );
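For the millisecond point above, here is a rough sketch of the conversion involved. A kdb+ timestamp such as .z.p is stored as nanoseconds since 2000.01.01, whereas a Kafka message timestamp is an int64 count of milliseconds since the Unix epoch. Anything finer than a millisecond is therefore dropped, which is why the msgtime values in the output end in 000000. The helper names are hypothetical and only reproduce the arithmetic outside kdb+. They are not part of the adapter's API.

```python
# Offset between the kdb+ epoch (2000.01.01) and the Unix epoch (1970.01.01):
# 10957 days * 86400 s = 946684800 s, expressed here in nanoseconds.
KDB_EPOCH_OFFSET_NS = 946_684_800 * 1_000_000_000


def kdb_ns_to_kafka_ms(kdb_ns: int) -> int:
    """Hypothetical helper: kdb+ timestamp (ns since 2000.01.01) -> Kafka ms."""
    unix_ns = kdb_ns + KDB_EPOCH_OFFSET_NS
    return unix_ns // 1_000_000  # floor: sub-millisecond digits are lost


def kafka_ms_to_kdb_ns(kafka_ms: int) -> int:
    """Hypothetical helper: Kafka ms timestamp -> kdb+ timestamp in ns."""
    return kafka_ms * 1_000_000 - KDB_EPOCH_OFFSET_NS


# `long$2021.03.18D17:20:10.391234567 in q
ns = 669_403_210_391_234_567
ms = kdb_ns_to_kafka_ms(ns)
print(ms)                      # 1616088010391
print(kafka_ms_to_kdb_ns(ms))  # 669403210391000000, i.e. ...10.391000000
```

The round trip keeps the value only down to the millisecond, matching the msgtime column.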
... (omitted) ...
q)-10#data2
mtype          topic client partition offset msgtime                       data                       key      headers                                                  rcvtime     
------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
_PARTITION_EOF test2 0      0         108                                  "Broker: No more messages" `byte$() (`symbol$())!""                                          17:20:06.417
               test2 0      0         108    2021.03.18D17:20:10.391000000 "Hello from producer"      `byte$() (`symbol$())!""                                          17:20:10.394
               test2 0      0         109    2021.03.18D17:20:10.391000000 "locusts"                  `byte$() `header1`header2!(0x6669726d616d656e74;0x64697669646564) 17:20:10.394
_PARTITION_EOF test2 0      0         110                                  "Broker: No more messages" `byte$() (`symbol$())!""                                          17:20:10.409
               test2 0      0         110    2021.03.18D17:20:14.390000000 "Hello from producer"      `byte$() (`symbol$())!""                                          17:20:14.394
               test2 0      0         111    2021.03.18D17:20:14.390000000 "locusts"                  `byte$() `header1`header2!(0x6669726d616d656e74;0x64697669646564) 17:20:14.394
_PARTITION_EOF test2 0      0         112                                  "Broker: No more messages" `byte$() (`symbol$())!""                                          17:20:14.406
               test2 0      0         112    2021.03.18D17:20:18.389000000 "Hello from producer"      `byte$() (`symbol$())!""                                          17:20:18.395
               test2 0      0         113    2021.03.18D17:20:18.390000000 "locusts"                  `byte$() `header1`header2!(0x6669726d616d656e74;0x64697669646564) 17:20:18.395
_PARTITION_EOF test2 0      0         114                                  "Broker: No more messages" `byte$() (`symbol$())!""                                          17:20:18.408

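As this output shows, the message published with an explicit .z.p timestamp ("locusts") gets effectively the same msgtime, within a millisecond, as the one published without it ("Hello from producer"). So we don't think it is worth introducing this additional parameter.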

Thanks, Masato