Skip to content
Snippets Groups Projects
Commit d60ca3d1 authored by Oleksandr Yakushev's avatar Oleksandr Yakushev
Browse files

Revert "[streaming-response] Use HttpEndpoint.isOpen() instead of reading a...

Revert "[streaming-response] Use HttpEndpoint.isOpen() instead of reading a byte from TCP socket (#49460)"

This reverts commit 060baf0c.
parent 57d786df
Branches
Tags
No related merge requests found
......@@ -17,6 +17,7 @@
(jakarta.servlet AsyncContext)
(jakarta.servlet.http HttpServletResponse)
(java.io BufferedWriter OutputStream OutputStreamWriter)
(java.nio ByteBuffer)
(java.nio.channels ClosedChannelException SocketChannel)
(java.nio.charset StandardCharsets)
(java.util.zip GZIPOutputStream)
......@@ -168,14 +169,22 @@
(swap! *reported-types conj transport-type)))
(defn- canceled?
"Check whether the HTTP request has been canceled by the client. This function uses `HttpEndpoint.isOpen` method which
is not guaranteed to return false in case the client closes the connection, but it's the best heuristic we have."
"Check whether the HTTP request has been canceled by the client.
This function attempts to read a single byte from the underlying TCP socket; if the request is canceled, `.read`
will return `-1`. Otherwise, since the entire request has already been read, `.read` *should* probably complete
immediately, returning `0`."
[^Request request]
(try
(let [endpoint (.. request getHttpChannel getEndPoint)
is-open (.isOpen endpoint)]
(log/tracef "Check cancelation status: .isOpen returned %s" is-open)
(not is-open))
(let [transport (.. request getHttpChannel getEndPoint getTransport)]
(if-let [channel (get-channel transport)]
(let [buf (ByteBuffer/allocate 1)
status (.read channel buf)]
(log/tracef "Check cancelation status: .read returned %d" status)
(neg? status))
(do
(log-unexpected-transport! transport)
false)))
(catch InterruptedException _ false)
(catch ClosedChannelException _ true)
(catch Throwable e
......
......@@ -53,17 +53,17 @@
"A core.async channel that will get a message when query execution starts."
(atom nil))
;; (defmacro ^:private with-start-execution-chan
;; "Runs body with `chan-binding` bound to a core.async promise channel that will get a message once a query execution
;; starts running on the streaming response thread pool."
;; [[chan-binding] & body]
;; `(mt/with-open-channels [chan# (a/promise-chan)]
;; (try
;; (reset! start-execution-chan chan#)
;; (let [~chan-binding chan#]
;; ~@body)
;; (finally
;; (reset! start-execution-chan nil)))))
(defmacro ^:private with-start-execution-chan
"Runs body with `chan-binding` bound to a core.async promise channel that will get a message once a query execution
starts running on the streaming response thread pool."
[[chan-binding] & body]
`(mt/with-open-channels [chan# (a/promise-chan)]
(try
(reset! start-execution-chan chan#)
(let [~chan-binding chan#]
~@body)
(finally
(reset! start-execution-chan nil)))))
(defmethod driver/execute-reducible-query ::test-driver
[_driver {{{:keys [sleep]} :query} :native, database-id :database} _context respond]
......@@ -124,34 +124,32 @@
(let [elapsed-ms (- (System/currentTimeMillis) start-time-ms)]
(is (< elapsed-ms 500)))))))))))
;; TODO: this test has been commented in #49460 until the cancellation mechanism is reworked.
;; (deftest cancelation-test
;; (testing "Make sure canceling a HTTP request ultimately causes the query to be canceled"
;; (mt/test-helpers-set-global-values!
;; (with-redefs [streaming-response/async-cancellation-poll-interval-ms 50]
;; (with-test-driver-db!
;; (reset! canceled? false)
;; (with-start-execution-chan [start-chan]
;; (let [url (client/build-url "dataset" nil)
;; session-token (client/authenticate (mt/user->credentials :lucky))
;; request (client/build-request-map session-token
;; {:database (mt/id)
;; :type "native"
;; :native {:query {:sleep 5000}}}
;; nil)
;; futur (http/post url (assoc request :async? true) identity (fn [e] (throw e)))]
;; (is (future? futur))
;; ;; wait a little while for the query to start running -- this should usually happen fairly quickly
;; (mt/wait-for-result start-chan (u/seconds->ms 15))
;; (future-cancel futur)
;; ;; check every 10ms, up to 1000ms, whether `canceled?` is now `true`
;; (is (loop [[wait & more] (repeat 10 100)]
;; (or @canceled?
;; (when wait
;; (do
;; (Thread/sleep (long wait))
;; (recur more)))))))))))))
(deftest cancelation-test
(testing "Make sure canceling a HTTP request ultimately causes the query to be canceled"
(mt/test-helpers-set-global-values!
(with-redefs [streaming-response/async-cancellation-poll-interval-ms 50]
(with-test-driver-db!
(reset! canceled? false)
(with-start-execution-chan [start-chan]
(let [url (client/build-url "dataset" nil)
session-token (client/authenticate (mt/user->credentials :lucky))
request (client/build-request-map session-token
{:database (mt/id)
:type "native"
:native {:query {:sleep 5000}}}
nil)
futur (http/post url (assoc request :async? true) identity (fn [e] (throw e)))]
(is (future? futur))
;; wait a little while for the query to start running -- this should usually happen fairly quickly
(mt/wait-for-result start-chan (u/seconds->ms 15))
(future-cancel futur)
;; check every 10ms, up to 1000ms, whether `canceled?` is now `true`
(is (loop [[wait & more] (repeat 10 100)]
(or @canceled?
(when wait
(do
(Thread/sleep (long wait))
(recur more)))))))))))))
(def ^:private ^:dynamic *number-of-cans* nil)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment