Spark大數(shù)據(jù)分析實戰(zhàn)_第1頁
Spark大數(shù)據(jù)分析實戰(zhàn)_第2頁
Spark大數(shù)據(jù)分析實戰(zhàn)_第3頁
Spark大數(shù)據(jù)分析實戰(zhàn)_第4頁
Spark大數(shù)據(jù)分析實戰(zhàn)_第5頁
已閱讀5頁,還剩285頁未讀, 繼續(xù)免費閱讀

下載本文檔

版權(quán)說明:本文檔由用戶提供并上傳,收益歸屬內(nèi)容提供方,若內(nèi)容存在侵權(quán),請進(jìn)行舉報或認(rèn)領(lǐng)

文檔簡介

Spark大數(shù)據(jù)分析實戰(zhàn)

目錄

第1章Spark簡介

1.1初識Spark

1.2Spark生態(tài)系統(tǒng)BDAS

1.3Spark架構(gòu)與運行邏輯

1.4彈性分布式數(shù)據(jù)集

1.4.1RDD簡介

1.4.2RDD算子分類

1.5本章小結(jié)

第2章Spark開發(fā)與環(huán)境配置

2.1Spark應(yīng)用開發(fā)環(huán)境配置

2.1.1使用Intellii開發(fā)Spark程序

2.1.2使用SparkShell進(jìn)行交互式數(shù)據(jù)分析

2.2遠(yuǎn)程調(diào)試Spark程序

2.3Spark編譯

2.4配置Spark源碼閱讀環(huán)境

2.5木章小結(jié)

第3章BDAS簡介

3.1SOLonSpark

3.1.1為什么使用SparkSOL

3.1.2SparkSQL架構(gòu)分析

3.2SparkStreaming

321SparkStreaming簡介

3.2.2SparkStreaming架構(gòu)

3.2.3SparkStreaming原理剖析

3.3GraphX

3.3.1GraphX簡介

3.3.2GraphX的使用簡介

3.3.3GraphX體系結(jié)構(gòu)

3.4MLlib

3.4.1MLlib簡介

342MLiib中的聚類和分類

3.5本章小結(jié)

4.1日志分析概述

4.2日志分析指標(biāo)

4.3Lamda架構(gòu)

44構(gòu)建日志分析數(shù)據(jù)流水線

4.41用Flume進(jìn)行日志采集

4.4.2用Kafka將口志匯總

4.4.3用SparkStreaming進(jìn)行實時H志分析

444SparkSQL離線口志分析

445用Flask將日志KPI可視化

4.5本章小結(jié)

第5章基于云平臺和用戶日志的推薦系統(tǒng)

5.1Azure云平臺簡介

5.1.1Azure網(wǎng)站模型

5.1.2Azure數(shù)據(jù)存儲

5.1.3AzureQueue消息傳遞

5.2系統(tǒng)架構(gòu)

5.3構(gòu)建Node.js應(yīng)用

5.3.1創(chuàng)建AzureWeb應(yīng)用

5.3.2構(gòu)建本地Node.js網(wǎng)站

533發(fā)布應(yīng)用到云平臺

5.4數(shù)據(jù)收集與預(yù)處理

5.4.1通過IS收集用戶行為日志

5.4.2用戶實時行為回傳到AzureQueue

5.5SparkStreaming實時分析用戶日志

5.5.1構(gòu)建AzureQueue的SparkStreamingReceiver

5.5.2SparkStreaming實時處理AzureOueue口志

5.5.3SparkStreaming數(shù)據(jù)存儲于AzureTable

5.6MLlib離線訓(xùn)練模型

561加載訓(xùn)練數(shù)據(jù)

5.6.2使用ratingRDD訓(xùn)練ALS模型

563使用ALS模型進(jìn)行電影推薦

564評估模型的均方差

5.7本章小結(jié)

第6章Twitter情感分析

6.1系統(tǒng)架構(gòu)

6.2Twitter數(shù)據(jù)收集

6.2.1設(shè)置

6.2.2SparkStreaming接收并輸出Tweet

63數(shù)據(jù)預(yù)處理與Cassandra存儲

631添力口SBT依賴

6.3.2創(chuàng)建CassandraSchema

6.3.3數(shù)據(jù)存儲于Cassandra

64SparkStreaming熱點Twitter分析

6.5SparkStreaming在線情感分析

6.6SparkSOL進(jìn)行Twitter分析

6.6.1讀取Cassandra數(shù)據(jù)

662查看【SON數(shù)據(jù)模式

663SparkSQL分析Twitter

6.7Twitter可視化

6.8本章小結(jié)

7.1新聞數(shù)據(jù)分析

7.2系統(tǒng)架構(gòu)

7.3爬蟲抓取網(wǎng)絡(luò)信息

7.3.1Scrapy簡介

732創(chuàng)建基于Scrapy的新聞爬蟲

7.3.3爬蟲分布式化

7.4新聞文木數(shù)據(jù)預(yù)處理

7.5新聞聚類

7.5.1數(shù)據(jù)轉(zhuǎn)換為向量(向量空間模型VSM)

752新聞聚類

753詞向量同義詞查詢

7.5.4實時熱點新聞分析

7.6SparkElasticSearch構(gòu)建全文檢索弓I擎

7.6.1部署ElasticSearch

7.6.2用ElasticSearch索弓IMongoDB數(shù)據(jù)

7.6.3通過ElasticSearch檢索數(shù)據(jù)

7.7本章小結(jié)

第8章構(gòu)建分布式的協(xié)同過濾推薦系統(tǒng)

8.1推薦系統(tǒng)簡介

8.2協(xié)同過濾介紹

8.2.1基于用戶的協(xié)同過濾算法User-basedCF

822基于項目的協(xié)同過濾算法Item-basedCF

823基于模型的協(xié)同過濾推薦Model-basedCF

8.3基于Spark的矩陣運算實現(xiàn)協(xié)同過濾算法

8.3.1Spark中的矩陣類型

8.3.2Spark中的矩陣運算

8.3.3實現(xiàn)User-based協(xié)同過濾的示例

8.3.4實現(xiàn)Item-based協(xié)同過濾的示例

8.3.5基于奇異值分解實現(xiàn)Model-based協(xié)同過濾的示例

8.4基于Spark的MLlib實現(xiàn)協(xié)同過濾算法

8.4.1MLlib的推薦算法工具

842MLlib協(xié)同過濾推薦示例

8.5案例:使用MLlib協(xié)同過濾實現(xiàn)電影推薦

8.5.1MovieLens數(shù)據(jù)集

852確定最佳的協(xié)同過濾模型參數(shù)

853利用最佳模型進(jìn)行電影推薦

8.6本章小結(jié)

第9章基于Spark的社交網(wǎng)絡(luò)分析

9.1社交網(wǎng)絡(luò)介紹

9.1.1社交網(wǎng)絡(luò)的類型

9.1.2社交網(wǎng)絡(luò)的相關(guān)概念

9.2社交網(wǎng)絡(luò)中社團(tuán)挖掘算法

921聚類分析和K均值算法簡介

922社團(tuán)挖掘的衡量指標(biāo)

923基于譜聚類的社團(tuán)挖掘算法

9.3Spark中的K均值算法

9.3.1Spark中與K均值有關(guān)的對象和方法

9.3.2Spark下K均值算法示例

9.4案例:基于Spark的Facebook社團(tuán)挖掘

941SNAP社交網(wǎng)絡(luò)數(shù)據(jù)集介紹

9.4.2基于Spark的社團(tuán)挖掘?qū)崿F(xiàn)

9.5社交網(wǎng)絡(luò)中的鏈路預(yù)測算法

951分類學(xué)習(xí)簡介

952分類器的評價指標(biāo)

9.5.3基于Logistic回歸的鏈路預(yù)測算法

9.6SparkMLlib中的Logistic回歸

961分類器相關(guān)對象

9.6.2模型驗證對象

9.6.3基于Snark的LogisticI可歸示例

9.7案例:基于Spark的鏈路預(yù)測算法

9.7.1SNAP符號社交網(wǎng)絡(luò)Epinions數(shù)據(jù)集

9.7.2基于Spark的鏈路預(yù)測算法

9.8本章小結(jié)

第10章基于Spark的大規(guī)模新聞主題分析

10.1主題模型簡介

10.2主題模型LDA

10.2.1LDA模型介紹

10.2.2LDA的訓(xùn)練算法

10.3Spark中的LDA模型

10.3.1MLlib對LDA的支持

10.3.2Spark中LDA模型訓(xùn)練示例

1。4案例:Newsgroups新聞的主題分析

10.4.1Newsgroups數(shù)據(jù)集介名召

10.4.2交叉驗證估計新聞的主題個數(shù)

10.4.3基于主題模型的文本聚類算法

10.4.4基于主題模型的文本分類算法

10.5本章小結(jié)

第11章構(gòu)建分布式的搜索引擎

11.1搜索引擎簡介

11.2搜索排序概述

11.3查詢無關(guān)模型PageRank

114基于Spark的分存式PageRank實現(xiàn)

1L4.1PageRank的MapReduce實現(xiàn)

114.2Spark的分相式圖模型GraphX

11.4.3基于GraphX的PageRank實現(xiàn)

115窠例:GoogleWebGraph的PageRank計算

11.6查詢相關(guān)模型RankingSVM

1L7Spark中支持向量機(jī)的實現(xiàn)

1171Spark中的支持向量機(jī)模型

1172使用Spark測試數(shù)據(jù)演示支持向量機(jī)的訓(xùn)練

1L8案例:基于MSLR數(shù)據(jù)集的杳詢排序

118.1MicrosoftLearningtoRank數(shù)據(jù)集介紹

11.8.2基于Spark的RankingSVM實現(xiàn)

11.9本章小結(jié)

第1章Spark簡介

本章主要介紹Spark框架的概念、生態(tài)系統(tǒng)、架構(gòu)及RDD等,并圍繞Spark的BDAS項

目及其子項目進(jìn)行了簡要介紹。目前,Spark生態(tài)系統(tǒng)已經(jīng)發(fā)展成為一個包含多個子項目

的集合,其中包含SparkSQL、SparkStreaming、GraphX、MLlib等子項目,本章只進(jìn)行

簡要介紹,后續(xù)章節(jié)會有詳細(xì)闡述。

1.1初識Spark

Spark是基于內(nèi)存計算的大數(shù)據(jù)并行計算框架,因為它基于內(nèi)存計算,所以提高了在大數(shù)

據(jù)環(huán)境下數(shù)據(jù)處理的實時性,同時保證了高容錯性和高可伸縮性,允許用戶將Spark部

署在大量廉價硬件之上,形成集群。

l.Spark執(zhí)行的特點

Hadoop中包含計算框架MapReduce和分布式文件系統(tǒng)HDFS。

Spark是MapReduce的替代方案,而且兼容HDFS、Hive等分布式存儲層,融入Hadoop

的生態(tài)系統(tǒng),并彌補(bǔ)MapReduce的不足。

(1)中間結(jié)果輸出

Spark將執(zhí)行工作流抽象為通用的有向無環(huán)圖執(zhí)行計劃(DAG),可以將多Stage的任務(wù)

串聯(lián)或者并行執(zhí)行,而無需將Stage的中間結(jié)果輸出到HDFS中,類似的引擎包括Flink、

Dryad>Tezo

(2)數(shù)據(jù)格式和內(nèi)存布局

Spark抽象出分布式內(nèi)存存儲結(jié)構(gòu)彈性分布式數(shù)據(jù)集RDD,可以理解為利用分布式的數(shù)組

來進(jìn)行數(shù)據(jù)的存儲。RDD能支持粗粒度寫操作,但對于讀取操作,它可以精確到每條記

錄。Spark的特性是能夠控制數(shù)據(jù)在不同節(jié)點上的分區(qū),用戶可以自定義分區(qū)策略。

(3)執(zhí)行策略

Spark執(zhí)行過程中不同Stage之間需要進(jìn)行Shuffle?Shuffle是連接有依賴的Stage的橋梁,

上游Stage輸出到下游Stage中必須經(jīng)過Shuffle這個環(huán)節(jié),通過Shuffle將相同的分組數(shù)

據(jù)拆分后聚合到同一個節(jié)點再處理。SparkShuffle支持基于Hash或基于排序的分布式聚

合機(jī)制。

(4)任務(wù)調(diào)度的開銷

Spark采用了事件驅(qū)動的類庫AKKA來啟動任務(wù),通過線程池的復(fù)用線程來避免系統(tǒng)啟動

和切換開銷。

2.Spark的優(yōu)勢

Spark的一站式解決方案有很多的優(yōu)勢,分別如下所述。

(1)打造全棧多計算范式的高效數(shù)據(jù)流水線

支持復(fù)雜查詢與數(shù)據(jù)分析任務(wù)。在簡單的“Map”及"Reduce”操作之外,Spark還支持SQL

查詢、流式計算、機(jī)器學(xué)習(xí)和圖算法。同時,用戶可以在同一個工作流中無縫搭配這些計

算范式。

(2)輕量級快速處理

Spark代碼量較小,這得益于Scala語言的簡潔和豐富表達(dá)力,以及Spark通過External

DataSourceAPI充分利用和集成Hadoop等其他第三方組件的能力。同時Spark基于內(nèi)存

計算,可通過中間結(jié)果緩存在內(nèi)存來減少磁盤I/O以達(dá)到性能的提升。

(3)易于使用,支持多語言

Spark支持通過Scala、Java和Python編寫程序,這允許開發(fā)者在自己熟悉的語言環(huán)境下

進(jìn)行工作。它自帶了80多個算子,同時允許在Shell中進(jìn)行交互式計算。用戶可以利用

Spark像書寫單機(jī)程序一樣書寫分布式程序,輕松利用Spark搭建大數(shù)據(jù)內(nèi)存計算平臺并

充分利用內(nèi)存計算,實現(xiàn)海量數(shù)據(jù)的實時處理。

(4)與ExternalDataSource多數(shù)據(jù)源支持

Spark可以獨立運行,除了可以運行在當(dāng)下的Yarn集群管理之外,它還可以讀取已有的

任何Hadoop數(shù)據(jù)。它可以運行多種數(shù)據(jù)源,比如Parquet、Hive、HBase、HDFS等。這

個特性讓用戶可以輕易遷移已有的持久化層數(shù)據(jù)。

(5)社區(qū)活躍度高

Spark起源于2009年,當(dāng)下已有超過600多位工程師貢獻(xiàn)過代碼。開源系統(tǒng)的發(fā)展不應(yīng)

只看一時之快,更重要的是一個活躍的社區(qū)和強(qiáng)大的生態(tài)系統(tǒng)的支持。

同時也應(yīng)該看到Spark并不是完美的,RDD模型適合的是粗粒度的全局?jǐn)?shù)據(jù)并行計算;

不適合細(xì)粒度的、需要異步更新的計算。對于一些計算需求,如果要針對特定工作負(fù)載達(dá)

到最優(yōu)性能,還需要使用一些其他的大數(shù)據(jù)系統(tǒng)。例如,圖計算領(lǐng)域的GraphLab在特定

計算負(fù)載性能上優(yōu)于GraphX,流計算中的Storm在實時性要求很高的場合要更勝Spark

Streaming一籌。

1.2Spark生態(tài)系統(tǒng)BDAS

目前,Spark已經(jīng)發(fā)展成為包含眾多子項目的大數(shù)據(jù)計算平臺。BDAS是伯克利大學(xué)提出

的基于Spark的數(shù)據(jù)分析棧(BDAS)。其核心框架是Spark,同時涵蓋支持結(jié)構(gòu)化數(shù)據(jù)

SQL查詢與分析的查詢引擎SparkSQL,提供機(jī)器學(xué)習(xí)功能的系統(tǒng)MLBase及底層的分布

式機(jī)器學(xué)習(xí)庫MLlib,并行圖計算框架GraphX,流計算框架SparkStreaming,近似查詢

引擎BlinkDB,內(nèi)存分布式文件系統(tǒng)Tachyon,資源管理框架Mesos等子項目。這些子項

目在Spark上層提供了更高層、更豐富的計算范式。

圖1-1展現(xiàn)了BDAS的主要項目結(jié)構(gòu)圖。

MLlib

SparkSparkGraphX

(machine

SQL(graph)

□Learning)

ApacheSpark

圖1-1伯克利數(shù)據(jù)分析棧(BDAS)主要項目結(jié)構(gòu)圖

下面對BDAS的各個子項目進(jìn)行更詳細(xì)的介紹。

(1)Spark

Spark是整個BDAS的核心組件,是一個大數(shù)據(jù)分布式編程框架,不僅實現(xiàn)了MapReduce

的算子map函數(shù)和reduce函數(shù)及計算模型,還提供了更為豐富的算子,例如filter、join、

groupByKey等。Spark將分布式數(shù)據(jù)抽象為RDD(彈性分布式數(shù)據(jù)集),并實現(xiàn)了應(yīng)用

任務(wù)調(diào)度、RPC、序列化和壓縮,并為運行在其上層的組件提供API。其底層采用Scala

這種函數(shù)式語言書寫而成,并且所提供的API深度借鑒函數(shù)式的編程思想,提供與Scala

類似的編程接口。

圖1-2所示即為Spark的處理流程(主要對象為RDD)。

Spark將數(shù)據(jù)在分布式環(huán)境下分區(qū),然后將作業(yè)轉(zhuǎn)化為有向無環(huán)圖(DAG),并分階段進(jìn)

行DAG的調(diào)度和任務(wù)的分布式并行處理。

(2)SparkSQL

SparkSQL提供在大數(shù)據(jù)上的SQL查詢功能,類似于Shark在整個生態(tài)系統(tǒng)的角色,它們

可以統(tǒng)稱為SQLonSpark。之前,由于Shark的查詢編譯和優(yōu)化器依賴Hive,使得Shark

不得不維護(hù)一套Hive分支。而SparkSQL使用Catalyst作為查詢解析和優(yōu)化器,并在底

層使用Spark作為執(zhí)行引擎實現(xiàn)SQL的算子。用戶可以在Spark上直接書寫SQL,相當(dāng)

于為Spark擴(kuò)充了一套SQL算子,這無疑更加豐富了Spark的算子和功能。同時Spark

SQL不斷兼容不同的持久化存儲(如HDFS、Hive等),為其發(fā)展奠定廣闊的空間。

圖1-2Spark的任務(wù)處理流程圖

(3)SparkStreaming

SparkStreaming通過將流數(shù)據(jù)按指定時間片累積為RDD,然后將每個RDD進(jìn)行批處理,

進(jìn)而實現(xiàn)大規(guī)模的流數(shù)據(jù)處理。其吞吐量能夠超越現(xiàn)有主流流處理框架Storm,并提供豐

富的API用于流數(shù)據(jù)計算。

(4)GraphX

GraphX基于BSP模型,在Spark之上封裝類似Pregel的接口,進(jìn)行大規(guī)模同步全局的

圖計算,尤其是當(dāng)用戶進(jìn)行多輪迭代的時候,基于Spark內(nèi)存計算的優(yōu)勢尤為明顯。

(5)MLlib

MLlib是Spark之上的分布式機(jī)器學(xué)習(xí)算法庫,同時包括相關(guān)的測試和數(shù)據(jù)生成器。

MLlib支持常見的機(jī)器學(xué)習(xí)問題,例如分類、回歸、聚類以及協(xié)同過濾,同時也包括一個

底層的梯度下降優(yōu)化基礎(chǔ)算法。

1.3Spark架構(gòu)與運行邏輯

l.Spark的架構(gòu)

?Driver:運行Application的main()函數(shù)并且創(chuàng)建SparkContext。

?Client:用戶提交作業(yè)的客戶端。

?Worker:集群中任何可以運行Application代碼的節(jié)點,運行一個或多個Executor進(jìn)程。

?Executor:運行在Worker的Task執(zhí)行器,Executor啟動線程池運行Task,并且負(fù)責(zé)將

數(shù)據(jù)存在內(nèi)存或者磁盤上。每個Application都會申請各自的Executor來處理任務(wù)。

?SparkContext:整個應(yīng)用的上下文,控制應(yīng)用的生命周期。

?RDD:Spark的基本計算單元,一組RDD形成執(zhí)行的有向無環(huán)圖RDDGrapho

?DAGScheduler:根據(jù)Job構(gòu)建基于Stage的DAG工作流,并提交Stage給

TaskSchedulero

?TaskScheduler:將Task分發(fā)給Executor執(zhí)行。

?SparkEnv:線程級別的上下文,存儲運行時的重要組件的引用。

2.運行邏輯

(1)Spark作業(yè)提交流程

如圖1-3所示,Client提交應(yīng)用,Master找到一個Worker啟動Driver,Driver向

Master或者資源管理器申請資源,之后將應(yīng)用轉(zhuǎn)化為RDD有向無環(huán)圖,再由

DAGScheduler將RDD有向無環(huán)圖轉(zhuǎn)化為Stage的有向無環(huán)圖提交給TaskScheduler,由

TaskScheduler提交任務(wù)給Executor進(jìn)行執(zhí)行。任務(wù)執(zhí)行的過程中其他組件再協(xié)同工作確

保整個應(yīng)用順利執(zhí)行。

圖1-3Spark架構(gòu)

(2)Spark作業(yè)運行邏輯

如圖1-4所示,在Spark應(yīng)用中,整個執(zhí)行流程在邏輯上運算之間會形成有向無環(huán)圖。

Action算子觸發(fā)之后會將所有累積的算子形成一個有向無環(huán)圖,然后由調(diào)度器調(diào)度該圖

上的任務(wù)進(jìn)行運算。Spark的調(diào)度方式與MapReduce有所不同。Spark根據(jù)RDD之間不

同的依賴關(guān)系切分形成不同的階段(Stage),一個階段包含一系列函數(shù)進(jìn)行流水線執(zhí)行。

圖中的A、B、C、D、E、F,分別代表不同的RDD,RDD內(nèi)的一個方框代表一個數(shù)據(jù)塊。

數(shù)據(jù)從HDFS輸入Spark,形成RDDA和RDDC,RDDC上執(zhí)行map操作,轉(zhuǎn)換為RDD

D,RDDB和RDDE進(jìn)行join操作轉(zhuǎn)換為F,而在B到F的過程中又會進(jìn)行Shuffle。最

后RDDF通過函數(shù)saveAsSequenceFile輸出保存到HDFS中。

TranstbnnationsActions

A

SS

L

aLLC.

J

HH

tcxtF

4

icnccFilc

圖1-4Spark執(zhí)行有I可無環(huán)圖

1.4彈性分布式數(shù)據(jù)集

本節(jié)將介紹彈性分布式數(shù)據(jù)集RDDoSpark是一個分布式計算框架,而RDD是其對分布

式內(nèi)存數(shù)據(jù)的抽象,可以認(rèn)為RDD就是Spark分布式算法的數(shù)據(jù)結(jié)構(gòu),而RDD之上的

操作是Spark分布式算法的核心原語,由數(shù)據(jù)結(jié)構(gòu)和原語設(shè)計上層算法。Spark最終會將

算法(RDD上的一連串操作)翻譯為DAG形式的工作流進(jìn)行調(diào)度,并進(jìn)行分布式任務(wù)的

分發(fā)。

1.4.1RDD簡介

在集群背后,有一個非常重要的分布式數(shù)據(jù)架構(gòu),即彈性分布式數(shù)據(jù)集(Resilient

DistributedDataset,RDD)。它在集群中的多臺機(jī)器上進(jìn)行了數(shù)據(jù)分區(qū),邏輯上可以認(rèn)

為是一個分布式的數(shù)組,而數(shù)組中每個記錄可以是用戶自定義的任意數(shù)據(jù)結(jié)構(gòu)。RDD是

Spark的核心數(shù)據(jù)結(jié)構(gòu),通過RDD的依賴關(guān)系形成Spark的調(diào)度順序,通過對RDD的操

作形成整個Spark程序。

(1)RDD創(chuàng)建方式

1)從Hadoop文件系統(tǒng)(或與Hadoop兼容的其他持久化存儲系統(tǒng),如Hive、Cassandra、

HBase)輸入(例如HDFS)創(chuàng)建。

2)從父RDD轉(zhuǎn)換得到新RDDo

3)通過parallelize或makeRDD將單機(jī)數(shù)據(jù)創(chuàng)建為分布式RDD。

(2)RDD的兩種操作算子

對于RDD可以有兩種操作算子:轉(zhuǎn)換(Transformation)與行動(Action)。

1)轉(zhuǎn)換(Transformation):Transformation操作是延遲計算的,也就是說從一個RDD

轉(zhuǎn)換生成另一個RDD的轉(zhuǎn)換操作不是馬上執(zhí)行,需要等到有Action操作的時候才會真正

觸發(fā)運算。

2)行動(Action):Action算子會觸發(fā)Spark提交作業(yè)(Job),并將數(shù)據(jù)輸出Spark系

統(tǒng)。

(3)RDD的重要內(nèi)部屬性

通過RDD的內(nèi)部屬性,用戶可以獲取相應(yīng)的元數(shù)據(jù)信息。通過這些信息可以支持更復(fù)雜

的算法或優(yōu)化。

1)分區(qū)列表:通過分區(qū)列表可以找到一個RDD中包含的所有分區(qū)及其所在地址。

2)計算每個分片的函數(shù):通過函數(shù)可以對每個數(shù)據(jù)塊進(jìn)行RDD需要進(jìn)行的用戶自定義

函數(shù)運算。

3)對父RDD的依賴列表:為了能夠回溯到父RDD,為容錯等提供支持。

4)對key-valuepair數(shù)據(jù)類型RDD的分區(qū)器,控制分區(qū)策略和分區(qū)數(shù)。通過分區(qū)函數(shù)可

以確定數(shù)據(jù)記錄在各個分區(qū)和節(jié)點上的分配,減少分布不平衡。

5)每個數(shù)據(jù)分區(qū)的地址列表(如HDFS上的數(shù)據(jù)塊的地址)。

如果數(shù)據(jù)有副本,則通過地址列表可以獲知單個數(shù)據(jù)塊的所有副本地址,為負(fù)載均衡和容

錯提供支持。

(4)Spark計算工作流

圖1-5中描述了Spark的輸入、運行轉(zhuǎn)換、輸出。在運行轉(zhuǎn)換中通過算子對RDD進(jìn)行轉(zhuǎn)

換。算子是RDD中定義的函數(shù),可以對RDD中的數(shù)據(jù)進(jìn)行轉(zhuǎn)換和操作。

?輸入:在Spark程序運行中,數(shù)據(jù)從外部數(shù)據(jù)空間(例如,HDFS、Scala集合或數(shù)據(jù))

輸入到Spark,數(shù)據(jù)就進(jìn)入了Spark運行時數(shù)據(jù)空間,會轉(zhuǎn)化為Spark中的數(shù)據(jù)塊,通過

BlockManager進(jìn)行管理。

?運行:在Spark數(shù)據(jù)輸入形成RDD后,便可以通過變換算子fliter等,對數(shù)據(jù)操作并將

RDD轉(zhuǎn)化為新的RDD,通過行動(Action)算子,觸發(fā)Spark提交作業(yè)。如果數(shù)據(jù)需要

復(fù)用,可以通過Cache算子,將數(shù)據(jù)緩存到內(nèi)存。

?輸出:程序運行結(jié)束數(shù)據(jù)會輸出Spark運行時空間,存儲到分布式存儲中(如

saveAsTextFile輸出到HDFS)或Scala數(shù)據(jù)或集合中(collect輸出到Scala集合,count

返回ScalaInt型數(shù)據(jù))。

外部數(shù)據(jù)空間

圖1-5Spark算子和數(shù)據(jù)空間

Spark的核心數(shù)據(jù)模型是RDD,但RDD是個抽象類,具體由各子類實現(xiàn),如

MappedRDD、ShuffledRDD等子類。Spark將常用的大數(shù)據(jù)操作都轉(zhuǎn)化成為RDD的子類。

1.4.2RDD算子分類

本節(jié)將主要介紹Spark算子的作用,以及算子的分類。

Spark算子大致可以分為以下兩類。

1)Transformation變換算子:這種變換并不觸發(fā)提交作業(yè),完成作業(yè)中間過程處理。

2)Action行動算子:這類算子會觸發(fā)SparkContext提交Job作業(yè)。

下面分別對兩類算子進(jìn)行詳細(xì)介紹。

l.Transformations算子

下文將介紹常用和較為重要的Transformation算子。

(1)map

將原來RDD的每個數(shù)據(jù)項通過map中的用戶自定義函數(shù)f映射轉(zhuǎn)變?yōu)橐粋€新的元素。源

碼中map算子相當(dāng)于初始化一個RDD,新RDD叫做MappedRDD(this,sc.clean

(f))o

圖1-7中每個方框表示一個RDD分區(qū),左側(cè)的分區(qū)經(jīng)過用戶自定義函數(shù)f:T->U映射為

右側(cè)的新RDD分區(qū)。但是,實際只有等到Action算子觸發(fā)后這個f函數(shù)才會和其他函數(shù)

在一個stage中對數(shù)據(jù)進(jìn)行運算。在圖1-6中的第一個分區(qū),數(shù)據(jù)記錄VI輸入f,通過f

轉(zhuǎn)換輸出為轉(zhuǎn)換后的分區(qū)中的數(shù)據(jù)記錄VI。

(2)flatMap

將原來RDD中的每個元素通過函數(shù)f轉(zhuǎn)換為新的元素,并將生成的RDD的每個集合中的

元素合并為一個集合,內(nèi)部創(chuàng)建FlatMappedRDD(this,sc.clean(f))。

in=4iri

IZMZI

圖1-6map算子對RDD轉(zhuǎn)換

圖1-7表示RDD的一個分區(qū)進(jìn)行flatMap函數(shù)操作,flatMap中傳入的函數(shù)為f:T->U,

T和U可以是任意的數(shù)據(jù)類型。將分區(qū)中的數(shù)據(jù)通過用戶自定義函數(shù)f轉(zhuǎn)換為新的數(shù)據(jù)。

外部大方框可以認(rèn)為是一個RDD分區(qū),小方框代表一個集合。VI、V2、V3在一個集合

作為RDD的一個數(shù)據(jù)項,可能存儲為數(shù)組或其他容器,轉(zhuǎn)換為V1、V'2、V3后,將原

來的數(shù)組或容器結(jié)合拆散,拆散的數(shù)據(jù)形成為RDD中的數(shù)據(jù)項。

圖1-7flapMap算子對RDD轉(zhuǎn)換

(3)mapPartitions

mapPartitions函數(shù)獲取到每個分區(qū)的迭代器,在函數(shù)中通過這個分區(qū)整體的迭代器對整

個分區(qū)的元素進(jìn)行操作。內(nèi)部實現(xiàn)是生成MapPartitionsRDD。圖1-8中的方框代表一個

RDD分區(qū)。

圖1-8中,用戶通過函數(shù)f(iter)=>iter.filter(_>=3)對分區(qū)中所有數(shù)據(jù)進(jìn)行過濾,大

于和等于3的數(shù)據(jù)保留。一個方塊代表一個RDD分區(qū),含有1、2、3的分區(qū)過濾只剩下

元素3o

n-P^r~i

EMU

圖1-8mapPartitions算子對RDD轉(zhuǎn)換

(4)union

使用union函數(shù)時需要保證兩個RDD元素的數(shù)據(jù)類型相同,返回的RDD數(shù)據(jù)類型和被

合并的RDD元素數(shù)據(jù)類型相同。并不進(jìn)行去重操作,保存所有元素,如果想去重可以使

用distinct()<>同時Spark還提供更為簡潔的使用union的API,通過++符號相當(dāng)于

union函數(shù)操作。

圖1-9中左側(cè)大方框代表兩個RDD,大方框內(nèi)的小方框代表RDD的分區(qū)。右側(cè)大方框代

表合并后的RDD,大方框內(nèi)的小方框代表分區(qū)。合并后,VI、V2、V3V8形成一個分

區(qū),其他元素同理進(jìn)行合并。

(5)cartesian

對兩個RDD內(nèi)的所有元素進(jìn)行笛卡爾積操作。操作后,內(nèi)部實現(xiàn)返回CartesianRDD。圖

1-10中左側(cè)大方框代表兩個RDD,大方框內(nèi)的小方框代表RDD的分區(qū)。右側(cè)大方框代表

合并后的RDD,大方框內(nèi)的小方框代表分區(qū)。

例如:VI和另一個RDD中的Wl、W2、Q5進(jìn)行笛卡爾積運算形成(VI,Wl)、(VI,

W2)、(VI,Q5)。

圖1-9union算子對RDD轉(zhuǎn)換

圖1-10cartesian算子對RDD轉(zhuǎn)換

(6)groupBy

groupBy:將元素通過函數(shù)生成相應(yīng)的Key,數(shù)據(jù)就轉(zhuǎn)化為Key-Value格式,之后將Key

相同的元素分為一組。

函數(shù)實現(xiàn)如下:

1)將用戶函數(shù)預(yù)處理:

valcleanF=sc.clean(f)

2)對數(shù)據(jù)map進(jìn)行函數(shù)操作,最后再進(jìn)行g(shù)roupByKey分組操作。

this.mapft=>(cleanF(t),tJ).groupByKey(p)

其中,P確定了分區(qū)個數(shù)和分區(qū)函數(shù),也就決定了并行化的程度。

圖1-11中方框代表一個RDD分區(qū),相同key的元素合并到一個組。例如VI和V2合并

為V,Value為VI,V2。形成V,Seq(VI,V2)。

圖1-11groupBy算子對RDD轉(zhuǎn)換

(7)filter

filter函數(shù)功能是對元素進(jìn)行過濾,對每個元素應(yīng)用f函數(shù),返回值為true的元素在RDD

中保留,返回值為false的元素將被過濾掉。內(nèi)部實現(xiàn)相當(dāng)于生成FilteredRDD(this,

sc.clean(f))。

下面代碼為函數(shù)的本質(zhì)實現(xiàn):

deffilter(f:T=>Boolean):RDD[T]=newFilteredRDD(this,sc.clean(f))

圖1-12中每個方框代表一個RDD分區(qū),T可以是任意的類型。通過用戶自定義的過濾函

數(shù)f,對每個數(shù)據(jù)項操作,將滿足條件、返回結(jié)果為true的數(shù)據(jù)項保留。例如,過濾掉

V2和V3保留了VI,為區(qū)分命名為VI。

(8)sample

sample將RDD這個集合內(nèi)的元素進(jìn)行采樣,獲取所有元素的子集。用戶可以設(shè)定是否有

放回的抽樣、百分比、隨機(jī)種子,進(jìn)而決定采樣方式。

內(nèi)部實現(xiàn)是生成SampledRDD(withReplacement,fraction,seed)。

函數(shù)參數(shù)設(shè)置:

?withReplacement=true,表示有放回的抽樣。

?withReplacement=false,表示無放回的抽樣。

圖1-13中的每個方框是一個RDD分區(qū)。通過sample函數(shù),采樣50%的數(shù)據(jù)。VI、V2、

UI、U2U4采樣出數(shù)據(jù)VI和UI、U2形成新的RDD。

f:T->Boolean

VI

V2

V3

U1

U2

U3

U4

圖1-12filter算子對RDD轉(zhuǎn)換

fraction=O.5,sccd==9

UI

U2

U3

U4

圖1-13sample算子對RDD轉(zhuǎn)換

(9)cache

cache將RDD元素從磁盤緩存到內(nèi)存。相當(dāng)于persist(MEMORY_ONLY)函數(shù)的功能。

EDISKMUMEMORY

nn^nn

圖1-14Cache算子對RDD轉(zhuǎn)換

圖1-14中每個方框代表一個RDD分區(qū),左側(cè)相當(dāng)于數(shù)據(jù)分區(qū)都存儲在磁盤,通過cache

算子將數(shù)據(jù)緩存在內(nèi)存。

(10)persist

persist函數(shù)對RDD進(jìn)行緩存操作。數(shù)據(jù)緩存在哪里依據(jù)StorageLevel這個枚舉類型進(jìn)行

確定。有以下幾種類型的組合(見圖1?14),DISK代表磁盤,MEMORY代表內(nèi)存,SER

代表數(shù)據(jù)是否進(jìn)行序列化存儲。

下面為函數(shù)定義,StorageLevel是枚舉類型,代表存儲模式,用戶可以通過圖1?14按需

進(jìn)行選擇。

persist(newLevel:StorageLevel]

圖1-15中列出persist函數(shù)可以進(jìn)行緩存的模式。例如,MEMORY_AND_DISK_SER代表

數(shù)據(jù)可以存儲在內(nèi)存和磁盤,并且以序列化的方式存儲,其他同理。

valDISK_OMLY:StorageLevel

valDISK__ONLY_2:StorageLevel

vallEMORY__Ain)_DISK:StorageLevel

vallE10RY_A!ro_DISK_2:StorageLevel

val■E1ORY_A!TO_DISK_SER:StorageLevel

valMEMORY_AND_DISK_SER_2:StoraaeL已uel

val!E10RY_0!iLY:StorazeLevel

val1EMORY_OBLY_2:StorageLevel

val1E1ORY_ONLY_SER:StorageLevel

val1EMORY_ONLY_SER_2:StorageLevel

valNONE:StorageLevel

valOFF_HEAP:StorageLevel

圖1-15persist算子對RDD轉(zhuǎn)換

圖1-16中方框代表RDD分區(qū)。disk代表存儲在磁盤,mem代表存儲在內(nèi)存。數(shù)據(jù)最初

全部存儲在磁盤,通過persist(MEMORY_AND_DISK)將數(shù)據(jù)緩存到內(nèi)存,但是有的分

區(qū)無法容納在內(nèi)存,將含有VI、V2、V3的分區(qū)存儲到磁盤。

(11)mapValues

mapValues:針對(Key,Value)型數(shù)據(jù)中的Value進(jìn)行Map操作,而不對Key進(jìn)行處

理。

圖1-17中的方框代表RDD分區(qū)。a=>a+2代表對(VI,1)這樣的KeyValue數(shù)據(jù)對,

數(shù)據(jù)只對Value中的1進(jìn)行加2操作,返回結(jié)果為3。

Pcn;ist(MEMORY_AND_D!SK.)

V2(disk)[V2(disk)

V3(disk)JV3(disk)|

UI(disk)|UI(mein)

U2(disk)I</z|U2(mem)

圖1-16Persist算子對RDD轉(zhuǎn)換

mapValues(a=>a+2)

圖1-17mapValues算子RDD對轉(zhuǎn)換

(12)combineByKey

下面代碼為combineByKey函數(shù)的定義:

combineByKey[C](createCombiner:(V]C,mergeValue:(C,VJC,mergeCombiners:(C,C)C,

partitioneriPartitioner,mapSideCombine:Boolean=true,

serializer:Serializer=null):RDD[(K,C)]

說明:

?createCombiner:V=>C,C不存在的情況下,比如通過V創(chuàng)建seqCo

?mergeValue:(C,V)=>C,當(dāng)C已經(jīng)存在的情況下,需要merge,比如把itemV加到

seqC中,或者疊加。

?mergeCombiners:(C,C)=>C,合并兩個C。

?partitioner:Partitioner,Shuffle時需要的Partitionero

?mapSideCombine:Boolean=true,為了減小傳輸量,很多combine可以在map端先做,

比如疊加,可以先在一個partition中把所有相同的key的value疊加,再shuffle<>

?serializerClass:String=null,傳輸需要序列化,用戶可以自定義序列化類:

例如,相當(dāng)于將元素為(Int,Int)的RDD轉(zhuǎn)變?yōu)榱?Int,Seq[Int])類型元素的RDD。

圖1?18中的方框代表RDD分區(qū)。如圖,通過combineByKey,將(VI,2),(VI,1)

數(shù)據(jù)合并為(VI,Seq(2,1))。

(13)reduceByKey

reduceByKey是比combineByKey更簡單的一種情況,只是兩個值合并成一個值,(Int,

IntV)to(Int,IntC),比如疊加。所以createCombinerreduceBykey彳艮簡單,就是直

接返回v,而mergeValue和mergeCombiners邏輯是相同的,沒有區(qū)別。

combineByKey

U「以有多種實現(xiàn).此處是groupByKey的實現(xiàn)

Vl,Scpf2,l)

V2,Scp(2)

V3,Scp(l)

圖1-18comBineByKey算子對RDD轉(zhuǎn)換

函數(shù)實現(xiàn):

defreduceByKeyfpartitioner:Partitioner,func:(V,V)=>V):RDD[(K,V)]=

{combineByKey[V]([v:V)=>v,func,func,partitioner)}

圖1-19中的方框代表RDD分區(qū)。通過用戶自定義函數(shù)(A,B)=>(A+B)函數(shù),將相

同key的數(shù)據(jù)(VI,2)和(VI,1)的value相加運算,結(jié)果為(VI,3)。

reduceByKc((A,B)=>(A+B))

圖1-19reduceByKey算子對RDD轉(zhuǎn)換

(14)join

join對兩個需要連接的RDD進(jìn)行cogroup函數(shù)操作,將相同key的數(shù)據(jù)能夠放到一個分

區(qū),在cogroup操作之后形成的新RDD對每個key下的元素進(jìn)行笛卡爾積的操作,返回

的結(jié)果再展平,對應(yīng)key下的所有元組形成一個集合。最后返回RDD[(K,(V,

W))]o

下面代碼為join的函數(shù)實現(xiàn),本質(zhì)是通過cogroup算子先進(jìn)行協(xié)同劃分,再通過

flatMapValues將合并的數(shù)據(jù)打散。

this.cogroup(other,partitioner).f?latMapValues{case(vs,ws)=>for(v<-vs;w>-ws)yield(v,w)}

圖1-20是對兩個RDD的join操作示意圖。大方框代表RDD,小方框代表RDD中的分區(qū)。

函數(shù)對相同key的元素,如VI為key做連接后結(jié)果為(VI,(1,1))和(VI,(1,

2))o

2.Actions算子

本質(zhì)上在Action算子中通過SparkContext進(jìn)行了提交作業(yè)的runjob操作,觸發(fā)了RDD

DAG的執(zhí)行。

圖1-20join算子對RDD轉(zhuǎn)換

例如,Action算子collect函數(shù)的代碼如下,感興趣的讀者可以順著這個入口進(jìn)行源碼剖

析:

/***ReturnanarraythatcontainsalloftheelementsinthisRDD.*/defcollect():Array[T]

={/*提交Job*/valresults=sc.runjob(this,(iter:Iterator[T]]=>iter.toArray)

Array.concat(results:_*)}

下面將介紹常用和較為重要的Action算子。

(1)foreach

foreach對RDD中的每個元素都應(yīng)用f函數(shù)操作,不返回RDD和Array,而是返回Uinto

圖1-21表示foreach算子通過用戶自定義函數(shù)對每個數(shù)據(jù)項進(jìn)行操作。本例中自定義函

數(shù)為printin(),控制臺打印所有數(shù)據(jù)項。

Foreach(_>printin(_))

、U'l

7U,2

圖1-21foreach算子對RDD轉(zhuǎn)換

(2)saveAsTextFile

函數(shù)將數(shù)據(jù)輸出,存儲到HDFS的指定目錄。

下面為saveAsTextFile函數(shù)的內(nèi)部實現(xiàn),其內(nèi)部通過調(diào)用saveAsHadoopFile進(jìn)行實現(xiàn):

this.mapfx=>(NullWritable.getQ,new

Text(x.toString))),saveAsHadoopFile[TextOutputFormat[NullWritable,Text]](path)

將RDD中的每個元素映射轉(zhuǎn)變?yōu)?null,x.toString),然后再將其寫入HDFS。

圖1-22中左側(cè)方框代表RDD分區(qū),右側(cè)方框代表HDFS的Block。通過函數(shù)將RDD的

每個分區(qū)存儲為HDFS中的一個Blocko

(3)collect

collect相當(dāng)于toArray,toArray已經(jīng)過時不推薦使用,collect將分布式的RDD返回為一

個單機(jī)的scalaArray數(shù)組。在這個數(shù)組上運用scala的函數(shù)式操作。

圖1-23中左側(cè)方框代表RDD分區(qū),右側(cè)方框代表單機(jī)內(nèi)存中的數(shù)組。通過函數(shù)操作,

將結(jié)果返回到Driver程序所在的節(jié)點,以數(shù)組形式存儲。

RDDHDFS

m=^in20000

|2|__粕|Part-00001

圖1-22saveAsHadoopFile算子對RDD轉(zhuǎn)換

collect()

圖1-23Collect算子對RDD轉(zhuǎn)換

(4)count

count返回整個RDD的元素個數(shù)。

內(nèi)部函數(shù)實現(xiàn)為:

defcountO:Long=sc.runJob(this,Utils.getIteratorSize_).sum

圖1-24中,返回數(shù)據(jù)的個數(shù)為5。一個方塊代表一個RDD分區(qū)。

圖1-24count對RDD算子轉(zhuǎn)換

1.5本章小結(jié)

本章首先介紹了Spark分布式計算平臺的基本概念、原理以及Spark生態(tài)系統(tǒng)BDAS之

上的典型組件。Spark為用戶提供了系統(tǒng)底層細(xì)節(jié)透明、編程接口簡潔的分布式計算平臺。

Spark具有內(nèi)存計算、實時性高、容錯性好等突出特點。同時本章介紹了Spark的計算模

型,Spark會將應(yīng)用程序整體翻譯為一個有向無環(huán)圖進(jìn)行調(diào)度和執(zhí)行。相比MapReduce,

Spark提供了更加優(yōu)化和復(fù)雜的執(zhí)行流。讀者還可以深入了解Spark的運行機(jī)制與Spark

算子,這樣能更加直觀地了解API的使用。Spark提供了更加豐富的函數(shù)式算子,這樣就

為Spark上層組件的開發(fā)奠定了堅實的基礎(chǔ)。

相信讀者已經(jīng)想了解如何開發(fā)Spark程序,接下來將就Spark的開發(fā)環(huán)境配置進(jìn)行闡述。

第2章Spark開發(fā)與環(huán)境配:閆

用戶進(jìn)行Spark應(yīng)用程序開發(fā),一般在用戶本地進(jìn)行單機(jī)開發(fā)調(diào)試,之后再將作業(yè)提交

到集群生產(chǎn)環(huán)境中運行。下面將介紹Spark開發(fā)環(huán)境的配置,如何編譯和進(jìn)行源碼閱讀

環(huán)境的配置。

用戶可以在官網(wǎng)上下載最新的AS軟件包,網(wǎng)址為:/o

2.1Spark應(yīng)用開發(fā)環(huán)境配置

Spark的開發(fā)可以通過Intel?或者EclipseIDE進(jìn)行,在環(huán)境配置的開始階段,還需要安

裝相應(yīng)的Scala插件。

2.1.1使用Intellij開發(fā)Spark程序

本節(jié)介紹如何使用IntellijIDEA構(gòu)建Spark開發(fā)環(huán)境和源碼閱讀環(huán)境。由于Intellij對

Scala的支持更好,目前Spark開發(fā)團(tuán)隊主要使用Intellij作為開發(fā)環(huán)境。

1.配置開發(fā)環(huán)境

(1)安裝JDK

用戶可以自行安裝JDK8。官網(wǎng)地址:

/technetwork/java/javase/downloads/index.htmlo

下載后,如果在Windows下直接運行安裝程序,會自動配置環(huán)境變量,安裝成功后,在

CMD的命令行下輸入Java,有Java版本的日志信息提示則證明安裝成功。

