Skip to content
Snippets Groups Projects
Unverified Commit a1ab89e8 authored by Cam Saul's avatar Cam Saul Committed by GitHub
Browse files

Fix cached metadata (#12207)

* New & improved caching tests

* Fix cached results metadata for native queries. [ci postgres] [ci mysql]

* Test/lint fixes [ci postgres] [ci mysql]

* Test fix [ci postgres]

* Test fix [ci postgres] [ci mysql]
parent ea7296e3
No related branches found
No related tags found
No related merge requests found
......@@ -13,6 +13,7 @@
(:require [clojure.core.async :as a]
[clojure.tools.logging :as log]
[java-time :as t]
[medley.core :as m]
[metabase
[config :as config]
[public-settings :as public-settings]
......@@ -32,8 +33,8 @@
(def ^:private cache-version
"Current serialization format version. Basically
[initial-metadata & rows]"
2)
[initial-metadata row-1 row-2 ... row-n final-metadata]"
3)
;; TODO - Why not make this an option in the query itself? :confused:
(def ^:dynamic ^Boolean *ignore-cached-results*
......@@ -48,42 +49,32 @@
;;; ------------------------------------------------------ Save ------------------------------------------------------
;; purge runs on a loop that gets triggered whenever a new cache entry is saved. On each save, `purge-chan` is sent a
;; `::purge` message; the channel itself uses a sliding buffer of size 1, so additional messages are dropped. Thus
;; purge actions won't queue up if multiple queries are ran before we get a chance to finish the last one.
(defn- purge! [backend]
(try
(log/tracef "Purging old cache entires older than %s" (u/format-seconds (public-settings/query-caching-max-ttl)))
(log/tracef "Purging cache entires older than %s" (u/format-seconds (public-settings/query-caching-max-ttl)))
(i/purge-old-entries! backend (public-settings/query-caching-max-ttl))
(catch Throwable e
(log/error e (trs "Error purging old cache entires")))))
(defonce ^:private purge-chan
(delay
(let [purge-chan (a/chan (a/sliding-buffer 1))]
(a/go-loop []
(when-let [backend (a/<! purge-chan)]
(a/<! (a/thread (purge! backend)))
(recur)))
purge-chan)))
(defn- purge-async! []
(log/tracef "Sending async purge message to purge-chan")
(a/put! @purge-chan *backend*))
(defn- min-duration-ms
"Minimum duration it must take a query to complete in order for it to be eligable for caching."
"Minimum duration it must take a query to complete in order for it to be eligible for caching."
[]
(* (public-settings/query-caching-min-ttl) 1000))
(defn- cache-results-async! [query-hash out-chan]
(defn- cache-results-async!
"Save the results of a query asynchronously once they are delivered (as a byte array) to the promise channel
`out-chan`."
[query-hash out-chan]
(log/info (trs "Caching results for next time for query with hash {0}." (pr-str (i/short-hex-hash query-hash))) (u/emoji "💾"))
(a/go
(let [x (a/<! out-chan)]
(if (instance? Throwable x)
(when-not (= (:type (ex-data x)) ::impl/max-bytes)
(condp instance? x
Throwable
(if (= (:type (ex-data x)) ::impl/max-bytes)
(log/debug x (trs "Not caching results: results are larger than {0} KB" (public-settings/query-caching-max-kb)))
(log/error x (trs "Error saving query results to cache.")))
(Class/forName "[B")
(let [y (a/<! (a/thread
(try
(i/save-results! *backend* query-hash x)
......@@ -93,7 +84,9 @@
(log/error y (trs "Error saving query results to cache."))
(do
(log/debug (trs "Successfully cached results for query."))
(purge-async!))))))))
(purge! *backend*))))
(log/error (trs "Cannot cache results: expected byte array, got {0}" (class x)))))))
(defn- save-results-xform [start-time metadata query-hash rf]
(let [{:keys [in-chan out-chan]} (impl/serialize-async)]
......@@ -104,6 +97,7 @@
([] (rf))
([result]
(a/put! in-chan (if (map? result) (m/dissoc-in result [:data :rows]) {}))
(a/close! in-chan)
(let [duration-ms (- (System/currentTimeMillis) start-time)]
(log/info (trs "Query took {0} to run; miminum for cache eligibility is {1}"
......@@ -118,20 +112,33 @@
;;; ----------------------------------------------------- Fetch ------------------------------------------------------
(defn- add-cached-metadata-xform [rf]
(fn
([] (rf))
([acc]
(rf (if-not (map? acc)
acc
(-> acc
(assoc :cached true
:updated_at (get-in acc [:data :last-ran]))
(update :data dissoc :last-ran :cache-version)))))
([acc row]
(rf acc row))))
(defn- cached-results-rff
"Reducing function for cached results. Merges the final object in the cached results, the `final-metdata` map, with
the reduced value assuming it is a normal metadata map."
[rff]
(fn [{:keys [last-ran], :as metadata}]
(let [metadata (dissoc metadata :last-ran :cache-version)
rf (rff metadata)
final-metadata (volatile! nil)]
(fn
([]
(rf))
([acc]
;; if results are in the 'normal format' the use return the final metadata from the cache rather than
;; whatever `acc` is right now since we don't run the entire post-processing pipeline for cached results
(let [normal-format? (and (map? acc) (seq (get-in acc [:data :cols])))
acc* (-> (if normal-format?
@final-metadata
acc)
(assoc :cached true, :updated_at last-ran))]
(rf acc*)))
([acc row]
(if (map? row)
(do (vreset! final-metadata row)
(rf acc))
(rf acc row)))))))
(defn- maybe-reduce-cached-results
"Reduces cached results if there is a hit. Otherwise, returns `::miss` directly."
......@@ -143,13 +150,13 @@
(i/with-cached-results *backend* query-hash max-age-seconds [is]
(when is
(impl/with-reducible-deserialized-results [[metadata reducible-rows] is]
(when reducible-rows
(let [rff* (fn [metadata]
(add-cached-metadata-xform (rff metadata)))]
(log/tracef "Reducing cached rows...")
(context/reducef rff* context metadata reducible-rows)
(log/tracef "All cached rows reduced")
::ok))))))
(log/tracef "Found cached results. Version: %s" (pr-str (:cache-version metadata)))
(when (and (= (:cache-version metadata) cache-version)
reducible-rows)
(log/tracef "Reducing cached rows...")
(context/reducef (cached-results-rff rff) context metadata reducible-rows)
(log/tracef "All cached rows reduced")
::ok)))))
::miss)
(catch EofException _
(log/debug (trs "Request is closed; no one to return cached results to"))
......
......@@ -506,8 +506,8 @@
(mt/rows results))))
(testing "cols"
(is (= [{:display_name "sleep"
:base_type :type/*
:base_type :type/Text
:source :native
:field_ref [:field-literal "sleep" :type/*]
:field_ref [:field-literal "sleep" :type/Text]
:name "sleep"}]
(mt/cols results))))))))
This diff is collapsed.
......@@ -170,14 +170,14 @@
with-test-drivers])
(defn do-with-clock [clock thunk]
(let [clock (cond
(t/clock? clock) clock
(t/zoned-date-time? clock) (t/mock-clock (t/instant clock) (t/zone-id clock))
:else (throw (Exception. (format "Invalid clock: ^%s %s"
(.getName (class clock))
(pr-str clock)))))]
(t/with-clock clock
(testing (format "\nsystem clock = %s" (pr-str clock))
(testing (format "\nsystem clock = %s" (pr-str clock))
(let [clock (cond
(t/clock? clock) clock
(t/zoned-date-time? clock) (t/mock-clock (t/instant clock) (t/zone-id clock))
:else (throw (Exception. (format "Invalid clock: ^%s %s"
(.getName (class clock))
(pr-str clock)))))]
(t/with-clock clock
(thunk)))))
(defmacro with-clock
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment