Functional event sourcing with cats-effect

Achieve maximal expressivity and domain abstraction while retaining actor protocol precision using the endless4s Scala library

Algebras

Actors

Best of both worlds

State reader, events writer

An illustrative monadic chain involving event sourcing with a reader-writer: F[_] is the higher-kinded effect type, A is the command type, write produces events, read folds the events written so far over the initial state, and B is the command reply. A typical chain would also include computation steps such as validation.
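To make the read/write mechanics concrete, here is a minimal sketch in plain Scala, with no effect type F and no endless types. ReaderWriter, CounterExample and the counter domain are hypothetical names chosen for illustration only; the sketch merely assumes that read folds the events written so far over the initial state.

```scala
// Minimal state-reader / events-writer in plain Scala (NOT the endless API).
// S = state, E = event, A = result of a step.
final case class ReaderWriter[S, E, A](
    run: (S, (S, E) => S, Vector[E]) => (Vector[E], A)
) {
  def flatMap[B](f: A => ReaderWriter[S, E, B]): ReaderWriter[S, E, B] =
    ReaderWriter[S, E, B] { (initial, fold, events) =>
      val (written, a) = run(initial, fold, events)
      f(a).run(initial, fold, written)
    }

  def map[B](f: A => B): ReaderWriter[S, E, B] =
    flatMap(a => ReaderWriter.pure[S, E, B](f(a)))
}

object ReaderWriter {
  def pure[S, E, A](a: A): ReaderWriter[S, E, A] =
    ReaderWriter[S, E, A]((_, _, events) => (events, a))

  // read: fold the events written so far over the initial state
  def read[S, E]: ReaderWriter[S, E, S] =
    ReaderWriter[S, E, S]((initial, fold, events) => (events, events.foldLeft(initial)(fold)))

  // write: append an event to the log
  def write[S, E](event: E): ReaderWriter[S, E, Unit] =
    ReaderWriter[S, E, Unit]((_, _, events) => (events :+ event, ()))
}

// Hypothetical counter domain, only to exercise the chain
object CounterExample {
  sealed trait Event
  final case class Incremented(by: Int) extends Event

  val fold: (Int, Event) => Int =
    (total, event) => event match { case Incremented(by) => total + by }

  // Command handler: validate, write an event, read the folded state, reply
  def increment(by: Int): ReaderWriter[Int, Event, Either[String, Int]] =
    if (by <= 0)
      ReaderWriter.pure[Int, Event, Either[String, Int]](Left("increment must be positive"))
    else
      for {
        _     <- ReaderWriter.write[Int, Event](Incremented(by))
        total <- ReaderWriter.read[Int, Event]
      } yield (Right(total): Either[String, Int])
}
```

Running CounterExample.increment(2) against an initial state of 0 yields one Incremented(2) event and the reply Right(2): the read sees the event that was just written, even though nothing has been persisted yet.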

Example: ride booking system

The simplistic domain model for a passenger transportation booking
Simplistic algebra for transportation bookings: place creates an entity, getBooking returns its definition if the entity exists, and setRoute and getRoute store and retrieve the route's geographical data. With these definitions, booking a ride reads as repository.bookingFor(bookingID).place(booking), and notifying that the route has been computed becomes repository.bookingFor(bookingID).setRoute(steps).
📌 Note: thanks to the BookingRepositoryAlg abstraction, the BookingAlg trait does not need to include the ID as a parameter to all the methods.
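As a rough sketch of what such algebras could look like: the method names (place, getBooking, setRoute, getRoute, bookingFor) come from the description above, while the domain types, error values and exact signatures are assumptions made for illustration. The in-memory interpreter with F = Id is a toy stand-in that only exercises the signatures; it is not how endless interprets them.

```scala
import scala.collection.mutable

object BookingAlgebraSketch {
  // Hypothetical domain types, for illustration only
  final case class BookingID(value: String)
  final case class LatLon(lat: Double, lon: Double)
  final case class Booking(origin: LatLon, destination: LatLon, passengerCount: Int)

  sealed trait BookingError
  case object BookingAlreadyExists extends BookingError
  case object BookingUnknown extends BookingError

  // Entity algebra: no ID parameter, since the entity is already selected
  trait BookingAlg[F[_]] {
    def place(booking: Booking): F[Either[BookingError, Unit]]
    def getBooking: F[Option[Booking]]
    def setRoute(steps: List[LatLon]): F[Either[BookingError, Unit]]
    def getRoute: F[Either[BookingError, List[LatLon]]]
  }

  // Repository algebra: selects the entity by ID
  trait BookingRepositoryAlg[F[_]] {
    def bookingFor(bookingID: BookingID): BookingAlg[F]
  }

  // Toy in-memory interpreter with F = Id (assumption, not the endless runtime)
  type Id[A] = A

  final class InMemoryRepository extends BookingRepositoryAlg[Id] {
    private val bookings = mutable.Map.empty[BookingID, Booking]
    private val routes   = mutable.Map.empty[BookingID, List[LatLon]]

    def bookingFor(bookingID: BookingID): BookingAlg[Id] = new BookingAlg[Id] {
      def place(booking: Booking): Either[BookingError, Unit] =
        if (bookings.contains(bookingID)) Left(BookingAlreadyExists)
        else { bookings.update(bookingID, booking); Right(()) }

      def getBooking: Option[Booking] = bookings.get(bookingID)

      def setRoute(steps: List[LatLon]): Either[BookingError, Unit] =
        if (bookings.contains(bookingID)) { routes.update(bookingID, steps); Right(()) }
        else Left(BookingUnknown)

      def getRoute: Either[BookingError, List[LatLon]] =
        if (bookings.contains(bookingID)) Right(routes.getOrElse(bookingID, Nil))
        else Left(BookingUnknown)
    }
  }
}
```

Because the ID is fixed by bookingFor, none of the BookingAlg methods has to carry it, which is the point made in the note above.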

Implementation

Example of a simple behavior implementation for the booking entity. The Entity instance is injected directly into the constructor (we’ll cover that further below). write persists events. The ifKnown and ifUnknown helper methods, which rely on the reader, make it easier to describe differentiated behavior for the cases where the entity exists or not. Any combination of such calls, including writes interspersed with reads, can be chained for more complex logic: read always provides a consistent view of the state by folding the events behind the scenes.

Event folding
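A minimal sketch of an event-folding function for the booking entity, in plain Scala. The event types (BookingPlaced, RouteSet), the Booking fields and the shape of the function (optional state in, either an error message or the new optional state out) are assumptions for illustration and may differ from the library's actual definitions.

```scala
object EventFoldingSketch {
  // Hypothetical domain and event types
  final case class LatLon(lat: Double, lon: Double)
  final case class Booking(origin: LatLon, destination: LatLon, route: List[LatLon] = Nil)

  sealed trait BookingEvent
  final case class BookingPlaced(booking: Booking) extends BookingEvent
  final case class RouteSet(steps: List[LatLon]) extends BookingEvent

  // Apply one event to the (possibly absent) state; invalid transitions are errors
  def applyEvent(state: Option[Booking], event: BookingEvent): Either[String, Option[Booking]] =
    (state, event) match {
      case (None, BookingPlaced(booking))   => Right(Some(booking))
      case (Some(_), BookingPlaced(_))      => Left("Booking already exists")
      case (Some(booking), RouteSet(steps)) => Right(Some(booking.copy(route = steps)))
      case (None, RouteSet(_))              => Left("Booking unknown")
    }

  // Replay a whole event log from the empty state, stopping at the first error
  def replay(events: List[BookingEvent]): Either[String, Option[Booking]] =
    events.foldLeft[Either[String, Option[Booking]]](Right(None)) { (acc, event) =>
      acc.flatMap(state => applyEvent(state, event))
    }
}
```

The same function serves both recovery (replaying the persisted log) and the intermediate folding performed when a behavior reads the state after writing events.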

Testing

Example of a munit suite making use of scalacheck for entity behavior. Note here the usage of the EntityT monad transformer provided by endless, which interprets programs making use of Entity into the target monad (IO in this case). In addition, EntityT requires an implicit definition of the event folding function eventApplier (to be able to fold the state in case of interspersed reads and writes).

Repository implementation

Wire protocol

Outgoing command

Definition for effect type representing an outgoing command, used for the “client” protocol implementation

Incoming command

Definition for effect type representing an incoming command, used for the “server” protocol implementation. runWith is in charge of decoding the actor message and invoking the corresponding behavior.
Definition of the command protocol for the Booking entity: the class extends CommandProtocol, which defines both “server” and “client.” On the client side, the implementation provides an OutgoingCommand, which converts the parameters into the protobuf message and establishes a mapper to decode the reply. On the server side, it decodes the incoming protobuf message, invokes the corresponding method of the entity, and encodes the reply. Note the calls to transformInto: we generally use chimney for data mapping.
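The client/server symmetry can be sketched in a few lines of plain Scala. This is a deliberately simplified illustration, not the endless CommandProtocol API: SeatsAlg is a hypothetical one-method algebra, strings stand in for protobuf messages, and the server runs against F = Id instead of an abstract effect.

```scala
object CommandProtocolSketch {
  type Id[A] = A

  // Deliberately tiny entity algebra (hypothetical, not the article's BookingAlg)
  trait SeatsAlg[F[_]] {
    def reserve(seats: Int): F[Boolean]
  }

  // "Client" side: an encoded command plus a decoder for its reply
  final case class OutgoingCommand[R](payload: String, decodeReply: String => R)

  // "Server" side: a decoded command, ready to run against the entity behavior
  trait IncomingCommand {
    def runWith(entity: SeatsAlg[Id]): String // returns the encoded reply
  }

  object SeatsProtocol {
    // client: the algebra interpreted into the OutgoingCommand context
    val client: SeatsAlg[OutgoingCommand] = new SeatsAlg[OutgoingCommand] {
      def reserve(seats: Int): OutgoingCommand[Boolean] =
        OutgoingCommand[Boolean](s"reserve:$seats", reply => reply == "ok")
    }

    // server: decode the message, invoke the matching method, encode the reply
    // (no handling of malformed numbers, to keep the sketch short)
    def server(message: String): IncomingCommand = new IncomingCommand {
      def runWith(entity: SeatsAlg[Id]): String = message.split(":") match {
        case Array("reserve", seats) =>
          if (entity.reserve(seats.toInt)) "ok" else "rejected"
        case _ => "unsupported"
      }
    }
  }
}
```

Calling SeatsProtocol.client.reserve(2) performs no side effect: it only describes the message to send and how to interpret the answer, which leaves the actual transport to be plugged in separately.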

Command “routing”

For a given entity ID, CommandRouter provides a natural transformation that maps the entity algebra, interpreted by CommandProtocol.client into the OutgoingCommand[*] context, back to F. This represents the "routing" of a command to its target actor: sending it, obtaining a reply, then decoding the reply and delivering it to the caller.
📌 Note 1: In order to support this natural transformation, a cats-tagless FunctorK instance must be provided for the entity algebra, but this can be automated with cats-tagless derivation macros: cats.tagless.derive.functorK[BookingAlg]
📌 Note 2: the command router is used when interpreting monadic values resulting from calls to the repository algebra (which are internally interpreted by the RepositoryT monad transformer).
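The routing idea can be illustrated without any library, under loud assumptions: FunctionK here is a hand-written stand-in for the cats type of the same name, mapK is written by hand instead of being derived by cats-tagless, SeatsAlg is a hypothetical one-method algebra, Option plays the role of F (None meaning that no reply came back), and send is a placeholder for the transport.

```scala
object CommandRoutingSketch {
  // Natural transformation from F to G (hand-written stand-in for cats' FunctionK)
  trait FunctionK[F[_], G[_]] {
    def apply[A](fa: F[A]): G[A]
  }

  final case class OutgoingCommand[R](payload: String, decodeReply: String => R)

  // Tiny hypothetical algebra with a hand-written mapK (what FunctorK provides)
  trait SeatsAlg[F[_]] { self =>
    def reserve(seats: Int): F[Boolean]

    def mapK[G[_]](fk: FunctionK[F, G]): SeatsAlg[G] = new SeatsAlg[G] {
      def reserve(seats: Int): G[Boolean] = fk(self.reserve(seats))
    }
  }

  // The algebra interpreted into the OutgoingCommand context
  val client: SeatsAlg[OutgoingCommand] = new SeatsAlg[OutgoingCommand] {
    def reserve(seats: Int): OutgoingCommand[Boolean] =
      OutgoingCommand[Boolean](s"reserve:$seats", _ == "ok")
  }

  // Router for one entity ID: "sends" the payload, then decodes the reply.
  // `send` is a placeholder for the actual transport.
  def routerFor(
      entityId: String,
      send: (String, String) => Option[String]
  ): FunctionK[OutgoingCommand, Option] =
    new FunctionK[OutgoingCommand, Option] {
      def apply[A](command: OutgoingCommand[A]): Option[A] =
        send(entityId, command.payload).map(command.decodeReply)
    }

  // What a repository can hand back to callers: the algebra, back in the target context
  def entityFor(entityId: String, send: (String, String) => Option[String]): SeatsAlg[Option] =
    client.mapK(routerFor(entityId, send))
}
```

From the caller's perspective, entityFor(id, send).reserve(2) looks like an ordinary method call on the algebra; the encoding, transmission and decoding all happen inside the natural transformation.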
Sequence diagram illustrating the various pieces at play when retrieving an instance of BookingAlg[F]

Built-in Akka runtime

Example call to deployEntity for our Booking entity (with an indication of return type). The call is parametrized with all related types and accepts constructors for the various implementations. The resulting resource provides a tuple with the repository instance interpreted into the target monad and the region actor ref resulting from the internal call to ClusterSharding.init in case further interactions are required.
📌 Note 1: in order to bridge Akka’s implicit asynchronicity with the side-effect free context F used for algebras, the command router and deployEntity require Async from F. This makes it possible to use the Dispatcher mechanism for running the command handling monadic chain synchronously from within the actor thread.
📌 Note 2: Akka's ask provides the implementation for the command router trait. Internally, the endless Akka runtime transmits commands using a “carrier” protobuf message, embedding target ID, binary payload, and the serialized sender actor ref for receiving the reply.
A detailed breakdown of interactions on the “client” side of a call to place(booking) with the Akka runtime, culminating in transmitting the command and receiving the reply.
What happens on the “server” side, culminating in using Akka’s Cluster Sharding effect DSL.

Effector

📌 Note: monadic values making use of Effector are interpreted with the EffectorT monad transformer, which provides cats effect instances all the way down to Async, so that time-based operators such as sleep, timeoutTo can be used.

End-less

📌 Credits: thanks to endless4s contributors, Shannon Alexandrine for the great logo, the Aecor framework for inspiration, Lightbend and contributors for Akka, and my colleagues at ZF for their support and review of this article.   
