exupero's blog

Workaday transducers

In the previous post I showed one of the custom transducers I've used most, split-by. Here are a few more I've relied on, the simplest being tap, inspired by Ruby's method of the same name (and not to be confused with Clojure's tap> or core.async's tap):

(defn tap [f]
  (fn [rf]
    (fn
      ([] (rf))
      ([result] (rf result))
      ([result item]
       (f item)
       (rf result item)))))

tap lets you inspect values as they pass through the reducing function. You can print them, write them to a file, store them in an atom to deref at the REPL, or perform any other side effect. Depending on the context, it may be just as easy to comment out the rest of the transducers in the pipeline and evaluate the form, but when the invocation is harder to reach, tap can be useful.
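As an illustration of the atom approach, the sketch below records every value that reaches tap in a vector, while the rest of the pipeline carries on unchanged. The atom name `seen` is only for illustration, and tap is repeated from above so the snippet runs on its own.

```clojure
(defn tap [f]
  (fn [rf]
    (fn
      ([] (rf))
      ([result] (rf result))
      ([result item]
       (f item)             ; side effect only; return value is ignored
       (rf result item)))))

;; Hypothetical atom for inspecting intermediate values at the REPL.
(def seen (atom []))

(def output
  (into []
        (comp (map inc)
              (tap #(swap! seen conj %)) ; record what map produced
              (filter even?))
        (range 5)))

output ;; => [2 4]
@seen  ;; => [1 2 3 4 5]
```

Because tap sits between map and filter, the atom sees every incremented value, including the odd ones that filter later drops.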

dedupe-by complements Clojure's built-in dedupe:

(defn dedupe-by [f]
  (fn [rf]
    ;; Create the state here, inside (fn [rf] ...), so each use of the
    ;; transducer starts fresh, as clojure.core/dedupe does.
    (let [sentinel ::none
          prev (volatile! sentinel)]
      (fn
        ([] (rf))
        ([result] (rf result))
        ([result item]
         (let [p @prev
               v (f item)]
           (vreset! prev v)
           (if (identical? p sentinel)
             (rf result item)
             (if (= p v)
               result
               (rf result item)))))))))
(sequence
  (dedupe-by :y)
  [{:x 1 :y 2}
   {:x 2 :y 2}
   {:x 3 :y 4}
   {:x 4 :y 2}
   {:x 5 :y 2}])
({:x 1, :y 2} {:x 3, :y 4} {:x 4, :y 2})

Like dedupe, it only removes consecutive duplicates, but instead of comparing the items themselves, it compares the values returned by a key function. Similarly, I've complemented distinct with distinct-by.
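distinct-by isn't shown here, so the following is only a minimal sketch of what such a transducer might look like, modeled on how clojure.core/distinct tracks seen values in a volatile set. Unlike dedupe-by, it drops every later item whose key has appeared before, not just consecutive repeats.

```clojure
(defn distinct-by
  "Sketch: a transducer that keeps only the first item for each
  distinct value of (f item)."
  [f]
  (fn [rf]
    ;; Set of keys seen so far, created fresh for each use.
    (let [seen (volatile! #{})]
      (fn
        ([] (rf))
        ([result] (rf result))
        ([result item]
         (let [k (f item)]
           (if (contains? @seen k)
             result                    ; key already seen: drop the item
             (do (vswap! seen conj k)  ; remember the key, pass the item on
                 (rf result item)))))))))

(def unique-ys
  (sequence
    (distinct-by :y)
    [{:x 1 :y 2}
     {:x 2 :y 2}
     {:x 3 :y 4}
     {:x 4 :y 2}
     {:x 5 :y 2}]))

unique-ys ;; => ({:x 1, :y 2} {:x 3, :y 4})
```

On the same input as the dedupe-by example, {:x 4 :y 2} is now dropped too, because :y 2 was already seen earlier in the sequence.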

derivative is so named because I originally wrote it to find the differences between consecutive values, but it accepts any two-argument function, not just subtraction:

(defn derivative [f]
  (fn [rf]
    ;; ::none marks "no previous value yet", so nil items aren't mistaken for it
    (let [previous (volatile! ::none)]
      (fn
        ([] (rf))
        ([result] (rf result))
        ([result item]
         (let [prev @previous]
           (vreset! previous item)
           (if (identical? prev ::none)
             result
             (rf result (f prev item)))))))))
(sequence
  (derivative #(- %2 %1))
  [1 4 3 7 9 6 12])
(3 -1 4 2 -3 6)
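For comparison, when the whole collection is already in hand, the same pairwise computation can be written with plain map over the collection and its rest. derivative does the equivalent incrementally, which is what lets it sit inside a transducer pipeline. The second expression swaps in another two-argument function, vector, in place of subtraction.

```clojure
(def xs [1 4 3 7 9 6 12])

;; map walks both sequences in step and stops at the shorter one,
;; so each element is paired with its successor.
(def diffs (map (fn [prev item] (- item prev)) xs (rest xs)))
diffs ;; => (3 -1 4 2 -3 6)

;; Any two-argument function works in place of subtraction.
(def pairs (map vector xs (rest xs)))
pairs ;; => ([1 4] [4 3] [3 7] [7 9] [9 6] [6 12])
```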

In fact, it can be generalized further, into a transducer that hands a sliding window of N values to the given function:

(defn roll [n xs x]
  (if (< (count xs) n)
    (conj xs x)
    (conj (subvec xs 1) x)))
(defn sliding-window [n f]
  (fn [rf]
    (let [window (volatile! [])]
      (fn
        ([] (rf))
        ([result] (rf result))
        ([result item]
         (vswap! window #(roll n %1 item))
         (if (< (count @window) n)
           result
           (rf result (f @window))))))))
(sequence
  (sliding-window 2 (fn [[a b]] (- b a)))
  [1 4 3 7 9 6 12])
(3 -1 4 2 -3 6)
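To see how the window fills and then slides, it can help to run roll on its own with reductions, which returns every intermediate window. roll is repeated from above so the snippet is self-contained.

```clojure
(defn roll [n xs x]
  (if (< (count xs) n)
    (conj xs x)               ; still filling: just append
    (conj (subvec xs 1) x)))  ; full: drop the oldest, append the newest

(def windows (reductions #(roll 3 %1 %2) [] [1 2 3 4 5]))
windows ;; => ([] [1] [1 2] [1 2 3] [2 3 4] [3 4 5])
```

sliding-window skips the partial windows and only calls f from [1 2 3] onward, once the window holds n items.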

Since a sliding window covers both cases, I've since replaced these transducers with the xforms library's partition transducer:

(require '[net.cgrand.xforms :as xf])
(sequence
  (comp
    (xf/partition 2 1)
    (map (fn [[a b]] (- b a))))
  [1 4 3 7 9 6 12])
(3 -1 4 2 -3 6)

One final general-purpose transducer does the same job as map, but it splits the input into chunks of n values and applies the mapping function to each chunk in parallel with pmap:

(defn parallel [n f]
  (comp
    (partition-all n)
    (fn [rf]
      (fn
        ([] (rf))
        ([result] (rf result))
        ([result input]
         (rf result (pmap f input)))))
    cat))

This is helpful when the mapping function performs slow I/O, since the calls within each chunk overlap instead of running one after another.

parallel illustrates that transducers don't have to be written from scratch; they can be built by composing other transducers. Here, partition-all groups the incoming values into chunks before handing them to the custom transducer, and cat then concatenates each chunk of results back into a flat sequence.
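Because the custom step in the middle only transforms each chunk as a whole, the same pipeline can arguably be assembled entirely from built-in transducers, with (map #(pmap f %)) in its place. The sketch below uses the hypothetical name parallel-map to keep it distinct from parallel, and a hypothetical slow-square function that sleeps as a stand-in for slow I/O.

```clojure
(defn parallel-map
  "Sketch: like (map f), but applies f to each chunk of n items with
  pmap. A variant of parallel built only from core transducers."
  [n f]
  (comp (partition-all n)   ; group inputs into vectors of up to n items
        (map #(pmap f %))   ; run f over each chunk in parallel
        cat))               ; flatten the chunk results back out

;; Hypothetical stand-in for slow I/O, such as an HTTP request.
(defn slow-square [x]
  (Thread/sleep 100)
  (* x x))

(def squares (into [] (parallel-map 4 slow-square) (range 8)))
squares ;; => [0 1 4 9 16 25 36 49]
```

Results come back in input order, because pmap preserves order within a chunk and partition-all preserves the order of chunks. Wrapped in time, this should take roughly two sleeps' worth rather than eight, though how many calls pmap actually overlaps depends on the machine. In a standalone script, calling (shutdown-agents) at the end avoids a delay on exit, since pmap runs its work on futures.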

In the next post I'll share some less common transducers I've written.