spark用程序提交任務到yarn

jopen 10年前發布 | 186K 次閱讀 分布式/云計算/大數據 Spark

因為spark文檔中只介紹了兩種用腳本提交到yarn的例子,并沒有介紹如何通過程序提交yarn,但是我們的需求需要這樣。網上很難找到例子,經過幾天摸索,終于用程序提交到yarn成功,下面總結一下。

先介紹官網提交的例子,我用的是spark 0.9.0 hadoop2.2.0

一.使用腳本提交

1.使用spark腳本提交到yarn,首先需要將spark所在的主機和hadoop集群之間hosts相互配置(也就是把spark主機的ip和主機名配置到hadoop所有節點的/etc/hosts里面,再把集群所有節點的ip和主機名配置到spark所在主機的/etc/hosts里面)。

2.然后需要把hadoop目錄etc/hadoop下面的*-sit.xml復制到${SPARK_HOME}的conf下面.

3.確保hadoop集群配置了 HADOOP_CONF_DIR or YARN_CONF_DIR 

1.yarn-standalone方式提交到yarn

${SPARK_HOME}下面執行:

SPARK_JAR=./assembly/target/scala-2.10.4/spark-assembly-0.9.0-incubating-hadoop2.2.0.jar \
    ./bin/spark-class org.apache.spark.deploy.yarn.Client \
      --jar ./examples/target/scala-2.10/spark-examples_2.10-assembly-0.9.0-incubating.jar \
      --class org.apache.spark.examples.SparkPi \
      --args yarn-standalone \
      --num-workers 3 \
      --master-memory 2g \
      --worker-memory 2g \
      --worker-cores 1



2. yarn-client 方式提交到yarn


${SPARK_HOME}下面執行:

SPARK_JAR=./assembly/target/scala-2.10.4/spark-assembly-0.9.0-incubating-hadoop2.2.0.jar \
SPARK_YARN_APP_JAR=examples/target/scala-2.10/spark-examples_2.10-assembly-0.9.0-incubating.jar \
./bin/run-example org.apache.spark.examples.SparkPi yarn-client



二、使用程序提交


1.必須使用linux主機提交任務,使用windows提交到linux hadoop集群會報


org.apache.hadoop.util.Shell$ExitCodeException: /bin/bash: 第 0 行: fg: 無任務控制


錯誤。hadoop2.2.0不支持windows提交到linux hadoop集群,網上搜索發現這是hadoop的bug。

2.提交任務的主機和hadoop集群主機名需要在hosts相互配置。

3.因為使用程序提交是使用yarn-client方式,所以必須像上面腳本那樣設置環境變量SPARK_JAR 和 SPARK_YARN_APP_JAR

比如我的設置為向提交任務主機~/.bashrc里面添加:


export SPARK_JAR=file:///home/ndyc/software/sparkTest/lib/spark-assembly-0.9.0-incubating-hadoop2.2.0.jar
     export SPARK_YARN_APP_JAR=file:///home/ndyc/software/sparkTest/ndspark-0.0.1.jar


如果不加file:// spark會任務是hdfs,會在hdfs里面找,file://表明是本地文件。


其中SPARK_JAR是${SPARK_HOME}/assembly/target/scala-2.10.4/spark-assembly-0.9.0-incubating-hadoop2.2.0.jar

SPARK_YARN_APP_JAR是自己程序打的jar包,包含自己的測試程序。

4.程序中加入hadoop、yarn、依賴。

注意,如果引入了hbase依賴,需要這樣配置


<dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-thrift</artifactId>
            <version>${hbase.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.hadoop</groupId>
                    <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.apache.hadoop</groupId>
                    <artifactId>hadoop-client</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

然后再加入


<dependency>
            <groupId>org.ow2.asm</groupId>
            <artifactId>asm-all</artifactId>
            <version>4.0</version>
        </dependency>


否則會報錯:



IncompatibleClassChangeError has interface org.objectweb.asm.ClassVisitor as super class



異常是因為Hbase jar hadoop-mapreduce-client-jobclient.jar里面使用到了asm3.1 而spark需要的是asm-all-4.0.jar

5. hadoop conf下的*-site.xml需要復制到提交主機的classpath下,或者說maven項目resources下面。

6.編寫程序

代碼示例:


package com.sdyc.ndspark.sys;

import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.Function2; import org.apache.spark.api.java.function.PairFunction; import scala.Tuple2;

import java.util.ArrayList; import java.util.List;

/**

  • Created with IntelliJ IDEA.
  • User: zarchary
  • Date: 14-1-19
  • Time: 下午6:23
  • To change this template use File | Settings | File Templates. */ public class ListTest {

    public static void main(String[] args) throws Exception {

     SparkConf sparkConf = new SparkConf();
     sparkConf.setAppName("listTest");
     //使用yarn模式提交
     sparkConf.setMaster("yarn-client");
    
     JavaSparkContext sc = new JavaSparkContext(sparkConf);
    
     List<String> listA = new ArrayList<String>();
    
     listA.add("a");
     listA.add("a");
     listA.add("b");
     listA.add("b");
     listA.add("b");
     listA.add("c");
     listA.add("d");
    
     JavaRDD<String> letterA = sc.parallelize(listA);
    
     JavaPairRDD<String, Integer> letterB = letterA.map(new PairFunction<String, String, Integer>() {
         @Override
         public Tuple2<String, Integer> call(String s) throws Exception {
             return new Tuple2<String, Integer>(s, 1);
         }
     });
    
     letterB = letterB.reduceByKey(new Function2<Integer, Integer, Integer>() {
         public Integer call(Integer i1, Integer i2) {
             return i1 + i2;
         }
     });
    
     //顛倒順序
     JavaPairRDD<Integer, String> letterC = letterB.map(new PairFunction<Tuple2<String, Integer>, Integer, String>() {
         @Override
         public Tuple2<Integer, String> call(Tuple2<String, Integer> stringIntegerTuple2) throws Exception {
             return new Tuple2<Integer, String>(stringIntegerTuple2._2, stringIntegerTuple2._1);
         }
     });
    
     JavaPairRDD<Integer, List<String>> letterD = letterC.groupByKey();
    

    // //false說明是降序

     JavaPairRDD<Integer, List<String>> letterE = letterD.sortByKey(false);
    
     System.out.println("========" + letterE.collect());
    
     System.exit(0);
    

    } }</pre>



    代碼中master設置為yar-client表明了是使用提交到yarn.


    關于spark需要依賴的jar的配置可以參考我的博客spark安裝和遠程調用。

    以上弄完之后就可以運行程序了。

    運行后會看到yarn的ui界面出現:


    正在執行的過程中會發現hadoop yarn 有的nodemanage會有下面這個進程:


    13247 org.apache.spark.deploy.yarn.WorkerLauncher


    這是spark的工作進程。


    如果接收到異常為:

    WARN YarnClientClusterScheduler: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory

     出現這個錯誤是因為提交任務的節點不能和spark工作節點交互,因為提交完任務后提交任務節點上會起一個進程,展示任務進度,大多端口為4044,工作節點需要反饋進度給該該端口,所以如果主機名或者IP在hosts中配置不正確,就會報

    WARN YarnClientClusterScheduler: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory錯誤。
    所以請檢查主機名和IP是否配置正確。



    我自己的理解為,程序提交任務到yarn后,會上傳SPARK_JAR和SPARK_YARN_APP_JAR到hadoop節點, yarn根據任務情況來分配資源,在nodemanage節點上來啟動org.apache.spark.deploy.yarn.WorkerLauncher工作節點來執行spark任務,執行完成后退出。

    來自:http://my.oschina.net/132722/blog/220096

 本文由用戶 jopen 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!