huginn / huginn

Create agents that monitor and act on your behalf. Your agents are standing by!
MIT License
43.11k stars 3.75k forks source link

Is using LongRunnable for the MQTT agent viable? #2704

Closed jacatbay closed 4 years ago

jacatbay commented 4 years ago

TLDR; I wanted to use the MQTT agent to listen for messages, but it doesn't seem to get all the messages so I tried to create a new MQTT subscriber agent that always listens to the broker via the LongRunnable interface. What do you think?

Greetings! So I've been trying out Huginn recently, especially its MQTT agent. I wanted to subscribe to a private MQTT broker and do something with the messages it receives. So I tried using an MQTT agent to listen to messages, but Huginn doesn't seem to be receiving any messages from the broker. Well it does, but only around 10% of the time. I confirmed that the broker is indeed sending the messages in the rate I expected, it just seems that the MQTT agent isn't receiving it.

I looked around to see if there are relevant issues, I found #1289 and #996, but they seem to be dead ends. (Both seem to be old and inactive). I also looked at and tried the solutions there and as expected, they didn't seem to work. Didn't work meaning that I still do not receive all the messages I expected to receive.

I tried looking at it from a different perspective. What I wanted was essentially to always listen to the MQTT broker, not listen every schedule for a certain amount of time (the max_read_time). I then looked at the Twitter Stream Agent.

From my understanding, the Twitter Stream Agent always listens to Twitter for keywords set in its options. Since this is similar to the behavior I want, I studied its code and attempted to replicate it into the MQTT agent.

Please look at the following commit: https://github.com/jacatbay/huginn/commit/6846652a96b7bdeef3cd8b1c20564a0aa7a82d09

Since the source has changed quite a lot, I decided to make it a new agent for now. I also simplified the solution by focusing on subscription first (I removed the publish capability). Anyway, this is just a demo of what I'm thinking of.

To explain the changes I made:

  1. I created a LongRunnable Worker class inside of the MqttSubscriberAgent class, with contents similar to that of the Worker class of TwitterStreamAgent
  2. I moved the mqtt_client method inside the Worker class
  3. I removed the receive method for now
  4. I simplified and moved the logic of the check method inside the run method of the Worker class

With this, the agent seems to work as I expect it to. It receives messages in the rate that I expected, but there seems to be some issues that I'm investigating right now. (Might be related to the issues I mentioned above or the simplifications I made) While investigating the issues, I thought of opening this issue so that I can ask some questions as well as determine whether I'm also heading into a dead end.

Here are my questions:

  1. Since the MQTT agent subscribes to a broker, I was actually under the impression that it works similarly with the Twitter Stream Agent. Maybe there's a reason why it's not using LongRunnable?
  2. What are the consequences of using LongRunnable?
  3. If my solution is viable, can I also ask for some code review? I'm a total newbie with regards to ruby, so I'm pretty sure there are a lot of things I could do better.

Btw, I tried to make it a custom agent but it didn't seem to work (Agent Runner doesn't start the agent). Additional info: Right now, the MQTT broker is RabbitMQTT but I'm looking at Mosquitto to see if the behavior is the same too.

Thanks for your time!

dsander commented 4 years ago

Hi!

Your approach makes total sense to me. I like that you moved it to a new Agent, that avoids possible problems for users for which the current MQTT Agent approach might work (even though you are right that we got a few issues stating the same issues with it you observed).

1. Since the MQTT agent _subscribes_ to a broker, I was actually under the impression that it works similarly with the Twitter Stream Agent. Maybe there's a reason why it's not using LongRunnable?

I think the MQTT agent is older than the LongRunnable concept. Since I don't use MQTT I did not change the Agent to be LongRunnable as well.

What are the consequences of using LongRunnable?

The main difference I can think of is that the Agent will constantly keep a open connection. If someone had hundreds of different MQTT Agents they could hit open connection limits of the operating system. It also uses a bit more system memory because the Agent is running all the time.

Btw, I tried to make it a custom agent but it didn't seem to work (Agent Runner doesn't start the agent).

That is odd, does the AgentRunner pick up the Agent from your Huginn fork when you start it with bin/threaded.rb or bin/agent_runner.rb or do you have to use bin/mqtt_subscriber.rb ?

jacatbay commented 4 years ago

Hey! Thanks for the response! :)

I like that you moved it to a new Agent, that avoids possible problems for users for which the current MQTT Agent approach might work.

Ohh, now that you mention it. Since the code and behavior have deviated significantly from the original one, it might be better to just make it a a new Agent entirely.

I think the MQTT agent is older than the LongRunnable concept. Since I don't use MQTT I did not change the Agent to be LongRunnable as well.

Oh I see. I thought that there might be some internal reason (e.g. incompatibilty, too much memory consumption etc.), but that is not the case.

The main difference I can think of is that the Agent will constantly keep a open connection. If someone had hundreds of different MQTT Agents they could hit open connection limits of the operating system. It also uses a bit more system memory because the Agent is running all the time.

I see, these are indeed significant consequences. In my case, I only needed up to 2 subscriber agents, so I found no issues with my implementation so far. Might be good to add a warning on the Agent description indicating this. (I'll add a warning on my fork)

That is odd, does the AgentRunner pick up the Agent from your Huginn fork when you start it with bin/threaded.rb or bin/agent_runner.rb or do you have to use bin/mqtt_subscriber.rb ?

Hmm, I'm not sure I understand your question correctly. On the current implementation, this is my understanding of how the flow goes: I start huginn with bundle exec foreman start which starts the commands written on Procfile. The bin/threaded.rb gets started, which in turn initializes and runs lib/agent_runner.rb. On lib/agent_runner.rb, the agent is required at the bottom so I think this is where the Agent gets picked up. I'm actually not sure of the flow when it is a custom agent.

Apologies, I'm aware I'm doing cargo cult programming at this point. What I did was I traced the Twitter Stream Agent's flow, and emulated/copied that with the new MQTT Subscriber Agent. (Hence the presence of bin/mqtt_subscriber.rb) IIRC something broke when I tried to delete this file so I just left it as it is.

In addition to fixing the comments on the commit (Thanks a lot for looking at the commit!), I'll try making it a custom agent again. I believe making it a custom agent is the best way forward as it minimizes the effect on the current code.

dsander commented 4 years ago

I see, these are indeed significant consequences. In my case, I only needed up to 2 subscriber agents, so I found no issues with my implementation so far. Might be good to add a warning on the Agent description indicating this. (I'll add a warning on my fork)

Not sure if the warning is needed, I think your approach does what the Agent is intended to do and all the downsides (which are pretty minor) are needed to achieve a permanent connection that picks up new events imidiately.

the agent is required at the bottom so I think this is where the Agent gets picked up. I'm actually not sure of the flow when it is a custom agent.

I think you are right, I forgot about that part. It might be possible to tweak the way how it works to make LongRunnable Agents gems work. I'll try to look into that this weekend.

dsander commented 4 years ago

We indeed had a bug that would break stuff if an Agent gem was LongRunnable. The PR I just merged should have fixed it.

jacatbay commented 4 years ago

Thanks for looking at it! With this PR, does it mean that it should work as a custom agent?

dsander commented 4 years ago

I believe so, yes.

jacatbay commented 4 years ago

I'll try creating it as a custom agent again and if that works, I think this ticket as well as my fork is no longer necessary as it is now a custom agent. Will keep you posted, thanks a lot!

dsander commented 4 years ago

Great! Feel free to ping me if you need additional help or another code review.

ghost commented 4 years ago

@jacatbay any update on this? Were you able to make a custom LongRunnable agent for MQTT? Can you share it?

jacatbay commented 4 years ago

@xnaas Yes, I was able to make it a custom agent. I'm sorry I forgot to upload it, with all the things happening recently it slipped out of my mind. I'll try to find time to make a git repo for it.

I'll comment the git repo here, then I'll close this issue since it's already resolved as a custom agent.

ghost commented 4 years ago

Awesome. I look forward to it!

I've had to suspend my MQTT agents for the time being because they hold up all of my other agents, even with the threaded thing. 😦

jacatbay commented 4 years ago

I've created a git repo for the custom agent here for now: https://github.com/jacatbay/huginn_mqtt_subscriber_agent

Haven't tested it yet though! :( If you are thinking of playing with it, please create some backups first. (Will be testing it soon.)

If there are some problems with the custom agent, I think it's better to register/report those on the Issues of the git repo itself, so I will now be closing this issue.

P.S. I would be really grateful if you could also check the source code. I really don't know much about ruby so any comments would certainly help. Thanks a lot! :)