diff --git a/src/metabase/api/common/internal.clj b/src/metabase/api/common/internal.clj index 7862acfc4d83e6f61cb8070089d3e12a3cf2afc5..50a621b0874f5db27b82ca837de2d8c365c8ef22 100644 --- a/src/metabase/api/common/internal.clj +++ b/src/metabase/api/common/internal.clj @@ -264,8 +264,7 @@ (contains? response :status) (contains? response :body)) response - {:status (if (or (instance? ManyToManyChannel response) - (instance? StreamingResponse response)) + {:status (if (some #(instance? % response) [ManyToManyChannel StreamingResponse]) 202 200) :body response})) diff --git a/src/metabase/async/streaming_response.clj b/src/metabase/async/streaming_response.clj index c74c50bfd2ed6ce7f19fc4616b8ad26c61aa5e36..8c47e49eca62fcf8ecd3339149346c5552403176 100644 --- a/src/metabase/async/streaming_response.clj +++ b/src/metabase/async/streaming_response.clj @@ -1,46 +1,78 @@ (ns metabase.async.streaming-response - "A special Ring response type that can handle async, streaming results. It writes newlines as 'heartbeats' to the - client until the real results are ready to begin streaming, then streams those to the client." (:require [cheshire.core :as json] [clojure.core.async :as a] - [clojure.tools.logging :as log] compojure.response - [metabase - [config :as config] - [util :as u]] - [metabase.util.i18n :refer [trs]] + [metabase.util :as u] [potemkin.types :as p.types] [pretty.core :as pretty] [ring.core.protocols :as ring.protocols] [ring.util.response :as ring.response]) - (:import [java.io BufferedWriter OutputStream OutputStreamWriter] + (:import [java.io BufferedWriter FilterOutputStream OutputStream OutputStreamWriter] java.nio.charset.StandardCharsets org.eclipse.jetty.io.EofException)) -;; TODO - this whole namespace seems a lot more complicated than it needs to be, simplify it. We can probably simplify -;; it by 50% or so - -;; 1. 
Change `proxy-output-stream` to something like a `keepalive-output-steam` that can encapsulate all the logic for -;; maintaining the keepalive loop -;; -;; 2. I don't remember why we need to wait for code elsewhere the output stream? It seems totally reasonable to -;; require `f` to block until it's done writing to the stream, since we have to wait anyway. The keepalive bytes are -;; already being handled on a separate thread - (def ^:private keepalive-interval-ms "Interval between sending newline characters to keep Heroku from terminating requests like queries that take a long time to complete." (u/seconds->ms 1)) ; one second -(def ^:private absolute-max-keepalive-ms - "Absolute maximum amount of time to wait for a response to return results, instead of keeping the connection open - forever. Normally we'll eventually give up when a connection is closed, but if someone keeps the connection open - forever, or if there's a bug in the API code (and `respond` is never called, or a value is never written to the - channel it returns) give up after 4 hours." - (cond - config/is-prod? (u/hours->ms 4) - config/is-dev? (u/minutes->ms 10) - config/is-test? (u/minutes->ms 1))) +(defn- jetty-eof-canceling-output-stream + "Wraps an `OutputStream` and sends a message to `canceled-chan` if a jetty `EofException` is thrown when writing to + the stream." + ^OutputStream [^OutputStream os canceled-chan] + (proxy [FilterOutputStream] [os] + (write + ([x] + (try + (if (int? x) + (.write os ^int x) + (.write os ^bytes x)) + (catch EofException e + (a/>!! canceled-chan ::cancel) + (throw e)))) + + ([^bytes ba ^Integer off ^Integer len] + (try + (.write os ba off len) + (catch EofException e + (a/>!! canceled-chan ::cancel) + (throw e))))))) + +(defn- keepalive-output-stream + "Wraps an `OutputStream` and writes keepalive newline bytes every interval until someone else starts writing to the + stream." + ^OutputStream [^OutputStream os write-keepalive-newlines?] + (let [write-newlines? 
(atom true)] + (a/go-loop [] + (a/<! (a/timeout keepalive-interval-ms)) + (when @write-newlines? + (when write-keepalive-newlines? + (.write os (byte \newline))) + (.flush os) + (recur))) + (proxy [FilterOutputStream] [os] + (close [] + (reset! write-newlines? false) + (let [^FilterOutputStream this this] + (proxy-super close))) + (write + ([x] + (reset! write-newlines? false) + (if (int? x) + (.write os ^int x) + (.write os ^bytes x))) + + ([^bytes ba ^Integer off ^Integer len] + (reset! write-newlines? false) + (.write os ba off len)))))) + +(defmacro ^:private with-open-chan [[chan-binding chan] & body] + `(let [chan# ~chan + ~chan-binding chan#] + (try + ~@body + (finally + (a/close! chan#))))) ;; TODO - this code is basically duplicated with the code in the QP catch-exceptions middleware; we should refactor to ;; remove the duplication @@ -53,7 +85,7 @@ (let [format-ex* (fn [^Throwable e] {:message (.getMessage e) :class (.getCanonicalName (class e)) - :stacktrace (map str (.getStackTrace e)) + :stacktrace (mapv str (.getStackTrace e)) :data (ex-data e)}) [e & more :as chain] (exception-chain e)] (merge @@ -64,161 +96,58 @@ (when (seq more) {:via (map format-ex* more)})))) -(defn write-error-and-close! - "Util fn for writing an Exception to the OutputStream provided by `streaming-response`." - [^OutputStream os, ^Throwable e] - (with-open [os os - writer (BufferedWriter. (OutputStreamWriter. os StandardCharsets/UTF_8))] - (try - (json/generate-stream (format-exception e) - writer) - (.flush writer) - (catch EofException _) - (catch Throwable e - (log/error e (trs "Error writing error to stream")))))) - -(defn- proxy-output-stream - "Proxy that wraps an `OutputStream` and: - - 1. Sends a message to `they-have-started-writing-chan` whenever someone writes something - 2. 
Sends a message to `they-are-done-chan` whenever someone closes the output stream - - The overhead of this compared to the wrapped `OutputStream` is relatively low -- ~85 ms for 1 million writes to disk - vs ~25 ms for a raw OutputStream." - ^OutputStream [^OutputStream os {:keys [they-have-started-writing-chan they-are-done-chan]}] - (let [send-begin-message! (delay - (a/>!! they-have-started-writing-chan ::wrote-something)) - send-close-message! (delay - (a/>!! they-are-done-chan ::closed))] - ;; TODO -- consider making this a `FilterInputStream` so it can actually take `os` as a constructor arg and - ;; provide default impls for some methods - (proxy [OutputStream] [] - (close [] - @send-close-message! - (u/ignore-exceptions - (.close os))) - (flush [] - (u/ignore-exceptions - (.flush os))) - (write - ([x] - @send-begin-message! - (if (int? x) - (.write os ^int x) - (.write os ^bytes x))) - ([^bytes ba ^Integer off ^Integer len] - @send-begin-message! - (.write os ba off len)))))) - -(defn- start-newline-loop! - "Write a newline every `keepalive-interval-ms` (e.g., one second) until 'they' start writing to the output stream." - [^OutputStream os {:keys [they-have-started-writing-chan canceled-chan]} {:keys [write-keepalive-newlines?] - :or {write-keepalive-newlines? true}}] - (a/go-loop [] - (let [timeout-chan (a/timeout keepalive-interval-ms) - [val port] (a/alts! [they-have-started-writing-chan timeout-chan] :priority true)] - ;; TODO - are we sure it's safe to write this newline on a `core.async` thread? I don't see why it would block, - ;; but it certainly seems possible. But if we write the newline byte async it seems possible that they could - ;; have started writing before our newline byte thread gets ran. Guess this will have to do for now. - (when (= port timeout-chan) - (log/debug (u/format-color 'blue (trs "Response not ready, writing one byte & sleeping..."))) - (when (try - (when write-keepalive-newlines? 
- (.write os (byte \newline))) - (.flush os) - true - (catch EofException _ - (log/debug (u/format-color 'yellow (trs "connection closed, canceling request"))) - (a/>!! canceled-chan ::canceled) - false)) - (recur)))))) - -(defn- setup-timeout-and-close! - "Once `they-are-done-chan` or `canceled-chan` gets a message, or is closed; or if we not finished by the timeout, shut - everything down and flush/close the output stream." - [^OutputStream os {:keys [they-are-done-chan canceled-chan], :as chans}] - (a/go - (let [timeout-chan (a/timeout absolute-max-keepalive-ms) - [val port] (a/alts! [canceled-chan they-are-done-chan timeout-chan])] - (when (= port timeout-chan) - ;; let "them" know to cancel anything outstanding as well - (a/>! canceled-chan ::timed-out)) - ;; go ahead and close all the channels now - (doseq [chan (vals chans)] - (a/close! chan)) - ;; we can write the timeout error (if needed) and flush/close the stream on another thread. To avoid tying up - ;; our precious core.async thread. - (a/thread - (if (= port timeout-chan) - (u/ignore-exceptions - (write-error-and-close! os (ex-info (trs "Response not finished after waiting {0}. Canceling request." - (u/format-milliseconds absolute-max-keepalive-ms)) - {:status 504}))) - (u/ignore-exceptions - (.close os))))))) - -(defn- streaming-chans [] - ;; this channel will get a message when they start writing to the proxy output stream - {:they-have-started-writing-chan (a/promise-chan) - ;; this channel will get a message when the request is canceled. - :canceled-chan (a/promise-chan) - ;; this channel will get a message when they .close() the proxy output stream - :they-are-done-chan (a/promise-chan)}) - -(defn do-streaming-response - "Stream results of `f` to output stream, writing newlines as appropiate. 
You shouldn't use this function directly -- - use `streaming-response` instead -- but I had to make it public because Eastwood isn't able to figure out it's being - used since the only use is inside the `deftype+` below." - [^OutputStream os f options] - (let [chans (streaming-chans)] - (start-newline-loop! os chans options) - (setup-timeout-and-close! os chans) - ;; ok, we can call f now with a proxy-output-stream +(defn write-error! + "Write an error to the output stream, formatting it nicely." + [^OutputStream os obj] + (if (instance? Throwable obj) + (recur os (format-exception obj)) (try - (f (proxy-output-stream os chans) (:canceled-chan chans)) - (catch Throwable e - (write-error-and-close! os e) - (a/>!! (:canceled-chan chans) ::exception) - (doseq [chan (vals chans)] - (a/close! chan)))) - ;; result of this fn is ignored - nil)) + (with-open [writer (BufferedWriter. (OutputStreamWriter. os StandardCharsets/UTF_8))] + (json/generate-stream obj writer) + (.flush writer)) + (catch Throwable _)))) + +(defn- write-to-stream! [f {:keys [write-keepalive-newlines?]} ^OutputStream os] + (with-open-chan [canceled-chan (a/promise-chan)] + (with-open [os os + os (jetty-eof-canceling-output-stream os canceled-chan) + os (keepalive-output-stream os write-keepalive-newlines?)] + + (try + (f os canceled-chan) + (catch Throwable e + (write-error! os {:message (.getMessage e)})) + (finally + (.flush os)))))) (p.types/deftype+ StreamingResponse [f options] pretty/PrettyPrintable (pretty [_] - (list '->StreamingResponse f options)) + (list (symbol (str (.getCanonicalName StreamingResponse) \.)) f options)) ;; both sync and async responses ring.protocols/StreamableResponseBody (write-body-to-stream [_ _ os] - (do-streaming-response os f options)) + (write-to-stream! 
f options os)) ;; async responses only compojure.response/Sendable - (send* [this request respond raise] + (send* [this _ respond _] (respond (merge (ring.response/response this) {:content-type (:content-type options) :headers (:headers options) :status 202})))) (defmacro streaming-response - "Return an async, streaming, keepalive response. + "Return a streaming response that writes keepalive newline bytes. Minimal example: (streaming-response {:content-type \"applicaton/json; charset=utf-8\"} [os canceled-chan] - (let [futur (future - ;; start writing stuff (possibly async) - (write-stuff! os) - ;; close the output stream when you are finished - (.close os))] - ;; canceled-chan will get a message if the API request is canceled. Listen to it and kill any async stuff - (a/go - (when (nil? (a/<! canceled-chan)) - (future-cancel futur))) - ;; result of `streaming-response` is ignored - nil)) + (write-something-to-stream! os)) + + `f` should block until it is completely finished writing to the stream, which will be closed thereafter. + `canceled-chan` can be monitored to see if the request is canceled before results are fully written to the stream. Current options: @@ -226,7 +155,7 @@ * `:headers` -- other headers to include in the API response. * `:write-keepalive-newlines?` -- whether we should write keepalive newlines every `keepalive-interval-ms`. Default `true`; you can disable this for formats where it wouldn't work, such as CSV." 
- {:style/indent 2} + {:style/indent 2, :arglists '([options [os-binding canceled-chan-binding] & body])} [options [os-binding canceled-chan-binding :as bindings] & body] {:pre [(= (count bindings) 2)]} `(->StreamingResponse (fn [~(vary-meta os-binding assoc :tag 'java.io.OutputStream) ~canceled-chan-binding] ~@body) diff --git a/src/metabase/query_processor/streaming.clj b/src/metabase/query_processor/streaming.clj index 630f53ec3e71d36262baea6e467aaafab08202f7..d545a7a64c5377b63f72ae63f2cd3267b24278e4 100644 --- a/src/metabase/query_processor/streaming.clj +++ b/src/metabase/query_processor/streaming.clj @@ -1,9 +1,6 @@ (ns metabase.query-processor.streaming - (:require [cheshire.core :as json] - [clojure.core.async :as a] - [metabase.async - [streaming-response :as streaming-response] - [util :as async.u]] + (:require [clojure.core.async :as a] + [metabase.async.streaming-response :as streaming-response] [metabase.query-processor.context :as context] [metabase.query-processor.streaming [csv :as streaming.csv] @@ -11,9 +8,8 @@ [json :as streaming.json] [xlsx :as streaming.xlsx]] [metabase.util :as u]) - (:import [java.io BufferedWriter OutputStream OutputStreamWriter] - java.nio.charset.StandardCharsets - org.eclipse.jetty.io.EofException)) + (:import clojure.core.async.impl.channels.ManyToManyChannel + java.io.OutputStream)) ;; these are loaded for side-effects so their impls of `i/results-writer` will be available ;; TODO - consider whether we should lazy-load these! @@ -46,13 +42,6 @@ (.close os)) (context/resultf final-metadata context))) -(defn- write-qp-failure-and-close! [^OutputStream os result] - (with-open [writer (BufferedWriter. (OutputStreamWriter. os StandardCharsets/UTF_8))] - (try - (json/generate-stream result writer) - (catch EofException _))) - (.close os)) - (defn streaming-context "Context to pass to the QP to streaming results as `export-format` to an output stream. 
Can be used independently of the normal `streaming-response` macro, which is geared toward Ring responses. @@ -64,41 +53,23 @@ {:rff (streaming-rff results-writer) :reducedf (streaming-reducedf results-writer os)})) -;; TODO -- consider whether it makes sense to begin writing keepalive chars right away or if maybe we should wait to -;; call `respond` in async endpoints for 30-60 seconds that way we're not wasting a Ring thread right away -(defn do-streaming-response +(defn streaming-response* "Impl for `streaming-response`." - ^metabase.async.streaming_response.StreamingResponse [export-format run-query-fn] + [export-format f] (streaming-response/streaming-response (i/stream-options export-format) [os canceled-chan] - (let [out-chan (try - (run-query-fn (streaming-context export-format os)) - (catch Throwable e - e))] - (if (async.u/promise-chan? out-chan) - (a/go - (let [[val port] (a/alts! [out-chan canceled-chan] :priority true)] - (cond - ;; if result is an Exception or a QP failure response write that out (async) and close up - (and (= port out-chan) - (instance? Throwable val)) - (a/thread (streaming-response/write-error-and-close! os val)) - - (and (= port out-chan) - (map? val) - (= (:status val) :failed)) - (a/thread (write-qp-failure-and-close! os val)) - - ;; otherwise if the `cancled-chan` go a message we can tell the QP to cancel the running query by - ;; closing `out-chan` - (and (= port canceled-chan) - (nil? val)) - (a/close! out-chan)))) - ;; if we got something besides a channel, such as a Throwable, write it as JSON to the `out-chan` and close - (a/thread - (if (instance? Throwable out-chan) - (streaming-response/write-error-and-close! os out-chan) - (write-qp-failure-and-close! os out-chan)))) - nil))) + (let [result (try + (f (streaming-context export-format os)) + (catch Throwable e + e)) + result (if (instance? ManyToManyChannel result) + (let [[val port] (a/alts!! [result canceled-chan])] + (when (= port canceled-chan) + (a/close! 
result)) + val) + result)] + (when (or (instance? Throwable result) + (= (:status result) :failed)) + (streaming-response/write-error! os result))))) (defmacro streaming-response "Return results of processing a query as a streaming response. This response implements the appropriate Ring/Compojure @@ -109,11 +80,13 @@ (api/defendpoint GET \"/whatever\" [] (qp.streaming/streaming-response [context :json] - (qp/process-query-and-save-with-max-results-constraints! (assoc my-query :async? true) context)))" + (qp/process-query-and-save-with-max-results-constraints! (assoc query :async true) context))) + + Handles either async or sync QP results, but you should prefer returning sync results so we can handle query + cancelations properly." {:style/indent 1} - [[context-binding export-format :as bindings] & body] - {:pre [(vector? bindings) (= (count bindings) 2)]} - `(do-streaming-response ~export-format (fn [~context-binding] ~@body))) + [[context-binding export-format] & body] + `(streaming-response* ~export-format (fn [~context-binding] ~@body))) (defn export-formats "Set of valid streaming response formats. Currently, `:json`, `:csv`, `:xlsx`, and `:api` (normal JSON API results diff --git a/test/metabase/api/dataset_test.clj b/test/metabase/api/dataset_test.clj index df621d76bb33db7428bc485653c64bf3bddea26b..242bd1a0b2d537f2a20f910fc1e7cd53fd01282c 100644 --- a/test/metabase/api/dataset_test.clj +++ b/test/metabase/api/dataset_test.clj @@ -172,6 +172,7 @@ (spreadsheet/select-columns {:A "Values"}))))) (defn- parse-and-sort-csv [response] + (assert (some? response)) (sort-by ;; ID in CSV is a string, parse it and sort it to get the first 5 (comp #(Integer/parseInt %) first) @@ -230,8 +231,10 @@ {:database mbql.s/saved-questions-virtual-database-id :type :query :query {:source-table (str "card__" (u/get-id card))}}))] - (is (= 16 - (count (csv/read-csv result))))))) + (is (some? result)) + (when (some? 
result) + (is (= 16 + (count (csv/read-csv result)))))))) ;; POST /api/dataset/:format ;; @@ -248,8 +251,10 @@ :middleware {:add-default-userland-constraints? true :userland-query? true}}))] - (is (= 101 - (count (csv/read-csv result))))))) + (is (some? result)) + (when (some? result) + (is (= 101 + (count (csv/read-csv result)))))))) ;; non-"download" queries should still get the default constraints ;; (this also is a sanitiy check to make sure the `with-redefs` in the test above actually works) diff --git a/test/metabase/query_processor/streaming_test.clj b/test/metabase/query_processor/streaming_test.clj index 5a14a697e06837ac9dafa66f8f38737b2aea7080..29adb1735d6d4b3761e9ed2ff057e61fc90afacc 100644 --- a/test/metabase/query_processor/streaming_test.clj +++ b/test/metabase/query_processor/streaming_test.clj @@ -1,6 +1,5 @@ (ns metabase.query-processor.streaming-test (:require [cheshire.core :as json] - [clojure.core.async :as a] [clojure.data.csv :as csv] [clojure.test :refer :all] [dk.ative.docjure.spreadsheet :as spreadsheet] @@ -13,8 +12,7 @@ [metabase.test.util :as tu] [ring.core.protocols :as ring.protocols] [toucan.db :as db]) - (:import [java.io BufferedInputStream BufferedOutputStream FilterOutputStream InputStream InputStreamReader - OutputStream PipedInputStream PipedOutputStream])) + (:import [java.io BufferedInputStream BufferedOutputStream ByteArrayInputStream ByteArrayOutputStream InputStream InputStreamReader])) (defmulti ^:private parse-result {:arglists '([export-format ^InputStream input-stream])} @@ -41,29 +39,31 @@ (spreadsheet/select-columns {:A "ID", :B "Name", :C "Category ID", :D "Latitude", :E "Longitude", :F "Price"}) rest)) -(defn- close-notifying-stream - ^FilterOutputStream [close-chan ^OutputStream os] - (proxy [FilterOutputStream] [os] - (close [] - (a/put! 
close-chan ::close) - (try - (.flush os) - (.close os) - (catch Throwable _))))) - -(defn- process-query-streaming [export-format query] +(defn- process-query-basic-streaming [export-format query] (with-redefs [streaming-response/keepalive-interval-ms 2] - (mt/with-open-channels [close-chan (a/promise-chan)] - (with-open [pos (PipedOutputStream.) - os (close-notifying-stream close-chan (BufferedOutputStream. pos)) - is (BufferedInputStream. (PipedInputStream. pos))] - (ring.protocols/write-body-to-stream - (qp.streaming/streaming-response [context export-format] - (qp/process-query-async query (assoc context :timeout 5000))) - nil - os) - (mt/wait-for-result close-chan 100) - (parse-result export-format is))))) + (with-open [bos (ByteArrayOutputStream.) + os (BufferedOutputStream. bos)] + (qp/process-query query (assoc (qp.streaming/streaming-context export-format os) + :timeout 15000)) + (.flush os) + (let [bytea (.toByteArray bos)] + (with-open [is (BufferedInputStream. (ByteArrayInputStream. bytea))] + (parse-result export-format is)))))) + +(defn- process-query-api-response-streaming [export-format query] + (with-redefs [streaming-response/keepalive-interval-ms 2] + (with-open [bos (ByteArrayOutputStream.) + os (BufferedOutputStream. bos)] + (ring.protocols/write-body-to-stream + (qp.streaming/streaming-response [context export-format] + (qp/process-query-async query (assoc context :timeout 5000))) + nil + os) + (.flush os) + (.flush bos) + (let [bytea (.toByteArray bos)] + (with-open [is (BufferedInputStream. (ByteArrayInputStream. bytea))] + (parse-result export-format is)))))) (defmulti ^:private expected-results {:arglists '([export-format normal-results])} @@ -104,17 +104,34 @@ (cond-> x (map? 
x) (m/dissoc-in [:data :results_metadata :checksum]))) -(defn- compare-results-for-query [export-format query] - (let [query query - streaming-results (maybe-remove-checksum (process-query-streaming export-format query)) - expected-results (maybe-remove-checksum (expected-results export-format (qp/process-query query)))] - (is (= expected-results - streaming-results)))) +(defn- expected-results* [export-format query] + (maybe-remove-checksum (expected-results export-format (qp/process-query query)))) + +(defn- basic-actual-results* [export-format query] + (maybe-remove-checksum (process-query-basic-streaming export-format query))) + +(deftest basic-streaming-test [] + (testing "Test that the underlying qp.streaming context logic itself works correctly. Not an end-to-end test!" + (let [query (mt/mbql-query venues + {:order-by [[:asc $id]] + :limit 5})] + (doseq [export-format (qp.streaming/export-formats)] + (testing export-format + (is (= (expected-results* export-format query) + (basic-actual-results* export-format query)))))))) + +(defn- actual-results* [export-format query] + (maybe-remove-checksum (process-query-api-response-streaming export-format query))) + +(defn- compare-results [export-format query] + (is (= (expected-results* export-format query) + (actual-results* export-format query)))) (deftest streaming-response-test - (doseq [export-format (qp.streaming/export-formats)] - (testing export-format - (compare-results-for-query export-format (mt/mbql-query venues {:limit 5}))))) + (testing "Test that the actual results going thru the same steps as an API response are correct." + (doseq [export-format (qp.streaming/export-formats)] + (testing export-format + (compare-results export-format (mt/mbql-query venues {:limit 5})))))) (deftest utf8-test ;; UTF-8 isn't currently working for XLSX -- fix me @@ -122,11 +139,11 @@ (testing export-format (testing "Make sure our various streaming formats properly write values as UTF-8." 
(testing "A query that will have a little → in its name" - (compare-results-for-query export-format (mt/mbql-query venues - {:fields [$name $category_id->categories.name] - :order-by [[:asc $id]] - :limit 5}))) + (compare-results export-format (mt/mbql-query venues + {:fields [$name $category_id->categories.name] + :order-by [[:asc $id]] + :limit 5}))) (testing "A query with emoji and other fancy unicode" (let [[sql & args] (db/honeysql->sql {:select [["Cam ðŒ† Saul 💩" :cam]]})] - (compare-results-for-query export-format (mt/native-query {:query sql - :params args})))))))) + (compare-results export-format (mt/native-query {:query sql + :params args}))))))))