Skip to content

Instantly share code, notes, and snippets.

@SystemFw
Last active June 1, 2019 03:15
Show Gist options
  • Select an option

  • Save SystemFw/256205f51bf0135e4c6fd95dee4590fc to your computer and use it in GitHub Desktop.

Select an option

Save SystemFw/256205f51bf0135e4c6fd95dee4590fc to your computer and use it in GitHub Desktop.
Cats-effect, blocking, RankN-types.

cats-effect

The cats-effect project defines a purely functional effect type (IO[A]), and associated typeclass defining its behaviour. The ones we care about for this example are:

trait Sync[F[_]] extends MonadError[F, Throwable] {
   def delay[A](a: => A): F[A]
   ...
}

trait Async[F[_] extends Sync[F] {
  def async[A](cb: (Either[Throwable,A] => Unit) => Unit): F[A]
  ...
}

which describe the ability of effects type like IO to embed and suspend synchronous and asynchronous side-effects

Blocking, shift, the shift-and-shift-back pattern

The design of cats-effect is oriented towards non-blocking functionality, and includes its own green-threading and "semantic blocking" primitives that don't actually block the underlying JVM thread. This allows users to run an arbitrary number of IOs on top of a CPU bound pool like global, and even on a single thread (like in scala.js). However, when interacting with legacy Java apis, one often deals with code that blocks JVM threads. The most common pattern to deal with this situation, after wrapping the blocking code in F[_]: Sync, is to run it on a custom thread pool that's specific to blocking code, and then run the rest of the computation back in the "normal" pool. This pattern is informally called "shift-and-shift-back", and relies on a function called shift, which can be defined for any F[_]: Async

def shift[F[_]](ec: ExecutionContext)(implicit F: Async[F]): F[Unit] =
    F.async { cb => ... }

What shift does is shift whatever F[A] is flatMapped after to ec, until a subsequent call to shift moves it somewhere else.

Example, say you have some code that makes a JDBC call (notoriously blocking).

def dbCall[F[_]: Sync]: F[MyType] = F.delay {... query that blocks... }

which is then followed my normal async or cpu-bound code, like an http4s http call, or some data transformation

def post[F[_]: Async](a: MyType): F[HttpResponse] = ....

and let's say that we have an EC for blocking, called blockingEC and we run everything else on global. An example of "shift-and-shift-back" would be:

 def prog[F[_]: Async]: F[HttpResponse] = 
   for {
     _ <- Async.shift[F](blockingEC]
     myType <- dbCall[F]
     _ <- Async.shift(global)
   res <- post[F](myType)
  } yield res

A blocking combinator

The above pattern works well enough, but it's a bit tedious and error-prone for the end user to have to remember to do it by hand each time, so let's write a combinator for it (for those familiar with cats-effect, I'm omitting Timer and bracket for simplicity):

def blocking[F[_]: Async, A](p: F[A], blockingEC: ExecutionContext)(implicit ec: ExecutionContext): F[A] = 
  for {
   _ <- Async.shift[F](blockingEc)
   res <- p
   _ <- Async.shift
 } yield res

The interesting part here is that we need an Async constraint on F in order to call shift.

However there's a problem: p is of type F[A] where F is Async, so who guarantees that p itself doesn't contain shifting calls already, which might make blocking useless (since p might end up not running on blockingEc as desired, given a call to shift inside p that moves it somewhere else).

Note that if we change the signature of blocking to (note the Sync constraint):

def blocking[F[_]: Sync, A](p: F[A], blockingEC: ExecutionContext)(implicit ec: ExecutionContext): F[A]

We solve the problem for p, but can't call shift anymore se we can't implement it.

This doesn't work either:

def blocking[F[_]: Async, G[_]: Sync A](p: G[A], blockingEC: ExecutionContext)(implicit ec: ExecutionContext): F[A]

because we would need to flatMap F and G in the same for comprehension, and there's no way to unify those.

RankN types

What we need is a RankN type: we want p to be passed as a polymorphic value, so that it's the callee (blocking) that decides which F is p instantiated to, and not the caller (which is in control with normal parametric polymorphism).

The type we need in Haskell can be written like so:

blocking :: Async f => (forall g. Sync g => g a) => BlockingEC => EC => f a

the caller to blocking can't pass an async value like IO[Int] and the g a to blocking, because blocking requires the value to be fully polymorphic in g, but it can pass a value defined over any Sync g => g. However, inside blocking g a can be instantiated to whatever g has a Sync instance, and since Sync extends Async, f is a valid value. So basically we get what we want: blocking can call shift, and is assured that g a is a synchronous value that doesn't do any shifting itself. Apologies for the lack of Scala in this last paragraph, but Scala doesn't support RankN types or polymorphic values, and I think this is a compelling example (there are ways to simulate this specific scenario I think). There are many other use cases though, ranging from Free.foldMap to the ST monad, to safe resource region that are enforced at compile time, to many more.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment