Skip to content

Instantly share code, notes, and snippets.

@SystemFw
Created July 9, 2018 10:32
Show Gist options
  • Select an option

  • Save SystemFw/168ff694eecf45a8d0b93ce7ef060cfd to your computer and use it in GitHub Desktop.

Select an option

Save SystemFw/168ff694eecf45a8d0b93ce7ef060cfd to your computer and use it in GitHub Desktop.

Revisions

  1. SystemFw created this gist Jul 9, 2018.
    37 changes: 37 additions & 0 deletions groupBy.scala
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,37 @@
    // Grows with the number of distinct `K`
    def partitions[F[_], A, K](selector: A => F[K])(implicit F: Effect[F], ec: ExecutionContext) : Pipe[F, A, (K, Stream[F, A])] = in =>
    Stream.eval(async.refOf[F, Map[K, Queue[F, Option[A]]]](Map.empty)).flatMap { st =>
    val cleanup = {
    import alleycats.std.all._
    st.get.flatMap(_.traverse_(_.enqueue1(None)))
    }

    (in ++ Stream.eval_(cleanup)).evalMap { el =>
    (selector(el), st.get).mapN { (key, queues) =>
    queues.get(key).fold(
    for {
    newQ <- Queue.unbounded[F, Option[A]]
    _ <- st.modify(_ + (key -> newQ))
    _ <- newQ.enqueue1(el.some)
    } yield (key -> newQ.dequeue.unNoneTerminate).some
    )(_.enqueue1(el.some) as None)
    }.flatten
    }.unNone.onFinalize(cleanup)
    }

    import ExecutionContext.Implicits.global

    def selector(i: Int): IO[Int] =
    IO.pure(i % 3)

    def flakiness[A]: Pipe[IO, A, A] = in => {
    def wait = IO(scala.util.Random.nextInt(500)).flatMap(d => Timer[IO].sleep(d.millis))

    Stream.repeatEval(wait).zip(in).map(_._2)
    }

    def example =
    Stream.range(1, 100).covary[IO].through(partitions(selector)).map {
    case (k, st) => st.tupleLeft(k).through(flakiness).through(Sink.showLinesStdOut)
    }.joinUnbounded
    .compile.drain.unsafeRunSync