《通信數(shù)據(jù)分析與實(shí)戰(zhàn)》課件-第六章 Kafka 分布式發(fā)布訂閱消息系統(tǒng)_第1頁
《通信數(shù)據(jù)分析與實(shí)戰(zhàn)》課件-第六章 Kafka 分布式發(fā)布訂閱消息系統(tǒng)_第2頁
《通信數(shù)據(jù)分析與實(shí)戰(zhàn)》課件-第六章 Kafka 分布式發(fā)布訂閱消息系統(tǒng)_第3頁
《通信數(shù)據(jù)分析與實(shí)戰(zhàn)》課件-第六章 Kafka 分布式發(fā)布訂閱消息系統(tǒng)_第4頁
《通信數(shù)據(jù)分析與實(shí)戰(zhàn)》課件-第六章 Kafka 分布式發(fā)布訂閱消息系統(tǒng)_第5頁
已閱讀5頁,還剩56頁未讀, 繼續(xù)免費(fèi)閱讀

下載本文檔

版權(quán)說明:本文檔由用戶提供并上傳,收益歸屬內(nèi)容提供方,若內(nèi)容存在侵權(quán),請進(jìn)行舉報(bào)或認(rèn)領(lǐng)

文檔簡介

通信數(shù)據(jù)分析與實(shí)戰(zhàn)Kafka分布式發(fā)布訂閱消息系統(tǒng)第六章第1節(jié)2知道消息傳遞模式熟悉Kafka的核心組件學(xué)習(xí)目標(biāo)TARGET消息傳遞模式簡介一個消息系統(tǒng)負(fù)責(zé)將數(shù)據(jù)從一個應(yīng)用程序傳遞到另外一個應(yīng)用程序中,應(yīng)用程序只關(guān)注數(shù)據(jù),無需關(guān)注數(shù)據(jù)在多個應(yīng)用之間是如何傳遞的,分布式消息傳遞基于可靠的消息隊(duì)列,在客戶端應(yīng)用和消息系統(tǒng)之間異步傳遞消息。消息系統(tǒng)有兩種主要消息傳遞模式,分別是點(diǎn)對點(diǎn)消息傳遞模式和發(fā)布訂閱消息傳遞模式。消息傳遞模式簡介1.倒排索引介紹1.倒排索引介紹1.點(diǎn)對點(diǎn)消息傳遞模式點(diǎn)對點(diǎn)消息傳遞模式結(jié)構(gòu)中,消息是通過一個虛擬通道進(jìn)行傳輸?shù)模a(chǎn)者發(fā)送一條數(shù)據(jù),消息將持久化到一個隊(duì)列中,此時將有一個或多個消費(fèi)者會消費(fèi)隊(duì)列中數(shù)據(jù),但是一條消息只能被消費(fèi)一次,且消費(fèi)后的消息會從消息隊(duì)列中刪除,因此,即使有多個消費(fèi)者同時消費(fèi)數(shù)據(jù),數(shù)據(jù)都可以被有序處理。消息傳遞模式簡介1.倒排索引介紹2.發(fā)布訂閱消息傳遞模式在發(fā)布訂閱模式中,發(fā)布者用于發(fā)布消息,訂閱者用于訂閱消息,發(fā)布訂閱模式可以有多種不同的訂閱者,發(fā)布者發(fā)布的消息會被持久化到一個主題中,這與點(diǎn)對點(diǎn)模式不同的是,訂閱者可訂閱一個或多個主題,訂閱者可讀取該主題中所有數(shù)據(jù),同一條數(shù)據(jù)可被多個訂閱者消費(fèi),數(shù)據(jù)被消費(fèi)后也不會立即刪除。Kafka的概述Kafka是由Apache軟件基金會開發(fā)的一個開源流處理平臺,它由Scala和Java語言編寫,是一個基于Zookeeper系統(tǒng)的分布式發(fā)布訂閱消息系統(tǒng),該項(xiàng)目的設(shè)計(jì)初衷是為實(shí)時數(shù)據(jù)提供一個統(tǒng)一、高通量、低等待的消息傳遞平臺。Kafka的概述應(yīng)用程序A應(yīng)用程序B高度解耦高吞吐低延遲擴(kuò)展性持久性容錯性多語言Kafka的核心組件組件名稱相關(guān)說明Topic特定類別消息流稱為主題,數(shù)據(jù)存在主題中,主題被拆分成分區(qū)Partition主題的數(shù)據(jù)分割為一個或多個分區(qū),每個分區(qū)的數(shù)據(jù)使用多個segment文件存儲,分區(qū)中的數(shù)據(jù)是有序的Offset每個分區(qū)消息具有的唯一序列標(biāo)識Replica副本只是一個分區(qū)的備份,它們用于防止數(shù)據(jù)丟失Producer生產(chǎn)者即數(shù)據(jù)發(fā)布者,該角色將消息發(fā)布到Kafka集群主題中Kafka的核心組件組件名稱相關(guān)說明Consumer消費(fèi)者可從Broker中讀取數(shù)據(jù),可消費(fèi)多個主題數(shù)據(jù)Broker每個Kafka服務(wù)節(jié)點(diǎn)都為Broker,Broker接收消息后,將消息追加到segment文件中Leader負(fù)責(zé)分區(qū)的所有讀寫操作Follower跟隨領(lǐng)導(dǎo)指令,若Leader發(fā)生故障則選一個Follower為新LeaderConsumerGroup實(shí)現(xiàn)一個主題消息的廣播和單播的手段Kafka的核心組件生產(chǎn)者主題分區(qū)一分區(qū)二分區(qū)三offsetoffsetoffset服務(wù)器節(jié)點(diǎn)備份消費(fèi)組消費(fèi)者一消費(fèi)者二消費(fèi)者三11小結(jié)知道消息傳遞模式熟悉Kafka的核心組件通信數(shù)據(jù)分析與實(shí)戰(zhàn)Kafka分布式發(fā)布訂閱消息系統(tǒng)第六章第2節(jié)13熟悉Kafka的工作原理學(xué)習(xí)目標(biāo)TARGETKafka工作原理生產(chǎn)者向Kafka集群中生產(chǎn)消息。Producer是消息的生產(chǎn)者,通常情況下,數(shù)據(jù)消息源可是服務(wù)器日志、業(yè)務(wù)數(shù)據(jù)及Web服務(wù)數(shù)據(jù)等,生產(chǎn)者采用推送的方式將數(shù)據(jù)消息發(fā)布到Kafka的主題中,主題本質(zhì)就是一個目錄,而主題是由PartitionLogs(分區(qū)日志)組成,每條消息都被追加到分區(qū)中。1.生產(chǎn)者生產(chǎn)消息過程Kafka工作原理1.生產(chǎn)者生產(chǎn)消息過程1Producer先讀取Zookeeper的“/brokers/.../state”節(jié)點(diǎn)中找到該P(yáng)artition的Leader。2Producer將消息發(fā)送給Leader。3Leader負(fù)責(zé)將消息寫入本地分區(qū)Log文件中。Kafka工作原理1.生產(chǎn)者生產(chǎn)消息過程4Follower從Leader中讀取消息,完成備份操作。5Follower寫入本地Log文件后,會向Leader發(fā)送Ack,每次發(fā)送消息都會有一個確認(rèn)反饋機(jī)制,以確保消息正常送達(dá)。6Leader收到所有Follower發(fā)送的Ack后,向Producer發(fā)送Ack,生產(chǎn)消息完成。Kafka工作原理1.生產(chǎn)者生產(chǎn)消息過程Kafka工作原理2.消費(fèi)者消費(fèi)消息過程Kafka采用拉取模型,由消費(fèi)者記錄消費(fèi)狀態(tài),根據(jù)主題、Zookeeper集群地址和要消費(fèi)消息的偏移量,每個消費(fèi)者互相獨(dú)立地按順序讀取每個分區(qū)的消息,消費(fèi)者消費(fèi)消息的流程圖如下所示。19小結(jié)熟悉Kafka的工作原理通信數(shù)據(jù)分析與實(shí)戰(zhàn)Kafka分布式發(fā)布訂閱消息系統(tǒng)第六章第3節(jié)21掌握Kafka的安裝和啟動掌握Kafka基于命令行的使用學(xué)習(xí)目標(biāo)TARGETKafka集群部署與測試1.安裝Kafka1下載Kafka安裝包,并解壓至hadoop01節(jié)點(diǎn)中的/export/software目錄下。2修改配置文件。在perties配置文件中指定broker編號、Kafka運(yùn)行日志存放的路徑、指定Zookeeper地址和本地IP。3添加環(huán)境變量。在/etc/profile文件中添加Kafka環(huán)境變量。4分發(fā)文件。將Kafka安裝目錄kafka_2.11-2.0.0及環(huán)境配置文件profile分發(fā)至hadoop02、hadoop03上,并修改broker.id和。Kafka集群部署與測試1.安裝Kafka1下載Kafka安裝包,并解壓至hadoop01節(jié)點(diǎn)中的/export/software目錄下。#1.切換到軟件包存放目錄cd/export/software#2.將kafka_2.11-2.0.0.tgz上傳到指定位置rz#3.解壓到指定目錄/export/servers/tar-zxvfkafka_2.11-2.0.0.tgz/export/servers/Kafka集群部署與測試1.安裝Kafka#broker的全局唯一編號,不能重復(fù)broker.id=0#kafka運(yùn)行日志存放的路徑log.dirs=/export/data/kafka/#broker需要使用zookeeper保存meta數(shù)據(jù)zookeeper.connect=hadoop01:2181,hadoop02:2181,hadoop03:2181#刪除topicdelete.topic.enable=true#設(shè)置本機(jī)IP=hadoop01Kafka集群部署與測試1.安裝Kafkavi/etc/profile

exportKAFKA_HOME=/export/servers/kafka_2.11-2.0.0exportPATH=$PATH:$KAFKA_HOME/binKafka集群部署與測試1.安裝Kafkacd/export/serversscp-rkafka_2.11-2.0.0hadoop02:/export/servers/scp-rkafka_2.11-2.0.0hadoop02:/export/servers/scp/etc/profilehadoop02:/etc/profilescp/etc/profilehadoop03:/etc/profile#分別在hadoop010203上面激活profileSourceprofileKafka集群部署與測試1.安裝Kafka

#hadoop02

broker.id=1=hadoop02

#hadoop03

broker.id=2=hadoop03cd/export/servers/kafka_2.11-2.0.0/confvipertiesKafka集群部署與測試啟動Zookeeper服務(wù)Kafka集群部署與測試啟動Kafka服務(wù)cd/export/servers/kafka_2.11-2.0.0bin/kafka-server-start.shconfig/perties基于命令行方式使用Kafka1.創(chuàng)建主題$kafka-topics.sh--create\--topicittopic\--partitions3\--replication-factor2\--zookeeperhadoop01:2181,hadoop02:2181,hadoop03:2181基于命令行方式使用Kafka$kafka-console-producer.sh\--broker-listhadoop01:9092,hadoop02:9092,hadoop03:9092\--topicittopic--hellokafka2.向主題中發(fā)送消息數(shù)據(jù)基于命令行方式使用Kafka3.消費(fèi)主題中的消息$kafka-console-consumer.sh\--from-beginning--topicittopic\--bootstrap-serverhadoop01:9092,hadoop02:9092,hadoop03:909233小結(jié)掌握Kafka的安裝和啟動掌握Kafka基于命令行的使用通信數(shù)據(jù)分析與實(shí)戰(zhàn)Kafka分布式發(fā)布訂閱消息系統(tǒng)第六章第4節(jié)35掌握Kafka的生產(chǎn)者實(shí)例掌握Kafka的消費(fèi)者實(shí)例學(xué)習(xí)目標(biāo)TARGETKafka生產(chǎn)者消費(fèi)者實(shí)例1.基于JavaAPI方式使用Kafka