如果在Linux下安裝,下載JDK包解壓縮后,還需要配置環(huán)境變量。

在/etc/profile文件中,配置環(huán)境變量:

exportJAVA_HOME=/usr/java/jdkl.8exportJAVA_BlN=/usr/java/jdkl.8/binexport

PATH=$PATH:$JAVA_HOME/binexport

CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$]AVA_HOME/lib/tools.jarexportJAVA_HOME

JAVA_BINPATHCLASSPATH

(2)安裝Scala

Spark內(nèi)核采用Scala進(jìn)行開發(fā),上層通過封裝接口提供Java和Python的API,在進(jìn)行

開發(fā)前需要配置好Scala的開發(fā)包。

Spark對Scala的版本有約束,用戶可以在Spark的官方下載界面看到相應(yīng)的Scala版本

號。下載指定的Scala包,官網(wǎng)地址:http://www.scala-lang.Org/download/o

(3)安裝IntellijIDEA

用戶可以下載安裝最新版本的Intellij,官網(wǎng)地址:

http://www.jetbrains.eom/idea/download/o

目前Intellij最新的版本中已經(jīng)可以支持新建SBT工程,安裝Scala插件,可以很好地支

持Scala開發(fā)。

(4)Intellij中安裝Scala插件

在Intellij菜單中選擇"Configure",在下拉菜單中選擇"Plugins",再選擇"Browse

repositories",輸入"Scala”搜索插件(如圖2-1所示),在彈出的對話框中單擊"install"按

鈕,重啟Intellijo

2.配置Spark應(yīng)用開發(fā)環(huán)境

1)用戶在IntellijIDEA中創(chuàng)建ScalaProject,SparkTesto

2)選擇菜單中的"File"-?"projectstructure"->"Libraries”命令,單擊導(dǎo)入"spark-

,,

assembly_2.10-1.0.0-incubating-hadoop2.2.0.jaro

只需導(dǎo)入該jar包,該包可以通過在Spark的源碼工程下執(zhí)行"sbt/sbtassembly"命令生成,

這個命令相當(dāng)于將Spark的所有依賴包和Spark源碼打包為一個整體。

在"assembly/target/scala-2.10.4/"目錄下生成:spark-assembly-1.0.O-incubating-

hadoop2.2.0.jaro

3)如果IDE無法識別Scala庫,則需要以同樣方式將Scala庫的jar包導(dǎo)入。之后就可以

開始開發(fā)Spark程序。如圖2-2所示,本例將Spark默認(rèn)的示例程序SparkPi復(fù)制到文件。

圖2-1輸入"Scala"搜索插件

圖2-2編寫程序

3.運行Spark程序

(1)本地運行

編寫完scala程序后,可以直接在Intellij中,以本地Local模式運行(如圖2-3所示),

方法如下。

圖2-3以local模式運行

在Intellij中的選擇"Run"T"DebugConfiguration"->''EditConfigurations”命令。在

"Programarguments"文本框中輸入main函數(shù)的輸入?yún)?shù)local。然后右鍵選擇需要運行

的類,單擊“Run”按鈕運行。

(2)集群上運行Spark應(yīng)用jar包

如果想把程序打成jar包,通過命令行的形式運行在Spark集群中,并按照以下步驟操作。

1)選擇"File"—"ProjectStructure”,在彈出的對話框中選擇"Artifact"->"Jar"->"From

Moduleswithdependencies”命令。

2)在選擇"FromModuleswithdependencies"之后彈出的對話框中,選擇Main函數(shù),同

時選擇輸出jar位置,最后單擊“0K”按鈕。

具體如圖2-4?圖2-6所示。

在圖2-5中選擇需要執(zhí)行的Main函數(shù)。

在圖2-6界面選擇依賴的jar包。

圖2-4生成jar包第一步

圖2-5生成jar包第二步

圖2-6生成jar包第三步

在主菜單選擇"Build”1"BuildArtifact”命令,編譯生成jar包。

3)將生成的jar包SparkTest.jar在集群的主節(jié)點,通過下面命令執(zhí)行:

java-jarSparkTestjar

用戶可以通過上面的流程和方式通過Intellij作為集成開發(fā)環(huán)境進(jìn)行Spark程序的開發(fā)。

2.1.2使用SparkShell進(jìn)行交互式數(shù)據(jù)分析

如果是運行SparkShell,那么會默認(rèn)創(chuàng)建一個SparkContext,命名為sc,所以不需要在

SparkShell創(chuàng)建新的SparkContext,SparkContext是應(yīng)用程序的上下文,調(diào)度整個應(yīng)用

并維護(hù)元數(shù)據(jù)信息。在運行SparkShell之前,可以設(shè)定參數(shù)MASTER,將Spark應(yīng)用提

交到MASTER指向的相應(yīng)集群或者本地模式執(zhí)行,集群方式運行的作業(yè)將會分布式地運

行,本地模式執(zhí)行的作業(yè)將會通過單機(jī)多線程方式運行??梢酝ㄟ^參數(shù)ADDJARS把

JARS添加到classpath,用戶可以通過這種方式添加所需的第三方依賴庫。

如果想spakr-sheli在本地4核的CPU運行,需要如下方式啟動:

$MASTER=local[4]./spark-shell

這里的4是指啟動4個工作線程。

如果要添加JARS,代碼如下:

$MASTER=local[4]ADD_JARS=code.jar./spark-shell

在spark-shell中,輸入下面代碼,讀取dir文件:

scala>valtext=sc.textFile("dir")

輸出文件中有多少數(shù)據(jù)項,則可用:

scala>text.count

按鍵,即可運行程序。

通過以上介紹,用戶可以了解如何使用SparkShell進(jìn)行交互式數(shù)據(jù)分析。

對于邏輯較為復(fù)

溫馨提示

  • 1. 本站所有資源如無特殊說明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請下載最新的WinRAR軟件解壓。
  • 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請聯(lián)系上傳者。文件的所有權(quán)益歸上傳用戶所有。
  • 3. 本站RAR壓縮包中若帶圖紙,網(wǎng)頁內(nèi)容里面會有圖紙預(yù)覽,若沒有圖紙預(yù)覽就沒有圖紙。
  • 4. 未經(jīng)權(quán)益所有人同意不得將文件中的內(nèi)容挪作商業(yè)或盈利用途。
  • 5. 人人文庫網(wǎng)僅提供信息存儲空間,僅對用戶上傳內(nèi)容的表現(xiàn)方式做保護(hù)處理,對用戶上傳分享的文檔內(nèi)容本身不做任何修改或編輯,并不能對任何下載內(nèi)容負(fù)責(zé)。
  • 6. 下載文件中如有侵權(quán)或不適當(dāng)內(nèi)容,請與我們聯(lián)系,我們立即糾正。
  • 7. 本站不保證下載資源的準(zhǔn)確性、安全性和完整性, 同時也不承擔(dān)用戶因使用這些下載資源對自己和他人造成任何形式的傷害或損失。

最新文檔

評論

0/150

提交評論