Job Scheduling - Spark 3.5.1 Documentation (2024)

  • Overview
  • Scheduling Across Applications
    • Dynamic Resource Allocation
      • Caveats
      • Configuration and Setup
      • Resource Allocation Policy
        • Request Policy
        • Remove Policy
      • Graceful Decommission of Executors
  • Scheduling Within an Application
    • Fair Scheduler Pools
    • Default Behavior of Pools
    • Configuring Pool Properties
    • Scheduling using JDBC Connections
    • Concurrent Jobs in PySpark

Overview

Spark has several facilities for scheduling resources between computations. First, recall that, as described in the cluster mode overview, each Spark application (instance of SparkContext) runs an independent set of executor processes. The cluster managers that Spark runs on provide facilities for scheduling across applications. Second, within each Spark application, multiple “jobs” (Spark actions) may be running concurrently if they were submitted by different threads. This is common if your application is serving requests over the network. Spark includes a fair scheduler to schedule resources within each SparkContext.

Scheduling Across Applications

When running on a cluster, each Spark application gets an independent set of executor JVMs that only run tasks and store data for that application. If multiple users need to share your cluster, there are different options to manage allocation, depending on the cluster manager.

The simplest option, available on all cluster managers, is static partitioning of resources. With this approach, each application is given a maximum amount of resources it can use and holds onto them for its whole duration. This is the approach used in Spark’s standalone and YARN modes, as well as the coarse-grained Mesos mode. Resource allocation can be configured as follows, based on the cluster type:

  • Standalone mode: By default, applications submitted to the standalone mode cluster will run in FIFO (first-in-first-out) order, and each application will try to use all available nodes. You can limit the number of nodes an application uses by setting the spark.cores.max configuration property in it, or change the default for applications that don’t set this property through spark.deploy.defaultCores. Finally, in addition to controlling cores, each application’s spark.executor.memory setting controls its memory use.
  • Mesos: To use static partitioning on Mesos, set the spark.mesos.coarse configuration property to true, and optionally set spark.cores.max to limit each application’s resource share as in the standalone mode. You should also set spark.executor.memory to control the executor memory.
  • YARN: The --num-executors option to the Spark YARN client controls how many executors it will allocate on the cluster (spark.executor.instances as configuration property), while --executor-memory (spark.executor.memory configuration property) and --executor-cores (spark.executor.cores configuration property) control the resources per executor. For more information, see the YARN Spark Properties.

A second option available on Mesos is dynamic sharing of CPU cores. In this mode, each Spark application still has a fixed and independent memory allocation (set by spark.executor.memory), but when the application is not running tasks on a machine, other applications may run tasks on those cores. This mode is useful when you expect large numbers of not overly active applications, such as shell sessions from separate users. However, it comes with a risk of less predictable latency, because it may take a while for an application to gain back cores on one node when it has work to do. To use this mode, simply use a mesos:// URL and set spark.mesos.coarse to false.

Note that none of the modes currently provide memory sharing across applications. If you would like to share data this way, we recommend running a single server application that can serve multiple requests by querying the same RDDs.

Dynamic Resource Allocation

Spark provides a mechanism to dynamically adjust the resources your application occupies based on the workload. This means that your application may give resources back to the cluster if they are no longer used and request them again later when there is demand. This feature is particularly useful if multiple applications share resources in your Spark cluster.

This feature is disabled by default and available on all coarse-grained cluster managers, i.e. standalone mode, YARN mode, Mesos coarse-grained mode and K8s mode.

Caveats

  • In standalone mode, if spark.executor.cores is not set explicitly, each executor gets all the available cores of a worker. In that case, when dynamic allocation is enabled, Spark may acquire many more executors than expected. Until SPARK-30299 is fixed, you are advised to set the number of cores per executor explicitly when using dynamic allocation in standalone mode.

Configuration and Setup

There are several ways to use this feature. Regardless of which approach you choose, your application must first set spark.dynamicAllocation.enabled to true. In addition, one of the following must hold:

  • your application must set spark.shuffle.service.enabled to true after you set up an external shuffle service on each worker node in the same cluster, or
  • your application must set spark.dynamicAllocation.shuffleTracking.enabled to true, or
  • your application must set both spark.decommission.enabled and spark.storage.decommission.shuffleBlocks.enabled to true, or
  • your application must configure spark.shuffle.sort.io.plugin.class to use a custom ShuffleDataIO whose ShuffleDriverComponents supports reliable storage.
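The prerequisites listed above can be expressed as a small check. The following is only a sketch in plain Python, not part of Spark: the helper name dynamic_allocation_ready is hypothetical, and conf is assumed to be an ordinary dict of explicitly set property names to string values. It cannot verify that a custom shuffle plugin really supports reliable storage, so it treats an explicitly set plugin class as the user's assertion that it does.

```python
def dynamic_allocation_ready(conf):
    """Return True if conf enables dynamic allocation and at least one
    of the shuffle-preservation options described in the list above.

    conf is a plain dict of property name -> string value (an assumption
    of this sketch, not a Spark object).
    """
    def is_true(key):
        return str(conf.get(key, "false")).strip().lower() == "true"

    # Dynamic allocation itself must always be switched on first.
    if not is_true("spark.dynamicAllocation.enabled"):
        return False

    external_shuffle_service = is_true("spark.shuffle.service.enabled")
    shuffle_tracking = is_true("spark.dynamicAllocation.shuffleTracking.enabled")
    # Decommissioning counts only when BOTH properties are true.
    decommission = (
        is_true("spark.decommission.enabled")
        and is_true("spark.storage.decommission.shuffleBlocks.enabled")
    )
    # Assumption: an explicitly configured plugin class is taken to
    # support reliable storage; this sketch has no way to confirm it.
    custom_plugin = bool(conf.get("spark.shuffle.sort.io.plugin.class"))

    return external_shuffle_service or shuffle_tracking or decommission or custom_plugin


example = {
    "spark.dynamicAllocation.enabled": "true",
    "spark.dynamicAllocation.shuffleTracking.enabled": "true",
}
print(dynamic_allocation_ready(example))
```

A check like this may be useful as a guard in a job-submission wrapper, so that a configuration missing every shuffle-preservation option is caught before the application starts.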

The purpose of the external shuffle service, shuffle tracking, or a ShuffleDriverComponents implementation that supports reliable storage is to allow executors to be removed without deleting the shuffle files they wrote (described in more detail below). While it is simple to enable shuffle tracking, the way to set up the external shuffle service varies across cluster managers:

In standalone mode, simply start your workers with spark.shuffle.service.enabled set to true.

In Mesos coarse-grained mode, run $SPARK_HOME/sbin/start-mesos-shuffle-service.sh on all worker nodes with spark.shuffle.service.enabled set to true. For instance, you may do so through Marathon.

In YARN mode, follow the instructions here.

All other relevant configurations are optional and under the spark.dynamicAllocation.* and spark.shuffle.service.* namespaces. For more detail, see the configurations page.

Resource Allocation Policy

At a high level, Spark should relinquish executors when they are no longer used and acquire executors when they are needed. Since there is no definitive way to predict whether an executor that is about to be removed will run a task in the near future, or whether a new executor that is about to be added will actually be idle, we need a set of heuristics to determine when to remove and request executors.

Request Policy

A Spark application with dynamic allocation enabled requests additional executors when it has pending tasks waiting to be scheduled. This condition necessarily implies that the existing set of executors is insufficient to run all submitted but unfinished tasks simultaneously.

Spark requests executors in rounds. The actual request is triggered when there have been pending tasks for spark.dynamicAllocation.schedulerBacklogTimeout seconds, and then triggered again every spark.dynamicAllocation.sustainedSchedulerBacklogTimeout seconds thereafter if the queue of pending tasks persists. Additionally, the number of executors requested in each round increases exponentially from the previous round. For instance, an application will add 1 executor in the first round, and then 2, 4, 8 and so on executors in the subsequent rounds.
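To make the round timing concrete, here is a minimal sketch in plain Python that tabulates when each request round fires and how many executors it asks for. The function name request_schedule is hypothetical, the timeout values passed in are example inputs only, and the sketch assumes the backlog persists through every round and ignores any upper bound on the number of executors.

```python
def request_schedule(backlog_timeout, sustained_timeout, rounds):
    """Return a list of (seconds_since_backlog_began, executors_requested).

    backlog_timeout   -> spark.dynamicAllocation.schedulerBacklogTimeout
    sustained_timeout -> spark.dynamicAllocation.sustainedSchedulerBacklogTimeout
    Assumes pending tasks remain queued for all rounds (sketch only).
    """
    schedule = []
    elapsed = backlog_timeout  # the first round fires after the initial timeout
    to_add = 1                 # the first round requests a single executor
    for _ in range(rounds):
        schedule.append((elapsed, to_add))
        elapsed += sustained_timeout  # later rounds fire at the sustained interval
        to_add *= 2                   # each round doubles the previous request
    return schedule


for when, count in request_schedule(backlog_timeout=1, sustained_timeout=1, rounds=4):
    print(f"t={when}s: request {count} executor(s)")
```

With these example inputs, four rounds request 1 + 2 + 4 + 8 = 15 executors within four seconds of the backlog first appearing, which illustrates how quickly the doubling ramps up once the cautious first rounds have passed.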

The motivation for an exponential increase policy is twofold. First, an application should request executors cautiously in the beginning in case it turns out that only a few additional executors are sufficient. This echoes the justification for TCP slow start. Second, the application should be able to ramp up its resource usage in a timely manner in case it turns out that many executors are actually needed.

Remove Policy

The policy for removing executors is much simpler. A Spark application removes an executor when it has been idle for more than spark.dynamicAllocation.executorIdleTimeout seconds. Note that, under most circumstances, this condition is mutually exclusive with the request condition, in that an executor should not be idle if there are still pending tasks to be scheduled.

Graceful Decommission of Executors

Before dynamic allocation, if a Spark executor exits when the associated application has also exited, then all state associated with the executor is no longer needed and can be safely discarded. With dynamic allocation, however, the application is still running when an executor is explicitly removed. If the application attempts to access state stored in or written by the executor, it will have to recompute that state. Thus, Spark needs a mechanism to decommission an executor gracefully by preserving its state before removing it.

This requirement is especially important for shuffles. During a shuffle, the Spark executor first writes its own map outputs locally to disk, and then acts as the server for those files when other executors attempt to fetch them. In the event of stragglers, which are tasks that run for much longer than their peers, dynamic allocation may remove an executor before the shuffle completes, in which case the shuffle files written by that executor must be recomputed unnecessarily.

The solution for preserving shuffle files is to use an external shuffle service, also introduced in Spark 1.2. This service refers to a long-running process that runs on each node of your cluster independently of your Spark applications and their executors. If the service is enabled, Spark executors will fetch shuffle files from the service instead of from each other. This means any shuffle state written by an executor may continue to be served beyond the executor’s lifetime.

In addition to writing shuffle files, executors also cache data either on disk or in memory. When an executor is removed, however, all cached data will no longer be accessible. To mitigate this, by default executors containing cached data are never removed. You can configure this behavior with spark.dynamicAllocation.cachedExecutorIdleTimeout. When spark.shuffle.service.fetch.rdd.enabled is set to true, Spark can use the ExternalShuffleService to fetch disk-persisted RDD blocks. With dynamic allocation, if this feature is enabled, executors that hold only disk-persisted blocks are considered idle after spark.dynamicAllocation.executorIdleTimeout and are released accordingly. In future releases, the cached data may be preserved through an off-heap storage similar in spirit to how shuffle files are preserved through the external shuffle service.
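The interaction between the two idle timeouts and cached data can be summarized as a small decision function. This is a sketch in plain Python under stated assumptions, not Spark's implementation: the function name and its parameters are hypothetical, and the default values (60 seconds, and infinity to model "never removed") are illustrative choices for this example.

```python
import math


def idle_timeout_seconds(has_memory_cached, has_disk_cached, rdd_fetch_enabled,
                         executor_idle_timeout=60.0,
                         cached_idle_timeout=math.inf):
    """Pick which idle timeout applies to an executor (sketch only).

    executor_idle_timeout -> spark.dynamicAllocation.executorIdleTimeout
    cached_idle_timeout   -> spark.dynamicAllocation.cachedExecutorIdleTimeout
    rdd_fetch_enabled     -> spark.shuffle.service.fetch.rdd.enabled
    The default numbers are assumptions chosen for illustration.
    """
    # Blocks cached in memory are lost with the executor, so the
    # cached-executor timeout always applies.
    if has_memory_cached:
        return cached_idle_timeout
    # Disk-persisted blocks remain reachable only if the external
    # shuffle service is allowed to serve them.
    if has_disk_cached and not rdd_fetch_enabled:
        return cached_idle_timeout
    # No cached data, or only disk-persisted blocks that the service can serve.
    return executor_idle_timeout


print(idle_timeout_seconds(False, True, rdd_fetch_enabled=True))
print(idle_timeout_seconds(True, False, rdd_fetch_enabled=True))
```

An infinite timeout here stands in for the statement that executors holding cached data are never removed by default; setting a finite cached_idle_timeout corresponds to configuring spark.dynamicAllocation.cachedExecutorIdleTimeout.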

Scheduling Within an Application

Inside a given Spark application (SparkContext instance), multiple parallel jobs can run simultaneously if they were submitted from separate threads. By “job”, in this section, we mean a Spark action (e.g. save, collect) and any tasks that need to run to evaluate that action. Spark’s scheduler is fully thread-safe and supports this use case to enable applications that serve multiple requests (e.g. queries for multiple users).

By default, Spark’s scheduler runs jobs in FIFO fashion. Each job is divided into “stages” (e.g. map and reduce phases), and the first job gets priority on all available resources while its stages have tasks to launch, then the second job gets priority, etc. If the jobs at the head of the queue don’t need to use the whole cluster, later jobs can start to run right away, but if the jobs at the head of the queue are large, then later jobs may be delayed significantly.

Starting in Spark 0.8, it is also possible to configure fair sharing between jobs. Under fair sharing, Spark assigns tasks between jobs in a “round robin” fashion, so that all jobs get a roughly equal share of cluster resources. This means that short jobs submitted while a long job is running can start receiving resources right away and still get good response times, without waiting for the long job to finish. This mode is best for multi-user settings.
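The difference in response time between the two modes can be illustrated with a toy simulation. This is a deliberately simplified model in plain Python, not Spark's scheduler: the function completion_steps is hypothetical, every task is assumed to take exactly one time step, all jobs are assumed to be submitted at the same moment, and slots (assumed to be at least 1) stands in for the number of task slots in the cluster.

```python
def completion_steps(jobs, slots, mode):
    """Simulate task slots being handed to jobs, one time step at a time.

    jobs  : list of (name, task_count) in submission order
    slots : tasks that can run per step (must be >= 1)
    mode  : "FIFO" or "FAIR"
    Returns {job name: step at which its last task finished}.
    """
    remaining = {name: count for name, count in jobs}
    order = [name for name, _ in jobs]
    # Jobs without tasks are treated as finished immediately.
    finished = {name: 0 for name, count in jobs if count == 0}
    step = 0
    while len(finished) < len(order):
        step += 1
        free = slots
        active = [n for n in order if remaining[n] > 0]
        if mode == "FIFO":
            # Earlier jobs take every slot they can use before later jobs.
            for name in active:
                run = min(free, remaining[name])
                remaining[name] -= run
                free -= run
        else:
            # FAIR: hand out slots one at a time, round robin across jobs.
            while free > 0 and any(remaining[n] > 0 for n in active):
                for name in active:
                    if free == 0:
                        break
                    if remaining[name] > 0:
                        remaining[name] -= 1
                        free -= 1
        for name in active:
            if remaining[name] == 0:
                finished[name] = step
    return finished


jobs = [("long", 100), ("short", 4)]
print("FIFO:", completion_steps(jobs, slots=4, mode="FIFO"))
print("FAIR:", completion_steps(jobs, slots=4, mode="FAIR"))
```

In this toy setup the short job finishes at step 26 under FIFO but at step 2 under fair sharing, while the long job is delayed by only one step (25 versus 26).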

This feature is disabled by default and available on all coarse-grained cluster managers, i.e. standalone mode, YARN mode, K8s mode and Mesos coarse-grained mode. To enable the fair scheduler, simply set the spark.scheduler.mode property to FAIR when configuring a SparkContext:

val conf = new SparkConf().setMaster(...).setAppName(...)
conf.set("spark.scheduler.mode", "FAIR")
val sc = new SparkContext(conf)

Fair Scheduler Pools

The fair scheduler also supports grouping jobs into pools, and setting different scheduling options (e.g. weight) for each pool. This can be useful to create a “high-priority” pool for more important jobs, for example, or to group the jobs of each user together and give users equal shares regardless of how many concurrent jobs they have instead of giving jobs equal shares. This approach is modeled after the Hadoop Fair Scheduler.

Without any intervention, newly submitted jobs go into a default pool, but jobs’ pools can be set by adding the spark.scheduler.pool “local property” to the SparkContext in the thread that’s submitting them. This is done as follows:

// Assuming sc is your SparkContext variable
sc.setLocalProperty("spark.scheduler.pool", "pool1")

After setting this local property, all jobs submitted within this thread (by calls in this thread to RDD.save, count, collect, etc) will use this pool name. The setting is per-thread to make it easy to have a thread run multiple jobs on behalf of the same user. If you’d like to clear the pool that a thread is associated with, simply call:

sc.setLocalProperty("spark.scheduler.pool", null)

Default Behavior of Pools

By default, each pool gets an equal share of the cluster (also equal in share to each job in the default pool), but inside each pool, jobs run in FIFO order. For example, if you create one pool per user, this means that each user will get an equal share of the cluster, and that each user’s queries will run in order instead of later queries taking resources from that user’s earlier ones.

Configuring Pool Properties

Specific pools’ properties can also be modified through a configuration file. Each pool supports three properties:

  • schedulingMode: This can be FIFO or FAIR, to control whether jobs within the pool queue up behind each other (the default) or share the pool’s resources fairly.
  • weight: This controls the pool’s share of the cluster relative to other pools. By default, all pools have a weight of 1. If you give a specific pool a weight of 2, for example, it will get twice the resources of other active pools. Setting a high weight such as 1000 also makes it possible to implement priority between pools; in essence, the weight-1000 pool will always get to launch tasks first whenever it has jobs active.
  • minShare: Apart from an overall weight, each pool can be given a minimum share (as a number of CPU cores) that the administrator would like it to have. The fair scheduler always attempts to meet all active pools’ minimum shares before redistributing extra resources according to the weights. The minShare property can, therefore, be another way to ensure that a pool can always get up to a certain number of resources (e.g. 10 cores) quickly without giving it a high priority for the rest of the cluster. By default, each pool’s minShare is 0.
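How weight and minShare interact can be sketched with a toy allocator. The code below is plain Python and only loosely modeled on the behavior described above: the Pool class, sort_key and allocate are hypothetical names, every pool is assumed to always have more pending tasks than it can run, and weights are assumed to be positive. Pools below their minimum share are served first; after that, the pool with the fewest running tasks per unit of weight gets the next slot.

```python
from dataclasses import dataclass


@dataclass
class Pool:
    name: str
    weight: int = 1     # relative share of the cluster (must be > 0)
    min_share: int = 0  # slots the pool should receive before weights apply
    running: int = 0    # slots currently held by the pool


def sort_key(pool):
    """Smaller key means the pool is offered the next free slot."""
    if pool.running < pool.min_share:
        # Pools below their minimum share come first, ordered by how far
        # along they are toward meeting it.
        return (0, pool.running / max(pool.min_share, 1), pool.name)
    # Otherwise order by running tasks per unit of weight.
    return (1, pool.running / pool.weight, pool.name)


def allocate(pools, slots):
    """Hand out slots one at a time; assumes every pool always has work."""
    for _ in range(slots):
        next_pool = min(pools, key=sort_key)
        next_pool.running += 1
    return {p.name: p.running for p in pools}


pools = [Pool("production", weight=1, min_share=2),
         Pool("test", weight=2, min_share=3)]
print(allocate(pools, slots=12))
```

With 12 slots, the two minimum shares (2 and 3) are satisfied first and the remaining slots are then split so that the totals end up in the 1:2 ratio given by the weights, that is, 4 slots for production and 8 for test.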

The pool properties can be set by creating an XML file, similar to conf/fairscheduler.xml.template, and either putting a file named fairscheduler.xml on the classpath, or setting the spark.scheduler.allocation.file property in your SparkConf. The file path respects the Hadoop configuration and can either be a local file path or an HDFS file path.

// scheduler file at local
conf.set("spark.scheduler.allocation.file", "file:///path/to/file")
// scheduler file at hdfs
conf.set("spark.scheduler.allocation.file", "hdfs:///path/to/file")

The format of the XML file is simply a <pool> element for each pool, with different elements within it for the various settings. For example:

<?xml version="1.0"?>
<allocations>
  <pool name="production">
    <schedulingMode>FAIR</schedulingMode>
    <weight>1</weight>
    <minShare>2</minShare>
  </pool>
  <pool name="test">
    <schedulingMode>FIFO</schedulingMode>
    <weight>2</weight>
    <minShare>3</minShare>
  </pool>
</allocations>

A full example is also available in conf/fairscheduler.xml.template. Note that any pools not configured in the XML file will simply get default values for all settings (scheduling mode FIFO, weight 1, and minShare 0).
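To check an allocation file before handing it to Spark, it can be read with Python's standard XML parser. The following is a sketch only: load_pools is a hypothetical helper, the pool name "adhoc" is made up for the example, and applying the default values to settings that are omitted inside a configured pool is an assumption of this sketch rather than something stated above.

```python
import xml.etree.ElementTree as ET

# Defaults as stated in the text: scheduling mode FIFO, weight 1, minShare 0.
DEFAULTS = {"schedulingMode": "FIFO", "weight": 1, "minShare": 0}

SAMPLE = """<?xml version="1.0"?>
<allocations>
  <pool name="production">
    <schedulingMode>FAIR</schedulingMode>
    <weight>1</weight>
    <minShare>2</minShare>
  </pool>
  <pool name="adhoc">
    <weight>3</weight>
  </pool>
</allocations>
"""


def load_pools(xml_text):
    """Parse an allocation file into {pool name: settings dict}."""
    pools = {}
    for pool in ET.fromstring(xml_text).findall("pool"):
        settings = dict(DEFAULTS)
        mode = pool.findtext("schedulingMode")
        if mode:
            settings["schedulingMode"] = mode.strip().upper()
        for key in ("weight", "minShare"):
            value = pool.findtext(key)
            if value:
                settings[key] = int(value.strip())
        pools[pool.get("name")] = settings
    return pools


for name, settings in load_pools(SAMPLE).items():
    print(name, settings)
```

Here the hypothetical "adhoc" pool sets only a weight, so the sketch fills in FIFO and a minShare of 0 for it.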

Scheduling using JDBC Connections

To set a Fair Scheduler pool for a JDBC client session, users can set the spark.sql.thriftserver.scheduler.pool variable:

SET spark.sql.thriftserver.scheduler.pool=accounting;

Concurrent Jobs in PySpark

By default, PySpark does not synchronize PVM threads with JVM threads, so launching multiple jobs in multiple PVM threads does not guarantee that each job runs in its own corresponding JVM thread. Because of this limitation, you cannot set a different job group via sc.setJobGroup in a separate PVM thread, which in turn means the job cannot be cancelled later via sc.cancelJobGroup.

Using pyspark.InheritableThread is recommended so that a PVM thread inherits the inheritable attributes, such as local properties, of a JVM thread.