用戶不僅能夠通過命令行的形式操作Kafka服務(wù),Kafka還提供了許多編程語言的客戶端工具,用戶在開發(fā)獨(dú)立項(xiàng)目時,通過調(diào)用KafkaAPI來操作Kafka集群,其核心API主要有5種,分別是ProducerAPI、ConsumerAPI、StreamsAPI、ConnectAPI、AdminClientAPI。Kafka生產(chǎn)者消費(fèi)者實(shí)例KafkaProducer常用API方法名稱相關(guān)說明abortTransaction?()終止正在進(jìn)行的事物close?()關(guān)閉這個生產(chǎn)者flush?()調(diào)用此方法使所有緩沖的記錄立即發(fā)送partitionsFor?(java.lang.Stringtopic)獲取給定主題的分區(qū)元數(shù)據(jù)send?(ProducerRecord<K,V>record)異步發(fā)送記錄到主題Kafka生產(chǎn)者消費(fèi)者實(shí)例KafkaConsumer常用API方法名稱相關(guān)說明subscribe?(java.util.Collection<java.lang.String>topics)訂閱給定主題列表以獲取動態(tài)分區(qū)close?()關(guān)閉這個消費(fèi)者wakeup?()喚醒消費(fèi)者metrics?()獲取消費(fèi)者保留的指標(biāo)listTopics?()獲取有關(guān)用戶有權(quán)查看的所有主題的分區(qū)的元數(shù)據(jù)Kafka生產(chǎn)者消費(fèi)者實(shí)例操作1創(chuàng)建一個名為“spark_chapter06”的Maven工程,在pom.xml文件中添加Kafka依賴。2創(chuàng)建KafkaProducerTest文件用于生產(chǎn)消息數(shù)據(jù)并將數(shù)據(jù)發(fā)送到Kafka集群。3通過KafkaAPI創(chuàng)建KafkaConsumerTest對象,用于消費(fèi)Kafka集群中名為“ittopic”主題的消息數(shù)據(jù)。Kafka生產(chǎn)者消費(fèi)者實(shí)例操作消費(fèi)者消費(fèi)消息效果圖Kafka生產(chǎn)者消費(fèi)者實(shí)例操作1創(chuàng)建一個名為“spark_chapter06”的Maven工程,在pom.xml文件中添加Kafka依賴。<dependencies><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>2.0.0</version></dependency></dependencies>Kafka生產(chǎn)者消費(fèi)者實(shí)例操作2創(chuàng)建KafkaProducerTest文件用于生產(chǎn)消息數(shù)據(jù)并將數(shù)據(jù)發(fā)送到Kafka集群。publicclassKafkaProducerTest{publicstaticvoidmain(String[]args){Propertiesprops=newProperties();//1、指定Kafka集群的IP地址和端口號props.put("bootstrap.servers","hadoop01:9092,hadoop02:9092,hadoop03:9092");

//2、指定等待所有副本節(jié)點(diǎn)的應(yīng)答props.put("acks","all");

//3、指定消息發(fā)送最大嘗試次數(shù)props.put("retries",0);

//4、指定一批消息處理大小props.put("batch.size",16384);

//5、指定請求延時props.put("linger.ms",1);Kafka生產(chǎn)者消費(fèi)者實(shí)例操作2創(chuàng)建KafkaProducerTest文件用于生產(chǎn)消息數(shù)據(jù)并將數(shù)據(jù)發(fā)送到Kafka集群。

//6、指定緩存區(qū)內(nèi)存大小

props.put("buffer.memory",33554432);

//7、設(shè)置key序列化

props.put("key.serializer","mon.serialization.StringSerializer");

//8、設(shè)置value序列化

props.put("value.serializer","mon.serialization.StringSerializer");

//9、生產(chǎn)數(shù)據(jù)

KafkaProducer<String,String>producer=newKafkaProducer<String,String>(props);for(inti=0;i<50;i++){producer.send(newProducerRecord<String,String>("ittopic",Integer.toString(i),"helloworld-"+i));}producer.close();Kafka生產(chǎn)者消費(fèi)者實(shí)例操作3通過KafkaAPI創(chuàng)建KafkaConsumer對象,用于消費(fèi)Kafka集群中名為“ittopic”主題的消息數(shù)據(jù)。publicclassKafkaConsumerTest{publicstaticvoidmain(String[]args){

//1.準(zhǔn)備配置文件Propertiesprops=newProperties();

//2.指定Kafka集群主機(jī)名和端口號props.put("bootstrap.servers","hadoop01:9092,hadoop02:9092,hadoop03:9092");

//3.指定消費(fèi)者組ID,在同一時刻同一消費(fèi)組中只有//一個線程可以去消費(fèi)一個分區(qū)消息,不同的消費(fèi)組可以去消費(fèi)同一個分區(qū)的消息。props.put("group.id","ittopic");

//4.自動提交偏移量props.put("mit","true");Kafka生產(chǎn)者消費(fèi)者實(shí)例操作3通過KafkaAPI創(chuàng)建KafkaConsumer對象,用于消費(fèi)Kafka集群中名為“ittopic”主題的消息數(shù)據(jù)。//5.自動提交時間間隔,每秒提交一次

props.put("erval.ms","1000");props.put("key.deserializer","mon.serialization.StringDeserializer");props.put("value.deserializer","mon.serialization.StringDeserializer");KafkaConsumer<String,String>kafkaConsumer=newKafkaConsumer<String,String>(props);

//6.訂閱消息,這里的topic可以是多個

kafkaConsumer.subscribe(Arrays.asList("ittopic"));Kafka生產(chǎn)者消費(fèi)者實(shí)例操作3通過KafkaAPI創(chuàng)建KafkaConsumer對象,用于消費(fèi)Kafka集群中名為“ittopic”主題的消息數(shù)據(jù)。//7.獲取消息

while(true){//每隔100ms就拉去一次

ConsumerRecords<String,String>records=kafkaConsumer.poll(100);for(ConsumerRecord<String,String>record:records){System.out.printf("topic=%s,offset=%d,key=%s,value=%s%n",record.topic(),record.offset(),record.key(),record.value());}}47小結(jié)掌握Kafka的生產(chǎn)者實(shí)例掌握Kafka的消費(fèi)者實(shí)例通信數(shù)據(jù)分析與實(shí)戰(zhàn)Kafka分布式發(fā)布訂閱消息系統(tǒng)第六章第5節(jié)49熟悉KafkaStreams的作用掌握KafkaStreams的案例學(xué)習(xí)目標(biāo)TARGETKafka

Streams概述

KafkaStreams是ApacheKafka開源的一個流處理框架,基于Kafka的生產(chǎn)者和消費(fèi)者,為開發(fā)者提供流式處理能力,具有低延遲性、高擴(kuò)展性、彈性、容錯的特點(diǎn),易于集成到現(xiàn)有應(yīng)用程序中。它是一套處理分析Kafka中存儲數(shù)據(jù)的客戶端類庫,處理完的數(shù)據(jù)可重新寫回Kafka,也可發(fā)送給外部存儲系統(tǒng)。Kafka

Streams概述在流式計(jì)算框架模型中,通常需要構(gòu)建數(shù)據(jù)流的拓?fù)浣Y(jié)構(gòu),例如生產(chǎn)數(shù)據(jù)源、分析數(shù)據(jù)的處理器及處理完后發(fā)送的目標(biāo)節(jié)點(diǎn),Kafka流處理框架同樣將“輸入主題自定義處理器輸出主題”抽象成一個DAG拓?fù)鋱D。生產(chǎn)者作為數(shù)據(jù)源不斷生產(chǎn)和發(fā)送消息至Kafka的testStreams1主題中,通過自定義處理器對每條消息執(zhí)行相應(yīng)計(jì)算邏輯,最后將結(jié)果發(fā)送到Kafka的testStreams2主題中供消費(fèi)者消費(fèi)消息數(shù)據(jù)。Kafka

Streams案例1在spark_chapter06項(xiàng)目中,打開pom.xml文件,添加KafkaStreams依賴。2創(chuàng)建LogProcessor類,并繼承StreamsAPI中的Processor接口,實(shí)現(xiàn)單詞計(jì)數(shù)業(yè)務(wù)邏輯。3單詞計(jì)數(shù)的業(yè)務(wù)功能開發(fā)完成后,KafkaStreams需要編寫一個運(yùn)行主程序的類App,用來測試LogProcessor業(yè)務(wù)程序。Kafka

Streams案例4在hadoop01節(jié)點(diǎn)創(chuàng)建testStreams1和testStreams2主題。5分別在hadoop01和hadoop02節(jié)點(diǎn)啟動生產(chǎn)者服務(wù)和消費(fèi)者服務(wù)。6運(yùn)行App主程序類。在生產(chǎn)者服務(wù)節(jié)點(diǎn)(hadoop01)中輸入測試語句,返回消費(fèi)者服務(wù)節(jié)點(diǎn)(hadoop02)中查看執(zhí)行結(jié)果。Kafka

Streams案例<dependency>?

<groupId>org.apache.kafka</groupId>?

<artifactId>kafka-streams</artifactId>?

<version>2.0.0</version></dependency>Kafka

Streams案例publicclassLogProcessorimplementsProcessor<byte[],byte[]>{

//上下文對象

privateProcessorContextprocessorContext;

@Override

publicvoidinit(ProcessorContextprocessorContext){//初始化方法

this.processorContext=processorContext;}

@Override

publicvoidprocess(byte[]key,byte[]value){

//處理一條消息

StringinputOri=newString(value);

HashMap<String,Integer>map=newHashMap<String,Integer>();

inttimes=1;

if(inputOri.contains("")){

//截取字段

String[]words=inputOri.split("");

for(Stringword:words){

if(map.containsKey(word)){

map.put(word,map.get(word)+1);

}else{

map.put(word,times);}}}

inputOri=map.toString();

processorContext.forward(key,inputOri.getBytes());}

@Override

publicvoidclose(){}Kafka

Streams案例publicclassApp{

publicstaticvoidmain(String[]args){

StringfromTopic="testStreams1";

//聲明來源主題

StringtoTopic="testStreams2";

//聲明目標(biāo)主題

Propertiesprops=newProperties();

//設(shè)置參數(shù)

props.put(StreamsConfig.APPLICATION_ID_CONFIG,"logProcessor");

props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop01:9092,hadoop02:9092,hadoop03:9092");

//實(shí)例化StreamsConfig

StreamsConfigconfig=newStreamsConfig(props);

//構(gòu)建拓?fù)浣Y(jié)構(gòu)

Topologytopology=newTopology();

//添加源處理節(jié)點(diǎn),為源處理節(jié)點(diǎn)指定名稱和它訂閱的主題

topology.addSource("SOURCE",fromTopic)

//添加自定義處理節(jié)點(diǎn),指定名稱,處理器類和上一個節(jié)點(diǎn)的名稱

.addProcessor("PROCESSOR",newProcessorSupplier(

溫馨提示

  • 1. 本站所有資源如無特殊說明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請下載最新的WinRAR軟件解壓。
  • 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請聯(lián)系上傳者。文件的所有權(quán)益歸上傳用戶所有。
  • 3. 本站RAR壓縮包中若帶圖紙,網(wǎng)頁內(nèi)容里面會有圖紙預(yù)覽,若沒有圖紙預(yù)覽就沒有圖紙。
  • 4. 未經(jīng)權(quán)益所有人同意不得將文件中的內(nèi)容挪作商業(yè)或盈利用途。
  • 5. 人人文庫網(wǎng)僅提供信息存儲空間,僅對用戶上傳內(nèi)容的表現(xiàn)方式做保護(hù)處理,對用戶上傳分享的文檔內(nèi)容本身不做任何修改或編輯,并不能對任何下載內(nèi)容負(fù)責(zé)。
  • 6. 下載文件中如有侵權(quán)或不適當(dāng)內(nèi)容,請與我們聯(lián)系,我們立即糾正。
  • 7. 本站不保證下載資源的準(zhǔn)確性、安全性和完整性, 同時也不承擔(dān)用戶因使用這些下載資源對自己和他人造成任何形式的傷害或損失。

最新文檔

評論

0/150

提交評論