自定義Hadoop Map輸入文件切割I(lǐng)nputFormat_第1頁
自定義Hadoop Map輸入文件切割I(lǐng)nputFormat_第2頁
自定義Hadoop Map輸入文件切割I(lǐng)nputFormat_第3頁
自定義Hadoop Map輸入文件切割I(lǐng)nputFormat_第4頁
自定義Hadoop Map輸入文件切割I(lǐng)nputFormat_第5頁
已閱讀5頁,還剩13頁未讀, 繼續(xù)免費(fèi)閱讀

下載本文檔

版權(quán)說明:本文檔由用戶提供并上傳,收益歸屬內(nèi)容提供方,若內(nèi)容存在侵權(quán),請進(jìn)行舉報或認(rèn)領(lǐng)

文檔簡介

1、自定義Hadoop Map/Reduce輸入文件切割I(lǐng)nputFormatHadoop會對原始輸入文件進(jìn)行文件切割,然后把每個split傳入mapper程序中進(jìn)行處理,F(xiàn)ileInputFormat是所有以文件作 為數(shù)據(jù)源的InputFormat實(shí)現(xiàn)的基類,F(xiàn)ileInputFormat保存作為job輸入的所有文件,并實(shí)現(xiàn)了對輸入文件計(jì)算splits的方 法。至于獲得記錄的方法是有不同的子類進(jìn)行實(shí)現(xiàn)的。那么,F(xiàn)ileInputFormat是怎樣將他們劃分成splits的呢?FileInputFormat只劃分比HDFS block大的文件,所以如果一個文件的大小比block小,將不會被劃分,這

2、也是Hadoop處理大文件的效率要比處理很多小文件的效率高的原因。hadoop默認(rèn)的InputFormat是TextInputFormat,重寫了FileInputFormat中的createRecordReader和isSplitable方法。該類使用的reader是LineRecordReader,即以回車鍵(CR = 13)或換行符(LF = 10)為行分隔符。但大多數(shù)情況下,回車鍵或換行符作為輸入文件的行分隔符并不能滿足我們的需求,通常用戶很有可能會輸入回車鍵、換行符,所以通常我們會定義不可見字符(即用戶無法輸入的字符)為行分隔符,這種情況下,就需要新寫一個InputFormat。又或

3、者,一條記錄的分隔符不是字符,而是字符串,這種情況相對麻煩;還有一種情況,輸入文件的主鍵key已經(jīng)是排好序的了,需要hadoop做的只是把相 同的key作為一個數(shù)據(jù)塊進(jìn)行邏輯處理,這種情況更麻煩,相當(dāng)于免去了mapper的過程,直接進(jìn)去reduce,那么InputFormat的邏輯就相 對較為復(fù)雜了,但并不是不能實(shí)現(xiàn)。1、改變一條記錄的分隔符,不用默認(rèn)的回車或換行符作為記錄分隔符,甚至可以采用字符串作為記錄分隔符。1)自定義一個InputFormat,繼承FileInputFormat,重寫createRecordReader方法,如果不需要分片或者需要改變分片的方式,則重寫isSplitab

4、le方法,具體代碼如下:public class FileInputFormatB extends FileInputFormat<LongWritable, Text> Overridepublic RecordReader<LongWritable, Text> createRecordReader( InputSplit split, TaskAttemptContext context) return new SearchRecordReader("b");Overrideprotected boolean isSplitable(FileS

5、ystem fs, Path filename) / 輸入文件不分片return false;2)關(guān)鍵在于定義一個新的SearchRecordReader繼承RecordReader,支持自定義的行分隔符,即一條記錄的分隔符。標(biāo)紅的地方為與hadoop默認(rèn)的LineRecordReader不同的地方。public class IsearchRecordReader extends RecordReader<LongWritable, Text> private static final Log LOG = LogFactory.getLog(IsearchRecordReader.

6、class);private CompressionCodecFactory compressionCodecs = null;private long start;private long pos;private long end;private LineReader in;private int maxLineLength;private LongWritable key = null;private Text value = null;/行分隔符,即一條記錄的分隔符private byte separator = 'b'private int sepLength = 1;

7、public IsearchRecordReader()public IsearchRecordReader(String seps)this.separator = seps.getBytes(); sepLength = separator.length;public void initialize(InputSplit genericSplit, TaskAttemptContext context) throws IOException FileSplit split = (FileSplit) genericSplit;Configuration job = context.getC

8、onfiguration();this.maxLineLength = job.getInt("mapred.linerecordreader.maxlength", Integer.MAX_VALUE);this.start = split.getStart();this.end = (this.start + split.getLength();Path file = split.getPath();pressionCodecs = new CompressionCodecFactory(job);CompressionCodec codec = pressionCod

9、ecs.getCodec(file);/ open the file and seek to the start of the splitFileSystem fs = file.getFileSystem(job);FSDataInputStream fileIn = fs.open(split.getPath();boolean skipFirstLine = false;if (codec != null) this.in = new LineReader(codec.createInputStream(fileIn), job);this.end = Long.MAX_VALUE; e

10、lse if (this.start != 0L) skipFirstLine = true;this.start -= sepLength;fileIn.seek(this.start);this.in = new LineReader(fileIn, job);if (skipFirstLine) / skip first line and re-establish "start".int newSize = in.readLine(new Text(), 0, (int) Math.min( (long) Integer.MAX_VALUE, end - start)

11、;if(newSize > 0)start += newSize;this.pos = this.start;public boolean nextKeyValue() throws IOException if (this.key = null) this.key = new LongWritable();this.key.set(this.pos);if (this.value = null) this.value = new Text();int newSize = 0;while (this.pos < this.end) newSize = this.in.readLin

12、e(this.value, this.maxLineLength, Math.max(int) Math.min(Integer.MAX_VALUE, this.end - this.pos), this.maxLineLength);if (newSize = 0) break;this.pos += newSize;if (newSize < this.maxLineLength) break;LOG.info("Skipped line of size " + newSize + " at pos " + (this.pos - newSiz

13、e);if (newSize = 0) /讀下一個bufferthis.key = null;this.value = null;return false;/讀同一個buffer的下一個記錄return true;public LongWritable getCurrentKey() return this.key;public Text getCurrentValue() return this.value;public float getProgress() if (this.start = this.end) return 0.0F;return Math.min(1.0F, (floa

14、t) (this.pos - this.start) / (float) (this.end - this.start);public synchronized void close() throws IOException if (this.in != null)this.in.close();3)重寫SearchRecordReader需要的LineReader,可作為SearchRecordReader內(nèi)部類。特別需要注意的地方就 是,讀取文件的方式是按指定大小的buffer來讀,必定就會遇到一條完整的記錄被切成兩半,甚至如果分隔符大于1個字符時分隔符也會被切成兩半的情況, 這種情況一定

15、要加以拼接處理。public class LineReader /回車鍵(hadoop默認(rèn))/private static final byte CR = 13;/換行符(hadoop默認(rèn))/private static final byte LF = 10;/按buffer進(jìn)行文件讀取private static final int DEFAULT_BUFFER_SIZE = 32 * 1024 * 1024;private int bufferSize = DEFAULT_BUFFER_SIZE;private InputStream in;private byte buffer;priv

16、ate int bufferLength = 0;private int bufferPosn = 0;LineReader(InputStream in, int bufferSize) this.bufferLength = 0;this.bufferPosn = 0;this.in = in;this.bufferSize = bufferSize;this.buffer = new bytethis.bufferSize;public LineReader(InputStream in, Configuration conf) throws IOException this(in, c

17、onf.getInt("io.file.buffer.size", DEFAULT_BUFFER_SIZE);public void close() throws IOException in.close();public int readLine(Text str, int maxLineLength) throws IOException return readLine(str, maxLineLength, Integer.MAX_VALUE);public int readLine(Text str) throws IOException return readLi

18、ne(str, Integer.MAX_VALUE, Integer.MAX_VALUE);/以下是需要改寫的部分_start,核心代碼public int readLine(Text str, int maxLineLength, int maxBytesToConsume) throws IOExceptionstr.clear();Text record = new Text();int txtLength = 0;long bytesConsumed = 0L;boolean newline = false;int sepPosn = 0;do /已經(jīng)讀到buffer的末尾了,讀下一個

19、bufferif (this.bufferPosn >= this.bufferLength) bufferPosn = 0;bufferLength = in.read(buffer);/讀到文件末尾了,則跳出,進(jìn)行下一個文件的讀取if (bufferLength <= 0) break;int startPosn = this.bufferPosn;for (; bufferPosn < bufferLength; bufferPosn +) /處理上一個buffer的尾巴被切成了兩半的分隔符(如果分隔符中重復(fù)字符過多在這里會有問題)if(sepPosn > 0 &

20、amp;& bufferbufferPosn != separatorsepPosn)sepPosn = 0;/遇到行分隔符的第一個字符if (bufferbufferPosn = separatorsepPosn) bufferPosn +;int i = 0;/判斷接下來的字符是否也是行分隔符中的字符for(+ sepPosn; sepPosn < sepLength; i +, sepPosn +)/buffer的最后剛好是分隔符,且分隔符被不幸地切成了兩半if(bufferPosn + i >= bufferLength)bufferPosn += i - 1;br

21、eak;/一旦其中有一個字符不相同,就判定為不是分隔符if(this.bufferthis.bufferPosn + i != separatorsepPosn)sepPosn = 0;break;/的確遇到了行分隔符if(sepPosn = sepLength)bufferPosn += i;newline = true;sepPosn = 0;break;int readLength = this.bufferPosn - startPosn;bytesConsumed += readLength;/行分隔符不放入塊中/int appendLength = readLength - new

22、lineLength;if (readLength > maxLineLength - txtLength) readLength = maxLineLength - txtLength;if (readLength > 0) record.append(this.buffer, startPosn, readLength);txtLength += readLength;/去掉記錄的分隔符if(newline)str.set(record.getBytes(), 0, record.getLength() - sepLength); while (!newline &&a

23、mp; (bytesConsumed < maxBytesToConsume);if (bytesConsumed > (long)Integer.MAX_VALUE) throw new IOException("Too many bytes before newline: " + bytesConsumed);return (int) bytesConsumed;/以下是需要改寫的部分_end/以下是hadoop-core中LineReader的源碼_startpublic int readLine(Text str, int maxLineLength,

24、int maxBytesToConsume) throws IOExceptionstr.clear();int txtLength = 0;int newlineLength = 0;boolean prevCharCR = false;long bytesConsumed = 0L;do int startPosn = this.bufferPosn;if (this.bufferPosn >= this.bufferLength) startPosn = this.bufferPosn = 0;if (prevCharCR) bytesConsumed +;this.bufferL

25、ength = this.in.read(this.buffer);if (this.bufferLength <= 0) break;for (; this.bufferPosn < this.bufferLength; this.bufferPosn +) if (this.bufferthis.bufferPosn = LF) newlineLength = (prevCharCR) ? 2 : 1;this.bufferPosn +;break;if (prevCharCR) newlineLength = 1;break;prevCharCR = this.bufferthis.bufferPosn = CR;int readLength = this.bufferPosn - startPosn;if (prevCharCR) &&a

溫馨提示

  • 1. 本站所有資源如無特殊說明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請下載最新的WinRAR軟件解壓。
  • 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請聯(lián)系上傳者。文件的所有權(quán)益歸上傳用戶所有。
  • 3. 本站RAR壓縮包中若帶圖紙,網(wǎng)頁內(nèi)容里面會有圖紙預(yù)覽,若沒有圖紙預(yù)覽就沒有圖紙。
  • 4. 未經(jīng)權(quán)益所有人同意不得將文件中的內(nèi)容挪作商業(yè)或盈利用途。
  • 5. 人人文庫網(wǎng)僅提供信息存儲空間,僅對用戶上傳內(nèi)容的表現(xiàn)方式做保護(hù)處理,對用戶上傳分享的文檔內(nèi)容本身不做任何修改或編輯,并不能對任何下載內(nèi)容負(fù)責(zé)。
  • 6. 下載文件中如有侵權(quán)或不適當(dāng)內(nèi)容,請與我們聯(lián)系,我們立即糾正。
  • 7. 本站不保證下載資源的準(zhǔn)確性、安全性和完整性, 同時也不承擔(dān)用戶因使用這些下載資源對自己和他人造成任何形式的傷害或損失。

最新文檔

評論

0/150

提交評論