文章出處

------------------------------------------------

開發一個JMS的基本步驟如下:

  1.創建一個JMS connection factory

  2.通過connection factory來創建JMS connection

  3.啟動JMS connection

  4.通過connection創建JMS session

  5.創建JMS destination

  6.創建JMS producer 或者創建JMS message,并設置destination

  7.創建JMS consumer 或者注冊一個JMS message listener

  8.發送或者接受JMS message

  9.關閉所有的JMS資源(connection、session、producer、consumer等)

可以參考下圖:

 

 

非持久的Topic消息示例

  對于非持久化的消息,當發送方發送消息的時候:

    如果接收方不在線,則接收方永遠也收不到這些消息了

    如果接收方在線,則接收方會收到這些消息

1、消息發送程序

 1 import javax.jms.Connection;
 2 import javax.jms.ConnectionFactory;
 3 import javax.jms.Destination;
 4 import javax.jms.MessageProducer;
 5 import javax.jms.Session;
 6 import javax.jms.TextMessage;
 7 
 8 import org.apache.activemq.ActiveMQConnectionFactory;
 9 
10 /**
11  * 非持久化Topic消息發送者
12  * @author Administrator
13  *
14  */
15 public class NoPersistenceSender {
16     public static void main(String[] args) throws Exception {
17         //創建一個JMS connection factory
18         ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.1.81:61616");
19         //通過connection factory來創建JMS connection
20         Connection connection = connectionFactory.createConnection();
21         //啟動JMS connection
22         connection.start();
23         //通過connection創建JMS session
24         Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
25         //創建JMS destination
26         Destination destination = session.createTopic("noPersistenceTopic");
27         //創建JMS producer
28         MessageProducer producer = session.createProducer(destination);
29         
30         for(int i = 0;i < 10;i++){
31             TextMessage message = session.createTextMessage("message-"+i);
32             //發送message
33             producer.send(message);
34         }
35         //關閉所有的JMS資源
36         session.commit();
37         session.close();
38         connection.close();
39     }
40 }

運行完消息發送程序后,可以訪問192.168.1.81:8161

 

2、消息接收程序

  對于非持久的Topic消息的接收需要注意以下幾點:

    a.接收程序必須在線,然后消息發送方再發送消息,接收程序才能接收到消息

    b.由于不知道消息發送方要發送多少條消息,所以利用while循環的方式來接收消息

    c.如果接收程序不在線,此時發送程序發送了消息的話,則該消息將永遠不會被接收方收到。

 1 import javax.jms.Connection;
 2 import javax.jms.ConnectionFactory;
 3 import javax.jms.Destination;
 4 import javax.jms.Message;
 5 import javax.jms.MessageConsumer;
 6 import javax.jms.Session;
 7 import javax.jms.TextMessage;
 8 
 9 import org.apache.activemq.ActiveMQConnectionFactory;
10 
11 /**
12  * 非持久化Topic消息接收者
13  * @author Administrator
14  *
15  */
16 public class NoPersistenceReceiver {
17     public static void main(String[] args) throws Exception {
18         //創建一個JMS connection factory
19         ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.1.81:61616");
20         //通過connection factory來創建JMS connection
21         Connection connection = connectionFactory.createConnection();
22         //啟動JMS connection
23         connection.start();
24         //通過connection創建JMS session
25         Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
26         //創建JMS destination
27         Destination destination = session.createTopic("noPersistenceTopic");
28         //創建JMS consumer
29         MessageConsumer consumer = session.createConsumer(destination);
30         
31         Message message = consumer.receive();
32         while(message != null){
33             TextMessage txtMsg = (TextMessage)message;
34             System.out.println("收到消息:"+txtMsg.getText());
35             message = consumer.receive();
36         }
37         //關閉所有的JMS資源
38         session.commit();
39         session.close();
40         connection.close();
41     }
42 }

 運行結果:

持久的Topic消息示例

 1.消息發送程序

  對于持久的Topic消息的發送方需要注意以下幾點:

    a.要用持久化訂閱,發送消息者要用DeliveryMode.PERSISTENT模式來發送

 1 import javax.jms.Connection;
 2 import javax.jms.ConnectionFactory;
 3 import javax.jms.DeliveryMode;
 4 import javax.jms.Destination;
 5 import javax.jms.MessageProducer;
 6 import javax.jms.Session;
 7 import javax.jms.TextMessage;
 8 
 9 import org.apache.activemq.ActiveMQConnectionFactory;
10 
11 /**
12  * 持久化Topic消息發送者
13  * @author Administrator
14  */
15 public class PersistenceSender {
16     public static void main(String[] args) throws Exception {
17         //創建一個JMS connection factory
18         ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.1.81:61616");
19         //通過connection factory來創建JMS connection
20         Connection connection = connectionFactory.createConnection();
21         //通過connection創建JMS session
22         Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
23         //創建JMS destination
24         Destination destination = session.createTopic("PersistenceTopic");
25         //創建JMS producer
26         MessageProducer producer = session.createProducer(destination);
27        producer.setDeliveryMode(DeliveryMode.PERSISTENT);
28         //啟動JMS connection
29         connection.start();
30         for(int i = 0;i < 10;i++){
31             TextMessage message = session.createTextMessage("message-"+i);
32             //發送message
33             producer.send(message);
34         }
35         //關閉所有的JMS資源
36         session.commit();
37         session.close();
38         connection.close();
39     }
40 }

 2.消息接收程序

  對于持久的Topic消息的接收方需要注意以下幾點:

    a.需要在連接上設置消費者id,用來識別消費者

    b.需要創建TopicSubscriber來訂閱

    c.一定要先運行一次該消費者程序,等于向消費服務中間件注冊這個消費者,然后再運行消息發送者來發送消息,這樣的話,無論消費者是否在線都會收到消息,如果不在線的話,則下次連接的時候會把沒有收過的消息都接收下來。

 1 import javax.jms.Connection;
 2 import javax.jms.ConnectionFactory;
 3 import javax.jms.Message;
 4 import javax.jms.Session;
 5 import javax.jms.TextMessage;
 6 import javax.jms.Topic;
 7 import javax.jms.TopicSubscriber;
 8 
 9 import org.apache.activemq.ActiveMQConnectionFactory;
10 
11 /**
12  * 持久化Topic消息接收者
13  * @author Administrator
14  *
15  */
16 public class PersistenceReceiver {
17     public static void main(String[] args) throws Exception {
18         //創建一個JMS connection factory
19         ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://120.76.123.81:61616");
20         //通過connection factory來創建JMS connection
21         Connection connection = connectionFactory.createConnection();
22         connection.setClientID("con1");
23         //通過connection創建JMS session
24         Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
25         //創建JMS destination
26         Topic destination = session.createTopic("PersistenceTopic");
27         //創建JMS consumer
28         TopicSubscriber ts = session.createDurableSubscriber(destination, "TT");
29         //啟動JMS connection
30         connection.start();
31         Message message = ts.receive();
32         while(message != null){
33             TextMessage txtMsg = (TextMessage)message;
34             session.commit();
35             System.out.println("收到消息:"+txtMsg.getText());
36             message = ts.receive(1000L);
37         }
38         //關閉所有的JMS資源
39         session.close();
40         connection.close();
41     }
42 }

關于持久化和非持久化消息

有兩種方式指定傳送模式:

  1.使用setDeliveryMode方法,這樣所有的消息都采用此傳送模式;如producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT)

  2.使用send方法為每條消息設置傳送模式

持久化消息

  這是ActiveMQ的默認傳送模式,此模式保證這些消息只被傳送一次和成功使用一次。對于這些消息,可靠性是優先考慮的因素。可靠性的另一個重要方面是確保持久性消息傳送至目標后,消息服務在向消費者傳送它們之前不會丟失這些消息。

  這意味著在持久性消息傳送至目標時,消息服務將其放入持久性數據存儲。如果消息服務由于某種原因導致失敗,它可以恢復此消息并將此消息傳送至相應的消費者。雖然這樣增加了消息傳送的開銷,但是卻增加了可靠性。

非持久化消息

  保證這些消息最多被傳送一次。對于這些消息,可靠性并非主要的考慮因素。此模式并不要求持久性的數據存儲,也不保證消息服務由于某種原因導致失敗后消息不會丟失。

  

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 


不含病毒。www.avast.com
arrow
arrow
    全站熱搜
    創作者介紹
    創作者 AutoPoster 的頭像
    AutoPoster

    互聯網 - 大數據

    AutoPoster 發表在 痞客邦 留言(0) 人氣()