2022 Apache RocketMQ Source Code Analysis Guide
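Contents
RocketMQ DLedger multi-replica (master-slave switchover) column recap (source-code reading tips)
Source code analysis: RocketMQ ACL
Source code analysis: RocketMQ message trace
RocketMQ multi-replica prelude: a first look at the Raft protocol
Source code analysis: RocketMQ multi-replica Leader election
Source code analysis: RocketMQ DLedger (multi-replica) log append flow
Source code analysis: RocketMQ DLedger (multi-replica) log replication (propagation)
Implementation principles of Raft-based log replication in RocketMQ DLedger
Source code analysis: RocketMQ DLedger multi-replica storage implementation
Source code analysis: design techniques for a smooth upgrade when integrating DLedger (multi-replica) into RocketMQ
Source code analysis: implementation principles of RocketMQ DLedger multi-replica (master-slave switchover)

2.1 RocketMQ DLedger multi-replica (master-slave switchover) column recap (source-code reading tips)

The RocketMQ DLedger multi-replica (master-slave switchover) column consists of nine articles. Before diving into the DLedger source code, recall the problem it solves. Before RocketMQ 4.5.0, when a master failed, consumers could keep consuming from the slave, but no more messages could be sent to that replication group. RocketMQ 4.5.0 introduced DLedger, which triggers a new leader election inside the replication group. Once the election completes, the group accepts writes again. We also learned that RocketMQ's master-slave switchover is based on the Raft protocol. Many readers have heard of Raft and know roughly what it does, but have not studied it in detail. For that reason, it is worth understanding Raft before reading the DLedger switchover code.

1. RocketMQ multi-replica prelude: a first look at the Raft protocol
Once the basic mechanics of Raft are understood, we can move on to the DLedger source code.

2. Source code analysis: RocketMQ multi-replica Leader election
Following the approach of the previous article, this article analyzes the implementation of DLedgerLeaderElector in detail. Stepping through it in a debugger helps clear up whatever remains unclear.

3. Source code analysis: RocketMQ DLedger multi-replica storage implementation
Log replication involves storage, so before studying replication we first look at DLedger's storage design. This covers the storage format of DLedger log entries and how the logs are organized on the server, which is analogous to RocketMQ's commitlog storage.

4. Source code analysis: RocketMQ DLedger (multi-replica) log append flow
This article follows on from DLedger's storage design.

5. Source code analysis: RocketMQ DLedger (multi-replica) log replication (propagation)

6. Raft-based RocketMQ DLedger multi-replica log replication design
This article summarizes the key implementation points of log replication. It completes the interpretation of the Raft protocol behind DLedger's master-slave switchover.

7. Smoothly upgrading RocketMQ by integrating DLedger (multi-replica)
The previous six articles covered Raft leader election and log replication. From this article on, we look at how RocketMQ integrates DLedger. DLedger was introduced only in RocketMQ 4.5.0, so this article also looks at what upgrading an older version directly to 4.5.0 involves.

9. RocketMQ DLedger multi-replica (master-slave switchover) implementation principles
Building on the previous eight articles, this one ties Leader election and DLedger together by debugging the official unit test cases.

Tip: this column is part of "RocketMQ…".

2.2 Source code analysis: RocketMQ ACL

Note: RocketMQ introduced the ACL mechanism in version 4.4.0. The code in this article is based on RocketMQ 4.5.0.

1. Broker-side ACL initialization

    if (!this.brokerConfig.isAclEnable()) {   // @1
        log.info("The broker dose not enable acl");
        return;
    }

    List<AccessValidator> accessValidators = ServiceProvider.load(ServiceProvider.ACL_VALIDATOR_ID, AccessValidator.class);   // @2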
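    if (accessValidators == null || accessValidators.isEmpty()) {
        log.info("The broker dose not load the AccessValidator");
        return;
    }

    for (AccessValidator accessValidator : accessValidators) {   // @3
        final AccessValidator validator = accessValidator;
        this.registerServerRPCHook(new RPCHook() {
            @Override
            public void doBeforeRequest(String remoteAddr, RemotingCommand request) {
                // Do not catch the exception
                validator.validate(validator.parse(request, remoteAddr));   // @4
            }

            @Override
            public void doAfterResponse(String remoteAddr, RemotingCommand request, RemotingCommand response) {
            }
        });
    }

Code @1: if the Broker does not enable ACL (aclEnable), return immediately.
Code @2: load the configured AccessValidator implementations through an SPI-like mechanism. The method returns a list; it works by reading the file META-INF/service/org.apache.rocketmq.acl.AccessValidator.
Code @3: iterate over the configured access validators (AccessValidator) and register an RPCHook with the Broker for each one. Validation happens in the hook's doBeforeRequest; doAfterResponse does nothing.
Code @4: RPCHook#doBeforeRequest calls AccessValidator#validate. If the ACL check fails, an AclException is thrown.

The Broker's default AccessValidator implementation is PlainAccessValidator. The AccessValidator interface defines two methods:

    AccessResource parse(RemotingCommand request, String remoteAddr);
    void validate(AccessResource accessResource);

RocketMQ's default implementation is driven by a yml configuration file. Below we look at the details of PlainAccessValidator's parse and validate methods. Before that, let's get to know PlainAccessResource, the class RocketMQ uses to encapsulate an access resource:

private String accessKey: the access key, i.e. the user name.
private String secretKey: the user's password.
private String whiteRemoteAddress: the remote IP whitelist.
private boolean admin: whether the user is an administrator.
private byte defaultTopicPerm = 1: the default topic permission; 1 means DENY.
private byte defaultGroupPerm = 1: the default consumer-group permission, also DENY.
private Map<String, Byte> resourcePermMap: the resource-to-permission map.
private RemoteAddressStrategy remoteAddressStrategy: the remote IP address verification strategy.
private int requestCode: the requestCode of the current request.
private byte[] content: the request content. The client signs it with its secretKey to produce a signature string. The server repeats this step and compares the two signatures; if they match, signature verification passes.
private String secretToken: the token.

PlainAccessValidator constructor

    public PlainAccessValidator() {
        aclPlugEngine = new PlainPermissionLoader();
    }

The constructor creates a PlainPermissionLoader. As the name suggests, it loads the ACL rules, that is, it parses plain_acl.yml. This configuration-parsing part of the ACL startup flow is discussed in detail below.

parse
This method extracts, from the request command, the permissions this access requires. It then builds an AccessResource object for the permission check that follows.

    PlainAccessResource accessResource = new PlainAccessResource();
    if (remoteAddr != null && remoteAddr.contains(":")) {
        accessResource.setWhiteRemoteAddress(remoteAddr.split(":")[0]);
    } else {
        accessResource.setWhiteRemoteAddress(remoteAddr);
    }

Step 1: create the PlainAccessResource and extract the remote IP address from the remote address.

    if (request.getExtFields() == null) {
        throw new AclException("request's extFields value is null");
    }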

    try {
        switch (request.getCode()) {
            case RequestCode.SEND_MESSAGE:
                accessResource.addResourceAndPerm(request.getExtFields().get("topic"), Permission.PUB);
                break;
            case RequestCode.SEND_MESSAGE_V2:
                accessResource.addResourceAndPerm(request.getExtFields().get("b"), Permission.PUB);
                break;
            case RequestCode.CONSUMER_SEND_MSG_BACK:
                accessResource.addResourceAndPerm(request.getExtFields().get("originTopic"), Permission.PUB);
                accessResource.addResourceAndPerm(getRetryTopic(request.getExtFields().get("group")), Permission.SUB);
                break;
            case RequestCode.PULL_MESSAGE:
                accessResource.addResourceAndPerm(request.getExtFields().get("topic"), Permission.SUB);
                break;
            case RequestCode.QUERY_MESSAGE:
                accessResource.addResourceAndPerm(request.getExtFields().get("topic"), Permission.SUB);
                break;
            case RequestCode.HEART_BEAT:
                HeartbeatData heartbeatData = HeartbeatData.decode(request.getBody(), HeartbeatData.class);
                for (ConsumerData data : heartbeatData.getConsumerDataSet()) {
                    accessResource.addResourceAndPerm(getRetryTopic(data.getGroupName()), Permission.SUB);
                    for (SubscriptionData subscriptionData : data.getSubscriptionDataSet()) {
                        accessResource.addResourceAndPerm(subscriptionData.getTopic(), Permission.SUB);
                    }
                }
                break;
            case RequestCode.UNREGISTER_CLIENT:
                final UnregisterClientRequestHeader unregisterClientRequestHeader =
                    (UnregisterClientRequestHeader) request.decodeCommandCustomHeader(UnregisterClientRequestHeader.class);
                accessResource.addResourceAndPerm(getRetryTopic(unregisterClientRequestHeader.getConsumerGroup()), Permission.SUB);
                break;
            case RequestCode.GET_CONSUMER_LIST_BY_GROUP:
                final GetConsumerListByGroupRequestHeader getConsumerListByGroupRequestHeader =
                    (GetConsumerListByGroupRequestHeader) request.decodeCommandCustomHeader(GetConsumerListByGroupRequestHeader.class);
                accessResource.addResourceAndPerm(getRetryTopic(getConsumerListByGroupRequestHeader.getConsumerGroup()), Permission.SUB);
                break;
            case RequestCode.UPDATE_CONSUMER_OFFSET:
                final UpdateConsumerOffsetRequestHeader updateConsumerOffsetRequestHeader =
                    (UpdateConsumerOffsetRequestHeader) request.decodeCommandCustomHeader(UpdateConsumerOffsetRequestHeader.class);
                accessResource.addResourceAndPerm(getRetryTopic(updateConsumerOffsetRequestHeader.getConsumerGroup()), Permission.SUB);
                accessResource.addResourceAndPerm(updateConsumerOffsetRequestHeader.getTopic(), Permission.SUB);
                break;
            default:
                break;
        }
    } catch (Throwable t) {
        throw new AclException(t.getMessage(), t);
    }

Step 2: based on the request code, record the resources the request touches and the permission (PUB or SUB) each one requires. Resources are topics, or the retry topics of consumer groups, and they are what the ACL permission check verifies.

    // Content
    SortedMap<String, String> map = new TreeMap<String, String>();
    for (Map.Entry<String, String> entry : request.getExtFields().entrySet()) {
        if (!SessionCredentials.SIGNATURE.equals(entry.getKey())) {
            map.put(entry.getKey(), entry.getValue());
        }
    }
    accessResource.setContent(AclUtils.combineRequestContent(request, map));
    return accessResource;

Step 3: build content. All extension fields except Signature are sorted by key and combined with the request, and the result is stored in content. The signature is later recomputed from it.

validate

    public void validate(AccessResource accessResource) {
        aclPlugEngine.validate((PlainAccessResource) accessResource);
    }

validate delegates to PlainPermissionLoader#validate. If validation fails, an AclException is thrown.
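The signing scheme above works like this. The client signs the sorted fields, and the server rebuilds the same content and recomputes the signature. A small standalone sketch makes this concrete. It assumes HmacSHA1 with Base64 output, which we believe matches RocketMQ's default signer. It also assumes content is a plain concatenation of the sorted field values followed by the body. Treat both as assumptions; the real AclUtils.combineRequestContent byte layout may differ. The class and method names are hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.security.GeneralSecurityException;
import java.security.MessageDigest;
import java.util.Base64;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;
import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;

/** Hypothetical sketch of an ACL-style request signature; not the RocketMQ API. */
final class AclSignatureSketch {
    static final String SIGNATURE = "Signature";

    private AclSignatureSketch() {
    }

    /** Sorts fields by key, skips Signature, concatenates the values, then appends the body. */
    static byte[] combineContent(Map<String, String> extFields, byte[] body) {
        SortedMap<String, String> sorted = new TreeMap<>(extFields);
        sorted.remove(SIGNATURE);
        StringBuilder sb = new StringBuilder();
        for (String value : sorted.values()) {
            if (value != null) {
                sb.append(value);
            }
        }
        byte[] head = sb.toString().getBytes(StandardCharsets.UTF_8);
        if (body == null) {
            return head;
        }
        byte[] total = new byte[head.length + body.length];
        System.arraycopy(head, 0, total, 0, head.length);
        System.arraycopy(body, 0, total, head.length, body.length);
        return total;
    }

    /** HmacSHA1 over the content, keyed by the secret key, Base64-encoded (assumed algorithm). */
    static String sign(byte[] content, String secretKey) {
        try {
            Mac mac = Mac.getInstance("HmacSHA1");
            mac.init(new SecretKeySpec(secretKey.getBytes(StandardCharsets.UTF_8), "HmacSHA1"));
            return Base64.getEncoder().encodeToString(mac.doFinal(content));
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException("HmacSHA1 unavailable", e);
        }
    }

    /** Server side: recompute with the stored secret key and compare in constant time. */
    static boolean verify(Map<String, String> extFields, byte[] body, String secretKey) {
        String provided = extFields.get(SIGNATURE);
        if (provided == null) {
            return false;
        }
        String expected = sign(combineContent(extFields, body), secretKey);
        return MessageDigest.isEqual(expected.getBytes(StandardCharsets.UTF_8),
                                     provided.getBytes(StandardCharsets.UTF_8));
    }
}
```

The server rebuilds content from the fields it actually received, so any field added or altered in transit changes the digest. The sketch uses MessageDigest.isEqual for a constant-time comparison, a small hardening over a plain String.equals.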
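To see how the configuration file is parsed and validated, we now turn to PlainPermissionLoader. Its main fields:

String fileName: the ACL configuration file name. It defaults to DEFAULT_PLAIN_ACL_FILE (conf/plain_acl.yml) and can be set with the system property -Drocketmq.acl.plain.file=fileName.
Map<String, PlainAccessResource> plainAccessResourceMap: the parsed access rules, keyed by accessKey.
RemoteAddressStrategyFactory remoteAddressStrategyFactory: the factory that creates remote IP address matching strategies.
boolean isWatchStart: whether the file watcher has been started.

Its key methods are:

    public PlainPermissionLoader()
    public void validate(PlainAccessResource plainAccessResource)

PlainPermissionLoader constructor

    public PlainPermissionLoader() {
        load();
        watch();
    }

The constructor calls load() to parse the configuration file and watch() to monitor it for changes.

load

    Map<String, PlainAccessResource> plainAccessResourceMap = new HashMap<>();
    List<RemoteAddressStrategy> globalWhiteRemoteAddressStrategy = new ArrayList<>();
    String path = fileHome + File.separator + fileName;
    // ... the yml file at path is read into the JSONObject plainAclConfData

Step 1: initialize plainAccessResourceMap, which holds the user-configured access resources (the permission container). Also initialize globalWhiteRemoteAddressStrategy, which holds the global IP whitelist strategies. The configuration file is located at fileHome + File.separator + fileName.

    JSONArray globalWhiteRemoteAddressesList = plainAclConfData.getJSONArray("globalWhiteRemoteAddresses");
    if (globalWhiteRemoteAddressesList != null && !globalWhiteRemoteAddressesList.isEmpty()) {
        for (int i = 0; i < globalWhiteRemoteAddressesList.size(); i++) {
            globalWhiteRemoteAddressStrategy.add(remoteAddressStrategyFactory.
                getRemoteAddressStrategy(globalWhiteRemoteAddressesList.getString(i)));
        }
    }

Step 2: read globalWhiteRemoteAddresses (the global whitelist rules) from the configuration file. For each rule, obtain an access strategy from remoteAddressStrategyFactory. The rule syntax is described later.

    JSONArray accounts = plainAclConfData.getJSONArray("accounts");
    if (accounts != null && !accounts.isEmpty()) {
        List<PlainAccessConfig> plainAccessConfigList = accounts.toJavaList(PlainAccessConfig.class);
        for (PlainAccessConfig plainAccessConfig : plainAccessConfigList) {
            PlainAccessResource plainAccessResource = buildPlainAccessResource(plainAccessConfig);
            plainAccessResourceMap.put(plainAccessResource.getAccessKey(), plainAccessResource);
        }
    }
    this.globalWhiteRemoteAddressStrategy = globalWhiteRemoteAddressStrategy;
    this.plainAccessResourceMap = plainAccessResourceMap;

Step 3: parse accounts, the user-defined permission entries. Each entry is converted into a PlainAccessConfig and then into a PlainAccessResource keyed by accessKey. For the meaning of each tag, see "RocketMQ ACL User Guide". The parsing itself is straightforward.

watch

    private void watch() {
        try {
            String watchFilePath = fileHome + fileName;
            FileWatchService fileWatchService = new FileWatchService(new String[] {watchFilePath}, new FileWatchService.Listener() {
                @Override
                public void onChanged(String path) {
                    log.info("The plain acl yml changed, reload the context");
                    load();
                }
            });
            fileWatchService.start();
            log.info("Succeed to start AclWatcherService");
            this.isWatchStart = true;
        } catch (Exception e) {
            // the startup failure is logged
        }
    }

FileWatchService checks the file every 500 ms and detects changes by comparing MD5 digests of its content:

    private String hash(String filePath) throws IOException, NoSuchAlgorithmException {
        Path path = Paths.get(filePath);
        md.update(Files.readAllBytes(path));
        byte[] hash = md.digest();
        return UtilAll.bytes2string(hash);
    }

Change detection here compares MD5 digests. Why not simply record the file's last-modified time at startup and compare against that?

validate

    public void validate(PlainAccessResource plainAccessResource) {
        // Step 1: check the global white remote addr
        for (RemoteAddressStrategy remoteAddressStrategy : globalWhiteRemoteAddressStrategy) {
            if (remoteAddressStrategy.match(plainAccessResource)) {
                return;
            }
        }

        // Step 2
        if (plainAccessResource.getAccessKey() == null) {
            throw new AclException(String.format("No accessKey is configured"));
        }
        if (!plainAccessResourceMap.containsKey(plainAccessResource.getAccessKey())) {
            throw new AclException(String.format("No acl config for %s", plainAccessResource.getAccessKey()));
        }

        // Step 3: check the white addr for accesskey
        PlainAccessResource ownedAccess = plainAccessResourceMap.get(plainAccessResource.getAccessKey());
        if (ownedAccess.getRemoteAddressStrategy().match(plainAccessResource)) {
            return;
        }

        // Step 4: check the signature
        String signature = AclUtils.calSignature(plainAccessResource.getContent(), ownedAccess.getSecretKey());
        if (!signature.equals(plainAccessResource.getSignature())) {
            throw new AclException(String.format("Check signature failed for accessKey=%s", plainAccessResource.getAccessKey()));
        }

        // Step 5: check perm of each resource
        checkPerm(plainAccessResource, ownedAccess);
    }

Step 1: if the client IP matches a global whitelist rule, access is granted immediately.
Step 2: if the request carries no AccessKey, throw an exception. If the Broker has no ACL configuration for this AccessKey, throw an AclException.
Step 3: if the client IP matches the whitelist configured for this accessKey, access is granted.
Step 4: recompute the signature from the request content with the user's secretKey and compare it with the signature sent by the client. A mismatch throws an exception.
Step 5: check the permission for each resource (checkPerm).

checkPerm

    void checkPerm(PlainAccessResource needCheckedAccess, PlainAccessResource ownedAccess) {
        if (Permission.needAdminPerm(needCheckedAccess.getRequestCode()) && !ownedAccess.isAdmin()) {   // Step 6
            throw new AclException(String.format("Need admin permission for request=%d, but accessKey=%s is not",
                needCheckedAccess.getRequestCode(), ownedAccess.getAccessKey()));
        }
        Map<String, Byte> needCheckedPermMap = needCheckedAccess.getResourcePermMap();
        Map<String, Byte> ownedPermMap = ownedAccess.getResourcePermMap();

        if (needCheckedPermMap == null) {
            // If the needCheckedPermMap is null, then return
            return;
        }

        if (ownedPermMap == null && ownedAccess.isAdmin()) {
            // If the ownedPermMap is null and it is an admin user, then return
            return;
        }

        for (Map.Entry<String, Byte> needCheckedEntry : needCheckedPermMap.entrySet()) {
            String resource = needCheckedEntry.getKey();
            Byte neededPerm = needCheckedEntry.getValue();
            boolean isGroup = PlainAccessResource.isRetryTopic(resource);

            if (ownedPermMap == null || !ownedPermMap.containsKey(resource)) {
                // Check the default perm
                byte ownedPerm = isGroup ? ownedAccess.getDefaultGroupPerm() : ownedAccess.getDefaultTopicPerm();
                if (!Permission.checkPermission(neededPerm, ownedPerm)) {
                    throw new AclException(String.format("No default permission for %s", PlainAccessResource.printStr(resource, isGroup)));
                }
                continue;
            }
            if (!Permission.checkPermission(neededPerm, ownedPermMap.get(resource))) {
                throw new AclException(String.format("No default permission for %s", PlainAccessResource.printStr(resource, isGroup)));
            }
        }
    }

Step 6: if the requested operation requires admin permission and the current user is not an administrator, throw an exception. Some commands can only be executed by a user with the admin role.
Each resource the request needs is then checked against the user's configured permission. If the user has no explicit entry for a resource, the default topic or group permission applies. If a check fails, an AclException is thrown.

That is everything the Broker has to do for ACL. Next, let's see what the client needs to handle. To use ACL, the client registers an AclClientRPCHook when it creates the DefaultMQProducer. The hook runs before each request is sent to the server. AclClientRPCHook:

    public void doBeforeRequest(String remoteAddr, RemotingCommand request) {
        byte[] total = AclUtils.combineRequestContent(request,
            parseRequestContent(request, sessionCredentials.getAccessKey(), sessionCredentials.getSecurityToken()));   // @1
        String signature = AclUtils.calSignature(total, sessionCredentials.getSecretKey());   // @2
        request.addExtField(SIGNATURE, signature);   // @3
        request.addExtField(ACCESS_KEY, sessionCredentials.getAccessKey());

        // The SecurityToken value is unnecessary, user can choose this one.
        if (sessionCredentials.getSecurityToken() != null) {
            request.addExtField(SECURITY_TOKEN, sessionCredentials.getSecurityToken());
        }
    }

Code @1: sort the request parameters and add the accessKey (and the security token, if any).
Code @2: sign the sorted request parameters with the secret key the user configured. The server recomputes the signature and compares it with the Signature extension field; if they match, signature verification passes.
Code @3: add Signature and AccessKey (and SecurityToken, if set) to the request's extension fields.

That concludes the ACL source walkthrough.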
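Every per-resource check in checkPerm ends in Permission.checkPermission(neededPerm, ownedPerm), which is a bitmask test. The sketch below reproduces it under stated assumptions. The flag values are DENY=1, ANY=2, PUB=4 and SUB=8. An explicit DENY always wins, and ANY is satisfied by either PUB or SUB. These rules reflect our reading of RocketMQ's Permission class, so treat them as assumptions. The parse helper and its accepted spellings are hypothetical.

```java
/** Hypothetical sketch of ACL permission bitmask checks; flag values are assumed. */
final class PermissionSketch {
    static final byte DENY = 1;
    static final byte ANY = 1 << 1;
    static final byte PUB = 1 << 2;
    static final byte SUB = 1 << 3;

    private PermissionSketch() {
    }

    /** Returns true if ownedPerm grants neededPerm. */
    static boolean checkPermission(byte neededPerm, byte ownedPerm) {
        if ((ownedPerm & DENY) != 0) {
            return false;                       // an explicit DENY always wins
        }
        if ((neededPerm & ANY) != 0) {
            return (ownedPerm & PUB) != 0 || (ownedPerm & SUB) != 0;
        }
        return (neededPerm & ownedPerm) != 0;   // the needed bit must be present
    }

    /** Maps yml-style strings to bits; anything unrecognized falls back to DENY. */
    static byte parse(String perm) {
        switch (perm.trim()) {
            case "PUB":
                return PUB;
            case "SUB":
                return SUB;
            case "PUB|SUB":
            case "SUB|PUB":
                return (byte) (PUB | SUB);
            default:
                return DENY;
        }
    }
}
```

Falling back to DENY for anything unrecognized mirrors the document's defaults (defaultTopicPerm and defaultGroupPerm are both 1, i.e. DENY). A resource missing from a user's map is therefore refused unless the defaults are relaxed.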
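2.3 Source code analysis: RocketMQ message trace

Following the approach of "RocketMQ Message Trace: Design", this article analyzes the message trace source code from three angles.

    public class TraceProducer {
        public static void main(String[] args) throws MQClientException, InterruptedException {
            DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName", true);   // @1
            producer.start();
            try {
                Message msg = new Message("TopicTest", "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                SendResult sendResult = producer.send(msg);
                System.out.printf("%s%n", sendResult);
            } catch (Exception e) {
                e.printStackTrace();
            }
            producer.shutdown();
        }
    }

As the code shows, the key point is to enable message tracing when creating the DefaultMQProducer. DefaultMQProducer offers these constructors for that purpose:

    public DefaultMQProducer(final String producerGroup, boolean enableMsgTrace)
    public DefaultMQProducer(final String producerGroup, boolean enableMsgTrace, final String customizedTraceTopic)
    public DefaultMQProducer(final String producerGroup, RPCHook rpcHook, boolean enableMsgTrace, final String customizedTraceTopic) {   // @1
        this.producerGroup = producerGroup;
        defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook);
        // if client open the message trace feature
        if (enableMsgTrace) {
            try {
                AsyncTraceDispatcher dispatcher = new AsyncTraceDispatcher(customizedTraceTopic, rpcHook);   // @2
                traceDispatcher = dispatcher;
                this.getDefaultMQProducerImpl().registerSendMessageHook(
                    new SendMessageTraceHookImpl(traceDispatcher));   // @3
            } catch (Throwable e) {
                log.error("system mqtrace hook init failed ,maybe can't send msg trace data");
            }
        }
    }

Code @1: the constructor parameters are:
String producerGroup: the producer group.
RPCHook rpcHook: a hook run around each request, for example for ACL.
boolean enableMsgTrace: whether to enable message trace.
String customizedTraceTopic: a custom topic for storing trace data.
Code @2: build an AsyncTraceDispatcher. As the name says, it forwards trace data asynchronously; more on it shortly.
Code @3: register a SendMessageTraceHookImpl hook, built on the AsyncTraceDispatcher, with DefaultMQProducerImpl.

[Figure: class diagram of the message trace components]

TraceDispatcher is the message trace dispatcher; its default implementation is AsyncTraceDispatcher. Its fields:
int queueSize: queue length for asynchronous forwarding, 2048 by default.
int batchSize: the number of trace entries sent in one batch.
int maxMsgSize: the maximum size of a trace message, 128K by default.
DefaultMQProducer traceProducer: the producer that sends the trace data.
ThreadPoolExecutor traceExecuter: the thread pool that sends the trace data.
AtomicLong discardCount: the number of discarded trace entries.
Thread worker: the worker thread.
ArrayBlockingQueue<TraceContext> traceContextQueue: trace contexts waiting to be sent.
ArrayBlockingQueue<Runnable> appenderQueue: the thread pool's work queue.

Next, the send hook SendMessageTraceHookImpl:

    public void sendMessageBefore(SendMessageContext context) {
        // if it is message trace data, then it doesn't recorded
        if (context == null || context.getMessage().getTopic().startsWith(((AsyncTraceDispatcher) localDispatcher).getTraceTopicName())) {   // @1
            return;
        }
        // build the context content of TuxeTraceContext
        TraceContext tuxeContext = new TraceContext();   // @2
        // ... set the trace type (TraceType.Pub) and the producer group
        // build the data bean object of message trace
        TraceBean traceBean = new TraceBean();   // @3
        // ... fill in topic, tags, keys, broker address and so on
    }

Code @1: if the message's topic is the trace topic itself, return immediately; trace messages are not traced.
Code @2: create the TraceContext and set the trace type (TraceType.Pub) and the producer group.
Code @3: build a TraceBean holding the message's topic, tags, keys, broker address and other fields.

    public void sendMessageAfter(SendMessageContext context) {
        // if it is message trace data, then it doesn't recorded
        if (context == null || context.getMessage().getTopic().startsWith(((AsyncTraceDispatcher) localDispatcher).getTraceTopicName())   // @1
            || context.getMqTraceContext() == null) {
            return;
        }
        if (context.getSendResult() == null) {
            return;
        }
        if (context.getSendResult().getRegionId() == null
            || !context.getSendResult().isTraceOn()) {
            return;
        }
        TraceContext tuxeContext = (TraceContext) context.getMqTraceContext();
        TraceBean traceBean = tuxeContext.getTraceBeans().get(0);   // @2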
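        int costTime = (int) ((System.currentTimeMillis() - tuxeContext.getTimeStamp()) / tuxeContext.getTraceBeans().size());   // @3
        if (context.getSendResult().getSendStatus().equals(SendStatus.SEND_OK)) {
            // ... success = true
        } else {
            // ... success = false
        }
        // @4: set costTime, success, regionId, msgId and offsetMsgId, then estimate storeTime
        traceBean.setStoreTime(tuxeContext.getTimeStamp() + costTime / 2);
        localDispatcher.append(tuxeContext);   // @5
    }

Code @1: skip trace-topic messages, and requests that carry no trace context.
Code @3: compute how long the send took.
Code @4: set costTime (elapsed time), success (whether the send succeeded), regionId (the region of the broker the message was sent to), msgId (the globally unique message ID) and offsetMsgId (the message's physical offset). storeTime is set to the client send timestamp plus half of the elapsed time. This value is only an estimate of the time the message was stored.
Code @5: forward the trace information to the Broker through the TraceDispatcher:

    public boolean append(final Object ctx) {
        boolean result = traceContextQueue.offer((TraceContext) ctx);
        if (!result) {
            log.info("buffer full" + discardCount.incrementAndGet() + " ,context is " + ctx);
        }
        return result;
    }

append uses the non-blocking offer method. When the queue is full, the trace entry is dropped and discardCount is incremented.

Next we turn to TraceDispatcher, which forwards the client's trace data to the Broker. Its default implementation is AsyncTraceDispatcher:

    public AsyncTraceDispatcher(String traceTopicName, RPCHook rpcHook) throws MQClientException {
        // queueSize is greater than or equal to the n power of 2 of value
        this.queueSize = 2048;   // @1
        this.batchSize = 100;
        this.maxMsgSize = 128000;
        this.discardCount = new AtomicLong(0L);
        this.traceContextQueue = new ArrayBlockingQueue<TraceContext>(1024);
        this.appenderQueue = new ArrayBlockingQueue<Runnable>(queueSize);
        if (!UtilAll.isBlank(traceTopicName)) {
            this.traceTopicName = traceTopicName;
        } else {
            this.traceTopicName = MixAll.RMQ_SYS_TRACE_TOPIC;
        }
        this.traceExecuter = new ThreadPoolExecutor(//
            10, //
            20, //
            1000 * 60, //
            TimeUnit.MILLISECONDS, //
            this.appenderQueue, //
            new ThreadFactoryImpl("MQTraceSendThread_"));
        traceProducer = getAndCreateTraceProducer(rpcHook);   // @2
    }

Code @1: initialize the core properties.
queueSize: the queue length, 2048 by default.
batchSize: the number of trace entries submitted to the Broker in one batch, 100 by default.
maxMsgSize: the maximum size of a trace message sent to the Broker, 128k by default.
traceContextQueue: buffers trace contexts waiting to be sent, with capacity 1024.
traceTopicName: the topic that receives the trace data, RMQ_SYS_TRACE_TOPIC by default.
traceExecuter: the thread pool that sends trace data to the Broker. It has 10 core threads and at most 20, a work queue of 2048, and the thread name prefix MQTraceSendThread_.
Code @2: call getAndCreateTraceProducer to create the producer that sends the trace messages.

getAndCreateTraceProducer in detail

    private DefaultMQProducer getAndCreateTraceProducer(RPCHook rpcHook) {
        DefaultMQProducer traceProducerInstance = this.traceProducer;
        if (traceProducerInstance == null) {   // @1
            traceProducerInstance = new DefaultMQProducer(rpcHook);
            // ... producer group _INNER_TRACE_PRODUCER, send timeout 5 s
            // The max size of message is 128K
            traceProducerInstance.setMaxMessageSize(maxMsgSize - 10 * 1000);
        }
        return traceProducerInstance;
    }

Code @1: if no trace producer exists yet, create one for sending trace messages. Its group name is _INNER_TRACE_PRODUCER, its send timeout is 5 s, and its maximum message size is 118K (128,000 - 10,000 bytes).

    public void start(String nameSrvAddr) throws MQClientException {
        if (isStarted.compareAndSet(false, true)) {   // @1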
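            traceProducer.setNamesrvAddr(nameSrvAddr);
            traceProducer.setInstanceName(TRACE_INSTANCE_NAME + "_" + nameSrvAddr);
            traceProducer.start();
        }
        this.worker = new Thread(new AsyncRunnable(), "MQ-AsyncTraceDispatcher-Thread-" + dispatcherId);   // @2
        this.worker.start();
    }

start is called when the DefaultMQProducer starts, provided message tracing is enabled.
Code @1: if the trace producer has not been started yet, set its name server address and start it.
Code @2: start a thread that runs AsyncRunnable.

AsyncRunnable

    class AsyncRunnable implements Runnable {
        private boolean stopped;

        @Override
        public void run() {
            while (!stopped) {
                List<TraceContext> contexts = new ArrayList<TraceContext>(batchSize);   // @1
                for (int i = 0; i < batchSize; i++) {
                    TraceContext context = null;
                    try {
                        // get trace data element from the blocking queue traceContextQueue
                        context = traceContextQueue.poll(5, TimeUnit.MILLISECONDS);   // @2
                    } catch (InterruptedException e) {
                    }
                    if (context != null) {
                        contexts.add(context);
                    } else {
                        break;
                    }
                }
                if (contexts.size() > 0) {
                    AsyncAppenderRequest request = new AsyncAppenderRequest(contexts);   // @3
                    traceExecuter.submit(request);
                } else if (AsyncTraceDispatcher.this.stopped) {
                    this.stopped = true;
                }
            }
        }
    }

Code @1: build the batch of trace entries to submit; each batch holds at most batchSize entries, 100 by default.
Code @2: take one pending TraceContext from traceContextQueue, waiting at most 5 ms if the queue is empty.
Code @3: if any contexts were collected, wrap them in an AsyncAppenderRequest and submit it to the thread pool.

    public void sendTraceData(List<TraceContext> contextList) {
        Map<String, List<TraceTransferBean>> transBeanMap = new HashMap<String, List<TraceTransferBean>>();
        for (TraceContext context : contextList) {
            if (context.getTraceBeans().isEmpty()) {   // @1
                continue;
            }
            // Topic value corresponding to original message entity content
            String topic = context.getTraceBeans().get(0).getTopic();   // @2
            // Use original message entity's topic as key
            String key = topic;
            List<TraceTransferBean> transBeanList = transBeanMap.get(key);
            if (transBeanList == null) {
                transBeanList = new ArrayList<TraceTransferBean>();
                transBeanMap.put(key, transBeanList);
            }
            TraceTransferBean traceData = TraceDataEncoder.encoderFromContextBean(context);   // @3
            transBeanList.add(traceData);
        }
        for (Map.Entry<String, List<TraceTransferBean>> entry : transBeanMap.entrySet()) {   // @4
            // ... send each topic's batch to the Broker
        }
    }

Code @2: group the entries by the topic of the original message.
Code @3: encode each TraceContext into a TraceTransferBean.
Code @4: send the encoded data to the Broker.

The encoder handles three trace types:

    switch (ctx.getTraceType()) {
        case Pub: {
            TraceBean bean = ctx.getTraceBeans().get(0);
            // append the content of context and traceBean to transferBean's TransData
            ...
            break;
        }
        case SubBefore: {
            for (TraceBean bean : ctx.getTraceBeans()) {
                ...
            }
            break;
        }
        case SubAfter: {
            for (TraceBean bean : ctx.getTraceBeans()) {
                ...
            }
            break;
        }
        default:
    }

Consumer-side tracing works the same way: dedicated hooks run before and after message consumption, implemented by ConsumeMessageTraceHookImpl. It is not covered in detail here.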
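append(), AsyncRunnable and sendTraceData together form a simple pipeline. A bounded queue drops entries on overflow. A drain step collects up to batchSize entries with a short poll timeout (5 ms here). Finally, each batch is grouped by the original message topic. The standalone sketch below shows only that shape. It is not AsyncTraceDispatcher: trace entries are reduced to hypothetical (topic, data) pairs, and the class names are illustrative.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical sketch of asynchronous trace batching; not the RocketMQ classes. */
final class TraceBatchSketch {
    /** Simplified trace record: the original message topic plus encoded data. */
    static final class Trace {
        final String topic;
        final String data;

        Trace(String topic, String data) {
            this.topic = topic;
            this.data = data;
        }
    }

    private final ArrayBlockingQueue<Trace> queue;
    private final int batchSize;
    final AtomicLong discardCount = new AtomicLong();

    TraceBatchSketch(int capacity, int batchSize) {
        this.queue = new ArrayBlockingQueue<>(capacity);
        this.batchSize = batchSize;
    }

    /** Like append(): non-blocking offer; when the queue is full, count the drop. */
    boolean append(Trace t) {
        boolean ok = queue.offer(t);
        if (!ok) {
            discardCount.incrementAndGet();
        }
        return ok;
    }

    /** Like one AsyncRunnable pass: take up to batchSize items, stop at the first empty poll. */
    List<Trace> drainBatch() {
        List<Trace> batch = new ArrayList<>(batchSize);
        for (int i = 0; i < batchSize; i++) {
            Trace t;
            try {
                t = queue.poll(5, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
            if (t == null) {
                break;
            }
            batch.add(t);
        }
        return batch;
    }

    /** Like sendTraceData(): group encoded entries by the original message topic. */
    static Map<String, List<String>> groupByTopic(List<Trace> batch) {
        Map<String, List<String>> byTopic = new LinkedHashMap<>();
        for (Trace t : batch) {
            byTopic.computeIfAbsent(t.topic, k -> new ArrayList<>()).add(t.data);
        }
        return byTopic;
    }
}
```

Dropping on overflow instead of blocking keeps tracing off the send path's critical latency. The trade-off is that traces can be lost under heavy load, which is exactly what discardCount records.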
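How should the trace topic be named, and how should its routing information be distributed? Should it be created on every Broker?

RocketMQ's default message trace topic is RMQ_SYS_TRACE_TOPIC. Does this topic have to be created by hand?

    if (this.brokerController.getBrokerConfig().isTraceTopicEnable()) {   // @1
        String topic = this.brokerController.getBrokerConfig().getMsgTraceTopicName();
        TopicConfig topicConfig = new TopicConfig(topic);
        // ... the read and write queue numbers are set to 1
        this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
    }

The code above comes from the constructor of TopicConfigManager, which the Broker creates when it starts.
Code @1: if the Broker enables message tracing (traceTopicEnable=true), it automatically creates the trace topic with 1 queue.

Clients can also use their own trace topic through the customizedTraceTopic constructor parameter:

    public DefaultMQProducer(final String producerGroup, RPCHook rpcHook, boolean enableMsgTrace, final String customizedTraceTopic)
    public DefaultMQPushConsumer(final String consumerGroup, RPCHook rpcHook, AllocateMessageQueueStrategy allocateMessageQueueStrategy, boolean enableMsgTrace, final String customizedTraceTopic)

That covers the implementation principles of RocketMQ message trace.

2.4 RocketMQ multi-replica prelude: a first look at the Raft protocol

1. Leader election
In Raft, a node is in one of three states (roles): Follower, Candidate or Leader. Each node's election timer is set to a random value between 150 ms and 300 ms. When the timer expires, the node changes from Follower to Candidate.

Normally, one of the three nodes' timers expires first, and that node becomes a Candidate.

A Candidate starts a round of voting. Since this is the first round, it sets the term of this round to 1 and first votes for itself. In the figure, NodeA has Term 1 and VoteCount 1.

Once NodeA has a majority of the votes, it becomes the Leader. It then periodically sends heartbeats to the other nodes in the cluster.

If a Follower receives no heartbeat from the Leader before its timer expires, it changes from Follower to Candidate and requests votes from the other nodes, as shown in the figure. For example, suppose NodeB times out first. It sets its term to 2, votes for itself, and then asks the others for their votes. Once NodeC agrees, NodeB becomes the Leader.

Two nodes may also time out at the same moment. Or one node may become a Candidate before it has received the other's vote request. Either way, more than one node ends up a Candidate. How is a leader elected then?

Suppose nodes C and D both become Candidates in term 4. When C and D receive each other's vote requests, both refuse, because each has already voted for itself in this round. As the figure shows, C and D end up with 2 votes each, which is not a majority, so no Leader is elected in this round.

Meanwhile, the timers of A, B, C and D keep counting down. When a node becomes a Candidate, or is already a Candidate and its timer expires, it starts a new round of voting. This repeats until one node obtains a majority and becomes the Leader.

This walk-through of Raft leader election offers some guidance for reading the DLedger (RocketMQ multi-replica) module. The key points:
A Raft node has three states: Follower, Candidate and Leader.
Voting is triggered by a timer. Follower and Candidate nodes each run one with a random timeout of 150 ms to 300 ms; when it expires, a Follower becomes a Candidate.
A Candidate increments its term each time it starts a round of voting.
A node in term 3 that has already voted for node B in term 3 votes against any other request for term 3. A request for term 4 starts a new round.

2. Log replication
In the log replication example, a client sends set 5 to the Leader. The Leader replicates the entry to the Followers and waits for their ACKs.

Questions to keep in mind: if a Follower crashes while the Leader is broadcasting logs to it, how is that handled? At which point is a log entry committed? We will return to these questions in the RocketMQ multi-replica series, after analyzing DLedger's implementation.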
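The election rules above can be condensed into a toy model: randomized timeouts plus a strict-majority vote count. This is a sketch of textbook Raft behaviour, not DLedger code. The 150-300 ms range comes from the text; the class and helper names are hypothetical. The model shows why one node usually times out first, and why the 2-2 split in the four-node example fails to elect anyone.

```java
import java.util.Random;

/** Toy sketch of Raft election timing and quorum; not DLedger code. */
final class RaftElectionSketch {
    static final int MIN_TIMEOUT_MS = 150;
    static final int MAX_TIMEOUT_MS = 300;

    private RaftElectionSketch() {
    }

    /** Random election timeout in [150, 300) ms. */
    static int electionTimeout(Random random) {
        return MIN_TIMEOUT_MS + random.nextInt(MAX_TIMEOUT_MS - MIN_TIMEOUT_MS);
    }

    /** A candidate wins only with a strict majority of the cluster. */
    static boolean isQuorum(int votes, int clusterSize) {
        return votes >= clusterSize / 2 + 1;
    }

    /** Index of the node whose timer fires first; ties go to the lowest index. */
    static int firstCandidate(int[] timeouts) {
        int best = 0;
        for (int i = 1; i < timeouts.length; i++) {
            if (timeouts[i] < timeouts[best]) {
                best = i;
            }
        }
        return best;
    }
}
```

With four nodes, isQuorum(2, 4) is false, so both candidates in a 2-2 split keep waiting. Their independently randomized timers then make it likely that one of them starts the next, higher-term round alone.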
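2.5 Source code analysis: RocketMQ multi-replica Leader election

Keywords: RocketMQ, multi-replica, DLedger, leader election.

This article follows the approach of "RocketMQ multi-replica prelude: a first look at the Raft protocol" to study RocketMQ's Leader election. Keep in mind the key points of Raft leader election summarized at the end of that article.

1. DLedger's Raft protocol
DLedger defines its Raft-related RPC interfaces as follows. The client-facing protocol (DLedgerClientProtocol):

    CompletableFuture<GetEntriesResponse> get(GetEntriesRequest request)
    CompletableFuture<AppendEntryResponse> append(AppendEntryRequest request)
    CompletableFuture<MetadataResponse> metadata(MetadataRequest request)

The protocol between cluster nodes (DLedgerProtocol):

    CompletableFuture<VoteResponse> vote(VoteRequest request)
    CompletableFuture<HeartBeatResponse> heartBeat(HeartBeatRequest request)
    CompletableFuture<PullEntriesResponse> pull(PullEntriesRequest request)
    CompletableFuture<PushEntryResponse> push(PushEntryRequest request)

The request handlers are DLedgerClientProtocolHandler and DLedgerProtocolHandler, and RPC is implemented on top of Netty. DLedgerLeaderElector implements Leader election, and DLedgerServer is the DLedger server itself.

Next we start from DLedgerLeaderElector to see how DLedger implements (Raft) Leader election.

[Figure: DLedgerLeaderElector class diagram]

Random random: random number generator used for the randomized vote timers.
DLedgerConfig dLedgerConfig: the configuration.
MemberState memberState: the node's state machine.
DLedgerRpcService dLedgerRpcService: the RPC service used to send vote requests to other nodes and to receive them.
long lastLeaderHeartBeatTime: when the last heartbeat from the Leader was received.
long lastSendHeartBeatTime: when the last heartbeat was sent.
long lastSuccHeartBeatTime: when the last successful heartbeat round completed.
int heartBeatTimeIntervalMs: the heartbeat interval.
int maxHeartBeatLeak: the maximum number of heartbeat periods that may pass without a heartbeat. A Follower becomes a Candidate only after maxHeartBeatLeak * heartBeatTimeIntervalMs without one.
long nextTimeToRequestVote: when the next vote request is sent.
boolean needIncreaseTermImmediately: whether to increase the term immediately.
int minVoteIntervalMs: the minimum interval between vote requests, 300 ms by default.
int maxVoteIntervalMs: the maximum interval between vote requests, 1000 ms by default.
List<RoleChangeHandler> roleChangeHandlers: the registered role-change handlers, added with addRoleChangeHandler.
long lastVoteCost: how long the last vote took.
StateMaintainer stateMaintainer: the state maintainer.

DLedgerLeaderElector is started through its startup method:

    public void startup() {
        stateMaintainer.start();   // @1
        for (RoleChangeHandler roleChangeHandler : roleChangeHandlers) {   // @2
            roleChangeHandler.startup();
        }
    }

Code @1: start the state maintainer (StateMaintainer).
Code @2: iterate over the role-change listeners and start each one. Listeners are added with DLedgerLeaderElector's addRoleChangeHandler method.

The state maintainer's run method:

    public void run() {
        while (running.get()) {
            try {
                doWork();
            } catch (Throwable t) {
                if (logger != null) {
                    logger.error("Unexpected Error in running {} ", getName(), t);
                }
            }
        }
    }

It simply calls doWork in a loop, so let's look at doWork:

    public void doWork() {
        try {
            if (DLedgerLeaderElector.this.dLedgerConfig.isEnableLeaderElector()) {   // @1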
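                DLedgerLeaderElector.this.refreshIntervals(dLedgerConfig);   // @2
                DLedgerLeaderElector.this.maintainState();   // @3
            }
            sleep(10);   // @4
        } catch (Throwable t) {
            DLedgerLeaderElector.logger.error("Error in heartbeat", t);
        }
    }

Code @1: run the election logic only if the Leader elector is enabled.
Code @2: refresh the heartbeat and vote interval parameters from the configuration.
Code @3: drive the state machine according to the node's current role.
Code @4: after each pass, sleep for 10 ms.

    private void maintainState() throws Exception {
        if (memberState.isLeader()) {
            maintainAsLeader();
        } else if (memberState.isFollower()) {
            maintainAsFollower();
        } else {
            maintainAsCandidate();
        }
    }

Before going further, we need to know memberState's initial value. Tracing back to where MemberState is created, its initial role is CANDIDATE. In Raft a node normally starts as a follower, but DLedger's implementation starts from candidate. All nodes in the cluster therefore try to start a vote from the very first round. Let's focus on maintainAsCandidate.

maintainAsCandidate

    private void maintainAsCandidate() throws Exception {
        // for candidate
        if (System.currentTimeMillis() < nextTimeToRequestVote && !needIncreaseTermImmediately) {
            return;
        }
        long term;
        long ledgerEndTerm;
        long ledgerEndIndex;

Step 1: return if it is not yet time for the next vote request (nextTimeToRequestVote) and needIncreaseTermImmediately is false.

        synchronized (memberState) {
            if (!memberState.isCandidate()) {
                return;
            }
            if (lastParseResult == VoteResponse.ParseResult.WAIT_TO_VOTE_NEXT || needIncreaseTermImmediately) {
                long prevTerm = memberState.currTerm();
                term = memberState.nextTerm();
                logger.info("{}_[INCREASE_TERM] from {} to {}", memberState.getSelfId(), prevTerm, term);
                lastParseResult = VoteResponse.ParseResult.WAIT_TO_REVOTE;
            } else {
                term = memberState.currTerm();
            }
            ledgerEndIndex = memberState.getLedgerEndIndex();
            ledgerEndTerm = memberState.getLedgerEndTerm();
        }

Step 2: initialize term, ledgerEndIndex and ledgerEndTerm. If the previous vote result was not WAIT_TO_VOTE_NEXT and needIncreaseTermImmediately is false, the term stays at the state machine's current term. Otherwise the term is incremented with memberState.nextTerm().

        if (needIncreaseTermImmediately) {
            nextTimeToRequestVote = getNextTimeToRequestVote();
            needIncreaseTermImmediately = false;
            return;
        }

Step 3: if needIncreaseTermImmediately is true, reset it to false, recompute the next vote time, and return. The next vote time is computed as follows:

    private long getNextTimeToRequestVote() {
        return System.currentTimeMillis() + lastVoteCost + minVoteIntervalMs + random.nextInt(maxVoteIntervalMs - minVoteIntervalMs);
    }

That is, current time + last vote cost + the minimum vote interval (300 ms) + a random value in [0, 1000 - 300).

        long startVoteTimeMs = System.currentTimeMillis();
        final List<CompletableFuture<VoteResponse>> quorumVoteResponses = voteForQuorumResponses(term, ledgerEndTerm, ledgerEndIndex);

Step 4: send vote requests to all nodes in the cluster.

        final AtomicLong knownMaxTermInGroup = new AtomicLong(-1);
        final AtomicInteger allNum = new AtomicInteger(0);
        final AtomicInteger validNum = new AtomicInteger(0);
        final AtomicInteger acceptedNum = new AtomicInteger(0);
        final AtomicInteger notReadyTermNum = new AtomicInteger(0);
        final AtomicInteger biggerLedgerNum = new AtomicInteger(0);
        final AtomicBoolean alreadyHasLeader = new AtomicBoolean(false);

Step 5: initialize the counters.
knownMaxTermInGroup: the largest term known in the cluster.
allNum: the total number of responses.
validNum: the number of valid votes.
acceptedNum: the number of votes in favor.
notReadyTermNum: the number of peers that were not ready; such a peer enters the Candidate state using this round's term.
biggerLedgerNum: the number of peers whose ledgerEndTerm is larger than the voting node's.
alreadyHasLeader: whether a Leader already exists.

        CountDownLatch voteLatch = new CountDownLatch(1);
        for (CompletableFuture<VoteResponse> future : quorumVoteResponses) {
            future.whenComplete((VoteResponse x, Throwable ex) -> {
                // ... error handling and latch bookkeeping omitted
                if (x.getVoteResult() != VoteResponse.RESULT.UNKNOWN) {   // Step 6
                    validNum.incrementAndGet();
                }
                synchronized (knownMaxTermInGroup) {   // Step 7
                    switch (x.getVoteResult()) {
                        case ACCEPT:
                            acceptedNum.incrementAndGet();
                            break;
                        case REJECT_ALREADY_VOTED:
                            break;
                        case REJECT_ALREADY__HAS_LEADER:
                            alreadyHasLeader.compareAndSet(false, true);
                            break;
                        case REJECT_TERM_SMALL_THAN_LEDGER:
                        case REJECT_EXPIRED_VOTE_TERM:
                            if (x.getTerm() > knownMaxTermInGroup.get()) {
                                knownMaxTermInGroup.set(x.getTerm());
                            }
                            break;
                        case REJECT_EXPIRED_LEDGER_TERM:
                        case REJECT_SMALL_LEDGER_END_INDEX:
                            biggerLedgerNum.incrementAndGet();
                            break;
                        case REJECT_TERM_NOT_READY:
                            notReadyTermNum.incrementAndGet();
                            break;
                        default:
                            break;
                    }
                }
            });
        }

Step 6: if the vote result is not UNKNOWN, increment the number of valid votes.
Step 7: tally the votes:
ACCEPT: a vote in favor; acceptedNum is incremented. Only a node that gets votes in favor from more than half of the cluster can become Leader.
REJECT_ALREADY_VOTED: rejected, because the peer has already voted for another node in this round.
REJECT_ALREADY__HAS_LEADER: rejected because a Leader already exists. alreadyHasLeader is set to true, and no further vote results need to be examined.
REJECT_TERM_SMALL_THAN_LEDGER: rejected, because our term is smaller than the peer's ledgerEndTerm. The peer's term is recorded so that we can catch up.
REJECT_EXPIRED_VOTE_TERM: rejected, because our term is smaller than the peer's term. The largest such term is recorded so that we can update our own term.
REJECT_EXPIRED_LEDGER_TERM: rejected, because our ledgerEndTerm is smaller than the peer's. biggerLedgerNum is incremented.
REJECT_SMALL_LEDGER_END_INDEX: the ledgerEndTerm values are equal, but our ledgerEndIndex is smaller than the peer's. biggerLedgerNum is incremented.
REJECT_TERM_NOT_READY: the peer's term is smaller than ours, so the peer is not ready to vote. It enters the Candidate state using our term, and notReadyTermNum is incremented.

        try {
            voteLatch.await(3000 + random.nextInt(maxVoteIntervalMs), TimeUnit.MILLISECONDS);
        } catch (Throwable ignore) {
        }

Step 8: wait for the votes to arrive, for at most 3000 ms plus a random value below maxVoteIntervalMs.

        lastVoteCost = DLedgerUtils.elapsed(startVoteTimeMs);
        VoteResponse.ParseResult parseResult;
        if (knownMaxTermInGroup.get() > term) {
            parseResult = VoteResponse.ParseResult.WAIT_TO_VOTE_NEXT;
            nextTimeToRequestVote = getNextTimeToRequestVote();
            changeRoleToCandidate(knownMaxTermInGroup.get());
        } else if (alreadyHasLeader.get()) {
            parseResult = VoteResponse.ParseResult.WAIT_TO_VOTE_NEXT;
            nextTimeToRequestVote = getNextTimeToRequestVote() + heartBeatTimeIntervalMs * maxHeartBeatLeak;
        } else if (!memberState.isQuorum(validNum.get())) {
            parseResult = VoteResponse.ParseResult.WAIT_TO_REVOTE;
            nextTimeToRequestVote = getNextTimeToRequestVote();
        } else if (memberState.isQuorum(acceptedNum.get())) {
            parseResult = VoteResponse.ParseResult.PASSED;
        } else if (memberState.isQuorum(acceptedNum.get() + notReadyTermNum.get())) {
            parseResult = VoteResponse.ParseResult.REVOTE_IMMEDIATELY;
        } else if (memberState.isQuorum(acceptedNum.get() + biggerLedgerNum.get())) {
            parseResult = VoteResponse.ParseResult.WAIT_TO_REVOTE;
            nextTimeToRequestVote = getNextTimeToRequestVote();
        } else {
            parseResult = VoteResponse.ParseResult.WAIT_TO_VOTE_NEXT;
            nextTimeToRequestVote = getNextTimeToRequestVote();
        }

Step 9: decide from the collected votes whether this node can become Leader.
Note: before walking through the branches, define "1 regular timer" as current time + last vote cost + the minimum vote interval (300 ms) + a random value in [0, 1000 - 300).
If a peer reports a larger term, this node adopts that term and re-enters the Candidate state. The result is WAIT_TO_VOTE_NEXT, and the vote timer is reset to 1 regular timer.
If a Leader already exists, the node stays a Candidate and the timer is reset to 1 regular timer + heartBeatTimeIntervalMs * maxHeartBeatLeak. Here heartBeatTimeIntervalMs is the heartbeat interval and maxHeartBeatLeak is the number of heartbeats that may be lost. In other words, a Follower that misses that many heartbeats considers the Leader gone.
If the valid votes do not exceed half, the timer is reset to 1 regular timer and the node waits to vote again; the result is WAIT_TO_REVOTE.
If the votes in favor exceed half, the node becomes Leader.
If the votes in favor plus the not-ready peers exceed half, the node should vote again immediately; the result is REVOTE_IMMEDIATELY.
If the votes in favor plus the peers with a larger ledger exceed half, the timer is reset to 1 regular timer and the result is WAIT_TO_REVOTE.
Otherwise the result is WAIT_TO_VOTE_NEXT and the timer is reset to 1 regular timer.

        if (parseResult == VoteResponse.ParseResult.PASSED) {
            logger.info("[{}] [VOTE_RESULT] has been elected to be the leader in term {}", memberState.getSelfId(), term);
            changeRoleToLeader(term);
        }
    }

Step 10: if the vote passes, the state machine switches to Leader. From then on, the state maintainer drives the state machine through DLedgerLeaderElector#maintainState, which calls maintainAsLeader.

maintainAsLeader
A node runs this method after the vote in maintainAsCandidate has elected it Leader. The other nodes are still Candidates at this point and will try to start an election again when their timers expire. Let's look at what a Leader does:

    private void maintainAsLeader() throws Exception {
        if (DLedgerUtils.elapsed(lastSendHeartBeatTime) > heartBeatTimeIntervalMs) {   // @1
            long term;
            String leaderId;
            synchronized (memberState) {
                if (!memberState.isLeader()) {   // @2
                    // stop sending
                    return;
                }
                term = memberState.currTerm();
                leaderId = memberState.getLeaderId();
                lastSendHeartBeatTime = System.currentTimeMillis();   // @3
            }
            sendHeartbeats(term, leaderId);   // @4
        }
    }

Code @1: check whether the time since the last heartbeat exceeds the heartbeat interval; if so, a heartbeat must be sent.
Code @2: if this node is no longer the leader, return and stop sending.
Code @3: record the time of this heartbeat.
Code @4: send heartbeats to the cluster.

maintainAsFollower
When a Candidate receives the Leader's heartbeat, it becomes a follower. What does a follower do?

    private void maintainAsFollower() {
        if (DLedgerUtils.elapsed(lastLeaderHeartBeatTime) > 2 * heartBeatTimeIntervalMs) {
            synchronized (memberState) {
                if (memberState.isFollower() && (DLedgerUtils.elapsed(lastLeaderHeartBeatTime) > maxHeartBeatLeak * heartBeatTimeIntervalMs)) {
                    logger.info("[{}][HeartBeatTimeOut] lastLeaderHeartBeatTime: {} heartBeatTimeIntervalMs: {} lastLeader={}",
                        memberState.getSelfId(), new Timestamp(lastLeaderHeartBeatTime), heartBeatTimeIntervalMs, memberState.getLeaderId());
                    changeRoleToCandidate(memberState.currTerm());
                }
            }
        }
    }

If no heartbeat arrives within maxHeartBeatLeak (3 by default) heartbeat periods, the follower changes its state to Candidate.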
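Steps 8 and 9 of maintainAsCandidate reduce to a pure decision function over the counters collected in Step 5. The sketch below mirrors the order of that if/else chain, with a few simplifications. The enum and method names are hypothetical. isQuorum is assumed to be `num >= peerSize / 2 + 1`. The timer side effects (including the extra heartBeatTimeIntervalMs * maxHeartBeatLeak when a Leader exists) are left out of parse and shown separately as the "regular timer" formula.

```java
import java.util.Random;

/** Hypothetical sketch of DLedger-style vote-result parsing; not the DLedger API. */
final class VoteParseSketch {
    enum ParseResult { WAIT_TO_REVOTE, REVOTE_IMMEDIATELY, PASSED, WAIT_TO_VOTE_NEXT }

    private VoteParseSketch() {
    }

    static boolean isQuorum(int num, int peerSize) {
        return num >= peerSize / 2 + 1;
    }

    /** Mirrors the order of checks in Step 9. */
    static ParseResult parse(long term, long knownMaxTermInGroup, boolean alreadyHasLeader,
                             int validNum, int acceptedNum, int notReadyTermNum,
                             int biggerLedgerNum, int peerSize) {
        if (knownMaxTermInGroup > term) {
            return ParseResult.WAIT_TO_VOTE_NEXT;      // a peer is ahead: adopt its term first
        } else if (alreadyHasLeader) {
            return ParseResult.WAIT_TO_VOTE_NEXT;      // back off and wait for heartbeats
        } else if (!isQuorum(validNum, peerSize)) {
            return ParseResult.WAIT_TO_REVOTE;         // too few valid answers
        } else if (isQuorum(acceptedNum, peerSize)) {
            return ParseResult.PASSED;                 // majority accepted: become leader
        } else if (isQuorum(acceptedNum + notReadyTermNum, peerSize)) {
            return ParseResult.REVOTE_IMMEDIATELY;     // not-ready peers will catch up to our term
        } else if (isQuorum(acceptedNum + biggerLedgerNum, peerSize)) {
            return ParseResult.WAIT_TO_REVOTE;
        }
        return ParseResult.WAIT_TO_VOTE_NEXT;
    }

    /** The "regular timer": now + lastVoteCost + min + random(max - min). */
    static long nextTimeToRequestVote(long now, long lastVoteCost, int minVoteIntervalMs,
                                      int maxVoteIntervalMs, Random random) {
        return now + lastVoteCost + minVoteIntervalMs
            + random.nextInt(maxVoteIntervalMs - minVoteIntervalMs);
    }
}
```

Keeping the decision pure, with the counters passed in, makes each branch easy to test in isolation. In the real method the same branches also reset nextTimeToRequestVote and may change the node's role.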
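A Candidate sends vote requests to the other nodes in the cluster; to my mind, "canvassing" describes this better. It asks each peer whether it is willing to elect it as Leader. Each peer votes for or against the request according to its own state, and a rejection includes the reason. This is implemented by two methods, voteForQuorumResponses and handleVote, which we now analyze in turn.

voteForQuorumResponses

    private List<CompletableFuture<VoteResponse>> voteForQuorumResponses(long term, long ledgerEndTerm,
        long ledgerEndIndex) throws Exception {   // @1
        List<CompletableFuture<VoteResponse>> responses = new ArrayList<>();
        for (String id : memberState.getPeerMap().keySet()) {   // @2
            VoteRequest voteRequest = new VoteRequest();
            // ... fill in group, ledgerEndIndex, ledgerEndTerm, leaderId (this node's id), term and remoteId
            CompletableFuture<VoteResponse> voteResponse;
            if (memberState.getSelfId().equals(id)) {   // @3
                voteResponse = handleVote(voteRequest, true);
            } else {
                // async
                voteResponse = dLedgerRpcService.vote(voteRequest);   // @4
            }
            responses.add(voteResponse);
        }
        return responses;
    }

Code @1: the parameters are:
long term: the term of this vote.
long ledgerEndTerm: the term of this node's last log entry.
long ledgerEndIndex: the index of this node's last log entry.
Code @2: iterate over the nodes in the cluster and build a VoteRequest for each.
Code @3: if the request is addressed to this node itself, call handleVote directly.
Code @4: otherwise send the vote request over the network; each peer handles it in its own handleVote.

handleVote
handleVote can be called concurrently, because a node may receive vote requests from several nodes at once. Its body is therefore synchronized on the memberState object.

    // Step 1
    if (!memberState.isPeerMember(request.getLeaderId())) {
        logger.warn("[BUG] [HandleVote] remoteId={} is an unknown member", request.getLeaderId());
        return CompletableFuture.completedFuture(new VoteResponse(request).term(memberState.currTerm()).voteResult(VoteResponse.RESULT.REJECT_UNKNOWN_LEADER));
    }
    if (!self && memberState.getSelfId().equals(request.getLeaderId())) {
        logger.warn("[BUG] [HandleVote] selfId={} but remoteId={}", memberState.getSelfId(), request.getLeaderId());
        return CompletableFuture.completedFuture(new VoteResponse(request).term(memberState.currTerm()).voteResult(VoteResponse.RESULT.REJECT_UNEXPECTED_LEADER));
    }

Step 1: reject the request if the requesting node is not a member of the cluster. Also reject it if it claims to come from this node but arrived over the network (self is false). Either case indicates a BUG; it cannot happen otherwise.

    // Step 2
    if (request.getTerm() < memberState.currTerm()) {   // @1
        return CompletableFuture.completedFuture(new VoteResponse(request).term(memberState.currTerm()).voteResult(VoteResponse.RESULT.REJECT_EXPIRED_VOTE_TERM));
    } else if (request.getTerm() == memberState.currTerm()) {   // @2
        if (memberState.currVoteFor() == null) {
            // let it go
        } else if (memberState.currVoteFor().equals(request.getLeaderId())) {
            // repeat just let it go
        } else if (memberState.getLeaderId() != null) {
            return CompletableFuture.completedFuture(new VoteResponse(request).term(memberState.currTerm()).voteResult(VoteResponse.RESULT.REJECT_ALREADY__HAS_LEADER));
        } else {
            return CompletableFuture.completedFuture(new VoteResponse(request).term(memberState.currTerm()).voteResult(VoteResponse.RESULT.REJECT_ALREADY_VOTED));
        }
    } else {   // @3
        // stepped down by larger term
        changeRoleToCandidate(request.getTerm());
        needIncreaseTermImmediately = true;
        // only can handleVote when the term is consistent
        return CompletableFuture.completedFuture(new VoteResponse(request).term(memberState.currTerm()).voteResult(VoteResponse.RESULT.REJECT_TERM_NOT_READY));
    }

Step 2: compare the terms.
If the requester's term is smaller than this node's term, reject with REJECT_EXPIRED_VOTE_TERM.
If the two terms are equal, both nodes are in the same round with equal standing, so it comes down to whether this node has already voted. If it has not voted, or has already voted for the requester, processing continues with Step 3. If it voted for another node and a Leader already exists, it rejects with REJECT_ALREADY__HAS_LEADER. Otherwise it rejects with REJECT_ALREADY_VOTED.
If the requester's term is greater than this node's term, this node switches to Candidate with the requester's term and sets needIncreaseTermImmediately. It rejects this request with REJECT_TERM_NOT_READY.

    // Step 3
    if (request.getLedgerEndTerm() < memberState.getLedgerEndTerm()) {
        return CompletableFuture.completedFuture(new VoteResponse(request).term(memberState.currTerm()).voteResult(VoteResponse.RESULT.REJECT_EXPIRED_LEDGER_TERM));
    } else if (request.getLedgerEndTerm() == memberState.getLedgerEndTerm() && request.getLedgerEndIndex() < memberState.getLedgerEndIndex()) {
        return CompletableFuture.completedFuture(new VoteResponse(request).term(memberState.currTerm()).voteResult(VoteResponse.RESULT.REJECT_SMALL_LEDGER_END_INDEX));
    }
    if (request.getTerm() < memberState.getLedgerEndTerm()) {
        return CompletableFuture.completedFuture(new VoteResponse(request).term(memberState.currTerm()).voteResult(VoteResponse.RESULT.REJECT_TERM_SMALL_THAN_LEDGER));
    }

Step 3: compare ledgerEndTerm.
If the requester's ledgerEndTerm is smaller than this node's, reject, because the requester's log is older.
If the ledgerEndTerm values are equal but the requester's ledgerEndIndex is smaller, reject, because this node's log is more complete.
If the request's term is smaller than this node's ledgerEndTerm, reject for the same reason.

    // Step 4
    memberState.setCurrVoteFor(request.getLeaderId());
    return CompletableFuture.completedFuture(new VoteResponse(request).term(memberState.currTerm()).voteResult(VoteResponse.RESULT.ACCEPT));

Step 4: all checks passed, so record the vote for the requesting node and return ACCEPT.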
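The checks in handleVote (Steps 2 to 4) amount to a pure function from the request and the local state to a verdict. The sketch below reproduces that ordering, with hypothetical names and a trimmed-down result enum (note the single underscore in REJECT_ALREADY_HAS_LEADER). It omits locking, the Step 1 membership checks, and the side effects the real method performs: switching to Candidate, setting needIncreaseTermImmediately, and recording currVoteFor.

```java
/** Hypothetical sketch of the handleVote decision order; not the DLedger API. */
final class HandleVoteSketch {
    enum Result {
        ACCEPT, REJECT_EXPIRED_VOTE_TERM, REJECT_ALREADY_HAS_LEADER, REJECT_ALREADY_VOTED,
        REJECT_TERM_NOT_READY, REJECT_EXPIRED_LEDGER_TERM, REJECT_SMALL_LEDGER_END_INDEX,
        REJECT_TERM_SMALL_THAN_LEDGER
    }

    /** The voter's local view; field names follow the text. */
    static final class State {
        long currTerm;
        String currVoteFor;
        String leaderId;
        long ledgerEndTerm;
        long ledgerEndIndex;
    }

    private HandleVoteSketch() {
    }

    static Result decide(State s, String candidateId, long term, long ledgerEndTerm, long ledgerEndIndex) {
        // Step 2: compare vote terms.
        if (term < s.currTerm) {
            return Result.REJECT_EXPIRED_VOTE_TERM;
        } else if (term == s.currTerm) {
            if (s.currVoteFor != null && !s.currVoteFor.equals(candidateId)) {
                return s.leaderId != null ? Result.REJECT_ALREADY_HAS_LEADER : Result.REJECT_ALREADY_VOTED;
            }
        } else {
            return Result.REJECT_TERM_NOT_READY;   // the voter steps down and catches up first
        }
        // Step 3: the candidate's log must be at least as up to date.
        if (ledgerEndTerm < s.ledgerEndTerm) {
            return Result.REJECT_EXPIRED_LEDGER_TERM;
        } else if (ledgerEndTerm == s.ledgerEndTerm && ledgerEndIndex < s.ledgerEndIndex) {
            return Result.REJECT_SMALL_LEDGER_END_INDEX;
        }
        if (term < s.ledgerEndTerm) {
            return Result.REJECT_TERM_SMALL_THAN_LEDGER;
        }
        // Step 4: grant the vote (the real method also records currVoteFor here).
        return Result.ACCEPT;
    }
}
```

The Step 3 comparison is Raft's "at least as up to date" rule. A node whose log is behind can never collect a majority, so committed entries survive a leader change.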
Sending heartbeats (sendHeartbeats)

    CompletableFuture<HeartBeatResponse> future = dLedgerRpcService.heartBeat(heartBeatRequest);
    future.whenComplete((HeartBeatResponse x, Throwable ex) -> {
        try {
            if (ex != null) {
                throw ex;
            }
            switch (DLedgerResponseCode.valueOf(x.getCode())) {
                case SUCCESS:
                    succNum.incrementAndGet();
                    break;
                case EXPIRED_TERM:
                    maxTerm.set(x.getTerm());
                    break;
                case INCONSISTENT_LEADER:
                    inconsistLeader.compareAndSet(false, true);
                    break;
                case TERM_NOT_READY:
                    notReadyNum.incrementAndGet();
                    break;
                default:
                    break;
            }
            if (memberState.isQuorum(succNum.get())
                || memberState.isQuorum(succNum.get() + notReadyNum.get())) {
                beatLatch.countDown();
            }
        } catch (Throwable t) {
            logger.error("Parse heartbeat response failed", t);
        } finally {
            allNum.incrementAndGet();
            if (allNum.get() == memberState.peerSize()) {
                beatLatch.countDown();
            }
        }
    });

Step 1: send a heartbeat to each peer and tally the responses. The tally records:
the number of successful responses;
the term reported by a peer whose term is newer;
whether a peer reports an inconsistent Leader;
the number of peers whose term is not yet ready.
The latch is released once a majority has succeeded, or once every peer has answered.

    beatLatch.await(heartBeatTimeIntervalMs, TimeUnit.MILLISECONDS);
    if (memberState.isQuorum(succNum.get())) {   // @1
        lastSuccHeartBeatTime = System.currentTimeMillis();
    } else {
        logger.info("[{}] Parse heartbeat responses in cost={} term={} allNum={} succNum={} notReadyNum={} inconsistLea…

Step 2: the Leader waits at most one heartbeat interval for the responses. If a majority succeeded, lastSuccHeartBeatTime is updated.
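The heartbeat bookkeeping in Step 1 is a fold of per-peer response codes into a few counters, followed by a quorum test that lets the Leader stop waiting early. The sketch below isolates that logic. The code names SUCCESS, EXPIRED_TERM, INCONSISTENT_LEADER and TERM_NOT_READY follow the text; OTHER, the Tally class and the helper names are hypothetical. Unlike the excerpt, which stores the reported term directly, the sketch keeps the maximum term seen.

```java
import java.util.List;

/** Hypothetical sketch of tallying heartbeat responses; not the DLedger API. */
final class HeartbeatTallySketch {
    enum Code { SUCCESS, EXPIRED_TERM, INCONSISTENT_LEADER, TERM_NOT_READY, OTHER }

    /** Counters accumulated over one heartbeat round. */
    static final class Tally {
        int succNum;
        int notReadyNum;
        long maxTerm = -1;
        boolean inconsistLeader;
    }

    private HeartbeatTallySketch() {
    }

    static boolean isQuorum(int num, int peerSize) {
        return num >= peerSize / 2 + 1;
    }

    /** Folds per-peer responses (code and reported term, index-aligned) into counters. */
    static Tally tally(List<Code> codes, List<Long> terms) {
        Tally t = new Tally();
        for (int i = 0; i < codes.size(); i++) {
            switch (codes.get(i)) {
                case SUCCESS:
                    t.succNum++;
                    break;
                case EXPIRED_TERM:
                    t.maxTerm = Math.max(t.maxTerm, terms.get(i));
                    break;
                case INCONSISTENT_LEADER:
                    t.inconsistLeader = true;
                    break;
                case TERM_NOT_READY:
                    t.notReadyNum++;
                    break;
                default:
                    break;
            }
        }
        return t;
    }

    /** The leader may stop waiting once successes (optionally plus not-ready peers) reach a quorum. */
    static boolean canStopWaiting(Tally t, int peerSize) {
        return isQuorum(t.succNum, peerSize) || isQuorum(t.succNum + t.notReadyNum, peerSize);
    }
}
```

Counting TERM_NOT_READY peers toward the early-exit condition reflects that those peers are about to adopt the Leader's term. The success timestamp in Step 2, however, is only updated when successes alone reach a quorum.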
