Coreoz / Wisp

A simple Java Scheduler library with a minimal footprint and a straightforward API
Apache License 2.0

Feature request: job cluster synchronization to avoid running the same job twice #16

Open newnewcoder opened 2 years ago

newnewcoder commented 2 years ago

Is it possible to add a Scheduler Interface for users to implement it?

amanteaux commented 2 years ago

Hi @newnewcoder,

Sorry for the late answer. Can you detail your use case? The Scheduler class is the root of the Wisp scheduler, so making another implementation would mean that nothing from Wisp is used (except the configuration files).

newnewcoder commented 2 years ago

In my case, I need to run the scheduler application separately on two nodes behind a load balancer, but with the same job/scheduling state. Any suggestions?

amanteaux commented 2 years ago

Just to make sure I understand: do you want to execute the same jobs on both nodes, or do you want to load balance your jobs across your node instances (so if you have 2 jobs, one job is executed on node 1 and the other on node 2)?

newnewcoder commented 2 years ago

Neither, actually. I use Wisp in a web project that runs behind a load balancer, and I don't want the same job to run twice.

amanteaux commented 2 years ago

Ok I see!

The "easy" solution is to have a dedicated node that runs the jobs.

But providing a way to do job cluster synchronization is actually a good idea for the future. We might provide another Wisp library for this specific purpose!

amanteaux commented 2 years ago

An external service must be used to provide the cluster synchronization feature.

For a first version, a database service will be used for that. Maybe in the future other implementations will be provided to connect to other services.

RobbiewOnline commented 2 years ago

I've implemented this already in my own code, but thought I'd share my strategy as it may help you decide on an implementation.

In my case, any one of my nodes could go unhealthy or be removed from the cluster when scaling down, so the node that runs the task can change between invocations.

Here's a use case: every cron period (e.g. 2am every morning), the cron wakes up the Scheduler on every node (10 of them).

Within my implementation (which needs refining to be perfect), each node checks how long ago a node last elected to perform the work; if it was more than 10s ago, it writes to the database to say that it is alive and ready to run the task.

Once this election process has started, the task sleeps for 5s (very generous; I use MongoDB and wasn't 100% sure how long it would take after writing for the value to be persisted in the database and propagated to the other nodes). My MongoStorage.setOrUpdateConfigByNameAndSyncWithSqs method persists this value, which is cached in memory, and also sends messages to the other nodes using AWS SQS (Simple Queue Service). These messages are received by the other nodes to invalidate their cache, forcing them to read the database rather than the cache on the next query (in <5s).

After this generous sleep, if the hostname of the machine running the task is still the same as the hostname persisted in the database, then only that host will run the task; the others will exit as a no-op.

The code for my implementation is below, feel free to use, ignore or pilfer any good bits.

    public static void registerRunnableCron(
            String threadName,
            String cron,
            boolean limitToOneNode,
            Runnable tick
    ) {

        CronExpressionSchedule cronSchedule = CronExpressionSchedule.parse(cron);

        scheduler.schedule(threadName, new Runnable() {
            @Override
            public void run() {
                try {
                    if (limitToOneNode) {
                        final Date NOW = new Date();
                        final String myHostname = getHostname();
                        boolean iVoted = false;

                        Date lastSetDate = MongoStorage.getConfigAsDate(threadName + "-cron-date", false);
                        String lastSetHostName = MongoStorage.getConfigAsString(threadName + "-cron-hostname", false);

                        if (lastSetDate == null || lastSetHostName == null
                                || (NOW.getTime() - lastSetDate.getTime()) > 10000) {

                            // If not previously set, or last set more than 10s ago, then vote to do the work
                            MongoStorage.setOrUpdateConfigByNameAndSyncWithSqs(threadName + "-cron-hostname", myHostname);
                            MongoStorage.setOrUpdateConfigByNameAndSyncWithSqs(threadName + "-cron-date", NOW);

                            Log.info("CRON " + threadName + " on host:" + myHostname + " I've voted to take on this task");
                            iVoted = true;
                        }

                        try {
                            // Wait 5s, enough time to propagate the vote to all servers
                            Thread.sleep(5000);
                        } catch (InterruptedException ex) {
                            // Restore the interrupt flag so the scheduler thread can stop cleanly
                            Thread.currentThread().interrupt();
                            Log.error(true, this, "Sleep interrupted");
                        }

                        // Check whether the hostname in the DB is mine -
                        // if it is, do the work, otherwise ignore it
                        lastSetHostName = MongoStorage.getConfigAsString(threadName + "-cron-hostname", false);

                        if (myHostname.equals(lastSetHostName)) {
                            Log.info("CRON " + threadName + " on host:" + myHostname + " I'm responsible for this task, invoking tick ...");
                            tick.run();
                        } else {
                            if (iVoted) {
                                Log.info("CRON " + threadName + " on host:" + myHostname + " I lost the vote for this task, won by host:" + lastSetHostName);
                            }
                        }
                    } else {
                        tick.run();
                    }
                } catch (Exception e) {
                    Log.error(false, this, threadName + " error during tick reason:" + e.getMessage(), e);
                }
            }
        }, cronSchedule);
    }

I do plan to improve the algorithm slightly...

amanteaux commented 2 years ago

@devology-rob Wow thank you for sharing this!

On my side, I thought about making each node try to run an update query; the only node whose update query succeeds would then execute the job. The update SQL query is something like: update wisp_cluster set last_execution_time = expected_execution_time where job_name = name and last_execution_time < expected_execution_time. So it works only for jobs that are executed using a cron expression or at a fixed time, but I guess for this specific use case of cluster synchronization, that is ok.

For now the skeleton of this code is contained in: https://github.com/Coreoz/Wisp/compare/master...v3#diff-f7a9606d334f9a338196e5eab50dbd43daa1601e89b8e7caa4bb71bf3c4549e4 Feel free to add any suggestions!
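
A minimal sketch of that conditional-update election, using only raw JDBC, could look like the code below. The wisp_cluster table, its columns, and the tryToAcquireExecution method are illustrative assumptions for this discussion, not the actual v3 code:

    import java.sql.Connection;
    import java.sql.PreparedStatement;
    import java.sql.SQLException;
    import java.sql.Timestamp;
    import java.time.Instant;

    // Illustrative sketch only: the node whose UPDATE statement affects a row "wins"
    // the execution of the job for the given expected execution time.
    public class ClusterElectionSketch {

        public static boolean tryToAcquireExecution(
                Connection connection,
                String jobName,
                Instant expectedExecutionTime) throws SQLException {
            String sql = "UPDATE wisp_cluster SET last_execution_time = ? "
                + "WHERE job_name = ? AND last_execution_time < ?";
            try (PreparedStatement statement = connection.prepareStatement(sql)) {
                statement.setTimestamp(1, Timestamp.from(expectedExecutionTime));
                statement.setString(2, jobName);
                statement.setTimestamp(3, Timestamp.from(expectedExecutionTime));
                // At most one node sees an affected row, so at most one node runs the job
                return statement.executeUpdate() == 1;
            }
        }
    }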

I keep your implementation in mind though, it might be useful for more complex cases!

amanteaux commented 2 months ago

This feature actually requires some work, especially because querying the database may bring in new dependencies. Ideally, this would be implemented using only a java.sql.Connection provider and raw JDBC; this way, it can be used in any framework without bringing new dependencies.
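
As an illustration of what a java.sql.Connection provider could look like (this is only a possible shape, not a settled API), the synchronization module could accept a plain connection supplier, so that any connection pool can plug in without adding dependencies:

    import java.sql.Connection;
    import java.sql.SQLException;
    import java.util.function.Supplier;
    import javax.sql.DataSource;

    // Hypothetical API shape: the cluster synchronization code depends only on java.sql,
    // and the application supplies connections from whatever pool it already uses.
    public class ClusterSynchronizationConfig {

        private final Supplier<Connection> connectionProvider;

        public ClusterSynchronizationConfig(Supplier<Connection> connectionProvider) {
            this.connectionProvider = connectionProvider;
        }

        public Connection borrowConnection() {
            return connectionProvider.get();
        }

        // Example wiring with any DataSource (e.g. a HikariCP pool), without new dependencies
        public static ClusterSynchronizationConfig fromDataSource(DataSource dataSource) {
            return new ClusterSynchronizationConfig(() -> {
                try {
                    return dataSource.getConnection();
                } catch (SQLException e) {
                    throw new RuntimeException(e);
                }
            });
        }
    }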

I have not had a lot of time to work on this lately, but if someone wants to start implementing something from the v3 branch, which introduces more modularization, I would be happy to review it (if possible, it would be great to make intermediate PRs, to validate the work little by little).