본문으로 바로가기

스프링 배치 실습을 하는 과정에서 무중단배포가 가능한 스케줄링 기능이 필요했다.

현업에서도 이미 적용중이고, 스프링 배치 내에서 커버할 수 있는것도 한계점이 있다.

 

예를들어 Job 실행 중에 오류가 발생할 경우 예외처리로 FaultTolerant(내결함성)를 통해 pass를 시킬수 있지만

이것은 어디까지 임시방편으로 처리하는것으로 밖에 안된다.

 

1. ItemReader, ItemProcess, ItemWriter 에서 설정된 Exception이 발생했을 경우 (Skip)

2. ItemProcess, ItemWriter 에서 설정된 Exception이 발생했을 경우 (Retry)

 

따라서 job이 실행될 때 여부와 상관없이 가장 먼저 스케줄링을 통해 제어하는 방식이 더 효율적이라 볼 수 있다.

 

쿼츠에 대해 알아보자.

 

쿼츠(Quartz)

오픈소스 실시간 스케줄링, 서버 간 클러스터 기능

자바 환경의 규모와 상관없이 사용이 가능하고 Job 실행에 유용한 Spring boot 지원과 같이 오래전부터 스프링 연동을 지원하고있다.

 

쿼츠는 아래 3가지 컴포넌트를 제공한다.

 

1. 스케줄러(Scheduler)

스케줄러는 SchedulerFactory 를 통해서 가져올 수 있으며 JobDetails 및 트리거의 저장소 기능을 한다. 

 

예시코드

public JobDetail buildJobDetail(Class job, String name, String group, Map params) {
    JobDataMap jobDataMap = new JobDataMap();
    jobDataMap.putAll(params);

    return newJob(job).withIdentity(name, group)
            .usingJobData(jobDataMap)
            .build();
}

 

2. 잡(Job)

 

3. 트리거(Trigger)

작업 실행 시점을 정의한다.

트리거가 작동되어 쿼츠에게 잡을 실행하도록 지시하면, 잡의 개별 실행을 정의하는 JobDetails 객체가 생성된다.

 

예시코드

public Trigger buildJobTrigger(String scheduleExp) {
    return TriggerBuilder.newTrigger()
            .withSchedule(CronScheduleBuilder.cronSchedule(scheduleExp)).build();
}

 

스프링 배치 작업

 

* 해당 실습은 스케줄러 시간에 따라 파일을 읽어들이고, api 통신하는 시간을 각각 나누어 진행한다.

 

- 파일 읽어들이기 (FileSchJob)

package com.example.springbatch.project.scheduler;

import lombok.SneakyThrows;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.springframework.batch.core.*;
import org.springframework.batch.core.explore.JobExplorer;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.quartz.QuartzJobBean;
import org.springframework.stereotype.Component;

import java.util.Date;
import java.util.List;
import java.util.stream.Collectors;

// QuartzJobBean 을 사용하여 스프링 배치 잡을 기동하는 쿼츠 잡 작성

@Component
public class FileSchJob extends QuartzJobBean {

    @Autowired
    private Job fileJob;

    @Autowired
    private JobLauncher jobLauncher;

    @Autowired
    private JobExplorer jobExplorer; // jobRepository의 read 기능만 가지고 있음

    /**
     * 배치를 실행시키는 구문 : 스케줄링된 이벤트가 발생할때마다 한번씩 호출된다.
     * @param context
     * @throws JobExecutionException
     */
    @SneakyThrows
    @Override
    protected void executeInternal(JobExecutionContext context) throws JobExecutionException {

        String requestDate = (String) context.getJobDetail().getJobDataMap().get("requestDate");

        JobParameters jobParameters = new JobParametersBuilder()
                .addLong("id", new Date().getTime())
                .addString("requestDate", requestDate)
                .toJobParameters();

        // 모든 job의 인스턴스 갯수를 가져올수 있음.
        int jobInstanceCount = jobExplorer.getJobInstanceCount(fileJob.getName());

        // 모든 job의 인스턴스 정보를 가져올수 있음. 0번째 부터 카운트 갯수까지
        List<JobInstance> jobInstances = jobExplorer.getJobInstances(fileJob.getName(), 0, jobInstanceCount);

        if(jobInstances.size() > 0) {
            for (JobInstance jobInstance : jobInstances) {
                // 여러개의 jobExecution을 가져온다.
                List<JobExecution> jobExecutions = jobExplorer.getJobExecutions(jobInstance);
                List<JobExecution> jobExecutionList = jobExecutions.stream()
                        .filter(jobExecution -> jobExecution.getJobParameters().getString("requestDate").equals(requestDate))
                        .collect(Collectors.toList());

                // 해당하는 날짜가 1개 이상인경우 배치를 실행하지 않는다.
                if(jobExecutionList.size() > 0) {
                    throw new JobExecutionException(requestDate + " already exists");
                }
            }

        }

        jobLauncher.run(fileJob, jobParameters);
    }

}

 

