본문 바로가기
백엔드/스프링

Chunk 지향 프로세싱 + Tasklet

by ARlegro 2025. 4. 7.

Chunk 기반 처리

  • 스프링 배치의 가장 흔한 구현 방식이다.
  • 데이터를 한 번에 한 항목씩 읽고, 일정 개수만큼 모아서(chunk) 한 번에 쓰는 구조
  • 이것들이 트랜잭션 경계 내에서 처리됨

preview : 처리 흐름 미리 보기

1️⃣ 기본 적인 Chunk 흐름

(출처 : https://docs.spring.io/spring-batch/reference/step/chunk-oriented-processing.html)

  1. ItemReader가 데이터를 한 건씩 읽음
  2. 읽은 데이터를 임시 리스트에 저장
  3. 설정한 commitInterval만큼 모이면,
  4. ItemWriter가 해당 리스트를 한번에 write
  5. 트랜잭션 커밋

이 모든 과정이 트랜잭션 내에서 한전하게 처리된다.

수도 코드 예시

List items = new ArrayList();
for (int i = 0; i < commitInterval; i++) {
    Object item = itemReader.read();   <<< 한번씩 읽고
    if (item != null) {
        items.add(item);  <<< 저장 
    }
}
itemWriter.write(items);   <<< 쓰기

2️⃣ ItemProcessor 포함 시 Chunk 흐름

(출처 : https://docs.spring.io/spring-batch/reference/step/chunk-oriented-processing.html)

List items = new ArrayList();
for (int i = 0; i < commitInterval; i++) {
    Object item = itemReader.read();
    if (item != null) {
        items.add(item);
    }
}

// ItemProcessor를 통해 처리
List processedItems = new ArrayList();
for (Object item : items) {
    Object processedItem = itemProcessor.process(item);
    if (processedItem != null) {
        processedItems.add(processedItem);
    }
}

itemWriter.write(processedItems);

ChunkOrientedTasklet

스프링 배치에서 제공하는 Taklet의 구현체

Spring Batch의 핵심 처리 유닛으로, 읽기 → 가공 → 쓰기 과정 관리

개념

✅ 개념

  • Chunk 지향 프로세싱 처리의 핵심 도메인 객체
  • ItemReader, ItemWriter, ItemProcessor 를 사용해 Chunk 기반의 데이터 입출력 처리를 담당

✅ 특징

  • TaskletStep 에 의해서 반복적으로 실행된다.
  • 내부적으로 Chunk 단위로 데이터를 ‘읽기-가공→쓰기’를 핸들링 해주는 구현채를 가짐
    • ChunkProvider
      • Chunk 단위 아이템을 제공해주는 제공자
      • ItemReader 를 핸들링 해줌
    • ChunkProcessor
      • Chunk 단위 아아템 처리해주는 처리자
      • ItemProcessor, ItemWriter 를 핸들링 해주는
  • 트랜잭션 자동
    • ChunkOrientedTasklet이 실행 될 때마다 매번 새로운 트랜잭션이 생성되어 처리가 이루어진다
      • Chunk 단위로 트랜잭션이 일어난다.
      • 별도의 트랜잭션 처리를 해주지 않아도 됨
    • 예외 발생 시
      • 해당 Chunk는 Rollback되며 이전에 커밋한 Chunk는 완료된 상태가 유지된다
      • 이는, Chunk 단위로 커밋하기 때문

내부 동작

StepBuilder
   └─ .chunk() 호출
        └─ SimpleStepBuilder<I, O> 생성
             └─ build() → TaskletStep 생성
                  └─ setTasklet(new ChunkOrientedTasklet<>(provider, processor))
                           └─ provider → SimpleChunkProvider<ItemReader>
                           └─ processor → SimpleChunkProcessor<ItemProcessor + ItemWriter>

내부 구조

public class **ChunkOrientedTasklet**<I> implements Tasklet {

	private static final String INPUTS_KEY = "INPUTS";
	
	// Chunk ~ tasklet 은 이 2개를 갖고 있다.
	private final ChunkProcessor<I> **chunkProcessor**;
	private final ChunkProvider<I> **chunkProvider**;

	private boolean buffering = true;

	@Nullable
	@Override
	public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {

		@SuppressWarnings("unchecked")
		Chunk<I> **inputs** = (Chunk<I>) chunkContext.getAttribute(INPUTS_KEY);
		if (inputs == null) {
			**[Item을 Chunk size만큼 반복해서 읽은 다음 Chunk<I>에 저장하고 반환]**
			**inputs** = chunkProvider.provide(contribution);
			if (buffering) {
				chunkContext.setAttribute(INPUTS_KEY, **inputs**);
		  	⭐**예외 생기면 지금까지 읽어온 item들을 chunkCotext에 담아서 다음 처리시 활용**
			}
		}

		**[가공 및 저장 : ChunkProvider로부터 받은 Chunk<I>의 아이템 개수만큼 데이터를]**
		chunkProcessor.process(contribution, inputs);
		chunkProvider.postProcess(contribution, inputs); // 상태 정리, 리스너 호출 등 

		// Allow a message coming back from the processor to say that we
		// are not done yet
		if (inputs.isBusy()) {
			logger.debug("Inputs still busy");
			return RepeatStatus.CONTINUABLE;
		}

		**[버퍼 역할을 한 ChunkContext의 데이터 삭제 when chunk 입출력 완료 시]
		[이렇게 캐쉬를 없애야 다음 chunk 단위 작업 시 중복 없앰]**
		chunkContext.removeAttribute(INPUTS_KEY);
		chunkContext.setComplete();

		if (logger.isDebugEnabled()) {
			logger.debug("Inputs not busy, ended: " + inputs.isEnd());
		}
		return RepeatStatus.continueIf(!inputs.isEnd()); **<< 읽을 데이터 존재하는지 체크** 
		// 있으면 Chunk프로세스 반복 
	}

chunkContext.getAttribute(INPUTS_KEY);

  • chunkContext에서 INPUTS_KEY에 해당하는 아이템들이 있으면 가져오기

<aside> 💡

추가 - SimpleStepBuilder가 ChunkOrientedTasklet 생성 내부 구조

public class SimpleStepBuilder<I, O> extends AbstractTaskletStepBuilder<SimpleStepBuilder<I, O>> {

	@Override
	protected Tasklet createTasklet() {
		....
		
		**SimpleChunkProvider**<I> chunkProvider = new SimpleChunkProvider<>(getReader(), repeatOperations);
		**SimpleChunkProcessor**<I, O> chunkProcessor = new SimpleChunkProcessor<>(getProcessor(), getWriter());
		chunkProvider.setListeners(new ArrayList<>(itemListeners));
		chunkProvider.setMeterRegistry(this.meterRegistry);
		chunkProcessor.setListeners(new ArrayList<>(itemListeners));
		chunkProcessor.setMeterRegistry(this.meterRegistry)
		ChunkOrientedTasklet<I> tasklet = new **ChunkOrientedTasklet**<>(chunkProvider, chunkProcessor);
		tasklet.setBuffering(!readerTransactionalQueue);
		return tasklet;
	}

</aside>

☑️ TakletStep.exe → ChunkOrientedTaklet

  • TaskletStep.execute 메서드 호출

☑️ ChunkOrientedTaklet → chunkProvider → ItemReader

  • ChunkOrientedTaklet.provide() 호출
  • ChunkProvider → ItemReader
    • ChunkProvider는 내부적으로 ItemReader.read()를 반복 호출해서 지정된 chunkSize만큼 데이터를 읽어 Chunk<I>로 만듭니다.
    • 읽은 데이터가 없으면 종료 조건이 됩니다.
    • 데이터를 읽어 오면서 중간 중간의 데이터를 chunkcontext에 캐쉬한다 → 이로 인해 예외 발생 시 이전에 읽은 데이터를 재활용 가능하다.
    • public class **ChunkOrientedTasklet**<I> implements Tasklet {
      
      	public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
      
      		@SuppressWarnings("unchecked")
      		Chunk<I> inputs = (Chunk<I>) chunkContext.getAttribute(INPUTS_KEY);
      		if (inputs == null) {
      			inputs = **chunkProvider**.provide(contribution);
      			if (**buffering**) {  // 버퍼링 중이면 chunkContext에 임시 저장 
      				chunkContext.setAttribute(INPUTS_KEY, inputs);
      			}
      

☑️ ChunkOrientedTaklet → chunkProcessor

  • 읽어온 데이터를 chunkProcessor에게 전달하여,
  • chunkProcessor가 데이터를 처리하도록 지시

☑️ ChunkProcessor → ItemProcessor

  • chunkProcessor.process() 메서드 호출
  • ItemProcessor에게 Item을 가공(변형)하라고 지시
  • 가공(변형)은 Item 개수만큼 반복

☑️ ChunkProcessor → ItemWriter

  • ItemProcessor가 가공한 List<Item>을 전달받고 ItemWriter에게 전달한다

이 과정을 읽을 Item이 없을 때 까지 Chunk단위로 반복한다.

사실상 스프링 배치 도입은 이러한 Chunk 작업을 위해 도입하는 경우가 많기에 잘 공부하자

chunk 메서드

 

StepBuilder의 chunk() 메서드는 청크 단위로 처리하는 Step을 구성할 수 있도록 SimpleStepBuilder를 생성해주는 메서드입니다.

	public <I, O> **SimpleStepBuilder**<I, O> chunk(int chunkSize, PlatformTransactionManager transactionManager) {
		return new SimpleStepBuilder<I, O>(this).transactionManager(transactionManager).chunk(chunkSize);
	}

  • <I> : Reader에서 읽을 데이터 타입
  • <O> : Processor에서 처리하고 Writer로 넘길 데이터 타입
  • chunksize : 한 번에 몇건의 데이터를 처리할지
  • transactionManager : 트랜잭션 범위를 관리할 매니저
    • Step은 아이템을 읽고 쓰는 과정에서 주기적으로 트랜잭션 커밋을 한다.
    • 이 때 트랜잭션 관리를 위해 사용되는 것이 PlatformTransactionManager

<aside> 💡

주의 : 커밋은 비용이 크기 때문에 자주 쓰면 비효율적

</aside>

1. <I, O> chunk (size, transactionManager)

  • 지정된 크기(chunk size)로 아이템을 청크 단위로 처리하는 Step을 생성
  • Chunk Size 설정 → 이게 단순히 처리 단위뿐만 아니라 commit interval도 의미한다. ex. size = 10 : 10개 데이터 읽고 10개 가공하고 10개 쓰고 DB에 저장 후 커밋 (커밋도 SIZE와)
	public <I, O> SimpleStepBuilder<I, O> chunk(int chunkSize, PlatformTransactionManager transactionManager) {
		return new SimpleStepBuilder<I, O>(this).transactionManager(transactionManager).chunk(chunkSize);
	}

  1. <I, O> chunk (completionPolicy, transactionManager)
    • Chunk 프로세스를 완료하기 위한 정책 설정 클래스 지정
    public <I, O> SimpleStepBuilder<I, O> chunk(CompletionPolicy completionPolicy,
    			PlatformTransactionManager transactionManager) {
    		return new SimpleStepBuilder<I, O>(this).transactionManager(transactionManager).chunk(completionPolicy);
    	}
    	
    	//completionPolicy – 청크 처리 시 사용할 완료 정책 (CompletionPolicy)
    	//transactionManager – 청크 기반 Tasklet에서 사용할 트랜잭션 매니저
    

예시

.<Customer, Customer>chunk(3, transactionManager)
  • 앞의 Customer: ItemReader가 읽어오는 입력 데이터 타입
  • 뒤의 Customer: ItemProcessor가 처리한 후 넘겨주는 출력 데이터 타입
  • 해석 : "이 Step은 Customer 타입을 읽고, Customer 타입으로 가공해서, Customer 타입으로 기록한다."
public class SimpleStepBuilder<I, O> extends AbstractTaskletStepBuilder<SimpleStepBuilder<I, O>> {

	private static final int DEFAULT_COMMIT_INTERVAL = 1;
	
	private ItemReader<? extends I> reader;
	private ItemWriter<? super O> writer;
	private ItemProcessor<? super I, ? extends O> processor;
	private int chunkSize = 0;
	private RepeatOperations chunkOperations;
	private CompletionPolicy completionPolicy;
	private Set<StepListener> itemListeners = new LinkedHashSet<>();
	private boolean readerTransactionalQueue = false;
	private MeterRegistry meterRegistry = Metrics.globalRegistry;

커밋 간격 (Commit Interval)

@Bean
public Job sampleJob(JobRepository jobRepository, Step step1) {
    return new JobBuilder("sampleJob", jobRepository)
                     .start(step1)
                     .build();
}

@Bean
public Step step1(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
	return new StepBuilder("step1", jobRepository)
				.<String, String>chunk(10, transactionManager)
				.reader(itemReader())
				.writer(itemWriter())
				.build();
}

✅Step 실행 전체 흐름 분석

  1. Step 실행 시 트랜잭션 시작
  2. ItemReader.read()를 한 번 씩 호출 → 호출할 때마다 내부 카운터 증가
  3. 내부 카운터가 10이 되면 ItemWriter.write() 전달
  4. 트랜잭션 커밋

✅ chunk 분석

.chunk(10, transactionManager)

chunk( 10 , ~) == 커밋 단위도 10개 기준이 된다.

즉, Chunk Size가 10이면, 10건 처리 후 1번 커밋

💢 Tradeoff

너무 빨리 커밋 ⇒ DB I/O작업이기 때문에 성능 저하

너무 느리게 커밋 ⇒ 메모리 사용량 증가, rollback 비용 증가