내일배움캠프

[내일배움캠프] 이벤트 소싱과 CQRS, RabbitMQ와 Kafka

munsik22 2026. 5. 13. 15:32

🧩 기타 주요 개념들

이벤트 소싱

  • 데이터 상태 변화를 이벤트로 기록하고 해당 이벤트들을 순차적으로 재생하여 현재 상태를 파악하는 방법
  • 데이터 변경 자체가 아닌 변경 이벤트를 저장함
  • 복잡한 비즈니스 로직을 다루는 시스템에서 데이터 일관성과 추적 가능성을 높이는 데 유용하지만, 복잡성이 증가할 수 있음
  • 주요 개념
    • 이벤트: 데이터의 상태 변화를 나타내는 기록
    • 이벤트 스토어: 이벤트를 순서대로 저장하는 저장소 → 이벤트의 불변성, 순차성 보장
    • 애그리게이트: 관련된 이벤트를 모아 현재 상태를 재현할 수 있는 엔티티
    • 커맨드: 애그리게이트에 특정 동작을 지시하는 명령. 이벤트를 생성하는 트리거 역할
    • 프로젝션: 이벤트를 읽기 모델로 변환하여 조회 성능을 최적화하는 방식. 이벤트를 기반으로 읽기 전용 DB를 업데이트함
  • 장점
    • 데이터 변경 이력 추적 가능
    • 복구 및 재생 가능
    • CQRS와 자연스러운 통합
  • 단점
    • 복잡성 증가
    • 읽기 성능 저하

CQRS

  • CQRS(Command Query Responsibility Segregation): 명령과 조회의 책임을 분리하는 소프트웨어 디자인 패턴
  • 읽기 작업과 쓰기 작업을 서로 다른 모델로 분리하여, 각 작업에 최적화된 구조를 사용할 수 있도록 한다는 개념
  • 주요 개념
    • 명령(Command)
      • 데이터를 변경하는 작업 (예: 주문 생성, 결제 처리, 계정 업데이트 등)
      • DB에 쓰기 작업을 수행
      • 명령 모델: 데이터의 상태 변경을 담당. 무결성 보장을 위해 트랜잭션을 사용함
    • 조회(Query)
      • 데이터를 조회하는 작업 (예: 주문 내역 조회, 계정 정보 조회 등)
      • DB에 읽기 작업을 수행
      • 조회 모델: 읽기 전용 DB나 캐시를 사용해 빠른 응답을 제공함
  • 장점
    • 읽기 쓰기 분리로 인한 성능 향상
    • 읽기와 쓰기를 독립적으로 확장 가능
    • 유지보수성
    • 데이터 일관성: 이벤트 소싱을 통해 데이터 상태 변경을 이벤트로 기록하고, 이벤트를 재생하여 현재 상태를 유지할 수 있음
  • 단점
    • 복잡성 증가
    • 데이터 동기화 작업 필요

모니터링

  • 애플리케이션, DB, 캐시 등 각 컴포넌트의 성능을 모니터링함
  • Prometheus, Grafana
  • 시스템의 주요 지표(TPS, 응답 시간, 에러율 등)를 모니터링하고 이상 징후를 감지하면 알림을 받을 수 있음
  • 모니터링을 해야 문제 발생 시 빠르게 대응할 수 있고 시스템 성능을 분석하고 최적화할 수 있음
  • 주요사항: 실시간 상태 파악, 자동 알림, 성능 분석, 병목 지점 파악, 사전 예방, 신속한 대응

로깅

  • 애플리케이션의 주요 이벤트를 로깅하여 문제 발생 시 원인을 추적할 수 있음
  • Elasticsearch, Logstash, Kibana(ELK 스택) 등
  • 주요사항: 이벤트 추적, 디버깅 및 오류 해결, 패턴 분석, 장기적 최적화, 원인 분석, 법적 및 규제 요구사항 준수

테스트

  • 단위 테스트
    • 시스템의 개별 구성 요소를 테스트하여 각 부분이 예상대로 동작하는지 확인
    • JUnit, TestNG
  • 통합 테스트
    • 여러 구성 요소가 함께 동작하는지를 테스트
    • 개별 구성 요소들이 올바르게 상호작용하는지를 검증
    • @SpringBootTest 어노테이션 사용
  • 부하 테스트
    • 시스템이 높은 트래픽 상황에서도 안정적으로 동작하는지를 테스트
    • Apache JMeter
  • 회귀 테스트
    • 새로운 코드 변경이 기존 기능에 영향을 미치지 않는지 확인
    • 기존 테스트 케이스를 자동화하여 주기적으로 실행함
  • 사용자 수용 테스트 (UAT)
    • 실제 사용자 환경에서 시스템을 테스트하여 사용자가 요구하는 기능이 모두 제대로 동작하는지 확인
    • 사용자 피드백을 반영하여 시스템을 최종 조정해 배포 준비를 완료함

배포

  • CI(지속 통합)
    • 개발자가 변경한 코드를 자주 자동으로 빌드하고 테스트하여, 코드 변경 시점에서 발생할 수 있는 문제를 조기에 발견하고 해결하는 것
    • Jenkins, GitLab CI, Travis CI 등
  • CD(지속 배포)
    • CI 파이프라인을 통해 검증된 코드를 자동으로 프로덕션 환경에 배포함
    • Argo CD 등
  • Canary 배포
    • 새로운 버전을 전체 시스템에 배포하기 전에 일부 사용자에게만 배포하여 문제가 없는지 확인하는 것
    • 문제가 발생할 경우 빠르게 이전 버전으로 롤백 가능함
  • 블루-그린 배포
    • 두 개의 환경(블루와 그린)을 사용하여 하나는 현재 운영 중인 환경이고, 다른 하나는 새로운 버전을 배포하는 환경
    • 새로운 버전을 그린 환경에 배포한 후, 모든 트래픽을 그린 환경으로 전환해 문제가 발생하면 블루 환경으로 롤백 가능함
  • 롤링 배포
    • 새로운 버전을 점진적으로 배포하여, 각 서버를 순차적으로 업데이트하는 방식

🧩 RabbitMQ

RabbitMQ란?

  • 메시지 브로커: 데이터(메시지)를 송신자(프로듀서)로부터 수신자(컨슈머)에게 전달하는 중간 매개체
  • RabbitMQ는 메시지를 큐(queue)에 저장하고, 필요할 때 적절한 수신자에게 전달함
  • 주요 특징: 비동기 처리, 부하 분산, 내결함성
  • 장점
    • 신뢰성: 메시지 지속성, 확인 메커니즘
    • 유연성: 다양한 메시지 패턴, 프로토콜 지원
    • 확장성: 클러스터링으로 여러 노드로 구성, 분산 아키텍쳐
    • 관리 및 모니터링: 관리 인터페이스, 플러그인 시스템
    • 성능: 높은 처리량
  • 단점
    • 복잡성: 클러스터링 및 분산 환경에서 많은 설정 필요, 운영 관리 필요
    • 성능 문제: 메시지 브로커 오버헤드, 대규모 메시지 처리시 성능 저하 발생 가능
    • 운영 비용: 리소스 소비, 모니터링 및 유지보수
    • 제한된 메시지 크기: 대용량 파일 전송에는 부적절할 수 있음
    • 러닝 커브 존재

RabbitMQ의 기본 구성요소

  • 메시지: RabbitMQ를 통해 전달되는 데이터 단위
  • 프로듀서: 메시지를 생성하고 RabbitMQ에 보내는 역할
  • 큐: 메시지를 저장하는 장소 (FIFO)
  • 컨슈머: 큐에서 메시지를 가져와 처리하는 역할
  • 익스체인지: 메시지를 적절한 큐로 라우팅하는 역할

RabbitMQ와 AMQP

  • AMQP(Advanced Message Queuing Protocol): 메시지의 생성, 전송, 큐잉, 라우팅 등을 표준화하여 메시지 브로커가 상호 운용될 수 있게 하는 프로토콜
  • 바인딩: 익스체인지와 큐를 연결하는 설정 (메시지가 어느 큐로 전달될지 정의)
  • 헷갈리지 않기 위해 큐의 이름과 바인딩의 이름으로 일치시킬 것을 추천한다.

