Hadoop大數(shù)據(jù)開發(fā)實(shí)戰(zhàn)-第07章-認(rèn)識MapReduce編程模型課件_第1頁
Hadoop大數(shù)據(jù)開發(fā)實(shí)戰(zhàn)-第07章-認(rèn)識MapReduce編程模型課件_第2頁
Hadoop大數(shù)據(jù)開發(fā)實(shí)戰(zhàn)-第07章-認(rèn)識MapReduce編程模型課件_第3頁
Hadoop大數(shù)據(jù)開發(fā)實(shí)戰(zhàn)-第07章-認(rèn)識MapReduce編程模型課件_第4頁
Hadoop大數(shù)據(jù)開發(fā)實(shí)戰(zhàn)-第07章-認(rèn)識MapReduce編程模型課件_第5頁
已閱讀5頁,還剩21頁未讀, 繼續(xù)免費(fèi)閱讀

下載本文檔

版權(quán)說明:本文檔由用戶提供并上傳,收益歸屬內(nèi)容提供方,若內(nèi)容存在侵權(quán),請進(jìn)行舉報或認(rèn)領(lǐng)

文檔簡介

1、認(rèn)識MapReduce編程模型主要內(nèi)容MapReduce編程模型簡介WordCount編程實(shí)例Hadoop MapReduce架構(gòu)MapReduce實(shí)戰(zhàn)開發(fā)MapReduce編程模型簡介MapReduce是一種可用于數(shù)據(jù)處理的編程模型。該模型比較簡單,但用于編寫有用的程序并不簡單。Hadoop可以運(yùn)行由各種語言編寫的MapReduce程序。例如:Java、Ruby、Python和C+語言等。最重要的是,MapReduce程序本質(zhì)上是并行運(yùn)行的,因此可以將大規(guī)模的數(shù)據(jù)分析任務(wù)交給任何一個擁有足夠多機(jī)器的運(yùn)行商。MapReduce的優(yōu)勢在于處理大規(guī)模數(shù)據(jù)集。MapReduce編程模型簡介1、從Ma

2、pReduce自身的命名特點(diǎn)可以看出,MapReduce由兩個階段組成:Map和Reduce。用戶只需map()和reduce()兩個函數(shù),即可完成簡單的分布式程序設(shè)計(jì)。2、map()函數(shù)以key/value對作為輸入,產(chǎn)生另外一系列key/value對作為中間輸出寫入本地磁盤。MapReduce框架會自動將這些中間數(shù)據(jù)按照key值進(jìn)行聚合,且key值相同的數(shù)據(jù)被統(tǒng)一交給reduce()函數(shù)處理。3、reduce()函數(shù)以key及對應(yīng)的value列表作為輸入,經(jīng)合并key相同的value值后,產(chǎn)生另外一系列key/value對作為最終輸出寫入HDFS。MapReduce編程模型簡介MapRed

3、uce設(shè)計(jì)目的:易于編程良好的擴(kuò)展性高容錯性WordCount編程實(shí)例Mapper類: public class WordMapper extends Mapper public static final IntWritable val = new IntWritable(1);public static final Text word = new Text();public void map(Object key, Text value, Context context)throws InterruptedException, IOException String line = value.

4、toString();String arr = line.split(t);for (String wd : arr) word.set(wd);context.write(word, val);WordCount編程實(shí)例Reducer類 public class WordReducer extends Reducer public IntWritable val = new IntWritable();public void reduce(Text key, Iterable values, Context context)throws InterruptedException, IOExc

5、eption int sum = 0;for (IntWritable value : values) sum += value.get();val.set(sum);context.write(key, val); WordCount編程實(shí)例main類: public class WordCount public static void main(String args) throws IOException,ClassNotFoundException, InterruptedException String intput = null;String output = null;if (n

6、ull != args & args.length = 2) intput = args0;output = args1;Job job = new Job(new Configuration(), word count);/創(chuàng)建一個job /以jar包的形式運(yùn)行job.setJarByClass(WordCount.class); /設(shè)置Mapper類和Reducer類job.setMapperClass(Mapper.class);job.setReducerClass(Reducer.class);WordCount編程實(shí)例 /設(shè)置輸出的key/value的輸出數(shù)據(jù)類型 job.setO

