🧩 기타 주요 개념들
이벤트 소싱
- 데이터 상태 변화를 이벤트로 기록하고 해당 이벤트들을 순차적으로 재생하여 현재 상태를 파악하는 방법
- 데이터 변경 자체가 아닌 변경 이벤트를 저장함
- 복잡한 비즈니스 로직을 다루는 시스템에서 데이터 일관성과 추적 가능성을 높이는 데 유용하지만, 복잡성이 증가할 수 있음
- 주요 개념
- 이벤트: 데이터의 상태 변화를 나타내는 기록
- 이벤트 스토어: 이벤트를 순서대로 저장하는 저장소 → 이벤트의 불변성, 순차성 보장
- 애그리게이트: 관련된 이벤트를 모아 현재 상태를 재현할 수 있는 엔티티
- 커맨드: 애그리게이트에 특정 동작을 지시하는 명령. 이벤트를 생성하는 트리거 역할
- 프로젝션: 이벤트를 읽기 모델로 변환하여 조회 성능을 최적화하는 방식. 이벤트를 기반으로 읽기 전용 DB를 업데이트함
- 장점
- 데이터 변경 이력 추적 가능
- 복구 및 재생 가능
- CQRS와 자연스러운 통합
- 단점
CQRS
- CQRS(Command Query Responsibility Segregation): 명령과 조회의 책임을 분리하는 소프트웨어 디자인 패턴
- 읽기 작업과 쓰기 작업을 서로 다른 모델로 분리하여, 각 작업에 최적화된 구조를 사용할 수 있도록 한다는 개념
- 주요 개념
- 명령(Command)
- 데이터를 변경하는 작업 (예: 주문 생성, 결제 처리, 계정 업데이트 등)
- DB에 쓰기 작업을 수행
- 명령 모델: 데이터의 상태 변경을 담당. 무결성 보장을 위해 트랜잭션을 사용함
- 조회(Query)
- 데이터를 조회하는 작업 (예: 주문 내역 조회, 계정 정보 조회 등)
- DB에 읽기 작업을 수행
- 조회 모델: 읽기 전용 DB나 캐시를 사용해 빠른 응답을 제공함
- 장점
- 읽기 쓰기 분리로 인한 성능 향상
- 읽기와 쓰기를 독립적으로 확장 가능
- 유지보수성
- 데이터 일관성: 이벤트 소싱을 통해 데이터 상태 변경을 이벤트로 기록하고, 이벤트를 재생하여 현재 상태를 유지할 수 있음
- 단점
모니터링
- 애플리케이션, DB, 캐시 등 각 컴포넌트의 성능을 모니터링함
- Prometheus, Grafana
- 시스템의 주요 지표(TPS, 응답 시간, 에러율 등)를 모니터링하고 이상 징후를 감지하면 알림을 받을 수 있음
- 모니터링을 해야 문제 발생 시 빠르게 대응할 수 있고 시스템 성능을 분석하고 최적화할 수 있음
- 주요사항: 실시간 상태 파악, 자동 알림, 성능 분석, 병목 지점 파악, 사전 예방, 신속한 대응
로깅
- 애플리케이션의 주요 이벤트를 로깅하여 문제 발생 시 원인을 추적할 수 있음
- Elasticsearch, Logstash, Kibana(ELK 스택) 등
- 주요사항: 이벤트 추적, 디버깅 및 오류 해결, 패턴 분석, 장기적 최적화, 원인 분석, 법적 및 규제 요구사항 준수
테스트
- 단위 테스트
- 시스템의 개별 구성 요소를 테스트하여 각 부분이 예상대로 동작하는지 확인
- JUnit, TestNG
- 통합 테스트
- 여러 구성 요소가 함께 동작하는지를 테스트
- 개별 구성 요소들이 올바르게 상호작용하는지를 검증
@SpringBootTest 어노테이션 사용
- 부하 테스트
- 시스템이 높은 트래픽 상황에서도 안정적으로 동작하는지를 테스트
- Apache JMeter
- 회귀 테스트
- 새로운 코드 변경이 기존 기능에 영향을 미치지 않는지 확인
- 기존 테스트 케이스를 자동화하여 주기적으로 실행함
- 사용자 수용 테스트 (UAT)
- 실제 사용자 환경에서 시스템을 테스트하여 사용자가 요구하는 기능이 모두 제대로 동작하는지 확인
- 사용자 피드백을 반영하여 시스템을 최종 조정해 배포 준비를 완료함
배포
- CI(지속 통합)
- 개발자가 변경한 코드를 자주 자동으로 빌드하고 테스트하여, 코드 변경 시점에서 발생할 수 있는 문제를 조기에 발견하고 해결하는 것
- Jenkins, GitLab CI, Travis CI 등
- CD(지속 배포)
- CI 파이프라인을 통해 검증된 코드를 자동으로 프로덕션 환경에 배포함
- Argo CD 등
- Canary 배포
- 새로운 버전을 전체 시스템에 배포하기 전에 일부 사용자에게만 배포하여 문제가 없는지 확인하는 것
- 문제가 발생할 경우 빠르게 이전 버전으로 롤백 가능함
- 블루-그린 배포
- 두 개의 환경(블루와 그린)을 사용하여 하나는 현재 운영 중인 환경이고, 다른 하나는 새로운 버전을 배포하는 환경
- 새로운 버전을 그린 환경에 배포한 후, 모든 트래픽을 그린 환경으로 전환해 문제가 발생하면 블루 환경으로 롤백 가능함
- 롤링 배포
- 새로운 버전을 점진적으로 배포하여, 각 서버를 순차적으로 업데이트하는 방식
🧩 RabbitMQ
RabbitMQ란?
- 메시지 브로커: 데이터(메시지)를 송신자(프로듀서)로부터 수신자(컨슈머)에게 전달하는 중간 매개체
- RabbitMQ는 메시지를 큐(queue)에 저장하고, 필요할 때 적절한 수신자에게 전달함
- 주요 특징: 비동기 처리, 부하 분산, 내결함성
- 장점
- 신뢰성: 메시지 지속성, 확인 메커니즘
- 유연성: 다양한 메시지 패턴, 프로토콜 지원
- 확장성: 클러스터링으로 여러 노드로 구성, 분산 아키텍쳐
- 관리 및 모니터링: 관리 인터페이스, 플러그인 시스템
- 성능: 높은 처리량
- 단점
- 복잡성: 클러스터링 및 분산 환경에서 많은 설정 필요, 운영 관리 필요
- 성능 문제: 메시지 브로커 오버헤드, 대규모 메시지 처리시 성능 저하 발생 가능
- 운영 비용: 리소스 소비, 모니터링 및 유지보수
- 제한된 메시지 크기: 대용량 파일 전송에는 부적절할 수 있음
- 러닝 커브 존재
RabbitMQ의 기본 구성요소
- 메시지: RabbitMQ를 통해 전달되는 데이터 단위
- 프로듀서: 메시지를 생성하고 RabbitMQ에 보내는 역할
- 큐: 메시지를 저장하는 장소 (FIFO)
- 컨슈머: 큐에서 메시지를 가져와 처리하는 역할
- 익스체인지: 메시지를 적절한 큐로 라우팅하는 역할
RabbitMQ와 AMQP
- AMQP(Advanced Message Queuing Protocol): 메시지의 생성, 전송, 큐잉, 라우팅 등을 표준화하여 메시지 브로커가 상호 운용될 수 있게 하는 프로토콜
- 바인딩: 익스체인지와 큐를 연결하는 설정 (메시지가 어느 큐로 전달될지 정의)
- 헷갈리지 않기 위해 큐의 이름과 바인딩의 이름으로 일치시킬 것을 추천한다.
익스체인지 유형
- Direct Exchange: 라우팅 키가 정확히 일치하는 큐로 메시지를 전달 (예:
error -> error)
- Topic Exchange: 라우팅 키의 패턴을 사용하여 메시지를 라우팅 (예:
quick.orange.rabbit -> *.orange.*)
- Fanout Exchange: 라우팅 키를 무시하고 교환기에 바인딩된 모든 큐로 메시지를 브로드캐스트하는 방식 (모든 바인딩된 큐로 메시지가 전달됨)
- Headers Exchange: 라우팅 키 대신 메시지의 헤더를 기반으로 메시지를 라우팅 (헤더 값과 바인딩된 헤더 값이 일치하는 큐로 메시지를 전달)
RabbitMQ 실습
docker run -d --name rabbitmq -p5672:5672 -p 15672:15672 --restart=unless-stopped rabbitmq:management
- localhost:15672에 접속하면 로그인페이지가 보이고, guest/guest를 입력해 접속해서 대시보드를 볼 수 있음
implementation 'org.springframework.boot:spring-boot-starter-amqp'
testImplementation 'org.springframework.amqp:spring-rabbit-test'
message.exchange=market
message.queue.product=market.product
message.queue.payment=market.payment
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
OrderApplicationQueueConfig
@Configuration
public class OrderApplicationQueueConfig {
@Value("${message.exchange}")
private String exchange;
@Value("${message.queue.product}")
private String queueProduct;
@Value("${message.queue.payment}")
private String queuePayment;
@Bean public TopicExchange exchange() { return new TopicExchange(exchange); }
@Bean public Queue queueProduct() { return new Queue(queueProduct); }
@Bean public Queue queuePayment() { return new Queue(queuePayment); }
@Bean public Binding bindingProduct() { return BindingBuilder.bind(queueProduct()).to(exchange()).with(queueProduct); }
@Bean public Binding bindingPayment() { return BindingBuilder.bind(queuePayment()).to(exchange()).with(queuePayment); }
}
import lombok.RequiredArgsConstructor;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;
@RestController
@RequiredArgsConstructor
public class OrderController {
private final OrderService orderService;
@GetMapping("/order/{id}")
public String order(@PathVariable String id) {
orderService.createOrder(id);
return "Order complete";
}
}
import lombok.RequiredArgsConstructor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
@Service
@RequiredArgsConstructor
public class OrderService {
@Value("${message.queue.product}")
private String productQueue;
@Value("${message.queue.payment}")
private String paymentQueue;
private final RabbitTemplate rabbitTemplate;
public void createOrder(String orderId) {
rabbitTemplate.convertAndSend(productQueue, orderId);
rabbitTemplate.convertAndSend(paymentQueue, orderId);
}
}
/order/1로 요청을 보내고 대시보드에서 RabbitMQ의 Exchange와 Queue 를 확인할 수 있다. Queue 에서는 현재 발행된 메시지가 Total에 쌓여 있는것을 확인할 수 있다.
- Queue and Stream > Get Messages > Get Message에서 현재 큐에 쌓여있는 메시지를 조회할 수 있다.
🧩 Kafka 실습
version: '3.8'
services:
zookeeper:
image: wurstmeister/zookeeper:latest
platform: linux/amd64
ports:
- "2181:2181"
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
kafka:
image: wurstmeister/kafka:latest
platform: linux/amd64
ports:
- "9092:9092"
environment:
KAFKA_ADVERTISED_LISTENERS: INSIDE://kafka:29092,OUTSIDE://localhost:9092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
KAFKA_LISTENERS: INSIDE://0.0.0.0:29092,OUTSIDE://0.0.0.0:9092
KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
volumes:
- /var/run/docker.sock:/var/run/docker.sock
kafka-ui:
image: provectuslabs/kafka-ui:latest
platform: linux/amd64
ports:
- "8080:8080"
environment:
KAFKA_CLUSTERS_0_NAME: local
KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:29092
KAFKA_CLUSTERS_0_ZOOKEEPER: zookeeper:2181
KAFKA_CLUSTERS_0_READONLY: "false"
- 도커 컴포즈 파일을 생성한 경로에서 도커 컴포즈를 실행
docker compose up -d
- localhost:8080에 접속하면 kafka UI에 접속할 수 있음
- application.properties (producer)
spring.application.name=kafkaproducer
server.port=8090
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
- ProducerApplicationKafkaConfig
@Configuration
public class ProducerApplicationKafkaConfig {
@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return new DefaultKafkaProducerFactory<>(configProps);
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
@RestController
@RequiredArgsConstructor
public class ProducerController {
private final ProducerService producerService;
@GetMapping("/send")
public String sendMessage(@RequestParam("topic") String topic,
@RequestParam("key") String key,
@RequestParam("message") String message) {
producerService.sendMessage(topic, key, message);
return "Message sent to Kafka topic";
}
}
@Service
@RequiredArgsConstructor
public class ProducerService {
private final KafkaTemplate<String, String> kafkaTemplate;
public void sendMessage(String topic , String key, String message) {
for (int i = 0; i < 10; i++) {
kafkaTemplate.send(topic, key, message + " " + i);
}
}
}
- application.properties (consumer)
spring.application.name=kafkaconsumer
server.port=8091
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
- ConsumerApplicationKafkaConfig
@EnableKafka
@Configuration
public class ConsumerApplicationKafkaConfig {
@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return new DefaultKafkaConsumerFactory<>(configProps);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
}
@Slf4j
@Service
public class ConsumerService {
@KafkaListener(groupId = "group_a", topics = "topic1")
public void consumeFromGroupA(String message) {
log.info("Group A consumed message from topic1: " + message);
}
@KafkaListener(groupId = "group_b", topics = "topic1")
public void consumeFromGroupB(String message) {
log.info("Group B consumed message from topic1: " + message);
}
@KafkaListener(groupId = "group_c", topics = "topic2")
public void consumeFromTopicC(String message) {
log.info("Group C consumed message from topic2: " + message);
}
@KafkaListener(groupId = "group_c", topics = "topic3")
public void consumeFromTopicD(String message) {
log.info("Group C consumed message from topic3: " + message);
}
@KafkaListener(groupId = "group_d", topics = "topic4")
public void consumeFromPartition0(String message) {
log.info("Group D consumed message from topic4: " + message);
}
}
@Slf4j
@Component
public class ConsumerEndpoint {
@KafkaListener(groupId = "group_a", topics="topic1")
public void consumeFromGroupA(String message) {
log.info("Group A consumed message from topic1: {}", message);
}
@KafkaListener(groupId = "group_b", topics="topic1")
public void consumeFromGroupB(String message) {
log.info("Group B consumed message from topic1: {}", message);
}
@KafkaListener(groupId = "group_c", topics="topic2")
public void consumeFromGroupC2(String message) {
log.info("Group C consumed message from topic2: {}", message);
}
@KafkaListener(groupId = "group_c", topics="topic3")
public void consumeFromGroupC3(String message) {
log.info("Group C consumed message from topic3: {}", message);
}
@KafkaListener(groupId = "group_d", topics="topic4")
public void consumeFromGroupC(String message) {
log.info("Group D consumed message from topic4: {}", message);
}
}
- topic을 test-topic으로 지정하고 요청 → test-topic을 수신하는 그룹을 지정하지 않았으므로 아무도 수신받지 않음
- topic을 topic1으로 지정하고 요청 → 그룹 A/B만 메시지를 수신함
KafkaconsumerApplication 로그
- topic을 topic2으로 지정하고 요청 → 그룹 C만 메시지를 수신함
- topic을 각각 topic3/4로 지정하고 요청 → 그룹 C/D만 메시지를 수신함