SpringBoot如何獲取Kafka的Topic列表_第1頁
SpringBoot如何獲取Kafka的Topic列表_第2頁
SpringBoot如何獲取Kafka的Topic列表_第3頁
SpringBoot如何獲取Kafka的Topic列表_第4頁
SpringBoot如何獲取Kafka的Topic列表_第5頁
已閱讀5頁,還剩1頁未讀 繼續(xù)免費閱讀

下載本文檔

版權說明:本文檔由用戶提供并上傳,收益歸屬內(nèi)容提供方,若內(nèi)容存在侵權,請進行舉報或認領

文檔簡介

第SpringBoot如何獲取Kafka的Topic列表目錄寫在前面命令行模式代碼模式總結

寫在前面

眾所周知,kafka是現(xiàn)代流行的消息隊列,它使用經(jīng)典的消息訂閱發(fā)布模式實現(xiàn)消息的流轉,大部分代碼結合kafka使用都是使用它的生產(chǎn)者和消費者來實現(xiàn)消息的傳遞,那么對于kafka的主題的管理怎么使用代碼實現(xiàn)呢,這是今天要講的主題

命令行模式

kafka要結合zookeeper使用,因為它把元數(shù)據(jù)信息交給了zookeeper管理,其實使用命令行命令很容易就能對topic進行管理,主要使用的命令是kafka-topics.sh

創(chuàng)建主題

kafka-topics.sh--zookeeperlocalhost:2181--create--topicmy-topic--replication-factor3--partitions3

查看主題列表

kafka-topics.sh--zookeeperlocalhost:2181--list

查看主題狀態(tài)

kafka-topics.sh--describe

--zookeeper:2181--topicTestTopic

代碼模式

那么話說回來如何使用代碼實現(xiàn)topic的管理呢,那么現(xiàn)在就來看一下代碼的實現(xiàn)方式,此處使用springboot2框架實現(xiàn)。

首先引進依賴kafka的相關

dependency

groupIdorg.springframework.kafka/groupId

artifactIdspring-kafka/artifactId

/dependency

創(chuàng)建一個測試類進行測試

publicstaticvoidmain(String[]args){

Propertiesproperties=

newProperties();

properties.put("bootstrap.servers","1:9093");

properties.put("key.serializer","mon.serialization.StringSerializer");

properties.put("value.serializer","mon.serialization.StringSerializer");

AdminClientadminClient=AdminClient.create(properties);

ListTopicsResultresult=adminClient.listTopics();

KafkaFutureSetStringnames=s();

try{

names.get().forEach((k)-{

System.out.println(k);

});

}catch(InterruptedException|ExecutionExceptione){

e.printStackTrace();

}

adminClient.close();

}

這里面最主要的就是AdminClient這個類,AdminClient實現(xiàn)了Admin接口,Admin里面定義了許多和kafka配置相關的東西

讓我們依次來看一下

publicabstractclassAdminClientimplementsAdmin{

publicAdminClient(){

}

publicstaticAdminClientcreate(Propertiesprops){

return(AdminClient)Admin.create(props);

}

publicstaticAdminClientcreate(MapString,Objectconf){

return(AdminClient)Admin.create(conf);

}

}

而Admin接口里有以下方法

staticAdmincreate(Propertiesprops){

returnKafkaAdminClient.createInternal(newAdminClientConfig(props,true),(TimeoutProcessorFactory)null);

}

staticAdmincreate(MapString,Objectconf){

returnKafkaAdminClient.createInternal(newAdminClientConfig(conf,true),(TimeoutProcessorFactory)null);

}

defaultvoidclose(){

this.close(9223372036854775807L,TimeUnit.MILLISECONDS);

}

/**@deprecated*/

@Deprecated

defaultvoidclose(longduration,TimeUnitunit){

this.close(Duration.ofMillis(unit.toMillis(duration)));

}

voidclose(Durationvar1);

defaultCreateTopicsResultcreateTopics(CollectionNewTopicnewTopics){

returnthis.createTopics(newTopics,newCreateTopicsOptions());

}

CreateTopicsResultcreateTopics(CollectionNewTopicvar1,CreateTopicsOptionsvar2);

defaultDeleteTopicsResultdeleteTopics(CollectionStringtopics){

returnthis.deleteTopics(topics,newDeleteTopicsOptions());

}

DeleteTopicsResultdeleteTopics(CollectionStringvar1,DeleteTopicsOptionsvar2);

defaultListTopicsResultlistTopics(){

returnthis.listTopics(newListTopicsOptions());

}

ListTopicsResultlistTopics(ListTopicsOptionsvar1);

defaultDescribeTopicsResultdescribeTopics(CollectionStringtopicNames){

returnthis.describeTopics(topicNames,newDescribeTopicsOptions());

}

DescribeTopicsResultdescribeTopics(CollectionStringvar1,DescribeTopicsOptionsvar2);

defaultDescribeClusterResultdescribeCluster(){

returnthis.describeCluster(newDescribeClusterOptions());

}

DescribeClusterResultdescribeCluster(DescribeClusterOptionsvar1);

defaultDescribeAclsResultdescribeAcls(AclBindingFilterfilter){

returnthis.describeAcls(filter,newDescribeAclsOptions());

}

DescribeAclsResultdescribeAcls(AclBindingFiltervar1,DescribeAclsOptionsvar2);

通過名稱我們可以看出,里面有創(chuàng)建Topic,有刪除Topic,有列出所有Topic,有描述Topic

我們通過這些方法可

溫馨提示

  • 1. 本站所有資源如無特殊說明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請下載最新的WinRAR軟件解壓。
  • 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請聯(lián)系上傳者。文件的所有權益歸上傳用戶所有。
  • 3. 本站RAR壓縮包中若帶圖紙,網(wǎng)頁內(nèi)容里面會有圖紙預覽,若沒有圖紙預覽就沒有圖紙。
  • 4. 未經(jīng)權益所有人同意不得將文件中的內(nèi)容挪作商業(yè)或盈利用途。
  • 5. 人人文庫網(wǎng)僅提供信息存儲空間,僅對用戶上傳內(nèi)容的表現(xiàn)方式做保護處理,對用戶上傳分享的文檔內(nèi)容本身不做任何修改或編輯,并不能對任何下載內(nèi)容負責。
  • 6. 下載文件中如有侵權或不適當內(nèi)容,請與我們聯(lián)系,我們立即糾正。
  • 7. 本站不保證下載資源的準確性、安全性和完整性, 同時也不承擔用戶因使用這些下載資源對自己和他人造成任何形式的傷害或損失。

評論

0/150

提交評論