import cats.effect.{Effect, IO}
import fs2._
import scala.concurrent.ExecutionContext

/**
  * Emits every element of `first`, then every element of `second`, while
  * already pulling `second` in the background.
  *
  * Both streams are stepped asynchronously and raced against each other:
  *  - segments produced by `first` are emitted immediately;
  *  - empty segments produced by `second` are dropped and `second` keeps
  *    being pulled;
  *  - as soon as `second` produces a non-empty segment it is no longer
  *    pulled: the remainder of `first` is emitted, then the held segment,
  *    then the remainder of `second`.
  *
  * @param first  stream whose output comes first
  * @param second stream whose output follows, started eagerly
  * @return a stream with the elements of `first` followed by those of `second`
  */
def runConcat[F[_], A](first: Stream[F, A], second: Stream[F, A])(
    implicit F: Effect[F],
    ec: ExecutionContext): Stream[F, A] = {

  // Result of one uncons step: the next segment plus the rest, or None at the end.
  type Uncons = Option[(Segment[A, Unit], Stream[F, A])]
  type Step = AsyncPull[F, Uncons]

  // Waits for an in-flight step, emits its segment and then echoes the rest.
  def drain(pending: Step): Pull[F, A, Unit] =
    pending.pull.flatMap {
      case Some((segment, rest)) => Pull.output(segment) >> rest.pull.echo
      case None                  => Pull.done
    }

  // Races the in-flight steps of both streams and reacts to whichever wins.
  // The step that loses is pulled again later, which relies on an AsyncPull
  // still being usable after it took part in a race.
  def raceSteps(firstStep: Step, secondStep: Step): Pull[F, A, Unit] =
    firstStep.race(secondStep).pull.flatMap {
      case Left(None) =>
        // `first` is finished: only `second` is left to emit.
        drain(secondStep)

      case Left(Some((segment, firstRest))) =>
        // `first` produced output: emit it and keep racing.
        Pull.output(segment) >>
          firstRest.pull.unconsAsync.flatMap(nextFirst => raceSteps(nextFirst, secondStep))

      case Right(None) =>
        // `second` finished without holding anything back.
        drain(firstStep)

      case Right(Some((segment, secondRest))) =>
        segment.uncons1 match {
          case Right((head, tail)) =>
            // `second` has real output: park it until `first` is done.
            drain(firstStep) >> Pull.output(tail.cons(head)) >> secondRest.pull.echo

          case Left(_) =>
            // Empty segment: nothing to hold back, keep pulling both.
            secondRest.pull.unconsAsync.flatMap(nextSecond => raceSteps(firstStep, nextSecond))
        }
    }

  val program: Pull[F, A, Unit] =
    first.pull.unconsAsync.flatMap { firstStep =>
      second.pull.unconsAsync.flatMap { secondStep =>
        raceSteps(firstStep, secondStep)
      }
    }

  program.stream
}

// Demo: `a` emits slowly, `b` prints progress messages before and between its elements.
import ExecutionContext.Implicits._
import scala.concurrent.duration._

Scheduler
  .apply[IO](3)
  .flatMap { sch =>
    // Three progress messages, one per second, producing no elements.
    val working =
      sch
        .awakeEvery[IO](1.seconds)
        .take(3)
        .flatMap(_ => Stream.eval_(IO(println("--- WORKING..."))))

    // "d", "e", "f" re-emitted as single-element segments.
    val defOneByOne =
      Stream("d", "e", "f").segmentLimit(1).flatMap(Stream.segment).covary[IO]

    // Initial 3 second pause, then each letter preceded by its own 3 second pause.
    val a =
      sch.sleep_[IO](3.seconds) ++ Stream("a", "b", "c")
        .segmentLimit(1)
        .flatMap(Stream.segment)
        .covary[IO]
        .flatMap(s => sch.sleep_[IO](3.seconds) ++ Stream(s))

    val b = working ++ defOneByOne ++ working ++ Stream("d", "e", "f")

    runConcat(a, b)
  }
  .evalMap(e => IO(println(e)))
  .run
  .unsafeRunSync()