Spring與Akka的集成

CandelariaB 7年前發布 | 35K 次閱讀 Spring JEE框架 AKKA

概述

近年來隨著Spark的火熱,Spark本身使用的開發語言Scala、用到的分布式內存文件系統Tachyon(現已更名為Alluxio)以及基于Actor并發編程模型的Akka。了解過Akka或者Actor的人應該知道,這的確是一個很不錯的框架,按照Akka官網的描述——使用Akka使得構建強有力的并發與分布式應用將更加容易。由于歷史原因,很多Web系統在開發分布式服務時首先會選擇RMI(Remote Method Invoke ,遠程方法調用)、RPC(Remote Procedure Call Protocol,遠程過程調用)或者使用JMS(Java Messaging Service,Java消息服務)。

但是使用RMI只能使用java語言,而且開發、執行效率都不高;RPC框架雖然可以通過匹配方法簽名的方式比RMI更靈活,但是其存在調用超時、調用丟失等缺點;JMS方式雖然可以通過At Least Delivery Once、消息持久化等機制保證消息不會丟失,但是只能作為一種跨服務的生產者、消費者編程模型使用。Akka不但處理了以上問題,而且還可以使用Actor作為并發編程模型,減少java多線程編程的阻塞、調度、上下文開銷甚至死鎖等問題。此外,Akka還提供了集群Sharding、流處理等功能的支持,更易于實現有限狀態自動機等功能。所以有心的開發者勢必會關心如何在最常見的Java系統中使用它,如何與Spring集成?

本文參考Akka官方使用文檔,根據自身的經驗和理解,提供Akka與Spring集成的方案。本文不說明Spring框架的具體使用,并從Spring已經配置完備的情況開始敘述。

Actor系統——ActorSystem

什么是ActorSystem?根據Akka官網的描述——ActorSystem是一個重量級的結構體,可以用于分配1到N個線程,所以每個應用都需要創建一個ActorSystem。通常而言,使用以下代碼來創建ActorSystem。

ActorSystem system = ActorSystem.create("Hello");

不過對于接入Spring而言,由IOC(Inversion of Control,控制反轉)方式會更接地氣,你可以這樣:

<!-- AKKA System Setup -->
    <bean id="actorSystem" class="akka.actor.ActorSystem" factory-method="create" destroy-method="shutdown" scope="singleton">
        <constructor-arg value="helloAkkaSystem"/>
    </bean>

然后在你需要的地方依賴注入即可。

Actor編程模型

在最新的Scala官方網站上已經決定廢棄Scala自身的Actor編程模型,轉而全面擁抱Akka提供的Actor編程模型。

我們可以通過以下代碼(代碼片段借用了Akka官網的例子)創建一個簡單的Actor例子。

Greeter是代表問候者的Actor:

public class Greeter extends UntypedActor {

public static enum Msg { GREET, DONE; }

@Override public void onReceive(Object msg) { if (msg == Msg.GREET) { System.out.println("Hello World!"); getSender().tell(Msg.DONE, getSelf()); } else unhandled(msg); }

}</code></pre>

一般情況下我們的Actor都需要繼承自UntypedActor,并實現其onReceive方法。onReceive用于接收消息,你可以在其中實現對消息的匹配并做不同的處理。

HelloWorld是用于向Greeter發送問候消息的訪客:

public class HelloWorld extends UntypedActor {

@Override public void preStart() { // create the greeter actor final ActorRef greeter = getContext().actorOf(Props.create(Greeter.class), "greeter"); // tell it to perform the greeting greeter.tell(Greeter.Msg.GREET, getSelf()); }

@Override public void onReceive(Object msg) { if (msg == Greeter.Msg.DONE) { // when the greeter is done, stop this actor and with it the application getContext().stop(getSelf()); } else unhandled(msg); } }</code></pre>

有了Actor之后,我們可以這樣使用它:

ActorRef a = system.actorOf(Props.create(HelloWorld.class), "helloWorld");

在HelloWorld的preStart實現中,獲取了Greeter的ActorRef(Actor的引用)并向Greeter發送了問候的消息,Greeter收到問候消息后,會先打印Hello World!,然后向HelloWorld回復完成的消息,HelloWorld得知Greeter完成了向世界問好這個偉大的任務后,就結束了自己的生命。HelloWorld的例子用編程API的方式告訴了我們如何使用Actor及發送、接收消息。為了便于描述與Spring的集成,下面再介紹一個例子。

CountingActor(代碼主體借用自Akka官網)是用于計數的Actor,見代碼清單1所示。

代碼清單1

@Named("CountingActor")
@Scope("prototype")
public class CountingActor extends UntypedActor {

public static class Count {
}

public static class Get {
}

// the service that will be automatically injected
@Resource
private CountingService countingService;

private int count = 0;

@Override
public void onReceive(Object message) throws Exception {
    if (message instanceof Count) {
        count = countingService.increment(count);
    } else if (message instanceof Get) {
        getSender().tell(count, getSelf());
    } else {
        unhandled(message);
    }
}

}</code></pre>

CountingActor用于接收Count消息進行計數,接收Get消息回復給發送者當前的計數值。CountingService是用于計數的接口,其定義如下:

public interface CountingService {

/**
 * 計數
 * @param count
 * @return
 */
int increment(int count);

}</code></pre>

CountingService的具體實現是CountingServiceImpl,其實現如下:

@Service("countingService")
public class CountingServiceImpl implements CountingService {

private static Logger logger = LoggerFactory.getLogger(CountingServiceImpl.class);

/*
 * (non-Javadoc)
 * 
 * @see com.elong.sentosa.metadata.service.CountingService#increment(int)
 */
@Override
public int increment(int count) {
    logger.info("increase " + count + "by 1.");
    return count + 1;
}

}</code></pre>

CountingActor的計數實際是由CountingService完成,并且CountingService通過注解方式注入了CountingActor。

細心的同學可能發現了CountingActor使用了注解Named,這里為什么沒有使用@Service或者@Component等注解呢?由于Akka的Actor在初始化的時候必須使用System或者Context的工廠方法actorOf創建新的Actor實例,不能使用構造器來初始化,而使用Spring的Service或者Component注解,會導致使用構造器初始化Actor,所以會拋出以下異常:

akka.actor.ActorInitializationException: You cannot create an instance of [com.elong.metadata.akka.actor.CountingActor] explicitly using the constructor (new). You have to use one of the 'actorOf' factory methods to create a new actor. See the documentation.

如果我們不能使用@Service或者@Component,也不能使用XML配置的方式使用(與注解一個道理),那么我們如何使用CountingActor提供的服務呢?

IndirectActorProducer接口

IndirectActorProducer是Akka提供的Actor生成接口,從其名字我們知道Akka給我們指出了另一條道路——石頭大了繞著走!通過實現IndirectActorProducer接口我們可以定制一些Actor的生成方式,與Spring集成可以這樣實現它,見代碼清單2所示。

代碼清單2

public class SpringActorProducer implements IndirectActorProducer {
    private final ApplicationContext applicationContext;
    private final String actorBeanName;
    private final Object[] args;

public SpringActorProducer(ApplicationContext applicationContext, String actorBeanName, Object ... args) {
    this.applicationContext = applicationContext;
    this.actorBeanName = actorBeanName;
    this.args = args;
}

public Actor produce() {
    return (Actor) applicationContext.getBean(actorBeanName, args);
}

public Class<? extends Actor> actorClass() {
    return (Class<? extends Actor>) applicationContext.getType(actorBeanName);
}

}</code></pre>

SpringActorProducer的實現主要借鑒了Akka官方文檔,我這里對其作了一些擴展以便于支持構造器帶有多個參數的情況。從其實現看到實際是利用了ApplicationContext提供的getBean方式實例化Actor。

這里還有兩個問題:一、ApplicationContext如何獲取和設置?二、如何使用SpringActorProducer生成Spring需要的Actor實例?

對于第一個問題,我們可以通過封裝SpringActorProducer并實現ApplicationContextAware接口的方式獲取ApplicationContext;對于第二個問題,我們知道Akka中的所有Actor實例都是以Props作為配置參數開始的,這里以SpringActorProducer為代理生成我們需要的Actor的Props。

SpringExt實現了以上思路,見代碼清單3所示。

代碼清單3

@Component("springExt")
public class SpringExt implements Extension, ApplicationContextAware {

private ApplicationContext applicationContext;

/**
 * Create a Props for the specified actorBeanName using the
 * SpringActorProducer class.
 *
 * @param actorBeanName
 *            The name of the actor bean to create Props for
 * @return a Props that will create the named actor bean using Spring
 */
public Props props(String actorBeanName, Object ... args) {
    return Props.create(SpringActorProducer.class, applicationContext, actorBeanName, args);
}

public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
    this.applicationContext = applicationContext;
}

}</code></pre>

應用例子

經過了以上的鋪墊,現在你可以使用創建好的CountingActor了,首先你需要在你的業務類中注入ActorSystem和SpringExt。

@Autowired
    private ActorSystem actorSystem;

@Autowired
private SpringExt springExt;</code></pre> 

然后我們使用CountingActor進行計數,代碼如下:

ActorRef counter = actorSystem.actorOf(springExt.props("CountingActor"), "counter");

    // Create the "actor-in-a-box"
        final Inbox inbox = Inbox.create(system);

    // tell it to count three times
        inbox.send(counter, new Count());
        inbox.send(counter, new Count());
        inbox.send(counter, new Count());

    // print the result
    FiniteDuration duration = FiniteDuration.create(3, TimeUnit.SECONDS);
    Future<Object> result = ask(counter, new Get(), Timeout.durationToTimeout(duration));
    try {
        System.out.println("Got back " + Await.result(result, duration));
    } catch (Exception e) {
        System.err.println("Failed getting result: " + e.getMessage());
        throw e;
    }

輸出結果為:

Got back 3

總結

本文只是最簡單的Akka集成Spring的例子,Akka的remote、cluster、persistence、router等機制都可以應用。

 

 

來自:http://blog.csdn.net/beliefer/article/details/53783936

 

 本文由用戶 CandelariaB 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!