Skip to content
Snippets Groups Projects
Unverified Commit 3aeac641 authored by Jeff Evans's avatar Jeff Evans Committed by GitHub
Browse files

Support query cancellation in new BigQuery driver (#17959)

Support query cancellation in new BigQuery driver

Remove now unused 60 second timeout

Remove a bunch of unneeded code (`with-finished-response`) from the legacy driver

Add support for canceling query, which stops pagination of results

Add test to confirm pagination stops upon cancellation

Add test to confirm that max rows enforcement via middleware prevents unnecessary pages from being fetched
parent e157821d
No related branches found
No related tags found
No related merge requests found
(ns metabase.driver.bigquery-cloud-sdk
(:require [clojure.set :as set]
(:require [clojure.core.async :as a]
[clojure.set :as set]
[clojure.string :as str]
[clojure.tools.logging :as log]
[medley.core :as m]
......@@ -7,6 +8,7 @@
[metabase.driver.bigquery-cloud-sdk.common :as bigquery.common]
[metabase.driver.bigquery-cloud-sdk.params :as bigquery.params]
[metabase.driver.bigquery-cloud-sdk.query-processor :as bigquery.qp]
[metabase.query-processor.context :as context]
[metabase.query-processor.error-type :as error-type]
[metabase.query-processor.store :as qp.store]
[metabase.query-processor.timezone :as qp.timezone]
......@@ -17,9 +19,8 @@
[schema.core :as s])
(:import com.google.auth.oauth2.ServiceAccountCredentials
[com.google.cloud.bigquery BigQuery BigQuery$DatasetOption BigQuery$JobOption BigQuery$TableListOption
BigQuery$TableOption BigQueryException BigQueryOptions DatasetId EmptyTableResult
Field Field$Mode FieldValue FieldValueList QueryJobConfiguration Schema Table
TableId TableResult]
BigQuery$TableOption BigQueryException BigQueryOptions DatasetId Field Field$Mode FieldValue
FieldValueList QueryJobConfiguration Schema Table TableId TableResult]
java.io.ByteArrayInputStream
java.util.Collections))
......@@ -38,8 +39,7 @@
(defn- database->service-account-credential
"Returns a `ServiceAccountCredentials` (not scoped) for the given DB, from its service account JSON."
^ServiceAccountCredentials [{{:keys [^String service-account-json]} :details
:as db}]
^ServiceAccountCredentials [{{:keys [^String service-account-json]} :details, :as db}]
{:pre [(map? db) (seq service-account-json)]}
(ServiceAccountCredentials/fromStream (ByteArrayInputStream. (.getBytes service-account-json))))
......@@ -136,82 +136,51 @@
;;; | Running Queries |
;;; +----------------------------------------------------------------------------------------------------------------+
(def ^:private ^:dynamic ^Integer *query-timeout-seconds* 60)
(def ^:private ^:dynamic ^Long *max-results-per-page*
"Maximum number of rows to return per page in a query."
20000)
(def ^:private ^:dynamic ^Long *page-size*
"Maximum number of rows to return per page in a query. Leave unset (i.e. falling to the library default) by default,
but override for testing."
nil)
(def ^:private ^:dynamic *page-callback*
"Callback to execute when a new page is retrieved, used for testing"
nil)
(defprotocol ^:private GetJobComplete
"A Clojure protocol for the .getJobComplete method on disparate Google BigQuery results"
(^:private job-complete? [this] "Call .getJobComplete on a BigQuery API response"))
(extend-protocol GetJobComplete
com.google.api.services.bigquery.model.QueryResponse
(job-complete? [this] (.getJobComplete ^com.google.api.services.bigquery.model.QueryResponse this))
com.google.api.services.bigquery.model.GetQueryResultsResponse
(job-complete? [this] (.getJobComplete ^com.google.api.services.bigquery.model.GetQueryResultsResponse this))
TableResult
(job-complete? [_] true)
EmptyTableResult
(job-complete? [_] true))
(defn do-with-finished-response
"Impl for `with-finished-response`."
{:style/indent 1}
[response f]
;; 99% of the time by the time this is called `.getJobComplete` will return `true`. On the off chance it doesn't,
;; wait a few seconds for the job to finish.
(loop [remaining-timeout (double *query-timeout-seconds*)]
(cond
(job-complete? response)
(f response)
(pos? remaining-timeout)
(do
(Thread/sleep 250)
(recur (- remaining-timeout 0.25)))
:else
(throw (ex-info "Query timed out." (into {} response))))))
(defmacro with-finished-response
"Exeecute `body` with after waiting for `response` to complete. Throws exception if response does not complete before
`query-timeout-seconds`.
(with-finished-response [response (execute-bigquery ...)]
...)"
[[response-binding response] & body]
`(do-with-finished-response
~response
(fn [~response-binding]
~@body)))
(defn- throw-invalid-query [e sql parameters]
(throw (ex-info (tru "Error executing query")
{:type error-type/invalid-query, :sql sql, :parameters parameters}
e)))
(defn- ^TableResult execute-bigquery
[^BigQuery client ^String sql parameters]
[^BigQuery client ^String sql parameters cancel-chan cancel-requested?]
{:pre [client (not (str/blank? sql))]}
(try
(let [request (doto (QueryJobConfiguration/newBuilder sql)
(.setJobTimeoutMs (if (> *query-timeout-seconds* 0)
(* *query-timeout-seconds* 1000)
nil))
;; if the query contains a `#legacySQL` directive then use legacy SQL instead of standard SQL
(.setUseLegacySql (str/includes? (str/lower-case sql) "#legacysql"))
(bigquery.params/set-parameters! parameters)
(.setMaxResults *max-results-per-page*))]
(.query client (.build request) (u/varargs BigQuery$JobOption)))
(let [request (doto (QueryJobConfiguration/newBuilder sql)
;; if the query contains a `#legacySQL` directive then use legacy SQL instead of standard SQL
(.setUseLegacySql (str/includes? (str/lower-case sql) "#legacysql"))
(bigquery.params/set-parameters! parameters)
;; .setMaxResults is very misleading; it's actually the page size, and it only takes
;; effect for RPC (a.k.a. "fast") calls
;; there is no equivalent of .setMaxRows on a JDBC Statement; we rely on our middleware to stop
;; realizing more rows as per the maximum result size
(.setMaxResults *page-size*))
;; as long as we don't set certain additional QueryJobConfiguration options, our queries *should* always be
;; following the fast query path (i.e. RPC)
;; check out com.google.cloud.bigquery.QueryRequestInfo.isFastQuerySupported for full details
res-fut (future (.query client (.build request) (u/varargs BigQuery$JobOption)))]
(when cancel-chan
(future ; this needs to run in a separate thread, because the <!! operation blocks forever
(when (a/<!! cancel-chan)
(log/debugf "Received a message on the cancel channel; attempting to stop the BigQuery query execution")
(reset! cancel-requested? true) ; signal the page iteration fn to stop
(if-not (or (future-cancelled? res-fut) (future-done? res-fut))
;; somehow, even the FIRST page hasn't come back yet (i.e. the .query call above), so cancel the future to
;; interrupt the thread waiting on that response to come back
;; unfortunately, with this particular overload of .query, we have no access to (nor the ability to control)
;; the jobId, so we have no way to use the BigQuery client to cancel any job that might be running
(future-cancel res-fut)
(if (future-done? res-fut) ; canceled received after it was finished; may as well return it
@res-fut)))))
@res-fut)
(catch BigQueryException e
(if (.isRetryable e)
(throw (ex-info (tru "BigQueryException executing query")
......@@ -222,18 +191,32 @@
(throw-invalid-query e sql parameters))))
(defn- ^TableResult execute-bigquery-on-db
[database sql parameters]
[database sql parameters cancel-chan cancel-requested?]
(execute-bigquery
(database->client database)
sql
parameters))
parameters
cancel-chan
cancel-requested?))
(defn- fetch-page [^TableResult response cancel-requested?]
(when response
(when *page-callback*
(*page-callback*))
(lazy-cat
(.getValues response)
(when (some? (.getNextPageToken response))
(if @cancel-requested?
(do (log/debug "Cancellation requested; terminating fetching of BigQuery pages")
[])
(fetch-page (.getNextPage response) cancel-requested?))))))
(defn- post-process-native
"Parse results of a BigQuery query. `respond` is the same function passed to
`metabase.driver/execute-reducible-query`, and has the signature
(respond results-metadata rows)"
[database respond ^TableResult resp]
[database respond ^TableResult resp cancel-requested?]
(let [^Schema schema
(.getSchema resp)
......@@ -252,29 +235,30 @@
(dissoc :database-type :database-position)))]
(respond
{:cols columns}
(letfn [(fetch-page [^TableResult response]
(when response
(when *page-callback*
(*page-callback*))
(lazy-cat
(.getValues response)
(when (some? (.getNextPageToken response))
(fetch-page (.getNextPage response))))))]
(for [^FieldValueList row (fetch-page resp)]
(for [[^FieldValue cell, parser] (partition 2 (interleave row parsers))]
(when-let [v (.getValue cell)]
;; There is a weird error where everything that *should* be NULL comes back as an Object.
;; See https://jira.talendforge.org/browse/TBD-1592
;; Everything else comes back as a String luckily so we can proceed normally.
(when-not (= (class v) Object)
(parser v)))))))))
(defn- process-native* [respond database sql parameters]
(for [^FieldValueList row (fetch-page resp cancel-requested?)]
(for [[^FieldValue cell, parser] (partition 2 (interleave row parsers))]
(when-let [v (.getValue cell)]
;; There is a weird error where everything that *should* be NULL comes back as an Object.
;; See https://jira.talendforge.org/browse/TBD-1592
;; Everything else comes back as a String luckily so we can proceed normally.
(when-not (= (class v) Object)
(parser v))))))))
(defn- process-native* [respond database sql parameters cancel-chan]
{:pre [(map? database) (map? (:details database))]}
;; automatically retry the query if it times out or otherwise fails. This is on top of the auto-retry added by
;; `execute`
(letfn [(thunk []
(post-process-native database respond (execute-bigquery-on-db database sql parameters)))]
(let [cancel-requested? (atom false)
thunk (fn []
(post-process-native database
respond
(execute-bigquery-on-db
database
sql
parameters
cancel-chan
cancel-requested?)
cancel-requested?))]
(try
(thunk)
(catch Throwable e
......@@ -289,16 +273,14 @@
"UTC"))
(defmethod driver/execute-reducible-query :bigquery-cloud-sdk
;; TODO - it doesn't actually cancel queries the way we'd expect
[_ {{sql :query, :keys [params]} :native, :as outer-query} _ respond]
[_ {{sql :query, :keys [params]} :native, :as outer-query} context respond]
(let [database (qp.store/database)]
(binding [bigquery.common/*bigquery-timezone-id* (effective-query-timezone-id database)]
(log/tracef "Running BigQuery query in %s timezone" bigquery.common/*bigquery-timezone-id*)
(let [sql (if (get-in database [:details :include-user-id-and-hash] true)
(str "-- " (qputil/query->remark :bigquery-cloud-sdk outer-query) "\n" sql)
sql)]
(process-native* respond database sql params)))))
(process-native* respond database sql params (context/canceled-chan context))))))
;;; +----------------------------------------------------------------------------------------------------------------+
;;; | Other Driver Method Impls |
......
......@@ -191,7 +191,7 @@
;; if I run a BigQuery query, does it get a remark added to it?
(defn- query->native [query]
(let [native-query (atom nil)]
(with-redefs [bigquery/process-native* (fn [_ _ sql _]
(with-redefs [bigquery/process-native* (fn [_ _ sql _ _]
(reset! native-query sql)
(throw (Exception. "Done.")))]
(u/ignore-exceptions
......
(ns metabase.driver.bigquery-cloud-sdk-test
(:require [clojure.test :refer :all]
(:require [clojure.core.async :as a]
[clojure.test :refer :all]
[clojure.tools.logging :as log]
[metabase.db.metadata-queries :as metadata-queries]
[metabase.driver :as driver]
[metabase.driver.bigquery-cloud-sdk :as bigquery]
......@@ -47,12 +49,8 @@
(testing "with pagination"
(let [pages-retrieved (atom 0)
page-callback (fn [] (swap! pages-retrieved inc))]
(with-bindings {#'bigquery/*max-results-per-page* 25
#'bigquery/*page-callback* page-callback
;; for this test, set timeout to 0 to prevent setting it
;; so that the "fast" query path can be used (so that the max-results-per-page actually takes
;; effect); see com.google.cloud.bigquery.QueryRequestInfo.isFastQuerySupported
#'bigquery/*query-timeout-seconds* 0}
(with-bindings {#'bigquery/*page-size* 25
#'bigquery/*page-callback* page-callback}
(let [actual (->> (metadata-queries/table-rows-sample (Table (mt/id :venues))
[(Field (mt/id :venues :id))
(Field (mt/id :venues :name))]
......@@ -222,17 +220,13 @@
(deftest return-errors-test
(mt/test-driver :bigquery-cloud-sdk
(testing "If a Query fails, we should return the error right away (#14918)"
(let [before-ms (System/currentTimeMillis)]
(is (thrown-with-msg?
clojure.lang.ExceptionInfo
#"Error executing query"
(qp/process-query
{:database (mt/id)
:type :native
:native {:query "SELECT abc FROM 123;"}})))
(testing "Should return the error *before* the query timeout"
(let [duration-ms (- (System/currentTimeMillis) before-ms)]
(is (< duration-ms (u/seconds->ms @#'bigquery/*query-timeout-seconds*)))))))))
(is (thrown-with-msg?
clojure.lang.ExceptionInfo
#"Error executing query"
(qp/process-query
{:database (mt/id)
:type :native
:native {:query "SELECT abc FROM 123;"}}))))))
(deftest project-id-override-test
(mt/test-driver :bigquery-cloud-sdk
......@@ -336,13 +330,43 @@
(let [fake-execute-called (atom false)
orig-fn @#'bigquery/execute-bigquery]
(testing "Retry functionality works as expected"
(with-redefs [bigquery/execute-bigquery (fn [^BigQuery client ^String sql parameters]
(with-redefs [bigquery/execute-bigquery (fn [^BigQuery client ^String sql parameters _ _]
(if-not @fake-execute-called
(do (reset! fake-execute-called true)
;; simulate a transient error being thrown
(throw (ex-info "Transient error" {:retryable? true})))
(orig-fn client sql parameters)))]
(orig-fn client sql parameters nil nil)))]
;; run any other test that requires a successful query execution
(table-rows-sample-test)
;; make sure that the fake exception was thrown, and thus the query execution was retried
(is (true? @fake-execute-called)))))))
(deftest query-cancel-test
(mt/test-driver :bigquery-cloud-sdk
(testing "BigQuery queries can be canceled successfully"
(mt/with-open-channels [canceled-chan (a/promise-chan)]
(binding [bigquery/*page-size* 1000 ; set a relatively small pageSize
bigquery/*page-callback* (fn []
(log/debug "*page-callback* called, sending cancel message")
(a/>!! canceled-chan ::cancel))]
(mt/dataset sample-dataset
(let [rows (mt/rows (mt/process-query (mt/query orders) {:canceled-chan canceled-chan}))
row-count (count rows)]
(log/debugf "Loaded %d rows before BigQuery query was canceled" row-count)
(testing "Somewhere between 0 and the size of the orders table rows were loaded before cancellation"
(is (< 0 row-count 10000))))))))))
(deftest global-max-rows-test
(mt/test-driver :bigquery-cloud-sdk
(testing "The limit middleware prevents us from fetching more pages than are necessary to fulfill query max-rows"
(mt/with-open-channels [canceled-chan (a/promise-chan)]
(let [page-size 100
max-rows 1000
num-page-callbacks (atom 0)]
(binding [bigquery/*page-size* page-size
bigquery/*page-callback* (fn []
(swap! num-page-callbacks inc))]
(mt/dataset sample-dataset
(let [rows (mt/rows (mt/process-query (mt/query orders {:query {:limit max-rows}})))]
(is (= max-rows (count rows)))
(is (= (/ max-rows page-size) @num-page-callbacks))))))))))
......@@ -86,8 +86,7 @@
(let [sql (apply format format-string args)]
(printf "[BigQuery] %s\n" sql)
(flush)
(bigquery/with-finished-response [response (#'bigquery/execute-bigquery-on-db (data/db) sql nil)]
response))))
(#'bigquery/execute-bigquery-on-db (data/db) sql nil nil nil))))
(def ^:private valid-field-types
#{:BOOLEAN :DATE :DATETIME :FLOAT :INTEGER :NUMERIC :RECORD :STRING :TIME :TIMESTAMP})
......@@ -124,8 +123,8 @@
respond (fn [_ rows]
(ffirst rows))
client (bigquery)
^TableResult query-response (#'bigquery/execute-bigquery client sql [])]
(#'bigquery/post-process-native (test-db-details) respond query-response)))
^TableResult query-response (#'bigquery/execute-bigquery client sql [] nil nil)]
(#'bigquery/post-process-native (test-db-details) respond query-response (atom false))))
(defprotocol ^:private Insertable
(^:private ->insertable [this]
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment