《通信數(shù)據(jù)分析與實戰(zhàn)》課件-第七章 Spark Streaming實時計算框架_第1頁
《通信數(shù)據(jù)分析與實戰(zhàn)》課件-第七章 Spark Streaming實時計算框架_第2頁
《通信數(shù)據(jù)分析與實戰(zhàn)》課件-第七章 Spark Streaming實時計算框架_第3頁
《通信數(shù)據(jù)分析與實戰(zhàn)》課件-第七章 Spark Streaming實時計算框架_第4頁
《通信數(shù)據(jù)分析與實戰(zhàn)》課件-第七章 Spark Streaming實時計算框架_第5頁
已閱讀5頁,還剩63頁未讀 繼續(xù)免費閱讀

下載本文檔

版權(quán)說明:本文檔由用戶提供并上傳,收益歸屬內(nèi)容提供方,若內(nèi)容存在侵權(quán),請進行舉報或認領(lǐng)

文檔簡介

通信數(shù)據(jù)分析與實戰(zhàn)SparkStreaming實時計算框架第七章第1節(jié)2理解實時計算的場景熟悉常用的實時計算框架學(xué)習(xí)目標(biāo)TARGET什么是實時計算在傳統(tǒng)的數(shù)據(jù)處理流程(離線計算)中,復(fù)雜的業(yè)務(wù)處理流程會造成結(jié)果數(shù)據(jù)密集,結(jié)果數(shù)據(jù)密集則存在數(shù)據(jù)反饋不及時,若是在實時搜索的應(yīng)用場景中,需要實時數(shù)據(jù)做決策,而傳統(tǒng)的數(shù)據(jù)處理方式則并不能很好地解決問題,這就引出了一種新的數(shù)據(jù)計算——實時計算,它可以針對海量數(shù)據(jù)進行實時計算,無論是在數(shù)據(jù)采集還是數(shù)據(jù)處理中,都可以達到秒級別的處理要求。實時計算框架1.ApacheSparkStreamingApache公司開源的實時計算框架。ApacheSparkStreaming主要是把輸入的數(shù)據(jù)按時間進行切分,切分的數(shù)據(jù)塊并行計算處理,處理的速度可以達到秒級別。2.ApacheStormApache公司開源的實時計算框架,它具有簡單、高效、可靠地實時處理海量數(shù)據(jù),處理數(shù)據(jù)的速度達到毫秒級別,并將處理后的結(jié)果數(shù)據(jù)保存到持久化介質(zhì)中(如數(shù)據(jù)庫、HDFS)。實時計算框架3.ApacheFlinkApache公司開源的實時計算框架。ApacheFlink不僅可以支持離線處理,還可以支持實時處理。由于離線處理和實時處理所提供的SLA(服務(wù)等級協(xié)議)是完全不相同的,所以離線處理一般需要支持低延遲的保證,而實時處理則需要支持高吞吐,高效率的處理。4.Yahoo!S4Yahoo公司開源的實時計算平臺。Yahoo!S4是通用的、分布式的、可擴展的,并且還具有容錯和可插拔能力,供開發(fā)者輕松地處理源源不斷產(chǎn)生的數(shù)據(jù)。6小結(jié)理解實時計算的場景熟悉常用的實時計算框架通信數(shù)據(jù)分析與實戰(zhàn)SparkStreaming實時計算框架第七章第2節(jié)8知道SparkStreaming的作用掌握SparkStreaming的工作原理學(xué)習(xí)目標(biāo)TARGETSparkStreaming概述SparkStreaming是構(gòu)建在Spark上的實時計算框架,且是對SparkCoreAPI的一個擴展,它能夠?qū)崿F(xiàn)對流數(shù)據(jù)進行實時處理,并具有很好的可擴展性、高吞吐量和容錯性。SparkStreaming具有易用性、容錯性及易整合性的顯著特點。SparkStreaming概述容錯性整合性易用性SparkStreaming支持Java、Python、Scala等編程語言,可以像編寫離線程序一樣編寫實時計算的程序。

SparkStreaming在沒有額外代碼和配置的情況下,可以恢復(fù)丟失的數(shù)據(jù)。對于實時計算來說,容錯性至關(guān)重要。首先我們要明確一下Spark中RDD的容錯機制,即每一個RDD都是一個不可變的分布式可重算的數(shù)據(jù)集,其記錄著確定性的操作繼承關(guān)系(lineage),所以只要輸入數(shù)據(jù)是可容錯的,那么任意一個RDD的分區(qū)(Partition)出錯或不可用,都是可以使用原始輸入數(shù)據(jù)經(jīng)過轉(zhuǎn)換操作重新計算得出。SparkStreaming可以在Spark上運行,并且還允許重復(fù)使用相同的代碼進行批處理。也就是說,實時處理可以與離線處理相結(jié)合,進行交互式的查詢操作。11小結(jié)知道SparkStreaming的作用掌握SparkStreaming的工作原理SparkStreaming工作原理SparkStreaming支持從多種數(shù)據(jù)源獲取數(shù)據(jù),包括Kafka、Flume、Twitter、ZeroMQ、Kinesis及TCPSockets數(shù)據(jù)源。當(dāng)SparkStreaming從數(shù)據(jù)源獲取數(shù)據(jù)之后,則可以使用諸如map、reduce、join和window等高級函數(shù)進行復(fù)雜的計算處理,最后將處理結(jié)果存儲到分布式文件系統(tǒng)、數(shù)據(jù)庫中,最終利用實時Web儀表板進行展示。SparkStreaming工作原理SparkStreaming先接收實時輸入的數(shù)據(jù)流,并且將數(shù)據(jù)按照一定的時間間隔分成一批批的數(shù)據(jù),每一段數(shù)據(jù)都轉(zhuǎn)變成Spark中的RDD,接著交由Spark引擎進行處理,最后將處理結(jié)果數(shù)據(jù)輸出到外部儲存系統(tǒng)。14小結(jié)知道SparkStreaming的作用掌握SparkStreaming的工作原理通信數(shù)據(jù)分析與實戰(zhàn)SparkStreaming實時計算框架第七章第3節(jié)16知道DStream的作用知道DStream的編程模型學(xué)習(xí)目標(biāo)TARGETSpark的DStream流SparkStreaming提供了一個高級抽象的流,即DStream(離散流)。DStream表示連續(xù)的數(shù)據(jù)流,可以通過Kafka、Flume和Kinesis等數(shù)據(jù)源創(chuàng)建,也可以通過現(xiàn)有DStream的高級操作來創(chuàng)建。DStream的內(nèi)部結(jié)構(gòu)是由一系列連續(xù)的RDD組成,每個RDD都是一小段時間分隔開來的數(shù)據(jù)集。對DStream的任何操作,最終都會轉(zhuǎn)變成對底層RDDs的操作。Spark的Dstream編程模型批處理引擎SparkCore把輸入的數(shù)據(jù)按照一定的時間片(如1s)分成一段一段的數(shù)據(jù),每一段數(shù)據(jù)都會轉(zhuǎn)換成RDD輸入到SparkCore中,然后將DStream操作轉(zhuǎn)換為RDD算子的相關(guān)操作,即轉(zhuǎn)換操作、窗口操作以及輸出操作。RDD算子操作產(chǎn)生的中間結(jié)果數(shù)據(jù)會保存在內(nèi)存中,也可以將中間的結(jié)果數(shù)據(jù)輸出到外部存儲系統(tǒng)中進行保存。19小結(jié)知道DStream的作用知道DStream的編程模型通信數(shù)據(jù)分析與實戰(zhàn)SparkStreaming實時計算框架第七章第4節(jié)21熟悉Dstream的API掌握DStream的API操作學(xué)習(xí)目標(biāo)TARGETDstream的操作SparkStreaming中對DStream的轉(zhuǎn)換操作會轉(zhuǎn)變成對RDD的轉(zhuǎn)換操轉(zhuǎn)換流程如下。其中,lines表示轉(zhuǎn)換操作前的DStream,words表示轉(zhuǎn)換操作后生成的DStream。對lines做flatMap轉(zhuǎn)換操作,也就是對它內(nèi)部的所有RDD做flatMap轉(zhuǎn)換操作。Dstream的操作DStreamAPI提供的與轉(zhuǎn)換操作相關(guān)的方法方法名稱相關(guān)說明map(func)將源DStream的每個元素,傳遞到函數(shù)func中進行轉(zhuǎn)換操作,得到一個新的DStreamflatMap(func)與map()相似,但是每個輸入的元素都可以映射0或者多個輸出結(jié)果filter(func)返回一個新的DStream,僅包含源DStream中經(jīng)過func函數(shù)計算結(jié)果為true的元素repartition(numPartitions)用于指定DStream分區(qū)的數(shù)量union(otherStream)返回一個新的DStream,包含源DStream和其他DStream中的所有元素Dstream的操作DStreamAPI提供的與轉(zhuǎn)換操作相關(guān)的方法方法名稱相關(guān)說明count()統(tǒng)計源DStream中每個RDD包含的元素個數(shù),返回一個新DStreamreduce(func)使用函數(shù)func將源DStream中每個RDD的元素進行聚合操作,返回一個新DStreamcountByValue()計算DStream中每個RDD內(nèi)的元素出現(xiàn)的頻次,并返回一個新的DStream[(K,Long)],其中K是RDD中元素的類型,Long是元素出現(xiàn)的頻次join(otherStream,[numTasks])當(dāng)被調(diào)用類型分別為(K,V)和(K,W)鍵值對的兩個DStream時,返回類型為(K,(V,W))鍵值對的一個新DStreamDstream的操作DStreamAPI提供的與轉(zhuǎn)換操作相關(guān)的方法方法名稱相關(guān)說明cogroup(otherStream,[numTasks])當(dāng)被調(diào)用的兩個DStream分別含有(K,V)和(K,W)鍵值對時,則返回一個新DStreamtransform(func)對源DStream中每個RDD應(yīng)用RDD-to-RDD函數(shù)返回一個新DStream,在DStream中做任意RDD操作updateStateByKey(func)返回一個新狀態(tài)DStream,通過在鍵的先前狀態(tài)和鍵的新值上應(yīng)用給定函數(shù)func更新每一個鍵的狀態(tài)。該操作方法被用于維護每一個鍵的任意狀態(tài)數(shù)據(jù)Transform操作DstreamAPI提供的與轉(zhuǎn)換操作相關(guān)的方法和RDDAPI有些不同,不同之處在于RDDAPI中沒有提供transform()和updateStateByKey()這兩個方法,所以下面主要針對這兩個方法進行詳細的操作講解Transform操作通過一個具體的案例來演示如何使用transform()方法將一行語句分割成多個單詞,具體步驟如下:1執(zhí)行”nc–lk9999”啟動服務(wù)端且監(jiān)聽Socke服務(wù)。并輸入數(shù)據(jù)“MyrolemodelisYumin”2創(chuàng)建名稱為”spark_chapter07”的Maven項目。并配置”pom.xml”文件,引入SparkStreaming相關(guān)依賴3創(chuàng)建TransformTest對象,實現(xiàn)將數(shù)據(jù)拆分成多個單詞的功能。Transform操作如果沒有安裝過nc,需要先執(zhí)行yuminstallnc–y進行安裝之后再執(zhí)行以上的操作Transform操作<dependencies>

<dependency>

<groupId>org.scala-lang</groupId>

<artifactId>scala-library</artifactId>

<version>2.11.8</version>

</dependency>

<dependency>

<groupId>org.apache.spark</groupId>

<artifactId>spark-core_2.11</artifactId>

<version>2.0.2</version>

</dependency>

<!--引入sparkStreaming依賴-->

<dependency>

<groupId>org.apache.spark</groupId>

<artifactId>spark-streaming_2.11</artifactId>

<version>2.0.2</version>

</dependency>

<dependency>

<groupId>mysql</groupId>

<artifactId>mysql-connector-java</artifactId>

<version>5.1.38</version>

</dependency>

<!--引入sparkstreaming整合kafka的依賴-->

<dependency>

<groupId>org.apache.spark</groupId>

<artifactId>spark-streaming-kafka-0-8_2.11</artifactId>

<version>2.0.2</version>

</dependency>

</dependencies><build>

<sourceDirectory>src/main/scala</sourceDirectory>

<testSourceDirectory>src/test/scala</testSourceDirectory>

</build>Transform操作objectTransformTest{defmain(args:Array[String]):Unit={//1.創(chuàng)建SparkConf對象valsparkConf:SparkConf=newSparkConf().setAppName("TransformTest").setMaster("local[2]")//2.創(chuàng)建SparkContext對象,它是所有任務(wù)計算的源頭valsc:SparkContext=newSparkContext(sparkConf)

//3.設(shè)置日志級別sc.setLogLevel("WARN")//4.創(chuàng)建StreamingContext,需要兩個參數(shù),分別為SparkContext和批處理時間間隔valssc:StreamingContext=newStreamingContext(sc,Seconds(5))//5.連接socket服務(wù),需要socket服務(wù)地址、端口號及存儲級別(默認的)valdstream:ReceiverInputDStream[String]=ssc.socketTextStream("192.168.121.134",9999)//6.使用RDD-to-RDD函數(shù),返回新的DStream對象(即words),并空格切分每行valwords:DStream[String]=dstream.transform(rdd=>rdd.flatMap(_.split("")))

//7.打印輸出結(jié)果words.print()//8.開啟流式計算ssc.start()

//9.讓程序一直運行,除非人為干預(yù)停止ssc.awaitTermination()UpdateStateByKey操作UpdateStateByKey操作通過一個具體的案例來演示如何使用updateStateByKey()方法進行詞頻統(tǒng)計,具體步驟如下:1在”spark_chapter07”的項目下,創(chuàng)建UpdateStateByKeyTest2定義詞頻統(tǒng)計的方法updateFunction3創(chuàng)建SparkConfig,SparkContext,StreamingContext對象,調(diào)用updateStateByKey方法。4測試updateStateByKey功能實現(xiàn)。UpdateStateByKey操作

//newValues表示當(dāng)前批次匯總成的(word,1)中相同單詞的所有1//runningCount表示歷史的所有相同key的value總和defupdateFunction(newValues:Seq[Int],runningCount:Option[Int]):Option[Int]={valnewCount=runningCount.getOrElse(0)+newValues.sumSome(newCount)}UpdateStateByKey操作

//1.創(chuàng)建SparkConf對象valsparkConf:SparkConf=newSparkConf().setAppName("WordCount").setMaster("local[2]")

//2.創(chuàng)建SparkContext對象valsc:SparkContext=newSparkContext(sparkConf)

//3.設(shè)置日志級別sc.setLogLevel("WARN")//4.創(chuàng)建StreamingContext,需要2個參數(shù),一個是SparkContext,一個是批處理的時間間隔valssc:StreamingContext=newStreamingContext(sc,Seconds(5))

//5.配置檢查點目錄,使用updateStateByKey方法必須配置檢查點目錄ssc.checkpoint("./")//6.對接socket數(shù)據(jù)創(chuàng)建DStream對象,需要socket服務(wù)的地址、端口號及存儲級別(默認的)valdstream:ReceiverInputDStream[String]=ssc.socketTextStream("192.168.121.134",9999)

//7.按空格進行切分每一行,并將切分的單詞出現(xiàn)次數(shù)記錄為1valwordAndOne:DStream[(String,Int)]=dstream.flatMap(_.split("")).map(word=>(word,1))//8.調(diào)用updateStateByKey操作,統(tǒng)計單詞在全局中出現(xiàn)的次數(shù)varresult:DStream[(String,Int)]=wordAndOne.updateStateByKey(updateFunction)//9.打印輸出結(jié)果result.print()

//10.開啟流式計算ssc.start()//11.讓程序一直運行,除非人為干預(yù)停止ssc.awaitTermination()UpdateStateByKey操作4測試updateStateByKey功能實現(xiàn)。

//

1.在hadoop01開啟nc服務(wù),并輸入測試單詞:nc–lk9999sparkupdatezhongnanshanshenjilanshenjilanzhongnanshanzhangfuqingyuminzhongnanshanyumin

//2.運行UpdateStateByKeyTest的程序查看結(jié)果Dstream的窗口操作在SparkStreaming中,為DStream提供窗口操作,即在DStream流上,將一個可配置的長度設(shè)置為窗口,以一個可配置的速率向前移動窗口。根據(jù)窗口操作,對窗口內(nèi)的數(shù)據(jù)進行計算,每次落在窗口內(nèi)的RDD數(shù)據(jù)會被聚合起來計算,生成的RDD會作為WindowDStream的一個RDD。Dstream的窗口操作方法名稱相關(guān)說明window(windowLength,slideInterval)返回基于源DStream的窗口進行批計算后的一個新DStreamcountByWindow(windowLength,slideInterval)返回基于滑動窗口的DStream中的元素數(shù)reduceByWindow(func,windowLength,slideInterval)基于滑動窗口的源DStream中的元素進行聚合操作,返回一個新DStreamDstream的窗口操作方法名稱相關(guān)說明reduceByKeyAndWindow(func,windowLength,slideInterval,[numTasks])基于滑動窗口對(K,V)類型的DStream中的值,按K應(yīng)用聚合函數(shù)func進行聚合操作,返回一個新DStreamreduceByKeyAndWindow(func,invFuncwindowLength,slideInterval,[numTasks])更高效的reduceByKeyAndWindow()實現(xiàn)版本。每個窗口的聚合值,都是基于先前窗口的聚合值進行增量計算得到。該操作會對進入滑動窗口的新數(shù)據(jù)進行聚合操作,并對離開窗口歷史數(shù)據(jù)進行逆向聚合操作countByValueAndWindow(windowLength,slideInterval,[numTasks])基于滑動窗口計算源DStream中每個RDD內(nèi)每個元素出現(xiàn)的頻次,返回一個由(K,V)組成的新的DStreamDstream的窗口操作方法名稱相關(guān)說明print()在Driver中打印出DStream中數(shù)據(jù)的前10個元素saveAsTextFiles(prefix,[suffix])將DStream中的內(nèi)容以文本的形式進行保存,其中每次批處理間隔內(nèi)產(chǎn)生的文件以“prefix-TIME_IN_MS[.suffix]”的方式命名。saveAsObjectFiles(prefix,[suffix])將DStream中的內(nèi)容按對象進行序列化,并且以SequenceFile的格式保存。每次批處理間隔內(nèi)產(chǎn)生的文件以“prefix-TIME_IN_MS[.suffix]”的方式命名。輸出操作方法:Dstream的窗口操作方法名稱相關(guān)說明saveAsHadoopFiles(prefix,[suffix])將DStream中的內(nèi)容以文本的形式保存為Hadoop文件,其中每次批處理間隔內(nèi)產(chǎn)生的文件以prefix-TIME_IN_MS[.suffix]的方式命名foreachRDD(func)最基本的輸出操作,將func函數(shù)應(yīng)用于DStream中的RDD上,這個操作會輸出數(shù)據(jù)到外部系統(tǒng)輸出操作方法:Window方法操作Window()操作通過一個具體的案例來演示如何使用Window()方法輸出3個時間單位長度的數(shù)據(jù),具體步驟如下:1在”spark_chapter07”的項目下,創(chuàng)建WindowTest2創(chuàng)建SparkConfig,SparkContext,StreamingContext對象,調(diào)用Window()方法。3測試Window()功能實現(xiàn)。Window()操作2創(chuàng)建SparkConfig,SparkContext,StreamingContext對象,調(diào)用Window()方法。

//1.創(chuàng)建SparkConf對象

valsparkConf:SparkConf=newSparkConf().setAppName("WindowTest").setMaster("local[2]")//2.創(chuàng)建SparkContext對象,它是所有任務(wù)計算的源頭

valsc:SparkContext=newSparkContext(sparkConf)//3.設(shè)置日志級別

sc.setLogLevel("WARN")//4.創(chuàng)建StreamingContext,需要兩個參數(shù),分別為SparkContext和批處理時間間隔

valssc:StreamingContext=newStreamingContext(sc,Seconds(1))

//5.連接socket服務(wù),需要socket服務(wù)地址、端口號及存儲級別(默認的)valdstream:ReceiverInputDStream[String]=ssc.socketTextStream("192.168.121.134",9999)

//6.按空格進行切分每一行valwords:DStream[String]=dstream.flatMap(_.split(""))//7.調(diào)用window操作,需要兩個參數(shù),窗口長度和滑動時間間隔

valwindowWords:DStream[String]=words.window(Seconds(3),Seconds(1))

//8.打印輸出結(jié)果

windowWords.print()

//9.開啟流式計算

ssc.start()

//10.讓程序一直運行,除非人為干預(yù)停止

ssc.awaitTermination()Window()操作3測試Window()功能實現(xiàn)。

//

1.在hadoop01開啟nc服務(wù),并按每秒輸入測試數(shù)據(jù):nc–lk999912345

//2.運行WindowTest的程序查看結(jié)果Window()操作reduceByKeyAndWindow操作通過一個具體的案例來演示如何使用reduceByKeyAndWindow()方法統(tǒng)計3個時間單位內(nèi)不同字母出現(xiàn)的次數(shù),具體步驟如下:1在”spark_chapter07”的項目下,創(chuàng)建ReduceByKeyAndWindowTest2創(chuàng)建SparkConfig,SparkContext,StreamingContext對象,調(diào)用reduceByKeyAndWindow()方法。3測試reduceByKeyAndWindow功能實現(xiàn)。reduceByKeyAndWindow操作2創(chuàng)建SparkConfig,SparkContext,StreamingContext對象,調(diào)用reduceByKeyAndWindow()方法。

//1.創(chuàng)建SparkConf對象設(shè)置appName和master地址local[2]表示本地采用2個線程運行任務(wù)

valsparkConf:SparkConf=newSparkConf().setAppName("WordCount").setMaster("local[2]")

//2.創(chuàng)建SparkContext對象,它是所有任務(wù)計算的源頭,它會創(chuàng)建DAGScheduler和TaskSchedulervalsc:SparkContext=newSparkContext(sparkConf)//3.設(shè)置日志級別

sc.setLogLevel("WARN")

//4.創(chuàng)建StreamingContext,需要2個參數(shù),一個是SparkContext,一個是批處理的時間間隔

valssc:StreamingContext=newStreamingContext(sc,Seconds(1))reduceByKeyAndWindow操作2創(chuàng)建SparkConfig,SparkContext,StreamingContext對象,調(diào)用reduceByKeyAndWindow()方法。

//5.對接socket數(shù)據(jù)創(chuàng)建DStream對象,需要socket服務(wù)的地址、端口號及存儲級別(默認的)valdstream:ReceiverInputDStream[String]=ssc.socketTextStream("192.168.121.134",9999)//6.按空格進行切分每一行,并將切分的單詞出現(xiàn)次數(shù)記錄為1valwordAndOne:DStream[(String,Int)]=dstream.flatMap(_.split("")).map(word=>(word,1))//7.調(diào)用updateStateByKey操作,統(tǒng)計單詞在全局中出現(xiàn)的次數(shù)

valwindowWords:DStream[(String,Int)]=wordAndOne.reduceByKeyAndWindow((a:Int,b:Int)=>(a+b),Seconds(3),Seconds(1))

//8.打印輸出結(jié)果

windowWords.print()

//9.開啟流式計算

ssc.start()

//10.讓程序一直運行,除非人為干預(yù)停止

ssc.awaitTermination()reduceByKeyAndWindow操作3測試reduceByKeyAndWindow功能實現(xiàn)。

//

1.在hadoop01開啟nc服務(wù),并按每秒輸入測試數(shù)據(jù):nc–lk9999aabbc

//2.運行ReduceByKeyAndWindowTest的程序查看結(jié)果Window()操作SaveAsTextFiles操作通過一個具體的案例來演示如何使用saveAsTextFiles()方法保存輸出的結(jié)果,具體步驟如下:1在”spark_chapter07”的項目下,創(chuàng)建SaveAsTextFilesTest2創(chuàng)建SparkConfig,SparkContext,StreamingContext對象,調(diào)用saveAsTextFiles()方法。3測試saveAsTextFiles功能實現(xiàn)。SaveAsTextFiles操作2創(chuàng)建SparkConfig,SparkContext,StreamingContext對象,調(diào)用saveAsTextFiles()方法。System.setProperty("HADOOP_USER_NAME","root")//1.創(chuàng)建SparkConf對象設(shè)置appName和master地址local[2]表示本地采用2個線程運行任務(wù)

valsparkConf:SparkConf=newSparkConf().setAppName("SaveAsTextFilesTest").setMaster("local[2]")

//2.創(chuàng)建SparkContext對象,它是所有任務(wù)計算的源頭,它會創(chuàng)建DAGScheduler和TaskSchedulervalsc:SparkContext=newSparkContext(sparkConf)//3.設(shè)置日志級別

sc.setLogLevel("WARN")//4.創(chuàng)建StreamingContext,需要2個參數(shù),一個是SparkContext,一個是批處理的時間間隔

valssc:StreamingContext=newStreamingContext(sc,Seconds(5))

SaveAsTextFiles操作2創(chuàng)建SparkConfig,SparkContext,StreamingContext對象,調(diào)用saveAsTextFiles()方法。

//5.對接socket數(shù)據(jù)創(chuàng)建DStream對象,需要socket服務(wù)的地址、端口號及存儲級別(默認的)

valdstream:ReceiverInputDStream[String]=ssc.socketTextStream("192.168.121.134",9999)//6.調(diào)用saveAsTextFiles操作,將nc窗口輸出的內(nèi)容保存到HDFS上

dstream.saveAsTextFiles("hdfs://hadoop01:9000/data/saveAsTextFiles/satf","txt")//7.開啟流式計算

ssc.start()

//8.讓程序一直運行,除非人為干預(yù)停止

ssc.awaitTermination()SaveAsTextFiles操作3測試saveAsTextFile功能實現(xiàn)。SaveAsTextFiles操作56小結(jié)熟悉Dstream的API掌握DStream的窗口操作通信數(shù)據(jù)分析與實戰(zhàn)SparkStreaming實時計算框架第七章第5節(jié)58掌握Kafka的2種創(chuàng)建Dstream方式學(xué)習(xí)目標(biāo)TARGETKafkaUtils.createDstream方式Kafka作為一個實時的分布式消息隊列,實時地生產(chǎn)和消費消息。在大數(shù)據(jù)計算框架中,可利用SparkStreaming實時讀取Kafka中的數(shù)據(jù),再進行相關(guān)計算。在Spark1.3版本后,KafkaUtils里面提供了兩個創(chuàng)建DStream的方式,一種是KafkaUtils.createDstream方式,另一種為KafkaUtils.createDirectStream方式。KafkaUtils.createDstream方式KafkaUtils.createDstream是通過Zookeeper連接Kafka,receivers接收器從Kafka中獲取數(shù)據(jù),并且所有receivers獲取到的數(shù)據(jù)都會保存在Spark

executors中,然后通過SparkStreaming啟動job來處理這些數(shù)據(jù)。createDstream方式實現(xiàn)1導(dǎo)入項目依賴2創(chuàng)建名稱為”SparkStreaming_Kafka_createDstream”類,實現(xiàn)詞頻統(tǒng)計3創(chuàng)建Topic主題,指定消息類別

4啟動KafKa的消息生產(chǎn)者,測試程序輸出SparkStreaming整合Kafka實戰(zhàn)1.導(dǎo)入依賴#添加SparkStreaming整合Kafka的依賴<dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming-kafka_0-8_2.11</artifactId> <version>2.3.2</version></dependency>2.創(chuàng)建Scala類,實現(xiàn)詞頻統(tǒng)計在spark_chapter07項目的/src/main/scala/cn.it.dstream目錄下,創(chuàng)建一個名為“SparkStreaming_Kafka_createDstream”的Scala類,用來編寫SparkStreaming應(yīng)用程序?qū)崿F(xiàn)詞頻統(tǒng)計。SparkStreaming整合Kafka實戰(zhàn)//1.創(chuàng)建sparkConf,并開啟wal預(yù)寫日志,保存數(shù)據(jù)源valsparkConf:SparkConf=newSparkConf().setAppName("SparkStreaming_Kafka_createDstream").setMaster("local[4]").set("spark.streaming.receiver.writeAheadLog.enable","true")//2.創(chuàng)建sparkContextvalsc=newSparkContext(sparkConf)

//3.設(shè)置日志級別sc.setLogLevel("WARN")

//3.創(chuàng)建StreamingContextvalssc=newStreamingContext(sc,Seconds(5))

//4.設(shè)置checkpointssc.checkpoint("./Kafka_Receiver")

//5

溫馨提示

  • 1. 本站所有資源如無特殊說明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請下載最新的WinRAR軟件解壓。
  • 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請聯(lián)系上傳者。文件的所有權(quán)益歸上傳用戶所有。
  • 3. 本站RAR壓縮包中若帶圖紙,網(wǎng)頁內(nèi)容里面會有圖紙預(yù)覽,若沒有圖紙預(yù)覽就沒有圖紙。
  • 4. 未經(jīng)權(quán)益所有人同意不得將文件中的內(nèi)容挪作商業(yè)或盈利用途。
  • 5. 人人文庫網(wǎng)僅提供信息存儲空間,僅對用戶上傳內(nèi)容的表現(xiàn)方式做保護處理,對用戶上傳分享的文檔內(nèi)容本身不做任何修改或編輯,并不能對任何下載內(nèi)容負責(zé)。
  • 6. 下載文件中如有侵權(quán)或不適當(dāng)內(nèi)容,請與我們聯(lián)系,我們立即糾正。
  • 7. 本站不保證下載資源的準(zhǔn)確性、安全性和完整性, 同時也不承擔(dān)用戶因使用這些下載資源對自己和他人造成任何形式的傷害或損失。

評論

0/150

提交評論