Microservices: Consistent State Propagation With Debezium Engine

This article outlines the main challenges of state propagation across different (micro)services, together with architectural patterns for addressing those challenges, and provides an actual implementation using the Debezium Engine.

But first of all, let's define some of the basic concepts that will be used during the article.

Setting the Stage

As a playground for this article, we will use two bounded contexts defined as part of an e-learning platform: Course Management and Rating System.

Between both contexts, there is an asynchronous, event-driven, relationship, so that whenever a course is created in the Course Management bounded context, an event is published and eventually received by the Rating System, which adapts the inbound event to its internal domain model using an anti-corruption layer (ACL) and creates an empty rating for that Course. The next image outlines this (simplified) context mapping: 

Bounded Context Mapping of the two services
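
The anti-corruption layer mentioned above can be sketched in a few lines. This is only an illustration and is not taken from the repository: the CourseCreatedMessage, Rating, and RatingAcl types and their fields are hypothetical names chosen for the example.

```java
import java.util.Objects;

// Hypothetical inbound message as published by Course Management.
record CourseCreatedMessage(String tenantId, String courseId, String title) {}

// Hypothetical Rating System domain object: a rating starts empty.
record Rating(String tenantId, String courseId, int votes, double average) {
    static Rating emptyFor(String tenantId, String courseId) {
        return new Rating(tenantId, courseId, 0, 0.0);
    }
}

// Anti-corruption layer: the only place that knows the upstream message shape.
final class RatingAcl {
    static Rating toEmptyRating(CourseCreatedMessage message) {
        Objects.requireNonNull(message, "message");
        // Only the identifiers cross the boundary; upstream-only fields such as the title are dropped.
        return Rating.emptyFor(message.tenantId(), message.courseId());
    }
}
```

Keeping the translation in one place means that a change in the upstream event format only affects the ACL, not the Rating System domain model.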


The Challenge

The scenario is apparently pretty simple: we can just publish a message (CourseCreated) to a message broker (e.g., RabbitMQ) whenever a Course is created, and by having the Rating System subscribed to that event, it will eventually be received and processed by the downstream service. However, it is not so simple, and we have several "what-if" situations, like:

  1. What if the Course creation transaction is eventually rolled back (e.g., the database does not accept the operation) but the message is correctly published to the message broker?
  2. What if the Course is created and persisted, but the message broker does not accept the message (or it is not available)?

In a nutshell, the main challenge here is how to ensure, in a distributed solution, that the domain state mutation and the corresponding event publication happen as a single consistent, atomic operation, so that either both happen or neither does.
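
To make the second "what-if" concrete, here is a minimal in-memory sketch of a naive dual write. The NaiveDualWrite class and its two lists (standing in for the database and the broker) are assumptions made for illustration only.

```java
import java.util.ArrayList;
import java.util.List;

// Naive dual write: the database and the broker are updated independently.
final class NaiveDualWrite {
    final List<String> database = new ArrayList<>(); // stands in for the course table
    final List<String> broker = new ArrayList<>();   // stands in for the message broker
    boolean brokerAvailable = true;

    void createCourse(String courseId) {
        database.add(courseId); // step 1: the state mutation is already committed
        if (!brokerAvailable) {
            // step 2 fails after step 1 succeeded: the event is lost
            throw new IllegalStateException("broker unavailable");
        }
        broker.add("CourseCreated:" + courseId);
    }
}
```

With the broker unavailable, the course ends up stored while no event is ever published, which is exactly the inconsistency we want to rule out.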

There are certainly solutions that can be implemented on the message producer or message consumer side to try to handle these situations, such as retries, publishing compensation events, or manually reverting the write operation in the database. However, most of them require software engineers to keep too many scenarios in mind, which is error-prone and reduces codebase maintainability.

Another alternative is implementing a two-phase commit (2PC) solution at the infrastructure level, which makes the deployment and operation of the underlying infrastructure more complex and, most likely, forces the adoption of expensive commercial solutions.

During the rest of the article, we will focus on a solution based on the combination of two important patterns in distributed systems: Transactional Outbox and Change Data Capture, providing a reference implementation that will allow software engineers to focus on what really matters: providing domain value.

Applicable Architecture Patterns

As described above, we need to ensure that state mutation and the publication of a domain event are atomic operations. This can be achieved by the combination of two patterns which are nicely explained by Chris Richardson in his must-read Microservices Patterns book; therefore, I will not explain them in detail here.

Transactional Outbox Pattern

Transactional Outbox focuses on persisting both state mutation and corresponding event(s) in an atomic database operation. In our case, we will leverage the ACID capabilities of a relational database, with one transaction that includes two write operations, one in the domain-specific table, and another that persists the events in an outbox (supporting) table.

This ensures that the domain state and the corresponding events are always consistent with each other. This is shown in the next figure:

Transactional Outbox Pattern
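
The idea can be sketched without a real database. The following in-memory simulation is an assumption-laden illustration (InMemoryDb, Tx, and CourseService are hypothetical names, and the "transaction" is just a pair of staging lists), but it shows the key property: the domain row and the outbox row become visible together or not at all.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// In-memory stand-in for a relational database with a domain table and an outbox table.
final class InMemoryDb {
    final List<String> courses = new ArrayList<>();
    final List<String> outbox = new ArrayList<>();

    // Writes staged in a transaction become visible only if the whole unit of work succeeds.
    static final class Tx {
        final List<String> stagedCourses = new ArrayList<>();
        final List<String> stagedOutbox = new ArrayList<>();
    }

    void inTransaction(Consumer<Tx> work) {
        Tx tx = new Tx();
        work.accept(tx);                  // an exception here discards all staged writes
        courses.addAll(tx.stagedCourses); // "commit": both tables change together
        outbox.addAll(tx.stagedOutbox);
    }
}

final class CourseService {
    private final InMemoryDb db;

    CourseService(InMemoryDb db) {
        this.db = db;
    }

    void createCourse(String courseId, boolean failBeforeCommit) {
        db.inTransaction(tx -> {
            tx.stagedCourses.add(courseId);
            tx.stagedOutbox.add("CourseCreated:" + courseId);
            if (failBeforeCommit) {
                throw new IllegalStateException("simulated constraint violation");
            }
        });
    }
}
```

In the real implementation, this guarantee comes from the ACID transaction of the relational database rather than from staging lists.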


Change Data Capture With Transaction Log Tailing

Once the events are available in the Outbox table, we need a mechanism to detect new events stored in the Outbox (Change Data Capture or CDC) and publish them to external consumers (Transaction Log Tailing Message Relay).

The Message Relay is responsible for detecting (i.e., CDC) new events available in the outbox table and publishing those events for external consumers via message broker (e.g., RabbitMQ, SNS + SQS) or event stream (e.g., Kafka, Kinesis).

There are different Change Data Capture (CDC) techniques for the Message Relay to detect new events available in the outbox. In this article, we will use the Log Scanners approach, named Transaction Log Tailing by Chris Richardson, where the Message Relay tails the database transaction log to detect the new messages that have been appended to the Outbox table. I personally prefer this approach since it reduces the amount of manual work, although it might not be available for all databases.
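
Conceptually, log tailing boils down to reading an append-only log from the last processed position. The sketch below is a deliberately simplified model, not how Debezium is implemented: TransactionLog and LogTailingRelay are hypothetical classes, and the log is a plain list of strings.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Simplified transaction log: an append-only list of outbox inserts.
final class TransactionLog {
    private final List<String> entries = new ArrayList<>();

    void append(String entry) {
        entries.add(entry);
    }

    // Returns the entries appended at or after the given position.
    List<String> readFrom(int offset) {
        return List.copyOf(entries.subList(offset, entries.size()));
    }
}

// Relay that tails the log starting from its last processed offset.
final class LogTailingRelay {
    private final TransactionLog log;
    private final Consumer<String> publisher;
    private int offset = 0; // position of the next unread entry

    LogTailingRelay(TransactionLog log, Consumer<String> publisher) {
        this.log = log;
        this.publisher = publisher;
    }

    // Publishes everything appended since the previous call and advances the offset.
    int poll() {
        List<String> fresh = log.readFrom(offset);
        for (String entry : fresh) {
            publisher.accept(entry);
            offset++; // advance only after the entry was handed to the publisher
        }
        return fresh.size();
    }
}
```

Because the offset only moves forward after an entry has been handed over, a failure in the publisher leaves the entry to be retried on the next poll.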

The next image illustrates how the Message Relay integrates with the Transactional Outbox:

Transactional Outbox in combination with Transaction Log Tailing CDC


One of the main goals of this solution is to ensure that the software engineers of the two bounded contexts only need to focus on the elements with orange color in the diagram above; the grey components are just infrastructure elements that shall be transparent for the developers.

So, how do we implement the Transaction Log Tailing Message Relay?

Debezium

Debezium is a Log Scanner type change data capture solution that provides connectors for several databases, creating a stream of messages out of the changes detected in the database's transaction log. Debezium comes in two flavors:

In this example, we will use Debezium Embedded because of its simplicity (i.e., no Kafka instance is needed), while still being robust enough to provide a suitable solution.

The first time a Debezium instance starts to track a database, it takes a snapshot of the current data to be used as a baseline. Once the snapshot is completed, only the changes after the latest stored offset are processed.

Debezium is highly configurable, making it possible to shape its behavior to meet different expectations, allowing, for instance, to define:

Some of these properties will be analyzed later in the article.

All Pieces Together

The next image shows the overall solution from the deployment perspective:

Overall solution from the deployment perspective

Show Me the Code

All the code described in this article can be found in my personal GitHub repository. 

Overall Project Structure

The code provided to implement this example is structured as a multi-module Java Maven project, leveraging Spring Boot and following a hexagonal architecture-like structure.

There are three main package groups:

1. Toolkit Supporting Context

Modules providing shared components and infrastructure-related elements used by the functional bounded contexts (in this example Course Management and Rating System). For instance, the transactional outbox and the Debezium-based change data capture are shared concerns, and therefore their code belongs to these modules.

Toolkit Supporting Context

Where:

2. Course Management Bounded Context

These modules make up the Course Management bounded context. The modules adhere to hexagonal architecture principles, similar to the structure already used in my previous article about repository testing.

Course Management Bounded Context

Where:

3. Rating System Bounded Context

For the sake of simplicity, this bounded context is only partially implemented, with just an inbound AMQP-based adapter for receiving the messages created by the Course Management service when a new course is created and published by the CDC service (toolkit-state-propagation-debezium-runtime).

Rating System Bounded Context

Where:

Request Flow

This section outlines the flow of a request for creating a course, starting when the user requests the creation of a course through the backend API and ending with the (consistent) publication of the corresponding event in the message broker.

This flow has been split into three phases.

Phase 1: State Mutation and Domain Events Creation

This phase starts with the request for creating a new Course Definition. The HTTP POST request is mapped to a domain command and processed by its corresponding command handler defined in course-management-application. Command handlers are automatically injected into the provided CommandBus implementation; in this example, the CommandBusInProcess defined in the toolkit-core module:

Java
 
import io.twba.tk.command.CommandHandler;
import jakarta.inject.Inject;
import jakarta.inject.Named;

@Named
public class CreateCourseDefinitionCommandHandler implements CommandHandler<CreateCourseDefinitionCommand> {

    private final CourseDefinitionRepository courseDefinitionRepository;

    @Inject
    public CreateCourseDefinitionCommandHandler(CourseDefinitionRepository courseDefinitionRepository) {
        this.courseDefinitionRepository = courseDefinitionRepository;
    }

    @Override
    public void handle(CreateCourseDefinitionCommand command) {
        if(!courseDefinitionRepository.existsCourseDefinitionWith(command.getTenantId(), command.getCourseDescription().title())) {
            courseDefinitionRepository.save(CourseDefinition.builder(command.getTenantId())
                    .withCourseDates(command.getCourseDates())
                    .withCourseDescription(command.getCourseDescription())
                    .withCourseObjective(command.getCourseObjective())
                    .withDuration(command.getCourseDuration())
                    .withTeacherId(command.getTeacherId())
                    .withPreRequirements(command.getPreRequirements())
                    .withCourseId(command.getCourseId())
                    .createNew());
        }
        else {
            throw new IllegalStateException("Course definition with value " + command.getCourseDescription().title() + " already exists");
        }
    }
}


The command handler creates an instance of the CourseDefinition entity. The business logic and invariants (if any) of creating a Course Definition are encapsulated within the domain entity. The creation of a new instance of the domain entity also comes with the corresponding CourseDefinitionCreated domain event:

Java
 
@Getter
public class CourseDefinition extends MultiTenantEntity {
    /* ... */
    public static class CourseDefinitionBuilder {

        /* ... */

        public CourseDefinition createNew() {
            //new instance, generate domain event
            CourseDefinition courseDefinition = new CourseDefinition(tenantId, 0L, id, courseDescription, courseObjective, preRequirements, duration, teacherId, courseDates, CourseStatus.PENDING_TO_REVIEW);
            var courseDefinitionCreatedEvent = CourseDefinitionCreatedEvent.triggeredFrom(courseDefinition);
            courseDefinition.record(courseDefinitionCreatedEvent); //record event in memory
            return courseDefinition;
        }
    }
}


The event is "recorded" into the created course definition instance. The record method is defined in the abstract class Entity of the toolkit-core module:

Java
 
public abstract class Entity extends ModelValidator implements ConcurrencyAware {
    @NotNull
    @Valid
    protected final List<@NotNull Event<? extends DomainEventPayload>> events;
    private Long version;

    public Entity(Long version) {
        this.events = new ArrayList<>();
        this.version = version;
    }

    /*...*/

    protected void record(Event<? extends DomainEventPayload> event) {
        event.setAggregateType(aggregateType());
        event.setAggregateId(aggregateId());
        event.setEventStreamVersion(Objects.isNull(version)?0:version + events.size());
        this.events.add(event);
    }


    public List<Event<? extends DomainEventPayload>> getDomainEvents() {
        return Collections.unmodifiableList(events);
    }
}


Once the course definition instance is in place, the command handler will persist the instance in the course definition repository, starting the second phase of the processing flow.

Phase 2, Persisting the State: Course Definition and Events in the Outbox

Whenever a domain entity is saved in the repository, the domain events associated with the domain state mutation (in this example, the creation of a CourseDefinition entity) are temporarily appended to an in-memory ThreadLocal buffer. This buffer resides in the DomainEventAppender of toolkit-core.

Java
 
public class DomainEventAppender {

    private final ThreadLocal<List<Event<? extends DomainEventPayload>>> eventsToPublish = new ThreadLocal<>();

    /*...*/

    public void append(List<Event<? extends DomainEventPayload>> events) {
        //add the event to the buffer, later this event will be published to other bounded contexts
        if(isNull(eventsToPublish.get())) {
            resetBuffer();
        }
        //ensure event is not already in buffer
        events.stream().filter(this::notInBuffer).map(this::addEventSourceMetadata).forEach(event -> eventsToPublish.get().add(event));
    }
    /*...*/
}
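
To illustrate why a ThreadLocal buffer fits here, the following simplified sketch (not the toolkit code; ThreadLocalEventBuffer is a hypothetical class and events are plain strings) shows that each thread only ever sees the events it appended itself, and that duplicates are skipped:

```java
import java.util.ArrayList;
import java.util.List;

// Simplified per-thread event buffer; events are plain strings identified by their value.
final class ThreadLocalEventBuffer {
    private final ThreadLocal<List<String>> buffer = ThreadLocal.withInitial(ArrayList::new);

    void append(List<String> events) {
        List<String> current = buffer.get();
        for (String event : events) {
            if (!current.contains(event)) { // skip events already buffered by this thread
                current.add(event);
            }
        }
    }

    // Returns the buffered events of the calling thread and clears its buffer.
    List<String> drain() {
        List<String> drained = List.copyOf(buffer.get());
        buffer.remove();
        return drained;
    }
}
```

Since a request is typically processed by a single thread, events buffered while handling one command cannot leak into a command handled concurrently by another thread, as long as the buffer is drained or cleared once the command completes.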


Events are placed in this buffer from an aspect executed around methods annotated with AppendEvents. The pointcut and aspect (both in toolkit-core) look like: 

Java
 
public class CrossPointcuts {

    @Pointcut("@annotation(io.twba.tk.core.AppendEvents)")
    public void shouldAppendEvents() {}
}

@Aspect
@Named
public class DomainEventAppenderConcern {

    private final DomainEventAppender domainEventAppender;

    @Inject
    public DomainEventAppenderConcern(DomainEventAppender domainEventAppender) {
        this.domainEventAppender = domainEventAppender;
    }

    @After(value = "io.twba.tk.aspects.CrossPointcuts.shouldAppendEvents()")
    public void appendEventsToBuffer(JoinPoint jp) {
        Object[] args = jp.getArgs();
        // Guard against advised methods without arguments or with a null first argument
        if (args.length > 0 && args[0] instanceof Entity entity) {
            domainEventAppender.append(entity.getDomainEvents());
        }
    }
}


The command handlers are automatically decorated before being "injected" into the command bus. One of the decorators ensures that the command handlers are transactional, and another ensures that, when the transaction completes, the events in the in-memory thread-local buffer are written to the outbox table consistently with the ongoing transaction. The next sequence diagram shows the decorators applied to the domain-specific command handler.

High-Level Processing Flow
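
The decoration described above can be sketched as follows. This is a simplified illustration under stated assumptions: the Handler interface, OutboxAppendingHandler, and TransactionalHandler are hypothetical names (the toolkit's own CommandHandler and decorators may look different), and the transaction is simulated with a trace list.

```java
import java.util.List;

// Hypothetical minimal handler contract; the toolkit's CommandHandler may differ.
interface Handler<C> {
    void handle(C command);
}

// Inner decorator: after the domain handler succeeds, append buffered events to the outbox.
final class OutboxAppendingHandler<C> implements Handler<C> {
    private final Handler<C> delegate;
    private final List<String> buffer; // events recorded while handling the command
    private final List<String> trace;

    OutboxAppendingHandler(Handler<C> delegate, List<String> buffer, List<String> trace) {
        this.delegate = delegate;
        this.buffer = buffer;
        this.trace = trace;
    }

    @Override
    public void handle(C command) {
        try {
            delegate.handle(command);
            buffer.forEach(event -> trace.add("outbox <- " + event));
        } finally {
            buffer.clear(); // never leak events into the next command
        }
    }
}

// Outer decorator: wraps everything, including the outbox write, in one (simulated) transaction.
final class TransactionalHandler<C> implements Handler<C> {
    private final Handler<C> delegate;
    private final List<String> trace;

    TransactionalHandler(Handler<C> delegate, List<String> trace) {
        this.delegate = delegate;
        this.trace = trace;
    }

    @Override
    public void handle(C command) {
        trace.add("begin");
        try {
            delegate.handle(command);
            trace.add("commit");
        } catch (RuntimeException e) {
            trace.add("rollback");
            throw e;
        }
    }
}
```

The ordering matters: because the transactional decorator is the outermost one, the outbox write happens before the commit and is therefore part of the same transaction as the domain state change.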


The outbox is an append-only buffer (as a Postgres table in this example) where a new entry for each event is added. The outbox entry has the following structure: 

Java
 
public record OutboxMessage(String uuid,
                            String header,
                            String payload,
                            String type,
                            long epoch,
                            String partitionKey,
                            String tenantId,
                            String correlationId,
                            String source,
                            String aggregateId) {
}


The actual event payload is serialized as a JSON string in the payload property. The outbox interface is straightforward:

Java
 
public interface Outbox {
    void appendMessage(OutboxMessage outboxMessage);
    int partitionFor(String partitionKey);
}


The Postgres implementation of the Outbox interface is placed in the toolkit-outbox-jpa-postgres module: 

Java
 
public class OutboxJpa implements Outbox {

    private final OutboxProperties outboxProperties;
    private final OutboxMessageRepositoryJpaHelper helper;

    @Autowired
    public OutboxJpa(OutboxProperties outboxProperties, OutboxMessageRepositoryJpaHelper helper) {
        this.outboxProperties = outboxProperties;
        this.helper = helper;
    }

    @Override
    public void appendMessage(OutboxMessage outboxMessage) {
        helper.save(toJpa(outboxMessage));
    }

    @Override
    public int partitionFor(String partitionKey) {
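        // floorMod keeps the result in [0, numPartitions) even when the 32-bit hash is negative
        return Math.floorMod(MurmurHash3.hash32x86(partitionKey.getBytes()), outboxProperties.getNumPartitions());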
    }
    
    /*...*/
}
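
One detail worth keeping in mind when mapping a hash to a partition: a 32-bit hash is signed, and Java's % operator keeps the sign of the dividend. The following sketch shows a partition function that always stays within range. It is an illustration only: Partitioner is a hypothetical class, and Arrays.hashCode is used as a stand-in for MurmurHash3 to avoid extra dependencies.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

final class Partitioner {
    // Stand-in hash (assumption): Arrays.hashCode instead of MurmurHash3.
    static int hash(String key) {
        return Arrays.hashCode(key.getBytes(StandardCharsets.UTF_8));
    }

    // For a positive numPartitions, floorMod always returns a value in [0, numPartitions).
    static int partitionFor(String partitionKey, int numPartitions) {
        return Math.floorMod(hash(partitionKey), numPartitions);
    }
}
```

For example, -7 % 4 evaluates to -3 in Java, whereas Math.floorMod(-7, 4) evaluates to 1, which is a valid partition index.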


Phase 2 is now completed: both the domain state and the corresponding event have been persisted consistently under the same transaction in our Postgres database.

Phase 3: Events Publication

In order to make the domain events generated in the previous phase available to external consumers, the message relay implementation based on Debezium Embedded is monitoring the outbox table, so that whenever a new record is added to the outbox, the message relay creates a Cloud Event and publishes it to the RabbitMQ instance following the Cloud Event AMQP binding specification.

The following code snippet shows the Message Relay specification as defined in the toolkit-core module:

Java
 
public class DebeziumMessageRelay implements MessageRelay {

    private static final Logger log = LoggerFactory.getLogger(DebeziumMessageRelay.class);

    private final Executor executor = Executors.newSingleThreadExecutor(r -> new Thread(r, "debezium-message-relay"));
    private final CdcRecordChangeConsumer recordChangeConsumer;
    private final DebeziumEngine<RecordChangeEvent<SourceRecord>> debeziumEngine;

    public DebeziumMessageRelay(DebeziumProperties debeziumProperties,
                                CdcRecordChangeConsumer recordChangeConsumer) {
        this.debeziumEngine = DebeziumEngine.create(ChangeEventFormat.of(Connect.class))
                .using(DebeziumConfigurationProvider.outboxConnectorConfig(debeziumProperties).asProperties())
                .notifying(this::handleChangeEvent)
                .build();
        this.recordChangeConsumer = recordChangeConsumer;
    }

    private void handleChangeEvent(RecordChangeEvent<SourceRecord> sourceRecordRecordChangeEvent)  {
        SourceRecord sourceRecord = sourceRecordRecordChangeEvent.record();
        Struct sourceRecordChangeValue= (Struct) sourceRecord.value();
        log.info("Received record - Key = '{}' value = '{}'", sourceRecord.key(), sourceRecord.value());
        Struct struct = (Struct) sourceRecordChangeValue.get(AFTER);
        recordChangeConsumer.accept(DebeziumCdcRecord.of(struct));
    }

    @Override
    public void start() {
        this.executor.execute(debeziumEngine);
        log.info("Debezium CDC started");
    }

    @Override
    public void stop() throws IOException {
        if (this.debeziumEngine != null) {
            this.debeziumEngine.close();
        }
    }

    @Override
    public void close() throws Exception {
        stop();
    }
}


As can be seen in the code snippet above, the DebeziumEngine is configured to notify a private method handleChangeEvent when a change in the database is detected. In this method, a Consumer of CdcRecord is used as a wrapper of the internal Debezium model represented by the Struct class. Initial configuration must be provided to the Debezium Engine; in the example, this is done with the DebeziumConfigurationProvider class: 

Java
 
public class DebeziumConfigurationProvider {

    public static io.debezium.config.Configuration outboxConnectorConfig(DebeziumProperties properties) {
        return withCustomProps(withStorageProps(io.debezium.config.Configuration.create()
                .with("name", "outbox-connector")
                .with("connector.class", properties.getConnectorClass())
                .with("offset.storage", properties.getOffsetStorage().getType()), properties.getOffsetStorage())
                .with("offset.flush.interval.ms", properties.getOffsetStorage().getFlushInterval())
                .with("database.hostname", properties.getSourceDatabaseProperties().getHostname())
                .with("database.port", properties.getSourceDatabaseProperties().getPort())
                .with("database.user", properties.getSourceDatabaseProperties().getUser())
                .with("database.password", properties.getSourceDatabaseProperties().getPassword())
                .with("database.dbname", properties.getSourceDatabaseProperties().getDbName()), properties)
                .with("database.server.id", properties.getSourceDatabaseProperties().getServerId())
                .with("database.server.name", properties.getSourceDatabaseProperties().getServerName())
                .with("skipped.operations", "u,d,t")
                .with("include.schema.changes", "false")
                .with("table.include.list", properties.getSourceDatabaseProperties().getOutboxTable())
                .with("snapshot.include.collection.list", properties.getSourceDatabaseProperties().getOutboxTable())
                .build();
    }

    private static Configuration.Builder withStorageProps(Configuration.Builder debeziumConfigBuilder, DebeziumProperties.OffsetProperties offsetProperties) {
        offsetProperties.getOffsetProps().forEach(debeziumConfigBuilder::with);
        return debeziumConfigBuilder;
    }
    private static Configuration.Builder withCustomProps(Configuration.Builder debeziumConfigBuilder, DebeziumProperties debeziumProperties) {
        debeziumProperties.getCustomProps().forEach(debeziumConfigBuilder::with);
        return debeziumConfigBuilder;
    }
}


The most relevant properties are outlined below:

The first thing Debezium will do after starting is take a snapshot of the current data and generate the corresponding events. After that, the offset is updated to the latest record, and the deltas (newly added, updated, or deleted records) are processed, updating the offset accordingly. Since the provided example uses an in-memory offset store, the snapshot is always performed when the service starts. Therefore, since this is not a production-ready implementation yet, there are two options:

The Message Relay is configured and initialized in the toolkit-state-propagation-debezium-runtime module. The values of the configuration properties needed by Debezium Embedded are defined in the Spring Boot properties.yaml file:

YAML
 
server:
  port: 9091

management:
  endpoints:
    web:
      exposure:
        include: prometheus, health, flyway, info
debezium:
  connector-class: "io.debezium.connector.postgresql.PostgresConnector"
  custom-props:
    "[topic.prefix]": "embedded-debezium"
    "[debezium.source.plugin.name]": "pgoutput"
    "[plugin.name]": "pgoutput"
  source-database-properties:
    db-name: "${CDC_DB_NAME}"
    hostname: "${CDC_HOST}"
    user: "${CDC_DB_USER}"
    password: "${CDC_DB_PASSWORD}"
    port: 5432
    server-name: "debezium-message-relay"
    server-id: "debezium-message-relay-1"
    outbox-table: "${CDC_OUTBOX_TABLE:outbox_schema.outbox}"
    outbox-schema: ""
  offset-storage:
    type: "org.apache.kafka.connect.storage.MemoryOffsetBackingStore"
    flush-interval: 3000
    offset-props:
      "[offset.flush.timeout.ms]": 1000
      "[max.poll.records]": 1000
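
The configuration above keeps offsets in memory (MemoryOffsetBackingStore), so they are lost on every restart. As a hedged sketch of a more durable setup, the engine properties could point to Kafka Connect's file-based offset store instead. The DurableOffsetConfig helper and the file path are assumptions for illustration; in this project the values would be supplied through the offset-storage section of the YAML rather than built by hand.

```java
import java.util.Properties;

final class DurableOffsetConfig {
    // Builds engine properties that keep offsets in a local file instead of memory.
    static Properties fileBackedOffsets(String offsetFile, long flushIntervalMs) {
        Properties props = new Properties();
        props.setProperty("offset.storage", "org.apache.kafka.connect.storage.FileOffsetBackingStore");
        props.setProperty("offset.storage.file.filename", offsetFile);
        props.setProperty("offset.flush.interval.ms", Long.toString(flushIntervalMs));
        return props;
    }
}
```

With a file-backed store, the file has to live on storage that survives restarts of the service (for instance, a mounted volume when running in a container); otherwise the behavior is the same as with the in-memory store.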


The engine is started using the Spring Boot lifecycle events:

Java
 
@Component
public class MessageRelayInitializer implements ApplicationListener<ApplicationReadyEvent> {

    private final MessageRelay messageRelay;

    @Autowired
    public MessageRelayInitializer(MessageRelay messageRelay) {
        this.messageRelay = messageRelay;
    }

    @Override
    public void onApplicationEvent(ApplicationReadyEvent event) {
        messageRelay.start();
    }
}


The change data capture records (CdcRecord) are processed by the CloudEventRecordChangeConsumer, which creates the Cloud Event representation of the CDC record and publishes it through the MessagePublisher:

Java
 
public class CloudEventRecordChangeConsumer implements CdcRecordChangeConsumer {

    /*...*/

    @Override
    public void accept(CdcRecord cdcRecord) {

        final CloudEvent event;

        try {
            String payload = cdcRecord.valueOf("payload");
            String uuid = cdcRecord.valueOf("uuid");
            String type = cdcRecord.valueOf("type");
            String tenantId = cdcRecord.valueOf("tenant_id");
            String aggregateId = cdcRecord.valueOf("aggregate_id");
            long epoch = cdcRecord.valueOf("epoch");
            String partitionKey = cdcRecord.valueOf("partition_key");
            String source = cdcRecord.valueOf("source");
            String correlationId = cdcRecord.valueOf("correlation_id");

            event = new CloudEventBuilder()
                    .withId(uuid)
                    .withType(type)
                    .withSubject(aggregateId)
                    .withExtension(TwbaCloudEvent.CLOUD_EVENT_TENANT_ID, tenantId)
                    .withExtension(TwbaCloudEvent.CLOUD_EVENT_TIMESTAMP, epoch)
                    .withExtension(TwbaCloudEvent.CLOUD_EVENT_PARTITION_KEY, partitionKey)
                    .withExtension(TwbaCloudEvent.CLOUD_EVENT_CORRELATION_ID, correlationId)
                    .withExtension(TwbaCloudEvent.CLOUD_EVENT_GENERATING_APP_NAME, source)
                    .withSource(URI.create("https://thewhiteboardarchitect.com/" + source))
                    .withData("application/json",payload.getBytes("UTF-8"))
                    .build();

            messagePublisher.publish(event);

        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}


The provided MessagePublisher is a simple RabbitMQ outbound adapter converting the Cloud Event to the corresponding AMQP message as per the Cloud Event AMQP protocol binding. 

Java
 
public class MessagePublisherRabbitMq implements MessagePublisher {

    /*...*/

    @Override
    public boolean publish(CloudEvent dispatchedMessage) {

        MessageProperties messageProperties = new MessageProperties();
        messageProperties.setContentType(MessageProperties.CONTENT_TYPE_JSON);

        rabbitTemplate.send("__MR__" + dispatchedMessage.getExtension(CLOUD_EVENT_GENERATING_APP_NAME), //custom extension for message routing
                dispatchedMessage.getType(),
                toAmqpMessage(dispatchedMessage));

        return true;
    }

    private static Message toAmqpMessage(CloudEvent dispatchedMessage) {
        return MessageBuilder.withBody(Objects.nonNull(dispatchedMessage.getData()) ? dispatchedMessage.getData().toBytes() : new byte[0])
                .setContentType(MessageProperties.CONTENT_TYPE_JSON)
                .setMessageId(dispatchedMessage.getId())
                .setHeader(CLOUD_EVENT_AMQP_BINDING_PREFIX + CLOUD_EVENT_TENANT_ID, dispatchedMessage.getExtension(CLOUD_EVENT_TENANT_ID))
                .setHeader(CLOUD_EVENT_AMQP_BINDING_PREFIX + CLOUD_EVENT_TIMESTAMP, dispatchedMessage.getExtension(CLOUD_EVENT_TIMESTAMP))
                .setHeader(CLOUD_EVENT_AMQP_BINDING_PREFIX + CLOUD_EVENT_PARTITION_KEY, dispatchedMessage.getExtension(CLOUD_EVENT_PARTITION_KEY))
                .setHeader(CLOUD_EVENT_AMQP_BINDING_PREFIX + CLOUD_EVENT_SUBJECT, dispatchedMessage.getSubject())
                .setHeader(CLOUD_EVENT_AMQP_BINDING_PREFIX + CLOUD_EVENT_SOURCE, dispatchedMessage.getSource().toString())
                .build();
    }
}


After Phase 3, the events are published to the producer's (Course Management Service) message relay RabbitMQ exchange, named following the convention __MR__<APP_NAME> (in our example, __MR__course-management). Messages are routed to the right exchange based on a custom cloud event extension, as shown in the previous code snippet.
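
The naming convention can be captured in a tiny helper. The Route record below is hypothetical and only mirrors what the publisher snippet does inline: the exchange is derived from the generating application name, and the event type is used as the routing key.

```java
// Hypothetical value object describing where a relayed message is sent.
record Route(String exchange, String routingKey) {
    static final String RELAY_EXCHANGE_PREFIX = "__MR__";

    // Exchange name follows the __MR__<APP_NAME> convention; the routing key is the event type.
    static Route forEvent(String generatingAppName, String eventType) {
        return new Route(RELAY_EXCHANGE_PREFIX + generatingAppName, eventType);
    }
}
```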

__MR__course-management

Visit my GitHub repository and check the readme file to see how to spin up the example.

Alternative Solutions

This example makes use of Debezium Embedded to provide a change data capture and message relay solution. This works fine for technologies supported by Debezium through its connectors.

For non-supported providers, alternative approaches can be applied:

Adding any of those alternatives to this example would simply require providing a specific implementation of the MessageRelay interface, without additional changes in any of the other services.
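
As one possible illustration, a polling-based relay could look like the sketch below. Everything here is an assumption made for the example: the MessageRelay interface is reduced to the methods that DebeziumMessageRelay overrides in this article, OutboxRow and PollingMessageRelay are hypothetical, and the outbox table is modeled as an in-memory list.

```java
import java.io.IOException;
import java.util.List;
import java.util.function.Consumer;

// Assumed shape of the relay contract, inferred from the methods DebeziumMessageRelay overrides.
interface MessageRelay extends AutoCloseable {
    void start();
    void stop() throws IOException;
}

// Hypothetical outbox row with a flag telling whether it was already relayed.
final class OutboxRow {
    final String payload;
    boolean published;

    OutboxRow(String payload) {
        this.payload = payload;
    }
}

final class PollingMessageRelay implements MessageRelay {
    private final List<OutboxRow> outboxTable; // stands in for the outbox table
    private final Consumer<String> publisher;
    private boolean running;

    PollingMessageRelay(List<OutboxRow> outboxTable, Consumer<String> publisher) {
        this.outboxTable = outboxTable;
        this.publisher = publisher;
    }

    @Override
    public void start() {
        running = true;
    }

    @Override
    public void stop() {
        running = false;
    }

    @Override
    public void close() {
        stop();
    }

    // One polling cycle; a real relay would call this periodically from a scheduler.
    int pollOnce() {
        if (!running) {
            return 0;
        }
        int relayed = 0;
        for (OutboxRow row : outboxTable) {
            if (!row.published) {
                publisher.accept(row.payload);
                row.published = true; // mark only after the publisher accepted the payload
                relayed++;
            }
        }
        return relayed;
    }
}
```

Compared with log tailing, polling works with any database that can be queried, at the cost of extra load on the outbox table and higher publication latency.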

Conclusion

Ensuring consistency in the state propagation and data exchange between services is key for providing a reliable distributed solution. Usually, this is not carefully considered when designing distributed, event-driven software, leading to undesired states and situations especially when those systems are already in production.

By combining the transactional outbox pattern with a message relay, we have seen how this consistency can be enforced. By using the hexagonal architecture style, the additional complexity of implementing those patterns can be easily hidden and reused across bounded context implementations.

The code of this article is not yet production-ready, as concerns like observability, retries, scalability (e.g., with partitioning), proper container orchestration, etc. are still pending. Subsequent articles will go through those concerns using the provided code as a basis.