들어가며
오늘은 SpringBoot 환경에서 RabbitMQ 를 사용해서 비동기 이벤트 처리를 해보겠습니다.
참고로 오늘은 이벤트 발행자 와 수신자 딱 두가지 기능을 구현해볼 예정입니다.
메시지를 보내는 발행자 : Producer(=issuer)
메시지를 받는 수신자 : Recevier(=consumer)
위 두가지 기능 구현을 위주로 이야기를 해보겠습니다.
서론
비동기 처리를 알아보기 전에 간단한 개념설명을 해보겠습니다.
RabbitMQ 는 무엇일까요?
AMQP 를 구현한 오픈소스 메세지 브로커 이다.
위 라이브러리를 사용하는 이유는 비동기 작업이 필요할 때 즉 많은 작업이 요청되어 처리를 해야 할 때 사용합니다.
❓그럼 AMQP 는 무엇일까요
Advanced Message Queueing Protocol의 줄임말로 MQ의 오픈소스에 기반한 표준 프로토콜을 의미한다. AMQP는 마지막 P(rotocol)에서 보는 것과 같이 프로토콜을 의미하기 때문에 이 것을 사용한 가장 유명한 소프트웨어는 RabbitMQ라 볼 수 있다.
해당 그림은 동기이다.
Sync(동기) <-> Async(비동기)
동기는 1요청 - 1응답을 기본으로 한다. 즉 요청을 하고 응답이 오기전 까지 끊어지지 않는다
위 문제의 단점이 무엇일까?
위에서 말했듯이 여러 사람이 요청을 했을 때 하나 하나 응답을 다 해줄 때까지 뒤에서 기다려야 하니
속도가 느리다는 단점이 있다.
음. 그러면 CPU 할당을 어떻게 하는 걸까? 여기서는 SJF?
동기 비동기 방식에서 CPU 할당은 어떻게 받는건지 찾아보기. ++ 추후 내용 추가하겠습니다(24.05.20)
위 아키텍쳐는 비동기 통신에서 아키텍쳐 이다.
위처럼 시간이 오래걸리는 구간즉 병목현상이 일어나는 부분을 비동기로 처리를 시켜야 한다.
그 비동기로 처리를 시키는 방법이 여러가지가 있지만
나는 메세지 큐를 사용하여 처리 해보자고 한다.
메세지 큐의 종류에는
1) Kafka
2) RabbitMQ
요즘에는 대중적으로 Apache Kafka 를 많이 사용하고는 한다.
본론
docker-compose.yml
version: '3.7'
services:
rabbitmq:
image: rabbitmq:latest
ports:
- "5672:5672" # rabbit amqp port 5672(rabbitmq):5672(docker port)
- "15672:15672" # manage port
environment:
- RABBITMQ_DEFAULT_USER=admin
- RABBITMQ_DEFAULT_PASS=admin123!@
# Docker 터미널안에서 rabbitmq-plugins enable rabbitmq_management -> rabbitmq 대쉬보드 보기 (Management 활성화 명령어) -> 관리자 모드
나중에 rabbitMQ admin 에 접속하기 위해서 위 설정이 필요하고
admin 접속 주소는 http://localhost:15672 을 검색하면 들어가집니다.
그리고 username 과 password 는 도커 컴포즈 파일에서 설정한 내용을 입력해야 합니다.
gradle 의존성 추가
// RabbitMQ
implementation 'org.springframework.boot:spring-boot-starter-amqp'
application.yml
spring:
rabbitmq: # connectionFactory 관리
host: localhost
port: 5672
username: admin
password: admin123!@
RabbitMqConfig
package org.example.api.config.rabbitmq;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import com.fasterxml.jackson.databind.ObjectMapper;
@Configuration
public class RabbitMqConfig {
// ROLE: exchange bean 으로 생성
@Bean
public DirectExchange directExchange() {
return new DirectExchange("delivery.exchange");
}
// ROLE: 메세지 발행 QUEUE 생성
@Bean
public Queue queue() {
return new Queue("delivery.queue");
}
// 스프링에서 파라미터 값에 특별한 어노테이션 없어도 객체라면은
// 빈 스코프 -> (어플리케이션 컨텍스트 안에서 그 객체를 찾아서 넣어준다)
// ROLE: exchange 랑 queue 에 바인딩 될 수 있도록 설정
@Bean
public Binding binding(DirectExchange directExchange, Queue queue) {
return BindingBuilder.bind(queue).to(directExchange).with("delivery.key");
}
// ROLE: Producer -> Exchange 로 데이터를 보낼 떄 설정, 프로토콜: http 로 고정
@Bean
public RabbitTemplate rabbitTemplate(
ConnectionFactory connectionFactory,
MessageConverter messageConverter) {
RabbitTemplate rabbitTemplate = new RabbitTemplate((connectionFactory));
rabbitTemplate.setMessageConverter(messageConverter);
return rabbitTemplate;
}
// ROLE: object -> JSON -> Object 바꿔주는 역할
@Bean
public MessageConverter messageConverter(ObjectMapper objectMapper) {
return new Jackson2JsonMessageConverter(objectMapper);
}
// Role: ConnectionFactory 관리는 application.yml 파일에서 함.
}
RabbitMQ 설정은 위 코드에서 주석으로 설명을 해두었습니다.
(위 Config 는 Producer 쪽 Config 이고 이따가 Consumer 쪽은 다른 Config 가 있습니다)
다음으로는 Producer 에서 이벤트를 발행하는 간단한 코드를 보겠습니다.
package org.example.api.common.rabbitmq;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;
import lombok.RequiredArgsConstructor;
@RequiredArgsConstructor
@Component
public class Producer {
private final RabbitTemplate rabbitTemplate;
// Role: Producer 에서 이벤트를 어디로 발생시킬지 로직
public void producer(String exchange, String routingKey, Object data) {
rabbitTemplate.convertAndSend(exchange, routingKey, data);
}
}
위 코드 까지 정상적으로 왔으면, 이제 간단하게 Controller 를 만들어서 API 가 동작하는지 테스트를 해보겠습니다.
package org.example.api.config;
import org.example.api.common.rabbitmq.Producer;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
@Slf4j
@RequestMapping("/open-api")
@RequiredArgsConstructor
@RestController
public class HealthOpenApiController {
private final Producer producer;
@GetMapping("/health")
public void health() {
log.info("health call");
producer.producer("delivery.exchange","delivery.key","hello");
}
}
위 코드로 API 테스트를 돌려보면 정상적으로 RabbitMQ 가 작동하는걸 확인할 수 있습니다.
그리고 rabbitMQ admin 에 접속하고 로그인을 마치면
exchange 랑 queue 에 내가 설정해둔 값이 한개씩 추가가 되있는걸 확인할 수 있습니다.
아 참고로 admin 에서는 코드 없이, queue 및 exchange 를 추가할 수 있습니다.
그리고, 메시지 내용을 추가,수정,삭제 또한 가능하며 자유롭게 조작을 할 수 있습니다.
큐가 설정해두었던 것 처럼 exchange 와 바인딩이 잘되있는걸 확인할 수 있습니다.
이제 비즈니스 로직을 짜보도록 하겠습니다.
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Getter;
import lombok.NoArgsConstructor;
@NoArgsConstructor
@AllArgsConstructor
@Builder
@Getter
public class UserOrderMessage {
// API 서버와 가맹점 서버가 공유할 같은 객체
// API 서버도 이 객체 참조하고, 가맹점 서버도 이 객체를 참조한다.
// 같은 객체 참조안한다면, 특정 메세지 표준을 만들어서 공유해야 한다.
private Long userOrderId;
}
실제로 메세지를 요청할 클래스 입니다.
import org.example.api.common.rabbitmq.Producer;
import org.example.db.userorder.UserOrderEntity;
import org.pre.common.message.model.UserOrderMessage;
import org.springframework.stereotype.Service;
import lombok.RequiredArgsConstructor;
@RequiredArgsConstructor
@Service
public class UserOrderProducer {
private final Producer producer;
// exchange 를 미리 설정해 둔다.
private static final String EXCHANGE = "delivery.exchange";
// route_key 또한 미리 설정을 해둔다.
private static final String ROUTE_KEY = "delivery.key";
// 주문을 보낸다
public void sendOrder(UserOrderEntity userOrderEntity) {
this.sendOrder(userOrderEntity.getId()); // 이 메소드를 통해 아래 메소드를 호출한다.
}
public void sendOrder(Long userOrderId) {
var message = UserOrderMessage.builder()
.userOrderId(userOrderId)
.build();
// 파라미터에 알맞는 값을 대입하여 메세지를 보낸다.
producer.producer(EXCHANGE, ROUTE_KEY, message);
}
}
이 코드는 실제 주문 요청을 할 비즈니스 로직 입니다.
// 1) 사용자, 메뉴 id
// 2) userOrder 생성
// 3) userOrderMenu 생성
// 4) 응답을 생성한다.
@Transactional
public UserOrderResponse userOrder (User user, UserOrderRequest body) {
var storeMenuList = body.getStoreMenuIdList()
.stream()
.map(storeMenuService::getStoreMenuWithThrow)
.toList();
UserOrderEntity userOrderEntity = userOrderConverter.toEntity(user, storeMenuList);
// 주문
var newUserOrderEntity = userOrderService.order(userOrderEntity);
// 맵핑
var userOrderMenuEntityList = storeMenuList.stream()
.map(it -> {
// menu + userOrder
UserOrderMenuEntity entity = userOrderMenuConverter.toEntity(newUserOrderEntity, it);
return entity;
})
.toList();
// '주문' 내역 기록 남기기
userOrderMenuEntityList.forEach(userOrderMenuService::order);
// 비동기로 가맹점에게 '주문' 알리기
userOrderProducer.sendOrder(newUserOrderEntity);
// response
return userOrderConverter.toResponse(newUserOrderEntity);
}
마지막 로직 쯤에 비동기로 가맹점에게 '주문' 알리기 를 주목해야 한다.
여기 까지가 이제 Producer -> Exchange 까지 즉 로직이 였습니다.
이제 요청하는 쪽도 봤는데, 반대로 수신하는쪽것은 어떤것지 궁금하시지 않나요?
이제 바로 보여드리겠습니다.
Consumer 는 구현하기 비교적 EZ 합니다.
왜냐하면 Producer , Exchange, Queue 를 미리 다 구현해 두었기 때문입니다.
제공하는 쪽은 뭐가 조금 있었지만, 수신하는 쪽은 별거 없이 미리 만들어 둔것에 연결만 하면 끝입니다.
아키텍쳐를 간단하게 설명을 하면은
1) User 가 Producer 에게 요청을 한다.
2) Producer -> Exchange 로간다 [여기가 바로 비동기 응답이다, 요청을 보내면 바로 User에게 응답을 준다]
3) Exchange 는 뒤에 가맹점 서버로 이제 알아서 일을 시킨다.
이제 로직을 보겠습니다.
저는 현 프로젝트가 멀티 모듈로 구성되어 있기에 config 를 모듈마다 따로따로 설정해줘야 했습니다.
Consumer 쪽
- RabbitMqConfig.java
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import com.fasterxml.jackson.databind.ObjectMapper;
@Configuration
public class RabbitMqConfig {
// consumer 쪽 설정
@Bean
public MessageConverter messageConverter(ObjectMapper objectMapper) {
return new Jackson2JsonMessageConverter(objectMapper);
}
}
다음 코드는 이제 Consumer - Producer 사이를 연결해줄 코드 입니다.
import org.pre.common.message.model.UserOrderMessage;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
@Slf4j
@RequiredArgsConstructor
@Component
public class UserOrderConsumer {
// Consumer 는 queue,exchange 가 만들어져 있기 때문에 연결만 해준다.
@RabbitListener(queues = "delivery.queue") // 미리 설정해둔 Queue 로 받아온다고 설정
public void userOrderConsumer(UserOrderMessage userOrderMessage) {
log.info("Message Queue : UserMessage[{}]",userOrderMessage.getUserOrderId());
}
}
이제 서버를 실행시키고, api 테스트를 진행하면은 그에 맞는 결과가 응답을 해준다.
만약 에 주문 API 를 실행해서 요청을 하면은 요청번호가 만약 10번이라면
Message Queue : UserMessage[10] 이 출력이 될 것이다.
즉 최종 프로세스를 읊어보면은
1) Producer(사용자) 주문 -> Exchange 로 요청
2) Exchange -> 주문 요청에 대한 응답값(ex..주문승인, 주문거절 등) 을 비동기로 중간에 내려줌
-> 미리 설정해둔 라우티킹에 의해서 queue 로 전달이 된다.
3) consumer 에서 queue 를 구독해서 데이터를 꺼내서 온다.
결론
비동기는 어렵다 잘 알고 사용하자. 공부할게 많으니 공부를 하자...
참고: RabbitMq 보다는 요새 Kafka 가 대세다
Ref : 패스트캠퍼스 시그니처 백엔드 Path 초격차 패키지 Online Course4. Ch08
Ref : https://velog.io/@holicme7/Apache-Kafka-%EC%B9%B4%ED%94%84%EC%B9%B4%EB%9E%80-%EB%AC%B4%EC%97%87%EC%9D%B8%EA%B0%80