Skip to content
Snippets Groups Projects
Commit e3d3fc1e authored by Arthur Ulfeldt's avatar Arthur Ulfeldt
Browse files

use core.async to stream responses

i needed a way to have backpressure on the queue kill the request thread
parent 7e846988
No related branches found
No related tags found
No related merge requests found
......@@ -19,7 +19,8 @@
[ring.core.protocols :as protocols]
[toucan
[db :as db]
[models :as models]])
[models :as models]]
[clojure.core.async :as async])
(:import com.fasterxml.jackson.core.JsonGenerator
java.io.OutputStream
[java.util.concurrent LinkedBlockingQueue]))
......@@ -377,20 +378,20 @@
;; this avoids output buffering in the default stream handling which was not sending
;; any responses until ~5k characters where in the queue.
(extend-protocol protocols/StreamableResponseBody
LinkedBlockingQueue
clojure.core.async.impl.channels.ManyToManyChannel
(write-body-to-stream [output-queue _ ^OutputStream output-stream]
(log/debug (u/format-color 'blue "starting streaming request"))
(log/debug (u/format-color 'green "starting streaming request"))
(with-open [out (io/writer output-stream)]
(loop [chunk (.take output-queue)]
(log/error (u/format-color 'green "streaming chunk"))
(loop [chunk (async/<!! output-queue)]
(when-not (= chunk ::EOF)
(.write out (str chunk))
(try
(.flush out)
(catch Exception e
(log/info (u/format-color 'yellow "connection closed, canceling request %s" (type e)))
(async/close! output-queue)
(throw e)))
(recur (.take output-queue)))))))
(recur (async/<!! output-queue)))))))
(def ^:private ^:const streaming-response-keep-alive-interval-ms
"Interval between sending whitespace bytes to keep Heroku from terminating
......@@ -410,7 +411,7 @@
(let [response (future (handler request))
optomistic-response (deref response streaming-response-keep-alive-interval-ms ::no-immediate-response)]
(if (= optomistic-response ::no-immediate-response)
(let [output-queue (LinkedBlockingQueue.)]
(let [output-queue (async/chan 1)]
(future
(try
(loop []
......@@ -418,15 +419,15 @@
(when-not (realized? response)
(log/error (u/format-color 'blue "Response is %s" (realized? response)))
(log/error (u/format-color 'blue "Response not ready, writing one byte & sleeping..."))
(.put output-queue "\n") ;; a newline padding character is used because it forces output flushing in jetty.
(when-not (async/>!! output-queue "\n")
(log/error (u/format-color 'yellow "canceled request %s" (future-cancel response)))
(future-cancel response)) ;; a newline padding character is used because it forces output flushing in jetty.
(recur)))
(log/error (u/format-color 'blue "canceling request %s" (future-cancel response)))
(log/error (u/format-color 'red "loop returned" @response))
(catch Exception e
#_(catch Exception e
(log/error (u/format-color 'red "caught exception" e)))))
(future
(.put output-queue @response)
(.put output-queue ::EOF))
(async/>!! output-queue @response)
(async/>!! output-queue ::EOF))
{:status 200
:body output-queue})
optomistic-response))))
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment