tech.v3.dataset.reductions
Specific high performance reductions intended to be performed over a sequence
of datasets. This allows aggregations to be done in situations where the dataset is
larger than what will fit in memory on a normal machine. For this reason, summation
is implemented using the Kahan algorithm, and various statistical methods use
statistical estimation techniques and thus are prefixed with prob-, which is short
for probabilistic.
aggregate - Perform a multi-dataset aggregation. Returns a dataset with a single row.
group-by-column-agg - Perform a multi-dataset group-by followed by an aggregation. Returns a dataset with one row per key.
Examples:
user> (require '[tech.v3.dataset :as ds])
nil
user> (require '[tech.v3.datatype.datetime :as dtype-dt])
nil
user> (def stocks (-> (ds/->dataset "test/data/stocks.csv" {:key-fn keyword})
(ds/update-column :date #(dtype-dt/datetime->epoch :epoch-days %))))
#'user/stocks
user> (require '[tech.v3.dataset.reductions :as ds-reduce])
nil
user> (ds-reduce/group-by-column-agg
:symbol
{:symbol (ds-reduce/first-value :symbol)
:price-avg (ds-reduce/mean :price)
:price-sum (ds-reduce/sum :price)
:price-med (ds-reduce/prob-median :price)}
(repeat 3 stocks))
:symbol-aggregation [5 4]:
| :symbol | :price-avg | :price-sum | :price-med |
|---------|--------------|------------|--------------|
| IBM | 91.26121951 | 33675.39 | 88.70468750 |
| AAPL | 64.73048780 | 23885.55 | 37.05281250 |
| MSFT | 24.73674797 | 9127.86 | 24.07277778 |
| AMZN | 47.98707317 | 17707.23 | 41.35142361 |
| GOOG | 415.87044118 | 84837.57 | 422.69722222 |
aggregate
(aggregate agg-map options ds-seq)
(aggregate agg-map ds-seq)
Create a set of aggregate statistics over a sequence of datasets. Returns a dataset with a single row and uses the same interface as group-by-column-agg.
Example:
(ds-reduce/aggregate
{:n-elems (ds-reduce/row-count)
:price-avg (ds-reduce/mean :price)
:price-sum (ds-reduce/sum :price)
:price-med (ds-reduce/prob-median :price)
:price-iqr (ds-reduce/prob-interquartile-range :price)
:n-dates (ds-reduce/count-distinct :date :int32)}
[ds-seq])
distinct
(distinct colname finalizer)
(distinct colname)
Create a reducer that will return a set of values.
distinct-int32
(distinct-int32 colname finalizer)
(distinct-int32 colname)
Get the set of distinct items, given that you know the value space is no larger than int32 space. The optional finalizer allows you to post-process the data.
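As a sketch of how a distinct reducer composes with aggregate, assuming the stocks dataset used in the other examples in these docs; the `(comp vec sort)` finalizer is purely illustrative:

```clojure
(require '[tech.v3.dataset :as ds]
         '[tech.v3.dataset.reductions :as ds-reduce])

;; Assumes the stocks.csv test file used elsewhere in these docs.
(def stocks (ds/->dataset "test/data/stocks.csv" {:key-fn keyword}))

;; Collect the distinct symbols; the finalizer post-processes the
;; resulting set into a sorted vector.
(ds-reduce/aggregate
 {:symbols (ds-reduce/distinct :symbol (comp vec sort))}
 [stocks])
```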
group-by-column-agg
(group-by-column-agg colname agg-map options ds-seq)
(group-by-column-agg colname agg-map ds-seq)
Group a sequence of datasets by a column and aggregate down into a new dataset.
- colname - Either a single scalar column name or a vector of column names to group by.
- agg-map - Map of result column name to reducer. All values in the agg map must be functions from dataset to hamf (non-parallel) reducers. Note that transducer-compatible rf's - such as kixi.mean - are valid hamf reducers.
- ds-seq - Either a single dataset or a sequence of datasets.
See also group-by-column-agg-rf.
Options:
- :map-initial-capacity - Initial hashmap capacity. Resizing hash-maps is expensive so we would like to set this to something reasonable. Defaults to 10000.
- :index-filter - A function that, given a dataset, produces a function from long index to boolean, ideally either nil or a java.util.function.LongPredicate. Only indexes for which the index-filter returns true will be added to the aggregation. For very large datasets, this is a bit faster than using filter before the aggregation.
Example:
user> (require '[tech.v3.dataset :as ds])
nil
user> (require '[tech.v3.dataset.reductions :as ds-reduce])
nil
user> (def ds (ds/->dataset "https://github.com/techascent/tech.ml.dataset/raw/master/test/data/stocks.csv"
{:key-fn keyword}))
#'user/ds
user> (ds-reduce/group-by-column-agg
:symbol
{:price-avg (ds-reduce/mean :price)
:price-sum (ds-reduce/sum :price)}
ds)
_unnamed [5 3]:
| :symbol | :price-avg | :price-sum |
|---------|-------------:|-----------:|
| MSFT | 24.73674797 | 3042.62 |
| AAPL | 64.73048780 | 7961.85 |
| IBM | 91.26121951 | 11225.13 |
| AMZN | 47.98707317 | 5902.41 |
| GOOG | 415.87044118 | 28279.19 |
user> (def testds (ds/->dataset {:a ["a" "a" "a" "b" "b" "b" "c" "d" "e"]
:b [22 21 22 44 42 44 77 88 99]}))
#'user/testds
user> (ds-reduce/group-by-column-agg
[:a :b] {:c (ds-reduce/row-count)}
testds)
_unnamed [7 3]:
| :a | :b | :c |
|----|---:|---:|
| e | 99 | 1 |
| a | 21 | 1 |
| c | 77 | 1 |
| d | 88 | 1 |
| b | 44 | 2 |
| b | 42 | 1 |
| a | 22 | 2 |
group-by-column-agg-rf
(group-by-column-agg-rf colname agg-map)
(group-by-column-agg-rf colname agg-map options)
Produce a transduce-compatible rf that will perform the group-by-column-agg pathway. See documentation for group-by-column-agg.
tech.v3.dataset.reductions-test> (def stocks (ds/->dataset "test/data/stocks.csv" {:key-fn keyword}))
#'tech.v3.dataset.reductions-test/stocks
tech.v3.dataset.reductions-test> (transduce (map identity)
(ds-reduce/group-by-column-agg-rf
:symbol
{:n-elems (ds-reduce/row-count)
:price-avg (ds-reduce/mean :price)
:price-sum (ds-reduce/sum :price)
:symbol (ds-reduce/first-value :symbol)
:n-dates (ds-reduce/count-distinct :date :int32)}
{:index-filter (fn [dataset]
(let [rdr (dtype/->reader (dataset :price))]
(hamf/long-predicate
idx (> (.readDouble rdr idx) 100.0))))})
[stocks stocks stocks])
_unnamed [4 5]:
| :symbol | :n-elems | :price-avg | :price-sum | :n-dates |
|---------|---------:|-------------:|-----------:|---------:|
| AAPL | 93 | 160.19096774 | 14897.76 | 31 |
| IBM | 120 | 111.03775000 | 13324.53 | 40 |
| AMZN | 18 | 126.97833333 | 2285.61 | 6 |
| GOOG | 204 | 415.87044118 | 84837.57 | 68 |
prob-cdf
(prob-cdf colname cdf)
(prob-cdf colname cdf k)
See docs for tech.v3.dataset.reductions.apache-data-sketch/prob-cdf
- k - defaults to 128. This produces a normalized rank error of about 1.7%
prob-interquartile-range
(prob-interquartile-range colname k)
(prob-interquartile-range colname)
See docs for tech.v3.dataset.reductions.apache-data-sketch/prob-interquartile-range
- k - defaults to 128. This produces a normalized rank error of about 1.7%
prob-median
(prob-median colname)
(prob-median colname k)
See docs for tech.v3.dataset.reductions.apache-data-sketch/prob-median
- k - defaults to 128. This produces a normalized rank error of about 1.7%
prob-quantile
(prob-quantile colname quantile)
(prob-quantile colname quantile k)
See docs for tech.v3.dataset.reductions.apache-data-sketch/prob-quantile
- k - defaults to 128. This produces a normalized rank error of about 1.7%
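As a sketch of per-group quantile aggregation, assuming the stocks dataset defined in the earlier examples:

```clojure
;; Approximate 25th and 75th price percentiles per symbol using the
;; quantile-sketch-backed reducer (default k = 128).
(ds-reduce/group-by-column-agg
 :symbol
 {:symbol    (ds-reduce/first-value :symbol)
  :price-p25 (ds-reduce/prob-quantile :price 0.25)
  :price-p75 (ds-reduce/prob-quantile :price 0.75)}
 stocks)
```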
prob-set-cardinality
(prob-set-cardinality colname options)
(prob-set-cardinality colname)
See docs for tech.v3.dataset.reductions.apache-data-sketch/prob-set-cardinality.
Options:
- :hll-lgk - Defaults to 12; this is log-base2 of k, so k = 4096. lgK can be from 4 to 21.
- :hll-type - One of #{4,6,8}, defaults to 8. HLL_4, HLL_6 and HLL_8 represent different levels of compression of the final HLL array, where the 4, 6 and 8 refer to the number of bits each bucket of the HLL array is compressed down to. HLL_4 is the most compressed but generally slightly slower than the other two, especially during union operations.
- :datatype - One of :float64, :int64, :string.
reducer
(reducer column-name init-val-fn rfn merge-fn finalize-fn)
(reducer column-name rfn)
Make a group-by-agg reducer.
- column-name - Single column name or multiple columns.
- init-val-fn - Function to produce initial accumulators.
- rfn - Function that takes the accumulator and each column's data as further arguments. For a single-column pathway this looks like a normal Clojure reduction function, but for two columns it gets extra arguments.
- merge-fn - Function that takes two accumulators and merges them. Merge is not required for group-by-column-agg but it is required for aggregate.
- finalize-fn - Finalize the result after aggregation. Optional; will be replaced with identity if not provided.
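A minimal sketch of a custom single-column reducer built with `reducer`, tracking the minimum :price per group; it assumes the stocks dataset from the earlier examples, and the argument positions map directly onto init-val-fn / rfn / merge-fn / finalize-fn as described above:

```clojure
(def min-price
  (ds-reduce/reducer :price
                     (constantly Double/MAX_VALUE)     ;; init-val-fn
                     (fn [acc price] (min acc price))  ;; rfn
                     min                               ;; merge-fn
                     identity))                        ;; finalize-fn

(ds-reduce/group-by-column-agg
 :symbol
 {:symbol    (ds-reduce/first-value :symbol)
  :min-price min-price}
 stocks)
```

merge-fn is supplied here even though group-by-column-agg does not require it, so the same reducer also works with aggregate.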
reducer->column-reducer
(reducer->column-reducer reducer cname)
(reducer->column-reducer reducer op-space cname)
Given a hamf parallel reducer and a column name, return a dataset reducer of one column.
reservoir-dataset
(reservoir-dataset reservoir-size)
(reservoir-dataset reservoir-size options)
reservoir-desc-stat
(reservoir-desc-stat colname reservoir-size stat-name options)
(reservoir-desc-stat colname reservoir-size stat-name)
Calculate a descriptive statistic using reservoir sampling. The list of statistic
names is found in tech.v3.datatype.statistics/all-descriptive-stats-names.
Options are the options used in reservoir-sampler.
Note that this method will not convert datetime objects to milliseconds for you as in descriptive-stats.
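As a sketch, again assuming the stocks dataset from the earlier examples; :standard-deviation is assumed to be one of the names in tech.v3.datatype.statistics/all-descriptive-stats-names:

```clojure
;; Estimate the standard deviation of :price from a 1000-element
;; reservoir sample rather than the full column.
(ds-reduce/aggregate
 {:price-stddev (ds-reduce/reservoir-desc-stat
                 :price 1000 :standard-deviation)}
 [stocks])
```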
row-count
(row-count)
Create a simple reducer that returns the number of times reduceIndex was called.