Unverified commit 9c235d60, authored by Cam Saul and committed by GitHub

Simplify QP cache middleware (#20310)

* Simplify QP cache middleware

* Fix hanging tests
parent 9cc38ac4
......@@ -219,11 +219,11 @@
(apply (qp) args))
(qp))))
-(def ^{:arglists '([query] [query context])} process-query-async
+(def ^{:arglists '([query] [query context] [query rff context])} process-query-async
"Process a query asynchronously, returning a `core.async` channel that is called with the final result (or Throwable)."
(base-qp default-middleware))
-(def ^{:arglists '([query] [query context])} process-query-sync
+(def ^{:arglists '([query] [query context] [query rff context])} process-query-sync
"Process a query synchronously, blocking until results are returned. Throws raised Exceptions directly."
(qp.reducible/sync-qp process-query-async))
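For orientation, a minimal sketch of what the new `[query rff context]` arity enables: callers can supply their own reducing-function factory (rff). The query map and the row-counting rff below are illustrative assumptions, not part of this commit.

(comment
  ;; hypothetical rff: ignores row contents and just counts rows
  (defn row-count-rff [metadata]
    (fn
      ([] {:metadata metadata, :row-count 0})
      ([acc] acc)
      ([acc _row] (update acc :row-count inc))))

  ;; `some-query` is assumed to be an ordinary query map
  (process-query-sync some-query row-count-rff nil))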
......@@ -231,7 +231,7 @@
"Process an MBQL query. This is the main entrypoint to the magical realm of the Query Processor. Returns a *single*
core.async channel if option `:async?` is true; otherwise returns results in the usual format. For async queries, if
the core.async channel is closed, the query will be canceled."
-{:arglists '([query] [query context])}
+{:arglists '([query] [query context] [query rff context])}
[{:keys [async?], :as query} & args]
(apply (if async? process-query-async process-query-sync)
query
......
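Sketch of the `:async?` dispatch described in the docstring above (illustrative; `some-query` is an assumed query map):

(comment
  (process-query (assoc some-query :async? true)) ; returns a core.async promise channel
  (process-query some-query))                     ; blocks and returns results directly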
(ns metabase.query-processor.context
"Interface for the QP context/utility functions for using the things in the context correctly.
-The default implementations of all these functions live in `metabase.query-processor.context.default`; refer to
-those when overriding individual functions. Some wiring for the `core.async` channels takes place in
-`metabase.query-processor.reducible.`"
+The default implementations of all these functions live in [[metabase.query-processor.context.default]]; refer to
+those when overriding individual functions. Some wiring for the [[clojure.core.async]] channels takes place in
+[[metabase.query-processor.reducible]]."
(:require [metabase.async.util :as async.u]))
(defn raisef
......@@ -28,17 +28,17 @@
;; [message sent to canceled chan]
;;
;; 1. Query normally runs thru middleware and then a series of context functions as described above; result is sent thru
-;; `resultf` and finally to `out-chan`
+;; [[resultf]] and finally to [[out-chan]]
;;
-;; 2. If an `Exception` is thrown, it is sent thru `raisef`, `resultf` and finally to `out-chan`
+;; 2. If an `Exception` is thrown, it is sent thru [[raisef]], [[resultf]] and finally to [[out-chan]]
;;
;; 3. If the query times out, `timeoutf` throws an Exception
;;
-;; 4. If the query is canceled (either by closing `out-chan` before it gets a result, or by sending `canceled-chan` a
-;; message), the execution is canceled and `out-chan` is closed (if not already closed).
+;; 4. If the query is canceled (either by closing [[out-chan]] before it gets a result, or by sending [[canceled-chan]]
+;; a message), the execution is canceled and [[out-chan]] is closed (if not already closed).
(defn runf
"Called by pivot fn to run preprocessed query. Normally, this simply calls `executef`, but you can override this for
test purposes. The result of this function is ignored."
"Called by the [[metabase.query-processor.reducible/identity-qp]] fn to run preprocessed query. Normally, this simply
calls [[executef]], but you can override this for test purposes. The result of this function is ignored."
{:arglists '([query rff context])}
[query rff {runf* :runf, :as context}]
{:pre [(fn? runf*)]}
......@@ -46,12 +46,12 @@
nil)
(defn executef
"Called by `runf` to have driver run query. By default, `driver/execute-reducible-query`. `respond` is a callback with
the signature:
"Called by [[runf]] to have driver run query. By default, [[metabase.driver/execute-reducible-query]]. `respond` is a
callback with the signature:
(respond results-metadata reducible-rows)
-The implementation of `executef` should call `respond` with this information once it is available. The result of
+The implementation of [[executef]] should call `respond` with this information once it is available. The result of
this function is ignored."
{:arglists '([driver query context respond])}
[driver query {executef* :executef, :as context} respond]
......@@ -60,9 +60,9 @@
nil)
(defn reducef
"Called by `runf` (inside the `respond` callback provided by it) to reduce results of query. `reducedf` is called with
the reduced results. The actual output of this function is ignored, but the entire result set must be reduced and
passed to `reducedf` before this function completes."
"Called by [[runf]] (inside the `respond` callback provided by it) to reduce results of query. [[reducedf]] is called
with the reduced results. The actual output of this function is ignored, but the entire result set must be reduced
and passed to [[reducedf]] before this function completes."
{:arglists '([rff context metadata reducible-rows])}
[rff {reducef* :reducef, :as context} metadata reducible-rows]
{:pre [(fn? reducef*)]}
......@@ -70,7 +70,7 @@
nil)
(defn reducedf
"Called in `reducedf` with fully reduced results. This result is passed to `resultf`."
"Called in [[reducedf]] with fully reduced results. This result is passed to [[resultf]]."
{:arglists '([metadata reduced-rows context])}
[metadata reduced-rows {reducedf* :reducedf, :as context}]
{:pre [(fn? reducedf*)]}
......@@ -84,7 +84,7 @@
(timeoutf* context))
(defn resultf
"Called exactly once with the final result, which is the result of either `reducedf` or `raisef`."
"Called exactly once with the final result, which is the result of either [[reducedf]] or [[raisef]]."
{:arglists '([result context])}
[result {resultf* :resultf, :as context}]
{:pre [(fn? resultf*)]}
......
......@@ -95,7 +95,8 @@
(defn default-context
"Return a new context for executing queries using the default values. These can be overrided as needed."
[]
-{:timeout query-timeout-ms
+{::complete? true
+ :timeout query-timeout-ms
:rff default-rff
:raisef default-raisef
:runf default-runf
......
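Because callers do `(merge (default-context) context)`, overriding a single entry is enough; a sketch with an assumed query map and an arbitrary timeout value:

(comment
  (metabase.query-processor/process-query-sync
   some-query
   ;; only :timeout is overridden here; :rff, :runf, etc. still come from (default-context)
   {:timeout (* 5 60 1000)}))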
......@@ -10,8 +10,7 @@
The default backend is `db`, which uses the application database; this value can be changed by setting the env var
`MB_QP_CACHE_BACKEND`. Refer to [[metabase.query-processor.middleware.cache-backend.interface]] for more details
about how the cache backends themselves."
-(:require [clojure.core.async :as a]
-[clojure.tools.logging :as log]
+(:require [clojure.tools.logging :as log]
[java-time :as t]
[medley.core :as m]
[metabase.config :as config]
......@@ -44,66 +43,76 @@
(try
(log/tracef "Purging cache entries older than %s" (u/format-seconds (public-settings/query-caching-max-ttl)))
(i/purge-old-entries! backend (public-settings/query-caching-max-ttl))
(log/trace "Successfully purged old cache entries.")
:done
(catch Throwable e
(log/error e (trs "Error purging old cache entries")))))
(log/error e (trs "Error purging old cache entries: {0}" (ex-message e))))))
(defn- min-duration-ms
"Minimum duration it must take a query to complete in order for it to be eligible for caching."
[]
(* (public-settings/query-caching-min-ttl) 1000))
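A worked example of the eligibility check (illustrative numbers): with `query-caching-min-ttl` set to 2 seconds, `(min-duration-ms)` is 2000, so only queries that took longer than 2000 ms get written to the cache.

(comment
  (> 3500 (* 2 1000)) ; => true, eligible for caching
  (> 150 (* 2 1000))) ; => false, skipped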
-(defn- cache-results-async!
-"Save the results of a query asynchronously once they are delivered (as a byte array) to the promise channel
-`out-chan`."
-[query-hash out-chan]
-(log/info (trs "Caching results for next time for query with hash {0}." (pr-str (i/short-hex-hash query-hash))) (u/emoji "💾"))
-(a/go
-(let [x (a/<! out-chan)]
-(condp instance? x
-Throwable
-(if (= (:type (ex-data x)) ::impl/max-bytes)
-(log/debug x (trs "Not caching results: results are larger than {0} KB" (public-settings/query-caching-max-kb)))
-(log/error x (trs "Error saving query results to cache.")))
-(Class/forName "[B")
-(let [y (a/<! (a/thread
-(try
-(i/save-results! *backend* query-hash x)
-(catch Throwable e
-e))))]
-(if (instance? Throwable y)
-(log/error y (trs "Error saving query results to cache."))
-(do
-(log/debug (trs "Successfully cached results for query."))
-(purge! *backend*))))
-(log/error (trs "Cannot cache results: expected byte array, got {0}" (class x)))))))
+(def ^:private ^:dynamic *in-fn*
+"The `in-fn` provided by [[impl/do-with-serialization]]."
+nil)
+(defn- add-object-to-cache!
+"Add `object` (e.g. a result row or metadata) to the current cache entry."
+[object]
+(when *in-fn*
+(*in-fn* object)))
+(def ^:private ^:dynamic *result-fn*
+"The `result-fn` provided by [[impl/do-with-serialization]]."
+nil)
+(defn- serialized-bytes []
+(when *result-fn*
+(*result-fn*)))
+(defn- cache-results!
+"Save the final results of a query."
+[query-hash]
+(log/info (trs "Caching results for next time for query with hash {0}."
+(pr-str (i/short-hex-hash query-hash))) (u/emoji "💾"))
+(try
+(let [bytez (serialized-bytes)]
+(if-not (instance? (Class/forName "[B") bytez)
+(log/error (trs "Cannot cache results: expected byte array, got {0}" (class bytez)))
+(do
+(log/trace "Got serialized bytes; saving to cache backend")
+(i/save-results! *backend* query-hash bytez)
+(log/debug "Successfully cached results for query.")
+(purge! *backend*))))
+:done
+(catch Throwable e
+(if (= (:type (ex-data e)) ::impl/max-bytes)
+(log/debug e (trs "Not caching results: results are larger than {0} KB" (public-settings/query-caching-max-kb)))
+(log/error e (trs "Error saving query results to cache: {0}" (ex-message e)))))))
(defn- save-results-xform [start-time metadata query-hash rf]
-(let [{:keys [in-chan out-chan]} (impl/serialize-async)
-has-rows? (volatile! false)]
-(a/put! in-chan (assoc metadata
-:cache-version cache-version
-:last-ran (t/zoned-date-time)))
+(let [has-rows? (volatile! false)]
+(add-object-to-cache! (assoc metadata
+:cache-version cache-version
+:last-ran (t/zoned-date-time)))
(fn
([] (rf))
([result]
-(a/put! in-chan (if (map? result)
-(m/dissoc-in result [:data :rows])
-{}))
-(a/close! in-chan)
+(add-object-to-cache! (if (map? result)
+(m/dissoc-in result [:data :rows])
+{}))
(let [duration-ms (- (System/currentTimeMillis) start-time)]
(log/info (trs "Query took {0} to run; minimum for cache eligibility is {1}"
(u/format-milliseconds duration-ms) (u/format-milliseconds (min-duration-ms))))
(when (and @has-rows?
(> duration-ms (min-duration-ms)))
-(cache-results-async! query-hash out-chan)))
+(cache-results! query-hash)))
(rf result))
([acc row]
-;; Blocking so we don't exceed async's MAX-QUEUE-SIZE when transducing a large result set
-(a/>!! in-chan row)
+(add-object-to-cache! row)
(vreset! has-rows? true)
(rf acc row)))))
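The rf-wrapping shape used by `save-results-xform`, shown in isolation as a generic sketch (the tap helper and sample rows are made up):

(comment
  ;; hypothetical helper: wrap reducing function `rf` so every row is also passed to `tap-fn`
  (defn tap-rows-rf [tap-fn rf]
    (fn
      ([] (rf))
      ([result] (tap-fn ::complete) (rf result))
      ([acc row] (tap-fn row) (rf acc row))))

  (transduce identity (tap-rows-rf println conj) [] [[:toucan 71] [:owl 10]]))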
......@@ -159,15 +168,15 @@
(log/debug (trs "Request is closed; no one to return cached results to"))
::canceled)
(catch Throwable e
(log/error e (trs "Error attempting to fetch cached results for query with hash {0}"
(i/short-hex-hash query-hash)))
(log/error e (trs "Error attempting to fetch cached results for query with hash {0}: {1}"
(i/short-hex-hash query-hash) (ex-message e)))
::miss)))
;;; --------------------------------------------------- Middleware ---------------------------------------------------
(defn- run-query-with-cache
-[qp {:keys [cache-ttl middleware], :as query} rff context]
+[qp {:keys [cache-ttl middleware], :as query} rff {:keys [reducef], :as context}]
;; TODO - Query will already have `info.hash` if it's a userland query. I'm not 100% sure it will be the same hash,
;; because this is calculated after normalization, instead of before
(let [query-hash (qputil/query-hash query)
......@@ -175,10 +184,16 @@
(when (= result ::miss)
(let [start-time-ms (System/currentTimeMillis)]
(log/trace "Running query and saving cached results (if eligible)...")
-(qp query
-(fn [metadata]
-(save-results-xform start-time-ms metadata query-hash (rff metadata)))
-context)))))
+(let [reducef' (fn [rff context metadata rows]
+(impl/do-with-serialization
+(fn [in-fn result-fn]
+(binding [*in-fn* in-fn
+*result-fn* result-fn]
+(reducef rff context metadata rows)))))]
+(qp query
+(fn [metadata]
+(save-results-xform start-time-ms metadata query-hash (rff metadata)))
+(assoc context :reducef reducef')))))))
(defn- is-cacheable? {:arglists '([query])} [{:keys [cache-ttl]}]
(and (public-settings/enable-query-caching)
......@@ -197,7 +212,7 @@
running the query, satisfying this requirement.)
* The result *rows* of the query must be less than `query-caching-max-kb` when serialized (before compression)."
[qp]
-(fn [query rff context]
+(fn maybe-return-cached-results* [query rff context]
(let [cacheable? (is-cacheable? query)]
(log/tracef "Query is cacheable? %s" (boolean cacheable?))
(if cacheable?
......
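For reference, the shape of a query map this middleware treats as cacheable (field values are made up, and `enable-query-caching` must be on):

(comment
  (is-cacheable? {:database 1
                  :type :query
                  :query {:source-table 2}
                  :cache-ttl 60}))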
(ns metabase.query-processor.middleware.cache.impl
-(:require [clojure.core.async :as a]
-[clojure.tools.logging :as log]
+(:require [clojure.tools.logging :as log]
[metabase.public-settings :as public-settings]
[metabase.util :as u]
-[metabase.util.i18n :refer [trs tru]]
+[metabase.util.i18n :refer [trs]]
[taoensso.nippy :as nippy])
(:import [java.io BufferedInputStream BufferedOutputStream ByteArrayOutputStream DataInputStream DataOutputStream
EOFException FilterOutputStream InputStream OutputStream]
......@@ -31,95 +30,58 @@
(check-total (swap! byte-count + len))
(.write os ba off len))))))
-(def ^:private serialization-timeout-ms (u/minutes->ms 10))
-(defn- start-out-chan-close-block!
-"When `out-chan` closes, close everything. Wait up to 10 minutes for `out-chan` to close, and throw an Exception if
-it's not done by then."
-[in-chan out-chan ^ByteArrayOutputStream bos ^DataOutputStream os]
-(a/go
-(let [timeout-chan (a/timeout serialization-timeout-ms)
-[_val port] (a/alts! [out-chan timeout-chan])]
-(when (= port timeout-chan)
-(a/>! out-chan (ex-info (tru "Serialization timed out after {0}." (u/format-milliseconds serialization-timeout-ms))
-{}))))
-(log/tracef "Closing core.async channels and output streams.")
-(try
-;; don't really need to close both, probably
-(.close os)
-(.close bos)
-(catch Throwable e
-(a/>! out-chan e)))
-(a/close! out-chan)
-(a/close! in-chan)))
(defn- freeze!
[^OutputStream os obj]
-(try
-(nippy/freeze-to-out! os obj)
-(.flush os)
-:ok
-(catch Throwable e
-e)))
-(defn- start-input-loop!
-"Listen for things sent to `in-chan`. When we get an object to `in-chan`, write it to the ouput stream (async), then
-recur and wait for the next obj. When `in-chan` is closed, write the bytes to `out-chan` (async).
-If serialization fails, writes thrown Exception to `out-chan`."
-[in-chan out-chan ^ByteArrayOutputStream bos ^DataOutputStream os]
-(a/go-loop []
-;; we got a result
-(if-let [obj (a/<! in-chan)]
-(do
-(log/tracef "Serializing %s" (pr-str obj))
-(let [result (a/<! (a/thread (freeze! os obj)))]
-(if (instance? Throwable result)
-(do
-;; Serialization has failed, close the channel as there's no point in continuing writing to it
-(a/close! in-chan)
-;; Drain the channel to unblock
-(while (a/poll! in-chan))
-(a/>! out-chan result))
-(recur))))
-;; `in-chan` is closed
-(a/thread
-(try
-(.flush os)
-(let [result (.toByteArray bos)]
-(a/>!! out-chan result))
-(catch Throwable e
-(a/>!! out-chan e)))))))
-(defn serialize-async
-"Create output streams for serializing QP results. Returns a map of core.async channels, `in-chan` and `out-chan`.
-Send all objects to be serialized to `in-chan`; then close it when finished; the result of `out-chan` will be the
-serialized byte array (or an Exception, if one was thrown).
-`out-chan` is closed automatically upon receiving a result; all chans and output streams are closed thereafter.
-(let [{:keys [in-chan out-chan]} (serialize-async)]
-(doseq [obj objects]
-(a/put! in-chan obj))
-(a/close! in-chan)
-(let [[val] (a/alts!! [out-chan (a/timeout 1000)])]
-(when (instance? Throwable val)
-(throw val))
-val))"
-([]
-(serialize-async {:max-bytes (* (public-settings/query-caching-max-kb) 1024)}))
-([{:keys [max-bytes]}]
-(let [in-chan (a/chan 1)
-out-chan (a/promise-chan)
-bos (ByteArrayOutputStream.)
-os (-> (max-bytes-output-stream max-bytes bos)
-BufferedOutputStream.
-(GZIPOutputStream. true)
-DataOutputStream.)]
-(start-out-chan-close-block! in-chan out-chan bos os)
-(start-input-loop! in-chan out-chan bos os)
-{:in-chan in-chan, :out-chan out-chan})))
(log/tracef "Freezing %s" (pr-str obj))
(nippy/freeze-to-out! os obj)
(.flush os))
(defn do-with-serialization
"Create output streams for serializing QP results and invoke `f`, a function of the form
(f in-fn result-fn)
`in-fn` is of the form `(in-fn object)` and should be called once for each object that should be serialized. `in-fn`
will catch any exceptions thrown during serialization; these will be thrown later when invoking `result-fn`. After
the first exception `in-fn` will no-op for all subsequent calls.
When you have serialized *all* objects, call `result-fn` to get the serialized byte array. If an error was
encountered during serialization (such as the serialized bytes being longer than `max-bytes`), `result-fn` will
throw an Exception rather than returning a byte array; be sure to handle this case.
(do-with-serialization
(fn [in result]
(doseq [obj objects]
(in obj))
(result)))"
([f]
(do-with-serialization f {:max-bytes (* (public-settings/query-caching-max-kb) 1024)}))
([f {:keys [max-bytes]}]
(with-open [bos (ByteArrayOutputStream.)]
(let [os (-> (max-bytes-output-stream max-bytes bos)
BufferedOutputStream.
(GZIPOutputStream. true)
DataOutputStream.)
error (atom nil)]
(try
(f (fn in* [obj]
(when-not @error
(try
(freeze! os obj)
(catch Throwable e
(log/trace e "Caught error when freezing object")
(reset! error e))))
nil)
(fn result* []
(when @error
(throw @error))
(log/trace "Getting result byte array")
(.toByteArray bos)))
;; this is done manually instead of `with-open` because it might throw an Exception when we close it if it's
;; past the byte limit; that's fine and we can ignore it
(finally
(u/ignore-exceptions (.close os))))))))
(defn- thaw!
[^InputStream is]
......
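A rough round-trip sketch for sanity-checking the serialized bytes at the REPL (illustrative; the output is a GZIP-compressed stream of Nippy frames, so thawing mirrors the freeze pipeline):

(comment
  (require '[taoensso.nippy :as nippy])
  (import '(java.io ByteArrayInputStream BufferedInputStream DataInputStream)
          '(java.util.zip GZIPInputStream))
  (do-with-serialization
   (fn [in result]
     (doseq [obj [{:a 1} [1 2 3] "hello"]]
       (in obj))
     (with-open [is (-> (ByteArrayInputStream. (result))
                        GZIPInputStream.
                        BufferedInputStream.
                        DataInputStream.)]
       [(nippy/thaw-from-in! is)
        (nippy/thaw-from-in! is)
        (nippy/thaw-from-in! is)]))))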
......@@ -6,29 +6,7 @@
[metabase.query-processor.context.default :as context.default]
[metabase.util :as u]))
-(defn- wire-up-context-channels!
-"Wire up the core.async channels in a QP `context`."
-[context]
-;; 1) If query doesn't complete by `timeoutf`, call `timeoutf`, which should raise an Exception
-;; 2) when `out-chan` is closed prematurely, send a message to `canceled-chan`
-;; 3) when `out-chan` is closed or gets a result, close both out-chan and canceled-chan
-(let [out-chan (context/out-chan context)
-canceled-chan (context/canceled-chan context)
-timeout (context/timeout context)]
-(a/go
-(let [[val port] (a/alts! [out-chan (a/timeout timeout)] :priority true)]
-(log/tracef "Port %s got %s"
-(if (= port out-chan) "out-chan" (format "[timeout after %s]" (u/format-milliseconds timeout)))
-val)
-(cond
-(not= port out-chan) (context/timeoutf context)
-(nil? val) (a/>!! canceled-chan ::cancel))
-(log/tracef "Closing out-chan.")
-(a/close! out-chan)
-(a/close! canceled-chan)))
-nil))
-(defn pivot
+(defn identity-qp
"The initial value of `qp` passed to QP middleware."
[query rff context]
(context/runf query rff context))
......@@ -39,9 +17,9 @@
(qp query rff context)"
([middleware]
-(combine-middleware middleware pivot))
+(combine-middleware middleware identity-qp))
-([middleware pivot-fn]
+([middleware qp]
(reduce
(fn [qp middleware]
(when (var? middleware)
......@@ -49,9 +27,43 @@
(if (some? middleware)
(middleware qp)
qp))
-pivot-fn
+qp
middleware)))
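Shape of the middleware `combine-middleware` expects, as a sketch (the logging middleware and the `some-*` placeholders are made up):

(comment
  ;; hypothetical middleware: takes a qp function, returns a qp function with the same signature
  (defn example-logging-middleware [qp]
    (fn [query rff context]
      (println "Running query of type" (:type query))
      (qp query rff context)))

  ((combine-middleware [#'example-logging-middleware]) some-query some-rff some-context))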
+;; Why isn't this just done automatically when we create the context in [[context.default/default-context]]? The timeout
+;; could be subject to change so it makes sense to wait until we actually run the query to wire stuff up. Also, since
+;; we're doing
+;;
+;; (merge (context.default/default-context) context)
+;;
+;; all over the place, it probably reduces overhead a bit to not run around adding a bunch of timeouts to channels we
+;; don't end up using.
+(defn- wire-up-context-channels!
+"Wire up the core.async channels in a QP `context`
+1. If query doesn't complete by [[context/timeout]], call [[context/timeoutf]], which should raise an Exception.
+2. When [[context/out-chan]] is closed prematurely, send a message to [[context/canceled-chan]].
+3. When [[context/out-chan]] is closed or gets a result, close both [[context/out-chan]]
+and [[context/canceled-chan]]."
+[context]
+(let [out-chan (context/out-chan context)
+canceled-chan (context/canceled-chan context)
+timeout (context/timeout context)]
+(a/go
+(let [[val port] (a/alts! [out-chan (a/timeout timeout)] :priority true)]
+(log/tracef "Port %s got %s"
+(if (= port out-chan) "out-chan" (format "[timeout after %s]" (u/format-milliseconds timeout)))
+val)
+(cond
+(not= port out-chan) (context/timeoutf context)
+(nil? val) (a/>!! canceled-chan ::cancel))
+(log/tracef "Closing out-chan.")
+(a/close! out-chan)
+(a/close! canceled-chan)))
+nil))
(def ^:dynamic *run-on-separate-thread?*
"Whether to run the query on a separate thread. When running a query asynchronously (i.e., with [[async-qp]]), this is
normally `true`, meaning the `out-chan` is returned immediately. When running a query synchronously (i.e., with
......@@ -76,22 +88,26 @@
(qp* query nil))
([query context]
+(qp* query nil context))
+([query rff context]
{:pre [(map? query) ((some-fn nil? map?) context)]}
-(let [context (merge (context.default/default-context) context)]
-(wire-up-context-channels! context)
-(let [thunk (fn [] (try
-(qp query (context/rff context) context)
+(let [context (doto (merge (context.default/default-context) context)
+wire-up-context-channels!)
+rff (or rff
+(context/rff context))
+thunk (fn [] (try
+(qp query rff context)
(catch Throwable e
(context/raisef e context))))]
(log/tracef "Running on separate thread? %s" *run-on-separate-thread?*)
(if *run-on-separate-thread?*
(future (thunk))
(thunk)))
(log/tracef "Running on separate thread? %s" *run-on-separate-thread?*)
(if *run-on-separate-thread?*
(future (thunk))
(thunk))
(context/out-chan context)))))
(defn- wait-for-async-result [out-chan]
{:pre [(async.u/promise-chan? out-chan)]}
;; TODO - consider whether we should have another timeout here as well
(let [result (a/<!! out-chan)]
(if (instance? Throwable result)
(throw result)
......@@ -105,14 +121,9 @@
(qp query context)"
[qp]
{:pre [(fn? qp)]}
-(fn qp*
-([query]
-(wait-for-async-result (binding [*run-on-separate-thread?* false]
-(qp query))))
-([query context]
-(wait-for-async-result (binding [*run-on-separate-thread?* false]
-(qp query context))))))
+(fn qp* [& args]
+(binding [*run-on-separate-thread?* false]
+(wait-for-async-result (apply qp args)))))
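Putting the pieces together, roughly the same construction the tests further down use (the inline `:executef` stands in for a real driver):

(comment
  (let [qp (sync-qp (async-qp identity-qp))]
    (qp {}
        {:timeout  1000
         :executef (fn [_driver _query _context respond]
                     (respond {:cols [{:name "n"}]} [[1] [2] [3]]))})))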
;;; ------------------------------------------------- Other Util Fns -------------------------------------------------
......
(ns metabase.query-processor.middleware.cache.impl-test
-(:require [clojure.core.async :as a]
-[clojure.test :refer :all]
+(:require [clojure.test :refer :all]
[metabase.query-processor.middleware.cache.impl :as impl]
[potemkin.types :as p.types])
(:import java.io.ByteArrayInputStream))
......@@ -27,26 +26,26 @@
(reduce rf (rf) rows)))))))
(deftest e2e-test
-(let [{:keys [in-chan out-chan]} (impl/serialize-async)]
-(doseq [obj objects]
-(a/put! in-chan obj))
-(a/close! in-chan)
-(let [[val] (a/alts!! [out-chan (a/timeout 1000)])]
-(is (= objects
-(if (instance? Throwable val)
-(throw val)
-(deserialize val)))))))
+(impl/do-with-serialization
+(fn [in result]
+(doseq [obj objects]
+(is (= nil
+(in obj))))
+(let [val (result)]
+(is (instance? (Class/forName "[B") val))
+(is (= objects
+(if (instance? Throwable val)
+(throw val)
+(deserialize val))))))))
(deftest max-bytes-test
-(let [{:keys [in-chan out-chan]} (impl/serialize-async {:max-bytes 50})]
-(doseq [obj objects]
-(a/put! in-chan obj))
-(a/close! in-chan)
-(let [[val] (a/alts!! [out-chan (a/timeout 1000)])]
-(is (thrown-with-msg?
-Exception
-#"Results are too large to cache\."
-(if (instance? Throwable val)
-(throw val)
-val)))
-nil)))
+(impl/do-with-serialization
+(fn [in result]
+(doseq [obj objects]
+(is (= nil
+(in obj))))
+(is (thrown-with-msg?
+Exception
+#"Results are too large to cache\."
+(result))))
+{:max-bytes 50}))
......@@ -13,12 +13,13 @@
[metabase.models.query :as query :refer [Query]]
[metabase.public-settings :as public-settings]
[metabase.query-processor :as qp]
-[metabase.query-processor.context.default :as context.default]
+[metabase.query-processor.context.default :as context.default]
[metabase.query-processor.middleware.cache :as cache]
[metabase.query-processor.middleware.cache-backend.interface :as i]
[metabase.query-processor.middleware.cache.impl :as impl]
[metabase.query-processor.middleware.cache.impl-test :as impl-test]
[metabase.query-processor.middleware.process-userland-query :as process-userland-query]
+[metabase.query-processor.reducible :as qp.reducible]
[metabase.query-processor.streaming :as qp.streaming]
[metabase.query-processor.util :as qputil]
[metabase.server.middleware.session :as session]
......@@ -32,6 +33,10 @@
(use-fixtures :once (fixtures/initialize :db))
+(use-fixtures :each (fn [thunk]
+(mt/with-log-level :fatal
+(thunk))))
(def ^:private ^:dynamic *save-chan*
"Gets a message whenever results are saved to the test backend, or if the reducing function stops serializing results
because of an Exception or if the byte threshold is passed."
......@@ -52,7 +57,7 @@
pretty/PrettyPrintable
(pretty [_]
(str "\n"
-(metabase.util/pprint-to-str 'blue
+(u/pprint-to-str 'blue
(for [[hash {:keys [created]}] @store]
[hash (u/format-nanoseconds (.getNano (t/duration created (t/instant))))]))))
......@@ -64,7 +69,7 @@
i/CacheBackend
(cached-results [this query-hash max-age-seconds respond]
(let [hex-hash (codecs/bytes->hex query-hash)]
(log/tracef "Fetch results for %s store: %s" hex-hash this)
(log/tracef "Fetch results for %s store: %s" hex-hash (pretty/pretty this))
(if-let [^bytes results (when-let [{:keys [created results]} (some (fn [[hash entry]]
(when (= hash hex-hash)
entry))
......@@ -79,7 +84,7 @@
(let [hex-hash (codecs/bytes->hex query-hash)]
(swap! store assoc hex-hash {:results results
:created (t/instant)})
(log/tracef "Save results for %s --> store: %s" hex-hash this))
(log/tracef "Save results for %s --> store: %s" hex-hash (pretty/pretty this)))
(a/>!! save-chan results))
(purge-old-entries! [this max-age-seconds]
......@@ -87,28 +92,28 @@
(into {} (filter (fn [[_ {:keys [created]}]]
(t/after? created (t/minus (t/instant) (t/seconds max-age-seconds))))
store))))
(log/tracef "Purge old entries --> store: %s" this)
(log/tracef "Purge old entries --> store: %s" (pretty/pretty this))
(a/>!! purge-chan ::purge)))))
(defn do-with-mock-cache [f]
-(mt/with-open-channels [save-chan (a/chan 1)
-purge-chan (a/chan 1)]
+(mt/with-open-channels [save-chan (a/chan 10)
+purge-chan (a/chan 10)]
(mt/with-temporary-setting-values [enable-query-caching true
query-caching-max-ttl 60
query-caching-min-ttl 0]
(binding [cache/*backend* (test-backend save-chan purge-chan)
*save-chan* save-chan
*purge-chan* purge-chan]
-(let [orig (var-get #'cache/cache-results-async!)]
-(with-redefs [cache/cache-results-async! (fn [hash out-chan]
-(a/go
-;; if `save-results!` isn't going to get called because
-;; `out-chan` isn't a byte array then forward the result to
-;; `save-chan` so it always gets a value
-(let [result (a/<! out-chan)]
-(when-not (bytes? result)
-(a/>!! save-chan result))))
-(orig hash out-chan))]
+(let [orig @#'cache/serialized-bytes]
+(with-redefs [cache/serialized-bytes (fn []
+;; if `save-results!` isn't going to get called because `*result-fn*`
+;; throws an Exception, catch it and send it to `save-chan` so it still
+;; gets a result and tests can finish
+(try
+(orig)
+(catch Throwable e
+(a/>!! save-chan e)
+(throw e))))]
(f {:save-chan save-chan, :purge-chan purge-chan})))))))
(defmacro with-mock-cache [[& bindings] & body]
......@@ -123,27 +128,30 @@
;; clear out stale values in save/purge channels
(while (a/poll! *save-chan*))
(while (a/poll! *purge-chan*))
-(:metadata
-(mt/test-qp-middleware
-cache/maybe-return-cached-results
-(test-query query-kvs)
-{}
-[[:toucan 71]
-[:bald-eagle 92]
-[:hummingbird 11]
-[:owl 10]
-[:chicken 69]
-[:robin 96]
-[:osprey 72]
-[:flamingo 70]]
-{:timeout 2000
-:run (fn []
-(Thread/sleep *query-execution-delay-ms*))})))
+(let [qp (qp.reducible/sync-qp
+(qp.reducible/async-qp
+(cache/maybe-return-cached-results qp.reducible/identity-qp)))
+metadata {}
+rows [[:toucan 71]
+[:bald-eagle 92]
+[:hummingbird 11]
+[:owl 10]
+[:chicken 69]
+[:robin 96]
+[:osprey 72]
+[:flamingo 70]]
+query (test-query query-kvs)
+context {:timeout 2000
+:executef (fn [_driver _query _context respond]
+(Thread/sleep *query-execution-delay-ms*)
+(respond metadata rows))}]
+(-> (qp query context)
+(assoc :data {}))))
(defn- run-query [& args]
(let [result (apply run-query* args)]
-(is (= :completed
-(:status result)))
+(is (partial= {:status :completed}
+result))
(if (:cached result)
:cached
:not-cached)))
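Typical flow of these helpers in the tests that follow (sketch; assumes `with-mock-cache` binds `save-chan` the way the tests below use it):

(comment
  (with-mock-cache [save-chan]
    (is (= :not-cached (run-query))) ; first run executes and saves to the mock backend
    (mt/wait-for-result save-chan)   ; wait until the results have been stored
    (is (= :cached (run-query)))))   ; second run is served from the cache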
......@@ -274,9 +282,8 @@
(some? input-stream))))))
(i/save-results! cache/*backend* query-hash (byte-array [0 0 0]))
(testing "Invalid cache entry should be handled gracefully"
-(mt/suppress-output
-(is (= :not-cached
-(run-query)))))))
+(is (= :not-cached
+(run-query))))))))
(deftest metadata-test
(testing "Verify that correct metadata about caching such as `:updated_at` and `:cached` come back with cached results."
......@@ -285,8 +292,6 @@
(run-query)
(mt/wait-for-result save-chan)
(let [result (run-query*)]
-(is (= true
-(:cached result)))
(is (= {:data {}
:cached true
:updated_at #t "2020-02-19T02:31:07.798Z[UTC]"
......
......@@ -147,7 +147,7 @@
(testing "Rows don't actually have to be reducible. And you can build your own QP with your own middleware."
(is (= {:data {:cols [{:name "n"}]
:rows [{:n 1} {:n 2} {:n 3} {:n 4} {:n 5}]}}
-((qp.reducible/sync-qp (qp.reducible/async-qp qp.reducible/pivot))
+((qp.reducible/sync-qp (qp.reducible/async-qp qp.reducible/identity-qp))
{}
{:executef (fn [_ _ _ respond]
(respond {:cols [{:name "n"}]}
......
......@@ -3,6 +3,7 @@
(Prefer using `metabase.test` to requiring bits and pieces from these various namespaces going forward, since it
reduces the cognitive load required to write tests.)"
(:refer-clojure :exclude [compile])
+(:require clojure.data
[clojure.test :refer :all]
[clojure.tools.macro :as tools.macro]
......
......@@ -56,9 +56,11 @@
"Execute `body` with all logging/`*out*`/`*err*` messages suppressed. Useful for avoiding cluttering up test output
for tests with stacktraces and error messages from tests that are supposed to fail.
-DEPRECATED -- you don't need to do this anymore. Tests now have a default log level of `CRITICAL` which means error
+DEPRECATED -- you don't need to do this anymore. Tests now have a default log level of `FATAL` which means error
logging will be suppressed by default. This macro predates the current test logging levels. You can remove usages of
-this macro."
+this macro.
+If you want to suppress log messages for REPL usage you can use [[with-log-level]] instead."
{:style/indent 0}
[& body]
`(do-with-suppressed-output (fn [] ~@body)))
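Illustrative REPL usage of the suggested replacement:

(comment
  (with-log-level :fatal
    (clojure.test/run-tests 'metabase.query-processor.middleware.cache-test)))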
......