大話Spark(8)-源碼之DAGScheduler

DAGScheduler的主要作用有2個:

一、把job劃分成多個Stage(Stage內部并行運行,整個作業按照Stage的順序依次執行)
二、提交任務

以下分別介紹下DAGScheduler是如何做這2件事情的,然後再跟源碼看下DAGScheduler的實現。

一、如何把Job劃分成多個Stage

1) 回顧下寬依賴和窄依賴

窄依賴:父RDD的每個分區只被子RDD的一個分區使用。(map,filter,union操作等)
寬依賴:父RDD的分區可能被多個子RDD的分區使用。(reduceByKey,groupByKey等)

如下圖所示,左側的算子為窄依賴, 右側為寬依賴


窄依賴可以支持在同一個集群Executor上,以管道形式順序執行多條命令,例如在執行了map后,緊接着執行filter。分區內的計算收斂,不需要依賴所有分區的數據,可以并行地在不同節點進行計算。所以它的失敗回復也更有效,因為它只需要重新計算丟失的parent partition即可。最重要的是窄依賴沒有shuffle過程,而寬依賴由於父RDD的分區可能被多個子RDD的分區使用,所以一定伴隨着shuffle操作。

2) DAGScheduler 如何把job劃分成多個Stage

DAGScheduler會把job劃分成多個Stage,如下圖sparkui上的截圖所示,job 0 被劃分成了3個stage

DAGScheduler劃分Stage的過程如下:
DAGScheduler會從觸發action操作的那個RDD開始往前倒推,首先會為最後一個RDD創建一個stage,然後往前倒推的時候,如果發現對某個RDD是寬依賴(產生Shuffle),那麼就會將寬依賴的那個RDD創建一個新的stage,那個RDD就是新的stage的最後一個RDD。然後依次類推,繼續往前倒推,根據窄依賴或者寬依賴進行stage的劃分,直到所有的RDD全部遍歷完成為止。

3) wordcount的Stage劃分

在前面大話spark(3)-一圖深入理解WordCount程序在Spark中的執行過程中,我畫過一張wordcount作業的Stage的劃分的圖,如下:

可以看出上圖中,第一個stage的3個task并行執行,遇到reduceByKey這個產生shuffle的操作開始劃分出新的Stage。但是其實這張圖是不準確的。
其實對於每一種有shuffle的操作,比如groupByKey、reduceByKey、countByKey的底層都對應了三個RDD:MapPartitionsRDD、ShuffleRdd、MapPartitionsRDD
(寬依賴shuffle生成的rdd為ShuffleRdd)
其中Shuffle發生在第一個RDD和第二個RDD之間,前面說過如果發現對某個RDD是寬依賴(產生Shuffle),那麼就會將寬依賴的那個RDD創建一個新的stage
所以說上圖中 reduceByKey操作其實對應了3個RDD,其中第一個RDD會被劃分到Stage1中!

4) DAGScheduler劃分Stage源碼

RDD類中所有的action算子觸發計算都會調用sc.runjob方法, 而sc.runjob方法底層都會調用到SparkContext中的dagscheduler對象的runJob方法
例如count這個action操作
def count(): Long = sc.runJob(this, Utils.getIteratorSize _).sum

一直追着runJob方法往底層看最終調用dagScheduler.runJob,傳入調用這個方法的rdd

dagScheduler.runJob內部調用submitJob提交當前的action到scheduler
submitJob內部調用DAGSchedulerEventProcessLoop發送JobSubmitted的信息,
在JobSubmitted內部最終調用dagScheduler的handleJobSubmitted(dagScheduler的核心入口)。

handleJobSubmitted方法如下:

上面代碼中submitStage提交作業,其內代碼如下:

submitStage方法中調用getMissingParentStages方法獲取finalStage的父stage,
如果不存在,則使用submitMissingTasks方法提交執行;
如果存在,則把該stage放到waitingStages中,同時遞歸調用submitStage。通過該算法把存在父stage的stage放入waitingStages中,不存在的作為作業運行的入口。

其中最重要的getMissingParentStages中是stage劃分的核心代碼,如下:

這裏就是前面說到的stage劃分的方式,查看最後一個rdd的依賴,如果是窄依賴,則不創建新的stage,如果是寬依賴,則用getOrCreateShuffledMapStage方法創建新的rdd,依次往前推。

所以Stage的劃分算法最核心的兩個方法為submitStage何getMissingParentStage

二、提交任務

當Stage提交運行后,在DAGScheduler的submitMissingTasks方法中,會根據Stage的Partition個數拆分對應個數任務,這些任務組成一個TaskSet提交到TaskScheduler進行處理。
對於ResultStage(最後一個Stage)生成ResultTask,對於ShuffleMapStage生成ShuffleMapTask。
每一個TaskSet包含對應Stage的所有task,這些Task的處理邏輯完全一樣,不同的是對應處理的數據,而這些數據是對應其數據分片的(Partition)。
submitMissingTasks如下:

【精選推薦文章】

智慧手機時代的來臨,RWD網頁設計已成為網頁設計推薦首選

想知道網站建置、網站改版該如何進行嗎?將由專業工程師為您規劃客製化網頁設計及後台網頁設計

帶您來看台北網站建置台北網頁設計,各種案例分享

廣告預算用在刀口上,網站設計公司幫您達到更多曝光效益

您可能也會喜歡…