




版權(quán)說明:本文檔由用戶提供并上傳,收益歸屬內(nèi)容提供方,若內(nèi)容存在侵權(quán),請進(jìn)行舉報或認(rèn)領(lǐng)
文檔簡介
1、大數(shù)據(jù)技術(shù)基礎(chǔ)培訓(xùn)1Map/Reduce 技術(shù)培訓(xùn)2MapReduce描述并行處理大數(shù)據(jù)的分布式框架適合對海量數(shù)據(jù)進(jìn)行分析和處理框架提供并行化機(jī)制框架提供節(jié)點失效處理機(jī)制框架提供狀態(tài)監(jiān)控機(jī)制使用Java編寫34典型集群部署5MapReduce框架流程InputFormat 階段InputFormat決定輸入數(shù)據(jù)如何被切分供Map任務(wù)使用InputFormat將輸入數(shù)據(jù)劃分成一系列的InputSplit每個Map任務(wù)處理一個InputSplitInputSplit還包含存放這個數(shù)據(jù)塊的機(jī)器列表提供RecordReader讀取InputSplit,并且構(gòu)造key-value對傳遞給Map任務(wù)控制數(shù)
2、據(jù)如何被解壓縮將數(shù)據(jù)轉(zhuǎn)換成MapReduce能夠處理的Java類型6Map階段Map任務(wù)可以獨立地處理數(shù)據(jù)集通常用來對數(shù)據(jù)進(jìn)行過濾、轉(zhuǎn)換處理7Shuffle階段對Map任務(wù)的輸出進(jìn)行Partition、Sort、Spill以及mergeReducer獲取處理過的Map輸出,Merge后進(jìn)行Reduce操作8Reduce階段擴(kuò)展了MapReduceBase類實現(xiàn)了Reducer接口接受來自多個Map任務(wù)的輸出將key/value對按照key進(jìn)行排序Reduce方法通常會遍歷每個key對應(yīng)的所有value9MapReduce理論基礎(chǔ)10MapReduce簡單示例: Word CountWord C
3、ountInput: large number of text documentsTask: count the occurrence of each word across all the document11Input“to be or not to be”“it will be make or break”“to”, 1“be”, 1“or”, 1“not”, 1“to”, 1“be”, 1Map“it”, 1“will”, 1“be”, 1“make”, 1“or”, 1“break”, 1Partition“be”, 1“not”, 1“be”, 1“to”, 1“or”, 1“to
4、”, 1“it”, 1“be”, 1“break”, 1“will”, 1“make”, 1“or”, 1Shuffle“be”, 1“not”, 1“be”, 1“it”, 1“be”, 1“break”, 1“to”, 1“or”, 1“to”, 1“will”, 1“make”, 1“or”, 1Sort“be”, 1“be”, 1“be”, 1“break”, 1“it”, 1“not”, 1“make”, 1“or”, 1“or”, 1“to”, 1“to”, 1“will”, 1Reduce“be”, 3“break”, 1“it”, 1“not”, 1“make”, 1“or”,
5、 2“to”, 2“will”, 1并行化執(zhí)行12JobTracker & TaskTracker13JobTrackerMapReduce框架中的任務(wù)調(diào)度器,類似于”Master”的角色資源管理管理TaskTracker為任務(wù)分配可用的資源(Task Slot)任務(wù)生命周期管理任務(wù)提交分配Task并執(zhí)行處理失敗的Task任務(wù)完成通過http:/:50030監(jiān)控任務(wù)執(zhí)行狀態(tài)可以查看Map Task和Reduce Task執(zhí)行狀態(tài)可以查看Task的stdout和stderr的日志信息14TaskTracker是一個Daemon進(jìn)程與JobTracker進(jìn)行通信接受任務(wù)分配更新Task狀態(tài)管理本地
6、單獨Task的執(zhí)行,包括Map任務(wù)和Reduce任務(wù)TaskTracker可以配置Map Slot和Reduce Slot的數(shù)量每個Task占用一個Slot每個Task是一個獨立的進(jìn)程TaskTracker啟動Task并且監(jiān)控Task狀態(tài)15任務(wù)提交和執(zhí)行過程16NameNodeJobTrackerDataNodeServerTaskTrackerTaskTaskTaskClientSubmitjobconfStagingJob related info including jarLibStreaming libSend config and jarsRequest location of d
7、ataObtain location of dataSubmit jobSend task to TaskTrackerRequest for job related infoSend job related infoRun task Report about task123456789MapReduce失效處理TaskTracker失效通過心跳機(jī)制,可以被檢測到失效需要重新執(zhí)行已完成的、正在執(zhí)行的Map任務(wù)需要重新執(zhí)行正在執(zhí)行的Reduce任務(wù)通過JobTracker實現(xiàn)失效處理JobTracker失效如果配置了高可用性,未完成的任務(wù)會在備份JobTracker啟動之后重新提交17MapRe
8、duce編程18MapReduce編程模型MapReduce使用Java編寫輸入和輸出Key/Value對的集合實現(xiàn)Map和Reduce方法map (in_key, in_value) - list(out_key, intermediate_value)處理鍵值對的輸入生成中間結(jié)果,也是鍵值對reduce (out_key, list(intermediate_value) - list(out_value)將相同Key的中間值合并生成最終結(jié)果(通常每個Key對應(yīng)一個結(jié)果)19map(/ Do some work here.collect( ) );reduce(.collect( / Do
9、 some work here); )MapValueOutputCollectorReporter:OutputCollectorKeyValueIterator (value)ReporterValueKey:OutputCollectorMapOutputCollectorMap() 和 Reduce()例子:WordCountpublic void map(LongWritable key, Text value, OutputCollector output, Reporter reporter) throws IOException StringTokenizer tokenize
10、r = new StringTokenizer(value.toString();while (tokenizer.hasMoreTokens() word.set(tokenizer.nextToken(); output.collect(word, one); public void reduce(Text key, Iterator values, OutputCollector output, Reporter reporter) throws IOException int sum = 0; while (values.hasNext() sum += values.next().g
11、et(); output.collect(key, new IntWritable(sum); 21WritableComparable接口Key Value需要被序列化實現(xiàn)WritableComparable接口WritableComparable extends Writable, ComparableWritable接口write(DataOutput out)read(DataInput in)Comparable接口int compareTo(T o)Key使用compareTo方法來排序22Hadoop DatatypesHadoop提供Java 基本數(shù)據(jù)類型的封裝,并且提供get
12、(), set() 和 toString()方法還提供了hashCode() 和 equals()方法提供的類型包括:ByteWritableIntWritableLongWritableFloatWritableDoubleWritableBooleanWritableNullWritable(不支持set()IntPairWritable23Hadoop Datatypes 實例TextWritable for a Java StringmyTxtWritable.set(stringToInsert);myTxtWritable.toString();ArrayWritable for
13、a Java ArraymyAW.set(Writable)myAW.get() returns the WritablemyAW.toArray() returns a Java ObjectMapWritable for a Java Hash MapmyMapW.put(K,V)myMapW.get(K)BytesWritable for a Java array of bytesmyBW.set(byte, offset, len)myBW.get() returns byte24Input Format25Input SplitInputSplit包含Map任務(wù)需要處理的數(shù)據(jù)信息Jo
14、bClient負(fù)責(zé)計算和劃分Split大部分壓縮文件無法SplitMapReduce能夠自動解壓文件Indexed LZO,BZip2格式的壓縮文件可以被splithadoop jar hadoop-lzo.jar pression.lzo.LzoIndexer yourfile.lzo26MapReduce InputFormatInputFormat控制InputSplit的劃分提供RecordReader,從InputSplit中讀取并構(gòu)造key-value對供Map任務(wù)使用一般會 按照HDFS文件的Block每個InputSplit包含一個HDFS Block64G的文件,blocks
15、ize為64M,默認(rèn)會有1024個InputSlit使用較大的Block Size具有以下好處提高單個Map任務(wù)處理數(shù)據(jù)量減少Split數(shù)量,可以減少總的Map任務(wù)的啟動停止開銷可以通過設(shè)置SplitSize來控制Mapper的數(shù)量max(minSplitSize, min(BlockSize, maxSplitSize)FileInputFormatsetInputPaths(JobConf conf, Path. inputPaths) addInputPaths(JobConf conf, String commaSeparatedPaths) 27小文件處理問題Hadoop用來處理海量
16、文件(百萬),但是小文件意味著更多的文件(千萬級)小文件越多,MapReduce任務(wù)中需要的Mapper數(shù)就越多默認(rèn)每個Mapper會處理文件的一個Block或者整個文件(文件大小小于BlockSize)大量的Mapper需要使用更多的啟動/停止開銷生成的大量的日志小文件大量的小文件會使得HDFS Metadata變大增加Namenode的負(fù)載采用CombineFileInputFormat每個InputSplit包含多個File Block通過設(shè)置SplitSize控制Mapper數(shù)量28三種常用的InputFormatTextInputFormat(默認(rèn))每次處理一行Key是LongWri
17、table, Value是TextKeyValueInputFormat每次處理一行Key和Value之間存在分隔符Key是Text, Value也是Text必須顯式指定分隔符SequenceFileInputFormat使用二進(jìn)制格式存儲的Key/Value對支持壓縮,可以只對Value壓縮、也可以對Block壓縮是Splitable的2930TextInputFormat 實現(xiàn)Now isthe timeYour MapCodek=0v= “Now is”k=7v= “the time”1231TextInputFormat細(xì)節(jié) HDFSNow isthe timeforNode all
18、goodmen toNode K = offset = 64MB-3v=“for all good”32Record ReaderKV KV KV KV KV KV KV KV 64MBRecord ReaderMapper 1Record ReaderMapper 2ignore64MB33KeyValueInputFormatYour input file in HDFSJoeSmith AnnSingh Key: “Ann” value: “Singh”YourMapCode12Key: “Joe” value: “Smith”自定義InputFormat實現(xiàn)InputFormat 接口
19、InputSplits getSplits()RecordReader getRecordReader()如果使用文件作為輸入,繼承FileInputFormat大部分會使用FileInputFormat的getSplits方法重新定義自己的RecordReaderK createKey()V createValue()boolean next(K key, V value)34Output Format35Output Format所有的輸出文件缺省會寫入同一個目錄默認(rèn)輸出文件的名字是”part-”加5位數(shù)字第一個輸出文件是part-00000輸出文件數(shù)量與Reduce數(shù)相同如果沒有Redu
20、ce任務(wù),輸出文件數(shù)與Mapper數(shù)量相同F(xiàn)ileOutputFormatsetOutputPath(JobConf conf, Path outputDir)TextOutputFormat (default)每個Key Value對是一行key TAB value SequenceFileOutputFormat與SequenceFileInputFormat配合使用36MapReduce 輸出壓縮Job輸出結(jié)果壓縮conf.setBoolean(press, true);conf.setClass(pression.codec, SnappyCodec.class, Compressio
21、nCodec.class);Map任務(wù)輸出結(jié)果壓縮conf.setCompressMapOutput(true); conf.setMapOutputCompressorClass(SnappyCodec.class);37任務(wù)創(chuàng)建、提交38JobClient與Jobtracker交互的接口檢查任務(wù)的Input和Output拷貝Job的jar和配置信息到MapReduce框架提交任務(wù)監(jiān)控任務(wù)狀態(tài)39JobClient提交任務(wù)通過JobClient提供的靜態(tài)方法runJob()方法,會創(chuàng)建JobClient的實例,并調(diào)用submitJob()方法runJob()方法會每秒鐘去獲取Job的狀態(tài)在J
22、ob完成之前,程序會被阻塞在狀態(tài)獲取也可以直接調(diào)用JobClient的submitJob程序在任務(wù)提交之后不會被阻塞40JobConf配置Job的參數(shù)常用的參數(shù)jobconf.setJobName(“AnyName”);jobconf.setMapperClass(myMapper.class);jobconf.setCombinerClass(myReducer.class);jobconf.setReducerClass(myReducer.class);jobconf.setOutputKeyClass(Text.class); / for entire jobjobconf.setOu
23、tputValueClass(Text.class); / for entire jobjobconf.setMapOutputKeyClass(Text.class); / for just map()jobconf.setMapOutputValueClass(IntWritable.class); / for just map()jobconf.setNumReduceTasks(1); jobconf.setJarByClass(WordCount.class);41JobConf自定義參數(shù)/Job創(chuàng)建JobConf conf = new JobConf(); conf.set(“My
24、Property”, “MyValue”);JobClient.runJob(conf);/Mapper/Reducer中使用public void setup(Context context) throws IOException, InterruptedException Configuration conf = context.getConfiguration(); String myValue = conf.get(“MyProperty”); /etc 42main函數(shù)的編寫在main函數(shù)中直接構(gòu)造JobConf,并通過JobClient提交通過Tool和 ToolRunner實現(xiàn)T
25、ool接口 int run(String args)ToolRunner int run(Configuration conf, Tool tool, String args)支持處理Hadoop通用的參數(shù),如 conf, -D, -fs等43ToolRunner 實例44Override public int run(String args) throws Exception if (args.length != 2) System.out.println( Usage: ProductSearchIndexer ); ToolRunner.printGenericCommandUsage(
26、System.out); return -1; /JobClient.runJob() public class ProductSearchIndexer extends Configured implements Tool public static void main(String args) throws Exception int res = ToolRunner.run(new Configuration(), new ProductSearchIndexer(), args); System.exit(res); Implement ToolInvoke ToolRunnerIn
27、main()Implement runMapReduce程序運行打包成Jar,拷貝到集群中運行hadoop jar yourmr.jar io.transwarp.example.YourClassName args使用hadoop-eclipse插件在Eclipse中運行MapReduce程序45高級API46DistributedCache分發(fā)任務(wù)需要的只讀的大文件addCacheFile(URI,conf)/setCacheFiles(URIs,conf)addCacheArchive(URI,conf)/setCacheArchives(URIs,conf)addArchiveToCl
28、assPath(Path, Configuration)/addFileToClassPath(Path, Configuration)每個TaskTracker在執(zhí)行Job之前拷貝文件到本地該TaskTracker上的任務(wù)都從本地讀取執(zhí)行完Job之后刪除在任務(wù)的工作目錄下創(chuàng)建SymbolLinkDistributedCache.createSymlink(Configuration)使用#分割, hdfs:/namenode:port/lib.so.1#lib.so 分發(fā)的文件可以是Text, Archives, Jars等47DistributedCache例子/Job創(chuàng)建JobConf
29、conf = new JobConf(); DistributedCache.addCacheFile(new URI(/user/peter/cacheFile/testCache1), conf);JobClient.runJob(conf);/Mapper/Reducer中使用public void setup(Context context) throws IOException, InterruptedException Configuration conf = context.getConfiguration(); URI localFiles = DistributedCache.getCacheFiles(conf); /
溫馨提示
- 1. 本站所有資源如無特殊說明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請下載最新的WinRAR軟件解壓。
- 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請聯(lián)系上傳者。文件的所有權(quán)益歸上傳用戶所有。
- 3. 本站RAR壓縮包中若帶圖紙,網(wǎng)頁內(nèi)容里面會有圖紙預(yù)覽,若沒有圖紙預(yù)覽就沒有圖紙。
- 4. 未經(jīng)權(quán)益所有人同意不得將文件中的內(nèi)容挪作商業(yè)或盈利用途。
- 5. 人人文庫網(wǎng)僅提供信息存儲空間,僅對用戶上傳內(nèi)容的表現(xiàn)方式做保護(hù)處理,對用戶上傳分享的文檔內(nèi)容本身不做任何修改或編輯,并不能對任何下載內(nèi)容負(fù)責(zé)。
- 6. 下載文件中如有侵權(quán)或不適當(dāng)內(nèi)容,請與我們聯(lián)系,我們立即糾正。
- 7. 本站不保證下載資源的準(zhǔn)確性、安全性和完整性, 同時也不承擔(dān)用戶因使用這些下載資源對自己和他人造成任何形式的傷害或損失。
最新文檔
- 2025至2030年中國串聯(lián)恒功率電伴熱帶數(shù)據(jù)監(jiān)測研究報告
- 統(tǒng)編版二年級語文下冊期中達(dá)標(biāo)測試卷(提升版)(含答案)
- 2025年《義務(wù)教育小學(xué)道德與法治課程標(biāo)準(zhǔn)測試卷2022版》測試題庫及答案
- 2022-2023學(xué)年廣東省廣州市天河區(qū)匯景實驗學(xué)校七年級(下)期中數(shù)學(xué)試卷(含答案)
- 遺產(chǎn)繼承遺囑效力確認(rèn)合同(2篇)
- 采購與施工分包合同(2篇)
- 物流配送路徑優(yōu)化對比表
- 開幕致辭與企業(yè)愿景演講實錄
- 蘇武牧羊的紅色故事征文
- 抵押房產(chǎn)借款合同
- 2025年高考百日誓師大會校長致辭(二)
- 2025年中國萬寶工程有限公司校園招聘筆試參考題庫附帶答案詳解
- 2025年河南機(jī)電職業(yè)學(xué)院單招職業(yè)技能測試題庫及參考答案
- 成本經(jīng)理試用期轉(zhuǎn)正工作匯報
- 2023年廣西本科對口中職考試中職英語試題
- 閃耀離子束瘢痕治療飛頓醫(yī)療激光公司客戶支持部講解
- 《莖和葉》說課稿-2023-2024學(xué)年科學(xué)四年級下冊教科版
- 2024年皖西衛(wèi)生職業(yè)學(xué)院單招職業(yè)適應(yīng)性測試題庫及答案解析
- 公務(wù)接待知識培訓(xùn)
- 2024年終通信監(jiān)理工作總結(jié)范文(2篇)
- 石油工程設(shè)計大賽采油單項組
評論
0/150
提交評論