Event sourcing + free monads = free sourcing?

I recently wrote about Event Sourcing and Free Monads. The natural next step is combining the two! What would a free monad adjusted for event sourcing look like?

Please bear in mind that what follows is really just a draft, not a complete solution, so any suggestions for improvements are very welcome! But first, let’s review the basic assumptions. Our application emits events of a certain type E; this will typically be a hierarchy of case classes. As we want to be free from any specific interpretation of side effects, we will use data of type A[_] to describe all actions that can happen in the system (again, usually using case classes). Actions include reading and writing the model, sending e-mails, indexing data in search subsystems, etc. Every action returns some result: writes typically result in Unit, reads in an Option[SomeData].

The events record what happened in the system and drive all of the “business logic”. All events are stored, though we don’t specify upfront how; the stream of events forms the primary “source of truth”.

There are two main functions that will interpret the events:

  • model update: as the name suggests, updates the model based on the event. For example, for a UserRegistered event, this should write the user to a database, or store it in an in-memory storage. It cannot emit new events, only perform actions.
  • event listeners: run some logic based on events. For example, for a UserRegistered event, this could trigger an action sending an email. Here we can emit new events, e.g. to create some initial data for a new user (in our example, this will be an api key).

The main idea behind this distinction is that, given a list of events, the model update function can be used to rebuild the model. We can do this multiple times: for example, we could store the events in persistent storage and keep the model in memory, and then use the model update function to re-create the model on a new node. We can also run the model update functions to create a new view of the data.
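To make the replay idea concrete, here is a deliberately simplified sketch in plain Scala (no Cats). Unlike the rest of this post, it assumes the model update is a pure function (Model, Event) => Model rather than a program of actions; the Model case class and the rebuild function are hypothetical names used only for illustration.

```scala
case class User(id: Long, email: String, password: String)
case class ApiKey(userId: Long, key: String)

sealed trait Event
case class UserRegistered(u: User) extends Event
case class ApiKeyCreated(ak: ApiKey) extends Event

// Hypothetical in-memory model: users and api keys, both keyed by user id
case class Model(users: Map[Long, User], apiKeys: Map[Long, ApiKey])
val emptyModel = Model(Map.empty, Map.empty)

// Simplified model update: applies one event to the model and cannot emit events
def modelUpdate(m: Model, e: Event): Model = e match {
  case UserRegistered(u) => m.copy(users = m.users + (u.id -> u))
  case ApiKeyCreated(ak) => m.copy(apiKeys = m.apiKeys + (ak.userId -> ak))
}

// Rebuilding the model is a left fold over the stored events, which can be
// repeated any number of times (e.g. on a new node)
def rebuild(events: Seq[Event]): Model = events.foldLeft(emptyModel)(modelUpdate)

val stored: List[Event] = List(
  UserRegistered(User(1L, "adam@example.org", "1234")),
  ApiKeyCreated(ApiKey(1L, "key-1")))

val onFirstNode = rebuild(stored)
val onSecondNode = rebuild(stored)
```

Because the fold only depends on the stored events, replaying them on another node yields the same model.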

On the other hand, event listeners should be run only once, and they should perform the main “business logic” for an event. Sending a welcome email should be done only once, even if the event is replayed to re-create the model. Here we can also emit new events, and they will be processed recursively using the two functions.
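The difference between the two functions can also be sketched in plain Scala. This is a simplification under stated assumptions: side effects are only recorded in a log instead of being performed, the listener returns the events it emits as a list, and the names handle and replay are hypothetical. On the first run, each event goes through the model update and then the listener, and emitted events are processed recursively; a replay runs only the model update, so the welcome e-mail is not sent again.

```scala
import scala.collection.mutable.ListBuffer

sealed trait Event
case class UserRegistered(email: String) extends Event
case class ApiKeyCreated(email: String) extends Event

// Side effects are only recorded here, not performed
val log = ListBuffer.empty[String]

// Model update: may run any number of times
def modelUpdate(e: Event): Unit = log += s"model: $e"

// Event listener: runs once per newly emitted event, returns the events it emits
def eventListener(e: Event): List[Event] = e match {
  case UserRegistered(email) =>
    log += s"send welcome email to $email"
    List(ApiKeyCreated(email))
  case ApiKeyCreated(_) => Nil
}

// First run: model update, then the listener, then (recursively) the events
// the listener emitted; returns every event that should be stored
def handle(events: List[Event]): List[Event] = events.flatMap { e =>
  modelUpdate(e)
  e :: handle(eventListener(e))
}

// Replay: only the model update runs
def replay(events: List[Event]): Unit = events.foreach(modelUpdate)

val stored = handle(List(UserRegistered("adam@example.org")))
val firstRun = log.toList
log.clear()
replay(stored)
val replayed = log.toList
```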

The top-level entry points are commands, parametrised with user-provided data (e.g. from a web form or a REST endpoint). Based on the model, a command validates the input, emits some events and produces a value, which is then returned to the user.

To describe programs that can emit events and perform actions, and whose events can be handled using the model update and event listener functions, we will use a data type ES[E, A, R].

ES[E, A, R] is a description of a program which emits events of type E, contains actions of type A and produces a result of type R.

The three components described above should have the following signatures:

  • command: AnyUserData => ES[E, A, R]
  • model update: PartialFunction[E, ES[Nothing, A, Unit]] (cannot emit events; partial, as we don’t require model updates for every event)
  • event listener: PartialFunction[E, ES[E, A, Unit]]

What is ES?

ES is an extension of the free monad over A which, in addition to the usual constructors (FlatMap, Pure and Suspend, the latter used for actions), contains an additional Emit constructor (its meaning shouldn’t be a surprise). Just as with the free monad, we can define a method (foldMap) that interprets ES in any monad, given an interpretation for actions and events.

Here’s the basic structure of ES:

import cats.Monad

sealed trait ES[E, A[_], R] {
    def flatMap[R2](f: R => ES[E, A, R2]): ES[E, A, R2] = FlatMap(this, f)
    // explicit type arguments help Scala infer Pure's E and A
    def map[R2](f: R => R2): ES[E, A, R2] = flatMap(r => Pure[E, A, R2](f(r)))
    // ... (elided)
}
 
case class Pure[E, A[_], R](r: R) extends ES[E, A, R]
case class Emit[E, A[_]](e: E) extends ES[E, A, Unit]
case class Suspend[E, A[_], R](a: A[R]) extends ES[E, A, R]
case class FlatMap[E, A[_], R1, R2](
   c: ES[E, A, R1], 
   f: R1 => ES[E, A, R2]) extends ES[E, A, R2]
 
object ES {
    implicit def esMonad[E, A[_]]: Monad[({type x[X] = ES[E, A, X]})#x] =
       new Monad[({type x[X] = ES[E, A, X]})#x] {
          // explicit return types are needed, as ES is invariant
          override def pure[X](x: X): ES[E, A, X] = Pure(x)
          override def flatMap[X, Y](fa: ES[E, A, X])(f: X => ES[E, A, Y]): ES[E, A, Y] = 
             fa.flatMap(f)
          // required by Cats 1.x and later; the recursive call sits inside the
          // continuation, so it is only evaluated during interpretation
          override def tailRecM[X, Y](x: X)(f: X => ES[E, A, Either[X, Y]]): ES[E, A, Y] =
             f(x).flatMap {
                case Left(next) => tailRecM(next)(f)
                case Right(y)   => pure(y)
             }
       }
 
    def pure[E, A[_], R](r: R): ES[E, A, R] = Pure(r)
    def done[E, A[_]]: ES[E, A, Unit] = pure(())
    def emit[E, A[_]](e: E): ES[E, A, Unit] = Emit(e)
    def suspend[E, A[_], R](a: A[R]): ES[E, A, R] = Suspend(a)
}

Event-sourcing ES specifics

In addition to normal monad operations, we need two operations on the ES data type. Firstly, we need a way to provide the model update & event listener functions, which should be applied to the description of the program. As a result, we want to get a program where all of the events are handled, and which contains appropriate actions (for each event, first the actions described by the model update, and then the actions described by the event listener).

Hence ES contains a function:

sealed trait ES[E, A[_], R] {
   def handleEvents(
      modelUpdate: PartialFunction[E, ES[Nothing, A, Unit]],
      eventListener: PartialFunction[E, ES[E, A, Unit]]): ES[Handled[E], A, R]
}

Note that in the return type, events are wrapped in a case class Handled[E](e: E). That’s to make sure that we can’t simply call handleEvents twice and re-interpret the events. We still need to retain the events in the description of the program, so that they can be stored when we do the final interpretation.
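The effect of handleEvents can be illustrated on a much simpler program representation than ES: a flat list of steps, each either an action (identified only by its name) or an emitted event. This is a hypothetical sketch, not the implementation from the gist; Step, DoAction and EmitEvent are illustrative names. Each emitted event is kept, wrapped in Handled, and followed by the actions of the model update and then by the recursively handled output of the listener. The model update returns only action names, so it cannot emit events by construction.

```scala
sealed trait Event
case class UserRegistered(email: String) extends Event
case class ApiKeyCreated(email: String) extends Event

// Hypothetical, flattened program representation: a list of steps
sealed trait Step[E]
case class DoAction[E](name: String) extends Step[E]
case class EmitEvent[E](e: E) extends Step[E]

// Marks events that have already been passed through handleEvents
case class Handled[E](e: E)

def handleEvents[E](
    program: List[Step[E]],
    modelUpdate: PartialFunction[E, List[String]], // actions only: cannot emit
    eventListener: PartialFunction[E, List[Step[E]]]): List[Step[Handled[E]]] =
  program.flatMap[Step[Handled[E]]] {
    case DoAction(name) => List(DoAction[Handled[E]](name))
    case EmitEvent(e) =>
      val updates = modelUpdate.lift(e).getOrElse(Nil).map(DoAction[Handled[E]](_))
      // events emitted by the listener are handled recursively
      val listened = handleEvents(eventListener.lift(e).getOrElse(Nil), modelUpdate, eventListener)
      // the event itself is kept, so that it can be stored during interpretation
      EmitEvent(Handled(e)) :: (updates ++ listened)
  }

val command: List[Step[Event]] =
  List(DoAction("FindUserByEmail"), EmitEvent(UserRegistered("adam@example.org")))

val modelUpdate: PartialFunction[Event, List[String]] = {
  case UserRegistered(_) => List("WriteUser")
  case ApiKeyCreated(_)  => List("WriteApiKey")
}

val eventListener: PartialFunction[Event, List[Step[Event]]] = {
  case UserRegistered(email) =>
    List(EmitEvent(ApiKeyCreated(email)), DoAction("SendEmail"))
}

val handled = handleEvents(command, modelUpdate, eventListener)
val actions = handled.collect { case DoAction(name) => name }
val storedEvents = handled.collect { case EmitEvent(Handled(e)) => e }
```

Since modelUpdate expects plain events, passing it a program of Handled events would not type-check, which is the protection the wrapper is meant to give.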

Which brings us to the interpretation function. Given a description of a program with the events handled as described by our model update/event listener functions, we want to interpret it in any monad, hence we get the function:

implicit class ESHandled[E, A[_], R](es: ES[Handled[E], A, R]) {
   def run[M[_]](ai: A ~> M, storeEvent: E => M[Unit])(
      implicit M: Monad[M]): M[R] = ???
}

To do the interpretation, we need both an interpretation of the actions (the A ~> M natural transformation) and a way to store the events. Given a program ES[Handled[E], A, R] and a monad M, we get back the result M[R], with all events handled and stored, and all actions interpreted.
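As the full interpreter lives in the gist, here is a minimal self-contained sketch of the idea, restricted to direct execution (the Id case) and written without Cats. ActionRunner is a hypothetical stand-in for A ~> Id, onEmit plays the role of storeEvent, and the KvAction actions are made up for the example. Unlike a proper foldMap, this plainly recursive interpreter is not stack-safe.

```scala
import scala.collection.mutable

// The ES constructors, as above
sealed trait ES[E, A[_], R] {
  def flatMap[R2](f: R => ES[E, A, R2]): ES[E, A, R2] = FlatMap(this, f)
  def map[R2](f: R => R2): ES[E, A, R2] = flatMap(r => Pure[E, A, R2](f(r)))
}
case class Pure[E, A[_], R](r: R) extends ES[E, A, R]
case class Emit[E, A[_]](e: E) extends ES[E, A, Unit]
case class Suspend[E, A[_], R](a: A[R]) extends ES[E, A, R]
case class FlatMap[E, A[_], R1, R2](c: ES[E, A, R1], f: R1 => ES[E, A, R2]) extends ES[E, A, R2]

// Hypothetical stand-in for A ~> Id: runs a single action directly
trait ActionRunner[A[_]] { def apply[X](a: A[X]): X }

// Actions are executed by ai, events are passed to onEmit; plain recursion,
// so deeply nested programs may overflow the stack
def run[E, A[_], R](es: ES[E, A, R], ai: ActionRunner[A], onEmit: E => Unit): R =
  es match {
    case Pure(r)       => r
    case Emit(e)       => onEmit(e)
    case Suspend(a)    => ai(a)
    case FlatMap(c, f) => run(f(run(c, ai, onEmit)), ai, onEmit)
  }

// Made-up key-value actions for the example
sealed trait KvAction[R]
case class Get(key: String) extends KvAction[Option[Int]]
case class Put(key: String, value: Int) extends KvAction[Unit]

val store = mutable.Map.empty[String, Int]
val emitted = mutable.ListBuffer.empty[String]

val runner = new ActionRunner[KvAction] {
  def apply[X](a: KvAction[X]): X = a match {
    case Get(k)    => store.get(k)
    case Put(k, v) => store(k) = v
  }
}

val program: ES[String, KvAction, Option[Int]] = for {
  _ <- Suspend[String, KvAction, Unit](Put("a", 1))
  _ <- Emit[String, KvAction]("a was set to 1")
  v <- Suspend[String, KvAction, Option[Int]](Get("a"))
} yield v

val result = run(program, runner, (e: String) => { emitted += e; () })
```

The FlatMap case runs the inner program first and feeds its result to the continuation, so actions and events are processed in the order in which they appear in the description.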

Example usage

What would a simple usage example look like? We will describe a program where users can register using a unique email, and for each new user an api key is created. Here are the data, actions and events that we will use:

case class User(id: Long, email: String, password: String)
case class ApiKey(userId: Long, key: String)
 
sealed trait Action[R]
case class FindUserByEmail(email: String) extends Action[Option[User]]
case class WriteUser(u: User) extends Action[Unit]
case class FindApiKeyByUserId(userId: Long) extends Action[Option[ApiKey]]
case class WriteApiKey(ak: ApiKey) extends Action[Unit]
case class SendEmail(to: String, body: String) extends Action[Unit]
 
sealed trait Event
case class UserRegistered(u: User) extends Event
case class ApiKeyCreated(ak: ApiKey) extends Event

The entry point will be a command to register users:

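import scala.util.Random

// action, emit and pure are small helpers lifting actions, events and plain
// values into ES (assumed to be defined in the accompanying gist)
def registerUserCommand(
   email: String, password: String): ES[Event, Action, Either[String, Unit]] = {
 
   action(FindUserByEmail(email)).flatMap {
      // no user with this email yet: register a new one
      case None =>
         emit(UserRegistered(User(new Random().nextInt(), email, password)))
            .map(_ => Right(()))
 
      // validation failed: no events are emitted
      case Some(_) =>
         pure(Left("User with the given email already exists"))
   }
}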

Note that we are using concrete types (Event, Action) as the type parameters for ES. The result of the command is either an error message (the left side of the Either) or success (Right(())). In the command we perform actions (looking up the user by email) and, if validation succeeds, emit events (user registered).

We also need the model update and event listener functions to interpret the events:

import java.util.UUID

// Model update: only performs actions, so it cannot emit events
val modelUpdate: PartialFunction[Event, ES[Nothing, Action, Unit]] = {
   case UserRegistered(u) => action(WriteUser(u))
   case ApiKeyCreated(ak) => action(WriteApiKey(ak))
}
 
// Event listeners: run only once per event, and may emit further events
val eventListeners: PartialFunction[Event, ES[Event, Action, Unit]] = {
   case UserRegistered(u) => for {
      _ <- emit(ApiKeyCreated(ApiKey(u.id, UUID.randomUUID().toString)))
      _ <- action(SendEmail(u.email, "Welcome!"))
   } yield ()
}

There are no event listeners for ApiKeyCreated, and for UserRegistered we emit another event and perform an action.

Given user input, we can handle the events emitted by the command and interpret the resulting program in the Id monad:

import cats.{Id, ~>}

val handledCommand = registerUserCommand("adam@example.org", "1234")
   .handleEvents(modelUpdate, eventListeners)
 
// Id interpreter: prints each action instead of performing it
val result: Either[String, Unit] = handledCommand.run[Id](new (Action ~> Id) {
   override def apply[A](fa: Action[A]): Id[A] = fa match {
      case FindUserByEmail(email) => println(s"Find user by email: $email"); None
      case WriteUser(u) => println(s"Write user $u")
      case FindApiKeyByUserId(id) => println(s"Find api key by user id: $id"); None
      case WriteApiKey(ak) => println(s"Write api key: $ak")
      case SendEmail(to, body) => println(s"Send email to $to, body: $body")
   }
}, e => println("Store event: " + e))

Id is great for testing; in real life you would interpret the actions using e.g. Future or Task and write the results to a database. When executed, the program above prints a trail of the actions it performs and the events it stores.

Supporting code

The code for ES is mainly an extension of the free monad interpretation. The full (quite short) source is available in a gist, alongside the example above. It uses Cats for the monad and natural transformation abstractions.

Summing up

To make this usable in a real-life scenario, a couple of features are missing. First, we would need to support combining several different types of events and actions. Second, we would need a convenient way to enrich events, e.g. to capture the timestamp of an event or some wider context (the user’s ip/id).
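As one possible direction for enriching events (an assumption, not something this post implements), events could be wrapped in an envelope carrying metadata at the moment they are stored. Since run takes a storeEvent function, enrichment can be added by wrapping that function. Context, Enriched and enrichingStore are hypothetical names; a fixed java.time.Clock keeps the timestamp deterministic in the example.

```scala
import java.time.{Clock, Instant, ZoneOffset}
import scala.collection.mutable.ListBuffer

sealed trait Event
case class UserRegistered(email: String) extends Event

// Hypothetical context of the request that caused the events
case class Context(userId: Option[Long], ip: Option[String])

// Hypothetical envelope: the original event plus metadata
case class Enriched[E](event: E, timestamp: Instant, ctx: Context)

// Turns a store for enriched events into a plain storeEvent function
// (E => Unit, i.e. E => Id[Unit]), adding a timestamp and the context
def enrichingStore[E](clock: Clock, ctx: Context)(store: Enriched[E] => Unit): E => Unit =
  e => store(Enriched(e, Instant.now(clock), ctx))

val stored = ListBuffer.empty[Enriched[Event]]
val fixedClock = Clock.fixed(Instant.EPOCH, ZoneOffset.UTC)

val storeEvent: Event => Unit =
  enrichingStore[Event](fixedClock, Context(Some(1L), Some("127.0.0.1"))) { enriched =>
    stored += enriched
    ()
  }

storeEvent(UserRegistered("adam@example.org"))
```

The events themselves stay unchanged, so model update and event listener functions don’t need to know about the metadata.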

  • The same problem has been approached here: https://github.com/svalaskevicius/eventflow

  • Thanks! I hadn’t read this before (here’s the blog: http://www.hyperlambda.com/posts/eventflow-cqrs-es-in-scala/), it might be interesting :)

  • Juan José Vázquez Delgado

    The approach is very interesting, but how to apply it in distributed systems while maintaining the guarantees of consistency?

  • I assume that by a distributed system you mean one with more than one server? If we are using an RDBMS, then most of the work regarding consistency is done by the database. If you want to use NoSQL, then that defines what kind of consistency you can expect.

  • Juan José Vázquez Delgado

    I mean an eventsourced service that is replicated on more than one node or server. Let’s say an e-commerce service where you need to maintain the consistency of the shopping cart regardless of the node. In these kinds of scenarios it’s important to deal with causal consistency, i.e. the order of events matters.

  • Right, well, I still think it’s mainly the choice of storage that matters here: you need one that provides causal consistency and where you can at least partially order events. The general approach described above doesn’t deal with such issues, I think.