;; Here is a spike of a lightweight in-process pubsub mechanism that allows pure ;; functional consumers, both blocking and asynchronous. ;; This defines the event stream, in this case just a series of numbers, ;; a new one produced each second (defn timer [] (lazy-seq (do (Thread/sleep 1000) (cons (System/nanoTime) (timer))))) ;; We can see some events (this takes a couple seconds to complete): (take 3 (timer)) ;=> (3383024932037 3384025272769 3385025571742) ;; We can have two consumers run concurrently, but they're actually ;; consuming two different streams, each having started their own, ;; so this isn't quite what we want: [@(future (take 3 (timer))) @(future (take 3 (timer)))] ;=> [(3445765729693 3446765925828 3447766208228) ; (3448766569987 3449766774030 3450767042063)] ;; So we want to set up a single producer that can be observed in a stable way: (def most-recent (atom (timer))) (defn advance-most-recent [] (swap! most-recent rest) (recur)) ;; There, now this kicks off a single producer: (future (advance-most-recent)) ;; Now we can have two consumers look at the same stream of events. Note the ;; event numbers are identical for each consumer: [@(future (take 3 @most-recent)) @(future (take 3 @most-recent))] ;=> [(3735764451405 3736764670106 3737764892260) (3735764451405 3736764670106 3737764892260)] ;; If one consumer starts late, it may miss some events, but will still see ;; a consistent and sequential view: [@(future (take 3 @most-recent)) (do (Thread/sleep 1500) @(future (take 3 @most-recent)))] ;=> [(3756768628228 3757768831178 3758769051789) ; (3757768831178 3758769051789 3759769275472)] ;; Consumers could choose to use a watcher instead, for a callback or async-style ;; mechanism. This starts printing a new number every second: (add-watch most-recent :async (fn callback [_ _ _ stream] (prn (first stream))) ;;========================== ;; If the producer wants to asynchronously drop items into the event stream, ;; we have to set it up a bit differently. First, a queue that the producer ;; can put events in: (def producer-queue (java.util.concurrent.LinkedBlockingQueue.)) ;; Then the atom for consumers to follow: (def most-recent (atom ((fn more [] (lazy-seq (cons (.take producer-queue) (more))))))) ;; The thread for advancing most-recent is just as above: (defn advance-most-recent [] (swap! most-recent rest) (recur)) (future (advance-most-recent)) ;; Now the producer can be written like this: (future (loop [i 0] (Thread/sleep 1000) (.put producer-queue i) ;; Publish the next event (recur (inc i)))) ;; The consumers also use exactly the same API as earlier, both async watchers ;; and blocking will work: [@(future (take 3 @most-recent)) @(future (take 3 @most-recent))] ;=> [(45 46 47) (45 46 47)] ;; thanks to zenoli in #clojure irc for inspiration.