본문 바로가기

카테고리 없음

[학습 테스트] 스레드풀 포화 정책(Saturation Policy) 알아보기

 최근 알림 로직 비동기화를 진행하면서 스레드풀을 커스터마이즈하여 빈으로 등록할 일이 있었습니다. 이를 위해 스레드풀에 대해 학습하다 "코어 스레드풀과 대기 큐가 다 차면 요청을 어떻게 처리해야 할까?" 라는 의문이 든 적이 있습니다. 놀랍게도 ThreadPoolExecutor를 다음과 같은 포화상태에서 대처안을 선택할 수 있도록 4가지 정책을 제안하고 있습니다.

 

오늘은 학습테스트 작성을 통해 스레드풀의 4가지 포화정책을 눈으로 보며 이해해보고자 합니다.

1. 스레드풀 기본 개념

스레드풀이란 말 그대로 스레드를 풀링해놓는 것입니다. 풀링은 주로 생성 비용이 큰 자원을 미리 만들어놓음으로써 사용 시 반복적인 셋업 코스트를 줄이기 위해 사용됩니다. 이를 기반으로 해석해보면 매번 반복적으로 할당하는 스레드의 생성비용을 줄이기 위해 미리 스레드를 만들어 모아놓은 것을 스레드풀이라고 이해할 수 있습니다.

 

그럼 스레드풀 안에 몇가지 기본 개념에 대해 알아봅시다.

corePoolSize : ThreadPool에서 상시 유지되는 pool의 개수를 의미합니다

maxPoolSize: ThreadPool이 최대로 확장될 수 있는 pool의 개수를 의미합니다.

workQueue:  사용가능한 스레드가 없으면 Runnable taske들이 대기하는 큐를 의미합니다.

 

한 가지 예시를 통해 스레드풀의 기초적인 개념에 대해서 생각해봅시다.

corePoolSize가 2 / maxPoolSize가 4 / workQueue가 1인 스레드풀 설정을 해석하면 다음과 같습니다.

 

 

corePoolSize == 상시 유지되는 스레드의 개수는 2개입니다.

maxPoolSize == 이 스레드풀은 4개까지 확장될 수 있습니다.

workQueue capacity == 한 개의 task가 대기할 수 있는 공간이 있습니다.

 

2. 스레드 할당 우선순위

그럼 특정 요청이 온다면 우린 어떤 우선 순위에 따라 스레드를 할당할까요?

 

corePool > workQueue > maxPool 순서로 작업을 고려합니다.

1.  corePool 에서 스레드를 할당한다.
2. (corePool이 모두 사용중이면) workQueue에 작업을 대기시킨다.
3. (corePool이 모두 사용중이고 + workQueue도 모두 차면) maxPoolSize에 도달할 때까지 스레드를 할당한다.

 

예를 들어 corePoolSize 1 / workQueue capacity 1 / maxPoolSize 2인 상황에서 요청 3개가 동시에 들어온 상황에서는 다음과 같이 스레드가 할당될 것입니다.

 

먼저 첫번째 요청은 상시 운용중인 corePool에서 스레드를 얻어 시행합니다.

 

두번째 요청이 들어왔습니다. 그런데 corePool이 모두 찼습니다. 그럼 다음 순위인 workQueue에 대기시킵니다.

 

세번째 요청이 들어왔습니다. corePool도 모두 차고, workQueue도 꽉 찼습니다. 그럼 ThreadPool에서 하나의 Thread 자원을 더 생성해 큐에 있는 작업을 꺼내 처리하기 시작합니다.

 

 

3. 포화 정책(Saturation Policy)란?

앞선 처리 상황에서 마지막 그림을 다시 봅시다. 

 

- corePool 이 꽉 참
- workQueue 도 꽉 참
- threadPool이 maxPoolSize 까지 꽉참

 

다음 과 같은 상황을 포화 상태라고 이야기합니다.

 

그럼 포화 상태에서 4번째 요청이 왔다면 어떻게 될까요?

이렇게 포화 상태일 때 새로운 요청을 어떻게 처리할 것인가? 를 규정하는 것이 바로 포화 정책입니다.

 

 

4. 포화 정책(Saturation Policy) 학습 테스트

포화 정책은 4가지가 있습니다.

정책 설명
AbortPolicy (default) 에러(RejectedExecutionException) 발생
DiscardPolicy  해당 요청을 버림
DiscardOldestPolicy  큐에서 가장 많이 대기한 작업을 버림
CallersRunPolicy 요청한 스레드에서 해당 요청을 수행하도록 함

 

그럼 각각 학습 테스트를 통해 작동 방식을 눈으로 확인하며 개념을 익혀보도록 하겠습니다.

 

4-1) 학습 테스트 객체 셋업

 

ThreadPoolExecutor : corePoolSize 1 / maxPoolSize 1 / queueSize 1로 세팅하며 각각 포화정책만 다르게 할 생각입니다.

executor = new ThreadPoolExecutor(
                1, // corePoolSize
                1, // maximumPoolSize
                0L, // keepAliveTime
                TimeUnit.MILLISECONDS, // timeUnit
                new ArrayBlockingQueue<>(1), // workQueue
                new ThreadPoolExecutor.{원하는 포화 정책}
        );

 

