(ns mutabots
  "Reimplementation of transducers, in terms of processing functions instead
  of reducing functions.

  tl;dr: reducing-fn based transducers are a special case, influenced by
  reducers, of processing-fn based transducers.

  In Clojure 1.7.0-alpha2, transducers are expressed in terms of the existing
  concept of reducing functions.
  To sum it up, a transducer currently has the signature:
    reducing-fn -> reducing-fn
  The following implementation proposes that transducers get the more general
  signature:
    processing-fn -> processing-fn

  A processing fn is a generalization of a process that consumes inputs and
  cleans up / flushes at the end (when it receives the signal that there is
  no more input).

  While the signature of a reducing-fn is ....
  (fn
    ([])            ; <- used to pass init value down the transducers chain.
                    ;    Arity used by the transduce/into functions only
    ([acc])         ; <- used when the upstream has no more value.
                    ;    Arity used by all functions
                    ;    the acc(umulator) value must be passed unchanged down
                    ;    the transducers chain for consumption by the final
                    ;    reducing-fn (the real and only reducing-fn collecting
                    ;    the results).
                    ;    the acc argument is only used when a collector
                    ;    reducing-fn is required (e.g. only with transduce
                    ;    and into)
    ([acc input]))  ; <- used to process a new input value from upstream.
                    ;    as for the 1-arity, the acc(umulator) value must be
                    ;    passed unchanged down the transducers chain ...
                    ;    Must return a reduced value if it won't accept any
                    ;    additional input.

  .... the signature of a processing-fn is:
  (fn
    ([])            ; <- used when the upstream has no more value.
    ([input]))      ; <- used to process a new input value from upstream.
                    ;    Must return true if it won't accept any additional
                    ;    input, false otherwise.

  Comparing transducer implementations.

  Identity transducer:

  (defn rf-identity []
    (fn [rf]
      (fn
        ([] (rf))
        ([acc] (rf acc))
        ([acc input] (rf acc input)))))

  (defn pf-identity []
    (fn [p]
      (fn
        ([] (p))
        ([input] (p input)))))

  note: simpler arities.
  It is yet to be proven that the 0-arity form will ever return something
  different from (rf), or that the 1-arity and 2-arity forms can modify acc
  for a valuable reason.

  Partition-all transducer:

  ;; with current transducers
  (defn partition-all [^long n]
    (fn [rf]
      (let [a (java.util.ArrayList. n)]
        (fn
          ([] (rf))
          ([result]
            (let [result (if (.isEmpty a)
                           result
                           (let [v (vec (.toArray a))]
                             ;;clear first!
                             (.clear a)
                             (unreduced (rf result v))))]
              (rf result)))
          ([result input]
            (.add a input)
            (if (= n (.size a))
              (let [v (vec (.toArray a))]
                (.clear a)
                (rf result v))
              result))))))

  ;; with processing-based transducers
  (defn partition-all [^long n]
    (fn [p]
      (let [a (java.util.ArrayList. n)
            flush! (fn []
                     (let [v (vec (.toArray a))]
                       (.clear a)
                       (p v)))]
        (fn
          ([] (when-not (.isEmpty a) (flush!)) (p))
          ([x]
            (.add a x)
            (when (= n (.size a))
              (flush!)))))))

  note: notice the call to #'unreduced in the rf-based version? Subtle bugs
  waiting for you. Also notice that the pf-based version does not need to
  wrap / unwrap values with #'reduced => just use truthy/falsy values instead."
  (:refer-clojure :exclude [map filter remove take take-while take-nth drop
                            drop-while replace partition-by partition-all
                            transduce sequence keep keep-indexed cycle dedupe
                            cat mapcat]))

(defn map [f]
  (fn [p]
    (fn
      ([] (p))
      ([x] (p (f x))))))

(defn filter [pred]
  (fn [p1]
    (fn
      ([] (p1))
      ([x] (and (pred x) (p1 x))))))

(defn remove [pred]
  (filter (complement pred)))

(defn take [n]
  (fn [p1]
    (let [vn (volatile! (dec n))]
      (fn
        ([] (p1))
        ([x]
          (or (neg? @vn)
              (p1 x)
              (neg? (vswap! vn dec))))))))

(defn take-while [pred]
  (fn [p1]
    (fn
      ([] (p1))
      ([x] (if (pred x) (p1 x) true)))))

(defn take-nth [n]
  (fn [p1]
    (let [vn (volatile! n)]
      (fn
        ([] (p1))
        ([x]
          (if (== @vn n)
            (do (vreset! vn 1) (p1 x))
            (do (vswap! vn inc) false)))))))

(defn drop [n]
  (fn [p1]
    (let [vn (volatile! n)]
      (fn
        ([] (p1))
        ([x]
          (if (pos? @vn)
            (do (vswap! vn dec) false)
            (p1 x)))))))

(defn drop-while [pred]
  (fn [p1]
    (let [vtake? (volatile! false)
          start-take?
          (complement pred)]
      (fn
        ([] (p1))
        ([x]
          (cond
            @vtake? (p1 x)
            (start-take? x) (do (vreset! vtake? true) (p1 x))))))))

(defn replace [smap]
  (map #(if-let [e (find smap %)] (val e) %)))

(defn keep [f]
  (fn [p1]
    (fn
      ([] (p1))
      ([x]
        (let [v (f x)]
          ;; pass the non-nil result of f downstream, as clojure.core/keep does
          (when-not (nil? v)
            (p1 v)))))))

(defn keep-indexed [f]
  (fn [p1]
    (let [vi (volatile! -1)]
      (fn
        ([] (p1))
        ([x]
          (let [i (vswap! vi inc)
                v (f i x)]
            (when-not (nil? v)
              (p1 v))))))))

(defn cycle []
  (fn [p1]
    (let [xs (java.util.ArrayList.)]
      (fn
        ([]
          ;; replay the buffered inputs until downstream asks to stop;
          ;; with no buffered input there is nothing to replay
          (when-not (.isEmpty xs)
            (let [max (dec (.size xs))]
              (loop [i 0]
                (when-not (p1 (.get xs i))
                  (recur (if (< i max) (inc i) 0))))))
          (p1))
        ([x] (.add xs x) (p1 x))))))

(defn partition-by [f]
  (fn [p]
    (let [a (java.util.ArrayList.)
          pv (volatile! ::none)]
      (fn
        ([]
          (when-not (.isEmpty a)
            (let [v (vec (.toArray a))]
              ;;clear first!
              (.clear a)
              (p v)))
          (p))
        ([input]
          (let [pval @pv
                val (f input)]
            (vreset! pv val)
            (if (or (identical? pval ::none)
                    (= val pval))
              (do (.add a input) false) ; .add returns true
              (let [v (vec (.toArray a))]
                (.clear a)
                (or (p v)
                    (do (.add a input) false))))))))))

(defn partition-all [^long n]
  (fn [p1]
    (let [a (java.util.ArrayList. n)
          flush! (fn []
                   (let [v (vec (.toArray a))]
                     (.clear a)
                     (p1 v)))]
      (fn
        ([] (when-not (.isEmpty a) (flush!)) (p1))
        ([x]
          (.add a x)
          (when (= n (.size a))
            (flush!)))))))

(defn dedupe []
  (fn [p1]
    (let [vprev (volatile! (Object.))]
      (fn
        ([] (p1))
        ([x]
          (when (not= @vprev x)
            (vreset! vprev x)
            (p1 x)))))))

(defn cat []
  (fn [p1]
    (let [rf (fn [_ x] (when (p1 x) (reduced true)))]
      (fn
        ([] (p1))
        ([c] (reduce rf false c))))))

(defn mapcat [f]
  (comp (map f) (cat)))

(defn transduce [xform f init coll]
  (let [vacc (volatile! init)
        p (fn
            ([] (vswap! vacc f))
            ([x]
              ;; f may return a reduced value: store it unwrapped so that
              ;; the completion arity of f never receives a Reduced wrapper,
              ;; and report the stop request as a truthy value
              (let [acc (f @vacc x)]
                (vreset! vacc (unreduced acc))
                (reduced? acc))))
        p (xform p)]
    (reduce (fn [_ x] (when (p x) (reduced nil))) ::unused coll)
    (p)
    @vacc))

;; bleh :-(
(defn- promised-seq-proc! [pstep! p]
  (let [vp (volatile! p)]
    (fn
      ([] (deliver @vp nil))
      ([x]
        (deliver @vp
                 (cons x
                       (let [p (vreset! vp (promise))]
                         (lazy-seq (@pstep! p) @p))))
        false))))

(defn sequence [xform coll]
  (let [vcoll (volatile! coll)
        p (promise)
        promised-seq (lazy-seq p)
        pstep! (promise)
        proc! (xform (promised-seq-proc! pstep! p))
        step! (fn [p]
                (loop [coll @vcoll]
                  (if (realized? p)
                    (vreset! vcoll coll)
                    (if-let [[x :as s] (seq coll)]
                      (recur (if (proc! x) nil (rest s)))
                      (proc!)))))]
    (deliver pstep! step!) ; tying the knot
    (lazy-seq (step! p) @p)))
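
;; Usage sketch (not part of the original implementation). The forms below are
;; meant to be evaluated one by one at a REPL inside this namespace. They only
;; rely on the functions defined above; the local names out, sink and proc are
;; hypothetical and exist purely for illustration.
(comment
  ;; Processing-based transducers compose with comp, like the core ones.
  (transduce (comp (filter odd?) (map inc)) conj [] (range 10))
  ;; => [2 4 6 8 10]

  ;; The 0-arity (end of input) lets stateful steps flush what they buffered.
  (transduce (partition-all 2) conj [] [1 2 3 4 5])
  ;; => [[1 2] [3 4] [5]]

  ;; Early termination is signalled with a plain truthy return value.
  (sequence (comp (mapcat range) (take 4)) [1 2 3])
  ;; => (0 0 1 0)

  ;; Driving a processing fn by hand: sink is a terminal processing fn that
  ;; collects its inputs and always returns false (it accepts more input).
  (let [out  (java.util.ArrayList.)
        sink (fn ([] nil) ([x] (.add out x) false))
        proc ((comp (map inc) (take 2)) sink)]
    [(proc 1) (proc 2) (proc 3) (vec out)])
  ;; => [false true true [2 3]]
  )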