Execute Spark Applications on Databricks Using the REST API
Introduction
While many of us are habituated to executing Spark applications using the 'spark-submit' command, with the popularity of Databricks, this seemingly easy activity is getting relegated to the background. Databricks has made it very easy to provision Spark-enabled VMs on the two most popular cloud platforms, namely AWS and Azure. A couple of weeks ago, Databricks announced their availability on GCP as well. The beauty of the Databricks platform is that they have made it very easy to become a part of their platform. While Spark application development will continue to have its challenges - depending on the problem being addressed - the Databricks platform has taken out the pain of having to establish and manage your own Spark cluster.
Using Databricks
Once registered on the platform, the Databricks platform allows us to define a cluster of one or more VMs, with configurable RAM and executor specifications. We can also define a cluster that can launch a minimum number of VMs at startup and then scale to a maximum number of VMs as required. After defining the cluster, we have to define jobs and notebooks. Notebooks contain the actual code executed on the cluster. We need to assign notebooks to jobs as the Databricks cluster executes jobs (and not Notebooks). Databricks also allows us to setup the cluster such that it can download additional JARs and/or Python packages during cluster startup. We can also upload and install our own packages (I used a Python wheel).
Recently, I developed some functionality - data reconciliation, data validation and data profiling - using Spark. Initially, we developed the functionality using the local Spark installation and things were fine. While I knew that the design would need to be reworked, we went ahead with the local implementation. Why the design change? The functionality we developed was fronted by a microservice - one for each. As the microservices were going to be deployed using Docker and Kubernetes, we would need to implement a design change for the simple reason that we could not deploy the Spark application on the Docker and Kubernetes setup. We needed to have the Spark application running on a dedicated Spark instance.
To make this happen, we had two options - Apache Livy and Databricks. For implementation flexibility and also to cater to customer infrastructure, we decided to implement both options. In an earlier article (Execute Spark Applications With Apache Livy), I have mentioned how we can execute Spark applications using Apache Livy's REST interface. I have covered the Apache Livy implementation in an earlier article.
Using Databricks Remotely
Similar to what Apache Livy has, Databricks also provides a REST API. As our implementation was in Python, we used the package databricks_api
. While the REST API makes it simple to invoke a Spark application available on a Databricks cluster, I realized that all the three services ended up with the same code - the mechanism for setting up and invoking the Databricks API was the same - the names of the jobs and the parameters passed during invocation were different. Hence I wrapped up the common functionality into a helper class.
Helper class
Here is the helper class to interact with Spark applications hosted on Databricks.
xxxxxxxxxx
. . . other imports
from databricks_api import DatabricksAPI
class DatabricksRunner:
def __init__(self):
self.databricks = None
self.host_id = None
self.access_token = None
self.cluster_is_running = False
self.cluster_is_defined = False
self.databricks_job_id = None
def get_job_id(self, job_list, name):
ret_val = None
jobs = job_list.get("jobs", None)
if jobs is not None:
for i in range(len(jobs)):
job_name = jobs[i].get("settings", {}).get("name", None)
if name == job_name:
ret_val = jobs[i]
break
return ret_val
def connect_to_cluster(self, host_id, access_token, cluster_name, headers):
self.host_id = host_id
self.access_token = access_token
self.databricks = DatabricksAPI(host=host_id, token=access_token)
clusters = self.databricks.cluster.list_clusters(headers=headers)
self.cluster_is_running = False
self.cluster_is_defined = False
for c in clusters["clusters"]:
if c["cluster_name"] == cluster_name and self.cluster_is_defined == False:
self.cluster_is_defined = True
if c["state"] == "RUNNING":
self.cluster_is_running = True
elif c["stat"] == "TERMINATED":
self.cluster_is_running = False
else:
self.cluster_is_running = False
if self.cluster_is_defined is False:
return False
if self.cluster_is_running is False:
return False
return True
def get_job_id(self, job_name):
if self.cluster_is_running == False:
return None
job_list = self.databricks.jobs.list_jobs(headers=None)
job_details = self.get_job_id(job_list, job_name)
if job_details is None:
return None
self.databricks_job_id = job_details["job_id"]
return True
def run_job(self, notebook_params, jar_params, python_params, spark_submit_params, headers):
ret_val = self.databricks.jobs.run_now(job_id=self.databricks_job_id, jar_prams=jar_params,
notebook_params=notebook_params, python_params=python_params,
spark_submit_params=spark_submit_params, headers=headers)
self.job_run_id = ret_val["run_id"]
return True
def wait_for_job(self, poll_time, timeout_value):
life_cycle_state = "RUNNING"
run_time = 0
result_state = ""
while "TERMINATED" != life_cycle_state and run_time <= timeout_value:
run_response = self.databricks.jobs.get_run(run_id=self.job_run_id, headers=None)
state = run_response.get("state", None)
if state is not None:
life_cycle_state = state.get("life_cycle_state", None)
result_state = state.get("result_state", None)
time.sleep(poll_time)
run_time = run_time + poll_time
if run_time >= timeout_value:
return "job is still running"
elif "TERMINATED" == life_cycle_state:
if result_state == "success":
ret_val = "success"
elif result_state == "failed":
ret_val = "failed"
else:
ret_val = "state not recognized"
return ret_val
Using the Helper Class
After defining the class, we can run Spark jobs as below
x
host_id = # assign value
access_token = # assign value
cluster_name = # assign value
headers = # assign value
job_name = # assign value
notebook_params = # json
python_params = # json
spark_submit_params = # json
runner = DatabricksRunner()
ret_val = runner.connect_to_cluster(host_id, access_token, cluster_name, headers)
ret_val = runner.get_job_id(job_name)
ret_val = runner.run_job(notebook_params, jar_params, python_params, spark_submit_params, headers)
ret_val = runner.wait_for_job(60, 600)
Conclusion
The Databricks API makes it easy to interact with Databricks jobs remotely. Not only can we run jobs on the Databricks cluster, but we can also monitor their execution state.