본문으로 바로가기

[SPRING] Tasklet 방식과 Chunk 방식 구현

category SPRING/기본 문법 2021. 7. 21. 00:30

배치 처리할있는 방법은 크게 2가지로 나뉩니다.

 

Tasklet로는 단순하게 처리할 수 있는 장점이 있으나, 대 용량을 감당하기엔 부하를 감당할 수 없습니다.

따라서 Chunk를 통해 쪼개서 넣을 필요가 있는데, Tasklet로도 쪼개서 넣을 수 있지만, 가독성에서 

떨어지기 때문에 권장하지 않습니다.

 

현재 사용하고 있는 실무에서도 대용량은 Chunk를 적극 사용하고 있다.

 

그렇다면 Tasklet 방식과 Chunk 방식에 대해 알아보자.

 

Tasklet을 사용한 Task 기반 처리

  • 배치 처리 과정이 비교적 쉬운 경우 쉽게 사용
  • 대량 처리를 하는 경우 더 복잡
  • 하나의 큰 덩어리를 여러 덩어리로 나누어 처리하기 부적합

Chunk를 사용한 chunk(덩어리) 기반 처리

  • ItemReader, ItemProcessor, ItemWriter의 관계 이해 필요
  • 대량 처리를 하는 경우 Tasklet 보다 비교적 쉽게 구현
  • 예를 들면 10,000개의 데이터 중 1,000개씩 10개의 덩어리로 수행

이를 Tasklet으로 처리하면 10,000개를 한번에 처리하거나, 수동으로 1,000개씩 분할

 

Task기반 배치와 Chunk 기반 배치

reader에서 null을 return 할 때 까지 Step은 반복

<INPUT, OUTPUT>chunk(int)

  • reader에서 INPUT 을 return
  • processor에서 INPUT을 받아 processing 후 OUPUT을 return

INPUT, OUTPUT은 같은 타입일있음

  • writer에서 List<OUTPUT>을 받아 write

 

배치를 실행 필요한 값을 parameter를 통해 외부에서 주입

JobParameters는 외부에서 주입된 parameter를 관리하는 객체

parameter를 JobParameters Spring EL(Expression Language)접근

  • String parameter = jobParameters.getString(key, defaultValue);
  • @Value(“#{jobParameters[key]}”)

 

구현된 소스

 

/**
 * public <I, O> SimpleStepBuilder<I, O> chunk(int chunkSize) {
 * 		return new SimpleStepBuilder<I, O>(this).chunk(chunkSize);
 * }
 */

@Configuration
@Slf4j
public class ChunkProcessingConfiguration {

    private final JobBuilderFactory jobBuilderFactory;
    private final StepBuilderFactory stepBuilderFactory;

    public ChunkProcessingConfiguration(JobBuilderFactory jobBuilderFactory, StepBuilderFactory stepBuilderFactory) {
        this.jobBuilderFactory = jobBuilderFactory;
        this.stepBuilderFactory = stepBuilderFactory;
    }

    @Bean
    public Job chunkProcessingJob() {
        return jobBuilderFactory.get("chunkProcessingJob")
                .incrementer(new RunIdIncrementer())
                .start(this.taskBaseStep())
                .next(this.chunkBaseStep(null))
                .build();
    }

    // chuck로 반복하여 100개 만들기
    @Bean
    @JobScope
    public TaskletStep chunkBaseStep(@Value("#{jobParameters[chunkSize]}") String chunkSize) {
        return stepBuilderFactory.get("chunkBaseStep")
                .<String, String>chunk(StringUtils.isNotEmpty(chunkSize) ? Integer.parseInt(chunkSize) : 10)
                .reader(itemReader())           // 인풋타입의 아이템을 하나씩 반환
                .processor(itemProcessor())     // 인풋타입을 받아서 아웃풋타입으로 리턴
                .writer(itemWriter())           // 리스트방식으로 반환 chunkSize 일괄처리
                .build();
    }

    private ItemReader<String> itemReader() {
        return new ListItemReader<>(getItems());
    }

    private ItemProcessor<String, String> itemProcessor() {
        return item -> item + ", Spring Batch";
    }

    private ItemWriter<String> itemWriter() {
        return items -> log.info("chunk item size : {}", items.size());
//        return items -> items.forEach(log::info);
    }

    public Step taskBaseStep() {
        return stepBuilderFactory.get("taskBaseStep")
                .tasklet(this.tasklet(null))
                .build();
    }

    @Bean
    @StepScope
    public Tasklet tasklet(@Value("#{jobParameters[chunkSize]}") String value) {
        List<String> items = getItems();
        return ((contribution, chunkContext) -> {
            StepExecution stepExecution = contribution.getStepExecution();
//            JobParameters jobParameters = stepExecution.getJobParameters();
//            String value = jobParameters.getString("chunkSize", "10");
            int chunkSize = StringUtils.isNotEmpty(value) ? Integer.parseInt(value) : 10;


            int fromIndex = stepExecution.getReadCount();
            int toIndex = fromIndex + chunkSize;

            if(fromIndex >= items.size()) {
                return RepeatStatus.FINISHED;
            }

            List<String> subList = items.subList(fromIndex, toIndex);// 0번부터 9번까지

            log.info("task item size :{}", subList.size());

            stepExecution.setReadCount(toIndex);

            return RepeatStatus.CONTINUABLE;
        });
    }

    // 기존 tasklet()
//    public Tasklet tasklet() {
//        return ((contribution, chunkContext) -> {
//            List<String> items = getItems();
//            log.info("task item size : {}", items.size());
//
//            return RepeatStatus.FINISHED;
//        });
//    }
    
    public List<String> getItems() {
        List<String> items = new ArrayList<>();
        for (int i = 0; i < 100; i++) {
            items.add(i + " Hello");
        }

        return items;
    }
}

 

정리된 자료는 아래 git을 참고 하시면 됩니다.

 

https://github.com/prodo-developer/spring-batch-example