diff --git a/src/metabase/async/util.clj b/src/metabase/async/util.clj index 552e0035543443f90a46ba07c126e40a132375ef..c0f33843291420d5e1a7de35f892bfe1bcd09b33 100644 --- a/src/metabase/async/util.clj +++ b/src/metabase/async/util.clj @@ -1,9 +1,33 @@ (ns metabase.async.util + "Utility functions for core.async-based async logic." (:require [clojure.core.async :as a] [clojure.tools.logging :as log] [metabase.util.i18n :refer [trs]] [schema.core :as s]) - (:import clojure.core.async.impl.channels.ManyToManyChannel)) + (:import clojure.core.async.impl.buffers.PromiseBuffer + clojure.core.async.impl.channels.ManyToManyChannel)) + +(defn promise-chan? + "Is core.async `chan` a `promise-chan`?" + [chan] + (and (instance? ManyToManyChannel chan) + (instance? PromiseBuffer (.buf ^ManyToManyChannel chan)))) + +(def PromiseChan + "Schema for a core.async promise channel." + (s/constrained ManyToManyChannel promise-chan? "promise chan")) + +(s/defn promise-canceled-chan :- PromiseChan + "Given a `promise-chan`, return a new channel that will receive a single message if `promise-chan` is closed before + a message is written to it (i.e. if an API request is canceled). Automatically closes after `promise-chan` receives + a message or is closed." + [promise-chan :- PromiseChan] + (let [canceled-chan (a/promise-chan)] + (a/go + (when (nil? (a/<! promise-chan)) + (a/>! canceled-chan ::canceled)) + (a/close! canceled-chan)) + canceled-chan)) (s/defn single-value-pipe :- ManyToManyChannel "Pipe that will forward a single message from `in-chan` to `out-chan`, closing both afterward. If `out-chan` is closed diff --git a/src/metabase/query_processor.clj b/src/metabase/query_processor.clj index e9065cf0f639e479d3b1b7cfffd28be910ed5602..e31095d4392e68099b6e1e6b3e0f3483487f3610 100644 --- a/src/metabase/query_processor.clj +++ b/src/metabase/query_processor.clj @@ -11,6 +11,7 @@ [add-row-count-and-status :as row-count-and-status] [add-settings :as add-settings] [annotate :as annotate] + [async :as async] [auto-bucket-datetimes :as bucket-datetime] [bind-effective-timezone :as bind-timezone] [binning :as binning] @@ -133,13 +134,20 @@ fetch-source-query/fetch-source-query store/initialize-store log-query/log-initial-query - ;; TODO - bind `*query*` here ? + ;; ▲▲▲ SYNC MIDDLEWARE ▲▲▲ + ;; + ;; All middleware above this point is written in the synchronous 1-arg style. All middleware below is written in + ;; async 4-arg style. Eventually the entire QP middleware stack will be rewritten in the async style. But not yet + ;; + ;; ▼▼▼ ASYNC MIDDLEWARE ▼▼▼ + async/async->sync cache/maybe-return-cached-results validate/validate-query normalize/normalize catch-exceptions/catch-exceptions process-userland-query/process-userland-query - constraints/add-default-userland-constraints)) + constraints/add-default-userland-constraints + async/async-setup)) ;; ▲▲▲ PRE-PROCESSING ▲▲▲ happens from BOTTOM-TO-TOP, e.g. the results of `expand-macros` are passed to ;; `substitute-parameters` diff --git a/src/metabase/query_processor/middleware/async.clj b/src/metabase/query_processor/middleware/async.clj new file mode 100644 index 0000000000000000000000000000000000000000..e5ce7503c70887b76d60cd3f0398dd69b9da54dd --- /dev/null +++ b/src/metabase/query_processor/middleware/async.clj @@ -0,0 +1,53 @@ +(ns metabase.query-processor.middleware.async + "Middleware for implementing async QP behavior." + (:require [clojure.core.async :as a] + [clojure.tools.logging :as log] + [metabase.async.util :as async.u] + [metabase.util.i18n :refer [trs]])) + +;;; +----------------------------------------------------------------------------------------------------------------+ +;;; | async->sync | +;;; +----------------------------------------------------------------------------------------------------------------+ + +(defn async->sync + "Async-style (4-arg) middleware that wraps the synchronous (1-arg) portion of the QP middleware." + [qp] + (fn [query respond raise canceled-chan] + (if (a/poll! canceled-chan) + (log/debug (trs "Request already canceled, will not run synchronous QP code.")) + (try + (respond (qp query)) + (catch Throwable e + (raise e)))))) + + +;;; +----------------------------------------------------------------------------------------------------------------+ +;;; | async-setup | +;;; +----------------------------------------------------------------------------------------------------------------+ + +(defn async-setup + "Middleware that creates the output/canceled channels for the asynchronous (4-arg) QP middleware and runs it. + + Our 4-arg middleware follows the same pattern as async 3-arg Ring middleware, with the addition of fourth + `canceled-chan` arg; this is a core.async channel that can be listened to to implement special query cancelation + behavior, such as canceling JDBC queries. If the output channel is closed before the query completes (i.e., API + request is canceled) this channel will receive a message; otherwise it will close whenever the output channel + closes." + [qp] + (fn [{:keys [async?], :as query}] + (let [out-chan (a/promise-chan) + canceled-chan (async.u/promise-canceled-chan out-chan) + respond (fn [result] + (a/>!! out-chan result) + (a/close! out-chan)) + raise (fn [e] + (log/warn e (trs "Unhandled exception, exepected `catch-exceptions` middleware to handle it")) + (respond e))] + (try + (qp query respond raise canceled-chan) + (catch Throwable e + (raise e))) + (let [result (a/<!! out-chan)] + (if (instance? Throwable result) + (throw result) + result))))) diff --git a/src/metabase/query_processor/middleware/cache.clj b/src/metabase/query_processor/middleware/cache.clj index c9ce68f0c27ed1da801ed15b702cbb0721895a32..ecbb6a41bdd89bc9bfac5e4a6722b054e7810358 100644 --- a/src/metabase/query_processor/middleware/cache.clj +++ b/src/metabase/query_processor/middleware/cache.clj @@ -90,25 +90,25 @@ (boolean (and (public-settings/enable-query-caching) cache-ttl))) -(defn- save-results-if-successful! [query-hash results] - (when (= (:status results) :completed) - (save-results! query-hash results))) - -(defn- run-query-and-save-results-if-successful! [query-hash qp query] - (let [start-time-ms (System/currentTimeMillis) - results (qp query) - total-time-ms (- (System/currentTimeMillis) start-time-ms) +(defn- save-results-if-successful! [query-hash start-time-ms {:keys [status], :as results}] + (let [total-time-ms (- (System/currentTimeMillis) start-time-ms) min-ttl-ms (* (public-settings/query-caching-min-ttl) 1000)] (log/info (format "Query took %d ms to run; miminum for cache eligibility is %d ms" total-time-ms min-ttl-ms)) - (when (>= total-time-ms min-ttl-ms) - (save-results-if-successful! query-hash results)) - results)) + (when (and (= status :completed) + (>= total-time-ms min-ttl-ms)) + (save-results! query-hash results)))) -(defn- run-query-with-cache [qp {:keys [cache-ttl], :as query}] - ;; TODO - Query will already have `info.hash` if it's a userland query. I'm not 100% sure it will be the same hash. +(defn- run-query-with-cache [qp {:keys [cache-ttl], :as query} respond raise canceled-chan] + ;; TODO - Query will already have `info.hash` if it's a userland query. I'm not 100% sure it will be the same hash, + ;; because this is calculated after normalization, instead of before (let [query-hash (qputil/query-hash query)] - (or (cached-results query-hash cache-ttl) - (run-query-and-save-results-if-successful! query-hash qp query)))) + (if-let [cached-results (cached-results query-hash cache-ttl)] + (respond cached-results) + (let [start-time (System/currentTimeMillis) + respond (fn [results] + (save-results-if-successful! query-hash start-time results) + (respond results))] + (qp query respond raise canceled-chan))))) (defn maybe-return-cached-results "Middleware for caching results of a query if applicable. @@ -123,13 +123,13 @@ running the query, satisfying this requirement.) * The result *rows* of the query must be less than `query-caching-max-kb` when serialized (before compression)." [qp] - (fn [query] + (fn [query respond raise canceled-chan] (if-not (is-cacheable? query) - (qp query) + (qp query respond raise canceled-chan) ;; wait until we're actually going to use the cache before initializing the backend. We don't want to initialize ;; it when the files get compiled, because that would give it the wrong version of the ;; `IQueryProcessorCacheBackend` protocol (do (when-not @backend-instance (set-backend!)) - (run-query-with-cache qp query))))) + (run-query-with-cache qp query respond raise canceled-chan))))) diff --git a/src/metabase/query_processor/middleware/catch_exceptions.clj b/src/metabase/query_processor/middleware/catch_exceptions.clj index 4a9b93f05306cb3e6a63759bdee49fa588188c59..64ed62457e0a411403170c34bd31cb618fd782c5 100644 --- a/src/metabase/query_processor/middleware/catch_exceptions.clj +++ b/src/metabase/query_processor/middleware/catch_exceptions.clj @@ -85,8 +85,12 @@ (defn catch-exceptions "Middleware for catching exceptions thrown by the query processor and returning them in a normal format." [qp] - (fn [query] - (try - (qp query) - (catch Throwable e - (format-exception query e))))) + ;; we're not using the version of `raise` passed in on purpose here -- that one is a placeholder -- this is the + ;; implementation of `raise` we expect most QP middleware to ultimately use + (fn [query respond _ canceled-chan] + (let [raise (fn [e] + (respond (format-exception query e)))] + (try + (qp query respond raise canceled-chan) + (catch Throwable e + (raise e)))))) diff --git a/src/metabase/query_processor/middleware/constraints.clj b/src/metabase/query_processor/middleware/constraints.clj index 94b222c0e90a93fc5318a734d64922cea643764d..3aaf520c9085772a069705de67f41ca48d6b36ac 100644 --- a/src/metabase/query_processor/middleware/constraints.clj +++ b/src/metabase/query_processor/middleware/constraints.clj @@ -33,4 +33,5 @@ "Middleware that optionally adds default `max-results` and `max-results-bare-rows` constraints to queries, meant for use with `process-query-and-save-with-max-results-constraints!`, which ultimately powers most QP API endpoints." [qp] - (comp qp add-default-userland-constraints*)) + (fn [query respond raise canceled-chan] + (qp (add-default-userland-constraints* query) respond raise canceled-chan))) diff --git a/src/metabase/query_processor/middleware/normalize_query.clj b/src/metabase/query_processor/middleware/normalize_query.clj index fbd576b52d2a8fb8bf4066caa116e73a536dad1e..3a1d7f076afd41c14acf5265e81a8bab82fb6196 100644 --- a/src/metabase/query_processor/middleware/normalize_query.clj +++ b/src/metabase/query_processor/middleware/normalize_query.clj @@ -7,4 +7,5 @@ into standard `lisp-case` ones, removing/rewriting legacy clauses, removing empty ones, etc. This is done to simplifiy the logic in the QP steps following this." [qp] - (comp qp normalize/normalize)) + (fn [query respond raise canceled-chan] + (qp (normalize/normalize query) respond raise canceled-chan))) diff --git a/src/metabase/query_processor/middleware/process_userland_query.clj b/src/metabase/query_processor/middleware/process_userland_query.clj index d2f8f44d857a235dd3fe85f59b65ea5eb3df8647..18b770d2cf471dd4bc38d5999a11ae7f7a66d3df 100644 --- a/src/metabase/query_processor/middleware/process_userland_query.clj +++ b/src/metabase/query_processor/middleware/process_userland_query.clj @@ -133,9 +133,10 @@ "Do extra handling 'userland' queries (i.e. ones ran as a result of a user action, e.g. an API call, scheduled Pulse, etc.). This includes recording QueryExecution entries and returning the results in an FE-client-friendly format." [qp] - (fn [{{:keys [userland-query?]} :middleware, :as query}] + (fn [{{:keys [userland-query?]} :middleware, :as query} respond raise canceled-chan] (if-not userland-query? - (qp query) + (qp query respond raise canceled-chan) ;; add calculated hash to query - (let [query (assoc-in query [:info :query-hash] (qputil/query-hash query))] - (format-userland-query-result (query-execution-info query) (qp query)))))) + (let [query (assoc-in query [:info :query-hash] (qputil/query-hash query)) + respond (comp respond (partial format-userland-query-result (query-execution-info query)))] + (qp query respond raise canceled-chan))))) diff --git a/src/metabase/query_processor/middleware/validate.clj b/src/metabase/query_processor/middleware/validate.clj index 5c914a4b6adefdd4a11274cc7dff11ea02201798..7e0e965365b8287ddb07da8170a063e2d6ae00d3 100644 --- a/src/metabase/query_processor/middleware/validate.clj +++ b/src/metabase/query_processor/middleware/validate.clj @@ -5,4 +5,6 @@ (defn validate-query "Middleware that validates a query immediately after normalization." [qp] - (comp qp mbql.s/validate-query)) + (fn [query respond raise canceled-chan] + (mbql.s/validate-query query) + (qp query respond raise canceled-chan))) diff --git a/test/metabase/async/util_test.clj b/test/metabase/async/util_test.clj index 4ea34c2dc4850ab60da25152acb953062708a4f5..b0d9aaae7acfe5f9716c04f92d6b705318bb756b 100644 --- a/test/metabase/async/util_test.clj +++ b/test/metabase/async/util_test.clj @@ -4,6 +4,33 @@ [metabase.async.util :as async.u] [metabase.test.util.async :as tu.async])) +(expect true (async.u/promise-chan? (a/promise-chan))) +(expect false (async.u/promise-chan? (a/chan 1))) +(expect false (async.u/promise-chan? (a/chan))) +(expect false (async.u/promise-chan? nil)) +(expect false (async.u/promise-chan? "ABC")) + +;;; --------------------------------------------- promise-canceled-chan ---------------------------------------------- + +;; make sure the canceled chan gets a message if the promise-chan it wraps closes before anything is written to it +(expect + ::async.u/canceled + (tu.async/with-open-channels [chan (a/promise-chan)] + (let [canceled-chan (async.u/promise-canceled-chan chan)] + (a/close! chan) + (first (a/alts!! [canceled-chan (a/timeout 1000)]))))) + +;; canceled-chan should close with no message if the channel it wraps gets a message before it closes +(expect + {:val nil, :canceled-chan? true} + (tu.async/with-open-channels [chan (a/promise-chan)] + (let [canceled-chan (async.u/promise-canceled-chan chan)] + (a/>!! chan "message") + (a/close! chan) + (let [[val port] (a/alts!! [canceled-chan (a/timeout 1000)])] + {:val val, :canceled-chan? (= port canceled-chan)})))) + + ;;; ----------------------------------------------- single-value-pipe ------------------------------------------------ ;; make sure `single-value-pipe` pipes a value from in-chan to out-chan diff --git a/test/metabase/query_processor/middleware/cache_test.clj b/test/metabase/query_processor/middleware/cache_test.clj index d6ed035f8a3ced53d8ecd6b46867fbee2260456c..ad6052ef62af5988c6fed9fd3012dcab9287ec37 100644 --- a/test/metabase/query_processor/middleware/cache_test.clj +++ b/test/metabase/query_processor/middleware/cache_test.clj @@ -1,6 +1,6 @@ (ns metabase.query-processor.middleware.cache-test "Tests for the Query Processor cache." - (:require [expectations :refer :all] + (:require [expectations :refer [expect]] [metabase.models.query-cache :refer [QueryCache]] [metabase.query-processor.middleware.cache :as cache] [metabase.test.util :as tu] @@ -20,9 +20,9 @@ (def ^:private ^:dynamic ^Integer *query-execution-delay-ms* 0) -(defn- mock-qp [& _] +(defn- mock-qp [_ respond _ _] (Thread/sleep *query-execution-delay-ms*) - mock-results) + (respond mock-results)) (def ^:private maybe-return-cached-results (cache/maybe-return-cached-results mock-qp)) @@ -34,7 +34,12 @@ :not-cached)) (defn- run-query [& {:as query-kvs}] - (cached? (maybe-return-cached-results (merge {:cache-ttl 60, :query :abc} query-kvs)))) + (cached? + (maybe-return-cached-results + (merge {:cache-ttl 60, :query :abc} query-kvs) + identity + (fn [e] (throw e)) + nil))) ;;; -------------------------------------------- tests for is-cacheable? --------------------------------------------- diff --git a/test/metabase/query_processor/middleware/catch_exceptions_test.clj b/test/metabase/query_processor/middleware/catch_exceptions_test.clj index 8d5756a7fae532b6716d842565428af8c81fb228..20708d179da909c9f16a8433f6a058d52941c758 100644 --- a/test/metabase/query_processor/middleware/catch_exceptions_test.clj +++ b/test/metabase/query_processor/middleware/catch_exceptions_test.clj @@ -2,15 +2,43 @@ (:require [expectations :refer [expect]] [metabase.query-processor.middleware.catch-exceptions :as catch-exceptions])) +(defn- catch-exceptions + ([qp] + (catch-exceptions qp {})) + ([qp query] + ((catch-exceptions/catch-exceptions qp) + query + identity + identity + nil))) + +;; No Exception -- should return response as-is (expect {} - ((catch-exceptions/catch-exceptions identity) {})) + (catch-exceptions + (fn [query respond _ _] + (respond query)))) + +;; if the QP throws an Exception (synchronously), should format the response appropriately +(expect + {:status :failed + :class java.lang.Exception + :error "Something went wrong" + :stacktrace true + :query {}} + (-> (catch-exceptions + (fn [& _] + (throw (Exception. "Something went wrong")))) + (update :stacktrace boolean))) +;; if an Exception is returned asynchronously by `raise`, should format it the same way (expect {:status :failed :class java.lang.Exception :error "Something went wrong" :stacktrace true :query {}} - (-> ((catch-exceptions/catch-exceptions (fn [_] (throw (Exception. "Something went wrong")))) {}) + (-> (catch-exceptions + (fn [_ _ raise _] + (raise (Exception. "Something went wrong")))) (update :stacktrace boolean)))