메시징 큐와 이벤트 스트림
- 생산자는 소비자의 큐로 데이터를 직접 푸시한다.
- 소비자가 데이터를 읽어갈 때 큐에서 데이터를 삭제한다.
- 일대일(1:1) 상황에서 유리하다.
- 생산자는 스트림의 특정 저장소에 하나의 메시지를 보낼 수 있고, 소비자들은 스트림에서 같은 메시지를 풀(pull)해 갈 수 있다.
- 구독자가 읽어간 데이터는 저장소의 설정에 따라 특정 기간 동안 저장될 수 있다.
- 다대다(n:n) 상황에서 유리하다.
레디스를 메시지 브로커로 사용하기
레디스에서 제공하는 pub/sub를 사용하면 빠르고 간단한 방식으로 메시지를 전달할 수 있는 메시지 브로커를 구현할 수 있다.
- 모든 데이터는 한 전 채널 전체에 전파된 뒤 삭제된다. (일회성)
- fire-and-forget 패턴이 필요한 상황에서 유용하게 사용할 수 있다. (ex. 알림 서비스)
레디스의 자료구조를 활용해서 메시지 브로커를 구현할 수 있다.
- 메시징 큐: list 자료구조
- 스트림 플랫폼: stream
레디스의 pub/sub
- 레디스에서 pub/sub은 매우 가볍다.
- 최소한의 메시지 전달 기능만 제공한다. (메타데이터 등 저장X)
- 단순히 메시지의 통로 역할만 한다.
메시지 publish하기
> PUBLISH hello world
(integer) 1 # 메시지를 수신한 구독자의 수
hello라는 채널을 수신하고 있는 모든 서버들에 world라는 메시지가 전파된다.
메시지 구독하기
> SUBSCRIBE event1 event2
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "event1"
3) (integer) 1
1) "subscribe"
2) "event2"
3) (integer) 2
SUBSCRIBE 커맨드를 통해 특정 채널을 구독한다. (event1, event2 동시 구독)
> PSUBSCRIBE mail-*
Reading messages... (press Ctrl-C to quit)
1) "psubscribe"
2) "mail-*"
3) (integer) 1
PSUBSCRIBE를 통해 앞부분이 mail-로 시작하는 모든 채널에 전파된 메시지를 모두 수신할 수 있다.
클러스터 구조에서의 pub/sub
클러스터는 레디스가 자체적으로 제공하는 데이터 분산 형태의 구조다.
- 클러스터 구조에서 메시지가 전파되는 모습이다.
- 하나의 노드에 메시지를 발행하면 메시지는 모든 노드에 전파된다.
- 모든 레디스 노드에 복제되는 방식이므로 불필요한 리소스 사용과 네트워크 부하가 발생할 수 있다.
sharded pub/sub
위와 같은 비효율을 해결하기 위해 레디스 7.0부터 sharded pub/sub 기능이 도입됐다.
SPUBLISH 커맨드로 발행된 메시지는 모든 노드에 전파되지 않으며, 노드의 복제본에만 전달된다.
10.0.0.1:6379> SPUBLISH apple a
-> Redirected to slot [7092] located at 10.0.0.2:6379
(integer) 1
10.0.0.2:6379>
데이터를 전파하려고 할 때, 연결된 노드에서 지정한 채널에 전파할 수 없다는 메시지와 함께 연결된 노드로 리다이렉트된다.
10.0.0.1:6379> SSUBSCRIBE apple
Reading messages... (press Ctrl-C to quit)
-> Redirected to slot [7092] located at 10.0.0.2:6379
Reading messages... (press Ctrl-C to quit)
1) "ssubscribe"
2) "apple"
3) (integer) 1
1) "smessage"
2) "apple"
3) "a"
apple 채널은 apple 키 값을 할당받을 수 있는 슬롯을 포함한 마스터 노드에 연결될 수 있도록 리다이렉트된다.
레디스의 list를 메시징 큐로 사용하기
레디스의 자료 구조 중 하나인 list는 큐로 사용하기 적절한 자료 구조다.
list의 EX 기능
트위터는 각 유저의 타임라인 캐시 데이터를 레디스에서 list 자료 구조로 관리한다.
유저A가 새로운 트윗을 작성하면 그 데이터는 A를 팔로우하는 유저의 타임라인 캐시에 저장된다.
> RPUSHX Timelinecache:userB data3
(integer) 26
> RPUSH Timelinecache:userC data3
(integer) 5
> RPUSH Timelinecache:userD data3
(integer) 0
RPUSH는 list가 이미 존재할 때에만 아이템을 추가한다. (자주 들어오지 않는 유저의 캐시라인은 관리할 필요가 없으므로)
list의 블로킹 기능
이벤트 기반 구조에서 이벤트 루프를 돌며 신규로 처리할 이벤트가 있는지 체크한다.
> BRPOP queue:a 5
1) "queue:a"
2) "data"
queue:a에 데이터가 입력될 때까지 최대 5초 동안 대기하고, 5초가 경과하면 nil을 반환한다.
BRPOP은 2개의 데이터를 반환한다. (키 값, 반환된 데이터의 값)
> BRPOP queue:a queue:b queue:c timeout 1000
1) "queue:b"
2) "DATA" # queue:b에 신규로 들어온 DATA라는 값을 반환
(19.89s) # 19.98초 동안 세 리스트에 데이터가 입력되는 것을 기다림
BRPOP은 1,000초 동안 queue:a, queue:b, queue:c 중 어느 하나라도 데이터가 들어올 때까지 기다린 뒤, 그 중 하나의 리스트에 데이터가 들어오면 해당 값을 읽어온다.
list를 이용한 원형 큐
특정 아이템을 계속해서 반복 접근해야 하는 클라이언트, 혹은 여러 개의 클라이언트가 병렬적으로 같은 아이템에 접근해야 하는 클라이언트에서는 원형 큐를 이용해 아이템을 처리할 수 있다.
> LPUSH clist A
(integer) 1
> LPUSH clist B
(integer) 2
> LPUSH clist C
(integer) 3
> LRANGE clist 0 -1
1) "C"
2) "B"
3) "A"
> RPOPLPUSH clist clist
"A"
> LRANGE clist 0 -1
1) "A"
2) "C"
3) "B"
list에서 RPOPLPUSH 커맨드를 사용해서 간편하게 원형 큐를 사용할 수 있다.
Stream
레디스의 Stream과 아파치 카프카
Stream은 레디스 5.0에서 새로 추가된 자료 구조로 대용량, 대규모의 메시징 데이터를 빠르게 처리할 수 있도록 설계됐다.
- 대량의 데이터를 효율적으로 처리하는 플랫폼으로 활용할 수 있다.
- 여러 생산자가 생성한 데이터를 다양한 소비자가 처리할 수 있게 지원하는 데이터 저장소 및 중간 큐잉 시스템으로 사용할 수 있다.
스트림이란?
스트림이란 연속적인 데이터의 흐름, 일정한 데이터 조각의 연속을 의미한다.
10GB의 텍스트 파일을 처리하는 애플리케이션에서 바이트 스트림을 처리하는 과정이다. 파일 하나는 유한하지만 이를 읽어올 때 애플리케이션은 단어 단위, 또는 줄 단위로 데이터를 잘게 쪼개서 처리하기 때문에 프로그램은 바이트 스트림을 처리하는 것으로 볼 수 있다.
채팅 프로그램에서 JSON 파일을 스트리밍하는 과정을 나타낸다. 채팅 앱에서 사용자는 아무때나 채팅을 보낼 수 있으며, 메신저 서버는 사람들이 계속 채팅하는 동안 끝없이 데이터를 처리할 수 있어야 한다.
외부에서 인풋을 받아오는 스트리밍 처리 방식에서도, 애플리케이션 내부에서 서버 간 데이터의 이동이 필요할 수 있다.
데이터의 저장
메시지의 저장과 식별
- 카프카에서 스트림 데이터는 토픽이라는 개념에 저장된다.
- 레디스에서는 하나의 stream 자료 구조가 하나의 stream을 의미한다.
스트림 생성과 데이터 입력
-- 토픽 생성
$ kafka-topics --zookeeper 127.0.0.1:6000 --topic Email --create partitions 1 --replication-factor 1
-- 데이터 추가
$ kafka-console-consuer --brokers-list 127.0.0.1:7000 --topic Email
> "I am first email"
> "I am second email"
카프카에서 각 스트림은 토픽이라는 이름으로 관리된다. 생성자는 데이터를 토픽에 푸시하며, 소비자는 토픽에서 데이터를 읽어간다.
> XADD Email * subject "first" body "hello?"
"1659114481311-0" -- 데이터 ID --
레디스에서는 XADD 커맨드를 이용해 새로운 이름의 stream에 데이터를 저장하면 데이터의 저장과 동시에 stream 자료 구조가 생성된다.
데이터의 조회
- 카프카에서 소비자는 특정 토픽을 실시간으로 리스닝하며, 새롭게 토픽에 저장되는 메시지를 전달받을 수 있다.
- 레디스 stream에서는 실시간으로 처리되는 데이터를 리스닝하거나, ID를 이용해 필요한 데이터를 검색할 수 있다.
실시간 리스닝
XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]
XREAD 커맨드를 통해 실시간으로 stream에 저장되는 데이터를 읽어올 수 있다.
특정한 데이터 조회
XRANGE key start end [COUNT count]
XREVRANGE key end start [COUNT count]
XRANGE 커맨드를 통해 ID를 이용해 원하는 시간대의 데이터를 조회할 수 있다.
소비자와 소비자 그룹
같은 데이터를 여러 소비자에게 전달하는 것을 팬아웃(fan-out)이라 한다.
카프카에서 Email이라는 토픽에 3개의 소비자가 연결된 상황이다.
레디스 stream의 데이터를 여러 소비자가 읽어가는 상황이다.
소비자 그룹
카프카에서는 소비자 그룹에 여러 소비자를 추가할 수 있으며, 이때 소비자는 토픽 내의 파티션과 일대일로 연결된다.
레디스 stream은 카프카와는 달리 메시지가 전달되는 순서를 신경 쓰지 않아도 된다. 소비자 그룹 내의 한 소비자는 다른 소비자가 아직 읽지 않은 데이터만을 읽어간다.
ACK와 보류 리스트
메시지 브로커는 각 소비자에게 어떤 메시지까지 전달됐고, 전달된 메시지의 처리 유무를 인지하고 있어야 한다.
이메일 서비스1이라는 소비자가 2개의 메시지를 가져갔고, 서비스가 1개의 메시지를 가져갔다. 레디스 stream은 소비자별로 보류 리스트를 만들고, 어떤 소비자가 어떤 데이터를 읽어갔는지 인지하고 있다.
만약 이메일 서비스 2가 stream에게 데이터가 처리됐다는 뜻의 ACK를 보내면 레디스 stream은 이메일 서비스 2의 보류 리스트에서 ACK를 받은 메시지를 삭제한다.
메시지의 재할당
XCLAIM <key> <group> <consumer> <min-idle-time> <ID-1> <ID-2> ... <ID-N>
메시지가 보류 상태로 머무른 시간이 최소 대기 시간을 초과한 경우에만 소유권을 변경할 수 있도록 한다. (중복 할당 방지)
메시지의 자동 재할당
XAUTOCLAIM <key> <group> <consumer> <min-idle-time> <start> [COUNT count] [JUSTID]
소비자가 직접 보류했던 메시지 중 하나를 자동으로 가져와 처리할 수 있도록 한다. 할당 대기 중인 다음 메시지의 ID를 반환하는 방식으로 동작하기 때문에 반복적 호출이 가능하다.
메시지의 수동 재할당
메시지를 재할당할 경우 1씩 증가하게 된다. (counter++) 하지만 계속 할당되면서 counter가 특정 값에 도달하면 이 메시지를 특수한 다른 stream으로 보내, 관리자가 추후에 처리할 수 있도록 한다. 이러한 메시지를 dead letter라 부른다.
stream 상태 확인
XINFO 커맨드를 통해 stream의 여러 상태를 확인할 수 있고, 사용할 수 있는 기능은 help 커맨드로 확인할 수 있다.
'독서 > 개발자를 위한 레디스' 카테고리의 다른 글
[개발자를 위한 레디스] 8장 복제 (0) | 2024.11.23 |
---|---|
[개발자를 위한 레디스] 7장 레디스 데이터 백업 방법 (0) | 2024.11.07 |
[개발자를 위한 레디스] 5장 레디스를 캐시로 사용하기 (0) | 2024.10.19 |
[개발자를 위한 레디스] 4장 레디스 자료 구조 활용 사례 (0) | 2024.10.06 |
[개발자를 위한 레디스] 3장 레디스 기본 개념 (13) | 2024.09.14 |