AcitveMQ編程實踐

jopen 12年前發布 | 2K 次閱讀 Open Camera

本文主要介紹了ActiveMQ的編程模型,常用的類。以及一個通用的MQ編程模式。方便初學者快速掌握 ActiveMQ的編程方法。

一、             開發 JMS的步驟

一個 JMS 應用是幾個 JMS 客戶端交換消息,開發 JMS 客戶端應用由以下幾步構成:

1、用 JNDI 得到 ConnectionFactory 對象;

2、用 ConnectionFactory 創建 Connection 對象;

3、用 Connection 對象創建一個或多個 JMS Session

4、用 JNDI 得到目標隊列或主題對象,即 Destination 對象;

5、用 Session Destination 創建 MessageProducer MessageConsumer

6、通知 Connection 開始傳送消息。

 

二、             編程模型

1、  ConnectionFactory

要初始化 JMS,則需要使用連接工廠。客戶端通過創建 ConnectionFactory建立到 ActveMQ的連接,一個連接工廠封裝了一組連接配置參數,這組參數在配置ActiveMQ時已經定義,例如brokerURL參數,此參數傳入的是ActiveMQ服務地址和端口,支持openwire協議的默認連接為 tcp://localhost:61616,支持 stomp協議的默認連接為 tcp://localhost:61613

ActiveMQConnectionFactory構造方法:

ActiveMQConnectionFactory();

ActiveMQConnectionFactory(String brokerURL);

ActiveMQConnectionFactory(String userName, String password, String b rokerURL) ;

ActiveMQConnectionFactory(String userName, String password, URI brok erURL) ;

ActiveMQConnectionFactory(URI brokerURL);

其中 brokerURLActiveMQ服務地址和端口。

例如:

ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192. 168.0.135:61616");

或者

ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();

connectionFactory. setBrokerURL("tcp://192.168.0.135:61616");

 

</td> </tr> </tbody> </table>

 

2Connection

在成功創建正確的ConnectionFactory后,下一步將是創建一個連接,它是JMS定義的一個接口。ConnectionFactory負責返回可以與底層消息傳遞系統進行通信的Connection實現。通常客戶端只使用單一連接。根據JMS文檔,Connection的目的是“利用 JMS提供者封裝開放的連接”,以及表示“客戶端與提供者服務例程之間的開放TCP /IP套接字”。該文檔還指出 Connection應該是進行客戶端身份驗證的地方,除了其他一些事項外,客戶端還可以指定惟一標志符。

當一個Connection被創建時,它的傳輸默認是關閉的,必須使用start方法開啟。一個Connection可以建立一個或多個的Session。當一個程序執行完成后,必須關閉之前創建的Connection,否則 ActiveMQ不能釋放資源,關閉一個Connection同樣也關閉了 SessionMessageProducerMessageConsumer

Connection支持并發。

2.1、創建Connection

  • ActiveMQConnectionFactory方法:

    Connection createConnection()

    Connection createConnection(String userName, String password);

    </td> </tr> </tbody> </table>

     

    2.2、開啟 Connection

  • void start();

    如:connection.start();

    </td> </tr> </tbody> </table>

     

    2.3關閉 Connection

    void close();

    如:connection.close();

    </td> </tr> </tbody> </table>

     

    3、                Session

    一旦從ConnectionFactory中獲得一個Connection,就必須從Connection中創建一個或者多個SessionSession是一個發送或接收消息的線程,可以使用Session創建 MessageProducerMessageConsumerMessage

    Session可以被事務化,也可以不被事務化。通常可以通過向Connection上的適當創建方法傳遞一個布爾參數對此進行設置。

    Session createSession(boolean transacted, int acknowledgeMode);

    其中transacted為使用事務標識,acknowledgeMode為簽收模式。

    如:Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

    </td> </tr> </tbody> </table>

     

    4、                Destination

    Destination是一個客戶端用來指定生產消息目標和消費消息來源的對象。

    PTP模式中,Destination被稱作Queue即隊列;在Pub/Sub模式,Destination被稱作Topic即主題。在程序中可以使用多個QueueTopic

    Queue createQueue(String queueName);

    TemporaryQueue createTemporaryQueue();

    Topic createTopic(String topicName);

    TemporaryTopic createTemporaryTopic();

    如: Destination destination = session.createQueue("TEST.FOO");

    </td> </tr> </tbody> </table>

     

    5、                MessageProducer

    MessageProducer是一個由Session創建的對象,用來向Destination發送消息。

    5.1、創建MessageProducer

  • MessageProducer createProducer(Destination destination);

    如:MessageProducer producer = session.createProducer(destination);

    </td> </tr> </tbody> </table>

     

    5.2 發送消息

    void send(Destination destination, Message message);

    void send(Destination destination, Message message, int deliveryMode, in tpriority, long timeToLive);

    void send(Message message);

    void send(Message message, int deliveryMode, int priority, long timeToLive);

    其中deliveryMode為傳送模式,priority為消息優先級,timeToLive為消息過期時間。

    如:producer.send(message);

    </td> </tr> </tbody> </table>

     

    6、                MessageConsumer

    MessageConsumer是一個由Session創建的對象,用來從Destination接收消息。

    6.1、創建MessageConsumer

  • MessageConsumer createConsumer(Destination destination);

    MessageConsumer createConsumer(Destination destination, String messageSelector);

    MessageConsumer createConsumer(Destination destination, String messageSelector, boolean noLocal);

    TopicSubscriber createDurableSubscriber(Topic topic, String name);

    TopicSubscriber createDurableSubscriber(Topic topic, String name, String messageSelector, boolean noLocal);

    其中messageSelector為消息選擇器;noLocal標志默認為false,當設置為true時限制消費者只能接收和自己相同的連接(Connection)所發布的消息,此標志只適用于主題,不適用于隊列;name標識訂閱主題所對應的訂閱名稱,持久訂閱時需要設置此參數。

    如:MessageConsumer consumer = session.createConsumer(destination);

    </td> </tr> </tbody> </table>

     

    6.2、消息的同步與異步接收

    消息的同步接收是指客戶端主動去接收消息,客戶端可以采用MessageConsumerreceive方法去接收下一個消息。

    消息的異步接收是指當消息到達時,ActiveMQ主動通知客戶端。客戶端可以通過注冊一個實現 MessageListener接口的對象到MessageConsumerMessageListener只有一個必須實現的方法onMessage,它只接收一個參數,即Message。在為每個發送到Destination的消息實現onMessage時,將調用該方法。

    Message receive()

    Message receive(long timeout)

    Message receiveNoWait()

    其中timeout為等待時間,單位為毫秒。

    或者實現MessageListener接口,每當消息到達時,ActiveMQ會調用MessageListener中的

    onMessage函數。

    如:Message message = consumer.receive();

    </td> </tr> </tbody> </table>

     

    6.3、消息選擇器

    JMS提 供了一種機制,使用它,消息服務可根據消息選擇器中的標準來執行消息過濾。生產者可在消息中放入應用程序特有的屬性,而消費者可使用基于這些屬性的選擇標 準來表明對消息是否感興趣。這就簡化了客戶端的工作,并避免了向不需要這些消息的消費者傳送消息的開銷。然而,它也使得處理選擇標準的消息服務增加了一些 額外開銷。

    消息選擇器是用于MessageConsumer的過濾器,可以用來過濾傳入消息的屬性和消息頭部分(但不過濾消息體),并確定是否將實際消費該消息。按照JMS文檔的說法,消息選擇器是一些字符串,它們基于某種語法,而這種語法是SQL-92的子集。可以將消息選擇器作為 MessageConsumer創建的一部分。

    如:public final String SELECTOR = “JMSType = ‘TOPIC_PUBLISHER’”;

    該選擇器檢查了傳入消息的JMSType</span>屬性,并確定了這個屬性的值是否等于 TOPIC _PUBLISHER</span></span>。如果相等,則消息被消費;如果不相等,那么消息會被忽略。</span>

     本文由用戶 jopen 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
     轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
     本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!

    推薦閱讀

    AcitveMQ編程實踐

    本文主要介紹了ActiveMQ的編程模型,常用的類。以及一個通用的MQ編程模式。方便初學者快速掌握 ActiveMQ的編程方法。 一、 開發 JMS 的步驟 一個 JMS 應用是幾個 JMS 客...

    學編程關鍵在動手,提高在實踐

    作為一個菜鳥,東東有一些學習編程的體會,與大家一起分享,讓我們學習的路上少走些彎路。東東在之前的學習中走過不少彎路,先談一些體會(誤區)吧: 體會(誤區)之一:重理論,輕實踐 東東在PHP上花過...

    求教編程的高手

    #include <stdio.h> #include <conio.h> #include <stdarg.h> char buffer[80]="30 90.0 abc"; int vssf...

    簡介Scala編程語言

    Scala編程語言近來抓住了很多開發者的眼球。如果你粗略瀏覽Scala的網站,你會覺得Scala是一種純粹的面向對象編程語言,而又無縫地結合了命令式和函數式的編程風格。 Christopher ...
  • sesese色