Skip to content
Snippets Groups Projects
Unverified Commit 60f3c439 authored by Cam Saul's avatar Cam Saul Committed by GitHub
Browse files

Merge pull request #9704 from metabase/fix-async-keepalive

Fix bug where async keepalive characters would not always be written
parents 6d6329b7 72f3b9a8
No related branches found
No related tags found
No related merge requests found
...@@ -15,6 +15,7 @@ log4j.appender.file.layout=org.apache.log4j.PatternLayout ...@@ -15,6 +15,7 @@ log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d [%t] %-5p%c - %m%n log4j.appender.file.layout.ConversionPattern=%d [%t] %-5p%c - %m%n
# customizations to logging by package # customizations to logging by package
log4j.logger.metabase.driver=INFO log4j.logger.metabase.driver=INFO
log4j.logger.metabase.plugins=DEBUG log4j.logger.metabase.plugins=DEBUG
log4j.logger.metabase.middleware=DEBUG log4j.logger.metabase.middleware=DEBUG
...@@ -23,12 +24,15 @@ log4j.logger.metabase.query-processor.permissions=INFO ...@@ -23,12 +24,15 @@ log4j.logger.metabase.query-processor.permissions=INFO
log4j.logger.metabase.query-processor=INFO log4j.logger.metabase.query-processor=INFO
log4j.logger.metabase.sync=DEBUG log4j.logger.metabase.sync=DEBUG
log4j.logger.metabase.models.field-values=INFO log4j.logger.metabase.models.field-values=INFO
# NOCOMMIT
# TODO - we can dial these back a bit once we are satisfied the async stuff isn't so new (0.33.0+)
log4j.logger.metabase.async.api-response=DEBUG
log4j.logger.metabase.async.semaphore-channel=DEBUG log4j.logger.metabase.async.semaphore-channel=DEBUG
log4j.logger.metabase.async.util=DEBUG log4j.logger.metabase.async.util=DEBUG
log4j.logger.metabase.middleware.async=DEBUG log4j.logger.metabase.middleware.async=DEBUG
log4j.logger.metabase.query-processor.async=DEBUG log4j.logger.metabase.query-processor.async=DEBUG
log4j.logger.metabase.async.api-response=DEBUG
log4j.logger.metabase=INFO log4j.logger.metabase=INFO
# c3p0 connection pools tend to log useless warnings way too often; only log actual errors # c3p0 connection pools tend to log useless warnings way too often; only log actual errors
log4j.logger.com.mchange=ERROR log4j.logger.com.mchange=ERROR
...@@ -217,14 +217,14 @@ ...@@ -217,14 +217,14 @@
:collection_id collection_id :collection_id collection_id
:collection_position collection_position} :collection_position collection_position}
dashboard (db/transaction dashboard (db/transaction
;; Adding a new dashboard at `collection_position` could cause other dashboards in this collection to change ;; Adding a new dashboard at `collection_position` could cause other dashboards in this
;; position, check that and fix up if needed ;; collection to change position, check that and fix up if needed
(api/maybe-reconcile-collection-position! dashboard-data) (api/maybe-reconcile-collection-position! dashboard-data)
;; Ok, now save the Dashboard ;; Ok, now save the Dashboard
(u/prog1 (db/insert! Dashboard dashboard-data) (u/prog1 (db/insert! Dashboard dashboard-data)
;; Get cards from existing dashboard and associate to copied dashboard ;; Get cards from existing dashboard and associate to copied dashboard
(doseq [card (:ordered_cards existing-dashboard)] (doseq [card (:ordered_cards existing-dashboard)]
(api/check-500 (dashboard/add-dashcard! <> (:card_id card) card)))))] (api/check-500 (dashboard/add-dashcard! <> (:card_id card) card)))))]
(events/publish-event! :dashboard-create dashboard))) (events/publish-event! :dashboard-create dashboard)))
...@@ -233,13 +233,7 @@ ...@@ -233,13 +233,7 @@
(api/defendpoint GET "/:id" (api/defendpoint GET "/:id"
"Get `Dashboard` with ID." "Get `Dashboard` with ID."
[id] [id]
(u/prog1 (-> (Dashboard id) (u/prog1 (get-dashboard id)
api/check-404
(hydrate [:ordered_cards :card :series] :can_write)
api/read-check
api/check-not-archived
hide-unreadable-cards
add-query-average-durations)
(events/publish-event! :dashboard-read (assoc <> :actor_id api/*current-user-id*)))) (events/publish-event! :dashboard-read (assoc <> :actor_id api/*current-user-id*))))
......
...@@ -47,7 +47,7 @@ ...@@ -47,7 +47,7 @@
(let [source-card-id (query->source-card-id query) (let [source-card-id (query->source-card-id query)
options {:executed-by api/*current-user-id*, :context :ad-hoc, options {:executed-by api/*current-user-id*, :context :ad-hoc,
:card-id source-card-id, :nested? (boolean source-card-id)}] :card-id source-card-id, :nested? (boolean source-card-id)}]
(qp.async/process-query-and-save-with-max! query options))) (qp.async/process-query-and-save-with-max-results-constraints! query options)))
;;; ----------------------------------- Downloading Query Results in Other Formats ----------------------------------- ;;; ----------------------------------- Downloading Query Results in Other Formats -----------------------------------
......
(ns metabase.async.api-response (ns metabase.async.api-response
"Handle ring response maps that contain a core.async chan in the :body key:
{:status 200
:body (a/chan)}
and send strings (presumibly \n) as heartbeats to the client until the real results (a seq) is received, then stream
that to the client."
(:require [cheshire.core :as json] (:require [cheshire.core :as json]
[clojure.core.async :as a] [clojure.core.async :as a]
[clojure.java.io :as io] [clojure.java.io :as io]
...@@ -19,6 +26,7 @@ ...@@ -19,6 +26,7 @@
(def ^:private keepalive-interval-ms (def ^:private keepalive-interval-ms
"Interval between sending newline characters to keep Heroku from terminating requests like queries that take a long "Interval between sending newline characters to keep Heroku from terminating requests like queries that take a long
time to complete." time to complete."
;; 1 second
(* 1 1000)) (* 1 1000))
(def ^:private absolute-max-keepalive-ms (def ^:private absolute-max-keepalive-ms
...@@ -29,18 +37,16 @@ ...@@ -29,18 +37,16 @@
;; 4 hours ;; 4 hours
(* 4 60 60 1000)) (* 4 60 60 1000))
;; Handle ring response maps that contain a core.async chan in the :body key:
;; ;;; +----------------------------------------------------------------------------------------------------------------+
;; {:status 200 ;;; | Writing Results of Async Keep-alive Channel |
;; :body (a/chan)} ;;; +----------------------------------------------------------------------------------------------------------------+
;;
;; and send strings (presumibly \n) as heartbeats to the client until the real results (a seq) is received, then (defn- write-keepalive-character! [^Writer out]
;; stream that to the client
(defn- write-keepalive-character [^Writer out]
(try (try
;; a newline padding character as it's harmless and will allow us to check if the client ;; a newline padding character as it's harmless and will allow us to check if the client is connected. If sending
;; is connected. If sending this character fails because the connection is closed, the ;; this character fails because the connection is closed, the chan will then close. Newlines are no-ops when
;; chan will then close. Newlines are no-ops when reading JSON which this depends upon. ;; reading JSON which this depends upon.
(.write out (str \newline)) (.write out (str \newline))
(.flush out) (.flush out)
true true
...@@ -52,7 +58,7 @@ ...@@ -52,7 +58,7 @@
false))) false)))
;; `chunkk` named as such to avoid conflict with `clojure.core/chunk` ;; `chunkk` named as such to avoid conflict with `clojure.core/chunk`
(defn- write-response-chunk [chunkk, ^Writer out] (defn- write-response-chunk! [chunkk, ^Writer out]
(cond (cond
;; An error has occurred, let the user know ;; An error has occurred, let the user know
(instance? Throwable chunkk) (instance? Throwable chunkk)
...@@ -65,12 +71,16 @@ ...@@ -65,12 +71,16 @@
:else :else
(log/error (trs "Unexpected output in async API response") (class chunkk)))) (log/error (trs "Unexpected output in async API response") (class chunkk))))
(defn- write-channel-to-output-stream [chan, ^Writer out] (defn- write-chan-vals-to-writer!
"Write whatever val(s) come into `chan` onto the Writer wrapping our OutputStream. Vals should be either
`::keepalive`, meaning we should write a keepalive newline character to the Writer, or some other value, which is
the actual response we've been waiting for (at this point we can close both the Writer and the channel)."
[chan, ^Writer out]
(a/go-loop [chunkk (a/<! chan)] (a/go-loop [chunkk (a/<! chan)]
(cond (cond
(= chunkk ::keepalive)
;; keepalive chunkk ;; keepalive chunkk
(if (write-keepalive-character out) (= chunkk ::keepalive)
(if (write-keepalive-character! out)
(recur (a/<! chan)) (recur (a/<! chan))
(do (do
(a/close! chan) (a/close! chan)
...@@ -86,7 +96,7 @@ ...@@ -86,7 +96,7 @@
(future (future
(try (try
;; chunkk *might* be `nil` if the channel already go closed. ;; chunkk *might* be `nil` if the channel already go closed.
(write-response-chunk chunkk out) (write-response-chunk! chunkk out)
(finally (finally
;; should already be closed, but just to be safe ;; should already be closed, but just to be safe
(a/close! chan) (a/close! chan)
...@@ -94,15 +104,12 @@ ...@@ -94,15 +104,12 @@
(.close out)))))) (.close out))))))
nil) nil)
;;; +----------------------------------------------------------------------------------------------------------------+
(extend-protocol ring.protocols/StreamableResponseBody ;;; | Async Keep-alive Channel |
ManyToManyChannel ;;; +----------------------------------------------------------------------------------------------------------------+
(write-body-to-stream [chan _ ^OutputStream output-stream]
(log/debug (u/format-color 'green (trs "starting streaming response")))
(write-channel-to-output-stream chan (io/writer output-stream))))
(defn- start-async-keepalive-loop (defn- start-async-keepalive-loop!
"Starts a go-loop that will send `::keepalive` messages to `output-chan` every second until `input-chan` either "Starts a go-loop that will send `::keepalive` messages to `output-chan` every second until `input-chan` either
produces a response or one of the two channels is closed. If `output-chan` is closed (because there's no longer produces a response or one of the two channels is closed. If `output-chan` is closed (because there's no longer
anywhere to write to -- the connection was canceled), closes `input-chan`; this can and is used by producers such as anywhere to write to -- the connection was canceled), closes `input-chan`; this can and is used by producers such as
...@@ -155,19 +162,32 @@ ...@@ -155,19 +162,32 @@
(a/close! output-chan) (a/close! output-chan)
(a/close! input-chan)))))))) (a/close! input-chan))))))))
(defn- async-keepalive-chan [input-chan] (defn- async-keepalive-channel
"Given a core.async channel `input-chan` which will (presumably) eventually receive an asynchronous result, return a
new channel 'wrapping' the original that will write keepalive bytes until the actual result is obtained."
[input-chan]
;; Output chan only needs to hold on to the last message it got, for example no point in writing multiple `\n` ;; Output chan only needs to hold on to the last message it got, for example no point in writing multiple `\n`
;; characters if the consumer didn't get a chance to consume them, and no point writing `\n` before writing the ;; characters if the consumer didn't get a chance to consume them, and no point writing `\n` before writing the
;; actual response ;; actual response
(let [output-chan (a/chan (a/sliding-buffer 1))] (u/prog1 (a/chan (a/sliding-buffer 1))
(start-async-keepalive-loop input-chan output-chan) (start-async-keepalive-loop! input-chan <>)))
output-chan))
;;; +----------------------------------------------------------------------------------------------------------------+
;;; | Telling Ring & Compojure how to handle core.async channel API responses |
;;; +----------------------------------------------------------------------------------------------------------------+
;; Synchronous Compojure endpoint (e.g. `defendpoint`) responses go directly to here. Async endpoint
;; (`defendpoint-async`) responses go to Sendable and then to here. So technically this affects both sync & async.
(defn- async-keepalive-response [input-chan] (extend-protocol ring.protocols/StreamableResponseBody
(assoc (response/response (async-keepalive-chan input-chan)) ManyToManyChannel
:content-type "applicaton/json; charset=utf-8")) (write-body-to-stream [chan _ ^OutputStream output-stream]
(log/debug (u/format-color 'green (trs "starting streaming response")))
(write-chan-vals-to-writer! (async-keepalive-channel chan) (io/writer output-stream))))
(extend-protocol Sendable (extend-protocol Sendable
ManyToManyChannel ManyToManyChannel
(send* [input-chan _ respond _] (send* [input-chan _ respond _]
(respond (async-keepalive-response input-chan)))) (respond (assoc (response/response input-chan)
:content-type "applicaton/json; charset=utf-8"))))
...@@ -656,7 +656,7 @@ ...@@ -656,7 +656,7 @@
:question :question
:xlsx-download)) :xlsx-download))
;; TODO - this schema is somewhat misleading because if you use a function like `qp/process-query-and-save-with-max!` ;; TODO - this schema is somewhat misleading because if you use a function like `qp/process-query-and-save-with-max-results-constraints!`
;; some of these keys (e.g. `:context`) are in fact required ;; some of these keys (e.g. `:context`) are in fact required
(def Info (def Info
"Schema for query `:info` dictionary, which is used for informational purposes to record information about how a query "Schema for query `:info` dictionary, which is used for informational purposes to record information about how a query
......
...@@ -62,29 +62,6 @@ ...@@ -62,29 +62,6 @@
(respond ring.json/default-malformed-response)) (respond ring.json/default-malformed-response))
(handler request respond raise)))) (handler request respond raise))))
#_(defn check-application-type-headers
"We don't support API requests with any type of content encoding other than JSON so let's be nice and make that
explicit. Added benefit is that it reduces CSRF surface because POSTing a form with JSON content encoding isn't so
easy to do."
[handler]
(fn
[{:keys [request-method body], {:strs [content-type]} :headers, :as request} respond raise]
;; GET or DELETE requests with no body we can go ahead and proceed without Content-Type headers, since they
;; generally don't have bodies.
;;
;; POST/PUT requests always require Content-Type: application/json. GET/DELETE requests that specify any other
;; content type aren't allowed.
(if (or (and (#{:get :delete} request-method)
(nil? content-type))
(#'ring.json/json-request? request))
(handler request respond raise)
(respond
{:status 400
:headers {"Content-Type" "text/plain"}
:body (str (tru "Metabase only supports JSON requests.")
" "
(tru "Make sure you set a 'Content-Type: application/json' header."))}))))
;;; +----------------------------------------------------------------------------------------------------------------+ ;;; +----------------------------------------------------------------------------------------------------------------+
;;; | Streaming JSON Responses | ;;; | Streaming JSON Responses |
......
...@@ -33,10 +33,11 @@ ...@@ -33,10 +33,11 @@
(when-let [card (Card :id card-id, :archived false)] (when-let [card (Card :id card-id, :archived false)]
(let [{:keys [creator_id dataset_query]} card] (let [{:keys [creator_id dataset_query]} card]
{:card card {:card card
:result (qp/process-query-and-save-with-max! dataset_query (merge {:executed-by creator_id, :result (qp/process-query-and-save-with-max-results-constraints! dataset_query
:context :pulse, (merge {:executed-by creator_id,
:card-id card-id} :context :pulse,
options))})) :card-id card-id}
options))}))
(catch Throwable t (catch Throwable t
(log/warn t (trs "Error running query for Card {0}" card-id))))) (log/warn t (trs "Error running query for Card {0}" card-id)))))
......
...@@ -395,7 +395,7 @@ ...@@ -395,7 +395,7 @@
{:max-results-bare-rows max-results}) {:max-results-bare-rows max-results})
m)) m))
(s/defn process-query-and-save-with-max! (s/defn process-query-and-save-with-max-results-constraints!
"Same as `process-query-and-save-execution!` but will include the default max rows returned as a constraint. (This "Same as `process-query-and-save-execution!` but will include the default max rows returned as a constraint. (This
function is ulitmately what powers most API endpoints that run queries, including `POST /api/dataset`.)" function is ulitmately what powers most API endpoints that run queries, including `POST /api/dataset`.)"
{:style/indent 1} {:style/indent 1}
......
...@@ -70,12 +70,12 @@ ...@@ -70,12 +70,12 @@
[query options] [query options]
(do-async (:database query) qp/process-query-and-save-execution! query options)) (do-async (:database query) qp/process-query-and-save-execution! query options))
(defn process-query-and-save-with-max! (defn process-query-and-save-with-max-results-constraints!
"Async version of `metabase.query-processor/process-query-and-save-with-max!`. Runs query asynchronously, and returns "Async version of `metabase.query-processor/process-query-and-save-with-max-results-constraints!`. Runs query
a `core.async` channel that can be used to fetch the results once the query finishes running. Closing the channel asynchronously, and returns a `core.async` channel that can be used to fetch the results once the query finishes
will cancel the query." running. Closing the channel will cancel the query."
[query options] [query options]
(do-async (:database query) qp/process-query-and-save-with-max! query options)) (do-async (:database query) qp/process-query-and-save-with-max-results-constraints! query options))
(defn process-query-without-save! (defn process-query-without-save!
"Async version of `metabase.query-processor/process-query-without-save!`. Runs query asynchronously, and returns a "Async version of `metabase.query-processor/process-query-without-save!`. Runs query asynchronously, and returns a
......
...@@ -56,9 +56,13 @@ ...@@ -56,9 +56,13 @@
(.setHandler (#'ring-jetty/async-proxy-handler (.setHandler (#'ring-jetty/async-proxy-handler
handler handler
;; if any API endpoint functions aren't at the very least returning a channel to fetch the results ;; if any API endpoint functions aren't at the very least returning a channel to fetch the results
;; later after 30 seconds we're in serious trouble. Kill the request. ;; later after 10 minutes we're in serious trouble. (Almost everything 'slow' should be returning a
;; channel before then, but some things like CSV downloads don't currently return channels at this
;; time)
;;
;; TODO - I suppose the default value should be moved to the `metabase.config` namespace?
(or (config/config-int :mb-jetty-async-response-timeout) (or (config/config-int :mb-jetty-async-response-timeout)
(* 30 1000)))))) (* 10 60 1000))))))
(defn start-web-server! (defn start-web-server!
"Start the embedded Jetty web server. Returns `:started` if a new server was started; `nil` if there was already a "Start the embedded Jetty web server. Returns `:started` if a new server was started; `nil` if there was already a
......
(ns metabase.async.api-response-test (ns metabase.async.api-response-test
(:require [cheshire.core :as json] (:require [cheshire.core :as json]
[clj-http.client :as client]
[clojure.core.async :as a] [clojure.core.async :as a]
[compojure.core :as compojure]
[expectations :refer [expect]] [expectations :refer [expect]]
[metabase
[server :as server]
[util :as u]]
[metabase.async.api-response :as async-response] [metabase.async.api-response :as async-response]
[metabase.test.util.async :as tu.async] [metabase.test.util.async :as tu.async]
[ring.core.protocols :as ring.protocols]) [ring.core.protocols :as ring.protocols])
...@@ -13,7 +18,7 @@ ...@@ -13,7 +18,7 @@
;;; +----------------------------------------------------------------------------------------------------------------+ ;;; +----------------------------------------------------------------------------------------------------------------+
;;; | New Tests | ;;; | Tests to make sure channels do the right thing |
;;; +----------------------------------------------------------------------------------------------------------------+ ;;; +----------------------------------------------------------------------------------------------------------------+
(defn- do-with-response [input-chan f] (defn- do-with-response [input-chan f]
...@@ -25,8 +30,16 @@ ...@@ -25,8 +30,16 @@
(a/close! os-closed-chan) (a/close! os-closed-chan)
(let [^Closeable this this] (let [^Closeable this this]
(proxy-super close))))] (proxy-super close))))]
(let [{output-chan :body, :as response} (#'async-response/async-keepalive-response input-chan)] ;; normally `write-body-to-stream` will create the `output-chan`, however we want to do it ourselves so we can
(ring.protocols/write-body-to-stream output-chan response os) ;; truly enjoy the magical output channel slash see when it gets closed. Create it now...
(let [output-chan (#'async-response/async-keepalive-channel input-chan)
response {:status 200
:headers {}
:body input-chan
:content-type "applicaton/json; charset=utf-8"}]
;; and keep it from getting [re]created.
(with-redefs [async-response/async-keepalive-channel identity]
(ring.protocols/write-body-to-stream output-chan response os))
(try (try
(f {:os os, :output-chan output-chan, :os-closed-chan os-closed-chan}) (f {:os os, :output-chan output-chan, :os-closed-chan os-closed-chan})
(finally (finally
...@@ -198,3 +211,59 @@ ...@@ -198,3 +211,59 @@
(with-response [{:keys [output-chan os-closed-chan]} input-chan] (with-response [{:keys [output-chan os-closed-chan]} input-chan]
(wait-for-close os-closed-chan) (wait-for-close os-closed-chan)
(wait-for-close output-chan))))) (wait-for-close output-chan)))))
;;; +----------------------------------------------------------------------------------------------------------------+
;;; | Tests to make sure keepalive bytes actually get written |
;;; +----------------------------------------------------------------------------------------------------------------+
(defn- do-with-temp-server [handler f]
(let [port (+ 60000 (rand-int 5000))
server (server/create-server handler {:port port})]
(try
(.start server)
(f port)
(finally
(.stop server)))))
(defmacro ^:private with-temp-server
"Spin up a Jetty server with `handler` with a random port between 60000 and 65000; bind the random port to `port`, and
execute body. Shuts down server when finished."
[[port-binding handler] & body]
`(do-with-temp-server ~handler (fn [~port-binding] ~@body)))
(defn- num-keepalive-chars-in-response
"Make a request to `handler` and count the number of newline keepalive chars in the response."
[handler]
(with-redefs [async-response/keepalive-interval-ms 50]
(with-temp-server [port handler]
(let [{response :body} (client/get (format "http://localhost:%d/" port))]
(count (re-seq #"\n" response))))))
(defn- output-chan-with-delayed-result
"Returns an output channel that receives a 'DONE' value after 400ms. "
[]
(u/prog1 (a/chan 1)
(a/go
(a/<! (a/timeout 400))
(a/>! <> "DONE"))))
;; confirm that some newlines were written as part of the response for an async API response
(defn- async-handler [_ respond _]
(respond {:status 200, :headers {"Content-Type" "text/plain"}, :body (output-chan-with-delayed-result)}))
(expect pos? (num-keepalive-chars-in-response async-handler))
;; make sure newlines are written for sync-style compojure endpoints (e.g. `defendpoint`)
(def ^:private compojure-sync-handler
(compojure/routes
(compojure/GET "/" [_] (output-chan-with-delayed-result))))
(expect pos? (num-keepalive-chars-in-response compojure-sync-handler))
;; ...and for true async compojure endpoints (e.g. `defendpoint-async`)
(def ^:private compojure-async-handler
(compojure/routes
(compojure/GET "/" [] (fn [_ respond _] (respond (output-chan-with-delayed-result))))))
(expect pos? (num-keepalive-chars-in-response compojure-async-handler))
...@@ -29,8 +29,8 @@ ...@@ -29,8 +29,8 @@
:native (native-query) :native (native-query)
:constraints {:max-results 5}}))) :constraints {:max-results 5}})))
;; does it also work when running via `process-query-and-save-with-max!`, the function that powers endpoints like ;; does it also work when running via `process-query-and-save-with-max-results-constraints!`, the function that powers
;; `POST /api/dataset`? ;; endpoints like `POST /api/dataset`?
(qp.test/expect-with-non-timeseries-dbs (qp.test/expect-with-non-timeseries-dbs
[["Red Medicine"] [["Red Medicine"]
["Stout Burgers & Beers"] ["Stout Burgers & Beers"]
...@@ -38,7 +38,7 @@ ...@@ -38,7 +38,7 @@
["Wurstküche"] ["Wurstküche"]
["Brite Spot Family Restaurant"]] ["Brite Spot Family Restaurant"]]
(qp.test/rows (qp.test/rows
(qp/process-query-and-save-with-max! (qp/process-query-and-save-with-max-results-constraints!
{:database (data/id) {:database (data/id)
:type :native :type :native
:native (native-query) :native (native-query)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment