大數(shù)據(jù)導論思維、技術與應用第12章SPARKSQL課件_第1頁
大數(shù)據(jù)導論思維、技術與應用第12章SPARKSQL課件_第2頁
大數(shù)據(jù)導論思維、技術與應用第12章SPARKSQL課件_第3頁
大數(shù)據(jù)導論思維、技術與應用第12章SPARKSQL課件_第4頁
大數(shù)據(jù)導論思維、技術與應用第12章SPARKSQL課件_第5頁
已閱讀5頁,還剩57頁未讀 繼續(xù)免費閱讀

下載本文檔

版權說明:本文檔由用戶提供并上傳,收益歸屬內(nèi)容提供方,若內(nèi)容存在侵權,請進行舉報或認領

文檔簡介

1、 大數(shù)據(jù)導論第十二章CONTENTS目錄PART 01 SPARK SQL簡介PART 02 SPARK SQL執(zhí)行流程PART 03 基礎數(shù)據(jù)模型DATAFRAMEPART 04 使用Spark SQL的方式PART 05 SPARK SQL數(shù)據(jù)源PART 06 SPARK SQL CLI介紹PART 07在Pyspark中使用Spark SQLPART 08 在Java中連接Spark SQLPART 09 習題PART 01 Spark SQL簡介Spark SQL是一個用來處理結(jié)構(gòu)化數(shù)據(jù)的Spark組件,為Spark提供了查詢結(jié)構(gòu)化數(shù)據(jù)的能力。Spark SQL可被視為一個分布式的SQ

2、L查詢引擎,可以實現(xiàn)對多種數(shù)據(jù)格式和數(shù)據(jù)源進行SQL操作,包括Parquet,Hive,MongoDB,JSON、HDFS、JDBC、S3和RDD等。Spark SQL簡介Spark SQL介紹:Spark SQL是為了處理結(jié)構(gòu)化數(shù)據(jù)的一個Spark 模塊。不同于Spark RDD的基本API,Spark SQL接口擁有更多關于數(shù)據(jù)結(jié)構(gòu)本身與執(zhí)行計劃等更多信息。在Spark內(nèi)部,Spark SQL可以利用這些信息更好地對操作進行優(yōu)化。Spark SQL提供了三種訪問接口:SQL,DataFrame API和Dataset API。當計算引擎被用來執(zhí)行一個計算時,有不同的API和語言種類可供選擇

3、。這種統(tǒng)一性意味著開發(fā)人員可以來回輕松切換各種最熟悉的API來完成同一個計算工作。Spark SQL簡介Spark SQL具有如下特點數(shù)據(jù)兼容方面:能加載和查詢來自各種來源的數(shù)據(jù)。 性能優(yōu)化方面:除了采取內(nèi)存列存儲、代碼生成等優(yōu)化技術外,還引進成本模型對查詢進行動態(tài)評估、獲取最佳物理計劃等; 組件擴展方面:無論是SQL的語法解析器、分析器還是優(yōu)化器都可以重新定義,進行擴展。標準連接:Spark SQL包括具有行業(yè)標準JDBC和ODBC連接的服務器模式。Spark SQL簡介Spark SQL具有如下特點集成:無縫地將SQL查詢與Spark程序混合。 Spark SQL允許將結(jié)構(gòu)化數(shù)據(jù)作為Spa

4、rk中的分布式數(shù)據(jù)集(RDD)進行查詢,在Python,Scala和Java中集成了API。這種緊密的集成使得SQL查詢以及復雜的分析算法可以輕松地運行??蓴U展性:對于交互式查詢和長查詢使用相同的引擎。Spark SQL利用RDD模型來支持查詢?nèi)蒎e,使其能夠擴展到大型作業(yè),不需擔心為歷史數(shù)據(jù)使用不同的引擎。PART 02 Spark SQL執(zhí)行流程Spark SQL執(zhí)行流程類似于關系型數(shù)據(jù)庫,Spark SQL語句也是由Projection(a1,a2,a3)、 Data Source (tableA)、 Filter(condition)三部分組成,分別對應SQL查詢過程中的Result、D

5、ata Source、 Operation,也就是說SQL語句按Result-Data Source-Operation的次序來描述的。Spark SQL執(zhí)行流程解析(Parse)對讀入的SQL語句進行解析,分辨出SQL語句中哪些詞是關鍵詞(如SELECT、 FROM、WHERE),哪些是表達式、哪些是 Projection、哪些是 Data Source 等,從而判斷SQL語句是否規(guī)范; 綁定(Bind)將SQL語句和數(shù)據(jù)庫的數(shù)據(jù)字典(列、表、視圖等)進行綁定,如果相關的Projection、Data Source等都存在,則這個SQL語句是可以執(zhí)行的; Spark SQL執(zhí)行流程優(yōu)化(Op

6、timize)一般的數(shù)據(jù)庫會提供幾個執(zhí)行計劃,這些計劃一般都有運行統(tǒng)計數(shù)據(jù),數(shù)據(jù)庫會在這些計劃中選擇一個最優(yōu)計劃; 執(zhí)行(Execute)按Operation-Data Source-Result 的次序來執(zhí)行計劃。在執(zhí)行過程有時候甚至不需要讀取物理表就可以返回結(jié)果,比如重新運行剛運行過的SQL語句,可能直接從數(shù)據(jù)庫的緩沖池中獲取返回結(jié)果。PART 03 基礎數(shù)據(jù)模型DataFrameDataFrame是由“命名列”(類似關系表的字段定義)所組織起來的一個分布式數(shù)據(jù)集合,可以把它看成是一個關系型數(shù)據(jù)庫的表?;A數(shù)據(jù)模型DataFrameDataFrame是Spark SQL的核心,它將數(shù)據(jù)保存

7、為行構(gòu)成的集合,行對應列有相應的列名。DataFrame與RDD的主要區(qū)別在于,DataFrame帶有Schema元信息,即DataFrame所表示的二維表數(shù)據(jù)集的每一列都帶有名稱和類型。這使得Spark SQL可以掌握更多的結(jié)構(gòu)信息,從而能夠?qū)ataFrame背后的數(shù)據(jù)源以及作用于DataFrame之上的變換進行了針對性的優(yōu)化,最終達到大幅提升運行時效率的目標?;A數(shù)據(jù)模型DataFrameDataFrame與RDD的對比:PART 04 使用Spark SQL的方式使用Spark SQL的方式使用Spark SQL,首先利用sqlContext從外部數(shù)據(jù)源加載數(shù)據(jù)為DataFrame;然

8、后,利用DataFrame上豐富的API進行查詢、轉(zhuǎn)換;最后,將結(jié)果進行展現(xiàn)或存儲為各種外部數(shù)據(jù)形式。SparkSQL為Spark提供了查詢結(jié)構(gòu)化數(shù)據(jù)的能力,查詢時既可以使用SQL也可以使用DataFrameAPI(RDD)。通過Thrift Server,SparkSQL支持多語言編程包括Java、Scala、Python及R。使用Spark SQL的方式使用Spark SQL的方式加載數(shù)據(jù). 從Hive中的users表構(gòu)造DataFrame:users = sqlContext.table(users). 加載S3上的JSON文件:logs = sqlContext.load(s3n:/p

9、ath/to/data.json, json). 加載HDFS上的Parquet文件:clicks = sqlContext.load(hdfs:/path/to/data.parquet, parquet)使用Spark SQL的方式加載數(shù)據(jù). 通過JDBC訪問MySQL:comments = sqlContext.jdbc(jdbc:mysql:/localhost/comments, user). 將普通RDD轉(zhuǎn)變?yōu)镈ataFrame:rdd = sparkContext.textFile(“article.txt”) .flatMap(_.split( ) .map(_, 1) .re

10、duceByKey(_+_) wordCounts = sqlContext.createDataFrame(rdd, word, count)使用Spark SQL的方式加載數(shù)據(jù). 將本地數(shù)據(jù)容器轉(zhuǎn)變?yōu)镈ataFrame:data = (Alice, 21), (Bob, 24)people = sqlContext.createDataFrame(data, name, age). 將PandasDataFrame轉(zhuǎn)變?yōu)镾parkDataFrame(PythonAPI特有功能):sparkDF=sqlContext.createDataFrame(pandasDF)使用Spark SQL的

11、方式使用DataFrame. 創(chuàng)建一個只包含年輕用戶的DataFrame :young = users.filter(users.age 21) . 也可以使用Pandas風格的語法: young = usersusers.age = 13 AND age = 19)teenagers.show()Parquet文件數(shù)據(jù)源JSON DataSets 數(shù)據(jù)源JSON DataSets 數(shù)據(jù)源Spark SQL可以自動根據(jù)JSON DataSet的格式把其上載為DataFrame。用路徑指定JSON dataset;路徑下可以是一個文件,也可以是多個文件:sc = spark.sparkConte

12、xtpath = examples/src/main/resources/people.jsonpeopleDF = spark.read.json(path)使用的結(jié)構(gòu)可以調(diào)用printSchema()方法打?。簆eopleDF.printSchema()利用DataFrame創(chuàng)建一個臨時表:使用Spark的sql方法進行SQL查詢:peopleDF.createOrReplaceTempView(people)teenagerNamesDF = spark.sql(SELECT name FROM people WHERE age BETWEEN 13 AND 19)teenagerNam

13、esDF.show()JSON DataSets 數(shù)據(jù)源JSON dataset的DataFrame也可以是RDDString 格式,每個JSON對象為一個string:jsonStrings = name:Yin,address:city:Columbus,state:OhiootherPeopleRDD = sc.parallelize(jsonStrings)otherPeople = spark.read.json(otherPeopleRDD)otherPeople.show()JSON DataSets 數(shù)據(jù)源Hive表數(shù)據(jù)源Hive表數(shù)據(jù)源Spark SQL支持對Hive中的數(shù)據(jù)

14、進行讀寫。首先創(chuàng)建一個支持Hive的SparkSession對象,包括與Hive metastore的連接,支持Hive的序列化和反序列化操作,支持用戶定義的Hive操作等。warehouse_location = abspath(spark-warehouse)spark = SparkSession .builder .appName(Python Spark SQL Hive integration example) .config(spark.sql.warehouse.dir, warehouse_location) .enableHiveSupport() .getOrCreate

15、()warehouse_location 指定數(shù)據(jù)庫和表的缺省位置:Hive表數(shù)據(jù)源spark.sql(CREATE TABLE IF NOT EXISTS src (key INT, value STRING) USING hive)spark.sql(LOAD DATA LOCAL INPATH examples/src/main/resources/kv1.txt INTO TABLE src)基于新創(chuàng)建的SparkSession創(chuàng)建表和上載數(shù)據(jù)到表中:spark.sql(SELECT * FROM src).show()spark.sql(SELECT COUNT(*) FROM sr

16、c).show()使用HiveQL進行查詢:Hive表數(shù)據(jù)源sqlDF = spark.sql(SELECT key, value FROM src WHERE key val sqlContext = new org.apache.spark.sql.SQLContext(sc)sqlContext: org.apache.spark.sql.SQLContext = org.apache.spark.sql.SQLContext1943a343scala import sqlContext.implicits._import sqlContext.implicits._SQLContext

17、Spark SQL CLI介紹下面的操作基于一個簡單的數(shù)據(jù)文件people.json,文件的內(nèi)容如下:name:Michaelname:Andy, age:30name:Justin, age:19數(shù)據(jù)文件下面語句從本地文件people.json讀取數(shù)據(jù)創(chuàng)建DataFrame:val df = sqlContext.read.json(file:/data/people. json)df: org.apache.spark.sql.DataFrame = age: bigint, name: string創(chuàng)建DataFramesPyspark是針對Spark的Python API。Spark使

18、用py4j來實現(xiàn)Python與Java的互操作,從而實現(xiàn)使用Python編寫Spark程序。Spark也同樣提供了Pyspark,一個Spark的Python Shell,可以以交互的方式使用Python編寫Spark程序。PART 07 在Pyspark中使用Spark SQL在Pyspark中使用Spark SQL在終端上啟動PythonSparkShell:./bin/pyspark使用JSON文件作為數(shù)據(jù)源,創(chuàng)建JSON文件/home/sparksql/courses.json,并輸入下面的內(nèi)容:實例描述name:Linux, type:basic, length:10name:TCP

19、IP, type:project, length:15name:Python, type:project, length:8name:GO, type:basic, length:2name:Ruby, type:basic, length:5在Pyspark中使用Spark SQL首先使用SQLContext模塊,其作用是提供Spark SQL處理的功能。在Pyspark Shell中逐步輸入下面步驟的內(nèi)容:引入pyspark.sql中的SQLContext:from pyspark.sql import SQLContext創(chuàng)建SQLContext對象使用pyspark的SparkCont

20、ext對象,創(chuàng)建SQLContext對象:sqlContext = SQLContext(sc)在Pyspark中使用Spark SQLDataFrame對象可以由RDD創(chuàng)建,也可以從Hive表或JSON文件等數(shù)據(jù)源創(chuàng)建。創(chuàng)建DataFrame,指明來源自JSON文件:df = sqlContext.read.json(/home/shiyanlou/courses.json)創(chuàng)建DataFrame對象在Pyspark中使用Spark SQL首先打印當前DataFrame里的內(nèi)容和數(shù)據(jù)表的格式:df.select(name).show()#展示了所有的課程名df.select(name, le

21、ngth).show()#展示了所有的課程名及課程長度對DataFrame進行操作show()函數(shù)將打印出JSON文件中存儲的數(shù)據(jù)表;使用printSchema()函數(shù)打印數(shù)據(jù)表的格式。然后對DataFrame的數(shù)據(jù)進行各種操作:df.show() df.printSchema()在Pyspark中使用Spark SQLdf.filter(dftype = basic).select(name, type).show()#展示了課程類型為基礎課(basic)的課程名和課程類型df.groupBy(type).count().show()#計算所有基礎課和項目課的數(shù)量。首先需要將DataFram

