Skip to content
Snippets Groups Projects
Commit 0f2fd1d7 authored by Cam Saul's avatar Cam Saul
Browse files

Druid test fixes :wrench:

[ci druid]
parent 2186d013
No related merge requests found
......@@ -7,7 +7,9 @@
[driver :as driver]
[util :as u]]
[metabase.driver.druid.query-processor :as qp]
[metabase.util.ssh :as ssh]))
[metabase.util
[i18n :refer [trs tru]]
[ssh :as ssh]]))
(driver/register! :druid)
......@@ -33,10 +35,11 @@
(:body options) (update :body json/generate-string))
{:keys [status body]} (request-fn url options)]
(when (not= status 200)
(throw (Exception. (format "Error [%d]: %s" status body))))
(try (json/parse-string body keyword)
(catch Throwable _
(throw (Exception. (str "Failed to parse body: " body)))))))
(throw (Exception. (str (tru "Error [{0}]: {1}" status body)))))
(try
(json/parse-string body keyword)
(catch Throwable _
(throw (Exception. (str (tru "Failed to parse body: {0}" body))))))))
(def ^:private ^{:arglists '([url & {:as options}])} GET (partial do-request http/get))
(def ^:private ^{:arglists '([url & {:as options}])} POST (partial do-request http/post))
......@@ -66,7 +69,8 @@
(:errorMessage body) "\n"
"Error class:" (:errorClass body))))
(.getMessage e))]
(log/error (u/format-color 'red "Error running query:\n%s" message))
(log/error (u/format-color 'red (trs "Error running query:")
"\n" message))
;; Re-throw a new exception with `message` set to the extracted message
(throw (Exception. message e)))))))
......@@ -78,21 +82,21 @@
;; Run the query in a future so that this thread will be interrupted, not the thread running the query (which is
;; not interrupt aware)
@query-fut
(catch InterruptedException interrupted-ex
(catch InterruptedException e
;; The future has been cancelled, if we ahve a query id, try to cancel the query
(if-not query-id
(log/warn interrupted-ex "Client closed connection, no queryId found, can't cancel query")
(log/warn e (trs "Client closed connection, no queryId found, can't cancel query"))
(ssh/with-ssh-tunnel [details-with-tunnel details]
(log/warnf "Client closed connection, canceling Druid queryId '%s'" query-id)
(log/warn (trs "Client closed connection, canceling Druid queryId {0}" query-id))
(try
;; If we can't cancel the query, we don't want to hide the original exception, attempt to cancel, but if
;; we can't, we should rethrow the InterruptedException, not an exception from the cancellation
(DELETE (details->url details-with-tunnel (format "/druid/v2/%s" query-id)))
(catch Exception cancel-ex
(log/warnf cancel-ex "Failed to cancel Druid query with queryId" query-id))
(catch Exception cancel-e
(log/warn cancel-e (trs "Failed to cancel Druid query with queryId {0}" query-id)))
(finally
;; Propogate the exception, will cause any other catch/finally clauses to fire
(throw interrupted-ex)))))))))
(throw e)))))))))
;;; ### Sync
......
......@@ -344,16 +344,29 @@
;; everything works correctly together
(datasets/expect-with-driver :druid
::tu/success
(tu/call-with-paused-query
(fn [query-thunk called-query? called-cancel? pause-query]
(future
(try
;; stub out the query and delete functions so that we know when one is called vs. the other
(with-redefs [druid/do-query (fn [details query] (deliver called-query? true) @pause-query)
druid/DELETE (fn [url] (deliver called-cancel? true))]
(query-thunk))
(catch Throwable e
(println "Error running query:" e)))))))
;; the `call-with-paused-query` helper is kind of wack and we need to redefine functions for the duration of the
;; test, and redefine them to operate on things that don't get bound unitl `call-with-paused-query` calls its fn
;;
;; that's why we're doing things this way
(let [promises (atom nil)]
(with-redefs [druid/do-query (fn [details query]
(deliver (:called-query? @promises) true)
@(:pause-query @promises))
druid/DELETE (fn [url]
(deliver (:called-cancel? @promises) true))]
(tu/call-with-paused-query
(fn [query-thunk called-query? called-cancel? pause-query]
(reset! promises {:called-query? called-query?
:called-cancel? called-cancel?
:pause-query pause-query})
(future
(try
(data/run-mbql-query checkins
{:aggregation [[:count]]})
(query-thunk)
(catch Throwable e
(println "Error running query:" e)
(throw e)))))))))
;; Make sure Druid cols + columns come back in the same order and that that order is the expected MBQL columns order
;; (#9294)
......
......@@ -71,7 +71,7 @@
(let [result (apply f args)]
(cond
(nil? result)
(log/warn "Warning: {0} returned `nil`" f)
(log/warn (trs "Warning: {0} returned `nil`" f))
(not (a/>!! out-chan result))
(log/error (trs "Unexpected error writing result to output channel: already closed"))))
......
......@@ -25,8 +25,6 @@
((async-wait/wait-for-permit qp) query respond respond canceled-chan))
(f {:result-chan result-chan, :semaphore-chan semaphore-chan, :canceled-chan canceled-chan})))))
;; QP should run if semaphore-chan gets a permit. Permit should be closed after QP finishes.
(expect
{:result ::result, :permit-taken? true, :permit-closed? true}
......
......@@ -6,7 +6,7 @@
[metabase.test.util :as tu]
[metabase.test.util.log :as tu.log]))
(deftype FakePreparedStatement [called-cancel?]
(defrecord ^:private FakePreparedStatement [called-cancel?]
java.sql.PreparedStatement
(closeOnCompletion [_]) ; no-op
(cancel [_] (deliver called-cancel? true))
......@@ -59,7 +59,7 @@
:on-fake-prepared-statement
(fn []
(deliver called-query? true)
(deref pause-query)))]
@pause-query))]
(query-thunk))
(catch Throwable e
(throw e)))))))))
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment