From e8692e1380bc538b76f234037a3569b45b2074ab Mon Sep 17 00:00:00 2001
From: Cam Saul <cammsaul@gmail.com>
Date: Tue, 16 Apr 2019 14:25:34 -0700
Subject: [PATCH] Async QP middleware pattern :twisted_rightwards_arrows:

---
 src/metabase/async/util.clj                   | 26 ++++++++-
 src/metabase/query_processor.clj              | 12 ++++-
 .../query_processor/middleware/async.clj      | 53 +++++++++++++++++++
 .../query_processor/middleware/cache.clj      | 36 ++++++-------
 .../middleware/catch_exceptions.clj           | 14 +++--
 .../middleware/constraints.clj                |  3 +-
 .../middleware/normalize_query.clj            |  3 +-
 .../middleware/process_userland_query.clj     |  9 ++--
 .../query_processor/middleware/validate.clj   |  4 +-
 test/metabase/async/util_test.clj             | 27 ++++++++++
 .../query_processor/middleware/cache_test.clj | 13 +++--
 .../middleware/catch_exceptions_test.clj      | 32 ++++++++++-
 12 files changed, 193 insertions(+), 39 deletions(-)
 create mode 100644 src/metabase/query_processor/middleware/async.clj

diff --git a/src/metabase/async/util.clj b/src/metabase/async/util.clj
index 552e0035543..c0f33843291 100644
--- a/src/metabase/async/util.clj
+++ b/src/metabase/async/util.clj
@@ -1,9 +1,33 @@
 (ns metabase.async.util
+  "Utility functions for core.async-based async logic."
   (:require [clojure.core.async :as a]
             [clojure.tools.logging :as log]
             [metabase.util.i18n :refer [trs]]
             [schema.core :as s])
-  (:import clojure.core.async.impl.channels.ManyToManyChannel))
+  (:import clojure.core.async.impl.buffers.PromiseBuffer
+           clojure.core.async.impl.channels.ManyToManyChannel))
+
+(defn promise-chan?
+  "Is core.async `chan` a `promise-chan`?"
+  [chan]
+  (and (instance? ManyToManyChannel chan)
+       (instance? PromiseBuffer (.buf ^ManyToManyChannel chan))))
+
+(def PromiseChan
+  "Schema for a core.async promise channel."
+  (s/constrained ManyToManyChannel promise-chan? "promise chan"))
+
+(s/defn promise-canceled-chan :- PromiseChan
+  "Given a `promise-chan`, return a new channel that will receive a single message if `promise-chan` is closed before
+  a message is written to it (i.e. if an API request is canceled). Automatically closes after `promise-chan` receives
+  a message or is closed."
+  [promise-chan :- PromiseChan]
+  (let [canceled-chan (a/promise-chan)]
+    (a/go
+      (when (nil? (a/<! promise-chan))
+        (a/>! canceled-chan ::canceled))
+      (a/close! canceled-chan))
+    canceled-chan))
 
 (s/defn single-value-pipe :- ManyToManyChannel
   "Pipe that will forward a single message from `in-chan` to `out-chan`, closing both afterward. If `out-chan` is closed
diff --git a/src/metabase/query_processor.clj b/src/metabase/query_processor.clj
index e9065cf0f63..e31095d4392 100644
--- a/src/metabase/query_processor.clj
+++ b/src/metabase/query_processor.clj
@@ -11,6 +11,7 @@
              [add-row-count-and-status :as row-count-and-status]
              [add-settings :as add-settings]
              [annotate :as annotate]
+             [async :as async]
              [auto-bucket-datetimes :as bucket-datetime]
              [bind-effective-timezone :as bind-timezone]
              [binning :as binning]
@@ -133,13 +134,20 @@
       fetch-source-query/fetch-source-query
       store/initialize-store
       log-query/log-initial-query
-      ;; TODO - bind `*query*` here ?
+      ;; ▲▲▲ SYNC MIDDLEWARE ▲▲▲
+      ;;
+      ;; All middleware above this point is written in the synchronous 1-arg style. All middleware below is written in
+      ;; async 4-arg style. Eventually the entire QP middleware stack will be rewritten in the async style. But not yet
+      ;;
+      ;; ▼▼▼ ASYNC MIDDLEWARE ▼▼▼
+      async/async->sync
       cache/maybe-return-cached-results
       validate/validate-query
       normalize/normalize
       catch-exceptions/catch-exceptions
       process-userland-query/process-userland-query
-      constraints/add-default-userland-constraints))
+      constraints/add-default-userland-constraints
+      async/async-setup))
 ;; ▲▲▲ PRE-PROCESSING ▲▲▲ happens from BOTTOM-TO-TOP, e.g. the results of `expand-macros` are passed to
 ;; `substitute-parameters`
 
diff --git a/src/metabase/query_processor/middleware/async.clj b/src/metabase/query_processor/middleware/async.clj
new file mode 100644
index 00000000000..e5ce7503c70
--- /dev/null
+++ b/src/metabase/query_processor/middleware/async.clj
@@ -0,0 +1,53 @@
+(ns metabase.query-processor.middleware.async
+  "Middleware for implementing async QP behavior."
+  (:require [clojure.core.async :as a]
+            [clojure.tools.logging :as log]
+            [metabase.async.util :as async.u]
+            [metabase.util.i18n :refer [trs]]))
+
+;;; +----------------------------------------------------------------------------------------------------------------+
+;;; |                                                  async->sync                                                   |
+;;; +----------------------------------------------------------------------------------------------------------------+
+
+(defn async->sync
+  "Async-style (4-arg) middleware that wraps the synchronous (1-arg) portion of the QP middleware."
+  [qp]
+  (fn [query respond raise canceled-chan]
+    (if (a/poll! canceled-chan)
+      (log/debug (trs "Request already canceled, will not run synchronous QP code."))
+      (try
+        (respond (qp query))
+        (catch Throwable e
+          (raise e))))))
+
+
+;;; +----------------------------------------------------------------------------------------------------------------+
+;;; |                                                  async-setup                                                   |
+;;; +----------------------------------------------------------------------------------------------------------------+
+
+(defn async-setup
+  "Middleware that creates the output/canceled channels for the asynchronous (4-arg) QP middleware and runs it.
+
+  Our 4-arg middleware follows the same pattern as async 3-arg Ring middleware, with the addition of fourth
+  `canceled-chan` arg; this is a core.async channel that can be listened to to implement special query cancelation
+  behavior, such as canceling JDBC queries. If the output channel is closed before the query completes (i.e., API
+  request is canceled) this channel will receive a message; otherwise it will close whenever the output channel
+  closes."
+  [qp]
+  (fn [{:keys [async?], :as query}]
+    (let [out-chan      (a/promise-chan)
+          canceled-chan (async.u/promise-canceled-chan out-chan)
+          respond       (fn [result]
+                          (a/>!! out-chan result)
+                          (a/close! out-chan))
+          raise         (fn [e]
+                          (log/warn e (trs "Unhandled exception, exepected `catch-exceptions` middleware to handle it"))
+                          (respond e))]
+      (try
+        (qp query respond raise canceled-chan)
+        (catch Throwable e
+          (raise e)))
+      (let [result (a/<!! out-chan)]
+        (if (instance? Throwable result)
+          (throw result)
+          result)))))
diff --git a/src/metabase/query_processor/middleware/cache.clj b/src/metabase/query_processor/middleware/cache.clj
index c9ce68f0c27..ecbb6a41bdd 100644
--- a/src/metabase/query_processor/middleware/cache.clj
+++ b/src/metabase/query_processor/middleware/cache.clj
@@ -90,25 +90,25 @@
   (boolean (and (public-settings/enable-query-caching)
                 cache-ttl)))
 
-(defn- save-results-if-successful! [query-hash results]
-  (when (= (:status results) :completed)
-    (save-results! query-hash results)))
-
-(defn- run-query-and-save-results-if-successful! [query-hash qp query]
-  (let [start-time-ms (System/currentTimeMillis)
-        results       (qp query)
-        total-time-ms (- (System/currentTimeMillis) start-time-ms)
+(defn- save-results-if-successful! [query-hash start-time-ms {:keys [status], :as results}]
+  (let [total-time-ms (- (System/currentTimeMillis) start-time-ms)
         min-ttl-ms    (* (public-settings/query-caching-min-ttl) 1000)]
     (log/info (format "Query took %d ms to run; miminum for cache eligibility is %d ms" total-time-ms min-ttl-ms))
-    (when (>= total-time-ms min-ttl-ms)
-      (save-results-if-successful! query-hash results))
-    results))
+    (when (and (= status :completed)
+               (>= total-time-ms min-ttl-ms))
+      (save-results! query-hash results))))
 
-(defn- run-query-with-cache [qp {:keys [cache-ttl], :as query}]
-  ;; TODO - Query will already have `info.hash` if it's a userland query. I'm not 100% sure it will be the same hash.
+(defn- run-query-with-cache [qp {:keys [cache-ttl], :as query} respond raise canceled-chan]
+  ;; TODO - Query will already have `info.hash` if it's a userland query. I'm not 100% sure it will be the same hash,
+  ;; because this is calculated after normalization, instead of before
   (let [query-hash (qputil/query-hash query)]
-    (or (cached-results query-hash cache-ttl)
-        (run-query-and-save-results-if-successful! query-hash qp query))))
+    (if-let [cached-results (cached-results query-hash cache-ttl)]
+      (respond cached-results)
+      (let [start-time (System/currentTimeMillis)
+            respond    (fn [results]
+                         (save-results-if-successful! query-hash start-time results)
+                         (respond results))]
+        (qp query respond raise canceled-chan)))))
 
 (defn maybe-return-cached-results
   "Middleware for caching results of a query if applicable.
@@ -123,13 +123,13 @@
         running the query, satisfying this requirement.)
      *  The result *rows* of the query must be less than `query-caching-max-kb` when serialized (before compression)."
   [qp]
-  (fn [query]
+  (fn [query respond raise canceled-chan]
     (if-not (is-cacheable? query)
-      (qp query)
+      (qp query respond raise canceled-chan)
       ;; wait until we're actually going to use the cache before initializing the backend. We don't want to initialize
       ;; it when the files get compiled, because that would give it the wrong version of the
       ;; `IQueryProcessorCacheBackend` protocol
       (do
         (when-not @backend-instance
           (set-backend!))
-        (run-query-with-cache qp query)))))
+        (run-query-with-cache qp query respond raise canceled-chan)))))
diff --git a/src/metabase/query_processor/middleware/catch_exceptions.clj b/src/metabase/query_processor/middleware/catch_exceptions.clj
index 4a9b93f0530..64ed62457e0 100644
--- a/src/metabase/query_processor/middleware/catch_exceptions.clj
+++ b/src/metabase/query_processor/middleware/catch_exceptions.clj
@@ -85,8 +85,12 @@
 (defn catch-exceptions
   "Middleware for catching exceptions thrown by the query processor and returning them in a normal format."
   [qp]
-  (fn [query]
-    (try
-      (qp query)
-      (catch Throwable e
-        (format-exception query e)))))
+  ;; we're not using the version of `raise` passed in on purpose here -- that one is a placeholder -- this is the
+  ;; implementation of `raise` we expect most QP middleware to ultimately use
+  (fn [query respond _ canceled-chan]
+    (let [raise (fn [e]
+                  (respond (format-exception query e)))]
+      (try
+        (qp query respond raise canceled-chan)
+        (catch Throwable e
+          (raise e))))))
diff --git a/src/metabase/query_processor/middleware/constraints.clj b/src/metabase/query_processor/middleware/constraints.clj
index 94b222c0e90..3aaf520c908 100644
--- a/src/metabase/query_processor/middleware/constraints.clj
+++ b/src/metabase/query_processor/middleware/constraints.clj
@@ -33,4 +33,5 @@
   "Middleware that optionally adds default `max-results` and `max-results-bare-rows` constraints to queries, meant for
   use with `process-query-and-save-with-max-results-constraints!`, which ultimately powers most QP API endpoints."
   [qp]
-  (comp qp add-default-userland-constraints*))
+  (fn [query respond raise canceled-chan]
+    (qp (add-default-userland-constraints* query) respond raise canceled-chan)))
diff --git a/src/metabase/query_processor/middleware/normalize_query.clj b/src/metabase/query_processor/middleware/normalize_query.clj
index fbd576b52d2..3a1d7f076af 100644
--- a/src/metabase/query_processor/middleware/normalize_query.clj
+++ b/src/metabase/query_processor/middleware/normalize_query.clj
@@ -7,4 +7,5 @@
   into standard `lisp-case` ones, removing/rewriting legacy clauses, removing empty ones, etc. This is done to
   simplifiy the logic in the QP steps following this."
   [qp]
-  (comp qp normalize/normalize))
+  (fn [query respond raise canceled-chan]
+    (qp (normalize/normalize query) respond raise canceled-chan)))
diff --git a/src/metabase/query_processor/middleware/process_userland_query.clj b/src/metabase/query_processor/middleware/process_userland_query.clj
index d2f8f44d857..18b770d2cf4 100644
--- a/src/metabase/query_processor/middleware/process_userland_query.clj
+++ b/src/metabase/query_processor/middleware/process_userland_query.clj
@@ -133,9 +133,10 @@
   "Do extra handling 'userland' queries (i.e. ones ran as a result of a user action, e.g. an API call, scheduled Pulse,
   etc.). This includes recording QueryExecution entries and returning the results in an FE-client-friendly format."
   [qp]
-  (fn [{{:keys [userland-query?]} :middleware, :as query}]
+  (fn [{{:keys [userland-query?]} :middleware, :as query} respond raise canceled-chan]
     (if-not userland-query?
-      (qp query)
+      (qp query respond raise canceled-chan)
       ;; add calculated hash to query
-      (let [query (assoc-in query [:info :query-hash] (qputil/query-hash query))]
-        (format-userland-query-result (query-execution-info query) (qp query))))))
+      (let [query   (assoc-in query [:info :query-hash] (qputil/query-hash query))
+            respond (comp respond (partial format-userland-query-result (query-execution-info query)))]
+        (qp query respond raise canceled-chan)))))
diff --git a/src/metabase/query_processor/middleware/validate.clj b/src/metabase/query_processor/middleware/validate.clj
index 5c914a4b6ad..7e0e965365b 100644
--- a/src/metabase/query_processor/middleware/validate.clj
+++ b/src/metabase/query_processor/middleware/validate.clj
@@ -5,4 +5,6 @@
 (defn validate-query
   "Middleware that validates a query immediately after normalization."
   [qp]
-  (comp qp mbql.s/validate-query))
+  (fn [query respond raise canceled-chan]
+    (mbql.s/validate-query query)
+    (qp query respond raise canceled-chan)))
diff --git a/test/metabase/async/util_test.clj b/test/metabase/async/util_test.clj
index 4ea34c2dc48..b0d9aaae7ac 100644
--- a/test/metabase/async/util_test.clj
+++ b/test/metabase/async/util_test.clj
@@ -4,6 +4,33 @@
             [metabase.async.util :as async.u]
             [metabase.test.util.async :as tu.async]))
 
+(expect true  (async.u/promise-chan? (a/promise-chan)))
+(expect false (async.u/promise-chan? (a/chan 1)))
+(expect false (async.u/promise-chan? (a/chan)))
+(expect false (async.u/promise-chan? nil))
+(expect false (async.u/promise-chan? "ABC"))
+
+;;; --------------------------------------------- promise-canceled-chan ----------------------------------------------
+
+;; make sure the canceled chan gets a message if the promise-chan it wraps closes before anything is written to it
+(expect
+  ::async.u/canceled
+  (tu.async/with-open-channels [chan (a/promise-chan)]
+    (let [canceled-chan (async.u/promise-canceled-chan chan)]
+      (a/close! chan)
+      (first (a/alts!! [canceled-chan (a/timeout 1000)])))))
+
+;; canceled-chan should close with no message if the channel it wraps gets a message before it closes
+(expect
+  {:val nil, :canceled-chan? true}
+  (tu.async/with-open-channels [chan (a/promise-chan)]
+    (let [canceled-chan (async.u/promise-canceled-chan chan)]
+      (a/>!! chan "message")
+      (a/close! chan)
+      (let [[val port] (a/alts!! [canceled-chan (a/timeout 1000)])]
+        {:val val, :canceled-chan? (= port canceled-chan)}))))
+
+
 ;;; ----------------------------------------------- single-value-pipe ------------------------------------------------
 
 ;; make sure `single-value-pipe` pipes a value from in-chan to out-chan
diff --git a/test/metabase/query_processor/middleware/cache_test.clj b/test/metabase/query_processor/middleware/cache_test.clj
index d6ed035f8a3..ad6052ef62a 100644
--- a/test/metabase/query_processor/middleware/cache_test.clj
+++ b/test/metabase/query_processor/middleware/cache_test.clj
@@ -1,6 +1,6 @@
 (ns metabase.query-processor.middleware.cache-test
   "Tests for the Query Processor cache."
-  (:require [expectations :refer :all]
+  (:require [expectations :refer [expect]]
             [metabase.models.query-cache :refer [QueryCache]]
             [metabase.query-processor.middleware.cache :as cache]
             [metabase.test.util :as tu]
@@ -20,9 +20,9 @@
 
 (def ^:private ^:dynamic ^Integer *query-execution-delay-ms* 0)
 
-(defn- mock-qp [& _]
+(defn- mock-qp [_ respond _ _]
   (Thread/sleep *query-execution-delay-ms*)
-  mock-results)
+  (respond mock-results))
 
 (def ^:private maybe-return-cached-results (cache/maybe-return-cached-results mock-qp))
 
@@ -34,7 +34,12 @@
     :not-cached))
 
 (defn- run-query [& {:as query-kvs}]
-  (cached? (maybe-return-cached-results (merge {:cache-ttl 60, :query :abc} query-kvs))))
+  (cached?
+   (maybe-return-cached-results
+    (merge {:cache-ttl 60, :query :abc} query-kvs)
+    identity
+    (fn [e] (throw e))
+    nil)))
 
 
 ;;; -------------------------------------------- tests for is-cacheable? ---------------------------------------------
diff --git a/test/metabase/query_processor/middleware/catch_exceptions_test.clj b/test/metabase/query_processor/middleware/catch_exceptions_test.clj
index 8d5756a7fae..20708d179da 100644
--- a/test/metabase/query_processor/middleware/catch_exceptions_test.clj
+++ b/test/metabase/query_processor/middleware/catch_exceptions_test.clj
@@ -2,15 +2,43 @@
   (:require [expectations :refer [expect]]
             [metabase.query-processor.middleware.catch-exceptions :as catch-exceptions]))
 
+(defn- catch-exceptions
+  ([qp]
+   (catch-exceptions qp {}))
+  ([qp query]
+   ((catch-exceptions/catch-exceptions qp)
+    query
+    identity
+    identity
+    nil)))
+
+;; No Exception -- should return response as-is
 (expect
   {}
-  ((catch-exceptions/catch-exceptions identity) {}))
+  (catch-exceptions
+   (fn [query respond _ _]
+     (respond query))))
+
+;; if the QP throws an Exception (synchronously), should format the response appropriately
+(expect
+  {:status     :failed
+   :class      java.lang.Exception
+   :error      "Something went wrong"
+   :stacktrace true
+   :query      {}}
+  (-> (catch-exceptions
+       (fn [& _]
+         (throw (Exception. "Something went wrong"))))
+      (update :stacktrace boolean)))
 
+;; if an Exception is returned asynchronously by `raise`, should format it the same way
 (expect
   {:status     :failed
    :class      java.lang.Exception
    :error      "Something went wrong"
    :stacktrace true
    :query      {}}
-  (-> ((catch-exceptions/catch-exceptions (fn [_] (throw (Exception. "Something went wrong")))) {})
+  (-> (catch-exceptions
+       (fn [_ _ raise _]
+         (raise (Exception. "Something went wrong"))))
       (update :stacktrace boolean)))
-- 
GitLab