Skip to content
Snippets Groups Projects
Commit 6d31c793 authored by Ryan Senior's avatar Ryan Senior
Browse files

Add a query throttle middleware

This will keep track of the number of concurrent queries going through
the query pipeline. By default it will allow half of the total Jetty
connections to be used for querying via the query pipeline. By default
we have 50 Jetty connections, so out of the box this number is
25. The number of query threads can be overridden via
MB_MAX_CONCURRENT_QUERIES. Once we have reached the max number of
concurrent queries, subsequent queries will wait 5 seconds for a
slot, then return a 503.
parent 1694be29
No related branches found
No related tags found
No related merge requests found
......@@ -387,7 +387,7 @@
"Convert an exception from an API endpoint into an appropriate HTTP response."
[^Throwable e]
(let [{:keys [status-code], :as info} (ex-data e)
other-info (dissoc info :status-code :schema)
other-info (dissoc info :status-code :schema :type)
message (.getMessage e)
body (cond
;; Exceptions that include a status code *and* other info are things like
......
......@@ -32,6 +32,7 @@
[normalize-query :as normalize]
[parameters :as parameters]
[permissions :as perms]
[add-query-throttle :as query-throttle]
[resolve :as resolve]
[resolve-driver :as resolve-driver]
[results-metadata :as results-metadata]
......@@ -115,6 +116,7 @@
bind-timezone/bind-effective-timezone
fetch-source-query/fetch-source-query
store/initialize-store
query-throttle/maybe-add-query-throttle
log-query/log-initial-query
;; TODO - bind *query* here ?
cache/maybe-return-cached-results
......@@ -259,10 +261,13 @@
(assert-query-status-successful result)
(save-and-return-successful-query! query-execution result))
(catch Throwable e
(log/warn (u/format-color 'red "Query failure: %s\n%s"
(.getMessage e)
(u/pprint-to-str (u/filtered-stacktrace e))))
(save-and-return-failed-query! query-execution (.getMessage e))))))
(if (= (:type (ex-data e)) ::query-throttle/concurrent-query-limit-reached)
(throw e)
(do
(log/warn (u/format-color 'red "Query failure: %s\n%s"
(.getMessage e)
(u/pprint-to-str (u/filtered-stacktrace e))))
(save-and-return-failed-query! query-execution (.getMessage e))))))))
;; TODO - couldn't saving the query execution be done by MIDDLEWARE?
(s/defn process-query-and-save-execution!
......
(ns metabase.query-processor.middleware.add-query-throttle
"Middleware that constrains the number of concurrent queries. When capacity is exceeded, queries are rejected by
throwing an exception that carries a 503 status code."
(:require [metabase.config :as config]
[puppetlabs.i18n.core :refer [tru]])
(:import [java.util.concurrent Semaphore TimeUnit]))
(def ^:private calculate-max-queries-from-max-threads
  "Default cap on concurrent queries: half of the configured `:mb-jetty-maxthreads` value (50 when that setting is
  absent), rounded up to a whole number. Despite the verb-like name this is a value computed once at load time, not
  a function."
  (-> (or (config/config-int :mb-jetty-maxthreads) 50)
      (/ 2)
      Math/ceil
      int))
(defn- ^Semaphore create-query-semaphore
  "Builds the semaphore used to throttle queries. The permit count comes from the `:mb-max-concurrent-queries`
  config value when it is set, otherwise from `calculate-max-queries-from-max-threads`. The semaphore is created
  with its fairness flag set to `true`."
  []
  (-> (or (config/config-int :mb-max-concurrent-queries)
          calculate-max-queries-from-max-threads)
      (Semaphore. true)))
;; The single semaphore shared by every query that passes through `maybe-add-query-throttle`; created once when this
;; namespace is loaded
(def ^Semaphore ^:private query-semaphore (create-query-semaphore))
(defn- throw-503-unavailable
  "Always throws. The thrown `ExceptionInfo` carries `:status-code 503` and a `:type` of
  `::concurrent-query-limit-reached` in its ex-data so that callers can recognise it."
  []
  (let [message (tru "Max concurrent query limit reached")
        details {:status-code 503
                 :type        ::concurrent-query-limit-reached}]
    (throw (ex-info message details))))
