2022Apache spark中文實(shí)戰(zhàn)手冊(cè)指南_第1頁
2022Apache spark中文實(shí)戰(zhàn)手冊(cè)指南_第2頁
2022Apache spark中文實(shí)戰(zhàn)手冊(cè)指南_第3頁
2022Apache spark中文實(shí)戰(zhàn)手冊(cè)指南_第4頁
2022Apache spark中文實(shí)戰(zhàn)手冊(cè)指南_第5頁
已閱讀5頁,還剩111頁未讀, 繼續(xù)免費(fèi)閱讀

下載本文檔

版權(quán)說明:本文檔由用戶提供并上傳,收益歸屬內(nèi)容提供方,若內(nèi)容存在侵權(quán),請(qǐng)進(jìn)行舉報(bào)或認(rèn)領(lǐng)

文檔簡(jiǎn)介

ApacheSpark的前世今 ApacheSpark3.0:全新功能知多 ApacheSpark3.0:十年顧,展望未 DeltaLake深度解 數(shù)據(jù)工程師眼中的Delta DataLake三劍客——Delta、Hudi、Iceberg對(duì)比分 核桃編程DeltaLake實(shí)時(shí)數(shù)倉應(yīng)用實(shí) “臟數(shù)據(jù)”走開:Schema約束和Schema演 使用JupyterNotebook運(yùn)行DeltaLake入門教 SparkSQL性能優(yōu) ApacheSpark3.0中的SQL性能改進(jìn)概 StructuredStreaming生產(chǎn)化實(shí)踐及調(diào) 使用SparkStreamingSQL進(jìn)行PV/UV統(tǒng) 淺析Hive/SparkSQL讀文件時(shí)的輸入任務(wù)劃 ApacheSpark ApacheSparkApacheSpark3.0ApacheSparkPMCDatabricks,Spark研發(fā)部主管,領(lǐng)導(dǎo)Spark,Koalas,Databricksruntime,OEM的研發(fā)團(tuán)隊(duì),在直播中為大家深入講解了ApacheSpark3.0的新功能。Spark3.0解決了超過3400個(gè)JIRAs,歷時(shí)一年多,是整個(gè)社區(qū)集體智慧的成果。SparkSQL和SparkCores是其中的核心模塊,其余模塊如PySpark等模塊均是建立在兩者之上。Spark3.0新增AdaptiveQueryDynamicPartitionQueryComplicationJoin(一)AdaptiveQuery ApacheSparkApacheSparkApacheSpark在Spark1.0中所有的CatalystOptimizer(rule)優(yōu)化的。為了產(chǎn)生比較好的查詢規(guī)則,優(yōu)化器需要理解數(shù)據(jù)的特性,于是在Spark2.0(cost-basedSpark的UDF(User-definedFunction)簡(jiǎn)單易用,種類繁多,但是對(duì)于CBO來說是個(gè)黑劃的執(zhí)行會(huì)不斷的進(jìn)行優(yōu)化,而且盡可能地復(fù)用了現(xiàn)有優(yōu)化器的已有優(yōu)化規(guī)則。讓整個(gè)查詢優(yōu)化變可以基于運(yùn)行期的統(tǒng)計(jì)信息,將SortMergeJoin轉(zhuǎn)換為BroadcastHash可以處理數(shù)據(jù)分布不均導(dǎo)致的skewjoin。快的方法就是盡可能地使用BroadcastHashJoin。比如你可以增加spark.sql.autoBroadcastJoinThresholdbroadcastHINT。但是這基本上屬于藝高人膽大。首先,這種方法很難調(diào),一不小心就會(huì)OutofMemory,甚至性能變得更差,即使現(xiàn)往往是壓縮的,尤其是列存儲(chǔ)格式,比如parquetORC,而Spark是基于行處理,如果數(shù)據(jù)連續(xù)重復(fù),filesize可能和真實(shí)的行存儲(chǔ)的真實(shí)大小,差別非常之大。這也是為何提高autoBroadcastJoinThreshold,即使不是太大也可能會(huì)導(dǎo)致outofmemory;其中,Spark3.0中基于運(yùn)行期的統(tǒng)計(jì)信息,將SortMergeJoin轉(zhuǎn)換為BroadcastHash。這一系列問題在queryplan復(fù)雜的時(shí)候變得尤為突出,還可能會(huì)影響到其他性能,最后耗時(shí)耗力是出現(xiàn)了bugs,又比如大量的diskspilling會(huì)導(dǎo)致很多節(jié)點(diǎn)都無事可做。此外,你也許會(huì)看到outofmemory這種異常。其解決方法也很多,比如找到skewvalues然后重寫query,或者在join的情況下增加skewkeys來消除數(shù)據(jù)分布不均,但是無論哪種方法,都非常浪費(fèi)時(shí)間,且后期難以維以隨著數(shù)據(jù)分布和特性的變化自動(dòng)改變Queryplan,讓更多的query編譯靜態(tài)優(yōu)化變成運(yùn)行時(shí)的動(dòng)(二)DynamicPartitionynacatnunngueyatonsascheC-S18(三)JoinJoinHints是一個(gè)非常普遍的數(shù)據(jù)庫的優(yōu)化策略,在3.0之前已經(jīng)有了Broadcasthashjoin,3.0之后的版本加了Sort-mergejoin、ShufflehashjoinShufflenestedloopjoin,但是要注意謹(jǐn)慎化,你的數(shù)據(jù)變了,可能會(huì)讓你的query變慢,變得不穩(wěn)定。總體來說上面的四種Join的適用條件和特點(diǎn)如下所示,總而言之,使用JoinHints要謹(jǐn)慎。 ApacheSparkApacheSparkApacheSpark 二、RicherAccelerator-awareBuilt-inpandasUDFDELETE/UPDATE/MERGEin(一)pandasUDFpandasUDF應(yīng)該說是PySPark用戶中最喜愛的特性之一,對(duì)于其功能和性能的提升應(yīng)該都是喜聞最新的pandasUDF和之前的不同之處在于引入了PythonTypeHints,現(xiàn)在用戶可以使用pandas中的數(shù)據(jù)類型比如pandas.Series等來表示pandasUDF的種類,不再需要記住原來的UDF類型,只需要指定正確的輸入和輸出類型即可。此外,pandasUDF可以分為pandasUDF和pandasAPI。(二)Accelerator-awareAccelerator-awareScheduler是加速器的調(diào)度支持,狹義上也就是指GPU調(diào)度支持。加速器經(jīng)常(三)Built-in三、MonitoringandStructuredStreamingDDL/DMLObservableEventLog(一)StructuredStreamingStructuredStreaming是在Spark2.0中發(fā)布的,在Spark3.0中加入了UI的配置。新的UI主要包括InputRate、ProcessRate、BatchDuration和OperateDuration。(二)DDL/DMLPLAPLAPLARATD模(三)Observable我們還引入了ObservableMetrics用以觀測(cè)數(shù)據(jù)的質(zhì)量。要知道數(shù)據(jù)質(zhì)量對(duì)于很多Spark應(yīng)用都是相當(dāng)重要的,通常定四、SQLANSIStoreOverflowReservedKeywordsinProlepticGregoriannse遵守了ASISeAsgnenerwSLaseASendAS的Snsng,如果將nsn試將snnASISeAsgnen+efwChecn五、Built-inData在這個(gè)版本中我們提升了預(yù)裝的數(shù)據(jù)源,比如Parquettable,我們可以對(duì)NestedColumn做ColumnPruning和FilterPushdown,此外還支持了對(duì)CSV的FilterPushdown,還引入了BinaryDataSource來處理類似于二進(jìn)制的圖片文件。六、Extensibilityand對(duì)DataSourceV2API的持續(xù)改善和catalog支持Java支持Hadoop支持Hive3(一)DataSourceV2API+CatalogSpark3.0加上了對(duì)Catalog的支持來擴(kuò)展DataSourceAPI。CatalogpluginAPI可以讓用戶注冊(cè)沒有實(shí)現(xiàn)Catalogplugin的數(shù)據(jù)源,用戶需要先注冊(cè)每個(gè)外部數(shù)據(jù)源的表才能訪問,但是實(shí)現(xiàn)了CatalogpluginAPI之后我們只需要注冊(cè)Catalog,然后就可以直接遠(yuǎn)程訪問和操作catalog的表。對(duì)于數(shù)據(jù)源的開發(fā)者來說,什么時(shí)候支DataSourceV2API呢?下面是幾點(diǎn)建議:不過這里需要注意,DataSourceV2還不是很穩(wěn)定,開發(fā)者可能在未來還需要調(diào)整相關(guān)API的實(shí)aasyhnSakandasA,讓andaaasandasandaySakySakandasAaas的,在p005aasAISakSaaasaas3\h并且增加了SQLReference,\h/docs/latest/sql-ref.htmlApacheSpark3.0:十年顧,展望未Spark發(fā)布的第十年,顧Spark如何一步步發(fā)展到今天,其發(fā)展過程所積累的經(jīng)驗(yàn),以及這些經(jīng)驗(yàn)對(duì)Spark未來發(fā)展的啟發(fā),對(duì)Spark大有脾益。在7月4日的Spark+AISUMMIT2020中文精華版線上峰會(huì)上,DatabricksSpark研發(fā)部主管李瀟帶來了《ApacheSpark3.0簡(jiǎn)介:顧過去的十年,并展望未來》的全面解析,為大家介紹了Spark的起源、發(fā)展過程及最新進(jìn)展演講嘉賓簡(jiǎn)介:李瀟,DatabricksSparkApacheSparkcommitter,PMC ApacheSparkApacheSparkApacheSpark Facebook2010年,Matei將Spark開源。Spark的最初版本僅僅關(guān)注MapReduce風(fēng)格的計(jì)算,其網(wǎng)頁如下圖。相對(duì)于傳統(tǒng)的MapReduce,Spark具有更干凈的API以及更好的性能。令人興奮的是,SparkaeSaeeupaeSaSaSakSaSaSakSaSLquey,運(yùn)行SakengnaeiSak ApacheSparkApacheSparkApacheSpark 第三,提供了更高層級(jí)的API以消除用戶手動(dòng)設(shè)置API的困擾,比如大家熟知的DataFramesAPIDataFramesAPI是SparkSQL中最受歡迎的API,和SQLLanguage一樣。這是因?yàn)镈ataFramesAPI會(huì)被SQLSQL語言更加簡(jiǎn)單好用。這些曾經(jīng)的努力深深地影響著現(xiàn)在的ApacheSpark在Python方面,68%的Databricks的交互式notebook命令是用Python寫的,是Scala的6倍,同在SQL方面,大約有90%的SparkAPI調(diào)用實(shí)際上跑在SparkSQL這個(gè)模塊上。無論開發(fā)人員使用Python,Scala,Java或者R調(diào)用Spark,這些開發(fā)人員都實(shí)際受益于SQLengine的優(yōu)化。在Databricks上,每天由SparkSQL處理的數(shù)據(jù)量都達(dá)到了exabytes量級(jí)。因此,整個(gè)社區(qū)在SparkSQL的投入相當(dāng)大,這使得SparkSQLengine成為性能最好的開源SQLengine。在TPC-DSbenchmarkSpark3.0SparkSQLengine的性能比Spark2.0整整快了兩倍,比Presto快了1.5倍。今年年初,阿里巴巴的E-MapReduce團(tuán)隊(duì)使用SparkSQLengine打破了TPC-DSbenchmark項(xiàng)目的最高性能記錄??偟膩碚f,SparkSQLengine是一個(gè)非常強(qiáng)大且高效的SQLengine。ApacheSpark的前世今 在流處理方面,每天使用Databricks做流處理的數(shù)據(jù)超過5兆。StructuredStreaming讓流處理變第一,易用性,即如何使用戶更簡(jiǎn)單地使用和操作Spark以及如何幫助用戶快速定位錯(cuò)誤,這對(duì)于第二,SparkAPI的設(shè)計(jì)需要關(guān)注這些設(shè)計(jì)能否支持用戶實(shí)現(xiàn)軟件開發(fā)的最佳實(shí)踐,比如通過組合如Python,Java,ScalaRAPI的設(shè)計(jì)支持軟件開發(fā) ApacheSparkApacheSparkApacheSpark Spark3.0是Spark有史以來最大的Release,共包含3400多個(gè)patch。下面這張圖顯示了這些patch所屬的模塊,幾乎一半的patch都屬于SparkSQL。SparkSQL的優(yōu)化不僅服務(wù)于SQLlanguage,還服務(wù)于機(jī)器學(xué)習(xí)、流計(jì)算和Dataframes等計(jì)算任務(wù),這使得社區(qū)對(duì)SparkSQL的投入非常大。此外,Spark團(tuán)隊(duì)還付出了大量努力使Spark2.0的用戶方便地升級(jí)到3.0。近幾年,SQLengine方面主要的改進(jìn)就是AdaptiveQueryExecution(AQE)。AQE能夠在運(yùn)行時(shí)根據(jù)計(jì)算任務(wù)的特性更新計(jì)算計(jì)劃,也就是executionplan,比如自動(dòng)調(diào)整reducerAQE可以自動(dòng)調(diào)整reducer數(shù)量。過去,Dataframes上60%的集群都需要用戶手動(dòng)更改reducer數(shù)在TPC-DS的查詢問題上,AQE能夠調(diào)整不同key以達(dá)到8倍的性能加速。這使得用戶能夠使用AQE僅僅是Spark3.0在性能方面的一種改進(jìn),提升Spark性能的例子還包括Dynamicpartitionpruning,Querycompilespeedups,以及Optimizerhints等。正如前文所述,Spark3.0相比Spark2.0的性能提速達(dá)到2倍,并且這種改進(jìn)在真實(shí)場(chǎng)景下可能更明顯。除了性能,Spark3.0在SQL的兼容性方面也有了很大的提升。比如,ANSISQL方言做了標(biāo)準(zhǔn)化的SQL支持,使得其它SQL系統(tǒng)的負(fù)載和業(yè)務(wù)能夠很容易轉(zhuǎn)移到SparkSQL上。對(duì)于易用性,Spark3.0使用戶能夠更方便地定義PandasUDFs。用戶可以通過Pythontypehints指定其期待的數(shù)據(jù)格式和函數(shù)類型。在Spark3.0中,用戶可以僅僅指明數(shù)據(jù)的type,而不是寫許多冗余的模板式代碼,類似于Spark2.0。在性能方面,Spark3.0做了大量ApacheArrow的性能升級(jí),20%-25%的性能提升來自于ApacheArrow自身。Spark3.0還使用ApacheArrow實(shí)現(xiàn)了大量Python和R之間的數(shù)據(jù)交換,而這些對(duì)Spark使用者都是透明的。此外,Spark3.0在SparkR方面的性能提升高達(dá)40倍,Spark還將提出更Spark3.0新功能分布在不同的模塊上,比如阿里巴巴貢獻(xiàn)的可用來監(jiān)控流計(jì)算的StructuredStreamingUI,可檢測(cè)的流作業(yè)監(jiān)控指標(biāo),全新的Sparklanguage查詢手冊(cè),新的DataSourceAPI等。更多的Spark3.0新功能可參見XiaoLi的講座。除了Spark項(xiàng)目自身的發(fā)展,整個(gè)社區(qū)還圍繞Spark做出了許多創(chuàng)新。去年,Databricks發(fā)布了Koalas項(xiàng)目,支持直接在Spark上運(yùn)行PandasAPI,使得更多的Pandas用戶能夠使用Spark解決大數(shù)據(jù)問題。DeltaLAKE提供了可靠的表存儲(chǔ)。社區(qū)還給Scikitlearn,HYPEROPT,Joblib等添加了算法,使用GPUDatabricks也進(jìn)行了優(yōu)化,改善了Spark和可視化系統(tǒng)的交Koalas是PandasAPI在Spark上的實(shí)現(xiàn),能夠使更多的數(shù)據(jù)科學(xué)代碼直接運(yùn)行在Spark上。從去年在Spark3.0中發(fā)布至今,Koalas已經(jīng)達(dá)到了每個(gè)月85萬的下載量,大約是PySpark的1/5。未來這次,Spark社區(qū)還發(fā)布了Koalas1.0,具有以下特性:第一,達(dá)到了80%的PandasAPI覆蓋率。第三,能夠支持更多的功能,包括missingvalues,NA以及in-placeupdates等。在顧了Spark的發(fā)展過程后,接下來再來展望數(shù)據(jù)科學(xué)的發(fā)展和數(shù)據(jù)驅(qū)動(dòng)的AI進(jìn)步。顯然,和AI應(yīng)用仍然過于復(fù)雜,讓基于Spark的應(yīng)用開發(fā)變得更加簡(jiǎn)單仍大有可為。為此,Spark開源社第二,還需要讓SparkAPI更好地與生態(tài)圈的其它主流軟件連接起來。第一個(gè)項(xiàng)目叫Zen,中文名是禪。Zen的項(xiàng)目名來自Python社區(qū)的項(xiàng)目PythonZen,其定義了設(shè)Spark在Python方面的可用性,Spark社區(qū)希望通過Zen項(xiàng)目讓SparkPython的使用和Python生ANSISpark社區(qū)還將標(biāo)準(zhǔn)化SQL語言支持,通過ANSISQL使更多的主流SQLengine的工作能夠遷移到SparkSQL中。 ApacheSparkApacheSparkApacheSpark 以Zen的PythonErrorMessages在Spark2.4中如果發(fā)生意味除零,用戶會(huì)在PythonErrorMessages中得到冗長(zhǎng)的錯(cuò)誤信息,甚至還包括了大量的Java信息,這對(duì)Python程序員非常不友好。Spark3.0中將簡(jiǎn)化PythonErrorSpark社區(qū)還提供了對(duì)用戶更加友好的Python文檔,而之前的PySparkAPI文檔存在許多無用的Spark社區(qū)對(duì)Spark的未來非常有信心。有理由相信,Spark3.0將解決更多問題,讓大數(shù)據(jù)和AI數(shù)DeltaLake DeltaLakeDeltaLakeDeltaLake SPARK+AISUMMIT2020中文精華版線上峰會(huì)帶領(lǐng)大家一起顧2020年的SPARK又產(chǎn)技術(shù)主管范文臣從數(shù)據(jù)工程師的角度出發(fā)向大家介紹DeltaLake。以下是視頻內(nèi)容精華整理。一、DeltaLake想要實(shí)現(xiàn)上面的工具,一個(gè)最簡(jiǎn)單的辦法就是先用一個(gè)SparkStreamingJob把各種各樣的數(shù)據(jù)源如果處理邏輯中加入了數(shù)據(jù)校驗(yàn)的工作,就需要在批和流上分別校驗(yàn)兩次,一旦需要滾等操能夠做滾結(jié)合以上幾點(diǎn)目標(biāo),有了目前的解決方案:DeltaLake+StructuredStreaming=TheDeltaArchitecture。這套方案的優(yōu)點(diǎn)很明顯,首先是批流合一的,其次DeltaLake可以很方便的做時(shí)間旅行類似的操作,且DeltaLake是單純的儲(chǔ)存層,與計(jì)算層分離,符合當(dāng)前云數(shù)據(jù)計(jì)算的大方向,二、DeltaLakeDeltaLake的核心是其事務(wù)日志,它的表跟普通的表沒有大的區(qū)別,但是在表下會(huì)建立一個(gè)隱藏文更細(xì)節(jié)地來說,在DeltaLake中的每個(gè)JSON文件都是一次commit,這個(gè)commit是原子性的,保存了事務(wù)相關(guān)的詳細(xì)記錄。另外,DeltaLake還可以保證多個(gè)用戶同時(shí)commit而不會(huì)產(chǎn)生沖突, DeltaLakeDeltaLakeDeltaLake DeltaLake的處理方案如下圖所示,用Spark來讀取事務(wù)日志,然后DeltaLake隔一段時(shí)間對(duì)總結(jié)起來,DeltaLake解決數(shù)據(jù)一致性、增量讀取、歷史溯等問題的方案即為下圖所示Demo中提供了PythonAPI和ScalaAPI的實(shí)現(xiàn)文件,大家可以根據(jù)自己的實(shí)際情況進(jìn)行嘗試。上EnforcementSchemaEvolution:隨著公司業(yè)務(wù)的發(fā)展,一開始的表結(jié)構(gòu)可能不適用于當(dāng)前的業(yè)務(wù),SchemaEvolution可以幫助我們進(jìn)行表結(jié)構(gòu)的演化。DeletefromDeltaLaketable:Delete操作可以控制表的無限制增長(zhǎng),并且通過事務(wù)日志來AuditDeltaLakeTableHistoryTravelbackintimeVacuumoldversionsofDeltaLaketables:DeltaLake通過標(biāo)記的方式來實(shí)現(xiàn)刪除,隨著UpsertintoDeltaLaketableusingMerge:在一個(gè)命令中同時(shí)做update和insert操作。A1:Delta最新發(fā)布了0.7.0,支持Spark3.0。Databricks已經(jīng)有很多客戶在使用DeltaLake,其eBay。實(shí)時(shí)增刪查改如demo演示的那樣都是支持的。A2DeltaLakeHiveSELECT/INSERT,沒有支持DELETE等SQL操作,只能用DeltaLake自己的Scala或PythonAPI。如果使用的是Spark3.0MERGE、DELTE等都支持SQLAQI,可以直接用SQL開發(fā)。但是某些管理操作比如VACCUM沒有對(duì)應(yīng)的SQLAPI,還是要用DeltaLake自己的Scala或Python DeltaLakeDeltaLakeDeltaLake DataLakeDelta、Hudi、DataLake的數(shù)據(jù)存儲(chǔ)中間層,其數(shù)據(jù)管理的功能均是基于一系列的meta文件。meta文件的角色類似于數(shù)據(jù)庫的catalog/walschema管理、事務(wù)管理和數(shù)據(jù)作者:EMRApacheHadoop,ApacheSparkcontributorHadoop、Spark、Hive、Druid等大數(shù)據(jù)組件有深入研究。目前從事大數(shù)據(jù)云化DataLake的數(shù)據(jù)存儲(chǔ)中間層,其數(shù)據(jù)管理的功能均是基于一系列的meta文件。meta文件的角色類似于數(shù)據(jù)庫的catalog/wal,起到schema管理、事務(wù)管理和數(shù)據(jù)管理的功能。與數(shù)據(jù)庫不同的是,這些meta文件是與數(shù)據(jù)文件一起存放在存儲(chǔ)引擎中的,用戶可以直接meta目錄,表就被破壞了,想要恢復(fù)難度非常大。MetaschemaSchemaSchema演化的支持。Metatransactionlog的功能(需要文件系統(tǒng)有原子性和一致性的支Hudi。Hudi的設(shè)計(jì)目標(biāo)正如其名,HadoopUpsertsDeletesandIncrementals(HadoopUpsertsanDIncrementals),強(qiáng)調(diào)了其主要支持Upserts、DeletesIncremental數(shù)據(jù)處理,其主要提供的寫入工具是SparkHudiDataSourceAPI和自身提供的DeltaStreamer,均支持三種數(shù)據(jù)寫入方式:UPSERT,INSERT和BULK_INSERT。其對(duì)Deletedelete其典型用法是將上游數(shù)據(jù)通過Kafka或者Sqoop,經(jīng)由DeltaStreamer寫入Hudi。DeltaStreamer是一個(gè)常駐服務(wù),不斷地從上游拉取數(shù)據(jù),并寫入hudi。寫入是分批次的,并且可以設(shè)置批次之間的調(diào)度間隔。默認(rèn)間隔為0,類似于SparkStreaming的As-soon-as-possible策略。隨著數(shù)據(jù)不斷寫入,會(huì)有小文件產(chǎn)生。對(duì)于這些小文件,DeltaStreamer可以自在查詢方面,HudiHive、Spark、Presto。在性能方面,Hudi設(shè)計(jì)了HoodieKey,一個(gè)類似于主鍵的東西。HoodieKeyMin/Max統(tǒng)計(jì),BloomFilterRecord所在的文件。在具體做Upserts時(shí),如果HoodieKey不存在于BloomFilter,則執(zhí)行插入,否則,確認(rèn)HoodieKey是否真正存在,如果真正存在,則執(zhí)行update。對(duì)于查詢性能,一般需求是根據(jù)查詢datasource。Hudi這方面沒怎么做工作,其性能完全基于引擎自帶的謂詞下推和partitionprune功能。HudiCopyOnWrite和MergeOnRead。前者在寫入時(shí)做數(shù)據(jù)的merge,寫入性能略差,但是讀性能更高一些。后者讀的時(shí)候做merge,讀性能查,但是寫入數(shù)據(jù)會(huì)比較最后,Hudirun_sync_toolschemaHive表。Hudi還提供Hudi表。Iceberg沒有類似的HoodieKey設(shè)計(jì),其不強(qiáng)調(diào)主鍵。上文已經(jīng)說到,沒有主鍵,做update/delete/merge等操作就要通過Join來實(shí)現(xiàn),而Join需要有一個(gè)類似SQL的執(zhí)行引IcebergIcebergupdate/delete/mergeupdatepartition需要更新,然后通過overwrite的方式重寫數(shù)據(jù)。IcebergquickstartSpark的接口均只SparkdataframeAPIIceberg寫數(shù)據(jù)的方式,沒有提及別的數(shù)據(jù)攝入方法。至SparkStreamingStreamWriteSupport,應(yīng)該是支持流式Icebergready,在查詢方面,IcebergSpark、Presto。IcebergIceberghiddenpartition功能。Hiddenpartition意思是說,對(duì)于用戶輸入的數(shù)據(jù),用戶可以選取其中某些列做適當(dāng)?shù)淖儞Q(Transform)partitionpartitionschema中。例如,用戶有timestamphour(timestamp)生成一個(gè)timestamp_hour的新分區(qū)列。timestamp_hour對(duì)用戶不可見,僅僅用于組織數(shù)據(jù)。Partition列partition列的統(tǒng)計(jì),如該partitionpartition的統(tǒng)partitionprune。hiddenpartition,Iceberg也對(duì)普通的column列做了信息收集。這些統(tǒng)計(jì)信息非常全,包sizevaluecount,nullvaluecount,以及列的最大最小值等等。這些信息都可以IcebergAPIAPI指定表明、schema、partitionHivecatalogDeltaDeltaDataLakeupdate/delete/merge。由于出自Databricks,spark的所有數(shù)據(jù)寫入方式,包括基于dataframeSQLInsert、InsertOverwrite等都是支持的(SQL寫暫不支持,EMR做了支持)Iceberg類似,Deltaupdate/delete/merge的實(shí)現(xiàn)均是基于sparkjoin功能。在數(shù)據(jù)寫入方面,DeltaSparkHudi是不同的:HudiSpark(SparkHudi自己的寫入工具寫入)。DeltaSparkPresto,但是,SparkdeltalogSparkPrestoDeltaSpark作業(yè)。更為蛋疼的是,Presto查詢是基于SymlinkTextInputFormat。在查詢之前,要運(yùn)行Spark作業(yè)生成這么個(gè)Symlink文件。如果表數(shù)據(jù)是實(shí)時(shí)更新的,意味著每次在查詢之前先要跑一個(gè)SparkSQLPrestoSparkSQL里搞定呢?這是一個(gè)非常蛋疼的設(shè)計(jì)。為此,EMR在這方面做了改進(jìn),支持了DeltaInputFormat,用戶可以直接使用Presto查詢Delta數(shù)據(jù),而不必事先啟動(dòng)一個(gè)Spark任務(wù)。Delta幾乎沒有任何優(yōu)化。Iceberghiddenpartition且不說,普通的column的統(tǒng)計(jì)信息也沒有。DatabricksDataSkipping技術(shù)做了保留。不得不說這對(duì)于推廣Delta來說不是件好事。EMR團(tuán)隊(duì)在這方面正在做一些工作,希望能彌補(bǔ)這方面DeltamergeHudiIcebergDelta一無是處了呢?其實(shí)不然。DeltaSpark的整合能力(雖然目前仍不是很完善,但Spark-3.0之后會(huì)好很多),尤其是其流批一體的設(shè)計(jì),配合multi-hopdatapipeline,可以 DeltaLakeDeltaLakeDeltaLake 支持分析、Machinelearning、CDC等多種場(chǎng)景。使用靈活、場(chǎng)景支持完善是它相比Hudi和Iceberg的最大優(yōu)點(diǎn)。另外,Delta號(hào)稱是Lambda架構(gòu)、Kappa架構(gòu)的改進(jìn)版,無需關(guān)心流HudiIceberg是力所不及的。通過上面的分析能夠看到,三個(gè)引擎的初衷場(chǎng)景并不完全相同,Hudi為了incremental的upsertsIcebergDelta定位于流批一體的數(shù)據(jù)處理。HudiHDFS,HDFS,rge/UpdateStreamingYes(notParquet,DataMax-Minstats+Z-Ordering MinFilteringDataMergeonFileI/ODeltaLakeEMRDeltaLakesqoopsqoopimport--hive-import--hive-overwrite--connectjdbc:mysql://<mysqlurl>--table<mysqltable>--hive-table<table_base>--hive-partition-key<parcolumn>insertinsertoverwritetablerow_number()over(partitionbyt.<primary_key_column>orderbyrecord_iddesc,after_flagdesc)asrow_number,record_id,operation_flag,<col1>,<col2>,<colN>incr.record_id,incr.operation_flag,incr.after_flag,incr.<col1>,<table_log>incrutc_timestamp<<timestamp>unionallselect0asrecord_id,'I'asoperation_flag,'Y'asafter_flag,base.<col1>,<table_base>base)t)gtwhererecord_num=1而應(yīng)用DeltaLake只需要一個(gè)streamingsqlCREATECREATESCAN<SCAN_TABLE>on<STREAM>stream;CREATESTREAMjob)MERGEINTO<CDC_TABLE>astargetUSING(from_unixtime(<col2>,'yyyyMMdd')asCAST(before.idasLONG)asbefore_id,CAST(after.idasLONG)asid,dense_rank()OVER(PARTITIONBYcoalesce(before.id,after.id)ORDERBYrecordIdDESC)asrankFROM(from_json(from_json(CAST(beforeImagesasSTRING),'idSTRING,<coltype1>,ctimestring')asfrom_json(CAST(afterImagesasSTRING),'idSTRING,<coltype1>,ctimestring')asafterFROM(from_avro(value)as(recordID,source,dbTable,recordType,extraTags,fields,beforeImages,afterImages)from)binlogWHERErecordType!='INIT'))binlog_extractWHERE)assourceONtarget.id=source.before_idWHENMATCHEDANDsource.recordType='UPDATE'THENUPDATESET*WHENMATCHEDANDsource.recordType='DELETE'THENDELETEWHENNOTMATCHEDAND(source.recordType='INSERT'ORsource.recordType='UPDATE')THENINSERT*;基于DeltaLake實(shí)時(shí)數(shù)倉方案DeltaLakeDeltaLake是美國(guó)Databricks開源的數(shù)據(jù)湖技術(shù),基于ApacheParquet豐富了數(shù)據(jù)管理功能,如元數(shù)據(jù)管理/事務(wù)/數(shù)據(jù)更新/數(shù)據(jù)版本溯等。使用DeltaLake可以很方便的將流處理和批處理串聯(lián)起來,快速構(gòu)建Near-RealTime的DataPipeline.目前阿里巴巴E-MapReduce(簡(jiǎn)稱“EMR”)團(tuán)隊(duì)對(duì)DeltaLake做了很多功能和性能上的優(yōu)化,\h自研SparkStreamingSQL,支持DeltaLake的相關(guān)DMLHive&PrestoOnDeltaDeltaLakeOnOSS(阿里云對(duì)象存儲(chǔ)DeltaLakeDataSkipping&ZorderSparkStreaming阿里巴巴EMR團(tuán)隊(duì)在StructStreaming基礎(chǔ)上自研了SparkStreamingSQL,用戶可以很方便的使用SQL來寫流式作業(yè)的邏輯,大大降低了開發(fā)門檻,詳見\hSparkStreamingSQL官方文檔。CREATETABLE/CREATESCAN/CREATSTREAM/INSERTINTO/MERGESELECT/WHERE/GROUPBY/JOIN/UNIONHiveUDF并且支持Kafka的ExactlyDeltaLake基于DeltaLake+SparkStreamingSQL可以快速構(gòu)建實(shí)時(shí)數(shù)倉的pipelineODS?DW業(yè)務(wù)邏輯復(fù)雜,數(shù)據(jù)可能<頻繁>變化,寫入DeltaLake。實(shí)踐上看,直接寫入Kafka是最容易的方案,但是靈活性很低,歷史數(shù)據(jù)無法追溯,也無法修改。DW層通過引入DeltaLake,可以實(shí)現(xiàn)DM PAGE53 DeltaLakeDeltaLakeDeltaLake 備注EMR團(tuán)隊(duì)提供了流式MergeInto功能,可以通過寫SparkStreamingSQL的方式來做CDC\h新增串行autocompaction使用Adaptive據(jù),批只是有邊界的流。針對(duì)高階的SQLAPI,流批都有很大的區(qū)別?;贒eltaLake的分區(qū)表,DeltaLake實(shí)時(shí)數(shù)倉在核桃編程部分?jǐn)?shù)據(jù)倉庫生產(chǎn)環(huán)境上線后,部分業(yè)務(wù)統(tǒng)計(jì)指標(biāo)已基于新架構(gòu)產(chǎn)倉各層的結(jié)構(gòu),推進(jìn)全面應(yīng)用基于DeltaLake的實(shí)時(shí)數(shù)倉建設(shè)??煽俊⒁讛U(kuò)展的DataPipeline PAGE55 DeltaLakeDeltaDeltaLake 54 “臟數(shù)據(jù)”走開:SchemaSchema演變SchemaSchema演變相互補(bǔ)益,合理地結(jié)合起來使用將能方便地管理好數(shù)據(jù),避schema管理的問題,隨著業(yè)務(wù)問題和需求的不斷演進(jìn),數(shù)據(jù)結(jié)構(gòu)也會(huì)不斷發(fā)生變化。schemaSchema約束(SchemaEnforcement)Schema演變(SchemaEvolution),前者用以防止用戶臟數(shù)據(jù)意外污染表,后者用以自動(dòng)添加適當(dāng)?shù)男聰?shù)據(jù)ApacheSparkDataFrameschema,用來定義數(shù)據(jù)的形態(tài),例如數(shù)據(jù)類DeltaLakeschemaJSONSchemaSchema約束(SchemaEnforcement),SchemaValidationDeltaLake中的一SchemaDeltaLakeschemaschema做兼容schema不兼容,DeltaLake將會(huì)撤銷這次事務(wù)(沒有任何數(shù)據(jù)寫入),并且返DeltaLakeDataFramenull。的數(shù)據(jù)類型為Integer,Schema約束將會(huì)返異常,防止該次寫入生效。Spark可以支持大小寫敏感和不敏感(默認(rèn)為大小寫不敏感)兩種不同的模式,DeltaLake保留大小寫,但在schema存儲(chǔ)上大小寫不敏感。Parquet在存儲(chǔ)和返列信息上面是大小寫敏感的,因此為了防止?jié)撛诘腻e(cuò)誤、數(shù)據(jù)污染和丟失的問題,DeltaLake引入了這個(gè)限制。DeltaLake##GenerateaDataFrameofloansthatwe'llappendtoourDeltaLakeloans=SELECTaddr_state,CAST(rand(10)*countasbigint)AScount,CAST(rand(10)*10000*countASdouble)ASamountFROMloan_by_state_delta#ShoworiginalDataFrame'sschema|--addr_state:string(nullable=|--count:integer(nullable=#ShownewDataFrame'sschema PAGE57 DeltaLakeDeltaLakeDeltaLake |--addr_state:string(nullable=|--count:integer(nullable=|--amount:double(nullable=true)#new#AttempttoappendnewDataFrame(withnewcolumn)toexistingtableloans.write.format("delta")\.mode("append")"""AschemamismatchdetectedwhenwritingtotheDeltaToenableschemamigration,pleaseset:'.option("mergeSchema",Table--addr_state:string(nullable=--count:long(nullable=Data--addr_state:string(nullable=--count:long(nullable=--amount:double(nullable=IfIfTableACLsareenabled,theseoptionswillbeignored.PleaseusetheALTERTABLEcommandforchangingtheschema.不同于自動(dòng)添加新的列,DeltaLakeschema約束并阻止了這次寫入生效。并且為了幫助定位是哪個(gè)列造成了不匹配,Sparkschema作為對(duì)照。SchemaSchema約束是一種嚴(yán)格的校驗(yàn),因此可以用于已清洗、轉(zhuǎn)化完成的數(shù)據(jù),保證數(shù)據(jù)不受污BIschema?\hProductionizingMachineLearningWithDeltaLake.當(dāng)然,Schema約束可以用在整個(gè)工作流程的任意地方,不過需要注意的是,有可能因?yàn)橹T如不經(jīng)schemaDeltaLake。為什么不直接讓schema接受改變,這樣我們就能任意寫入DataFrame了。schema進(jìn)行強(qiáng)制約束,數(shù)據(jù)類型兼容性的問題將會(huì)很Schema約束就能夠做到,將這類錯(cuò)誤顯式地返進(jìn)行恰當(dāng)?shù)奶幚恚皇亲屗鼭摲跀?shù)據(jù)中,看Schemaschema不會(huì)發(fā)生改變,除非你確切地執(zhí)行了更改操作。它能有效的防止處。Schema約束的設(shè)計(jì)初衷就是通過設(shè)定嚴(yán)格的要求來保證質(zhì)量,確保表數(shù)據(jù)不受污染。Schema PAGE59 DeltaLakeDeltaLakeDeltaLake SchemaSchema演變(SchemaEvolution)schema,來適應(yīng)不斷變Schema.option('mergeSchema','true')。#AddthemergeSchemaoptionloans.write.format("delta")#AddthemergeSchemaoptionloans.write.format("delta")\.option("mergeSchema","true").mode("append")SparkSQL##CreateaplotwiththenewcolumntoconfirmthewritewasSELECTaddr_state,sum(`amount`)ASamountFROMloan_by_state_deltaGROUPBYaddr_stateORDERBYsum(`amount`)DESCLIMIT10spark.databricks.delta.schema.autoMergeTrueSpark配置文SparksessionSchema約束schema不匹配問題進(jìn)行報(bào)警提示。SchemaSchema.option("overwriteSchema"true選schemainteger的列“Foo”,而新的schemastringParquet數(shù)據(jù)文件都需要覆蓋重寫。包括以下步驟:Spark3.0DDL(ALTERTABLE方式)schemaSchemaSchemaschema的改動(dòng)。通過設(shè)置嚴(yán)格的限制,數(shù)另一方面,schemaschemaschema變更能夠自動(dòng)SchemaSchema演變相互補(bǔ)益,合理地結(jié)合起來使用將能方便地管理好數(shù)據(jù),避免臟數(shù)\h(Transactionlog)DeltaLakeDeltaLake的重要ACID事務(wù)性、可擴(kuò)展元數(shù)據(jù)處理、時(shí)間溯等等。本文將探EMR高級(jí)開發(fā)工程師,目前從事大數(shù)據(jù)存儲(chǔ)方面的開發(fā)和事務(wù)日志(Transactionlog)DeltaLakeDeltaLake的重要特性都是基于事務(wù)日志實(shí)現(xiàn)的,包括ACID事務(wù)性、可擴(kuò)展元數(shù)據(jù)處理、時(shí)間溯等等。本文將探討什么DeltaLake的事務(wù)日志(DeltaLog)DeltaLake表從生成DeltaLakeApacheSpark構(gòu)建,用來支持多用戶同時(shí)讀寫同一數(shù)據(jù)表。事務(wù)日志作為單一DeltaLake的表,或者對(duì)一張已打開的表提交新的查詢但表中的數(shù)據(jù)在上一次訪問之后已發(fā)生變化時(shí),Spark將會(huì)檢查事務(wù)日志來確定該表經(jīng)歷了哪些事務(wù)操作,并將更新結(jié)果反DeltaLakeDeltaLakePB級(jí)的數(shù)據(jù)。每當(dāng)用戶提交一個(gè)修改表的操作時(shí)(INSERTUPDATEDELETE),DeltaLake將該操作SparkstructuredstreamingmicrobatchID例如,假設(shè)用戶創(chuàng)建了一個(gè)事務(wù),往表中新增一列,并且添加一些數(shù)據(jù)。DeltaLake會(huì)將該事務(wù)分schemaDeltaLake_delta_log子目錄下自動(dòng)創(chuàng)建該表的事務(wù)日志。后續(xù)對(duì)JSON文件,序號(hào)從000000.json開始。之后的修改操作都將生成遞增的文件序號(hào),例如000001.json、件(3.parquet),000001.json,如下圖所1.parquet2.parquetDeltaLake表中的數(shù)據(jù),對(duì)它們的添加刪除操作仍DeltaLake仍會(huì)保留這些原子提交,來保證當(dāng)我們需要對(duì)事件進(jìn)行審計(jì),或者進(jìn)行時(shí)間溯查詢表另外,Spark也不會(huì)從磁盤上刪除這些文件,即使我們執(zhí)行了刪除了底層的數(shù)據(jù)文件的操作。用戶VACUMM命令顯示地刪除不再需要的文件。10個(gè)提交,DeltaLake_delta_logParquetcheckpointParquetSpark讀取也比較友好和高效。換句話說,checkpointSpark提供了一種捷徑來重構(gòu)表狀態(tài),避免低效JSON格式的小文件。為了同步提交進(jìn)度,SparklistFromcheckpointcheckpointJSON000012.json。為了包含這些新的事務(wù)并且更新我們的表狀態(tài),SparklistFromverion7操作來查看新的SparkcheckpointJSONcheckpointcommit#10之前的所有表狀態(tài)?,F(xiàn)在,Spark只需增量執(zhí)行0000011.json0000012.json,來構(gòu)建表的當(dāng)前狀態(tài),然后將版本12緩存在內(nèi)存中。通過這樣的流程,DeltaLakeSpark來高效地維護(hù)任意時(shí)刻的表狀態(tài)。DeltaLake處理并發(fā)讀寫DeltaLakeApacheSpark實(shí)現(xiàn)的,多個(gè)用戶同時(shí)修改一個(gè)表完全是一種非常常見的場(chǎng)景,DeltaLake使用了樂觀鎖來解決這個(gè)問題。樂觀并發(fā)控制(又名“樂觀鎖”,OptimisticConcurrencyControl,縮寫“OCC”)是一種并發(fā)PB級(jí)大數(shù)據(jù)時(shí),有很大概率不同用戶處理的是數(shù)據(jù)的不同部分,樂觀鎖使得各事務(wù)能夠在不產(chǎn)生10,000塊Lake有一套自己的協(xié)議來處理這個(gè)問題。ACID事務(wù)性,DeltaLakecommit如何排序(也就是數(shù)據(jù)庫領(lǐng)域串serializability的概念)commit。DeltaLake通DeltaLakeACID的隔離性DeltaLake如何處理沖突,假設(shè)兩個(gè)用戶從同一個(gè)表中讀取數(shù)據(jù),然后同時(shí)去嘗DeltaLake在進(jìn)行修改之前記錄表的初始版本(version0)User12commit可以被接受000001.json。DeltaLake000001.json,這里假設(shè)User1commitUser2被拒絕。DeltaLake將會(huì)樂觀處理此沖突,而不是直接給User2返異常。檢查是否有新的User2commit(DeltaLake無法通過這種重試成功完成(例如User1和2都同時(shí)刪除同一個(gè)文件),在這種情況下將會(huì)返異常給用DeltaLakeACID的持時(shí)間commit所決定的,事務(wù)日志相當(dāng)于提供了每一步 DeltaLakeDeltaLakeDeltaLake commit,來重構(gòu)出表在任意時(shí)間點(diǎn)的狀態(tài)。這個(gè)強(qiáng)大的功能就是時(shí)間溯,或者叫做數(shù)據(jù)版本控制。更多關(guān)于時(shí)間溯的說明,可以參考\hIntroducingDeltaTimeTravelforLargeScaleDataLakes。DeltaLake表的所有修改,因此它能提供可信的數(shù)據(jù)血緣,這對(duì)治理、審計(jì)和合規(guī)目的很有用處。它也可以用來跟蹤一些無意或者有錯(cuò)誤的修改,從而能夠退到期望的版本。用戶可以執(zhí)行DESCRIBEHISTORY來查看指定修改附近的元數(shù)據(jù)。DeltaLakeDeltaLakeDeltaLakecheckpoint構(gòu)建當(dāng)前狀態(tài),以及怎樣處理小ApacheSparkDeltaLakecommit\hJupyterNotebookDeltaLake入DatabricksCommunityEdition構(gòu)建,雖然教程中使用的DeltaLake版本所具備的,但是考慮到國(guó)內(nèi)的網(wǎng)絡(luò)環(huán)境,注冊(cè)和使用DatabricksCommunityEditionJupiterNotebook重新構(gòu)作者:2008年加入阿里巴巴集團(tuán),先后在B2B和阿里\hDeltaLake官方教程DatabricksCommunityEditionDeltaLake版本所具備的,但是考慮到國(guó)內(nèi)DatabricksCommunityEdition門檻較高。所以本文嘗試基于開源的JupiterNotebook重新構(gòu)建這個(gè)教程。SparkWindowsMacOS上找到,理論上來說也完全可以anacondaminicondaconda來構(gòu)建開發(fā)環(huán)境,可以非常方便的pysparkjupyternotebook。condacondacreate--namesparkcondaactivatesparkcondainstallcondainstall-cconda-forge我們?cè)谠O(shè)置一些環(huán)境變量之后,就可以使用pysparkjupyternotebook DeltaLakeDeltaLakeDeltaLake exportexportSPARK_HOME=$HOME/miniconda3/envs/spark/lib/python3.7/site-exportexportpyspark--packagesio.delta:delta-啟動(dòng)服務(wù)(DeltaLakepackagepyspark--packagesio.delta:delta-notebookparquetparquetrm-frmkdir-pwget-O/tmp/delta_demo/loans/SAISEU19-loan-risks.snappy.parquet/rs/094-YMS-629/images/SAISEU19-loan-ls-alDeltaLakeDeltaLakeDeltaLakeparquet文件,parquetDeltaLake表:importosimportimportosimportshutilfrompyspark.sql.functionsimport*delta_path="/tmp/delta_demo/loans_delta"#Deleteanewdeltatablewiththeparquetfileifos.path.exists(delta_path):print("Deletingpath"+delta_path)#Createanewdeltatablewiththeparquetfilespark.read.format("parquet").load("/tmp/delta_demo/loans")\print("CreatedaDeltatableat"+delta_path) DeltaLakeDeltaLakeDeltaLake #Createaviewonthetablecalledloans_delta#Createaviewonthetablecalledloans_deltaprint("Definedview'loans_delta'")spark.sql("selectcount(*)fromloans_delta").show()Definedview'loans_delta' | importfrompyspark.sql.functionsimport*frompyspark.sql.typesimport*defreturn"/tmp/delta_demo/chkpt/%s"%str(random.randint(0,importfrompyspark.sql.functionsimport*frompyspark.sql.typesimport*defreturn"/tmp/delta_demo/chkpt/%s"%str(random.randint(0,10000))#User-definedfunctiontogeneraterandomstatestates=["CA","TX","NY",defrandom_state():return#Generateastreamofrandomlygeneratedloaddataandappendtothedeltatabledefgenerate_and_append_data_stream_fixed(table_format,table_path):stream_data=spark.readStream.format("rate").option("rowsPerSecond",50).load().withColumn("loan_id",10000+col("value")).withColumn("funded_amnt",(rand()*5000+5000).cast("integer")).withColumn("paid_amnt",col("funded_amnt")-(rand()*2000)).withColumn("addr_state",random_state()).select("loan_id","funded_amnt","paid_amnt","addr_state")#***********FIXEDTHESCHEMAOFTHEGENERATEDDATA*************query=stream_data.writeStream.format(table_format).trigger(processingTime="10seconds")returnqueryDeltaLake深度解 stream_query_1stream_query_1=generate_and_append_data_stream_fixed(table_format="delta",table_path=delta_path)stream_query_2=generate_and_append_data_stream_fixed(table_format="delta",table_path=delta_path)DeltaLake的樂觀鎖機(jī)制,多個(gè)流可以同時(shí)寫入一張表,并保證數(shù)據(jù)的完整性。spark.sql("selectspark.sql("selectcount(*)from| DeltaLake##Functiontostopallstreamingqueriesdefstop_all_streams():#Stopallthestreamsprint("Stoppingallstreams")forsinprint("Stoppedallstreams")print("Deletingcheckpoints")shutil.rmtree("/tmp/delta_demo/chkpt/",True)print("Deletedcheckpoints") DeltaLakeDeltaLake支持SchemaSQL會(huì)新增加一些數(shù)據(jù),同時(shí)這些數(shù)據(jù)比之前的多了一個(gè)“closed”DF配置參數(shù)mergeSchematrueDeltaLakeSchema的演化:colscols=['loan_id','funded_amnt','paid_amnt','addr_state',items=(1111111,1000,1000.0,'TX',(2222222,2000,0.0,'CA',loan_updates=spark.createDataFrame(items,cols).mode("append").option("mergeSchema","true") 1000|CA| 1000|WA| 1000|TX| 1000|OK| 1000|PA| CA| 1000|MD| 1000|OH| 1000|TX| 1000|CT| 1000|NJ| 1000|NY|DeltaLake深度解 1000|FL| 1000|NJ| TX| 1000|OH| 1000|MI| 1000|MI| 1000|CA| 1000|CA|onlyshowingtop20closed 1000|TX|DeltaLake除了常規(guī)的插入操作,DeltaLakeupdatedelete等功能,可以更新表格內(nèi)容。下面展 DeltaLakespark.sql("SELECTspark.sql("SELECTCOUNT(*)FROMloans_deltaWHEREfunded_amnt deletefromfromdelta.tablesimportdeltaTable=DeltaTable.forPath(spark,delta_path)deltaTable.delete("funded_amnt=paid_amnt")spark.sql("SELECTspark.sql("SELECTCOUNT(*)FROMloans_deltaWHEREfunded_amnt DeltaLake深度解 版本歷史和DeltaLake還具有很強(qiáng)大歷史版本記錄和溯功能。history()方法清晰的展示了剛才那張表的修改Delete操作。 operation| 10|2020-02-2222:14:06| DELETE|[predicate-> 9|2020-02-2222:13:57|WRITE|[mode->Append, 8|2020-02-2222:13:52|null|STREAMINGUPDATE|[outputMode- 7|2020-02-2222:13:50| null|STREAMINGUPDATE|[outputMode- 6|2020-02-2222:13:42| null|STREAMINGUPDATE|[outputMode- 5|2020-02-2222:13:40| null|STREAMINGUPDATE|[outputMode- 4|2020-02-2222:13:32| null|STREAMINGUPDATE|[outputMode- 3|2020-02-2222:13:30| null|STREAMINGUPDATE|[outputMode- >SparkSQL2|2020-02-2222:13:22|1|2020-02-2222:13:20|null|STREAMINGUPDATE|[outputMode->null|STREAMINGUPDATE|[outputMode->Ap...|null|0|2020-02-2222:13:18|WRITE|[mode-> previousVersionpreviousVersion=deltaTable.history(1).select("version").collect()[0][0]-.option("versionAsOf",previousVersion).load(delta_path)spark.sql("SELECTCOUNT(*)FROMloans_delta_pre_deleteWHEREfunded_amnt jupyternotebookDeltaLake的官方教程,你可以在原文鏈接末尾下載到完整的notebook文件。SparkSQL >SparkSQLSparkSQLSparkSQL ApacheSpark3.0中的SQLApacheSpark3.0中的SQLApacheSpark3.0中的SQL性能改進(jìn)概覽的介紹。以下由Spark+AISummit中文精華版峰會(huì)的精彩內(nèi)容整理。今天主要跟大家分享一下spark3.0在SQL方向上的一些優(yōu)化工作。從spark2.4開始,大概有超過這種功能增強(qiáng),性能優(yōu)化,等各方面的新的feature在里面。大概超過50%的相關(guān)的issue都是和基于spark的一個(gè)開發(fā)者怎么去和spark交互,提供一些更多的工具。第二個(gè)是dynamicSpark3.0是一個(gè)時(shí)間跨度非常長(zhǎng)的release,包含了非常多的社區(qū)的工作。統(tǒng)計(jì)下來有接近3400多個(gè)issue在spark3.0里面進(jìn)行了處理。針對(duì)這么多的issue,我們用spark3.0的時(shí)候,需要考慮有哪第一部分是newexplainformat。當(dāng)我們想去改進(jìn),去優(yōu)化一個(gè)sparkSQL的性能的時(shí)候,首先需對(duì)于之前2.4的版本,可以通過explainSQL去展示。只不過是這種展示的方式看起來繁雜一點(diǎn)。我以找到對(duì)應(yīng)的更詳細(xì)的信息。而且對(duì)于每一個(gè)節(jié)點(diǎn)展示的信息也做了一些歸類和整理,整理成第二部分是alltypeofjoinhintsspark2.4只支持broadcastspark3.0除了支持broadcast,還支持sortmerge,shufflehash和cartesian第三部分是adaptivequeryexecution。社區(qū)為什么要去做它,最主要的原因就是說,對(duì)于一些查怎么去動(dòng)態(tài)的調(diào)整reducer的數(shù)量。在spark2.4,默認(rèn)指定partition數(shù)量,每一個(gè)partition經(jīng)過在spark3.0中,在shuffle的時(shí)候,每一個(gè)partition有不同的數(shù)據(jù)量大小,需要把小的partition數(shù)針對(duì)有數(shù)據(jù)傾斜的這種join,在spark2.4中帶來的主要的問題就是說,在處理最大的partition時(shí),在spark3.0中,有數(shù)據(jù)傾斜的join,比在spark2.4中更快。如圖所示,對(duì)于表A和表B,我把大表第四部分是dynamicpartitioningpruning。在join操作中,要避免讀取不必要的partition。而dynamicfilter能夠避免讀取不必要的partition。而在spark3.0中,通過pushdownwithdynamicfilter如下圖所示,是一個(gè)dynamicpartitioningpruning第五部分是Enhancednestedcolumnpruning&pushdown,是針對(duì)于這種嵌套的數(shù)據(jù)結(jié)構(gòu)的支持。在spark2.4里面,其實(shí)已經(jīng)提供了部分的這種支持。如下圖所示的表里面,有column1和column2,而后者是一個(gè)嵌套的數(shù)據(jù)結(jié)構(gòu),它里面有兩個(gè)字段。比如說,我查詢的時(shí)候只查了column2里面的第1個(gè)字段。去訪問這個(gè)數(shù)據(jù)的時(shí)候,我只需要把column2的第1個(gè)字段拿出來就行了,而不需要把整個(gè)column2都拿出來。但是在spark2.4里面它的支持是有限的。就是說,只而在spark3.0里面,對(duì)這一塊進(jìn)行了進(jìn)一步的優(yōu)化,能夠支持把columnpruning推到穿透所有的把過濾條件也推到tablescan里面。在spark2.4里面也是不能夠完全支持的。而在spark3.0里面,針對(duì)嵌套字段的filter,也是一直可以往下推到具體訪問數(shù)據(jù)的tablescan里第六部分是Improvedaggregationcodegeneration,針對(duì)aggregation8000Javabytecode,HotSpot編譯器就rollback,放棄生成nativecode。所以,如果你的這種在spark3.0里面,針對(duì)這種情況做一些優(yōu)化。簡(jiǎn)單來說,把一個(gè)方法拆分成多個(gè)方法,從而避免碰到8000Javabytecode的限制。 >SparkSQLSparkSQLSparkSQL 第七部分是NewScalaandJava,針對(duì)新的語言版本的支持。支持了新的Java11這個(gè)版本,以及Scala2.12版本。StructuredStreaming生產(chǎn)化實(shí)踐及Databricks軟件工程師李元健為大家?guī)韘tructuredstreaming生產(chǎn)化實(shí)踐及調(diào)優(yōu)的介紹。內(nèi)容包括輸入?yún)?shù),狀態(tài)參數(shù),輸出參數(shù)的調(diào)優(yōu),以及部署。以下由Spark+AISummit中文精華Sourcebasedstream,F(xiàn)ileSourcebasedstream,等等。我們可以看到,針對(duì)一個(gè)streaming系統(tǒng),無論在哪種模式下,structuredstreaming的運(yùn)行環(huán)境中都會(huì)有兩個(gè)關(guān)鍵因素影響著整體性能。一個(gè)是inputsize,一個(gè)是statesize。這兩個(gè)size完全指SparkSQL性能優(yōu) >SparkSQLSparkSQLSparkSQL nseangnshueszeshue是我們希望每一個(gè)shufflepartitionsize在100~200MB。這個(gè)值是怎么來的,某種程度上,某一括memory,配置進(jìn)行一個(gè)預(yù)估和調(diào)整。第二個(gè),我們需要shufflepartition和core的值是相等屏蔽了shufflespill這樣一個(gè)耗時(shí)操作之后,我們的調(diào)優(yōu)工作就結(jié)束了嗎,其實(shí)并不是。我們還可以但是我們的靜態(tài)DataFrame其實(shí)完全意義上來講是可以做broadcasthashjoin。size,另一個(gè)是statesize。我們需要進(jìn)一步的對(duì)statesize有一個(gè)很好的限制。否則,我們每一個(gè)input,需要在statestore當(dāng)中去做查找,或者說做匹配,做一系列的兩個(gè)batch之間的狀態(tài)互通的這里的state主要包括兩個(gè)部分:第一部分是StateStorebackedoperations,第二部分是DeltaLaketableorexternalsystem。狀態(tài)參數(shù)的重要性如下圖所示。因?yàn)槊恳粋€(gè)ach都需要對(duì)sae當(dāng)中的操作進(jìn)行查詢以及按需更新的步驟。無限增長(zhǎng)的sae肯定會(huì)讓你的作業(yè)越跑越慢,并且到最后消費(fèi)者跟不上生產(chǎn)者。與此同shuesutfey下一個(gè)demo中,我們主要會(huì)給大家演示的是這兩類的statestore。第一種與operation強(qiáng)相關(guān)。系統(tǒng)考慮多久之前的數(shù)據(jù)就算過期了,我們就可以不用再考慮。另外一個(gè)就是哪一種statestore我個(gè)例子著重于第3部分,Aggregatesalesperitemperhour。接下來我們看一下輸出參數(shù),相對(duì)于前幾種場(chǎng)景比較特殊。在上文中提到的inputstate對(duì)structuredstreaming的影響的二維象限里面,其實(shí)并沒有output。Structuredstreaming框架本身并不會(huì)受到輸其實(shí)商業(yè)系統(tǒng)當(dāng)中對(duì)于輸出參數(shù)是有一系列的優(yōu)化的。比方說,DeltaLake系統(tǒng)當(dāng)中對(duì)于output有一個(gè)Auto-Optimize的功能,它是默認(rèn)打開的一個(gè)feature。

溫馨提示

  • 1. 本站所有資源如無特殊說明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請(qǐng)下載最新的WinRAR軟件解壓。
  • 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請(qǐng)聯(lián)系上傳者。文件的所有權(quán)益歸上傳用戶所有。
  • 3. 本站RAR壓縮包中若帶圖紙,網(wǎng)頁內(nèi)容里面會(huì)有圖紙預(yù)覽,若沒有圖紙預(yù)覽就沒有圖紙。
  • 4. 未經(jīng)權(quán)益所有人同意不得將文件中的內(nèi)容挪作商業(yè)或盈利用途。
  • 5. 人人文庫網(wǎng)僅提供信息存儲(chǔ)空間,僅對(duì)用戶上傳內(nèi)容的表現(xiàn)方式做保護(hù)處理,對(duì)用戶上傳分享的文檔內(nèi)容本身不做任何修改或編輯,并不能對(duì)任何下載內(nèi)容負(fù)責(zé)。
  • 6. 下載文件中如有侵權(quán)或不適當(dāng)內(nèi)容,請(qǐng)與我們聯(lián)系,我們立即糾正。
  • 7. 本站不保證下載資源的準(zhǔn)確性、安全性和完整性, 同時(shí)也不承擔(dān)用戶因使用這些下載資源對(duì)自己和他人造成任何形式的傷害或損失。

最新文檔

評(píng)論

0/150

提交評(píng)論