Skip to content
Snippets Groups Projects
Commit 22601dba authored by Simon Belak's avatar Simon Belak Committed by Cam Saul
Browse files

Harden fingerprinting (#12573)

parent c6e5cd29
No related branches found
No related tags found
No related merge requests found
......@@ -6,13 +6,14 @@
[kixi.stats
[core :as stats]
[math :as math]]
[medley.core :as m]
[metabase.models.field :as field]
[metabase.sync.analyze.classifiers.name :as classify.name]
[metabase.sync.util :as sync-util]
[metabase.util :as u]
[metabase.util
[date-2 :as u.date]
[i18n :refer [trs]]]
[i18n :refer [deferred-trs trs]]]
[redux.core :as redux])
(:import com.bigml.histogram.Histogram
com.clearspring.analytics.stream.cardinality.HyperLogLogPlus
......@@ -53,6 +54,46 @@
(.offer acc x)
acc))
(defmacro robust-map
"Wrap each map value in try-catch block."
[& kvs]
`(hash-map ~@(apply concat (for [[k v] (partition 2 kvs)]
`[~k (try
~v
(catch Throwable _#))]))))
(defmacro ^:private with-reduced-error
[msg & body]
`(let [result# (sync-util/with-error-handling ~msg ~@body)]
(if (instance? Throwable result#)
(reduced result#)
result#)))
(defn with-error-handling
"Wrap `rf` in an error-catching transducer."
[rf msg]
(fn
([] (with-reduced-error msg (rf)))
([acc]
(unreduced
(if (or (reduced? acc)
(instance? Throwable acc))
acc
(with-reduced-error msg (rf acc)))))
([acc e] (with-reduced-error msg (rf acc e)))))
(defn robust-fuse
"Like `redux/fuse` but wraps every reducing fn in `with-error-handling` and returns `nil` for
that fn if an error has been encountered during transducing."
[kfs]
(redux/fuse (m/map-kv-vals (fn [k f]
(redux/post-complete
(with-error-handling f (deferred-trs "Error reducing {0}" (name k)))
(fn [result]
(when-not (instance? Throwable result)
result))))
kfs)))
(defmulti fingerprinter
"Return a fingerprinter transducer for a given field based on the field's type."
{:arglists '([field])}
......@@ -68,8 +109,8 @@
(def ^:private global-fingerprinter
(redux/post-complete
(redux/fuse {:distinct-count cardinality
:nil% (stats/share nil?)})
(robust-fuse {:distinct-count cardinality
:nil% (stats/share nil?)})
(partial hash-map :global)))
(defmethod fingerprinter :default
......@@ -101,26 +142,6 @@
(merge global-fingerprint
type-fingerprint))))
(defmacro ^:private with-reduced-error
[msg & body]
`(let [result# (sync-util/with-error-handling ~msg ~@body)]
(if (instance? Throwable result#)
(reduced result#)
result#)))
(defn with-error-handling
"Wrap `rf` in an error-catching transducer."
[rf msg]
(fn
([] (with-reduced-error msg (rf)))
([acc]
(unreduced
(if (or (reduced? acc)
(instance? Throwable acc))
acc
(with-reduced-error msg (rf acc)))))
([acc e] (with-reduced-error msg (rf acc e)))))
(defmacro ^:private deffingerprinter
[field-type transducer]
(let [field-type (if (vector? field-type)
......@@ -168,8 +189,8 @@
(deffingerprinter :type/DateTime
((map ->temporal)
(redux/fuse {:earliest earliest
:latest latest})))
(robust-fuse {:earliest earliest
:latest latest})))
(defn- histogram
"Transducer that summarizes numerical data with a histogram."
......@@ -177,17 +198,25 @@
([^Histogram histogram] histogram)
([^Histogram histogram x] (hist/insert-simple! histogram x)))
(defn real-number?
"Is `x` a real number (i.e. not a `NaN` or an `Infinity`)?"
[x]
(and (number? x)
(not (Double/isNaN x))
(not (Double/isInfinite x))))
(deffingerprinter :type/Number
(redux/post-complete
histogram
((filter real-number?) histogram)
(fn [h]
(let [{q1 0.25 q3 0.75} (hist/percentiles h 0.25 0.75)]
{:min (hist/minimum h)
(robust-map
:min (hist/minimum h)
:max (hist/maximum h)
:avg (hist/mean h)
:sd (some-> h hist/variance math/sqrt)
:q1 q1
:q3 q3}))))
:q3 q3)))))
(defn- valid-serialized-json?
"Is x a serialized JSON dictionary or array."
......@@ -199,10 +228,10 @@
((map str) ; we cast to str to support `field-literal` type overwriting:
; `[:field-literal "A_NUMBER" :type/Text]` (which still
; returns numbers in the result set)
(redux/fuse {:percent-json (stats/share valid-serialized-json?)
:percent-url (stats/share u/url?)
:percent-email (stats/share u/email?)
:average-length ((map count) stats/mean)})))
(robust-fuse {:percent-json (stats/share valid-serialized-json?)
:percent-url (stats/share u/url?)
:percent-email (stats/share u/email?)
:average-length ((map count) stats/mean)})))
(defn fingerprint-fields
"Return a transducer for fingerprinting a resultset with fields `fields`."
......
......@@ -99,25 +99,19 @@
(def ^:private ^:const ^Long validation-set-size 20)
(defn- real-number?
[x]
(and (number? x)
(not (Double/isNaN x))
(not (Double/isInfinite x))))
(defn- best-fit
"Fit curves from `trendline-function-families` and pick the one with the smallest RMSE.
To keep the operation single pass we collect a small validation set as we go using reservoir
sampling, and use it to calculate RMSE."
[fx fy]
(redux/post-complete
(redux/fuse
(f/robust-fuse
{:fits (->> (for [{:keys [x-link-fn y-link-fn formula model]} trendline-function-families]
(redux/post-complete
(stats/simple-linear-regression (comp (stats/somef x-link-fn) fx)
(comp (stats/somef y-link-fn) fy))
(fn [[offset slope]]
(when (every? real-number? [offset slope])
(when (every? f/real-number? [offset slope])
{:model (model offset slope)
:formula (formula offset slope)}))))
(apply redux/juxt))
......@@ -133,7 +127,7 @@
(map #(assoc % :mae (transduce identity
(mae (comp (:model %) first) second)
validation-set)))
(filter (comp real-number? :mae))
(filter (comp f/real-number? :mae))
not-empty
(apply min-key :mae)
:formula))))
......@@ -202,17 +196,19 @@
(redux/post-complete
(let [y-position (:position number-col)
yfn #(nth % y-position)]
(redux/juxt ((map yfn) (last-n 2))
((map xfn) (last-n 2))
(stats/simple-linear-regression xfn yfn)
(best-fit xfn yfn)))
((filter (comp f/real-number? yfn))
(redux/juxt ((map yfn) (last-n 2))
((map xfn) (last-n 2))
(stats/simple-linear-regression xfn yfn)
(best-fit xfn yfn))))
(fn [[[y-previous y-current] [x-previous x-current] [offset slope] best-fit]]
(let [unit (if (or (nil? (:unit datetime))
(->> datetime :unit mbql.u/normalize-token (= :default)))
(infer-unit x-previous x-current)
(:unit datetime))
show-change? (valid-period? x-previous x-current unit)]
{:last-value y-current
(f/robust-map
:last-value y-current
:previous-value (when show-change?
y-previous)
:last-change (when show-change?
......@@ -221,7 +217,7 @@
:offset offset
:best-fit best-fit
:col (:name number-col)
:unit unit})))))
:unit unit))))))
(trs "Error generating timeseries insight keyed by: {0}"
(sync-util/name-for-logging (field/map->FieldInstance datetime))))))
......
......@@ -42,7 +42,19 @@
:sd 1.0}}}
(transduce identity
(fingerprinter (field/map->FieldInstance {:base_type :type/Number}))
[1.0 2.0 3.0]))))
[1.0 2.0 3.0])))
(testing "We should robustly survive weird values such as NaN, Infinity, and nil"
(is (= {:global {:distinct-count 7
:nil% 0.25}
:type {:type/Number {:avg 2.0
:min 1.0
:max 3.0
:q1 1.25
:q3 2.75
:sd 1.0}}}
(transduce identity
(fingerprinter (field/map->FieldInstance {:base_type :type/Number}))
[1.0 2.0 3.0 Double/NaN Double/POSITIVE_INFINITY Double/NEGATIVE_INFINITY nil nil])))))
(deftest fingerprint-string-values-test
(is (= {:global {:distinct-count 5
......
(ns metabase.sync.analyze.fingerprint.insights-test
(:require [clojure.test :refer :all]
[expectations :refer :all]
[metabase.sync.analyze.fingerprint.insights :as i :refer :all]))
(def ^:private cols [{:base_type :type/DateTime} {:base_type :type/Number}])
......@@ -25,13 +24,6 @@
first
:last-value))))))
(expect
(transduce identity
(insights [{:base_type :type/DateTime :unit :year}
{:base_type :type/Integer}])
[["2014-01-01T00:00:00Z" 100]
["2015-01-01T00:00:00Z" 200]]))
(defn- inst->day
[t]
(some-> t (#'i/->millis-from-epoch) (#'i/ms->day)))
......@@ -97,26 +89,44 @@
["2018-12-02",179,3311]
["2018-12-03",144,2525]])
(expect
[{:last-value 144,
:previous-value 179,
:last-change -0.19553072625698323,
:slope -7.671473413418271,
:offset 137234.92983406168,
:best-fit [:* 1.5672560913548484E227 [:exp [:* -0.02899533549378612 :x]]],
:unit :day,
:col nil}
{:last-value 2525,
:previous-value 3311,
:last-change -0.2373905164602839,
:slope -498.764272733624,
:offset 8915371.843617931,
:best-fit [:+ 8915371.843617931 [:* -498.764272733624 :x]],
:col nil,
:unit :day}]
(transduce identity
(insights [{:base_type :type/DateTime} {:base_type :type/Number} {:base_type :type/Number}])
ts))
(deftest timeseries-insight-test
(is (= [{:last-value 144,
:previous-value 179,
:last-change -0.19553072625698323,
:slope -7.671473413418271,
:offset 137234.92983406168,
:best-fit [:* 1.5672560913548484E227 [:exp [:* -0.02899533549378612 :x]]],
:unit :day,
:col nil}
{:last-value 2525,
:previous-value 3311,
:last-change -0.2373905164602839,
:slope -498.764272733624,
:offset 8915371.843617931,
:best-fit [:+ 8915371.843617931 [:* -498.764272733624 :x]],
:col nil,
:unit :day}]
(transduce identity
(insights [{:base_type :type/DateTime}
{:base_type :type/Number}
{:base_type :type/Number}])
ts)))
(testing "We should robustly survive weird values such as NaN, Infinity, and nil"
(is (= [{:last-value 20.0
:previous-value 10.0
:last-change 1.0
:slope 10.0
:offset -178350.0
:best-fit [:+ -178350.0 [:* 10.0 :x]]
:unit :day
:col nil}]
(transduce identity
(insights [{:base_type :type/DateTime} {:base_type :type/Number}])
[["2018-11-01" 10.0]
["2018-11-02" 20.0]
["2018-11-03" nil]
["2018-11-05" Double/NaN]
["2018-11-08" Double/POSITIVE_INFINITY]])))))
(deftest change-test
(is (= 0.0 (change 1 1)))
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment