通用load/write方法
手动指定选项
Spark SQL的DataFrame接口支持多种数据源的操作。一个DataFrame可以进行RDDs方式的操作,也可以被注册为临时表。把DataFrame注册为临时表之后,就可以对该DataFrame执行SQL查询。
Spark SQL的默认数据源为Parquet格式。数据源为Parquet文件时,Spark SQL可以方便的执行所有的操作。
修改配置项spark.sql.sources.default,可修改默认数据源格式。
scala> val df = spark.read.load("hdfs://hadoop001:9000/namesAndAges.parquet")
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
scala> df.select("name").write.save("names.parquet")
当数据源格式不是parquet格式文件时,需要手动指定数据源的格式。数据源格式需要指定全名(例如:org.apache.spark.sql.parquet),如果数据源格式为内置格式,则只需要指定简称json, parquet, jdbc, orc, libsvm, csv, text来指定数据的格式。
可以通过SparkSession提供的read.load方法用于通用加载数据,使用write和save保存数据。
scala> val peopleDF = spark.read.format("json").load("hdfs://hadoop001:9000/people.json")
peopleDF: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
scala> peopleDF.write.format("parquet").save("hdfs://hadoop001:9000/namesAndAges.parquet")
scala>
除此之外,可以直接运行SQL在文件上:
val sqlDF = spark.sql("SELECT * FROM parquet.`hdfs://hadoop001:9000/namesAndAges.parquet`")
sqlDF.show()
文件保存选项
可以采用SaveMode执行存储操作,SaveMode定义了对数据的处理模式。需要注意的是,这些保存模式不使用任何锁定,不是原子操作。此外,当使用Overwrite方式执行时,在输出新数据之前原数据就已经被删除。SaveMode详细介绍如下表:
Scala/Java |
Any Language |
Meaning |
SaveMode.ErrorIfExists(default) |
“error”(default) |
如果文件存在,则报错 |
SaveMode.Append |
“append” |
追加 |
SaveMode.Overwrite |
“overwrite” |
覆写 |
SaveMode.Ignore |
“ignore” |
数据存在,则忽略 |
Parquet文件
Parquet读写
Parquet格式经常在Hadoop生态圈中被使用,它也支持Spark SQL的全部数据类型。Spark SQL 提供了直接读取和存储 Parquet 格式文件的方法。
// Encoders for most common types are automatically provided by importing spark.implicits._
import spark.implicits._
val peopleDF = spark.read.json("examples/src/main/resources/people.json")
// DataFrames can be saved as Parquet files, maintaining the schema information
peopleDF.write.parquet("hdfs://hadoop001:9000/people.parquet")
// Read in the parquet file created above
// Parquet files are self-describing so the schema is preserved
// The result of loading a Parquet file is also a DataFrame
val parquetFileDF = spark.read.parquet("hdfs://hadoop001:9000/people.parquet")
// Parquet files can also be used to create a temporary view and then used in SQL statements
parquetFileDF.createOrReplaceTempView("parquetFile")
val namesDF = spark.sql("SELECT name FROM parquetFile WHERE age BETWEEN 13 AND 19")
namesDF.map(attributes => "Name: " + attributes(0)).show()
// +------------+
// | value|
// +------------+
// |Name: Justin|
// +------------+
解析分区信息
对表进行分区是对数据进行优化的方式之一。在分区的表内,数据通过分区列将数据存储在不同的目录下。Parquet数据源现在能够自动发现并解析分区信息。例如,对人口数据进行分区存储,分区列为gender和country,使用下面的目录结构:
path
└── to
└── table
├── gender=male
│ ├── ...
│ │
│ ├── country=US
│ │ └── data.parquet
│ ├── country=CN
│ │ └── data.parquet
│ └── ...
└── gender=female
├── ...
│
├── country=US
│ └── data.parquet
├── country=CN
│ └── data.parquet
└── ...
通过传递path/to/table给 SQLContext.read.parque
或SQLContext.read.load,Spark SQL将自动解析分区信息。
返回的DataFrame的Schema如下:
root
|-- name: string (nullable = true)
|-- age: long (nullable = true)
|-- gender: string (nullable = true)
|-- country: string (nullable = true)
需要注意的是,数据的分区列的数据类型yn9BBi:)y/oymc9]{.b:`#y`fyl,yc8$B]\HHBH\H\B\Z\OOB]B#9/h9/o9+]{g*\9.b#\Z\.!l9k.,9g`;i9/h:g :)y+.-;` :)yl!K\K[9dK[9b9aiyb,\oe{d)b&yc&b&\\"."Zoe{蹥&a9o.#yb,9e;/+yd$y/o&zg :)yl!Y]\yb(:fi;a+8Bi%`]yn9BBi9繣yi%`lioy]{g :)z`&/"*izj8BH9l!]y.+y]K\K[9'y% !z/kyb,\ey."oey."B9d\[;#"+]ya`kn- B]\HHBH\H\B[\[K[X\\\YNKZ\^\XXZ]KMKK[\OB]Bl9kfBB\: '99bl9kf9;nml!koy... 9.*]\K9c&/\\[XY
ycoy. 9.*]\[% !y. 9.*9#*#y+*/(9#9 :(c:` 9.*.,B]\HHBH\H\B[YHZXY[CB[YH[HYCB[YH\[YN_CB[Z]]H\\[[][X\\\\[\\CBYH[\[\[X][H]\B[\\[\XH]\\YH]BH]HZ]\HH^[HH\XH[^[\[]H^[\\XZ[\\[KB[[QH\XY]
CBH[\Y[XHH\[^Y\H[[XJ
HY]B[Q[[XJ
CBBKHY[XHHYJCBKH[YN[[XHHYJCBX]\H[\\HY]\H]Q[YCB[QX]S\X[\Y][HCB][Y[H[H\HY]YYH\[Y[Y[Y\H\
P[YH[HTHYUSLSNHCBY[Y[Y\
CB
KKKKJ[Y_B
KKKKJ\[B
KKKKJ[\]][KH]Q[YHHX]YH]\\\YCBH]\[[HX\[[\[Q]\H\X]Q]\
B[YHZ[Y\H[X\]H[_H[
CB[\[HH\XY\[Q]\
CB\[K
CB
KKKKKKKKKKKKKJKKJY\[Y_B
KKKKKKKKKKKKKJKKJ[X\[Z[B
KKKKKKKKKKKKKJKKJOB]BBB\9c&/lkn幥l9kyo#]Q[Y{`&/]Q[Yy. 9b%yd#/!l9kayayflkn B# :)yl!all9kn9/b,\n9."B]\HHBH\H\B[\[K[X\\\YNKZ\^\XXZ]KMKK[\BN[[[HXY]YXHZ]\H]HY][]HH\B[H\XYX]
K[\Λ^\YN
K[XHXHK[\K[\]HK
CB[X[\Y\H]\Y\CBX[\Y\]
\CBX[\Y\]
\]HCB[H\XYBΛ^\YN
XHX[\Y\B[]HH\B]CBX]
CB[\Λ^\YN
CB[XHXLCB[\CB[\]HCBJ
CB]CBΛ^\YN
\X[\Y\BXZ[X]HXH[[]H\\]CB]CB[X]UXP[[\\[YHT
K[Y[TTL
HCBΛ^\YN
\X[\Y\OB]B."l,y+j:`yk{n#9&)9ki.h9"y`9n+b{.g&i&i&+/c. |