版權(quán)說(shuō)明:本文檔由用戶提供并上傳,收益歸屬內(nèi)容提供方,若內(nèi)容存在侵權(quán),請(qǐng)進(jìn)行舉報(bào)或認(rèn)領(lǐng)
文檔簡(jiǎn)介
1、八八Kafka安裝配置及使用說(shuō)明(鐵樹(shù) 2018-08-08)(Windows平臺(tái),5個(gè)分布式節(jié)點(diǎn),修改消息大小,調(diào)用程序范例)1安裝配置采用5臺(tái)服務(wù)器作為集群節(jié)點(diǎn),IP地址為: XX.XX.0.12-XX.XX.0.16.每臺(tái)機(jī)器依次安裝配置 JDK zookeeper、kafka ,先安裝完一臺(tái) 機(jī)器,然后拷貝到其他機(jī)器,再修改配置文件。JDK安裝配置JDK版本:jdk1.7.0_51_x64 解壓版(jdk1.7.0_51_x64.rar )汨江p 會(huì)三唐豐,=斯*M寸實(shí)苓標(biāo)KtBM也XJ.bin2013.1/30 2:1&文件壬K =S卜db2013/7 /30 2:3 5文性充,in
2、clude2018/7/3C 36上海201i7/30 忘 6立神去N建h iM20147/30 236木件F211即7/卻 不韭文性支jISIUL. BPVmGE隨1租仃162(X01文件4 CU$EW1V4/10 史 14就13_L安?| README201/1014HTML,1 KBmIww2014/4/10 兜141 KR.ire2013/12/1B 202iMnRAR J1P 穌Mja 3J計(jì)曰嘰mg唯 RNUCEN 刑 EMHMf2014/410 能 1417 KSft工址證岫32014/d/10 樂(lè)14123咫_ HDRCRTLl;N$fiAOME-MVAFX晶*A*3 LO范泮
3、工1)設(shè)置環(huán)境變量:JAVA_HOM EC:kafkajdk1.7.0_51_x64PATH C:kafkajdk170_51_x64binzookeeper 安裝配置解壓安裝zookeeper 版本:3.4.12(zookeeper-3.4.12.tar.gz )解壓到C盤kafka目錄下,如圖所示創(chuàng)建zookeeper數(shù)據(jù)目錄和日志目錄zkdata #存放快照C:kafkazookeeper-3412zkdatazkdatalog#存放日志C:kafkazookeeper-3412zkdatalog修改配置文件進(jìn)入到“C:kafkazookeeper-3.4.12 ”目錄下的conf目錄中
4、,復(fù)制zoo_sample.cfg (官方提供的zookeeper的樣板文件),重命名為zoo.cfg (官方指定的文件命名規(guī)則)仇本KiE超心) ksflca+工glcwpxT*匚Eo 1 & -hl(Ci) H%。T spolcjpepe-3AlZ CPrrF默認(rèn)內(nèi)容:八八1 t 7ht rumber af niLLisecnnds of arh 1 ick2 iicliiie=20lJUt Ihe ruFibor of tcks that tho initial4 t jyncbut cnizat i on pharc ca- tak? 皿itLijiHO* Ihe 及川心日工 of t
5、icts that can pajs belvten7 g sendw a request 到id eettinz an icknovlgdgement弓 sjTicLlJiitsBq t +hp direetery 卜卡了+t.n im可i雪hat is licrri.f do net se /tup for stor3c /tup htre is jurt11 t wple spikes d#t甚ndeepe 丁* the port at which -the clients 懵ill coimcrtU cLientFort2131f the jisKimum minbeT of clie
6、nt coTmecTiairis. zncresse this if you r.eEd to hand L e no re clxent317tnsjcCl ierCra:n3= 5 IS # Bt sills to rsal the nLiirtenan.es scctzon cf tlie20# atLnirjLEtratac urde befere tijrnj.n on 七ArtQpur4巴.2144 him:77宣0血23口電匕 /魚(yú)匚匕七江rg/dDc/curreiiiT/Ho&En2EaMmHL.htilflscniaiiiTenance 罩24 Th r.ujTibsr af
7、 snap shots to- retain tn dli+dJirtaut&pujEge. snapR*t a.inCanmt-3t Pur co tack intoEnl in hcurc27 t Set tc .口“ ts 11able aute pjrgs foMuref -aut op ill gti tMiL kaflca kifk_ZlL-XLl 1-| *f |1 號(hào)寺冏 一 h:P創(chuàng)建消息目錄kafkalogs : C:kafkakafka_2.11-111kafkalogsperties修改配置文件打開(kāi)實(shí)際的修改項(xiàng)為:broker.id=1listeners=PLAINTE
8、XT:19092log.dirs= C:/kafka/kafka_2.11-1.1.1/kafkalogs#在log.retention.hours=168下面新增下面三項(xiàng)(消息大小最大1GBmessage.max.byte=1073741824replica.fetch.max.bytes=1073741824log.segment.bytes=1073741824default.replication.factor=2#設(shè)置zookeeper的連接端口zookeeper.connect=XX.XX.0.12:12181,XX.XX.0.13:12181,XX.XX.0.14:12181,X
9、X.XX.0.15:12181,XX.XX.0.16:12181配置說(shuō)明:broker.id=0 #當(dāng)前機(jī)器在集群中的唯一標(biāo)識(shí),和zookeeper的myid性質(zhì)一樣port=19092 #當(dāng)前kafka對(duì)外提供服務(wù)的端口默認(rèn)是 9092=00 # 這個(gè)參數(shù)默認(rèn)是關(guān)閉的,在 0.8.1 有個(gè)bug, DNS軍析問(wèn)題,失敗率的問(wèn)題。work.threads=3 #這個(gè)是borker進(jìn)行網(wǎng)絡(luò)處理的線程數(shù)八八num.io.threads=8 # 這個(gè)是borker進(jìn)行I/O處理的線程數(shù)log.dirs=/opt/kafka/kafkalogs/#消息存放的目錄,這個(gè)目錄可以配置為“,”逗號(hào)分割的表達(dá)
10、式,上面的 num.io.threads 要大 于這個(gè)目錄的個(gè)數(shù)這個(gè)目錄,如果配置多個(gè)目錄,新創(chuàng)建的topic他 把消息持久化的地方是,當(dāng)前以逗號(hào)分割的目錄中,那個(gè)分區(qū)數(shù)最少 就放那一個(gè)socket.send.buffer.bytes=102400 #發(fā)送緩沖區(qū) buffer 大小,數(shù)據(jù)不是一下子就發(fā)送的,先回存儲(chǔ)到緩沖區(qū)了到達(dá)一定的大小后在 發(fā)送,能提高性能socket.receive.buffer.bytes=102400 #kafka接收緩沖區(qū)大小,當(dāng)數(shù)據(jù)到達(dá)一定大小后在序列化到磁盤socket.request.max.bytes=104857600#這個(gè)參數(shù)是向 kafka 請(qǐng)求消息或
11、者向kafka發(fā)送消息的請(qǐng)請(qǐng)求的最大數(shù),這個(gè)值不能超過(guò) java的堆棧大小num.partitions=1#默認(rèn)的分區(qū)數(shù),一個(gè)topic 默認(rèn)1個(gè)分區(qū)數(shù)log.retention.hours=168 #默認(rèn)消息的最大持久化時(shí)間,168小時(shí),7天message.max.byte=5242880 # 消息保存的最大值 5Mdefault.replication.factor=2 #kafka保存消息的副本數(shù),如果一個(gè)副本失效了,另一個(gè)還可以繼續(xù)提供服務(wù)replica.fetch.max.bytes=5242880 #取消息的最大字節(jié)數(shù)log.segment.bytes=1073741824 #這個(gè)
12、參數(shù)是:因?yàn)?kafka 的消息是以追加的形式落地到文件,當(dāng)超過(guò)這個(gè)值的時(shí)候,kafka會(huì)新起一個(gè)文件erval.ms=300000 # 每隔 300000 毫秒去檢查上面配置的log失效時(shí)間(log.retention.hours=168 ),到目錄查看是否有過(guò)期的消息如果有,刪除log.cleaner.enable=false#是否啟用log壓縮,一般不用啟用,啟用的話可以提高性能zookeeper.connect=192.1687100:12181,01:12181,07:1218 # 設(shè)置 zookeeper 的連接端口1.4其他節(jié)點(diǎn)配置將安裝以上配置好的目錄c:kafka拷貝到其他節(jié)
13、點(diǎn)的c盤目錄, 并修改如下配置。1、JAVA環(huán)境變量:JAVA_HOM EC:kafkajdk1.7.0_51_x64PATH C:kafkajdk170_51_x64bin2、 zookeeper 的 myidC:kafkazookeeper-3412zkdatamyid,修改為對(duì)應(yīng)的數(shù)值XX.XX.0.12 : 1XX.XX.0.13 : 2XX.XX.0.14 : 3XX.XX.0.15 : 4XX.XX.0.16 : 53、kafka 配置perties 的broker.id ,修改為對(duì)應(yīng)的數(shù)值XX.XX.0.12 : 1XX.XX.0.13 : 2XX.XX.0.14 : 3XX.X
14、X.0.15 : 4XX.XX.0.16 : 51.5服務(wù)啟動(dòng)1、 啟動(dòng) zookeeperC:kafkazookeeper-3412binzkServer.cmdXX.XX.0.12-16 ,依次雙擊啟動(dòng)。r-31 匚 出 ndows sEystEm 3 P.cmd. exeinxSis;tem3?MJindoLFFhukrShe-1 ) ju1 B。、;: Me畫j64Ml in p , lfii;4B:5232H njjid:l 1 - IHFO ftrunFEErfiwyid-1zniu iranimcnLHllHIii J - Sc-rviEr nnu i vd mie n t r
15、J au a . xn . L rnpd if MJ s e rsye ! u P ppDa tXLdf 1、1 Eip、 301H 07 JIM 制卜工孤曰二Fnyidil rNPOmy MT l川 tHt WM州 1用工1 1IH13 En u i rnnmfi nt PI HWI1Kervcr fimui Tnnnrnt :; jnvn -ennKsilr;r-:BlB-B7-3ifl 目E:K;E2L32N可說(shuō);工】-INTO 如0nmFtnir 1宿箝城1 1 ?。蝗?國(guó)空翎曲; SI:EnvironmentflIW V - Server cwuironraent:os.naneVi
16、ndnws 7 01B-H7-JWwKll - INFO QuaHMFlBeirlmyidl0!0B121)13EniV jronmnfe(!l1 iWt 1 -仁叁和ftnulronfwint Eol Bftrr.ltond(i4 618-07-38 e6:4S:E2.324 nvl4-11 - IHFO QurunPDDHnvid-l 1/0:0 = 11Enviroinpant1061 = Srvor onuircnvwn.von:Ion 6I ieia-fl7-ae 6=4852326 niid:l - INFO QuoztPe日肥EKd=1 H修出汨:用:團(tuán);用:目:通;121 11
17、 - En v iranme mt El W J - Scrvor enviTonnent user .nane-DbusIqi tei8-e7-3e ee:i8 :52.32? ityi4:ii - ihfo白心:也11 :Env ironimfiiFiiZlOO J - Snirvor onulrcnrwnC :ueorIwfwCtMJoora M?ouolop E818-S7-3C 電6X#:52.32* nyM2ll - IHFO 【則021%1 叫典力”二;1二吊源;同通通:1孤 ti =EnuipainirienEiG6 9z&jr* envixinfite?n( zufi-et-
18、,dii*C= MtaFkaook&ipep-3.4.12Xhin E818-fi7-3i0 66 =4S =52337 Cnid:l - IHFO QuorunF&BiHErayidl J/B = B = Bi=6 =16-6 = 6 = 6 = 121 11 :ZJuJKue perSe rvej!l?3 11 - Created 豚叨小七臚 ullh tic Iki LiihiiliiSeL:- lanT liie uul 4WIOS njahftt .12、 啟動(dòng) kafka運(yùn)行 cm cd C:kafkakafka_2.11-1.1.1 目錄,再執(zhí)行命令:【cd C:kafkakaf
19、ka_2.11-1.1.1C:kafkakafka_2.11-11lNbinwindowskafka-server-st art.bat .configperties亞二Wfnduw5sxytennm/Fmd.ETt| u | ljJ11-icroanFt Windows rj5p b7l .7Rffll版權(quán)所言=32BtW Hicrnsnft CurpaHfc uiim 保留圻有權(quán)利C= MJao3rDXlH)va lcpcd C - gfkaknf Ka_?1.1I v %k*f kfl % F kei J2 -11 -1 _ 1 _1 5 _ Xb iji Su mdous Xk*f k
20、ji-aSFerur-f-star-t - that . Xcnnf igsrrwrtrP, popert 1 9 .QI C: ndcjrS yrteiTi32ucmd.t.Fd Ln4t r ZeiB-B7-30 37sfM:K.m INFO (GniApGoDrdiMtDr 21: Sttirtup cowlcte. Ckafka.M Dixl liiAl u p r yuup . GiMfuipCuu 爐 illnal 口工 IB-07 30 B7EMiO6n(Mll INFO rCrmiDMeMdofHnnMer 不臚巾網(wǎng)害1。21 眥小u.d fl swlr i?d of f ev
21、tc Ln A nxlliEvc4ndii. 201B-i7-3fl W7:fH:M8:751 TNFO rPpodnnrTrf ItfiFi.g-nr 1: Acquird naw pdncflrld h lock. ( h!lkE r J Eril dl : 2999 Ihy writ ling- t d Zk with path vert ion 3 ka.coordinator.CPansection . FroducorldHaniageL 2H1B 7 3H M7 = &4;/才 44】UNFO Tvwi.i iniiCooHlkirfedlur Ld2 St art inf u.
22、 Vkdfkd B ooHl In stoiC ir ano m C ionT ranii t IaaCo or 2H1H-H7-3H Hr7zBCtK1: infT 2H1B-B7-30 B7stM5M,!S4J INFO tTnuCtnCoordinator id-21 Startup cdnvlete. IZfltlR R7 3fl Hf7;fH;ffl?F腫41 INFO r/nnnf igrZfihnics-want-lprtiifTffsni rlmtd 1 : Stnrt 2618-67v3G 97 = 64 = 87|130 INFO KaFIm wrulun Hl riiAf
23、Miwlioik*fkHiOQimoniiii i Is .flpp I nf nFn r J 2filB-e7-3G 84 = 07.36 INFO KaFka coiwiltld : teQI7427Fri4934fS Gipg ka-coFwian _ut:i.ls _ Appl n JnP-a.rsEir-J LlUi-7-3ld 0?:M4:HTF14i J INFO UUfluX-rv” lill uirlifd Lkdf k也六我”. Kaikd uatiO1.6服務(wù)狀態(tài)測(cè)試倉(cāng)1J建 Topics打開(kāi) cmd 進(jìn)入 C:kafkakafka_2.11-111binwindowsC
24、:kafkakafka_2.11-111binwindowsAkafka-topics.bat -create -zookeeper localhost:12181 -replication-factor 1 -partitions 1 -topic test001 QVZi r |C:Mkaf kaMcaFfka J?,11-1 J J Xhin XMlndnwsC: kafkaik*f J lG= lkaf kakaffea_2 ,11-1,1 91 Sj3ini%Afndu ws r;:Mkaf kaxkaf la_Z.ll-l .1.1 hlouinCs Mkaf kakaflia2.
25、11-1.1 .1 indowsCi; Mkaf koiSkaf ha_2 -11 1.1 .1 hirii Xlriduwkark4i-t.uipics .jbat -crecLe -zoukeeper lotIhas t : 121SI -一replleatlinni-f ac tap 1 -paii*l:-lit iDris 1 -topic t estSRL 卜wad topic f|,tant00t*sw、|(wf kf.11=1 .1 1 XDiiniXuin 5M ka Wku_2.”TJ xhih Windovo卜二afkakaFM日 .11-1.1 JNbm wlndQ加cz
26、iSkiiFkgiXkaFJm 二-11-11 stJiriXuind口般打開(kāi)一個(gè) Producer打開(kāi) cmd 進(jìn)入 C:kafkakafka_2.11-111binwindowsC:kafkakafka_2.11-1.1.1binwindowskafka-console-producer.bat -broker-list localhost:19092 -topic test001等待輸入消息內(nèi)容。打開(kāi)一個(gè) Consumer打開(kāi) cmd 進(jìn)入 C:kafkakafka_2.11-111binwindowsC:kafkakafka_2.11-1.1.1binwindowskafka-cons
27、ole-consumer.bat -zookeeper localhost:12181 -topic test001H CVWndwvEEtemSZxnd-eife - kafka-ronEole-corEumer.bst -zoolcE-epe-r dcs hortiL2L5 . 口Window 版本 6,班日口卡守門有 2809 Hieoft CorpnLan ,胃留所有權(quán)劑*L : 1Ax.ie rs XUeve Inp c d 年通特法不正旗0亡 MJ X。ro Mau 口 lopc A丁希法不TF睢。C 3kaf Jm Mat ka J3 Miners Mlewe lojjd二ka5
28、qF】“疑_2.11-1MnNwindowoka-cqh屯olo-c&r口u朝看爐.bat -巖。okoepar loctopic CMtflBiIs any t: he- Ccn5n IcConxumer wit-hi n Id chiukuiiwi1- ns diepic-cat ed and u 11 Js r肅rmued in 0 f ture rwi jor re Lease - Consider uwinq 七M(jìn) con-5-uncr by pass-ing- I boot:3trap-s rvei*J irtgtsad a4 IZBCiki母porJL然后就可以在Producer
29、控制臺(tái)窗口輸入消息了,很快Consumer窗口就會(huì)顯示出Producer發(fā)送的消息。SJ C;VZ n dcwsKsystcm 3 Znt it d cxe - kafka工ern營(yíng) Mbp1rodueJb#t -brokrrist lucalbostlSOSpMt At KJfku?odl by:Itaf kn. producerMe wh in vPkso due er in it XBaaoFroducoi . oca La ? 40 kf kfli. ironlfi .Gfinci 1 RPFnriiiC4p$ - wi nCCritnliffPVofliicgcaIa = 49 )k
30、fka.Guostihap lip 1 s fiueri in hantstrap_sftereAt orq .acidLche .kaf ka c lients .CLlentUtils. VbflrBAndUdlidoitcflddressesCClientUt; i.Ls,. java:&Ei)at arg .apache .kaf ka. c 1 ient . pmduce*.Kaf kaFraducei*_1 binindowakara console producer .b11 LJf irat teat lic Lio wo rid ekf TtTfTfirffTTfrT! IT
31、 MTtffTrTTTTTTfrflTkSI3nondhe%金,-zwkeepr localho輯 121&” * |Hicrejoft Vuid戶號(hào)落醫(yī)不止除C: Misers SDcuEloipcd C = kaf laJsaf ka. J!. 11-1.1.1 MndoMK3命令語(yǔ)法不正確,CJs MJsera Dcvclapcd C:knf lwi_2 -11 1.1.1 Sbin windowisn; k*i.111 .1.1 Xliln xu i nd-ftws1 取內(nèi)10一noni&Hinp*蘆gr -f。&解曉膽 t* 1 nrIhoat :12161 -topic lest0
32、31Lining tko ConsoleCangith .Id. consunsi* i群 d19P甘.Gat4d mnd vlII Jbe Qrwvqd in a Furijpe oajnv elf?ase, Cninstrier using! tlie fir” cfinimpF hy passing rhnntstrap-sE:i*uer 1 insteeid of IzDokee per 1 - nyF irst test., he 1 lo warlri .Ll*-F - BI!IB!Hia!aB 1 Baill 查看所有主題kafka-topics.bat -list -zooke
33、eper localhost:12181查看Topic分區(qū)和副本kafka-topics.bat -describe -zookeeper localhost:121811.7消息大小調(diào)整Kafka對(duì)于10KB大小的消息吞吐率最好,默認(rèn)配置最大支持1MB 的消息大小。對(duì)于大消息的傳輸,需要修改kafka的perties、 consumer、 producer 的相關(guān)配置。perties 修改:打開(kāi)perties(按照最大1GB)message.max.bytes=1073741824replica.fetch.max.bytes=1073741824log.segment.bytes=1073
34、741824consumer 酉己置:max.partition.fetch.bytes=1073741824Producer 酉己置:max.request.size=1073741824#33554432,默認(rèn) 32Mbuffer.memory= 1073741824mon.errors.RecordTooLargeException: The message is 36428062 bytes when serialized whichis larger than the total memory buffer you have configured with the buffer.me
35、mory configuration.附件太大可能會(huì)內(nèi)存溢出,還會(huì)涉及超時(shí)參數(shù)配置等。2 JAVA程序示例Producer程序示例Properties 文件配置#producerbootstrap.servers= XX.XX.0.12:19092,XX.XX.0.13:19092,XX.XX.0.14:19092,XX.XX.0.15:19092,XX.XX.0.16:19092producer.type= syncrequest.required.acks= 1#consumer mit= true八 catch (Exception e) #latest, earliest, none
36、auto.offset.reset= earliest建議公共參數(shù)(如服務(wù)地址)配置在 properties文件里。其他參數(shù)根據(jù)接口需要程序中配置。/ 創(chuàng)建 Producerprivate ProducerInteger, StringcreateProducer() Properties props =new Properties。;String path =ProducerDemo. class .getResource( / ).getFile().toString()+ perties ;try FileInputStreamfis = new FileInputStream( new
37、File(path);props.load(fis);props.put( key.serializer ,mon.serialization.IntegerSerializer );props.put( value.serializer , mon.serialization.StringSerializer);fis.close();e.printStackTrace();return new KafkaProducer(props);Properties 配置詳解0:producer不會(huì)等待 broker 發(fā)送 ack1:當(dāng)leader接收到消息后發(fā)送ackall(-1): 當(dāng)所有的fo
38、llower都同步消息成功后發(fā)送ackrequest.required.acks=0主題 +VALUEimport java.io.File;import java.io.FileInputStream;import java.util.Properties;importducer.KafkaProducer;import ducer.Producer;import ducer.ProducerRecord;catch (Exception e) public class TopicValue / 創(chuàng)建 Producerprivate ProducerString, StringcreateP
39、roducer()Properties props =new Properties。;String path =ProducerDemo. class .getResource( / ).getFile().toStrin g()pertiestry FileInputStream fis =newFileInputStream( new File(path);props.load(fis);props.put(key.serializermon.serialization.StringSerializ er);props.put( value.serializermon.serializat
40、ion.StringSerializerfis.close();e.printStackTrace(); return new KafkaProducer(props);public static void main(String口 args) /消息主題String topicName= testOOI;TopicValue topicValueProducer= new TopicValue();ProducerString, Stringproducer = topicValueProducer.createProducer();producer.send( new ProducerRe
41、cord(topicName, 消息:TopicValue );producer.flush();producer.close();System. out .println( Message send successfully );主題+KEY+VALUEpackage ducer;import java.io.File;import java.io.FileInputStream;import java.util.Properties;importducer.KafkaProducer;import ducer.Producer;importducer.ProducerRecord;publ
42、ic class TopicIntegerString / 創(chuàng)建 Producerprivate Producer createProducer()Properties props = new Properties。;String path =ProducerDemo.class.getResource(/).getFile().toStrin g()八八+ perties;try FileInputStream fis = new FileInputStream(new File(path);props.load(fis);props.put(key.serializer,mon.seria
43、lization.IntegerSeriali zer);props.put(value.serializer,mon .serialization.StringSerializer);fis.close(); catch (Exception e) e.printStackTrace();return new KafkaProducer(props);public static void main(String口 args) /消息主題String topicName=test001”;TopicIntegerString topicValueProducer=newTopicInteger
44、String();ProducerInteger, Stringproducer =topicValueProducer.createProducer();producer.send(new ProducerRecord(topicName, 1, 消息:TopicIntegerStringl);producer.flush();producer.close();System.out.println(Message send successfully);import java.io.File;import java.io.FileInputStream;import java.util.Pro
45、perties;importducer.KafkaProducer;import ducer.Producer;importducer.ProducerRecord;public class TopicStringString / 創(chuàng)建 Producerprivate ProducerString, StringcreateProducer() Properties props = new Properties。;String path =ProducerDemo.class.getResource(/).getFile().toStrin g()+ perties;try FileInput
46、Stream fis = newFileInputStream(new File(path);props.load(fis);props.put(key.serializer,mon.serialization.StringSerializ er);props.put(value.serializer,mon .serialization.StringSerializer);fis.close(); catch (Exception e) e.printStackTrace(); return new KafkaProducer(props);public static void main(S
47、tring口 args) /消息主題String topicName=test001”;TopicStringString topicValueProducer=new TopicStringString();ProducerString, Stringproducer = topicValueProducer.createProducer();producer.send(new ProducerRecord(topicName, TopicStringString001,消息:TopicStringString001);producer.flush();producer.close();Sy
48、stem.out.println(Message send successfully);package ducer;import java.io.File;import java.io.FileInputStream;import java.util.Properties;importducer.KafkaProducer;import ducer.Producer;importducer.ProducerRecord;public class TopicStringByte / 創(chuàng)建 Producerprivate Producer createProducer() Properties p
49、rops = new Properties。;String path =ProducerDemo.class.getResource(/).getFile().toStrin g()八八+ perties;try FileInputStream fis = new FileInputStream(new File(path);props.load(fis);props.put(key.serializer,mon.serialization.StringSerializ er);props.put(value.serializer,mon .serialization.ByteArraySer
50、ializer);fis.close(); catch (Exception e) e.printStackTrace();return new KafkaProducer(props);public static void main(String口 args) /消息主題String topicName=test001”;TopicStringByte topicValueProducer=new TopicStringByte();Producer producer = topicValueProducer.createProducer();producer.send(new Produc
51、erRecord(topicName, TopicStringByte001,消息:TopicStringByte001”.getBytes();producer.flush();producer.close();System.out.println(Message send successfully); package ducer;import java.io.File;import java.io.FileInputStream;import java.util.Properties;import ducer.KafkaProducer; import ducer.Producer;imp
52、ortducer.ProducerRecord;public class TopicByteByte / 創(chuàng)建 Producerprivate Producer createProducer() Properties props = new Properties。;String path =ProducerDemo.class.getResource(/).getFile().toStrin g()+ perties;try FileInputStream fis = new FileInputStream(new File(path);props.load(fis);props.put(ke
53、y.serializer,mon.serialization.ByteArraySeria lizer);props.put(value.serializer,mon.serialization.ByteArraySerializer);fis.close(); catch (Exception e) e.printStackTrace();return new KafkaProducer(props);public static void main(String口 args) /消息主題String topicName=test001”;TopicByteByte topicValuePro
54、ducer=newTopicByteByte();Producer producer = topicValueProducer.createProducer();producer.send(new ProducerRecord(topicName, TopicByteByte001”.getBytes(),消息:TopicByteByte001”.getBytes();producer.flush();producer.close();System.out.println(Message send successfully);發(fā)送文件消息/*/package ducer;import java
55、.io.BufferedOutputStream;import java.io.ByteArrayOutputStream;import java.io.File;import java.io.FileInputStream;import java.io.FileOutputStream;import ducer.Producer;import ducer.ProducerRecord;/* * author Develop八發(fā)送消息類*/public class SendMsgFileTest /根據(jù)文件名獲取字節(jié)數(shù)組public static byte口 getFileBytes(Stri
56、ng fileName) byte口 buffer=null;FileInputStream fis =null;ByteArrayOutputStream bos=null;try File file = new File(fileName);fis = new FileInputStream(file);long fileSize=file.length();if(fileSizeInteger.MAX_VALUE)System.out.println(文件太大,無(wú)法處理!);return null;bos=new ByteArrayOutputStream(int)fileSize);b
57、yte b=new byte1024;int len=0;while(len=fis.read(b,0,1024)!=-1)bos.write(b, 0, len);buffer=bos.toByteArray(); catch (Exception ex) ex.printStackTrace(); finally if (bos != null)try bos.close(); catch (Exception ex) ex.printStackTrace();if (fis != null)try fis.close(); catch (Exception ex) ex.printSta
58、ckTrace(); return buffer;public static void main(String口 args) try /消息主題String topicName = testOOI;TopicStringByte topicValueProducer = new TopicStringByte();Producer producer =topicValueProducer.createProducer();System.out.println(開(kāi)始發(fā)送消息!);int count = 0;while (count 1) count+;StringfilePath=C:/kafk
59、a/workspace/linecount3.7.rar;Stringkey=count+_+filePath.substring(filePath.lastIndexOf (/)+1);byte口 buffer=getFileBytes(filePath);ProducerRecord pr = newProducerRecord( topicName, key,buffer);producer.send(pr);Thread.sleep(100);producer.flush();producer.close();System.out.println( 消息發(fā)送完成!); catch (E
60、xception ex) ex.printStackTrace();回調(diào)函數(shù)package ducer;import java.io.File;import java.io.FileInputStream;import java.util.Properties;import ducer.Callback;八八importducer.KafkaProducer;import ducer.Producer;importducer.ProducerRecord;importducer.RecordMetadata;public class CallbackProducer / 創(chuàng)建 Producer
溫馨提示
- 1. 本站所有資源如無(wú)特殊說(shuō)明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請(qǐng)下載最新的WinRAR軟件解壓。
- 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請(qǐng)聯(lián)系上傳者。文件的所有權(quán)益歸上傳用戶所有。
- 3. 本站RAR壓縮包中若帶圖紙,網(wǎng)頁(yè)內(nèi)容里面會(huì)有圖紙預(yù)覽,若沒(méi)有圖紙預(yù)覽就沒(méi)有圖紙。
- 4. 未經(jīng)權(quán)益所有人同意不得將文件中的內(nèi)容挪作商業(yè)或盈利用途。
- 5. 人人文庫(kù)網(wǎng)僅提供信息存儲(chǔ)空間,僅對(duì)用戶上傳內(nèi)容的表現(xiàn)方式做保護(hù)處理,對(duì)用戶上傳分享的文檔內(nèi)容本身不做任何修改或編輯,并不能對(duì)任何下載內(nèi)容負(fù)責(zé)。
- 6. 下載文件中如有侵權(quán)或不適當(dāng)內(nèi)容,請(qǐng)與我們聯(lián)系,我們立即糾正。
- 7. 本站不保證下載資源的準(zhǔn)確性、安全性和完整性, 同時(shí)也不承擔(dān)用戶因使用這些下載資源對(duì)自己和他人造成任何形式的傷害或損失。
最新文檔
- 二零二五年度財(cái)務(wù)信息系統(tǒng)集成合同3篇
- 2024石子深加工技術(shù)研發(fā)與應(yīng)用合同3篇
- 2024玩具樂(lè)園設(shè)備采購(gòu)及租賃服務(wù)合同3篇
- 2024版影視作品版權(quán)轉(zhuǎn)讓與授權(quán)播放合同
- 2025年松樹(shù)造林項(xiàng)目采購(gòu)合同3篇
- 二零二五版船舶光租及船舶安全管理體系合同3篇
- 二零二五年度安置房項(xiàng)目公共設(shè)施維護(hù)合同3篇
- 2025年度淋浴房綠色環(huán)保材料采購(gòu)與安裝服務(wù)合同4篇
- 2025年度鋁材貿(mào)易結(jié)算與風(fēng)險(xiǎn)管理合同4篇
- 二零二五年度跨境電商進(jìn)口采購(gòu)合同3篇
- 領(lǐng)導(dǎo)溝通的藝術(shù)
- 發(fā)生用藥錯(cuò)誤應(yīng)急預(yù)案
- 南潯至臨安公路(南潯至練市段)公路工程環(huán)境影響報(bào)告
- 綠色貸款培訓(xùn)課件
- 大學(xué)生預(yù)征對(duì)象登記表(樣表)
- 主管部門審核意見(jiàn)三篇
- 初中數(shù)學(xué)校本教材(完整版)
- 父母教育方式對(duì)幼兒社會(huì)性發(fā)展影響的研究
- 新課標(biāo)人教版數(shù)學(xué)三年級(jí)上冊(cè)第八單元《分?jǐn)?shù)的初步認(rèn)識(shí)》教材解讀
- (人教版2019)數(shù)學(xué)必修第一冊(cè) 第三章 函數(shù)的概念與性質(zhì) 復(fù)習(xí)課件
- 重慶市銅梁區(qū)2024屆數(shù)學(xué)八上期末檢測(cè)試題含解析
評(píng)論
0/150
提交評(píng)論