Two-phase consensus with functional Scala
Use the endless-transaction library to achieve consistency in microservices with distributed transactions and a flexible two-phase commit protocol.
Keeping control of state consistency in distributed systems is a challenge as old as splitting a system into multiple running nodes. There is a full spectrum of options to solve this classic problem, with old-school approaches mainly emphasizing strong consistency “despite the odds” and more fashionable approaches embracing the distributed nature of inter-system interactions and eventual consistency.
Consistency flavors
In many cases, it is acceptable and even desirable that the system’s current state reflects the “messy” nature of real-life processes. Let’s take the example of booking a journey involving an airline and a hotel reservation. Both reservations ultimately define the operation’s success, but there can be a period during which independent confirmation of each reservation is pending. A compensation mechanism solves problematic situations, such as canceling the room if the airline reservation fails while the hotel succeeds.
Such distributed processes are also called long-running transactions, long-lived transactions, or sagas (because they tell a story). They can last arbitrarily long but ultimately converge on some final state. The extended lifespan contrasts with more traditional transactions, which must be short to avoid locking up resources.
Terminology is a bit fuzzy, but most people will agree that a distributed transaction is a mechanism that implements ACID properties across multiple systems or nodes (even though, as we have seen above, long-running transactions, which also qualify as “distributed,” typically do not).
Generally, variations of the two-phase commit protocol (2PC) achieve strong consistency. The XA standard, published by X/Open (now The Open Group) in 1991, describes in detail the interface between a global transaction manager and the resource managers taking part in a transaction. Various relational databases and message brokers support XA, and JTA can drive XA transactions from Java.
Still, such approaches have limits. Reliance on locking limits overall throughput, and XA is a complex protocol that is sometimes only partially implemented by vendors.
Two-phase consensus
Notably, some form of two-phase consensus is often present in distributed systems, from the strong to the loose consistency ends of the spectrum. 2PC arises naturally from the problem definition: similar exchanges emerge in analog processes, such as setting up a shared event with friends. The protocol's three tenets are prepare, commit, and abort/cancel. A transaction coordinator orchestrates the process and ensures persistence and resilience.
📌 2PC FTW
- Distributed databases such as CockroachDB, MongoDB, and others implement
2PC to atomically store values across partitions.
- Apache Kafka allows producing messages across multiple partitions
atomically with an implementation similar to 2PC.
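To make the exchange concrete, here is a minimal in-memory sketch of the two phases. All names (Participant, runTwoPhase, RecordingParticipant) are hypothetical and are not part of endless-transaction. The sketch is synchronous and leaves out the persistence and recovery that a real coordinator provides.

```scala
// Hypothetical names throughout: a sketch of the protocol, not the endless-transaction API.
sealed trait Vote
case object VoteCommit extends Vote
case object VoteAbort extends Vote

sealed trait Outcome
case object Committed extends Outcome
case object Aborted extends Outcome

// A participant exposes the three operations of the protocol.
trait Participant {
  def prepare(): Vote
  def commit(): Unit
  def abort(): Unit
}

// Phase 1: collect every vote. Phase 2: commit if unanimous, otherwise abort everywhere.
def runTwoPhase(participants: List[Participant]): Outcome = {
  val votes = participants.map(_.prepare())
  if (votes.forall(_ == VoteCommit)) {
    participants.foreach(_.commit())
    Committed
  } else {
    participants.foreach(_.abort())
    Aborted
  }
}

// Test double that records the calls it receives, most recent first.
final class RecordingParticipant(vote: Vote) extends Participant {
  var log: List[String] = Nil
  def prepare(): Vote = { log = "prepare" :: log; vote }
  def commit(): Unit = { log = "commit" :: log }
  def abort(): Unit = { log = "abort" :: log }
}
```

Note that a single abort vote causes every participant to be asked to abort, including those that voted to commit.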
Prepare
Preparation consists of sending a request to participants with a payload that allows the transaction branch to get prepared and vote for committing or aborting the transaction.
In endless-transaction, this phase takes the form of an expression defined for a query value. This expression can affect the local branch and involve any asynchronous process. There is no limit to the time it takes for the vote to be delivered to the coordinator (unless a timeout is configured).
Below are some examples of preparation operations in an imaginary journey booking process (the example is somewhat convoluted but affords a description of various use cases).
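As one illustration of a preparation expression, the sketch below models the hotel branch of the journey booking with a plain Future. JourneyQuery, HotelClient, and prepareHotel are hypothetical names chosen for this example, and the Vote type is a simplified stand-in for whatever the library defines.

```scala
import scala.concurrent.{ExecutionContext, Future}

// All names below are hypothetical and only illustrate "prepare returns a vote".
final case class JourneyQuery(traveler: String, flight: String, hotel: String, nights: Int)

sealed trait Vote
case object Commit extends Vote
final case class Abort(reason: String) extends Vote

// Stand-in for a remote hotel API: placing a hold is the hotel branch's preparation.
final class HotelClient(availableRooms: Map[String, Int]) {
  def placeHold(hotel: String, nights: Int)(implicit ec: ExecutionContext): Future[Boolean] =
    Future(availableRooms.getOrElse(hotel, 0) > 0 && nights > 0)
}

// Prepare for the hotel branch: asynchronous, completes whenever the hotel answers.
def prepareHotel(client: HotelClient, query: JourneyQuery)(implicit
    ec: ExecutionContext
): Future[Vote] =
  client.placeHold(query.hotel, query.nights).map { held =>
    if (held) Commit else Abort(s"no room available at ${query.hotel}")
  }
```

The vote is only produced once the hold succeeds or fails, which mirrors the absence of a built-in time limit on preparation.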
Commit
The coordinator triggers the commit phase after all branches have voted in favor of the commit. Each branch is expected to act on its local context to make the change (or at least part of it) durable, ensuring consistency, and to return a confirmation value.
Note here that “consistency,” in contrast to lower-level locking-based implementations, is freely defined by the domain logic. Some degree of change could be allowed between the preparation and commit phases.
As with prepare above, this expression can involve any asynchronous process, and the toolkit does not limit the time it takes to deliver the confirmation to the coordinator.
📌 Commit consistency
It's up to the implementer to decide the level of consistency in the execution
of the commit. Transaction failure is also valid and can be signaled by
raising an exception in the target effect. Failure will lead to inconsistency
in the overall system state, which can be an acceptable compromise
in some use cases.
For instance, in our imaginary example, the "in-memory cache" could be
locked only briefly to preserve throughput. Because it's optional to the
journey process, updating it could be skipped in case of delays. On the other
hand, if the "internal database" update still fails despite the lock, the
commit expression could signal this by raising an exception. The exception
would conclude the transaction in a failed state, allowing surfacing in the
UI for manual remediation.
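The trade-off described in this note could look roughly like the sketch below, which uses Try so that a Failure plays the role of an exception raised in the effect. Cache, Database, CommitFailed, and commitJourney are hypothetical names and do not come from the library.

```scala
import scala.util.{Failure, Success, Try}

// Hypothetical stand-ins for the two resources mentioned in the example.
trait Cache { def tryUpdate(journeyId: String): Boolean } // false = could not lock in time
trait Database { def confirm(journeyId: String): Try[Unit] }

final case class CommitFailed(journeyId: String, cause: Throwable)
    extends RuntimeException(s"commit failed for journey $journeyId", cause)

// Commit for one branch: best-effort on the cache, strict on the database.
// The returned Boolean reports whether the optional cache update took place.
def commitJourney(journeyId: String, cache: Cache, db: Database): Try[Boolean] = {
  // Optional step: a missed cache update is tolerated and merely reported.
  val cacheUpdated = cache.tryUpdate(journeyId)
  db.confirm(journeyId) match {
    case Success(_) => Success(cacheUpdated)
    // Mandatory step: the failure is surfaced so the transaction can end up failed.
    case Failure(cause) => Failure(CommitFailed(journeyId, cause))
  }
}
```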
Abort
The coordinator triggers this phase if at least one branch votes for abort. All branches are expected to act on their local context to roll back any changes made during the preparation phase and to return a confirmation value. The same flexibility in consistency and lack of timing requirements as for the other phases apply here.
📌 Abort consistency
The same flexibility applies here as for the commit operation: it's up to the
implementer to decide the level of consistency in the execution of the abort.
In this dummy example, let's suppose traveler reminders have already been sent:
a compensation action could be to schedule a new notification inviting the
customer to ignore previous messages. On the other hand, failing to cancel the
hotel and flight reservations would be a more serious issue and should be
considered a failed transaction to elicit manual remediation.
State diagram
Protocol state throughout the phases mentioned above is tracked by an event-sourced entity, with events representing state transitions. The transaction state machine diagram is depicted below. As usual, side effects are invoked after successful event persistence and repeated in case of recovery (at-least-once delivery semantics).
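As a rough textual counterpart, the event-sourced tracking can be sketched as a pure transition function folded over a journal. The states and events below are assumptions made for illustration; only the three final states (committed, aborted, failed) are taken from the library's description, and its actual state machine may differ.

```scala
// Hypothetical states and events; the library's actual state machine may differ.
sealed trait State
case object Preparing extends State
case object Committing extends State
case object Aborting extends State
case object Committed extends State
case object Aborted extends State
case object Failed extends State

sealed trait Event
case object AllVotedCommit extends Event
case object SomeVotedAbort extends Event
case object AllConfirmed extends Event
case object BranchFailed extends Event

// Pure transition function: events that do not apply to a state leave it unchanged.
def transition(state: State, event: Event): State = (state, event) match {
  case (Preparing, AllVotedCommit)                       => Committing
  case (Preparing, SomeVotedAbort)                       => Aborting
  case (Committing, AllConfirmed)                        => Committed
  case (Aborting, AllConfirmed)                          => Aborted
  case (Preparing | Committing | Aborting, BranchFailed) => Failed
  case (other, _)                                        => other
}

// Recovery: the current state is rebuilt by replaying the journal from the initial state.
def replay(journal: List[Event]): State =
  journal.foldLeft(Preparing: State)(transition)

// Side effect to (re)issue for a state. It runs after persistence and again after
// recovery, so branch operations must tolerate duplicates (at-least-once delivery).
def sideEffect(state: State): Option[String] = state match {
  case Preparing  => Some("send prepare to branches")
  case Committing => Some("send commit to branches")
  case Aborting   => Some("send abort to branches")
  case _          => None
}
```

Because the side effect is derived from the recovered state, a coordinator that crashes while aborting sends the abort requests again after restart.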
Abstractions
Two-phase consensus is defined by these dual prepare and commit/abort phases. The endless-transaction library captures this pattern in tagless-final abstractions.
Transactor
The entry point is the Transactor trait that can create distributed transaction coordinators.
Coordinator
An instance of a coordinator allows for creating transactions of a specific type. Internally, it implements the two-phase commit protocol to guarantee a final transaction state of either committed, aborted or failed.
Transaction
The returned transaction instance is the handle to the sharded entity and can be used to inspect its status and trigger a client abort.
Branch
Implementations of the Branch trait define the behavior in each phase of the 2PC for each participant in the transaction. The branch is responsible for preparing, committing, and aborting the transaction. The coordinator instantiates a branch for each transaction and branch ID. Branch implementations capture most of the domain logic when operating distributed transactions with the library.
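The shape of such an abstraction can be approximated as follows. This is a sketch modeled on the description above rather than the library's actual signatures: the type parameters, the Vote type, and the SeatBranch example are all assumptions, and Try stands in for a real effect type.

```scala
import scala.util.Try

// Hypothetical shapes, modeled on the description above rather than copied from the library.
sealed trait Vote[+R]
case object Commit extends Vote[Nothing]
final case class Abort[R](reason: R) extends Vote[R]

// F is the effect type, TID the transaction ID, Q the query payload, R the abort reason.
trait Branch[F[_], TID, Q, R] {
  def prepare(id: TID, query: Q): F[Vote[R]]
  def commit(id: TID): F[Unit]
  def abort(id: TID): F[Unit]
}

// Example branch interpreted in Try: holds seats in an in-memory inventory.
final class SeatBranch(private var freeSeats: Int) extends Branch[Try, String, Int, String] {
  private var held = Map.empty[String, Int]

  // Prepare places a hold on the seats, or votes to abort if too few are left.
  def prepare(id: String, seats: Int): Try[Vote[String]] = Try {
    if (seats <= freeSeats) { freeSeats -= seats; held += (id -> seats); Commit }
    else Abort(s"only $freeSeats seats left")
  }

  // Commit makes the hold permanent by forgetting it.
  def commit(id: String): Try[Unit] = Try { held -= id }

  // Abort releases the held seats back to the inventory.
  def abort(id: String): Try[Unit] = Try {
    freeSeats += held.getOrElse(id, 0)
    held -= id
  }

  def available: Int = freeSeats
}
```

Keeping the effect type abstract is what lets the same branch definition run against different runtimes.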
Runtime
The two available runtimes are endless-transaction-pekko and endless-transaction-akka (we are also working on a native cats-effect implementation). Importing either gives access to a Transactor that relies on a persistent sharded entity for scalability and resilience.
PekkoTransactor's creator, for instance, requires a PekkoCluster (an endless type) in implicit scope and a configured event journal to do its job, but otherwise no extra configuration is needed. Internally, events and cluster messages are serialized in protobuf for efficiency.
Example
The library features a fully-fledged example app with a dummy API for managing bank accounts and transferring amounts between accounts. Accounts are implemented with sharded entities and transfers with transactions.
Algebras
The two main definitions of interest in the domain are the Accounts and Account traits, which represent the repository and account, respectively. The ability to orchestrate transfers between accounts is quite logically exposed in Accounts as it involves two entities.
Transfers are implemented using an endless-transaction transaction coordinator.
The coordinator is used in the implementation of the transfer method in ShardedAccounts, i.e. the implementation of Accounts. The snippet below shows the logic: the code creates a transfer with two branches, one for the origin account and the other for the destination account.
An account’s involvement in a transfer is described by TransferBranch, as exemplified below with the implementation of the prepare method.
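For readers without the example app at hand, the idea of a per-account vote can be approximated with the simplified model below. It is not the example's actual code: Transfer, AccountBranch, and the in-memory balances are hypothetical, and a real prepare would also reserve the amount rather than only checking it.

```scala
// Hypothetical model of the transfer example; names and logic are assumptions.
final case class Transfer(origin: String, destination: String, amount: BigDecimal)

sealed trait Vote
case object Commit extends Vote
final case class Abort(reason: String) extends Vote

// The balances map stands in for the sharded account entities.
final class AccountBranch(accountId: String, balances: Map[String, BigDecimal]) {
  def prepare(transfer: Transfer): Vote =
    balances.get(accountId) match {
      case None => Abort(s"unknown account $accountId")
      // The origin must be able to cover the amount.
      case Some(balance) if accountId == transfer.origin && balance < transfer.amount =>
        Abort(s"insufficient funds in $accountId")
      // The destination only needs to exist.
      case Some(_) => Commit
    }
}
```

Both accounts run the same branch logic, and the vote differs only according to the role the account plays in the transfer.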
Consistency Swiss Army knife
Operations spanning multiple entities in actor clusters often require coordination to ensure consistency within the cluster. Consistency requirements also usually arise beyond the cluster boundary when distributing state across several services. Even when eventual consistency is acceptable, some cohesion is frequently required, which involves detailing sagas using compensation actions.
Consistency enforcement mechanisms are costly to implement and maintain because they require absolute resilience. endless-transaction is a toolkit that piggybacks on proven event-sourcing technology to make it easier to develop such mechanisms should the need arise. It allows for describing two-phase consensus with expressive domain-centric algebraic expressions running on a trusty foundation.