Spark2: 對RDD進行編程系列
首先,什么是RDD?
1 官方定義
Resilient Distributed Dataset (RDD)
Each RDD is split into multiple partitions, which may be computed on different nodes of the cluster.其實就是說一個數據集,比如吧,一個100G的大文件,就是一個RDD,但是它是分布式的,
也就是說會分成若干塊,每塊會存在于集群中的一個或者多個節點上。
簡單來說,就是分而存之。
2 持久化
只要你需要,你可以把這個RDD持久化,語法就是 RDD.persist()。
RDD中的一下概念
Transformations are operations on RDDs that return a new RDD.比如
val inputRDD = sc.textFile("log.txt")
val errorsRDD = inputRDD.filter(line => line.contains("error"))
這里只是重新生成了一個RDD集合,如果你在inputRDD基礎上生成了2個集合,
你可以用union()來達到并集的目的!
Actions
比如
println("Here are 10 examples:")badLinesRDD.take(10).foreach(println)
意思就是取出前10條分別打印!
collect()可以用來檢索整個RDD,但是保證結果可以放在一個機器的內存里,所以collect()不適合處理大量的數據集。
saveAsTextFileaction和 saveAsSequenceFile可以用來保存文件到外部文件系統中!
----------示意圖
再來幾個scala的例子
class SearchFunctions(val query: String) {
def isMatch(s: String): Boolean = {
s.contains(query)
}
def getMatchesFunctionReference(rdd: RDD[String]): RDD[String] = {
// Problem: "isMatch" means "this.isMatch", so we pass all of "this"
rdd.map(isMatch)
}
def getMatchesFieldReference(rdd: RDD[String]): RDD[String] = {
// Problem: "query" means "this.query", so we pass all of "this"
rdd.map(x => x.split(query))
}
def getMatchesNoReference(rdd: RDD[String]): RDD[String] = {
// Safe: extract just the field we need into a local variable
val query_ = this.query
rdd.map(x => x.split(query_))
}
}
---
關于map
The map transformation takes in a function and applies it to each
element in the RDD with the result of the function being the new value of each element
in the resulting RDD.
意思很簡單,自己體會即可!
---
關于map和 filter
例子:
val input = sc.parallelize(List(1, 2, 3, 4))
val result = input.map(x => x*x)
println(result.collect())
例子2:
val lines = sc.parallelize(List("hello world", "hi"))
val words = lines.flatMap(line => line.split(" "))
words.first() // returns "hello"
======================================
其它一些操作示意圖:
union包含重復的,intersection去掉重復的
也可以做一個笛卡爾乘積:
來自:http://my.oschina.net/qiangzigege/blog/314140