Using this as a reference, I wrote a Kafka consumer and ran it, only to be told
java.io.IOException: Can't resolve address: fluentd01.example.com:9092.
17:51:56.640 [main] DEBUG o.apache.kafka.clients.NetworkClient - No node found. Trying previously-seen node with ID 0
17:51:56.640 [main] DEBUG o.apache.kafka.clients.NetworkClient - Give up sending metadata request since no node is available
17:51:56.740 [main] DEBUG o.apache.kafka.clients.NetworkClient - Initialize connection to node 0 for sending metadata request
17:51:56.740 [main] DEBUG o.apache.kafka.clients.NetworkClient - Initiating connection to node 0 at fluentd01.example.com:9092.
17:51:56.742 [main] DEBUG o.apache.kafka.clients.NetworkClient - Error connecting to node 0 at fluentd01.example.com:9092:
java.io.IOException: Can't resolve address: fluentd01.example.com:9092
at org.apache.kafka.common.network.Selector.connect(Selector.java:156) ~[kafka-clients-0.9.0.0.jar:na]
at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:514) [kafka-clients-0.9.0.0.jar:na]
at org.apache.kafka.clients.NetworkClient.access$600(NetworkClient.java:49) [kafka-clients-0.9.0.0.jar:na]
at org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:672) [kafka-clients-0.9.0.0.jar:na]
at org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:568) [kafka-clients-0.9.0.0.jar:na]
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:268) [kafka-clients-0.9.0.0.jar:na]
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:303) [kafka-clients-0.9.0.0.jar:na]
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:197) [kafka-clients-0.9.0.0.jar:na]
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:187) [kafka-clients-0.9.0.0.jar:na]
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitMetadataUpdate(ConsumerNetworkClient.java:126) [kafka-clients-0.9.0.0.jar:na]
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorKnown(AbstractCoordinator.java:186) [kafka-clients-0.9.0.0.jar:na]
at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:857) [kafka-clients-0.9.0.0.jar:na]
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:829) [kafka-clients-0.9.0.0.jar:na]
at poc.AtMostOnceConsumer.processRecords(AtMostOnceConsumer.java:95) [classes/:na]
at poc.AtMostOnceConsumer.execute(AtMostOnceConsumer.java:62) [classes/:na]
at poc.AtMostOnceConsumer.main(AtMostOnceConsumer.java:50) [classes/:na]
Caused by: java.nio.channels.UnresolvedAddressException: null
at sun.nio.ch.Net.checkAddress(Net.java:101) ~[na:1.8.0_101]
at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:622) ~[na:1.8.0_101]
at org.apache.kafka.common.network.Selector.connect(Selector.java:153) ~[kafka-clients-0.9.0.0.jar:na]
... 15 common frames omitted
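"fluentd01.example.com" isn't written anywhere in my source... or so I thought, but it turned out to be in /etc/hosts. Judging from the comment in the broker config, when advertised.listeners is not set the broker falls back to advertising its own hostname, which would explain where the name came from.
So I set advertised.listeners as shown below,
and the consumer was then able to fetch the topic from the broker without trouble.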
# Hostname and port the broker will advertise to producers and consumers. If not set,
# it uses the value for "listeners" if configured. Otherwise, it will use the value
# returned from java.net.InetAddress.getCanonicalHostName().
#advertised.listeners=PLAINTEXT://your.host.name:9092
advertised.listeners=PLAINTEXT://[server IP]:9092
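If you want to check beforehand whether the machine running the consumer can resolve the host the broker advertises, something like the following rough sketch might help. The class name ResolveCheck and the method isResolvable are made up for illustration and are not part of kafka-clients; it only uses java.net.InetSocketAddress from the JDK, and it assumes that an unresolved address here corresponds to the UnresolvedAddressException in the stack trace above.

```java
import java.net.InetSocketAddress;

// Hypothetical helper, not part of kafka-clients.
public class ResolveCheck {

    // Returns true when the JVM can turn host:port into a resolved socket address.
    static boolean isResolvable(String host, int port) {
        // The InetSocketAddress(String, int) constructor attempts name resolution;
        // if that fails, the address is flagged as unresolved instead of throwing.
        return !new InetSocketAddress(host, port).isUnresolved();
    }

    public static void main(String[] args) {
        // Defaults are the host and port from the error message above.
        String host = args.length > 0 ? args[0] : "fluentd01.example.com";
        int port = args.length > 1 ? Integer.parseInt(args[1]) : 9092;
        System.out.println(host + ":" + port + " resolvable: " + isResolvable(host, port));
    }
}
```

Run it on the consumer machine with the advertised host and port as arguments. If it reports that the name is not resolvable, either advertise an IP address as above or make the name resolvable on the client side.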