From e5d8439a6629be461d2fb756e86587f679505d3f Mon Sep 17 00:00:00 2001
From: Cam Saul <1455846+camsaul@users.noreply.github.com>
Date: Wed, 19 Feb 2020 16:44:25 -0800
Subject: [PATCH] Simplify streaming QP/API response glue namespaces (#11943)

---
 src/metabase/api/common/internal.clj          |   3 +-
 src/metabase/async/streaming_response.clj     | 253 +++++++-----------
 src/metabase/query_processor/streaming.clj    |  77 ++----
 test/metabase/api/dataset_test.clj            |  13 +-
 .../query_processor/streaming_test.clj        |  97 ++++---
 5 files changed, 183 insertions(+), 260 deletions(-)

diff --git a/src/metabase/api/common/internal.clj b/src/metabase/api/common/internal.clj
index 7862acfc4d8..50a621b0874 100644
--- a/src/metabase/api/common/internal.clj
+++ b/src/metabase/api/common/internal.clj
@@ -264,8 +264,7 @@
            (contains? response :status)
            (contains? response :body))
     response
-    {:status (if (or (instance? ManyToManyChannel response)
-                     (instance? StreamingResponse response))
+    {:status (if (some #(instance? % response) [ManyToManyChannel StreamingResponse])
                202
                200)
      :body   response}))
diff --git a/src/metabase/async/streaming_response.clj b/src/metabase/async/streaming_response.clj
index c74c50bfd2e..8c47e49eca6 100644
--- a/src/metabase/async/streaming_response.clj
+++ b/src/metabase/async/streaming_response.clj
@@ -1,46 +1,78 @@
 (ns metabase.async.streaming-response
-  "A special Ring response type that can handle async, streaming results. It writes newlines as 'heartbeats' to the
-  client until the real results are ready to begin streaming, then streams those to the client."
   (:require [cheshire.core :as json]
             [clojure.core.async :as a]
-            [clojure.tools.logging :as log]
             compojure.response
-            [metabase
-             [config :as config]
-             [util :as u]]
-            [metabase.util.i18n :refer [trs]]
+            [metabase.util :as u]
             [potemkin.types :as p.types]
             [pretty.core :as pretty]
             [ring.core.protocols :as ring.protocols]
             [ring.util.response :as ring.response])
-  (:import [java.io BufferedWriter OutputStream OutputStreamWriter]
+  (:import [java.io BufferedWriter FilterOutputStream OutputStream OutputStreamWriter]
            java.nio.charset.StandardCharsets
            org.eclipse.jetty.io.EofException))
 
-;; TODO - this whole namespace seems a lot more complicated than it needs to be, simplify it. We can probably simplify
-;; it by 50% or so
-
-;; 1. Change `proxy-output-stream` to something like a `keepalive-output-steam` that can encapsulate all the logic for
-;; maintaining the keepalive loop
-;;
-;; 2. I don't remember why we need to wait for code elsewhere the output stream? It seems totally reasonable to
-;; require `f` to block until it's done writing to the stream, since we have to wait anyway. The keepalive bytes are
-;; already being handled on a separate thread
-
 (def ^:private keepalive-interval-ms
   "Interval between sending newline characters to keep Heroku from terminating requests like queries that take a long
   time to complete."
   (u/seconds->ms 1)) ; one second
 
-(def ^:private absolute-max-keepalive-ms
-  "Absolute maximum amount of time to wait for a response to return results, instead of keeping the connection open
-  forever. Normally we'll eventually give up when a connection is closed, but if someone keeps the connection open
-  forever, or if there's a bug in the API code (and `respond` is never called, or a value is never written to the
-  channel it returns) give up after 4 hours."
-  (cond
-    config/is-prod? (u/hours->ms 4)
-    config/is-dev?  (u/minutes->ms 10)
-    config/is-test? (u/minutes->ms 1)))
+(defn- jetty-eof-canceling-output-stream
+  "Wraps an `OutputStream` and sends a message to `canceled-chan` if a jetty `EofException` is thrown when writing to
+  the stream."
+  ^OutputStream [^OutputStream os canceled-chan]
+  (proxy [FilterOutputStream] [os]
+    (write
+      ([x]
+       (try
+         (if (int? x)
+           (.write os ^int x)
+           (.write os ^bytes x))
+         (catch EofException e
+           (a/>!! canceled-chan ::cancel)
+           (throw e))))
+
+      ([^bytes ba ^Integer off ^Integer len]
+       (try
+         (.write os ba off len)
+         (catch EofException e
+           (a/>!! canceled-chan ::cancel)
+           (throw e)))))))
+
+(defn- keepalive-output-stream
+  "Wraps an `OutputStream` and writes keepalive newline bytes every interval until someone else starts writing to the
+  stream."
+  ^OutputStream [^OutputStream os write-keepalive-newlines?]
+  (let [write-newlines? (atom true)]
+    (a/go-loop []
+      (a/<! (a/timeout keepalive-interval-ms))
+      (when @write-newlines?
+        (when write-keepalive-newlines?
+          (.write os (byte \newline)))
+        (.flush os)
+        (recur)))
+    (proxy [FilterOutputStream] [os]
+      (close []
+        (reset! write-newlines? false)
+        (let [^FilterOutputStream this this]
+          (proxy-super close)))
+      (write
+        ([x]
+         (reset! write-newlines? false)
+         (if (int? x)
+           (.write os ^int x)
+           (.write os ^bytes x)))
+
+        ([^bytes ba ^Integer off ^Integer len]
+         (reset! write-newlines? false)
+         (.write os ba off len))))))
+
+(defmacro ^:private with-open-chan [[chan-binding chan] & body]
+  `(let [chan#         ~chan
+         ~chan-binding chan#]
+     (try
+       ~@body
+       (finally
+         (a/close! chan#)))))
 
 ;; TODO - this code is basically duplicated with the code in the QP catch-exceptions middleware; we should refactor to
 ;; remove the duplication
@@ -53,7 +85,7 @@
   (let [format-ex*           (fn [^Throwable e]
                                {:message    (.getMessage e)
                                 :class      (.getCanonicalName (class e))
-                                :stacktrace (map str (.getStackTrace e))
+                                :stacktrace (mapv str (.getStackTrace e))
                                 :data       (ex-data e)})
         [e & more :as chain] (exception-chain e)]
     (merge
@@ -64,161 +96,58 @@
      (when (seq more)
        {:via (map format-ex* more)}))))
 
-(defn write-error-and-close!
-  "Util fn for writing an Exception to the OutputStream provided by `streaming-response`."
-  [^OutputStream os, ^Throwable e]
-  (with-open [os     os
-              writer (BufferedWriter. (OutputStreamWriter. os StandardCharsets/UTF_8))]
-    (try
-      (json/generate-stream (format-exception e)
-                            writer)
-      (.flush writer)
-      (catch EofException _)
-      (catch Throwable e
-        (log/error e (trs "Error writing error to stream"))))))
-
-(defn- proxy-output-stream
-  "Proxy that wraps an `OutputStream` and:
-
-  1.  Sends a message to `they-have-started-writing-chan` whenever someone writes something
-  2.  Sends a message to `they-are-done-chan` whenever someone closes the output stream
-
-  The overhead of this compared to the wrapped `OutputStream` is relatively low -- ~85 ms for 1 million writes to disk
-  vs ~25 ms for a raw OutputStream."
-  ^OutputStream [^OutputStream os {:keys [they-have-started-writing-chan they-are-done-chan]}]
-  (let [send-begin-message! (delay
-                              (a/>!! they-have-started-writing-chan ::wrote-something))
-        send-close-message! (delay
-                              (a/>!! they-are-done-chan ::closed))]
-    ;; TODO -- consider making this a `FilterInputStream` so it can actually take `os` as a constructor arg and
-    ;; provide default impls for some methods
-    (proxy [OutputStream] []
-      (close []
-        @send-close-message!
-        (u/ignore-exceptions
-          (.close os)))
-      (flush []
-        (u/ignore-exceptions
-          (.flush os)))
-      (write
-        ([x]
-         @send-begin-message!
-         (if (int? x)
-           (.write os ^int x)
-           (.write os ^bytes x)))
-        ([^bytes ba ^Integer off ^Integer len]
-         @send-begin-message!
-         (.write os ba off len))))))
-
-(defn- start-newline-loop!
-  "Write a newline every `keepalive-interval-ms` (e.g., one second) until 'they' start writing to the output stream."
-  [^OutputStream os {:keys [they-have-started-writing-chan canceled-chan]} {:keys [write-keepalive-newlines?]
-                                                                            :or   {write-keepalive-newlines? true}}]
-  (a/go-loop []
-    (let [timeout-chan (a/timeout keepalive-interval-ms)
-          [val port]   (a/alts! [they-have-started-writing-chan timeout-chan] :priority true)]
-      ;; TODO - are we sure it's safe to write this newline on a `core.async` thread? I don't see why it would block,
-      ;; but it certainly seems possible. But if we write the newline byte async it seems possible that they could
-      ;; have started writing before our newline byte thread gets ran. Guess this will have to do for now.
-      (when (= port timeout-chan)
-        (log/debug (u/format-color 'blue (trs "Response not ready, writing one byte & sleeping...")))
-        (when (try
-                (when write-keepalive-newlines?
-                  (.write os (byte \newline)))
-                (.flush os)
-                true
-                (catch EofException _
-                  (log/debug (u/format-color 'yellow (trs "connection closed, canceling request")))
-                  (a/>!! canceled-chan ::canceled)
-                  false))
-          (recur))))))
-
-(defn- setup-timeout-and-close!
-  "Once `they-are-done-chan` or `canceled-chan` gets a message, or is closed; or if we not finished by the timeout, shut
-  everything down and flush/close the output stream."
-  [^OutputStream os {:keys [they-are-done-chan canceled-chan], :as chans}]
-  (a/go
-    (let [timeout-chan (a/timeout absolute-max-keepalive-ms)
-          [val port]   (a/alts! [canceled-chan they-are-done-chan timeout-chan])]
-      (when (= port timeout-chan)
-        ;; let "them" know to cancel anything outstanding as well
-        (a/>! canceled-chan ::timed-out))
-      ;; go ahead and close all the channels now
-      (doseq [chan (vals chans)]
-        (a/close! chan))
-      ;; we can write the timeout error (if needed) and flush/close the stream on another thread. To avoid tying up
-      ;; our precious core.async thread.
-      (a/thread
-        (if (= port timeout-chan)
-          (u/ignore-exceptions
-            (write-error-and-close! os (ex-info (trs "Response not finished after waiting {0}. Canceling request."
-                                                     (u/format-milliseconds absolute-max-keepalive-ms))
-                                                {:status 504})))
-          (u/ignore-exceptions
-            (.close os)))))))
-
-(defn- streaming-chans []
-  ;; this channel will get a message when they start writing to the proxy output stream
-  {:they-have-started-writing-chan (a/promise-chan)
-   ;; this channel will get a message when the request is canceled.
-   :canceled-chan                  (a/promise-chan)
-   ;; this channel will get a message when they .close() the proxy output stream
-   :they-are-done-chan             (a/promise-chan)})
-
-(defn do-streaming-response
-  "Stream results of `f` to output stream, writing newlines as appropiate. You shouldn't use this function directly --
-  use `streaming-response` instead -- but I had to make it public because Eastwood isn't able to figure out it's being
-  used since the only use is inside the `deftype+` below."
-  [^OutputStream os f options]
-  (let [chans (streaming-chans)]
-    (start-newline-loop! os chans options)
-    (setup-timeout-and-close! os chans)
-    ;; ok, we can call f now with a proxy-output-stream
+(defn write-error!
+  "Write an error to the output stream, formatting it nicely."
+  [^OutputStream os obj]
+  (if (instance? Throwable obj)
+    (recur os (format-exception obj))
     (try
-      (f (proxy-output-stream os chans) (:canceled-chan chans))
-      (catch Throwable e
-        (write-error-and-close! os e)
-        (a/>!! (:canceled-chan chans) ::exception)
-        (doseq [chan (vals chans)]
-          (a/close! chan))))
-    ;; result of this fn is ignored
-    nil))
+      (with-open [writer (BufferedWriter. (OutputStreamWriter. os StandardCharsets/UTF_8))]
+        (json/generate-stream obj writer)
+        (.flush writer))
+      (catch Throwable _))))
+
+(defn- write-to-stream! [f {:keys [write-keepalive-newlines?]} ^OutputStream os]
+  (with-open-chan [canceled-chan (a/promise-chan)]
+    (with-open [os os
+                os (jetty-eof-canceling-output-stream os canceled-chan)
+                os (keepalive-output-stream os write-keepalive-newlines?)]
+
+      (try
+        (f os canceled-chan)
+        (catch Throwable e
+          (write-error! os {:message (.getMessage e)}))
+        (finally
+          (.flush os))))))
 
 (p.types/deftype+ StreamingResponse [f options]
   pretty/PrettyPrintable
   (pretty [_]
-    (list '->StreamingResponse f options))
+    (list (symbol (str (.getCanonicalName StreamingResponse) \.)) f options))
 
   ;; both sync and async responses
   ring.protocols/StreamableResponseBody
   (write-body-to-stream [_ _ os]
-    (do-streaming-response os f options))
+    (write-to-stream! f options os))
 
   ;; async responses only
   compojure.response/Sendable
-  (send* [this request respond raise]
+  (send* [this _ respond _]
     (respond (merge (ring.response/response this)
                     {:content-type (:content-type options)
                      :headers      (:headers options)
                      :status       202}))))
 
 (defmacro streaming-response
-  "Return an async, streaming, keepalive response.
+  "Return an streaming response that writes keepalive newline bytes.
 
   Minimal example:
 
     (streaming-response {:content-type \"applicaton/json; charset=utf-8\"} [os canceled-chan]
-      (let [futur (future
-                    ;; start writing stuff (possibly async)
-                    (write-stuff! os)
-                    ;; close the output stream when you are finished
-                    (.close os))]
-        ;; canceled-chan will get a message if the API request is canceled. Listen to it and kill any async stuff
-        (a/go
-          (when (nil? (a/<! canceled-chan))
-            (future-cancel futur)))
-        ;; result of `streaming-response` is ignored
-        nil))
+      (write-something-to-stream! os))
+
+  `f` should block until it is completely finished writing to the stream, which will be closed thereafter.
+  `canceled-chan` can be monitored to see if the request is canceled before results are fully written to the stream.
 
   Current options:
 
@@ -226,7 +155,7 @@
   *  `:headers` -- other headers to include in the API response.
   *  `:write-keepalive-newlines?` -- whether we should write keepalive newlines every `keepalive-interval-ms`. Default
       `true`; you can disable this for formats where it wouldn't work, such as CSV."
-  {:style/indent 2}
+  {:style/indent 2, :arglists '([options [os-binding canceled-chan-binding] & body])}
   [options [os-binding canceled-chan-binding :as bindings] & body]
   {:pre [(= (count bindings) 2)]}
   `(->StreamingResponse (fn [~(vary-meta os-binding assoc :tag 'java.io.OutputStream) ~canceled-chan-binding] ~@body)
diff --git a/src/metabase/query_processor/streaming.clj b/src/metabase/query_processor/streaming.clj
index 630f53ec3e7..d545a7a64c5 100644
--- a/src/metabase/query_processor/streaming.clj
+++ b/src/metabase/query_processor/streaming.clj
@@ -1,9 +1,6 @@
 (ns metabase.query-processor.streaming
-  (:require [cheshire.core :as json]
-            [clojure.core.async :as a]
-            [metabase.async
-             [streaming-response :as streaming-response]
-             [util :as async.u]]
+  (:require [clojure.core.async :as a]
+            [metabase.async.streaming-response :as streaming-response]
             [metabase.query-processor.context :as context]
             [metabase.query-processor.streaming
              [csv :as streaming.csv]
@@ -11,9 +8,8 @@
              [json :as streaming.json]
              [xlsx :as streaming.xlsx]]
             [metabase.util :as u])
-  (:import [java.io BufferedWriter OutputStream OutputStreamWriter]
-           java.nio.charset.StandardCharsets
-           org.eclipse.jetty.io.EofException))
+  (:import clojure.core.async.impl.channels.ManyToManyChannel
+           java.io.OutputStream))
 
 ;; these are loaded for side-effects so their impls of `i/results-writer` will be available
 ;; TODO - consider whether we should lazy-load these!
@@ -46,13 +42,6 @@
       (.close os))
     (context/resultf final-metadata context)))
 
-(defn- write-qp-failure-and-close! [^OutputStream os result]
-  (with-open [writer (BufferedWriter. (OutputStreamWriter. os StandardCharsets/UTF_8))]
-    (try
-      (json/generate-stream result writer)
-      (catch EofException _)))
-  (.close os))
-
 (defn streaming-context
   "Context to pass to the QP to streaming results as `export-format` to an output stream. Can be used independently of
   the normal `streaming-response` macro, which is geared toward Ring responses.
@@ -64,41 +53,23 @@
     {:rff      (streaming-rff results-writer)
      :reducedf (streaming-reducedf results-writer os)}))
 
-;; TODO -- consider whether it makes sense to begin writing keepalive chars right away or if maybe we should wait to
-;; call `respond` in async endpoints for 30-60 seconds that way we're not wasting a Ring thread right away
-(defn do-streaming-response
+(defn streaming-response*
   "Impl for `streaming-response`."
-  ^metabase.async.streaming_response.StreamingResponse [export-format run-query-fn]
+  [export-format f]
   (streaming-response/streaming-response (i/stream-options export-format) [os canceled-chan]
-    (let [out-chan (try
-                     (run-query-fn (streaming-context export-format os))
-                     (catch Throwable e
-                       e))]
-      (if (async.u/promise-chan? out-chan)
-        (a/go
-          (let [[val port] (a/alts! [out-chan canceled-chan] :priority true)]
-            (cond
-              ;; if result is an Exception or a QP failure response write that out (async) and close up
-              (and (= port out-chan)
-                   (instance? Throwable val))
-              (a/thread (streaming-response/write-error-and-close! os val))
-
-              (and (= port out-chan)
-                   (map? val)
-                   (= (:status val) :failed))
-              (a/thread (write-qp-failure-and-close! os val))
-
-              ;; otherwise if the `cancled-chan` go a message we can tell the QP to cancel the running query by
-              ;; closing `out-chan`
-              (and (= port canceled-chan)
-                   (nil? val))
-              (a/close! out-chan))))
-        ;; if we got something besides a channel, such as a Throwable, write it as JSON to the `out-chan` and close
-        (a/thread
-          (if (instance? Throwable out-chan)
-            (streaming-response/write-error-and-close! os out-chan)
-            (write-qp-failure-and-close! os out-chan))))
-      nil)))
+    (let [result (try
+                   (f (streaming-context export-format os))
+                   (catch Throwable e
+                     e))
+          result (if (instance? ManyToManyChannel result)
+                   (let [[val port] (a/alts!! [result canceled-chan])]
+                     (when (= port canceled-chan)
+                       (a/close! result))
+                     val)
+                   result)]
+      (when (or (instance? Throwable result)
+                (= (:status result) :failed))
+        (streaming-response/write-error! os result)))))
 
 (defmacro streaming-response
   "Return results of processing a query as a streaming response. This response implements the appropriate Ring/Compojure
@@ -109,11 +80,13 @@
 
     (api/defendpoint GET \"/whatever\" []
       (qp.streaming/streaming-response [context :json]
-        (qp/process-query-and-save-with-max-results-constraints! (assoc my-query :async? true) context)))"
+        (qp/process-query-and-save-with-max-results-constraints! (assoc query :async true) context)))
+
+  Handles either async or sync QP results, but you should prefer returning sync results so we can handle query
+  cancelations properly."
   {:style/indent 1}
-  [[context-binding export-format :as bindings] & body]
-  {:pre [(vector? bindings) (= (count bindings) 2)]}
-  `(do-streaming-response ~export-format (fn [~context-binding] ~@body)))
+  [[context-binding export-format] & body]
+  `(streaming-response* ~export-format (fn [~context-binding] ~@body)))
 
 (defn export-formats
   "Set of valid streaming response formats. Currently, `:json`, `:csv`, `:xlsx`, and `:api` (normal JSON API results
diff --git a/test/metabase/api/dataset_test.clj b/test/metabase/api/dataset_test.clj
index df621d76bb3..242bd1a0b2d 100644
--- a/test/metabase/api/dataset_test.clj
+++ b/test/metabase/api/dataset_test.clj
@@ -172,6 +172,7 @@
               (spreadsheet/select-columns {:A "Values"})))))
 
 (defn- parse-and-sort-csv [response]
+  (assert (some? response))
   (sort-by
    ;; ID in CSV is a string, parse it and sort it to get the first 5
    (comp #(Integer/parseInt %) first)
@@ -230,8 +231,10 @@
                           {:database mbql.s/saved-questions-virtual-database-id
                            :type     :query
                            :query    {:source-table (str "card__" (u/get-id card))}}))]
-      (is (= 16
-             (count (csv/read-csv result)))))))
+      (is (some? result))
+      (when (some? result)
+        (is (= 16
+               (count (csv/read-csv result))))))))
 
 ;; POST /api/dataset/:format
 ;;
@@ -248,8 +251,10 @@
                            :middleware
                            {:add-default-userland-constraints? true
                             :userland-query?                   true}}))]
-      (is (= 101
-             (count (csv/read-csv result)))))))
+      (is (some? result))
+      (when (some? result)
+        (is (= 101
+               (count (csv/read-csv result))))))))
 
 ;; non-"download" queries should still get the default constraints
 ;; (this also is a sanitiy check to make sure the `with-redefs` in the test above actually works)
diff --git a/test/metabase/query_processor/streaming_test.clj b/test/metabase/query_processor/streaming_test.clj
index 5a14a697e06..29adb1735d6 100644
--- a/test/metabase/query_processor/streaming_test.clj
+++ b/test/metabase/query_processor/streaming_test.clj
@@ -1,6 +1,5 @@
 (ns metabase.query-processor.streaming-test
   (:require [cheshire.core :as json]
-            [clojure.core.async :as a]
             [clojure.data.csv :as csv]
             [clojure.test :refer :all]
             [dk.ative.docjure.spreadsheet :as spreadsheet]
@@ -13,8 +12,7 @@
             [metabase.test.util :as tu]
             [ring.core.protocols :as ring.protocols]
             [toucan.db :as db])
-  (:import [java.io BufferedInputStream BufferedOutputStream FilterOutputStream InputStream InputStreamReader
-            OutputStream PipedInputStream PipedOutputStream]))
+  (:import [java.io BufferedInputStream BufferedOutputStream ByteArrayInputStream ByteArrayOutputStream InputStream InputStreamReader]))
 
 (defmulti ^:private parse-result
   {:arglists '([export-format ^InputStream input-stream])}
@@ -41,29 +39,31 @@
        (spreadsheet/select-columns {:A "ID", :B "Name", :C "Category ID", :D "Latitude", :E "Longitude", :F "Price"})
        rest))
 
-(defn- close-notifying-stream
-  ^FilterOutputStream [close-chan ^OutputStream os]
-  (proxy [FilterOutputStream] [os]
-    (close []
-      (a/put! close-chan ::close)
-      (try
-        (.flush os)
-        (.close os)
-        (catch Throwable _)))))
-
-(defn- process-query-streaming [export-format query]
+(defn- process-query-basic-streaming [export-format query]
   (with-redefs [streaming-response/keepalive-interval-ms 2]
-    (mt/with-open-channels [close-chan (a/promise-chan)]
-      (with-open [pos (PipedOutputStream.)
-                  os  (close-notifying-stream close-chan (BufferedOutputStream. pos))
-                  is  (BufferedInputStream. (PipedInputStream. pos))]
-        (ring.protocols/write-body-to-stream
-         (qp.streaming/streaming-response [context export-format]
-           (qp/process-query-async query (assoc context :timeout 5000)))
-         nil
-         os)
-        (mt/wait-for-result close-chan 100)
-        (parse-result export-format is)))))
+    (with-open [bos (ByteArrayOutputStream.)
+                os  (BufferedOutputStream. bos)]
+      (qp/process-query query (assoc (qp.streaming/streaming-context export-format os)
+                                     :timeout 15000))
+      (.flush os)
+      (let [bytea (.toByteArray bos)]
+        (with-open [is (BufferedInputStream. (ByteArrayInputStream. bytea))]
+          (parse-result export-format is))))))
+
+(defn- process-query-api-response-streaming [export-format query]
+  (with-redefs [streaming-response/keepalive-interval-ms 2]
+    (with-open [bos (ByteArrayOutputStream.)
+                os  (BufferedOutputStream. bos)]
+      (ring.protocols/write-body-to-stream
+       (qp.streaming/streaming-response [context export-format]
+         (qp/process-query-async query (assoc context :timeout 5000)))
+       nil
+       os)
+      (.flush os)
+      (.flush bos)
+      (let [bytea (.toByteArray bos)]
+        (with-open [is (BufferedInputStream. (ByteArrayInputStream. bytea))]
+          (parse-result export-format is))))))
 
 (defmulti ^:private expected-results
   {:arglists '([export-format normal-results])}
@@ -104,17 +104,34 @@
   (cond-> x
     (map? x) (m/dissoc-in [:data :results_metadata :checksum])))
 
-(defn- compare-results-for-query [export-format query]
-  (let [query             query
-        streaming-results (maybe-remove-checksum (process-query-streaming export-format query))
-        expected-results  (maybe-remove-checksum (expected-results export-format (qp/process-query query)))]
-    (is (= expected-results
-           streaming-results))))
+(defn- expected-results* [export-format query]
+  (maybe-remove-checksum (expected-results export-format (qp/process-query query))))
+
+(defn- basic-actual-results* [export-format query]
+  (maybe-remove-checksum (process-query-basic-streaming export-format query)))
+
+(deftest basic-streaming-test []
+  (testing "Test that the underlying qp.streaming context logic itself works correctly. Not an end-to-end test!"
+    (let [query (mt/mbql-query venues
+                  {:order-by [[:asc $id]]
+                   :limit    5})]
+      (doseq [export-format (qp.streaming/export-formats)]
+        (testing export-format
+          (is (= (expected-results* export-format query)
+                 (basic-actual-results* export-format query))))))))
+
+(defn- actual-results* [export-format query]
+  (maybe-remove-checksum (process-query-api-response-streaming export-format query)))
+
+(defn- compare-results [export-format query]
+  (is (= (expected-results* export-format query)
+         (actual-results* export-format query))))
 
 (deftest streaming-response-test
-  (doseq [export-format (qp.streaming/export-formats)]
-    (testing export-format
-      (compare-results-for-query export-format (mt/mbql-query venues {:limit 5})))))
+  (testing "Test that the actual results going thru the same steps as an API response are correct."
+    (doseq [export-format (qp.streaming/export-formats)]
+      (testing export-format
+        (compare-results export-format (mt/mbql-query venues {:limit 5}))))))
 
 (deftest utf8-test
   ;; UTF-8 isn't currently working for XLSX -- fix me
@@ -122,11 +139,11 @@
     (testing export-format
       (testing "Make sure our various streaming formats properly write values as UTF-8."
         (testing "A query that will have a little → in its name"
-          (compare-results-for-query export-format (mt/mbql-query venues
-                                                     {:fields   [$name $category_id->categories.name]
-                                                      :order-by [[:asc $id]]
-                                                      :limit    5})))
+          (compare-results export-format (mt/mbql-query venues
+                                           {:fields   [$name $category_id->categories.name]
+                                            :order-by [[:asc $id]]
+                                            :limit    5})))
         (testing "A query with emoji and other fancy unicode"
           (let [[sql & args] (db/honeysql->sql {:select [["Cam 𝌆 Saul 💩" :cam]]})]
-            (compare-results-for-query export-format (mt/native-query {:query  sql
-                                                                       :params args}))))))))
+            (compare-results export-format (mt/native-query {:query  sql
+                                                             :params args}))))))))
-- 
GitLab