7、utputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); /設(shè)置輸入輸出的格式FileInputFormat.addInputPath(job, new Path(intput);FileOutputFormat.setOutputPath(job, new Path(output);System.exit(job.waitForCompletion(true) ? 0 : 1); else System.err.println( wordcount ); 運(yùn)行結(jié)果WordCount編程實(shí)例用戶編寫完MapRe

8、duce程序后,按照一定的規(guī)則指定程序的輸入和輸出目錄,并提交到Hadoop集群中,作業(yè)在Hadoop中的執(zhí)行過程如圖所示。Hadoop將輸入數(shù)據(jù)切分成若干個輸入分片(input split),并將每個split交給一個Map Task處理;Map Task不斷的從對應(yīng)的split中解析出一個個key/value,并調(diào)用map()函數(shù)處理,處理完之后根據(jù)Reduce Task個數(shù)將結(jié)果分成若干個分區(qū)(partition)寫到本地磁盤;同時,每個Reduce Task從每個Map Task上讀取屬于自己的那個partition,然后基于排序的方法將key相同的數(shù)據(jù)聚集在一起,調(diào)用reduce()

9、函數(shù)處理,并將結(jié)果輸出到文件中。WordCount編程實(shí)例流程圖如下:Hadoop MapReduce架構(gòu)Hadoop MapReduce架構(gòu)1)Client用戶編寫的MapReduce程序通過Client提交到JobTracker端;同時,用戶可通過Client提供的一些接口查看作業(yè)的運(yùn)行狀態(tài)。在Hadoop內(nèi)部用“作業(yè)”(Job)表示MapReduce程序。一個MapReduce程序可對應(yīng)若干個作業(yè),而每個作業(yè)會被分解成若干個Map/Reduce任務(wù)(Task)。2)JobTrackerJobTracke負(fù)責(zé)資源監(jiān)控和作業(yè)調(diào)度。JobTracker 監(jiān)控所有TaskTracker 與job

10、的健康狀況,一旦發(fā)現(xiàn)失敗,就將相應(yīng)的任務(wù)轉(zhuǎn)移到其他節(jié)點(diǎn);同時,JobTracker 會跟蹤任務(wù)的執(zhí)行進(jìn)度、資源使用量等信息,并將這些信息告訴任務(wù)調(diào)度器,而調(diào)度器會在資源出現(xiàn)空閑時,選擇合適的任務(wù)使用這些資源。在Hadoop 中,任務(wù)調(diào)度器是一個可插拔的模塊,用戶可以根據(jù)自己的需要設(shè)計(jì)相應(yīng)的調(diào)度器。Hadoop MapReduce架構(gòu)3)TaskTrackerTaskTracker 會周期性地通過Heartbeat 將本節(jié)點(diǎn)上資源的使用情況和任務(wù)的運(yùn)行進(jìn)度匯報給JobTracker,同時接收J(rèn)obTracker 發(fā)送過來的命令并執(zhí)行相應(yīng)的操作(如啟動新任務(wù)、殺死任務(wù)等)。TaskTracker

11、使用“slot”等量劃分本節(jié)點(diǎn)上的資源量。“slot”代表計(jì)算資源(CPU、內(nèi)存等)。一個Task 獲取到一個slot 后才有機(jī)會運(yùn)行,而Hadoop 調(diào)度器的作用就是將各個TaskTracker 上的空閑slot 分配給Task 使用。slot 分為Map slot 和Reduce slot 兩種,分別供MapTask 和Reduce Task 使用。TaskTracker 通過slot 數(shù)目(可配置參數(shù))限定Task 的并發(fā)度。Hadoop MapReduce架構(gòu)4)TaskTask 分為Map Task 和Reduce Task 兩種,均由TaskTracker 啟動。HDFS 以固定大

12、小的block 為基本單位存儲數(shù)據(jù),而對于MapReduce 而言,其處理單位是split。split 是一個邏輯概念,它只包含一些元數(shù)據(jù)信息,比如數(shù)據(jù)起始位置、數(shù)據(jù)長度、數(shù)據(jù)所在節(jié)點(diǎn)等。它的劃分方法完全由用戶自己決定。但需要注意的是,split 的多少決定了Map Task 的數(shù)目,因?yàn)槊總€split 只會交給一個Map Task 處理。Hadoop MapReduce架構(gòu)Map Task 執(zhí)行過程如下圖 所示。由該圖可知,Map Task 先將對應(yīng)的split 迭代解析成一個個key/value 對,依次調(diào)用用戶自定義的map() 函數(shù)進(jìn)行處理,最終將臨時結(jié)果存放到本地磁盤上,其中臨時數(shù)據(jù)

13、被分成若干個partition,每個partition 將被一個Reduce Task 處理。Hadoop MapReduce架構(gòu)Reduce Task 執(zhí)行過程下圖所示。該過程分為三個階段:從遠(yuǎn)程節(jié)點(diǎn)上讀取MapTask 中間結(jié)果(稱為“Shuffle 階段”);按照key 對key/value 對進(jìn)行排序(稱為“Sort 階段”);依次讀取,調(diào)用用戶自定義的reduce() 函數(shù)處理,并將最終結(jié)果存到HDFS 上(稱為“Reduce 階段”)。MapReduce實(shí)戰(zhàn)開發(fā)數(shù)據(jù)源sogou500w數(shù)據(jù)或sogou4000w數(shù)據(jù)數(shù)據(jù)字段描述Time:用戶訪問時間Uid:用戶的idKeyword:

14、訪問的關(guān)鍵字Rank:點(diǎn)擊排名Order:頁數(shù)Url:網(wǎng)址條件過濾統(tǒng)計(jì)出搜索過包含有“仙劍奇?zhèn)b傳”內(nèi)容的UID及搜索關(guān)鍵字記錄詳細(xì)代碼見XjUid.javarank2的所有UID及數(shù)量代碼見UidByRank.java搜索過仙劍奇?zhèn)b傳內(nèi)容的UID、搜索記錄static class UidMap extends Mapper Text uid = new Text(); protected void map(LongWritable key, Text value, org.apache.hadoop.mapreduce.Mapper.Context context) throws java.i

15、o.IOException ,InterruptedException String lines = value.toString().split(t); if(lines != null & lines .length = 6) String kw = lines 2; if(kw.indexOf(“仙劍奇?zhèn)b傳)=0) uid.set(lines1); context.write(uid, new Text(kw); ; 搜索過仙劍奇?zhèn)b傳內(nèi)容的UID、搜索記錄public static void main(String args) throws IOException, ClassNotFo

16、undException, InterruptedException if(args.length!=2&args=null)System.err.println(Please Iput Right Path!);System.exit(0);Configuration configuration = new Configuration();Job job = new Job(configuration, BaiduUid.class.getSimpleName();job.setJarByClass(BaiduUid.class);job.setInputFormatClass(TextIn

17、putFormat.class);job.setOutputFormatClass(TextOutputFormat.class);FileInputFormat.setInputPaths(job, new Path(args0);FileOutputFormat.setOutputPath(job, new Path(args1);job.setMapperClass(UidMap.class);job.setNumReduceTasks(0);job.setOutputKeyClass(Text.class);job.setOutputValueClass(Text.class);job.waitForCompletion(true); 條件查詢上午7-9點(diǎn)之間,搜索過“趕集網(wǎng)”的用戶詳細(xì)代碼請見GjTest.java本章小結(jié)MapRe

溫馨提示

  • 1. 本站所有資源如無特殊說明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請下載最新的WinRAR軟件解壓。
  • 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請聯(lián)系上傳者。文件的所有權(quán)益歸上傳用戶所有。
  • 3. 本站RAR壓縮包中若帶圖紙,網(wǎng)頁內(nèi)容里面會有圖紙預(yù)覽,若沒有圖紙預(yù)覽就沒有圖紙。
  • 4. 未經(jīng)權(quán)益所有人同意不得將文件中的內(nèi)容挪作商業(yè)或盈利用途。
  • 5. 人人文庫網(wǎng)僅提供信息存儲空間,僅對用戶上傳內(nèi)容的表現(xiàn)方式做保護(hù)處理,對用戶上傳分享的文檔內(nèi)容本身不做任何修改或編輯,并不能對任何下載內(nèi)容負(fù)責(zé)。
  • 6. 下載文件中如有侵權(quán)或不適當(dāng)內(nèi)容,請與我們聯(lián)系,我們立即糾正。
  • 7. 本站不保證下載資源的準(zhǔn)確性、安全性和完整性, 同時也不承擔(dān)用戶因使用這些下載資源對自己和他人造成任何形式的傷害或損失。

最新文檔

評論

0/150

提交評論