apache / shardingsphere-elasticjob

Distributed scheduled job
Apache License 2.0
8.12k stars 3.28k forks source link

Support Annotation Job #1445

Open Technoboy- opened 4 years ago

Technoboy- commented 4 years ago

As some end users have implemented an annotated job and the boot module has supported, we should support annotation jobs at the same time. Discusion here

Technoboy- commented 4 years ago

@jiang2015 @ArvinSiChuan @TeslaCN

8rV1n commented 4 years ago

Hi @Technoboy- , is it the @TeslaCN proposed one?

Technoboy- commented 4 years ago

Yes, could you help and discuss with others and contribute this together? And are you in the wechat group?

8rV1n commented 4 years ago

@Technoboy- Not in the group yet, but I'm glad to make contributes to the community.

8rV1n commented 4 years ago

@Technoboy- hi, how can I join the group?

Technoboy- commented 4 years ago

@Technoboy- hi, how can I join the group?

Leave your wechat id, I will add you and delete it after that.

caojingui commented 3 years ago

I have used it in my work this way。 I'm glad to make contributes to the community.

TeslaCN commented 3 years ago

I have used it in my work this way。 I'm glad to make contributes to the community.

Welcome! You may post you design in this issue and we can discuss it further.

caojingui commented 3 years ago

simpleJob annotion definition

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Component
public @interface SimpleJob {

    /**
     * default: job.getClass().getCanonicalName()
     * @see JobCoreConfiguration#getJobName()
     */
    String name() default "";

    /**
     * support spring expression ${key}
     * @see JobCoreConfiguration#getCron()
     */
    String cron();

    /**
     * @see JobCoreConfiguration#getShardingTotalCount()
     */
    int shardingTotalCount() default 1;

    /**
     * support spring expression
     *
     * @see JobCoreConfiguration#getShardingItemParameters()
     */
    String shardingItemParameters() default "";

    /**
     * support spring expression
     *
     * @see JobCoreConfiguration#getJobParameter()
     */
    String jobParameters() default "";

    /**
     * @see LiteJobConfiguration#isOverwrite()
     */
    boolean overwrite() default false;

    /**
     * @see LiteJobConfiguration#isDisabled()
     */
    boolean disabled() default false;

    /**
     * @see LiteJobConfiguration#isMonitorExecution()
     */
    boolean monitorExecution() default true;

    /**
     * @see JobCoreConfiguration#isFailover()
     */
    boolean failover() default false;

    /**
     * @see JobCoreConfiguration#isMisfire()
     */
    boolean misfire() default true;

    /**
     * @see JobPropertiesEnum#JOB_EXCEPTION_HANDLER
     */
    String jobExceptionHandler() default "";

    /**
     * @see LiteJobConfiguration#getJobShardingStrategyClass()
     */
    String jobShardingStrategyClass() default "";

    /**
     * @see JobCoreConfiguration#getDescription()
     */
    String description() default "";
}

deal with BeanDefinitionRegistryPostProcessor

public class ElasticJobScannerConfigurer implements BeanDefinitionRegistryPostProcessor, BeanFactoryAware, EnvironmentAware {
    private final Logger logger = LoggerFactory.getLogger(getClass());
    private static final String registryCenterRef = "zookeeperRegistryCenter";
    private ConfigurableListableBeanFactory beanFactory;
    private Environment environment;

    /**
     * @see ZookeeperConfiguration#getNamespace()
     */
    private String namespace;

    /**
     * @see ZookeeperConfiguration#getServerLists()
     */
    private String serverList;

    private String defaultJobExceptionHandler = "com.dangdang.ddframe.job.executor.handler.impl.DefaultJobExceptionHandler";
    private String defaultJobShardingStrategyClass = "com.dangdang.ddframe.job.lite.api.strategy.impl.AverageAllocationJobShardingStrategy";

    public ElasticJobScannerConfigurer() {
    }

    @Override
    public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException {
    }

    @Override
    public void postProcessBeanDefinitionRegistry(BeanDefinitionRegistry registry) throws BeansException {
        checkConfig();
        registryZookeeperConfigurationBeanDefinition(registry);
        for (String beanName : registry.getBeanDefinitionNames()) {
            Class clazz = beanFactory.getType(beanName);
            if (!SimpleJob.class.isAssignableFrom(clazz)) {
                continue;
            }

            SimpleJob jobConfig = (SimpleJob) clazz.getAnnotation(SimpleJob.class);
            if (jobConfig == null) {
                continue;
            }

            String jobName = clazz.getCanonicalName();
            if (!Strings.isNullOrEmpty(jobConfig.name())) {
                jobName = jobConfig.name();
            }
            logger.info("register elasticJob jobName={}, jobClass={}, config={}", jobName, clazz.getCanonicalName(),
                    JSON.toJSONString(jobConfig));

            BeanDefinitionBuilder factory = BeanDefinitionBuilder.rootBeanDefinition(SpringJobScheduler.class);
            factory.addConstructorArgReference(beanName);
            factory.addConstructorArgReference(registryCenterRef);
            factory.addConstructorArgValue(getLiteJobConfiguration(jobConfig, jobName, jobName));
            factory.addConstructorArgValue(BeanDefinitionBuilder.rootBeanDefinition(JobEventLogConfiguration.class).getBeanDefinition());
            factory.addConstructorArgValue(Lists.newArrayList());

            factory.setInitMethodName("init");
            factory.setDestroyMethodName("shutdown");
            registry.registerBeanDefinition(jobName, factory.getBeanDefinition());
        }
    }

    private void checkConfig() {
        if (Strings.isNullOrEmpty(namespace)) {
            throw new BeanCreationException("namespace can not be empty");
        }
        if (Strings.isNullOrEmpty(serverList)) {
            throw new BeanCreationException("serverList can not be empty");
        }
    }

    private void registryZookeeperConfigurationBeanDefinition(BeanDefinitionRegistry registry) {
        logger.info("register bean of ZookeeperRegistryCenter with namspace={}, srverList={}", namespace, serverList);
        BeanDefinitionBuilder configuration = BeanDefinitionBuilder.rootBeanDefinition(ZookeeperConfiguration.class);
        configuration.addConstructorArgValue(serverList);
        configuration.addConstructorArgValue(namespace);

        BeanDefinitionBuilder result = BeanDefinitionBuilder.rootBeanDefinition(ZookeeperRegistryCenter.class);
        result.addConstructorArgValue(configuration.getBeanDefinition());
        result.setInitMethodName("init");
        result.setDestroyMethodName("close");
        registry.registerBeanDefinition(registryCenterRef, result.getBeanDefinition());
    }

    private LiteJobConfiguration getLiteJobConfiguration(ElasticJobConfig config, String jobName, String jobClassName) {

        String cron = environment.resolvePlaceholders(config.cron());
        String jobExceptionHandler = config.jobExceptionHandler();
        if (Strings.isNullOrEmpty(jobExceptionHandler)) {
            jobExceptionHandler = defaultJobExceptionHandler;
        }
        String jobShardingStrategyClass = config.jobShardingStrategyClass();
        if (Strings.isNullOrEmpty(jobShardingStrategyClass)) {
            jobShardingStrategyClass = defaultJobShardingStrategyClass;
        }

        String jobParameters = config.jobParameters();
        if (!Strings.isNullOrEmpty(jobParameters)) {
            jobParameters = environment.resolvePlaceholders(jobParameters);
        }

        String shardingItemParameters = config.shardingItemParameters();
        if (!Strings.isNullOrEmpty(shardingItemParameters)) {
            shardingItemParameters = environment.resolvePlaceholders(shardingItemParameters);
        }

        JobCoreConfiguration coreConfiguration = JobCoreConfiguration.newBuilder(jobName, cron, config.shardingTotalCount())
                .shardingItemParameters(shardingItemParameters)
                .jobProperties(JobPropertiesEnum.JOB_EXCEPTION_HANDLER.getKey(), jobExceptionHandler)
                .misfire(config.misfire())
                .description(config.description())
                .failover(config.failover())
                .jobParameter(jobParameters)
                .build();

        SimpleJobConfiguration jobConfiguration = new SimpleJobConfiguration(coreConfiguration, jobClassName);

        return LiteJobConfiguration.newBuilder(jobConfiguration)
                .jobShardingStrategyClass(jobShardingStrategyClass)
                .disabled(config.disabled())
                .overwrite(config.overwrite())
                .monitorExecution(config.monitorExecution())
                .build();
    }

    @Override
    public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
        this.beanFactory = (ConfigurableListableBeanFactory) beanFactory;
    }

    @Override
    public void setEnvironment(Environment environment) {
        this.environment = environment;
    }

    public String getNamespace() {
        return namespace;
    }

    public void setNamespace(String namespace) {
        this.namespace = namespace;
    }

    public String getServerList() {
        return serverList;
    }

    public void setServerList(String serverList) {
        this.serverList = serverList;
    }

    public String getDefaultJobExceptionHandler() {
        return defaultJobExceptionHandler;
    }

    public void setDefaultJobExceptionHandler(String defaultJobExceptionHandler) {
        this.defaultJobExceptionHandler = defaultJobExceptionHandler;
    }

    public String getDefaultJobShardingStrategyClass() {
        return defaultJobShardingStrategyClass;
    }

    public void setDefaultJobShardingStrategyClass(String defaultJobShardingStrategyClass) {
        this.defaultJobShardingStrategyClass = defaultJobShardingStrategyClass;
    }
}

config and use

<bean class="com.dangdang.ddframe.job.lite.spring.configurer.ElasticJobScannerConfigurer">
    <property name="namespace" value="${ejob.zk.namespace}"/>
    <property name="serverList" value="${ejob.zk.url}"/>
</bean>
@SimpleJob(cron = "0 0 10,14,16 * * ?")
public class TestJob implements SimpleJob {
    @Override
    public void execute(ShardingContext shardingContext) {
        // todo
    }
}

other

  1. like this, defined @ScriptJob @DataflowJob
  2. with spring Import we can defined @EnableElasticJob to replace config the ElasticJobScannerConfigurer
TeslaCN commented 3 years ago

I think define an annotation for specific job type may not conducive to extension, each new job type is need to create a corresponding annotation. Maybe we can define a generic annotation, such as @ElasticJob which I propose in issue #970, and we need a better naming.

caojingui commented 3 years ago

👍 define a generic annotation, then we can get the jobType from the bean class, and auto registry the specific job type

skaic commented 3 years ago

so,Annotation Job Has it stopped? @TeslaCN

TeslaCN commented 3 years ago

Hi @skaic I think we can finish it in 3.1.0.

sunkai-cai commented 3 years ago

@TeslaCN Let's finish this work :)

sunkai-cai commented 3 years ago

I prefer to discuss the annotation of ElasticJob plan.

  1. define annotation ElasticJobConfiguration in api , using like this:
@ElasticJobConfiguration(
        cron = "0/5 * * * * ?",
        jobName = "SimpleTestJobFirst",
        shardingTotalCount = 3,
        shardingItemParameters = "0=Beijing,1=Shanghai,2=Guangzhou",
        jobListenerTypes = {"NOOP", "LOG"},
        extraConfigurations = {SimpleTracingConfigurationFactory.class},
        props = {
                @ElasticJobProp(key = "print.title", value = "test title"),
                @ElasticJobProp(key = "print.content", value = "test content")
        }
)
public class SimpleTestJob implements CustomJob {

    @Override
    public void execute(final ShardingContext shardingContext) {
    }

}
public class SimpleTracingConfigurationFactory implements JobExtraConfigurationFactory {

    public JobExtraConfiguration getJobExtraConfiguration() {
        BasicDataSource dataSource = new BasicDataSource();
        dataSource.setDriverClassName(org.h2.Driver.class.getName());
        dataSource.setUrl("jdbc:h2:mem:job_event_storage");
        dataSource.setUsername("sa");
        dataSource.setPassword("");
        return new TracingConfiguration<>("RDB", dataSource);
    }
}
//Java example
public final class JavaMain {
    .....
    public static void main(final String[] args) throws IOException {
        CoordinatorRegistryCenter regCenter = setUpRegistryCenter();
        (new ScheduleJobBootstrap(regCenter,new SimpleTestJob())).schedule();
    }
    .....
}
  1. define annotation EnableElastic in spring-code , using like this:
//spring example
@Configurable
@EnableElastic(scanBasePackages = "org.apache.shardingsphere.elasticjob.lite.spring.core.annotation.job.impl")
public class ElasticConfig {

}

or

//spring boot example
@Configurable
@EnableElastic(scanBasePackages = "org.apache.shardingsphere.elasticjob.lite.spring.core.annotation.job.impl")
public class ElasticConfig {

}
justfunxin commented 3 years ago

i also write a annotation elastic job springboot starter,maybe you can take a look at it.

https://github.com/kangarooxin/elastic-job3-spring-boot-starter

linghengqian commented 1 year ago
sunkai-cai commented 1 year ago
  • So for this issue, the only thing missing is documentation? Few downstream users seem to know about this feature.

You are right。we need documentation for the new features。