diff --git a/src/metabase/query_processor.clj b/src/metabase/query_processor.clj
index 04adc4e80c44d5af40012e9a672c4aeace110d47..237bc42437ff5a1dd998f0de57983609acc36408 100644
--- a/src/metabase/query_processor.clj
+++ b/src/metabase/query_processor.clj
@@ -219,11 +219,11 @@
         (apply (qp) args))
       (qp))))
 
-(def ^{:arglists '([query] [query context])} process-query-async
+(def ^{:arglists '([query] [query context] [query rff context])} process-query-async
   "Process a query asynchronously, returning a `core.async` channel that is called with the final result (or Throwable)."
   (base-qp default-middleware))
 
-(def ^{:arglists '([query] [query context])} process-query-sync
+(def ^{:arglists '([query] [query context] [query rff context])} process-query-sync
   "Process a query synchronously, blocking until results are returned. Throws raised Exceptions directly."
   (qp.reducible/sync-qp process-query-async))
 
@@ -231,7 +231,7 @@
   "Process an MBQL query. This is the main entrypoint to the magical realm of the Query Processor. Returns a *single*
   core.async channel if option `:async?` is true; otherwise returns results in the usual format. For async queries, if
   the core.async channel is closed, the query will be canceled."
-  {:arglists '([query] [query context])}
+  {:arglists '([query] [query context] [query rff context])}
   [{:keys [async?], :as query} & args]
   (apply (if async? process-query-async process-query-sync)
          query
diff --git a/src/metabase/query_processor/context.clj b/src/metabase/query_processor/context.clj
index bf6ce4c317568b37f01e127769f6ebe501665f8f..37802638b724def16225a6bbd68e1be79dd8b3f3 100644
--- a/src/metabase/query_processor/context.clj
+++ b/src/metabase/query_processor/context.clj
@@ -1,9 +1,9 @@
 (ns metabase.query-processor.context
   "Interface for the QP context/utility functions for using the things in the context correctly.
 
-  The default implementations of all these functions live in `metabase.query-processor.context.default`; refer to
-  those when overriding individual functions. Some wiring for the `core.async` channels takes place in
-  `metabase.query-processor.reducible.`"
+  The default implementations of all these functions live in [[metabase.query-processor.context.default]]; refer to
+  those when overriding individual functions. Some wiring for the [[clojure.core.async]] channels takes place in
+  [[metabase.query-processor.reducible]]."
   (:require [metabase.async.util :as async.u]))
 
 (defn raisef
@@ -28,17 +28,17 @@
 ;;                       [message sent to canceled chan]
 ;;
 ;; 1. Query normally runs thru middleware and then a series of context functions as described above; result is sent thru
-;;    `resultf` and finally to `out-chan`
+;;    [[resultf]] and finally to [[out-chan]]
 ;;
-;; 2. If an `Exception` is thrown, it is sent thru `raisef`, `resultf` and finally to `out-chan`
+;; 2. If an `Exception` is thrown, it is sent thru [[raisef]], [[resultf]] and finally to [[out-chan]]
 ;;
 ;; 3. If the query times out, `timeoutf` throws an Exception
 ;;
-;; 4. If the query is canceled (either by closing `out-chan` before it gets a result, or by sending `canceled-chan` a
-;; message), the execution is canceled and `out-chan` is closed (if not already closed).
+;; 4. If the query is canceled (either by closing [[out-chan]] before it gets a result, or by sending [[canceled-chan]]
+;;    a message), the execution is canceled and [[out-chan]] is closed (if not already closed).
 (defn runf
-  "Called by pivot fn to run preprocessed query. Normally, this simply calls `executef`, but you can override this for
-  test purposes. The result of this function is ignored."
+  "Called by the [[metabase.query-processor.reducible/identity-qp]] fn to run preprocessed query. Normally, this simply
+  calls [[executef]], but you can override this for test purposes. The result of this function is ignored."
   {:arglists '([query rff context])}
   [query rff {runf* :runf, :as context}]
   {:pre [(fn? runf*)]}
@@ -46,12 +46,12 @@
   nil)
 
 (defn executef
-  "Called by `runf` to have driver run query. By default, `driver/execute-reducible-query`. `respond` is a callback with
-  the signature:
+  "Called by [[runf]] to have driver run query. By default, [[metabase.driver/execute-reducible-query]]. `respond` is a
+  callback with the signature:
 
     (respond results-metadata reducible-rows)
 
-  The implementation of `executef` should call `respond` with this information once it is available. The result of
+  The implementation of [[executef]] should call `respond` with this information once it is available. The result of
   this function is ignored."
   {:arglists '([driver query context respond])}
   [driver query {executef* :executef, :as context} respond]
@@ -60,9 +60,9 @@
   nil)
 
 (defn reducef
-  "Called by `runf` (inside the `respond` callback provided by it) to reduce results of query. `reducedf` is called with
-  the reduced results. The actual output of this function is ignored, but the entire result set must be reduced and
-  passed to `reducedf` before this function completes."
+  "Called by [[runf]] (inside the `respond` callback provided by it) to reduce results of query. [[reducedf]] is called
+  with the reduced results. The actual output of this function is ignored, but the entire result set must be reduced
+  and passed to [[reducedf]] before this function completes."
   {:arglists '([rff context metadata reducible-rows])}
   [rff {reducef* :reducef, :as context} metadata reducible-rows]
   {:pre [(fn? reducef*)]}
@@ -70,7 +70,7 @@
   nil)
 
 (defn reducedf
-  "Called in `reducedf` with fully reduced results. This result is passed to `resultf`."
+  "Called in [[reducedf]] with fully reduced results. This result is passed to [[resultf]]."
   {:arglists '([metadata reduced-rows context])}
   [metadata reduced-rows {reducedf* :reducedf, :as context}]
   {:pre [(fn? reducedf*)]}
@@ -84,7 +84,7 @@
   (timeoutf* context))
 
 (defn resultf
-  "Called exactly once with the final result, which is the result of either `reducedf` or `raisef`."
+  "Called exactly once with the final result, which is the result of either [[reducedf]] or [[raisef]]."
   {:arglists '([result context])}
   [result {resultf* :resultf, :as context}]
   {:pre [(fn? resultf*)]}
diff --git a/src/metabase/query_processor/context/default.clj b/src/metabase/query_processor/context/default.clj
index c053e40ab1fd39d6cb55f5b05dc0cd62573b18fb..02af99df7c733fbe6dbecd0f9376f17e3cfe7505 100644
--- a/src/metabase/query_processor/context/default.clj
+++ b/src/metabase/query_processor/context/default.clj
@@ -95,7 +95,8 @@
 (defn default-context
   "Return a new context for executing queries using the default values. These can be overrided as needed."
   []
-  {:timeout       query-timeout-ms
+  {::complete?    true
+   :timeout       query-timeout-ms
    :rff           default-rff
    :raisef        default-raisef
    :runf          default-runf
diff --git a/src/metabase/query_processor/middleware/cache.clj b/src/metabase/query_processor/middleware/cache.clj
index e9cc6b39dfdf72285c4dbdef2f5ed61fcfde7eb8..bc720412ef50c77fcc90bf0f37069d896ecf09df 100644
--- a/src/metabase/query_processor/middleware/cache.clj
+++ b/src/metabase/query_processor/middleware/cache.clj
@@ -10,8 +10,7 @@
   The default backend is `db`, which uses the application database; this value can be changed by setting the env var
   `MB_QP_CACHE_BACKEND`. Refer to [[metabase.query-processor.middleware.cache-backend.interface]] for more details
   about how the cache backends themselves."
-  (:require [clojure.core.async :as a]
-            [clojure.tools.logging :as log]
+  (:require [clojure.tools.logging :as log]
             [java-time :as t]
             [medley.core :as m]
             [metabase.config :as config]
@@ -44,66 +43,76 @@
   (try
     (log/tracef "Purging cache entries older than %s" (u/format-seconds (public-settings/query-caching-max-ttl)))
     (i/purge-old-entries! backend (public-settings/query-caching-max-ttl))
+    (log/trace "Successfully purged old cache entries.")
+    :done
     (catch Throwable e
-      (log/error e (trs "Error purging old cache entries")))))
+      (log/error e (trs "Error purging old cache entries: {0}" (ex-message e))))))
 
 (defn- min-duration-ms
   "Minimum duration it must take a query to complete in order for it to be eligible for caching."
   []
   (* (public-settings/query-caching-min-ttl) 1000))
 
-(defn- cache-results-async!
-  "Save the results of a query asynchronously once they are delivered (as a byte array) to the promise channel
-  `out-chan`."
-  [query-hash out-chan]
-  (log/info (trs "Caching results for next time for query with hash {0}." (pr-str (i/short-hex-hash query-hash))) (u/emoji "💾"))
-  (a/go
-    (let [x (a/<! out-chan)]
-      (condp instance? x
-        Throwable
-        (if (= (:type (ex-data x)) ::impl/max-bytes)
-          (log/debug x (trs "Not caching results: results are larger than {0} KB" (public-settings/query-caching-max-kb)))
-          (log/error x (trs "Error saving query results to cache.")))
-
-        (Class/forName "[B")
-        (let [y (a/<! (a/thread
-                        (try
-                          (i/save-results! *backend* query-hash x)
-                          (catch Throwable e
-                            e))))]
-          (if (instance? Throwable y)
-            (log/error y (trs "Error saving query results to cache."))
-            (do
-              (log/debug (trs "Successfully cached results for query."))
-              (purge! *backend*))))
-
-        (log/error (trs "Cannot cache results: expected byte array, got {0}" (class x)))))))
+(def ^:private ^:dynamic *in-fn*
+  "The `in-fn` provided by [[impl/do-with-serialization]]."
+  nil)
+
+(defn- add-object-to-cache!
+  "Add `object` (e.g. a result row or metadata) to the current cache entry."
+  [object]
+  (when *in-fn*
+    (*in-fn* object)))
+
+(def ^:private ^:dynamic *result-fn*
+  "The `result-fn` provided by [[impl/do-with-serialization]]."
+  nil)
+
+(defn- serialized-bytes []
+  (when *result-fn*
+    (*result-fn*)))
+
+(defn- cache-results!
+  "Save the final results of a query."
+  [query-hash]
+  (log/info (trs "Caching results for next time for query with hash {0}."
+                 (pr-str (i/short-hex-hash query-hash))) (u/emoji "💾"))
+  (try
+    (let [bytez (serialized-bytes)]
+      (if-not (instance? (Class/forName "[B") bytez)
+        (log/error (trs "Cannot cache results: expected byte array, got {0}" (class bytez)))
+        (do
+          (log/trace "Got serialized bytes; saving to cache backend")
+          (i/save-results! *backend* query-hash bytez)
+          (log/debug "Successfully cached results for query.")
+          (purge! *backend*))))
+    :done
+    (catch Throwable e
+      (if (= (:type (ex-data e)) ::impl/max-bytes)
+        (log/debug e (trs "Not caching results: results are larger than {0} KB" (public-settings/query-caching-max-kb)))
+        (log/error e (trs "Error saving query results to cache: {0}" (ex-message e)))))))
 
 (defn- save-results-xform [start-time metadata query-hash rf]
-  (let [{:keys [in-chan out-chan]} (impl/serialize-async)
-        has-rows?                  (volatile! false)]
-    (a/put! in-chan (assoc metadata
-                           :cache-version cache-version
-                           :last-ran      (t/zoned-date-time)))
+  (let [has-rows? (volatile! false)]
+    (add-object-to-cache! (assoc metadata
+                                 :cache-version cache-version
+                                 :last-ran      (t/zoned-date-time)))
     (fn
       ([] (rf))
 
       ([result]
-       (a/put! in-chan (if (map? result)
-                         (m/dissoc-in result [:data :rows])
-                         {}))
-       (a/close! in-chan)
+       (add-object-to-cache! (if (map? result)
+                               (m/dissoc-in result [:data :rows])
+                               {}))
        (let [duration-ms (- (System/currentTimeMillis) start-time)]
          (log/info (trs "Query took {0} to run; minimum for cache eligibility is {1}"
                         (u/format-milliseconds duration-ms) (u/format-milliseconds (min-duration-ms))))
          (when (and @has-rows?
                     (> duration-ms (min-duration-ms)))
-           (cache-results-async! query-hash out-chan)))
+           (cache-results! query-hash)))
        (rf result))
 
       ([acc row]
-       ;; Blocking so we don't exceed async's MAX-QUEUE-SIZE when transducing a large result set
-       (a/>!! in-chan row)
+       (add-object-to-cache! row)
        (vreset! has-rows? true)
        (rf acc row)))))
 
@@ -159,15 +168,15 @@
       (log/debug (trs "Request is closed; no one to return cached results to"))
       ::canceled)
     (catch Throwable e
-      (log/error e (trs "Error attempting to fetch cached results for query with hash {0}"
-                        (i/short-hex-hash query-hash)))
+      (log/error e (trs "Error attempting to fetch cached results for query with hash {0}: {1}"
+                        (i/short-hex-hash query-hash) (ex-message e)))
       ::miss)))
 
 
 ;;; --------------------------------------------------- Middleware ---------------------------------------------------
 
 (defn- run-query-with-cache
-  [qp {:keys [cache-ttl middleware], :as query} rff context]
+  [qp {:keys [cache-ttl middleware], :as query} rff {:keys [reducef], :as context}]
   ;; TODO - Query will already have `info.hash` if it's a userland query. I'm not 100% sure it will be the same hash,
   ;; because this is calculated after normalization, instead of before
   (let [query-hash (qputil/query-hash query)
@@ -175,10 +184,16 @@
     (when (= result ::miss)
       (let [start-time-ms (System/currentTimeMillis)]
         (log/trace "Running query and saving cached results (if eligible)...")
-        (qp query
-            (fn [metadata]
-              (save-results-xform start-time-ms metadata query-hash (rff metadata)))
-            context)))))
+        (let [reducef' (fn [rff context metadata rows]
+                         (impl/do-with-serialization
+                          (fn [in-fn result-fn]
+                            (binding [*in-fn*     in-fn
+                                      *result-fn* result-fn]
+                              (reducef rff context metadata rows)))))]
+          (qp query
+              (fn [metadata]
+                (save-results-xform start-time-ms metadata query-hash (rff metadata)))
+              (assoc context :reducef reducef')))))))
 
 (defn- is-cacheable? {:arglists '([query])} [{:keys [cache-ttl]}]
   (and (public-settings/enable-query-caching)
@@ -197,7 +212,7 @@
         running the query, satisfying this requirement.)
      *  The result *rows* of the query must be less than `query-caching-max-kb` when serialized (before compression)."
   [qp]
-  (fn [query rff context]
+  (fn maybe-return-cached-results* [query rff context]
     (let [cacheable? (is-cacheable? query)]
       (log/tracef "Query is cacheable? %s" (boolean cacheable?))
       (if cacheable?
diff --git a/src/metabase/query_processor/middleware/cache/impl.clj b/src/metabase/query_processor/middleware/cache/impl.clj
index 4d1eca97db3dd4d906ae1b71f900debc065e9b01..d0eac6b0a086ed4efb292bb939dc783b012da65f 100644
--- a/src/metabase/query_processor/middleware/cache/impl.clj
+++ b/src/metabase/query_processor/middleware/cache/impl.clj
@@ -1,9 +1,8 @@
 (ns metabase.query-processor.middleware.cache.impl
-  (:require [clojure.core.async :as a]
-            [clojure.tools.logging :as log]
+  (:require [clojure.tools.logging :as log]
             [metabase.public-settings :as public-settings]
             [metabase.util :as u]
-            [metabase.util.i18n :refer [trs tru]]
+            [metabase.util.i18n :refer [trs]]
             [taoensso.nippy :as nippy])
   (:import [java.io BufferedInputStream BufferedOutputStream ByteArrayOutputStream DataInputStream DataOutputStream
             EOFException FilterOutputStream InputStream OutputStream]
@@ -31,95 +30,58 @@
          (check-total (swap! byte-count + len))
          (.write os ba off len))))))
 
-(def ^:private serialization-timeout-ms (u/minutes->ms 10))
-
-(defn- start-out-chan-close-block!
-  "When `out-chan` closes, close everything. Wait up to 10 minutes for `out-chan` to close, and throw an Exception if
-  it's not done by then."
-  [in-chan out-chan ^ByteArrayOutputStream bos ^DataOutputStream os]
-  (a/go
-    (let [timeout-chan (a/timeout serialization-timeout-ms)
-          [_val port]   (a/alts! [out-chan timeout-chan])]
-      (when (= port timeout-chan)
-        (a/>! out-chan (ex-info (tru "Serialization timed out after {0}." (u/format-milliseconds serialization-timeout-ms))
-                                {}))))
-    (log/tracef "Closing core.async channels and output streams.")
-    (try
-      ;; don't really need to close both, probably
-      (.close os)
-      (.close bos)
-      (catch Throwable e
-        (a/>! out-chan e)))
-    (a/close! out-chan)
-    (a/close! in-chan)))
-
 (defn- freeze!
   [^OutputStream os obj]
-  (try
-    (nippy/freeze-to-out! os obj)
-    (.flush os)
-    :ok
-    (catch Throwable e
-      e)))
-
-(defn- start-input-loop!
-  "Listen for things sent to `in-chan`. When we get an object to `in-chan`, write it to the ouput stream (async), then
-  recur and wait for the next obj. When `in-chan` is closed, write the bytes to `out-chan` (async).
-
-  If serialization fails, writes thrown Exception to `out-chan`."
-  [in-chan out-chan ^ByteArrayOutputStream bos ^DataOutputStream os]
-  (a/go-loop []
-    ;; we got a result
-    (if-let [obj (a/<! in-chan)]
-      (do
-        (log/tracef "Serializing %s" (pr-str obj))
-        (let [result (a/<! (a/thread (freeze! os obj)))]
-          (if (instance? Throwable result)
-            (do
-              ;; Serialization has failed, close the channel as there's no point in continuing writing to it
-              (a/close! in-chan)
-              ;; Drain the channel to unblock
-              (while (a/poll! in-chan))
-              (a/>! out-chan result))
-            (recur))))
-      ;; `in-chan` is closed
-      (a/thread
-        (try
-          (.flush os)
-          (let [result (.toByteArray bos)]
-            (a/>!! out-chan result))
-          (catch Throwable e
-            (a/>!! out-chan e)))))))
-
-(defn serialize-async
-  "Create output streams for serializing QP results. Returns a map of core.async channels, `in-chan` and `out-chan`.
-  Send all objects to be serialized to `in-chan`; then close it when finished; the result of `out-chan` will be the
-  serialized byte array (or an Exception, if one was thrown).
-
-  `out-chan` is closed automatically upon receiving a result; all chans and output streams are closed thereafter.
-
-    (let [{:keys [in-chan out-chan]} (serialize-async)]
-      (doseq [obj objects]
-        (a/put! in-chan obj))
-      (a/close! in-chan)
-      (let [[val] (a/alts!! [out-chan (a/timeout 1000)])]
-        (when (instance? Throwable val)
-          (throw val))
-         val))"
-  ([]
-   (serialize-async {:max-bytes (* (public-settings/query-caching-max-kb) 1024)}))
-
-  ([{:keys [max-bytes]}]
-   (let [in-chan  (a/chan 1)
-         out-chan (a/promise-chan)
-         bos      (ByteArrayOutputStream.)
-         os       (-> (max-bytes-output-stream max-bytes bos)
-                      BufferedOutputStream.
-                      (GZIPOutputStream. true)
-                      DataOutputStream.)]
-     (start-out-chan-close-block! in-chan out-chan bos os)
-     (start-input-loop! in-chan out-chan bos os)
-     {:in-chan in-chan, :out-chan out-chan})))
+  (log/tracef "Freezing %s" (pr-str obj))
+  (nippy/freeze-to-out! os obj)
+  (.flush os))
+
+(defn do-with-serialization
+  "Create output streams for serializing QP results and invoke `f`, a function of the form
+
+    (f in-fn result-fn)
+
+  `in-fn` is of the form `(in-fn object)` and should be called once for each object that should be serialized. `in-fn`
+  will catch any exceptions thrown during serialization; these will be thrown later when invoking `result-fn`. After
+  the first exception `in-fn` will no-op for all subsequent calls.
+
+  When you have serialized *all* objects, call `result-fn` to get the serialized byte array. If an error was
+  encountered during serialization (such as the serialized bytes being longer than `max-bytes`), `result-fn` will
+  throw an Exception rather than returning a byte array; be sure to handle this case.
+
+    (do-with-serialization
+      (fn [in result]
+        (doseq [obj objects]
+          (in obj))
+        (result)))"
+  ([f]
+   (do-with-serialization f {:max-bytes (* (public-settings/query-caching-max-kb) 1024)}))
+
+  ([f {:keys [max-bytes]}]
+   (with-open [bos (ByteArrayOutputStream.)]
+     (let [os    (-> (max-bytes-output-stream max-bytes bos)
+                     BufferedOutputStream.
+                     (GZIPOutputStream. true)
+                     DataOutputStream.)
+           error (atom nil)]
+       (try
+         (f (fn in* [obj]
+              (when-not @error
+                (try
+                  (freeze! os obj)
+                  (catch Throwable e
+                    (log/trace e "Caught error when freezing object")
+                    (reset! error e))))
+              nil)
+            (fn result* []
+              (when @error
+                (throw @error))
+              (log/trace "Getting result byte array")
+              (.toByteArray bos)))
+         ;; this is done manually instead of `with-open` because it might throw an Exception when we close it if it's
+         ;; past the byte limit; that's fine and we can ignore it
+         (finally
+           (u/ignore-exceptions (.close os))))))))
 
 (defn- thaw!
   [^InputStream is]
diff --git a/src/metabase/query_processor/reducible.clj b/src/metabase/query_processor/reducible.clj
index a0c13b6a2841f855660c13aad90502c48d196c2d..c2258e1de86714565861724cc0e22ae4cecf41a0 100644
--- a/src/metabase/query_processor/reducible.clj
+++ b/src/metabase/query_processor/reducible.clj
@@ -6,29 +6,7 @@
             [metabase.query-processor.context.default :as context.default]
             [metabase.util :as u]))
 
-(defn- wire-up-context-channels!
-  "Wire up the core.async channels in a QP `context`."
-  [context]
-  ;; 1) If query doesn't complete by `timeoutf`, call `timeoutf`, which should raise an Exception
-  ;; 2) when `out-chan` is closed prematurely, send a message to `canceled-chan`
-  ;; 3) when `out-chan` is closed or gets a result, close both out-chan and canceled-chan
-  (let [out-chan      (context/out-chan context)
-        canceled-chan (context/canceled-chan context)
-        timeout       (context/timeout context)]
-    (a/go
-      (let [[val port] (a/alts! [out-chan (a/timeout timeout)] :priority true)]
-        (log/tracef "Port %s got %s"
-                    (if (= port out-chan) "out-chan" (format "[timeout after %s]" (u/format-milliseconds timeout)))
-                    val)
-        (cond
-          (not= port out-chan) (context/timeoutf context)
-          (nil? val)           (a/>!! canceled-chan ::cancel))
-        (log/tracef "Closing out-chan.")
-        (a/close! out-chan)
-        (a/close! canceled-chan)))
-    nil))
-
-(defn pivot
+(defn identity-qp
   "The initial value of `qp` passed to QP middleware."
   [query rff context]
   (context/runf query rff context))
@@ -39,9 +17,9 @@
 
     (qp query rff context)"
   ([middleware]
-   (combine-middleware middleware pivot))
+   (combine-middleware middleware identity-qp))
 
-  ([middleware pivot-fn]
+  ([middleware qp]
    (reduce
     (fn [qp middleware]
       (when (var? middleware)
@@ -49,9 +27,43 @@
       (if (some? middleware)
         (middleware qp)
         qp))
-    pivot-fn
+    qp
     middleware)))
 
+;; Why isn't this just done automatically when we create the context in [[context.default/default-context]]? The timeout
+;; could be subject to change so it makes sense to wait until we actually run the query to wire stuff up. Also, since
+;; we're doing
+;;
+;;    (merge (context.default/default-context) context)
+;;
+;; all over the place, it probably reduces overhead a bit to not run around adding a bunch of timeouts to channels we
+;; don't end up using.
+(defn- wire-up-context-channels!
+  "Wire up the core.async channels in a QP `context`
+
+  1. If query doesn't complete by [[context/timeout]], call [[context/timeoutf]], which should raise an Exception.
+
+  2. When [[context/out-chan]] is closed prematurely, send a message to [[context/canceled-chan]].
+
+  3. When [[context/out-chan]] is closed or gets a result, close both [[context/out-chan]]
+     and [[context/canceled-chan]]."
+  [context]
+  (let [out-chan      (context/out-chan context)
+        canceled-chan (context/canceled-chan context)
+        timeout       (context/timeout context)]
+    (a/go
+      (let [[val port] (a/alts! [out-chan (a/timeout timeout)] :priority true)]
+        (log/tracef "Port %s got %s"
+                    (if (= port out-chan) "out-chan" (format "[timeout after %s]" (u/format-milliseconds timeout)))
+                    val)
+        (cond
+          (not= port out-chan) (context/timeoutf context)
+          (nil? val)           (a/>!! canceled-chan ::cancel))
+        (log/tracef "Closing out-chan.")
+        (a/close! out-chan)
+        (a/close! canceled-chan)))
+    nil))
+
 (def ^:dynamic *run-on-separate-thread?*
   "Whether to run the query on a separate thread. When running a query asynchronously (i.e., with [[async-qp]]), this is
   normally `true`, meaning the `out-chan` is returned immediately. When running a query synchronously (i.e., with
@@ -76,22 +88,26 @@
      (qp* query nil))
 
     ([query context]
+     (qp* query nil context))
+
+    ([query rff context]
      {:pre [(map? query) ((some-fn nil? map?) context)]}
-     (let [context (merge (context.default/default-context) context)]
-       (wire-up-context-channels! context)
-       (let [thunk (fn [] (try
-                            (qp query (context/rff context) context)
+     (let [context (doto (merge (context.default/default-context) context)
+                     wire-up-context-channels!)
+           rff     (or rff
+                       (context/rff context))
+           thunk   (fn [] (try
+                            (qp query rff context)
                             (catch Throwable e
                               (context/raisef e context))))]
-         (log/tracef "Running on separate thread? %s" *run-on-separate-thread?*)
-         (if *run-on-separate-thread?*
-           (future (thunk))
-           (thunk)))
+       (log/tracef "Running on separate thread? %s" *run-on-separate-thread?*)
+       (if *run-on-separate-thread?*
+         (future (thunk))
+         (thunk))
        (context/out-chan context)))))
 
 (defn- wait-for-async-result [out-chan]
   {:pre [(async.u/promise-chan? out-chan)]}
-  ;; TODO - consider whether we should have another timeout here as well
   (let [result (a/<!! out-chan)]
     (if (instance? Throwable result)
       (throw result)
@@ -105,14 +121,9 @@
     (qp query context)"
   [qp]
   {:pre [(fn? qp)]}
-  (fn qp*
-    ([query]
-     (wait-for-async-result (binding [*run-on-separate-thread?* false]
-                              (qp query))))
-
-    ([query context]
-     (wait-for-async-result (binding [*run-on-separate-thread?* false]
-                              (qp query context))))))
+  (fn qp* [& args]
+    (binding [*run-on-separate-thread?* false]
+      (wait-for-async-result (apply qp args)))))
 
 
 ;;; ------------------------------------------------- Other Util Fns -------------------------------------------------
diff --git a/test/metabase/query_processor/middleware/cache/impl_test.clj b/test/metabase/query_processor/middleware/cache/impl_test.clj
index 988291e671ab716c972b9886a0607b97e77f7908..9b01e3df3ab883ea3a136623014841e9558ffdee 100644
--- a/test/metabase/query_processor/middleware/cache/impl_test.clj
+++ b/test/metabase/query_processor/middleware/cache/impl_test.clj
@@ -1,6 +1,5 @@
 (ns metabase.query-processor.middleware.cache.impl-test
-  (:require [clojure.core.async :as a]
-            [clojure.test :refer :all]
+  (:require [clojure.test :refer :all]
             [metabase.query-processor.middleware.cache.impl :as impl]
             [potemkin.types :as p.types])
   (:import java.io.ByteArrayInputStream))
@@ -27,26 +26,26 @@
            (reduce rf (rf) rows)))))))
 
 (deftest e2e-test
-  (let [{:keys [in-chan out-chan]} (impl/serialize-async)]
-    (doseq [obj objects]
-      (a/put! in-chan obj))
-    (a/close! in-chan)
-    (let [[val] (a/alts!! [out-chan (a/timeout 1000)])]
-      (is (= objects
-             (if (instance? Throwable val)
-               (throw val)
-               (deserialize val)))))))
+  (impl/do-with-serialization
+   (fn [in result]
+     (doseq [obj objects]
+       (is (= nil
+              (in obj))))
+     (let [val (result)]
+       (is (instance? (Class/forName "[B") val))
+       (is (= objects
+              (if (instance? Throwable val)
+                (throw val)
+                (deserialize val))))))))
 
 (deftest max-bytes-test
-  (let [{:keys [in-chan out-chan]} (impl/serialize-async {:max-bytes 50})]
-    (doseq [obj objects]
-      (a/put! in-chan obj))
-    (a/close! in-chan)
-    (let [[val] (a/alts!! [out-chan (a/timeout 1000)])]
-      (is (thrown-with-msg?
-           Exception
-           #"Results are too large to cache\."
-           (if (instance? Throwable val)
-             (throw val)
-             val)))
-      nil)))
+  (impl/do-with-serialization
+   (fn [in result]
+     (doseq [obj objects]
+       (is (= nil
+              (in obj))))
+     (is (thrown-with-msg?
+          Exception
+          #"Results are too large to cache\."
+          (result))))
+   {:max-bytes 50}))
diff --git a/test/metabase/query_processor/middleware/cache_test.clj b/test/metabase/query_processor/middleware/cache_test.clj
index 1350008396c29359843a1520b58a22d4e550e4c4..95d35fada31a32251d15155ee5b6a4a89c8a9b58 100644
--- a/test/metabase/query_processor/middleware/cache_test.clj
+++ b/test/metabase/query_processor/middleware/cache_test.clj
@@ -13,12 +13,13 @@
             [metabase.models.query :as query :refer [Query]]
             [metabase.public-settings :as public-settings]
             [metabase.query-processor :as qp]
-            [metabase.query-processor.context.default  :as context.default]
+            [metabase.query-processor.context.default :as context.default]
             [metabase.query-processor.middleware.cache :as cache]
             [metabase.query-processor.middleware.cache-backend.interface :as i]
             [metabase.query-processor.middleware.cache.impl :as impl]
             [metabase.query-processor.middleware.cache.impl-test :as impl-test]
             [metabase.query-processor.middleware.process-userland-query :as process-userland-query]
+            [metabase.query-processor.reducible :as qp.reducible]
             [metabase.query-processor.streaming :as qp.streaming]
             [metabase.query-processor.util :as qputil]
             [metabase.server.middleware.session :as session]
@@ -32,6 +33,10 @@
 
 (use-fixtures :once (fixtures/initialize :db))
 
+(use-fixtures :each (fn [thunk]
+                      (mt/with-log-level :fatal
+                        (thunk))))
+
 (def ^:private ^:dynamic *save-chan*
   "Gets a message whenever results are saved to the test backend, or if the reducing function stops serializing results
   because of an Exception or if the byte threshold is passed."
@@ -52,7 +57,7 @@
       pretty/PrettyPrintable
       (pretty [_]
         (str "\n"
-             (metabase.util/pprint-to-str 'blue
+             (u/pprint-to-str 'blue
                (for [[hash {:keys [created]}] @store]
                  [hash (u/format-nanoseconds (.getNano (t/duration created (t/instant))))]))))
 
@@ -64,7 +69,7 @@
       i/CacheBackend
       (cached-results [this query-hash max-age-seconds respond]
         (let [hex-hash (codecs/bytes->hex query-hash)]
-          (log/tracef "Fetch results for %s store: %s" hex-hash this)
+          (log/tracef "Fetch results for %s store: %s" hex-hash (pretty/pretty this))
           (if-let [^bytes results (when-let [{:keys [created results]} (some (fn [[hash entry]]
                                                                                (when (= hash hex-hash)
                                                                                  entry))
@@ -79,7 +84,7 @@
         (let [hex-hash (codecs/bytes->hex query-hash)]
           (swap! store assoc hex-hash {:results results
                                        :created (t/instant)})
-          (log/tracef "Save results for %s --> store: %s" hex-hash this))
+          (log/tracef "Save results for %s --> store: %s" hex-hash (pretty/pretty this)))
         (a/>!! save-chan results))
 
       (purge-old-entries! [this max-age-seconds]
@@ -87,28 +92,28 @@
                        (into {} (filter (fn [[_ {:keys [created]}]]
                                           (t/after? created (t/minus (t/instant) (t/seconds max-age-seconds))))
                                         store))))
-        (log/tracef "Purge old entries --> store: %s" this)
+        (log/tracef "Purge old entries --> store: %s" (pretty/pretty this))
         (a/>!! purge-chan ::purge)))))
 
 (defn do-with-mock-cache [f]
-  (mt/with-open-channels [save-chan  (a/chan 1)
-                          purge-chan (a/chan 1)]
+  (mt/with-open-channels [save-chan  (a/chan 10)
+                          purge-chan (a/chan 10)]
     (mt/with-temporary-setting-values [enable-query-caching  true
                                        query-caching-max-ttl 60
                                        query-caching-min-ttl 0]
       (binding [cache/*backend* (test-backend save-chan purge-chan)
                 *save-chan*     save-chan
                 *purge-chan*    purge-chan]
-        (let [orig (var-get #'cache/cache-results-async!)]
-          (with-redefs [cache/cache-results-async! (fn [hash out-chan]
-                                                     (a/go
-                                                       ;; if `save-results!` isn't going to get called because
-                                                       ;; `out-chan` isn't a byte array then forward the result to
-                                                       ;; `save-chan` so it always gets a value
-                                                       (let [result (a/<! out-chan)]
-                                                         (when-not (bytes? result)
-                                                           (a/>!! save-chan result))))
-                                                     (orig hash out-chan))]
+        (let [orig @#'cache/serialized-bytes]
+          (with-redefs [cache/serialized-bytes (fn []
+                                                 ;; if `save-results!` isn't going to get called because `*result-fn*`
+                                                 ;; throws an Exception, catch it and send it to `save-chan` so it still
+                                                 ;; gets a result and tests can finish
+                                                 (try
+                                                   (orig)
+                                                   (catch Throwable e
+                                                     (a/>!! save-chan e)
+                                                     (throw e))))]
             (f {:save-chan save-chan, :purge-chan purge-chan})))))))
 
 (defmacro with-mock-cache [[& bindings] & body]
@@ -123,27 +128,30 @@
   ;; clear out stale values in save/purge channels
   (while (a/poll! *save-chan*))
   (while (a/poll! *purge-chan*))
-  (:metadata
-   (mt/test-qp-middleware
-    cache/maybe-return-cached-results
-    (test-query query-kvs)
-    {}
-    [[:toucan      71]
-     [:bald-eagle  92]
-     [:hummingbird 11]
-     [:owl         10]
-     [:chicken     69]
-     [:robin       96]
-     [:osprey      72]
-     [:flamingo    70]]
-    {:timeout 2000
-     :run     (fn []
-                (Thread/sleep *query-execution-delay-ms*))})))
+  (let [qp       (qp.reducible/sync-qp
+                  (qp.reducible/async-qp
+                   (cache/maybe-return-cached-results qp.reducible/identity-qp)))
+        metadata {}
+        rows     [[:toucan      71]
+                  [:bald-eagle  92]
+                  [:hummingbird 11]
+                  [:owl         10]
+                  [:chicken     69]
+                  [:robin       96]
+                  [:osprey      72]
+                  [:flamingo    70]]
+        query    (test-query query-kvs)
+        context  {:timeout  2000
+                  :executef (fn [_driver _query _context respond]
+                              (Thread/sleep *query-execution-delay-ms*)
+                              (respond metadata rows))}]
+    (-> (qp query context)
+        (assoc :data {}))))
 
 (defn- run-query [& args]
   (let [result (apply run-query* args)]
-    (is (= :completed
-           (:status result)))
+    (is (partial= {:status :completed}
+                  result))
     (if (:cached result)
       :cached
       :not-cached)))
@@ -274,9 +282,8 @@
                      (some? input-stream))))))
         (i/save-results! cache/*backend* query-hash (byte-array [0 0 0]))
         (testing "Invalid cache entry should be handled gracefully"
-          (mt/suppress-output
-            (is (= :not-cached
-                   (run-query)))))))))
+          (is (= :not-cached
+                 (run-query))))))))
 
 (deftest metadata-test
   (testing "Verify that correct metadata about caching such as `:updated_at` and `:cached` come back with cached results."
@@ -285,8 +292,6 @@
         (run-query)
         (mt/wait-for-result save-chan)
         (let [result (run-query*)]
-          (is (= true
-                 (:cached result)))
           (is (= {:data       {}
                   :cached     true
                   :updated_at #t "2020-02-19T02:31:07.798Z[UTC]"
diff --git a/test/metabase/query_processor/reducible_test.clj b/test/metabase/query_processor/reducible_test.clj
index 3d61fe181d72e5c6947140479c3f5f7942b57711..7a071f4bb2ebafed0d802a2098dd5eacfd91f0d2 100644
--- a/test/metabase/query_processor/reducible_test.clj
+++ b/test/metabase/query_processor/reducible_test.clj
@@ -147,7 +147,7 @@
   (testing "Rows don't actually have to be reducible. And you can build your own QP with your own middleware."
     (is (= {:data {:cols [{:name "n"}]
                    :rows [{:n 1} {:n 2} {:n 3} {:n 4} {:n 5}]}}
-           ((qp.reducible/sync-qp (qp.reducible/async-qp qp.reducible/pivot))
+           ((qp.reducible/sync-qp (qp.reducible/async-qp qp.reducible/identity-qp))
             {}
             {:executef (fn [_ _ _ respond]
                          (respond {:cols [{:name "n"}]}
diff --git a/test/metabase/test.clj b/test/metabase/test.clj
index 9ed3024f9b77f3284fc1d13e8d6e37d8aad67082..bd9ce1a2b14f55a8b7572120d3ca7bb22250eae4 100644
--- a/test/metabase/test.clj
+++ b/test/metabase/test.clj
@@ -3,6 +3,7 @@
 
   (Prefer using `metabase.test` to requiring bits and pieces from these various namespaces going forward, since it
   reduces the cognitive load required to write tests.)"
+  (:refer-clojure :exclude [compile])
   (:require clojure.data
             [clojure.test :refer :all]
             [clojure.tools.macro :as tools.macro]
diff --git a/test/metabase/test/util/log.clj b/test/metabase/test/util/log.clj
index 3d17ffb9109480ae11bacc1edec02b6be9d2be5c..98e3f0060534b48ef786e49a7e3c42936667b252 100644
--- a/test/metabase/test/util/log.clj
+++ b/test/metabase/test/util/log.clj
@@ -56,9 +56,11 @@
   "Execute `body` with all logging/`*out*`/`*err*` messages suppressed. Useful for avoiding cluttering up test output
   for tests with stacktraces and error messages from tests that are supposed to fail.
 
-  DEPRECATED -- you don't need to do this anymore. Tests now have a default log level of `CRITICAL` which means error
+  DEPRECATED -- you don't need to do this anymore. Tests now have a default log level of `FATAL` which means error
   logging will be suppressed by default. This macro predates the current test logging levels. You can remove usages of
-  this macro."
+  this macro.
+
+  If you want to suppress log messages for REPL usage you can use [[with-log-level]] instead."
   {:style/indent 0}
   [& body]
   `(do-with-suppressed-output (fn [] ~@body)))