




版權(quán)說明:本文檔由用戶提供并上傳,收益歸屬內(nèi)容提供方,若內(nèi)容存在侵權(quán),請進行舉報或認領(lǐng)
文檔簡介
SparkStreaming源碼解讀之Job詳解一:SparkStreamingJob生成深度思考1.做大數(shù)據(jù)例如Hadoop,Spark等,如果不是流處理的話,一般會有定時任務(wù)。例如10分鐘觸發(fā)一次,1個小時觸發(fā)一次,這就是做流處理的感覺,一切不是流處理,或者與流處理無關(guān)的數(shù)據(jù)都將是沒有價值的數(shù)據(jù),以前做批處理的時候其實也是隱形的在做流處理。2.JobGenerator構(gòu)造的時候有一個核心的參數(shù)是jobScheduler,jobScheduler是整個作業(yè)的生成和提交給集群的核心,JobGenerator會基于DStream生成Job。這里面的Job就相當于Java中線程要處理的Runnable里面的業(yè)務(wù)邏輯封裝。Spark的Job就是運行的一個作業(yè)。3.SparkStreaming除了基于定時操作以外參數(shù)Job,還可以通過各種聚合操作,或者基于狀態(tài)的操作。4.每5秒鐘JobGenerator都會產(chǎn)生Job,此時的Job是邏輯級別的,也就是說有這個Job,并且說這個Job具體該怎么去做,此時并沒有執(zhí)行。具體執(zhí)行的話是交給底層的RDD的action去觸發(fā),此時的action也是邏輯級別的。底層物理級別的,SparkStreaming他是基于DStream構(gòu)建的依賴關(guān)系導(dǎo)致的Job是邏輯級別的,底層是基于RDD的邏輯級別的。valssc=newStreamingContext(conf,Seconds(5))5.SparkStreaming的觸發(fā)器是以時間為單位的,storm是以事件為觸發(fā)器,也就是基于一個又一個record.SparkStreaming基于時間,這個時間是BatchDuractions從邏輯級別翻譯成物理級別,最后一個操作肯定是RDD的action,但是并不想一翻譯立馬就觸發(fā)job。這個時候怎么辦?6.action觸發(fā)作業(yè),這個時候作為Runnable接口封裝,他會定義一個方法,這個方法里面是基于DStream的依賴關(guān)系生成的RDD。翻譯的時候是將DStream的依賴關(guān)系翻譯成RDD的依賴關(guān)系,由于DStream的依賴關(guān)系最后一個是action級別的,翻譯成RDD的時候,RDD的最后一個操作也應(yīng)該是action級別的,如果翻譯的時候直接執(zhí)行的話,就直接生成了Job,就沒有所謂的隊列,所以會將翻譯的事件放到一個函數(shù)中或者一個方法中,因此,如果這個函數(shù)沒有指定的action觸發(fā)作業(yè)是執(zhí)行不了的。7.SparkStreaming根據(jù)時間不斷的去管理我們的生成的作業(yè),所以這個時候我們每個作業(yè)又有action級別的操作,這個action操作是對DStream進行邏輯級別的操作,他生成每個Job放到隊列的時候,他一定會被翻譯為RDD的操作,那基于RDD操作的最后一個一定是action級別的,如果翻譯的話直接就是觸發(fā)action的話整個SparkStreaming的Job就不受管理了。因此我們既要保證他的翻譯,又要保證對他的管理,把DStream之間的依賴關(guān)系轉(zhuǎn)變?yōu)镽DD之間的依賴關(guān)系,最后一個DStream使得action的操作,翻譯成一個RDD之間的action操作,整個翻譯后的內(nèi)容他是一塊內(nèi)容,他這一塊內(nèi)容是放在一個函數(shù)體中的,這個函數(shù)體,他會函數(shù)的定義,這個函數(shù)由于他只是定義還沒有執(zhí)行,所以他里面的RDD的action不會執(zhí)行,不會觸發(fā)Job,當我們的JobScheduler要調(diào)度Job的時候,轉(zhuǎn)過來在線程池中拿出一條線程執(zhí)行剛才的封裝的方法。二:SparkStreamingJob生成源碼解析Spark作業(yè)動態(tài)生成三大核心:JobGenerator:負責Job生成。JobSheduler:負責Job調(diào)度。ReceiverTracker:獲取元數(shù)據(jù)。1.JobScheduler的start方法被調(diào)用的時候,會啟動JobGenerator的start方法。/**Startgenerationofjobs*/defstart():Unit=synchronized{//eventLoop是消息循環(huán)體,因為不斷的生成Jobif(eventLoop!=null)return//generatorhasalreadybeenstarted//CallcheckpointWriterheretoinitializeitbeforeeventLoopusesittoavoidadeadlock.//SeeSPARK-10125checkpointWriter//匿名內(nèi)部類eventLoop=newEventLoop[JobGeneratorEvent]("JobGenerator"){overrideprotecteddefonReceive(event:JobGeneratorEvent):Unit=processEvent(event)overrideprotecteddefonError(e:Throwable):Unit={jobScheduler.reportError("Errorinjobgenerator",e)}}//調(diào)用start方法。eventLoop.start()if(ssc.isCheckpointPresent){restart()}else{startFirstTime()}}EvenLoop:的start方法被調(diào)用,首先會調(diào)用onstart方法。然后就啟動線程。/***Aneventlooptoreceiveeventsfromthecallerandprocessalleventsintheeventthread.It*willstartanexclusiveeventthreadtoprocessallevents.**Note:Theeventqueuewillgrowindefinitely.Sosubclassesshouldmakesure`onReceive`can*handleeventsintimetoavoidthepotentialOOM.*/private[spark]abstractclassEventLoop[E](name:String)extendsLogging{privatevaleventQueue:BlockingQueue[E]=newLinkedBlockingDeque[E]()privatevalstopped=newAtomicBoolean(false)//開啟后臺線程。privatevaleventThread=newThread(name){setDaemon(true)overridedefrun():Unit={try{//不斷的從BlockQueue中拿消息。while(!stopped.get){//線程的start方法調(diào)用就會不斷的循環(huán)隊列,而我們將消息放到eventQueue中。valevent=eventQueue.take()try{//onReceive(event)}catch{caseNonFatal(e)=>{try{onError(e)}catch{caseNonFatal(e)=>logError("Unexpectederrorin"+name,e)}}}}}catch{caseie:InterruptedException=>//exitevenifeventQueueisnotemptycaseNonFatal(e)=>logError("Unexpectederrorin"+name,e)}}}defstart():Unit={if(stopped.get){thrownewIllegalStateException(name+"hasalreadybeenstopped")}//CallonStartbeforestartingtheeventthreadtomakesureithappensbeforeonReceiveonStart()eventThread.start()}onReceive:不斷的從消息隊列中獲得消息,一旦獲得消息就會處理。不要在onReceive中添加阻塞的消息,如果這樣的話會不斷的阻塞消息。消息循環(huán)器一般都不會處理具體的業(yè)務(wù)邏輯,一般消息循環(huán)器發(fā)現(xiàn)消息以后都會將消息路由給其他的線程去處理。/***Invokedintheeventthreadwhenpollingeventsfromtheeventqueue.**Note:Shouldavoidcallingblockingactionsin`onReceive`,ortheeventthreadwillbeblocked*andcannotprocesseventsintime.Ifyouwanttocallsomeblockingactions,runthemin*anotherthread.*/protecteddefonReceive(event:E):Unit消息隊列接收到事件后具體處理如下:/**Processesallevents*/privatedefprocessEvent(event:JobGeneratorEvent){logDebug("Gotevent"+event)eventmatch{caseGenerateJobs(time)=>generateJobs(time)caseClearMetadata(time)=>clearMetadata(time)caseDoCheckpoint(time,clearCheckpointDataLater)=>doCheckpoint(time,clearCheckpointDataLater)caseClearCheckpointData(time)=>clearCheckpointData(time)}}基于BatchDuractions生成Job,并完成checkpoint.Job生成的5個步驟。/**Generatejobsandperformcheckpointforthegiven`time`.*/privatedefgenerateJobs(time:Time){//SettheSparkEnvinthisthread,sothatjobgenerationcodecanaccesstheenvironment//Example:BlockRDDsarecreatedinthisthread,anditneedstoaccessBlockManager//Update:ThisisprobablyredundantafterthreadlocalstuffinSparkEnvhasbeenremoved.SparkEnv.set(ssc.env)Try{//第一步:獲取當前時間段里面的數(shù)據(jù)。根據(jù)分配的時間來分配具體要處理的數(shù)據(jù)。jobScheduler.receiverTracker.allocateBlocksToBatch(time)//allocatereceivedblockstobatch//第二步:生成Job,獲取RDD的DAG依賴關(guān)系。在此基于DStream生成了RDD實例。graph.generateJobs(time)//generatejobsusingallocatedblock}match{caseSuccess(jobs)=>//第三步:獲取streamIdToInputInfos的信息。BacthDuractions要處理的數(shù)據(jù),以及我們要處理的業(yè)務(wù)邏輯。valstreamIdToInputInfos=jobScheduler.inputInfoTracker.getInfo(time)//第四步:將生成的Job交給jobSchedulerjobScheduler.submitJobSet(JobSet(time,jobs,streamIdToInputInfos))caseFailure(e)=>jobScheduler.reportError("Errorgeneratingjobsfortime"+time,e)}//第五步:進行checkpointeventLoop.post(DoCheckpoint(time,clearCheckpointDataLater=false))}此時的outputStream是整個DStream中的最后一個DStream,也就是foreachDStream.defgenerateJobs(time:Time):Seq[Job]={logDebug("Generatingjobsfortime"+time)valjobs=this.synchronized{outputStreams.flatMap{outputStream=>//根據(jù)最后一個DStream,然后根據(jù)時間生成Job.valjobOption=outputStream.generateJob(time)jobOption.foreach(_.setCallSite(outputStream.creationSite))jobOption}}logDebug("Generated"+jobs.length+"jobsfortime"+time)jobs}此時的JobFunc就是我們前面提到的用函數(shù)封裝了Job。generateJob基于給定的時間生成SparkStreaming的Job,這個方法會基于我們的DStream的操作物化成了RDD,由此可以看出,DStream是邏輯級別的,RDD是物理級別的。/***GenerateaSparkStreamingjobforthegiventime.Thisisaninternalmethodthat*shouldnotbecalleddirectly.Thisdefaultimplementationcreatesajob*thatmaterializesthecorrespondingRDD.SubclassesofDStreammayoverridethis*togeneratetheirownjobs.*/private[streaming]defgenerateJob(time:Time):Option[Job]={getOrCompute(time)match{caseSome(rdd)=>{valjobFunc=()=>{valemptyFunc={(iterator:Iterator[T])=>{}}//rdd=>就是RDD的依賴關(guān)系context.sparkContext.runJob(rdd,emptyFunc)}//此時的Some(newJob(time,jobFunc))}caseNone=>None}}Job這個類就代表了Spark業(yè)務(wù)邏輯,可能包含很多SparkJobs./***ClassrepresentingaSparkcomputation.ItmaycontainmultipleSparkjobs.*/private[streaming]classJob(valtime:Time,func:()=>_){privatevar_id:String=_privatevar_outputOpId:Int=_privatevarisSet=falseprivatevar_result:Try[_]=nullprivatevar_callSite:CallSite=nullprivatevar_startTime:Option[Long]=Noneprivatevar_endTime:Option[Long]=Nonedefrun(){//調(diào)用func函數(shù),此時這個func就是我們前面generateJob中的func_result=Try(func())}此時put函數(shù)中的RDD是最后一個RDD,雖然觸發(fā)Job是基于時間,但是也是基于DStream的action的。/***GettheRDDcorrespondingtothegiventime;eitherretrieveitfromcache*orcompute-and-cacheit.*/private[streaming]finaldefgetOrCompute(time:Time):Option[RDD[T]]={//IfRDDwasalreadygenerated,thenretrieveitfromHashMap,//orelsecomputetheRDD//基于時間生成RDDgeneratedRDDs.get(time).orElse{//ComputetheRDDiftimeisvalid(e.g.correcttimeinaslidingwindow)//ofRDDgeneration,elsegeneratenothing.if(isTimeValid(time)){valrddOption=createRDDWithLocalProperties(time,displayInnerRDDOps=false){//Disablechecksforexistingoutputdirectoriesinjobslaunchedbythestreaming//scheduler,sincewemayneedtowriteoutputtoanexistingdirectoryduringcheckpoint//recovery;seeSPARK-4835formoredetails.Weneedtohavethiscallherebecause//compute()mightcauseSparkjobstobelaunched.PairRDDFunctions.disableOutputSpecValidation.withValue(true){//compute(time)}}//然后對generatedRDD進行checkpointrddOption.foreach{casenewRDD=>//RegisterthegeneratedRDDforcachingandcheckpointingif(storageLevel!=StorageLevel.NONE){newRDD.persist(storageLevel)logDebug(s"PersistingRDD${newRDD.id}fortime$timeto$storageLevel")}if(checkpointDuration!=null&&(time-zeroTime).isMultipleOf(checkpointDuration)){newRDD.checkpoint()logInfo(s"MarkingRDD${newRDD.id}fortime$timeforcheckpointing")}//以時間為Key,RDD為Value,此時的RDD為最后一個RDDgeneratedRDDs.put(time,newRDD)}rddOption}else{None}}}回到JobGenerator中的start方法。if(ssc.isCheckpointPresent){//如果不是第一次啟動的話,就需要從checkpoint中恢復(fù)。restart()}else{//否則的話,就是第一次啟動。startFirstTime()}}StartFirstTime的源碼如下:/**Startsthegeneratorforthefirsttime*/privatedefstartFirstTime(){valstartTime=newTime(timer.getStartTime())//告訴DStreamGraph第一個Batch啟動時間。graph.start(startTime-graph.batchDuration)//timer啟動,整個job不斷生成就開始了。timer.start(startTliseconds)logInfo("StartedJobGeneratorat"+startTime)}這里的timer是RecurringTimer。RecurringTimer的start方法會啟動內(nèi)置線程thread.privatevaltimer=newRecurringTimer(clock,ssc.graph.batchDliseconds,longTime=>eventLoop.post(GenerateJobs(newTime(longTime))),"JobGenerator")Timer.start源碼如下:/***Startatthegivenstarttime.*/defstart(startTime:Long):Long=synchronized{nextTime=startTime//每次調(diào)用的thread.start()logInfo("Startedtimerfor"+name+"attime"+nextTime)nextTime}調(diào)用thread啟動后臺進程。privatevalthread=newThread("RecurringTimer-"+name){setDaemon(true)overridedefrun(){loop}}loop源碼如下:/***Repeatedlycallthecallbackeveryinterval.*/privatedefloop(){try{while(!stopped){triggerActionForNextInterval()}triggerActionForNextInterval()}catch{casee:InterruptedException=>}}}tiggerActionForNextInterval源碼如下:privatedeftriggerActionForNextInterval():Unit={clock.waitTillTime(nextTime)callback(nextTime)prevTime=nextTime+=periodlogDebug("Callbackfor"+name+"calledattime"+prevTime)}此時的callBack是RecurringTimer傳入的。下面就去找callBack是誰傳入的,這個時候就應(yīng)該找RecurringTimer什么時候?qū)嵗?。private[streaming]classRecurringTimer(clock:Clock,period:Long,callback:(Long)=>Unit,name:String)extendsLogging{privatevalthread=newThread("RecurringTimer-"+name){setDaemon(true)overridedefrun(){loop}}在jobGenerator中,匿名函數(shù)會隨著時間不斷的推移反復(fù)被調(diào)用。privatevaltimer=newRecurringTimer(clock,ssc.graph.batchDliseconds,//匿名函數(shù),復(fù)制給callback。longTime=>eventLoop.post(GenerateJobs(newTime(longTime))),"JobGenerator")而此時的eventLoop就是JobGenerator的start方法中eventLoop.eventLoop是一個消息循環(huán)體當收到generateJobs,就會將消息放到線程池中去執(zhí)行。至此,就知道了基于時間怎么生成作業(yè)的流程就貫通了。Jobs:此時的jobs就是jobs的業(yè)務(wù)邏輯,就類似于RDD之間的依賴關(guān)系,保存最后一個job,然后根據(jù)依賴關(guān)系進行回溯。streamIdToInputInfos:基于BatchDuractions以及要處理的業(yè)務(wù)邏輯,然后就生成了JobSet.jobScheduler.submitJobSet(JobSet(time,jobs,streamIdToInputInfos))11此時的JobSet就包含了數(shù)據(jù)以及對數(shù)據(jù)處理的業(yè)務(wù)邏輯。/**ClassrepresentingasetofJobs*belongtothesamebatch.*/private[streaming]caseclassJobSet(time:Time,jobs:Seq[Job],streamIdToInputInfo:Map[Int,StreamInputInfo]=Map.empty){privatevalincompleteJobs=newHashSet[Job]()privatevalsubmissionTime=System.currentTimeMillis()//whenthisjobsetwassubmittedprivatevarprocessingStartTime=-1L//whenthefirstjobofthisjobsetstartedprocessingprivatevarprocessingEndTime=-1L//whenthelastjobofthisjobsetfinishedprocessingjobs.zipWithIndex.foreach{case(job,i)=>job.setOutputOpId(i)}incompleteJobs++=jobsdefhandleJobStart(job:Job){if(processingStartTime<0)processingStartTime=System.currentTimeMillis()}submitJobSet:defsubmitJobSet(jobSet:JobSet){if(jobS.isEmpty){logInfo("Nojobsaddedfortime"+jobSet.time)}else{listenerBus.post(StreamingListenerBatchSubmitted(jobSet.toBatchInfo))//jobSets.put(jobSet.time,jobSet)//jobHandlerjobS.foreach(job=>jobExecutor.execute(newJobHandler(job)))logInfo("Addedjobsfortime"+jobSet.time)}}JobHandle是一個Runnable接口,Job就是我們業(yè)務(wù)邏輯,代表的就是一系列RDD的依賴關(guān)系,job.run方法就導(dǎo)致了func函數(shù)的調(diào)用。privateclassJobHandler(job:Job)extendsRunnablewithLogging{importJobScheduler._defrun(){try{valformattedTime=UIUtils.formatBatchTime(liseconds,ssc.graph.batchDliseconds,showYYYYMMSS=false)valbatchUrl=s"/streaming/batch/?id=${liseconds}"valbatchLinkText=s"[outputoperation${job.outputOpId},batchtime${formattedTime}]"ss
溫馨提示
- 1. 本站所有資源如無特殊說明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請下載最新的WinRAR軟件解壓。
- 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請聯(lián)系上傳者。文件的所有權(quán)益歸上傳用戶所有。
- 3. 本站RAR壓縮包中若帶圖紙,網(wǎng)頁內(nèi)容里面會有圖紙預(yù)覽,若沒有圖紙預(yù)覽就沒有圖紙。
- 4. 未經(jīng)權(quán)益所有人同意不得將文件中的內(nèi)容挪作商業(yè)或盈利用途。
- 5. 人人文庫網(wǎng)僅提供信息存儲空間,僅對用戶上傳內(nèi)容的表現(xiàn)方式做保護處理,對用戶上傳分享的文檔內(nèi)容本身不做任何修改或編輯,并不能對任何下載內(nèi)容負責。
- 6. 下載文件中如有侵權(quán)或不適當內(nèi)容,請與我們聯(lián)系,我們立即糾正。
- 7. 本站不保證下載資源的準確性、安全性和完整性, 同時也不承擔用戶因使用這些下載資源對自己和他人造成任何形式的傷害或損失。
最新文檔
- 出租移動廂房合同范本
- 交通隔離設(shè)施合同范本
- 叉車低價采購合同范本
- 廚房廚子購買合同范本
- 臺球售后維修合同范例
- 分期付款房子合同范本
- 農(nóng)村股權(quán)合作合同范本
- 入股合同范本合伙協(xié)議
- 南寧市購房合同范本
- 單位苗木服務(wù)合同范本
- 甘肅省白銀市2024年中考英語真題
- 胰腺囊性腫瘤
- 聯(lián)盟山東省菏澤一中2025屆高考全國統(tǒng)考預(yù)測密卷歷史試卷含解析
- 新學期開學第一課主題班會
- 2023八年級道德與法治下冊 第七課 尊重自由平等第1框 自由平等的真諦教案 新人教版
- 2024版離職技術(shù)人員保密協(xié)議
- 混凝土裂縫修補方案
- 潛水打撈合同范本
- 鋼樓梯計算書
- 中藥貼敷療法
- 2024年江蘇農(nóng)牧科技職業(yè)學院單招職業(yè)適應(yīng)性測試題庫各版本
評論
0/150
提交評論