YARN批處理方式kill Applications解決方案

jopen 9年前發布 | 35K 次閱讀 分布式/云計算/大數據

前言

在使用hadoop集群的時候,所有的任務都是最終以Application的形式跑在集群中,不管你是自己寫的MR程序亦或是你寫的hive sql轉化成的mr任務,最終都是以Application應用的身份在跑.這些Application跑完之后,這些信息在jobHistory中就可以看了,可以說hadoop在這方面做得真的非常完整.但是完善歸完善.但是jobHistory可以說是一種"事后分析",而有的時候我們反而是需要去控制目前所在的Applications.舉個例子,比如某用戶誤啟動了幾十個應用,這些應用又是非常消耗資源的,一般這時候集群的管理者就會想盡辦法去kill這些應用,否則資源都被占了,別人怎么使用.但是比較不幸的一點,目前Yarn在kill應用的操作上還沒有提供批處理的方式,只有最簡單的kill+appID的形式,都是單一的操作,所以會造成最后"應用是kill掉了,但是花了半個小時時間."這樣的結果.所以本文就來分析一下批處理kill應用的解決方案,從應用場景到方案設計再到最終實現,依次進行描述.


應用場景

1.誤操作啟動大量應用.就是文章開頭所描述到的場景.這個在小集群,少用戶的情況下不容易發生,但是一旦集群具備了一定規模,達到多組用戶的時候,這種現象發生的概率就會比較高了.用戶多了,遇到這種問題,必須得解決吧,而且必須得高效解決.

2.第二種情況發生在Application recovery階段,比如說當你的集群中開啟了yarn的recovery功能之后,此時做ResourceManager重啟的時候,他會從rmStateStore中讀取上次保存下來的Application信息,然后進行全部恢復。這個時候如果你不想要恢復上次由于RM服務停止造成中斷的應用,那么你也需要有個方法去批量kill正在此時恢復的應用。這種場景就不是被動發生的,在某些場景下他就會出現,發生。

所以總結上面提到的2個case,1個是偏外部因素,1個是偏內部因素,但是都說明了我們需要1個方法能夠去批量執行kill應用的操作。


現有的Kill Application命令

現有存在于YARN中的kill應用的操作命令很簡單,如下

bin/yarn application -kill <applicationId>
沒用過這個命令的同學可以在自己的測試集群中輸入這個命令玩一下。同時yarn application下還有一些其他的命令參數,輸入下面這個命令可以打出幫助信息

bin/yarn application


批處理Kill Applications CLI設計

CLI是CommandLine的簡稱,就是命令行,上一段中的-kill <applicationId>在Yarn是是被定義為1個ApplicationCLI,是針對Application的命令操作.表現出來的形式是以YarnCli的形式被調用,所以會在前面看到yarn這個前綴.所以我們的1個目標就是創造出一些能進行批處理的執行kill應用的操作命令.但是這里會冒出另外1個問題,如果進行批量執行應用操作,那么這些待kill的應用一定具有某種類似或相同的屬性,否則我們就無法劃分,歸類了.首先需要找出在Yarn中的Application到底有哪些屬性,我截取出了部分代碼:

@Private
public class Application {
  private static final Log LOG = LogFactory.getLog(Application.class);

  private AtomicInteger taskCounter = new AtomicInteger(0);

  private AtomicInteger numAttempts = new AtomicInteger(0);
  final private String user;
  final private String queue;
  final private ApplicationId applicationId;
...
另外一個project中的定義:

public interface Application extends EventHandler<ApplicationEvent> {

  String getUser();

  Map<ContainerId, Container> getContainers();

  ApplicationId getAppId();

  ApplicationState getApplicationState();

}
所以綜合以上這2個類的定義,可以跳出3個比較常見的屬性作為CLI參數,

第一.user,用戶名.

第二.queue,所屬隊列.

第三.applicationState.應用狀態,比如我想kill正在running狀態的應用或是處于Accepted但是還沒開始running的應用,這也是1個不錯的指標維度.

我們最終想要達到的效果是這樣的:

-killByAppStates <States>       The states of application that will be killed.
-killByUser <User name>         Kill running-state applications of specific user.
-killOfQueue <Queue Name>       Kill the applications of specific queue.
這里需要提出一點建議,對于第二個命令-killByUser,因為涉及到直接關聯用戶的操作命令,如果操作者是普通用戶,不一定會有權限去kill別人的應用,所以這個命令更應該放在RmAdminCLI的命令中而不是YarnCLI.這一點在后面的具體代碼實現中會加以區分.


ResourceManager的CLI相關服務

在打算具體實現新的命令接口,需要首先對rm對應的內部對應服務有1個了解.ResourceManager在這里的設計可以說是比較巧妙的,為了避免客戶端命令的服務影響admin超級管理員的命令服務,他將這2套服務進行了分開處理,即AdminService和ClientRMService,然后被RMAdminCLI和ApplicationCLI所遠程調用,具體的結構如下


所以在后面需要改動代碼的地方就是在圖中出現的幾個主要類中.


批處理Kill Application操作實現

對Yarn新增命令行操作,難的并不是他的實現,而是在于他的流程,因為有些代碼是自己生成的,你需要改一些.proto的文件,下面演示一下如何改ApplicationCLI,RMAdminCLI將簡單描述.

第一步.定義rpc方法接口以及請求回復類

import "Security.proto";
import "yarn_service_protos.proto";

service ApplicationClientProtocolService {
  ....
  rpc killApplicationsByAppStates (KillApplicationsByAppStatesRequestProto) returns (KillApplicationsByAppStatesResponseProto);
  rpc killApplicationsOfQueue (KillApplicationsOfQueueRequestProto) returns (KillApplicationsOfQueueResponseProto);
}
...
message KillApplicationsByAppStatesRequestProto {
  repeated YarnApplicationStateProto application_states = 1;
}

message KillApplicationsByAppStatesResponseProto {
  optional bool is_kill_completed = 1 [default = false];
}

message KillApplicationsOfQueueRequestProto {
  required string queue = 1;
}

message KillApplicationsOfQueueResponseProto {
  optional bool is_kill_completed = 1 [default = false];
}
里面不明白的地方,請自行查閱protocolbuffer的使用說明.最后使用編譯命令重新打包,比如"mvn package -Dmaven.test.skip=true",以此生成相應代碼.

第二步.實現Request,Response類以及具體的Impl實現類

比如以KillByAppStates的命令為例子:

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.yarn.api.protocolrecords;

import java.util.EnumSet;

import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Stable;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.util.Records;

/**
 * <p>
 * The request sent by the client to the <code>ResourceManager</code> to abort
 * the applications by appStates.
 * </p>
 * <p>
 * The client, via {@link KillApplicationsByAppStatesRequest} provides the
 * {@link YarnApplicationState} of the applications to be aborted.
 * </p>
 * 
 * @see ApplicationClientProtocol#killApplicationsByApRpStates(KillApplicationsByAppStatesRequest)
 */
@Public
@Stable
public abstract class KillApplicationsByAppStatesRequest {
  @Private
  @Unstable
  public static KillApplicationsByAppStatesRequest newInstance(
      EnumSet<YarnApplicationState> applicationStates) {
    KillApplicationsByAppStatesRequest request =
        Records.newRecord(KillApplicationsByAppStatesRequest.class);
    request.setApplicationStates(applicationStates);
    return request;
  }

  /**
   * Get the application states to filter applications on
   *
   * @return Set of Application states to filter on
   */
  @Public
  @Stable
  public abstract EnumSet<YarnApplicationState> getApplicationStates();

  /**
   * Set the application states to filter applications on
   *
   * @param applicationStates A Set of Application states to filter on. If not
   *          defined, match all running applications
   */
  @Private
  @Unstable
  public abstract void setApplicationStates(
      EnumSet<YarnApplicationState> applicationStates);
}
public class KillApplicationsByAppStatesRequestPBImpl extends
    KillApplicationsByAppStatesRequest {

  EnumSet<YarnApplicationState> applicationStates = null;

  KillApplicationsByAppStatesRequestProto proto =
      KillApplicationsByAppStatesRequestProto.getDefaultInstance();
  KillApplicationsByAppStatesRequestProto.Builder builder = null;
  boolean viaProto = false;

  public KillApplicationsByAppStatesRequestPBImpl() {
    builder = KillApplicationsByAppStatesRequestProto.newBuilder();
  }

  public KillApplicationsByAppStatesRequestPBImpl(
      KillApplicationsByAppStatesRequestProto proto) {
    this.proto = proto;
    viaProto = true;
  }

  public KillApplicationsByAppStatesRequestProto getProto() {
    mergeLocalToProto();
    proto = viaProto ? proto : builder.build();
    viaProto = true;
    return proto;
  }

  private void mergeLocalToProto() {
    if (viaProto)
      maybeInitBuilder();
    mergeLocalToBuilder();
    proto = builder.build();
    viaProto = true;
  }

  private void mergeLocalToBuilder() {
    if (applicationStates != null && !applicationStates.isEmpty()) {
      builder.clearApplicationStates();
      builder.addAllApplicationStates(Iterables.transform(applicationStates,
          new Function<YarnApplicationState, YarnApplicationStateProto>() {
            @Override
            public YarnApplicationStateProto apply(YarnApplicationState input) {
              return ProtoUtils.convertToProtoFormat(input);
            }
          }));
    }
  }

  private void maybeInitBuilder() {
    if (viaProto || builder == null) {
      builder = KillApplicationsByAppStatesRequestProto.newBuilder(proto);
    }
    viaProto = false;
  }

  private void initApplicationStates() {
    if (this.applicationStates != null) {
      return;
    }
    KillApplicationsByAppStatesRequestProtoOrBuilder p =
        viaProto ? proto : builder;
    List<YarnApplicationStateProto> appStatesList =
        p.getApplicationStatesList();
    this.applicationStates = EnumSet.noneOf(YarnApplicationState.class);

    for (YarnApplicationStateProto c : appStatesList) {
      this.applicationStates.add(ProtoUtils.convertFromProtoFormat(c));
    }
  }

  @Override
  public EnumSet<YarnApplicationState> getApplicationStates() {
    initApplicationStates();
    return this.applicationStates;
  }

  @Override
  public void setApplicationStates(
      EnumSet<YarnApplicationState> applicationStates) {
    maybeInitBuilder();
    if (applicationStates == null) {
      builder.clearApplicationStates();
    }
    this.applicationStates = applicationStates;
  }

  @Override
  public int hashCode() {
    return getProto().hashCode();
  }

  @Override
  public boolean equals(Object other) {
    if (other == null)
      return false;
    if (other.getClass().isAssignableFrom(this.getClass())) {
      return this.getProto().equals(this.getClass().cast(other).getProto());
    }
    return false;
  }

  @Override
  public String toString() {
    return TextFormat.shortDebugString(getProto());
  }
}
Impl實現類中會用到之前pb生成的代碼.

第三步.定義協議方法接口類并實現其子類方法

/**
 * <p>The protocol between clients and the <code>ResourceManager</code>
 * to submit/abort jobs and to get information on applications, cluster metrics,
 * nodes, queues and ACLs.</p> 
 */
@Public
@Stable
public interface ApplicationClientProtocol extends ApplicationBaseProtocol {
   .....

  /**
   * <p>
   * The interface used by clients to request the <code>ResourceManager</code>
   * to abort applications by appStates.
   * </p>
   *
   * <p>
   * The client, via {@link KillApplicationsByAppStatesRequest} provides the
   * {@link YarnApplicationState} of the applications to be aborted.
   * </p>
   *
   * <p>
   * In secure mode,the <code>ResourceManager</code> verifies access to the
   * application, queue etc. before terminating the application.
   * </p>
   *
   * <p>
   * Currently, the <code>ResourceManager</code> returns an empty response on
   * success and throws an exception on rejecting the request.
   * </p>
   *
   * @param request request to abort the application by appStates
   * @return <code>ResourceManager</code> returns an empty response on success
   *         and throws an exception on rejecting the request
   * @throws YarnException
   * @throws IOException
   * @see #getQueueUserAcls(GetQueueUserAclsInfoRequest)
   */
  @Public
  @Stable
  @Idempotent
  public KillApplicationsByAppStatesResponse killApplicationsByAppStates(
      KillApplicationsByAppStatesRequest request) throws YarnException,
      IOException;
}
具體繼承子類,一般就2類,1個是Client對應類,1個是服務端對應類,
@Private
public class ApplicationClientProtocolPBClientImpl implements ApplicationClientProtocol,
    Closeable {

  private ApplicationClientProtocolPB proxy;

  public ApplicationClientProtocolPBClientImpl(long clientVersion,
      InetSocketAddress addr, Configuration conf) throws IOException {
    RPC.setProtocolEngine(conf, ApplicationClientProtocolPB.class,
      ProtobufRpcEngine.class);
    proxy = RPC.getProxy(ApplicationClientProtocolPB.class, clientVersion, addr, conf);
  }

  @Override
  public void close() {
    if (this.proxy != null) {
      RPC.stopProxy(this.proxy);
    }
  }

  ...

  @Override
  public KillApplicationsByAppStatesResponse killApplicationsByAppStates(
      KillApplicationsByAppStatesRequest request) throws YarnException,
      IOException {
    KillApplicationsByAppStatesRequestProto requestProto =
        ((KillApplicationsByAppStatesRequestPBImpl) request).getProto();
    try {
      return new KillApplicationsByAppStatesResponsePBImpl(
          proxy.killApplicationsByAppStates(null, requestProto));
    } catch (ServiceException e) {
      RPCUtil.unwrapAndThrowException(e);
      return null;
    }
  }
}
服務端對應2個,1個是PbClient對應的PbServer還有1個真正操作的服務端,在這里就是ClientRMService

public class ApplicationClientProtocolPBServiceImpl implements ApplicationClientProtocolPB {

  private ApplicationClientProtocol real;

  public ApplicationClientProtocolPBServiceImpl(ApplicationClientProtocol impl) {
    this.real = impl;
  }

  ...
  @Override
  public KillApplicationsByAppStatesResponseProto killApplicationsByAppStates(
      RpcController controller, KillApplicationsByAppStatesRequestProto proto)
      throws ServiceException {
    KillApplicationsByAppStatesRequestPBImpl request =
        new KillApplicationsByAppStatesRequestPBImpl(proto);
    try {
      KillApplicationsByAppStatesResponse response =
          real.killApplicationsByAppStates(request);
      return ((KillApplicationsByAppStatesResponsePBImpl) response).getProto();
    } catch (YarnException e) {
      throw new ServiceException(e);
    } catch (IOException e) {
      throw new ServiceException(e);
    }
  }
}
ClientRMService中的實現:

/**
 * The client interface to the Resource Manager. This module handles all the rpc
 * interfaces to the resource manager from the client.
 */
public class ClientRMService extends AbstractService implements
    ApplicationClientProtocol {
  ....

  @SuppressWarnings("unchecked")
  @Override
  public KillApplicationsByAppStatesResponse killApplicationsByAppStates(
      KillApplicationsByAppStatesRequest request) throws YarnException,
      IOException {
    UserGroupInformation callerUGI;
    try {
      callerUGI = UserGroupInformation.getCurrentUser();
    } catch (IOException ie) {
      LOG.info("Error getting UGI ", ie);
      RMAuditLogger.logFailure("UNKNOWN", AuditConstants.KILL_APP_REQUEST,
          "UNKNOWN", "ClientRMService", "Error getting UGI");
      throw RPCUtil.getRemoteException(ie);
    }

    EnumSet<YarnApplicationState> appStates = request.getApplicationStates();
    List<ApplicationReport> reports =
        getApplicationReportListByAppStates(callerUGI, appStates);
    List<ApplicationId> applicationIds = new ArrayList<ApplicationId>();

    if (reports != null && reports.size() > 0) {
      for (ApplicationReport report : reports) {
        applicationIds.add(report.getApplicationId());
      }
    }

    for (ApplicationId appId : applicationIds) {
      RMApp application = this.rmContext.getRMApps().get(appId);
      if (application == null) {
        RMAuditLogger.logFailure(callerUGI.getUserName(),
            AuditConstants.KILL_APP_REQUEST, "UNKNOWN", "ClientRMService",
            "Trying to kill an absent application", appId);
        throw new ApplicationNotFoundException("Trying to kill an absent"
            + " application " + appId);
      }

      if (!checkAccess(callerUGI, application.getUser(),
          ApplicationAccessType.MODIFY_APP, application)) {
        RMAuditLogger.logFailure(callerUGI.getShortUserName(),
            AuditConstants.KILL_APP_REQUEST,
            "User doesn't have permissions to "
                + ApplicationAccessType.MODIFY_APP.toString(),
            "ClientRMService", AuditConstants.UNAUTHORIZED_USER, appId);
        throw RPCUtil.getRemoteException(new AccessControlException("User "
            + callerUGI.getShortUserName() + " cannot perform operation "
            + ApplicationAccessType.MODIFY_APP.name() + " on " + appId));
      }

      if (application.isAppFinalStateStored()) {
        RMAuditLogger.logSuccess(callerUGI.getShortUserName(),
            AuditConstants.KILL_APP_REQUEST, "ClientRMService", appId);
        continue;
      }

      this.rmContext.getDispatcher().getEventHandler()
          .handle(new RMAppEvent(appId, RMAppEventType.KILL));
    }

    return KillApplicationsByAppStatesResponse.newInstance(true);
  }
  ...
可以看到ClientRMService也是繼承了ApplicationClientPtotocol,只要有新的客戶端命令,就必然會存在對應的服務端的實現.上面的killApplicationsByAppStates的方法是模仿了原有的kill單一Application方法,如下:

@SuppressWarnings("unchecked")
  @Override
  public KillApplicationResponse forceKillApplication(
      KillApplicationRequest request) throws YarnException {

    ApplicationId applicationId = request.getApplicationId();

    UserGroupInformation callerUGI;
    try {
      callerUGI = UserGroupInformation.getCurrentUser();
    } catch (IOException ie) {
      LOG.info("Error getting UGI ", ie);
      RMAuditLogger.logFailure("UNKNOWN", AuditConstants.KILL_APP_REQUEST,
          "UNKNOWN", "ClientRMService" , "Error getting UGI",
          applicationId);
      throw RPCUtil.getRemoteException(ie);
    }

    RMApp application = this.rmContext.getRMApps().get(applicationId);
    if (application == null) {
      RMAuditLogger.logFailure(callerUGI.getUserName(),
          AuditConstants.KILL_APP_REQUEST, "UNKNOWN", "ClientRMService",
          "Trying to kill an absent application", applicationId);
      throw new ApplicationNotFoundException("Trying to kill an absent"
          + " application " + applicationId);
    }

    if (!checkAccess(callerUGI, application.getUser(),
        ApplicationAccessType.MODIFY_APP, application)) {
      RMAuditLogger.logFailure(callerUGI.getShortUserName(),
          AuditConstants.KILL_APP_REQUEST,
          "User doesn't have permissions to "
              + ApplicationAccessType.MODIFY_APP.toString(), "ClientRMService",
          AuditConstants.UNAUTHORIZED_USER, applicationId);
      throw RPCUtil.getRemoteException(new AccessControlException("User "
          + callerUGI.getShortUserName() + " cannot perform operation "
          + ApplicationAccessType.MODIFY_APP.name() + " on " + applicationId));
    }

    if (application.isAppFinalStateStored()) {
      RMAuditLogger.logSuccess(callerUGI.getShortUserName(),
          AuditConstants.KILL_APP_REQUEST, "ClientRMService", applicationId);
      return KillApplicationResponse.newInstance(true);
    }

    this.rmContext.getDispatcher().getEventHandler()
        .handle(new RMAppEvent(applicationId, RMAppEventType.KILL));

    // For UnmanagedAMs, return true so they don't retry
    return KillApplicationResponse.newInstance(
        application.getApplicationSubmissionContext().getUnmanagedAM());
  }
最終都是把包裝好后的kill event事件發給對應的處理分發器.

第四步.編寫客戶端命令類并提供對應處理方法

...
      Option killAppStateOpt =
          new Option(APP_KILL_BY_APPSTATES_CMD, true,
              "The states of application that will be killed"
                  + ", input comma-separated list of application states.");
      killAppStateOpt.setValueSeparator(',');
      killAppStateOpt.setArgs(Option.UNLIMITED_VALUES);
      killAppStateOpt.setArgName("States");
      opts.addOption(killAppStateOpt);
      ...
....
    } else if (cliParser.hasOption(APP_KILL_BY_APPSTATES_CMD)) {
      if (args.length != 3) {
        printUsage(title, opts);
        return exitCode;
      }

      EnumSet<YarnApplicationState> appStates =
          EnumSet.noneOf(YarnApplicationState.class);
      String[] states = cliParser.getOptionValues(APP_KILL_BY_APPSTATES_CMD);
      if (states != null) {
        for (String state : states) {
          if (!state.trim().isEmpty()) {
            try {
              appStates.add(YarnApplicationState.valueOf(StringUtils
                  .toUpperCase(state).trim()));
            } catch (IllegalArgumentException ex) {
              sysout.println("The application state " + state + " is invalid.");
              sysout.println(getAllValidApplicationStates());
              return exitCode;
            }
          }
        }
      }

      try {
        killApplicationsByAppStates(appStates);
      } catch (ApplicationNotFoundException e) {
        return exitCode;
      }
    }
....
調用方法如下:
/**
   * Kill the applications by appStates
   *
   * @param appStates
   * @throws IOException
   */
  private void killApplicationsByAppStates(
      EnumSet<YarnApplicationState> appStates) throws YarnException,
      IOException {
    if (appStates == null || appStates.isEmpty()) {
      sysout.println("The appStates should not be null.");
      return;
    }

    if (appStates.contains(YarnApplicationState.FAILED)
        || appStates.contains(YarnApplicationState.KILLED)
        || appStates.contains(YarnApplicationState.FINISHED)) {
      sysout
          .println("The appState should not contain state failed, killed, finished");
      return;
    }

    sysout.println("Killing applications of specific states.");
    client.killApplicationsByAppStates(appStates);
  }
客戶端的被調用方法就是要力求簡單,就傳個參數就可以了,主要操作在Server端執行就OK了.

第五步.編寫單元測試測驗結果

在沒有測試集群的情況下,這不失為1種最好的試驗辦法.對client端和Server端各造一個testcase.

@Test
  public void testKillApplicationsByAppStates() throws Exception {
    ApplicationCLI cli = createAndGetAppCLI();
    EnumSet<YarnApplicationState> appStates =
        EnumSet.noneOf(YarnApplicationState.class);
    appStates.add(YarnApplicationState.RUNNING);
    appStates.add(YarnApplicationState.SUBMITTED);

    int result =
        cli.run(new String[] { "application", "-killByAppStates",
            "RUNNING,SUBMITTED" });
    assertEquals(0, result);
    verify(client).killApplicationsByAppStates(appStates);
    verify(sysOut).println("Killing applications of specific states.");
  }
@Test
  public void testKillApplicationsByAppStates() throws Exception {
    YarnConfiguration conf = new YarnConfiguration();
    MockRM rm = new MockRM();
    rm.init(conf);
    rm.start();

    ClientRMService rmService = rm.getClientRMService();
    GetApplicationsRequest getRequest =
        GetApplicationsRequest.newInstance(EnumSet
            .of(YarnApplicationState.KILLED));

    EnumSet<YarnApplicationState> appStates =
        EnumSet.noneOf(YarnApplicationState.class);
    appStates.add(YarnApplicationState.ACCEPTED);
    RMApp app1 = rm.submitApp(1024);
    RMApp app2 = rm.submitApp(1024);

    assertEquals("Incorrect number of apps in the RM", 0, rmService
        .getApplications(getRequest).getApplicationList().size());

    KillApplicationsByAppStatesRequest killRequest =
        KillApplicationsByAppStatesRequest.newInstance(appStates);

    KillApplicationsByAppStatesResponse killResponse =
        rmService.killApplicationsByAppStates(killRequest);
    assertTrue("Kill the applications successfully",
        killResponse.getIsKillCompleted());
  }
對于其他2個維度,對user和queue都是類似的,這里就不重新進行描述了,具體代碼可以見我下面的patch鏈接.這個新屬性功能我將此建議功能提交開源hadoop社區,編號YARN-4529.


相關鏈接

Issue鏈接:https://issues.apache.org/jira/browse/YARN-4529

Github patch鏈接:https://github.com/linyiqun/open-source-patch/tree/master/yarn/YARN-4529


來自: http://blog.csdn.net/androidlushangderen/article/details/50457963

 本文由用戶 jopen 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!