22、e注冊為Table才可以在該表上執(zhí)行SQL語句:df.registerTempTable(courses)coursesRDD = sqlContext.sql(SELECT name FROM courses WHERE length = 5 and length = 10)names = coursesRDD.rdd.map(lambda p: Name: + )for name in names.collect(): print name執(zhí)行SQL語句在Pyspark中使用Spark SQLParquet是Spark SQL讀取的默認數(shù)據(jù)文件格式,把從JSON中讀取的Data

23、Frame保存為Parquet格式,只保存課程名稱和長度兩項數(shù)據(jù):df.select(name, length).write.save(/tmp/courses.parquet, format=parquet)保存 DataFrame為其他格式將創(chuàng)建hdfs:/master:9000/tmp/courses.parquet文件夾并存入課程名稱和長度數(shù)據(jù)。Spark SQL實現(xiàn)了Thrift JDBC/ODBC server,所以Java程序可以通過JDBC遠程連接Spark SQL發(fā)送SQL語句并執(zhí)行。PART 08 在Java中連接Spark SQL在Java中連接Spark SQL首先將$

24、HIVE_HOME/conf/hive-site.xml 拷貝到$SPARK_HOME/conf目錄下。另外,因為Hive元數(shù)據(jù)信息存儲在MySQL中,所以Spark在訪問這些元數(shù)據(jù)信息時需要MySQL連接驅(qū)動的支持。添加驅(qū)動的方式有三種:在$SPARK_HOME/conf目錄下的spark-defaults.conf中添加:spark.jars /opt/lib2/mysql-connector-java-5.1.26-bin.jar;可以實現(xiàn)添加多個依賴jar比較方便:spark.driver.extraClassPath /opt/lib2/mysql-connector-java-5.

25、1.26-bin.jar;設置配置在運行時添加 -jars/opt/lib2/mysql-connector-java-5.1.26-bin.jar做完上面的準備工作后,Spark SQL和Hive就繼承在一起了,Spark SQL可以讀取Hive中的數(shù)據(jù)。設置配置啟動Thrift在Spark根目錄下執(zhí)行:./sbin/start-thriftserver.sh開啟thrift服務器,它可以接受所有spark-submit的參數(shù),并且還可以接受-hiveconf 參數(shù)。不添加任何參數(shù)表示以local方式運行,默認的監(jiān)聽端口為10000 在Java中連接Spark SQL添加依賴打開Eclips

26、e用JDBC連接Hive Server2。新建一個Maven項目,在pom.xml添加以下依賴:org.apache.hivehive-jdbc1.2.1org.apache.hadoophadoop-common2.4.1在Java中連接Spark SQL添加依賴jdk.toolsjdk.tools1.6system$JAVA_HOME/lib/tools.jar在Java中連接Spark SQLJDBC連接Hive Server2的相關參數(shù):驅(qū)動:org.apache.hive.jdbc.HiveDriverurl:jdbc:hive2:/31:10000/default用戶名:hadoop (啟動thriftserver的linux用戶名)密碼:“”(默認密碼為空)JDBC連接參數(shù)在Java中連接Spark SQLimportjava.sql.Connection;importjava.sql.DriverManager;importjava.sql.ResultSet;importjava.sql.SQLException;importjava.sql.Statement;publicclassTest1publicstaticvoidmain(Stringargs)throwsSQLExceptionStringurl=jdbc:hive2:

溫馨提示

  • 1. 本站所有資源如無特殊說明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請下載最新的WinRAR軟件解壓。
  • 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請聯(lián)系上傳者。文件的所有權益歸上傳用戶所有。
  • 3. 本站RAR壓縮包中若帶圖紙,網(wǎng)頁內(nèi)容里面會有圖紙預覽,若沒有圖紙預覽就沒有圖紙。
  • 4. 未經(jīng)權益所有人同意不得將文件中的內(nèi)容挪作商業(yè)或盈利用途。
  • 5. 人人文庫網(wǎng)僅提供信息存儲空間,僅對用戶上傳內(nèi)容的表現(xiàn)方式做保護處理,對用戶上傳分享的文檔內(nèi)容本身不做任何修改或編輯,并不能對任何下載內(nèi)容負責。
  • 6. 下載文件中如有侵權或不適當內(nèi)容,請與我們聯(lián)系,我們立即糾正。
  • 7. 本站不保證下載資源的準確性、安全性和完整性, 同時也不承擔用戶因使用這些下載資源對自己和他人造成任何形式的傷害或損失。

評論

0/150

提交評論