更新時間:2021年03月23日14時14分 來源:傳智教育 瀏覽次數(shù):
在Spark2.0版本之前,Spark SQL中的SQLContext是創(chuàng)建DataFrame和執(zhí)行SQL的入口,我們可以利用HiveContext接口,通過HiveQL語句操作Hive表數(shù)據(jù),實現(xiàn)數(shù)據(jù)查詢功能。而在Spark2.0之后,Spark使用全新的SparkSession接口替代SQLContext及HiveContext接口完成數(shù)據(jù)的加載、轉(zhuǎn)換、處理等功能。
創(chuàng)建SparkSession對象可以通過“SparkSession.builder().getOrCreate()”方法獲取,但當(dāng)我們使用Spark-Shell編寫程序時,Spark-Shell客戶端會默認(rèn)提供了一個名為“sc”的SparkContext對象和一個名為“spark”的SparkSession對象,因此我們可以直接使用這兩個對象,不需要自行創(chuàng)建。啟動Spark-Shell命令如下所示。
$ spark-shell --master local[2]
在啟動Spark-Shell完成后,效果如圖1所示。
圖1 啟動Spark-Shell
在圖1中可以看出,SparkContext、SparkSession對象已創(chuàng)建完成。創(chuàng)建DataFrame有多種方式,最基本的方式是從一個已經(jīng)存在的RDD調(diào)用toDF()方法進(jìn)行轉(zhuǎn)換得到DataFrame,或者通過Spark讀取數(shù)據(jù)源直接創(chuàng)建。
在創(chuàng)建DataFrame之前,為了支持RDD轉(zhuǎn)換成DataFrame及后續(xù)的SQL操作,需要導(dǎo)入spark.implicits._包啟用隱式轉(zhuǎn)換。若使用SparkSession方式創(chuàng)建DataFrame,可以使用spark.read操作,從不同類型的文件中加載數(shù)據(jù)創(chuàng)建DataFrame,具體操作API如表1所示。
表1 spark.read操作
代碼示例 | 描述 |
---|---|
spark.read.text("people.txt") | 讀取txt格式的文本文件,創(chuàng)建DataFrame |
spark.read.csv ("people.csv") | 讀取csv格式的文本文件,創(chuàng)建DataFrame |
spark.read.json("people.json") | 讀取json格式的文本文件,創(chuàng)建DataFrame |
spark.read.parquet("people.parquet") | 讀取parquet格式的文本文件,創(chuàng)建DataFrame |
1.?dāng)?shù)據(jù)準(zhǔn)備
在HDFS文件系統(tǒng)中的/spark目錄中有一個person.txt文件,內(nèi)容如文件1所示。
文件1 person.txt
zhangsan 20 lisi 29 wangwu 25 zhaoliu 30 tianqi 35 jerry 40
2.通過文件直接創(chuàng)建DataFrame
我們通過Spark讀取數(shù)據(jù)源的方式進(jìn)行創(chuàng)建DataFrame,在Spark-Shell輸入下列代碼:
scala > val personDF = spark.read.text("/spark/person.txt") personDF: org.apache.spark.sql.DataFrame = [value: String] scala > personDF.printSchema() root |-- value: String (Nullable = true)
從上述返回結(jié)果personDF的屬性可以看出,創(chuàng)建DataFrame對象完成,之后調(diào)用DataFrame的printSchema()方法可以打印當(dāng)前對象的Schema元數(shù)據(jù)信息。從返回結(jié)果可以看出,當(dāng)前value字段是String數(shù)據(jù)類型,并且還可以為Null。
使用DataFrame的show()方法可以查看當(dāng)前DataFrame的結(jié)果數(shù)據(jù),具體代碼和返回結(jié)果如下所示。
scala > personDF.show() +-------------+ | value | +-------------+ |1 zhangsan 20| |2 lisi 29| |3 wangwu 25| |4 zhaoliu 30| |5 tianqi 35| |6 jerry 40| +-------------+
從上述返回結(jié)果看出,當(dāng)前personDF對象中的6條記錄就對應(yīng)了person.txt文本文件中的數(shù)據(jù)。
3.RDD轉(zhuǎn)換DataFrame
調(diào)用RDD的toDF()方法,可以將RDD轉(zhuǎn)換為DataFrame對象,具體代碼如下所示。
scala > val lineRDD = sc.textFile("/spark/person.txt").map(_.split(" ")) lineRDD: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[6] at map at <console>:24 scala > case class Person(id:Int,name:String,age:Int) defined class Person scala > val personRDD = lineRDD.map(x => Person(x(0).toInt, x(1), x(2).toInt)) personRDD: org.apache.spark.rdd.RDD[Person] = MapPartitionsRDD[7] at map at <console>:27 scala > val personDF = personRDD.toDF() personDF: org.apache.spark.sql.DataFrame = [id: int, name: string ... 1 more field] scala > personDF.show +----+--------+----+ | id | name | age| +----+--------+----+ | 1 |zhangsan | 20| | 2 |lisi | 29| | 3 |wangwu | 25| | 4 |zhaoliu | 30| | 5 |tianqi | 35| | 6 |jerry | 40| +----+--------+----+ scala > personDF.printSchema root |-- id: integer (nullable = false) |-- name: string (nullable = true) |-- age: integer (nullable = false)
在上述代碼中,第1行代碼將文本文件轉(zhuǎn)換成RDD,第4行代碼定義Person樣例類,相當(dāng)于定義表的Schema元數(shù)據(jù)信息,第6行代碼表示使RDD中的數(shù)組數(shù)據(jù)與樣例類進(jìn)行關(guān)聯(lián),最終會將RDD[Array[String]]更改為RDD[Person],第9行代碼表示調(diào)用RDD的toDF()方法,就可以把RDD轉(zhuǎn)換成了DataFrame了。第12-27行代碼表示調(diào)用DataFrame方法并從返回結(jié)果可以看出,RDD對象成功轉(zhuǎn)換DataFrame。
猜你喜歡:
Redis、傳統(tǒng)數(shù)據(jù)庫、HBase以及Hive的區(qū)別
DataFrame是什么意思?與RDD相比有哪些優(yōu)點?