From da1b27c5e74883b0005ffef5a770f1debc1284ea Mon Sep 17 00:00:00 2001
From: Oleksandr Yakushev <alex@bytopia.org>
Date: Fri, 8 Nov 2024 21:39:40 +0200
Subject: [PATCH] Disable connection reuse for requests that serve streaming
 responses (#49773)

* Revert "[streaming-response] Use HttpEndpoint.isOpen() instead of reading a byte from TCP socket (#49460)"

This reverts commit 060baf0ce4628a773b685de4b769845c9bc0f6e5.

* Disable connection reuse for requests that serve streaming responses
---
 src/metabase/async/streaming_response.clj     | 29 +++++--
 .../async/streaming_response_test.clj         | 77 +++++++++----------
 2 files changed, 60 insertions(+), 46 deletions(-)

diff --git a/src/metabase/async/streaming_response.clj b/src/metabase/async/streaming_response.clj
index 641dbf1f328..a627b3f5d5f 100644
--- a/src/metabase/async/streaming_response.clj
+++ b/src/metabase/async/streaming_response.clj
@@ -17,6 +17,7 @@
    (jakarta.servlet AsyncContext)
    (jakarta.servlet.http HttpServletResponse)
    (java.io BufferedWriter OutputStream OutputStreamWriter)
+   (java.nio ByteBuffer)
    (java.nio.channels ClosedChannelException SocketChannel)
    (java.nio.charset StandardCharsets)
    (java.util.zip GZIPOutputStream)
@@ -168,14 +169,22 @@
     (swap! *reported-types conj transport-type)))
 
 (defn- canceled?
-  "Check whether the HTTP request has been canceled by the client. This function uses `HttpEndpoint.isOpen` method which
-  is not guaranteed to return false in case the client closes the connection, but it's the best heuristic we have."
+  "Check whether the HTTP request has been canceled by the client.
+
+  This function attempts to read a single byte from the underlying TCP socket; if the request is canceled, `.read`
+  will return `-1`. Otherwise, since the entire request has already been read, `.read` *should* probably complete
+  immediately, returning `0`."
   [^Request request]
   (try
-    (let [endpoint (.. request getHttpChannel getEndPoint)
-          is-open  (.isOpen endpoint)]
-      (log/tracef "Check cancelation status: .isOpen returned %s" is-open)
-      (not is-open))
+    (let [transport (.. request getHttpChannel getEndPoint getTransport)]
+      (if-let [channel (get-channel transport)]
+        (let [buf        (ByteBuffer/allocate 1)
+              status     (.read channel buf)]
+          (log/tracef "Check cancelation status: .read returned %d" status)
+          (neg? status))
+        (do
+          (log-unexpected-transport! transport)
+          false)))
     (catch InterruptedException _ false)
     (catch ClosedChannelException _ true)
     (catch Throwable e
@@ -217,7 +226,13 @@
     (try
       (.setStatus response (or status 202))
       (let [gzip?   (should-gzip-response? request-map)
-            headers (cond-> (assoc (merge headers (:headers response-map)) "Content-Type" content-type)
+            headers (cond-> (assoc (merge headers (:headers response-map))
+                                   "Content-Type" content-type
+                                   ;; Very important: connections which serve streaming responses SHOULD NOT be reused
+                                   ;; by the client because of `start-async-cancel-loop!`. The latter tries to read a
+                                   ;; byte from the input stream at some interval, and that may/will cause corruption
+                                   ;; of the subsequent requests that come through the reused connection (see #46071).
+                                   "Connection" "close")
                       gzip? (assoc "Content-Encoding" "gzip"))]
         (#'servlet/set-headers response headers)
         (let [output-stream-delay (output-stream-delay gzip? response)
diff --git a/test/metabase/async/streaming_response_test.clj b/test/metabase/async/streaming_response_test.clj
index 2add6012bba..e054ec6eb3a 100644
--- a/test/metabase/async/streaming_response_test.clj
+++ b/test/metabase/async/streaming_response_test.clj
@@ -53,17 +53,17 @@
   "A core.async channel that will get a message when query execution starts."
   (atom nil))
 
-;; (defmacro ^:private with-start-execution-chan
-;;   "Runs body with `chan-binding` bound to a core.async promise channel that will get a message once a query execution
-;;   starts running on the streaming response thread pool."
-;;   [[chan-binding] & body]
-;;   `(mt/with-open-channels [chan# (a/promise-chan)]
-;;      (try
-;;        (reset! start-execution-chan chan#)
-;;        (let [~chan-binding chan#]
-;;          ~@body)
-;;        (finally
-;;          (reset! start-execution-chan nil)))))
+(defmacro ^:private with-start-execution-chan
+  "Runs body with `chan-binding` bound to a core.async promise channel that will get a message once a query execution
+  starts running on the streaming response thread pool."
+  [[chan-binding] & body]
+  `(mt/with-open-channels [chan# (a/promise-chan)]
+     (try
+       (reset! start-execution-chan chan#)
+       (let [~chan-binding chan#]
+         ~@body)
+       (finally
+         (reset! start-execution-chan nil)))))
 
 (defmethod driver/execute-reducible-query ::test-driver
   [_driver {{{:keys [sleep]} :query} :native, database-id :database} _context respond]
@@ -124,34 +124,32 @@
                 (let [elapsed-ms (- (System/currentTimeMillis) start-time-ms)]
                   (is (< elapsed-ms 500)))))))))))
 
-;; TODO: this test has been commented in #49460 until the cancellation mechanism is reworked.
-
-;; (deftest cancelation-test
-;;   (testing "Make sure canceling a HTTP request ultimately causes the query to be canceled"
-;;     (mt/test-helpers-set-global-values!
-;;       (with-redefs [streaming-response/async-cancellation-poll-interval-ms 50]
-;;         (with-test-driver-db!
-;;           (reset! canceled? false)
-;;           (with-start-execution-chan [start-chan]
-;;             (let [url           (client/build-url "dataset" nil)
-;;                   session-token (client/authenticate (mt/user->credentials :lucky))
-;;                   request       (client/build-request-map session-token
-;;                                                           {:database (mt/id)
-;;                                                            :type     "native"
-;;                                                            :native   {:query {:sleep 5000}}}
-;;                                                           nil)
-;;                   futur         (http/post url (assoc request :async? true) identity (fn [e] (throw e)))]
-;;               (is (future? futur))
-;;               ;; wait a little while for the query to start running -- this should usually happen fairly quickly
-;;               (mt/wait-for-result start-chan (u/seconds->ms 15))
-;;               (future-cancel futur)
-;;               ;; check every 10ms, up to 1000ms, whether `canceled?` is now `true`
-;;               (is (loop [[wait & more] (repeat 10 100)]
-;;                     (or @canceled?
-;;                         (when wait
-;;                           (do
-;;                             (Thread/sleep (long wait))
-;;                             (recur more)))))))))))))
+(deftest cancelation-test
+  (testing "Make sure canceling a HTTP request ultimately causes the query to be canceled"
+    (mt/test-helpers-set-global-values!
+      (with-redefs [streaming-response/async-cancellation-poll-interval-ms 50]
+        (with-test-driver-db!
+          (reset! canceled? false)
+          (with-start-execution-chan [start-chan]
+            (let [url           (client/build-url "dataset" nil)
+                  session-token (client/authenticate (mt/user->credentials :lucky))
+                  request       (client/build-request-map session-token
+                                                          {:database (mt/id)
+                                                           :type     "native"
+                                                           :native   {:query {:sleep 5000}}}
+                                                          nil)
+                  futur         (http/post url (assoc request :async? true) identity (fn [e] (throw e)))]
+              (is (future? futur))
+              ;; wait a little while for the query to start running -- this should usually happen fairly quickly
+              (mt/wait-for-result start-chan (u/seconds->ms 15))
+              (future-cancel futur)
+              ;; check every 10ms, up to 1000ms, whether `canceled?` is now `true`
+              (is (loop [[wait & more] (repeat 10 100)]
+                    (or @canceled?
+                        (when wait
+                          (do
+                            (Thread/sleep (long wait))
+                            (recur more)))))))))))))
 
 (def ^:private ^:dynamic *number-of-cans* nil)
 
@@ -165,6 +163,7 @@
         (server.protocols/respond streaming-response
                                   {:response      (reify HttpServletResponse
                                                     (setStatus [_ _])
+                                                    (setHeader [_ _ _])
                                                     (getOutputStream [_]
                                                       (proxy [ServletOutputStream] []
                                                         (write
-- 
GitLab