익스체인지 유형

  • Direct Exchange: 라우팅 키가 정확히 일치하는 큐로 메시지를 전달 (예: error -> error)
  • Topic Exchange: 라우팅 키의 패턴을 사용하여 메시지를 라우팅 (예: quick.orange.rabbit -> *.orange.*)
  • Fanout Exchange: 라우팅 키를 무시하고 교환기에 바인딩된 모든 큐로 메시지를 브로드캐스트하는 방식 (모든 바인딩된 큐로 메시지가 전달됨)
  • Headers Exchange: 라우팅 키 대신 메시지의 헤더를 기반으로 메시지를 라우팅 (헤더 값과 바인딩된 헤더 값이 일치하는 큐로 메시지를 전달)

RabbitMQ 실습

  • Docker로 RabbitMQ 설치
docker run -d --name rabbitmq -p5672:5672 -p 15672:15672 --restart=unless-stopped rabbitmq:management
  • localhost:15672에 접속하면 로그인페이지가 보이고, guest/guest를 입력해 접속해서 대시보드를 볼 수 있음

  • build.gradle
implementation 'org.springframework.boot:spring-boot-starter-amqp'
testImplementation 'org.springframework.amqp:spring-rabbit-test'
  • application.properties
message.exchange=market
message.queue.product=market.product
message.queue.payment=market.payment

spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
  • OrderApplicationQueueConfig
@Configuration
public class OrderApplicationQueueConfig {

    @Value("${message.exchange}")
    private String exchange;

    @Value("${message.queue.product}")
    private String queueProduct;

    @Value("${message.queue.payment}")
    private String queuePayment;

    @Bean public TopicExchange exchange() { return new TopicExchange(exchange); }

    @Bean public Queue queueProduct() { return new Queue(queueProduct); }
    @Bean public Queue queuePayment() { return new Queue(queuePayment); }

    @Bean public Binding bindingProduct() { return BindingBuilder.bind(queueProduct()).to(exchange()).with(queueProduct); }
    @Bean public Binding bindingPayment() { return BindingBuilder.bind(queuePayment()).to(exchange()).with(queuePayment); }
}
  • OrderController
import lombok.RequiredArgsConstructor;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequiredArgsConstructor
public class OrderController {

    private final OrderService orderService;

    @GetMapping("/order/{id}")
    public String order(@PathVariable String id) {
        orderService.createOrder(id);
        return "Order complete";
    }
}
  • OrderService
import lombok.RequiredArgsConstructor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

@Service
@RequiredArgsConstructor
public class OrderService {

    @Value("${message.queue.product}")
    private String productQueue;

    @Value("${message.queue.payment}")
    private String paymentQueue;

    private final RabbitTemplate rabbitTemplate;

    public void createOrder(String orderId) {
        rabbitTemplate.convertAndSend(productQueue, orderId);
        rabbitTemplate.convertAndSend(paymentQueue, orderId);
    }

}
  • /order/1로 요청을 보내고 대시보드에서 RabbitMQ의 Exchange와 Queue 를 확인할 수 있다. Queue 에서는 현재 발행된 메시지가 Total에 쌓여 있는것을 확인할 수 있다.

  • Queue and Stream > Get Messages > Get Message에서 현재 큐에 쌓여있는 메시지를 조회할 수 있다.

🧩 Kafka 실습

  • docker-yml 파일 생성
version: '3.8'
services:
  zookeeper:
    image: wurstmeister/zookeeper:latest
    platform: linux/amd64
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  kafka:
    image: wurstmeister/kafka:latest
    platform: linux/amd64
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_LISTENERS: INSIDE://kafka:29092,OUTSIDE://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
      KAFKA_LISTENERS: INSIDE://0.0.0.0:29092,OUTSIDE://0.0.0.0:9092
      KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock

  kafka-ui:
    image: provectuslabs/kafka-ui:latest
    platform: linux/amd64
    ports:
      - "8080:8080"
    environment:
      KAFKA_CLUSTERS_0_NAME: local
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:29092
      KAFKA_CLUSTERS_0_ZOOKEEPER: zookeeper:2181
      KAFKA_CLUSTERS_0_READONLY: "false"
  • 도커 컴포즈 파일을 생성한 경로에서 도커 컴포즈를 실행
