Functional event sourcing with cats-effect

Achieve maximal expressivity and domain abstraction while retaining actor protocol precision using the endless4s Scala library

Jonas Chapuis
Sep 3, 2022

Code describing business logic is undoubtedly the most valuable asset in a software system. Called domain code by domain-driven design practitioners, it captures expertise and the value proposition, and it accumulates a wealth of knowledge over time. While such deep models take time to mature, software trends and technologies change rapidly and even undergo licensing changes, as recently happened with Akka. Meanwhile, the language, techniques, and frameworks a system relies on keep aging, even as its value grows with the customer base. I hear that you can still earn a living as a COBOL programmer!

Algebras

There is a language that has stood the test of time: that of mathematics, whose universal abstract symbolic notation hasn’t aged over the centuries. Ideally, I’d say our programs should aim for the same!

FP and techniques such as tagless-final strive for this ideal. Defining logic with so-called algebras and DSLs aims to make domain expression as lean as possible and to decouple it from the infrastructure. Separating concerns makes it easier to reason about the system and evolve it. It also affords precise test coverage, and when operating a complex SaaS platform, securing areas of confidence is essential.

Actors

It seems like backend systems fall into a few categories. There are traditional CRUD systems, which run behind a load balancer and delegate state and concurrency to the database. There are streaming services, which became popular with event-driven architectures and brokers such as Kafka. And then there are actors. This somewhat strange and underrated computation model fueled WhatsApp’s success and had already achieved nine 9’s availability in telephone switches back in the 90s. Actors offer natural semantics for distributed systems of millions of small state machines organically phasing in and out of memory according to demand. They are also an ideal model for reliably implementing active processes at scale, such as those involving scheduling (e.g., powering on a machine at a particular time).

Best of both worlds

In Scala, Akka is, without a doubt, the ruling actor framework. However, its API isn’t designed to integrate directly with monadic effect systems: it’s also meant to be used from object-oriented Java code (and it has even been ported to .NET by the community). As a result, at first glance, it seems you must either go for cats-effect or ZIO and forgo Lightbend’s stack, or stick with Akka and Futures.

This article will try to show that if you are convinced by both algebraic thinking and the power of the actor model to describe stateful and active entities, you do not have to make a choice: it turns out they work well together! In addition, you get a reasonable degree of shielding from future evolutions of the underlying actor framework, which allows for switching implementations entirely if the need arises.

“Functional description of event sourced entities shields domain code from the actor framework, allowing for switching implementations completely if the need arises.”

State reader, events writer

A sharded actor is called an entity, which conveniently maps one to one with the domain-driven concept of an object defined by its identity. The distributed entity actor receives a command, upon which it usually performs some validation, possibly persists some events, and formulates a reply. The entity state transitions according to the persisted events.

A monad combination fits this pattern pretty well: a reader and a writer. As others have also proposed, we can use it to describe a chain of computations with the capacity to write events, read the entity state, and finally return a reply:

An illustrative monadic chain involving event sourcing using a reader-writer: F[_] is the higher-kinded effect wrapper type, A stands for the command type, write allows producing events, read leads to intermediate folding of events over the initial state, and B represents the command reply. A typical chain would naturally include some computation steps, such as validation.
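A small, dependency-free Scala sketch can make the reader-writer idea concrete. Program, read, write, and the Fold alias below are hypothetical names chosen for illustration, not the endless API. A program receives the current state (the reader part) and returns the events it wrote (the writer part) along with a reply. Each subsequent step sees the state with the earlier events already folded in.

```scala
object ReaderWriterSketch {
  // Applies one event to a possibly-empty state; Left signals an invalid transition
  type Fold[S, E] = (Option[S], E) => Either[String, Option[S]]

  // Folds a batch of events over a state, stopping at the first failure
  def foldAll[S, E](state: Option[S], events: Vector[E])(fold: Fold[S, E]): Either[String, Option[S]] =
    events.foldLeft[Either[String, Option[S]]](Right(state))((acc, e) => acc.flatMap(s => fold(s, e)))

  // Hypothetical program type: state in (reader), events out (writer), plus a reply A
  final case class Program[S, E, A](run: Option[S] => Either[String, (Vector[E], A)]) {
    def map[B](f: A => B): Program[S, E, B] =
      Program(s => run(s).map { case (written, a) => (written, f(a)) })

    // The next step reads the state with the events written so far already folded in
    def flatMap[B](f: A => Program[S, E, B])(implicit fold: Fold[S, E]): Program[S, E, B] =
      Program { s0 =>
        run(s0).flatMap { case (written1, a) =>
          foldAll(s0, written1)(fold).flatMap { s1 =>
            f(a).run(s1).map { case (written2, b) => (written1 ++ written2, b) }
          }
        }
      }
  }

  def read[S, E]: Program[S, E, Option[S]] = Program(s => Right((Vector.empty[E], s)))
  def write[S, E](events: E*): Program[S, E, Unit] = Program(_ => Right((events.toVector, ())))
}

object CounterDemo {
  import ReaderWriterSketch._

  // Toy domain: the state is a running total and each event is an increment
  implicit val addFold: Fold[Int, Int] = (total, increment) => Right(Some(total.getOrElse(0) + increment))

  val program: Program[Int, Int, Option[Int]] = for {
    _        <- write[Int, Int](1, 2)
    afterTwo <- read[Int, Int] // sees Some(3): both increments are already folded
    _        <- write[Int, Int](10)
  } yield afterTwo

  val result = program.run(None)
}
```

A real runtime would also persist the accumulated events before replying. In this sketch they are simply returned alongside the reply.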

If this looks abstract, don’t worry: with the example below, you’ll see that these notions hide behind simpler constructs.

Example: ride booking system

Let’s suppose we are creating a distributed system for booking rides. Here’s how we represent a booking:

The simplistic domain model for a passenger transportation booking

Let’s also imagine that the booking has an associated vehicle route. One could represent such interactions with the following tagless algebra:

Simplistic algebra for transportation bookings: place creates an entity, getBooking returns its definition if the entity exists, and setRoute & getRoute allow for indicating geographical data. With these definitions, booking a ride into the system translates to repository.bookingFor(bookingID).place(booking), and notifying that the route is computed becomes repository.bookingFor(bookingID).setRoute(steps).
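Here is a rough sketch of what such an algebra could look like. The field names, the Either[String, Unit] result of place, and the InMemoryRepository interpreter are assumptions made for illustration. The toy interpreter only shows that, at this level, the algebra can be exercised without any actors.

```scala
import java.time.Instant
import java.util.UUID

object BookingAlgebra {
  // Simplified, hypothetical domain model
  final case class BookingID(value: UUID)
  final case class LatLon(lat: Double, lon: Double)
  final case class Booking(
      id: BookingID,
      time: Instant,
      origin: LatLon,
      destination: LatLon,
      passengerCount: Int
  )

  // Algebra for a single booking entity: the ID is fixed by the handle, not passed to each method
  trait BookingAlg[F[_]] {
    def place(booking: Booking): F[Either[String, Unit]]
    def getBooking: F[Option[Booking]]
    def setRoute(steps: List[LatLon]): F[Unit]
    def getRoute: F[Option[List[LatLon]]]
  }

  // The "cluster" of entities: obtain a handle for a given ID
  trait BookingRepositoryAlg[F[_]] {
    def bookingFor(id: BookingID): BookingAlg[F]
  }

  type Id[A] = A

  // Hypothetical toy interpreter: single-threaded, no actors, no persistence, just maps
  final class InMemoryRepository extends BookingRepositoryAlg[Id] {
    private val bookings = scala.collection.mutable.Map.empty[BookingID, Booking]
    private val routes = scala.collection.mutable.Map.empty[BookingID, List[LatLon]]

    def bookingFor(id: BookingID): BookingAlg[Id] = new BookingAlg[Id] {
      def place(booking: Booking): Either[String, Unit] =
        if (bookings.contains(id)) Left(s"Booking ${id.value} already exists")
        else { bookings.update(id, booking); Right(()) }
      def getBooking: Option[Booking] = bookings.get(id)
      def setRoute(steps: List[LatLon]): Unit = routes.update(id, steps)
      def getRoute: Option[List[LatLon]] = routes.get(id)
    }
  }
}

object BookingAlgebraDemo {
  import BookingAlgebra._

  val repository = new InMemoryRepository
  val id = BookingID(new UUID(0L, 1L))
  val booking = Booking(id, Instant.EPOCH, LatLon(46.52, 6.63), LatLon(47.37, 8.54), passengerCount = 2)

  // Same shape as in the caption: repository.bookingFor(bookingID).place(booking)
  val firstAttempt = repository.bookingFor(id).place(booking)
  val secondAttempt = repository.bookingFor(id).place(booking)
  repository.bookingFor(id).setRoute(List(booking.origin, booking.destination))
}
```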

This repository trait represents the entity cluster and can be composed into other expressions describing the overall booking process, route computation, etc. The critical insight here is that, at this level, the code doesn’t rely on knowledge of actors, commands, and sharding. On the contrary, abstracting these notions away affords leaner expression and simpler testing.

📌 Note: thanks to the BookingRepositoryAlg abstraction, the BookingAlg trait does not need to include the ID as a parameter to all the methods.

Implementation

That’s all fine and dandy, but how do we describe the behavior, and where are the events? That’s where the reader-writer comes into the picture.

Similar to cats MTL’s Ask and Tell, reader-writer abilities are exposed via the Entity typeclass: trait Entity[F[_], S, E] extends StateReader[F, S] with EventWriter[F, E] with Monad[F]:

  • StateReader allows for read: F[S] (and other helper methods, see below)
  • EventWriter provides the ability to persist events: write(events: E*): F[Unit]

But enough with the complicated jargon:

Example of a simple behavior implementation for the booking entity. The Entity instance is injected directly into the constructor (we’ll cover that further below). write allows persisting events. The ifKnown and ifUnknown helper methods, which rely on the reader, make it easier to describe differentiated behavior for these cases. Any combination of such calls, including writes interspersed with reads, can be chained for more complex logic: read always provides a consistent view of the state by folding the events behind the scenes.
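The sketch below mirrors this structure with several stated simplifications:

  • A single hypothetical Entity trait condenses Monad, StateReader and EventWriter.
  • read returns F[Option[S]] because no state exists before the first event.
  • Interpreter is a toy synchronous stand-in for what endless’s EntityT does, not its actual implementation.
  • Booking, BookingState and the event names are likewise illustrative.

```scala
// Hypothetical condensed stand-in for Entity = Monad + StateReader + EventWriter (not the endless API)
trait Entity[F[_], S, E] {
  def pure[A](a: A): F[A]
  def flatMap[A, B](fa: F[A])(f: A => F[B]): F[B]
  def read: F[Option[S]] // None before the first event
  def write(events: E*): F[Unit]

  // Helpers derived from read alone
  def ifUnknown[A](whenUnknown: => F[A])(whenKnown: S => A): F[A] =
    flatMap(read) {
      case None    => whenUnknown
      case Some(s) => pure(whenKnown(s))
    }

  def ifKnown[A](whenKnown: S => F[A])(whenUnknown: => A): F[A] =
    flatMap(read) {
      case Some(s) => whenKnown(s)
      case None    => pure(whenUnknown)
    }
}

// Illustrative domain types
final case class Booking(id: String, passengerCount: Int)
final case class BookingState(booking: Booking, route: Option[List[String]])

sealed trait BookingEvent
final case class BookingPlaced(booking: Booking) extends BookingEvent
final case class RouteSet(steps: List[String]) extends BookingEvent

// Behavior depends only on the injected Entity: no actors, commands or sharding here
final class BookingEntity[F[_]](entity: Entity[F, BookingState, BookingEvent]) {
  import entity._
  type Result = Either[String, Unit]
  private val ok: Result = Right(())

  def place(booking: Booking): F[Result] =
    ifUnknown[Result](flatMap(write(BookingPlaced(booking)))(_ => pure(ok)))(state =>
      Left(s"Booking ${state.booking.id} already exists")
    )

  def setRoute(steps: List[String]): F[Result] =
    ifKnown[Result](_ => flatMap(write(RouteSet(steps)))(_ => pure(ok)))(Left("Booking unknown"))

  def getRoute: F[Option[List[String]]] =
    flatMap(read)(state => pure(state.flatMap(_.route)))
}

// Toy synchronous interpreter (a stand-in for EntityT): a program maps the current state to
// (written events, result), and written events are folded before each following step
final class Interpreter[S, E](fold: (Option[S], E) => Either[String, Option[S]]) {
  final case class Prog[A](run: Option[S] => Either[String, (Vector[E], A)])

  val entity: Entity[Prog, S, E] = new Entity[Prog, S, E] {
    def pure[A](a: A): Prog[A] = Prog(_ => Right((Vector.empty[E], a)))
    def read: Prog[Option[S]] = Prog(s => Right((Vector.empty[E], s)))
    def write(events: E*): Prog[Unit] = Prog(_ => Right((events.toVector, ())))
    def flatMap[A, B](fa: Prog[A])(f: A => Prog[B]): Prog[B] = Prog { s0 =>
      fa.run(s0).flatMap { case (written1, a) =>
        written1
          .foldLeft[Either[String, Option[S]]](Right(s0))((acc, e) => acc.flatMap(s => fold(s, e)))
          .flatMap(s1 => f(a).run(s1).map { case (written2, b) => (written1 ++ written2, b) })
      }
    }
  }
}

object BookingBehaviorDemo {
  val applier: (Option[BookingState], BookingEvent) => Either[String, Option[BookingState]] = {
    case (None, BookingPlaced(booking)) => Right(Some(BookingState(booking, route = None)))
    case (Some(state), RouteSet(steps)) => Right(Some(state.copy(route = Some(steps))))
    case (state, event)                 => Left(s"Cannot apply $event to $state")
  }

  val interpreter = new Interpreter[BookingState, BookingEvent](applier)
  val behavior = new BookingEntity[interpreter.Prog](interpreter.entity)
  val booking = Booking("b-1", passengerCount = 2)

  // Synchronous runs from a given initial state, as a unit test would do
  val placedFresh = behavior.place(booking).run(None)
  val placedTwice = behavior.place(booking).run(Some(BookingState(booking, None)))
  val routeOnUnknown = behavior.setRoute(List("A", "B")).run(None)

  // Writes interspersed with reads: getRoute observes the RouteSet written earlier in the chain
  val chainedProgram = interpreter.entity.flatMap(behavior.place(booking)) { _ =>
    interpreter.entity.flatMap(behavior.setRoute(List("A", "B")))(_ => behavior.getRoute)
  }
  val chained = chainedProgram.run(None)
}
```

The last value exercises the consistency property described above. getRoute runs after two writes in the same chain and still observes the folded state.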

Event folding

Applying an event to the entity state (a.k.a. folding the events over the state) is a pure function of the shape (Option[S], E) => String \/ Option[S]. In plain English, it is a function of a possibly defined state (before the first event, it’s still empty) and the event, returning either a new version of the state or an error. For our simple booking event set, this is:
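The booking event set itself isn’t reproduced here, so the sketch below assumes a hypothetical one (BookingPlaced, RouteSet) over a hypothetical BookingState. Either[String, Option[S]] stands in for String \/ Option[S]. The sketch also illustrates why this shape is convenient: conceptually, recovering an entity from its journal amounts to a left fold of the same function, starting from None.

```scala
object BookingFolding {
  // Hypothetical state and event set, for illustration only
  final case class Booking(id: String, passengerCount: Int)
  final case class BookingState(booking: Booking, route: Option[List[String]])

  sealed trait BookingEvent
  final case class BookingPlaced(booking: Booking) extends BookingEvent
  final case class RouteSet(steps: List[String]) extends BookingEvent

  // (Option[S], E) => Either[String, Option[S]], with Either standing in for String \/ Option[S]
  val eventApplier: (Option[BookingState], BookingEvent) => Either[String, Option[BookingState]] = {
    case (None, BookingPlaced(booking)) => Right(Some(BookingState(booking, route = None)))
    case (Some(_), BookingPlaced(_))    => Left("Booking already placed")
    case (Some(state), RouteSet(steps)) => Right(Some(state.copy(route = Some(steps))))
    case (None, RouteSet(_))            => Left("Route set on unknown booking")
  }

  // Recovery replays the journal: a left fold of the applier starting from "no state yet"
  def replay(events: Seq[BookingEvent]): Either[String, Option[BookingState]] =
    events.foldLeft[Either[String, Option[BookingState]]](Right(None)) { (acc, event) =>
      acc.flatMap(state => eventApplier(state, event))
    }

  val booking = Booking("b-1", passengerCount = 2)
  val recovered = replay(Seq(BookingPlaced(booking), RouteSet(List("A", "B"))))
  val rejected = replay(Seq(RouteSet(List("A"))))
}
```

In this sketch, returning Left rather than throwing keeps the applier total and pure. That is what lets the same function serve both for recovery and for intermediate reads within a command.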

With algebra, behavior, and event applier defined, we have all the domain-level definitions we require (typically, these definitions would live in a module with limited dependencies within a codebase that strictly separates business logic from infrastructure).

Testing

This code can be tested synchronously with cats.Id, or with IO testing toolkits such as the one provided for munit:

Example of a munit suite making use of scalacheck for entity behavior. Note here the usage of the EntityT monad transformer provided by endless, which interprets programs making use of Entity into the target monad (IO in this case). In addition, EntityT requires an implicit definition of the event folding function eventApplier (to be able to fold the state in case of interspersed reads and writes).

Repository implementation

The BookingRepositoryAlg implementation is trivial:

Similar to the BookingEntity implementation relying on the Entity typeclass, a Repository instance is passed to the constructor. This instance provides precisely the ability to obtain a handle on an entity for a specific ID, so we delegate to it.
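A minimal sketch of this delegation follows. To keep it self-contained, Repository is modeled as a hypothetical single-method trait (entityFor), and the algebra is cut down to one method. The real endless signatures may differ, and the stub stands in for the instance a runtime would provide.

```scala
object RepositorySketch {
  type Id[A] = A

  // Hypothetical shape: a handle on the entity algebra for a given ID
  trait Repository[F[_], ID, Alg[_[_]]] {
    def entityFor(id: ID): Alg[F]
  }

  trait BookingAlg[F[_]] {
    def getBooking: F[Option[String]]
  }

  trait BookingRepositoryAlg[F[_]] {
    def bookingFor(id: String): BookingAlg[F]
  }

  // The implementation is pure delegation to the injected Repository instance
  final class BookingRepository[F[_]](repository: Repository[F, String, BookingAlg])
      extends BookingRepositoryAlg[F] {
    def bookingFor(id: String): BookingAlg[F] = repository.entityFor(id)
  }

  // Stub standing in for the runtime-provided Repository
  val stub: Repository[Id, String, BookingAlg] = new Repository[Id, String, BookingAlg] {
    def entityFor(id: String): BookingAlg[Id] = new BookingAlg[Id] {
      def getBooking: Option[String] = Some(s"booking $id")
    }
  }

  val bookings: BookingRepositoryAlg[Id] = new BookingRepository[Id](stub)
  val result: Option[String] = bookings.bookingFor("b-1").getBooking
}
```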

Wire protocol

We’ve mentioned above how these abstractions afford decoupling domain logic from actors and messages. At the infrastructure level, these “wire protocol” aspects are defined with two specialized implementations of the BookingAlg trait: one for the “client” side (the sender of the command) and one for the “server” side (the actor receiving the message).

The twist is to use the effect type F to specify the command and reply types, together with encoders and decoders.

Outgoing command

By implementing each method of the entity algebra with the effect type OutgoingCommand[*], we can define a binary encoding for the parameters and also specify the decoder required to parse the reply:

Definition for effect type representing an outgoing command, used for the “client” protocol implementation
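As an illustration only, the sketch below models OutgoingCommand as a hypothetical case class that pairs an encoded payload with a reply decoder. It replaces protobuf with a plain UTF-8 text format so that it runs on the standard library alone. Note that the client interpretation performs no I/O: each method merely describes what goes over the wire and how to read the answer.

```scala
import java.nio.charset.StandardCharsets

object OutgoingSketch {
  // Hypothetical shapes, not the endless definitions
  trait Decoder[A] { def decode(bytes: Array[Byte]): A }
  final case class OutgoingCommand[R](payload: Array[Byte], replyDecoder: Decoder[R])

  trait BookingAlg[F[_]] {
    def setRoute(steps: List[String]): F[Unit]
    def getRoute: F[Option[List[String]]]
  }

  def utf8(s: String): Array[Byte] = s.getBytes(StandardCharsets.UTF_8)
  def text(bytes: Array[Byte]): String = new String(bytes, StandardCharsets.UTF_8)

  // "Client" interpretation: encode the parameters and attach the decoder for the reply
  val client: BookingAlg[OutgoingCommand] = new BookingAlg[OutgoingCommand] {
    def setRoute(steps: List[String]): OutgoingCommand[Unit] =
      OutgoingCommand[Unit](utf8("setRoute|" + steps.mkString(",")), _ => ())

    def getRoute: OutgoingCommand[Option[List[String]]] =
      OutgoingCommand[Option[List[String]]](
        utf8("getRoute"),
        bytes => Some(text(bytes)).filter(_.nonEmpty).map(_.split(",").toList)
      )
  }
}
```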

Incoming command

On the receiving side, we need to decode the command, run the command logic and encode the return value into the reply:

Definition for effect type representing an incoming command, used for the “server” protocol implementation. runWith is in charge of decoding the actor message and invoking the corresponding behavior.
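The server side can be sketched with the same simplifications: a plain-text encoding and hypothetical names. For brevity, runWith here returns the already-encoded reply instead of exposing a separate reply type and encoder. A hand-rolled Functor replaces the cats one, and InMemoryBooking stands in for the real entity behavior.

```scala
import java.nio.charset.StandardCharsets

object IncomingSketch {
  // Minimal stand-in for cats.Functor
  trait Functor[F[_]] { def map[A, B](fa: F[A])(f: A => B): F[B] }

  trait BookingAlg[F[_]] {
    def setRoute(steps: List[String]): F[Unit]
    def getRoute: F[Option[List[String]]]
  }

  // Hypothetical: a decoded command that runs against the entity behavior and encodes the reply
  trait IncomingCommand[F[_], Alg[_[_]]] {
    def runWith(alg: Alg[F]): F[Array[Byte]]
  }

  def utf8(s: String): Array[Byte] = s.getBytes(StandardCharsets.UTF_8)
  def text(bytes: Array[Byte]): String = new String(bytes, StandardCharsets.UTF_8)

  // "Server" side: a switchboard from wire messages to algebra methods
  def decode[F[_]](payload: Array[Byte])(implicit F: Functor[F]): Either[String, IncomingCommand[F, BookingAlg]] =
    text(payload).split("\\|", 2) match {
      case Array("setRoute", steps) =>
        Right(new IncomingCommand[F, BookingAlg] {
          def runWith(alg: BookingAlg[F]): F[Array[Byte]] =
            F.map(alg.setRoute(steps.split(",").toList))(_ => Array.emptyByteArray)
        })
      case Array("getRoute") =>
        Right(new IncomingCommand[F, BookingAlg] {
          def runWith(alg: BookingAlg[F]): F[Array[Byte]] =
            F.map(alg.getRoute)(route => utf8(route.map(_.mkString(",")).getOrElse("")))
        })
      case _ => Left(s"Unknown command: ${text(payload)}")
    }

  type Id[A] = A
  implicit val idFunctor: Functor[Id] = new Functor[Id] {
    def map[A, B](fa: Id[A])(f: A => B): Id[B] = f(fa)
  }

  // In-memory behavior standing in for the real entity implementation
  final class InMemoryBooking extends BookingAlg[Id] {
    private var route: Option[List[String]] = None
    def setRoute(steps: List[String]): Unit = { route = Some(steps) }
    def getRoute: Option[List[String]] = route
  }

  val booking = new InMemoryBooking
  val ack: Either[String, Int] = decode[Id](utf8("setRoute|A,B")).map(cmd => cmd.runWith(booking).length)
  val reply: Either[String, String] = decode[Id](utf8("getRoute")).map(cmd => text(cmd.runWith(booking)))
  val unknown: Either[String, IncomingCommand[Id, BookingAlg]] = decode[Id](utf8("cancel"))
}
```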

Client and server implementations are provided by a single class extending the trait CommandProtocol[Alg[_[_]]]. In our example, we use the helper subclass ProtobufCommandProtocol, which provides helpers for protobuf protocols (there are also built-in helpers for JSON). The resulting code is relatively dense but low in complexity. It’s just about mapping domain types to serialized message representations and acting as a switchboard for incoming commands:

Definition of the command protocol for the Booking entity: the class extends CommandProtocol, which defines both “server” and “client.” On the client side, the implementation provides OutgoingCommand, which converts the parameters into the protobuf message and establishes a mapper to decode the reply. On the server side, it’s about decoding the incoming protobuf message, invoking the corresponding method of the entity, and encoding the reply (note the calls to transformInto: we generally use chimney for data mapping).

An essential advantage of separating concerns is that protocol versioning has no impact on the domain, shielding it from obsolete types and mapping boilerplate. In the example above, you might have noticed that we even adopted a convention to encode the version number in protobuf message names explicitly.

Command “routing”

A natural transformation, OutgoingCommand[*] ~> F, represents the delivery of a command to the target entity:

Given an entity ID, CommandRouter provides a natural transformation that maps the entity algebra, interpreted by CommandProtocol.client into the OutgoingCommand[*] context, back to F. This represents the "routing" of a command to its target actor: obtaining a reply, then decoding and delivering it to the caller.
📌 Note 1: To support this natural transformation, a cats-tagless FunctorK instance must be provided for the entity algebra, but this can be automated with cats-tagless derivation macros: cats.tagless.Derive.functorK[BookingAlg]
📌 Note 2: The command router is used when interpreting monadic values resulting from calls to the repository algebra (which are internally interpreted by the RepositoryT monad transformer).

Sequence diagram illustrating the various pieces at play when retrieving an instance of BookingAlg[F]
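Routing can be illustrated without a cluster. The sketch below makes three substitutions:

  • It redefines a natural transformation trait ~> in the spirit of cats.arrow.FunctionK.
  • It hand-writes the mapK function that cats-tagless derivation would otherwise generate.
  • It replaces Akka with a hypothetical InMemoryCluster that dispatches payloads by entity ID.

The payload format, routerFor, and bookingFor are assumptions made for illustration.

```scala
import java.nio.charset.StandardCharsets

object RoutingSketch {
  type Id[A] = A

  // Natural transformation, in the spirit of cats.arrow.FunctionK
  trait ~>[F[_], G[_]] { def apply[A](fa: F[A]): G[A] }

  trait Decoder[A] { def decode(bytes: Array[Byte]): A }
  final case class OutgoingCommand[R](payload: Array[Byte], replyDecoder: Decoder[R])

  trait BookingAlg[F[_]] {
    def setRoute(steps: List[String]): F[Unit]
    def getRoute: F[Option[List[String]]]
  }

  // Hand-written equivalent of a derived FunctorK instance for BookingAlg
  def mapK[F[_], G[_]](alg: BookingAlg[F])(fk: F ~> G): BookingAlg[G] = new BookingAlg[G] {
    def setRoute(steps: List[String]): G[Unit] = fk(alg.setRoute(steps))
    def getRoute: G[Option[List[String]]] = fk(alg.getRoute)
  }

  def utf8(s: String): Array[Byte] = s.getBytes(StandardCharsets.UTF_8)
  def text(bytes: Array[Byte]): String = new String(bytes, StandardCharsets.UTF_8)

  // "Client" protocol: describes each call as a payload plus a reply decoder
  val client: BookingAlg[OutgoingCommand] = new BookingAlg[OutgoingCommand] {
    def setRoute(steps: List[String]): OutgoingCommand[Unit] =
      OutgoingCommand[Unit](utf8("setRoute|" + steps.mkString(",")), _ => ())
    def getRoute: OutgoingCommand[Option[List[String]]] =
      OutgoingCommand[Option[List[String]]](
        utf8("getRoute"),
        bytes => Some(text(bytes)).filter(_.nonEmpty).map(_.split(",").toList)
      )
  }

  // Hypothetical stand-in for the sharded cluster: per-ID state, raw bytes in and out
  final class InMemoryCluster {
    private val routes = scala.collection.mutable.Map.empty[String, List[String]]
    def send(id: String, payload: Array[Byte]): Array[Byte] =
      text(payload).split("\\|", 2) match {
        case Array("setRoute", steps) =>
          routes.update(id, steps.split(",").toList)
          Array.emptyByteArray
        case Array("getRoute") =>
          utf8(routes.get(id).map(_.mkString(",")).getOrElse(""))
        case _ =>
          throw new IllegalArgumentException(s"Unknown command: ${text(payload)}")
      }
  }

  // Router for one entity ID: deliver the payload, then decode the reply for the caller
  def routerFor(cluster: InMemoryCluster, id: String): OutgoingCommand ~> Id =
    new (OutgoingCommand ~> Id) {
      def apply[A](command: OutgoingCommand[A]): Id[A] =
        command.replyDecoder.decode(cluster.send(id, command.payload))
    }

  // A repository-style handle: the client algebra mapped through the router
  def bookingFor(cluster: InMemoryCluster, id: String): BookingAlg[Id] =
    mapK[OutgoingCommand, Id](client)(routerFor(cluster, id))

  val cluster = new InMemoryCluster
  bookingFor(cluster, "b-1").setRoute(List("A", "B"))
  val routeOfB1: Option[List[String]] = bookingFor(cluster, "b-1").getRoute
  val routeOfB2: Option[List[String]] = bookingFor(cluster, "b-2").getRoute
}
```

Mapping the client algebra through a per-ID router turns a mere description of commands into a BookingAlg[Id]. Callers then use it directly, without ever seeing messages.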

Built-in Akka runtime

Endless provides an endless-runtime-akka module, a plug & play runtime for Akka. However, nothing prevents you from developing a custom runtime.

Once the required trait implementations are defined, deploying an entity with the Akka runtime boils down to a single call to deployEntity. This call brings everything together and delivers a cats-effect Resource with the repository instance running over Akka Cluster Sharding:

Example call to deployEntity for our Booking entity (with an indication of return type). The call is parametrized with all related types and accepts constructors for the various implementations. The resulting resource provides a tuple with the repository instance interpreted into the target monad and the region actor ref resulting from the internal call to ClusterSharding.init in case further interactions are required.
📌 Note 1: To bridge Akka’s implicit asynchronicity with the side-effect-free context F used for algebras, the command router and deployEntity require Async from F. This makes it possible to use the Dispatcher mechanism for running the command-handling monadic chain synchronously from within the actor thread.
📌 Note 2: Akka’s ask provides the implementation for the command router trait. Internally, the endless Akka runtime transmits commands using a “carrier” protobuf message that embeds the target ID, the binary payload, and the serialized sender actor ref for receiving the reply.
A detailed breakdown of interactions on the “client” side of a call to place(booking) with the Akka runtime, culminating in transmitting the command and receiving the reply.
What happens on the “server” side, culminating in using Akka’s Cluster Sharding effect DSL.

Effector

We haven’t yet mentioned how to deal with side effects. They are covered by the flexible Effector typeclass, trait Effector[F[_], S] extends StateReader[F, S] with Passivator[F] with Self[F]. Let’s look at each of these abilities in turn:

  • StateReader allows reading the updated entity state after event persistence or recovery.
  • Passivator affords fine-grained control over passivation. In certain domains, entities can evolve into “dormant” forms (e.g., after a BookingCancelled event) for which it is beneficial to trigger passivation immediately or after some delay, enabling proactive optimization of cluster resources.
  • Self exposes the algebra of the entity within the effector context, making it possible to define asynchronous processes involving interaction with the same entity, typically to define entities acting as process managers (see the documentation for more detail).
📌 Note: monadic values making use of Effector are interpreted with the EffectorT monad transformer, which provides cats-effect instances all the way down to Async, so that time-based operators such as sleep and timeoutTo can be used.
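A much-simplified sketch of the passivation use case follows. The Effector trait below is hypothetical and keeps only the StateReader and Passivator abilities (Self is omitted). RecordingEffector is a synchronous test double rather than the Async-capable EffectorT. Together they encode the idea of a booking becoming dormant once cancelled.

```scala
import scala.concurrent.duration._

object EffectorSketch {
  type Id[A] = A

  // Hypothetical condensed abilities: StateReader + Passivator (Self omitted), not the endless API
  trait Effector[F[_], S] {
    def read: F[Option[S]]
    def passivate(after: FiniteDuration): F[Unit]
    def unit: F[Unit]
    def flatMap[A, B](fa: F[A])(f: A => F[B]): F[B]
  }

  final case class BookingState(id: String, cancelled: Boolean)

  // Runs after persistence or recovery: a cancelled booking is dormant, so release it at once
  def bookingEffector[F[_]](effector: Effector[F, BookingState]): F[Unit] =
    effector.flatMap(effector.read) {
      case Some(state) if state.cancelled => effector.passivate(Duration.Zero)
      case _                              => effector.unit
    }

  // Synchronous test double: fixed state, records passivation requests
  final class RecordingEffector[S](state: Option[S]) extends Effector[Id, S] {
    var passivationRequest: Option[FiniteDuration] = None
    def read: Option[S] = state
    def passivate(after: FiniteDuration): Unit = { passivationRequest = Some(after) }
    def unit: Unit = ()
    def flatMap[A, B](fa: A)(f: A => B): B = f(fa)
  }

  val cancelled = new RecordingEffector(Some(BookingState("b-1", cancelled = true)))
  bookingEffector[Id](cancelled)
  val active = new RecordingEffector(Some(BookingState("b-2", cancelled = false)))
  bookingEffector[Id](active)
}
```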

End-less

We decided to finally-tag 😃 this library endless to refer to the ever-flowing stream of events capturing state evolution (a nod to tag-less). In our experience, event sourcing and actors are potent tools to represent distributed state machines at scale. Adopting an algebraic style to describe entities allows smooth integration with functional domain logic while retaining independence from the actor framework.

📌 Credits: thanks to endless4s contributors, Shannon Alexandrine for the great logo, the Aecor framework for inspiration, Lightbend and contributors for Akka, and my colleagues at ZF for their support and review of this article.   

Written by Jonas Chapuis

Principal Software Engineer at ZF
