超人學(xué)院-使用Storm實(shí)現(xiàn)實(shí)時(shí)大數(shù)據(jù)分析_第1頁(yè)
超人學(xué)院-使用Storm實(shí)現(xiàn)實(shí)時(shí)大數(shù)據(jù)分析_第2頁(yè)
超人學(xué)院-使用Storm實(shí)現(xiàn)實(shí)時(shí)大數(shù)據(jù)分析_第3頁(yè)
超人學(xué)院-使用Storm實(shí)現(xiàn)實(shí)時(shí)大數(shù)據(jù)分析_第4頁(yè)
超人學(xué)院-使用Storm實(shí)現(xiàn)實(shí)時(shí)大數(shù)據(jù)分析_第5頁(yè)
已閱讀5頁(yè),還剩28頁(yè)未讀, 繼續(xù)免費(fèi)閱讀

下載本文檔

版權(quán)說(shuō)明:本文檔由用戶提供并上傳,收益歸屬內(nèi)容提供方,若內(nèi)容存在侵權(quán),請(qǐng)進(jìn)行舉報(bào)或認(rèn)領(lǐng)

文檔簡(jiǎn)介

超人學(xué)院講師今天為你講解使用實(shí)現(xiàn)實(shí)時(shí)大數(shù)據(jù)分析有效的開(kāi)源實(shí)時(shí)計(jì)算工具就是——開(kāi)發(fā),通常被比作“實(shí)時(shí)的Hadoop”。然而遠(yuǎn)比來(lái)的簡(jiǎn)單,因?yàn)橛盟幚泶髷?shù)據(jù)不會(huì)帶來(lái)新老技術(shù)的交替。本文詳述了的使用方法,例子中的項(xiàng)目名稱為“超速報(bào)警系統(tǒng)()”。我們想實(shí)現(xiàn)的功能是:實(shí)時(shí)分析過(guò)往車輛的數(shù)據(jù),一旦車輛數(shù)據(jù)超過(guò)預(yù)設(shè)的臨界值——便觸發(fā)一個(gè)并把相關(guān)的數(shù)據(jù)存入數(shù)據(jù)庫(kù)。對(duì)比樣也可以處理大批量的數(shù)據(jù),然而在保證高可靠性的前提下還可以讓處理些特性,這就讓可以擴(kuò)展到不同的機(jī)器上進(jìn)行大批量的數(shù)據(jù)處理。他同樣還有以下??易于擴(kuò)展。對(duì)于擴(kuò)展,你只需要添加機(jī)器和改變對(duì)應(yīng)的(拓?fù)?設(shè)置。使用進(jìn)行集群協(xié)調(diào),這樣可以充分的保證大型集群的良好運(yùn)行。?每條信息的處理都可以得到保證。?集群管理簡(jiǎn)易。,會(huì)一直運(yùn)行它直到被廢除或者被關(guān)閉。而在執(zhí)行中出現(xiàn)錯(cuò)誤時(shí),也會(huì)由重新分配任務(wù)。?盡管通常使用,中的可以用任何語(yǔ)言設(shè)計(jì)。當(dāng)然為了更好的理解文章,你首先需要安裝和設(shè)置。需要通過(guò)以下幾個(gè)簡(jiǎn)單的步驟:??從官方下載安裝文件?將解壓到你的上,并保證集群主要由一個(gè)主節(jié)點(diǎn)和一群工作節(jié)點(diǎn)(腳本是可執(zhí)行的。進(jìn)行主節(jié)點(diǎn)通常運(yùn)行一個(gè)后臺(tái)程序——,用于響應(yīng)分布在集群中的節(jié)點(diǎn),分配任務(wù)和監(jiān)測(cè)故障。這個(gè)很類似于中的。工作節(jié)點(diǎn)同樣會(huì)運(yùn)行一個(gè)后臺(tái)程序——,用于收聽(tīng)工作指派并基于要求運(yùn)行工作進(jìn)程。每個(gè)工作節(jié)點(diǎn)都是中一個(gè)子集的實(shí)現(xiàn)。而和之間的協(xié)調(diào)則通過(guò)系統(tǒng)或者集群。是完成和之間協(xié)調(diào)的服務(wù)。而應(yīng)用程序?qū)崿F(xiàn)實(shí)時(shí)的邏輯則被封裝進(jìn)中的“topology”。則是一組由(數(shù)據(jù)源)和(數(shù)據(jù)操作)通過(guò)進(jìn)行連接的圖。下面對(duì)出現(xiàn)的術(shù)語(yǔ)進(jìn)行更深刻的解析。:接收失敗時(shí),可靠的會(huì)對(duì)(元組,數(shù)據(jù)項(xiàng)組成的列表)進(jìn)行重發(fā);而不可靠的不會(huì)考慮接收成功與否只發(fā)射一次。而中最主要的方法就是(),該方法會(huì)發(fā)射一個(gè)新的到,如果沒(méi)有新發(fā)射則會(huì)簡(jiǎn)單的返回。:中所有的處理都由完成??梢酝瓿扇魏问?,比如:連接的過(guò)濾、聚合、文件數(shù)據(jù)庫(kù)、等等。從中接收數(shù)據(jù)并進(jìn)行處理,如果遇到復(fù)雜流的處理也可能將發(fā)送給另一個(gè)進(jìn)行處理。而中最重要的方法是(),以新的作為參數(shù)接收。不管是還是,如果將發(fā)射成多個(gè)流,這些流都可以通過(guò)()來(lái)聲明。:定義了一個(gè)流在型:任務(wù)間該如何被切分。這里有提供的個(gè)隨機(jī)分組():隨機(jī)分發(fā)到的任務(wù),保證每個(gè)任務(wù)獲得相等字段分組():根據(jù)指定字段分割數(shù)據(jù)流,并分組。例如,根據(jù)“userid”字段,相同“userid”的元組總是分發(fā)到同一個(gè)任務(wù),不同“userid”的元組可能分發(fā)到不同的全部分組():被復(fù)制到的所有任務(wù)。這種類型需要謹(jǐn)慎使用。全局分組():全部流都分配到的同一個(gè)任務(wù)。明確地說(shuō),是分配給最小的那個(gè)。但最終,將把無(wú)分組的放到或訂閱它們的同一線程去執(zhí)行(如果可能)。直接分組(。當(dāng)然還可以實(shí)現(xiàn)項(xiàng)目實(shí)施當(dāng)下情況我們需要給):這是一個(gè)特別的分組類型。元組生產(chǎn)者決定接口來(lái)定制自己需要的分組。和設(shè)計(jì)一種能夠處理大量數(shù)據(jù)(日志文件)的當(dāng)一個(gè)特定數(shù)據(jù)值超過(guò)預(yù)設(shè)的臨界值時(shí)促發(fā)警報(bào)。使用的,逐行讀入日志件中讀入數(shù)據(jù),同時(shí)還監(jiān)視著新文件。文件一旦被修改前的(可以被讀入的格式),將發(fā)射給現(xiàn)所有可能超臨界的記錄。下一節(jié)將對(duì)用例進(jìn)行詳細(xì)介紹。會(huì)讀入新的版本并且覆蓋之進(jìn)行臨界分析,這樣就可以發(fā)這一節(jié),將主要聚焦于臨界值的兩種分析類型:瞬間臨界()和時(shí)間序列臨界()。?瞬間臨界值監(jiān)測(cè):一個(gè)字段的值在那個(gè)瞬間超過(guò)了預(yù)設(shè)的臨界值,如果條件符合的話則觸發(fā)一個(gè)。舉個(gè)例子當(dāng)車輛超越公里每小時(shí),則觸發(fā)。?時(shí)間序列臨界監(jiān)測(cè):字段的值在一個(gè)給定的時(shí)間段內(nèi)超過(guò)了預(yù)設(shè)的臨界值,如果條件符顯顯示了我們將使用的一個(gè)類型日志,其中包含的車輛數(shù)據(jù)信息有:車牌號(hào)、車輛行駛的速度以及數(shù)據(jù)獲取的位置。這里將創(chuàng)建一個(gè)對(duì)應(yīng)的文件,這將包含引入數(shù)據(jù)的模式。這個(gè)將用于日志文件文件和日志文件都存放在可以隨時(shí)監(jiān)測(cè)的目錄下,用以關(guān)注文件的實(shí)時(shí)更新。而這個(gè)用例中的請(qǐng)見(jiàn)下圖。::,用以實(shí)現(xiàn)數(shù)據(jù)實(shí)時(shí)處理接收輸入日志并進(jìn)行逐行的讀入,接著將數(shù)據(jù)發(fā)射給進(jìn)行更深一步的臨界值處理。一旦處理完成,被計(jì)算行的數(shù)據(jù)將發(fā)送給,然后由存入給數(shù)據(jù)庫(kù)。下面將對(duì)這個(gè)過(guò)程的實(shí)現(xiàn)進(jìn)行詳以日志文件和描述文件作為接收對(duì)象。文件包含了與日志一致的設(shè)計(jì)模式。不妨設(shè)想一下一個(gè)示例日志文件,包含了車輛的車牌號(hào)、行駛速度、以及數(shù)據(jù)的捕獲位置。(看下圖):數(shù)據(jù)從日志文件到的流程圖對(duì)應(yīng)的,其中指定了字段、將日志文件切割成字段的定界符文件以及數(shù)據(jù)都被保存到指定的路徑。:用以描述日志文件的文件。1.1.<TUPLEINFO>2.<FIELDLIST>3.<FIELD>4.<COLUMNNAME>vehicle_number</COLUMNNAME>5.<COLUMNTYPE>string</COLUMNTYPE>6.</FIELD>8.<FIELD>9.<COLUMNNAME>speed</COLUMNNAME>PEintCOLUMNTYPE<COLUMNNAME>location</COLUMNNAME>COLUMNTYPEstring/COLUMNTYPE>R儲(chǔ)存了日志文件的字段、定界符、字段的類型這些很必要的信息。這個(gè)對(duì)象通過(guò)序列化時(shí)建立。?對(duì)文件的改變進(jìn)行分開(kāi)的監(jiān)聽(tīng),并監(jiān)視目錄下有無(wú)新日志文件添加。?在數(shù)據(jù)得到了字段的說(shuō)明后,將其轉(zhuǎn)換成。?聲明和之間的分組,并決定發(fā)送給的途徑。和1.publicvoidopen(Mapconf,TopologyContextcontext,SpoutOutputCollectorcollector)2.{3._collector=collector;4.try5.{6.fileReader=newBufferedReader(newFileReaderile7.}8.catch(FileNotFoundExceptione)9.{10.System.exit(1);11.}16.protectedvoidListenFile(Filefile).{18.Utils.sleep(2000);19.RandomAccessFileaccess=null;20.Stringline=null;21.try22.{23.while((line=access.readLine())!=null)24.{25.if(line!=null)26.{27.String[]fields=null;28.if(tupleInfo.getDelimiter().equals("|"))fields=line.split("\\"+tupleInfo.getDelimiter());29.else30.fields=line.split(tupleInfo.getDelimiter());31.if(tupleInfo.getFieldList().size()==fields.length)_collector.emit(newValues(fiels32.}33.}}35.catch(IOExceptionex){}}39.publicvoiddeclareOutputFields(OutputFieldsDeclarerdeclarer)41.String[]fieldsArr=newString[tupleInfo.getFieldList42.for(inti=0;i<tupleInfo.getFieldList().size();i++)43.{44.fieldsArr[i]=tupleInfo.getFieldList().get(i).getColumnName();45.}46.declarer.declare(newFields(fieldsArr));送給()決定了發(fā)射的格式,這樣的話就可以用類似的方法將就會(huì)進(jìn)行讀入并且的輸出結(jié)果將給予進(jìn)行更深一步的處理。經(jīng)過(guò)對(duì)用例的思考,我們的中需要如中的兩個(gè)。將發(fā)出,由接收并進(jìn)行臨界值處理。在這里,它將接臨界值檢查?臨界值欄數(shù)檢查(拆分成字段的數(shù)目)?臨界值數(shù)據(jù)類型(拆分后字段的類型)?臨界值出現(xiàn)的頻數(shù)?臨界值時(shí)間段檢查類11.publicclassThresholdInfoimplementsSerializable3.{4.privateStringaction;5.privateStringrule;6.privateObjectthresholdValue;7.privateintthresholdColNumber;8.privateIntegertimeWindow;9.privateintfrequencyOfOccurence;:臨界值檢測(cè)代碼段基于字段中提供的值,臨界值檢查將被分的功能是解析和接收值的檢測(cè)。()方法執(zhí)行。代碼大部11.publicvoidexecute(Tupletuple,BasicOutputCollectorcollector)2.{3.if(tuple!=null).{5.List<Object>inputTupleList=(List<Object>)tuple.getValues();6.intthresholdColNum=thresholdInfo.getThresholdColNumber();7.ObjectthresholdValue=thresholdInfo.getThresholdValue();8.StringthresholdDataType=tupleInfo.getFieldList().get(thresholdColNum-1).getColumnType();9.IntegertimeWindow=thresholdInfo.getTimeWindow();10.intfrequency=thresholdInfo.getFrequencyOfOccurene11.if(thresholdDataType.equalsIgnoreCase("string")).{13.StringvalueToCheck=inputTupleList.get(thresholdColNum-1).toString();14.StringfrequencyChkOp=thresholdInfo.getAction();15.if(timeWindow!=null)16.{17.longcurTime=System.currentTimeMi18.longdiffInMinutes=(curTime-startTime)/(1000);19.if(diffInMinutes>=timeWindow)20.{21.if(frequencyChkOp.equals("=="))22.{23.if(valueToCheck.equalsIgnoreCase(thresholdValue.toString()))24.{25.count.incrementAndGet();26.if(count.get27.splitAndEmit(inputTupleList,collector);28.}29.}30.elseif(frequencyChkOp.equals31.{32.if(!valueToCheck.equalsIgnoreCase(thresholdValue.toString()))33.{34.count.incrementAndGet();35.if(count.get36.splitAndEmit(inputTupleList,collector);37.}38.}39.elseSystem.out.println("Operatornotsupported");40.}41.}else43.{44.if(frequencyChkOp.equals("=="))45.{46.if(valueToCheck.equalsIgnoreCase(thresholdValue.toString()))47.{48.count.incrementAndGe49.if(count.get()>frequency)50.splitAndEmit(inputTupleList,collector);51.}52.}53.elseif(frequencyChkOp.equals("!="))54.{55.if(!valueToCheck.equalsIgnoreCase(thresholdValue.toString()))56.{57.count.incrementAndGet();58.if(count.get()>frequency)it(inputTupleList,collector);}64.else")||IgnoreCase("double")||splitAndEm}}}if(thresholdDataType.equalsIgnoreCase("intthresholdDataType.equalsoldDataType.equalsIgnoreCase("float")||thresholdDataType.equalsIgnoreCase("long")||thresholdDataType.equalsIgnoreCase("short"))65.{66.StringfrequencyChkOp=thresholdInfo.getAction();67.if(timeWindow!=null)68.{69.longvalueToCheck=Long.parseLong(inputTupleList.get(thresholdColNum-1).toString());70.longcurTime=System.currentTimeMillis();71.longdiffInMinutes=(curTime-startTime)/(1000);72.System.out.println("Differenceinminutes="+diffInMinutes);73.if(diffInMinutes>=timeWindow)74.{75.if(frequencyChkOp.equals("<"))76.{77.if(valueToCheck<Double.parseDouble(thresholdValue.toString()))78.{79.count.incrementAndGet();80.if(count.get()>frequency)splitAndEmit(inputTupleList,collector);82.}83.}84.elseif(frequencyChkOp.equals(">"))85.{86.if(valueToCheck>Double.parseDouble(thresholdValue.toString()))87.{88.count.incrementAndGet();89.if(count.get()>frequency)splitAndEmit(inputTupleList,collector);91.}92.}93.elseif(frequencyChkOp.equals("=="))94.{95.if(valueToCheck==Double.parseDouble(thresholdValue.toString()))96.{97.count.incrementAndGet();98.if(count.get()>frequency)splitAndEmit(inputTupleList,collector);100.}101.}102.elseif(frequencyChkOp.equals("!="))103.{104....116.}}}}splitAndEmit(null,collector);}{System.err.println("Emittingnullinbolt");splitAndEmit(null,collector);}經(jīng)由發(fā)送的的將會(huì)傳遞到下一個(gè)對(duì)應(yīng)的,在我們的用例中是。經(jīng)過(guò)處理的必須被持久化以便于觸發(fā)或者更深層次的使用。做了這個(gè)持久化的工作并把存入了數(shù)據(jù)庫(kù)。表的建立由()函數(shù)完成,這也將是調(diào)用的第一個(gè)方法。方法的編碼如所示。voidvoidprepare(MapStormConf,TopologyContextcontext){Class.forName(dbClass);}catch(ClassNotFoundExceptione){System.out.println("Drivernotfound");e.printStackTrace();}1.public2.{15.connectiondriverManager.getConnection(16."jdbc:mysql://"+databaseIP+":"+databasePort+"/"+databaseName,userName,pwd);17.connection.prepareStatement("DROPTABLEIFEXISTS"+tableName).execute();19.StringBuildercreateQuery=newStringBuilder(20."CREATETABLEIFNOTEXISTS"+tableName+"(");21.for(Fieldfields:tupleInfo.getFieldList())22.{23.if(fields.getColumnType().equalsIgnoreCase("String24.createQuery.append(fields.getColumnName()+"VARCHAR(500),");25.else26.createQuery.append(fields.getColumnName()+""+fields.getColumnType()+",");27.}28.createQuery.append("thresholdTimeStamptimestamp)");29.connection.prepareStatement(createQuery.toString()).execut31.//InsertQuery32.StringBuilderinsertQuery=newStringBuilder("INSERTINTO"+tableName+"(");33.StringtempCreateQuery=newString();34.for(Fieldfields:tupleInfo.getFieldList())35.{36.insertQuery.append(fields.getColumnName()+",");37.}38.insertQuery.append("thresholdTimeStamp").append(")values39.for(Fieldfields:tupleInfo.getFieldList())40.{41.insertQuery.append("?,");42.42.}3.44.insertQuery.append("?)");45.prepStatement=connection.prepareStatement(insertQuery.toString());46.}47.catch(SQLExceptione)8.{49.e.printStackTrace();50.}數(shù)據(jù)分批次的插入數(shù)據(jù)庫(kù)。插入的邏輯由中的()方法提供。大部分的編碼都是用來(lái)實(shí)現(xiàn)可能存在不同類型輸入的解析。11.publicvoidexecute(Tupletuple,BasicOutputCollectorcollector)2.{3.batchExecuted=false;4.if(tuple!=null){6.List<Object>inputTupleList=(List<Object>)tuple.getValues();7.intdbIndex=0;8.for(inti=0;i<tupleInfo.getFieldList().size();i++)9.{10.Fieldfield=tupleInfo.getFieldList().get(i);11.try{12.dbIndex=i+1;13.if(field.getColumnType().equalsIgnoreCase("String"))14.prepStatement.setString(dbIndex,inputTupleList.get(i).toString());15.elseif(field.getColumnType().equalsIgnoreCase("int"))16.prepStatement.setInt(dbIndex,17.Integer.parseInt(inputTupleList.get(i).toString()));18.elseif(field.getColumnType().equalsIgnoreCase("long"))19.prepStatement.setLong(dbIndex,20.Long.parseLong(inputTupleList.get(i).toString()));21.elseif(field.getColumnType().equalsIgnoreCase("float"))22.prepStatement.setFloat(dbIndex,23.Float.parseFloat(inputTupleList.get(i).toString()));24.elseif(field.getColumnType().equalsIgnoreCase("double"))25.prepStatement.setDouble(dbIndex,26.Double.parseDouble(inputTupleList.get(i).toString()));27.elseif(field.getColumnType().equalsIgnoreCase("short"))28.prepStatement.setShort(dbIndex,29.Short.parseShort(inputTupleList.get(i).toString()));30.elseif(field.getColumnType().equalsIgnoreCase("boolean"))31.prepStatement.setBoolean(dbIndex,32.Boolean.parseBoolean(inputTupleList.get(i).toString()));33.elseif(field.getColumnType().equalsIgnoreCase("byte"))34.prepStatement.setByte(dbIndex,35.Byte.parseByte(inputTupleList.get(i).toString()));36.elseif(field.getColumnType().equalsIgnoreCase("Date"))37.{38.DatedateToAdd=null;39.if(!(inputTupleList.get(i)instanceofDate))40.{41.DateFormatdf=newSimpleDateFormat("yyyy-MM-ddhh:mm:ss");42.try43.{44.dateToAdd=df.parse(inputTupleList.get(i).toString());45.}46.catch(ParseExceptione)47.{48.System.err.println("Datatypenotvalid");49.}50.}51.else52.{53.dateToAdd=(Date)inputTupleList.get(i);54.java.sql.DatesqlDate=newjava.sql.Date(dateToAdd.getTime());55.prepStatement.setDate(dbIndex,sqlDate);56.}}58.catch(SQLExceptione)59.{60.e.printStackTrace();61.}62.}63.Datenow=newDate();64.try{66.prepStatement.setTimestamp(dbIndex+1,newjava.sql.Timestamp(now.getTime()));67.prepStatement.addBatch();68.counter.incrementAndGet();69.if(counter.get()==batchSize)70.executeBatch();71.}72.catch(SQLExceptione1){74.e1.printStackTrace();75.}76.}77.else78.{79.longcurTime=System.currentTimeMillis();80.longdiffInSeconds=(curTime-startTime)/(60*1000);SizediffInSecondsbatchTimeWindowInSeconds)82.{83.try{84.executeBatch();85.

溫馨提示

  • 1. 本站所有資源如無(wú)特殊說(shuō)明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請(qǐng)下載最新的WinRAR軟件解壓。
  • 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請(qǐng)聯(lián)系上傳者。文件的所有權(quán)益歸上傳用戶所有。
  • 3. 本站RAR壓縮包中若帶圖紙,網(wǎng)頁(yè)內(nèi)容里面會(huì)有圖紙預(yù)覽,若沒(méi)有圖紙預(yù)覽就沒(méi)有圖紙。
  • 4. 未經(jīng)權(quán)益所有人同意不得將文件中的內(nèi)容挪作商業(yè)或盈利用途。
  • 5. 人人文庫(kù)網(wǎng)僅提供信息存儲(chǔ)空間,僅對(duì)用戶上傳內(nèi)容的表現(xiàn)方式做保護(hù)處理,對(duì)用戶上傳分享的文檔內(nèi)容本身不做任何修改或編輯,并不能對(duì)任何下載內(nèi)容負(fù)責(zé)。
  • 6. 下載文件中如有侵權(quán)或不適當(dāng)內(nèi)容,請(qǐng)與我們聯(lián)系,我們立即糾正。
  • 7. 本站不保證下載資源的準(zhǔn)確性、安全性和完整性, 同時(shí)也不承擔(dān)用戶因使用這些下載資源對(duì)自己和他人造成任何形式的傷害或損失。

最新文檔

評(píng)論

0/150

提交評(píng)論