import scala.util.Random
import scala.concurrent.duration._
import cats.effect.{ExitCode, IO, IOApp}
import cats.implicits._
import fs2.Stream
import scala.concurrent.ExecutionContext.Implicits.global
import java.time.{Clock, ZoneOffset}
import fs2.concurrent.SignallingRef

/** Demo of pausing and resuming an fs2 stream through a shared `SignallingRef`.
  *
  * A "printer" stream emits increasing integers, throttled to one element per
  * 100 ms. A background "interrupter" stream inspects the UTC wall clock once per
  * second and flips the shared signal so that the printer is paused while the
  * second-of-minute is `<= 30` and running otherwise.
  */
object InterruptipleQueue extends IOApp {

  /** Runs the printer with the interrupter in the background and prints every
    * emitted integer as `Got <i>`.
    *
    * @param args command-line arguments (not used)
    * @return `ExitCode.Success` once the stream is drained; the printer stream is
    *         infinite, so this does not complete on its own
    */
  override def run(args: List[String]): IO[ExitCode] = {
    val clock = Clock.systemUTC()

    /** Periodically re-evaluates the pause condition and updates the signal.
      *
      * @param signallingRef the pause flag consumed by `pauseWhen`; `true` means
      *                      the printer is paused
      * @return a stream that performs one check per tick and emits `()` each time
      */
    def interrupter(signallingRef: SignallingRef[IO, Boolean]): Stream[IO, Unit] = {
      // One check: read the clock inside IO so every tick observes a fresh time
      // and the side effect stays suspended until the stream actually runs.
      val syncSignal: IO[Unit] =
        IO(clock.instant().atOffset(ZoneOffset.UTC)).flatMap { now =>
          // The value handed to `pauseWhen`: true pauses the printer (i.e. "OFF").
          val paused = now.getSecond <= 30

          signallingRef.access.flatMap { case (current, set) =>
            if (paused == current) IO.unit // already in the desired state
            else {
              val message =
                if (paused) s"Seconds of $now <= 30: turning OFF."
                else s"Seconds of $now > 30: turning ON."
              // `set` yields whether the conditional update was applied; the result
              // is discarded, matching the original which never inspected it.
              IO(println(message)) *> set(paused).void
            }
          }
        }

      Stream
        .fixedDelay[IO](1.seconds) // check every 1 second
        .evalMap(_ => syncSignal)
    }

    /** Emits 0, 1, 2, ... at a rate of one element per 100 ms, pausing whenever
      * the signal holds `true`.
      *
      * @param signallingRef the pause flag; `true` pauses emission
      * @return the throttled, pausable integer stream
      */
    def printer(signallingRef: SignallingRef[IO, Boolean]): Stream[IO, Int] =
      Stream
        .iterateEval(0)(i => IO(i + 1))
        .metered(100.millis) // method-call form: no postfixOps language import needed
        .pauseWhen(signallingRef)

    Stream
      .eval(SignallingRef[IO, Boolean](false)) // start un-paused
      .flatMap { signal =>
        // The interrupter runs in the background for as long as the printer runs.
        printer(signal).concurrently(interrupter(signal))
      }
      .evalMap(i => IO(println(s"Got $i")))
      .compile
      .drain
      .as(ExitCode.Success)
  }
}