Spark¶
Description¶
Apache Spark is a fast and general-purpose cluster computing system. It provides high-level APIs in Java, Scala, Python and R, and an optimized engine that supports general execution graphs. It also supports a rich set of higher-level tools including Spark SQL for SQL and structured data processing, MLlib for machine learning, GraphX for graph processing, and Spark Streaming.
Environment Modules¶
Run module spider spark
to find out what environment modules are available for this application.
Environment Variables¶
- HPC_SPARK_DIR - installation directory
- HPC_SPARK_BIN - executable directory
- HPC_SPARK_SLURM - SLURM job script examples
- SPARK_HOME - Spark installation root (contains the sbin scripts and the examples directory used below)
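After running module load spark, the variables above can also be inspected from Python. The following is a minimal sketch: the helper name spark_env_report is hypothetical, and it only looks up the four variable names listed above without assuming which values the module assigns.

```python
import os

# Variable names come from the list above; their values depend on the loaded module.
SPARK_VARS = ("HPC_SPARK_DIR", "HPC_SPARK_BIN", "HPC_SPARK_SLURM", "SPARK_HOME")


def spark_env_report(environ=None):
    """Map each Spark-related variable name to its value, or None if it is unset."""
    env = os.environ if environ is None else environ
    return {name: env.get(name) for name in SPARK_VARS}


if __name__ == "__main__":
    for name, value in spark_env_report().items():
        shown = value if value is not None else "(not set; try: module load spark)"
        print(f"{name} = {shown}")
```

A variable reported as not set usually means the spark module has not been loaded in the current shell.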
Additional Usage Information¶
To run your Spark jobs on HiPerGator, two separate steps are required:
- Create a Spark cluster on HiPerGator via SLURM. The "Spark Cluster on HiPerGator" section below shows a simple example of how to create a Spark cluster.
- Submit your job to your Spark cluster. You can do this either interactively at the command line ("Spark Interactive Job" section below) or by submitting a batch job ("Spark Batch Job" section below).
For Spark parameters used in this section, please refer to Spark's Homepage.
Spark Cluster on HiPerGator:¶
The following instructions create a Spark cluster on HiPerGator.
This section assumes that spark-local-cluster.sh is the SLURM job script for a Spark cluster with one worker node. The script first sets the SLURM parameters for the cluster, then the Spark parameters, and finally starts the master and the worker. The complete spark-local-cluster.sh is also listed in the "Job Script Examples" section below.
#!/bin/bash
#filename: spark-local-cluster.sh
#SBATCH --job-name=spark_cluster
#SBATCH --nodes=1 # nodes allocated to the job
#SBATCH --cpus-per-task=16 # the number of CPUs allocated per task
#SBATCH --exclusive # do not share the allocated nodes with other running jobs
#SBATCH --time=03:00:00
#SBATCH --output=spark_cluster.log
#SBATCH --error=spark_cluster.err
module load spark
## Set Spark parameters for Spark cluster
export SPARK_LOCAL_DIRS=$HOME/spark/tmp
export SPARK_WORKER_DIR=$SPARK_LOCAL_DIRS
export SPARK_WORKER_CORES=$SLURM_CPUS_PER_TASK
export SPARK_MASTER_PORT=7077
export SPARK_MASTER_WEBUI_PORT=8080
export SPARK_NO_DAEMONIZE=true
export SPARK_LOG_DIR=$SPARK_LOCAL_DIRS
mkdir -p $SPARK_LOCAL_DIRS
##Set Spark Master and Workers
MASTER_HOST=$(scontrol show hostname $SLURM_NODELIST | head -n 1)
export SPARK_MASTER_NODE=$(host $MASTER_HOST | head -1 | cut -d ' ' -f 4)
export MAX_SLAVES=$(expr $SLURM_JOB_NUM_NODES - 1)
## for starting spark master
$SPARK_HOME/sbin/start-master.sh &
## use spark defaults for worker resources (all mem -1 GB, all cores) since using exclusive
## for starting spark worker
$SPARK_HOME/sbin/start-slave.sh spark://$SPARK_MASTER_NODE:$SPARK_MASTER_PORT
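Submit the SLURM job script to HiPerGator:
sbatch spark-local-cluster.sh
Once the job is running, search spark_cluster.err to confirm that the Spark master and worker have started:
grep "Starting Spark master" spark_cluster.err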
18/03/13 14:53:23 INFO Master: Starting Spark master at spark://c29a-s42.ufhpc:7077
grep "Starting Spark worker" spark_cluster.err
18/03/13 14:53:24 INFO Worker: Starting Spark worker 172.16.194.59:42418 with 16 cores, 124.3 GB RAM
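The same log lines can be parsed programmatically, for example to pick up the master URL in a script. The sketch below is illustrative only: the function names parse_master_url and parse_worker are hypothetical, and the regular expressions assume the log format shown in the two sample lines above, which may differ between Spark versions.

```python
import re

# Patterns modelled on the sample log lines above (an assumption about the format).
MASTER_RE = re.compile(r"Starting Spark master at (spark://\S+)")
WORKER_RE = re.compile(r"Starting Spark worker (\S+) with (\d+) cores, ([\d.]+) (\w+) RAM")


def parse_master_url(log_text):
    """Return the first spark:// master URL found in the log text, or None."""
    match = MASTER_RE.search(log_text)
    return match.group(1) if match else None


def parse_worker(log_text):
    """Return address, core count, and memory of the first worker line, or None."""
    match = WORKER_RE.search(log_text)
    if match is None:
        return None
    address, cores, memory, unit = match.groups()
    return {"address": address, "cores": int(cores), "memory": float(memory), "unit": unit}


sample_log = (
    "18/03/13 14:53:23 INFO Master: Starting Spark master at spark://c29a-s42.ufhpc:7077\n"
    "18/03/13 14:53:24 INFO Worker: Starting Spark worker 172.16.194.59:42418 "
    "with 16 cores, 124.3 GB RAM\n"
)
print(parse_master_url(sample_log))
print(parse_worker(sample_log))
```

In practice the text would be read from spark_cluster.err rather than from a sample string.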
Spark Interactive Job:¶
The following instructions use the Spark interactive shells on HiPerGator.
Spark supports interactive job submission through the interactive shells.
Spark interactive shell in Scala (spark-shell)
First, load the spark module in the terminal where you want to submit a Spark job.
module load spark
Get the location of the Spark master to connect to it through the interactive shell
SPARK_MASTER=$(grep "Starting Spark master" *.err | cut -d " " -f 9)
Connect to the master using the Spark interactive shell in Scala
spark-shell --master $SPARK_MASTER
Spark interactive shell in Python (pyspark)
Load the spark module in the terminal where you want to submit a Spark job.
module load spark
Get the location of the Spark master to connect to it through the interactive shell
SPARK_MASTER=$(grep "Starting Spark master" *.err | cut -d " " -f 9)
Connect to the master using the Spark interactive shell in Python
pyspark --master $SPARK_MASTER
Example - Pi estimation via pyspark
SPARK_MASTER=$(grep "Starting Spark master" *.err | cut -d " " -f 9)
pyspark --master $SPARK_MASTER
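The commands above only start the shell. Once the pyspark prompt appears, a Pi estimate can be computed along the lines of the sketch below, which mirrors the pi_with_pythonstartup.py script from this page. The function name estimate_pi and the plain-Python fallback used when no SparkContext is passed are illustrative additions, not part of the original script. The estimate works because a random point in the square [-1, 1] x [-1, 1] lands inside the unit circle with probability pi/4.

```python
from operator import add
from random import random


def inside(_):
    """Return 1 if a random point in [-1, 1] x [-1, 1] falls inside the unit circle."""
    x = random() * 2 - 1
    y = random() * 2 - 1
    return 1 if x ** 2 + y ** 2 <= 1 else 0


def estimate_pi(n, partitions=10, sc=None):
    """Monte Carlo estimate of Pi from n samples.

    With a SparkContext (the `sc` that pyspark predefines) the samples are
    distributed over `partitions`; without one, the loop runs locally.
    """
    if sc is not None:
        count = sc.parallelize(range(1, n + 1), partitions).map(inside).reduce(add)
    else:
        count = sum(inside(i) for i in range(1, n + 1))
    return 4.0 * count / n


if __name__ == "__main__":
    # Local run without Spark; in the pyspark shell, call estimate_pi(1000000, sc=sc).
    print("Pi is roughly %f" % estimate_pi(100000))
```

Because the method is random, the printed value varies from run to run and only approaches Pi as n grows.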
Example - Pi estimation from file with pyspark
As of Spark 2.0, the pyspark interactive shell no longer accepts a Python file to run as an application. Instead, the Python environment variable PYTHONSTARTUP can point to a Python script, which is executed before the interactive shell starts.
SPARK_MASTER=$(grep "Starting Spark master" *.err | cut -d " " -f 9)
PYTHONSTARTUP=pi_with_pythonstartup.py pyspark --master $SPARK_MASTER
The pi_with_pythonstartup.py script is available in the "Job Script Examples" section below.
Spark Batch Job:¶
The following instructions start preset applications without a job script.
Spark supports batch job submission through spark-submit, which provides a unified interface for Spark jobs:
$SPARK_HOME/bin/spark-submit \
--class <main-class> --master <master-url> \
--deploy-mode <deploy-mode> --conf <key>=<value> \
... # other options
<application-jar> [application-arguments]
--class: The entry point for your application (e.g. org.apache.spark.examples.SparkPi)
--master: The master URL for the cluster (e.g. spark://123.45.67.890:7077)
--deploy-mode: Whether to deploy your driver on the worker nodes (cluster) or locally as an external client (client) (default: client)
--conf: An arbitrary Spark configuration property in key=value format. For values that contain spaces, wrap "key=value" in quotes.
application-jar: Path to a bundled jar including your application and all dependencies. The URL must be globally visible inside your cluster, for instance, an hdfs:// path or a file:// path that is present on all nodes.
application-arguments: Arguments passed to the main method of your main class, if any.
For further details about spark-submit, refer to https://spark.apache.org/docs/2.2.0/submitting-applications.html.
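When spark-submit is driven from a Python script, the options described above can be assembled into an argument list before the command is run. The sketch below is one possible way to do that: the helper name build_spark_submit is hypothetical, the master URL is the one from the sample log earlier on this page, and the spark.executor.memory value of 4g is only an illustrative setting.

```python
import shlex


def build_spark_submit(application, app_args=(), master=None, main_class=None,
                       deploy_mode=None, conf=None, spark_submit="spark-submit"):
    """Assemble a spark-submit argument list; options left as None are omitted."""
    cmd = [spark_submit]
    if main_class:
        cmd += ["--class", main_class]
    if master:
        cmd += ["--master", master]
    if deploy_mode:
        cmd += ["--deploy-mode", deploy_mode]
    # Each configuration property is passed as its own --conf key=value pair.
    for key, value in (conf or {}).items():
        cmd += ["--conf", f"{key}={value}"]
    cmd.append(application)
    cmd += [str(arg) for arg in app_args]
    return cmd


cmd = build_spark_submit(
    "examples/src/main/python/pi.py",
    [10],
    master="spark://c29a-s42.ufhpc:7077",   # sample master URL from the log above
    conf={"spark.executor.memory": "4g"},   # illustrative value, adjust as needed
)
# shlex.join (Python 3.8+) renders the list as a shell-safe command line for review.
print(shlex.join(cmd))
```

Keeping the command as a list means it can be handed to subprocess.run(cmd, check=True) without extra shell quoting.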
Example - Pi estimation via spark-submit
SPARK_MASTER=$(grep "Starting Spark master" *.err | cut -d " " -f 9)
spark-submit --master $SPARK_MASTER $SPARK_HOME/examples/src/main/python/pi.py 10
Job Script Examples¶
Example Script for Launching a Local Spark Cluster:¶
The complete spark-local-cluster.sh script:
#!/bin/bash
#filename: spark-local-cluster.sh
#SBATCH --job-name=spark_cluster
#SBATCH --nodes=1 # nodes allocated to the job
#SBATCH --cpus-per-task=16 # the number of CPUs allocated per task
#SBATCH --exclusive # do not share the allocated nodes with other running jobs
#SBATCH --time=03:00:00
#SBATCH --output=spark_cluster.log
#SBATCH --error=spark_cluster.err
###SBATCH --ntasks= # tasks to be created for the job
###SBATCH --ntasks-per-core= # max number of tasks per allocated core
###SBATCH --ntasks-per-node= # max number of tasks per allocated node
###SBATCH --mail-type=END,FAIL
###SBATCH --mail-user=<yourID>@ufl.edu
module load spark
### Set Spark variables
export SPARK_LOCAL_DIRS=$HOME/spark/tmp
export SPARK_WORKER_DIR=$SPARK_LOCAL_DIRS
export SPARK_WORKER_CORES=$SLURM_CPUS_PER_TASK
export SPARK_MASTER_PORT=7077
export SPARK_MASTER_WEBUI_PORT=8080
export SPARK_NO_DAEMONIZE=true
export SPARK_LOG_DIR=$SPARK_LOCAL_DIRS
#export SPARK_CONF_DIR=$SPARK_LOCAL_DIRS
mkdir -p $SPARK_LOCAL_DIRS
MASTER_HOST=$(scontrol show hostname $SLURM_NODELIST | head -n 1)
export SPARK_MASTER_NODE=$(host $MASTER_HOST | head -1 | cut -d ' ' -f 4)
export MAX_SLAVES=$(expr $SLURM_JOB_NUM_NODES - 1)
# start master
$SPARK_HOME/sbin/start-master.sh &
# start workers
# use spark defaults for worker resources (all mem -1 GB, all cores) since using exclusive
$SPARK_HOME/sbin/start-slave.sh spark://$SPARK_MASTER_NODE:$SPARK_MASTER_PORT
Example Script for Pi Estimation Using PySpark:¶
The pi_with_pythonstartup.py script:
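# Run through PYTHONSTARTUP with pyspark, which defines the SparkContext sc.
from operator import add
from random import random

partitions = 10
n = 100000 * partitions

def f(_):
    # Draw a random point in the square [-1, 1] x [-1, 1].
    x = random() * 2 - 1
    y = random() * 2 - 1
    # Count the point if it falls inside the unit circle.
    return 1 if x ** 2 + y ** 2 <= 1 else 0

count = sc.parallelize(range(1, n + 1), partitions).map(f).reduce(add)
print("Pi is roughly %f" % (4.0 * count / n))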
Categories¶
utility