vernemq / vernemq

A distributed MQTT message broker based on Erlang/OTP. Built for high quality & Industrial use cases. The VerneMQ mission is active & the project maintained. Thank you for your support!
https://vernemq.com
Apache License 2.0

Calling gRPC from plugin blocks client session #882

Closed: drasko closed this issue 5 years ago

drasko commented 5 years ago

Environment

Expected behavior

After auth_on_register() succeeds, VerneMQ should call the auth plugin's auth_on_publish() hook.

Actual behaviour

Plugin implementation: https://github.com/drasko/mqtt-erl/blob/master/src/mfx_auth.erl

auth_on_register() succeeds, but auth_on_publish() is never called afterwards.

drasko@Marx:~/vernemq/vernemq-broker$ cat _build/default/rel/vernemq/log/console.log
2018-10-07 00:28:24.185 [info] <0.721.0> auth_on_register: {{127,0,0,1},54772} {[],<<"mosqpub|12856-Marx">>} <<"7">> <<"56a00b6e-fa4e-4fc7-b6af-e05d6ae15c3c">> true
2018-10-07 00:28:24.185 [info] <0.721.0> auth_thing: <<"56a00b6e-fa4e-4fc7-b6af-e05d6ae15c3c">>

and

drasko@Marx:~/go/src/github.com/mainflux/mainflux/build$ mosquitto_pub -u 7 -P 56a00b6e-fa4e-4fc7-b6af-e05d6ae15c3c -t channels/4/messages -h localhost -m '[{"bn":"some-base-name:","bt":1.276020076001e+09, "bu":"A","bver":5, "n":"voltage","u":"V","v":120.1}, {"n":"current","t":-5,"v":1.2}, {"n":"current","t":-4,"v":1.3}]' -d
Client mosqpub|12856-Marx sending CONNECT
Client mosqpub|12856-Marx received CONNACK
Client mosqpub|12856-Marx sending PUBLISH (d0, q0, r0, m1, 'channels/4/messages', ... (166 bytes))
Error: The connection was lost.
larshesel commented 5 years ago

Are you sure the gRPC call isn't hanging for some reason, or throwing an exception?

drasko commented 5 years ago

@larshesel every gRPC call returns an {ok, Result} tuple, and I check Result: it is exactly what I expect to get back from the other service. So I would say the call works correctly.

I also inspected the logs of that other service: it received the gRPC request from the VerneMQ plugin and responded correctly.

VerneMQ logs no errors (no error seems to occur), and console.log contains only the lines I pasted: after auth_on_register() returns with ok, auth_on_publish() is never called.

larshesel commented 5 years ago

Could you run a trace on the client? vmq-admin trace client client-id=<clientid>

codeadict commented 5 years ago

My understanding is that gRPC keeps an HTTP/2 connection open that blocks the current process; moving the gRPC calls into a separate process will probably solve this.

larshesel commented 5 years ago

In that case it will definitely block the client session in auth_on_register. To find out whether that is what happens, you could add a log statement after the gRPC call and check whether it is reached.
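A minimal sketch of that instrumentation, assuming OTP 21 or later (it uses `logger` and the `Class:Reason:Stacktrace` catch syntax; a 2018-era VerneMQ plugin would log through lager instead). The module name `call_tracer` is hypothetical. Wrapping the gRPC call in a zero-arity fun this way separates the cases raised above: a "calling" line with no "returned" line means the call hangs, and an escaping exception is logged with its class and then re-raised unchanged.

```erlang
-module(call_tracer).
-export([traced/2]).

%% Run Fun/0 with a log line before and after it:
%%  - "calling" but no "returned": the call is hanging;
%%  - "returned": the call completed, logged with its value;
%%  - "raised": an exception escaped; it is logged, then re-raised unchanged.
%% notice/error levels are used so the lines show with logger's default level.
traced(Label, Fun) when is_function(Fun, 0) ->
    logger:notice("~s: calling", [Label]),
    try Fun() of
        Result ->
            logger:notice("~s: returned ~p", [Label, Result]),
            Result
    catch
        Class:Reason:Stacktrace ->
            logger:error("~s: raised ~p:~p", [Label, Class, Reason]),
            erlang:raise(Class, Reason, Stacktrace)
    end.
```

Because the exception is re-raised, the hook behaves exactly as before; the wrapper only adds visibility.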

drasko commented 5 years ago

I have put logs all over the place. The gRPC call is blocking (the gRPC Erlang library docs say so), but it is executed, I see the correct values returned, and auth_on_register() returns fine. Why auth_on_publish() is not called afterwards is a mystery to me.

This gRPC call has to block: I cannot finish authenticating this client until I get the response from the authentication service via gRPC. So in my opinion blocking is not the problem.

larshesel commented 5 years ago
drasko@Marx:~/vernemq/vernemq-broker$ cat _build/default/rel/vernemq/log/console.log
2018-10-07 00:28:24.185 [info] <0.721.0> auth_on_register: {{127,0,0,1},54772} {[],<<"mosqpub|12856-Marx">>} <<"7">> <<"56a00b6e-fa4e-4fc7-b6af-e05d6ae15c3c">> true
2018-10-07 00:28:24.185 [info] <0.721.0> auth_thing: <<"56a00b6e-fa4e-4fc7-b6af-e05d6ae15c3c">>

Looking at the log above together with the linked code, I see one log line on entering auth_on_register and one on entering auth_thing. How do you know auth_on_register terminates and returns ok? Am I missing something here?

drasko commented 5 years ago

Yes, because I have since removed the other log statements from the code. But believe me, I had a log line after every statement, printing what came out of gRPC and tracing auth_on_register() until it returned.

larshesel commented 5 years ago

Could you show the output of vmq-admin trace client client-id=<clientid> while connecting and publishing, and also the output of vmq-admin plugin show?

drasko commented 5 years ago

Sure, as soon as I am at the machine tonight.

drasko commented 5 years ago
drasko@Marx:~/vernemq/vernemq-broker$ _build/default/rel/vernemq/bin/vmq-admin plugin show
+------------+-----------+-----------------+----------------------------+
|   Plugin   |   Type    |     Hook(s)     |           M:F/A            |
+------------+-----------+-----------------+----------------------------+
|vmq_plumtree|application|                 |                            |
|  mfx_auth  |application|auth_on_register |mfx_auth:auth_on_register/5 |
|            |           | auth_on_publish | mfx_auth:auth_on_publish/6 |
|            |           |auth_on_subscribe|mfx_auth:auth_on_subscribe/3|
+------------+-----------+-----------------+----------------------------+
drasko@Marx:~/vernemq/vernemq-broker$ _build/default/rel/vernemq/bin/vmq-admin trace client client-id=test-100
No sessions found for client "test-100"
New session with PID <7574.572.0> found for client "test-100"
<7574.572.0> MQTT RECV: CID: "test-100" CONNECT(c: test-100, v: 3, u: 7, p: 56a00b6e-fa4e-4fc7-b6af-e05d6ae15c3b, cs: 1, ka: 60)
<7574.572.0> Calling auth_on_register({{127,0,0,1},57256},{[],<<"test-100">>},7,56a00b6e-fa4e-4fc7-b6af-e05d6ae15c3b,true) 
<7574.572.0> Hook returned "ok"
<7574.572.0> MQTT SEND: CID: "test-100" CONNACK(sp: 0, rc: 0)
<7574.572.0> Trace session for test-100 stopped
drasko@Marx:~/vernemq/vernemq-broker$ cat _build/default/rel/vernemq/log/console.log
2018-10-08 22:38:19.952 [info] <0.572.0> auth_on_register: {{127,0,0,1},57256} {[],<<"test-100">>} <<"7">> <<"56a00b6e-fa4e-4fc7-b6af-e05d6ae15c3b">> true
2018-10-08 22:38:19.952 [info] <0.572.0> auth_thing: <<"56a00b6e-fa4e-4fc7-b6af-e05d6ae15c3b">>
drasko commented 5 years ago

And this is what I get when I replace the body of auth_on_register() with a plain ok:

drasko@Marx:~/vernemq/vernemq-broker$ _build/default/rel/vernemq/bin/vmq-admin trace client client-id=test-100
No sessions found for client "test-100"
New session with PID <7574.790.0> found for client "test-100"
<7574.790.0> MQTT RECV: CID: "test-100" CONNECT(c: test-100, v: 3, u: 7, p: 56a00b6e-fa4e-4fc7-b6af-e05d6ae15c3b, cs: 1, ka: 60)
<7574.790.0> Calling auth_on_register({{127,0,0,1},57674},{[],<<"test-100">>},7,56a00b6e-fa4e-4fc7-b6af-e05d6ae15c3b,true) 
<7574.790.0> Hook returned "ok"
<7574.790.0> MQTT SEND: CID: "test-100" CONNACK(sp: 0, rc: 0)
<7574.790.0> MQTT RECV: CID: "test-100" PUBLISH(d0, q0, r0, m0, "channels/4/messages") with payload:
    [{"bn":"some-base-name:","bt":1.276020076001e+09, "bu":"A","bver":5, "n":"voltage","u":"V","v":120.1}, {"n":"current","t":-5,"v":1.2}, {"n":"current","t":-4,"v":1.3}]
<7574.790.0> Calling auth_on_publish(7,{[],<<"test-100">>},0,channels/4/messages,false) with payload:
    [{"bn":"some-base-name:","bt":1.276020076001e+09, "bu":"A","bver":5, "n":"voltage","u":"V","v":120.1}, {"n":"current","t":-5,"v":1.2}, {"n":"current","t":-4,"v":1.3}]
<7574.790.0> Hook returned {error,error}
<7574.790.0> MQTT RECV: CID: "test-100" DISCONNECT()
<7574.790.0> Trace session for test-100 stopped

In this case (gRPC is not called from auth_on_register()), auth_on_publish() is called, but of course it fails, because auth_on_register() did not prepare the data that the gRPC call would have returned.

drasko commented 5 years ago

Just to be sure, I tried opening the gRPC connection just before sending the request and closing it afterwards. It still does not work: gRPC works fine and I get an ok status, but after auth_on_register returns, auth_on_publish is never called.

drasko commented 5 years ago

@codeadict how do I move this gRPC code into another process, and where should I spawn it? From _app.erl, i.e. the application behaviour (here), or from my plugin service (here)?

Also, how do I monitor this process and restart it if it crashes?

larshesel commented 5 years ago

So the gRPC library somehow interferes with the calling process, which is the MQTT session finite state machine; I have no idea what it does or how.

To supervise processes and restart them in case a crash occurs you should use supervisors: http://erlang.org/doc/design_principles/sup_princ.html

dergraf commented 5 years ago

@drasko in https://github.com/erlio/vernemq/issues/882#issuecomment-427971554 the trace doesn't show a PUBLISH being received after VerneMQ sent the CONNACK, and without a PUBLISH VerneMQ will obviously never call auth_on_publish.

Are you sure that your client received the CONNACK? The trace then stopped, possibly because the client disconnected.

As mentioned earlier, starting the gRPC connection inside the application callback is usually a no-go in the Erlang world. You want it owned by a separate gen_server that is supervised as part of your application's supervision tree. Moreover, I doubt you want gRPC to open an HTTP/2 stream for every auth_on_register request, which is what happens the way you call gRPC (think of thousands of concurrent clients connecting, each creating a new HTTP/2 stream). Without understanding the underlying HTTP/2 and gRPC implementations in Erlang in more depth, this looks like a job for a bounded set of supervised 'workers'.

As gRPC is quite a beast, I recommend doing your testing outside the context of VerneMQ, e.g. build a test OTP application and make sure that everything works as expected. For that it's important to understand that the auth_on_* callbacks are all called within the connection process. If the calls into gRPC have any 'unmanaged' side effects on this connection process, other than producing the mandatory return values of the callbacks, you might want to isolate the gRPC calls in a dedicated gen_server process.
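One way to follow that advice, sketched under assumptions: a hypothetical `grpc_worker` gen_server performs the gRPC call, so the blocking RPC runs in the worker rather than in the MQTT connection process, and a crash in the worker is handled by its supervisor. `CallFun` is a placeholder for the real client call (this sketch deliberately does not use the grpc library's API), and a single registered worker stands in for the bounded worker pool suggested above.

```erlang
-module(grpc_worker).
-behaviour(gen_server).

%% Public API
-export([start_link/1, request/2]).
%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2]).

%% Start the worker. CallFun is a hypothetical stand-in for the real
%% gRPC client call: fun(Request) -> {ok, Reply} | {error, Reason}.
start_link(CallFun) when is_function(CallFun, 1) ->
    gen_server:start_link({local, ?MODULE}, ?MODULE, CallFun, []).

%% Called from the auth hooks, i.e. from the MQTT connection process.
%% The RPC itself runs inside the worker; the caller only waits for
%% the reply, for at most Timeout milliseconds.
request(Request, Timeout) ->
    gen_server:call(?MODULE, {request, Request}, Timeout).

init(CallFun) ->
    {ok, #{call => CallFun}}.

%% handle_call/3 must return {reply, Reply, NewState}; returning a bare
%% {ok, Value} makes the gen_server crash with bad_return_value.
handle_call({request, Request}, _From, #{call := CallFun} = State) ->
    {reply, CallFun(Request), State};
handle_call(_Unknown, _From, State) ->
    {reply, {error, unknown_call}, State}.

handle_cast(_Msg, State) ->
    {noreply, State}.

handle_info(_Info, State) ->
    {noreply, State}.
```

Note that `gen_server:call/3` still raises an exit in the caller if the worker dies or the timeout expires, so the hook may want to catch it rather than let it end the session.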

To build a robust plugin that interfaces with a third-party system, you should familiarize yourself with Erlang/OTP, especially supervisors and gen_servers, or alternatively get in touch with professionals (like us) to develop the plugin for you.

drasko commented 5 years ago

@dergraf yes, I think the client received the CONNACK; this can be seen in the mosquitto_pub log I posted when opening the issue:

drasko@Marx:~/go/src/github.com/mainflux/mainflux/build$ mosquitto_pub -u 7 -P 56a00b6e-fa4e-4fc7-b6af-e05d6ae15c3c -t channels/4/messages -h localhost -m '[{"bn":"some-base-name:","bt":1.276020076001e+09, "bu":"A","bver":5, "n":"voltage","u":"V","v":120.1}, {"n":"current","t":-5,"v":1.2}, {"n":"current","t":-4,"v":1.3}]' -d
Client mosqpub|12856-Marx sending CONNECT
Client mosqpub|12856-Marx received CONNACK
Client mosqpub|12856-Marx sending PUBLISH (d0, q0, r0, m1, 'channels/4/messages', ... (166 bytes))
Error: The connection was lost.

It also shows that mosquitto_pub sent the PUBLISH afterwards, but somehow VerneMQ never called auth_on_publish.

I am building this for the open source project Mainflux, and if it works out we want to include VerneMQ in the Mainflux system.

At this point I would like to add a process that handles gRPC requests and sends the results back to my plugin via messages. My question is: do I spawn this process from here or from here? I just need a small example or some help on how to do this, and then I can continue struggling on my own.

codeadict commented 5 years ago

You need to create a gen_server and spawn it from the supervisor as a child process of your plugin. It's very hard to build a production-grade system if you don't understand the basics of Erlang and OTP; I'd suggest finding somebody experienced with these technologies (like the VerneMQ team) or spending some time studying the language. Just in case, here is an example gen_server https://github.com/codeadict/vmq_cloudwatch_metrics/blob/master/src/vmq_cloudwatch_metrics.erl which runs as a separate process and gets spawned from here: https://github.com/codeadict/vmq_cloudwatch_metrics/blob/master/src/vmq_cloudwatch_metrics_sup.erl#L45

drasko commented 5 years ago

OK, I'll spawn a new OTP app from here: https://github.com/drasko/mqtt-erl/blob/master/src/mfx_auth_sup.erl#L26 following this example: https://stackoverflow.com/questions/17775824/how-to-start-multiple-instances-of-the-same-module-function-under-the-supervisor

My only worry is that this might not solve the problem, since the gRPC calls succeed and I receive correct responses.

drasko commented 5 years ago

@dergraf the gRPC connection is not opened in the plugin handler; it is opened only once (not per client), here: https://github.com/drasko/mqtt-erl/blob/master/src/mfx_auth_app.erl#L31

Inside the plugin handler, only a single gRPC request is sent over this open connection: https://github.com/drasko/mqtt-erl/blob/master/src/mfx_auth.erl#L37

This is the simple request-reply mechanism explained here: https://github.com/Bluehouse-Technology/grpc/blob/master/doc/tutorial.md#simple-rpc and, as I mentioned, it works perfectly: I get the expected replies from my remote gRPC service written in Go. This simple RPC is blocking, but I see no big problem if the Erlang plugin handler process for this particular client blocks until authorization completes and the server then replies to the client. In any case, the reply comes back almost instantly.

I do not understand why this bothers VerneMQ, because I can clearly see that auth_on_register returns correctly after a successful gRPC request/reply.

dergraf commented 5 years ago

OK, so then "something" either crashed the connection process (which should show up in error.log) or made it unresponsive for some reason. But since the connection was lost (according to the Mosquitto log), it looks like the connection process really terminated. Maybe you'd see something fishy in the VerneMQ debug logs (https://vernemq.com/docs/configuration/logging.html)?

The spawning of the process should be "declarative", using a so-called child spec (http://erlang.org/doc/design_principles/sup_princ.html#spec) that is part of the return value of your supervisor's init function.
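A minimal sketch of such a declarative child spec, using the map form of child specs (OTP 18 and later). It assumes the gRPC gen_server is the `mfx_grpc` module that appears later in this thread and that it exports `start_link/0`; that export and the supervisor name `plugin_sup` are assumptions, not taken from the plugin's code.

```erlang
-module(plugin_sup).
-behaviour(supervisor).

-export([start_link/0, child_specs/0]).
-export([init/1]).

start_link() ->
    supervisor:start_link({local, ?MODULE}, ?MODULE, []).

%% Declarative child list: the supervisor starts each child by calling
%% the {Module, Function, Args} in `start`, and restarts it if it crashes.
%% mfx_grpc:start_link/0 is an assumed export of the worker module.
child_specs() ->
    [#{id => mfx_grpc,
       start => {mfx_grpc, start_link, []},
       restart => permanent,
       shutdown => 5000,
       type => worker,
       modules => [mfx_grpc]}].

init([]) ->
    %% one_for_one: restart only the child that crashed; if there are
    %% more than 5 restarts within 10 seconds, this supervisor gives up
    %% and terminates too.
    SupFlags = #{strategy => one_for_one, intensity => 5, period => 10},
    {ok, {SupFlags, child_specs()}}.
```

`supervisor:check_childspecs/1` validates the list's format without starting anything, which is a cheap check while developing the plugin.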

drasko commented 5 years ago

@dergraf well, error.log was empty, which is why this was so confusing. I will look at the debug logs this afternoon and let you know if I find something.

It would also be very helpful if you could quickly try some gRPC call from a plugin on your side, just to see what happens. I am almost sure that it will fail.

I will move gRPC into a separate process, but again, I do not think that this will resolve the issue. It will make the solution more robust, but I think the issue will remain. In any case, it is strange that we cannot see what is really happening here.

dergraf commented 5 years ago

Hello @drasko, I have a hypothesis, which I have more or less verified with a demo gRPC client/server. If the gRPC connection reference (the one you store in the ETS table) becomes invalid, a subsequent gRPC call will throw an exit exception. This exit is matched in our connection process, which traps exit signals. Unfortunately, in such cases the connection process just does a 'normal' disconnect and logs nothing (here https://github.com/erlio/vernemq/blob/master/apps/vmq_server/src/vmq_ranch.erl#L238).
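If this hypothesis holds, one defensive step on the plugin side is to catch the exit at the call site so it gets logged instead of silently ending the session. A sketch, assuming OTP 21+ `logger`; the module `safe_rpc` is hypothetical. Catching the exit exception with `try ... catch` is a different mechanism from the exit-signal trapping done by VerneMQ's connection process. Calling `gen_server:call/2` on a process that has already died is used here as a stand-in for a stale connection reference, since it raises an exit with reason `{noproc, ...}`.

```erlang
-module(safe_rpc).
-export([call/2]).

%% Run a call that may raise an exit (for example because a stored
%% connection reference points to a process that no longer exists).
%% The exit is logged and turned into {error, Reason}, so the calling
%% MQTT session does not end without any trace in the logs.
call(Label, Fun) when is_function(Fun, 0) ->
    try
        Fun()
    catch
        exit:Reason ->
            logger:error("~s: call exited: ~p", [Label, Reason]),
            {error, Reason}
    end.
```

The hook can then map `{error, _}` to a normal authentication failure rather than an unexplained disconnect.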

So this is something we should improve in VerneMQ ;)

However, if my hypothesis is true, then you need to fix the way you set up the initial gRPC connection. As it stands, you are not notified if the connection closes or terminates due to some error, hence you need a process (e.g. a gen_server) that tracks the connection state and ensures the connection is alive.

BTW, which version of VerneMQ are you using (vernemq version), and how did you build it? You mention Erlang 21, but VerneMQ doesn't currently work with it.
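A sketch of such a connection-tracking gen_server, with hypothetical names throughout: `conn_keeper` opens the connection through a placeholder `ConnectFun` (not the real grpc client API), monitors the connection process, and reconnects when the `'DOWN'` message arrives. Callers ask the keeper for the current connection instead of reading a possibly stale reference from ETS. It assumes the connection is represented by a pid; if the real client links that process to whoever opened it, the keeper would also need to trap exits.

```erlang
-module(conn_keeper).
-behaviour(gen_server).

-export([start_link/1, connection/0]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2]).

%% ConnectFun is a hypothetical stand-in for opening the real gRPC
%% connection: fun() -> {ok, ConnPid} | {error, Reason}.
start_link(ConnectFun) when is_function(ConnectFun, 0) ->
    gen_server:start_link({local, ?MODULE}, ?MODULE, ConnectFun, []).

%% Return the connection currently held by the keeper.
connection() ->
    gen_server:call(?MODULE, connection).

init(ConnectFun) ->
    {ok, connect(#{connect => ConnectFun, conn => undefined, ref => undefined})}.

%% Not connected (initial connect or a reconnect failed): retry now.
handle_call(connection, _From, #{conn := undefined} = State) ->
    NewState = connect(State),
    {reply, reply_for(NewState), NewState};
handle_call(connection, _From, State) ->
    {reply, reply_for(State), State}.

handle_cast(_Msg, State) ->
    {noreply, State}.

%% The monitor fires when the connection process dies for any reason,
%% so the keeper notices and reconnects immediately.
handle_info({'DOWN', Ref, process, _Pid, _Reason}, #{ref := Ref} = State) ->
    {noreply, connect(State#{conn := undefined, ref := undefined})};
handle_info(_Info, State) ->
    {noreply, State}.

connect(#{connect := ConnectFun} = State) ->
    case ConnectFun() of
        {ok, Pid} ->
            State#{conn := Pid, ref := erlang:monitor(process, Pid)};
        {error, _Reason} ->
            State#{conn := undefined, ref := undefined}
    end.

reply_for(#{conn := undefined}) -> {error, not_connected};
reply_for(#{conn := Pid}) -> {ok, Pid}.
```

A failed reconnect is retried lazily on the next `connection/0` call; a production version might add a backoff timer instead.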

drasko commented 5 years ago

@dergraf I compiled VerneMQ from master. I'll send you the exact revision later.

Note that I am getting the correct replies from gRPC. I logged this: I get the correct reply, auth_on_register() returns fine, and then the mosquitto_pub client is disconnected. I agree that gRPC should be protected by a dedicated process. But in the flow I am describing I did not see any errors (the handler returned fine with good results), yet auth_on_publish() was not called.

dergraf commented 5 years ago

@drasko OK, please just enable debug logging and tell us what shows up in the logs.

drasko commented 5 years ago

@dergraf here is what the log says with debug enabled:

drasko@Marx:~/vernemq/vernemq-broker$ cat _build/default/rel/vernemq/log/console.log
2018-10-12 01:24:49.828 [debug] <0.227.0>@plumtree_broadcast:schedule_lazy_tick:700 0ms mailbox traversal, schedule next lazy broadcast in 10000ms, the min interval is 10000ms
2018-10-12 01:24:50.172 [info] <0.443.0> auth_on_register: {{127,0,0,1},50942} {[],<<"mosqpub|340-Marx">>} <<"7">> <<"56a00b6e-fa4e-4fc7-b6af-e05d6ae15c3c">> true
2018-10-12 01:24:50.172 [info] <0.443.0> auth_thing: <<"56a00b6e-fa4e-4fc7-b6af-e05d6ae15c3c">>
2018-10-12 01:24:50.242 [debug] <0.447.0>@vmq_queue:state_change:992 transition from offline --> online because of add_session
2018-10-12 01:24:50.242 [debug] <0.443.0>@vmq_mqtt_fsm:connected:403 stop due to disconnect
2018-10-12 01:24:50.242 [debug] <0.443.0>@vmq_ranch:teardown:134 session normally stopped

On the mosquitto_pub side I get the following:

drasko@Marx:~/vernemq/vernemq-broker$ mosquitto_pub -u 7 -P 56a00b6e-fa4e-4fc7-b6af-e05d6ae15c3c -t channels/4/messages -h localhost -m '[{"bn":"some-base-name:","bt":1.276020076001e+09, "bu":"A","bver":5, "n":"voltage","u":"V","v":120.1}, {"n":"current","t":-5,"v":1.2}, {"n":"current","t":-4,"v":1.3}]' -d
Client mosqpub|340-Marx sending CONNECT
Client mosqpub|340-Marx received CONNACK
Client mosqpub|340-Marx sending PUBLISH (d0, q0, r0, m1, 'channels/4/messages', ... (166 bytes))
Client mosqpub|340-Marx sending DISCONNECT

I have no idea where the PUBLISH disappeared to inside VerneMQ.

drasko commented 5 years ago

I have added a gen_server to handle the gRPC calls.

Now lager crashes for some reason:

drasko@Marx:~/vernemq/vernemq-broker$ cat _build/default/rel/vernemq/log/console.log 
2018-10-14 21:53:53.912 [info] <0.560.0> auth_on_register: {{127,0,0,1},55422} {[],<<"mosqpub|5492-Marx">>} <<"7">> <<"56a00b6e-fa4e-4fc7-b6af-e05d6ae15c3c">> true
2018-10-14 21:53:53.912 [info] <0.560.0> auth_thing: <<"56a00b6e-fa4e-4fc7-b6af-e05d6ae15c3c">>
2018-10-14 21:53:53.958 [error] <0.190.0> Lager event handler error_logger_lager_h exited with reason {'EXIT',{{badmatch,[mfx_grpc,{send,identify,#{value => "56a00b6e-fa4e-4fc7-b6af-e05d6ae15c3c"}},grpc_state,{bad_return_value,{ok,7}},<0.560.0>,[{gen,do_call,4,[{file,"gen.erl"},{line,169}]},{gen_server,call,2,[{file,"gen_server.erl"},{line,202}]},{mfx_auth,auth_on_register,5,[{file,"/home/drasko/vernemq/mqtt/_build/default/lib/mfx_auth/src/mfx_auth.erl"},{line,56}]},{vmq_plugin_helper,all_till_ok,2,[{file,"/home/drasko/vernemq/vernemq-broker/_build/default/lib/vmq_plugin/src/vmq_plugin_hel..."},...]},...]]},...}}
2018-10-14 21:53:53.960 [info] <0.564.0> mfx_grpc has started (<0.564.0>)
drasko commented 5 years ago

In fact, I was missing the obligatory reply atom in the handle_call return value. After changing it to {reply, Reply, NewState}, it works now.

I can confirm that spawning separate gen_servers for the gRPC connection and for NATS publishing works.

A working example plugin can be found here: https://github.com/drasko/mqtt-erl

Closing this one, thanks for all your help.