Skip to content
Snippets Groups Projects
Unverified Commit afadff15 authored by Cam Saul's avatar Cam Saul
Browse files

Async QP improvements :race_car:

parent 006a1801
No related branches found
No related tags found
No related merge requests found
......@@ -102,6 +102,7 @@
[metabase/throttle "1.0.1"] ; Tools for throttling access to API endpoints and other code pathways
[javax.xml.bind/jaxb-api "2.4.0-b180830.0359"] ; add the `javax.xml.bind` classes which we're still using but were removed in Java 11
[net.sf.cssbox/cssbox "4.12" :exclusions [org.slf4j/slf4j-api]] ; HTML / CSS rendering
[org.apache.commons/commons-lang3 "3.9"] ; helper methods for working with java.lang stuff
[org.clojars.pntblnk/clj-ldap "0.0.16"] ; LDAP client
[org.flatland/ordered "1.5.7"] ; ordered maps & sets
[org.liquibase/liquibase-core "3.6.3" ; migration management (Java lib)
......
......@@ -19,7 +19,6 @@
[metabase.models
[setting :as setting]
[user :refer [User]]]
[metabase.query-processor.middleware.async-wait :as qp.middleware.async-wait]
[metabase.util.i18n :refer [set-locale trs]]
[toucan.db :as db]))
......@@ -49,7 +48,6 @@
;; to a Shutdown hook of some sort instead of having here
(task/stop-scheduler!)
(server/stop-web-server!)
(qp.middleware.async-wait/destroy-all-thread-pools!)
(log/info (trs "Metabase Shutdown COMPLETE")))
......
......@@ -62,7 +62,7 @@
(int 5000))
(s/defn field-distinct-values
"Return the distinct values of FIELD.
"Return the distinct values of `field`.
This is used to create a `FieldValues` object for `:type/Category` Fields."
([field]
(field-distinct-values field absolute-max-distinct-values-limit))
......@@ -71,14 +71,14 @@
:limit max-results}))))
(defn field-distinct-count
"Return the distinct count of FIELD."
"Return the distinct count of `field`."
[field & [limit]]
(-> (field-query field {:aggregation [[:distinct [:field-id (u/get-id field)]]]
:limit limit})
first first int))
(defn field-count
"Return the count of FIELD."
"Return the count of `field`."
[field]
(-> (field-query field {:aggregation [[:count [:field-id (u/get-id field)]]]})
first first int))
......
......@@ -46,7 +46,8 @@
[validate :as validate]
[wrap-value-literals :as wrap-value-literals]]
[metabase.util.i18n :refer [tru]]
[schema.core :as s]))
[schema.core :as s])
(:import clojure.core.async.impl.channels.ManyToManyChannel))
;;; +----------------------------------------------------------------------------------------------------------------+
;;; | QUERY PROCESSOR |
......@@ -231,8 +232,15 @@
(def ^:private default-pipeline (qp-pipeline execute-query))
(defn process-query
"A pipeline of various QP functions (including middleware) that are used to process MB queries."
(def ^:private QueryResponse
(s/cond-pre
ManyToManyChannel
{:status (s/enum :completed :failed :canceled), s/Any s/Any}))
(s/defn process-query :- QueryResponse
"Process an MBQL query. This is the main entrypoint to the magical realm of the Query Processor. Returns a
core.async channel if option `:async?` is true; otherwise returns results in the usual format. For async queries, if
the core.async channel is closed, the query will be canceled."
{:style/indent 0}
[query]
(default-pipeline query))
......
......@@ -3,7 +3,11 @@
(:require [clojure.core.async :as a]
[clojure.tools.logging :as log]
[metabase.async.util :as async.u]
[metabase.util.i18n :refer [trs]]))
[metabase.config :as config]
[metabase.util
[date :as du]
[i18n :refer [trs tru]]])
(:import java.util.concurrent.TimeoutException))
;;; +----------------------------------------------------------------------------------------------------------------+
;;; | async->sync |
......@@ -63,11 +67,28 @@
(swap! in-flight* dec))
{:out-chan out-chan, :canceled-chan canceled-chan, :respond respond, :raise raise}))
(def ^:private query-timeout-ms
"Maximum amount of time to wait for a running query to complete before throwing an Exception."
;; I don't know if these numbers make sense, but my thinking is we want to enable (somewhat) long-running queries on
;; prod but for test and dev purposes we want to fail faster because it usually means I broke something in the QP
;; code
(cond
config/is-prod? (* 20 60 1000) ; twenty minutes
config/is-test? (* 30 1000) ; 30 seconds
config/is-dev? (* 5 60 1000))) ; 5 minutes
(defn- wait-for-result [out-chan]
;; TODO - there should probably be some sort of max timeout here for out-chan. At least for test/dev purposes
(let [result (a/<!! out-chan)]
(if (instance? Throwable result)
(let [[result port] (a/alts!! [out-chan (a/timeout query-timeout-ms)])]
(cond
(instance? Throwable result)
(throw result)
(not= port out-chan)
(do
(a/close! out-chan)
(throw (TimeoutException. (str (tru "Query timed out after %s" (du/format-milliseconds query-timeout-ms))))))
:else
result)))
(defn async-setup
......
......@@ -14,7 +14,8 @@
[metabase.util.i18n :refer [trs]]
[schema.core :as s])
(:import clojure.lang.Var
[java.util.concurrent Executors ExecutorService]))
[java.util.concurrent Executors ExecutorService]
org.apache.commons.lang3.concurrent.BasicThreadFactory$Builder))
(defsetting max-simultaneous-queries-per-db
(trs "Maximum number of simultaneous queries to allow per connected Database.")
......@@ -27,6 +28,19 @@
;; thread pools that ultimately don't get used
(defonce ^:private db-thread-pool-lock (Object.))
(defn- new-thread-pool ^ExecutorService [database-id]
(Executors/newFixedThreadPool
(max-simultaneous-queries-per-db)
(.build
(doto (BasicThreadFactory$Builder.)
(.namingPattern (format "qp-database-%d-threadpool-%%d" database-id))
;; Daemon threads do not block shutdown of the JVM
(.daemon true)
;; TODO - what should the priority of QP threads be? It seems like servicing general API requests should be
;; higher (?)
;; Stuff like Pulse sending and sync should definitely by MIN_PRIORITY (!)
#_(.priority Thread/MIN_PRIORITY)))))
(s/defn ^:private db-thread-pool :- ExecutorService
[database-or-id]
(let [id (u/get-id database-or-id)]
......@@ -36,7 +50,7 @@
(or
(@db-thread-pools id)
(log/debug (trs "Creating new query thread pool for Database {0}" id))
(let [new-pool (Executors/newFixedThreadPool (max-simultaneous-queries-per-db))]
(let [new-pool (new-thread-pool id)]
(swap! db-thread-pools assoc id new-pool)
new-pool))))))
......@@ -50,14 +64,6 @@
(log/debug (trs "Destroying query thread pool for Database {0}" id))
(.shutdownNow thread-pool)))))
(defn destroy-all-thread-pools!
"Destroy all QP thread pools (done on shutdown)."
[]
(locking db-thread-pool-lock
(let [[old] (reset-vals! db-thread-pools nil)]
(doseq [^ExecutorService pool (vals old)]
(.shutdownNow pool)))))
(def ^:private ^:dynamic *already-in-thread-pool?* false)
(defn- runnable ^Runnable [qp query respond raise canceled-chan]
......
......@@ -76,16 +76,16 @@
for any other queries, including ones for determining FieldValues."
[_ _]
(fn [query]
{:data
{:rows
(cond
(is-table-row-count-query? query)
[[1000]]
{:status :completed
:data {:rows
(cond
(is-table-row-count-query? query)
[[1000]]
(is-table-sample-query? query)
(let [fields-count (count (get-in query [:query :fields]))]
(for [i (range 500)]
(repeat fields-count i)))
(is-table-sample-query? query)
(let [fields-count (count (get-in query [:query :fields]))]
(for [i (range 500)]
(repeat fields-count i)))
:else
nil)}}))
:else
nil)}}))
......@@ -13,7 +13,6 @@
[metabase.core.initialization-status :as init-status]
[metabase.models.setting :as setting]
[metabase.plugins.initialize :as plugins.init]
[metabase.query-processor.middleware.async-wait :as qp.middleware.async-wait]
[metabase.test.data.env :as tx.env]
[yaml.core :as yaml]))
......@@ -83,8 +82,7 @@
{:expectations-options :after-run}
[]
(log/info "Shutting down Metabase unit test runner")
(server/stop-web-server!)
(qp.middleware.async-wait/destroy-all-thread-pools!))
(server/stop-web-server!))
(defn call-with-test-scaffolding
"Runs `test-startup` and ensures `test-teardown` is always called. This function is useful for running a test (or test
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment