Java開源項目:Spring Integration
采用spring integration開發一個健壯的消息傳送框架
先是總覽了Spring integration,它是一種便捷的事件驅動消息框架;你可以用它協同消息、通道、適配器、網關。接著介紹了如何利用Spring Integration實現ActvieMQ和JMS,隨后簡短地介紹了針對輕量級負載和重量級負載的多個應用工作流程。
Spring Integration作為一種企業級集成框架,遵從現代經典書籍《企業集成模式》,為開發者提供了一種便捷的實現模式。Spring Integration構建在Spring控制反轉設計模式之上,抽象了消息源和目標,利用消息傳送和消息操作來集成應用環境下的各種組件。采用Spring Integration構建的應用可以在組件之間發送消息,可以穿過一個消息總線,將該消息發送到應用環境中的另一個服務器,甚至是同一臺虛擬機的其它類中。
我會在Spring開源Java項目的 第二部分向大家介紹Spring Integration。首先總覽基于Spring Integration的事件驅動框架的各個組件,接著做一個簡單的開發,了解Spring Integration是如何工作的。最后向大家展示一個更加復雜的應用場景,即在此場景下借助JMS,集成組件并貫穿整個ActiveMQ消息總線。
事件驅動框架
事件驅動框架是企業級集成領域最重要最成功的模式之一,也是本文關注的重點。在事件驅動框架中,系統發布事件,接著系統中相應的組件就會監聽這些特定的事件、或者某種類型的事件。一旦某個感興趣的事件發生了,組件就會告警,并做出必要的響應。
事件驅動框架的優勢是耦合度很低、系統擴展性好,而且生產者無需關心消費者。這就使得在一個已存在或者舊系統中集成一個新的組件變得相對容易:該系統發布 事件,配置新組件用來監聽這些事件。所有事件驅動框架交互都是異步的,因此組件可以適時地處理這些消息。試想如果負載增加很大,一個組件處理某個消息可能 需要耗費更多的時間,但這是避免不了的事情。
(譯者注,為了閱讀的順暢性, 本文中的生產者和消費者均指的是message producers、 message consumers)
某個應用響應可能變慢,但本不應該如此。
Spring Integration所支持的事件驅動框架基于三個核心組件:
- 消息作為對象從一個組件傳遞給另一個組件;
- 通道用來傳遞消息,它們可以是同步或者異步的;
- 適配器調度一個通道輸出進入另一個通道的輸入中; </ul>
圖1展示了Spring Integration中消息、通道、適配器之間的關系。
Figure 1. Messages, channels, and adapters
圖1.消息、通道、適配器
注意,一旦組件1發送一個消息到指定通道,適配器會調度通道輸出到組件2中。最重要的是適配器指示,任何發送到此通道中的消息都應該定向到組件2中.
你好,Spring Integration!
沒有“Hello World”,Java 技術介紹都不算完整。本例中,我利用Spring Integration集成一個小程序,調度一個文本消息,從一個組件傳遞到另一個組件。通過這個練習,希望大家會對Spring Integration的消息、通道、適配器如何工作有一個更加清晰的認識。(也可以通過最新的Spring Integration Java文檔查看每個組件更多詳細信息。)
首先,列表1展示applicationContext.xml文件,它像膠水一樣將這三個應用組件整合在了一起。
列表1、applicationContext.xml
<beans:beans xmlns:beans="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd http://www.springframework.org/schema/integration ;<!-- Component scan to find all Spring components --> <context:component-scan base-package="com.geekcap.springintegrationexample" /> <!-- A Spring Integration channel --> <channel id="helloWorldChannel" /> <!-- A Spring Integration adapter that routes messages sent to the helloWorldChannel to the bean named "helloServiceImpl"'s hello() method --> <service-activator input-channel="helloWorldChannel" ref="helloServiceImpl" method="hello" />
</beans:beans></pre>
注意, <beans>節點定義了約束(schema)和XML命名空間,默認的上下文(xmlns),定義在 http://www.springframework.org/schema/integration中,無需為通道或者服務激活器節點指定前綴。(這 么做主要是為了XML更易讀,稍后我們切換到beans默認上下文,此時就需要為Spring Integration節點指定前綴。)
列表1定義了三個組件:
- 利用 component-scan,我們就可以在代碼中采用諸如@Service或者@Component等注解beans。一旦beans被注解,運行某個 組件掃描,Spring就能找到這些beans。component-scan節點會去掃描這個基礎包,進而掃描此包路徑下的所有子包。本例中,我們定 義,注釋了兩種bean:HelloService和GreeterService;
- HelloService bean打印“Hello,name”到標準輸出。GreeterService bean發送一個name到HelloService;
- helloWorldChannel是一個通道,代碼可以給它發送消息;
- service-activator是一個適配器,指示所有發送到helloWorldChannel的消息,都應轉發到helloServiceImpl的hello()方法中。注意,Spring默認bean名是類名,打頭的字母需要小寫。
</ol>列表2展示HelloService接口,接口并不是必需的,比如直接發送消息到某個bean就不需要接口。不過我們習慣了利用Spring定義接口,這樣到后面實現改動起來會比較容易。(采用接口,單元測試也變得更加容易。)
列表2 、HelloService.java
package com.geekcap.springintegrationexample.service;public interface HelloService { public void hello( String name ); }</pre>
HelloService接口定義了一個hello()方法,接受一個字符串參數。Spring可以非常智能地找到該方法和其參數簽名,并將該消息轉換成一個字符串值。
列表3展示了HelloServiceImpl類,該類實現了HelloService接口。
列表3、HelloServiceImpl.java
package com.geekcap.springintegrationexample.service;import org.springframework.stereotype.Service;
@Service public class HelloServiceImpl implements HelloService { @Override public void hello(String name) { System.out.println( "Hello, " + name ); } }</pre>
HelloServiceImpl實現了hello()方法,打印“Hello,name”到標準輸出。用@Service注解該類,這樣定義在 applicationContext.xml文件中的component-scan就可以找到它。注意,服務看起來很標準,沒有涉及某個Spring Integration action。
列表4展示了GreeterService,這是greeters需要實現的接口。
列表4、GreeterService.java
package com.geekcap.springintegrationexample.service;public interface GreeterService { public void greet( String name ); }</pre>
列表5展示了GreeterService的接口實現。
列表5、GreeterServiceImpl.java
package com.geekcap.springintegrationexample.service;import org.springframework.beans.factory.annotation.Autowired; import org.springframework.integration.MessageChannel; import org.springframework.integration.support.MessageBuilder; import org.springframework.stereotype.Service;
@Service public class GreeterServiceImpl implements GreeterService { @Autowired private MessageChannel helloWorldChannel;
@Override public void greet(String name) { helloWorldChannel.send( MessageBuilder.withPayload( name ).build() ); }
}</pre>
更多的代碼信息
GreeterServiceImpl類采用@Service注解,這樣Spring可以認定其為一個服務(service)。該類自動注入名為 helloWorldChannel的MessageChannel;只要此處的通道名和applicationContext.xml文件中定義的相 同,Spring就會找到此通道。如果你想重載此MessageChannel,你可以給MessageChannel注解@Qualifier,給此 MessageChannel一個channel bean名字。一旦GreeterServiceImpl’s greet()方法被調用,它會創建并發送一個消息給helloWorldChannel。
MessageChannel是一個接口,定義了兩種send()方法:一種用來接收超時,另一種不接受(取決于其實現是否為永久阻塞)。MessageBuilder類為創建者設計模式(Builder design pattern)一 種實現形式,幫助創建消息。本例中我們將一個字符串傳給MessageBuilder,字符串可以用來指定消息頭、過期日期時間、優先級、關聯ID、回復 和錯誤通道等等。一旦完成MessageBuilder配置,調用build()方法就會返回一個消息,此消息可以發送給任何一個通道。
Listing 6 shows the source code for a command-line application that pulls all of our code together.
列表6展示了某命令行應用源碼,自此該應用將之前所有的代碼串起來了。列表6、App.java
package com.geekcap.springintegrationexample.main;import com.geekcap.springintegrationexample.service.GreeterService; import org.springframework.context.support.ClassPathXmlApplicationContext;
/**
Main entry-point into the test application */ public class App { public static void main( String[] args ) {
ClassPathXmlApplicationContext applicationContext = new ClassPathXmlApplicationContext( "applicationContext.xml" ); GreeterService greeterService = applicationContext.getBean( "greeterServiceImpl", GreeterService.class ); greeterService.greet( "Spring Integration!" );
} }</pre>
App類從classpath路徑下加載applicationContext.xml文件,此文件位于src/main/resources路徑下,通 過Maven自動地將其嵌入到JAR文件中。接下來,App從application context獲取greeterServiceImpl bean,最后調用GreeterService的greet()方法。
A Spring Integration
圖2展示了圖1的Spring Integration示例圖在本例中的具體實現。
圖2、你好,Spring Integration!
![]()
下面是集成應用流程:
- App類調用了GreeterService的greet()方法,把字符串傳給”Spring Integration!”給greet();
- GreeterService 用wired注入一個名為helloWorldChannel的MessageChannel。此通道利用一個MessageBuilder創建一個消 息,此消息包含”Spring Integration!”字符串,然后將此消息發送給MessageChannel;
- 配置的service-activator負責將任何發送到helloWorldChannel的消息調度到HelloService的hello();
- HelloServiceImpl類的hello()方法調用后,”Hello, Spring Integration!”就會打印到屏幕上。
列表7展示Maven pom.xml文件構建這個應用示例:
列表7、你好,Spring Integration的Maven POM文件
<project xmlns="<modelVersion>4.0.0</modelVersion>
<groupId>com.geekcap.javaworld</groupId> <artifactId>HelloSpringIntegration</artifactId> <version>1.0-SNAPSHOT</version> <packaging>jar</packaging>
<name>HelloSpringIntegration</name> <url>http://maven.apache.org</url>
<properties>
<spring.version>3.2.1.RELEASE</spring.version> <spring.integration.version>2.2.5.RELEASE</spring.integration.version> <java.version>1.6</java.version> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<!-- Spring Dependencies --> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-context</artifactId> <version>${spring.version}</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-core</artifactId> <version>${spring.version}</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-beans</artifactId> <version>${spring.version}</version> </dependency> <!-- Spring Integration --> <dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-core</artifactId> <version>${spring.integration.version}</version> </dependency> <!-- Testing --> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.11</version> <scope>test</scope> </dependency>
</dependencies>
<build>
<plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <configuration> <source>${java.version}</source> <target>${java.version}</target> </configuration> </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-jar-plugin</artifactId> <configuration> <archive> <manifest> <addClasspath>true</addClasspath> <classpathPrefix>lib/</classpathPrefix> <mainClass>com.geekcap.springintegrationexample.main.App</mainClass> </manifest> </archive> </configuration> </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-dependency-plugin</artifactId> <executions> <execution> <id>copy</id> <phase>install</phase> <goals> <goal>copy-dependencies</goal> </goals> <configuration> <outputDirectory>${project.build.directory}/lib</outputDirectory> </configuration> </execution> </executions> </plugin> </plugins> <finalName>hello-spring-integration</finalName>
</build> </project></pre>
POM文件引入Spring核心、上下文、bean依賴,同時引入了特定的Spring Integration依賴,并且定義了三個插件:
- maven-compiler-plugin指示Spring采用Java1.6進行構建。
- maven- jar-plugin指示Spring包含JAR文件classpath路徑下lib目錄中的所有文件。同樣它會指導Spring為此JAR文件生成名為 com.geekcap.springintegrationexample.main.App的main-class。只要對此文件執行Java -jar命令,該類就會執行。
- maven-dependency-plugin指示Maven將所有的項目(project)依賴拷貝到target/lib目錄中。
可以通過下面的命令構建此項目:
mvn clean install正確配置以后,target目錄就會執行源代碼:
java -jar hello-spring-integration.jarSpring logger 會輸出幾行日志,緊隨其后的是本程序的輸出:
Hello, Spring Integration!注意,采用log4j.properties文件配置的Spring logger日志相對簡潔,此properties文件改變了Spring組件中的日志級別。
Spring Integration集成網關代理
除了發送消息,有時還需要做出回應。倘若你想執行某個服務的某個方法
,并做出回應,那么你可以使用網關代理。目前先介紹這些,稍后我會做出解釋。首先,創建一個通道和一個服務激活器(service-activator),就像我們前面在列表1中做的那樣,不過這次還需要添加一個網關節點。
列表8 、New channel和service-activator
<!-- A Spring Integration channel for use by our gateway --> <channel id="helloWorldWithReplyChannel" /><!-- A Spring Integration adapter that routes messages sent to the helloWorldChannel to the bean named "helloServiceImpl"'s getHelloMessage() method --> <service-activator input-channel="helloWorldWithReplyChannel" ref="helloServiceImpl" method="getHelloMessage" />
<!-- Define a gateway that we can use to capture a return value --> <gateway id="helloWorldGateway" service-interface="com.geekcap.springintegrationexample.service.HelloService" default-request-channel="helloWorldWithReplyChannel" /></pre>
網關是由一個接口定義的,在本例中該接口是由HelloService實現的。它定義了一個可用的默認請求/輸入通道。服務激活器(service-activator)調用一個新方法getHelloMessage(),而非hello()。新方法返回”Hello, NAME”。
定義網關之后,就可以像列表5那樣,將其自動注入到GreeterService。這次沒有注入通道,而是注入網關,如列表9其類型為HelloService。
列表9 、Updated GreeterServiceImpl.java
package com.geekcap.springintegrationexample.service;import org.springframework.beans.factory.annotation.Autowired; import org.springframework.integration.MessageChannel; import org.springframework.integration.support.MessageBuilder; import org.springframework.stereotype.Service;
@Service public class GreeterServiceImpl implements GreeterService { @Autowired private MessageChannel helloWorldChannel;
@Autowired private HelloService helloWorldGateway; @Override public void greet(String name) { helloWorldChannel.send(MessageBuilder.withPayload(name).build()); } @Override public void greet2(String name) { System.out.println( helloWorldGateway.getHelloMessage( name ) ); }
}</pre>
更新后的GreeterServiceImpl類自動注入了helloWorldGateway,后者會在applicationContext.xml 文件中被解析,它的類型為HelloService;新寫入的方法greet2()會調用此helloWorldGateway,就像在調用 HelloService。從方法的角度看,greet2()僅僅調用了一個HelloService,此接口就如同一個服務bean:方法無需知曉 Spring Integration所涉及的具體事務。
列表10是更新后的App類,此類會調用新寫入的greet2()方法.
列表10、更改后的 App.java
package com.geekcap.springintegrationexample.main;import com.geekcap.springintegrationexample.service.GreeterService; import org.springframework.context.support.ClassPathXmlApplicationContext;
/**
Main entry-point into the test application */ public class App { public static void main( String[] args ) {
ClassPathXmlApplicationContext applicationContext = new ClassPathXmlApplicationContext( "applicationContext.xml" ); GreeterService greeterService = applicationContext.getBean( "greeterServiceImpl", GreeterService.class ); greeterService.greet( "Spring Integration!" ); greeterService.greet2( "Spring Integration (with response)!");
} }</pre>
略去Spring的日志部分,此段代碼的輸出結果如下:
Hello, Spring Integration! Hello, Spring Integration (with response)!重構后的新輪子
這背后發生了什么?這里我們對重構應用做一個總結:
- App類調用GreeterService的greet2()方法;
- greet2()調用getHelloMessage()方法,此方法被認為是一個HelloService方法(確切的說它是一個網關);
- 網關實現了HelloService接口中的方法,該方法通過helloWorldWithReplyChannel通道對請求進行調度;
- 通過配置服務激活器(service-activator),任何發給helloWorldWithReplyChannel的消息都會傳給helloServiceImpl的getHelloMessage()方法;
- helloServiceImpl構造了本次響應并將其返回;
- 服務激活器(service-activator)通過下面任意一種方式處理響應:一種是定義在服務激活器中的輸出通道(output-channel),另一種是定義在消息頭重的回復通道。網關自動創建一個臨時的匿名、點對點回復通道;監聽此通道,并將其添加到消息的replyHeader中;
- 通道接受這個響應,將其作為消息,通過回復通道,將其轉換成服務中定義的返回值;
- 最后網關返回響應給調用者,如本例的GreeterServiceImpl。
Spring Integration集成JMS和ActiveMQ
到目前為止,通過傳遞消息我們抽象了一個服務調用。同樣,我們在一個網關和通道后面隱藏了一個服務調用。以上兩部分給了我們充足的信息,從概念上去理解 Spring Integration是如何工作的。接下來我們利用這些知識來拓展一個更加真實的企業級應用場景,創建通道,讓其與某個運行在ActiveMQ上的JMS(Java Message Service)主題進行通信。ActiveMQ是一個支持JMS API的開源消息代理,它由Java語言寫成,遵循apache協議,可免費使用。
JMS定義了兩種消息傳遞方式:主題和隊列。主題采用發布訂閱方式進行運作,而隊列采用點對點方式運作。發布訂閱范例指的是一個消息生成者發布一個消息后,零個或者多個消費者接受這些消息。點對點范例指的是某個消息生產者發布一個消息,就有一個對應的消費者。用隊列做兩個組件之間異步通信是很棒的,而主題特別適合事件驅動框架。
基于事件驅動框的應用要點是將消息生產者與其消費者解耦,方式和發布訂閱范例一樣;利用主題范例,一個組件通知變化或者更新,但僅有訂閱了主題的消費者會接收這些消息。生產者并不知道消費者是誰,也無需關心 。這是一個很好的松耦合例子。
為了展示企業級系統中發布訂閱消息方式,我們會構建兩個組件:
- PublisherService:一個發布消息給主題的組件;
- MessageListener:一個可以訂閱主題,并接收消息的組件。
為了將消息傳給一個RESTful服務-(作為一種發布消息到主題的方式,)我們需要圍繞PublisherService構建基礎組件。圖3顯示了各個組件之間是如何交互的,接下來的是具體細節。
圖3、 JMS Spring Integration示例
![]()
下面是圖3具體流程:
- REST客戶端發布消息到MessageController,此Controller為Spring MVC Controller
- MessageController調用PublishService的 send() 方法發布一個消息到topicChannel
- 配置一個JMS 輸出 channel適配器將topicChannel中的消息調度到topic.myTopic
- ConnectionFactory中定義了JMS配置文件
- 配置一個JMS消息驅動通道適配器用來監聽topic.myTopic主題,并發送消息到listenerChannel
- 配置一個service-activator調度listenerChannel中的消息到messageListenerImpl的processMessage()方法中
- messageListenerImpl類接收這些消息,并對其進行處理,顯示在屏幕上
本應用的配置在列表11中展示。
列表11、springintegrationexample-servlet.xml (Application Context)
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xmlns:int="http://www.springframework.org/schema/integration" xmlns:int-jms="http://www.springframework.org/schema/integration/jms" xmlns:oxm="http://www.springframework.org/schema/oxm" xmlns:int-jme="http://www.springframework.org/schema/integration" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-2.5.xsd http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd http://www.springframework.org/schema/integration/jms http://www.springframework.org/schema/integration/jms/spring-integration-jms.xsd http://www.springframework.org/schema/oxm http://www.springframework.org/schema/oxm/spring-oxm-3.0.xsd">
<!-- Component scan to find all Spring components --> <context:component-scan base-package="com.geekcap.springintegrationexample" />
<bean>
<property name="order" value="1" /> <property name="messageConverters"> <list> <!-- Default converters --> <bean/> <bean/> <bean /> <bean/> <bean/> <bean /> </list> </property>
</bean>
<!-- Define a channel to communicate out to a JMS Destination --> <int:channel id="topicChannel"/>
<!-- Define the ActiveMQ connection factory --> <bean id="connectionFactory">
<property name="brokerURL" value="tcp://localhost:61616"/>
</bean>
<!--
Define an adaptor that route topicChannel messages to the myTopic topic; the outbound-channel-adapter automagically finds the configured connectionFactory bean (by naming convention)
--> <int-jms:outbound-channel-adapter channel="topicChannel"
destination-name="topic.myTopic" pub-sub-domain="true" />
<!-- Create a channel for a listener that will consume messages--> <int:channel id="listenerChannel" />
<int-jms:message-driven-channel-adapter id="messageDrivenAdapter"
channel="listenerChannel" destination-name="topic.myTopic" pub-sub-domain="true" />
<int:service-activator input-channel="listenerChannel" ref="messageListenerImpl" method="processMessage" />
</beans></pre>
列表11的最開始為Spring MVC建立了AnnotationMethodHandlerAdapter,這個超越了本文的范疇。你需要掌握的是,消息轉換器展示的內容由Spring Controllers返回。本例中最重要的部分如下:
- connectionFactory定義了ActiveMQ連接參數,ActiveMQ安裝在本機上,默認配置,端口號為61616。
- topicChannel定義了一個用來發布消息的通道(如下)。
- outbound-channel-adapter定 義在Spring Integration JMS命名空間中,將topicChannel中的消息發布到topic.myTopic,并將其設置為一個主題范例。pub-sub-domain設置 為true意味著此為主題范例,設置為false則為隊列范例;需要注意的是,outbound-channel-adapter通過connectionFactory的bean名找到ActiveMQ配置。
- listenerChannel定義了一個用來處理消息的channel。
- message-driven-channel-adapter定義了一個適配器,用來監聽topic.myTopic主題中的消息,并將其派發給listenerChannel。
- service-activator再將listenerChannel中的消息派發給messageListenerImpl的processMessage()方法。
</ul>發布:MessageController.java
你可以下載源碼獲取所有文件。我僅僅回顧一下重點。首先,列表12展示了MessageController類,用來消息發布。
列表12、 MessageController.java
package com.geekcap.springintegrationexample.web;import com.geekcap.springintegrationexample.service.PublishService; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.http.HttpStatus; import org.springframework.stereotype.Controller; import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestMethod; import org.springframework.web.bind.annotation.ResponseBody;
import javax.servlet.http.HttpServletResponse;
@Controller public class MessageController { @Autowired private PublishService publishService;
@RequestMapping( value = "/message", method = RequestMethod.POST ) @ResponseBody public void postMessage( @RequestBody com.geekcap.springintegrationexample.model.Message message, HttpServletResponse response ) { // Publish the message publishService.send( message ); // Set the status to 201 because we created a new message response.setStatus( HttpStatus.CREATED.value() ); }
}</pre>
MessageController是一個Spring MVC controller,實現了RESTful web service處理POST到 /message 資源。要求一 個JSON對象,后面我們會提到,Spring MVC最自動將其轉換為一個com.geekcap.springintegrationexample.model.Message。 MessageController將此message傳給PublishService,后者調用send()方法,將Message傳遞給調用者。
列表13展示了PublishServiceImpl類:
列表13、PublishServiceImpl.java
package com.geekcap.springintegrationexample.service;import com.geekcap.springintegrationexample.model.Message; import org.apache.log4j.Logger; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.integration.MessageChannel; import org.springframework.integration.message.GenericMessage; import org.springframework.integration.support.MessageBuilder; import org.springframework.stereotype.Service;
@Service public class PublishServiceImpl implements PublishService { private static final Logger logger = Logger.getLogger( PublishServiceImpl.class );
@Autowired private MessageChannel topicChannel; @Override public void send( Message message ) { logger.info( "Sending message to message channel: " + message ); topicChannel.send( MessageBuilder.withPayload( message.toString() ).build() ); }
}</pre>
PublishServiceImpl類注入了topicChannel,topicChannel由application context創建;借助MessageBuilder類發送一個字符串類型的Message。這些都是發布一個消息到ActiveMQ主題所必需的。重 點是,Spring Integration利用outbound-channel-adapter查找connectionFactory,最終找到主題目標 topic.myTopic,并且將消息發送給它。
訂閱:MessageListenerImpl
在操作的另一端,列表14展示了MessageListenerImpl類的源碼:
列表14 、MessageListenerImpl.java
package com.geekcap.springintegrationexample.listener;import org.apache.log4j.Logger; import org.springframework.integration.annotation.ServiceActivator; import org.springframework.stereotype.Service;
@Service public class MessageListenerImpl { private static final Logger logger = Logger.getLogger( MessageListenerImpl.class );
public void processMessage( String message ) { logger.info( "Received message: " + message ); System.out.println( "MessageListener::::::Received message: " + message ); }
}</pre>
MessageListenerImpl類作為一個服務,定義了一個方法:processMessage(String)。定義在application context中的message-driven-channel-adapter,負責監聽發布在topic.myTopic主題中的消息,并將其派發 給listenerChannel。service-activator再將listenerChannel中的消息派發給 messageListenerImpl類的processMessage(String)方法。正如你所看到 的,MessageListenerImpl對于如何回應JMS 消息一無所知,所有的細節都交由application context配置文件處理。
構建和運行應用
此項目的POM文件如列表15所示
列表15 、pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0 ; <modelVersion>4.0.0</modelVersion> <groupId>com.geekcap</groupId> <artifactId>spring-integration-example</artifactId> <packaging>war</packaging> <version>1.0-SNAPSHOT</version> <name>spring-integration-example Maven Webapp</name> <url>http://maven.apache.org</url><properties> <spring.version>3.2.1.RELEASE</spring.version> <spring.integration.version>2.2.5.RELEASE</spring.integration.version> <servlet-api.version>2.5</servlet-api.version> <java.version>1.6</java.version> <jackson.version>1.9.12</jackson.version> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> </properties> <dependencies> <!-- Spring Dependencies --> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-context</artifactId> <version>${spring.version}</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-core</artifactId> <version>${spring.version}</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-web</artifactId> <version>${spring.version}</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-webmvc</artifactId> <version>${spring.version}</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-beans</artifactId> <version>${spring.version}</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-oxm</artifactId> <version>${spring.version}</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-orm</artifactId> <version>${spring.version}</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-jms</artifactId> <version>${spring.version}</version> </dependency> <dependency> <groupId>javax.servlet</groupId> <artifactId>servlet-api</artifactId> <version>${servlet-api.version}</version> <scope>provided</scope> </dependency> <!-- Spring Integration--> <dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-core</artifactId> <version>${spring.integration.version}</version> </dependency> <dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-jms</artifactId> <version>${spring.integration.version}</version> </dependency> <!-- Logging: Log4J --> <dependency> <groupId>commons-logging</groupId> <artifactId>commons-logging</artifactId> <version>1.1.1</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> <version>1.5.8</version> </dependency> <dependency> <groupId>log4j</groupId> <artifactId>log4j</artifactId> <version>1.2.15</version> <exclusions> <exclusion> <groupId>javax.mail</groupId> <artifactId>mail</artifactId> </exclusion> <exclusion> <groupId>javax.jms</groupId> <artifactId>jms</artifactId> </exclusion> <exclusion> <groupId>com.sun.jdmk</groupId> <artifactId>jmxtools</artifactId> </exclusion> <exclusion> <groupId>com.sun.jmx</groupId> <artifactId>jmxri</artifactId> </exclusion> </exclusions> </dependency> <!-- Include Jackson so that we can render JSON responses --> <dependency> <groupId>org.codehaus.jackson</groupId> <artifactId>jackson-jaxrs</artifactId> <version>1.6.1</version> </dependency> <!-- Include ActiveMQ --> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-core</artifactId> <version>5.7.0</version> </dependency> <!-- JUnit --> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.11</version> <scope>test</scope> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <configuration> <source>${java.version}</source> <target>${java.version}</target> </configuration> </plugin> </plugins> <finalName>spring-integration-example</finalName> </build>
</project></pre>
應用依賴
本例包括以下依賴:
- Core Spring
- Spring MVC
- Spring JMS
- Spring Integration
- Spring Integration JMS
</ul>你需要給集成后的組件添加額外的依賴,其中的一些配置可以有多個選項,但它們都可以整合到通道和適配器中。
部署Tomcat和ActiveMQ
接下來,利用下面的命令構建此應用:
mvn clean install可以從Apache Tomcat網站下載Tomcat,在本地解壓,執行Tomcat bin目錄下的startup.sh或者startup.bat文件,啟動tomcat。將最終的WAR文件拷貝到Tomcat webapps目錄下方便部署到tomcat中。也可以從Apache ActiveMQ網站下載ActiveMQ,在本地解壓,執行bin目錄下的命令:
./activemq start或者window操作系統:
activemq start你可以執行activemq stop停止ActiveMQ,執行shutdown.sh或者shutdown.bat關閉Tomcat。建議先啟動ActiveMQ,這樣本例中的應用就能夠連接到topic.myTopic主題,并開始消息監聽。
整合應用和Spring Integration
大家已經了解消息、通道、適配器、網關如何一起工作,從消息生產者到消息消費者去抽象消息;了解ActiveMQ 和 JMS如何集成到Spring Integration中。Spring Integration最強大的地方,當開始思考構建模塊和復用服務,你就可以用Spring Integration整合application工作流。比如,一個事件驅動應用既可以發布一個輕量級負載消息(鑒別某個系統,該系統產生受影響資源的事件和ID),也可以發布一個重量級負載消息(包含所有修改資源的內容)。
使用輕量級負載的好處是,倘若資源經常改變,無需擔心重量級負載內容:回到系統記錄–其實是一種資源,尋找最新的改動資源版本。缺點是每一次系統產生一個事件,所有監聽器都會調用系統回復,這樣很可能會增加負載。因此,折衷的辦法是系統產生一個重量級負載,禁止監聽器回掉。倘若系統同一資源產生多個事件,其中一些負載可能過期,調節性事件負載落到監聽器上。
我們可以配置Spring Integration結合入站的、出站通道和模塊組件去處理這兩種場景。一個具體的例子,前面例子中定義的監聽器,這里采用Spring Integration配置此監聽器:
<!-- Create a channel for a listener that will consume messages--> <int:channel id="listenerChannel" /><int-jms:message-driven-channel-adapter id="messageDrivenAdapter" channel="listenerChannel" destination-name="topic.myTopic" pub-sub-domain="true" /> <int:service-activator input-channel="listenerChannel" ref="messageListenerImpl" method="processMessage" /></pre> <p>從理論上講,負載已經存在,并傳遞給了messageListenerImpl的processMessage()方法。倘若沒有傳遞過去會怎樣?倘若發送給topic.myTopic的消息僅僅是改動資源的ID會怎樣?這取決于系統回掉以及獲取負載?重寫邏輯?幸運的是,這些都不需要。我們可以定義另外一個bean來獲取這些負載,派發消息到這個bean,并將此bean的響應傳給messageListenerImpl bean。 </p>
列表16展示了newRetrievePayloadImpl類的源碼。
列表16、 RetrievePayloadImpl.java
package com.geekcap.springintegrationexample.service;import org.springframework.stereotype.Service;
@Service public class RetrievePayloadServiceImpl implements RetrievePayloadService { @Override public String getPayload(String id) { // Go back to the SOR and retrieve the payload for the specified id ... return "Payload for " + id; } }</pre>
這個類理論上會返回到系統記錄(SOR),通過特定的ID為此組件獲取負載并返回。本例中,僅僅返回字符串:”Payload for …”
現在我們在message-driven-channel和messageListenerImpl之間注入這個新的服務,如列表17。
列表17 、Updated applicationContext.xml file with the RetrievePayloadService
<!-- Create a channel for a listener that will consume messages--> <int:channel id="listenerChannel" /><int-jms:message-driven-channel-adapter id="messageDrivenAdapter" channel="getPayloadChannel" destination-name="topic.myTopic" pub-sub-domain="true" /> <int:service-activator input-channel="listenerChannel" ref="messageListenerImpl" method="processMessage" /> <int:channel id="getPayloadChannel" /> <int:service-activator input-channel="getPayloadChannel" output-channel="listenerChannel" ref="retrievePayloadServiceImpl" method="getPayload" /></pre> <p>不再直接將message-driven-channel-adapter傳給listenerChannel, 相反地將其傳給getPayloadChannel。接著getPayloadChannel調用retrievePayloadServiceImpl的getPayload()方法,將其輸出傳給listenerChannel,后者將增加后的負載傳給messageListenerImpl的processMessage()方法。通過配置,我們可以讓MessageListenerImpl類獨自完成所有這些工作,通過其它服務傳遞這些消息。 </p>
需要注意的是,一旦服務器暴露了某一個RESTful接口,那么RetrievePayloadService會被HTTP出站網關完整地取代。比如,列表18展示了某個項目配置,從一個Message對象中獲取”ResourceLink” ,并將其作為HTTP請求的一部分。
列表18、利用一個HTTP輸出網關回調一個服務
<int:channel id="createEntityChannel" /> <int-http:outbound-gateway request-channel="createGuestChannel" url="http://localhost:8080{link}" http-method="GET" expected-response-type="com.mycompany.model.Entity" reply-channel="transformEntityChannel" > <int-http:uri-variable name="link" expression="payload.getResourceLink()" /> </int-http:outbound-gateway>列表18僅僅是applicationContext.xml文件很小的一部分,但足以說明一個消息如何被發送到 createEntityChannel,接著被傳遞給某HTTP出站網關,后者進而從http://localhost:8080/link獲取 com.mycompany.model.Entity,接著將其傳遞給transformEntityChannel。Spring Integration提供了一組豐富的適配器和網關,這樣你就可以專注于業務,不用關心如何寫代碼去調用這些服務、發布消息、讀取主題等等。
結語
Spring Integration有助于解決企業級整個問題,它實現了Enterprise Integration Patterns書中定義的設計模式。包括將消息消費者從消息生成者那里剝離的異步消息范例。不是方法直接調用,而是由Spring Integration發送一個消息到某個通道。一個適配器或者網關管理這個通道,并將此消息發送到恰當的目的地,不管目的地是運行在相同虛擬機上的其它服務,還是在某個企業級服務總線上的其它數據中心運行的服務。
在Java開源項目中,我定義了消息、通道、適配器、網關。接著展示了如何利用Spring Integration從一個組件到另一個組件傳遞消息,如何處理響應,如何利用JMS和ActiveMQ作為消息總線集成這些組件。最后,展示了如何通過寫模塊組件,定義通道控制消息流來整合應用。
本文僅僅是對Spring Integration做了一些初級的介紹。JMS只是適配器的一種,Spring Integration同樣支持其它類型的適配器。比如利用Spring Integration借助郵件、文件系統、web service調用、推特等傳遞消息。關鍵是只有確保消息發送給正確的組件,才能得到你想要的結果。
原文鏈接: javaworld 翻譯: ImportNew.com - 喬永琪
譯文鏈接: http://www.importnew.com/16538.html