diff --git a/project.clj b/project.clj
index d0ac83ab26ed9259318b337b7e43c3b3a7282461..db982ea4083b96cf23faf6cec520072d08129564 100644
--- a/project.clj
+++ b/project.clj
@@ -76,7 +76,8 @@
                  [postgresql "9.3-1102.jdbc41"]                       ; Postgres driver
                  [io.crate/crate-jdbc "2.1.6"]                        ; Crate JDBC driver
                  [prismatic/schema "1.1.5"]                           ; Data schema declaration and validation library
-                 [ring/ring-jetty-adapter "1.5.1"]                    ; Ring adapter using Jetty webserver (used to run a Ring server for unit tests)
+                 [ring/ring-core "1.6.0"]                             ; Ring core; provides the StreamableResponseBody protocol used for streaming responses
+                 [ring/ring-jetty-adapter "1.6.0"]                    ; Ring adapter using Jetty webserver (used to run a Ring server for unit tests)
                  [ring/ring-json "0.4.0"]                             ; Ring middleware for reading/writing JSON automatically
                  [stencil "0.5.0"]                                    ; Mustache templates for Clojure
                  [toucan "1.0.3"                                      ; Model layer, hydration, and DB utilities
diff --git a/src/metabase/api/card.clj b/src/metabase/api/card.clj
index 5863d4ebf813d986a1ba031eadd01c61f3696c35..12541c5dd35fff4213d8750dabf0008e5f72cc60 100644
--- a/src/metabase/api/card.clj
+++ b/src/metabase/api/card.clj
@@ -5,6 +5,7 @@
             [compojure.core :refer [DELETE GET POST PUT]]
             [metabase
              [events :as events]
+             [middleware :as middleware]
              [public-settings :as public-settings]
              [query-processor :as qp]
              [util :as u]]
@@ -12,6 +13,7 @@
              [common :as api]
              [dataset :as dataset-api]
              [label :as label-api]]
+            [metabase.api.common.internal :refer [route-fn-name]]
             [metabase.models
              [card :as card :refer [Card]]
              [card-favorite :refer [CardFavorite]]
@@ -467,5 +469,5 @@
   (api/check-embedding-enabled)
   (db/select [Card :name :id], :enable_embedding true, :archived false))
 
-
-(api/define-routes)
+(api/define-routes
+  (middleware/streaming-json-response (route-fn-name 'POST "/:card-id/query")))
diff --git a/src/metabase/api/common.clj b/src/metabase/api/common.clj
index 0e18189aa8075b6306801d0ece1df95d38337e2a..4b8514c881880868c17a05d7c98940a6083f1f18 100644
--- a/src/metabase/api/common.clj
+++ b/src/metabase/api/common.clj
@@ -264,7 +264,7 @@
                                          (s/replace #"^metabase\." "")
                                          (s/replace #"\." "/"))
                                      (u/pprint-to-str (concat api-routes additional-routes))))
-       ~@api-routes ~@additional-routes)))
+       ~@additional-routes ~@api-routes)))
 
 
 ;;; ------------------------------------------------------------ PERMISSIONS CHECKING HELPER FNS ------------------------------------------------------------
diff --git a/src/metabase/api/dataset.clj b/src/metabase/api/dataset.clj
index 548e15d97b8309c874967b1843ee4b2d1dcf9d1b..7666d83e7fd55382770e080c7339684a33fe9961 100644
--- a/src/metabase/api/dataset.clj
+++ b/src/metabase/api/dataset.clj
@@ -6,9 +6,11 @@
             [compojure.core :refer [POST]]
             [dk.ative.docjure.spreadsheet :as spreadsheet]
             [metabase
+             [middleware :as middleware]
              [query-processor :as qp]
              [util :as u]]
             [metabase.api.common :as api]
+            [metabase.api.common.internal :refer [route-fn-name]]
             [metabase.models
              [database :refer [Database]]
              [query :as query]]
