Running Spark on YARN

Support for running on YARN was added to Spark in version 0.6.0 and has been improved in subsequent releases.

Launching Spark on YARN

Ensure that HADOOP_CONF_DIR or YARN_CONF_DIR points to the directory containing the configuration files for the Hadoop cluster. Spark uses these configurations to write data to HDFS and to connect to the YARN ResourceManager. All files in this directory will be distributed to the YARN cluster, so that every container used by the application works with the same configuration. Settings such as Java system properties or environment variables that are not managed by YARN should instead be set in the Spark application's configuration.
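
For example, before running spark-submit you would point these variables at your cluster's Hadoop configuration directory. The path below is only a placeholder; substitute the directory used by your installation:

$ export HADOOP_CONF_DIR=/etc/hadoop/conf   # hypothetical location of core-site.xml, yarn-site.xml, etc.
$ export YARN_CONF_DIR=$HADOOP_CONF_DIR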

There are two deploy modes that can be used to launch Spark applications on YARN. In cluster mode, the Spark driver runs inside an application master process that is managed by YARN on the cluster, and the client can go away after initiating the application. In client mode, the driver runs in the client process, and the application master is only used for requesting resources from YARN.

Unlike the Spark standalone and Mesos modes, in which the master's address is specified in the --master parameter, in YARN mode the ResourceManager's address is picked up from the Hadoop configuration. Thus, the --master parameter is simply yarn.

To launch a Spark application in cluster mode:

$ ./bin/spark-submit --class path.to.your.Class --master yarn --deploy-mode cluster [options] <app jar> [app options]

For example:

$ ./bin/spark-submit --class org.apache.spark.examples.SparkPi \
    --master yarn \
    --deploy-mode cluster \
    --driver-memory 4g \
    --executor-memory 2g \
    --executor-cores 1 \
    --queue thequeue \
    lib/spark-examples*.jar \
    10

The above starts a YARN client program which launches the default Application Master. SparkPi then runs as a child thread of the Application Master. The client periodically polls the Application Master for status updates and displays them in the console. The client will exit once your application has finished running.
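
While the application is running, or after it has finished, YARN itself can also be queried for the application's status. A small sketch using the standard YARN CLI; the application ID shown is a hypothetical placeholder:

$ yarn application -list                                   # list applications and their IDs
$ yarn application -status application_1450000000000_0001  # hypothetical application ID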

Launching a Spark application in client mode is done the same way, except that cluster is replaced with client. For example, to run spark-shell in client mode:

$ ./bin/spark-shell --master yarn --deploy-mode client
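
In client mode the driver runs in the local client process (here, the shell itself), while the YARN Application Master only requests resources, so its size is controlled separately. A hedged sketch with arbitrary example values:

$ ./bin/spark-shell --master yarn --deploy-mode client \
    --driver-memory 2g \
    --executor-memory 2g \
    --conf spark.yarn.am.memory=1g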

Adding Other JARs

In cluster mode, the driver runs on a different machine than the client, so SparkContext.addJar won't work out of the box with files that are local to the client. To make files on the client available to SparkContext.addJar, include them with the --jars option in the launch command.

$ ./bin/spark-submit --class my.main.Class \
    --master yarn \
    --deploy-mode cluster \
    --jars my-other-jar.jar,my-other-other-jar.jar \
    my-main-jar.jar \
    app_arg1 app_arg2

Preparations

Running Spark on YARN requires a binary distribution of Spark which is built with YARN support. Binary distributions can be downloaded from the project website, or you can build one yourself.
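
If you prefer to build such a distribution yourself, the YARN profile has to be enabled at build time. A minimal sketch, assuming a Spark 1.x source checkout and a Hadoop 2.6 cluster; adjust the profiles and versions to match your environment:

$ build/mvn -Pyarn -Phadoop-2.6 -Dhadoop.version=2.6.0 -DskipTests clean package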

Configuration

Most of the configuration options for Spark on YARN are the same as for other deployment modes.
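
YARN-specific properties, like any other Spark property, can be passed either with --conf on the command line or through conf/spark-defaults.conf. A small sketch; the queue name and values are placeholders:

$ ./bin/spark-submit --master yarn --deploy-mode cluster \
    --conf spark.yarn.queue=thequeue \
    --conf spark.yarn.submit.waitAppCompletion=false \
    --class path.to.your.Class <app jar> [app options]

or, equivalently, append them to conf/spark-defaults.conf:

$ cat >> conf/spark-defaults.conf <<'EOF'
spark.yarn.queue                     thequeue
spark.yarn.submit.waitAppCompletion  false
EOF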

Debugging Your Application

In YARN terminology, executors and the application master run inside "containers". YARN has two modes for handling container logs after an application has completed. If log aggregation is turned on (with the yarn.log-aggregation-enable config), container logs are copied to HDFS and deleted on the local machine. These logs can be viewed from anywhere on the cluster with the yarn logs command:

yarn logs -applicationId <app ID>

This will print out the contents of all log files from all containers for the given application. You can also view the container log files directly in HDFS using the HDFS shell or API. The directory where they are located can be found by looking at your YARN configs (yarn.nodemanager.remote-app-log-dir and yarn.nodemanager.remote-app-log-dir-suffix). The logs are also available on the Spark Web UI under the Executors tab. For that, you need to have both the Spark history server and the MapReduce history server running, and to configure yarn.log.server.url in yarn-site.xml properly. The log URL on the Spark history server UI will redirect you to the MapReduce history server to show the aggregated logs.
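
With the default values of those two YARN properties, the aggregated logs typically end up under /tmp/logs/<user>/logs/<app ID> in HDFS; treat the path below as an illustrative assumption and check your own yarn-site.xml:

$ hdfs dfs -ls /tmp/logs/$USER/logs/<app ID>      # hypothetical aggregation directory
$ hdfs dfs -cat /tmp/logs/$USER/logs/<app ID>/*   # dump the container log files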

When log aggregation isn't turned on, logs are retained locally on each machine under YARN_APP_LOGS_DIR, which is usually configured to /tmp/logs or $HADOOP_HOME/logs/userlogs depending on the Hadoop version and installation. Viewing logs for a container requires going to the host that contains them and looking in this directory. Subdirectories organize the log files by application ID and container ID. The logs are also available on the Spark Web UI under the Executors tab, and in this case the MapReduce history server does not need to be running, since the logs are not read from HDFS.

To review the per-container launch environment, increase yarn.nodemanager.delete.debug-delay-sec to a large value (e.g. 36000), and then access the application cache through yarn.nodemanager.local-dirs on the nodes on which containers are launched. This directory contains the launch script, JARs, and all environment variables used for launching each container. It is particularly useful for debugging classpath problems. (Note that enabling this requires admin privileges on the cluster settings and a restart of all node managers, so it is not applicable to hosted clusters.)

To use a custom log4j configuration for the application master or executors, there are the following options:

  • Upload a custom log4j.properties using spark-submit, by adding it to the --files list of files to be uploaded with the application.
  • Add -Dlog4j.configuration=<location of configuration file> to spark.driver.extraJavaOptions for the driver. Note that if using this option, the file needs to exist locally on all the nodes. (A combined sketch of these first two options follows this list.)

  • Update the $SPARK_CONF_DIR/log4j.properties file, and it will be automatically uploaded along with the other configurations. Note that the two options above take precedence over this one if multiple options are specified.
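
For instance, the first two options can be combined: ship the file with --files and point the driver at the copy that YARN places in each container's working directory. A hedged sketch; my-log4j.properties is a hypothetical file name, and the relative file: URL relies on the file being localized into the container's working directory:

$ ./bin/spark-submit --class path.to.your.Class \
    --master yarn --deploy-mode cluster \
    --files my-log4j.properties \
    --conf "spark.driver.extraJavaOptions=-Dlog4j.configuration=file:my-log4j.properties" \
    <app jar> [app options]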

Note that for the first option, both executors and the application master will share the same log4j configuration, which may cause issues when they run on the same node (e.g. trying to write to the same log file).

If you need a reference to the proper location to put log files in YARN so that YARN can properly display and aggregate them, use spark.yarn.app.container.log.dir in your log4j.properties. For example:

log4j.appender.file_appender.File=${spark.yarn.app.container.log.dir}/spark.log

For streaming applications, configuring RollingFileAppender and setting the file location to YARN's log directory will avoid disk overflow caused by large log files, and the logs can still be accessed through YARN's log utilities.
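
A minimal sketch of such a log4j.properties, written here as a shell heredoc; the appender name, sizes, and pattern are illustrative assumptions rather than recommended values:

$ cat > log4j.properties <<'EOF'
# Root logger writes to a rolling file inside the YARN container log directory
log4j.rootCategory=INFO, file_appender
log4j.appender.file_appender=org.apache.log4j.RollingFileAppender
log4j.appender.file_appender.File=${spark.yarn.app.container.log.dir}/spark.log
log4j.appender.file_appender.MaxFileSize=50MB
log4j.appender.file_appender.MaxBackupIndex=10
log4j.appender.file_appender.layout=org.apache.log4j.PatternLayout
log4j.appender.file_appender.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
EOF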

Important Notes

  • Whether core requests are honored in scheduling decisions depends on which scheduler is in use and how it is configured.
  • In cluster mode, the Spark executors and the Spark driver use the local directories configured for YARN (the Hadoop YARN config yarn.nodemanager.local-dirs); if the user specifies spark.local.dir, it will be ignored. In client mode, the Spark executors use the local directories configured for YARN, while the Spark driver uses those defined in spark.local.dir. This is because in client mode the Spark driver does not run on the YARN cluster; only the Spark executors do.
  • The --files and --archives options support specifying file names with #, similar to Hadoop. For example, you can specify --files localtest.txt#appSees.txt: this uploads the local file localtest.txt into HDFS, but it is linked to by the name appSees.txt, and your application should use the name appSees.txt to reference it when running on YARN. (A sketch follows this list.)
  • The --jars option allows SparkContext.addJar to work if you are using it with local files and running in cluster mode. It is not needed if you are using it with HDFS, HTTP, HTTPS, or FTP files.
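
As an illustration of the notes above, the alias syntax and --jars can appear together in one submit command. A hedged sketch; all file and class names are placeholders:

$ ./bin/spark-submit --class my.main.Class \
    --master yarn --deploy-mode cluster \
    --jars my-local-dep.jar \
    --files localtest.txt#appSees.txt \
    my-main-jar.jar
# The application then refers to the uploaded file simply as "appSees.txt".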

Spark properties for YARN can be tuned according to the following property list. (A brief usage sketch follows the list.)

Property Name Default Meaning
spark.yarn.am.memory 512m Amount of memory to use for the YARN Application Master in client mode, in the same format as JVM memory strings (e.g. 512m, 2g). In cluster mode, use spark.driver.memory instead. Use lower-case suffixes, e.g. k, m, g, t, and p, for kibi-, mebi-, gibi-, tebi-, and pebibytes, respectively.

spark.driver.cores 1 Number of cores used by the driver in YARN cluster mode. Since the driver is run in the same JVM as the YARN Application Master in cluster mode, this also controls the cores used by the YARN Application Master. In client mode, use spark.yarn.am.cores to control the number of cores used by the YARN Application Master instead.
spark.yarn.am.cores 1 Number of cores to use for the YARN Application Master in client mode. In cluster mode, use spark.driver.cores instead.
spark.yarn.am.waitTime 100s In cluster mode, time for the YARN Application Master to wait for the SparkContext to be initialized. In client mode, time for the YARN Application Master to wait for the driver to connect to it.
spark.yarn.submit.file.replication The default HDFS replication (usually 3 ) HDFS replication level for the files uploaded into HDFS for the application. These include things like the Spark jar, the app jar, and any distributed cache files/archives.
spark.yarn.preserve.staging.files false Set to true to preserve the staged files (Spark jar, app jar, distributed cache files) at the end of the job rather than delete them.
spark.yarn.scheduler.heartbeat.interval-ms 3000 The interval in ms in which the Spark application master heartbeats into the YARN ResourceManager. The value is capped at half the value of YARN's configuration for the expiry interval, i.e. yarn.am.liveness-monitor.expiry-interval-ms .
spark.yarn.scheduler.initial-allocation.interval 200ms The initial interval in which the Spark application master eagerly heartbeats to the YARN ResourceManager when there are pending container allocation requests. It should be no larger than spark.yarn.scheduler.heartbeat.interval-ms . The allocation interval will doubled on successive eager heartbeats if pending containers still exist, until spark.yarn.scheduler.heartbeat.interval-ms is reached.
spark.yarn.max.executor.failures numExecutors * 2, with minimum of 3 The maximum number of executor failures before failing the application.
spark.yarn.historyServer.address (none) The address of the Spark history server, e.g. host.com:18080 . The address should not contain a scheme ( http:// ). Defaults to not being set since the history server is an optional service. This address is given to the YARN ResourceManager when the Spark application finishes to link the application from the ResourceManager UI to the Spark history server UI. For this property, YARN properties can be used as variables, and these are substituted by Spark at runtime. For example, if the Spark history server runs on the same node as the YARN ResourceManager, it can be set to ${hadoopconf-yarn.resourcemanager.hostname}:18080 .
spark.yarn.dist.archives (none) Comma separated list of archives to be extracted into the working directory of each executor.
spark.yarn.dist.files (none) Comma-separated list of files to be placed in the working directory of each executor.
spark.executor.instances 2 The number of executors. Note that this property is incompatible with spark.dynamicAllocation.enabled . If both spark.dynamicAllocation.enabled and spark.executor.instances are specified, dynamic allocation is turned off and the specified number of spark.executor.instances is used.
spark.yarn.executor.memoryOverhead executorMemory * 0.10, with minimum of 384 The amount of off-heap memory (in megabytes) to be allocated per executor. This is memory that accounts for things like VM overheads, interned strings, other native overheads, etc. This tends to grow with the executor size (typically 6-10%).
spark.yarn.driver.memoryOverhead driverMemory * 0.10, with minimum of 384 The amount of off-heap memory (in megabytes) to be allocated per driver in cluster mode. This is memory that accounts for things like VM overheads, interned strings, other native overheads, etc. This tends to grow with the container size (typically 6-10%).
spark.yarn.am.memoryOverhead AM memory * 0.10, with minimum of 384 Same as spark.yarn.driver.memoryOverhead , but for the YARN Application Master in client mode.
spark.yarn.am.port (random) Port for the YARN Application Master to listen on. In YARN client mode, this is used to communicate between the Spark driver running on a gateway and the YARN Application Master running on YARN. In YARN cluster mode, this is used for the dynamic executor feature, where it handles the kill from the scheduler backend.
spark.yarn.queue default The name of the YARN queue to which the application is submitted.
spark.yarn.jar (none) The location of the Spark jar file, in case overriding the default location is desired. By default, Spark on YARN will use a Spark jar installed locally, but the Spark jar can also be in a world-readable location on HDFS. This allows YARN to cache it on nodes so that it doesn't need to be distributed each time an application runs. To point to a jar on HDFS, for example, set this configuration to hdfs:///some/path .
spark.yarn.access.namenodes (none) A comma-separated list of secure HDFS namenodes your Spark application is going to access. For example, spark.yarn.access.namenodes=hdfs://nn1.com:8032,hdfs://nn2.com:8032 . The Spark application must have access to the namenodes listed and Kerberos must be properly configured to be able to access them (either in the same realm or in a trusted realm). Spark acquires security tokens for each of the namenodes so that the Spark application can access those remote HDFS clusters.
spark.yarn.appMasterEnv.[EnvironmentVariableName] (none) Add the environment variable specified by EnvironmentVariableName to the Application Master process launched on YARN. The user can specify multiple of these and to set multiple environment variables. In cluster mode this controls the environment of the Spark driver and in client mode it only controls the environment of the executor launcher.
spark.yarn.containerLauncherMaxThreads 25 The maximum number of threads to use in the YARN Application Master for launching executor containers.
spark.yarn.am.extraJavaOptions (none) A string of extra JVM options to pass to the YARN Application Master in client mode. In cluster mode, use spark.driver.extraJavaOptions instead.
spark.yarn.am.extraLibraryPath (none) Set a special library path to use when launching the YARN Application Master in client mode.
spark.yarn.maxAppAttempts yarn.resourcemanager.am.max-attempts in YARN The maximum number of attempts that will be made to submit the application. It should be no larger than the global number of max attempts in the YARN configuration.
spark.yarn.am.attemptFailuresValidityInterval (none) Defines the validity interval for AM failure tracking. If the AM has been running for at least the defined interval, the AM failure count will be reset. This feature is not enabled if not configured, and only supported in Hadoop 2.6+.
spark.yarn.submit.waitAppCompletion true In YARN cluster mode, controls whether the client waits to exit until the application completes. If set to true , the client process will stay alive reporting the application's status. Otherwise, the client process will exit after submission.
spark.yarn.am.nodeLabelExpression (none) A YARN node label expression that restricts the set of nodes AM will be scheduled on. Only versions of YARN greater than or equal to 2.6 support node label expressions, so when running against earlier versions, this property will be ignored.
spark.yarn.executor.nodeLabelExpression (none) A YARN node label expression that restricts the set of nodes executors will be scheduled on. Only versions of YARN greater than or equal to 2.6 support node label expressions, so when running against earlier versions, this property will be ignored.
spark.yarn.tags (none) Comma-separated list of strings to pass through as YARN application tags appearing in YARN ApplicationReports, which can be used for filtering when querying YARN apps.
spark.yarn.keytab (none) The full path to the file that contains the keytab for the principal specified above. This keytab will be copied to the node running the YARN Application Master via the Secure Distributed Cache, for renewing the login tickets and the delegation tokens periodically. (Works also with the "local" master)
spark.yarn.principal (none) Principal to be used to login to KDC, while running on secure HDFS. (Works also with the "local" master)
spark.yarn.config.gatewayPath (none) A path that is valid on the gateway host (the host where a Spark application is started) but may differ for paths for the same resource in other nodes in the cluster. Coupled with spark.yarn.config.replacementPath, this is used to support clusters with heterogeneous configurations, so that Spark can correctly launch remote processes. The replacement path normally will contain a reference to some environment variable exported by YARN (and, thus, visible to Spark containers). For example, if the gateway node has Hadoop libraries installed on /disk1/hadoop, and the location of the Hadoop install is exported by YARN as the HADOOP_HOME environment variable, setting this value to /disk1/hadoop and the replacement path to $HADOOP_HOME will make sure that paths used to launch remote processes properly reference the local YARN configuration.

spark.yarn.config.replacementPath (none) See spark.yarn.config.gatewayPath .
spark.yarn.security.tokens.${service}.enabled true Controls whether to retrieve delegation tokens for non-HDFS services when security is enabled. By default, delegation tokens for all supported services are retrieved when those services are configured, but it's possible to disable that behavior if it somehow conflicts with the application being run. Currently supported services are: hive, hbase.
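
As a usage illustration, several of the properties above can be set directly on the command line at submit time. A hedged sketch with arbitrary example values, shown here for client mode:

$ ./bin/spark-submit --master yarn --deploy-mode client \
    --conf spark.yarn.am.memory=1g \
    --conf spark.yarn.am.cores=2 \
    --conf spark.executor.instances=4 \
    --conf spark.yarn.queue=thequeue \
    --class path.to.your.Class <app jar> [app options]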

Source: http://www.cnblogs.com/yourarebest/p/5115512.html
