faust-streaming / faust

Python Stream Processing. A Faust fork
https://faust-streaming.github.io/faust/

How to check liveness of agent? #625

Open caspereijkens opened 2 months ago

caspereijkens commented 2 months ago

We want to implement a liveness check of our Faust application. What is the best gauge of "liveness" of a Faust agent?

Background

The agent is consuming from a topic where rarely messages are produced. Normally, commits or sends would be a good measure for liveness. But in this case, it is unclear what processes or events are still running when the agent is in this "idle" state.

Proposed solutions

The first solution could be using timers (https://faust.readthedocs.io/en/latest/userguide/tasks.html#timers). But what code to execute here to prove that the agent is still happy?

The second solution would be to create a heartbeat topic where messages arrive regularly. However, we hope there is a simpler way to check for liveness.
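For the timer approach, one low-cost option (a sketch of my own, not a pattern from the Faust docs; the file path and interval are arbitrary) is to have an `@app.timer` refresh a heartbeat file, and let an external probe check the file's freshness:

```python
import os
import time

HEARTBEAT_FILE = "/tmp/faust-heartbeat"  # example path, pick your own

def write_heartbeat(path: str = HEARTBEAT_FILE) -> None:
    # Record the current time; called from a Faust timer, this only
    # happens while the worker's event loop is actually running.
    with open(path, "w") as f:
        f.write(str(time.time()))

def is_alive(path: str = HEARTBEAT_FILE, max_age: float = 30.0) -> bool:
    # An external probe calls this (or just stats the file) and treats
    # a stale or missing heartbeat as "not live".
    try:
        return time.time() - os.path.getmtime(path) < max_age
    except OSError:
        return False

# Wiring inside the Faust app would look something like (sketch):
# @app.timer(interval=10.0)
# async def heartbeat():
#     write_heartbeat()
```

This only proves the event loop is running, not that the agent can reach Kafka, so it answers the "running/not-running" part of the question rather than broker connectivity.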

fonty422 commented 2 months ago

I'm not sure what the best approach is or what is most efficient - it also depends on what you need to report in terms of 'liveness'. Does that simply mean knowing that a worker is running, or also the health of that worker? If you're simply after a running/not-running status, then an endpoint is simple enough. But if you want stats on the worker, then I suggest looking into the app.monitor section, which you get for free. There are a lot of useful metrics in there.

We have tried to employ a few methods:

  1. A readiness endpoint similar to Kubernetes where an external service probes an endpoint on each worker
  2. A timer which uses the monitor system to report the state of events - how many have been processed in the last second, what the lag is between the read/commit/end of a topic, how many are currently being processed etc.
  3. Just using docker containers to host each worker and letting a container manager (Portainer, Kubernetes, etc.) manage that with its built-in tools.

The second option seemed to be a good solution, as it would give information on whether the system was experiencing lag in processing messages, but it did have a small yet not insignificant overhead: we had a lot of workers all reporting their status at 1Hz to a single worker at the end, which had to collate and send the data to websocket clients. That meant delivering 50 messages per second to a worker that already had to process a few hundred per second, and we were getting close to the throughput limit for a single worker. You also need some means to decide what that data means and what to do about it. We will probably use this in production, but at a lower rate.
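For what it's worth, the decision logic in that second option can be kept separate from Faust itself. A minimal sketch (the thresholds and field names are made up; the inputs would come from app.monitor inside a timer):

```python
def health_report(events_last_second: int,
                  end_offset: int,
                  committed_offset: int,
                  max_lag: int = 1000) -> dict:
    # Consider the worker healthy if it is either keeping up (low lag)
    # or still actively processing events despite some lag.
    lag = max(0, end_offset - committed_offset)
    return {
        "events_last_second": events_last_second,
        "lag": lag,
        "healthy": lag <= max_lag or events_last_second > 0,
    }
```

Keeping the rule in a plain function like this also makes it easy to tune the reporting rate or thresholds without touching the agent code.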

I was made aware at some point of a system that can auto monitor these for you including auto increasing/decreasing the number of workers based on load/demand but we never did follow that up and I can't for the life of me remember what that was.

Hope that's helpful, or someone else can offer some suggestions or validate our methods.

fonty422 commented 2 months ago

Also, I just realised you're after agent liveness. I'm simply not sure whether that's possible, except to say that the best measure of whether an agent is failing or has failed is probably to compare the last offset in the topic it processes to the read and committed offsets. If the last offset is ahead of the read or committed offset by some threshold you set, and the read/committed offset isn't increasing (or the gap is growing), then you have a problem.

```python
# Note: these are internal (underscore-prefixed) Monitor helpers.
offset = sum(app.monitor._tp_end_offsets_dict().get('topic-name', {}).values())
read = sum(app.monitor._tp_read_offsets_dict().get('topic-name', {}).values())
committed = sum(app.monitor._tp_committed_offsets_dict().get('topic-name', {}).values())
```
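Assuming those helpers return per-partition mappings of partition to offset (which the `.values()` calls in the snippet suggest), the threshold check described above could be expressed as something like this (the function names and default threshold are mine):

```python
def total_lag(end_offsets: dict, committed_offsets: dict) -> int:
    # Sum of per-partition (end - committed); a partition with no
    # recorded commit yet is treated as fully un-consumed.
    return sum(
        end - committed_offsets.get(partition, 0)
        for partition, end in end_offsets.items()
    )

def agent_lagging(end_offsets: dict, committed_offsets: dict,
                  threshold: int = 100) -> bool:
    # fonty422's rule: flag the agent once the end offset is ahead of
    # the committed offset by more than a threshold you set.
    return total_lag(end_offsets, committed_offsets) > threshold
```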
caspereijkens commented 2 months ago

Thanks for your comprehensive reply @fonty422 .

We would like to know as soon as possible when the connection to the Kafka brokers or topic is faltering. The topic in question rarely has messages produced, so I wonder how effective the proposed commit check is in this case?

I am unable to find in the documentation whether the agent has a constant connection to Kafka and what metric would be a measure for that connection.

fonty422 commented 2 months ago

Perhaps I've misunderstood and you're more looking at whether the Kafka broker system has failed? In that case, I'm not sure faust can tell you that, and you probably need something that looks specifically at that process.

If it's that you need to know whether an agent has failed to process a single message, that could be for a few reasons (network traffic, some other agent being behind), so take that into account when setting any time-based check. I think a 1Hz timer that compares the offsets would work: if you get successive results where the end offset sits at a certain value while the committed offset is behind and unchanged, then you know it's stuck. I'm not sure if the end offset comes for free, though, or if it's a broker request that you perhaps don't want to run at that rate.
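The "successive results" check can be captured in a small state machine fed from a 1Hz timer. A sketch (the class and the default threshold are mine, not a Faust API):

```python
class StallDetector:
    """Flags an agent as stalled when the end offset stays ahead of the
    committed offset and the committed offset stops advancing for N
    consecutive checks."""

    def __init__(self, max_stuck_checks: int = 3) -> None:
        self.max_stuck_checks = max_stuck_checks
        self._last_committed = None
        self._stuck_checks = 0

    def observe(self, end_offset: int, committed_offset: int) -> bool:
        # Call this once per timer tick; returns True once the agent
        # has looked stuck for max_stuck_checks ticks in a row.
        behind = end_offset > committed_offset
        if behind and committed_offset == self._last_committed:
            self._stuck_checks += 1
        else:
            self._stuck_checks = 0
        self._last_committed = committed_offset
        return self._stuck_checks >= self.max_stuck_checks
```

Any progress in the committed offset resets the counter, so a slow-but-moving agent is not flagged.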

But if it's that you want to catch the Kafka broker being unavailable, you have much bigger problems that are probably not faust related (other than faust being unable to connect), and you should find a method to probe the availability of the broker itself. For us, the whole thing will eventually run with each ZooKeeper node, Kafka broker, and worker in its own docker container, so if any one of them fails we will know about it, and in fact it will automatically be restarted. If one fails a certain number of times, we'll be alerted via a different mechanism, separate from faust and Kafka.

caspereijkens commented 2 months ago

Does read_offsets/committed_offsets/end_offsets look up the information on the broker, or is it some local value that is kept track of?

In case it looks up the information on the broker, that would be the answer to my question. I just need anything that pings the broker.

fonty422 commented 1 month ago

I'm really not sure whether this is given for free, or whether it needs to look it up. I can't imagine that the end offsets come with the message. I also just noticed the track_tp_end_offset method, so perhaps I should be considering that. But in reading the docs and code, it kind of looks like the Monitor is an event-based process 🤷

@wbarnha, perhaps you can answer this one quickly - how does the monitor get its information? Is it contacting the broker to request the information, and is it triggered by an incoming event, or can you call it dynamically?