비동기 통신에 오래전부터 많이 활용되는 RabbitMQ를 SpringBoot에서 활용하는 방법을 알아본다. 간단한 Simple Queue 방식, Pub/Sub 구현 방식 이렇게 2가지를 정리해보자.
RabbitMQ
RabbitMQ에 대해 간단히 정리하면 AMQP를 따르는 오픈소스 메세지 브로커이다.
(AMQP: Advanced Message Queuing Protocol, Application과 Middleware Broker와의 메세지를 주고 받기 위한 프로토콜)
RabbitMQ에서 주요한 개념은 아래와 같다.
1. Producer: 메시지 발행 주체
2. Consumer: 메시지 수신자
3. Queue: 메시지가 Consumer에 의해 소비되기 전까지 보관되는 장소
4. Exchange: Producer의 메시지들을 어떤 Queue로 발송할지 결정, 일종의 Router개념으로 4가지 타입이 있음
5. Binding: 메시지 Routing 규칙을 설정(특정 조건에 맞는 메시지를 특정 큐에 전송)
Exchange Type
타입 | 설명 | 전송 방식 |
Direct | Routing Key가 정확하게 일치하는 Queue에게 메시지 전송 | Unicat |
Topic | Routing Key 패턴이 일치하는 Queue에게 메시지 전송 | Multicast |
Headers | [key:value] 구성의 header 값을 기준으로 일치하는 Queue에 메시지 전송 | Multicast |
Fanout | 해당 Exchange에 등록된 모든 Queue에 메시지 전송 | Broadcast |
SpringBoot RabbitMQ 설정
먼저 로컬 개발환경을 위해 docker로 rabbitmq를 실행한다.
docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 \
-e RABBITMQ_DEFAULT_USER=admin \
-e RABBITMQ_DEFAULT_PASS=admin rabbitmq:management
build.gradle파일에 의존성 설정을 한다.
implementation 'org.springframework.boot:spring-boot-starter-amqp:2.5.4'
application.yml 파일에 rabbitmq 정보를 설정한다.
spring:
rabbitmq:
host: localhost
port: 5672
username: admin
password: admin
RabbitMQ Config 파일을 작성한다.
@Configuration
public class RabbitmqConfig {
private static final String TOPIC_EXCHANGE_NAME = "spring-boot-exchange";
private static final String QUEUE_NAME = "spring-boot";
@Value("${spring.rabbitmq.host}")
private String host;
@Value("${spring.rabbitmq.username}")
private String username;
@Value("${spring.rabbitmq.password}")
private String password;
@Value("${spring.rabbitmq.port}")
private int port;
@Bean
public Queue queue() {
return new Queue(QUEUE_NAME, false);
}
@Bean
public TopicExchange topicExchange() {
return new TopicExchange(TOPIC_EXCHANGE_NAME); // Topic Exchange 타입
}
@Bean
public Binding binding(TopicExchange topicExchange, Queue queue) {
return BindingBuilder.bind(queue).to(topicExchange).with("hello.key.#");
}
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setMessageConverter(messageConverter());
return rabbitTemplate;
}
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setHost(host);
connectionFactory.setPort(port);
connectionFactory.setUsername(username);
connectionFactory.setPassword(password);
return connectionFactory;
}
@Bean
public MessageConverter messageConverter() {
return new Jackson2JsonMessageConverter();
}
}
1. Simple Queue
아래와 같이 Publisher -> Exchange -> Queue -> Consumer 구성의 간단한 샘플이다.
테스트를 위한 간단한 도메인 객체를 작성해준다.
@Getter
@Setter
@AllArgsConstructor
@NoArgsConstructor
@Builder
@ToString
public class RabbitMessage {
private String id;
private String fName;
private String lName;
}
메소드가 호출되면 rabbitmq로 메시지를 전송하는 Publisher 서비스를 작성한다.
@Service
@RequiredArgsConstructor
public class RabbitPublisher {
private final RabbitTemplate rabbitTemplate;
private final TopicExchange topicExchange;
public void sendMessage(RabbitMessage message){
rabbitTemplate.convertAndSend(topicExchange.getName(), "hello.key.1", message);
}
}
Queue로부터 메시지를 수신하고 처리하는 Consumer 서비스를 작성한다.
@Service
@AllArgsConstructor
@Slf4j
public class RabbitConsumer {
@RabbitListener(queues = "#{queue.name}")
public void consume(RabbitMessage message){
log.info("{}", message);
}
}
테스트용 Controller를 작성하여 api호출을 하면 메시지가 발송되도록 해본다
@RestController
@RequestMapping("rabbit")
@AllArgsConstructor
public class RabbitmqController {
private final RabbitPublisher rabbitPublisher;
/**
* Simple Queue 테스트(Exchange 활용)
*/
@GetMapping("/send")
public void sendMessage() {
RabbitMessage rabbitMessage = RabbitMessage.builder().id("1").fName("First Name").lName("Last Name").build();
IntStream.range(0, 100).forEachOrdered(n -> {
rabbitMessage.setId(String.valueOf(n));
rabbitPublisher.sendMessage(rabbitMessage);
});
}
}
이제 테스트만 해보면 된다. Controller에 작성한 url(localhost:8081/rabbit/send)로 호출해보면 Consumer에서 메시지가 수신되어 로그가 찍히는 것을 확인할 수 있다.
만약 여기에서 동일한 어플리케이션을 하나 더 실행해서 Consumer가 2개가 되면 1개의 Queue에 2개의 Consumer가 연결되어 있음으로 Consumer1은 id=1, id=3, id=5 ..., Consumer2는 id=2, id=4, .. 와 같이 Queue의 메시지를 서로 나누어 가질것이다.
기본적으로 Queue에서 한번 소비된 메시지는 큐에서 삭제된다.
참고로 http://localhost:15672/(admin/admin) 로 RabbitMQ UI에 접속하여 보면서 테스트도 가능하다.
2. Pub/Sub
RabbitMQ를 이용하여 pub/sub 구성은 아래와 같은 구성이 될 것이다. fanout Exchange 타입을 사용하여 Binding 되어 있는 모든 Queue에게 동일한 메시지를 전송하는 방식으로 Publisher는 exchange에 메시지를 전송하고, 각 Subscriber는 자신의 Queue에서 메시지를 수신하여 처리한다.
샘플코드는 위의 1번에서 작성한 코드에 이어서 작성할 예정이다.
1번에서 작성한 RabbitmqConfig파일에 아래 내용을 추가한다.
@Configuration
public class RabbitmqConfig {
.....
private static final String FANOUT_EXCHANGE_NAME = "pubsub-exchange";
private static final String QUEUE_NAME_SUB1 = "sub1";
private static final String QUEUE_NAME_SUB2 = "sub2";
// Subscriber용 큐 2개 생성
@Bean
public Queue subQueue1() {
return new Queue(QUEUE_NAME_SUB1, false);
}
@Bean
public Queue subQueue2() {
return new Queue(QUEUE_NAME_SUB2, false);
}
// FanoutExchange 생성
@Bean
public FanoutExchange pubsubExchange() {
return new FanoutExchange(FANOUT_EXCHANGE_NAME);
}
// 각 큐에 binding 설정
@Bean
public Binding pubsubBinding1(FanoutExchange pubsubExchange, Queue subQueue1) {
return BindingBuilder.bind(subQueue1).to(pubsubExchange);
}
@Bean
public Binding pubsubBinding2(FanoutExchange pubsubExchange, Queue subQueue2) {
return BindingBuilder.bind(subQueue2).to(pubsubExchange);
}
}
1번에서 작성한 Publisher서비스에 아래 내용을 추가한다.
@Service
@RequiredArgsConstructor
public class RabbitPublisher {
private final RabbitTemplate rabbitTemplate;
private final FanoutExchange fanoutExchange;
....
public void pubsubMessage(RabbitMessage message){
rabbitTemplate.convertAndSend(fanoutExchange.getName(), "", message);
}
}
1번에서 작성한 Consumer서비스에 아래 내용을 추가 작성한다.
@Service
@AllArgsConstructor
@Slf4j
public class RabbitConsumer {
....
/**
* PubSub Consumer 1
*/
@RabbitListener(queues = "#{subQueue1.name}")
public void consumeSub1(RabbitMessage message){
log.info("[consumeSub1]: {}", message);
}
/**
* PubSub Consumer 2
*/
@RabbitListener(queues = "#{subQueue2.name}")
public void consumeSub2(RabbitMessage message){
log.info("[consumeSub2]: {}", message);
}
}
1번에서 작성한 Controller에 Pub/Sub 테스트 메소드를 추가한다.
@RestController
@RequestMapping("rabbit")
@AllArgsConstructor
public class RabbitmqController {
private final RabbitPublisher rabbitPublisher;
.....
/**
* PubSub 테스트용
*/
@PostMapping("/pubsub")
public void pubsubMessage() {
RabbitMessage rabbitMessage = RabbitMessage.builder().id("1").fName("First Name").lName("Last Name").build();
IntStream.range(0, 100).forEachOrdered(n -> {
rabbitMessage.setId(String.valueOf(n));
rabbitPublisher.pubsubMessage(rabbitMessage);
});
}
}
테스트를 해보면 2개의 Consumer가 동일한 메시지를 수신한 것을 확인할 수 있다.
추가적인 기능들에 대해서는 나중에 또 정리를 해봐야겠다
'개발 > SpringBoot' 카테고리의 다른 글
Kafka 구성 및 SpringBoot 연동 (0) | 2023.03.13 |
---|---|
SpringBoot3 - GraalVM Native Image Support (0) | 2023.02.01 |
SpringBoot Redis 활용 - Pub/Sub (0) | 2022.12.16 |
SpringBoot Redis 활용 - RedisTemplate, RedisRepository (0) | 2022.12.16 |
SpringBoot Redis 세션 사용 (0) | 2022.09.06 |
댓글