Skip to content
Snippets Groups Projects
Commit da1b27c5 authored by Oleksandr Yakushev's avatar Oleksandr Yakushev Committed by Metabase bot
Browse files

Disable connection reuse for requests that serve streaming responses (#49773)

* Revert "[streaming-response] Use HttpEndpoint.isOpen() instead of reading a byte from TCP socket (#49460)"

This reverts commit 060baf0c.

* Disable connection reuse for requests that serve streaming responses
parent 4ae6992a
No related tags found
No related merge requests found
......@@ -17,6 +17,7 @@
(jakarta.servlet AsyncContext)
(jakarta.servlet.http HttpServletResponse)
(java.io BufferedWriter OutputStream OutputStreamWriter)
(java.nio ByteBuffer)
(java.nio.channels ClosedChannelException SocketChannel)
(java.nio.charset StandardCharsets)
(java.util.zip GZIPOutputStream)
......@@ -168,14 +169,22 @@
(swap! *reported-types conj transport-type)))
(defn- canceled?
"Check whether the HTTP request has been canceled by the client. This function uses `HttpEndpoint.isOpen` method which
is not guaranteed to return false in case the client closes the connection, but it's the best heuristic we have."
"Check whether the HTTP request has been canceled by the client.
This function attempts to read a single byte from the underlying TCP socket; if the request is canceled, `.read`
will return `-1`. Otherwise, since the entire request has already been read, `.read` *should* probably complete
immediately, returning `0`."
[^Request request]
(try
(let [endpoint (.. request getHttpChannel getEndPoint)
is-open (.isOpen endpoint)]
(log/tracef "Check cancelation status: .isOpen returned %s" is-open)
(not is-open))
(let [transport (.. request getHttpChannel getEndPoint getTransport)]
(if-let [channel (get-channel transport)]
(let [buf (ByteBuffer/allocate 1)
status (.read channel buf)]
(log/tracef "Check cancelation status: .read returned %d" status)
(neg? status))
(do
(log-unexpected-transport! transport)
false)))
(catch InterruptedException _ false)
(catch ClosedChannelException _ true)
(catch Throwable e
......@@ -217,7 +226,13 @@
(try
(.setStatus response (or status 202))
(let [gzip? (should-gzip-response? request-map)
headers (cond-> (assoc (merge headers (:headers response-map)) "Content-Type" content-type)
headers (cond-> (assoc (merge headers (:headers response-map))
"Content-Type" content-type
;; Very important: connections which serve streaming responses SHOULD NOT be reused
;; by the client because of `start-async-cancel-loop!`. The latter tries to read a
;; byte from the input stream at some interval, and that may/will cause corruption
;; of the subsequent requests that come through the reused connection (see #46071).
"Connection" "close")
gzip? (assoc "Content-Encoding" "gzip"))]
(#'servlet/set-headers response headers)
(let [output-stream-delay (output-stream-delay gzip? response)
......
......@@ -53,17 +53,17 @@
"A core.async channel that will get a message when query execution starts."
(atom nil))
;; (defmacro ^:private with-start-execution-chan
;; "Runs body with `chan-binding` bound to a core.async promise channel that will get a message once a query execution
;; starts running on the streaming response thread pool."
;; [[chan-binding] & body]
;; `(mt/with-open-channels [chan# (a/promise-chan)]
;; (try
;; (reset! start-execution-chan chan#)
;; (let [~chan-binding chan#]
;; ~@body)
;; (finally
;; (reset! start-execution-chan nil)))))
(defmacro ^:private with-start-execution-chan
"Runs body with `chan-binding` bound to a core.async promise channel that will get a message once a query execution
starts running on the streaming response thread pool."
[[chan-binding] & body]
`(mt/with-open-channels [chan# (a/promise-chan)]
(try
(reset! start-execution-chan chan#)
(let [~chan-binding chan#]
~@body)
(finally
(reset! start-execution-chan nil)))))
(defmethod driver/execute-reducible-query ::test-driver
[_driver {{{:keys [sleep]} :query} :native, database-id :database} _context respond]
......@@ -124,34 +124,32 @@
(let [elapsed-ms (- (System/currentTimeMillis) start-time-ms)]
(is (< elapsed-ms 500)))))))))))
;; TODO: this test has been commented in #49460 until the cancellation mechanism is reworked.
;; (deftest cancelation-test
;; (testing "Make sure canceling a HTTP request ultimately causes the query to be canceled"
;; (mt/test-helpers-set-global-values!
;; (with-redefs [streaming-response/async-cancellation-poll-interval-ms 50]
;; (with-test-driver-db!
;; (reset! canceled? false)
;; (with-start-execution-chan [start-chan]
;; (let [url (client/build-url "dataset" nil)
;; session-token (client/authenticate (mt/user->credentials :lucky))
;; request (client/build-request-map session-token
;; {:database (mt/id)
;; :type "native"
;; :native {:query {:sleep 5000}}}
;; nil)
;; futur (http/post url (assoc request :async? true) identity (fn [e] (throw e)))]
;; (is (future? futur))
;; ;; wait a little while for the query to start running -- this should usually happen fairly quickly
;; (mt/wait-for-result start-chan (u/seconds->ms 15))
;; (future-cancel futur)
;; ;; check every 10ms, up to 1000ms, whether `canceled?` is now `true`
;; (is (loop [[wait & more] (repeat 10 100)]
;; (or @canceled?
;; (when wait
;; (do
;; (Thread/sleep (long wait))
;; (recur more)))))))))))))
(deftest cancelation-test
(testing "Make sure canceling a HTTP request ultimately causes the query to be canceled"
(mt/test-helpers-set-global-values!
(with-redefs [streaming-response/async-cancellation-poll-interval-ms 50]
(with-test-driver-db!
(reset! canceled? false)
(with-start-execution-chan [start-chan]
(let [url (client/build-url "dataset" nil)
session-token (client/authenticate (mt/user->credentials :lucky))
request (client/build-request-map session-token
{:database (mt/id)
:type "native"
:native {:query {:sleep 5000}}}
nil)
futur (http/post url (assoc request :async? true) identity (fn [e] (throw e)))]
(is (future? futur))
;; wait a little while for the query to start running -- this should usually happen fairly quickly
(mt/wait-for-result start-chan (u/seconds->ms 15))
(future-cancel futur)
;; check every 10ms, up to 1000ms, whether `canceled?` is now `true`
(is (loop [[wait & more] (repeat 10 100)]
(or @canceled?
(when wait
(do
(Thread/sleep (long wait))
(recur more)))))))))))))
(def ^:private ^:dynamic *number-of-cans* nil)
......@@ -165,6 +163,7 @@
(server.protocols/respond streaming-response
{:response (reify HttpServletResponse
(setStatus [_ _])
(setHeader [_ _ _])
(getOutputStream [_]
(proxy [ServletOutputStream] []
(write
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment