Spark SQL常见4种数据源详解

论坛 期权论坛 脚本     
niminba   2021-5-23 04:17   1623   0

通用load/write方法

手动指定选项

Spark SQL的DataFrame接口支持多种数据源的操作。一个DataFrame可以进行RDDs方式的操作,也可以被注册为临时表。把DataFrame注册为临时表之后,就可以对该DataFrame执行SQL查询。

Spark SQL的默认数据源为Parquet格式。数据源为Parquet文件时,Spark SQL可以方便的执行所有的操作。

修改配置项spark.sql.sources.default,可修改默认数据源格式。

scala> val df = spark.read.load("hdfs://hadoop001:9000/namesAndAges.parquet")
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
scala> df.select("name").write.save("names.parquet")

当数据源格式不是parquet格式文件时,需要手动指定数据源的格式。数据源格式需要指定全名(例如:org.apache.spark.sql.parquet),如果数据源格式为内置格式,则只需要指定简称json, parquet, jdbc, orc, libsvm, csv, text来指定数据的格式。

可以通过SparkSession提供的read.load方法用于通用加载数据,使用write和save保存数据。

scala> val peopleDF = spark.read.format("json").load("hdfs://hadoop001:9000/people.json")
peopleDF: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
scala> peopleDF.write.format("parquet").save("hdfs://hadoop001:9000/namesAndAges.parquet")
scala>

除此之外,可以直接运行SQL在文件上:

val sqlDF = spark.sql("SELECT * FROM parquet.`hdfs://hadoop001:9000/namesAndAges.parquet`")
sqlDF.show()

文件保存选项

可以采用SaveMode执行存储操作,SaveMode定义了对数据的处理模式。需要注意的是,这些保存模式不使用任何锁定,不是原子操作。此外,当使用Overwrite方式执行时,在输出新数据之前原数据就已经被删除。SaveMode详细介绍如下表:

Scala/Java Any Language Meaning
SaveMode.ErrorIfExists(default) “error”(default) 如果文件存在,则报错
SaveMode.Append “append” 追加
SaveMode.Overwrite “overwrite” 覆写
SaveMode.Ignore “ignore” 数据存在,则忽略

Parquet文件

Parquet读写

Parquet格式经常在Hadoop生态圈中被使用,它也支持Spark SQL的全部数据类型。Spark SQL 提供了直接读取和存储 Parquet 格式文件的方法。

// Encoders for most common types are automatically provided by importing spark.implicits._
import spark.implicits._
val peopleDF = spark.read.json("examples/src/main/resources/people.json")
// DataFrames can be saved as Parquet files, maintaining the schema information
peopleDF.write.parquet("hdfs://hadoop001:9000/people.parquet")
// Read in the parquet file created above
// Parquet files are self-describing so the schema is preserved
// The result of loading a Parquet file is also a DataFrame
val parquetFileDF = spark.read.parquet("hdfs://hadoop001:9000/people.parquet")
// Parquet files can also be used to create a temporary view and then used in SQL statements
parquetFileDF.createOrReplaceTempView("parquetFile")
val namesDF = spark.sql("SELECT name FROM parquetFile WHERE age BETWEEN 13 AND 19")
namesDF.map(attributes => "Name: " + attributes(0)).show()
// +------------+
// | value|
// +------------+
// |Name: Justin|
// +------------+

解析分区信息

对表进行分区是对数据进行优化的方式之一。在分区的表内,数据通过分区列将数据存储在不同的目录下。Parquet数据源现在能够自动发现并解析分区信息。例如,对人口数据进行分区存储,分区列为gender和country,使用下面的目录结构:

path
└── to
└── table
├── gender=male
│ ├── ...
│ │
│ ├── country=US
│ │ └── data.parquet
│ ├── country=CN
│ │ └── data.parquet
│ └── ...
└── gender=female
├── ...
│
├── country=US
│ └── data.parquet
├── country=CN
│ └── data.parquet
└── ...

通过传递path/to/table给 SQLContext.read.parque

或SQLContext.read.load,Spark SQL将自动解析分区信息。

返回的DataFrame的Schema如下:

root
|-- name: string (nullable = true)
|-- age: long (nullable = true)
|-- gender: string (nullable = true)
|-- country: string (nullable = true)

需要注意的是,数据的分区列的数据类型yn9 B Bi:)y/oymc9]{.b:`#y`fyl,yc8$ B]\HHBH\H\B\ Z\O OB ]B#9/h9/o9+]{g*\9.b#\ Z\.!l9k.,9g`;i9/h:g :)y+.-;` :)yl!K\KK[9b9aiyb,\oe{d)b&yc&b&\\"."Zoe{蹥&a9o.#yb,9e;/+yd$y/o&zg :)yl!Y]\yb(:fi;a+8 Bi%`]yn9 B Bi9繣yi%`lioy]{g :)z`&/"*izj8 BH9l!]y.+y]K\K[9'y% !z/kyb,\ey."oey."B9d\[;#"+]ya`kn- B]\HHBH\H\B[\[ K[X\\\Y N KZ\^\ XXZ]KMKK[\ OB ]Bl9kfB B\: '99bl9kf9;nml!koy... 9.*]\K9c&/\\[XY ycoy. 9.*]\[% !y. 9.*9#*#y+*/(9#9 :(c:` 9.*., B]\HHBH\H\B[YHZXY[CB[YH[HYCB[YH\[YN_CB[Z]]H\\[ [][X\\\\[\\CBYH[\[\[X][H]\ B[\\[\XH]\\YH] BH]HZ]\HH^[HH\XH[^[\[]H^[\\XZ[\\[KB[[QH\XY ] CBH[\Y[XHH\[^Y\H[[XJ HY]B[Q[[XJ CBB KHY[XHHYJCB KHN[[XHHYJCBX]\H[\\HY]\H]Q[YCB[QX]S\X[\Y][HCB][Y[H[H\HY]YYH\[Y[Y[Y\H\ PH[HTHYUS LS NHCBY[Y[Y\ CB KKKKJ_B KKKKJ\[B KKKKJ[\]][KH]Q[YHHX]YH]\\\YCBH]\[[HX\[[\[Q]\H\X]Q]\ B[YHZ[Y\H[X\]H[_H[ CB[\[HH\XY \[Q]\ CB\[K CB KKKKKKKKKKKKKJKKJY\_B KKKKKKKKKKKKKJKKJ[X\[Z[B KKKKKKKKKKKKKJKKJOB ]BB B\9c&/lkn幥l9kyo#]Q[Y{`&/]Q[Yy. 9b%yd#/!l9kayayflkn B# :)yl!all9kn9/b,\n9."B]\HHBH\H\B[\[ K[X\\\Y N KZ\^\ XXZ]KMKK[\BN[[[HXY]YXHZ]\H ]HY][]HH\B[H\XY X] K[\Λ^\Y N K[XHXHK[\K[\]HK CB[X[\Y\H]\Y\CBX[\Y\] \CBX[\Y\] \]HCB[H\XYBΛ^\Y N XHX[\Y\B[]HH\B]CBX] CB[\Λ^\Y N CB[XHXLCB[\CB[\]HCBJ CB]CBΛ^\Y N \X[\Y\BXZ[X]HXH[[]H\\]CB]CB[X]UXP[[\\[YHT K[Y[TT L HCBΛ^\Y N \X[\Y\ OB ]B."l,y+j:`yk{n#9&)9ki.h9"y`9n+b{.g&i&i&+/c.

分享到 :
0 人收藏
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

积分:1060120
帖子:212021
精华:0
期权论坛 期权论坛
发布
内容

下载期权论坛手机APP