Spark大數(shù)據(jù)分析與實(shí)戰(zhàn)(第二版) 項(xiàng)目3 教案_第1頁(yè)
Spark大數(shù)據(jù)分析與實(shí)戰(zhàn)(第二版) 項(xiàng)目3 教案_第2頁(yè)
Spark大數(shù)據(jù)分析與實(shí)戰(zhàn)(第二版) 項(xiàng)目3 教案_第3頁(yè)
Spark大數(shù)據(jù)分析與實(shí)戰(zhàn)(第二版) 項(xiàng)目3 教案_第4頁(yè)
Spark大數(shù)據(jù)分析與實(shí)戰(zhàn)(第二版) 項(xiàng)目3 教案_第5頁(yè)
已閱讀5頁(yè),還剩6頁(yè)未讀, 繼續(xù)免費(fèi)閱讀

下載本文檔

版權(quán)說(shuō)明:本文檔由用戶(hù)提供并上傳,收益歸屬內(nèi)容提供方,若內(nèi)容存在侵權(quán),請(qǐng)進(jìn)行舉報(bào)或認(rèn)領(lǐng)

文檔簡(jiǎn)介

課程名稱(chēng)Spark大數(shù)據(jù)分析選用教材Spark大數(shù)據(jù)分析與實(shí)戰(zhàn)(第2版)出版社西安電子科技大學(xué)出版社章節(jié)項(xiàng)目3SparkRDD分析交通違章記錄教學(xué)內(nèi)容借助成熟的SparkRDD技術(shù),分析交通違章記錄文件中的數(shù)據(jù)。授課學(xué)時(shí)授課班級(jí)****專(zhuān)業(yè)*****班授課日期授課地點(diǎn)教學(xué)目標(biāo)了解RDD的特性及運(yùn)算的原理,了解RDD的執(zhí)行流程;熟悉各種數(shù)據(jù)源創(chuàng)建RDD的算子,多種方法查看RDD的元素(2)熟練使用算子完成RDD的轉(zhuǎn)換、排序、過(guò)濾、去重等操作;(3)能夠完成鍵值對(duì)RDD的生成、轉(zhuǎn)換等操作;(4)根據(jù)業(yè)務(wù)需求,能將RDD中數(shù)據(jù)輸出到文件系統(tǒng)中。重點(diǎn)難點(diǎn)RDD的生成(內(nèi)存數(shù)據(jù)、文件等生成)RDD的map、filter、sortBy等常用算子;鍵值對(duì)RDD的key、value相關(guān)操作,鍵值對(duì)RDD排序等;兩個(gè)RDD的相關(guān)操作:join、union、zip等。教學(xué)方法R講授£討論或座談£問(wèn)題導(dǎo)向?qū)W習(xí)£分組合作學(xué)習(xí)£案例教學(xué)£任務(wù)驅(qū)動(dòng)R項(xiàng)目教學(xué)£情景教學(xué)£演示匯報(bào)£實(shí)踐教學(xué)£參觀訪問(wèn)£引導(dǎo)文教學(xué)£其他(--)教學(xué)準(zhǔn)備(教師)教材:《Spark大數(shù)據(jù)分析與實(shí)戰(zhàn)(第2版)》硬件設(shè)備:內(nèi)存8G(或以上)的計(jì)算機(jī)(2)教學(xué)資源:課件PPT、教學(xué)日歷、相關(guān)軟件等教學(xué)準(zhǔn)備(學(xué)生)教材:《Spark大數(shù)據(jù)分析與實(shí)戰(zhàn)(第2版)》硬件設(shè)備:內(nèi)存8G(或以上)的計(jì)算機(jī)(3)教學(xué)資源:課件PPT、相關(guān)軟件等教學(xué)環(huán)節(jié)教學(xué)內(nèi)容與過(guò)程(教學(xué)內(nèi)容、教學(xué)方法、組織形式、教學(xué)手段)課前組織教師通過(guò)課程教學(xué)平臺(tái)或班級(jí)群發(fā)布學(xué)習(xí)預(yù)習(xí)任務(wù)及課程資源;學(xué)生提前預(yù)習(xí)相關(guān)內(nèi)容,并完成課前自測(cè)等。課程內(nèi)容描述任務(wù)3.1根據(jù)交通違章數(shù)據(jù)創(chuàng)建RDD認(rèn)識(shí)RDDRDD就是一個(gè)分布在集群多節(jié)點(diǎn)中存放數(shù)據(jù)的集合;雖然一個(gè)數(shù)據(jù)集分散于集群多個(gè)節(jié)點(diǎn),但邏輯上仍然是一個(gè)整體(即RDD),數(shù)據(jù)處理人員只需對(duì)這個(gè)整體進(jìn)行處理,而無(wú)需關(guān)注底層邏輯與實(shí)現(xiàn)方法,從而極大降低了大數(shù)據(jù)編程的難度。其計(jì)算流程如下:內(nèi)存數(shù)據(jù)創(chuàng)建RDD針對(duì)程序中已有的數(shù)據(jù)集合(List、Array、Tuple等),Spark提供了兩個(gè)方法:parallelize和makeRDD,它們均可復(fù)制數(shù)據(jù)集合的元素后,創(chuàng)建一個(gè)可并行計(jì)算的分布式數(shù)據(jù)集RDD。parallelize方式適用于做簡(jiǎn)單的Spark程序測(cè)試、Spark學(xué)習(xí);下面演示根據(jù)列表數(shù)據(jù)創(chuàng)建RDD:scala>valnums=List(1,2,3,4,5)//包含5個(gè)整數(shù)的列表nums:List[Int]=List(1,2,3,4,5)scala>valnumsRDD=sc.parallelize(nums)//根據(jù)列表nums,創(chuàng)建一個(gè)RDD(numsRDD)numsRDD:org.apache.spark.rdd.RDD[Int]=ParallelCollectionRDD[1]atparallelizeat<console>:26scala>valcars=Array("比亞迪","長(zhǎng)安","奇瑞","廣汽")cars:Array[String]=Array(比亞迪,長(zhǎng)安,奇瑞,廣汽)scala>valcarsRDD=sc.parallelize(cars)//根據(jù)數(shù)組cars,創(chuàng)建一個(gè)RDD(carsRDD)carsRDD:org.apache.spark.rdd.RDD[String]=ParallelCollectionRDD[2]atparallelizeat<console>:26外部文件創(chuàng)建RDD由文件創(chuàng)建RDD,采用sc.textFile(“文件路徑”)方式,路徑前面需要加入“file://”以表示本地文件(Spark-shell環(huán)境下,要求所有節(jié)點(diǎn)的相同位置均保存該文件)?,F(xiàn)有本地文件“/home/hadoop/data/guide.txt”,借助textFile()方法,可以生成RDD,演示代碼如下:scala>valfileRDD=sc.textFile("file:///home/hadoop/data/guide.txt")//注意路徑的寫(xiě)法fileRDD:org.apache.spark.rdd.RDD[String]=file:///home/hadoop/data/guide.txtMapPartitionsRDD[11]attextFileat<console>:25scala>fileRDD.count()//使用count方法查看RDD的元素?cái)?shù)量,即guide.txt文件的行數(shù)。res14:Long=4任務(wù)3.2找出扣分最高的交通違法條目查看RDD的元素在學(xué)習(xí)或測(cè)試代碼時(shí),為了便于掌控計(jì)算過(guò)程、及時(shí)發(fā)現(xiàn)問(wèn)題,可以使用collect操作查看RDD內(nèi)元素的值;collect操作會(huì)將RDD的所有元素組成一個(gè)數(shù)組并返回給Driver端;其用法示例如下:scala>valnums=List(1,2,3,4,5)nums:List[Int]=List(1,2,3,4,5)scala>valnumsRDD=sc.parallelize(nums)//根據(jù)列表nums,創(chuàng)建RDDnumsRDD:org.apache.spark.rdd.RDD[Int]=ParallelCollectionRDD[4]atparallelizeat<console>:26scala>numsRDD.collect()//查看RDD的元素值res5:Array[Int]=Array(1,2,3,4,5)Map操作map操作是最常用的轉(zhuǎn)換操作,該操作接收一個(gè)函數(shù)作為參數(shù),進(jìn)而將RDD中的每個(gè)元素作為參數(shù)傳入某個(gè)函數(shù),函數(shù)處理完后的返回值組成一個(gè)新的RDD;其目的是根據(jù)現(xiàn)有的RDD,經(jīng)過(guò)函數(shù)處理,最終得到一個(gè)新的RDD。用法示例如下:scala>valdata=List(1,2,3,4,5,6)data:List[Int]=List(1,2,3,4,5,6)scala>valdataRDD=sc.parallelize(data)dataRDD:org.apache.spark.rdd.RDD[Int]=ParallelCollectionRDD[10]atparallelizeat<console>:26scala>valnewDataRDD=dataRDD.map(x=>x*2)newDataRDD:org.apache.spark.rdd.RDD[Int]=MapPartitionsRDD[11]atmapat<console>:25scala>newDataRDD.collect()res10:Array[Int]=Array(2,4,6,8,10,12)scala>valpeoples=List("tom","jerry","petter","ken")peoples:List[String]=List(tom,jerry,petter,ken)scala>valpeoplesRDD=sc.makeRDD(peoples)peoplesRDD:org.apache.spark.rdd.RDD[String]=ParallelCollectionRDD[7]atmakeRDDat<console>:26scala>valnewPeoplesRDD=peoplesRDD.map(x=>x.toUpperCase())newPeoplesRDD:org.apache.spark.rdd.RDD[String]=MapPartitionsRDD[8]atmapat<console>:25scala>newPeoplesRDD.collect()res8:Array[String]=Array(TOM,JERRY,PETTER,KEN)RDD的排序sortBy操作可以對(duì)RDD元素進(jìn)行排序,并返回排好序的新RDD;sortBy有3個(gè)參數(shù),其用法說(shuō)明如下;defsortBy[K](f:(T)?

K,

ascending:

Boolean

=

true,

numPartitions:

Int

=

this.partitions.length):

\o"org.apache.spark.rdd.RDD"RDD[T]ReturnthisRDDsortedbythegivenkeyfunction參數(shù)1:f:(T)?K,左邊為要排序的RDD的每一個(gè)元素,右邊返回要進(jìn)行排序的值。參數(shù)2:ascending(可選項(xiàng)),升序或降序排列標(biāo)識(shí),默認(rèn)為true、升序排列,若要降序排列則需寫(xiě)false。參數(shù)3:numPartitions(可選項(xiàng)),排序后新RDD的分區(qū)數(shù)量,默認(rèn)分區(qū)數(shù)量與原RDD相同。針對(duì)某個(gè)RDD,將RDD的元素?cái)?shù)據(jù)交給“f:(T)?K”函數(shù)進(jìn)行處理;而后按照函數(shù)運(yùn)算后的返回值進(jìn)行排序,默認(rèn)為升序排列。數(shù)值型RDD的統(tǒng)計(jì)對(duì)于數(shù)值元素組成的RDD,Spark提供了max、min、sum等若干統(tǒng)計(jì)算子,可以完成簡(jiǎn)單的統(tǒng)計(jì)分析;相關(guān)示例如下:scala>valdata=sc.makeRDD(List(8,10,7,4,1,9,6,3,5,2))data:org.apache.spark.rdd.RDD[Int]=ParallelCollectionRDD[0]atmakeRDDat<console>:25scala>data.max()//返回RDD中的最大值res9:Int=10scala>data.min()//返回RDD中的最小值res10:Int=1任務(wù)3.3查找某車(chē)輛的違章記錄1.filter操作filter是一個(gè)轉(zhuǎn)換操作,可用于篩選出滿(mǎn)足特定條件元素,返回一個(gè)新的RDD;其用法說(shuō)明如下:deffilter(f:(T)?

Boolean):

\o"org.apache.spark.rdd.RDD"RDD[T]ReturnanewRDDcontainingonlytheelementsthatsatisfyapredicate.其應(yīng)用示例如下:scala>valnumsRDD=sc.makeRDD(List(3,1,2,9,10,5,8,4,7,6))numsRDD:org.apache.spark.rdd.RDD[Int]=ParallelCollectionRDD[27]atmakeRDDat<console>:25scala>valrdd1=numsRDD.filter(x=>x%2==0)//過(guò)濾出偶數(shù)元素,組成一個(gè)新RDD并返回rdd1:org.apache.spark.rdd.RDD[Int]=MapPartitionsRDD[28]atfilterat<console>:25scala>rdd1.collect()res5:Array[Int]=Array(2,10,8,4,6)scala>valtextsRDD=sc.makeRDD(List("IlikeSpark","HelikeHadoop","ShelikeSpark"))textsRDD:org.apache.spark.rdd.RDD[String]=ParallelCollectionRDD[29]atmakeRDDat<console>:26scala>valrdd2=textsRDD.filter(x=>x.contains("Spark"))//過(guò)濾出含有字符串“Spark”的元素rdd2:org.apache.spark.rdd.RDD[String]=MapPartitionsRDD[30]atfilterat<console>:25scala>rdd2.collect()res6:Array[String]=Array(IlikeSpark,ShelikeSpark)2.distinct操作RDD的元素可能存在重復(fù)情況,當(dāng)我們需要去掉重復(fù)元素時(shí),可以使用distinct方法。scala>valdataRDD=sc.makeRDD(List(3,5,7,3,4,8,5))//dataRDD內(nèi)有重復(fù)元素3、5dataRDD:org.apache.spark.rdd.RDD[Int]=ParallelCollectionRDD[0]atmakeRDDat<console>:25scala>valnewDataRDD=dataRDD.distinct()//去除重復(fù)元素newDataRDD:org.apache.spark.rdd.RDD[Int]=MapPartitionsRDD[3]atdistinctat<console>:25scala>newDataRDD.collect()//檢查是否成功去重res2:Array[Int]=Array(4,8,5,3,7)3.union等操作union方法可將兩個(gè)RDD的元素合并為一個(gè)新RDD,即得到兩個(gè)RDD的并集intersection可以求兩個(gè)RDD的交集,即兩個(gè)RDD的相同元素類(lèi)似于數(shù)學(xué)中集合的差集運(yùn)算,可以使用subtract來(lái)求兩個(gè)RDD的差集cartesian用于求兩個(gè)RDD的笛卡爾積,將兩個(gè)集合元素組合成一個(gè)新的RDD任務(wù)3.4查找違章次數(shù)3次以上車(chē)輛鍵值對(duì)RDD鍵值對(duì)RDD(PairRDD)是指每個(gè)RDD元素都是(Key,Value)鍵值類(lèi)型(即二元組);普通RDD里面存儲(chǔ)的數(shù)據(jù)類(lèi)型是Int、String等,而“鍵值對(duì)RDD”里面存儲(chǔ)的數(shù)據(jù)類(lèi)型是“鍵值對(duì)”。下面代碼中,我們首先定義一個(gè)列表scores,scores的每個(gè)元素為二元組,記錄學(xué)生的姓名及考試成績(jī);接下來(lái),使用parallelize方法生成鍵值對(duì)RDD(scoresRDD),scoresRDD元素的類(lèi)型為二元組。scala>valscores=List(("張小帥",84),("孫田",80),("馬莉",92))//scores的元素為二元組,例如("張小帥",84)scores:List[(String,Int)]=List((張小帥,84),(孫田,80),(馬莉,92))scala>valscoresRDD=sc.parallelize(scores)//scoresRDD即為鍵值對(duì)RDDscoresRDD:org.apache.spark.rdd.RDD[(String,Int)]=ParallelCollectionRDD[0]atparallelizeat<console>:26scala>scoresRDD.collect()//scoresRDD的元素為二元組res2:Array[(String,Int)]=Array((張小帥,84),(孫田,80),(馬莉,92))Lookup查找value鍵值對(duì)RDD的元素為(key,value)形式的二元組,keys操作可以獲取鍵值對(duì)RDD中所有的key,組成一個(gè)新的RDD并返回;values操作會(huì)把鍵值對(duì)RDD中的所有value返回,形成一個(gè)新的RDD;兩個(gè)操作的用法示例如下:scala>valdata=List(("Spark",1),("Hadoop",2),("Flink",3),("kafka",4))data:List[(String,Int)]=List((Spark,1),(Hadoop,2),(Flink,3),(kafka,4))scala>valpairRDD=sc.makeRDD(data)pairRDD:org.apache.spark.rdd.RDD[(String,Int)]=ParallelCollectionRDD[9]atmakeRDDat<console>:26scala>valkeysRDD=pairRDD.keys//獲取所有的key,組成新RDDkeysRDD:org.apache.spark.rdd.RDD[String]=MapPartitionsRDD[10]atkeysat<console>:25scala>keysRDD.collect()res6:Array[String]=Array(Spark,Hadoop,Flink,kafka)scala>valvaluesRDD=pairRDD.values//獲取所有的value,組成新的RDDvaluesRDD:org.apache.spark.rdd.RDD[Int]=MapPartitionsRDD[12]atvaluesat<console>:25scala>valuesRDD.collect()res7:Array[Int]=Array(1,2,3,4)ByKey相關(guān)操作對(duì)于鍵值對(duì)RDD,Spark提供了groupByKey、sortByKey、reduceByKey等若干ByKey相關(guān)操作;其中,groupByKey是根據(jù)key值,對(duì)value進(jìn)行分組;用法演示如下:fruits:List[(String,Double)]=List((apple,5.5),(orange,3.0),(apple,8.2),(banana,2.7),(orange,4.2))scala>valfruitsRDD=sc.makeRDD(fruits)fruitsRDD:org.apache.spark.rdd.RDD[(String,Double)]=ParallelCollectionRDD[23]atmakeRDDat<console>:26scala>valgroupedRDD=fruitsRDD.groupByKey()//按照Key,對(duì)value進(jìn)行分組groupedRDD:org.apache.spark.rdd.RDD[(String,Iterable[Double])]=ShuffledRDD[25]atgroupByKeyat<console>:25scala>groupedRDD.collect()res22:Array[(String,Iterable[Double])]=Array((banana,CompactBuffer(2.7)),(orange,CompactBuffer(3.0,4.2)),(apple,CompactBuffer(5.5,8.2)))mapValue操作實(shí)際業(yè)務(wù)中,可能遇到只對(duì)鍵值對(duì)RDD的value部分進(jìn)行處理,而保持value不變的需求;這時(shí),可以使用mapValues(func),它的功能是將RDD元組中的value交給函數(shù)func處理。任務(wù)3.5查找累計(jì)扣12分以上車(chē)輛信息1.zip操作將兩個(gè)RDD組合成鍵值對(duì)RDD除了使用makeRDD等方式創(chuàng)建鍵值對(duì)RDD,還可以使用zip操作(亦稱(chēng)為“拉鏈操作”)將兩個(gè)元素?cái)?shù)量相同、分區(qū)數(shù)相同的普通RDD組合成一個(gè)鍵值對(duì)RDD。下面代碼中,rdd1由3個(gè)元素(分區(qū)數(shù)量默認(rèn)),rdd2也有3個(gè)元素;代碼rdd1.zip(rdd2)將前述兩個(gè)RDD組合成一個(gè)鍵值對(duì)新的RDD。scala>valrdd1=sc.makeRDD(List("東岳","西岳","南岳","北岳","中岳"))rdd1:org.apache.spark.rdd.RDD[String]=ParallelCollectionRDD[1]atmakeRDDat<console>:25scala>valrdd2=sc.makeRDD(List("泰山","華山","衡山","恒山","嵩山"))rdd2:org.apache.spark.rdd.RDD[String]=ParallelCollectionRDD[2]atmakeRDDat<console>:25scala>rdd1.zip(rdd2).collect()//rdd1、rdd2組成一個(gè)鍵值對(duì)RDD,并輸出其元素res3:Array[(String,String)]=Array((東岳,泰山),(西岳,華山),(南岳,衡山),(北岳,恒山),(中岳,嵩山))2join連接兩個(gè)RDDjoin概念來(lái)自于關(guān)系數(shù)據(jù)庫(kù)領(lǐng)域,SparkRDD中的join的類(lèi)型也包括內(nèi)連接(join)、左外連接(leftOuterJoin)、右外連接(rightOuterJoin)等。其中,join是對(duì)于給定的兩個(gè)鍵值對(duì)RDD(數(shù)據(jù)類(lèi)型為(K,V1)和(K,V2)),只有兩個(gè)RDD中都存在的Key才會(huì)被輸出,最終得到一個(gè)(K,(V1,V2))類(lèi)型的RDD;其用法示例如下:scala>rdd1.join(rdd2).collect()res5:Array[(String,(Int,Int))]=Array((tom,(1,5)),(jerry,(2,6)))scala>valrdd3=rdd1.join(rdd2)//rdd1、rdd2中有相同的Key:tom、jerryrdd3:org.apache.spark.rdd.RDD[(String,(Int,Int))]=MapPartitionsRDD[13]atjoinat<console>:263其他連接rightOuterJoin類(lèi)似于SQL中的右外關(guān)聯(lián)rightouterjoin,根據(jù)兩個(gè)RDD的Key進(jìn)行右連接,返回結(jié)果以右邊(第二個(gè))的RDD為主,關(guān)聯(lián)不上的記錄為空(None值)。leftOuterJoin類(lèi)似于SQL中的左外關(guān)聯(lián)leftouterjoin,可以根據(jù)兩個(gè)RDD的Key進(jìn)行左連接,返回結(jié)果以左邊(第一個(gè))的RDD為主,關(guān)聯(lián)不上的記錄為空(None值);其用法示例如下:scala>valrdd4=rdd1.rightOuterJoin(rdd2)//兩個(gè)RDD左連接rdd4:org.apache.spark.rdd.RDD[(String,(Option[Int],Int))]=MapPa

