更新時(shí)間:2023年06月21日18時(shí)41分 來(lái)源:傳智教育 瀏覽次數(shù):
延遲任務(wù)有固定周期有明確出發(fā)時(shí)間,而延遲隊(duì)列沒(méi)有固定的開始時(shí)間它常常是由一個(gè)事件觸發(fā)的,而在這個(gè)事件觸發(fā)之后的一段時(shí)間內(nèi)觸發(fā)另一個(gè)事件,任務(wù)可以立即執(zhí)行,也可以延遲。
延遲任務(wù)的應(yīng)用場(chǎng)景:
場(chǎng)景一:訂單下單之后30分鐘后,如果用戶沒(méi)有付錢,則系統(tǒng)自動(dòng)取消訂單;如果期間下單成功,任務(wù)取消
場(chǎng)景二:接口對(duì)接出現(xiàn)網(wǎng)絡(luò)問(wèn)題,1分鐘后重試,如果失敗,2分鐘重試,直到出現(xiàn)閾值終止
實(shí)現(xiàn)延遲任務(wù)的兩種任務(wù)
1)DelayQueue
JDK自帶DelayQueue 是一個(gè)支持延時(shí)獲取元素的阻塞隊(duì)列, 內(nèi)部采用優(yōu)先隊(duì)列 PriorityQueue 存儲(chǔ)元素,同時(shí)元素必須實(shí)現(xiàn) Delayed 接口;在創(chuàng)建元素時(shí)可以指定多久才可以從隊(duì)列中獲取當(dāng)前元素,只有在延遲期滿時(shí)才能從隊(duì)列中提取元素
DelayQueue屬于排序隊(duì)列,它的特殊之處在于隊(duì)列的元素必須實(shí)現(xiàn)Delayed接口,該接口需要實(shí)現(xiàn)compareTo和getDelay方法
getDelay方法:獲取元素在隊(duì)列中的剩余時(shí)間,只有當(dāng)剩余時(shí)間為0時(shí)元素才可以出隊(duì)列。
compareTo方法:用于排序,確定元素出隊(duì)列的順序。
實(shí)現(xiàn):
1:在測(cè)試包jdk下創(chuàng)建延遲任務(wù)元素對(duì)象DelayedTask,實(shí)現(xiàn)compareTo和getDelay方法,
2:在main方法中創(chuàng)建DelayQueue并向延遲隊(duì)列中添加三個(gè)延遲任務(wù),
3:循環(huán)的從延遲隊(duì)列中拉取任務(wù)
public class DelayedTask implements Delayed{ // 任務(wù)的執(zhí)行時(shí)間 private int executeTime = 0; public DelayedTask(int delay){ Calendar calendar = Calendar.getInstance(); calendar.add(Calendar.SECOND,delay); this.executeTime = (int)(calendar.getTimeInMillis() /1000 ); } /** * 元素在隊(duì)列中的剩余時(shí)間 * @param unit * @return */ @Override public long getDelay(TimeUnit unit) { Calendar calendar = Calendar.getInstance(); return executeTime - (calendar.getTimeInMillis()/1000); } /** * 元素排序 * @param o * @return */ @Override public int compareTo(Delayed o) { long val = this.getDelay(TimeUnit.NANOSECONDS) - o.getDelay(TimeUnit.NANOSECONDS); return val == 0 ? 0 : ( val < 0 ? -1: 1 ); } public static void main(String[] args) { DelayQueue<DelayedTask> queue = new DelayQueue<DelayedTask>(); queue.add(new DelayedTask(5)); queue.add(new DelayedTask(10)); queue.add(new DelayedTask(15)); System.out.println(System.currentTimeMillis()/1000+" start consume "); while(queue.size() != 0){ DelayedTask delayedTask = queue.poll(); if(delayedTask !=null ){ System.out.println(System.currentTimeMillis()/1000+" cosume task"); } //每隔一秒消費(fèi)一次 try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } } }
DelayQueue實(shí)現(xiàn)完成之后思考一個(gè)問(wèn)題:
使用線程池或者原生DelayQueue程序掛掉之后,任務(wù)都是放在內(nèi)存,需要考慮未處理消息的丟失帶來(lái)的影響,如何保證數(shù)據(jù)不丟失,需要持久化(磁盤)
2)RabbitMQ實(shí)現(xiàn)延遲任務(wù)
TTL:Time To Live (消息存活時(shí)間)
死信隊(duì)列:Dead Letter Exchange(死信交換機(jī)),當(dāng)消息成為Dead message后,可以重新發(fā)送另一個(gè)交換機(jī)(死信交換機(jī))。
3)redis實(shí)現(xiàn)
zset數(shù)據(jù)類型的去重有序(分?jǐn)?shù)排序)特點(diǎn)進(jìn)行延遲。例如:時(shí)間戳作為score進(jìn)行排序。
redis實(shí)現(xiàn)延遲任務(wù)的思路
1.為什么任務(wù)需要存儲(chǔ)在數(shù)據(jù)庫(kù)中?
延遲任務(wù)是一個(gè)通用的服務(wù),任何需要延遲得任務(wù)都可以調(diào)用該服務(wù),需要考慮數(shù)據(jù)持久化的問(wèn)題,存儲(chǔ)數(shù)據(jù)庫(kù)中是一種數(shù)據(jù)安全的考慮。
2.為什么redis中使用兩種數(shù)據(jù)類型,list和zset?
效率問(wèn)題,算法的時(shí)間復(fù)雜度
3.在添加zset數(shù)據(jù)的時(shí)候,為什么不需要預(yù)加載?
任務(wù)模塊是一個(gè)通用的模塊,項(xiàng)目中任何需要延遲隊(duì)列的地方,都可以調(diào)用這個(gè)接口,要考慮到數(shù)據(jù)量的問(wèn)題,如果數(shù)據(jù)量特別大,為了防止阻塞,只需要把未來(lái)幾分鐘要執(zhí)行的數(shù)據(jù)存入緩存即可。
4)延遲任務(wù)服務(wù)實(shí)現(xiàn)
搭建heima-leadnews-schedule模塊
leadnews-schedule是一個(gè)通用的服務(wù),單獨(dú)創(chuàng)建模塊來(lái)管理任何類型的延遲任務(wù)
①:導(dǎo)入資料文件夾下的heima-leadnews-schedule模塊到heima-leadnews-service下,如下圖所示:
②:添加bootstrap.yml
server: port: 51701 spring: application: name: leadnews-schedule cloud: nacos: discovery: server-addr: 192.168.200.130:8848 config: server-addr: 192.168.200.130:8848 file-extension: yml
③:在nacos中添加對(duì)應(yīng)配置,并添加數(shù)據(jù)庫(kù)及mybatis-plus的配置
spring: datasource: driver-class-name: com.mysql.jdbc.Driver url: jdbc:mysql://localhost:3306/leadnews_schedule?useUnicode=true&characterEncoding=UTF-8&serverTimezone=UTC username: root password: root # 設(shè)置Mapper接口所對(duì)應(yīng)的XML文件位置,如果你在Mapper接口中有自定義方法,需要進(jìn)行該配置 mybatis-plus: mapper-locations: classpath*:mapper/*.xml # 設(shè)置別名包掃描路徑,通過(guò)該屬性可以給包中的類注冊(cè)別名 type-aliases-package: com.heima.model.schedule.pojos
北京校區(qū)