diff --git a/src/metabase/middleware.clj b/src/metabase/middleware.clj index 04626955aafcf08d0c01ac49343b628201a1d168..e039a34fc95e90613f5e02dcbba1ff70a854b08f 100644 --- a/src/metabase/middleware.clj +++ b/src/metabase/middleware.clj @@ -387,7 +387,7 @@ "Convert an exception from an API endpoint into an appropriate HTTP response." [^Throwable e] (let [{:keys [status-code], :as info} (ex-data e) - other-info (dissoc info :status-code :schema) + other-info (dissoc info :status-code :schema :type) message (.getMessage e) body (cond ;; Exceptions that include a status code *and* other info are things like diff --git a/src/metabase/query_processor.clj b/src/metabase/query_processor.clj index 1445e3d0f778a12bcb066f6625a93f3844a34eb1..5f40a99f3a6337ebe96a66e9faa32200e190dcec 100644 --- a/src/metabase/query_processor.clj +++ b/src/metabase/query_processor.clj @@ -32,6 +32,7 @@ [normalize-query :as normalize] [parameters :as parameters] [permissions :as perms] + [add-query-throttle :as query-throttle] [resolve :as resolve] [resolve-driver :as resolve-driver] [results-metadata :as results-metadata] @@ -115,6 +116,7 @@ bind-timezone/bind-effective-timezone fetch-source-query/fetch-source-query store/initialize-store + query-throttle/maybe-add-query-throttle log-query/log-initial-query ;; TODO - bind *query* here ? cache/maybe-return-cached-results @@ -259,10 +261,13 @@ (assert-query-status-successful result) (save-and-return-successful-query! query-execution result)) (catch Throwable e - (log/warn (u/format-color 'red "Query failure: %s\n%s" - (.getMessage e) - (u/pprint-to-str (u/filtered-stacktrace e)))) - (save-and-return-failed-query! query-execution (.getMessage e)))))) + (if (= (:type (ex-data e)) ::query-throttle/concurrent-query-limit-reached) + (throw e) + (do + (log/warn (u/format-color 'red "Query failure: %s\n%s" + (.getMessage e) + (u/pprint-to-str (u/filtered-stacktrace e)))) + (save-and-return-failed-query! query-execution (.getMessage e)))))))) ;; TODO - couldn't saving the query execution be done by MIDDLEWARE? (s/defn process-query-and-save-execution! diff --git a/src/metabase/query_processor/middleware/add_query_throttle.clj b/src/metabase/query_processor/middleware/add_query_throttle.clj new file mode 100644 index 0000000000000000000000000000000000000000..042904a3a6b76ea0a9e0516c829d95fe14ba12af --- /dev/null +++ b/src/metabase/query_processor/middleware/add_query_throttle.clj @@ -0,0 +1,51 @@ +(ns metabase.query-processor.middleware.add-query-throttle + "Middleware that constraints the number of concurrent queries, rejects queries by throwing an exception and + returning a 503 when we exceed our capacity" + (:require [metabase.config :as config] + [puppetlabs.i18n.core :refer [tru]]) + (:import [java.util.concurrent Semaphore TimeUnit])) + +(def ^:private calculate-max-queries-from-max-threads + (let [max-threads (or (config/config-int :mb-jetty-maxthreads) 50)] + (int (Math/ceil (/ max-threads 2))))) + +(defn- ^Semaphore create-query-semaphore [] + (let [total-concurrent-queries (or (config/config-int :mb-max-concurrent-queries) + calculate-max-queries-from-max-threads)] + (Semaphore. total-concurrent-queries true))) + +(def ^Semaphore ^:private query-semaphore (create-query-semaphore)) + +(defn- throw-503-unavailable + [] + (throw (ex-info (tru "Max concurrent query limit reached") + {:type ::concurrent-query-limit-reached + :status-code 503}))) + +;; Not marking this as `const` so it can be redef'd in tests +(def ^:private max-query-wait-time-in-millis + (or (config/config-int :mb-max-query-wait-time) + 5000)) + +(defn- throttle-queries + "Query middle that will throttle queries using `semaphore`. Throws 503 exceptions if there are no more slots + available" + [^Semaphore semaphore qp] + (fn [query] + ;; `tryAquire` will return `true` if it is able to get a permit, false otherwise + (if (.tryAcquire semaphore max-query-wait-time-in-millis TimeUnit/MILLISECONDS) + (try + (qp query) + (finally + ;; We have a permit, whether the query is successful or it failed, we must make sure that we always release + ;; the permit + (.release semaphore))) + ;; We were not able to get a permit without the timeout period, return a 503 + (throw-503-unavailable)))) + +(defn maybe-add-query-throttle + "Adds the query throttle middleware as not as `MB_DISABLE_QUERY_THROTTLE` hasn't been set" + [qp] + (if (config/config-bool :mb-disable-query-throttle) + qp + (throttle-queries query-semaphore qp))) diff --git a/src/metabase/query_processor/middleware/catch_exceptions.clj b/src/metabase/query_processor/middleware/catch_exceptions.clj index 835d7bc4035461ff54505cccb6f77ecea1100290..09e903ca9caf7756a9760810e8edce82f9280c44 100644 --- a/src/metabase/query_processor/middleware/catch_exceptions.clj +++ b/src/metabase/query_processor/middleware/catch_exceptions.clj @@ -1,6 +1,7 @@ (ns metabase.query-processor.middleware.catch-exceptions "Middleware for catching exceptions thrown by the query processor and returning them in a friendlier format." (:require [metabase.query-processor.middleware + [add-query-throttle :as query-throttle] [expand :as expand] [resolve :as resolve] [source-table :as source-table]] @@ -57,9 +58,12 @@ (fn [query] (try (qp query) (catch clojure.lang.ExceptionInfo e - (fail query e (when-let [data (ex-data e)] - (when (= (:type data) :schema.core/error) - (when-let [error (explain-schema-validation-error (:error data))] - {:error error}))))) + (let [{error :error, error-type :type, :as data} (ex-data e)] + ;; When we've hit our concurrent query limit, let that exception bubble up, otherwise repackage it as a failure + (if (= error-type ::query-throttle/concurrent-query-limit-reached) + (throw e) + (fail query e (when-let [error-msg (and (= error-type :schema.core/error) + (explain-schema-validation-error error))] + {:error error-msg}))))) (catch Throwable e (fail query e))))) diff --git a/test/metabase/query_processor/middleware/add_query_throttle_test.clj b/test/metabase/query_processor/middleware/add_query_throttle_test.clj new file mode 100644 index 0000000000000000000000000000000000000000..955f166806394a49dbdfa2b3d1c7bb794b7f4f30 --- /dev/null +++ b/test/metabase/query_processor/middleware/add_query_throttle_test.clj @@ -0,0 +1,110 @@ +(ns metabase.query-processor.middleware.add-query-throttle-test + (:require [expectations :refer :all] + [metabase.query-processor.middleware + [add-query-throttle :as throttle :refer :all] + [catch-exceptions :as catch-exceptions]] + [metabase.test.data :as data] + [metabase.util :as u]) + (:import java.util.concurrent.Semaphore)) + +(defmacro ^:private exception-and-message [& body] + `(try + ~@body + (catch Exception e# + {:ex-class (class e#) + :msg (.getMessage e#) + :data (ex-data e#)}))) + +;; Check that the middleware will throw an exception and return a 503 if there are no tickets available in the +;; semaphore after waiting the timeout period +(expect + {:ex-class clojure.lang.ExceptionInfo + :msg "Max concurrent query limit reached" + :data {:status-code 503 + :type ::throttle/concurrent-query-limit-reached}} + (with-redefs [throttle/max-query-wait-time-in-seconds 1] + (exception-and-message + (let [semaphore (Semaphore. 5)] + (.acquire semaphore 5) + ((#'throttle/throttle-queries semaphore (constantly "Should never be returned")) {}))))) + +;; The `catch-exceptions` middleware catches any query pipeline errors and reformats it as a failed query result. The +;; 503 exception here is special and should be bubbled up +(expect + {:ex-class clojure.lang.ExceptionInfo + :msg "Max concurrent query limit reached" + :data {:status-code 503 + :type ::throttle/concurrent-query-limit-reached}} + (with-redefs [throttle/max-query-wait-time-in-seconds 1] + (exception-and-message + (let [semaphore (Semaphore. 5) + my-qp (->> identity + (#'throttle/throttle-queries semaphore) + catch-exceptions/catch-exceptions)] + (.acquire semaphore 5) + (my-qp {:my "query"}))))) + +;; Test that queries are "enqueued" for the timeout period and if another slot becomes available, it is used +(expect + {:before-semaphore-release ::no-result + :after-semaphore-release {:query "map"}} + (with-redefs [throttle/max-query-wait-time-in-seconds 120] + (let [semaphore (Semaphore. 5) + _ (.acquire semaphore 5) + query-future (future ((#'throttle/throttle-queries semaphore identity) {:query "map"}))] + {:before-semaphore-release (deref query-future 10 ::no-result) + :after-semaphore-release (do + (.release semaphore) + (deref query-future 10000 ::no-result))}))) + +;; Test that a successful query result will return the permit to the semaphore +(expect + {:beinning-permits 5 + :before-failure-permits 4 + :query-result {:query "map"} + :after-success-permits 5} + (with-redefs [throttle/max-query-wait-time-in-seconds 5] + (let [semaphore (Semaphore. 5) + start-middleware-promise (promise) + finish-middleware-promise (promise) + begin-num-permits (.availablePermits semaphore) + coordinate-then-finish (fn [query-map] + (deliver start-middleware-promise true) + @finish-middleware-promise + query-map) + query-future (future + ((#'throttle/throttle-queries semaphore coordinate-then-finish) {:query "map"}))] + {:beinning-permits begin-num-permits + :before-failure-permits (do + @start-middleware-promise + (.availablePermits semaphore)) + :query-result (do + (deliver finish-middleware-promise true) + @query-future) + :after-success-permits (.availablePermits semaphore)}))) + +;; Ensure that the even if there is a failure, the permit is always released +(expect + {:beinning-permits 5 + :before-failure-permits 4 + :after-failure-permits 5} + (with-redefs [throttle/max-query-wait-time-in-seconds 5] + (let [semaphore (Semaphore. 5) + start-middleware-promise (promise) + finish-middleware-promise (promise) + begin-num-permits (.availablePermits semaphore) + coordinate-then-fail (fn [_] + (deliver start-middleware-promise true) + @finish-middleware-promise + (throw (Exception. "failure"))) + query-future (future + (u/ignore-exceptions + ((#'throttle/throttle-queries semaphore coordinate-then-fail) {:query "map"})))] + {:beinning-permits begin-num-permits + :before-failure-permits (do + @start-middleware-promise + (.availablePermits semaphore)) + :after-failure-permits (do + (deliver finish-middleware-promise true) + @query-future + (.availablePermits semaphore))})))