[Spring] Cloud Stream Multiple Binder
- Spring
- 2021. 9. 18.
이번글에서는 Spring Cloud Stream 을 사용할 때 Multiple Binder 를 사용하는 방법에 대해 알아보겠습니다.
1. RabbitMQ Bindings & Binders
먼저 Bindings 와 Binders 에 대해 간단히 살펴보겠습니다.
일반적으로 rabbitMQ binder(message broker)를 한개만 사용할 경우의 config 설정이 다음과 같을 때
spring
cloud:
stream:
bindings:
"like.event":
destination: like.event.exchange
contentType: application/json
binder: rabbit-like
binders:
rabbit-like:
type: rabbit
environment:
spring:
rabbitmq:
host: 172.21.1.123
virtual-host: /minholee93
username: admin
password: admin
rabbit:
bindings:
"like.event":
producer:
exchangeType: fanout
bindings 와 binders 가 가지는 의미는 다음과 같습니다.
- bindings : 메세지를 전송할 채널정보 (rabbitMQ - exchange / kafka - topic)
- binders : 메시지 broker 정보 (rabbitMQ / kafka)
이때 destination 은 실제로 메시지를 보낼 binders(message broker)의 target exchange 혹은 topic 라고 이해하면 됩니다.
대략적인 흐름도는 다음과 같습니다.
procuder 에서는 다음과 같이 bindings 정보를 가지고 있는 deliver 를 선언하면
public interface LikeEventDeliver {
String OUTPUT ="like.event";
@Output(LikeEventDeliver.OUTPUT)
MessageChannel output();
}
messageBuilder 를 사용해 간단히 메시지를 전송할 수 있습니다.
@Slf4j
@RequiredArgsConstructor
@EnableBinding(LikeEventDeliver.class)
@Service
public class LikeEventPublishImpl implements LikeEventPublish {
private final LikeEventDeliver likeEventDeliver;
private void publish(LikeEvent likeEvent) {
try {
likeEventDeliver
.output()
.send(MessageBuilder.withPayload(likeEvent).build());
} catch (Exception e) {
log.error("[ERROR] 메시지 전송 실패", e);
}
}
}
2. RabbitMQ Multiple Binders
만약 한개 이상의 binders(broker)를 사용한다면 다음과 같이 config 를 선언해 사용할 수 있습니다.
spring
cloud:
stream:
bindings:
"like.event":
destination: like.event.exchange
contentType: application/json
binder: rabbit-like
"dislike.event":
destination: dislike.event.exchange
contentType: application/json
binder: rabbit-dislike
binders:
rabbit-like:
type: rabbit
environment:
spring:
rabbitmq:
host: 172.21.1.123
virtual-host: /minholee93
username: admin
password: admin
rabbit-dislike:
type: rabbit
environment:
spring:
rabbitmq:
host: 172.21.1.456
virtual-host: /minholee93
username: admin
password: admin
rabbit:
bindings:
"like.event":
producer:
exchangeType: fanout
"dislike.event":
producer:
exchangeType: fanout
hystrix:
stream:
queue:
enabled: false
대략적인 흐름도는 다음과 같습니다.
주의점으론 만약 프로젝트 내에서 hystirx stream 을 사용하고 있을 경우
hystrixStreamOutput 에 hystirx 관련 metrix 을 자동으로 전송하게 되어있는데.
multiple binders를 사용하게되면.. hystrix 가 stream 관련 metrix 정보를 보낼 binder 를 선택하지 못해
(여러개의 binder 들 중 어디에 hystrixStream 을 보내야할까? 선택하지 못하는 는 듯하다..)
'no default binder has been set' 에러가 발생하게 됩니다.
org.springframework.context.ApplicationContextException: Failed to start bean 'outputBindingLifecycle'; nested exception is java.lang.IllegalStateException: A default binder has been requested, but there is more than one binder available for 'org.springframework.cloud.stream.messaging.DirectWithAttributesChannel' : rabbit-like,rabbit-dislike, and no default binder has been set.
at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:185)
at org.springframework.context.support.DefaultLifecycleProcessor.access$200(DefaultLifecycleProcessor.java:53)
at org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.start(DefaultLifecycleProcessor.java:360)
at org.springframework.context.support.DefaultLifecycleProcessor.startBeans(DefaultLifecycleProcessor.java:158)
at org.springframework.context.support.DefaultLifecycleProcessor.onRefresh(DefaultLifecycleProcessor.java:122)
at org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:893)
at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.finishRefresh(ServletWebServerApplicationContext.java:161)
at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:552)
at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.refresh(ServletWebServerApplicationContext.java:140)
at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:742)
at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:389)
at org.springframework.boot.SpringApplication.run(SpringApplication.java:311)
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1213)
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1202)
위와 같은 상황에서는
다음과 같이 queue 에서만 hystirx stream 을 끄도록 설정하다면 위 문제를 간단히 해결할 수 있습니다.
hystrix:
stream:
queue:
enabled: false
3. Kafka Bindings & Binders
kafka 도 rabbitMQ 와 동일하게 bindings 와 binders 를 선언해 cloud stream 을 사용할 수 있습니다.
spring:
cloud:
stream:
bindings:
"listen.event":
destination: listen.event.topic
contentType: application/json
binder: kafka-listen
binders:
kafka-listen:
type: kafka
environment:
spring:
cloud:
stream:
kafka:
binder:
brokers: "172.21.100.123:9092,172.21.100.456:9092,172.21.100.789:9092"
autoCreateTopics: false
kafka:
bindings:
"listen.event":
producer:
configuration:
acks: all
compression.type: lz4
linger.ms: 100
batch.size: 16384
max.in.flight.requests.per.connection: 5
enable.idempotence: true
retries: 1
이때 binder(cloud stream)에 설정가능한 항목들과 producer(kafka)에 설정가능한 값들이 각각 존재하는데.
각 입력 가능한 값들은 다음의 링크에서 확인할 수 있습니다.
- binder : kafka binder properties
- producer.configuration : kafka producer configurations
당연히 kafka 와 rabbitmq 를 조합해 multiple binder 로도 사용할 수 있습니다.
이때의 config 예시는 다음과 같습니다.
spring:
cloud:
stream:
bindings:
"like.event":
destination: like.event.exchange
contentType: application/json
binder: rabbit-like
"dislike.event":
destination: dislike.event.exchange
contentType: application/json
binder: rabbit-dislike
"listen.event":
destination: listen.event.topic
contentType: application/json
binder: kafka-listen
binders:
kafka-listen:
type: kafka
environment:
spring:
cloud:
stream:
kafka:
binder:
brokers: "172.21.100.123:9092,172.21.100.456:9092,172.21.100.789:9092"
autoCreateTopics: false
rabbit-like:
type: rabbit
environment:
spring:
rabbitmq:
host: 172.21.1.123
virtual-host: /minholee93
username: admin
password: admin
rabbit-dislike:
type: rabbit
environment:
spring:
rabbitmq:
host: 172.21.1.456
virtual-host: /minholee93
username: admin
password: admin
kafka:
bindings:
"listen.event":
producer:
configuration:
acks: all
compression.type: lz4
linger.ms: 100
batch.size: 16384
max.in.flight.requests.per.connection: 5
enable.idempotence: true
retries: 1
rabbit:
bindings:
"like.event":
producer:
exchangeType: fanout
"dislike.event":
producer:
exchangeType: fanout
hystrix:
stream:
queue:
enabled: false
'Spring' 카테고리의 다른 글
[Spring] Sleuth Remove Message Header (0) | 2021.09.18 |
---|---|
[Spring] Json with MultipartFile (3) | 2021.09.02 |