Chunk 기반 처리
- 스프링 배치의 가장 흔한 구현 방식이다.
- 데이터를 한 번에 한 항목씩 읽고, 일정 개수만큼 모아서(chunk) 한 번에 쓰는 구조
- 이것들이 트랜잭션 경계 내에서 처리됨
preview : 처리 흐름 미리 보기
1️⃣ 기본 적인 Chunk 흐름
(출처 : https://docs.spring.io/spring-batch/reference/step/chunk-oriented-processing.html)
- ItemReader가 데이터를 한 건씩 읽음
- 읽은 데이터를 임시 리스트에 저장
- 설정한 commitInterval만큼 모이면,
- ItemWriter가 해당 리스트를 한번에 write
- 트랜잭션 커밋
이 모든 과정이 트랜잭션 내에서 한전하게 처리된다.
수도 코드 예시
List items = new ArrayList();
for (int i = 0; i < commitInterval; i++) {
Object item = itemReader.read(); <<< 한번씩 읽고
if (item != null) {
items.add(item); <<< 저장
}
}
itemWriter.write(items); <<< 쓰기
2️⃣ ItemProcessor 포함 시 Chunk 흐름
(출처 : https://docs.spring.io/spring-batch/reference/step/chunk-oriented-processing.html)
List items = new ArrayList();
for (int i = 0; i < commitInterval; i++) {
Object item = itemReader.read();
if (item != null) {
items.add(item);
}
}
// ItemProcessor를 통해 처리
List processedItems = new ArrayList();
for (Object item : items) {
Object processedItem = itemProcessor.process(item);
if (processedItem != null) {
processedItems.add(processedItem);
}
}
itemWriter.write(processedItems);
ChunkOrientedTasklet
스프링 배치에서 제공하는 Taklet의 구현체
Spring Batch의 핵심 처리 유닛으로, 읽기 → 가공 → 쓰기 과정 관리
개념
✅ 개념
- Chunk 지향 프로세싱 처리의 핵심 도메인 객체
- ItemReader, ItemWriter, ItemProcessor 를 사용해 Chunk 기반의 데이터 입출력 처리를 담당
✅ 특징
- TaskletStep 에 의해서 반복적으로 실행된다.
- 내부적으로 Chunk 단위로 데이터를 ‘읽기-가공→쓰기’를 핸들링 해주는 구현채를 가짐
- ChunkProvider
- Chunk 단위 아이템을 제공해주는 제공자
- ItemReader 를 핸들링 해줌
- ChunkProcessor
- Chunk 단위 아아템 처리해주는 처리자
- ItemProcessor, ItemWriter 를 핸들링 해주는
- ChunkProvider
- 트랜잭션 자동
- ChunkOrientedTasklet이 실행 될 때마다 매번 새로운 트랜잭션이 생성되어 처리가 이루어진다
- Chunk 단위로 트랜잭션이 일어난다.
- 별도의 트랜잭션 처리를 해주지 않아도 됨
- 예외 발생 시
- 해당 Chunk는 Rollback되며 이전에 커밋한 Chunk는 완료된 상태가 유지된다
- 이는, Chunk 단위로 커밋하기 때문
- ChunkOrientedTasklet이 실행 될 때마다 매번 새로운 트랜잭션이 생성되어 처리가 이루어진다
내부 동작
StepBuilder
└─ .chunk() 호출
└─ SimpleStepBuilder<I, O> 생성
└─ build() → TaskletStep 생성
└─ setTasklet(new ChunkOrientedTasklet<>(provider, processor))
└─ provider → SimpleChunkProvider<ItemReader>
└─ processor → SimpleChunkProcessor<ItemProcessor + ItemWriter>
내부 구조
public class **ChunkOrientedTasklet**<I> implements Tasklet {
private static final String INPUTS_KEY = "INPUTS";
// Chunk ~ tasklet 은 이 2개를 갖고 있다.
private final ChunkProcessor<I> **chunkProcessor**;
private final ChunkProvider<I> **chunkProvider**;
private boolean buffering = true;
@Nullable
@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
@SuppressWarnings("unchecked")
Chunk<I> **inputs** = (Chunk<I>) chunkContext.getAttribute(INPUTS_KEY);
if (inputs == null) {
**[Item을 Chunk size만큼 반복해서 읽은 다음 Chunk<I>에 저장하고 반환]**
**inputs** = chunkProvider.provide(contribution);
if (buffering) {
chunkContext.setAttribute(INPUTS_KEY, **inputs**);
⭐**예외 생기면 지금까지 읽어온 item들을 chunkCotext에 담아서 다음 처리시 활용**
}
}
**[가공 및 저장 : ChunkProvider로부터 받은 Chunk<I>의 아이템 개수만큼 데이터를]**
chunkProcessor.process(contribution, inputs);
chunkProvider.postProcess(contribution, inputs); // 상태 정리, 리스너 호출 등
// Allow a message coming back from the processor to say that we
// are not done yet
if (inputs.isBusy()) {
logger.debug("Inputs still busy");
return RepeatStatus.CONTINUABLE;
}
**[버퍼 역할을 한 ChunkContext의 데이터 삭제 when chunk 입출력 완료 시]
[이렇게 캐쉬를 없애야 다음 chunk 단위 작업 시 중복 없앰]**
chunkContext.removeAttribute(INPUTS_KEY);
chunkContext.setComplete();
if (logger.isDebugEnabled()) {
logger.debug("Inputs not busy, ended: " + inputs.isEnd());
}
return RepeatStatus.continueIf(!inputs.isEnd()); **<< 읽을 데이터 존재하는지 체크**
// 있으면 Chunk프로세스 반복
}
chunkContext.getAttribute(INPUTS_KEY);
- chunkContext에서 INPUTS_KEY에 해당하는 아이템들이 있으면 가져오기
<aside> 💡
추가 - SimpleStepBuilder가 ChunkOrientedTasklet 생성 내부 구조
public class SimpleStepBuilder<I, O> extends AbstractTaskletStepBuilder<SimpleStepBuilder<I, O>> {
@Override
protected Tasklet createTasklet() {
....
**SimpleChunkProvider**<I> chunkProvider = new SimpleChunkProvider<>(getReader(), repeatOperations);
**SimpleChunkProcessor**<I, O> chunkProcessor = new SimpleChunkProcessor<>(getProcessor(), getWriter());
chunkProvider.setListeners(new ArrayList<>(itemListeners));
chunkProvider.setMeterRegistry(this.meterRegistry);
chunkProcessor.setListeners(new ArrayList<>(itemListeners));
chunkProcessor.setMeterRegistry(this.meterRegistry)
ChunkOrientedTasklet<I> tasklet = new **ChunkOrientedTasklet**<>(chunkProvider, chunkProcessor);
tasklet.setBuffering(!readerTransactionalQueue);
return tasklet;
}
</aside>
☑️ TakletStep.exe → ChunkOrientedTaklet
- TaskletStep.execute 메서드 호출
☑️ ChunkOrientedTaklet → chunkProvider → ItemReader
- ChunkOrientedTaklet.provide() 호출
- ChunkProvider → ItemReader
- ChunkProvider는 내부적으로 ItemReader.read()를 반복 호출해서 지정된 chunkSize만큼 데이터를 읽어 Chunk<I>로 만듭니다.
- 읽은 데이터가 없으면 종료 조건이 됩니다.
- 데이터를 읽어 오면서 중간 중간의 데이터를 chunkcontext에 캐쉬한다 → 이로 인해 예외 발생 시 이전에 읽은 데이터를 재활용 가능하다.
-
public class **ChunkOrientedTasklet**<I> implements Tasklet { public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception { @SuppressWarnings("unchecked") Chunk<I> inputs = (Chunk<I>) chunkContext.getAttribute(INPUTS_KEY); if (inputs == null) { inputs = **chunkProvider**.provide(contribution); if (**buffering**) { // 버퍼링 중이면 chunkContext에 임시 저장 chunkContext.setAttribute(INPUTS_KEY, inputs); }
☑️ ChunkOrientedTaklet → chunkProcessor
- 읽어온 데이터를 chunkProcessor에게 전달하여,
- chunkProcessor가 데이터를 처리하도록 지시
☑️ ChunkProcessor → ItemProcessor
- chunkProcessor.process() 메서드 호출
- ItemProcessor에게 Item을 가공(변형)하라고 지시
- 가공(변형)은 Item 개수만큼 반복
☑️ ChunkProcessor → ItemWriter
- ItemProcessor가 가공한 List<Item>을 전달받고 ItemWriter에게 전달한다
이 과정을 읽을 Item이 없을 때 까지 Chunk단위로 반복한다.
사실상 스프링 배치 도입은 이러한 Chunk 작업을 위해 도입하는 경우가 많기에 잘 공부하자
chunk 메서드
StepBuilder의 chunk() 메서드는 청크 단위로 처리하는 Step을 구성할 수 있도록 SimpleStepBuilder를 생성해주는 메서드입니다.
public <I, O> **SimpleStepBuilder**<I, O> chunk(int chunkSize, PlatformTransactionManager transactionManager) {
return new SimpleStepBuilder<I, O>(this).transactionManager(transactionManager).chunk(chunkSize);
}
- <I> : Reader에서 읽을 데이터 타입
- <O> : Processor에서 처리하고 Writer로 넘길 데이터 타입
- chunksize : 한 번에 몇건의 데이터를 처리할지
- transactionManager : 트랜잭션 범위를 관리할 매니저
- Step은 아이템을 읽고 쓰는 과정에서 주기적으로 트랜잭션 커밋을 한다.
- 이 때 트랜잭션 관리를 위해 사용되는 것이 PlatformTransactionManager
<aside> 💡
주의 : 커밋은 비용이 크기 때문에 자주 쓰면 비효율적
</aside>
1. <I, O> chunk (size, transactionManager)
- 지정된 크기(chunk size)로 아이템을 청크 단위로 처리하는 Step을 생성
- Chunk Size 설정 → 이게 단순히 처리 단위뿐만 아니라 commit interval도 의미한다. ex. size = 10 : 10개 데이터 읽고 10개 가공하고 10개 쓰고 DB에 저장 후 커밋 (커밋도 SIZE와)
public <I, O> SimpleStepBuilder<I, O> chunk(int chunkSize, PlatformTransactionManager transactionManager) {
return new SimpleStepBuilder<I, O>(this).transactionManager(transactionManager).chunk(chunkSize);
}
- <I, O> chunk (completionPolicy, transactionManager)
- Chunk 프로세스를 완료하기 위한 정책 설정 클래스 지정
public <I, O> SimpleStepBuilder<I, O> chunk(CompletionPolicy completionPolicy, PlatformTransactionManager transactionManager) { return new SimpleStepBuilder<I, O>(this).transactionManager(transactionManager).chunk(completionPolicy); } //completionPolicy – 청크 처리 시 사용할 완료 정책 (CompletionPolicy) //transactionManager – 청크 기반 Tasklet에서 사용할 트랜잭션 매니저
예시
.<Customer, Customer>chunk(3, transactionManager)
- 앞의 Customer: ItemReader가 읽어오는 입력 데이터 타입
- 뒤의 Customer: ItemProcessor가 처리한 후 넘겨주는 출력 데이터 타입
- 해석 : "이 Step은 Customer 타입을 읽고, Customer 타입으로 가공해서, Customer 타입으로 기록한다."
public class SimpleStepBuilder<I, O> extends AbstractTaskletStepBuilder<SimpleStepBuilder<I, O>> {
private static final int DEFAULT_COMMIT_INTERVAL = 1;
private ItemReader<? extends I> reader;
private ItemWriter<? super O> writer;
private ItemProcessor<? super I, ? extends O> processor;
private int chunkSize = 0;
private RepeatOperations chunkOperations;
private CompletionPolicy completionPolicy;
private Set<StepListener> itemListeners = new LinkedHashSet<>();
private boolean readerTransactionalQueue = false;
private MeterRegistry meterRegistry = Metrics.globalRegistry;
커밋 간격 (Commit Interval)
@Bean
public Job sampleJob(JobRepository jobRepository, Step step1) {
return new JobBuilder("sampleJob", jobRepository)
.start(step1)
.build();
}
@Bean
public Step step1(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
return new StepBuilder("step1", jobRepository)
.<String, String>chunk(10, transactionManager)
.reader(itemReader())
.writer(itemWriter())
.build();
}
✅Step 실행 전체 흐름 분석
- Step 실행 시 트랜잭션 시작
- ItemReader.read()를 한 번 씩 호출 → 호출할 때마다 내부 카운터 증가
- 내부 카운터가 10이 되면 ItemWriter.write() 전달
- 트랜잭션 커밋
✅ chunk 분석
.chunk(10, transactionManager)
chunk( 10 , ~) == 커밋 단위도 10개 기준이 된다.
즉, Chunk Size가 10이면, 10건 처리 후 1번 커밋
💢 Tradeoff
너무 빨리 커밋 ⇒ DB I/O작업이기 때문에 성능 저하
너무 느리게 커밋 ⇒ 메모리 사용량 증가, rollback 비용 증가
'백엔드 > 스프링' 카테고리의 다른 글
Spring-batch 테스트 : 여러 JOB에서 생기는 오류 NoUniqueBeanDefinitionException (0) | 2025.04.22 |
---|---|
@Value vs @ConfigurationProperties (0) | 2025.04.13 |
스프링 배치 - Chunk (preview) (0) | 2025.04.07 |
스프링 배치 - StepContribution (0) | 2025.04.06 |
스프링 배치 - JobExecution (0) | 2025.04.06 |