Golang實(shí)現(xiàn)基于Redis的可靠延遲隊(duì)列_第1頁(yè)
Golang實(shí)現(xiàn)基于Redis的可靠延遲隊(duì)列_第2頁(yè)
Golang實(shí)現(xiàn)基于Redis的可靠延遲隊(duì)列_第3頁(yè)
Golang實(shí)現(xiàn)基于Redis的可靠延遲隊(duì)列_第4頁(yè)
Golang實(shí)現(xiàn)基于Redis的可靠延遲隊(duì)列_第5頁(yè)
已閱讀5頁(yè),還剩5頁(yè)未讀, 繼續(xù)免費(fèi)閱讀

下載本文檔

版權(quán)說(shuō)明:本文檔由用戶(hù)提供并上傳,收益歸屬內(nèi)容提供方,若內(nèi)容存在侵權(quán),請(qǐng)進(jìn)行舉報(bào)或認(rèn)領(lǐng)

文檔簡(jiǎn)介

第Golang實(shí)現(xiàn)基于Redis的可靠延遲隊(duì)列目錄前言原理詳解pending2ReadyScriptready2UnackScriptunack2RetryScriptackconsume

前言

在之前探討延時(shí)隊(duì)列的文章中我們提到了redissondelayqueue使用redis的有序集合結(jié)構(gòu)實(shí)現(xiàn)延時(shí)隊(duì)列,遺憾的是go語(yǔ)言社區(qū)中并無(wú)類(lèi)似的庫(kù)。不過(guò)問(wèn)題不大,沒(méi)有輪子我們自己造。

本文的完整代碼實(shí)現(xiàn)在hdt3213/delayqueue,可以直接goget安裝使用。

使用有序集合結(jié)構(gòu)實(shí)現(xiàn)延時(shí)隊(duì)列的方法已經(jīng)廣為人知,無(wú)非是將消息作為有序集合的member,投遞時(shí)間戳作為score使用zrangebyscore命令搜索已到投遞時(shí)間的消息然后將其發(fā)給消費(fèi)者。

然而消息隊(duì)列不是將消息發(fā)給消費(fèi)者就萬(wàn)事大吉,它們還有一個(gè)重要職責(zé)是確保送達(dá)和消費(fèi)。通常的實(shí)現(xiàn)方式是當(dāng)消費(fèi)者收到消息后向消息隊(duì)列返回確認(rèn)(ack),若消費(fèi)者返回否定確認(rèn)(nack)或超時(shí)未返回,消息隊(duì)列則會(huì)按照預(yù)定規(guī)則重新發(fā)送,直到到達(dá)最大重試次數(shù)后停止。如何實(shí)現(xiàn)ack和重試機(jī)制是我們要重點(diǎn)考慮的問(wèn)題。

我們的消息隊(duì)列允許分布式地部署多個(gè)生產(chǎn)者和消費(fèi)者,消費(fèi)者實(shí)例定時(shí)執(zhí)行l(wèi)ua腳本驅(qū)動(dòng)消息在隊(duì)列中的流轉(zhuǎn)無(wú)需部署額外組件。由于Redis保證了lua腳本執(zhí)行的原子性,整個(gè)流程無(wú)需加鎖。

消費(fèi)者采用拉模式獲得消息,保證每條消息至少投遞一次,消息隊(duì)列會(huì)重試超時(shí)或者被否定確認(rèn)的消息(nack)直至到達(dá)最大重試次數(shù)。一條消息最多有一個(gè)消費(fèi)者正在處理,減少了要考慮的并發(fā)問(wèn)題。

請(qǐng)注意:若消費(fèi)時(shí)間超過(guò)了MaxConsumeDuration消息隊(duì)列會(huì)認(rèn)為消費(fèi)超時(shí)并重新投遞,此時(shí)可能有多個(gè)消費(fèi)者同時(shí)消費(fèi)。

具體使用也非常簡(jiǎn)單,只需要注冊(cè)處理消息的回調(diào)函數(shù)并調(diào)用start()即可:

packagemain

import(

"/go-redis/redis/v8"

"/hdt3213/delayqueue"

"strconv"

"time"

funcmain(){

redisCli:=redis.NewClient(redis.Options{

Addr:":6379",

queue:=delayqueue.NewQueue("example-queue",redisCli,func(payloadstring)bool{

//注冊(cè)處理消息的回調(diào)函數(shù)

//返回true表示已成功消費(fèi),返回false消息隊(duì)列會(huì)重新投遞次消息

returntrue

//發(fā)送延時(shí)消息

fori:=0;ii++{

err:=queue.SendDelayMsg(strconv.Itoa(i),time.Hour,delayqueue.WithRetryCount(3))

iferr!=nil{

panic(err)

//startconsume

done:=queue.StartConsume()

-done

由于數(shù)據(jù)存儲(chǔ)在redis中所以我們最多能保證在redis無(wú)故障且消息隊(duì)列相關(guān)key未被外部篡改的情況下不會(huì)丟失消息。

原理詳解

消息隊(duì)列涉及幾個(gè)關(guān)鍵的redis數(shù)據(jù)結(jié)構(gòu):

msgKey:為了避免兩條內(nèi)容完全相同的消息造成意外的影響,我們將每條消息放到一個(gè)字符串類(lèi)型的鍵中,并分配一個(gè)UUID作為它的唯一標(biāo)識(shí)。其它數(shù)據(jù)結(jié)構(gòu)中只存儲(chǔ)UUID而不存儲(chǔ)完整的消息內(nèi)容。每個(gè)msg擁有一個(gè)獨(dú)立的key而不是將所有消息放到一個(gè)哈希表是為了利用TTL機(jī)制避免pendingKey:有序集合類(lèi)型,member為消息ID,score為投遞時(shí)間的unix時(shí)間戳。readyKey:列表類(lèi)型,需要投遞的消息ID。unAckKey:有序集合類(lèi)型,member為消息ID,score為重試時(shí)間的unix時(shí)間戳。retryKey:列表類(lèi)型,已到重試時(shí)間的消息IDgarbageKey:集合類(lèi)型,用于暫存已達(dá)重試上線(xiàn)的消息IDretryCountKey:哈希表類(lèi)型,鍵為消息ID,值為剩余的重試次數(shù)

流程如下圖所示:

由于我們?cè)试S分布式地部署多個(gè)消費(fèi)者,每個(gè)消費(fèi)者都在定時(shí)執(zhí)行l(wèi)ua腳本,所以多個(gè)消費(fèi)者可能處于上述流程中不同狀態(tài),我們無(wú)法預(yù)知(或控制)上圖中五個(gè)操作發(fā)生的先后順序,也無(wú)法控制有多少實(shí)例正在執(zhí)行同一個(gè)操作。

因此我們需要保證上圖中五個(gè)操作滿(mǎn)足三個(gè)條件:

都是原子性的不會(huì)重復(fù)處理同一條消息操作前后消息隊(duì)列始終處于正確的狀態(tài)

只要滿(mǎn)足這三個(gè)條件,我們就可以部署多個(gè)實(shí)例且不需要使用分布式鎖等技術(shù)來(lái)進(jìn)行狀態(tài)同步。

是不是聽(tīng)起來(lái)有點(diǎn)嚇人?其實(shí)簡(jiǎn)單的很,讓我們一起來(lái)詳細(xì)看看吧~

pending2ReadyScript

pending2ReadyScript使用zrangebyscore掃描已到投遞時(shí)間的消息ID并把它們移動(dòng)到ready中:

--keys:pendingKey,readyKey

--argv:currentTime

localmsgs=redis.call('ZRangeByScore',KEYS[1],'0',ARGV[1])--從pendingkey中找出已到投遞時(shí)間的消息

if(#msgs==0)thenreturnend

localargs2={'LPush',KEYS[2]}--將他們放入readykey中

for_,vinipairs(msgs)do

table.insert(args2,v)

redis.call(unpack(args2))

redis.call('ZRemRangeByScore',KEYS[1],'0',ARGV[1])--從pendingkey中刪除已投遞的消息

ready2UnackScript

ready2UnackScript從ready或者retry中取出一條消息發(fā)送給消費(fèi)者并放入unack中,類(lèi)似于RPopLPush:

--keys:readyKey/retryKey,unackKey

--argv:retryTime

localmsg=redis.call('RPop',KEYS[1])

if(notmsg)thenreturnend

redis.call('ZAdd',KEYS[2],ARGV[1],msg)

returnmsg

unack2RetryScript

unack2RetryScript從retry中找出所有已到重試時(shí)間的消息并把它們移動(dòng)到unack中:

--keys:unackKey,retryCountKey,retryKey,garbageKey

--argv:currentTime

localmsgs=redis.call('ZRangeByScore',KEYS[1],'0',ARGV[1])--找到已到重試時(shí)間的消息

if(#msgs==0)thenreturnend

localretryCounts=redis.call('HMGet',KEYS[2],unpack(msgs))--查詢(xún)剩余重試次數(shù)

fori,vinipairs(retryCounts)do

localk=msgs[i]

iftonumber(v)0then--剩余次數(shù)大于0

redis.call("HIncrBy",KEYS[2],k,-1)--減少剩余重試次數(shù)

redis.call("LPush",KEYS[3],k)--添加到retrykey中

else--剩余重試次數(shù)為0

redis.call("HDel",KEYS[2],k)--刪除重試次數(shù)記錄

redis.call("SAdd",KEYS[4],k)--添加到垃圾桶,等待后續(xù)刪除

redis.call('ZRemRangeByScore',KEYS[1],'0',ARGV[1])--將已處理的消息從unackkey中刪除

因?yàn)閞edis要求lua腳本必須在執(zhí)行前在KEYS參數(shù)中聲明自己要訪(fǎng)問(wèn)的key,而我們將每個(gè)msg有一個(gè)獨(dú)立的key,我們?cè)趫?zhí)行unack2RetryScript之前是不知道哪些msgkey需要被刪除。所以lua腳本只將需要?jiǎng)h除的消息記在garbagekey中,腳本執(zhí)行完后再通過(guò)del命令將他們刪除:

func(q*DelayQueue)garbageCollect()error{

ctx:=context.Background()

msgIds,err:=q.redisCli.SMembers(ctx,q.garbageKey).Result()

iferr!=nil{

returnfmt.Errorf("smembersfailed:%v",err)

iflen(msgIds)==0{

returnnil

//allowconcurrentclean

msgKeys:=make([]string,0,len(msgIds))

for_,idStr:=rangemsgIds{

msgKeys=append(msgKeys,q.genMsgKey(idStr))

err=q.redisCli.Del(ctx,msgKeys...).Err()

iferr!=nilerr!=redis.Nil{

returnfmt.Errorf("delmsgsfailed:%v",err)

err=q.redisCli.SRem(ctx,q.garbageKey,msgIds).Err()

iferr!=nilerr!=redis.Nil{

returnfmt.Errorf("removefromgarbagekeyfailed:%v",err)

returnnil

之前提到的lua腳本都是原子性執(zhí)行的,不會(huì)有其它命令插入其中。gc函數(shù)由3條redis命令組成,在執(zhí)行過(guò)程中可能會(huì)有其它命令插入執(zhí)行過(guò)程中,不過(guò)考慮到一條消息進(jìn)入垃圾回收流程之后不會(huì)復(fù)活所以不需要保證3條命令原子性。

ack

ack只需要將消息徹底刪除即可:

func(q*DelayQueue)ack(idStrstring)error{

ctx:=context.Background()

err:=q.redisCli.ZRem(ctx,q.unAckKey,idStr).Err()

iferr!=nil{

returnfmt.Errorf("removefromunackfailed:%v",err)

//msgkeyhasttl,ignoreresultofdelete

_=q.redisCli.Del(ctx,q.genMsgKey(idStr)).Err()

q.redisCli.HDel(ctx,q.retryCountKey,idStr)

returnnil

否定確認(rèn)只需要將unackkey中消息的重試時(shí)間改為現(xiàn)在,隨后執(zhí)行的unack2RetryScript會(huì)立即將它移動(dòng)到retrykey

func(q*DelayQueue)nack(idStrstring)error{

ctx:=context.Background()

//updateretrytimeasnow,unack2Retrywillmoveittoretryimmediately

err:=q.redisCli.ZAdd(ctx,q.unAckKey,redis.Z{

Member:idStr,

Score:float64(time.Now().Unix()),

}).Err()

iferr!=nil{

returnfmt.Errorf("negativeackfailed:%v",err)

returnnil

consume

消息隊(duì)列的核心邏輯是每秒執(zhí)行一次的consume函數(shù),它負(fù)責(zé)調(diào)用上述腳本將消息轉(zhuǎn)移到正確的集合中并回調(diào)consumer來(lái)消費(fèi)消息:

func(q*DelayQueue)consume()error{

//執(zhí)行pending2ready,將已到時(shí)間的消息轉(zhuǎn)移到ready

err:=q.pending2Ready()

iferr!=nil{

returnerr

//循環(huán)調(diào)用ready2Unack拉取消息進(jìn)行消費(fèi)

varfetchCountuint

for{

idStr,err:=q.ready2Unack()

iferr==redis.Nil{//consumedall

break

iferr!=nil{

returnerr

fetchCount++

ack,err:=q.callback(idStr)

iferr!=nil{

returnerr

ifack{

err=q.ack(idStr)

}else{

err=q.nack(idStr)

iferr!=nil{

returnerr

iffetchCount=q.fetchLimi

溫馨提示

  • 1. 本站所有資源如無(wú)特殊說(shuō)明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請(qǐng)下載最新的WinRAR軟件解壓。
  • 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請(qǐng)聯(lián)系上傳者。文件的所有權(quán)益歸上傳用戶(hù)所有。
  • 3. 本站RAR壓縮包中若帶圖紙,網(wǎng)頁(yè)內(nèi)容里面會(huì)有圖紙預(yù)覽,若沒(méi)有圖紙預(yù)覽就沒(méi)有圖紙。
  • 4. 未經(jīng)權(quán)益所有人同意不得將文件中的內(nèi)容挪作商業(yè)或盈利用途。
  • 5. 人人文庫(kù)網(wǎng)僅提供信息存儲(chǔ)空間,僅對(duì)用戶(hù)上傳內(nèi)容的表現(xiàn)方式做保護(hù)處理,對(duì)用戶(hù)上傳分享的文檔內(nèi)容本身不做任何修改或編輯,并不能對(duì)任何下載內(nèi)容負(fù)責(zé)。
  • 6. 下載文件中如有侵權(quán)或不適當(dāng)內(nèi)容,請(qǐng)與我們聯(lián)系,我們立即糾正。
  • 7. 本站不保證下載資源的準(zhǔn)確性、安全性和完整性, 同時(shí)也不承擔(dān)用戶(hù)因使用這些下載資源對(duì)自己和他人造成任何形式的傷害或損失。

評(píng)論

0/150

提交評(píng)論