Spark

Description

spark website

Apache Spark is a fast and general-purpose cluster computing system. It provides high-level APIs in Java, Scala, Python and R, and an optimized engine that supports general execution graphs. It also supports a rich set of higher-level tools including Spark SQL for SQL and structured data processing, MLlib for machine learning, GraphX for graph processing, and Spark Streaming.

  • SPARK_HOME - examples directory

Environment Modules

Run module spider spark to find out what environment modules are available for this application.

Environment Variables

  • HPC_SPARK_DIR - installation directory
  • HPC_SPARK_BIN - executable directory
  • HPC_SPARK_SLURM - SLURM job script examples

Additional Usage Information

To run your Spark jobs on HiPerGator, two separate steps are required:

  1. Create a Spark cluster on HiPerGator via SLURM. The "Spark Cluster on HiPerGator" section below shows a simple example of how to create one.
  2. Submit your job to your Spark cluster. You can do this either interactively at the command line ("Spark Interactive Job" section below) or by submitting a batch job ("Spark Batch Job" section below).

For Spark parameters used in this section, please refer to Spark's Homepage.

Spark Cluster on HiPerGator:

This section assumes that spark-local-cluster.sh is the file name of the SLURM job script for a Spark cluster with one worker node. Set the SLURM parameters for the Spark cluster at the top of the script. The full spark-local-cluster.sh is also available in the "Job Script Examples" section below.

#!/bin/bash
#filename: spark-local-cluster.sh
#SBATCH --job-name=spark_cluster
#SBATCH --nodes=1 # nodes allocated to the job
#SBATCH --cpus-per-task=16 # the number of CPUs allocated per task
#SBATCH --exclusive # not sharing of allocated nodes with other running jobs
#SBATCH --time=03:00:00
#SBATCH --output=spark_cluster.log
#SBATCH --error=spark_cluster.err

module load spark

## Set Spark parameters for Spark cluster
export SPARK_LOCAL_DIRS=$HOME/spark/tmp
export SPARK_WORKER_DIR=$SPARK_LOCAL_DIRS
export SPARK_WORKER_CORES=$SLURM_CPUS_PER_TASK
export SPARK_MASTER_PORT=7077
export SPARK_MASTER_WEBUI_PORT=8080
export SPARK_NO_DAEMONIZE=true
export SPARK_LOG_DIR=$SPARK_LOCAL_DIRS

mkdir -p $SPARK_LOCAL_DIRS

##Set Spark Master and Workers
MASTER_HOST=$(scontrol show hostname $SLURM_NODELIST | head -n 1)
export SPARK_MASTER_NODE=$(host $MASTER_HOST | head -1 | cut -d ' ' -f 4)
export MAX_SLAVES=$(expr $SLURM_JOB_NUM_NODES - 1)

## for starting spark master
$SPARK_HOME/sbin/start-master.sh & 

## use spark defaults for worker resources (all mem -1 GB, all cores) since using exclusive
## for starting spark worker
$SPARK_HOME/sbin/start-slave.sh spark://$SPARK_MASTER_NODE:$SPARK_MASTER_PORT

Submit the SLURM job script to HiPerGator:

sbatch spark-local-cluster.sh

Check that the Spark master launched:

grep "Starting Spark master" spark_cluster.err

This grep command should print a line like:

18/03/13 14:53:23 INFO Master: Starting Spark master at spark://c29a-s42.ufhpc:7077

Check that the Spark worker launched:

grep "Starting Spark worker" spark_cluster.err

This grep command should print a line like:

18/03/13 14:53:24 INFO Worker: Starting Spark worker 172.16.194.59:42418 with 16 cores, 124.3 GB RAM
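
The worker check can also be scripted, for example to confirm that the worker registered with the number of cores requested through --cpus-per-task. The following is a minimal Python sketch, not part of the HiPerGator tooling. The function name parse_worker_line and the regular expression are illustrative assumptions modeled only on the sample worker message shown above; other Spark versions may word this message differently.

```python
import re

# Pattern modeled on the sample worker message shown above (an assumption;
# other Spark versions may format this line differently).
WORKER_RE = re.compile(
    r"Starting Spark worker (?P<address>\S+) with (?P<cores>\d+) cores, "
    r"(?P<memory>[\d.]+) (?P<unit>\w+) RAM"
)


def parse_worker_line(log_text):
    """Return details of the first worker startup message, or None if absent."""
    match = WORKER_RE.search(log_text)
    if match is None:
        return None
    return {
        "address": match.group("address"),
        "cores": int(match.group("cores")),
        "memory": float(match.group("memory")),
        "unit": match.group("unit"),
    }


# Sample line copied from the expected grep output above.
sample = ("18/03/13 14:53:24 INFO Worker: Starting Spark worker "
          "172.16.194.59:42418 with 16 cores, 124.3 GB RAM")
info = parse_worker_line(sample)
print(info)
```

In practice the text would come from reading spark_cluster.err. A result of None suggests the worker has not started yet or that the log format differs from the sample.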

Spark Interactive Job:

Spark supports interactive job submission through its interactive shells.

Spark interactive shell in Scala (spark-shell)
First, load the spark module in the terminal where you want to submit a Spark job.

module load spark

Get the location of the Spark master to connect to it through the interactive shell

SPARK_MASTER=$(grep "Starting Spark master" *.err | cut -d " " -f 9)

Connect to the master using the Spark interactive shell in Scala

spark-shell --master $SPARK_MASTER
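
The grep and cut pipeline above works because the master URL is the ninth space-separated field of the "Starting Spark master" log line. As a hedged illustration of that logic, here is a small Python sketch that mimics the pipeline on a string. The function name master_url_from_log is hypothetical, and the sketch assumes the log line has the same layout as the sample shown earlier on this page.

```python
def master_url_from_log(log_text):
    """Mimic: grep "Starting Spark master" | cut -d " " -f 9 (first match only)."""
    for line in log_text.splitlines():
        if "Starting Spark master" in line:
            fields = line.split(" ")  # split on single spaces, like cut -d " "
            if len(fields) >= 9:
                return fields[8]  # cut fields are 1-based, so field 9 is index 8
    return None


# Sample line copied from the expected grep output earlier on this page.
sample_log = ("18/03/13 14:53:23 INFO Master: Starting Spark master at "
              "spark://c29a-s42.ufhpc:7077")
print(master_url_from_log(sample_log))
```

Unlike the shell pipeline, which prints one URL per matching line across all *.err files, this sketch returns only the first match. That difference matters if several cluster logs sit in the same directory.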

Spark interactive shell in Python (pyspark)
Load the spark module in the terminal where you want to submit a Spark job.

module load spark

Get the location of the Spark master to connect to it through the interactive shell

SPARK_MASTER=$(grep "Starting Spark master" *.err | cut -d " " -f 9)

Connect to the master using the Spark interactive shell in Python

pyspark --master $SPARK_MASTER

Example - Pi estimation via pyspark

SPARK_MASTER=$(grep "Starting Spark master" *.err | cut -d " " -f 9)
pyspark --master $SPARK_MASTER

Pi with PySpark

Example - Pi estimation from file with pyspark
As of Spark 2.0, the Spark interactive shell in Python no longer accepts a Python file to run as an application. Instead, PYTHONSTARTUP, a Python environment variable, can point pyspark at a Python script, which is executed before the interactive shell starts.

SPARK_MASTER=$(grep "Starting Spark master" *.err | cut -d " " -f 9)
PYTHONSTARTUP=pi_with_pythonstartup.py pyspark --master $SPARK_MASTER

The pi_with_pythonstartup.py script is available in the "Job Script Examples" section below.

Spark Batch Job:

Spark supports batch job submission through spark-submit, which provides a unified interface for Spark jobs:

$SPARK_HOME/bin/spark-submit \
             --class <main-class> \
             --master <master-url> \
             --deploy-mode <deploy-mode> \
             --conf <key>=<value> \
             ... # other options
             <application-jar> \
             [application-arguments]

--class: The entry point for your application (e.g. org.apache.spark.examples.SparkPi)
--master: The master URL for the cluster (e.g. spark://123.45.67.890:7077)
--deploy-mode: Whether to deploy your driver on the worker nodes (cluster) or locally as an external client (client) (default: client)
--conf: Arbitrary Spark configuration property in key=value format. For values that contain spaces, wrap “key=value” in quotes.
application-jar: Path to a bundled jar including your application and all dependencies. The URL must be globally visible inside of your cluster, for instance, an hdfs:// path or a file:// path that is present on all nodes.
application-arguments: Arguments passed to the main method of your main class, if any

For further details about spark-submit, refer to https://spark.apache.org/docs/2.2.0/submitting-applications.html.
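
When spark-submit is driven from a script, it can help to assemble the argument list programmatically so that each option lands in the position described above. The following is a minimal Python sketch under stated assumptions: the helper build_spark_submit_args is hypothetical and not part of Spark, the application path is a placeholder, and spark.executor.memory is used only as an example configuration property. The sketch builds the command but does not run it.

```python
import shlex


def build_spark_submit_args(master, application, app_args=(), main_class=None,
                            deploy_mode=None, conf=None):
    """Assemble a spark-submit argument list: options first, then the
    application, then the application's own arguments."""
    args = ["spark-submit", "--master", master]
    if main_class:
        args += ["--class", main_class]
    if deploy_mode:
        args += ["--deploy-mode", deploy_mode]
    for key, value in (conf or {}).items():
        args += ["--conf", f"{key}={value}"]
    args.append(application)
    args += [str(a) for a in app_args]
    return args


cmd = build_spark_submit_args(
    master="spark://c29a-s42.ufhpc:7077",          # master URL from the sample log
    application="/path/to/pi.py",                  # placeholder application path
    app_args=[10],
    conf={"spark.executor.memory": "4g"},          # example property only
)
print(shlex.join(cmd))  # shell-quoted form, useful for logging the command
```

The resulting list could be passed to subprocess.run on a node where the spark module is loaded. Keeping it as a list rather than a single string avoids quoting problems with --conf values that contain spaces.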

Example - Pi estimation via spark-submit

SPARK_MASTER=$(grep "Starting Spark master" *.err | cut -d " " -f 9)
spark-submit --master $SPARK_MASTER $SPARK_HOME/examples/src/main/python/pi.py 10

PySpark Submit

Job Script Examples

Example Script for Launching a Local Spark Cluster:

#!/bin/bash
#filename: spark-local-cluster.sh

#SBATCH --job-name=spark_cluster
#SBATCH --nodes=1 # nodes allocated to the job
#SBATCH --cpus-per-task=16 # the number of CPUs allocated per task
#SBATCH --exclusive # not sharing of allocated nodes with other running jobs
#SBATCH --time=03:00:00
#SBATCH --output=spark_cluster.log
#SBATCH --error=spark_cluster.err

###SBATCH --ntasks= # tasks to be created for the job
###SBATCH --ntasks-per-core= # max number of tasks per allocated core
###SBATCH --ntasks-per-node= # max number of tasks per allocated node
###SBATCH --mail-type=END,FAIL
###SBATCH --mail-user=<yourID>@ufl.edu

module load spark
### Set Spark variables
export SPARK_LOCAL_DIRS=$HOME/spark/tmp
export SPARK_WORKER_DIR=$SPARK_LOCAL_DIRS
export SPARK_WORKER_CORES=$SLURM_CPUS_PER_TASK
export SPARK_MASTER_PORT=7077
export SPARK_MASTER_WEBUI_PORT=8080
export SPARK_NO_DAEMONIZE=true
export SPARK_LOG_DIR=$SPARK_LOCAL_DIRS
#export SPARK_CONF_DIR=$SPARK_LOCAL_DIRS
mkdir -p $SPARK_LOCAL_DIRS

MASTER_HOST=$(scontrol show hostname $SLURM_NODELIST | head -n 1)
export SPARK_MASTER_NODE=$(host $MASTER_HOST | head -1 | cut -d ' ' -f 4)
export MAX_SLAVES=$(expr $SLURM_JOB_NUM_NODES - 1)

# start master
$SPARK_HOME/sbin/start-master.sh &

# start workers
# use spark defaults for worker resources (all mem -1 GB, all cores) since using exclusive

$SPARK_HOME/sbin/start-slave.sh spark://$SPARK_MASTER_NODE:$SPARK_MASTER_PORT

Example Script for Pi Estimation Using PySpark:

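# filename: pi_with_pythonstartup.py
# Runs inside the pyspark shell, which defines the SparkContext `sc`
# before this startup script executes.
from operator import add
from random import random

partitions = 10
n = 100000 * partitions  # total number of random points to sample

def f(_):
    # Sample a point in the square [-1, 1] x [-1, 1];
    # return 1 if it falls inside the unit circle, else 0.
    x = random() * 2 - 1
    y = random() * 2 - 1
    return 1 if x ** 2 + y ** 2 <= 1 else 0

# Spread the samples over the partitions and count the points inside the circle.
count = sc.parallelize(range(1, n + 1), partitions).map(f).reduce(add)
print("Pi is roughly %f" % (4.0 * count / n))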
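
The estimate works because the fraction of uniformly sampled points in the square [-1, 1] x [-1, 1] that fall inside the unit circle approaches the area ratio π/4, so four times that fraction approaches π. To check this sampling logic without a Spark cluster, the following sketch replaces sc.parallelize(...).map(f).reduce(add) with Python's built-in map and functools.reduce. The function name estimate_pi and the fixed seed are assumptions added for illustration. This is a single-process stand-in, not a Spark program.

```python
import math
from functools import reduce
from operator import add
from random import Random


def estimate_pi(n, seed=0):
    """Monte Carlo estimate of pi from n points, run in a single process."""
    rng = Random(seed)  # seeded generator so repeated runs give the same result

    def f(_):
        # Same test as in the pyspark script: is the point inside the unit circle?
        x = rng.random() * 2 - 1
        y = rng.random() * 2 - 1
        return 1 if x ** 2 + y ** 2 <= 1 else 0

    # Local equivalent of sc.parallelize(...).map(f).reduce(add)
    count = reduce(add, map(f, range(1, n + 1)))
    return 4.0 * count / n


estimate = estimate_pi(100000)
print("Pi is roughly %f" % estimate)
print("Absolute error: %f" % abs(estimate - math.pi))
```

The error shrinks roughly with the square root of the number of samples. This is why the Spark version spreads a large sample count over several partitions.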
