Unverified commit 2b0b0e3a authored by Cam Saul, committed by GitHub

Merge pull request #8866 from metabase/cache-using-stream

Switch from caching estimates to actuals
parents 1b269b2d 6f2b86a3
@@ -145,7 +145,7 @@
:profiles {:dev {:dependencies [[clj-http-fake "1.0.3"] ; Library to mock clj-http responses
[expectations "2.2.0-beta2"] ; unit tests
[ring/ring-mock "0.3.0"]] ; Library to create mock Ring requests for unit tests
:plugins [[docstring-checker "1.0.2"] ; Check that all public vars have docstrings. Run with 'lein docstring-checker'
:plugins [[docstring-checker "1.0.3"] ; Check that all public vars have docstrings. Run with 'lein docstring-checker'
[jonase/eastwood "0.3.1"
:exclusions [org.clojure/clojure]] ; Linting
[lein-bikeshed "0.4.1"] ; Linting
......
@@ -5111,3 +5111,18 @@ databaseChangeLog:
name: settings
type: text
remarks: 'Serialized JSON FE-specific settings like formatting, etc. Scope of what is stored here may increase in future.'
#
# Change MySQL/Maria's blob type to LONGBLOB to more closely match what H2 and PostgreSQL support for size limits
#
- changeSet:
id: 97
author: senior
comment: 'Added 0.32.0'
preconditions:
- onFail: MARK_RAN
- dbms: mysql, mariadb
changes:
- modifyDataType:
tableName: query_cache
columnName: results
newDataType: longblob
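
As a rough illustration (not part of the migration itself), the modifyDataType change above corresponds to an ALTER statement like the one below on MySQL/MariaDB; `db-spec` is a hypothetical connection map used only for this sketch.

(require '[clojure.java.jdbc :as jdbc])

;; Hypothetical REPL sketch: the statement Liquibase effectively issues for this changeSet
;; on MySQL/MariaDB. `db-spec` is an assumed placeholder for the app-db connection details.
(jdbc/execute! db-spec ["ALTER TABLE query_cache MODIFY results LONGBLOB"])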
@@ -12,7 +12,9 @@
[toucan
[models :as models]
[util :as toucan-util]])
(:import java.sql.Blob))
(:import [java.io BufferedInputStream ByteArrayInputStream DataInputStream]
java.sql.Blob
java.util.zip.GZIPInputStream))
;;; +----------------------------------------------------------------------------------------------------------------+
;;; | Toucan Extensions |
@@ -109,20 +111,19 @@
:in encryption/maybe-encrypt
:out (comp encryption/maybe-decrypt u/jdbc-clob->str))
(defn compress
"Compress OBJ, returning a byte array."
[obj]
(nippy/freeze obj {:compressor nippy/snappy-compressor}))
(defn decompress
"Decompress COMPRESSED-BYTES."
[compressed-bytes]
(if (instance? Blob compressed-bytes)
(recur (.getBytes ^Blob compressed-bytes 0 (.length ^Blob compressed-bytes)))
(nippy/thaw compressed-bytes {:compressor nippy/snappy-compressor})))
(with-open [bis (ByteArrayInputStream. compressed-bytes)
bif (BufferedInputStream. bis)
gz-in (GZIPInputStream. bif)
data-in (DataInputStream. gz-in)]
(nippy/thaw-from-in! data-in))))
(models/add-type! :compressed
:in compress
:in identity
:out decompress)
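
Since compression now happens in the cache backend (see the `db` backend changes below) while `decompress` here reads gzipped nippy data, a minimal round-trip sketch looks like this; `gzip-freeze` is an illustrative helper, not a function from this PR.

(import '[java.io BufferedOutputStream ByteArrayOutputStream DataOutputStream]
        'java.util.zip.GZIPOutputStream)
(require '[taoensso.nippy :as nippy])

;; Illustrative helper mirroring how the cache backend writes data: nippy -> gzip -> bytes.
(defn- gzip-freeze ^bytes [obj]
  (let [bos (ByteArrayOutputStream.)]
    (with-open [gz-out   (GZIPOutputStream. (BufferedOutputStream. bos))
                data-out (DataOutputStream. gz-out)]
      (nippy/freeze-to-out! data-out obj))
    (.toByteArray bos)))

;; `decompress` above unwraps the gzip stream and thaws the nippy payload:
(decompress (gzip-freeze {:rows [[1 "A"] [2 "B"]]}))
;; => {:rows [[1 "A"] [2 "B"]]}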
(defn- validate-cron-string [s]
......
@@ -92,13 +92,27 @@
:type :boolean
:default false)
(def ^:private ^:const global-max-caching-kb
"Although depending on the database, we can support much larger cached values (1GB for PG, 2GB for H2 and 4GB for
MySQL) we are not curretly setup to deal with data of that size. The datatypes we are using will hold this data in
memory and will not truly be streaming. This is a global max in order to prevent our users from setting the caching
value so high it becomes a performance issue. The value below represents 200MB"
(* 200 1024))
(defsetting query-caching-max-kb
(tru "The maximum size of the cache, per saved question, in kilobytes:")
;; (This size is a measurement of the length of *uncompressed* serialized result *rows*. The actual size of
;; the results as stored will vary somewhat, since this measurement doesn't include metadata returned with the
;; results, and doesn't consider whether the results are compressed, as the `:db` backend does.)
:type :integer
:default 1000)
:default 1000
:setter (fn [new-value]
(when (> new-value global-max-caching-kb)
(throw (IllegalArgumentException.
(str
(tru "Failed setting `query-caching-max-kb` to {0}." new-value)
(tru "Values greater than {1} are not allowed." global-max-caching-kb)))))
(setting/set-integer! :query-caching-max-kb new-value)))
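
A hypothetical REPL interaction with the guarded setter (values are examples only, assuming the usual defsetting convention of calling the setting fn with an argument to set it):

;; Hypothetical REPL sketch of the setter guard added above:
(public-settings/query-caching-max-kb 2000)
;; => sets the per-question cache cap to 2,000 KB (below the 204,800 KB global max)

(public-settings/query-caching-max-kb 500000)
;; => throws IllegalArgumentException: values greater than global-max-caching-kb are rejected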
(defsetting query-caching-max-ttl
(tru "The absolute maximum time to keep any cached query results, in seconds.")
......
@@ -89,26 +89,8 @@
(boolean (and (public-settings/enable-query-caching)
cache-ttl)))
(defn- results-are-below-max-byte-threshold?
"Measure the size of the `:rows` in QUERY-RESULTS and see whether they're smaller than `query-caching-max-kb`
*before* compression."
^Boolean [{{rows :rows} :data}]
(let [max-bytes (* (public-settings/query-caching-max-kb) 1024)]
;; We don't want to serialize the entire result set since that could explode if the query is one that returns a
;; huge number of rows. (We also want to keep `:rows` lazy.)
;; So we'll serialize one row at a time, and keep a running total of bytes; if we pass the `query-caching-max-kb`
;; threshold, we'll fail right away.
(loop [total-bytes 0, [row & more] rows]
(cond
(> total-bytes max-bytes) false
(not row) true
:else (recur (+ total-bytes (count (str row)))
more)))))
(defn- save-results-if-successful! [query-hash results]
(when (and (= (:status results) :completed)
(or (results-are-below-max-byte-threshold? results)
(log/info "Results are too large to cache." (u/emoji "😫"))))
(when (= (:status results) :completed)
(save-results! query-hash results)))
(defn- run-query-and-save-results-if-successful! [query-hash qp query]
@@ -127,7 +109,6 @@
(or (cached-results query-hash cache-ttl)
(run-query-and-save-results-if-successful! query-hash qp query))))
(defn maybe-return-cached-results
"Middleware for caching results of a query if applicable.
In order for a query to be eligible for caching:
......
(ns metabase.query-processor.middleware.cache-backend.db
(:require [metabase.models
[interface :as models]
[query-cache :refer [QueryCache]]]
[metabase.public-settings :as public-settings]
(:require [clojure.tools.logging :as log]
[metabase
[public-settings :as public-settings]
[util :as u]]
[metabase.models.query-cache :refer [QueryCache]]
[metabase.query-processor.middleware.cache-backend.interface :as i]
[metabase.util.date :as du]
[toucan.db :as db]))
[taoensso.nippy :as nippy]
[toucan.db :as db])
(:import [java.io BufferedOutputStream ByteArrayOutputStream DataOutputStream]
java.util.zip.GZIPOutputStream))
(defn- cached-results
"Return cached results for QUERY-HASH if they exist and are newer than MAX-AGE-SECONDS."
@@ -23,17 +27,71 @@
:updated_at [:<= (du/->Timestamp (- (System/currentTimeMillis)
(* 1000 (public-settings/query-caching-max-ttl))))]))
(defn- throw-if-max-exceeded [max-num-bytes bytes-in-flight]
(when (< max-num-bytes bytes-in-flight)
(throw (ex-info "Exceeded the max number of bytes" {:type ::max-bytes}))))
(defn- limited-byte-output-stream
"Returns a `FilterOutputStream` that will throw an exception if more than `max-num-bytes` are written to
`output-stream`"
[max-num-bytes output-stream]
(let [bytes-so-far (atom 0)]
(proxy [java.io.FilterOutputStream] [output-stream]
(write
([byte-or-byte-array]
(let [^java.io.OutputStream this this]
(if-let [^bytes byte-arr (and (bytes? byte-or-byte-array)
byte-or-byte-array)]
(do
(swap! bytes-so-far + (alength byte-arr))
(throw-if-max-exceeded max-num-bytes @bytes-so-far)
(proxy-super write byte-arr))
(let [^byte b byte-or-byte-array]
(swap! bytes-so-far inc)
(throw-if-max-exceeded max-num-bytes @bytes-so-far)
(proxy-super write b)))))
([byte-arr offset length]
(let [^java.io.OutputStream this this]
(swap! bytes-so-far + length)
(throw-if-max-exceeded max-num-bytes @bytes-so-far)
(proxy-super write byte-arr offset length)))))))
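
A small sketch of the guard in action (values chosen for illustration; the private var is reached the same way the tests below do):

(require '[metabase.query-processor.middleware.cache-backend.db :as cache-db])

;; Hypothetical REPL sketch: cap the stream at 4 bytes, then write past it.
(let [bos  (java.io.ByteArrayOutputStream.)
      lbos (#'cache-db/limited-byte-output-stream 4 bos)]
  (.write lbos (byte-array [1 2 3]))   ; running total 3 bytes, still within the limit
  (.write lbos (byte-array [4 5 6])))  ; running total 6 bytes -> ex-info with :type ::cache-db/max-bytes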
(defn- compress-until-max
"Compresses `results` and returns a byte array. If more than `max-bytes` is written, `::exceeded-max-bytes` is
returned."
[max-bytes results]
(try
(let [bos (ByteArrayOutputStream.)
lbos (limited-byte-output-stream max-bytes bos)]
(with-open [buff-out (BufferedOutputStream. lbos)
gz-out (GZIPOutputStream. buff-out)
data-out (DataOutputStream. gz-out)]
(nippy/freeze-to-out! data-out results))
(.toByteArray bos))
(catch clojure.lang.ExceptionInfo e
(if (= ::max-bytes (:type (ex-data e)))
::exceeded-max-bytes
(throw e)))))
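
Tying the two halves together: the byte array produced here is gzipped nippy output, which the `decompress` added to the models interface above can thaw directly. A hedged sketch, assuming both namespaces are loaded:

(require '[metabase.models.interface :as models]
         '[metabase.query-processor.middleware.cache-backend.db :as cache-db])

;; Hypothetical REPL sketch: compress under a generous 10 KB limit, then read it back.
(let [compressed (#'cache-db/compress-until-max (* 10 1024) {:data {:rows [[1 "A"] [2 "B"]]}})]
  (when (bytes? compressed)
    (models/decompress compressed)))
;; => {:data {:rows [[1 "A"] [2 "B"]]}}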
(defn- save-results!
"Save the RESULTS of query with QUERY-HASH, updating an existing QueryCache entry
if one already exists, otherwise creating a new entry."
[query-hash results]
(purge-old-cache-entries!)
(or (db/update-where! QueryCache {:query_hash query-hash}
:updated_at (du/new-sql-timestamp)
:results (models/compress results)) ; have to manually call these here since Toucan doesn't call type conversion fns for update-where! (yet)
(db/insert! QueryCache
:query_hash query-hash
:results results))
;; Explicitly compressing the results here rather than having Toucan compress it automatically. This allows us to
;; get the size of the compressed output to decide whether or not to store it.
(let [max-bytes (* (public-settings/query-caching-max-kb) 1024)
compressed-results (compress-until-max max-bytes results)]
(if-not (= ::exceeded-max-bytes compressed-results)
(do
(purge-old-cache-entries!)
(or (db/update-where! QueryCache {:query_hash query-hash}
:updated_at (du/new-sql-timestamp)
:results compressed-results)
(db/insert! QueryCache
:query_hash query-hash
:results compressed-results)))
(log/info "Results are too large to cache." (u/emoji "😫"))))
:ok)
(def instance
......
(ns metabase.query-processor.middleware.cache-backend.db-test
(:require [expectations :refer :all]
[metabase.query-processor.middleware.cache-backend.db :as cache-db]
[metabase.test.util :as tu]))
(defn- in-kb [x]
(* 1024 x))
;; We should successfully compress data smaller than the max and return the byte array
(expect
(bytes? (#'cache-db/compress-until-max (in-kb 10) (range 1 10))))
;; If the data is more than the max allowed, return `::exceeded-max-bytes`
(expect
::cache-db/exceeded-max-bytes
(#'cache-db/compress-until-max 10 (repeat 10000 (range 1 10))))
@@ -54,31 +54,6 @@
(tu/with-temporary-setting-values [enable-query-caching true]
(#'cache/is-cacheable? {:cache-ttl nil})))
;;; ------------------------------------- results-are-below-max-byte-threshold? --------------------------------------
(expect
(tu/with-temporary-setting-values [query-caching-max-kb 128]
(#'cache/results-are-below-max-byte-threshold? {:data {:rows [[1 "ABCDEF"]
[3 "GHIJKL"]]}})))
(expect
false
(tu/with-temporary-setting-values [query-caching-max-kb 1]
(#'cache/results-are-below-max-byte-threshold? {:data {:rows (repeat 500 [1 "ABCDEF"])}})))
;; check that `#'cache/results-are-below-max-byte-threshold?` is lazy and fails fast if the query is over the
;; threshold rather than serializing the entire thing
(expect
false
(let [lazy-seq-realized? (atom false)]
(tu/with-temporary-setting-values [query-caching-max-kb 1]
(#'cache/results-are-below-max-byte-threshold? {:data {:rows (lazy-cat (repeat 500 [1 "ABCDEF"])
(do (reset! lazy-seq-realized? true)
[2 "GHIJKL"]))}})
@lazy-seq-realized?)))
;;; ------------------------------------------ End-to-end middleware tests -------------------------------------------
;; if there's nothing in the cache, cached results should *not* be returned
......