FS2

Interleaving FS2 streams in Freestyle programs can be achieved through the algebra and interpreter provided by the freestyle-fs2 package. freestyle-fs2 allows you to run streams when interpreting free programs, using the target runtime monad as their effect type.

Familiarity with fs2 is assumed; take a look at the fs2 documentation if you haven’t used it before.

We’ll start by creating a simple algebra for printing messages on the screen:

import freestyle._
// import freestyle._

@free trait Interact {
  def tell(msg: String): FS[Unit]
}
// defined trait Interact
// defined object Interact

Then, make sure to include the streams algebra StreamM in your application:

import freestyle._
// import freestyle._

import freestyle.implicits._
// import freestyle.implicits._

import freestyle.fs2._
// import freestyle.fs2._

import freestyle.fs2.implicits._
// import freestyle.fs2.implicits._

@module trait App {
  val interact: Interact
  val streams: StreamM
}
// defined trait App
// defined object App

Now that we’ve got our Interact algebra and StreamM in our app, we’re ready to write the first program:

import _root_.fs2.Stream
// import _root_.fs2.Stream

def program[F[_]](
  implicit app: App[F]
): FreeS[F, Vector[Int]] = for {
  _ <- app.interact.tell("Hello")
  x <- app.streams.runLog(Stream.emits(List(1, 2, 3)))
  _ <- app.interact.tell(s"Result: ${x}")
} yield x
// program: [F[_]](implicit app: App[F])freestyle.FreeS[F,Vector[Int]]

To run it, we need to create an implicit interpreter for our Interact algebra:

import cats._
// import cats._

implicit def interactInterp[F[_]](
  implicit ME: MonadError[F, Throwable]
): Interact.Handler[F] = new Interact.Handler[F] {
  def tell(msg: String): F[Unit] = {
    println(msg)
    ME.pure(())
  }
}
// interactInterp: [F[_]](implicit ME: cats.MonadError[F,Throwable])Interact.Handler[F]

Now we can interpret the program into a Future. Note how the stream’s value is printed to the console:

import cats.instances.future._
// import cats.instances.future._

import scala.concurrent._
// import scala.concurrent._

import scala.concurrent.duration._
// import scala.concurrent.duration._

import scala.concurrent.ExecutionContext.Implicits.global
// import scala.concurrent.ExecutionContext.Implicits.global

Await.result(program[App.Op].interpret[Future], Duration.Inf)
// Hello
// Result: Vector(1, 2, 3)
// res0: Vector[Int] = Vector(1, 2, 3)

Stream operations

A handful of operations for running streams are exposed in the StreamM algebra.

runLog

We’ve already seen StreamM#runLog, which runs a stream and accumulates its results in a Vector. In the following example, we pass it a Stream that emits the number 42 and ends:

def program[F[_]](
  implicit app: App[F]
): FreeS[F, Vector[Int]] = app.streams.runLog(Stream.emit(42))
// program: [F[_]](implicit app: App[F])freestyle.FreeS[F,Vector[Int]]

Await.result(program[App.Op].interpret[Future], Duration.Inf)
// res1: Vector[Int] = Vector(42)

runFold

We can run a fold over a stream too. Let’s create a stream with a series of numbers. We use Eff, a type provided by freestyle-fs2, as the effect type for the stream; this allows the final stream effect type to be decided when running the program:

val aStream: Stream[Eff, Int] = Stream.emits(0 until 10)
// aStream: fs2.Stream[freestyle.fs2.Eff,Int] = Segment(Emit(Chunk(0, 1, 2, 3, 4, 5, 6, 7, 8, 9)))

Now we can fold over the above stream by adding all of its numbers:

def program[F[_]](
  implicit app: App[F]
): FreeS[F, Int] = app.streams.runFold(0, (x: Int, y: Int) => x + y)(aStream)
// program: [F[_]](implicit app: App[F])freestyle.FreeS[F,Int]

Await.result(program[App.Op].interpret[Future], Duration.Inf)
// res2: Int = 45

runLast

Another option is to run a stream discarding all the results but the last one. Since the stream can be empty, the result of runLast is an Option:

def program[F[_]](
  implicit app: App[F]
): FreeS[F, Option[Int]] = app.streams.runLast(aStream)
// program: [F[_]](implicit app: App[F])freestyle.FreeS[F,Option[Int]]

Await.result(program[App.Op].interpret[Future], Duration.Inf)
// res3: Option[Int] = Some(9)
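
As a rough mental model of the three operations above, the following sketch mirrors their results on ordinary Scala collections. It is only an analogy under stated assumptions: the names runLogModel, runFoldModel and runLastModel are hypothetical, no effects or resources are involved, and this is not how freestyle-fs2 or fs2 implement them.

```scala
// Plain-collection stand-ins for the StreamM operations.
// Hypothetical names; NOT the freestyle-fs2 implementation.

// Like runLog: collect every emitted element into a Vector.
def runLogModel[A](xs: Seq[A]): Vector[A] = xs.toVector

// Like runFold: combine elements left to right, starting from `z`.
def runFoldModel[A, B](z: B, f: (B, A) => B)(xs: Seq[A]): B = xs.foldLeft(z)(f)

// Like runLast: keep only the final element; None when nothing was emitted.
def runLastModel[A](xs: Seq[A]): Option[A] = xs.lastOption
```

With the numbers 0 until 10, runFoldModel(0, (x: Int, y: Int) => x + y) gives 45 and runLastModel gives Some(9), matching the results shown above. On an empty sequence, runLastModel returns None, which is why runLast has to return an Option.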

Streaming IO

The fs2 library comes with support for streaming IO through its fs2-io package, and it’s straightforward to integrate into Freestyle programs.

Here is an example, borrowed from the fs2 README, of a Fahrenheit-to-Celsius converter. For an IO stream to be able to run in a free program, we need to use Eff as the effect type for the stream:

import _root_.fs2.{io, text}
// import _root_.fs2.{io, text}

import java.nio.file.Paths
// import java.nio.file.Paths

def fahrenheitToCelsius(f: Double): Double =
  (f - 32.0) * (5.0/9.0)
// fahrenheitToCelsius: (f: Double)Double

val converter: Stream[Eff, Unit] = {
  io.file.readAll[Eff](Paths.get("testdata/fahrenheit.txt"), 4096)
    .through(text.utf8Decode)
    .through(text.lines)
    .filter(s => !s.trim.isEmpty && !s.startsWith("//"))
    .map(line => fahrenheitToCelsius(line.toDouble).toString)
    .intersperse("\n")
    .through(text.utf8Encode)
    .through(io.file.writeAll(Paths.get("testdata/celsius.txt")))
}
// converter: fs2.Stream[freestyle.fs2.Eff,Unit] = evalScope(<scope>).flatMap(<function1>)
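
To try the conversion logic without touching the file system, the filter, map and intersperse steps of the pipeline above can be sketched on an in-memory list of lines. This is a minimal sketch using only the standard library; convertLines is a hypothetical helper that is not part of fs2 or freestyle-fs2, and mkString stands in for intersperse followed by concatenation.

```scala
// Same formula as in the converter above.
def fahrenheitToCelsius(f: Double): Double =
  (f - 32.0) * (5.0 / 9.0)

// Hypothetical helper: applies the converter's line-level steps
// to lines that are already in memory (no streaming, no file IO).
def convertLines(lines: List[String]): String =
  lines
    .filter(s => !s.trim.isEmpty && !s.startsWith("//")) // skip blanks and comments
    .map(line => fahrenheitToCelsius(line.toDouble).toString)
    .mkString("\n") // newline between results, none at the end
```

For instance, convertLines(List("// temperatures", "", "32.0")) yields "0.0". Note that the comment check runs on the untrimmed line, so a comment indented with leading whitespace passes the filter and then fails in toDouble with a NumberFormatException; the same holds for the stream version.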

We can now interleave this stream inside our free programs, choosing the stream’s effect type when running the whole program:

def program[F[_]](
  implicit app: App[F]
): FreeS[F, Unit] = for {
  _ <- app.interact.tell("Converting from Fahrenheit to Celsius")
  _ <- app.streams.run(converter)
} yield ()
// program: [F[_]](implicit app: App[F])freestyle.FreeS[F,Unit]

Await.result(program[App.Op].interpret[Future], Duration.Inf)
// Converting from Fahrenheit to Celsius