Skip to content
Snippets Groups Projects
Unverified Commit 5aff96d6 authored by Cam Saul's avatar Cam Saul Committed by GitHub
Browse files

Fix API log timing for StreamingResponses (#11944)

parent e5d8439a
No related branches found
No related tags found
No related merge requests found
......@@ -107,7 +107,7 @@
(.flush writer))
(catch Throwable _))))
(defn- write-to-stream! [f {:keys [write-keepalive-newlines?]} ^OutputStream os]
(defn- write-to-stream! [f {:keys [write-keepalive-newlines?]} ^OutputStream os finished-chan]
(with-open-chan [canceled-chan (a/promise-chan)]
(with-open [os os
os (jetty-eof-canceling-output-stream os canceled-chan)
......@@ -118,9 +118,13 @@
(catch Throwable e
(write-error! os {:message (.getMessage e)}))
(finally
(.flush os))))))
(.flush os)
(a/>!! finished-chan (if (a/poll! canceled-chan)
:canceled
:done))
(a/close! finished-chan))))))
(p.types/deftype+ StreamingResponse [f options]
(p.types/deftype+ StreamingResponse [f options donechan]
pretty/PrettyPrintable
(pretty [_]
(list (symbol (str (.getCanonicalName StreamingResponse) \.)) f options))
......@@ -128,7 +132,7 @@
;; both sync and async responses
ring.protocols/StreamableResponseBody
(write-body-to-stream [_ _ os]
(write-to-stream! f options os))
(write-to-stream! f options os donechan))
;; async responses only
compojure.response/Sendable
......@@ -138,6 +142,12 @@
:headers (:headers options)
:status 202}))))
(defn finished-chan
"Fetch a promise channel that will get a message when a `StreamingResponse` is completely finished. Provided primarily
for logging purposes."
[^StreamingResponse response]
(.donechan response))
(defmacro streaming-response
"Return an streaming response that writes keepalive newline bytes.
......@@ -159,4 +169,5 @@
[options [os-binding canceled-chan-binding :as bindings] & body]
{:pre [(= (count bindings) 2)]}
`(->StreamingResponse (fn [~(vary-meta os-binding assoc :tag 'java.io.OutputStream) ~canceled-chan-binding] ~@body)
~options))
~options
(a/promise-chan)))
......@@ -6,12 +6,15 @@
[metabase
[server :as server]
[util :as u]]
[metabase.async.util :as async.u]
[metabase.async
[streaming-response :as streaming-response]
[util :as async.u]]
[metabase.middleware.util :as middleware.u]
[metabase.query-processor.middleware.async :as qp.middleware.async]
[metabase.util.i18n :refer [trs]]
[toucan.db :as db])
(:import clojure.core.async.impl.channels.ManyToManyChannel
metabase.async.streaming_response.StreamingResponse
org.eclipse.jetty.util.thread.QueuedThreadPool))
;; To simplify passing large amounts of arguments around most functions in this namespace take an "info" map that
......@@ -132,18 +135,24 @@
;; [async] wait for the pipe to close the canceled/finished channel and log the API response
(a/go
(let [result (a/<! chan)]
(log-info (assoc info :async-status (if (nil? result) "canceled" "completed")))))
;; [sync] return response as-is
response)
(log-info (assoc info :async-status (if (nil? result) "canceled" "completed"))))))
(defn- log-streaming-response [{{streaming-response :body, :as response} :response, :as info}]
;; [async] wait for the streaming response to be canceled/finished channel and log the API response
(let [finished-chan (streaming-response/finished-chan streaming-response)]
(a/go
(let [result (a/<! finished-chan)]
(log-info (assoc info :async-status (if (:canceled result) "canceled" "completed")))))))
(defn- logged-response
"Log an API response. Returns resonse, possibly modified (i.e., core.async channels will be wrapped); this value
should be passed to the normal `respond` function."
[{{:keys [body], :as response} :response, :as info}]
(if (instance? ManyToManyChannel body)
(log-core-async-response info)
(do (log-info info)
response)))
(condp instance? body
ManyToManyChannel (log-core-async-response info)
StreamingResponse (log-streaming-response info)
(log-info info))
response)
;;; +----------------------------------------------------------------------------------------------------------------+
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment