0JUUU / spring-batch

Spring Boot 기반으로 개발하는 Spring Batch
1 stars 0 forks source link

섹션 7. 스프링 배치 청크 프로세스 이해 #11

Open 0JUUU opened 2 years ago

0JUUU commented 2 years ago
0JUUU commented 2 years ago

Chunk

Chunk<I> vs Chunk<O>

image

  1. ItemReader가 Source를 하나씩 읽고 하나씩 Chunk<I>에 저장 ➡️ 이를 Chunk 크기만큼 실행
  2. Chunk 크기만큼 쌓였다면 Chunk<I>를 ItemProcessor에 전달
  3. ItemProcessor: 전달받은 Chunk(list) 를 적절하게 가공해서 Chunk<O>에 저장
    • 이 때 아이템들을 하나씩 읽어 처리한다.
  4. Chunk<O>를 ItemWriter에 전달
  5. ItemWriter: 데이터 쓰기 작업
0JUUU commented 2 years ago

ChunkOrientedTasklet - 개념 및 API 소개

image

  1. TaskletStep: execute 메서드 ➡️ ChunkOrientedTasklet 호출
  2. ChunkOrientedTasklet: provide 메서드 ➡️ ChunkProvider 호출
  3. ChunkProvider: ItemReader에게 Item을 하나씩 read하도록 지시
    • 이 것을 Chunk Size만큼 반복
  4. ChunkOrientedTasklet: ChunkProcessor에게 읽은 데이터를 가공하라고 명령 (앞서 read한 내용들이 inputs로 들어감)
  5. ChunkProcessor: ItemProcessor에게 가공하라고 명령
    • 이것을 전달된 아이템 개수만큼 반복
  6. ChunkProcessor: 가공된 아이템을 ItemWriter에게 전달
  7. ItemWriter: 저장하는 등의 쓰기 처리
  8. 위의 일련의 과정이 하나의 사이클로 이후 ChunkOrientedTasklet에 가서 읽을 Item이 없을 때까지 반복 image
// step1 구성
    @Bean
    @JobScope
    public Step step1() {
        return stepBuilderFactory.get("step1")
                                 .<String, String>chunk(2)
                                 .reader(new ListItemReader<>(Arrays.asList("item1", "item2", "item3", "item4", "item5", "item6")))
                                 .processor(new ItemProcessor<String, String>() {
                                     @Override
                                     public String process(String item) throws Exception {
                                         return "my_" + item;
                                     }
                                 })
                                 .writer(new ItemWriter<String>() {
                                     @Override
                                     public void write(List<? extends String> items) throws Exception {
                                         items.forEach(System.out::println);
                                         System.out.println("=============");
                                     }
                                 })
                                 .build();
    }

ChunkSize : 3

image

ChunkSize : 2

image
0JUUU commented 2 years ago

ChunkOrientedTasklet - ChunkProvider / ChunkProcessor

ChunkProvider

ChunkProcessor

0JUUU commented 2 years ago

ItemReader / ItemWriter / ItemProcessor 이해

ItemReader

image

ItemWriter

image

ItemProcessor

image

0JUUU commented 2 years ago

ItemStream

image

image

public class CustomItemStreamReader implements ItemStreamReader<String> {

    private final List<String> items;
    private int index = -1;
    private boolean restart = false;

    public CustomItemStreamReader(List<String> items) {
        this.items = new ArrayList<>(items);
        this.index = 0;
    }

    @Override
    public String read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
        String item = null;
        if (index < items.size()) {
            item = items.get(index);
            index++;
        }
        if (index == 6 && !restart) {
            throw new RuntimeException("Restart is required");
        }

        return item;
    }

    @Override
    public void open(ExecutionContext executionContext) throws ItemStreamException {
        if (executionContext.containsKey("index")) {
            index = executionContext.getInt("index");
            restart = true;
        } else {
            index = 0;
            executionContext.put("index", index);
        }
    }

    @Override
    public void update(ExecutionContext executionContext) throws ItemStreamException {
        executionContext.put("index", index);
    }

    @Override
    public void close() throws ItemStreamException {
        // 리소스 해제, 초기화 작업했던 것들을 해제
        System.out.println("close");
    }
}

1번째 실행

image

2번째 실행 ➡️ 실패한 이후부터 읽는다.

image
0JUUU commented 2 years ago

Chunk Process 아키텍처

image

image

  1. Job을 실행하면 TaskletStep이 실행됨
  2. TaskletStep은 내부에 RepeatTemplate이라는 반복기를 가지고 있어 ChunkOrientedTasklet을 반복
  3. ChunkOrientedTasklet이 실행될 때 스프링 배치는 Transaction 경계 생성
  4. Chunk 단위로 작업 시작
  5. SimpleChunkProvider도 내부적으로 RepeatTemplate 반복기를 갖고 있어 ItemReader을 Chunk Size 만큼 반복시켜 데이터를 읽음
  6. Chunk Size만큼 읽고, 읽은 아이템이 담긴 Chunk<I>를 SimpleChunkProcessor에 넘김
  7. SimpleChunkProcessor는 전달받은 Chunk 데이터를 한 건씩 읽어 ItemProcessor로 데이터를 가공하여 Chunk<O>에 저장
  8. ItemWriter에게 Chunk가 갖고 있는 List 값을 전달하고 ItemWriter는 출력 처리 (트랜잭션 커밋)
  9. 이 과정이 Chunk 단위로 반복 , ItemReader에서 null 값을 읽을 때 반복작업 종료
    • 만약 중간에 예외 발생, 트랜잭션 롤백 & 작업 중단
    • 만약 ItemReader에서 null값을 읽어오게 된다면 RepeatStatus.FINISHED 를 통해 현재 작업을 마지막으로 다음부터는 반복 작업 ❌
    • Chunk 단위마다 새로운 트랜잭션 생성 / 커밋하는 과정이 존재