spark用程序提交任務到yarn
因為spark文檔中只介紹了兩種用腳本提交到yarn的例子,并沒有介紹如何通過程序提交yarn,但是我們的需求需要這樣。網上很難找到例子,經過幾天摸索,終于用程序提交到yarn成功,下面總結一下。
先介紹官網提交的例子,我用的是spark 0.9.0 hadoop2.2.0
一.使用腳本提交
1.使用spark腳本提交到yarn,首先需要將spark所在的主機和hadoop集群之間hosts相互配置(也就是把spark主機的ip和主機名配置到hadoop所有節點的/etc/hosts里面,再把集群所有節點的ip和主機名配置到spark所在主機的/etc/hosts里面)。
2.然后需要把hadoop目錄etc/hadoop下面的*-sit.xml復制到${SPARK_HOME}的conf下面.
3.確保hadoop集群配置了 HADOOP_CONF_DIR or YARN_CONF_DIR
1.yarn-standalone方式提交到yarn
在${SPARK_HOME}下面執行:
SPARK_JAR=./assembly/target/scala-2.10.4/spark-assembly-0.9.0-incubating-hadoop2.2.0.jar \ ./bin/spark-class org.apache.spark.deploy.yarn.Client \ --jar ./examples/target/scala-2.10/spark-examples_2.10-assembly-0.9.0-incubating.jar \ --class org.apache.spark.examples.SparkPi \ --args yarn-standalone \ --num-workers 3 \ --master-memory 2g \ --worker-memory 2g \ --worker-cores 1
在${SPARK_HOME}下面執行:
SPARK_JAR=./assembly/target/scala-2.10.4/spark-assembly-0.9.0-incubating-hadoop2.2.0.jar \ SPARK_YARN_APP_JAR=examples/target/scala-2.10/spark-examples_2.10-assembly-0.9.0-incubating.jar \ ./bin/run-example org.apache.spark.examples.SparkPi yarn-client
1.必須使用linux主機提交任務,使用windows提交到linux hadoop集群會報
org.apache.hadoop.util.Shell$ExitCodeException: /bin/bash: 第 0 行: fg: 無任務控制
錯誤。hadoop2.2.0不支持windows提交到linux hadoop集群,網上搜索發現這是hadoop的bug。
2.提交任務的主機和hadoop集群主機名需要在hosts相互配置。
3.因為使用程序提交是使用yarn-client方式,所以必須像上面腳本那樣設置環境變量SPARK_JAR 和 SPARK_YARN_APP_JAR
比如我的設置為向提交任務主機~/.bashrc里面添加:
export SPARK_JAR=file:///home/ndyc/software/sparkTest/lib/spark-assembly-0.9.0-incubating-hadoop2.2.0.jar export SPARK_YARN_APP_JAR=file:///home/ndyc/software/sparkTest/ndspark-0.0.1.jar
其中SPARK_JAR是${SPARK_HOME}/assembly/target/scala-2.10.4/spark-assembly-0.9.0-incubating-hadoop2.2.0.jar
SPARK_YARN_APP_JAR是自己程序打的jar包,包含自己的測試程序。
4.程序中加入hadoop、yarn、依賴。
注意,如果引入了hbase依賴,需要這樣配置
<dependency> <groupId>org.apache.hbase</groupId> <artifactId>hbase-thrift</artifactId> <version>${hbase.version}</version> <exclusions> <exclusion> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-mapreduce-client-jobclient</artifactId> </exclusion> <exclusion> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> </exclusion> </exclusions> </dependency>
然后再加入
<dependency> <groupId>org.ow2.asm</groupId> <artifactId>asm-all</artifactId> <version>4.0</version> </dependency>
IncompatibleClassChangeError has interface org.objectweb.asm.ClassVisitor as super class
異常是因為Hbase jar hadoop-mapreduce-client-jobclient.jar里面使用到了asm3.1 而spark需要的是asm-all-4.0.jar
5. hadoop conf下的*-site.xml需要復制到提交主機的classpath下,或者說maven項目resources下面。
6.編寫程序
代碼示例:
package com.sdyc.ndspark.sys;import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.Function2; import org.apache.spark.api.java.function.PairFunction; import scala.Tuple2;
import java.util.ArrayList; import java.util.List;
/**
- Created with IntelliJ IDEA.
- User: zarchary
- Date: 14-1-19
- Time: 下午6:23
To change this template use File | Settings | File Templates. */ public class ListTest {
public static void main(String[] args) throws Exception {
SparkConf sparkConf = new SparkConf(); sparkConf.setAppName("listTest"); //使用yarn模式提交 sparkConf.setMaster("yarn-client"); JavaSparkContext sc = new JavaSparkContext(sparkConf); List<String> listA = new ArrayList<String>(); listA.add("a"); listA.add("a"); listA.add("b"); listA.add("b"); listA.add("b"); listA.add("c"); listA.add("d"); JavaRDD<String> letterA = sc.parallelize(listA); JavaPairRDD<String, Integer> letterB = letterA.map(new PairFunction<String, String, Integer>() { @Override public Tuple2<String, Integer> call(String s) throws Exception { return new Tuple2<String, Integer>(s, 1); } }); letterB = letterB.reduceByKey(new Function2<Integer, Integer, Integer>() { public Integer call(Integer i1, Integer i2) { return i1 + i2; } }); //顛倒順序 JavaPairRDD<Integer, String> letterC = letterB.map(new PairFunction<Tuple2<String, Integer>, Integer, String>() { @Override public Tuple2<Integer, String> call(Tuple2<String, Integer> stringIntegerTuple2) throws Exception { return new Tuple2<Integer, String>(stringIntegerTuple2._2, stringIntegerTuple2._1); } }); JavaPairRDD<Integer, List<String>> letterD = letterC.groupByKey();
// //false說明是降序
JavaPairRDD<Integer, List<String>> letterE = letterD.sortByKey(false); System.out.println("========" + letterE.collect()); System.exit(0);
} }</pre>
代碼中master設置為yar-client表明了是使用提交到yarn.
關于spark需要依賴的jar的配置可以參考我的博客spark安裝和遠程調用。
以上弄完之后就可以運行程序了。
運行后會看到yarn的ui界面出現:
正在執行的過程中會發現hadoop yarn 有的nodemanage會有下面這個進程:
13247 org.apache.spark.deploy.yarn.WorkerLauncher
這是spark的工作進程。
如果接收到異常為:
WARN YarnClientClusterScheduler: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory
出現這個錯誤是因為提交任務的節點不能和spark工作節點交互,因為提交完任務后提交任務節點上會起一個進程,展示任務進度,大多端口為4044,工作節點需要反饋進度給該該端口,所以如果主機名或者IP在hosts中配置不正確,就會報
WARN YarnClientClusterScheduler: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory錯誤。
所以請檢查主機名和IP是否配置正確。
我自己的理解為,程序提交任務到yarn后,會上傳SPARK_JAR和SPARK_YARN_APP_JAR到hadoop節點, yarn根據任務情況來分配資源,在nodemanage節點上來啟動org.apache.spark.deploy.yarn.WorkerLauncher工作節點來執行spark任務,執行完成后退出。
來自:http://my.oschina.net/132722/blog/220096