kagkarlsson / db-scheduler

Persistent cluster-friendly scheduler for Java
Apache License 2.0

Dynamic Cron tasks #335

Closed s-evgheni closed 4 months ago

s-evgheni commented 2 years ago

@kagkarlsson I was looking at your library, trying to figure out how one would go about creating a dynamic CRON task.

My current understanding is that because CronSchedule is not Serializable, it is not a good fit for this kind of operation.

Here is what I was trying to achieve.

  1. In a Spring Boot app, define a task configuration such as:

    @Bean
    public Task<PlainScheduleAndData> dynamicCronTask() {
        return Tasks.recurringWithPersistentSchedule("dynamic-recurring-task", PlainScheduleAndData.class)
                .execute((taskInstance, executionContext) -> {
                    log.info("Running dynamic-recurring-task... Instance: '" + taskInstance.getId() + "' ran using persistent schedule: " + taskInstance.getData().getSchedule());
                });
    }

  2. Try to schedule the task above from a Spring controller:

    @GetMapping(value = "/dbs-enque-cron-job", produces = {MediaType.TEXT_PLAIN_VALUE})
    public String dbsNewCronJob(@RequestParam(value = "cron_expression", defaultValue = "0 */1 * * * *") String cronExpression) {
        log.info("Provided cron value:" + cronExpression);
        //get task definition
        Task<PlainScheduleAndData> task =  taskConfiguration.dynamicCronTask();
    
        //define run schedule based on request parameters
        Schedule cron = Schedules.cron(cronExpression);
        PlainScheduleAndData when = new PlainScheduleAndData(cron);
    
        //create instance based on task definition and run schedule
        String randomTaskInstanceId = UUID.randomUUID().toString();
        SchedulableInstance<?> taskInstance = task.schedulableInstance(randomTaskInstanceId, when);
    
        //schedule instance for recurring execution (e.g: add to Postgres scheduled_tasks table)
        //this step currently fails...
        this.schedulingService.getDbScheduler().schedule(taskInstance);
    
        return "Recurring Job enqueued with an id of: " + taskInstance.getId() + " and cron schedule:" + when.getSchedule().toString();
    }

Outcome:

java.io.NotSerializableException: com.github.kagkarlsson.scheduler.task.schedule.CronSchedule
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184) ~[na:1.8.0_345]
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) ~[na:1.8.0_345]
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) ~[na:1.8.0_345]
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) ~[na:1.8.0_345]
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) ~[na:1.8.0_345]
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348) ~[na:1.8.0_345]
    at com.github.kagkarlsson.scheduler.boot.autoconfigure.DbSchedulerAutoConfiguration$2.serialize(DbSchedulerAutoConfiguration.java:220) ~[db-scheduler-spring-boot-starter-11.5.jar:na]
    at com.github.kagkarlsson.scheduler.jdbc.JdbcTaskRepository.lambda$createIfNotExists$0(JdbcTaskRepository.java:106) ~[db-scheduler-11.5.jar:na]
    at com.github.kagkarlsson.jdbc.JdbcRunner.lambda$execute$2(JdbcRunner.java:79) ~[micro-jdbc-0.3.jar:na]
    at com.github.kagkarlsson.jdbc.JdbcRunner.withConnection(JdbcRunner.java:140) ~[micro-jdbc-0.3.jar:na]
    at com.github.kagkarlsson.jdbc.JdbcRunner.execute(JdbcRunner.java:66) ~[micro-jdbc-0.3.jar:na]
    at com.github.kagkarlsson.jdbc.JdbcRunner.execute(JdbcRunner.java:54) ~[micro-jdbc-0.3.jar:na]
    at com.github.kagkarlsson.scheduler.jdbc.JdbcTaskRepository.createIfNotExists(JdbcTaskRepository.java:101) ~[db-scheduler-11.5.jar:na]
    at com.github.kagkarlsson.scheduler.SchedulerClient$StandardSchedulerClient.schedule(SchedulerClient.java:239) ~[db-scheduler-11.5.jar:na]
    at com.github.kagkarlsson.scheduler.SchedulerClient$StandardSchedulerClient.schedule(SchedulerClient.java:246) ~[db-scheduler-11.5.jar:na]
    at com.github.kagkarlsson.scheduler.Scheduler.schedule(Scheduler.java:173) ~[db-scheduler-11.5.jar:na]
    at com.example.app.api.JobController.dbsNewCronJob(JobController.java:81) ~[classes/:na]
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_345]
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_345]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_345]
    at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_345]
    at org.springframework.web.method.support.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:205) ~[spring-web-5.3.23.jar:5.3.23]
    at org.springframework.web.method.support.InvocableHandlerMethod.invokeForRequest(InvocableHandlerMethod.java:150) ~[spring-web-5.3.23.jar:5.3.23]
    at org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod.invokeAndHandle(ServletInvocableHandlerMethod.java:117) ~[spring-webmvc-5.3.23.jar:5.3.23]
    at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.invokeHandlerMethod(RequestMappingHandlerAdapter.java:895) ~[spring-webmvc-5.3.23.jar:5.3.23]
    at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.handleInternal(RequestMappingHandlerAdapter.java:808) ~[spring-webmvc-5.3.23.jar:5.3.23]
    at org.springframework.web.servlet.mvc.method.AbstractHandlerMethodAdapter.handle(AbstractHandlerMethodAdapter.java:87) ~[spring-webmvc-5.3.23.jar:5.3.23]
    at org.springframework.web.servlet.DispatcherServlet.doDispatch(DispatcherServlet.java:1071) ~[spring-webmvc-5.3.23.jar:5.3.23]
    at org.springframework.web.servlet.DispatcherServlet.doService(DispatcherServlet.java:964) ~[spring-webmvc-5.3.23.jar:5.3.23]
    at org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:1006) ~[spring-webmvc-5.3.23.jar:5.3.23]
    at org.springframework.web.servlet.FrameworkServlet.doGet(FrameworkServlet.java:898) ~[spring-webmvc-5.3.23.jar:5.3.23]
    at javax.servlet.http.HttpServlet.service(HttpServlet.java:655) ~[tomcat-embed-core-9.0.65.jar:4.0.FR]
    at org.springframework.web.servlet.FrameworkServlet.service(FrameworkServlet.java:883) ~[spring-webmvc-5.3.23.jar:5.3.23]
    at javax.servlet.http.HttpServlet.service(HttpServlet.java:764) ~[tomcat-embed-core-9.0.65.jar:4.0.FR]
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:227) ~[tomcat-embed-core-9.0.65.jar:9.0.65]
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162) ~[tomcat-embed-core-9.0.65.jar:9.0.65]
    at org.apache.tomcat.websocket.server.WsFilter.doFilter(WsFilter.java:53) ~[tomcat-embed-websocket-9.0.65.jar:9.0.65]
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189) ~[tomcat-embed-core-9.0.65.jar:9.0.65]
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162) ~[tomcat-embed-core-9.0.65.jar:9.0.65]
    at org.springframework.web.filter.RequestContextFilter.doFilterInternal(RequestContextFilter.java:100) ~[spring-web-5.3.23.jar:5.3.23]
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:117) ~[spring-web-5.3.23.jar:5.3.23]
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189) ~[tomcat-embed-core-9.0.65.jar:9.0.65]
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162) ~[tomcat-embed-core-9.0.65.jar:9.0.65]
    at org.springframework.web.filter.FormContentFilter.doFilterInternal(FormContentFilter.java:93) ~[spring-web-5.3.23.jar:5.3.23]
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:117) ~[spring-web-5.3.23.jar:5.3.23]
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189) ~[tomcat-embed-core-9.0.65.jar:9.0.65]
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162) ~[tomcat-embed-core-9.0.65.jar:9.0.65]
    at org.springframework.boot.actuate.metrics.web.servlet.WebMvcMetricsFilter.doFilterInternal(WebMvcMetricsFilter.java:96) ~[spring-boot-actuator-2.7.4.jar:2.7.4]
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:117) ~[spring-web-5.3.23.jar:5.3.23]
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189) ~[tomcat-embed-core-9.0.65.jar:9.0.65]
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162) ~[tomcat-embed-core-9.0.65.jar:9.0.65]
    at org.springframework.web.filter.CharacterEncodingFilter.doFilterInternal(CharacterEncodingFilter.java:201) ~[spring-web-5.3.23.jar:5.3.23]
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:117) ~[spring-web-5.3.23.jar:5.3.23]
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189) ~[tomcat-embed-core-9.0.65.jar:9.0.65]
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162) ~[tomcat-embed-core-9.0.65.jar:9.0.65]
    at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:197) ~[tomcat-embed-core-9.0.65.jar:9.0.65]
    at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:97) [tomcat-embed-core-9.0.65.jar:9.0.65]
    at org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:541) [tomcat-embed-core-9.0.65.jar:9.0.65]
    at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:135) [tomcat-embed-core-9.0.65.jar:9.0.65]
    at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:92) [tomcat-embed-core-9.0.65.jar:9.0.65]
    at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:78) [tomcat-embed-core-9.0.65.jar:9.0.65]
    at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:360) [tomcat-embed-core-9.0.65.jar:9.0.65]
    at org.apache.coyote.http11.Http11Processor.service(Http11Processor.java:399) [tomcat-embed-core-9.0.65.jar:9.0.65]
    at org.apache.coyote.AbstractProcessorLight.process(AbstractProcessorLight.java:65) [tomcat-embed-core-9.0.65.jar:9.0.65]
    at org.apache.coyote.AbstractProtocol$ConnectionHandler.process(AbstractProtocol.java:890) [tomcat-embed-core-9.0.65.jar:9.0.65]
    at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1789) [tomcat-embed-core-9.0.65.jar:9.0.65]
    at org.apache.tomcat.util.net.SocketProcessorBase.run(SocketProcessorBase.java:49) [tomcat-embed-core-9.0.65.jar:9.0.65]
    at org.apache.tomcat.util.threads.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1191) [tomcat-embed-core-9.0.65.jar:9.0.65]
    at org.apache.tomcat.util.threads.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:659) [tomcat-embed-core-9.0.65.jar:9.0.65]
    at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61) [tomcat-embed-core-9.0.65.jar:9.0.65]
    at java.lang.Thread.run(Thread.java:750) [na:1.8.0_345]

s-evgheni commented 2 years ago

Furthermore, there seems to be a bug in the framework that lets you perform dynamic CRON scheduling if you first schedule a fixed-delay task with the same primary key (the same task and instance id). Like this:

    @GetMapping(value = "/dbs-enque-cron-job", produces = {MediaType.TEXT_PLAIN_VALUE})
    public String dbsNewCronJob(@RequestParam(value = "cron_expression", defaultValue = "0 */1 * * * *") String cronExpression) {
        log.info("Provided cron value:" + cronExpression);
        //get task definition
        Task<PlainScheduleAndData> task =  taskConfiguration.dynamicCronTask();

        //define run schedule
        Schedule cron = Schedules.cron(cronExpression);
        PlainScheduleAndData when = new PlainScheduleAndData(cron);

        //create instance based on task definition and run schedule
        String randomTaskInstanceId = UUID.randomUUID().toString();

        SchedulableInstance<?> taskInstanceOne = task.schedulableInstance(randomTaskInstanceId, new PlainScheduleAndData(Schedules.fixedDelay(Duration.ofSeconds(1))));
        SchedulableInstance<?> taskInstanceTwo = task.schedulableInstance(randomTaskInstanceId, when);

        //schedule instance for recurring execution (e.g: add to Postgres scheduled_tasks table)
        //1st add serializable FixedDelay task instance
        this.schedulingService.getDbScheduler().schedule(taskInstanceOne);
        //2nd override FixedDelay task instance with a cron expression
        this.schedulingService.getDbScheduler().schedule(taskInstanceTwo);

        return "Recurring Job enqueued with an id of: " + taskInstanceTwo.getId() + " and cron schedule:" + when.getSchedule().toString();
    }

So essentially in this scenario we will end up with running taskInstanceTwo logic on taskInstanceOne schedule.

s-evgheni commented 2 years ago

Another problem I encountered is that the same recurring logic can be scheduled as many times as you want.

For example if I define a bean like this in the configuration:

    @Bean
    public RecurringTask<Void> cronTask() {
        Schedule cron = Schedules.cron("0 */2 * * * *");
        return Tasks.recurring("auto-cron-task", cron)
                .execute((taskInstance, executionContext) -> {
                    log.info("Running auto-cron-task... Instance: '" + taskInstance.getId() + "'");
                });
    }

And a simple GET route like this:

    @GetMapping(value = "/enque-cron", produces = {MediaType.TEXT_PLAIN_VALUE})
    public String dbsNewCronJob() {
        RecurringTask<Void> task =  taskConfiguration.cronTask();

        //create instance based on task definition and run schedule
        String randomTaskInstanceId = UUID.randomUUID().toString();

        SchedulableInstance<?> taskInstanceOne = task.schedulableInstance(randomTaskInstanceId);
        this.schedulingService.getDbScheduler().schedule(taskInstanceOne);

        return "Recurring Job enqueued with an id of: " + taskInstanceOne.getId();
    }

Two things will happen.

  1. The first copy of the cron task will be auto picked up by the scheduler on startup and scheduled for execution.
  2. A user will be able to schedule an unlimited number of copies of the same task simply by calling the GET route.

I was under the assumption that the library should prevent this from happening and make sure that only a single copy of the same instance can be marked for execution at any given time. Further attempts to add the same task should be rejected, but maybe I'm just doing something silly here. The documentation is not very clear on best practices for your library, so I'm building on a lot of my own assumptions at this point.

s-evgheni commented 2 years ago

@kagkarlsson your feedback will be greatly appreciated :)

kagkarlsson commented 2 years ago

Yeah, currently CronSchedule is not Serializable, though it probably should be! Ideally, only the cron-pattern string would be serialized. In the meantime, you could copy CronSchedule and adapt it as needed.
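
A minimal sketch of the "serialize only the pattern string" idea, using plain Java serialization. `PatternOnlySchedule` is a hypothetical stand-in and not db-scheduler's CronSchedule; its "parser" just splits on whitespace. An adapted copy of CronSchedule would keep its parsed cron state in the transient field instead, which is an assumption about how one might structure it.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

// Hypothetical stand-in for a cron schedule; NOT db-scheduler's CronSchedule.
class PatternOnlySchedule implements Serializable {
    private static final long serialVersionUID = 1L;

    // The only state written to the stream.
    private final String pattern;

    // Derived state: skipped by serialization and rebuilt in readObject.
    private transient String[] fields;

    PatternOnlySchedule(String pattern) {
        this.pattern = pattern;
        this.fields = parse(pattern);
    }

    // Placeholder "parser": splits on whitespace only, no cron validation.
    private static String[] parse(String pattern) {
        return pattern.trim().split("\\s+");
    }

    String getPattern() {
        return pattern;
    }

    int fieldCount() {
        return fields.length;
    }

    // Invoked by Java serialization when an instance is read back.
    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
        in.defaultReadObject();   // restores 'pattern'
        fields = parse(pattern);  // rebuilds the transient state
    }

    // Serializes and deserializes in memory, roughly as a persistence layer would.
    static PatternOnlySchedule roundTrip(PatternOnlySchedule schedule) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(schedule);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (PatternOnlySchedule) in.readObject();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        PatternOnlySchedule copy = roundTrip(new PatternOnlySchedule("0 */1 * * * *"));
        System.out.println(copy.getPattern() + " -> " + copy.fieldCount() + " fields");
    }
}
```

Because the parsed state is transient, the stored bytes depend only on the pattern string, so the parsing internals can change without invalidating rows that were already persisted.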

        //1st add serializable FixedDelay task instance
        this.schedulingService.getDbScheduler().schedule(taskInstanceOne);
        //2nd override FixedDelay task instance with a cron expression
        this.schedulingService.getDbScheduler().schedule(taskInstanceTwo);

Currently, client.schedule(..) does not overwrite an existing execution. It could be enhanced with explicit methods that actually replace any existing one. A workaround should be to cancel the existing execution first.
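
To make the no-overwrite behavior and the cancel-first workaround concrete, here is a toy in-memory model. It is only a sketch: `ToyExecutionStore` and its methods are hypothetical, not db-scheduler classes, and the schedules are plain strings. It assumes an execution is identified by task name plus instance id and that scheduling only inserts when no such execution exists, in line with the `createIfNotExists` call visible in the stack trace above.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model only; NOT db-scheduler's repository or client.
class ToyExecutionStore {
    // Key: (task name, instance id). Value: a textual schedule description.
    private final Map<List<String>, String> executions = new HashMap<>();

    private static List<String> key(String taskName, String instanceId) {
        return Arrays.asList(taskName, instanceId);
    }

    // Inserts only when no execution with the same key exists.
    // Returns true if a new execution was stored, false if one already existed.
    boolean schedule(String taskName, String instanceId, String schedule) {
        return executions.putIfAbsent(key(taskName, instanceId), schedule) == null;
    }

    // Removes the execution if present; returns true if something was removed.
    boolean cancel(String taskName, String instanceId) {
        return executions.remove(key(taskName, instanceId)) != null;
    }

    // Returns the stored schedule, or null if there is no such execution.
    String scheduleOf(String taskName, String instanceId) {
        return executions.get(key(taskName, instanceId));
    }

    public static void main(String[] args) {
        ToyExecutionStore store = new ToyExecutionStore();
        store.schedule("dynamic-recurring-task", "id-1", "fixedDelay 1s");

        // Second schedule with the same key is ignored: the first schedule stays.
        store.schedule("dynamic-recurring-task", "id-1", "cron 0 */1 * * * *");
        System.out.println(store.scheduleOf("dynamic-recurring-task", "id-1"));

        // Workaround: cancel first, then schedule again.
        store.cancel("dynamic-recurring-task", "id-1");
        store.schedule("dynamic-recurring-task", "id-1", "cron 0 */1 * * * *");
        System.out.println(store.scheduleOf("dynamic-recurring-task", "id-1"));
    }
}
```

In this model the second `schedule` call is a no-op, which matches the observation that the task keeps running on the first instance's schedule.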

"So essentially in this scenario we will end up with running taskInstanceTwo logic on taskInstanceOne schedule."

The logic is the same. It is using the same task.

    @GetMapping(value = "/enque-cron", produces = {MediaType.TEXT_PLAIN_VALUE})
    public String dbsNewCronJob() {
        RecurringTask<Void> task =  taskConfiguration.cronTask();

        //create instance based on task definition and run schedule
        String randomTaskInstanceId = UUID.randomUUID().toString();

        SchedulableInstance<?> taskInstanceOne = task.schedulableInstance(randomTaskInstanceId);
        this.schedulingService.getDbScheduler().schedule(taskInstanceOne);

        return "Recurring Job enqueued with an id of: " + taskInstanceOne.getId();
    }

"Static" recurring tasks should not be scheduled explicitly; the scheduler handles that itself (and thereby guarantees a single instance). The problem here is that the task is scheduled explicitly with multiple unique instance ids. If you want to do that, you need a "dynamic" recurring task.
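
The effect of unique instance ids can be illustrated with a toy model. This is a sketch under the assumption that executions are deduplicated on the pair (task name, instance id); `InstanceIdDemo` and the value "fixed-id" are made up for illustration and are not part of db-scheduler.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.UUID;

// Toy model only; NOT db-scheduler code.
class InstanceIdDemo {
    // Each stored execution is identified by (task name, instance id).
    private final Set<List<String>> executions = new HashSet<>();

    // Adds the execution unless the same (task name, instance id) pair exists.
    boolean schedule(String taskName, String instanceId) {
        return executions.add(Arrays.asList(taskName, instanceId));
    }

    int count() {
        return executions.size();
    }

    // Calls schedule() 'calls' times, using either one fixed id or a fresh UUID per call.
    static int scheduleRepeatedly(boolean randomIds, int calls) {
        InstanceIdDemo demo = new InstanceIdDemo();
        for (int i = 0; i < calls; i++) {
            // "fixed-id" is a made-up value for this sketch.
            String id = randomIds ? UUID.randomUUID().toString() : "fixed-id";
            demo.schedule("auto-cron-task", id);
        }
        return demo.count();
    }

    public static void main(String[] args) {
        System.out.println("fixed id:   " + scheduleRepeatedly(false, 5));
        System.out.println("random ids: " + scheduleRepeatedly(true, 5));
    }
}
```

With a fixed id, repeated calls collapse into one execution; with a random UUID per request, every call to the GET route adds another one.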