본문 바로가기
개발/SpringBoot

Kafka 구성 및 SpringBoot 연동

by 궁즉변 변즉통 통즉구 2023. 3. 13.
반응형

SpringBoot에 kafka 연동을 적용해보면서 kafka에 대한 기본 개념과 springboot와 연동 방법을 알아본다

Kafka 개요

  • 매우 높은 처리량과 낮은 지연시간(latency)
  • 높은 확장성
  • 고가용성: 클러스터 내 Replication
    • 각 메세지들을 여러 개로 복제해서 카프카 클러스터 내 브로커들에 분산시키는 동작
    • ex. 토픽 생성시 --replication-factor 3 으로 지정하면 원본을 포함하여 총 3개의 replication 생성
    • Topic 자체를 복제하는 것이 아닌 Topic의 파티션을 복제를 하는 방식
  • 내구성: acks 옵션, 메시지 디스크 저장
  • 개발 편의성: Producer와 Consumer의 분리
  • 다양한 기능과 3rd Party 를 통해 관리 편의성

 

kafka 구성

  • 주키퍼(Zookeeper): 아파치 프로젝트 애플리케이션으로 kafka의 메타데이터(metadata) 관리 및 브로커의 정상상태 점검(health check) 을 담당(분산 애플리케이션에서 코디네이터 역할, 브로커 노드 관리, 토픽 관리 ,컨트롤러 관리 등)
    • 주키퍼는 여러 대의 서버를 앙상블(Ensemble,클러스터) 로 구성하고, 살아 있는 노드 수가 과반수 이상 유지 된다면 지속적인 서비스가 가능한 구조 -> 따라서 주키퍼는 반드시 홀수로 구성
    • kafka에서는 주키퍼에 대한 의존성을 제거 하려는 움직임이 진행. ⇒ Apache Kafka 2.8 버전부터 주키퍼 대신 kraft 를 사용할 수 있으나 아직은 개발 단계
  • 브로커(broker) : kafka 애플리케이션이 설치된 서버 또는 노드를 의미
  • 프로듀서(producer): kafka로 메세지를 보내는 역할을 하는 클라이언트
  • 컨슈머(consumer) : kafka에서 메세지를 꺼내가는 역할을 하는 클라이언트
    • 파티션 수보다 컨슈머 수가 많게 구현되는 것은 바람직한 구성은 아님. 컨슈머 수가 파티션 수보다 많다면 더 많은 수의 컨슈머들은 그냥 대기 상태로 존재하기 때문에 더 빠르게 메세지를 가져오거나 처리량을 늘어나지 않음
    • 컨슈머는 컨슈머 그룹 안에 속한 것이 일반적인 구조로, 하나의 컨슈머 그룹안에 여러 개의 컨슈머가 구성. 컨슈머는 토픽의 파티션과 1:1로 매핑 되어 메세지를 가져옴
  • 토픽(topic) : kafka는 메시지 피드들을 토픽으로 구분하고, 각 Topic의 이름은 kafka 내에서 고유
  • 파티션(partition) : 병렬 처리 및 고성능을 얻기 위해 하나의 Topic을 여러개로 나눈 것을 의미.
    • (파티션 하면) 분산 처리가 가능해지며, 파티션 수 만큼 컨슈머를 연결 가능
    • 파티션수는 초기 생성 후 언제든지 늘릴 수 있지만, 반대로 한번 늘린 파티션 수는 절대로 줄일 수 없음.
      초기에 토픽 생성시에는 파티션수를 작게 적절하게(2또는 4정도) 생성 한 후, 메세지 처리량인 컨슈머의 LAG 등을 모니터링 하면서 조금씩 늘려가는 방법이 가장 좋음
  • 세그먼트(segment) : 프로듀서가 전송한 실제 메세지가 브로커의 로컬 디스크에 저장되는 파일을 의미
  • 메세지(message) 또는 레코드(record): 프로듀서가 브로커로 전송하거나 컨슈머가 읽어가는 데이터 조각을 말합니다.

 

Kafka 구성 및 테스트

kafka 테스트 환경을 위해 docker-compose로 설치한다.

version: '3' # docker-compose 버전 지정
services:  # docker-compose의 경우 docker 컨테이너로 수행될 서비스들은 services 하위에 기술
    zookeeper: # 서비스 이름. service 하위에 작성하면 해당 이름으로 동작
        image: wurstmeister/zookeeper # 도커 이미지
        container_name: zookeeper
        ports: # 외부포트:컨테이너내부포트
            - "2181:2181"
    kafka:
        image: wurstmeister/kafka
        container_name: kafka
        ports: # 외부포트:컨테이너내부포트
            - "9092:9092"
        environment: # kafka 브로터를 위한 환경 변수 지정
            KAFKA_ADVERTISED_HOST_NAME: 127.0.0.1
            KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 # kafka가 zookeeper에 커넥션하기 위한 대상을 지정
        volumes:
            - /var/run/docker.sock:/var/run/docker.sock

 

docker-compose로 컨테이너를 실행하고 kafka 컨테이너로 접속하여 CLI를 통해서 Topic을 생성하고 조회 해본다.

docker-compose up -d

# kafka 접속
docker exec -it kafka bash

# kafka 버전확인
kafka-topics.sh --version

# 토픽 생성
kafka-topics.sh --create --topic sample_topic_1 --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1

# 토픽 목록 조회
kafka-topics.sh --list --bootstrap-server localhost:9092

 

'test_topic' 토픽으로 test message를 발행한다.

# 프로듀스 실행(토픽명: test_topic)
kafka-console-producer.sh --broker-list localhost:9092 --topic test_topic

 

'test_topic' 토픽으로 메시지 컨슈머를 실행하면 test message를 가져오는 것을 확인할 수 있다.

# 컨슈머 실행
# --from-beginning 옵션은 해당 토픽의 맨 처음 메시지부터 확인 가능
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test_topic

 

아래 명령으로 토픽 정보를 조회해 볼 수 있다.

# 토픽 정보 조회
kafka-topics.sh --bootstrap-server localhost:9092 --topic test_topic --describe

 

SpringBoot 적용

gradle에서 kafka 의존성 설정을 진행한다. kafka 버전은 위에서 테스트한 버전과 맞춰준다.

# kafka 버전 맞춰서 의존성 설정
implementation ('org.springframework.kafka:spring-kafka:2.8.1')

 

application.yml에서 kafka 설정을 추가한다. 여기서는 접속정보만 작성했다.

spring:
  kafka:
    bootstrap-servers: localhost:9092

 

JavaConfig 설정을 작성한다. application.yml을 통해 설정할수도 있지만 명식적 선언을 위해서 javaconfig로 설정한다.

@EnableKafka  // @KafkaListener 사용을 위한 설정
@Configuration
public class KafkaConfig {

    @Value(value = "${spring.kafka.bootstrap-servers}")
    private String bootstrapAddress;

    @Bean
    public KafkaAdmin kafkaAdmin() {
        Map<String, Object> configs = new HashMap<>();
        configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        return new KafkaAdmin(configs);
    }

    // ------------------------ Publish 설정 -------------------------------------

    // 테스트 Topic 생성 1
    @Bean
    public NewTopic myTopic1() {
        return new NewTopic("my_topic_1", 1, (short) 1);
    }

    // 테스트 Topic 생성 2
    @Bean
    public NewTopic myTopic2() {
        return new NewTopic("my_topic_2", 1, (short) 1);
    }

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return new DefaultKafkaProducerFactory<>(configProps);
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    // ------------------------ Consumer 설정 -------------------------------------

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "foo");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}

 

다음으로 Producer 테스트 클래스를 작성한다. API로 호출해서 테스트를 할 것이라서 Controller를 작성해준다. 위에서 설정한 2개의 토픽을 테스트하기 위해 메소드를 2개로 나누어 작성했지만 구조는 동일하다.

@RestController
@RequestMapping("kafka")
@AllArgsConstructor
@Slf4j
public class ProducerController {

    private final KafkaTemplate<String, String> kafkaTemplate;
    private final NewTopic myTopic1;
    private final NewTopic myTopic2;

    @GetMapping("/publish/mytopic1")
    public String publishSpringTopic1() {

        String message = "publish message to my_topic_1 " + UUID.randomUUID();

        ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(myTopic1.name(), message);
        future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
            @Override
            public void onSuccess(SendResult<String, String> result) {
                log.info("Sent message=[" + message + "] with offset=[" + result.getRecordMetadata().offset() + "]");
            }
            @Override
            public void onFailure(Throwable ex) {
                log.info("Unable to send message=[" + message + "] due to : " + ex.getMessage());
            }
        });
        return "done";
    }

    @GetMapping("/publish/mytopic2")
    public String publish() {

        String message = "publish message to my_topic_2 " + UUID.randomUUID();

        ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(myTopic2.name(), message);
        future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
            @Override
            public void onSuccess(SendResult<String, String> result) {
                log.info("Sent message=[" + message + "] with offset=[" + result.getRecordMetadata().offset() + "]");
            }
            @Override
            public void onFailure(Throwable ex) {
                log.info("Unable to send message=[" + message + "] due to : " + ex.getMessage());
            }
        });
        return "done";
    }
}

 

메시지 처리를 위해 Consumer 테스트 클래스를 Service로 작성하고, @KafkaListenr를 적용한다.

@Service
@Slf4j
public class ConsumerService {

    @KafkaListener(topics = "#{myTopic1.name}", groupId = "group1")
    public void consumeMyopic1(@Payload String message, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition){
        log.info("[consume message]: {} from partition: {}", message, partition);
    }

    @KafkaListener(topics = "#{myTopic2.name}", groupId = "group1")
    public void consumeMyopic2(@Payload String message, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition){
        log.info("[consume message]: {} from partition: {}", message, partition);
    }
}

 

이제 테스트 어플리케이션을 실행하고 테스트하면 된다. 먼저 어플리케이션을 실행하면 JavaConfig에서 설정한 토픽이 생성된 것을 확인할 수 있다. 

 

다음으로 콘솔에서 확인을 위해 kafka 컨테이너에 접속하여 consumer 설정을 해준다.

kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic my_topic_1
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic my_topic_2

 

이제 publish API를 호출해보면(to ‘mytopic1’) 각각 Consumer 로그들을 확인해 볼 수 있다.

http://localhost:8081/kafka/publish/mytopic1

applicaiton consumer 로그

kafka 콘솔 로그

 

 

반응형

댓글