카테고리 없음
[클라우드] - CQRS
yongyongMom
2024. 10. 7. 17:46
SMALL
CQRS
개요
- Command and Query Responsibility Segregation 의 약자로 데이터 저장소로부터 읽기 와 업데이트 작업을 분리하는 패턴
- CQRS를 사용하면 애플리케이션의 확장성, 퍼포먼스, 보안성을 극대화할 수 있고 여러 요청으로부터 들어온 복수의 업데이트 작업들도 충돌을 방지할 수 있음
전통적인 방식의 문제점
- 전통적인 아키텍쳐에서는 데이터베이스에서 조회하고 업데이트하는데 같은 데이터 모델을 사용
- 간단한 CRUD 작업에서는 문제없이 동작을 하지만 복잡한 애플리케이션에서는 유지보수를 어렵게 만들 수 있음
- 데이터 조회를 할 때는 여러 다른 형태의 DTO를 반환하는 매우 다양한 쿼리들을 수행할 수 있는데 각각 다른 형태의 DTO 들에 객체 매핑 작업은 복잡해 질 가능성이 있음
- 데이터를 쓰거나 업데이트 할 때는 복잡한 유효성 검사 와 비지니스 로직이 수행되어야 함 -> 이 모든 걸 하나의 데이터 모델이 수행하면 너무 많은 것을 수행하는 복잡한 모델이 됨
- 읽기와 쓰기의 트래픽은 일반적으로 같지 않기 때문에 각각에 대해서 다른 성능이 요구되는 경우가 많음
- 읽기와 쓰기 작업에서 사용되는 데이터 표현들이 서로 일치하지 않는 경우가 많음
-> 일부 작업에서는 필요하지 않은 추가적인 컬럼이나 속성의 업데이트가 이루어 져야 함 - 동일한 세트에 대해 병렬로 작업이 수행될 때 데이터 경합이 발생할 수 있음
- 정보 조회를 위해 요구되는 복잡한 쿼리로 인해 성능에 부정적인 영향을 줄 수 있음
- 하나의 데이터 모델이 읽기와 쓰기를 모두 수행하기 때문에 보안 관리가 복잡해 질 수 있음
해결책
- 읽기 와 쓰기를 분리해서 각각 다른 모델로 분리해서 명령을 통해 데이터를 쓰고 쿼리를 통해 데이터를 읽음
- 명령(Command) : 데이터 중심적이 아니라 수행할 작업 중심이 되어야 하는데 '호텔룸의 상태를 예약됨으로 변경한다' 가 아니라 '호텔 룸 예약' 과 같이 생성
- 조회(Query) : 데이터베이스를 결코 수정하지 않는데 쿼리는 어떠한 도메인 로직도 캡슐화하지 않은 DTO 만을 반환
- 읽기/쓰기 모델들은 서로 격리 가능 -> 읽기/쓰기 모델 분리하는 것은 애플리케이션 디자인과 구현을 더욱 간단하게 만들어주지만 CQRS 코드는 ORM 툴을 이용해 스키마로부터 자동 생성 불가능 단점 존재
- 확실한 격리를 위해서 물리적으로 읽기 와 쓰기를 분리가능
-> 읽기 DB의 경우 복잡한 조인문이나 ORM 매핑을 방지하기 위해서 Material View를 가지는 조회에 최적화된 별도의 DB 스키마를 가질 수 있도록 만듦 - 단지 다른 DB 스키마가 아니라 아예 다른 타입의 저장소를 사용할 수 있는데 이 때 보통 쓰기의 경우는 RDBMS를 읽기의 경우는 NoSQL을 사용하기도 합니다
- 별도의 읽기/쓰기 데이터 저장소가 사용된다면 반드시 동기화가 이루어져야 함
- 쓰기 모델 -> DB 수정 사항 발생 -> 이벤트 발생 -> DB 업데이트 진행 ::: DB 업데이터와 이벤트 발생은 하나의 트랜잭션
- 읽기 저장소 : 쓰기 저장소의 replica 또는 완전히 다른 구조
- 읽기 & 쓰기 분리 -> 각각의 부하에 맞게 scaling 가능
- 부하 :: 읽기 > 쓰기
CQRS 의 장점
- 독립적인 스케일링
- 최적화된 데이터 스키마
- 보안
- 관심사 분리 (AOP)
- 간단한 쿼리 : 읽기 저장소의 material view 를 통해서 복잡한 조인문 사용 필요 없음
구현 이슈
- 복잡성
- 메세징
: 명령을 수행하고 업데이트 이벤트를 발행해서 데이터를 수정해야 하기 때문에 메세지 전송 실패나 중복 메세지에 대한 처리가 필요 - 데이터 일관성
: 읽기 모델 업데이트 시 불가피하게 딜레이 발생
CQRS를 사용해야 하는 경우
- 많은 사용자가 동일한 데이터에 병렬로 액세스 하는 경우: 읽기가 많은 시스템
- 복잡한 프로세스나 도메인 모델을 통해 가이드 되는 작업 기반 사용자 인터페이스
- 데이터의 읽기 성능 과 쓰기 성능이 별도로 조정되어야 할 때
- 시스템이 시간이 지남에 따라 계속해서 진화하고 여러 버전을 가져야 할 때
- 다른 시스템 과의 통합
권장하지 않는 경우
- 도메인과 비지니스 로직이 간단할 때
- 단순한 CRUD 작업인 경우
Apache Kafaka
개요
- 등장배경
- LinkedIn에서 2011년 파편화된 데이터 수집 및 분배 아키텍쳐를 운영하는 데 어려움을 겪었는데 데이터를 생성하고 적재하기 위해서는 데이터를 생성하는 소스 애플리케이션 과 데이터가 최종 적재되는 타켓 애플리케이션이 연결되어야 하는데 초기 운영을 할 때는 단방향 통신을 이용해서 소스 코드를 작성했는데 이 당시에는 아키텍처가 복잡하지 않았기 때문에 운영이 힘들지 않았지만 아키텍처가 점점 복잡해지고 소스 애플리케이션 과 타겟 애플리케이션의 개수가 늘어나면서 문제가 발생
- 소스 애플리케이션 과 타겟 애플리케이션을 연결하는 파이프라인의 개수가 늘어나면서 소스 코드 및 버전 관리에서 이슈가 발생하기 시작했고 타겟 애플리케이션에 장애가 발생할 경우 그 영향이 소스 애플리케이션에 그대로 전달 - 강한 결합의 문제점
- 초창기에는 다양한 메세지 플랫폼 과 ETL(Extract Transform Load) 툴을 적용해서 아키텍처를 변경하려고 노력-> 파편화된 데이터 파이프라인의 복잡도를 낮추는 아키텍처 실패
- 해결책
- 각각의 애플리케이션끼리 연결해서 데이터 처리 아님
- 한 곳에 모아 중앙 집중화 방식으로 처리
- 카프카를 이용해 웹 사이트, 애플리케이션 센서 등에서 취합한 데이터 스트림을 한 곳에 모아서 관리
- 카프카는 대용량 데이터를 수집하고 소비 가능
- 카프카는 중앙에 배치해 소스 애플리케이션과 타겟 애플리케이션 사아의 의존도를 최대화
- 소스 애플리케이션은 어느 타겟 애플리케이션으로 데이터를 보낼 것인지 고민하지 않고
-> 카프카로 넣으면 되고 카프카 내부에 데이터가 저장되는 파티션은 FIFO(First In First Out)의 형태로 동작 - 큐에 데이터를 보내는 동작은 프로듀서가 하고 큐에서 데이터를 가져가는 것은 컨슈머가 수행
- 데이터 포맷
- 제한 없음
- 자바에서 사용 가능한 모든 객체는 사용할 수 있는데 직렬화(객체 단위로 데이터를 전송할 수 있도록 해주는 것 - Serializable 인터페이스나 Parceable 인터페이스를 구현한 객체) 와 역직렬화를 이용
- 구성
- 카프카는 상용환경에서 최소 3대 이상의 서버(Broker)로 운영
- 3대 이상으로 구현을 하게 되면 클러스터 중 일부에 장애가 발생하더라도 데이터를 지속적으로 복제하기 때문에 안전하게 운영할 수 있습니다.
- 현재
- 카프카 소스 코드는 깃허브 저장소(https://github.com/apache/kafka)에 공개
- KIP(Kafka Improvement Proposal)을 통해서 변경 사항을 제안하는 것이 가능
Kafka 의 역할
- Big Data
- 다양한 종류의 많은 또는 빠르게 생성되는 데이터를 말합니다.
- Data Pipeline
- Data Lake: 생성되는 데이터를 모두 모은 것
- Data Warehouse: 필터링이나 패키지화가 된 데이터
- Data Pipeline 은 Data Warehouse 와 다르게 필터링 되거나 패키지화 되지 않은 데이터가 저장되고 운영되는 서비스로부터 수집 가능한 모든 데이터를 모으는 것
- 데이터 과학자 :: 모든 데이터 -> 서비스에 활용할 수 있는 비지니스 인사이트 도출
- 서비스에서 발생하는 데이터를 data lake로 모으려면 웹, 앱, 백엔드 서버, 데이터베이스에서 발생하는 데이터를 직접 End-To-End 방식으로 넣을 수 있음
- 서비스하는 애플리케이션의 개수가 적고 트래픽이 많지 않을 때는 문제가 되지 않지만 서비스가 복잡해지게 되면 Extracting(추출), Transform(변경), Loading(적재) 하는 과정을 하나로 만드는 Data Pipeline을 구축해야 함
- Data Pipeline의 동작은 자동화 되어야 합니다.
- Data Pipeline을 구축할 때 Kafka 와 같은 Message Broker 와 Airflow(스케줄러) 같은 Tool을 많이 이용합니다.
- Message Broker가 받은 데이터를 전처리 작업을 수행한 후 데이터 저장소에 저장
- Kafka 사용 이유
- 높은 처리량 : 데이터를 묶어 전송 가능, 병렬 처리 가능
- 확장성이 좋음 : 브로커의 개수를 조절
- 영속성
- 카프카는 데이터를 파일에 저장(카프카가 종료되었다 켜지더라도 데이터는 그대로 보존)
- 속도는 Page Cache를 이용해서 보완
- 고가용성
- 여러 개의 서버로 운영이 되기 때문에 일부 서버에 장애가 발생해도 무중단으로 안전하고 지속적으로 데이터를 처리
Docker 에 설치
- docker desktop 에서 이미지 다운 받기
:: kafka 와 zookeeper 이미지 다운받기 - docker-compose.yml 파일 생성하기 - macbook에서 실행
- mkdir kafka-docker
- cd kafka-docker
- touch docker-compose.yml
- docker-compose.yml 파일로 이동해서 파일 내용 작성하기
- 들여쓰기가 달라지면 읽지 못해서 엄격하게 따라줘야 합니다.
services:
zookeeper:
image: wurstmeister/zookeeper
container_name: zookeeper
ports:
- "2181:2181"
kafka:
image: wurstmeister/kafka:2.12-2.5.0
container_name: kafka
ports:
- "9092:9092"
environment:
KAFKA_ADVERTISED_HOST_NAME: 127.0.0.1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
volumes:
- /var/run/docker.sock:/var/run/docker.sock
4. 백그라운드로 container 실행시키기
: docker-compose up -d
외부에서 사용할 수 있도록 설정 변경
- 터미널에서 도커 컨테이너 안으로 접속
- docker exec -it kafka /bin/bash
토픽 생성 및 삭제 - 메세지 전송 및 받기
// docker bash shell 로 들어가서 명령어 작성
// topic 생성
bash-5.1# kafka-topics.sh --create --zookeeper zookeeper:2181 --replication-factor 1 --partitions 1 --topic exam-topic
Created topic exam-topic.
// 생성한 topic 확인
bash-5.1# kafka-topics.sh --bootstrap-server localhost:9092 --list
exam-topic
// exam-topic 삭제
bash-5.1# kafka-topics.sh --delete --zookeeper zookeeper:2181 --topic exam-topic
Topic exam-topic is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.
// topic 삭제 확인
bash-5.1# kafka-topics.sh --bootstrap-server localhost:9092 --list
// 메세지 전송
bash-5.1# kafka-console-producer.sh --topic exam-topic --broker-list localhost:9092
>안녕하세요
[2024-10-07 07:39:45,177] WARN [Producer clientId=console-producer] Error while fetching metadata with correlation id 3 : {exam-topic=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
>반갑습니다.
// 새로운 터미널에서 메세지 받기
bash-5.1# kafka-console-consumer.sh --topic exam-topic --bootstrap-server localhost:9092 --from-beginning
안녕하세요
반갑습니다.
파이썬에서 메세지 주고 받기
- 가상 환경 생성
- python -m venv 가상환경 이름
- python3 -m venv 가상환경 이름 (mac 환경)
- 가상 환경 활성화
- source 가상환경이름/bin/activat
- kafka-python 설치
- pip3 install kafka-python
- 버전 오류 : pip install six==1.6.0
- pythonproducer.py 파일로 메세지 보내는 클래스 파일 생성
// pythonproducer.py
import sys
import six
if sys.version_info >= (3, 12, 0):
sys.modules['kafka.vendor.six.moves'] = six.moves
from kafka import KafkaProducer
import json
class MessageProducer:
def __init__(self, broker, topic):
self.broker = broker
self.topic = topic
# key_serializer=str.encode 를 추가하면 key 와 함께 전송
# 그렇지 않으면 value 만 전송
self.producer = KafkaProducer(
bootstrap_servers=self.broker,
value_serializer=lambda x: json.dumps(x).encode("utf-8"),
acks=0,
api_version=(2, 5, 0),
key_serializer=str.encode,
retries=3,
)
def send_message(self, msg, auto_close=True):
try:
print(self.producer)
future = self.producer.send(self.topic, value=msg, key="key")
self.producer.flush() # 비우는 작업
if auto_close:
self.producer.close()
future.get(timeout=2)
return {"status_code": 200, "error": None}
except Exception as exc:
raise exc
# 브로커와 토픽명을 지정
broker = ["localhost:9092"]
topic = "exam-topic"
pd = MessageProducer(broker, topic)
#전송할 메시지 생성
msg = {"name": "John", "age": 30}
res = pd.send_message(msg)
print(res)
- python3 pythonproducer.py 로 터미널에서 실행 시 이전에 실행해놓은 터미널에서 메세지 도착 확인 가능합니다.
// pythonconsumer.py
import sys
import six
if sys.version_info >= (3, 12, 0):
sys.modules['kafka.vendor.six.moves'] = six.moves
from kafka import KafkaConsumer
import json
class MessageConsumer:
def __init__(self, broker, topic):
self.broker = broker
self.consumer = KafkaConsumer(
topic, # Topic to consume
bootstrap_servers=self.broker,
value_deserializer=lambda x: x.decode(
"utf-8"
), # Decode message value as utf-8
group_id="my-group", # Consumer group ID
auto_offset_reset="earliest", # Start consuming from earliest available message
enable_auto_commit=True, # Commit offsets automatically
)
def receive_message(self):
try:
for message in self.consumer:
#print(message.value)
result = json.loads(message.value)
for k, v in result.items():
print(k, ":", result[k])
print(result["name"])
print(result["age"])
except Exception as exc:
raise exc
# 브로커와 토픽명을 지정한다.
broker = ["localhost:9092"]
topic = "python-topic"
cs = MessageConsumer(broker, topic)
cs.receive_message()
반응형
LIST