[ERROR] Can't convert key of class java.lang.String to class org.apache.kafka.common.serialization.ByteArraySerializer specified in key.serializer

반응형

문제

kafka에 메세지를 전송하려고 할때 다음과 같이 에러가 발생하며 메시지 전송을 실패한다.

org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder$ProducerConfigurationMessageHandler@5283b1f8]; nested exception is org.apache.kafka.common.errors.SerializationException: Can't convert key of class java.lang.String to class org.apache.kafka.common.serialization.ByteArraySerializer specified in key.serializer
    at org.springframework.integration.support.utils.IntegrationUtils.wrapInHandlingExceptionIfNecessary(IntegrationUtils.java:189)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:186)
    at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder$SendingHandler.handleMessageInternal(AbstractMessageChannelBinder.java:1095)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:176)
Caused by: org.apache.kafka.common.errors.SerializationException: Can't convert key of class java.lang.String to class org.apache.kafka.common.serialization.ByteArraySerializer specified in key.serializer
Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to [B
    at org.apache.kafka.common.serialization.ByteArraySerializer.serialize(ByteArraySerializer.java:21)
    at org.apache.kafka.common.serialization.ExtendedSerializer$Wrapper.serialize(ExtendedSerializer.java:65)
    at org.apache.kafka.common.serialization.ExtendedSerializer$Wrapper.serialize(ExtendedSerializer.java:55)
    at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:833)

원인

메시지 전송시 'String' type 의 key 값을 setting 했는데..

MessageBuilder
.withPayload(userEvent)
.setHeader(KafkaHeaders.MESSAGE_KEY, characterNo.toString()).build());

kafka 의 default key serializer 가 'ByteArraySerializer' 이기 때문에 발생하는 문제이다.

Can't convert key of class java.lang.String to class org.apache.kafka.common.serialization.ByteArraySerializer specified in key.serializer

해결

cloud stream 을 사용하고 있다면

다음과 같이 적절한 kafka message key serializer 를 producer configuartion 에 등록해주면 된다.

spring:
  cloud:
    stream:
      bindings:
        output:
          destination: output.topic
          contentType: application/json
          binder: kafka-binder
      binders:
        kafka-binder:
          type: kafka
          environment:
            spring:
              cloud:
                stream:
                  kafka:
                    binder:
                      brokers: "172.21.1.1:90902"
      kafka:
        bindings:
          output:
            producer:
              configuration:
                key.serializer : org.apache.kafka.common.serialization.StringSerializer

반응형

댓글

Designed by JB FACTORY