Created
December 10, 2014 17:04
-
-
Save creyer/74f77af01dce07daac34 to your computer and use it in GitHub Desktop.
custom cache with Monifu
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import monifu.concurrent.atomic.AtomicInt | |
| import monifu.reactive.Observable | |
| import monifu.reactive.channels.PublishChannel | |
| import monifu.reactive.subjects.PublishSubject | |
| import monifu.reactive._ | |
| import scala.concurrent.{Future} | |
| import concurrent.duration._ | |
| import monifu.concurrent.Implicits._ | |
| import monifu.reactive.Ack.Continue | |
/** A small cache front-end built on a Monifu `PublishSubject`.
  *
  * Messages pushed into the observer returned by [[apply]] are paired with the
  * most recently loaded snapshot from `DBEmulator.read()` and published on
  * [[cache]]. The special message `"refresh"` triggers a reload of the
  * snapshot instead of being published.
  */
object Cache {

  /** Subject on which every `(message, snapshot)` pair is published. */
  val cache: PublishSubject[(String, IndexedSeq[Int])] =
    PublishSubject[(String, IndexedSeq[Int])]()

  /** Wires the `refresh` stream into the cache observer.
    *
    * @param refresh stream whose elements are fed to the cache observer
    *                (a `"refresh"` element reloads the snapshot)
    * @return the output subject together with the observer that further
    *         sources can be subscribed to
    */
  def apply(refresh: Observable[String]): (PublishSubject[(String, IndexedSeq[Int])], Observer[String]) = {
    val obs = getObs(cache)
    refresh.subscribe(obs)
    (cache, obs)
  }

  /** Builds the observer that holds the current snapshot and forwards
    * messages, errors and completion to `cache`.
    */
  private def getObs(cache: PublishSubject[(String, IndexedSeq[Int])]): Observer[String] =
    new Observer[String] {
      // The same observer instance is subscribed to several sources by the
      // callers in this file, so onNext may run on different threads;
      // @volatile makes a reloaded snapshot visible to all of them.
      @volatile private[this] var cacheP: Future[IndexedSeq[Int]] = DBEmulator.read()

      override def onNext(elem: String): Future[Ack] =
        elem match {
          case "refresh" =>
            val reloaded = DBEmulator.read()
            cacheP = reloaded
            // Acknowledge only once the reload has finished. A failed reload
            // fails this acknowledgement and remains the current snapshot.
            reloaded.flatMap(_ => Continue)
          case msg: String =>
            // Return the subject's own acknowledgement instead of an
            // unconditional Continue, so back-pressure and cancellation
            // from the subject propagate upstream.
            cacheP.flatMap(v => cache.onNext((msg, v)))
        }

      override def onError(ex: Throwable): Unit = cache.onError(ex)

      override def onComplete(): Unit = cache.onComplete()
    }
}
/** Stand-in for a database whose result set grows by one element per read,
  * until every entry of [[ids]] is returned.
  */
object DBEmulator {

  /** Read counter. It starts at 2 and is incremented before use, so the
    * first call to [[read]] yields 3 elements.
    */
  val aa: AtomicInt = AtomicInt(2)

  /** Simulated ids of real things, keyed by their 1-based position. */
  val ids: Map[Int, Int] =
    Map(1 -> 1, 2 -> 3, 3 -> 7, 4 -> 11, 5 -> 23, 6 -> 71, 7 -> 120)

  /** Asynchronously returns the first `n` values of [[ids]], where `n` grows
    * by one on every call and is capped at `ids.size`.
    *
    * @return a future holding the values for keys `1 to n`, in key order
    */
  def read(): Future[IndexedSeq[Int]] = Future {
    // Clamp the value returned by incrementAndGet itself; re-reading the
    // counter afterwards could observe another caller's increment.
    val total = math.min(aa.incrementAndGet(), ids.size)
    println(s"total elements: $total")
    (1 to total).map(ids)
  }
}
/** Demo wiring: one refresh stream and two extractor streams feed the cache
  * observer, and one listener per extractor prints what the cache publishes.
  */
object Main extends App {

  /** Subscribes to the cache output, keeping only events whose message
    * equals `event` and handing each of them to `report`.
    */
  private def listen(event: String)(report: ((String, IndexedSeq[Int])) => Unit): Unit = {
    cache
      .filter { case (name, _) => name == event }
      .doWork(report)
      .subscribe()
  }

  // Emits "refresh" on a 15-second interval, with the first tick dropped.
  val refreshCache = Observable.interval(15.second).drop(1).map(_ => "refresh")

  // Emits "extract" on a 10-second interval.
  val extractor = Observable.interval(10.second).map(_ => "extract")

  val (cache, obs) = Cache(refreshCache)

  // Connect the first extractor to the cache observer.
  extractor.subscribe(obs)

  // Print only the events produced for the first extractor.
  listen("extract") { a =>
    println(s"${System.currentTimeMillis()/1000} I prove works: $a")
  }

  // Create a second extractor, ticking every second, and connect it as well.
  Observable.interval(1.second).map(_ => "extract2").subscribe(obs)

  // Print only the events produced for the second extractor.
  listen("extract2") { a =>
    println(s"${System.currentTimeMillis()/1000} II prove works: $a")
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment