Two-phase consensus with functional Scala

Use the endless-transaction library to achieve consistency in microservices with distributed transactions and a flexible two-phase commit protocol.

Jonas Chapuis
6 min read · Mar 4, 2024

Keeping control of state consistency in distributed systems is a challenge as old as splitting a system into multiple running nodes. There is a full spectrum of options to solve this classic problem, with old-school approaches mainly emphasizing strong consistency “despite the odds” and more fashionable approaches embracing the distributed nature of inter-system interactions and eventual consistency.

Consistency flavors

In many cases, it is acceptable and even desirable that the system’s current state reflects the “messy” nature of real-life processes. Let’s take the example of booking a journey involving an airline and a hotel reservation. Both reservations ultimately define the operation’s success, but there can be a period during which independent confirmation of each reservation is pending. A compensation mechanism solves problematic situations, such as canceling the room if the airline reservation fails while the hotel succeeds.

Such distributed processes are also called long-running transactions, long-lived transactions, or sagas (because they tell a story). They can last arbitrarily long but ultimately converge on some final state. The extended lifespan contrasts with more traditional transactions, which must be short to avoid locking up resources.

Terminology is a bit fuzzy, but most people will agree that a distributed transaction is a mechanism that implements ACID properties across multiple systems or nodes (even though, as we have seen above, long-running transactions, which also qualify as “distributed,” typically do not).

Generally, variations of the two-phase commit protocol (2PC) achieve strong consistency. The XA standard, published by X/Open (now The Open Group) in 1991, describes in detail the interface between a global transaction manager and the resource managers taking part in a transaction. Various relational databases and message brokers support XA, and JTA can drive XA transactions from Java.

Still, such approaches have limits. Reliance on locking limits overall throughput, and XA is a complex protocol that is sometimes only partially implemented by vendors.

Two-phase consensus

Notably, some form of two-phase consensus is often present in distributed systems, from the strong to the loose consistency ends of the spectrum. 2PC arises naturally from the problem definition: similar exchanges emerge in analog processes, such as setting up a shared event with friends. The protocol's three tenets are prepare, commit, and abort/cancel. A transaction coordinator orchestrates the process and ensures persistence and resilience.

📌 2PC FTW
- Distributed databases such as CockroachDB, MongoDB, and others implement
2PC to atomically store values across partitions.
- Apache Kafka allows producing messages across multiple partitions
atomically with an implementation similar to 2PC.
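
The prepare, commit, and abort exchange can be sketched in a few lines of plain Scala. This is a minimal, synchronous illustration and not how endless-transaction or any of the systems listed above implement it: `Participant`, `Vote`, `Outcome`, and `run` are hypothetical names, and a real coordinator would also persist its decision so that it survives crashes.

```scala
object TwoPhaseSketch {
  // A participant's answer to the prepare request.
  sealed trait Vote
  case object VoteCommit extends Vote
  final case class VoteAbort(reason: String) extends Vote

  // Final outcome of one protocol round.
  sealed trait Outcome
  case object Committed extends Outcome
  final case class Aborted(reasons: List[String]) extends Outcome

  // Hypothetical participant interface (one instance per branch).
  trait Participant {
    def prepare(query: String): Vote
    def commit(): Unit
    def abort(): Unit
  }

  // Phase 1: ask every participant to prepare and collect the votes.
  // Phase 2: commit everywhere if all votes are in favor, otherwise abort everywhere.
  def run(query: String, participants: List[Participant]): Outcome = {
    val votes = participants.map(_.prepare(query))
    val reasons = votes.collect { case VoteAbort(reason) => reason }
    if (reasons.isEmpty) {
      participants.foreach(_.commit())
      Committed
    } else {
      participants.foreach(_.abort())
      Aborted(reasons)
    }
  }
}
```

A single dissenting vote is enough to abort every participant, including those that voted in favor.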

Prepare

Preparation consists of sending a request to participants with a payload that allows the transaction branch to get prepared and vote for committing or aborting the transaction.

In endless-transaction, this phase takes the form of an expression defined for a query value. This expression can affect the local branch and involve any asynchronous process. There is no limit to the time it takes for the vote to be delivered to the coordinator (unless a timeout is configured).

Below are some examples of preparation operations in an imaginary journey booking process (the example is somewhat convoluted but affords a description of various use cases).

Examples of the preparation phase in the context of a multi-step journey reservation process

Commit

The coordinator triggers the commit phase after all branches have voted in favor of the commit. Each branch is expected to apply the change to its local context and make it durable (or at least part of it) to ensure consistency, then return a confirmation value.

Note here that “consistency,” in contrast to lower-level locking-based implementations, is freely defined by the domain logic. Some degree of change could be allowed between the preparation and commit phases.

As with prepare above, this expression can involve any asynchronous process, and the toolkit does not limit the time it takes to deliver the confirmation to the coordinator.

Examples of the commit phase in the context of a multi-step journey reservation process
📌 Commit consistency
It's up to the implementer to decide the level of consistency in the execution
of the commit. Transaction failure is also valid and can be signaled by
raising an exception in the target effect. Failure will lead to inconsistency
in the overall system state, which can be an acceptable compromise
in some use cases.

For instance, in our imaginary example, the "in-memory cache" could be
locked only briefly to preserve throughput. Because it's optional to the
journey process, updating it could be skipped in case of delays. On the other
hand, if the "internal database" update still fails despite the lock, the
commit expression could signal this by raising an exception. The exception
would conclude the transaction in a failed state, allowing surfacing in the
UI for manual remediation.
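
The trade-off described above can be sketched as follows. This is a hedged illustration in plain Scala, not the library's API: `commit`, `CommitResult`, and the two update functions are hypothetical stand-ins for the "internal database" and "in-memory cache" of the imaginary example.

```scala
import scala.util.{Failure, Success, Try}

object CommitConsistencySketch {
  sealed trait CommitResult
  case object Confirmed extends CommitResult
  final case class CommitFailed(cause: Throwable) extends CommitResult

  // The database update is mandatory: if it throws, the commit is reported as failed.
  // The cache update is optional: it is attempted, and a failure there is ignored.
  def commit(updateDatabase: () => Unit, updateCache: () => Unit): CommitResult =
    Try(updateDatabase()) match {
      case Failure(cause) => CommitFailed(cause)
      case Success(_) =>
        Try(updateCache()).getOrElse(()) // best effort, errors are swallowed on purpose
        Confirmed
    }
}
```

Which updates count as mandatory is a domain decision; the sketch only shows one possible split.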

Abort

The coordinator triggers this phase if at least one branch votes for abort. All branches are expected to roll back, in their local context, any changes made during the preparation phase and return a confirmation value. The same flexibility in consistency and lack of timing requirements as for other phases apply here.

📌 Abort consistency
The same flexibility applies here as for the commit operation: it's up to the
implementer to decide the level of consistency in the execution of the abort.

In this dummy example, let's suppose traveler reminders have already been sent:
a compensation action could be to schedule a new notification inviting the
customer to ignore previous messages. On the other hand, failing to cancel the
hotel and flight reservations would be a more serious issue and should be
considered a failed transaction to elicit manual remediation.

State diagram

Protocol state throughout the phases mentioned above is tracked by an event-sourced entity, with events representing state transitions. The transaction state machine diagram is depicted below. As usual, side effects are invoked after successful event persistence and repeated in case of recovery (at-least-once delivery semantics).

State diagram for the two-phase protocol as defined by endless-transaction’s persistent entity. Transitions are materialized by events. Side-effects are implemented with cats-effect expressions making use of `Async`.
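
The idea of deriving the protocol state from persisted events can be sketched in plain Scala. The status names follow the ones the library exposes, but the event names, `applyEvent`, and `replay` are hypothetical and much simpler than the actual persistent entity; side effects are left out entirely.

```scala
object TransactionStateSketch {
  sealed trait Status
  case object Preparing extends Status
  case object Committing extends Status
  case object Committed extends Status
  case object Aborting extends Status
  case object Aborted extends Status
  case object Failed extends Status

  // Hypothetical events, one per state transition.
  sealed trait Event
  case object AllBranchesVotedCommit extends Event
  case object BranchVotedAbort extends Event
  case object ClientAborted extends Event
  case object AllBranchesConfirmed extends Event
  case object BranchFailed extends Event

  // Pure transition function: the current status plus an event gives the next status.
  def applyEvent(status: Status, event: Event): Status = (status, event) match {
    case (Preparing, AllBranchesVotedCommit)               => Committing
    case (Preparing, BranchVotedAbort | ClientAborted)     => Aborting
    case (Committing, AllBranchesConfirmed)                => Committed
    case (Aborting, AllBranchesConfirmed)                  => Aborted
    case (Preparing | Committing | Aborting, BranchFailed) => Failed
    case (other, _)                                        => other // event does not apply, state unchanged
  }

  // Recovery: replaying the journal from the initial state rebuilds the current status.
  def replay(events: List[Event]): Status =
    events.foldLeft[Status](Preparing)(applyEvent)
}
```

Because `replay` is a pure fold, recovering after a crash yields the same status as before it, which is what allows pending side effects to be re-issued safely.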

Abstractions

Two-phase consensus is defined by these dual prepare and commit/abort phases. The endless-transaction library captures this pattern in tagless-final abstractions.

Transactor

The entry point is the Transactor trait that can create distributed transaction coordinators.

The transactor provided by the chosen endless-transaction runtime (endless-transaction-pekko or endless-transaction-akka) gives access to a coordinator for a given transaction type. The type parameters are TID for the transaction identifier, BID for the branch identifier, Q for the query type, and R for the abort reason type. The parameters are transactionName, used as the entity name for persistence; branchForID, a function that describes the branch behavior for a given branch ID; and an optional timeout for transaction preparation.

Coordinator

An instance of a coordinator allows for creating transactions of a specific type. Internally, it implements the two-phase commit protocol to guarantee a final transaction state of either committed, aborted or failed.

The create method of the coordinator returns a new transaction with the given ID, query, and set of branches wrapped in a resource. Resource release leads to transaction abort if it is still pending (or a no-op if the transaction is already completed).

Transaction

The returned transaction instance is the handle to the sharded entity and can be used to inspect its status and trigger a client abort.

The transaction trait exposes methods for interacting with a running or completed transaction, notably retrieving its status. Status values can be:
- Preparing: prepares have been issued by the coordinator, which is now waiting for votes from the participating branches.
- Committing: all participating branches have voted in favor of transaction commit, and branch commits have been issued by the coordinator, which is now waiting for confirmations.
- Committed: the transaction has completed, and all participating branches have confirmed the commit.
- Aborting: at least one participating branch voted against transaction commit, or the client has requested an abort in the meantime.
- Aborted: the transaction has been aborted, and all participating branches have confirmed the abort.
- Failed: the transaction has failed due to an exception raised by one of the participating branches.

Branch

Implementations of the Branch trait define the behavior in each phase of the 2PC for each participant in the transaction. The branch is responsible for preparing, committing, and aborting the transaction. The coordinator instantiates a branch for each transaction and branch ID. Branch implementations capture most of the domain logic when operating distributed transactions with the library.

The branch provides an indirection to the actual effectful operations to be performed in phases of the protocol, such as sending messages to other services, updating databases, etc.

Runtime

The two available runtimes are endless-transaction-pekko and endless-transaction-akka (we are also working on a native cats-effect implementation). Importing either gives access to a Transactor that relies on a persistent sharded entity for scalability and resilience.

PekkoTransactor's creator, for instance, requires a PekkoCluster (an endless type) in implicit scope and a configured events journal to do its job, but otherwise needs no extra configuration. Internally, events and cluster messages are serialized in protobuf for efficiency.

Example

The library features a fully-fledged example app with a dummy API for managing bank accounts and transferring amounts between accounts. Accounts are implemented with sharded entities and transfers with transactions.

Algebras

The two main definitions of interest in the domain are the Accounts and Account traits, which represent the repository and account, respectively. The ability to orchestrate transfers between accounts is quite logically exposed in Accounts as it involves two entities.

The two main domain definitions in the example app capture possible API interactions. The latter four methods are particularly interesting as the transactional branch logic uses them to carry out the various transfer phases.

Transfers are implemented using an endless-transaction transaction coordinator.

The coordinator is created with TransferID as the transaction identifier type, AccountID as the branch identifier, Transfer as the query payload (the amount, origin, and destination), and TransferFailure as the error coproduct.

The coordinator is used in the implementation of the transfer method in ShardedAccounts, i.e. the implementation of Accounts. The snippet below shows the logic: the code creates a transfer with two branches, one for the origin account and the other for the destination account.

The example implementation of the account transfer logic uses a distributed transaction. Note that the transaction entity is sharded, which implies it can run on a separate node. Therefore, we must regularly check for completion to respond to the transfer API request (the request is synchronous for simplicity). To implement this polling operation, we use the pollForFinalStatus() built-in extension method. This method retrieves the transaction status at configurable intervals and semantically sleeps in between.
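
The polling idea can be illustrated with a small, self-contained loop. This sketch is not the library's `pollForFinalStatus()`: `pollUntilFinal`, its parameters, and the reduced `Status` type are hypothetical, and it blocks the calling thread with `Thread.sleep`, whereas the library sleeps semantically without holding a thread.

```scala
import scala.annotation.tailrec

object PollingSketch {
  sealed trait Status
  case object Pending extends Status
  case object Committed extends Status
  case object Aborted extends Status
  case object Failed extends Status

  // Query the status at fixed intervals until it is final or the attempts run out.
  // Returns None when the transaction is still pending after the last attempt.
  @tailrec
  def pollUntilFinal(
      fetchStatus: () => Status,
      intervalMillis: Long,
      remainingAttempts: Int
  ): Option[Status] =
    fetchStatus() match {
      case Pending if remainingAttempts <= 1 => None
      case Pending =>
        Thread.sleep(intervalMillis) // blocking sleep, for simplicity only
        pollUntilFinal(fetchStatus, intervalMillis, remainingAttempts - 1)
      case finalStatus => Some(finalStatus)
    }
}
```

Bounding the number of attempts is an assumption of this sketch; it keeps a synchronous API request from waiting indefinitely on a transaction that has not converged yet.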

An account’s involvement in a transfer is described by TransferBranch, as exemplified below with the implementation of the prepare method.

The “prepare” expression acts as middleware on top of the account preparation methods. It is invoked on both sides of the transaction, so its first step is determining whether the money is leaving the account or coming in, and it then calls the corresponding preparation method on the account. If enough funds are present, the origin account transitions into a state where no other withdrawals or outgoing transfers are allowed until either commit or abort (note that the recipient account has no such restriction and allows other operations while the transaction is pending). A vote is returned according to the outcome of account preparation. If the account is busy with another operation, the “prepare” expression repeats the outgoing transfer attempt after a retry delay. Exceptions (typically arising from communication timeouts, e.g., if a node restarts in the cluster) also lead to retries, up to a maximum number of attempts. Branch logic is indeed responsible for convergence on a final transaction state.
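
The origin-side logic can be sketched in plain Scala. This is a simplified, synchronous stand-in and not the example app's `TransferBranch`: the `Account` class, `voteForOutgoing`, and the result types are hypothetical, and bounding the retries on a busy account is an assumption made here so that the sketch always terminates.

```scala
import scala.annotation.tailrec

object TransferPrepareSketch {
  sealed trait PrepareResult
  case object Prepared extends PrepareResult
  case object InsufficientFunds extends PrepareResult
  case object AccountBusy extends PrepareResult

  sealed trait Vote
  case object VoteCommit extends Vote
  final case class VoteAbort(reason: String) extends Vote

  // Hypothetical account allowing at most one pending outgoing transfer at a time.
  final class Account(val balance: BigDecimal) {
    private var pendingOutgoing: Option[BigDecimal] = None

    def prepareOutgoing(amount: BigDecimal): PrepareResult =
      if (pendingOutgoing.isDefined) AccountBusy
      else if (balance < amount) InsufficientFunds
      else { pendingOutgoing = Some(amount); Prepared }
  }

  // Turn the account's answer into a vote, retrying after a delay while the account is busy.
  @tailrec
  def voteForOutgoing(
      account: Account,
      amount: BigDecimal,
      attemptsLeft: Int,
      retryDelayMillis: Long
  ): Vote =
    account.prepareOutgoing(amount) match {
      case Prepared          => VoteCommit
      case InsufficientFunds => VoteAbort("insufficient funds")
      case AccountBusy if attemptsLeft > 1 =>
        Thread.sleep(retryDelayMillis) // blocking sleep, for simplicity only
        voteForOutgoing(account, amount, attemptsLeft - 1, retryDelayMillis)
      case AccountBusy => VoteAbort("account busy")
    }
}
```

Once the first transfer is prepared, a second outgoing transfer on the same account keeps getting `AccountBusy` until the first one commits or aborts, which is the locking behavior described above.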

Consistency Swiss Army knife

Operations spanning multiple entities in actor clusters often require coordination to ensure consistency within the cluster. Consistency requirements also usually arise beyond the cluster boundary when distributing state across several services. Even when eventual consistency is acceptable, some cohesion is frequently required, which involves detailing sagas using compensation actions.

Consistency enforcement mechanisms are costly to implement and maintain because they require absolute resilience. endless-transaction is a toolkit that piggybacks on proven event-sourcing technology to make it easier to develop such mechanisms should the need arise. It allows for describing two-phase consensus with expressive domain-centric algebraic expressions running on a trusty foundation.

Written by Jonas Chapuis

Principal Software Engineer at ZF