Join operations in Spark
以下の内容については基本的にSpark: The Definitive Guideの内容をベースとしており、一部必要な情報については他のリソースから付け足している。Sparkについて理解を深めるにはまず本書を読んでおくと良い。今回はChapter8 JoinをベースとしてDataFrame Joinおよび追加でRDD Joinについて挙動を確認していく。大まかな流れとしては以下。
- DataFrame Join Types - Inner, Outer, Left Outer, Right Outer, Left Semi, Left Anti, Natural, Cross (Cartesian)
- Notes in Join operation
- How performing DataFrame Join
- RDD Join
- Misc
Joinとは2つのData-set (LeftとRight)における、1つ以上のKeyを比較し、それぞれのデータを合わせることである。よく知られるJoin expressionとしてはEqui Joinがある。これはLeft-sideおよびRight-sideにおけるKeyを比較し、等しいものを合わせ、異なるKeyを含むRowは捨てるといったものである。SparkではEqui Join以外のJoinもサポートしており、それらについては以下に記載する。
#
1. DataFrame Join Types- Inner Joins
- Outer Joins
- Left Outer Joins
- Right Outer Joins
- Left Semi Joins
- Left Anti Joins
- Natural Joins
- Cross (or Cartesian) Joins
上記8Join operationsについてそれぞれ確認するために、まずは以下の通りData-setを準備する (Definitive GuideではScala, PySparkどちらともで示されていたがここでは簡単のためPySparkのみ記載する。また元Data-setは一部変更している)。
Additaionlly, we confirm all tables:
#
1-1. Inner JoinsInner Joinは各Tableにおける指定されたKeyを比較し、共通しているRowのみ残し、Tableを結合する (以下の例では、hero.hero_rank_range
とheroRankRange.id
における共通のもののみ残っており、Right-sideのその他のRowは捨てられている)。なおDataFrameにおけるDefaultのJoinはInnerで、明示的にinner
と指定する必要はない。
#
1-2. Outer Joins指定したTableをKeyにかかわらず結合する。共通のKeyがないRowに関しては、以下の例のようにSpark DataFrameではnull
を挿入する。
#
1-3. Left Outer JoinsLeftにおけるTableは全て残し、さらにLeftとRightにおいて共通しているLeft側に存在するRight-sideのKeyを合わせる。以下の例ではheroRankRange
(Left)のTableは残し、heroRankRange.id
とhero.hero_rank_range
において共通のKeyを残している。またRight-sideのTableに存在しないRowについてはnull
を加える。
上記例におけるLeft/Rightを逆にJoinした場合は、Left (hero
)側にない、Right (heroRankRange
)側のid
のRowは捨てられる。
#
1-4. Right Outer Joins1-3. Left Outer Joinsと類似しており、Right-sideのTableは全て残し、LeftとRightを比較した上で、共通しているLeft側に存在するLeft-sideのKeyを合わせる。以下の例では、heroRankRange
Tableは全て残し、Left/Rightで共通しているものをさらに結合している。Left-sideのTableに存在しないRowについてはnull
を加える。
#
1-5. Left Semi JoinsLeftとRightを比較のみ行われ、共通のKeyがあればそのRowを残し、共通でないものは捨てる。DataFrameのFilteringを行う際に利用すると良い (Right-sideのTableは捨てられる)。以下のようにhero
をLeftとした場合は、指定したKeyに関して、全RowがheroRankRange
に含まれるため全Rowが残る。
Left/Right逆にすると、heroRankRange
とhero
の両方に含まれているKeyが存在するRowのみ残り、それ以外は捨てられる。
#
1-6. Left Anti Joins1-5. Left Semi Joinsと同様に共通のKeyがあるかを比較し、Right-sideのTableを捨てる (下の例ではhero
Tableが捨てられる)が、Left Semi Joinsとは逆で、共通でないものを残し、共通のものを捨てる。AntiについてはSQL FilterにおけるNOT IN
と考えると理解し易い。
Left/Right逆にすると、hero.hero_rank_range
における各値は全てheroRankRange.id
に含まれるので以下の通り空のDataFrameが返される。
#
1-7. Natural JoinsNatural Joinsの場合は暗黙的にどのKeyを結合するかSpark側で推測される。使用する場合は意図したJoinになっているか注意する必要がある。
上記ようにexplain()
methodを利用すると、id
をKeyとしてInner Joinが行われていることが確認できる。参考までに例えばLeft Antiの場合のPhysical planを確認してみると、以下の通りLeftAnti
の計画があることが確認できる。
#
1-8. Cross (or Cartesian) Joins上記例のLeft/Rightを逆にした場合は、
なおCartesian Joinについては以下の点を考慮する必要がある。
- Output rowの数に気を付ける (ex. 100,000 * 100,000 = 10B)
- Cartesian Joinの代替案を実施する:
- UniqueIdを持ったRDDを作成し、このKeyをベースにJoinする (RDDであれば細かく制御可能)
- Broadcastを強制する
- 各UIDごとにUDFを呼び出すUDFを作成し、Table rowに対し処理を行う
- 事前にサンプルデータセットをベースに現在のcluster sizeに対する所要時間を見積もっておく
#
2. Notes in Join OperationここではJoin operationを行う上で留意しなければならないことや、その扱い方について記載する。具体的には以下の内容に触れる。
- Joins on Complex Types
- Handling Duplicate Column Names - 3 approaches
#
Joins on Complex TypesComplex Typesに関するJoinについても同様に実施できる。以下では、hero
Tableにおけるid
Columnをperson_id
にRenameしてから、rank_history
(in hero
Table)およびid
(in rankHistory
Table)ColumnをKeyとしてJoinしている。
#
Handling Duplicate Column NamesJoin処理後に重複したColumnに対し行う処理としては、以下の3 approachesが考えられる。
- Different join expression
- Dropping the column after join
- Renaming a column before the join
DataFrame JoinではCatalyst内で各Columnに対しuniqueIdが設定される。なお本uniqueIdはCatalyst内部で使用され外部から参照することはできない。各ColumnがuniqueIdで扱われるために、Duplicated column namesをCatalyst内で扱うことができず、手動で対応する必要がある。Duplicated column namesが起きた場合以下の2 situationsの発生を考慮する必要がある。
- Join時に指定したDataFrame Keyが同じColumn nameである場合
- Join時に指定したDataFrame Keyは異なるが、各DataFrameにて同じColumn nameをもつ場合
Duplicated column namesには本sectionの冒頭でも記載したが以下の3 Patternsで対応すると良い。
#
(1) Different join expression最も簡単な方法としては、Join expressionをBooleanからString or Sequence expressionへ変更することである。例えば以下のようにString expressionに変更した場合、Duplicated columnはRemoveされる。
#
(2) Dropping the column after the joinJoin後にdrop
methodによりDuplicated columnをdropすることで対応する。ただし事前に元のDataFrameを参照する必要がある。このApproachは以下の2つの場合に有効である。
- 同じKeyに対してJoinする場合
- Join対象のDataFrameが同じColumn nameを持つ場合
それぞれの場合について、以下に対応例を示す。
#
(3) Renaming a column before the joinJoin前にwithColumnRenamed
methodによりDuplicated column nameとなるColumn nameを変更し、重複を防ぐことができる。
#
3. How performing DataFrame JoinDataFrame Joinに関しては、以下2つのStrategyがある。
- node-to-node communication strategy -> Shuffle (Hash) Join
- per node computation strategy -> Broadcast (Hash) Join
これらの違いについてはBig-Tabale/Small-Tableの2つを考慮した際に、Big to BigでJoinするのか、あるいはBig to SmallでJoinするのかであり (あるいはSmall to Small)、それぞれCatalyst側で判断される。ここでいう Small とは、Table sizeが各Workerにおけるmemoryにfitすることを意味している。DataFrame JoinについてはTable sizeによってそれぞれ以下のように分類することができる。
- Shuffle Join = Big to Big
- Broadcast Join = Big to Small (or Small to Small)
#
Shuffle Join - Big to BigShuffle Joinでは、Shuffleが発生する。つまり各Nodeごとにcommunicationを行い、各Nodeにおけるデータを特定のKey (もしくはkey-set)に基づきデータをshareする。本JoinはShuffle (に伴うnetwork転送)が発生するため、転送コストが高い。特にPartition分散があまりなされていない場合は特に高い。Shuffle JoinのFlowについては、MapReduceの動きと似ておりJoin対象のDataFrame (Table)がMapされ、Join-keyよりData-setがShuffleされ、最終的にKeyに基づきReduce phaseでJoinされる。
なおShuffle JoinのPerformanceに関してBestとなる場合は、DataFrameが以下をみたす場合である。
- Join-keyをベースに均等にPartition分散されていること
- 並列処理を行うにあたり適切な数のKeyを持っていること (Keyが分散していないと並列Joinできなくなる)
Best solutionとしては、Join対象の2-DataFramesがEven-shardingとなることである。もしどうしても片Table-sizeが大きくなる場合は、filter
などでDataFrame-sizeを小さくすることも1つの方法。
なおShuffle problemsを見つけるには、SparkUIを確認すると良い。注目するポイントとしては以下。
- 他のTaskに比べ処理に時間のかかっているTaskがあるか
- Speculative tasksが発生しているか
- 大量のinput/shuffle outputが存在するshardがあるか
#
Broadcast Join - Big to SmallTable sizeが1 WorkerNodeのmemory size内におさまる場合に (SizeEstimator.scalaにより類推される)、Broadcast Joinとなり、より効率的にJoinすることが可能となる。Broadcast Joinの場合はShuffleが発生せず、Join前後のDataFrameの関係性はNarrow Dependencyとなる。具体的には以下のようなJoin Flowとなる。
DataFrame explain
methodにより以下のように確認することができる (以下の場合はbroadcast joinとなるように強制している)。
なおhero
とheroRankRange
を逆にした場合も同様にBroadcast Joinとなる。SparkSQLでも同様の強制が可能で、SQL同様/*+ ... */
とすることでHintを渡すことができる ( MAPJOIN
, BROADCAST
, BROADCASTJOIN
など)。
ちなみにRDBSにおけるSQLのように/*
の後に+
がないHintについてはSparkSQLの場合、Hintが有効にはならずに (Errorにもならない)以下の通りDefaultのSortMergeJoinとなる。
なおSparkSQLにおけるHintは必ずしも強制ではないため、Optimizer側で無視される可能性があることに注意する。
join(<otherRDD>, numPartitions)
#
4. RDD Join - RDD Joinについては2つのPairRDD (ex: RDD[(K, V)]
and RDD[(K, W)]
)を取り1つのPairRDD (ex: RDD[(K, (V, W))]
)を返すOperationである。join
methodを使用した場合はInner Joinとなり、それ以外のoperationについては以下に記載するように、各Join operationごとのmethodを使用する必要がある。
- Inner Join -
join
- FullOuter Join -
fullOuterJoin
- LeftOuter Join -
leftOuterJoin
- RightOuter Join -
rightOuterJoin
- Cartesian Join -
cartesian
例としてInner Joinを確認してみると以下の通り (今回は簡単のためDataFrameからRDDに変更しJoin operationを実施している)。
RDD Join operationの詳細については下図に示す通り以下のフローで実行される。
- 2 PairRDDsに対し、
cogroup()
methodが呼ばれ、RDD [(K, (Iterable[V1], Iterable[V2]))]
typeをもつMappedValuesRDD
が作成される - その後2つの
Iterable
間でのCartesian product (直積)が取られる - 最後に
flatMap
methodが呼ばれ、FlatMappedValuesRDD
が作成される
詳細についてはTransformation and Action in Spark Internals - join(otherRDD, numPartitions)に記載している。
またjoin
method以外にもPairRDDをJoinするmethodの1つとしてzip
が存在する (本methodを使用する場合は、各Partitionに同数の要素がなければいけないという制約がある。失敗するとorg.apache.spark.SparkException: Can only zip RDDs with same number of elements in each partition
のエラーが出力される)。
#
Misc.その他、Joinに関して特別なUse-caseを考える。
#
Theta Join以下のようなEqualでない場合のJoinのことを指している。このようなJoinにおいてSparkSQLはtheta conditionが満たされているかどうか、各keyBに対し、各keyAを調べ、ループして確認する挙動となる。そのためkeyAおよびkeyBに対しBucketingしておくと良い。
#
One to Many JoinSingle row tableが、many row tableにJoinされる場合のJoinのことを指している。この場合parquetを使用していると、重複したデータをencodeするのみでよくなるため良い。
#
Reference- The Spark Definitive Guide, Chapter8 Joins
- SparkInternals/2-JobLogicalPlan.md at master · JerryLead/SparkInternals · GitHub
- Optimizing Apache Spark SQL Joins: Spark Summit East talk by Vida Ha - YouTube