diff --git a/project.clj b/project.clj
index 438a859093833310f84e550662116d7fa024733e..8763129c658c88248b25d460cfe3f831a0dd4416 100644
--- a/project.clj
+++ b/project.clj
@@ -145,7 +145,7 @@ :profiles
   {:dev {:dependencies [[clj-http-fake "1.0.3"] ; Library to mock clj-http responses
                         [expectations "2.2.0-beta2"] ; unit tests
                         [ring/ring-mock "0.3.0"]] ; Library to create mock Ring requests for unit tests
-         :plugins [[docstring-checker "1.0.2"] ; Check that all public vars have docstrings. Run with 'lein docstring-checker'
+         :plugins [[docstring-checker "1.0.3"] ; Check that all public vars have docstrings. Run with 'lein docstring-checker'
                    [jonase/eastwood "0.3.1"
                     :exclusions [org.clojure/clojure]] ; Linting
                    [lein-bikeshed "0.4.1"] ; Linting
diff --git a/resources/migrations/000_migrations.yaml b/resources/migrations/000_migrations.yaml
index 06b4a36808d9382b4b0ec0a9913459f778fbff8a..d61dff14fa6a926976eac5ef4c78f167123c71fb 100644
--- a/resources/migrations/000_migrations.yaml
+++ b/resources/migrations/000_migrations.yaml
@@ -5111,3 +5111,18 @@ databaseChangeLog:
                   name: settings
                   type: text
                   remarks: 'Serialized JSON FE-specific settings like formatting, etc. Scope of what is stored here may increase in future.'
+#
+# Change MySQL/Maria's blob type to LONGBLOB to more closely match what H2 and PostgreSQL support for size limits
+#
+  - changeSet:
+      id: 97
+      author: senior
+      comment: 'Added 0.32.0'
+      preconditions:
+        - onFail: MARK_RAN
+        - dbms: mysql, mariadb
+      changes:
+        - modifyDataType:
+            tableName: query_cache
+            columnName: results
+            newDataType: longblob
diff --git a/src/metabase/models/interface.clj b/src/metabase/models/interface.clj
index d9015f7baa28568a71d372f75956947fb6392feb..8335c2f5ddacbf56dcdddda35953386c0ec87b72 100644
--- a/src/metabase/models/interface.clj
+++ b/src/metabase/models/interface.clj
@@ -12,7 +12,9 @@
             [toucan
              [models :as models]
              [util :as toucan-util]])
-  (:import java.sql.Blob))
+  (:import [java.io BufferedInputStream ByteArrayInputStream DataInputStream]
+           java.sql.Blob
+           java.util.zip.GZIPInputStream))
 ;;; +----------------------------------------------------------------------------------------------------------------+
 ;;; |                                                Toucan Extensions                                                |
 ;;; +----------------------------------------------------------------------------------------------------------------+
@@ -109,20 +111,19 @@
   :in encryption/maybe-encrypt
   :out (comp encryption/maybe-decrypt u/jdbc-clob->str))

-(defn compress
-  "Compress OBJ, returning a byte array."
-  [obj]
-  (nippy/freeze obj {:compressor nippy/snappy-compressor}))
-
 (defn decompress
   "Decompress COMPRESSED-BYTES."
   [compressed-bytes]
   (if (instance? Blob compressed-bytes)
     (recur (.getBytes ^Blob compressed-bytes 0 (.length ^Blob compressed-bytes)))
-    (nippy/thaw compressed-bytes {:compressor nippy/snappy-compressor})))
+    (with-open [bis (ByteArrayInputStream. compressed-bytes)
+                bif (BufferedInputStream. bis)
+                gz-in (GZIPInputStream. bif)
+                data-in (DataInputStream. gz-in)]
+      (nippy/thaw-from-in! data-in))))

 (models/add-type! :compressed
-  :in compress
+  :in identity
   :out decompress)

 (defn- validate-cron-string [s]
diff --git a/src/metabase/public_settings.clj b/src/metabase/public_settings.clj
index 01354c9c157971f8d09a2662856c2c308d918d5d..5a99f4099909740986e553e99988d98b487193a6 100644
--- a/src/metabase/public_settings.clj
+++ b/src/metabase/public_settings.clj
@@ -92,13 +92,27 @@
   :type :boolean
   :default false)

+(def ^:private ^:const global-max-caching-kb
+  "Although depending on the database, we can support much larger cached values (1GB for PG, 2GB for H2 and 4GB for
+  MySQL) we are not currently set up to deal with data of that size. The datatypes we are using will hold this data in
+  memory and will not truly be streaming. This is a global max in order to prevent our users from setting the caching
+  value so high it becomes a performance issue. The value below represents 200MB."
+  (* 200 1024))
+
 (defsetting query-caching-max-kb
   (tru "The maximum size of the cache, per saved question, in kilobytes:")
   ;; (This size is a measurement of the length of *uncompressed* serialized result *rows*. The actual size of
   ;; the results as stored will vary somewhat, since this measurement doesn't include metadata returned with the
   ;; results, and doesn't consider whether the results are compressed, as the `:db` backend does.)
   :type :integer
-  :default 1000)
+  :default 1000
+  :setter (fn [new-value]
+            (when (> new-value global-max-caching-kb)
+              (throw (IllegalArgumentException.
+                      (str
+                       (tru "Failed setting `query-caching-max-kb` to {0}." new-value)
+                       (tru "Values greater than {0} are not allowed." global-max-caching-kb)))))
+            (setting/set-integer! :query-caching-max-kb new-value)))

 (defsetting query-caching-max-ttl
   (tru "The absolute maximum time to keep any cached query results, in seconds.")
diff --git a/src/metabase/query_processor/middleware/cache.clj b/src/metabase/query_processor/middleware/cache.clj
index 6589b27db0d27c149b0107025058ab1370ea8303..130da62e8c7442e6414334cf8acbd4d341c05407 100644
--- a/src/metabase/query_processor/middleware/cache.clj
+++ b/src/metabase/query_processor/middleware/cache.clj
@@ -89,26 +89,8 @@
   (boolean (and (public-settings/enable-query-caching)
                 cache-ttl)))

-(defn- results-are-below-max-byte-threshold?
-  "Measure the size of the `:rows` in QUERY-RESULTS and see whether they're smaller than `query-caching-max-kb`
-  *before* compression."
-  ^Boolean [{{rows :rows} :data}]
-  (let [max-bytes (* (public-settings/query-caching-max-kb) 1024)]
-    ;; We don't want to serialize the entire result set since that could explode if the query is one that returns a
-    ;; huge number of rows. (We also want to keep `:rows` lazy.)
-    ;; So we'll serialize one row at a time, and keep a running total of bytes; if we pass the `query-caching-max-kb`
-    ;; threshold, we'll fail right away.
-    (loop [total-bytes 0, [row & more] rows]
-      (cond
-        (> total-bytes max-bytes) false
-        (not row) true
-        :else (recur (+ total-bytes (count (str row)))
-                     more)))))
-
 (defn- save-results-if-successful! [query-hash results]
-  (when (and (= (:status results) :completed)
-             (or (results-are-below-max-byte-threshold? results)
-                 (log/info "Results are too large to cache." (u/emoji "😫"))))
+  (when (= (:status results) :completed)
     (save-results! query-hash results)))

 (defn- run-query-and-save-results-if-successful! [query-hash qp query]
@@ -127,7 +109,6 @@
     (or (cached-results query-hash cache-ttl)
         (run-query-and-save-results-if-successful! query-hash qp query))))

-
 (defn maybe-return-cached-results
   "Middleware for caching results of a query if applicable.
   In order for a query to be eligible for caching:
diff --git a/src/metabase/query_processor/middleware/cache_backend/db.clj b/src/metabase/query_processor/middleware/cache_backend/db.clj
index b6400b62480a1eb89dc96239affb0f1746a70f32..d51adb1ee55f990d94f5cdd70d4150b9ff7acd74 100644
--- a/src/metabase/query_processor/middleware/cache_backend/db.clj
+++ b/src/metabase/query_processor/middleware/cache_backend/db.clj
@@ -1,11 +1,15 @@
 (ns metabase.query-processor.middleware.cache-backend.db
-  (:require [metabase.models
-             [interface :as models]
-             [query-cache :refer [QueryCache]]]
-            [metabase.public-settings :as public-settings]
+  (:require [clojure.tools.logging :as log]
+            [metabase
+             [public-settings :as public-settings]
+             [util :as u]]
+            [metabase.models.query-cache :refer [QueryCache]]
             [metabase.query-processor.middleware.cache-backend.interface :as i]
             [metabase.util.date :as du]
-            [toucan.db :as db]))
+            [taoensso.nippy :as nippy]
+            [toucan.db :as db])
+  (:import [java.io BufferedOutputStream ByteArrayOutputStream DataOutputStream]
+           java.util.zip.GZIPOutputStream))

 (defn- cached-results
   "Return cached results for QUERY-HASH if they exist and are newer than MAX-AGE-SECONDS."
@@ -23,17 +27,71 @@
     :updated_at [:<= (du/->Timestamp (- (System/currentTimeMillis)
                                         (* 1000 (public-settings/query-caching-max-ttl))))]))

+(defn- throw-if-max-exceeded [max-num-bytes bytes-in-flight]
+  (when (< max-num-bytes bytes-in-flight)
+    (throw (ex-info "Exceeded the max number of bytes" {:type ::max-bytes}))))
+
+(defn- limited-byte-output-stream
+  "Returns a `FilterOutputStream` that will throw an exception if more than `max-num-bytes` are written to
+  `output-stream`"
+  [max-num-bytes output-stream]
+  (let [bytes-so-far (atom 0)]
+    (proxy [java.io.FilterOutputStream] [output-stream]
+      (write
+        ([byte-or-byte-array]
+         (let [^java.io.OutputStream this this]
+           (if-let [^bytes byte-arr (and (bytes? byte-or-byte-array)
+                                         byte-or-byte-array)]
+             (do
+               (swap! bytes-so-far + (alength byte-arr))
+               (throw-if-max-exceeded max-num-bytes @bytes-so-far)
+               (proxy-super write byte-arr))
+
+             (let [^byte b byte-or-byte-array]
+               (swap! bytes-so-far inc)
+               (throw-if-max-exceeded max-num-bytes @bytes-so-far)
+               (proxy-super write b)))))
+        ([byte-arr offset length]
+         (let [^java.io.OutputStream this this]
+           (swap! bytes-so-far + length)
+           (throw-if-max-exceeded max-num-bytes @bytes-so-far)
+           (proxy-super write byte-arr offset length)))))))
+
+(defn- compress-until-max
+  "Compresses `results` and returns a byte array. If more than `max-bytes` is written, `::exceeded-max-bytes` is
+  returned."
+  [max-bytes results]
+  (try
+    (let [bos (ByteArrayOutputStream.)
+          lbos (limited-byte-output-stream max-bytes bos)]
+      (with-open [buff-out (BufferedOutputStream. lbos)
+                  gz-out (GZIPOutputStream. buff-out)
+                  data-out (DataOutputStream. gz-out)]
+        (nippy/freeze-to-out! data-out results))
+      (.toByteArray bos))
+    (catch clojure.lang.ExceptionInfo e
+      (if (= ::max-bytes (:type (ex-data e)))
+        ::exceeded-max-bytes
+        (throw e)))))
+
 (defn- save-results!
   "Save the RESULTS of query with QUERY-HASH, updating an existing QueryCache entry
   if one already exists, otherwise creating a new entry."
   [query-hash results]
-  (purge-old-cache-entries!)
-  (or (db/update-where! QueryCache {:query_hash query-hash}
-        :updated_at (du/new-sql-timestamp)
-        :results (models/compress results)) ; have to manually call these here since Toucan doesn't call type conversion fns for update-where! (yet)
-      (db/insert! QueryCache
-        :query_hash query-hash
-        :results results))
+  ;; Explicitly compressing the results here rather than having Toucan compress it automatically. This allows us to
+  ;; get the size of the compressed output to decide whether or not to store it.
+  (let [max-bytes (* (public-settings/query-caching-max-kb) 1024)
+        compressed-results (compress-until-max max-bytes results)]
+    (if-not (= ::exceeded-max-bytes compressed-results)
+      (do
+        (purge-old-cache-entries!)
+        (or (db/update-where! QueryCache {:query_hash query-hash}
+              :updated_at (du/new-sql-timestamp)
+              :results compressed-results)
+            (db/insert! QueryCache
+              :query_hash query-hash
+              :results compressed-results)))
+      (log/info "Results are too large to cache." (u/emoji "😫"))))
   :ok)

 (def instance
diff --git a/test/metabase/query_processor/middleware/cache_backend/db_test.clj b/test/metabase/query_processor/middleware/cache_backend/db_test.clj
new file mode 100644
index 0000000000000000000000000000000000000000..17f16309fb95257464f9b6494b4fa776e2a9e0de
--- /dev/null
+++ b/test/metabase/query_processor/middleware/cache_backend/db_test.clj
@@ -0,0 +1,16 @@
+(ns metabase.query-processor.middleware.cache-backend.db-test
+  (:require [expectations :refer :all]
+            [metabase.query-processor.middleware.cache-backend.db :as cache-db]
+            [metabase.test.util :as tu]))
+
+(defn- in-kb [x]
+  (* 1024 x))
+
+;; We should successfully compress data smaller than the max and return the byte array
+(expect
+  (bytes? (#'cache-db/compress-until-max (in-kb 10) (range 1 10))))
+
+;; If the data is more than the max allowed, return `::exceeded-max-bytes`
+(expect
+  ::cache-db/exceeded-max-bytes
+  (#'cache-db/compress-until-max 10 (repeat 10000 (range 1 10))))
diff --git a/test/metabase/query_processor/middleware/cache_test.clj b/test/metabase/query_processor/middleware/cache_test.clj
index 905f92d8f293c7cd84c1a186579f51d3729dbf31..d6ed035f8a3ced53d8ecd6b46867fbee2260456c 100644
--- a/test/metabase/query_processor/middleware/cache_test.clj
+++ b/test/metabase/query_processor/middleware/cache_test.clj
@@ -54,31 +54,6 @@
   (tu/with-temporary-setting-values [enable-query-caching true]
     (#'cache/is-cacheable? {:cache-ttl nil})))

-
-;;; ------------------------------------- results-are-below-max-byte-threshold? --------------------------------------
-
-(expect
-  (tu/with-temporary-setting-values [query-caching-max-kb 128]
-    (#'cache/results-are-below-max-byte-threshold? {:data {:rows [[1 "ABCDEF"]
-                                                                  [3 "GHIJKL"]]}})))
-
-(expect
-  false
-  (tu/with-temporary-setting-values [query-caching-max-kb 1]
-    (#'cache/results-are-below-max-byte-threshold? {:data {:rows (repeat 500 [1 "ABCDEF"])}})))
-
-;; check that `#'cache/results-are-below-max-byte-threshold?` is lazy and fails fast if the query is over the
-;; threshold rather than serializing the entire thing
-(expect
-  false
-  (let [lazy-seq-realized? (atom false)]
-    (tu/with-temporary-setting-values [query-caching-max-kb 1]
-      (#'cache/results-are-below-max-byte-threshold? {:data {:rows (lazy-cat (repeat 500 [1 "ABCDEF"])
-                                                                             (do (reset! lazy-seq-realized? true)
-                                                                                 [2 "GHIJKL"]))}})
-      @lazy-seq-realized?)))
-
-
 ;;; ------------------------------------------ End-to-end middleware tests -------------------------------------------

 ;; if there's nothing in the cache, cached results should *not* be returned
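
Reviewer note, not part of the patch: the GZIP + nippy framing written by `compress-until-max` in the cache backend is the same framing that the reworked `metabase.models.interface/decompress` reads back, which is why the Toucan `:compressed` type can switch its `:in` fn to `identity`. A minimal round-trip sketch of that framing, assuming the names `gzip-freeze` and `gzip-thaw` (illustrative helpers, not in the codebase):

(require '[taoensso.nippy :as nippy])
(import '[java.io BufferedInputStream BufferedOutputStream ByteArrayInputStream
          ByteArrayOutputStream DataInputStream DataOutputStream]
        '[java.util.zip GZIPInputStream GZIPOutputStream])

;; Freeze a value with nippy inside the same GZIP/DataOutputStream wrapping used by
;; compress-until-max above (minus the byte-limit FilterOutputStream).
(defn gzip-freeze [obj]
  (let [bos (ByteArrayOutputStream.)]
    (with-open [data-out (DataOutputStream. (GZIPOutputStream. (BufferedOutputStream. bos)))]
      (nippy/freeze-to-out! data-out obj))
    (.toByteArray bos)))

;; Mirror of decompress in metabase.models.interface: unwrap GZIP, thaw with nippy.
(defn gzip-thaw [^bytes bs]
  (with-open [data-in (DataInputStream. (GZIPInputStream. (BufferedInputStream. (ByteArrayInputStream. bs))))]
    (nippy/thaw-from-in! data-in)))

;; Round trip: what the :db cache backend stores, the :compressed Toucan type can read back.
(= {:data {:rows [[1 "ABCDEF"]]}}
   (gzip-thaw (gzip-freeze {:data {:rows [[1 "ABCDEF"]]}})))
;; => true

Storing the rows pre-compressed is what lets the backend enforce `query-caching-max-kb` against the compressed size as it is written, instead of estimating the uncompressed row size up front as the removed `results-are-below-max-byte-threshold?` check did.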