Tips for Troubleshooting in Spark

以下の内容については基本的にSpark: The Definitive Guideの内容をベースとしており、一部必要な情報については他のリソースから付け足している。Sparkについて理解を深めるにはまず本書を読んでおくと良い。今回はChapter18 Monitoring & DebuggingおよびChapter19 Performance Tuningをベースとし、内容をまとめているあ。大まかな流れとしては以下。

  1. Monitoring
    1. What components to monitor
    2. What to monitor
    3. Spark Logs
    4. Spark UI - REST API and History Server
  2. Debugging - How to handle problems at first
  3. Performance Tuning

1. Monitoring

1-1. What components to monitor

Spark ApplicationをTroubleshootする上で、どのComponentで問題に発生したかを確認する必要があるが、同時にどのComponentをMonitoringできるのかについても知っておく必要がある。以下のComponentをMonitoringでき、それぞれ以下のような内容を確認できる。

  • Spark Application and Jobs: SparkUIや、Spark Logなどを確認することで、Performance tuningやTroubleshootが可能
  • JVM: Spark executorsはJVM上で稼働するので、各VMを確認することでSparkベースの次Levelのログを確認することができる。jstackによりstacktraceが、jmapによりheap-dumpが、jstatのよりtime based statisticsが、jconsoleによりJVM propertiesなどがvisualizeできる。さらにjvisualvmのようなToolで、SparkUIを見てもわからないような、もっとLowLevelでの確認が可能
  • OS/Machine: JVMはOS上で稼働するので、次にこれをmonitoringすると良い。CPU, network, I/O状況などをdstat, iotstad, iotopなどで確認すると良い
  • Cluster: Clusterの状況をGangliaや、Prometheusで確認すると良い

1-2. What to monitor

Spark ApplicationのMonitoringやdebuggingを行う上で、大まかに以下の2つをmonitoringすれば良い。

  1. Application関連のProcess (CPU usage, memory usage etc.)
  2. "1."における Query execution (e.g. jobs and tasks)

1-2-1. Driver & Executor Processes

Spark Applicationのmonitoringを行う場合、基本的にApplicationのstateはDriverに集約されるので、Driver周り (Log, DriverJVM)を確認すると良い。また各Executorの状態を確認するには、各Job (Sparkにおける)におけるExecutorの状態をLogやSpark metricsから確認すると良い。SparkではMetricsをカスタムすることができ、これらのMetricsはsystem based metricsであるDropwizard Metrics Libraryを使用している。Metricsについては${SPARK_HOME}/conf/metrics.propertiesにて指定することができる (以下に一部を抜粋する)。なお本metricsのPATHは、spark.metrics.confで指定されており、このPATHを変更することで任意のfileを参照させることも可能。

# syntax: [instance].sink|source.[name].[options]=[value]
...
# Enable CsvSink for all instances by class name
*.sink.csv.class=org.apache.spark.metrics.sink.CsvSink
# Polling period for the CsvSink
*.sink.csv.period=1
# Unit of the polling period for the CsvSink
*.sink.csv.unit=minutes

1-2-2. Queries, Jobs, Stages, and Tasks

SparkではDriverやExecutorのみでなく、クエリレベルでdebugが必要な場合もあるので、このような状況では、Spark Logsの他にQuery, Jobs, Stages, and Tasksを可視化できるSparkUIも確認すると良い。Spark (Application) LogsSparkUIを利用してdebug, performance tuningを行う。

1-3. Spark Logs

Spark Applicationに関して最も詳細なログが確認でき、spark.sparkContext.setLogLevel("<LogLevel; INFO, DEBUG etc>")によりLogLevelを設定することができる。YARNの場合であれば、containers/<ApplicationId>/配下にログが出力され、container_<timestamp>_<job>_<attempt>_000001がDriver Log、その他がExecutor Logを表す。またcontainerの番号が..._0003の次に..._0006などと飛んでいる場合は基本的にExecutorが何らかの原因で終了し、再試行された状況であると言える。

現状SparkではJava-based logging libraryを使用しているので、PySpark自体はあまり有効なLogを出せない場合が多い。どこで処理が中断したかなどを正確に把握するためにLogging Libraryなどを用いて、PySpark code側で目安となるログを出力するようにしておくと良い。

1-4. SparkUI

SparkUIを使用することでSpark workloadやJVM LevelでのApplicationの状況を可視化し確認することができる。SparkContextが作成されるたびに、WebUIがlaunchされ、http://[DriverHostName]:4040 にアクセスすることでApplicationの状況を確認できる。なおPortはデフォルト4040で、その後連番で増加し各Portが使用される。

SparkUI上部のtabについてそれぞれ以下のような情報にアクセスでき、確認したい内容をここから選択する (基本的にはJobsやStagesを確認することになる)。

  • Jobs tab: Applicationにおける各Spark Jobs
  • Stages tab: 各JobにおけるStagesとStageに対するTaskを確認
  • Storage tab: 各Spark Applicationでcacheされているデータの情報
  • Environment tab: ApplicationにおけるConfigurationの内容
  • Executors tab: 各ExecutorにおけるApplicationの状況
  • SQL tab: Strucuted API queries (SQL and DataFrames)

SparkUIも設定変更が可能で、例えばアクセスコントロールなどができる。UI Configurationについてはを確認すると良い。

Spark REST API

SparkUIに加え、REST API経由 (GETrequest)でもSpark statusを確認することができる。http://[DriverHostName]:4040/api/v1 REST API経由でに対してリクエストを行うことで確認ができる (なお以降に示すHistory ServerもREST API経由でアクセス可能で、この場合は、http://[ServerURL]:18080/api/v1 に対しリクエストを行えば良い)。詳細なAPIやリクエストPATHについてはREST API - Monitoring and Instrumentation - Spark 2.4.4 Documentationに記載がある。

SparkUI History Server

SparkUIはSparkContextがrunningの状態の場合以外にも、Event Logを有効化し、Spark History Serverを立ち上げることで、SparkUIをrunning application状態でない場合でも (いつでも)、確認することができるようになる。Spark History Serverを使用する場合は、spark.eventLog.enabledを有効化し、Event Log locationをspark.eventLog.dirより指定する必要がある。これらの詳細なoptionについてはSpark History Server Configuration Options - Monitoring and Instrumentation - Spark 2.4.4 Documentationに記載がある。

2. Debugging - How to handle problems at first

ここでは前回のMonitoringの要素を利用し、Logの内容からまず予想されるエラー原因をどうDebugするのかについて、"よくある"エラー (Taskが遅い、あるいはOOMが発生しているなど)を対象に確認していく。本sectionでは以下の項目について確認する (RunJobからJob実行中に発生する事象について確認する)。

  1. Spark Application Not Starting
  2. Errors Before Execution
  3. Errors During Execution
  4. Slow Application
    1. Slow Tasks or Stragglers
    2. Slow Aggregations
    3. Slow Joins
    4. Slow Reads and Writes
  5. Specific Errors
    1. Driver OOM Error or Driver Unresponsive
    2. Executor OOM Error or Executor Unresponsive
    3. Unexpected Nulls in Results
    4. No Space Left on Disk Errors
    5. Serialization Errors

2-1. Spark Application Not Starting

これはSparkの環境を構築した初期に起こりうる事象で、以下のような状況が確認される。

  • Spark ApplicationがStartしない
  • SparkUIがDriverを除くclusterのNode情報について何も表示しない
  • SparkUIが誤った情報をreportする

How treating

多くの場合は、clusterまたはApplication resource demandsが適切に設定されていない状況が考えられる。そのため、まずはNetwork (正しいIPか、あるいは必要なPortが開放されているか)、Filesystemsなどのリソースを確認する。また考えられる状況としては、Cluster Manager側で持っている空きResourceよりも、1 Executorあたりに大きなResourceを要求してしまうと、各ExecutorがLaunchされるまでDriverは永遠に待ち続ける挙動となる。具体的には以下の内容を確認すると良い。

  • Machinesが別のmachinと指定したPortでcommunicateできるか
  • Spark resource configurationが正しく設定できており、Cluster ManagerがSparkを適切にsetupできているか (簡単なApplicationを実行して確認する)
    • よくある問題としては、1 Executorあたりのmemoryよりも大きいmemoryをspark-submit時にリクエストしまうことがある

2-2. Errors Before Execution

よくある状況として新しいApplicationを開発している時に、それまでは動いていたコードが、動かなくなるといったことが考えられる。具体的なには以下のような状況が確認される。

  • 実行したコードが正常に実行できず、大量のError messageが出力される
  • SparkUIをCheckしても、Job, Stages, Tasksも全くない (実行された形跡がない)

How treating

コード起因の可能性が高いため、まずはコードをCheckする。その他SparkUIやDataFrame.explain()DataFrame.debugCodegen (Scala only) methodで確認できる。

  • Codeに誤りがないか (file pathや、field nameなど) Error messageを確認する
  • ClusterにおけるNetwork Connectivityを確認する (Driver, Worker, Storage systemなど)
  • Library pathやClass pathなどに誤りがないか確認する

2-3. Errors During Execution

Spark Application実行中に関しては様々な状況が存在する。

  • あるSpark Applicationは成功したが、同様のクラスターで実行した次のSpark Applicationが失敗した
  • 複数stepあるうちの1 stepにてqueryが失敗した
  • 昨日は成功したScheduling jobが今日失敗した
  • Error messageの出力から具体的なエラーが内容がわからない (わかりづらい)

How treating

様々な状況が考えられるが以下の内容を確認する。

  • Dataが存在するか、あるいはData formatが意図した通りのものとなっているか
  • Run query時、即座にErrorが出る場合 (つまりCompile時Error、Task Launch前)、Analysis Errorの場合が多いので、この場合はQueryを見直す
  • Stacktraceを読んで、どのComponentでErrorとなっているか確認する
  • inputデータと、どのデータが想定したものとなっているか確認し、問題を特定する。問題となった処理を見つけるまで正しいロジックを取り除いて範囲を狭めていくと良い
  • Applicationがたまに失敗するような場合があった際は、input dataに問題がないか確認する。schemaが正しく指定されていない、あるいは特定のRowがschema通りになっていないなどが考えられる。例えばnullを許容していないcolumnであるにもかかわらずnullデータがあるなど
  • Processing data中にProgramがcrashした場合は、Sparkにより何らかのExceptionが投げられるが、SparkUIにより "Failed" とマークされたTaskを確認すると、Error詳細を簡単に確認できて良い

2-4. Slow Application

2-4-1. Slow Tasks or Stragglers

Applicationを最適化するにあたって問題となる多くが、処理をうまく並列に分散できていない、あるいはHardware problemなどで特定のmachineにおける処理が動いていない or 遅いなどといった問題がある。具体的に観察される事象としては以下が挙げられる。

  • 特定のStageが終了せず、少数Tasksが残り続ける
  • SparkUIでSlow taskが確認でき、常に特定のdatasetに対し処理した際に発生する
  • Slow taskが複数Stageにまたがって段階的に発生する
  • Scale outをしたが効果がない
  • Spark metricsをにおいて特定のExecutorにおけるRead/Writeが他のExecutorに比べ極端に多い
How treating

Slow tasksはしばしばStragglersと呼ばれる。よくある原因としては、データが均等にpartitioningできていないといった事象である。この場合、多くは特定のExecutorに処理が偏り、対象のStageが完了せず処理が遅くなる。groupByKey operationにて特定のkeyをもつデータが大量に存在している場合も、対象のKeyに対する処理に偏ることが想定される。SparkUIより、いくつかのNodeにおいてShuffle dataが極端に多く行われていることが確認できる。これらに関する対応策は以下が考えられる。

  • 1 Partitionあたりのデータを減らすため、Partition数を増加させる
  • 偏りのないKeyをベースにRepartitioningする
  • ExecutorにおけるMemoryを増加する
  • ExecutorやMachine側に問題がないか (disk fullなど)をSparkUI, Spark metricsなどによりモニタリングする
  • AggregationやJoin処理に時間がかかっている場合は、それぞれ2.4.2 Slow Aggregationsおよび、2-4-3. Slow Joinsにおける内容を確認する
  • UDFを使用していた場合は、無駄なobject allocationやbusiness logicがないか確認する (DataFrameに可能であれば変換する。ただしPySparkの場合はSerdeが行われる場合があることに注意する)
  • UDFやUDAF (User Defined Aggregate Functions)が少量のデータ単位で処理を行っていないか確認する。Executorをフルに使えるよう可能な範囲で処理単位を大きくする
  • Speculationを有効にする (#2-4-4. Slow Reads & Writesで詳細について記載する)。簡単に記載すると遅いTaskのコピーを起動する。ただし新たにリソースが必要になることと、Eventually consistentなresourceへの書き込みの場合、データが重複する可能性があることに注意する。
  • Datasetsを利用している場合の問題点として、DatasetsではCatalyst経由でJava objectを作成するので、GCが大量に発生する可能性がある。Slow tasksがある場合はSparkUIを通して、GC metricsを確認するとよい

2-4-2. Slow Aggregations

特定のTaskにおけるAggregation processingに時間がかかっていると考えられる。

  • groupByの実行に時間がかかっているTaskがある
  • Aggregation後のJobsが遅い
How treating

こういった問題はしばしば解決しない場合がある。状況としては単にAggregation対象の元データのKeyが偏っているような場合は避けられないといったことが考えられる。

  • 処理の並列度を増加するため、Aggregation前にPartition数を増加させる
  • Executor memoryを増加する。これにより、ある特定のKeyにおけるデータが大量にある場合、spill to diskを防げ、改善する可能性がある
  • Aggregation後のTaskが遅い場合は、均一にデータが各Partitionごとに分散していない可能性があるので、repartition methodにより分散すると良い
  • 処理対象のデータをSELECT queryなどでfilterした上で絞り、処理データを減らす
  • null valueが正しくnullとなっているか確認する (別の文字で置き換わっていないか)。Sparkではなるべくnullを無視するようにするが、nullが別の文字となっている場合無視されない
  • Driverに返すようなAggregation method (collect_listcollect_setなど)がperformanceに影響していないか確認する

2-4-3. Slow Joins

JoinおよびAggregation operationは基本的にShuffleが発生する。各operationについてはそれぞれほぼ同様の対応を取ることができる。

  • Join stageに大量の時間がかかっている (これはSingle Taskか、大量のTaskかが想定される)
  • Join前後の複数Stagesに関してoperationが正常に行われているか
How treating
  • 最適なJoin types (inner, outer etc)を選択する
  • 複数Joinする場合は、Joinの順番をテストベースで最も良い順番にする。大量データを対象にJoinする必要がある場合は先に行う
  • Join処理前にPartitioningを行うことでShuffle数を減らすことができる (テストを行うと良い)
  • Slow joinsはdata skew (データの偏り)により発生する場合がある。分散することももちろんだが、Scale Up/Outを行うことでも改善する可能性がある
  • (Aggregationの場合と同様) 処理対象のデータをSELECT queryなどでfilterした上で絞り、処理データを減らす
  • (Aggregationの場合と同様) null valueが正しくnullとなっているか確認する (別の文字で置き換わっていないか)。Sparkではなるべくnullを無視するようにするが、nullが別の文字となっている場合無視されない
  • たまにDataFrame同士のJoinでも意図した通りBroadcast Joinにならない場合があるので、Forced Broadcast Joinする (参考: Join operations in Spark - Broadcast Join - Big to SmallあるいはAnalyze tableすると良い (参考: Analyze Table — Databricks Documentation)

2-4-4. Slow Reads & Writes

Slow I/Oについては判断が難しい。特にnetworked file systemsの場合、判断が難しくなる。具体的には以下のような状況が確認される。

  • DFSやExternal systemからのデータのReadが遅い
  • Network file systemやBlob storageへのWriteが遅い
How treating
  • Speculationを設定する (spark.conf.set("spark.speculation", true)とする)。これは最初に起動したtaskが遅い場合に、同じtaskを立ち上げるという設定である (MapReduceのspeculative taskと同様と考えて良い)。本設定によりRead/Writeのスピードが改善する可能性はあるが、同時にS3のようなEventually consistentなStorageの場合は、duplicated dataが発生する可能性があるので取り扱いには注意する
  • Storage systemとのNetwork connectivityやbandwidthに問題がないか確認する
  • HDFSのようなDFSを利用している場合は、Sparkが"locality-aware scheduling"できているかどうか確認する。これはSparkUIにおける "locality" Columnから確認することができる

2-5. Specific Errors

2-5-1. Driver OOM Error or Driver Unresponsive

本エラーはSpark Application自体はcrashするので防ぐことは重要である。例えば以下のような状況を考える。

  • Spark Applicationの応答がない、またはCrashした
  • OutOfMemoryErrorsまたはGC messageがDriver logに記録された
  • Commandsが実行されるまでかなりの時間を要した、または実行されなかった
  • REPLが遅い、または反応がない
  • DriverJVM memory usageが高い
How treating
  • Large datasetに対して、collectのようなDriver nodeに集めるoperationを実行していないか確認する
  • Large data tableに対して、Broadcast Joinを行っていないか確認する。spark.sql.autoBroadcastJoinThresholdにより設定できる (Ref: Performance Tuning - Spark 2.4.4 Documentation)。なおこのsizeについてはorg.apache.spark.util.SizeEstimatorによりEstimateされている
  • Long-running applicationはDriver上に大量のobjectsを生成し、それをreleaseできなくなる場合がある。jmap toolを使用することでDriverJVMをどの程度利用しているのかどうか確認することができる。一方でjmapは稼働中のJVMを計測のためpausingする点に注意する必要がある
  • 可能な範囲でDriver memoryに割り当てる量を増やす
  • Pythonような別の言語bindingを使用している場合に、Data conversionなどで対象のJVM memoryを必要とするような状況で、JVM OOMが発生する可能性がある。この時、ScalaやJavaなどJVM friendlyな言語を使用して、Driver nodeに戻すデータが少なくなるか確認する、あるいはMemory内データをファイルとして書き出すことでDriver issueが改善するか確認する
  • 別client (NotebookやJDBC serverなど)からSparkContextにアクセスしている場合に、大量のデータをDriver memory上に配置していないかを確認する。例として、大きいサイズのArrayや、大きいdatasetに対しcollect methodを呼んでいないかなどが挙げられる

2-5-2. Executor OOM Error or Executor Unresponsive

Sparkでは時可能な範囲で自動でrecoverされる (一方でDriverOOMはrecoverできない)。

  • OutOfMemoryErrorあるいは、GC messageがExecutor logに出力されている (Spark Logs, SparkUIで確認できる)
  • Executorがcrashしているかresponseがない
  • 特定のnodeでtaskが遅く、recoverする様子がない
How treating
  • Executor memoryあるいはExecutor数を増加させる
  • Python Worker (for PySpark) sizeを変更する (Configuration - Spark 2.4.4 Documentationを参照)
  • gc
  • null valueが正しくnullとなっているか確認する (別の文字で置き換わっていないか)。Sparkではなるべくnullを無視するようにするが、nullが別の文字となっている場合無視されない (上記でも記載している)
  • RDDやDatasetsなどSerdeによる遅延が発生していないか確認する。UDFは減らし、Structured APIを利用すると良い
  • jmapによりExecutorのheap memory usageを確認する。どのclassがusageが高いか確認すると良い
  • Executorが他のアプリケーションなど別のworkloadが稼働するようなNodeの上でにないことを確認する。Spark Applicationを別のJobとは隔離して実行する

2-5-3. Unexpected Nulls in Results

以下のような状況が想定される。

  • Transformation後に意図せずvalueがnullとなった
  • 正常に稼働していたSpark Applicationが (ある時点から)正常終了しなくなった、また正しい結果を返さなくなった
How treating
  • 処理対象のData formatが (Spark側ロジックとは無関係に)変わった可能性があるため、対象のデータを生成するcodeなどに問題がないか確認する
  • Accumulatorを使用し、recordのcountや、特定のtypeのcountを行い、parsing/processingの際にerrorが発生した場合は、対象のrecordをskipするように設定する。これによりどのformat、どのtypeでparsing/processingが失敗したか確認できる。UDFとAccumulatorと組み合わせ、valid recordはそのまま利用、invalid recordは意図した内容に変換するといった柔軟な処理を実装することもできる
  • SparkSQLは時折、暗黙の型強制を行う場合があり、これに伴い結果が意図したものとならない場合がある。例として、SELECT 5*"23"のようなクエリを実行した場合に、"23"はstringであるがintに変換され、115という結果が返るが、SELECT 5*" "にようなクエリを想定すると、" "がintに変換された際にnullとなり、クエリ結果としてnullが返る場合がある。そのめTransformationが意図したQuery planとなっているか確認する必要がある。これについては、printSchema methodを用いてschemaを確認し、CASTがどこで行われているかQuery planを確認すると良い (SparkUIから確認可能)

2-5-4. No Space Left on Disk Errors

no space left on disk errorが出力され、実行中のSpark Applicationが失敗した。

How treating
  • Disk spaceを増加させる (storage sizeをあげる、あるいはstorageを追加する)
  • Disk spaceに限りがある場合は、partitioned dataが特定のNodeに偏っていないか確認し、偏っている場合はrepartitionする
  • しばしばLog起因で本エラーが発生する場合があるので、Executor log optionを変更するとよい。具体的な変更optionについてはConfiguration - Spark 2.4.4 Documentationに記載がある (spark.executor.logs.rolling.maxRetainedFilesspark.executor.logs.rolling.enableCompressionなどが設定できる)
  • 必要のないLog fileやshuffle fileを削除する (ただし一時対応)

2-5-5. Serialization Errors

Serialization errorが出力され、実行中のSpark Applicationが失敗した。

How treating
  • Structured APIを利用すれば基本的には発生しないが、UDFやRDDと連携している場合に発生する可能性がある。これは元data format自体がserializeできない状態になっていたり、Kyro serializationを意図したものとなっている (serializerが異なる)、そして、特定のデータにおけるdata typeがserializeできないといったような状況が考えられる。そのため元データやserialization関連のclassを調べる必要がある
  • Java or Scala class内でUDFを使用する場合は、UDF内で保持するenclosing objectのfieldを参照しないようにする。これはSparkがenclosing object全体をserializeしようとするためで (ほぼ不可能)、代わりにclosureと同じscope内のlocal変数に関連するfieldをコピーし、closureとして使い回すようにする

3. Performance Tuning

ここでは、前sectionまでのoptimizingに加え、以下の観点でoptimizingを行う。

  1. "Indirect" Optimizing by setting configuration values or changing the runtime environment
    1. Code-level design choices (e.g. RDD vs. DataFrames)
    2. Object serialization in RDDs
    3. Cluster Configuration - Dynamic allocation etc.
    4. Scheduling
    5. Data at rest (Stored data) - Splittable, partitioning, bucketing etc.
    6. Shuffle Configuration
    7. Memory & GC
  2. "Direct" Optimizing by changing execution characteristic or design choice at the individual Spark job, stage or task level
    1. Parallelism
    2. Improved Filtering
    3. Repartitioning & Coalescing
    4. UDFs
    5. Caching
    6. Join & Aggregations
    7. Broadcast Variables

Spark Application performanceを向上させるにあたり、SparkUIや、Spark metricsなどを確認しながら行うと良い。

3-1. "Indirect" Performance Enhancement

3-1-1. Code-level design choices

Spark Applicationを作成する上で重要となる要素について記載する。具体的には以下の内容について記載する。

  • Scala vs. Java vs. Python vs. R
  • DataFrames vs. SQL vs. Datasets vs. RDDs
Scala vs. Java vs. Python vs. R

以下を考慮して選択すると良い。

  • MLに関してはRがML algorithmなどに関して豊富で強いのでSparkRを用いると良い
  • StructuredAPIsに関してはどの言語でもSpeed, stabilityが良い (ScalaAPIとして実行されるため)
  • RDD TransformationやUDFsを使用する場合は、Scalaを用いると良い。Scala自体の型をそのまま利用できるので、別の言語でtypeがうまく扱えないといったような事態を防げる可能性がある
DataFrames vs. SQL vs. Datasets vs. RDDs
  • どの言語でもDataFrame, Datasets, SparkSQLを使用すれば同様のspeedと考えて良い (内部的にはこれら全てにおいてScala APIが呼ばれるため)
  • UDFsを利用した場合は、Python, Rに関しては、ScalaやJavaに比べると遅くなる (SerDeの影響があるため)ので注意する
  • RDDを使用したい場合は、Java objectベースのScalaおよびJavaを使用する

3-1-2. Object Serialization in RDDs

DefaultのJava serialization libraryに含まれていないcustom data typesを利用する場合は、Kyro serializerを利用すると、Java serializationに比べ最大10倍程度速くなり、Performanceの向上が見込める。ただしKyroを利用するには事前に以下の手続きをふむ必要がある (ここでは簡単に記載する。詳細についてはKyro documentを参照)。

  • spark.serializerorg.apache.spark.serializer.KyroSerializerを設定する
  • spark.kyro.classesToRegisterに使用したいclassを設定する (conf.registerKyroClasses(Array(classOf[MyClass1], classOf[MyClass2]))によりclassを設定できる)

3-1-3. Cluster Configurations

Cluster側のconfigurationについては様々な設定があるが、ここでは代表的なものについて記載する。

  • Cluster/application sizing and sharing - ApplicationやCluster単位でconfigurationを行う (NumOfTasks/executorなど)。詳細についてはConfiguration - Spark 2.4.4 Documentationを確認する
  • Dynamic allocation - pending時間が長いtaskに対し、新たなtaskを起動するように設定できる。spark exeternal shuffle serviceが必要なことと、spark.dynamicAllocation.enabledtrueに設定する必要がある (設定についてはJob Scheduling - Spark 2.4.4 Documentationを参照)

3-1-4. Scheduling

ここでは複数Applicationを同一リソースで実行する際のperformanceを上げる方法について確認する。

  • Scheduler poolを調整し、並列度を上げる - FAIR schedulerに変更する (spark.scheduler.modeFAIRに変更する。詳細はJob Scheduling - Spark 2.4.4 Documentation)ことでより効率的にResourceをshareできる可能性がある
  • Dynamic allocationやExecutorあたりのcore数を変更することで並列度を上げる - --max-executor-cores (or spark.cores.max)により対象のApplicationで使用する最大Executor core数を決めることができる (余分にリソースを使用しすぎないように設定することができる)

3-1-5. Data at Rest

ここではRead処理に着目し、具体的にStored dataに対し行えるTuningについて確認する。

  • File-based long-term data stroage: csvなどのsemi-structured formatはparseに時間がかかる。SparkではApache parquetの使用を推奨している
  • Splittable file types and compression: Splittable fileに対してはfileを分割して読み込むことができるため、読み込みが速くなる可能性がある。各File formatおよびcompressionに関するsplittable or unsplittableについてはSplittable File Types and Compression Format - Data Sources API in Sparkに記載している
  • Table partitioning: Partitioningすることで読み込みの際に必要なPartitionのみReadすることができ、より読み込みPerformanceの向上が見込める (詳細はBucketing - Data Sources API in Sparkを参照)
  • Bucketing: 同じKeyのデータを予め集合させておくことで、全データをScanする必要なく、これも読み込みPerformanceの向上が見込める (詳細はBucketing - Data Sources API in Sparkを参照)
  • Managing the number of files: 8-3. Managing File Size - Data Sources API in Sparkに記載の通り、File数や各fileごとのサイズも考慮する必要がある。Small size fileが大量にある場合は、1 fileごとのlocation取得やRead taskが大量に作成されるため時間がかかり、Large size fileが少量の場合でも、1 Taskあたりの処理時間がのびる可能性があるため、バランスを取る必要がある。一般的には1 taskあたり最低でも数十MBとなるように調整することが目安。Write時のFile sizeの調整は1 fileあたりのrecord数を指定できるmaxRecordsPerFile parameterを使用すると良い
  • Data locality: 特定のNodeで起動するTaskと同様のNodeに処理対象のデータが存在する場合、対象のデータ転送を行わなくても良いので、Performanceの向上が見込める。Sparkでは基本的にinput blockになるべく近い場所でTaskを起動しようとする。Data localityかどうかについては、SparkWebUI経由で確認することができる (Stages tabにおけるTasksのLocality LevelLOCALとマークされる)
  • Statistics collection: SparkはCBO機能も含んでいる。これはinput dataをベースとしてstatisticsが必要となる。statisticsにはtable-levelcolumn-levelがあり、後者が前者に比べデータ収集に時間を要する場合が多いものの、より多くの情報を取得することができる。なおTableのみ対象で、DataFrameやRDDには使用できない。具体的なクエリについて以下に記載する。
    • Table-level: ANALYZE TABLE <TABLE_NAME> COMPUTE STATISTICS
    • Column-level: ANALYZE TABLE <TABLE_NAME> COMPUTE STATISTICS FOR COLUMNS <COL_NAME1>, <COL_NAME2>, ...

3-1-6. Shuffle Configuration

DynamicAllocationとセットでExternal Shuffle Serviceを設定することでPending taskが長い (Application起動後にspark.dynamicAllocation.schedulerBacklogTimeoutの時間が経過すると新しいExecutorの要求を行う)場合に、Executorを起動していくような動きとなるため、Performanceの向上が見込める。

加えて、RDDベースの処理に関してはSerializationがShuffle performanceに与える影響が大きく、可能であればKyroを使用する (Java serializationよりも速い)。またPartition数も調整する (目安についてはA Deeper Understanding of Spark Internals - Aaron Davidson (Databricks)を確認すると良い)。

3-1-7. Memory Pressure & GC

Spark Applicationを実行中に、memory不足に起因しTaskがcompleteしない状況が発生することがある。この原因として考えられるものは、memoryを大量に使用する処理が動いているか、GCが頻繁に発生し、大量のobjectがJVM上で作成されたりすることで不必要なmemory領域が解放されず、memoryを圧迫するといったことである。対応方法の1つとしては、Structured APIを利用することがあげられる。ただしRDDやUDFを使用したApplicationに関しては、GCの観点でTuningが必要となる場合もあるため、ここではGC Tuningについてまとめる。詳細についてはGarbage Collection Tuning - Tuning - Spark 2.4.4 Documentationに記載があるので、こちらを参考にすると良い。

[1st step] Measuring the impact of GC

GC Tuningの最初のstepとして行うことは「どの程度GCが発生しているのか」及び「どの程度GCに時間がかかっているのか」という情報を収集することである。これを実施するためには、spark.executor.extraJavaOptions (Spark JVM option) parameterに-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStampsを設定する。これによりGCに関する情報を取得することができる。ただしExecutorごとにlogが出力されるため、各WoprkerNode側のログを確認する必要があることに注意する。

GC tuning

GC Tuningの前に以下にGCに関するbasic informationをおさえておく。

  • Java heap spaceは以下の2領域に分かれる
    • Young: short-lived objects
    • Old: longer lifetimes objects
  • Young generationはさらに3領域に分割される - Eden, Survivor1, and Survivor2

またGCは以下の過程で行われる。

  1. Edenがfullになると、minor GCがEden上で行われ、alive objectsがEdenおよびSurvivor1からSurvivor2へコピーされる
  2. Survivor領域が、swapされる
  3. Objectが古いもの、あるいはSurvivor2がgullの場合は、Oldへ移動される
  4. 最後にOldがfull近くなると、FullGCが行われる。FullGCはheap上の全objectのtracingや、削除、未使用領域へのobject移動などが含まれるため、一般的にGCでは一番時間を要する

SparkにおけるGC TuningのGoalは、long-livedなcached datasetsをOld領域に格納し、Young領域に全short-lived objectsを割り当てられるよう十分な領域を確保することである。これによりFull GCを避けることができる。以下にTuning例の一部を記載する。

  • Task completion前にFull GCが複数回呼ばれている場合は、Task executionに必要なmemoryが不足している場合が想定される。この場合、sparkで使用しているmemory使用量 (spark.memory.fraction分) を抑えるようにする
  • minor GCが大量に発生している、かつFull GCは適量な場合、Eden側によりmemoryを割り当てることで改善する可能性がある。具体的には、Eden sizeをEとすると、Young領域を4/3 E となるようにすると良い (`-Xmn=4/3E`で設定)
  • G1GC GCを利用する (-XX:+UseG1GCで設定)。これはGC自体がbottleneckとなっている場合や、Young/Old領域を減らすことができない場合に有効となる可能性がある。-XX:G1HeapRegionSizeでG1領域のサイズを変更できる。またGCを変更した場合は、1st step同様に、GC頻度と時間を計測する

3-2. "Direct" Performance Enhancements

3-2-5. Cachingのみ詳細に記載する。

  • 3-2-1. Parallelism: 特定のstageにおけるparallelismを調整する。少なくとも2, 3 tasks/Coreとなるよう設定することが推奨される。またCore数に伴いspark.default.parallelismおよびspark.sql.shufflepartitionsも変更することが推奨される。  
  • 3-2-2. Improved Filtering: 処理に必要なデータのみを初期時にpushDownQueryや特定のpartitionに対し処理を行うなどし、filteringすると良い。PartitioningやBucketingは有効である。
  • 3-2-3. Repartitioning & Coalescing: Shuffleが発生するものの事前にpartition数を調整しておくことで、Join performanceを向上したり、効率的に並列処理できる可能性がある。Custom partitionerを使用してpartitioningを調整しても良い。
  • 3-2-4. UDFs: UDFsはSerializationの影響を受けるため、JVM based languageを使用する場合は注意する必要がある。Pandasを利用して複数recordをbatch処理するpythin extensionであるPandas UDFsを使用すると良い。
  • 3-2-6. Join & Aggregations: JoinについてはBroadcast joinを行うことでPerformance向上が見込めるので、Boradcast join hintを付与すると良い。Bucketingも有効である。Cartesian joinやfull-outer joinについてはなるべく避けるようにする。これらについては.explain() methodで確認すると良い。またAggregationについてはdataをfilterしておくことや、RDDベースに変換することで柔軟にコントロールすることができる (e.g. 可能な場合groupByKeyではなくreduceByKeyを使用するなど)
  • 3-2-7. Broadcast Variables: UDFを何度も呼び出すような処理の場合はBroadcast variablesとして扱うと良い

3-2-5. Caching

Spark applicationn内にて同じdatasetを何度も再利用するような場合のtunningとして、Cachingが有効である。CachingはDataFrame, table, RDDを一時的なstorage (memoryあるいはDisk)に保持する。ただし常に有効ではなく、cachingはobjectのSerDeやstorage costが発生するので必要な場合のみ使用する。またCachingはLazy operationであり、Cacheデータに対しアクセスする場合に実行される。なおCachingはRDD APIとStrucuted APIとで挙動が異なるので注意する。

  • RDD API: 実行時にそのまま実データとしてCacheされ、アクセス時に戻される。
  • Strructured API: まずPhysical planとしてcachingされ、実行時にCachigされる

以下にCaching時の各Storage levelsについて記載する。詳細についてはRDD Programming Guide - Spark 2.4.4 Documentationに記載がある。なおデフォルトStorage levelsに関して、RDDではMEMORY_ONLYであり、DataFrameではMEMORY_AND_DISKである。

Storage LevelDescription
MEMORY_ONLYRDD Default. DeserializeされたobjectとしてJVMに保存する。RDD sizeがmemoryにfitしない場合は、いくつかのPartitinがcacheされず、必要な場合にrecomputingされる
MEMORY_AND_DISKDataFrame Default. DeserializeされたobjectとしてJVMに保存する。RDD sizeがmemoryにfitしない場合は、spill to diskし、diskから読まれる
MEMORY_ONLY_SER (Java and Scala)SerializeされたJava object (one byte array per partition)として保存する。基本的にはDeserialized objectを保存するよりもmemory spaceを効率的に使用できる。速いserializerを使用すればするほど、CPU-intensive readとなる
MEMORY_ONLY_DISK_SER (Java and Scala)MEMORY_ONLY_SER同様だが、memory fitしない場合はpartition spill to diskとなる
DISK_ONLYRDD partitionsをDiskのみに保存する
MEMORY_<SAMEOPTIONS>_2他のLevelと同様の内容だが、各partition2 cluster nodesにreplicateする
OFF_HEAP (experimantal)MEMORY_ONLY_SERと基本的には同様だが、Off-heap memoryにデータを保存する (ただしoff-heap memoryを有効化する必要がある)

Cachingが有効な例として以下のようなコードを見てみる。

inputPath = ["s3://.../sharebike/201508_station_data.csv"]
df_base = spark.read.format("csv").option("header", True).load(inputPath)
df_id = df_base.groupBy("station_id").count().collect()
df_name = df_base.groupBy("name").count().collect()
df_dockcount = df_base.groupBy("dockcount").count().collect()

上記のような場合はdf_id, df_name, df_dockcount 3つに対してActionを実行する際に、3回Loadが呼ばれる事になる。これについてはSparkUI経由で確認できる (以下はdf_idをpickupしている。他のDataFrameについても同様。またShuffleが発生しているのではDataFrameのcount methodを実行しているため)。

こういった場合にCachingが有効である。以下のようにコードを書き換えることでLarge datasetがDatasourceの場合にperformanceを向上させることができる。なおCachingがlazy operationであるため、count actionを実行し先にcachingされるようにしている。

inputPath = ["s3://.../sharebike/201508_station_data.csv"]
df_base = spark.read.format("csv").option("header", True).load(inputPath)
df_base.count()
df_id = df_base.groupBy("station_id").count().collect()
df_name = df_base.groupBy("name").count().collect()
df_dockcount = df_base.groupBy("dockcount").count().collect()

上記コードをSparkUIで確認すると以下。

Reference

  1. The Spark Definitive Guide, Chapter18 Monitoring & Debugging
  2. The Spark Definitive Guide, Chapter19 Performance Tuning
  3. Dropwizard Metrics Library
  4. Monitoring and Instrumentation - Spark 2.4.4 Documentation
  5. Analyze Table — Databricks Documentation
  6. Performance Tuning - Spark 2.4.4 Documentation
  7. dataframe - What is the maximum size for a broadcast object in Spark? - Stack Overflow
  8. Configuration - Spark 2.4.4 Documentation
  9. Kyro serializer
  10. Job Scheduling - Spark 2.4.4 Documentation
  11. A Deeper Understanding of Spark Internals - Aaron Davidson (Databricks)
  12. Tuning - Spark 2.4.4 Documentation
  13. PySpark Usage Guide for Pandas with Apache Arrow - Spark 2.4.4 Documentation
  14. RDD Programming Guide - Spark 2.4.4 Documentation