1. 문제
local 에서 java 로 구현한 producer 에서 aws ec2 에서 구동중인 kafka broker 에 메세지를 제대로 전달하지 못한다.
aws ec2 security group 의 TCP inbound rule 에 본인 local machine 의 접근은 허용되어 있다고 가정한다.
kafka client 로 구현한 java producer 예제는 다음과 같다.
public class Producer {
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "ec2-3-35-218-127.ap-northeast-2.compute.amazonaws.com:9092,ec2-13-209-22-148.ap-northeast-2.compute.amazonaws" +
".com:9092,ec2-3-34-134-146.ap-northeast-2.compute.amazonaws.com:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
producer.send(new ProducerRecord<String, String>("minholee93-topic", "1", "Apache Kafka is a distributed streaming platform"));
producer.close();
}
}
kafka producer 관련 java 예제는 이 글을 참고했다.
위 소스를 실행시켜 kafka cluster 에 메세지를 발행하면 다음과 같이 에러가 발생한다.
java.io.IOException: Can't resolve address: ip-172-31-46-151.ap-northeast-2.compute.internal:9092
at org.apache.kafka.common.network.Selector.doConnect(Selector.java:235)
at org.apache.kafka.common.network.Selector.connect(Selector.java:214)
at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:793)
at org.apache.kafka.clients.NetworkClient.access$700(NetworkClient.java:62)
at org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:944)
at org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:848)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:458)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:239)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:176)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.nio.channels.UnresolvedAddressException: null
at sun.nio.ch.Net.checkAddress(Net.java:101)
at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:622)
at org.apache.kafka.common.network.Selector.doConnect(Selector.java:233)
... 9 common frames omitted
2. 원인
kafka broker 에 message 를 전송하기 위해 producer 는 다음의 단계를 거치는데
1) kafka cluster 중 한개의 broker 와 connection 을 맺은 뒤 cluster broker 전체 list 정보를 metadata 로 return 받는다
2) return 받은 metadata 를 확인해서 leader broker 의 hostname/ip port 를 확인한 뒤 해당 leader broker 에 메세지를 전송함
return 받은 metadata의 hostname/port 가 unresolvable 즉 접근할 수 없는 값 이기 떄문이다.
metadata 로 전달받은 ec2 broker cluster hostname/port 는 아래와 같다.
broker 2 at ip-172-31-46-152.ap-northeast-2.compute.internal:9092
broker 3 at ip-172-31-46-151.ap-northeast-2.compute.internal:9092
broker 1 at ip-172-31-32-4.ap-northeast-2.compute.internal:9092
위 값을 보면 aws ec2의 private DNS를 전달했는데
정상적으로 local machine 에서 kafka cluster 에 접근하기 위해선 public DNS를 전달받아야 한다.
3. 해결
각 broker 의 server.properties 값을 다음과 같이 수정한다.
예를 들어 broker 가 구동중인 ec2 의 public DNS 가 ec2-3-35-218-107.ap-northeast-2.compute.amazonaws.com 일 경우
vi 로 server.properties 파일을 열어 다음과 같이 advertised.listeners 를 추가한다.
advertised.listeners=PLAINTEXT://ec2-3-35-218-107.ap-northeast-2.compute.amazonaws.com:9092
listener 관련 자세한 내용은 이 글을 참고한다.