tech.v3.dataset.reductions

Specific high-performance reductions intended to be performed over a sequence of datasets. This allows aggregations to be done in situations where the data is larger than will fit in memory on a normal machine. Because of this, summation is implemented using the Kahan (compensated) summation algorithm, and various statistical methods use statistical estimation techniques and are thus prefixed with prob-, short for probabilistic.
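Kahan summation keeps a running compensation term that captures the low-order bits lost when adding a small value to a large running sum. A minimal sketch of the idea (not the library's internal implementation):

```clojure
;; Compensated (Kahan) summation: `c` accumulates the rounding error
;; lost at each addition and feeds it back into the next step.
(defn kahan-sum [xs]
  (loop [xs (seq xs) sum 0.0 c 0.0]
    (if-let [[x & more] xs]
      (let [y (- x c)        ;; remove previously lost low-order bits
            t (+ sum y)]     ;; big sum swallows low-order bits of y
        (recur more t (- (- t sum) y)))
      sum)))

(kahan-sum (repeat 10 0.1)) ;; close to 1.0, unlike a naive double sum
```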

  • aggregate - Perform a multi-dataset aggregation. Returns a dataset with a single row.
  • group-by-column-agg - Perform a multi-dataset group-by followed by an aggregation. Returns a dataset with one row per key.

Examples:

user> (require '[tech.v3.dataset :as ds])
nil
user> (require '[tech.v3.datatype.datetime :as dtype-dt])
nil
user> (def stocks (-> (ds/->dataset "test/data/stocks.csv" {:key-fn keyword})
                      (ds/update-column :date #(dtype-dt/datetime->epoch :epoch-days %))))
#'user/stocks
user> (require '[tech.v3.dataset.reductions :as ds-reduce])
nil
user> (ds-reduce/group-by-column-agg
       :symbol
       {:symbol (ds-reduce/first-value :symbol)
        :price-avg (ds-reduce/mean :price)
        :price-sum (ds-reduce/sum :price)
        :price-med (ds-reduce/prob-median :price)}
       (repeat 3 stocks))
:symbol-aggregation [5 4]:

| :symbol |   :price-avg | :price-sum |   :price-med |
|---------|-------------:|-----------:|-------------:|
|     IBM |  91.26121951 |   33675.39 |  88.70468750 |
|    AAPL |  64.73048780 |   23885.55 |  37.05281250 |
|    MSFT |  24.73674797 |    9127.86 |  24.07277778 |
|    AMZN |  47.98707317 |   17707.23 |  41.35142361 |
|    GOOG | 415.87044118 |   84837.57 | 422.69722222 |

aggregate

(aggregate agg-map options ds-seq)(aggregate agg-map ds-seq)

Create a set of aggregate statistics over a sequence of datasets. Returns a dataset with a single row and uses the same interface as group-by-column-agg.

Example:

  (ds-reduce/aggregate
   {:n-elems (ds-reduce/row-count)
    :price-avg (ds-reduce/mean :price)
    :price-sum (ds-reduce/sum :price)
    :price-med (ds-reduce/prob-median :price)
    :price-iqr (ds-reduce/prob-interquartile-range :price)
    :n-dates (ds-reduce/count-distinct :date :int32)}
   [ds-seq])

count-distinct

(count-distinct colname op-space)(count-distinct colname)

Create a reducer that counts the distinct values of a column. The optional op-space (e.g. :int32) constrains the value space for a more efficient implementation.

distinct

(distinct colname finalizer)(distinct colname)

Create a reducer that returns the set of distinct values of a column. The optional finalizer post-processes the resulting set.

distinct-int32

(distinct-int32 colname finalizer)(distinct-int32 colname)

Get the set of distinct items given that you know the value space is no larger than int32 space. The optional finalizer allows you to post-process the data.
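A hedged sketch of using the finalizer, assuming the stocks dataset from the examples above (where :date has been converted to int32-ranged epoch days) and assuming the finalizer receives a countable set-like collection of the distinct values:

```clojure
;; Distinct epoch-day values per symbol, finalized down to a count.
;; Assumption: the finalizer is passed the set of distinct int32 values.
(ds-reduce/group-by-column-agg
 :symbol
 {:symbol  (ds-reduce/first-value :symbol)
  :n-dates (ds-reduce/distinct-int32 :date count)}
 [stocks])
```

Without the finalizer, each result cell would hold the distinct set itself rather than its cardinality.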

first-value

(first-value colname)

Create a reducer that returns the first value encountered in the column.

group-by-column-agg

(group-by-column-agg colname agg-map options ds-seq)(group-by-column-agg colname agg-map ds-seq)

Group a sequence of datasets by a column and aggregate down into a new dataset.

  • colname - Either a single scalar column name or a vector of column names to group by.

  • agg-map - Map of result column name to reducer. All values in the agg map must be functions from dataset to hamf (non-parallel) reducers. Note that transducer-compatible rf's, such as kixi.mean, are valid hamf reducers.

  • ds-seq - Either a single dataset or sequence of datasets.

See also group-by-column-agg-rf.

Options:

  • :map-initial-capacity - initial hashmap capacity. Resizing hash-maps is expensive so we would like to set this to something reasonable. Defaults to 10000.
  • :index-filter - A function that given a dataset produces a function from long index to boolean, ideally either nil or a java.util.function.LongPredicate. Only indexes for which the index-filter returns true will be added to the aggregation. For very large datasets, this is a bit faster than using filter before the aggregation.

Example:


user> (require '[tech.v3.dataset :as ds])
nil
user> (require '[tech.v3.dataset.reductions :as ds-reduce])
nil
user> (def ds (ds/->dataset "https://github.com/techascent/tech.ml.dataset/raw/master/test/data/stocks.csv"
                            {:key-fn keyword}))

#'user/ds
user> (ds-reduce/group-by-column-agg
       :symbol
       {:price-avg (ds-reduce/mean :price)
        :price-sum (ds-reduce/sum :price)}
       ds)
_unnamed [5 3]:

| :symbol |   :price-avg | :price-sum |
|---------|-------------:|-----------:|
|    MSFT |  24.73674797 |    3042.62 |
|    AAPL |  64.73048780 |    7961.85 |
|     IBM |  91.26121951 |   11225.13 |
|    AMZN |  47.98707317 |    5902.41 |
|    GOOG | 415.87044118 |   28279.19 |

user> (def testds (ds/->dataset {:a ["a" "a" "a" "b" "b" "b" "c" "d" "e"]
                                 :b [22   21  22 44  42  44   77 88 99]}))
#'user/testds
user> (ds-reduce/group-by-column-agg
       [:a :b] {:c (ds-reduce/row-count)}
       testds)
_unnamed [7 3]:

| :a | :b | :c |
|----|---:|---:|
|  e | 99 |  1 |
|  a | 21 |  1 |
|  c | 77 |  1 |
|  d | 88 |  1 |
|  b | 44 |  2 |
|  b | 42 |  1 |
|  a | 22 |  2 |

group-by-column-agg-rf

(group-by-column-agg-rf colname agg-map)(group-by-column-agg-rf colname agg-map options)

Produce a transduce-compatible rf that will perform the group-by-column-agg pathway. See documentation for group-by-column-agg.

tech.v3.dataset.reductions-test> (def stocks (ds/->dataset "test/data/stocks.csv" {:key-fn keyword}))
#'tech.v3.dataset.reductions-test/stocks
tech.v3.dataset.reductions-test> (transduce (map identity)
                                            (ds-reduce/group-by-column-agg-rf
                                             :symbol
                                             {:n-elems (ds-reduce/row-count)
                                              :price-avg (ds-reduce/mean :price)
                                              :price-sum (ds-reduce/sum :price)
                                              :symbol (ds-reduce/first-value :symbol)
                                              :n-dates (ds-reduce/count-distinct :date :int32)}
                                             {:index-filter (fn [dataset]
                                                              (let [rdr (dtype/->reader (dataset :price))]
                                                                (hamf/long-predicate
                                                                 idx (> (.readDouble rdr idx) 100.0))))})
                                            [stocks stocks stocks])
_unnamed [4 5]:

| :symbol | :n-elems |   :price-avg | :price-sum | :n-dates |
|---------|---------:|-------------:|-----------:|---------:|
|    AAPL |       93 | 160.19096774 |   14897.76 |       31 |
|     IBM |      120 | 111.03775000 |   13324.53 |       40 |
|    AMZN |       18 | 126.97833333 |    2285.61 |        6 |
|    GOOG |      204 | 415.87044118 |   84837.57 |       68 |

mean

(mean colname)

Create a double consumer which will produce the mean of the column.

prob-cdf

(prob-cdf colname cdf)(prob-cdf colname cdf k)

See docs for tech.v3.dataset.reductions.apache-data-sketch/prob-cdf

  • k - defaults to 128. This produces a normalized rank error of about 1.7%

prob-interquartile-range

(prob-interquartile-range colname k)(prob-interquartile-range colname)

See docs for tech.v3.dataset.reductions.apache-data-sketch/prob-interquartile-range

  • k - defaults to 128. This produces a normalized rank error of about 1.7%

prob-median

(prob-median colname)(prob-median colname k)

See docs for tech.v3.dataset.reductions.apache-data-sketch/prob-median

  • k - defaults to 128. This produces a normalized rank error of about 1.7%

prob-quantile

(prob-quantile colname quantile)(prob-quantile colname quantile k)

See docs for tech.v3.dataset.reductions.apache-data-sketch/prob-quantile

  • k - defaults to 128. This produces a normalized rank error of about 1.7%

prob-set-cardinality

(prob-set-cardinality colname options)(prob-set-cardinality colname)

See docs for tech.v3.dataset.reductions.apache-data-sketch/prob-set-cardinality.

Options:

  • :hll-lgk - defaults to 12, this is log-base2 of k, so k = 4096. lgK can be from 4 to 21.
  • :hll-type - One of #{4,6,8}, defaults to 8. The HLL_4, HLL_6 and HLL_8 represent different levels of compression of the final HLL array where the 4, 6 and 8 refer to the number of bits each bucket of the HLL array is compressed down to. The HLL_4 is the most compressed but generally slightly slower than the other two, especially during union operations.
  • :datatype - One of :float64, :int64, :string
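A hedged sketch of an approximate distinct-count over the stocks dataset from the examples above, with the HLL options spelled out explicitly (the option values shown are the documented defaults, not recommendations):

```clojure
;; Approximate cardinality of :symbol via a HyperLogLog sketch.
;; :hll-lgk 12 => k = 4096 buckets; :datatype :string for string values.
(ds-reduce/aggregate
 {:n-symbols (ds-reduce/prob-set-cardinality
              :symbol {:datatype :string
                       :hll-lgk  12
                       :hll-type 8})}
 [stocks])
```

For small value spaces an exact count-distinct is cheaper; the sketch pays off when the number of distinct values is too large to hold in memory.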

reducer

(reducer column-name init-val-fn rfn merge-fn finalize-fn)(reducer column-name rfn)

Make a group-by-agg reducer.

  • column-name - Single column name or multiple columns.
  • init-val-fn - Function to produce initial accumulators
  • rfn - Function that takes the accumulator and each column's data as further arguments. For a single-column pathway this looks like a normal Clojure reduction function, but for two columns it takes extra arguments.
  • merge-fn - Function that takes two accumulators and merges them. Merge is not required for group-by-column-agg but it is required for aggregate.
  • finalize-fn - Finalize the result after aggregation. Optional; replaced with identity if not provided.
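A sketch of a custom single-column reducer built with reducer, assuming the stocks dataset from the examples above. It tracks the maximum :price per group; the merge-fn makes it usable with aggregate as well:

```clojure
;; Custom reducer: maximum price seen. The accumulator is a plain double.
(def max-price
  (ds-reduce/reducer :price
                     (constantly Double/NEGATIVE_INFINITY) ;; init-val-fn
                     (fn [acc v] (max acc v))              ;; rfn
                     (fn [a b] (max a b))                  ;; merge-fn
                     identity))                            ;; finalize-fn

(ds-reduce/group-by-column-agg
 :symbol
 {:symbol    (ds-reduce/first-value :symbol)
  :price-max max-price}
 [stocks])
```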

reducer->column-reducer

(reducer->column-reducer reducer cname)(reducer->column-reducer reducer op-space cname)

Given a hamf parallel reducer and a column name, return a dataset reducer of one column.

reservoir-dataset

(reservoir-dataset reservoir-size)(reservoir-dataset reservoir-size options)

reservoir-desc-stat

(reservoir-desc-stat colname reservoir-size stat-name options)(reservoir-desc-stat colname reservoir-size stat-name)

Calculate a descriptive statistic using reservoir sampling. A list of statistic names is found in tech.v3.datatype.statistics/all-descriptive-stats-names. Options are options used in reservoir-sampler.

Note that this method will not convert datetime objects to milliseconds for you as in descriptive-stats.
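A hedged sketch over the stocks dataset from the examples above, assuming :standard-deviation is among the names in tech.v3.datatype.statistics/all-descriptive-stats-names:

```clojure
;; Approximate standard deviation of :price computed from a
;; 1000-element reservoir sample rather than the full data.
(ds-reduce/aggregate
 {:price-sd (ds-reduce/reservoir-desc-stat
             :price 1000 :standard-deviation)}
 [stocks])
```

Because the statistic is computed on a sample, results vary slightly between runs; larger reservoir sizes trade memory for accuracy.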

row-count

(row-count)

Create a simple reducer that returns the number of times reduceIndex was called.

sum

(sum colname)

Create a double consumer which will sum the values.