diff --git a/modules/drivers/druid/test/metabase/driver/druid_test.clj b/modules/drivers/druid/test/metabase/driver/druid_test.clj index feb83579ddb26cbb7767629b2e518f2cf534c207..f037bc3b58b5c52f3cfc03362641a70acd6fb46a 100644 --- a/modules/drivers/druid/test/metabase/driver/druid_test.clj +++ b/modules/drivers/druid/test/metabase/driver/druid_test.clj @@ -343,21 +343,17 @@ ;; Query cancellation test, needs careful coordination between the query thread, cancellation thread to ensure ;; everything works correctly together (datasets/expect-with-driver :druid - [false ;; Ensure the query promise hasn't fired yet - false ;; Ensure the cancellation promise hasn't fired yet - true ;; Was query called? - false ;; Cancel should not have been called yet - true ;; Cancel should have been called now - true ;; The paused query can proceed now - ] + ::tu/success (tu/call-with-paused-query (fn [query-thunk called-query? called-cancel? pause-query] (future - ;; stub out the query and delete functions so that we know when one is called vs. the other - (with-redefs [druid/do-query (fn [details query] (deliver called-query? true) @pause-query) - druid/DELETE (fn [url] (deliver called-cancel? true))] - (data/run-mbql-query checkins - {:aggregation [[:count]]})))))) + (try + ;; stub out the query and delete functions so that we know when one is called vs. the other + (with-redefs [druid/do-query (fn [details query] (deliver called-query? true) @pause-query) + druid/DELETE (fn [url] (deliver called-cancel? 
true))] + (query-thunk)) + (catch Throwable e + (println "Error running query:" e))))))) ;; Make sure Druid cols + columns come back in the same order and that that order is the expected MBQL columns order ;; (#9294) diff --git a/src/metabase/async/semaphore_channel.clj b/src/metabase/async/semaphore_channel.clj index 30a425a2cd8a6e4acbcc9a63401d853064125a38..af6a6907fdf49c8fe0efd375adf64b5adc184bc7 100644 --- a/src/metabase/async/semaphore_channel.clj +++ b/src/metabase/async/semaphore_channel.clj @@ -71,26 +71,27 @@ second permit if this thread already has one." {}) -(s/defn ^:private do-f-with-permit - "Once a `permit` is obtained, execute `(apply f args)`, writing the results to `output-chan`, and returning the permit - no matter what." - [permit :- Closeable, out-chan :- async.u/PromiseChan, f & args] - (try - (let [canceled-chan (async.u/single-value-pipe (apply async.u/do-on-separate-thread f args) out-chan)] - ;; canceled-chan will be called whenever `out-chan` is closed, either because `single-value-pipe` closes it when - ;; it gets a result from `do-on-separate-thread`, or because someone else closed it earlier (i.e. canceled API - ;; request). When this happens return the permit +(s/defn ^:private do-with-existing-permit-for-current-thread :- (s/maybe async.u/PromiseChan) + [semaphore-chan f & args] + (when (get *permits* semaphore-chan) + (log/debug (trs "Current thread already has a permit, will not wait to acquire another")) + (apply async.u/do-on-separate-thread f args))) + +(s/defn ^:private do-with-permit :- async.u/PromiseChan + [semaphore-chan, permit :- Closeable, f & args] + (binding [*permits* (assoc *permits* semaphore-chan permit)] + (let [out-chan (apply async.u/do-on-separate-thread f args)] + ;; whenever `out-chan` closes return the permit (a/go - (if (a/<! 
canceled-chan) - (log/debug (trs "request canceled, permit will be returned")) - (log/debug (trs "f finished, permit will be returned"))) - (.close permit))) - (catch Throwable e - (log/error e (trs "Unexpected error attempting to run function after obtaining permit")) - (a/>! out-chan e) - (a/close! out-chan) - (.close permit))) - nil) + (a/<! out-chan) + (.close permit)) + out-chan))) + +(s/defn ^:private do-with-immediately-available-permit :- (s/maybe async.u/PromiseChan) + [semaphore-chan f & args] + (when-let [^Closeable permit (a/poll! semaphore-chan)] + (log/debug (trs "Permit available without waiting, will run fn immediately")) + (apply do-with-permit semaphore-chan permit f args))) (s/defn ^:private do-after-waiting-for-new-permit :- async.u/PromiseChan [semaphore-chan f & args] @@ -98,18 +99,21 @@ ;; fire off a go block to wait for a permit. (a/go (let [[permit first-done] (a/alts! [semaphore-chan out-chan])] - ;; If out-chan closes before we get a permit, there's nothing for us to do here. Otherwise if we got our - ;; permit then proceed - (condp = first-done - out-chan + (cond + ;; If out-chan closes before we get a permit, there's nothing for us to do here. + (= first-done out-chan) (log/debug (trs "Not running pending function call: output channel already closed.")) - ;; otherwise if channel is still open run the function - semaphore-chan - (if-not permit + ;; if `semaphore-chan` is closed for one reason or another we'll never get a permit so log a warning and + ;; close the output channel + (not permit) + (do (log/warn (trs "Warning: semaphore-channel is closed, will not run pending function call")) - (binding [*permits* (assoc *permits* semaphore-chan permit)] - (apply do-f-with-permit permit out-chan f args)))))) + (a/close! out-chan)) + + ;; otherwise we got a permit and can run f with it now. 
+ permit + (async.u/single-value-pipe (apply do-with-permit semaphore-chan permit f args) out-chan)))) ;; return `out-chan` which can be used to wait for results out-chan)) @@ -118,11 +122,7 @@ can fetch the results. Closing this channel before results are produced will cancel the function call." {:style/indent 1} [semaphore-chan f & args] - ;; check and see whether we already have a permit for `semaphore-chan`, if so, go ahead and run the function right - ;; away instead of waiting for *another* permit - (if (get *permits* semaphore-chan) - (do - (log/debug (trs "Current thread already has a permit for {0}, will not wait to acquire another" semaphore-chan)) - (apply async.u/do-on-separate-thread f args)) - ;; otherwise wait for a permit - (apply do-after-waiting-for-new-permit semaphore-chan f args))) + (some #(apply % semaphore-chan f args) + [do-with-existing-permit-for-current-thread + do-with-immediately-available-permit + do-after-waiting-for-new-permit])) diff --git a/src/metabase/async/util.clj b/src/metabase/async/util.clj index 5766babaac0be749f9ed082df2e7c15a0e417167..5e8e8096ecbc4bdbc765b4b85069cdacf056de33 100644 --- a/src/metabase/async/util.clj +++ b/src/metabase/async/util.clj @@ -5,7 +5,8 @@ [metabase.util.i18n :refer [trs]] [schema.core :as s]) (:import clojure.core.async.impl.buffers.PromiseBuffer - clojure.core.async.impl.channels.ManyToManyChannel)) + clojure.core.async.impl.channels.ManyToManyChannel + java.util.concurrent.Future)) (defn promise-chan? "Is core.async `chan` a `promise-chan`?" @@ -58,6 +59,31 @@ ;; return the canceled chan in case someone wants to listen to it canceled-chan)) +(s/defn ^:private do-on-separate-thread* :- Future + [out-chan canceled-chan f & args] + (future + (try + (if (a/poll! canceled-chan) + (log/debug (trs "Output channel closed, will skip running {0}." f)) + (do + (log/debug (trs "Running {0} on separate thread..." f)) + (try + (let [result (apply f args)] + (cond + (nil? 
result) + (log/warn "Warning: {0} returned `nil`" f) + + (not (a/>!! out-chan result)) + (log/error (trs "Unexpected error writing result to output channel: already closed")))) + ;; if we catch an Exception (shouldn't happen in a QP query, but just in case), send it to `chan`. + ;; It's ok, our IMPL of Ring `StreamableResponseBody` will do the right thing with it. + (catch Throwable e + (log/error e (trs "Caught error running {0}" f)) + (when-not (a/>!! out-chan e) + (log/error (trs "Unexpected error writing exception to output channel: already closed"))))))) + (finally + (a/close! out-chan))))) + (s/defn do-on-separate-thread :- PromiseChan "Run `(apply f args)` on a separate thread, returns a channel to fetch the results. Closing this channel early will cancel the future running the function, if possible." @@ -66,28 +92,10 @@ canceled-chan (promise-canceled-chan out-chan) ;; Run `f` on a separarate thread because it's a potentially long-running QP query and we don't want to tie ;; up precious core.async threads - futur - (future - (try - (if (a/poll! canceled-chan) - (log/debug (trs "Output channel closed, will skip running {0}." f)) - (do - (log/debug (trs "Running {0} on separate thread..." f)) - (try - (let [result (apply f args)] - (if (some? result) - (a/put! out-chan result) - (log/warn "Warning: {0} returned `nil`" f))) - ;; if we catch an Exception (shouldn't happen in a QP query, but just in case), send it to `chan`. - ;; It's ok, our IMPL of Ring `StreamableResponseBody` will do the right thing with it. - (catch Throwable e - (log/error e (trs "Caught error running {0}" f)) - (a/put! out-chan e))))) - (finally - (a/close! out-chan))))] + futur (apply do-on-separate-thread* out-chan canceled-chan f args)] + ;; if output chan is closed early cancel the future (a/go (when (a/<! 
canceled-chan) - (log/debug (trs "Request canceled, canceling future")) + (log/debug (trs "Request canceled, canceling future.")) (future-cancel futur))) - out-chan)) diff --git a/src/metabase/driver.clj b/src/metabase/driver.clj index 0d1b1d9279172bd98dc5987c527e99f955fa3aaf..76446b02df091eb3cbee5fd94690abef0aa43b3a 100644 --- a/src/metabase/driver.clj +++ b/src/metabase/driver.clj @@ -16,7 +16,8 @@ [i18n :refer [trs tru]] [schema :as su]] [schema.core :as s] - [toucan.db :as db])) + [toucan.db :as db]) + (:import org.joda.time.DateTime)) (declare notify-database-updated) @@ -672,9 +673,9 @@ dispatch-on-initialized-driver :hierarchy #'hierarchy) -(defmulti current-db-time +(defmulti ^DateTime current-db-time "Return the current time and timezone from the perspective of `database`. You can use - `metabase.driver.common/current-db-time` to implement this." + `metabase.driver.common/current-db-time` to implement this. This should return a Joda-Time `DateTime`." {:arglists '([driver database])} dispatch-on-initialized-driver :hierarchy #'hierarchy) diff --git a/src/metabase/driver/common.clj b/src/metabase/driver/common.clj index f708334f4ccea18de6fbb004a6a4e826b2c493d0..bc69c290647ddd8031dfe3cd042dc3ccd4ad9aa0 100644 --- a/src/metabase/driver/common.clj +++ b/src/metabase/driver/common.clj @@ -108,7 +108,9 @@ ;;; +----------------------------------------------------------------------------------------------------------------+ (defprotocol ^:private ParseDateTimeString - (^:private parse [this date-time-str] "Parse the `date-time-str` and return a `DateTime` instance")) + (^:private parse + ^DateTime [this date-time-str] + "Parse the `date-time-str` and return a `DateTime` instance.")) (extend-protocol ParseDateTimeString DateTimeFormatter @@ -138,7 +140,7 @@ (defn- first-successful-parse "Attempt to parse `time-str` with each of `date-formatters`, returning the first successful parse. 
If there are no successful parses throws the exception that the last formatter threw." - [date-formatters time-str] + ^DateTime [date-formatters time-str] (or (some #(u/ignore-exceptions (parse % time-str)) date-formatters) (doseq [formatter (reverse date-formatters)] (parse formatter time-str)))) @@ -163,7 +165,7 @@ `current-db-time-date-formatters` multimethods defined above. Execute a native query for the current time, and parse the results using the date formatters, preserving the timezone. To use this implementation, you must implement the aforementioned multimethods; no default implementation is provided." - [driver database] + ^DateTime [driver database] {:pre [(map? database)]} (let [native-query (current-db-time-native-query driver) date-formatters (current-db-time-date-formatters driver) diff --git a/src/metabase/query_processor.clj b/src/metabase/query_processor.clj index 49592cd073f27341892e014d3cc41232854f5aa1..ee449bd29811290b6037a625e045b01dfea4ffed 100644 --- a/src/metabase/query_processor.clj +++ b/src/metabase/query_processor.clj @@ -134,7 +134,7 @@ resolve-database/resolve-database fetch-source-query/fetch-source-query store/initialize-store - log-query/log-initial-query + log-query/log-query ;; ▲▲▲ SYNC MIDDLEWARE ▲▲▲ ;; ;; All middleware above this point is written in the synchronous 1-arg style. All middleware below is written in diff --git a/src/metabase/query_processor/middleware/add_row_count_and_status.clj b/src/metabase/query_processor/middleware/add_row_count_and_status.clj index c350ed3b607b5a245f83a6c411a157bd6685cc8c..a34787861cf518a7c81080e9212b2400af36f54c 100644 --- a/src/metabase/query_processor/middleware/add_row_count_and_status.clj +++ b/src/metabase/query_processor/middleware/add_row_count_and_status.clj @@ -5,7 +5,8 @@ [util :as qputil]])) (defn add-row-count-and-status - "Wrap the results of a successfully processed query in the format expected by the frontend (add `row_count` and `status`)." 
+ "Wrap the results of a successfully processed query in the format expected by the frontend (add `row_count` and + `status`)." [qp] (fn [{{:keys [max-results max-results-bare-rows]} :constraints, :as query}] (let [results-limit (or (when (qputil/query-without-aggregations-or-limits? query) diff --git a/src/metabase/query_processor/middleware/async.clj b/src/metabase/query_processor/middleware/async.clj index a2c759e264e429007b34bd83f47364b1729f3ce6..65357e75c41cef20a20063b13589b40b97f7e00a 100644 --- a/src/metabase/query_processor/middleware/async.clj +++ b/src/metabase/query_processor/middleware/async.clj @@ -25,6 +25,40 @@ ;;; | async-setup | ;;; +----------------------------------------------------------------------------------------------------------------+ +(defn- respond-fn [out-chan canceled-chan] + (fn [result] + (try + ;; out-chan might already be closed if query was canceled. NBD if that's the case + (a/>!! out-chan (if (nil? result) + (Exception. (str (trs "Unexpectedly got `nil` Query Processor response."))) + result)) + (finally + (a/close! out-chan))))) + +(defn- raise-fn [out-chan respond] + (fn [e] + (if (instance? InterruptedException e) + (do + (log/debug (trs "Got InterruptedException. Canceling query.")) + (a/close! out-chan)) + (do + (log/warn e (trs "Unhandled exception, exepected `catch-exceptions` middleware to handle it.")) + (respond e))))) + +(defn- async-args [] + (let [out-chan (a/promise-chan) + canceled-chan (async.u/promise-canceled-chan out-chan) + respond (respond-fn out-chan canceled-chan) + raise (raise-fn out-chan respond)] + {:out-chan out-chan, :canceled-chan canceled-chan, :respond respond, :raise raise})) + +(defn- wait-for-result [out-chan] + ;; TODO - there should probably be some sort of max timeout here for out-chan. At least for test/dev purposes + (let [result (a/<!! out-chan)] + (if (instance? 
Throwable result) + (throw result) + result))) + (defn async-setup "Middleware that creates the output/canceled channels for the asynchronous (4-arg) QP middleware and runs it. @@ -35,24 +69,12 @@ closes." [qp] (fn [{:keys [async?], :as query}] - (let [out-chan (a/promise-chan) - canceled-chan (async.u/promise-canceled-chan out-chan) - respond (fn [result] - (if (some? result) - (a/>!! out-chan result) - (log/warn (trs "Warning: `respond` as passed `nil`."))) - (a/close! out-chan)) - raise (fn [e] - (log/warn e (trs "Unhandled exception, exepected `catch-exceptions` middleware to handle it")) - (respond e))] + (let [{:keys [out-chan respond raise canceled-chan]} (async-args)] (try (qp query respond raise canceled-chan) + ;; if query is `async?` return the output channel; otherwise block until output channel returns a result + (if async? + out-chan + (wait-for-result out-chan)) (catch Throwable e - (raise e))) - ;; if query is `async?` return the output channel; otherwise block until output channel returns a result - (if async? - out-chan - (let [result (a/<!! out-chan)] - (if (instance? Throwable result) - (throw result) - result)))))) + (raise e)))))) diff --git a/src/metabase/query_processor/middleware/catch_exceptions.clj b/src/metabase/query_processor/middleware/catch_exceptions.clj index 64ed62457e0a411403170c34bd31cb618fd782c5..dcfb048531a1feabd473286e7ccb2e78056c2661 100644 --- a/src/metabase/query_processor/middleware/catch_exceptions.clj +++ b/src/metabase/query_processor/middleware/catch_exceptions.clj @@ -85,11 +85,15 @@ (defn catch-exceptions "Middleware for catching exceptions thrown by the query processor and returning them in a normal format." 
[qp] - ;; we're not using the version of `raise` passed in on purpose here -- that one is a placeholder -- this is the - ;; implementation of `raise` we expect most QP middleware to ultimately use - (fn [query respond _ canceled-chan] + ;; we're swapping out the top-level exception handler (`raise` fn) created by the `async-setup` middleware with one + ;; that will format the Exceptions and pipe them thru as normal QP 'failure' responses. For InterruptedExceptions + ;; however (caused when the query is canceled) pipe all the way thru to the top-level handler so it can close out + ;; the output channel instead of writing a response to it, which will cause the cancelation message we're looking for + (fn [query respond top-level-raise canceled-chan] (let [raise (fn [e] - (respond (format-exception query e)))] + (if (instance? InterruptedException e) + (top-level-raise e) + (respond (format-exception query e))))] (try (qp query respond raise canceled-chan) (catch Throwable e diff --git a/src/metabase/query_processor/middleware/log.clj b/src/metabase/query_processor/middleware/log.clj index 4cd47196f490e99a307886016b1ba6bd0e99a5a5..3a0ba913f3a0e1c2d41ae5ba6fc898755894bb32 100644 --- a/src/metabase/query_processor/middleware/log.clj +++ b/src/metabase/query_processor/middleware/log.clj @@ -5,13 +5,13 @@ [metabase.query-processor.interface :as i] [metabase.util :as u])) -(defn- log-initial-query* [query] +(defn- log-query* [query] (u/prog1 query (when-not i/*disable-qp-logging* (log/debug (u/format-color 'blue "\nQUERY: %s\n%s" (u/emoji "😎") (u/pprint-to-str query)))))) -(defn log-initial-query - "Middleware for logging a query when it is very first encountered, before it is expanded." +(defn log-query + "Middleware that logs the query that will be ran." 
[qp] - (comp qp log-initial-query*)) + (comp qp log-query*)) diff --git a/src/metabase/query_processor/middleware/process_userland_query.clj b/src/metabase/query_processor/middleware/process_userland_query.clj index 18b770d2cf471dd4bc38d5999a11ae7f7a66d3df..7f8af0db2e772e69f09dc3259ee14dbb22e2029b 100644 --- a/src/metabase/query_processor/middleware/process_userland_query.clj +++ b/src/metabase/query_processor/middleware/process_userland_query.clj @@ -79,28 +79,41 @@ (save-successful-query-execution! query-execution result) (success-response query-execution result)) -(defn- fail [query-execution message result] - (save-failed-query-execution! query-execution message) - (failure-response query-execution message result)) +(defn- fail [query-execution result] + (let [message (get result :error (tru "Unknown error"))] + (save-failed-query-execution! query-execution message) + (failure-response query-execution message result))) (defn- format-userland-query-result - "Make sure `query-result` `:status` is something other than `nil`or `:failed`, or throw an Exception." - [query-execution {:keys [status], :as result}] + "Format QP response in the format expected by the frontend client, and save a QueryExecution entry." + [respond raise query-execution {:keys [status], :as result}] (cond + ;; if the result itself is invalid there's something wrong in the QP -- not just with the query. Pass an + ;; Exception up to the top-level handler; this is basically a 500 situation + (nil? result) + (raise (Exception. (str (trs "Unexpected nil response from query processor.")))) + (not status) - (fail query-execution (tru "Invalid response from database driver. No :status provided.") result) + (raise (Exception. (str (tru "Invalid response from database driver. No :status provided.") result))) + ;; if query has been cancelled no need to save QueryExecution (or should we?) and no point formatting anything to + ;; be returned since it won't be returned (and (= status :failed) (instance? 
InterruptedException (:class result))) - (log/info (trs "Query canceled")) + (do + (log/info (trs "Query canceled")) + (respond {:status :interrupted})) + ;; 'Normal' query failures are usually caused by invalid queries -- equivalent of a HTTP 400. Save QueryExecution + ;; & return a "status = failed" response (= status :failed) (do (log/warn (trs "Query failure") (u/pprint-to-str 'red result)) - (fail query-execution (get result :error (tru "Unknown error")) result)) + (respond (fail query-execution result))) + ;; Successful query (~= HTTP 200): save QueryExecution & return "status = completed" response (= status :completed) - (succeed query-execution result))) + (respond (succeed query-execution result)))) ;;; +----------------------------------------------------------------------------------------------------------------+ @@ -138,5 +151,5 @@ (qp query respond raise canceled-chan) ;; add calculated hash to query (let [query (assoc-in query [:info :query-hash] (qputil/query-hash query)) - respond (comp respond (partial format-userland-query-result (query-execution-info query)))] + respond (partial format-userland-query-result respond raise (query-execution-info query))] (qp query respond raise canceled-chan))))) diff --git a/src/metabase/util.clj b/src/metabase/util.clj index c7189fec10906c620ed64c3fbed423699995694f..c3982d42aea7db6fcf74824abbd8f141f13a7603 100644 --- a/src/metabase/util.clj +++ b/src/metabase/util.clj @@ -321,11 +321,13 @@ frames-before-last-mb))))}) (defn deref-with-timeout - "Call `deref` on a FUTURE and throw an exception if it takes more than TIMEOUT-MS." - [futur timeout-ms] - (let [result (deref futur timeout-ms ::timeout)] + "Call `deref` on a something derefable (e.g. a future or promise), and throw an exception if it takes more than + `timeout-ms`. If `ref` is a future it will attempt to cancel it as well." 
+ [reff timeout-ms] + (let [result (deref reff timeout-ms ::timeout)] (when (= result ::timeout) - (future-cancel futur) + (when (instance? java.util.concurrent.Future reff) + (future-cancel reff)) (throw (TimeoutException. (str (tru "Timed out after {0} milliseconds." timeout-ms))))) result)) diff --git a/test/metabase/api/dataset_test.clj b/test/metabase/api/dataset_test.clj index a9ef98dbdd323af89e80cb7ec13807cdf43b08d5..dc426351eed9c8e0936bbb439617731509a56931 100644 --- a/test/metabase/api/dataset_test.clj +++ b/test/metabase/api/dataset_test.clj @@ -41,7 +41,8 @@ (def ^:private query-defaults {:constraints constraints/default-query-constraints - :middleware {:add-default-userland-constraints? true, :userland-query? true}}) + :middleware {:add-default-userland-constraints? true, :userland-query? true} + :async? true}) ;;; ## POST /api/meta/dataset ;; Just a basic sanity check to make sure Query Processor endpoint is still working correctly. diff --git a/test/metabase/async/semaphore_channel_test.clj b/test/metabase/async/semaphore_channel_test.clj index b9717832ef5dbf6bcb6b85c48ede480e813bf188..d500362848282e1a0799d596d15e5d9529b14629 100644 --- a/test/metabase/async/semaphore_channel_test.clj +++ b/test/metabase/async/semaphore_channel_test.clj @@ -51,7 +51,7 @@ (expect {:first-permit "Permit #1", :second-permit "Permit #1", :same? true} (tu.async/with-open-channels [semaphore-chan (semaphore-channel/semaphore-channel 1) - output-chan (a/chan 1)] + output-chan (a/promise-chan)] (let [existing-permit #(get @#'semaphore-channel/*permits* semaphore-chan)] (semaphore-channel/do-after-receiving-permit semaphore-chan (fn [] @@ -62,49 +62,108 @@ (a/>!! output-chan {:first-permit (str first-permit) :second-permit (str second-permit) :same? (identical? first-permit second-permit)})))))))) - (first (a/alts!! 
[output-chan (a/timeout 100)])))) + (tu.async/wait-for-result output-chan))) -;; Make sure `do-f-with-permit` returns the permit when functions finish normally +;; Make sure `do-with-permit` returns the permit when functions finish normally (expect {:permit-returned? true, :result ::value} - (let [open? (atom false) - permit (reify - Closeable - (close [this] - (reset! open? false)))] - (tu.async/with-open-channels [output-chan (a/chan 1)] - (#'semaphore-channel/do-f-with-permit permit output-chan (constantly ::value)) - (let [[result] (a/alts!! [output-chan (a/timeout 100)])] - {:permit-returned? (not @open?), :result result})))) + (let [permit (tu.async/permit)] + (tu.async/with-open-channels [semaphore-chan (a/chan 1) + output-chan (#'semaphore-channel/do-with-permit + semaphore-chan + permit + (constantly ::value))] + {:permit-returned? (tu.async/permit-closed? permit) + :result (tu.async/wait-for-result output-chan)}))) ;; If `f` throws an Exception, `permit` should get returned, and Exception should get returned as the result (expect {:permit-returned? true, :result "FAIL"} - (let [open? (atom false) - permit (reify - Closeable - (close [this] - (reset! open? false)))] - (tu.async/with-open-channels [output-chan (a/chan 1)] - (#'semaphore-channel/do-f-with-permit permit output-chan (fn [] - (throw (Exception. "FAIL")))) - (let [[result] (a/alts!! [output-chan (a/timeout 100)])] - {:permit-returned? (not @open?), :result (when (instance? Exception result) - (.getMessage ^Exception result))})))) + (let [permit (tu.async/permit)] + (tu.async/with-open-channels [semaphore-chan (a/chan 1) + output-chan (#'semaphore-channel/do-with-permit + semaphore-chan + permit + (fn [] (throw (Exception. "FAIL"))))] + {:permit-returned? (tu.async/permit-closed? permit) + :result (let [result (tu.async/wait-for-result output-chan)] + (if (instance? 
Throwable result) + (.getMessage ^Throwable result) + result))}))) ;; If `output-chan` is closed early, permit should still get returned, but there's nowhere to write the result to so ;; it should be `nil` (expect {:permit-returned? true, :result nil} - (let [open? (atom false) - permit (reify - Closeable - (close [this] - (reset! open? false)))] - (tu.async/with-open-channels [output-chan (a/chan 1)] - (#'semaphore-channel/do-f-with-permit permit output-chan (fn [] - (Thread/sleep 100) - ::value)) + (let [permit (tu.async/permit)] + (tu.async/with-open-channels [semaphore-chan (a/chan 1) + output-chan (#'semaphore-channel/do-with-permit + semaphore-chan + permit + (fn [] + (Thread/sleep 100) + ::value))] (a/close! output-chan) - (let [[result] (a/alts!! [output-chan (a/timeout 500)])] - {:permit-returned? (not @open?), :result result})))) + {:permit-returned? (tu.async/permit-closed? permit) + :result (tu.async/wait-for-result output-chan)}))) + + +;;; +----------------------------------------------------------------------------------------------------------------+ +;;; | Tests for the new 0.32.5 optimizations that avoid async waits when permits are immediately available | +;;; +----------------------------------------------------------------------------------------------------------------+ + +;; there are basically 3 strategies that can be used by `do-after-receiving-permit` +;; 1) run immediately, because permit is already present +;; 2) run immediately, because permit is immediately available +;; 3) run after waiting for permit + +;; to check that everything works correctly in a few different scenarios, rather than write 3 x n tests, largely +;; repeating ourselves, we'll break things out into small functions that can be combined to pick + choose the +;; functionality to test with a given strategy. 
+ +(defn- do-semaphore-chan-fn [thunk-fn strategy-fn] + (tu.async/with-open-channels [semaphore-chan (a/chan 1)] + (strategy-fn semaphore-chan (thunk-fn (partial #'semaphore-channel/do-after-receiving-permit semaphore-chan))))) + +(defn- with-existing-permit [semaphore-chan thunk] + (binding [semaphore-channel/*permits* {semaphore-chan (tu.async/permit)}] + (thunk))) + +(defn- with-immediately-available-permit [semaphore-chan thunk] + (a/>!! semaphore-chan (tu.async/permit)) + (thunk)) + +(defn- after-waiting [semaphore-chan thunk] + (a/go + (a/<! (a/timeout 50)) + (a/>! semaphore-chan (tu.async/permit))) + (thunk)) + +;; test normal functions work correctly +(defn- normal-fn [do-f] + (fn [] + (tu.async/wait-for-result + (do-f (partial +) 1 2 3)))) + +(expect 6 (do-semaphore-chan-fn normal-fn with-existing-permit)) +(expect 6 (do-semaphore-chan-fn normal-fn with-immediately-available-permit)) +(expect 6 (do-semaphore-chan-fn normal-fn after-waiting)) + +;; Test that if output channel is closed, function gets interrupted +(defn- check-interrupted-fn [do-f] + (fn [] + (let [f (fn [chan] + (try + (Thread/sleep 1000) + (catch InterruptedException e + (a/>!! chan ::interrupted))))] + (tu.async/with-open-channels [interrupted-chan (a/promise-chan) + out-chan (do-f f interrupted-chan)] + (a/go + (a/<! (a/timeout 100)) + (a/close! 
out-chan)) + (tu.async/wait-for-result interrupted-chan 500))))) + +(expect ::interrupted (do-semaphore-chan-fn check-interrupted-fn with-existing-permit)) +(expect ::interrupted (do-semaphore-chan-fn check-interrupted-fn with-immediately-available-permit)) +(expect ::interrupted (do-semaphore-chan-fn check-interrupted-fn after-waiting)) diff --git a/test/metabase/query_processor/middleware/async_wait_test.clj b/test/metabase/query_processor/middleware/async_wait_test.clj index 9478a8e4665d0fc63dcab2c5d08a236c7b562057..cebefac0dd5fa941dd4c8871d542ff2db4aec486 100644 --- a/test/metabase/query_processor/middleware/async_wait_test.clj +++ b/test/metabase/query_processor/middleware/async_wait_test.clj @@ -2,8 +2,7 @@ (:require [clojure.core.async :as a] [expectations :refer [expect]] [metabase.query-processor.middleware.async-wait :as async-wait] - [metabase.test.util.async :as tu.async]) - (:import java.io.Closeable)) + [metabase.test.util.async :as tu.async])) (defn- async-wait "Mocked version of `async-wait/wait-for-permit` middleware. Runs `f` with 3 channels: @@ -26,56 +25,30 @@ ((async-wait/wait-for-permit qp) query respond respond canceled-chan)) (f {:result-chan result-chan, :semaphore-chan semaphore-chan, :canceled-chan canceled-chan}))))) -(defprotocol ^:private NotifyClosed - (^:private on-close-chan [this] - "Returns a channel that will get a `::closed` message when `.close` is called on the object.")) -(defn- permit - "Return a mocked permit object to passed to the mocked semaphore channel. You can check whether this was closed - correctly using `on-close-chan` above." - [] - (let [closed-chan (a/promise-chan)] - (reify - Closeable - (close [_] - (a/>!! closed-chan ::closed) - (a/close! closed-chan)) - NotifyClosed - (on-close-chan [_] - closed-chan)))) - -(defn- wait-for-result - "Wait up to `timeout-ms` (default 200) for a result from `chan`, or return a `::timed-out` message." 
- ([chan] - (wait-for-result chan 200)) - ([chan timeout-ms] - (let [[val port] (a/alts!! [chan (a/timeout timeout-ms)])] - (if (not= port chan) - ::timed-out - val)))) ;; QP should run if semaphore-chan gets a permit. Permit should be closed after QP finishes. (expect {:result ::result, :permit-taken? true, :permit-closed? true} (async-wait (fn [{:keys [semaphore-chan result-chan]}] - (let [permit (permit)] + (let [permit (tu.async/permit)] ;; provide a permit async after a short delay (a/go (a/<! (a/timeout 10)) (a/>! semaphore-chan permit)) - {:result (wait-for-result result-chan) - :permit-taken? (= (wait-for-result semaphore-chan) ::timed-out) - :permit-closed? (= (wait-for-result (on-close-chan permit)) ::closed)})))) + {:result (tu.async/wait-for-result result-chan) + :permit-taken? (= (tu.async/wait-for-result semaphore-chan) ::tu.async/timed-out) + :permit-closed? (tu.async/permit-closed? permit)})))) ;; If semaphore-chan never gets a permit, then the QP should never run (expect - {:result ::timed-out, :permit-closed? false} + {:result ::tu.async/timed-out, :permit-closed? false} (async-wait (fn [{:keys [result-chan]}] - (let [permit (permit)] - {:result (wait-for-result result-chan) - :permit-closed? (= (wait-for-result (on-close-chan permit)) ::closed)})))) + (let [permit (tu.async/permit)] + {:result (tu.async/wait-for-result result-chan) + :permit-closed? (tu.async/permit-closed? permit)})))) ;; if canceled-chan gets a message before permit is provided, QP should never run (expect @@ -84,12 +57,12 @@ :permit-closed? false} (async-wait (fn [{:keys [result-chan semaphore-chan canceled-chan]}] - (let [permit (permit)] + (let [permit (tu.async/permit)] (a/go (a/<! (a/timeout 10)) (a/>! canceled-chan :canceled) (a/<! (a/timeout 100)) (a/>! semaphore-chan permit)) - {:result (wait-for-result result-chan) - :permit-taken? (= (wait-for-result semaphore-chan) ::timed-out) - :permit-closed? 
(= (wait-for-result (on-close-chan permit)) ::closed)})))) + {:result (tu.async/wait-for-result result-chan) + :permit-taken? (= (tu.async/wait-for-result semaphore-chan) ::tu.async/timed-out) + :permit-closed? (tu.async/permit-closed? permit)})))) diff --git a/test/metabase/query_processor_test/query_cancellation_test.clj b/test/metabase/query_processor_test/query_cancellation_test.clj index bc146057b0bec4b6f3312b11cb4d7fb09433c5f2..415c5921bb1d0e5b819b7bb31de559bd83a86a6d 100644 --- a/test/metabase/query_processor_test/query_cancellation_test.clj +++ b/test/metabase/query_processor_test/query_cancellation_test.clj @@ -1,6 +1,7 @@ (ns metabase.query-processor-test.query-cancellation-test "TODO - This is sql-jdbc specific, so it should go in a sql-jdbc test namespace." (:require [clojure.java.jdbc :as jdbc] + [clojure.string :as str] [expectations :refer [expect]] [metabase.test.util :as tu] [metabase.test.util.log :as tu.log])) @@ -12,35 +13,30 @@ (close [_] true)) (defn- make-fake-prep-stmt - "Returns `fake-value` whenenver the `sql` parameter returns a truthy value when passed to `use-fake-value?`" - [orig-make-prep-stmt use-fake-value? faked-value] + "Returns `fake-value` whenenver the `sql` parameter returns a truthy value when passed to `use-fake-value?`." + [orig-make-prep-stmt & {:keys [use-fake-value? faked-value] + :or {use-fake-value? (constantly false)}}] (fn [connection sql options] (if (use-fake-value? sql) faked-value (orig-make-prep-stmt connection sql options)))) (defn- fake-query - "Function to replace the `clojure.java.jdbc/query` function. Will invoke `call-on-query`, then `call-to-pause` whe - passed an instance of `FakePreparedStatement`" - [orig-jdbc-query call-on-query call-to-pause] + "Function to replace the `clojure.java.jdbc/query` function. Will invoke `on-fake-prepared-statement` when passed an + instance of `FakePreparedStatement`." 
+ {:style/indent 1} + [orig-jdbc-query & {:keys [on-fake-prepared-statement]}] (fn ([conn stmt+params] (orig-jdbc-query conn stmt+params)) + ([conn stmt+params opts] (if (instance? FakePreparedStatement (first stmt+params)) - (do - (call-on-query) - (call-to-pause)) + (when on-fake-prepared-statement (on-fake-prepared-statement)) (orig-jdbc-query conn stmt+params opts))))) (expect - [false ;; Ensure the query promise hasn't fired yet - false ;; Ensure the cancellation promise hasn't fired yet - true ;; Was query called? - false ;; Cancel should not have been called yet - true ;; Cancel should have been called now - true ;; The paused query can proceed now - ] + ::tu/success ;; this might dump messages about the connection being closed; we don't need to worry about that (tu.log/suppress-output (tu/call-with-paused-query @@ -54,6 +50,16 @@ orig-jdbc-query jdbc/query orig-prep-stmt jdbc/prepare-statement] (future - (with-redefs [jdbc/prepare-statement (make-fake-prep-stmt orig-prep-stmt (fn [table-name] (re-find #"CHECKINS" table-name)) fake-prep-stmt) - jdbc/query (fake-query orig-jdbc-query #(deliver called-query? true) #(deref pause-query))] - (query-thunk)))))))) + (try + (with-redefs [jdbc/prepare-statement (make-fake-prep-stmt + orig-prep-stmt + :use-fake-value? (fn [sql] (re-find #"checkins" (str/lower-case sql))) + :faked-value fake-prep-stmt) + jdbc/query (fake-query orig-jdbc-query + :on-fake-prepared-statement + (fn [] + (deliver called-query? 
true) + (deref pause-query)))] + (query-thunk)) + (catch Throwable e + (throw e))))))))) diff --git a/test/metabase/test/util.clj b/test/metabase/test/util.clj index c25c43af3269491db83ad9289a5e72bc07a1d498..6038818e0e1b103e004281f839ac7eeb755ba898 100644 --- a/test/metabase/test/util.clj +++ b/test/metabase/test/util.clj @@ -38,7 +38,8 @@ [schema.core :as s] [toucan.db :as db] [toucan.util.test :as test]) - (:import org.apache.log4j.Logger + (:import java.util.concurrent.TimeoutException + org.apache.log4j.Logger [org.quartz CronTrigger JobDetail JobKey Scheduler Trigger])) ;; record type for testing that results match a Schema @@ -574,48 +575,47 @@ [model-seq & body] `(do-with-model-cleanup ~model-seq (fn [] ~@body))) +;; TODO - not 100% sure I understand (defn call-with-paused-query "This is a function to make testing query cancellation eaiser as it can be complex handling the multiple threads needed to orchestrate a query cancellation. This function takes `f` which is a function of 4 arguments: - - query-thunk - no-arg function that will invoke a query - - query promise - promise used to validate the query function was called - - cancel promise - promise used to validate a cancellation function was called - - pause query promise - promise used to hang the query function, allowing cancellation - - This function returns a vector of booleans indicating the various statuses of the promises, useful for comparison - in an `expect`" + - query-thunk - No-arg function that will invoke a query. + - query promise - Promise used to validate the query function was called. Deliver something to this once the + query has started running + - cancel promise - Promise used to validate a cancellation function was called. Deliver something to this once + the query was successfully canceled. + - pause query promise - Promise used to hang the query function, allowing cancellation. Wait for this to be + delivered to hang the query. + + `f` should return a future that can be canceled." 
[f] (data/with-db (data/get-or-create-database! defs/test-data) - (let [called-cancel? (promise) - called-query? (promise) - pause-query (promise) - before-query-called-cancel (realized? called-cancel?) - before-query-called-query (realized? called-query?) - query-thunk (fn [] (data/run-mbql-query checkins - {:aggregation [[:count]]})) - ;; When the query is ran via the datasets endpoint, it will run in a future. That future can be cancelled, + (let [called-cancel? (promise) + called-query? (promise) + pause-query (promise) + query-thunk (fn [] + (data/run-mbql-query checkins + {:aggregation [[:count]]})) + ;; When the query is run via the datasets endpoint, it will run in a future. That future can be canceled, ;; which should cause an interrupt - query-future (f query-thunk called-query? called-cancel? pause-query)] - - ;; Make sure that we start out with our promises not having a value - [before-query-called-cancel - before-query-called-query - ;; The cancelled-query? and called-cancel? timeouts are very high and are really just intended to - ;; prevent the test from hanging indefinitely. It shouldn't be hit unless something is really wrong - (deref called-query? 120000 ::query-never-called) - ;; At this point in time, the query is blocked, waiting for `pause-query` do be delivered - (realized? called-cancel?) - (do - ;; If we cancel the future, it should throw an InterruptedException, which should call the cancel - ;; method on the prepared statement - (future-cancel query-future) - (deref called-cancel? 120000 ::cancel-never-called)) - (do - ;; This releases the fake query function so it finishes - (deliver pause-query true) - true)]))) + query-future (f query-thunk called-query? called-cancel? pause-query)] + ;; The called-query? and called-cancel? timeouts are very high and are really just intended to + ;; prevent the test from hanging indefinitely. It shouldn't be hit unless something is really wrong + (when (= ::query-never-called (deref called-query?
10000 ::query-never-called)) + (throw (TimeoutException. "query should have been called by now."))) + ;; At this point in time, the query is blocked, waiting for `pause-query` to be delivered. Cancel still should + ;; not have been called yet. + (assert (not (realized? called-cancel?)) "cancel still should not have been called yet.") + ;; If we cancel the future, it should throw an InterruptedException, which should call the cancel + ;; method on the prepared statement + (future-cancel query-future) + (when (= ::cancel-never-called (deref called-cancel? 10000 ::cancel-never-called)) + (throw (TimeoutException. "cancel should have been called by now."))) + ;; This releases the fake query function so it finishes + (deliver pause-query true) + ::success))) (defmacro throw-if-called "Redefines `fn-var` with a function that throws an exception if it's called" diff --git a/test/metabase/test/util/async.clj b/test/metabase/test/util/async.clj index f7109310f577cfa334e44d13e4d5391525289796..b85eae7b47fc7a3fcc2db50d5c0ada2c6fc54f3a 100644 --- a/test/metabase/test/util/async.clj +++ b/test/metabase/test/util/async.clj @@ -1,6 +1,7 @@ (ns metabase.test.util.async (:require [clojure.core.async :as a]) - (:import java.util.concurrent.TimeoutException)) + (:import java.io.Closeable + java.util.concurrent.TimeoutException)) (defn wait-for-close "Wait up to `timeout-ms` for `chan` to be closed, and returns `true` once it is; otherwise throws an Exception if @@ -51,3 +52,40 @@ ~(if (seq more) `(with-chans ~more ~@body) `(do ~@body)))) + +(defn wait-for-result + "Wait up to `timeout-ms` (default 200) for a result from `chan`, or return a `::timed-out` message. Closes `chan` afterwards." + ([chan] + (wait-for-result chan 200)) + ([chan timeout-ms] + (try + (let [[val port] (a/alts!! [chan (a/timeout timeout-ms)])] + (if (not= port chan) + ::timed-out + val)) + (finally + (a/close!
chan))))) + +(defprotocol NotifyClosed + (on-close-chan [this] + "Returns a channel that will get a `::closed` message when `.close` is called on the object.")) + +(defn permit + "Return a mocked permit object to pass to a mocked semaphore channel. You can check whether this was closed correctly + using `on-close-chan` above, or with `permit-closed?` below." + [] + (let [closed-chan (a/promise-chan)] + (reify + Closeable + (close [_] + (a/>!! closed-chan ::closed) + (a/close! closed-chan)) + NotifyClosed + (on-close-chan [_] + closed-chan)))) + +(defn permit-closed? + "Wait up to 200 ms for mocked semaphore channel permit to be closed; returns true if closed or false otherwise." + [mocked-permit] + (= (wait-for-result (on-close-chan mocked-permit)) + ::closed))