- api 통신 (ApiSchJob)

package com.example.springbatch.project.scheduler;

import lombok.SneakyThrows;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.quartz.QuartzJobBean;
import org.springframework.stereotype.Component;

import java.util.Date;

@Component
public class ApiSchJob extends QuartzJobBean {

    @Autowired
    private Job apiJob;

    @Autowired
    private JobLauncher jobLauncher;

    /**
     * 배치를 실행시키는 구문 : 스케줄링된 이벤트가 발생할때마다 한번씩 호출된다.
     * @param context
     * @throws JobExecutionException
     */
    @SneakyThrows
    @Override
    protected void executeInternal(JobExecutionContext context) throws JobExecutionException {

        JobParameters jobParameters = new JobParametersBuilder()
                .addLong("id", new Date().getTime())
                .toJobParameters();

        jobLauncher.run(apiJob, jobParameters);
    }
}

 

* api 스케줄러 작성

package com.example.springbatch.project.scheduler;

import org.quartz.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;

import java.util.HashMap;
import java.util.Map;

import static org.quartz.JobBuilder.newJob;

/**
 * 스케줄러 설정
 */
@Component
public class ApiJobRunner extends JobRunner {

    @Autowired
    private Scheduler scheduler;

    @Override
    protected void doRun(ApplicationArguments args) {

        JobDetail jobDetail = buildJobDetail(ApiSchJob.class, "apiJob", "batch", new HashMap());
        Trigger trigger = buildJobTrigger("0/10 * * * * ?"); // 10초마다 실행

        try{
            scheduler.scheduleJob(jobDetail, trigger);
        } catch (SchedulerException e) {
            e.printStackTrace();
        }
    }

}

* file 스케줄러 작성

package com.example.springbatch.project.scheduler;

import org.quartz.JobDetail;
import org.quartz.Scheduler;
import org.quartz.SchedulerException;
import org.quartz.Trigger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments;
import org.springframework.stereotype.Component;

import java.util.HashMap;

/**
 * 스케줄러 설정
 */
@Component
public class FileJobRunner extends JobRunner {

    @Autowired
    private Scheduler scheduler;

    @Override
    protected void doRun(ApplicationArguments args) {

        String[] sourceArgs = args.getSourceArgs();

        JobDetail jobDetail = buildJobDetail(FileSchJob.class, "fileJob", "batch", new HashMap());
        Trigger trigger = buildJobTrigger("0/30 * * * * ?"); // 30초마다 실행
        jobDetail.getJobDataMap().put("requestDate", sourceArgs[0]);

        try{
            scheduler.scheduleJob(jobDetail, trigger);
        } catch (SchedulerException e) {
            e.printStackTrace();
        }
    }

}

 

* 공통 스케줄러를 통해 상속받아 사용한다.

package com.example.springbatch.project.scheduler;

import org.quartz.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;

import java.util.HashMap;
import java.util.Map;

import static org.quartz.JobBuilder.newJob;

/**
 * 공통 스케줄러 설정
 */
public abstract class JobRunner implements ApplicationRunner {

    @Override
    public void run(ApplicationArguments args) throws Exception {

        doRun(args);
    }

    public Trigger buildJobTrigger(String scheduleExp) {
        return TriggerBuilder.newTrigger()
                .withSchedule(CronScheduleBuilder.cronSchedule(scheduleExp)).build();
    }

    public JobDetail buildJobDetail(Class job, String name, String group, Map params) {
        JobDataMap jobDataMap = new JobDataMap();
        jobDataMap.putAll(params);

        return newJob(job).withIdentity(name, group)
                .usingJobData(jobDataMap)
                .build();
    }

    protected abstract void doRun(ApplicationArguments args);

}

 

 

 

왼쪽사진 file 스케줄러를 통해 파일이 생성되는 결과 

가운데사진 db적재된 결과

오른쪽사진 api통신을 통해 적재된 결과

 

해당 프로젝트는 인프런의 있는 스프링배치 실습을 통해 진행되었다.

 

해당 결과물은 git을 통해 확인 가능합니다.

https://github.com/prodo-developer/spring-batch