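In this post I try several ways to speed up the batch job built in the previous exercise and measure the performance of each one.
The measurement requirements are as follows.
● SaveUserTasklet saves 30,000 User records; the chunk size is 1,000
● The Step targeted for optimization is userLevelUpStep
● Run the variants in the order of the table below
- Build each example, measure its performance, and compare
- Run each one 3 times
- Results vary with the PC environment (my own PC was slower than the one used in the lecture 👎)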
Step type | Run 1 | Run 2 | Run 3 |
Simple Step | 15557 ms | 15081 ms | 14628 ms |
Async Step | 15074 ms | 15511 ms | 14940 ms |
Multi-Thread Step (fastest) | 9038 ms | 8943 ms | 9909 ms |
Partition Step | 9659 ms | 10890 ms | 10629 ms |
Async + Partition Step | 10795 ms | 10919 ms | 11186 ms |
Parallel Step | 14236 ms | 14433 ms | 14377 ms |
Partition + Parallel Step | 10512 ms | 10294 ms | 10252 ms |
* From the 4th run onward I saw no meaningful change in speed (15403 ms, 15024 ms, and so on), so I settled on 3 runs each.
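To make the table easier to compare, the three runs per step type can be averaged and ranked. The following is a minimal sketch in plain Java; the class and method names (`StepTimings`, `averageMillis`, `ranking`) are made up for this illustration, and the numbers are copied from the table above.

```java
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

class StepTimings {
    /** Arithmetic mean of the measured run times, in milliseconds. */
    static double averageMillis(long... runs) {
        long sum = 0;
        for (long run : runs) {
            sum += run;
        }
        return (double) sum / runs.length;
    }

    /** The measurements from the table, keyed by step type. */
    static Map<String, long[]> measurements() {
        Map<String, long[]> table = new LinkedHashMap<>();
        table.put("Simple Step", new long[]{15557, 15081, 14628});
        table.put("Async Step", new long[]{15074, 15511, 14940});
        table.put("Multi-Thread Step", new long[]{9038, 8943, 9909});
        table.put("Partition Step", new long[]{9659, 10890, 10629});
        table.put("Async + Partition Step", new long[]{10795, 10919, 11186});
        table.put("Parallel Step", new long[]{14236, 14433, 14377});
        table.put("Partition + Parallel Step", new long[]{10512, 10294, 10252});
        return table;
    }

    /** Step types ordered from fastest to slowest average. */
    static List<String> ranking() {
        Map<String, long[]> table = measurements();
        return table.keySet().stream()
                .sorted(Comparator.comparingDouble(name -> averageMillis(table.get(name))))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        Map<String, long[]> table = measurements();
        for (String name : ranking()) {
            System.out.printf("%-26s %.0f ms%n", name, averageMillis(table.get(name)));
        }
    }
}
```

With only three runs per variant, small gaps between averages (for example Partition Step versus Partition + Parallel Step) are probably within run-to-run noise.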
To apply the Async Step, an async-related library has to be added.
1. spring-batch-integration is required in order to use asynchronous processing based on the Future provided by java.util.concurrent.
2. Add implementation 'org.springframework.batch:spring-batch-integration' to the build.gradle file.
3. Change the ItemProcessor and ItemWriter so that they run asynchronously.
4. The Async Step performs asynchronous processing at the ItemProcessor and ItemWriter level.
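The Future-based idea behind these steps can be sketched with the JDK alone. In Spring Batch, AsyncItemProcessor hands each item to a TaskExecutor and returns a Future, and AsyncItemWriter waits on those Futures before passing the results to its delegate. The class below is not Spring code; `AsyncChunkSketch` and its methods are hypothetical names, and doubling an integer stands in for the real per-item work.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class AsyncChunkSketch {
    /** "Processor" side: submit every item to the pool and return the Futures immediately. */
    static List<Future<Integer>> processAsync(List<Integer> items, ExecutorService pool) {
        List<Future<Integer>> futures = new ArrayList<>();
        for (Integer item : items) {
            futures.add(pool.submit(() -> item * 2)); // stand-in for the real per-item work
        }
        return futures;
    }

    /** "Writer" side: wait for each Future in order and collect the non-null results. */
    static List<Integer> unwrap(List<Future<Integer>> futures) {
        List<Integer> results = new ArrayList<>();
        try {
            for (Future<Integer> future : futures) {
                Integer value = future.get();
                if (value != null) {
                    results.add(value);
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        }
        return results;
    }

    /** Processes one chunk with the given number of threads. */
    static List<Integer> run(List<Integer> items, int threads) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            return unwrap(processAsync(items, pool));
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(run(List.of(1, 2, 3, 4), 2)); // [2, 4, 6, 8]
    }
}
```

Only the per-item processing runs on the pool here; reading and the final write still happen on the calling thread. If the processor is cheap and most of the time goes into writing, this pattern may leave little to gain.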

Implementing the Async Step
@Configuration
@Slf4j
public class AsyncUserConfiguration {
private final String JOB_NAME = "asyncUserJob";
public static final int CHUNK_SIZE = 1000;
private final JobBuilderFactory jobBuilderFactory;
private final StepBuilderFactory stepBuilderFactory;
private final UserRepository userRepository;
private final EntityManagerFactory entityManagerFactory;
private final DataSource dataSource;
private final TaskExecutor taskExecutor;
public AsyncUserConfiguration(JobBuilderFactory jobBuilderFactory, StepBuilderFactory stepBuilderFactory, UserRepository userRepository, EntityManagerFactory entityManagerFactory, DataSource dataSource, TaskExecutor taskExecutor) {
this.jobBuilderFactory = jobBuilderFactory;
this.stepBuilderFactory = stepBuilderFactory;
this.userRepository = userRepository;
this.entityManagerFactory = entityManagerFactory;
this.dataSource = dataSource;
this.taskExecutor = taskExecutor;
}
@Bean(JOB_NAME)
public Job userJob() throws Exception {
return this.jobBuilderFactory.get(JOB_NAME)
.incrementer(new RunIdIncrementer())
.start(this.saveUserStep())
.next(this.userLevelUpStep())
.listener(new LevelUpJobExecutionListener(userRepository))
.next(new JobParameterDecide("date")) // decide whether the result is CONTINUE (matched below)
.on(JobParameterDecide.CONTINUE.getName()) // if it is CONTINUE, run the step given to to() below
.to(this.orderStatisticsStep(null))
.build()
.build();
}
@Bean(JOB_NAME + "_orderStatisticsStep")
@JobScope
public Step orderStatisticsStep(@Value("#{jobParameters[date]}") String date) throws Exception {
return this.stepBuilderFactory.get(JOB_NAME + "_orderStatisticsStep")
.<OrderStatistics, OrderStatistics>chunk(CHUNK_SIZE)
.reader(orderStatisticsItemReader(date))
.writer(orderStatisticsItemWriter(date))
.build();
}
private ItemReader<? extends OrderStatistics> orderStatisticsItemReader(String date) throws Exception {
YearMonth yearMonth = YearMonth.parse(date);
Map<String, Object> parameters = new HashMap<>();
parameters.put("startDate", yearMonth.atDay(1)); // e.g. August 1, 2021
parameters.put("endDate", yearMonth.atEndOfMonth()); // the last day of that month (August)
Map<String, Order> sortKey = new HashMap<>();
sortKey.put("created_date", Order.ASCENDING);
JdbcPagingItemReader<OrderStatistics> itemReader = new JdbcPagingItemReaderBuilder<OrderStatistics>()
.dataSource(this.dataSource)
.rowMapper((resultSet, i) -> OrderStatistics.builder()
.amount(resultSet.getString(1))
.date(LocalDate.parse(resultSet.getString(2), DateTimeFormatter.ISO_DATE))
.build())
.pageSize(CHUNK_SIZE) // paging size
.name(JOB_NAME + "_orderStatisticsItemReader")
.selectClause("sum(amount), created_date")
.fromClause("orders")
.whereClause("created_date >= :startDate and created_date <= :endDate")
.groupClause("created_date")
.parameterValues(parameters)
.sortKeys(sortKey)
.build();
itemReader.afterPropertiesSet();
return itemReader;
}
private ItemWriter<? super OrderStatistics> orderStatisticsItemWriter(String date) throws Exception {
YearMonth yearMonth = YearMonth.parse(date);
String fileName = yearMonth.getYear() + "년_" + yearMonth.getMonthValue() + "월_일별_주문_금액.csv";
BeanWrapperFieldExtractor<OrderStatistics> fieldExtractor = new BeanWrapperFieldExtractor<>();
fieldExtractor.setNames(new String[]{"amount", "date"});
DelimitedLineAggregator<OrderStatistics> lineAggregator = new DelimitedLineAggregator<>();
lineAggregator.setDelimiter(",");
lineAggregator.setFieldExtractor(fieldExtractor);
FlatFileItemWriter<OrderStatistics> itemWriter = new FlatFileItemWriterBuilder<OrderStatistics>()
.resource(new FileSystemResource("output/" + fileName))
.lineAggregator(lineAggregator)
.name(JOB_NAME + "_orderStatisticsItemWriter")
.encoding("UTF-8")
.headerCallback(writer -> writer.write("total_amount,date"))
.build();
itemWriter.afterPropertiesSet();
return itemWriter;
}
@Bean(JOB_NAME + "_saveUserStep")
public Step saveUserStep() {
return this.stepBuilderFactory.get(JOB_NAME + "_saveUserStep")
.tasklet(new SaveUserTasklet(userRepository))
.build();
}
@Bean(JOB_NAME + "_userLevelUpStep")
public Step userLevelUpStep() throws Exception {
return this.stepBuilderFactory.get(JOB_NAME + "_userLevelUpStep")
.<User, Future<User>>chunk(CHUNK_SIZE) // the output type must be wrapped in Future, otherwise this does not compile
.reader(itemReader())
.processor(itemProcessor())
.writer(itemWriter())
.build();
}
private AsyncItemWriter<User> itemWriter() {
ItemWriter<User> itemWriter = users -> users.forEach(x -> {
x.levelUp();
userRepository.save(x);
});
AsyncItemWriter<User> asyncItemWriter = new AsyncItemWriter<>();
asyncItemWriter.setDelegate(itemWriter);
return asyncItemWriter;
}
private AsyncItemProcessor<User, User> itemProcessor() {
ItemProcessor<User, User> itemProcessor = user -> {
if (user.availableLeveUp()) {
return user;
}
return null;
};
AsyncItemProcessor<User, User> asyncItemProcessor = new AsyncItemProcessor<>();
asyncItemProcessor.setDelegate(itemProcessor);
asyncItemProcessor.setTaskExecutor(this.taskExecutor);
return asyncItemProcessor;
}
private ItemReader<? extends User> itemReader() throws Exception {
JpaPagingItemReader<User> itemReader = new JpaPagingItemReaderBuilder<User>()
.queryString("select u from User u")
.entityManagerFactory(entityManagerFactory)
.pageSize(CHUNK_SIZE)
.name(JOB_NAME + "_userItemReader")
.build();
itemReader.afterPropertiesSet();
return itemReader;
}
}
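※ I had expected the Async Step to be somewhat faster than the Simple Step, but there was no real difference.. 😢
Next, let's apply the Multi-Thread Step, which turned out to be the fastest.
● The Multi-Thread Step processes chunks in parallel across multiple threads
● A thread-safe ItemReader is required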
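The two points above can be illustrated without Spring: several worker threads pull chunks from one shared reader, so the reader has to be safe for concurrent use. This is a simplified sketch with hypothetical names (`MultiThreadChunkSketch`, `SharedReader`); it hands out a whole chunk per call under a lock, which is a simplification of how a real step fills its chunks.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class MultiThreadChunkSketch {
    /** A reader shared by all worker threads; synchronized so no item is handed out twice. */
    static final class SharedReader {
        private final Iterator<Integer> source;

        SharedReader(List<Integer> items) {
            this.source = items.iterator();
        }

        synchronized List<Integer> readChunk(int chunkSize) {
            List<Integer> chunk = new ArrayList<>(chunkSize);
            while (chunk.size() < chunkSize && source.hasNext()) {
                chunk.add(source.next());
            }
            return chunk;
        }
    }

    /** Processes all items with the given number of threads; returns how many items were handled. */
    static int process(int itemCount, int chunkSize, int threads) {
        List<Integer> items = IntStream.rangeClosed(1, itemCount).boxed().collect(Collectors.toList());
        SharedReader reader = new SharedReader(items);
        AtomicInteger written = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            pool.execute(() -> {
                List<Integer> chunk;
                while (!(chunk = reader.readChunk(chunkSize)).isEmpty()) {
                    written.addAndGet(chunk.size()); // stand-in for processing and writing one chunk
                }
            });
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(1, TimeUnit.MINUTES)) {
                throw new IllegalStateException("workers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return written.get();
    }

    public static void main(String[] args) {
        System.out.println(process(30_000, 1_000, 8)); // 30000
    }
}
```

Without the `synchronized` keyword on `readChunk`, two threads could advance the same iterator at once and items could be skipped or duplicated, which is the failure a thread-safe ItemReader prevents.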

SpringBatchExampleApplication.java (automatic exit of the main method and TaskExecutor-based thread settings)
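import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Primary;
import org.springframework.core.task.TaskExecutor;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

@SpringBootApplication
@EnableBatchProcessing
public class SpringBatchExampleApplication {
    public static void main(String[] args) {
        // Wrap the run in System.exit(SpringApplication.exit(...)) so the Spring Batch application terminates cleanly
        System.exit(SpringApplication.exit(SpringApplication.run(SpringBatchExampleApplication.class, args)));
    }

    /**
     * A TaskExecutor bean already exists by default, so @Primary marks this one as the bean to inject by default.
     */
    @Bean
    @Primary
    TaskExecutor taskExecutor() {
        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
        taskExecutor.setCorePoolSize(10); // core pool size: 10 threads
        taskExecutor.setMaxPoolSize(20); // maximum pool size: 20 threads
        taskExecutor.setThreadNamePrefix("batch-thread-"); // thread names in the logs start with this prefix
        taskExecutor.initialize();
        return taskExecutor;
    }
}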
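One detail worth checking with these settings: ThreadPoolTaskExecutor wraps the JDK's ThreadPoolExecutor, and its queue capacity defaults to Integer.MAX_VALUE unless setQueueCapacity is called. With an effectively unbounded queue, the JDK pool only grows past the core size when the queue is full, so the maximum of 20 would likely never be reached. The sketch below shows that behavior with the JDK class directly; `PoolGrowthSketch` is a hypothetical name and the task counts are arbitrary.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class PoolGrowthSketch {
    /** Submits blocking tasks and reports the largest number of threads the pool ever created. */
    static int largestPoolSize(int core, int max, int queueCapacity, int tasks) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                core, max, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>(queueCapacity));
        CountDownLatch release = new CountDownLatch(1);
        try {
            for (int i = 0; i < tasks; i++) {
                pool.execute(() -> {
                    try {
                        release.await(); // keep the thread busy until every task has been submitted
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            return pool.getLargestPoolSize();
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        // Unbounded queue: extra tasks wait in the queue, so the pool stays at the core size.
        System.out.println(largestPoolSize(10, 20, Integer.MAX_VALUE, 100)); // 10
        // Small bounded queue: once the queue is full, the pool grows toward the maximum size.
        System.out.println(largestPoolSize(10, 20, 5, 25)); // 20
    }
}
```

If more than 10 concurrent threads are wanted, raising the core pool size or setting a small queue capacity on the executor are the usual options.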
User.java change (switch to FetchType.EAGER)

Implementing the Multi-Thread Step
@Configuration
@Slf4j
public class MultiThreadUserConfiguration {
private final String JOB_NAME = "multiThreadUserJob";
public static final int CHUNK_SIZE = 1000;
private final JobBuilderFactory jobBuilderFactory;
private final StepBuilderFactory stepBuilderFactory;
private final UserRepository userRepository;
private final EntityManagerFactory entityManagerFactory;
private final DataSource dataSource;
private final TaskExecutor taskExecutor;
public MultiThreadUserConfiguration(JobBuilderFactory jobBuilderFactory, StepBuilderFactory stepBuilderFactory, UserRepository userRepository, EntityManagerFactory entityManagerFactory, DataSource dataSource, TaskExecutor taskExecutor) {
this.jobBuilderFactory = jobBuilderFactory;
this.stepBuilderFactory = stepBuilderFactory;
this.userRepository = userRepository;
this.entityManagerFactory = entityManagerFactory;
this.dataSource = dataSource;
this.taskExecutor = taskExecutor;
}
@Bean(JOB_NAME)
public Job userJob() throws Exception {
return this.jobBuilderFactory.get(JOB_NAME)
.incrementer(new RunIdIncrementer())
.start(this.saveUserStep())
.next(this.userLevelUpStep())
.listener(new LevelUpJobExecutionListener(userRepository))
.next(new JobParameterDecide("date")) // decide whether the result is CONTINUE (matched below)
.on(JobParameterDecide.CONTINUE.getName()) // if it is CONTINUE, run the step given to to() below
.to(this.orderStatisticsStep(null))
.build()
.build();
}
@Bean(JOB_NAME + "_orderStatisticsStep")
@JobScope
public Step orderStatisticsStep(@Value("#{jobParameters[date]}") String date) throws Exception {
return this.stepBuilderFactory.get(JOB_NAME + "_orderStatisticsStep")
.<OrderStatistics, OrderStatistics>chunk(CHUNK_SIZE)
.reader(orderStatisticsItemReader(date))
.writer(orderStatisticsItemWriter(date))
.build();
}
private ItemReader<? extends OrderStatistics> orderStatisticsItemReader(String date) throws Exception {
YearMonth yearMonth = YearMonth.parse(date);
Map<String, Object> parameters = new HashMap<>();
parameters.put("startDate", yearMonth.atDay(1)); // e.g. August 1, 2021
parameters.put("endDate", yearMonth.atEndOfMonth()); // the last day of that month (August)
Map<String, Order> sortKey = new HashMap<>();
sortKey.put("created_date", Order.ASCENDING);
JdbcPagingItemReader<OrderStatistics> itemReader = new JdbcPagingItemReaderBuilder<OrderStatistics>()
.dataSource(this.dataSource)
.rowMapper((resultSet, i) -> OrderStatistics.builder()
.amount(resultSet.getString(1))
.date(LocalDate.parse(resultSet.getString(2), DateTimeFormatter.ISO_DATE))
.build())
.pageSize(CHUNK_SIZE) // paging size
.name(JOB_NAME + "_orderStatisticsItemReader")
.selectClause("sum(amount), created_date")
.fromClause("orders")
.whereClause("created_date >= :startDate and created_date <= :endDate")
.groupClause("created_date")
.parameterValues(parameters)
.sortKeys(sortKey)
.build();
itemReader.afterPropertiesSet();
return itemReader;
}
private ItemWriter<? super OrderStatistics> orderStatisticsItemWriter(String date) throws Exception {
YearMonth yearMonth = YearMonth.parse(date);
String fileName = yearMonth.getYear() + "년_" + yearMonth.getMonthValue() + "월_일별_주문_금액.csv";
BeanWrapperFieldExtractor<OrderStatistics> fieldExtractor = new BeanWrapperFieldExtractor<>();
fieldExtractor.setNames(new String[]{"amount", "date"});
DelimitedLineAggregator<OrderStatistics> lineAggregator = new DelimitedLineAggregator<>();
lineAggregator.setDelimiter(",");
lineAggregator.setFieldExtractor(fieldExtractor);
FlatFileItemWriter<OrderStatistics> itemWriter = new FlatFileItemWriterBuilder<OrderStatistics>()
.resource(new FileSystemResource("output/" + fileName))
.lineAggregator(lineAggregator)
.name(JOB_NAME + "_orderStatisticsItemWriter")
.encoding("UTF-8")
.headerCallback(writer -> writer.write("total_amount,date"))
.build();
itemWriter.afterPropertiesSet();
return itemWriter;
}
@Bean(JOB_NAME + "_saveUserStep")
public Step saveUserStep() {
return this.stepBuilderFactory.get(JOB_NAME + "_saveUserStep")
.tasklet(new SaveUserTasklet(userRepository))
.build();
}
@Bean(JOB_NAME + "_userLevelUpStep")
public Step userLevelUpStep() throws Exception {
return this.stepBuilderFactory.get(JOB_NAME + "_userLevelUpStep")
.<User, User>chunk(CHUNK_SIZE)
.reader(itemReader())
.processor(itemProcessor())
.writer(itemWriter())
.taskExecutor(this.taskExecutor) // use the taskExecutor injected through the constructor
.throttleLimit(8) // how many threads process the step concurrently (default is 4)
.build();
}
private ItemWriter<? super User> itemWriter() {
return users -> users.forEach(x -> {
x.levelUp();
userRepository.save(x);
});
}
private ItemProcessor<? super User, ? extends User> itemProcessor() {
return user -> {
if (user.availableLeveUp()) {
return user;
}
return null;
};
}
private ItemReader<? extends User> itemReader() throws Exception {
JpaPagingItemReader<User> itemReader = new JpaPagingItemReaderBuilder<User>()
.queryString("select u from User u")
.entityManagerFactory(entityManagerFactory)
.pageSize(CHUNK_SIZE)
.name(JOB_NAME + "_userItemReader")
.build();
itemReader.afterPropertiesSet();
return itemReader;
}
}
How about the Partition Step?
● A single Master creates multiple Slave Steps, and multi-threading is applied at the Step level
● For example,
- with 40,000 items and 8 Slave Steps,
- 40000 / 8 = 5000, so each Slave Step handles 5,000 items
● Each Slave Step runs as a Step of its own
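The arithmetic in the example above can be written out as plain Java, without any Spring types. This is only a sketch: `PartitionRangeSketch` and `split` are hypothetical names, and it assumes the ids are contiguous from the minimum to the maximum. The UserLevelUpPartitioner later in this post applies the same formula and stores the bounds in an ExecutionContext.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class PartitionRangeSketch {
    /** Splits the inclusive id range [minId, maxId] into at most gridSize contiguous ranges. */
    static Map<String, long[]> split(long minId, long maxId, int gridSize) {
        long targetSize = (maxId - minId) / gridSize + 1; // number of ids per partition
        Map<String, long[]> ranges = new LinkedHashMap<>();
        int number = 0;
        for (long start = minId; start <= maxId; start += targetSize) {
            long end = Math.min(start + targetSize - 1, maxId); // the last range is capped at maxId
            ranges.put("partition" + number, new long[]{start, end});
            number++;
        }
        return ranges;
    }

    public static void main(String[] args) {
        // 40,000 ids over 8 partitions: 1 - 5000, 5001 - 10000, ..., 35001 - 40000
        split(1, 40_000, 8).forEach((name, range) ->
                System.out.println(name + ": " + range[0] + " - " + range[1]));
    }
}
```

For the 30,000 users in this measurement and a grid size of 8, the same formula gives (30000 - 1) / 8 + 1 = 3750 ids per partition.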

Adding the Partitioner
@Configuration
@Slf4j
public class PartitionUserConfiguration {
private final String JOB_NAME = "partitionUserJob";
public static final int CHUNK_SIZE = 1000;
private final JobBuilderFactory jobBuilderFactory;
private final StepBuilderFactory stepBuilderFactory;
private final UserRepository userRepository;
private final EntityManagerFactory entityManagerFactory;
private final DataSource dataSource;
private final TaskExecutor taskExecutor;
public PartitionUserConfiguration(JobBuilderFactory jobBuilderFactory, StepBuilderFactory stepBuilderFactory, UserRepository userRepository, EntityManagerFactory entityManagerFactory, DataSource dataSource, TaskExecutor taskExecutor) {
this.jobBuilderFactory = jobBuilderFactory;
this.stepBuilderFactory = stepBuilderFactory;
this.userRepository = userRepository;
this.entityManagerFactory = entityManagerFactory;
this.dataSource = dataSource;
this.taskExecutor = taskExecutor;
}
@Bean(JOB_NAME)
public Job userJob() throws Exception {
return this.jobBuilderFactory.get(JOB_NAME)
.incrementer(new RunIdIncrementer())
.start(this.saveUserStep())
.next(this.userLevelUpManagerStep())
.listener(new LevelUpJobExecutionListener(userRepository))
.next(new JobParameterDecide("date")) // decide whether the result is CONTINUE (matched below)
.on(JobParameterDecide.CONTINUE.getName()) // if it is CONTINUE, run the step given to to() below
.to(this.orderStatisticsStep(null))
.build()
.build();
}
@Bean(JOB_NAME + "_orderStatisticsStep")
@JobScope
public Step orderStatisticsStep(@Value("#{jobParameters[date]}") String date) throws Exception {
return this.stepBuilderFactory.get(JOB_NAME + "_orderStatisticsStep")
.<OrderStatistics, OrderStatistics>chunk(CHUNK_SIZE)
.reader(orderStatisticsItemReader(date))
.writer(orderStatisticsItemWriter(date))
.build();
}
private ItemReader<? extends OrderStatistics> orderStatisticsItemReader(String date) throws Exception {
YearMonth yearMonth = YearMonth.parse(date);
Map<String, Object> parameters = new HashMap<>();
parameters.put("startDate", yearMonth.atDay(1)); // e.g. August 1, 2021
parameters.put("endDate", yearMonth.atEndOfMonth()); // the last day of that month (August)
Map<String, Order> sortKey = new HashMap<>();
sortKey.put("created_date", Order.ASCENDING);
JdbcPagingItemReader<OrderStatistics> itemReader = new JdbcPagingItemReaderBuilder<OrderStatistics>()
.dataSource(this.dataSource)
.rowMapper((resultSet, i) -> OrderStatistics.builder()
.amount(resultSet.getString(1))
.date(LocalDate.parse(resultSet.getString(2), DateTimeFormatter.ISO_DATE))
.build())
.pageSize(CHUNK_SIZE) // paging size
.name(JOB_NAME + "_orderStatisticsItemReader")
.selectClause("sum(amount), created_date")
.fromClause("orders")
.whereClause("created_date >= :startDate and created_date <= :endDate")
.groupClause("created_date")
.parameterValues(parameters)
.sortKeys(sortKey)
.build();
itemReader.afterPropertiesSet();
return itemReader;
}
private ItemWriter<? super OrderStatistics> orderStatisticsItemWriter(String date) throws Exception {
YearMonth yearMonth = YearMonth.parse(date);
String fileName = yearMonth.getYear() + "년_" + yearMonth.getMonthValue() + "월_일별_주문_금액.csv";
BeanWrapperFieldExtractor<OrderStatistics> fieldExtractor = new BeanWrapperFieldExtractor<>();
fieldExtractor.setNames(new String[]{"amount", "date"});
DelimitedLineAggregator<OrderStatistics> lineAggregator = new DelimitedLineAggregator<>();
lineAggregator.setDelimiter(",");
lineAggregator.setFieldExtractor(fieldExtractor);
FlatFileItemWriter<OrderStatistics> itemWriter = new FlatFileItemWriterBuilder<OrderStatistics>()
.resource(new FileSystemResource("output/" + fileName))
.lineAggregator(lineAggregator)
.name(JOB_NAME + "_orderStatisticsItemWriter")
.encoding("UTF-8")
.headerCallback(writer -> writer.write("total_amount,date"))
.build();
itemWriter.afterPropertiesSet();
return itemWriter;
}
@Bean(JOB_NAME + "_saveUserStep")
public Step saveUserStep() {
return this.stepBuilderFactory.get(JOB_NAME + "_saveUserStep")
.tasklet(new SaveUserTasklet(userRepository))
.build();
}
@Bean(JOB_NAME + "_userLevelUpStep")
public Step userLevelUpStep() throws Exception {
return this.stepBuilderFactory.get(JOB_NAME + "_userLevelUpStep")
.<User, User>chunk(CHUNK_SIZE)
.reader(itemReader(null, null))
.processor(itemProcessor())
.writer(itemWriter())
.build();
}
@Bean(JOB_NAME + "_userLevelUpStep.manager")
public Step userLevelUpManagerStep() throws Exception {
return this.stepBuilderFactory.get(JOB_NAME + "_userLevelUpStep.manager")
.partitioner(JOB_NAME + "_userLevelUpStep", new UserLevelUpPartitioner(userRepository))
.step(userLevelUpStep())
.partitionHandler(taskExecutorPartitionHandler())
.build();
}
@Bean(JOB_NAME + "taskExecutorPartitionHandler")
PartitionHandler taskExecutorPartitionHandler() throws Exception {
TaskExecutorPartitionHandler handler = new TaskExecutorPartitionHandler();
handler.setStep(userLevelUpStep());
handler.setTaskExecutor(this.taskExecutor);
handler.setGridSize(8);
return handler;
}
private ItemWriter<? super User> itemWriter() {
return users -> users.forEach(x -> {
x.levelUp();
userRepository.save(x);
});
}
private ItemProcessor<? super User, ? extends User> itemProcessor() {
return user -> {
if (user.availableLeveUp()) {
return user;
}
return null;
};
}
@Bean
@StepScope // a @StepScope bean is created as a proxy, so the concrete class being returned must be declared explicitly
JpaPagingItemReader<? extends User> itemReader(@Value("#{stepExecutionContext[minId]}") Long minId,
@Value("#{stepExecutionContext[maxId]}") Long maxId) throws Exception {
Map<String, Object> parameters = new HashMap<>();
parameters.put("minId", minId);
parameters.put("maxId", maxId);
JpaPagingItemReader<User> itemReader = new JpaPagingItemReaderBuilder<User>()
.queryString("select u from User u where u.id between :minId and :maxId")
.parameterValues(parameters)
.entityManagerFactory(entityManagerFactory)
.pageSize(CHUNK_SIZE)
.name(JOB_NAME + "_userItemReader")
.build();
itemReader.afterPropertiesSet();
return itemReader;
}
}
To compute the lower and upper id bound of each partition, create a UserLevelUpPartitioner that implements Partitioner.
import java.util.HashMap;
import java.util.Map;

import org.springframework.batch.core.partition.support.Partitioner;
import org.springframework.batch.item.ExecutionContext;

public class UserLevelUpPartitioner implements Partitioner {
    private final UserRepository userRepository;

    public UserLevelUpPartitioner(UserRepository userRepository) {
        this.userRepository = userRepository;
    }

    @Override
    public Map<String, ExecutionContext> partition(int gridSize) {
        long minId = userRepository.findMinId(); // smallest id (e.g. 1)
        long maxId = userRepository.findMaxId(); // largest id (e.g. 40,000)
        // Example: (40000 - 1) / 8 + 1 = 5000
        long targetSize = (maxId - minId) / gridSize + 1;
        /*
         * The bounds are stored in one ExecutionContext per partition:
         * partition0 : 1 - 5,000
         * partition1 : 5,001 - 10,000
         * ...
         * partition7 : 35,001 - 40,000
         */
        Map<String, ExecutionContext> result = new HashMap<>();
        long number = 0;
        long start = minId;
        long end = start + targetSize - 1;
        while (start <= maxId) {
            ExecutionContext value = new ExecutionContext();
            result.put("partition" + number, value);
            if (end >= maxId) {
                end = maxId;
            }
            value.putLong("minId", start);
            value.putLong("maxId", end);
            start += targetSize;
            end += targetSize;
            number++;
        }
        return result;
    }
}

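It was faster than the Async Step but slower than the Multi-Thread Step.
Then how about the Async + Partition Step, which combines partitioning with asynchronous processing?
It is a partial modification of the existing Partition Step.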

The Async + Partition Step turned out slower than the Partition Step... 🙄
Next, let's measure the performance of the Parallel Step.


@Configuration
@Slf4j
public class ParallelUserConfiguration {
private final String JOB_NAME = "parallelUserJob";
public static final int CHUNK_SIZE = 1000;
private final JobBuilderFactory jobBuilderFactory;
private final StepBuilderFactory stepBuilderFactory;
private final UserRepository userRepository;
private final EntityManagerFactory entityManagerFactory;
private final DataSource dataSource;
private final TaskExecutor taskExecutor;
public ParallelUserConfiguration(JobBuilderFactory jobBuilderFactory, StepBuilderFactory stepBuilderFactory, UserRepository userRepository, EntityManagerFactory entityManagerFactory, DataSource dataSource, TaskExecutor taskExecutor) {
this.jobBuilderFactory = jobBuilderFactory;
this.stepBuilderFactory = stepBuilderFactory;
this.userRepository = userRepository;
this.entityManagerFactory = entityManagerFactory;
this.dataSource = dataSource;
this.taskExecutor = taskExecutor;
}
@Bean(JOB_NAME)
public Job userJob() throws Exception {
return this.jobBuilderFactory.get(JOB_NAME)
.incrementer(new RunIdIncrementer())
.listener(new LevelUpJobExecutionListener(userRepository))
.start(this.saveUserFlow())
.next(this.splitFlow(null))
.build()
.build();
}
@Bean(JOB_NAME + "_saveUserFlow")
public Flow saveUserFlow() {
TaskletStep saveUserStep = this.stepBuilderFactory.get(JOB_NAME + "_saveUserStep")
.tasklet(new SaveUserTasklet(userRepository))
.build();
return new FlowBuilder<SimpleFlow>(JOB_NAME + "_saveUserFlow")
.start(saveUserStep)
.build();
}
@Bean(JOB_NAME + "_splitFlow")
@JobScope
public Flow splitFlow(@Value("#{jobParameters[date]}") String date) throws Exception {
Flow userLevelUpFlow = new FlowBuilder<SimpleFlow>(JOB_NAME + "_userLevelUpFlow")
.start(userLevelUpStep())
.build();
// Because orderStatisticsFlow is used here, the step below does not need to be registered as a Bean.
// To combine the two steps, splitFlow runs userLevelUpFlow and orderStatisticsFlow in parallel.
return new FlowBuilder<SimpleFlow>(JOB_NAME + "_splitFlow")
.split(this.taskExecutor)
.add(userLevelUpFlow, orderStatisticsFlow(date))
.build();
}
/**
* @param date
* @return
*
* Splits the step out of userJob and implements it as a Flow.
*/
private Flow orderStatisticsFlow(String date) throws Exception {
return new FlowBuilder<SimpleFlow>(JOB_NAME + "_orderStatisticsFlow")
.start(new JobParameterDecide("date")) // decide whether the result is CONTINUE (matched below)
.on(JobParameterDecide.CONTINUE.getName()) // if it is CONTINUE, run the step given to to() below
.to(this.orderStatisticsStep(date))
.build();
}
public Step orderStatisticsStep(@Value("#{jobParameters[date]}") String date) throws Exception {
return this.stepBuilderFactory.get(JOB_NAME + "_orderStatisticsStep")
.<OrderStatistics, OrderStatistics>chunk(CHUNK_SIZE)
.reader(orderStatisticsItemReader(date))
.writer(orderStatisticsItemWriter(date))
.build();
}
private ItemReader<? extends OrderStatistics> orderStatisticsItemReader(String date) throws Exception {
YearMonth yearMonth = YearMonth.parse(date);
Map<String, Object> parameters = new HashMap<>();
parameters.put("startDate", yearMonth.atDay(1)); // e.g. August 1, 2021
parameters.put("endDate", yearMonth.atEndOfMonth()); // the last day of that month (August)
Map<String, Order> sortKey = new HashMap<>();
sortKey.put("created_date", Order.ASCENDING);
JdbcPagingItemReader<OrderStatistics> itemReader = new JdbcPagingItemReaderBuilder<OrderStatistics>()
.dataSource(this.dataSource)
.rowMapper((resultSet, i) -> OrderStatistics.builder()
.amount(resultSet.getString(1))
.date(LocalDate.parse(resultSet.getString(2), DateTimeFormatter.ISO_DATE))
.build())
.pageSize(CHUNK_SIZE) // paging size
.name(JOB_NAME + "_orderStatisticsItemReader")
.selectClause("sum(amount), created_date")
.fromClause("orders")
.whereClause("created_date >= :startDate and created_date <= :endDate")
.groupClause("created_date")
.parameterValues(parameters)
.sortKeys(sortKey)
.build();
itemReader.afterPropertiesSet();
return itemReader;
}
private ItemWriter<? super OrderStatistics> orderStatisticsItemWriter(String date) throws Exception {
YearMonth yearMonth = YearMonth.parse(date);
String fileName = yearMonth.getYear() + "년_" + yearMonth.getMonthValue() + "월_일별_주문_금액.csv";
BeanWrapperFieldExtractor<OrderStatistics> fieldExtractor = new BeanWrapperFieldExtractor<>();
fieldExtractor.setNames(new String[]{"amount", "date"});
DelimitedLineAggregator<OrderStatistics> lineAggregator = new DelimitedLineAggregator<>();
lineAggregator.setDelimiter(",");
lineAggregator.setFieldExtractor(fieldExtractor);
FlatFileItemWriter<OrderStatistics> itemWriter = new FlatFileItemWriterBuilder<OrderStatistics>()
.resource(new FileSystemResource("output/" + fileName))
.lineAggregator(lineAggregator)
.name(JOB_NAME + "_orderStatisticsItemWriter")
.encoding("UTF-8")
.headerCallback(writer -> writer.write("total_amount,date"))
.build();
itemWriter.afterPropertiesSet();
return itemWriter;
}
@Bean(JOB_NAME + "_userLevelUpStep")
public Step userLevelUpStep() throws Exception {
return this.stepBuilderFactory.get(JOB_NAME + "_userLevelUpStep")
.<User, User>chunk(CHUNK_SIZE)
.reader(itemReader())
.processor(itemProcessor())
.writer(itemWriter())
.build();
}
private ItemWriter<? super User> itemWriter() {
return users -> users.forEach(x -> {
x.levelUp();
userRepository.save(x);
});
}
private ItemProcessor<? super User, ? extends User> itemProcessor() {
return user -> {
if (user.availableLeveUp()) {
return user;
}
return null;
};
}
private ItemReader<? extends User> itemReader() throws Exception {
JpaPagingItemReader<User> itemReader = new JpaPagingItemReaderBuilder<User>()
.queryString("select u from User u")
.entityManagerFactory(entityManagerFactory)
.pageSize(CHUNK_SIZE)
.name(JOB_NAME + "_userItemReader")
.build();
itemReader.afterPropertiesSet();
return itemReader;
}
}
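The Parallel Step (about 14.3 seconds per run) was markedly slower than the Multi-Thread and Partition variants, and only slightly faster than the Simple Step.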
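One plausible reading of this result: a split runs whole flows side by side, so the elapsed time is bounded by the slowest flow rather than by the sum of all flows. If userLevelUpFlow takes most of the time, running orderStatisticsFlow next to it can save at most the duration of the shorter flow. The sketch below illustrates this with the JDK only; `SplitFlowSketch` is a hypothetical name, and the durations in `main` are invented for the arithmetic, not measured.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;
import java.util.stream.Collectors;

class SplitFlowSketch {
    /** Runs every flow concurrently on its own thread and returns the results in declaration order. */
    static List<String> runInParallel(List<Supplier<String>> flows) {
        ExecutorService pool = Executors.newFixedThreadPool(flows.size());
        try {
            List<CompletableFuture<String>> started = flows.stream()
                    .map(flow -> CompletableFuture.supplyAsync(flow, pool))
                    .collect(Collectors.toList());
            return started.stream().map(CompletableFuture::join).collect(Collectors.toList());
        } finally {
            pool.shutdown();
        }
    }

    /** Steps that run one after another cost the sum of their durations. */
    static long sequentialMillis(long... flowMillis) {
        long sum = 0;
        for (long millis : flowMillis) {
            sum += millis;
        }
        return sum;
    }

    /** With a split, the job waits for the slowest flow. */
    static long splitMillis(long... flowMillis) {
        long max = 0;
        for (long millis : flowMillis) {
            max = Math.max(max, millis);
        }
        return max;
    }

    public static void main(String[] args) {
        List<Supplier<String>> flows = List.of(
                () -> "userLevelUpFlow done",
                () -> "orderStatisticsFlow done");
        System.out.println(runInParallel(flows));

        // Hypothetical durations, chosen only to illustrate the arithmetic.
        System.out.println(sequentialMillis(14_000, 1_000)); // 15000
        System.out.println(splitMillis(14_000, 1_000));      // 14000
    }
}
```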
Finally, let's test the Partition + Parallel Step.
Partial modifications to the Parallel Step (async improvement, returning JpaPagingItemReader)

If you skip the change above and declare the return type only as ItemReader, you run into the error below.

Changing the return type from ItemReader to JpaPagingItemReader makes the problem go away.
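A likely explanation is that a scoped proxy exposes only what the declared return type promises. If the bean method is declared as returning ItemReader, the proxy may not look like an ItemStream to the framework, so the reader is never opened before it is read. The sketch below imitates that effect with a JDK dynamic proxy. It is not Spring's actual proxy mechanism, and `SimpleReader`, `SimpleStream`, and `PagingReader` are hypothetical stand-ins for ItemReader, ItemStream, and JpaPagingItemReader.

```java
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Proxy;

class DeclaredTypeProxySketch {
    interface SimpleReader { String read(); }   // stand-in for ItemReader
    interface SimpleStream { void open(); }     // stand-in for ItemStream

    /** Concrete reader that must be opened before it can be read. */
    static final class PagingReader implements SimpleReader, SimpleStream {
        private boolean opened;

        public void open() {
            opened = true;
        }

        public String read() {
            if (!opened) {
                throw new IllegalStateException("reader was never opened");
            }
            return "item";
        }
    }

    /** Builds a proxy that exposes only the given interfaces of the target. */
    static Object proxyFor(Object target, Class<?>... exposed) {
        return Proxy.newProxyInstance(
                DeclaredTypeProxySketch.class.getClassLoader(),
                exposed,
                (proxy, method, args) -> {
                    try {
                        return method.invoke(target, args);
                    } catch (InvocationTargetException e) {
                        throw e.getCause(); // rethrow the target's own exception
                    }
                });
    }

    /** Mimics a framework that opens a reader only if the object it sees is a SimpleStream. */
    static String readFirst(Object readerBean) {
        if (readerBean instanceof SimpleStream) {
            ((SimpleStream) readerBean).open();
        }
        try {
            return ((SimpleReader) readerBean).read();
        } catch (IllegalStateException e) {
            return "failed: " + e.getMessage();
        }
    }

    public static void main(String[] args) {
        // Declared as the narrow interface: the lifecycle method is hidden from the framework.
        System.out.println(readFirst(proxyFor(new PagingReader(), SimpleReader.class)));
        // Declared with both interfaces: the framework can open the reader first.
        System.out.println(readFirst(proxyFor(new PagingReader(), SimpleReader.class, SimpleStream.class)));
    }
}
```

The first call prints "failed: reader was never opened" and the second prints "item". Declaring the concrete class as the return type plays the role of the second call in this analogy.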
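The Partition + Parallel Step was much faster than the Parallel Step, and ranked 2nd overall.
Even so, it could not catch up with the Multi-Thread Step.
* Speed can differ from PC to PC and with each set of requirements and environment.
Trying out many approaches was a good experience, because it lets me think through a wider variety of cases.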