Spring

Spring Batch - Job은 정말 실패한 시점부터 재실행될 수 있을까?

develua 2023. 3. 5. 19:50

이번에는 Spring Batch 가 정말 실패한 시점부터 재실행되는지, 어떻게 실패한 지점부터 재실행될 수 있는지에 대해 알아보자.


Spring Batch 가 아닌 주기적인 작업이 필요한 API 를 호출해 주는 방식의 스케줄러가 구현이 되어 있다고 가정해 보자.

 

 

만약, 스케줄러가 API 를 호출한 후 모종의 이유(에러 발생)에 의해 실행이 중단된 경우, 해당 처리를 성공적으로 완료시키기 위해선 동일한 호출을 다시 호출해야 하는 상황이 생긴다.

요청이 처리됨과 동시에 중단이 된거라면 반복 호출해도 크게 문제가 없을 수 있겠으나, 만약 처리해야 하는 10개의 데이터 중 5개만 처리하고, 중단된 거라면 반복 호출 시 불필요한 작업을 반복한다. 뿐만 아니라, 최상위가 아닌 Dao의 단위로 트랜잭션까지 걸어준 상태라면 데이터 정합성 문제 또한 발생 여지가 있다.

 

위와 같은 문제들은 Spring Batch Job을 이용하면, 예방할 수 있다고 하는데, 어떻게 예방이 가능한걸까?

 

어떻게?

Batch Job의 reader, writer 는 진행 상태를 지속적으로 유지하기 위한 메커니즘이 필요한데, ItemStream 인터페이스가 이를 제공하고, ItemStream 마커 인터페이스를 통해 주기적으로 Batch 처리 상태를 저장하고, 관리한다.

즉, ItemStream 는 실행 컨텍스트와 연계해 실행 상태를 관리함으로써 실패한 곳에서부터 다시 시작할 수 있게 돕는다.

ItemStream 의 구성은?

ItemStream 인터페이스는 다음과 같이 구성되어 있다.

public interface ItemStream {  

   /**  
    * Open the stream for the provided {@link ExecutionContext}.  
    *    
    * @param executionContext current step's {@link org.springframework.batch.item.ExecutionContext}.  Will be the  
    *  executionContext from the last run of the step on a restart.    
    * @throws IllegalArgumentException if context is null  
    */   
    void open(ExecutionContext executionContext) throws ItemStreamException;  

   /**  
    * Indicates that the execution context provided during open is about to be saved. If any state is remaining, but    
    * has not been put in the context, it should be added here.    
    * 
    * @param executionContext to be updated  
    * @throws IllegalArgumentException if executionContext is null.  
    */   
    void update(ExecutionContext executionContext) throws ItemStreamException;  

   /**  
    * If any resources are needed for the stream to operate they need to be destroyed here. Once this method has been    
    * called all other methods (except open) may throw an exception.
    */   
    void close() throws ItemStreamException;  
}

기본적으로 ItemStream은 위 코드와 같이 open(), update(), close() 함수를 포함하고 있다.

내부에 어떤 함수들을 가지고 있고, 각각의 함수들은 어떤 역할을 할까?

아래 코드들은 ItemStream 인터페이스를 구현한 AbstractItemCountingItemStreamItemReader 추상 클래스이다. 구현된 코드의 내용을 바탕으로 각각 함수들이 어떤 역할을 하고 있는지 자세히 살펴보도록 하자.


AbstractItemCountingItemStreamItemReader 추상 클래스는 JpaPagingItemReader(Chunk 지향 처리에서 Reader의 구현체로 사용하고 있는) 에서 이미 상속받아 사용하고 있는 클래스로 ItemStream 은 따로 직접적인 구현의 필요없이 이미 구현되어 적용되고 있다.


1) open(): 현재 실행되는 Step ExecutionContext 를 가지고, 저장된 내용을 바탕으로 Chuck 상태를 초기화한다.

// (1)
protected abstract void doOpen() throws Exception;

@Override  
public void open(ExecutionContext executionContext) throws ItemStreamException {  
   super.open(executionContext);  
   // reader 구현체의 doOpen() 를 호출함으로써 read 준비 완료
   try {  
      doOpen();  
   }  
   catch (Exception e) {  
      throw new ItemStreamException("Failed to initialize the reader", e);  
   }  

   if (!isSaveState()) {  
      return;  
   }  

   if (executionContext.containsKey(getExecutionContextKey(READ_COUNT_MAX))) {  
      maxItemCount = executionContext.getInt(getExecutionContextKey(READ_COUNT_MAX));  
   }  

   int itemCount = 0;  
   if (executionContext.containsKey(getExecutionContextKey(READ_COUNT))) {  
      itemCount = executionContext.getInt(getExecutionContextKey(READ_COUNT));  
   }  
   else if(currentItemCount > 0) {  
      itemCount = currentItemCount;  
   }  

   if (itemCount > 0 && itemCount < maxItemCount) {  
      try {  
      // (2)
         jumpToItem(itemCount);  
      }  
      catch (Exception e) {  
         throw new ItemStreamException("Could not move to stored position on restart", e);  
      }  
   }  

   currentItemCount = itemCount;  

}

(1): AbstractItemCountingItemStreamItemReader 을 상속받은 자식 클래스에서 구현하고 있는 초기화 함수 (만약, JpaPagingItemReader 를 이용한다면, 해당 Reader 내부적으로 구현된 doOpen() 함수를 호출하게 된다.)
(2): executionContext 에 저장된 READ_COUNT_MAX, READ_COUNT 를 가져와 현재 읽기 시작해야 하는 Item 위치를 찾아간다.


