openfaas / nats-queue-worker

Queue-worker for OpenFaaS with NATS Streaming
https://docs.openfaas.com/reference/async/
MIT License

Added reconnection logic when NATS is disconnected #49

Closed · bartsmykla closed this 5 years ago

bartsmykla commented 5 years ago

It should resolve https://github.com/openfaas/faas/issues/1031 and also resolves the issues from https://github.com/openfaas/nats-queue-worker/pull/33.

Signed-off-by: Bart Smykla bsmykla@vmware.com
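
For readers following along, here is a minimal sketch of how reconnect-on-disconnect logic for the STAN producer can be wired up. The `NatsQueue` shape, field names, and retry loop below are illustrative assumptions, not the exact code merged in this PR:

```go
package handler

import (
	"log"
	"sync"
	"time"

	stan "github.com/nats-io/go-nats-streaming"
)

// NatsQueue holds the current STAN connection; the mutex lets the
// connection-lost handler swap in a new connection safely.
type NatsQueue struct {
	mu                    sync.RWMutex
	conn                  stan.Conn
	clusterID             string
	clientID              string
	natsURL               string
	maxReconnect          int
	delayBetweenReconnect time.Duration
}

func (q *NatsQueue) connect() error {
	conn, err := stan.Connect(q.clusterID, q.clientID,
		stan.NatsURL(q.natsURL),
		// Fired when the client gives up on the server; NOT fired
		// when the connection is closed explicitly via Close().
		stan.SetConnectionLostHandler(func(_ stan.Conn, err error) {
			log.Printf("connection lost: %v, reconnecting", err)
			q.reconnect()
		}),
	)
	if err != nil {
		return err
	}
	q.mu.Lock()
	q.conn = conn
	q.mu.Unlock()
	return nil
}

func (q *NatsQueue) reconnect() {
	for i := 0; i < q.maxReconnect; i++ {
		if err := q.connect(); err == nil {
			return
		}
		time.Sleep(q.delayBetweenReconnect)
	}
	log.Printf("giving up after %d reconnect attempts", q.maxReconnect)
}
```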


bartsmykla commented 5 years ago

@kozlovic I think all of your comments have been addressed now. Can you check again, please?

bartsmykla commented 5 years ago

@alexellis if we want to have two of the values (maxReconnect and delayBetweenReconnect) as part of NatsConfig, we'll need to change it in the gateway too. I have created a new function with defaults, NewDefaultNatsConfig.
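
To illustrate, here is a sketch of what a config with those defaults could look like. The type name DefaultNatsConfig, the getter names, and the chosen values are assumptions for illustration, not necessarily what the PR ships:

```go
package handler

import "time"

// DefaultNatsConfig provides illustrative defaults for the reconnection
// behaviour; a gateway that wants different values could supply its own
// implementation instead.
type DefaultNatsConfig struct{}

// NewDefaultNatsConfig returns a config with the built-in defaults.
func NewDefaultNatsConfig() DefaultNatsConfig {
	return DefaultNatsConfig{}
}

// GetMaxReconnect is the number of reconnection attempts before giving up.
func (DefaultNatsConfig) GetMaxReconnect() int {
	return 120
}

// GetReconnectDelay is the pause between reconnection attempts.
func (DefaultNatsConfig) GetReconnectDelay() time.Duration {
	return 2 * time.Second
}
```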

kozlovic commented 5 years ago

@bartsmykla I can comment only on the usage of the NATS Streaming API and that looks good to me!

alexellis commented 5 years ago

Hi Bart, good work on this 👍

Ivan, thanks so much for your help here.

Do you have a Docker image pushed that I can use to test the change locally?

Alex

alexellis commented 5 years ago

Quick question: what output do you get in the logs when you scale the gateway from 1 to 0 replicas? Do you see it try to reconnect?

Alex

bartsmykla commented 5 years ago

@alexellis image is: bartsmykla/gateway:0.0.15

bartsmykla commented 5 years ago

@alexellis the change is for the gateway, so when it's scaled down to 0 there are no logs related to reconnection

alexellis commented 5 years ago

the change is for the gateway, so when it's scaled down to 0 there are no logs related to reconnection

Yes, that's why I asked you to scale the gateway down and watch the logs. Please could you try that and let us know if any logs about reconnecting are generated? This question relates to Ivan's comment on shutdown.

bartsmykla commented 5 years ago

They are not (I checked that before writing my comment). There are no logs afterwards; the container is just killed and that's it.

alexellis commented 5 years ago

Thanks for your work on this Bart and @kozlovic for lending your expertise. This is now merged for the producer (gateway) and will be released.

alexellis commented 5 years ago

I've tested out the changes with a new gateway and it seems like the reconnect doesn't work with the in-memory store at all. Is this to be expected?

Also, the buffer seemed to fill up even with a tiny request while the NATS connection was severed?

func_gateway.1.ftr96nc16pwc@linuxkit-025000000001    | NatsQueue - submitting request: figlet.
func_gateway.1.ftr96nc16pwc@linuxkit-025000000001    | 2019/01/29 15:22:17 Forwarded [POST] to /async-function/figlet - [202] - 0.004220 seconds
func_gateway.1.ftr96nc16pwc@linuxkit-025000000001    | 2019/01/29 15:22:17 Forwarded [POST] to /system/async-report - [202] - 0.000080 seconds
func_gateway.1.ftr96nc16pwc@linuxkit-025000000001    | 2019/01/29 15:22:17 Forwarded [POST] to /system/async-report - [202] - 0.000178 seconds
^Z
[2]+  Stopped                 docker service logs func_gateway -f
alexellis-a02:faas alexellis$ bg
[2]+ docker service logs func_gateway -f &
alexellis-a02:faas alexellis$ 
alexellis-a02:faas alexellis$ 
alexellis-a02:faas alexellis$ 
alexellis-a02:faas alexellis$ 
alexellis-a02:faas alexellis$ docker service scale func_nats=0
func_nats scaled to 0
overall progress: 0 out of 0 tasks 
verify: Service converged 
alexellis-a02:faas alexellis$ curl http://127.0.0.1:8080/async-function/figlet -d alex
nats: outbound buffer limit exceededfunc_gateway.1.ftr96nc16pwc@linuxkit-025000000001    | 2019/01/29 15:22:54 Forwarded [POST] to /async-function/figlet - [500] - 0.000362 seconds
func_gateway.1.ftr96nc16pwc@linuxkit-025000000001    | NatsQueue - submitting request: figlet.
func_gateway.1.ftr96nc16pwc@linuxkit-025000000001    | nats: outbound buffer limit exceeded
alexellis-a02:faas alexellis$ curl http://127.0.0.1:8080/async-function/figlet -d alex
nats: outbound buffer limit exceededfunc_gateway.1.ftr96nc16pwc@linuxkit-025000000001    | NatsQueue - submitting request: figlet.
func_gateway.1.ftr96nc16pwc@linuxkit-025000000001    | nats: outbound buffer limit exceeded
func_gateway.1.ftr96nc16pwc@linuxkit-025000000001    | 2019/01/29 15:22:55 Forwarded [POST] to /async-function/figlet - [500] - 0.000229 seconds
alexellis-a02:faas alexellis$ curl http://127.0.0.1:8080/async-function/figlet -d alex
nats: outbound buffer limit exceededfunc_gateway.1.ftr96nc16pwc@linuxkit-025000000001    | 2019/01/29 15:22:55 Forwarded [POST] to /async-function/figlet - [500] - 0.000147 seconds
func_gateway.1.ftr96nc16pwc@linuxkit-025000000001    | NatsQueue - submitting request: figlet.
func_gateway.1.ftr96nc16pwc@linuxkit-025000000001    | nats: outbound buffer limit exceeded
alexellis-a02:faas alexellis$ curl http://127.0.0.1:8080/async-function/figlet -d alex
nats: outbound buffer limit exceededfunc_gateway.1.ftr96nc16pwc@linuxkit-025000000001    | NatsQueue - submitting request: figlet.
func_gateway.1.ftr96nc16pwc@linuxkit-025000000001    | nats: outbound buffer limit exceeded
func_gateway.1.ftr96nc16pwc@linuxkit-025000000001    | 2019/01/29 15:22:56 Forwarded [POST] to /async-function/figlet - [500] - 0.000340 seconds
alexellis-a02:faas alexellis$ 
alexellis-a02:faas alexellis$ 
alexellis-a02:faas alexellis$ 
alexellis-a02:faas alexellis$ docker service logs func_gateway -f
alexellis-a02:faas alexellis$ docker service scale func_nats=1
func_nats scaled to 1
overall progress: 1 out of 1 tasks 
1/1: running   [==================================================>] 
verify: Service converged 
alexellis-a02:faas alexellis$ curl http://127.0.0.1:8080/async-function/figlet -d alex
stan: connection closedfunc_gateway.1.ftr96nc16pwc@linuxkit-025000000001    | 2019/01/29 15:23:15 Forwarded [POST] to /async-function/figlet - [500] - 0.000109 seconds
func_gateway.1.ftr96nc16pwc@linuxkit-025000000001    | NatsQueue - submitting request: figlet.
func_gateway.1.ftr96nc16pwc@linuxkit-025000000001    | stan: connection closed
alexellis-a02:faas alexellis$ curl http://127.0.0.1:8080/async-function/figlet -d alex
stan: connection closedfunc_gateway.1.ftr96nc16pwc@linuxkit-025000000001    | NatsQueue - submitting request: figlet.
func_gateway.1.ftr96nc16pwc@linuxkit-025000000001    | stan: connection closed
func_gateway.1.ftr96nc16pwc@linuxkit-025000000001    | 2019/01/29 15:23:16 Forwarded [POST] to /async-function/figlet - [500] - 0.000456 seconds
alexellis-a02:faas alexellis$ curl http://127.0.0.1:8080/async-function/figlet -d alex
stan: connection closedfunc_gateway.1.ftr96nc16pwc@linuxkit-025000000001    | NatsQueue - submitting request: figlet.
alexellis-a02:faas alexellis$ func_gateway.1.ftr96nc16pwc@linuxkit-025000000001    | stan: connection closed
func_gateway.1.ftr96nc16pwc@linuxkit-025000000001    | 2019/01/29 15:23:20 Forwarded [POST] to /async-function/figlet - [500] - 0.000239 seconds

The result after scaling NATS Streaming to 0 and back to 1 is:

func_gateway.1.ftr96nc16pwc@linuxkit-025000000001    | NatsQueue - submitting request
func_gateway.1.ftr96nc16pwc@linuxkit-025000000001    | stan: connection closed
kozlovic commented 5 years ago

If there is a request (calling Queue()) while the server is down and the connection is marked as closed, it is expected that the Publish() call will fail. How the returned error makes your software behave, I don't know. It is also possible that even if the reconnect occurs, Queue() still holds a reference to the previous (now closed) connection and goes into a Publish() that would also result in an error.
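
One way to avoid publishing on a stale connection reference is to always read the current connection under a lock at call time, so a reconnect performed by the connection-lost handler is picked up by later calls. This sketch extends the illustrative NatsQueue type from the PR description above (it also needs `import "errors"`) and is not the exact PR code:

```go
// Queue publishes the request on whatever connection is current at the
// time of the call, rather than capturing one connection forever.
func (q *NatsQueue) Queue(subject string, body []byte) error {
	q.mu.RLock()
	conn := q.conn
	q.mu.RUnlock()

	if conn == nil {
		return errors.New("no NATS Streaming connection available")
	}
	// Publish still returns an error if the connection was closed or the
	// outbound buffer is exhausted; callers surface that as a 500.
	return conn.Publish(subject, body)
}
```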

bartsmykla commented 5 years ago

I'll test it tomorrow, but before the NATS server reconnects, stan: connection closed is expected

kozlovic commented 5 years ago

Actually, you should get an error about nats: outbound buffer limit exceeded, and this is because we set the reconnect buffer to -1 so as not to buffer data while disconnected. And since by default the NATS connection created by STAN is set to reconnect forever, if you get connection closed it could mean that your program closes the connection (maybe in response to a publish error?), which would prevent it from "reconnecting", since the ConnectionLostHandler is, as documented, not invoked on an explicit connection close. So make sure that you are not explicitly closing the connection before the server has been restarted.
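
For reference, a sketch of how the underlying NATS connection can be configured the way Ivan describes and handed to STAN. The function name, URL, and IDs are placeholders, not the repo's actual wiring:

```go
package handler

import (
	"log"

	nats "github.com/nats-io/go-nats"
	stan "github.com/nats-io/go-nats-streaming"
)

func connectWithNoReconnectBuffer(natsURL, clusterID, clientID string) (stan.Conn, error) {
	// Reconnect forever, but do not buffer outbound messages while
	// disconnected: publishes fail fast with
	// "nats: outbound buffer limit exceeded" instead of piling up.
	nc, err := nats.Connect(natsURL,
		nats.MaxReconnects(-1),
		nats.ReconnectBufSize(-1),
	)
	if err != nil {
		return nil, err
	}

	return stan.Connect(clusterID, clientID,
		stan.NatsConn(nc),
		// As documented, this handler is not invoked if the program
		// closes the connection itself, so an explicit Close() (e.g.
		// in response to a publish error) disables reconnection.
		stan.SetConnectionLostHandler(func(_ stan.Conn, err error) {
			log.Printf("connection lost: %v", err)
		}),
	)
}
```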