[RabbitMQ] Dirty Queue with Schedule

반응형

이번 글에서는 RabbitMQ Administartion API를 사용해 주기적으로 Dirty Queue를 체크해보겠습니다.

0. Dirty Queue란?

Queue 내부에 메시지가 남아있는 Queue를 의미합니다.

일반적인 경우에, 정상적으로 Consumer 들이 Queue의 메세지를 적절히 처리하고 있다면 Queue 내부에 메세지가 쌓여 있지 않습니다. (물론 prefetch 를 낮게 잡았다면 메세지는 consumer의 메모리가 아닌 Queue 내부에서 process를 기다릴 수 있습니다.)

따라서 Queue 내부에 메세지가 대량으로 쌓여있을 경우에는, 무엇인가 잘못되었을 확률이 있습니다. 이를 체크하고 대응하기위해서는 주기적으로 (예를들면 매일 자정) Dirty Queue를 확인한 후 적절한 대응이 필요합니다.

image.png

이를 구현하기 위해서는 Spring Boot의 @Scheduling 기능을 사용하면 됩니다. 또한, 전체 Queue의 정보를 받기위해선 rabbitmq administartion api를 사용하면 됩니다.

전체 구조를 살펴보면, 아래 그림처럼 @Schedule가 주기적으로 Proxy Server를 호출해 rabbitmq server의 전체 queue 정보를 요청합니다. 이때 queue의 정보를 담아올 그릇으로는 RabbitQueue라는 class를 만들어 사용하면 되겠습니다.

image.png

1. Enable Schedule

먼저 Schdule 기능을 사용하기위해 Application에 @EnableScheulding Annoation을 추가합니다.

@SpringBootApplication
@EnableScheduling
public class RabbitmqProducerApplication implements CommandLineRunner {

    public static void main(String[] args) {
        SpringApplication.run(RabbitmqProducerApplication.class, args);
    }

    @Override
    public void run(String... args) {

    }
}

2. Scheduler

위에서 @EnableScheduling을 선언했으므로 아래의 sweepDrityQueues() 메서드가 9초마다 주기적으로 실행됩니다. sweepDirtyQueues() 메서드는 proxy server의 getAllQueues() 메서드를 호출해 Queue 내부에 메세지가 남아있는 Dirty Queue들을 반환합니다. 실제 운영하는 어플리케이션이라면, 확인된 dirty queue에 대해 적절한 조치를 취하면 됩니다.

@Service
public class RabbitmqScheduler {

    private static final Logger log = LoggerFactory.getLogger(RabbitmqScheduler.class);

    @Autowired
    private RabbitmqProxyService rabbitmqProxyService;

    @Scheduled(fixedDelay = 90000)
    public void  sweepDirtyQueues(){

        var dirtyQueues = rabbitmqProxyService.getAllQueues().stream().filter(q-> q.isDirty()).collect(Collectors.toList());

        dirtyQueues.forEach(q-> log.info("Queue {} has {} unporcessd message", q.getName(), q.getMessages() ));

    }
}

3. Proxy Server

proxy 서버란 클라이언트가 자신을 통해서 다른 네트워크 서비스에 간접적으로 접속할 수 있게 해 주는 컴퓨터 시스템이나 응용 프로그램을 가리킵니다.

우리는 spring application에서 WebClient를 통해 rabbitmq administration(management) server와 통신할 proxy server를 생성합니다.

@Service
public class RabbitmqProxyService {

    private static final Logger Log = LoggerFactory.getLogger(RabbitmqProxyService.class);

    public List<RabbitmqQueue> getAllQueues() {
        var webClient = WebClient.create("http://localhost:15672/api/queues");

        return webClient.get().header("Authorization", createBasicAuthHeaders()).retrieve()
                .bodyToMono(new ParameterizedTypeReference<List<RabbitmqQueue>>() {
                }).block(Duration.ofSeconds(10));
    }

    public String createBasicAuthHeaders() {
        // username:password for rabbitmq
        var auth = "id:password";
        return "Basic " + Base64.getEncoder().encodeToString(auth.getBytes());
    }

} 

여기서 rabbitmq adminstartion api를 사용하기위해선 "Authorization" 정보를 같이 전달해줘야 합니다. "Authorization" 정보란 rabbitmq server에 등록되어 있는 아이디와 비밀번호를 base64로 encode한 값을 의미합니다.

createBasicAuthHeaders() 메서드의 auth ="id:password"에 각자 rabbitmq 서버의 아이디와 비밀번호를 아래와 같이 입력하면 됩니다.

  public String createBasicAuthHeaders() {
        // username:password for rabbitmq
        var auth = "id:password";
        return "Basic " + Base64.getEncoder().encodeToString(auth.getBytes());
    }

4. Rabbitmq Queue Class

rabbitmq administartion api를 통해 전달받은 queue의 정보를 담을 그릇은 아래와 같습니다. 이번글에서는 queue의 이름과 queue의 메세지의 길이만 필요함으로 아래와 같이 두개의 필드만 생성했습니다.

@JsonIgnoreProperties(ignoreUnknown = true)
public class RabbitmqQueue {

    // message count
    @JsonProperty
    private long messages;

    @JsonProperty
    private String name;

    public long getMessages() {
        return messages;
    }

    public void setMessages(long messages) {
        this.messages = messages;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public boolean isDirty() {
        return messages > 0;
    }

}

5. Spring boot @Scheduled

spring boot에서 사용할 수 있는 @Scheduled에는 여러가지가 있습니다.

5-1) @Scheduled(fixedDelay = N)

이전 execution으로부터의 delay 시간을 의미합니다. 예를 들어 다음 execution이 실행되는 시간은 이전 execution이 완료된 후 N 시간이 지난이후입니다.

5-2) @Scheduled(fixedRate = N)

어플리케이션이 실행된후 매 N 시간마다 execution을 실행합니다. 예를 들어 0초에 이전 execution이 실행되었다면 0+N 시간 후 다음 execution이 실행됩니다.

5-3) @Scheduled(cron="0 * * * * *")

특정 시간에 주기적으로 execution을 실행합니다. 순서대로 "초 분 시 일 월 요일 연도"를 입력하면 됩니다.

0~59 | 0~59 | 0~23 | 1~31 | 1~12 | 0~6 | 생략가능

참고 자료 : https://www.udemy.com/course/rabbitmq-java-spring-boot-for-system-integration/


추천서적

 

RabbitMQ 따라잡기:AMQP 기반의 오픈소스 메시지 브로커

COUPANG

www.coupang.com

파트너스 활동을 통해 일정액의 수수료를 제공받을 수 있음


반응형

'RabbitMQ' 카테고리의 다른 글

[RabbitMQ] RabbitMQ vs Kafka  (0) 2020.08.27
[RabbitMQ] Retry Mechanism with Spring Boot  (0) 2020.08.22
[RabbitMQ] Publisher API  (0) 2020.08.22
[RabbitMQ] Retry Mechanism  (0) 2020.08.22
[RabbitMQ] Dead Letter Exchange & TTL(Time To Live)  (0) 2020.08.22

댓글

Designed by JB FACTORY