Skip to content
Snippets Groups Projects
Unverified Commit d6b958c1 authored by Cam Saul's avatar Cam Saul Committed by GitHub
Browse files

[MLv2] Basic QP support for MLv2 queries (#29635)

* [MLv2] Basic QP support for MLv2 queries

* Fix data/id init on launch

* Fix Kondo errors

* Add missing docstrings

* Seriously Kondo!
parent a5ad9452
No related branches found
No related tags found
No related merge requests found
Showing
with 371 additions and 33 deletions
......@@ -489,6 +489,8 @@
metabase.query-processor.error-type/deferror clojure.core/def
metabase.query-processor.middleware.cache.impl/with-reducible-deserialized-results clojure.core/let
metabase.query-processor.middleware.process-userland-query-test/with-query-execution clojure.core/let
metabase.query-processor-test.pipeline-queries-test/pmbql-query clojure.core/->
metabase.query-processor-test.pipeline-queries-test/run-pmbql-query clojure.core/->
metabase.shared.util.namespaces/import-fns potemkin/import-vars
metabase.sync.util/sum-for clojure.core/for
metabase.sync.util/with-emoji-progress-bar clojure.core/let
......
......@@ -163,3 +163,13 @@
query-type inner-query})]
#?(:cljs (js/console.log "->legacy-MBQL on query" query result))
result))
;;; placeholder, feel free to delete @braden.
(defmethod ->legacy-MBQL :count
[[_tag opts field]]
(let [clause (if field
[:count (->legacy-MBQL field)]
[:count])]
(if-let [aggregation-options-opts (not-empty (select-keys opts [:name :display-name]))]
[:aggregation-options clause aggregation-options-opts]
clause)))
......@@ -15,6 +15,7 @@
[metabase.lib.metadata.calculation :as lib.metadata.calculation]
[metabase.lib.metric :as lib.metric]
[metabase.lib.native :as lib.native]
[metabase.lib.normalize :as lib.normalize]
[metabase.lib.order-by :as lib.order-by]
[metabase.lib.query :as lib.query]
[metabase.lib.segment :as lib.segment]
......@@ -34,6 +35,7 @@
lib.metadata.calculation/keep-me
lib.metric/keep-me
lib.native/keep-me
lib.normalize/keep-me
lib.order-by/keep-me
lib.query/keep-me
lib.segment/keep-me
......@@ -60,7 +62,9 @@
breakout]
[lib.dev
field
query-for-table-name]
query-for-table-id
query-for-table-name
table]
[lib.expression
expression
+
......@@ -103,6 +107,8 @@
rtrim
upper
lower]
[lib.field
fields]
[lib.filter
filter
add-filter
......@@ -148,6 +154,8 @@
order-by-clause
order-bys
orderable-columns]
[lib.normalize
normalize]
[lib.query
native-query
query
......
......@@ -47,3 +47,16 @@
table-name :- ::lib.schema.common/non-blank-string]
(let [table-metadata (lib.metadata/table metadata-provider schema-name table-name)]
(lib.query/query metadata-provider table-metadata))))
(mu/defn query-for-table-id :- ::lib.schema/query
"Create a new query for a specific Table with `table-id`."
[metadata-provider :- lib.metadata/MetadataProvider
table-id :- ::lib.schema.id/table]
(let [table-metadata (lib.metadata/table metadata-provider table-id)]
(lib.query/query metadata-provider table-metadata)))
(mu/defn table :- fn?
"Returns a function that can be resolved to Table metadata. For use with a [[lib/join]] or something like that."
([id :- ::lib.schema.id/table]
(fn [query _stage-number]
(lib.metadata/table query id))))
......@@ -5,6 +5,7 @@
[metabase.lib.join :as lib.join]
[metabase.lib.metadata :as lib.metadata]
[metabase.lib.metadata.calculation :as lib.metadata.calculation]
[metabase.lib.normalize :as lib.normalize]
[metabase.lib.options :as lib.options]
[metabase.lib.schema :as lib.schema]
[metabase.lib.schema.common :as lib.schema.common]
......@@ -18,27 +19,44 @@
(comment metabase.lib.schema.ref/keep-me)
(defn- normalize-binning-options [opts]
(lib.normalize/normalize-map
opts
keyword
{:strategy keyword}))
(defn- normalize-field-options [opts]
(lib.normalize/normalize-map
opts
keyword
{:temporal-unit keyword
:binning normalize-binning-options}))
(defmethod lib.normalize/normalize :field
[[tag opts id-or-name]]
[(keyword tag) (normalize-field-options opts) id-or-name])
(mu/defn ^:private resolve-field-id :- lib.metadata/ColumnMetadata
"Integer Field ID: get metadata from the metadata provider. This is probably not 100% the correct thing to do if
this isn't the first stage of the query, but we can fix that behavior in a follow-on"
[query :- ::lib.schema/query
_stage-number :- :int
field-id :- ::lib.schema.id/field]
[query :- ::lib.schema/query
_stage-number :- :int
field-id :- ::lib.schema.id/field]
(lib.metadata/field query field-id))
(mu/defn ^:private resolve-field-name :- lib.metadata/ColumnMetadata
"String column name: get metadata from the previous stage, if it exists, otherwise if this is the first stage and we
have a native query or a Saved Question source query or whatever get it from our results metadata."
[query :- ::lib.schema/query
stage-number :- :int
column-name :- ::lib.schema.common/non-blank-string]
[query :- ::lib.schema/query
stage-number :- :int
column-name :- ::lib.schema.common/non-blank-string]
(or (some (fn [column]
(when (= (:name column) column-name)
column))
(if-let [previous-stage-number (lib.util/previous-stage-number query stage-number)]
(let [previous-stage (lib.util/query-stage query previous-stage-number)]
(:lib/stage-metadata previous-stage))
(get-in (lib.util/query-stage query stage-number) [:lib/stage-metadata :columns])))
(let [stage (if-let [previous-stage-number (lib.util/previous-stage-number query stage-number)]
(lib.util/query-stage query previous-stage-number)
(lib.util/query-stage query stage-number))]
(get-in stage [:lib/stage-metadata :columns])))
(throw (ex-info (i18n/tru "Invalid :field clause: column {0} does not exist" (pr-str column-name))
{:name column-name
:query query
......@@ -67,6 +85,7 @@
[_query _stage-number field-metadata]
field-metadata)
;;; TODO -- base type should be affected by `temporal-unit`, right?
(defmethod lib.metadata.calculation/metadata :field
[query stage-number [_tag {:keys [base-type temporal-unit], :as opts} :as field-ref]]
(let [field-metadata (resolve-field-metadata query stage-number field-ref)
......@@ -131,13 +150,13 @@
true lib.options/ensure-uuid))
(defmethod ->field :dispatch-type/integer
[query _stage field-id]
[query _stage-number field-id]
(lib.metadata/field query field-id))
;;; Pass in a function that takes `query` and `stage` to support ad-hoc usage in tests etc
;;; Pass in a function that takes `query` and `stage-number` to support ad-hoc usage in tests etc
(defmethod ->field :dispatch-type/fn
[query stage f]
(f query stage))
[query stage-number f]
(f query stage-number))
(defmethod lib.temporal-bucket/temporal-bucket* :field
[[_field options id-or-name] unit]
......@@ -153,3 +172,17 @@
(defmethod lib.join/with-join-alias-method :field
[field-ref join-alias]
(lib.options/update-options field-ref assoc :join-alias join-alias))
(defn fields
"Specify the `:fields` for a query."
([xs]
(fn [query stage-number]
(fields query stage-number xs)))
([query xs]
(fields query -1 xs))
([query stage-number xs]
(let [xs (mapv #(->field query stage-number %)
xs)]
(lib.util/update-query-stage query stage-number assoc :fields xs))))
......@@ -60,7 +60,7 @@
#?@(:clj
[pretty.core/PrettyPrintable
(pretty [_this]
(list `->CachedProxyMetadataProvider cache metadata-provider))]))
(list `cached-metadata-provider metadata-provider))]))
(defn cached-metadata-provider
"Wrap `metadata-provider` with an implementation that automatically caches results.
......
......@@ -38,7 +38,7 @@
`lib.metadata.protocols/database
`UncachedApplicationDatabaseMetadataProvider)
{})))
(fetch-instance :metadata.models.database/Database database-id))
(fetch-instance :metadata/database database-id))
(table [_this table-id] (fetch-instance :metadata/table table-id))
(field [_this field-id] (fetch-instance :metadata/field field-id))
......@@ -53,15 +53,13 @@
`UncachedApplicationDatabaseMetadataProvider)
{})))
(log/debugf "Fetching all Tables for Database %d" database-id)
(into []
(map #(assoc % :lib/type :metadata/table))
(t2/reducible-select :metabase.models.table/Table :db_id database-id)))
(mapv #(assoc % :lib/type :metadata/table)
(t2/select :metabase.models.table/Table :db_id database-id)))
(fields [_this table-id]
(log/debugf "Fetching all Fields for Table %d" table-id)
(into []
(map #(assoc % :lib/type :metadata/field))
(t2/reducible-select :table_id table-id)))
(mapv #(assoc % :lib/type :metadata/field)
(t2/select :table_id table-id)))
lib.metadata.protocols/BulkMetadataProvider
(bulk-metadata [_this metadata-type ids]
......
(ns metabase.lib.normalize
(:require
[metabase.lib.dispatch :as lib.dispatch]))
(defn- mbql-clause-type [x]
(when (and (vector? x)
((some-fn keyword? string?) (first x)))
(keyword (first x))))
(defn- map-type [m]
(when (map? m)
(some-> (or
(:lib/type m)
(get m "lib/type"))
keyword)))
(defn- dispatch-value [x]
(or
(mbql-clause-type x)
(map-type x)
(keyword (lib.dispatch/dispatch-value x))))
(defmulti normalize
"Ensure some part of an MBQL query `x`, e.g. a clause or map, is in the right shape after coming in from JavaScript or
deserialized JSON (from the app DB or a REST API request). This is intended for things that are already in a
generally correct pMBQL; to 'normalize' things from legacy MBQL, use [[metabase.lib.convert]].
The default implementation will keywordize keys for maps, and convert some known keys
using [[default-map-value-fns]]; for MBQL clauses, it will convert the clause name to a keyword and recursively
normalize its options and arguments. Implement this method if you need custom behavior for something."
{:arglists '([x])}
dispatch-value)
(def default-map-value-fns
"Default normalization functions keys when doing map normalization."
{:base-type keyword
:type keyword
:lib/type keyword
:lib/options normalize})
(defn normalize-map
"[[normalize]] a map using `key-fn` (default [[clojure.core/keyword]]) for keys and
`value-fns` (default [[default-map-value-fns]]; additional functions are merged into this map).
This is the default implementation for maps. Custom map implementations can call this with a different `key-fn` or
additional `value-fns` as needed."
([m]
(normalize-map m keyword))
([m key-fn]
(normalize-map m key-fn nil))
([m key-fn value-fns]
(let [value-fns (merge default-map-value-fns value-fns)]
(into {}
(map (fn [[k v]]
(let [k (key-fn k)]
[k
(if-let [f (get value-fns k)]
(f v)
v)])))
m))))
(defmethod normalize :dispatch-type/map
[m]
(normalize-map m))
(defn- default-normalize-mbql-clause [[tag opts & args]]
(into [(keyword tag) (normalize opts)]
(map normalize)
args))
(defmethod normalize :default
[x]
(cond
(mbql-clause-type x) (default-normalize-mbql-clause x)
(map-type x) (normalize-map x)
:else x))
......@@ -4,12 +4,21 @@
[metabase.lib.dispatch :as lib.dispatch]
[metabase.lib.metadata :as lib.metadata]
[metabase.lib.metadata.calculation :as lib.metadata.calculation]
[metabase.lib.normalize :as lib.normalize]
[metabase.lib.options :as lib.options]
[metabase.lib.schema :as lib.schema]
[metabase.lib.schema.id :as lib.schema.id]
[metabase.lib.util :as lib.util]
[metabase.util.malli :as mu]))
(defmethod lib.normalize/normalize :mbql/query
[query]
(lib.normalize/normalize-map
query
keyword
{:type keyword
:stages (partial mapv lib.normalize/normalize)}))
(defmethod lib.metadata.calculation/metadata :mbql/query
[query stage-number x]
(lib.metadata.calculation/metadata query stage-number (lib.util/query-stage x stage-number)))
......
......@@ -8,6 +8,7 @@
[metabase.lib.expression :as lib.expression]
[metabase.lib.metadata :as lib.metadata]
[metabase.lib.metadata.calculation :as lib.metadata.calculation]
[metabase.lib.normalize :as lib.normalize]
[metabase.lib.options :as lib.options]
[metabase.lib.schema :as lib.schema]
[metabase.lib.schema.common :as lib.schema.common]
......@@ -21,6 +22,14 @@
(declare stage-metadata)
(defmethod lib.normalize/normalize :mbql.stage/mbql
[stage]
(lib.normalize/normalize-map
stage
keyword
{:aggregation (partial mapv lib.normalize/normalize)
:filter lib.normalize/normalize}))
(mu/defn ^:private fields-columns :- [:maybe [:sequential lib.metadata/ColumnMetadata]]
[query :- ::lib.schema/query
stage-number :- :int]
......
......@@ -86,6 +86,14 @@
(let [previous-stages (if source-query
(inner-query->stages source-query)
[])
source-metadata (when source-metadata
(-> (if (vector? source-metadata)
{:columns source-metadata}
source-metadata)
(update :columns (fn [columns]
(for [column columns]
(assoc column :lib/type :metadata/field))))
(assoc :lib/type :metadata/results)))
previous-stages (cond-> previous-stages
source-metadata (assoc-in [(dec (count previous-stages)) :lib/stage-metadata] source-metadata))
stage-type (if (:native inner-query)
......@@ -156,6 +164,12 @@
(let [{:keys [stages]} (pipeline query)]
(get (vec stages) (non-negative-stage-index stages stage-number))))
(mu/defn previous-stage :- [:maybe ::lib.schema/stage]
"Return the previous stage of the query, if there is one; otherwise return `nil`."
[query stage-number :- :int]
(when-let [stage-num (previous-stage-number query stage-number)]
(query-stage query stage-num)))
(mu/defn update-query-stage :- ::lib.schema/query
"Update a specific `stage-number` of a `query` by doing
......@@ -198,3 +212,43 @@
","
conjunction
(last coll)))))))
(defn update-stages-ignore-joins
"Like [[update-stages]], but does not recurse into the stages inside joins.
`f` has the signature
(f query stage-number stage)"
[query f]
(reduce
(fn [query stage-number]
(update-in query [:stages stage-number] (fn [stage]
(f query stage-number stage))))
query
(range 0 (count (:stages query)))))
(defn update-stages
"Apply function `f` to every stage of a query, depth-first. Also applied to all query stages.
`f` has the signature
(f query stage-number stage)
`query` reflects the results of the previous call to `f`.
As a convenience, if `f` returns nil, the original stage will be used without changes."
[query f]
(letfn [(update-join [join]
(-> query
(assoc :stages (:stages join))
(update-stages f)
:stages))
(update-joins [joins]
(mapv update-join joins))]
(update-stages-ignore-joins
query
(fn [query stage-number stage]
(let [stage (cond-> stage
(:joins stage)
update-joins)]
(f query stage-number stage))))))
(ns metabase.query-processor.middleware.normalize-query
"Middleware that converts a query into a normalized, canonical form."
(:require
[metabase.lib.convert :as lib.convert]
[metabase.lib.core :as lib]
[metabase.mbql.normalize :as mbql.normalize]
[metabase.query-processor.error-type :as qp.error-type]
[metabase.util :as u]
......@@ -8,18 +10,27 @@
(set! *warn-on-reflection* true)
(defn- normalize* [query]
(try
(let [query-type (keyword ((some-fn :type #(get % "type")) query))
normalized (case query-type
:pipeline
(lib.convert/->legacy-MBQL (lib/normalize query))
(:query :native)
(mbql.normalize/normalize query))]
(log/tracef "Normalized query:\n%s\n=>\n%s" (u/pprint-to-str query) (u/pprint-to-str normalized))
normalized)
(catch Throwable e
(throw (ex-info (.getMessage e)
{:type qp.error-type/qp
:query query}
e)))))
(defn normalize
"Middleware that converts a query into a normalized, canonical form, including things like converting all identifiers
into standard `lisp-case` ones, removing/rewriting legacy clauses, removing empty ones, etc. This is done to
simplifiy the logic in the QP steps following this."
[qp]
(fn [query rff context]
(let [query' (try
(u/prog1 (mbql.normalize/normalize query)
(log/tracef "Normalized query:\n%s" (u/pprint-to-str <>)))
(catch Throwable e
(throw (ex-info (.getMessage e)
{:type qp.error-type/invalid-query
:query query}
e))))]
(qp query' rff context))))
(qp (normalize* query) rff context)))
(ns metabase.lib.normalize-test
(:require
[clojure.test :refer [are deftest is testing]]
[metabase.lib.core :as lib]))
(deftest ^:parallel normalize-query-type-test
(testing "Query type should get normalized"
(is (= {:type :native}
(lib/normalize {:type "native"})))))
(deftest ^:parallel do-not-normalize-native-queries-test
(testing "native queries should NOT get normalized"
(are [x expected] (= expected
(lib/normalize x))
{"lib/type" "mbql/query"
"stages" [{"lib/type" "mbql.stage/native"
"native" "SELECT COUNT(*) FROM CANS;"}]}
{:lib/type :mbql/query
:stages [{:lib/type :mbql.stage/native
:native "SELECT COUNT(*) FROM CANS;"}]}
{:lib/type :mbql/query
:stages [{"lib/type" "mbql.stage/native"
"native" {:NAME "FAKE_QUERY"
"description" "Theoretical fake query in a JSON-based query lang"}}]}
{:lib/type :mbql/query
:stages [{:lib/type :mbql.stage/native
:native {:NAME "FAKE_QUERY"
"description" "Theoretical fake query in a JSON-based query lang"}}]})))
(deftest ^:parallel normalize-value-test
(testing ":value clauses should keep snake_case keys in the type info arg"
;; See https://github.com/metabase/metabase/issues/23354 for details
(is (= [:value {:some_key "some key value"} "some value"]
(lib/normalize [:value {:some_key "some key value"} "some value"])))))
(deftest ^:parallel e2e-test
(is (= {:lib/type :mbql/query
:database 1
:type :pipeline
:stages [{:lib/type :mbql.stage/mbql
:lib/options {:lib/uuid "a8f41095-1e37-4da6-a4e5-51a9c2c3f523"}
:source-table 1
:aggregation [[:count {:lib/uuid "a6685a7d-62b3-4ceb-a13f-f9db405dcb49"}]]
:filter [:=
{:lib/uuid "c4984ada-f8fe-4ac2-b6b4-45885527f5b4"}
[:field
{:base-type :type/Integer
:lib/uuid "5a84d551-ea5f-44f4-952f-2162f05cdcc4"}
1]
4]}]}
(lib/normalize
{"lib/type" "mbql/query"
"database" 1
"type" "pipeline"
"stages" [{"lib/type" "mbql.stage/mbql"
"lib/options" {"lib/uuid" "a8f41095-1e37-4da6-a4e5-51a9c2c3f523"}
"source-table" 1
"aggregation" [["count" {"lib/uuid" "a6685a7d-62b3-4ceb-a13f-f9db405dcb49"}]]
"filter" ["="
{"lib/uuid" "c4984ada-f8fe-4ac2-b6b4-45885527f5b4"}
["field"
{"base-type" "type/Integer"
"lib/uuid" "5a84d551-ea5f-44f4-952f-2162f05cdcc4"}
1]
4]}]}))))
......@@ -90,7 +90,8 @@
:type :pipeline
:stages [{:lib/type :mbql.stage/mbql
:source-table (meta/id :venues)
:lib/stage-metadata [(meta/field-metadata :venues :id)]}
:lib/stage-metadata {:lib/type :metadata/results
:columns [(meta/field-metadata :venues :id)]}}
{:lib/type :mbql.stage/mbql}]}
(lib.util/pipeline
{:database (meta/id)
......
(ns metabase.query-processor-test.pipeline-queries-test
(:require
[clojure.test :refer :all]
[metabase.lib.core :as lib]
[metabase.lib.metadata.jvm :as lib.metadata.jvm]
[metabase.query-processor :as qp]
[metabase.test :as mt]
[metabase.util :as u]))
(defn- metadata-provider []
(lib.metadata.jvm/application-database-metadata-provider (mt/id)))
;;; this stuff is mostly so we can get a sense of what using MLv2 in tests will ultimately look like
(defmacro ^:private pmbql-query
{:style/indent 1}
[table-name & body]
`(-> (lib/query-for-table-id (metadata-provider) (mt/id ~(keyword table-name)))
~@body))
(defmacro ^:private run-pmbql-query
{:style/indent 1}
[table-name & body]
`(qp/process-query (pmbql-query ~table-name ~@body)))
(defn- $price [query stage-number]
((lib/field (mt/id :venues :price)) query stage-number))
(deftest ^:parallel pipeline-queries-test
(testing "Ensure that the QP can handle pMBQL `:pipeline` queries"
(is (= [6]
(mt/first-row
(run-pmbql-query :venues
(lib/aggregate (lib/count))
(lib/filter (lib/= $price 4))))))))
(deftest ^:parallel denormalized-pipeline-queries-test
(testing "Ensure that the QP can handle pMBQL `:pipeline` queries as they'd come in from the REST API or application database"
(let [query (-> (pmbql-query :venues
(lib/aggregate (lib/count))
(lib/filter (lib/= $price 4)))
(dissoc :lib/metadata)
mt/obj->json->obj)]
(testing (format "Query =\n%s" (u/pprint-to-str query))
(is (= [6]
(mt/first-row (qp/process-query query))))))))
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment