diff --git a/test/metabase/async/streaming_response_test.clj b/test/metabase/async/streaming_response_test.clj index ce89f9b45816322939e881672bba1eb37f97661e..4b7512b19bdeef5114ef134a0e7155c5ba1c9756 100644 --- a/test/metabase/async/streaming_response_test.clj +++ b/test/metabase/async/streaming_response_test.clj @@ -7,7 +7,8 @@ [driver :as driver] [http-client :as test-client] [models :refer [Database]] - [test :as mt]] + [test :as mt] + [util :as u]] [metabase.async.streaming-response :as streaming-response] [metabase.async.streaming-response.thread-pool :as thread-pool] [metabase.query-processor.context :as context]) @@ -42,11 +43,26 @@ (with-streaming-response-thread-pool ~@body)))) +(def ^:private start-execution-chan + "A core.async channel that will get a message when query execution starts." + (atom nil)) + +(defmacro ^:private with-start-execution-chan [[chan-binding] & body] + `(mt/with-open-channels [chan# (a/promise-chan)] + (try + (reset! start-execution-chan chan#) + (let [~chan-binding chan#] + ~@body) + (finally + (reset! start-execution-chan nil))))) + (defmethod driver/execute-reducible-query ::test-driver [_ {{{:keys [sleep]} :query} :native, database-id :database} context respond] {:pre [(integer? sleep) (integer? database-id)]} (let [futur (future (try + (when-let [chan @start-execution-chan] + (a/>!! chan ::started)) (Thread/sleep sleep) (respond {:cols [{:name "Sleep", :base_type :type/Integer}]} [[sleep]]) (catch InterruptedException e @@ -115,16 +131,18 @@ (with-redefs [streaming-response/keepalive-interval-ms 50] (with-test-driver-db (reset! canceled? false) - (let [url (test-client/build-url "dataset" nil) - session-token (test-client/authenticate (mt/user->credentials :lucky)) - request (test-client/build-request-map session-token - {:database (mt/id) - :type "native" - :native {:query {:sleep 5000}}}) - futur (http/post url (assoc request :async? true) identity (fn [e] (throw e)))] - (is (future? futur)) - (Thread/sleep 100) - (future-cancel futur) - (Thread/sleep 100) - (is (= true - @canceled?))))))) + (with-start-execution-chan [start-chan] + (let [url (test-client/build-url "dataset" nil) + session-token (test-client/authenticate (mt/user->credentials :lucky)) + request (test-client/build-request-map session-token + {:database (mt/id) + :type "native" + :native {:query {:sleep 5000}}}) + futur (http/post url (assoc request :async? true) identity (fn [e] (throw e)))] + (is (future? futur)) + ;; wait a little while for the query to start running -- this should usually happen fairly quickly + (mt/wait-for-result start-chan (u/seconds->ms 15)) + (future-cancel futur) + (Thread/sleep 200) + (is (= true + @canceled?))))))))