From 8d4efdd99e239606fb1031d5fb3662eb1cb8061e Mon Sep 17 00:00:00 2001 From: Cam Saul <cammsaul@gmail.com> Date: Tue, 7 May 2019 17:52:07 -0700 Subject: [PATCH] Async QP performance improvements :racing_car: --- resources/log4j.properties | 2 - src/metabase/async/semaphore_channel.clj | 128 ------------- src/metabase/core.clj | 5 + src/metabase/driver.clj | 1 + src/metabase/middleware/log.clj | 20 ++- src/metabase/models/database.clj | 12 ++ src/metabase/query_processor.clj | 5 +- .../query_processor/middleware/async.clj | 13 +- .../query_processor/middleware/async_wait.clj | 100 +++++++---- .../query_processor/middleware/cache.clj | 5 +- .../middleware/process_userland_query.clj | 4 +- src/metabase/task.clj | 1 + test/expectation_options.clj | 70 ++++---- .../metabase/async/semaphore_channel_test.clj | 169 ------------------ test/metabase/public_settings_test.clj | 19 +- .../middleware/async_wait_test.clj | 66 ------- test/metabase/test/util.clj | 2 + test/metabase/test_setup.clj | 16 +- 18 files changed, 182 insertions(+), 456 deletions(-) delete mode 100644 src/metabase/async/semaphore_channel.clj delete mode 100644 test/metabase/async/semaphore_channel_test.clj delete mode 100644 test/metabase/query_processor/middleware/async_wait_test.clj diff --git a/resources/log4j.properties b/resources/log4j.properties index ca4192baa2f..3bd60709b70 100644 --- a/resources/log4j.properties +++ b/resources/log4j.properties @@ -26,8 +26,6 @@ log4j.logger.metabase.sync=DEBUG log4j.logger.metabase.models.field-values=INFO # TODO - we can dial these back a bit once we are satisfied the async stuff isn't so new (0.33.0+) -log4j.logger.metabase.async.api-response=DEBUG -log4j.logger.metabase.async.semaphore-channel=DEBUG log4j.logger.metabase.async.util=DEBUG log4j.logger.metabase.middleware.async=DEBUG log4j.logger.metabase.query-processor.async=DEBUG diff --git a/src/metabase/async/semaphore_channel.clj b/src/metabase/async/semaphore_channel.clj deleted file mode 100644 index 
af6a6907fdf..00000000000 --- a/src/metabase/async/semaphore_channel.clj +++ /dev/null @@ -1,128 +0,0 @@ -(ns metabase.async.semaphore-channel - (:require [clojure.core.async :as a] - [clojure.tools.logging :as log] - [metabase.async.util :as async.u] - [metabase.util.i18n :refer [trs]] - [schema.core :as s]) - (:import java.io.Closeable - java.util.concurrent.Semaphore)) - -(defn- permit-handle - "Object that can holds on to a permit for a Semaphore. Can be closed with `.close`, and thus, used with `with-open`; - also returns the permit upon finalization if not already returned." - ^Closeable [^Semaphore semaphore, id] - (let [closed? (atom false) - close! (fn [] - (when (compare-and-set! closed? false true) - (.release semaphore)))] - (reify - Object - (toString [_] - (format "Permit #%d" id)) ; ID is a simple per-channel counter mainly for debugging purposes - (finalize [_] - (close!)) - Closeable - (close [_] - (close!))))) - -(defn- notifying-semaphore - "When a permit is released via Semaphore.release() we'll send a message to the `notify-released-chan`. This is a - signal to the go-loop in the code below below to resume and try to acquire more permits from the semaphore." - ^Semaphore [num-permits notify-released-chan] - (proxy [Semaphore] [num-permits] - (release [] - ;; Release the permit ASAP. (Add tag to proxy anaphor `this`, otherwise we get reflection warnings) - (let [^Semaphore this this] - (proxy-super release)) - ;; Then send the message right away to let the go-loop know a permit is available. - (a/>!! notify-released-chan ::released)))) - -(defn semaphore-channel - "Creates a core.async channel that manages a counting Semaphore with `num-permits`. Takes from this channel will block - until a permit is available; the object taken is a special 'permit handle' that implements `Closeable`; hold on to - it with `with-open` or close it with `.close` to return the permit when finished with it." 
- [^Integer num-permits] - (let [permits-chan (a/chan) - ;; We only need one such 'release' notification at any given moment to let the loop know to resume so we can - ;; go ahead and make this channel a dropping buffer that will drop any additional messages. - notify-released-chan (a/chan (a/dropping-buffer 1)) - semaphore (notifying-semaphore num-permits notify-released-chan)] - ;; start the loop that will deliver permits - (a/go-loop [next-id 1] - (if (.tryAcquire semaphore) - ;; If the semaphore has a permit available right away, send a new `permit-handle` to `permits-chan`. Since - ;; that channel has no buffer this loop will park until someone is there to take it. Recur unless the - ;; permits-chan is closed. - (if (a/>! permits-chan (permit-handle semaphore next-id)) - (recur (inc next-id)) - (a/close! notify-released-chan)) - ;; Otherwise if no permit is available, wait for a notification on `notify-released-chan`, then recur and try - ;; again, unless channel is closed - (when (a/<! notify-released-chan) - (recur next-id)))) - ;; return a channel to get permits on - permits-chan)) - - -;;; ------------------------------------------- do-after-receiving-permit -------------------------------------------- - -(def ^:private ^:dynamic *permits* - "Map of semaphore channel -> obtained permit for the current and child thread[s]. Used so we can skip obtaining a - second permit if this thread already has one." 
- {}) - -(s/defn ^:private do-with-existing-permit-for-current-thread :- (s/maybe async.u/PromiseChan) - [semaphore-chan f & args] - (when (get *permits* semaphore-chan) - (log/debug (trs "Current thread already has a permit, will not wait to acquire another")) - (apply async.u/do-on-separate-thread f args))) - -(s/defn ^:private do-with-permit :- async.u/PromiseChan - [semaphore-chan, permit :- Closeable, f & args] - (binding [*permits* (assoc *permits* semaphore-chan permit)] - (let [out-chan (apply async.u/do-on-separate-thread f args)] - ;; whenever `out-chan` closes return the permit - (a/go - (a/<! out-chan) - (.close permit)) - out-chan))) - -(s/defn ^:private do-with-immediately-available-permit :- (s/maybe async.u/PromiseChan) - [semaphore-chan f & args] - (when-let [^Closeable permit (a/poll! semaphore-chan)] - (log/debug (trs "Permit available without waiting, will run fn immediately")) - (apply do-with-permit semaphore-chan permit f args))) - -(s/defn ^:private do-after-waiting-for-new-permit :- async.u/PromiseChan - [semaphore-chan f & args] - (let [out-chan (a/promise-chan)] - ;; fire off a go block to wait for a permit. - (a/go - (let [[permit first-done] (a/alts! [semaphore-chan out-chan])] - (cond - ;; If out-chan closes before we get a permit, there's nothing for us to do here. - (= first-done out-chan) - (log/debug (trs "Not running pending function call: output channel already closed.")) - - ;; if `semaphore-chan` is closed for one reason or another we'll never get a permit so log a warning and - ;; close the output channel - (not permit) - (do - (log/warn (trs "Warning: semaphore-channel is closed, will not run pending function call")) - (a/close! out-chan)) - - ;; otherwise we got a permit and chan run f with it now. 
- permit - (async.u/single-value-pipe (apply do-with-permit semaphore-chan permit f args) out-chan)))) - ;; return `out-chan` which can be used to wait for results - out-chan)) - -(s/defn do-after-receiving-permit :- async.u/PromiseChan - "Run `(apply f args)` asynchronously after receiving a permit from `semaphore-chan`. Returns a channel from which you - can fetch the results. Closing this channel before results are produced will cancel the function call." - {:style/indent 1} - [semaphore-chan f & args] - (some #(apply % semaphore-chan f args) - [do-with-existing-permit-for-current-thread - do-with-immediately-available-permit - do-after-waiting-for-new-permit])) diff --git a/src/metabase/core.clj b/src/metabase/core.clj index 43cef9ec9bd..08c53bda147 100644 --- a/src/metabase/core.clj +++ b/src/metabase/core.clj @@ -19,6 +19,7 @@ [metabase.models [setting :as setting] [user :refer [User]]] + [metabase.query-processor.middleware.async-wait :as qp.middleware.async-wait] [metabase.util.i18n :refer [set-locale trs]] [toucan.db :as db])) @@ -44,7 +45,11 @@ "General application shutdown function which should be called once at application shuddown." [] (log/info (trs "Metabase Shutting Down ...")) + ;; TODO - it would really be much nicer if we implemented a basic notification system so these things could listen + ;; to a Shutdown hook of some sort instead of having here (task/stop-scheduler!) + (server/stop-web-server!) + (qp.middleware.async-wait/destroy-all-thread-pools!) (log/info (trs "Metabase Shutdown COMPLETE"))) diff --git a/src/metabase/driver.clj b/src/metabase/driver.clj index 76446b02df0..34ab3bd54aa 100644 --- a/src/metabase/driver.clj +++ b/src/metabase/driver.clj @@ -619,6 +619,7 @@ query) +;; TODO - we should just have some sort of `core.async` channel to handle DB update notifications instead (defmulti notify-database-updated "Notify the driver that the attributes of a `database` have changed, or that `database was deleted. 
This is specifically relevant in the event that the driver was doing some caching or connection pooling; the driver should diff --git a/src/metabase/middleware/log.clj b/src/metabase/middleware/log.clj index 2bcb32d3739..ea4a0f8ce80 100644 --- a/src/metabase/middleware/log.clj +++ b/src/metabase/middleware/log.clj @@ -8,6 +8,7 @@ [util :as u]] [metabase.async.util :as async.u] [metabase.middleware.util :as middleware.u] + [metabase.query-processor.middleware.async :as qp.middleware.async] [metabase.util [date :as du] [i18n :refer [trs]]] @@ -40,15 +41,16 @@ (format "%s (%d DB calls)" elapsed-time db-calls))) (defn- format-threads-info [{:keys [include-stats?]}] - (str - (when-let [^QueuedThreadPool pool (some-> (server/instance) .getThreadPool)] - (format "Jetty threads: %s/%s (%s busy, %s idle, %s queued) " - (.getMinThreads pool) - (.getMaxThreads pool) - (.getBusyThreads pool) - (.getIdleThreads pool) - (.getQueueSize pool))) - (format "(%d total active threads)" (Thread/activeCount)))) + (when include-stats? + (str + (when-let [^QueuedThreadPool pool (some-> (server/instance) .getThreadPool)] + (format "Jetty threads: %s/%s (%s idle, %s queued) " + (.getBusyThreads pool) + (.getMaxThreads pool) + (.getIdleThreads pool) + (.getQueueSize pool))) + (format "(%d total active threads) " (Thread/activeCount)) + (format "Queries in flight: %d" (qp.middleware.async/in-flight))))) (defn- format-error-info [{{:keys [body]} :response} {:keys [error?]}] (when (and error? 
diff --git a/src/metabase/models/database.clj b/src/metabase/models/database.clj index bbcfd52aa40..aae5164b843 100644 --- a/src/metabase/models/database.clj +++ b/src/metabase/models/database.clj @@ -51,6 +51,9 @@ (catch Throwable e (log/error e (trs "Error scheduling tasks for DB"))))) +;; TODO - something like NSNotificationCenter in Objective-C would be really really useful here so things that want to +;; implement behavior when an object is deleted can do it without having to put code here + (defn- unschedule-tasks! "Unschedule any currently pending sync operation tasks for `database`." [database] @@ -60,6 +63,14 @@ (catch Throwable e (log/error e (trs "Error unscheduling tasks for DB."))))) +(defn- destroy-qp-thread-pool! + [database] + (try + (require 'metabase.query-processor.middleware.async-wait) + ((resolve 'metabase.query-processor.middleware.async-wait/destroy-thread-pool!) database) + (catch Throwable e + (log/error e (trs "Error destroying thread pool for DB."))))) + (defn- post-insert [database] (u/prog1 database ;; add this database to the All Users permissions groups @@ -74,6 +85,7 @@ (defn- pre-delete [{id :id, driver :engine, :as database}] (unschedule-tasks! database) + (destroy-qp-thread-pool! database) (db/delete! 'Card :database_id id) (db/delete! 'Permissions :object [:like (str (perms/object-path id) "%")]) (db/delete! 'Table :db_id id) diff --git a/src/metabase/query_processor.clj b/src/metabase/query_processor.clj index ee449bd2981..60fcc95a649 100644 --- a/src/metabase/query_processor.clj +++ b/src/metabase/query_processor.clj @@ -140,9 +140,12 @@ ;; All middleware above this point is written in the synchronous 1-arg style. All middleware below is written in ;; async 4-arg style. Eventually the entire QP middleware stack will be rewritten in the async style. 
But not yet ;; + ;; TODO - `async-wait` should be moved way up the stack, at least after the DB is resolved, right now for nested + ;; queries it creates a thread pool for the nested query placeholder DB ID + ;; ;; ▼▼▼ ASYNC MIDDLEWARE ▼▼▼ async/async->sync - async-wait/wait-for-permit + async-wait/wait-for-turn cache/maybe-return-cached-results validate/validate-query normalize/normalize diff --git a/src/metabase/query_processor/middleware/async.clj b/src/metabase/query_processor/middleware/async.clj index 65357e75c41..1eda9a895ab 100644 --- a/src/metabase/query_processor/middleware/async.clj +++ b/src/metabase/query_processor/middleware/async.clj @@ -16,7 +16,7 @@ (if (a/poll! canceled-chan) (log/debug (trs "Request already canceled, will not run synchronous QP code.")) (try - (respond (qp query)) + (some-> (qp query) respond) (catch Throwable e (raise e)))))) @@ -45,11 +45,22 @@ (log/warn e (trs "Unhandled exception, exepected `catch-exceptions` middleware to handle it.")) (respond e))))) +(def ^:private in-flight* (atom 0)) + +(defn in-flight + "Return the number of queries currently in flight." + [] + @in-flight*) + (defn- async-args [] (let [out-chan (a/promise-chan) canceled-chan (async.u/promise-canceled-chan out-chan) respond (respond-fn out-chan canceled-chan) raise (raise-fn out-chan respond)] + (swap! in-flight* inc) + (a/go + (a/<! canceled-chan) + (swap! in-flight* dec)) {:out-chan out-chan, :canceled-chan canceled-chan, :respond respond, :raise raise})) (defn- wait-for-result [out-chan] diff --git a/src/metabase/query_processor/middleware/async_wait.clj b/src/metabase/query_processor/middleware/async_wait.clj index 5370400d189..6d6666a3161 100644 --- a/src/metabase/query_processor/middleware/async_wait.clj +++ b/src/metabase/query_processor/middleware/async_wait.clj @@ -5,51 +5,85 @@ additional queries will park the thread. Super-useful for writing high-performance API endpoints. Prefer these methods to the old-school synchronous versions. 
- How is this achieved? For each Database, we'll maintain a channel that acts as a counting semaphore; the channel - will initially contain 15 permits. Each incoming request will asynchronously read from the channel until it acquires - a permit, then put it back when it finishes." + How is this achieved? For each Database, we'll maintain a thread pool executor to limit the number of simultaneous + queries." (:require [clojure.core.async :as a] - [metabase.async.semaphore-channel :as semaphore-channel] + [clojure.tools.logging :as log] [metabase.models.setting :refer [defsetting]] [metabase.util :as u] - [metabase.util.i18n :refer [trs]])) + [metabase.util.i18n :refer [trs]] + [schema.core :as s]) + (:import [java.util.concurrent Executors ExecutorService])) (defsetting max-simultaneous-queries-per-db (trs "Maximum number of simultaneous queries to allow per connected Database.") :type :integer :default 15) -(defonce ^:private db-semaphore-channels (atom {})) +(defonce ^:private db-thread-pools (atom {})) -(defn- fetch-db-semaphore-channel - "Fetch the counting semaphore channel for a Database, creating it if not already created." +;; easier just to use a lock for creating the pools rather than using `swap-vals!` and having to nuke a bunch of +;; thread pools that ultimately don't get used +(defonce ^:private db-thread-pool-lock (Object.)) + +(s/defn ^:private db-thread-pool :- ExecutorService [database-or-id] (let [id (u/get-id database-or-id)] (or - ;; channel already exists - (@db-semaphore-channels id) - ;; channel does not exist, Create a channel and stick it in the atom - (let [ch (semaphore-channel/semaphore-channel (max-simultaneous-queries-per-db)) - new-ch ((swap! db-semaphore-channels update id #(or % ch)) id)] - ;; ok, if the value swapped into the atom was a different channel (another thread beat us to it) then close our - ;; newly created channel - (when-not (= ch new-ch) - (a/close! 
ch)) - ;; return the newly created channel - new-ch)))) - -(defn wait-for-permit - "Middleware that throttles the number of concurrent queries for each connected database, parking the thread until a - permit becomes available." - [qp] - (fn [{database-id :database, :as query} respond raise canceled-chan] - (let [semaphore-chan (fetch-db-semaphore-channel database-id) - output-chan (semaphore-channel/do-after-receiving-permit semaphore-chan - qp query respond raise canceled-chan)] - (a/go - (respond (a/<! output-chan)) - (a/close! output-chan)) + (@db-thread-pools id) + (locking db-thread-pool-lock + (or + (@db-thread-pools id) + (log/debug (trs "Creating new query thread pool for Database {0}" id)) + (let [new-pool (Executors/newFixedThreadPool (max-simultaneous-queries-per-db))] + (swap! db-thread-pools assoc id new-pool) + new-pool)))))) + +(defn destroy-thread-pool! + "Destroy the QP thread pool for a Database (done automatically when DB is deleted.)" + [database-or-id] + (let [id (u/get-id database-or-id)] + (let [[{^ExecutorService thread-pool id}] (locking db-thread-pool-lock + (swap-vals! db-thread-pools dissoc id))] + (when thread-pool + (log/debug (trs "Destroying query thread pool for Database {0}" id)) + (.shutdownNow thread-pool))))) + +(defn destroy-all-thread-pools! + "Destroy all QP thread pools (done on shutdown)." + [] + (locking db-thread-pool-lock + (let [[old] (reset-vals! 
db-thread-pools nil)] + (doseq [^ExecutorService pool (vals old)] + (.shutdownNow pool))))) + +(def ^:private ^:dynamic *already-in-thread-pool?* false) + +(defn- runnable ^Runnable [qp query respond raise canceled-chan] + (fn [] + (binding [*already-in-thread-pool?* true] + (try + (qp query respond raise canceled-chan) + (catch Throwable e + (raise e)))))) + +(defn- run-in-thread-pool [qp {database-id :database, :as query} respond raise canceled-chan] + (try + (let [pool (db-thread-pool database-id) + futur (.submit pool (runnable qp query respond raise canceled-chan))] (a/go (when (a/<! canceled-chan) - (a/close! output-chan))) - nil))) + (log/debug (trs "Request canceled, canceling pending query")) + (future-cancel futur)))) + (catch Throwable e + (raise e))) + nil) + +(defn wait-for-turn + "Middleware that throttles the number of concurrent queries for each connected database, parking the thread until it + is allowed to run." + [qp] + (fn [{database-id :database, :as query} respond raise canceled-chan] + (if *already-in-thread-pool?* + (qp query respond raise canceled-chan) + (run-in-thread-pool qp query respond raise canceled-chan)))) diff --git a/src/metabase/query_processor/middleware/cache.clj b/src/metabase/query_processor/middleware/cache.clj index ecbb6a41bdd..3923418bfbf 100644 --- a/src/metabase/query_processor/middleware/cache.clj +++ b/src/metabase/query_processor/middleware/cache.clj @@ -106,8 +106,9 @@ (respond cached-results) (let [start-time (System/currentTimeMillis) respond (fn [results] - (save-results-if-successful! query-hash start-time results) - (respond results))] + (when results + (save-results-if-successful! 
query-hash start-time results) + (respond results)))] (qp query respond raise canceled-chan))))) (defn maybe-return-cached-results diff --git a/src/metabase/query_processor/middleware/process_userland_query.clj b/src/metabase/query_processor/middleware/process_userland_query.clj index 7f8af0db2e7..55a4415899d 100644 --- a/src/metabase/query_processor/middleware/process_userland_query.clj +++ b/src/metabase/query_processor/middleware/process_userland_query.clj @@ -94,7 +94,9 @@ (raise (Exception. (str (trs "Unexpected nil response from query processor.")))) (not status) - (raise (Exception. (str (tru "Invalid response from database driver. No :status provided.") result))) + (raise (Exception. (str (tru "Invalid response from database driver. No :status provided.") + " " + result))) ;; if query has been cancelled no need to save QueryExecution (or should we?) and no point formatting anything to ;; be returned since it won't be returned diff --git a/src/metabase/task.clj b/src/metabase/task.clj index 0bf425489f5..62b45fb451a 100644 --- a/src/metabase/task.clj +++ b/src/metabase/task.clj @@ -134,6 +134,7 @@ (defn start-scheduler! "Start our Quartzite scheduler which allows jobs to be submitted and triggers to begin executing." [] + (classloader/the-classloader) (when-not @quartz-scheduler (set-jdbc-backend-properties!) 
(let [new-scheduler (qs/initialize)] diff --git a/test/expectation_options.clj b/test/expectation_options.clj index 635fa87d7c3..2c71ec1abd4 100644 --- a/test/expectation_options.clj +++ b/test/expectation_options.clj @@ -4,7 +4,9 @@ [data :as data] [set :as set]] [expectations :as expectations] - [metabase.util :as u])) + [metabase.util :as u] + [metabase.util.date :as du]) + (:import java.util.concurrent.TimeoutException)) ;;; ---------------------------------------- Expectations Framework Settings ----------------------------------------- @@ -51,20 +53,21 @@ ;;; ---------------------------------------------- check-for-slow-tests ---------------------------------------------- -(def ^:private slow-test-threshold-nanoseconds - "Any test that takes longer than this time should be considered slow, and we should print a little notice about it!" - (* 10 1000 1000 1000)) ; 10 seconds +(def ^:private test-timeout-ms (* 60 1000)) -(defn- check-for-slow-tests [test-fn] - (fn [] - (let [start-time-ns (System/nanoTime) - result (test-fn) - total-time (- (System/nanoTime) start-time-ns)] - ;; TODO - we should also have a test timeout if it takes over 5 minutes or something like that - (when (> total-time slow-test-threshold-nanoseconds) - (println (let [{:keys [file line]} (-> test-fn meta :the-var meta)] - (format "Test %s:%s is SLOW: took %.1f seconds." file line (/ total-time (* 1000.0 1000.0 1000.0)))))) - result))) +(defn- check-for-slow-tests + "If any test takes longer than 60 seconds to run return a TimeoutException, effectively failing the test." + [run] + (fn [test-fn] + (deref + (future + (try + (run test-fn) + (catch Throwable e + e))) + test-timeout-ms + ;; return Exception rather than throwing, otherwise it will mess up our test running + (TimeoutException. 
(format "Test timed out after %s" (du/format-milliseconds test-timeout-ms)))))) ;;; ---------------------------------------------- check-table-cleanup ----------------------------------------------- @@ -86,17 +89,17 @@ " of '" (:name model) "' that " (if more-than-one? "were" "was") " not cleaned up."))) -(defn- check-table-cleanup +(def ^{:arglists '([run])} check-table-cleanup "Function that will run around each test. This function is usually a noop, but it useful for helping to debug stale data in local development. Modify the private `models-to-check` var to check if there are any rows in the given model's table after each expectation. If a row is found, the relevant information will be written to standard out and the test run will exit" - [test-fn] - (fn [] - (let [result (test-fn)] - ;; The typical case is no models-to-check, this then becomes a noop - (when (seq models-to-check) - (let [{:keys [file line]} (-> test-fn meta :the-var meta) + (if-not (seq models-to-check) + identity + (fn [run] + (fn [test-fn] + (let [result (run test-fn) + {:keys [file line]} (-> test-fn meta :the-var meta) error-msgs (tables-with-data->error-msg models-to-check)] (when (seq error-msgs) (println "\n-----------------------------------------------------") @@ -107,17 +110,22 @@ (flush) ;; I found this necessary as throwing an exception would show the exception, but the test run would hang and ;; you'd have to Ctrl-C anyway - (System/exit 1)))) - result))) + (System/exit 1)) + result))))) ;;; -------------------------------------------- Putting it all together --------------------------------------------- -(def ^:private ^{:arglists '([test-fn])} do-with-test-middleware - (comp check-table-cleanup check-for-slow-tests)) - -(defn- in-context - "Run an expectations `test-fn` with the test middleware functions above wrapping it." 
- [test-fn] - {:expectations-options :in-context} - ((do-with-test-middleware test-fn))) +(defn- log-tests [run] + (fn [test-fn] + (let [{:keys [file line]} (-> test-fn meta :the-var meta)] + (println (format "Run %s %s" file line))) + (run test-fn))) + +(def ^:private ^{:expectations-options :in-context} test-middleware + (-> (fn [test-fn] + (test-fn)) + ;; uncomment `log-tests` if you need to debug tests or see which ones are being noisy + #_log-tests + check-for-slow-tests + check-table-cleanup)) diff --git a/test/metabase/async/semaphore_channel_test.clj b/test/metabase/async/semaphore_channel_test.clj deleted file mode 100644 index d5003628482..00000000000 --- a/test/metabase/async/semaphore_channel_test.clj +++ /dev/null @@ -1,169 +0,0 @@ -(ns metabase.async.semaphore-channel-test - (:require [clojure.core.async :as a] - [expectations :refer [expect]] - [metabase.async.semaphore-channel :as semaphore-channel] - [metabase.test.util.async :as tu.async]) - (:import java.io.Closeable)) - -(defn- get-permits [semaphore-chan n] - (loop [acc [], n n] - (if-not (pos? n) - acc - (let [[permit] (a/alts!! [semaphore-chan (a/timeout 100)])] - (assert permit) - (recur (conj acc permit) (dec n)))))) - -;; check that a semaphore channel only gives out the correct number of permits -(expect - nil - (tu.async/with-open-channels [semaphore-chan (semaphore-channel/semaphore-channel 3)] - (let [permits (get-permits semaphore-chan 3) - response (first (a/alts!! 
[semaphore-chan (a/timeout 100)]))] - ;; make sure we're actually doint something with the permits after we get `response`, otherwise there's a very - ;; small chance they'll get garbage collected and `alts!!` will actually manage to get a permit - (count permits) - response))) - -;; check that when a permit is returned, whoever was waiting will get their permit -(expect - "Permit #4" - (tu.async/with-open-channels [semaphore-chan (semaphore-channel/semaphore-channel 3)] - (let [[^Closeable permit-1] (get-permits semaphore-chan 3)] - (.close permit-1) - (some-> (first (a/alts!! [semaphore-chan (a/timeout 100)])) str)))) - -;; if we are true knuckleheads and *lose* a permit it should eventually get garbage collected and returned to the pool -(expect - "Permit #4" - (tu.async/with-open-channels [semaphore-chan (semaphore-channel/semaphore-channel 3)] - (get-permits semaphore-chan 3) - (loop [tries 10] - (System/gc) - (or - (some-> (a/alts!! [semaphore-chan (a/timeout 200)]) first str) - (when (pos? tries) - (recur (dec tries))))))) - - -;;; ------------------------------------------- do-after-receiving-permit -------------------------------------------- - -;; If we already have a permit, code should be smart enough to skip getting another one -(expect - {:first-permit "Permit #1", :second-permit "Permit #1", :same? true} - (tu.async/with-open-channels [semaphore-chan (semaphore-channel/semaphore-channel 1) - output-chan (a/promise-chan)] - (let [existing-permit #(get @#'semaphore-channel/*permits* semaphore-chan)] - (semaphore-channel/do-after-receiving-permit semaphore-chan - (fn [] - (let [first-permit (existing-permit)] - (semaphore-channel/do-after-receiving-permit semaphore-chan - (fn [] - (let [second-permit (existing-permit)] - (a/>!! output-chan {:first-permit (str first-permit) - :second-permit (str second-permit) - :same? (identical? 
first-permit second-permit)})))))))) - (tu.async/wait-for-result output-chan))) - -;; Make sure `do-with-permit` returns the permit when functions finish normally -(expect - {:permit-returned? true, :result ::value} - (let [permit (tu.async/permit)] - (tu.async/with-open-channels [semaphore-chan (a/chan 1) - output-chan (#'semaphore-channel/do-with-permit - semaphore-chan - permit - (constantly ::value))] - {:permit-returned? (tu.async/permit-closed? permit) - :result (tu.async/wait-for-result output-chan)}))) - -;; If `f` throws an Exception, `permit` should get returned, and Exception should get returned as the result -(expect - {:permit-returned? true, :result "FAIL"} - (let [permit (tu.async/permit)] - (tu.async/with-open-channels [semaphore-chan (a/chan 1) - output-chan (#'semaphore-channel/do-with-permit - semaphore-chan - permit - (fn [] (throw (Exception. "FAIL"))))] - {:permit-returned? (tu.async/permit-closed? permit) - :result (let [result (tu.async/wait-for-result output-chan)] - (if (instance? Throwable result) - (.getMessage ^Throwable result) - result))}))) - -;; If `output-chan` is closed early, permit should still get returned, but there's nowhere to write the result to so -;; it should be `nil` -(expect - {:permit-returned? true, :result nil} - (let [permit (tu.async/permit)] - (tu.async/with-open-channels [semaphore-chan (a/chan 1) - output-chan (#'semaphore-channel/do-with-permit - semaphore-chan - permit - (fn [] - (Thread/sleep 100) - ::value))] - (a/close! output-chan) - {:permit-returned? (tu.async/permit-closed? 
permit) - :result (tu.async/wait-for-result output-chan)}))) - - -;;; +----------------------------------------------------------------------------------------------------------------+ -;;; | Tests for the new 0.32.5 optimizations that avoid async waits when permits are immediately available | -;;; +----------------------------------------------------------------------------------------------------------------+ - -;; there are basically 3 strategies that can be used by `do-after-receiving-permit` -;; 1) run immediately, because permit is already present -;; 2) run immediately, because permit is immediately available -;; 3) run after waiting for permit - -;; to check that everything works correctly in a few different scenarios, rather than right 3 x n tests, largely -;; repeating ourselves, we'll break things out into small functions that can be combined to pick + choose the -;; functionality to test with a given strategy. - -(defn- do-semaphore-chan-fn [thunk-fn strategy-fn] - (tu.async/with-open-channels [semaphore-chan (a/chan 1)] - (strategy-fn semaphore-chan (thunk-fn (partial #'semaphore-channel/do-after-receiving-permit semaphore-chan))))) - -(defn- with-existing-permit [semaphore-chan thunk] - (binding [semaphore-channel/*permits* {semaphore-chan (tu.async/permit)}] - (thunk))) - -(defn- with-immediately-available-permit [semaphore-chan thunk] - (a/>!! semaphore-chan (tu.async/permit)) - (thunk)) - -(defn- after-waiting [semaphore-chan thunk] - (a/go - (a/<! (a/timeout 50)) - (a/>! 
semaphore-chan (tu.async/permit))) - (thunk)) - -;; test normal functions work correctly -(defn- normal-fn [do-f] - (fn [] - (tu.async/wait-for-result - (do-f (partial +) 1 2 3)))) - -(expect 6 (do-semaphore-chan-fn normal-fn with-existing-permit)) -(expect 6 (do-semaphore-chan-fn normal-fn with-immediately-available-permit)) -(expect 6 (do-semaphore-chan-fn normal-fn after-waiting)) - -;; Test that if output channel is closed, function gets interrupted -(defn- check-interrupted-fn [do-f] - (fn [] - (let [f (fn [chan] - (try - (Thread/sleep 1000) - (catch InterruptedException e - (a/>!! chan ::interrupted))))] - (tu.async/with-open-channels [interrupted-chan (a/promise-chan) - out-chan (do-f f interrupted-chan)] - (a/go - (a/<! (a/timeout 100)) - (a/close! out-chan)) - (tu.async/wait-for-result interrupted-chan 500))))) - -(expect ::interrupted (do-semaphore-chan-fn check-interrupted-fn with-existing-permit)) -(expect ::interrupted (do-semaphore-chan-fn check-interrupted-fn with-immediately-available-permit)) -(expect ::interrupted (do-semaphore-chan-fn check-interrupted-fn after-waiting)) diff --git a/test/metabase/public_settings_test.clj b/test/metabase/public_settings_test.clj index e87b7f2565c..8e433e7e23f 100644 --- a/test/metabase/public_settings_test.clj +++ b/test/metabase/public_settings_test.clj @@ -4,6 +4,7 @@ [metabase.models.setting :as setting] [metabase.public-settings :as public-settings] [metabase.test.util :as tu] + [metabase.test.util.log :as tu.log] [puppetlabs.i18n.core :as i18n :refer [tru]])) ;; double-check that setting the `site-url` setting will automatically strip off trailing slashes @@ -53,10 +54,11 @@ ;; if `site-url` in the database is invalid, the getter for `site-url` should return `nil` (#9849) (expect {:get-string "https://www.camsaul.x", :site-url nil} - (tu/with-temporary-setting-values [site-url "https://metabase.com"] - (setting/set-string! 
:site-url "https://www.camsaul.x") - {:get-string (setting/get-string :site-url) - :site-url (public-settings/site-url)})) + (tu.log/suppress-output + (tu/with-temporary-setting-values [site-url "https://metabase.com"] + (setting/set-string! :site-url "https://www.camsaul.x") + {:get-string (setting/get-string :site-url) + :site-url (public-settings/site-url)}))) ;; We should normalize `site-url` when set via env var we should still normalize it (#9764) (expect @@ -69,10 +71,11 @@ ;; if `site-url` is set via an env var, and it's invalid, we should return `nil` rather than having the whole instance break (expect {:get-string "asd_12w31%$;", :site-url nil} - (with-redefs [env/env (assoc env/env :mb-site-url "asd_12w31%$;")] - (tu/with-temporary-setting-values [site-url nil] - {:get-string (setting/get-string :site-url) - :site-url (public-settings/site-url)}))) + (tu.log/suppress-output + (with-redefs [env/env (assoc env/env :mb-site-url "asd_12w31%$;")] + (tu/with-temporary-setting-values [site-url nil] + {:get-string (setting/get-string :site-url) + :site-url (public-settings/site-url)})))) (expect "HOST" diff --git a/test/metabase/query_processor/middleware/async_wait_test.clj b/test/metabase/query_processor/middleware/async_wait_test.clj deleted file mode 100644 index ab95e4e3b1c..00000000000 --- a/test/metabase/query_processor/middleware/async_wait_test.clj +++ /dev/null @@ -1,66 +0,0 @@ -(ns metabase.query-processor.middleware.async-wait-test - (:require [clojure.core.async :as a] - [expectations :refer [expect]] - [metabase.query-processor.middleware.async-wait :as async-wait] - [metabase.test.util.async :as tu.async])) - -(defn- async-wait - "Mocked version of `async-wait/wait-for-permit` middleware. 
Runs `f` with 3 channels: - - * `result-chan` -- a channel will receive the result iff the rest of the QP pipeline is invoked - * `semaphore-chan` -- a mocked semapahore channel that `wait-for-permit` will wait to receive a permit from - * `canceled-chan` -- a channel you can pass a cancellation message to to simulate request cancelation" - [f] - (tu.async/with-open-channels [result-chan (a/promise-chan) - semaphore-chan (a/chan 15) - canceled-chan (a/promise-chan)] - (let [qp (fn [_ respond _ _] - (respond ::result))] - (with-redefs [async-wait/fetch-db-semaphore-channel (constantly semaphore-chan)] - (let [query {} - respond (fn [result] - (when result - (a/>!! result-chan result)) - (a/close! result-chan))] - ((async-wait/wait-for-permit qp) query respond respond canceled-chan)) - (f {:result-chan result-chan, :semaphore-chan semaphore-chan, :canceled-chan canceled-chan}))))) - -;; QP should run if semaphore-chan gets a permit. Permit should be closed after QP finishes. -(expect - {:result ::result, :permit-taken? true, :permit-closed? true} - (async-wait - (fn [{:keys [semaphore-chan result-chan]}] - (let [permit (tu.async/permit)] - ;; provide a permit async after a short delay - (a/go - (a/<! (a/timeout 10)) - (a/>! semaphore-chan permit)) - {:result (tu.async/wait-for-result result-chan) - :permit-taken? (= (tu.async/wait-for-result semaphore-chan) ::tu.async/timed-out) - :permit-closed? (tu.async/permit-closed? permit)})))) - -;; If semaphore-chan never gets a permit, then the QP should never run -(expect - {:result ::tu.async/timed-out, :permit-closed? false} - (async-wait - (fn [{:keys [result-chan]}] - (let [permit (tu.async/permit)] - {:result (tu.async/wait-for-result result-chan) - :permit-closed? (tu.async/permit-closed? permit)})))) - -;; if canceled-chan gets a message before permit is provided, QP should never run -(expect - {:result nil - :permit-taken? false - :permit-closed? 
false} - (async-wait - (fn [{:keys [result-chan semaphore-chan canceled-chan]}] - (let [permit (tu.async/permit)] - (a/go - (a/<! (a/timeout 10)) - (a/>! canceled-chan :canceled) - (a/<! (a/timeout 100)) - (a/>! semaphore-chan permit)) - {:result (tu.async/wait-for-result result-chan) - :permit-taken? (= (tu.async/wait-for-result semaphore-chan) ::tu.async/timed-out) - :permit-closed? (tu.async/permit-closed? permit)})))) diff --git a/test/metabase/test/util.clj b/test/metabase/test/util.clj index 840584b984d..c705c877ef7 100644 --- a/test/metabase/test/util.clj +++ b/test/metabase/test/util.clj @@ -32,6 +32,7 @@ [table :refer [Table]] [task-history :refer [TaskHistory]] [user :refer [User]]] + [metabase.plugins.classloader :as classloader] [metabase.test.data :as data] [metabase.test.data.dataset-definitions :as defs] [metabase.util.date :as du] @@ -499,6 +500,7 @@ `(do-with-scheduler ~scheduler (fn [] ~@body))) (defn do-with-temp-scheduler [f] + (classloader/the-classloader) (let [temp-scheduler (qs/start (qs/initialize))] (with-scheduler temp-scheduler (try diff --git a/test/metabase/test_setup.clj b/test/metabase/test_setup.clj index ed47f29be45..cb179fce2d9 100644 --- a/test/metabase/test_setup.clj +++ b/test/metabase/test_setup.clj @@ -13,6 +13,7 @@ [metabase.core.initialization-status :as init-status] [metabase.models.setting :as setting] [metabase.plugins.initialize :as plugins.init] + [metabase.query-processor.middleware.async-wait :as qp.middleware.async-wait] [metabase.test.data.env :as tx.env] [yaml.core :as yaml])) @@ -50,9 +51,13 @@ {:expectations-options :before-run} [] ;; We can shave about a second from unit test launch time by doing the various setup stages in on different threads - ;; Start Jetty in the BG so if test setup fails we have an easier time debugging it -- it's trickier to debug things - ;; on a BG thread - (let [start-web-server! (future (server/start-web-server! handler/app))] + (let [start-web-server! 
+ (future + (try + (server/start-web-server! handler/app) + (catch Throwable e + (log/error e "Web server failed to start") + (System/exit -2))))] (try (log/info (format "Setting up %s test DB and running migrations..." (name (mdb/db-type)))) (mdb/setup-db! :auto-migrate true) @@ -71,7 +76,8 @@ (log/error (u/format-color 'red "Test setup failed: %s\n%s" e (u/pprint-to-str (vec (.getStackTrace e))))) (System/exit -1))) - @start-web-server!)) + (u/deref-with-timeout start-web-server! 10000) + nil)) (defn test-teardown @@ -79,7 +85,7 @@ [] (log/info "Shutting down Metabase unit test runner") (server/stop-web-server!) - (shutdown-agents)) + (qp.middleware.async-wait/destroy-all-thread-pools!)) (defn call-with-test-scaffolding "Runs `test-startup` and ensures `test-teardown` is always called. This function is useful for running a test (or test -- GitLab