[Spring] Cloud Stream Multiple Binder

반응형

이번글에서는 Spring Cloud Stream 을 사용할 때 Multiple Binder 를 사용하는 방법에 대해 알아보겠습니다.

1. RabbitMQ Bindings & Binders

먼저 Bindings 와 Binders 에 대해 간단히 살펴보겠습니다.

일반적으로 rabbitMQ binder(message broker)를 한개만 사용할 경우의 config 설정이 다음과 같을 때

spring
  cloud:
    stream:
      bindings:
        "like.event":
          destination: like.event.exchange
          contentType: application/json
          binder: rabbit-like
      binders:
        rabbit-like:
          type: rabbit
          environment:
            spring:
              rabbitmq:
                host: 172.21.1.123
                virtual-host: /minholee93
                username: admin
                password: admin
      rabbit:
        bindings:
          "like.event":
            producer:
              exchangeType: fanout

bindings 와 binders 가 가지는 의미는 다음과 같습니다.

  • bindings : 메세지를 전송할 채널정보 (rabbitMQ - exchange / kafka - topic)
  • binders : 메시지 broker 정보 (rabbitMQ / kafka)

이때 destination 은 실제로 메시지를 보낼 binders(message broker)의 target exchange 혹은 topic 라고 이해하면 됩니다.

대략적인 흐름도는 다음과 같습니다.

procuder 에서는 다음과 같이 bindings 정보를 가지고 있는 deliver 를 선언하면

public interface LikeEventDeliver {

    String OUTPUT ="like.event";

    @Output(LikeEventDeliver.OUTPUT)
    MessageChannel output();
}

messageBuilder 를 사용해 간단히 메시지를 전송할 수 있습니다.

@Slf4j
@RequiredArgsConstructor
@EnableBinding(LikeEventDeliver.class)
@Service
public class LikeEventPublishImpl implements LikeEventPublish {

    private final LikeEventDeliver likeEventDeliver;

    private void publish(LikeEvent likeEvent) {

        try {
            likeEventDeliver
                    .output()
                    .send(MessageBuilder.withPayload(likeEvent).build());
        } catch (Exception e) {
            log.error("[ERROR] 메시지 전송 실패", e);
        }
    }
}

2. RabbitMQ Multiple Binders

만약 한개 이상의 binders(broker)를 사용한다면 다음과 같이 config 를 선언해 사용할 수 있습니다.

spring
  cloud:
    stream:
      bindings:
        "like.event":
          destination: like.event.exchange
          contentType: application/json
          binder: rabbit-like
        "dislike.event":
          destination: dislike.event.exchange
          contentType: application/json
          binder: rabbit-dislike
      binders:
        rabbit-like:
          type: rabbit
          environment:
            spring:
              rabbitmq:
                host: 172.21.1.123
                virtual-host: /minholee93
                username: admin
                password: admin
        rabbit-dislike:
          type: rabbit
          environment:
            spring:
              rabbitmq:
                host: 172.21.1.456
                virtual-host: /minholee93
                username: admin
                password: admin
      rabbit:
        bindings:
          "like.event":
            producer:
              exchangeType: fanout
          "dislike.event":
            producer:
              exchangeType: fanout
hystrix:
  stream:
    queue:
      enabled: false      

대략적인 흐름도는 다음과 같습니다.

주의점으론 만약 프로젝트 내에서 hystirx stream 을 사용하고 있을 경우

hystrixStreamOutput 에 hystirx 관련 metrix 을 자동으로 전송하게 되어있는데.

multiple binders를 사용하게되면.. hystrix 가 stream 관련 metrix 정보를 보낼 binder 를 선택하지 못해
(여러개의 binder 들 중 어디에 hystrixStream 을 보내야할까? 선택하지 못하는 는 듯하다..)

'no default binder has been set' 에러가 발생하게 됩니다.

org.springframework.context.ApplicationContextException: Failed to start bean 'outputBindingLifecycle'; nested exception is java.lang.IllegalStateException: A default binder has been requested, but there is more than one binder available for 'org.springframework.cloud.stream.messaging.DirectWithAttributesChannel' : rabbit-like,rabbit-dislike, and no default binder has been set.
    at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:185)
    at org.springframework.context.support.DefaultLifecycleProcessor.access$200(DefaultLifecycleProcessor.java:53)
    at org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.start(DefaultLifecycleProcessor.java:360)
    at org.springframework.context.support.DefaultLifecycleProcessor.startBeans(DefaultLifecycleProcessor.java:158)
    at org.springframework.context.support.DefaultLifecycleProcessor.onRefresh(DefaultLifecycleProcessor.java:122)
    at org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:893)
    at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.finishRefresh(ServletWebServerApplicationContext.java:161)
    at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:552)
    at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.refresh(ServletWebServerApplicationContext.java:140)
    at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:742)
    at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:389)
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:311)
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1213)
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1202)

위와 같은 상황에서는

다음과 같이 queue 에서만 hystirx stream 을 끄도록 설정하다면 위 문제를 간단히 해결할 수 있습니다.

hystrix:
  stream:
    queue:
      enabled: false

3. Kafka Bindings & Binders

kafka 도 rabbitMQ 와 동일하게 bindings 와 binders 를 선언해 cloud stream 을 사용할 수 있습니다.

spring:
  cloud:
    stream:
      bindings:
        "listen.event":
          destination: listen.event.topic
          contentType: application/json
          binder: kafka-listen
      binders:
        kafka-listen:
          type: kafka
          environment:
            spring:
              cloud:
                stream:
                  kafka:
                    binder:
                      brokers: "172.21.100.123:9092,172.21.100.456:9092,172.21.100.789:9092"
                      autoCreateTopics: false
      kafka:
        bindings:
          "listen.event":
            producer:
              configuration:
                acks: all
                compression.type: lz4
                linger.ms: 100
                batch.size: 16384
                max.in.flight.requests.per.connection: 5
                enable.idempotence: true
                retries: 1

이때 binder(cloud stream)에 설정가능한 항목들과 producer(kafka)에 설정가능한 값들이 각각 존재하는데.

각 입력 가능한 값들은 다음의 링크에서 확인할 수 있습니다.

당연히 kafka 와 rabbitmq 를 조합해 multiple binder 로도 사용할 수 있습니다.

이때의 config 예시는 다음과 같습니다.

spring:
  cloud:
    stream:
      bindings:
        "like.event":
          destination: like.event.exchange
          contentType: application/json
          binder: rabbit-like
        "dislike.event":
          destination: dislike.event.exchange
          contentType: application/json
          binder: rabbit-dislike
        "listen.event":
          destination: listen.event.topic
          contentType: application/json
          binder: kafka-listen
      binders:
        kafka-listen:
          type: kafka
          environment:
            spring:
              cloud:
                stream:
                  kafka:
                    binder:
                      brokers: "172.21.100.123:9092,172.21.100.456:9092,172.21.100.789:9092"
                      autoCreateTopics: false
        rabbit-like:
          type: rabbit
          environment:
            spring:
              rabbitmq:
                host: 172.21.1.123
                virtual-host: /minholee93
                username: admin
                password: admin
        rabbit-dislike:
          type: rabbit
          environment:
            spring:
              rabbitmq:
                host: 172.21.1.456
                virtual-host: /minholee93
                username: admin
                password: admin
      kafka:
        bindings:
          "listen.event":
            producer:
              configuration:
                acks: all
                compression.type: lz4
                linger.ms: 100
                batch.size: 16384
                max.in.flight.requests.per.connection: 5
                enable.idempotence: true
                retries: 1
     rabbit:
       bindings:
         "like.event":
           producer:
             exchangeType: fanout
         "dislike.event":
           producer:
             exchangeType: fanout
hystrix:
  stream:
    queue:
      enabled: false             

반응형

'Spring' 카테고리의 다른 글

[Spring] Sleuth Remove Message Header  (0) 2021.09.18
[Spring] Json with MultipartFile  (3) 2021.09.02

댓글

Designed by JB FACTORY