hadoop進(jìn)行分布式并行編程第二部分_第1頁(yè)
hadoop進(jìn)行分布式并行編程第二部分_第2頁(yè)
hadoop進(jìn)行分布式并行編程第二部分_第3頁(yè)
hadoop進(jìn)行分布式并行編程第二部分_第4頁(yè)
hadoop進(jìn)行分布式并行編程第二部分_第5頁(yè)
已閱讀5頁(yè),還剩8頁(yè)未讀, 繼續(xù)免費(fèi)閱讀

下載本文檔

版權(quán)說(shuō)明:本文檔由用戶提供并上傳,收益歸屬內(nèi)容提供方,若內(nèi)容存在侵權(quán),請(qǐng)進(jìn)行舉報(bào)或認(rèn)領(lǐng)

文檔簡(jiǎn)介

1、用 Hadoop 進(jìn)行分布式并行編程, 第 2 部分程序?qū)嵗c分析曹 羽中 (caoyuz), 軟件工程師, IBM中國(guó)開(kāi)發(fā)中心簡(jiǎn)介: Hadoop 是一個(gè)實(shí)現(xiàn)了 MapReduce 計(jì)算模型的開(kāi)源分布式并行編程框架,借助于 Hadoop, 程序員可以輕松地編寫分布式并行程序,將其運(yùn)行于計(jì)算機(jī)集群上,完成海量數(shù)據(jù)的計(jì)算。在本文中,詳細(xì)介紹了如何針對(duì)一個(gè)具體的并行計(jì)算任務(wù),基于 Hadoop 編寫程序,如何使用 IBM MapReduce Tools 在 Eclipse 環(huán)境中編譯并運(yùn)行 Hadoop 程序。本文的標(biāo)簽:  eclipse下開(kāi)發(fā)hadoop

2、, mapreduce標(biāo)記本文!發(fā)布日期: 2008 年 5 月 22 日 級(jí)別: 初級(jí) 訪問(wèn)情況 : 11069 次瀏覽 評(píng)論: 1 (查看 | 添加評(píng)論 - 登錄) 平均分 (12個(gè)評(píng)分)為本文評(píng)分前言在上一篇文章:“用 Hadoop 進(jìn)行分布式并行編程 第一部分 基本概念與安裝部署”中,介紹了 MapReduce 計(jì)算模型,分布式文件系統(tǒng) HDFS,分布式并行計(jì)算等的基本原理, 并且詳細(xì)介紹了如何安裝 Hadoop,如何運(yùn)行基于 Hadoop 的并行程序。在本

3、文中,將針對(duì)一個(gè)具體的計(jì)算任務(wù),介紹如何基于 Hadoop 編寫并行程序,如何使用 IBM 開(kāi)發(fā)的 Hadoop Eclipse plugin 在 Eclipse 環(huán)境中編譯并運(yùn)行程序。回頁(yè)首分析 WordCount 程序我們先來(lái)看看 Hadoop 自帶的示例程序 WordCount,這個(gè)程序用于統(tǒng)計(jì)一批文本文件中單詞出現(xiàn)的頻率,完整的代碼可在下載的 Hadoop 安裝包中得到(在 src/examples 目錄中)。1.實(shí)現(xiàn)Map類見(jiàn)代碼清單1。這個(gè)類實(shí)現(xiàn) Mapper 接口中的 map 方法,輸入?yún)?shù)中的 value 是文本文件中的一行,利用 StringTokenizer 將這個(gè)字符串拆

4、成單詞,然后將輸出結(jié)果 <單詞,1> 寫入到 org.apache.hadoop.mapred.OutputCollector 中。OutputCollector 由 Hadoop 框架提供, 負(fù)責(zé)收集 Mapper 和 Reducer 的輸出數(shù)據(jù),實(shí)現(xiàn) map 函數(shù)和 reduce 函數(shù)時(shí),只需要簡(jiǎn)單地將其輸出的 <key,value> 對(duì)往 OutputCollector 中一丟即可,剩余的事框架自會(huì)幫你處理好。代碼中 LongWritable, IntWritable, Text 均是 Hadoop 中實(shí)現(xiàn)的用于封裝 Java 數(shù)據(jù)類型的類,這些類都能夠被串行化從

5、而便于在分布式環(huán)境中進(jìn)行數(shù)據(jù)交換,你可以將它們分別視為 long, int, String 的替代品。Reporter 則可用于報(bào)告整個(gè)應(yīng)用的運(yùn)行進(jìn)度,本例中未使用。代碼清單1 public static class MapClass extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> private final static IntWritable one = new IntWritable(1); private Text word = new Text(); public

6、 void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException String line = value.toString(); StringTokenizer itr = new StringTokenizer(line); while (itr.hasMoreTokens() word.set(itr.nextToken(); output.collect(word, one); 2.實(shí)現(xiàn) Reduce

7、類見(jiàn)代碼清單 2。這個(gè)類實(shí)現(xiàn) Reducer 接口中的 reduce 方法, 輸入?yún)?shù)中的 key, values 是由 Map 任務(wù)輸出的中間結(jié)果,values 是一個(gè) Iterator, 遍歷這個(gè) Iterator, 就可以得到屬于同一個(gè) key 的所有 value. 此處,key 是一個(gè)單詞,value 是詞頻。只需要將所有的 value 相加,就可以得到這個(gè)單詞的總的出現(xiàn)次數(shù)。代碼清單 2 public static class Reduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, Int

8、Writable> public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException int sum = 0; while (values.hasNext() sum += values.next().get(); output.collect(key, new IntWritable(sum); 3.運(yùn)行 Job在 Hadoop 中一次計(jì)算任務(wù)稱之為一

9、個(gè) job, 可以通過(guò)一個(gè) JobConf 對(duì)象設(shè)置如何運(yùn)行這個(gè) job。此處定義了輸出的 key 的類型是 Text, value 的類型是 IntWritable, 指定使用代碼清單1中實(shí)現(xiàn)的 MapClass 作為 Mapper 類,使用代碼清單2中實(shí)現(xiàn)的 Reduce 作為 Reducer 類和 Combiner 類, 任務(wù)的輸入路徑和輸出路徑由命令行參數(shù)指定,這樣 job 運(yùn)行時(shí)會(huì)處理輸入路徑下的所有文件,并將計(jì)算結(jié)果寫到輸出路徑下。然后將 JobConf 對(duì)象作為參數(shù),調(diào)用 JobClient 的 runJob, 開(kāi)始執(zhí)行這個(gè)計(jì)算任務(wù)。至于 main 方法中使用的 ToolRunn

10、er 是一個(gè)運(yùn)行 MapReduce 任務(wù)的輔助工具類,依樣畫葫蘆用之即可。代碼清單 3 public int run(String args) throws Exception JobConf conf = new JobConf(getConf(), WordCount.class); conf.setJobName("wordcount"); conf.setOutputKeyClass(Text.class); conf.setOutputValueClass(IntWritable.class); conf.setMapperClass(MapClass.clas

11、s); conf.setCombinerClass(Reduce.class); conf.setReducerClass(Reduce.class); conf.setInputPath(new Path(args0); conf.setOutputPath(new Path(args1); JobClient.runJob(conf); return 0; public static void main(String args) throws Exception if(args.length != 2) System.err.println("Usage: WordCount &

12、lt;input path> <output path>"); System.exit(-1); int res = ToolRunner.run(new Configuration(), new WordCount(), args); System.exit(res); 以上就是 WordCount 程序的全部細(xì)節(jié),簡(jiǎn)單到讓人吃驚,您都不敢相信就這么幾行代碼就可以分布式運(yùn)行于大規(guī)模集群上,并行處理海量數(shù)據(jù)集。4. 通過(guò) JobConf 定制計(jì)算任務(wù)通過(guò)上文所述的 JobConf 對(duì)象,程序員可以設(shè)定各種參數(shù),定制如何完成一個(gè)計(jì)算任務(wù)。這些參數(shù)很多情況下就是一個(gè) j

13、ava 接口,通過(guò)注入這些接口的特定實(shí)現(xiàn),可以定義一個(gè)計(jì)算任務(wù)( job )的全部細(xì)節(jié)。了解這些參數(shù)及其缺省設(shè)置,您才能在編寫自己的并行計(jì)算程序時(shí)做到輕車熟路,游刃有余,明白哪些類是需要自己實(shí)現(xiàn)的,哪些類用 Hadoop 的缺省實(shí)現(xiàn)即可。表一是對(duì) JobConf 對(duì)象中可以設(shè)置的一些重要參數(shù)的總結(jié)和說(shuō)明,表中第一列中的參數(shù)在 JobConf 中均會(huì)有相應(yīng)的 get/set 方法,對(duì)程序員來(lái)說(shuō),只有在表中第三列中的缺省值無(wú)法滿足您的需求時(shí),才需要調(diào)用這些 set 方法,設(shè)定合適的參數(shù)值,實(shí)現(xiàn)自己的計(jì)算目的。針對(duì)表格中第一列中的接口,除了第三列的缺省實(shí)現(xiàn)之外,Hadoop 通常還會(huì)有一些其它的實(shí)現(xiàn)

14、,我在表格第四列中列出了部分,您可以查閱 Hadoop 的 API 文檔或源代碼獲得更詳細(xì)的信息,在很多的情況下,您都不用實(shí)現(xiàn)自己的 Mapper 和 Reducer, 直接使用 Hadoop 自帶的一些實(shí)現(xiàn)即可。表一 JobConf 常用可定制參數(shù)參數(shù)作用缺省值其它實(shí)現(xiàn)InputFormat將輸入的數(shù)據(jù)集切割成小數(shù)據(jù)集 InputSplits, 每一個(gè) InputSplit 將由一個(gè) Mapper 負(fù)責(zé)處理。此外 InputFormat 中還提供一個(gè) RecordReader 的實(shí)現(xiàn), 將一個(gè) InputSplit 解析成 <key,value> 對(duì)提供給 map 函數(shù)。Text

15、InputFormat(針對(duì)文本文件,按行將文本文件切割成 InputSplits, 并用 LineRecordReader 將 InputSplit 解析成 <key,value> 對(duì),key 是行在文件中的位置,value 是文件中的一行)SequenceFileInputFormatOutputFormat提供一個(gè) RecordWriter 的實(shí)現(xiàn),負(fù)責(zé)輸出最終結(jié)果TextOutputFormat(用 LineRecordWriter 將最終結(jié)果寫成純文件文件,每個(gè) <key,value> 對(duì)一行,key 和 value 之間用 tab 分隔)SequenceFi

16、leOutputFormatOutputKeyClass輸出的最終結(jié)果中 key 的類型LongWritableOutputValueClass輸出的最終結(jié)果中 value 的類型TextMapperClassMapper 類,實(shí)現(xiàn) map 函數(shù),完成輸入的 <key,value> 到中間結(jié)果的映射IdentityMapper(將輸入的 <key,value> 原封不動(dòng)的輸出為中間結(jié)果)LongSumReducer,LogRegexMapper,InverseMapperCombinerClass實(shí)現(xiàn) combine 函數(shù),將中間結(jié)果中的重復(fù) key 做合并null(不

17、對(duì)中間結(jié)果中的重復(fù) key 做合并)ReducerClassReducer 類,實(shí)現(xiàn) reduce 函數(shù),對(duì)中間結(jié)果做合并,形成最終結(jié)果IdentityReducer(將中間結(jié)果直接輸出為最終結(jié)果)AccumulatingReducer, LongSumReducerInputPath設(shè)定 job 的輸入目錄, job 運(yùn)行時(shí)會(huì)處理輸入目錄下的所有文件nullOutputPath設(shè)定 job 的輸出目錄,job 的最終結(jié)果會(huì)寫入輸出目錄下nullMapOutputKeyClass設(shè)定 map 函數(shù)輸出的中間結(jié)果中 key 的類型如果用戶沒(méi)有設(shè)定的話,使用 OutputKeyClassMapOu

18、tputValueClass設(shè)定 map 函數(shù)輸出的中間結(jié)果中 value 的類型如果用戶沒(méi)有設(shè)定的話,使用 OutputValuesClassOutputKeyComparator對(duì)結(jié)果中的 key 進(jìn)行排序時(shí)的使用的比較器WritableComparablePartitionerClass對(duì)中間結(jié)果的 key 排序后,用此 Partition 函數(shù)將其劃分為R份,每份由一個(gè) Reducer 負(fù)責(zé)處理。HashPartitioner(使用 Hash 函數(shù)做 partition)KeyFieldBasedPartitioner PipesPartitioner回頁(yè)首改進(jìn)的 WordCount

19、程序現(xiàn)在你對(duì) Hadoop 并行程序的細(xì)節(jié)已經(jīng)有了比較深入的了解,我們來(lái)把 WordCount 程序改進(jìn)一下,目標(biāo): (1)原 WordCount 程序僅按空格切分單詞,導(dǎo)致各類標(biāo)點(diǎn)符號(hào)與單詞混雜在一起,改進(jìn)后的程序應(yīng)該能夠正確的切出單詞,并且單詞不要區(qū)分大小寫。(2)在最終結(jié)果中,按單詞出現(xiàn)頻率的降序進(jìn)行排序。1.修改 Mapper 類,實(shí)現(xiàn)目標(biāo)(1)實(shí)現(xiàn)很簡(jiǎn)單,見(jiàn)代碼清單4中的注釋。代碼清單 4 public static class MapClass extends MapReduceBase implements Mapper<LongWritable, Text, Text,

20、IntWritable> private final static IntWritable one = new IntWritable(1); private Text word = new Text(); private String pattern="w" /正則表達(dá)式,代表不是0-9, a-z, A-Z的所有其它字符 public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOE

21、xception String line = value.toString().toLowerCase(); /全部轉(zhuǎn)為小寫字母 line = line.replaceAll(pattern, " "); /將非0-9, a-z, A-Z的字符替換為空格 StringTokenizer itr = new StringTokenizer(line); while (itr.hasMoreTokens() word.set(itr.nextToken(); output.collect(word, one); 2.實(shí)現(xiàn)目標(biāo)(2)用一個(gè)并行計(jì)算任務(wù)顯然是無(wú)法同時(shí)完成單詞詞頻統(tǒng)計(jì)

22、和排序的,這時(shí)我們可以利用 Hadoop 的任務(wù)管道能力,用上一個(gè)任務(wù)(詞頻統(tǒng)計(jì))的輸出做為下一個(gè)任務(wù)(排序)的輸入,順序執(zhí)行兩個(gè)并行計(jì)算任務(wù)。主要工作是修改代碼清單3中的 run 函數(shù),在其中定義一個(gè)排序任務(wù)并運(yùn)行之。在 Hadoop 中要實(shí)現(xiàn)排序是很簡(jiǎn)單的,因?yàn)樵?MapReduce 的過(guò)程中,會(huì)把中間結(jié)果根據(jù) key 排序并按 key 切成 R 份交給 R 個(gè) Reduce 函數(shù),而 Reduce 函數(shù)在處理中間結(jié)果之前也會(huì)有一個(gè)按 key 進(jìn)行排序的過(guò)程,故 MapReduce 輸出的最終結(jié)果實(shí)際上已經(jīng)按 key 排好序。詞頻統(tǒng)計(jì)任務(wù)輸出的 key 是單詞,value 是詞頻,為了實(shí)現(xiàn)

23、按詞頻排序,我們指定使用 InverseMapper 類作為排序任務(wù)的 Mapper 類( sortJob.setMapperClass(InverseMapper.class );),這個(gè)類的 map 函數(shù)簡(jiǎn)單地將輸入的 key 和 value 互換后作為中間結(jié)果輸出,在本例中即是將詞頻作為 key,單詞作為 value 輸出, 這樣自然就能得到按詞頻排好序的最終結(jié)果。我們無(wú)需指定 Reduce 類,Hadoop 會(huì)使用缺省的 IdentityReducer 類,將中間結(jié)果原樣輸出。還有一個(gè)問(wèn)題需要解決: 排序任務(wù)中的 Key 的類型是 IntWritable, (sortJob.setOu

24、tputKeyClass(IntWritable.class), Hadoop 默認(rèn)對(duì) IntWritable 按升序排序,而我們需要的是按降序排列。因此我們實(shí)現(xiàn)了一個(gè) IntWritableDecreasingComparator 類,并指定使用這個(gè)自定義的 Comparator 類對(duì)輸出結(jié)果中的 key (詞頻)進(jìn)行排序:sortJob.setOutputKeyComparatorClass(IntWritableDecreasingComparator.class)詳見(jiàn)代碼清單 5 及其中的注釋。代碼清單 5 public int run(String args) throws Exce

25、ption Path tempDir = new Path("wordcount-temp-" + Integer.toString( new Random().nextInt(Integer.MAX_VALUE); /定義一個(gè)臨時(shí)目錄 JobConf conf = new JobConf(getConf(), WordCount.class); try conf.setJobName("wordcount"); conf.setOutputKeyClass(Text.class); conf.setOutputValueClass(IntWritabl

26、e.class); conf.setMapperClass(MapClass.class); conf.setCombinerClass(Reduce.class); conf.setReducerClass(Reduce.class); conf.setInputPath(new Path(args0); conf.setOutputPath(tempDir); /先將詞頻統(tǒng)計(jì)任務(wù)的輸出結(jié)果寫到臨時(shí)目 /錄中, 下一個(gè)排序任務(wù)以臨時(shí)目錄為輸入目錄。 conf.setOutputFormat(SequenceFileOutputFormat.class); JobClient.runJob(c

27、onf); JobConf sortJob = new JobConf(getConf(), WordCount.class); sortJob.setJobName("sort"); sortJob.setInputPath(tempDir); sortJob.setInputFormat(SequenceFileInputFormat.class); sortJob.setMapperClass(InverseMapper.class); sortJob.setNumReduceTasks(1); /將 Reducer 的個(gè)數(shù)限定為1, 最終輸出的結(jié)果 /文件就是一個(gè)。

28、 sortJob.setOutputPath(new Path(args1); sortJob.setOutputKeyClass(IntWritable.class); sortJob.setOutputValueClass(Text.class); sortJob.setOutputKeyComparatorClass(IntWritableDecreasingComparator.class); JobClient.runJob(sortJob); finally FileSystem.get(conf).delete(tempDir);/刪除臨時(shí)目錄 return 0; private

29、 static class IntWritableDecreasingComparator extends IntWritable.Comparator public int compare(WritableComparable a, WritableComparable b) return -pare(a, b); public int compare(byte b1, int s1, int l1, byte b2, int s2, int l2) return -pare(b1, s1, l1, b2, s2, l2); 回頁(yè)首在 Eclipse 環(huán)境下進(jìn)行開(kāi)發(fā)和調(diào)試在 Eclipse

30、環(huán)境下可以方便地進(jìn)行 Hadoop 并行程序的開(kāi)發(fā)和調(diào)試。推薦使用 IBM MapReduce Tools for Eclipse, 使用這個(gè) Eclipse plugin 可以簡(jiǎn)化開(kāi)發(fā)和部署 Hadoop 并行程序的過(guò)程。基于這個(gè) plugin, 可以在 Eclipse 中創(chuàng)建一個(gè) Hadoop MapReduce 應(yīng)用程序,并且提供了一些基于 MapReduce 框架的類開(kāi)發(fā)的向?qū)?,可以打包?JAR 文件,部署一個(gè) Hadoop MapReduce 應(yīng)用程序到一個(gè) Hadoop 服務(wù)器(本地和遠(yuǎn)程均可),可以通過(guò)一個(gè)專門的視圖 ( perspective ) 查看 Hadoop 服務(wù)器、

31、Hadoop 分布式文件系統(tǒng)( DFS )和當(dāng)前運(yùn)行的任務(wù)的狀態(tài)。可在 IBM alphaWorks 網(wǎng)站下載這個(gè) MapReduce Tool, 或在本文的下載清單中下載。將下載后的壓縮包解壓到你 Eclipse 安裝目錄,重新啟動(dòng) Eclipse 即可使用了。設(shè)置 Hadoop 主目錄點(diǎn)擊 Eclipse 主菜單上 Windows->Preferences, 然后在左側(cè)選擇 Hadoop Home Directory,設(shè)定你的 Hadoop 主目錄,如圖一所示:圖 1 創(chuàng)立一個(gè) MapReduce Project點(diǎn)擊 Eclipse 主菜單上 File->N

32、ew->Project, 在彈出的對(duì)話框中選擇 MapReduce Project, 輸入 project name 如 wordcount,然后點(diǎn)擊 Finish 即可。,如圖 2 所示:圖 2 此后,你就可以象一個(gè)普通的 Eclipse Java project 那樣,添加入 Java 類,比如你可以定義一個(gè) WordCount 類,然后將本文代碼清單1,2,3中的代碼寫到此類中,添加入必要的 import 語(yǔ)句 ( Eclipse 快捷鍵 ctrl+shift+o 可以幫你),即可形成一個(gè)完整的 wordcount 程序。在我們這個(gè)簡(jiǎn)單的 wordcount 程序中,我們把全部的內(nèi)容都放在一個(gè) WordCount 類中。實(shí)際上 IBM MapReduce tools 還提供了幾個(gè)實(shí)用的向?qū)?( wizard ) 工具,幫你創(chuàng)建單獨(dú)的 Mapper 類,Reducer 類,MapReduce Driver 類(就是代碼清單3中那部分內(nèi)容),在編寫比較復(fù)雜的 MapReduce 程序時(shí),將這些類獨(dú)立出來(lái)是非常有必要的,也有利于在不同的計(jì)算任務(wù)中重用你編寫的各種 Mapper 類和 Reducer 類。在 Eclip

溫馨提示

  • 1. 本站所有資源如無(wú)特殊說(shuō)明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請(qǐng)下載最新的WinRAR軟件解壓。
  • 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請(qǐng)聯(lián)系上傳者。文件的所有權(quán)益歸上傳用戶所有。
  • 3. 本站RAR壓縮包中若帶圖紙,網(wǎng)頁(yè)內(nèi)容里面會(huì)有圖紙預(yù)覽,若沒(méi)有圖紙預(yù)覽就沒(méi)有圖紙。
  • 4. 未經(jīng)權(quán)益所有人同意不得將文件中的內(nèi)容挪作商業(yè)或盈利用途。
  • 5. 人人文庫(kù)網(wǎng)僅提供信息存儲(chǔ)空間,僅對(duì)用戶上傳內(nèi)容的表現(xiàn)方式做保護(hù)處理,對(duì)用戶上傳分享的文檔內(nèi)容本身不做任何修改或編輯,并不能對(duì)任何下載內(nèi)容負(fù)責(zé)。
  • 6. 下載文件中如有侵權(quán)或不適當(dāng)內(nèi)容,請(qǐng)與我們聯(lián)系,我們立即糾正。
  • 7. 本站不保證下載資源的準(zhǔn)確性、安全性和完整性, 同時(shí)也不承擔(dān)用戶因使用這些下載資源對(duì)自己和他人造成任何形式的傷害或損失。

最新文檔

評(píng)論

0/150

提交評(píng)論