nodeshift-starters / reactive-example

Apache License 2.0

Comments / Suggestions #8

Closed helio-frota closed 3 years ago

helio-frota commented 3 years ago

From @cescoffier

It's a good first step.
I have a few comments:

- on the consumer side:
    * records are never committed - you need to commit once they are processed. However, commits are expensive, so in general we batch a few commits (be aware that it's a log, so if you commit offset 5, be sure you have already processed the records at offsets 1, 2, 3, and 4).
    * you set a consumer group, but I'm not sure it's used anywhere. As records are not committed, it's kind of useless. I believe you need to fix the missing commits. Then, you would be able to scale up the consumer and see the load balancing (if partitioning is done right, more on this on the publisher side)

- on the producer side:
    * keys are null. Keys are the base of partitions. Records with the same key will go in the same partition. So, it's important to have a key that makes sense.  

- both sides:
    * I'm wondering about fault tolerance. What happens if the connection to the broker is lost? If a consumer crashes, normally the commits provide the base for recovery. 
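
The commit rule from the consumer bullets (only commit an offset once every earlier record has been processed) could be sketched roughly like this. It is a library-independent sketch in plain JavaScript, not the node-rdkafka or KafkaJS API: `OffsetTracker`, `markProcessed` and `highestContiguous` are hypothetical names, and it assumes a single partition. Note that Kafka clients conventionally commit the offset of the next record to read, so the value to commit would usually be the result plus one.

```javascript
// Hypothetical helper: tracks processed offsets for ONE partition.
class OffsetTracker {
  constructor(firstOffset = 0) {
    this.firstOffset = firstOffset;
    this.next = firstOffset; // lowest offset that is not yet processed
    this.done = new Set();   // offsets that finished out of order
  }

  // Record that the record at `offset` has been fully processed.
  markProcessed(offset) {
    if (offset < this.next) return; // already covered
    this.done.add(offset);
    // Advance over any contiguous run of processed offsets.
    while (this.done.has(this.next)) {
      this.done.delete(this.next);
      this.next += 1;
    }
  }

  // Highest offset whose predecessors are all processed, or null if none yet.
  highestContiguous() {
    return this.next === this.firstOffset ? null : this.next - 1;
  }
}

const tracker = new OffsetTracker(1);
[1, 2, 4, 5].forEach((offset) => tracker.markProcessed(offset));
console.log(tracker.highestContiguous()); // 2 (offset 3 is still pending)
tracker.markProcessed(3);
console.log(tracker.highestContiguous()); // 5
```

Batching then amounts to asking the tracker for its value every N records or every few seconds instead of after each record.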
helio-frota commented 3 years ago
- both sides:
    * I'm wondering about fault tolerance. What happens if the connection to the broker is lost?
      If a consumer crashes, normally the commits provide the base for recovery. 

node-rdkafka -- producer:

I stopped the Kafka broker after 15 seconds of producing, then waited until the 1-minute mark to see what happened:

$ time node producer-backend/producer.js
^C

real    1m0.776s
user    0m0.158s
sys     0m0.000s

Probably we need some try/catch or logging or something else to see what is happening.


node-rdkafka -- consumer:

I stopped the Kafka broker after 15 seconds of producing and consuming, then waited until the 1-minute mark to see what happened:

$ time node consumer-backend/consumer.js
Croatia
Mauritania
Anguilla
St. Vincent & Grenadines
Bahamas
Western Sahara
Kazakhstan
Rwanda
Syria
Niue
Niger
French Guiana
Tajikistan
Hong Kong SAR China
^C
real    0m58.528s
user    0m1.382s
sys     0m0.243s

Probably we need some try/catch or logging or something else to see what is happening.


KafkaJS -- producer:

I stopped the Kafka broker after 15 seconds of producing, and error logs appeared right away:

$ time node producer-backend/producer.js
{"level":"ERROR","timestamp":"2021-04-29T14:08:48.817Z","logger":"kafkajs","message":"[Connection] Connection error: connect ECONNREFUSED 127.0.0.1:9092","broker":"localhost:9092","clientId":"kafkajs-producer","stack":"Error: connect ECONNREFUSED 127.0.0.1:9092\n    at TCPConnectWrap.afterConnect [as oncomplete] (net.js:1146:16)"}
{"level":"ERROR","timestamp":"2021-04-29T14:08:48.824Z","logger":"kafkajs","message":"[Connection] Connection error: connect ECONNREFUSED 127.0.0.1:9092","broker":"localhost:9092","clientId":"kafkajs-producer","stack":"Error: connect ECONNREFUSED 127.0.0.1:9092\n    at TCPConnectWrap.afterConnect [as oncomplete] (net.js:1146:16)"}
{"level":"ERROR","timestamp":"2021-04-29T14:08:48.827Z","logger":"kafkajs","message":"[Connection] Connection error: connect ECONNREFUSED 127.0.0.1:9092","broker":"localhost:9092","clientId":"kafkajs-producer","stack":"Error: connect ECONNREFUSED 127.0.0.1:9092\n    at TCPConnectWrap.afterConnect [as oncomplete] (net.js:1146:16)"}
{"level":"ERROR","timestamp":"2021-04-29T14:08:48.829Z","logger":"kafkajs","message":"[BrokerPool] Failed to connect to seed broker, trying another broker from the list: Connection error: connect ECONNREFUSED 127.0.0.1:9092","retryCount":0,"retryTime":319}
{"level":"ERROR","timestamp":"2021-04-29T14:08:49.152Z","logger":"kafkajs","message":"[Connection] Connection error: connect ECONNREFUSED 127.0.0.1:9092","broker":"localhost:9092","clientId":"kafkajs-producer","stack":"Error: connect ECONNREFUSED 127.0.0.1:9092\n    at TCPConnectWrap.afterConnect [as oncomplete] (net.js:1146:16)"}
{"level":"ERROR","timestamp":"2021-04-29T14:08:49.153Z","logger":"kafkajs","message":"[BrokerPool] Failed to connect to seed broker, trying another broker from the list: Connection error: connect ECONNREFUSED 127.0.0.1:9092","retryCount":1,"retryTime":622}
{"level":"ERROR","timestamp":"2021-04-29T14:08:49.777Z","logger":"kafkajs","message":"[Connection] Connection error: connect ECONNREFUSED 127.0.0.1:9092","broker":"localhost:9092","clientId":"kafkajs-producer","stack":"Error: connect ECONNREFUSED 127.0.0.1:9092\n    at TCPConnectWrap.afterConnect [as oncomplete] (net.js:1146:16)"}
{"level":"ERROR","timestamp":"2021-04-29T14:08:49.778Z","logger":"kafkajs","message":"[BrokerPool] Failed to connect to seed broker, trying another broker from the list: Connection error: connect ECONNREFUSED 127.0.0.1:9092","retryCount":2,"retryTime":1422}

...
omitted long output
...

^C
real    0m22.380s
user    0m0.298s
sys     0m0.048s

KafkaJS -- consumer:

I stopped the Kafka broker after 15 seconds of producing and consuming, and error logs appeared right away:

$ time node consumer-backend/consumer.js 
{"level":"INFO","timestamp":"2021-04-29T14:12:21.581Z","logger":"kafkajs","message":"[Consumer] Starting","groupId":"consumer-test"}
{"level":"INFO","timestamp":"2021-04-29T14:12:21.613Z","logger":"kafkajs","message":"[ConsumerGroup] Consumer has joined the group","groupId":"consumer-test","memberId":"kafkajs-consumer-e43350bf-585f-4fcf-8c12-112156312d43","leaderId":"kafkajs-consumer-e43350bf-585f-4fcf-8c12-112156312d43","isLeader":true,"memberAssignment":{"countries":[0]},"groupProtocol":"RoundRobinAssigner","duration":32}
{"level":"ERROR","timestamp":"2021-04-29T14:12:35.571Z","logger":"kafkajs","message":"[Connection] Connection error: connect ECONNREFUSED 127.0.0.1:9092","broker":"localhost:9092","clientId":"kafkajs-consumer","stack":"Error: connect ECONNREFUSED 127.0.0.1:9092\n    at TCPConnectWrap.afterConnect [as oncomplete] (net.js:1146:16)"}
{"level":"ERROR","timestamp":"2021-04-29T14:12:35.572Z","logger":"kafkajs","message":"[BrokerPool] Failed to connect to seed broker, trying another broker from the list: Connection error: connect ECONNREFUSED 127.0.0.1:9092","retryCount":0,"retryTime":330}
{"level":"ERROR","timestamp":"2021-04-29T14:12:35.905Z","logger":"kafkajs","message":"[Connection] Connection error: connect ECONNREFUSED 127.0.0.1:9092","broker":"localhost:9092","clientId":"kafkajs-consumer","stack":"Error: connect ECONNREFUSED 127.0.0.1:9092\n    at TCPConnectWrap.afterConnect [as oncomplete] (net.js:1146:16)"}
{"level":"ERROR","timestamp":"2021-04-29T14:12:35.908Z","logger":"kafkajs","message":"[BrokerPool] Failed to connect to seed broker, trying another broker from the list: Connection error: connect ECONNREFUSED 127.0.0.1:9092","retryCount":1,"retryTime":758}
{"level":"ERROR","timestamp":"2021-04-29T14:12:36.668Z","logger":"kafkajs","message":"[Connection] Connection error: connect ECONNREFUSED 127.0.0.1:9092","broker":"localhost:9092","clientId":"kafkajs-consumer","stack":"Error: connect ECONNREFUSED 127.0.0.1:9092\n    at TCPConnectWrap.afterConnect [as oncomplete] (net.js:1146:16)"}
{"level":"ERROR","timestamp":"2021-04-29T14:12:36.670Z","logger":"kafkajs","message":"[BrokerPool] Failed to connect to seed broker, trying another broker from the list: Connection error: connect ECONNREFUSED 127.0.0.1:9092","retryCount":2,"retryTime":1242}
{"level":"ERROR","timestamp":"2021-04-29T14:12:37.913Z","logger":"kafkajs","message":"[Connection] Connection error: connect ECONNREFUSED 127.0.0.1:9092","broker":"localhost:9092","clientId":"kafkajs-consumer","stack":"Error: connect ECONNREFUSED 127.0.0.1:9092\n    at TCPConnectWrap.afterConnect [as oncomplete] (net.js:1146:16)"}
{"level":"ERROR","timestamp":"2021-04-29T14:12:37.913Z","logger":"kafkajs","message":"[BrokerPool] Failed to connect to seed broker, trying another broker from the list: Connection error: connect ECONNREFUSED 127.0.0.1:9092","retryCount":3,"retryTime":2496}
^C
real    0m17.820s
user    0m0.653s
sys     0m0.089s
helio-frota commented 3 years ago

Interesting to add:

In the node-rdkafka consumer, this 'error' event handler seems to do nothing (?):

stream.on('error', (err) => {
  if (err) console.log(err);
});

https://github.com/nodeshift-starters/reactive-example/blob/main/consumer-backend/consumer.js#L28

Same thing for the node-rdkafka producer:

.on('event.error', (err) => {
  console.error('event.error', err);
  reject(err);
});

https://github.com/nodeshift-starters/reactive-example/blob/main/producer-backend/producer.js#L15

So a Kafka broker being down is not an error?

helio-frota commented 3 years ago

I'm wondering about fault tolerance. What happens if the connection to the broker is lost?

/cc @cescoffier @mhdawson ^^

lance commented 3 years ago
* keys are null. Keys are the base of partitions. Records with the same key will go in the same partition. So, it's important to have a key that makes sense.  

I think this can be addressed by adding the node-rdkafka example comments

 // optionally we can manually specify a partition for the message
 // this defaults to -1 - which will use librdkafka's default partitioner (consistent random for keyed messages, random for unkeyed messages)

here

https://github.com/nodeshift-starters/reactive-example/blob/dc1a52dc6ca3eb46e31562c9e98b8e4b72b19502/producer-backend/producer.js#L25
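
To illustrate why a non-null key matters: a keyed partitioner hashes the key and reduces it modulo the partition count, so records with equal keys always land in the same partition. The sketch below uses a toy FNV-1a hash purely for illustration. It is not the hash that librdkafka or KafkaJS actually uses, and `fnv1a` and `partitionFor` are hypothetical names.

```javascript
// Toy 32-bit FNV-1a hash over the UTF-8 bytes of the key (illustrative only).
function fnv1a(key) {
  let hash = 0x811c9dc5;
  for (const byte of Buffer.from(key, 'utf8')) {
    hash ^= byte;
    hash = Math.imul(hash, 0x01000193) >>> 0; // keep it an unsigned 32-bit value
  }
  return hash;
}

// Hypothetical partitioner: same key -> same partition, no key -> random.
function partitionFor(key, numPartitions) {
  if (key === null || key === undefined) {
    // Nothing to hash, so any partition will do.
    return Math.floor(Math.random() * numPartitions);
  }
  return fnv1a(String(key)) % numPartitions;
}

// Both calls print the same partition number because the key is the same.
console.log(partitionFor('Croatia', 3));
console.log(partitionFor('Croatia', 3));
```

With `null` keys, as in the current producer, consecutive records can be spread over any partition, so per-key ordering is not preserved.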

mhdawson commented 3 years ago

@helio-frota, I think the question was not to show us what happens, but to fix the example so that it does the right thing in the case of a failure. That is, the question was "Does it do the right thing if/when there is a failure in the broker?", and if not, let's make sure it does.

In terms of on-error, you'll have to read the node-rdkafka docs to see how errors should be reported.

lance commented 3 years ago
* I'm wondering about fault tolerance. What happens if the connection to the broker is lost? If a consumer crashes, normally the commits provide the base for recovery. 

To be concrete about it... you could wrap producer.produce('countries', null, value); with a circuit breaker, having a fallback function that places the message into a queue for later retry, once the circuit closes again. (circuit.on('close', sendQueuedMessages)).
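
A minimal sketch of that idea, assuming a hand-rolled breaker rather than opossum's actual API: `SimpleBreaker`, `fire` and `close` are hypothetical names, and `send` is a stand-in for `producer.produce('countries', null, value)` that throws while the broker is unreachable. A real breaker would also probe the broker after a reset timeout instead of being closed by hand.

```javascript
// Hypothetical, simplified circuit breaker with a retry queue as fallback.
class SimpleBreaker {
  constructor(action, { failureThreshold = 3 } = {}) {
    this.action = action;
    this.failureThreshold = failureThreshold;
    this.failures = 0;
    this.open = false;
    this.queue = []; // messages waiting for the circuit to close
  }

  // Try to send; on failure (or open circuit) queue the value instead.
  fire(value) {
    if (this.open) {
      this.queue.push(value);
      return false;
    }
    try {
      this.action(value);
      this.failures = 0;
      return true;
    } catch (err) {
      this.queue.push(value);
      this.failures += 1;
      if (this.failures >= this.failureThreshold) this.open = true;
      return false;
    }
  }

  // Close the circuit and replay everything that was queued, in order.
  close() {
    this.open = false;
    this.failures = 0;
    const pending = this.queue;
    this.queue = [];
    pending.forEach((value) => this.fire(value));
  }
}

let brokerUp = true;
const sent = [];
const send = (value) => {
  if (!brokerUp) throw new Error('Producer not connected');
  sent.push(value);
};

const breaker = new SimpleBreaker(send, { failureThreshold: 2 });
breaker.fire('Croatia'); // sent
brokerUp = false;
breaker.fire('Rwanda');  // fails, queued
breaker.fire('Niger');   // fails, queued, circuit opens
breaker.fire('Syria');   // short-circuited, queued
brokerUp = true;
breaker.close();         // replays Rwanda, Niger, Syria
console.log(sent);       // [ 'Croatia', 'Rwanda', 'Niger', 'Syria' ]
```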

As for the connection to the broker, with node-rdkafka the underlying C binaries handle connection retries, etc. It's not clear how long it will wait/retry before reporting an error. That might explain why you aren't seeing error logs. :shrug: - obviously KafkaJS reports this immediately, and includes retry information.

"message":"[BrokerPool] Failed to connect to seed broker, trying another broker from the list: Connection error: connect ECONNREFUSED 127.0.0.1:9092","retryCount":1,"retryTime":622
mhdawson commented 3 years ago

obviously KafkaJS reports this immediately, and includes retry information.

This is because logging is enabled. From the client API perspective, it is probably waiting for the broker to come back, just like node-rdkafka.

helio-frota commented 3 years ago

I'll open a question in the opossum repo. @lance, please don't feel pressured to answer what the natural way to do this is... I'm personally OK with not getting an answer, since I have never found a similar one before.

(maybe 2 questions... I'll need to test that because I suspect opossum can handle a failure based on a specific timeout, so it guards against cases where we get no response from a specific API -- that is an underrated/hidden feature btw)

... but to fix the example so that it does the right thing in the case of a failure.

In terms of on-error, you'll have to read the node-rdkafka docs to see how errors should be reported.

@mhdawson thanks, I'll take a look at that. But in terms of comparing the two Kafka libraries, it is +1 for KafkaJS /cc @wtrocki

helio-frota commented 3 years ago

The documentation is not helping, or I'm probably missing something.

But I was able to change the code to get better behavior with node-rdkafka:

Trying to connect with the server down:

node producer-backend/producer.js 
[thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Connect to ipv4#127.0.0.1:9092 failed: Connection refused (after 0ms in state CONNECT)

Producing and then stopping the server:

node producer-backend/producer.js 
producing...
done.
producing...
done.
producing...
done.
producing...
done.
producing...
done.
producing...
done.
producing...
Error: Producer not connected

https://github.com/nodeshift-starters/reactive-example/commit/5cc03e9cd290ef267b62317f4244e72f3893b125

mhdawson commented 3 years ago

Does it then reconnect once the server comes back?

helio-frota commented 3 years ago

No, it doesn't reconnect, but maybe I'm missing something.
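
If the example were to show recovery, one option would be to retry the connect step with exponential backoff. The following is only a sketch under that assumption: `connectWithRetry` is a hypothetical helper, `connect` is a stand-in for whatever function connects the client, no node-rdkafka or KafkaJS API is used, and the delay values are illustrative.

```javascript
// Hypothetical helper: call `connect` until it succeeds or retries run out,
// doubling the wait between attempts.
async function connectWithRetry(connect, {
  retries = 5,
  initialDelayMs = 300,
  multiplier = 2,
  sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms)),
} = {}) {
  let delay = initialDelayMs;
  for (let attempt = 0; ; attempt += 1) {
    try {
      return await connect();
    } catch (err) {
      if (attempt >= retries) throw err; // give up and surface the last error
      await sleep(delay);
      delay *= multiplier;
    }
  }
}

// Stub that fails twice (broker down) and then succeeds (broker back).
let calls = 0;
const fakeConnect = async () => {
  calls += 1;
  if (calls < 3) throw new Error('connect ECONNREFUSED 127.0.0.1:9092');
  return 'connected';
};

connectWithRetry(fakeConnect, { initialDelayMs: 10 })
  .then((result) => console.log(result, 'after', calls, 'attempts'));
// prints "connected after 3 attempts"
```

Whether this belongs in the example depends on the scope question raised in this thread: it adds recovery at the cost of a less minimal example.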

mhdawson commented 3 years ago

I guess the question is whether the example should show recovering from the server being temporarily offline. It comes down to whether this should be a full example that customers would copy, versus a very simple example.

@helio-frota did you get a chance to catch up with Luke as to where we think we should take the example? If not, maybe a short meeting with you, me and Luke would make sense to agree on scope.

helio-frota commented 3 years ago

It comes down to whether this should be a full example that customers would copy, versus a very simple example.

Yeah, I agree. That is something we should also decide. Because we can always improve examples if necessary.

@helio-frota did you get a chance to catch up with Luke as to where we think we should take the example? If not, maybe a short meeting with you, me and Luke would make sense to agree on scope.

I had a short chat with Luke about it, but we didn't end up with concrete steps on whether we are going to continue with this or create one based on another consolidated example. I think it would be good to have a short meeting about it :+1:

helio-frota commented 3 years ago

I think now this can be closed