Basic Google BigQuery Operations With a Salesforce Sync Demo in Mule 4

When we think about data storage, the first thing that comes to mind is a regular database: any of the most popular ones, like MySQL, SQL Server, Postgres, or Vertica. But I've noticed that not many people have interacted with the service Google provides for the same purpose: Google BigQuery. Maybe it is because of the pricing, but many companies are moving to cloud services, and BigQuery seems to be a great fit for them.

In this post, I will demonstrate in a few steps how to build a sync job that describes a Salesforce instance, picks a few objects, and creates the full schema of those objects (tables) in a Google BigQuery dataset. Then, with the schema created, we will push some data from Salesforce into BigQuery and see it in our Google Cloud Console project.

In order to connect to Salesforce and Google BigQuery, there are a few prerequisites:

  1. A Google Cloud Platform (GCP) account, with a project, a service account key, and a BigQuery dataset (all covered below).
  2. A Salesforce account (or developer org) with credentials we can use from the Salesforce connector.
  3. Anypoint Studio to build and run the Mule application.

Creating a New Project in GCP and Setting Up Our Service Account Key

Once you sign up for your account on GCP, you should be able to click on the New Project option and write a project name. In this example, I chose MuleSoft.

Creating a new project

Naming new project

Once the project is created, we should be able to go to the menu on the left and select the IAM & Admin > Service Accounts option.

Selecting Service Account

Now, we should be able to create our service account.

“A service account is a special type of Google account intended to represent a non-human user that needs to authenticate and be authorized to access data in Google APIs. Typically, service accounts are used in scenarios such as running workloads on virtual machines.” — Google Cloud Documentation

At the top of the page, you should see the option to create it; then you just need to specify a name and click on Create and Continue.

Creating a service account


The next step is to set the permissions; for this, we need to select BigQuery Admin from the roles menu.

Roles menu

Once created, we should be able to select the Manage keys option from the three-dot menu on the right.

menu > manage keys

Then we can create a new key. In this case, the JSON key type should be enough. The key will be downloaded automatically to your computer. (Please keep this JSON key somewhere handy; we will use it later.)

Creating a new key
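The downloaded file is a standard GCP service account key in JSON format. The exact values will differ per project, but (with placeholders instead of real values) it looks roughly like this:

{
  "type": "service_account",
  "project_id": "mulesoft-xxxxxx",
  "private_key_id": "<redacted>",
  "private_key": "-----BEGIN PRIVATE KEY-----\n<redacted>\n-----END PRIVATE KEY-----\n",
  "client_email": "bigquery-sync@mulesoft-xxxxxx.iam.gserviceaccount.com",
  "client_id": "<redacted>",
  "token_uri": "https://oauth2.googleapis.com/token"
}

The client_email is the identity that the BigQuery Admin role was granted to in the previous step.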


Dataset in BigQuery

Datasets are top-level containers that are used to organize and control access to your tables and views. A table or view must belong to a dataset, so you need to create at least one dataset before loading data into BigQuery.

From the left menu, we can search for BigQuery and click on it.


That will take us to the BigQuery console; now we can click on the three-dot menu and select the Create dataset option.



Create dataset option

Now we just need to set the dataset name to Salesforce and click on Create Dataset.


Setting Up Our Mule Application

Since this is a sync job, we don't need an API specification, but this could easily fit scenarios where another application needs to consume specific endpoints/operations.

Let’s then open Anypoint Studio (in my case, I’m using a Mac) and use the default template. For this, we are going to create six flows:

  1. Sync. This flow triggers the process.
  2. DescribeInstance. This flow will be in charge of calling the Describe Global operation using the Salesforce connector, which provides the information for all the objects in the Salesforce instance. It will also loop over the objects we are going to use so the rest of the job can run for each of them.
  3. DescribeIndividualSalesforceObject. This allows us to describe a specific Salesforce object. It captures the fields and field types (string, email, ID, reference, etc.) and is responsible for creating a payload that BigQuery will recognize so the table can be created in GBQ.
  4. BigQueryCreateTable. This flow will only be in charge of creating the table in BigQuery based on the Salesforce object name and the fields.
  5. QuerySalesforceObject. This flow will dynamically query the Salesforce object and pull the data. (For this we are limiting the output to 100 records, but on a bigger scale it should of course be done in a batch process.)
  6. InsertDataIntoBigQuery. This flow will only push the data over into BigQuery.

Now let’s grab our JSON key generated by Google and copy the file under the src/main/resources folder. The key will let us authenticate against our project and execute the operations. 

mulesoft.json file



Import the Google BigQuery Connector

From Exchange, we can search for “BigQuery” and we should be able to see the connector listed.

Google BigQuery Connector - Mule 4


Then we can just use the Add to project option, and we should be able to see the operations in the palette.

Operations in the palette


Sync Flow

As I mentioned, this is only in charge of triggering the whole application, so we only need one scheduler component and a flow reference to the DescribeInstance flow.


Sync flow

DescribeInstance

This flow will describe the whole Salesforce instance using the Describe Global operation. The next step is to use a DataWeave transform to filter the result down to only the objects we are interested in; in this case, I’m only pulling three: Account, Contact, and a custom object called Project__c. The transformation also checks a few more attributes so that only objects we are able to query get through. A minimal sketch of such a filter is shown below.

 
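This sketch assumes the Describe Global result exposes a name and a queryable attribute for each object; adjust the object list to your own org:

%dw 2.0
output application/java
// Objects we want to sync in this example (Project__c is a custom object)
var objectsToSync = ["Account", "Contact", "Project__c"]
---
payload filter ((sObject) ->
    (objectsToSync contains sObject.name) and (sObject.queryable == true)
)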

Finally, we loop over these three objects, and a flow reference inside the loop calls the next flows so the process can continue.

DescribeIndividualSalesforceObject

This flow takes the name of the Salesforce object and describes it. The connector only asks for the object name, and then we have a pretty interesting DW transformation.

 
%dw 2.0
input payload application/java
output application/java
// Map Salesforce field types to BigQuery column types: text-like types become STRING,
// DOUBLE/CURRENCY become FLOAT, INT becomes INTEGER, anything else is passed through.
fun validateField(field) = if ( (field == "REFERENCE")
                                or (field == "ID") or
                                (field == "PICKLIST") or
                                (field == "TEXTAREA") or
                                (field == "ADDRESS") or
                                (field == "EMAIL") or
                                (field == "PHONE") or
                                (field == "URL"))
                                "STRING"
                                else
                                if ( (field == "DOUBLE") or
                                     (field == "CURRENCY")
                                )
                                "FLOAT"
                                else
                                if ((field == "INT"))
                                "INTEGER"
                                else field
---
// LOCATION fields are dropped; every other field becomes a {fieldName, fieldType} pair
(payload.fields filter ($."type" != "LOCATION") map {
   fieldName : $.name,
   fieldType : validateField($."type")
})

Salesforce to BigQuery Fields Schema

Salesforce data types are not a one-to-one match with BigQuery types, so we need a little trick to create the schema in BigQuery as seamlessly as possible. I’ve created a small function that converts some field types (like ID, REFERENCE, TEXTAREA, PHONE, ADDRESS, PICKLIST, and EMAIL) to STRING, since those values are really nothing more than text. For DOUBLE and CURRENCY I’m using FLOAT, and finally the INT fields are changed to INTEGER.

Lastly, because location fields are a bit tricky and we can’t do much with them through the API, I’m removing all LOCATION fields.

The output of this is the actual schema we will use to create the table in Google BigQuery.
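For example, for a simplified Account object, the resulting payload would look something like this (illustrative fields only):

[
  { fieldName: "Id", fieldType: "STRING" },
  { fieldName: "Name", fieldType: "STRING" },
  { fieldName: "AnnualRevenue", fieldType: "FLOAT" },
  { fieldName: "NumberOfEmployees", fieldType: "INTEGER" }
]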

DescribeIndividualSalesforceObject: Describe SObject > Transform Message to Java > Transform Message Salesforce to BigQuery Fields Schema


BigQueryCreateTable

This flow allows us to create the table in BigQuery; we only need to specify the table, dataset, and table fields.

Creating table in BigQuery

BigQueryCreateTable

QuerySalesforceObject

This flow basically queries the object in Salesforce and then maps the data dynamically to prepare the payload for BigQuery.

Salesforce query

The field list for the query comes from a variable called salesforceFields, the same fields we collected when we described the object, using this script:

 
// Build a comma-separated list of field names, again excluding LOCATION fields
(payload.fields filter ($."type" != "LOCATION") map {
   fieldName : $.name
}).fieldName joinBy ","

salesforceFields

And finally, I’m limiting the result to only 100 records.
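Putting it together, the dynamic SOQL query sent to Salesforce ends up looking roughly like the expression below (a sketch, assuming the object name is kept in a variable I’m calling objectName here):

"SELECT " ++ vars.salesforceFields ++ " FROM " ++ vars.objectName ++ " LIMIT 100"

For the Account object, that resolves to something like SELECT Id,Name,AnnualRevenue,NumberOfEmployees FROM Account LIMIT 100, with the real field list being much longer.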

The next step is to map the Salesforce result data dynamically using this script:

 
%dw 2.0
import try, fail from dw::Runtime
output application/java

// isDate: true when the value can be coerced to a Date
fun isDate(value: Any): Boolean = try(() -> value as Date).success
// getDate: converts date values into date strings; anything else passes through unchanged
fun getDate(value: Any): Date | Null | Any = ( if ( isDate(value) ) value 
            as Date as String else value )

---
// Rebuild each record dynamically, keeping the keys and normalizing date values
(payload map (item,index) ->{
    (item mapObject ((value, key, index) -> {
        (key):(getDate(value))
    } ))
})

mapSalesforceRecords

Thanks so much to Alexandra Martinez for the insights on the utilities for DW 2.0!

This last script maps the records dynamically, keeping each key as the field name and each value as the field value, except that values which can be parsed as dates are converted into date strings that BigQuery accepts. I consider this the most useful script in this app.

QuerySalesforceObject


InsertDataIntoBigQuery

This flow just inserts the data we prepared; we only need to specify the table ID, dataset ID, and the row data.
Table ID


Row data
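The row data is simply the list of records produced by the mapping script in the previous flow, i.e., a list of key/value records along these lines (illustrative values only):

[
  { Id: "0015e000003XXXX", Name: "Acme Corp", AnnualRevenue: 1500000.0, NumberOfEmployees: 250 },
  { Id: "0015e000003YYYY", Name: "Globex", AnnualRevenue: 980000.0, NumberOfEmployees: 80 }
]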


Running Our Mule Application

Now we should be able to run our application and see the new tables and the data on Google BigQuery.

On GCP I can see that the tables I selected are created:

Created tables


And if we open any of them, we can check the schema to verify all fields are there.
Verifying all the fields are present



Query results

I think this is a fairly common request in the integration space, and many tweaks can be implemented if we are thinking of big migrations or of setting up jobs that eventually require tables to be created automatically from Salesforce into GCP.

If you’d like to try it, I created this GitHub repository. I hope this was useful, and I’m open to any enhancements/scenarios.
