Monix

Monix’ Task can be used as a target of your FreeS programs.

import freestyle.free._

We will use an example similar to the one used in the parallelism section. We will create a small algebra to validate numbers:

@free trait Validator {
  def isPositive: FS[Boolean]
  def isEven: FS[Boolean]
}
// defined trait Validator
// defined object Validator

We can assert that something should be positive and even by combining both operations:

import cats.syntax.apply._
// import cats.syntax.apply._

def isPositiveEven[F[_]](implicit V: Validator[F]): FreeS.Par[F, Boolean] =
  (V.isPositive, V.isEven).mapN(_ && _)
// isPositiveEven: [F[_]](implicit V: Validator[F])freestyle.free.FreeS.Par[F,Boolean]

Our algebra didn’t specify exactly what will be validated, so that gives us the liberty to do that in a handler.

We will create a handler that validates integers and can potentially do so asynchronously.

We can encode that we will validate integers by using Reader[Int, ?] and the (potential) asynchronicity with Monix Task, combining the two we end up with ReaderT[Task, Int, ?] or Kleisli[Task, Int, ?].

import cats.data.Kleisli
// import cats.data.Kleisli

import monix.eval.Task
// import monix.eval.Task

type ValidateInt[A] = Kleisli[Task, Int, A]
// defined type alias ValidateInt

def blockAndPrint(millis: Long, msg: String): Unit = {
  Thread.sleep(Math.abs(millis))
  println(msg)
}
// blockAndPrint: (millis: Long, msg: String)Unit

implicit val validateIntTaskHandler: Validator.Handler[ValidateInt] =
  new Validator.Handler[ValidateInt] {
    def isPositive: ValidateInt[Boolean] =
      Kleisli(i => Task.eval { blockAndPrint(i * 500L, s"isPositive($i)"); i > 0 })

    def isEven: ValidateInt[Boolean] =
      Kleisli(i => Task.eval { blockAndPrint(i * 250L, s"isEven($i)"); i % 2 == 0 })
  }
// validateIntTaskHandler: Validator.Handler[ValidateInt] = $anon$1@376f3d26

With the Freestyle implicits and the Monix to Cats conversions in scope, we can use our handler to interpret the isPositiveEven program:

import freestyle.free.implicits._
// import freestyle.free.implicits._

val check = isPositiveEven[Validator.Op].interpret[ValidateInt]
// check: ValidateInt[Boolean] = Kleisli(cats.data.Kleisli$$Lambda$10184/595186738@253d9a92)

We can pass some integers to the check program with the Kleisli#run method and run the Task as a Future using Task#runAsync:

import monix.execution.Scheduler.Implicits.global
// import monix.execution.Scheduler.Implicits.global

import scala.concurrent.Await
// import scala.concurrent.Await

import scala.concurrent.duration.Duration
// import scala.concurrent.duration.Duration

Await.result(check.run(1).runAsync, Duration.Inf)
// isPositive(1)
// isEven(1)
// res0: Boolean = false

Await.result(check.run(2).runAsync, Duration.Inf)
// isPositive(2)
// isEven(2)
// res1: Boolean = true

Await.result(check.run(-1).runAsync, Duration.Inf)
// isPositive(-1)
// isEven(-1)
// res2: Boolean = false

We can see that isPositive is always printed before isEven even though isEven doesn’t take as long as isPositive.

This happens because, in most Monad instances, the Applicative operations are implemented using flatMap, which means that the operations are sequential.

Monix however, allows parallel execution in batches, that does deterministic (ordered) signaling of results with the help of Task.

The following example uses Task.gather, which does parallel processing while preserving result ordering, but in order to ensure that parallel processing actually happens, the tasks need to be effectively asynchronous, which for simple functions need to fork threads.

val check2 = isPositiveEven[Validator.Op].interpret[ValidateInt]
// check2: ValidateInt[Boolean] = Kleisli(cats.data.Kleisli$$Lambda$10184/595186738@2e651dcc)

val items = 1 :: 2 :: -1 :: Nil
// items: List[Int] = List(1, 2, -1)

// The list of all tasks needed for execution
val tasks = items.map(check2.run(_))
// tasks: List[monix.eval.Task[Boolean]] = List(Task.FlatMap$1815848211, Task.FlatMap$976389827, Task.FlatMap$1849949731)

// Processing in parallel
val aggregate = Task.gather(tasks).map(_.toList)
// aggregate: monix.eval.Task[List[Boolean]] = Task.Map$1064819729

// Evaluation:
aggregate.foreach(println)
// isPositive(-1)
// isEven(-1)
// isPositive(2)
// isEven(2)
// isPositive(1)
// isEven(1)
// List(false, true, false)
// res6: monix.execution.CancelableFuture[Unit] = Async(Future(Success(())),monix.execution.cancelables.StackedCancelable$Impl@2a638dc1)

If ordering of results does not matter, you can also use Task.gatherUnordered instead of gather, which might yield better results, given its non-blocking execution.

Async

There is also the frees-async-cats-effect module mentioned in the async callback section, which allows you to work with callback-based APIs in freestyle and translate your FreeS program in the end to a type like Monix’ Task.