-
spring Batch 5.x (2) - 실습CS/Spring 2024. 9. 23. 00:02
저번 블로그에서는 스프링 배치의 기본 개념과 용어를 알아봤다.
이번에는 실제로 스프링 배치 환경 구축을 해서 실행시켜볼 것이다.
환경 구축
build.gradle 작성
//build.gradle //spring batch 사용을 위한 의존성 implementation 'org.springframework.boot:spring-boot-starter-batch' //jpa 사용을 위한 의존성 implementation 'org.springframework.boot:spring-boot-starter-data-jpa' //rdbs - mariadb 사용을 위한 의존성 implementation 'org.mariadb.jdbc:mariadb-java-client'yml 파일 작성
batch:jdbc:initialize-schema: alwaysBatch Core 계층은 Spring Batch에서 배치 작업을 정의하고 관리한다.
배치 작업의 실행 상태를 추적하고, 재시작 기능을 제공하기 위해 실행 정보를 데이터베이스의 메타 테이블에 저장한다.
위의 yml 파일의 내용은 배치 코어가 배치 작업을 관리하는데 필요한 메타 테이블을 자동으로 생성되게 해주는 설정이다.
하지만,
Spring Boot 3.0.0으로 전환된 이후에 해당 설정에도 불구하고 메타 테이블이 자동으로 생기지 않는 버그가 있다고 한다.
따라서 메타 테이블을 직접 생성해줘야 한다.
External Libraries 에서 spring-batch-core 라이브러리에 있는 아래와 같은 파일을 찾는다.
(사용하는 db 파일을 찾으면 된다.)

해당 파일을 들어가보면,

이런 쿼리문들이 있을 것이다.
이 쿼리문들을 db 클라이언트에서 실행해 직접 테이블을 생성해준다.
아래 이미지는 각 테이블의 역할을 간단히 정리해둔 것으로, 참고하면 된다.

