Writing a Simple Pulumi Provider for Airbyte

In this quick tutorial, I will introduce Pulumi by writing a very simple Provider for Airbyte.

The simplified use case is as follows: I want to programmatically (Infrastructure as Code) create, update, and delete Sources, Destinations, and Connections in Airbyte instead of using the UI.

Airbyte has its own Terraform Provider. However, at the time of writing, it is not entirely mature and can be difficult to debug, particularly if you are not used to writing your own Terraform providers.

Airbyte offers a great, simple REST API, which is in fact what the Terraform provider uses under the hood.

Instead of using the Terraform provider, I was planning to use the API directly, as it is easier to try, configure, and debug when errors occur. You simply use any REST or HTTP client (I was planning on using requests) and invoke the different API endpoints. For example, to create a BigQuery Destination, you would do an HTTP POST similar to the following (assuming you are connecting to api.airbyte.com):

import requests

url = "https://api.airbyte.com/v1/destinations"

# The destination definition; the project and credential values are placeholders
payload = {
    "configuration": {
        "destinationType": "bigquery",
        "dataset_location": "US",
        "loading_method": { "method": "Standard" },
        "project_id": "xxx",
        "dataset_id": "abc",
        "credentials_json": "jjjjjjj"
    },
    "name": "destination"
}
# A real call against api.airbyte.com also needs an Authorization header
# carrying your API credentials
headers = {
    "accept": "application/json",
    "content-type": "application/json"
}

response = requests.post(url, json=payload, headers=headers)

print(response.text)
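
A successful response comes back as JSON describing the new Destination, including its generated id. Roughly like this (the field values here are placeholders, not real output):

{
  "destinationId": "<generated-uuid>",
  "name": "destination",
  "workspaceId": "<workspace-id>",
  "configuration": { "destinationType": "bigquery", "...": "..." }
}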


That is all well and good: simple and easy to understand and debug. However, using Terraform has its advantages, the main one being state management. Using the Terraform provider, you declare your resources (Sources, Destinations, and Connections) and the provider makes sure to create them in Airbyte. If you modify the Terraform configuration, the provider makes the corresponding modification (PATCH HTTP requests) in Airbyte. If you delete the resource from the Terraform file, again the provider notices this and requests the deletion of the component in Airbyte.

So I wanted to see if there was a way to combine the best of both worlds: the developer-friendly simplicity of the Python code with something similar to the state management from Terraform.

Enter Pulumi, which allowed me to do exactly that, and in a very elegant, flexible, and secure way.

On its home page, Pulumi says:

Infrastructure as Code in any programming language …
Author infrastructure code using programming languages you know and love. Write statements to define infrastructure using your IDE with autocomplete, type checking, and documentation.

Awesome! That sounded promising. So I started on my PoC:

Setting Up the Project

brew install pulumi/tap/pulumi
mkdir pulumi_airbyte
cd pulumi_airbyte
pulumi login --local
pulumi new python


Follow the wizard. You should end up with a directory containing:

Pulumi.dev.yaml Pulumi.yaml __main__.py requirements.txt venv/
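
The Pulumi.yaml file describes the project. For a project created this way, it looks something like the following (exact contents vary slightly across Pulumi versions):

name: pulumi_airbyte
description: A minimal Python Pulumi program
runtime:
  name: python
  options:
    virtualenv: venv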


Writing the Provider

In Airbyte we define sources, destinations, and connections. We will create a very simple Pulumi dynamic provider that allows us to create these. The following code goes in provider/airbyte_provider.py (the path we import from later):

import pulumi
from pulumi.dynamic import Resource, ResourceProvider, CreateResult, UpdateResult
import requests

AIRBYTE_API_URL = "http://localhost:8006/v1"  # Update with your Airbyte API URL

HEADERS = {
    "accept": "application/json",
    "content-type": "application/json"
}


class SourceProvider(ResourceProvider):
    def create(self, inputs):
        # POST the source definition; Airbyte returns the generated sourceId
        response = requests.post(f"{AIRBYTE_API_URL}/sources", json=inputs, headers=HEADERS)
        response.raise_for_status()
        source_id = response.json()["sourceId"]
        outputs = {**inputs, "id": source_id}
        return CreateResult(id_=source_id, outs=outputs)

    def update(self, id, olds, news):
        # PATCH the existing source with the new inputs
        update_data = {**news, "sourceId": id}
        requests.patch(f"{AIRBYTE_API_URL}/sources/{id}", json=update_data, headers=HEADERS)
        return UpdateResult(outs=news)

    def delete(self, id, props):
        # The id in the URL path is all the DELETE endpoint needs
        requests.delete(f"{AIRBYTE_API_URL}/sources/{id}", headers=HEADERS)


class DestinationProvider(ResourceProvider):
    def create(self, inputs):
        response = requests.post(f"{AIRBYTE_API_URL}/destinations", json=inputs, headers=HEADERS)
        response.raise_for_status()
        destination_id = response.json()["destinationId"]
        outputs = {**inputs, "id": destination_id}
        return CreateResult(id_=destination_id, outs=outputs)

    def update(self, id, olds, news):
        update_data = {**news, "destinationId": id}
        requests.patch(f"{AIRBYTE_API_URL}/destinations/{id}", json=update_data, headers=HEADERS)
        return UpdateResult(outs=news)

    def delete(self, id, props):
        requests.delete(f"{AIRBYTE_API_URL}/destinations/{id}", headers=HEADERS)


class ConnectionProvider(ResourceProvider):
    def create(self, inputs):
        response = requests.post(f"{AIRBYTE_API_URL}/connections", json=inputs, headers=HEADERS)
        response.raise_for_status()
        connection_id = response.json()["connectionId"]
        outputs = {**inputs, "id": connection_id}
        return CreateResult(id_=connection_id, outs=outputs)

    def update(self, id, olds, news):
        response = requests.patch(f"{AIRBYTE_API_URL}/connections/{id}", json=news, headers=HEADERS)
        # Debug output, handy while developing the provider
        print(f"UPDATE response: {response.status_code} {response.text}")
        return UpdateResult(outs=news)

    def delete(self, id, props):
        requests.delete(f"{AIRBYTE_API_URL}/connections/{id}", headers=HEADERS)


We have now defined three ResourceProvider types, each implementing the logic to create, update, and delete its corresponding Airbyte resource.
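
The dynamic provider interface also lets you override diff to control when Pulumi decides an update is needed at all. This is optional and not part of this PoC, but a minimal sketch (the class name here is hypothetical) could look like:

from pulumi.dynamic import ResourceProvider, DiffResult


class SourceProviderWithDiff(ResourceProvider):
    def diff(self, id, olds, news):
        # Report a change only when the name or the configuration differs,
        # so Pulumi knows whether update() needs to be called at all
        changed = any(olds.get(k) != news.get(k) for k in ("name", "configuration"))
        return DiffResult(changes=changed)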

Next, we need to define the Resource types that will use each of these providers. So we create a Source, a Destination, and a Connection resource. We will also create a more specific JiraSource to demonstrate that, if we want, we can create full Python objects to specify our resources and their properties.

class Source(Resource):
    def __init__(self, the_name, opts=None, **props):
        super().__init__(SourceProvider(), the_name, props, opts)


class Destination(Resource):
    def __init__(self, the_name, opts=None, **props):
        super().__init__(DestinationProvider(), the_name, props, opts)


class Connection(Resource):
    def __init__(self, the_name, opts=None, **props):
        super().__init__(ConnectionProvider(), the_name, props, opts)


class JiraSource(Source):
    def __init__(self, the_name, *, workspace, name, api_token, domain, email, opts=None):
        configuration = {"workspaceId": workspace, "name": name,
                         "configuration": {"sourceType": "jira",
                                           "api_token": api_token,
                                           "domain": domain,
                                           "email": email}}
        super().__init__(the_name, opts, **configuration)


You can see we extend the Resource class from Pulumi.
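
For illustration, a JiraSource can then be instantiated like any Python object (all values below are placeholders):

jira = JiraSource(
    "myJiraSource",
    workspace="<workspace-id>",
    name="My Jira",
    api_token="<api-token>",
    domain="mycompany.atlassian.net",
    email="me@example.com",
)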

We have now defined all the types we need; this is a working provider. The next step is to define the actual resources.

We will define: a BigQuery dataset and a service account on GCP, then a Jira Source, a BigQuery Destination, and a Connection linking them.

Defining the GCP Resources

Pulumi offers a lot of ready-made providers. In particular, we will use the out-of-the-box GCP Provider.
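
To use it from Python, the pulumi-gcp package (and python-dotenv, which the files below use to load environment variables) must be installed, either via requirements.txt or directly:

pip install pulumi-gcp python-dotenv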

Let’s define the dataset and service account in a file named gcp_resources.py:

import pulumi
import pulumi_gcp as gcp
from dotenv import load_dotenv

load_dotenv()


def gcp_resources():
    config = pulumi.Config("pulumi_airbyte")
    bqowner = gcp.serviceaccount.Account(
        "bqowner2",
        account_id="bqowner2",
        project=config.require_secret('gcpProject'),
    )

    dataset = gcp.bigquery.Dataset(
        "dataset",
        project=config.require_secret('gcpProject'),
        dataset_id="airbyte_pulumi_2",
        friendly_name="test",
        description="This is a test description",
        location="europe-west2",
        default_table_expiration_ms=3600000,
        labels={
            "env": "default",
        },
        accesses=[
            gcp.bigquery.DatasetAccessArgs(
                role="OWNER",
                user_by_email=bqowner.email,
            ),
        ],
    )

    pulumi.export("jira_dataset", dataset.id)
    pulumi.export("jira_service_account", bqowner.id)
    return {"dataset": dataset}
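
Note that we set the GCP project per resource. As you will see in the pulumi up output later, Pulumi warns when no global GCP project is configured; if you prefer, you can set it once for the whole stack:

pulumi config set gcp:project <project>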


Next, let’s define the Airbyte resources in a file named resources.py.

import pulumi
from provider.airbyte_provider import Source, Destination, Connection, JiraSource
from dotenv import load_dotenv
from gcp_resources import gcp_resources

load_dotenv()


def pulumi_execution(gcp_resources):
    config = pulumi.Config("pulumi_airbyte")
    dataset = gcp_resources["dataset"]
    workspace_id = config.require('airbyte_workspace')

    # Jira Source configuration
    jira_source_config = {
        "workspaceId": workspace_id,
        "name": "Personal Jira 8",
        "configuration": {
            "sourceType": "jira",
            "api_token": config.require_secret('jiraApiToken'),
            "domain": config.require_secret('jira_domain'),
            "email": config.require_secret('jira_email'),
        },
    }

    # BigQuery Destination configuration
    bigquery_destination_config = {
        "workspaceId": workspace_id,
        "name": "BigQuery Destination 3xxx",
        "configuration": {
            "destinationType": "bigquery",
            "dataset_location": "europe-west2",
            "project_id": config.require_secret('gcpProject'),
            "dataset_id": dataset.dataset_id,
            "credentials_json": config.require_secret('serviceAccountJson'),
            "loading_method": {
                "method": "Standard"
            },
        },
    }

    # Create Jira Source
    jira_source = Source("jiraSource", **jira_source_config)

    # Create BigQuery Destination
    bigquery_destination = Destination("bigQueryDestination", **bigquery_destination_config)

    # Create Connection linking Jira Source and BigQuery Destination
    connection_config = {
        "name": "The Connection 5",
        "sourceId": jira_source.id,
        "destinationId": bigquery_destination.id,
        "schedule": {"scheduleType": "manual"},
        "namespaceDefinition": "destination",
        "namespaceFormat": None,
        "nonBreakingSchemaUpdatesBehavior": "ignore",
        "configurations": {
            "streams": [
                {
                    "syncMode": "full_refresh_overwrite",
                    "name": "issues"
                }
            ]
        },
    }
    connection = Connection("jiraToBigQueryConnection2", **connection_config)

    pulumi.export("jira_source_id", jira_source.id)
    pulumi.export("bigquery_destination_id", bigquery_destination.id)
    pulumi.export("connection_id", connection.id)


A thing to note: dataset.dataset_id is an Output of the GCP dataset resource, and we pass it straight into the destination configuration. Pulumi resolves the Output and orders creation accordingly, so the dependency between the GCP resources and the Airbyte resources is tracked for us.

We’ll now see how the two resource files are linked together.

For standard execution, Pulumi will look for a __main__.py file. Let’s define it here:

from resources import pulumi_execution
from gcp_resources import gcp_resources

# Create the GCP resources first, then pass them to the Airbyte resources
gcp_stuff = gcp_resources()
pulumi_execution(gcp_stuff)
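
Before provisioning anything, you can do a dry run to see what Pulumi would change:

pulumi preview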


This is all the code needed. A couple of things to note in the code are the use of:

config.require_secret('jiraApiToken')


And:

config.require('airbyte_workspace')


These values are set using the Pulumi CLI. In particular:

 pulumi config set pulumi_airbyte:airbyte_workspace "xxxxxxx"


for normal, non-secret configs, and:

pulumi config set --secret pulumi_airbyte:jira_email "xxx@gmail.com"


for secret, encrypted configurations.
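
After running these commands, the values are stored in the stack's configuration file, Pulumi.dev.yaml, with secrets kept encrypted. It ends up looking something like this (ciphertext shortened):

config:
  pulumi_airbyte:airbyte_workspace: xxxxxxx
  pulumi_airbyte:jira_email:
    secure: AAABAJt3...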

Then we execute pulumi up to provision. (The following output is from my machine after I had already created the resources, so you can see Pulumi correctly reports that there is nothing to change):

pulumi up
Enter your passphrase to unlock config/secrets
    (set PULUMI_CONFIG_PASSPHRASE or PULUMI_CONFIG_PASSPHRASE_FILE to remember):  
Enter your passphrase to unlock config/secrets
Previewing update (dev2):
     Type                 Name                 Plan     Info
     pulumi:pulumi:Stack  pulumi_airbyte-dev2           1 warning

Diagnostics:
  pulumi:pulumi:Stack (pulumi_airbyte-dev2):
    warning: unable to detect a global setting for GCP Project.
    Pulumi will rely on per-resource settings for this operation.
    Set the GCP Project by using:
        `pulumi config set gcp:project <project>`

Resources:
    9 unchanged

Do you want to perform this update? yes
Updating (dev2):
     Type                 Name                 Status     Info
     pulumi:pulumi:Stack  pulumi_airbyte-dev2             1 warning

Diagnostics:
  pulumi:pulumi:Stack (pulumi_airbyte-dev2):
    warning: unable to detect a global setting for GCP Project.
    Pulumi will rely on per-resource settings for this operation.
    Set the GCP Project by using:
        `pulumi config set gcp:project <project>`

Outputs:
    bigquery_destination_id: "5381d0a9-9fbe-49f6-873a-416cf956b4f4"
    connection_id          : "343c3846-d88d-444f-8697-0586cb8e80b7"
    jira_dataset           : "projects/[secret]/datasets/airbyte_pulumi_2"
    jira_service_account   : "projects/[secret]/serviceAccounts/bqowner2@[secret].iam.gserviceaccount.com"
    jira_source_id         : "42bd42fc-9e74-4854-a099-6523bda38df1"

Resources:
    9 unchanged

Duration: 1s


In the Outputs section of this output, you can also see the results of all the pulumi.export calls in the code.
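
These exported values can also be read back later from the CLI, for example:

pulumi stack output connection_id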

The full example is here. Feel free to copy, clone, etc.
