使用Spark DataFrame進行大數據處理

jopen 10年前發布 | 120K 次閱讀 分布式/云計算/大數據

簡介

    DataFrame讓Spark具備了處理大規模結構化數據的能力,在比原有的RDD轉化方式易用的前提下,計算性能更還快了兩倍。這一個小小的API,隱含著Spark希望大一統「大數據江湖」的野心和決心。DataFrame像是一條聯結所有主流數據源并自動轉化為可并行處理格式的水渠,通過它Spark能取悅大數據生態鏈上的所有玩家,無論是善用R的數據科學家,慣用SQL的商業分析師,還是在意效率和實時性的統計工程師。

例子說明

    提供了將結構化數據為DataFrame并注冊為表,使用SQL查詢的例子

    提供了從RMDB中讀取數據為DataFrame的例子

    提供了將數據寫入到RMDB中的例子

代碼樣例

import scala.collection.mutable.ArrayBuffer
import scala.io.Source
import java.io.PrintWriter
import util.control.Breaks._
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
import java.sql.DriverManager
import java.sql.PreparedStatement
import java.sql.Connection
import org.apache.spark.sql.types.IntegerType
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.StructField
import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.Row
import java.util.Properties
import org.apache.spark.sql.SaveMode

object SimpleDemo extends App {
  val sc = new SparkContext("local[*]", "test")
  val sqlc = new SQLContext(sc)
  val driverUrl = "jdbc:mysql://ip:3306/ding?user=root&password=root&zeroDateTimeBehavior=convertToNull&characterEncoding=utf-8"
  val tableName = "tbaclusterresult"

  //把數據轉化為DataFrame,并注冊為一個表
  val df = sqlc.read.json("G:/data/json.txt")
  df.registerTempTable("user")
  val res = sqlc.sql("select * from user")
  println(res.count() + "---------------------------")
  res.collect().map { row =>
    {
      println(row.toString())
    }
  }

  //從MYSQL讀取數據
  val jdbcDF = sqlc.read
    .options(Map("url" -> driverUrl,
      //      "user" -> "root",
      //      "password" -> "root",
      "dbtable" -> tableName))
    .format("jdbc")
    .load()
  println(jdbcDF.count() + "---------------------------")
  jdbcDF.collect().map { row =>
    {
      println(row.toString())
    }
  }

  //插入數據至MYSQL
  val schema = StructType(
    StructField("name", StringType) ::
      StructField("age", IntegerType)
      :: Nil)

  val data1 = sc.parallelize(List(("blog1", 301), ("iteblog", 29),
    ("com", 40), ("bt", 33), ("www", 23))).
    map(item => Row.apply(item._1, item._2))
  import sqlc.implicits._
  val df1 = sqlc.createDataFrame(data1, schema)
  //  df1.write.jdbc(driverUrl, "sparktomysql", new Properties)
  df1.write.mode(SaveMode.Overwrite).jdbc(driverUrl, "testtable", new Properties)

  //DataFrame類中還有insertIntoJDBC方法,調用該函數必須保證表事先存在,它只用于插入數據,函數原型如下:
  //def insertIntoJDBC(url: String, table: String, overwrite: Boolean): Unit

  //插入數據到MYSQL
  val data = sc.parallelize(List(("www", 10), ("iteblog", 20), ("com", 30)))
  data.foreachPartition(myFun)

  case class Blog(name: String, count: Int)

  def myFun(iterator: Iterator[(String, Int)]): Unit = {
    var conn: Connection = null
    var ps: PreparedStatement = null
    val sql = "insert into blog(name, count) values (?, ?)"
    try {
      conn = DriverManager.getConnection(driverUrl, "root", "root")
      iterator.foreach(data => {
        ps = conn.prepareStatement(sql)
        ps.setString(1, data._1)
        ps.setInt(2, data._2)
        ps.executeUpdate()
      })
    } catch {
      case e: Exception => e.printStackTrace()
    } finally {
      if (ps != null) {
        ps.close()
      }
      if (conn != null) {
        conn.close()
      }
    }
  }
}

來自: http://my.oschina.net/cloudcoder/blog/599859

 本文由用戶 jopen 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!