[SpringBoot] RabbitMQ 를 이용해 간단한 비동기 이벤트 적용해보기

728x90

안녕하세요👋

오늘은 SpringBoot 환경에서 RabbitMQ 를 사용해서 비동기 이벤트 처리를 해보겠습니다.

 

참고로 오늘은 이벤트 발행자 와 수신자 딱 두가지 기능을 구현해볼 예정입니다.

 

메시지를 보내는 발행자 : Producer(=issuer)

메시지를 받는 수신자 : Recevier(=consumer)

 

위 두가지 기능 구현을 위주로 이야기를 해보겠습니다.

 

https://velog.io/@holicme7/Apache-Kafka-%EC%B9%B4%ED%94%84%EC%B9%B4%EB%9E%80-%EB%AC%B4%EC%97%87%EC%9D%B8%EA%B0%80

 

서론

비동기 처리를 알아보기 전에 간단한 개념설명을 해보겠습니다.

 

RabbitMQ 는 무엇일까요?

AMQP 를 구현한 오픈소스 메세지 브로커 이다.

위 라이브러리를 사용하는 이유는 비동기 작업이 필요할 때 즉 많은 작업이 요청되어 처리를 해야 할 때 사용합니다.

 

❓그럼 AMQP 는 무엇일까요

Advanced Message Queueing Protocol의 줄임말로 MQ의 오픈소스에 기반한 표준 프로토콜을 의미한다. AMQP는 마지막 P(rotocol)에서 보는 것과 같이 프로토콜을 의미하기 때문에 이 것을 사용한 가장 유명한 소프트웨어는 RabbitMQ라 볼 수 있다.

 

 

해당 그림은 동기이다.

 

 Sync(동기) <-> Async(비동기) 

 

동기는 1요청 - 1응답을 기본으로 한다. 즉 요청을 하고 응답이 오기전 까지 끊어지지 않는다

 

위 문제의 단점이 무엇일까? 

 

위에서 말했듯이 여러 사람이 요청을 했을 때 하나 하나 응답을 다 해줄 때까지 뒤에서 기다려야 하니

속도가 느리다는 단점이 있다. 

 

음. 그러면 CPU 할당을 어떻게 하는 걸까? 여기서는 SJF? 

동기 비동기 방식에서 CPU 할당은 어떻게 받는건지 찾아보기. ++ 추후 내용 추가하겠습니다(24.05.20)

 

 

위 아키텍쳐는 비동기 통신에서 아키텍쳐 이다.

위처럼 시간이 오래걸리는 구간즉 병목현상이 일어나는 부분을 비동기로 처리를 시켜야 한다.

 

그 비동기로 처리를 시키는 방법이 여러가지가 있지만

나는 메세지 큐를 사용하여 처리 해보자고 한다.

메세지 큐의 종류에는

1) Kafka

2) RabbitMQ

 

요즘에는 대중적으로 Apache Kafka 를 많이 사용하고는 한다. 

 

본론

docker-compose.yml

version: '3.7'
services:
  rabbitmq:
    image: rabbitmq:latest
    ports:
      - "5672:5672" # rabbit amqp port 5672(rabbitmq):5672(docker port)
      - "15672:15672" # manage port
    environment:
      - RABBITMQ_DEFAULT_USER=admin
      - RABBITMQ_DEFAULT_PASS=admin123!@

# Docker 터미널안에서 rabbitmq-plugins enable rabbitmq_management -> rabbitmq 대쉬보드 보기 (Management 활성화 명령어) -> 관리자 모드

 

나중에 rabbitMQ admin 에 접속하기 위해서 위 설정이 필요하고

admin 접속 주소는 http://localhost:15672 을 검색하면 들어가집니다.

 

그리고 username 과 password 는 도커 컴포즈 파일에서 설정한 내용을 입력해야 합니다.

 

gradle 의존성 추가

// RabbitMQ
implementation 'org.springframework.boot:spring-boot-starter-amqp'

 

 

application.yml

spring:
  rabbitmq: # connectionFactory 관리
    host: localhost
    port: 5672
    username: admin
    password: admin123!@

 

RabbitMqConfig

package org.example.api.config.rabbitmq;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import com.fasterxml.jackson.databind.ObjectMapper;

@Configuration
public class RabbitMqConfig {

	// ROLE: exchange bean 으로 생성
	@Bean
	public DirectExchange directExchange() {
		return new DirectExchange("delivery.exchange");
	}

	// ROLE: 메세지 발행 QUEUE 생성
	@Bean
	public Queue queue() {
		return new Queue("delivery.queue");
	}

	// 스프링에서 파라미터 값에 특별한 어노테이션 없어도 객체라면은
	// 빈 스코프 -> (어플리케이션 컨텍스트 안에서 그 객체를 찾아서 넣어준다)
	// ROLE: exchange 랑 queue 에 바인딩 될 수 있도록 설정
	@Bean
	public Binding binding(DirectExchange directExchange, Queue queue) {
		return BindingBuilder.bind(queue).to(directExchange).with("delivery.key");
	}

	// ROLE: Producer -> Exchange 로 데이터를 보낼 떄 설정, 프로토콜: http 로 고정
	@Bean
	public RabbitTemplate rabbitTemplate(
		ConnectionFactory connectionFactory,
		MessageConverter messageConverter) {
		RabbitTemplate rabbitTemplate = new RabbitTemplate((connectionFactory));
		rabbitTemplate.setMessageConverter(messageConverter);
		return rabbitTemplate;
	}

	// ROLE: object -> JSON -> Object 바꿔주는 역할
	@Bean
	public MessageConverter messageConverter(ObjectMapper objectMapper) {
		return new Jackson2JsonMessageConverter(objectMapper);
	}

	// Role: ConnectionFactory 관리는 application.yml 파일에서 함.

}

 

RabbitMQ 설정은 위 코드에서 주석으로 설명을 해두었습니다.

(위 Config 는 Producer 쪽 Config 이고 이따가 Consumer 쪽은 다른 Config 가 있습니다)

 

다음으로는 Producer 에서 이벤트를 발행하는 간단한 코드를 보겠습니다.

package org.example.api.common.rabbitmq;

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;

import lombok.RequiredArgsConstructor;

@RequiredArgsConstructor
@Component
public class Producer {
	private final RabbitTemplate rabbitTemplate;

	// Role: Producer 에서 이벤트를 어디로 발생시킬지 로직
	public void producer(String exchange, String routingKey, Object data) {
		rabbitTemplate.convertAndSend(exchange, routingKey, data);
	}
}

 

위 코드 까지 정상적으로 왔으면, 이제 간단하게 Controller 를 만들어서 API 가 동작하는지 테스트를 해보겠습니다.

 

package org.example.api.config;

import org.example.api.common.rabbitmq.Producer;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;

@Slf4j
@RequestMapping("/open-api")
@RequiredArgsConstructor
@RestController
public class HealthOpenApiController {
	private final Producer producer;

	@GetMapping("/health")
	public void health() {
		log.info("health call");
		producer.producer("delivery.exchange","delivery.key","hello");
	}
}

 

위 코드로 API 테스트를 돌려보면 정상적으로 RabbitMQ 가 작동하는걸 확인할 수 있습니다. 

 

그리고 rabbitMQ admin 에 접속하고 로그인을 마치면

 

exchangequeue 에 내가 설정해둔 값이 한개씩 추가가 되있는걸 확인할 수 있습니다.

아 참고로 admin 에서는 코드 없이, queueexchange 를 추가할 수 있습니다.

그리고, 메시지 내용을 추가,수정,삭제 또한 가능하며 자유롭게 조작을 할 수 있습니다. 

큐가 설정해두었던 것 처럼 exchange바인딩이 잘되있는걸 확인할 수 있습니다.

 

이제 비즈니스 로직을 짜보도록 하겠습니다.

 

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Getter;
import lombok.NoArgsConstructor;

@NoArgsConstructor
@AllArgsConstructor
@Builder
@Getter
public class UserOrderMessage {
	// API 서버와 가맹점 서버가 공유할 같은 객체
	// API 서버도 이 객체 참조하고, 가맹점 서버도 이 객체를 참조한다.
    // 같은 객체 참조안한다면, 특정 메세지 표준을 만들어서 공유해야 한다.
	private Long userOrderId;
}

 

 

실제로 메세지를 요청할 클래스 입니다.

import org.example.api.common.rabbitmq.Producer;
import org.example.db.userorder.UserOrderEntity;
import org.pre.common.message.model.UserOrderMessage;
import org.springframework.stereotype.Service;

import lombok.RequiredArgsConstructor;

@RequiredArgsConstructor
@Service
public class UserOrderProducer {
	private final Producer producer;

	// exchange 를 미리 설정해 둔다.
	private static final String EXCHANGE = "delivery.exchange";

	// route_key 또한 미리 설정을 해둔다.
	private static final String ROUTE_KEY = "delivery.key";

	// 주문을 보낸다
	public void sendOrder(UserOrderEntity userOrderEntity) {
		this.sendOrder(userOrderEntity.getId()); // 이 메소드를 통해 아래 메소드를 호출한다.
	}

	public void sendOrder(Long userOrderId) {
		var message = UserOrderMessage.builder()
			.userOrderId(userOrderId)
			.build();
		// 파라미터에 알맞는 값을 대입하여 메세지를 보낸다.
		producer.producer(EXCHANGE, ROUTE_KEY, message);
	}
}

 

이 코드는 실제 주문 요청을 할 비즈니스 로직 입니다.

	// 1) 사용자, 메뉴 id
	// 2) userOrder 생성
	// 3) userOrderMenu 생성
	// 4) 응답을 생성한다.
	@Transactional
	public UserOrderResponse userOrder (User user, UserOrderRequest body) {
		var storeMenuList = body.getStoreMenuIdList()
			.stream()
			.map(storeMenuService::getStoreMenuWithThrow)
			.toList();
		UserOrderEntity userOrderEntity = userOrderConverter.toEntity(user, storeMenuList);
		// 주문
		var newUserOrderEntity = userOrderService.order(userOrderEntity);

		// 맵핑
		var userOrderMenuEntityList = storeMenuList.stream()
			.map(it -> {
				// menu + userOrder
				UserOrderMenuEntity entity = userOrderMenuConverter.toEntity(newUserOrderEntity, it);
				return entity;
			})
			.toList();

		// '주문' 내역 기록 남기기
		userOrderMenuEntityList.forEach(userOrderMenuService::order);

		// 비동기로 가맹점에게 '주문' 알리기
		userOrderProducer.sendOrder(newUserOrderEntity);

		// response
		return userOrderConverter.toResponse(newUserOrderEntity);
	}

 

마지막 로직 쯤에 비동기로 가맹점에게 '주문' 알리기 를 주목해야 한다.

 

여기 까지가 이제 Producer -> Exchange 까지 즉 로직이 였습니다.

 

이제 요청하는 쪽도 봤는데, 반대로 수신하는쪽것은 어떤것지 궁금하시지 않나요?

 

이제 바로 보여드리겠습니다.

 

Consumer 는 구현하기 비교적 EZ 합니다.

왜냐하면 Producer , Exchange, Queue 를 미리 다 구현해 두었기 때문입니다.

제공하는 쪽은 뭐가 조금 있었지만, 수신하는 쪽은 별거 없이 미리 만들어 둔것에 연결만 하면 끝입니다.

 

 

 

아키텍쳐를 간단하게 설명을 하면은

1) User 가 Producer 에게 요청을 한다.

2) Producer -> Exchange 로간다 [여기가 바로 비동기 응답이다, 요청을 보내면 바로 User에게 응답을 준다]

3) Exchange 는 뒤에 가맹점 서버로 이제 알아서 일을 시킨다. 

 

이제 로직을 보겠습니다.

 

저는 현 프로젝트가 멀티 모듈로 구성되어 있기에 config 를 모듈마다 따로따로 설정해줘야 했습니다.

 

Consumer 쪽

- RabbitMqConfig.java

import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import com.fasterxml.jackson.databind.ObjectMapper;

@Configuration
public class RabbitMqConfig {

	// consumer 쪽 설정
	@Bean
	public MessageConverter messageConverter(ObjectMapper objectMapper) {
		return new Jackson2JsonMessageConverter(objectMapper);
	}
}

 

다음 코드는 이제 Consumer - Producer 사이를 연결해줄 코드 입니다.

import org.pre.common.message.model.UserOrderMessage;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;

@Slf4j
@RequiredArgsConstructor
@Component
public class UserOrderConsumer {

	// Consumer 는 queue,exchange 가 만들어져 있기 때문에 연결만 해준다.
	@RabbitListener(queues = "delivery.queue") // 미리 설정해둔 Queue 로 받아온다고 설정
	public void userOrderConsumer(UserOrderMessage userOrderMessage) {
		log.info("Message Queue : UserMessage[{}]",userOrderMessage.getUserOrderId());
	}
}

 

이제 서버를 실행시키고, api 테스트를 진행하면은 그에 맞는 결과가 응답을 해준다.

만약 에 주문 API 를 실행해서 요청을 하면은 요청번호가 만약 10번이라면

Message Queue : UserMessage[10] 이 출력이 될 것이다.

 

즉 최종 프로세스를 읊어보면은

1) Producer(사용자) 주문 -> Exchange 로 요청

2) Exchange -> 주문 요청에 대한 응답값(ex..주문승인, 주문거절 등)  을 비동기로 중간에 내려줌 

-> 미리 설정해둔 라우티킹에 의해서 queue 로 전달이 된다.

3) consumer 에서 queue 를 구독해서 데이터를 꺼내서 온다. 

 

 

결론

비동기는 어렵다 잘 알고 사용하자.

 

참고: RabbitMq 보다는 요새 Kafka 가 대세다

 

Ref : 패스트캠퍼스 시그니처 백엔드 Path 초격차 패키지 Online Course4. Ch08
Ref : https://velog.io/@holicme7/Apache-Kafka-%EC%B9%B4%ED%94%84%EC%B9%B4%EB%9E%80-%EB%AC%B4%EC%97%87%EC%9D%B8%EA%B0%80

 

728x90