Spark—開發(fā)指引_第1頁
Spark—開發(fā)指引_第2頁
Spark—開發(fā)指引_第3頁
Spark—開發(fā)指引_第4頁
Spark—開發(fā)指引_第5頁
免費預覽已結束,剩余4頁可下載查看

下載本文檔

版權說明:本文檔由用戶提供并上傳,收益歸屬內容提供方,若內容存在侵權,請進行舉報或認領

文檔簡介

1、 Spark開發(fā)指南 從高的層面來看,其實每一個Spark的應用,都是一個Driver類,通過運行用戶定義的main函數(shù),在集群上執(zhí)行各種并發(fā)操作和計算 Spark提供的最主要的抽象,是一個彈性分布式數(shù)據(jù)集(RDD),它是一種特殊集合,可以分 布在集群的節(jié)點上,以函數(shù)式編程操作集合的方式,進行各種各樣的并發(fā)操作。它可以由hdfs上的一個文件創(chuàng)建而來,或者是Driver程序中,從一個已經存在的集合轉換而來。用戶可以將數(shù)據(jù)集緩存在內存中,讓它被有效的重用,進行并發(fā)操作。最后,分布式數(shù)據(jù)集可 以自動的從結點失敗中恢復,再次進行計算。 Spark的第二個抽象,是并行計算中使用的共享變量。默認來說,當S

2、park并發(fā)運行一個函數(shù)時,它是以多個的task,在不同的結點上運行,它傳遞每一個變量的一個拷貝,到每一個獨立task使用到的函數(shù)中,因此這些變量并非共享的。然而有時候,我們需要在任務中能夠被共享的變量,或者在任務與驅動程序之間共享。Spark支持兩種類型的共享變量:廣 播變量:可以在內存的所有結點中被訪問,用于緩存變量(只讀)累加器:只能用來做加法 的變量,例如計數(shù)和求和 本指南通過一些樣例展示這些特征。讀者最好是熟悉Scala,尤其是閉包的語法。請留意, Spark可以通過Spark-Shell的解釋器進行交互式運行。你可能會需要它。 注:所謂“閉包”,指的是一個擁有許多變量和綁定了這些變

3、量的環(huán)境的表達式(通常是一個函數(shù)),因而這些變量也是該表達式的一部分。 接入Spark 為了寫一個Spark的應用,你需要將Spark和它的依賴,加入到CLASSPATH中。最簡單的方法,就是運行sbt/sbtassembly來編譯Spark和它的依賴,打到一個Jar里面 core/target/scala_2.9.1/spark-core-assembly-0.0.0.jar,然后將它加入到你的CLASSPATH 中。或者你可以選擇將spark發(fā)布到maven的本地緩存中,使用sbt/sbtpublish。它將在 組織org.spark-project下成為一個spark-core. 另外,

4、你會需要導入一些Spark的類和隱式轉換,將下面幾行加入到你程序的頂部 importspark.SparkContext importSparkContext._ 初始化Spark 寫Spark程序需要做的第一件事情,就是創(chuàng)建一個SparkContext對象,它將告訴Spark如何訪問一個集群。這個通常是通過下面的構造器來實現(xiàn)的: newSparkContext(master,jobName,sparkHome,jars) Master參數(shù)是一個字符串,指定了連接的Mesos集群,或者用特殊的字符串“l(fā)oc冰指明 用local模式運行。如下面的描述一般,JobName是你任務的名稱,當在集群上

5、運行的時候,將會在Mesos的WebUI監(jiān)控界面顯示。后面的兩個參數(shù),是用在將你的代碼,部署到mesos集群上運行時使用的,后面會提到。 在Spark的解釋器中,一個特殊的SparkContext變量已經為你創(chuàng)建,變量名字叫sc。 分布式數(shù)據(jù)集 Spark圍繞的核心概念,是彈性分布式數(shù)據(jù)集(RDD),一個有容錯機制,可以被并行操作 的集合。RDD是只讀的、分區(qū)記錄的集合。RDD只能基于在穩(wěn)定物理存儲中的數(shù)據(jù)集和其 他已有的RDD上執(zhí)行確定性操作來創(chuàng)建。這些確定性操作稱之為轉換,如map、filtergroupBy、join(轉換不是程開發(fā)人員在RDD上執(zhí)行的操作)。 RDD不需要物化。RDD含

