第4章 Spark SQL結(jié)構(gòu)化數(shù)據(jù)處理_第1頁(yè)
第4章 Spark SQL結(jié)構(gòu)化數(shù)據(jù)處理_第2頁(yè)
第4章 Spark SQL結(jié)構(gòu)化數(shù)據(jù)處理_第3頁(yè)
第4章 Spark SQL結(jié)構(gòu)化數(shù)據(jù)處理_第4頁(yè)
第4章 Spark SQL結(jié)構(gòu)化數(shù)據(jù)處理_第5頁(yè)
已閱讀5頁(yè),還剩41頁(yè)未讀, 繼續(xù)免費(fèi)閱讀

下載本文檔

版權(quán)說(shuō)明:本文檔由用戶提供并上傳,收益歸屬內(nèi)容提供方,若內(nèi)容存在侵權(quán),請(qǐng)進(jìn)行舉報(bào)或認(rèn)領(lǐng)

文檔簡(jiǎn)介

SparkSQL結(jié)構(gòu)化數(shù)據(jù)處理1

SparkSQL概述2 創(chuàng)建DataFrame對(duì)象的方式3

將DataFrame保存為不同格式的文件4DataFrame的常用操作5使用SparkSQL讀寫MySQL數(shù)據(jù)庫(kù)1SparkSQL概述SparkSQL簡(jiǎn)介SparkSQL是Spark用來(lái)處理結(jié)構(gòu)化數(shù)據(jù)的一個(gè)組件,可被視為一個(gè)分布式的SQL查詢引擎。SparkSQL已經(jīng)集成在PySparkShell中,在Spark2.0版本之前,通過(guò)在終端執(zhí)行pyspark命令進(jìn)入PySpark交互編程界面,啟動(dòng)后會(huì)初始化SQLContext對(duì)象為sqlContext,sqlContext對(duì)象是創(chuàng)建DataFrame對(duì)象和執(zhí)行SQL的入口。在Spark2.0版本之后,Spark使用SparkSession代替SQLContext,啟動(dòng)PySparkShell交互界面后,會(huì)初始化SparkSession對(duì)象為spark。SparkSQL可以直接處理RDD、Parquet文件或者JSON文件,甚至可以處理外部數(shù)據(jù)庫(kù)中的數(shù)據(jù)以及Hive中存在的表。SparkSQL提供了DataFrame和DataSet抽象數(shù)據(jù)模型。1SparkSQL概述DataFrame與DatasetSparkSQL所使用的數(shù)據(jù)抽象并非RDD,而是DataFrame。DataFrame是以列(列名,列類型,列值)的形式構(gòu)成的分布式的數(shù)據(jù)集。DataFrame是SparkSQL提供的最核心的數(shù)據(jù)抽象,以列的形式組織的分布式的數(shù)據(jù)集合。DataFrame的推出,讓Spark具備了處理大規(guī)模結(jié)構(gòu)化數(shù)據(jù)的能力,它不僅比原有的RDD轉(zhuǎn)化方式更加簡(jiǎn)單易用,而且獲得了更高的計(jì)算性能。DataFrame和普通的RDD的邏輯框架區(qū)別如圖所示:在RDD中,只能看出每個(gè)Row對(duì)象的類型Person類型,而在DataFrame中,可以看出每個(gè)Row對(duì)象包含Name、Age、Height三個(gè)字段。如只需處理Age那一列數(shù)據(jù)時(shí),RDD需要處理整個(gè)Person的數(shù)據(jù),而對(duì)于DataFrame,則可以只取Age這一列數(shù)據(jù)。

SparkSQL結(jié)構(gòu)化數(shù)據(jù)處理1 SparkSQL概述2 創(chuàng)建DataFrame對(duì)象的方式3

將DataFrame保存為不同格式的文件4DataFrame的常用操作5使用SparkSQL讀寫MySQL數(shù)據(jù)庫(kù)2創(chuàng)建DataFrame對(duì)象的方式使用Parquet文件創(chuàng)建DataFrame對(duì)象SparkSQL最常見(jiàn)的結(jié)構(gòu)化數(shù)據(jù)文件格式是Parquet格式或JSON格式。SparkSQL可以通過(guò)load()方法將HDFS上的格式化文件轉(zhuǎn)換為DataFrame,load()默認(rèn)導(dǎo)入的文件格式是Parquet,Parquet是面向分析型業(yè)務(wù)的列式存儲(chǔ)格式。在Spark1.x版本,通過(guò)執(zhí)行“dfUsers=sqlContext.read.load("/user/hadoop/users.parquet")”命令可將HFDS上的Parquet格式的文件users.parquet轉(zhuǎn)換為DataFrame對(duì)象dfUsers。users.parquet文件可在Spark安裝包下的/examples/src/main/resources/目錄下找到如圖所示)。2創(chuàng)建DataFrame對(duì)象的方式使用Parquet文件創(chuàng)建DataFrame對(duì)象在Spark2.0之后,SparkSession封裝了SparkContext,SqlContext,通過(guò)SparkSession可以獲取到SparkConetxt、SqlContext對(duì)象。使用users.parquet創(chuàng)建DataFrame對(duì)象的命令:>>>usersDF=spark.read.load("file:/home/hadoop/sparkdata/users.parquet")>>>usersDF.show()#展示usersDF中的數(shù)據(jù)+------+--------------+----------------+|name|favorite_color|favorite_numbers|+------+--------------+----------------+|Alyssa|null|[3,9,15,20]||Ben|red|[]|+------+--------------+----------------+啟動(dòng)PySparkShell交互界面后會(huì)初始化SparkSession對(duì)象為spark,通過(guò)spark.read.load()方法可將parquet格式的users.parquet文件轉(zhuǎn)化為DataFrame對(duì)象。復(fù)制Spark安裝包下的users.parquet、people.csv、people.json、people.txt文件到/home/hadoop/sparkdata目錄下2創(chuàng)建DataFrame對(duì)象的方式使用JSON文件創(chuàng)建DataFrame對(duì)象在PySparkShell交互界面下,通過(guò)spark.read.format("json").load()方法可將JSON文件轉(zhuǎn)換為DataFrame對(duì)象。people.json文件的內(nèi)容如圖所示。#使用people.json文件創(chuàng)建DataFrame對(duì)象>>>dfGrade=spark.read.format("json").load("file:/home/hadoop/sparkdata/people.json")>>>dfGrade.show()+----+-------+|age|name|+----+-------+|null|Michael||30|Andy||19|Justin|+----+-------+2創(chuàng)建DataFrame對(duì)象的方式使用SparkSession方式創(chuàng)建DataFrame從Spark2.0開(kāi)始,引入SparkSession作為DataSet和DataFrameAPI的切入點(diǎn),SparkSession封裝了SparkConf、SparkContext和SQLContext。為了向后兼容,SQLContext和HiveContext也被保存下來(lái)。在實(shí)際寫程序時(shí),只需要定義一個(gè)SparkSession對(duì)象就可以了。創(chuàng)建SparkSession對(duì)象的代碼如下。frompyspark.sqlimportSparkSessionspark=SparkSession.builder.appName("ccc")\.config("spark.some.config.option","some-value")\.master("local[*]")\.enableHiveSupport()\#連接Hive時(shí)需要這句該方法.getOrCreate()#使用builder方式必須有該方法2創(chuàng)建DataFrame對(duì)象的方式使用SparkSession方式創(chuàng)建DataFrame參數(shù)說(shuō)明如表所示。通過(guò)builder創(chuàng)建SparkSession對(duì)象后,就可以調(diào)用他的方法和屬性進(jìn)行更多的操作了。2創(chuàng)建DataFrame對(duì)象的方式使用SparkSession方式創(chuàng)建DataFrame調(diào)用createDataFrame(data,schema)方法創(chuàng)建DataFramecreateDataFrame(data,schema)中的data用來(lái)指定創(chuàng)建DataFrame對(duì)象的數(shù)據(jù),可以是RDD、Python的列表list或Pandas的DataFrame對(duì)象;schema用來(lái)指定DataFrame的數(shù)據(jù)模式,可以是pyspark.sql.types類型指定的字段名和字段名數(shù)據(jù)類型的列表。使用range(start,end,step,numPartitions)方法創(chuàng)建一個(gè)列名為id的DataFrame使用spark.read.***()方法從不同類型的文件中加載數(shù)據(jù)創(chuàng)建DataFrame使用spark.read.***()方法從不同類型的文件中加載數(shù)據(jù)創(chuàng)建DataFrame對(duì)象,具體方法如表所示。

SparkSQL結(jié)構(gòu)化數(shù)據(jù)處理1 SparkSQL概述2 創(chuàng)建DataFrame對(duì)象的方式3將DataFrame保存為不同格式的文件4DataFrame的常用操作5使用SparkSQL讀寫MySQL數(shù)據(jù)庫(kù)3將DataFrame保存為不同格式的文件通過(guò)write.***()方法保存DataFrame對(duì)象可以使用DataFrame對(duì)象的write.***()方法將DataFrame對(duì)象保存為***格式的文件。保存為JSON格式的文件#創(chuàng)建DataFrame對(duì)象>>>grade1DF=spark.read.json("file:/home/hadoop/sparkdata/grade.json")#將grade1DF保存為JSON格式的文件>>>grade1DF.write.json("file:/home/hadoop/grade1.json")如果需讀取/home/hadoop/grade1.json中的數(shù)據(jù)生成DataFrame對(duì)象,可使用grade1.json目錄名稱,而不需要使用part-00000-41e0749a-ef50-4337-bcec-03339b94fa8b-c000.json文件>>>grade2DF=spark.read.json("file:/home/hadoop/grade1.json")保存為parquet格式的文件通過(guò)如下命令將grade1DF保存為parquet格式的文件。>>>grade1DF.write.parquet("file:/home/hadoop/grade1.parquet")3將DataFrame保存為不同格式的文件通過(guò)write.***()方法保存DataFrame對(duì)象保存為csv格式文件通過(guò)如下命令將grade1DF保存為csv格式的文件。>>>grade1DF.write.csv("file:/home/hadoop/grade1.csv")通過(guò)write.format()方法保存DataFrame對(duì)象通過(guò)write.format()方法可將DataFrame對(duì)象保存成JSON格式的文件、parquet格式的文件和csv格式的文件。保存成JSON格式文件使用下述語(yǔ)句將DataFrame對(duì)象grade1DF保存成JSON格式的文件。>>>grade1DF.write.format("json").save("file:/home/hadoop/grade2.json")保存成parquet格式文件使用下述語(yǔ)句將DataFrame對(duì)象grade1DF保存成parquet格式的文件。>>>grade1DF.write.format("parquet").save("file:/home/hadoop/grade2.parquet")3將DataFrame保存為不同格式的文件通過(guò)write.format()方法保存DataFrame對(duì)象保存成csv格式文件使用下述語(yǔ)句將DataFrame對(duì)象grade1DF保存成csv格式的文件。。>>>grade1DF.write.format("csv").save("file:/home/hadoop/grade2.csv")先將DataFrame轉(zhuǎn)化成RDD再保存到文件中通過(guò)grade1DF.rdd.saveAsTextFile("file:/")將grade1DF先轉(zhuǎn)化成RDD,然后再寫入文本文件。>>>grade1DF.rdd.saveAsTextFile("file:/home/hadoop/grade")

SparkSQL結(jié)構(gòu)化數(shù)據(jù)處理1 SparkSQL概述2 創(chuàng)建DataFrame對(duì)象的方式3將DataFrame保存為不同格式的文件4DataFrame的常用操作5使用SparkSQL讀寫MySQL數(shù)據(jù)庫(kù)4DataFrame的常用操作行類Row操作行類Row的完整格式pyspark.sql.Row??墒褂肦ow類的對(duì)象創(chuàng)建DataFrame。>>>frompyspark.sqlimportfunctionsasf>>>frompyspark.sqlimportRow>>>row1=Row(name='Wang',spark=89,python=85)#創(chuàng)建Row類的對(duì)象>>>type(row1)#查看row1的類型<class'pyspark.sql.types.Row'>>>>row2=Row(name='Li',spark=95,python=86)>>>row3=Row(name='Ding',spark=90,python=88)>>>rdd=sc.parallelize([row1,row2,row3])#利用Row對(duì)象創(chuàng)建RDD>>>df=rdd.toDF()#將RDD對(duì)象轉(zhuǎn)換為DataFrame,可以指定新的列名>>>df.show()+----+-----+------+|name|spark|python|+----+-----+------+|Wang|89|85||Li|95|86||Ding|90|88|+----+-----+------+4DataFrame的常用操作行類Row操作也可以通過(guò)Row類的對(duì)象的列表直接創(chuàng)建DataFrame>>>DataFrame2=spark.createDataFrame([row1,row2,row3])調(diào)用Row類的對(duì)象的asDict()方法將其轉(zhuǎn)換為字典對(duì)象。>>>row1.asDict(){'name':'Wang','spark':89,'python':85}列類Column操作Column類的對(duì)象用來(lái)創(chuàng)建DataFrame對(duì)象中的列,Column的完整格式pyspark.sql.Column。調(diào)用列的alias()方法對(duì)輸出的列重命名輸出DataFrame中的列時(shí),調(diào)用列的alias()方法可對(duì)輸出的列重命名。例如:>>>df.select('name',df.spark.alias("SPARK")).show(2)#對(duì)spark列重命名+----+-----+|name|SPARK|+----+-----+|Wang|89||Li|95|+----+-----+4DataFrame的常用操作列類Column操作對(duì)列進(jìn)行排列調(diào)用asc()方法返回列的升序排列,調(diào)用desc()方法返回列的降序排列。例如,以spark的升序返回?cái)?shù)據(jù):>>>df.select(df.spark,df.python).orderBy(df.spark.asc()).show()+-----+------+|spark|python|+-----+------+|89|85||90|88||95|86|+-----+------+4DataFrame的常用操作列類Column操作改變列的數(shù)據(jù)類型調(diào)用astype()方法改變列的數(shù)據(jù)類型。例如:>>>df.select(df.spark.astype('string').alias('Spark')).collect()[Row(Spark='89'),Row(Spark='95'),Row(Spark='90')]按條件篩選between(lowerBound,upperBound)用于篩選出指定列的某個(gè)范圍內(nèi)的值,返回的是true或false。>>>df.select(,df.python.between(85,87)).show()+----+-----------------------------------+|name|((python>=85)AND(python<=87))|+----+-----------------------------------+|Wang|true||Li|true||Ding|false|+----+-----------------------------------+4DataFrame的常用操作列類Column操作按條件篩選when(condition,value1).otherwise(value2)用于對(duì)于指定的列根據(jù)條件condition重新賦值,滿足條件condition的值賦值為values1,不滿足條件的值賦值為values2。例如:4DataFrame的常用操作列類Column操作判斷列中是否包含特定的值可以用contains()方法判斷指定列中是否包含特定的值,返回的是true或false。獲取列中的子字符串可以用substr(startPos,length)方法獲取從startPos索引下標(biāo)開(kāi)始,長(zhǎng)度為length的子字符串。更改列的值可以用withColumn()方法更改列的值、轉(zhuǎn)換DataFrame中已存在的列的數(shù)據(jù)類型、添加或者創(chuàng)建一個(gè)新的列等。>>>frompyspark.sql.functionsimportcol>>>df3=df.withColumn("python",col("python")*100)#更改列的值#使用現(xiàn)有列添加新列>>>df4=df.withColumn("PYTHON",col("python")*10)4DataFrame的常用操作DataFrame的常用屬性首先在/home/hadoop目錄下創(chuàng)建grade.json文件,文件內(nèi)容如下。{"ID":"106","Name":"DingHua","Class":"1","Scala":92,"Spark":91}{"ID":"242","Name":"YanHua","Class":"2","Scala":96,"Spark":90}{"ID":"107","Name":"Feng","Class":"1","Scala":84,"Spark":91}{"ID":"230","Name":"WangYang","Class":"2","Scala":87,"Spark":91}{"ID":"153","Name":"ZhangHua","Class":"2","Scala":85,"Spark":92}{"ID":"235","Name":"WangLu","Class":"1","Scala":88,"Spark":92}{"ID":"224","Name":"MenTian","Class":"2","Scala":83,"Spark":90}然后使用grade.json文件創(chuàng)建DataFrame對(duì)象gradedf:>>>gradedf=spark.read.json("file:/home/hadoop/grade.json")下面用gradedf演示DataFrame對(duì)象的常用屬性。>>>gradedf.columns#以列表的形式列出所有列名['Class','ID','Name','Scala','Spark']>>>gradedf.dtypes#列出各列的數(shù)據(jù)類型[('Class','string'),('ID','string'),('Name','string'),('Scala','bigint'),('Spark','bigint')]4DataFrame的常用操作輸出show()方法輸出數(shù)據(jù)DataFrame對(duì)象show()方法用來(lái)以表格的形式輸出DataFrame對(duì)象中的數(shù)據(jù)。show()方法有3種調(diào)用方式。(1)show()默認(rèn)輸出前20條記錄。(2)show(numRows:Int)

輸出前numRows條

。(3)show(truncate=True)是否最多只輸出字段值前20個(gè)字符,默認(rèn)為True,最多只輸出前20個(gè)字符。為False時(shí)表示不進(jìn)行信息的縮略。collect()獲取所有數(shù)據(jù)到list列表不同于前面的show()方法,collect()方法以list列表的形式返回gradedf中的所有數(shù)據(jù),list中每個(gè)元素是Row類型。4DataFrame的常用操作輸出查看DataFrame對(duì)象的數(shù)據(jù)模式通過(guò)DataFrame對(duì)象的printSchema()方法,可查看一個(gè)DataFrame對(duì)象中有那些列,這些列是什么樣的數(shù)據(jù)類型,即打印出字段名稱和類型。>>>gradedf.printSchema()查看DataFrame對(duì)象的行數(shù)DataFrame對(duì)象的count()方法用來(lái)輸出DataFrame對(duì)象的行數(shù)。>>>gradedf.count()使用first()、head()、take()方法獲取若干行記錄(1)first()返回第一行記錄>>>gradedf.first()(2)head()獲取第一行記錄,head(n)獲取前n行記錄>>>gradedf.head(2)#獲取前2行記錄(3)take(n)獲取前n行記錄>>>gradedf.take(2)4DataFrame的常用操作輸出distinct()返回一個(gè)不包含重復(fù)記錄的DataFrame對(duì)象>>>gradedf.distinct().show()dropDuplicates()根據(jù)指定字段去重返回一個(gè)DataFrame對(duì)象根據(jù)指定字段去重。>>>gradedf.dropDuplicates(["Spark"]).show()#根據(jù)Spark字段去重where(conditionExpr)方法篩選根據(jù)條件表達(dá)式conditionExpr(字符串類型)篩選數(shù)據(jù),可以用and和or,相當(dāng)于SQL語(yǔ)言中where關(guān)鍵字后的條件,返回一個(gè)DataFrame對(duì)象。>>>gradedf.where("Class='1'andSpark='91'").show()篩選4DataFrame的常用操作篩選下標(biāo)運(yùn)算符[]獲取指定列指定一個(gè)列名,通過(guò)下標(biāo)運(yùn)算符[]返回Column類型數(shù)據(jù);指定多個(gè)列名,通過(guò)下標(biāo)運(yùn)算符[]返回DataFrame類型數(shù)據(jù)。>>>gradedf["Spark"]Column<'Spark'>>>gradedf.SparkColumn<'Spark'>>>>gradedf["Spark","Scala"]DataFrame[Spark:bigint,Scala:bigint]drop(ColumnNames)方法篩選去除指定字段,保留其他字段,返回一個(gè)新的DataFrame對(duì)象。>>>gradedf.drop("ID","Spark").show(3)limit(n)方法篩選獲取DataFrame的前n行記錄,得到一個(gè)新的DataFrame對(duì)象。4DataFrame的常用操作篩選select(ColumnNames)方法篩選根據(jù)傳入的字符串類型字段名,獲取指定字段的值,返回一個(gè)DataFrame對(duì)象。>>>gradedf.select("Class","Name","Scala").show(3,False)#輸出篩選的數(shù)據(jù)時(shí)對(duì)列進(jìn)行重命名>>>gradedf.select("Name","Scala").withColumnRenamed("Name","NAME").withColumnRenamed("Scala","SCALA").show(2)#alias(*alias)對(duì)列重命名>>>gradedf.select("Name",gradedf.Spark.alias("spark")).show(3)selectExpr(Expr)方法篩選可以直接對(duì)指定字段調(diào)用用戶自定義函數(shù),或者指定別名等。傳入字符串類型Expr參數(shù),返回一個(gè)DataFrame對(duì)象。>>>gradedf.selectExpr("Name","NameasNames","upper(Name)","Scala*10").show(3)4DataFrame的常用操作排序orderBy()和sort()排序方法orderBy()和sort()用來(lái)按指定字段排序,默認(rèn)為升序,返回一個(gè)DataFrame對(duì)象,兩種方法的用法相同。>>>gradedf.orderBy("Spark","Scala").show(5)>>>gradedf.sort("Spark","Scala",ascending=False).show(5)sortWithinPartitions()排序方法和上面的sort()方法功能類似,區(qū)別在于sortWithinPartitions()方法返回的是按Partition排好序的DataFrame對(duì)象。>>>gradedf.sortWithinPartitions("ID").show()匯總與聚合groupBy()匯總操作groupBy()按某些字段匯總(也稱分組),返回結(jié)果是GroupedData類型對(duì)象,GroupedData對(duì)象提供了很多操作分組數(shù)據(jù)的方法。4DataFrame的常用操作匯總與聚合groupBy()匯總操作groupBy()按某些字段匯總(也稱分組),返回結(jié)果是GroupedData類型對(duì)象,GroupedData對(duì)象提供了很多操作分組數(shù)據(jù)的方法。>>>gradedf.groupBy("Class")<pyspark.sql.group.GroupedDataobjectat0x7fa62b0521c0>>>>gradedf.groupBy("Class").count()DataFrame[Class:string,count:bigint](1)結(jié)合count()方法統(tǒng)計(jì)每組的記錄數(shù)>>>gradedf.groupBy("Class").count().show()(2)結(jié)合max(colNames)方法獲取分組指定字段colNames的最大值只能作用于數(shù)字型字段。>>>gradedf.groupBy("Class").max("Scala","Spark").show()(3)結(jié)合min(colNames)方法獲取分組指定字段的最小值只能作用于數(shù)字型字段。4DataFrame的常用操作匯總與聚合groupBy()匯總操作(4)結(jié)合sum(colNames)方法獲取分組指定字段的和值只能作用于數(shù)字型字段。>>>gradedf.groupBy("Class").sum("Spark","Scala").show()(5)結(jié)合mean(colNames)方法獲取分組指定字段的平均值只能作用于數(shù)字型字段。>>>gradedf.groupBy("Class").mean("Spark","Scala").show()agg()聚合操作agg()針對(duì)某列進(jìn)行聚合操作,返回DataFrame類型對(duì)象。agg()可以同時(shí)對(duì)多個(gè)列進(jìn)行操作,生成所需要的數(shù)據(jù)。>>>frompyspark.sqlimportfunctionsasf>>>gradedf.agg(f.min(gradedf.Spark),f.max(gradedf.Spark)).show()4DataFrame的常用操作統(tǒng)計(jì)describe()方法用來(lái)數(shù)字列和字符串列的基本統(tǒng)計(jì)信息,比如count、mean、stddev、min、max等,結(jié)果仍然為DataFrame對(duì)象。下面使用DataFrame對(duì)象的describe()方法用來(lái)獲取指定字段的統(tǒng)計(jì)信息。gradedf.describe().show()可以調(diào)用summary()方法計(jì)算數(shù)字列和字符串列的指定統(tǒng)計(jì)信息,如計(jì)數(shù)、標(biāo)準(zhǔn)差、最小值、最大值、四分之一分位數(shù)、四分之三分位數(shù)等。>>>gradedf.summary("count","min","max","25%","75%","mean").show()>>>gradedf.summary("count","min","max","25%","75%","mean").select("summary","Scala","Spark","Name").show()#顯示指定的列>>>gradedf.corr("Scala","Spark")#計(jì)算Scala列與Spark列之間的相關(guān)系數(shù)4DataFrame的常用操作合并unionAll(other:Dataframe)方法用于合并兩個(gè)DataFrame對(duì)象,unionAll()方法并不是按照列名和并,而是按照位置合并的,對(duì)應(yīng)位置的列將合并在一起,列名不同并不影響合并,兩個(gè)DataFrame對(duì)象的字段數(shù)必須相同。>>>gradedf.select("Name","Scala","Spark").unionAll(df.select("name","spark","python")).show()4DataFrame的常用操作連接DataFrame中提供了join()連接DataFrame對(duì)象的6種方式。笛卡爾積DataFrame對(duì)象調(diào)用join()方法求兩個(gè)DataFrame對(duì)象的笛卡爾積。>>>df1=spark.createDataFrame([("ZhangSan",86,88),("LiSi",90,85)]).toDF("name","Java","Python")#toDF()為列指定新名稱>>>df2=spark.createDataFrame([("ZhangSan",86,88),("LiSi",90,85),("WangWU",86,88),("WangFei",90,85)]).toDF("name","Java","Scala")>>>df1.join(df2).show()4DataFrame的常用操作連接DataFrame中提供了join()連接DataFrame對(duì)象的6種方式。一個(gè)字段形式的連接通過(guò)兩個(gè)DataFrame對(duì)象的一個(gè)相同字段將兩個(gè)DataFrame對(duì)象連接起來(lái)。>>>df1.join(df2,"name").show()#"name"是df1和df2相同的字段+--------+----+------+----+-----+|name|Java|Python|Java|Scala|+--------+----+------+----+-----+|LiSi|90|85|90|85||ZhangSan|86|88|86|88|+--------+----+------+----+-----+4DataFrame的常用操作連接多個(gè)字段形式的連接通過(guò)兩個(gè)DataFrame對(duì)象的多個(gè)相同字段將兩個(gè)DataFrame對(duì)象連接起來(lái)。#"name"、"Java"是df1和df2相同的字段>>>df1.join(df2,["name","Java"]).show()指定join類型的連接兩個(gè)DataFrame對(duì)象的join連接有inner,outer,left_outer,right_outer,leftsemi類型。在上面的用多個(gè)字段join連接的情況下,可以寫第三個(gè)String類型參數(shù),指定join的類型,如下所示。>>>df1.join(df2,["name","Java"],"inner").show()+--------+----+------+-----+|name|Java|Python|Scala|+--------+----+------+-----+|LiSi|90|85|85||ZhangSan|86|88|88|+--------+----+------+-----+4DataFrame的常用操作連接使用Column類型的連接指定兩個(gè)DataFrame對(duì)象的字段進(jìn)行連接。>>>df1.join(df2,==).show()+--------+----+------+--------+----+-----+|name|Java|Python|name|Java|Scala|+--------+----+------+--------+----+-----+|ZhangSan|86|88|ZhangSan|86|88||ZhangSan|86|88|LiSi|90|85||ZhangSan|86|88|WangWU|86|88||ZhangSan|86|88|WangFei|90|85||LiSi|90|85|ZhangSan|86|88||LiSi|90|85|LiSi|90|85||LiSi|90|85|WangWU|86|88||LiSi|90|85|WangFei|90|85|+--------+----+------+--------+----+-----+4DataFrame的常用操作連接使用Column類型的同時(shí)指定join類型的連接指定兩個(gè)DataFrame對(duì)象的字段和join類型的連接。>>>df1.join(df2,==,"inner").show()to系列轉(zhuǎn)換to系列方法主要包括toDF、toJSON、toPandas、toLocalIterator,DataFrame調(diào)用這些方法可將DataFrame轉(zhuǎn)換為其他類型的數(shù)據(jù)形式。>>>gradedf.toLocalIterator()#返回Python迭代器,可帶來(lái)計(jì)算和內(nèi)存使用的優(yōu)勢(shì)>>>grade_json=gradedf.toJSON()#通過(guò)toJSON()將DataFrame轉(zhuǎn)換為RDD>>>grade_pd=gradedf.toPandas()#通過(guò)toPandas()將DataFrame轉(zhuǎn)換為Pandas的DataFrame>>>gradedf.toDF('CLASS','ID','NAME','SCALA','SPARK')#轉(zhuǎn)換為新列名的新的DataFrame

SparkSQL結(jié)構(gòu)化數(shù)據(jù)處理1 SparkSQL概述2 創(chuàng)建DataFrame對(duì)象的方式3將DataFrame保存為不同格式的文件4DataFrame的常用操作5使用SparkSQL讀寫MySQL數(shù)據(jù)庫(kù)5使用SparkSQL讀寫MySQL數(shù)據(jù)庫(kù)SparkSQL可以通過(guò)JDBC連接MySQL數(shù)據(jù)庫(kù)來(lái)存儲(chǔ)和管理數(shù)據(jù)。安裝并配置MySQL$sudoapt-getupdate#更新軟件源$sudoapt-getinstallmysql-server#安裝mysql上述命令會(huì)安裝以下包:mysql-client-8.0mysql-server-8.0因此無(wú)需再安裝mysql-client等。以root用戶登陸登陸MySQL進(jìn)入mysqlshell界面,即進(jìn)入“mysql>”命令提示符狀態(tài):$sudomysql-uroot-p#-u指定用戶名,-p指示設(shè)定MySQL數(shù)據(jù)庫(kù)root用戶的密碼或者:$sudomysql#可以不需要指定用戶名密碼安裝MySQL5使用SparkSQL讀寫MySQL數(shù)據(jù)庫(kù)SparkSQL可以通過(guò)JDBC連接MySQL數(shù)據(jù)庫(kù)來(lái)存儲(chǔ)和管理數(shù)據(jù)。安裝并配置MySQLsystemctlstatusmysql#查看狀態(tài),裝完后默認(rèn)就啟動(dòng)了,默認(rèn)開(kāi)機(jī)啟動(dòng)sudosystemctldisablemysql#關(guān)閉開(kāi)機(jī)啟動(dòng)sudosystemctldisablemysql#設(shè)置開(kāi)機(jī)啟動(dòng)sudosystemctlstartmysql#啟動(dòng)mysql服務(wù)sudosystemctlstopmysql#關(guān)閉mysql服務(wù)MySQL服務(wù)的狀態(tài)管理MySQLJDBC下載地址/downloads/connector/j/,本書下載的安裝文件是mysql-connector-java_8.0.26-1ubuntu20.04_all.deb。(1)安裝$cd~/下載#切換到下載文件所在目錄下$sudoaptinstall./mysql-connector-java_8.0.26-1ubuntu20.04_all.deb#安裝#將jar包拷貝到Spark的安裝目錄的jars子目錄中(即/usr/local/spark/jars)$cp/usr/share/java/mysql-connector-java-8.0.26.jar/usr/local/spark/jars下載安裝MySQLJDBC5使用SparkSQL讀寫MySQL數(shù)據(jù)庫(kù)SparkSQL可以通過(guò)JDBC連接MySQL數(shù)據(jù)庫(kù)來(lái)存儲(chǔ)和管理數(shù)據(jù)。安裝并配置MySQL2)啟動(dòng)MySQL執(zhí)行如下命令啟動(dòng)MySQL,并進(jìn)入Shell界面,即“mysql>”命令提示符狀態(tài):$sudoservicemysqlstart#啟動(dòng)MySQL服務(wù)$sudomysql-uroot-p#登錄MySQL數(shù)據(jù)庫(kù)-u表示選擇登陸的用戶名,-p表示登陸的用戶密碼,系統(tǒng)會(huì)提示輸入MySQL的root用戶的密碼。在MySQLShell環(huán)境下,輸入如下SQL語(yǔ)句完成數(shù)據(jù)庫(kù)和表的創(chuàng)建。mysql>createdatabaseclass;#class是數(shù)據(jù)庫(kù)名mysql>useclass;#進(jìn)入指定數(shù)據(jù)庫(kù)mysql>createtablestudent(idint(4),namechar(20),sexchar(1),ageint(3));mysql>insertintostudentvalues(1001,'Wang','F',18);#往表中寫內(nèi)容下載安裝MySQLJDBC5使用SparkSQL讀寫MySQL數(shù)據(jù)庫(kù)SparkSQL可以通過(guò)JDBC連接MySQL數(shù)據(jù)庫(kù)來(lái)存儲(chǔ)和管理數(shù)據(jù)。讀取MySQL數(shù)據(jù)庫(kù)中的數(shù)據(jù)spark.read.format('jdbc')操作可以實(shí)現(xiàn)對(duì)MySQL數(shù)據(jù)庫(kù)的讀取。執(zhí)行以下命令連接數(shù)據(jù)庫(kù),讀取數(shù)據(jù)并顯示。>>>jdbcDF=spark.read.format("jdbc").option("url","jdbc:mysql://localhost:3306/class").option("driver","com.mysql.cj.jdbc.Driver").option("dbtable","student").option("use

溫馨提示

  • 1. 本站所有資源如無(wú)特殊說(shuō)明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請(qǐng)下載最新的WinRAR軟件解壓。
  • 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請(qǐng)聯(lián)系上傳者。文件的所有權(quán)益歸上傳用戶所有。
  • 3. 本站RAR壓縮包中若帶圖紙,網(wǎng)頁(yè)內(nèi)容里面會(huì)有圖紙預(yù)覽,若沒(méi)有圖紙預(yù)覽就沒(méi)有圖紙。
  • 4. 未經(jīng)權(quán)益所有人同意不得將文件中的內(nèi)容挪作商業(yè)或盈利用途。
  • 5. 人人文庫(kù)網(wǎng)僅提供信息存儲(chǔ)空間,僅對(duì)用戶上傳內(nèi)容的表現(xiàn)方式做保護(hù)處理,對(duì)用戶上傳分享的文檔內(nèi)容本身不做任何修改或編輯,并不能對(duì)任何下載內(nèi)容負(fù)責(zé)。
  • 6. 下載文件中如有侵權(quán)或不適當(dāng)內(nèi)容,請(qǐng)與我們聯(lián)系,我們立即糾正。
  • 7. 本站不保證下載資源的準(zhǔn)確性、安全性和完整性, 同時(shí)也不承擔(dān)用戶因使用這些下載資源對(duì)自己和他人造成任何形式的傷害或損失。

評(píng)論

0/150

提交評(píng)論