溫馨提示

  • 1. 本站所有資源如無(wú)特殊說(shuō)明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請(qǐng)下載最新的WinRAR軟件解壓。
  • 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請(qǐng)聯(lián)系上傳者。文件的所有權(quán)益歸上傳用戶(hù)所有。
  • 3. 本站RAR壓縮包中若帶圖紙,網(wǎng)頁(yè)內(nèi)容里面會(huì)有圖紙預(yù)覽,若沒(méi)有圖紙預(yù)覽就沒(méi)有圖紙。
  • 4. 未經(jīng)權(quán)益所有人同意不得將文件中的內(nèi)容挪作商業(yè)或盈利用途。
  • 5. 人人文庫(kù)網(wǎng)僅提供信息存儲(chǔ)空間,僅對(duì)用戶(hù)上傳內(nèi)容的表現(xiàn)方式做保護(hù)處理,對(duì)用戶(hù)上傳分享的文檔內(nèi)容本身不做任何修改或編輯,并不能對(duì)任何下載內(nèi)容負(fù)責(zé)。
  • 6. 下載文件中如有侵權(quán)或不適當(dāng)內(nèi)容,請(qǐng)與我們聯(lián)系,我們立即糾正。
  • 7. 本站不保證下載資源的準(zhǔn)確性、安全性和完整性, 同時(shí)也不承擔(dān)用戶(hù)因使用這些下載資源對(duì)自己和他人造成任何形式的傷害或損失。

評(píng)論

0/150

提交評(píng)論