摘要:DataFrame API的引入一改RDD API高冷的FP姿態(tài),令Spark變得更加平易近人。外部數(shù)據(jù)源API體現(xiàn)出的則是兼容并蓄,Spark SQL多元一體的結(jié)構(gòu)化數(shù)據(jù)處理能力正在逐漸釋放。
關(guān)于作者:連城,Databricks工程師,Spark committer,Spark SQL主要開發(fā)者之一。在4月18日召開的 2015 Spark技術(shù)峰會 上,連城將做名為“四兩撥千斤——Spark SQL結(jié)構(gòu)化數(shù)據(jù)分析”的主題演講。
自2013年3月面世以來,Spark SQL已經(jīng)成為除Spark Core以外大的Spark組件。除了接過Shark的接力棒,繼續(xù)為Spark用戶提供高性能的SQL on Hadoop解決方案之外,它還為Spark帶來了通用、高效、多元一體的結(jié)構(gòu)化數(shù)據(jù)處理能力。在剛剛發(fā)布的1.3.0版中,Spark SQL的兩大升級被詮釋得淋漓盡致。
DataFrame
就易用性而言,對比傳統(tǒng)的MapReduce API,說Spark的RDD API有了數(shù)量級的飛躍并不為過。然而,對于沒有MapReduce和函數(shù)式編程經(jīng)驗(yàn)的新手來說,RDD API仍然存在著一定的門檻。另一方面,數(shù)據(jù)科學(xué)家們所熟悉的R、Pandas等傳統(tǒng)數(shù)據(jù)框架雖然提供了直觀的API,卻局限于單機(jī)處理,無法勝任大數(shù)據(jù)場景。為了解決這一矛盾,Spark SQL 1.3.0在原有SchemaRDD的基礎(chǔ)上提供了與R和Pandas風(fēng)格類似的DataFrame API。新的DataFrame AP不僅可以大幅度降低普通開發(fā)者的學(xué)習(xí)門檻,同時(shí)還支持Scala、Java與Python三種語言。更重要的是,由于脫胎自SchemaRDD,DataFrame天然適用于分布式大數(shù)據(jù)場景。
DataFrame是什么?
在Spark中,DataFrame是一種以RDD為基礎(chǔ)的分布式數(shù)據(jù)集,類似于傳統(tǒng)數(shù)據(jù)庫中的二維表格。DataFrame與RDD的主要區(qū)別在于,前者帶有schema元信息,即DataFrame所表示的二維表數(shù)據(jù)集的每一列都帶有名稱和類型。這使得Spark SQL得以洞察更多的結(jié)構(gòu)信息,從而對藏于DataFrame背后的數(shù)據(jù)源以及作用于DataFrame之上的變換進(jìn)行了針對性的優(yōu)化,終達(dá)到大幅提升運(yùn)行時(shí)效率的目標(biāo)。反觀RDD,由于無從得知所存數(shù)據(jù)元素的具體內(nèi)部結(jié)構(gòu),Spark Core只能在stage層面進(jìn)行簡單、通用的流水線優(yōu)化。
創(chuàng)建DataFrame
在Spark SQL中,開發(fā)者可以非常便捷地將各種內(nèi)、外部的單機(jī)、分布式數(shù)據(jù)轉(zhuǎn)換為DataFrame。以下Python示例代碼充分體現(xiàn)了Spark SQL 1.3.0中DataFrame數(shù)據(jù)源的豐富多樣和簡單易用:
[py] view plaincopy在CODE上查看代碼片派生到我的代碼片
# 從Hive中的users表構(gòu)造DataFrame
users = sqlContext.table("users")
# 加載S3上的JSON文件
logs = sqlContext.load("s3n://path/to/data.json", "json")
# 加載HDFS上的Parquet文件
clicks = sqlContext.load("hdfs://path/to/data.parquet", "parquet")
# 通過JDBC訪問MySQL
comments = sqlContext.jdbc("jdbc:mysql://localhost/comments", "user")
# 將普通RDD轉(zhuǎn)變?yōu)镈ataFrame
rdd = sparkContext.textFile("article.txt")
.flatMap(lambda line: line.split())
.map(lambda word: (word, 1))
.reduceByKey(lambda a, b: a + b)
wordCounts = sqlContext.createDataFrame(rdd, ["word", "count"])
# 將本地?cái)?shù)據(jù)容器轉(zhuǎn)變?yōu)镈ataFrame
data = [("Alice", 21), ("Bob", 24)]
people = sqlContext.createDataFrame(data, ["name", "age"])
# 將Pandas DataFrame轉(zhuǎn)變?yōu)镾park DataFrame(Python API特有功能)
sparkDF = sqlContext.createDataFrame(pandasDF)
可見,從Hive表,到外部數(shù)據(jù)源API支持的各種數(shù)據(jù)源(JSON、Parquet、JDBC),再到RDD乃至各種本地?cái)?shù)據(jù)集,都可以被方便快捷地加載、轉(zhuǎn)換為DataFrame。這些功能也同樣存在于Spark SQL的Scala API和Java API中。
使用DataFrame
和R、Pandas類似,Spark DataFrame也提供了一整套用于操縱數(shù)據(jù)的DSL。這些DSL在語義上與SQL關(guān)系查詢非常相近(這也是Spark SQL能夠?yàn)镈ataFrame提供無縫支持的重要原因之一)。以下是一組用戶數(shù)據(jù)分析示例:
[py] view plaincopy在CODE上查看代碼片派生到我的代碼片
# 創(chuàng)建一個(gè)只包含"年輕"用戶的DataFrame
young = users.filter(users.age < 21)
# 也可以使用Pandas風(fēng)格的語法
young = users[users.age < 21]
# 將所有人的年齡加1
young.select(young.name, young.age + 1)
# 統(tǒng)計(jì)年輕用戶中各性別人數(shù)
young.groupBy("gender").count()
# 將所有年輕用戶與另一個(gè)名為logs的DataFrame聯(lián)接起來
young.join(logs, logs.userId == users.userId, "left_outer")
除DSL以外,我們當(dāng)然也可以像以往一樣,用SQL來處理DataFrame:
[py] view plaincopy在CODE上查看代碼片派生到我的代碼片
young.registerTempTable("young")
sqlContext.sql("SELECT count(*) FROM young")
后,當(dāng)數(shù)據(jù)分析邏輯編寫完畢后,我們便可以將終結(jié)果保存下來或展現(xiàn)出來:
[js] view plaincopy在CODE上查看代碼片派生到我的代碼片
# 追加至HDFS上的Parquet文件
young.save(path="hdfs://path/to/data.parquet",
source="parquet",
mode="append")
# 覆寫S3上的JSON文件
young.save(path="s3n://path/to/data.json",
source="json",
mode="append")
# 保存為SQL表
young.saveAsTable(tableName="young", source="parquet" mode="overwrite")
# 轉(zhuǎn)換為Pandas DataFrame(Python API特有功能)
pandasDF = young.toPandas()
# 以表格形式打印輸出
young.show()
幕后英雄:Spark SQL查詢優(yōu)化器與代碼生成
正如RDD的各種變換實(shí)際上只是在構(gòu)造RDD DAG,DataFrame的各種變換同樣也是lazy的。它們并不直接求出計(jì)算結(jié)果,而是將各種變換組裝成與RDD DAG類似的邏輯查詢計(jì)劃。如前所述,由于DataFrame帶有schema元信息,Spark SQL的查詢優(yōu)化器得以洞察數(shù)據(jù)和計(jì)算的精細(xì)結(jié)構(gòu),從而施行具有很強(qiáng)針對性的優(yōu)化。隨后,經(jīng)過優(yōu)化的邏輯執(zhí)行計(jì)劃被翻譯為物理執(zhí)行計(jì)劃,并終落實(shí)為RDD DAG。
這樣做的好處體現(xiàn)在幾個(gè)方面:
1. 用戶可以用更少的申明式代碼闡明計(jì)算邏輯,物理執(zhí)行路徑則交由Spark SQL自行挑選。一方面降低了開發(fā)成本,一方面也降低了使用門檻——很多情況下,即便新手寫出了較為低效的查詢,Spark SQL也可以通過過濾條件下推、列剪枝等策略予以有效優(yōu)化。這是RDD API所不具備的。
2. Spark SQL可以動(dòng)態(tài)地為物理執(zhí)行計(jì)劃中的表達(dá)式生成JVM字節(jié)碼,進(jìn)一步實(shí)現(xiàn)歸避虛函數(shù)調(diào)用開銷、削減對象分配次數(shù)等底層優(yōu)化,使得終的查詢執(zhí)行性能可以與手寫代碼的性能相媲美。
3. 對于PySpark而言,采用DataFrame編程時(shí)只需要構(gòu)造體積小巧的邏輯執(zhí)行計(jì)劃,物理執(zhí)行全部由JVM端負(fù)責(zé),Python解釋器和JVM間大量不必要的跨進(jìn)程通訊得以免除。如上圖所示,一組簡單的對一千萬整數(shù)對做聚合的測試中,PySpark中DataFrame API的性能輕松勝出RDD API近五倍。此外,今后Spark SQL在Scala端對查詢優(yōu)化器的所有性能改進(jìn),PySpark都可以免費(fèi)獲益。
外部數(shù)據(jù)源API增強(qiáng)
從前文中我們已經(jīng)看到,Spark 1.3.0為DataFrame提供了豐富多樣的數(shù)據(jù)源支持。其中的重頭戲,便是自Spark 1.2.0引入的外部數(shù)據(jù)源API。在1.3.0中,我們對這套API做了進(jìn)一步的增強(qiáng)。
數(shù)據(jù)寫入支持
在Spark 1.2.0中,外部數(shù)據(jù)源API只能將外部數(shù)據(jù)源中的數(shù)據(jù)讀入Spark,而無法將計(jì)算結(jié)果寫回?cái)?shù)據(jù)源;同時(shí),通過數(shù)據(jù)源引入并注冊的表只能是臨時(shí)表,相關(guān)元信息無法持久化。在1.3.0中,我們提供了完整的數(shù)據(jù)寫入支持,從而補(bǔ)全了多數(shù)據(jù)源互操作的后一塊重要拼圖。前文示例中Hive、Parquet、JSON、Pandas等多種數(shù)據(jù)源間的任意轉(zhuǎn)換,正是這一增強(qiáng)的直接成果。
站在Spark SQL外部數(shù)據(jù)源開發(fā)者的角度,數(shù)據(jù)寫入支持的API主要包括:
1. 數(shù)據(jù)源表元數(shù)據(jù)持久化
1.3.0引入了新的外部數(shù)據(jù)源DDL語法
[py] view plaincopy在CODE上查看代碼片派生到我的代碼片
CREATE [TEMPORARY] TABLE [IF NOT EXISTS]
<table-name> [(col-name data-type [, …])]
USING <source> [OPTIONS ...]
[AS <select-query>]
由此,注冊自外部數(shù)據(jù)的SQL表既可以是臨時(shí)表,也可以被持久化至Hive metastore。需要持久化支持的外部數(shù)據(jù)源,除了需要繼承原有的RelationProvider以外,還需繼承CreatableRelationProvider。
2. InsertableRelation
支持?jǐn)?shù)據(jù)寫入的外部數(shù)據(jù)源的relation類,還需繼承trait InsertableRelation,并在insert方法中實(shí)現(xiàn)數(shù)據(jù)插入邏輯。
Spark 1.3.0中內(nèi)置的JSON和Parquet數(shù)據(jù)源都已實(shí)現(xiàn)上述API,可以作為開發(fā)外部數(shù)據(jù)源的參考示例。
統(tǒng)一的load/save API
在Spark 1.2.0中,要想將SchemaRDD中的結(jié)果保存下來,便捷的選擇并不多。常用的一些包括:
rdd.saveAsParquetFile(...)
rdd.saveAsTextFile(...)
rdd.toJSON.saveAsTextFile(...)
rdd.saveAsTable(...)
....
可見,不同的數(shù)據(jù)輸出方式,采用的API也不盡相同。更令人頭疼的是,我們?nèi)狈σ粋€(gè)靈活擴(kuò)展新的數(shù)據(jù)寫入格式的方式。
針對這一問題,1.3.0統(tǒng)一了load/save API,讓用戶按需自由選擇外部數(shù)據(jù)源。這套API包括:
1.SQLContext.table
從SQL表中加載DataFrame。
2.SQLContext.load
從指定的外部數(shù)據(jù)源加載DataFrame。
3.SQLContext.createExternalTable
將指定位置的數(shù)據(jù)保存為外部SQL表,元信息存入Hive metastore,并返回包含相應(yīng)數(shù)據(jù)的DataFrame。
4.DataFrame.save
將DataFrame寫入指定的外部數(shù)據(jù)源。
5.DataFrame.saveAsTable
將DataFrame保存為SQL表,元信息存入Hive metastore,同時(shí)將數(shù)據(jù)寫入指定位置。
Parquet數(shù)據(jù)源增強(qiáng)
Spark SQL從一開始便內(nèi)置支持Parquet這一高效的列式存儲格式。在開放外部數(shù)據(jù)源API之后,原有的Parquet支持也正在逐漸轉(zhuǎn)向外部數(shù)據(jù)源。1.3.0中,Parquet外部數(shù)據(jù)源的能力得到了顯著增強(qiáng)。主要包括schema合并和自動(dòng)分區(qū)處理。
1.Schema合并
與ProtocolBuffer和Thrift類似,Parquet也允許用戶在定義好schema之后隨時(shí)間推移逐漸添加新的列,只要不修改原有列的元信息,新舊schema仍然可以兼容。這一特性使得用戶可以隨時(shí)按需添加新的數(shù)據(jù)列,而無需操心數(shù)據(jù)遷移。
2.分區(qū)信息發(fā)現(xiàn)
按目錄對同一張表中的數(shù)據(jù)分區(qū)存儲,是Hive等系統(tǒng)采用的一種常見的數(shù)據(jù)存儲方式。新的Parquet數(shù)據(jù)源可以自動(dòng)根據(jù)目錄結(jié)構(gòu)發(fā)現(xiàn)和推演分區(qū)信息。
3.分區(qū)剪枝
分區(qū)實(shí)際上提供了一種粗粒度的索引。當(dāng)查詢條件中僅涉及部分分區(qū)時(shí),通過分區(qū)剪枝跳過不必要掃描的分區(qū)目錄,可以大幅提升查詢性能。
以下Scala代碼示例統(tǒng)一展示了1.3.0中Parquet數(shù)據(jù)源的這幾個(gè)能力:
[py] view plaincopy在CODE上查看代碼片派生到我的代碼片
// 創(chuàng)建兩個(gè)簡單的DataFrame,將之存入兩個(gè)獨(dú)立的分區(qū)目錄
val df1 = (1 to 5).map(i => (i, i * 2)).toDF("single", "double")
df1.save("data/test_table/key=1", "parquet", SaveMode.Append)
val df2 = (6 to 10).map(i => (i, i * 2)).toDF("single", "double")
df2.save("data/test_table/key=2", "parquet", SaveMode.Append)
// 在另一個(gè)DataFrame中引入一個(gè)新的列,并存入另一個(gè)分區(qū)目錄
val df3 = (11 to 15).map(i => (i, i * 3)).toDF("single", "triple")
df3.save("data/test_table/key=3", "parquet", SaveMode.Append)
// 一次性讀入整個(gè)分區(qū)表的數(shù)據(jù)
val df4 = sqlContext.load("data/test_table", "parquet")
// 按分區(qū)進(jìn)行查詢,并展示結(jié)果
val df5 = df4.filter($"key" >= 2) df5.show()
這段代碼的執(zhí)行結(jié)果為:
6 12 null 2
7 14 null 2
8 16 null 2
9 18 null 2
10 20 null 2
11 null 33 3
12 null 36 3
13 null 39 3
14 null 42 3
15 null 45 3
可見,Parquet數(shù)據(jù)源自動(dòng)從文件路徑中發(fā)現(xiàn)了key這個(gè)分區(qū)列,并且正確合并了兩個(gè)不相同但相容的schema。值得注意的是,在后的查詢中查詢條件跳過了key=1這個(gè)分區(qū)。Spark SQL的查詢優(yōu)化器會根據(jù)這個(gè)查詢條件將該分區(qū)目錄剪掉,完全不掃描該目錄中的數(shù)據(jù),從而提升查詢性能。
小結(jié)
DataFrame API的引入一改RDD API高冷的FP姿態(tài),令Spark變得更加平易近人,使大數(shù)據(jù)分析的開發(fā)體驗(yàn)與傳統(tǒng)單機(jī)數(shù)據(jù)分析的開發(fā)體驗(yàn)越來越接近。外部數(shù)據(jù)源API體現(xiàn)出的則是兼容并蓄。目前,除了內(nèi)置的JSON、Parquet、JDBC以外,社區(qū)中已經(jīng)涌現(xiàn)出了CSV、Avro、HBase等多種數(shù)據(jù)源,Spark SQL多元一體的結(jié)構(gòu)化數(shù)據(jù)處理能力正在逐漸釋放。
為開發(fā)者提供更多的擴(kuò)展點(diǎn),是Spark貫穿整個(gè)2015年的主題之一。我們希望通過這些擴(kuò)展API,切實(shí)地引爆社區(qū)的能量,令Spark的生態(tài)更加豐滿和多樣。
本站文章版權(quán)歸原作者及原出處所有 。內(nèi)容為作者個(gè)人觀點(diǎn), 并不代表本站贊同其觀點(diǎn)和對其真實(shí)性負(fù)責(zé),本站只提供參考并不構(gòu)成任何投資及應(yīng)用建議。本站是一個(gè)個(gè)人學(xué)習(xí)交流的平臺,網(wǎng)站上部分文章為轉(zhuǎn)載,并不用于任何商業(yè)目的,我們已經(jīng)盡可能的對作者和來源進(jìn)行了通告,但是能力有限或疏忽,造成漏登,請及時(shí)聯(lián)系我們,我們將根據(jù)著作權(quán)人的要求,立即更正或者刪除有關(guān)內(nèi)容。本站擁有對此聲明的最終解釋權(quán)。