How does Kafka obtain the results from consumers?
Consumers in Kafka can retrieve results by fetching records. Here are the steps for obtaining Kafka consumer results using the Java API:
- Create Kafka consumer configuration.
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "my-consumer-group");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
- Create an instance of a Kafka consumer.
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
- Subscribe to topics that interest you.
consumer.subscribe(Collections.singletonList("my-topic"));
- Retrieve records in a loop.
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
System.out.println("Received record: " + record.value());
}
}
In the above code, the poll() method is used to retrieve records, with a parameter for the maximum wait time. The ConsumerRecords object contains a batch of consumed records, which you can access by iterating through each ConsumerRecord object.
Please note that the sample code provided is a simplified version. In actual use, more configurations and processing logic may be required.