docker compose up -d
  • localhost:8080에 접속하면 kafka UI에 접속할 수 있음

  • application.properties (producer)
spring.application.name=kafkaproducer
server.port=8090

spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
  • ProducerApplicationKafkaConfig
@Configuration
public class ProducerApplicationKafkaConfig {
    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return new DefaultKafkaProducerFactory<>(configProps);
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}
  • ProducerController
@RestController
@RequiredArgsConstructor
public class ProducerController {

    private final ProducerService producerService;

    @GetMapping("/send")
    public String sendMessage(@RequestParam("topic") String topic,
                              @RequestParam("key") String key,
                              @RequestParam("message") String message) {
        producerService.sendMessage(topic, key, message);
        return "Message sent to Kafka topic";
    }
}
  • ProducerService
@Service
@RequiredArgsConstructor
public class ProducerService {

    private final KafkaTemplate<String, String> kafkaTemplate;


    public void sendMessage(String topic , String key, String message) {
        for (int i = 0; i < 10; i++) {

            kafkaTemplate.send(topic, key, message + " " + i);
        }

    }
}
  • application.properties (consumer)
spring.application.name=kafkaconsumer
server.port=8091

spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
  • ConsumerApplicationKafkaConfig
@EnableKafka
@Configuration
public class ConsumerApplicationKafkaConfig {

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(configProps);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}
  • ConsumerService
@Slf4j
@Service
public class ConsumerService {

    @KafkaListener(groupId = "group_a", topics = "topic1")
    public void consumeFromGroupA(String message) {
        log.info("Group A consumed message from topic1: " + message);
    }

    @KafkaListener(groupId = "group_b", topics = "topic1")
    public void consumeFromGroupB(String message) {
        log.info("Group B consumed message from topic1: " + message);
    }

    @KafkaListener(groupId = "group_c", topics = "topic2")
    public void consumeFromTopicC(String message) {
        log.info("Group C consumed message from topic2: " + message);
    }

    @KafkaListener(groupId = "group_c", topics = "topic3")
    public void consumeFromTopicD(String message) {
        log.info("Group C consumed message from topic3: " + message);
    }

    @KafkaListener(groupId = "group_d", topics = "topic4")
    public void consumeFromPartition0(String message) {
        log.info("Group D consumed message from topic4: " + message);
    }
}
  • ConsumerEndpoint
@Slf4j
@Component
public class ConsumerEndpoint {

    @KafkaListener(groupId = "group_a", topics="topic1")
    public void consumeFromGroupA(String message) {
        log.info("Group A consumed message from topic1: {}", message);
    }

    @KafkaListener(groupId = "group_b", topics="topic1")
    public void consumeFromGroupB(String message) {
        log.info("Group B consumed message from topic1: {}", message);
    }

    @KafkaListener(groupId = "group_c", topics="topic2")
    public void consumeFromGroupC2(String message) {
        log.info("Group C consumed message from topic2: {}", message);
    }

    @KafkaListener(groupId = "group_c", topics="topic3")
    public void consumeFromGroupC3(String message) {
        log.info("Group C consumed message from topic3: {}", message);
    }

    @KafkaListener(groupId = "group_d", topics="topic4")
    public void consumeFromGroupC(String message) {
        log.info("Group D consumed message from topic4: {}", message);
    }
}

  • topic을 test-topic으로 지정하고 요청 → test-topic을 수신하는 그룹을 지정하지 않았으므로 아무도 수신받지 않음

  • topic을 topic1으로 지정하고 요청 → 그룹 A/B만 메시지를 수신함

KafkaconsumerApplication 로그

  • topic을 topic2으로 지정하고 요청 → 그룹 C만 메시지를 수신함

  • topic을 각각 topic3/4로 지정하고 요청 → 그룹 C/D만 메시지를 수신함