From d6b958c1fd27968ad8bd775f7413ba06a7ad3d11 Mon Sep 17 00:00:00 2001
From: Cam Saul <1455846+camsaul@users.noreply.github.com>
Date: Wed, 29 Mar 2023 20:17:10 -0700
Subject: [PATCH] [MLv2] Basic QP support for MLv2 queries (#29635)

* [MLv2] Basic QP support for MLv2 queries

* Fix data/id init on launch

* Fix Kondo errors

* Add missing docstrings

* Seriously Kondo!
---
 .clj-kondo/config.edn                         |  2 +
 src/metabase/lib/convert.cljc                 | 10 +++
 src/metabase/lib/core.cljc                    | 10 ++-
 src/metabase/lib/dev.cljc                     | 13 ++++
 src/metabase/lib/field.cljc                   | 61 +++++++++++----
 .../lib/metadata/cached_provider.cljc         |  2 +-
 src/metabase/lib/metadata/jvm.clj             | 12 ++-
 src/metabase/lib/normalize.cljc               | 78 +++++++++++++++++++
 src/metabase/lib/query.cljc                   |  9 +++
 src/metabase/lib/stage.cljc                   |  9 +++
 src/metabase/lib/util.cljc                    | 54 +++++++++++++
 .../middleware/normalize_query.clj            | 29 ++++---
 test/metabase/lib/normalize_test.cljc         | 66 ++++++++++++++++
 test/metabase/lib/util_test.cljc              |  3 +-
 .../pipeline_queries_test.clj                 | 46 +++++++++++
 15 files changed, 371 insertions(+), 33 deletions(-)
 create mode 100644 src/metabase/lib/normalize.cljc
 create mode 100644 test/metabase/lib/normalize_test.cljc
 create mode 100644 test/metabase/query_processor_test/pipeline_queries_test.clj

diff --git a/.clj-kondo/config.edn b/.clj-kondo/config.edn
index f72aa65e406..526d273b2f6 100644
--- a/.clj-kondo/config.edn
+++ b/.clj-kondo/config.edn
@@ -489,6 +489,8 @@
   metabase.query-processor.error-type/deferror                                         clojure.core/def
   metabase.query-processor.middleware.cache.impl/with-reducible-deserialized-results   clojure.core/let
   metabase.query-processor.middleware.process-userland-query-test/with-query-execution clojure.core/let
+  metabase.query-processor-test.pipeline-queries-test/pmbql-query                      clojure.core/->
+  metabase.query-processor-test.pipeline-queries-test/run-pmbql-query                  clojure.core/->
   metabase.shared.util.namespaces/import-fns                                           potemkin/import-vars
   metabase.sync.util/sum-for                                                           clojure.core/for
   metabase.sync.util/with-emoji-progress-bar                                           clojure.core/let
diff --git a/src/metabase/lib/convert.cljc b/src/metabase/lib/convert.cljc
index 38c83573fb9..064f867450b 100644
--- a/src/metabase/lib/convert.cljc
+++ b/src/metabase/lib/convert.cljc
@@ -163,3 +163,13 @@
                 query-type inner-query})]
     #?(:cljs (js/console.log "->legacy-MBQL on query" query result))
     result))
+
+;;; placeholder, feel free to delete @braden.
+(defmethod ->legacy-MBQL :count
+  [[_tag opts field]]
+  (let [clause (if field
+                 [:count (->legacy-MBQL field)]
+                 [:count])]
+    (if-let [aggregation-options-opts (not-empty (select-keys opts [:name :display-name]))]
+      [:aggregation-options clause aggregation-options-opts]
+      clause)))
diff --git a/src/metabase/lib/core.cljc b/src/metabase/lib/core.cljc
index 03f01df2858..f7c1e5a73d7 100644
--- a/src/metabase/lib/core.cljc
+++ b/src/metabase/lib/core.cljc
@@ -15,6 +15,7 @@
    [metabase.lib.metadata.calculation :as lib.metadata.calculation]
    [metabase.lib.metric :as lib.metric]
    [metabase.lib.native :as lib.native]
+   [metabase.lib.normalize :as lib.normalize]
    [metabase.lib.order-by :as lib.order-by]
    [metabase.lib.query :as lib.query]
    [metabase.lib.segment :as lib.segment]
@@ -34,6 +35,7 @@
          lib.metadata.calculation/keep-me
          lib.metric/keep-me
          lib.native/keep-me
+         lib.normalize/keep-me
          lib.order-by/keep-me
          lib.query/keep-me
          lib.segment/keep-me
@@ -60,7 +62,9 @@
    breakout]
   [lib.dev
    field
-   query-for-table-name]
+   query-for-table-id
+   query-for-table-name
+   table]
   [lib.expression
    expression
    +
@@ -103,6 +107,8 @@
    rtrim
    upper
    lower]
+  [lib.field
+   fields]
   [lib.filter
    filter
    add-filter
@@ -148,6 +154,8 @@
    order-by-clause
    order-bys
    orderable-columns]
+  [lib.normalize
+   normalize]
   [lib.query
    native-query
    query
diff --git a/src/metabase/lib/dev.cljc b/src/metabase/lib/dev.cljc
index b09ef2a5e75..42b3a72a297 100644
--- a/src/metabase/lib/dev.cljc
+++ b/src/metabase/lib/dev.cljc
@@ -47,3 +47,16 @@
     table-name        :- ::lib.schema.common/non-blank-string]
    (let [table-metadata (lib.metadata/table metadata-provider schema-name table-name)]
      (lib.query/query metadata-provider table-metadata))))
+
+(mu/defn query-for-table-id :- ::lib.schema/query
+  "Create a new query for a specific Table with `table-id`."
+  [metadata-provider :- lib.metadata/MetadataProvider
+   table-id          :- ::lib.schema.id/table]
+  (let [table-metadata (lib.metadata/table metadata-provider table-id)]
+    (lib.query/query metadata-provider table-metadata)))
+
+(mu/defn table :- fn?
+  "Returns a function that can be resolved to Table metadata. For use with a [[lib/join]] or something like that."
+  ([id :- ::lib.schema.id/table]
+   (fn [query _stage-number]
+     (lib.metadata/table query id))))
diff --git a/src/metabase/lib/field.cljc b/src/metabase/lib/field.cljc
index f7849ce5554..a6acae705c3 100644
--- a/src/metabase/lib/field.cljc
+++ b/src/metabase/lib/field.cljc
@@ -5,6 +5,7 @@
    [metabase.lib.join :as lib.join]
    [metabase.lib.metadata :as lib.metadata]
    [metabase.lib.metadata.calculation :as lib.metadata.calculation]
+   [metabase.lib.normalize :as lib.normalize]
    [metabase.lib.options :as lib.options]
    [metabase.lib.schema :as lib.schema]
    [metabase.lib.schema.common :as lib.schema.common]
@@ -18,27 +19,44 @@
 
 (comment metabase.lib.schema.ref/keep-me)
 
+(defn- normalize-binning-options [opts]
+  (lib.normalize/normalize-map
+   opts
+   keyword
+   {:strategy keyword}))
+
+(defn- normalize-field-options [opts]
+  (lib.normalize/normalize-map
+   opts
+   keyword
+   {:temporal-unit keyword
+    :binning       normalize-binning-options}))
+
+(defmethod lib.normalize/normalize :field
+  [[tag opts id-or-name]]
+  [(keyword tag) (normalize-field-options opts) id-or-name])
+
 (mu/defn ^:private resolve-field-id :- lib.metadata/ColumnMetadata
   "Integer Field ID: get metadata from the metadata provider. This is probably not 100% the correct thing to do if
   this isn't the first stage of the query, but we can fix that behavior in a follow-on"
-  [query          :- ::lib.schema/query
-   _stage-number  :- :int
-   field-id       :- ::lib.schema.id/field]
+  [query         :- ::lib.schema/query
+   _stage-number :- :int
+   field-id      :- ::lib.schema.id/field]
   (lib.metadata/field query field-id))
 
 (mu/defn ^:private resolve-field-name :- lib.metadata/ColumnMetadata
   "String column name: get metadata from the previous stage, if it exists, otherwise if this is the first stage and we
   have a native query or a Saved Question source query or whatever get it from our results metadata."
-  [query         :- ::lib.schema/query
-   stage-number  :- :int
-   column-name   :- ::lib.schema.common/non-blank-string]
+  [query        :- ::lib.schema/query
+   stage-number :- :int
+   column-name  :- ::lib.schema.common/non-blank-string]
   (or (some (fn [column]
               (when (= (:name column) column-name)
                 column))
-            (if-let [previous-stage-number (lib.util/previous-stage-number query stage-number)]
-              (let [previous-stage (lib.util/query-stage query previous-stage-number)]
-                (:lib/stage-metadata previous-stage))
-              (get-in (lib.util/query-stage query stage-number) [:lib/stage-metadata :columns])))
+            (let [stage (if-let [previous-stage-number (lib.util/previous-stage-number query stage-number)]
+                          (lib.util/query-stage query previous-stage-number)
+                          (lib.util/query-stage query stage-number))]
+              (get-in stage [:lib/stage-metadata :columns])))
       (throw (ex-info (i18n/tru "Invalid :field clause: column {0} does not exist" (pr-str column-name))
                       {:name         column-name
                        :query        query
@@ -67,6 +85,7 @@
   [_query _stage-number field-metadata]
   field-metadata)
 
+;;; TODO -- base type should be affected by `temporal-unit`, right?
 (defmethod lib.metadata.calculation/metadata :field
   [query stage-number [_tag {:keys [base-type temporal-unit], :as opts} :as field-ref]]
   (let [field-metadata (resolve-field-metadata query stage-number field-ref)
@@ -131,13 +150,13 @@
     true      lib.options/ensure-uuid))
 
 (defmethod ->field :dispatch-type/integer
-  [query _stage field-id]
+  [query _stage-number field-id]
   (lib.metadata/field query field-id))
 
-;;; Pass in a function that takes `query` and `stage` to support ad-hoc usage in tests etc
+;;; Pass in a function that takes `query` and `stage-number` to support ad-hoc usage in tests etc
 (defmethod ->field :dispatch-type/fn
-  [query stage f]
-  (f query stage))
+  [query stage-number f]
+  (f query stage-number))
 
 (defmethod lib.temporal-bucket/temporal-bucket* :field
   [[_field options id-or-name] unit]
@@ -153,3 +172,17 @@
 (defmethod lib.join/with-join-alias-method :field
   [field-ref join-alias]
   (lib.options/update-options field-ref assoc :join-alias join-alias))
+
+(defn fields
+  "Specify the `:fields` for a query."
+  ([xs]
+   (fn [query stage-number]
+     (fields query stage-number xs)))
+
+  ([query xs]
+   (fields query -1 xs))
+
+  ([query stage-number xs]
+   (let [xs (mapv #(->field query stage-number %)
+                  xs)]
+     (lib.util/update-query-stage query stage-number assoc :fields xs))))
diff --git a/src/metabase/lib/metadata/cached_provider.cljc b/src/metabase/lib/metadata/cached_provider.cljc
index 2995280baad..2d62bc896ce 100644
--- a/src/metabase/lib/metadata/cached_provider.cljc
+++ b/src/metabase/lib/metadata/cached_provider.cljc
@@ -60,7 +60,7 @@
   #?@(:clj
       [pretty.core/PrettyPrintable
        (pretty [_this]
-               (list `->CachedProxyMetadataProvider cache metadata-provider))]))
+               (list `cached-metadata-provider metadata-provider))]))
 
 (defn cached-metadata-provider
   "Wrap `metadata-provider` with an implementation that automatically caches results.
diff --git a/src/metabase/lib/metadata/jvm.clj b/src/metabase/lib/metadata/jvm.clj
index 0f43e9bd0a4..ee8b75c05c7 100644
--- a/src/metabase/lib/metadata/jvm.clj
+++ b/src/metabase/lib/metadata/jvm.clj
@@ -38,7 +38,7 @@
                               `lib.metadata.protocols/database
                               `UncachedApplicationDatabaseMetadataProvider)
                       {})))
-    (fetch-instance :metadata.models.database/Database database-id))
+    (fetch-instance :metadata/database database-id))
 
   (table   [_this table-id]   (fetch-instance :metadata/table   table-id))
   (field   [_this field-id]   (fetch-instance :metadata/field   field-id))
@@ -53,15 +53,13 @@
                               `UncachedApplicationDatabaseMetadataProvider)
                       {})))
     (log/debugf "Fetching all Tables for Database %d" database-id)
-    (into []
-          (map #(assoc % :lib/type :metadata/table))
-          (t2/reducible-select :metabase.models.table/Table :db_id database-id)))
+    (mapv #(assoc % :lib/type :metadata/table)
+          (t2/select :metabase.models.table/Table :db_id database-id)))
 
   (fields [_this table-id]
     (log/debugf "Fetching all Fields for Table %d" table-id)
-    (into []
-          (map #(assoc % :lib/type :metadata/field))
-          (t2/reducible-select :table_id table-id)))
+    (mapv #(assoc % :lib/type :metadata/field)
+          (t2/select :table_id table-id)))
 
   lib.metadata.protocols/BulkMetadataProvider
   (bulk-metadata [_this metadata-type ids]
diff --git a/src/metabase/lib/normalize.cljc b/src/metabase/lib/normalize.cljc
new file mode 100644
index 00000000000..73af5074942
--- /dev/null
+++ b/src/metabase/lib/normalize.cljc
@@ -0,0 +1,78 @@
+(ns metabase.lib.normalize
+  (:require
+   [metabase.lib.dispatch :as lib.dispatch]))
+
+(defn- mbql-clause-type [x]
+  (when (and (vector? x)
+             ((some-fn keyword? string?) (first x)))
+    (keyword (first x))))
+
+(defn- map-type [m]
+  (when (map? m)
+    (some-> (or
+             (:lib/type m)
+             (get m "lib/type"))
+            keyword)))
+
+(defn- dispatch-value [x]
+  (or
+   (mbql-clause-type x)
+   (map-type x)
+   (keyword (lib.dispatch/dispatch-value x))))
+
+(defmulti normalize
+  "Ensure some part of an MBQL query `x`, e.g. a clause or map, is in the right shape after coming in from JavaScript or
+  deserialized JSON (from the app DB or a REST API request). This is intended for things that are already in a
+  generally correct pMBQL; to 'normalize' things from legacy MBQL, use [[metabase.lib.convert]].
+
+  The default implementation will keywordize keys for maps, and convert some known keys
+  using [[default-map-value-fns]]; for MBQL clauses, it will convert the clause name to a keyword and recursively
+  normalize its options and arguments. Implement this method if you need custom behavior for something."
+  {:arglists '([x])}
+  dispatch-value)
+
+(def default-map-value-fns
+  "Default normalization functions keys when doing map normalization."
+  {:base-type   keyword
+   :type        keyword
+   :lib/type    keyword
+   :lib/options normalize})
+
+(defn normalize-map
+  "[[normalize]] a map using `key-fn` (default [[clojure.core/keyword]]) for keys and
+  `value-fns` (default [[default-map-value-fns]]; additional functions are merged into this map).
+
+  This is the default implementation for maps. Custom map implementations can call this with a different `key-fn` or
+  additional `value-fns` as needed."
+  ([m]
+   (normalize-map m keyword))
+
+  ([m key-fn]
+   (normalize-map m key-fn nil))
+
+  ([m key-fn value-fns]
+   (let [value-fns (merge default-map-value-fns value-fns)]
+     (into {}
+           (map (fn [[k v]]
+                  (let [k (key-fn k)]
+                    [k
+                     (if-let [f (get value-fns k)]
+                       (f v)
+                       v)])))
+           m))))
+
+(defmethod normalize :dispatch-type/map
+  [m]
+  (normalize-map m))
+
+(defn- default-normalize-mbql-clause [[tag opts & args]]
+  (into [(keyword tag) (normalize opts)]
+        (map normalize)
+        args))
+
+(defmethod normalize :default
+  [x]
+  (cond
+    (mbql-clause-type x) (default-normalize-mbql-clause x)
+    (map-type x)         (normalize-map x)
+    :else                x))
diff --git a/src/metabase/lib/query.cljc b/src/metabase/lib/query.cljc
index f3e0d628b35..480246709d6 100644
--- a/src/metabase/lib/query.cljc
+++ b/src/metabase/lib/query.cljc
@@ -4,12 +4,21 @@
    [metabase.lib.dispatch :as lib.dispatch]
    [metabase.lib.metadata :as lib.metadata]
    [metabase.lib.metadata.calculation :as lib.metadata.calculation]
+   [metabase.lib.normalize :as lib.normalize]
    [metabase.lib.options :as lib.options]
    [metabase.lib.schema :as lib.schema]
    [metabase.lib.schema.id :as lib.schema.id]
    [metabase.lib.util :as lib.util]
    [metabase.util.malli :as mu]))
 
+(defmethod lib.normalize/normalize :mbql/query
+  [query]
+  (lib.normalize/normalize-map
+   query
+   keyword
+   {:type   keyword
+    :stages (partial mapv lib.normalize/normalize)}))
+
 (defmethod lib.metadata.calculation/metadata :mbql/query
   [query stage-number x]
   (lib.metadata.calculation/metadata query stage-number (lib.util/query-stage x stage-number)))
diff --git a/src/metabase/lib/stage.cljc b/src/metabase/lib/stage.cljc
index f8c6a59099e..8a7e3ed7801 100644
--- a/src/metabase/lib/stage.cljc
+++ b/src/metabase/lib/stage.cljc
@@ -8,6 +8,7 @@
    [metabase.lib.expression :as lib.expression]
    [metabase.lib.metadata :as lib.metadata]
    [metabase.lib.metadata.calculation :as lib.metadata.calculation]
+   [metabase.lib.normalize :as lib.normalize]
    [metabase.lib.options :as lib.options]
    [metabase.lib.schema :as lib.schema]
    [metabase.lib.schema.common :as lib.schema.common]
@@ -21,6 +22,14 @@
 
 (declare stage-metadata)
 
+(defmethod lib.normalize/normalize :mbql.stage/mbql
+  [stage]
+  (lib.normalize/normalize-map
+   stage
+   keyword
+   {:aggregation (partial mapv lib.normalize/normalize)
+    :filter      lib.normalize/normalize}))
+
 (mu/defn ^:private fields-columns :- [:maybe [:sequential lib.metadata/ColumnMetadata]]
   [query        :- ::lib.schema/query
    stage-number :- :int]
diff --git a/src/metabase/lib/util.cljc b/src/metabase/lib/util.cljc
index 90ac25a9641..55800e16646 100644
--- a/src/metabase/lib/util.cljc
+++ b/src/metabase/lib/util.cljc
@@ -86,6 +86,14 @@
   (let [previous-stages (if source-query
                           (inner-query->stages source-query)
                           [])
+        source-metadata (when source-metadata
+                          (-> (if (vector? source-metadata)
+                                {:columns source-metadata}
+                                source-metadata)
+                              (update :columns (fn [columns]
+                                                 (for [column columns]
+                                                   (assoc column :lib/type :metadata/field))))
+                              (assoc :lib/type :metadata/results)))
         previous-stages (cond-> previous-stages
                           source-metadata (assoc-in [(dec (count previous-stages)) :lib/stage-metadata] source-metadata))
         stage-type      (if (:native inner-query)
@@ -156,6 +164,12 @@
   (let [{:keys [stages]} (pipeline query)]
     (get (vec stages) (non-negative-stage-index stages stage-number))))
 
+(mu/defn previous-stage :- [:maybe ::lib.schema/stage]
+  "Return the previous stage of the query, if there is one; otherwise return `nil`."
+  [query stage-number :- :int]
+  (when-let [stage-num (previous-stage-number query stage-number)]
+    (query-stage query stage-num)))
+
 (mu/defn update-query-stage :- ::lib.schema/query
   "Update a specific `stage-number` of a `query` by doing
 
@@ -198,3 +212,43 @@
            ","
            conjunction
            (last coll)))))))
+
+(defn update-stages-ignore-joins
+  "Like [[update-stages]], but does not recurse into the stages inside joins.
+
+  `f` has the signature
+
+    (f query stage-number stage)"
+  [query f]
+  (reduce
+   (fn [query stage-number]
+     (update-in query [:stages stage-number] (fn [stage]
+                                               (f query stage-number stage))))
+   query
+   (range 0 (count (:stages query)))))
+
+(defn update-stages
+  "Apply function `f` to every stage of a query, depth-first. Also applied to all query stages.
+
+  `f` has the signature
+
+    (f query stage-number stage)
+
+  `query` reflects the results of the previous call to `f`.
+
+  As a convenience, if `f` returns nil, the original stage will be used without changes."
+  [query f]
+  (letfn [(update-join [join]
+            (-> query
+                (assoc :stages (:stages join))
+                (update-stages f)
+                :stages))
+          (update-joins [joins]
+            (mapv update-join joins))]
+    (update-stages-ignore-joins
+     query
+     (fn [query stage-number stage]
+       (let [stage (cond-> stage
+                     (:joins stage)
+                     update-joins)]
+         (f query stage-number stage))))))
diff --git a/src/metabase/query_processor/middleware/normalize_query.clj b/src/metabase/query_processor/middleware/normalize_query.clj
index 5970f73a412..26497316cee 100644
--- a/src/metabase/query_processor/middleware/normalize_query.clj
+++ b/src/metabase/query_processor/middleware/normalize_query.clj
@@ -1,6 +1,8 @@
 (ns metabase.query-processor.middleware.normalize-query
   "Middleware that converts a query into a normalized, canonical form."
   (:require
+   [metabase.lib.convert :as lib.convert]
+   [metabase.lib.core :as lib]
    [metabase.mbql.normalize :as mbql.normalize]
    [metabase.query-processor.error-type :as qp.error-type]
    [metabase.util :as u]
@@ -8,18 +10,27 @@
 
 (set! *warn-on-reflection* true)
 
+(defn- normalize* [query]
+  (try
+    (let [query-type (keyword ((some-fn :type #(get % "type")) query))
+          normalized (case query-type
+                       :pipeline
+                       (lib.convert/->legacy-MBQL (lib/normalize query))
+
+                       (:query :native)
+                       (mbql.normalize/normalize query))]
+      (log/tracef "Normalized query:\n%s\n=>\n%s" (u/pprint-to-str query) (u/pprint-to-str normalized))
+      normalized)
+    (catch Throwable e
+      (throw (ex-info (.getMessage e)
+                      {:type  qp.error-type/qp
+                       :query query}
+                      e)))))
+
 (defn normalize
   "Middleware that converts a query into a normalized, canonical form, including things like converting all identifiers
   into standard `lisp-case` ones, removing/rewriting legacy clauses, removing empty ones, etc. This is done to
   simplifiy the logic in the QP steps following this."
   [qp]
   (fn [query rff context]
-    (let [query' (try
-                   (u/prog1 (mbql.normalize/normalize query)
-                     (log/tracef "Normalized query:\n%s" (u/pprint-to-str <>)))
-                   (catch Throwable e
-                     (throw (ex-info (.getMessage e)
-                                     {:type  qp.error-type/invalid-query
-                                      :query query}
-                                     e))))]
-      (qp query' rff context))))
+    (qp (normalize* query) rff context)))
diff --git a/test/metabase/lib/normalize_test.cljc b/test/metabase/lib/normalize_test.cljc
new file mode 100644
index 00000000000..62736c7cb71
--- /dev/null
+++ b/test/metabase/lib/normalize_test.cljc
@@ -0,0 +1,66 @@
+(ns metabase.lib.normalize-test
+  (:require
+   [clojure.test :refer [are deftest is testing]]
+   [metabase.lib.core :as lib]))
+
+(deftest ^:parallel normalize-query-type-test
+  (testing "Query type should get normalized"
+    (is (= {:type :native}
+           (lib/normalize {:type "native"})))))
+
+(deftest ^:parallel do-not-normalize-native-queries-test
+  (testing "native queries should NOT get normalized"
+    (are [x expected] (= expected
+                         (lib/normalize x))
+      {"lib/type" "mbql/query"
+       "stages"   [{"lib/type" "mbql.stage/native"
+                    "native"   "SELECT COUNT(*) FROM CANS;"}]}
+      {:lib/type :mbql/query
+       :stages   [{:lib/type :mbql.stage/native
+                   :native   "SELECT COUNT(*) FROM CANS;"}]}
+
+      {:lib/type :mbql/query
+       :stages   [{"lib/type" "mbql.stage/native"
+                   "native"   {:NAME         "FAKE_QUERY"
+                               "description" "Theoretical fake query in a JSON-based query lang"}}]}
+      {:lib/type :mbql/query
+       :stages   [{:lib/type :mbql.stage/native
+                   :native   {:NAME         "FAKE_QUERY"
+                              "description" "Theoretical fake query in a JSON-based query lang"}}]})))
+
+(deftest ^:parallel normalize-value-test
+  (testing ":value clauses should keep snake_case keys in the type info arg"
+    ;; See https://github.com/metabase/metabase/issues/23354 for details
+    (is (= [:value {:some_key "some key value"} "some value"]
+           (lib/normalize [:value {:some_key "some key value"} "some value"])))))
+
+(deftest ^:parallel e2e-test
+  (is (= {:lib/type :mbql/query
+          :database 1
+          :type     :pipeline
+          :stages   [{:lib/type     :mbql.stage/mbql
+                      :lib/options  {:lib/uuid "a8f41095-1e37-4da6-a4e5-51a9c2c3f523"}
+                      :source-table 1
+                      :aggregation  [[:count {:lib/uuid "a6685a7d-62b3-4ceb-a13f-f9db405dcb49"}]]
+                      :filter       [:=
+                                     {:lib/uuid "c4984ada-f8fe-4ac2-b6b4-45885527f5b4"}
+                                     [:field
+                                      {:base-type :type/Integer
+                                       :lib/uuid "5a84d551-ea5f-44f4-952f-2162f05cdcc4"}
+                                      1]
+                                     4]}]}
+         (lib/normalize
+          {"lib/type" "mbql/query"
+           "database" 1
+           "type"     "pipeline"
+           "stages"   [{"lib/type"     "mbql.stage/mbql"
+                        "lib/options"  {"lib/uuid" "a8f41095-1e37-4da6-a4e5-51a9c2c3f523"}
+                        "source-table" 1
+                        "aggregation"  [["count" {"lib/uuid" "a6685a7d-62b3-4ceb-a13f-f9db405dcb49"}]]
+                        "filter"       ["="
+                                        {"lib/uuid" "c4984ada-f8fe-4ac2-b6b4-45885527f5b4"}
+                                        ["field"
+                                         {"base-type" "type/Integer"
+                                          "lib/uuid"  "5a84d551-ea5f-44f4-952f-2162f05cdcc4"}
+                                         1]
+                                        4]}]}))))
diff --git a/test/metabase/lib/util_test.cljc b/test/metabase/lib/util_test.cljc
index aeb3da851b7..fb9bc806cad 100644
--- a/test/metabase/lib/util_test.cljc
+++ b/test/metabase/lib/util_test.cljc
@@ -90,7 +90,8 @@
              :type     :pipeline
              :stages   [{:lib/type           :mbql.stage/mbql
                          :source-table       (meta/id :venues)
-                         :lib/stage-metadata [(meta/field-metadata :venues :id)]}
+                         :lib/stage-metadata {:lib/type :metadata/results
+                                              :columns  [(meta/field-metadata :venues :id)]}}
                         {:lib/type :mbql.stage/mbql}]}
             (lib.util/pipeline
              {:database (meta/id)
diff --git a/test/metabase/query_processor_test/pipeline_queries_test.clj b/test/metabase/query_processor_test/pipeline_queries_test.clj
new file mode 100644
index 00000000000..523ade99025
--- /dev/null
+++ b/test/metabase/query_processor_test/pipeline_queries_test.clj
@@ -0,0 +1,46 @@
+(ns metabase.query-processor-test.pipeline-queries-test
+  (:require
+   [clojure.test :refer :all]
+   [metabase.lib.core :as lib]
+   [metabase.lib.metadata.jvm :as lib.metadata.jvm]
+   [metabase.query-processor :as qp]
+   [metabase.test :as mt]
+   [metabase.util :as u]))
+
+(defn- metadata-provider []
+  (lib.metadata.jvm/application-database-metadata-provider (mt/id)))
+
+;;; this stuff is mostly so we can get a sense of what using MLv2 in tests will ultimately look like
+
+(defmacro ^:private pmbql-query
+  {:style/indent 1}
+  [table-name & body]
+  `(-> (lib/query-for-table-id (metadata-provider) (mt/id ~(keyword table-name)))
+       ~@body))
+
+(defmacro ^:private run-pmbql-query
+  {:style/indent 1}
+  [table-name & body]
+  `(qp/process-query (pmbql-query ~table-name ~@body)))
+
+(defn- $price [query stage-number]
+  ((lib/field (mt/id :venues :price)) query stage-number))
+
+(deftest ^:parallel pipeline-queries-test
+  (testing "Ensure that the QP can handle pMBQL `:pipeline` queries"
+    (is (= [6]
+           (mt/first-row
+            (run-pmbql-query :venues
+              (lib/aggregate (lib/count))
+              (lib/filter (lib/= $price 4))))))))
+
+(deftest ^:parallel denormalized-pipeline-queries-test
+  (testing "Ensure that the QP can handle pMBQL `:pipeline` queries as they'd come in from the REST API or application database"
+    (let [query (-> (pmbql-query :venues
+                      (lib/aggregate (lib/count))
+                      (lib/filter (lib/= $price 4)))
+                    (dissoc :lib/metadata)
+                    mt/obj->json->obj)]
+      (testing (format "Query =\n%s" (u/pprint-to-str query))
+        (is (= [6]
+               (mt/first-row (qp/process-query query))))))))
-- 
GitLab