;; Deliberately not marked `^:const`, so that it can be redef'd in tests
(def ^:private max-query-wait-time-in-millis
  "How long, in milliseconds, a query waits for a free slot before it is rejected. Read from the
  `:mb-max-query-wait-time` config value, falling back to 5000 when that is not set."
  (let [configured-wait (config/config-int :mb-max-query-wait-time)]
    (or configured-wait 5000)))
(defn- throttle-queries
  "Query middleware that throttles queries using `semaphore`. Returns a function of `query` that runs `qp` only
  after obtaining a permit, and throws a 503 exception if no permit becomes available within
  `max-query-wait-time-in-millis`."
  [^Semaphore semaphore qp]
  (fn [query]
    ;; `tryAcquire` returns `true` if it obtained a permit within the wait time, `false` otherwise. Without a permit
    ;; we bail out here; `throw-503-unavailable` always throws, so nothing below runs in that case.
    (when-not (.tryAcquire semaphore max-query-wait-time-in-millis TimeUnit/MILLISECONDS)
      (throw-503-unavailable))
    ;; From here on we hold a permit and must give it back whether the query succeeds or fails
    (try
      (qp query)
      (finally
        (.release semaphore)))))
(defn maybe-add-query-throttle
  "Wraps `qp` in the query throttle middleware as long as `MB_DISABLE_QUERY_THROTTLE` hasn't been set; when it has
  been set, `qp` is returned unchanged."
  [qp]
  (cond->> qp
    (not (config/config-bool :mb-disable-query-throttle)) (throttle-queries query-semaphore)))
(ns metabase.query-processor.middleware.catch-exceptions
"Middleware for catching exceptions thrown by the query processor and returning them in a friendlier format."
(:require [metabase.query-processor.middleware
[add-query-throttle :as query-throttle]
[expand :as expand]
[resolve :as resolve]
[source-table :as source-table]]
......@@ -57,9 +58,12 @@
(fn [query]
(try (qp query)
(catch clojure.lang.ExceptionInfo e
(fail query e (when-let [data (ex-data e)]
(when (= (:type data) :schema.core/error)
(when-let [error (explain-schema-validation-error (:error data))]
{:error error})))))
(let [{error :error, error-type :type, :as data} (ex-data e)]
;; When we've hit our concurrent query limit, let that exception bubble up, otherwise repackage it as a failure
(if (= error-type ::query-throttle/concurrent-query-limit-reached)
(throw e)
(fail query e (when-let [error-msg (and (= error-type :schema.core/error)
(explain-schema-validation-error error))]
{:error error-msg})))))
(catch Throwable e
(fail query e)))))
(ns metabase.query-processor.middleware.add-query-throttle-test
(:require [expectations :refer :all]
[metabase.query-processor.middleware
[add-query-throttle :as throttle :refer :all]
[catch-exceptions :as catch-exceptions]]
[metabase.test.data :as data]
[metabase.util :as u])
(:import java.util.concurrent.Semaphore))
(defmacro ^:private exception-and-message
  "Evaluates `body` and returns its value. If `body` throws an `Exception`, returns a map describing it instead:
  the exception class under `:ex-class`, its message under `:msg` and its ex-data under `:data`."
  [& body]
  `(try
     (do ~@body)
     (catch Exception caught#
       {:data     (ex-data caught#)
        :msg      (.getMessage caught#)
        :ex-class (class caught#)})))
;; Check that the middleware throws an exception carrying a 503 status code if there are no permits available in the
;; semaphore after waiting for the timeout period
(expect
  {:ex-class clojure.lang.ExceptionInfo
   :msg      "Max concurrent query limit reached"
   :data     {:status-code 503
              :type        ::throttle/concurrent-query-limit-reached}}
  ;; The throttle namespace defines its wait time as `max-query-wait-time-in-millis`, so that is the var to redef;
  ;; 1000 ms keeps the wait at one second
  (with-redefs [throttle/max-query-wait-time-in-millis 1000]
    (exception-and-message
     (let [semaphore (Semaphore. 5)]
       ;; Take every permit so the throttled query can never obtain one
       (.acquire semaphore 5)
       ((#'throttle/throttle-queries semaphore (constantly "Should never be returned")) {})))))
;; The `catch-exceptions` middleware catches any query pipeline errors and reformats them as a failed query result.
;; The 503 exception here is special and should be bubbled up
(expect
  {:ex-class clojure.lang.ExceptionInfo
   :msg      "Max concurrent query limit reached"
   :data     {:status-code 503
              :type        ::throttle/concurrent-query-limit-reached}}
  ;; The throttle namespace defines its wait time as `max-query-wait-time-in-millis`, so that is the var to redef;
  ;; 1000 ms keeps the wait at one second
  (with-redefs [throttle/max-query-wait-time-in-millis 1000]
    (exception-and-message
     (let [semaphore (Semaphore. 5)
           my-qp     (->> identity
                          (#'throttle/throttle-queries semaphore)
                          catch-exceptions/catch-exceptions)]
       ;; Take every permit so the throttled query can never obtain one
       (.acquire semaphore 5)
       (my-qp {:my "query"})))))
;; Test that queries are "enqueued" for the timeout period and if another slot becomes available, it is used
(expect
  {:before-semaphore-release ::no-result
   :after-semaphore-release  {:query "map"}}
  ;; The throttle namespace defines its wait time as `max-query-wait-time-in-millis`, so that is the var to redef;
  ;; 120000 ms (two minutes) is far longer than this test needs, so the queued query never times out
  (with-redefs [throttle/max-query-wait-time-in-millis 120000]
    (let [semaphore    (Semaphore. 5)
          _            (.acquire semaphore 5)
          query-future (future ((#'throttle/throttle-queries semaphore identity) {:query "map"}))]
      ;; With no permits available the query must still be waiting after 10 ms
      {:before-semaphore-release (deref query-future 10 ::no-result)
       ;; Releasing one permit lets the waiting query through
       :after-semaphore-release  (do
                                   (.release semaphore)
                                   (deref query-future 10000 ::no-result))})))
;; Test that a successful query result will return the permit to the semaphore
(expect
  {:beinning-permits       5
   :before-failure-permits 4
   :query-result           {:query "map"}
   :after-success-permits  5}
  ;; The throttle namespace defines its wait time as `max-query-wait-time-in-millis`, so that is the var to redef;
  ;; 5000 ms keeps the wait at five seconds
  (with-redefs [throttle/max-query-wait-time-in-millis 5000]
    (let [semaphore                 (Semaphore. 5)
          start-middleware-promise  (promise)
          finish-middleware-promise (promise)
          begin-num-permits         (.availablePermits semaphore)
          ;; Stand-in query processor: signals that it is running (and so holds a permit), then blocks until the
          ;; test allows it to finish
          coordinate-then-finish    (fn [query-map]
                                      (deliver start-middleware-promise true)
                                      @finish-middleware-promise
                                      query-map)
          query-future              (future
                                      ((#'throttle/throttle-queries semaphore coordinate-then-finish) {:query "map"}))]
      {:beinning-permits       begin-num-permits
       ;; Measured while the query is in flight, so one permit should be checked out
       :before-failure-permits (do
                                 @start-middleware-promise
                                 (.availablePermits semaphore))
       :query-result           (do
                                 (deliver finish-middleware-promise true)
                                 @query-future)
       :after-success-permits  (.availablePermits semaphore)})))
;; Ensure that even if there is a failure, the permit is always released
(expect
  {:beinning-permits       5
   :before-failure-permits 4
   :after-failure-permits  5}
  ;; The throttle namespace defines its wait time as `max-query-wait-time-in-millis`, so that is the var to redef;
  ;; 5000 ms keeps the wait at five seconds
  (with-redefs [throttle/max-query-wait-time-in-millis 5000]
    (let [semaphore                 (Semaphore. 5)
          start-middleware-promise  (promise)
          finish-middleware-promise (promise)
          begin-num-permits         (.availablePermits semaphore)
          ;; Stand-in query processor: signals that it is running (and so holds a permit), blocks until the test
          ;; allows it to continue, then fails
          coordinate-then-fail      (fn [_]
                                      (deliver start-middleware-promise true)
                                      @finish-middleware-promise
                                      (throw (Exception. "failure")))
          query-future              (future
                                      (u/ignore-exceptions
                                        ((#'throttle/throttle-queries semaphore coordinate-then-fail) {:query "map"})))]
      {:beinning-permits       begin-num-permits
       ;; Measured while the query is in flight, so one permit should be checked out
       :before-failure-permits (do
                                 @start-middleware-promise
                                 (.availablePermits semaphore))
       ;; Measured after the query has thrown, so the permit should be back
       :after-failure-permits  (do
                                 (deliver finish-middleware-promise true)
                                 @query-future
                                 (.availablePermits semaphore))})))
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment