Skip to content
Snippets Groups Projects
Unverified Commit f8349b51 authored by Cam Saul's avatar Cam Saul Committed by GitHub
Browse files

StreamingResponse async tweaks (#12049)

* Upgrade Jetty version

* Remove async-wait middleware

* Move re= test macro

* Add mt/user->credentials alias

* Convert metabase.server-test to new style

* Rework dataset-test to use metabase.test

* Use atom instead of agent for tracking in-flight queries

* Streaming QP context should correctly pass the canceled-chan created by StreamingResponse

* Save query executions synchronously

* i18n the query logs

* Reworked StreamingResponse

* Almost working :interrobang:

* Test fixes

* Test impl & some perf improvements

* Test/lint fix :wrench:
parent 25465140
No related branches found
No related tags found
No related merge requests found
Showing
with 598 additions and 605 deletions
......@@ -113,7 +113,7 @@
[net.sf.cssbox/cssbox "4.12" :exclusions [org.slf4j/slf4j-api]] ; HTML / CSS rendering
[org.apache.commons/commons-lang3 "3.9"] ; helper methods for working with java.lang stuff
[org.clojars.pntblnk/clj-ldap "0.0.16"] ; LDAP client
[org.eclipse.jetty/jetty-server "9.4.15.v20190215"] ; We require JDK 8 which allows us to run Jetty 9.4, ring-jetty-adapter runs on 1.7 which forces an older version
[org.eclipse.jetty/jetty-server "9.4.27.v20200227"] ; We require JDK 8 which allows us to run Jetty 9.4, ring-jetty-adapter runs on 1.7 which forces an older version
[org.flatland/ordered "1.5.7"] ; ordered maps & sets
[org.liquibase/liquibase-core "3.6.3" ; migration management (Java lib)
:exclusions [ch.qos.logback/logback-classic]]
......
(ns metabase.async.streaming-response
(:require [cheshire.core :as json]
[clojure.core.async :as a]
[clojure.tools.logging :as log]
compojure.response
[metabase.async.streaming-response.thread-pool :as thread-pool]
[metabase.util :as u]
[metabase.util.i18n :refer [trs]]
[potemkin.types :as p.types]
[pretty.core :as pretty]
[ring.core.protocols :as ring.protocols]
......@@ -15,118 +18,176 @@
(def ^:private keepalive-interval-ms
"Interval between sending newline characters to keep Heroku from terminating requests like queries that take a long
time to complete."
(u/seconds->ms 1)) ; one second
(u/seconds->ms 1))
(defn- write-to-output-stream!
  "Low-level write helper for an `OutputStream`. The two-arg arity writes either a single byte (when passed an
  integer) or an entire byte array; the four-arg arity writes `len` bytes of `ba` starting at `offset`."
  ([^OutputStream stream value]
   (cond
     (int? value) (.write stream ^int value)
     :else        (.write stream ^bytes value)))
  ([^OutputStream stream ^bytes ba ^Integer offset ^Integer len]
   (.write stream ba offset len)))
(defn- jetty-eof-canceling-output-stream
"Wraps an `OutputStream` and sends a message to `canceled-chan` if a jetty `EofException` is thrown when writing to
the stream."
^OutputStream [^OutputStream os canceled-chan]
(proxy [FilterOutputStream] [os]
(flush []
(try
(.flush os)
(catch EofException e
(log/trace "Caught EofException")
(a/>!! canceled-chan ::cancel)
(throw e))))
(write
([x]
(try
(if (int? x)
(.write os ^int x)
(.write os ^bytes x))
(write-to-output-stream! os x)
(catch EofException e
(log/trace "Caught EofException")
(a/>!! canceled-chan ::cancel)
(throw e))))
([^bytes ba ^Integer off ^Integer len]
([ba off len]
(try
(.write os ba off len)
(write-to-output-stream! os ba off len)
(catch EofException e
(log/trace "Caught EofException")
(a/>!! canceled-chan ::cancel)
(throw e)))))))
(defn- start-keepalive-loop!
  "Start a `core.async` loop that flushes `os` (and, when `write-keepalive-newlines?` is truthy, writes a newline
  byte) every `keepalive-interval-ms` until the `continue-writing-newlines?` atom is reset to false or a write/flush
  throws."
  [^OutputStream os write-keepalive-newlines? continue-writing-newlines?]
  ;; NOTE(review): `.write`/`.flush` are blocking I/O inside a `go` block; presumably acceptable for these tiny
  ;; keepalive writes, but worth confirming they can't stall the core.async dispatch pool
  (a/go-loop []
    (a/<! (a/timeout keepalive-interval-ms))
    ;; by still attempting to flush even when not writing newlines we can hopefully trigger an EofException if the
    ;; request is canceled
    (when @continue-writing-newlines?
      ;; the `try` yields `::recur` on success and nil on any failure, so the loop stops as soon as a write fails
      (when (try
              (when write-keepalive-newlines?
                (.write os (byte \newline)))
              (.flush os)
              ::recur
              (catch Throwable _
                nil))
        (recur)))))
(defn- keepalive-output-stream
"Wraps an `OutputStream` and writes keepalive newline bytes every interval until someone else starts writing to the
stream."
^OutputStream [^OutputStream os write-keepalive-newlines?]
(let [write-newlines? (atom true)]
(a/go-loop []
(a/<! (a/timeout keepalive-interval-ms))
(when @write-newlines?
(when write-keepalive-newlines?
(.write os (byte \newline)))
(.flush os)
(recur)))
(let [continue-writing-newlines? (atom true)]
(start-keepalive-loop! os write-keepalive-newlines? continue-writing-newlines?)
(proxy [FilterOutputStream] [os]
(close []
(reset! continue-writing-newlines? false)
(u/ignore-exceptions (.close os)))
(write
([x]
(reset! continue-writing-newlines? false)
(write-to-output-stream! os x))
([ba off len]
(reset! continue-writing-newlines? false)
(write-to-output-stream! os ba off len))))))
(defn- stop-writing-after-close-stream
"Wraps `OutputStreams` and ignores additional calls to `flush` and `write` after it has been closed. (Background: I
think we call flush a few more times than we need to which causes Jetty and the GZIPOutputStream to throw
Exceptions.)"
^OutputStream [^OutputStream os]
(let [closed? (atom false)]
(proxy [FilterOutputStream] [os]
(close []
(reset! write-newlines? false)
(let [^FilterOutputStream this this]
(proxy-super close)))
(when-not @closed?
(reset! closed? true)
(u/ignore-exceptions (.close os))))
(flush []
(when-not @closed?
(.flush os)))
(write
([x]
(reset! write-newlines? false)
(if (int? x)
(.write os ^int x)
(.write os ^bytes x)))
([^bytes ba ^Integer off ^Integer len]
(reset! write-newlines? false)
(.write os ba off len))))))
(defmacro ^:private with-open-chan [[chan-binding chan] & body]
`(let [chan# ~chan
~chan-binding chan#]
(try
~@body
(finally
(a/close! chan#)))))
;; TODO - this code is basically duplicated with the code in the QP catch-exceptions middleware; we should refactor to
;; remove the duplication
(defn- exception-chain [^Throwable e]
(->> (iterate #(.getCause ^Throwable %) e)
(take-while some?)
reverse))
(when-not @closed?
(write-to-output-stream! os x)))
([ba off len]
(when-not @closed?
(write-to-output-stream! os ba off len)))))))
(defn- ex-status-code
  "Walk `e` and its chain of causes (outermost first) and return the first `:status-code` (or `:status`) found in any
  Exception's `ex-data`; falls back to 500 when the chain carries no status."
  [e]
  (loop [current e]
    (if-not current
      500
      (let [data (ex-data current)]
        (or (:status-code data)
            (:status data)
            (recur (ex-cause current)))))))
(defn- format-exception [e]
(let [format-ex* (fn [^Throwable e]
{:message (.getMessage e)
:class (.getCanonicalName (class e))
:stacktrace (mapv str (.getStackTrace e))
:data (ex-data e)})
[e & more :as chain] (exception-chain e)]
(merge
(format-ex* e)
{:_status (or (some #((some-fn :status-code :status) (ex-data %))
chain)
500)}
(when (seq more)
{:via (map format-ex* more)}))))
(assoc (Throwable->map e) :_status (ex-status-code e)))
(defn write-error!
"Write an error to the output stream, formatting it nicely."
"Write an error to the output stream, formatting it nicely. Closes output stream afterwards."
[^OutputStream os obj]
(if (instance? Throwable obj)
(recur os (format-exception obj))
(try
(with-open [writer (BufferedWriter. (OutputStreamWriter. os StandardCharsets/UTF_8))]
(json/generate-stream obj writer)
(.flush writer))
(catch Throwable _))))
(with-open [os os]
(log/trace (pr-str (list 'write-error! obj)))
(try
(with-open [writer (BufferedWriter. (OutputStreamWriter. os StandardCharsets/UTF_8))]
(json/generate-stream obj writer))
(catch EofException _)
(catch Throwable e
(log/error e (trs "Error writing error to output stream") obj))))))
(defn- do-f*
  "Call the response-writing function `f` with `os` and `canceled-chan`, translating Exceptions into cancelation or
  error output, and always delivering a final `:canceled` or `:done` status to `finished-chan` before closing both
  channels."
  [f ^OutputStream os finished-chan canceled-chan]
  (try
    (f os canceled-chan)
    ;; Jetty EofException means the client went away (e.g. closed the browser tab) -- treat it as a cancelation
    (catch EofException _
      (a/>!! canceled-chan ::cancel)
      nil)
    ;; InterruptedException is what we get when the async machinery `future-cancel`s this worker thread
    (catch InterruptedException _
      (a/>!! canceled-chan ::cancel)
      nil)
    (catch Throwable e
      (log/error e (trs "Caught unexpected Exception in streaming response body"))
      ;; best-effort: serialize the error to the stream so the client sees something useful
      (write-error! os e)
      nil)
    (finally
      ;; `poll!` returns nil when nothing was ever sent to `canceled-chan`, i.e. we finished normally
      (a/>!! finished-chan (if (a/poll! canceled-chan)
                             :canceled
                             :done))
      (a/close! finished-chan)
      (a/close! canceled-chan))))
(defn- write-to-stream! [f {:keys [write-keepalive-newlines? gzip?], :as options} ^OutputStream os finished-chan]
(defn- do-f [f {:keys [gzip? write-keepalive-newlines?], :or {write-keepalive-newlines? true}, :as options}
^OutputStream os finished-chan canceled-chan]
(if gzip?
(with-open [gzos (GZIPOutputStream. os true)]
(write-to-stream! f (dissoc options :gzip?) gzos finished-chan))
(with-open-chan [canceled-chan (a/promise-chan)]
(with-open [os os
os (jetty-eof-canceling-output-stream os canceled-chan)
os (keepalive-output-stream os write-keepalive-newlines?)]
(try
(f os canceled-chan)
(catch Throwable e
(write-error! os {:message (.getMessage e)}))
(finally
(.flush os)
(a/>!! finished-chan (if (a/poll! canceled-chan)
:canceled
:done))
(a/close! finished-chan)))))))
(do-f f (assoc options :gzip? false) gzos finished-chan canceled-chan)
(.finish gzos))
(with-open [os (jetty-eof-canceling-output-stream os canceled-chan)
os (keepalive-output-stream os write-keepalive-newlines?)
os (stop-writing-after-close-stream os)]
(do-f* f os finished-chan canceled-chan)
(.flush os))))
(defn- do-f-async
  "Run `f` asynchronously on the streaming-response thread pool, writing its output to `os`. If `canceled-chan` gets a
  message (client went away), `future-cancel` the worker, deliver `:canceled` to `finished-chan`, and close the
  stream ourselves."
  [f options ^OutputStream os finished-chan]
  (let [canceled-chan (a/promise-chan identity identity)
        ;; `bound-fn` so dynamic bindings from the request thread carry over to the pooled thread
        task          (bound-fn []
                        (try
                          (do-f f options os finished-chan canceled-chan)
                          (finally
                            ;; always close the stream, even if `do-f` throws
                            (u/ignore-exceptions (.close os)))))
        futur         (.submit (thread-pool/thread-pool) ^Runnable task)]
    (a/go
      (when (a/<! canceled-chan)
        (log/trace "Canceling async thread")
        ;; interrupting the worker surfaces as InterruptedException inside the streaming body
        (future-cancel futur)
        ;; use the parking `a/>!` here: blocking ops like `a/>!!` must never be used inside a `go` block
        (a/>! finished-chan :canceled)
        (a/close! finished-chan)
        (a/close! canceled-chan)
        ;; the worker's `finally` may already have closed `os`; don't let a second flush/close kill this go block
        (u/ignore-exceptions
          (.flush os)
          (.close os))))))
;; `ring.middleware.gzip` doesn't work on our StreamingResponse class.
(defn- should-gzip-response?
......@@ -144,7 +205,7 @@
;; both sync and async responses
ring.protocols/StreamableResponseBody
(write-body-to-stream [_ _ os]
(write-to-stream! f options os donechan))
(do-f-async f options os donechan))
;; sync responses only
compojure.response/Renderable
......@@ -195,4 +256,4 @@
{:pre [(= (count bindings) 2)]}
`(->StreamingResponse (fn [~(vary-meta os-binding assoc :tag 'java.io.OutputStream) ~canceled-chan-binding] ~@body)
~options
(a/promise-chan)))
(a/promise-chan identity identity)))
(ns metabase.async.streaming-response.thread-pool
(:require [metabase.config :as config])
(:import [java.util.concurrent Executors ThreadPoolExecutor]
org.apache.commons.lang3.concurrent.BasicThreadFactory$Builder))
(def ^:private ^Long thread-pool-max-size
  "Maximum number of threads in the streaming-response thread pool: the `:mb-jetty-maxthreads` config value when set,
  otherwise 50."
  (or (config/config-int :mb-jetty-maxthreads) 50))
;; wrapped in a `delay` so the pool isn't created until the first streaming response actually needs it
(defonce ^:private thread-pool*
  (delay
    (Executors/newFixedThreadPool thread-pool-max-size
                                  (.build
                                   (doto (BasicThreadFactory$Builder.)
                                     ;; threads are named streaming-response-thread-pool-0, -1, etc.
                                     (.namingPattern "streaming-response-thread-pool-%d")
                                     ;; Daemon threads do not block shutdown of the JVM
                                     (.daemon true))))))
(defn thread-pool
  "Thread pool for asynchronously running streaming responses."
  ^ThreadPoolExecutor []
  ;; deref forces the delay, creating the pool on first call
  @thread-pool*)
(defn active-thread-count
  "The number of active streaming response threads."
  []
  ;; `getActiveCount` is an approximation per the ThreadPoolExecutor Javadoc; fine for logging/stats purposes
  (.getActiveCount (thread-pool)))
(defn queued-thread-count
  "The number of queued streaming response threads."
  []
  ;; tasks waiting in the executor's work queue, i.e. responses submitted but not yet running
  (count (.getQueue (thread-pool))))
......@@ -9,8 +9,8 @@
[metabase.async
[streaming-response :as streaming-response]
[util :as async.u]]
[metabase.async.streaming-response.thread-pool :as streaming-response.thread-pool]
[metabase.middleware.util :as middleware.u]
[metabase.query-processor.middleware.async :as qp.middleware.async]
[metabase.util.i18n :refer [trs]]
[toucan.db :as db])
(:import clojure.core.async.impl.channels.ManyToManyChannel
......@@ -37,7 +37,7 @@
(str
(format "%s %s %d" (str/upper-case (name request-method)) uri status)
(when async-status
(format " [ASYNC: %s]" async-status))))
(format " [%s: %s]" (trs "ASYNC") async-status))))
(defn- format-performance-info
[{:keys [start-time call-count-fn]
......@@ -45,19 +45,22 @@
call-count-fn (constantly -1)}}]
(let [elapsed-time (u/format-nanoseconds (- (System/nanoTime) start-time))
db-calls (call-count-fn)]
(format "%s (%d DB calls)" elapsed-time db-calls)))
(trs "{0} ({1} DB calls)" elapsed-time db-calls)))
(defn- format-threads-info [{:keys [include-stats?]}]
(when include-stats?
(str
(when-let [^QueuedThreadPool pool (some-> (server/instance) .getThreadPool)]
(format "Jetty threads: %s/%s (%s idle, %s queued) "
(trs "Jetty threads: {0}/{1} ({2} idle, {3} queued) "
(.getBusyThreads pool)
(.getMaxThreads pool)
(.getIdleThreads pool)
(.getQueueSize pool)))
(format "(%d total active threads) " (Thread/activeCount))
(format "Queries in flight: %d" (qp.middleware.async/in-flight)))))
(trs "({0} total active threads)" (Thread/activeCount))
" "
(trs "Queries in flight: {0}" (streaming-response.thread-pool/active-thread-count))
" "
(trs "({0} queued)" (streaming-response.thread-pool/queued-thread-count)))))
(defn- format-error-info [{{:keys [body]} :response} {:keys [error?]}]
(when (and error?
......@@ -142,7 +145,7 @@
(let [finished-chan (streaming-response/finished-chan streaming-response)]
(a/go
(let [result (a/<! finished-chan)]
(log-info (assoc info :async-status (if (:canceled result) "canceled" "completed")))))))
(log-info (assoc info :async-status (if (= result :canceled) "canceled" "completed")))))))
(defn- logged-response
"Log an API response. Returns resonse, possibly modified (i.e., core.async channels will be wrapped); this value
......
......@@ -61,6 +61,9 @@
;;; ------------------------------------------------------ i18n ------------------------------------------------------
(def ^:private available-locales
(delay (puppet-i18n/available-locales)))
(defn bind-user-locale
"Middleware that binds locale info for the current User. (This is basically a copy of the
`puppetlabs.i18n.core/locale-negotiator`, but reworked to handle async-style requests as well.)"
......@@ -70,7 +73,7 @@
(let [headers (:headers request)
parsed (puppet-i18n/parse-http-accept-header (get headers "accept-language"))
wanted (mapv first parsed)
negotiated ^java.util.Locale (puppet-i18n/negotiate-locale wanted (puppet-i18n/available-locales))]
negotiated ^java.util.Locale (puppet-i18n/negotiate-locale wanted @available-locales)]
(puppet-i18n/with-user-locale negotiated
(handler request respond raise)))))
......
......@@ -44,15 +44,6 @@
(catch Throwable e
(log/error e (trs "Error unscheduling tasks for DB.")))))
(defn- destroy-qp-thread-pool!
[database]
(try
(classloader/the-classloader)
(classloader/require 'metabase.query-processor.middleware.async-wait)
((resolve 'metabase.query-processor.middleware.async-wait/destroy-thread-pool!) database)
(catch Throwable e
(log/error e (trs "Error destroying thread pool for DB.")))))
(defn- post-insert [database]
(u/prog1 database
;; add this database to the All Users permissions groups
......@@ -67,7 +58,6 @@
(defn- pre-delete [{id :id, driver :engine, :as database}]
(unschedule-tasks! database)
(destroy-qp-thread-pool! database)
(db/delete! 'Card :database_id id)
(db/delete! 'Permissions :object [:like (str (perms/object-path id) "%")])
(db/delete! 'Table :db_id id)
......
......@@ -20,8 +20,6 @@
[add-source-metadata :as add-source-metadata]
[add-timezone-info :as add-timezone-info]
[annotate :as annotate]
[async :as async]
[async-wait :as async-wait]
[auto-bucket-datetimes :as bucket-datetime]
[binning :as binning]
[cache :as cache]
......@@ -91,13 +89,11 @@
#'resolve-database-and-driver/resolve-database-and-driver
#'fetch-source-query/resolve-card-id-source-tables
#'store/initialize-store
#'async-wait/wait-for-turn
#'cache/maybe-return-cached-results
#'validate/validate-query
#'normalize/normalize
#'add-rows-truncated/add-rows-truncated
#'results-metadata/record-and-return-metadata!
#'async/count-in-flight-queries])
#'results-metadata/record-and-return-metadata!])
;; ▲▲▲ PRE-PROCESSING ▲▲▲ happens from BOTTOM-TO-TOP, e.g. the results of `expand-macros` are passed to
;; `substitute-parameters`
......@@ -136,8 +132,7 @@
(def ^:private ^:const preprocessing-timeout-ms 10000)
(defn- preprocess-query [query context]
(binding [*preprocessing-level* (inc *preprocessing-level*)
async-wait/*disable-async-wait* true]
(binding [*preprocessing-level* (inc *preprocessing-level*)]
;; record the number of recursive preprocesses taking place to prevent infinite preprocessing loops.
(log/tracef "*preprocessing-level*: %d" *preprocessing-level*)
(when (>= *preprocessing-level* max-preprocessing-level)
......
......@@ -122,5 +122,5 @@
:cancelf default-cancelf
:timeoutf default-timeoutf
:resultf default-resultf
:canceled-chan (a/promise-chan)
:out-chan (a/promise-chan)})
:canceled-chan (a/promise-chan identity identity)
:out-chan (a/promise-chan identity identity)})
(ns metabase.query-processor.middleware.async
(:require [clojure.core.async :as a]
[metabase.query-processor.context :as context]))
;; agent holding the number of queries currently in flight; incremented/decremented asynchronously via `send`
(def ^:private in-flight* (agent 0))
(defn in-flight
  "Return the number of queries currently in flight."
  []
  ;; deref of an agent never blocks; it returns the most recently committed value
  @in-flight*)
(defn count-in-flight-queries
  "Middleware that tracks the current number of queries in flight."
  [qp]
  (fn [query rff context]
    ;; bump the counter as the query starts...
    (send in-flight* inc)
    (let [out-chan (context/out-chan context)]
      (a/go
        ;; ...and decrement it once `out-chan` delivers a result or is closed
        (a/<! out-chan)
        (send in-flight* dec)))
    (qp query rff context)))
(ns metabase.query-processor.middleware.async-wait
"Middleware that limits the number of concurrent queries for each database.
Each connected database is limited to a maximum of 15 simultaneous queries (configurable) using these methods; any
additional queries will park the thread. Super-useful for writing high-performance API endpoints. Prefer these
methods to the old-school synchronous versions.
How is this achieved? For each Database, we'll maintain a thread pool executor to limit the number of simultaneous
queries."
(:require [clojure.core.async :as a]
[clojure.tools.logging :as log]
[metabase.models.setting :refer [defsetting]]
[metabase.query-processor.context :as context]
[metabase.util :as u]
[metabase.util.i18n :refer [deferred-trs trs]]
[schema.core :as s])
(:import [java.util.concurrent Executors ExecutorService]
org.apache.commons.lang3.concurrent.BasicThreadFactory$Builder))
;; user-configurable cap on per-Database query concurrency; consumed by `new-thread-pool` below
(defsetting max-simultaneous-queries-per-db
  (deferred-trs "Maximum number of simultaneous queries to allow per connected Database.")
  :type :integer
  :default 15)
;; map of Database ID -> ExecutorService; pools are created lazily in `db-thread-pool` below
(defonce ^:private db-thread-pools (atom {}))

;; easier just to use a lock for creating the pools rather than using `swap-vals!` and having to nuke a bunch of
;; thread pools that ultimately don't get used
(defonce ^:private db-thread-pool-lock (Object.))
(defn- new-thread-pool
  "Create a fixed-size thread pool for running queries against the Database with `database-id`; pool size comes from
  the `max-simultaneous-queries-per-db` setting."
  ^ExecutorService [database-id]
  (Executors/newFixedThreadPool
   (max-simultaneous-queries-per-db)
   (.build
    (doto (BasicThreadFactory$Builder.)
      ;; threads are named e.g. qp-database-1-threadpool-0, qp-database-1-threadpool-1, ...
      (.namingPattern (format "qp-database-%d-threadpool-%%d" database-id))
      ;; Daemon threads do not block shutdown of the JVM
      (.daemon true)
      ;; Running queries should be lower priority than other stuff e.g. API responses
      (.priority Thread/MIN_PRIORITY)))))
(s/defn ^:private db-thread-pool :- ExecutorService
  "Fetch (creating if needed) the query thread pool for `database-or-id`. Double-checked: the first `or` branch is the
  lock-free fast path; the second check inside `locking` prevents two threads racing to create the same pool."
  [database-or-id]
  (let [id (u/get-id database-or-id)]
    (or
     (@db-thread-pools id)
     (locking db-thread-pool-lock
       (or
        (@db-thread-pools id)
        ;; `log/debug` returns nil, so `or` falls through to actually creating and registering the pool
        (log/debug (trs "Creating new query thread pool for Database {0}" id))
        (let [new-pool (new-thread-pool id)]
          (swap! db-thread-pools assoc id new-pool)
          new-pool))))))
(defn destroy-thread-pool!
  "Destroy the QP thread pool for a Database (done automatically when DB is deleted.)"
  [database-or-id]
  (let [id (u/get-id database-or-id)]
    ;; `swap-vals!` returns [old-value new-value]; destructuring the *old* map by key `id` recovers the pool we just
    ;; removed (if any)
    (let [[{^ExecutorService thread-pool id}] (locking db-thread-pool-lock
                                                (swap-vals! db-thread-pools dissoc id))]
      (when thread-pool
        (log/debug (trs "Destroying query thread pool for Database {0}" id))
        ;; `shutdownNow` interrupts queries currently running in this pool
        (.shutdownNow thread-pool)))))
;; bound to true inside `runnable` so re-entrant QP calls run inline instead of re-submitting to the pool
(def ^:private ^:dynamic *already-in-thread-pool?*
  "True if the current thread is a thread pool thread from the a DB thread pool (i.e., if we're already running
  asynchronously after waiting if needed.)"
  false)
;; rebind with `binding`, e.g. (binding [*disable-async-wait* true] ...)
(def ^:dynamic *disable-async-wait*
  "Whether to disable async waiting entirely. Bind this to `true` for cases where we would not like to enforce async
  waiting, such as for functions like `qp/query->native` that don't actually run queries.

  DO NOT BIND THIS TO TRUE IN SITUATIONS WHERE WE ACTUALLY RUN QUERIES: some functionality relies on the fact that
  things are ran in a separate thread to function correctly, such as the cancellation code that listens for
  InterruptedExceptions."
  false)
(defn- runnable
  "Wrap `qp` in a Runnable suitable for submitting to a DB thread pool. `bound-fn` preserves the submitting thread's
  dynamic bindings; any Throwable is routed to the context's `raisef` rather than escaping the pool thread."
  ^Runnable [qp query rff context]
  (bound-fn []
    ;; mark this thread as a pool thread so `wait-for-turn` won't re-submit recursive QP calls
    (binding [*already-in-thread-pool?* true]
      (try
        (qp query rff context)
        (catch Throwable e
          (context/raisef e context))))))
(defn- run-in-thread-pool
  "Submit `qp` to the thread pool for the query's Database and return nil immediately. If the context's
  `canceled-chan` gets a message before the task completes, the pending/running task is `future-cancel`ed."
  [qp {database-id :database, :as query} rff context]
  {:pre [(integer? database-id)]}
  (try
    (let [pool          (db-thread-pool database-id)
          futur         (.submit pool (runnable qp query rff context))
          canceled-chan (context/canceled-chan context)]
      (a/go
        (when (a/<! canceled-chan)
          (log/debug (trs "Request canceled, canceling pending query"))
          (future-cancel futur))))
    ;; errors here are submission failures (e.g. the pool was shut down); exceptions thrown by the query itself are
    ;; handled inside `runnable`
    (catch Throwable e
      (context/raisef e context)))
  nil)
(defn wait-for-turn
  "Middleware that throttles the number of concurrent queries for each connected database, parking the thread until it
  is allowed to run."
  [qp]
  (fn [query rff context]
    {:pre [(map? query)]}
    ;; run inline when we're already on a pool thread (re-entrant call) or throttling is explicitly disabled;
    ;; otherwise hand off to the per-Database thread pool
    (if (or *already-in-thread-pool?* *disable-async-wait*)
      (qp query rff context)
      (run-in-thread-pool qp query rff context))))
......@@ -9,9 +9,7 @@
[query-execution :as query-execution :refer [QueryExecution]]]
[metabase.query-processor.util :as qputil]
[metabase.util.i18n :refer [trs]]
[toucan.db :as db])
(:import [java.util.concurrent Executors Future]
org.apache.commons.lang3.concurrent.BasicThreadFactory$Builder))
[toucan.db :as db]))
(defn- add-running-time [{start-time-ms :start_time_millis, :as query-execution}]
(-> query-execution
......@@ -28,7 +26,7 @@
;;
;; Async seems like it makes sense from a performance standpoint, but should we have some sort of shared threadpool
;; for other places where we would want to do async saves (such as results-metadata for Cards?)
(defn- save-query-execution!
(defn- save-query-execution!*
"Save a `QueryExecution` and update the average execution time for the corresponding `Query`."
[{query :json_query, query-hash :hash, running-time :running_time, context :context :as query-execution}]
(query/save-query-and-update-average-execution-time! query query-hash running-time)
......@@ -36,42 +34,22 @@
(log/warn (trs "Cannot save QueryExecution, missing :context"))
(db/insert! QueryExecution (dissoc query-execution :json_query))))
(def ^:private ^Long thread-pool-size 4)
(def ^:private ^{:arglists '(^java.util.concurrent.ExecutorService [])} thread-pool
"Thread pool for asynchronously saving query executions."
(let [pool (delay
(Executors/newFixedThreadPool
thread-pool-size
(.build
(doto (BasicThreadFactory$Builder.)
(.namingPattern "save-query-execution-thread-pool-%d")
;; Daemon threads do not block shutdown of the JVM
(.daemon true)
;; Save query executions should be lower priority than other stuff e.g. API responses
(.priority Thread/MIN_PRIORITY)))))]
(fn [] @pool)))
(defn- save-query-execution-async!
"Asynchronously save a `QueryExecution` row containing `execution-info`. This is done when a query is finished, so
(defn- save-query-execution!
"Save a `QueryExecution` row containing `execution-info`. This is done when a query is finished, so
regardless of whether results streaming is canceled, we want to continue the save; for this reason, we don't call
`future-cancel` if we get a message to `canceled-chan` the way we normally do."
^Future [execution-info]
(log/trace "Saving QueryExecution info asynchronously")
(let [execution-info (add-running-time execution-info)
^Runnable task (bound-fn []
(try
(save-query-execution! execution-info)
(catch Throwable e
(log/error e (trs "Error saving query execution info"))))
nil)]
(.submit (thread-pool) task)))
[execution-info]
(log/trace "Saving QueryExecution info")
(try
(save-query-execution!* (add-running-time execution-info))
(catch Throwable e
(log/error e (trs "Error saving query execution info")))))
(defn- save-successful-query-execution-async! [query-execution result-rows]
(save-query-execution-async! (assoc query-execution :result_rows result-rows)))
(defn- save-successful-query-execution! [query-execution result-rows]
(save-query-execution! (assoc query-execution :result_rows result-rows)))
(defn- save-failed-query-execution-async! [query-execution message]
(save-query-execution-async! (assoc query-execution :error (str message))))
(defn- save-failed-query-execution! [query-execution message]
(save-query-execution! (assoc query-execution :error (str message))))
;;; +----------------------------------------------------------------------------------------------------------------+
......@@ -100,7 +78,7 @@
(rf))
([acc]
(save-successful-query-execution-async! execution-info @row-count)
(save-successful-query-execution! execution-info @row-count)
(rf (if (map? acc)
(success-response execution-info acc)
acc)))
......@@ -142,7 +120,7 @@
(letfn [(rff* [metadata]
(add-and-save-execution-info-xform! metadata execution-info (rff metadata)))
(raisef* [^Throwable e context]
(save-failed-query-execution-async! execution-info (.getMessage e))
(save-failed-query-execution! execution-info (.getMessage e))
(raisef (ex-info (.getMessage e)
{:query-execution execution-info}
e)
......
......@@ -22,7 +22,7 @@
(cond
(not= port out-chan) (context/timeoutf context)
(nil? val) (context/cancelf context))
(log/tracef "Closing out-chan and canceled-chan.")
(log/tracef "Closing out-chan.")
(a/close! out-chan)
(a/close! canceled-chan)))
nil))
......
......@@ -47,25 +47,33 @@
the normal `streaming-response` macro, which is geared toward Ring responses.
(with-open [os ...]
(qp/process-query query (qp.streaming/streaming-context :csv os)))"
[export-format ^OutputStream os]
(let [results-writer (i/streaming-results-writer export-format os)]
{:rff (streaming-rff results-writer)
:reducedf (streaming-reducedf results-writer os)}))
(qp/process-query query (qp.streaming/streaming-context :csv os canceled-chan)))"
([export-format ^OutputStream os]
(let [results-writer (i/streaming-results-writer export-format os)]
{:rff (streaming-rff results-writer)
:reducedf (streaming-reducedf results-writer os)}))
([export-format os canceled-chan]
(assoc (streaming-context export-format os) :canceled-chan canceled-chan)))
(defn- await-async-result
  "Block until `out-chan` delivers a result (or closes) and return it. A message to `canceled-chan` in the meantime
  closes `out-chan`, which cancels the running query."
  [out-chan canceled-chan]
  ;; if we get a cancel message, close `out-chan` so the query will be canceled
  (a/go
    (when (a/<! canceled-chan)
      (a/close! out-chan)))
  ;; block until `out-chan` closes or gets a result
  (a/<!! out-chan))
(defn streaming-response*
"Impl for `streaming-response`."
[export-format f]
(streaming-response/streaming-response (i/stream-options export-format) [os canceled-chan]
(let [result (try
(f (streaming-context export-format os))
(f (streaming-context export-format os canceled-chan))
(catch Throwable e
e))
result (if (instance? ManyToManyChannel result)
(let [[val port] (a/alts!! [result canceled-chan])]
(when (= port canceled-chan)
(a/close! result))
val)
(await-async-result result canceled-chan)
result)]
(when (or (instance? Throwable result)
(= (:status result) :failed))
......
......@@ -21,17 +21,12 @@
[permissions-group :as group]
[query-execution :refer [QueryExecution]]]
[metabase.query-processor.middleware.constraints :as constraints]
[metabase.test
[data :as data]
[util :as tu]]
[metabase.test.data
[dataset-definitions :as defs]
[datasets :as datasets]
[users :as test-users]]
[metabase.test.util.log :as tu.log]
[metabase.test.util :as tu]
[schema.core :as s]
[toucan.db :as db]
[toucan.util.test :as tt])
[toucan.db :as db])
(:import com.fasterxml.jackson.core.JsonGenerator))
(defn- format-response [m]
......@@ -86,7 +81,7 @@
:row_count 1
:result_rows 1
:context :ad-hoc
:executor_id (test-users/user->id :rasta)
:executor_id (mt/user->id :rasta)
:native false
:pulse_id nil
:card_id nil
......@@ -111,7 +106,7 @@
error-message
(re-find #"Syntax error in SQL statement")
boolean))))
result (tu.log/suppress-output
result (mt/suppress-output
((mt/user->client :rasta) :post 202 "dataset" {:database (mt/id)
:type "native"
:native {:query "foobar"}}))]
......@@ -143,7 +138,7 @@
:database_id (mt/id)
:started_at true
:running_time true
:executor_id (test-users/user->id :rasta)
:executor_id (mt/user->id :rasta)
:native true
:pulse_id nil
:card_id nil
......@@ -188,8 +183,8 @@
["3" "2014-09-15" "8" "56"]
["4" "2014-03-11" "5" "4"]
["5" "2013-05-05" "3" "49"]]
(let [result ((test-users/user->client :rasta) :post 202 "dataset/csv" :query
(json/generate-string (data/mbql-query checkins)))]
(let [result ((mt/user->client :rasta) :post 202 "dataset/csv" :query
(json/generate-string (mt/mbql-query checkins)))]
(take 5 (parse-and-sort-csv result))))))
(deftest download-response-headers-test
......@@ -201,28 +196,28 @@
"X-Accel-Buffering" "no"}
(-> (http-client/client-full-response (test-users/username->token :rasta)
:post 202 "dataset/csv"
:query (json/generate-string (data/mbql-query checkins {:limit 1})))
:query (json/generate-string (mt/mbql-query checkins {:limit 1})))
:headers
(select-keys ["Cache-Control" "Content-Disposition" "Content-Type" "Expires" "X-Accel-Buffering"])
(update "Content-Disposition" #(some-> % (str/replace #"query_result_.+(\.\w+)"
"query_result_<timestamp>$1"))))))))
(deftest check-an-empty-date-column
(is (= [["1" "2014-04-07" "" "5" "12"]
["2" "2014-09-18" "" "1" "31"]
["3" "2014-09-15" "" "8" "56"]
["4" "2014-03-11" "" "5" "4"]
["5" "2013-05-05" "" "3" "49"]]
(data/dataset defs/test-data-with-null-date-checkins
(let [result ((test-users/user->client :rasta) :post 202 "dataset/csv" :query
(json/generate-string (data/mbql-query checkins)))]
(mt/dataset defs/test-data-with-null-date-checkins
(let [result ((mt/user->client :rasta) :post 202 "dataset/csv" :query
(json/generate-string (mt/mbql-query checkins)))]
(is (= [["1" "2014-04-07" "" "5" "12"]
["2" "2014-09-18" "" "1" "31"]
["3" "2014-09-15" "" "8" "56"]
["4" "2014-03-11" "" "5" "4"]
["5" "2013-05-05" "" "3" "49"]]
(take 5 (parse-and-sort-csv result)))))))
(deftest sqlite-datetime-test
(mt/test-driver :sqlite
(testing "SQLite doesn't return proper date objects but strings, they just pass through the qp untouched"
(let [result ((test-users/user->client :rasta) :post 202 "dataset/csv" :query
(json/generate-string (data/mbql-query checkins {:order-by [[:asc $id]], :limit 5})))]
(let [result ((mt/user->client :rasta) :post 202 "dataset/csv" :query
(json/generate-string (mt/mbql-query checkins {:order-by [[:asc $id]], :limit 5})))]
(is (= [["1" "2014-04-07" "5" "12"]
["2" "2014-09-18" "1" "31"]
["3" "2014-09-15" "8" "56"]
......@@ -231,8 +226,8 @@
(parse-and-sort-csv result)))))))
(deftest datetime-fields-are-untouched-when-exported
(let [result ((test-users/user->client :rasta) :post 202 "dataset/csv" :query
(json/generate-string (data/mbql-query users {:order-by [[:asc $id]], :limit 5})))]
(let [result ((mt/user->client :rasta) :post 202 "dataset/csv" :query
(json/generate-string (mt/mbql-query users {:order-by [[:asc $id]], :limit 5})))]
(is (= [["1" "Plato Yeshua" "2014-04-01T08:30:00"]
["2" "Felipinho Asklepios" "2014-12-05T15:15:00"]
["3" "Kaneonuskatew Eiran" "2014-11-06T16:15:00"]
......@@ -241,10 +236,10 @@
(parse-and-sort-csv result)))))
(deftest check-that-we-can-export-the-results-of-a-nested-query
(tt/with-temp Card [card {:dataset_query {:database (data/id)
(mt/with-temp Card [card {:dataset_query {:database (mt/id)
:type :native
:native {:query "SELECT * FROM USERS;"}}}]
(let [result ((test-users/user->client :rasta) :post 202 "dataset/csv"
(let [result ((mt/user->client :rasta) :post 202 "dataset/csv"
:query (json/generate-string
{:database mbql.s/saved-questions-virtual-database-id
:type :query
......@@ -261,11 +256,11 @@
;; from one that had it -- see #9831)
(deftest formatted-results-ignore-query-constraints
(with-redefs [constraints/default-query-constraints {:max-results 10, :max-results-bare-rows 10}]
(let [result ((test-users/user->client :rasta) :post 202 "dataset/csv"
(let [result ((mt/user->client :rasta) :post 202 "dataset/csv"
:query (json/generate-string
{:database (data/id)
{:database (mt/id)
:type :query
:query {:source-table (data/id :venues)}
:query {:source-table (mt/id :venues)}
:middleware
{:add-default-userland-constraints? true
:userland-query? true}}))]
......@@ -279,16 +274,16 @@
(deftest non--download--queries-should-still-get-the-default-constraints
(with-redefs [constraints/default-query-constraints {:max-results 10, :max-results-bare-rows 10}]
(let [{row-count :row_count, :as result}
((test-users/user->client :rasta) :post 202 "dataset"
{:database (data/id)
((mt/user->client :rasta) :post 202 "dataset"
{:database (mt/id)
:type :query
:query {:source-table (data/id :venues)}})]
:query {:source-table (mt/id :venues)}})]
(is (= 10
(or row-count result))))))
(deftest check-permissions-test
(testing "make sure `POST /dataset` calls check user permissions"
(data/with-temp-copy-of-db
(mt/with-temp-copy-of-db
;; give all-users *partial* permissions for the DB, so we know we're checking more than just read permissions for
;; the Database
(perms/revoke-permissions! (group/all-users) (mt/id))
......@@ -322,8 +317,8 @@
:filter [:= $date "2015-11-13"]})))))
(testing "\nshould require that the user have ad-hoc native perms for the DB"
(tu.log/suppress-output
(data/with-temp-copy-of-db
(mt/suppress-output
(mt/with-temp-copy-of-db
;; Give All Users permissions to see the `venues` Table, but not ad-hoc native perms
(perms/revoke-permissions! (group/all-users) (mt/id))
(perms/grant-permissions! (group/all-users) (mt/id) "PUBLIC" (mt/id :venues))
......@@ -335,9 +330,9 @@
{:fields [$id $name]}))))))))))
(deftest report-timezone-test
(datasets/test-driver :postgres
(mt/test-driver :postgres
(testing "expected (desired) and actual timezone should be returned as part of query results"
(tu/with-temporary-setting-values [report-timezone "US/Pacific"]
(mt/with-temporary-setting-values [report-timezone "US/Pacific"]
(let [results ((mt/user->client :rasta) :post 202 "dataset" (mt/mbql-query checkins
{:aggregation [[:count]]}))]
(is (= {:requested_timezone "US/Pacific"
......
......@@ -15,6 +15,7 @@
[http-client :as http]
[models :refer [Card Dashboard DashboardCard DashboardCardSeries]]
[query-processor-test :as qp.test]
[test :as mt]
[util :as u]]
[metabase.api
[embed :as embed-api]
......@@ -23,7 +24,6 @@
[metabase.test
[data :as data]
[util :as tu]]
[metabase.test.util.log :as tu.log]
[toucan.db :as db]
[toucan.util.test :as tt])
(:import java.io.ByteArrayInputStream))
......@@ -202,7 +202,7 @@
(testing (str "...but if the card has an invalid query we should just get a generic \"query failed\" "
"exception (rather than leaking query info)")
(tu.log/suppress-output
(mt/suppress-output
(with-temp-card [card {:enable_embedding true, :dataset_query {:database (data/id)
:type :native
:native {:query "SELECT * FROM XYZ"}}}]
......@@ -405,7 +405,7 @@
(deftest generic-query-failed-exception-test
(testing (str "...but if the card has an invalid query we should just get a generic \"query failed\" exception "
"(rather than leaking query info)")
(tu.log/suppress-output
(mt/suppress-output
(with-embedding-enabled-and-new-secret-key
(with-temp-dashcard [dashcard {:dash {:enable_embedding true}
:card {:dataset_query (data/native-query {:query "SELECT * FROM XYZ"})}}]
......@@ -413,7 +413,6 @@
:error "An error occurred while running the query." }
(http/client :get 202 (dashcard-url dashcard)))))))))
(deftest check-that-the-dashcard-endpoint-doesn-t-work-if-embedding-isn-t-enabled
(tu/with-temporary-setting-values [enable-embedding false]
(with-new-secret-key
......
This diff is collapsed.
......@@ -203,36 +203,36 @@
Card [card-2 {:name "The card"
:description "Info"
:display :table}]]
(merge
pulse-defaults
{:name "A Pulse"
:creator_id (user->id :rasta)
:creator (user-details (fetch-user :rasta))
:cards (for [card [card-1 card-2]]
(assoc (pulse-card-details card)
:collection_id true))
:channels [(merge pulse-channel-defaults
{:channel_type "email"
:schedule_type "daily"
:schedule_hour 12
:recipients []})]
:collection_id true})
(card-api-test/with-cards-in-readable-collection [card-1 card-2]
(tt/with-temp Collection [collection]
(perms/grant-collection-readwrite-permissions! (perms-group/all-users) collection)
(tu/with-model-cleanup [Pulse]
(-> ((user->client :rasta) :post 200 "pulse" {:name "A Pulse"
:collection_id (u/get-id collection)
:cards [{:id (u/get-id card-1)
:include_csv false
:include_xls false}
(-> card-2
(select-keys [:id :name :description :display :collection_id])
(assoc :include_csv false, :include_xls false))]
:channels [daily-email-channel]
:skip_if_empty false})
pulse-response
(update :channels remove-extra-channels-fields))))))
(merge
pulse-defaults
{:name "A Pulse"
:creator_id (user->id :rasta)
:creator (user-details (fetch-user :rasta))
:cards (for [card [card-1 card-2]]
(assoc (pulse-card-details card)
:collection_id true))
:channels [(merge pulse-channel-defaults
{:channel_type "email"
:schedule_type "daily"
:schedule_hour 12
:recipients []})]
:collection_id true})
(card-api-test/with-cards-in-readable-collection [card-1 card-2]
(tt/with-temp Collection [collection]
(perms/grant-collection-readwrite-permissions! (perms-group/all-users) collection)
(tu/with-model-cleanup [Pulse]
(-> ((user->client :rasta) :post 200 "pulse" {:name "A Pulse"
:collection_id (u/get-id collection)
:cards [{:id (u/get-id card-1)
:include_csv false
:include_xls false}
(-> card-2
(select-keys [:id :name :description :display :collection_id])
(assoc :include_csv false, :include_xls false))]
:channels [daily-email-channel]
:skip_if_empty false})
pulse-response
(update :channels remove-extra-channels-fields))))))
;; Create a pulse with a csv and xls
(tt/expect-with-temp [Card [card-1]
......
(ns metabase.async.streaming-response-test
(:require [clj-http.client :as http]
[clojure.core.async :as a]
[clojure.test :refer :all]
[metabase
[config :as config]
[driver :as driver]
[http-client :as test-client]
[models :refer [Database]]
[test :as mt]]
[metabase.async.streaming-response :as streaming-response]
[metabase.async.streaming-response.thread-pool :as thread-pool]
[metabase.query-processor.context :as context])
(:import java.util.concurrent.Executors
org.apache.commons.lang3.concurrent.BasicThreadFactory$Builder))
;; Register a stub driver keyword we can hang test implementations off of (see the `defmethod`s below).
(driver/register! ::test-driver)

;; Flipped to `true` by `::test-driver` when it observes query cancelation; individual tests reset it first.
(def ^:private canceled? (atom false))

;; Size of the fixed thread pool substituted for the streaming-response pool in these tests.
(def ^:private thread-pool-size 5)
(defn- do-with-streaming-response-thread-pool
  "Invoke `thunk` with the streaming-response thread pool temporarily replaced by a small fixed-size pool of
  daemon threads, shutting that pool down once `thunk` returns (or throws)."
  [thunk]
  (let [factory   (.build (doto (BasicThreadFactory$Builder.)
                            (.namingPattern "streaming-response-test-thread-pool-%d")
                            ;; daemon threads do not prevent JVM shutdown
                            (.daemon true)))
        test-pool (Executors/newFixedThreadPool thread-pool-size factory)]
    (try
      (with-redefs [thread-pool/thread-pool (constantly test-pool)]
        (thunk))
      (finally
        (.shutdownNow test-pool)))))
;; Macro sugar over `do-with-streaming-response-thread-pool`: runs `body` with the test thread pool in place.
(defmacro ^:private with-streaming-response-thread-pool {:style/indent 0} [& body]
  `(do-with-streaming-response-thread-pool (fn [] ~@body)))
;; Run `body` against a temporary Database whose engine is `::test-driver`, with the test streaming-response
;; thread pool installed (see `with-streaming-response-thread-pool`).
(defmacro ^:private with-test-driver-db {:style/indent 0} [& body]
  `(mt/with-temp Database [db# {:engine ::test-driver}]
     (mt/with-db db#
       (with-streaming-response-thread-pool
         ~@body))))
;; Fake query execution for `::test-driver`: sleep for the number of milliseconds given in the native query
;; (`{:query {:sleep ms}}`) on a future, then respond with a single one-column row containing that value.
;; Concurrently, a go block watches the context's canceled-chan; if the query is canceled before the sleep
;; finishes, the future is canceled and the `canceled?` atom is flipped so tests can observe the cancelation.
(defmethod driver/execute-reducible-query ::test-driver
  [_ {{{:keys [sleep]} :query} :native, database-id :database} context respond]
  {:pre [(integer? sleep) (integer? database-id)]}
  (let [futur (future
                (Thread/sleep sleep)
                (respond {:cols [{:name "Sleep", :base_type :type/Integer}]} [[sleep]]))]
    (a/go
      (when (a/<! (context/canceled-chan context))
        (reset! canceled? true)
        (future-cancel futur)))))
;; `::test-driver` requires no connection properties.
(defmethod driver/connection-properties ::test-driver
  [& _]
  [])
(deftest basic-test
  (testing "Make sure our ::test-driver is working as expected"
    (with-test-driver-db
      ;; a 10 ms "query" should come back as a single row containing the sleep duration
      (let [response ((mt/user->client :lucky)
                      :post 202 "dataset"
                      {:database (mt/id)
                       :type     "native"
                       :native   {:query {:sleep 10}}})]
        (is (= [[10]]
               (mt/rows response)))))))
(deftest truly-async-test
  (testing "StreamingResponses should truly be asynchronous, and not block Jetty threads while waiting for results"
    (with-test-driver-db
      (let [max-threads   (or (config/config-int :mb-jetty-maxthreads) 50)
            num-requests  (+ max-threads 20)
            ;; count of queries that have not yet completed
            remaining     (atom num-requests)
            session-token (test-client/authenticate (mt/user->credentials :lucky))
            url           (test-client/build-url "dataset" nil)
            request       (test-client/build-request-map session-token
                                                         {:database (mt/id)
                                                          :type     "native"
                                                          :native   {:query {:sleep 2000}}})]
        (testing (format "%d simultaneous queries" num-requests)
          (dotimes [_ num-requests]
            (future
              (http/post url request)
              ;; record each completion so the assertion below can tell how many queries have finished.
              ;; (FIX: `remaining` was previously never decremented, so the count assertion was vacuously true.)
              (swap! remaining dec)))
          (Thread/sleep 100)
          (let [start-time-ms (System/currentTimeMillis)]
            (is (= {:status "ok"} (test-client/client :get 200 "health")))
            (testing "Health endpoint should complete before the first round of queries completes"
              (is (> @remaining (inc (- num-requests thread-pool-size)))))
            (testing "Health endpoint should complete in under 100ms regardless of how many queries are running"
              (let [elapsed-ms (- (System/currentTimeMillis) start-time-ms)]
                (is (< elapsed-ms 100))))))))))
(deftest newlines-test
  (testing "Keepalive newlines should be written while waiting for a response."
    (with-redefs [streaming-response/keepalive-interval-ms 50]
      (with-test-driver-db
        ;; a 300 ms query with a 50 ms keepalive interval should yield several leading newlines before the JSON body
        (let [{:keys [body]} (http/post (test-client/build-url "dataset" nil)
                                        (test-client/build-request-map (mt/user->credentials :lucky)
                                                                       {:database (mt/id)
                                                                        :type     "native"
                                                                        :native   {:query {:sleep 300}}}))]
          (is (re= #"(?s)^\n{3,}\{\"data\":.*$"
                   body)))))))
;; End-to-end cancelation: kick off a long (5 second) query as an async HTTP request, cancel the request shortly
;; after it starts, then verify that `::test-driver` observed the cancelation -- the driver flips `canceled?` when
;; the query context's canceled-chan receives a message (see `execute-reducible-query` above).
(deftest cancelation-test
  (testing "Make sure canceling a HTTP request ultimately causes the query to be canceled"
    (with-redefs [streaming-response/keepalive-interval-ms 50]
      (with-test-driver-db
        ;; reset the flag in case another test already flipped it
        (reset! canceled? false)
        (let [futur (http/post (test-client/build-url "dataset" nil)
                               (assoc (test-client/build-request-map (mt/user->credentials :lucky)
                                                                     {:database (mt/id)
                                                                      :type     "native"
                                                                      :native   {:query {:sleep 5000}}})
                                      :async true)
                               identity
                               (fn [e] (throw e)))]
          ;; give the request a moment to reach the server before canceling it
          (Thread/sleep 100)
          (future-cancel futur)
          ;; give the cancelation a moment to propagate through to the driver
          (Thread/sleep 100)
          (is (= true
                 @canceled?)))))))
......@@ -100,7 +100,7 @@
;;; client
(defn- build-request-map [credentials http-body]
(defn build-request-map [credentials http-body]
(merge
{:accept :json
:headers {@#'mw.session/metabase-session-header
......
(ns metabase.query-processor.middleware.async-wait-test
(:require [clojure.core.async :as a]
[clojure.test :refer :all]
[metabase
[test :as mt]
[util :as u]]
[metabase.models.database :refer [Database]]
[metabase.query-processor.middleware.async-wait :as async-wait]
[metabase.test.util.async :as tu.async]
[toucan.util.test :as tt])
(:import java.util.concurrent.Executors
org.apache.commons.lang3.concurrent.BasicThreadFactory$Builder))
(def ^:private ^:dynamic *dynamic-var* false)
(defn- async-wait-bound-value
  "Check the bound value of `*dynamic-var*` when a function is executed after the `async-wait` using the thread pool for
  Database with `db-id`. Runs the `wait-for-turn` middleware with a `:run` fn that captures the var's value on the
  pool thread; returns that captured value, waiting at most 1 second for it to be delivered."
  [db-id]
  (let [bound-value (promise)]
    (tu.async/with-open-channels [canceled-chan (a/promise-chan)]
      (mt/test-qp-middleware
       async-wait/wait-for-turn
       {:database db-id}
       {}
       []
       {:chans {:canceled-chan canceled-chan}
        :run   (fn []
                 ;; capture whatever `*dynamic-var*` is bound to on the thread-pool thread
                 (deliver bound-value *dynamic-var*))}))
    (u/deref-with-timeout bound-value 1000)))
(deftest sanity-check-test
  (testing "basic sanity check: bound value of `*dynamic-var*` should be `false`"
    (tt/with-temp Database [{db-id :id}]
      ;; FIX: the `=` comparison was previously not wrapped in `is`, so its result was discarded and this
      ;; test could never fail
      (is (= false
             (async-wait-bound-value db-id))))))
(deftest bindings-test
  (testing "bound dynamic vars should get re-bound by in the async wait fn"
    (tt/with-temp Database [{db-id :id}]
      ;; a binding established on the calling thread should be visible on the pool thread
      (binding [*dynamic-var* ::wow]
        (is (= ::wow
               (async-wait-bound-value db-id))))))
  (testing "binding should not be persisted between executions -- should be reset when we reuse a thread"
    ;; use a single-thread pool so both executions below are guaranteed to run on the same thread
    (let [thread-pool (Executors/newSingleThreadExecutor
                       (.build
                        (doto (BasicThreadFactory$Builder.)
                          (.daemon true))))]
      (with-redefs [async-wait/db-thread-pool (constantly thread-pool)]
        (tt/with-temp Database [{db-id :id}]
          ;; first execution: run with a binding in place (return value is irrelevant here)
          (binding [*dynamic-var* true]
            (async-wait-bound-value db-id))
          ;; second execution on the same thread: the var must be back to its root value
          (is (= false
                 (async-wait-bound-value db-id))))))))
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment