Basic Google BigQuery Operations With a Salesforce Sync Demo in Mule 4
When we think about data storage, the first thing that comes to mind is a regular database. This can be any of the most popular ones, like MySQL, SQL Server, Postgres, Vertica, etc., but I've noticed that not many people have worked with one of the services Google provides for the same purpose: Google BigQuery. Maybe it is because of the pricing, but many companies are moving to cloud services, and this service seems to be a great fit for them.
In this post, I will demonstrate in a few steps how to build a sync job that describes a Salesforce instance and uses a few objects to create a full schema of those objects (tables) in a Google BigQuery dataset. With the schema created, we will be able to push some data from Salesforce into BigQuery and see it in our Google Cloud Console project.
In order to connect to Salesforce and Google BigQuery, there are a few prerequisites needed:
- Salesforce
  - If you don’t have a Salesforce instance, you can create a developer one here.
  - From the Salesforce side, you will need a username, password, and security token (you can follow this process to get it).
  - A developer instance already contains a few records, but if you want more data to work with, you can add some and the process will sync that information over.
- GCP (Google Cloud Platform)
  - You can sign up here for free. Google gives you $300 in credit for 90 days to test the product (similar to Azure). If you already have a Google account, you can use it for this.
Creating a New Project in GCP and Setting Up Our Service Account Key
Once you sign up for your account on GCP, you should be able to click on the New Project option and write a project name. In this example, I chose MuleSoft.
Once the project is created, we can go to the menu on the left and select the IAM & Admin > Service Accounts option.
Now, we should be able to create our service account.
“A service account is a special type of Google account intended to represent a non-human user that needs to authenticate and be authorized to access data in Google APIs. Typically, service accounts are used in scenarios such as running workloads on virtual machines.” — Google Cloud Documentation
At the top of the page, you should see the option to create the service account. You just need to specify a name and click on Create and Continue.
The next step is to set the permissions. For this, we need to select BigQuery Admin from the roles menu.
Once the service account is created, we can select the Manage keys option from the three-dot menu on the right.
Then we can create a new key. In this case, a JSON key is enough. The key will be downloaded automatically to your computer. (Please keep this JSON key somewhere safe; we will use it later.)
Dataset in BigQuery
Datasets are top-level containers that are used to organize and control access to your tables and views. A table or view must belong to a dataset, so you need to create at least one dataset before loading data into BigQuery.
From the left menu, we can search for BigQuery and click on it.
That will take us to the BigQuery console. Now we can click on the three-dot menu and select the Create dataset option.
Now we just need to set the name as Salesforce and click on Create Dataset.
Setting Up Our Mule Application
Since this is a sync job, we don’t need an API specification, although the same logic could be exposed through one if another application needed to consume specific endpoints/operations.
Let’s open Anypoint Studio (in my case, I’m using a Mac) and use the default template. For this we are going to create six flows:
- Sync. This flow triggers the process.
- DescribeInstance. This flow calls the describe operation of the Salesforce connector and returns information about all objects in the Salesforce instance. It also has a loop that lets us process the job for the objects we are going to use.
- DescribeIndividualSalesforceObject. This describes a specific Salesforce object. It captures the fields and field types (string, email, ID, reference, etc.) and builds a payload that BigQuery will recognize when creating the table.
- BigQueryCreateTable. This flow is only in charge of creating the table in BigQuery based on the Salesforce object name and its fields.
- QuerySalesforceObject. This flow dynamically queries the Salesforce object and pulls the data. (For this demo we limit the output to 100 records; at a bigger scale it should be done in a batch process.)
- InsertDataIntoBigQuery. This flow only pushes the data into BigQuery.
Now let’s grab our JSON key generated by Google and copy the file under the src/main/resources folder. The key will let us authenticate against our project and execute the operations.
Import the Google BigQuery Connector
From Exchange, we can search for “BigQuery” and we should see the connector listed.
Then we can just use the add to project option and we should be able to see the operations in the palette.
Sync Flow
As I mentioned, this is only in charge of triggering the whole application, so we only need one scheduler component and a flow reference to the DescribeInstance flow.
DescribeInstance
This flow describes the whole Salesforce instance using the Describe Global operation. The next step is a DataWeave transform that keeps only the objects we are interested in; in this case, I’m only pulling three: Account, Contact, and a custom object called Project__c. The transformation also checks a few more attributes so that we only pull objects we are able to query.
Finally, we loop over these three objects, and a flow reference inside the loop calls the other flows to continue the process for each one.
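As a rough illustration, the filtering step described in this section could look something like the following DataWeave sketch. It is not the script from the demo repository: the `sobjects`, `name`, and `queryable` names are assumptions about the shape of the Describe Global result, and the inline `describeGlobal` sample (including the `AccountHistory` entry) is hypothetical data, included so the script can run on its own.

```dataweave
%dw 2.0
output application/json

// Hypothetical sample of a Describe Global result. The attribute names
// (sobjects, name, queryable) are assumptions about the payload shape.
var describeGlobal = {
  sobjects: [
    { name: "Account", queryable: true },
    { name: "Contact", queryable: true },
    { name: "Project__c", queryable: true },
    { name: "AccountHistory", queryable: true }
  ]
}

// The three objects this demo syncs
var wanted = ["Account", "Contact", "Project__c"]
---
// Keep only the wanted objects that can be queried, then return their names
describeGlobal.sobjects
  filter ((obj) -> (wanted contains obj.name) and obj.queryable)
  map ((obj) -> obj.name)
```

In the real flow, `payload` would take the place of the sample variable, and the resulting list of names is what the loop iterates over.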
DescribeIndividualSalesforceObject
The flow basically takes the name of the Salesforce object and will allow you to describe it. The connector only asks for the object name, and then we have a pretty interesting DW transformation.
%dw 2.0
input payload application/java
output application/java

// Map a Salesforce field type to the closest BigQuery column type.
// Types that are not listed here are passed through unchanged.
fun validateField(field) =
  if ((field == "REFERENCE") or
      (field == "ID") or
      (field == "PICKLIST") or
      (field == "TEXTAREA") or
      (field == "ADDRESS") or
      (field == "EMAIL") or
      (field == "PHONE") or
      (field == "URL"))
    "STRING"
  else if ((field == "DOUBLE") or (field == "CURRENCY"))
    "FLOAT"
  else if (field == "INT")
    "INTEGER"
  else
    field
---
// Skip location fields and emit one { fieldName, fieldType } entry per field
(payload.fields filter ($."type" != "LOCATION") map {
  fieldName: $.name,
  fieldType: validateField($."type")
})
Salesforce data types are not 100% the same as BigQuery data types, so we need a small trick to create the schema in BigQuery cleanly. In this case, I’ve created a small function that converts some field types (like ID, REFERENCE, TEXTAREA, PHONE, ADDRESS, PICKLIST, EMAIL, and URL) to STRING, since ID and reference values are really nothing more than text. For DOUBLE and CURRENCY, I’m using the value FLOAT, and finally INT fields are changed to INTEGER.
Lastly, because location fields are a bit tricky and we can’t do much with them through the API, I’m removing all location fields.
The output of this is the actual schema we will use to create the table in Google BigQuery.
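To make the mapping concrete, here is a small self-contained sketch that applies the same rules to a handful of fields. It is a variant of the script above rather than the author's code: `toBigQueryType`, `stringTypes`, and `floatTypes` are hypothetical names, the lookup-list approach is swapped in for the chained `or` comparisons, and `sampleFields` (including the custom `Location__c` field) is made-up data standing in for `payload.fields`.

```dataweave
%dw 2.0
output application/json

// Salesforce types stored as plain text in BigQuery (same set as the script above)
var stringTypes = ["REFERENCE", "ID", "PICKLIST", "TEXTAREA", "ADDRESS", "EMAIL", "PHONE", "URL"]
// Salesforce numeric types stored as FLOAT
var floatTypes = ["DOUBLE", "CURRENCY"]

// Lookup-based variant of validateField; unknown types pass through unchanged
fun toBigQueryType(sfType: String): String =
  if (stringTypes contains sfType) "STRING"
  else if (floatTypes contains sfType) "FLOAT"
  else if (sfType == "INT") "INTEGER"
  else sfType

// Hypothetical describe result, trimmed to the two attributes the mapping reads
var sampleFields = [
  { name: "Id", "type": "ID" },
  { name: "Name", "type": "STRING" },
  { name: "AnnualRevenue", "type": "CURRENCY" },
  { name: "NumberOfEmployees", "type": "INT" },
  { name: "Website", "type": "URL" },
  { name: "Location__c", "type": "LOCATION" }
]
---
// Drop location fields, then emit one schema entry per remaining field
sampleFields
  filter ((f) -> f."type" != "LOCATION")
  map ((f) -> {
    fieldName: f.name,
    fieldType: toBigQueryType(f."type")
  })
```

With this sample, `Location__c` is dropped, `Id` and `Website` become STRING, `AnnualRevenue` becomes FLOAT, `NumberOfEmployees` becomes INTEGER, and `Name` keeps its STRING type.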
BigQueryCreateTable
This flow allows us to create the table in BigQuery; we only need to specify the table, dataset, and table fields.
QuerySalesforceObject
This flow basically queries the object in Salesforce and then maps the data dynamically to prepare the payload for BigQuery.
The query is built from a variable, salesforceFields, which holds the same fields we collected when we described the object, using this script:
(payload.fields filter ($."type" != "LOCATION") map {
fieldName : $.name
}).fieldName joinBy ","
And finally, I’m limiting the result to only 100 records.
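For illustration, the dynamic query could be assembled along these lines. This is a minimal sketch, not the exact expression from the demo: the two variables are declared inline with hypothetical values so the script runs on its own, and `objectName` is an assumed name for wherever the flow keeps the current Salesforce object.

```dataweave
%dw 2.0
output text/plain

// Hypothetical stand-ins for values the flow would hold in variables
var salesforceFields = "Id,Name,CreatedDate"
var objectName = "Account"
---
// Interpolate the field list and object name into a SOQL query capped at 100 rows
"SELECT $(salesforceFields) FROM $(objectName) LIMIT 100"
```

With these sample values, the result is `SELECT Id,Name,CreatedDate FROM Account LIMIT 100`. Inside the Mule flow, the values would come from flow variables instead of inline declarations.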
The next step is to map the Salesforce result data dynamically using this script:
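%dw 2.0
import try from dw::Runtime
output application/java

// True when the value can be coerced to a Date
fun isDate(value: Any): Boolean = try(() -> value as Date).success

// Values that coerce to a Date are returned as date strings; everything else is unchanged
fun getDate(value: Any): Date | Null | Any =
  if (isDate(value)) value as Date as String else value
---
// Rebuild every record with the same keys and the normalized values
(payload map (item, index) -> {
  (item mapObject ((value, key, index) -> {
    (key): (getDate(value))
  }))
})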
Thanks so much to Alexandra Martinez for the insights on the utilities for DW 2.0!
This last script maps the records, using each key as the field name and keeping its value, except that values that are dates or date-times are converted to date strings. I consider this the best script in this app.
InsertDataIntoBigQuery
This flow only inserts the data we prepared, so we just need to specify the table ID, dataset ID, and row data.
Running Our Mule Application
Now we should be able to run our application and see the new tables and the data on Google BigQuery.
On GCP I can see that the tables I selected are created:
If we open any of them, we can look at the schema to verify that all fields are there.
I think this is a common request in the integration space, and many tweaks can be implemented if we are thinking of big migrations or of setting up jobs that will eventually require tables to be created automatically from Salesforce in GCP.
If you’d like to try it, I created this GitHub repository. I hope this was useful, and I’m open to any enhancements/scenarios.