diff --git a/.dir-locals.el b/.dir-locals.el index e737fa70faacb773e4359e53f360b915f37d6ec5..95050db9ef8d5593991c7844aebc97b9b80d04a7 100644 --- a/.dir-locals.el +++ b/.dir-locals.el @@ -2,7 +2,9 @@ ;; Specify which arg is the docstring for certain macros ;; (Add more as needed) (put 'defendpoint 'clojure-doc-string-elt 3) + (put 'defendpoint-async 'clojure-doc-string-elt 3) (put 'api/defendpoint 'clojure-doc-string-elt 3) + (put 'api/defendpoint-async 'clojure-doc-string-elt 3) (put 'defsetting 'clojure-doc-string-elt 2) (put 'setting/defsetting 'clojure-doc-string-elt 2) (put 's/defn 'clojure-doc-string-elt 2) @@ -13,7 +15,6 @@ (assert 1) (assoc 1) (ex-info 1) - (execute-sql! 2) (expect 0) (match 1) (merge-with 1) diff --git a/resources/log4j.properties b/resources/log4j.properties index 40bc26ba7cc3f1d45df31042cd7a84a8f7b004e8..d0b2c7b0c8738195a33fea8ac0406ca5ab656b30 100644 --- a/resources/log4j.properties +++ b/resources/log4j.properties @@ -25,6 +25,12 @@ log4j.logger.metabase.query-processor.permissions=INFO log4j.logger.metabase.query-processor=INFO log4j.logger.metabase.sync=DEBUG log4j.logger.metabase.models.field-values=INFO +# NOCOMMIT +log4j.logger.metabase.async.semaphore-channel=DEBUG +log4j.logger.metabase.async.util=DEBUG +log4j.logger.metabase.middleware.async=DEBUG +log4j.logger.metabase.query-processor.async=DEBUG +log4j.logger.metabase.async.api-response=DEBUG log4j.logger.metabase=INFO # c3p0 connection pools tend to log useless warnings way too often; only log actual errors log4j.logger.com.mchange=ERROR diff --git a/src/metabase/api/card.clj b/src/metabase/api/card.clj index 157fb19d037ed0b33f7584ad4f92c326f16b0c70..9a6466d00e90317460a3213483ffed46b8bb57e5 100644 --- a/src/metabase/api/card.clj +++ b/src/metabase/api/card.clj @@ -1,6 +1,7 @@ (ns metabase.api.card "/api/card endpoints." (:require [cheshire.core :as json] + [clojure.core.async :as a] [clojure.tools.logging :as log] [compojure.core :refer [DELETE GET POST PUT]] [medley.core :as m] @@ -13,6 +14,7 @@ [metabase.api [common :as api] [dataset :as dataset-api]] + [metabase.async.util :as async.u] [metabase.email.messages :as messages] [metabase.models [card :as card :refer [Card]] @@ -26,7 +28,7 @@ [view-log :refer [ViewLog]]] [metabase.models.query.permissions :as query-perms] [metabase.query-processor - [interface :as qpi] + [async :as qp.async] [util :as qputil]] [metabase.query-processor.middleware [cache :as cache] @@ -39,7 +41,8 @@ [toucan [db :as db] [hydrate :refer [hydrate]]]) - (:import java.util.UUID + (:import clojure.core.async.impl.channels.ManyToManyChannel + java.util.UUID metabase.models.card.CardInstance)) ;;; --------------------------------------------------- Hydration ---------------------------------------------------- @@ -181,31 +184,23 @@ ;; we'll also pass a simple checksum and have the frontend pass it back to us. See the QP `results-metadata` ;; middleware namespace for more details -(s/defn ^:private result-metadata-for-query :- qr/ResultsMetadata - "Fetch the results metadata for a QUERY by running the query and seeing what the QP gives us in return. - This is obviously a bit wasteful so hopefully we can avoid having to do this." - [query] - (binding [qpi/*disable-qp-logging* true] - (let [{:keys [status], :as results} (qp/process-query-without-save! api/*current-user-id* query)] - (if (= status :failed) - (log/error (trs "Error running query to determine Card result metadata:") - (u/pprint-to-str 'red results)) - (get-in results [:data :results_metadata :columns]))))) - -(s/defn ^:private result-metadata :- (s/maybe qr/ResultsMetadata) - "Get the right results metadata for this CARD. We'll check to see whether the METADATA passed in seems valid; - otherwise we'll run the query ourselves to get the right values." + + +(s/defn ^:private result-metadata-async :- ManyToManyChannel + "Get the right results metadata for this `card`, and return them in a channel. We'll check to see whether the + `metadata` passed in seems valid,and, if so, return a channel that returns the value as-is; otherwise, we'll run the + query ourselves to get the right values, and return a channel where you can listen for the results." [query metadata checksum] (let [valid-metadata? (and (results-metadata/valid-checksum? metadata checksum) (s/validate qr/ResultsMetadata metadata))] - (log/info (str "Card results metadata passed in to API is " - (cond - valid-metadata? "VALID. Thanks!" - metadata "INVALID. Running query to fetch correct metadata." - :else "MISSING. Running query to fetch correct metadata."))) + (log/info + (cond + valid-metadata? (trs "Card results metadata passed in to API is VALID. Thanks!") + metadata (trs "Card results metadata passed in to API is INVALID. Running query to fetch correct metadata.") + :else (trs "Card results metadata passed in to API is ISSING. Running query to fetch correct metadata."))) (if valid-metadata? - metadata - (result-metadata-for-query query)))) + (a/to-chan [metadata]) + (qp.async/result-metadata-for-query-async query)))) (defn check-data-permissions-for-query "Check that we have *data* permissions to run the QUERY in question." @@ -213,6 +208,48 @@ {:pre [(map? query)]} (api/check-403 (query-perms/can-run-query? query))) +(defn- save-new-card-async! + "Save `card-data` as a new Card on a separate thread. Returns a channel to fetch the response; closing this channel + will cancel the save." + [card-data] + (async.u/do-on-separate-thread + (fn [] + (let [card (db/transaction + ;; Adding a new card at `collection_position` could cause other cards in this + ;; collection to change position, check that and fix it if needed + (api/maybe-reconcile-collection-position! card-data) + (db/insert! Card card-data))] + (events/publish-event! :card-create card) + ;; include same information returned by GET /api/card/:id since frontend replaces the Card it + ;; currently has with returned one -- See #4283 + (hydrate card :creator :dashboard_count :can_write :collection))))) + +(defn- create-card-async! + "Create a new Card asynchronously. Returns a channel for fetching the newly created Card, or an Exception if one was + thrown. Closing this channel before it finishes will cancel the Card creation." + [{:keys [dataset_query result_metadata metadata_checksum], :as card-data}] + ;; `zipmap` instead of `select-keys` because we want to get `nil` values for keys that aren't present. Required by + ;; `api/maybe-reconcile-collection-position!` + (let [data-keys [:dataset_query :description :display :name + :visualization_settings :collection_id :collection_position] + card-data (assoc (zipmap data-keys (map card-data data-keys)) + :creator_id api/*current-user-id*) + result-metadata-chan (result-metadata-async dataset_query result_metadata metadata_checksum) + out-chan (a/chan 1)] + (a/go + (try + (let [card-data (assoc card-data :result_metadata (a/<! result-metadata-chan))] + (a/close! result-metadata-chan) + ;; now do the actual saving on a separate thread so we don't tie up our precious core.async thread. Pipe the + ;; result into `out-chan`. + (async.u/single-value-pipe (save-new-card-async! card-data) out-chan)) + (catch Throwable e + (a/put! out-chan e) + (a/close! e)))) + ;; Return a channel + out-chan)) + + (api/defendpoint POST "/" "Create a new `Card`." [:as {{:keys [collection_id collection_position dataset_query description display metadata_checksum name @@ -229,26 +266,8 @@ (check-data-permissions-for-query dataset_query) ;; check that we have permissions for the collection we're trying to save this card to, if applicable (collection/check-write-perms-for-collection collection_id) - ;; everything is g2g, now save the card - (let [card-data {:creator_id api/*current-user-id* - :dataset_query dataset_query - :description description - :display display - :name name - :visualization_settings visualization_settings - :collection_id collection_id - :collection_position collection_position - :result_metadata (result-metadata dataset_query result_metadata metadata_checksum)} - - card (db/transaction - ;; Adding a new card at `collection_position` could cause other cards in this - ;; collection to change position, check that and fix it if needed - (api/maybe-reconcile-collection-position! card-data) - (db/insert! Card card-data))] - (events/publish-event! :card-create card) - ;; include same information returned by GET /api/card/:id since frontend replaces the Card it currently has - ;; with returned one -- See #4283 - (hydrate card :creator :dashboard_count :can_write :collection))) + ;; Return a channel that can be used to fetch the results asynchronously + (create-card-async! body)) ;;; ------------------------------------------------- Updating Cards ------------------------------------------------- @@ -276,14 +295,19 @@ (api/check-superuser))) -(defn- result-metadata-for-updating - "If CARD's query is being updated, return the value that should be saved for `result_metadata`. This *should* be - passed in to the API; if so, verifiy that it was correct (the checksum is valid); if not, go fetch it. If the query - has not changed, this returns `nil`, which means the value won't get updated below." +(defn- result-metadata-for-updating-async + "If `card`'s query is being updated, return the value that should be saved for `result_metadata`. This *should* be + passed in to the API; if so, verifiy that it was correct (the checksum is valid); if not, go fetch it. If the query + has not changed, this returns a closed channel (so you will get `nil` when you attempt to fetch the result, and + will know not to update the value in the DB.) + + Either way, results are returned asynchronously on a channel." [card query metadata checksum] - (when (and query + (if (and query (not= query (:dataset_query card))) - (result-metadata query metadata checksum))) + (result-metadata-async query metadata checksum) + (u/prog1 (a/chan) + (a/close! <>)))) (defn- publish-card-update! "Publish an event appropriate for the update(s) done to this CARD (`:card-update`, or archiving/unarchiving @@ -379,11 +403,35 @@ :else nil))) +(defn- update-card-async! [{:keys [id], :as card-before-update} {:keys [archived], :as card-updates}] + ;; don't block our precious core.async thread, run the actual DB updates on a separate thread + (async.u/do-on-separate-thread + (fn [] + ;; Setting up a transaction here so that we don't get a partially reconciled/updated card. + (db/transaction + (api/maybe-reconcile-collection-position! card-before-update card-updates) + + ;; ok, now save the Card + (db/update! Card id + ;; `collection_id` and `description` can be `nil` (in order to unset them). Other values should only be + ;; modified if they're passed in as non-nil + (u/select-keys-when card-updates + :present #{:collection_id :collection_position :description} + :non-nil #{:dataset_query :display :name :visualization_settings :archived :enable_embedding + :embedding_params :result_metadata}))) + ;; Fetch the updated Card from the DB + (let [card (Card id)] + (delete-alerts-if-needed! card-before-update card) + (publish-card-update! card archived) + ;; include same information returned by GET /api/card/:id since frontend replaces the Card it currently + ;; has with returned one -- See #4142 + (hydrate card :creator :dashboard_count :can_write :collection))))) + (api/defendpoint PUT "/:id" "Update a `Card`." [id :as {{:keys [dataset_query description display name visualization_settings archived collection_id collection_position enable_embedding embedding_params result_metadata metadata_checksum] - :as card-updates} :body}] + :as card-updates} :body}] {name (s/maybe su/NonBlankString) dataset_query (s/maybe su/Map) display (s/maybe su/NonBlankString) @@ -403,29 +451,23 @@ (check-allowed-to-unarchive card-before-update card-updates) (check-allowed-to-change-embedding card-before-update card-updates) ;; make sure we have the correct `result_metadata` - (let [card-updates (assoc card-updates - :result_metadata (result-metadata-for-updating card-before-update dataset_query - result_metadata metadata_checksum))] - - ;; Setting up a transaction here so that we don't get a partially reconciled/updated card. - (db/transaction - (api/maybe-reconcile-collection-position! card-before-update card-updates) - - ;; ok, now save the Card - (db/update! Card id - ;; `collection_id` and `description` can be `nil` (in order to unset them). Other values should only be - ;; modified if they're passed in as non-nil - (u/select-keys-when card-updates - :present #{:collection_id :collection_position :description} - :non-nil #{:dataset_query :display :name :visualization_settings :archived :enable_embedding - :embedding_params :result_metadata})))) - ;; Fetch the updated Card from the DB - (let [card (Card id)] - (delete-alerts-if-needed! card-before-update card) - (publish-card-update! card archived) - ;; include same information returned by GET /api/card/:id since frontend replaces the Card it currently has with - ;; returned one -- See #4142 - (hydrate card :creator :dashboard_count :can_write :collection)))) + (let [result-metadata-chan (result-metadata-for-updating-async + card-before-update + dataset_query + result_metadata + metadata_checksum) + out-chan (a/chan 1)] + ;; asynchronously wait for our updated result metadata, then after that call `update-card-async!`, which is done + ;; on a non-core.async thread. Pipe the results of that into `out-chan`. + (a/go + (try + (let [card-updates (assoc card-updates :result_metadata (a/<! result-metadata-chan))] + (async.u/single-value-pipe + (update-card-async! card-before-update card-updates) + out-chan)) + (finally + (a/close! result-metadata-chan)))) + out-chan))) ;;; ------------------------------------------------- Deleting Cards ------------------------------------------------- @@ -563,8 +605,9 @@ (query-magic-ttl query)))] (assoc query :cache-ttl ttl))) -(defn run-query-for-card - "Run the query for Card with PARAMETERS and CONSTRAINTS, and return results in the usual format." +(defn run-query-for-card-async + "Run the query for Card with `parameters` and `constraints`, and return results in a core.async channel. Will throw an + Exception if preconditions (such as read perms) are not met before returning a channel." {:style/indent 1} [card-id & {:keys [parameters constraints context dashboard-id middleware] :or {constraints qp/default-query-constraints @@ -577,24 +620,24 @@ :card-id card-id :dashboard-id dashboard-id}] (api/check-not-archived card) - (qp/process-query-and-save-execution! query options))) + (qp.async/process-query-and-save-execution! query options))) (api/defendpoint POST "/:card-id/query" "Run the query associated with a Card." [card-id :as {{:keys [parameters ignore_cache], :or {ignore_cache false}} :body}] {ignore_cache (s/maybe s/Bool)} (binding [cache/*ignore-cached-results* ignore_cache] - (run-query-for-card card-id, :parameters parameters))) + (run-query-for-card-async card-id, :parameters parameters))) -(api/defendpoint POST "/:card-id/query/:export-format" +(api/defendpoint-async POST "/:card-id/query/:export-format" "Run the query associated with a Card, and return its results as a file in the specified format. Note that this expects the parameters as serialized JSON in the 'parameters' parameter" - [card-id export-format parameters] + [{{:keys [card-id export-format parameters]} :params} respond raise] {parameters (s/maybe su/JSONString) export-format dataset-api/ExportFormat} (binding [cache/*ignore-cached-results* true] - (dataset-api/as-format export-format - (run-query-for-card card-id + (dataset-api/as-format-async export-format respond raise + (run-query-for-card-async (Integer/parseUnsignedInt card-id) :parameters (json/parse-string parameters keyword) :constraints nil :context (dataset-api/export-format->context export-format) diff --git a/src/metabase/api/common.clj b/src/metabase/api/common.clj index aba0f74a231756506390493481dba114b293e70c..24d5a4666f0fe2240dabb16eb1bc281cbd7ef850 100644 --- a/src/metabase/api/common.clj +++ b/src/metabase/api/common.clj @@ -1,10 +1,6 @@ (ns metabase.api.common "Dynamic variables and utility functions/macros for writing API functions." - (:require [cheshire.core :as json] - [clojure.core.async :as async] - [clojure.core.async.impl.protocols :as async-proto] - [clojure.java.io :as io] - [clojure.string :as str] + (:require [clojure.string :as str] [clojure.tools.logging :as log] [compojure.core :as compojure] [honeysql.types :as htypes] @@ -17,11 +13,8 @@ [metabase.util [i18n :as ui18n :refer [trs tru]] [schema :as su]] - [ring.core.protocols :as protocols] - [ring.util.response :as response] [schema.core :as s] - [toucan.db :as db]) - (:import java.io.OutputStream)) + [toucan.db :as db])) (declare check-403 check-404) @@ -296,6 +289,27 @@ ~@validate-param-calls (wrap-response-if-needed (do ~@body))))))) +(defmacro defendpoint-async + "Like `defendpoint`, but generates an endpoint that accepts the usual `[request respond raise]` params." + {:arglists '([method route docstr? args schemas-map? & body])} + [method route & more] + (let [fn-name (route-fn-name method route) + route (typify-route route) + [docstr [args & more]] (u/optional string? more) + [arg->schema body] (u/optional (every-pred map? #(every? symbol? (keys %))) more) + validate-param-calls (validate-params arg->schema)] + (when-not docstr + (log/warn (trs "Warning: endpoint {0}/{1} does not have a docstring." (ns-name *ns*) fn-name))) + `(def ~(vary-meta fn-name assoc + ;; eval the vals in arg->schema to make sure the actual schemas are resolved so we can document + ;; their API error messages + :doc (route-dox method route docstr args (m/map-vals eval arg->schema) body) + :is-endpoint? true) + (~method ~route [] + (fn ~args + ~@validate-param-calls + ~@body))))) + (defn- namespace->api-route-fns "Return a sequence of all API endpoint functions defined by `defendpoint` in a namespace." [nmspace] @@ -378,115 +392,6 @@ [entity m] (check-403 (mi/can-create? entity m))) -;;; --------------------------------------------------- STREAMING ---------------------------------------------------- - -(def ^:private ^:const streaming-response-keep-alive-interval-ms - "Interval between sending newline characters to keep Heroku from terminating requests like queries that take a long - time to complete." - (* 1 1000)) - -;; Handle ring response maps that contain a core.async chan in the :body key: -;; -;; {:status 200 -;; :body (async/chan)} -;; -;; and send strings (presumibly \n) as heartbeats to the client until the real results (a seq) is received, then -;; stream that to the client -(extend-protocol protocols/StreamableResponseBody - clojure.core.async.impl.channels.ManyToManyChannel - (write-body-to-stream [output-queue _ ^OutputStream output-stream] - (log/debug (u/format-color 'green (trs "starting streaming request"))) - (with-open [out (io/writer output-stream)] - (loop [chunk (async/<!! output-queue)] - (cond - (char? chunk) - (do - (try - (.write out (str chunk)) - (.flush out) - (catch org.eclipse.jetty.io.EofException e - (log/info e (u/format-color 'yellow (trs "connection closed, canceling request"))) - (async/close! output-queue) - (throw e))) - (recur (async/<!! output-queue))) - - ;; An error has occurred, let the user know - (instance? Exception chunk) - (json/generate-stream {:error (.getMessage ^Exception chunk)} out) - - ;; We've recevied the response, write it to the output stream and we're done - (seq chunk) - (json/generate-stream chunk out) - - ;;chunk is nil meaning the output channel has been closed - :else - out))))) - -(def ^:private InvokeWithKeepAliveSchema - {;; Channel that contains any number of newlines followed by the results of the invoked query thunk - :output-channel (s/protocol async-proto/Channel) - ;; This channel will have an exception if that error condition is hit before the first heartbeat time, if a - ;; heartbeat has been sent, this channel is closed and its no longer useful - :error-channel (s/protocol async-proto/Channel) - ;; Future that is invoking the query thunk. This is mainly useful for testing metadata to see if the future has been - ;; cancelled or was completed successfully - :response-future java.util.concurrent.Future}) - -(s/defn ^:private invoke-thunk-with-keepalive :- InvokeWithKeepAliveSchema - "This function does the heavy lifting of invoking `query-thunk` on a background thread and returning it's results - along with a heartbeat while waiting for the results. This function returns a map that includes the relevate - execution information, see `InvokeWithKeepAliveSchema` for more information" - [query-thunk] - (let [response-chan (async/chan 1) - output-chan (async/chan 1) - error-chan (async/chan 1) - response-fut (future - (try - (async/>!! response-chan (query-thunk)) - (catch Exception e - (async/>!! error-chan e) - (async/>!! response-chan e)) - (finally - (async/close! error-chan))))] - (async/go-loop [] - (let [[response-or-timeout c] (async/alts! [response-chan (async/timeout streaming-response-keep-alive-interval-ms)])] - (if response-or-timeout - ;; We have a response since it's non-nil, write the results and close, we're done - (do - ;; If output-chan is closed, it's already too late, nothing else we need to do - (async/>! output-chan response-or-timeout) - (async/close! output-chan)) - (do - ;; We don't have a result yet, but enough time has passed, let's assume it's not an error - (async/close! error-chan) - ;; a newline padding character as it's harmless and will allow us to check if the client is connected. If - ;; sending this character fails because the connection is closed, the chan will then close. Newlines are - ;; no-ops when reading JSON which this depends upon. - (log/debug (u/format-color 'blue (trs "Response not ready, writing one byte & sleeping..."))) - (if (async/>! output-chan \newline) - ;; Success put the channel, wait and see if we get the response next time - (recur) - ;; The channel is closed, client has given up, we should give up too - (future-cancel response-fut)))))) - {:output-channel output-chan - :error-channel error-chan - :response-future response-fut})) - -(defn cancelable-json-response - "Invokes `cancelable-thunk` in a future. If there's an immediate exception, throw it. If there's not an immediate - exception, return a ring response with a channel. The channel will potentially include newline characters before the - full response is delivered as a keepalive to the client. Eventually the results of `cancelable-thunk` will be put - to the channel" - [cancelable-thunk] - (let [{:keys [output-channel error-channel]} (invoke-thunk-with-keepalive cancelable-thunk)] - ;; If there's an immediate exception, it will be in `error-chan`, if not, `error-chan` will close and we'll assume - ;; the response is a success - (if-let [ex (async/<!! error-channel)] - (throw ex) - (assoc (response/response output-channel) - :content-type "applicaton/json")))) - - ;;; ------------------------------------------------ OTHER HELPER FNS ------------------------------------------------ (defn check-public-sharing-enabled @@ -598,3 +503,19 @@ (do (reconcile-position-for-collection! old-collection-id old-position nil) (reconcile-position-for-collection! new-collection-id nil new-position)))))) + +(defmacro catch-and-raise + "Catches exceptions thrown in `body` and passes them along to the `raise` function. Meant for writing async + endpoints. + + You only need to `raise` Exceptions that happen outside the initial thread of the API endpoint function; things like + normal permissions checks are usually done within the same thread that called the endpoint, meaning the middleware + that catches Exceptions will automatically handle them." + {:style/indent 1} + ;; using 2+ args so we can catch cases where people forget to pass in `raise` + [raise body & more] + `(try + ~body + ~@more + (catch Throwable e# + (~raise e#)))) diff --git a/src/metabase/api/dataset.clj b/src/metabase/api/dataset.clj index 669d1f566685f394516c52e6a29c94900961b131..28024af1e50772a08a9c0a8599544788bab5092c 100644 --- a/src/metabase/api/dataset.clj +++ b/src/metabase/api/dataset.clj @@ -1,6 +1,7 @@ (ns metabase.api.dataset "/api/dataset endpoints." (:require [cheshire.core :as json] + [clojure.core.async :as a] [clojure.string :as str] [clojure.tools.logging :as log] [compojure.core :refer [POST]] @@ -10,13 +11,17 @@ [database :as database :refer [Database]] [query :as query]] [metabase.query-processor :as qp] - [metabase.query-processor.util :as qputil] + [metabase.query-processor + [async :as qp.async] + [util :as qputil]] [metabase.util [date :as du] [export :as ex] [i18n :refer [trs tru]] [schema :as su]] - [schema.core :as s])) + [schema.core :as s]) + (:import clojure.core.async.impl.channels.ManyToManyChannel)) + ;;; -------------------------------------------- Running a Query Normally -------------------------------------------- @@ -39,13 +44,10 @@ (when-not (= database database/virtual-id) (api/read-check Database database)) ;; add sensible constraints for results limits on our query - (let [source-card-id (query->source-card-id query)] - (api/cancelable-json-response - (fn [] - (qp/process-query-and-save-with-max! - query - {:executed-by api/*current-user-id*, :context :ad-hoc, - :card-id source-card-id, :nested? (boolean source-card-id)}))))) + (let [source-card-id (query->source-card-id query) + options {:executed-by api/*current-user-id*, :context :ad-hoc, + :card-id source-card-id, :nested? (boolean source-card-id)}] + (qp.async/process-query-and-save-with-max! query options))) ;;; ----------------------------------- Downloading Query Results in Other Formats ----------------------------------- @@ -95,8 +97,8 @@ (map (comp (swap-date-columns date-indexes) vec) rows) rows))) -(defn as-format - "Return a response containing the RESULTS of a query in the specified format." +(defn- as-format-response + "Return a response containing the `results` of a query in the specified format." {:style/indent 1, :arglists '([export-format results])} [export-format {{:keys [columns rows cols]} :data, :keys [status], :as response}] (api/let-404 [export-conf (ex/export-formats export-format)] @@ -110,6 +112,23 @@ {:status 500 :body (:error response)}))) +(s/defn as-format-async + "Write the results of an async query to API `respond` or `raise` functions in `export-format`. `in-chan` should be a + core.async channel that can be used to fetch the results of the query." + {:style/indent 3} + [export-format :- ExportFormat, respond :- (s/pred fn?), raise :- (s/pred fn?), in-chan :- ManyToManyChannel] + (a/go + (try + (let [results (a/<! in-chan)] + (if (instance? Throwable results) + (raise results) + (respond (as-format-response export-format results)))) + (catch Throwable e + (raise e)) + (finally + (a/close! in-chan)))) + nil) + (def export-format-regex "Regex for matching valid export formats (e.g., `json`) for queries. Inteneded for use in an endpoint definition: @@ -117,19 +136,20 @@ (api/defendpoint POST [\"/:export-format\", :export-format export-format-regex]" (re-pattern (str "(" (str/join "|" (keys ex/export-formats)) ")"))) -(api/defendpoint POST ["/:export-format", :export-format export-format-regex] +(api/defendpoint-async POST ["/:export-format", :export-format export-format-regex] "Execute a query and download the result data as a file in the specified format." - [export-format query] + [{{:keys [export-format query]} :params} respond raise] {query su/JSONString export-format ExportFormat} (let [{:keys [database] :as query} (json/parse-string query keyword)] (when-not (= database database/virtual-id) (api/read-check Database database)) - (as-format export-format - (qp/process-query-and-save-execution! (-> query - (dissoc :constraints) - (assoc-in [:middleware :skip-results-metadata?] true)) - {:executed-by api/*current-user-id*, :context (export-format->context export-format)})))) + (as-format-async export-format respond raise + (qp.async/process-query-and-save-execution! + (-> query + (dissoc :constraints) + (assoc-in [:middleware :skip-results-metadata?] true)) + {:executed-by api/*current-user-id*, :context (export-format->context export-format)})))) ;;; ------------------------------------------------ Other Endpoints ------------------------------------------------- @@ -141,8 +161,11 @@ (api/read-check Database database) ;; try calculating the average for the query as it was given to us, otherwise with the default constraints if ;; there's no data there. If we still can't find relevant info, just default to 0 - {:average (or (query/average-execution-time-ms (qputil/query-hash query)) - (query/average-execution-time-ms (qputil/query-hash (assoc query :constraints qp/default-query-constraints))) - 0)}) + {:average (or + (some (comp query/average-execution-time-ms qputil/query-hash) + [query + (assoc query :constraints qp/default-query-constraints)]) + 0)}) + (api/define-routes) diff --git a/src/metabase/api/embed.clj b/src/metabase/api/embed.clj index 7ad440fa3bf912b910f4ffebd35430188b5fb549..467d0bd5531addd74f09ed3beab74cedd55fcd89 100644 --- a/src/metabase/api/embed.clj +++ b/src/metabase/api/embed.clj @@ -208,15 +208,15 @@ (remove-locked-and-disabled-params (or embedding-params (db/select-one-field :embedding_params Card :id card-id)))))) -(defn run-query-for-card-with-params - "Run the query associated with Card with CARD-ID using JWT TOKEN-PARAMS, user-supplied URL QUERY-PARAMS, - an EMBEDDING-PARAMS whitelist, and additional query OPTIONS." +(defn run-query-for-card-with-params-async + "Run the query associated with Card with `card-id` using JWT `token-params`, user-supplied URL `query-params`, + an `embedding-params` whitelist, and additional query `options`. Returns channel for fetching the results." {:style/indent 0} [& {:keys [card-id embedding-params token-params query-params options]}] {:pre [(integer? card-id) (u/maybe? map? embedding-params) (map? token-params) (map? query-params)]} (let [parameter-values (validate-and-merge-params embedding-params token-params (normalize-query-params query-params)) parameters (apply-parameter-values (resolve-card-parameters card-id) parameter-values)] - (apply public-api/run-query-for-card-with-id card-id parameters, :context :embedded-question, options))) + (apply public-api/run-query-for-card-with-id-async card-id parameters, :context :embedded-question, options))) ;;; -------------------------- Dashboard Fns used by both /api/embed and /api/preview_embed -------------------------- @@ -233,7 +233,7 @@ (remove-locked-and-disabled-params (or embedding-params (db/select-one-field :embedding_params Dashboard, :id dashboard-id)))))) -(defn dashcard-results +(defn dashcard-results-async "Return results for running the query belonging to a DashboardCard." {:style/indent 0} [& {:keys [dashboard-id dashcard-id card-id embedding-params token-params query-params]}] @@ -242,7 +242,7 @@ (let [parameter-values (validate-and-merge-params embedding-params token-params (normalize-query-params query-params)) parameters (apply-parameter-values (resolve-dashboard-parameters dashboard-id dashcard-id card-id) parameter-values)] - (public-api/public-dashcard-results dashboard-id card-id parameters, :context :embedded-dashboard))) + (public-api/public-dashcard-results-async dashboard-id card-id parameters, :context :embedded-dashboard))) ;;; ------------------------------------- Other /api/embed-specific utility fns -------------------------------------- @@ -279,13 +279,13 @@ (card-for-unsigned-token unsigned, :constraints {:enable_embedding true}))) -(defn- run-query-for-unsigned-token +(defn- run-query-for-unsigned-token-async "Run the query belonging to Card identified by `unsigned-token`. Checks that embedding is enabled both globally and - for this Card." + for this Card. Returns core.async channel to fetch the results." [unsigned-token query-params & options] (let [card-id (eu/get-in-unsigned-token-or-throw unsigned-token [:resource :question])] (check-embedding-enabled-for-card card-id) - (run-query-for-card-with-params + (run-query-for-card-with-params-async :card-id card-id :token-params (eu/get-in-unsigned-token-or-throw unsigned-token [:params]) :embedding-params (db/select-one-field :embedding_params Card :id card-id) @@ -301,15 +301,15 @@ {:resource {:question <card-id>} :params <parameters>}" [token & query-params] - (run-query-for-unsigned-token (eu/unsign token) query-params)) + (run-query-for-unsigned-token-async (eu/unsign token) query-params)) -(api/defendpoint GET ["/card/:token/query/:export-format", :export-format dataset-api/export-format-regex] +(api/defendpoint-async GET ["/card/:token/query/:export-format", :export-format dataset-api/export-format-regex] "Like `GET /api/embed/card/query`, but returns the results as a file in the specified format." - [token export-format & query-params] + [{{:keys [token export-format]} :params, :keys [query-params]} respond raise] {export-format dataset-api/ExportFormat} - (dataset-api/as-format export-format - (run-query-for-unsigned-token (eu/unsign token) query-params, :constraints nil))) + (dataset-api/as-format-async export-format respond raise + (run-query-for-unsigned-token-async (eu/unsign token) (m/map-keys keyword query-params), :constraints nil))) ;;; ----------------------------------------- /api/embed/dashboard endpoints ----------------------------------------- @@ -328,7 +328,7 @@ -(defn- card-for-signed-token +(defn- card-for-signed-token-async "Fetch the results of running a Card belonging to a Dashboard using a JSON Web Token signed with the `embedding-secret-key`. @@ -343,7 +343,7 @@ (let [unsigned-token (eu/unsign token) dashboard-id (eu/get-in-unsigned-token-or-throw unsigned-token [:resource :dashboard])] (check-embedding-enabled-for-dashboard dashboard-id) - (dashcard-results + (dashcard-results-async :dashboard-id dashboard-id :dashcard-id dashcard-id :card-id card-id @@ -355,7 +355,7 @@ "Fetch the results of running a Card belonging to a Dashboard using a JSON Web Token signed with the `embedding-secret-key`" [token dashcard-id card-id & query-params] - (card-for-signed-token token dashcard-id card-id query-params )) + (card-for-signed-token-async token dashcard-id card-id query-params )) ;;; +----------------------------------------------------------------------------------------------------------------+ @@ -428,12 +428,17 @@ (public-api/dashboard-field-remapped-values dashboard-id field-id remapped-id value))) -(api/defendpoint GET ["/dashboard/:token/dashcard/:dashcard-id/card/:card-id/:export-format", - :export-format dataset-api/export-format-regex] +(api/defendpoint-async GET ["/dashboard/:token/dashcard/:dashcard-id/card/:card-id/:export-format" + :export-format dataset-api/export-format-regex] "Fetch the results of running a Card belonging to a Dashboard using a JSON Web Token signed with the `embedding-secret-key` return the data in one of the export formats" - [token export-format dashcard-id card-id & query-params] + [{{:keys [token export-format dashcard-id card-id]} :params, :keys [query-params]} respond raise] {export-format dataset-api/ExportFormat} - (dataset-api/as-format export-format (card-for-signed-token token dashcard-id card-id query-params ))) + (dataset-api/as-format-async export-format respond raise + (card-for-signed-token-async token + (Integer/parseUnsignedInt dashcard-id) + (Integer/parseUnsignedInt card-id) + (m/map-keys keyword query-params)))) + (api/define-routes) diff --git a/src/metabase/api/preview_embed.clj b/src/metabase/api/preview_embed.clj index 3d3ca312f596812bc9eff75bf6a9b0b8d46064d0..5aba5a037b8b549fe9a08d96c4c34cfa6c666df4 100644 --- a/src/metabase/api/preview_embed.clj +++ b/src/metabase/api/preview_embed.clj @@ -30,7 +30,7 @@ [token & query-params] (let [unsigned-token (check-and-unsign token) card-id (eu/get-in-unsigned-token-or-throw unsigned-token [:resource :question])] - (embed-api/run-query-for-card-with-params + (embed-api/run-query-for-card-with-params-async :card-id card-id :token-params (eu/get-in-unsigned-token-or-throw unsigned-token [:params]) :embedding-params (eu/get-in-unsigned-token-or-throw unsigned-token [:_embedding_params]) @@ -47,7 +47,7 @@ "Fetch the results of running a Card belonging to a Dashboard you're considering embedding with JWT TOKEN." [token dashcard-id card-id & query-params] (let [unsigned-token (check-and-unsign token)] - (embed-api/dashcard-results + (embed-api/dashcard-results-async :dashboard-id (eu/get-in-unsigned-token-or-throw unsigned-token [:resource :dashboard]) :dashcard-id dashcard-id :card-id card-id diff --git a/src/metabase/api/public.clj b/src/metabase/api/public.clj index d2a13332fced20132cb47601e0e1ad1cad7bb93e..45a178913671b4b3a2f62f49775dae82366395eb 100644 --- a/src/metabase/api/public.clj +++ b/src/metabase/api/public.clj @@ -1,6 +1,7 @@ (ns metabase.api.public "Metabase API endpoints for viewing publicly-accessible Cards and Dashboards." (:require [cheshire.core :as json] + [clojure.core.async :as a] [compojure.core :refer [GET]] [medley.core :as m] [metabase @@ -13,6 +14,7 @@ [dashboard :as dashboard-api] [dataset :as dataset-api] [field :as field-api]] + [metabase.async.util :as async.u] [metabase.mbql [normalize :as normalize] [util :as mbql.u]] @@ -64,27 +66,37 @@ (api/check-public-sharing-enabled) (card-with-uuid uuid)) -(defn run-query-for-card-with-id - "Run the query belonging to Card with CARD-ID with PARAMETERS and other query options (e.g. `:constraints`)." +(defn- transform-results [results] + (if (= (:status results) :failed) + ;; if the query failed instead of returning anything about the query just return a generic error message + (ex-info "An error occurred while running the query." {:status-code 400}) + (u/select-nested-keys + results + [[:data :columns :cols :rows :rows_truncated :insights] [:json_query :parameters] :error :status]))) + +(defn run-query-for-card-with-id-async + "Run the query belonging to Card with `card-id` with `parameters` and other query options (e.g. `:constraints`). + Returns core.async channel to fetch the results." {:style/indent 2} [card-id parameters & options] - (u/prog1 (-> ;; run this query with full superuser perms - (binding [api/*current-user-permissions-set* (atom #{"/"}) - qp/*allow-queries-with-no-executor-id* true] - (apply card-api/run-query-for-card card-id, :parameters parameters, :context :public-question, options)) - (u/select-nested-keys [[:data :columns :cols :rows :rows_truncated :insights] [:json_query :parameters] :error :status])) - ;; if the query failed instead of returning anything about the query just return a generic error message - (when (= (:status <>) :failed) - (throw (ex-info "An error occurred while running the query." {:status-code 400}))))) - -(defn- run-query-for-card-with-public-uuid - "Run query for a *public* Card with UUID. If public sharing is not enabled, this throws an exception." + ;; run this query with full superuser perms + (let [in-chan (binding [api/*current-user-permissions-set* (atom #{"/"}) + qp/*allow-queries-with-no-executor-id* true] + (apply card-api/run-query-for-card-async card-id, :parameters parameters, :context :public-question, options)) + out-chan (a/chan 1 (map transform-results))] + (async.u/single-value-pipe in-chan out-chan) + out-chan)) + +(defn- run-query-for-card-with-public-uuid-async + "Run query for a *public* Card with UUID. If public sharing is not enabled, this throws an exception. Returns channel + for fetching results." [uuid parameters & options] (api/check-public-sharing-enabled) - (apply run-query-for-card-with-id - (api/check-404 (db/select-one-id Card :public_uuid uuid, :archived false)) - parameters - options)) + (apply + run-query-for-card-with-id-async + (api/check-404 (db/select-one-id Card :public_uuid uuid, :archived false)) + parameters + options)) (api/defendpoint GET "/card/:uuid/query" @@ -92,16 +104,16 @@ credentials. Public sharing must be enabled." [uuid parameters] {parameters (s/maybe su/JSONString)} - (run-query-for-card-with-public-uuid uuid (json/parse-string parameters keyword))) + (run-query-for-card-with-public-uuid-async uuid (json/parse-string parameters keyword))) -(api/defendpoint GET "/card/:uuid/query/:export-format" +(api/defendpoint-async GET "/card/:uuid/query/:export-format" "Fetch a publicly-accessible Card and return query results in the specified format. Does not require auth credentials. Public sharing must be enabled." - [uuid export-format parameters] + [{{:keys [uuid export-format parameters]} :params}, respond raise] {parameters (s/maybe su/JSONString) export-format dataset-api/ExportFormat} - (dataset-api/as-format export-format - (run-query-for-card-with-public-uuid uuid (json/parse-string parameters keyword), :constraints nil))) + (dataset-api/as-format-async export-format respond raise + (run-query-for-card-with-public-uuid-async uuid (json/parse-string parameters keyword), :constraints nil))) @@ -219,15 +231,17 @@ :card_id card-id :dashboardcard_id [:in dashcard-ids]))))) -(defn public-dashcard-results - "Return the results of running a query with PARAMETERS for Card with CARD-ID belonging to Dashboard with - DASHBOARD-ID. Throws a 404 if the Card isn't part of the Dashboard." +(defn public-dashcard-results-async + "Return the results of running a query with `parameters` for Card with `card-id` belonging to Dashboard with + `dashboard-id`. Throws a 404 immediately if the Card isn't part of the Dashboard. + + Otherwise returns channel for fetching results." [dashboard-id card-id parameters & {:keys [context] :or {context :public-dashboard}}] (check-card-is-in-dashboard card-id dashboard-id) - (run-query-for-card-with-id card-id (resolve-params dashboard-id (if (string? parameters) - (json/parse-string parameters keyword) - parameters)) + (run-query-for-card-with-id-async card-id (resolve-params dashboard-id (if (string? parameters) + (json/parse-string parameters keyword) + parameters)) :context context, :dashboard-id dashboard-id)) (api/defendpoint GET "/dashboard/:uuid/card/:card-id" @@ -236,7 +250,7 @@ [uuid card-id parameters] {parameters (s/maybe su/JSONString)} (api/check-public-sharing-enabled) - (public-dashcard-results + (public-dashcard-results-async (api/check-404 (db/select-one-id Dashboard :public_uuid uuid, :archived false)) card-id parameters)) diff --git a/src/metabase/async/api_response.clj b/src/metabase/async/api_response.clj new file mode 100644 index 0000000000000000000000000000000000000000..0f6b1340ef375554eda7e23bff8311a15c777579 --- /dev/null +++ b/src/metabase/async/api_response.clj @@ -0,0 +1,173 @@ +(ns metabase.async.api-response + (:require [cheshire.core :as json] + [clojure.core.async :as a] + [clojure.java.io :as io] + [clojure.tools.logging :as log] + [compojure.response :refer [Sendable]] + [metabase.middleware.exceptions :as mw.exceptions] + [metabase.util :as u] + [metabase.util + [date :as du] + [i18n :as ui18n :refer [trs]]] + [ring.core.protocols :as ring.protocols] + [ring.util.response :as response]) + (:import clojure.core.async.impl.channels.ManyToManyChannel + [java.io OutputStream Writer] + java.util.concurrent.TimeoutException + org.eclipse.jetty.io.EofException)) + +(def ^:private keepalive-interval-ms + "Interval between sending newline characters to keep Heroku from terminating requests like queries that take a long + time to complete." + (* 1 1000)) + +(def ^:private absolute-max-keepalive-ms + "Absolute maximum amount of time to wait for a response to return results, instead of keeping the connection open + forever. Normally we'll eventually give up when a connection is closed, but if someone keeps the connection open + forever, or if there's a bug in the API code (and `respond` is never called, or a value is never written to the + channel it returns) give up after 4 hours." + ;; 4 hours + (* 4 60 60 1000)) + +;; Handle ring response maps that contain a core.async chan in the :body key: +;; +;; {:status 200 +;; :body (a/chan)} +;; +;; and send strings (presumibly \n) as heartbeats to the client until the real results (a seq) is received, then +;; stream that to the client +(defn- write-keepalive-character [^Writer out] + (try + ;; a newline padding character as it's harmless and will allow us to check if the client + ;; is connected. If sending this character fails because the connection is closed, the + ;; chan will then close. Newlines are no-ops when reading JSON which this depends upon. + (.write out (str \newline)) + (.flush out) + true + (catch EofException e + (log/debug e (u/format-color 'yellow (trs "connection closed, canceling request"))) + false) + (catch Throwable e + (log/error e (trs "Unexpected error writing keepalive characters")) + false))) + +;; `chunkk` named as such to avoid conflict with `clojure.core/chunk` +(defn- write-response-chunk [chunkk, ^Writer out] + (cond + ;; An error has occurred, let the user know + (instance? Throwable chunkk) + (json/generate-stream (:body (mw.exceptions/api-exception-response chunkk)) out) + + ;; We've recevied the response, write it to the output stream and we're done + (seqable? chunkk) + (json/generate-stream chunkk out) + + :else + (log/error (trs "Unexpected output in async API response") (class chunkk)))) + +(defn- write-channel-to-output-stream [chan, ^Writer out] + (a/go-loop [chunkk (a/<! chan)] + (cond + (= chunkk ::keepalive) + ;; keepalive chunkk + (if (write-keepalive-character out) + (recur (a/<! chan)) + (do + (a/close! chan) + (.close out))) + + ;; nothing -- `chan` is prematurely closed + (nil? chunkk) + (.close out) + + ;; otherwise we got an actual response. Do this on another thread so we don't block our precious core.async + ;; threads doing potentially long-running I/O + :else + (future + (try + ;; chunkk *might* be `nil` if the channel already go closed. + (write-response-chunk chunkk out) + (finally + ;; should already be closed, but just to be safe + (a/close! chan) + ;; close the writer so Ring knows the response is finished + (.close out)))))) + nil) + + +(extend-protocol ring.protocols/StreamableResponseBody + ManyToManyChannel + (write-body-to-stream [chan _ ^OutputStream output-stream] + (log/debug (u/format-color 'green (trs "starting streaming response"))) + (write-channel-to-output-stream chan (io/writer output-stream)))) + + +(defn- start-async-keepalive-loop + "Starts a go-loop that will send `::keepalive` messages to `output-chan` every second until `input-chan` either + produces a response or one of the two channels is closed. If `output-chan` is closed (because there's no longer + anywhere to write to -- the connection was canceled), closes `input-chan`; this can and is used by producers such as + the async QP to cancel whatever they're doing." + [input-chan output-chan] + (let [start-time-ms (System/currentTimeMillis)] + ;; Start the async loop to wait for the response/write messages to the output + (a/go-loop [] + ;; check whether input-chan is closed or has produced a value, or time out after a second + (let [[response chan] (a/alts! [input-chan (a/timeout keepalive-interval-ms)]) + elapsed-time-ms (- (System/currentTimeMillis) start-time-ms) + exceeded-absolute-max-keepalive? (> elapsed-time-ms absolute-max-keepalive-ms) + timed-out? (not= chan input-chan) + input-chan-closed? (and (= chan input-chan) + (nil? response)) + should-write-keepalive-byte? (and timed-out? (not exceeded-absolute-max-keepalive?))] + ;; if we hit a timeout before getting a response but haven't hit the `absolute-max-keepalive-ms` limit then + ;; attempt to write our byte. Recur if successful + (if (when should-write-keepalive-byte? + (log/debug (u/format-color 'blue (trs "Response not ready, writing one byte & sleeping..."))) + (a/>! output-chan ::keepalive)) + (recur) + ;; otherwise do the appropriate thing & then we're done here + (try + (cond + ;; if we attempted to write a keepalive byte but `>!` returned `nil`, that means output-chan is closed. + ;; Log a message, and the `finally` block will handle closing everything + should-write-keepalive-byte? + (log/debug (trs "Output chan closed, canceling keepalive request.")) + + ;; We have a response since it's non-nil, write the results, we're done + (some? response) + (do + ;; BTW if output-chan is closed, it's already too late, nothing else we need to do + (a/>! output-chan response) + (log/debug (u/format-color 'blue (trs "Async response finished, closing channels.")))) + + ;; Otherwise if we've been waiting longer than `absolute-max-keepalive-ms` it's time to call it quits + exceeded-absolute-max-keepalive? + (a/>! output-chan (TimeoutException. (str (trs "No response after waiting {0}. Canceling request." + (du/format-milliseconds absolute-max-keepalive-ms))))) + + ;; if input-chan was unexpectedly closed log a message to that effect and return an appropriate error + ;; rather than letting people wait forever + input-chan-closed? + (do + (log/error (trs "Input channel unexpectedly closed.")) + (a/>! output-chan (InterruptedException. (str (trs "Input channel unexpectedly closed.")))))) + (finally + (a/close! output-chan) + (a/close! input-chan)))))))) + +(defn- async-keepalive-chan [input-chan] + ;; Output chan only needs to hold on to the last message it got, for example no point in writing multiple `\n` + ;; characters if the consumer didn't get a chance to consume them, and no point writing `\n` before writing the + ;; actual response + (let [output-chan (a/chan (a/sliding-buffer 1))] + (start-async-keepalive-loop input-chan output-chan) + output-chan)) + +(defn- async-keepalive-response [input-chan] + (assoc (response/response (async-keepalive-chan input-chan)) + :content-type "applicaton/json; charset=utf-8")) + +(extend-protocol Sendable + ManyToManyChannel + (send* [input-chan _ respond _] + (respond (async-keepalive-response input-chan)))) diff --git a/src/metabase/async/semaphore_channel.clj b/src/metabase/async/semaphore_channel.clj new file mode 100644 index 0000000000000000000000000000000000000000..d0f7dbf640c710155bc81a4cb8455611de768977 --- /dev/null +++ b/src/metabase/async/semaphore_channel.clj @@ -0,0 +1,123 @@ +(ns metabase.async.semaphore-channel + (:require [clojure.core.async :as a] + [clojure.tools.logging :as log] + [metabase.async.util :as async.u] + [metabase.util.i18n :refer [trs]]) + (:import java.io.Closeable + java.util.concurrent.Semaphore)) + +(defn- permit-handle + "Object that can holds on to a permit for a Semaphore. Can be closed with `.close`, and thus, used with `with-open`; + also returns the permit upon finalization if not already returned." + ^Closeable [^Semaphore semaphore, id] + (let [closed? (atom false) + close! (fn [] + (when (compare-and-set! closed? false true) + (.release semaphore)))] + (reify + Object + (toString [_] + (format "Permit #%d" id)) ; ID is a simple per-channel counter mainly for debugging purposes + (finalize [_] + (close!)) + Closeable + (close [_] + (close!))))) + +(defn- notifying-semaphore + "When a permit is released via Semaphore.release() we'll send a message to the `notify-released-chan`. This is a + signal to the go-loop in the code below below to resume and try to acquire more permits from the semaphore." + ^Semaphore [num-permits notify-released-chan] + (proxy [Semaphore] [num-permits] + (release [] + ;; Release the permit ASAP. (Add tag to proxy anaphor `this`, otherwise we get reflection warnings) + (let [^Semaphore this this] + (proxy-super release)) + ;; Then send the message right away to let the go-loop know a permit is available. + (a/>!! notify-released-chan ::released)))) + +(defn semaphore-channel + "Creates a core.async channel that manages a counting Semaphore with `num-permits`. Takes from this channel will block + until a permit is available; the object taken is a special 'permit handle' that implements `Closeable`; hold on to + it with `with-open` or close it with `.close` to return the permit when finished with it." + [^Integer num-permits] + (let [permits-chan (a/chan) + ;; We only need one such 'release' notification at any given moment to let the loop know to resume so we can + ;; go ahead and make this channel a dropping buffer that will drop any additional messages. + notify-released-chan (a/chan (a/dropping-buffer 1)) + semaphore (notifying-semaphore num-permits notify-released-chan)] + ;; start the loop that will deliver permits + (a/go-loop [next-id 1] + (if (.tryAcquire semaphore) + ;; If the semaphore has a permit available right away, send a new `permit-handle` to `permits-chan`. Since + ;; that channel has no buffer this loop will park until someone is there to take it. Recur unless the + ;; permits-chan is closed. + (if (a/>! permits-chan (permit-handle semaphore next-id)) + (recur (inc next-id)) + (a/close! notify-released-chan)) + ;; Otherwise if no permit is available, wait for a notification on `notify-released-chan`, then recur and try + ;; again, unless channel is closed + (when (a/<! notify-released-chan) + (recur next-id)))) + ;; return a channel to get permits on + permits-chan)) + + +;;; ------------------------------------------- do-after-receiving-permit -------------------------------------------- + +(def ^:private ^:dynamic *permits* + "Map of semaphore channel -> obtained permit for the current and child thread[s]. Used so we can skip obtaining a + second permit if this thread already has one." + {}) + +(defn- do-f-with-permit + "Once a `permit` is obtained, execute `(apply f args)`, writing the results to `output-chan`, and returning the permit + no matter what." + [^Closeable permit out-chan f & args] + (try + (let [f (fn [] + (with-open [permit permit] + (try + (apply f args) + (catch Throwable e + e) + (finally + (log/debug (trs "f finished, permit will be returned"))))))] + (a/go + (let [canceled-chan (async.u/single-value-pipe (async.u/do-on-separate-thread f) out-chan)] + (when (a/<! canceled-chan) + (log/debug (trs "request canceled, permit will be returned")) + (.close permit))))) + (catch Throwable e + (log/error e (trs "Unexpected error attempting to run function after obtaining permit")) + (a/>! out-chan e) + (.close permit)))) + +(defn- do-after-waiting-for-new-permit [semaphore-chan f & args] + (let [out-chan (a/chan 1)] + ;; fire off a go block to wait for a permit. + (a/go + (let [[permit first-done] (a/alts! [semaphore-chan out-chan])] + (binding [*permits* (assoc *permits* semaphore-chan permit)] + ;; If out-chan closes before we get a permit, there's nothing for us to do here. Otherwise if we got our + ;; permit then proceed + (if (= first-done out-chan) + (log/debug (trs "Not running pending function call: output channel already closed.")) + ;; otherwise if channel is still open run the function + (apply do-f-with-permit permit out-chan f args))))) + ;; return `out-chan` which can be used to wait for results + out-chan)) + +(defn do-after-receiving-permit + "Run `(apply f args)` asynchronously after receiving a permit from `semaphore-chan`. Returns a channel from which you + can fetch the results. Closing this channel before results are produced will cancel the function call." + {:style/indent 1} + [semaphore-chan f & args] + ;; check and see whether we already have a permit for `semaphore-chan`, if so, go ahead and run the function right + ;; away instead of waiting for *another* permit + (if (get *permits* semaphore-chan) + (do + (log/debug (trs "Current thread already has a permit for {0}, will not wait to acquire another" semaphore-chan)) + (async.u/do-on-separate-thread f)) + ;; otherwise wait for a permit + (apply do-after-waiting-for-new-permit semaphore-chan f args))) diff --git a/src/metabase/async/util.clj b/src/metabase/async/util.clj new file mode 100644 index 0000000000000000000000000000000000000000..4a7e671038b344b85cd9a30a3da5e48fce11befd --- /dev/null +++ b/src/metabase/async/util.clj @@ -0,0 +1,65 @@ +(ns metabase.async.util + (:require [clojure.core.async :as a] + [clojure.tools.logging :as log] + [metabase.util.i18n :refer [trs]] + [schema.core :as s]) + (:import clojure.core.async.impl.channels.ManyToManyChannel)) + +(s/defn single-value-pipe :- ManyToManyChannel + "Pipe that will forward a single message from `in-chan` to `out-chan`, closing both afterward. If `out-chan` is closed + before `in-chan` produces a value, closes `in-chan`; this can be used to automatically cancel QP requests and the + like. + + Returns a channel that will send a single message when such early-closing cancelation occurs. You can listen for + this message to implement special cancelation behavior, such as canceling async jobs. This channel automatically + closes when either `in-chan` or `out-chan` closes." + [in-chan :- ManyToManyChannel, out-chan :- ManyToManyChannel] + (let [canceled-chan (a/chan 1)] + ;; fire off a block that will wait for either in-chan to produce a result or out-chan to be closed + (a/go + (try + (let [[result first-finished-chan] (a/alts! [in-chan out-chan])] + (if (and (= first-finished-chan in-chan) + (some? result)) + ;; If `in-chan` (e.g. fn call result) finishes first and receives a result, forward result to `out-chan` + (a/>! out-chan result) + ;; Otherwise one of the two channels was closed (e.g. query cancelation) before `in-chan` returned a + ;; result (e.g. QP result), pass a message to `canceled-chan`; `finally` block will close all three channels + (a/>! canceled-chan ::canceled))) + ;; Either way, close whichever of the channels is still open just to be safe + (finally + (a/close! out-chan) + (a/close! in-chan) + (a/close! canceled-chan)))) + ;; return the canceled chan in case someone wants to listen to it + canceled-chan)) + +(defn do-on-separate-thread + "Run `(apply f args)` on a separate thread, returns a channel to fetch the results. Closing this channel early will + cancel the future running the function, if possible." + [f & args] + (let [in-chan (a/chan 1) + out-chan (a/chan 1) + canceled-chan (single-value-pipe in-chan out-chan) + ;; Run `f` on a separarate thread because it's a potentially long-running QP query and we don't want to tie + ;; up precious core.async threads + futur + (future + (if-not (= ::open (first (a/alts!! [out-chan] :default ::open))) + (log/debug (trs "Output channel closed, will skip running {0}." f)) + (do + (log/debug (trs "Running {0} on separate thread..." f)) + (try + (let [result (apply f args)] + (a/put! in-chan result)) + ;; if we catch an Exception (shouldn't happen in a QP query, but just in case), send it to `chan`. It's ok, + ;; our IMPL of Ring `StreamableResponseBody` will do the right thing with it. + (catch Throwable e + (log/error e (trs "Caught error running {0}" f)) + (a/put! in-chan e))))))] + (a/go + (when-let [canceled (a/<! canceled-chan)] + (log/debug (trs "Request canceled, canceling future")) + (future-cancel futur))) + + out-chan)) diff --git a/src/metabase/db.clj b/src/metabase/db.clj index d757df7ec0aa39c16eb961e7be53a8765b2bba72..9e7a49c701e051c25aa07dd11f30aa13e475de91 100644 --- a/src/metabase/db.clj +++ b/src/metabase/db.clj @@ -98,8 +98,11 @@ (u/format-color 'red (str (trs "WARNING: Using Metabase with an H2 application database is not recomended for production deployments.") + " " (trs "For production deployments, we highly recommend using Postgres, MySQL, or MariaDB instead.") + " " (trs "If you decide to continue to use H2, please be sure to back up the database file regularly.") + " " (trs "See https://metabase.com/docs/latest/operations-guide/start.html#migrating-from-using-the-h2-database-to-mysql-or-postgres for more information."))))) (or @connection-string-details (case (db-type) diff --git a/src/metabase/driver/sql_jdbc/execute.clj b/src/metabase/driver/sql_jdbc/execute.clj index 7eeb88bfcd4aeee7da94c238805d501847cc09f6..a518e65df497781a4f15e3a57e7486ecb28178e6 100644 --- a/src/metabase/driver/sql_jdbc/execute.clj +++ b/src/metabase/driver/sql_jdbc/execute.clj @@ -1,3 +1,4 @@ + (ns metabase.driver.sql-jdbc.execute "Code related to actually running a SQL query against a JDBC database (including setting the session timezone when appropriate), and for properly encoding/decoding types going in and out of the database." diff --git a/src/metabase/handler.clj b/src/metabase/handler.clj index 17ce2f7803225a8e7f1a25cf1d4e149f2f87d21e..8b9e9e3bca9a054010b5d5eea9cec0ab00121199 100644 --- a/src/metabase/handler.clj +++ b/src/metabase/handler.clj @@ -14,6 +14,10 @@ [keyword-params :refer [wrap-keyword-params]] [params :refer [wrap-params]]])) +;; required here because this namespace is not actually used anywhere but we need it to be loaded because it adds +;; impls for handling `core.async` channels as web server responses +(require 'metabase.async.api-response) + (def app "The primary entry point to the Ring HTTP server." ;; ▼▼▼ POST-PROCESSING ▼▼▼ happens from TOP-TO-BOTTOM @@ -36,6 +40,5 @@ mw.misc/bind-user-locale ; Binds *locale* for i18n wrap-cookies ; Parses cookies in the request map and assocs as :cookies mw.misc/add-content-type ; Adds a Content-Type header for any response that doesn't already have one - mw.misc/wrap-gzip ; GZIP response if client can handle it - )) + mw.misc/wrap-gzip)) ; GZIP response if client can handle it ;; ▲▲▲ PRE-PROCESSING ▲▲▲ happens from BOTTOM-TO-TOP diff --git a/src/metabase/middleware/exceptions.clj b/src/metabase/middleware/exceptions.clj index 75ad339b1601b05c82ea973016b9d17455028438..1f8f8411f6a8b370b102f4b18cdf60ba17a22eb8 100644 --- a/src/metabase/middleware/exceptions.clj +++ b/src/metabase/middleware/exceptions.clj @@ -3,10 +3,11 @@ (:require [clojure.java.jdbc :as jdbc] [clojure.string :as str] [clojure.tools.logging :as log] - [metabase.middleware.security :as middleware.security] + [metabase.middleware.security :as mw.security] [metabase.util :as u] [metabase.util.i18n :as ui18n :refer [trs]]) - (:import java.sql.SQLException)) + (:import java.sql.SQLException + org.eclipse.jetty.io.EofException)) (defn genericize-exceptions "Catch any exceptions thrown in the request handler body and rethrow a generic 400 exception instead. This minimizes @@ -34,9 +35,12 @@ (catch Throwable e (raise e)))))) -(defn- api-exception-response +(defmulti api-exception-response "Convert an exception from an API endpoint into an appropriate HTTP response." - [^Throwable e] + {:arglists '([e])} + class) + +(defmethod api-exception-response Throwable [^Throwable e] (let [{:keys [status-code], :as info} (ex-data e) @@ -62,20 +66,24 @@ ;; Otherwise it's a 500. Return a body that includes exception & filtered ;; stacktrace for debugging purposes :else - (let [stacktrace (u/filtered-stacktrace e)] - (merge - (assoc other-info - :message message - :type (class e) - :stacktrace stacktrace) - (when (instance? SQLException e) - {:sql-exception-chain - (str/split (with-out-str (jdbc/print-sql-exception-chain e)) - #"\s*\n\s*")}))))] + (assoc other-info + :message message + :type (class e) + :stacktrace (u/filtered-stacktrace e)))] {:status (or status-code 500) - :headers (middleware.security/security-headers) + :headers (mw.security/security-headers) :body body})) +(defmethod api-exception-response SQLException [e] + (-> + ((get-method api-exception-response (.getSuperclass SQLException)) e) + (assoc-in [:body :sql-exception-chain] (str/split (with-out-str (jdbc/print-sql-exception-chain e)) + #"\s*\n\s*")))) + +(defmethod api-exception-response EofException [e] + (log/info (trs "Request canceled before finishing.")) + {:status-code 204, :body nil, :headers (mw.security/security-headers)}) + (defn catch-api-exceptions "Middleware that catches API Exceptions and returns them in our normal-style format rather than the Jetty 500 Stacktrace page, which is not so useful for our frontend." @@ -91,8 +99,16 @@ "Middleware that catches any unexpected Exceptions that reroutes them thru `raise` where they can be handled appropriately." [handler] - (fn [request response raise] + (fn [request respond raise] (try - (handler request response raise) + (handler + request + ;; for people that accidentally pass along an Exception, e.g. from qp.async, do the nice thing and route it to + ;; the write place for them + (fn [response] + ((if (instance? Throwable response) + raise + respond) response)) + raise) (catch Throwable e (raise e))))) diff --git a/src/metabase/middleware/log.clj b/src/metabase/middleware/log.clj index 47a0c7ab9564145affbbbf5d4bea1f8506c29101..68f9a9accee6a9547e0c87816f00f6393508caa9 100644 --- a/src/metabase/middleware/log.clj +++ b/src/metabase/middleware/log.clj @@ -39,6 +39,7 @@ (.toUpperCase (name request-method)) uri status elapsed-time db-call-count (when stats? (jetty-stats-coll (jetty-stats)))) + (format " active threads: %d" (Thread/activeCount)) ;; only print body on error so we don't pollute our environment by over-logging (when (and error? (or (string? body) (coll? body))) diff --git a/src/metabase/models/dashboard.clj b/src/metabase/models/dashboard.clj index 39937027bbd8db628795311ae057acb9f92ef854..3787ec8bf2d6e836d44bac58090fdcc1465bece3 100644 --- a/src/metabase/models/dashboard.clj +++ b/src/metabase/models/dashboard.clj @@ -3,11 +3,11 @@ [data :refer [diff]] [set :as set] [string :as str]] + [clojure.core.async :as a] [clojure.tools.logging :as log] [metabase [events :as events] [public-settings :as public-settings] - [query-processor :as qp] [util :as u]] [metabase.automagic-dashboards.populate :as magic.populate] [metabase.models @@ -20,7 +20,7 @@ [permissions :as perms] [revision :as revision]] [metabase.models.revision.diff :refer [build-sentence]] - [metabase.query-processor.interface :as qpi] + [metabase.query-processor.async :as qp.async] [metabase.util.i18n :as ui18n] [toucan [db :as db] @@ -217,11 +217,11 @@ (update-field-values-for-on-demand-dbs! old-param-field-ids new-param-field-ids)))) +;; TODO - we need to actually make this async, but then we'd need to make `save-card!` async, and so forth (defn- result-metadata-for-query "Fetch the results metadata for a `query` by running the query and seeing what the `qp` gives us in return." [query] - (binding [qpi/*disable-qp-logging* true] - (get-in (qp/process-query query) [:data :results_metadata :columns]))) + (a/<!! (qp.async/result-metadata-for-query-async query))) (defn- save-card! [card] diff --git a/src/metabase/query_processor.clj b/src/metabase/query_processor.clj index 6e1dfaa9f21c6b1efb7989292bf59f70cb8e9d8d..fd64e52000b294e59ee98689760fe28332641ace 100644 --- a/src/metabase/query_processor.clj +++ b/src/metabase/query_processor.clj @@ -14,7 +14,6 @@ [metabase.query-processor.middleware [add-dimension-projections :as add-dim] [add-implicit-clauses :as implicit-clauses] - [add-query-throttle :as query-throttle] [add-row-count-and-status :as row-count-and-status] [add-settings :as add-settings] [annotate :as annotate] @@ -51,7 +50,7 @@ [metabase.query-processor.util :as qputil] [metabase.util [date :as du] - [i18n :refer [tru]]] + [i18n :refer [trs tru]]] [schema.core :as s] [toucan.db :as db])) @@ -59,12 +58,12 @@ ;;; | QUERY PROCESSOR | ;;; +----------------------------------------------------------------------------------------------------------------+ -(defn- execute-query +(s/defn ^:private execute-query "The pivotal stage of the `process-query` pipeline where the query is actually executed by the driver's Query Processor methods. This function takes the fully pre-processed query, runs it, and returns the results, which then run through the various post-processing steps." - [query] - {:pre [(map? query) (:driver query)]} + [query :- {:driver s/Keyword + s/Keyword s/Any}] (driver/execute-query (:driver query) query)) ;; The way these functions are applied is actually straight-forward; it matches the middleware pattern used by @@ -142,7 +141,6 @@ resolve-database/resolve-database fetch-source-query/fetch-source-query store/initialize-store - query-throttle/maybe-add-query-throttle log-query/log-initial-query ;; TODO - bind `*query*` here ? cache/maybe-return-cached-results @@ -344,13 +342,11 @@ (assert-query-status-successful result) (save-and-return-successful-query! query-execution result)) (catch Throwable e - (if (= (:type (ex-data e)) ::query-throttle/concurrent-query-limit-reached) - (throw e) - (do - (log/warn (u/format-color 'red "Query failure: %s\n%s" - (.getMessage e) - (u/pprint-to-str (u/filtered-stacktrace e)))) - (save-and-return-failed-query! query-execution e))))))) + (log/warn (u/format-color 'red (trs "Query failure") + (.getMessage e) + "\n" + (u/pprint-to-str (u/filtered-stacktrace e)))) + (save-and-return-failed-query! query-execution e))))) (s/defn ^:private assoc-query-info [query, options :- mbql.s/Info] (assoc query :info (assoc options diff --git a/src/metabase/query_processor/async.clj b/src/metabase/query_processor/async.clj new file mode 100644 index 0000000000000000000000000000000000000000..b829c6c3d7c29eb6d145b7d78139daab8c0dcb27 --- /dev/null +++ b/src/metabase/query_processor/async.clj @@ -0,0 +1,115 @@ +(ns metabase.query-processor.async + "Async versions of the usual public query processor functions. Instead of blocking while the query is ran, these + functions all return a `core.async` channel that can be used to fetch the results when they become available. + + Each connected database is limited to a maximum of 15 simultaneous queries (configurable) using these methods; any + additional queries will park the thread. Super-useful for writing high-performance API endpoints. Prefer these + methods to the old-school synchronous versions. + + How is this achieved? For each Database, we'll maintain a channel that acts as a counting semaphore; the channel + will initially contain 15 permits. Each incoming request will asynchronously read from the channel until it acquires + a permit, then put it back when it finishes." + (:require [clojure.core.async :as a] + [clojure.tools.logging :as log] + [metabase + [query-processor :as qp] + [util :as u]] + [metabase.api.common :as api] + [metabase.async + [semaphore-channel :as semaphore-channel] + [util :as async.u]] + [metabase.models.setting :refer [defsetting]] + [metabase.query-processor.interface :as qpi] + [metabase.util.i18n :refer [trs]] + [schema.core :as s]) + (:import clojure.core.async.impl.channels.ManyToManyChannel)) + +(defsetting max-simultaneous-queries-per-db + (trs "Maximum number of simultaneous queries to allow per connected Database.") + :type :integer + :default 15) + + +(defonce ^:private db-semaphore-channels (atom {})) + +(defn- fetch-db-semaphore-channel + "Fetch the counting semaphore channel for a Database, creating it if not already created." + [database-or-id] + (let [id (u/get-id database-or-id)] + (or + ;; channel already exists + (@db-semaphore-channels id) + ;; channel does not exist, Create a channel and stick it in the atom + (let [ch (semaphore-channel/semaphore-channel (max-simultaneous-queries-per-db)) + new-ch ((swap! db-semaphore-channels update id #(or % ch)) id)] + ;; ok, if the value swapped into the atom was a different channel (another thread beat us to it) then close our + ;; newly created channel + (when-not (= ch new-ch) + (a/close! ch)) + ;; return the newly created channel + new-ch)))) + +(defn- do-async + "Execute `f` asynchronously, waiting to receive a permit from `db`'s semaphore channel before proceeding. Returns the + results in a channel." + [db f & args] + (let [semaphore-chan (fetch-db-semaphore-channel db)] + (apply semaphore-channel/do-after-receiving-permit semaphore-chan f args))) + +(defn process-query + "Async version of `metabase.query-processor/process-query`. Runs query asynchronously, and returns a `core.async` + channel that can be used to fetch the results once the query finishes running. Closing the channel will cancel the + query." + [query] + (do-async (:database query) qp/process-query query)) + +(defn process-query-and-save-execution! + "Async version of `metabase.query-processor/process-query-and-save-execution!`. Runs query asynchronously, and returns + a `core.async` channel that can be used to fetch the results once the query finishes running. Closing the channel + will cancel the query." + [query options] + (do-async (:database query) qp/process-query-and-save-execution! query options)) + +(defn process-query-and-save-with-max! + "Async version of `metabase.query-processor/process-query-and-save-with-max!`. Runs query asynchronously, and returns + a `core.async` channel that can be used to fetch the results once the query finishes running. Closing the channel + will cancel the query." + [query options] + (do-async (:database query) qp/process-query-and-save-with-max! query options)) + +(defn process-query-without-save! + "Async version of `metabase.query-processor/process-query-without-save!`. Runs query asynchronously, and returns a + `core.async` channel that can be used to fetch the results once the query finishes running. Closing the channel will + cancel the query." + [user query] + (do-async (:database query) qp/process-query-without-save! user query)) + + +;;; ------------------------------------------------ Result Metadata ------------------------------------------------- + +(defn- transform-result-metadata-query-results [{:keys [status], :as results}] + (when (= status :failed) + (log/error (trs "Error running query to determine Card result metadata:") + (u/pprint-to-str 'red results))) + (or (get-in results [:data :results_metadata :columns]) + [])) + +(s/defn result-metadata-for-query-async :- ManyToManyChannel + "Fetch the results metadata for a `query` by running the query and seeing what the QP gives us in return. + This is obviously a bit wasteful so hopefully we can avoid having to do this. Returns a channel to get the + results." + [query] + (let [out-chan (a/chan 1 (map transform-result-metadata-query-results))] + ;; set up a pipe to get the async QP results and pipe them thru to out-chan + (async.u/single-value-pipe + (binding [qpi/*disable-qp-logging* true] + (process-query-without-save! + api/*current-user-id* + ;; for purposes of calculating the actual Fields & types returned by this query we really only need the first + ;; row in the results + (-> query + (assoc-in [:constrains :max-results] 1) + (assoc-in [:constrains :max-results-bare-rows] 1)))) + out-chan) + ;; return out-chan + out-chan)) diff --git a/src/metabase/query_processor/middleware/add_query_throttle.clj b/src/metabase/query_processor/middleware/add_query_throttle.clj deleted file mode 100644 index 859351121a82b8dbceb50e1bb8e3b7ef0c13a354..0000000000000000000000000000000000000000 --- a/src/metabase/query_processor/middleware/add_query_throttle.clj +++ /dev/null @@ -1,51 +0,0 @@ -(ns metabase.query-processor.middleware.add-query-throttle - "Middleware that constrains the number of concurrent queries, rejects queries by throwing an exception and - returning a 503 when we exceed our capacity" - (:require [metabase.config :as config] - [puppetlabs.i18n.core :refer [tru]]) - (:import [java.util.concurrent Semaphore TimeUnit])) - -(def ^:private calculate-max-queries-from-max-threads - (let [max-threads (or (config/config-int :mb-jetty-maxthreads) 50)] - (int (Math/ceil (/ max-threads 2))))) - -(defn- ^Semaphore create-query-semaphore [] - (let [total-concurrent-queries (or (config/config-int :mb-max-concurrent-queries) - calculate-max-queries-from-max-threads)] - (Semaphore. total-concurrent-queries true))) - -(def ^Semaphore ^:private query-semaphore (create-query-semaphore)) - -(defn- throw-503-unavailable - [] - (throw (ex-info (str (tru "Max concurrent query limit reached")) - {:type ::concurrent-query-limit-reached - :status-code 503}))) - -;; Not marking this as `const` so it can be redef'd in tests -(def ^:private max-query-wait-time-in-millis - (or (config/config-int :mb-max-query-wait-time) - 5000)) - -(defn- throttle-queries - "Query middle that will throttle queries using `semaphore`. Throws 503 exceptions if there are no more slots - available" - [^Semaphore semaphore qp] - (fn [query] - ;; `tryAquire` will return `true` if it is able to get a permit, false otherwise - (if (.tryAcquire semaphore max-query-wait-time-in-millis TimeUnit/MILLISECONDS) - (try - (qp query) - (finally - ;; We have a permit, whether the query is successful or it failed, we must make sure that we always release - ;; the permit - (.release semaphore))) - ;; We were not able to get a permit without the timeout period, return a 503 - (throw-503-unavailable)))) - -(defn maybe-add-query-throttle - "Adds the query throttle middleware if `MB_ENABLE_QUERY_THROTTLE` has been set" - [qp] - (if (config/config-bool :mb-enable-query-throttle) - (throttle-queries query-semaphore qp) - qp)) diff --git a/src/metabase/query_processor/middleware/catch_exceptions.clj b/src/metabase/query_processor/middleware/catch_exceptions.clj index 62ede372d1e0fe8f02f3bd00cf7fa2d46f79d933..5dcba95f1700491b0ac602409c03ee884a8a8882 100644 --- a/src/metabase/query_processor/middleware/catch_exceptions.clj +++ b/src/metabase/query_processor/middleware/catch_exceptions.clj @@ -1,7 +1,6 @@ (ns metabase.query-processor.middleware.catch-exceptions "Middleware for catching exceptions thrown by the query processor and returning them in a friendlier format." - (:require [metabase.query-processor.middleware.add-query-throttle :as query-throttle] - [metabase.util :as u] + (:require [metabase.util :as u] schema.utils) (:import [schema.utils NamedError ValidationError])) @@ -64,11 +63,8 @@ (try (qp query) (catch clojure.lang.ExceptionInfo e (let [{error :error, error-type :type, :as data} (ex-data e)] - ;; When we've hit our concurrent query limit, let that exception bubble up, otherwise repackage it as a failure - (if (= error-type ::query-throttle/concurrent-query-limit-reached) - (throw e) - (fail query e (when-let [error-msg (and (= error-type :schema.core/error) - (explain-schema-validation-error error))] - {:error error-msg}))))) + (fail query e (when-let [error-msg (and (= error-type :schema.core/error) + (explain-schema-validation-error error))] + {:error error-msg})))) (catch Throwable e (fail query e))))) diff --git a/src/metabase/query_processor/middleware/results_metadata.clj b/src/metabase/query_processor/middleware/results_metadata.clj index 86941a97087f5f0bae35f65e86f53b936455e2f7..ad99f653a12f2325c48ba9ef95df7185cb770144 100644 --- a/src/metabase/query_processor/middleware/results_metadata.clj +++ b/src/metabase/query_processor/middleware/results_metadata.clj @@ -69,8 +69,7 @@ (-> metadata serialize-metadata-for-hashing hash/md5 - codec/base64-encode - encryption/maybe-encrypt))) + codec/base64-encode))) (defn valid-checksum? "Is the CHECKSUM the right one for this column METADATA?" diff --git a/src/metabase/server.clj b/src/metabase/server.clj index 13d2f9defe721ba758a9087ce30dbf27e9711e6c..03dfdc75a0adc8f493d352279143f2e80682120b 100644 --- a/src/metabase/server.clj +++ b/src/metabase/server.clj @@ -23,8 +23,7 @@ (defn- jetty-config [] (cond-> (m/filter-vals some? - {:async? true - :port (config/config-int :mb-jetty-port) + {:port (config/config-int :mb-jetty-port) :host (config/config-str :mb-jetty-host) :max-threads (config/config-int :mb-jetty-maxthreads) :min-threads (config/config-int :mb-jetty-minthreads) @@ -49,10 +48,17 @@ ^Server [] @instance*) -(defn- create-server +(defn create-server + "Create a new async Jetty server with `handler` and `options`. Handy for creating the real Metabase web server, and + creating one-off web servers for tests and REPL usage." ^Server [handler options] - (doto ^Server (#'ring-jetty/create-server options) - (.setHandler (#'ring-jetty/async-proxy-handler handler 0)))) + (doto ^Server (#'ring-jetty/create-server (assoc options :async? true)) + (.setHandler (#'ring-jetty/async-proxy-handler + handler + ;; if any API endpoint functions aren't at the very least returning a channel to fetch the results + ;; later after 30 seconds we're in serious trouble. Kill the request. + (or (config/config-int :mb-jetty-async-response-timeout) + (* 30 1000)))))) (defn start-web-server! "Start the embedded Jetty web server. Returns `:started` if a new server was started; `nil` if there was already a diff --git a/test/metabase/api/card_test.clj b/test/metabase/api/card_test.clj index 607cefde3900361ef625cd53d6a2e3a6dec815cb..bfc0b23ca09e30d9b43a08288f2a5fa00a19494e 100644 --- a/test/metabase/api/card_test.clj +++ b/test/metabase/api/card_test.clj @@ -8,7 +8,6 @@ [email-test :as et] [http-client :as http :refer :all] [util :as u]] - [metabase.api.card :as card-api] [metabase.driver.sql-jdbc.execute :as sql-jdbc.execute] [metabase.middleware.util :as middleware.u] [metabase.models @@ -25,6 +24,7 @@ [pulse-channel-recipient :refer [PulseChannelRecipient]] [table :refer [Table]] [view-log :refer [ViewLog]]] + [metabase.query-processor.async :as qp.async] [metabase.query-processor.middleware.results-metadata :as results-metadata] [metabase.test [data :as data] @@ -353,7 +353,7 @@ card-name (tu/random-name)] (tt/with-temp Collection [collection] (perms/grant-collection-readwrite-permissions! (perms-group/all-users) collection) - (tu/throw-if-called card-api/result-metadata-for-query + (tu/throw-if-called qp.async/result-metadata-for-query-async (tu/with-model-cleanup [Card] ;; create a card with the metadata ((user->client :rasta) :post 200 "card" diff --git a/test/metabase/api/common_test.clj b/test/metabase/api/common_test.clj index b5ceffd02db2c1a35513f58a0bf752c5bb2b6395..bccd4913141361316e5deb3704143f2261504fd8 100644 --- a/test/metabase/api/common_test.clj +++ b/test/metabase/api/common_test.clj @@ -1,6 +1,5 @@ (ns metabase.api.common-test - (:require [clojure.core.async :as async] - [expectations :refer [expect]] + (:require [expectations :refer [expect]] [metabase.api.common :as api :refer :all] [metabase.api.common.internal :refer :all] [metabase.middleware @@ -133,86 +132,3 @@ (defendpoint GET "/:id" [id] {id su/IntGreaterThanZero} (select-one Card :id id)))) - -(def ^:private long-timeout - ;; 2 minutes - (* 2 60000)) - -(defn- take-with-timeout [response-chan] - (let [[response c] (async/alts!! [response-chan - ;; We should never reach this unless something is REALLY wrong - (async/timeout long-timeout)])] - (when (and (nil? response) - (not= c response-chan)) - (throw (Exception. "Taking from streaming endpoint timed out!"))) - - response)) - -(defn- wait-for-future-cancellation - "Once a client disconnects, the next heartbeat sent will result in an exception that should cancel the future. In - theory 1 keepalive-interval should be enough, but building in some wiggle room here for poor concurrency timing in - tests." - [fut] - (let [keepalive-interval (var-get #'api/streaming-response-keep-alive-interval-ms) - max-iterations (long (/ long-timeout keepalive-interval))] - (loop [i 0] - (if (or (future-cancelled? fut) (> i max-iterations)) - fut - (do - (Thread/sleep keepalive-interval) - (recur (inc i))))))) - -;; This simulates 2 keepalive-intervals followed by the query response -(expect - [\newline \newline {:success true} false] - (let [send-response (promise) - {:keys [output-channel error-channel response-future]} (#'api/invoke-thunk-with-keepalive (fn [] @send-response))] - [(take-with-timeout output-channel) - (take-with-timeout output-channel) - (do - (deliver send-response {:success true}) - (take-with-timeout output-channel)) - (future-cancelled? response-future)])) - -;; This simulates an immediate query response -(expect - [{:success true} false] - (let [{:keys [output-channel error-channel response-future]} (#'api/invoke-thunk-with-keepalive (fn [] {:success true}))] - [(take-with-timeout output-channel) - (future-cancelled? response-future)])) - -;; This simulates a closed connection from the client, should cancel the future -(expect - [\newline \newline true] - (let [send-response (promise) - {:keys [output-channel error-channel response-future]} (#'api/invoke-thunk-with-keepalive (fn [] (Thread/sleep long-timeout)))] - [(take-with-timeout output-channel) - (take-with-timeout output-channel) - (do - (async/close! output-channel) - (future-cancelled? (wait-for-future-cancellation response-future)))])) - -;; When an immediate exception happens, we should know that via the error channel -(expect - ;; Each channel should have the failure and then get closed - ["It failed" "It failed" nil nil] - (let [{:keys [output-channel error-channel response-future]} (#'api/invoke-thunk-with-keepalive (fn [] (throw (Exception. "It failed"))))] - [(.getMessage (take-with-timeout error-channel)) - (.getMessage (take-with-timeout output-channel)) - (async/<!! error-channel) - (async/<!! output-channel)])) - -;; This simulates a slow failure, we'll still get an exception, but the error channel is closed, so at this point -;; we've assumed it would be a success, but it wasn't -(expect - [\newline nil \newline "It failed" false] - (let [now-throw-exception (promise) - {:keys [output-channel error-channel response-future]} (#'api/invoke-thunk-with-keepalive - (fn [] @now-throw-exception (throw (Exception. "It failed"))))] - [(take-with-timeout output-channel) - (take-with-timeout error-channel) - (take-with-timeout output-channel) - (do - (deliver now-throw-exception true) - (.getMessage (take-with-timeout output-channel))) - (future-cancelled? response-future)])) diff --git a/test/metabase/api/database_test.clj b/test/metabase/api/database_test.clj index c8d96523c95c3c979563b1f3b84545875ee0d6bd..fefa8ed4fad74f798d78370ef31bb069b70e442b 100644 --- a/test/metabase/api/database_test.clj +++ b/test/metabase/api/database_test.clj @@ -118,6 +118,7 @@ ;; ## POST /api/database ;; Check that we can create a Database +;; TODO - this test fails if we're running Postgres locally & it requires a password... (expect-with-temp-db-created-via-api [db {:is_full_sync false}] (merge default-db-details (match-$ db diff --git a/test/metabase/api/embed_test.clj b/test/metabase/api/embed_test.clj index d358358f79d9fbd27fdb8b305b9e4c1dc157315a..12cd51a4cd2fc5feecfd77be0109a0814e420175 100644 --- a/test/metabase/api/embed_test.clj +++ b/test/metabase/api/embed_test.clj @@ -203,7 +203,9 @@ (with-temp-card [card {:enable_embedding true, :dataset_query {:database (data/id) :type :native :native {:query "SELECT * FROM XYZ"}}}] - (http/client :get 400 (card-query-url card response-format)))))) + ;; since results are keepalive-streamed for normal queries (i.e., not CSV, JSON, or XLSX) we have to return a + ;; status code right away, so streaming responses always return 200 + (http/client :get (if (seq response-format) 400 200) (card-query-url card response-format)))))) ;; check that the endpoint doesn't work if embedding isn't enabled (expect-for-response-formats [response-format] @@ -404,7 +406,7 @@ :card {:dataset_query {:database (data/id) :type :native, :native {:query "SELECT * FROM XYZ"}}}}] - (http/client :get 400 (dashcard-url dashcard)))))) + (http/client :get 200 (dashcard-url dashcard)))))) ;; check that the endpoint doesn't work if embedding isn't enabled (expect diff --git a/test/metabase/async/api_response_test.clj b/test/metabase/async/api_response_test.clj new file mode 100644 index 0000000000000000000000000000000000000000..0798d4dc26e94a938578a868e9e8ab6eff3b61fb --- /dev/null +++ b/test/metabase/async/api_response_test.clj @@ -0,0 +1,197 @@ +(ns metabase.async.api-response-test + (:require [cheshire.core :as json] + [clojure.core.async :as a] + [expectations :refer [expect]] + [metabase.async.api-response :as async-response] + [metabase.test.util.async :as tu.async] + [ring.core.protocols :as ring.protocols]) + (:import [java.io ByteArrayOutputStream Closeable])) + +(def ^:private long-timeout-ms + ;; 5 seconds + (* 5 1000)) + + +;;; +----------------------------------------------------------------------------------------------------------------+ +;;; | New Tests | +;;; +----------------------------------------------------------------------------------------------------------------+ + +(defn- do-with-response [input-chan f] + ;; don't wait more than 10 seconds for results, our tests or code are busted otherwise + (with-redefs [async-response/absolute-max-keepalive-ms (min (* 10 1000) @#'async-response/absolute-max-keepalive-ms)] + (tu.async/with-chans [os-closed-chan] + (with-open [os (proxy [ByteArrayOutputStream] [] + (close [] + (a/close! os-closed-chan) + (let [^Closeable this this] + (proxy-super close))))] + (let [{output-chan :body, :as response} (#'async-response/async-keepalive-response input-chan)] + (ring.protocols/write-body-to-stream output-chan response os) + (try + (f {:os os, :output-chan output-chan, :os-closed-chan os-closed-chan}) + (finally + (a/close! output-chan)))))))) + +(defmacro ^:private with-response [[response-objects-binding input-chan] & body] + `(do-with-response ~input-chan (fn [~response-objects-binding] ~@body))) + +(defn- wait-for-close [chan] + (tu.async/wait-for-close chan long-timeout-ms) + true) + +(defn- os->response [^ByteArrayOutputStream os] + (some-> + os + .toString + (json/parse-string keyword) + ((fn [response] + (cond-> response + (:stacktrace response) (update :stacktrace (partial every? string?))))))) + + +;;; ------------------------------ Normal responses: message sent to the input channel ------------------------------- + +;; check that response is actually written to the output stream +(expect + {:success true} + (tu.async/with-chans [input-chan] + (with-response [{:keys [os os-closed-chan]} input-chan] + (a/>!! input-chan {:success true}) + (wait-for-close os-closed-chan) + (os->response os)))) + +;; when we send a single message to the input channel, it should get closed automatically by the async code +(expect + (tu.async/with-chans [input-chan] + (with-response [{:keys [output-chan]} input-chan] + ;; send the result to the input channel + (a/>!! input-chan {:success true}) + (wait-for-close output-chan) + ;; now see if input-chan is closed + (wait-for-close input-chan)))) + +;; when we send a message to the input channel, output-chan should *also* get closed +(expect + (tu.async/with-chans [input-chan] + (with-response [{:keys [output-chan]} input-chan] + ;; send the result to the input channel + (a/>!! input-chan {:success true}) + ;; now see if output-chan is closed + (wait-for-close output-chan)))) + +;; ...and the output-stream should be closed as well +(expect + (tu.async/with-chans [input-chan] + (with-response [{:keys [os-closed-chan]} input-chan] + (a/>!! input-chan {:success true}) + (wait-for-close os-closed-chan)))) + + +;;; ----------------------------------------- Input-chan closed unexpectedly ----------------------------------------- + +;; if we close input-channel prematurely, output-channel should get closed +(expect + (tu.async/with-chans [input-chan] + (with-response [{:keys [output-chan]} input-chan] + (a/close! input-chan) + (wait-for-close output-chan)))) + +;; ...as should the output stream +(expect + (tu.async/with-chans [input-chan] + (with-response [{:keys [os-closed-chan]} input-chan] + (a/close! input-chan) + (wait-for-close os-closed-chan)))) + +;; An error should be written to the output stream +(expect + {:message "Input channel unexpectedly closed." + :type "class java.lang.InterruptedException" + :stacktrace true} + (tu.async/with-chans [input-chan] + (with-response [{:keys [os os-closed-chan]} input-chan] + (a/close! input-chan) + (wait-for-close os-closed-chan) + (os->response os)))) + + +;;; ------------------------------ Output-chan closed early (i.e. API request canceled) ------------------------------ + +;; If output-channel gets closed (presumably because the API request is canceled), input-chan should also get closed +(expect + (tu.async/with-chans [input-chan] + (with-response [{:keys [output-chan]} input-chan] + (a/close! output-chan) + (wait-for-close input-chan)))) + +;; if output chan gets closed, output-stream should also get closed +(expect + (tu.async/with-chans [input-chan] + (with-response [{:keys [output-chan os-closed-chan]} input-chan] + (a/close! output-chan) + (wait-for-close os-closed-chan)))) + +;; we shouldn't bother writing anything to the output stream if output-chan is closed because it should already be +;; closed +(expect + nil + (tu.async/with-chans [input-chan] + (with-response [{:keys [output-chan os os-closed-chan]} input-chan] + (a/close! output-chan) + (wait-for-close os-closed-chan) + (os->response os)))) + + +;;; ------------ Normal response with a delay: message sent to Input chan at unspecified point in future ------------- + +;; Should write newlines if it has to wait +(expect + "\n\n{\"ready?\":true}" + (with-redefs [async-response/keepalive-interval-ms 500] + (tu.async/with-chans [input-chan] + (with-response [{:keys [os-closed-chan os]} input-chan] + (a/<!! (a/timeout 1400)) + (a/>!! input-chan {:ready? true}) + (wait-for-close os-closed-chan) + (.toString os))))) + + +;;; --------------------------------------- input chan message is an Exception --------------------------------------- + +;; If the message sent to input-chan is an Exception an appropriate response should be generated +(expect + {:message "Broken", :type "class java.lang.Exception", :stacktrace true} + (tu.async/with-chans [input-chan] + (with-response [{:keys [os os-closed-chan]} input-chan] + (a/>!! input-chan (Exception. "Broken")) + (wait-for-close os-closed-chan) + (os->response os)))) + + +;;; ------------------------------------------ input-chan never written to ------------------------------------------- + +;; If we write a bad API endpoint and return a channel but never write to it, the request should be canceled after +;; `absolute-max-keepalive-ms` +(expect + {:message "No response after waiting 500 ms. Canceling request." + :type "class java.util.concurrent.TimeoutException" + :stacktrace true} + (with-redefs [async-response/absolute-max-keepalive-ms 500] + (tu.async/with-chans [input-chan] + (with-response [{:keys [os os-closed-chan]} input-chan] + (wait-for-close os-closed-chan) + (os->response os))))) + +;; input chan should get closed +(expect + (with-redefs [async-response/absolute-max-keepalive-ms 500] + (tu.async/with-chans [input-chan] + (with-response [_ input-chan] + (wait-for-close input-chan))))) + +;; output chan should get closed +(expect + (with-redefs [async-response/absolute-max-keepalive-ms 500] + (tu.async/with-chans [input-chan] + (with-response [{:keys [output-chan]} input-chan] + (wait-for-close output-chan))))) diff --git a/test/metabase/async/semaphore_channel_test.clj b/test/metabase/async/semaphore_channel_test.clj new file mode 100644 index 0000000000000000000000000000000000000000..ffab725ad0bddbd649995714e1f2d91dd3de8895 --- /dev/null +++ b/test/metabase/async/semaphore_channel_test.clj @@ -0,0 +1,106 @@ +(ns metabase.async.semaphore-channel-test + (:require [clojure.core.async :as a] + [expectations :refer [expect]] + [metabase.async.semaphore-channel :as semaphore-channel] + [metabase.test.util.async :as tu.async]) + (:import java.io.Closeable)) + +(defn- get-permits [semaphore-chan n] + (loop [acc [], n n] + (if-not (pos? n) + acc + (let [[permit] (a/alts!! [semaphore-chan (a/timeout 100)])] + (assert permit) + (recur (conj acc permit) (dec n)))))) + +;; check that a semaphore channel only gives out the correct number of permits +(expect + nil + (tu.async/with-open-channels [semaphore-chan (semaphore-channel/semaphore-channel 3)] + (let [_ (get-permits semaphore-chan 3)] + (first (a/alts!! [semaphore-chan (a/timeout 100)]))))) + +;; check that when a permit is returned, whoever was waiting will get their permit +(expect + "Permit #4" + (tu.async/with-open-channels [semaphore-chan (semaphore-channel/semaphore-channel 3)] + (let [[^Closeable permit-1] (get-permits semaphore-chan 3)] + (.close permit-1) + (some-> (first (a/alts!! [semaphore-chan (a/timeout 100)])) str)))) + +;; if we are true knuckleheads and *lose* a permit it should eventually get garbage collected and returned to the pool +(expect + "Permit #4" + (tu.async/with-open-channels [semaphore-chan (semaphore-channel/semaphore-channel 3)] + (get-permits semaphore-chan 3) + (loop [tries 10] + (System/gc) + (or + (some-> (a/alts!! [semaphore-chan (a/timeout 200)]) first str) + (when (pos? tries) + (recur (dec tries))))))) + + +;;; ------------------------------------------- do-after-receiving-permit -------------------------------------------- + +;; If we already have a permit, code should be smart enough to skip getting another one +(expect + {:first-permit "Permit #1", :second-permit "Permit #1", :same? true} + (tu.async/with-open-channels [semaphore-chan (semaphore-channel/semaphore-channel 1) + output-chan (a/chan 1)] + (let [existing-permit #(get @#'semaphore-channel/*permits* semaphore-chan)] + (semaphore-channel/do-after-receiving-permit semaphore-chan + (fn [] + (let [first-permit (existing-permit)] + (semaphore-channel/do-after-receiving-permit semaphore-chan + (fn [] + (let [second-permit (existing-permit)] + (a/>!! output-chan {:first-permit (str first-permit) + :second-permit (str second-permit) + :same? (identical? first-permit second-permit)})))))))) + (first (a/alts!! [output-chan (a/timeout 100)])))) + +;; Make sure `do-f-with-permit` returns the permit when functions finish normally +(expect + {:permit-returned? true, :result ::value} + (let [open? (atom false) + permit (reify + Closeable + (close [this] + (reset! open? false)))] + (tu.async/with-open-channels [output-chan (a/chan 1)] + (#'semaphore-channel/do-f-with-permit permit output-chan (constantly ::value)) + (let [[result] (a/alts!! [output-chan (a/timeout 100)])] + {:permit-returned? (not @open?), :result result})))) + +;; If `f` throws an Exception, `permit` should get returned, and Exception should get returned as the result +(expect + {:permit-returned? true, :result "FAIL"} + (let [open? (atom false) + permit (reify + Closeable + (close [this] + (reset! open? false)))] + (tu.async/with-open-channels [output-chan (a/chan 1)] + (#'semaphore-channel/do-f-with-permit permit output-chan (fn [] + (throw (Exception. "FAIL")))) + (let [[result] (a/alts!! [output-chan (a/timeout 100)])] + {:permit-returned? (not @open?), :result (when (instance? Exception result) + (.getMessage ^Exception result))})))) + +;; If `output-chan` is closed early, permit should still get returned, but there's nowhere to write the result to so +;; it should be `nil` +(expect + {:permit-returned? true, :result nil} + (let [open? (atom false) + permit (reify + Closeable + (close [this] + (reset! open? false)))] + (tu.async/with-open-channels [output-chan (a/chan 1)] + (#'semaphore-channel/do-f-with-permit permit output-chan (fn [] + (Thread/sleep 100) + ::value)) + (a/close! output-chan) + (let [[result] (a/alts!! [output-chan (a/timeout 500)])] + {:permit-returned? (not @open?), :result result})))) diff --git a/test/metabase/async/util_test.clj b/test/metabase/async/util_test.clj new file mode 100644 index 0000000000000000000000000000000000000000..4ea34c2dc4850ab60da25152acb953062708a4f5 --- /dev/null +++ b/test/metabase/async/util_test.clj @@ -0,0 +1,118 @@ +(ns metabase.async.util-test + (:require [clojure.core.async :as a] + [expectations :refer [expect]] + [metabase.async.util :as async.u] + [metabase.test.util.async :as tu.async])) + +;;; ----------------------------------------------- single-value-pipe ------------------------------------------------ + +;; make sure `single-value-pipe` pipes a value from in-chan to out-chan +(expect + ::value + (tu.async/with-open-channels [in-chan (a/chan 1) + out-chan (a/chan 1)] + (async.u/single-value-pipe in-chan out-chan) + (a/>!! in-chan ::value) + (first (a/alts!! [out-chan (a/timeout 1000)])))) + +;; `single-value-pipe` should close input-chan if output-chan is closed +(expect + (tu.async/with-open-channels [in-chan (a/chan 1) + out-chan (a/chan 1)] + (async.u/single-value-pipe in-chan out-chan) + (a/close! out-chan) + (tu.async/wait-for-close in-chan 100))) + +;; `single-value-pipe` should close output-chan if input-chan is closed +(expect + (tu.async/with-open-channels [in-chan (a/chan 1) + out-chan (a/chan 1)] + (async.u/single-value-pipe in-chan out-chan) + (a/close! in-chan) + (tu.async/wait-for-close out-chan 100))) + +;; `single-value-pipe` should return a `canceled-chan` you can listen to to see whether either channel closes early +(expect + ::async.u/canceled + (tu.async/with-open-channels [in-chan (a/chan 1) + out-chan (a/chan 1)] + (let [canceled-chan (async.u/single-value-pipe in-chan out-chan)] + (a/close! in-chan) + (first (a/alts!! [canceled-chan (a/timeout 1000)]))))) + +(expect + ::async.u/canceled + (tu.async/with-open-channels [in-chan (a/chan 1) + out-chan (a/chan 1)] + (let [canceled-chan (async.u/single-value-pipe in-chan out-chan)] + (a/close! out-chan) + (first (a/alts!! [canceled-chan (a/timeout 1000)]))))) + +;; if things proceed normally the `canceled-chan` should close with no result +(expect + (tu.async/with-open-channels [in-chan (a/chan 1) + out-chan (a/chan 1)] + (let [canceled-chan (async.u/single-value-pipe in-chan out-chan)] + (a/>!! in-chan :ok) + (tu.async/wait-for-close canceled-chan 100)))) + +;; if you are a knucklehead and write to out-chan it should cancel things +(expect + ::async.u/canceled + (tu.async/with-open-channels [in-chan (a/chan 1) + out-chan (a/chan 1)] + (let [canceled-chan (async.u/single-value-pipe in-chan out-chan)] + (a/>!! out-chan "Oops") + (first (a/alts!! [canceled-chan (a/timeout 1000)]))))) + +;; can we combine multiple single value pipes? +(expect + ::value + (tu.async/with-open-channels [in-chan (a/chan 1) + out-chan-1 (a/chan 1) + out-chan-2 (a/chan 1)] + (async.u/single-value-pipe in-chan out-chan-1) + (async.u/single-value-pipe out-chan-1 out-chan-2) + (a/>!! in-chan ::value) + (first (a/alts!! [out-chan-2 (a/timeout 1000)])))) + + +;;; --------------------------------------------- do-on-separate-thread ---------------------------------------------- + +;; Make sure `do-on-separate-thread` can actually run a function correctly +(expect + ::success + (tu.async/with-open-channels [result-chan (async.u/do-on-separate-thread (fn [] + (Thread/sleep 100) + ::success))] + (first (a/alts!! [result-chan (a/timeout 500)])))) + +;; when you close the result channel of `do-on-separate-thread,` it should cancel the future that's running it. This +;; will produce an InterruptedException +(expect + InterruptedException + (tu.async/with-open-channels [started-chan (a/chan 1) + finished-chan (a/chan 1)] + (let [f (fn [] + (try + (a/>!! started-chan ::started) + (Thread/sleep 5000) + (a/>!! finished-chan ::finished) + (catch Throwable e + (a/>!! finished-chan e)))) + result-chan (async.u/do-on-separate-thread f)] + ;; wait for `f` to actually start running before we kill it. Otherwise it may not get started at all + (a/go + (a/alts!! [started-chan (a/timeout 1000)]) + (a/close! result-chan)) + (first (a/alts!! [finished-chan (a/timeout 1000)]))))) + +;; We should be able to combine the `single-value-pipe` and `do-on-separate-thread` and get results +(expect + ::success + (let [f (fn [] + (Thread/sleep 100) + ::success)] + (tu.async/with-open-channels [result-chan (a/chan 1)] + (let [canceled-chan (async.u/single-value-pipe (async.u/do-on-separate-thread f) result-chan)] + (first (a/alts!! [canceled-chan result-chan (a/timeout 500)])))))) diff --git a/test/metabase/automagic_dashboards/core_test.clj b/test/metabase/automagic_dashboards/core_test.clj index 62840668b83bf8ec35fada26047e606c68d881cc..1b122460f64666ba048bf8c3439d5121ffce8779 100644 --- a/test/metabase/automagic_dashboards/core_test.clj +++ b/test/metabase/automagic_dashboards/core_test.clj @@ -2,8 +2,8 @@ (:require [clj-time [core :as t] [format :as t.format]] + [clojure.core.async :as a] [expectations :refer :all] - [metabase.api.card :as card.api] [metabase.automagic-dashboards [core :as magic :refer :all] [rules :as rules]] @@ -17,6 +17,7 @@ [permissions-group :as perms-group] [query :as query] [table :as table :refer [Table]]] + [metabase.query-processor.async :as qp.async] [metabase.test [automagic-dashboards :refer :all] [data :as data] @@ -146,6 +147,12 @@ (perms/grant-collection-readwrite-permissions! (perms-group/all-users) collection-id) (-> card-id Card test-automagic-analysis)))))) +(defn- result-metadata-for-query [query] + (first + (a/alts!! + [(qp.async/result-metadata-for-query-async query) + (a/timeout 1000)]))) + (expect (tu/with-non-admin-groups-no-root-collection-perms (let [source-query {:query {:source-table (data/id :venues)} @@ -155,7 +162,7 @@ Card [{source-id :id} {:table_id (data/id :venues) :collection_id collection-id :dataset_query source-query - :result_metadata (with-rasta (#'card.api/result-metadata-for-query source-query))}] + :result_metadata (with-rasta (result-metadata-for-query source-query))}] Card [{card-id :id} {:table_id (data/id :venues) :collection_id collection-id :dataset_query {:query {:filter [:> [:field-literal "PRICE" "type/Number"] 10] @@ -190,7 +197,7 @@ Card [{source-id :id} {:table_id nil :collection_id collection-id :dataset_query source-query - :result_metadata (with-rasta (#'card.api/result-metadata-for-query source-query))}] + :result_metadata (with-rasta (result-metadata-for-query source-query))}] Card [{card-id :id} {:table_id nil :collection_id collection-id :dataset_query {:query {:filter [:> [:field-literal "PRICE" "type/Number"] 10] diff --git a/test/metabase/query_processor/async_test.clj b/test/metabase/query_processor/async_test.clj new file mode 100644 index 0000000000000000000000000000000000000000..85247ea865b145d5cec6f7fd327e4c7cf1be1f8e --- /dev/null +++ b/test/metabase/query_processor/async_test.clj @@ -0,0 +1,44 @@ +(ns metabase.query-processor.async-test + (:require [clojure.core.async :as a] + [expectations :refer [expect]] + [metabase.query-processor :as qp] + [metabase.query-processor.async :as qp.async] + [metabase.test.data :as data] + [metabase.test.util.async :as tu.async] + [metabase.util.encryption :as encrypt])) + +;; running a query async should give you the same results as running that query synchronously +(let [query + {:database (data/id) + :type :query + :query {:source-table (data/id :venues) + :fields [[:field-id (data/id :venues :name)]] + :limit 5}} + ;; Metadata checksum might be encrypted if a encryption key is set on this system (to make it hard for bad + ;; actors to forge one) in which case the checksums won't be equal. + maybe-decrypt-checksum + #(some-> % (update-in [:data :results_metadata :checksum] encrypt/maybe-decrypt))] + (expect + (maybe-decrypt-checksum + (qp/process-query query)) + (maybe-decrypt-checksum + (tu.async/with-open-channels [result-chan (qp.async/process-query query)] + (first (a/alts!! [result-chan (a/timeout 1000)])))))) + +(expect + [{:name "NAME" + :display_name "Name" + :base_type :type/Text + :special_type :type/Name + :fingerprint {:global {:distinct-count 100, :nil% 0.0}, + :type #:type {:Text + {:percent-json 0.0, + :percent-url 0.0, + :percent-email 0.0, + :average-length 15.63}}}}] + (tu.async/with-open-channels [result-chan (qp.async/result-metadata-for-query-async + {:database (data/id) + :type :query + :query {:source-table (data/id :venues) + :fields [[:field-id (data/id :venues :name)]]}})] + (first (a/alts!! [result-chan (a/timeout 1000)])))) diff --git a/test/metabase/query_processor/middleware/add_query_throttle_test.clj b/test/metabase/query_processor/middleware/add_query_throttle_test.clj deleted file mode 100644 index 9ca101888c1f6dbcbcafcd4306029ce6ccf6ecf4..0000000000000000000000000000000000000000 --- a/test/metabase/query_processor/middleware/add_query_throttle_test.clj +++ /dev/null @@ -1,136 +0,0 @@ -(ns metabase.query-processor.middleware.add-query-throttle-test - (:require [environ.core :as environ] - [expectations :refer :all] - [metabase.query-processor.middleware - [add-query-throttle :as throttle :refer :all] - [catch-exceptions :as catch-exceptions]] - [metabase.test.util :as tu] - [metabase.util :as u]) - (:import java.util.concurrent.Semaphore)) - -(defmacro ^:private with-query-wait-time-in-seconds [time-in-seconds & body] - `(with-redefs [throttle/max-query-wait-time-in-millis ~(* 1000 time-in-seconds)] - ~@body)) - -;; Check that the middleware will throw an exception and return a 503 if there are no tickets available in the -;; semaphore after waiting the timeout period -(expect - {:ex-class clojure.lang.ExceptionInfo - :msg "Max concurrent query limit reached" - :data {:status-code 503 - :type ::throttle/concurrent-query-limit-reached}} - (with-query-wait-time-in-seconds 1 - (tu/exception-and-message - (let [semaphore (Semaphore. 5)] - (.acquire semaphore 5) - ((#'throttle/throttle-queries semaphore (constantly "Should never be returned")) {}))))) - -;; The `catch-exceptions` middleware catches any query pipeline errors and reformats it as a failed query result. The -;; 503 exception here is special and should be bubbled up -(expect - {:ex-class clojure.lang.ExceptionInfo - :msg "Max concurrent query limit reached" - :data {:status-code 503 - :type ::throttle/concurrent-query-limit-reached}} - (with-query-wait-time-in-seconds 1 - (tu/exception-and-message - (let [semaphore (Semaphore. 5) - my-qp (->> identity - (#'throttle/throttle-queries semaphore) - catch-exceptions/catch-exceptions)] - (.acquire semaphore 5) - (my-qp {:my "query"}))))) - -;; Test that queries are "enqueued" for the timeout period and if another slot becomes available, it is used -(expect - {:before-semaphore-release ::no-result - :after-semaphore-release {:query "map"}} - (with-query-wait-time-in-seconds 120 - (let [semaphore (Semaphore. 5) - _ (.acquire semaphore 5) - query-future (future ((#'throttle/throttle-queries semaphore identity) {:query "map"}))] - {:before-semaphore-release (deref query-future 10 ::no-result) - :after-semaphore-release (do - (.release semaphore) - (deref query-future 10000 ::no-result))}))) - -;; Test that a successful query result will return the permit to the semaphore -(expect - {:beinning-permits 5 - :before-failure-permits 4 - :query-result {:query "map"} - :after-success-permits 5} - (with-query-wait-time-in-seconds 5 - (let [semaphore (Semaphore. 5) - start-middleware-promise (promise) - finish-middleware-promise (promise) - begin-num-permits (.availablePermits semaphore) - coordinate-then-finish (fn [query-map] - (deliver start-middleware-promise true) - @finish-middleware-promise - query-map) - query-future (future - ((#'throttle/throttle-queries semaphore coordinate-then-finish) {:query "map"}))] - {:beinning-permits begin-num-permits - :before-failure-permits (do - @start-middleware-promise - (.availablePermits semaphore)) - :query-result (do - (deliver finish-middleware-promise true) - @query-future) - :after-success-permits (.availablePermits semaphore)}))) - -;; Ensure that the even if there is a failure, the permit is always released -(expect - {:beinning-permits 5 - :before-failure-permits 4 - :after-failure-permits 5} - (with-query-wait-time-in-seconds 5 - (let [semaphore (Semaphore. 5) - start-middleware-promise (promise) - finish-middleware-promise (promise) - begin-num-permits (.availablePermits semaphore) - coordinate-then-fail (fn [_] - (deliver start-middleware-promise true) - @finish-middleware-promise - (throw (Exception. "failure"))) - query-future (future - (u/ignore-exceptions - ((#'throttle/throttle-queries semaphore coordinate-then-fail) {:query "map"})))] - {:beinning-permits begin-num-permits - :before-failure-permits (do - @start-middleware-promise - (.availablePermits semaphore)) - :after-failure-permits (do - (deliver finish-middleware-promise true) - @query-future - (.availablePermits semaphore))}))) - -;; Test the function that adds the middleware only when MB_ENABLE_QUERY_THROTTLE is set to true - -(defmacro ^:private with-query-throttle-value [enable-query-thottle-str & body] - `(with-redefs [environ/env {:mb-enable-query-throttle ~enable-query-thottle-str}] - ~@body)) - -;; By default the query throttle should not be applied -(expect - #'identity - (tu/throw-if-called throttle/throttle-queries - (with-query-throttle-value nil - (throttle/maybe-add-query-throttle #'identity)))) - -;; The query throttle should not be applied if MB_ENABLE_QUERY_THROTTLE is false -(expect - #'identity - (tu/throw-if-called throttle/throttle-queries - (with-query-throttle-value "false" - (throttle/maybe-add-query-throttle #'identity)))) - -;; The query throttle should be applied if MB_ENABLE_QUERY_THROTTLE is true -(expect - (let [called? (atom false)] - (with-redefs [throttle/throttle-queries (fn [& args] - (reset! called? true))] - (with-query-throttle-value "true" - (throttle/maybe-add-query-throttle #'identity) - @called?)))) diff --git a/test/metabase/query_processor/middleware/results_metadata_test.clj b/test/metabase/query_processor/middleware/results_metadata_test.clj index e9b55f6d84348deaa3c2fc20f82e4c39ad2a4340..6cde4840d4d2f96715bcc7650fbf27388e5cee38 100644 --- a/test/metabase/query_processor/middleware/results_metadata_test.clj +++ b/test/metabase/query_processor/middleware/results_metadata_test.clj @@ -128,6 +128,11 @@ (with-redefs [encrypt/default-secret-key nil] (#'results-metadata/metadata-checksum metadata))) +;; metadata-checksum should be the same every time +(expect + (metadata-checksum example-metadata) + (metadata-checksum example-metadata)) + ;; tests that the checksum is consistent when an array-map is switched to a hash-map (expect (metadata-checksum example-metadata) diff --git a/test/metabase/server_test.clj b/test/metabase/server_test.clj index 82ac6023321035743aadf5fa897505e76602b218..ba5a9b4445064bf920da71f2287212652fa2acff 100644 --- a/test/metabase/server_test.clj +++ b/test/metabase/server_test.clj @@ -12,7 +12,6 @@ :min-threads 10 :host "10" :daemon? false - :async? true :ssl? true :trust-password "10" :key-password "10" diff --git a/test/metabase/test/util/async.clj b/test/metabase/test/util/async.clj new file mode 100644 index 0000000000000000000000000000000000000000..c3d413d9a909b6854707b0d1f772f69f8a5778f7 --- /dev/null +++ b/test/metabase/test/util/async.clj @@ -0,0 +1,51 @@ +(ns metabase.test.util.async + (:require [clojure.core.async :as a]) + (:import java.util.concurrent.TimeoutException)) + +(defn wait-for-close + "Wait up to `timeout-ms` for `chan` to be closed, and returns `true` once it is; otherwise throws an Exception if + channel is not closed by the timeout or unexpectedly returns a result." + [chan timeout-ms] + (let [[result first-to-finish] (a/alts!! [chan (a/timeout timeout-ms)])] + (cond + (and (= result nil) + (= first-to-finish chan)) + true + + (= result nil) + (throw (TimeoutException. "Timed out.")) + + :else + (throw (ex-info "Waiting for channel to close, but got unexpected result" + {:result result}))))) + +(defmacro with-open-channels + "Like `with-open`, but closes core.async channels at the conclusion of `body`." + [[binding chan & more] & body] + {:pre [binding chan]} + `(let [chan# ~chan + ~binding chan#] + (try + ~(if (seq more) + `(with-open-channels ~more ~@body) + `(do ~@body)) + (finally + (a/close! chan#))))) + + +(defmacro with-chans + "Create core.async channels and bind them; execute body, closing out the channels in a `finally` block. Useful for + writing tests where you don't want to accidentally leave things open if something goes wrong. + + ;; Specifying definition is optional; defaults to `(a/chan 1)` + (with-chans [my-chan] + + ;; specify multiple chans + (with-chans [chan-1 (a/chan 1) + chan-2 (a/chan 100)] + ...) " + [[chan-binding chan & more] & body] + `(with-open-channels [~chan-binding ~(or chan `(a/chan 1))] + ~(if (seq more) + `(with-chans ~more ~@body) + `(do ~@body))))