spring-cloud / spring-cloud-dataflow

A microservices-based Streaming and Batch data processing in Cloud Foundry and Kubernetes
https://dataflow.spring.io
Apache License 2.0
1.09k stars 579 forks

Task table prefix is not resolved correctly in CTR #5854

Open klopfdreh opened 4 days ago

klopfdreh commented 4 days ago

Description: We currently receive many warnings when starting a composed task. During our migration we found that the lookup of the table prefix is broken: Spring's relaxed binding rules resolve the property TABLEPREFIX to tableprefix instead of tablePrefix, so the exact-name lookup for tablePrefix in the CTR finds nothing.

See:

{"timestamp":"2024-06-27T10:17:41.283+0200","level":"WARN","thread":"main","logger":"org.springframework.cloud.dataflow.composedtaskrunner.ComposedTaskRunnerConfiguration","message":"Cannot find app.s3-uploader-app.spring.cloud.task.tablePrefix in {...., app.s3-uploader-app.spring.cloud.task.tableprefix=BOOT3TASK,....}
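The mismatch reported in that warning can be reproduced without Spring. The following is a minimal sketch, assuming the composed task app properties behave like a plain `Map<String, String>` with case-sensitive keys; the class name `TablePrefixLookupDemo` and the helper `exactLookup` are hypothetical and exist only for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class TablePrefixLookupDemo {

    // Mirrors the exact-match lookup described in the issue:
    // only the camel-case key (tablePrefix) is tried.
    static String exactLookup(Map<String, String> props, String taskName) {
        String propertyName = String.format("app.%s.spring.cloud.task.tablePrefix", taskName);
        return props.get(propertyName);
    }

    public static void main(String[] args) {
        // The key as it appears in the warning: lower-case "tableprefix".
        Map<String, String> props = new LinkedHashMap<>();
        props.put("app.s3-uploader-app.spring.cloud.task.tableprefix", "BOOT3TASK");

        // Map keys are case-sensitive, so the camel-case lookup misses.
        System.out.println("camel-case lookup: " + exactLookup(props, "s3-uploader-app"));
        System.out.println("lower-case lookup: "
                + props.get("app.s3-uploader-app.spring.cloud.task.tableprefix"));
    }
}
```

Because the value is stored under the lower-case key, the camel-case lookup returns null even though the prefix is present, which is the situation the warning describes.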

Release versions: 2.11.2

Custom apps: N/A

Steps to reproduce: Launch a composed task with the new CTR.

Screenshots: N/A

Additional context: N/A

Solution:

Replace code:

logger.debug("addTaskExplorer:{}", taskName);
String propertyName = String.format("app.%s.spring.cloud.task.tablePrefix", taskName);
String prefix = properties.getComposedTaskAppProperties().get(propertyName);
if (prefix == null) {
    prefix = env.getProperty(propertyName);
}
if (prefix != null) {
    TaskExecutionDaoFactoryBean factoryBean = new MultiSchemaTaskExecutionDaoFactoryBean(dataSource, prefix);
    logger.debug("taskExplorerContainer:adding:{}:{}", taskName, prefix);
    explorers.put(taskName, new SimpleTaskExplorer(factoryBean));
} else {
    logger.warn("Cannot find {} in {} ", propertyName, properties.getComposedTaskAppProperties());
}

With:

logger.debug("addTaskExplorer:{}", taskName);

// Candidate keys: the property may arrive in camel-case, kebab-case, or lower-case form.
List<String> propertyNames = Stream.of("app.%s.spring.cloud.task.tablePrefix",
        "app.%s.spring.cloud.task.table-prefix",
        "app.%s.spring.cloud.task.tableprefix")
    .map(propertyName -> String.format(propertyName, taskName)).toList();

// For each candidate, check the composed task app properties first, then the environment; keep the first hit.
String prefix = propertyNames.stream()
    .map(propertyName -> {
        String prefixOfComposedTaskProperties = properties.getComposedTaskAppProperties().get(propertyName);
        return prefixOfComposedTaskProperties == null ? env.getProperty(propertyName) : prefixOfComposedTaskProperties;
    })
    .filter(Objects::nonNull)
    .findFirst().orElse(null);

if (prefix != null) {
    TaskExecutionDaoFactoryBean factoryBean = new MultiSchemaTaskExecutionDaoFactoryBean(dataSource, prefix);
    logger.debug("taskExplorerContainer:adding:{}:{}", taskName, prefix);
    explorers.put(taskName, new SimpleTaskExplorer(factoryBean));
} else {
    logger.warn("Cannot find {} in {} ", propertyNames, properties.getComposedTaskAppProperties());
}

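in the CTR. The snippet uses Stream.toList(), which requires Java 16 or later, so you might need to backport it (for example with collect(Collectors.toList())) if you don't want to use Java 17. 😁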
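For Java versions older than 16, where `Stream.toList()` is not available, the same lookup could be written roughly as follows. This is a sketch under stated assumptions, not the CTR's actual code: the class `TablePrefixResolver`, its methods, and the `fallback` function (standing in for `env.getProperty`) are hypothetical, and the app properties are assumed to be a plain `Map<String, String>`.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Function;
import java.util.stream.Collectors;

class TablePrefixResolver {

    // The three spellings proposed in the issue: camel-case, kebab-case, lower-case.
    static final List<String> TEMPLATES = Arrays.asList(
            "app.%s.spring.cloud.task.tablePrefix",
            "app.%s.spring.cloud.task.table-prefix",
            "app.%s.spring.cloud.task.tableprefix");

    // Expands the templates for one task name; Collectors.toList() works on Java 8.
    static List<String> candidateNames(String taskName) {
        return TEMPLATES.stream()
                .map(template -> String.format(template, taskName))
                .collect(Collectors.toList());
    }

    // Returns the first non-null value found, checking the app properties
    // before the fallback (hypothetical stand-in for env.getProperty).
    static String resolvePrefix(String taskName,
                                Map<String, String> appProperties,
                                Function<String, String> fallback) {
        return candidateNames(taskName).stream()
                .map(name -> {
                    String value = appProperties.get(name);
                    return value != null ? value : fallback.apply(name);
                })
                .filter(Objects::nonNull)
                .findFirst()
                .orElse(null);
    }

    public static void main(String[] args) {
        Map<String, String> props = new HashMap<>();
        props.put("app.s3-uploader-app.spring.cloud.task.tableprefix", "BOOT3TASK");
        // No environment fallback in this demo, so the fallback always returns null.
        System.out.println(resolvePrefix("s3-uploader-app", props, name -> null));
    }
}
```

Passing the fallback as a function keeps the sketch free of Spring dependencies; in the CTR the equivalent call would be the environment lookup already used in the snippets above.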