Skip to content
Snippets Groups Projects
Unverified Commit e5d8439a authored by Cam Saul's avatar Cam Saul Committed by GitHub
Browse files

Simplify streaming QP/API response glue namespaces (#11943)

parent 1befabcf
No related branches found
No related tags found
No related merge requests found
......@@ -264,8 +264,7 @@
(contains? response :status)
(contains? response :body))
response
{:status (if (or (instance? ManyToManyChannel response)
(instance? StreamingResponse response))
{:status (if (some #(instance? % response) [ManyToManyChannel StreamingResponse])
202
200)
:body response}))
(ns metabase.async.streaming-response
"A special Ring response type that can handle async, streaming results. It writes newlines as 'heartbeats' to the
client until the real results are ready to begin streaming, then streams those to the client."
(:require [cheshire.core :as json]
[clojure.core.async :as a]
[clojure.tools.logging :as log]
compojure.response
[metabase
[config :as config]
[util :as u]]
[metabase.util.i18n :refer [trs]]
[metabase.util :as u]
[potemkin.types :as p.types]
[pretty.core :as pretty]
[ring.core.protocols :as ring.protocols]
[ring.util.response :as ring.response])
(:import [java.io BufferedWriter OutputStream OutputStreamWriter]
(:import [java.io BufferedWriter FilterOutputStream OutputStream OutputStreamWriter]
java.nio.charset.StandardCharsets
org.eclipse.jetty.io.EofException))
;; TODO - this whole namespace seems a lot more complicated than it needs to be, simplify it. We can probably simplify
;; it by 50% or so
;; 1. Change `proxy-output-stream` to something like a `keepalive-output-steam` that can encapsulate all the logic for
;; maintaining the keepalive loop
;;
;; 2. I don't remember why we need to wait for code elsewhere the output stream? It seems totally reasonable to
;; require `f` to block until it's done writing to the stream, since we have to wait anyway. The keepalive bytes are
;; already being handled on a separate thread
(def ^:private keepalive-interval-ms
"Interval between sending newline characters to keep Heroku from terminating requests like queries that take a long
time to complete."
(u/seconds->ms 1)) ; one second
(def ^:private absolute-max-keepalive-ms
"Absolute maximum amount of time to wait for a response to return results, instead of keeping the connection open
forever. Normally we'll eventually give up when a connection is closed, but if someone keeps the connection open
forever, or if there's a bug in the API code (and `respond` is never called, or a value is never written to the
channel it returns) give up after 4 hours."
(cond
config/is-prod? (u/hours->ms 4)
config/is-dev? (u/minutes->ms 10)
config/is-test? (u/minutes->ms 1)))
(defn- jetty-eof-canceling-output-stream
"Wraps an `OutputStream` and sends a message to `canceled-chan` if a jetty `EofException` is thrown when writing to
the stream."
^OutputStream [^OutputStream os canceled-chan]
(proxy [FilterOutputStream] [os]
(write
([x]
(try
(if (int? x)
(.write os ^int x)
(.write os ^bytes x))
(catch EofException e
(a/>!! canceled-chan ::cancel)
(throw e))))
([^bytes ba ^Integer off ^Integer len]
(try
(.write os ba off len)
(catch EofException e
(a/>!! canceled-chan ::cancel)
(throw e)))))))
(defn- keepalive-output-stream
"Wraps an `OutputStream` and writes keepalive newline bytes every interval until someone else starts writing to the
stream."
^OutputStream [^OutputStream os write-keepalive-newlines?]
(let [write-newlines? (atom true)]
(a/go-loop []
(a/<! (a/timeout keepalive-interval-ms))
(when @write-newlines?
(when write-keepalive-newlines?
(.write os (byte \newline)))
(.flush os)
(recur)))
(proxy [FilterOutputStream] [os]
(close []
(reset! write-newlines? false)
(let [^FilterOutputStream this this]
(proxy-super close)))
(write
([x]
(reset! write-newlines? false)
(if (int? x)
(.write os ^int x)
(.write os ^bytes x)))
([^bytes ba ^Integer off ^Integer len]
(reset! write-newlines? false)
(.write os ba off len))))))
(defmacro ^:private with-open-chan [[chan-binding chan] & body]
`(let [chan# ~chan
~chan-binding chan#]
(try
~@body
(finally
(a/close! chan#)))))
;; TODO - this code is basically duplicated with the code in the QP catch-exceptions middleware; we should refactor to
;; remove the duplication
......@@ -53,7 +85,7 @@
(let [format-ex* (fn [^Throwable e]
{:message (.getMessage e)
:class (.getCanonicalName (class e))
:stacktrace (map str (.getStackTrace e))
:stacktrace (mapv str (.getStackTrace e))
:data (ex-data e)})
[e & more :as chain] (exception-chain e)]
(merge
......@@ -64,161 +96,58 @@
(when (seq more)
{:via (map format-ex* more)}))))
(defn write-error-and-close!
"Util fn for writing an Exception to the OutputStream provided by `streaming-response`."
[^OutputStream os, ^Throwable e]
(with-open [os os
writer (BufferedWriter. (OutputStreamWriter. os StandardCharsets/UTF_8))]
(try
(json/generate-stream (format-exception e)
writer)
(.flush writer)
(catch EofException _)
(catch Throwable e
(log/error e (trs "Error writing error to stream"))))))
(defn- proxy-output-stream
"Proxy that wraps an `OutputStream` and:
1. Sends a message to `they-have-started-writing-chan` whenever someone writes something
2. Sends a message to `they-are-done-chan` whenever someone closes the output stream
The overhead of this compared to the wrapped `OutputStream` is relatively low -- ~85 ms for 1 million writes to disk
vs ~25 ms for a raw OutputStream."
^OutputStream [^OutputStream os {:keys [they-have-started-writing-chan they-are-done-chan]}]
(let [send-begin-message! (delay
(a/>!! they-have-started-writing-chan ::wrote-something))
send-close-message! (delay
(a/>!! they-are-done-chan ::closed))]
;; TODO -- consider making this a `FilterInputStream` so it can actually take `os` as a constructor arg and
;; provide default impls for some methods
(proxy [OutputStream] []
(close []
@send-close-message!
(u/ignore-exceptions
(.close os)))
(flush []
(u/ignore-exceptions
(.flush os)))
(write
([x]
@send-begin-message!
(if (int? x)
(.write os ^int x)
(.write os ^bytes x)))
([^bytes ba ^Integer off ^Integer len]
@send-begin-message!
(.write os ba off len))))))
(defn- start-newline-loop!
"Write a newline every `keepalive-interval-ms` (e.g., one second) until 'they' start writing to the output stream."
[^OutputStream os {:keys [they-have-started-writing-chan canceled-chan]} {:keys [write-keepalive-newlines?]
:or {write-keepalive-newlines? true}}]
(a/go-loop []
(let [timeout-chan (a/timeout keepalive-interval-ms)
[val port] (a/alts! [they-have-started-writing-chan timeout-chan] :priority true)]
;; TODO - are we sure it's safe to write this newline on a `core.async` thread? I don't see why it would block,
;; but it certainly seems possible. But if we write the newline byte async it seems possible that they could
;; have started writing before our newline byte thread gets ran. Guess this will have to do for now.
(when (= port timeout-chan)
(log/debug (u/format-color 'blue (trs "Response not ready, writing one byte & sleeping...")))
(when (try
(when write-keepalive-newlines?
(.write os (byte \newline)))
(.flush os)
true
(catch EofException _
(log/debug (u/format-color 'yellow (trs "connection closed, canceling request")))
(a/>!! canceled-chan ::canceled)
false))
(recur))))))
(defn- setup-timeout-and-close!
"Once `they-are-done-chan` or `canceled-chan` gets a message, or is closed; or if we not finished by the timeout, shut
everything down and flush/close the output stream."
[^OutputStream os {:keys [they-are-done-chan canceled-chan], :as chans}]
(a/go
(let [timeout-chan (a/timeout absolute-max-keepalive-ms)
[val port] (a/alts! [canceled-chan they-are-done-chan timeout-chan])]
(when (= port timeout-chan)
;; let "them" know to cancel anything outstanding as well
(a/>! canceled-chan ::timed-out))
;; go ahead and close all the channels now
(doseq [chan (vals chans)]
(a/close! chan))
;; we can write the timeout error (if needed) and flush/close the stream on another thread. To avoid tying up
;; our precious core.async thread.
(a/thread
(if (= port timeout-chan)
(u/ignore-exceptions
(write-error-and-close! os (ex-info (trs "Response not finished after waiting {0}. Canceling request."
(u/format-milliseconds absolute-max-keepalive-ms))
{:status 504})))
(u/ignore-exceptions
(.close os)))))))
(defn- streaming-chans []
;; this channel will get a message when they start writing to the proxy output stream
{:they-have-started-writing-chan (a/promise-chan)
;; this channel will get a message when the request is canceled.
:canceled-chan (a/promise-chan)
;; this channel will get a message when they .close() the proxy output stream
:they-are-done-chan (a/promise-chan)})
(defn do-streaming-response
"Stream results of `f` to output stream, writing newlines as appropiate. You shouldn't use this function directly --
use `streaming-response` instead -- but I had to make it public because Eastwood isn't able to figure out it's being
used since the only use is inside the `deftype+` below."
[^OutputStream os f options]
(let [chans (streaming-chans)]
(start-newline-loop! os chans options)
(setup-timeout-and-close! os chans)
;; ok, we can call f now with a proxy-output-stream
(defn write-error!
"Write an error to the output stream, formatting it nicely."
[^OutputStream os obj]
(if (instance? Throwable obj)
(recur os (format-exception obj))
(try
(f (proxy-output-stream os chans) (:canceled-chan chans))
(catch Throwable e
(write-error-and-close! os e)
(a/>!! (:canceled-chan chans) ::exception)
(doseq [chan (vals chans)]
(a/close! chan))))
;; result of this fn is ignored
nil))
(with-open [writer (BufferedWriter. (OutputStreamWriter. os StandardCharsets/UTF_8))]
(json/generate-stream obj writer)
(.flush writer))
(catch Throwable _))))
(defn- write-to-stream! [f {:keys [write-keepalive-newlines?]} ^OutputStream os]
(with-open-chan [canceled-chan (a/promise-chan)]
(with-open [os os
os (jetty-eof-canceling-output-stream os canceled-chan)
os (keepalive-output-stream os write-keepalive-newlines?)]
(try
(f os canceled-chan)
(catch Throwable e
(write-error! os {:message (.getMessage e)}))
(finally
(.flush os))))))
(p.types/deftype+ StreamingResponse [f options]
pretty/PrettyPrintable
(pretty [_]
(list '->StreamingResponse f options))
(list (symbol (str (.getCanonicalName StreamingResponse) \.)) f options))
;; both sync and async responses
ring.protocols/StreamableResponseBody
(write-body-to-stream [_ _ os]
(do-streaming-response os f options))
(write-to-stream! f options os))
;; async responses only
compojure.response/Sendable
(send* [this request respond raise]
(send* [this _ respond _]
(respond (merge (ring.response/response this)
{:content-type (:content-type options)
:headers (:headers options)
:status 202}))))
(defmacro streaming-response
"Return an async, streaming, keepalive response.
"Return an streaming response that writes keepalive newline bytes.
Minimal example:
(streaming-response {:content-type \"applicaton/json; charset=utf-8\"} [os canceled-chan]
(let [futur (future
;; start writing stuff (possibly async)
(write-stuff! os)
;; close the output stream when you are finished
(.close os))]
;; canceled-chan will get a message if the API request is canceled. Listen to it and kill any async stuff
(a/go
(when (nil? (a/<! canceled-chan))
(future-cancel futur)))
;; result of `streaming-response` is ignored
nil))
(write-something-to-stream! os))
`f` should block until it is completely finished writing to the stream, which will be closed thereafter.
`canceled-chan` can be monitored to see if the request is canceled before results are fully written to the stream.
Current options:
......@@ -226,7 +155,7 @@
* `:headers` -- other headers to include in the API response.
* `:write-keepalive-newlines?` -- whether we should write keepalive newlines every `keepalive-interval-ms`. Default
`true`; you can disable this for formats where it wouldn't work, such as CSV."
{:style/indent 2}
{:style/indent 2, :arglists '([options [os-binding canceled-chan-binding] & body])}
[options [os-binding canceled-chan-binding :as bindings] & body]
{:pre [(= (count bindings) 2)]}
`(->StreamingResponse (fn [~(vary-meta os-binding assoc :tag 'java.io.OutputStream) ~canceled-chan-binding] ~@body)
......
(ns metabase.query-processor.streaming
(:require [cheshire.core :as json]
[clojure.core.async :as a]
[metabase.async
[streaming-response :as streaming-response]
[util :as async.u]]
(:require [clojure.core.async :as a]
[metabase.async.streaming-response :as streaming-response]
[metabase.query-processor.context :as context]
[metabase.query-processor.streaming
[csv :as streaming.csv]
......@@ -11,9 +8,8 @@
[json :as streaming.json]
[xlsx :as streaming.xlsx]]
[metabase.util :as u])
(:import [java.io BufferedWriter OutputStream OutputStreamWriter]
java.nio.charset.StandardCharsets
org.eclipse.jetty.io.EofException))
(:import clojure.core.async.impl.channels.ManyToManyChannel
java.io.OutputStream))
;; these are loaded for side-effects so their impls of `i/results-writer` will be available
;; TODO - consider whether we should lazy-load these!
......@@ -46,13 +42,6 @@
(.close os))
(context/resultf final-metadata context)))
(defn- write-qp-failure-and-close! [^OutputStream os result]
(with-open [writer (BufferedWriter. (OutputStreamWriter. os StandardCharsets/UTF_8))]
(try
(json/generate-stream result writer)
(catch EofException _)))
(.close os))
(defn streaming-context
"Context to pass to the QP to streaming results as `export-format` to an output stream. Can be used independently of
the normal `streaming-response` macro, which is geared toward Ring responses.
......@@ -64,41 +53,23 @@
{:rff (streaming-rff results-writer)
:reducedf (streaming-reducedf results-writer os)}))
;; TODO -- consider whether it makes sense to begin writing keepalive chars right away or if maybe we should wait to
;; call `respond` in async endpoints for 30-60 seconds that way we're not wasting a Ring thread right away
(defn do-streaming-response
(defn streaming-response*
"Impl for `streaming-response`."
^metabase.async.streaming_response.StreamingResponse [export-format run-query-fn]
[export-format f]
(streaming-response/streaming-response (i/stream-options export-format) [os canceled-chan]
(let [out-chan (try
(run-query-fn (streaming-context export-format os))
(catch Throwable e
e))]
(if (async.u/promise-chan? out-chan)
(a/go
(let [[val port] (a/alts! [out-chan canceled-chan] :priority true)]
(cond
;; if result is an Exception or a QP failure response write that out (async) and close up
(and (= port out-chan)
(instance? Throwable val))
(a/thread (streaming-response/write-error-and-close! os val))
(and (= port out-chan)
(map? val)
(= (:status val) :failed))
(a/thread (write-qp-failure-and-close! os val))
;; otherwise if the `cancled-chan` go a message we can tell the QP to cancel the running query by
;; closing `out-chan`
(and (= port canceled-chan)
(nil? val))
(a/close! out-chan))))
;; if we got something besides a channel, such as a Throwable, write it as JSON to the `out-chan` and close
(a/thread
(if (instance? Throwable out-chan)
(streaming-response/write-error-and-close! os out-chan)
(write-qp-failure-and-close! os out-chan))))
nil)))
(let [result (try
(f (streaming-context export-format os))
(catch Throwable e
e))
result (if (instance? ManyToManyChannel result)
(let [[val port] (a/alts!! [result canceled-chan])]
(when (= port canceled-chan)
(a/close! result))
val)
result)]
(when (or (instance? Throwable result)
(= (:status result) :failed))
(streaming-response/write-error! os result)))))
(defmacro streaming-response
"Return results of processing a query as a streaming response. This response implements the appropriate Ring/Compojure
......@@ -109,11 +80,13 @@
(api/defendpoint GET \"/whatever\" []
(qp.streaming/streaming-response [context :json]
(qp/process-query-and-save-with-max-results-constraints! (assoc my-query :async? true) context)))"
(qp/process-query-and-save-with-max-results-constraints! (assoc query :async true) context)))
Handles either async or sync QP results, but you should prefer returning sync results so we can handle query
cancelations properly."
{:style/indent 1}
[[context-binding export-format :as bindings] & body]
{:pre [(vector? bindings) (= (count bindings) 2)]}
`(do-streaming-response ~export-format (fn [~context-binding] ~@body)))
[[context-binding export-format] & body]
`(streaming-response* ~export-format (fn [~context-binding] ~@body)))
(defn export-formats
"Set of valid streaming response formats. Currently, `:json`, `:csv`, `:xlsx`, and `:api` (normal JSON API results
......
......@@ -172,6 +172,7 @@
(spreadsheet/select-columns {:A "Values"})))))
(defn- parse-and-sort-csv [response]
(assert (some? response))
(sort-by
;; ID in CSV is a string, parse it and sort it to get the first 5
(comp #(Integer/parseInt %) first)
......@@ -230,8 +231,10 @@
{:database mbql.s/saved-questions-virtual-database-id
:type :query
:query {:source-table (str "card__" (u/get-id card))}}))]
(is (= 16
(count (csv/read-csv result)))))))
(is (some? result))
(when (some? result)
(is (= 16
(count (csv/read-csv result))))))))
;; POST /api/dataset/:format
;;
......@@ -248,8 +251,10 @@
:middleware
{:add-default-userland-constraints? true
:userland-query? true}}))]
(is (= 101
(count (csv/read-csv result)))))))
(is (some? result))
(when (some? result)
(is (= 101
(count (csv/read-csv result))))))))
;; non-"download" queries should still get the default constraints
;; (this also is a sanitiy check to make sure the `with-redefs` in the test above actually works)
......
(ns metabase.query-processor.streaming-test
(:require [cheshire.core :as json]
[clojure.core.async :as a]
[clojure.data.csv :as csv]
[clojure.test :refer :all]
[dk.ative.docjure.spreadsheet :as spreadsheet]
......@@ -13,8 +12,7 @@
[metabase.test.util :as tu]
[ring.core.protocols :as ring.protocols]
[toucan.db :as db])
(:import [java.io BufferedInputStream BufferedOutputStream FilterOutputStream InputStream InputStreamReader
OutputStream PipedInputStream PipedOutputStream]))
(:import [java.io BufferedInputStream BufferedOutputStream ByteArrayInputStream ByteArrayOutputStream InputStream InputStreamReader]))
(defmulti ^:private parse-result
{:arglists '([export-format ^InputStream input-stream])}
......@@ -41,29 +39,31 @@
(spreadsheet/select-columns {:A "ID", :B "Name", :C "Category ID", :D "Latitude", :E "Longitude", :F "Price"})
rest))
(defn- close-notifying-stream
^FilterOutputStream [close-chan ^OutputStream os]
(proxy [FilterOutputStream] [os]
(close []
(a/put! close-chan ::close)
(try
(.flush os)
(.close os)
(catch Throwable _)))))
(defn- process-query-streaming [export-format query]
(defn- process-query-basic-streaming [export-format query]
(with-redefs [streaming-response/keepalive-interval-ms 2]
(mt/with-open-channels [close-chan (a/promise-chan)]
(with-open [pos (PipedOutputStream.)
os (close-notifying-stream close-chan (BufferedOutputStream. pos))
is (BufferedInputStream. (PipedInputStream. pos))]
(ring.protocols/write-body-to-stream
(qp.streaming/streaming-response [context export-format]
(qp/process-query-async query (assoc context :timeout 5000)))
nil
os)
(mt/wait-for-result close-chan 100)
(parse-result export-format is)))))
(with-open [bos (ByteArrayOutputStream.)
os (BufferedOutputStream. bos)]
(qp/process-query query (assoc (qp.streaming/streaming-context export-format os)
:timeout 15000))
(.flush os)
(let [bytea (.toByteArray bos)]
(with-open [is (BufferedInputStream. (ByteArrayInputStream. bytea))]
(parse-result export-format is))))))
(defn- process-query-api-response-streaming [export-format query]
(with-redefs [streaming-response/keepalive-interval-ms 2]
(with-open [bos (ByteArrayOutputStream.)
os (BufferedOutputStream. bos)]
(ring.protocols/write-body-to-stream
(qp.streaming/streaming-response [context export-format]
(qp/process-query-async query (assoc context :timeout 5000)))
nil
os)
(.flush os)
(.flush bos)
(let [bytea (.toByteArray bos)]
(with-open [is (BufferedInputStream. (ByteArrayInputStream. bytea))]
(parse-result export-format is))))))
(defmulti ^:private expected-results
{:arglists '([export-format normal-results])}
......@@ -104,17 +104,34 @@
(cond-> x
(map? x) (m/dissoc-in [:data :results_metadata :checksum])))
(defn- compare-results-for-query [export-format query]
(let [query query
streaming-results (maybe-remove-checksum (process-query-streaming export-format query))
expected-results (maybe-remove-checksum (expected-results export-format (qp/process-query query)))]
(is (= expected-results
streaming-results))))
(defn- expected-results* [export-format query]
(maybe-remove-checksum (expected-results export-format (qp/process-query query))))
(defn- basic-actual-results* [export-format query]
(maybe-remove-checksum (process-query-basic-streaming export-format query)))
(deftest basic-streaming-test []
(testing "Test that the underlying qp.streaming context logic itself works correctly. Not an end-to-end test!"
(let [query (mt/mbql-query venues
{:order-by [[:asc $id]]
:limit 5})]
(doseq [export-format (qp.streaming/export-formats)]
(testing export-format
(is (= (expected-results* export-format query)
(basic-actual-results* export-format query))))))))
(defn- actual-results* [export-format query]
(maybe-remove-checksum (process-query-api-response-streaming export-format query)))
(defn- compare-results [export-format query]
(is (= (expected-results* export-format query)
(actual-results* export-format query))))
(deftest streaming-response-test
(doseq [export-format (qp.streaming/export-formats)]
(testing export-format
(compare-results-for-query export-format (mt/mbql-query venues {:limit 5})))))
(testing "Test that the actual results going thru the same steps as an API response are correct."
(doseq [export-format (qp.streaming/export-formats)]
(testing export-format
(compare-results export-format (mt/mbql-query venues {:limit 5}))))))
(deftest utf8-test
;; UTF-8 isn't currently working for XLSX -- fix me
......@@ -122,11 +139,11 @@
(testing export-format
(testing "Make sure our various streaming formats properly write values as UTF-8."
(testing "A query that will have a little → in its name"
(compare-results-for-query export-format (mt/mbql-query venues
{:fields [$name $category_id->categories.name]
:order-by [[:asc $id]]
:limit 5})))
(compare-results export-format (mt/mbql-query venues
{:fields [$name $category_id->categories.name]
:order-by [[:asc $id]]
:limit 5})))
(testing "A query with emoji and other fancy unicode"
(let [[sql & args] (db/honeysql->sql {:select [["Cam 𝌆 Saul 💩" :cam]]})]
(compare-results-for-query export-format (mt/native-query {:query sql
:params args}))))))))
(compare-results export-format (mt/native-query {:query sql
:params args}))))))))
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment