flux-framework / flux-core

core services for the Flux resource management framework
GNU Lesser General Public License v3.0

Stream job events from flux #4569

Open morrone opened 2 years ago

morrone commented 2 years ago

I don't know if existing functionality can reasonably satisfy this or not, so I will describe the use case that we have in mind:

When flux is used as the cluster-level scheduler ("native flux"?), we would like to generate messages in real time for job-scheduling-related events. Those events would include, but not be limited to:

We would like the job information in the events to include all relevant information about the job, for instance:

It is open for discussion whether all data is included in each event, or perhaps just the data that has changed since the previous event for the same job.

The goal of the job event interface in flux would be to allow us to publish the data to a topic in Kafka. In turn there would be one or more consumers of the job events topic. One of the first consumers would be the elasticsearch database that will back the operational monitoring dashboard implemented in grafana.

Can we cleanly achieve this with current APIs in flux? Will we need more work in flux-core (or somewhere else in flux?) to meet this need?

garlick commented 2 years ago

I think if we sent you jobspec, R, and the primary job eventlog for each job, you would have everything on your list (or if things are missing we can fill them in).

If you're curious you can examine these on a flux system like corona with

$ flux mini submit ...
<prints JOBID>
$ flux job eventlog --format=json JOBID
$ flux job info JOBID jobspec
$ flux job info JOBID R

Although you could write something now using flux APIs to grab this stuff in real time, when we heard your talk in S3G meeting, we were wondering about a new service in flux that could efficiently provide this as well as serve some other monitoring requirements we have.

Other questions we might want to discuss

morrone commented 2 years ago

Those are good questions.

I don't think the data necessarily needs to be simpler as long as it is present. The plugin's job would be to convert the data into whatever destination format needed (my immediate use case being something like an Avro schema for use in Kafka). But if the format is especially difficult to parse and use maybe I would change my mind about that. :)

Yes, the edge cases are good to consider. I don't fully know the interfaces yet, but it sounds like the current job eventlog messages do not contain all of the information we would need. The event just gives us the poke to go and look up the other information (jobspec, R, etc.). I would want to know the semantics around that. For instance, if a job is purged, does that data get deleted from the KVS? Are races possible, such as a job being submitted and purged before the plugin has a chance to look up the data? Maybe we need the relevant data to be contained in the event so the event messages are more atomic?

Whether or not we can handle duplicate data probably depends on the schema format that we describe, the format in the database(s), and maybe other factors. It wouldn't hurt to try to make the messages idempotent. If we can maintain the same ordering and content of messages in the job event stream, then it might be possible to have kafka help us throw out the duplicate messages. But don't hold me to that.

Even if a duplicate message makes it through Kafka, probably the databases wouldn't mind if we write the data twice.
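
A minimal sketch of the idempotency idea, assuming each event can be identified by job id plus event timestamp. The `(jobid, timestamp)` key and the dict layout are illustrative assumptions, not a Flux or Kafka format:

```python
def dedupe(events):
    """Yield each event once, keyed by (jobid, timestamp)."""
    seen = set()
    for ev in events:
        key = (ev["jobid"], ev["timestamp"])
        if key in seen:
            continue  # already delivered once; skip the replayed copy
        seen.add(key)
        yield ev


stream = [
    {"jobid": 1, "timestamp": 10.0, "name": "submit"},
    {"jobid": 1, "timestamp": 11.5, "name": "alloc"},
    {"jobid": 1, "timestamp": 11.5, "name": "alloc"},  # replayed duplicate
    {"jobid": 2, "timestamp": 12.0, "name": "submit"},
]
unique = list(dedupe(stream))
print([(e["jobid"], e["name"]) for e in unique])
```

This only works if a replay reproduces the same key for the same event, which is why stable ordering and content matter.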

Probably most of the data would not need to be anonymized, because the data will be protected behind authentication and authorization through most of the pipeline. But I wouldn't completely rule that out. Anonymization might happen at different levels in the pipeline, flux-core, the plugin that consumes job events, Kafka (Kafka Streams?), or even just have field-level authorization in the database if that is supported. It is probably too early to decide where we want to do that.

Is it possible to create a wildcard watch on the kvs or something to get job events for all jobs? It might make it more difficult to have a reproducible ordering of events (for recovery situations) using that approach?

chu11 commented 2 years ago

I brought this up with @garlick on a call. It was just brainstorming, and I'm not sure if it's a good idea or not, but I'll throw it out there.

I'm wondering if the job-list module should provide a streaming service similar to other streaming services in flux. It could stream all job data changes that occur over time. On restart, it could re-stream all recovered job info. Some positives of this approach:

the bad:

garlick commented 2 years ago

Sorry for my ignorance but is kafka running on the management node such that flux can more or less assume that it's always running and ready to accept data? Just wondering if maybe flux could get away with "open loop" publication of the required data to kafka for a start.

morrone commented 2 years ago

Kafka is not necessarily on the management node. Our current thinking in LC is to use the gateway nodes for kafka. FYI, Kafka is a group of daemons on different nodes working together, not a single node (assuming that the cluster is large enough to have more than one gw node).

The libraries for Kafka are pretty darn smart from what I have seen. librdkafka is the C/C++ client library, and employs threading and in memory caching to make Kafka producer calls asynchronous.

So at one level, the kafka plugin in flux could pretend that Kafka is always available, even if it isn't. Hopefully everything is asynchronous, even the initial creation of the kafka producer handle. At some point the cache would fill and I'm not sure what would happen then. The library is designed to be reliable, so additional calls might block at that point. If it is configurable, maybe we allow it to throw away older data. It all depends on our posture towards losing flux job data.

And data would be lost if flux was restarted before the kafka producer code could publish all of the data.
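
To make the trade-off concrete, here is a minimal sketch of a bounded in-memory buffer that drops the oldest entries once it is full. It does not reflect librdkafka's behavior or API. `BoundedBuffer` and its `dropped` counter are hypothetical names used only to illustrate the "throw away older data" option:

```python
from collections import deque


class BoundedBuffer:
    """Hypothetical producer-side buffer that discards the oldest entry when full."""

    def __init__(self, maxlen):
        self.queue = deque(maxlen=maxlen)
        self.dropped = 0

    def put(self, item):
        if len(self.queue) == self.queue.maxlen:
            # deque(maxlen=...) evicts from the left on append; count the loss
            self.dropped += 1
        self.queue.append(item)

    def drain(self):
        """Return and clear everything still buffered, oldest first."""
        items = list(self.queue)
        self.queue.clear()
        return items


buf = BoundedBuffer(maxlen=3)
for n in range(5):
    buf.put(f"event-{n}")
remaining = buf.drain()
print(remaining, buf.dropped)
```

Counting drops at least makes the data loss visible. The alternative posture is to block the caller when the buffer is full.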

The job-list idea sounds interesting. We would need it to allow all job data to be retrieved by the plugin running in flux.

What other streaming services are there in flux that I can look at for comparison?

chu11 commented 2 years ago

What other streaming services are there in flux that I can look at for comparison?

Nothing at the level of what I'm thinking of that would go into job-list. kvs-watch is the one that comes to mind, as it tracks changes in a kvs key.

>flux kvs put a=1
>flux kvs get --watch a > foo.out &
>flux kvs put a=2
>flux kvs put a=3
>flux kvs put a=4
>flux kvs put a=5
>cat foo.out
1
2
3
4
5

grondo commented 2 years ago

The job-manager event journal?

garlick commented 2 years ago

Yeah the job manager event journal is what I was thinking, with fetching of R and the jobspec, producing one stream for kafka.

grondo commented 2 years ago

Could we just store the raw jobspec and R (these are JSON) for each job along with all of the events and have Kafka or some other data processing extract the necessary data from these objects? It would seem that limiting the stored data to what is just available in job-list for instance would possibly prevent other interesting queries (the base job environment is in jobspec for instance).

Sorry if that is a naive suggestion, but even modern sqlite can query stored JSON objects directly.
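
As a small illustration of that point, the sketch below stores raw JSON text in SQLite and queries it with `json_extract`. It assumes an SQLite build that includes the JSON functions (built in by default since SQLite 3.38). The table layout and the trimmed documents are made up for the example; real jobspec and R objects carry many more fields:

```python
import json
import sqlite3

db = sqlite3.connect(":memory:")
db.execute("CREATE TABLE jobs (id INTEGER PRIMARY KEY, jobspec TEXT, R TEXT)")

# Illustrative, heavily trimmed documents (assumed shapes, not complete objects).
jobspec = {"attributes": {"system": {"duration": 3600, "cwd": "/tmp"}}}
R = {"execution": {"R_lite": [{"rank": "0-3"}]}}
db.execute(
    "INSERT INTO jobs VALUES (?, ?, ?)",
    (1, json.dumps(jobspec), json.dumps(R)),
)

# Pull individual fields out of the stored JSON without a fixed schema.
row = db.execute(
    "SELECT json_extract(jobspec, '$.attributes.system.duration'), "
    "json_extract(R, '$.execution.R_lite[0].rank') "
    "FROM jobs WHERE id = 1"
).fetchone()
print(row)
```

Because the full objects are stored, queries nobody anticipated at ingest time remain possible later.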

chu11 commented 2 years ago

I am thinking some job-list streaming service may not be the way to go, as it would limit what data is available.

Yeah the job manager event journal is what I was thinking, with fetching of R and the jobspec, producing one stream for kafka.

perhaps a requestor of the stream, via a flag, could say "hey, send me jobspec and R after they are available?" And they would be interjected into the event stream? Perhaps a new event called job-data with context {"type":"jobspec", "data":<the jobspec>}?
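
A rough sketch of what that interjection could look like, using plain dicts in place of real journal entries. The `job-data` event is only the proposal above, not an existing Flux event, and `with_job_data` is a hypothetical helper:

```python
def with_job_data(events, jobspec, R):
    """Yield events, adding a proposed job-data event after submit and alloc."""
    for ev in events:
        yield ev
        if ev["name"] == "submit":
            payload = {"type": "jobspec", "data": jobspec}
        elif ev["name"] == "alloc":
            payload = {"type": "R", "data": R}
        else:
            continue
        # For simplicity the extra event reuses the triggering event's timestamp.
        yield {"timestamp": ev["timestamp"], "name": "job-data", "context": payload}


events = [
    {"timestamp": 1.0, "name": "submit", "context": {}},
    {"timestamp": 2.0, "name": "alloc", "context": {}},
    {"timestamp": 3.0, "name": "start", "context": {}},
]
out = list(with_job_data(events, jobspec={"version": 1}, R={"version": 1}))
print([e["name"] for e in out])
```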

Hmmm. I wonder if such a stream would make the job-list module easier, as it doesn't have to bother to get jobspec and R on its own. There would be trickiness in making sure the published job state in job-list is not "transitioned" before we have read / parsed jobspec/R.

grondo commented 2 years ago

Well I was thinking at first this could perhaps be prototyped as a python script that consumes the job-manager journal. That would allow us to quickly see how it goes without spending a lot of time building a new streaming service into flux-core.

garlick commented 2 years ago

There is this: https://docs.confluent.io/kafka-clients/python/current/overview.html

grondo commented 1 month ago

This came up again as a near-ish term requirement, but some requirements were missing in this original issue. Captured from offline discussion, so some things may be missing/wrong:

Hm, those two seem to be the big ones.

My proposal is to design a Python API that wraps the job manager journal RPC and implements these features for now. The offset can be a timestamp since afaik job event timestamps should be unique.

For now the API can unpack the historical events, check the timestamp, and process events into an event per callback or whatever. At some later point the underlying RPC can be improved if necessary.

The Python API might be informed by the design of kafka consumers: https://docs.confluent.io/kafka-clients/python/current/overview.html#ak-consumer
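
The "unpack, check the timestamp, one event per callback" step could be sketched roughly as follows. It uses plain dicts rather than a live RPC, and it assumes each journal response carries a job `id` and a list of `events` with `timestamp` and `name` fields. `unpack` and `dispatch` are hypothetical helper names:

```python
def unpack(responses, since=0.0):
    """Flatten responses into (jobid, event) pairs newer than `since`, time ordered."""
    pairs = [
        (resp["id"], ev)
        for resp in responses
        for ev in resp["events"]
        if ev["timestamp"] > since
    ]
    return sorted(pairs, key=lambda pair: pair[1]["timestamp"])


def dispatch(responses, callback, since=0.0):
    """Call `callback(jobid, event)` once per event, oldest first."""
    for jobid, ev in unpack(responses, since):
        callback(jobid, ev)


responses = [
    {"id": 100, "events": [{"timestamp": 1.0, "name": "submit"},
                           {"timestamp": 4.0, "name": "alloc"}]},
    {"id": 200, "events": [{"timestamp": 2.0, "name": "submit"},
                           {"timestamp": 3.0, "name": "alloc"}]},
]
seen = []
dispatch(responses, lambda jobid, ev: seen.append((jobid, ev["name"])), since=1.0)
print(seen)
```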

morrone commented 1 month ago

Capturing requirements from our discussion today:

I would like the history to be provided to clients in the original journaled format. In other words, I want the historical information to be replayed in the same order, and in the same format as if I had been listening to the events when they occurred.

A client should be able to specify the point in the journal where it left off, rather than always starting from the beginning of the journal. This requirement could be a longer-term goal, if the client at least has some easy way to recognize the point in the journal where it left off. If timestamps are unique, then timestamps would work. If they are not, then timestamp combined with some other field? It would be nice not to have to match the entire event message, because that would introduce some extra client-side burdens that I would like to avoid.

So perhaps another requirement: a way to uniquely identify a message that doesn't require matching the entire string of the message. A monotonically increasing counter perhaps.
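
To illustrate the counter idea, here is a toy append-only journal that stamps each message with a sequence number and replays from a given offset. `Journal`, `append`, and `replay` are hypothetical names for this sketch only and say nothing about how the job manager journal is implemented:

```python
import itertools


class Journal:
    """Hypothetical append-only journal with a monotonically increasing counter."""

    def __init__(self):
        self._counter = itertools.count(1)
        self._messages = []

    def append(self, payload):
        seq = next(self._counter)
        self._messages.append((seq, payload))
        return seq

    def replay(self, after=0):
        """Yield (seq, payload) in original order for every seq greater than `after`."""
        for seq, payload in self._messages:
            if seq > after:
                yield seq, payload


journal = Journal()
for name in ("submit", "alloc", "start", "finish"):
    journal.append(name)

last_seen = 2  # the client remembers only this small integer
resumed = list(journal.replay(after=last_seen))
print(resumed)
```

The client never has to compare message bodies. It stores one integer and asks for everything after it.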

grondo commented 1 month ago

Ok, here's a first cut of a JournalConsumer interface. It offers either a poll() interface to get the next message (which is AFAICT the interface of the kafka consumer from the link above) or a way to register a callback that will be called for each event. The second approach requires that the flux reactor be running.

I think all job events have unique timestamps, since the thing creating the timestamp is the job manager which is a single thread. A since parameter is provided that avoids returning events that have timestamps less than or equal to a provided timestamp, so to avoid returning events that have already been processed, since should be set to the timestamp of the most recently processed event.

from collections import deque

import flux
import flux.job
import flux.rpc
from flux.constants import FLUX_NODEID_ANY, FLUX_RPC_STREAMING
from flux.job import JobID
from flux.job.event import EventLogEvent

class JournalEvent(EventLogEvent):
    """A container for an event from the job manager journal

    Attributes:
        jobid (JobID): The job id for which the event applies
        name (str): event name
        timestamp (float): event timestamp
        context (dict): context dictionary (see RFC 18)
        context_string (str): context dict converted to comma separated
            key=value string.
        jobspec (dict): For ``submit`` events, the job's redacted jobspec
        R (dict): For alloc events, the job's assigned R
    """
    def __init__(self, jobid, event, jobspec=None, R=None):
        super().__init__(event)
        self.jobid = JobID(jobid)
        self.jobspec = jobspec
        self.R = R

    def __str__(self):
        return f"{self.jobid.f58}: {self.timestamp:<0.5f} {self.name} {self.context_string}"

class JournalConsumer:
    """Class for consuming the job manager journal

    This class is a wrapper around the ``job-manager.events-journal`` RPC.

    It can be used synchronously via the ``poll`` method, or asynchronously
    by registering a callback with the ``set_callback`` method. In the case
    of asynchronous use, the Flux reactor must be run via ``h.reactor_run()``
    where ``h`` is a Flux handle opened with ``flux.Flux()``.

    When the consumer is first started, historical data (events in the past)
    will be sent from the job manager unless ``full`` is set to ``False``.
    These events are stored until all historical events are processed,
    then are time ordered before returning them via ``poll()`` or to the
    registered callback.

    To avoid processing previously seen events with a new instance of this
    class, the timestamp of the newest processed event can be passed to the
    constructor ``since`` parameter. Timestamps should be unique so ``poll``
    or the callback will start with the newest event since the ``since``
    timestamp.
    """
    def __init__(self, full=True, since=0.0):
        self.backlog = deque()
        self.full = full
        self.since = since
        self.rpc = None
        self.cb = None
        self.cb_args = []
        self.cb_kwargs = {}

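        # Always define the flag so poll() and the callback also work when
        # full=False (there are no historical events to drain in that case).
        self.processing_inactive = self.full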

    def __enqueue_response(self, resp):
        if resp is None:
            return
        jobid = resp["id"]
        jobspec = resp.get("jobspec")
        R = resp.get("R")
        for entry in resp["events"]:
            event = JournalEvent(jobid, entry)
            if event.timestamp > self.since:
                if event.name == "submit":
                    event.jobspec = jobspec or None
                elif event.name == "alloc":
                    event.R = R or None
                self.backlog.append(event)

    def __next_event(self):
        return self.backlog.popleft()

    def start(self, flux_handle):
        """Start the stream of events by sending a request to the job manager"""
        self.rpc = flux_handle.rpc(
            "job-manager.events-journal", {"full": self.full}, 0, FLUX_RPC_STREAMING
        )
        return self

    def stop(self):
        """Cancel the job-manager.events-journal RPC"""
        self.rpc.cancel()

    def poll(self):
        """Synchronously get the next job event

        If full=True, then this call will not return until all historical
        events have been processed so that they are time ordered.
        """

        if self.processing_inactive:
            # process backlog. Time order events once done:
            while self.processing_inactive:
                resp = self.rpc.get()
                if resp["id"] == -1:
                    self.processing_inactive = False
                    self.backlog = deque(
                        sorted(self.backlog, key=lambda x: x.timestamp)
                    )
                else:
                    self.__enqueue_response(resp)
                self.rpc.reset()

        if not self.backlog:
            self.__enqueue_response(self.rpc.get())
            self.rpc.reset()

        return self.__next_event()

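    def __user_cb_flush(self):
        # Hold events back until the historical backlog has been sorted,
        # otherwise the callback would see them out of time order.
        if self.processing_inactive:
            return
        while self.backlog:
            self.cb(self.__next_event(), *self.cb_args, **self.cb_kwargs)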

    def __cb(self, future):
        resp = future.get()
        if self.processing_inactive and resp["id"] == -1:
            self.processing_inactive = False
            self.backlog = deque(sorted(self.backlog, key=lambda x: x.timestamp))
            resp = None

        self.__enqueue_response(resp)
        self.__user_cb_flush()
        future.reset()

    def set_callback(self, event_cb, *args, **kwargs):
        """Register callback ``event_cb`` to be called for each job event

        If provided, *args, and **kwargs are passed along to event_cb.
        """
        self.cb = event_cb
        self.cb_args = args
        self.cb_kwargs = kwargs

        self.rpc.then(self.__cb)

Here's a simple example using this interface which prints all events since a specific timestamp.

    h = flux.Flux()
    consumer = JournalConsumer(since=1729458393.0).start(h)

    while True:
        event = consumer.poll()
        print(event)

grondo commented 1 month ago

@wihobbs may find the above API of interest for the notification service as well. This class is pretty simple. If it seems useful we can add it to our official Python API with tests, etc.

garlick commented 1 month ago

A nice optimization then will be to add "since" to the rpc. That should be trivial.

morrone commented 4 weeks ago

Ok, here's a first cut of a JournalConsumer interface.

Looking good!

At first read through, there are two things that could probably be enhanced:

Thanks!

wihobbs commented 4 weeks ago

Hi @grondo, that looks like a neat improvement! Thanks!

One thing we may want to think about adding is a wrapper class that can track data for the life cycle of a job. For instance, in the notification service, we're interested in some of the data in jobspec at finish, so we keep that around. I'll share an example here:

import flux.job

class WatchedJob:
    def __init__(self, jobid, fut):
        self.id = jobid ## should use a Flux jobid object here
        self.jobspec = fut['jobspec']
        self.R = None
        self.exception = None
        self.eventlog = {}

    def response_cb(self, journal_entry, dispatcher):
        ## Determine state of event.
        ## If validate, store jobspec.
        ## If alloc, store R.
        ## If exception, store exception

        event = flux.job.EventLogEvent(journal_entry['events'][0]) ## this could be a JournalEvent now :)
        event_name = event.name

        if event_name == 'alloc':
            self.R = journal_entry['R']
        elif event_name == 'exception':
            self.exception = event.context  ## context is an attribute (dict), not a method

        ## We want to store the event so the log can be sent to the user.
        self.eventlog[event_name] = event

However, I understand if that's an implementation detail you want to leave up to the consumer.

Overall, nice improvement! @vsoch may be interested in it for her work as well.

grondo commented 4 weeks ago

One thing we may want to think about adding is a wrapper class that can track data for the life cycle of a job.

Yeah, I think that's a bit application specific at this point and it isn't exactly clear if a single API would work for all consumers. Even this JournalConsumer class seems a bit questionable as to whether it belongs in the flux-core Python API, but since we do have >1 use case at this point, perhaps it makes sense.

grondo commented 4 weeks ago

poll() should take an optional timeout

Great, thanks. That's a simple one and I'll add that.
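
For reference, the timeout semantics could look something like the sketch below. It is built on the standard library's `queue.Queue` rather than on the journal RPC. `PollableQueue` is a hypothetical stand-in, and returning `None` on timeout is just one possible convention, not something settled in this thread:

```python
import queue


class PollableQueue:
    """Hypothetical stand-in for a consumer whose poll() accepts a timeout."""

    def __init__(self):
        self._q = queue.Queue()

    def push(self, event):
        self._q.put(event)

    def poll(self, timeout=None):
        """Return the next event, or None if `timeout` seconds pass first.

        timeout=None blocks until an event is available.
        """
        try:
            return self._q.get(timeout=timeout)
        except queue.Empty:
            return None


consumer = PollableQueue()
consumer.push("submit")
first = consumer.poll(timeout=0.01)
second = consumer.poll(timeout=0.01)  # nothing queued, so this times out
print(first, second)
```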

timestamp should have clearer documentation (e.g. "seconds since epoch, with millisecond/microsecond/nanosecond precision"). It should very likely be something other than a float as well. I think that will limit us to millisecond precision, and that likely wouldn't be enough to guarantee uniqueness. If it was an integer, it could be "nanoseconds since epoch". FYI, most of the stuff I work with right now uses at least microsecond precision for timestamps, and nanosecond is becoming more common.

Ok, I'll expand that documentation. The timestamps are sub-microsecond precision FYI. I just ran a test on a system with >120K jobs and found no duplicates, so for now let's treat this as a theoretical problem and not a practical problem. (We do have some plans to make this a guarantee, but unfortunately this is not high priority right now)

morrone commented 4 weeks ago

The timestamps are sub-microsecond precision FYI

OK, it looks like most python implementations use a double to implement a python "float", so that is OK for microseconds-since-epoch. As long as flux internally uses doubles (or better) to represent time, that should be fine for now.
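
A quick check of that reasoning, assuming timestamps are seconds since the epoch held in an IEEE 754 double (which is what a CPython float is). `math.ulp` needs Python 3.9 or later:

```python
import math

t = 1729458393.0  # the example timestamp used earlier in this thread
spacing = math.ulp(t)  # gap between t and the next representable double

print(f"resolution near t: {spacing * 1e9:.1f} ns")
print("finer than 1 microsecond:", spacing < 1e-6)
print("finer than 1 nanosecond:", spacing < 1e-9)
```

For current epoch values the spacing is 2**-22 seconds, roughly 238 ns. A double therefore resolves microseconds comfortably but cannot represent nanoseconds since the epoch.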

vsoch commented 4 weeks ago

Adding a note here (and apologies if I missed a detail, there is a lot in this thread!) but I'd like to be able to add arbitrary events that are rpc (not necessarily just the job journal) to be triggered with handle.reactor_run() and then allow me to trigger a callback. For example, for a custom heartbeat, I can subscribe to it and receive the message, but I have to do that synchronously and do some loop that blocks (and prevents me from running my one call to the reactor). My solution now is not to use flux and create a separate thread, but I'd ideally like everything (events wise) coming from one source, and from flux.

grondo commented 4 weeks ago

Adding a note here (and apologies if I missed a detail, there is a lot in this thread!) but I'd like to be able to add arbitrary events that are rpc (not necessarily just the job journal) to be triggered with handle.reactor_run() and then allow me to trigger a callback.

No problem! I think there may be some confusion here between job eventlog "events" and event messages, if I'm reading your question correctly. I'm going to open a separate discussion issue about handling RFC 3 Event message type messages asynchronously.

vsoch commented 4 weeks ago

That's perfect - thank you!