更新時(shí)間:2023年10月19日11時(shí)37分 來源:傳智教育 瀏覽次數(shù):
在Apache Kafka中,HW(High Watermark)和 LEO(Log End Offset)是與分區(qū)的復(fù)制和消息傳遞相關(guān)的兩個(gè)關(guān)鍵概念。
High Watermark是一個(gè)分區(qū)的消息復(fù)制進(jìn)度的指示器。它表示了已經(jīng)成功復(fù)制到所有副本的消息的位置。HW之前的所有消息都被認(rèn)為是已提交的消息,這意味著消費(fèi)者可以安全地消費(fèi)這些消息。HW通常是消費(fèi)者組維護(hù)的偏移量的參考點(diǎn)。
Log End Offset表示一個(gè)分區(qū)中消息日志的最后一個(gè)位置,即下一條消息要寫入的位置。LEO是動(dòng)態(tài)變化的,因?yàn)橄⒉粩啾蛔芳拥椒謪^(qū)。它表示了分區(qū)中的最新消息位置。
接下來筆者用一段具體的示例代碼,來演示下如何使用Java和Kafka Consumer API來獲取分區(qū)的HW和LEO:
import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; import java.time.Duration; import java.util.Collections; import java.util.Properties; public class KafkaHWLEOExample { public static void main(String[] args) { // 設(shè)置Kafka消費(fèi)者的配置 Properties props = new Properties(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-consumer-group"); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); // 創(chuàng)建Kafka消費(fèi)者 Consumer<String, String> consumer = new KafkaConsumer<>(props); // 指定要訂閱的主題 String topic = "my-topic"; consumer.subscribe(Collections.singletonList(topic)); // 獲取分區(qū)信息 PartitionInfo partitionInfo = consumer.partitionsFor(topic).get(0); int partition = partitionInfo.partition(); // 在消費(fèi)者循環(huán)中獲取HW和LEO while (true) { ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100)); for (TopicPartition topicPartition : records.partitions()) { long hw = consumer.position(topicPartition); // 獲取HW long leo = consumer.endOffsets(Collections.singleton(topicPartition)).get(topicPartition); // 獲取LEO System.out.println("Partition " + topicPartition.partition() + ": HW = " + hw + ", LEO = " + leo); } } } }
上面的代碼創(chuàng)建了一個(gè)Kafka消費(fèi)者,并訂閱了一個(gè)主題。在消費(fèi)者循環(huán)中,我們使用position()方法來獲取分區(qū)的HW,并使用endOffsets()方法來獲取分區(qū)的LEO。這可以幫助我們監(jiān)視分區(qū)的消息復(fù)制進(jìn)度和消息日志的結(jié)束位置。
北京校區(qū)