![【移動(dòng)應(yīng)用開(kāi)發(fā)技術(shù)】SpringBoot整合RocketMQ遇到的坑怎么解決_第1頁(yè)](http://file4.renrendoc.com/view/9ab6e1805a4108af88467ef646d9564a/9ab6e1805a4108af88467ef646d9564a1.gif)
![【移動(dòng)應(yīng)用開(kāi)發(fā)技術(shù)】SpringBoot整合RocketMQ遇到的坑怎么解決_第2頁(yè)](http://file4.renrendoc.com/view/9ab6e1805a4108af88467ef646d9564a/9ab6e1805a4108af88467ef646d9564a2.gif)
![【移動(dòng)應(yīng)用開(kāi)發(fā)技術(shù)】SpringBoot整合RocketMQ遇到的坑怎么解決_第3頁(yè)](http://file4.renrendoc.com/view/9ab6e1805a4108af88467ef646d9564a/9ab6e1805a4108af88467ef646d9564a3.gif)
![【移動(dòng)應(yīng)用開(kāi)發(fā)技術(shù)】SpringBoot整合RocketMQ遇到的坑怎么解決_第4頁(yè)](http://file4.renrendoc.com/view/9ab6e1805a4108af88467ef646d9564a/9ab6e1805a4108af88467ef646d9564a4.gif)
![【移動(dòng)應(yīng)用開(kāi)發(fā)技術(shù)】SpringBoot整合RocketMQ遇到的坑怎么解決_第5頁(yè)](http://file4.renrendoc.com/view/9ab6e1805a4108af88467ef646d9564a/9ab6e1805a4108af88467ef646d9564a5.gif)
下載本文檔
版權(quán)說(shuō)明:本文檔由用戶提供并上傳,收益歸屬內(nèi)容提供方,若內(nèi)容存在侵權(quán),請(qǐng)進(jìn)行舉報(bào)或認(rèn)領(lǐng)
文檔簡(jiǎn)介
【移動(dòng)應(yīng)用開(kāi)發(fā)技術(shù)】SpringBoot整合RocketMQ遇到的坑怎么解決
本篇內(nèi)容主要講解“SpringBoot整合RocketMQ遇到的坑怎么解決”,感興趣的朋友不妨來(lái)看看。本文介紹的方法操作簡(jiǎn)單快捷,實(shí)用性強(qiáng)。下面就讓在下來(lái)帶大家學(xué)習(xí)“SpringBoot整合RocketMQ遇到的坑怎么解決”吧!在實(shí)現(xiàn)RocketMQ消費(fèi)時(shí),一般會(huì)用到@RocketMQMessageListener注解定義Group、Topic以及selectorExpression(數(shù)據(jù)過(guò)濾、選擇的規(guī)則)為了能支持動(dòng)態(tài)篩選數(shù)據(jù),一般都會(huì)使用表達(dá)式,然后通過(guò)apollo或者cloudconfig進(jìn)行動(dòng)態(tài)切換。
<!--
RocketMq
Spring
Boot
Starter-->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.0.4</version>
</dependency>@RocketMQMessageListener(consumerGroup
=
"${rocketmq.group}",topic
="${rocketmq.topic}",selectorExpression
=
"${rocketmq.selectorExpression}")
public
class
Consumer
implements
RocketMQListener<String>
{
@Override
public
void
onMessage(String
s)
{
System.out.println("消費(fèi)到的數(shù)據(jù)為:"+s);
}
}RocketMQMessageListener整個(gè)注解默認(rèn)selectorExpression為*,表示接收當(dāng)前Topic下的所有數(shù)據(jù),如果我們想對(duì)tags進(jìn)行動(dòng)態(tài)配置,在使用${rocketmq.selectorExpression}表達(dá)式時(shí)會(huì)發(fā)現(xiàn)所有數(shù)據(jù)全被過(guò)濾了,跟蹤源碼(ListenerContainerConfiguration.java)發(fā)現(xiàn)在創(chuàng)建listener時(shí)selectorExpression的數(shù)據(jù)在通environment環(huán)境變量中獲取對(duì)應(yīng)的數(shù)據(jù)后又被覆蓋了,導(dǎo)致整個(gè)過(guò)濾條件被變更為表達(dá)式。@Override
public
void
afterSingletonsInstantiated()
{
//
獲取所有所有使用了RocketMQMessageListener注解的bean
Map<String,
Object>
beans
=
this.applicationContext.getBeansWithAnnotation(RocketMQMessageListener.class);
if
(Objects.nonNull(beans))
{
//
循環(huán)注冊(cè)容器
beans.forEach(this::registerContainer);
}
}
private
void
registerContainer(String
beanName,
Object
bean)
{
Class<?>
clazz
=
AopProxyUtils.ultimateTargetClass(bean);
//
校驗(yàn)當(dāng)前bean是否實(shí)現(xiàn)了RocketMQListener接口
if
(!RocketMQListener.class.isAssignableFrom(bean.getClass()))
{
throw
new
IllegalStateException(clazz
+
"
is
not
instance
of
"
+
RocketMQListener.class.getName());
}
//
獲取bean上的annotation
RocketMQMessageListener
annotation
=
clazz.getAnnotation(RocketMQMessageListener.class);
//
解析group及topic,可支持表達(dá)式
String
consumerGroup
=
this.environment.resolvePlaceholders(annotation.consumerGroup());
String
topic
=
this.environment.resolvePlaceholders(annotation.topic());
boolean
listenerEnabled
=
(boolean)rocketMQProperties.getConsumer().getListeners().getOrDefault(consumerGroup,
Collections.EMPTY_MAP)
.getOrDefault(topic,
true);
if
(!listenerEnabled)
{
log.debug(
"Consumer
Listener
(group:{},topic:{})
is
not
enabled
by
configuration,
will
ignore
initialization.",
consumerGroup,
topic);
return;
}
validate(annotation);
String
containerBeanName
=
String.format("%s_%s",
DefaultRocketMQListenerContainer.class.getName(),
counter.incrementAndGet());
GenericApplicationContext
genericApplicationContext
=
(GenericApplicationContext)applicationContext;
//
注冊(cè)bean的,調(diào)用createRocketMQListenerContainer
genericApplicationContext.registerBean(containerBeanName,
DefaultRocketMQListenerContainer.class,
()
->
createRocketMQListenerContainer(containerBeanName,
bean,
annotation));
DefaultRocketMQListenerContainer
container
=
genericApplicationContext.getBean(containerBeanName,
DefaultRocketMQListenerContainer.class);
if
(!container.isRunning())
{
try
{
container.start();
}
catch
(Exception
e)
{
log.error("Started
container
failed.
{}",
container,
e);
throw
new
RuntimeException(e);
}
}
("Register
the
listener
to
container,
listenerBeanName:{},
containerBeanName:{}",
beanName,
containerBeanName);
}
private
DefaultRocketMQListenerContainer
createRocketMQListenerContainer(String
name,
Object
bean,
RocketMQMessageListener
annotation)
{
DefaultRocketMQListenerContainer
container
=
new
DefaultRocketMQListenerContainer();
container.setRocketMQMessageListener(annotation);
String
nameServer
=
environment.resolvePlaceholders(Server());
nameServer
=
StringUtils.isEmpty(nameServer)
?
rocketMQProperties.getNameServer()
:
nameServer;
String
accessChannel
=
environment.resolvePlaceholders(annotation.accessChannel());
container.setNameServer(nameServer);
if
(!StringUtils.isEmpty(accessChannel))
{
container.setAccessChannel(AccessChannel.valueOf(accessChannel));
}
container.setTopic(environment.resolvePlaceholders(annotation.topic()));
//
此處已經(jīng)根據(jù)表達(dá)式將數(shù)據(jù)取出
String
tags
=
environment.resolvePlaceholders(annotation.selectorExpression());
if
(!StringUtils.isEmpty(tags))
{
container.setSelectorExpression(tags);
}
container.setConsumerGroup(environment.resolvePlaceholders(annotation.consumerGroup()));
//
此處將SelectorExpression的數(shù)據(jù)覆蓋成了表達(dá)式
container.setRocketMQMessageListener(annotation);
container.setRocketMQListener((RocketMQListener)bean);
container.setObjectMapper(objectMapper);
container.setMessageConverter(rocketMQMessageConverter.getMessageConverter());
container.setName(name);
//
REVIEW
ME,
use
the
same
clientId
or
multiple?
return
container;
}因?yàn)長(zhǎng)istenerContainerConfiguration類(lèi)是實(shí)現(xiàn)了SmartInitializingSingleton接口的afterSingletonsInstantiated方法,我們可以通過(guò)反射對(duì)selectorExpression的數(shù)據(jù)在ListenerContainerConfiguration進(jìn)行初始化前進(jìn)行解析并賦值回去。/**
*
在springboot初始化后,RocketMQ容器初始化前利用反射動(dòng)態(tài)改變數(shù)據(jù)
**/
@Configuration
public
class
ChangeSelectorExpressionBeforeMQInit
implements
InitializingBean
{
@Autowired
private
ApplicationContext
applicationContext;
@Autowired
private
StandardEnvironment
environment;
@Override
public
void
afterPropertiesSet()
throws
Exception
{
Map<String,Object>
beans
=applicationContext.getBeansWithAnnotation(RocketMQMessageListener.class);
for
(Object
bean
:
beans.values()){
Class<?>
clazz
=
AopProxyUtils.ultimateTargetClass(bean);
if
(!RocketMQListener.class.isAssignableFrom(bean.getClass()))
{
continue;
}
RocketMQMessageListener
annotation
=
clazz.getAnnotation(RocketMQMess
溫馨提示
- 1. 本站所有資源如無(wú)特殊說(shuō)明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請(qǐng)下載最新的WinRAR軟件解壓。
- 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請(qǐng)聯(lián)系上傳者。文件的所有權(quán)益歸上傳用戶所有。
- 3. 本站RAR壓縮包中若帶圖紙,網(wǎng)頁(yè)內(nèi)容里面會(huì)有圖紙預(yù)覽,若沒(méi)有圖紙預(yù)覽就沒(méi)有圖紙。
- 4. 未經(jīng)權(quán)益所有人同意不得將文件中的內(nèi)容挪作商業(yè)或盈利用途。
- 5. 人人文庫(kù)網(wǎng)僅提供信息存儲(chǔ)空間,僅對(duì)用戶上傳內(nèi)容的表現(xiàn)方式做保護(hù)處理,對(duì)用戶上傳分享的文檔內(nèi)容本身不做任何修改或編輯,并不能對(duì)任何下載內(nèi)容負(fù)責(zé)。
- 6. 下載文件中如有侵權(quán)或不適當(dāng)內(nèi)容,請(qǐng)與我們聯(lián)系,我們立即糾正。
- 7. 本站不保證下載資源的準(zhǔn)確性、安全性和完整性, 同時(shí)也不承擔(dān)用戶因使用這些下載資源對(duì)自己和他人造成任何形式的傷害或損失。
最新文檔
- 2025年巴音郭楞道路運(yùn)輸從業(yè)資格證考試內(nèi)容是什么
- 小學(xué)三年級(jí)100道口算題
- 2025年欽州貨運(yùn)上崗證模擬考試題
- 2025年荷澤貨運(yùn)從業(yè)資格證模擬考試駕考
- 華東師大版七年級(jí)數(shù)學(xué)上冊(cè)《第3章整式的加減3.1列代數(shù)式3.1.3列代數(shù)式 》聽(tīng)評(píng)課記錄
- 湘教版數(shù)學(xué)八年級(jí)下冊(cè)《2.2.1平行四邊形的邊、角性質(zhì)》聽(tīng)評(píng)課記錄
- 建筑項(xiàng)目經(jīng)理工作總結(jié)
- 初中理科教研組工作計(jì)劃
- 新學(xué)校校辦室工作計(jì)劃
- 平面設(shè)計(jì)師工作計(jì)劃范文欣賞
- 人教版小學(xué)語(yǔ)文1-6年級(jí)背誦內(nèi)容完整版
- 2023徐金桂“徐徐道來(lái)”(行政法知識(shí)點(diǎn))版
- 《事故汽車(chē)常用零部件修復(fù)與更換判別規(guī)范》
- 2024-2030年中國(guó)酒類(lèi)流通行業(yè)發(fā)展動(dòng)態(tài)及投資盈利預(yù)測(cè)研究報(bào)告
- 物業(yè)管理如何實(shí)現(xiàn)降本增效
- 信息科技重大版 七年級(jí)下冊(cè) 互聯(lián)網(wǎng)應(yīng)用與創(chuàng)新 第一單元單元教學(xué)設(shè)計(jì) 互聯(lián)網(wǎng)創(chuàng)新應(yīng)用
- 2024年興業(yè)銀行股份有限公司校園招聘考試試題及參考答案
- 2024智慧城市城市交通基礎(chǔ)設(shè)施智能監(jiān)測(cè)技術(shù)要求
- 湖北省崇陽(yáng)縣浪口溫泉地?zé)崽锏責(zé)豳Y源開(kāi)發(fā)利用與生態(tài)復(fù)綠方案
- 《工程建設(shè)標(biāo)準(zhǔn)強(qiáng)制性條文電力工程部分2023年版》
- CJT252-2011 城鎮(zhèn)排水水質(zhì)水量在線監(jiān)測(cè)系統(tǒng)技術(shù)要求
評(píng)論
0/150
提交評(píng)論