6、有如何從其他RDD衍生(即計算)出本RDD的相關信息(即Lineage),據(jù)此可以從物理存儲的數(shù)據(jù)計算出相應的RDD分區(qū)。 TherearetwowaystocreateRDDs:parallelizinganexistingcollectioninyourdriver program,orreferencingadatasetinanexternalstoragesystem,suchasasharedfilesystem,HDFS,HBase,oranydatasourceofferingaHadoopInputFormat. 目前有兩種類型的RDD(紅字翻譯錯誤)-有兩種創(chuàng)建RDD的方式

7、:并行集合(ParrallelizedCollections),接收一個已經存在的Scala集合,在它上面運行各種并發(fā)計算;Hadoop數(shù)據(jù) 集(HadoopDataSets),在一個文件的每條記錄上,運行各種函數(shù)。只要文件系統(tǒng)是Hdfs,或者hadoop支持的任意存儲系統(tǒng)。這兩種RDD都可以通過相同的方式進行操作。 并行集合 并行集合是通過調用SparkContext的parallelize方法,在一個已經存在的Scala集合(只 要是seq對象就可以)上創(chuàng)建而來。集合的對象將會被拷貝來創(chuàng)建一個分布式數(shù)據(jù)集,可以 被 并 行 操 作 。 下 面 通 過spark解 釋 器 的 例 子 , 展

8、 示 如 何 從 一 個 數(shù) 組 創(chuàng) 建 一 個 并 發(fā) 集 合scalavaldata=Array(1,2,3,4,5) data:ArrayInt=Array(1,2,3,4,5)scalavaldistData=sc.parallelize(data)distData:spark.RDDInt=spark.ParallelCollection10d13e3e 一旦被創(chuàng)建,分布數(shù)據(jù)集(distData)可以被并行操作。例如,我們可以調用distData.reduce(+)來將數(shù)組的元素相加。我們會在后續(xù)的分布式數(shù)據(jù)集做進一步描述。 創(chuàng)建并行集合的一個重要參數(shù),是partition的數(shù)目,它

