nats-io / nats-kafka

NATS to Kafka Bridging

Implements templated 'destination' (topic, subject, channel) #11

Closed · jbguerraz closed this 4 years ago

jbguerraz commented 4 years ago

Close #10

Brings Go's text/template and two template functions (replace, substring) to help with the "destination" (topic, subject, channel) definition.
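For illustration, here is a minimal sketch of how such a templated destination can be resolved with Go's text/template. The function names (replace, substring) come from this description, but their exact signatures and the surrounding plumbing are assumptions, not the actual PR code:

    package main

    import (
        "bytes"
        "fmt"
        "strings"
        "text/template"
    )

    // Template helpers named after the ones this PR describes; the
    // real signatures in the PR may differ.
    var funcs = template.FuncMap{
        "replace": func(s, from, to string) string {
            return strings.ReplaceAll(s, from, to)
        },
        "substring": func(s string, start, end int) string {
            return s[start:end]
        },
    }

    // resolveDestination renders a destination template (topic,
    // subject, or channel) against the incoming message's subject.
    func resolveDestination(dest, subject string) (string, error) {
        tmpl, err := template.New("destination").Funcs(funcs).Parse(dest)
        if err != nil {
            return "", err
        }
        var buf bytes.Buffer
        if err := tmpl.Execute(&buf, struct{ Subject string }{subject}); err != nil {
            return "", err
        }
        return buf.String(), nil
    }

    func main() {
        topic, err := resolveDestination("event.{{ .Subject }}", "component.a")
        if err != nil {
            panic(err)
        }
        fmt.Println(topic) // prints: event.component.a
    }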

Some performance-related insights:

3K messages on the master branch:

    {
      "name": "NATS:> to Kafka:events",
      "id": "zJapCmG5BUw7lFQgLnKkzV",
      "connected": true,
      "connects": 1,
      "disconnects": 0,
      "bytes_in": 22885,
      "bytes_out": 22885,
      "msg_in": 2999,
      "msg_out": 2999,
      "count": 2999,
      "rma": 4142986.176725575,
      "q50": 3541906.348870057,
      "q75": 3914790.1003134795,
      "q90": 4556933.919117645,
      "q95": 4940597.268041236
    }

3K messages using a template (but with a single .Subject value, so a single topic):

    {
      "name": "NATS:> to Kafka:event.{{ .Subject }}",
      "id": "VV6N9CynhG8vNtJ8i1Q2rR",
      "connected": true,
      "connects": 1,
      "disconnects": 0,
      "bytes_in": 22885,
      "bytes_out": 22885,
      "msg_in": 2999,
      "msg_out": 2999,
      "count": 2999,
      "rma": 4184608.477492493,
      "q50": 3229506.4765625,
      "q75": 3888502.1231231233,
      "q90": 4574375.737588651,
      "q95": 5018858.678160922
    }
kozlovic commented 4 years ago

I am not sure I understand all that, so I tried master vs. this branch. What I noticed is that with master, if I configure from NATS (subject ">") -> Kafka (topic "foo"), I can publish from NATS on, say, bar, and it becomes foo on Kafka. The monitoring shows all messages transferred.

With this branch, if I have ">" -> "event.{{ .Subject }}", then when the publisher sends, I get an error in the bridge saying that Kafka has no leader at this time, and at the end the monitoring shows one message less (similar to your report, where you have 2999). Here is the output in Kafka (I publish on "bat" from NATS, which translates to event.bat on Kafka):

    [2020-05-14 21:20:31,929] INFO Created log for partition event.bat-0 in /kafka/kafka-logs-b12d0e26eb59/event.bat-0 with properties {compression.type -> producer, message.downconversion.enable -> true, min.insync.replicas -> 1, segment.jitter.ms -> 0, cleanup.policy -> [delete], flush.ms -> 9223372036854775807, segment.bytes -> 1073741824, retention.ms -> 604800000, flush.messages -> 9223372036854775807, message.format.version -> 2.5-IV0, file.delete.delay.ms -> 60000, max.compaction.lag.ms -> 9223372036854775807, max.message.bytes -> 1048588, min.compaction.lag.ms -> 0, message.timestamp.type -> CreateTime, preallocate -> false, min.cleanable.dirty.ratio -> 0.5, index.interval.bytes -> 4096, unclean.leader.election.enable -> false, retention.bytes -> -1, delete.retention.ms -> 86400000, segment.ms -> 604800000, message.timestamp.difference.max.ms -> 9223372036854775807, segment.index.bytes -> 10485760}. (kafka.log.LogManager)
    [2020-05-14 21:20:31,929] INFO [Partition event.bat-0 broker=1] No checkpointed highwatermark is found for partition event.bat-0 (kafka.cluster.Partition)
    [2020-05-14 21:20:31,930] INFO [Partition event.bat-0 broker=1] Log loaded for partition event.bat-0 with initial high watermark 0 (kafka.cluster.Partition)
    [2020-05-14 21:20:31,930] INFO [Partition event.bat-0 broker=1] event.bat-0 starts at leader epoch 0 from offset 0 with high watermark 0. Previous leader epoch was -1. (kafka.cluster.Partition)

On the bridge:

    2020/05/14 15:20:40.346304 [INF] connector publish failure, NATS:> to Kafka:event.{{ .Subject }}, [5] Leader Not Available: the cluster is in the middle of a leadership election and there is currently no leader for this partition and hence it is unavailable for writes

It also takes quite a bit of time to "see" messages flowing when checking monitoring. Note that only 9,999 of the 10,000 sent show up:

    {
      "start_time": 1589491226,
      "current_time": 1589491255,
      "uptime": "28.400874922s",
      "request_count": 9999,
      "connectors": [
        {
          "name": "NATS:\u003e to Kafka:event.{{ .Subject }}",
          "id": "test",
          "connected": true,
          "connects": 1,
          "disconnects": 0,
          "bytes_in": 1280000,
          "bytes_out": 1279872,
          "msg_in": 10000,
          "msg_out": 9999,
          "count": 9999,
          "rma": 1459984.9407940719,
          "q50": 700405.6345381525,
          "q75": 742358.0162950252,
          "q90": 782425.2260273974,
          "q95": 830910.954248366
        }
      ],
      "http_requests": { "/": 0, "/healthz": 0, "/varz": 9 }
    }
jbguerraz commented 4 years ago

Hello @kozlovic. This is because the topic didn't exist yet on your Kafka instance. Kafka creates the topic automatically (depending on config, but that's obviously your case), but it takes some time to do so (on our test instance it may take ~1 or ~2 seconds), and kafka-go won't retry (see https://github.com/segmentio/kafka-go/issues/219 / https://github.com/segmentio/kafka-go/issues/419). In our stats, it's 2999 instead of 3000 because we used i=1 && while ((i < 3000)); do ./nats-pub -s event-bus:4222 topic msg.$i; i=$(($i+1)); done, and this has < 3000 instead of <= 3000, so only 2999 messages were sent. If you pre-create the topic (or re-run your test, since the first message auto-created it), then it's going to be fine.
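To illustrate the pre-creation workaround, here is a minimal sketch using kafka-go's Conn.CreateTopics; the broker address, topic name, and partition/replica counts are placeholders:

    package main

    import (
        "log"

        "github.com/segmentio/kafka-go"
    )

    // Pre-creating the topic sidesteps the "Leader Not Available"
    // error on the first write, since nothing retries while the
    // auto-created topic is still electing a leader.
    func main() {
        conn, err := kafka.Dial("tcp", "localhost:9092")
        if err != nil {
            log.Fatal(err)
        }
        defer conn.Close()

        if err := conn.CreateTopics(kafka.TopicConfig{
            Topic:             "event.component.a",
            NumPartitions:     1,
            ReplicationFactor: 1,
        }); err != nil {
            log.Fatal(err)
        }
    }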

jbguerraz commented 4 years ago

For the sake of clarifying our use case: our cluster is made of a set of components (microservices, applications, ...). Each component has an associated channel, and each component produces (unstructured) logs, metrics, events, and traces. Logs go to Kafka through Logstash, metrics go to Kafka through Prometheus, and events go to Kafka through nats-kafka. Let's say component A has the channel component.a: its logs will be published to the Kafka topic logs.component.a, its metrics to metrics.component.a, and its events to events.component.a. Spark applications can then subscribe to the data that matters to them. For instance, one Spark application may care about the ingress controller logs; it then subscribes to logs.component.ingress.
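A hypothetical connector configuration for this layout could look like the following (the key names follow the nats-kafka README's connector config; the broker address and names are placeholders, so treat it as a sketch rather than a verified config):

    connect: [
      {
        type: "NATSToKafka",
        brokers: ["localhost:9092"],
        subject: ">",
        topic: "events.{{ .Subject }}",
      },
    ]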

kozlovic commented 4 years ago

@jbguerraz I see. When using master, the topic is created as soon as the bridge starts with the configured name, while in your case it is created on demand when a message arrives, right?

jbguerraz commented 4 years ago

@kozlovic yep, that's it. conn.connectWriter() is called from the connector's Start() on master. FYI, here is an interesting KIP about topic auto-creation: https://cwiki.apache.org/confluence/display/KAFKA/KIP-487%3A+Client-side+Automatic+Topic+Creation+on+Producer (basically, topic auto-creation isn't supported the way it once was; as far as I understood, it's not recommended in production, especially because of the defaults of 1 partition and 1 replica, though both are configurable).
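For reference, these are the standard Kafka broker properties that govern auto-creation, shown with their defaults; with these values an auto-created topic ends up with a single partition and no replication:

    # Standard Kafka broker properties governing topic auto-creation.
    # With these defaults an auto-created topic gets one partition and
    # a replication factor of 1.
    auto.create.topics.enable=true
    num.partitions=1
    default.replication.factor=1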

derekcollison commented 4 years ago

Would it be possible to hold the trigger message while the topic is created, and have that one processed as well?

kozlovic commented 4 years ago

@jbguerraz @derekcollison Coming from NATS Streaming, pre-creating a channel is not something we are used to, but I understand and agree that it may not actually be an issue.

jbguerraz commented 4 years ago

@derekcollison it certainly is possible, but it would add complexity for a use case that is actually not recommended (topic auto-creation); plus, IMHO it's more a job for kafka-go (see https://github.com/segmentio/kafka-go/issues/419 and https://github.com/segmentio/kafka-go/issues/219#issuecomment-489254763).

If we really wish to cover that case at the nats-kafka level, then I believe the simplest way would be to wrap the WriteMessages call in a retry loop (though at some point that would duplicate a feature kafka-go is supposed to provide already).
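A hedged sketch of such a wrapper, assuming a kafka-go Writer; the error check and the backoff values are illustrative assumptions, not what kafka-go or nats-kafka actually do:

    package main

    import (
        "context"
        "errors"
        "log"
        "time"

        "github.com/segmentio/kafka-go"
    )

    // writeWithRetry wraps Writer.WriteMessages in a bounded retry
    // loop so that a transient "leader not available" error (e.g.
    // while an auto-created topic elects a leader) doesn't drop the
    // message.
    func writeWithRetry(ctx context.Context, w *kafka.Writer, msgs ...kafka.Message) error {
        var err error
        for attempt := 1; attempt <= 5; attempt++ {
            err = w.WriteMessages(ctx, msgs...)
            if err == nil {
                return nil
            }
            if !errors.Is(err, kafka.LeaderNotAvailable) {
                return err // not the transient election error; give up
            }
            time.Sleep(time.Duration(attempt) * 250 * time.Millisecond)
        }
        return err
    }

    func main() {
        w := &kafka.Writer{
            Addr:  kafka.TCP("localhost:9092"),
            Topic: "event.component.a",
        }
        defer w.Close()

        if err := writeWithRetry(context.Background(), w,
            kafka.Message{Value: []byte("hello")}); err != nil {
            log.Fatal(err)
        }
    }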

derekcollison commented 4 years ago

I think you are better positioned to answer that. As a naive user, I would want no messages to be dropped.

derekcollison commented 4 years ago

After a Slack-based conversation, this appears to be the correct behavior in the Kafka ecosystem.

jbguerraz commented 4 years ago

thank you!