Skip to content

Instantly share code, notes, and snippets.

@creyer
Created December 10, 2014 17:04
Show Gist options
  • Save creyer/74f77af01dce07daac34 to your computer and use it in GitHub Desktop.
Save creyer/74f77af01dce07daac34 to your computer and use it in GitHub Desktop.
custom cache with Monifu
import monifu.concurrent.atomic.AtomicInt
import monifu.reactive.Observable
import monifu.reactive.channels.PublishChannel
import monifu.reactive.subjects.PublishSubject
import monifu.reactive._
import scala.concurrent.{Future}
import concurrent.duration._
import monifu.concurrent.Implicits._
import monifu.reactive.Ack.Continue
/**
 * Demo cache: holds the most recent `DBEmulator.read()` result and republishes
 * it, paired with the requesting message, on a shared `PublishSubject`.
 */
object Cache {
  /** Output channel; every non-"refresh" message is emitted here as `(message, cachedValue)`. */
  val cache = PublishSubject[(String, IndexedSeq[Int])]()

  /**
   * Builds the command observer and subscribes it to `refresh`.
   *
   * @param refresh stream of commands; the literal "refresh" triggers a reload
   * @return the shared output subject together with the command observer, so
   *         callers can subscribe further command sources to the same observer
   */
  def apply(refresh: Observable[String]) = {
    val obs = getObs(cache)
    refresh.subscribe(obs)
    (cache, obs)
  }

  /**
   * Creates the observer that interprets incoming commands.
   *
   * "refresh" replaces the cached future with a fresh `DBEmulator.read()`;
   * any other string is published to `cache` together with the cached value
   * once that value is available.
   */
  private def getObs(cache: PublishSubject[(String, IndexedSeq[Int])]) = new Observer[String] {
    // The observer is handed out by `apply` so that several sources can feed it;
    // @volatile makes a refreshed future visible to whichever thread handles the
    // next message.
    @volatile var cacheP = DBEmulator.read()

    override def onNext(elem: String): Future[Ack] = {
      elem match {
        case "refresh" =>
          cacheP = DBEmulator.read()
          // Acknowledge only after the reload has finished.
          cacheP.flatMap(_ => Continue)
        case msg: String =>
          // Return the subject's own acknowledgement instead of an unconditional
          // Continue, so downstream back-pressure/cancellation reaches the source.
          cacheP.flatMap(v => cache.onNext((msg, v)))
      }
    }

    override def onError(ex: Throwable): Unit = cache.onError(ex)

    override def onComplete(): Unit = cache.onComplete()
  }
}
/**
 * Stand-in for a database: each read returns one more id than the previous
 * read, until every known id is being returned.
 */
object DBEmulator {
  /** Read counter, seeded at 2. It is incremented before use, so the first read yields 3 ids. */
  val aa = AtomicInt(2)

  /** Simulated ids of real things, keyed by 1-based position. */
  val ids = Map(1 -> 1, 2 -> 3, 3 -> 7, 4 -> 11, 5 -> 23, 6 -> 71, 7 -> 120)

  /**
   * Asynchronously returns the first `n` ids, where `n` grows by one on every
   * call and is capped at `ids.size`.
   *
   * @return a future holding the ids for positions `1 to n`
   */
  def read() = Future {
    // Clamp the single value returned by incrementAndGet. Re-reading the counter
    // with a second `get` could observe a concurrent increment and exceed
    // ids.size, which would make the lookup below fail.
    val total = math.min(aa.incrementAndGet(), ids.size)
    println(s"total elements: $total")
    // Positions 1..total are all present in `ids` because total <= ids.size.
    (1 to total).map(ids)
  }
}
/**
 * Demo entry point: feeds two periodic "extract" sources and one periodic
 * "refresh" source into the cache and prints what each consumer receives.
 */
object Main extends App {
  // drop(1) skips the first tick of the refresh interval.
  // NOTE(review): presumably because the cache observer already performs an
  // initial read when it is created - confirm.
  val refreshCache = Observable.interval(15.second).drop(1).map(_ => "refresh")
  val extractor = Observable.interval(10.second).map(_ => "extract")
  val (cache, obs) = Cache(refreshCache)

  // Attach the consumers BEFORE connecting the extractors: `cache` is a
  // PublishSubject, so anything published while nobody is subscribed is lost.

  // consumer for the first extractor's events
  cache
    .filter(e => e._1 == "extract")
    .doWork(a => println(s"${System.currentTimeMillis()/1000} I prove works: $a"))
    .subscribe()

  // consumer for the second extractor's events
  cache
    .filter(e => e._1 == "extract2")
    .doWork(a => println(s"${System.currentTimeMillis()/1000} II prove works: $a"))
    .subscribe()

  // connect the first extractor to the cache observer
  extractor.subscribe(obs)

  // create a second extractor and connect it as well
  Observable.interval(1.second).map(_ => "extract2").subscribe(obs)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment