[RabbitMQ] Retry Mechanism


This post covers a retry mechanism for message processing in RabbitMQ, built from a wait queue and a dead queue.

0. What Is a Retry Mechanism?

A retry mechanism is a structured way of handling messages whose processing has failed.

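In the previous post, I mentioned that a message that fails processing should be sent to a DLX (Dead Letter Exchange) and go through appropriate error handling. Made more concrete, that idea becomes the retry mechanism shown in the figure below.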

image.png

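✔ Retry Mechanism
  0. The "work exchange" delivers the message to the "work queue".
  1. A consumer processes the message from the "work queue".
  2. If processing fails, the consumer checks the message's retry count.
  3. If the retry count is N or more, the message is sent to the "dead exchange".
  4. If the retry count is less than N, the message is sent to the "wait exchange".
  5. A message that has stayed in the "wait queue" for the full TTL is sent back to the "work exchange".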

Making the message sit in the wait queue before each retry slightly lowers the chance of running into the same error again.
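As a rough illustration of steps 2 to 4, the decision can be written as a small pure function. This is only a sketch: the class name RetryRoutingSketch, the Destination enum, and the method routeFailedMessage are made up for this example, and the comparison assumes the same ">=" check that the error handler later in this post uses.

```java
// Hypothetical sketch of steps 2-4: decide where a failed message should go.
final class RetryRoutingSketch {

    enum Destination { WAIT_EXCHANGE, DEAD_EXCHANGE }

    // retryCount: how many times the message has already been retried
    // maxRetryCount: the limit N from the list above
    static Destination routeFailedMessage(int retryCount, int maxRetryCount) {
        return retryCount >= maxRetryCount
                ? Destination.DEAD_EXCHANGE
                : Destination.WAIT_EXCHANGE;
    }

    public static void main(String[] args) {
        int maxRetryCount = 3;
        for (int retryCount = 0; retryCount <= maxRetryCount; retryCount++) {
            System.out.println("retry count " + retryCount + " -> "
                    + routeFailedMessage(retryCount, maxRetryCount));
        }
    }
}
```

With N = 3, a message is retried three times and goes to the dead exchange on the failure after that.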

The exchanges are structured as follows.

image.png

The queues are structured as follows.

image.png
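Since the figures are not reproduced here, the sketch below shows one way the queue arguments for this structure might look. The keys "x-dead-letter-exchange" and "x-message-ttl" are standard RabbitMQ queue arguments; the exchange names "x.guideline.work" and "x.guideline.wait", the 10-second TTL, and the class RetryTopologySketch are assumptions made for illustration only.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: queue arguments that wire work -> wait -> work.
final class RetryTopologySketch {

    // Work queue: a rejected message is dead-lettered to the wait exchange.
    static Map<String, Object> workQueueArguments(String waitExchange) {
        Map<String, Object> arguments = new HashMap<>();
        arguments.put("x-dead-letter-exchange", waitExchange);
        return arguments;
    }

    // Wait queue: no consumer; after the TTL the message is dead-lettered
    // back to the work exchange.
    static Map<String, Object> waitQueueArguments(String workExchange, int ttlMillis) {
        Map<String, Object> arguments = new HashMap<>();
        arguments.put("x-dead-letter-exchange", workExchange);
        arguments.put("x-message-ttl", ttlMillis);
        return arguments;
    }

    public static void main(String[] args) {
        // Assumed names, not taken from the original figures.
        System.out.println("work queue: " + workQueueArguments("x.guideline.wait"));
        System.out.println("wait queue: " + waitQueueArguments("x.guideline.work", 10_000));
    }
}
```

These maps would be passed as the arguments parameter when declaring each queue. The dead queue needs no special arguments because messages only arrive there once they will not be retried.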

1. RabbitMQ Message Header

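To implement the retry mechanism, we need to know how many times processing of a message has already been attempted.

RabbitMQ records this in the x-death message header, which the broker adds whenever a message is dead-lettered. We can read it to decide whether to send the message to the wait exchange or to the dead exchange.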

image.png

✔ All source code is taken from https://www.udemy.com/course/rabbitmq-java-spring-boot-for-system-integration/.

RabbitMQ Header

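import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Optional;

import org.apache.commons.lang3.StringUtils;

/**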
 * Represents RabbitMQ Header. Tested on RabbitMQ 3.7.x.
 * 
 * @author timpamungkas
 */
public class RabbitmqHeader {

    private static final String KEYWORD_QUEUE_WAIT = "wait";
    private List<RabbitmqHeaderXDeath> xDeaths = new ArrayList<>(2);
    private String xFirstDeathExchange = StringUtils.EMPTY;
    private String xFirstDeathQueue = StringUtils.EMPTY;
    private String xFirstDeathReason = StringUtils.EMPTY;

    @SuppressWarnings("unchecked")
    public RabbitmqHeader(Map<String, Object> headers) {
        if (headers != null) {
            var xFirstDeathExchange = Optional.ofNullable(headers.get("x-first-death-exchange"));
            var xFirstDeathQueue = Optional.ofNullable(headers.get("x-first-death-queue"));
            var xFirstDeathReason = Optional.ofNullable(headers.get("x-first-death-reason"));

            xFirstDeathExchange.ifPresent(s -> this.setxFirstDeathExchange(s.toString()));
            xFirstDeathQueue.ifPresent(s -> this.setxFirstDeathQueue(s.toString()));
            xFirstDeathReason.ifPresent(s -> this.setxFirstDeathReason(s.toString()));

            var xDeathHeaders = (List<Map<String, Object>>) headers.get("x-death");

            if (xDeathHeaders != null) {
                for (Map<String, Object> x : xDeathHeaders) {
                    RabbitmqHeaderXDeath hdrDeath = new RabbitmqHeaderXDeath();
                    var reason = Optional.ofNullable(x.get("reason"));
                    var count = Optional.ofNullable(x.get("count"));
                    var exchange = Optional.ofNullable(x.get("exchange"));
                    var queue = Optional.ofNullable(x.get("queue"));
                    var routingKeys = Optional.ofNullable(x.get("routing-keys"));
                    var time = Optional.ofNullable(x.get("time"));

                    reason.ifPresent(s -> hdrDeath.setReason(s.toString()));
                    count.ifPresent(s -> hdrDeath.setCount(Integer.parseInt(s.toString())));
                    exchange.ifPresent(s -> hdrDeath.setExchange(s.toString()));
                    queue.ifPresent(s -> hdrDeath.setQueue(s.toString()));
                    routingKeys.ifPresent(r -> {
                        var listR = (List<String>) r;
                        hdrDeath.setRoutingKeys(listR);
                    });
                    time.ifPresent(d -> hdrDeath.setTime((Date) d));

                    xDeaths.add(hdrDeath);
                }
            }
        }
    }

    public int getFailedRetryCount() {
        // get from queue "wait"
        for (var xDeath : xDeaths) {
            if (xDeath.getExchange().toLowerCase().endsWith(KEYWORD_QUEUE_WAIT)
                    && xDeath.getQueue().toLowerCase().endsWith(KEYWORD_QUEUE_WAIT)) {
                return xDeath.getCount();
            }
        }

        return 0;
    }

    public List<RabbitmqHeaderXDeath> getxDeaths() {
        return xDeaths;
    }

    public String getxFirstDeathExchange() {
        return xFirstDeathExchange;
    }

    public String getxFirstDeathQueue() {
        return xFirstDeathQueue;
    }

    public String getxFirstDeathReason() {
        return xFirstDeathReason;
    }

    public void setxDeaths(List<RabbitmqHeaderXDeath> xDeaths) {
        this.xDeaths = xDeaths;
    }

    public void setxFirstDeathExchange(String xFirstDeathExchange) {
        this.xFirstDeathExchange = xFirstDeathExchange;
    }

    public void setxFirstDeathQueue(String xFirstDeathQueue) {
        this.xFirstDeathQueue = xFirstDeathQueue;
    }

    public void setxFirstDeathReason(String xFirstDeathReason) {
        this.xFirstDeathReason = xFirstDeathReason;
    }

}

😎!!?

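Looking closely at getFailedRetryCount() in the header class above, it only checks the retry count produced by the wait exchange and wait queue. A RabbitMQ message header carries many other values (x-death holds a separate entry for each queue and reason the message was dead-lettered from), so to evaluate the retry criterion correctly we have to pick out only the x-death entry created by the wait exchange and wait queue.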

public int getFailedRetryCount() {
    // get from queue "wait"
    for (var xDeath : xDeaths) {
        if (xDeath.getExchange().toLowerCase().endsWith(KEYWORD_QUEUE_WAIT)
                && xDeath.getQueue().toLowerCase().endsWith(KEYWORD_QUEUE_WAIT)) {
            return xDeath.getCount();
        }
    }

    return 0;
}
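To make the filtering concrete, here is a small self-contained sketch that runs the same idea against a hand-built x-death list. The sample values (queue and exchange names, counts) and the names XDeathSketch, entry, and failedRetryCount are invented for illustration. The keys "queue", "exchange", "reason", and "count" are the ones the class above reads, and "expired" and "rejected" are the reasons RabbitMQ reports for TTL expiry and consumer rejection.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: pick the retry count out of a sample x-death header.
final class XDeathSketch {

    // Builds one x-death entry shaped like the maps the broker puts in the header.
    static Map<String, Object> entry(String queue, String exchange, String reason, long count) {
        Map<String, Object> e = new HashMap<>();
        e.put("queue", queue);
        e.put("exchange", exchange);
        e.put("reason", reason);
        e.put("count", count);
        return e;
    }

    // Null-safe variant of the lookup: only the entry whose exchange and queue
    // both end with the wait suffix counts as a retry.
    static int failedRetryCount(List<Map<String, Object>> xDeath, String waitSuffix) {
        if (xDeath == null) {
            return 0; // never dead-lettered, so this is the first attempt
        }
        for (Map<String, Object> e : xDeath) {
            Object exchange = e.get("exchange");
            Object queue = e.get("queue");
            Object count = e.get("count");
            if (exchange != null && queue != null && count instanceof Number
                    && exchange.toString().toLowerCase().endsWith(waitSuffix)
                    && queue.toString().toLowerCase().endsWith(waitSuffix)) {
                return ((Number) count).intValue();
            }
        }
        return 0;
    }

    public static void main(String[] args) {
        // Assumed sample: the message expired twice in the wait queue and was
        // rejected three times from the work queue.
        List<Map<String, Object>> xDeath = List.of(
                entry("q.guideline.image.wait", "x.guideline.image.wait", "expired", 2L),
                entry("q.guideline.image.work", "x.guideline.image.work", "rejected", 3L));
        System.out.println("retry count = " + failedRetryCount(xDeath, "wait"));
    }
}
```

The work queue entry is skipped on purpose: its count tracks rejections rather than completed wait-and-retry cycles, which is why only the wait entry is used as the retry count.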

RabbitMQ Header XDeath

import java.util.Date;
import java.util.List;

/**
 * Represents RabbitMQ Header, part x-death. Tested on RabbitMQ 3.7.x.
 * 
 * @author timpamungkas
 */
public class RabbitmqHeaderXDeath {

    private int count;
    private String exchange;
    private String queue;
    private String reason;
    private List<String> routingKeys;
    private Date time;

    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (obj == null) {
            return false;
        }
        if (getClass() != obj.getClass()) {
            return false;
        }
        RabbitmqHeaderXDeath other = (RabbitmqHeaderXDeath) obj;
        if (count != other.count) {
            return false;
        }
        if (exchange == null) {
            if (other.exchange != null) {
                return false;
            }
        } else if (!exchange.equals(other.exchange)) {
            return false;
        }
        if (queue == null) {
            if (other.queue != null) {
                return false;
            }
        } else if (!queue.equals(other.queue)) {
            return false;
        }
        if (reason == null) {
            if (other.reason != null) {
                return false;
            }
        } else if (!reason.equals(other.reason)) {
            return false;
        }
        if (routingKeys == null) {
            if (other.routingKeys != null) {
                return false;
            }
        } else if (!routingKeys.equals(other.routingKeys)) {
            return false;
        }
        if (time == null) {
            if (other.time != null) {
                return false;
            }
        } else if (!time.equals(other.time)) {
            return false;
        }
        return true;
    }

    public int getCount() {
        return count;
    }

    public String getExchange() {
        return exchange;
    }

    public String getQueue() {
        return queue;
    }

    public String getReason() {
        return reason;
    }

    public List<String> getRoutingKeys() {
        return routingKeys;
    }

    public Date getTime() {
        return time;
    }

    @Override
    public int hashCode() {
        final int prime = 19;
        int result = 1;
        result = prime * result + count;
        result = prime * result + ((exchange == null) ? 0 : exchange.hashCode());
        result = prime * result + ((queue == null) ? 0 : queue.hashCode());
        result = prime * result + ((reason == null) ? 0 : reason.hashCode());
        result = prime * result + ((routingKeys == null) ? 0 : routingKeys.hashCode());
        result = prime * result + ((time == null) ? 0 : time.hashCode());
        return result;
    }

    public void setCount(int count) {
        this.count = count;
    }

    public void setExchange(String exchange) {
        this.exchange = exchange;
    }

    public void setQueue(String queue) {
        this.queue = queue;
    }

    public void setReason(String reason) {
        this.reason = reason;
    }

    public void setRoutingKeys(List<String> routingKeys) {
        this.routingKeys = routingKeys;
    }

    public void setTime(Date time) {
        this.time = time;
    }

}

2. Error Handling

The class below uses the classes above to implement the error handling process.

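import java.io.IOException;
import java.util.Date;

import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
// Assumption: the post does not show which @NonNull annotation is used; Spring's is assumed here.
import org.springframework.lang.NonNull;

import com.rabbitmq.client.Channel;

/**
 * 
 * <p>
 * Generic class to handle RabbitMQ processing errors that might occur in a
 * <code>try-catch</code>. This will not handle invalid message conversion
 * though (for example if you have an Employee JSON structure to process, but got
 * an Animal JSON structure instead from the RabbitMQ queue).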
 * </p>
 * 
 * <p>
 * In short, this is just a class to avoid boilerplate codes for your handler.
 * Default implementation is re-throw message to dead letter exchange, using
 * <code>DlxProcessingErrorHandler</code> class. The basic usage of the
 * interface is :<br/>
 * 
 * <pre>
 * public void handleMessage(Message message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) {
 *     try {
 *         var jsonObjectToBeProcessed = objectMapper.readValue(new String(message.getBody()),
 *                 JsonObjectToBeProcessed.class);
 * 
 *         // do real processing here
 *         // ...
 *         //
 * 
 *         channel.basicAck(tag, false);
 *     } catch (Exception e) {
 *         processingErrorHandler.handleErrorProcessingMessage(message, channel);
 *     }
 * }
 * </pre>
 * 
 * @author timpamungkas
 *
 */
public class DlxProcessingErrorHandler {

    private static final Logger log = LoggerFactory.getLogger(DlxProcessingErrorHandler.class);

    /**
     * Dead exchange name
     */
    @NonNull
    private String deadExchangeName;

    private int maxRetryCount = 3;

    /**
     * Constructor. Will retry for n times (default is 3) and on the next retry will
     * consider the message dead and put it on the dead exchange with the given
     * <code>deadExchangeName</code>, using the message's received routing key.
     * 
     * @param deadExchangeName dead exchange name. Not a dlx for work queue, but
     *                         exchange name for really dead message (won't be
     *                         processed anymore).
     * @throws IllegalArgumentException if <code>deadExchangeName</code> is null
     *                                  or empty.
     */
    public DlxProcessingErrorHandler(String deadExchangeName) throws IllegalArgumentException {
        super();

        if (StringUtils.isAnyEmpty(deadExchangeName)) {
            throw new IllegalArgumentException("Must define dlx exchange name");
        }

        this.deadExchangeName = deadExchangeName;
    }

    /**
     * Constructor. Will retry for <code>maxRetryCount</code> times and on the next
     * retry will consider the message dead and put it on the dead exchange with
     * the given <code>deadExchangeName</code>, using the message's received
     * routing key.
     * 
     * @param deadExchangeName dead exchange name. Not a dlx for work queue, but
     *                         exchange name for really dead message (won't be
     *                         processed anymore).
     * @param maxRetryCount    number of retries before the message is considered
     *                         dead (between 0 and 1000 inclusive). With a value of
     *                         0 or less, the message goes to the dead exchange on
     *                         its first failure.
     * @throws IllegalArgumentException if <code>deadExchangeName</code> is null
     *                                  or empty, or <code>maxRetryCount</code> is
     *                                  greater than 1000.
     */

    public DlxProcessingErrorHandler(String deadExchangeName, int maxRetryCount) {
        this(deadExchangeName);
        setMaxRetryCount(maxRetryCount);
    }

    public String getDeadExchangeName() {
        return deadExchangeName;
    }

    public int getMaxRetryCount() {
        return maxRetryCount;
    }

    /**
     * Handle AMQP message consume error. This default implementation rejects the
     * message (so that the work queue dead-letters it for another attempt) until
     * <code>maxRetryCount</code> retries have failed, then publishes it to the
     * dead exchange given by <code>deadExchangeName</code>. <br/>
     * <code>maxRetryCount</code> is 3 by default, but you can set it using
     * <code>setMaxRetryCount(int)</code>
     * 
     * @param message AMQP message that caused error
     * @param channel channel for AMQP message
     * @return <code>true</code> if error handler works successfully,
     *         <code>false</code> otherwise
     */
    public boolean handleErrorProcessingMessage(Message message, Channel channel) {
        var rabbitMqHeader = new RabbitmqHeader(message.getMessageProperties().getHeaders());

        try {
            if (rabbitMqHeader.getFailedRetryCount() >= maxRetryCount) {
                // publish to dead and ack
                log.warn("[DEAD] Error at " + new Date() + " on retry " + rabbitMqHeader.getFailedRetryCount()
                        + " for message " + message);

                channel.basicPublish(getDeadExchangeName(), message.getMessageProperties().getReceivedRoutingKey(),
                        null, message.getBody());
                channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
            } else {
                log.debug("[REQUEUE] Error at " + new Date() + " on retry " + rabbitMqHeader.getFailedRetryCount()
                        + " for message " + message);

                channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
            }
            return true;
        } catch (IOException e) {
            // pass the exception to the logger so the failure cause is not lost
            log.warn("[HANDLER-FAILED] Error at " + new Date() + " on retry " + rabbitMqHeader.getFailedRetryCount()
                    + " for message " + message, e);
        }

        return false;
    }

    public void setMaxRetryCount(int maxRetryCount) throws IllegalArgumentException {
        if (maxRetryCount > 1000) {
            throw new IllegalArgumentException("max retry must be between 0 and 1000");
        }

        this.maxRetryCount = maxRetryCount;
    }

}

3. Consumer with Retry Mechanism

Finally, a consumer that uses this error handling process looks like this.

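import java.io.IOException;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;

import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.rabbitmq.client.Channel;

@Service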
public class RetryImageConsumer {

    private static final String DEAD_EXCHANGE_NAME = "x.guideline.dead";

    private static final Logger log = LoggerFactory.getLogger(RetryImageConsumer.class);
    private DlxProcessingErrorHandler dlxProcessingErrorHandler;

    private ObjectMapper objectMapper;

    public RetryImageConsumer() {
        this.objectMapper = new ObjectMapper();
        this.dlxProcessingErrorHandler = new DlxProcessingErrorHandler(DEAD_EXCHANGE_NAME);
    }

    @RabbitListener(queues = "q.guideline.image.work")
    public void listen(Message message, Channel channel)
            throws InterruptedException, JsonParseException, JsonMappingException, IOException {
        try {
            var p = objectMapper.readValue(message.getBody(), Picture.class);
            // process the image
            if (p.getSize() > 9000) {
                // throw exception, we will use DLX handler for retry mechanism
                throw new IOException("Size too large");
            } else {
                log.info("Creating thumbnail & publishing : " + p);
                // you must acknowledge that message already processed
                channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
            }
        } catch (IOException e) {
            log.warn("Error processing message : " + new String(message.getBody()) + " : " + e.getMessage());
            dlxProcessingErrorHandler.handleErrorProcessingMessage(message, channel);
        }
    }

}

4. Retry Exchange

Extending the retry mechanism one step further gives the structure below.

image.png

In this structure, a "retry exchange" that uses routing keys delivers the message directly to only the "work queue" where the error occurred. (The "work exchange" is assumed to be a fanout exchange.)

😎!!!

Because the "retry exchange" needs a routing key to deliver the message to the right "work queue", the message must carry a routing key when the "work queue" rejects it to the "wait exchange".
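One way to attach that routing key, sketched below, is the standard "x-dead-letter-routing-key" queue argument on each work queue. Everything else here is an assumption for illustration: the exchange names "x.guideline.wait" and "x.guideline.retry", the routing key "image", the TTL, and the class RetryExchangeSketch. As far as I understand RabbitMQ's dead-lettering rules, a queue that does not set its own dead-letter routing key forwards the message with the routing key it arrived with, so the wait queue can leave that argument out.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: queue arguments for the retry-exchange variant.
final class RetryExchangeSketch {

    // Work queue: rejected messages go to the wait exchange, stamped with a
    // routing key that identifies this particular work queue.
    static Map<String, Object> workQueueArguments(String waitExchange, String retryRoutingKey) {
        Map<String, Object> arguments = new HashMap<>();
        arguments.put("x-dead-letter-exchange", waitExchange);
        arguments.put("x-dead-letter-routing-key", retryRoutingKey);
        return arguments;
    }

    // Wait queue: after the TTL, messages go to the retry exchange. No
    // dead-letter routing key is set, so the incoming key is kept.
    static Map<String, Object> waitQueueArguments(String retryExchange, int ttlMillis) {
        Map<String, Object> arguments = new HashMap<>();
        arguments.put("x-dead-letter-exchange", retryExchange);
        arguments.put("x-message-ttl", ttlMillis);
        return arguments;
    }

    public static void main(String[] args) {
        // Assumed names; the retry exchange would be a direct exchange with the
        // image work queue bound to it under the routing key "image".
        System.out.println("work queue: " + workQueueArguments("x.guideline.wait", "image"));
        System.out.println("wait queue: " + waitQueueArguments("x.guideline.retry", 10_000));
    }
}
```

Setting the key on the queue keeps the consumer code unchanged: it still only calls basicReject, and the broker adds the routing key while dead-lettering.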


Reference: https://www.udemy.com/course/rabbitmq-java-spring-boot-for-system-integration/

