From b9a9f6234f6f9dc42435e4560abfd4010a112086 Mon Sep 17 00:00:00 2001 From: Cam Saul <cammsaul@gmail.com> Date: Tue, 16 Apr 2019 17:50:36 -0700 Subject: [PATCH] Move QP async wait logic into middleware :timer_clock: --- src/metabase/async/semaphore_channel.clj | 57 ++++++----- src/metabase/async/util.clj | 42 ++++---- src/metabase/query_processor.clj | 2 + src/metabase/query_processor/async.clj | 62 ++---------- .../query_processor/middleware/async.clj | 15 ++- .../query_processor/middleware/async_wait.clj | 55 +++++++++++ test/metabase/async/util_test.clj | 31 ++++++ .../middleware/async_wait_test.clj | 95 +++++++++++++++++++ 8 files changed, 256 insertions(+), 103 deletions(-) create mode 100644 src/metabase/query_processor/middleware/async_wait.clj create mode 100644 test/metabase/query_processor/middleware/async_wait_test.clj diff --git a/src/metabase/async/semaphore_channel.clj b/src/metabase/async/semaphore_channel.clj index 17dc1f5a28a..30a425a2cd8 100644 --- a/src/metabase/async/semaphore_channel.clj +++ b/src/metabase/async/semaphore_channel.clj @@ -2,7 +2,8 @@ (:require [clojure.core.async :as a] [clojure.tools.logging :as log] [metabase.async.util :as async.u] - [metabase.util.i18n :refer [trs]]) + [metabase.util.i18n :refer [trs]] + [schema.core :as s]) (:import java.io.Closeable java.util.concurrent.Semaphore)) @@ -70,45 +71,49 @@ second permit if this thread already has one." {}) -(defn- do-f-with-permit +(s/defn ^:private do-f-with-permit "Once a `permit` is obtained, execute `(apply f args)`, writing the results to `output-chan`, and returning the permit no matter what." - [^Closeable permit out-chan f & args] + [permit :- Closeable, out-chan :- async.u/PromiseChan, f & args] (try - (let [f (fn [] - (with-open [permit permit] - (try - (apply f args) - (catch Throwable e - e) - (finally - (log/debug (trs "f finished, permit will be returned"))))))] + (let [canceled-chan (async.u/single-value-pipe (apply async.u/do-on-separate-thread f args) out-chan)] + ;; canceled-chan will be called whenever `out-chan` is closed, either because `single-value-pipe` closes it when + ;; it gets a result from `do-on-separate-thread`, or because someone else closed it earlier (i.e. canceled API + ;; request). When this happens return the permit (a/go - (let [canceled-chan (async.u/single-value-pipe (async.u/do-on-separate-thread f) out-chan)] - (when (a/<! canceled-chan) - (log/debug (trs "request canceled, permit will be returned")) - (.close permit))))) + (if (a/<! canceled-chan) + (log/debug (trs "request canceled, permit will be returned")) + (log/debug (trs "f finished, permit will be returned"))) + (.close permit))) (catch Throwable e (log/error e (trs "Unexpected error attempting to run function after obtaining permit")) (a/>! out-chan e) - (.close permit)))) + (a/close! out-chan) + (.close permit))) + nil) -(defn- do-after-waiting-for-new-permit [semaphore-chan f & args] - (let [out-chan (a/chan 1)] +(s/defn ^:private do-after-waiting-for-new-permit :- async.u/PromiseChan + [semaphore-chan f & args] + (let [out-chan (a/promise-chan)] ;; fire off a go block to wait for a permit. (a/go (let [[permit first-done] (a/alts! [semaphore-chan out-chan])] - (binding [*permits* (assoc *permits* semaphore-chan permit)] - ;; If out-chan closes before we get a permit, there's nothing for us to do here. Otherwise if we got our - ;; permit then proceed - (if (= first-done out-chan) - (log/debug (trs "Not running pending function call: output channel already closed.")) - ;; otherwise if channel is still open run the function - (apply do-f-with-permit permit out-chan f args))))) + ;; If out-chan closes before we get a permit, there's nothing for us to do here. Otherwise if we got our + ;; permit then proceed + (condp = first-done + out-chan + (log/debug (trs "Not running pending function call: output channel already closed.")) + + ;; otherwise if channel is still open run the function + semaphore-chan + (if-not permit + (log/warn (trs "Warning: semaphore-channel is closed, will not run pending function call")) + (binding [*permits* (assoc *permits* semaphore-chan permit)] + (apply do-f-with-permit permit out-chan f args)))))) ;; return `out-chan` which can be used to wait for results out-chan)) -(defn do-after-receiving-permit +(s/defn do-after-receiving-permit :- async.u/PromiseChan "Run `(apply f args)` asynchronously after receiving a permit from `semaphore-chan`. Returns a channel from which you can fetch the results. Closing this channel before results are produced will cancel the function call." {:style/indent 1} diff --git a/src/metabase/async/util.clj b/src/metabase/async/util.clj index c0f33843291..5766babaac0 100644 --- a/src/metabase/async/util.clj +++ b/src/metabase/async/util.clj @@ -29,7 +29,7 @@ (a/close! canceled-chan)) canceled-chan)) -(s/defn single-value-pipe :- ManyToManyChannel +(s/defn single-value-pipe :- PromiseChan "Pipe that will forward a single message from `in-chan` to `out-chan`, closing both afterward. If `out-chan` is closed before `in-chan` produces a value, closes `in-chan`; this can be used to automatically cancel QP requests and the like. @@ -38,7 +38,7 @@ this message to implement special cancelation/close behavior, such as canceling async jobs. This channel automatically closes when either `in-chan` or `out-chan` closes." [in-chan :- ManyToManyChannel, out-chan :- ManyToManyChannel] - (let [canceled-chan (a/chan 1)] + (let [canceled-chan (a/promise-chan)] ;; fire off a block that will wait for either in-chan to produce a result or out-chan to be closed (a/go (try @@ -58,31 +58,35 @@ ;; return the canceled chan in case someone wants to listen to it canceled-chan)) -(defn do-on-separate-thread +(s/defn do-on-separate-thread :- PromiseChan "Run `(apply f args)` on a separate thread, returns a channel to fetch the results. Closing this channel early will cancel the future running the function, if possible." [f & args] - (let [in-chan (a/chan 1) - out-chan (a/chan 1) - canceled-chan (single-value-pipe in-chan out-chan) + (let [out-chan (a/promise-chan) + canceled-chan (promise-canceled-chan out-chan) ;; Run `f` on a separarate thread because it's a potentially long-running QP query and we don't want to tie ;; up precious core.async threads futur (future - (if-not (= ::open (first (a/alts!! [out-chan] :default ::open))) - (log/debug (trs "Output channel closed, will skip running {0}." f)) - (do - (log/debug (trs "Running {0} on separate thread..." f)) - (try - (let [result (apply f args)] - (a/put! in-chan result)) - ;; if we catch an Exception (shouldn't happen in a QP query, but just in case), send it to `chan`. It's ok, - ;; our IMPL of Ring `StreamableResponseBody` will do the right thing with it. - (catch Throwable e - (log/error e (trs "Caught error running {0}" f)) - (a/put! in-chan e))))))] + (try + (if (a/poll! canceled-chan) + (log/debug (trs "Output channel closed, will skip running {0}." f)) + (do + (log/debug (trs "Running {0} on separate thread..." f)) + (try + (let [result (apply f args)] + (if (some? result) + (a/put! out-chan result) + (log/warn "Warning: {0} returned `nil`" f))) + ;; if we catch an Exception (shouldn't happen in a QP query, but just in case), send it to `chan`. + ;; It's ok, our IMPL of Ring `StreamableResponseBody` will do the right thing with it. + (catch Throwable e + (log/error e (trs "Caught error running {0}" f)) + (a/put! out-chan e))))) + (finally + (a/close! out-chan))))] (a/go - (when-let [canceled (a/<! canceled-chan)] + (when (a/<! canceled-chan) (log/debug (trs "Request canceled, canceling future")) (future-cancel futur))) diff --git a/src/metabase/query_processor.clj b/src/metabase/query_processor.clj index e31095d4392..49592cd073f 100644 --- a/src/metabase/query_processor.clj +++ b/src/metabase/query_processor.clj @@ -12,6 +12,7 @@ [add-settings :as add-settings] [annotate :as annotate] [async :as async] + [async-wait :as async-wait] [auto-bucket-datetimes :as bucket-datetime] [bind-effective-timezone :as bind-timezone] [binning :as binning] @@ -141,6 +142,7 @@ ;; ;; ▼▼▼ ASYNC MIDDLEWARE ▼▼▼ async/async->sync + async-wait/wait-for-permit cache/maybe-return-cached-results validate/validate-query normalize/normalize diff --git a/src/metabase/query_processor/async.clj b/src/metabase/query_processor/async.clj index c703dd22b09..86baee73498 100644 --- a/src/metabase/query_processor/async.clj +++ b/src/metabase/query_processor/async.clj @@ -1,24 +1,13 @@ (ns metabase.query-processor.async "Async versions of the usual public query processor functions. Instead of blocking while the query is ran, these - functions all return a `core.async` channel that can be used to fetch the results when they become available. - - Each connected database is limited to a maximum of 15 simultaneous queries (configurable) using these methods; any - additional queries will park the thread. Super-useful for writing high-performance API endpoints. Prefer these - methods to the old-school synchronous versions. - - How is this achieved? For each Database, we'll maintain a channel that acts as a counting semaphore; the channel - will initially contain 15 permits. Each incoming request will asynchronously read from the channel until it acquires - a permit, then put it back when it finishes." + functions all return a `core.async` channel that can be used to fetch the results when they become available." (:require [clojure.core.async :as a] [clojure.tools.logging :as log] [metabase [query-processor :as qp] [util :as u]] [metabase.api.common :as api] - [metabase.async - [semaphore-channel :as semaphore-channel] - [util :as async.u]] - [metabase.models.setting :refer [defsetting]] + [metabase.async.util :as async.u] [metabase.query-processor [interface :as qpi] [util :as qputil]] @@ -26,58 +15,26 @@ [schema.core :as s]) (:import clojure.core.async.impl.channels.ManyToManyChannel)) -(defsetting max-simultaneous-queries-per-db - (trs "Maximum number of simultaneous queries to allow per connected Database.") - :type :integer - :default 15) - - -(defonce ^:private db-semaphore-channels (atom {})) - -(defn- fetch-db-semaphore-channel - "Fetch the counting semaphore channel for a Database, creating it if not already created." - [database-or-id] - (let [id (u/get-id database-or-id)] - (or - ;; channel already exists - (@db-semaphore-channels id) - ;; channel does not exist, Create a channel and stick it in the atom - (let [ch (semaphore-channel/semaphore-channel (max-simultaneous-queries-per-db)) - new-ch ((swap! db-semaphore-channels update id #(or % ch)) id)] - ;; ok, if the value swapped into the atom was a different channel (another thread beat us to it) then close our - ;; newly created channel - (when-not (= ch new-ch) - (a/close! ch)) - ;; return the newly created channel - new-ch)))) - -(defn- do-async - "Execute `f` asynchronously, waiting to receive a permit from `db`'s semaphore channel before proceeding. Returns the - results in a channel." - [db f & args] - (let [semaphore-chan (fetch-db-semaphore-channel db)] - (apply semaphore-channel/do-after-receiving-permit semaphore-chan f args))) - -(defn process-query +(s/defn process-query :- async.u/PromiseChan "Async version of `metabase.query-processor/process-query`. Runs query asynchronously, and returns a `core.async` channel that can be used to fetch the results once the query finishes running. Closing the channel will cancel the query." [query] - (do-async (:database query) qp/process-query query)) + (qp/process-query (assoc query :async? true))) -(defn process-query-and-save-execution! +(s/defn process-query-and-save-execution! :- async.u/PromiseChan "Async version of `metabase.query-processor/process-query-and-save-execution!`. Runs query asynchronously, and returns a `core.async` channel that can be used to fetch the results once the query finishes running. Closing the channel will cancel the query." [query options] - (do-async (:database query) qp/process-query-and-save-execution! query options)) + (qp/process-query-and-save-execution! (assoc query :async? true) options)) -(defn process-query-and-save-with-max-results-constraints! +(s/defn process-query-and-save-with-max-results-constraints! :- async.u/PromiseChan "Async version of `metabase.query-processor/process-query-and-save-with-max-results-constraints!`. Runs query asynchronously, and returns a `core.async` channel that can be used to fetch the results once the query finishes running. Closing the channel will cancel the query." [query options] - (do-async (:database query) qp/process-query-and-save-with-max-results-constraints! query options)) + (qp/process-query-and-save-with-max-results-constraints! (assoc query :async? true) options)) ;;; ------------------------------------------------ Result Metadata ------------------------------------------------- @@ -110,8 +67,7 @@ ;; (normally middleware takes care of calculating query hashes for 'userland' queries but this is not ;; technically a userland query -- we don't want to save a QueryExecution -- so we need to add `executed-by` ;; and `query-hash` ourselves so the remark gets added) - (assoc-in query [:info :query-hash] (qputil/query-hash query))) - )) + (assoc-in query [:info :query-hash] (qputil/query-hash query))))) out-chan) ;; return out-chan out-chan)) diff --git a/src/metabase/query_processor/middleware/async.clj b/src/metabase/query_processor/middleware/async.clj index e5ce7503c70..a2c759e264e 100644 --- a/src/metabase/query_processor/middleware/async.clj +++ b/src/metabase/query_processor/middleware/async.clj @@ -38,7 +38,9 @@ (let [out-chan (a/promise-chan) canceled-chan (async.u/promise-canceled-chan out-chan) respond (fn [result] - (a/>!! out-chan result) + (if (some? result) + (a/>!! out-chan result) + (log/warn (trs "Warning: `respond` as passed `nil`."))) (a/close! out-chan)) raise (fn [e] (log/warn e (trs "Unhandled exception, exepected `catch-exceptions` middleware to handle it")) @@ -47,7 +49,10 @@ (qp query respond raise canceled-chan) (catch Throwable e (raise e))) - (let [result (a/<!! out-chan)] - (if (instance? Throwable result) - (throw result) - result))))) + ;; if query is `async?` return the output channel; otherwise block until output channel returns a result + (if async? + out-chan + (let [result (a/<!! out-chan)] + (if (instance? Throwable result) + (throw result) + result)))))) diff --git a/src/metabase/query_processor/middleware/async_wait.clj b/src/metabase/query_processor/middleware/async_wait.clj new file mode 100644 index 00000000000..5370400d189 --- /dev/null +++ b/src/metabase/query_processor/middleware/async_wait.clj @@ -0,0 +1,55 @@ +(ns metabase.query-processor.middleware.async-wait + "Middleware that limits the number of concurrent queries for each database. + + Each connected database is limited to a maximum of 15 simultaneous queries (configurable) using these methods; any + additional queries will park the thread. Super-useful for writing high-performance API endpoints. Prefer these + methods to the old-school synchronous versions. + + How is this achieved? For each Database, we'll maintain a channel that acts as a counting semaphore; the channel + will initially contain 15 permits. Each incoming request will asynchronously read from the channel until it acquires + a permit, then put it back when it finishes." + (:require [clojure.core.async :as a] + [metabase.async.semaphore-channel :as semaphore-channel] + [metabase.models.setting :refer [defsetting]] + [metabase.util :as u] + [metabase.util.i18n :refer [trs]])) + +(defsetting max-simultaneous-queries-per-db + (trs "Maximum number of simultaneous queries to allow per connected Database.") + :type :integer + :default 15) + +(defonce ^:private db-semaphore-channels (atom {})) + +(defn- fetch-db-semaphore-channel + "Fetch the counting semaphore channel for a Database, creating it if not already created." + [database-or-id] + (let [id (u/get-id database-or-id)] + (or + ;; channel already exists + (@db-semaphore-channels id) + ;; channel does not exist, Create a channel and stick it in the atom + (let [ch (semaphore-channel/semaphore-channel (max-simultaneous-queries-per-db)) + new-ch ((swap! db-semaphore-channels update id #(or % ch)) id)] + ;; ok, if the value swapped into the atom was a different channel (another thread beat us to it) then close our + ;; newly created channel + (when-not (= ch new-ch) + (a/close! ch)) + ;; return the newly created channel + new-ch)))) + +(defn wait-for-permit + "Middleware that throttles the number of concurrent queries for each connected database, parking the thread until a + permit becomes available." + [qp] + (fn [{database-id :database, :as query} respond raise canceled-chan] + (let [semaphore-chan (fetch-db-semaphore-channel database-id) + output-chan (semaphore-channel/do-after-receiving-permit semaphore-chan + qp query respond raise canceled-chan)] + (a/go + (respond (a/<! output-chan)) + (a/close! output-chan)) + (a/go + (when (a/<! canceled-chan) + (a/close! output-chan))) + nil))) diff --git a/test/metabase/async/util_test.clj b/test/metabase/async/util_test.clj index b0d9aaae7ac..916dea3ce2c 100644 --- a/test/metabase/async/util_test.clj +++ b/test/metabase/async/util_test.clj @@ -30,6 +30,37 @@ (let [[val port] (a/alts!! [canceled-chan (a/timeout 1000)])] {:val val, :canceled-chan? (= port canceled-chan)})))) +;; canceled-chan should be a promise-chan which means we can fetch results more than once +(expect + ::async.u/canceled + (tu.async/with-open-channels [chan (a/promise-chan)] + (let [canceled-chan (async.u/promise-canceled-chan chan)] + (a/close! chan) + (first (a/alts!! [canceled-chan (a/timeout 1000)])) + (first (a/alts!! [canceled-chan (a/timeout 1000)]))))) + +;; can we add multiple canceled-chans to the same channel? +(expect + {1 ::async.u/canceled, 2 ::async.u/canceled} + (tu.async/with-open-channels [chan (a/promise-chan)] + (let [canceled-chans {1 (async.u/promise-canceled-chan chan) + 2 (async.u/promise-canceled-chan chan)}] + (a/close! chan) + (into {} (for [[id chan] canceled-chans] + [id (first (a/alts!! [chan (a/timeout 1000)]))]))))) + +(expect + {1 {:val nil, :canceled-chan? true} + 2 {:val nil, :canceled-chan? true}} + (tu.async/with-open-channels [chan (a/promise-chan)] + (let [canceled-chans {1 (async.u/promise-canceled-chan chan) + 2 (async.u/promise-canceled-chan chan)}] + (a/>!! chan "message") + (a/close! chan) + (into {} (for [[id chan] canceled-chans] + (let [[val port] (a/alts!! [chan (a/timeout 1000)])] + [id {:val val, :canceled-chan? (= port chan)}])))))) + ;;; ----------------------------------------------- single-value-pipe ------------------------------------------------ diff --git a/test/metabase/query_processor/middleware/async_wait_test.clj b/test/metabase/query_processor/middleware/async_wait_test.clj new file mode 100644 index 00000000000..9478a8e4665 --- /dev/null +++ b/test/metabase/query_processor/middleware/async_wait_test.clj @@ -0,0 +1,95 @@ +(ns metabase.query-processor.middleware.async-wait-test + (:require [clojure.core.async :as a] + [expectations :refer [expect]] + [metabase.query-processor.middleware.async-wait :as async-wait] + [metabase.test.util.async :as tu.async]) + (:import java.io.Closeable)) + +(defn- async-wait + "Mocked version of `async-wait/wait-for-permit` middleware. Runs `f` with 3 channels: + + * `result-chan` -- a channel will receive the result iff the rest of the QP pipeline is invoked + * `semaphore-chan` -- a mocked semapahore channel that `wait-for-permit` will wait to receive a permit from + * `canceled-chan` -- a channel you can pass a cancellation message to to simulate request cancelation" + [f] + (tu.async/with-open-channels [result-chan (a/promise-chan) + semaphore-chan (a/chan 15) + canceled-chan (a/promise-chan)] + (let [qp (fn [_ respond _ _] + (respond ::result))] + (with-redefs [async-wait/fetch-db-semaphore-channel (constantly semaphore-chan)] + (let [query {} + respond (fn [result] + (when result + (a/>!! result-chan result)) + (a/close! result-chan))] + ((async-wait/wait-for-permit qp) query respond respond canceled-chan)) + (f {:result-chan result-chan, :semaphore-chan semaphore-chan, :canceled-chan canceled-chan}))))) + +(defprotocol ^:private NotifyClosed + (^:private on-close-chan [this] + "Returns a channel that will get a `::closed` message when `.close` is called on the object.")) + +(defn- permit + "Return a mocked permit object to passed to the mocked semaphore channel. You can check whether this was closed + correctly using `on-close-chan` above." + [] + (let [closed-chan (a/promise-chan)] + (reify + Closeable + (close [_] + (a/>!! closed-chan ::closed) + (a/close! closed-chan)) + NotifyClosed + (on-close-chan [_] + closed-chan)))) + +(defn- wait-for-result + "Wait up to `timeout-ms` (default 200) for a result from `chan`, or return a `::timed-out` message." + ([chan] + (wait-for-result chan 200)) + ([chan timeout-ms] + (let [[val port] (a/alts!! [chan (a/timeout timeout-ms)])] + (if (not= port chan) + ::timed-out + val)))) + +;; QP should run if semaphore-chan gets a permit. Permit should be closed after QP finishes. +(expect + {:result ::result, :permit-taken? true, :permit-closed? true} + (async-wait + (fn [{:keys [semaphore-chan result-chan]}] + (let [permit (permit)] + ;; provide a permit async after a short delay + (a/go + (a/<! (a/timeout 10)) + (a/>! semaphore-chan permit)) + {:result (wait-for-result result-chan) + :permit-taken? (= (wait-for-result semaphore-chan) ::timed-out) + :permit-closed? (= (wait-for-result (on-close-chan permit)) ::closed)})))) + +;; If semaphore-chan never gets a permit, then the QP should never run +(expect + {:result ::timed-out, :permit-closed? false} + (async-wait + (fn [{:keys [result-chan]}] + (let [permit (permit)] + {:result (wait-for-result result-chan) + :permit-closed? (= (wait-for-result (on-close-chan permit)) ::closed)})))) + +;; if canceled-chan gets a message before permit is provided, QP should never run +(expect + {:result nil + :permit-taken? false + :permit-closed? false} + (async-wait + (fn [{:keys [result-chan semaphore-chan canceled-chan]}] + (let [permit (permit)] + (a/go + (a/<! (a/timeout 10)) + (a/>! canceled-chan :canceled) + (a/<! (a/timeout 100)) + (a/>! semaphore-chan permit)) + {:result (wait-for-result result-chan) + :permit-taken? (= (wait-for-result semaphore-chan) ::timed-out) + :permit-closed? (= (wait-for-result (on-close-chan permit)) ::closed)})))) -- GitLab