Skip to content

Instantly share code, notes, and snippets.

@ane
Created January 27, 2019 16:07
Show Gist options
  • Select an option

  • Save ane/3118d94e610b58a4e47908fb5f66defd to your computer and use it in GitHub Desktop.

Select an option

Save ane/3118d94e610b58a4e47908fb5f66defd to your computer and use it in GitHub Desktop.

Revisions

  1. ane created this gist Jan 27, 2019.
    52 changes: 52 additions & 0 deletions StreamInterruption.scala
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,52 @@
    import scala.util.Random
    import scala.concurrent.duration._

    import cats.effect.{ExitCode, IO, IOApp}
    import cats.implicits._
    import fs2.Stream
    import scala.concurrent.ExecutionContext.Implicits.global
    import java.time.{Clock, ZoneOffset}

    import fs2.concurrent.SignallingRef

    object InterruptipleQueue extends IOApp {
    override def run(args: List[String]): IO[ExitCode] = {
    val clock = Clock.systemUTC()

    def interrupter(signallingRef: SignallingRef[IO, Boolean]): Stream[IO, Unit] = {
    for {
    _ <- Stream.fixedDelay(1.seconds) // check every 1 second
    _ <- Stream.eval {
    val t = clock.instant().atOffset(ZoneOffset.UTC)
    val on = t.getSecond <= 30 // the on/off logic (e.g. a light switch)
    signallingRef.access flatMap {
    case (current, set) =>
    // need update?
    if (on != current) {
    if (on) println(s"Seconds of $t <= 30: turning OFF.")
    else println(s"Seconds of $t > 30: turning ON.")
    set(on)
    } else IO.unit
    }
    }
    } yield ()
    }

    def printer(signallingRef: SignallingRef[IO, Boolean]): Stream[IO, Int] = {
    Stream
    .iterateEval(0)(i => IO(i + 1))
    .metered(100 millis)
    .pauseWhen(signallingRef)
    }

    (for {
    signal <- Stream.eval(SignallingRef[IO, Boolean](false))
    ints <- printer(signal).concurrently(interrupter(signal))
    } yield ints)
    .evalMap(i => IO(println(s"Got $i")))
    .compile
    .drain
    .as(ExitCode.Success)
    }

    }