Spring

RateLimiter 제대로 알고 쓰기

develua 2023. 4. 12. 22:09

일반적으로 요청받는 서버에서 단시간에 지나치게 높은 호출 빈도를 제어하기 위한 목적으로(throttling) RateLimiter 를 사용하고 있다.

RateLimiter 는 Guava 라이브러리에서 제공하는 유틸리티로, 외부 서버로의 API 호출을 제한하기 위해 자주 사용된다.

RateLimiter 를 이용해 호출 빈도 제한 방법

  1. 의존성 추가

우선, RateLimiter 는 Guava 라이브러리 내에 존재하는 유틸리티기 떄문에 이를 사용하기 위해 Guava 라이브러리 의존성을 추가해야 한다.

implementation("com.google.guava:guava:31.0.1-jre")
  1. RateLimiter 인스턴스의 생성
// 1초당 2회로 제한하는 케이스이다.
// 넘겨 주는 인자는 permitsPerSecond 이며, 1초당 허가할 요청 수를 넘겨 주게 된다.
val rateLimiter: RateLimiter = RateLimiter.create(2)

// TODO(user): "This is equivalent to  
// {@code createWithCapacity(permitsPerSecond, 1, TimeUnit.SECONDS)}".  
public static RateLimiter create(double permitsPerSecond) {  
    return create(permitsPerSecond, SleepingStopwatch.createFromSystemTimer());  
}
  1. acquire() 을 통한 예약
val rateLimiter = RateLimiter.create(2.0)  

(1..10).forEach { it ->  
    // 1)
    rateLimiter.acquire()  
    doSomething(it)  
}
---
/**  
 * Acquires a single permit from this {@code RateLimiter}, blocking until the request can be  granted. Tells the amount of time slept, if any. 
 * 
 * <p>This method is equivalent to {@code acquire(1)}.  
 */
@CanIgnoreReturnValue  
public double acquire() {  
  return acquire(1);  
}

 

1) acquire() 의 호출은 acquire(1) 과 동일하며, 이는 single permit 을 획득하고자 함을 의미한다. 즉, 인자로 넘겨준 permit 만큼 예약을 진행하며, 한번에 획득하는 permit 수를 정의할 수 있다.

 

같은 맥락에서 다음 두 케이스가 모두 동일하게 동작한다는 것을 이해할 수 있다.

// case 1
val rateLimiter = RateLimiter.create(2.0)  
(1..10).forEach { it ->  
    rateLimiter.acquire()  
    doSomething(it)  
}

// case 2
val rateLimiter = RateLimiter.create(4.0)  
(1..10).forEach { it ->  
    // 한 번에 2개씩 permit 을 획득하며, 결과적으로 1초의 2번의 Task 처리가 가능하다. 
    rateLimiter.acquire(2)  
    doSomething(it)  
}

 

 

다음은 RateLimiter 를 실무에 사용해 보기에 앞서, 여러 케이스에 대해 테스트를 진행해 보도록 한다.

 


 

RateLimiter 를 구현을 통한 여러 케이스 테스트

 

✔️ Case 1 - RateLimiter 정상 동작 테스트(기본)

  • 조건 1) 2 QPS(Query Per Seconds)
// service 
fun testRateLimiter(){  
    val rateLimiter = RateLimiter.create(2.0)  
    val startTime = Instant.now()  

    (1..10).forEach { it ->  
        rateLimiter.acquire()  
        printTotalExecuteTime(title = "처리 시작 시간", startTime = startTime, endTime = Instant.now())  
        doSomething(it)  
        printTotalExecuteTime(title = "처리 완료 시간", startTime = startTime, endTime = Instant.now())  
    }  
}  

private fun doSomething(cnt:Int) {  
    // do Something  
    logger.info("Task Number: $cnt")  
}

// test
@Test  
fun test1() {  
    executeTime {  
        logger.info("test1 start")  
        rateLimiterTestService.testRateLimiter()  
    }  
}

 

결과

  • 1s 당 2번의 Task 실행
  • 최종 처리 시간: 4s 631ms

 

✔️ Case 2 - doSomething() 의 처리 시간 지연

  • 조건 1) 1s 이내 지연
  • 조건 2) 2 QPS
private fun doSomething(cnt:Int) {  
    // do Something  
    // 200 ms 지연
    Thread.sleep(200)  
    logger.info("Task Number: $cnt")  
}

 

결과

  • 1s 이내의 지연은 다음 Task 의 실행 시점을 지연시키지 않음. (즉, 요청 시점이 밀리지 않고, 1s 당 2 건의 요청이 보장됨)
  • 결과적으로 각 함수 실행 별 200 ms 의 지연이 있었음에도 불구하고, Case 1 테스트 처리 시간 + 200 ms 의 수준으로 처리 완료됨

 

✔️ Case 3 - doSomethingDelay() 의 처리 시간 지연 (1s 초과 - 외부 API 호출과 같은 응답이 지연되는 경우를 위한 테스트)

  • 조건 1) 1s 초과 지연
  • 조건 2) 2 QPS
private fun doSomethingDelay(cnt:Int) {  
    // do Something  
    Thread.sleep(1200)
    logger.info("Task Number: $cnt")  
}

 

결과

  • 1s 이상의 지연은 다음 Task 의 실행 시점을 지연시킴.
  • 이는 동기식 처리의 불가피한 결과, 한 Task 처리의 완료 이후(응답 반환) 다음 Task 들이 순차적으로 진행되나, 선행된 Task 의 1s 이상의 응답 지연은 RateLimiter 에서 QPS 을 보장할 수 없음

 

 

만약, 처리 지연에 영향 받지 않고, 정확히 1초에 원하는 요청 수를 보장받고 싶다면, 비동기로 요청할 수 있도록 구현하면 된다.

 


 

비동기 + RateLimiter

대량의 외부 서비스 호출을 진행할때, RateLimiter 를 이용해 요청 속도를 제한할 수 있다. 하지만, 초당 일정 요청 수를 보장해야하나, 외부 서비스의 응답 지연으로 인해 초당 일정한 요청 수를 보장받지 못할 수도 있다.

 

이에 대량 요청시에도 외부 서비스의 응답 시간과 관계없이 초당 일정한 요청 수를 보장받고자 '비동기 + RateLimiter 의 조합'으로 구현하고자 한다.

 

관련 테스트를 진행하기 위해 비동기 + RateLimiter 처리 코드를 준비한다.

fun testAsyncRateLimiter() = runBlocking {  
    val rateLimiter = RateLimiter.create(20.0)  
    val startTime = Instant.now()  

    (1..100).forEach { it ->  
        launch {  
            withContext(Dispatchers.IO) {  
                // 1)
                rateLimiter.acquire()  
                printTotalExecuteTime(title = "처리 시작 시간 $it", startTime = startTime, endTime = Instant.now())  
                // 2)
                doSomethingDelay(it)  
                printTotalExecuteTime(title = "처리 완료 시간 $it", startTime = startTime, endTime = Instant.now())  
                logger.info("==========================")  
            }  
        }  
    }
}

private fun doSomethingDelay(cnt:Int) {  
    // do Something  
    Thread.sleep(1000)  
    logger.info("Task Number: $cnt")  
}

 

1), 2) 기본적으로 블로킹 되는 코드(acquire() 초당 요청수를 초과하면 잠시 블로킹된다.) 는 비동기 처리 지연을 낳기 때문에 별도의 스레드에서 동작할 수 있도록 withContext(Dispatchers.IO) { } 코드 블록 내부에 선언한다.

 

하기 함수 모두 스레드 블록킹을 낳는 코드기 때문에 구현했기때문에(외부 서비스 지연 상황 재현 목적) 반복문이 지연 없이 동작할 수 있도록 acquire() 함수와 함께 별도의 스레드에서 동작하도록 구현했다.

 

1) doSomethingDelay()(외부 서비스 지연 상황 재현 목적으로 1초간 블로킹 되도록 구현)

2) acquire() 

 

실제로 초당 20건씩 총 100 건의 요청을 보내도록 구현하고, 테스트를 진행했을 때, 결과는 하기와 같다.

 

결과

  • 1번 테스트 응답 시간 + 지연 시간(1s) + 지연 버퍼 = 5s 962ms
  • 초당 20건씩 총 100 건의 요청을 동기로 구현했다면 최소 100s 이상으로 예상되는데 이보다 20배 빨라진 수치이다.

 

⁉️ 만약, 비동기로 처리 중 외부 서비스에서 에러가 발생한다면?


외부 서비스에서 예외가 발생하는 상황에서도 다른 요청에 지장없이 정상적으로 동작하는지 확인하기 위해 추가 테스트를 진행한다.

fun testAsyncRateLimiter2() = runBlocking {  
    val rateLimiter = RateLimiter.create(20.0)  
    val startTime = Instant.now()  

    (1..100).forEach { it ->  
        launch {  
            withContext(Dispatchers.IO) {  
                rateLimiter.acquire()  
                printTotalExecuteTime(title = "처리 시작 시간 $it", startTime = startTime, endTime = Instant.now())  
                // 1) 예외를 발생시키는 함수를 호출한다.
                try {  
                    doSomethingException(it)  
                } catch (e: Exception) {  
                // 2) 예외 발생 시, 해당 Task의 번호를 로거로 출력하도록 관리한다.
                    logger.error("[Exception] $it 번 예외 발생")  
                }  
                printTotalExecuteTime(title = "처리 완료 시간 $it", startTime = startTime, endTime = Instant.now())  
                logger.info("==========================")  
            }  
        }  
    }
}
private fun doSomethingException(cnt:Int) {  
    // do Something  
    Thread.sleep(300)  
    // 1) 아래와 같이 임의로 10으로 나눈 나머지가 9인 경우, 예외를 발생시키도록 구현했다. 
    if(cnt % 10 == 9) {  
        throw java.lang.RuntimeException()  
    }  
    logger.info("Task Number: $cnt")  
}

 

결과

  • 외부 서비스에서 예외가 발생하더라도 정상적으로 예외 처리되고, 다른 처리에 영향을 미치지 않은 것으로 확인되었다.

 

⁉️ 그렇다면, 비동기로 처리할 때, 따로 유의해야할 부분은 없을까?

  1. 자원 고갈: Dispatchers.IO는 공유 스레드 풀을 사용한다. 동시에 너무 많은 작업이 Dispatchers.IO에서 실행될 경우, 스레드 풀이 고갈되어 다른 I/O 작업에 대한 실행이 지연될 수 있기 때문에 이런 상황을 피하기 위해, 필요한 경우 별도의 스레드 풀을 구성하여 사용할 수 있다.
  2. 코루틴 취소 무시: withContext(Dispatchers.IO) { } 안에서 실행되는 코드는 코루틴 취소 신호를 확인하지 않을 수 있다. 예를 들어, rateLimiter.acquire()가 블로킹 동작을 수행하는 동안 취소 신호가 발생하면, 해당 동작이 완료될 때까지 코루틴이 취소되지 않기때문에 코드를 취소 가능한 방식으로 구현하거나 withTimeout() 등의 함수를 사용하여 타임아웃을 설정하는 방향으로 구현해야 한다.