SpringBoot에 kafka 연동을 적용해보면서 kafka에 대한 기본 개념과 springboot와 연동 방법을 알아본다
Kafka 개요
- 매우 높은 처리량과 낮은 지연시간(latency)
- 높은 확장성
- 고가용성: 클러스터 내 Replication
- 각 메세지들을 여러 개로 복제해서 카프카 클러스터 내 브로커들에 분산시키는 동작
- ex. 토픽 생성시 --replication-factor 3 으로 지정하면 원본을 포함하여 총 3개의 replication 생성
- Topic 자체를 복제하는 것이 아닌 Topic의 파티션을 복제를 하는 방식
- 내구성: acks 옵션, 메시지 디스크 저장
- 개발 편의성: Producer와 Consumer의 분리
- 다양한 기능과 3rd Party 를 통해 관리 편의성
kafka 구성
- 주키퍼(Zookeeper): 아파치 프로젝트 애플리케이션으로 kafka의 메타데이터(metadata) 관리 및 브로커의 정상상태 점검(health check) 을 담당(분산 애플리케이션에서 코디네이터 역할, 브로커 노드 관리, 토픽 관리 ,컨트롤러 관리 등)
- 주키퍼는 여러 대의 서버를 앙상블(Ensemble,클러스터) 로 구성하고, 살아 있는 노드 수가 과반수 이상 유지 된다면 지속적인 서비스가 가능한 구조 -> 따라서 주키퍼는 반드시 홀수로 구성
- kafka에서는 주키퍼에 대한 의존성을 제거 하려는 움직임이 진행. ⇒ Apache Kafka 2.8 버전부터 주키퍼 대신 kraft 를 사용할 수 있으나 아직은 개발 단계
- 브로커(broker) : kafka 애플리케이션이 설치된 서버 또는 노드를 의미
- 프로듀서(producer): kafka로 메세지를 보내는 역할을 하는 클라이언트
- 컨슈머(consumer) : kafka에서 메세지를 꺼내가는 역할을 하는 클라이언트
- 파티션 수보다 컨슈머 수가 많게 구현되는 것은 바람직한 구성은 아님. 컨슈머 수가 파티션 수보다 많다면 더 많은 수의 컨슈머들은 그냥 대기 상태로 존재하기 때문에 더 빠르게 메세지를 가져오거나 처리량을 늘어나지 않음
- 컨슈머는 컨슈머 그룹 안에 속한 것이 일반적인 구조로, 하나의 컨슈머 그룹안에 여러 개의 컨슈머가 구성. 컨슈머는 토픽의 파티션과 1:1로 매핑 되어 메세지를 가져옴
- 토픽(topic) : kafka는 메시지 피드들을 토픽으로 구분하고, 각 Topic의 이름은 kafka 내에서 고유
- 파티션(partition) : 병렬 처리 및 고성능을 얻기 위해 하나의 Topic을 여러개로 나눈 것을 의미.
- (파티션 하면) 분산 처리가 가능해지며, 파티션 수 만큼 컨슈머를 연결 가능
- 파티션수는 초기 생성 후 언제든지 늘릴 수 있지만, 반대로 한번 늘린 파티션 수는 절대로 줄일 수 없음.
초기에 토픽 생성시에는 파티션수를 작게 적절하게(2또는 4정도) 생성 한 후, 메세지 처리량인 컨슈머의 LAG 등을 모니터링 하면서 조금씩 늘려가는 방법이 가장 좋음
- 세그먼트(segment) : 프로듀서가 전송한 실제 메세지가 브로커의 로컬 디스크에 저장되는 파일을 의미
- 메세지(message) 또는 레코드(record): 프로듀서가 브로커로 전송하거나 컨슈머가 읽어가는 데이터 조각을 말합니다.
Kafka 구성 및 테스트
kafka 테스트 환경을 위해 docker-compose로 설치한다.
version: '3' # docker-compose 버전 지정
services: # docker-compose의 경우 docker 컨테이너로 수행될 서비스들은 services 하위에 기술
zookeeper: # 서비스 이름. service 하위에 작성하면 해당 이름으로 동작
image: wurstmeister/zookeeper # 도커 이미지
container_name: zookeeper
ports: # 외부포트:컨테이너내부포트
- "2181:2181"
kafka:
image: wurstmeister/kafka
container_name: kafka
ports: # 외부포트:컨테이너내부포트
- "9092:9092"
environment: # kafka 브로터를 위한 환경 변수 지정
KAFKA_ADVERTISED_HOST_NAME: 127.0.0.1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 # kafka가 zookeeper에 커넥션하기 위한 대상을 지정
volumes:
- /var/run/docker.sock:/var/run/docker.sock
docker-compose로 컨테이너를 실행하고 kafka 컨테이너로 접속하여 CLI를 통해서 Topic을 생성하고 조회 해본다.
docker-compose up -d
# kafka 접속
docker exec -it kafka bash
# kafka 버전확인
kafka-topics.sh --version
# 토픽 생성
kafka-topics.sh --create --topic sample_topic_1 --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1
# 토픽 목록 조회
kafka-topics.sh --list --bootstrap-server localhost:9092
'test_topic' 토픽으로 test message를 발행한다.
# 프로듀스 실행(토픽명: test_topic)
kafka-console-producer.sh --broker-list localhost:9092 --topic test_topic
'test_topic' 토픽으로 메시지 컨슈머를 실행하면 test message를 가져오는 것을 확인할 수 있다.
# 컨슈머 실행
# --from-beginning 옵션은 해당 토픽의 맨 처음 메시지부터 확인 가능
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test_topic
아래 명령으로 토픽 정보를 조회해 볼 수 있다.
# 토픽 정보 조회
kafka-topics.sh --bootstrap-server localhost:9092 --topic test_topic --describe
SpringBoot 적용
gradle에서 kafka 의존성 설정을 진행한다. kafka 버전은 위에서 테스트한 버전과 맞춰준다.
# kafka 버전 맞춰서 의존성 설정
implementation ('org.springframework.kafka:spring-kafka:2.8.1')
application.yml에서 kafka 설정을 추가한다. 여기서는 접속정보만 작성했다.
spring:
kafka:
bootstrap-servers: localhost:9092
JavaConfig 설정을 작성한다. application.yml을 통해 설정할수도 있지만 명식적 선언을 위해서 javaconfig로 설정한다.
@EnableKafka // @KafkaListener 사용을 위한 설정
@Configuration
public class KafkaConfig {
@Value(value = "${spring.kafka.bootstrap-servers}")
private String bootstrapAddress;
@Bean
public KafkaAdmin kafkaAdmin() {
Map<String, Object> configs = new HashMap<>();
configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
return new KafkaAdmin(configs);
}
// ------------------------ Publish 설정 -------------------------------------
// 테스트 Topic 생성 1
@Bean
public NewTopic myTopic1() {
return new NewTopic("my_topic_1", 1, (short) 1);
}
// 테스트 Topic 생성 2
@Bean
public NewTopic myTopic2() {
return new NewTopic("my_topic_2", 1, (short) 1);
}
@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return new DefaultKafkaProducerFactory<>(configProps);
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
// ------------------------ Consumer 설정 -------------------------------------
@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "foo");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return new DefaultKafkaConsumerFactory<>(props);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
}
다음으로 Producer 테스트 클래스를 작성한다. API로 호출해서 테스트를 할 것이라서 Controller를 작성해준다. 위에서 설정한 2개의 토픽을 테스트하기 위해 메소드를 2개로 나누어 작성했지만 구조는 동일하다.
@RestController
@RequestMapping("kafka")
@AllArgsConstructor
@Slf4j
public class ProducerController {
private final KafkaTemplate<String, String> kafkaTemplate;
private final NewTopic myTopic1;
private final NewTopic myTopic2;
@GetMapping("/publish/mytopic1")
public String publishSpringTopic1() {
String message = "publish message to my_topic_1 " + UUID.randomUUID();
ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(myTopic1.name(), message);
future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
@Override
public void onSuccess(SendResult<String, String> result) {
log.info("Sent message=[" + message + "] with offset=[" + result.getRecordMetadata().offset() + "]");
}
@Override
public void onFailure(Throwable ex) {
log.info("Unable to send message=[" + message + "] due to : " + ex.getMessage());
}
});
return "done";
}
@GetMapping("/publish/mytopic2")
public String publish() {
String message = "publish message to my_topic_2 " + UUID.randomUUID();
ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(myTopic2.name(), message);
future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
@Override
public void onSuccess(SendResult<String, String> result) {
log.info("Sent message=[" + message + "] with offset=[" + result.getRecordMetadata().offset() + "]");
}
@Override
public void onFailure(Throwable ex) {
log.info("Unable to send message=[" + message + "] due to : " + ex.getMessage());
}
});
return "done";
}
}
메시지 처리를 위해 Consumer 테스트 클래스를 Service로 작성하고, @KafkaListenr를 적용한다.
@Service
@Slf4j
public class ConsumerService {
@KafkaListener(topics = "#{myTopic1.name}", groupId = "group1")
public void consumeMyopic1(@Payload String message, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition){
log.info("[consume message]: {} from partition: {}", message, partition);
}
@KafkaListener(topics = "#{myTopic2.name}", groupId = "group1")
public void consumeMyopic2(@Payload String message, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition){
log.info("[consume message]: {} from partition: {}", message, partition);
}
}
이제 테스트 어플리케이션을 실행하고 테스트하면 된다. 먼저 어플리케이션을 실행하면 JavaConfig에서 설정한 토픽이 생성된 것을 확인할 수 있다.
다음으로 콘솔에서 확인을 위해 kafka 컨테이너에 접속하여 consumer 설정을 해준다.
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic my_topic_1
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic my_topic_2
이제 publish API를 호출해보면(to ‘mytopic1’) 각각 Consumer 로그들을 확인해 볼 수 있다.
http://localhost:8081/kafka/publish/mytopic1
applicaiton consumer 로그
kafka 콘솔 로그
'개발 > SpringBoot' 카테고리의 다른 글
SpringBoot OpenFeign(FeignClient) 사용하기 (1) | 2023.05.31 |
---|---|
SpringBoot cannot deserialize from Object value 에러 (0) | 2023.05.27 |
SpringBoot3 - GraalVM Native Image Support (0) | 2023.02.01 |
SpringBoot RabbitMQ 활용(Simple Queue, Pub/Sub) (0) | 2022.12.23 |
SpringBoot Redis 활용 - Pub/Sub (0) | 2022.12.16 |
댓글