hazelcast / hazelcast-jet

Distributed Stream and Batch Processing
https://jet-start.sh

Eventing mechanism for job state changes #2206

Open neilstevenson opened 4 years ago

neilstevenson commented 4 years ago

Jet doesn't provide a callback in the 4.0 API to detect when a job's state has changed (e.g. to RUNNING, CANCELLED, etc.).

The best you can do is periodically retrieve jobs by ID, which means you might miss new jobs that are added between polling loops.

It would be nice to have some sort of event-based mechanism, perhaps a topic to subscribe to.

neilstevenson commented 4 years ago

This is what I use:

Map<Long, JobStatus> currentState;
Map<Long, JobStatus> previousState = new HashMap<>();

while (true) {
    try {
        TimeUnit.SECONDS.sleep(5);

        currentState = this.jetInstance.getJobs()
                .stream()
                .collect(Collectors.toMap(Job::getId, Job::getStatus));

        // Live jobs, may be new or existing
        for (Entry<Long, JobStatus> entry : currentState.entrySet()) {
            JobStatus oldJobStatus = previousState.get(entry.getKey());
            JobStatus newJobStatus = entry.getValue();

            if (oldJobStatus == null || oldJobStatus != newJobStatus) {
                LOGGER.info("Job: {}: {}", entry.getKey(), this.jetInstance.getJob(entry.getKey()));
            }

            // Remove from previous state once examined
            previousState.remove(entry.getKey());
        }

        // Dead jobs
        for (Entry<Long, JobStatus> entry : previousState.entrySet()) {
            LOGGER.info("Job: {}: {}", entry.getKey(), this.jetInstance.getJob(entry.getKey()));
        }

        previousState = currentState;

    } catch (InterruptedException e) {
        break;
    }
}

but a publish to an ITopic<Job> would be good.
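
As a rough illustration of what such events could carry, the comparison step of the polling loop above can be pulled out into a function that turns two snapshots into a list of change events. This is only a sketch under assumptions: `JobStateDiff` and `Change` are hypothetical names, statuses are plain strings rather than Jet's `JobStatus` so the example runs without Hazelcast on the classpath, and `Map.of` requires Java 9 or later.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.TreeSet;

// Hypothetical helper, not part of the Jet API.
public class JobStateDiff {

    /** One state transition. oldStatus is null for a newly seen job; newStatus is null for a job that is no longer listed. */
    public static final class Change {
        public final long jobId;
        public final String oldStatus;
        public final String newStatus;

        Change(long jobId, String oldStatus, String newStatus) {
            this.jobId = jobId;
            this.oldStatus = oldStatus;
            this.newStatus = newStatus;
        }

        @Override
        public String toString() {
            return "Job " + jobId + ": " + oldStatus + " -> " + newStatus;
        }
    }

    /** Compares two snapshots (job id -> status) and returns the changes, ordered by job id. */
    public static List<Change> diff(Map<Long, String> previous, Map<Long, String> current) {
        // Union of ids from both snapshots, sorted so the output order is deterministic
        TreeSet<Long> ids = new TreeSet<>(previous.keySet());
        ids.addAll(current.keySet());

        List<Change> changes = new ArrayList<>();
        for (Long id : ids) {
            String before = previous.get(id); // null if the job is new
            String after = current.get(id);   // null if the job has disappeared
            if (!Objects.equals(before, after)) {
                changes.add(new Change(id, before, after));
            }
        }
        return changes;
    }

    public static void main(String[] args) {
        Map<Long, String> previous = Map.of(1L, "RUNNING", 2L, "RUNNING");
        Map<Long, String> current = Map.of(1L, "COMPLETED", 3L, "STARTING");
        // In a real setup each Change could be handed to a listener or published to a topic
        diff(previous, current).forEach(System.out::println);
    }
}
```

In the loop above, the two snapshots would be `previousState` and `currentState`, and each resulting change could be published to an `ITopic` for subscribers. That still relies on polling underneath, so a transition that starts and ends between two polls would be missed.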