spring-projects / spring-batch

Spring Batch is a framework for writing batch applications using Java and Spring
http://projects.spring.io/spring-batch/
Apache License 2.0

Spring Batch issue with MultiResourceItemWriter and ClassifierCompositeItemWriter: an invalid count of records is written to the output file #4660

Open javaHelper opened 2 months ago

javaHelper commented 2 months ago

I am using Spring Boot + Spring Batch v2.7.1 in my project, and there appears to be a bug when reading with a FlatFileItemReader and writing through a ClassifierCompositeItemWriter whose delegates are MultiResourceItemWriters: the itemCountLimitPerResource value is not respected and the output files contain the wrong number of records.

I am reading a CSV file and splitting it into multiple files, each of which should contain at most 5 records, but the code I developed writes 7 records into one file.

Code Uploaded here: https://github.com/javaHelper/bug-4660/tree/main/bug-4660

@EnableBatchProcessing
@SpringBootApplication(exclude = {DataSourceAutoConfiguration.class})
public class MultiResourceSplitApplication {

    public static void main(String[] args) {
        SpringApplication.run(MultiResourceSplitApplication.class, args);
    }
}
package com.example;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.FlatFileItemWriter;
import org.springframework.batch.item.file.builder.FlatFileItemReaderBuilder;
import org.springframework.batch.item.file.builder.FlatFileItemWriterBuilder;
import org.springframework.batch.item.file.builder.MultiResourceItemWriterBuilder;
import org.springframework.batch.item.file.mapping.DefaultLineMapper;
import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
import org.springframework.batch.item.file.transform.PassThroughLineAggregator;
import org.springframework.batch.item.support.ClassifierCompositeItemWriter;
import org.springframework.batch.item.support.builder.ClassifierCompositeItemWriterBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.classify.Classifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;
import org.springframework.core.io.FileSystemResource;

@Configuration
public class MyJobConfig {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;
    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Bean
    public FlatFileItemReader<Employee> itemReader() {
        DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
        tokenizer.setNames("empId", "firstName", "lastName", "role");

        DefaultLineMapper<Employee> employeeLineMapper = new DefaultLineMapper<>();
        employeeLineMapper.setLineTokenizer(tokenizer);
        employeeLineMapper.setFieldSetMapper(new EmployeeFieldSetMapper());
        employeeLineMapper.afterPropertiesSet();

        return new FlatFileItemReaderBuilder<Employee>()
                .name("flatFileReader")
                .linesToSkip(1)
                .resource(new ClassPathResource("employee.csv"))
                .lineMapper(employeeLineMapper)
                .build();
    }

    @Bean
    public ClassifierCompositeItemWriter<Employee> classifierCompositeItemWriter() throws Exception {
        Classifier<Employee, ItemWriter<? super Employee>> classifier = new EmployeeClassifier(
                javaDeveloperItemWriter(), 
                pythonDeveloperItemWriter(), 
                cloudDeveloperItemWriter());

        return new ClassifierCompositeItemWriterBuilder<Employee>()
                .classifier(classifier)
                .build();
    }

    @Bean
    public ItemWriter<Employee> javaDeveloperItemWriter() {
        FlatFileItemWriter<Employee> itemWriter = new FlatFileItemWriterBuilder<Employee>()
                .lineAggregator(new PassThroughLineAggregator<>())
                .name("iw1")
                .build();

        return new MultiResourceItemWriterBuilder<Employee>()
                .name("javaDeveloperItemWriter")
                .delegate(itemWriter)
                .resource(new FileSystemResource("javaDeveloper-employee.csv"))
                .itemCountLimitPerResource(5)
                .resourceSuffixCreator(index -> "-" + index)
                .build();
    }

    @Bean
    public ItemWriter<Employee> pythonDeveloperItemWriter() {
        FlatFileItemWriter<Employee> itemWriter = new FlatFileItemWriterBuilder<Employee>()
                .lineAggregator(new PassThroughLineAggregator<>())
                .name("iw2")
                .build();

        return new MultiResourceItemWriterBuilder<Employee>()
                .name("pythonDeveloperItemWriter")
                .delegate(itemWriter)
                .resource(new FileSystemResource("pythonDeveloper-employee.csv"))
                .itemCountLimitPerResource(5)
                .resourceSuffixCreator(index -> "-" + index)
                .build();
    }

    @Bean
    public ItemWriter<Employee> cloudDeveloperItemWriter() {
        FlatFileItemWriter<Employee> itemWriter = new FlatFileItemWriterBuilder<Employee>()
                .lineAggregator(new PassThroughLineAggregator<>())
                .name("iw3")
                .build();

        return new MultiResourceItemWriterBuilder<Employee>()
                .name("cloudDeveloperItemWriter")
                .delegate(itemWriter)
                .resource(new FileSystemResource("cloudDeveloper-employee.csv"))
                .itemCountLimitPerResource(5)
                .resourceSuffixCreator(index -> "-" + index)
                .build();
    }

    @Bean
    public Step step() throws Exception {
        return stepBuilderFactory.get("step")
                .<Employee, Employee>chunk(3)
                .reader(itemReader())
                .writer(classifierCompositeItemWriter())
                .build();
    }

    @Bean
    public Job job() throws Exception {
        return jobBuilderFactory.get("job")
                .start(step())
                .build();
    }
}
import org.springframework.batch.item.ItemWriter;
import org.springframework.classify.Classifier;

import lombok.Setter;

@Setter
public class EmployeeClassifier implements Classifier<Employee, ItemWriter<? super Employee>> {
    private static final long serialVersionUID = 1L;
    private ItemWriter<Employee> javaDeveloperFileItemWriter;
    private ItemWriter<Employee> pythonDeveloperFileItemWriter;
    private ItemWriter<Employee> cloudDeveloperFileItemWriter;

    public EmployeeClassifier() {

    }

    public EmployeeClassifier(ItemWriter<Employee> javaDeveloperFileItemWriter,
                              ItemWriter<Employee> pythonDeveloperFileItemWriter,
                              ItemWriter<Employee> cloudDeveloperFileItemWriter) {
        this.javaDeveloperFileItemWriter = javaDeveloperFileItemWriter;
        this.pythonDeveloperFileItemWriter = pythonDeveloperFileItemWriter;
        this.cloudDeveloperFileItemWriter = cloudDeveloperFileItemWriter;
    }

    @Override
    public ItemWriter<? super Employee> classify(Employee employee) {
        if(employee.getRole().equals("Java Developer")){
            return javaDeveloperFileItemWriter;
        }
        else if(employee.getRole().equals("Python Developer")){
            return pythonDeveloperFileItemWriter;
        }
        return cloudDeveloperFileItemWriter;
    }
}
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

@AllArgsConstructor
@NoArgsConstructor
@Data
@Builder
public class Employee {
    private String empId;
    private String firstName;
    private String lastName;
    private String role;

    @Override
    public String toString() {
        return empId + "," + firstName + "," + lastName + "," + role;
    }
}
import org.springframework.batch.item.file.mapping.FieldSetMapper;
import org.springframework.batch.item.file.transform.FieldSet;
import org.springframework.validation.BindException;

public class EmployeeFieldSetMapper implements FieldSetMapper<Employee> {
    @Override
    public Employee mapFieldSet(FieldSet fieldSet) throws BindException {
        return Employee.builder()
                .empId(fieldSet.readRawString("empId"))
                .firstName(fieldSet.readRawString("firstName"))
                .lastName(fieldSet.readRawString("lastName"))
                .role(fieldSet.readRawString("role"))
                .build();
    }
}

employee.csv

empId,firstName,lastName,role
1,Mike ,Doe,Java Developer
2,Matt ,Doe,Java Developer
3,Deepak ,Doe,Python Developer
4,Neha ,Doe,Python Developer
5,Harish,Doe,Python Developer
6,Parag ,Doe,Python Developer
7,Harshita ,Doe,Python Developer
8,Pranali ,Doe,Python Developer
9,Raj ,Doe,Python Developer
10,Ravi,Doe,Python Developer
11,Gagan,Doe,Java Developer
12,Ashish ,Doe,Java Developer
13,Rajesh,Doe,Java Developer
14,Anosh ,Doe,Java Developer
15,Arpit ,Doe,Java Developer
16,Sneha ,Doe,Java Developer
17,Sneha ,Doe,Java Developer

Output: javaDeveloper-employee.csv-1

1,Mike ,Doe,Java Developer
2,Matt ,Doe,Java Developer
11,Gagan,Doe,Java Developer
12,Ashish ,Doe,Java Developer
13,Rajesh,Doe,Java Developer
14,Anosh ,Doe,Java Developer
15,Arpit ,Doe,Java Developer

javaDeveloper-employee.csv-2

16,Sneha ,Doe,Java Developer
17,Sneha ,Doe,Java Developer

pythonDeveloper-employee.csv-1

3,Deepak ,Doe,Python Developer
4,Neha ,Doe,Python Developer
5,Harish,Doe,Python Developer
6,Parag ,Doe,Python Developer
7,Harshita ,Doe,Python Developer
8,Pranali ,Doe,Python Developer
9,Raj ,Doe,Python Developer

pythonDeveloper-employee.csv-2

10,Ravi,Doe,Python Developer

When I used chunk(0), the counts in the output files were correct, but with huge data volumes the performance is very, very slow. Any suggestions?

javaHelper commented 1 month ago

@fmbenhassine - Were you able to reproduce this issue?

fmbenhassine commented 1 month ago

No, I did not start working on this yet. I would love to help if you can package all code snippets with a pom.xml in a zip or a repository that I can clone/run. There is a project template that you can use as a starting point here: https://github.com/spring-projects/spring-batch/blob/main/ISSUE_REPORTING.md

javaHelper commented 1 month ago

@fmbenhassine - I've uploaded code here: https://github.com/javaHelper/bug-4660/tree/main/bug-4660. Could you please have a look?

fmbenhassine commented 1 month ago

Great! Thank you for the sample. I will take a look and get back to you asap.

fmbenhassine commented 1 month ago

Thank you for providing a minimal example, well done! I am able to reproduce the issue and I think this is a bug. In fact, the itemCountLimitPerResource is not respected in this case.

An interesting finding I noticed as well is that when I set the chunk size to 10, everything is written to the same file (i.e. the itemCountLimitPerResource is not respected either).

I will plan the fix in a future patch release, because as of now we are working on the upcoming 5.2. If someone manages to fix the issue, then a PR is welcome.

hpoettker commented 1 month ago

I think the problem isn't related to the ClassifierCompositeItemWriter but is a duplicate of #1722.

The effect of itemCountLimitPerResource is indeed surprising, so it might be a good idea to change it. But the current behaviour is documented in the JavaDoc of MultiResourceItemWriter: https://github.com/spring-projects/spring-batch/blob/main/spring-batch-infrastructure/src/main/java/org/springframework/batch/item/file/MultiResourceItemWriter.java#L37
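The documented behaviour can be illustrated with a small standalone simulation (plain Java; this is not actual Spring Batch code and the names are made up for illustration). MultiResourceItemWriter writes each incoming chunk to the current file in full and only afterwards compares the item count against itemCountLimitPerResource, so a file can overshoot the limit by up to one chunk's worth of items. With the sample CSV, a chunk size of 3, and the classifier routing only "Java Developer" records, the first file ends up with 7 records, matching the output reported above:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simulation of the rollover rule documented in
// MultiResourceItemWriter's JavaDoc: the whole chunk is written to the
// current resource, and the limit is only checked between chunks.
public class RolloverSimulation {

    static List<List<String>> simulate(List<List<String>> chunks, int limitPerResource) {
        List<List<String>> files = new ArrayList<>();
        List<String> current = new ArrayList<>();
        int count = 0;
        for (List<String> chunk : chunks) {
            current.addAll(chunk);           // the whole chunk goes to the current file
            count += chunk.size();
            if (count >= limitPerResource) { // limit is only checked after a chunk
                files.add(current);          // roll over to a new file
                current = new ArrayList<>();
                count = 0;
            }
        }
        if (!current.isEmpty()) {
            files.add(current);
        }
        return files;
    }

    public static void main(String[] args) {
        // "Java Developer" empIds as they reach the delegate writer per chunk,
        // with chunk size 3 over the sample employee.csv:
        List<List<String>> chunks = List.of(
                List.of("1", "2"),         // from chunk (1,2,3)
                List.of("11", "12"),       // from chunk (10,11,12)
                List.of("13", "14", "15"), // from chunk (13,14,15)
                List.of("16", "17"));      // from chunk (16,17)
        for (List<String> file : simulate(chunks, 5)) {
            System.out.println(file.size() + " records: " + file);
        }
        // First file receives 7 records (2 + 2 + 3 before the count reaches the
        // limit of 5), the second receives the remaining 2.
    }
}
```

Under this reading, the overshoot is inherent to checking the count per chunk rather than per item, which is why aligning the chunk size with itemCountLimitPerResource changes the observed file sizes.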

javaHelper commented 1 month ago

@fmbenhassine - Thanks. I agree it's a bug and it's reproducible. When I tested in my project with a huge volume, keeping 1/2 million records as itemCountLimitPerResource, I surprisingly saw different outputs.
I hope this will be fixed soon.