Design Differences Between Akka and Storm

Posted by jopen 9 years ago | 21K views | Storm, Distributed / Cloud Computing / Big Data

Original: http://segmentfault.com/a/1190000003886656


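Akka and Storm are both important tools for low-latency, high-throughput computation, but they are not really direct competitors. If Akka were the Linux kernel, Storm would be more like a distribution such as Ubuntu. Storm is not actually built on Akka, though, so it may be more apt to compare Akka to BSD and Storm to Ubuntu.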

Differences in Features

Akka consists of an API and an execution engine.

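In addition to an API and an execution engine, Storm also provides metrics, a web UI, cluster management, and a guaranteed message-processing mechanism.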

This article discusses the part where Akka and Storm overlap, namely how their APIs and execution engines are alike and how they differ.

API Differences

Let's look at Storm's two main APIs:

public interface ISpout extends Serializable {
    /**
     * Called when a task for this component is initialized within a worker on the cluster.
     * It provides the spout with the environment in which the spout executes.
     *
     * <p>This includes the:</p>
     *
     * @param conf The Storm configuration for this spout. This is the configuration provided to the topology merged in with cluster configuration on this machine.
     * @param context This object can be used to get information about this task's place within the topology, including the task id and component id of this task, input and output information, etc.
     * @param collector The collector is used to emit tuples from this spout. Tuples can be emitted at any time, including the open and close methods. The collector is thread-safe and should be saved as an instance variable of this spout object.
     */
    void open(Map conf, TopologyContext context, SpoutOutputCollector collector);

    /**
     * Called when an ISpout is going to be shutdown. There is no guarantee that close
     * will be called, because the supervisor kill -9's worker processes on the cluster.
     *
     * <p>The one context where close is guaranteed to be called is a topology is
     * killed when running Storm in local mode.</p>
     */
    void close();

    /**
     * Called when a spout has been activated out of a deactivated mode.
     * nextTuple will be called on this spout soon. A spout can become activated
     * after having been deactivated when the topology is manipulated using the
     * `storm` client.
     */
    void activate();

    /**
     * Called when a spout has been deactivated. nextTuple will not be called while
     * a spout is deactivated. The spout may or may not be reactivated in the future.
     */
    void deactivate();

    /**
     * When this method is called, Storm is requesting that the Spout emit tuples to the
     * output collector. This method should be non-blocking, so if the Spout has no tuples
     * to emit, this method should return. nextTuple, ack, and fail are all called in a tight
     * loop in a single thread in the spout task. When there are no tuples to emit, it is courteous
     * to have nextTuple sleep for a short amount of time (like a single millisecond)
     * so as not to waste too much CPU.
     */
    void nextTuple();

    /**
     * Storm has determined that the tuple emitted by this spout with the msgId identifier
     * has been fully processed. Typically, an implementation of this method will take that
     * message off the queue and prevent it from being replayed.
     */
    void ack(Object msgId);

    /**
     * The tuple emitted by this spout with the msgId identifier has failed to be
     * fully processed. Typically, an implementation of this method will put that
     * message back on the queue to be replayed at a later time.
     */
    void fail(Object msgId);
}

and

public interface IBasicBolt extends IComponent {
    void prepare(Map stormConf, TopologyContext context);
    /**
     * Process the input tuple and optionally emit new tuples based on the input tuple.
     *
     * All acking is managed for you. Throw a FailedException if you want to fail the tuple.
     */
    void execute(Tuple input, BasicOutputCollector collector);
    void cleanup();
}

Compare them with the API of an Actor in Akka:

trait Actor {

  import Actor._

  // to make type Receive known in subclasses without import
  type Receive = Actor.Receive

  /**
   * Stores the context for this actor, including self, and sender.
   * It is implicit to support operations such as `forward`.
   *
   * WARNING: Only valid within the Actor itself, so do not close over it and
   * publish it to other threads!
   *
   * [[akka.actor.ActorContext]] is the Scala API. `getContext` returns a
   * [[akka.actor.UntypedActorContext]], which is the Java API of the actor
   * context.
   */
  implicit val context: ActorContext = {
    val contextStack = ActorCell.contextStack.get
    if ((contextStack.isEmpty) || (contextStack.head eq null))
      throw ActorInitializationException(
        s"You cannot create an instance of [${getClass.getName}] explicitly using the constructor (new). " +
          "You have to use one of the 'actorOf' factory methods to create a new actor. See the documentation.")
    val c = contextStack.head
    ActorCell.contextStack.set(null :: contextStack)
    c
  }

  /**
   * The 'self' field holds the ActorRef for this actor.
   * <p/>
   * Can be used to send messages to itself:
   * <pre>
   * self ! message
   * </pre>
   */
  implicit final val self = context.self //MUST BE A VAL, TRUST ME

  /**
   * The reference sender Actor of the last received message.
   * Is defined if the message was sent from another Actor,
   * else `deadLetters` in [[akka.actor.ActorSystem]].
   *
   * WARNING: Only valid within the Actor itself, so do not close over it and
   * publish it to other threads!
   */
  final def sender(): ActorRef = context.sender()

  /**
   * This defines the initial actor behavior, it must return a partial function
   * with the actor logic.
   */
  //#receive
  def receive: Actor.Receive
  //#receive

  /**
   * INTERNAL API.
   *
   * Can be overridden to intercept calls to this actor's current behavior.
   *
   * @param receive current behavior.
   * @param msg current message.
   */
  protected[akka] def aroundReceive(receive: Actor.Receive, msg: Any): Unit = receive.applyOrElse(msg, unhandled)

  /**
   * Can be overridden to intercept calls to `preStart`. Calls `preStart` by default.
   */
  protected[akka] def aroundPreStart(): Unit = preStart()

  /**
   * Can be overridden to intercept calls to `postStop`. Calls `postStop` by default.
   */
  protected[akka] def aroundPostStop(): Unit = postStop()

  /**
   * Can be overridden to intercept calls to `preRestart`. Calls `preRestart` by default.
   */
  protected[akka] def aroundPreRestart(reason: Throwable, message: Option[Any]): Unit = preRestart(reason, message)

  /**
   * Can be overridden to intercept calls to `postRestart`. Calls `postRestart` by default.
   */
  protected[akka] def aroundPostRestart(reason: Throwable): Unit = postRestart(reason)

  /**
   * User overridable definition the strategy to use for supervising
   * child actors.
   */
  def supervisorStrategy: SupervisorStrategy = SupervisorStrategy.defaultStrategy

  /**
   * User overridable callback.
   * <p/>
   * Is called when an Actor is started.
   * Actors are automatically started asynchronously when created.
   * Empty default implementation.
   */
  @throws(classOf[Exception]) // when changing this you MUST also change UntypedActorDocTest
  //#lifecycle-hooks
  def preStart(): Unit = ()

  //#lifecycle-hooks

  /**
   * User overridable callback.
   * <p/>
   * Is called asynchronously after 'actor.stop()' is invoked.
   * Empty default implementation.
   */
  @throws(classOf[Exception]) // when changing this you MUST also change UntypedActorDocTest
  //#lifecycle-hooks
  def postStop(): Unit = ()

  //#lifecycle-hooks

  /**
   * User overridable callback: '''By default it disposes of all children and then calls `postStop()`.'''
   * @param reason the Throwable that caused the restart to happen
   * @param message optionally the current message the actor processed when failing, if applicable
   * <p/>
   * Is called on a crashed Actor right BEFORE it is restarted to allow clean
   * up of resources before Actor is terminated.
   */
  @throws(classOf[Exception]) // when changing this you MUST also change UntypedActorDocTest
  //#lifecycle-hooks
  def preRestart(reason: Throwable, message: Option[Any]): Unit = {
    context.children foreach { child =>
      context.unwatch(child)
      context.stop(child)
    }
    postStop()
  }

  //#lifecycle-hooks

  /**
   * User overridable callback: By default it calls `preStart()`.
   * @param reason the Throwable that caused the restart to happen
   * <p/>
   * Is called right AFTER restart on the newly created Actor to allow reinitialization after an Actor crash.
   */
  @throws(classOf[Exception]) // when changing this you MUST also change UntypedActorDocTest
  //#lifecycle-hooks
  def postRestart(reason: Throwable): Unit = {
    preStart()
  }
  //#lifecycle-hooks

  /**
   * User overridable callback.
   * <p/>
   * Is called when a message isn't handled by the current behavior of the actor
   * by default it fails with either a [[akka.actor.DeathPactException]] (in
   * case of an unhandled [[akka.actor.Terminated]] message) or publishes an [[akka.actor.UnhandledMessage]]
   * to the actor's system's [[akka.event.EventStream]]
   */
  def unhandled(message: Any): Unit = {
    message match {
      case Terminated(dead) => throw new DeathPactException(dead)
      case _                => context.system.eventStream.publish(UnhandledMessage(message, sender(), self))
    }
  }
}

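Storm's main APIs are clearly very similar to the Actor API. Judging by the timeline, however, Storm and Akka began development at roughly the same time, so it is quite likely that Storm's author was inspired by Erlang's actor implementation rather than by Akka.

As things stand, the author most likely set out to write a "naive" actor implementation in Clojure. Since that naive implementation already met Storm's design goals, he never went on to turn Storm into a complete actor implementation for Clojure.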

So, looking at the APIs alone, how do Spout/Bolt differ from Actor?

What Storm's API Has That the Actor API Lacks

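Storm's API has two interfaces that Actor lacks: ack and fail. They exist mainly because Storm targets a narrower domain than Akka (essentially just statistics), so it ships with a ready-made fault-tolerance mechanism that lets users in that niche get going out of the box.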

In addition, Storm's Tuple class carries some context information, which is likewise packaged in to serve its target use cases.
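To make the ack/fail contract concrete without depending on Storm, here is a minimal plain-Java sketch of the bookkeeping a spout typically does, following the ISpout Javadoc above: every emitted message stays in a pending map keyed by its msgId, `ack` forgets it, and `fail` puts it back on the queue for replay. `ReplayingSource`, `emitNext` and `payload` are hypothetical names; a real spout implements `ISpout` and emits through `SpoutOutputCollector`.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Hypothetical, Storm-free sketch of the ISpout ack/fail contract.
// A message stays in `pending` until it is acked; a failed message is re-queued.
class ReplayingSource {
    private final Deque<String> queue = new ArrayDeque<>();
    private final Map<Long, String> pending = new HashMap<>();
    private long nextId = 0;

    ReplayingSource(Iterable<String> input) {
        for (String s : input) queue.addLast(s);
    }

    // Plays the role of nextTuple(): returns the msgId of the emitted message,
    // or -1 when there is nothing to emit (a real spout would just return).
    long emitNext() {
        String msg = queue.pollFirst();
        if (msg == null) return -1;
        long id = nextId++;
        pending.put(id, msg);
        return id;
    }

    String payload(long msgId) { return pending.get(msgId); }

    // Fully processed downstream: drop the message so it is never replayed.
    void ack(long msgId) { pending.remove(msgId); }

    // Processing failed: put the message back at the head of the queue.
    void fail(long msgId) {
        String msg = pending.remove(msgId);
        if (msg != null) queue.addFirst(msg);
    }

    int pendingCount() { return pending.size(); }
    int queuedCount() { return queue.size(); }
}
```

After emitting "a" and "b", acking "a" and failing "b", nothing is pending and "b" is the next message to be emitted again.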

What the Actor API Has That Storm's Lacks

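context: a Spout's open method also receives a context, but in an actor the context can be accessed at any time. This shows that the Actor model encourages users to rely on context more than Spout does, and the data in the context is also updated dynamically.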

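self: the Actor's reference to itself. It suggests that the Actor model actively supports a downstream component sending data back upstream to the component it received data from; an actor can even send data to itself. In Storm, data flow is assumed to be one-way by default: downstream components give no feedback upstream, apart from the system-defined ack and fail.

postRestart: distinguishes an Actor's first start from a restart, which is quite useful. Storm probably lacks it because the author either didn't bother or didn't think of it at first, and later didn't want to change the core API.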
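The role of `self` and of a message's sender can be illustrated with a toy, single-threaded message loop. This is a hypothetical sketch, not Akka's API: `MiniSystem`, `tell`, `Ticker`, `Echo` and `Upstream` are made-up names. Every envelope records its sender, so `Echo` can reply upstream, and `Ticker` keeps itself going by sending messages to `self()`.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical single-threaded sketch of Actor-style messaging, NOT the Akka API.
// Each message carries its sender, so a receiver can reply upstream, and an
// actor can send messages to itself through self().
class MiniSystem {
    static final class Envelope {
        final Actor sender, target;
        final Object msg;
        Envelope(Actor sender, Actor target, Object msg) {
            this.sender = sender; this.target = target; this.msg = msg;
        }
    }

    abstract static class Actor {
        MiniSystem system;
        Actor sender; // sender of the message being processed (null = from outside)
        final Actor self() { return this; }
        final void tell(Actor target, Object msg) {
            system.queue.add(new Envelope(this, target, msg));
        }
        abstract void receive(Object msg);
    }

    final Queue<Envelope> queue = new ArrayDeque<>();

    <A extends Actor> A spawn(A actor) { actor.system = this; return actor; }

    void send(Actor target, Object msg) { queue.add(new Envelope(null, target, msg)); }

    // Deliver queued messages one at a time until none are left.
    void run() {
        Envelope e;
        while ((e = queue.poll()) != null) {
            e.target.sender = e.sender;
            e.target.receive(e.msg);
        }
    }
}

// Sends itself n - 1 until it reaches 0, recording every value it sees.
class Ticker extends MiniSystem.Actor {
    final List<Integer> seen = new ArrayList<>();
    @Override void receive(Object msg) {
        int n = (Integer) msg;
        seen.add(n);
        if (n > 0) tell(self(), n - 1);
    }
}

// Downstream: replies to whoever sent the message (feedback to upstream).
class Echo extends MiniSystem.Actor {
    @Override void receive(Object msg) { tell(sender, "ack:" + msg); }
}

// Upstream: on "go" sends data downstream, and records any reply it gets.
class Upstream extends MiniSystem.Actor {
    final MiniSystem.Actor downstream;
    final List<Object> replies = new ArrayList<>();
    Upstream(MiniSystem.Actor downstream) { this.downstream = downstream; }
    @Override void receive(Object msg) {
        if ("go".equals(msg)) tell(downstream, "data");
        else replies.add(msg);
    }
}
```

Sending `3` to a `Ticker` makes it see 3, 2, 1 and 0, and an `Upstream` that receives `"go"` ends up holding the reply `"ack:data"` sent back by `Echo`.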

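unhandled: handles messages the actor did not expect to receive; by default they are published to a system event stream. This is needed because an Actor is open: any external application that knows the Actor's address can send it messages. Storm only receives the messages you designed it to receive, so it has no such need.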
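A hypothetical sketch of the `unhandled` fallback (not Akka's real classes): `receive` behaves like a partial function that reports whether it matched, and anything it does not recognise is passed to `unhandled`, which by default publishes it to a stand-in event stream instead of dropping it silently. `OpenActor`, `EventStream` and `Summer` are illustrative names.

```java
import java.util.ArrayList;
import java.util.List;

// Stand-in for the ActorSystem's event stream: just records what is published.
class EventStream {
    final List<Object> published = new ArrayList<>();
    void publish(Object event) { published.add(event); }
}

// Hypothetical sketch, not Akka's API.
abstract class OpenActor {
    final EventStream eventStream;
    OpenActor(EventStream eventStream) { this.eventStream = eventStream; }

    // Like a partial function: return true if the message was handled.
    protected abstract boolean receive(Object msg);

    // Default fallback, analogous to Actor.unhandled publishing an UnhandledMessage.
    protected void unhandled(Object msg) { eventStream.publish("unhandled: " + msg); }

    // Anyone holding a reference may send anything, so every message goes
    // through receive first and falls back to unhandled.
    final void deliver(Object msg) {
        if (!receive(msg)) unhandled(msg);
    }
}

// Only understands Integer messages and sums them.
class Summer extends OpenActor {
    int total = 0;
    Summer(EventStream es) { super(es); }
    @Override protected boolean receive(Object msg) {
        if (msg instanceof Integer) { total += (Integer) msg; return true; }
        return false;
    }
}
```

Delivering `2`, `"hello"` and `5` to a `Summer` leaves its total at 7 and publishes a single `"unhandled: hello"` event.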

Runtime Differences

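This part compares Actors with Tasks and covers the different thread-scheduling models, hot code deployment, and the limits Storm's ack mechanism places on asynchronous code.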

Actor vs. Component

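Component is the umbrella term for Spout and Bolt; it is the basic unit that runs user code in Storm. What Actors and Components have in common: both respond to messages, both can hold state, and only one thread enters at a time unless you explicitly start additional threads. The main difference is that an Actor is extremely lightweight: you can create tens of thousands of Actors in one program, or put every ten lines of code in its own Actor, without any problem. With Storm Components the situation is entirely different; you are better off describing only the top-level abstractions with a handful of Components.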

Thread-Scheduling Model

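If the APIs are so similar, why can you create new Actors freely while Components should be kept few? The secret lies in Akka's dispatchers. All asynchronous code in an Akka program, including Actors, Futures, Runnables and even ParIterable, can be handed to a Dispatcher; apart from the main thread you use to start the ActorSystem, essentially every thread can be managed by one. Dispatchers are customizable, and by default Akka uses "fork-join-executor". Compared with an ordinary thread pool, fork-join-executor is especially well suited to the Actor model, because its work-stealing workers handle large numbers of short tasks efficiently, and it delivers excellent performance.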
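The dispatcher idea can be sketched in a few dozen lines of Java. This is a simplified illustration under stated assumptions, not Akka's implementation: each actor is just an object with a mailbox, and a shared `ForkJoinPool` (created in async, FIFO mode) runs a short drain task whenever that actor has mail. An atomic `scheduled` flag ensures at most one drain task per actor is queued or running, so each actor still processes one message at a time while thousands of actors share a handful of threads. `Dispatcher`, `ActorRef` and `actorOf` are hypothetical names, and the `throughput` parameter is only loosely modelled on Akka's dispatcher setting of the same name.

```java
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

// Hypothetical sketch of a dispatcher: many mailbox-based actors share one pool.
class Dispatcher {
    private final ForkJoinPool pool;
    private final int throughput; // max messages an actor handles per scheduling turn

    Dispatcher(int parallelism, int throughput) {
        // asyncMode = true: submitted tasks are processed in FIFO order.
        this.pool = new ForkJoinPool(parallelism,
                ForkJoinPool.defaultForkJoinWorkerThreadFactory, null, true);
        this.throughput = throughput;
    }

    <M> ActorRef<M> actorOf(Consumer<M> behavior) { return new ActorRef<>(behavior); }

    void shutdown() { pool.shutdown(); }

    final class ActorRef<M> {
        private final ConcurrentLinkedQueue<M> mailbox = new ConcurrentLinkedQueue<>();
        private final AtomicBoolean scheduled = new AtomicBoolean(false);
        private final Consumer<M> behavior;

        ActorRef(Consumer<M> behavior) { this.behavior = behavior; }

        void tell(M msg) {
            mailbox.add(msg);
            trySchedule();
        }

        // Only one drain task per actor may be queued or running at a time,
        // which gives the "one thread at a time" guarantee.
        private void trySchedule() {
            if (!mailbox.isEmpty() && scheduled.compareAndSet(false, true)) {
                pool.execute(this::drain);
            }
        }

        private void drain() {
            try {
                for (int i = 0; i < throughput; i++) {
                    M msg = mailbox.poll();
                    if (msg == null) break;
                    behavior.accept(msg);
                }
            } finally {
                scheduled.set(false);
                trySchedule(); // mail may have arrived while this turn was running
            }
        }
    }
}
```

For example, 10,000 such actors receiving 100 messages each can all be driven by a pool of 4 threads, whereas a dedicated thread per actor would require 10,000 threads.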

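By comparison, Storm's thread-scheduling model is much more "naive": each Component gets its own thread, or several Components take turns sharing one thread. That is why you cannot create too many Components.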
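For contrast, here is a hypothetical sketch of this simpler model (not Storm's actual executor code): each executor owns one dedicated thread that loops over its assigned tasks, calling `nextTuple()` on each in turn, and sleeps for 1 ms when nothing was emitted, as the ISpout Javadoc above recommends. `SimpleSpout` and `ExecutorThread` are made-up names, and the boolean return value stands in for emitting through a collector.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for a spout: returns true if it emitted a tuple.
interface SimpleSpout {
    boolean nextTuple();
}

// One dedicated thread per executor; its tasks take turns on that thread.
class ExecutorThread extends Thread {
    private final List<SimpleSpout> tasks;
    private final AtomicBoolean running = new AtomicBoolean(true);

    ExecutorThread(List<SimpleSpout> tasks) { this.tasks = tasks; }

    @Override public void run() {
        while (running.get()) {
            boolean emitted = false;
            for (SimpleSpout task : tasks) emitted |= task.nextTuple(); // non-short-circuit: every task gets a turn
            if (!emitted) {
                try {
                    Thread.sleep(1); // the "courteous" short sleep suggested by the ISpout Javadoc
                } catch (InterruptedException e) {
                    return;
                }
            }
        }
    }

    void shutdown() { running.set(false); }
}
```

Every additional executor means another OS thread spinning in a loop like this one, which is why the number of Components has to stay small.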

Hot Code Deployment

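In real-time computing, hot deployment is mostly needed for things like changing a sorting algorithm: swapping out one algorithm module while leaving everything else unchanged.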

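Because Storm supports programming in any language via Thrift, if your algorithm is written in a scripting language such as Python and you want to swap it out without restarting, you only need to replace the corresponding .py file on each machine. The downside is that this ties the program to languages of that kind.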

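On the Akka side, because the Actor model uses the same interface for in-process and inter-process communication, the Actors responsible for the algorithm can be started as a separate process, and that process alone is restarted when the code is updated. Although one process in the system restarts, the system as a whole keeps running without interruption.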

Storm's Ack Mechanism

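Storm's message-guarantee mechanism is genuinely original: using bitwise XOR, it tracks whether processing has succeeded or failed with very little memory and high performance. By default, user code only needs to supply a MessageId and the ack mechanism just works, so users usually don't need to think about it. The problem with the default interface, however, is that the ack mechanism stops working as soon as asynchronous code is involved. Scheduled tasks, submitted Runnables and similar work are all invisible to it, which means that if the asynchronous logic fails, the acker never finds out. How to make Storm's ack mechanism coexist with asynchronous code is still an open question.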
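The XOR trick can be shown in isolation. Below is a simplified sketch that ignores timeouts, the routing of acks to the acker task and the notification of the spout, all of which the real acker handles. For each spout tuple the acker keeps a single 64-bit value, and every tuple id in the tuple tree is XOR-ed into it twice: once when the tuple is emitted and once when it is acked. Because x ^ x = 0, the value returns to 0 exactly when every tuple has been acked, however large the tree is, apart from a negligible chance that random ids cancel out by accident. `XorAcker`, `init` and `ack` are hypothetical names, not Storm's API.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical sketch of the XOR bookkeeping behind Storm's acker.
// One 64-bit value per spout tuple (root), regardless of how many tuples it spawns.
class XorAcker {
    private final Map<Long, Long> pending = new HashMap<>(); // rootId -> running XOR

    // Tuple ids are random 64-bit values, so accidental cancellation is very unlikely.
    static long newTupleId() { return ThreadLocalRandom.current().nextLong(); }

    // The spout emitted its tuple `tupleId` for root `rootId`.
    void init(long rootId, long tupleId) { update(rootId, tupleId); }

    // A bolt acked `ackedId` and emitted children anchored to it; it reports
    // ackedId ^ child1 ^ child2 ^ ... as a single value.
    void ack(long rootId, long ackedId, long... childIds) {
        long v = ackedId;
        for (long c : childIds) v ^= c;
        update(rootId, v);
    }

    private void update(long rootId, long v) {
        long acc = pending.getOrDefault(rootId, 0L) ^ v;
        if (acc == 0) pending.remove(rootId); // every tuple in the tree has been acked
        else pending.put(rootId, acc);
    }

    // True once the tree is fully processed (also true for roots never initialised).
    boolean isComplete(long rootId) { return !pending.containsKey(rootId); }
}
```

With id 0x1111 for the spout tuple and ids 0x2222 and 0x4444 for two children emitted by the first bolt, the value goes 0x1111, 0x6666, 0x4444 and finally 0, at which point the tree is complete. Work that a bolt hands off to another thread never shows up in these XOR updates, which is the limitation described above.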

Summary

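I think Storm's API is excellent, and its reliability has been proven by years of production use, but its core execution mechanism is too naive and gives the impression of an aging hero past his prime. Twitter, Storm's original user, recently announced Heron, a new solution compatible with the Storm API, though it has not been open-sourced. It would be very exciting if an open-source project "re-implemented" Storm on top of Akka; so far, gearpump is one such project I have found.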

References

Akka Dispatcher

Akka and the Java Memory Model: http://doc.akka.io/docs/akka/snapshot/general/jmm.html

A Java Fork/Join Framework

Talking About Concurrency, Part 3: Analysis and Use of Java Thread Pools: http://www.infoq.com/cn/articles/java-threadPool

Java Tip: When to use ForkJoinPool vs ExecutorService

Akka VS Storm

Guaranteeing Message Processing

Storm Source Code
