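Chapter 1  Real-Time Processing Module

1.1 Setting up the module

Add Scala framework support to the module.

1.2 Code outline

1) Consume the data from Kafka.
2) Use Redis to filter out the devices that have already been counted as daily active today.
3) Save the newly added daily-active records of each batch to HBase or ES.
4) Query the data back from ES and publish it as a data interface for the visualization project to call.

1.3 Implementation 1: consuming Kafka

1.3.1 Configuration

1) config.properties

    # Kafka configuration
    kafka.broker.list=hadoop102:9092,hadoop103:9092,hadoop104:9092

    # Redis configuration
    redis.host=hadoop102
    redis.port=6379

2) pom.xml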
Dependencies (groupId : artifactId : version)

    com.atguigu.gmall2019.dw : dw-common : 1.0-SNAPSHOT
    org.apache.spark         : spark-core_2.11
    org.apache.spark         : spark-streaming_2.11
    org.apache.kafka         : kafka-clients
    org.apache.spark         : spark-streaming-kafka-0-8_2.11
    redis.clients            : jedis : 2.9.0
    io.searchbox             : jest : 5.3.3
    net.java.dev.jna         : jna : 4.5.2
    org.codehaus.janino      : commons-compiler : 2.7.8

Build plugin

    net.alchim31.maven : scala-maven-plugin : 3.2.2

This plugin compiles the Scala code into class files. Its execution is bound to Maven's compile phase, with the goals compile and testCompile.

1.3.2 Utility classes

1) MyKafkaUtil

    package com.atguigu.utils

    import java.util.Properties
    import kafka.serializer.StringDecoder
    import org.apache.spark.streaming.StreamingContext
    import org.apache.spark.streaming.dstream.InputDStream
    import org.apache.spark.streaming.kafka.KafkaUtils

    object MyKafkaUtil {
      def getKafkaStream(ssc: StreamingContext, topics: Set[String]): InputDStream[(String, String)] = {
        val properties: Properties = PropertiesUtil.load("config.properties")
        val kafkaPara = Map(
          "bootstrap.servers" -> properties.getProperty("kafka.broker.list"),
          "group.id" -> "bigdata0408"
        )
        // Consume the Kafka data using the Direct approach
        val kafkaDStream: InputDStream[(String, String)] =
          KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaPara, topics)
        // Return the stream
        kafkaDStream
      }
    }

2) PropertiesUtil

    import java.io.InputStreamReader
    import java.util.Properties

    object PropertiesUtil {
      def load(propertieName: String): Properties = {
        val prop = new Properties()
        // Read the classpath resource as UTF-8 text
        prop.load(new InputStreamReader(
          Thread.currentThread().getContextClassLoader.getResourceAsStream(propertieName), "UTF-8"))
        prop
      }
    }
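The loading step in PropertiesUtil can be tried in isolation. The following is only a minimal sketch in Java (the language of the publisher project), assuming the configuration text is supplied as an in-memory string rather than read from a classpath resource; the class name `PropertiesLoadSketch` and the method `loadFromString` are hypothetical and not part of the tutorial's code.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.Reader;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.Properties;

class PropertiesLoadSketch {
    // Parse key=value text through a UTF-8 reader, the same way
    // PropertiesUtil.load reads a classpath resource.
    // Assumption: the content comes from a string, not from a file.
    static Properties loadFromString(String content) {
        Properties prop = new Properties();
        try (Reader reader = new InputStreamReader(
                new ByteArrayInputStream(content.getBytes(StandardCharsets.UTF_8)),
                StandardCharsets.UTF_8)) {
            prop.load(reader);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return prop;
    }

    public static void main(String[] args) {
        String content = "# Redis configuration\nredis.host=hadoop102\nredis.port=6379\n";
        Properties prop = loadFromString(content);
        String host = prop.getProperty("redis.host");
        int port = Integer.parseInt(prop.getProperty("redis.port"));
        System.out.println(host + ":" + port); // prints hadoop102:6379
    }
}
```

Note that `getProperty` returns null for a key that is not present, so a misspelled key in the properties file (for example `rdis.port` instead of `redis.port`) only shows up later, when the missing value is converted to a number.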
3) RedisUtil

    import redis.clients.jedis.{Jedis, JedisPool, JedisPoolConfig}

    object RedisUtil {
      var jedisPool: JedisPool = null

      def getJedisClient: Jedis = {
        if (jedisPool == null) {
          println("Creating a connection pool")
          val config = PropertiesUtil.load("config.properties")
          val host = config.getProperty("redis.host")
          val port = config.getProperty("redis.port")

          val jedisPoolConfig = new JedisPoolConfig()
          jedisPoolConfig.setMaxTotal(100)             // maximum number of connections
          jedisPoolConfig.setMaxIdle(20)               // maximum idle connections
          jedisPoolConfig.setMinIdle(20)               // minimum idle connections
          jedisPoolConfig.setBlockWhenExhausted(true)  // whether to wait when the pool is busy
          jedisPoolConfig.setMaxWaitMillis(500)        // how long to wait when busy, in milliseconds
          jedisPoolConfig.setTestOnBorrow(true)        // test each connection when it is obtained

          jedisPool = new JedisPool(jedisPoolConfig, host, port.toInt)
        }
        // println(s"jedisPool.getNumActive = ${jedisPool.getNumActive}")
        // println("Obtained a connection")
        jedisPool.getResource
      }
    }

1.3.3 Case class StartUpLog

    case class StartUpLog(mid: String,
                          uid: String,
                          appid: String,
                          area: String,
                          os: String,
                          ch: String,
                          logType: String,
                          vs: String,
                          var logDate: String,
                          var logHour: String,
                          var ts: Long)

1.3.4 Business class: consuming Kafka

    import org.apache.phoenix.spark._

    object RealtimeStartupApp {
      def main(args: Array[String]): Unit = {
        val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("gmall2019")
        val sc = new SparkContext(sparkConf)
        val ssc = new StreamingContext(sc, Seconds(10))

        // record.value() matches the kafka-0-10 ConsumerRecord API; with the 0-8 based
        // MyKafkaUtil from 1.3.2, each element is a (key, value) tuple, so use _._2 instead.
        val startupStream: InputDStream[ConsumerRecord[String, String]] =
          MyKafkaUtil.getKafkaStream(ssc, Set(GmallConstants.KAFKA_TOPIC_STARTUP))

        // startupStream.map(_.value()).foreachRDD { rdd =>
        //   println(rdd.collect().mkString("\n"))
        // }

        val startupLogDstream: DStream[StartUpLog] = startupStream.map(_.value()).map { log =>
          // println(s"log = $log")
          val startUpLog: StartUpLog = JSON.parseObject(log, classOf[StartUpLog])
          startUpLog
        }
      }
    }

1.4 Implementation 2: deduplication

1.4.1 Flow chart

1.4.2 Designing the Redis key and value

    key               value
    dau:2019-01-22    device ID (mid)

1.4.3 Business code

    import java.util
    import java.text.SimpleDateFormat
    import java.util.Date

    import com.alibaba.fastjson.JSON
    import com.atguigu.gmall.constant.GmallConstants
    import com.atguigu.gmall2019.realtime.bean.StartupLog
    import com.atguigu.gmall2019.realtime.util.{MyKafkaUtil, RedisUtil}
    import org.apache.hadoop.conf.Configuration
    import org.apache.kafka.clients.consumer.ConsumerRecord
    import org.apache.spark.SparkConf
    import org.apache.spark.broadcast.Broadcast
    import org.apache.spark.rdd.RDD
    import org.apache.spark.streaming.dstream.{DStream, InputDStream}
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import redis.clients.jedis.Jedis
    import org.apache.phoenix.spark._

    object DauApp {
      def main(args: Array[String]): Unit = {
        val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("dau_app")
        val ssc = new StreamingContext(sparkConf, Seconds(5))

        // 1. Consume Kafka
        val inputDstream: InputDStream[ConsumerRecord[String, String]] =
          MyKafkaUtil.getKafkaStream(ssc, Set(GmallConstants.KAFKA_TOPIC_STARTUP))

        // 2. Convert the stream into the case class and fill in the two time fields
        val startuplogDstream: DStream[StartupLog] = inputDstream.map { record =>
          val jsonStr: String = record.value()
          val startupLog: StartupLog = JSON.parseObject(jsonStr, classOf[StartupLog])
          val dateTimeStr: String = new SimpleDateFormat("yyyy-MM-dd HH").format(new Date(startupLog.ts))
          val dateArr: Array[String] = dateTimeStr.split(" ")
          startupLog.logDate = dateArr(0)
          startupLog.logHour = dateArr(1)
          startupLog
        }
        startuplogDstream.cache()

        // 3. Filter against the list of users already seen today:
        //    keep only the records whose mid is not yet in the list
        val filteredDstream: DStream[StartupLog] = startuplogDstream.transform { rdd =>
          // Runs on the driver, once per batch interval
          val jedis: Jedis = RedisUtil.getJedisClient
          val dateStr: String = new SimpleDateFormat("yyyy-MM-dd").format(new Date())
          val key = "dau:" + dateStr
          val dauMidSet: util.Set[String] = jedis.smembers(key)
          jedis.close()

          val dauMidBC: Broadcast[util.Set[String]] = ssc.sparkContext.broadcast(dauMidSet)
          println("Before filtering: " + rdd.count())
          val filteredRDD: RDD[StartupLog] = rdd.filter { startuplog =>
            // Runs on the executors
            val dauMidSet: util.Set[String] = dauMidBC.value
            !dauMidSet.contains(startuplog.mid)
          }
          println("After filtering: " + filteredRDD.count())
          filteredRDD
        }

        // 4. Deduplicate within the batch: group by mid and take the first record of each group
        val groupbyMidDstream: DStream[(String, Iterable[StartupLog])] =
          filteredDstream.map(startuplog => (startuplog.mid, startuplog)).groupByKey()
        val distictDstream: DStream[StartupLog] = groupbyMidDstream.flatMap { case (mid, startupLogItr) =>
          startupLogItr.toList.take(1)
        }

        // 5. Save the list of users (mid) seen today to Redis
        //    key type: set; key: dau:2019-xx-xx; value: mid
        distictDstream.foreachRDD { rdd =>
          // driver
          rdd.foreachPartition { startuplogItr =>
            val jedis: Jedis = RedisUtil.getJedisClient // executor
            for (startuplog <- startuplogItr) {
              val key = "dau:" + startuplog.logDate
              jedis.sadd(key, startuplog.mid)
              println(startuplog)
            }
            jedis.close()
          }
        }

        ssc.start()
        ssc.awaitTermination()
      }
    }

1.5 Implementation 3: saving to HBase

1.5.1 Phoenix: a SQL layer for HBase

For technical details, see the Atguigu big-data course material on Phoenix.

1.5.2 Creating the table with Phoenix

    create table gmall190408_dau (
        mid varchar,
        uid varchar,
        appid varchar,
        area varchar,
        os varchar,
        ch varchar,
        type varchar,
        vs varchar,
        logDate varchar,
        logHour varchar,
        ts bigint
        CONSTRAINT dau_pk PRIMARY KEY (mid, logDate));

The table name must match the one passed to saveToPhoenix in 1.5.4, where the code writes to GMALL2019_DAU.

1.5.3 Adding dependencies to pom.xml

    org.apache.phoenix : phoenix-spark : 4.14.2-HBase-1.3
    org.apache.spark   : spark-sql_2.11

1.5.4 Business code: saving the data

    // Write the data to HBase through Phoenix
    distictDstream.foreachRDD { rdd =>
      rdd.saveToPhoenix(
        "GMALL2019_DAU",
        Seq("MID", "UID", "APPID", "AREA", "OS", "CH", "TYPE", "VS", "LOGDATE", "LOGHOUR", "TS"),
        new Configuration,
        Some("hadoop102,hadoop103,hadoop104:2181"))
    }

Chapter 2  Daily-Active Data Query Interface

2.1 Access paths

Total:   http://localhost:8070/realtime-total?date=2019-09-06
Hourly:  http://localhost:8070/realtime-hours?id=dau&date=2019-09-06

2.2 Required data format

Total:

    [{"id":"dau","name":"New daily active users","value":1200},
     {"id":"new_mid","name":"New devices","value":233}]

Hourly:

    {"yesterday":{"11":383,"12":123,"17":88,"19":200},
     "today":{"12":38,"13":1233,"17":123,"19":688}}

2.3 Setting up the publisher project
[Figure: IntelliJ IDEA New Project wizard using Spring Initializr. Project metadata: Group com.atguigu.gmall2019.dw, Artifact dw-publisher, Maven project, package com.atguigu.gmall2019.dw.publisher; followed by the dependency selection page.]
2.4 Configuration files

2.4.1 pom.xml

Properties

    java.version = 1.8

Dependencies (groupId : artifactId : version)

    org.springframework.boot : spring-boot-starter-web
    com.atguigu.gmall2019.dw : dw-common : 1.0-SNAPSHOT
    org.springframework.boot : spring-boot-starter-test (scope: test)
    org.mybatis.spring.boot  : mybatis-spring-boot-starter : 1.3.4
    org.springframework.boot : spring-boot-starter-jdbc
    org.apache.phoenix       : phoenix-core : 4.14.2-HBase-1.3
    com.google.guava         : guava : 20.0

Build plugin

    org.springframework.boot : spring-boot-maven-plugin

2.4.2 application.properties

    server.port=8070
    logging.level.root=error

    spring.datasource.driver-class-name=org.apache.phoenix.jdbc.PhoenixDriver
    spring.datasource.url=jdbc:phoenix:hadoop102,hadoop103,hadoop104:2181
    spring.datasource.data-username=
    spring.datasource.data-password=

    #mybatis
    #mybatis.typeAliasesPackage=com.example.phoenix.entity
    mybatis.mapperLocations=classpath:mapper/*.xml
    mybatis.configuration.map-underscore-to-camel-case=true

2.5 Code implementation

    Layer          Class                        Role
    Controller     PublisherController          Publishes the interface over the web
    Service        PublisherService             Interface for the business data queries
                   PublisherServiceImpl         Implementation of the business queries
    Data           DauMapper                    Interface for the data-layer queries
                   DauMapper.xml                Configuration that implements the data-layer queries
    Main program   GmallPublisherApplication    Add package scanning

2.5.1 GmallPublisherApplication: adding package scanning

    @SpringBootApplication
    @MapperScan(basePackages = "com.atguigu.gmallXXXXXXX.publisher.mapper")
    public class Gmall2019PublisherApplication {
        public static void main(String[] args) {
            SpringApplication.run(Gmall2019PublisherApplication.class, args);
        }
    }

2.5.2 Controller layer

    import com.alibaba.fastjson.JSON;
    import com.alibaba.fastjson.JSONObject;
    import com.atguigu.gmall2019.dw.publisher.service.PublisherService;
    import org.apache.commons.lang.time.DateUtils;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.RequestParam;
    import org.springframework.web.bind.annotation.RestController;

    import java.text.ParseException;
    import java.text.SimpleDateFormat;
    import java.util.*;

    @RestController
    public class PublisherController {

        @Autowired
        PublisherService publisherService;

        @GetMapping("realtime-total")
        public String realtimeHourDate(@RequestParam("date") String date) {
            List<Map<String, Object>> list = new ArrayList<>();

            // Total daily active users
            int dauTotal = publisherService.getDauTotal(date);
            Map<String, Object> dauMap = new HashMap<>();
            dauMap.put("id", "dau");
            dauMap.put("name", "New daily active users");
            dauMap.put("value", dauTotal);
            list.add(dauMap);

            // New users
            int newMidTotal = publisherService.getNewMidTotal(date);
            Map<String, Object> newMidMap = new HashMap<>();
            newMidMap.put("id", "new_mid");
            newMidMap.put("name", "New users");
            newMidMap.put("value", newMidTotal);
            list.add(newMidMap);

            return JSON.toJSONString(list);
        }

        @GetMapping("realtime-hours")
        public String realtimeHourDate(@RequestParam("id") String id, @RequestParam("date") String date) {
            if ("dau".equals(id)) {
                Map dauHoursToday = publisherService.getDauHours(date);
                JSONObject jsonObject = new JSONObject();
                jsonObject.put("today", dauHoursToday);

                // Derive yesterday's date string from the requested date
                String yesterdayDateString = "";
                try {
                    Date dateToday = new SimpleDateFormat("yyyy-MM-dd").parse(date);
                    Date dateYesterday = DateUtils.addDays(dateToday, -1);
                    yesterdayDateString = new SimpleDateFormat("yyyy-MM-dd").format(dateYesterday);
                } catch (ParseException e) {
                    e.printStackTrace();
                }
                Map dauHoursYesterday = publisherService.getDauHours(yesterdayDateString);
                jsonObject.put("yesterday", dauHoursYesterday);
                return jsonObject.toJSONString();
            }

            if ("new_order_totalamount".equals(id)) {
                String newOrderTotalamountJson = publisherService.getNewOrderTotalAmountHours(d
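The date handling in the realtime-hours branch can be checked without Spring or the service layer. The following is a minimal sketch only: it swaps in `java.time.LocalDate` for the `SimpleDateFormat` and `DateUtils` combination used in the controller, and plain maps for fastjson's `JSONObject`. The class name `HourlyResponseSketch` and its methods are hypothetical, and the hour counts are taken from the example in section 2.2.

```java
import java.time.LocalDate;
import java.util.LinkedHashMap;
import java.util.Map;

class HourlyResponseSketch {
    // Returns the day before an ISO date string such as "2019-09-06".
    // Assumption: the date parameter always uses the yyyy-MM-dd form shown in 2.1.
    static String yesterdayOf(String date) {
        return LocalDate.parse(date).minusDays(1).toString();
    }

    // Combines two per-hour count maps into the {"yesterday": ..., "today": ...} shape.
    static Map<String, Map<String, Integer>> hourly(Map<String, Integer> yesterday,
                                                    Map<String, Integer> today) {
        Map<String, Map<String, Integer>> result = new LinkedHashMap<>();
        result.put("yesterday", yesterday);
        result.put("today", today);
        return result;
    }

    public static void main(String[] args) {
        Map<String, Integer> yesterday = new LinkedHashMap<>();
        yesterday.put("11", 383);
        yesterday.put("12", 123);
        Map<String, Integer> today = new LinkedHashMap<>();
        today.put("12", 38);
        today.put("13", 1233);

        System.out.println(yesterdayOf("2019-09-06")); // prints 2019-09-05
        System.out.println(hourly(yesterday, today));
    }
}
```

`LocalDate.parse` throws a `DateTimeParseException` on malformed input instead of leaving an empty date string behind, so a bad `date` parameter fails visibly rather than producing a query for an empty day.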