Skip to content
Snippets Groups Projects
Unverified Commit 1918dc1f authored by Cam Saul's avatar Cam Saul Committed by GitHub
Browse files

Remove QP context cancelf and metadataf fn which aren't needed/used (#12065)

* Remove QP context cancelf and metadataf functions which aren't needed/used

* Test fixes :wrench:
parent f8349b51
Branches
Tags
No related merge requests found
......@@ -90,27 +90,23 @@
[driver honeysql-form] [[driver dataset] honeysql-form]
[driver [sql & params]] [[driver dataset] [sql & params]])}
[driver-or-driver+dataset sql-args]
(if (map? sql-args)
(recur driver-or-driver+dataset (hsql/format sql-args))
(let [[driver dataset] (u/one-or-many driver-or-driver+dataset)
[sql & params] (u/one-or-many sql-args)
canceled-chan (a/promise-chan)]
(try
(driver/with-driver driver
(letfn [(thunk []
(with-open [conn (sql-jdbc.execute/connection-with-timezone driver (mt/db) (qp.timezone/report-timezone-id-if-supported))
stmt (sql-jdbc.execute/prepared-statement driver conn sql params)
rs (sql-jdbc.execute/execute-query! driver stmt)]
(let [rsmeta (.getMetaData rs)]
(reduce
(fn [result row]
(update result :rows conj row))
{:rows []
:cols (sql-jdbc.execute/column-metadata driver rsmeta)}
(sql-jdbc.execute/reducible-rows driver rs rsmeta canceled-chan)))))]
(if dataset
(data.impl/do-with-dataset (data.impl/resolve-dataset-definition *ns* dataset) thunk)
(thunk))))
(catch InterruptedException e
(a/>!! canceled-chan :cancel)
(throw e))))))
(let [[driver dataset] (u/one-or-many driver-or-driver+dataset)
[sql & params] (if (map? sql-args)
(hsql/format sql-args)
(u/one-or-many sql-args))
canceled-chan (a/promise-chan)]
(try
(driver/with-driver driver
(letfn [(thunk []
(with-open [conn (sql-jdbc.execute/connection-with-timezone driver (mt/db) (qp.timezone/report-timezone-id-if-supported))
stmt (sql-jdbc.execute/prepared-statement driver conn sql params)
rs (sql-jdbc.execute/execute-query! driver stmt)]
(let [rsmeta (.getMetaData rs)]
{:cols (sql-jdbc.execute/column-metadata driver rsmeta)
:rows (reduce conj [] (sql-jdbc.execute/reducible-rows driver rs rsmeta canceled-chan))})))]
(if dataset
(data.impl/do-with-dataset (data.impl/resolve-dataset-definition *ns* dataset) thunk)
(thunk))))
(catch InterruptedException e
(a/>!! canceled-chan :cancel)
(throw e)))))
......@@ -287,7 +287,7 @@
rows [["Toucan Sighting" 1000]]
context {:timeout 500
:runf (fn [query rff context]
(let [metadata (qp.context/metadataf {:cols cols} context)]
(let [metadata {:cols cols}]
(qp.context/reducef rff context metadata rows)))}
qp (fn [query]
(qp/process-query query context))]
......
(ns metabase.query-processor.context
"Interface for the QP context/utility functions for using the things in the context correctly.
The default implementations of all these functions live in `metabase.query-processor.context.default`; refer to
those when overriding individual functions. Some wiring for the `core.async` channels takes place in
`metabase.query-processor.reducible.`"
(:require [metabase.async.util :as async.u]))
(defn raisef
......@@ -8,14 +13,29 @@
{:pre [(fn? raisef*)]}
(raisef* e context))
;; Normal flow is something like
;; [middleware] -> preprocessedf -> nativef -> runf -> executef -> metadataf -> reducef -> reducedf -|
; -+-> resultf -> out-chan
;; raisef ----------------------------------------------------------------------------------|
;; |
;; cancelf ->-+
;; Normal flow is something like:
;;
;; [middleware] → preprocessedf → nativef → runf → executef → reducef → reducedf -\
;; ↓ ↦ resultf → out-chan
;; [Exception] → raisef ---------------------------------------------------------/ ↑
;; ↑ |
;; timeoutf |
;; ↑ |
;; [time out] [out-chan closed early] |
;; ↓ [closes] |
;; canceled-chan ----------------------------------------------------/
;; ↑
;; [message sent to canceled chan]
;;
;; 1. Query normally runs thru middleware and then a series of context functions as described above; result is sent thru
;; `resultf` and finally to `out-chan`
;;
;; 2. If an `Exception` is thrown, it is sent thru `raisef`, `resultf` and finally to `out-chan`
;;
;; 3. If the query times out, `timeoutf` throws an Exception
;;
;; 4. If the query is canceled (either by closing `out-chan` before it gets a result, or by sending `canceled-chan` a
;; message), the execution is canceled and `out-chan` is closed (if not already closed).
(defn runf
"Called by pivot fn to run preprocessed query. Normally, this simply calls `executef`, but you can override this for
test purposes. The result of this function is ignored."
......@@ -56,13 +76,6 @@
{:pre [(fn? reducedf*)]}
(reducedf* metadata reduced-rows context))
(defn metadataf
"Called upon receiving metadata from driver."
{:arglists '([metadata context])}
[metadata {metadataf* :metadataf, :as context}]
{:pre [(fn? metadataf*)], :post [(map? %)]}
(metadataf* metadata context))
(defn preprocessedf
"Called when query is fully preprocessed."
{:arglsts '([query context])}
......@@ -84,15 +97,8 @@
{:pre [(fn? timeoutf*)]}
(timeoutf* context))
(defn cancelf
"Call this function to cancel a query."
{:arglists '([context])}
[{cancelf* :cancelf, :as context}]
{:pre [(fn? cancelf*)]}
(cancelf* context))
(defn resultf
"ALWAYS alled exactly once with the final result, which is the result of either `reducedf` or `raisef`."
"Called exactly once with the final result, which is the result of either `reducedf` or `raisef`."
{:arglists '([result context])}
[result {resultf* :resultf, :as context}]
{:pre [(fn? resultf*)]}
......
......@@ -43,7 +43,6 @@
(vswap! rows conj row)
result))))
;; TODO - not 100% this makes sense -- should rows always be merged into metadata?
(defn- default-reducedf [metadata reduced-result context]
(context/resultf reduced-result context))
......@@ -53,18 +52,17 @@
`metabase.query-processor.reducible-test/write-rows-to-file-test` for an example of a custom implementation."
[rff context metadata reducible-rows]
{:pre [(fn? rff)]}
(let [metadata (context/metadataf metadata context)]
;; TODO -- how to pass updated metadata to reducedf?
(let [rf (rff metadata)]
(assert (fn? rf))
(when-let [reduced-rows (try
(transduce identity rf reducible-rows)
(catch Throwable e
(context/raisef (ex-info (tru "Error reducing result rows")
{:type error-type/qp}
e)
context)))]
(context/reducedf metadata reduced-rows context)))))
;; TODO -- how to pass updated metadata to reducedf?
(let [rf (rff metadata)]
(assert (fn? rf))
(when-let [reduced-rows (try
(transduce identity rf reducible-rows)
(catch Throwable e
(context/raisef (ex-info (tru "Error reducing result rows")
{:type error-type/qp}
e)
context)))]
(context/reducedf metadata reduced-rows context))))
(defn- default-runf [query rf context]
(try
......@@ -95,12 +93,6 @@
:type error-type/timed-out})
context)))
(defn- default-cancelf [context]
(log/debug (trs "Query canceled before finishing."))
(let [canceled-chan (context/canceled-chan context)]
(a/>!! canceled-chan :cancel)
(a/close! canceled-chan)))
(defn- identity1
"Util fn. Takes 2 args and returns the first arg as-is."
[x _]
......@@ -116,11 +108,9 @@
:executef driver/execute-reducible-query
:reducef default-reducef
:reducedf default-reducedf
:metadataf identity1
:preprocessedf identity1
:nativef identity1
:cancelf default-cancelf
:timeoutf default-timeoutf
:resultf default-resultf
:canceled-chan (a/promise-chan identity identity)
:out-chan (a/promise-chan identity identity)})
:canceled-chan (a/promise-chan)
:out-chan (a/promise-chan)})
......@@ -9,7 +9,7 @@
"Wire up the core.async channels in a QP `context`."
[context]
;; 1) If query doesn't complete by `timeoutf`, call `timeoutf`, which should raise an Exception
;; 2) when `out-chan` is closed prematurely, call `cancelf` to send a message to `canceled-chan`
;; 2) when `out-chan` is closed prematurely, send a message to `canceled-chan`
;; 3) when `out-chan` is closed or gets a result, close both out-chan and canceled-chan
(let [out-chan (context/out-chan context)
canceled-chan (context/canceled-chan context)
......@@ -21,7 +21,7 @@
val)
(cond
(not= port out-chan) (context/timeoutf context)
(nil? val) (context/cancelf context))
(nil? val) (a/>!! canceled-chan ::cancel))
(log/tracef "Closing out-chan.")
(a/close! out-chan)
(a/close! canceled-chan)))
......
......@@ -96,26 +96,28 @@
(testing "Keepalive newlines should be written while waiting for a response."
(with-redefs [streaming-response/keepalive-interval-ms 50]
(with-test-driver-db
(is (re= #"(?s)^\n{3,}\{\"data\":.*$"
(:body (http/post (test-client/build-url "dataset" nil)
(test-client/build-request-map (mt/user->credentials :lucky)
{:database (mt/id)
:type "native"
:native {:query {:sleep 300}}})))))))))
(let [url (test-client/build-url "dataset" nil)
session-token (test-client/authenticate (mt/user->credentials :lucky))
request (test-client/build-request-map session-token
{:database (mt/id)
:type "native"
:native {:query {:sleep 300}}})]
(is (re= #"(?s)^\n{3,}\{\"data\":.*$"
(:body (http/post url request)))))))))
(deftest cancelation-test
(testing "Make sure canceling a HTTP request ultimately causes the query to be canceled"
(with-redefs [streaming-response/keepalive-interval-ms 50]
(with-test-driver-db
(reset! canceled? false)
(let [futur (http/post (test-client/build-url "dataset" nil)
(assoc (test-client/build-request-map (mt/user->credentials :lucky)
{:database (mt/id)
:type "native"
:native {:query {:sleep 5000}}})
:async true)
identity
(fn [e] (throw e)))]
(let [url (test-client/build-url "dataset" nil)
session-token (test-client/authenticate (mt/user->credentials :lucky))
request (test-client/build-request-map session-token
{:database (mt/id)
:type "native"
:native {:query {:sleep 5000}}})
futur (http/post url (assoc request :async? true) identity (fn [e] (throw e)))]
(is (future? futur))
(Thread/sleep 100)
(future-cancel futur)
(Thread/sleep 100)
......
......@@ -115,7 +115,7 @@
(is (= 'canceled-chan
(if (= port canceled-chan) 'canceled-chan 'timeout))
"port")
(is (= :cancel
(is (= :metabase.query-processor.reducible/cancel
val)
"val")))
(testing "No QueryExecution should get saved when a query is canceled"
......
......@@ -96,7 +96,6 @@
:query {:source-table (mt/id :venues), :limit 2, :order-by [[:asc (mt/id :venues :id)]]}}
{:rff maps-rff}))))))
;; TODO - fix me
(deftest cancelation-test
(testing "Example of canceling a query early before results are returned."
(letfn [(process-query [canceled-chan timeout]
......@@ -114,14 +113,14 @@
(mt/with-open-channels [canceled-chan (a/promise-chan)]
(let [out-chan (process-query canceled-chan 1000)]
(a/close! out-chan)
(is (= :cancel
(is (= ::qp.reducible/cancel
(first (a/alts!! [canceled-chan (a/timeout 500)]))))))
(mt/with-open-channels [canceled-chan (a/promise-chan)]
(let [out-chan (process-query canceled-chan 1000)]
(future
(Thread/sleep 50)
(a/close! out-chan))
(is (= :cancel
(is (= ::qp.reducible/cancel
(a/<!! canceled-chan)))
(is (= nil
(a/<!! out-chan)))))
......
......@@ -219,8 +219,7 @@
:runf (fn [query rff context]
(try
(when run (run))
(let [metadata (qp.context/metadataf metadata context)]
(qp.context/reducef rff context (assoc metadata :pre query) rows))
(qp.context/reducef rff context (assoc metadata :pre query) rows)
(catch Throwable e
(println "Error in test-qp-middleware runf:" e)
(throw e))))}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment