Event sourcing with Akka Persistence
Published on 2019-06-07 by Nick ten Veen
Asynchronous pains
In one of our projects at the Port of Rotterdam we do a lot of stream processing that requires intermediate state. We use event sourcing with Akka Persistence, which lets us build robust stateful streaming applications that keep their state across application restarts. Writing our command handlers was a struggle, however, because they perform a number of asynchronous operations. Akka Persistence does not let you handle commands asynchronously, so you have to deal with that yourself. Let us explore the problem in a simplified event sourcing application.
Event Sourcing
Let us create a simple implementation of an event sourcing system: a calculator that can add and subtract values. First we define our state, which is simply an integer value:
[block1.scala](https://gist.github.com/besuikerd/f860cbd65e5c4c0cc1f8f22ecfda0b94#file-block1-scala)
case class State(n: Int)
The state can only be modified by firing events. Let us define two events, one for adding to and one for subtracting from the state:
[block2.scala](https://gist.github.com/besuikerd/08af6b2f0526d6edeae5a7288b6f2f2b#file-block2-scala)
sealed trait Event
case class Added(n: Int) extends Event
case class Subtracted(n: Int) extends Event
Now that we have a definition for our state and possible events, we can write a handler that will process these events:
[block3.scala](https://gist.github.com/besuikerd/b9265ac0a285d7e096ce27332cbdf18e#file-block3-scala)
type EventHandler = (State, Event) => State
val eventHandler: EventHandler =
  (state, event) =>
    event match {
      case Added(n)      => state.copy(n = state.n + n)
      case Subtracted(n) => state.copy(n = state.n - n)
    }
We can test the event handler to verify that the events are processed correctly:
[block4.scala](https://gist.github.com/besuikerd/a63482d631107621c5d3b4e3142dff9b#file-block4-scala)
eventHandler(State(2), Added(2)) shouldBe State(4)
eventHandler(State(42), Subtracted(21)) shouldBe State(21)
Commands
In event sourcing, events are immutable facts about things that have happened. They should be handled deterministically, without any side effects. Sometimes, however, we do need to perform side effects, for example when we have to query a database to check whether an operation is allowed. The command abstraction serves this purpose. A command is a request to do something: it can be accepted, denied, or even transformed, and handling it is allowed to perform side effects. We can define commands for addition and subtraction:
[block5.scala](https://gist.github.com/besuikerd/7ad7221547e7cbc05b73a7a333bf737c#file-block5-scala)
sealed trait Command
case class Add(n: Int) extends Command
case class Sub(n: Int) extends Command
A command handler can process these commands and decide to fire zero or more events:
[block6.scala](https://gist.github.com/besuikerd/161e166621d79da4cc2769a09b2e5aa6#file-block6-scala)
type CommandHandler = (State, Command) => Seq[Event]
val commandHandler: CommandHandler =
  (state, command) => command match {
    case Add(n) => Seq(Added(n))
    case Sub(n) => Seq(Subtracted(n))
  }
We can test the command handler to verify it will fire events accordingly:
[block7.scala](https://gist.github.com/besuikerd/0f08b31ca39ca0bac9a16dfdd6c0e993#file-block7-scala)
commandHandler(State(0), Add(2)) shouldBe Seq(Added(2))
commandHandler(State(0), Sub(42)) shouldBe Seq(Subtracted(42))
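The handler above accepts every command. To illustrate that a command can also be denied or transformed, here is a sketch of a stricter handler. The rule it enforces (only positive amounts, and the total never goes below zero) is a hypothetical example that is not part of the original application, and `validatingHandler` is an illustrative name. The block repeats the earlier definitions so that it is self-contained:

```scala
final case class State(n: Int)

sealed trait Event
final case class Added(n: Int) extends Event
final case class Subtracted(n: Int) extends Event

sealed trait Command
final case class Add(n: Int) extends Command
final case class Sub(n: Int) extends Command

// Hypothetical business rule: amounts must be positive and the total may not go negative.
def validatingHandler(state: State, command: Command): Seq[Event] = command match {
  case Add(n) if n <= 0      => Seq.empty          // denied: non-positive amount
  case Add(n)                => Seq(Added(n))      // accepted as-is
  case Sub(n) if n <= 0      => Seq.empty          // denied: non-positive amount
  case Sub(n) if n > state.n =>                    // transformed: clamp to what is available
    if (state.n > 0) Seq(Subtracted(state.n)) else Seq.empty
  case Sub(n)                => Seq(Subtracted(n)) // accepted as-is
}
```

Returning an empty `Seq` is how this style of handler expresses a denial: no events, so the state stays unchanged.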
The command handler and event handler can be folded together to calculate the state for a given list of commands:
[block8.scala](https://gist.github.com/besuikerd/c115e2df5c28c50797abd0fa951ccbe5#file-block8-scala)
def combined(commands: Seq[Command]): State =
  commands.foldLeft(State(0)) {
    case (state, command) =>
      val events = commandHandler(state, command)
      events.foldLeft(state)(eventHandler)
  }
This all works fine, but if we want to recover the state after a crash or restart, we also need to store the events that the command handler produced. We need a function that accumulates the events while calculating the state:
[block9.scala](https://gist.github.com/besuikerd/5a9e4ca0ef7e199caddb3b16c105aef3#file-block9-scala)
def combinedWithEvents(commands: Seq[Command]): (State, Seq[Event]) =
  commands.foldLeft((State(0), Seq.empty[Event])) {
    case ((state, accumulatedEvents), command) =>
      val events = commandHandler(state, command)
      val nextState = events.foldLeft(state)(eventHandler)
      (nextState, accumulatedEvents ++ events)
  }
We can keep the accumulated state in memory during processing and at the same time persist the generated events somewhere. On restarts we can replay these events with the eventHandler to restore our state.
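Recovery is then just another left fold, this time over the stored events alone. Below is a minimal, Akka-free sketch, assuming an in-memory `Seq` stands in for the event journal; `recover` is a hypothetical name:

```scala
final case class State(n: Int)

sealed trait Event
final case class Added(n: Int) extends Event
final case class Subtracted(n: Int) extends Event

sealed trait Command
final case class Add(n: Int) extends Command
final case class Sub(n: Int) extends Command

val eventHandler: (State, Event) => State = (state, event) => event match {
  case Added(n)      => state.copy(n = state.n + n)
  case Subtracted(n) => state.copy(n = state.n - n)
}

val commandHandler: (State, Command) => Seq[Event] = (_, command) => command match {
  case Add(n) => Seq(Added(n))
  case Sub(n) => Seq(Subtracted(n))
}

def combinedWithEvents(commands: Seq[Command]): (State, Seq[Event]) =
  commands.foldLeft((State(0), Seq.empty[Event])) { case ((state, acc), command) =>
    val events = commandHandler(state, command)
    (events.foldLeft(state)(eventHandler), acc ++ events)
  }

// Recovery: start from the initial state and replay the journal with the event handler.
// The command handler is not called again, so its side effects are not repeated.
def recover(journal: Seq[Event]): State = journal.foldLeft(State(0))(eventHandler)

val (live, journal) = combinedWithEvents(Seq(Add(5), Sub(2), Add(10)))
// After a restart only `journal` is available; replaying it yields the same state
assert(recover(journal) == live)
```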
Akka Persistence
Akka Persistence encodes this pattern and lets us build actors whose state can be recovered after crashes and restarts. The command handler is a little different there: instead of returning a list of events, it returns an Effect. Effects are simply an encoding of the actions a persistent actor can take after receiving a command:
- Persist an event
- Stop the actor
- Stash the command
- Do nothing
These effects can be composed, for example to persist multiple events. For our example we can write a simple command handler:
[block10.scala](https://gist.github.com/besuikerd/94f2c3c255bdff421c5a3de1ad0ae6a9#file-block10-scala)
import akka.persistence.typed.scaladsl.Effect
type CommandHandler[Command, Event, State] = (State, Command) => Effect[Event, State]
val commandHandler: CommandHandler[Command, Event, State] = (state, command) =>
  command match {
    case Add(n) => Effect.persist(Added(n))
    case Sub(n) => Effect.persist(Subtracted(n))
  }
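Akka's `Effect` is opaque, but the idea of effects as data can be sketched with a small algebraic data type. The names below (`ToyEffect`, `Persist`, `DoNothing`, `interpret`) are hypothetical and only model the four actions listed above; this is not Akka's API:

```scala
// Toy model of "effects as data". Hypothetical names; NOT Akka's Effect API.
final case class State(n: Int)

sealed trait Event
final case class Added(n: Int) extends Event
final case class Subtracted(n: Int) extends Event

sealed trait Command
final case class Add(n: Int) extends Command
final case class Sub(n: Int) extends Command

// One case per action; Persist carries a Seq so several events compose into one effect
sealed trait ToyEffect
final case class Persist(events: Seq[Event]) extends ToyEffect
case object Stop extends ToyEffect
case object Stash extends ToyEffect
case object DoNothing extends ToyEffect

val eventHandler: (State, Event) => State = (state, event) => event match {
  case Added(n)      => state.copy(n = state.n + n)
  case Subtracted(n) => state.copy(n = state.n - n)
}

// The handler only describes what should happen...
val commandHandler: (State, Command) => ToyEffect = (_, command) => command match {
  case Add(n) => Persist(Seq(Added(n)))
  case Sub(n) => Persist(Seq(Subtracted(n)))
}

// ...and a runtime interprets the description. Here only Persist changes the state;
// a real runtime would also write to a journal, stop the actor, or buffer the command.
def interpret(state: State, effect: ToyEffect): State = effect match {
  case Persist(events)          => events.foldLeft(state)(eventHandler)
  case Stop | Stash | DoNothing => state
}
```

Because the handler returns a value instead of performing the action, it stays as easy to test as the `Seq[Event]` version.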
Asynchronous command handling
One issue with the commandHandler is that it is synchronous, and there are currently no plans for asynchronous command handlers in Akka Persistence. If you want to do some asynchronous processing before deciding to persist an event, you need to introduce extra commands. For example, let's say we want to check whether a specific addition or subtraction is allowed before we emit an event. Suppose this check really has to be asynchronous, so let's define our permission check as a function that returns a Future:
[block11.scala](https://gist.github.com/besuikerd/582ff0a0dafe23067676abe807466311#file-block11-scala)
import scala.concurrent.Future
type CheckPermission = Int => Future[Boolean]
val trivialCheck: CheckPermission = _ => Future.successful(true)
To add this check to our command handler, we need an extra command that is fired after validation. We also group our previous commands under a common subtype so that they can be a parameter of the new command:
[block12.scala](https://gist.github.com/besuikerd/9169c542dd68d396b4b30189f260ffc1#file-block12-scala)
sealed trait Command
sealed trait AlgebraicCommand extends Command { def n: Int }
case class Add(n: Int) extends AlgebraicCommand
case class Sub(n: Int) extends AlgebraicCommand
case class OperationAllowed(algebraicCommand: AlgebraicCommand) extends Command
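With this definition we can rewrite our command handler to take the check into account. It starts the check and, once the Future completes with true, sends an OperationAllowed command back to the actor itself: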
[block13.scala](https://gist.github.com/besuikerd/fb292d22d42a9a767bb2296da0562a2e#file-block13-scala)
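import akka.actor.typed.ActorRef
import akka.persistence.typed.scaladsl.Effect
import scala.concurrent.ExecutionContext
def commandHandler(self: ActorRef[Command], checkPermission: CheckPermission)(
    implicit ec: ExecutionContext): CommandHandler[Command, Event, State] =
  (state, command) =>
    command match {
      case algebraicCommand: AlgebraicCommand =>
        // Start the asynchronous check; its outcome comes back later as a new command
        checkPermission(algebraicCommand.n).foreach {
          case true  => self ! OperationAllowed(algebraicCommand)
          case false => () // denied: nothing is persisted
        }
        Effect.none
      case OperationAllowed(algebraicCommand) =>
        algebraicCommand match {
          case Add(n) => Effect.persist(Added(n))
          case Sub(n) => Effect.persist(Subtracted(n))
        }
    }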
This works, but we have lost a property that might be important to us: because of the asynchronous boundary, the algebraic commands are no longer guaranteed to be processed in the order in which they arrived. Suppose our check is very slow for some specific values. Commands that arrived later may then be processed first, so their events end up out of order:
[block14.scala](https://gist.github.com/besuikerd/9c4fe71b95328fc79537ff311f8ac986#file-block14-scala)
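import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
// The check for 3 takes three seconds; every other check completes almost immediately
val slowCheck: CheckPermission = n =>
  Future {
    if (n == 3) {
      Thread.sleep(3000)
    }
    true
  }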
If we process the following commands in this order, the resulting events may come out in a different order:
[block15.scala](https://gist.github.com/besuikerd/d474236e670ad484003575cc6b170d5a#file-block15-scala)
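val commands = Seq(Add(2), Add(3), Sub(2))
val events = processCommands(commands) // processCommands (not shown) feeds the commands to the actor and collects the persisted events
events shouldBe Seq(Added(2), Subtracted(2), Added(3)) // wrong order: Added(3) comes last because its check is slow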
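The reordering can be reproduced without an actor system. The sketch below is a hypothetical, plain-Scala simulation: promises stand in for the permission checks and are completed in the order a slow check for 3 would finish, and a `ListBuffer` stands in for the actor's mailbox. `toEvent`, `sameThread`, `concurrentChecks` and `sequentialChecks` are illustrative names. For contrast, `sequentialChecks` starts each check only after the previous one has completed, which is what waiting for a reply before sending the next command achieves:

```scala
import scala.collection.mutable.ListBuffer
import scala.concurrent.{Await, ExecutionContext, Future, Promise}
import scala.concurrent.duration._

sealed trait AlgebraicCommand { def n: Int }
final case class Add(n: Int) extends AlgebraicCommand
final case class Sub(n: Int) extends AlgebraicCommand

sealed trait Event
final case class Added(n: Int) extends Event
final case class Subtracted(n: Int) extends Event

// Hypothetical helper: the event an allowed command turns into
def toEvent(command: AlgebraicCommand): Event = command match {
  case Add(n) => Added(n)
  case Sub(n) => Subtracted(n)
}

// Runs callbacks on the calling thread, which keeps the simulation deterministic
val sameThread: ExecutionContext = new ExecutionContext {
  def execute(runnable: Runnable): Unit = runnable.run()
  def reportFailure(cause: Throwable): Unit = cause.printStackTrace()
}

val commands: Seq[AlgebraicCommand] = Seq(Add(2), Add(3), Sub(2))

// All checks start at once, as in the command handler above; results are recorded as they complete
def concurrentChecks(): Vector[Event] = {
  val promises = commands.map(_ => Promise[Boolean]())
  val received = ListBuffer.empty[Event] // stands in for the actor's mailbox
  commands.zip(promises).foreach { case (command, promise) =>
    promise.future.foreach(allowed => if (allowed) received += toEvent(command))(sameThread)
  }
  // Simulate a slow check for Add(3): it completes last
  promises(0).success(true)
  promises(2).success(true)
  promises(1).success(true)
  received.toVector
}

// Each check starts only after the previous one has finished
def sequentialChecks(check: Int => Future[Boolean]): Vector[Event] = {
  implicit val ec: ExecutionContext = ExecutionContext.global
  val result = commands.foldLeft(Future.successful(Vector.empty[Event])) { (acc, command) =>
    acc.flatMap { events =>
      check(command.n).map(allowed => if (allowed) events :+ toEvent(command) else events)
    }
  }
  Await.result(result, 10.seconds)
}

// A shorter version of the slow check above, so the demo runs quickly
val slowCheck: Int => Future[Boolean] = n =>
  Future { if (n == 3) Thread.sleep(200); true }(ExecutionContext.global)
```

`concurrentChecks()` records `Added(2), Subtracted(2), Added(3)`, while `sequentialChecks(slowCheck)` keeps the command order at the cost of running one check at a time.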
You could fix this by keeping the in-flight messages in some non-persistent state, or by using the ask pattern and waiting for a reply before sending the next command. As far as I am aware, however, you currently cannot keep volatile state in a persistent actor, so tracking the messages that are in flight means persisting them too. We extend the state to hold the in-flight command, and add a persistent event that marks a command as in flight:
[block16.scala](https://gist.github.com/besuikerd/171295d8524864177e7a8604f129d531#file-block16-scala)
case class State(n: Int, inFlight: Option[AlgebraicCommand])
case class Inflight(command: AlgebraicCommand) extends Event
Previously we only sent a command when an operation was allowed. Since we now also need to unstash the waiting commands when an operation is denied, we must always send a command carrying the result of the check. We therefore replace our OperationAllowed command with:
[block17.scala](https://gist.github.com/besuikerd/526911697c11ec573958e92287503493#file-block17-scala)
case class CheckPermissionResult(
  algebraicCommand: AlgebraicCommand,
  allowed: Boolean
) extends Command
Our event handler is now responsible for handling this extra event and cleaning up after a command has been successfully processed:
[block18.scala](https://gist.github.com/besuikerd/060de6dd2909eaacf1a5b6a221b080fa#file-block18-scala)
val eventHandler: EventHandler[State, Event] = (state, event) =>
  event match {
    case Added(n)          => state.copy(n = state.n + n, inFlight = None)
    case Subtracted(n)     => state.copy(n = state.n - n, inFlight = None)
    case Inflight(command) => state.copy(inFlight = Some(command))
  }
Finally, we rewrite our command handler to stash incoming commands as long as a message is still in flight. Once the result of the check has been handled, we unstash so that the stashed commands are processed again:
[stashing-commandhandler.scala](https://gist.github.com/besuikerd/ff5ae12ad86652802ce1b4747c75f27b#file-stashing-commandhandler-scala)
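import akka.actor.typed.ActorRef
import akka.persistence.typed.scaladsl.{Effect, EffectBuilder}
import scala.concurrent.ExecutionContext
def commandHandler(self: ActorRef[Command], checkPermission: CheckPermission)(
    implicit ec: ExecutionContext): CommandHandler[Command, Event, State] =
  (state, command) =>
    command match {
      case algebraicCommand: AlgebraicCommand =>
        state.inFlight match {
          // Another command is still being checked: park this one
          case Some(_) => Effect.stash()
          case None =>
            // Record the in-flight command first, then start the asynchronous check
            Effect.persist[Event, State](Inflight(algebraicCommand)).thenRun { _ =>
              checkPermission(algebraicCommand.n).foreach { allowed =>
                self ! CheckPermissionResult(algebraicCommand, allowed)
              }
            }
        }
      case CheckPermissionResult(algebraicCommand, allowed) =>
        val effect: EffectBuilder[Event, State] =
          if (allowed) {
            algebraicCommand match {
              case Add(n) => Effect.persist(Added(n))
              case Sub(n) => Effect.persist(Subtracted(n))
            }
          } else Effect.none
        // Release the stashed commands so they are processed again
        effect.thenUnstashAll()
    }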
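The ordering argument can be checked without Akka by simulating the mailbox and the stash. This is a hypothetical, single-threaded model, not how Akka implements stashing: a check result simply joins the back of the mailbox, and unstashing puts the stashed commands in front of it. It also assumes an extra `Denied` event that is not in the original; without it, a denied check leaves `inFlight` set and every later command would stay stashed. `simulate` is an illustrative name:

```scala
sealed trait Command
sealed trait AlgebraicCommand extends Command { def n: Int }
final case class Add(n: Int) extends AlgebraicCommand
final case class Sub(n: Int) extends AlgebraicCommand
final case class CheckPermissionResult(algebraicCommand: AlgebraicCommand, allowed: Boolean) extends Command

sealed trait Event
final case class Added(n: Int) extends Event
final case class Subtracted(n: Int) extends Event
final case class Inflight(command: AlgebraicCommand) extends Event
// Assumption, not in the original: clears inFlight when a check is denied
final case class Denied(command: AlgebraicCommand) extends Event

final case class State(n: Int, inFlight: Option[AlgebraicCommand])

val eventHandler: (State, Event) => State = (state, event) => event match {
  case Added(n)      => state.copy(n = state.n + n, inFlight = None)
  case Subtracted(n) => state.copy(n = state.n - n, inFlight = None)
  case Inflight(c)   => state.copy(inFlight = Some(c))
  case Denied(_)     => state.copy(inFlight = None)
}

// Processes one message at a time, like an actor; returns the final state and the journal
def simulate(commands: Seq[AlgebraicCommand], allowed: Int => Boolean): (State, Vector[Event]) = {
  var mailbox: List[Command] = commands.toList
  var stash: List[Command] = Nil // kept in arrival order
  var state = State(0, None)
  val journal = Vector.newBuilder[Event]

  def persist(event: Event): Unit = {
    journal += event
    state = eventHandler(state, event)
  }

  while (mailbox.nonEmpty) {
    val message = mailbox.head
    mailbox = mailbox.tail
    message match {
      case c: AlgebraicCommand if state.inFlight.isDefined =>
        stash = stash :+ c // models Effect.stash()
      case c: AlgebraicCommand =>
        persist(Inflight(c))
        // The "asynchronous" result arrives behind everything already queued
        mailbox = mailbox :+ CheckPermissionResult(c, allowed(c.n))
      case CheckPermissionResult(c, ok) =>
        val event: Event = c match {
          case _ if !ok => Denied(c)
          case Add(n)   => Added(n)
          case Sub(n)   => Subtracted(n)
        }
        persist(event)
        // Models thenUnstashAll(): stashed commands run before the rest of the mailbox
        mailbox = stash ++ mailbox
        stash = Nil
    }
  }
  (state, journal.result())
}
```

With every check allowed, the events other than `Inflight` come out as `Added(2), Added(3), Subtracted(2)`, matching the command order, even though the later commands arrived while the first check was still running.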
Even after all this, a few concerns with this implementation remain. What if checkPermission fails? We would need to extend the example to deal with failing futures as well. A denied operation is another gap: only Added and Subtracted clear inFlight, so after a denial every subsequent command stays stashed. Moreover, we persist the in-flight command so that it survives restarts. After a restart, however, its permission check is no longer running, so no result will ever arrive and the actor waits forever. This example illustrates that you can handle commands asynchronously, but to keep them in the correct order we had to add error-prone synchronisation code.
Conclusion
You can do asynchronous command handling with Akka Persistence. It does however require you to write some error-prone boilerplate code. Can we do better? Are persistent actors the correct approach for this problem? Maybe we can express the problem in a different paradigm where we still have the nice property of state recovery, while also allowing us to handle commands asynchronously. Maybe we could use stream processing to have a cleaner solution to our problem. But that is for another blog post.