Curating Efficient Distributed Application Runtime (Dapr) Workflows

Distributed Application Runtime (Dapr) is a portable, event-driven runtime that abstracts away many of the problems developers face daily when building distributed systems and microservices.

Imagine a system with three or four different microservices. For the communication between these services, developers must think about:

  1. Handling failures and retries
  2. Observability (tracing calls across services)
  3. Access policies (who is allowed to call whom)
  4. Service discovery (locating the target service)

These challenges recur for every pair of services, but Dapr's Service-to-Service Invocation building block abstracts them away seamlessly.
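For instance, with the Dapr .NET SDK, calling another service is a single client call, and the sidecar handles retries, security, and tracing. A minimal sketch (the "orderservice" app ID, the "orders/123" route, and the Order type are hypothetical):

C#
 
using System.Net.Http;
using Dapr.Client;

// The sidecar resolves the target app ID and applies resiliency,
// security, and observability policies to the call.
var client = new DaprClientBuilder().Build();
var order = await client.InvokeMethodAsync<Order>(
    HttpMethod.Get, "orderservice", "orders/123");

public record Order(string Id, decimal Total);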

Dapr packages such capabilities as components, each of which is invoked through a building block, i.e., an API.

Components Overview

Below is a subset of the components that Dapr supports.

Service-to-Service Invocation: Facilitates communication between microservices; it encapsulates failure handling and observability, and applies policies (enforcing restrictions on who is allowed to call a service).
Secrets: Facilitates communication with cloud secret stores and Kubernetes secret stores.
Workflows: Lets developers run long-running workloads distributed across nodes.
Publish/Subscribe: Similar to the producer/consumer pattern; messages are published to a topic, and listeners consume them from the subscribed topic.
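As an illustration of the Publish/Subscribe building block, publishing a message with the .NET SDK is a single call. In this sketch, the "pubsub" component name and "orders" topic are assumptions:

C#
 
using Dapr.Client;

var client = new DaprClientBuilder().Build();

// Publish an event to the "orders" topic through the configured
// pub/sub component; subscribers consume it from the same topic.
await client.PublishEventAsync("pubsub", "orders", new { OrderId = 123 });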

Let's dive into the workflow component.

Workflow Component

Problem

A simple example of a workflow is a scheduled job that moves data between data sources. The complexity increases when child workflows must be triggered as part of the parent workflow, because the workflow author then also becomes responsible for saving, resuming, and maintaining the state and its schema.

With the Dapr Workflow component, most of the state management is abstracted out, allowing developers to focus only on the business logic.

Key Terms

Activity: The smallest unit of work, e.g., reading data from a blob.
Child workflow: A workflow scheduled by another (parent) workflow.

A workflow will comprise both of these: it orchestrates activities and, optionally, child workflows.

Workflow Patterns

Pattern 1

Workflow pattern 1

The parent workflow schedules multiple child activities in parallel (fan-out), as sketched below.
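A minimal sketch of this fan-out pattern; FanOutWorkflow and CountLinesActivity are hypothetical names used only for illustration:

C#
 
using Dapr.Workflow;

/// <summary>
/// Fan-out sketch: schedules one activity per blob in parallel and
/// waits for all results together.
/// </summary>
public class FanOutWorkflow : Workflow<List<string>, List<int>>
{
    public override async Task<List<int>> RunAsync(WorkflowContext context, List<string> blobNames)
    {
        // Schedule all activities first, then await them as a batch.
        var tasks = blobNames
            .Select(name => context.CallActivityAsync<int>(nameof(CountLinesActivity), name))
            .ToList();

        int[] counts = await Task.WhenAll(tasks);
        return counts.ToList();
    }
}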

Pattern 2

Workflow pattern 2

In this scenario, the workflow schedules Activity 1 and then passes its output to Activity 2 for further processing (activity chaining); see the sketch below.
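A minimal chaining sketch, where Activity1 and Activity2 are hypothetical activity classes:

C#
 
using Dapr.Workflow;

/// <summary>
/// Chaining sketch: the output of the first activity becomes the
/// input of the second.
/// </summary>
public class ChainingWorkflow : Workflow<string, string>
{
    public override async Task<string> RunAsync(WorkflowContext context, string input)
    {
        string intermediate = await context.CallActivityAsync<string>(nameof(Activity1), input);
        return await context.CallActivityAsync<string>(nameof(Activity2), intermediate);
    }
}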

Pattern 3

Workflow pattern 3

Here, the parent workflow schedules a child workflow, which in turn schedules its own activities; a sketch follows.
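A minimal sketch of the child-workflow pattern; ParentWorkflow and ChildWorkflow are hypothetical names:

C#
 
using Dapr.Workflow;

/// <summary>
/// Child-workflow sketch: the parent delegates part of the work to a
/// child workflow, which schedules its own activities.
/// </summary>
public class ParentWorkflow : Workflow<string, List<string>>
{
    public override async Task<List<string>> RunAsync(WorkflowContext context, string input)
    {
        // The child workflow gets its own instance ID and its own state.
        return await context.CallChildWorkflowAsync<List<string>>(
            nameof(ChildWorkflow), input);
    }
}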

Example

Let's explore an example using C# and Dapr to schedule workflows that read data from Blob storage.

Step 1

Add the Dapr package references to the project's .csproj file.

XML
 
<ItemGroup>
  <!-- https://www.nuget.org/packages/Dapr.AspNetCore -->
  <PackageReference Include="Dapr.AspNetCore" Version="1.14.0" />
  <!-- https://www.nuget.org/packages/Dapr.Workflow -->
  <PackageReference Include="Dapr.Workflow" Version="1.14.0" />
</ItemGroup>


Step 2: Configuring Workflow and Activity

  1. Add workflows and activities to the Dapr Workflow extension.
  2. RegisterWorkflow is used to register workflows.
  3. RegisterActivity is used to register activities.
C#
 
using Dapr.Workflow;
using Microsoft.Extensions.DependencyInjection;

/// <summary>
/// Configures the Dapr Workflow extension.
/// </summary>
public static class DaprConfigurationExtension
{
    /// <summary>
    /// Registers workflows and activities with the Dapr Workflow extension.
    /// </summary>
    /// <param name="services">The service collection.</param>
    /// <returns>IServiceCollection.</returns>
    public static IServiceCollection ConfigureDaprWorkflows(this IServiceCollection services)
    {
        services.AddDaprWorkflow(options =>
        {
            // Note that it's also possible to register a lambda function as the workflow
            // or activity implementation instead of a class.
            options.RegisterWorkflow<BlobOrchestrationWorkflow>();

            // These are the activities that get invoked by the Dapr workflow(s).
            options.RegisterActivity<BlobDataFetchActivity>();
        });

        return services;
    }
}

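The extension method can then be wired up at startup. A minimal sketch of a Program.cs, assuming an ASP.NET Core host:

C#
 
var builder = WebApplication.CreateBuilder(args);

// Register Dapr workflows and activities with the DI container.
builder.Services.ConfigureDaprWorkflows();

var app = builder.Build();
app.Run();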

Step 3: Writing the First Workflow

BlobOrchestrationWorkflow derives from the Workflow<TInput, TOutput> base class in the Dapr.Workflow NuGet package, with typed input and output parameters.

The input here is the name of the blob (a string), and the output is the content of the blob, i.e., a list of lines.

C#
 
using Dapr.Workflow;

/// <summary>
/// Dapr workflow responsible for performing operations on a blob.
/// </summary>
public class BlobOrchestrationWorkflow : Workflow<string, List<string>>
{
    /// <inheritdoc/>
    public override async Task<List<string>> RunAsync(WorkflowContext context, string input)
    {
        ArgumentNullException.ThrowIfNull(context);
        ArgumentNullException.ThrowIfNull(input);

        List<string> identifiers = await context.CallActivityAsync<List<string>>(
            name: nameof(BlobDataFetchActivity),
            input: input).ConfigureAwait(false); // state is saved

        return identifiers;
    }
}


Step 4: Writing the First Activity

Like a workflow, an activity also has typed input and output. In this case, the input is the blob name, and the output is the list of lines read from the blob.

C#
 
using Dapr.Workflow;

/// <summary>
/// Fetches identifiers from a blob. IBlobReadProcessor is an application-defined
/// abstraction (not a Dapr type); a sketch of it follows the activity.
/// </summary>
public class BlobDataFetchActivity : WorkflowActivity<string, List<string>>
{
    private readonly IBlobReadProcessor readProcessor;

    /// <summary>
    /// Initializes a new instance of the <see cref="BlobDataFetchActivity"/> class.
    /// </summary>
    /// <param name="blobReadProcessor">read blob data.</param>
    public BlobDataFetchActivity(IBlobReadProcessor blobReadProcessor)
    {
        this.readProcessor = blobReadProcessor;
    }

    /// <inheritdoc/>
    public override async Task<List<string>> RunAsync(WorkflowActivityContext context, string input)
    {
        return await this.readProcessor.ReadBlobContentAsync<List<string>>(input).ConfigureAwait(false); // state is saved
    }
}

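As noted above, IBlobReadProcessor is not part of Dapr; it is an application-level abstraction. A minimal sketch backed by the Azure.Storage.Blobs SDK might look like the following (the interface shape and the assumption that the blob holds JSON content are this example's, not Dapr's):

C#
 
using System.Text.Json;
using Azure.Storage.Blobs;

/// <summary>
/// Application-defined abstraction for reading blob content.
/// </summary>
public interface IBlobReadProcessor
{
    Task<T?> ReadBlobContentAsync<T>(string blobName);
}

/// <summary>
/// Minimal implementation that downloads a blob and deserializes
/// its JSON content into the requested type.
/// </summary>
public class BlobReadProcessor : IBlobReadProcessor
{
    private readonly BlobContainerClient containerClient;

    public BlobReadProcessor(BlobContainerClient containerClient)
    {
        this.containerClient = containerClient;
    }

    public async Task<T?> ReadBlobContentAsync<T>(string blobName)
    {
        // Download the blob and deserialize its JSON body into T.
        BlobClient blobClient = this.containerClient.GetBlobClient(blobName);
        var download = await blobClient.DownloadContentAsync().ConfigureAwait(false);
        return JsonSerializer.Deserialize<T>(download.Value.Content.ToString());
    }
}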

Step 5: Scheduling the First Workflow

C#
 
using Dapr.Workflow;

public class DaprService
{
    // Workflow client injected using dependency injection.
    private readonly DaprWorkflowClient daprWorkflowClient;

    /// <summary>
    /// Initializes a new instance of the <see cref="DaprService"/> class.
    /// </summary>
    /// <param name="daprWorkflowClient">Dapr workflow client.</param>
    public DaprService(DaprWorkflowClient daprWorkflowClient)
    {
        this.daprWorkflowClient = daprWorkflowClient;
    }

    /// <summary>
    /// Executes the Dapr workflow.
    /// </summary>
    /// <param name="message">The workflow input message.</param>
    /// <returns>Task.</returns>
    public async Task ExecuteWorkflowAsync(string message)
    {
        string id = Guid.NewGuid().ToString();

        // Schedule the Dapr workflow.
        await this.daprWorkflowClient.ScheduleNewWorkflowAsync(
            name: nameof(BlobOrchestrationWorkflow),
            instanceId: id,
            input: message).ConfigureAwait(false);

        WorkflowState state = await this.daprWorkflowClient.GetWorkflowStateAsync(
            instanceId: id,
            getInputsAndOutputs: true).ConfigureAwait(false);

        // Poll the workflow state until completion.
        while (!state.IsWorkflowCompleted)
        {
            // Brief pause to avoid a tight polling loop.
            await Task.Delay(TimeSpan.FromSeconds(1)).ConfigureAwait(false);

            state = await this.daprWorkflowClient.GetWorkflowStateAsync(
                instanceId: id,
                getInputsAndOutputs: true).ConfigureAwait(false);
        }
    }
}

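The Dapr workflow client also exposes WaitForWorkflowCompletionAsync, which can replace the hand-rolled polling loop in ExecuteWorkflowAsync; a sketch:

C#
 
// Blocks until the workflow instance reaches a terminal state.
WorkflowState completed = await this.daprWorkflowClient.WaitForWorkflowCompletionAsync(
    instanceId: id,
    getInputsAndOutputs: true).ConfigureAwait(false);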

Best Practices

Avoid volatile fields, such as timestamps, in workflow input payloads. Consider the following input:

JSON
 
{
  "blobName": "dapr-blob",
  "createdOn": "2024-12-11T23:00:00.11212Z"
}


After a restart, we resend the input with a different "createdOn" timestamp. Even though the output for this blob name has already been saved, the new timestamp makes this a new payload, prompting the output to be recomputed. If the "createdOn" timestamp were omitted, the saved state could be retrieved from the state store without the additional I/O call.

JSON
 
{
  "blobName": "dapr-blob",
  "createdOn": "2024-12-11T23:01:00.11212Z"
}


Workflow code itself must stay deterministic; any interaction with data other than the workflow's own state must happen through activities only.
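As an illustration (reusing the types from the example above), the blob read belongs in the activity, not in the workflow body:

C#
 
public override async Task<List<string>> RunAsync(WorkflowContext context, string input)
{
    // Don't: direct I/O in the workflow body re-executes on every replay
    // and breaks determinism.
    // var lines = await this.readProcessor.ReadBlobContentAsync<List<string>>(input);

    // Do: route I/O through an activity so the result is checkpointed
    // and replayed from saved state.
    return await context.CallActivityAsync<List<string>>(
        name: nameof(BlobDataFetchActivity),
        input: input).ConfigureAwait(false);
}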
