이번에 php 코드로 동작하고 있던 만료 예정 크레딧 발송 코드를 kotlin 으로 리팩토링하는 작업을 진행했다.
리팩토링이 필요한 코드는 총 1일, 7일, 30일 후에 만료되는 크레딧을 보유하고 있는 사용자에게 크레딧 만료 예정 알림톡을 보내주는 코드로 11000 ~ 14000건 정도를 매일 11시에 발송하는 코드이다.
- 1일 후 만료(5000 ~ 6000건)
- 7일 후 만료(5000 ~ 6000건)
- 30일 후 만료(1000 ~ 2000건)
코드를 확인해보니 만료 예정 크레딧을 받아오는 코드가 페이징 처리되어 있었고, 조회해온 Page 사이즈만큼의 크레딧에 대해 만료 예정 알림톡을 순차적으로 보내준 뒤, 반복적으로 다음 Page 리스트에 대해서도 동일 처리를 진행하는 코드로 파악이 되었다.
하지만, 10000개가 넘는 알림톡 발송건을 순차 실행이 보장되지 않아도 되는 코드에서 동기적으로 처리하는 것은 크게 비효율적으로 느껴졌다.
그래서 알림톡 발송 코드를 코루틴 빌더(launch)를 이용해 비동기로 처리하고자 마음먹었는데, 한가지 걸리는 부분이 있었다.
알림톡 발송 자체가 I/O 작업으로 스레드를 블로킹 시킨다는 점
일반적으로 코루틴은 적은 스레드로 고가용성을 지향하다보니 코루틴 내에 스레드를 블러킹시키는 코드가 존재한다면, 이는 코루틴 실행에 치명적일 수 있다.
그럼에도 불구하고, I/O 작업을 코루틴 빌더를 이용한 비동기로 처리하는 것이 유의미한 성능적 개선이 있는지 확인해 보기 위해 몇가지 테스트를 진행해 보았다.
다음과 같은 3가지 경우를 확인해 보고자 한다.
단, 3가지 경우 모두 I/O 작업이 2초가 소요된다고 가정한다.
- 동기 작업 + 블로킹 함수(I/O 작업)
- 비동기 작업 + 블로킹 함수(I/O 작업)
- (번외) 비동기 작업 + 일시 중단 함수(I/O 작업)
기대하는 결과로는 3 >> 2 > 1 의 순인데, 실제로 이 결과가 맞는지 그리고 결과로 또다른 인사이트를 얻을 수 있는지 확인해 보자.
1. 동기 작업 + 블로킹 함수
기존 PHP 코드에서 구현되어 있던 방식이다. 동기적으로 알림톡 발송이 이루어지고 있었고, 알림톡 발송 이후 응답이 반환되기까지 대기 상태를 유지했다.
✔️ 코드 구현
우선, 첫번째 케이스를 구현해 보자.
@Test
fun main5() {
val startTime = System.currentTimeMillis()
syncFun()
val endTime = System.currentTimeMillis() - startTime
println("total time $endTime")
}
// 1)
fun syncFun() {
val atomicInteger = AtomicInteger(0)
(1..500).forEach {
println("$it: Start")
blockingFun()
atomicInteger.getAndIncrement()
println("$it: End, integer: $atomicInteger")
}
}
// 2)
fun blockingFun() {
Thread.sleep(2000)
}
1) 동기 작업을 담당하는 함수이다. 1 부터 500 까지 반복문을 돌면서 블로킹 함수를 호출하게 된다.
2) 간단하게 구현한 블로킹 함수이다. 내부적으로는 현재 스레드를 2초간 블로킹 시키도록 한다.
✔️ 결과
해당 코드를 실행했을 때, 예상대로면 1000초 (500번 * 2초씩), 약 17분 정도가 걸릴 것으로 예상해 볼 수 있다.
실제로 예상한대로, 총 16분 47초가 소요되었으며, 순차적으로 처리되도록 구현했기 때문에 각 요청에 대해 End 로그가 출력된 이후에 다음 요청의 Start 가 출력된 것을 확인해 볼 수 있다.
2. 비동기 작업 + 블로킹 함수
다음으로는 블로킹 함수를 비동기로 호출하는 경우이다. 이 글을 쓰게된 이유이기도 하며, 해당 경우가 블로킹 함수를 동기적으로 실행하는 경우와 비교했을 때, 성능의 차이를 확인해 보고자 한다.
✔️ 코드 구현
@Test
// 1)
fun main5() = runTest {
val startTime = System.currentTimeMillis()
suspendFun1()
val endTime = System.currentTimeMillis() - startTime
println("total time $endTime")
}
// 2)
suspend fun suspendFun1() {
val atomicInteger = AtomicInteger(0)
withContext(Dispatchers.IO) {
(1..500).forEach {
println("$it: Start")
launch {
blockingFun()
atomicInteger.getAndIncrement()
println("$it: End, integer: $atomicInteger")
}
}
}
}
// 3)
fun blockingFun() {
Thread.sleep(2000)
}
1) 내부적으로 일시 중단 함수를 호출하기 때문에 코루틴 생성을 위해서 runTest 코루틴 빌더로 코루틴을 생성한다.
2) 일시 중단 함수를 선언한다. 내부적으로 I/O 작업을 위한 스레드 할당 시, Dispatchers.IO 을 이용함으써 I/O 작업을 좀 더 효율적으로 처리할 수 있도록 한다. 또한, 비동기 작업 처리 시, 반환값을 필요로 하지 않는 launch 빌더를 이용해 비동기 작업을 진행한다.
3) 코루틴 내부에서 호출할 블로킹 함수이다. 2초간 스레드를 블로킹한다.
✔️ 결과
위 코드의 실행의 결과는 어땠을까? 결과는 16분 -> 16초로 단축된 결과를 확인해 볼 수 있었다. 이는 동기처리와 비교했을 때, 대략 63~64배 정도 빨라진 결과이다.
Q1. 16초는 어떻게 나올 수 있었던 것일까?
16초라는 결과는 2초씩(I/O 작업 지연 시간) 총 8번 반복했을 때의 결과인데, 총 처리해야 하는 500개의 작업량을 8번의 반복으로 모두 소화해 내기 위해선 한 번의 몇개의 작업이 처리되어야 할까?
방정식을 세워보면 다음과 같다.
7 * X + Y = 500(Y < X)
X 가 한 번에 실행될 수 있는 총 스레드 수이고, Y가 7번의 처리 이후, 남은 작업량이다.
여기서 한번에 실행될 수 있는 총 스레드 수는 Dispatcher.IO의 스레드 풀 사이즈와 관련이 있고(물론 테스트 작업 외 다른 작업은 없다고 가정한다.), 일반적으로 Dispatcher.IO의 스레드 풀 사이즈는 64개이기 때문에 예상되는 처리 내용은 하기 내용과 같다.
예상 작업 진행
- 비동기 작업으로 64개 스레드 작업 동시 할당
- 스레드 2초간 I/O 작업으로 지연 + 스레드 풀 내 스레드 모두 블로킹 상태로 작업 할당 불가
- 2초간 지연 후, 블로킹 해제 및 64개 재 할당
- 500개의 작업이 모두 처리될 때까지 지연 할당 처리 반복
위와 같은 작업의 반복으로 64개의 스레드가 2초간 지연 작업을 진행하고 이를 총 8번을 반복했기 때문에 16초의 시간이 소요되었던 것이다.
Q2. 다른 요청 처리에 문제가 되진 않을까?
하지만, 위 구현은 문제가 있다. 500 개의 작업을 처리하기 위해 I/O 바운드 스레드를 계속해서 고갈시킴으로써 해당 작업 외의 I/O 바운드 작업 처리를 지연시킨다.
위 테스트는 해당 테스트 작업 외에 다른 요청은 배제한 채로 진행한 테스트로써 문제가 없어보이나, 실은 다른 I/O 바운드 스레드를 이용한 처리가 필요한 요청과 동시에 처리하게 되면, 다음과 같은 문제가 발생한다.
테스트 상황
- I/O 바운드 스레드 풀을 사용하고, 내부적으로 블로킹 함수를 호출하는 2개의 서비스 함수를 구현한다.
// 1)
suspend fun suspendFun1() {
withContext(Dispatchers.IO) {
blockingFun()
}
}
// 2)
suspend fun suspendFun2() {
val atomicInteger = AtomicInteger(0)
withContext(Dispatchers.IO) {
(1..14000).forEach { _ ->
launch {
blockingFun()
atomicInteger.getAndIncrement()
}
}
}
}
fun blockingFun() {
Thread.sleep(2000)
}
1) I/O 바운드 스레드를 이용하는 다른 서비스에 영향이 없는지 확인 용도의 서비스 함수이다. 내부적으로 스레드를 2초간 지연시키는 블로킹 함수를 호출한다.
2) 실제로 구현할 코드이다(지금까지 테스트를 진행해온). 실제 알림톡을 발송할 데이터 수를 맞춰 주기 위해 14000번 블로킹 함수를 호출하도록 했다.
2. 테스트는 suspendFun2() 을 호출시키고, 이후에 바로 suspendFun1() 를 1000번 호출하도록 진행했다.
결과
2초의 지연만 발생해야 하는 1000 번의 suspendFun1 의 호출이 suspendFun2 지연에 의한 스레드 고갈 상테에 의해 suspendFun2 처리가 완료되기까지 약 7분간의 대기 이후에 하나씩 처리되어 가는 결과를 보였다.
결과적으로, 전체 알림톡 발송 속도 개선을 위한 처리가 결과적으론 다른 요청 처리에 지연을 낳았다.
그럼, 알림톡 발송 비동기 처리 로직을 다른 비동기 처리(@Async와 같은)를 이용하거나 혹은 다시 동기 처리로 원복시켜야 하는 것일까?
그렇지 않다.
아래 위 문제를 해결할 수 있는 방법을 한 번 살펴보자.
알림톡 발송을 위한 별도의 스레드 풀 분리
위 문제는 대용량의 스레드 지연을 발생시키는 작업들이 동일한 스레드 풀 내의 스레드를 점유하고자 해서 발생하는 문제이다.
이미 다른 작업에 의해 가용한 스레드는 고갈되어 없다보니 가용 자원이 생길때까지 계속해서 대기 상태에 놓이게 된다.
이에 suspendFun1 함수의 호출을 I/O 바운드 스레드 풀이 아닌 다른 스레드 풀에서 동작하게끔 한다 하더라도 만약, 다른 동료가 미쳐 이 위험 상황을 인지하지 못하고 위 문제가 됐던 코드와 동일하게 작업하게 됐다면?
이런 위험 상황을 예방하고자 다음 상황처럼 대용량 + 비동기 + 블로킹 함수 호출의 조합으로 동작하는 경우, 별도의 '독립적인' 스레드 풀에서 동작할 수 있도록 구현하고자 한다.
코드의 구현
@Configuration
class ThreadPoolContext {
@Bean
fun ioDispatcher(): ExecutorCoroutineDispatcher {
val ioThreadFactory = ThreadFactoryBuilder()
.setNameFormat("IO-bound-thread-%d")
.setThreadFactory(Executors.defaultThreadFactory())
.build()
// 1)
return Executors.newFixedThreadPool(64, ioThreadFactory).asCoroutineDispatcher()
}
}
1) 독립적인 스레드 풀에서 동작하게 하기 위해 Executors 를 이용해 풀 사이즈 64개인 ExecutorCoroutineDispatcher 를 생성한다.
suspend fun suspendFun2() {
val atomicInteger = AtomicInteger(0)
// 1)
withContext(ioDispatcher) {
println("suspendFun2 currentThread: ${Thread.currentThread()}, ${Thread.activeCount()}")
(1..14000).forEach { i ->
launch {
blockingFun()
atomicInteger.getAndIncrement()
}
}
}
}
// 2)
suspend fun suspendFun1() {
withContext(Dispatchers.IO) {
(1..1000).forEach { i ->
launch {
blockingFun()
}
}
}
}
1) 빈으로 등록한 ioDispatcher
을 주입받아 아래와 같이 withContext 빌더의 인자로 넘겨준다.
2) suspendFun1 함수의 경우 동일하게 IO Dispatchers 를 이용한다.
결과
결과는 상기 이미지와 같다.
- suspendFun2() -> suspendFun1() 순으로 호출했고, 순차적으로 start 가 출력됨을 확인해 볼 수 있다.
- 스레드의 Name format이 지정한 Name Format으로 호출됨을 확인해 볼 수 있다.
- mainTest1 함수의 실행이 32초 지속 후, 종료되었음을 확인해 볼 수 있다. 1000번의 작업을 완료하기 위해 64개의 스레드가 16(1000 // 64 + 1) 번의 할당 지연(처리) 작업을 지속해야하기 때문에 16 * 2(지연 시간) = 32 로 스레드 고갈에 따른 지연없이 처리됨을 확인할 수 있다.
- mainTest2 함수의 실행 또한 64개의 스레드, 219 번의 반복, 219 * 2 = 438 의 시간으로 7분의 시간이 정상적으로 처리된 것임을 확인할 수 있다.
3. 비동기 작업 + 일시 중단 함수
블로킹 함수를 비동기로 처리했을 때와 일시 중단 함수를 비동기로 처리할 때의비교를 통해 코루틴을 블로킹하는 것이 성능에 어느정도 영향을 미치는지 추가적으로 확인해보고자 아래와 같은 테스트를 진행한다.
✔️ 코드 구현
@Test
fun main5() = runTest {
val startTime = System.currentTimeMillis()
suspendFun1()
val endTime = System.currentTimeMillis() - startTime
println("total time $endTime")
}
suspend fun suspendFun1() {
val atomicInteger = AtomicInteger(0)
// 1)
withContext(ioDispatcher) {
(1..500).forEach {
println("$it: Start")
launch {
suspendFun3()
atomicInteger.getAndIncrement()
println("$it: End, integer: $atomicInteger")
}
}
}
}
// 2)
suspend fun suspendFun3() {
delay(2000)
}
1) 이전에 만들어 놓은 ioDispatcher 를 이용한다.
2) 다른 구현은 비동기 + 블로킹 함수 를 구현했던 2번 케이스와 동일하나, 비동기 내에서 호출하는 함수 부분을 일시 중단 함수로 구현했다.
✔️ 결과
결과는 500개의 작업이 처리되는데 2초정도가 걸림(일시 중단 시간)을 확인해 볼 수 있었다.
이러한 결과는 모든 스레드들이 유휴 시간없이 지연이 발생할때마다 대기중인 다른 작업들의 처리를 진행했기 때문에 500 개의 작업이 거의 동시간대에 처리되는 효과를 낳을 수 있었다.
즉, 일시 중단 상태의 함수를 코루틴 내에서 실행하게 되면, 해당 코루틴이 일시 중단 상태가 될 때, 스레드는 중단하지 않고 다른 코루틴의 실행을 위해 스위칭이 발생하면서 연속적인 코루틴의 실행을 돕는다. 결과적으로는 이러한 일시 중단 상태는 스레드의 처리 효율을 높인다.
그동안 단순히 어느 정도 성능 차이가 있겠지와 같이 짐작만 하고 있었던 상태였는데, 이번에 실무에 적용해 보기에 앞서 1) 일반 동기 호출과 비교했을 때, 유의미한 속도 개선 효과가 있는지 2) 적용해 봤을 때.다른 우려할 만한 상황은 없는지 3) 우려할 만한 상황이 있다면, 개선 방법이 있는지 4) 부가적으로 일시 중단 함수 호출과는 얼마만큼의 성능의 차이를 보이는 지를 확인해 볼 필요를 느꼈고, 테스트를 진행했다.
결론적으로 실무에 적용하기에 앞서 속도 개선 효과를 육안으로 확인해 봄으로써 적용의 당위성을 얻을 수 있었고, 부가적인 우려사항들을 개선한 방법까지 고민해 볼 수 있었던 시간이었다.
'Spring' 카테고리의 다른 글
RunTest에 대해서... (0) | 2023.03.26 |
---|---|
Redis 분산락을 이용해서 한 번에 한 번씩 결제하기 (0) | 2023.03.20 |
Spring Batch - Job은 정말 실패한 시점부터 재실행될 수 있을까? (0) | 2023.03.05 |
Spring Batch - JobParameters (0) | 2023.02.26 |
Spring Batch를 적용해 보기에 앞서 (0) | 2023.02.19 |