Skip to content
Snippets Groups Projects
Commit 2186d013 authored by Cam Saul's avatar Cam Saul
Browse files

Make some QP middleware async :twisted_rightwards_arrows:

[ci druid]
parent b9a9f623
No related branches found
No related tags found
No related merge requests found
Showing
with 373 additions and 247 deletions
......@@ -343,21 +343,17 @@
;; Query cancellation test, needs careful coordination between the query thread, cancellation thread to ensure
;; everything works correctly together
(datasets/expect-with-driver :druid
[false ;; Ensure the query promise hasn't fired yet
false ;; Ensure the cancellation promise hasn't fired yet
true ;; Was query called?
false ;; Cancel should not have been called yet
true ;; Cancel should have been called now
true ;; The paused query can proceed now
]
::tu/success
(tu/call-with-paused-query
(fn [query-thunk called-query? called-cancel? pause-query]
(future
;; stub out the query and delete functions so that we know when one is called vs. the other
(with-redefs [druid/do-query (fn [details query] (deliver called-query? true) @pause-query)
druid/DELETE (fn [url] (deliver called-cancel? true))]
(data/run-mbql-query checkins
{:aggregation [[:count]]}))))))
(try
;; stub out the query and delete functions so that we know when one is called vs. the other
(with-redefs [druid/do-query (fn [details query] (deliver called-query? true) @pause-query)
druid/DELETE (fn [url] (deliver called-cancel? true))]
(query-thunk))
(catch Throwable e
(println "Error running query:" e)))))))
;; Make sure Druid cols + columns come back in the same order and that that order is the expected MBQL columns order
;; (#9294)
......
......@@ -71,26 +71,27 @@
second permit if this thread already has one."
{})
(s/defn ^:private do-f-with-permit
"Once a `permit` is obtained, execute `(apply f args)`, writing the results to `output-chan`, and returning the permit
no matter what."
[permit :- Closeable, out-chan :- async.u/PromiseChan, f & args]
(try
(let [canceled-chan (async.u/single-value-pipe (apply async.u/do-on-separate-thread f args) out-chan)]
;; canceled-chan will be called whenever `out-chan` is closed, either because `single-value-pipe` closes it when
;; it gets a result from `do-on-separate-thread`, or because someone else closed it earlier (i.e. canceled API
;; request). When this happens return the permit
(s/defn ^:private do-with-existing-permit-for-current-thread :- (s/maybe async.u/PromiseChan)
[semaphore-chan f & args]
(when (get *permits* semaphore-chan)
(log/debug (trs "Current thread already has a permit, will not wait to acquire another"))
(apply async.u/do-on-separate-thread f args)))
(s/defn ^:private do-with-permit :- async.u/PromiseChan
[semaphore-chan, permit :- Closeable, f & args]
(binding [*permits* (assoc *permits* semaphore-chan permit)]
(let [out-chan (apply async.u/do-on-separate-thread f args)]
;; whenever `out-chan` closes return the permit
(a/go
(if (a/<! canceled-chan)
(log/debug (trs "request canceled, permit will be returned"))
(log/debug (trs "f finished, permit will be returned")))
(.close permit)))
(catch Throwable e
(log/error e (trs "Unexpected error attempting to run function after obtaining permit"))
(a/>! out-chan e)
(a/close! out-chan)
(.close permit)))
nil)
(a/<! out-chan)
(.close permit))
out-chan)))
(s/defn ^:private do-with-immediately-available-permit :- (s/maybe async.u/PromiseChan)
[semaphore-chan f & args]
(when-let [^Closeable permit (a/poll! semaphore-chan)]
(log/debug (trs "Permit available without waiting, will run fn immediately"))
(apply do-with-permit semaphore-chan permit f args)))
(s/defn ^:private do-after-waiting-for-new-permit :- async.u/PromiseChan
[semaphore-chan f & args]
......@@ -98,18 +99,21 @@
;; fire off a go block to wait for a permit.
(a/go
(let [[permit first-done] (a/alts! [semaphore-chan out-chan])]
;; If out-chan closes before we get a permit, there's nothing for us to do here. Otherwise if we got our
;; permit then proceed
(condp = first-done
out-chan
(cond
;; If out-chan closes before we get a permit, there's nothing for us to do here.
(= first-done out-chan)
(log/debug (trs "Not running pending function call: output channel already closed."))
;; otherwise if channel is still open run the function
semaphore-chan
(if-not permit
;; if `semaphore-chan` is closed for one reason or another we'll never get a permit so log a warning and
;; close the output channel
(not permit)
(do
(log/warn (trs "Warning: semaphore-channel is closed, will not run pending function call"))
(binding [*permits* (assoc *permits* semaphore-chan permit)]
(apply do-f-with-permit permit out-chan f args))))))
(a/close! out-chan))
;; otherwise we got a permit and chan run f with it now.
permit
(async.u/single-value-pipe (apply do-with-permit semaphore-chan permit f args) out-chan))))
;; return `out-chan` which can be used to wait for results
out-chan))
......@@ -118,11 +122,7 @@
can fetch the results. Closing this channel before results are produced will cancel the function call."
{:style/indent 1}
[semaphore-chan f & args]
;; check and see whether we already have a permit for `semaphore-chan`, if so, go ahead and run the function right
;; away instead of waiting for *another* permit
(if (get *permits* semaphore-chan)
(do
(log/debug (trs "Current thread already has a permit for {0}, will not wait to acquire another" semaphore-chan))
(apply async.u/do-on-separate-thread f args))
;; otherwise wait for a permit
(apply do-after-waiting-for-new-permit semaphore-chan f args)))
(some #(apply % semaphore-chan f args)
[do-with-existing-permit-for-current-thread
do-with-immediately-available-permit
do-after-waiting-for-new-permit]))
......@@ -5,7 +5,8 @@
[metabase.util.i18n :refer [trs]]
[schema.core :as s])
(:import clojure.core.async.impl.buffers.PromiseBuffer
clojure.core.async.impl.channels.ManyToManyChannel))
clojure.core.async.impl.channels.ManyToManyChannel
java.util.concurrent.Future))
(defn promise-chan?
"Is core.async `chan` a `promise-chan`?"
......@@ -58,6 +59,31 @@
;; return the canceled chan in case someone wants to listen to it
canceled-chan))
(s/defn ^:private do-on-separate-thread* :- Future
[out-chan canceled-chan f & args]
(future
(try
(if (a/poll! canceled-chan)
(log/debug (trs "Output channel closed, will skip running {0}." f))
(do
(log/debug (trs "Running {0} on separate thread..." f))
(try
(let [result (apply f args)]
(cond
(nil? result)
(log/warn "Warning: {0} returned `nil`" f)
(not (a/>!! out-chan result))
(log/error (trs "Unexpected error writing result to output channel: already closed"))))
;; if we catch an Exception (shouldn't happen in a QP query, but just in case), send it to `chan`.
;; It's ok, our IMPL of Ring `StreamableResponseBody` will do the right thing with it.
(catch Throwable e
(log/error e (trs "Caught error running {0}" f))
(when-not (a/>!! out-chan e)
(log/error (trs "Unexpected error writing exception to output channel: already closed")))))))
(finally
(a/close! out-chan)))))
(s/defn do-on-separate-thread :- PromiseChan
"Run `(apply f args)` on a separate thread, returns a channel to fetch the results. Closing this channel early will
cancel the future running the function, if possible."
......@@ -66,28 +92,10 @@
canceled-chan (promise-canceled-chan out-chan)
;; Run `f` on a separarate thread because it's a potentially long-running QP query and we don't want to tie
;; up precious core.async threads
futur
(future
(try
(if (a/poll! canceled-chan)
(log/debug (trs "Output channel closed, will skip running {0}." f))
(do
(log/debug (trs "Running {0} on separate thread..." f))
(try
(let [result (apply f args)]
(if (some? result)
(a/put! out-chan result)
(log/warn "Warning: {0} returned `nil`" f)))
;; if we catch an Exception (shouldn't happen in a QP query, but just in case), send it to `chan`.
;; It's ok, our IMPL of Ring `StreamableResponseBody` will do the right thing with it.
(catch Throwable e
(log/error e (trs "Caught error running {0}" f))
(a/put! out-chan e)))))
(finally
(a/close! out-chan))))]
futur (apply do-on-separate-thread* out-chan canceled-chan f args)]
;; if output chan is closed early cancel the future
(a/go
(when (a/<! canceled-chan)
(log/debug (trs "Request canceled, canceling future"))
(log/debug (trs "Request canceled, canceling future."))
(future-cancel futur)))
out-chan))
......@@ -16,7 +16,8 @@
[i18n :refer [trs tru]]
[schema :as su]]
[schema.core :as s]
[toucan.db :as db]))
[toucan.db :as db])
(:import org.joda.time.DateTime))
(declare notify-database-updated)
......@@ -672,9 +673,9 @@
dispatch-on-initialized-driver
:hierarchy #'hierarchy)
(defmulti current-db-time
(defmulti ^DateTime current-db-time
"Return the current time and timezone from the perspective of `database`. You can use
`metabase.driver.common/current-db-time` to implement this."
`metabase.driver.common/current-db-time` to implement this. This should return a Joda-Time `DateTime`."
{:arglists '([driver database])}
dispatch-on-initialized-driver
:hierarchy #'hierarchy)
......
......@@ -108,7 +108,9 @@
;;; +----------------------------------------------------------------------------------------------------------------+
(defprotocol ^:private ParseDateTimeString
(^:private parse [this date-time-str] "Parse the `date-time-str` and return a `DateTime` instance"))
(^:private parse
^DateTime [this date-time-str]
"Parse the `date-time-str` and return a `DateTime` instance."))
(extend-protocol ParseDateTimeString
DateTimeFormatter
......@@ -138,7 +140,7 @@
(defn- first-successful-parse
"Attempt to parse `time-str` with each of `date-formatters`, returning the first successful parse. If there are no
successful parses throws the exception that the last formatter threw."
[date-formatters time-str]
^DateTime [date-formatters time-str]
(or (some #(u/ignore-exceptions (parse % time-str)) date-formatters)
(doseq [formatter (reverse date-formatters)]
(parse formatter time-str))))
......@@ -163,7 +165,7 @@
`current-db-time-date-formatters` multimethods defined above. Execute a native query for the current time, and parse
the results using the date formatters, preserving the timezone. To use this implementation, you must implement the
aforementioned multimethods; no default implementation is provided."
[driver database]
^DateTime [driver database]
{:pre [(map? database)]}
(let [native-query (current-db-time-native-query driver)
date-formatters (current-db-time-date-formatters driver)
......
......@@ -134,7 +134,7 @@
resolve-database/resolve-database
fetch-source-query/fetch-source-query
store/initialize-store
log-query/log-initial-query
log-query/log-query
;; ▲▲▲ SYNC MIDDLEWARE ▲▲▲
;;
;; All middleware above this point is written in the synchronous 1-arg style. All middleware below is written in
......
......@@ -5,7 +5,8 @@
[util :as qputil]]))
(defn add-row-count-and-status
"Wrap the results of a successfully processed query in the format expected by the frontend (add `row_count` and `status`)."
"Wrap the results of a successfully processed query in the format expected by the frontend (add `row_count` and
`status`)."
[qp]
(fn [{{:keys [max-results max-results-bare-rows]} :constraints, :as query}]
(let [results-limit (or (when (qputil/query-without-aggregations-or-limits? query)
......
......@@ -25,6 +25,40 @@
;;; | async-setup |
;;; +----------------------------------------------------------------------------------------------------------------+
(defn- respond-fn [out-chan canceled-chan]
(fn [result]
(try
;; out-chan might already be closed if query was canceled. NBD if that's the case
(a/>!! out-chan (if (nil? result)
(Exception. (str (trs "Unexpectedly got `nil` Query Processor response.")))
result))
(finally
(a/close! out-chan)))))
(defn- raise-fn [out-chan respond]
(fn [e]
(if (instance? InterruptedException e)
(do
(log/debug (trs "Got InterruptedException. Canceling query."))
(a/close! out-chan))
(do
(log/warn e (trs "Unhandled exception, exepected `catch-exceptions` middleware to handle it."))
(respond e)))))
(defn- async-args []
(let [out-chan (a/promise-chan)
canceled-chan (async.u/promise-canceled-chan out-chan)
respond (respond-fn out-chan canceled-chan)
raise (raise-fn out-chan respond)]
{:out-chan out-chan, :canceled-chan canceled-chan, :respond respond, :raise raise}))
(defn- wait-for-result [out-chan]
;; TODO - there should probably be some sort of max timeout here for out-chan. At least for test/dev purposes
(let [result (a/<!! out-chan)]
(if (instance? Throwable result)
(throw result)
result)))
(defn async-setup
"Middleware that creates the output/canceled channels for the asynchronous (4-arg) QP middleware and runs it.
......@@ -35,24 +69,12 @@
closes."
[qp]
(fn [{:keys [async?], :as query}]
(let [out-chan (a/promise-chan)
canceled-chan (async.u/promise-canceled-chan out-chan)
respond (fn [result]
(if (some? result)
(a/>!! out-chan result)
(log/warn (trs "Warning: `respond` as passed `nil`.")))
(a/close! out-chan))
raise (fn [e]
(log/warn e (trs "Unhandled exception, exepected `catch-exceptions` middleware to handle it"))
(respond e))]
(let [{:keys [out-chan respond raise canceled-chan]} (async-args)]
(try
(qp query respond raise canceled-chan)
;; if query is `async?` return the output channel; otherwise block until output channel returns a result
(if async?
out-chan
(wait-for-result out-chan))
(catch Throwable e
(raise e)))
;; if query is `async?` return the output channel; otherwise block until output channel returns a result
(if async?
out-chan
(let [result (a/<!! out-chan)]
(if (instance? Throwable result)
(throw result)
result))))))
(raise e))))))
......@@ -85,11 +85,15 @@
(defn catch-exceptions
"Middleware for catching exceptions thrown by the query processor and returning them in a normal format."
[qp]
;; we're not using the version of `raise` passed in on purpose here -- that one is a placeholder -- this is the
;; implementation of `raise` we expect most QP middleware to ultimately use
(fn [query respond _ canceled-chan]
;; we're swapping out the top-level exception handler (`raise` fn) created by the `async-setup` middleware with one
;; that will format the Exceptions and pipe them thru as normal QP 'failure' responses. For InterruptedExceptions
;; however (caused when the query is canceled) pipe all the way thru to the top-level handler so it can close out
;; the output channel instead of writing a response to it, which will cause the cancelation message we're looking for
(fn [query respond top-level-raise canceled-chan]
(let [raise (fn [e]
(respond (format-exception query e)))]
(if (instance? InterruptedException e)
(top-level-raise e)
(respond (format-exception query e))))]
(try
(qp query respond raise canceled-chan)
(catch Throwable e
......
......@@ -5,13 +5,13 @@
[metabase.query-processor.interface :as i]
[metabase.util :as u]))
(defn- log-initial-query* [query]
(defn- log-query* [query]
(u/prog1 query
(when-not i/*disable-qp-logging*
(log/debug (u/format-color 'blue "\nQUERY: %s\n%s" (u/emoji "😎") (u/pprint-to-str query))))))
(defn log-initial-query
"Middleware for logging a query when it is very first encountered, before it is expanded."
(defn log-query
"Middleware that logs the query that will be ran."
[qp]
(comp qp log-initial-query*))
(comp qp log-query*))
......@@ -79,28 +79,41 @@
(save-successful-query-execution! query-execution result)
(success-response query-execution result))
(defn- fail [query-execution message result]
(save-failed-query-execution! query-execution message)
(failure-response query-execution message result))
(defn- fail [query-execution result]
(let [message (get result :error (tru "Unknown error"))]
(save-failed-query-execution! query-execution message)
(failure-response query-execution message result)))
(defn- format-userland-query-result
"Make sure `query-result` `:status` is something other than `nil`or `:failed`, or throw an Exception."
[query-execution {:keys [status], :as result}]
"Format QP response in the format expected by the frontend client, and save a QueryExecution entry."
[respond raise query-execution {:keys [status], :as result}]
(cond
;; if the result itself is invalid there's something wrong in the QP -- not just with the query. Pass an
;; Exception up to the top-level handler; this is basically a 500 situation
(nil? result)
(raise (Exception. (str (trs "Unexpected nil response from query processor."))))
(not status)
(fail query-execution (tru "Invalid response from database driver. No :status provided.") result)
(raise (Exception. (str (tru "Invalid response from database driver. No :status provided.") result)))
;; if query has been cancelled no need to save QueryExecution (or should we?) and no point formatting anything to
;; be returned since it won't be returned
(and (= status :failed)
(instance? InterruptedException (:class result)))
(log/info (trs "Query canceled"))
(do
(log/info (trs "Query canceled"))
(respond {:status :interrupted}))
;; 'Normal' query failures are usually caused by invalid queries -- equivalent of a HTTP 400. Save QueryExecution
;; & return a "status = failed" response
(= status :failed)
(do
(log/warn (trs "Query failure") (u/pprint-to-str 'red result))
(fail query-execution (get result :error (tru "Unknown error")) result))
(respond (fail query-execution result)))
;; Successful query (~= HTTP 200): save QueryExecution & return "status = completed" response
(= status :completed)
(succeed query-execution result)))
(respond (succeed query-execution result))))
;;; +----------------------------------------------------------------------------------------------------------------+
......@@ -138,5 +151,5 @@
(qp query respond raise canceled-chan)
;; add calculated hash to query
(let [query (assoc-in query [:info :query-hash] (qputil/query-hash query))
respond (comp respond (partial format-userland-query-result (query-execution-info query)))]
respond (partial format-userland-query-result respond raise (query-execution-info query))]
(qp query respond raise canceled-chan)))))
......@@ -321,11 +321,13 @@
frames-before-last-mb))))})
(defn deref-with-timeout
"Call `deref` on a FUTURE and throw an exception if it takes more than TIMEOUT-MS."
[futur timeout-ms]
(let [result (deref futur timeout-ms ::timeout)]
"Call `deref` on a something derefable (e.g. a future or promise), and throw an exception if it takes more than
`timeout-ms`. If `ref` is a future it will attempt to cancel it as well."
[reff timeout-ms]
(let [result (deref reff timeout-ms ::timeout)]
(when (= result ::timeout)
(future-cancel futur)
(when (instance? java.util.concurrent.Future reff)
(future-cancel reff))
(throw (TimeoutException. (str (tru "Timed out after {0} milliseconds." timeout-ms)))))
result))
......
......@@ -41,7 +41,8 @@
(def ^:private query-defaults
{:constraints constraints/default-query-constraints
:middleware {:add-default-userland-constraints? true, :userland-query? true}})
:middleware {:add-default-userland-constraints? true, :userland-query? true}
:async? true})
;;; ## POST /api/meta/dataset
;; Just a basic sanity check to make sure Query Processor endpoint is still working correctly.
......
......@@ -51,7 +51,7 @@
(expect
{:first-permit "Permit #1", :second-permit "Permit #1", :same? true}
(tu.async/with-open-channels [semaphore-chan (semaphore-channel/semaphore-channel 1)
output-chan (a/chan 1)]
output-chan (a/promise-chan)]
(let [existing-permit #(get @#'semaphore-channel/*permits* semaphore-chan)]
(semaphore-channel/do-after-receiving-permit semaphore-chan
(fn []
......@@ -62,49 +62,108 @@
(a/>!! output-chan {:first-permit (str first-permit)
:second-permit (str second-permit)
:same? (identical? first-permit second-permit)}))))))))
(first (a/alts!! [output-chan (a/timeout 100)]))))
(tu.async/wait-for-result output-chan)))
;; Make sure `do-f-with-permit` returns the permit when functions finish normally
;; Make sure `do-with-permit` returns the permit when functions finish normally
(expect
{:permit-returned? true, :result ::value}
(let [open? (atom false)
permit (reify
Closeable
(close [this]
(reset! open? false)))]
(tu.async/with-open-channels [output-chan (a/chan 1)]
(#'semaphore-channel/do-f-with-permit permit output-chan (constantly ::value))
(let [[result] (a/alts!! [output-chan (a/timeout 100)])]
{:permit-returned? (not @open?), :result result}))))
(let [permit (tu.async/permit)]
(tu.async/with-open-channels [semaphore-chan (a/chan 1)
output-chan (#'semaphore-channel/do-with-permit
semaphore-chan
permit
(constantly ::value))]
{:permit-returned? (tu.async/permit-closed? permit)
:result (tu.async/wait-for-result output-chan)})))
;; If `f` throws an Exception, `permit` should get returned, and Exception should get returned as the result
(expect
{:permit-returned? true, :result "FAIL"}
(let [open? (atom false)
permit (reify
Closeable
(close [this]
(reset! open? false)))]
(tu.async/with-open-channels [output-chan (a/chan 1)]
(#'semaphore-channel/do-f-with-permit permit output-chan (fn []
(throw (Exception. "FAIL"))))
(let [[result] (a/alts!! [output-chan (a/timeout 100)])]
{:permit-returned? (not @open?), :result (when (instance? Exception result)
(.getMessage ^Exception result))}))))
(let [permit (tu.async/permit)]
(tu.async/with-open-channels [semaphore-chan (a/chan 1)
output-chan (#'semaphore-channel/do-with-permit
semaphore-chan
permit
(fn [] (throw (Exception. "FAIL"))))]
{:permit-returned? (tu.async/permit-closed? permit)
:result (let [result (tu.async/wait-for-result output-chan)]
(if (instance? Throwable result)
(.getMessage ^Throwable result)
result))})))
;; If `output-chan` is closed early, permit should still get returned, but there's nowhere to write the result to so
;; it should be `nil`
(expect
{:permit-returned? true, :result nil}
(let [open? (atom false)
permit (reify
Closeable
(close [this]
(reset! open? false)))]
(tu.async/with-open-channels [output-chan (a/chan 1)]
(#'semaphore-channel/do-f-with-permit permit output-chan (fn []
(Thread/sleep 100)
::value))
(let [permit (tu.async/permit)]
(tu.async/with-open-channels [semaphore-chan (a/chan 1)
output-chan (#'semaphore-channel/do-with-permit
semaphore-chan
permit
(fn []
(Thread/sleep 100)
::value))]
(a/close! output-chan)
(let [[result] (a/alts!! [output-chan (a/timeout 500)])]
{:permit-returned? (not @open?), :result result}))))
{:permit-returned? (tu.async/permit-closed? permit)
:result (tu.async/wait-for-result output-chan)})))
;;; +----------------------------------------------------------------------------------------------------------------+
;;; | Tests for the new 0.32.5 optimizations that avoid async waits when permits are immediately available |
;;; +----------------------------------------------------------------------------------------------------------------+
;; there are basically 3 strategies that can be used by `do-after-receiving-permit`
;; 1) run immediately, because permit is already present
;; 2) run immediately, because permit is immediately available
;; 3) run after waiting for permit
;; to check that everything works correctly in a few different scenarios, rather than right 3 x n tests, largely
;; repeating ourselves, we'll break things out into small functions that can be combined to pick + choose the
;; functionality to test with a given strategy.
(defn- do-semaphore-chan-fn [thunk-fn strategy-fn]
(tu.async/with-open-channels [semaphore-chan (a/chan 1)]
(strategy-fn semaphore-chan (thunk-fn (partial #'semaphore-channel/do-after-receiving-permit semaphore-chan)))))
(defn- with-existing-permit [semaphore-chan thunk]
(binding [semaphore-channel/*permits* {semaphore-chan (tu.async/permit)}]
(thunk)))
(defn- with-immediately-available-permit [semaphore-chan thunk]
(a/>!! semaphore-chan (tu.async/permit))
(thunk))
(defn- after-waiting [semaphore-chan thunk]
(a/go
(a/<! (a/timeout 50))
(a/>! semaphore-chan (tu.async/permit)))
(thunk))
;; test normal functions work correctly
(defn- normal-fn [do-f]
(fn []
(tu.async/wait-for-result
(do-f (partial +) 1 2 3))))
(expect 6 (do-semaphore-chan-fn normal-fn with-existing-permit))
(expect 6 (do-semaphore-chan-fn normal-fn with-immediately-available-permit))
(expect 6 (do-semaphore-chan-fn normal-fn after-waiting))
;; Test that if output channel is closed, function gets interrupted
(defn- check-interrupted-fn [do-f]
(fn []
(let [f (fn [chan]
(try
(Thread/sleep 1000)
(catch InterruptedException e
(a/>!! chan ::interrupted))))]
(tu.async/with-open-channels [interrupted-chan (a/promise-chan)
out-chan (do-f f interrupted-chan)]
(a/go
(a/<! (a/timeout 100))
(a/close! out-chan))
(tu.async/wait-for-result interrupted-chan 500)))))
(expect ::interrupted (do-semaphore-chan-fn check-interrupted-fn with-existing-permit))
(expect ::interrupted (do-semaphore-chan-fn check-interrupted-fn with-immediately-available-permit))
(expect ::interrupted (do-semaphore-chan-fn check-interrupted-fn after-waiting))
......@@ -2,8 +2,7 @@
(:require [clojure.core.async :as a]
[expectations :refer [expect]]
[metabase.query-processor.middleware.async-wait :as async-wait]
[metabase.test.util.async :as tu.async])
(:import java.io.Closeable))
[metabase.test.util.async :as tu.async]))
(defn- async-wait
"Mocked version of `async-wait/wait-for-permit` middleware. Runs `f` with 3 channels:
......@@ -26,56 +25,30 @@
((async-wait/wait-for-permit qp) query respond respond canceled-chan))
(f {:result-chan result-chan, :semaphore-chan semaphore-chan, :canceled-chan canceled-chan})))))
(defprotocol ^:private NotifyClosed
(^:private on-close-chan [this]
"Returns a channel that will get a `::closed` message when `.close` is called on the object."))
(defn- permit
"Return a mocked permit object to passed to the mocked semaphore channel. You can check whether this was closed
correctly using `on-close-chan` above."
[]
(let [closed-chan (a/promise-chan)]
(reify
Closeable
(close [_]
(a/>!! closed-chan ::closed)
(a/close! closed-chan))
NotifyClosed
(on-close-chan [_]
closed-chan))))
(defn- wait-for-result
"Wait up to `timeout-ms` (default 200) for a result from `chan`, or return a `::timed-out` message."
([chan]
(wait-for-result chan 200))
([chan timeout-ms]
(let [[val port] (a/alts!! [chan (a/timeout timeout-ms)])]
(if (not= port chan)
::timed-out
val))))
;; QP should run if semaphore-chan gets a permit. Permit should be closed after QP finishes.
(expect
{:result ::result, :permit-taken? true, :permit-closed? true}
(async-wait
(fn [{:keys [semaphore-chan result-chan]}]
(let [permit (permit)]
(let [permit (tu.async/permit)]
;; provide a permit async after a short delay
(a/go
(a/<! (a/timeout 10))
(a/>! semaphore-chan permit))
{:result (wait-for-result result-chan)
:permit-taken? (= (wait-for-result semaphore-chan) ::timed-out)
:permit-closed? (= (wait-for-result (on-close-chan permit)) ::closed)}))))
{:result (tu.async/wait-for-result result-chan)
:permit-taken? (= (tu.async/wait-for-result semaphore-chan) ::tu.async/timed-out)
:permit-closed? (tu.async/permit-closed? permit)}))))
;; If semaphore-chan never gets a permit, then the QP should never run
(expect
{:result ::timed-out, :permit-closed? false}
{:result ::tu.async/timed-out, :permit-closed? false}
(async-wait
(fn [{:keys [result-chan]}]
(let [permit (permit)]
{:result (wait-for-result result-chan)
:permit-closed? (= (wait-for-result (on-close-chan permit)) ::closed)}))))
(let [permit (tu.async/permit)]
{:result (tu.async/wait-for-result result-chan)
:permit-closed? (tu.async/permit-closed? permit)}))))
;; if canceled-chan gets a message before permit is provided, QP should never run
(expect
......@@ -84,12 +57,12 @@
:permit-closed? false}
(async-wait
(fn [{:keys [result-chan semaphore-chan canceled-chan]}]
(let [permit (permit)]
(let [permit (tu.async/permit)]
(a/go
(a/<! (a/timeout 10))
(a/>! canceled-chan :canceled)
(a/<! (a/timeout 100))
(a/>! semaphore-chan permit))
{:result (wait-for-result result-chan)
:permit-taken? (= (wait-for-result semaphore-chan) ::timed-out)
:permit-closed? (= (wait-for-result (on-close-chan permit)) ::closed)}))))
{:result (tu.async/wait-for-result result-chan)
:permit-taken? (= (tu.async/wait-for-result semaphore-chan) ::tu.async/timed-out)
:permit-closed? (tu.async/permit-closed? permit)}))))
(ns metabase.query-processor-test.query-cancellation-test
"TODO - This is sql-jdbc specific, so it should go in a sql-jdbc test namespace."
(:require [clojure.java.jdbc :as jdbc]
[clojure.string :as str]
[expectations :refer [expect]]
[metabase.test.util :as tu]
[metabase.test.util.log :as tu.log]))
......@@ -12,35 +13,30 @@
(close [_] true))
(defn- make-fake-prep-stmt
"Returns `fake-value` whenenver the `sql` parameter returns a truthy value when passed to `use-fake-value?`"
[orig-make-prep-stmt use-fake-value? faked-value]
"Returns `fake-value` whenenver the `sql` parameter returns a truthy value when passed to `use-fake-value?`."
[orig-make-prep-stmt & {:keys [use-fake-value? faked-value]
:or {use-fake-value? (constantly false)}}]
(fn [connection sql options]
(if (use-fake-value? sql)
faked-value
(orig-make-prep-stmt connection sql options))))
(defn- fake-query
"Function to replace the `clojure.java.jdbc/query` function. Will invoke `call-on-query`, then `call-to-pause` whe
passed an instance of `FakePreparedStatement`"
[orig-jdbc-query call-on-query call-to-pause]
"Function to replace the `clojure.java.jdbc/query` function. Will invoke `on-fake-prepared-statement` when passed an
instance of `FakePreparedStatement`."
{:style/indent 1}
[orig-jdbc-query & {:keys [on-fake-prepared-statement]}]
(fn
([conn stmt+params]
(orig-jdbc-query conn stmt+params))
([conn stmt+params opts]
(if (instance? FakePreparedStatement (first stmt+params))
(do
(call-on-query)
(call-to-pause))
(when on-fake-prepared-statement (on-fake-prepared-statement))
(orig-jdbc-query conn stmt+params opts)))))
(expect
[false ;; Ensure the query promise hasn't fired yet
false ;; Ensure the cancellation promise hasn't fired yet
true ;; Was query called?
false ;; Cancel should not have been called yet
true ;; Cancel should have been called now
true ;; The paused query can proceed now
]
::tu/success
;; this might dump messages about the connection being closed; we don't need to worry about that
(tu.log/suppress-output
(tu/call-with-paused-query
......@@ -54,6 +50,16 @@
orig-jdbc-query jdbc/query
orig-prep-stmt jdbc/prepare-statement]
(future
(with-redefs [jdbc/prepare-statement (make-fake-prep-stmt orig-prep-stmt (fn [table-name] (re-find #"CHECKINS" table-name)) fake-prep-stmt)
jdbc/query (fake-query orig-jdbc-query #(deliver called-query? true) #(deref pause-query))]
(query-thunk))))))))
(try
(with-redefs [jdbc/prepare-statement (make-fake-prep-stmt
orig-prep-stmt
:use-fake-value? (fn [sql] (re-find #"checkins" (str/lower-case sql)))
:faked-value fake-prep-stmt)
jdbc/query (fake-query orig-jdbc-query
:on-fake-prepared-statement
(fn []
(deliver called-query? true)
(deref pause-query)))]
(query-thunk))
(catch Throwable e
(throw e)))))))))
......@@ -38,7 +38,8 @@
[schema.core :as s]
[toucan.db :as db]
[toucan.util.test :as test])
(:import org.apache.log4j.Logger
(:import java.util.concurrent.TimeoutException
org.apache.log4j.Logger
[org.quartz CronTrigger JobDetail JobKey Scheduler Trigger]))
;; record type for testing that results match a Schema
......@@ -574,48 +575,47 @@
[model-seq & body]
`(do-with-model-cleanup ~model-seq (fn [] ~@body)))
;; TODO - not 100% sure I understand
(defn call-with-paused-query
"This is a function to make testing query cancellation eaiser as it can be complex handling the multiple threads
needed to orchestrate a query cancellation.
This function takes `f` which is a function of 4 arguments:
- query-thunk - no-arg function that will invoke a query
- query promise - promise used to validate the query function was called
- cancel promise - promise used to validate a cancellation function was called
- pause query promise - promise used to hang the query function, allowing cancellation
This function returns a vector of booleans indicating the various statuses of the promises, useful for comparison
in an `expect`"
- query-thunk - No-arg function that will invoke a query.
- query promise - Promise used to validate the query function was called. Deliver something to this once the
query has started running
- cancel promise - Promise used to validate a cancellation function was called. Deliver something to this once
the query was successfully canceled.
- pause query promise - Promise used to hang the query function, allowing cancellation. Wait for this to be
delivered to hang the query.
`f` should return a future that can be canceled."
[f]
(data/with-db (data/get-or-create-database! defs/test-data)
(let [called-cancel? (promise)
called-query? (promise)
pause-query (promise)
before-query-called-cancel (realized? called-cancel?)
before-query-called-query (realized? called-query?)
query-thunk (fn [] (data/run-mbql-query checkins
{:aggregation [[:count]]}))
;; When the query is ran via the datasets endpoint, it will run in a future. That future can be cancelled,
(let [called-cancel? (promise)
called-query? (promise)
pause-query (promise)
query-thunk (fn []
(data/run-mbql-query checkins
{:aggregation [[:count]]}))
;; When the query is ran via the datasets endpoint, it will run in a future. That future can be canceled,
;; which should cause an interrupt
query-future (f query-thunk called-query? called-cancel? pause-query)]
;; Make sure that we start out with our promises not having a value
[before-query-called-cancel
before-query-called-query
;; The cancelled-query? and called-cancel? timeouts are very high and are really just intended to
;; prevent the test from hanging indefinitely. It shouldn't be hit unless something is really wrong
(deref called-query? 120000 ::query-never-called)
;; At this point in time, the query is blocked, waiting for `pause-query` do be delivered
(realized? called-cancel?)
(do
;; If we cancel the future, it should throw an InterruptedException, which should call the cancel
;; method on the prepared statement
(future-cancel query-future)
(deref called-cancel? 120000 ::cancel-never-called))
(do
;; This releases the fake query function so it finishes
(deliver pause-query true)
true)])))
query-future (f query-thunk called-query? called-cancel? pause-query)]
;; The cancelled-query? and called-cancel? timeouts are very high and are really just intended to
;; prevent the test from hanging indefinitely. It shouldn't be hit unless something is really wrong
(when (= ::query-never-called (deref called-query? 10000 ::query-never-called))
(throw (TimeoutException. "query should have been called by now.")))
;; At this point in time, the query is blocked, waiting for `pause-query` do be delivered. Cancel still should
;; not have been called yet.
(assert (not (realized? called-cancel?)) "cancel still should not have been called yet.")
;; If we cancel the future, it should throw an InterruptedException, which should call the cancel
;; method on the prepared statement
(future-cancel query-future)
(when (= ::cancel-never-called (deref called-cancel? 10000 ::cancel-never-called))
(throw (TimeoutException. "cancel should have been called by now.")))
;; This releases the fake query function so it finishes
(deliver pause-query true)
::success)))
(defmacro throw-if-called
"Redefines `fn-var` with a function that throws an exception if it's called"
......
(ns metabase.test.util.async
(:require [clojure.core.async :as a])
(:import java.util.concurrent.TimeoutException))
(:import java.io.Closeable
java.util.concurrent.TimeoutException))
(defn wait-for-close
"Wait up to `timeout-ms` for `chan` to be closed, and returns `true` once it is; otherwise throws an Exception if
......@@ -51,3 +52,40 @@
~(if (seq more)
`(with-chans ~more ~@body)
`(do ~@body))))
(defn wait-for-result
"Wait up to `timeout-ms` (default 200) for a result from `chan`, or return a `::timed-out` message."
([chan]
(wait-for-result chan 200))
([chan timeout-ms]
(try
(let [[val port] (a/alts!! [chan (a/timeout timeout-ms)])]
(if (not= port chan)
::timed-out
val))
(finally
(a/close! chan)))))
(defprotocol NotifyClosed
(on-close-chan [this]
"Returns a channel that will get a `::closed` message when `.close` is called on the object."))
(defn permit
"Return a mocked permit object to pass to a mocked semaphore channel. You can check whether this was closed correctly
using `on-close-chan` above, or with `permit-closed?` below."
[]
(let [closed-chan (a/promise-chan)]
(reify
Closeable
(close [_]
(a/>!! closed-chan ::closed)
(a/close! closed-chan))
NotifyClosed
(on-close-chan [_]
closed-chan))))
(defn permit-closed?
"Wait up to 200 ms for mocked semaphore channel permit to be closed; returns true if closed or false otherwise."
[mocked-permit]
(= (wait-for-result (on-close-chan mocked-permit))
::closed))
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment