From 11de16d6ee41ffef39f52e708ce5738d852f4b53 Mon Sep 17 00:00:00 2001
From: Cam Saul <1455846+camsaul@users.noreply.github.com>
Date: Mon, 23 Mar 2020 13:13:25 -1000
Subject: [PATCH] New HTTP request cancellation implementation (#12180)

---
 src/metabase/api/card.clj                     |  66 ++++----
 src/metabase/async/streaming_response.clj     | 146 +++++++++++++-----
 src/metabase/async/util.clj                   |  75 +++++----
 .../async/streaming_response_test.clj         |  38 ++++-
 test/metabase/async/util_test.clj             |  25 ++-
 .../query_processor/streaming_test.clj        |  19 ++-
 6 files changed, 232 insertions(+), 137 deletions(-)

diff --git a/src/metabase/api/card.clj b/src/metabase/api/card.clj
index 41df548b35d..ae4fd4d1dcd 100644
--- a/src/metabase/api/card.clj
+++ b/src/metabase/api/card.clj
@@ -202,17 +202,16 @@
   "Save `card-data` as a new Card on a separate thread. Returns a channel to fetch the response; closing this channel
   will cancel the save."
   [card-data]
-  (async.u/do-on-separate-thread
-   (fn []
-     (let [card (db/transaction
-                  ;; Adding a new card at `collection_position` could cause other cards in this
-                  ;; collection to change position, check that and fix it if needed
-                  (api/maybe-reconcile-collection-position! card-data)
-                  (db/insert! Card card-data))]
-       (events/publish-event! :card-create card)
-       ;; include same information returned by GET /api/card/:id since frontend replaces the Card it
-       ;; currently has with returned one -- See #4283
-       (hydrate card :creator :dashboard_count :can_write :collection)))))
+  (async.u/cancelable-thread
+    (let [card (db/transaction
+                 ;; Adding a new card at `collection_position` could cause other cards in this
+                 ;; collection to change position, check that and fix it if needed
+                 (api/maybe-reconcile-collection-position! card-data)
+                 (db/insert! Card card-data))]
+      (events/publish-event! :card-create card)
+      ;; include same information returned by GET /api/card/:id since frontend replaces the Card it
+      ;; currently has with returned one -- See #4283
+      (hydrate card :creator :dashboard_count :can_write :collection))))
 
 (defn- create-card-async!
   "Create a new Card asynchronously. Returns a channel for fetching the newly created Card, or an Exception if one was
@@ -393,29 +392,30 @@
       :else
       nil)))
 
-(defn- update-card-async! [{:keys [id], :as card-before-update} {:keys [archived], :as card-updates}]
+(defn- update-card-async!
+  "Update a Card asynchronously. Returns a `core.async` promise channel that will return updated Card."
+  [{:keys [id], :as card-before-update} {:keys [archived], :as card-updates}]
   ;; don't block our precious core.async thread, run the actual DB updates on a separate thread
-  (async.u/do-on-separate-thread
-   (fn []
-     ;; Setting up a transaction here so that we don't get a partially reconciled/updated card.
-     (db/transaction
-       (api/maybe-reconcile-collection-position! card-before-update card-updates)
-
-       ;; ok, now save the Card
-       (db/update! Card id
-         ;; `collection_id` and `description` can be `nil` (in order to unset them). Other values should only be
-         ;; modified if they're passed in as non-nil
-         (u/select-keys-when card-updates
-           :present #{:collection_id :collection_position :description}
-           :non-nil #{:dataset_query :display :name :visualization_settings :archived :enable_embedding
-                      :embedding_params :result_metadata})))
-     ;; Fetch the updated Card from the DB
-     (let [card (Card id)]
-       (delete-alerts-if-needed! card-before-update card)
-       (publish-card-update! card archived)
-       ;; include same information returned by GET /api/card/:id since frontend replaces the Card it currently
-       ;; has with returned one -- See #4142
-       (hydrate card :creator :dashboard_count :can_write :collection)))))
+  (async.u/cancelable-thread
+    ;; Setting up a transaction here so that we don't get a partially reconciled/updated card.
+    (db/transaction
+      (api/maybe-reconcile-collection-position! card-before-update card-updates)
+
+      ;; ok, now save the Card
+      (db/update! Card id
+        ;; `collection_id` and `description` can be `nil` (in order to unset them). Other values should only be
+        ;; modified if they're passed in as non-nil
+        (u/select-keys-when card-updates
+          :present #{:collection_id :collection_position :description}
+          :non-nil #{:dataset_query :display :name :visualization_settings :archived :enable_embedding
+                     :embedding_params :result_metadata})))
+    ;; Fetch the updated Card from the DB
+    (let [card (Card id)]
+      (delete-alerts-if-needed! card-before-update card)
+      (publish-card-update! card archived)
+      ;; include same information returned by GET /api/card/:id since frontend replaces the Card it currently
+      ;; has with returned one -- See #4142
+      (hydrate card :creator :dashboard_count :can_write :collection))))
 
 (api/defendpoint ^:returns-chan PUT "/:id"
   "Update a `Card`."
diff --git a/src/metabase/async/streaming_response.clj b/src/metabase/async/streaming_response.clj
index f2bc2187aac..3defd336ec0 100644
--- a/src/metabase/async/streaming_response.clj
+++ b/src/metabase/async/streaming_response.clj
@@ -4,6 +4,7 @@
             [clojure.tools.logging :as log]
             compojure.response
             [metabase.async.streaming-response.thread-pool :as thread-pool]
+            [metabase.async.util :as async.u]
             [metabase.server.protocols :as server.protocols]
             [metabase.util :as u]
             [metabase.util.i18n :refer [trs]]
@@ -13,11 +14,14 @@
              [response :as ring.response]
              [servlet :as ring.servlet]])
   (:import [java.io BufferedWriter OutputStream OutputStreamWriter]
+           java.nio.ByteBuffer
+           [java.nio.channels ClosedChannelException SocketChannel]
            java.nio.charset.StandardCharsets
            java.util.zip.GZIPOutputStream
            javax.servlet.AsyncContext
            javax.servlet.http.HttpServletResponse
-           org.eclipse.jetty.io.EofException))
+           org.eclipse.jetty.io.EofException
+           org.eclipse.jetty.server.Request))
 
 (defn- write-to-output-stream!
   ([^OutputStream os x]
@@ -71,22 +75,24 @@
       (write-error! os e)
       nil)))
 
-(defn- do-f-async [^AsyncContext async-context f ^OutputStream os finished-chan]
+(defn- do-f-async
+  "Runs `f` asynchronously on the streaming response `thread-pool`, returning immediately. When `f` finishes, completes (i.e., closes) Jetty
+  `async-context`."
+  [^AsyncContext async-context f ^OutputStream os finished-chan canceled-chan]
   {:pre [(some? os)]}
-  (let [canceled-chan (a/promise-chan)
-        task          (bound-fn []
-                        (try
-                          (do-f* f os finished-chan canceled-chan)
-                          (catch Throwable e
-                            (log/error e (trs "bound-fn caught unexpected Exception"))
-                            (a/>!! finished-chan :unexpected-error))
-                          (finally
-                            (a/>!! finished-chan (if (a/poll! canceled-chan)
-                                                   :canceled
-                                                   :completed))
-                            (a/close! finished-chan)
-                            (a/close! canceled-chan)
-                            (.complete async-context))))]
+  (let [task (bound-fn []
+               (try
+                 (do-f* f os finished-chan canceled-chan)
+                 (catch Throwable e
+                   (log/error e (trs "bound-fn caught unexpected Exception"))
+                   (a/>!! finished-chan :unexpected-error))
+                 (finally
+                   (a/>!! finished-chan (if (a/poll! canceled-chan)
+                                          :canceled
+                                          :completed))
+                   (a/close! finished-chan)
+                   (a/close! canceled-chan)
+                   (.complete async-context))))]
     (.submit (thread-pool/thread-pool) ^Runnable task)
     nil))
 
@@ -118,27 +124,83 @@
       ([ba offset length]
        (write-to-output-stream! @dlay ba offset length)))))
 
-(defn- respond
-  [{:keys [^HttpServletResponse response ^AsyncContext async-context request-map response-map]}
-   f {:keys [content-type], :as options} finished-chan]
+(def ^:private async-cancellation-poll-interval-ms
+  "How often to check whether the request was canceled by the client."
+  1000)
+
+(defn- canceled?
+  "Check whether the HTTP request has been canceled by the client.
+
+  This function attempts to read a single byte from the underlying TCP socket; if the request is canceled, `.read`
+  will return `-1`. Otherwise, since the entire request has already been read, `.read` *should* probably complete
+  immediately, returning `0`."
+  [^Request request]
   (try
-    (.setStatus response 202)
-    (let [gzip?   (should-gzip-response? request-map)
-          headers (cond-> (assoc (:headers response-map) "Content-Type" content-type)
-                    gzip? (assoc "Content-Encoding" "gzip"))]
-      (#'ring.servlet/set-headers response headers)
-      (let [output-stream-delay (output-stream-delay gzip? response)
-            delay-os            (delay-output-stream output-stream-delay)]
-        (do-f-async async-context f delay-os finished-chan)))
+    (let [^SocketChannel channel (.. request getHttpChannel getEndPoint getTransport)
+          buf    (ByteBuffer/allocate 1)
+          status (.read channel buf)]
+      (log/tracef "Check cancelation status: .read returned %d" status)
+      (neg? status))
+    (catch InterruptedException _
+      false)
+    (catch ClosedChannelException _
+      true)
     (catch Throwable e
-      (log/error e (trs "Unexpected exception in do-f-async"))
-      (try
-        (.sendError response 500 (.getMessage e))
-        (catch Throwable e
-          (log/error e (trs "Unexpected exception writing error response"))))
-      (a/>!! finished-chan :unexpected-error)
-      (a/close! finished-chan)
-      (.complete async-context))))
+      (log/error e (trs "Error determining whether HTTP request was canceled"))
+      false)))
+
+(def ^:private async-cancellation-poll-timeout-ms
+  "How long to wait for the cancelation check to complete (it should usually complete immediately -- see above -- but if
+  it doesn't, we don't want to block forever)."
+  1000)
+
+(defn- start-async-cancel-loop!
+  "Starts an async loop that checks whether the client has canceled HTTP `request` at some interval. If the client has
+  canceled the request, this sends a message to `canceled-chan`."
+  [request finished-chan canceled-chan]
+  (a/go-loop []
+    (let [poll-timeout-chan (a/timeout async-cancellation-poll-interval-ms)
+          [_ port]          (a/alts! [poll-timeout-chan finished-chan])]
+      (when (= port poll-timeout-chan)
+        (log/tracef "Checking cancelation status after waiting %s")
+        (let [canceled-status-chan (async.u/cancelable-thread (canceled? request))
+              status-timeout-chan  (a/timeout async-cancellation-poll-timeout-ms)
+              [canceled? port]     (a/alts! [finished-chan canceled-status-chan status-timeout-chan])]
+          ;; if `canceled-status-chan` *wasn't* the first channel to return (i.e., we either timed out or the request
+          ;; was completed) then close `canceled-status-chan` which will kill the underlying thread
+          (a/close! canceled-status-chan)
+          (when (= port status-timeout-chan)
+            (log/debug (trs "Check cancelation status timed out after {0}"
+                            (u/format-milliseconds async-cancellation-poll-timeout-ms))))
+          (when (not= port finished-chan)
+            (if canceled?
+              (a/>! canceled-chan ::request-canceled)
+              (recur))))))))
+
+(defn- respond
+  [{:keys [^HttpServletResponse response ^AsyncContext async-context request-map response-map request]}
+   f {:keys [content-type], :as options} finished-chan]
+  (let [canceled-chan (a/promise-chan)]
+    (try
+      (.setStatus response 202)
+      (let [gzip?   (should-gzip-response? request-map)
+            headers (cond-> (assoc (:headers response-map) "Content-Type" content-type)
+                      gzip? (assoc "Content-Encoding" "gzip"))]
+        (#'ring.servlet/set-headers response headers)
+        (let [output-stream-delay (output-stream-delay gzip? response)
+              delay-os            (delay-output-stream output-stream-delay)]
+          (start-async-cancel-loop! request finished-chan canceled-chan)
+          (do-f-async async-context f delay-os finished-chan canceled-chan)))
+      (catch Throwable e
+        (log/error e (trs "Unexpected exception in do-f-async"))
+        (try
+          (.sendError response 500 (.getMessage e))
+          (catch Throwable e
+            (log/error e (trs "Unexpected exception writing error response"))))
+        (a/>!! finished-chan :unexpected-error)
+        (a/close! finished-chan)
+        (a/close! canceled-chan)
+        (.complete async-context)))))
 
 (declare render)
 
@@ -159,7 +221,13 @@
   ;; async responses only
   compojure.response/Sendable
   (send* [this request respond* _]
-    (respond* (compojure.response/render this request))))
+    (respond* (compojure.response/render this request)))
+
+  ;; TODO -- if we want this to work when running via `lein ring server` we need to add an impl for
+  ;; `ring.core.protocols/StreamableResponseBody`. Not sure if we want to do that because it would result in different
+  ;; behavior when running via `lein ring server` vs `lein run`/uberjar. Maybe better just to take `lein ring server`
+  ;; out and replace it with an auto-reload version of `lein run`
+  )
 
 ;; TODO -- don't think any of this is needed any mo
 (defn- render [^StreamingResponse streaming-response gzip?]
@@ -185,7 +253,7 @@
   (->StreamingResponse f options (a/promise-chan)))
 
 (defmacro streaming-response
-  "Return an streaming response that writes keepalive newline bytes.
+  "Create an API response that streams results to an `OutputStream`.
 
   Minimal example:
 
@@ -198,9 +266,7 @@
   Current options:
 
   *  `:content-type` -- string content type to return in the results. This is required!
-  *  `:headers` -- other headers to include in the API response.
-  *  `:write-keepalive-newlines?` -- whether we should write keepalive newlines every `keepalive-interval-ms`. Default
-      `true`; you can disable this for formats where it wouldn't work, such as CSV."
+  *  `:headers` -- other headers to include in the API response."
   {:style/indent 2, :arglists '([options [os-binding canceled-chan-binding] & body])}
   [options [os-binding canceled-chan-binding :as bindings] & body]
   {:pre [(= (count bindings) 2)]}
diff --git a/src/metabase/async/util.clj b/src/metabase/async/util.clj
index 462d3cfcb3d..9e69c023983 100644
--- a/src/metabase/async/util.clj
+++ b/src/metabase/async/util.clj
@@ -2,11 +2,10 @@
   "Utility functions for core.async-based async logic."
   (:require [clojure.core.async :as a]
             [clojure.tools.logging :as log]
-            [metabase.util.i18n :refer [trs]]
             [schema.core :as s])
   (:import clojure.core.async.impl.buffers.PromiseBuffer
            clojure.core.async.impl.channels.ManyToManyChannel
-           java.util.concurrent.Future))
+           java.util.concurrent.ThreadPoolExecutor))
 
 ;; TODO - most of this stuff can be removed now that we have the new-new reducible/async QP implementation of early
 ;; 2020. No longer needed
@@ -36,42 +35,42 @@
       (a/close! out-chan)))
   nil)
 
-(s/defn ^:private do-on-separate-thread* :- Future
-  [out-chan f & args]
-  (future
-    (try
-      (log/debug (trs "Running {0} on separate thread..." f))
-      (try
-        (let [result (apply f args)]
-          (cond
-            (nil? result)
-            (log/warn (trs "Warning: {0} returned `nil`" f))
+(defn cancelable-thread-call
+  "Exactly like `a/thread-call`, with two differences:
 
-            (not (a/>!! out-chan result))
-            (log/error (trs "Unexpected error writing result to output channel: already closed"))))
-        ;; if we catch an Exception (shouldn't happen in a QP query, but just in case), send it to `chan`.
-        ;; It's ok, our IMPL of Ring `StreamableResponseBody` will do the right thing with it.
-        (catch Throwable e
-          (log/error e (trs "Caught error running {0}" f))
-          (when-not (a/>!! out-chan e)
-            (log/error e (trs "Unexpected error writing exception to output channel: already closed")))))
-      (finally
-        (a/close! out-chan)))))
+    1) the result channel is a promise channel instead of a regular channel
+    2) Closing the result channel early will cancel the async thread call."
+  [f]
+  ;; create two channels:
+  ;; * `done-chan` will always get closed immediately after `(f)` is finished
+  ;; * `result-chan` will get the result of `(f)`, *after* `done-chan` is closed
+  (let [done-chan   (a/promise-chan)
+        result-chan (a/promise-chan)
+        f*          (bound-fn []
+                      (let [result (try
+                                     (f)
+                                     (catch Throwable e
+                                       (log/trace e "cancelable-thread-call: caught exception in f")
+                                       e))]
+                        (a/close! done-chan)
+                        (when (some? result)
+                          (a/>!! result-chan result)))
+                      (a/close! result-chan))
+        futur       (.submit ^ThreadPoolExecutor (var-get (resolve 'clojure.core.async/thread-macro-executor)) ^Runnable f*)]
+    ;; if `result-chan` gets a result/closed *before* `done-chan`, it means it was closed by the caller, so we should
+    ;; cancel the thread running `f*`
+    (a/go
+      (let [[_ port] (a/alts! [done-chan result-chan] :priority true)]
+        (when (= port result-chan)
+          (log/trace "cancelable-thread-call: result channel closed before f finished; canceling thread")
+          (future-cancel futur))))
+    result-chan))
 
-(s/defn do-on-separate-thread :- PromiseChan
-  "Run `(apply f args)` on a separate thread, returns a channel to fetch the results. Closing this channel early will
-  cancel the future running the function, if possible.
+(defmacro cancelable-thread
+  "Exactly like `a/thread`, with two differences:
 
-  This is basically like `core.async/thread-call` but returns a promise channel instead of a regular channel and
-  cancels the execution of `f` if the channel closes early."
-  [f & args]
-  (let [out-chan (a/promise-chan)
-        ;; Run `f` on a separarate thread because it's a potentially long-running QP query and we don't want to tie
-        ;; up precious core.async threads
-        futur    (apply do-on-separate-thread* out-chan f args)]
-    ;; if output chan is closed early cancel the future
-    (a/go
-      (when (nil? (a/<! out-chan))
-        (log/debug (trs "Request canceled, canceling future."))
-        (future-cancel futur)))
-    out-chan))
+    1) the result channel is a promise channel instead of a regular channel
+    2) Closing the result channel early will cancel the async thread call."
+  {:style/indent 0}
+  [& body]
+  `(cancelable-thread-call (fn [] ~@body)))
diff --git a/test/metabase/async/streaming_response_test.clj b/test/metabase/async/streaming_response_test.clj
index bc9b7aec4de..e75b84fab63 100644
--- a/test/metabase/async/streaming_response_test.clj
+++ b/test/metabase/async/streaming_response_test.clj
@@ -3,11 +3,12 @@
             [clojure.core.async :as a]
             [clojure.test :refer :all]
             [metabase
-             [config :as config]
              [driver :as driver]
              [http-client :as test-client]
              [models :refer [Database]]
-             [test :as mt]]
+             [test :as mt]
+             [util :as u]]
+            [metabase.async.streaming-response :as streaming-response]
             [metabase.async.streaming-response.thread-pool :as thread-pool]
             [metabase.query-processor.context :as context])
   (:import java.util.concurrent.Executors
@@ -45,7 +46,10 @@
   "A core.async channel that will get a message when query execution starts."
   (atom nil))
 
-(defmacro ^:private with-start-execution-chan [[chan-binding] & body]
+(defmacro ^:private with-start-execution-chan
+  "Runs body with `chan-binding` bound to a core.async promise channel that will get a message once a query execution
+  starts running on the streaming response thread pool."
+  [[chan-binding] & body]
   `(mt/with-open-channels [chan# (a/promise-chan)]
      (try
        (reset! start-execution-chan chan#)
@@ -89,8 +93,7 @@
 (deftest truly-async-test
   (testing "StreamingResponses should truly be asynchronous, and not block Jetty threads while waiting for results"
     (with-test-driver-db
-      (let [max-threads        (or (config/config-int :mb-jetty-maxthreads) 50)
-            num-requests       (+ max-threads 20)
+      (let [num-requests       (+ thread-pool-size 20)
             remaining          (atom num-requests)
             session-token      (test-client/authenticate (mt/user->credentials :lucky))
             url                (test-client/build-url "dataset" nil)
@@ -110,3 +113,28 @@
               (testing "(Usually this is under 100ms but might be a little over if CircleCI is being slow)"
                 (let [elapsed-ms (- (System/currentTimeMillis) start-time-ms)]
                   (is (< elapsed-ms 500)))))))))))
+
+(deftest cancelation-test
+  (testing "Make sure canceling a HTTP request ultimately causes the query to be canceled"
+    (with-redefs [streaming-response/async-cancellation-poll-interval-ms 50]
+      (with-test-driver-db
+        (reset! canceled? false)
+        (with-start-execution-chan [start-chan]
+          (let [url           (test-client/build-url "dataset" nil)
+                session-token (test-client/authenticate (mt/user->credentials :lucky))
+                request       (test-client/build-request-map session-token
+                                                             {:database (mt/id)
+                                                              :type     "native"
+                                                              :native   {:query {:sleep 5000}}})
+                futur         (http/post url (assoc request :async? true) identity (fn [e] (throw e)))]
+            (is (future? futur))
+            ;; wait a little while for the query to start running -- this should usually happen fairly quickly
+            (mt/wait-for-result start-chan (u/seconds->ms 15))
+            (future-cancel futur)
+            ;; check every 50ms, up to 500ms, whether `canceled?` is now `true`
+            (is (= true
+                   (loop [[wait & more] (repeat 10 50)]
+                     (or @canceled?
+                         (when wait
+                           (Thread/sleep wait)
+                           (recur more))))))))))))
diff --git a/test/metabase/async/util_test.clj b/test/metabase/async/util_test.clj
index 02e3b12deac..a251d587ed4 100644
--- a/test/metabase/async/util_test.clj
+++ b/test/metabase/async/util_test.clj
@@ -64,26 +64,25 @@
       (is (= ::value
              (first (a/alts!! [out-chan-2 (a/timeout 1000)])))))))
 
-(deftest do-on-separate-thread-test
-  (testing "Make sure `do-on-separate-thread` can actually run a function correctly"
-    (tu.async/with-open-channels [result-chan (async.u/do-on-separate-thread (fn []
-                                                                               (Thread/sleep 100)
-                                                                               ::success))]
+(deftest cancelable-thread-test
+  (testing "Make sure `cancelable-thread` can actually run a function correctly"
+    (tu.async/with-open-channels [result-chan (async.u/cancelable-thread
+                                                (Thread/sleep 10)
+                                                ::success)]
       (is (= ::success
              (first (a/alts!! [result-chan (a/timeout 500)]))))))
 
-  (testing (str "when you close the result channel of `do-on-separate-thread,` it should cancel the future that's "
-                "running it. This will produce an InterruptedException")
+  (testing (str "when you close the result channel of `cancelable-thread`, it should cancel the future that's running "
+                "it. This will produce an InterruptedException")
     (tu.async/with-open-channels [started-chan  (a/chan 1)
                                   finished-chan (a/chan 1)]
-      (let [f           (fn []
+      (let [result-chan (async.u/cancelable-thread
                           (try
                             (a/>!! started-chan ::started)
                             (Thread/sleep 5000)
                             (a/>!! finished-chan ::finished)
                             (catch Throwable e
-                              (a/>!! finished-chan e))))
-            result-chan (async.u/do-on-separate-thread f)]
+                              (a/>!! finished-chan e))))]
         ;; wait for `f` to actually start running before we kill it. Otherwise it may not get started at all
         (a/go
           (a/alts!! [started-chan (a/timeout 1000)])
@@ -92,11 +91,11 @@
              InterruptedException
              (first (a/alts!! [finished-chan (a/timeout 1000)])))))))
 
-  (testing "We should be able to combine the `promise-pipe` and `do-on-separate-thread` and get results"
+  (testing "We should be able to combine the `promise-pipe` and `cancelable-thread` and get results"
     (letfn [(f []
-              (Thread/sleep 100)
+              (Thread/sleep 10)
               ::success)]
       (tu.async/with-open-channels [result-chan (a/promise-chan)]
-        (async.u/promise-pipe (async.u/do-on-separate-thread f) result-chan)
+        (async.u/promise-pipe (async.u/cancelable-thread-call f) result-chan)
         (is (= ::success
                (first (a/alts!! [result-chan (a/timeout 500)]))))))))
diff --git a/test/metabase/query_processor/streaming_test.clj b/test/metabase/query_processor/streaming_test.clj
index 1b95848522e..97ee7151144 100644
--- a/test/metabase/query_processor/streaming_test.clj
+++ b/test/metabase/query_processor/streaming_test.clj
@@ -1,5 +1,6 @@
 (ns metabase.query-processor.streaming-test
   (:require [cheshire.core :as json]
+            [clojure.core.async :as a]
             [clojure.data.csv :as csv]
             [clojure.test :refer :all]
             [dk.ative.docjure.spreadsheet :as spreadsheet]
@@ -52,14 +53,16 @@
 (defn- process-query-api-response-streaming [export-format query]
   (with-open [bos (ByteArrayOutputStream.)
               os  (BufferedOutputStream. bos)]
-    (let [streaming-response (qp.streaming/streaming-response [context export-format]
-                               (qp/process-query-async query (assoc context :timeout 5000)))]
-      (#'streaming-response/do-f-async (proxy [AsyncContext] []
-                                         (complete []))
-                                       (.f streaming-response)
-                                       os
-                                       (.donechan streaming-response))
-      (mt/wait-for-result (streaming-response/finished-chan streaming-response) 1000))
+    (mt/with-open-channels [canceled-chan (a/promise-chan)]
+      (let [streaming-response (qp.streaming/streaming-response [context export-format]
+                                 (qp/process-query-async query (assoc context :timeout 5000)))]
+        (#'streaming-response/do-f-async (proxy [AsyncContext] []
+                                           (complete []))
+                                         (.f streaming-response)
+                                         os
+                                         (.donechan streaming-response)
+                                         canceled-chan)
+        (mt/wait-for-result (streaming-response/finished-chan streaming-response) 1000)))
     (let [bytea (.toByteArray bos)]
       (with-open [is (BufferedInputStream. (ByteArrayInputStream. bytea))]
         (parse-result export-format is)))))
-- 
GitLab