Redis로 이벤트를 보내는데, PUBLISH가 있고 XADD도 있다. 둘 다 “메시지 전달”이라는 말이 붙어있고, 처음 보면 뭐가 다른지 모른다. 그냥 둘 중 하나 쓰면 되는 거 아닌가? 아니다. 잘못 고르면 메시지가 조용히 사라진다. 그리고 그게 언제 일어났는지 로그에도 안 남는다.
Q1. Redis에서 메시지를 보내는 방법이 두 가지라는데 뭐가 다른가?
Redis로 이벤트를 보내는데, PUBLISH가 있고 XADD도 있다. 뭘 써야 하나?
핵심을 한 줄로: Pub/Sub은 메시지를 저장하지 않는다. Stream은 저장한다.
Pub/Sub 동작:
PUBLISH notification-channel "{"type":"user_login","userId":1001}"
→ Redis 내부 채널 맵 조회 (O(N), N = 채널 구독자 수)
→ 현재 구독 중인 TCP 연결들에게 즉시 write
→ 전달 후 메시지는 Redis 어디에도 남지 않음
→ 구독자가 0명이라면? 그냥 버려짐. 리턴값은 0.
Stream 동작:
XADD order-stream * orderId 1001 status "payment_completed"
→ Redis 내부 radix tree에 노드 삽입
→ 키: 1719619200000-0 (타임스탬프ms-시퀀스번호, 자동 생성)
→ XTRIM 또는 MAXLEN으로 명시적 삭제 전까지 남아있음
→ 구독자가 0명이어도 메시지는 스트림에 살아있음
→ 나중에 연결된 소비자가 처음부터 읽을 수 있음
이 차이 하나에서 모든 것이 갈린다. 전달 보장 방식, 재처리 가능 여부, 오프라인 소비자 처리, 운영 복잡도까지 전부 “저장하느냐 안 하느냐”에서 나온다.
Pub/Sub은 라디오 방송이다. 지금 켜둔 사람만 듣는다. Stream은 팟캐스트다. 구독하면 예전 에피소드도 들을 수 있다.
Q2. Pub/Sub으로 알림을 보냈는데 수신자가 못 받았다. 왜?
결제 완료 알림을 Pub/Sub으로 구현했다. 배포 직후 일부 유저가 알림을 못 받았다는 CS가 들어왔다.
코드는 맞다. Redis는 에러가 없다. 그런데 못 받는다.
메시지가 유실되는 상황은 구체적으로 네 가지다.
상황 1: 배포 순간
롤링 배포 중 인스턴스 A가 내려가고 B가 뜨는 사이에 PUBLISH된 메시지는 아무도 못 받는다. A는 이미 연결이 끊겼고, B는 아직 구독을 등록하지 않았다. 그 0.3초 사이. 그 순간에 결제한 유저가 알림을 못 받는다. 배포를 매일 3회 한다면 매일 3번 이 창이 열린다.
상황 2: 네트워크 순단
구독자가 Redis와의 TCP 연결이 0.5초만 끊겨도 그 사이 PUBLISH된 메시지는 영원히 없다. 재연결하면 그 이후 메시지부터 받는다. 재연결 중에 온 메시지는 없다. 물어볼 방법도 없다.
상황 3: 느린 소비자
구독자가 메시지를 처리하는 속도보다 PUBLISH 속도가 빠르면, Redis 서버가 구독자에게 보내려는 메시지가 서버 측 버퍼에 쌓인다. client-output-buffer-limit pubsub 32mb 8mb 60이 기본값이다. 버퍼가 32mb를 초과하거나, 60초 동안 8mb 초과 상태가 지속되면 Redis가 연결을 강제로 끊는다. 버퍼에 있던 미전달 메시지는 모두 유실된다.
상황 4: Redis 재시작
채널 구독 정보는 메모리에만 있다. Redis가 재시작되면 모든 구독 정보가 사라진다. 소비자들이 각자 재연결하고 재구독을 완료할 때까지의 메시지는 없다.
# 상황 3 확인: 구독자 버퍼 현황
redis-cli CLIENT LIST | grep "flags=S"
# omem 컬럼이 버퍼 크기 (바이트)
# omem이 계속 증가하면 소비자가 처리를 못 따라가는 것
Pub/Sub을 써도 되는 케이스는 딱 하나다: at-most-once가 허용되는 경우.
- SSE 실시간 알림: 유실되면 클라이언트가 재연결 후 DB에서 미수신 알림을 조회한다.
- 캐시 무효화 브로드캐스트: 유실되면 다음 캐시 조회 때 DB에서 새로 적재된다. 결과는 같다.
- 모니터링 로그 스트리밍: 일부 로그 유실이 허용되는 시스템.
결제 완료 알림, 주문 처리 이벤트, 재고 차감 — 이런 것에 Pub/Sub을 쓰면 언젠가 반드시 유실이 생긴다.
Q3. Stream은 메시지를 어떻게 저장하나? append-only log가 뭔가?
append-only log라는 말만 들으면 뭔지 모르겠다. 쉽게 설명하면?
일기장이다. 일기는 오늘 날짜 옆에 쓰고, 어제 일기를 고치지 않는다. 계속 뒤에 추가된다. 그리고 날짜순으로 정렬되어 있어서 “3월 5일부터 3월 10일 사이” 같은 조회가 빠르다.
Redis Stream도 같다. 한 번 기록된 메시지는 수정할 수 없고, 항상 뒤에 추가된다. 메시지 ID가 타임스탬프 기반이라 순서가 자동으로 보장된다.
# 메시지 추가 (XADD)
XADD order-stream * orderId 1001 status payment_completed userId 5050
# 반환값: "1719619200000-0" (타임스탬프ms-시퀀스번호)
# 같은 밀리초에 두 번 추가하면
XADD order-stream * orderId 1002 status created
# 반환값: "1719619200000-1" (시퀀스번호 증가)
# 스트림 전체 메시지 수
XLEN order-stream
# 반환값: 2
# 범위 조회 (처음부터 최근까지)
XRANGE order-stream - + COUNT 10
# - 는 최솟값, + 는 최댓값
# 특정 시간 이후 메시지 (Unix timestamp ms 사용)
XRANGE order-stream 1719619200000 + COUNT 100
저장 위치는 Redis 메모리다. AOF 또는 RDB 설정이 있으면 디스크에도 기록된다. 하지만 기본 자료구조는 메모리다.
MAXLEN 없이 운영하면 메모리가 꽉 찬다. 꽉 차면 Redis는 maxmemory-policy에 따라 다른 데이터를 지우거나 에러를 반환한다. 반드시 크기를 제한해야 한다.
# 추가와 동시에 근사 트리밍 (권장)
XADD order-stream MAXLEN ~ 100000 * orderId 1001 status created
# ~ 옵션이 핵심이다
# 정확한 트리밍 (비권장 - 느림)
XADD order-stream MAXLEN 100000 * orderId 1001 status created
# 수동 트리밍
XTRIM order-stream MAXLEN ~ 100000
# 시간 기반 트리밍 (Redis 6.2+): 1시간 이전 메시지 제거
# 현재시각 Unix ms - 3600000
XTRIM order-stream MINID ~ 1719615600000
~(근사 트리밍)이 왜 필요한가?
Redis Stream 내부 구조는 radix tree 위에 listpack(연속 메모리 블록) 노드들을 얹은 형태다. 메시지 하나를 정확히 트리밍하려면 중간 노드를 쪼개야 한다. 이게 O(N) 작업이다. ~를 쓰면 전체 listpack 노드를 한 번에 버린다. 훨씬 빠르고 메모리 단편화도 적다. 근사값이라 MAXLEN 100000 설정 시 실제로는 103000개가 남을 수 있지만, 그 오차는 수용 가능하다.
Q4. Consumer Group이 왜 필요한가?
주문 처리 서버가 3대 있다. XREAD로 스트림에서 메시지를 읽으면 3대 모두 같은 메시지를 받는다. 같은 주문이 3번 처리된다.
# XREAD: 모든 소비자가 모든 메시지를 받음
XREAD COUNT 10 BLOCK 200 STREAMS order-stream 0
# → server-a도 읽음, server-b도 읽음, server-c도 읽음
# → orderId 1001이 3번 처리됨
# → 재고 3번 차감, 알림 3번 발송
Consumer Group이 이걸 막는다. 그룹 내에서 각 메시지는 소비자 하나에게만 전달된다.
Stream: [msg1] [msg2] [msg3] [msg4] [msg5] [msg6]
Consumer Group "order-processors":
consumer-1 (server-a): msg1, msg4 처리
consumer-2 (server-b): msg2, msg5 처리
consumer-3 (server-c): msg3, msg6 처리
각 메시지는 그룹 안에서 한 소비자에게만 간다
같은 스트림에 그룹을 여러 개 만들 수 있다. 각 그룹은 서로 완전히 독립적이다.
# 그룹 생성
# $ = 지금 이후 새 메시지부터
# 0 = 스트림 처음부터
XGROUP CREATE order-stream order-processors $ MKSTREAM
XGROUP CREATE order-stream audit-log 0 MKSTREAM
# order-processors: 주문 처리 담당, 새 메시지부터
# audit-log: 감사 로그 담당, 처음부터 읽음
# 같은 메시지를 두 그룹 모두 독립적으로 처리함
Spring Boot에서 Consumer Group 읽기:
@Configuration
public class StreamConsumerConfig {
@Bean
public StreamMessageListenerContainer<String, MapRecord<String, String, String>> streamListenerContainer(
RedisConnectionFactory connectionFactory) {
StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, MapRecord<String, String, String>> options =
StreamMessageListenerContainer.StreamMessageListenerContainerOptions
.builder()
.batchSize(50) // 한 번에 50개씩 읽기
.pollTimeout(Duration.ofMillis(200))
.build();
StreamMessageListenerContainer<String, MapRecord<String, String, String>> container =
StreamMessageListenerContainer.create(connectionFactory, options);
// Consumer Group 구독
container.receive(
Consumer.from("order-processors", "consumer-" + getInstanceId()),
StreamOffset.create("order-stream", ReadOffset.lastConsumed()),
this::handleOrderEvent
);
container.start();
return container;
}
private void handleOrderEvent(MapRecord<String, String, String> record) {
String orderId = record.getValue().get("orderId");
String status = record.getValue().get("status");
// 처리 로직
// Spring Data Redis는 예외 없으면 자동 XACK
}
}
Q5. XREADGROUP으로 읽고 XACK 안 하면 어떻게 되나?
XREADGROUP으로 가져간 메시지를 처리하다가 서버가 죽었다. 메시지는 어떻게 되나?
사라지지 않는다. PEL(Pending Entries List)에 남는다.
XREADGROUP으로 메시지를 가져간 순간, Redis는 그 메시지를 PEL에 등록한다. “이 소비자가 가져갔는데 아직 처리 완료 확인(XACK)이 없다”는 상태다.
PEL에 저장되는 정보:
메시지 ID: 1719619200000-0
소비자 이름: consumer-1
전달 시각: 1719619200100 (ms)
전달 횟수: 1
소비자가 죽으면 그 소비자의 PEL 항목이 그대로 남는다.
# PEL 전체 현황
XPENDING order-stream order-processors - + 100
# 출력: 메시지ID, 소비자명, 마지막전달ms, 전달횟수
# 특정 소비자의 PEL
XPENDING order-stream order-processors - + 100 consumer-1
XCLAIM으로 죽은 소비자의 미처리 메시지를 살아있는 소비자가 가져간다:
# 5분(300000ms) 이상 ACK 없는 메시지를 consumer-2로 재할당
XCLAIM order-stream order-processors consumer-2 300000 1719619200000-0
Spring Boot에서 PEL 재처리 스케줄러:
@Component
@RequiredArgsConstructor
public class PelRecoveryScheduler {
private final RedisTemplate<String, String> redisTemplate;
private static final String STREAM = "order-stream";
private static final String GROUP = "order-processors";
private static final long IDLE_MS = 300_000; // 5분
private static final int MAX_DELIVERY = 5;
@Scheduled(fixedDelay = 60_000) // 1분마다
public void recoverPendingMessages() {
// 5분 이상 ACK 없는 PEL 항목 조회
PendingMessagesSummary summary = redisTemplate.opsForStream()
.pending(STREAM, GROUP);
if (summary.getTotalPendingMessages() == 0) return;
List<PendingMessage> pending = redisTemplate.opsForStream()
.pending(STREAM, Consumer.from(GROUP, "*"),
Range.unbounded(), 100L);
for (PendingMessage pm : pending) {
long idleTime = System.currentTimeMillis() - pm.getIdleTimeSince().toEpochMilli();
if (idleTime < IDLE_MS) continue;
if (pm.getTotalDeliveryCount() >= MAX_DELIVERY) {
// 5회 초과: Poison Pill → DLQ 이동 후 ACK
moveToDlq(pm.getId().getValue());
redisTemplate.opsForStream().acknowledge(STREAM, GROUP, pm.getId());
continue;
}
// 현재 인스턴스가 재처리 담당
String myConsumer = "consumer-" + getInstanceId();
redisTemplate.opsForStream().claim(
STREAM, GROUP, myConsumer,
Duration.ofMillis(IDLE_MS),
pm.getId()
);
}
}
private void moveToDlq(String messageId) {
// DLQ Stream에 저장하거나 슬랙 알림
redisTemplate.opsForStream().add(
"order-stream-dlq",
Map.of("originalId", messageId, "reason", "max_delivery_exceeded")
);
}
}
PEL이 at-least-once 보장의 핵심이다. 소비자가 죽어도 메시지가 PEL에 남아있고, 재처리 스케줄러가 살아있는 소비자에게 재할당한다.
하지만 주의할 점이 하나 있다. 처리 불가능한 메시지(잘못된 포맷, 외부 서비스 영구 장애 등)를 XACK 없이 계속 놔두면 PEL이 무한히 쌓인다. 재처리 횟수 임계값을 두고, 초과한 메시지는 DLQ로 이동 후 반드시 XACK 해야 한다.
Q6. Spring Boot에서 Pub/Sub 리스너 스레드가 어떻게 되나? 잘못 구현하면?
Redis Pub/Sub 리스너를 달았더니 가끔 메시지가 누락된다. 왜?
RedisMessageListenerContainer의 기본 동작을 모르면 이 버그를 재현하기도 어렵다.
기본 설정에서 RedisMessageListenerContainer는 단일 스레드로 메시지를 디스패치한다. 그 단일 스레드가 리스너 메서드를 직접 호출한다. 리스너 메서드 안에서 DB 쿼리, HTTP 호출, 파일 I/O 같은 블로킹 작업을 하면?
Redis → 메시지 도착
→ 단일 디스패치 스레드 → 리스너 메서드 호출
→ DB 쿼리 (50ms)
→ HTTP 호출 (200ms)
→ 다음 메시지 처리... (250ms 지연)
→ 그 사이 온 메시지들이 Redis 서버의 클라이언트 버퍼에 쌓임
→ client-output-buffer-limit 초과
→ Redis가 연결 강제 종료
→ 버퍼의 미전달 메시지 모두 유실
→ 재연결 후 그 이후 메시지부터 수신
올바른 구현: 별도 TaskExecutor + 처리 오프로드
@Configuration
public class PubSubConfig {
@Bean
public ThreadPoolTaskExecutor pubSubTaskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(4);
executor.setMaxPoolSize(8);
executor.setQueueCapacity(500);
executor.setThreadNamePrefix("pubsub-");
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.initialize();
return executor;
}
@Bean
public RedisMessageListenerContainer messageListenerContainer(
RedisConnectionFactory connectionFactory,
NotificationListener listener,
ThreadPoolTaskExecutor pubSubTaskExecutor) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
// 이게 없으면 단일 스레드 - 블로킹 즉시 누락 발생
container.setTaskExecutor(pubSubTaskExecutor);
container.addMessageListener(
new MessageListenerAdapter(listener, "handleNotification"),
new ChannelTopic("user-notification-events")
);
container.addMessageListener(
new MessageListenerAdapter(listener, "handleCacheInvalidation"),
new ChannelTopic("cache-invalidation-events")
);
return container;
}
}
@Component
@RequiredArgsConstructor
public class NotificationListener {
private final NotificationProcessor processor;
// TaskExecutor 스레드에서 호출됨
// 여기서도 블로킹 작업은 @Async로 다시 오프로드
public void handleNotification(String message, String channel) {
// 비동기 오프로드 - 이 메서드는 즉시 리턴
processor.processAsync(message);
}
public void handleCacheInvalidation(String message, String channel) {
processor.invalidateCacheAsync(message);
}
}
@Service
public class NotificationProcessor {
@Async("notificationExecutor") // 별도 풀
public CompletableFuture<Void> processAsync(String message) {
// DB 조회, HTTP 호출 등 무거운 작업
return CompletableFuture.completedFuture(null);
}
}
발행 측에서 놓치기 쉬운 것:
@Service
@RequiredArgsConstructor
public class EventPublisher {
private final StringRedisTemplate redisTemplate;
public void publishOrderEvent(String orderId, String status) {
String payload = String.format(
"{\"orderId\":\"%s\",\"status\":\"%s\",\"timestamp\":%d}",
orderId, status, System.currentTimeMillis()
);
// PUBLISH의 반환값 = 현재 이 채널을 구독 중인 클라이언트 수
Long subscriberCount = redisTemplate.convertAndSend(
"order-status-events", payload
);
// 0이면 아무도 못 받은 것
// Pub/Sub에서는 이걸 감지해도 재전송 방법이 없다
if (subscriberCount == 0) {
log.warn("No subscribers for order event: orderId={}", orderId);
// 여기서 할 수 있는 건 로깅 뿐. 메시지는 이미 사라짐.
}
}
}
PUBLISH 반환값이 0이어도 예외가 발생하지 않는다. 조용히 0을 반환하고 끝이다. 이걸 모르면 “전송했는데 왜 못 받지?”를 한참 디버깅한다.
Q7. SSE 서비스에서 여러 서버 노드 간 이벤트 브로드캐스트에 뭘 써야 하나?
인스턴스가 3개인데, 클라이언트A는 인스턴스1에 SSE 연결, 이벤트는 인스턴스2에서 발생. 어떻게 A에게 전달하나?
[클라이언트A] <--SSE-- [인스턴스1] <--|
[클라이언트B] <--SSE-- [인스턴스2] <--+-- Redis Pub/Sub
[클라이언트C] <--SSE-- [인스턴스3] <--|
결제 완료 이벤트: 인스턴스2에서 발생
→ Redis에 PUBLISH
→ 3개 인스턴스 모두 메시지 수신
→ 인스턴스1: 클라이언트A가 연결되어 있으면 SSE로 전달
→ 인스턴스2: 클라이언트B에게 직접 전달
→ 인스턴스3: 클라이언트C에게 전달
Pub/Sub이 이 패턴에 적합한 이유 세 가지:
- SSE는 원래 베스트에포트다. 연결이 끊기면 클라이언트가 재연결하고 Last-Event-ID를 보내 미수신 분을 보완한다.
- 팬아웃이 자연스럽다.
PUBLISH하나로 모든 인스턴스가 동시에 받는다. - 지연이 극히 낮다. 실시간 알림에서 수ms 지연이 중요하다.
@Service
@RequiredArgsConstructor
public class SseNotificationService {
// 이 인스턴스에 연결된 클라이언트 맵
private final ConcurrentHashMap<String, SseEmitter> emitters = new ConcurrentHashMap<>();
private final StringRedisTemplate redisTemplate;
public SseEmitter subscribe(String userId, String lastEventId) {
SseEmitter emitter = new SseEmitter(0L); // 타임아웃 없음
emitters.put(userId, emitter);
emitter.onCompletion(() -> emitters.remove(userId));
emitter.onTimeout(() -> emitters.remove(userId));
emitter.onError(e -> emitters.remove(userId));
// 연결 확립 확인
try {
emitter.send(SseEmitter.event().name("connected").data("ok"));
} catch (IOException e) {
emitters.remove(userId);
return emitter;
}
// 재연결 시 lastEventId가 있으면 미수신 알림 보충
if (lastEventId != null && !lastEventId.isBlank()) {
replayMissedNotifications(userId, lastEventId, emitter);
}
return emitter;
}
// Redis Pub/Sub 메시지 도착 → 이 인스턴스에 연결된 클라이언트에게 전달
public void onRedisMessage(String payload) {
NotificationPayload notification = deserialize(payload);
SseEmitter emitter = emitters.get(notification.getUserId());
if (emitter == null) return; // 이 인스턴스에 연결 안 됨
try {
emitter.send(SseEmitter.event()
.id(notification.getEventId()) // Last-Event-ID 추적용
.name(notification.getType())
.data(notification.getData()));
} catch (IOException e) {
emitters.remove(notification.getUserId());
}
}
// 결제 완료 → 이 인스턴스에서 모든 인스턴스로 브로드캐스트
public void publishPaymentCompleted(String userId, String orderId) {
NotificationPayload payload = NotificationPayload.builder()
.eventId(UUID.randomUUID().toString())
.userId(userId)
.type("payment_completed")
.data(Map.of("orderId", orderId))
.build();
redisTemplate.convertAndSend("sse-notification-events", serialize(payload));
}
// Last-Event-ID 기반 미수신 알림 보충 (DB에서 조회)
private void replayMissedNotifications(String userId, String lastEventId,
SseEmitter emitter) {
List<Notification> missed = notificationRepository
.findAfterEventId(userId, lastEventId);
missed.forEach(n -> {
try {
emitter.send(SseEmitter.event()
.id(n.getEventId())
.name(n.getType())
.data(n.getData()));
} catch (IOException e) {
// 전송 실패 시 중단
throw new RuntimeException(e);
}
});
}
}
@Configuration
@RequiredArgsConstructor
public class SsePubSubConfig {
private final SseNotificationService notificationService;
@Bean
public RedisMessageListenerContainer sseListenerContainer(
RedisConnectionFactory connectionFactory) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
// SSE 전달은 빠른 I/O라 스레드 2개로 충분
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(2);
executor.setMaxPoolSize(4);
executor.setThreadNamePrefix("sse-redis-");
executor.initialize();
container.setTaskExecutor(executor);
container.addMessageListener(
(message, pattern) ->
notificationService.onRedisMessage(new String(message.getBody())),
new ChannelTopic("sse-notification-events")
);
return container;
}
}
Pub/Sub 유실 보완 전략: SSE의 Last-Event-ID 헤더. 클라이언트는 재연결 시 마지막으로 받은 이벤트 ID를 서버에 보낸다. 서버는 그 ID 이후의 미수신 알림을 DB에서 조회해 보내준다. Pub/Sub의 at-most-once를 애플리케이션 레벨에서 보완하는 방식이다.
Q8. Pub/Sub에서 클라이언트가 메시지를 못 받는 속도로 쌓이면?
이벤트가 폭발적으로 발생했다. 구독자 중 일부가 처리를 못 따라갔다. 어떻게 됐나?
Redis는 구독자에게 보낼 메시지를 서버 측 메모리 버퍼에 쌓아두다가 한계를 넘으면 연결을 끊는다.
# redis.conf 기본값
client-output-buffer-limit pubsub 32mb 8mb 60
의미:
- 버퍼가 32mb 초과 → 즉시 연결 강제 종료
- 60초 동안 버퍼가 8mb 초과 상태 지속 → 연결 강제 종료
연결이 끊기면:
- 버퍼에 있던 미전달 메시지 전부 유실
- 구독자가 재연결
- 재연결 이후 메시지부터 수신 시작
- 연결 끊긴 동안의 메시지: 영원히 없음
전형적인 실패 패턴:
브로드캐스트 이벤트 폭증 (예: 대규모 프로모션 시작 알림)
→ Redis: 구독자 100개에게 동시에 전달 시도
→ 느린 구독자 (DB 조회, 외부 API 호출): 처리 못 따라감
→ 해당 구독자의 서버 측 버퍼 급증
→ 32mb 초과 또는 60초 8mb 초과
→ Redis: 해당 구독자 연결 강제 종료
→ 버퍼의 미전달 메시지 소멸
→ 구독자 재연결 (보통 수초 이내)
→ 재연결 중 온 메시지 추가 유실
# 버퍼 모니터링
redis-cli CLIENT LIST | awk -F'[ =]' '{for(i=1;i<=NF;i++) if($i=="flags") flag=$(i+1); if($i=="omem") omem=$(i+1)} flag~/S|P/ {print NR, flag, "omem="omem}'
# flags=S(Slave) 또는 P(Pubsub) 클라이언트의 omem을 확인
# omem이 수 MB를 넘어가면 위험 신호
# 구독 현황 전체 조회
redis-cli PUBSUB CHANNELS
redis-cli PUBSUB NUMSUB notification-channel
redis-cli PUBSUB NUMPAT
튜닝 옵션:
# 버퍼 한계 올리기 (메모리 여유가 있을 때)
client-output-buffer-limit pubsub 64mb 16mb 120
# 이 설정이 의미하는 바:
# "64mb 이하이고, 120초 안에 16mb 이하로 줄어들면 괜찮다"
# 메모리를 더 쓰는 대신 연결 유지 시간을 늘려줌
소비자 측에서 처리를 @Async로 오프로드하면 수신 스레드가 빠르게 비워진다. 이게 가장 근본적인 대응이다.
하지만 이 모든 튜닝은 증상을 늦출 뿐이다. 메시지를 잃으면 안 되는 요구사항이라면 Pub/Sub 대신 Stream을 써야 한다. 버퍼 한계를 아무리 높여도, Stream의 PEL + XCLAIM 구조를 대체할 수 없다.
Q9. Redis Stream vs Kafka — 언제 Redis Stream으로 충분한가?
Redis Stream을 쓰다가 팀에서 “Kafka 써야 하지 않나?”는 얘기가 나왔다. 어떻게 판단하나?
솔직하게 비교한다.
Redis Stream이 맞는 경우:
- Redis를 이미 캐시로 쓰고 있다. 추가 인프라 없이 Stream 기능이 붙어온다.
- 이벤트 보존 기간이 짧다. 수 시간~하루 정도면 메모리 비용이 수용 가능하다.
- 처리량이 초당 수천 건 이하다. Redis 단일 스레드 I/O 모델에서도 이 정도는 무리가 없다.
- Consumer Group이 3개 이하다. 재처리 로직이 단순하다.
- Kafka 클러스터를 추가로 운영할 인력이 없다.
Kafka가 필요한 경우:
- 이벤트를 며칠~수 주 보존해야 한다. Kafka는 디스크 기반이라 메모리 한계가 없다.
- 초당 수만~수십만 건의 처리량이 필요하다. 파티션 단위 병렬 처리가 필요하다.
- 스키마 레지스트리(Avro, Protobuf)로 이벤트 형식을 엄격하게 관리해야 한다.
- 소비자 오프셋을 임의로 재설정해서 과거 이벤트를 재처리해야 한다.
- 이미 Kafka 기반 데이터 파이프라인(Kafka Connect, ksqlDB)에 통합해야 한다.
둘을 동시에 쓰는 구성이 현실에서 가장 자연스럽다:
결제 이벤트 → Kafka
(유실 불허, 장기 보존, 감사 로그, 데이터 파이프라인 연동)
주문 상태 알림 큐 → Redis Stream
(재처리 큐, 보존 12시간, Redis 이미 씀, Consumer Group 2개)
실시간 알림 팬아웃 → Redis Pub/Sub
(SSE 브로드캐스트, at-most-once 허용, 지연 최소화)
“Redis Stream이면 되는데 Kafka를 쓰는” 경우도 있고, “Kafka가 필요한데 Redis Stream으로 버티는” 경우도 있다. 판단 기준은 처리량, 보존 기간, 인프라 운영 비용의 세 가지다.
Redis Stream에서 Kafka로 마이그레이션하는 기준점: 메시지 보존 요구가 48시간을 넘거나, 처리량이 초당 1만 건을 넘을 때다.
Q10. 결론적으로 어떤 기준으로 선택하나?
판단을 하나의 질문으로 압축하면: “메시지를 잃어도 되나?”
잃어도 된다 (at-most-once):
→ Pub/Sub
→ SSE 실시간 알림, 캐시 무효화 브로드캐스트, 실시간 대시보드
잃으면 안 된다 (at-least-once):
→ Stream
→ 주문 처리, 결제 이벤트, 재고 차감, 감사 로그
두 번째 질문: “소비자가 각자 독립적으로 처리해야 하나, 아니면 모두 받아야 하나?”
팬아웃 (1개 이벤트 → N개 소비자 모두 받음):
→ Pub/Sub
→ "결제 완료" 이벤트를 알림 서비스, 분석 서비스, 마케팅 서비스 모두 받음
파티셔닝 (1개 이벤트 → N개 소비자 중 1개만 처리):
→ Stream + Consumer Group
→ "주문 생성" 이벤트를 주문 처리 서버 3대가 나눠서 처리
선택 매트릭스:
| 요구사항 | Pub/Sub | Stream |
|---|---|---|
| 전달 보장 | at-most-once | at-least-once |
| 메시지 영속 | 없음 | 있음 (MAXLEN 필수) |
| 오프라인 소비자 | 유실 | 나중에 읽기 가능 |
| 재처리 | 불가 | PEL + XCLAIM |
| 1→N 팬아웃 | 자연스러움 | 그룹마다 독립 소비 |
| N개 분산 처리 | 없음 | Consumer Group |
| 운영 복잡도 | 낮음 | PEL 모니터링 필요 |
| 지연 | 극히 낮음 (sub-ms) | 낮음 (수 ms) |
선택이 애매하면 Stream을 기본으로 간다. Pub/Sub의 장점인 낮은 운영 복잡도는 메시지 유실이 문제가 됐을 때 운영 복잡도가 훨씬 올라가는 것으로 돌아온다.
최적화 요약
Pub/Sub 최적화:
// TaskExecutor 분리 - 수신 스레드와 처리 스레드 분리
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(4);
executor.setMaxPoolSize(8);
executor.setQueueCapacity(500);
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
container.setTaskExecutor(executor);
// 채널 명명 전략: 이벤트 타입을 채널 이름에 포함
// 나쁜 예: "events", "notifications"
// 좋은 예: "user-login-events", "order-payment-completed", "cache-invalidation-product"
//
// 패턴 구독(PSUBSCRIBE "events-*") 은 채널 수가 늘어날수록 매칭 비용 증가
// 가능하면 명시적 채널(SUBSCRIBE "specific-channel") 사용
Stream 최적화:
# 배치 읽기: COUNT를 높여 네트워크 왕복 줄이기
XREADGROUP GROUP order-processors consumer-1 COUNT 100 BLOCK 200 STREAMS order-stream >
# COUNT 1씩 읽으면 처리량이 1/100이 됨
# 근사 트리밍: MAXLEN에 항상 ~ 붙이기
XADD order-stream MAXLEN ~ 100000 * field value
# ~ 없으면 매 XADD마다 listpack 노드 분할 작업 발생
# ACK 배치 처리
XACK order-stream order-processors id1 id2 id3 id4 id5
# 하나씩 ACK 보내는 것보다 배치가 훨씬 빠름
튜닝 체크리스트
# Pub/Sub: 느린 구독자 탐지
redis-cli CLIENT LIST | grep "flags=.*[SP]"
# omem 컬럼 확인 - 수 MB 이상이면 버퍼 축적 중
# Pub/Sub: 버퍼 설정 확인
redis-cli CONFIG GET client-output-buffer-limit
# Stream: PEL 크기 모니터링
redis-cli XPENDING order-stream order-processors - + 100
# PEL이 계속 증가하면 소비자 처리 속도 부족 또는 ACK 누락
# Stream: 스트림 현재 길이
redis-cli XLEN order-stream
# MAXLEN 설정값보다 지속적으로 크면 트리밍 동작 확인
# Stream: 전체 스트림 정보 (메모리 사용량, 그룹 수 포함)
redis-cli XINFO STREAM order-stream FULL
redis-cli XINFO GROUPS order-stream
# 전체 메모리 사용 현황
redis-cli INFO memory | grep -E "used_memory_human|maxmemory_human"
# 연결 현황
redis-cli INFO clients | grep -E "connected_clients|blocked_clients"
운영상 주의사항
Pub/Sub 채널 이름은 이벤트 타입이 드러나게. notification, events 같은 범용 이름은 나중에 채널이 늘어날 때 패턴 구독 외에는 구분할 방법이 없다. user-login-events, order-payment-completed, inventory-threshold-exceeded처럼 명확하게. 패턴 구독(PSUBSCRIBE)은 Redis가 모든 채널명을 glob 매칭해야 해서 채널 수가 많으면 느려진다.
Stream MAXLEN 없이 운영하면 안 된다. MAXLEN 없이 Stream을 운영하면 Redis 메모리가 꽉 찰 때까지 메시지가 쌓인다. maxmemory-policy allkeys-lru 설정이면 캐시 데이터가 먼저 evict되고, noeviction이면 새 쓰기가 에러를 반환한다. XADD에 MAXLEN ~ [크기]를 항상 붙이거나, 별도 스케줄러로 XTRIM을 주기적으로 실행한다. 운영 환경에서는 XLEN을 모니터링 지표로 수집해야 한다.
PEL 재처리 스케줄러는 반드시 구현한다. Spring Data Redis의 StreamMessageListenerContainer는 메시지 수신과 ACK를 처리하지만, PEL 재처리는 자동화하지 않는다. 소비자가 죽으면 그 소비자의 PEL 항목은 영원히 그대로다. 5분 이상 ACK 없는 메시지를 주기적으로 XPENDING으로 확인하고 XCLAIM으로 재할당하는 스케줄러가 없으면, 소비자 장애 시 메시지가 PEL에 갇혀 처리가 멈춘다.
Redis Cluster에서 클래식 Pub/Sub은 모든 노드로 브로드캐스트된다. PUBLISH한 메시지는 클러스터 버스를 통해 클러스터의 모든 노드로 전파된다. 따라서 어느 노드에 붙은 구독자든, 어느 노드에서 PUBLISH했든 상관없이 메시지를 받는다. 이 동작 덕분에 팬아웃은 편하지만, 모든 메시지가 모든 노드로 퍼지는 브로드캐스트 오버헤드가 생긴다. 노드 수와 트래픽이 커질수록 클러스터 버스 부하가 누적된다. 이 오버헤드를 줄이려고 Redis 7.0에서 Sharded Pub/Sub(SPUBLISH, SSUBSCRIBE)이 도입됐다. Sharded Pub/Sub은 채널을 슬롯에 매핑해서 해당 샤드(슬롯을 가진 노드와 그 Replica)에만 메시지를 전달하므로 브로드캐스트 범위를 좁힌다. 다만 발행자와 구독자가 같은 샤드를 봐야 하므로 채널 이름에 HashTag로 슬롯을 맞춰야 한다. Stream은 키가 특정 슬롯에 배치되므로 클러스터에서 사용할 때 HashTag({order}:stream)로 슬롯을 명시적으로 관리하는 것이 안전하다.