Skip to content
Snippets Groups Projects
Unverified Commit e8692e13 authored by Cam Saul
Browse files

Async QP middleware pattern :twisted_rightwards_arrows:

parent a1d2f25c
No related branches found
No related tags found
No related merge requests found
Showing
with 193 additions and 39 deletions
(ns metabase.async.util
"Utility functions for core.async-based async logic."
(:require [clojure.core.async :as a]
[clojure.tools.logging :as log]
[metabase.util.i18n :refer [trs]]
[schema.core :as s])
(:import clojure.core.async.impl.channels.ManyToManyChannel))
(:import clojure.core.async.impl.buffers.PromiseBuffer
clojure.core.async.impl.channels.ManyToManyChannel))
(defn promise-chan?
  "Returns true if `chan` is a core.async channel whose buffer is a promise buffer (such as one returned by
  `a/promise-chan`), and false for anything else, including values that are not channels at all."
  [chan]
  (if-not (instance? ManyToManyChannel chan)
    false
    ;; only look at the buffer once we know we actually have a channel
    (let [buffer (.buf ^ManyToManyChannel chan)]
      (instance? PromiseBuffer buffer))))
(def PromiseChan
  "Schema matching only core.async promise channels: a `ManyToManyChannel` that also satisfies `promise-chan?`."
  (s/constrained
   ManyToManyChannel
   promise-chan?
   "promise chan"))
(s/defn promise-canceled-chan :- PromiseChan
  "Return a new promise channel that tracks whether `promise-chan` was closed without ever receiving a value (e.g.
  because an API request was canceled). In that case the returned channel receives a single `::canceled` message.
  Either way, the returned channel is closed once `promise-chan` has delivered a value or been closed."
  [promise-chan :- PromiseChan]
  (let [canceled (a/promise-chan)]
    (a/go
      ;; a `nil` take means `promise-chan` was closed before anything was written to it
      (let [msg (a/<! promise-chan)]
        (when-not (some? msg)
          (a/>! canceled ::canceled)))
      (a/close! canceled))
    canceled))
(s/defn single-value-pipe :- ManyToManyChannel
"Pipe that will forward a single message from `in-chan` to `out-chan`, closing both afterward. If `out-chan` is closed
......
......@@ -11,6 +11,7 @@
[add-row-count-and-status :as row-count-and-status]
[add-settings :as add-settings]
[annotate :as annotate]
[async :as async]
[auto-bucket-datetimes :as bucket-datetime]
[bind-effective-timezone :as bind-timezone]
[binning :as binning]
......@@ -133,13 +134,20 @@
fetch-source-query/fetch-source-query
store/initialize-store
log-query/log-initial-query
;; TODO - bind `*query*` here ?
;; ▲▲▲ SYNC MIDDLEWARE ▲▲▲
;;
;; All middleware above this point is written in the synchronous 1-arg style. All middleware below is written in
;; async 4-arg style. Eventually the entire QP middleware stack will be rewritten in the async style. But not yet
;;
;; ▼▼▼ ASYNC MIDDLEWARE ▼▼▼
async/async->sync
cache/maybe-return-cached-results
validate/validate-query
normalize/normalize
catch-exceptions/catch-exceptions
process-userland-query/process-userland-query
constraints/add-default-userland-constraints))
constraints/add-default-userland-constraints
async/async-setup))
;; ▲▲▲ PRE-PROCESSING ▲▲▲ happens from BOTTOM-TO-TOP, e.g. the results of `expand-macros` are passed to
;; `substitute-parameters`
......
(ns metabase.query-processor.middleware.async
"Middleware for implementing async QP behavior."
(:require [clojure.core.async :as a]
[clojure.tools.logging :as log]
[metabase.async.util :as async.u]
[metabase.util.i18n :refer [trs]]))
;;; +----------------------------------------------------------------------------------------------------------------+
;;; | async->sync |
;;; +----------------------------------------------------------------------------------------------------------------+
(defn async->sync
  "Async-style (4-arg) middleware that wraps the synchronous (1-arg) portion of the QP middleware.

  Calls the synchronous `qp` with `query` and hands its return value to `respond`; anything thrown along the way is
  handed to `raise` instead. If `canceled-chan` already has a message, the synchronous code is not run at all and
  neither `respond` nor `raise` is called. A `nil` `canceled-chan` is treated as \"not canceled\"."
  [qp]
  (fn [query respond raise canceled-chan]
    ;; `a/poll!` throws when handed `nil`, and it would do so outside the `try` below, so the error would never reach
    ;; `raise`. `some->` lets callers that have no cancelation channel simply pass `nil`.
    (if (some-> canceled-chan a/poll!)
      (log/debug (trs "Request already canceled, will not run synchronous QP code."))
      (try
        (respond (qp query))
        (catch Throwable e
          (raise e))))))
