Skip to content
Snippets Groups Projects
Unverified Commit b9a9f623 authored by Cam Saul's avatar Cam Saul
Browse files

Move QP async wait logic into middleware :timer:

parent 3ecbffe0
No related branches found
No related tags found
No related merge requests found
......@@ -2,7 +2,8 @@
(:require [clojure.core.async :as a]
[clojure.tools.logging :as log]
[metabase.async.util :as async.u]
[metabase.util.i18n :refer [trs]])
[metabase.util.i18n :refer [trs]]
[schema.core :as s])
(:import java.io.Closeable
java.util.concurrent.Semaphore))
......@@ -70,45 +71,49 @@
second permit if this thread already has one."
{})
(defn- do-f-with-permit
(s/defn ^:private do-f-with-permit
"Once a `permit` is obtained, execute `(apply f args)`, writing the results to `output-chan`, and returning the permit
no matter what."
[^Closeable permit out-chan f & args]
[permit :- Closeable, out-chan :- async.u/PromiseChan, f & args]
(try
(let [f (fn []
(with-open [permit permit]
(try
(apply f args)
(catch Throwable e
e)
(finally
(log/debug (trs "f finished, permit will be returned"))))))]
(let [canceled-chan (async.u/single-value-pipe (apply async.u/do-on-separate-thread f args) out-chan)]
;; canceled-chan will be called whenever `out-chan` is closed, either because `single-value-pipe` closes it when
;; it gets a result from `do-on-separate-thread`, or because someone else closed it earlier (i.e. canceled API
;; request). When this happens return the permit
(a/go
(let [canceled-chan (async.u/single-value-pipe (async.u/do-on-separate-thread f) out-chan)]
(when (a/<! canceled-chan)
(log/debug (trs "request canceled, permit will be returned"))
(.close permit)))))
(if (a/<! canceled-chan)
(log/debug (trs "request canceled, permit will be returned"))
(log/debug (trs "f finished, permit will be returned")))
(.close permit)))
(catch Throwable e
(log/error e (trs "Unexpected error attempting to run function after obtaining permit"))
(a/>! out-chan e)
(.close permit))))
(a/close! out-chan)
(.close permit)))
nil)
(defn- do-after-waiting-for-new-permit [semaphore-chan f & args]
(let [out-chan (a/chan 1)]
(s/defn ^:private do-after-waiting-for-new-permit :- async.u/PromiseChan
[semaphore-chan f & args]
(let [out-chan (a/promise-chan)]
;; fire off a go block to wait for a permit.
(a/go
(let [[permit first-done] (a/alts! [semaphore-chan out-chan])]
(binding [*permits* (assoc *permits* semaphore-chan permit)]
;; If out-chan closes before we get a permit, there's nothing for us to do here. Otherwise if we got our
;; permit then proceed
(if (= first-done out-chan)
(log/debug (trs "Not running pending function call: output channel already closed."))
;; otherwise if channel is still open run the function
(apply do-f-with-permit permit out-chan f args)))))
;; If out-chan closes before we get a permit, there's nothing for us to do here. Otherwise if we got our
;; permit then proceed
(condp = first-done
out-chan
(log/debug (trs "Not running pending function call: output channel already closed."))
;; otherwise if channel is still open run the function
semaphore-chan
(if-not permit
(log/warn (trs "Warning: semaphore-channel is closed, will not run pending function call"))
(binding [*permits* (assoc *permits* semaphore-chan permit)]
(apply do-f-with-permit permit out-chan f args))))))
;; return `out-chan` which can be used to wait for results
out-chan))
(defn do-after-receiving-permit
(s/defn do-after-receiving-permit :- async.u/PromiseChan
"Run `(apply f args)` asynchronously after receiving a permit from `semaphore-chan`. Returns a channel from which you
can fetch the results. Closing this channel before results are produced will cancel the function call."
{:style/indent 1}
......
......@@ -29,7 +29,7 @@
(a/close! canceled-chan))
canceled-chan))
(s/defn single-value-pipe :- ManyToManyChannel
(s/defn single-value-pipe :- PromiseChan
"Pipe that will forward a single message from `in-chan` to `out-chan`, closing both afterward. If `out-chan` is closed
before `in-chan` produces a value, closes `in-chan`; this can be used to automatically cancel QP requests and the
like.
......@@ -38,7 +38,7 @@
this message to implement special cancelation/close behavior, such as canceling async jobs. This channel
automatically closes when either `in-chan` or `out-chan` closes."
[in-chan :- ManyToManyChannel, out-chan :- ManyToManyChannel]
(let [canceled-chan (a/chan 1)]
(let [canceled-chan (a/promise-chan)]
;; fire off a block that will wait for either in-chan to produce a result or out-chan to be closed
(a/go
(try
......@@ -58,31 +58,35 @@
;; return the canceled chan in case someone wants to listen to it
canceled-chan))
(defn do-on-separate-thread
(s/defn do-on-separate-thread :- PromiseChan
"Run `(apply f args)` on a separate thread, returns a channel to fetch the results. Closing this channel early will
cancel the future running the function, if possible."
[f & args]
(let [in-chan (a/chan 1)
out-chan (a/chan 1)
canceled-chan (single-value-pipe in-chan out-chan)
(let [out-chan (a/promise-chan)
canceled-chan (promise-canceled-chan out-chan)
;; Run `f` on a separate thread because it's a potentially long-running QP query and we don't want to tie
;; up precious core.async threads
futur
(future
(if-not (= ::open (first (a/alts!! [out-chan] :default ::open)))
(log/debug (trs "Output channel closed, will skip running {0}." f))
(do
(log/debug (trs "Running {0} on separate thread..." f))
(try
(let [result (apply f args)]
(a/put! in-chan result))
;; if we catch an Exception (shouldn't happen in a QP query, but just in case), send it to `chan`. It's ok,
;; our IMPL of Ring `StreamableResponseBody` will do the right thing with it.
(catch Throwable e
(log/error e (trs "Caught error running {0}" f))
(a/put! in-chan e))))))]
(try
(if (a/poll! canceled-chan)
(log/debug (trs "Output channel closed, will skip running {0}." f))
(do
(log/debug (trs "Running {0} on separate thread..." f))
(try
(let [result (apply f args)]
(if (some? result)
(a/put! out-chan result)
(log/warn "Warning: {0} returned `nil`" f)))
;; if we catch an Exception (shouldn't happen in a QP query, but just in case), send it to `chan`.
;; It's ok, our IMPL of Ring `StreamableResponseBody` will do the right thing with it.
(catch Throwable e
(log/error e (trs "Caught error running {0}" f))
(a/put! out-chan e)))))
(finally
(a/close! out-chan))))]
(a/go
(when-let [canceled (a/<! canceled-chan)]
(when (a/<! canceled-chan)
(log/debug (trs "Request canceled, canceling future"))
(future-cancel futur)))
......
......@@ -12,6 +12,7 @@
[add-settings :as add-settings]
[annotate :as annotate]
[async :as async]
[async-wait :as async-wait]
[auto-bucket-datetimes :as bucket-datetime]
[bind-effective-timezone :as bind-timezone]
[binning :as binning]
......@@ -141,6 +142,7 @@
;;
;; ▼▼▼ ASYNC MIDDLEWARE ▼▼▼
async/async->sync
async-wait/wait-for-permit
cache/maybe-return-cached-results
validate/validate-query
normalize/normalize
......
(ns metabase.query-processor.async
"Async versions of the usual public query processor functions. Instead of blocking while the query is ran, these
functions all return a `core.async` channel that can be used to fetch the results when they become available.
Each connected database is limited to a maximum of 15 simultaneous queries (configurable) using these methods; any
additional queries will park the thread. Super-useful for writing high-performance API endpoints. Prefer these
methods to the old-school synchronous versions.
How is this achieved? For each Database, we'll maintain a channel that acts as a counting semaphore; the channel
will initially contain 15 permits. Each incoming request will asynchronously read from the channel until it acquires
a permit, then put it back when it finishes."
functions all return a `core.async` channel that can be used to fetch the results when they become available."
(:require [clojure.core.async :as a]
[clojure.tools.logging :as log]
[metabase
[query-processor :as qp]
[util :as u]]
[metabase.api.common :as api]
[metabase.async
[semaphore-channel :as semaphore-channel]
[util :as async.u]]
[metabase.models.setting :refer [defsetting]]
[metabase.async.util :as async.u]
[metabase.query-processor
[interface :as qpi]
[util :as qputil]]
......@@ -26,58 +15,26 @@
[schema.core :as s])
(:import clojure.core.async.impl.channels.ManyToManyChannel))
(defsetting max-simultaneous-queries-per-db
(trs "Maximum number of simultaneous queries to allow per connected Database.")
:type :integer
:default 15)
(defonce ^:private db-semaphore-channels (atom {}))
(defn- fetch-db-semaphore-channel
"Fetch the counting semaphore channel for a Database, creating it if not already created."
[database-or-id]
(let [id (u/get-id database-or-id)]
(or
;; channel already exists
(@db-semaphore-channels id)
;; channel does not exist, Create a channel and stick it in the atom
(let [ch (semaphore-channel/semaphore-channel (max-simultaneous-queries-per-db))
new-ch ((swap! db-semaphore-channels update id #(or % ch)) id)]
;; ok, if the value swapped into the atom was a different channel (another thread beat us to it) then close our
;; newly created channel
(when-not (= ch new-ch)
(a/close! ch))
;; return the newly created channel
new-ch))))
(defn- do-async
"Execute `f` asynchronously, waiting to receive a permit from `db`'s semaphore channel before proceeding. Returns the
results in a channel."
[db f & args]
(let [semaphore-chan (fetch-db-semaphore-channel db)]
(apply semaphore-channel/do-after-receiving-permit semaphore-chan f args)))
(defn process-query
(s/defn process-query :- async.u/PromiseChan
"Async version of `metabase.query-processor/process-query`. Runs query asynchronously, and returns a `core.async`
channel that can be used to fetch the results once the query finishes running. Closing the channel will cancel the
query."
[query]
(do-async (:database query) qp/process-query query))
(qp/process-query (assoc query :async? true)))
(defn process-query-and-save-execution!
(s/defn process-query-and-save-execution! :- async.u/PromiseChan
"Async version of `metabase.query-processor/process-query-and-save-execution!`. Runs query asynchronously, and returns
a `core.async` channel that can be used to fetch the results once the query finishes running. Closing the channel
will cancel the query."
[query options]
(do-async (:database query) qp/process-query-and-save-execution! query options))
(qp/process-query-and-save-execution! (assoc query :async? true) options))
(defn process-query-and-save-with-max-results-constraints!
(s/defn process-query-and-save-with-max-results-constraints! :- async.u/PromiseChan
"Async version of `metabase.query-processor/process-query-and-save-with-max-results-constraints!`. Runs query
asynchronously, and returns a `core.async` channel that can be used to fetch the results once the query finishes
running. Closing the channel will cancel the query."
[query options]
(do-async (:database query) qp/process-query-and-save-with-max-results-constraints! query options))
(qp/process-query-and-save-with-max-results-constraints! (assoc query :async? true) options))
;;; ------------------------------------------------ Result Metadata -------------------------------------------------
......@@ -110,8 +67,7 @@
;; (normally middleware takes care of calculating query hashes for 'userland' queries but this is not
;; technically a userland query -- we don't want to save a QueryExecution -- so we need to add `executed-by`
;; and `query-hash` ourselves so the remark gets added)
(assoc-in query [:info :query-hash] (qputil/query-hash query)))
))
(assoc-in query [:info :query-hash] (qputil/query-hash query)))))
out-chan)
;; return out-chan
out-chan))
......@@ -38,7 +38,9 @@
(let [out-chan (a/promise-chan)
canceled-chan (async.u/promise-canceled-chan out-chan)
respond (fn [result]
(a/>!! out-chan result)
(if (some? result)
(a/>!! out-chan result)
(log/warn (trs "Warning: `respond` as passed `nil`.")))
(a/close! out-chan))
raise (fn [e]
(log/warn e (trs "Unhandled exception, exepected `catch-exceptions` middleware to handle it"))
......@@ -47,7 +49,10 @@
(qp query respond raise canceled-chan)
(catch Throwable e
(raise e)))
(let [result (a/<!! out-chan)]
(if (instance? Throwable result)
(throw result)
result)))))
;; if query is `async?` return the output channel; otherwise block until output channel returns a result
(if async?
out-chan
(let [result (a/<!! out-chan)]
(if (instance? Throwable result)
(throw result)
result))))))
(ns metabase.query-processor.middleware.async-wait
"Middleware that limits the number of concurrent queries for each database.
Each connected database is limited to a maximum of 15 simultaneous queries (configurable via the
`max-simultaneous-queries-per-db` setting); any additional queries wait asynchronously until a permit becomes
available.
How is this achieved? For each Database, we'll maintain a channel that acts as a counting semaphore; the channel
will initially contain 15 permits. Each incoming request will asynchronously read from the channel until it acquires
a permit, then put it back when it finishes."
(:require [clojure.core.async :as a]
[metabase.async.semaphore-channel :as semaphore-channel]
[metabase.models.setting :refer [defsetting]]
[metabase.util :as u]
[metabase.util.i18n :refer [trs]]))
;; Size used for each per-Database semaphore channel when it is first created (see `fetch-db-semaphore-channel`).
;; NOTE(review): the value is read only at channel-creation time, so changing the setting presumably does not resize
;; channels that already exist -- confirm.
(defsetting max-simultaneous-queries-per-db
(trs "Maximum number of simultaneous queries to allow per connected Database.")
:type :integer
:default 15)
;; Map of Database ID -> semaphore channel for that Database. Entries are added lazily by
;; `fetch-db-semaphore-channel`; `defonce` keeps existing channels when the namespace is reloaded.
(defonce ^:private db-semaphore-channels (atom {}))
(defn- fetch-db-semaphore-channel
  "Return the counting semaphore channel registered for `database-or-id`. When none is registered yet, create one sized
  by `(max-simultaneous-queries-per-db)` and register it. If another thread registers a channel for the same Database
  first, the channel created here is closed and the already-registered one is returned instead."
  [database-or-id]
  (let [db-id (u/get-id database-or-id)]
    (if-let [existing (get @db-semaphore-channels db-id)]
      ;; fast path: a channel is already registered for this Database
      existing
      ;; slow path: build a candidate channel and try to register it; whichever channel reaches the atom first wins
      (let [candidate  (semaphore-channel/semaphore-channel (max-simultaneous-queries-per-db))
            keep-first (fn [current] (or current candidate))
            registered (get (swap! db-semaphore-channels update db-id keep-first) db-id)]
        ;; we lost the race, so discard the channel we just created
        (when (not= candidate registered)
          (a/close! candidate))
        registered))))
(defn wait-for-permit
"Middleware that throttles the number of concurrent queries for each connected database. The wrapped `qp` is only
invoked (as `(qp query respond raise canceled-chan)`) after a permit has been received from the semaphore channel for
the query's `:database`. The returned function does not block: it sets up the asynchronous wait and returns `nil`
immediately."
[qp]
(fn [{database-id :database, :as query} respond raise canceled-chan]
(let [semaphore-chan (fetch-db-semaphore-channel database-id)
;; `output-chan` receives the return value of the `qp` call; closing it early cancels the pending call (per the
;; docstring of `do-after-receiving-permit`)
output-chan (semaphore-channel/do-after-receiving-permit semaphore-chan
qp query respond raise canceled-chan)]
;; forward whatever comes out of `output-chan` to `respond`. If `output-chan` is closed without a value the take
;; yields `nil`, so `respond` is called with `nil`.
;; NOTE(review): `qp` is also handed `respond` directly, so `respond` may be invoked both by `qp` and here --
;; confirm this double invocation is intended.
(a/go
(respond (a/<! output-chan))
(a/close! output-chan))
;; if a cancelation message arrives, close `output-chan` so the pending call is canceled
(a/go
(when (a/<! canceled-chan)
(a/close! output-chan)))
nil)))
......@@ -30,6 +30,37 @@
(let [[val port] (a/alts!! [canceled-chan (a/timeout 1000)])]
{:val val, :canceled-chan? (= port canceled-chan)}))))
;; canceled-chan should be a promise-chan which means we can fetch results more than once. The value of the first read
;; below is discarded; the value of the second read is what gets compared against `::async.u/canceled`.
(expect
::async.u/canceled
(tu.async/with-open-channels [chan (a/promise-chan)]
(let [canceled-chan (async.u/promise-canceled-chan chan)]
(a/close! chan)
(first (a/alts!! [canceled-chan (a/timeout 1000)]))
(first (a/alts!! [canceled-chan (a/timeout 1000)])))))
;; can we add multiple canceled-chans to the same channel? Closing `chan` without ever putting a value on it should
;; deliver `::async.u/canceled` to every canceled-chan created for it.
(expect
{1 ::async.u/canceled, 2 ::async.u/canceled}
(tu.async/with-open-channels [chan (a/promise-chan)]
(let [canceled-chans {1 (async.u/promise-canceled-chan chan)
2 (async.u/promise-canceled-chan chan)}]
(a/close! chan)
(into {} (for [[id chan] canceled-chans]
[id (first (a/alts!! [chan (a/timeout 1000)]))])))))
;; if `chan` receives a value before it is closed, that is not a cancelation: every canceled-chan should yield `nil`
;; (and it should be the canceled-chan itself, not the timeout, that produced the `nil`)
(expect
{1 {:val nil, :canceled-chan? true}
2 {:val nil, :canceled-chan? true}}
(tu.async/with-open-channels [chan (a/promise-chan)]
(let [canceled-chans {1 (async.u/promise-canceled-chan chan)
2 (async.u/promise-canceled-chan chan)}]
(a/>!! chan "message")
(a/close! chan)
(into {} (for [[id chan] canceled-chans]
(let [[val port] (a/alts!! [chan (a/timeout 1000)])]
[id {:val val, :canceled-chan? (= port chan)}]))))))
;;; ----------------------------------------------- single-value-pipe ------------------------------------------------
......
(ns metabase.query-processor.middleware.async-wait-test
(:require [clojure.core.async :as a]
[expectations :refer [expect]]
[metabase.query-processor.middleware.async-wait :as async-wait]
[metabase.test.util.async :as tu.async])
(:import java.io.Closeable))
(defn- async-wait
  "Test harness around the `async-wait/wait-for-permit` middleware. Wraps a stub QP (which just responds with
  `::result`) in the middleware, invokes it with an empty query, and then calls `f` with a map of 3 channels:

  * `:result-chan`    -- receives any non-nil value handed to the respond/raise callback, then is closed
  * `:semaphore-chan` -- mocked semaphore channel that `wait-for-permit` will wait to receive a permit from
  * `:canceled-chan`  -- channel you can put a message on to simulate request cancelation

  Returns whatever `f` returns."
  [f]
  (tu.async/with-open-channels [result-chan    (a/promise-chan)
                                semaphore-chan (a/chan 15)
                                canceled-chan  (a/promise-chan)]
    (let [stub-qp (fn [_ respond _ _]
                    (respond ::result))
          ;; used as both the `respond` and the `raise` callback
          deliver (fn [result]
                    (when result
                      (a/>!! result-chan result))
                    (a/close! result-chan))]
      ;; make the middleware use our mocked semaphore channel instead of a real per-Database one
      (with-redefs [async-wait/fetch-db-semaphore-channel (constantly semaphore-chan)]
        ((async-wait/wait-for-permit stub-qp) {} deliver deliver canceled-chan)
        (f {:result-chan result-chan, :semaphore-chan semaphore-chan, :canceled-chan canceled-chan})))))
;; Test-only protocol implemented by the mocked permits created by `permit`, so tests can observe when a permit has
;; been closed.
(defprotocol ^:private NotifyClosed
(^:private on-close-chan [this]
"Returns a channel that will get a `::closed` message when `.close` is called on the object."))
(defn- permit
  "Create a mocked permit object to put on the mocked semaphore channel. Calling `.close` on it delivers `::closed` to
  the channel returned by `on-close-chan`, which lets a test check whether the permit was closed."
  []
  (let [notify-chan (a/promise-chan)]
    (reify
      NotifyClosed
      (on-close-chan [_]
        notify-chan)

      Closeable
      (close [_]
        ;; deliver the notification, then close the channel
        (a/>!! notify-chan ::closed)
        (a/close! notify-chan)))))
(defn- wait-for-result
  "Take from `chan`, giving up after `timeout-ms` milliseconds (default 200). Returns the value taken from `chan` (`nil`
  if `chan` was closed), or `::timed-out` if the timeout fired first."
  ([chan]
   (wait-for-result chan 200))
  ([chan timeout-ms]
   (let [deadline     (a/timeout timeout-ms)
         [value port] (a/alts!! [chan deadline])]
     (if (= port chan)
       value
       ::timed-out))))
;; QP should run if semaphore-chan gets a permit. Permit should be closed after QP finishes.
;; `:permit-taken?` is true when nothing can be read from `semaphore-chan` afterwards, i.e. the middleware consumed the
;; permit.
(expect
{:result ::result, :permit-taken? true, :permit-closed? true}
(async-wait
(fn [{:keys [semaphore-chan result-chan]}]
(let [permit (permit)]
;; provide a permit async after a short delay
(a/go
(a/<! (a/timeout 10))
(a/>! semaphore-chan permit))
{:result (wait-for-result result-chan)
:permit-taken? (= (wait-for-result semaphore-chan) ::timed-out)
:permit-closed? (= (wait-for-result (on-close-chan permit)) ::closed)}))))
;; If semaphore-chan never gets a permit, then the QP should never run. The permit created here is never put on
;; `semaphore-chan`, so nothing should close it either; both waits are expected to time out.
(expect
{:result ::timed-out, :permit-closed? false}
(async-wait
(fn [{:keys [result-chan]}]
(let [permit (permit)]
{:result (wait-for-result result-chan)
:permit-closed? (= (wait-for-result (on-close-chan permit)) ::closed)}))))
;; if canceled-chan gets a message before permit is provided, QP should never run. `:result nil` means `result-chan`
;; was closed without ever receiving a value. `:permit-taken? false` means the test itself was still able to read the
;; late permit off `semaphore-chan`, i.e. the middleware did not consume it.
(expect
{:result nil
:permit-taken? false
:permit-closed? false}
(async-wait
(fn [{:keys [result-chan semaphore-chan canceled-chan]}]
(let [permit (permit)]
;; cancel after a short delay, then provide a permit only after the cancelation has had time to take effect
(a/go
(a/<! (a/timeout 10))
(a/>! canceled-chan :canceled)
(a/<! (a/timeout 100))
(a/>! semaphore-chan permit))
{:result (wait-for-result result-chan)
:permit-taken? (= (wait-for-result semaphore-chan) ::timed-out)
:permit-closed? (= (wait-for-result (on-close-chan permit)) ::closed)}))))
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment