Skip to content

Instantly share code, notes, and snippets.

@avakhrenev
Last active January 22, 2019 14:35
Show Gist options
  • Save avakhrenev/3325d924f3b47a6f887d95022cce06a2 to your computer and use it in GitHub Desktop.
Save avakhrenev/3325d924f3b47a6f887d95022cce06a2 to your computer and use it in GitHub Desktop.

Revisions

  1. avakhrenev revised this gist Sep 15, 2017. 1 changed file with 1 addition and 4 deletions.
    5 changes: 1 addition & 4 deletions runConcat.scala
    Original file line number Diff line number Diff line change
    @@ -17,10 +17,7 @@ def runConcat[F[_], A](first: Stream[F, A], second: Stream[F, A])(
    case Left(Some((hd, first))) => //reading first stream
    Pull.output(hd) >> first.pull.unconsAsync.flatMap(go(_, second))
    case Left(None) => //first stream has ended, starting to read second
    second.pull.flatMap {
    case None => Pull.done
    case Some((hd, tl)) => Pull.output(hd) >> tl.pull.echo
    }
    readFull(second)
    case Right(Some((hd, second))) => //Second stream has emitted something
    hd.uncons1 match {
    case Left(_) => //that was empty segment, ignore and continue to pull from both streams
  2. avakhrenev created this gist Sep 15, 2017.
    61 changes: 61 additions & 0 deletions runConcat.scala
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,61 @@
    import cats.effect.{Effect, IO}
    import fs2._

    import scala.concurrent.ExecutionContext

    def runConcat[F[_], A](first: Stream[F, A], second: Stream[F, A])(
    implicit F: Effect[F],
    ec: ExecutionContext): Stream[F, A] = {
    type Step = AsyncPull[F, Option[(Segment[A, Unit], Stream[F, A])]]
    def readFull(s: Step): Pull[F, A, Unit] =
    s.pull.flatMap {
    case None => Pull.done
    case Some((hd, tl)) => Pull.output(hd) >> tl.pull.echo
    }
    def go(first: Step, second: Step): Pull[F, A, Unit] =
    first.race(second).pull.flatMap {
    case Left(Some((hd, first))) => //reading first stream
    Pull.output(hd) >> first.pull.unconsAsync.flatMap(go(_, second))
    case Left(None) => //first stream has ended, starting to read second
    second.pull.flatMap {
    case None => Pull.done
    case Some((hd, tl)) => Pull.output(hd) >> tl.pull.echo
    }
    case Right(Some((hd, second))) => //Second stream has emitted something
    hd.uncons1 match {
    case Left(_) => //that was empty segment, ignore and continue to pull from both streams
    second.pull.unconsAsync.flatMap(go(first, _))
    case Right((hd, tl)) => //Ok, stop reading second stream until first is over
    readFull(first) >> Pull.output(tl.cons(hd)) >> second.pull.echo
    }
    case Right(None) => readFull(first)
    }
    first.pull.unconsAsync
    .flatMap(first => second.pull.unconsAsync.flatMap(second => go(first, second)))
    .stream
    }

    //let's test
    import ExecutionContext.Implicits._
    import scala.concurrent.duration._
    Scheduler
    .apply[IO](3)
    .flatMap { sch =>
    val a = sch.sleep_[IO](3.seconds) ++ Stream("a", "b", "c")
    .segmentLimit(1)
    .flatMap(Stream.segment)
    .covary[IO]
    .flatMap(s => sch.sleep_[IO](3.seconds) ++ Stream(s))
    val b = sch
    .awakeEvery[IO](1.seconds)
    .take(3)
    .flatMap(_ => Stream.eval_(IO(println("--- WORKING...")))) ++
    Stream("d", "e", "f").segmentLimit(1).flatMap(Stream.segment).covary[IO] ++ sch
    .awakeEvery[IO](1.seconds)
    .take(3)
    .flatMap(_ => Stream.eval_(IO(println("--- WORKING...")))) ++ Stream("d", "e", "f")
    runConcat(a, b)
    }
    .evalMap(e => IO(println(e)))
    .run
    .unsafeRunSync()