[Closed] samhld closed this issue 2 years ago
I thought about reusing an MQTT client then, but could not figure out how. And unfortunately I still can't see how state can be kept in a builtin function. Do you or @nathanielc have an example?

Specifically, for mqtt.publish(), I have no idea what the lifecycle of a reusable MQTT client could look like, especially when to close/destroy it. Going low-level does not seem like an option either, since the connection is a private field in the client.
@alespour The client to MQTT should come from the dependency system which should allow for reuse of the client. We recently added mqtt to the deps system here https://github.com/influxdata/flux/blob/master/dependencies/mqtt/mqtt.go
@nathanielc So clients could be cached in the MQTT dialer(?), but when/where should they get closed?
Okay, looking this over a bit, I'll just summarize my understanding of the problem (please correct me if I'm wrong):
```flux
import "experimental/mqtt"
...
|> map(fn: (r) => ({r with sent: mqtt.publish(...)}))
...
```
And the call to publish is expensive because we need to create a new client (and connection) for each row processed by map.
Go's standard http client can cache and reuse TCP connections. But the mqtt client does not do this and in fact hides the connections it makes in a non-exported field.
Caching mqtt clients somewhere in the Flux runtime (probably scoped to the query request) would seem to make sense to address this, but we don't have any precedent for this kind of thing to my knowledge.
Other things we could try:
```flux
import "experimental/mqtt"
cli = mqtt.newClient(servers: [...])
...
|> map(fn: (r) => ({r with sent: cli.publish(...)}))
...
```
This isn't so appealing because it adds complexity.
We could also look at some way to optimize map itself for stuff like this.

Question: how important is it to call Disconnect() on the client when done?
@nathanielc Any additional thoughts given this context?
> And the call to publish is expensive because we need to create a new client (and connection) for each row processed by map.
>
> Go's standard http client can cache and reuse TCP connections. But the mqtt client does not do this and in fact hides the connections it makes in a non-exported field.

That's how I understand it, but I'll let Nathaniel and @alespour confirm that more officially.

> Question: how important is it to call Disconnect() on the client when done?
I assume you're referring to the MQTT client's Disconnect()? I don't know offhand. However, some implications wrt disconnecting: there is a keepalive parameter that would close the connection after a given amount of time. However, this would send a last will message... so subscribing clients would read this closed connection as unintentional, I believe.

Given the question, I'm interpreting that the inclusion of Disconnect() complicates some possible solutions?
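For reference, here is where the keepalive and last-will knobs live in eclipse/paho.mqtt.golang (the Go MQTT client the Flux package wraps). This is a configuration sketch: the broker URL, topic, and payload are placeholders. Note that `Disconnect()` performs a clean shutdown (it sends an MQTT DISCONNECT first), whereas the last will is what the broker publishes on an unclean close, e.g. a keepalive timeout:

```go
package main

import (
	"time"

	mqtt "github.com/eclipse/paho.mqtt.golang"
)

func main() {
	opts := mqtt.NewClientOptions().
		AddBroker("tcp://broker.example.com:1883"). // placeholder broker
		// Keepalive interval: the broker drops a client it hasn't heard
		// from, and that unclean close is when the last will fires.
		SetKeepAlive(30 * time.Second).
		// Last-will message delivered to subscribers on an unclean close.
		SetWill("status/flux", "connection lost", 1, false)

	client := mqtt.NewClient(opts)
	if tok := client.Connect(); tok.Wait() && tok.Error() != nil {
		panic(tok.Error())
	}
	// Clean shutdown: no last will is published. The argument is the
	// quiesce time in milliseconds for in-flight messages.
	client.Disconnect(250)
}
```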
@samhld @alespour We're going to be devoting some team meeting time on Monday to figure out the best approach to this issue. We'll report back here once that happens, hopefully we can unblock this work.
@wolffcm Wonderful TY!
So after some discussion today, we came up with a solution that will implement connection pooling as part of dependency injection, so that client code (like the implementation of mqtt.publish()) can ask for an MQTT client for a particular set of servers, and if there is a client that was used previously (scoped to that query request) we will reuse it.
Also, there will be a mechanism via which we'll call Disconnect() on the MQTT client once the query request is complete.
In summary, we have a solution that will "just work" with minimal changes to the current implementation of mqtt.publish(), aside from changing how the client is fetched from the dependencies in the context.

Since this work involves extending our dependency framework, it should be done by the Flux team (consult @nathanielc for info on how this will be prioritized).
@samhld and @alespour Does the above sound like it would work for y'all?
Implementation notes:

- Call Disconnect() on each pooled connection when the query request is complete

DOD:
@wolffcm Sounds good to me, thank you.
Work to extend the feasibility of the feature brought in by this PR: https://github.com/influxdata/flux/pull/4081
@alespour Regarding the quick conversation you and I had about the synchronous nature of mqtt.publish(), I had a conversation with @nathanielc about how to handle this. It sounds like a good option would be to simply reuse the TCP connection as we loop through each call to publish() in the map() call. According to Nathaniel, subsequent calls to publish can keep state on the Go side.

@alespour thoughts?