2) update(): 매 Chuck 이후의 상태를 ExecutionContext에 저장한다.

@Override  
public void update(ExecutionContext executionContext) throws ItemStreamException {  
   super.update(executionContext);  
   // (1)
   if (saveState) {  
      Assert.notNull(executionContext, "ExecutionContext must not be null");  
      // (2)
      executionContext.putInt(getExecutionContextKey(READ_COUNT), currentItemCount);  
      if (maxItemCount < Integer.MAX_VALUE) {  
         executionContext.putInt(getExecutionContextKey(READ_COUNT_MAX), maxItemCount);  
      }  
   }  
}

(1): saveState 는 기본 값이 true 이며, 만약 실행 Job에 대해 따로 상태관리를 하고 싶지 않을 때, false 로 변경 가능하다.
(2): ExecutionContextUserSupport (JobExecution 또는 StepExecution의 실행 컨텍스트에서 데이터를 저장하고 검색하는 데 사용되는 도우미 클래스)를 이용하여 ExecutionContext 에 현재까지 읽은 currentItemCount 를 READ_COUNT 키 값으로 저장한다.


3) close(): 정리해야 하는 리소스들을 처리하는 역할을 당담한다.

@Override  
public void close() throws ItemStreamException {  
   super.close();  
   currentItemCount = 0;  
   try {  
      doClose();  
   }  
   catch (Exception e) {  
      throw new ItemStreamException("Error while closing item reader", e);  
   }  
}

실제로 코드는 위와 같이 정의돼 있으며, 위 추상 reader 클래스를 구현한 JpaPagingItemReader 에서는 doClose() 함수를 아래와 같이 구현하고 있다.

@Override  
protected void doClose() throws Exception {  
   entityManager.close();  
   super.doClose();  
}

각 3개의 함수는 언제 호출될까?

실제로 ItemReader, ItemWriter 그리고 ItemStream을 직접 구현한 Reader, Writer 를 생성했고, 구현한 Reader, Writer의 클래스 다이어그램은 다음과 같다.

ItemWriter
ItemReader

 

각각의 함수 내에 실행 순서를 확인하기 위한 로깅을 추가한 뒤, Job을 실행해 보면 아래와 같은 결과가 출력된다.

  1. 초기화: Step의 실행과 함께, 실행 컨텍스트 초기화를 위한 open() 함수와 초기화된 실행 컨텍스트를 현재 BATCH_STEP_EXECUTION_CONTEXT 테이블에 반영하기 위한 update() 함수가 호출된다.
  2. Chunk 실행: 각 Chunk 별로 reader -> processor(현재 코드에선 생략) -> writer 처리가 진행되는데, 출력된 내용을 보면 Chunk Size만큼의 reader 가 호출되어 데이터를 읽어오고, 이를 한번에 writer 호출을 통해 데이터를 업데이트 한다. 즉, 위 예시에서는 Chunk Size를 2로 지정했기 때문에, 2번의 reader 호출로 데이터를 읽어오고, 이를 한번의 writer의 호출로 가공된 데이터를 저장한다.
  3. Chunk 실행 결과 업데이트: 주목해볼 점은 실행 컨텍스트를 업데이트하는 함수의 호출이 reader, writer 별로 발생하는 것이 아닌 Chunk 단위로 이루어진다는 점이다. 단순하게 생각하기론 reader 의 read() 함수 처리 이후에 ItemStream의 update() 함수가 호출되고, writer 의 write() 함수 처리 이후에 ItemStream의 update() 함수가 호출될거라 생각해 볼 수 있으나 호출된 내용으로는 각 Chunk 처리 이후 reader, writer 의 update() 가 순차적으로 호출되었다. 실행 시점이 동일한 위와 같은 결과를 통해 실행 컨텍스트의 관리만을 고려하는 경우, ItemStream를 reader, writer 중 어디에 구현할지와 같은 고민을 하지 않아도 됨을 확인했다.
  4. Step 종료: 최종적으로 Step의 종료와 함께 reader, writer 내에 구현한 ItemStream의 close() 함수가 각각 호출되면서 Step이 종료되게 된다.

결과를 정리해 보면, 아래와 같다.


  • open(), close() 함수의 경우에는 Step의 시작과 끝에 한번씩 호출된다.
  • update() 함수는 초기화 시점에 open() 함수 호출 이후에 한 번, 매 Chuck 실행 후에 해당 데이터를 업데이트 해주기 위해 실행되며, 이는 read(), write() 함수의 호출과 상관없이 reader, writer 의 update() 함수는 각 Chunk 처리 이후로 실행 시점이 동일하다.

 

실제 구현 및 실행 결과

지금까지 ItemStream을 이용해서 Step의 상태가 관리된 사실을 확인해봤으니, 실제 일부 Chunk 실패 코드를 구현하여 원하는대로 동작하는지 살펴보자.


결과는 아래와 같다.

처리가 필요한 데이터가 50개 존재하는 상황에서 Chuck Size 가 2인 Job을 임의로 10번째 Item 처리 시 에러가 발생하도록 동작시켰을 때, 아래와 같이 STEP_EXECUTION 에 아래와 같이 정상적으로 데이터가 담긴 것을 확인해 볼 수 있다.

  • Commit: 4번
  • Read Count: 10번
  • Write Count: 8번
  • Rollback Count: 1번 (5번째 Chunk 에 대해서만 롤백 발생)

 

 

이제 동일한 JobInstance 를 실행시켰을 때, 원하는대로 9번째 Item 부터 실행되는지 확인해 보자


아래 사진에 보여지는 바와 같이 9번부터 실행되는 것을 확인해 볼 수 있다.