Writing a Simple Pulumi Provider for Airbyte
In this quick tutorial, I will go over Pulumi by writing a very simple Provider for Airbyte.
The simplified use case is as follows: I want to programmatically (Infrastructure as Code) create, update, and delete Sources, Destinations, and Connections in Airbyte instead of using the UI.
Airbyte has its own Terraform Provider. However, at the time of writing, it is not entirely mature and can be difficult to debug, particularly if you are not used to writing your own Terraform providers.
Airbyte offers a great and simple REST API, which, in fact, is what is used under the hood by the Terraform provider.
Instead of using the Terraform provider, I was planning to use the API directly, as it is easier to try, configure, and debug when errors occur. You simply use any REST or HTTP client (I was planning on using requests) and invoke the different API endpoints. For example, to create a BigQuery Destination, you would do an HTTP POST similar to the following (assuming you are connecting to api.airbyte.com):
import requests

url = "https://api.airbyte.com/v1/destinations"

payload = {
    "configuration": {
        "destinationType": "bigquery",
        "dataset_location": "US",
        "loading_method": {"method": "Standard"},
        "project_id": "xxx",
        "dataset_id": "abc",
        "credentials_json": "jjjjjjj"
    },
    "name": "destination"
}
headers = {
    "accept": "application/json",
    "content-type": "application/json"
}

response = requests.post(url, json=payload, headers=headers)
print(response.text)
That is all well and good: simple, and easy to understand and debug. However, using Terraform has its advantages. One advantage is state management. Using the Terraform provider, you declare your resources (Sources, Destinations, and Connections) and the provider makes sure to create them in Airbyte. If you modify the Terraform configuration, the provider makes the corresponding modification (PATCH HTTP requests) in Airbyte. If you delete the resource from the Terraform file, again the provider notices this and requests the deletion of the component in Airbyte.
So I wanted to see if there was a way to combine the best of both worlds: the developer-friendly simplicity of the Python code with something similar to the state management from Terraform.
Enter Pulumi, which allowed me to do exactly that, and in a very elegant, flexible, and secure way.
On its home page, Pulumi says:
Infrastructure as Code in any programming language …
Author infrastructure code using programming languages you know and love. Write statements to define infrastructure using your IDE with autocomplete, type checking, and documentation.
Awesome! That sounded promising. So I established my PoC:
- Write an Airbyte Pulumi “provider” that allows me to:
- Create, modify, and delete sources
- Create, modify, and delete destinations
- Create, modify, and delete connections
- Write a few resources to exercise the provider.
- In particular, I selected a Jira source connected to a BigQuery destination.
- Create some external resources using Pulumi and link them to the Airbyte-specific resources.
- In particular, create a dataset in BigQuery using Pulumi GCP provider.
- Link that dataset to an Airbyte BigQuery destination.
- Make sure I can store the state in the cloud (GCS in particular).
- Make sure the state doesn't contain plain text secrets or passwords.
- Make sure I can run it programmatically easily, as well as from the command line.
Setting Up the Project
brew install pulumi/tap/pulumi
mkdir pulumi_airbyte
cd pulumi_airbyte
pulumi login --local
pulumi new python
Follow the wizard. You should end up with a directory layout like:
Pulumi.dev.yaml Pulumi.yaml __main__.py requirements.txt venv/
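Note that pulumi login --local keeps the state on your machine, which is fine for a first run. For the GCS goal from my PoC list, you can point Pulumi at a bucket backend instead (the bucket name below is a placeholder):
pulumi login gs://my-pulumi-state-bucket
Pulumi then reads and writes all stack state in that bucket, using your Google application default credentials.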
Writing the Provider
In Airbyte we define sources, destinations, and connections. We will create a very simple Pulumi provider that allows us to create these:
import pulumi
from pulumi.dynamic import Resource, ResourceProvider, CreateResult, UpdateResult
import requests

AIRBYTE_API_URL = "http://localhost:8006/v1"  # Update with your Airbyte API URL


class SourceProvider(ResourceProvider):
    def create(self, inputs):
        headers = {
            "accept": "application/json",
            "content-type": "application/json"
        }
        # POST the inputs as-is; Airbyte returns the generated sourceId
        response = requests.post(f"{AIRBYTE_API_URL}/sources", json=inputs, headers=headers)
        response_data = response.json()
        source_id = response_data["sourceId"]
        outputs = {**inputs, "id": source_id}
        return CreateResult(id_=source_id, outs=outputs)

    def update(self, id, olds, news):
        update_data = {**news, "sourceId": id}
        requests.patch(f"{AIRBYTE_API_URL}/sources/{id}", json=update_data)
        return UpdateResult(outs=news)

    def delete(self, id, props):
        requests.delete(f"{AIRBYTE_API_URL}/sources/{id}")


class DestinationProvider(ResourceProvider):
    def create(self, inputs):
        response = requests.post(f"{AIRBYTE_API_URL}/destinations", json=inputs)
        response_data = response.json()
        destination_id = response_data["destinationId"]
        outputs = {**inputs, "id": destination_id}
        return CreateResult(id_=destination_id, outs=outputs)

    def update(self, id, olds, news):
        update_data = {**news, "destinationId": id}
        requests.patch(f"{AIRBYTE_API_URL}/destinations/{id}", json=update_data)
        return UpdateResult(outs=news)

    def delete(self, id, props):
        requests.delete(f"{AIRBYTE_API_URL}/destinations/{id}")


class ConnectionProvider(ResourceProvider):
    def create(self, inputs):
        response = requests.post(f"{AIRBYTE_API_URL}/connections", json=inputs)
        response_data = response.json()
        connection_id = response_data["connectionId"]
        outputs = {**inputs, "id": connection_id}
        return CreateResult(id_=connection_id, outs=outputs)

    def update(self, id, olds, news):
        requests.patch(f"{AIRBYTE_API_URL}/connections/{id}", json=news)
        return UpdateResult(outs=news)

    def delete(self, id, props):
        requests.delete(f"{AIRBYTE_API_URL}/connections/{id}")
We have now defined three ResourceProvider types. In each of them, we define the logic to create, update, and delete resources.
Next, we need to define the Resource types that will use each of these providers. So we create a Source, a Destination, and a Connection resource. We will also create a more specific JiraSource to demonstrate that, if we want, we can create full Python objects to specify our resources and their properties.
class Source(Resource):
    def __init__(self, the_name, opts=None, **props):
        super().__init__(SourceProvider(), the_name, props, opts)


class Destination(Resource):
    def __init__(self, the_name, opts=None, **props):
        super().__init__(DestinationProvider(), the_name, props, opts)


class Connection(Resource):
    def __init__(self, the_name, opts=None, **props):
        super().__init__(ConnectionProvider(), the_name, props, opts)


class JiraSource(Source):
    def __init__(self, the_name, *, workspace, name, api_token, domain, email, opts=None):
        configuration = {"workspaceId": workspace, "name": name,
                         "configuration": {"sourceType": "jira",
                                           "api_token": api_token,
                                           "domain": domain,
                                           "email": email}}
        super().__init__(the_name, opts, **configuration)
You can see we extend the Resource class from Pulumi.
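For example, with these classes in place, declaring a Jira source could look like this (all values below are placeholders):
jira = JiraSource(
    "myJiraSource",                      # Pulumi resource name
    workspace="<airbyte-workspace-id>",  # Airbyte workspace to create the source in
    name="My Jira",                      # display name in Airbyte
    api_token="<jira-api-token>",
    domain="example.atlassian.net",
    email="me@example.com",
)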
We have now defined all the types we need. This is now a working provider. The next step is to define the actual resources.
We will define:
- A BigQuery dataset
- A GCP Service account with permissions on the dataset
- A Jira Airbyte source
- A BigQuery Airbyte destination
- A Jira -> BigQuery connection
Defining the GCP Resources
Pulumi already offers many ready-made providers. In particular, we will use the out-of-the-box GCP provider.
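One prerequisite the following code assumes: the pulumi-gcp, python-dotenv, and requests packages need to be installed in the project's virtualenv, for example:
pip install pulumi-gcp python-dotenv requests
Adding them to requirements.txt keeps the project reproducible.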
Let’s define the dataset and service account in a file named gcp_resources.py:
import pulumi
import pulumi_gcp as gcp
from dotenv import load_dotenv

load_dotenv()


def gcp_resources():
    config = pulumi.Config("pulumi_airbyte")
    bqowner = gcp.serviceaccount.Account("bqowner2",
                                         account_id="bqowner2",
                                         project=config.require_secret('gcpProject'))
    dataset = gcp.bigquery.Dataset("dataset",
                                   project=config.require_secret('gcpProject'),
                                   dataset_id="airbyte_pulumi_2",
                                   friendly_name="test",
                                   description="This is a test description",
                                   location="europe-west2",
                                   default_table_expiration_ms=3600000,
                                   labels={
                                       "env": "default",
                                   },
                                   accesses=[
                                       gcp.bigquery.DatasetAccessArgs(
                                           role="OWNER",
                                           user_by_email=bqowner.email,
                                       ),
                                   ])
    pulumi.export("jira_dataset", dataset.id)
    pulumi.export("jira_service_account", bqowner.id)
    return {"dataset": dataset}
Next, let’s define the Airbyte resources in a file named resources.py.
import pulumi
from provider.airbyte_provider import Source, Destination, Connection, JiraSource
from dotenv import load_dotenv
from gcp_resources import gcp_resources

load_dotenv()


def pulumi_execution(gcp_resources):
    config = pulumi.Config("pulumi_airbyte")
    dataset = gcp_resources["dataset"]
    workspace_id = config.require('airbyte_workspace')

    # Jira Source configuration
    jira_source_config = {"workspaceId": workspace_id, "name": "Personal Jira 8",
                          "configuration": {"sourceType": "jira",
                                            "api_token": config.require_secret('jiraApiToken'),
                                            "domain": config.require_secret('jira_domain'),
                                            "email": config.require_secret('jira_email')}}

    # BigQuery Destination configuration
    bigquery_destination_config = {"workspaceId": workspace_id,
                                   "name": "BigQuery Destination 3xxx",
                                   "configuration": {
                                       "destinationType": "bigquery",
                                       "dataset_location": "europe-west2",
                                       "project_id": config.require_secret('gcpProject'),
                                       "dataset_id": dataset.dataset_id,
                                       "credentials_json": config.require_secret('serviceAccountJson'),
                                       "loading_method": {
                                           "method": "Standard"
                                       }
                                   }}

    # Create Jira Source
    jira_source = Source("jiraSource", **jira_source_config)

    # Create BigQuery Destination
    bigquery_destination = Destination("bigQueryDestination", **bigquery_destination_config)

    # Create Connection linking Jira Source and BigQuery Destination
    connection_config = {
        "name": "The Connection 5",
        "sourceId": jira_source.id,
        "destinationId": bigquery_destination.id,
        "schedule": {"scheduleType": "manual"},
        "namespaceDefinition": "destination",
        "namespaceFormat": None,
        "nonBreakingSchemaUpdatesBehavior": "ignore",
        "configurations": {"streams": [
            {"syncMode": "full_refresh_overwrite",
             "name": "issues"}
        ]}}
    connection = Connection("jiraToBigQueryConnection2", **connection_config)

    pulumi.export("jira_source_id", jira_source.id)
    pulumi.export("bigquery_destination_id", bigquery_destination.id)
    pulumi.export("connection_id", connection.id)
A couple of things to note:
- The pulumi_execution function receives gcp_resources as a parameter. This is what links the GCP resources with the Airbyte resources.
- In the Destination config, the line "dataset_id": dataset.dataset_id is what actually does the linking.
We’ll now see how the two resource files are linked together.
For standard execution, Pulumi will look for a __main__.py file. Let’s define it here:
from resources import pulumi_execution
from gcp_resources import gcp_resources
gcp_stuff = gcp_resources()
pulumi_execution(gcp_stuff)
This is all the code needed. A couple of pieces worth noting in the code are the use of:
config.require_secret('jiraApiToken')
And:
config.require('airbyte_workspace')
These are set using more pulumi CLI commands. In particular:
pulumi config set pulumi_airbyte:airbyte_workspace "xxxxxxx"
for normal, non-secret configs, and:
pulumi config set --secret pulumi_airbyte:jira_email "xxx@gmail.com"
for secret, encrypted configurations.
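With the passphrase secrets provider, secret values end up encrypted in the stack file Pulumi.dev.yaml, so nothing sensitive is stored in plain text, which was one of the PoC goals. The entries look roughly like this (the ciphertext below is made up):
config:
  pulumi_airbyte:airbyte_workspace: xxxxxxx
  pulumi_airbyte:jira_email:
    secure: v1:Yt3Xq8...==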
Then we execute pulumi up to provision. (The following output is from my machine after I had already created everything, so you can see pulumi correctly says that there is nothing to change):
pulumi up
Enter your passphrase to unlock config/secrets
(set PULUMI_CONFIG_PASSPHRASE or PULUMI_CONFIG_PASSPHRASE_FILE to remember):
Enter your passphrase to unlock config/secrets
Previewing update (dev2):
Type Name Plan Info
pulumi:pulumi:Stack pulumi_airbyte-dev2 1 warning
Diagnostics:
pulumi:pulumi:Stack (pulumi_airbyte-dev2):
warning: unable to detect a global setting for GCP Project.
Pulumi will rely on per-resource settings for this operation.
Set the GCP Project by using:
`pulumi config set gcp:project <project>`
Resources:
9 unchanged
Do you want to perform this update? yes
Updating (dev2):
Type Name Status Info
pulumi:pulumi:Stack pulumi_airbyte-dev2 1 warning
Diagnostics:
pulumi:pulumi:Stack (pulumi_airbyte-dev2):
warning: unable to detect a global setting for GCP Project.
Pulumi will rely on per-resource settings for this operation.
Set the GCP Project by using:
`pulumi config set gcp:project <project>`
Outputs:
bigquery_destination_id: "5381d0a9-9fbe-49f6-873a-416cf956b4f4"
connection_id : "343c3846-d88d-444f-8697-0586cb8e80b7"
jira_dataset : "projects/[secret]/datasets/airbyte_pulumi_2"
jira_service_account : "projects/[secret]/serviceAccounts/bqowner2@[secret].iam.gserviceaccount.com"
jira_source_id : "42bd42fc-9e74-4854-a099-6523bda38df1"
Resources:
9 unchanged
Duration: 1s
In the Outputs section of this output, you can also see the results of all the pulumi.export calls in the code.
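That covers the remaining PoC goal: running this programmatically. Besides the CLI, Pulumi's Automation API can drive the same program from Python. A minimal sketch (the stack name and output key match this project; adapt them to yours):
from pulumi import automation as auto

# Select the existing stack in this project directory and run an update,
# equivalent to running `pulumi up` from the command line.
# PULUMI_CONFIG_PASSPHRASE must be set in the environment to decrypt the secrets.
stack = auto.select_stack(stack_name="dev2", work_dir=".")
up_result = stack.up(on_output=print)
print(up_result.outputs["connection_id"].value)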
The full example is here. Feel free to copy, clone, etc.