본문 바로가기

프로젝트/오디

FCM 알림 비동기 + 이벤트 리스닝으로 리팩터링 하기 - 1편

Situation

: 알림 로직이 비즈니스 로직과 강결합 & 동기 코드로 좋지 않은 성능

 

친구의 지각을 방지해주는 프로젝트 `오디`는 사용자에게 다양한 알림을 보내줍니다.

 

알림의 종류는 다음과 같습니다.

타입 보내는 시점 설명
입장 알림 약속방 참여 시기 친구가 약속방에 참여했음을 알립니다.
출발 알림 약속에 지각하지 않을 출발 시각 본인 혹은 친구가 지각하지 않으려면 지금 출발해야 함을 알립니다.
위치 정보 확인 가능 알림 실시간 친구 위치 확인 가능 시점 현 시각 이후로 친구들의 실시간 위치를 확인 가능함을 알립니다.
재촉 알림 내가 지각 위기일 때
&& 친구가 나를 재촉했을 때
친구에게 재촉 알림을 발송합니다. == 콕 찌르기

 

그러나, 알림에 대한 도메인이 커지면서 몇 가지 문제라 생각되는 지점들이 보였습니다.

 

 

문제1. 알림 서비스와 비즈니스 로직 간의 결합도가 높다

 

알림 서비스는 분명 서브 태스크임에도 불구하고 비즈니스 로직과 결합도가 높았습니다. 이것은 곧 알림을 발송하지 못한다는 이유로 서비스가 동작하지 않을 수 있음을 의미합니다.

 

예를 들어 약속방에 참여하는 과정에서는 입장 알림을 보내야 합니다. 그러나 FCM 서버에서 에러를 반환하면 어떻게 될까요? 본 서버의 주요 로직에는 전혀 문제가 없음에도 FCM 서버의 장애가 전파되어 서비스에서 약속 참여 자체가 불가한 상황이 될 것입니다.

 

 

문제2. FCM 동기 로직이 생각보다 시간이 오래 걸린다.

 

현재 알림 로직은 모두 동기화되어 있습니다. FCM 서버에서 에러가 발생하는 경우가 적었고, latency도 그리 늦지 않다고 판단했기 때문인데요. 그러나 2024 우아콘의 한 세미나에서 대용량 트래픽에서 FCM latency가 기하급수적으로 치솟는 상황이 있음을 알게 되었습니다. 또한 FCM 푸시 알림에 대해 latency를 측정한 몇몇 블로그 글을 살펴보니 생각보다 성능이 좋지 않았습니다.

 

ex1) 메시지 발송 시 : 3010ms - 출처 : wang's tech blog

ex2) 10만개 알림 처리 시간 : 400분 - 출처 : 기록하는 개발자

 

우리 서비스도 느릴까?

Postman을 통해 알림을 발송하는 비즈니스 로직의 latency를 prod와 local 환경에서 실험해보기로 하였습니다.

 

상황은 Fcm 서버와 소통하는 다음 3가지 상황을 생각해보았습니다.

- 재촉하기 : 재촉 알림을 발송하는 상황
- 방나가기  : FCM topic을 unsubscribe
- 약속 참여 : 입장 알림을 발송하는 상황

 

그리고 각 10회씩 호출한 평균값은 다음과 같았습니다.

상황 local latency(ms) prod latency(ms)
재촉하기 302.4 303.7
방 나가기 1055.2 871.4
약속 참여하기 1416.8 1007.6

 

생각보다 많이 느렸습니다. 특히 약속에 참여하는 로직은 latency가 1초 이상이 걸렸습니다.


Task : 결합도 낮추기 + 성능 올리기

 

그럼 리팩터링의 상황을 다시한번 정리해보겠습니다.

[Situation]
- 비즈니스 로직과 알림 로직이 강결합되어 있음
- 동기 알림 로직으로 인해 좋지 않은 성능

 

여기로부터 끌어낼 수 있는 리팩터링 포인트는 다음과 같습니다.

1. 비즈니스 로직과 알림 로직의 결합도 낮추기  by 이벤트 리스닝 방식

2. 성능 개선하기 by 비동기화

 


Action : 비동기 + 이벤트 리스닝 구조

 

기존에 비즈니스 로직과 결합도를 낮추기 위해 알림 로직을 이벤트 기반 구조로 변경하였습니다. 또한 비동기화를 통해 성능을 개선하고자 하였습니다. 도식으로 표현하자면 다음과 같습니다.

1. notificationService에서는 fcmEventPublisher에게 이벤트 발행을 부탁하고 비즈니스 로직을 이어갑니다.
2. fcmEventPublisher가 알림/ 혹은 구독 이벤트를 발행합니다.
3. fcmEventListener가 발행된 이벤트를 받아 fcmPushSender나 fcmSubscirber에게 작업을 위임합니다.

 

 

이제 다음 순서에 따라 본격적인 구현에 들어가보겠습니다.

1) event 구조 설계
2) fcmEventPublisher 구현하기
3) fcmEventListener 구현하기
4) 비동기 에러 핸들러 구현하기
5) asyncConfig 설정하기

 

1) event 구조 설계

 

현재 Fcm 서버와 소통하는 것은 크게 두 가지 흐름으로 나뉘었습니다.

- 알림 보내기 : 특정 기기 / topic을 대상으로 알림 보내기

- 구독/ 구독 해제 : Pub/Sub 구조를 기반으로 특정 topic에 구독/ 구독 해제

 

이를 반영하여 설계한 이벤트의 종류는 다음과 같았습니다.

fcmSubscriber : 구독 /  구독 해제 이벤트
fcmPushSender : 공지 / 푸시 / 재촉알림 이벤트

 

이제 각 필요한 정보를 필드로 가진 이벤트 객체를 구현해주었습니다.

 

NoticeEvent  : 공지 알림 

더보기
@Getter
public class NoticeEvent extends ApplicationEvent {

    private final GroupMessage groupMessage;

    public NoticeEvent(Object source, GroupMessage groupMessage) {
        super(source);
        this.groupMessage = groupMessage;
    }
}

NudgeEvent  : 재촉 알림

더보기
@Getter
public class NoticeEvent extends ApplicationEvent {

    private final GroupMessage groupMessage;

    public NoticeEvent(Object source, GroupMessage groupMessage) {
        super(source);
        this.groupMessage = groupMessage;
    }
}

NoticeEvent  : 공지 알림 

더보기
@Getter
public class NudgeEvent extends ApplicationEvent {

    private final Mate requestMate;
    private final Notification nudgeNotification;

    public NudgeEvent(Object source, Mate requestMate, Notification nudgeNotification) {
        super(source);
        this.requestMate = requestMate;
        this.nudgeNotification = nudgeNotification;
    }
}

PushEvent  : 푸시 알림

더보기
@Getter
public class PushEvent extends ApplicationEvent {

    private final Notification notification;
    private final GroupMessage groupMessage;

    public PushEvent(Object source, Notification notification) {
        super(source);
        this.notification = notification;
        this.groupMessage = GroupMessage.from(notification);
    }
}

SubscribeEvent  : 구독 

더보기
@Getter
public class SubscribeEvent extends ApplicationEvent {

    private final DeviceToken deviceToken;
    private final FcmTopic topic;

    public SubscribeEvent(Object source, DeviceToken deviceToken, FcmTopic topic) {
        super(source);
        this.deviceToken = deviceToken;
        this.topic = topic;
    }
}

UnSubscribeEvent  : 구독 해제

더보기
@Getter
public class UnSubscribeEvent extends ApplicationEvent {

    private final DeviceToken deviceToken;
    private final FcmTopic topic;

    public UnSubscribeEvent(Object source, DeviceToken deviceToken, FcmTopic topic) {
        super(source);
        this.deviceToken = deviceToken;
        this.topic = topic;
    }
}

2) fcmEventPublisher 구현하기

 

다음으로 이벤트 발행을 담당하는 객체를 구현하였습니다.

각 이벤트 객체가 상속한 ApplicationEvent 객체를 applicationEventPublisher를 통해 발행하는 역할을 담당합니다.

@Slf4j
@Component
@RequiredArgsConstructor
public class FcmEventPublisher {

    private final ApplicationEventPublisher eventPublisher;

    public void publish(ApplicationEvent applicationEvent) {
        eventPublisher.publishEvent(applicationEvent);
    }

    @Transactional
    public void publishWithTransaction(ApplicationEvent applicationEvent) {
        eventPublisher.publishEvent(applicationEvent);
    }
}

 

구성된 메서드들이 단순 전달 메서드라고 생각할 수 있지만 다음과 같이 구성한 이유는 다음과 같습니다.

- 이벤트 발행 시 공통 작업을 전담

- 상위 모듈에서 알림 이벤트 발행 과정과 관련한 관심사 분리


 

3) fcmEventListener 구현하기

 

이제 발행된 이벤트를 수신하여 로직을 실행할 리스너를 구현해야 합니다.

리스너의 내부 로직은 event 별로 발행 정보를 받아 fcmPushSender/fcmSubscriber 각각에게 fcm과의 소통을 담당하도록 구성하였습니다.

@Slf4j
@Component
@RequiredArgsConstructor
public class FcmEventListener {

    private final FcmPushSender fcmPushSender;
    private final FcmSubscriber fcmSubscriber;

    @Async
    @EventListener
    public void subscribeTopic(SubscribeEvent subscribeEvent) {
        FcmTopic topic = subscribeEvent.getTopic();
        DeviceToken deviceToken = subscribeEvent.getDeviceToken();
        fcmSubscriber.subscribeTopic(topic, deviceToken);
    }
}

 

 

 

특정 이벤트를 수신하는 이벤트 리스닝 로직을 구성하기 위해서는

1) listen하는 이벤트를 매개변수로 받고

2) @EventListener를 통해 리스닝하는 메서드를 지정해주면 됩니다.

 

또한 비동기로 로직을 실행하기 위해 스프링에서 지원하는 @Async 애너테이션을 활용하여 새로운 스레드에서 fcm api와의 소통로직을 실행하도록 하였습니다. 실제로도 돌려본 결과 새로운 스레드에서 로직이 실행되는 모습을 볼 수 있습니다.

 

이를 기반으로 각 이벤트에 해당하는 알림 혹은 구독 관련 로직을 fcmPushSender와 fcmSubscriber에게 위임해주었습니다.  전체적인 fcmEventListener의 모습은 다음과 같습니다.

 

>> fcmEventListener

더보기
@Slf4j
@Component
@RequiredArgsConstructor
public class FcmEventListener {

    private final FcmPushSender fcmPushSender;
    private final FcmSubscriber fcmSubscriber;

    @Async
    @EventListener
    public void subscribeTopic(SubscribeEvent subscribeEvent) {
        FcmTopic topic = subscribeEvent.getTopic();
        DeviceToken deviceToken = subscribeEvent.getDeviceToken();
        fcmSubscriber.subscribeTopic(topic, deviceToken);
    }

    @Async
    @EventListener
    public void unSubscribeTopic(UnSubscribeEvent subscribeEvent) {
        FcmTopic topic = subscribeEvent.getTopic();
        DeviceToken deviceToken = subscribeEvent.getDeviceToken();
        fcmSubscriber.unSubscribeTopic(topic, deviceToken);
    }

    @Async
    @EventListener
    public void sendNoticeMessage(NoticeEvent noticeEvent) {
        GroupMessage groupMessage = noticeEvent.getGroupMessage();
        fcmPushSender.sendNoticeMessage(groupMessage);
        log.info("공지 알림 전송 | 전송 시간 : {}", Instant.now().atZone(TimeUtil.KST_OFFSET));
    }

    @Async
    @TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)// 커밋 이후 발송
    public void sendPushMessage(PushEvent pushEvent) {
        Notification notification = pushEvent.getNotification();
        GroupMessage groupMessage = pushEvent.getGroupMessage();
        fcmPushSender.sendGroupMessage(groupMessage, notification);
        log.info("푸시 알림 성공- id : {}, 전송시간 : {}", notification.getId(), notification.getSendAt());
    }

    @Async
    @EventListener
    public void sendNudgeMessage(NudgeEvent nudgeEvent) {
        Notification nudgeNotification = nudgeEvent.getNudgeNotification();
        Mate requestMate = nudgeEvent.getRequestMate();
        DirectMessage nudgeMessage = DirectMessage.createMessageToOther(requestMate, nudgeNotification);
        fcmPushSender.sendDirectMessage(nudgeMessage);
        log.info("재촉하기 성공- id : {}, 전송시간 : {}", nudgeNotification.getId(), nudgeNotification.getSendAt());
    }
}

 

4) AsyncUncaughtExceptionHandler로 비동기 에러 핸들러 구현하기

 

@Async로 지정된 비동기 메서드의 경우 별도의 스레드에서 실행을 호출하기 때문에 비동기 메서드에서 발생한 예외가 로깅되지 않는 문제가 있습니다.

 

따라서 비동기 메서드에서 발생한 예외를 적절히 처리하기 위해서는 AsyncUncaughtExceptionHandler 인터페이스를 커스터마이즈하여 예외 처리 로직을 구성할 수 있습니다.


4-1) AsyncUncaughtExceptionHandler 인터페이스

AsyncUncaughtExceptionHandler는 예외 처리 핸들링을 위한 다음과 같은 함수형 인터페이스를 지정하고 있습니다.

@FunctionalInterface
public interface AsyncUncaughtExceptionHandler {
    void handleUncaughtException(Throwable ex, Method method, Object... params);
}

- ex : 예외 객체

- method : 예외가 발생한 메서드

- params: 메서드에 전달된 파라미터들 

 


4-2) 에러 핸들러 구현하기

AsyncUncaughtExceptionHandler를 구현하여 클래스 단위로 비동기 에러 핸들러를 커스터마이즈 하였습니다.

@Slf4j
@Component
public class AsyncExceptionHandler implements AsyncUncaughtExceptionHandler {

    @Override
    public void handleUncaughtException(Throwable exception, Method method, Object... params) {
        log.error("비동기 메서드 에러 : method: {} exception: {}", method.getName(), exception.getMessage());
    }
}

 

FCM 비동기 메서드에서 에러 핸들링 로직은 error 레벨로 로깅하는 것 이상은 구현하지 않았습니다. FCM 서버와 소통 시 발생가능한 에러가 오디 측의 서버에러보다 FCM 서버측 에러일 확률이 높기에 발생 과정에서 에러가 발생하고 있음을 저희 측에서 빠르게 인지할 수만 있으면 된다고 판단했기 때문입니다. 다만 이 경우에는 우리 측 서버에서 실수하여 알림이 발송되지 않은 경우와 FCM 서버 측의 에러로 알림 발송에 실패한 경우를 구분할 수 없기에, 추후 입력값에 대한 검증을 이벤트 발행 이전에 하는 식으로 리팩터링을 이어갈 생각입니다.

 


4-3) Spring에 에러 핸들러 등록하기

 

Spring의 비동기 메서드 설정인 AbstractAsyncConfiguration 클래스를 타고들어가면 AsyncConfigurer 인터페이스를 통해 관련 설정을 지정할 수 있음을 알 수 있습니다.

@Configuration(
    proxyBeanMethods = false
)
public abstract class AbstractAsyncConfiguration implements ImportAware {
    @Nullable
    protected AnnotationAttributes enableAsync;
    @Nullable
    protected Supplier<Executor> executor;
    @Nullable
    protected Supplier<AsyncUncaughtExceptionHandler> exceptionHandler;

    public AbstractAsyncConfiguration() {
    }

    public void setImportMetadata(AnnotationMetadata importMetadata) {
        this.enableAsync = AnnotationAttributes.fromMap(importMetadata.getAnnotationAttributes(EnableAsync.class.getName()));
        if (this.enableAsync == null) {
            throw new IllegalArgumentException("@EnableAsync is not present on importing class " + importMetadata.getClassName());
        }
    }

    @Autowired
    void setConfigurers(ObjectProvider<AsyncConfigurer> configurers) {
        Supplier<AsyncConfigurer> configurer = SingletonSupplier.of(() -> {
            List<AsyncConfigurer> candidates = configurers.stream().toList();
            if (CollectionUtils.isEmpty(candidates)) {
                return null;
            } else if (candidates.size() > 1) {
                throw new IllegalStateException("Only one AsyncConfigurer may exist");
            } else {
                return (AsyncConfigurer)candidates.get(0);
            }
        });
        this.executor = this.adapt(configurer, AsyncConfigurer::getAsyncExecutor); //설정 정보 등록
        this.exceptionHandler = this.adapt(configurer, AsyncConfigurer::getAsyncUncaughtExceptionHandler); //에러 핸들러 등록
    }

    private <T> Supplier<T> adapt(Supplier<AsyncConfigurer> supplier, Function<AsyncConfigurer, T> provider) {
        return () -> {
            AsyncConfigurer configurer = (AsyncConfigurer)supplier.get();
            return configurer != null ? provider.apply(configurer) : null;
        };
    }
}

 

특히 여기서 에러 핸들러는 AsyncConfigurer의 getAsyncUncaughtExceptionHandler()를 통해 설정되고 있음을 알 수 있습니다. 즉, AsyncConfiguerer를 구현한 config 클래스를 통해 예외 핸들러를 등록할 수 있습니다.

 

@EnableAsync
@Configuration
@RequiredArgsConstructor
public class AsyncConfig implements AsyncConfigurer {

    private final AsyncExceptionHandler asyncExceptionHandler;

    @Override
    public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
        return asyncExceptionHandler; //커스터마이즈한 예외 핸들러 등록
    }
}

 

이제 비동기 작업에서 발생하는 예외를 로깅할 수 있게 되었습니다.


5) AsyncConfig 설정하기

 

 

Spring은 비동기 메서드 처리를 위해 기본으로 SimpleAsyncTaskExecutor를 통해 스레드를 할당합니다. 그러나 이 의 경우 스레드를 재사용하지 않고 매번 요청에 대한 새로운 스레드를 만들어냅니다. 따라서 짧은 시간에 다량의 트래픽을 요청하는 경우, 가용가능한 시스템 자원을 고려하지 않고 무한으로 스레드를 생성하여 시스템 가용자원을 넘은 요청을 수용할 수 있습니다. 따라서 안정적인 서비스 지원을 위해 스레드 풀 설정이 필요합니다.

 

이를 고려하여 Spring Boot는 TaskExecutionAutoConfiguration에서 TaskExecutor에 대한 기본 설정을 ThreadPoolTaskExecutor로 등록하고 있습니다. 즉 SpringBoot 환경에서는 SimpleAsyncTaskExecutor가 아닌 default ThreadPoolTaskExecutor 환경에서 비동기 메서드가 실행됩니다.

//TaskExecutionProperties.Pool

public static class Pool {
        private int queueCapacity = Integer.MAX_VALUE;
        private int coreSize = 8;
        private int maxSize = Integer.MAX_VALUE;
        private boolean allowCoreThreadTimeout = true;
        private Duration keepAlive = Duration.ofSeconds(60L);
        private final Shutdown shutdown = new Shutdown();
}

- 기본 큐 사이즈 : Integer.MAX_VALUE

- core 풀 사이즈 : 8

- 최대 풀 사이즈 : Integer.MAX_VALUE

- keepAlive(corePoolsize보다 더 많은 thread를 정리하기 위해 대기하는 시간) : 60

 

즉, 8개의 coreThread가 모두 사용중이면 무한으로 작업을 대기하도록 큐에 쌓아놓는 규칙을 설정하고 있습니다. 따라서 Thread Pooling 테스트를 통해 각 서비스에 적절한 풀 값을 찾아 설정해주는 작업이 필요합니다.

 

스레드 풀 설정은 부하 테스트를 통해 추후 최적화하기로 하였습니다. 우선적으로는 core의 개수만큼 corePoolsize를 설정하여 병목을 최대한 방지하고 포화정책을 CallerRunsPolicy로 지정하는 것에 만족하기로 하였습니다.

@Configuration
@RequiredArgsConstructor
public class AsyncConfig implements AsyncConfigurer {

    private final AsyncExceptionHandler asyncExceptionHandler;

    @Override
    @Bean(name = "fcmAsyncExecutor")
    public Executor getAsyncExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); //스레드풀이 가득찼을 경우 톰캣 스레드에서 처리
        int coreCount = Runtime.getRuntime().availableProcessors();
        executor.setCorePoolSize(coreCount);
        executor.setThreadNamePrefix("ody-fcm");
        executor.initialize();
        return executor;
    }

    @Override
    public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
        return asyncExceptionHandler;
    }
}

 

이후 corePoolsize를 조정하며 부하테스트를 통해 트래픽에 따른 적정 core pool size를 찾아볼 예정입니다.


 

Result : 결합도 하락과 성능 개선

 

목표로 삼았던 Task를 다시 봅시다.

- 비즈니스 로직과 FCM 서버 소통 로직 간의 결합도 낮추기
- 알림 발송 로직의 성능 개선하기 

 

아직 Thread Pool 커스터마이징과 같은 세부 조정이 필요하지만 이번 리팩터링은 위의 Task에 대한 소기의 목적은 달성할 수 있었습니다.

 

1) FCM 알림 로직과 비즈니스 로직 간의 결합도 감소

FCM 서버와 소통하는 알림 로직과 주요 비즈니스 로직 간의 결합도를 낮출 수 있었습니다. 이는 서브 로직으로서 알림의 기능을 유지함과 동시에 알림 서비스의 장애 여파가 다른 서비스로 전파되지 않는 환경이 조성되었음을 의미합니다. 이로써 알림을 발송되지 않아 약속에 참여가 되지 않는 등의 상황을 예방할 수 있게 되었습니다.

 

한 가지 예를 본다면 공지 메시지를 보내는 경우, 이벤트를 발행하는 것 이상으로 로직이 얽혀있지 않습니다.

public void scheduleNotice(GroupMessage groupMessage, LocalDateTime noticeTime) {
    Instant startTime = InstantConverter.kstToInstant(noticeTime);
    NoticeEvent noticeEvent = new NoticeEvent(this, groupMessage);
    taskScheduler.schedule(() -> fcmEventPublisher.publish(noticeEvent), startTime);
}

 

이는  NotificationService에서의 관심사를 이벤트 발행 단계로 한정함과 동시에 이벤트 수신 이후의 로직의 변경이 더이상 발행을 담당하는 상위 모듈로 전파되지 않음을 의미합니다.

 

2) 성능 개선

로컬 환경에서 앞선 시나리오와 동일하게 10회 요청에 대한 평균 latency를 측정해보았습니다.

 

결과는 다음과 같습니다.

상황 local latency(before) local latency (after)
재촉하기 302.4 76.7 (75%감소)
방 나가기 1055.2 110.2 (89.5% 감소)
약속 참여하기 1416.8 596.5 (57% 감소)

 

성능적으로도 유의한 감소를 체감할 수 있었습니다.

 

FCM 알림 로직 비동기 + 이벤트 리스닝 방식으로 리팩터링 일지 1편은 여기까지 입니다.

다음 포스팅에서는 CountDownLatch와 ApplicationEvents를 통해 비동기 + 이벤트 리스닝 방식의 알림 로직을 어떻게 테스트했는지 소개해보고자 합니다.

 

그럼 오늘도 행복하세요 :)