Spring與Akka的集成
概述
近年來隨著Spark的火熱,Spark本身使用的開發語言Scala、用到的分布式內存文件系統Tachyon(現已更名為Alluxio)以及基于Actor并發編程模型的Akka。了解過Akka或者Actor的人應該知道,這的確是一個很不錯的框架,按照Akka官網的描述——使用Akka使得構建強有力的并發與分布式應用將更加容易。由于歷史原因,很多Web系統在開發分布式服務時首先會選擇RMI(Remote Method Invoke ,遠程方法調用)、RPC(Remote Procedure Call Protocol,遠程過程調用)或者使用JMS(Java Messaging Service,Java消息服務)。
但是使用RMI只能使用java語言,而且開發、執行效率都不高;RPC框架雖然可以通過匹配方法簽名的方式比RMI更靈活,但是其存在調用超時、調用丟失等缺點;JMS方式雖然可以通過At Least Delivery Once、消息持久化等機制保證消息不會丟失,但是只能作為一種跨服務的生產者、消費者編程模型使用。Akka不但處理了以上問題,而且還可以使用Actor作為并發編程模型,減少java多線程編程的阻塞、調度、上下文開銷甚至死鎖等問題。此外,Akka還提供了集群Sharding、流處理等功能的支持,更易于實現有限狀態自動機等功能。所以有心的開發者勢必會關心如何在最常見的Java系統中使用它,如何與Spring集成?
本文參考Akka官方使用文檔,根據自身的經驗和理解,提供Akka與Spring集成的方案。本文不說明Spring框架的具體使用,并從Spring已經配置完備的情況開始敘述。
Actor系統——ActorSystem
什么是ActorSystem?根據Akka官網的描述——ActorSystem是一個重量級的結構體,可以用于分配1到N個線程,所以每個應用都需要創建一個ActorSystem。通常而言,使用以下代碼來創建ActorSystem。
ActorSystem system = ActorSystem.create("Hello");
不過對于接入Spring而言,由IOC(Inversion of Control,控制反轉)方式會更接地氣,你可以這樣:
<!-- AKKA System Setup -->
<bean id="actorSystem" class="akka.actor.ActorSystem" factory-method="create" destroy-method="shutdown" scope="singleton">
<constructor-arg value="helloAkkaSystem"/>
</bean>
然后在你需要的地方依賴注入即可。
Actor編程模型
在最新的Scala官方網站上已經決定廢棄Scala自身的Actor編程模型,轉而全面擁抱Akka提供的Actor編程模型。
我們可以通過以下代碼(代碼片段借用了Akka官網的例子)創建一個簡單的Actor例子。
Greeter是代表問候者的Actor:
public class Greeter extends UntypedActor {
public static enum Msg {
GREET, DONE;
}
@Override
public void onReceive(Object msg) {
if (msg == Msg.GREET) {
System.out.println("Hello World!");
getSender().tell(Msg.DONE, getSelf());
} else
unhandled(msg);
}
}</code></pre>
一般情況下我們的Actor都需要繼承自UntypedActor,并實現其onReceive方法。onReceive用于接收消息,你可以在其中實現對消息的匹配并做不同的處理。
HelloWorld是用于向Greeter發送問候消息的訪客:
public class HelloWorld extends UntypedActor {
@Override
public void preStart() {
// create the greeter actor
final ActorRef greeter = getContext().actorOf(Props.create(Greeter.class), "greeter");
// tell it to perform the greeting
greeter.tell(Greeter.Msg.GREET, getSelf());
}
@Override
public void onReceive(Object msg) {
if (msg == Greeter.Msg.DONE) {
// when the greeter is done, stop this actor and with it the application
getContext().stop(getSelf());
} else
unhandled(msg);
}
}</code></pre>
有了Actor之后,我們可以這樣使用它:
ActorRef a = system.actorOf(Props.create(HelloWorld.class), "helloWorld");
在HelloWorld的preStart實現中,獲取了Greeter的ActorRef(Actor的引用)并向Greeter發送了問候的消息,Greeter收到問候消息后,會先打印Hello World!,然后向HelloWorld回復完成的消息,HelloWorld得知Greeter完成了向世界問好這個偉大的任務后,就結束了自己的生命。HelloWorld的例子用編程API的方式告訴了我們如何使用Actor及發送、接收消息。為了便于描述與Spring的集成,下面再介紹一個例子。
CountingActor(代碼主體借用自Akka官網)是用于計數的Actor,見代碼清單1所示。
代碼清單1
@Named("CountingActor")
@Scope("prototype")
public class CountingActor extends UntypedActor {
public static class Count {
}
public static class Get {
}
// the service that will be automatically injected
@Resource
private CountingService countingService;
private int count = 0;
@Override
public void onReceive(Object message) throws Exception {
if (message instanceof Count) {
count = countingService.increment(count);
} else if (message instanceof Get) {
getSender().tell(count, getSelf());
} else {
unhandled(message);
}
}
}</code></pre>
CountingActor用于接收Count消息進行計數,接收Get消息回復給發送者當前的計數值。CountingService是用于計數的接口,其定義如下:
public interface CountingService {
/**
* 計數
* @param count
* @return
*/
int increment(int count);
}</code></pre>
CountingService的具體實現是CountingServiceImpl,其實現如下:
@Service("countingService")
public class CountingServiceImpl implements CountingService {
private static Logger logger = LoggerFactory.getLogger(CountingServiceImpl.class);
/*
* (non-Javadoc)
*
* @see com.elong.sentosa.metadata.service.CountingService#increment(int)
*/
@Override
public int increment(int count) {
logger.info("increase " + count + "by 1.");
return count + 1;
}
}</code></pre>
CountingActor的計數實際是由CountingService完成,并且CountingService通過注解方式注入了CountingActor。
細心的同學可能發現了CountingActor使用了注解Named,這里為什么沒有使用@Service或者@Component等注解呢?由于Akka的Actor在初始化的時候必須使用System或者Context的工廠方法actorOf創建新的Actor實例,不能使用構造器來初始化,而使用Spring的Service或者Component注解,會導致使用構造器初始化Actor,所以會拋出以下異常:
akka.actor.ActorInitializationException: You cannot create an instance of [com.elong.metadata.akka.actor.CountingActor] explicitly using the constructor (new). You have to use one of the 'actorOf' factory methods to create a new actor. See the documentation.
如果我們不能使用@Service或者@Component,也不能使用XML配置的方式使用(與注解一個道理),那么我們如何使用CountingActor提供的服務呢?
IndirectActorProducer接口
IndirectActorProducer是Akka提供的Actor生成接口,從其名字我們知道Akka給我們指出了另一條道路——石頭大了繞著走!通過實現IndirectActorProducer接口我們可以定制一些Actor的生成方式,與Spring集成可以這樣實現它,見代碼清單2所示。
代碼清單2
public class SpringActorProducer implements IndirectActorProducer {
private final ApplicationContext applicationContext;
private final String actorBeanName;
private final Object[] args;
public SpringActorProducer(ApplicationContext applicationContext, String actorBeanName, Object ... args) {
this.applicationContext = applicationContext;
this.actorBeanName = actorBeanName;
this.args = args;
}
public Actor produce() {
return (Actor) applicationContext.getBean(actorBeanName, args);
}
public Class<? extends Actor> actorClass() {
return (Class<? extends Actor>) applicationContext.getType(actorBeanName);
}
}</code></pre>
SpringActorProducer的實現主要借鑒了Akka官方文檔,我這里對其作了一些擴展以便于支持構造器帶有多個參數的情況。從其實現看到實際是利用了ApplicationContext提供的getBean方式實例化Actor。
這里還有兩個問題:一、ApplicationContext如何獲取和設置?二、如何使用SpringActorProducer生成Spring需要的Actor實例?
對于第一個問題,我們可以通過封裝SpringActorProducer并實現ApplicationContextAware接口的方式獲取ApplicationContext;對于第二個問題,我們知道Akka中的所有Actor實例都是以Props作為配置參數開始的,這里以SpringActorProducer為代理生成我們需要的Actor的Props。
SpringExt實現了以上思路,見代碼清單3所示。
代碼清單3
@Component("springExt")
public class SpringExt implements Extension, ApplicationContextAware {
private ApplicationContext applicationContext;
/**
* Create a Props for the specified actorBeanName using the
* SpringActorProducer class.
*
* @param actorBeanName
* The name of the actor bean to create Props for
* @return a Props that will create the named actor bean using Spring
*/
public Props props(String actorBeanName, Object ... args) {
return Props.create(SpringActorProducer.class, applicationContext, actorBeanName, args);
}
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.applicationContext = applicationContext;
}
}</code></pre>
應用例子
經過了以上的鋪墊,現在你可以使用創建好的CountingActor了,首先你需要在你的業務類中注入ActorSystem和SpringExt。
@Autowired
private ActorSystem actorSystem;
@Autowired
private SpringExt springExt;</code></pre>
然后我們使用CountingActor進行計數,代碼如下:
ActorRef counter = actorSystem.actorOf(springExt.props("CountingActor"), "counter");
// Create the "actor-in-a-box"
final Inbox inbox = Inbox.create(system);
// tell it to count three times
inbox.send(counter, new Count());
inbox.send(counter, new Count());
inbox.send(counter, new Count());
// print the result
FiniteDuration duration = FiniteDuration.create(3, TimeUnit.SECONDS);
Future<Object> result = ask(counter, new Get(), Timeout.durationToTimeout(duration));
try {
System.out.println("Got back " + Await.result(result, duration));
} catch (Exception e) {
System.err.println("Failed getting result: " + e.getMessage());
throw e;
}
輸出結果為:
Got back 3
總結
本文只是最簡單的Akka集成Spring的例子,Akka的remote、cluster、persistence、router等機制都可以應用。
來自:http://blog.csdn.net/beliefer/article/details/53783936