Unverified commit 9c0cda3a authored by Cam Saul, committed by GitHub

Fix deadlocks! (#12126)

parent 1b6db6c5
Showing 165 additions and 116 deletions
......@@ -579,8 +579,8 @@
(let [ttl-seconds (Math/round (float (/ (* average-duration (public-settings/query-caching-ttl-ratio))
1000.0)))]
(when-not (zero? ttl-seconds)
(log/info (format "Question's average execution duration is %d ms; using 'magic' TTL of %d seconds"
average-duration ttl-seconds)
(log/info (trs "Question''s average execution duration is {0}; using ''magic'' TTL of {1}"
(u/format-milliseconds average-duration) (u/format-seconds ttl-seconds))
(u/emoji "💾"))
ttl-seconds))))
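For context, a hedged worked example of the "magic" TTL computation above, assuming an illustrative average duration of 5,000 ms and a query-caching-ttl-ratio of 10 (neither number comes from this commit):

;; 5,000 ms average duration * ratio 10, divided by 1,000 ms per second
(Math/round (float (/ (* 5000 10) 1000.0)))
;; => 50, i.e. cache the results for 50 seconds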
......
......@@ -69,15 +69,9 @@
(catch Throwable e
(log/error e (trs "Caught unexpected Exception in streaming response body"))
(write-error! os e)
nil)
(finally
(a/>!! finished-chan (if (a/poll! canceled-chan)
:canceled
:completed))
(a/close! finished-chan)
(a/close! canceled-chan))))
nil)))
(defn- do-f-async [f ^OutputStream os finished-chan]
(defn- do-f-async [^AsyncContext async-context f ^OutputStream os finished-chan]
{:pre [(some? os)]}
(let [canceled-chan (a/promise-chan)
task (bound-fn []
......@@ -85,9 +79,14 @@
(do-f* f os finished-chan canceled-chan)
(catch Throwable e
(log/error e (trs "bound-fn caught unexpected Exception"))
(a/>!! finished-chan :unexpected-error)
(a/>!! finished-chan :unexpected-error))
(finally
(a/>!! finished-chan (if (a/poll! canceled-chan)
:canceled
:completed))
(a/close! finished-chan)
(a/close! canceled-chan))))]
(a/close! canceled-chan)
(.complete async-context))))]
(.submit (thread-pool/thread-pool) ^Runnable task)
nil))
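A minimal sketch (not the Metabase code itself) of the completion path this hunk moves into the worker task: the final status, channel teardown, and Servlet AsyncContext completion all happen in one finally block on the worker thread, instead of a separate go block watching finished-chan. Names other than the core.async and Servlet APIs are hypothetical, and finished-chan is assumed to be a promise-chan so the put in finally never blocks:

(require '[clojure.core.async :as a])
(import 'javax.servlet.AsyncContext)

(defn run-streaming-task!
  "Hypothetical helper illustrating the new lifecycle."
  [^AsyncContext async-context f finished-chan canceled-chan]
  (try
    (f)
    (catch Throwable _e
      ;; the real code logs the Exception before reporting it
      (a/>!! finished-chan :unexpected-error))
    (finally
      ;; always report a final status, then tear everything down on this thread
      (a/>!! finished-chan (if (a/poll! canceled-chan) :canceled :completed))
      (a/close! finished-chan)
      (a/close! canceled-chan)
      (.complete async-context))))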
......@@ -122,9 +121,6 @@
(defn- respond
[{:keys [^HttpServletResponse response ^AsyncContext async-context request-map response-map]}
f {:keys [content-type], :as options} finished-chan]
(a/go
(a/<! finished-chan)
(.complete async-context))
(try
(.setStatus response 202)
(let [gzip? (should-gzip-response? request-map)
......@@ -133,7 +129,7 @@
(#'ring.servlet/set-headers response headers)
(let [output-stream-delay (output-stream-delay gzip? response)
delay-os (delay-output-stream output-stream-delay)]
(do-f-async f delay-os finished-chan)))
(do-f-async async-context f delay-os finished-chan)))
(catch Throwable e
(log/error e (trs "Unexpected exception in do-f-async"))
(try
......@@ -141,7 +137,8 @@
(catch Throwable e
(log/error e (trs "Unexpected exception writing error response"))))
(a/>!! finished-chan :unexpected-error)
(a/close! finished-chan))))
(a/close! finished-chan)
(.complete async-context))))
(declare render)
......
......@@ -20,7 +20,8 @@
[ring.util.codec :as codec]
[schema.core :as s]
[toucan.db :as db])
(:import liquibase.exception.LockException))
(:import com.mchange.v2.c3p0.PoolBackedDataSource
liquibase.exception.LockException))
;;; +----------------------------------------------------------------------------------------------------------------+
;;; | DB FILE & CONNECTION DETAILS |
......@@ -247,11 +248,19 @@
(when-let [max-pool-size (config/config-int :mb-application-db-max-connection-pool-size)]
{"maxPoolSize" max-pool-size})))
(defn- create-connection-pool! [jdbc-spec]
(defn- create-connection-pool!
"Create a connection pool for the application DB and set it as the default Toucan connection. This is normally called
once during start up; calling it a second time (e.g. from the REPL) will close the existing pool and create a new one."
[jdbc-spec]
(db/set-default-quoting-style! (case (db-type)
:postgres :ansi
:h2 :h2
:mysql :mysql))
;; REPL usage only: kill the old pool if one exists
(u/ignore-exceptions
(when-let [^PoolBackedDataSource pool (:datasource (db/connection))]
(log/trace "Closing old application DB connection pool")
(.close pool)))
(log/debug (trs "Set default db connection with connection pool..."))
(db/set-default-db-connection! (connection-pool/connection-pool-spec jdbc-spec application-db-connection-pool-props))
(db/set-default-jdbc-options! {:read-columns db.jdbc-protocols/read-columns})
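A hedged REPL sketch of the behavior the new docstring and comment describe: calling create-connection-pool! again closes the previous c3p0 pool before installing a new default Toucan connection, so re-pointing the application DB from a REPL doesn't leak connections (jdbc-spec below stands in for whatever spec you would normally pass):

(comment
  ;; first call, e.g. during normal startup
  (create-connection-pool! jdbc-spec)
  ;; second call from the REPL: the old PoolBackedDataSource is closed first,
  ;; then a fresh pool becomes the default Toucan connection
  (create-connection-pool! jdbc-spec))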
......
......@@ -14,6 +14,7 @@
[metabase.util.i18n :refer [trs]]
[toucan.db :as db])
(:import clojure.core.async.impl.channels.ManyToManyChannel
com.mchange.v2.c3p0.PoolBackedDataSource
metabase.async.streaming_response.StreamingResponse
org.eclipse.jetty.util.thread.QueuedThreadPool))
......@@ -49,12 +50,17 @@
(defn- stats []
(str
(let [^PoolBackedDataSource pool (:datasource (db/connection))]
(trs "App DB connections: {0}/{1}"
(.getNumBusyConnectionsAllUsers pool) (.getNumConnectionsAllUsers pool)))
" "
(when-let [^QueuedThreadPool pool (some-> (server/instance) .getThreadPool)]
(trs "Jetty threads: {0}/{1} ({2} idle, {3} queued) "
(trs "Jetty threads: {0}/{1} ({2} idle, {3} queued)"
(.getBusyThreads pool)
(.getMaxThreads pool)
(.getIdleThreads pool)
(.getQueueSize pool)))
" "
(trs "({0} total active threads)" (Thread/activeCount))
" "
(trs "Queries in flight: {0}" (streaming-response.thread-pool/active-thread-count))
......
......@@ -78,7 +78,7 @@
(defn- default-resultf [result context]
(if (nil? result)
(do
(log/error (ex-info (trs "Unexpected nil result") {}) (trs "Unexpected nil result"))
(log/error (ex-info (trs "Unexpected nil result") {}))
(recur false context))
(let [out-chan (context/out-chan context)]
(a/>!! out-chan result)
......@@ -87,7 +87,7 @@
(defn- default-timeoutf
[context]
(let [timeout (context/timeout context)]
(log/debug (trs "Query timed out after {0} ms, raising timeout exception." timeout))
(log/debug (trs "Query timed out after {0}, raising timeout exception." (u/format-milliseconds timeout)))
(context/raisef (ex-info (tru "Timed out after {0}." (u/format-milliseconds timeout))
{:status :timed-out
:type error-type/timed-out})
......
......@@ -25,7 +25,7 @@
[interface :as i]]
[metabase.query-processor.middleware.cache.impl :as impl]
[metabase.util.i18n :refer [trs]])
(:import java.io.InputStream))
(:import org.eclipse.jetty.io.EofException))
(comment backend.db/keep-me)
......@@ -87,7 +87,6 @@
(let [y (a/<! (a/thread
(try
(i/save-results! *backend* query-hash x)
:ok
(catch Throwable e
e))))]
(if (instance? Throwable y)
......@@ -134,27 +133,31 @@
([acc row]
(rf acc row))))
(defn- do-with-cached-results
(defn- maybe-reduce-cached-results
"Reduces cached results if there is a hit. Otherwise, returns `::miss` directly."
[query-hash max-age-seconds rff context]
(if *ignore-cached-results*
::miss
(do
(log/tracef "Looking for cached-results for query with hash %s younger than %s\n"
(pr-str (i/short-hex-hash query-hash)) (u/format-seconds max-age-seconds))
(i/cached-results *backend* query-hash max-age-seconds
(fn [^InputStream is]
(if (nil? is)
::miss
(impl/reducible-deserialized-results is
(fn
([_]
::miss)
([metadata reducible-rows]
(context/reducef (fn [metadata]
(add-cached-metadata-xform (rff metadata)))
context metadata reducible-rows))))))))))
(try
(or (when-not *ignore-cached-results*
(log/tracef "Looking for cached results for query with hash %s younger than %s\n"
(pr-str (i/short-hex-hash query-hash)) (u/format-seconds max-age-seconds))
(i/with-cached-results *backend* query-hash max-age-seconds [is]
(when is
(impl/with-reducible-deserialized-results [[metadata reducible-rows] is]
(when reducible-rows
(let [rff* (fn [metadata]
(add-cached-metadata-xform (rff metadata)))]
(log/tracef "Reducing cached rows...")
(context/reducef rff* context metadata reducible-rows)
(log/tracef "All cached rows reduced")
::ok))))))
::miss)
(catch EofException _
(log/debug (trs "Request is closed; no one to return cached results to"))
::canceled)
(catch Throwable e
(log/error e (trs "Error attempting to fetch cached results for query with hash {0}"
(i/short-hex-hash query-hash)))
::miss)))
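A hedged sketch of the return-value contract this gives callers: only ::miss means the query still has to run; ::ok means the cached rows were already reduced into the response, and ::canceled means the client went away mid-stream. run-query-and-cache! is a hypothetical placeholder for falling through to the real query processor:

(case (maybe-reduce-cached-results query-hash cache-ttl rff context)
  ::miss     (run-query-and-cache!)  ; hypothetical: run the query and save results
  ::ok       nil                     ; cached rows already reduced; nothing to do
  ::canceled nil)                    ; request closed; nobody to respond to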
;;; --------------------------------------------------- Middleware ---------------------------------------------------
......@@ -164,10 +167,10 @@
;; TODO - Query will already have `info.hash` if it's a userland query. I'm not 100% sure it will be the same hash,
;; because this is calculated after normalization, instead of before
(let [query-hash (qputil/query-hash query)
result (do-with-cached-results query-hash cache-ttl rff context)]
(if-not (= ::miss result)
result
result (maybe-reduce-cached-results query-hash cache-ttl rff context)]
(when (= result ::miss)
(let [start-time-ms (System/currentTimeMillis)]
(log/trace "Running query and saving cached results (if eligible)...")
(qp query
(fn [metadata]
(save-results-xform start-time-ms metadata query-hash (rff metadata)))
......
......@@ -6,8 +6,7 @@
[util :as u]]
[metabase.util.i18n :refer [trs tru]]
[taoensso.nippy :as nippy])
(:import [java.io BufferedInputStream BufferedOutputStream ByteArrayOutputStream DataInputStream DataOutputStream
EOFException FilterOutputStream InputStream OutputStream]
(:import [java.io BufferedInputStream BufferedOutputStream ByteArrayOutputStream DataInputStream DataOutputStream EOFException FilterOutputStream InputStream OutputStream]
[java.util.zip GZIPInputStream GZIPOutputStream]))
(defn- max-bytes-output-stream ^OutputStream [max-bytes ^OutputStream os]
......@@ -86,11 +85,11 @@
(a/>!! out-chan e)))))))
(defn serialize-async
"Create output streamings for serializing QP results. Returns a pair of core.async channels, `in-chan` and `out-chan`.
"Create output streams for serializing QP results. Returns a map of core.async channels, `in-chan` and `out-chan`.
Send all objects to be serialized to `in-chan`; then close it when finished; the result of `out-chan` will be the
serialized byte array (or an Exception, if one was thrown).
`out-chan` is closed automatically upon recieving a result; all chans and output streams are closed thereafter.
`out-chan` is closed automatically upon receiving a result; all chans and output streams are closed thereafter.
(let [{:keys [in-chan out-chan]} (serialize-async)]
(doseq [obj objects]
......@@ -131,25 +130,22 @@
acc
(recur (rf acc row)))))))))
(defn reducible-deserialized-results
"Take cached result bytes from `is` and call `respond` like
(respond metadata reducible-rows)
If cached results cannot be deserialized, calls
(respond nil)"
{:style/indent 1}
[^InputStream is respond]
(let [result (try
(with-open [is (DataInputStream. (GZIPInputStream. (BufferedInputStream. is)))]
(let [metadata (thaw! is)]
(if (= metadata ::eof)
::invalid
(respond metadata (reducible-rows is)))))
(catch Throwable e
(log/error e (trs "Error parsing serialized results"))
::invalid))]
(if (= result ::invalid)
(respond nil)
result)))
(defn do-reducible-deserialized-results
"Impl for `with-reducible-deserialized-results`."
[^InputStream is f]
(with-open [is (DataInputStream. (GZIPInputStream. (BufferedInputStream. is)))]
(let [metadata (thaw! is)]
(if (= metadata ::eof)
(f nil)
(f [metadata (reducible-rows is)])))))
(defmacro with-reducible-deserialized-results
"Fetches metadata and reducible rows from an InputStream `is` and executes body with them bound
(with-reducible-deserialized-results [[metadata reducible-rows] is]
...)
`metadata` and `reducible-rows` will be `nil` if the data fetched from the input stream is invalid, from an older
cache version, or otherwise unusable."
[[metadata-rows-binding is] & body]
`(do-reducible-deserialized-results ~is (fn [~metadata-rows-binding] ~@body)))
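Per the definition above, a call to the macro expands roughly like this (the reducing body is illustrative, mirroring the updated test later in this commit):

(with-reducible-deserialized-results [[metadata reducible-rows] is]
  (when reducible-rows
    (let [rf (rff metadata)]
      (reduce rf (rf) reducible-rows))))
;; ≈
(do-reducible-deserialized-results is
  (fn [[metadata reducible-rows]]
    (when reducible-rows
      (let [rf (rff metadata)]
        (reduce rf (rf) reducible-rows)))))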
......@@ -8,7 +8,7 @@
[date-2 :as u.date]
[i18n :refer [trs]]]
[toucan.db :as db])
(:import java.sql.ResultSet
(:import [java.sql Connection PreparedStatement ResultSet Types]
javax.sql.DataSource))
(defn- ^DataSource datasource []
......@@ -20,32 +20,47 @@
[:second n])]
(u.date/add (t/offset-date-time) unit (- n))))
(defn- cached-results-query [query-hash max-age-seconds]
(hsql/format {:select [:results]
:from [QueryCache]
:where [:and
[:= :query_hash query-hash]
[:>= :updated_at (seconds-ago max-age-seconds)]]
:order-by [[:updated_at :desc]]
:limit 1}
:quoting (db/quoting-style)))
(def ^:private cached-results-query-sql
(delay (first (hsql/format {:select [:results]
:from [QueryCache]
:where [:and
[:= :query_hash (hsql/raw "?")]
[:>= :updated_at (hsql/raw "?")]]
:order-by [[:updated_at :desc]]
:limit 1}
:quoting (db/quoting-style)))))
(defn- prepare-statement
^PreparedStatement [^Connection conn query-hash max-age-seconds]
(let [stmt (.prepareStatement conn ^String @cached-results-query-sql
ResultSet/TYPE_FORWARD_ONLY
ResultSet/CONCUR_READ_ONLY
ResultSet/CLOSE_CURSORS_AT_COMMIT)]
(try
(doto stmt
(.setFetchDirection ResultSet/FETCH_FORWARD)
(.setBytes 1 query-hash)
(.setObject 2 (seconds-ago max-age-seconds) Types/TIMESTAMP_WITH_TIMEZONE)
(.setMaxRows 1))
(catch Throwable e
(log/error e (trs "Error preparing statement to fetch cached query results"))
(.close stmt)
(throw e)))))
(defn- cached-results [query-hash max-age-seconds respond]
(let [[sql _ t] (cached-results-query query-hash max-age-seconds)]
(with-open [conn (.getConnection (datasource))
stmt (doto (.prepareStatement conn sql
ResultSet/TYPE_FORWARD_ONLY
ResultSet/CONCUR_READ_ONLY
ResultSet/CLOSE_CURSORS_AT_COMMIT)
(.setFetchDirection ResultSet/FETCH_FORWARD)
(.setBytes 1 query-hash)
(.setObject 2 t java.sql.Types/TIMESTAMP_WITH_TIMEZONE)
(.setMaxRows 1))]
(with-open [rs (.executeQuery stmt)]
(if-not (.next rs)
(respond nil)
(with-open [is (.getBinaryStream rs 1)]
(respond is)))))))
(with-open [conn (.getConnection (datasource))
stmt (prepare-statement conn query-hash max-age-seconds)
rs (.executeQuery stmt)]
;; VERY IMPORTANT! Bind `*db-connection*` so it will get reused elsewhere for the duration of results reduction,
;; otherwise we can potentially end up deadlocking if we need to acquire another connection for one reason or
;; another, such as recording QueryExecutions
(binding [db/*db-connection* {:connection conn}]
(if-not (.next rs)
(respond nil)
(with-open [is (.getBinaryStream rs 1)]
(respond is))))))
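A hedged sketch of the deadlock this binding guards against: with a small connection pool, holding one connection while the cached rows are being reduced and then asking Toucan for a second connection (for example, to record a QueryExecution) can exhaust the pool and block forever. Rebinding db/*db-connection* makes nested Toucan calls reuse the connection already checked out; the select below is purely illustrative:

(with-open [conn (.getConnection (datasource))]
  (binding [db/*db-connection* {:connection conn}]
    ;; any Toucan call made while the cached results are being consumed now
    ;; runs on `conn` instead of checking a second connection out of the pool
    (db/select-one QueryCache :query_hash query-hash)))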
(defn- purge-old-cache-entries!
"Delete any cache entries that are older than the global max age `max-cache-entry-age-seconds` (currently 3 months)."
......
......@@ -21,7 +21,7 @@
(with-open [is (...)]
(respond is)))
`max-age-seconds` may be floating-point.")
`max-age-seconds` may be floating-point. This method *must* return the result of `respond`.")
(save-results! [this ^bytes query-hash ^bytes results]
"Add a cache entry with the `results` of running query with byte array `query-hash`. This should replace any prior
......@@ -31,6 +31,18 @@
"Purge all cache entires older than `max-age-seconds`. Will be called periodically when this backend is in use.
`max-age-seconds` may be floating-point."))
(defmacro with-cached-results
"Macro version for consuming `cached-results` from a `backend`.
(with-cached-results backend query-hash max-age-seconds [is]
...)
InputStream `is` will be `nil` if no cached results were available."
{:style/indent 4}
[backend query-hash max-age-seconds [is-binding] & body]
`(cached-results ~backend ~query-hash ~max-age-seconds (fn [~(vary-meta is-binding assoc :tag 'java.io.InputStream)]
~@body)))
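And, per that definition, a call to the macro expands roughly as follows (read-cached-response is hypothetical):

(with-cached-results backend query-hash max-age-seconds [is]
  (when is
    (read-cached-response is)))
;; ≈
(cached-results backend query-hash max-age-seconds
                (fn [^java.io.InputStream is]
                  (when is
                    (read-cached-response is))))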
(defmulti cache-backend
"Return an instance of a cache backend, which is any object that implements `QueryProcessorCacheBackend`.
......
......@@ -35,15 +35,22 @@
(db/insert! QueryExecution (dissoc query-execution :json_query))))
(defn- save-query-execution!
"Save a `QueryExecution` row containing `execution-info`. This is done when a query is finished, so
regardless of whether results streaming is canceled, we want to continue the save; for this reason, we don't call
`future-cancel` if we get a message to `canceled-chan` the way we normally do."
"Save a `QueryExecution` row containing `execution-info`. Done asynchronously when a query is finished."
[execution-info]
(log/trace "Saving QueryExecution info")
(try
(save-query-execution!* (add-running-time execution-info))
(catch Throwable e
(log/error e (trs "Error saving query execution info")))))
(let [execution-info (add-running-time execution-info)]
;; 1. Asynchronously save QueryExecution, update query average execution time etc. using the Agent/pooledExecutor
;; pool, which is a fixed pool of size `nthreads + 2`. This way we don't spin up a ton of threads doing unimportant
;; background query execution saving (as `future` would do, which uses an unbounded thread pool by default)
;;
;; 2. This is on purpose! By *not* using `bound-fn` or `future`, any dynamic variables in play when the task is
;; submitted, such as `db/*connection*`, won't be in play when the task is actually executed. That way we won't
;; attempt to use closed DB connections
(.submit clojure.lang.Agent/pooledExecutor ^Runnable (fn []
(log/trace "Saving QueryExecution info")
(try
(save-query-execution!* execution-info)
(catch Throwable e
(log/error e (trs "Error saving query execution info"))))))))
(defn- save-successful-query-execution! [query-execution result-rows]
(save-query-execution! (assoc query-execution :result_rows result-rows)))
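A standalone sketch of the dynamic-binding point made in the comment above: submitting a plain fn (not a bound-fn) to Agent/pooledExecutor means the task sees only root values for dynamic vars, so it cannot accidentally reuse a db/*db-connection* that is closed by the time the task runs. The var below is a stand-in for Toucan's:

(def ^:dynamic *db-connection* nil)  ; stand-in for toucan.db/*db-connection*

(binding [*db-connection* ::about-to-be-closed-conn]
  (.submit clojure.lang.Agent/pooledExecutor
           ^Runnable (fn []
                       ;; prints nil: the binding above is not conveyed onto the
                       ;; executor thread because this is not a bound-fn
                       (println "connection seen by task:" *db-connection*))))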
......
......@@ -3,7 +3,8 @@
[clojure.tools.logging :as log]
[metabase.async.util :as async.u]
[metabase.query-processor.context :as context]
[metabase.query-processor.context.default :as context.default]))
[metabase.query-processor.context.default :as context.default]
[metabase.util :as u]))
(defn- wire-up-context-channels!
"Wire up the core.async channels in a QP `context`."
......@@ -17,7 +18,7 @@
(a/go
(let [[val port] (a/alts! [out-chan (a/timeout timeout)] :priority true)]
(log/tracef "Port %s got %s"
(if (= port out-chan) "out-chan" (format "[timeout after %d ms]" timeout))
(if (= port out-chan) "out-chan" (format "[timeout after %s]" (u/format-milliseconds timeout)))
val)
(cond
(not= port out-chan) (context/timeoutf context)
......
......@@ -21,12 +21,10 @@
([^bytes bytea rff]
(with-open [bis (ByteArrayInputStream. bytea)]
(impl/reducible-deserialized-results bis (fn respond
([_] nil)
([metadata rows]
(let [rf (rff metadata)]
(reduce rf (rf) rows))))))))
(impl/with-reducible-deserialized-results [[metadata rows] bis]
(when rows
(let [rf (rff metadata)]
(reduce rf (rf) rows)))))))
(deftest e2e-test
(let [{:keys [in-chan out-chan]} (impl/serialize-async)]
......
......@@ -11,7 +11,8 @@
[metabase.query-processor.streaming :as qp.streaming]
[metabase.test.util :as tu]
[toucan.db :as db])
(:import [java.io BufferedInputStream BufferedOutputStream ByteArrayInputStream ByteArrayOutputStream InputStream InputStreamReader]))
(:import [java.io BufferedInputStream BufferedOutputStream ByteArrayInputStream ByteArrayOutputStream InputStream InputStreamReader]
javax.servlet.AsyncContext))
(defmulti ^:private parse-result
{:arglists '([export-format ^InputStream input-stream])}
......@@ -53,7 +54,11 @@
os (BufferedOutputStream. bos)]
(let [streaming-response (qp.streaming/streaming-response [context export-format]
(qp/process-query-async query (assoc context :timeout 5000)))]
(#'streaming-response/do-f-async (.f streaming-response) os (.donechan streaming-response))
(#'streaming-response/do-f-async (proxy [AsyncContext] []
(complete []))
(.f streaming-response)
os
(.donechan streaming-response))
(mt/wait-for-result (streaming-response/finished-chan streaming-response) 1000))
(let [bytea (.toByteArray bos)]
(with-open [is (BufferedInputStream. (ByteArrayInputStream. bytea))]
......