How To Get Closer to Consistency in Microservice Architecture

When it comes to transactions, the first thing that comes to mind is ACID. ACID is an important four-point concept: Atomicity, Consistency, Isolation, and Durability.

ACID makes sense when a system has a single database and all of its code lives in one module. In microservice-based architectures, however, we have multiple loosely coupled modules, each with its own database, its own transactions, and of course its own ACID guarantees, yet they are connected. Some scenarios still span multiple microservices, and such a scenario completes only if every one of these microservices completes its task. Ideally, all microservices perform their tasks correctly; but if one microservice fails, the whole scenario must fail. Conceptually, we need a global transaction across multiple independent microservices.

For better understanding, let's look at a real-world scenario.  This scenario is a regular microservice-based ordering system that contains the following microservices:

1- OrderService, which is responsible for registering a customer's order.

2- PaymentService, which is responsible for collecting the order amount from the customer.

3- RestaurantService, which is responsible for customer order preparation.

4- DeliveryService, which puts orders in the delivery queue.

Regular microservice-based ordering system

Suppose that in this simple scenario the customer registers a food order through the OrderService, which receives the order information from the customer and stores it in its own database. The next step is the payment process, which is performed by PaymentService. This microservice receives the customer's card information and makes the payment. If everything is correct and the payment is made, the customer's order can be completed; but what happens if there is not enough money on the customer's card, or the payment gateway is unreachable? We have an unsuccessful payment but still a pending order in OrderService, so we need a way to cancel that pending order in the OrderService database. To put it simply, we need a transaction between microservices.

Distributed Transactions

There are several ways to manage transactions in a microservice-based architecture, the most prominent of which is the distributed transaction. A distributed transaction is a set of operations on data that are performed across multiple databases. Distributed transactions can be handled in two ways: first, coordinated across multiple separate nodes connected through a network; and second, on a single server that hosts multiple databases.

Two-Phase Commit

A two-phase commit (2PC) is the most famous protocol of distributed transactions. In implementing this protocol, we assume that one process will function as the coordinator and the rest as cohorts (the coordinator may be the one that initiated the transaction, but that's not necessary). We further assume that there is stable storage and a write-ahead log at each site. Furthermore, we assume that no machine involved crashes forever. The protocol works as follows (the coordinator is ready to commit and needs to ensure that everyone else will do so as well):

Two-phase commit steps

| Phase | Coordinator | Cohort |
|---|---|---|
| Phase 1 | Write prepare-to-commit message to the log | Work on transaction; when done, wait for prepare message |
| | Send prepare-to-commit message | |
| | Wait for reply | Receive message; when transaction is ready to commit, write agree-to-commit or abort to the log |
| | | Send agree or abort reply |
| Phase 2 | Write commit message to the log | Wait for commit message |
| | Send commit or abort message | Receive commit or abort message |
| | Wait for all cohorts to respond | If commit was received, write “commit” to the log, release all locks, and update the database; if abort was received, undo all changes |
| | | Send done message |
| | Clean up all state. Done. | |
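The two phases can be sketched in plain Java as follows. This is a minimal in-memory illustration, not a real 2PC implementation: there is no network, no write-ahead log, and no timeout handling, and every name in it (`TwoPhaseCommitSketch`, `Cohort`, `InMemoryCohort`, `run`) is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Minimal in-memory sketch of two-phase commit; all names here are hypothetical.
final class TwoPhaseCommitSketch {

    /** A participant (cohort) that votes in phase 1 and acts on the decision in phase 2. */
    interface Cohort {
        boolean prepare();   // phase 1: true = "agree to commit", false = "abort"
        void commit();       // phase 2: make the changes permanent
        void abort();        // phase 2: undo the changes
    }

    /** A cohort whose vote is fixed up front and that records what it was told to do. */
    static final class InMemoryCohort implements Cohort {
        private final boolean canCommit;
        final List<String> log = new ArrayList<>();

        InMemoryCohort(boolean canCommit) { this.canCommit = canCommit; }

        @Override public boolean prepare() {
            log.add(canCommit ? "agree" : "abort-vote");
            return canCommit;
        }
        @Override public void commit() { log.add("commit"); }
        @Override public void abort() { log.add("abort"); }
    }

    /** Coordinator: commits only if every cohort agreed in phase 1. */
    static boolean run(List<? extends Cohort> cohorts) {
        boolean allAgreed = true;
        for (Cohort cohort : cohorts) {          // phase 1: collect the votes
            if (!cohort.prepare()) {
                allAgreed = false;
            }
        }
        for (Cohort cohort : cohorts) {          // phase 2: broadcast the decision
            if (allAgreed) {
                cohort.commit();
            } else {
                cohort.abort();
            }
        }
        return allAgreed;
    }

    public static void main(String[] args) {
        List<InMemoryCohort> agreeing =
                List.of(new InMemoryCohort(true), new InMemoryCohort(true));
        System.out.println("all agree  -> committed=" + run(agreeing));

        List<InMemoryCohort> mixed =
                List.of(new InMemoryCohort(true), new InMemoryCohort(false));
        System.out.println("one aborts -> committed=" + run(mixed));
    }
}
```

Note that the coordinator loops over every cohort twice and waits for each of them, which is why a single slow or unavailable node holds up the whole transaction.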

 

Two-phase commit and similar protocols are not good options for transactions in a microservice-based architecture, for several reasons. The most important reason is that one node is responsible for coordinating transactions between the other nodes. This is a single point of failure: if the coordinator node has a problem, the whole transaction management system is in trouble. The second important problem is that the response time of the transaction system depends on the response time of the slowest node. In addition, the following can be mentioned as other problems of 2PC:

SAGA

The SAGA pattern was first introduced in 1987 by Hector Garcia-Molina and Kenneth Salem in a paper named SAGAS. The idea behind it is quite simple: it is based on a sequence of transactions that can be interleaved with other transactions. Microservice-based architectures generally consist of smaller services, each with its own transactions, and SAGA provides the mechanism for interleaving these small transactions with each other. The SAGA pattern manages a global transaction as a sequence of local transactions, and each local transaction participates in the SAGA as a single independent unit of work. The key point is that SAGA guarantees that either all operations complete successfully or every participant undoes the local transaction it has already completed.

In regular RDBMS-based systems, commit and rollback of transactions are done automatically. In SAGA, the situation is different: every unit of the SAGA ecosystem must manage commit and rollback by itself. Every microservice involved in the SAGA ecosystem should therefore expose two services: first, the main service, which is the functionality the microservice participates with; and second, the compensation service, which reverses the change the main service made in the local database. For example, in the OrderService, the main service is CREATE ORDER, and the compensation service is CANCEL ORDER.

Each SAGA-based transaction consists of n transactions and n compensations. The order system is the case study in this article. As you can see in the picture below, we have four microservices, so we have four compensation services in addition to four main transactions.

Assumption 1: for n > 0, if T(n+1) fails, then T(1)..T(n) must all be undone, so C(1)..C(n) must all be called.

Assumption 2: compensation services must be idempotent and cannot abort; they must be retried until they succeed.
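The two assumptions can be illustrated with a small saga executor. This is a sketch under stated assumptions, not the implementation used later in this article: the names `SagaSketch`, `Step`, and `execute` are hypothetical, compensations run in reverse order (a common convention that the assumptions above do not require), and retries are capped by a hypothetical `MAX_RETRIES` so the example terminates.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Minimal saga executor sketch; all names here are hypothetical.
final class SagaSketch {

    /** One saga step: a local transaction T(i) and its compensation C(i). */
    static final class Step {
        final String name;
        final Runnable transaction;
        final Runnable compensation;

        Step(String name, Runnable transaction, Runnable compensation) {
            this.name = name;
            this.transaction = transaction;
            this.compensation = compensation;
        }
    }

    // Hypothetical bound; assumption 2 asks for retrying until success.
    static final int MAX_RETRIES = 5;

    /** Runs the steps in order; if T(n+1) fails, calls C(n)..C(1). Returns true on full success. */
    static boolean execute(List<Step> steps) {
        Deque<Step> completed = new ArrayDeque<>();
        for (Step step : steps) {
            try {
                step.transaction.run();
                completed.push(step);            // remember it so it can be compensated
            } catch (RuntimeException failure) {
                while (!completed.isEmpty()) {   // assumption 1: undo everything done so far
                    compensateWithRetry(completed.pop());
                }
                return false;
            }
        }
        return true;
    }

    /** Assumption 2: compensations are idempotent, so retrying them is safe. */
    private static void compensateWithRetry(Step step) {
        for (int attempt = 1; attempt <= MAX_RETRIES; attempt++) {
            try {
                step.compensation.run();
                return;
            } catch (RuntimeException retryable) {
                // fall through and try again
            }
        }
        throw new IllegalStateException("compensation kept failing for step " + step.name);
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        List<Step> steps = List.of(
                new Step("order", () -> log.add("CREATE ORDER"), () -> log.add("CANCEL ORDER")),
                new Step("payment",
                        () -> { throw new IllegalStateException("card declined"); },
                        () -> log.add("REFUND PAYMENT")));
        System.out.println("saga succeeded: " + execute(steps));
        System.out.println("log: " + log);
    }
}
```

In the `main` example the payment step fails, so only the order step, which had already completed, is compensated.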

Four compensation services

SAGA Implementation Strategies

SAGA is usually implemented in one of two ways:

Orchestration-Based SAGA

In this approach, there is one coordinator component, which can be either one of the participant microservices or an independent component. The orchestrator (an object) tells the participants which local transactions to execute. Consider this scenario:

  1. Order Service receives the POST /order request and creates an order through the SAGA orchestrator.
  2. The SAGA orchestrator creates an Order in the PENDING state.
  3. It then sends a command to the PAYMENT SERVICE.
  4. The PAYMENT SERVICE attempts to pay the order amount through the payment gateway.
  5. Whether the payment succeeds or fails, the PAYMENT SERVICE sends back a reply message indicating the outcome.
  6. The SAGA orchestrator either approves or rejects the Order based on the response of the PAYMENT SERVICE.
  7. If the result of step 6 is a success, the coordinator tells RESTAURANT SERVICE to start its transaction and tells ORDER SERVICE to change the Order status to PAID.
  8. If the result of step 6 is a failure, the coordinator tells ORDER SERVICE to change the Order status to FAILED.
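The flow above can be sketched as a single orchestrator class. This is an illustrative in-process sketch, assuming that a failed payment leaves the order in a FAILED state; the services are plain interfaces rather than remote calls, and all names (`OrchestratorSketch`, `SagaOrchestrator`, `PaymentService`, `RestaurantService`) are hypothetical.

```java
// Minimal orchestration sketch; all names here are hypothetical.
final class OrchestratorSketch {

    enum OrderStatus { PENDING, PAID, FAILED }

    static final class Order {
        final double amount;
        OrderStatus status = OrderStatus.PENDING;   // step 2: created in the PENDING state

        Order(double amount) { this.amount = amount; }
    }

    /** Stand-in for the PAYMENT SERVICE; returns true when the gateway accepts the charge. */
    interface PaymentService {
        boolean pay(Order order);
    }

    /** Stand-in for the RESTAURANT SERVICE. */
    interface RestaurantService {
        void prepare(Order order);
    }

    /** The coordinator: it alone decides which participant runs next. */
    static final class SagaOrchestrator {
        private final PaymentService payment;
        private final RestaurantService restaurant;

        SagaOrchestrator(PaymentService payment, RestaurantService restaurant) {
            this.payment = payment;
            this.restaurant = restaurant;
        }

        Order createOrder(double amount) {
            Order order = new Order(amount);        // steps 1-2
            boolean paid = payment.pay(order);      // steps 3-5: command and reply
            if (paid) {                             // steps 6-7: approve
                order.status = OrderStatus.PAID;
                restaurant.prepare(order);
            } else {                                // steps 6 and 8: reject
                order.status = OrderStatus.FAILED;
            }
            return order;
        }
    }

    public static void main(String[] args) {
        // Hypothetical gateway rule: charges up to 100 are accepted.
        SagaOrchestrator orchestrator = new SagaOrchestrator(
                order -> order.amount <= 100,
                order -> System.out.println("restaurant prepares order of " + order.amount));
        System.out.println(orchestrator.createOrder(50).status);
        System.out.println(orchestrator.createOrder(500).status);
    }
}
```

Because every decision lives in `SagaOrchestrator`, the participants stay simple, but the orchestrator itself becomes the component whose failure stops everything.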

Single Point of Failure.

This approach, as you can guess, has a famous problem that we are all afraid of: a single point of failure. If the coordinator fails, the whole ecosystem fails.

Choreography-Based SAGA

Unlike the previous approach, in this protocol, there is not a single point of failure. In choreography-based SAGA, each local transaction publishes domain events that trigger local transactions in other services.

Practical Choreography-Based Saga With Simplifier Assumption

The ideal implementation of SAGA is the choreography-based protocol. This approach relies on message brokers and event handlers to achieve the desired result. The event-handling platform we use is Apache Kafka, a distributed open-source streaming and event-handling platform that is used by many companies and software architectures. Kafka was initially conceived as a messaging queue and is based on an abstraction of a distributed commit log. Since being created and open-sourced by LinkedIn in 2011, Kafka has quickly evolved from a messaging queue to a full-fledged event streaming platform. The most important data structure in Kafka is the topic. Simply put, a topic can be considered a queue of events: a consumer listens to it to be notified of new events, and a producer puts new events into it.

Let's simulate our case study with choreography and the aid of Kafka.

Simplifier assumption: In this part, we assume that we have only two microservices: OrderService and PaymentService.

Assumption: There is a customer Order object with three possible statuses: PENDING, PAID, or FAILED.

  1. The customer sends their order to OrderService.
  2. OrderService processes the order, sets its status to PENDING, and puts it in the topic PendingOrderTopic. Immediately after that, the PaymentService, which listens to this topic, receives the order and tries to pay through a payment gateway such as PayPal.
  3. If the payment is successful, the state of the order is set to PAID and the order is sent to the restaurant to be prepared.
  4. However, if the payment fails for any reason, the order state changes to FAILED and the order is placed in the topic FailedPaymentTopic. Finally, the OrderService, which listens to this topic, receives this event and notifies the customer.
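The steps above can be simulated without a real broker. The sketch below replaces Kafka with a toy synchronous in-memory `Broker`, so it shows only the event flow, not Kafka's delivery semantics. The `creditLimit` rule stands in for the payment gateway, and every class and method name is hypothetical; only the two topic names come from the steps above.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Choreography sketch with an in-memory stand-in for Kafka; all names are hypothetical.
final class ChoreographySketch {

    enum Status { PENDING, PAID, FAILED }

    static final class Order {
        final String id;
        final double amount;
        Status status;

        Order(String id, double amount) {
            this.id = id;
            this.amount = amount;
        }
    }

    /** Toy broker: delivers each published order synchronously to the topic's subscribers. */
    static final class Broker {
        private final Map<String, List<Consumer<Order>>> subscribers = new HashMap<>();

        void subscribe(String topic, Consumer<Order> handler) {
            subscribers.computeIfAbsent(topic, t -> new ArrayList<>()).add(handler);
        }

        void publish(String topic, Order order) {
            for (Consumer<Order> handler : subscribers.getOrDefault(topic, List.of())) {
                handler.accept(order);
            }
        }
    }

    static final String PENDING_TOPIC = "PendingOrderTopic";
    static final String FAILED_TOPIC = "FailedPaymentTopic";

    /** Subscribes both services; returns the list of customer notifications OrderService sends. */
    static List<String> wire(Broker broker, double creditLimit) {
        List<String> notifications = new ArrayList<>();
        // PaymentService reacts to pending orders (steps 2-4).
        broker.subscribe(PENDING_TOPIC, order -> {
            if (order.amount <= creditLimit) {
                order.status = Status.PAID;
            } else {
                order.status = Status.FAILED;
                broker.publish(FAILED_TOPIC, order);
            }
        });
        // OrderService reacts to failed payments (step 4).
        broker.subscribe(FAILED_TOPIC,
                order -> notifications.add("order " + order.id + " failed"));
        return notifications;
    }

    /** OrderService: marks the order PENDING and publishes it (steps 1-2). */
    static Order placeOrder(Broker broker, String id, double amount) {
        Order order = new Order(id, amount);
        order.status = Status.PENDING;
        broker.publish(PENDING_TOPIC, order);
        return order;
    }

    public static void main(String[] args) {
        Broker broker = new Broker();
        List<String> notifications = wire(broker, 1000.0);
        System.out.println(placeOrder(broker, "A", 200.0).status);
        System.out.println(placeOrder(broker, "B", 5000.0).status);
        System.out.println(notifications);
    }
}
```

No component here knows the whole flow: each service only knows which topic it listens to and which topic it publishes to.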

Practical Choreography-Based Saga With Simplifier Assumption

Invariant 

The main consistency invariant in our case study is that an order is successful if and only if sum(order.amount) <= customer.credit, and no path through the system should be able to violate it.
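As a small worked example, the invariant can be written as a check. The sketch below assumes amounts are stored as whole cents in a `long` to avoid floating-point rounding; the class and method names are hypothetical.

```java
import java.util.List;

// Sketch of the credit invariant; names and the use of cents are assumptions.
final class CreditInvariantSketch {

    /** True when the sum of successful order amounts stays within the customer's credit. */
    static boolean holds(List<Long> successfulOrderAmounts, long customerCredit) {
        long total = 0;
        for (long amount : successfulOrderAmounts) {
            total += amount;
        }
        return total <= customerCredit;
    }

    /** Would marking one more order as successful still satisfy the invariant? */
    static boolean canAccept(List<Long> successfulOrderAmounts, long newAmount, long customerCredit) {
        long total = newAmount;
        for (long amount : successfulOrderAmounts) {
            total += amount;
        }
        return total <= customerCredit;
    }

    public static void main(String[] args) {
        List<Long> paid = List.of(500L, 300L);
        System.out.println(holds(paid, 1000L));
        System.out.println(canAccept(paid, 300L, 1000L));
    }
}
```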

Saga Tends To Be ACD in Some Cases

SAGA-based architectures actually explicitly guarantee Atomicity, Consistency, and Durability, but do not necessarily guarantee Isolation.

The following anomalies illustrate the lack of Isolation in SAGA:

Lost update: one microservice overwrites data that another microservice changed after the first one read it.
1- T(i) reads the data
2- T(j) changes that data
3- T(i) changes the data => T(j)'s change is lost

Dirty read: one microservice writes the data, another one reads it, and then the first one compensates its transaction.
1- T(i) writes the data
2- T(j) reads the data
3- T(i) compensates the transaction => T(j) no longer holds valid data

Non-repeatable (fuzzy) read: two reads of the same data return different values because another transaction wrote in between.
1- T(i) reads the data
2- T(k) writes the data
3- T(j) reads the data => T(i) and T(j) see different values
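The lost-update sequence can be replayed deterministically in a few lines. The sketch below also shows one widely used way to detect it, a version check on write (optimistic concurrency); that technique is an assumption brought in for illustration, not something prescribed by this article, and all names are hypothetical.

```java
// Lost-update sketch with a version check; all names here are hypothetical.
final class LostUpdateSketch {

    /** A record with a version counter that increases on every write. */
    static final class Account {
        long balance;
        long version;

        Account(long balance) { this.balance = balance; }

        /** Unconditional write: the last writer wins. */
        void write(long newBalance) {
            balance = newBalance;
            version++;
        }

        /** Conditional write: succeeds only if nobody wrote since expectedVersion was read. */
        boolean writeIfUnchanged(long expectedVersion, long newBalance) {
            if (version != expectedVersion) {
                return false;
            }
            write(newBalance);
            return true;
        }
    }

    /** Replays the lost-update interleaving with unconditional writes. */
    static long lostUpdate() {
        Account account = new Account(100);
        long readByTi = account.balance;          // 1- T(i) reads the data
        account.write(account.balance - 30);      // 2- T(j) changes that data
        account.write(readByTi + 50);             // 3- T(i) writes from its stale read
        return account.balance;                   // T(j)'s -30 is gone
    }

    /** Same interleaving, but T(i) writes with the version it read; the stale write is refused. */
    static boolean staleWriteRejected() {
        Account account = new Account(100);
        long versionReadByTi = account.version;
        long readByTi = account.balance;
        account.write(account.balance - 30);
        return !account.writeIfUnchanged(versionReadByTi, readByTi + 50);
    }

    public static void main(String[] args) {
        System.out.println("balance after lost update: " + lostUpdate());
        System.out.println("stale write rejected: " + staleWriteRejected());
    }
}
```

Had both changes been applied, the balance would be 100 - 30 + 50 = 120; the unconditional version ends at 150 because T(i) computed its result from the value it read before T(j) wrote.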

However, this is not the end of the game.  There are a variety of countermeasures for reducing the impact of isolation anomalies, such as:

Case Study: Order Management System Implementation with SAGA

In this part, we present a complete overview and implementation of the order management system. As you can see, we have four microservices: OrderService, PaymentService, RestaurantService, and DeliveryService. Each microservice is triggered by events that come from outside it, so we have the following steps:

  1. The ORDER_CREATE event comes from the UI, and the order is created by OrderService.
  2. PaymentService starts the payment after receiving the ORDER_CREATED event.
  3. If the payment is successful, the ORDER_PAID event is sent to RestaurantService. Otherwise, a FAIL event is sent to OrderService to compensate the transaction.
  4. RestaurantService, which is triggered by the ORDER_PAID event, tells the restaurant to prepare the order. If preparation is successful, the ORDER_PREPARED event is sent to DeliveryService to deliver the order; otherwise, it tells the OrderService and PaymentService to compensate the transaction.

Order Management System Implementation with SAGA

My practical implementation is based on the Spring Boot and Spring Cloud frameworks, with Apache Kafka as the event handler. Every microservice exposes two APIs: the main transaction and the compensation transaction.

There are four Kafka topics: Pending_Orders, Success_Payment_Orders, Failed_Payment_Orders, and Deliverable_Orders. To simplify the implementation, I considered just one compensation scenario: a failed payment. I assume that every step after payment executes successfully.

  1. OrderService puts new orders in Pending_Orders and listens to Failed_Payment_Orders to be notified of failed payments.
  2. PaymentService listens to Pending_Orders to be notified of new orders. If the payment is successful, it puts the order in Success_Payment_Orders; if the payment fails, it puts the order in Failed_Payment_Orders.
  3. RestaurantService listens to Success_Payment_Orders and puts the result in Deliverable_Orders.
  4. DeliveryService consumes the order from Deliverable_Orders, and the scenario completes successfully.

All microservices involved in the case study have the following features:

  1. The whole ecosystem is built on a shared base model: Customer, Item, Order, and PaymentState, plus Topics, which holds the static names of the Kafka topics. To keep the code simple and clean, there are no handwritten setters and getters; we use Lombok instead.
  2. Spring Cloud, Spring Boot, and Kafka dependencies have been added to the POM of every microservice.
  3. KafkaTemplate<String, Order> is used in the projects to put events in Kafka.
  4. The @KafkaListener annotation is used on a method to listen to a specific topic.

Domain Model

Java
 
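import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

// Lombok generates the getters, setters, constructors, and toString().
@Data
@AllArgsConstructor
@NoArgsConstructor
public class Customer {
    private String name;
    private String cardNo;
}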


Java
 
public enum Item {
    CHICKEN,MASHED_POTATO,FRIED_CHICKEN,BURGERS,SPAGHETTI;
}


Java
 
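import java.util.List;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@AllArgsConstructor
@NoArgsConstructor
public class Order {
    private List<Item> items;
    private PaymentState paymentStat;
    private Double amount;
    private Customer customer;
}

Java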
 
public enum PaymentState {
    SUCCESS, FAIL
}

Java
 
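// Static Kafka topic names shared by all microservices.
public interface Topics {
    String FAILED_PAYMENT = "failed_payment_orders";
    // Both constants below name the same topic. The spelling is kept as-is
    // because the topic name must match across all services.
    String DELIVARABLE_ORDER = "delivarable_orders";
    String DELIVARABLE_ORDERS = "delivarable_orders";
    String PENDING_ORDERS = "pending_orders";
    String SUCCESS_PAYMENT_ORDERS = "success_payment_orders";
}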

Order Service

The first class in this microservice is OrderService, which receives the order from the outside and puts it in the appropriate topic:

Java
 
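import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class OrderService {
    @Autowired
    private KafkaTemplate<String, Order> kafkaTemplate;

    // Publishes the new order to the pending-orders topic, where PaymentService picks it up.
    public Order registerOrder(Order order) {
        // send() is asynchronous; the returned future is ignored here for simplicity.
        kafkaTemplate.send(Topics.PENDING_ORDERS, order);
        return order;
    }
}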

The second class is the listener class, which listens to Failed_Payment_Orders for failed payment, reverses the order, and notifies the customer.

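Java
 
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.support.serializer.JsonDeserializer;

// Order and Topics come from the shared domain model.
@Configuration
public class PaymentListener {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    // Compensation for a failed payment: reverse the order and notify the customer.
    @KafkaListener(topics = Topics.FAILED_PAYMENT, groupId = "order", containerFactory = "kafkaListenerContainerFactory")
    public void listenToFailedPayments(Order order) {
        System.out.println("we have failed ordered and going rollback the order : " + order);
        // A failed order must not be forwarded to the deliverable-orders topic.
    }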
    @Bean
    public Map<String, Object> consumerConfigs() {
        // The deserializer is configured through the properties below.
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
                bootstrapServers);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);

        props.put(JsonDeserializer.TRUSTED_PACKAGES, Order.class.getPackage().getName());
        props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, Order.class);
        props.put(JsonDeserializer.USE_TYPE_INFO_HEADERS, "false");

        return props;
    }

    @Bean
    public ConsumerFactory<String, Order> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, Order>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Order> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}

Finally, we have a REST controller class, which is used to test our code:

Java
 
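import java.util.Arrays;
import java.util.List;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping(value = "/order")
public class OrderController {

    @Autowired
    OrderService orderService;

    // Test endpoint: registers one order that PaymentService accepts and one that it rejects.
    @PostMapping(value = "/register")
    public void registerOrder() {
        Customer cr = new Customer("reza", "1111-2222-3333-4444");
        Customer ct = new Customer("test", "5555-6666-7777-8888");

        List<Item> items = Arrays.asList(Item.BURGERS, Item.CHICKEN, Item.SPAGHETTI);

        // PaymentService overwrites the initial payment state:
        // amounts above 1000 succeed, all others fail.
        Order successOrder = new Order(items, PaymentState.SUCCESS, 2000.0, cr);
        Order failOrder = new Order(items, PaymentState.SUCCESS, 100.0, ct);

        orderService.registerOrder(successOrder);
        orderService.registerOrder(failOrder);
    }
}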

Payment Service 

This microservice has just one listener class, which listens to Pending_Orders and makes the right decision based on the result of the payment.

Java
 
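import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.support.serializer.JsonDeserializer;

@Configuration
public class OrderListener {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Autowired
    private KafkaTemplate<String, Order> kafkaTemplate;

    // Simulated payment: orders above 1000 are treated as paid, all others as failed.
    @KafkaListener(topics = Topics.PENDING_ORDERS, groupId = "order", containerFactory = "kafkaListenerContainerFactory")
    public void listenToPendingOrders(Order order) {
        if (order.getAmount() > 1000D) {
            order.setPaymentStat(PaymentState.SUCCESS);
            kafkaTemplate.send(Topics.SUCCESS_PAYMENT_ORDERS, order);
        } else {
            // Publishing here triggers the compensation in OrderService.
            order.setPaymentStat(PaymentState.FAIL);
            kafkaTemplate.send(Topics.FAILED_PAYMENT, order);
        }
    }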

    @Bean
    public Map<String, Object> consumerConfigs() {
        // The deserializer is configured through the properties below.
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
                bootstrapServers);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);

        props.put(JsonDeserializer.TRUSTED_PACKAGES, Order.class.getPackage().getName());
        props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, Order.class);
        props.put(JsonDeserializer.USE_TYPE_INFO_HEADERS, "false");

        return props;
    }

    @Bean
    public ConsumerFactory<String, Order> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, Order>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Order> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}

Other Services 

The two remaining microservices, along with all the projects of the ecosystem, can be found on my GitHub.

Finally, here are sample execution results, first for a failed payment and then for a successful one:

Plain Text
 
2021-09-12 19:34:02.052  INFO 11956 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-order-1, groupId=order] Discovered group coordinator 192.168.7.61:9092 (id: 2147483646 rack: null)
2021-09-12 19:34:02.052  INFO 11956 --- [-thread | order] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-order-1, groupId=order] Discovered group coordinator 192.168.7.61:9092 (id: 2147483646 rack: null)
we have failed ordered and going rollback the order : Order(items=[BURGERS, CHICKEN, SPAGHETTI], paymentStat=FAIL, amount=100.0, customer=Customer(name=test, cardNo=5555-6666-7777-8888))


Plain Text
 
2021-09-12 19:34:02.052  INFO 8644 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-order-1, groupId=order] Discovered group coordinator 192.168.7.61:9092 (id: 2147483646 rack: null)
2021-09-12 19:34:02.052  INFO 8644 --- [-thread | order] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-order-1, groupId=order] Discovered group coordinator 192.168.7.61:9092 (id: 2147483646 rack: null)
send notification to kitchen and after food is ready for order  : Order(items=[BURGERS, CHICKEN, SPAGHETTI], paymentStat=SUCCESS, amount=2000.0, customer=Customer(name=reza, cardNo=1111-2222-3333-4444)) put it in deliverable orders


 

 

 

 