CallableEx : 실행되고 있는 스레드 이름을 반환하는 작업을 Callable로 등록해주었습니다.

static class CallableEx implements Callable<String> {
        private final int num;

        public CallableEx(int num) {
            this.num = num;
        }

        @Override
        public String call() {
            System.out.println("Thread Name : " + Thread.currentThread().getName() + " / num : " + num);
            return Thread.currentThread().getName();
        }
 }

 

getResult 메서드 : 해당 Callable의 Future 결과값을 추출하는 메서드입니다.

private static String getResult(Future<String> future) {
        try {
            return future.get(1000L, TimeUnit.MILLISECONDS);
        } catch (InterruptedException | ExecutionException | TimeoutException e) {
            throw new RuntimeException(e);
        }
 }

포화정책 1. AbortPolicy : 에러(RejectedExecutionException) 발생

 

먼저 예외를 발생시키는 AbortPolicy를 보겠습니다. 3개의 작업을 동시요청하여 포화 상태일 때 하나의 작업을 더 요청해보았습니다.

    @DisplayName("AbortPolicy : RejectedExecutionException 발생 ")
    @Test
    void testAbortPolicy() {
        executor = new ThreadPoolExecutor(
                1, // corePoolSize
                1, // maximumPoolSize
                0L, // keepAliveTime
                TimeUnit.MILLISECONDS, // timeUnit
                new ArrayBlockingQueue<>(1), // workQueue
                new ThreadPoolExecutor.AbortPolicy() // AbortPolicy
        );

        List<CallableEx> callables = List.of(new CallableEx(1), new CallableEx(2), new CallableEx(3));

        assertThatThrownBy(() -> executor.invokeAll(callables))
                .isInstanceOf(RejectedExecutionException.class);
    }

 

첫번째 작업은 corePool에서 할당받아 수행되고,

두번째 작업은 큐에 대기합니다.

세번째 작업은 포화 상태이므로 RejectedExecutionException이 발생합니다.

 

테스트 결과를 보아도 pool-1-thread-1 (executor 스레드풀)에서 첫번째 요청만 수행된 것을 볼 수 있습니다.


포화정책 2.  CallersRunPolicy : 요청한 스레드에서 수행

 

CallersRunPolicy는 요청한 스레드에서 해당 요청을 수행하도록 합니다. 예를 들어 A스레드가 B에 대한 요청을 보냈는데 B를 수행할 스레드풀이 포화상태라면 아예 A스레드가 해당 작업을 수행하도록 하는 것입니다.

 

    @DisplayName("CallersRunPolicy : 요청한 스레드에서 작업을 시행한다.")
    @Test
    void testCallersRunPolicy() throws InterruptedException {
        String requestThreadName = Thread.currentThread().getName();

        executor = new ThreadPoolExecutor(
                1, // corePoolSize
                1, // maximumPoolSize
                0L, // keepAliveTime
                TimeUnit.MILLISECONDS, // timeUnit
                new ArrayBlockingQueue<>(1), // workQueue
                new ThreadPoolExecutor.CallerRunsPolicy() // CallersRunPolicy
        );

        List<CallableEx> callables = List.of(new CallableEx(1), new CallableEx(2), new CallableEx(3));

        List<Future<String>> results = executor.invokeAll(callables, 1000, TimeUnit.MILLISECONDS);
        List<String> threadNames = results.stream()
                .map(SaturationPolicyTest::getResult)
                .toList();

        assertThat(threadNames).contains(requestThreadName);
    }

 

이번에는 각 작업이 실행된 threadName을 getResult 함수를 통해 추출하고, 이후에 threadName 중에 작업을 요청한 현재 스레드 이름(requestThreadName)이 포함되는지 확인해보았습니다.

 

첫번째 작업은 corePool에서 할당받아 수행되고,

두번째 작업은 큐에 대기합니다.

세번째 작업은 포화 상태이므로 해당 태스크를 요청한 테스트 스레드에서 수행합니다.

 

실제로 테스트 결과를 보면 요청1과 요청2는 executor 스레드풀에서 실행된 반면에 요청3는 요청한 스레드에서 실행한 모습을 볼 수 있습니다.

 


포화정책 3.  DiscardPolicy : 요청을 버림

 

DiscardPolicy는 포화상태라면 요청을 아예 무시합니다. 마치, "나 지금 바쁘니까 말시키지 마쇼!"와 같은 스탠스입니다. 

@DisplayName("DiscardPolicy : 포화 상태에서 요청한 작업을 버린다 == 시행하지 않는다.")
@Test
void testCallersDiscardPolicy() throws InterruptedException {

    executor = new ThreadPoolExecutor(
            1, // corePoolSize
            1, // maximumPoolSize
            0L, // keepAliveTime
            TimeUnit.MILLISECONDS, // timeUnit
            new ArrayBlockingQueue<>(1), // workQueue
            new ThreadPoolExecutor.DiscardPolicy() // DiscardPolicy
    );

    List<CallableEx> callables = List.of(new CallableEx(1), new CallableEx(2), new CallableEx(3));

    List<Future<String>> results = executor.invokeAll(callables, 1000, TimeUnit.MILLISECONDS);
    List<Boolean> cancelled = results.stream()
            .map(Future::isCancelled)
            .toList();

    assertThat(cancelled).contains(true);
}

 

