Skip to content
Snippets Groups Projects
Commit afbae085 authored by Ryan Senior's avatar Ryan Senior
Browse files

Switch to a stream based cache serialization/compression

This commit switches to a stream-based nippy
compression/serialization. This has the benefit of being able to stop
serialization once the cache-size threshold has been reached.

Fixes #7479
parent b66a905a
No related branches found
No related tags found
No related merge requests found
......@@ -12,7 +12,9 @@
[toucan
[models :as models]
[util :as toucan-util]])
(:import java.sql.Blob))
(:import [java.io BufferedInputStream ByteArrayInputStream DataInputStream]
java.sql.Blob
java.util.zip.GZIPInputStream))
;;; +----------------------------------------------------------------------------------------------------------------+
;;; | Toucan Extensions |
......@@ -106,22 +108,19 @@
:in encryption/maybe-encrypt
:out (comp encryption/maybe-decrypt u/jdbc-clob->str))
(defn compress
  "Compress OBJ, returning a byte array."
  [obj]
  ;; A byte array is assumed to be already-serialized data and passes through
  ;; untouched; anything else is frozen with nippy using Snappy compression.
  (if-not (bytes? obj)
    (nippy/freeze obj {:compressor nippy/snappy-compressor})
    obj))
(defn decompress
  "Decompress COMPRESSED-BYTES.

  If handed a JDBC `Blob`, first materialize it as a byte array, then recur.
  Otherwise thaw the bytes through a BufferedInputStream -> GZIPInputStream ->
  DataInputStream chain (the mirror of the stream-based compression used when
  the cache entry was written)."
  [compressed-bytes]
  (if (instance? Blob compressed-bytes)
    ;; NOTE(review): JDBC specifies Blob.getBytes positions as 1-based; position 0
    ;; appears to rely on driver leniency -- confirm against the drivers in use.
    (recur (.getBytes ^Blob compressed-bytes 0 (.length ^Blob compressed-bytes)))
    (with-open [bis     (ByteArrayInputStream. compressed-bytes)
                bif     (BufferedInputStream. bis)
                gz-in   (GZIPInputStream. bif)
                data-in (DataInputStream. gz-in)]
      (nippy/thaw-from-in! data-in))))
;; The :compressed column type stores values as-is on the way in (`identity`):
;; callers such as the query cache compress explicitly so they can inspect the
;; compressed size before persisting. Values are decompressed on the way out.
(models/add-type! :compressed
  :in identity
  :out decompress)
(defn- validate-cron-string [s]
......
(ns metabase.query-processor.middleware.cache-backend.db
(:require [clojure.tools.logging :as log]
[metabase.models
[interface :as models]
[query-cache :refer [QueryCache]]]
[metabase.public-settings :as public-settings]
[metabase
[public-settings :as public-settings]
[util :as u]]
[metabase.models.query-cache :refer [QueryCache]]
[metabase.query-processor.middleware.cache-backend.interface :as i]
[metabase.util :as u]
[metabase.util.date :as du]
[toucan.db :as db]))
[taoensso.nippy :as nippy]
[toucan.db :as db])
(:import [java.io BufferedOutputStream ByteArrayOutputStream DataOutputStream]
java.util.zip.GZIPOutputStream))
(defn- cached-results
"Return cached results for QUERY-HASH if they exist and are newer than MAX-AGE-SECONDS."
......@@ -25,10 +27,52 @@
:updated_at [:<= (du/->Timestamp (- (System/currentTimeMillis)
(* 1000 (public-settings/query-caching-max-ttl))))]))
(defn- throw-if-max-exceeded
  "Throw an `ex-info` tagged `{:type ::max-bytes}` once `bytes-in-flight` has grown
  past `max-num-bytes`; otherwise return nil."
  [max-num-bytes bytes-in-flight]
  (when (> bytes-in-flight max-num-bytes)
    (throw (ex-info "Exceeded the max number of bytes" {:type ::max-bytes}))))
(defn- results-below-max-threshold?
  "True when `compressed-result-bytes` is strictly smaller than the configured
  cache ceiling (`query-caching-max-kb`, converted to bytes)."
  [compressed-result-bytes]
  (let [max-bytes (* 1024 (public-settings/query-caching-max-kb))]
    (< (alength compressed-result-bytes) max-bytes)))
(defn- limited-byte-output-stream
  "Returns a `FilterOutputStream` that will throw an exception if more than `max-num-bytes` are written to
  `output-stream`"
  [max-num-bytes output-stream]
  ;; Running total of bytes written, shared across all `write` arities.
  (let [bytes-so-far (atom 0)]
    (proxy [java.io.FilterOutputStream] [output-stream]
      (write
        ;; Single-argument arity: the JDK overloads `write` on both `int` and
        ;; `byte[]`, and `proxy` sees them as one Clojure arity, so dispatch on
        ;; the runtime type of the argument.
        ([byte-or-byte-array]
         (let [^java.io.OutputStream this this]
           (if-let [^bytes byte-arr (and (bytes? byte-or-byte-array)
                                         byte-or-byte-array)]
             (do
               ;; Count the whole array, then fail fast *before* delegating so
               ;; no extra bytes reach the wrapped stream once over the limit.
               (swap! bytes-so-far + (alength byte-arr))
               (throw-if-max-exceeded max-num-bytes @bytes-so-far)
               (proxy-super write byte-arr))
             ;; Single byte (int overload): count one byte and delegate.
             (let [^byte b byte-or-byte-array]
               (swap! bytes-so-far inc)
               (throw-if-max-exceeded max-num-bytes @bytes-so-far)
               (proxy-super write b)))))
        ;; Three-argument arity `write(byte[], int, int)`: only `length` bytes
        ;; of the array are written, so only `length` is added to the total.
        ([byte-arr offset length]
         (let [^java.io.OutputStream this this]
           (swap! bytes-so-far + length)
           (throw-if-max-exceeded max-num-bytes @bytes-so-far)
           (proxy-super write byte-arr offset length)))))))
(defn- compress-until-max
  "Compresses `results` and returns a byte array. If more than `max-bytes` is written, `::exceeded-max-bytes` is
  returned."
  [max-bytes results]
  (let [byte-sink (ByteArrayOutputStream.)
        limited   (limited-byte-output-stream max-bytes byte-sink)]
    (try
      ;; Freeze through a buffered -> gzip -> data-output chain; `with-open`
      ;; flushes and closes the chain before we snapshot the accumulated bytes.
      (with-open [data-out (DataOutputStream. (GZIPOutputStream. (BufferedOutputStream. limited)))]
        (nippy/freeze-to-out! data-out results))
      (.toByteArray byte-sink)
      ;; The limiting stream signals overflow via an ex-info tagged ::max-bytes;
      ;; translate that into the sentinel and re-throw anything else.
      (catch clojure.lang.ExceptionInfo e
        (if (= ::max-bytes (:type (ex-data e)))
          ::exceeded-max-bytes
          (throw e))))))
(defn- save-results!
"Save the RESULTS of query with QUERY-HASH, updating an existing QueryCache entry
......@@ -36,8 +80,9 @@
[query-hash results]
;; Explicitly compressing the results here rather than having Toucan compress it automatically. This allows us to
;; get the size of the compressed output to decide whether or not to store it.
(let [compressed-results (models/compress results)]
(if (results-below-max-threshold? compressed-results)
(let [max-bytes (* (public-settings/query-caching-max-kb) 1024)
compressed-results (compress-until-max max-bytes results)]
(if-not (= ::exceeded-max-bytes compressed-results)
(do
(purge-old-cache-entries!)
(or (db/update-where! QueryCache {:query_hash query-hash}
......
......@@ -6,11 +6,11 @@
(defn- in-kb
  "Scale `x` kilobytes up to the equivalent number of bytes."
  [x]
  (* x 1024))
;; We should successfully compress data smaller than the max and return the byte array
(expect
  (bytes? (#'cache-db/compress-until-max (in-kb 10) (range 1 10))))

;; If the data is more than the max allowed, return `::exceeded-max-bytes`
(expect
  ::cache-db/exceeded-max-bytes
  (#'cache-db/compress-until-max 10 (repeat 10000 (range 1 10))))
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment