《計算及其實踐教程(第二版)》第六章Spark平臺教學(xué)課件_第1頁
《計算及其實踐教程(第二版)》第六章Spark平臺教學(xué)課件_第2頁
《計算及其實踐教程(第二版)》第六章Spark平臺教學(xué)課件_第3頁
《計算及其實踐教程(第二版)》第六章Spark平臺教學(xué)課件_第4頁
《計算及其實踐教程(第二版)》第六章Spark平臺教學(xué)課件_第5頁
已閱讀5頁,還剩71頁未讀 繼續(xù)免費閱讀

下載本文檔

版權(quán)說明:本文檔由用戶提供并上傳,收益歸屬內(nèi)容提供方,若內(nèi)容存在侵權(quán),請進行舉報或認領(lǐng)

文檔簡介

大數(shù)據(jù)平臺Spark插上翅膀的大象(SparkonHadoop)內(nèi)容三種計算框架Spark產(chǎn)生背景Spark特點Spark生態(tài)系統(tǒng)Spark核心概念RDDSpark程序設(shè)計實例進一步理解Spark核心概念RDD三種計算框架根據(jù)伯克利大學(xué)提出的關(guān)于數(shù)據(jù)分析的軟件棧BDAS(BerkeleyDataAnalyticsStack),目前的大數(shù)據(jù)處理可以分為如以下三個類型

批處理計算流式計算交互式計算批處理(Batch)計算成批處理數(shù)據(jù),特點是吞吐量大,但處理速度慢,實時性差。在三種計算框架中實時性最低,響應(yīng)時間在分鐘級到數(shù)十分鐘級,有時甚至達到數(shù)小時。批處理計算和其它兩種計算模式的差別就好比火車和飛機的差別,火車一次載人多,吞吐量大,但速度較慢;飛機一次載人少,但實時性好。典型實例:MapReduce、Hive、Pig適用場合:適合PB級以上海量數(shù)據(jù)的離線處理。比如MapReduce的輸入數(shù)據(jù)必須是靜態(tài)的(即離線的),不能動態(tài)變化。很多搜索引擎在統(tǒng)計過去一年或半年時間跨度內(nèi)的最流行的K個搜索詞時用到基于批處理的MapReduce計算框架(歷史數(shù)據(jù)分析而不是實時數(shù)據(jù)分析)。流式計算(Streaming)快速實時小批量地處理數(shù)據(jù)。在三種計算框架中實時性最高,響應(yīng)時間在數(shù)百毫秒級到數(shù)秒級。典型實例:Storm、SparkStreaming。適用場合:適合處理大量在線流式數(shù)據(jù),并返回實時的結(jié)果。比如在電子商務(wù)中統(tǒng)計過去1分鐘內(nèi)訪問最多的5件商品;淘寶雙11(光棍節(jié))時實時統(tǒng)計網(wǎng)站商品的總交易額;社交網(wǎng)絡(luò)趨勢追蹤;網(wǎng)站指標統(tǒng)計、點擊日志分析等。交互式計算(Interactive)以近實時方式處理SQL或類SQL交互式數(shù)據(jù)。在三種計算框架中實時性居中,響應(yīng)時間在數(shù)十秒到數(shù)分鐘之間。典型實例:Impala、SparkSQL。適用場合:適合以請求-響應(yīng)的交互方式處理大量結(jié)構(gòu)化和半結(jié)構(gòu)化數(shù)據(jù)。比如使用SQL查詢結(jié)構(gòu)化的數(shù)據(jù),很容易地完成包括商業(yè)智能BI在內(nèi)的各種復(fù)雜的數(shù)據(jù)分析算法。內(nèi)容三種計算框架Spark產(chǎn)生背景Spark特點Spark生態(tài)系統(tǒng)Spark核心概念RDDSpark程序設(shè)計實例進一步理解Spark核心概念RDDSpark產(chǎn)生背景:MapReduce局限性MapReduce框架局限性僅支持Map和Reduce兩種操作迭代計算效率低(如不適應(yīng)機器學(xué)習(xí)、圖計算等需要大量迭代的計算)不適合交互式處理(如數(shù)據(jù)挖掘)不適合流式處理(如點擊日志分析)MapReduce編程不夠靈活嘗試scala函數(shù)式編程語言Spark產(chǎn)生背景:框架多樣化現(xiàn)有的各種計算框架各自為戰(zhàn)批處理:MapReduce、Hive、Pig流式計算:Storm交互式計算:Impala能否有一種靈活的框架可同時進行批處理、流式計算、交互式計算等?產(chǎn)生背景:大統(tǒng)一系統(tǒng)在一個統(tǒng)一的框架下,進行批處理、流式計算、交互式計算內(nèi)容三種計算框架Spark產(chǎn)生背景Spark特點Spark生態(tài)系統(tǒng)Spark核心概念RDDSpark程序設(shè)計實例進一步理解Spark核心概念RDDSpark特點高效內(nèi)存計算引擎DAG圖比MapReduce快10~100倍易用提供了豐富的API,支持Java,Scala,Python等多種語言同一功能實現(xiàn),Scala代碼量比MapReduce少2~5倍與Hadoop集成讀寫HDFS/Hbase與YARN集成Spark特點1:高效—MapReduce慢是由于MapReduce用磁盤存放中間結(jié)果,而且寫3份,用戶的查詢請求需要多次頻繁地訪問某個數(shù)據(jù)集時磁盤I/O慢,數(shù)據(jù)的復(fù)制和序列化也導(dǎo)致運行速度慢。Spark特點1:高效—SparkSpark快的原因是把輸入數(shù)據(jù)一次性讀取到分布式內(nèi)存中,以后用戶的的多次查詢和處理都從內(nèi)存中讀寫,其速度是MapReduce網(wǎng)絡(luò)復(fù)制和磁盤I/O速度的10到100倍。由于技術(shù)的進步,機器的內(nèi)存空間越來越大,而且內(nèi)存條也越來越便宜,使Spark這種基于內(nèi)存的計算成為可能。但當(dāng)數(shù)據(jù)量很大時,內(nèi)存空間若放不下,Spark有相關(guān)機制可以把熱點數(shù)據(jù)(比如一天讀取幾百次的數(shù)據(jù))放入內(nèi)存而把非熱點數(shù)據(jù)(比如一個月讀取一次的數(shù)據(jù))放入磁盤。Spark特點1:高效—Spark當(dāng)?shù)螖?shù)達到30次時,Hadoop的運行時間大約是4000s,而Spark的運行時間大約是400s,相差10倍左右。Spark特點2:易用—MapReduceSpark特點2:易用—SparkSpark特點3:與Hadoop集成內(nèi)容三種計算框架Spark產(chǎn)生背景Spark特點Spark生態(tài)系統(tǒng)Spark核心概念RDDSpark程序設(shè)計實例進一步理解Spark核心概念RDDSpark生態(tài)系統(tǒng)Alluxio(原名Tachyon)定義:Tachyon(超光子)是一種內(nèi)存式的文件系統(tǒng),可以認為是搭建在HDFS上的分布式緩存。它可以在集群里以訪問內(nèi)存的速度來訪問存在Tachyon里的文件。架構(gòu):Tachyon是架構(gòu)在最底層的分布式文件存儲和上層的各種計算框架之間的一種中間件。具體地說,是在分布式文件存儲系統(tǒng)如HDFS、S3等之上,在Spark、MapReduce、Impala等各種計算框架之下。引入Tachyon的原因1)提高不同任務(wù)或框架間數(shù)據(jù)交換的速度不同任務(wù)或不同計算框架間的數(shù)據(jù)共享情況在所難免,例如Spark的分屬不同Stage的兩個任務(wù),或Spark與MapReduce框架的數(shù)據(jù)交互。在這種情況下,一般就需要通過磁盤來完成數(shù)據(jù)交換,而這通常是效率很低的。而引入Tachyon中間層后,數(shù)據(jù)交換實際上是在內(nèi)存中進行的。引入Tachyon的原因2)使Spark的執(zhí)行引擎和存儲引擎分開Spark作為內(nèi)存計算框架,為什么還需要再加一層內(nèi)存管理的文件系統(tǒng)?因為Spark其實只提供了強大的內(nèi)存計算能力,但未提供存儲能力。那么默認讓Spark自己直接在內(nèi)存管理數(shù)據(jù)不行嗎?讓Spark自己來管理內(nèi)存會出現(xiàn)的問題。默認情況下,Spark的任務(wù)執(zhí)行和數(shù)據(jù)本身都在一個進程內(nèi)。當(dāng)執(zhí)行出現(xiàn)問題時就會導(dǎo)致整個進程崩潰,并丟失進程內(nèi)的所有數(shù)據(jù)。而Tachyon這一中間層的引入,就相當(dāng)于將存儲引擎從Spark中抽離出來,從而每個任務(wù)進程只負責(zé)執(zhí)行。進程的崩潰不會丟失數(shù)據(jù),因為數(shù)據(jù)都在Tachyon里面了。引入Tachyon的原因3)避免數(shù)據(jù)被重復(fù)加載不同的Spark任務(wù)可能會訪問同樣的數(shù)據(jù),例如兩個任務(wù)都要訪問HDFS中的某些Block。這時每個任務(wù)都要自己去磁盤加載數(shù)據(jù)到內(nèi)存中。而Tachyon可以只保存一份數(shù)據(jù)在內(nèi)存中供加載,而且它還使用堆外內(nèi)存,避免GC(垃圾收集)開銷。Mesos和YARNMesos是一個開源的資源管理系統(tǒng),可以對集群中的資源做彈性管理。目前Twitter,Apple等公司在使用Mesos管理集群資源。Apple的siri的后端便是采用Mesos進行資源管理。目前看來,HadoopYARN要比Mesos更主流,前景更廣闊。YARN在實現(xiàn)資源管理的前提下,能夠跟Hadoop生態(tài)系統(tǒng)完美結(jié)合。YARN定位為大數(shù)據(jù)中的數(shù)據(jù)操作系統(tǒng),能夠更好地為上層各類應(yīng)用程序(MapReduce/Spark)提供資源管理和調(diào)度功能。另外,非常重要的一點是,YARN的社區(qū)力量要比Mesos強大的多,它的參與人員眾多,周邊系統(tǒng)的建設(shè)非常完善。Shark和SparkSQLSparkSQL是分布式SQL查詢引擎,用于處理交互式數(shù)據(jù)流,把SQL命令分解成多個任務(wù)(Task)交給Hadoop集群處理。自2013年3月面世以來,SparkSQL已經(jīng)成為除SparkCore以外最大的Spark組件。在2014年7月1日的SparkSummit上,Databricks公司宣布終止對Shark的開發(fā),將重點放到SparkSQL上。SparkSQL將涵蓋Shark的所有特性,用戶可以從Shark0.9進行無縫的升級。除了接過Shark的接力棒,繼續(xù)為Spark用戶提供高性能的SQLonHadoop解決方案之外,SparkSQL還為Spark帶來了通用、高效、多元一體的結(jié)構(gòu)化數(shù)據(jù)處理能力。SparkSQL可加載和查詢各種數(shù)據(jù)源,比如Hive數(shù)據(jù)、Parquet列式存儲格式數(shù)據(jù)、JSON格式數(shù)據(jù)、通過JDBC和ODBC等連接各種數(shù)據(jù)源SparkStreamingSparkStreaming是大規(guī)模流式數(shù)據(jù)處理的新貴,將流式計算分解成一系列短小的批處理作業(yè)。SparkStreaming類似于ApacheStorm,用于流式數(shù)據(jù)的處理。SparkStreaming有高吞吐量和容錯能力強這兩個特點。SparkStreaming支持的數(shù)據(jù)輸入源很多,例如:HDFS、Kafka、Flume、Twitter、ZeroMQ和簡單的TCP套接字等。數(shù)據(jù)輸入后可以用Spark的高度抽象原語如map、reduce、join、window等進行運算。而結(jié)果也能保存在很多地方,如HDFS、數(shù)據(jù)庫等。另外SparkStreaming也能和MLlib(機器學(xué)習(xí))以及GraphX完美融合。SparkStreaming支持的數(shù)據(jù)輸入源和輸出地GraphXSparkGraphX是一個分布式圖處理框架,SparkGraphX基于Spark平臺提供對圖計算和圖挖掘簡潔易用的而豐富多彩的接口,極大的方便了大家對分布式圖處理的需求。社交網(wǎng)絡(luò)中人與人之間有很多關(guān)系鏈,例如Twitter、Facebook、微博、微信,這些都是大數(shù)據(jù)產(chǎn)生的地方,都需要圖計算,現(xiàn)在的圖處理基本都是分布式的圖處理,而并非單機處理。圖的分布式或者并行處理其實是把這張圖拆分成很多的子圖,然后我們分別對這些子圖進行計算,計算的時候可以分別迭代進行分階段的計算,即對圖進行并行計算。GraphX使用的是一種點和邊都帶有屬性的有向多重圖,有Table和Graph兩種視圖,只需一份物理存儲。MLBaseMLBase是Spark生態(tài)圈的一部分,專注于機器學(xué)習(xí),包含三個組件:MLlib、MLI、MLOptimizer。MLlib是Spark的機器學(xué)習(xí)庫,包括分類、聚類、回歸算法、決策樹、推薦算法等各種機器學(xué)習(xí)的核心算法的實現(xiàn)。MLI是MLlib的測試床,是用于特征提取和算法開發(fā)的實驗性API。MLBase的核心是MLOptimizer,把聲明式的任務(wù)轉(zhuǎn)化成復(fù)雜的學(xué)習(xí)計劃,輸出最優(yōu)的模型和計算結(jié)果。MLBase與其它機器學(xué)習(xí)系統(tǒng)Mahout的不同:2014年4月,Mahout告別MapReduce實現(xiàn),轉(zhuǎn)而采用Spark作為底層實現(xiàn)基礎(chǔ)。MLBase是自動化的,Mahout需要使用者具備機器學(xué)習(xí)技能,來選擇自己想要的算法和參數(shù)來做處理。MLBase提供了不同抽象程度的接口,可以擴充ML(機器學(xué)習(xí))算法。內(nèi)容三種計算框架Spark產(chǎn)生背景Spark特點Spark生態(tài)系統(tǒng)Spark核心概念RDDSpark程序設(shè)計實例進一步理解Spark核心概念RDDSpark核心概念—RDDRDD(ResilientDistributedDatasets,彈性分布式數(shù)據(jù)集)分布在集群中的只讀對象集合,由多個Partition(分區(qū))構(gòu)成,可以存儲在磁盤或內(nèi)存中(有多種存儲級別)。可以由Scala集合創(chuàng)建RDD,比如sc.parallelize(List(1,2,3))//對集合List(1,2,3)進行并行化生成RDD,默認Partition數(shù)量為1sc.parallelize(1ton,5)))//對集合1到n(n可以是很大的整數(shù))進行并行化生成RDD,指定Partition數(shù)量為5利用本地文件或HDFS文件創(chuàng)建RDD1.由文本文件(TextInputFormat)創(chuàng)建RDDsc.textFile(“file.txt”)//將本地文本文件加載成RDDsc.textFile(“directory/*.txt”)//將某類文本文件加載成RDDsc.textFile(“hdfs://nn:9000/path/file”)//將hdfs文件或目錄加載成RDDsc.textFile(“file:///file.txt”)//將本地文本文件加載成RDD,用file:///指定本地文件2.由sequenceFile文件(SequenceFileInputFormat)創(chuàng)建RDDsc.sequenceFile(“file.txt”)//將本地二進制文件加載成RDDsc.sequenceFile[String,Int](“hdfs://nn:9000/path/file”)//將hdfs二進制文件加載成RDDRDDRDD是只讀數(shù)據(jù)集,不支持修改,不考慮并發(fā)和加鎖。那么如何對RDD進行操作呢?有兩種基本操作(operator):Transformation(轉(zhuǎn)換)Action(行動)TransformationTransformation(轉(zhuǎn)換)可通過Scala集合或者Hadoop數(shù)據(jù)文件構(gòu)造一個新的RDD通過已有的RDD產(chǎn)生新的RDD(輸入是RDD,輸出還是RDD,則為Transformation)舉例:map,filter,groupBy,reduceByAction(行動)通過RDD計算得到一個或者一組值(輸入是RDD,輸出不是RDD,而是一個或一組值或?qū)懭氪鎯?,則為Action)舉例:count,collect,saveTransformation操作和Action操作的重要區(qū)別:無論執(zhí)行了多少次Transformation操作,RDD都不會真正執(zhí)行運算(只做標記,不觸發(fā)計算),只有當(dāng)Action操作被執(zhí)行時,運算才會觸發(fā)。這也稱為Spark的惰性執(zhí)行,即一段Spark代碼不會執(zhí)行,直到遇到第一個Action惰性執(zhí)行的好處是優(yōu)化的概率提高,即看到的步驟越多,最終執(zhí)行時優(yōu)化的可能性就越高。兩段代碼執(zhí)行效果有何不同beacons=spark.textFile(“hdfs://...”)cachedBeacons=beacons.cache()cachedBeacons.filter(_.contains(“HouseOfCards”))cachedBeacons.filter(_.contains(“GameOfThrone”))……beacons=spark.textFile(“hdfs://...”)cachedBeacons=beacons.cache()cachedBeacons.filter(_.contains(“HouseOfCards”)).countcachedBeacons.filter(_.contains(“GameOfThrone”)).count……這兩段代碼都從HDFS文件中生成RDD,然后用cache操作把RDD數(shù)據(jù)緩存到內(nèi)存,接著執(zhí)行了兩個filter操作,對緩存到內(nèi)存中的RDD數(shù)據(jù)進行過濾,找出包含特定字符串的數(shù)據(jù)。但第二段代碼在filter操作之后用了count操作,注意到count是Action操作,所以會真正觸發(fā)計算,從而得到包含特定字符串的數(shù)據(jù)個數(shù)。Operator示例RDD1包含集合1到7的整數(shù),由3個Partition組成。經(jīng)過第一個map(+1)操作,生成RDD2,其中集合的每個整數(shù)加1,但該操作的結(jié)果并不真的生成,而是只做了標記,不在內(nèi)存或磁盤生成該中間結(jié)果。因為map是Transformation操作。RDD2接著執(zhí)行SAVEASTEXTFILE(“HDFS://…”)操作,該操作是Action操作,因此計算被觸發(fā),把RDD中的加1后的數(shù)據(jù)保存到存儲系統(tǒng)HDFS指定的文件中。RDDtransformation舉例1)//創(chuàng)建RDD,用集合創(chuàng)建RDDvalnums=sc.parallelize(List(1,2,3))2)//將RDD傳入map函數(shù),求變量的平方,生成新的RDDvalsquares=nums.map(x=>x*x)//{1,4,9}3)//對RDD中元素進行過濾,求模2結(jié)果為0的偶數(shù),生成新的RDDvaleven=squares.filter(_%2==0)//{4}4)//利用flatMap將一個元素映射成多個,生成新的RDDnums.flatMap(x=>1tox)//=>{1,1,2,1,2,3}RDDAction舉例1)//用集合創(chuàng)建新的RDDvalnums=sc.parallelize(List(1,2,3))2)//將RDD保存為本地數(shù)組,collect將分布式的RDD返回為一個單機的數(shù)組,返回到driver程序所在的機器nums.collect()//=>Array(1,2,3)3)//返回前K個元素,返回一個數(shù)組,該操作目前并非并行執(zhí)行,而是由driver程序所在機器執(zhí)行的nums.take(2)//=>Array(1,2)4)//計算元素總數(shù)nums.count()//=>35)//合并集合元素nums.reduce(_+_)//=>66)//將RDD寫到HDFS中nums.saveAsTextFile(“hdfs://nn:8020/output”)nums.saveAsSequenceFile(“hdfs://nn:8020/output”)Key/Value類型的RDD1)valpets=sc.parallelize(List((“cat”,1),(“dog”,1),(“cat”,2)))2)pets.reduceByKey(_+_)//=>{(cat,3),(dog,1)}3)pets.groupByKey()//=>{(cat,Seq(1,2)),(dog,Seq(1)}4)pets.sortByKey()//=>{(cat,1),(cat,2),(dog,1)}Transformation和Action內(nèi)容三種計算框架Spark產(chǎn)生背景Spark特點Spark生態(tài)系統(tǒng)Spark核心概念RDDSpark程序設(shè)計實例進一步理解Spark核心概念RDD實例1:wordcount//導(dǎo)入相關(guān)類,_相當(dāng)于*號,即導(dǎo)入一個包中所有的類importorg.apache.spark._importSparkContext._//定義對象WordCountobjectWordCount{//定義main函數(shù),冒號后為數(shù)據(jù)類型defmain(args:Array[String]){//如果輸入的參數(shù)個數(shù)不等于3,則打印輸入命令格式的提示信息,//參數(shù)0為master地址,參數(shù)1為輸入數(shù)據(jù)所在目錄,比如://hdfs://host:port/input/data,參數(shù)2為數(shù)據(jù)輸出目錄,比//如:hdfs://host:port/output/dataif(args.length!=3){println("usageisorg.test.WordCount<master><input><output>")return}實例1:wordcount//創(chuàng)建SparkContext對象,封裝了spark執(zhí)行環(huán)境信息,依次包括//master地址、作業(yè)名稱、Spark安裝目錄、作業(yè)依賴的jar包val

sc=newSparkContext(args(0),"WordCount",System.getenv("SPARK_HOME"),Seq(System.getenv("SPARK_TEST_JAR")))//用輸入的文本文件創(chuàng)建RDDval

textFile=sc.textFile(args(1))//單詞計數(shù)的程序主體。其中split根據(jù)給定的正則表達式的匹配拆分//字符串,\s表示空格、回車、換行等分隔符,+表示一個或多個的意//思valresult=textFile.flatMap(line=>line.split("\\s+")).map(word=>(word,1)).reduceByKey(_+_)//把單詞計數(shù)的結(jié)果存放到輸出文件中result.saveAsTextFile(args(2))}}Spark程序設(shè)計—基本流程1)創(chuàng)建SparkContext對象這是Spark程序的入口,封裝了spark執(zhí)行環(huán)境信息。有點類似數(shù)據(jù)庫編程中的建立連接操作。2)創(chuàng)建RDD可從Scala集合或Hadoop數(shù)據(jù)文件上創(chuàng)建3)在RDD之上進行轉(zhuǎn)換和actionSpark提供了多種轉(zhuǎn)換和action函數(shù)4)返回結(jié)果保存到HDFS中,或直接打印出來Spark程序設(shè)計—Scala用函數(shù)式編程的方式處理集合:1)varlist=List(1,2,3)//“=>”操作的含義可理解為把左邊的變量裝到右邊的表達式中2)list.foreach(x=>println(x))//打印1,2,3list.foreach(println)//與上式等價,形式更簡練3)list.map(x=>x+2)//=>List(3,4,5)list.map(_+2)//與上式等價。此處“_”稱為占位符,對集合中的每個元素作用一次。4)list.filter(x=>x%2==1)//=>List(1,3)list.filter(_%2==1)//與上式等價。5)list.reduce((x,y)=>x+y)//6list.reduce(_

+_)//與上式等價。Wordcount程序運行過程實例實例2:分布式估算Pi-蒙特卡洛算法它的具體定義是:在廣場上畫一個邊長一米的正方形,在正方形內(nèi)部隨意用粉筆畫一個不規(guī)則的形狀,現(xiàn)在要計算這個不規(guī)則圖形的面積,怎么計算呢?蒙特卡洛(MonteCarlo)方法告訴我們,均勻的向該正方形內(nèi)撒N(N是一個很大的自然數(shù))個黃豆,隨后數(shù)數(shù)有多少個黃豆在這個不規(guī)則幾何形狀內(nèi)部,比如說有M個,那么,這個奇怪形狀的面積便近似于M/N,N越大,算出來的值便越精確。在這里我們要假定豆子都在一個平面上,相互之間沒有重疊。蒙特卡洛方法可用于近似計算圓周率公式推導(dǎo)1)假設(shè)正方形邊長為d,則:正方形面積為:d*d圓的面積為:pi*(d/2)*(d/2)正方形與圓兩者面積之比為:4/pi2)隨機產(chǎn)生位于正方形內(nèi)的點n個,假設(shè)落到圓中的有count個,則:Pi=4*count/n3)當(dāng)n->∞時,Pi逼近真實值分布式估算Pi—調(diào)用“SparkPi”程序//定義對象SparkPiobjectSparkPi{//定義函數(shù)maindefmain(args:Array[String]){//創(chuàng)建SparkConf,封裝了Spark配置信息:應(yīng)用程序名稱valconf=newSparkConf().setAppName("SparkPi")//創(chuàng)建SparkContext,封裝了調(diào)度器等信息valspark=newSparkContext(conf)分布式估算Pi--計算//啟動一定數(shù)量的maptask進行并行處理,默認數(shù)量為2valslices=if(args.length>0)args(0).toIntelse2//初始化nvaln=100000*slices//計算落入園內(nèi)的點數(shù)valcount=spark.parallelize(1ton,slices).map{i=>valx=random*2-1valy=random*2-1if(x*x+y*y<1)1else0}.reduce(_+_)分布式估算Pi—輸出//根據(jù)公式輸出Pi值println("Piisroughly"+4.0*count/n)//程序結(jié)束spark.stop()}}分布式估算Pivalcount=spark.parallelize(1ton,slices).map{i=>valx=random*2-1valy=random*2-1if(x*x+y*y<1)1else0}.reduce(_+_)對集合1到n中的每個元素執(zhí)行map內(nèi)含的匿名函數(shù),該匿名函數(shù)基于以原點為圓心的坐標系,半徑為1,直徑為2,random函數(shù)給出一個(0,1)范圍內(nèi)的隨機數(shù),x和y是坐標(取值范圍為-1到1),根據(jù)圓的方程,點落在圓內(nèi)則map的結(jié)果為1,否則為0。Reduce函數(shù)對map的結(jié)果即一系列1和0進行累加,求出落入園內(nèi)的點數(shù)。程序架構(gòu)程序架構(gòu)幾個基本概念:(1)job:包含多個task組成的并行計算,往往由action催生。(2)stage:job的調(diào)度單位。(3)task:被送到某個executor上的工作單元。(4)taskSet:一組關(guān)聯(lián)的,相互之間沒有shuffle依賴關(guān)系的任務(wù)組成的任務(wù)集。一個應(yīng)用程序由一個driverprogram和多個job構(gòu)成。一個job由多個stage組成。一個stage由多個沒有shuffle關(guān)系的task組成。task是計算的最小單位。幾個基本概念程序架構(gòu)Driver:運行Application的main()函數(shù)并創(chuàng)建SparkContext。通常用SparkContext代表Driver。Spark程序架構(gòu)采用主從結(jié)構(gòu),一個Driver負責(zé)程序入口,多個executor分布式執(zhí)行更多個task。一個executor上可運行多個task,一個task相當(dāng)于一個線程,多個executor共享一個WorkerNode.程序架構(gòu)spark應(yīng)用程序的運行架構(gòu):由driver向集群申請資源,集群分配資源,啟動executor。driver將spark應(yīng)用程序的代碼和文件傳送給executor。executor上運行task,運行完之后將結(jié)果返回給driver或者寫入HDFS。在實例1的WordCount中,task的數(shù)目即任務(wù)總量由輸入HDFS的文件劃分的block數(shù)決定,一個block的大小默認為64MB(Hadoop2.0默認為128MB),HDFS的一個block默認為SparkRDD中的一個Partition,即一個task處理的數(shù)據(jù)量。同樣地,在實例2中,SparkRDD中的一個Partition即一個task處理的數(shù)據(jù)量,所以語句spark.parallelize(1ton,slices)中slices變量的值為指定的task數(shù),也是指定的Partition數(shù)。體驗spark:交互式模式spark-shellSpark-shell交互式運行spark代碼,類似于scala交互式終端可用于快速體驗spark,查看程序運行結(jié)果等如何使用bin/spark-shell.shscala>valdata=Array(1,2,3,4,5)//產(chǎn)生datascala>valdistData=sc.parallelize(data)//將data處理成RDDscala>distData.reduce(_+_)提交Spark程序(運行在YARN上)exportYARN_CONF_DIR=/opt/hadoop/yarn-client/etc/hadoopbin/spark-submit\--masteryarn-cluster\--classcom.hulu.examples.SparkPi\--namesparkpi\--driver-memory2g\--executor-memory3g\--executor-cores2\--num-executors2\--queuespark\--confspark.pi.iterators=500000\--confspark.pi.slices=10\$FWDIR/target/scala-2.10/spark-example-assembly-1.0.jarMaster參數(shù)指定程序運行模式,比如local,yarn-client,yarn-cluster。該參數(shù)必選。class參數(shù)指定應(yīng)用程序主類。該參數(shù)必選。name參數(shù)指定作業(yè)名稱。該參數(shù)可選。driver-memory參數(shù)指定Driver需要的內(nèi)存。該參數(shù)可選,默認為512MBexecutor-memory參數(shù)指定每個executor需要的內(nèi)存。該參數(shù)可選,默認為1GBexecutor-cores參數(shù)指定每個executor線程數(shù),相當(dāng)于每個executor中的task數(shù)。該參數(shù)可選,默認為1num-executors參數(shù)指定需啟動的Executor總數(shù)。該參數(shù)可選,默認為2queue參數(shù)指定提交應(yīng)用程序給哪個YARN隊列。該參數(shù)可選,默認為default隊列。conf參數(shù)指定用戶自定義配置。最后一行指定用戶應(yīng)用程序所在jar包,必選,且一定要放在整個spark-submit提交命令的最后。提交Spark程序(運行在YARN上)內(nèi)容三種計算框架Spark產(chǎn)生背景Spark特點Spa

溫馨提示

  • 1. 本站所有資源如無特殊說明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請下載最新的WinRAR軟件解壓。
  • 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請聯(lián)系上傳者。文件的所有權(quán)益歸上傳用戶所有。
  • 3. 本站RAR壓縮包中若帶圖紙,網(wǎng)頁內(nèi)容里面會有圖紙預(yù)覽,若沒有圖紙預(yù)覽就沒有圖紙。
  • 4. 未經(jīng)權(quán)益所有人同意不得將文件中的內(nèi)容挪作商業(yè)或盈利用途。
  • 5. 人人文庫網(wǎng)僅提供信息存儲空間,僅對用戶上傳內(nèi)容的表現(xiàn)方式做保護處理,對用戶上傳分享的文檔內(nèi)容本身不做任何修改或編輯,并不能對任何下載內(nèi)容負責(zé)。
  • 6. 下載文件中如有侵權(quán)或不適當(dāng)內(nèi)容,請與我們聯(lián)系,我們立即糾正。
  • 7. 本站不保證下載資源的準確性、安全性和完整性, 同時也不承擔(dān)用戶因使用這些下載資源對自己和他人造成任何形式的傷害或損失。

評論

0/150

提交評論