이번에는 Future의 isCancelled 메서드를 통해 하나의 작업이라도 취소가 되었는지 확인해보았습니다.

첫번째 작업은 corePool에서 할당받아 수행되고,

두번째 작업은 큐에 대기합니다.

세번째 작업은 포화 상태이므로 수행하지 않고 요청을 무시니다.

 

 

실제로도 요청1과 요청2만이 executor 에서 수행된 모습을 볼 수 있습니다.

 


포화정책 3.  DiscardOldestPolicy : 가장 오래된 요청을 버리고 수행

 

DiscardOldestPolicy는 포화상태라면 가장 오래된 요청을 버리고 수행을 시도합니다. "잠깐만! 내가 가진 요청중에 가장 오래된 거 버릴 테니까 너는 들여보내 줄게!"와 같은 스탠스입니다.

 

이번에는 해당 요청이 드러나기 위해 ThreadPoolExecutor의 설정정보를 큐사이즈가 2가 되도록 바꾸어주었습니다.

executor = new ThreadPoolExecutor(
                1, // corePoolSize
                1, // maximumPoolSize
                0L, // keepAliveTime
                TimeUnit.MILLISECONDS, // timeUnit
                new ArrayBlockingQueue<>(2), // workQueue
                new ThreadPoolExecutor.DiscardOldestPolicy()// DiscardPolicy
        );

 

또한 4개의 요청을 순차적으로 해보았는데요. 이해를 돕기 위해 요청을 순차적으로 생각하면 다음과 같습니다.

maxPool = corePool(size=1) workQueue(size =2)  
    1 2 3 4
1 3 4
1 2 3  4
1 2(취소) 3 4
1 3 4  
3 4  
4    

여기서 주목해야 하는 순간은 포화 상태일때 4가 수행을 시도하는 시점입니다. 이때 DiscardOldestPolicy는 대기하고 있는 요청 중 가장 많이 대기한 요청(가장 오래된 요청)인 2를 버리고 4를 대신 큐에 넣으면 요청을 수행토록합니다.

 

그럼 예상하는 시나리오로는 1 3 4는 잘 수행이 될 것이고 2는 작업이 취소될 것입니다.

이를 검증하는 테스트를 작성해보면 다음과 같은 모습니다.

@DisplayName("DiscardOldestPolicy : 가장 오래 대기한 요청을 버리고 재시도한다.")
@Test
void testDiscardOldestPolicy() throws InterruptedException {
    executor = new ThreadPoolExecutor(
            1, // corePoolSize
            1, // maximumPoolSize
            0L, // keepAliveTime
            TimeUnit.MILLISECONDS, // timeUnit
            new ArrayBlockingQueue<>(2), // workQueue
            new ThreadPoolExecutor.DiscardOldestPolicy()// DiscardPolicy
    );

    List<CallableEx> callables = List.of(
            new CallableEx(1),
            new CallableEx(2),
            new CallableEx(3),
            new CallableEx(4)
    );
    /*
    core(size 1) | queue(size 2) |
    1            |               | 2 3 4
    1            |  2            | 3 4
    1            |  2 3          | 4
    1            |  2(취소) 3     | 4
    1            |  3 4          |
    3            |  4            |
    4            |               |
     */

    List<Future<String>> results = executor.invokeAll(callables, 1000, TimeUnit.MILLISECONDS);
    List<Boolean> cancelled = results.stream()
            .map(Future::isCancelled)
            .toList();

    assertThat(cancelled).containsExactly(false, true, false, false);
}

 

실제로 테스트를 실행해보아도 1,3,4번은 executor에서 수행이 되지만 2번은 취소된 모습을 볼 수 있습니다.

 


Summary

포화 상태 : corePool 꽉참 + workQueue 꽉참 + poolSize 가 maxPoolSize까지 도달함

포화 정책 : 포화 상태일때 새로운 요청을 어떻게 처리할 것인지를 다룬 대안

정책 설명
AbortPolicy (default) 에러(RejectedExecutionException) 발생
DiscardPolicy  해당 요청을 버림
DiscardOldestPolicy  큐에서 가장 많이 대기한 작업을 버림
CallersRunPolicy 요청한 스레드에서 해당 요청을 수행하도록 함

 

 

모든 학습 테스트는 다음 링크에서 확인하고 돌려보실 수 있습니다.

https://github.com/coli-geonwoo/blog/tree/feature/saturation-policy

 

GitHub - coli-geonwoo/blog: 블로그 헤이,브로의 실습 코드를 모아놓은 저장소입니다.

블로그 헤이,브로의 실습 코드를 모아놓은 저장소입니다. Contribute to coli-geonwoo/blog development by creating an account on GitHub.

github.com