Execute Spark Applications on Databricks Using the REST API

Introduction

While many of us are used to executing Spark applications with the 'spark-submit' command, the popularity of Databricks is steadily pushing this once-routine activity into the background. Databricks has made it very easy to provision Spark-enabled VMs on the two most popular cloud platforms, namely AWS and Azure. A couple of weeks ago, Databricks announced their availability on GCP as well. The beauty of the Databricks platform is how easy it is to get on board. While Spark application development will continue to have its challenges - depending on the problem being addressed - the Databricks platform takes away the pain of establishing and managing your own Spark cluster.

Using Databricks

Once registered on the platform, we can define a cluster of one or more VMs, with configurable RAM and executor specifications. We can also define a cluster that launches a minimum number of VMs at startup and scales out to a maximum number as required. After defining the cluster, we define jobs and notebooks. Notebooks contain the actual code executed on the cluster; we assign notebooks to jobs because the Databricks cluster executes jobs (and not notebooks). Databricks also allows us to set up the cluster so that it downloads additional JARs and/or Python packages during cluster startup, and we can upload and install our own packages (I used a Python wheel).
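
As an illustration of this provisioning step, here is a minimal sketch (not part of the original setup) that creates an autoscaling cluster and installs a wheel using the Databricks REST API directly via the requests package; the workspace URL, token, runtime version, node type, and wheel path are placeholders that you would replace with values from your own workspace.

Python

import requests

# Placeholder workspace details - substitute your own.
HOST = "https://<your-workspace>.cloud.databricks.com"
TOKEN = "<personal-access-token>"
AUTH = {"Authorization": f"Bearer {TOKEN}"}

# Define a cluster that starts with one worker and scales out to four.
cluster_spec = {
    "cluster_name": "recon-cluster",            # placeholder name
    "spark_version": "7.3.x-scala2.12",         # placeholder runtime version
    "node_type_id": "i3.xlarge",                # placeholder VM type
    "autoscale": {"min_workers": 1, "max_workers": 4},
}
resp = requests.post(f"{HOST}/api/2.0/clusters/create", headers=AUTH, json=cluster_spec)
cluster_id = resp.json()["cluster_id"]

# Install our own Python wheel (uploaded to DBFS beforehand) on the cluster.
libraries = {
    "cluster_id": cluster_id,
    "libraries": [{"whl": "dbfs:/FileStore/wheels/our_package-1.0-py3-none-any.whl"}],
}
requests.post(f"{HOST}/api/2.0/libraries/install", headers=AUTH, json=libraries)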

Recently, I developed some functionality - data reconciliation, data validation, and data profiling - using Spark. Initially, we built it against a local Spark installation, and things were fine. While I knew the design would need to be reworked, we went ahead with the local implementation. Why the design change? Each piece of functionality was fronted by its own microservice. As the microservices were going to be deployed using Docker and Kubernetes, and we could not deploy the Spark application on that setup, we needed the Spark application running on a dedicated Spark instance.

To make this happen, we had two options - Apache Livy and Databricks. For implementation flexibility, and also to cater to differing customer infrastructure, we decided to implement both options. I have covered the Apache Livy implementation in an earlier article (Execute Spark Applications With Apache Livy); this article covers the Databricks implementation.

Using Databricks Remotely

Similar to Apache Livy, Databricks provides a REST API. As our implementation was in Python, we used the databricks_api package. While the REST API makes it simple to invoke a Spark application available on a Databricks cluster, I realized that all three services ended up with the same code - the mechanism for setting up and invoking the Databricks API was identical; only the job names and the parameters passed during invocation differed. Hence, I wrapped the common functionality into a helper class.
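
For reference, here is a minimal sketch (my own, with placeholder values) of the raw REST call that the databricks_api package wraps when a job is triggered; it posts to the Jobs API's run-now endpoint and reads back the run id that is later used for polling.

Python

import requests

# Placeholder workspace details - substitute your own.
HOST = "https://<your-workspace>.cloud.databricks.com"
TOKEN = "<personal-access-token>"

resp = requests.post(
    f"{HOST}/api/2.0/jobs/run-now",
    headers={"Authorization": f"Bearer {TOKEN}"},
    json={"job_id": 42, "notebook_params": {"table": "customers"}},  # placeholder job id and params
)
run_id = resp.json()["run_id"]  # poll /api/2.0/jobs/runs/get?run_id=<run_id> for status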

Helper class

Here is the helper class to interact with Spark applications hosted on Databricks.

Python

import time
# . . . other imports
from databricks_api import DatabricksAPI


class DatabricksRunner:
    def __init__(self):
        self.databricks = None
        self.host_id = None
        self.access_token = None
        self.cluster_is_running = False
        self.cluster_is_defined = False
        self.databricks_job_id = None
        self.job_run_id = None

    def find_job(self, job_list, name):
        # Locate a job entry by name in the response of jobs.list_jobs().
        ret_val = None
        jobs = job_list.get("jobs", None)
        if jobs is not None:
            for job in jobs:
                job_name = job.get("settings", {}).get("name", None)
                if name == job_name:
                    ret_val = job
                    break
        return ret_val

    def connect_to_cluster(self, host_id, access_token, cluster_name, headers):
        self.host_id = host_id
        self.access_token = access_token
        self.databricks = DatabricksAPI(host=host_id, token=access_token)

        clusters = self.databricks.cluster.list_clusters(headers=headers)
        self.cluster_is_running = False
        self.cluster_is_defined = False
        for c in clusters["clusters"]:
            # Check the state of the named cluster only.
            if c["cluster_name"] == cluster_name and not self.cluster_is_defined:
                self.cluster_is_defined = True
                self.cluster_is_running = (c["state"] == "RUNNING")

        if self.cluster_is_defined is False:
            return False
        if self.cluster_is_running is False:
            return False
        return True

    def get_job_id(self, job_name):
        if self.cluster_is_running is False:
            return None

        job_list = self.databricks.jobs.list_jobs(headers=None)
        job_details = self.find_job(job_list, job_name)
        if job_details is None:
            return None
        self.databricks_job_id = job_details["job_id"]
        return True

    def run_job(self, notebook_params, jar_params, python_params, spark_submit_params, headers):
        ret_val = self.databricks.jobs.run_now(
            job_id=self.databricks_job_id, jar_params=jar_params,
            notebook_params=notebook_params, python_params=python_params,
            spark_submit_params=spark_submit_params, headers=headers)
        self.job_run_id = ret_val["run_id"]
        return True

    def wait_for_job(self, poll_time, timeout_value):
        life_cycle_state = "RUNNING"
        run_time = 0
        result_state = ""
        while "TERMINATED" != life_cycle_state and run_time <= timeout_value:
            run_response = self.databricks.jobs.get_run(run_id=self.job_run_id, headers=None)
            state = run_response.get("state", None)
            if state is not None:
                life_cycle_state = state.get("life_cycle_state", None)
                result_state = state.get("result_state", None)
            time.sleep(poll_time)
            run_time = run_time + poll_time

        if "TERMINATED" == life_cycle_state:
            # The Jobs API reports result_state in uppercase.
            if result_state == "SUCCESS":
                ret_val = "success"
            elif result_state == "FAILED":
                ret_val = "failed"
            else:
                ret_val = "state not recognized"
        else:
            ret_val = "job is still running"
        return ret_val



Using the Helper Class

After defining the class, we can run Spark jobs as shown below.

Python

host_id = None              # assign value
access_token = None         # assign value
cluster_name = None         # assign value
headers = None              # assign value
job_name = None             # assign value
notebook_params = None      # json
jar_params = None           # json
python_params = None        # json
spark_submit_params = None  # json

runner = DatabricksRunner()
ret_val = runner.connect_to_cluster(host_id, access_token, cluster_name, headers)
ret_val = runner.get_job_id(job_name)
ret_val = runner.run_job(notebook_params, jar_params, python_params, spark_submit_params, headers)
ret_val = runner.wait_for_job(60, 600)  # poll every 60 seconds, time out after 600 seconds



Conclusion

The Databricks API makes it easy to interact with Databricks jobs remotely. Not only can we run jobs on the Databricks cluster, but we can also monitor their execution state.