;;; +----------------------------------------------------------------------------------------------------------------+
;;; | async-setup |
;;; +----------------------------------------------------------------------------------------------------------------+
(defn async-setup
  "Middleware that creates the output/canceled channels for the asynchronous (4-arg) QP middleware and runs it.

  Our 4-arg middleware follows the same pattern as async 3-arg Ring middleware, with the addition of a fourth
  `canceled-chan` arg; this is a core.async channel that can be listened to in order to implement special query
  cancelation behavior, such as canceling JDBC queries. If the output channel is closed before the query completes
  (i.e., the API request is canceled) this channel will receive a message; otherwise it will close whenever the output
  channel closes.

  Returns a 1-arg function of `query` that calls `qp` with freshly created `respond`, `raise` and `canceled-chan`
  args, then blocks until something is available on the output channel. If that value is a `Throwable` it is thrown;
  otherwise it is returned."
  [qp]
  (fn [query]
    (let [out-chan      (a/promise-chan)
          canceled-chan (async.u/promise-canceled-chan out-chan)
          ;; deliver the result and close the channel so nothing else can be written to it
          respond       (fn [result]
                          (a/>!! out-chan result)
                          (a/close! out-chan))
          ;; placeholder `raise`: exceptions travel over the same channel as results and are re-thrown below
          raise         (fn [e]
                          (log/warn e (trs "Unhandled exception, exepected `catch-exceptions` middleware to handle it"))
                          (respond e))]
      (try
        (qp query respond raise canceled-chan)
        (catch Throwable e
          (raise e)))
      ;; block the calling thread until `respond` or `raise` has been invoked (or the channel has been closed)
      (let [result (a/<!! out-chan)]
        (if (instance? Throwable result)
          (throw result)
          result)))))
......@@ -90,25 +90,25 @@
(boolean (and (public-settings/enable-query-caching)
cache-ttl)))
(defn- save-results-if-successful! [query-hash results]
(when (= (:status results) :completed)
(save-results! query-hash results)))
(defn- run-query-and-save-results-if-successful! [query-hash qp query]
(let [start-time-ms (System/currentTimeMillis)
results (qp query)
total-time-ms (- (System/currentTimeMillis) start-time-ms)
(defn- save-results-if-successful! [query-hash start-time-ms {:keys [status], :as results}]
(let [total-time-ms (- (System/currentTimeMillis) start-time-ms)
min-ttl-ms (* (public-settings/query-caching-min-ttl) 1000)]
(log/info (format "Query took %d ms to run; miminum for cache eligibility is %d ms" total-time-ms min-ttl-ms))
(when (>= total-time-ms min-ttl-ms)
(save-results-if-successful! query-hash results))
results))
(when (and (= status :completed)
(>= total-time-ms min-ttl-ms))
(save-results! query-hash results))))
(defn- run-query-with-cache [qp {:keys [cache-ttl], :as query}]
;; TODO - Query will already have `info.hash` if it's a userland query. I'm not 100% sure it will be the same hash.
(defn- run-query-with-cache [qp {:keys [cache-ttl], :as query} respond raise canceled-chan]
;; TODO - Query will already have `info.hash` if it's a userland query. I'm not 100% sure it will be the same hash,
;; because this is calculated after normalization, instead of before
(let [query-hash (qputil/query-hash query)]
(or (cached-results query-hash cache-ttl)
(run-query-and-save-results-if-successful! query-hash qp query))))
(if-let [cached-results (cached-results query-hash cache-ttl)]
(respond cached-results)
(let [start-time (System/currentTimeMillis)
respond (fn [results]
(save-results-if-successful! query-hash start-time results)
(respond results))]
(qp query respond raise canceled-chan)))))
(defn maybe-return-cached-results
"Middleware for caching results of a query if applicable.
......@@ -123,13 +123,13 @@
running the query, satisfying this requirement.)
* The result *rows* of the query must be less than `query-caching-max-kb` when serialized (before compression)."
[qp]
(fn [query]
(fn [query respond raise canceled-chan]
(if-not (is-cacheable? query)
(qp query)
(qp query respond raise canceled-chan)
;; wait until we're actually going to use the cache before initializing the backend. We don't want to initialize
;; it when the files get compiled, because that would give it the wrong version of the
;; `IQueryProcessorCacheBackend` protocol
(do
(when-not @backend-instance
(set-backend!))
(run-query-with-cache qp query)))))
(run-query-with-cache qp query respond raise canceled-chan)))))
......@@ -85,8 +85,12 @@
(defn catch-exceptions
"Middleware for catching exceptions thrown by the query processor and returning them in a normal format."
[qp]
(fn [query]
(try
(qp query)
(catch Throwable e
(format-exception query e)))))
;; we're not using the version of `raise` passed in on purpose here -- that one is a placeholder -- this is the
;; implementation of `raise` we expect most QP middleware to ultimately use
(fn [query respond _ canceled-chan]
(let [raise (fn [e]
(respond (format-exception query e)))]
(try
(qp query respond raise canceled-chan)
(catch Throwable e
(raise e))))))
......@@ -33,4 +33,5 @@
"Middleware that optionally adds default `max-results` and `max-results-bare-rows` constraints to queries, meant for
use with `process-query-and-save-with-max-results-constraints!`, which ultimately powers most QP API endpoints."
[qp]
(comp qp add-default-userland-constraints*))
(fn [query respond raise canceled-chan]
(qp (add-default-userland-constraints* query) respond raise canceled-chan)))
......@@ -7,4 +7,5 @@
into standard `lisp-case` ones, removing/rewriting legacy clauses, removing empty ones, etc. This is done to
simplify the logic in the QP steps following this."
[qp]
(comp qp normalize/normalize))
(fn [query respond raise canceled-chan]
(qp (normalize/normalize query) respond raise canceled-chan)))
......@@ -133,9 +133,10 @@
"Do extra handling 'userland' queries (i.e. ones ran as a result of a user action, e.g. an API call, scheduled Pulse,
etc.). This includes recording QueryExecution entries and returning the results in an FE-client-friendly format."
[qp]
(fn [{{:keys [userland-query?]} :middleware, :as query}]
(fn [{{:keys [userland-query?]} :middleware, :as query} respond raise canceled-chan]
(if-not userland-query?
(qp query)
(qp query respond raise canceled-chan)
;; add calculated hash to query
(let [query (assoc-in query [:info :query-hash] (qputil/query-hash query))]
(format-userland-query-result (query-execution-info query) (qp query))))))
(let [query (assoc-in query [:info :query-hash] (qputil/query-hash query))
respond (comp respond (partial format-userland-query-result (query-execution-info query)))]
(qp query respond raise canceled-chan)))))
......@@ -5,4 +5,6 @@
(defn validate-query
"Middleware that validates a query immediately after normalization."
[qp]
(comp qp mbql.s/validate-query))
(fn [query respond raise canceled-chan]
(mbql.s/validate-query query)
(qp query respond raise canceled-chan)))
......@@ -4,6 +4,33 @@
[metabase.async.util :as async.u]
[metabase.test.util.async :as tu.async]))
;; a channel created with `a/promise-chan` is a promise chan
(expect true (async.u/promise-chan? (a/promise-chan)))
;; regular channels, buffered or unbuffered, are not promise chans
(expect false (async.u/promise-chan? (a/chan 1)))
(expect false (async.u/promise-chan? (a/chan)))
;; values that aren't channels at all should give `false` as well
(expect false (async.u/promise-chan? nil))
(expect false (async.u/promise-chan? "ABC"))
;;; --------------------------------------------- promise-canceled-chan ----------------------------------------------
;; closing the wrapped promise chan before anything is written to it should put a message on the canceled chan
(expect
  ::async.u/canceled
  (tu.async/with-open-channels [chan (a/promise-chan)]
    (let [canceled-chan (async.u/promise-canceled-chan chan)]
      (a/close! chan)
      ;; wait at most a second for the message
      (let [[val] (a/alts!! [canceled-chan (a/timeout 1000)])]
        val))))
;; if the wrapped channel gets a message before it closes, canceled-chan should simply close without a message
(expect
  {:val nil, :canceled-chan? true}
  (tu.async/with-open-channels [chan (a/promise-chan)]
    (let [canceled-chan (async.u/promise-canceled-chan chan)]
      (a/>!! chan "message")
      (a/close! chan)
      (let [result (a/alts!! [canceled-chan (a/timeout 1000)])]
        {:val            (first result)
         :canceled-chan? (= (second result) canceled-chan)}))))
;;; ----------------------------------------------- single-value-pipe ------------------------------------------------
;; make sure `single-value-pipe` pipes a value from in-chan to out-chan
......
(ns metabase.query-processor.middleware.cache-test
"Tests for the Query Processor cache."
(:require [expectations :refer :all]
(:require [expectations :refer [expect]]
[metabase.models.query-cache :refer [QueryCache]]
[metabase.query-processor.middleware.cache :as cache]
[metabase.test.util :as tu]
......@@ -20,9 +20,9 @@
;; how long (in ms) `mock-qp` sleeps before responding. Defaults to 0; dynamic, presumably so individual tests can
;; rebind it
(def ^:private ^:dynamic ^Integer *query-execution-delay-ms* 0)
(defn- mock-qp [& _]
(defn- mock-qp [_ respond _ _]
(Thread/sleep *query-execution-delay-ms*)
mock-results)
(respond mock-results))
;; the cache middleware under test, wrapped around `mock-qp`
(def ^:private maybe-return-cached-results (cache/maybe-return-cached-results mock-qp))
......@@ -34,7 +34,12 @@
:not-cached))
(defn- run-query [& {:as query-kvs}]
(cached? (maybe-return-cached-results (merge {:cache-ttl 60, :query :abc} query-kvs))))
(cached?
(maybe-return-cached-results
(merge {:cache-ttl 60, :query :abc} query-kvs)
identity
(fn [e] (throw e))
nil)))
;;; -------------------------------------------- tests for is-cacheable? ---------------------------------------------
......
......@@ -2,15 +2,43 @@
(:require [expectations :refer [expect]]
[metabase.query-processor.middleware.catch-exceptions :as catch-exceptions]))
(defn- catch-exceptions
  "Test helper: wrap `qp` in the `catch-exceptions` middleware and run it against `query` (an empty map by default),
  using `identity` for both `respond` and `raise` and `nil` for the canceled channel."
  ([qp]
   (catch-exceptions qp {}))
  ([qp query]
   (let [wrapped-qp (catch-exceptions/catch-exceptions qp)]
     (wrapped-qp query identity identity nil))))
;; No Exception -- should return response as-is
(expect
{}
((catch-exceptions/catch-exceptions identity) {}))
(catch-exceptions
(fn [query respond _ _]
(respond query))))
;; an Exception thrown synchronously by the QP should come back as a formatted failure response
(expect
  {:status     :failed
   :class      java.lang.Exception
   :error      "Something went wrong"
   :stacktrace true
   :query      {}}
  (let [throwing-qp (fn [& _]
                      (throw (Exception. "Something went wrong")))]
    ;; only check that a stacktrace is present, not its contents
    (update (catch-exceptions throwing-qp) :stacktrace boolean)))
;; if an Exception is returned asynchronously by `raise`, should format it the same way
(expect
{:status :failed
:class java.lang.Exception
:error "Something went wrong"
:stacktrace true
:query {}}
(-> ((catch-exceptions/catch-exceptions (fn [_] (throw (Exception. "Something went wrong")))) {})
(-> (catch-exceptions
(fn [_ _ raise _]
(raise (Exception. "Something went wrong"))))
(update :stacktrace boolean)))
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment