From 6d31c7935ac6c51d62d7d25fdc8ff6c8dc0c382a Mon Sep 17 00:00:00 2001
From: Ryan Senior <ryan@metabase.com>
Date: Mon, 24 Sep 2018 11:20:33 -0500
Subject: [PATCH] Add a query throttle middleware

This will keep track of the number of concurrent queries going through
the query pipeline. By default it will allow half of the total Jetty
connections to be used for querying via the query pipeline. By default
we have 50 Jetty connections, so out of the box this number is
25. The number of query threads can be overridden via
MB_MAX_CONCURRENT_QUERIES. Once we have reached the max number of
concurrent queries, subsequent queries will wait 5 seconds for a
slot, then return a 503.
---
 src/metabase/middleware.clj                   |   2 +-
 src/metabase/query_processor.clj              |  13 ++-
 .../middleware/add_query_throttle.clj         |  51 ++++++++
 .../middleware/catch_exceptions.clj           |  12 +-
 .../middleware/add_query_throttle_test.clj    | 110 ++++++++++++++++++
 5 files changed, 179 insertions(+), 9 deletions(-)
 create mode 100644 src/metabase/query_processor/middleware/add_query_throttle.clj
 create mode 100644 test/metabase/query_processor/middleware/add_query_throttle_test.clj

diff --git a/src/metabase/middleware.clj b/src/metabase/middleware.clj
index 04626955aaf..e039a34fc95 100644
--- a/src/metabase/middleware.clj
+++ b/src/metabase/middleware.clj
@@ -387,7 +387,7 @@
   "Convert an exception from an API endpoint into an appropriate HTTP response."
   [^Throwable e]
   (let [{:keys [status-code], :as info} (ex-data e)
-        other-info                      (dissoc info :status-code :schema)
+        other-info                      (dissoc info :status-code :schema :type)
         message                         (.getMessage e)
         body                            (cond
                                           ;; Exceptions that include a status code *and* other info are things like
diff --git a/src/metabase/query_processor.clj b/src/metabase/query_processor.clj
index 1445e3d0f77..5f40a99f3a6 100644
--- a/src/metabase/query_processor.clj
+++ b/src/metabase/query_processor.clj
@@ -32,6 +32,7 @@
              [normalize-query :as normalize]
              [parameters :as parameters]
              [permissions :as perms]
+             [add-query-throttle :as query-throttle]
              [resolve :as resolve]
              [resolve-driver :as resolve-driver]
              [results-metadata :as results-metadata]
@@ -115,6 +116,7 @@
       bind-timezone/bind-effective-timezone
       fetch-source-query/fetch-source-query
       store/initialize-store
+      query-throttle/maybe-add-query-throttle
       log-query/log-initial-query
       ;; TODO - bind *query* here ?
       cache/maybe-return-cached-results
@@ -259,10 +261,13 @@
         (assert-query-status-successful result)
         (save-and-return-successful-query! query-execution result))
       (catch Throwable e
-        (log/warn (u/format-color 'red "Query failure: %s\n%s"
-                    (.getMessage e)
-                    (u/pprint-to-str (u/filtered-stacktrace e))))
-        (save-and-return-failed-query! query-execution (.getMessage e))))))
+        (if (= (:type (ex-data e)) ::query-throttle/concurrent-query-limit-reached)
+          (throw e)
+          (do
+            (log/warn (u/format-color 'red "Query failure: %s\n%s"
+                                      (.getMessage e)
+                                      (u/pprint-to-str (u/filtered-stacktrace e))))
+            (save-and-return-failed-query! query-execution (.getMessage e))))))))
 
 ;; TODO - couldn't saving the query execution be done by MIDDLEWARE?
 (s/defn process-query-and-save-execution!
diff --git a/src/metabase/query_processor/middleware/add_query_throttle.clj b/src/metabase/query_processor/middleware/add_query_throttle.clj
new file mode 100644
index 00000000000..042904a3a6b
--- /dev/null
+++ b/src/metabase/query_processor/middleware/add_query_throttle.clj
@@ -0,0 +1,51 @@
+(ns metabase.query-processor.middleware.add-query-throttle
+  "Middleware that constrains the number of concurrent queries. When we exceed our capacity, queries are rejected by
+  throwing an exception that carries a 503 status code"
+  (:require [metabase.config :as config]
+            [puppetlabs.i18n.core :refer [tru]])
+  (:import [java.util.concurrent Semaphore TimeUnit]))
+
+(def ^:private calculate-max-queries-from-max-threads
+  (let [max-threads (or (config/config-int :mb-jetty-maxthreads) 50)]
+    (int (Math/ceil (/ max-threads 2)))))
+
+(defn- ^Semaphore create-query-semaphore []
+  (let [total-concurrent-queries (or (config/config-int :mb-max-concurrent-queries)
+                                     calculate-max-queries-from-max-threads)]
+    (Semaphore. total-concurrent-queries true)))
+
+(def ^Semaphore ^:private query-semaphore (create-query-semaphore))
+
+(defn- throw-503-unavailable
+  []
+  (throw (ex-info (tru "Max concurrent query limit reached")
+           {:type        ::concurrent-query-limit-reached
+            :status-code 503})))
+
+;; Max time to wait for a permit, in milliseconds. Not marking this as `const` so it can be redef'd in tests
+(def ^:private max-query-wait-time-in-millis
+  (or (config/config-int :mb-max-query-wait-time)
+      5000))
+
+(defn- throttle-queries
+  "Query middleware that will throttle queries using `semaphore`. Throws 503 exceptions if there are no more slots
+  available"
+  [^Semaphore semaphore qp]
+  (fn [query]
+    ;; `tryAcquire` will return `true` if it is able to get a permit within the wait time, false otherwise
+    (if (.tryAcquire semaphore max-query-wait-time-in-millis TimeUnit/MILLISECONDS)
+      (try
+        (qp query)
+        (finally
+          ;; We have a permit, whether the query is successful or it failed, we must make sure that we always release
+          ;; the permit
+          (.release semaphore)))
+      ;; We were not able to get a permit within the timeout period, throw a 503
+      (throw-503-unavailable))))
+
+(defn maybe-add-query-throttle
+  "Adds the query throttle middleware as long as `MB_DISABLE_QUERY_THROTTLE` hasn't been set"
+  [qp]
+  (if (config/config-bool :mb-disable-query-throttle)
+    qp
+    (throttle-queries query-semaphore qp)))
diff --git a/src/metabase/query_processor/middleware/catch_exceptions.clj b/src/metabase/query_processor/middleware/catch_exceptions.clj
index 835d7bc4035..09e903ca9ca 100644
--- a/src/metabase/query_processor/middleware/catch_exceptions.clj
+++ b/src/metabase/query_processor/middleware/catch_exceptions.clj
@@ -1,6 +1,7 @@
 (ns metabase.query-processor.middleware.catch-exceptions
   "Middleware for catching exceptions thrown by the query processor and returning them in a friendlier format."
   (:require [metabase.query-processor.middleware
+             [add-query-throttle :as query-throttle]
              [expand :as expand]
              [resolve :as resolve]
              [source-table :as source-table]]
@@ -57,9 +58,12 @@
   (fn [query]
     (try (qp query)
          (catch clojure.lang.ExceptionInfo e
-           (fail query e (when-let [data (ex-data e)]
-                           (when (= (:type data) :schema.core/error)
-                             (when-let [error (explain-schema-validation-error (:error data))]
-                               {:error error})))))
+           (let [{error :error, error-type :type, :as data} (ex-data e)]
+             ;; When we've hit our concurrent query limit, let that exception bubble up, otherwise repackage it as a failure
+             (if (=  error-type ::query-throttle/concurrent-query-limit-reached)
+               (throw e)
+               (fail query e (when-let [error-msg (and (= error-type :schema.core/error)
+                                                       (explain-schema-validation-error error))]
+                               {:error error-msg})))))
          (catch Throwable e
            (fail query e)))))
diff --git a/test/metabase/query_processor/middleware/add_query_throttle_test.clj b/test/metabase/query_processor/middleware/add_query_throttle_test.clj
new file mode 100644
index 00000000000..955f1668063
--- /dev/null
+++ b/test/metabase/query_processor/middleware/add_query_throttle_test.clj
@@ -0,0 +1,110 @@
+(ns metabase.query-processor.middleware.add-query-throttle-test
+  (:require  [expectations :refer :all]
+             [metabase.query-processor.middleware
+              [add-query-throttle :as throttle :refer :all]
+              [catch-exceptions :as catch-exceptions]]
+             [metabase.test.data :as data]
+             [metabase.util :as u])
+  (:import java.util.concurrent.Semaphore))
+
+(defmacro ^:private exception-and-message [& body]
+  `(try
+     ~@body
+     (catch Exception e#
+       {:ex-class (class e#)
+        :msg      (.getMessage e#)
+        :data     (ex-data e#)})))
+
+;; Check that the middleware will throw an exception carrying a 503 status code if there are no permits available in
+;; the semaphore after waiting the timeout period
+(expect
+  {:ex-class clojure.lang.ExceptionInfo
+   :msg      "Max concurrent query limit reached"
+   :data     {:status-code 503
+              :type        ::throttle/concurrent-query-limit-reached}}
+  (with-redefs [throttle/max-query-wait-time-in-millis 1000]
+    (exception-and-message
+     (let [semaphore (Semaphore. 5)]
+       (.acquire semaphore 5)
+       ((#'throttle/throttle-queries semaphore (constantly "Should never be returned")) {})))))
+
+;; The `catch-exceptions` middleware catches any query pipeline errors and reformats it as a failed query result. The
+;; 503 exception here is special and should be bubbled up
+(expect
+  {:ex-class clojure.lang.ExceptionInfo
+   :msg      "Max concurrent query limit reached"
+   :data     {:status-code 503
+              :type        ::throttle/concurrent-query-limit-reached}}
+  (with-redefs [throttle/max-query-wait-time-in-millis 1000]
+    (exception-and-message
+     (let [semaphore (Semaphore. 5)
+           my-qp     (->> identity
+                          (#'throttle/throttle-queries semaphore)
+                          catch-exceptions/catch-exceptions)]
+       (.acquire semaphore 5)
+       (my-qp {:my "query"})))))
+
+;; Test that queries are "enqueued" for the timeout period and if another slot becomes available, it is used
+(expect
+  {:before-semaphore-release ::no-result
+   :after-semaphore-release  {:query "map"}}
+  (with-redefs [throttle/max-query-wait-time-in-millis 120000]
+    (let [semaphore    (Semaphore. 5)
+          _            (.acquire semaphore 5)
+          query-future (future ((#'throttle/throttle-queries semaphore identity) {:query "map"}))]
+      {:before-semaphore-release (deref query-future 10 ::no-result)
+       :after-semaphore-release  (do
+                                   (.release semaphore)
+                                   (deref query-future 10000 ::no-result))})))
+
+;; Test that a successful query result will return the permit to the semaphore
+(expect
+  {:beinning-permits       5
+   :before-failure-permits 4
+   :query-result           {:query "map"}
+   :after-success-permits  5}
+  (with-redefs [throttle/max-query-wait-time-in-millis 5000]
+    (let [semaphore                 (Semaphore. 5)
+          start-middleware-promise  (promise)
+          finish-middleware-promise (promise)
+          begin-num-permits         (.availablePermits semaphore)
+          coordinate-then-finish    (fn [query-map]
+                                      (deliver start-middleware-promise true)
+                                      @finish-middleware-promise
+                                      query-map)
+          query-future              (future
+                                      ((#'throttle/throttle-queries semaphore coordinate-then-finish) {:query "map"}))]
+      {:beinning-permits       begin-num-permits
+       :before-failure-permits (do
+                                 @start-middleware-promise
+                                 (.availablePermits semaphore))
+       :query-result           (do
+                                 (deliver finish-middleware-promise true)
+                                 @query-future)
+       :after-success-permits  (.availablePermits semaphore)})))
+
+;; Ensure that even if there is a failure, the permit is always released
+(expect
+  {:beinning-permits       5
+   :before-failure-permits 4
+   :after-failure-permits  5}
+  (with-redefs [throttle/max-query-wait-time-in-millis 5000]
+    (let [semaphore                 (Semaphore. 5)
+          start-middleware-promise  (promise)
+          finish-middleware-promise (promise)
+          begin-num-permits         (.availablePermits semaphore)
+          coordinate-then-fail      (fn [_]
+                                      (deliver start-middleware-promise true)
+                                      @finish-middleware-promise
+                                      (throw (Exception. "failure")))
+          query-future              (future
+                                      (u/ignore-exceptions
+                                        ((#'throttle/throttle-queries semaphore coordinate-then-fail) {:query "map"})))]
+      {:beinning-permits       begin-num-permits
+       :before-failure-permits (do
+                                 @start-middleware-promise
+                                 (.availablePermits semaphore))
+       :after-failure-permits  (do
+                                 (deliver finish-middleware-promise true)
+                                 @query-future
+                                 (.availablePermits semaphore))})))
-- 
GitLab