by-keys and assoc-in? #11

Closed
julienfantin opened this issue Feb 14, 2017 · 22 comments

@julienfantin

julienfantin commented Feb 14, 2017

Hi @cgrand,

Sorry this is more of a question than an issue.

I was wondering if it was possible to achieve something very close to what xforms/by-key already does but instead using a kfn that returns a vector and use those keys to assoc-in the accumulator?

I.e. something like this

(xforms/into {} (xforms/by-keys (juxt :a :b)) [{:a 1 :b 1}
                                               {:a 1 :b 2}
                                               {:a 2 :b 1}
                                               {:a 2 :b 2}])

;; =>
;; {1 {1 {:a 1 :b 1}
;;     2 {:a 1 :b 2}}
;;  2 {1 {:a 2 :b 1}
;;     2 {:a 2 :b 2}}}

;; I.e. {<val of :a> {<val of :b> x}}
@cgrand
Owner

cgrand commented Feb 14, 2017

=> (xforms/into {}
     (xforms/by-key :a
       (comp
         (xforms/by-key :b identity)
         (xforms/into {})))
     [{:a 1 :b 1} {:a 1 :b 2} {:a 2 :b 1} {:a 2 :b 2}])
{1 {1 {:a 1, :b 1}, 2 {:a 1, :b 2}},
 2 {1 {:a 2, :b 1}, 2 {:a 2, :b 2}}}

The contract for your proposed by-keys is unclear: what are the items emitted by this transducer? [first-k nested-map]?

Did you consider something like:

(transduce
  (xforms/by-key (juxt :a :b) identity) ; or a (map (juxt (juxt :a :b) identity))
  (completing (fn [m [ks v]] (assoc-in m ks v))) {}
  [{:a 1 :b 1} {:a 1 :b 2} {:a 2 :b 1} {:a 2 :b 2}])
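
For readers without xforms on the classpath, the same assoc-in idea can be sketched with clojure.core alone. This is only an illustration: nest-by and rows are hypothetical names, and the sketch reduces directly over the collection instead of going through a transducer.

```clojure
;; Core-only sketch: compute a key path per item with kfn, then assoc-in.
(def rows [{:a 1 :b 1} {:a 1 :b 2} {:a 2 :b 1} {:a 2 :b 2}])

(defn nest-by
  "Hypothetical helper: nests each item of coll under the path (kfn item)."
  [kfn coll]
  (reduce (fn [m x] (assoc-in m (kfn x) x)) {} coll))

(nest-by (juxt :a :b) rows)
;; => {1 {1 {:a 1, :b 1}, 2 {:a 1, :b 2}},
;;     2 {1 {:a 2, :b 1}, 2 {:a 2, :b 2}}}
```

Items that share a full key path overwrite each other here, which is why an aggregation step is needed for a real rollup.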

More generally: what's your broader usecase? A rollup?

Edit: closing because not an issue but we can keep discussing this problem.

@cgrand cgrand closed this as completed Feb 14, 2017
@julienfantin
Author

julienfantin commented Feb 14, 2017

The original example assumed the transducer would emit a pair of [ks value], which is why it seemed so similar to x/by-key.

The broader use-case is indeed a rollup where a kfn is expected to return a seq of ks to assoc-in the accumulator. The part that I elided was that we'd want to apply an xform to the inner results as well, similar to how a SQL query performs an aggregation over rows grouped by multiple fields.

In any case it seems easily achieved in your first solution by replacing identity with an instance of xforms/reduce.
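
To make the "aggregate the inner results" idea concrete, here is a core-only sketch of a multi-key grouping with a sum at the leaves, roughly what SELECT a, b, SUM(n) ... GROUP BY a, b would give. The names sum-by-path and sales are hypothetical, and the aggregation is hard-wired to + for brevity.

```clojure
(def sales [{:a 1 :b 1 :n 10}
            {:a 1 :b 1 :n 5}
            {:a 1 :b 2 :n 7}
            {:a 2 :b 1 :n 3}])

(defn sum-by-path
  "Hypothetical helper: sums (vfn item) at the nested path (kfn item)."
  [kfn vfn coll]
  (reduce (fn [m x]
            ;; fnil supplies 0 the first time a path is seen
            (update-in m (kfn x) (fnil + 0) (vfn x)))
          {}
          coll))

(sum-by-path (juxt :a :b) :n sales)
;; => {1 {1 15, 2 7}, 2 {1 3}}
```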

Only curious whether you think there is an even broader use-case here that could be fleshed out?

@cgrand
Owner

cgrand commented Feb 14, 2017 via email

@julienfantin
Author

julienfantin commented Feb 14, 2017

This is brilliant!

Another goal we had was to use the transducer incrementally. That is after processing an initial collection, we could update the previous result with a new value.

I was unable to figure out what rf to use when invoking the rollup transducer in your example though, I was expecting to be able to do something like this:

(let [xf      (rollup [:continent :country] :population)
      data    [{:continent "Europe" :country "France" :population 66}
               {:continent "Europe" :country "Germany" :population 80}
               {:continent "Europe" :country "Belarus" :population 9}
               {:continent "North-America" :country "USA" :population 319}
               {:continent "North-America" :country "Canada" :population 35}]
      acc     (into {} xf data)
      new-val {:continent "North-America" :country "Mexico" :population 100}
      rf      (xf ???)]
  (rf acc new-val))

;; Would return:
;; {:detail
;;  {"Europe"        {:detail {"France" 66, "Germany" 80, "Belarus" 9}, :total 155},
;;   "North-America" {:detail {"USA" 319, "Canada" 35, "Mexico" 100}, :total 454}},
;;  :total 609}

@cgrand
Owner

cgrand commented Feb 15, 2017

Good old merge-with

=> (let [xf      (rollup [:continent :country] :population)
         data    [{:continent "Europe" :country "France" :population 66}
                  {:continent "Europe" :country "Germany" :population 80}
                  {:continent "Europe" :country "Belarus" :population 9}
                  {:continent "North-America" :country "USA" :population 319}
                  {:continent "North-America" :country "Canada" :population 35}]
         acc     (into {} xf data)
         new-vals [{:continent "North-America" :country "Mexico" :population 100}]]
     (transduce xf (fn mrg
                     ([x] x)
                     ([x x']
                       (if (:detail x) ; leaf detection could be better
                         {:detail (merge-with mrg (:detail x) (:detail x'))
                          :total (+ (:total x) (:total x'))}
                         (+ x x')))) acc new-vals))
{:detail
 {"Europe"
  {:detail
   {"France" 66,
    "Germany" 80,
    "Belarus" 9},
   :total 155},
  "North-America"
  {:detail
   {"USA" 319,
    "Canada" 35,
    "Mexico" 100},
   :total 454}},
 :total 609}
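
The merge step can be tried on its own, without rollup, by merging two already-built rollup maps. In this sketch old-rollup and delta are hand-written stand-ins for what the transducer would produce, and leaf detection uses map?, which assumes leaf values are never maps.

```clojure
(defn mrg
  "Recursively merges two rollup nodes; leaves are summed."
  [x y]
  (if (map? x)
    {:detail (merge-with mrg (:detail x) (:detail y))
     :total  (+ (:total x) (:total y))}
    (+ x y)))

(def old-rollup
  {:detail {"Europe"        {:detail {"France" 66 "Germany" 80 "Belarus" 9}
                             :total 155}
            "North-America" {:detail {"USA" 319 "Canada" 35}
                             :total 354}}
   :total 509})

;; Rollup of the single new row for Mexico
(def delta
  {:detail {"North-America" {:detail {"Mexico" 100} :total 100}}
   :total 100})

(:total (mrg old-rollup delta))
;; => 609
```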

By the way, be very careful when doing rf (xf ???): it's easy to forget that rf may be stateful, may produce reduced values, has a completion arity, and should only be used for a single processing run. It means that your pseudo code should have been (rf (unreduced (rf acc new-val))) to be safe.
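
Both pitfalls can be seen with stock clojure.core transducers. The sketch below drives the reducing functions by hand; the var names are arbitrary.

```clojure
;; (take 2) is stateful and signals early termination with `reduced`.
(def take-rf ((take 2) conj))
(def step1 (take-rf [] :a))     ; plain accumulator: [:a]
(def step2 (take-rf step1 :b))  ; wrapped in `reduced`: stop feeding input

(reduced? step2)
;; => true
(take-rf (unreduced step2))     ; unwrap, then call the completion arity
;; => [:a :b]

;; (partition-all 2) buffers input; only the completion arity flushes it.
(def part-rf ((partition-all 2) conj))
(def buffered (-> [] (part-rf 1) (part-rf 2) (part-rf 3)))

buffered
;; => [[1 2]]
(part-rf buffered)
;; => [[1 2] [3]]
```

Since both reducing functions close over mutable state, each instance is only good for one pass.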

@julienfantin
Author

@cgrand thanks for mentioning unreduced I would have missed that, and indeed it doesn't work without it.

Interestingly your last example reminds me of how reduce functions need to be written for Spark, in how you need to provide an extra merge function to combine results from different partitions.

For RDDs this extra step seemed inevitable, but it's been bothering me since I started looking at this incremental approach to rollups with transducers.

It just seems that we get those merges for free when we process the initial data with transduce, and that it should be possible to get that behavior for the new-vals without manually re-defining parts of what the transducing-fn already does?

Sorry I'm rambling, but I feel like I'm missing something here and was wondering if you had any thoughts on how to improve that situation?

@cgrand
Owner

cgrand commented Feb 20, 2017

@julienfantin it's not quite like the merge of RDDs. The model used for RDDs (or clojure.core.reducers/fold) assumes parallel computation (and is also very amenable to incremental updates of a result when the input can be updated in any place).
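
For comparison, here is what the fold model looks like in practice: a reducing function for each partition plus a separate combining function to merge partial results. This is a JVM-only sketch; sum-by-parity and merge-sums are hypothetical names, and the partition size of 2 is chosen only so that the combine step actually runs on such a small input.

```clojure
(require '[clojure.core.reducers :as r])

(defn sum-by-parity
  "Reducing step: adds x to the running sum for its parity."
  [m x]
  (update m (odd? x) (fnil + 0) x))

;; Combining fn: merges two partial results; with no args it returns {}.
(def merge-sums (r/monoid (partial merge-with +) hash-map))

(r/fold 2 merge-sums sum-by-parity (vec (range 10)))
;; => {false 20, true 25}
```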

Transducers are really tied to linear traversal so the only incrementalism you could get for free is "append-only". Except that it may be difficult because of what I consider a bug in the way transducers are initialized – just a gut feeling I haven't thought enough about incremental transducers. Yet.

@julienfantin
Author

What I was trying to point out is that even "append-only" incrementalisms can be non-trivial when the result is not a simple seq.

Take your incremental rollup example: the mrg fn has to match the recursive structure defined internally by rollup.

My observation, and frustration, is that we have to re-define something that the instantiated transducing-fn does during its lifecycle, i.e. "append" new values to the rollup.

To be more concrete, do you think there is a way that you could have written the new-vals use-case without having to define a custom mrg at all?

@cgrand
Owner

cgrand commented Feb 23, 2017

Right now I don't see a way.

If transducers were pure, then all state would be in the accumulator and you would get what you want for free. (i.e. call (rf acc) to get a snapshot, if rf and acc are immutable there's no harm).
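
As an illustration of "all state in the accumulator", here is a sketch of the population rollup written as a plain, pure reducing function rather than a transducer. It is not the rollup from this thread: add-at and rollup-rf are hypothetical names, and the aggregation is fixed to a sum. Because nothing is hidden in the function, appending a new row is just one more call of the 2-arity.

```clojure
(defn add-at
  "Hypothetical helper: adds v at path inside a {:detail .. :total ..} node."
  [node path v]
  (if (empty? path)
    (+ (or node 0) v)
    (-> (or node {:detail {} :total 0})
        (update :total + v)
        (update-in [:detail (first path)] add-at (rest path) v))))

(defn rollup-rf
  "Pure reducing fn: the accumulator is the rollup itself."
  [ks vfn]
  (let [kfn (apply juxt ks)]
    (fn
      ([] nil)
      ([acc] acc)
      ([acc x] (add-at acc (kfn x) (vfn x))))))

(def data
  [{:continent "Europe" :country "France" :population 66}
   {:continent "Europe" :country "Germany" :population 80}
   {:continent "Europe" :country "Belarus" :population 9}
   {:continent "North-America" :country "USA" :population 319}
   {:continent "North-America" :country "Canada" :population 35}])

(def pop-rf (rollup-rf [:continent :country] :population))
(def acc1 (reduce pop-rf (pop-rf) data))
(def acc2 (pop-rf acc1 {:continent "North-America"
                        :country "Mexico"
                        :population 100}))

[(:total acc1) (:total acc2)]
;; => [509 609]
```

The price is that the accumulator cannot be a transient and the function does not compose with other transducers the way the xforms version does.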

Would it be possible to let some transducers opt in to such behavior? Using something akin to punctuated streams? I don't think so. Well, I could certainly make something xforms-only but this would break with any non-xforms-aware transducer. (To be contrasted with kv transducers, which are xforms-only but are only an optimization and can interop with regular transducers.)

Our discussion makes me realize that I should rename "1-item transducers" to "aggregate transducers" (similar to SQL aggregate functions). Maybe starting from there I could build something not too confusing...

@julienfantin
Author

I think I've done a poor job of getting my point across. Please bear with me as I try and explain it from a high level.

The initial use-case that prompted me to look into xforms is very similar to your rollup example, so thanks again for linking to that.

In our case however, the rollup is a process with a lifecycle. We receive thousands of events in an initial payload and much smaller updates throughout the application lifecycle.

Our main goals are to:

  • Make the aggregation linear-time w.r.t. the size of the new data, as opposed to the full data.
  • Reuse the accumulation result across invocations of the aggregation.
  • Use the same logic to process both the initial batch as well as the updates.

The tradeoff we're willing to deal with is impurity, or statefulness of the transducing fn. Given our goals, it seems only natural that we'd have to trade time for space. We also do not have unbounded growth situations such as the use of distinct. To reiterate we're fine keeping an rf instance around.

Now for what I don't understand...

In your rollup example, what's most different from common uses of transducers I've seen -- and also what we're really after -- is the deeply-nested associative behavior. Our goals above seem like they could be achieved if we got that associative behavior when using the 2-arity only. However we only get what we expect from "finalizing" the result with the 1-arity.

From a high-level again, as I understand it we have two stateful aspects going on here:

  1. one that relates directly to the problem at hand: the xf closing over some state necessary to compute the aggregation.
  2. one that relates to the xf lifecycle: the use of multiple arities and finalization

It seems our problem is that we cannot get the transducer to exercise its associative behavior required for the aggregation (1.) without finalizing it (2.), thus negating our initial goals of reuse?

Now if the previous statement is correct, the next thing I don't understand is whether this is required by the nature of transducers in general or due to the particular implementations of x/group-by, x/into etc?

@cgrand
Owner

cgrand commented Feb 25, 2017

You can also use a roadroller approach: borrow some code from Powderkeg which would allow you to "fork" any mutable stuff. So you would be able to clone a stateful rf, complete it to get a snapshot and keep aggregating with the original one.

@julienfantin
Author

Unfortunately we need our solution to be portable to cljs!

I've been digging into some of the xforms internals trying to understand the implementation and it seems to me that the behavior that's been causing problems with the online usage is due to x/by-key rather than x/into?

As far as I understand the implementation of x/by-key, the state of the rollup is kept in a volatile when using the 2 and 3-arity while the acc stays untouched, and the rollup only comes together in the 1-arity?
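
The behavior described here can be reproduced with a much simplified stand-in. The sketch below is not the xforms implementation: sum-by is a hypothetical transducer that keeps per-key sums in a volatile, leaves acc untouched in the 2-arity, and only pushes [k sum] pairs downstream in the 1-arity.

```clojure
(defn sum-by
  "Hypothetical, simplified stand-in for a grouping/aggregating transducer."
  [kfn vfn]
  (fn [rf]
    (let [state (volatile! {})]
      (fn
        ([] (rf))
        ([acc]
         ;; completion: flush every [k sum] pair, then complete downstream
         (rf (unreduced (reduce rf acc @state))))
        ([acc x]
         ;; step: only the hidden state changes, acc is returned as-is
         (vswap! state update (kfn x) (fnil + 0) (vfn x))
         acc)))))

(def grouped-rf ((sum-by odd? identity) conj))

(-> {} (grouped-rf 1) (grouped-rf 2) (grouped-rf 3))
;; => {}
(grouped-rf {})
;; => {false 2, true 4}
```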

The only problem I see with the whole online approach is that it'll preclude using a transient for the acc so it doesn't seem like this behavior should be the default, but it seems possible to implement a version of x/by-key that'd work as we intend.

Am I off the mark here?

@cgrand
Owner

cgrand commented Feb 26, 2017 via email

@julienfantin
Author

Thanks for the slides, wish I could have attended!

@cgrand
Owner

cgrand commented Feb 27, 2017

So, some quirks about transducers: you can't touch the acc (short of the reduced-related fns) because:

  1. the 0-arg may not be called
  2. even if you believe you can be clever and wrap the first time through a 2+-arg arity, it's not going to work because sequence always passes nil as the accumulator, no matter what you returned previously.
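
Both quirks can be observed on the JVM with a small logging transducer. In this sketch spy is a hypothetical debugging helper that records whether the 0-arity was called and which accumulators the 2-arity received.

```clojure
(defn spy
  "Hypothetical debugging transducer: logs init calls and accumulators."
  [log]
  (fn [rf]
    (fn
      ([] (swap! log update :init-calls inc) (rf))
      ([acc] (rf acc))
      ([acc x] (swap! log update :accs conj acc) (rf acc x)))))

;; transduce with an explicit init never calls the 0-arity
(def t-log (atom {:init-calls 0 :accs []}))
(transduce (spy t-log) conj [] [1 2 3])
@t-log
;; => {:init-calls 0, :accs [[] [1] [1 2]]}

;; sequence hands nil to the 2-arity on every step
(def s-log (atom {:init-calls 0 :accs []}))
(doall (sequence (spy s-log) [1 2 3]))
(:accs @s-log)
;; => [nil nil nil]
```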

A little thing that may interest you:

=> (sequence (x/by-key odd? (x/reduce +)) (range 10))
([false 20] [true 25])
=> (sequence (x/by-key odd? (x/reductions +)) (range 10))
([false 0]
 [false 0]
 [true 0]
 [true 1]
 [false 2]
 [true 4]
 [false 6]
 [true 9]
 [false 12]
 [true 16]
 [false 20]
 [true 25])
=> (reductions conj {} (sequence (x/by-key odd? (x/reductions +)) (range 10)))
({}
 {false 0}
 {false 0}
 {false 0, true 0}
 {false 0, true 1}
 {false 2, true 1}
 {false 2, true 4}
 {false 6, true 4}
 {false 6, true 9}
 {false 12, true 9}
 {false 12, true 16}
 {false 20, true 16}
 {false 20, true 25})
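
A rough core-only analogue of the last expression, for comparison: reductions over a pure step function yields one snapshot per input item. It is not identical to the x/reductions output above, since it does not emit the initial 0 for each key.

```clojure
(def snapshots
  (reductions (fn [m x] (update m (odd? x) (fnil + 0) x))
              {}
              (range 10)))

(take 4 snapshots)
;; => ({} {false 0} {false 0, true 1} {false 2, true 1})
(last snapshots)
;; => {false 20, true 25}
```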

@cgrand
Owner

cgrand commented Feb 27, 2017

To me the key is to provide running versions of all aggregating xforms (those which always return only one item)

@julienfantin
Author

Indeed x/reductions had me thinking we could get what we want from xforms one way or another. I think I'm starting to get a clue what you're referring to.

Please correct me if I'm wrong but the key then would be to make sure we call the 2-arity rf in the 2-arity of the transducer? This is probably not quite correct but I got a basic example working with this:

(defn x-reduce
  ([f]
   (fn [rf]
     (let [vacc (volatile! (f))]
       (let [f (x/ensure-kvrf f)]
         (x/kvrf
          ([] (rf))
          ([acc] (rf acc))
          ([acc x] (rf acc (vswap! vacc f x)))
          ([acc k v] (rf acc (vswap! vacc f k v) k)))))))
  ([f init]
   (x-reduce (fn ([] init) ([acc] (f acc)) ([acc x] (f acc x))))))

(into [] (x-reduce +) (range 10))
;; => [0 1 3 6 10 15 21 28 36 45]

More generally do you think x/reductions can be used to get that behavior without needing to reimplement the aggregation transducers, or is a different implementation warranted?

@cgrand
Owner

cgrand commented Feb 28, 2017

Under the hood, most aggregation transducers are just (x/reduce some-rf). For example x/avg is (x/reduce rf/avg) and for such an aggregation you can't complete and reinit the rf for each item because completing an avg loses information.
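
The information loss is easy to see with a hand-rolled average. The avg-rf below is a hypothetical stand-in (not rf/avg from xforms) whose accumulator is a [sum count] pair and whose completion arity collapses it to a single number.

```clojure
(defn avg-rf
  ([] [0 0])
  ([[sum n]] (when (pos? n) (/ sum n)))
  ([[sum n] x] [(+ sum x) (inc n)]))

(def avg-acc (reduce avg-rf (avg-rf) [1 2 3])) ; [6 3]

(avg-rf avg-acc)
;; => 2

;; Resuming from the accumulator gives the right average of [1 2 3 6]
(avg-rf (avg-rf avg-acc 6))
;; => 3

;; Resuming from the completed value cannot: the count is gone,
;; so all that is left is to average 2 and 6
(/ (+ (avg-rf avg-acc) 6) 2)
;; => 4
```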

I think the best way to approach that is to give a rf the opportunity to yield a snapshot.

I can see several designs:

(defprotocol Snap1
  (snap1 [rf acc] "returns [acc' snapshot-value]"))
; pros: clean, functional looking ; cons: pair alloc

(defprotocol Snap2
  (snap2 [rf vacc] "returns snapshot-value, and vacc has been updated to contain the new accumulator"))
; pros: easy, even handy when acc is already in a volatile ; cons: mutation

(defprotocol Snap3
  (pause [rf acc] "returns paused-acc")
  (snap [rf paused-acc] "returns snapshot-value")
  (resume [rf paused-acc] "returns acc'"))
; pros: clean & functional looking ; cons: verbose
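
To get a feel for the first design, here is one possible reading of Snap1 applied to a running average. It is a JVM-only sketch under assumptions: snap-avg is a hypothetical reducing function built with reify so that it can both be called and satisfy the protocol, and since it is pure its snapshot simply returns the accumulator unchanged next to the completed value.

```clojure
(defprotocol Snap1
  (snap1 [rf acc] "returns [acc' snapshot-value]"))

(def snap-avg
  (reify
    clojure.lang.IFn
    ;; init: [sum count]
    (invoke [_] [0 0])
    ;; completion: collapse [sum count] to the average
    (invoke [_ acc]
      (let [[sum n] acc] (when (pos? n) (/ sum n))))
    ;; step
    (invoke [_ acc x]
      (let [[sum n] acc] [(+ sum x) (inc n)]))
    Snap1
    ;; the accumulator is immutable, so it can be handed back as-is
    (snap1 [this acc] [acc (this acc)])))

(let [acc            (reduce snap-avg (snap-avg) [1 2 3])
      [acc' running] (snap1 snap-avg acc)
      final          (snap-avg (reduce snap-avg acc' [6]))]
  [running final])
;; => [2 3]
```

A stateful rf would need snap1 to do more work, which is where the pair allocation and the alternative designs come in.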

@cgrand
Owner

cgrand commented Feb 28, 2017

It's interesting that your original question leads to a problem I've had in the back of my mind: there's avg and sd and I'd like running variants of them.

@julienfantin
Author

That sounds interesting but I'm not sure I'm seeing how this would work for reusing the acc across multiple transducing contexts?

Would you need new transducing contexts that'd call snap in the finalizer arity and return [acc snapshot] allowing to pass acc to the next context invocation and use snapshot as a normal value?

@cgrand
Owner

cgrand commented Feb 28, 2017

The not quite formed idea is to leverage composability. So that rollup would become "runningable" because its constituents are.

@cgrand
Owner

cgrand commented Mar 3, 2017

I opened #12
