HBase Programming: TSMC Training Course
王耀聰, 陳威宇
Jazz@.tw  waue@.tw
TSMC Training Course <V0.20>

Outline
  HBase program compilation methods
  HBase programming
    Common HBase API overview
    Hands-on I/O operations
    Using HBase with MapReduce
  Case study
  Other projects

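HBase program compilation methods

This part introduces two ways to compile and run an HBase program:
  Method 1: use the Java JDK 1.6
  Method 2: use the Eclipse plug-in

1. Compiling and running with Java

  1. Copy every .jar file in the hbase_home directory into hadoop_home/lib/.
  2. Compile:
       javac -classpath hadoop-*-core.jar:hbase-*.jar -d MyJava MyCode.java
  3. Package:
       jar -cvf MyJar.jar -C MyJava .
  4. Run:
       bin/hadoop jar MyJar.jar MyCode {Input/ Output/}

  Notes:
    The working directory is Hadoop_Home.
    ./MyJava = directory holding the compiled classes
    MyJar.jar = the packaged build
    Put some text files into the input directory on HDFS first.
    ./input and ./output are not necessarily the HDFS input and output directories.

2.0 Compiling and running with Eclipse

  HBase must already run correctly on top of Hadoop.
  Set up the Hadoop development environment in Eclipse first.
    See the appendix.
    For more Hadoop details, see the companion document "Hadoop 0.20 Programming".
  Create a Hadoop project.

2.1 Set the project's detailed properties
  Right-click the project you created and choose Properties.

2.2 Add to the project's classpath

2.3 Select the classpath libraries
  Repeat step 2.2 to select hbase-0.20.*.jar and every jar in the lib/ directory.

2.4 Attach source code and documentation to the libraries

HBase programming

This part introduces how to write HBase programs:
  Common HBase API overview
  Hands-on I/O operations
  Using HBase with MapReduce

HBase programming: common HBase API overview

HTable members

  Families: Contents, Department (qualifiers: news, bid, sport)

  Row                  Timestamp  Contents                                         Department
  com.yahoo.news.tw    t1         "I built a robot for 6,000 m underwater"         "tech"
                       t2         "How do mosquitoes track down people"            "tech"
                       t3         "Speaking with brain waves"                      "tech"
  com.yahoo.bid.tw     t1         "…ipad…"                                         "3C"
  com.yahoo.sport.tw   t1         "…Wang40…"                                       "MBA"

  Terms shown on the slide: Table, Family, Column, Qualifier, Row, TimeStamp

Common HBase classes

  Classes: HBaseAdmin, HBaseConfiguration, HTable, HTableDescriptor, Put, Get, Scanner
  Levels they act on: Database, Table, Family, Column Qualifier

HBaseConfiguration

  Adds HBase configuration files to a Configuration.
    = new HBaseConfiguration()
    = new HBaseConfiguration(Configuration c)
  Extends org.apache.hadoop.conf.Configuration.

  Return type   Method(parameters)
  void          addResource(Path file)
  void          clear()
  String        get(String name)
  boolean       getBoolean(String name, boolean defaultValue)
  void          set(String name, String value)
  void          setBoolean(String name, boolean value)

  Each name/value pair corresponds to a property in the configuration file:
    <property>
      <name>name</name>
      <value>value</value>
    </property>

HBaseAdmin

  The administrative interface to HBase.
    = new HBaseAdmin(HBaseConfiguration conf)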


  Return type          Method(parameters)
  void                 addColumn(String tableName, HColumnDescriptor column)
  void                 checkHBaseAvailable(HBaseConfiguration conf)
  void                 createTable(HTableDescriptor desc)
  void                 deleteTable(byte[] tableName)
  void                 deleteColumn(String tableName, String columnName)
  void                 enableTable(byte[] tableName)
  void                 disableTable(String tableName)
  HTableDescriptor[]   listTables()
  void                 modifyTable(byte[] tableName, HTableDescriptor htd)
  boolean              tableExists(String tableName)

  Ex:
    HBaseAdmin admin = new HBaseAdmin(config);
    admin.disableTable("tablename");

HTableDescriptor

  HTableDescriptor contains the name of an HTable and its column families.
    = new HTableDescriptor()
    = new HTableDescriptor(String name)
  Constant values: org.apache.hadoop.hbase.HTableDescriptor.TABLE_DESCRIPTOR_VERSION

  Return type         Method(parameters)
  void                addFamily(HColumnDescriptor family)
  HColumnDescriptor   removeFamily(byte[] column)
  byte[]              getName()                  = table name
  byte[]              getValue(byte[] key)       = value stored under key
  void                setValue(String key, String value)

  Ex:
    HTableDescriptor htd = new HTableDescriptor(tablename);
    htd.addFamily(new HColumnDescriptor("Family"));

HColumnDescriptor

  An HColumnDescriptor contains information about a column family.
    = new HColumnDescriptor(String familyname)
  Constant values: org.apache.hadoop.hbase.HTableDescriptor.TABLE_DESCRIPTOR_VERSION

  Return type   Method(parameters)
  byte[]        getName()                  = family name
  byte[]        getValue(byte[] key)       = value stored under key
  void          setValue(String key, String value)

  Ex:
    HTableDescriptor htd = new HTableDescriptor(tablename);
    HColumnDescriptor col = new HColumnDescriptor("content:");
    htd.addFamily(col);

HTable

  Used to communicate with a single HBase table.
    = new HTable(HBaseConfiguration conf, String tableName)

  Return type        Method(parameters)
  boolean            checkAndPut(byte[] row, byte[] family, byte[] qualifier, byte[] value, Put put)
  void               close()
  boolean            exists(Get get)
  Result             get(Get get)
  byte[][]           getEndKeys()
  ResultScanner      getScanner(byte[] family)
  HTableDescriptor   getTableDescriptor()
  byte[]             getTableName()
  static boolean     isTableEnabled(HBaseConfiguration conf, String tableName)
  void               put(Put put)

  Ex:
    HTable table = new HTable(conf, Bytes.toBytes(tablename));
    ResultScanner scanner = table.getScanner(family);

Put

  Used to perform Put operations for a single row.
    = new Put(byte[] row)
    = new Put(byte[] row, RowLock rowLock)

  Return type   Method(parameters)
  Put           add(byte[] family, byte[] qualifier, byte[] value)
  Put           add(byte[] column, long ts, byte[] value)
  byte[]        getRow()
  RowLock       getRowLock()
  long          getTimeStamp()
  boolean       isEmpty()
  Put           setTimeStamp(long timestamp)

  Ex:
    HTable table = new HTable(conf, Bytes.toBytes(tablename));
    Put p = new Put(brow);
    p.add(family, qualifier, value);
    table.put(p);

Get

  Used to perform Get operations on a single row.
    = new Get(byte[] row)
    = new Get(byte[] row, RowLock rowLock)

  Return type   Method(parameters)
  Get           addColumn(byte[] column)
  Get           addColumn(byte[] family, byte[] qualifier)
  Get           addColumns(byte[][] columns)
  Get           addFamily(byte[] family)
  TimeRange     getTimeRange()
  Get           setTimeRange(long minStamp, long maxStamp)
  Get           setFilter(Filter filter)

  Ex:
    HTable table = new HTable(conf, Bytes.toBytes(tablename));
    Get g = new Get(Bytes.toBytes(row));

Result

  Single row result of a Get or Scan query.
    = new Result()


  Return type                    Method(parameters)
  boolean                        containsColumn(byte[] family, byte[] qualifier)
  NavigableMap<byte[], byte[]>   getFamilyMap(byte[] family)
  byte[]                         getValue(byte[] column)
  byte[]                         getValue(byte[] family, byte[] qualifier)
  int                            size()

  Ex:
    HTable table = new HTable(conf, Bytes.toBytes(tablename));
    Get g = new Get(Bytes.toBytes(row));
    Result rowResult = table.get(g);
    byte[] ret = rowResult.getValue(Bytes.toBytes(family + ":" + column));

Scanner

  All operations are identical to Get. Rather than specifying a single row,
  an optional startRow and stopRow may be defined. If rows are not specified,
  the Scanner will iterate over all rows.
    = new Scan()
    = new Scan(byte[] startRow, byte[] stopRow)
    = new Scan(byte[] startRow, Filter filter)

  Return type   Method(parameters)
  Scan          addColumn(byte[] column)
  Scan          addColumn(byte[] family, byte[] qualifier)
  Scan          addColumns(byte[][] columns)
  Scan          addFamily(byte[] family)
  TimeRange     getTimeRange()
  Scan          setTimeRange(long minStamp, long maxStamp)
  Scan          setFilter(Filter filter)

Interface ResultScanner

  Interface for client-side scanning. Go to HTable to obtain instances.
    HTable.getScanner(Bytes.toBytes(family));

  Return type   Method(parameters)
  void          close()
  Result        next()

  Ex:
    ResultScanner scanner = table.getScanner(Bytes.toBytes(family));
    for (Result rowResult : scanner) {
        byte[] str = rowResult.getValue(family, column);
    }

HBase Key/Value format

  org.apache.hadoop.hbase.KeyValue
  Accessors: getRow(), getFamily(), getQualifier(), getTimestamp(), and getValue().

  The KeyValue blob format inside the byte array is:
    <keylength> <valuelength> <key> <value>

  Key format:
    <row-length> <row> <column-family-length> <column-family> <column-qualifier> <time-stamp> <key-type>

  Row length is at most Short.MAX_VALUE, column family length is at most
  Byte.MAX_VALUE, and column qualifier + key length must be less than
  Integer.MAX_VALUE.

HBase programming: hands-on I/O operations

Example 1: create a table (shell)

    $ hbase shell
    > create 'tablename', 'family1', 'family2', 'family3'
    0 row(s) in 4.0810 seconds
    > list
    tablename
    1 row(s) in 0.0190 seconds

  Syntax: create <table name>, {<family>, ...}
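The KeyValue layout listed under "HBase Key/Value format" can be made concrete with a small plain-Java sketch. It does not use the HBase classes. The field widths are assumptions chosen to match the stated maxima (4-byte key and value lengths, a 2-byte row length, a 1-byte family length, an 8-byte timestamp, a 1-byte key type), and the class name, helper names and the key-type value 4 are illustrative only.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Plain-Java illustration of the blob layout; NOT org.apache.hadoop.hbase.KeyValue.
class KeyValueLayoutSketch {

    /** Packs one cell as <keylength><valuelength><key><value>. */
    static byte[] encode(byte[] row, byte[] family, byte[] qualifier,
                         long timestamp, byte keyType, byte[] value) {
        if (row.length > Short.MAX_VALUE || family.length > Byte.MAX_VALUE) {
            throw new IllegalArgumentException("row or family too long");
        }
        // key = <row-length><row><family-length><family><qualifier><time-stamp><key-type>
        int keyLength = 2 + row.length + 1 + family.length + qualifier.length + 8 + 1;
        ByteBuffer buf = ByteBuffer.allocate(4 + 4 + keyLength + value.length);
        buf.putInt(keyLength);
        buf.putInt(value.length);
        buf.putShort((short) row.length);
        buf.put(row);
        buf.put((byte) family.length);
        buf.put(family);
        buf.put(qualifier);      // qualifier length is implied by keyLength
        buf.putLong(timestamp);
        buf.put(keyType);
        buf.put(value);
        return buf.array();
    }

    /** Reads the row key back out of an encoded blob. */
    static String rowOf(byte[] blob) {
        ByteBuffer buf = ByteBuffer.wrap(blob);
        buf.position(8);                       // skip <keylength><valuelength>
        byte[] row = new byte[buf.getShort()]; // <row-length>
        buf.get(row);
        return new String(row, StandardCharsets.UTF_8);
    }

    static byte[] bytes(String s) {
        return s.getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] blob = encode(bytes("row1"), bytes("family1"), bytes("qua1"),
                1265169495385L, (byte) 4, bytes("value"));
        System.out.println(blob.length + " bytes, row = " + rowOf(blob));
    }
}
```

Because the qualifier carries no length field of its own, a reader recovers it by subtracting the other key fields from the key length, which is why the limit is stated for qualifier and key length together.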

Example 1: create a table (code)

    public static void createHBaseTable(String tablename, String familyname)
            throws IOException {
        HBaseConfiguration config = new HBaseConfiguration();
        HBaseAdmin admin = new HBaseAdmin(config);
        HTableDescriptor htd = new HTableDescriptor(tablename);
        HColumnDescriptor col = new HColumnDescriptor(familyname);
        htd.addFamily(col);
        // Do nothing if the table already exists.
        if (admin.tableExists(tablename)) {
            return;
        }
        admin.createTable(htd);
    }

Example 2: put data into a column (shell)

    > put 'tablename', 'row1', 'family1:qua1', 'value'
    0 row(s) in 0.0030 seconds

  Syntax: put 'table name', 'row', 'column', 'value', ['timestamp']

Example 2: put data into a column (code)

    static public void putData(String tablename, String row, String family,
            String column, String value) throws IOException {
        HBaseConfiguration config = new HBaseConfiguration();
        HTable table = new HTable(config, tablename);
        // HBase stores everything as bytes, so convert each string first.
        byte[] brow = Bytes.toBytes(row);
        byte[] bfamily = Bytes.toBytes(family);
        byte[] bcolumn = Bytes.toBytes(column);
        byte[] bvalue = Bytes.toBytes(value);
        Put p = new Put(brow);
        p.add(bfamily, bcolumn, bvalue);
        table.put(p);
        table.close();
    }

Example 3: get a column value (shell)

    > get 'tablename', 'row1'
    COLUMN             CELL
    family1:column1    timestamp=1265169495385, value=value
    1 row(s) in 0.0100 seconds

  Syntax: get 'table name', 'row'

Example 3: get a column value (code)

    String getColumn(String tablename, String row, String family,
            String column) throws IOException {
        HBaseConfiguration conf = new HBaseConfiguration();
        HTable table;
        table = new HTable(conf, Bytes.toBytes(tablename));
        Get g = new Get(Bytes.toBytes(row));
        Result rowResult = table.get(g);
        // The column is addressed as "family:qualifier".
        return Bytes.toString(
                rowResult.getValue(Bytes.toBytes(family + ":" + column)));
    }

Example 4: scan all columns (shell)

    > scan 'tablename'
    ROW     COLUMN+CELL
    row1    column=family1:column1, timestamp=1265169415385, value=value1
    row2    column=family1:column1, timestamp=1263534411333, value=value2
    row3    column=family1:column1, timestamp=1263645465388, value=value3
    row4    column=family1:column1, timestamp=1264654615301, value=value4
    row5    column=family1:column1, timestamp=1265146569567, value=value5
    5 row(s) in 0.0100 seconds

  Syntax: scan 'table name'

Example 4: scan all columns (code)

    static void ScanColumn(String tablename, String family, String column)
            throws IOException {
        HBaseConfiguration conf = new HBaseConfiguration();
        HTable table = new HTable(conf, Bytes.toBytes(tablename));
        ResultScanner scanner = table.getScanner(Bytes.toBytes(family));
        int i = 1;
        // One Result per row; print the requested column of each.
        for (Result rowResult : scanner) {
            byte[] by = rowResult.getValue(
                    Bytes.toBytes(family), Bytes.toBytes(column));
            String str = Bytes.toString(by);
            System.out.println("row " + i + " is \"" + str + "\"");
            i++;
        }
    }

Example 5: delete a table (shell)

    > disable 'tablename'
    0 row(s) in 6.0890 seconds
    > drop 'tablename'
    0 row(s) in 0.0090 seconds
    0 row(s) in 0.0090 seconds
    0 row(s) in 0.0710 seconds

  Syntax: disable 'table name'
          drop 'table name'

Example 5: delete a table (code)

    static void drop(String tablename) throws IOException {
        HBaseConfiguration conf = new HBaseConfiguration();
        HBaseAdmin admin = new HBaseAdmin(conf);
        if (admin.tableExists(tablename)) {
            // A table must be disabled before it can be deleted.
            admin.disableTable(tablename);
            admin.deleteTable(tablename);
        } else {
            System.out.println("[" + tablename + "] not found!");
        }
    }
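To make the put, get and scan semantics of Examples 2 to 4 easier to picture without a running cluster, here is a toy in-memory model in plain Java. It is only a sketch: the class and its methods are hypothetical, it is not the HBase client API, and it ignores timestamps and versions. It keeps rows sorted by key, which is why the scan returns rows in key order regardless of insertion order.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;

// Toy in-memory stand-in for a table; NOT the HBase client API.
class ToyTableSketch {
    // row key -> ("family:qualifier" -> value), both levels kept sorted
    private final SortedMap<String, SortedMap<String, String>> rows = new TreeMap<>();

    /** Like: put 'table', row, 'family:qualifier', value */
    void put(String row, String family, String qualifier, String value) {
        rows.computeIfAbsent(row, r -> new TreeMap<>())
            .put(family + ":" + qualifier, value);
    }

    /** Like: get 'table', row (restricted to one column); null if absent. */
    String get(String row, String family, String qualifier) {
        SortedMap<String, String> cells = rows.get(row);
        return cells == null ? null : cells.get(family + ":" + qualifier);
    }

    /** Like: scan 'table' (restricted to one column), in row-key order. */
    List<String> scan(String family, String qualifier) {
        List<String> out = new ArrayList<>();
        for (Map.Entry<String, SortedMap<String, String>> e : rows.entrySet()) {
            String value = e.getValue().get(family + ":" + qualifier);
            if (value != null) {
                out.add(e.getKey() + "=" + value);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        ToyTableSketch table = new ToyTableSketch();
        table.put("row2", "family1", "column1", "value2");
        table.put("row1", "family1", "column1", "value1");
        System.out.println(table.get("row1", "family1", "column1"));
        System.out.println(table.scan("family1", "column1"));
    }
}
```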

HBase programming: using MapReduce with HBase

Example 6: WordCountHBase

  Description:
    This program reads the strings in the files under the input path, counts
    the words, and writes the result back into an HTable.

  How to run:
    Run it on a Hadoop 0.20 platform. Add the HBase settings as described in
    method 2 above, then package the code as XX.jar.

  Result:
    > scan 'wordcount'
    ROW     COLUMN+CELL
    am      column=content:count, timestamp=1264406245488, value=1
    chen    column=content:count, timestamp=1264406245488, value=1
    hi,     column=content:count, timestamp=1264406245488, value=2

  Notes:
    1. The source files live on HDFS under "/user/$YOUR_NAME/input".
       Put the data into that HDFS directory first. The directory may contain
       only files, not subdirectories.
    2. When the job finishes, the result is stored in the HBase table wordcount.

  Code:

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.*;
    import org.apache.hadoop.hbase.client.*;
    import org.apache.hadoop.hbase.mapreduce.*;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.hadoop.io.*;
    import org.apache.hadoop.mapreduce.*;
    import org.apache.hadoop.mapreduce.lib.input.*;

    public class WordCountHBase {

        // Emits (word, 1) for every space-separated token of a line.
        public static class Map extends
                Mapper<LongWritable, Text, Text, IntWritable> {
            private IntWritable i = new IntWritable(1);

            public void map(LongWritable key, Text value, Context context)
                    throws IOException, InterruptedException {
                String s[] = value.toString().trim().split(" ");
                for (String m : s) {
                    context.write(new Text(m), i);
                }
            }
        }

        // Sums the counts of a word and writes them to content:count.
        public static class Reduce extends
                TableReducer<Text, IntWritable, NullWritable> {
            public void reduce(Text key, Iterable<IntWritable> values,
                    Context context) throws IOException, InterruptedException {
                int sum = 0;
                for (IntWritable i : values) {
                    sum += i.get();
                }
                Put put = new Put(Bytes.toBytes(key.toString()));
                put.add(Bytes.toBytes("content"), Bytes.toBytes("count"),
                        Bytes.toBytes(String.valueOf(sum)));
                context.write(NullWritable.get(), put);
            }
        }

        // Recreates the output table, dropping it first if it exists.
        public static void createHBaseTable(String tablename)
                throws IOException {
            HTableDescriptor htd = new HTableDescriptor(tablename);
            HColumnDescriptor col = new HColumnDescriptor("content:");
            htd.addFamily(col);
            HBaseConfiguration config = new HBaseConfiguration();
            HBaseAdmin admin = new HBaseAdmin(config);
            if (admin.tableExists(tablename)) {
                admin.disableTable(tablename);
                admin.deleteTable(tablename);
            }
            System.out.println("create new table: " + tablename);
            admin.createTable(htd);
        }

        public static void main(String args[]) throws Exception {
            String tablename = "wordcount";
            Configuration conf = new Configuration();
            conf.set(TableOutputFormat.OUTPUT_TABLE, tablename);
            createHBaseTable(tablename);
            String input = args[0];
            Job job = new Job(conf, "WordCount " + input);
            job.setJarByClass(WordCountHBase.class);
            job.setNumReduceTasks(3);
            job.setMapperClass(Map.class);
            job.setReducerClass(Reduce.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(IntWritable.class);
            job.setInputFormatClass(TextInputFormat.class);
            job.setOutputFormatClass(TableOutputFormat.class);
            FileInputFormat.addInputPath(job, new Path(input));
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    }
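The data flow of the WordCountHBase job can be traced without Hadoop or HBase using the plain-Java sketch below. It is an illustration under assumptions, not the job itself: the class name, the two sample lines and the sorted map standing in for the wordcount table are hypothetical, and unlike the original mapper it skips empty tokens.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;

// Single-process walk-through of map -> shuffle -> reduce; no Hadoop involved.
class WordCountSketch {

    /** Map step: one (word, 1) pair per space-separated token. */
    static List<Map.Entry<String, Integer>> map(String line) {
        List<Map.Entry<String, Integer>> out = new ArrayList<>();
        for (String word : line.trim().split(" ")) {
            if (!word.isEmpty()) {   // assumption: ignore empty tokens
                out.add(new AbstractMap.SimpleEntry<>(word, 1));
            }
        }
        return out;
    }

    /** Reduce step: sum all counts collected for one word. */
    static int reduce(Iterable<Integer> values) {
        int sum = 0;
        for (int v : values) {
            sum += v;
        }
        return sum;
    }

    /** Groups map output by key, then reduces each group into a "table". */
    static SortedMap<String, Integer> run(List<String> lines) {
        SortedMap<String, List<Integer>> shuffled = new TreeMap<>();
        for (String line : lines) {
            for (Map.Entry<String, Integer> pair : map(line)) {
                shuffled.computeIfAbsent(pair.getKey(), k -> new ArrayList<>())
                        .add(pair.getValue());
            }
        }
        SortedMap<String, Integer> table = new TreeMap<>();
        for (Map.Entry<String, List<Integer>> e : shuffled.entrySet()) {
            table.put(e.getKey(), reduce(e.getValue()));
        }
        return table;
    }

    public static void main(String[] args) {
        // Hypothetical input lines, chosen to resemble the scan result above.
        System.out.println(run(Arrays.asList("hi, i am chen", "hi, there")));
    }
}
```

Each key of the returned map plays the role of a row key and each value the role of content:count, which matches the shape of the scan output for the wordcount table.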
Example 7: LoadHBaseMapper

  Description:
    This program reads data out of HBase and writes the result to HDFS.

  How to run:
    Run it on a Hadoop 0.20 platform. Add the HBase settings as described in
    method 2 above, then package the code as XX.jar.

  Result:
    $ hadoop fs -cat <hdfs_output>/part-r-00000
    ---------------------------
    543031    GunLong
    543032    Esing
    543033    SunDon
    543034    StarBucks
    ---------------------------

  Notes:
    1. The table must already exist in HBase and already contain data.
    2. When the job finishes, the result is stored in the HDFS directory
       <hdfs_output> that you specify. Make sure the <hdfs_output> directory
       does not exist beforehand.

  Code:

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.client.*;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.hbase.mapreduce.*;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.*;
    import org.apache.hadoop.mapreduce.lib.output.*;

    public class LoadHBaseMapper {

        // Emits (row key, Detail:Name) for every row of the table.
        public static class HtMap extends TableMapper<Text, Text> {
            public void map(ImmutableBytesWritable key, Result value,
                    Context context) throws IOException, InterruptedException {
                String res = Bytes.toString(value.getValue(
                        Bytes.toBytes("Detail"), Bytes.toBytes("Name")));
                context.write(new Text(key.toString()), new Text(res));
            }
        }

        // Concatenates all values that share a key.
        public static class HtReduce extends Reducer<Text, Text, Text, Text> {
            public void reduce(Text key, Iterable<Text> values, Context context)
                    throws IOException, InterruptedException {
                String str = new String("");
                Text final_key = new Text(key);
                Text final_value = new Text();
                for (Text tmp : values) {
                    str += tmp.toString();
                }
                final_value.set(str);
                context.write(final_key, final_value);
            }
        }

        public static void main(String args[]) throws Exception {
            // args[0] is used as the HDFS output directory.
            String input = args[0];
            String tablename = "tsmc";
            Configuration conf = new Configuration();
            Job job = new Job(conf, tablename + " hbase data to hdfs");
            job.setJarByClass(LoadHBaseMapper.class);
            // myScan is not defined on the slide; a full-table Scan is assumed.
            Scan myScan = new Scan();
            TableMapReduceUtil.initTableMapperJob(tablename, myScan,
                    HtMap.class, Text.class, Text.class, job);
            job.setMapperClass(HtMap.class);
            job.setReducerClass(HtReduce.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);
            job.setInputFormatClass(TableInputFormat.class);
            job.setOutputFormatClass(TextOutputFormat.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);
            FileOutputFormat.setOutputPath(job, new Path(input));
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    }

HBase programming: other usage

  Additional projects in HBase contrib, such as Transactional and Thrift.

1. Transactional HBase

  Indexed Table = Secondary Index = Transactional HBase
  A second table whose content mirrors the original table but whose key is
  different, which makes it easy to keep the content in another order.

  Primary Table
        name      price   description
    1   apple     10      xx
    2   orig      5       ooo
    3   banana    15      vvvv
    4   tomato    8       uu

  Indexed Table
        name      price   description
    2   orig      5       ooo
    4   tomato    8       uu
    1   apple     10      xx
    3   banana    15      vvvv
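The relationship between the primary table and the indexed table can be sketched in plain Java with two sorted maps. This is a conceptual illustration only: it does not use the contrib IndexedTable classes, the class and method names are hypothetical, and it assumes prices are unique so that price alone can serve as the index key.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// Conceptual model of a secondary index; NOT the HBase contrib API.
class SecondaryIndexSketch {

    static final class Item {
        final String name;
        final int price;

        Item(String name, int price) {
            this.name = name;
            this.price = price;
        }
    }

    /** Primary table from the slide: row key -> item, sorted by row key. */
    static NavigableMap<Integer, Item> primaryTable() {
        NavigableMap<Integer, Item> primary = new TreeMap<>();
        primary.put(1, new Item("apple", 10));
        primary.put(2, new Item("orig", 5));
        primary.put(3, new Item("banana", 15));
        primary.put(4, new Item("tomato", 8));
        return primary;
    }

    /** Index table: price -> primary row key (assumes unique prices). */
    static NavigableMap<Integer, Integer> buildPriceIndex(
            NavigableMap<Integer, Item> primary) {
        NavigableMap<Integer, Integer> index = new TreeMap<>();
        for (Map.Entry<Integer, Item> e : primary.entrySet()) {
            index.put(e.getValue().price, e.getKey());
        }
        return index;
    }

    /** Range query on price, answered through the index, cheapest first. */
    static List<String> namesByPriceRange(NavigableMap<Integer, Item> primary,
            NavigableMap<Integer, Integer> index, int min, int max) {
        List<String> names = new ArrayList<>();
        for (Integer rowKey : index.subMap(min, true, max, true).values()) {
            names.add(primary.get(rowKey).name);
        }
        return names;
    }

    public static void main(String[] args) {
        NavigableMap<Integer, Item> primary = primaryTable();
        NavigableMap<Integer, Integer> index = buildPriceIndex(primary);
        System.out.println(index.values());
        System.out.println(namesByPriceRange(primary, index, 8, 10));
    }
}
```

Iterating the index visits the primary rows in the order 2, 4, 1, 3, the same order as the Indexed Table on the slide.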

1.1 Transactional HBase: environment settings

  Add the following two properties to $HBASE_INSTALL_DIR/conf/hbase-site.xml:

    <property>
      <name>hbase.regionserver.class</name>
      <value>org.apache.hadoop.hbase.ipc.IndexedRegionInterface</value>
    </property>
    <property>
      <name>hbase.regionserver.impl</name>
      <value>org.apache.hadoop.hbase.regionserver.tableindexed.IndexedRegionServer</value>
    </property>

1.a Ex: add an IndexedTable to an existing table

    public void addSecondaryIndexToExistingTable(String TableName,
            String IndexID, String IndexColumn) throws IOException {
        HBaseConfiguration conf = new HBaseConfiguration();
        IndexedTableAdmin admin = null;
        admin = new IndexedTableAdmin(conf);
        admin.addIndex(Bytes.toBytes(TableName), new IndexSpecification(
                IndexID, Bytes.toBytes(IndexColumn)));
    }

1.b Ex: create a new table together with an IndexedTable

    public void createTableWithSecondaryIndexes(String TableName,
            String IndexColumn) throws IOException {
        HBaseConfiguration conf = new HBaseConfiguration();
        conf.addResource(new Path("/opt/hbase/conf/hbase-site.xml"));
        HTableDescriptor desc = new HTableDescriptor(TableName);
        desc.addFamily(new HColumnDescriptor("Family1"));
        IndexedTableDescriptor Idxdesc = new IndexedTableDescriptor(desc);
        Idxdesc.addIndex(new IndexSpecification(IndexColumn,
                Bytes.toBytes("Family1:" + IndexColumn)));
        IndexedTableAdmin admin = new IndexedTableAdmin(conf);
        admin.createIndexedTable(Idxdesc);
    }

2. Thrift

  Developed by Facebook.
  A platform for exchanging data across programming languages.
  You can access HBase from any language Thrift supports:
    PHP
    Perl
    C++
    Python
    …..

2.1 Thrift PHP example

  Insert data into HBase with the PHP Thrift client:

    $mutations = array(
        new Mutation(array(
            'column' => 'entry:num',
            'value' => array('a', 'b', 'c')
        )),
    );
    $client->mutateRow($t, $row, $mutations);

Case study

  A fictional scenario that puts the code from the previous sections to use.

The TSMC cafeteria is open!

  Background:
    TSMC's fab 101 is about to open, and it is expected to have 2 million employees.
    Issues a traditional database might face: large-scale data, concurrent
    reads and writes, analytical computation, … (use your imagination).
    The employee cafeteria will therefore store its data in HBase
    and run its analysis as MapReduce jobs on Hadoop.

1. Create the shop data

         Detail               Products              Turnover
         Name        Locate   P1    P2    P3    P4
    T01  GunLong     01       20    40    30    50
    T02  ESing       02       50
    T03  SunDon      03       40    30
    T04  StarBucks   04       50    50    20

  Assume four shops have moved into the TSMC cafeteria:
    GunLong in zone 1, 4 items priced <20, 40, 30, 50>
    ESing in zone 2, 1 item priced <50>
    SunDon in zone 3, 2 items priced <40, 30>
    StarBucks in zone 4, 3 items priced <50, 50, 20>

1.a Create the initial HTable (code)

    public void createHBaseTable(String tablename, String[] family)
            throws IOException {
        HTableDescriptor htd = new HTableDescriptor(tablename);
        // One column family per entry in the array.
        for (String fa : family) {
            htd.addFamily(new HColumnDescriptor(fa));
        }
        HBaseConfiguration config = new HBaseConfiguration();
        HBaseAdmin admin = new HBaseAdmin(config);
        if (admin.tableExists(tablename)) {
            System.out.println("Table: " + tablename + " Existed.");
        } else {
            System.out.println("create new table: " + tablename);
            admin.createTable(htd);
        }
    }

1.a Result

    Table: TSMC
    Family      Detail    Products    Turnover
    Qualifier   …         …           …
    Row1        value
    Row2
    Row3
    …

1.b Load the data into the HTable by reading a file (code)

    void loadFile2HBase(String file_in, String table_name)
            throws IOException {
        BufferedReader fi = new BufferedReader(
                new FileReader(new File(file_in)));
        String line;
        // Each line: row key; name; location; price of P1; price of P2; ...
        while ((line = fi.readLine()) != null) {
            String[] str = line.split(";");
            int length = str.length;
            PutData.putData(table_name, str[0].trim(), "Detail", "Name",
                    str[1].trim());
            PutData.putData(table_name, str[0].trim(), "Detail", "Locate",
                    str[2].trim());
            for (int i = 3; i < length; i++) {
                PutData.putData(table_name, str[0].trim(), "Products",
                        "P" + (i - 2), str[i]);
            }
            System.out.println();
        }
        fi.close();
    }

1.b Result

         Detail               Products              Turnover
         Name        Locate   P1    P2    P3    P4
    T01  GunLong     01       20    40    30    50
    T02  ESing       02       50
    T03  SunDon      03       40    30
    T04  StarBucks   04       50    50    20

1. Screen output

    create new table: tsmc
    Put data: "GunLong" to Table: tsmc's Detail:Name
    Put data: "01" to Table: tsmc's Detail:Locate
    Put data: "20" to Table: tsmc's Products:P1
    Put data: "40" to Table: tsmc's Products:P2
    Put data: "30" to Table: tsmc's Products:P3
    Put data: "50" to Table: tsmc's Products:P4
    Put data: "Esing" to Table: tsmc's Detail:Name
    Put data: "02" to Table: tsmc's Detail:Locate
    Put data: "50" to Table: tsmc's Products:P1
    Put data: "SunDon" to Table: tsmc's Detail:Name
    Put data: "03" to Table: tsmc's Detail:Locate
    Put data: "40" to Table: tsmc's Products:P1
    Put data: "30" to Table: tsmc's Products:P2
    Put data: "StarBucks" to Table: tsmc's Detail:Name
    Put data: "04" to Table: tsmc's Detail:Locate
    Put data: "50" to Table: tsmc's Products:P1
    Put data: "50" to Table: tsmc's Products:P2
    Put data: "20" to Table: tsmc's Products:P3

2. Count the purchases of each item for the month

  The card-payment system records every purchase by every person in a file,
  in the format shown below.
  Read the log file and count how many times each item was bought each day.
    Upload the file to HDFS.
    Compute with Hadoop.
    Write the result to HBase, columns Turnover:P1, P2, P3, P4.

  Log format:
    waue:T01:P1:xx
    jazz:T01:P2:xxx
    lia:T01:P3:xxxx
    hung:T02:P1:xx
    lia:T04:P1:xxxx
    lia:T04:P1:xxxx
    hung:T04:P3:xx
    hung:T04:P2:xx
    ……

2. Compute with Hadoop MapReduce and load the result into the HTable

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.mapreduce.*;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.hadoop.io.*;
    import org.apache.hadoop.mapreduce.*;
    import org.apache.hadoop.mapreduce.lib.input.*;

    public class TSMC2Count {

        // Map: turn each log line into ("shop@item", 1).
        public static class HtMap extends
                Mapper<LongWritable, Text, Text, IntWritable> {
            private IntWritable one = new IntWritable(1);

            public void map(LongWritable key, Text value, Context context)
                    throws IOException, InterruptedException {
                String s[] = value.toString().trim().split(":");
                // xxx:T01:P4:oooo => T01@P4
                String str = s[1] + "@" + s[2];
                context.write(new Text(str), one);
            }
        }

        // Reduce: sum the counts and store them in Turnover:<item> of the shop row.
        public static class HtReduce extends
                TableReducer<Text, IntWritable, LongWritable> {
            public void reduce(Text key, Iterable<IntWritable> values,
                    Context context) throws IOException, InterruptedException {
                int sum = 0;
                for (IntWritable i : values)
                    sum += i.get();
                String[] str = (key.toString()).split("@");
                byte[] row = (str[0]).getBytes();
                byte[] family = Bytes.toBytes("Turnover");
                byte[] qualifier = (str[1]).getBytes();
                byte[] summary = Bytes.toBytes(String.valueOf(sum));
                Put put = new Put(row);
                put.add(family, qualifier, summary);
                context.write(new LongWritable(), put);
            }
        }

        public static void main(String args[]) throws Exception {
            String input = "income";
            String tablename = "tsmc";
            Configuration conf = new Configuration();
            conf.set(TableOutputFormat.OUTPUT_TABLE, tablename);
            Job job = new Job(conf, "Count to tsmc");
            job.setJarByClass(TSMC2Count.class);
            job.setMapperClass(HtMap.class);
            job.setReducerClass(HtReduce.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(IntWritable.class);
            job.setInputFormatClass(TextInputFormat.class);
            job.setOutputFormatClass(TableOutputFormat.class);
            FileInputFormat.addInputPath(job, new Path(input));
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    }

2. Result

         Detail               Products              Turnover
         Name        Locate   P1    P2    P3    P4   P1    P2    P3    P4
    T01  GunLong     01       20    40    30    50   1     1     1     1
    T02  ESing       02       50                     2
    T03  SunDon      03       40    30               3
    T04  StarBucks   04       50    50    20         2     1     1

    > scan 'tsmc'
    ROW    COLUMN+CELL
    T01    column=Detail:Locate, timestamp=1265184360616, value=01
    T01    column=Detail:Name, timestamp=1265184360548, value=GunLong
    T01    column=Products:P1, timestamp=1265184360694, value=20
    T01    column=Products:P2, timestamp=1265184360758, value=40
    T01    column=Products:P3, timestamp=1265184360815, value=30
    T01    column=Products:P4, timestamp=1265184360866, value=50
    T01    column=Turnover:P1, timestamp=1265187021528, value=1
    T01    column=Turnover:P2, timestamp=1265187021528, value=1
    T01    column=Turnover:P3, timestamp=1265187021528, value=1
    T01    column=Turnover:P4, timestamp=1265187021528, value=1
    T02    column=Detail:Locate, timestamp=1265184360951, value=02
    T02    column=Detail:Name, timestamp=1265184360910, value=Esing
    T02    column=Products:P1, timestamp=1265184361051, value=50
    T02    column=Turnover:P1, timestamp=1265187021528, value=2
    T03    column=Detail:Locate, timestamp=1265184361124, value=03
    T03    column=Detail:Name, timestamp=1265184361098, value=SunDon
    T03    column=Products:P1, timestamp=1265184361189, value=40
    T03    column=Products:P2, timestamp=1265184361259, value=30
    T03    column=Turnover:P1, timestamp=1265187021529, value=3
    T04    column=Detail:Locate, timestamp=1265184361311, value=04
    T04    column=Detail:Name, timestamp=1265184361287, value=StarBucks
    T04    column=Products:P1, timestamp=1265184361343, value=50
    T04    column=Products:P2, timestamp=1265184361386, value=50
    T04    column=Products:P3, timestamp=1265184361422, value=20
    T04    column=Turnover:P1, timestamp=1265187021529, value=2
    T04    column=Turnover:P2, timestamp=1265187021529, value=1
    T04    column=Turnover:P3, timestamp=1265187021529, value=1
    4 row(s) in 0.0310 seconds

3. Compute the day's turnover

  Compute the turnover of each shop:
    Σ (<unit price of the item> × <number of times it was bought>)
  Hadoop's Map() reads Products:{P1,P2,P3,P4} and Turnover:{P1,P2,P3,P4} from HBase.
  After the calculation, Hadoop's Reduce() writes the result back to the
  Turnover:Sum column in HBase.
  The number of items differs from shop to shop, and some items are never
  bought, so both cases must be handled.

3. Hadoop job whose source and output are both HBase

    public class TSMC3CalculateMR {
        public static class HtMap extends TableMapper<Text, Text> {
            public void map(ImmutableBytesWritable key, Result value,
                    Context context) throws IOException, InterruptedException {
                String row = Bytes.toString(value.getValue(
                        Bytes.toBytes("Detail"), Bytes.toBytes("Locate")));
                int sum = 0;
                for (int i = 0; i < 4; i++) {
                    String v = Bytes.toString(value.getValue(Bytes
                            .toBytes("Products"), Bytes.toBytes("P" + (i +
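
The turnover formula from step 3, Σ (unit price × times bought), can be checked on the sample data with the plain-Java sketch below. It is an independent illustration rather than a completion of the job listed in that step: the class and helper names are hypothetical, the prices and counts are copied from the scan output of step 2, and the resulting sums are computed by the sketch, not taken from the slides. An item with no purchase count is simply skipped, which covers shops with fewer items and items nobody bought.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Plain-Java check of the turnover formula; no Hadoop or HBase involved.
class TurnoverSketch {

    /** Builds a qualifier -> value map: P1, P2, ... in the order given. */
    static Map<String, Integer> row(int... values) {
        Map<String, Integer> cells = new LinkedHashMap<>();
        for (int i = 0; i < values.length; i++) {
            cells.put("P" + (i + 1), values[i]);
        }
        return cells;
    }

    /** Sum of price * count over the items that have a purchase count. */
    static int turnover(Map<String, Integer> prices, Map<String, Integer> counts) {
        int sum = 0;
        for (Map.Entry<String, Integer> p : prices.entrySet()) {
            Integer count = counts.get(p.getKey());
            if (count != null) {   // item never bought: contributes nothing
                sum += p.getValue() * count;
            }
        }
        return sum;
    }

    public static void main(String[] args) {
        // Products:* and Turnover:* values per shop, from the scan of 'tsmc'.
        System.out.println("T01 " + turnover(row(20, 40, 30, 50), row(1, 1, 1, 1)));
        System.out.println("T02 " + turnover(row(50), row(2)));
        System.out.println("T03 " + turnover(row(40, 30), row(3)));
        System.out.println("T04 " + turnover(row(50, 50, 20), row(2, 1, 1)));
    }
}
```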
