Async Callbacks

The freestyle-async module makes it possible to use asynchronous callback-based APIs with freestyle.

Example

Let’s imagine an API that allows us to find books written by a certain author. We need to supply a callback function handling both a successful case where the books have been found, and a case where something went wrong.

case class Author(name: String) extends AnyVal
// defined class Author

case class Book(title: String, year: Int)
// defined class Book

trait LibraryAPI {
  def findBooks(author: Author)(withBooks: List[Book] => Unit, error: Throwable => Unit): Unit
}
// defined trait LibraryAPI

We can use this type of callback-based (asynchronous) API in freestyle by using the freestyle-async module.

To use the freestyle-async module, you need to include one of the following dependencies:

libraryDependencies += "io.frees" %% "freestyle-async" % "0.3.1"

// and if you want to use Monix' Task:
libraryDependencies += "io.frees" %% "freestyle-async-monix" % "0.3.1"

// and if you want to use FS2's Task:
libraryDependencies += "io.frees" %% "freestyle-async-fs2" % "0.3.1"

The standard freestyle imports:

import freestyle._
import freestyle.implicits._

The imports for the freestyle-async module:

import freestyle.async._
import freestyle.async.implicits._

Now if we want to create a freestyle program which uses the findBooks function and returns all the books sorted by the year they were written, we can use the AsyncM effect:

def sortedBooks[F[_]: AsyncM](lib: LibraryAPI)(author: Author): FreeS[F, List[Book]] =
  AsyncM[F].async[List[Book]] { cb =>
    lib.findBooks(author)({ books => cb(Right(books)) }, { error => cb(Left(error)) })
  }
// sortedBooks: [F[_]](lib: LibraryAPI)(author: Author)(implicit evidence$1: freestyle.async.AsyncM[F])freestyle.FreeS[F,List[Book]]

A simple demo implementation for the LibararyAPI:

case class NoBooksFound(author: Author) extends Exception(s"No books found from ${author.name}")
// defined class NoBooksFound

object Library extends LibraryAPI {
  def findBooks(author: Author)(withBooks: List[Book] => Unit, error: Throwable => Unit): Unit =
    if (author.name.toLowerCase == "dickens") error(NoBooksFound(author))
    else withBooks(Book("The Old Man and the Sea", 1951) :: Book("1984", 1948) :: Book("On the Road", 1957) :: Nil)
}
// defined object Library

Now we can create simple programs requesting the books for certain authors:

val getSorted: Author => FreeS[AsyncM.Op, List[Book]] =
  sortedBooks[AsyncM.Op](Library) _
// getSorted: Author => freestyle.FreeS[freestyle.async.AsyncM.Op,List[Book]] = $$Lambda$4515/866772812@36a7c6f9

val dickensBooks = getSorted(Author("Dickens"))
// dickensBooks: freestyle.FreeS[freestyle.async.AsyncM.Op,List[Book]] = Free(...)

val otherBooks   = getSorted(Author("HemingwayOrwellKerouac"))
// otherBooks: freestyle.FreeS[freestyle.async.AsyncM.Op,List[Book]] = Free(...)

We can run these programs using:

  • FS2’s Task as target (we need to have an fs2.Strategy in implicit scope):
import freestyle.asyncFs2.implicits._
// import freestyle.asyncFs2.implicits._

import scala.concurrent.{Await, ExecutionContext}
// import scala.concurrent.{Await, ExecutionContext}

import scala.concurrent.duration.Duration
// import scala.concurrent.duration.Duration

import _root_.fs2.{Strategy, Task => Fs2Task}
// import _root_.fs2.{Strategy, Task=>Fs2Task}

import _root_.fs2.interop.cats._
// import _root_.fs2.interop.cats._

implicit val strategy = Strategy.fromExecutionContext(ExecutionContext.Implicits.global)
// strategy: fs2.Strategy = Strategy

val fs2Task1 = dickensBooks.interpret[Fs2Task]
// fs2Task1: fs2.Task[List[Book]] = Task

val fs2Task2 = otherBooks.interpret[Fs2Task]
// fs2Task2: fs2.Task[List[Book]] = Task
Await.result(fs2Task1.unsafeRunAsyncFuture, Duration.Inf)
// NoBooksFound: No books found from Dickens
//   at Library$.findBooks(<console>:33)
//   at .$anonfun$sortedBooks$1(<console>:31)
//   at .$anonfun$sortedBooks$1$adapted(<console>:30)
//   at fs2.Task$.$anonfun$async$2(Task.scala:247)
//   at fs2.Task$.$anonfun$async$2$adapted(Task.scala:246)
//   at fs2.internal.Future.listen(Future.scala:34)
//   at fs2.internal.Future.runAsync(Future.scala:69)
//   at fs2.Task.unsafeRunAsync(Task.scala:100)
//   at fs2.Task.unsafeRunAsyncFuture(Task.scala:109)
//   ... 307 elided
Await.result(fs2Task2.unsafeRunAsyncFuture, Duration.Inf)
// res1: List[Book] = List(Book(The Old Man and the Sea,1951), Book(1984,1948), Book(On the Road,1957))
  • Monix’ Task as target (we need to have an ExecutionContext in implicit scope):
import freestyle.asyncMonix.implicits._
// import freestyle.asyncMonix.implicits._

import monix.eval.{Task => MonixTask}
// import monix.eval.{Task=>MonixTask}

import monix.cats._
// import monix.cats._

import monix.execution.Scheduler
// import monix.execution.Scheduler

implicit val executionContext = Scheduler.Implicits.global
// executionContext: monix.execution.Scheduler = monix.execution.schedulers.AsyncScheduler@50e293ec

val monixTask1 = dickensBooks.interpret[MonixTask]
// monixTask1: monix.eval.Task[List[Book]] = Task.FlatMap(Task@268871536, monix.eval.Task$$$Lambda$4582/1189693075@2244df77)

val monixTask2 = otherBooks.interpret[MonixTask]
// monixTask2: monix.eval.Task[List[Book]] = Task.FlatMap(Task@974906720, monix.eval.Task$$$Lambda$4582/1189693075@7049ab4)
Await.result(monixTask1.runAsync, Duration.Inf)
// NoBooksFound: No books found from Dickens
//   at Library$.findBooks(<console>:33)
//   at .$anonfun$sortedBooks$1(<console>:31)
//   at .$anonfun$sortedBooks$1$adapted(<console>:30)
//   at freestyle.asyncMonix.implicits$$anon$1$$anon$2.run(async.scala:29)
//   at scala.concurrent.impl.ExecutionContextImpl$AdaptedForkJoinTask.exec(ExecutionContextImpl.scala:140)
//   at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
//   at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
//   at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
//   at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
Await.result(monixTask2.runAsync, Duration.Inf)
// res3: List[Book] = List(Book(The Old Man and the Sea,1951), Book(1984,1948), Book(On the Road,1957))
  • Future as target (we need to have an ExecutionContext in implicit scope):
import scala.concurrent.Future
// import scala.concurrent.Future

// import scala.concurrent.ExecutionContext.Implicits.global

import cats.implicits._
// import cats.implicits._

val fut1 = dickensBooks.interpret[Future]
// fut1: scala.concurrent.Future[List[Book]] = Future(Failure(NoBooksFound: No books found from Dickens))

val fut2 = otherBooks.interpret[Future]
// fut2: scala.concurrent.Future[List[Book]] = Future(<not completed>)
Await.result(fut1, Duration.Inf)
// NoBooksFound: No books found from Dickens
//   at Library$.findBooks(<console>:33)
//   at .$anonfun$sortedBooks$1(<console>:31)
//   at .$anonfun$sortedBooks$1$adapted(<console>:30)
//   at freestyle.async$Implicits$$anon$2$$anon$3.run(async.scala:44)
//   at scala.concurrent.impl.ExecutionContextImpl$AdaptedForkJoinTask.exec(ExecutionContextImpl.scala:140)
//   at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
//   at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
//   at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
//   at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
Await.result(fut2, Duration.Inf)
// res7: List[Book] = List(Book(The Old Man and the Sea,1951), Book(1984,1948), Book(On the Road,1957))