Distributed Systems: Asynchrony, Event Sourcing, and CQRS

Wahome
13 min readApr 26, 2023

The earliest example of a distributed architecture happened in the 1970s when ethernet was invented and Local Area Networks (LAN) were created. For the first time computers would be able to send messages to other systems with a local IP address. Peer-to-peer networks evolved, e-mail came along, and then the Internet which continues to be the biggest, ever growing example of a distributed system. As the Internet changed from IPv4 to IPv6, distributed systems have evolved from “LAN” based to “Internet” based.

A distributed architecture is a group of deployment units connected through remote access protocols.

An application that is decomposed into loosely coupled, independent and autonomous services that collaborate and communicate with each other is a distributed system. This distinctive approach to system architecture that structures a large, complex application as a suite of single-function component services is called the microservice architecture. This architectural style draws parallels to the human body which is a combination of different independent systems that specialize in discrete functions yet cohesively operate together as one body.

Consistency across services

When a monolithic system is decomposed into self-encapsulated services, it can break transactions resulting in a local transaction in the monolithic system being distributed into multiple services that will be called in a sequence. In the last installment about transactions, atomic commitment, and sagas¹, we explored the example of placing an order in an e-commerce system to illustrate how a distributed transaction — a transaction that affects several resources — arises. With the microservices architecture, there are two key problems with respect to distributed transaction management: how to maintain a transaction’s atomicity and how to manage the transaction isolation level for concurrent requests.

In a distributed system, completeness and consistency are perpetually in conflict.

For a distributed transaction to commit, all participants must guarantee that any change to data will be permanent and these changes must persist despite system crashes or other unforeseen events. This ensures that the system is always left in a consistent state. ACID is most commonly associated with transactions on a single database server, but distributed transactions extend that guarantee across multiple databases. The execution of a distributed transaction requires coordination between a global transaction management system and all the local resource managers of all the involved systems.

Asynchrony

image from https://www.thrillist.com/drink/nation/starbucks-spelling-tumblr-23-hilariously-misspelled-names-on-starbucks-coffee-cups

Consider the example of ordering a cup of coffee from your local coffee shop and how orders are processed. The coffee shop, like most other businesses, is primarily interested in maximizing throughput of orders; more orders equals more revenue. As a result, they would naturally use asynchronous processing. When you place your order, the cashier marks a coffee cup with your order and places it into the queue. This queue decouples the cashier and the barista and allows the former to keep taking orders even if the latter is backed up for a moment. When the store gets busy, the coffee shop can deploy multiple baristas in a competing consumer pattern⁴.

By taking advantage of an asynchronous processing approach, your local coffee shop also has to deal with some challenges that asynchrony inherently brings. Drink orders are not necessarily completed in the order they were placed which can happen due to several reasons: various baristas may be processing orders using different equipment, blended drinks may take longer than a drip coffee, baristas may make multiple drinks in one batch to optimize processing time. As a result, the coffee shop has a correlation problem. Drinks are delivered out of sequence and need to be matched up to the correct customer. To solve this problem, the coffee shop can deploy a pattern that is similarly used in messaging architectures; use of a correlation identifier. They can add an explicit correlation identifier by writing your name on the cup and calling it out when the drink is ready — the atrocious spelling notwithstanding.

In messaging architectures, a correlation identifier is a unique identifier that indicates which request message the reply is for.

What would the coffee shop do if you were to refuse to pay for your order or they cannot deliver it? If the order has already been prepared and there is nothing wrong with it, they will simply toss it otherwise they will pull your cup from the queue. If the reason for your refusal to pay is that the order is incorrect or non-satisfactory, they will remake it to your liking. If the coffee machine breaks down and they cannot complete your order they will refund your money. Each of these scenarios describes a different, but common failure handling strategy which is different than a two-phase commit where separate prepare and execute steps are relied upon:

  • Write-off: a failure handling strategy that is based on doing nothing or discarding what has already been done. This strategy is acceptable if the effect of discarding an operation is small or tolerable. It is the simplest of all.
  • Retry: a common pattern for recovering from transient errors by transparently retrying a failed operation. This can improve the stability of the system. There are two factors that determine whether or not a request is safe to retry: the response received for the operation and its idempotency. For example, if a business rule is violated it is unlikely that a retry will succeed. However, if an external system is not available a retry might well be successful. Idempotence is the property of certain operations in mathematics and computer science that allows them to be applied multiple times without changing the result beyond the initial application.
  • Compensating action: a failure handling strategy that tries to transition the system from an intermediate state to a state that is equivalent to the initial state by nullifying an operation. The saga pattern uses compensating transactions to rollback the effect of the sub transactions and achieve backward recovery.

Choreography vs Orchestration

The coffee shop interaction is also a good example of a simple, but common pattern that Gregor Hohpe refers to as the conversation pattern. The interaction between a customer and the coffee shop consists of a short synchronous phase where the customer places their order and pays for it and a longer, asynchronous phase where the customer awaits as their order is prepared.

In this interaction, a two-phase commit is equivalent to waiting at the cashier with the receipt until the order is ready while the money is still on the table. Once the order is ready, the money, the receipt, and the cup of coffee change hands in one fell swoop. Neither the cashier nor the customer would be able to leave until the transaction is completed which makes for a blocking operation. A two-phase commit, while a lot simpler, can hurt the free flow of messages — and therefore scalability — because it has to maintain stateful transaction resources across the flow of multiple, asynchronous actions.

Communication is the means to coordinate independent executions and should be favoured as a collaboration mechanism over shared state.

To alleviate these problems, Garcia-Molina and Salem in their 1987 paper⁷, proposed the notion of a saga as a way to ensure data consistency in a distributed architecture without having a single ACID transaction. There are two common saga implementation approaches, orchestration and choreography. Orchestration is a way to coordinate sagas where a centralized controller tells the saga participants what local transactions to execute. The orchestrator executes saga requests, stores and interprets the states of each task, and handles failure recovery with compensating transactions. Choreography is a way to coordinate sagas where participants exchange events without a centralized point of control. With choreography, each local transaction publishes domain events that trigger local transactions in other services.

An Event

image from https://www.eventstore.com/event-sourcing

Service interaction is considered a first-class citizen in various industry standards. The importance of interaction increases as monolithic systems are broken down into smaller services, becoming more service oriented and distributed. Inter-service interactions can be defined into three types: queries, commands and events.

An event is a message to numerous independent endpoints sent when an action has been carried out. Unlike a command, an event cannot be “rejected” by the receiving service. It represents something that has already happened. In the context of the coffee shop, an example of a event is: “An order for a latte macchiato has been placed.”

“…has happened”

The difference between queries, commands, and events lies in their intent. While queries have no special intent and are rather requests for information, commands trigger something which should happen (in the future) and events inform about something which has happened and is already completed (in the past). Infrastructurally, events can be sent using different mechanisms and quality of service guarantees. For example events can be implemented to be reliable, such as delivered at most once and the message order is guaranteed. This is in contrast with best-effort delivery. Events can also be implemented to be consistent, where transactions and durability are important.

Event Sourcing

image from confluent.io

Events can be in process, i.e., within the service, or distributed, thus handled by multiple services. With events, especially between Aggregate Roots (the central concept in Domain Driven Design) within a service or between services, the events are used to propagate state, hence consistency comes into play. Consistency when dealing with events will always be weak meaning that the publisher of an event is not dependent on who processes the event or when it is processed. The processor of an event though, must process every event, hence the term Eventual Consistency.

Event-driven architectures use events to asynchronously trigger and communicate between decoupled services.

Event sourcing involves modelling the state changes made by applications as an immutable sequence or “log” of events. Instead of modifying the state of the application in-place, event sourcing involves storing the event that triggers the state change in an immutable log and modelling the state changes as responses to the events in the log.

In the context of the coffee shop example, ordering a cup of coffee is built with a series of “commands” and “events”. Events are a series of states that come in and are generated by the system. For example, the cashier may generate an OrderPlacedEvent that lets the barista know that a cup of coffee has been paid for. The barista in turn may generate an OrderCompletedEvent that lets the cashier know that a cup of coffee is ready to be served to the customer. Events are immutable. This immutable nature of events can be used to implement a consistent database.

Idempotence is doing the same thing over and over again and expecting the same results.

In order to implement consistency with events, an ACID2 philosophy must be employed. ACID2 implements consistency without using two-phased commit. ACID2 focuses on achieving high throughput by altering the following characteristics in its interactions:

  • A — Associative: the ordering of the events should not matter. Thus with this property, the following holds true: a + (b + c) = (a + b) + c.
  • C — Commutative: the ordering of the events should not matter. Thus with this property, the following holds true: a + b = b + a.
  • I — Idempotent: duplicate events must be ignored. Thus with this property, the following holds true: f(f(x)) = f(x).
  • D — Distributed: events will be published in a distributed environment.

The Outbox pattern

image from ITNEXT

A service command typically needs to create, update, and delete aggregates in the database and send messages or events to a message broker or bus. For example, a service that participates in a saga needs to update business entities and send messages or events. Similarly, a service that publishes a domain event must update an aggregate and publish an event. The command must thus atomically update the database and send messages in order to avoid data inconsistencies in the form of zombie records or ghost messages.

Consider a message handler in a service that creates an Order in the business database, and also publishes an OrderCreated event. If a failure occurs during the execution of the message handler, two scenarios may occur, depending on the order of operations:

  1. Zombie record: the message handler creates the Order in the database first, then publishes the OrderCreated event. If a failure occurs between these two operations, the Order is created in the database, but the OrderCreated event is not published. The message handler does not complete, so the message is retried, and both operations are repeated. This results in a duplicate Order in the database, known as a zombie record, which is never announced to the rest of the system.
  2. Ghost message: the message handler publishes the OrderCreated event first, then creates the Order in the database. If a failure occurs between these two operations, the OrderCreated event is published, but the Order is not created in the database. The rest of the system is notified about the creation of the Order, but it does not exist in the database. This causes further errors in other message handlers which expect the Order to exist in the database.

It is not viable to use two-phased commit with a distributed transaction that spans the database and the message broker because most messaging infrastructure, and some data stores, do not support the paradigm. And even if they do, it’s often undesirable to couple the service to both the database and the message broker. But without using two-phase commit, sending an event in the middle of a transaction is not reliable as there is no guarantee that the transaction will commit. Similarly, if a service sends a message after committing the transaction there is no guarantee that it will not crash before sending the message.

The Outbox Pattern is based on Guaranteed Delivery pattern.

A database-like consistency between the messaging operations — both consuming the incoming message, and sending outgoing messages — and the changes to business data is thus needed. The solution is for the service that sends the message to first store the message in the database as part of the transaction that updates the business entities. A separate process then sends the events to the messaging infrastructure. The outbox pattern guarantees that each message is processed once and only once by piggybacking on the local database transaction used to store business data. This turns the event messaging infrastructure’s at-least-once delivery guarantee into an exactly-once processing guarantee. To implement the outbox pattern, the event handling logic is divided into two phases: the message handler phase, and the dispatch phase. Consider the following pseudocode:

// phase 1: the message handler phase
// this phase is wrapped in the local database transaction
void createOrder(...) {
begin transaction(persistence, eventOutbox)
// do some validations

// 1. create and persist the order
order = orderFactory.create(...);
persistence.add(order);

// 2. create and persist the event
orderCreatedEvent = eventFactory.create(...);
eventOutbox.add(orderCreatedEvent)

commit
}
// phase 2: the dispatch phase
void publishEvent(EventMessage event) {
// do some validations and deduplication
...

// messaging is abstracted away in the publisher/forwarder
eventPublisher.publish(event);
}

CQRS

image from confluent.io

Let us revisit the example of ordering coffee from a coffee shop. You have walked into your local coffee shop and placed an order for your morning double shot espresso with the cashier. This is an example of a command, a point-to-point message sent when an action is required of a service. A command can be declined by the receiver as a result of: malformation or structural issues, business rules violation, or exception handling. The main attribute of a command is that when the command gets successfully executed, the system transitions to a new state.

“Please do…”

It has been fifteen minutes since you placed your order and your patience is now running thin. You need to find out what the hold up is. You walk up to the cashier and ask them for a status on your order. This is an example of a query, an on-demand request for information from another service. Unlike commands, queries do not execute any operations and should not contain any business logic. They have no side effects and are completely idempotent, so it doesn’t matter how many times the query gets executed, it will always return the same result, unless the system state changed in the meantime.

“Please tell me about…”

To attend to your query, the cashier needs to establish the state of your order. They can either interrupt the barista and ask them for a status or reconstruct the state of your order based on where the labelled coffee cup they placed in the queue on top of the espresso machine is. Interrupting both the cashier and the barista to handle status queries impacts processing times as both have their attention taken away from taking new orders and completing existing ones respectively. Thus, the coffee shop decides to hire a customer attendant who tracks orders and uses a correlation identifier — the customer’s name written on the cup — to call out ready orders. This describes the working principles of Command Query Responsibility Segregation (CQRS) pattern that is most commonly used with event sourcing.

The event sourcing and CQRS application architecture patterns are related.

The CQRS pattern separates the data mutation, or the command part of a system, from the query part. At its heart is the notion that you can use a different model to update information than the model you use to read information. The pattern can be used to separate updates and queries if they have different requirements for throughput, latency, or consistency. The command or write side is all about the business; it does not care about the queries, different materialized views over the data, optimal storage of the materialized views for performance and so on. On the other hand, the query or read side is all about the read access; its main purpose is making queries fast and efficient.

--

--