9、指定了將數(shù)據(jù)集切分為幾份。在集 群模式中,Spark將會在一份partition上起一個Task。典型的,你可以在集群中的每個cpu 上,起2-4個partitions(也就是每個cpu分配2-4個Task)。一般來說,Spark會嘗試根據(jù)集群的狀況,來自動設定partitions的數(shù)目。然而,你也可以手動的設置它,通過parallelize 方法的第二個參數(shù)(例如:sc.parallelize(data,10). Hadoop數(shù)據(jù)集 Spark可以創(chuàng)建分布式數(shù)據(jù)集,從任何存儲在HDFS文件系統(tǒng)或者Hadoop支持的其它文件 系統(tǒng)(包括本地文件,AmazonS3,Hypertable,HBas

10、e等等)上的文件。Spark可以支持TextFile,SequenceFiles及其它任何Hadoop輸入格式 文本文件的RDDs可以通過SparkContext的textFile方法創(chuàng)建,該方法接受文件的URI地 址(或者機器上的文件本地路徑,或者一個hdfs:/,sdn:/,kfs:/,其它URI).這里是一個調 用例子: scalavaldistFile=sc.textFile(data.txt) distFile:spark.RDDString=spark.HadoopRDD1d4cee08 一旦被創(chuàng)建,distFile可以進行數(shù)據(jù)集操作。例如,我們可以使用如下的map和reduce操

11、 作將所有行數(shù)的長度相加: distFile.map(_.size).reduce(_+_) 方法也接受可選的第二參數(shù),來控制文件的分片數(shù)目。默認來說,Spark為每一塊文件創(chuàng)建 一個分片(HDFS默認的塊大小為64MB),但是你可以通過傳入一個更大的值來指定更多的分片。注意,你不能指定一個比塊個數(shù)更少的片值(和hadoop中,Map數(shù)不能小于Block數(shù)一樣) 對于SequenceFiles,使用SparkContext的sequenceFileK,V方法,K和V是文件中的key和values類型。他們必須是Hadoop的Writable的子類,例如IntWritable和Text。另外,S

12、park允許你指定幾種原生的通用Writable類型,例如:sequencFileInt,String會自動讀取IntWritable和Texts 最后,對于其他類型的Hadoop輸入格式,你可以使用SparkContext.hadoopRDD方法,它可以接收任意類型的JobConf和輸入格式類,鍵類型和值類型。按照對Hadoop作業(yè)一樣的方法,來設置輸入源就可以了。 分布式數(shù)據(jù)集操作 分布式數(shù)據(jù)集支持兩種操作: 轉換(transformation):根據(jù)現(xiàn)有的數(shù)據(jù)集創(chuàng)建一個新的數(shù)據(jù)集 動作(actions):在數(shù)據(jù)集上運行計算后,返回一個值給驅動程序 例如,Map是一個轉換,將數(shù)據(jù)集的每一個

13、元素,都經過一個函數(shù)進行計算后,返回一個 新的分布式數(shù)據(jù)集作為結果。而另一方面, Reduce是 某個函數(shù)進行聚合,然后將最終結果返回驅動程序,而并行的分布式數(shù)據(jù)集 個動作,將數(shù)據(jù)集的所有元素,用 reduceByKey還是返回一個 所有Spark中的轉換都是惰性的,也就是說,并不會馬上發(fā)生計算。相反的,它只是記住應 用到基礎數(shù)據(jù)集上的這些轉換(Transformation)。而這些轉換(Transformation),只會 在有一個動作(Action)發(fā)生,要求返回結果給驅動應用時,才真正進行計算。這個設計讓 Spark更加有效率的運行。 例如, 我們可以實現(xiàn), 通過map創(chuàng)建一個數(shù)據(jù)集,

14、然后再用reduce,而只返回reduce的結果給driver,而不是整個大的數(shù)據(jù)集。 spark提供的一個重要轉換操作是Caching。當你cache一個分布式數(shù)據(jù)集時,每個節(jié)點會 存儲該數(shù)據(jù)集的所有片,并在內存中計算,并在其它操作中重用。這將會使得后續(xù)的計算更 加的快速(通常是10倍),緩存是spark中一個構造迭代算法的關鍵工具,也可以在解釋器中交互使用。 注意,有些操作只對鍵值對可用,比如API匹配,例如map是一對一的映射,MapReduce中的map類似)。 join。另外,函數(shù)名與Scala及其他函數(shù)式語言中的 而flatMap是將每個輸入映射為一個或多個輸出(與 除了這些操作以

15、外,用戶還可以請求將 類獲取RDD的分區(qū)順序,然后將另一個 RDD緩存起來。而且,用戶還可以通過Partitioner RDD按照同樣的方式分區(qū)。有些操作會自動產生一 個哈希或范圍分區(qū)的RDD,像groupByKey,reduceByKey和sort等。 卜面的表格列出目前支持的轉換和動作: Transformation Meaning map(func) 返回一個新的分布式數(shù)據(jù)集, 由每個原兀素經過func函數(shù)轉換后組成 filter(func) 返回一個新的數(shù)據(jù)集,由經過func函數(shù)后返回值為 true的原元素組成 轉換(Transformations) 4 flatMap(func) 類

16、似于map,但是每一個輸入兀素,會被映射為0 到多個輸出元素(因此,func函數(shù)的返回值是一個Seq,而不單兀系) mapPartitions(func) mapPartitions是map的一個變種。map的輸入函數(shù)是應用于RDD中每個兀素,而mapPartitions的輸人函數(shù)是應用于每個分區(qū),也就是把每個分區(qū)中的內容作為整體來處理的。 mapPartitionsWithlndex(func) SimilartomapPartitions,butalsoprovidesfuncwithanintegervaluerepresentingtheindexofthepartition,sofu

17、ncmustbeoftype(Int,Iterator)=IteratorwhenrunningonanRDDoftypeT. sample(withReplacement,frac,seed) 根據(jù)給定的隨機種子seed,隨機抽樣出數(shù)量為frac 的數(shù)據(jù) union(otherDataset) 返向一個新的數(shù)據(jù)集,由原數(shù)據(jù)集和參數(shù)聯(lián)合而成 ntersection(otherDataset) distinct(numTasks) 數(shù)據(jù)兀素的去重。 groupByKey(numTasks) 在一個由(K,V)對組成的數(shù)據(jù)集上調用,返回一個 (K,SeqV)對的數(shù)據(jù)集。注意:默認情況下,使 用8個

18、并行任務進行分組,你可以傳入numTask可 選參數(shù),根據(jù)數(shù)據(jù)量設置不同數(shù)目的Task (groupByKey和filter結合, 可以實現(xiàn)類似Hadoop中的Reduce功能) reduceByKey(func,numTasks) 在一個(K,V)對的數(shù)據(jù)集上使用,返回一個(K,V)對的數(shù)據(jù)集,key相同的值, 都被使用指定的reduce函數(shù)聚合到起。和groupbykey類似,任務的個數(shù)是可以通過第二個可選參數(shù)來配置的。 join(otherDataset,numTasks) 在類型為(K,V)和(K,W)類型的數(shù)據(jù)集上調用,返回一個(K,(V,W)對,每個key中的所有兀素都在一起出數(shù)據(jù)

19、集_ groupWith(otherDataset,numTasks) 官力:cogroup(otherDataset,numTasks) 在類型為(K,V)和(K,W)類型的數(shù)據(jù)集上調用,返回 一個數(shù)據(jù)集,組成元素為(K,SeqV,SeqW) Tuples。這個操作在其它框架,稱為CoGroup cartesian(otherDataset) 笛卡爾積。但在數(shù)據(jù)集T和U上調用時,返回一個(T,U)對的數(shù)據(jù)集,所有元素交互進行笛卡爾積。 sortByKey(ascendingOrder) 在類型為(K,V)的數(shù)據(jù)集上調用, 返回以K為鍵進行排序的(K,V)對數(shù)據(jù)集。升序或者降序由boolean

20、 型的ascendingOrder參數(shù)決定 (類似于Hadoop的Map-Reduce中間階段的Sort,按Key進行排序) pipe(command,envVars) coalesce(numPartitions) repartition(numPartitions) repartitionAndSortWithinPartitions (partitioner) Actions(動作) Action Meaning reduce(func) 通過均數(shù)func聚集數(shù)據(jù)集中的所有兀素。Func圖數(shù)接受2個參 數(shù),返回一個值。這個函數(shù)必須是關聯(lián)性的,確保可以被正確的 并發(fā)執(zhí)行 collect()

21、 在Driver的程序中,以數(shù)組的形式,返回數(shù)據(jù)集的所有元素。這通常會在使用filter或者其它操作后,返回一個足夠小的數(shù)據(jù)子集再使用,直接將整個RDD集Collect返回,很可能會讓Driver程序OOM count() 返回數(shù)據(jù)集的兀素個數(shù) take(n) 返回一個數(shù)組,由數(shù)據(jù)集的前n個兀素組成。注意,這個操作目 前并非在多個節(jié)點上,并行執(zhí)行,而是Driver程序所在機器,單 機計算所有的元素 (Gateway的1內存壓力會增大,需要謹慎使用) first() 返回數(shù)據(jù)集的 A 個兀素(類似于take(1) saveAsTextFile(path) 將數(shù)據(jù)集的兀素,以texfile的形式,

22、保存到本地義件系統(tǒng),hdfs 或者任何其匕hadoop支持的文件余統(tǒng)。Spark將公調用母個兀素的toString方法,并將它轉換為文件中的一行文本 saveAsSequenceFile(path) 將數(shù)據(jù)集的兀素,以sequenceWe的格式,保存到指定的目錄下,本地系統(tǒng),hdfs或者任何其它hadoop支持的文件系統(tǒng)。RDD的九素必須由key-value對組成,并都實現(xiàn)了Hadoop的Writable接口,或隱式可以轉換為Writable(Spark包括了基本類型的轉 換,仞如Int,Double,String等等) foreach(func) 在數(shù)據(jù)集的每一個兀素上,運行函數(shù)func。這

23、通常用于更新一個累加器變量,或者和外部存儲系統(tǒng)做交互 RDD的持久化 用法:使用persist?;蛘遚ache()方法,其中cache()方法默認持久化到內存,persist可以自己選擇持久化的層次,在shuffle操作中,spark會自動保存中間計算結果,例如reduceBykey 作用:RDD的持久化會將會使得每個節(jié)點保存相應的計算部分,以便再次使用該數(shù)據(jù)集時 可以直接使用,加快計算速度 如何選擇持久化層次:如果RDDs在MEMORY_ONLY下表現(xiàn)良好的話,就選這個層次,這樣CPU效率最高 其次MEMORY_ONLY_SER,其他情況 /doc

24、s/latest/programming-guide.html 調用RDD的cache()方法,可以讓它在第一次計算后,將結果保持存儲在內存。數(shù)據(jù)集的不同部分,將會被存儲在計算它的不同的集群節(jié)點上,讓后續(xù)的數(shù)據(jù)集使用更快。緩存是有容 錯功能的,如果任一分區(qū)的RDD數(shù)據(jù)丟失了,它會被使用原來創(chuàng)建它的轉換,再計算一次 (不需要全部重新計算,只計算丟失的分區(qū)) cache() 需要使用多次的數(shù)據(jù)需要cache,否則會進行不必要的重復操作。舉個例子 valdata=,/readfromtdw println(data.filter(_.contains(error).count) println(da

25、ta.filter(_.contains(warning).count) 上面三段代碼中,data變量會加載兩次,高效的做法是在data加載完后,立刻 持久化到內存中,如下 valdata=,/readfromtdw data.cache println(data.filter(_.contains(error).count) println(data.filter(_.contains(warning).count) 這樣,data在第一加載后, 就被緩存到內存中, 后面兩次操作均直接使用內存中的數(shù)據(jù)。 SharedVariables 共享變量 一般來說,當一個函數(shù)被傳遞給Spark操作(例

26、如map和reduce),通常是在集群結點上 運行,在函數(shù)中使用到的所有變量,都做分別拷貝,供函數(shù)操作,而不會互相影響。這些變 量會被拷貝到每一臺機器,而在遠程機器上,在對變量的所有更新,都不會被傳播回Driver 程序。然而,Spark提供兩種有限的共享變量,供兩種公用的使用模式:廣播變量和累加器 廣播變量 廣播變量允許程序員保留一個只讀的變量,緩存在每一臺機器上,而非每個任務保存一份拷 貝。他們可以使用,例如,給每個結點一個大的輸入數(shù)據(jù)集,以一種高效的方式。Spark也 會嘗試,使用一種高效的廣播算法,來減少溝通的損耗。 廣播變量是從變量V創(chuàng)建的,通過調用SparkContext.broadcast(v)方法。這個廣播變量是 一個v的分裝器,它的只可以通過調用value方法獲得。如下的解釋器模塊展示了如何應用: scalavalbroadcastVar=sc.broadcast(Array(1,2,3) broadcastVar:spark.BroadcastArr

溫馨提示

  • 1. 本站所有資源如無特殊說明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請下載最新的WinRAR軟件解壓。
  • 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請聯(lián)系上傳者。文件的所有權益歸上傳用戶所有。
  • 3. 本站RAR壓縮包中若帶圖紙,網頁內容里面會有圖紙預覽,若沒有圖紙預覽就沒有圖紙。
  • 4. 未經權益所有人同意不得將文件中的內容挪作商業(yè)或盈利用途。
  • 5. 人人文庫網僅提供信息存儲空間,僅對用戶上傳內容的表現(xiàn)方式做保護處理,對用戶上傳分享的文檔內容本身不做任何修改或編輯,并不能對任何下載內容負責。
  • 6. 下載文件中如有侵權或不適當內容,請與我們聯(lián)系,我們立即糾正。
  • 7. 本站不保證下載資源的準確性、安全性和完整性, 同時也不承擔用戶因使用這些下載資源對自己和他人造成任何形式的傷害或損失。

評論

0/150

提交評論