반응형
문제
kafka에 메세지를 전송하려고 할때 다음과 같이 에러가 발생하며 메시지 전송을 실패한다.
org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder$ProducerConfigurationMessageHandler@5283b1f8]; nested exception is org.apache.kafka.common.errors.SerializationException: Can't convert key of class java.lang.String to class org.apache.kafka.common.serialization.ByteArraySerializer specified in key.serializer
at org.springframework.integration.support.utils.IntegrationUtils.wrapInHandlingExceptionIfNecessary(IntegrationUtils.java:189)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:186)
at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder$SendingHandler.handleMessageInternal(AbstractMessageChannelBinder.java:1095)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:176)
Caused by: org.apache.kafka.common.errors.SerializationException: Can't convert key of class java.lang.String to class org.apache.kafka.common.serialization.ByteArraySerializer specified in key.serializer
Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to [B
at org.apache.kafka.common.serialization.ByteArraySerializer.serialize(ByteArraySerializer.java:21)
at org.apache.kafka.common.serialization.ExtendedSerializer$Wrapper.serialize(ExtendedSerializer.java:65)
at org.apache.kafka.common.serialization.ExtendedSerializer$Wrapper.serialize(ExtendedSerializer.java:55)
at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:833)
원인
메시지 전송시 'String' type 의 key 값을 setting 했는데..
MessageBuilder
.withPayload(userEvent)
.setHeader(KafkaHeaders.MESSAGE_KEY, characterNo.toString()).build());
kafka 의 default key serializer 가 'ByteArraySerializer' 이기 때문에 발생하는 문제이다.
Can't convert key of class java.lang.String to class org.apache.kafka.common.serialization.ByteArraySerializer specified in key.serializer
해결
cloud stream 을 사용하고 있다면
다음과 같이 적절한 kafka message key serializer 를 producer configuartion 에 등록해주면 된다.
spring:
cloud:
stream:
bindings:
output:
destination: output.topic
contentType: application/json
binder: kafka-binder
binders:
kafka-binder:
type: kafka
environment:
spring:
cloud:
stream:
kafka:
binder:
brokers: "172.21.1.1:90902"
kafka:
bindings:
output:
producer:
configuration:
key.serializer : org.apache.kafka.common.serialization.StringSerializer
반응형