ManagedT: Compositional resource management
When bootstrapping a new application, you’d typically like to nicely structure the way resources (which we’ll define in a moment) are acquired. Resources should be allowed to depend on each other, imposing a sequential order on the startup sequence. This pretty much falls out of the module structure:
import monix.eval.Task
trait KinesisWriter
trait EventWriter
trait App
def acquireKinesis: Task[KinesisWriter] = ???
def acquireEventWriter(k: KinesisWriter): Task[EventWriter] = ???
def runApp(k: KinesisWriter, e: EventWriter): Task[Unit] = ???
Ruling out usage of naughty nulls and vars, it is impossible to avoid sequential ordering of acquireKinesis, acquireEventWriter and runApp when starting up. However, say we also have close methods which must be called on those writers:
trait KinesisWriter {
def close: Task[Unit] = Task.unit
}
trait EventWriter {
def close: Task[Unit] = Task.unit
}
It’s pretty important to call these close methods in the right order, which is usually the reverse of the acquisition order. There are no function arguments to guide us in this case, so care must be taken; the EventWriter might be buffering events to be written to Kinesis, and if we close the KinesisWriter under its feet, some writes might be lost.
Composing resource acquisition means composing functions that return Task. That’s monadic composition, and we can use for-comprehensions for that:
def acquireKinesis = Task(new KinesisWriter {})
def acquireEventWriter(kinesis: KinesisWriter) = Task(new EventWriter {})
def runApp(eventWriter: EventWriter) = Task.unit
val appTask = for {
kinesis <- acquireKinesis
eventWriter <- acquireEventWriter(kinesis)
_ <- runApp(eventWriter)
} yield ()
This doesn’t do anything for the cleanup handlers. Imperative programming offers us the try-catch-finally statement, which looks like this:
val resource = acquire()
try {
use(resource)
} finally {
release(resource)
}
This ensures that release(resource) will be called, regardless of whether use(resource) throws or not. If we want to compose resource acquisition, we nest the blocks. This doesn’t handle any sort of effect; errors are signaled through exceptions.
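The nesting described above can be sketched for the two writers as follows. This is a minimal, dependency-free illustration: the Kinesis and Events classes and the log buffer are hypothetical stand-ins, not the traits defined earlier, and the only point is the order in which the finally blocks run.

```scala
import scala.collection.mutable.ListBuffer

object NestedFinally {
  // Records the order of events so it can be inspected afterwards.
  val log = ListBuffer.empty[String]

  // Hypothetical stand-ins for the writers in the text.
  final class Kinesis { def close(): Unit = { log += "close kinesis"; () } }
  final class Events(k: Kinesis) { def close(): Unit = { log += "close events"; () } }

  def run(): Unit = {
    val kinesis = { log += "open kinesis"; new Kinesis }
    try {
      // The inner resource depends on the outer one, so its block is nested.
      val events = { log += "open events"; new Events(kinesis) }
      try {
        log += "run app"
      } finally {
        events.close() // released first
      }
    } finally {
      kinesis.close() // released last
    }
  }
}
```

After NestedFinally.run(), the log reads: open kinesis, open events, run app, close events, close kinesis. Every additional resource adds another level of nesting, which is the problem the rest of the post works on.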
bracket
bracket is a functional version of try-catch-finally that can be used with effects. Here’s bracket from monix.eval.Task’s master branch, slightly modified to be non-infix:
object Task {
def bracket[R, A](acquire: Task[R])(use: R => Task[A])(release: R => Task[Unit]): Task[A]
}
acquire is a value that produces the resource; use is a function that uses the resource to produce an A; release is a function that releases the resource.
We can write our own version of bracket (that is slightly broken in that it doesn’t take cancellation into account, but we can leave this out for now) using combinators from the MonadError[Task, Throwable] typeclass instance:
import cats.implicits._
def bracket[R, A](acquire: Task[R])(use: R => Task[A])(release: R => Task[Unit]): Task[A] =
for {
resource <- acquire
resultOrError <- use(resource).attempt
result <- resultOrError match {
case Right(a) =>
release(resource).map(_ => a)
case Left(e) =>
release(resource).flatMap(_ => e.raiseError[Task, A])
}
} yield result
We may want to convince ourselves that this function works given the various failure points it has:
- if acquire fails, nothing happens and the error is returned;
- if use fails, its error will be surfaced in the Either returned from attempt. This effectively means that line will always succeed;
- if release in the Right case fails, its error will be returned from bracket and a would be discarded;
- if release in the Left case fails, its error will be returned from bracket and the original error will be discarded.
The behavior in the last two points might be arguable, in that it may make sense to always return a or to return the error from use, but the important point here is that release will always be called.
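To check the failure cases above without pulling in monix or cats, here is a small sketch that swaps Task for a hand-rolled IO type built on scala.util.Try. The IO class, IO.delay, IO.raise and the two demo methods are all assumptions made for this illustration; only the body of bracket mirrors the definition in the text.

```scala
import scala.util.{Failure, Success, Try}

object BracketSketch {
  // A tiny stand-in for Task: a deferred computation that may fail.
  final case class IO[A](run: () => Try[A]) {
    def flatMap[B](f: A => IO[B]): IO[B] = IO(() => run().flatMap(a => f(a).run()))
    def map[B](f: A => B): IO[B] = IO(() => run().map(f))
    // Surfaces a failure as a value, playing the role of attempt from MonadError.
    def attempt: IO[Either[Throwable, A]] = IO(() => Success(run().toEither))
  }

  object IO {
    def delay[A](a: => A): IO[A] = IO(() => Try(a))
    def raise[A](e: Throwable): IO[A] = IO(() => Failure(e))
  }

  // Same structure as the bracket in the text, with IO in place of Task.
  def bracket[R, A](acquire: IO[R])(use: R => IO[A])(release: R => IO[Unit]): IO[A] =
    for {
      resource <- acquire
      resultOrError <- use(resource).attempt
      result <- resultOrError match {
        case Right(a) => release(resource).map(_ => a)
        case Left(e)  => release(resource).flatMap(_ => IO.raise[A](e))
      }
    } yield result

  // use fails: release still runs and the error from use is returned.
  def useFails(): (Boolean, Try[Int]) = {
    var released = false
    val program = bracket(IO.delay("resource"))(_ =>
      IO.raise[Int](new RuntimeException("use failed")))(_ => IO.delay { released = true })
    val outcome = program.run()
    (released, outcome)
  }

  // use and release both fail: the error from release replaces the original.
  def releaseFailureWins(): Try[Int] =
    bracket(IO.delay("resource"))(_ =>
      IO.raise[Int](new RuntimeException("use failed")))(_ =>
      IO.raise[Unit](new RuntimeException("release failed"))).run()
}
```

useFails() reports that the release action ran and that the failure carries the message "use failed", while releaseFailureWins() fails with "release failed", matching the last bullet point.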
Note that we only used map, flatMap, attempt and raiseError. That means we can abstract Task out too, and constrain F[_] to have an instance of MonadError:
import cats.MonadError
def bracket[F[_], R, A](acquire: F[R])(use: R => F[A])(
release: R => F[Unit])(implicit F: MonadError[F, Throwable]): F[A] =
for {
resource <- acquire
resultOrError <- use(resource).attempt
result <- resultOrError match {
case Right(a) =>
release(resource).map(_ => a)
case Left(e) =>
release(resource).flatMap(_ => e.raiseError[F, A])
}
} yield result
We can now build our application as such:
trait KinesisWriter {
def close: Task[Unit] = Task.unit
}
object KinesisWriter {
def acquire: Task[KinesisWriter] = Task(new KinesisWriter {})
}
trait EventWriter {
def close: Task[Unit] = Task.unit
}
object EventWriter {
def acquire(k: KinesisWriter): Task[EventWriter] = Task(new EventWriter {})
}
val app = bracket(KinesisWriter.acquire) { kw =>
bracket(EventWriter.acquire(kw)) { ew =>
Task.unit
}(_.close)
}(_.close)
One can only imagine how that would look if we had 15 resources with complex dependencies between them. This is the exact opposite of compositionality; everything has to be lumped together. bracket looks like a good direction, but we have to improve on it to make it actually nice to use.
ManagedT
When in doubt, turn to Haskell: most mundane functional programming problems have already been explored there. Fabio Labella helpfully pointed me to Gabriel Gonzalez’ managed library that does exactly what we’ll construct here.
First, let’s re-arrange the bracket signature slightly:
def bracketCurried[F[_], R, A](acquire: F[R])(release: R => F[Unit])(
implicit F: MonadError[F, Throwable]): (R => F[A]) => F[A] = { use =>
bracket(acquire)(use)(release)
}
What we’ve done is return a function that, when given the use action, will run the acquisition, use and cleanup actions. By using the original bracket, we show ourselves that this version is compatible with the previous one.
We can make another mechanical change: instead of currying the function, let’s make this a case class (called ManagedT) and move acquire and release to the constructor:
case class ManagedT[F[_], R](acquire: F[R])(release: R => F[Unit])(
implicit F: MonadError[F, Throwable]) {
def apply[A](use: R => F[A]): F[A] = bracket(acquire)(use)(release)
}
We also moved A into the method definition, so the same instance of ManagedT can be used with multiple use actions, not necessarily of the same type. Again, we are still using the original bracket.
What we have now is a data type that represents resource acquisition and release, decoupled from the code that uses it. We now only need to be able to compose several instances of this data type.
Monad[ManagedT[F, ?]]
When I see a data type that needs to be composed sequentially, monads immediately pop into my mind. Here are the ManagedT instances for the Kinesis and Event writers:
val kinesis = ManagedT(KinesisWriter.acquire)(_.close)
def event(k: KinesisWriter) = ManagedT(EventWriter.acquire(k))(_.close)
The monad smell grows stronger as we see that the form of event is KinesisWriter => ManagedT[Task, EventWriter] - a Kleisli arrow for ManagedT[Task, ?]. Let’s write the pure and flatMap functions required to write the Monad instance.
def pure[F[_], R](r: R)(implicit F: MonadError[F, Throwable]) =
ManagedT(r.pure[F])(_ => F.unit)
What pure does for ManagedT is lift a regular value into ManagedT with no cleanup action. That’s simple enough. flatMap is next. We should look at the expected signature first:
def flatMap[F[_], R1, R2](fr1: ManagedT[F, R1])(fr2: R1 => ManagedT[F, R2])(
implicit F: MonadError[F, Throwable]): ManagedT[F, R2]
This says that we must be able to use a managed R1 in order to create a managed R2. That sounds like nesting resources. Here’s a first try:
scala> def flatMap[F[_], R1, R2](fr1: ManagedT[F, R1])(fr2: R1 => ManagedT[F, R2])(
| implicit F: MonadError[F, Throwable]): ManagedT[F, R2] =
| ManagedT {
| fr1 { r1 =>
| fr2(r1)(???)
| }
| } { r2 => ??? }
<console>:22: error: type mismatch;
found : R1 => F[Nothing]
required: R1 => F[A]
fr1 { r1 =>
^
<console>:22: error: type mismatch;
found : F[Nothing]
required: F[R]
Note: Nothing <: R, but type F is invariant in type _.
You may wish to define _ as +_ instead. (SLS 4.5)
fr1 { r1 =>
^
This is pretty awkward. We’re really contorting ourselves to fit the composed ManagedT into the existing constructor. What we really want is to create a new instance of ManagedT[F, R2] that will reuse fr1 and fr2 upon execution. We can rearrange the class layout again to make this possible:
abstract class ManagedT[F[_], R] {
def apply[A](use: R => F[A]): F[A]
}
object ManagedT {
def apply[F[_], R](acquire: => F[R])(cleanup: R => F[Unit])(
implicit F: MonadError[F, Throwable]): ManagedT[F, R] =
new ManagedT[F, R] {
def apply[A](use: R => F[A]): F[A] = bracket(acquire)(use)(cleanup)
}
}
Again, purely mechanical. Instead of capturing acquire and cleanup in the class constructor, we capture them when creating an anonymous instance in the ManagedT.apply function. We can now actually skip providing acquire and cleanup altogether; this is great for flatMap:
def flatMap[F[_], R1, R2](fr1: ManagedT[F, R1])(fr2: R1 => ManagedT[F, R2])(
implicit F: MonadError[F, Throwable]): ManagedT[F, R2] =
new ManagedT[F, R2] {
def apply[A](use: R2 => F[A]): F[A] =
fr1 { r1 =>
fr2(r1) { r2 =>
use(r2)
}
}
}
The definition basically falls out of the types. We no longer have to provide an acquire or cleanup action; we just delegate to the instances that were passed as arguments to the function. To convince ourselves that this is doing the right thing, we can trace through what is happening in the following expression:
val composed = flatMap(kinesis)(k => event(k))
val app = composed { eventWriter =>
Task {
// do something with eventWriter
()
}
}
The app value is a program that, when executed, will execute the apply function of kinesis (that is, fr1 in flatMap) with a use function that is created by applying event to the created KinesisWriter, which is then applied to the supplied app body.
Whew! It’s mind bending, but it works. Since we composed the usage actions as before, in a nested fashion, the cleanup handlers will also be executed correctly, in reverse order of acquisition. The ManagedT data type and its monad instance for cats is available here.
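The trace above can be made concrete with a stripped-down sketch. To stay dependency-free it drops the effect F entirely (use returns a plain value and try/finally plays the role of bracket), so Managed here is a simplified assumption rather than the library's ManagedT; the log buffer and the String resources are likewise hypothetical.

```scala
import scala.collection.mutable.ListBuffer

object ManagedTrace {
  val log = ListBuffer.empty[String]

  // Simplified ManagedT: no effect type, so use returns a plain A.
  abstract class Managed[R] {
    def apply[A](use: R => A): A
  }

  object Managed {
    def apply[R](acquire: => R)(cleanup: R => Unit): Managed[R] =
      new Managed[R] {
        def apply[A](use: R => A): A = {
          val r = acquire
          try use(r) finally cleanup(r)
        }
      }
  }

  // Same shape as the flatMap in the text: nest the two apply calls.
  def flatMap[R1, R2](fr1: Managed[R1])(fr2: R1 => Managed[R2]): Managed[R2] =
    new Managed[R2] {
      def apply[A](use: R2 => A): A =
        fr1 { r1 => fr2(r1) { r2 => use(r2) } }
    }

  val kinesis: Managed[String] =
    Managed { log += "acquire kinesis"; "kinesis" } { _ => log += "release kinesis"; () }

  def events(k: String): Managed[String] =
    Managed { log += s"acquire events($k)"; "events" } { _ => log += "release events"; () }

  // Composes the two resources, runs a body, and returns the recorded order.
  def run(): List[String] = {
    log.clear()
    val composed = flatMap(kinesis)(k => events(k))
    composed { ew => log += s"use $ew"; () }
    log.toList
  }
}
```

ManagedTrace.run() returns acquire kinesis, acquire events(kinesis), use events, release events, release kinesis: the inner resource is released before the one it depends on.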
Yay, composition!
With just the 25 lines that it takes to define the monad instance, we get a whole host of incredible combinators. We’ve seen flatMap - that means we can do for-comprehensions with ManagedT values. But that’s mundane, so I won’t bore you with that.
How about applicative composition? You’ve got two independent resources, and want to use them? Sure thing:
trait KafkaWriter {
def close: Task[Unit] = Task.unit
}
object KafkaWriter {
def acquire: Task[KafkaWriter] = Task(new KafkaWriter {})
}
import com.iravid.managedt.ManagedT
// import com.iravid.managedt.ManagedT
val zipped = (ManagedT(KinesisWriter.acquire)(_.close),
ManagedT(KafkaWriter.acquire)(_.close)).tupled
// zipped: com.iravid.managedt.ManagedT[monix.eval.Task,(KinesisWriter, KafkaWriter)] = com.iravid.managedt.ManagedT$$anon$1$$anon$5@218a2462
zipped will acquire and release both the KafkaWriter and KinesisWriter properly. Incidentally, it will do so in order of declaration, but that shouldn’t matter as they are independent. Need to initialize a list of resources of a size only known at runtime? No worries, we’ve got you covered:
def acquireNamed(name: String): ManagedT[Task, KafkaWriter] =
  ManagedT(KafkaWriter.acquire)(_.close)
// acquireNamed: (name: String)com.iravid.managedt.ManagedT[monix.eval.Task,KafkaWriter]
val writers = List("a", "b", "c") traverse acquireNamed
// writers: com.iravid.managedt.ManagedT[monix.eval.Task,List[KafkaWriter]]
Yes, they’ll be acquired as they are specified in the list, and released in reverse order. That trick also works for Option and Either, if you’ve got a resource that should only conditionally be initialized.
There’s also a Monoid instance for ManagedT, if you’ve got a resource that has a Monoid instance. You can combine the managed values as much as you like, and the cleanup actions will still be executed:
def resource(i: Int) = ManagedT(Task {
println(s"Acquiring ${i}")
i
})(_ => Task(println(s"Releasing $i")))
// resource: (i: Int)com.iravid.managedt.ManagedT[monix.eval.Task,Int]
val squashed = (1 to 5).toList.foldMap(resource)
// squashed: com.iravid.managedt.ManagedT[monix.eval.Task,Int] = com.iravid.managedt.ManagedT$$anon$1$$anon$5@37df48a4
val sum = squashed(sum => Task(println(s"Got $sum")))
// sum: monix.eval.Task[Unit] = Task.FlatMap$2133665162
import monix.execution.Scheduler.Implicits.global
// import monix.execution.Scheduler.Implicits.global
import scala.concurrent.Await, scala.concurrent.duration._
// import scala.concurrent.Await
// import scala.concurrent.duration._
Await.result(sum.runAsync, 1.second)
// Acquiring 1
// Acquiring 2
// Acquiring 3
// Acquiring 4
// Acquiring 5
// Got 15
// Releasing 5
// Releasing 4
// Releasing 3
// Releasing 2
// Releasing 1
It’s amazing that 25 lines unlock so much power. This is what I like about functional programming: it’s a programmer’s dream come true; write a measly amount of code, get back a boatload of reusable functionality.
Summary
ManagedT is usable today as a way to elegantly compose resource acquisition in your application. There are a few caveats, though; they are detailed in the README, but the biggest one is that MonadError cannot be used if the underlying F is an effect that models cancellable computations. monix.eval.Task supports cancellation, so all bets are off if you cancel your cleanup handlers.
cats-effect is about to add the MonadBracket typeclass. When that happens, the MonadError constraint will be replaced with that and we should get stronger guarantees for the cleanup actions.
Enjoy!