배치 구성하기
배치 작업의 시나리오는 다음과 같다.
작업(1)
1. 모든 회원의 총 결제 금액을 조회한다.
2. 조건에 따라 회원의 등급을 조정한다.
작업(2)
1. 모든 회원의 등급을 조회한다.
2. 등급에 따라 다른 개수의 쿠폰을 발급해준다.
작업(1),(2)를 각각의 step으로 만들 예정인데,
우선 단일 step으로 이루어진 job을 만든 후에, step을 추가해 다중 step으로 이루어진 Job을 만들 것이다.
배치를 구성하기 위한 순서는 다음과 같다 .
Configuration 만들기 -> ItemReader, ItemProcessor, ItemWriter 만들기 -> Step 만들기 -> Job만들기 -> 배치 실행
1. Configuration 만들기
package com.example.springbatchexample.config; import com.example.springbatchexample.entity.Member; import java.util.List; import lombok.RequiredArgsConstructor; import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing; import org.springframework.batch.core.repository.JobRepository; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.transaction.PlatformTransactionManager; @Configuration @EnableBatchProcessing @RequiredArgsConstructor public class MemberBatchConfig { private final JobRepository jobRepository; private final PlatformTransactionManager transactionManager; //Job 생성할 자리 //Step 생성할 자리 }- Configuration 클래스를 만들고 configuration 빈등록을 위해 @Configuration을 달아준다.
- @EnableBatchProcessing 은 Spring Batch의 기본 설정을 활성화 및 Bean들을 자동으로 구성해주는 어노테이션이다. 즉, Job, Step, JobLauncher, JobRepository, JobBuilderFactory, StepBuilderFactory와 같은 Bean들로 인프라를 설정해준다.
2. ItemReader, ItemProcessor, ItemWriter 만들기
ItemReader
//======ItemReader======// @Component @RequiredArgsConstructor public class MemberReader extends JpaPagingItemReader<Member> { //ItemReade의 구현체를 를 상속받아 사용 private final EntityManagerFactory entityManagerFactory; @PostConstruct public void init() { this.setEntityManagerFactory(entityManagerFactory); this.setQueryString("SELECT m FROM Member m"); //모든 회원정보를 가져오는 쿼리문 this.setPageSize(10); //페이징 사이즈를 10으로 설정 } }ItemReader 에는 많은 구현체들이 있다. 그 중 많이 쓰이는 구현체로는 JpaPagingReader와 RepositoryItemReader 등있는데, 그 중 JpaPagingReader를 사용했다.
💡JpaPagingReader란?
JPA를 사용하여 데이터베이스에서 페이징 방식으로 데이터를 읽어오는 ItemReader입니다. 내부적으로 JPA의 EntityManager를 사용하여 데이터를 조회하고, JPA 쿼리를 통해 데이터를 가져옵니다.
JPA의 EntityManager를 직접 사용하여 페이징된 데이터를 읽어옵니다.
Spring Data JPA Repository와 직접 연결되지 않고, JPA 표준을 따르는 데이터 조회 방식입니다.
JpaPagingItemReader는 데이터베이스 커넥션을 계속 유지한 상태로 데이터를 순차적으로 가져오며, 매번 페이징 쿼리를 실행합니다.
장점:
JPA 쿼리를 사용하여 더 복잡한 쿼리 작성이 가능합니다.
엔티티를 페이징 처리하며 JPA 표준 기능을 사용할 수 있습니다.
단점:
더 많은 설정이 필요하며, JPA와 관련된 구성 요소들을 이해해야 합니다.ItemProcessor
//=====ItemProcessor=====// @Component public class MemberLankProcessor implements ItemProcessor<Member, Member> { //ItemProcessor 인터페이스를 상속받아 사용 @Override public Member process(Member member) throws Exception { int totalConsume = member.getTotalConsume(); //회원의 총 결제금액 int lank=0; //결제 금액에 따라 등급 결정 if(totalConsume < 30){ //결제 금액이 30보다 작을 경우 lank = 0; }else if(30 <= totalConsume && totalConsume <60){ //결제 금액이 60보다 작을 경우 lank = 1; }else if(60 <= totalConsume && totalConsume <90){ //결제 금액이 90보다 작을 경우 lank = 2; }else{ //결제 금액이 90 이상일 경우 lank = 3; } //등급을 업데이트 member.setLank(lank); //member 반환 return member; } }- ItemReader가 회원 정보를 읽으면, 내부적으로 회원이 하나씩 Processor에 전달된다.
- ItemProcessor의 <Member, Member>에서 첫번째 Member는 Reader로부터 전달되는 객체의 타입, 두번째 Member는 반환할 객체의 타입을 의미한다.
ItemWriter
@Component @RequiredArgsConstructor public class MemberWriter implements ItemWriter<Member> { //ItemWriter 인터페이스를 상속받아 사용 private final MemberRepository memberRepository; @Override public void write(Chunk<? extends Member> members) throws Exception { memberRepository.saveAll(members); //처리된 Member를 저장 } }3. Step만들기
위에서 만든 ItemReader, Processor, Writer를 이용해 Step을 만들면 된다.
package com.example.springbatchexample.config; import com.example.springbatchexample.entity.Member; import java.util.List; import lombok.RequiredArgsConstructor; import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing; import org.springframework.batch.core.repository.JobRepository; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.transaction.PlatformTransactionManager; import org.springframework.batch.core.Step; import org.springframework.batch.core.step.builder.StepBuilder; @Configuration @EnableBatchProcessing @RequiredArgsConstructor public class MemberBatchConfig { private final JobRepository jobRepository; private final PlatformTransactionManager transactionManager; private final MemberReader reader; private final MemberLankProcessor lankProcessor; private final MemberWriter writer; //Job 생성할 자리 //Step 생성할 자리 @Bean public Step updateMemberLankStep() { return new StepBuilder("updateMemberLankStep", jobRepository) .<Member, Member>chunk(10, transactionManager) .reader(reader) .processor(lankProcessor) .writer(writer) .build(); } }- Step은 StepBuilder를 통해 생성한다. (spring batch 5부터는 StepFactoryBuilder 사용 x)
- Chunk방식으로 Step을 만들었다.
💡Chunk?
대량의 데이터를 일정한 크기(Chunk)로 나누어 읽고, 처리하고, 쓰는 작업을 설정
이 방식은 ItemReader, ItemProcessor, ItemWriter 세 가지 주요 구성 요소를 사용함- chunk단위로 트렌젝션을 관리하기 때문에 chunk메서드의 인자로 transactionManager가 필요하다.
- chunk의 크기는 10으로 설정했다.
💡paging , chunk 차이?
만약 page 10, chunk 5일 경우,
1. 10개의 데이터를 읽어온다.(reader)
2. 5개의 데이터를 처리(processor)한 후에 저장(writer)
3. 아직 10개의 데이터중 처리 안된 데이터가 5개 남았으므로, 2번과 같은 과정 수행
4. 읽어온 10개의 데이터가 모두 처리됐으므로 다시 10개 읽어옴 (읽을 데이터가 없을 때까지 반복)
즉,
paging은 데이터를 읽는 단위(reader에서 수행)
chunk는 데이터를 처리하는 단위(processor에서 수행)3. Job 만들기
package com.example.springbatchexample.config; import com.example.springbatchexample.entity.Member; import java.util.List; import lombok.RequiredArgsConstructor; import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing; import org.springframework.batch.core.repository.JobRepository; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.transaction.PlatformTransactionManager; import org.springframework.batch.core.Step; import org.springframework.batch.core.step.builder.StepBuilder; import org.springframework.batch.core.Job; import org.springframework.batch.core.job.builder.JobBuilder; @Configuration @EnableBatchProcessing @RequiredArgsConstructor public class MemberBatchConfig { private final JobRepository jobRepository; private final PlatformTransactionManager transactionManager; private final MemberReader reader; private final MemberLankProcessor lankProcessor; private final MemberWriter writer; //Job 생성할 자리 @Bean public Job manageMemberLankJob() { return new JobBuilder("manageMemberLankJob", jobRepository) .start(updateMemberCouponStep()) .build(); } //Step 생성할 자리 @Bean public Step updateMemberLankStep() { return new StepBuilder("updateMemberLankStep", jobRepository) .<Member, Member>chunk(10, transactionManager) .reader(reader) .processor(lankProcessor) .writer(writer) .build(); } }Job은 jobBuilder를 통해 만든다.
💡JobBuilder?
Job을 생성하고, jobRepository를 통해 Job을 관리함
객체를 생성할 때 내부에서만 잠시 사용되고, 필요할 때마다 새로 생성하는 빌더 패턴이므로 의존성 주입 x
JobBuilder 의 주요 메서드
start(Step step): Job이 시작할 첫 번째 Step을 지정합니다.
next(Step step): 이전 Step이 성공적으로 완료되면 다음 Step을 실행하도록 지정합니다.
on(String exitStatus): 특정 Step의 종료 상태에 따라 다음 실행 흐름을 결정합니다.
end(): Job의 실행이 완료됨을 정의합니다.
build(): Job 구성이 완료되면 최종적으로 Job 인스턴스를 빌드하고 반환4. 배치 실행
배치 프로젝트를 실행시키면 아주 잠깐 작업하고 프로젝트가 끝날 것이다.
배치는 할일 만 하면 되는 역할이기 때문에 당연하다.
따라서 주의할 점은,
build.gradle에 의존성 주입을 할 때 spring-boot-starter-web을 추가하면 안된다.
5. 다중 스탭 만들기
다중 스탭 만들기는 간단하다. step을 하나 더 만들어서 JobBuilder에 추가해주면 된다.
Reader와 Writer는 위에서 만든 클래스를 이용
ItemProcessor
@Component public class MemberCouponProcessor implements ItemProcessor<Member, Member> { @Override public Member process(Member member) throws Exception { int lank = member.getLank(); int coupon = member.getCoupon(); // lank에 따라 쿠폰 발급 수 증가 if (lank == 1) { coupon+=1; } else if (lank == 2) { coupon+=2; } else if (lank == 3) { coupon+=3; } member.setCoupon(coupon); return member; } }Step, Job
package com.example.springbatchexample.config; import com.example.springbatchexample.entity.Member; import java.util.List; import lombok.RequiredArgsConstructor; import org.springframework.batch.core.Job; import org.springframework.batch.core.Step; import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing; import org.springframework.batch.core.job.builder.JobBuilder; import org.springframework.batch.core.repository.JobRepository; import org.springframework.batch.core.step.builder.StepBuilder; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.transaction.PlatformTransactionManager; @Configuration @EnableBatchProcessing @RequiredArgsConstructor public class MemberBatchConfig { private final JobRepository jobRepository; private final PlatformTransactionManager transactionManager; private final MemberReader reader; private final MemberCouponProcessor couponProcessor; private final MemberLankProcessor lankProcessor; private final MemberWriter writer; @Bean public Job updateMemberCouponJob() { return new JobBuilder("updateMemberCouponJob", jobRepository) .start(updateMemberLankStep()) .next(updateMemberCouponStep()) //두번째 Step 추가 .build(); } @Bean public Step updateMemberLankStep() { return new StepBuilder("updateMemberLankStep", jobRepository) .<Member, Member>chunk(10, transactionManager) .reader(reader) .processor(lankProcessor) .writer(writer) .build(); } @Bean public Step updateMemberCouponStep() { return new StepBuilder("updateMemberCouponStep", jobRepository) .<Member, Member>chunk(10, transactionManager) .reader(reader) .processor(couponProcessor) .writer(writer) .build(); } }
‼️
하지만, 문제가 한가지 있었다.
회원 데잍가 30개였는데, 단일 Step의 Job을 실행하면 약 64번의 쿼리가 날라간다는 것,,,
‼️
Chunk =10, paging=10일 때
디버깅을 찍어보니,
먼저 처음 reader에서 페이징 처리로 10개의 데이터를 읽어오는 select 쿼리문 +1,
writer에서 10개의 회원 레코드를 저장하는 데 날라가는 쿼리문 select문 +10, update문 +10
이 과정을 반복해서 x3
마지막으로 더 처리할 데이터가 있는지 확인하는 select 쿼리문 +1
이렇게 총 64번의 쿼리가 날라갔다.
그럼 왜 writer에서 20번이나 날라가는가!
Jpa는 save를 할 때 저장하는 엔티티가 새로운 엔티티인지, 기존의 엔티티인지 판단해 insert문 또는 update문을 실행시킨다.
이때 새로운 엔티티인지 아닌지를 판별하기 위해 select문을 날리는 것이다.
배치처리한답시고 쿼리를 64번을 날리다니 이건 말도 안된다.
그런데 의문이 있었다.분명 청크별로 트랜젝션을 처리한다고 했다.한번의 청크는 턴크 단위의 데이터를 write하는 것 까지 포함이다.
그럼 내 시나리오는- 데이터가 write될 때 까지 트렌젝션에서 관리됨- 그럼 청크 단위의 엔티티들은 1차 캐시에서 관리됨- 그럼 save할 때 1차 캐시에서 엔티티들을 관리중이기 때문에 select문을 날릴 필요가 없음.
그런데 왜!!! select문을 날리는가ㅜ
원인은 JpaPagingItemReader였다.해당 객체를 타고 가보니, Reader가 실행되면 doReadPage 라는 메서드가 실행되는데, 이 메서드 안에서 읽기 처리를 완료하면 commit을 날려버린다. 즉, 트렌젝션이 종료된다..
그럼 어떻게 해결하는가!
해결 방법은
1. writer에서 save메서드를 사용하지 않고 merge를 바로 사용한다. -> 이 해결 방법은 원인에 대한 해결 방안이라기 보다는 보수적인 해결 방안이다. 새로운 엔티티인지 확인하는 select문이라도 날라가지 않게 하는 방법이다.
2. JpaPagingRepository를 사용하지 않는다. 대신 다른 ItemReader구현체를 사용한다.
실재로 RepositoryItemReader를 사용해보니 쿼리문이 줄어들었다.
아직 쿼리 수를 확 줄이지는 못해서 더 공부할 예정이다.
Jpa 더티 체킹에 대해 나중에 다뤄보잡,,
'CS > Spring' 카테고리의 다른 글
JPA 개념과 특징 (3) 2025.01.20 동시성 제어 (1) 2024.09.30 spring Batch (0) 2024.09.16 JPA - ID 생성 전략 (0) 2024.08.26 Kafka(1) - 카프카란 무엇일까? (0) 2024.07.26