fluent / fluent-bit

Fast and Lightweight Logs and Metrics processor for Linux, BSD, OSX and Windows
https://fluentbit.io
Apache License 2.0

Kafka output partition key #915

Closed: opened by dgalichet, closed 2 years ago.

dgalichet commented 5 years ago

Hi,

AFAIK, there is no way to specify a partition key that would be used to select the target partition (in fluentd, we have default_partition_key).

For me, this is an important feature because it is needed to preserve message order.

For example, if you tail multiple log files, the key could be the path of the file, which would preserve the line order within each file. Without this ability, there is no guarantee that messages will be processed in order: they are dispatched randomly across multiple partitions, and Kafka only guarantees ordering within a single partition, not across partitions.

Regards,

David

gg7 commented 5 years ago

Here's a crude patch:

diff --git a/plugins/out_kafka/kafka_topic.c b/plugins/out_kafka/kafka_topic.c
index 373a2197..3e073a22 100644
--- a/plugins/out_kafka/kafka_topic.c
+++ b/plugins/out_kafka/kafka_topic.c
@@ -18,6 +18,8 @@
  *  limitations under the License.
  */

+#include <unistd.h>
+
 #include <fluent-bit/flb_info.h>
 #include <fluent-bit/flb_log.h>
 #include <fluent-bit/flb_mem.h>
@@ -25,13 +27,43 @@
 #include "kafka_config.h"
 #include "rdkafka.h"

+#define HOSTNAME_BUF_SIZE 256
+static char hostname[HOSTNAME_BUF_SIZE];
+
+unsigned long djb2_hash(unsigned char *str)
+{
+    unsigned long hash = 5381;
+    int c;
+
+    while ((c = *str++))
+        hash = ((hash << 5) + hash) + c; /* hash * 33 + c */
+
+    return hash;
+}
+
+int32_t kafka_hostname_partitioner(const rd_kafka_topic_t *rkt,
+                                   const void *key, size_t keylen,
+                                   int32_t partition_cnt,
+                                   void *rkt_opaque,
+                                   void *msg_opaque) {
+    if (hostname[0] == 0) {
+        gethostname(hostname, HOSTNAME_BUF_SIZE);
+        hostname[HOSTNAME_BUF_SIZE - 1] = 0;
+    }
+
+    return djb2_hash((unsigned char*) hostname) % partition_cnt;
+}
+
 struct flb_kafka_topic *flb_kafka_topic_create(char *name,
                                                struct flb_kafka *ctx)
 {
     rd_kafka_topic_t *tp;
     struct flb_kafka_topic *topic;

-    tp = rd_kafka_topic_new(ctx->producer, name, NULL);
+    rd_kafka_topic_conf_t *conf = rd_kafka_topic_conf_new();
+    rd_kafka_topic_conf_set_partitioner_cb(conf, kafka_hostname_partitioner);
+
+    tp = rd_kafka_topic_new(ctx->producer, name, conf);
     if (!tp) {
         flb_error("[out_kafka] failed to create topic: %s",
                   rd_kafka_err2str(rd_kafka_last_error()));

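Use host networking or a node-name environment variable if you want message ordering to survive pod restarts: a restarted pod gets a new hostname, so the hash, and therefore the partition, may change. Ideally, fluent-bit would select the partition from the pod name of each record, but hashing the hostname was simpler to implement.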

github-actions[bot] commented 2 years ago

This issue is stale because it has been open 90 days with no activity. Remove stale label or comment or this will be closed in 5 days. Maintainers can add the exempt-stale label.

github-actions[bot] commented 2 years ago

This issue was closed because it has been stalled for 5 days with no activity.