Skip to content
Snippets Groups Projects
Commit 85fd05bb authored by Simon Belak's avatar Simon Belak
Browse files

First attempt

parent 9ea31ab8
Branches
Tags
No related merge requests found
......@@ -44,6 +44,9 @@
org.apache.httpcomponents/httpclient
net.sourceforge.nekohtml/nekohtml
ring/ring-core]]
[com.clearspring.analytics/stream "2.9.5" ; Various sketching algorithms
:exclusions [org.slf4j/slf4j-api
it.unimi.dsi/fastutil]]
[com.draines/postal "2.0.2"] ; SMTP library
[com.google.apis/google-api-services-analytics ; Google Analytics Java Client Library
"v3-rev154-1.23.0"]
......
(ns metabase.sync.analyze.fingerprint
"Analysis sub-step that takes a sample of values for a Field and saves a non-identifying fingerprint
used for classification. This fingerprint is saved as a column on the Field it belongs to."
(:require [clojure.set :as set]
(:require [clj-time
[coerce :as t.coerce]
[core :as t]]
[clojure.set :as set]
[clojure.tools.logging :as log]
[honeysql.helpers :as h]
[kixi.stats.core :as stats]
[medley.core :as m]
[metabase.models.field :refer [Field]]
[metabase.sync
[interface :as i]
......@@ -18,27 +23,102 @@
[metabase.util
[date :as du]
[schema :as su]]
[redux.core :as redux]
[schema.core :as s]
[toucan.db :as db]))
(s/defn ^:private type-specific-fingerprint :- (s/maybe i/TypeSpecificFingerprint)
  "Compute the part of the fingerprint that depends on FIELD's base type, using its FieldSample VALUES.
   Returns a single-entry map keyed by the matching base type (`:type/Text`, `:type/Number` or
   `:type/DateTime`, checked in that order with `isa?`), or `nil` when the base type is none of them."
  [field :- i/FieldInstance, values :- i/FieldSample]
  (let [base-type (:base_type field)]
    (cond
      (isa? base-type :type/Text)     {:type/Text (text/text-fingerprint values)}
      (isa? base-type :type/Number)   {:type/Number (number/number-fingerprint values)}
      (isa? base-type :type/DateTime) {:type/DateTime (datetime/datetime-fingerprint values)}
      :else                           nil)))
(s/defn ^:private fingerprint :- i/Fingerprint
  "Build the fingerprint for FIELD from its FieldSample VALUES. The global fingerprint goes under
   `:global` and the type-specific one under `:type`; a part that comes back falsey is left out."
  [field :- i/FieldInstance, values :- i/FieldSample]
  (let [global-part (global/global-fingerprint values)
        typed-part  (type-specific-fingerprint field values)]
    (merge
     (when global-part {:global global-part})
     (when typed-part {:type typed-part}))))
[toucan.db :as db])
(:import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus))
(defn- col-wise
  "Combine the reducing functions RFS into one reducing function that works on rows.
   The accumulator is a vector with one sub-accumulator per reducing function; each step hands the
   nth element of the incoming row to the nth reducing function. Because `mapv` stops at the
   shortest collection, a row shorter than RFS truncates the accumulator."
  [& rfs]
  (fn
    ;; init: ask every reducing function for its own starting accumulator
    ([] (mapv #(%) rfs))
    ;; completion: let every reducing function finish its own accumulator
    ([accs] (mapv #(%1 %2) rfs accs))
    ;; step: pair up reducing function, its accumulator and its column value
    ([accs row] (mapv #(%1 %2 %3) rfs accs row))))
(defn- monoid
  "Turn the function F into a reducing function seeded with INIT: called with no arguments it
   returns INIT, the completing call applies F to the accumulator alone, and the step call applies
   F to the accumulator and the next element."
  [f init]
  (fn rf
    ([]
     init)
    ([result]
     (f result))
    ([result input]
     (f result input))))
(defn- share
  "Reducing function that tracks how many of the elements seen satisfy PRED.
   Accumulates a map of `:match` and `:total` counts and completes to match / total, dividing by 1
   instead when no elements were seen (so an empty input yields 0)."
  [pred]
  (fn
    ([] {:match 0, :total 0})
    ([counts]
     (let [denominator (max (:total counts) 1)]
       (/ (:match counts) denominator)))
    ([counts e]
     (let [matches (:match counts)]
       {:match (if (pred e) (inc matches) matches)
        :total (inc (:total counts))}))))
(defn cardinality
  "Reducing function that sketches cardinality using HyperLogLog++: the accumulator is a
   `HyperLogLogPlus` instance (created as `(HyperLogLogPlus. 14 25)`), every element is offered to
   it, and completion returns the sketch's `.cardinality`.
   https://research.google.com/pubs/pub40671.html"
  ([] (HyperLogLogPlus. 14 25))
  ([^HyperLogLogPlus sketch] (.cardinality sketch))
  ([^HyperLogLogPlus sketch x]
   ;; the sketch is mutated in place and handed back as the accumulator
   (doto sketch
     (.offer x))))
;; Multimethod that, given a Field, returns the reducing function used to fingerprint that Field's
;; values. Dispatches on the Field's `:base_type`; private to this namespace.
(defmulti
^{:private true
:arglists '([field])}
fingerprinter :base_type)
;; Type-independent fingerprinter: runs `cardinality` over the values and reports its result
;; under the `:distinct-count` key.
(def ^:private global-fingerprinter
(redux/fuse {:distinct-count cardinality}))
;; Fallback for Fields whose `:base_type` has no dedicated `fingerprinter` method: only the global
;; fingerprint is computed. The result is nested under `:global` so it has the same top-level
;; shape as the fingerprints built by `with-global-fingerprinter` (which add a `:type` entry).
(defmethod fingerprinter :default
  [_]
  (redux/post-complete
   global-fingerprinter
   (fn [global-fingerprint]
     {:global global-fingerprint})))
(defn- with-global-fingerprinter
  "Run the type-specific FINGERPRINTER side by side with `global-fingerprinter` over the same
   values. On completion the two results are assembled into
   `{:global <global result>, :type {PREFIX <type-specific result>}}`."
  [prefix fingerprinter]
  (let [side-by-side (redux/juxt fingerprinter global-fingerprinter)
        assemble     (fn [results]
                       ;; results arrive in the order the fingerprinters were given to juxt
                       (let [[type-fingerprint global-fingerprint] results]
                         {:global global-fingerprint
                          :type   {prefix type-fingerprint}}))]
    (redux/post-complete side-by-side assemble)))
;; Defines the `fingerprinter` method for the base type TYPE. The method ignores its Field argument
;; and returns TRANSDUCER wrapped by `with-global-fingerprinter`, with TYPE used as the key under
;; `:type`. Despite the parameter name, the uses in this file pass reducing functions (e.g. a
;; `redux/fuse` result, or a transducer already applied to one), not bare transducers.
(defmacro ^:private deffingerprinter
[type transducer]
`(defmethod fingerprinter ~type
[_#]
(with-global-fingerprinter ~type ~transducer)))
;; DateTime fields: every value is passed through `du/str->date-time`, then `:earliest` and
;; `:latest` are tracked with `t/min-date` / `t/max-date` and both are turned into strings with
;; `str` on completion. The seeds are `(from-long Long/MAX_VALUE)` for `:earliest` and
;; `(from-long 0)` for `:latest`, so an empty sample completes to the stringified seeds.
;; NOTE(review): values are not nil-filtered here (unlike :type/Number) — confirm that
;; `du/str->date-time` and `t/min-date`/`t/max-date` tolerate nil.
;; NOTE(review): if every value is earlier than the `(from-long 0)` seed, `:latest` would stay at
;; the seed — confirm whether that is acceptable.
(deffingerprinter :type/DateTime
((map du/str->date-time)
(redux/post-complete
(redux/fuse {:earliest (monoid t/min-date (t.coerce/from-long Long/MAX_VALUE))
:latest (monoid t/max-date (t.coerce/from-long 0))})
(partial m/map-vals str))))
;; Number fields: nil values are removed first, then `:min`, `:max` and `:avg` (via `stats/mean`)
;; are computed together. `:min` and `:max` are seeded with positive and negative infinity, so a
;; sample with no non-nil values completes to those seeds.
(deffingerprinter :type/Number
((remove nil?)
(redux/fuse {:min (monoid min Double/POSITIVE_INFINITY)
:max (monoid max Double/NEGATIVE_INFINITY)
:avg stats/mean})))
;; Text fields: the share of values accepted by `text/valid-serialized-json?`, `u/url?` and
;; `u/email?` respectively, plus `stats/mean` over the length of each value's `str` form.
;; NOTE(review): nil values are not filtered out; `(str nil)` is "" so they count as length 0 and
;; also add to the total of every share — confirm this is intended.
(deffingerprinter :type/Text
(redux/fuse {:percent-json (share text/valid-serialized-json?)
:percent-url (share u/url?)
:percent-email (share u/email?)
:average-length ((map (comp count str)) stats/mean)}))
(s/defn ^:private save-fingerprint!
[field :- i/FieldInstance, fingerprint :- i/Fingerprint]
......@@ -61,18 +141,9 @@
(s/defn ^:private fingerprint-table!
[table :- i/TableInstance, fields :- [i/FieldInstance]]
(let [fields-to-sample (sample/sample-fields table fields)]
(reduce (fn [count-info [field sample]]
(if-not sample
(update count-info :no-data-fingerprints inc)
(let [result (sync-util/with-error-handling (format "Error generating fingerprint for %s"
(sync-util/name-for-logging field))
(save-fingerprint! field (fingerprint field sample)))]
(if (instance? Exception result)
(update count-info :failed-fingerprints inc)
(update count-info :updated-fingerprints inc)))))
(empty-stats-map (count fields-to-sample))
fields-to-sample)))
(transduce identity
(apply col-wise (map fingerprinter fields))
(sample/sample-fields table fields)))
;;; +----------------------------------------------------------------------------------------------------------------+
......
......@@ -9,13 +9,20 @@
[redux.core :as redux]
[schema.core :as s]))
(defn- monoid
  "Turn the function F into a reducing function seeded with INIT: called with no arguments it
   returns INIT, the completing call applies F to the accumulator alone, and the step call applies
   F to the accumulator and the next element."
  [f init]
  (fn rf
    ([]
     init)
    ([result]
     (f result))
    ([result input]
     (f result input))))
(s/defn datetime-fingerprint :- i/DateTimeFingerprint
  "Generate a fingerprint containing information about values that belong to a `DateTime` Field.
   Each value is parsed with `du/str->date-time`; the result maps `:earliest` and `:latest` to the
   `str` form of the smallest and largest date seen. The seeds `(from-long Long/MAX_VALUE)` and
   `(from-long 0)` are what an empty sample completes to."
  [values :- i/FieldSample]
  ;; `transduce` is given the transducer and the reducing function as separate arguments and no
  ;; explicit init: the seeds come from the zero-arity of each `monoid`.
  (transduce (map du/str->date-time)
             (redux/post-complete
              (redux/fuse {:earliest (monoid t/min-date (t.coerce/from-long Long/MAX_VALUE))
                           ;; `:latest` must fold with max-date, not min-date
                           :latest   (monoid t/max-date (t.coerce/from-long 0))})
              (partial m/map-vals str))
             values))
......@@ -3,30 +3,12 @@
used to generate fingerprints for those Fields. Currently this is dumb and just fetches a contiguous sequence of
rows, but in the future we plan to make this more sophisticated and have different types of samples for different
Fields, or do a better job getting a more-random sample of rows."
(:require [medley.core :as m]
[metabase.driver :as driver]
(:require [metabase.driver :as driver]
[metabase.sync.interface :as i]
[schema.core :as s]))
(s/defn ^:private basic-sample :- (s/maybe i/TableSample)
(s/defn sample-fields :- (s/maybe i/TableSample)
"Procure a sequence of table rows, up to `max-sample-rows` (10,000 at the time of this writing), for
use in the fingerprinting sub-stage of analysis. Returns `nil` if no rows are available."
use in the fingerprinting sub-stage of analysis."
[table :- i/TableInstance, fields :- [i/FieldInstance]]
(seq (driver/table-rows-sample table fields)))
(s/defn ^:private table-sample->field-sample :- (s/maybe i/FieldSample)
  "Pull the value at position I out of every row of TABLE-SAMPLE and discard the `nil`s.
   Returns `nil` instead of an empty sequence when no values are left."
  [table-sample :- i/TableSample, i :- s/Int]
  (let [column (map #(nth % i) table-sample)]
    (seq (remove nil? column))))
(s/defn sample-fields :- [(s/pair i/FieldInstance "Field", (s/maybe i/FieldSample) "FieldSample")]
  "Fetch samples for each of FIELDS. Returns a sequence of `[field sample]` tuples, where each sample
   holds the values found at that Field's position in the rows returned by `basic-sample`.
   Returns `nil` when `basic-sample` yields nothing."
  [table :- i/TableInstance, fields :- [i/FieldInstance]]
  (when-let [rows (basic-sample table fields)]
    (map-indexed (fn [position field]
                   [field (table-sample->field-sample rows position)])
                 fields)))
(driver/table-rows-sample table fields))
......@@ -22,7 +22,7 @@
(/ (double matching-count)
(double total-count))))
(defn- valid-serialized-json?
(defn valid-serialized-json?
"True if X is a serialized JSON dictionary or array."
[x]
(boolean
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment