




版權(quán)說明:本文檔由用戶提供并上傳,收益歸屬內(nèi)容提供方,若內(nèi)容存在侵權(quán),請(qǐng)進(jìn)行舉報(bào)或認(rèn)領(lǐng)
文檔簡(jiǎn)介
消息隊(duì)列:ActiveMQ:ActiveMQ消息過濾與選擇1消息隊(duì)列基礎(chǔ)1.1ActiveMQ簡(jiǎn)介ActiveMQ是Apache出品的、采用Java語言編寫的、基于JMS1.1和J2EE1.4規(guī)范的完全支持JMSAPI的面向消息的中間件。ActiveMQ是一個(gè)非常活躍的開源項(xiàng)目,它提供了許多特性,包括持久化、事務(wù)、消息選擇、過濾和分發(fā)等。ActiveMQ支持多種傳輸協(xié)議,如AMQP、OpenWire、STOMP、MQTT等,這使得它能夠與各種不同的消息系統(tǒng)進(jìn)行互操作。1.2消息隊(duì)列的工作原理消息隊(duì)列是一種應(yīng)用程序間通信的機(jī)制,它允許消息的發(fā)送和接收在不同的時(shí)間點(diǎn)進(jìn)行。消息隊(duì)列的基本組件包括生產(chǎn)者(Producer)、消費(fèi)者(Consumer)和隊(duì)列(Queue)。生產(chǎn)者:負(fù)責(zé)創(chuàng)建消息并將其發(fā)送到隊(duì)列中。隊(duì)列:作為消息的存儲(chǔ)容器,可以持久化消息直到被消費(fèi)者處理。消費(fèi)者:從隊(duì)列中接收消息并進(jìn)行處理。消息隊(duì)列的工作流程如下:生產(chǎn)者將消息發(fā)送到隊(duì)列中。消費(fèi)者監(jiān)聽隊(duì)列,當(dāng)隊(duì)列中有消息時(shí),消費(fèi)者從隊(duì)列中取出消息并進(jìn)行處理。消費(fèi)者處理完消息后,通常會(huì)從隊(duì)列中移除該消息,以避免重復(fù)處理。1.2.1示例代碼:發(fā)送消息到隊(duì)列importjavax.jms.Connection;
importjavax.jms.ConnectionFactory;
importjavax.jms.Destination;
importjavax.jms.MessageProducer;
importjavax.jms.Session;
importjavax.jms.TextMessage;
importorg.apache.activemq.ActiveMQConnectionFactory;
publicclassJMSProducer{
publicstaticvoidmain(String[]args)throwsException{
//創(chuàng)建連接工廠
ConnectionFactoryconnectionFactory=newActiveMQConnectionFactory("tcp://localhost:61616");
//創(chuàng)建連接
Connectionconnection=connectionFactory.createConnection();
//啟動(dòng)連接
connection.start();
//創(chuàng)建會(huì)話
Sessionsession=connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
//創(chuàng)建隊(duì)列
Destinationdestination=session.createQueue("MyQueue");
//創(chuàng)建消息生產(chǎn)者
MessageProducerproducer=session.createProducer(destination);
//創(chuàng)建文本消息
TextMessagemessage=session.createTextMessage("Hello,ActiveMQ!");
//發(fā)送消息
producer.send(message);
//關(guān)閉資源
session.close();
connection.close();
}
}1.2.2示例代碼:從隊(duì)列接收消息importjavax.jms.Connection;
importjavax.jms.ConnectionFactory;
importjavax.jms.Destination;
importjavax.jms.Message;
importjavax.jms.MessageConsumer;
importjavax.jms.Session;
importjavax.jms.TextMessage;
importorg.apache.activemq.ActiveMQConnectionFactory;
publicclassJMSConsumer{
publicstaticvoidmain(String[]args)throwsException{
//創(chuàng)建連接工廠
ConnectionFactoryconnectionFactory=newActiveMQConnectionFactory("tcp://localhost:61616");
//創(chuàng)建連接
Connectionconnection=connectionFactory.createConnection();
//啟動(dòng)連接
connection.start();
//創(chuàng)建會(huì)話
Sessionsession=connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
//創(chuàng)建隊(duì)列
Destinationdestination=session.createQueue("MyQueue");
//創(chuàng)建消息消費(fèi)者
MessageConsumerconsumer=session.createConsumer(destination);
//接收消息
Messagemessage=consumer.receive();
//檢查消息類型并打印消息內(nèi)容
if(messageinstanceofTextMessage){
TextMessagetextMessage=(TextMessage)message;
System.out.println("Receivedmessage:"+textMessage.getText());
}
//關(guān)閉資源
session.close();
connection.close();
}
}1.3ActiveMQ的安裝與配置1.3.1安裝ActiveMQ下載ActiveMQ:從ApacheActiveMQ的官方網(wǎng)站下載最新版本的ActiveMQ。解壓:將下載的ActiveMQ壓縮包解壓到一個(gè)目錄中。啟動(dòng)ActiveMQ:在解壓后的目錄中,找到bin目錄,運(yùn)行activemq.bat(Windows)或activemq(Linux)腳本來啟動(dòng)ActiveMQ。1.3.2配置ActiveMQActiveMQ的配置主要通過conf/activemq.xml文件進(jìn)行。以下是一個(gè)基本的配置示例:<beansxmlns="/schema/beans"
xmlns:xsi="/2001/XMLSchema-instance"
xmlns:activemq="/schema/core"
xsi:schemaLocation="/schema/beans
/schema/beans/spring-beans-3.0.xsd
/schema/core
/schema/core/activemq-core.xsd">
<brokerxmlns="/schema/core"brokerName="localhost"dataDirectory="${activemq.data}">
<transportConnectors>
<transportConnectorname="openwire"uri="tcp://localhost:61616"/>
</transportConnectors>
<destinationPolicy>
<policyMap>
<policyEntries>
<policyEntryqueue=">"topic=">"/>
</policyEntries>
</policyMap>
</destinationPolicy>
</broker>
</beans>在這個(gè)配置文件中,我們定義了一個(gè)名為localhost的Broker,它監(jiān)聽在tcp://localhost:61616上。destinationPolicy部分定義了隊(duì)列和主題的策略。1.3.3配置持久化ActiveMQ支持消息的持久化,這意味著即使Broker重啟,消息也不會(huì)丟失。持久化配置通常在activemq.xml文件中進(jìn)行,如下所示:<brokerxmlns="/schema/core"brokerName="localhost"dataDirectory="${activemq.data}">
<!--...-->
<persistenceAdapter>
<kahaDBdirectory="${activemq.data}/kahadb"/>
</persistenceAdapter>
<!--...-->
</broker>在這個(gè)配置中,我們使用了kahaDB作為持久化適配器,它將消息存儲(chǔ)在kahadb目錄下。1.3.4配置消息過濾ActiveMQ支持消息過濾,這允許消費(fèi)者只接收滿足特定條件的消息。消息過濾是通過JMS的MessageSelector進(jìn)行的,它是一個(gè)SQL-92風(fēng)格的表達(dá)式,用于選擇消息。示例代碼:使用MessageSelector過濾消息importjavax.jms.Connection;
importjavax.jms.ConnectionFactory;
importjavax.jms.Destination;
importjavax.jms.Message;
importjavax.jms.MessageConsumer;
importjavax.jms.Session;
importjavax.jms.TextMessage;
importjavax.jms.MessageSelector;
importorg.apache.activemq.ActiveMQConnectionFactory;
publicclassJMSConsumerWithFilter{
publicstaticvoidmain(String[]args)throwsException{
//創(chuàng)建連接工廠
ConnectionFactoryconnectionFactory=newActiveMQConnectionFactory("tcp://localhost:61616");
//創(chuàng)建連接
Connectionconnection=connectionFactory.createConnection();
//啟動(dòng)連接
connection.start();
//創(chuàng)建會(huì)話
Sessionsession=connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
//創(chuàng)建隊(duì)列
Destinationdestination=session.createQueue("MyQueue");
//創(chuàng)建消息消費(fèi)者,并設(shè)置MessageSelector
MessageConsumerconsumer=session.createConsumer(destination,"property='value'");
//接收消息
Messagemessage=consumer.receive();
//檢查消息類型并打印消息內(nèi)容
if(messageinstanceofTextMessage){
TextMessagetextMessage=(TextMessage)message;
System.out.println("Receivedmessage:"+textMessage.getText());
}
//關(guān)閉資源
session.close();
connection.close();
}
}在這個(gè)示例中,我們創(chuàng)建了一個(gè)消費(fèi)者,并設(shè)置了一個(gè)MessageSelector,它只接收property屬性等于value的消息。1.3.5配置消息選擇除了過濾,ActiveMQ還支持消息的選擇。消費(fèi)者可以使用MessageSelector來選擇隊(duì)列中的特定消息進(jìn)行處理,而其他消息則保留在隊(duì)列中,直到被其他消費(fèi)者處理。示例代碼:選擇特定消息進(jìn)行處理importjavax.jms.Connection;
importjavax.jms.ConnectionFactory;
importjavax.jms.Destination;
importjavax.jms.Message;
importjavax.jms.MessageConsumer;
importjavax.jms.Session;
importjavax.jms.TextMessage;
importjavax.jms.MessageSelector;
importorg.apache.activemq.ActiveMQConnectionFactory;
publicclassJMSConsumerWithSelector{
publicstaticvoidmain(String[]args)throwsException{
//創(chuàng)建連接工廠
ConnectionFactoryconnectionFactory=newActiveMQConnectionFactory("tcp://localhost:61616");
//創(chuàng)建連接
Connectionconnection=connectionFactory.createConnection();
//啟動(dòng)連接
connection.start();
//創(chuàng)建會(huì)話
Sessionsession=connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
//創(chuàng)建隊(duì)列
Destinationdestination=session.createQueue("MyQueue");
//創(chuàng)建消息消費(fèi)者,并設(shè)置MessageSelector
MessageConsumerconsumer=session.createConsumer(destination,"property='value'");
//接收消息
Messagemessage=consumer.receive();
//檢查消息類型并打印消息內(nèi)容
if(messageinstanceofTextMessage){
TextMessagetextMessage=(TextMessage)message;
System.out.println("Receivedmessage:"+textMessage.getText());
}
//關(guān)閉資源
session.close();
connection.close();
}
}在這個(gè)示例中,我們使用了MessageSelector來選擇property屬性等于value的消息進(jìn)行處理。注意,這個(gè)示例與過濾示例的代碼幾乎相同,因?yàn)檫x擇和過濾在JMSAPI中使用相同的方法。通過以上介紹,我們了解了ActiveMQ的基本概念、工作原理以及如何進(jìn)行安裝和配置。同時(shí),我們也學(xué)習(xí)了如何在ActiveMQ中使用消息過濾和選擇,這對(duì)于構(gòu)建復(fù)雜的消息處理系統(tǒng)非常有用。2消息過濾與選擇2.1ActiveMQ的消息選擇器在ActiveMQ中,消息選擇器是一個(gè)強(qiáng)大的功能,允許消費(fèi)者根據(jù)消息的屬性和內(nèi)容來選擇接收哪些消息。這通過使用SELECTOR參數(shù)在Consumer創(chuàng)建時(shí)指定實(shí)現(xiàn)。選擇器語法基于SQL的WHERE子句,但進(jìn)行了簡(jiǎn)化,以適應(yīng)消息屬性的查詢。2.1.1原理消息選擇器的工作原理是基于消息的屬性和內(nèi)容進(jìn)行過濾。當(dāng)消息被發(fā)送到隊(duì)列或主題時(shí),它們可以攜帶各種屬性,如JMSMessageID、JMSType、JMSCorrelationID等,以及自定義的屬性。選擇器語法允許你指定一個(gè)條件,只有當(dāng)消息滿足這個(gè)條件時(shí),它才會(huì)被特定的消費(fèi)者接收。2.1.2示例假設(shè)我們有一個(gè)消息隊(duì)列,其中包含不同類型的消息,我們只對(duì)特定類型的消息感興趣。我們可以使用選擇器來過濾這些消息。importorg.apache.activemq.ActiveMQConnectionFactory;
importjavax.jms.*;
publicclassMessageSelectorExample{
privatestaticfinalStringURL="tcp://localhost:61616";
privatestaticfinalStringQUEUE="MyQueue";
publicstaticvoidmain(String[]args)throwsJMSException{
//創(chuàng)建連接工廠
ConnectionFactoryconnectionFactory=newActiveMQConnectionFactory(URL);
//創(chuàng)建連接
Connectionconnection=connectionFactory.createConnection();
//啟動(dòng)連接
connection.start();
//創(chuàng)建會(huì)話
Sessionsession=connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
//創(chuàng)建隊(duì)列
Destinationdestination=session.createQueue(QUEUE);
//創(chuàng)建消息消費(fèi)者,并設(shè)置選擇器
MessageConsumerconsumer=session.createConsumer(destination,"JMSType='Alert'");
//接收消息
TextMessagemessage=(TextMessage)consumer.receive();
//輸出消息內(nèi)容
System.out.println("Receivedmessage:"+message.getText());
//關(guān)閉資源
consumer.close();
session.close();
connection.close();
}
}在這個(gè)例子中,我們創(chuàng)建了一個(gè)消費(fèi)者,它只接收J(rèn)MSType屬性為Alert的消息。這意味著,如果隊(duì)列中有多種類型的消息,只有那些類型為Alert的消息會(huì)被這個(gè)消費(fèi)者接收。2.2使用選擇器進(jìn)行消息過濾選擇器可以用于過濾消息,確保只有滿足特定條件的消息被處理。這在處理大量消息時(shí)特別有用,可以避免不必要的消息處理,提高系統(tǒng)的效率。2.2.1示例讓我們擴(kuò)展上面的例子,這次我們將使用更復(fù)雜的選擇器來過濾消息。importorg.apache.activemq.ActiveMQConnectionFactory;
importjavax.jms.*;
publicclassComplexSelectorExample{
privatestaticfinalStringURL="tcp://localhost:61616";
privatestaticfinalStringQUEUE="MyQueue";
publicstaticvoidmain(String[]args)throwsJMSException{
//創(chuàng)建連接工廠
ConnectionFactoryconnectionFactory=newActiveMQConnectionFactory(URL);
//創(chuàng)建連接
Connectionconnection=connectionFactory.createConnection();
//啟動(dòng)連接
connection.start();
//創(chuàng)建會(huì)話
Sessionsession=connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
//創(chuàng)建隊(duì)列
Destinationdestination=session.createQueue(QUEUE);
//創(chuàng)建消息消費(fèi)者,并設(shè)置選擇器
MessageConsumerconsumer=session.createConsumer(destination,"JMSType='Alert'ANDJMSCorrelationID='123'");
//接收消息
TextMessagemessage=(TextMessage)consumer.receive();
//輸出消息內(nèi)容
if(message!=null){
System.out.println("Receivedmessage:"+message.getText());
}else{
System.out.println("Nomessagematchedtheselector.");
}
//關(guān)閉資源
consumer.close();
session.close();
connection.close();
}
}在這個(gè)例子中,我們使用了一個(gè)復(fù)合選擇器,它不僅檢查JMSType屬性,還檢查JMSCorrelationID屬性。這意味著,只有當(dāng)消息的JMSType為Alert且JMSCorrelationID為123時(shí),消息才會(huì)被接收。2.3消息過濾的高級(jí)用法ActiveMQ的選擇器支持更復(fù)雜的過濾邏輯,包括邏輯運(yùn)算符(AND、OR、NOT)和比較運(yùn)算符(=、!=、<、>、<=、>=)。此外,你還可以使用通配符(*、?)和正則表達(dá)式來進(jìn)行更靈活的過濾。2.3.1示例假設(shè)我們想要接收所有類型為Alert或Warning的消息,我們可以使用OR運(yùn)算符來實(shí)現(xiàn)。importorg.apache.activemq.ActiveMQConnectionFactory;
importjavax.jms.*;
publicclassORSelectorExample{
privatestaticfinalStringURL="tcp://localhost:61616";
privatestaticfinalStringQUEUE="MyQueue";
publicstaticvoidmain(String[]args)throwsJMSException{
//創(chuàng)建連接工廠
ConnectionFactoryconnectionFactory=newActiveMQConnectionFactory(URL);
//創(chuàng)建連接
Connectionconnection=connectionFactory.createConnection();
//啟動(dòng)連接
connection.start();
//創(chuàng)建會(huì)話
Sessionsession=connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
//創(chuàng)建隊(duì)列
Destinationdestination=session.createQueue(QUEUE);
//創(chuàng)建消息消費(fèi)者,并設(shè)置選擇器
MessageConsumerconsumer=session.createConsumer(destination,"JMSType='Alert'ORJMSType='Warning'");
//接收消息
TextMessagemessage=(TextMessage)consumer.receive();
//輸出消息內(nèi)容
if(message!=null){
System.out.println("Receivedmessage:"+message.getText());
}else{
System.out.println("Nomessagematchedtheselector.");
}
//關(guān)閉資源
consumer.close();
session.close();
connection.close();
}
}在這個(gè)例子中,我們使用了OR運(yùn)算符來接收類型為Alert或Warning的消息。這使得我們的消費(fèi)者可以處理更廣泛的消息類型,只要它們滿足選擇器中的任一條件。2.3.2使用正則表達(dá)式正則表達(dá)式可以用于更復(fù)雜的過濾,例如,我們想要接收所有以Alert開頭的消息類型。importorg.apache.activemq.ActiveMQConnectionFactory;
importjavax.jms.*;
publicclassRegexSelectorExample{
privatestaticfinalStringURL="tcp://localhost:61616";
privatestaticfinalStringQUEUE="MyQueue";
publicstaticvoidmain(String[]args)throwsJMSException{
//創(chuàng)建連接工廠
ConnectionFactoryconnectionFactory=newActiveMQConnectionFactory(URL);
//創(chuàng)建連接
Connectionconnection=connectionFactory.createConnection();
//啟動(dòng)連接
connection.start();
//創(chuàng)建會(huì)話
Sessionsession=connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
//創(chuàng)建隊(duì)列
Destinationdestination=session.createQueue(QUEUE);
//創(chuàng)建消息消費(fèi)者,并設(shè)置選擇器
MessageConsumerconsumer=session.createConsumer(destination,"JMSTypeLIKE'Alert%'");
//接收消息
TextMessagemessage=(TextMessage)consumer.receive();
//輸出消息內(nèi)容
if(message!=null){
System.out.println("Receivedmessage:"+message.getText());
}else{
System.out.println("Nomessagematchedtheselector.");
}
//關(guān)閉資源
consumer.close();
session.close();
connection.close();
}
}在這個(gè)例子中,我們使用了LIKE運(yùn)算符和正則表達(dá)式'Alert%'來接收所有以Alert開頭的消息類型。這提供了一種更靈活的方式來過濾消息,特別是當(dāng)消息類型可能有變化時(shí)。通過這些例子,我們可以看到ActiveMQ的消息選擇器是一個(gè)非常強(qiáng)大的工具,可以用于精確控制哪些消息被消費(fèi)者接收。這不僅可以提高系統(tǒng)的效率,還可以幫助我們更好地管理和處理消息隊(duì)列中的消息。3實(shí)踐操作3.1配置消息過濾規(guī)則在ActiveMQ中,消息過濾是一個(gè)關(guān)鍵特性,它允許消費(fèi)者根據(jù)消息的屬性或內(nèi)容選擇接收哪些消息。這通過使用選擇器(selector)實(shí)現(xiàn),選擇器是一個(gè)基于消息屬性的SQL-92風(fēng)格的表達(dá)式。例如,如果一個(gè)消息隊(duì)列中的消息包含一個(gè)名為priority的屬性,你可以配置一個(gè)選擇器來只接收priority屬性值為high的消息。3.1.1示例代碼假設(shè)我們有一個(gè)消息隊(duì)列,其中消息包含priority和type兩個(gè)屬性。下面是如何配置選擇器來過濾消息的示例://創(chuàng)建一個(gè)ActiveMQConnectionFactory實(shí)例
ActiveMQConnectionFactoryconnectionFactory=newActiveMQConnectionFactory("tcp://localhost:61616");
//創(chuàng)建一個(gè)Connection實(shí)例
Connectionconnection=connectionFactory.createConnection();
//啟動(dòng)連接
connection.start();
//創(chuàng)建一個(gè)Session實(shí)例
Sessionsession=connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
//創(chuàng)建一個(gè)Destination實(shí)例,這里假設(shè)是一個(gè)隊(duì)列
Destinationdestination=session.createQueue("MyQueue");
//創(chuàng)建一個(gè)MessageConsumer實(shí)例,并設(shè)置選擇器
MessageConsumerconsumer=session.createConsumer(destination,"priority='high'ANDtype='urgent'");
//接收消息
Messagemessage=consumer.receive();
//關(guān)閉資源
consumer.close();
session.close();
connection.close();3.1.2解釋在上述代碼中,我們創(chuàng)建了一個(gè)MessageConsumer實(shí)例,并通過createConsumer方法的第二個(gè)參數(shù)設(shè)置了選擇器。選擇器priority='high'ANDtype='urgent'表示只接收priority屬性為high且type屬性為urgent的消息。3.2編寫消費(fèi)者以應(yīng)用選擇器為了應(yīng)用選擇器,消費(fèi)者在創(chuàng)建時(shí)需要指定一個(gè)選擇器表達(dá)式。這使得消費(fèi)者能夠根據(jù)特定條件過濾消息,從而實(shí)現(xiàn)更高效和更精確的消息處理。3.2.1示例代碼下面是一個(gè)使用選擇器的消費(fèi)者示例,它將只接收特定類型的消息://創(chuàng)建一個(gè)ActiveMQConnectionFactory實(shí)例
ActiveMQConnectionFactoryconnectionFactory=newActiveMQConnectionFactory("tcp://localhost:61616");
//創(chuàng)建一個(gè)Connection實(shí)例
Connectionconnection=connectionFactory.createConnection();
//啟動(dòng)連接
connection.start();
//創(chuàng)建一個(gè)Session實(shí)例
Sessionsession=connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
//創(chuàng)建一個(gè)Destination實(shí)例,這里假設(shè)是一個(gè)隊(duì)列
Destinationdestination=session.createQueue("MyQueue");
//創(chuàng)建一個(gè)MessageConsumer實(shí)例,并設(shè)置選擇器
MessageConsumerconsumer=session.createConsumer(destination,"type='notification'");
//定義一個(gè)消息監(jiān)聽器
consumer.setMessageListener(newMessageListener(){
publicvoidonMessage(Messagemessage){
if(messageinstanceofTextMessage){
TextMessagetextMessage=(TextMessage)message;
try{
System.out.println("Receivedmessage:"+textMessage.getText());
}catch(JMSExceptione){
e.printStackTrace();
}
}
}
});
//保持連接打開,以便接收消息
//在實(shí)際應(yīng)用中,你可能需要一個(gè)循環(huán)或線程來持續(xù)監(jiān)聽消息
//這里為了示例簡(jiǎn)單,我們假設(shè)消息監(jiān)聽器會(huì)自動(dòng)處理所有到達(dá)的消息
//關(guān)閉資源
//注意:在使用MessageListener時(shí),通常不需要顯式關(guān)閉consumer,session和connection
//但為了資源管理,這里還是展示了關(guān)閉的代碼
session.close();
connection.close();3.2.2解釋在這個(gè)示例中,我們創(chuàng)建了一個(gè)MessageConsumer并設(shè)置了一個(gè)選擇器type='notification'。這意味著消費(fèi)者將只接收type屬性為notification的消息。我們還定義了一個(gè)MessageListener,它將自動(dòng)處理接收到的消息,打印出消息的文本內(nèi)容。3.3測(cè)試消息過濾與選擇測(cè)試消息過濾和選擇的正確性是確保消息隊(duì)列按預(yù)期工作的重要步驟。你可以通過發(fā)送不同類型和屬性的消息到隊(duì)列,然后使用具有特定選擇器的消費(fèi)者來驗(yàn)證是否只接收到了符合條件的消息。3.3.1示例代碼下面是一個(gè)發(fā)送和接收消息的測(cè)試示例,用于驗(yàn)證選擇器是否正確工作://創(chuàng)建一個(gè)ActiveMQConnectionFactory實(shí)例
ActiveMQConnectionFactoryconnectionFactory=newActiveMQConnectionFactory("tcp://localhost:61616");
//創(chuàng)建一個(gè)Connection實(shí)例
Connectionconnection=connectionFactory.createConnection();
//啟動(dòng)連接
connection.start();
//創(chuàng)建一個(gè)Session實(shí)例
Sessionsession=connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
//創(chuàng)建一個(gè)Destination實(shí)例,這里假設(shè)是一個(gè)隊(duì)列
Destinationdestination=session.createQueue("MyQueue");
//創(chuàng)建一個(gè)MessageProducer實(shí)例
MessageProducerproducer=session.createProducer(destination);
//創(chuàng)建并發(fā)送消息
for(inti=0;i<10;i++){
TextMessagemessage=session.createTextMessage("Message"+i);
if(i%2==0){
message.setStringProperty("type","notification");
}else{
message.setStringProperty("type","information");
}
producer.send(message);
}
//創(chuàng)建一個(gè)MessageConsumer實(shí)例,并設(shè)置選擇器
MessageConsumerconsumer=session.createConsumer(destination,"type='notification'");
//接收并處理消息
for(inti=0;i<5;i++){
TextMessagetextMessage=(TextMessage)consumer.receive();
try{
System.out.println("Receivedmessage:"+textMessage.getText());
}catch(JMSExceptione){
e.printStackTrace();
}
}
//關(guān)閉資源
consumer.close();
session.close();
connection.close();3.3.2解釋在這個(gè)測(cè)試示例中,我們首先創(chuàng)建了一個(gè)MessageProducer并發(fā)送了10條消息到隊(duì)列,其中一半的消息type屬性被設(shè)置為notification,另一半為information。然后,我們創(chuàng)建了一個(gè)MessageConsumer并設(shè)置了選擇器type='notification',這意味著消費(fèi)者將只接收type屬性為notification的消息。通過接收并打印出消息,我們可以驗(yàn)證選擇器是否正確地過濾了消息。通過這些實(shí)踐操作,你可以有效地在ActiveMQ中配置和測(cè)試消息過濾規(guī)則,確保消息隊(duì)列能夠根據(jù)你的需求精確地分發(fā)消息。4案例分析4.1基于內(nèi)容的路由案例在ActiveMQ中,基于內(nèi)容的路由是一種高級(jí)功能,允許消息根據(jù)其內(nèi)容被路由到不同的目的地。這在需要根據(jù)消息的具體內(nèi)容進(jìn)行處理的場(chǎng)景中非常有用,例如,將訂單消息路由到不同的隊(duì)列,基于訂單的類型或金額。4.1.1實(shí)現(xiàn)原理基于內(nèi)容的路由主要依賴于ActiveMQ的ContentBasedRouter插件和MessageSelector。ContentBasedRouter插件可以根據(jù)消息的屬性或內(nèi)容將消息路由到不同的目的地。MessageSelector則是在消息消費(fèi)者中使用,允許消費(fèi)者只接收滿足特定條件的消息。4.1.2示例代碼下面是一個(gè)使用Java和ActiveMQ實(shí)現(xiàn)基于內(nèi)容的路由的示例。我們將創(chuàng)建一個(gè)生產(chǎn)者,發(fā)送不同類型的消息,然后創(chuàng)建兩個(gè)消費(fèi)者,一個(gè)只接收類型為order的消息,另一個(gè)只接收類型為invoice的消息。importjavax.jms.Connection;
importjavax.jms.ConnectionFactory;
importjavax.jms.Destination;
importjavax.jms.JMSException;
importjavax.jms.MessageConsumer;
importjavax.jms.MessageProducer;
importjavax.jms.Session;
importjavax.jms.TextMessage;
importorg.apache.activemq.ActiveMQConnectionFactory;
publicclassContentBasedRoutingExample{
publicstaticvoidmain(String[]args)throwsJMSException{
//創(chuàng)建連接工廠
ConnectionFactoryconnectionFactory=newActiveMQConnectionFactory("tcp://localhost:61616");
Connectionconnection=connectionFactory.createConnection();
connection.start();
//創(chuàng)建會(huì)話
Sessionsession=connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
//創(chuàng)建目的地
Destinationdestination=session.createQueue("contentBasedRoutingQueue");
//創(chuàng)建消息生產(chǎn)者
MessageProducerproducer=session.createProducer(destination);
//發(fā)送不同類型的消息
TextMessageorderMessage=session.createTextMessage("Thisisanordermessage.");
orderMessage.setStringProperty("type","order");
producer.send(orderMessage);
TextMessageinvoiceMessage=session.createTextMessage("Thisisaninvoicemessage.");
invoiceMessage.setStringProperty("type","invoice");
producer.send(invoiceMessage);
//創(chuàng)建消息消費(fèi)者
MessageConsumerorderConsumer=session.createConsumer(destination,"type='order'");
MessageConsumerinvoiceConsumer=session.createConsumer(destination,"type='invoice'");
//接收并處理消息
TextMessagereceivedOrderMessage=(TextMessage)orderConsumer.receive();
System.out.println("OrderConsumerreceived:"+receivedOrderMessage.getText());
TextMessagereceivedInvoiceMessage=(TextMessage)invoiceConsumer.receive();
System.out.println("InvoiceConsumerreceived:"+receivedInvoiceMessage.getText());
//關(guān)閉資源
orderConsumer.close();
invoiceConsumer.close();
session.close();
connection.close();
}
}4.1.3解釋在上述代碼中,我們首先創(chuàng)建了一個(gè)連接到ActiveMQ的ConnectionFactory,然后使用它創(chuàng)建了一個(gè)Connection。接著,我們創(chuàng)建了一個(gè)Session,并使用它創(chuàng)建了一個(gè)隊(duì)列Destination。我們創(chuàng)建了兩個(gè)TextMessage,一個(gè)標(biāo)記為order,另一個(gè)標(biāo)記為invoice,并使用MessageProducer將它們發(fā)送到隊(duì)列。然后,我們創(chuàng)建了兩個(gè)MessageConsumer,每個(gè)消費(fèi)者都有一個(gè)特定的MessageSelector,這使得orderConsumer只接收類型為order的消息,而invoiceConsumer只接收類型為invoice的消息。最后,我們接收并處理了這些消息,然后關(guān)閉了所有資源。4.2性能優(yōu)化與消息過濾在處理大量消息時(shí),性能優(yōu)化和有效的消息過濾變得至關(guān)重要。ActiveMQ提供了多種工具和策略來優(yōu)化性能,同時(shí)確保消息被正確地過濾和處理。4.2.1實(shí)現(xiàn)原理性能優(yōu)化主要涉及以下方面:-消息持久化:通過調(diào)整消息持久化策略,可以減少磁盤I/O操作,從而提高性能。-消息選擇器:使用MessageSelector可以減少不必要的消息處理,只處理符合特定條件的消息。-批量處理:通過批量發(fā)送和接收消息,可以減少網(wǎng)絡(luò)通信的開銷。-預(yù)取策略:調(diào)整預(yù)取策略可以控制消費(fèi)者從隊(duì)列中預(yù)取消息的數(shù)量,從而避免內(nèi)存溢出。4.2.2示例代碼下面是一個(gè)使用Java和ActiveMQ進(jìn)行性能優(yōu)化和消息過濾的示例。我們將創(chuàng)建一個(gè)生產(chǎn)者,批量發(fā)送大量消息,然后創(chuàng)建一個(gè)消費(fèi)者,使用MessageSelector過濾消息。importjavax.jms.Connection;
importjavax.jms.ConnectionFactory;
importjavax.jms.Destination;
importjavax.jms.JMSException;
importjavax.jms.MessageConsumer;
importjavax.jms.MessageProducer;
importjavax.jms.Session;
importjavax.jms.TextMessage;
importorg.apache.activemq.ActiveMQConnectionFactory;
publicclassPerformanceOptimizationExample{
publicstaticvoidmain(String[]args)throwsJMSException{
//創(chuàng)建連接工廠
ConnectionFactoryconnectionFactory=newActiveMQConnectionFactory("tcp://localhost:61616");
Connectionconnection=connectionFactory.createConnection();
connection.start();
//創(chuàng)建會(huì)話
Sessionsession=connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
//創(chuàng)建目的地
Destinationdestination=session.createQueue("performanceOptimizationQueue");
//創(chuàng)建消息生產(chǎn)者
MessageProducerproducer=session.createProducer(destination);
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);//非持久化消息,提高性能
//批量發(fā)送消息
for(inti=0;i<10000;i++){
TextMessagemessage=session.createTextMessage("Message"+i);
message.setIntProperty("priority",i%10);//設(shè)置消息優(yōu)先級(jí)
producer.send(message);
}
//創(chuàng)建消息消費(fèi)者
MessageConsumerconsumer=session.createConsumer(destination,"priority>5");
//接收并處理消息
for(inti=0;i<10000;i++){
TextMessagereceivedMessage=(TextMessage)consumer.receive();
if(receivedMessage!=null){
System.out.println("Consumerreceived:"+receivedMessage.getText());
}
}
//關(guān)閉資源
consumer.close();
session.close();
connection.close();
}
}4.2.3解釋在上述代碼中,我們創(chuàng)建了一個(gè)ConnectionFactory和Connection,然后創(chuàng)建了一個(gè)Session和一個(gè)隊(duì)列Destination。我們創(chuàng)建了一個(gè)MessageProducer,并將其DeliveryMode設(shè)置為NON_PERSISTENT,這意味著消息不會(huì)被持久化到磁盤,從而提高了發(fā)送速度。我們批量發(fā)送了10000條消息,每條消息都有一個(gè)優(yōu)先級(jí)屬性,范圍從0到9。然后,我們創(chuàng)建了一個(gè)MessageConsumer,并使用MessageSelector過濾優(yōu)先級(jí)大于5的消息。最后,我們接收并處理了這些消息,然后關(guān)閉了所有資源。4.3錯(cuò)誤處理與消息選擇在消息隊(duì)列中,錯(cuò)誤處理和消息選擇是確保系統(tǒng)穩(wěn)定性和消息正確處理的關(guān)鍵。ActiveMQ提供了多種機(jī)制來處理錯(cuò)誤和選擇消息進(jìn)行重試。4.3.1實(shí)現(xiàn)原理錯(cuò)誤處理主要涉及以下方面:-消息重試:當(dāng)消息處理失敗時(shí),可以配置ActiveMQ使其自動(dòng)重試消息處理。-死信隊(duì)列:如果消息多次處理失敗,可以將其移動(dòng)到死信隊(duì)列,以便后續(xù)分析和處理。-消息選擇器:使用MessageSelector可以確保只有特定類型的消息被處理,從而避免錯(cuò)誤處理中的不必要操作。4.3.2示例代碼下面是一個(gè)使用Java和ActiveMQ進(jìn)行錯(cuò)誤處理和消息選擇的示例。我們將創(chuàng)建一個(gè)生產(chǎn)者,發(fā)送帶有錯(cuò)誤處理屬性的消息,然后創(chuàng)建一個(gè)消費(fèi)者,使用MessageSelector選擇消息,并處理可能的錯(cuò)誤。importjavax.jms.Connection;
importjavax.jms.ConnectionFactory;
importjavax.jms.Destination;
importjavax.jms.JMSException;
importjavax.jms.MessageConsumer;
importjavax.jms.MessageProducer;
importjavax.jms.Session;
importjavax.jms.TextMessage;
importorg.apache.activemq.ActiveMQConnectionFactory;
publicclassErrorHandlingExample{
publicstaticvoidmain(String[]args)throwsJMSException{
//創(chuàng)建連接工廠
ConnectionFactoryconnectionFactory=newActiveMQConnectionFactory("tcp://localhost:61616");
Connectionconnection=connectionFactory.createConnection();
connection.start();
//創(chuàng)建會(huì)話
Sessionsession=connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
//創(chuàng)建目的地
Destinationdestination=session.createQueue("errorHandlingQueue");
//創(chuàng)建消息生產(chǎn)者
MessageProducerproducer=session.createProducer(destination);
//發(fā)送帶有錯(cuò)誤處理屬性的消息
TextMessagemessage=session.createTextMessage("Thisisamessagethatmightfail.");
message.setIntProperty("retryCount",3);//設(shè)置重試次數(shù)
producer.send(message);
//創(chuàng)建消息消費(fèi)者
MessageConsumerconsumer=session.createConsumer(destination,"retryCount>0");
//接收并處理消息
TextMessagereceivedMessage=(TextMessage)consumer.receive();
if(receivedMessage!=null){
intretryCount=receivedMessage.getIntProperty("retryCount");
if(retryCount>0){
System.out.println("Consumerreceived:"+receiv
溫馨提示
- 1. 本站所有資源如無特殊說明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請(qǐng)下載最新的WinRAR軟件解壓。
- 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請(qǐng)聯(lián)系上傳者。文件的所有權(quán)益歸上傳用戶所有。
- 3. 本站RAR壓縮包中若帶圖紙,網(wǎng)頁(yè)內(nèi)容里面會(huì)有圖紙預(yù)覽,若沒有圖紙預(yù)覽就沒有圖紙。
- 4. 未經(jīng)權(quán)益所有人同意不得將文件中的內(nèi)容挪作商業(yè)或盈利用途。
- 5. 人人文庫(kù)網(wǎng)僅提供信息存儲(chǔ)空間,僅對(duì)用戶上傳內(nèi)容的表現(xiàn)方式做保護(hù)處理,對(duì)用戶上傳分享的文檔內(nèi)容本身不做任何修改或編輯,并不能對(duì)任何下載內(nèi)容負(fù)責(zé)。
- 6. 下載文件中如有侵權(quán)或不適當(dāng)內(nèi)容,請(qǐng)與我們聯(lián)系,我們立即糾正。
- 7. 本站不保證下載資源的準(zhǔn)確性、安全性和完整性, 同時(shí)也不承擔(dān)用戶因使用這些下載資源對(duì)自己和他人造成任何形式的傷害或損失。
最新文檔
- 2025-2030中國(guó)瓶裝水市場(chǎng)發(fā)展分析及市場(chǎng)趨勢(shì)與投資方向研究報(bào)告
- 三年級(jí)健康教育實(shí)踐計(jì)劃
- 2025-2030中國(guó)濕紙巾行業(yè)發(fā)展分析及投資前景與戰(zhàn)略規(guī)劃研究報(bào)告
- 2025-2030中國(guó)消防安裝和維保經(jīng)營(yíng)發(fā)展策略分析及未來發(fā)展趨勢(shì)
- 梁廷枏經(jīng)世思想研究
- 2025-2030中國(guó)榴蓮酥行業(yè)發(fā)展現(xiàn)狀與銷售模式分析研究報(bào)告
- 中小學(xué)教學(xué)督導(dǎo)崗位職責(zé)
- 2025年大學(xué)輔導(dǎo)員招聘考試題庫(kù):學(xué)生心理健康測(cè)評(píng)案例分析試題解析
- 皮膚外科術(shù)后護(hù)理指南
- 2025年廚師職業(yè)技能鑒定高級(jí)試卷:烹飪?cè)喜少?gòu)與儲(chǔ)存高級(jí)實(shí)操試題
- NY∕T 309-1996 全國(guó)耕地類型區(qū)、耕地地力等級(jí)劃分
- 濃縮機(jī)的選擇與計(jì)算
- 滬教版六年級(jí)下冊(cè)單詞表
- 地基基礎(chǔ)軟弱下臥層驗(yàn)算計(jì)算表格
- 最新投標(biāo)書密封條
- SAPFI清賬接口和部分清賬接口例子
- TWI之工作改善JM精講
- 聚酯裝置流程與聚酯生產(chǎn)概述
- 鄉(xiāng)鎮(zhèn)綜治中心管理考核辦法(試行)
- BIM培訓(xùn)計(jì)劃Revit 培訓(xùn)計(jì)劃
- 中考英語常用特殊疑問句總結(jié)
評(píng)論
0/150
提交評(píng)論