Run and Scale an Apache Spark Application on Kubernetes
Learn how to set up Apache Spark on IBM Cloud Kubernetes Service by pushing the Spark container images to IBM Cloud Container Registry.
Let's begin by looking at the technologies involved.
What is Apache Spark?
Apache Spark (Spark) is an open source data-processing engine for large data sets. It is designed to deliver the computational speed, scalability and programmability required for Big Data - specifically for streaming data, graph data, machine learning and artificial intelligence (AI) applications.
Spark's analytics engine processes data 10 to 100 times faster than alternatives. It scales by distributing processing work across large clusters of computers, with built-in parallelism and fault tolerance. It even includes APIs for programming languages that are popular among data analysts and data scientists, including Scala, Java, Python and R.
Quick intro to Kubernetes and the IBM Cloud Kubernetes Service
Kubernetes is an open source platform for managing containerized workloads and services across multiple hosts. It offers management tools for deploying, automating, monitoring and scaling containerized apps with minimal-to-no manual intervention.
IBM Cloud Kubernetes Service is a managed offering to create your own Kubernetes cluster of compute hosts to deploy and manage containerized apps on IBM Cloud. As a certified Kubernetes provider, IBM Cloud Kubernetes Service provides intelligent scheduling, self-healing, horizontal scaling, service discovery and load balancing, automated rollouts and rollbacks, and secret and configuration management for your apps.
How Apache Spark works on Kubernetes
To understand how Spark works on Kubernetes, refer to the Spark documentation. The following occurs when you run your Python application on Spark:
- Apache Spark creates a driver pod with the requested CPU and Memory.
- The driver then creates executor pods that connect to the driver and execute application code.
- While the application is running, the executor pods are terminated and new pods are created based on the load. Once the application completes, all the executor pods are terminated and the logs are persisted in the driver pod that remains in the completed state:
Prerequisites
- A runnable distribution of Spark 2.3 or above. Recommended: Spark 3.1.1 (Download)
- A running Kubernetes cluster with access configured to it using kubectl. Check the IBM Cloud Kubernetes Service documentation to create a cluster. For autoscaling, set the Worker nodes per zone to one.
- Install and setup an IBM Cloud Container Registry CLI and namespace
- IBM Cloud Kubernetes CLI
- Docker engine
- Helm
In short, you need three things to complete this journey:
- An standard IBM Cloud Kubernetes Service cluster
- An unzipped Spark distribution
- An IBM Cloud Container Registry with a namespace setup
Configure the IBM Cloud Kubernetes Service cluster
In this section, you will access the IBM Cloud Kubernetes Service cluster and will create a custom serviceaccount and a clusterrolebinding.
- To access your standard IBM Cloud Kubernetes Service cluster, refer to the Access section of your cluster. Following the steps, you should be able to download and add the
kubeconfig
configuration file for your cluster to your existingkubeconfig
in~/.kube/config
or the last file in theKUBECONFIG
environment variable. - Run the below command to create the serviceaccount. To understand why we require RBAC, refer to the RBAC on Spark Documentation:
Shell
xxxxxxxxxx
1
1$ kubectl create serviceaccount spark
- To grant a service account a
Role
orClusterRole
, you need aRoleBinding
orClusterRoleBinding
. To create aRoleBinding
orClusterRoleBinding
, you can use thekubectl create rolebinding
(orclusterrolebinding
forClusterRoleBinding
) command. For example, the following command creates anedit ClusterRole
in thedefault
namespace and grants it to thespark
service account created above:Shell
xxxxxxxxxx
1
1$ kubectl create clusterrolebinding spark-role --clusterrole=edit --serviceaccount=default:spark --namespace=default
-
Patch the spark serviceaccount to use the default all-icr-io secret to pull the images from the IBM Cloud container registry:Shell
xxxxxxxxxx
1
1$ kubectl patch -n default serviceaccount/spark -p '{"imagePullSecrets":[{"name": "all-icr-io"}]}'
Push the Spark container images to a private container registry
Let's start by pushing our Spark container images to our private registry on IBM Cloud.
- In the terminal on your machine, move to the unzipped Spark folder.
- Run the
ibmcloud cr login
command to log your local Docker daemon into IBM Cloud Container Registry:Shell
xxxxxxxxxx
1
1$ ibmcloud cr login
- Set an environment variable to store you container registry namespace:
Shell
xxxxxxxxxx
1
1$ CONTAINER_NAMESPACE=<your_container_namespace>
- To get your container registry based on the region you logged in, run the below command to export it as an environment variable:
Shell
xxxxxxxxxx
1
1$ CONTAINER_REGISTRY=$(ibmcloud cr info | grep "Container Registry" | awk 'FNR==1 {print $3}')
- Build the container image:
Shell
xxxxxxxxxx
1
1$ ./bin/docker-image-tool.sh -r $CONTAINER_REGISTRY/$CONTAINER_NAMESPACE -t latest -p ./kubernetes/dockerfiles/spark/bindings/python/Dockerfile build
- Push the container image:
Shell
xxxxxxxxxx
1
1$ ./bin/docker-image-tool.sh -r $CONTAINER_REGISTRY/$CONTAINER_NAMESPACE -t latest -p ./kubernetes/dockerfiles/spark/bindings/python/Dockerfile push
Run the Spark application
There are two ways to run a Spark application on IBM Cloud Kubernetes Service:
- Using spark-submit.
- Using spark-on-k8s operator.
The spark-submit way
- Before running spark-submit, export the K8S_MASTER_URL:
Shell
xxxxxxxxxx
1
1$ K8S_MASTER_URL=$(ibmcloud ks cluster get --cluster $CLUSTER_ID --output json | jq --raw-output '.serviceEndpoints.publicServiceEndpointURL')
- spark-submitcan be directly used to submit a Spark application to a Kubernetes cluster. You can do that with the following command:
Shell
xxxxxxxxxx
110
1./bin/spark-submit \
2--master k8s://$K8S_MASTER_URL \
3--deploy-mode cluster \
4--name spark-pi \
5--class org.apache.spark.examples.SparkPi \
6--conf spark.executor.instances=2 \
7--conf spark.kubernetes.container.image=$CONTAINER_REGISTRY/$CONTAINER_NAMESPACE/spark-py:latest \
8--conf spark.kubernetes.container.image.pullPolicy=Always\
9--conf spark.kubernetes.executor.request.cores=100m \
10local:///opt/spark/examples/src/main/python/pi.py 500
- Get the driver pod name by running the below command:
Shell
xxxxxxxxxx
1
1$ kubectl get pods -l spark-role=driver
- The UI associated with any application can be accessed locally using kubectl port-forward:
Shell
xxxxxxxxxx
1
1$ kubectl port-forward <driver-pod-name> 4040:4040
The spark-on-k8s operator way
Spark-on-k8s operator is a Kubernetes operator for managing the lifecycle of Apache Spark applications on Kubernetes.
- To install, you need Helm. Follow the instructions mentioned in the GitHub repo to install the operator on your IBM Cloud Kubernetes Service cluster.
- Create a YAML file -
spark-deploy.yaml
- with the content below. Replace the placeholders <CONTAINER_REGISTRY> and <CONTAINER_NAMESPACE> and save the file:YAML
xxxxxxxxxx
134
1apiVersion"sparkoperator.k8s.io/v1beta2"
2kind ScheduledSparkApplication
3metadata
4name pyspark-pi
5namespace default
6spec
7schedule"@every 10m"
8#suspend: true
9concurrencyPolicy Forbid
10template
11type Python
12pythonVersion"3"
13mode cluster
14image"<CONTAINER_REGISTRY>/<CONTAINER_NAMESPACE>/spark-py:latest"
15imagePullPolicy Always
16imagePullSecrets
17all-icr-io
18mainApplicationFile local ///opt/spark/examples/src/main/python/pi.py
19sparkVersion"3.1.1"
20restartPolicy
21type Never
22driver
23cores1
24coreLimit"1200m"
25memory"512m"
26labels
27version3.1.1
28serviceAccount spark
29executor
30cores1
31instances2
32memory"100m"
33labels
34version3.1.1
- You'll see that the Spark application is scheduled to run every 10 minutes, calculating the value is Pi.
- Apply the spark-deploy.yaml to install the operator:
Shell
xxxxxxxxxx
1
1$ kubectl apply -f spark-deploy.yaml
- Run the
kubectl get pods --watch
command to check the number of executor pods running.
Note: To check the Kubernetes resources, logs etc., I would recommend IBM-kui, a hybrid command-line/UI development experience for cloud native development.
Autoscaling
The autoscaling of the pods and IBM Cloud Kubernetes Service cluster depends on the requests and limits you set on the Spark driver and executor pods.
With the cluster-autoscaler
add-on, you can scale the worker pools in your IBM Cloud Kubernetes Service classic or VPC cluster automatically to increase or decrease the number of worker nodes in the worker pool based on the sizing needs of your scheduled workloads. The cluster-autoscaler
add-on is based on the Kubernetes Cluster-Autoscaler project.
For scaling apps, check out the IBM Cloud documentation.
Install the cluster autoscaler add-on to your cluster from the console:
- From the IBM Cloud Kubernetes Service cluster dashboard, select the cluster where you want to enable autoscaling.
- On the Overview page, click Add-ons.
- On the Add-ons page, locate the Cluster Autoscaler add-on and click Install. You can also do the same using the CLI.
After enabling the add-on, you need to edit the ConfigMap. For step-by-step instructions, refer to the autoscaling cluster documentation.
Dynamic allocation
When a Spark application is submitted, resources are requested based on the requests you set on the driver and executor. With dynamic resource allocation, Spark allocates additional resources required to complete the tasks in a job. The resources are automatically released once the load comes down or the tasks are completed.
For dynamic allocation to work, your application must set two configuration settings spark.dynamicAllocation.enabled
and spark.dynamicAllocation.shuffleTracking.enabled
to true
. To do this, follow these steps:
- Move to the unzipped Spark folder.
- Under the conffolder, rename spark-defaults.conf.template to spark-defaults.conf and add the below environment settings:
spark.dynamicAllocation.enabled true spark.dynamicAllocation.shuffleTracking.enabled true
- Save the configuration file.
- You may need to build and push the container image to reflect the config changes. As the
imagePullPolicy
is set toAlways
, the new container image will be pulled automatically. Remember to delete the existing driver and executor pods with thekubectl delete pods --all
command.
What's next?
You can add Apache Spark Streaming for PySpark applications like wordcount to read and write batches of data to Cloud services like IBM Cloud Object Storage (COS). Also, check Stocator for connecting COS to Apache Spark.
If you have any queries, feel free to reach out to me on Twitter or on LinkedIn.