본문 바로가기
개발/SpringBoot

SpringBoot RabbitMQ 활용(Simple Queue, Pub/Sub)

by 궁즉변 변즉통 통즉구 2022. 12. 23.
반응형

비동기 통신에 오래전부터 많이 활용되는 RabbitMQ를 SpringBoot에서 활용하는 방법을 알아본다. 간단한 Simple Queue 방식, Pub/Sub 구현 방식 이렇게 2가지를 정리해보자.

 

RabbitMQ

RabbitMQ에 대해 간단히 정리하면 AMQP를 따르는 오픈소스 메세지 브로커이다.
(AMQP: Advanced Message Queuing Protocol, Application과 Middleware Broker와의 메세지를 주고 받기 위한 프로토콜)

RabbitMQ에서 주요한 개념은 아래와 같다.

1. Producer: 메시지 발행 주체

2. Consumer: 메시지 수신자

3. Queue: 메시지가 Consumer에 의해 소비되기 전까지 보관되는 장소

4. Exchange: Producer의 메시지들을 어떤 Queue로 발송할지 결정, 일종의 Router개념으로 4가지 타입이 있음

5. Binding: 메시지 Routing 규칙을 설정(특정 조건에 맞는 메시지를 특정 큐에 전송)

 

Exchange Type

타입 설명 전송 방식
Direct Routing Key가 정확하게 일치하는 Queue에게 메시지 전송 Unicat
Topic Routing Key 패턴이 일치하는 Queue에게 메시지 전송 Multicast
Headers [key:value] 구성의 header 값을 기준으로 일치하는 Queue에 메시지 전송 Multicast
Fanout 해당 Exchange에 등록된 모든 Queue에 메시지 전송 Broadcast

 

SpringBoot RabbitMQ 설정

먼저 로컬 개발환경을 위해 docker로 rabbitmq를 실행한다.

docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 \
      -e RABBITMQ_DEFAULT_USER=admin \
      -e RABBITMQ_DEFAULT_PASS=admin rabbitmq:management

 

build.gradle파일에 의존성 설정을 한다.

implementation 'org.springframework.boot:spring-boot-starter-amqp:2.5.4'

 

application.yml 파일에 rabbitmq 정보를 설정한다.

spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: admin
    password: admin

 

RabbitMQ Config 파일을 작성한다.

@Configuration
public class RabbitmqConfig {

    private static final String TOPIC_EXCHANGE_NAME = "spring-boot-exchange";
    private static final String QUEUE_NAME = "spring-boot";

    @Value("${spring.rabbitmq.host}")
    private String host;

    @Value("${spring.rabbitmq.username}")
    private String username;

    @Value("${spring.rabbitmq.password}")
    private String password;

    @Value("${spring.rabbitmq.port}")
    private int port;

    @Bean
    public Queue queue() {
        return new Queue(QUEUE_NAME, false);
    }

    @Bean
    public TopicExchange topicExchange() {
        return new TopicExchange(TOPIC_EXCHANGE_NAME);   // Topic Exchange 타입
    }

    @Bean
    public Binding binding(TopicExchange topicExchange, Queue queue) {
        return BindingBuilder.bind(queue).to(topicExchange).with("hello.key.#");
    }

    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMessageConverter(messageConverter());
        return rabbitTemplate;
    }

    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setHost(host);
        connectionFactory.setPort(port);
        connectionFactory.setUsername(username);
        connectionFactory.setPassword(password);
        return connectionFactory;
    }

    @Bean
    public MessageConverter messageConverter() {
        return new Jackson2JsonMessageConverter();
    }
}

 

1. Simple Queue

아래와 같이 Publisher -> Exchange -> Queue -> Consumer 구성의 간단한 샘플이다. 

테스트를 위한 간단한 도메인 객체를 작성해준다.

@Getter
@Setter
@AllArgsConstructor
@NoArgsConstructor
@Builder
@ToString
public class RabbitMessage {
    private String id;
    private String fName;
    private String lName;
}

 

메소드가 호출되면 rabbitmq로 메시지를 전송하는 Publisher 서비스를 작성한다.

@Service
@RequiredArgsConstructor
public class RabbitPublisher {

    private final RabbitTemplate rabbitTemplate;
    private final TopicExchange topicExchange;

    public void sendMessage(RabbitMessage message){
        rabbitTemplate.convertAndSend(topicExchange.getName(), "hello.key.1", message);
    }
}

 

Queue로부터 메시지를 수신하고 처리하는 Consumer 서비스를 작성한다.

@Service
@AllArgsConstructor
@Slf4j
public class RabbitConsumer {

    @RabbitListener(queues = "#{queue.name}")
    public void consume(RabbitMessage message){
        log.info("{}", message);
    }
}

 

테스트용 Controller를 작성하여 api호출을 하면 메시지가 발송되도록 해본다

@RestController
@RequestMapping("rabbit")
@AllArgsConstructor
public class RabbitmqController {

    private final RabbitPublisher rabbitPublisher;

    /**
     * Simple Queue 테스트(Exchange 활용)
     */
    @GetMapping("/send")
    public void sendMessage() {
        RabbitMessage rabbitMessage = RabbitMessage.builder().id("1").fName("First Name").lName("Last Name").build();

        IntStream.range(0, 100).forEachOrdered(n -> {
            rabbitMessage.setId(String.valueOf(n));
            rabbitPublisher.sendMessage(rabbitMessage);
        });
    }
}

 

이제 테스트만 해보면 된다. Controller에 작성한 url(localhost:8081/rabbit/send)로 호출해보면 Consumer에서 메시지가 수신되어 로그가 찍히는 것을 확인할 수 있다.

만약 여기에서 동일한 어플리케이션을 하나 더 실행해서 Consumer가 2개가 되면 1개의 Queue에 2개의 Consumer가 연결되어 있음으로 Consumer1은 id=1, id=3, id=5 ..., Consumer2는 id=2, id=4, .. 와 같이 Queue의 메시지를 서로 나누어 가질것이다.
기본적으로 Queue에서 한번 소비된 메시지는 큐에서 삭제된다.

 

참고로 http://localhost:15672/(admin/admin) 로 RabbitMQ UI에 접속하여 보면서 테스트도 가능하다.

2. Pub/Sub

RabbitMQ를 이용하여 pub/sub 구성은 아래와 같은 구성이 될 것이다. fanout Exchange 타입을 사용하여 Binding 되어 있는 모든 Queue에게 동일한 메시지를 전송하는 방식으로 Publisher는 exchange에 메시지를 전송하고, 각 Subscriber는 자신의 Queue에서 메시지를 수신하여 처리한다. 

샘플코드는 위의 1번에서 작성한 코드에 이어서 작성할 예정이다.

 

1번에서 작성한 RabbitmqConfig파일에 아래 내용을 추가한다.

@Configuration
public class RabbitmqConfig {

    .....
    
    private static final String FANOUT_EXCHANGE_NAME = "pubsub-exchange";
    private static final String QUEUE_NAME_SUB1 = "sub1";
    private static final String QUEUE_NAME_SUB2 = "sub2";

    // Subscriber용 큐 2개 생성
    @Bean
    public Queue subQueue1() {
        return new Queue(QUEUE_NAME_SUB1, false);
    }
    @Bean
    public Queue subQueue2() {
        return new Queue(QUEUE_NAME_SUB2, false);
    }

    // FanoutExchange 생성
    @Bean
    public FanoutExchange pubsubExchange() {
        return new FanoutExchange(FANOUT_EXCHANGE_NAME);
    }

    // 각 큐에 binding 설정
    @Bean
    public Binding pubsubBinding1(FanoutExchange pubsubExchange, Queue subQueue1) {
        return BindingBuilder.bind(subQueue1).to(pubsubExchange);
    }
    @Bean
    public Binding pubsubBinding2(FanoutExchange pubsubExchange, Queue subQueue2) {
        return BindingBuilder.bind(subQueue2).to(pubsubExchange);
    }
 }

 

1번에서 작성한 Publisher서비스에 아래 내용을 추가한다.

@Service
@RequiredArgsConstructor
public class RabbitPublisher {

    private final RabbitTemplate rabbitTemplate;
    private final FanoutExchange fanoutExchange;

    ....

    public void pubsubMessage(RabbitMessage message){
        rabbitTemplate.convertAndSend(fanoutExchange.getName(), "", message);
    }
}

 

1번에서 작성한 Consumer서비스에 아래 내용을 추가 작성한다.

@Service
@AllArgsConstructor
@Slf4j
public class RabbitConsumer {

    ....

    /**
     * PubSub Consumer 1
     */
    @RabbitListener(queues = "#{subQueue1.name}")
    public void consumeSub1(RabbitMessage message){
        log.info("[consumeSub1]: {}", message);
    }

    /**
     * PubSub Consumer 2
     */
    @RabbitListener(queues = "#{subQueue2.name}")
    public void consumeSub2(RabbitMessage message){
        log.info("[consumeSub2]: {}", message);
    }
}

 

1번에서 작성한 Controller에 Pub/Sub 테스트 메소드를 추가한다.

@RestController
@RequestMapping("rabbit")
@AllArgsConstructor
public class RabbitmqController {

    private final RabbitPublisher rabbitPublisher;

    ..... 
    
    /**
     * PubSub 테스트용
     */
    @PostMapping("/pubsub")
    public void pubsubMessage() {
        RabbitMessage rabbitMessage = RabbitMessage.builder().id("1").fName("First Name").lName("Last Name").build();

        IntStream.range(0, 100).forEachOrdered(n -> {
            rabbitMessage.setId(String.valueOf(n));
            rabbitPublisher.pubsubMessage(rabbitMessage);
        });
    }
}

테스트를 해보면 2개의 Consumer가 동일한 메시지를 수신한 것을 확인할 수 있다.

추가적인 기능들에 대해서는 나중에 또 정리를 해봐야겠다

반응형

댓글