@@ -124,5 +126,5 @@
     (qp/dataset-query (dissoc query :constraints)
       {:executed-by api/*current-user-id*, :context (export-format->context export-format)}))))
 
-
-(api/define-routes)
+(api/define-routes
+  (middleware/streaming-json-response (route-fn-name 'POST "/")))
diff --git a/src/metabase/core.clj b/src/metabase/core.clj
index c36cfe2750630b5b58c2ce441bb1605d12bcbe9e..16cc4c3c09a137e5a9ce566e60b7cc9b4344dcbc 100644
--- a/src/metabase/core.clj
+++ b/src/metabase/core.clj
@@ -37,7 +37,7 @@
 
 (def ^:private app
   "The primary entry point to the Ring HTTP server."
-  (-> routes/routes
+  (-> #'routes/routes                    ; the #' is to allow tests to redefine endpoints
       mb-middleware/log-api-call
       mb-middleware/add-security-headers ; Add HTTP headers to API responses to prevent them from being cached
       (wrap-json-body                    ; extracts json POST body and makes it avaliable on request
diff --git a/src/metabase/middleware.clj b/src/metabase/middleware.clj
index a521f291aeee14404af9ccd4c6c5a8b1d2a0d07f..5e2467e44bff706c5269cb29b60395e045bfa05a 100644
--- a/src/metabase/middleware.clj
+++ b/src/metabase/middleware.clj
@@ -1,6 +1,10 @@
 (ns metabase.middleware
   "Metabase-specific middleware functions & configuration."
-  (:require [cheshire.generate :refer [add-encoder encode-nil encode-str]]
+  (:require [cheshire
+             [core :as json]
+             [generate :refer [add-encoder encode-nil encode-str]]]
+            [clojure.core.async :as async]
+            [clojure.java.io :as io]
             [clojure.tools.logging :as log]
             [metabase
              [config :as config]
@@ -15,10 +19,13 @@
              [setting :refer [defsetting]]
              [user :as user :refer [User]]]
             monger.json
+            [ring.core.protocols :as protocols]
+            [ring.util.response :as response]
             [toucan
              [db :as db]
              [models :as models]])
-  (:import com.fasterxml.jackson.core.JsonGenerator))
+  (:import com.fasterxml.jackson.core.JsonGenerator
+           java.io.OutputStream))
 
 ;;; # ------------------------------------------------------------ UTIL FNS ------------------------------------------------------------
 
@@ -354,3 +361,75 @@
         (handler request))
       (catch Throwable e
         {:status 400, :body (.getMessage e)}))))
+
+;;; ------------------------------------------------------------ STREAMING RESPONSES ------------------------------------------------------------
+
+(def ^:private ^:const streaming-response-keep-alive-interval-ms
+  "Interval between sending newline characters to keep Heroku from terminating
+   requests like queries that take a long time to complete."
+  (* 1 1000))
+
+;; Handle ring response maps that contain a core.async chan in the :body key:
+;;
+;; {:status 200
+;;  :body   (async/chan)}
+;;
+;; and send each string put on that chan back to the browser as it arrives, until ::EOF (or a closed chan) is seen.
+;; This avoids output buffering in the default stream handling, which was not sending
+;; any responses until ~5k characters were in the queue.
+(extend-protocol protocols/StreamableResponseBody
+  clojure.core.async.impl.channels.ManyToManyChannel
+  (write-body-to-stream [output-queue _ ^OutputStream output-stream]
+    (log/debug (u/format-color 'green "starting streaming request"))
+    (with-open [out (io/writer output-stream)]
+      (loop [chunk (async/<!! output-queue)]
+        (when-not (or (nil? chunk) (= chunk ::EOF)) ; <!! returns nil once the chan is closed; stop instead of spinning
+          (.write out (str chunk))
+          (try
+            (.flush out)
+            (catch org.eclipse.jetty.io.EofException e
+              (log/info (u/format-color 'yellow "connection closed, canceling request %s" (type e)))
+              (async/close! output-queue) ; closing the chan makes the keepalive writer's >!! return false
+              (throw e)))
+          (recur (async/<!! output-queue)))))))
+
+(defn streaming-json-response
+  "This middleware assumes handlers fail early or return success.
+   Run the handler in a future and send newlines to keep the connection open
+   and help detect when the browser is no longer listening for the response.
+   Waits for one second to see if the handler responds immediately. If it does
+   then there is no need to stream the response and it is sent back directly.
+   In cases where it takes longer than a second, assume the eventual result will
+   be a success and start sending newlines to keep the connection open."
+  [handler]
+  (fn [request]
+    (let [response            (future (handler request))
+          optimistic-response (deref response streaming-response-keep-alive-interval-ms ::no-immediate-response)]
+      (if (= optimistic-response ::no-immediate-response)
+        ;; if we didn't get a normal response in the first polling interval assume it's going to be slow
+        ;; and start sending keepalive packets.
+        (let [output (async/chan 1)]
+          ;; the output channel will be closed by the adapter when the incoming connection is closed.
+          (future
+            (loop []
+              (Thread/sleep streaming-response-keep-alive-interval-ms)
+              (when-not (realized? response)
+                (log/debug (u/format-color 'blue "Response not ready, writing one byte & sleeping..."))
+                ;; a newline padding character is used because it forces output flushing in jetty.
+                ;; if sending this character fails because the connection is closed, the chan will then close.
+                ;; Newlines are no-ops when reading JSON which this depends upon.
+                (when-not (async/>!! output "\n")
+                  (let [canceled? (future-cancel response)] ; try our best to kill the thread running the query (outside the log macro so it always runs)
+                    (log/info (u/format-color 'yellow "canceled request %s" canceled?))))
+                (recur))))
+          (future
+            (try
+              ;; This is the part where we make this assume it's a JSON response we are sending.
+              (async/>!! output (json/encode (:body @response)))
+              (finally
+                (async/>!! output ::EOF)
+                (async/close! output)))) ; close the chan (not the future) once the terminator has been queued
+          ;; here we assume a successful response will be written to the output channel.
+          (-> (response/response output)
+              (response/content-type "application/json")))
+        optimistic-response))))
diff --git a/test/metabase/middleware_test.clj b/test/metabase/middleware_test.clj
index e67b97392181bfdeaf83a484eb69696b59158206..90175af670f6a37eb34c3cf14472ce4a174e0676 100644
--- a/test/metabase/middleware_test.clj
+++ b/test/metabase/middleware_test.clj
@@ -1,13 +1,20 @@
 (ns metabase.middleware-test
   (:require [cheshire.core :as json]
+            [clojure.core.async :as async]
+            [clojure.java.io :as io]
+            [clojure.tools.logging :as log]
+            [compojure.core :refer [GET]]
             [expectations :refer :all]
             [metabase
-             [middleware :refer :all]
+             [config :as config]
+             [middleware :as middleware :refer :all]
+             [routes :as routes]
              [util :as u]]
             [metabase.api.common :refer [*current-user* *current-user-id*]]
             [metabase.models.session :refer [Session]]
             [metabase.test.data.users :refer :all]
             [ring.mock.request :as mock]
+            [ring.util.response :as resp]
             [toucan.db :as db]))
 
 ;; =========================== TEST wrap-session-id middleware ===========================
@@ -176,3 +183,95 @@
 (expect "{\"my-bytes\":\"0xC42360D7\"}"
         (json/generate-string {:my-bytes (byte-array [196 35 96 215 8 106 108 248 183 215 244 143 17 160 53 186 213 30 116 25 87 31 123 172 207 108 47 107 191 215 76 92])}))
 
+;; =========================== TEST streaming-json-response middleware ===========================
+
+(defn- streaming-fast-success [_]
+  (resp/response {:success true}))
+
+(defn- streaming-fast-failure [_]
+  (throw (Exception. "immediate failure")))
+
+(defn- streaming-slow-success [_]
+  (Thread/sleep 7000)
+  (resp/response {:success true}))
+
+(defn- streaming-slow-failure [_]
+  (Thread/sleep 7000)
+  (throw (Exception. "delayed failure")))
+
+(defn- test-streaming-endpoint [handler]
+  (let [path (str handler)]
+    (with-redefs [metabase.routes/routes (compojure.core/routes
+                                          (GET (str "/" path) [] (middleware/streaming-json-response
+                                                                  handler)))]
+      (let [connection (async/chan 1000)
+            reader     (io/input-stream (str "http://localhost:" (config/config-int :mb-jetty-port) "/" path))]
+        (async/go-loop [next-char (.read reader)]
+          (if-not (neg? next-char) ; .read returns -1 only at end of stream
+            (do
+              (async/>! connection (char next-char))
+              (recur (.read reader)))
+            (async/close! connection)))
+        (let [_             (Thread/sleep 1500)
+              first-second  (async/poll! connection)
+              _             (Thread/sleep 1000)
+              second-second (async/poll! connection)
+              eventually    (apply str (async/<!! (async/into [] connection)))]
+          [first-second second-second eventually])))))
+
+
+;; slow success: keepalive newlines are sent first, followed by the JSON body
+(expect
+  [\newline \newline "\n\n\n{\"success\":true}"]
+  (test-streaming-endpoint streaming-slow-success))
+
+;; immediate success should have no padding
+(expect
+  [\{ \" "success\":true}"]
+  (test-streaming-endpoint streaming-fast-success))
+
+;; we know delayed failures (exception thrown) will just drop the connection
+(expect
+  [\newline \newline "\n\n\n"]
+  (test-streaming-endpoint streaming-slow-failure))
+
+;; immediate failures (where an exception is thrown) will return a 500
+(expect
+  #"Server returned HTTP response code: 500 for URL:.*"
+  (try
+    (test-streaming-endpoint streaming-fast-failure)
+    (catch java.io.IOException e
+      (.getMessage e))))
+
+;; test that handler is killed when connection closes
+(def test-slow-handler-state (atom :unset))
+
+(defn- test-slow-handler [_]
+  (log/debug (u/format-color 'yellow "starting test-slow-handler"))
+  (Thread/sleep 7000) ;; this is somewhat long to make sure the keepalive polling has time to kill it.
+  (reset! test-slow-handler-state :ran-to-completion)
+  (log/debug (u/format-color 'yellow "finished test-slow-handler"))
+  (resp/response {:success true}))
+
+(defn- start-and-maybe-kill-test-request [kill?]
+  (reset! test-slow-handler-state :initial-state)
+  (let [path "test-slow-handler"]
+    (with-redefs [metabase.routes/routes (compojure.core/routes
+                                          (GET (str "/" path) [] (middleware/streaming-json-response
+                                                                  test-slow-handler)))]
+      (let [reader (io/input-stream (str "http://localhost:" (config/config-int :mb-jetty-port) "/" path))]
+        (Thread/sleep 1500)
+        (when kill?
+          (.close reader))
+        (Thread/sleep 10000)))) ;; this is long enough to ensure that the handler has run to completion if it was not killed.
+  @test-slow-handler-state)
+
+;; In this first test we will close the connection before the test handler gets to change the state
+(expect
+  :initial-state
+  (start-and-maybe-kill-test-request true))
+
+;; and to make sure this test actually works, run the same test again and let it change the state.
+(expect
+  :ran-to-completion
+  (start-and-maybe-kill-test-request false))