




版權(quán)說(shuō)明:本文檔由用戶(hù)提供并上傳,收益歸屬內(nèi)容提供方,若內(nèi)容存在侵權(quán),請(qǐng)進(jìn)行舉報(bào)或認(rèn)領(lǐng)
文檔簡(jiǎn)介
第Golang實(shí)現(xiàn)基于Redis的可靠延遲隊(duì)列目錄前言原理詳解pending2ReadyScriptready2UnackScriptunack2RetryScriptackconsume
前言
在之前探討延時(shí)隊(duì)列的文章中我們提到了redissondelayqueue使用redis的有序集合結(jié)構(gòu)實(shí)現(xiàn)延時(shí)隊(duì)列,遺憾的是go語(yǔ)言社區(qū)中并無(wú)類(lèi)似的庫(kù)。不過(guò)問(wèn)題不大,沒(méi)有輪子我們自己造。
本文的完整代碼實(shí)現(xiàn)在hdt3213/delayqueue,可以直接goget安裝使用。
使用有序集合結(jié)構(gòu)實(shí)現(xiàn)延時(shí)隊(duì)列的方法已經(jīng)廣為人知,無(wú)非是將消息作為有序集合的member,投遞時(shí)間戳作為score使用zrangebyscore命令搜索已到投遞時(shí)間的消息然后將其發(fā)給消費(fèi)者。
然而消息隊(duì)列不是將消息發(fā)給消費(fèi)者就萬(wàn)事大吉,它們還有一個(gè)重要職責(zé)是確保送達(dá)和消費(fèi)。通常的實(shí)現(xiàn)方式是當(dāng)消費(fèi)者收到消息后向消息隊(duì)列返回確認(rèn)(ack),若消費(fèi)者返回否定確認(rèn)(nack)或超時(shí)未返回,消息隊(duì)列則會(huì)按照預(yù)定規(guī)則重新發(fā)送,直到到達(dá)最大重試次數(shù)后停止。如何實(shí)現(xiàn)ack和重試機(jī)制是我們要重點(diǎn)考慮的問(wèn)題。
我們的消息隊(duì)列允許分布式地部署多個(gè)生產(chǎn)者和消費(fèi)者,消費(fèi)者實(shí)例定時(shí)執(zhí)行l(wèi)ua腳本驅(qū)動(dòng)消息在隊(duì)列中的流轉(zhuǎn)無(wú)需部署額外組件。由于Redis保證了lua腳本執(zhí)行的原子性,整個(gè)流程無(wú)需加鎖。
消費(fèi)者采用拉模式獲得消息,保證每條消息至少投遞一次,消息隊(duì)列會(huì)重試超時(shí)或者被否定確認(rèn)的消息(nack)直至到達(dá)最大重試次數(shù)。一條消息最多有一個(gè)消費(fèi)者正在處理,減少了要考慮的并發(fā)問(wèn)題。
請(qǐng)注意:若消費(fèi)時(shí)間超過(guò)了MaxConsumeDuration消息隊(duì)列會(huì)認(rèn)為消費(fèi)超時(shí)并重新投遞,此時(shí)可能有多個(gè)消費(fèi)者同時(shí)消費(fèi)。
具體使用也非常簡(jiǎn)單,只需要注冊(cè)處理消息的回調(diào)函數(shù)并調(diào)用start()即可:
packagemain
import(
"/go-redis/redis/v8"
"/hdt3213/delayqueue"
"strconv"
"time"
funcmain(){
redisCli:=redis.NewClient(redis.Options{
Addr:":6379",
queue:=delayqueue.NewQueue("example-queue",redisCli,func(payloadstring)bool{
//注冊(cè)處理消息的回調(diào)函數(shù)
//返回true表示已成功消費(fèi),返回false消息隊(duì)列會(huì)重新投遞次消息
returntrue
//發(fā)送延時(shí)消息
fori:=0;ii++{
err:=queue.SendDelayMsg(strconv.Itoa(i),time.Hour,delayqueue.WithRetryCount(3))
iferr!=nil{
panic(err)
//startconsume
done:=queue.StartConsume()
-done
由于數(shù)據(jù)存儲(chǔ)在redis中所以我們最多能保證在redis無(wú)故障且消息隊(duì)列相關(guān)key未被外部篡改的情況下不會(huì)丟失消息。
原理詳解
消息隊(duì)列涉及幾個(gè)關(guān)鍵的redis數(shù)據(jù)結(jié)構(gòu):
msgKey:為了避免兩條內(nèi)容完全相同的消息造成意外的影響,我們將每條消息放到一個(gè)字符串類(lèi)型的鍵中,并分配一個(gè)UUID作為它的唯一標(biāo)識(shí)。其它數(shù)據(jù)結(jié)構(gòu)中只存儲(chǔ)UUID而不存儲(chǔ)完整的消息內(nèi)容。每個(gè)msg擁有一個(gè)獨(dú)立的key而不是將所有消息放到一個(gè)哈希表是為了利用TTL機(jī)制避免pendingKey:有序集合類(lèi)型,member為消息ID,score為投遞時(shí)間的unix時(shí)間戳。readyKey:列表類(lèi)型,需要投遞的消息ID。unAckKey:有序集合類(lèi)型,member為消息ID,score為重試時(shí)間的unix時(shí)間戳。retryKey:列表類(lèi)型,已到重試時(shí)間的消息IDgarbageKey:集合類(lèi)型,用于暫存已達(dá)重試上線(xiàn)的消息IDretryCountKey:哈希表類(lèi)型,鍵為消息ID,值為剩余的重試次數(shù)
流程如下圖所示:
由于我們?cè)试S分布式地部署多個(gè)消費(fèi)者,每個(gè)消費(fèi)者都在定時(shí)執(zhí)行l(wèi)ua腳本,所以多個(gè)消費(fèi)者可能處于上述流程中不同狀態(tài),我們無(wú)法預(yù)知(或控制)上圖中五個(gè)操作發(fā)生的先后順序,也無(wú)法控制有多少實(shí)例正在執(zhí)行同一個(gè)操作。
因此我們需要保證上圖中五個(gè)操作滿(mǎn)足三個(gè)條件:
都是原子性的不會(huì)重復(fù)處理同一條消息操作前后消息隊(duì)列始終處于正確的狀態(tài)
只要滿(mǎn)足這三個(gè)條件,我們就可以部署多個(gè)實(shí)例且不需要使用分布式鎖等技術(shù)來(lái)進(jìn)行狀態(tài)同步。
是不是聽(tīng)起來(lái)有點(diǎn)嚇人?其實(shí)簡(jiǎn)單的很,讓我們一起來(lái)詳細(xì)看看吧~
pending2ReadyScript
pending2ReadyScript使用zrangebyscore掃描已到投遞時(shí)間的消息ID并把它們移動(dòng)到ready中:
--keys:pendingKey,readyKey
--argv:currentTime
localmsgs=redis.call('ZRangeByScore',KEYS[1],'0',ARGV[1])--從pendingkey中找出已到投遞時(shí)間的消息
if(#msgs==0)thenreturnend
localargs2={'LPush',KEYS[2]}--將他們放入readykey中
for_,vinipairs(msgs)do
table.insert(args2,v)
redis.call(unpack(args2))
redis.call('ZRemRangeByScore',KEYS[1],'0',ARGV[1])--從pendingkey中刪除已投遞的消息
ready2UnackScript
ready2UnackScript從ready或者retry中取出一條消息發(fā)送給消費(fèi)者并放入unack中,類(lèi)似于RPopLPush:
--keys:readyKey/retryKey,unackKey
--argv:retryTime
localmsg=redis.call('RPop',KEYS[1])
if(notmsg)thenreturnend
redis.call('ZAdd',KEYS[2],ARGV[1],msg)
returnmsg
unack2RetryScript
unack2RetryScript從retry中找出所有已到重試時(shí)間的消息并把它們移動(dòng)到unack中:
--keys:unackKey,retryCountKey,retryKey,garbageKey
--argv:currentTime
localmsgs=redis.call('ZRangeByScore',KEYS[1],'0',ARGV[1])--找到已到重試時(shí)間的消息
if(#msgs==0)thenreturnend
localretryCounts=redis.call('HMGet',KEYS[2],unpack(msgs))--查詢(xún)剩余重試次數(shù)
fori,vinipairs(retryCounts)do
localk=msgs[i]
iftonumber(v)0then--剩余次數(shù)大于0
redis.call("HIncrBy",KEYS[2],k,-1)--減少剩余重試次數(shù)
redis.call("LPush",KEYS[3],k)--添加到retrykey中
else--剩余重試次數(shù)為0
redis.call("HDel",KEYS[2],k)--刪除重試次數(shù)記錄
redis.call("SAdd",KEYS[4],k)--添加到垃圾桶,等待后續(xù)刪除
redis.call('ZRemRangeByScore',KEYS[1],'0',ARGV[1])--將已處理的消息從unackkey中刪除
因?yàn)閞edis要求lua腳本必須在執(zhí)行前在KEYS參數(shù)中聲明自己要訪(fǎng)問(wèn)的key,而我們將每個(gè)msg有一個(gè)獨(dú)立的key,我們?cè)趫?zhí)行unack2RetryScript之前是不知道哪些msgkey需要被刪除。所以lua腳本只將需要?jiǎng)h除的消息記在garbagekey中,腳本執(zhí)行完后再通過(guò)del命令將他們刪除:
func(q*DelayQueue)garbageCollect()error{
ctx:=context.Background()
msgIds,err:=q.redisCli.SMembers(ctx,q.garbageKey).Result()
iferr!=nil{
returnfmt.Errorf("smembersfailed:%v",err)
iflen(msgIds)==0{
returnnil
//allowconcurrentclean
msgKeys:=make([]string,0,len(msgIds))
for_,idStr:=rangemsgIds{
msgKeys=append(msgKeys,q.genMsgKey(idStr))
err=q.redisCli.Del(ctx,msgKeys...).Err()
iferr!=nilerr!=redis.Nil{
returnfmt.Errorf("delmsgsfailed:%v",err)
err=q.redisCli.SRem(ctx,q.garbageKey,msgIds).Err()
iferr!=nilerr!=redis.Nil{
returnfmt.Errorf("removefromgarbagekeyfailed:%v",err)
returnnil
之前提到的lua腳本都是原子性執(zhí)行的,不會(huì)有其它命令插入其中。gc函數(shù)由3條redis命令組成,在執(zhí)行過(guò)程中可能會(huì)有其它命令插入執(zhí)行過(guò)程中,不過(guò)考慮到一條消息進(jìn)入垃圾回收流程之后不會(huì)復(fù)活所以不需要保證3條命令原子性。
ack
ack只需要將消息徹底刪除即可:
func(q*DelayQueue)ack(idStrstring)error{
ctx:=context.Background()
err:=q.redisCli.ZRem(ctx,q.unAckKey,idStr).Err()
iferr!=nil{
returnfmt.Errorf("removefromunackfailed:%v",err)
//msgkeyhasttl,ignoreresultofdelete
_=q.redisCli.Del(ctx,q.genMsgKey(idStr)).Err()
q.redisCli.HDel(ctx,q.retryCountKey,idStr)
returnnil
否定確認(rèn)只需要將unackkey中消息的重試時(shí)間改為現(xiàn)在,隨后執(zhí)行的unack2RetryScript會(huì)立即將它移動(dòng)到retrykey
func(q*DelayQueue)nack(idStrstring)error{
ctx:=context.Background()
//updateretrytimeasnow,unack2Retrywillmoveittoretryimmediately
err:=q.redisCli.ZAdd(ctx,q.unAckKey,redis.Z{
Member:idStr,
Score:float64(time.Now().Unix()),
}).Err()
iferr!=nil{
returnfmt.Errorf("negativeackfailed:%v",err)
returnnil
consume
消息隊(duì)列的核心邏輯是每秒執(zhí)行一次的consume函數(shù),它負(fù)責(zé)調(diào)用上述腳本將消息轉(zhuǎn)移到正確的集合中并回調(diào)consumer來(lái)消費(fèi)消息:
func(q*DelayQueue)consume()error{
//執(zhí)行pending2ready,將已到時(shí)間的消息轉(zhuǎn)移到ready
err:=q.pending2Ready()
iferr!=nil{
returnerr
//循環(huán)調(diào)用ready2Unack拉取消息進(jìn)行消費(fèi)
varfetchCountuint
for{
idStr,err:=q.ready2Unack()
iferr==redis.Nil{//consumedall
break
iferr!=nil{
returnerr
fetchCount++
ack,err:=q.callback(idStr)
iferr!=nil{
returnerr
ifack{
err=q.ack(idStr)
}else{
err=q.nack(idStr)
iferr!=nil{
returnerr
iffetchCount=q.fetchLimi
溫馨提示
- 1. 本站所有資源如無(wú)特殊說(shuō)明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請(qǐng)下載最新的WinRAR軟件解壓。
- 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請(qǐng)聯(lián)系上傳者。文件的所有權(quán)益歸上傳用戶(hù)所有。
- 3. 本站RAR壓縮包中若帶圖紙,網(wǎng)頁(yè)內(nèi)容里面會(huì)有圖紙預(yù)覽,若沒(méi)有圖紙預(yù)覽就沒(méi)有圖紙。
- 4. 未經(jīng)權(quán)益所有人同意不得將文件中的內(nèi)容挪作商業(yè)或盈利用途。
- 5. 人人文庫(kù)網(wǎng)僅提供信息存儲(chǔ)空間,僅對(duì)用戶(hù)上傳內(nèi)容的表現(xiàn)方式做保護(hù)處理,對(duì)用戶(hù)上傳分享的文檔內(nèi)容本身不做任何修改或編輯,并不能對(duì)任何下載內(nèi)容負(fù)責(zé)。
- 6. 下載文件中如有侵權(quán)或不適當(dāng)內(nèi)容,請(qǐng)與我們聯(lián)系,我們立即糾正。
- 7. 本站不保證下載資源的準(zhǔn)確性、安全性和完整性, 同時(shí)也不承擔(dān)用戶(hù)因使用這些下載資源對(duì)自己和他人造成任何形式的傷害或損失。
最新文檔
- 2025年北京裝飾裝修合同范本
- 慢阻肺護(hù)理查房
- 小班愛(ài)眼護(hù)眼活動(dòng)總結(jié)
- 藥物滲漏不良事件分析
- 2025年人力資源管理試題含答案
- 膜性腎病的臨床護(hù)理
- 醫(yī)學(xué)人衛(wèi)教材配套建設(shè)要點(diǎn)
- 人音版小學(xué)音樂(lè)五年級(jí)上冊(cè)《外婆的澎湖灣》教學(xué)設(shè)計(jì)
- 七年級(jí)英語(yǔ)語(yǔ)法總結(jié)模版
- 教室衛(wèi)生大掃除活動(dòng)總結(jié)模版
- 2022萬(wàn)能試驗(yàn)機(jī)驗(yàn)收規(guī)范
- 西部計(jì)劃考試考題及答案
- 《低鉀血癥病人護(hù)理》課件
- 消防水池防水合同
- 2025年供港活牛供宰與屠宰設(shè)備采購(gòu)合同
- 2024綠城地產(chǎn)代建項(xiàng)目合作協(xié)議范本9篇
- 2024上海市招聘社區(qū)工作者考試題及參考答案
- 企業(yè)貸款知識(shí)培訓(xùn)
- 教科版三年級(jí)下冊(cè)科學(xué)全冊(cè)同步練習(xí)(一課一練)
- 餐飲服務(wù)自查記錄表
- 垃圾分類(lèi)測(cè)試題(含答案)
評(píng)論
0/150
提交評(píng)論