更新時(shí)間:2023年06月23日14時(shí)18分 來(lái)源:傳智教育 瀏覽次數(shù):
kafka不會(huì)像其他JMS隊(duì)列那樣需要得到消費(fèi)者的確認(rèn),消費(fèi)者可以使用kafka來(lái)追蹤消息在分區(qū)的位置(偏移量)。
消費(fèi)者會(huì)往一個(gè)叫做_consumer_offset的特殊主題發(fā)送消息,消息里包含了每個(gè)分區(qū)的偏移量。如果消費(fèi)者發(fā)生崩潰或有新的消費(fèi)者加入群組,就會(huì)觸發(fā)再均衡。
正常的情況
如果消費(fèi)者2掛掉以后,會(huì)發(fā)生再均衡,消費(fèi)者2負(fù)責(zé)的分區(qū)會(huì)被其他消費(fèi)者進(jìn)行消費(fèi),再均衡后不可避免會(huì)出現(xiàn)一些問(wèn)題。
問(wèn)題一:
如果提交偏移量小于客戶端處理的最后一個(gè)消息的偏移量,那么處于兩個(gè)偏移量之間的消息就會(huì)被重復(fù)處理。
問(wèn)題二:
如果提交的偏移量大于客戶端的最后一個(gè)消息的偏移量,那么處于兩個(gè)偏移量之間的消息將會(huì)丟失。如果想要解決這些問(wèn)題,還要知道目前kafka提交偏移量的方式:
提交偏移量的方式有兩種,分別是自動(dòng)提交偏移量和手動(dòng)提交。
當(dāng)enable.auto.commit被設(shè)置為true,提交方式就是讓消費(fèi)者自動(dòng)提交偏移量,每隔5秒消費(fèi)者會(huì)自動(dòng)把從poll()方法接收的最大偏移量提交上去
手動(dòng)提交 ,當(dāng)enable.auto.commit被設(shè)置為false可以有以下三種提交方式
提交當(dāng)前偏移量(同步提交)
同步和異步組合提交
把enable.auto.commit設(shè)置為false,讓?xiě)?yīng)用程序決定何時(shí)提交偏移量。使用commitSync()提交偏移量,commitSync()將會(huì)提交poll返回的最新的偏移量,所以在處理完所有記錄后要確保調(diào)用了commitSync()方法。否則還是會(huì)有消息丟失的風(fēng)險(xiǎn)。
只要沒(méi)有發(fā)生不可恢復(fù)的錯(cuò)誤,commitSync()方法會(huì)一直嘗試直至提交成功,如果提交失敗也可以記錄到錯(cuò)誤日志里。
while (true){
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
System.out.println(record.value());
System.out.println(record.key());
try {
consumer.commitSync();//同步提交當(dāng)前最新的偏移量
}catch (CommitFailedException e){
System.out.println("記錄提交失敗的異常:"+e);
}
}
}
手動(dòng)提交有一個(gè)缺點(diǎn),那就是當(dāng)發(fā)起提交調(diào)用時(shí)應(yīng)用會(huì)阻塞。當(dāng)然我們可以減少手動(dòng)提交的頻率,但這個(gè)會(huì)增加消息重復(fù)的概率(和自動(dòng)提交一樣)。另外一個(gè)解決辦法是,使用異步提交的API。
while (true){
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
System.out.println(record.value());
System.out.println(record.key());
}
consumer.commitAsync(new OffsetCommitCallback() {
@Override
public void onComplete(Map<TopicPartition, OffsetAndMetadata> map, Exception e) {
if(e!=null){
System.out.println("記錄錯(cuò)誤的提交偏移量:"+ map+",異常信息"+e);
}
}
});
}
異步提交也有個(gè)缺點(diǎn),那就是如果服務(wù)器返回提交失敗,異步提交不會(huì)進(jìn)行重試。相比較起來(lái),同步提交會(huì)進(jìn)行重試直到成功或者最后拋出異常給應(yīng)用。異步提交沒(méi)有實(shí)現(xiàn)重試是因?yàn)?,如果同時(shí)存在多個(gè)異步提交,進(jìn)行重試可能會(huì)導(dǎo)致位移覆蓋。
舉個(gè)例子,假如我們發(fā)起了一個(gè)異步提交commitA,此時(shí)的提交位移為2000,隨后又發(fā)起了一個(gè)異步提交commitB且位移為3000;commitA提交失敗但commitB提交成功,此時(shí)commitA進(jìn)行重試并成功的話,會(huì)將實(shí)際上將已經(jīng)提交的位移從3000回滾到2000,導(dǎo)致消息重復(fù)消費(fèi)。
try {
while (true){
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
System.out.println(record.value());
System.out.println(record.key());
}
consumer.commitAsync();
}
}catch (Exception e){+
e.printStackTrace();
System.out.println("記錄錯(cuò)誤信息:"+e);
}finally {
try {
consumer.commitSync();
}finally {
consumer.close();
}
}