




版權(quán)說(shuō)明:本文檔由用戶(hù)提供并上傳,收益歸屬內(nèi)容提供方,若內(nèi)容存在侵權(quán),請(qǐng)進(jìn)行舉報(bào)或認(rèn)領(lǐng)
文檔簡(jiǎn)介
1、自定義Hadoop Map/Reduce輸入文件切割I(lǐng)nputFormatHadoop會(huì)對(duì)原始輸入文件進(jìn)行文件切割,然后把每個(gè)split傳入mapper程序中進(jìn)行處理,F(xiàn)ileInputFormat是所有以文件作 為數(shù)據(jù)源的InputFormat實(shí)現(xiàn)的基類(lèi),F(xiàn)ileInputFormat保存作為job輸入的所有文件,并實(shí)現(xiàn)了對(duì)輸入文件計(jì)算splits的方 法。至于獲得記錄的方法是有不同的子類(lèi)進(jìn)行實(shí)現(xiàn)的。那么,F(xiàn)ileInputFormat是怎樣將他們劃分成splits的呢?FileInputFormat只劃分比HDFS block大的文件,所以如果一個(gè)文件的大小比block小,將不會(huì)被劃分,這
2、也是Hadoop處理大文件的效率要比處理很多小文件的效率高的原因。hadoop默認(rèn)的InputFormat是TextInputFormat,重寫(xiě)了FileInputFormat中的createRecordReader和isSplitable方法。該類(lèi)使用的reader是LineRecordReader,即以回車(chē)鍵(CR = 13)或換行符(LF = 10)為行分隔符。但大多數(shù)情況下,回車(chē)鍵或換行符作為輸入文件的行分隔符并不能滿(mǎn)足我們的需求,通常用戶(hù)很有可能會(huì)輸入回車(chē)鍵、換行符,所以通常我們會(huì)定義不可見(jiàn)字符(即用戶(hù)無(wú)法輸入的字符)為行分隔符,這種情況下,就需要新寫(xiě)一個(gè)InputFormat。又或
3、者,一條記錄的分隔符不是字符,而是字符串,這種情況相對(duì)麻煩;還有一種情況,輸入文件的主鍵key已經(jīng)是排好序的了,需要hadoop做的只是把相 同的key作為一個(gè)數(shù)據(jù)塊進(jìn)行邏輯處理,這種情況更麻煩,相當(dāng)于免去了mapper的過(guò)程,直接進(jìn)去reduce,那么InputFormat的邏輯就相 對(duì)較為復(fù)雜了,但并不是不能實(shí)現(xiàn)。1、改變一條記錄的分隔符,不用默認(rèn)的回車(chē)或換行符作為記錄分隔符,甚至可以采用字符串作為記錄分隔符。1)自定義一個(gè)InputFormat,繼承FileInputFormat,重寫(xiě)createRecordReader方法,如果不需要分片或者需要改變分片的方式,則重寫(xiě)isSplitab
4、le方法,具體代碼如下:public class FileInputFormatB extends FileInputFormat<LongWritable, Text> Overridepublic RecordReader<LongWritable, Text> createRecordReader( InputSplit split, TaskAttemptContext context) return new SearchRecordReader("b");Overrideprotected boolean isSplitable(FileS
5、ystem fs, Path filename) / 輸入文件不分片return false;2)關(guān)鍵在于定義一個(gè)新的SearchRecordReader繼承RecordReader,支持自定義的行分隔符,即一條記錄的分隔符。標(biāo)紅的地方為與hadoop默認(rèn)的LineRecordReader不同的地方。public class IsearchRecordReader extends RecordReader<LongWritable, Text> private static final Log LOG = LogFactory.getLog(IsearchRecordReader.
6、class);private CompressionCodecFactory compressionCodecs = null;private long start;private long pos;private long end;private LineReader in;private int maxLineLength;private LongWritable key = null;private Text value = null;/行分隔符,即一條記錄的分隔符private byte separator = 'b'private int sepLength = 1;
7、public IsearchRecordReader()public IsearchRecordReader(String seps)this.separator = seps.getBytes(); sepLength = separator.length;public void initialize(InputSplit genericSplit, TaskAttemptContext context) throws IOException FileSplit split = (FileSplit) genericSplit;Configuration job = context.getC
8、onfiguration();this.maxLineLength = job.getInt("mapred.linerecordreader.maxlength", Integer.MAX_VALUE);this.start = split.getStart();this.end = (this.start + split.getLength();Path file = split.getPath();pressionCodecs = new CompressionCodecFactory(job);CompressionCodec codec = pressionCod
9、ecs.getCodec(file);/ open the file and seek to the start of the splitFileSystem fs = file.getFileSystem(job);FSDataInputStream fileIn = fs.open(split.getPath();boolean skipFirstLine = false;if (codec != null) this.in = new LineReader(codec.createInputStream(fileIn), job);this.end = Long.MAX_VALUE; e
10、lse if (this.start != 0L) skipFirstLine = true;this.start -= sepLength;fileIn.seek(this.start);this.in = new LineReader(fileIn, job);if (skipFirstLine) / skip first line and re-establish "start".int newSize = in.readLine(new Text(), 0, (int) Math.min( (long) Integer.MAX_VALUE, end - start)
11、;if(newSize > 0)start += newSize;this.pos = this.start;public boolean nextKeyValue() throws IOException if (this.key = null) this.key = new LongWritable();this.key.set(this.pos);if (this.value = null) this.value = new Text();int newSize = 0;while (this.pos < this.end) newSize = this.in.readLin
12、e(this.value, this.maxLineLength, Math.max(int) Math.min(Integer.MAX_VALUE, this.end - this.pos), this.maxLineLength);if (newSize = 0) break;this.pos += newSize;if (newSize < this.maxLineLength) break;LOG.info("Skipped line of size " + newSize + " at pos " + (this.pos - newSiz
13、e);if (newSize = 0) /讀下一個(gè)bufferthis.key = null;this.value = null;return false;/讀同一個(gè)buffer的下一個(gè)記錄return true;public LongWritable getCurrentKey() return this.key;public Text getCurrentValue() return this.value;public float getProgress() if (this.start = this.end) return 0.0F;return Math.min(1.0F, (floa
14、t) (this.pos - this.start) / (float) (this.end - this.start);public synchronized void close() throws IOException if (this.in != null)this.in.close();3)重寫(xiě)SearchRecordReader需要的LineReader,可作為SearchRecordReader內(nèi)部類(lèi)。特別需要注意的地方就 是,讀取文件的方式是按指定大小的buffer來(lái)讀,必定就會(huì)遇到一條完整的記錄被切成兩半,甚至如果分隔符大于1個(gè)字符時(shí)分隔符也會(huì)被切成兩半的情況, 這種情況一定
15、要加以拼接處理。public class LineReader /回車(chē)鍵(hadoop默認(rèn))/private static final byte CR = 13;/換行符(hadoop默認(rèn))/private static final byte LF = 10;/按buffer進(jìn)行文件讀取private static final int DEFAULT_BUFFER_SIZE = 32 * 1024 * 1024;private int bufferSize = DEFAULT_BUFFER_SIZE;private InputStream in;private byte buffer;priv
16、ate int bufferLength = 0;private int bufferPosn = 0;LineReader(InputStream in, int bufferSize) this.bufferLength = 0;this.bufferPosn = 0;this.in = in;this.bufferSize = bufferSize;this.buffer = new bytethis.bufferSize;public LineReader(InputStream in, Configuration conf) throws IOException this(in, c
17、onf.getInt("io.file.buffer.size", DEFAULT_BUFFER_SIZE);public void close() throws IOException in.close();public int readLine(Text str, int maxLineLength) throws IOException return readLine(str, maxLineLength, Integer.MAX_VALUE);public int readLine(Text str) throws IOException return readLi
18、ne(str, Integer.MAX_VALUE, Integer.MAX_VALUE);/以下是需要改寫(xiě)的部分_start,核心代碼public int readLine(Text str, int maxLineLength, int maxBytesToConsume) throws IOExceptionstr.clear();Text record = new Text();int txtLength = 0;long bytesConsumed = 0L;boolean newline = false;int sepPosn = 0;do /已經(jīng)讀到buffer的末尾了,讀下一個(gè)
19、bufferif (this.bufferPosn >= this.bufferLength) bufferPosn = 0;bufferLength = in.read(buffer);/讀到文件末尾了,則跳出,進(jìn)行下一個(gè)文件的讀取if (bufferLength <= 0) break;int startPosn = this.bufferPosn;for (; bufferPosn < bufferLength; bufferPosn +) /處理上一個(gè)buffer的尾巴被切成了兩半的分隔符(如果分隔符中重復(fù)字符過(guò)多在這里會(huì)有問(wèn)題)if(sepPosn > 0 &
20、amp;& bufferbufferPosn != separatorsepPosn)sepPosn = 0;/遇到行分隔符的第一個(gè)字符if (bufferbufferPosn = separatorsepPosn) bufferPosn +;int i = 0;/判斷接下來(lái)的字符是否也是行分隔符中的字符for(+ sepPosn; sepPosn < sepLength; i +, sepPosn +)/buffer的最后剛好是分隔符,且分隔符被不幸地切成了兩半if(bufferPosn + i >= bufferLength)bufferPosn += i - 1;br
21、eak;/一旦其中有一個(gè)字符不相同,就判定為不是分隔符if(this.bufferthis.bufferPosn + i != separatorsepPosn)sepPosn = 0;break;/的確遇到了行分隔符if(sepPosn = sepLength)bufferPosn += i;newline = true;sepPosn = 0;break;int readLength = this.bufferPosn - startPosn;bytesConsumed += readLength;/行分隔符不放入塊中/int appendLength = readLength - new
22、lineLength;if (readLength > maxLineLength - txtLength) readLength = maxLineLength - txtLength;if (readLength > 0) record.append(this.buffer, startPosn, readLength);txtLength += readLength;/去掉記錄的分隔符if(newline)str.set(record.getBytes(), 0, record.getLength() - sepLength); while (!newline &&a
23、mp; (bytesConsumed < maxBytesToConsume);if (bytesConsumed > (long)Integer.MAX_VALUE) throw new IOException("Too many bytes before newline: " + bytesConsumed);return (int) bytesConsumed;/以下是需要改寫(xiě)的部分_end/以下是hadoop-core中LineReader的源碼_startpublic int readLine(Text str, int maxLineLength,
24、int maxBytesToConsume) throws IOExceptionstr.clear();int txtLength = 0;int newlineLength = 0;boolean prevCharCR = false;long bytesConsumed = 0L;do int startPosn = this.bufferPosn;if (this.bufferPosn >= this.bufferLength) startPosn = this.bufferPosn = 0;if (prevCharCR) bytesConsumed +;this.bufferL
25、ength = this.in.read(this.buffer);if (this.bufferLength <= 0) break;for (; this.bufferPosn < this.bufferLength; this.bufferPosn +) if (this.bufferthis.bufferPosn = LF) newlineLength = (prevCharCR) ? 2 : 1;this.bufferPosn +;break;if (prevCharCR) newlineLength = 1;break;prevCharCR = this.bufferthis.bufferPosn = CR;int readLength = this.bufferPosn - startPosn;if (prevCharCR) &&a
溫馨提示
- 1. 本站所有資源如無(wú)特殊說(shuō)明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請(qǐng)下載最新的WinRAR軟件解壓。
- 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請(qǐng)聯(lián)系上傳者。文件的所有權(quán)益歸上傳用戶(hù)所有。
- 3. 本站RAR壓縮包中若帶圖紙,網(wǎng)頁(yè)內(nèi)容里面會(huì)有圖紙預(yù)覽,若沒(méi)有圖紙預(yù)覽就沒(méi)有圖紙。
- 4. 未經(jīng)權(quán)益所有人同意不得將文件中的內(nèi)容挪作商業(yè)或盈利用途。
- 5. 人人文庫(kù)網(wǎng)僅提供信息存儲(chǔ)空間,僅對(duì)用戶(hù)上傳內(nèi)容的表現(xiàn)方式做保護(hù)處理,對(duì)用戶(hù)上傳分享的文檔內(nèi)容本身不做任何修改或編輯,并不能對(duì)任何下載內(nèi)容負(fù)責(zé)。
- 6. 下載文件中如有侵權(quán)或不適當(dāng)內(nèi)容,請(qǐng)與我們聯(lián)系,我們立即糾正。
- 7. 本站不保證下載資源的準(zhǔn)確性、安全性和完整性, 同時(shí)也不承擔(dān)用戶(hù)因使用這些下載資源對(duì)自己和他人造成任何形式的傷害或損失。
最新文檔
- 技術(shù)工具與創(chuàng)新教學(xué)法的融合探索
- 幼兒園愛(ài)眼活動(dòng)方案
- 大連民族大學(xué)《食品分析與檢測(cè)》2023-2024學(xué)年第一學(xué)期期末試卷
- 幼兒跨年活動(dòng)策劃方案
- 幼兒午休活動(dòng)方案
- 新疆農(nóng)業(yè)職業(yè)技術(shù)學(xué)院《和聲學(xué)3》2023-2024學(xué)年第一學(xué)期期末試卷
- 廣發(fā)商城活動(dòng)方案
- 廣告公司成立策劃方案
- 廣東格力雙十二活動(dòng)方案
- 廣西暢談?wù)雇顒?dòng)方案
- 江蘇省宿遷市泗洪縣2023-2024學(xué)年六年級(jí)下學(xué)期期末綜合(道德與法治+科學(xué))試卷
- 煙葉道路運(yùn)輸服務(wù)方案
- 胎膜早破護(hù)理查房完整版
- 急性左心衰護(hù)理查房課件
- 用于生態(tài)修復(fù)的粉煤灰
- 單機(jī)試車(chē)檢查、聯(lián)動(dòng)試車(chē)確認(rèn)表
- JB∕T 13883-2020 閥門(mén)電液執(zhí)行裝置
- 山東省東營(yíng)市廣饒縣2023-2024學(xué)年八年級(jí)下學(xué)期期中考試語(yǔ)文試題
- 供電一把手講安全課
- 地下工程暗挖隧道施工質(zhì)量控制培訓(xùn)課件
- 數(shù)學(xué)廣角-推理測(cè)試題
評(píng)論
0/150
提交評(píng)論