diff --git a/modules/drivers/bigquery-cloud-sdk/src/metabase/driver/bigquery_cloud_sdk.clj b/modules/drivers/bigquery-cloud-sdk/src/metabase/driver/bigquery_cloud_sdk.clj index 8e8c6692132d3d5ce86a1da2160b0f989b59279e..75330103412ce30f651f9e3c780fe661e82d93d7 100644 --- a/modules/drivers/bigquery-cloud-sdk/src/metabase/driver/bigquery_cloud_sdk.clj +++ b/modules/drivers/bigquery-cloud-sdk/src/metabase/driver/bigquery_cloud_sdk.clj @@ -1,5 +1,6 @@ (ns metabase.driver.bigquery-cloud-sdk - (:require [clojure.set :as set] + (:require [clojure.core.async :as a] + [clojure.set :as set] [clojure.string :as str] [clojure.tools.logging :as log] [medley.core :as m] @@ -7,6 +8,7 @@ [metabase.driver.bigquery-cloud-sdk.common :as bigquery.common] [metabase.driver.bigquery-cloud-sdk.params :as bigquery.params] [metabase.driver.bigquery-cloud-sdk.query-processor :as bigquery.qp] + [metabase.query-processor.context :as context] [metabase.query-processor.error-type :as error-type] [metabase.query-processor.store :as qp.store] [metabase.query-processor.timezone :as qp.timezone] @@ -17,9 +19,8 @@ [schema.core :as s]) (:import com.google.auth.oauth2.ServiceAccountCredentials [com.google.cloud.bigquery BigQuery BigQuery$DatasetOption BigQuery$JobOption BigQuery$TableListOption - BigQuery$TableOption BigQueryException BigQueryOptions DatasetId EmptyTableResult - Field Field$Mode FieldValue FieldValueList QueryJobConfiguration Schema Table - TableId TableResult] + BigQuery$TableOption BigQueryException BigQueryOptions DatasetId Field Field$Mode FieldValue + FieldValueList QueryJobConfiguration Schema Table TableId TableResult] java.io.ByteArrayInputStream java.util.Collections)) @@ -38,8 +39,7 @@ (defn- database->service-account-credential "Returns a `ServiceAccountCredentials` (not scoped) for the given DB, from its service account JSON." - ^ServiceAccountCredentials [{{:keys [^String service-account-json]} :details - :as db}] + ^ServiceAccountCredentials [{{:keys [^String service-account-json]} :details, :as db}] {:pre [(map? db) (seq service-account-json)]} (ServiceAccountCredentials/fromStream (ByteArrayInputStream. (.getBytes service-account-json)))) @@ -136,82 +136,51 @@ ;;; | Running Queries | ;;; +----------------------------------------------------------------------------------------------------------------+ -(def ^:private ^:dynamic ^Integer *query-timeout-seconds* 60) - -(def ^:private ^:dynamic ^Long *max-results-per-page* - "Maximum number of rows to return per page in a query." - 20000) +(def ^:private ^:dynamic ^Long *page-size* + "Maximum number of rows to return per page in a query. Leave unset (i.e. falling to the library default) by default, + but override for testing." + nil) (def ^:private ^:dynamic *page-callback* "Callback to execute when a new page is retrieved, used for testing" nil) -(defprotocol ^:private GetJobComplete - "A Clojure protocol for the .getJobComplete method on disparate Google BigQuery results" - (^:private job-complete? [this] "Call .getJobComplete on a BigQuery API response")) - -(extend-protocol GetJobComplete - com.google.api.services.bigquery.model.QueryResponse - (job-complete? [this] (.getJobComplete ^com.google.api.services.bigquery.model.QueryResponse this)) - - com.google.api.services.bigquery.model.GetQueryResultsResponse - (job-complete? [this] (.getJobComplete ^com.google.api.services.bigquery.model.GetQueryResultsResponse this)) - - TableResult - (job-complete? [_] true) - - EmptyTableResult - (job-complete? [_] true)) - -(defn do-with-finished-response - "Impl for `with-finished-response`." - {:style/indent 1} - [response f] - ;; 99% of the time by the time this is called `.getJobComplete` will return `true`. On the off chance it doesn't, - ;; wait a few seconds for the job to finish. - (loop [remaining-timeout (double *query-timeout-seconds*)] - (cond - (job-complete? response) - (f response) - - (pos? remaining-timeout) - (do - (Thread/sleep 250) - (recur (- remaining-timeout 0.25))) - - :else - (throw (ex-info "Query timed out." (into {} response)))))) - -(defmacro with-finished-response - "Exeecute `body` with after waiting for `response` to complete. Throws exception if response does not complete before - `query-timeout-seconds`. - - (with-finished-response [response (execute-bigquery ...)] - ...)" - [[response-binding response] & body] - `(do-with-finished-response - ~response - (fn [~response-binding] - ~@body))) - (defn- throw-invalid-query [e sql parameters] (throw (ex-info (tru "Error executing query") {:type error-type/invalid-query, :sql sql, :parameters parameters} e))) (defn- ^TableResult execute-bigquery - [^BigQuery client ^String sql parameters] + [^BigQuery client ^String sql parameters cancel-chan cancel-requested?] {:pre [client (not (str/blank? sql))]} (try - (let [request (doto (QueryJobConfiguration/newBuilder sql) - (.setJobTimeoutMs (if (> *query-timeout-seconds* 0) - (* *query-timeout-seconds* 1000) - nil)) - ;; if the query contains a `#legacySQL` directive then use legacy SQL instead of standard SQL - (.setUseLegacySql (str/includes? (str/lower-case sql) "#legacysql")) - (bigquery.params/set-parameters! parameters) - (.setMaxResults *max-results-per-page*))] - (.query client (.build request) (u/varargs BigQuery$JobOption))) + (let [request (doto (QueryJobConfiguration/newBuilder sql) + ;; if the query contains a `#legacySQL` directive then use legacy SQL instead of standard SQL + (.setUseLegacySql (str/includes? (str/lower-case sql) "#legacysql")) + (bigquery.params/set-parameters! parameters) + ;; .setMaxResults is very misleading; it's actually the page size, and it only takes + ;; effect for RPC (a.k.a. "fast") calls + ;; there is no equivalent of .setMaxRows on a JDBC Statement; we rely on our middleware to stop + ;; realizing more rows as per the maximum result size + (.setMaxResults *page-size*)) + ;; as long as we don't set certain additional QueryJobConfiguration options, our queries *should* always be + ;; following the fast query path (i.e. RPC) + ;; check out com.google.cloud.bigquery.QueryRequestInfo.isFastQuerySupported for full details + res-fut (future (.query client (.build request) (u/varargs BigQuery$JobOption)))] + (when cancel-chan + (future ; this needs to run in a separate thread, because the <!! operation blocks forever + (when (a/<!! cancel-chan) + (log/debugf "Received a message on the cancel channel; attempting to stop the BigQuery query execution") + (reset! cancel-requested? true) ; signal the page iteration fn to stop + (if-not (or (future-cancelled? res-fut) (future-done? res-fut)) + ;; somehow, even the FIRST page hasn't come back yet (i.e. the .query call above), so cancel the future to + ;; interrupt the thread waiting on that response to come back + ;; unfortunately, with this particular overload of .query, we have no access to (nor the ability to control) + ;; the jobId, so we have no way to use the BigQuery client to cancel any job that might be running + (future-cancel res-fut) + (if (future-done? res-fut) ; canceled received after it was finished; may as well return it + @res-fut))))) + @res-fut) (catch BigQueryException e (if (.isRetryable e) (throw (ex-info (tru "BigQueryException executing query") @@ -222,18 +191,32 @@ (throw-invalid-query e sql parameters)))) (defn- ^TableResult execute-bigquery-on-db - [database sql parameters] + [database sql parameters cancel-chan cancel-requested?] (execute-bigquery (database->client database) sql - parameters)) + parameters + cancel-chan + cancel-requested?)) + +(defn- fetch-page [^TableResult response cancel-requested?] + (when response + (when *page-callback* + (*page-callback*)) + (lazy-cat + (.getValues response) + (when (some? (.getNextPageToken response)) + (if @cancel-requested? + (do (log/debug "Cancellation requested; terminating fetching of BigQuery pages") + []) + (fetch-page (.getNextPage response) cancel-requested?)))))) (defn- post-process-native "Parse results of a BigQuery query. `respond` is the same function passed to `metabase.driver/execute-reducible-query`, and has the signature (respond results-metadata rows)" - [database respond ^TableResult resp] + [database respond ^TableResult resp cancel-requested?] (let [^Schema schema (.getSchema resp) @@ -252,29 +235,30 @@ (dissoc :database-type :database-position)))] (respond {:cols columns} - (letfn [(fetch-page [^TableResult response] - (when response - (when *page-callback* - (*page-callback*)) - (lazy-cat - (.getValues response) - (when (some? (.getNextPageToken response)) - (fetch-page (.getNextPage response))))))] - (for [^FieldValueList row (fetch-page resp)] - (for [[^FieldValue cell, parser] (partition 2 (interleave row parsers))] - (when-let [v (.getValue cell)] - ;; There is a weird error where everything that *should* be NULL comes back as an Object. - ;; See https://jira.talendforge.org/browse/TBD-1592 - ;; Everything else comes back as a String luckily so we can proceed normally. - (when-not (= (class v) Object) - (parser v))))))))) - -(defn- process-native* [respond database sql parameters] + (for [^FieldValueList row (fetch-page resp cancel-requested?)] + (for [[^FieldValue cell, parser] (partition 2 (interleave row parsers))] + (when-let [v (.getValue cell)] + ;; There is a weird error where everything that *should* be NULL comes back as an Object. + ;; See https://jira.talendforge.org/browse/TBD-1592 + ;; Everything else comes back as a String luckily so we can proceed normally. + (when-not (= (class v) Object) + (parser v)))))))) + +(defn- process-native* [respond database sql parameters cancel-chan] {:pre [(map? database) (map? (:details database))]} ;; automatically retry the query if it times out or otherwise fails. This is on top of the auto-retry added by ;; `execute` - (letfn [(thunk [] - (post-process-native database respond (execute-bigquery-on-db database sql parameters)))] + (let [cancel-requested? (atom false) + thunk (fn [] + (post-process-native database + respond + (execute-bigquery-on-db + database + sql + parameters + cancel-chan + cancel-requested?) + cancel-requested?))] (try (thunk) (catch Throwable e @@ -289,16 +273,14 @@ "UTC")) (defmethod driver/execute-reducible-query :bigquery-cloud-sdk - ;; TODO - it doesn't actually cancel queries the way we'd expect - [_ {{sql :query, :keys [params]} :native, :as outer-query} _ respond] + [_ {{sql :query, :keys [params]} :native, :as outer-query} context respond] (let [database (qp.store/database)] (binding [bigquery.common/*bigquery-timezone-id* (effective-query-timezone-id database)] (log/tracef "Running BigQuery query in %s timezone" bigquery.common/*bigquery-timezone-id*) (let [sql (if (get-in database [:details :include-user-id-and-hash] true) (str "-- " (qputil/query->remark :bigquery-cloud-sdk outer-query) "\n" sql) sql)] - (process-native* respond database sql params))))) - + (process-native* respond database sql params (context/canceled-chan context)))))) ;;; +----------------------------------------------------------------------------------------------------------------+ ;;; | Other Driver Method Impls | diff --git a/modules/drivers/bigquery-cloud-sdk/test/metabase/driver/bigquery_cloud_sdk/query_processor_test.clj b/modules/drivers/bigquery-cloud-sdk/test/metabase/driver/bigquery_cloud_sdk/query_processor_test.clj index 9569934e64752ec854999f0a25f3ed06e00677be..d3405ab6bb7e98c6560dc9255d90ca8ceaa366c7 100644 --- a/modules/drivers/bigquery-cloud-sdk/test/metabase/driver/bigquery_cloud_sdk/query_processor_test.clj +++ b/modules/drivers/bigquery-cloud-sdk/test/metabase/driver/bigquery_cloud_sdk/query_processor_test.clj @@ -191,7 +191,7 @@ ;; if I run a BigQuery query, does it get a remark added to it? (defn- query->native [query] (let [native-query (atom nil)] - (with-redefs [bigquery/process-native* (fn [_ _ sql _] + (with-redefs [bigquery/process-native* (fn [_ _ sql _ _] (reset! native-query sql) (throw (Exception. "Done.")))] (u/ignore-exceptions diff --git a/modules/drivers/bigquery-cloud-sdk/test/metabase/driver/bigquery_cloud_sdk_test.clj b/modules/drivers/bigquery-cloud-sdk/test/metabase/driver/bigquery_cloud_sdk_test.clj index 4570f401dd2655e491460d4369b6db1d701b73da..556923f33bad105d05a39b29323fb03c063d4ca1 100644 --- a/modules/drivers/bigquery-cloud-sdk/test/metabase/driver/bigquery_cloud_sdk_test.clj +++ b/modules/drivers/bigquery-cloud-sdk/test/metabase/driver/bigquery_cloud_sdk_test.clj @@ -1,5 +1,7 @@ (ns metabase.driver.bigquery-cloud-sdk-test - (:require [clojure.test :refer :all] + (:require [clojure.core.async :as a] + [clojure.test :refer :all] + [clojure.tools.logging :as log] [metabase.db.metadata-queries :as metadata-queries] [metabase.driver :as driver] [metabase.driver.bigquery-cloud-sdk :as bigquery] @@ -47,12 +49,8 @@ (testing "with pagination" (let [pages-retrieved (atom 0) page-callback (fn [] (swap! pages-retrieved inc))] - (with-bindings {#'bigquery/*max-results-per-page* 25 - #'bigquery/*page-callback* page-callback - ;; for this test, set timeout to 0 to prevent setting it - ;; so that the "fast" query path can be used (so that the max-results-per-page actually takes - ;; effect); see com.google.cloud.bigquery.QueryRequestInfo.isFastQuerySupported - #'bigquery/*query-timeout-seconds* 0} + (with-bindings {#'bigquery/*page-size* 25 + #'bigquery/*page-callback* page-callback} (let [actual (->> (metadata-queries/table-rows-sample (Table (mt/id :venues)) [(Field (mt/id :venues :id)) (Field (mt/id :venues :name))] @@ -222,17 +220,13 @@ (deftest return-errors-test (mt/test-driver :bigquery-cloud-sdk (testing "If a Query fails, we should return the error right away (#14918)" - (let [before-ms (System/currentTimeMillis)] - (is (thrown-with-msg? - clojure.lang.ExceptionInfo - #"Error executing query" - (qp/process-query - {:database (mt/id) - :type :native - :native {:query "SELECT abc FROM 123;"}}))) - (testing "Should return the error *before* the query timeout" - (let [duration-ms (- (System/currentTimeMillis) before-ms)] - (is (< duration-ms (u/seconds->ms @#'bigquery/*query-timeout-seconds*))))))))) + (is (thrown-with-msg? + clojure.lang.ExceptionInfo + #"Error executing query" + (qp/process-query + {:database (mt/id) + :type :native + :native {:query "SELECT abc FROM 123;"}})))))) (deftest project-id-override-test (mt/test-driver :bigquery-cloud-sdk @@ -336,13 +330,43 @@ (let [fake-execute-called (atom false) orig-fn @#'bigquery/execute-bigquery] (testing "Retry functionality works as expected" - (with-redefs [bigquery/execute-bigquery (fn [^BigQuery client ^String sql parameters] + (with-redefs [bigquery/execute-bigquery (fn [^BigQuery client ^String sql parameters _ _] (if-not @fake-execute-called (do (reset! fake-execute-called true) ;; simulate a transient error being thrown (throw (ex-info "Transient error" {:retryable? true}))) - (orig-fn client sql parameters)))] + (orig-fn client sql parameters nil nil)))] ;; run any other test that requires a successful query execution (table-rows-sample-test) ;; make sure that the fake exception was thrown, and thus the query execution was retried (is (true? @fake-execute-called))))))) + +(deftest query-cancel-test + (mt/test-driver :bigquery-cloud-sdk + (testing "BigQuery queries can be canceled successfully" + (mt/with-open-channels [canceled-chan (a/promise-chan)] + (binding [bigquery/*page-size* 1000 ; set a relatively small pageSize + bigquery/*page-callback* (fn [] + (log/debug "*page-callback* called, sending cancel message") + (a/>!! canceled-chan ::cancel))] + (mt/dataset sample-dataset + (let [rows (mt/rows (mt/process-query (mt/query orders) {:canceled-chan canceled-chan})) + row-count (count rows)] + (log/debugf "Loaded %d rows before BigQuery query was canceled" row-count) + (testing "Somewhere between 0 and the size of the orders table rows were loaded before cancellation" + (is (< 0 row-count 10000)))))))))) + +(deftest global-max-rows-test + (mt/test-driver :bigquery-cloud-sdk + (testing "The limit middleware prevents us from fetching more pages than are necessary to fulfill query max-rows" + (mt/with-open-channels [canceled-chan (a/promise-chan)] + (let [page-size 100 + max-rows 1000 + num-page-callbacks (atom 0)] + (binding [bigquery/*page-size* page-size + bigquery/*page-callback* (fn [] + (swap! num-page-callbacks inc))] + (mt/dataset sample-dataset + (let [rows (mt/rows (mt/process-query (mt/query orders {:query {:limit max-rows}})))] + (is (= max-rows (count rows))) + (is (= (/ max-rows page-size) @num-page-callbacks)))))))))) diff --git a/modules/drivers/bigquery-cloud-sdk/test/metabase/test/data/bigquery_cloud_sdk.clj b/modules/drivers/bigquery-cloud-sdk/test/metabase/test/data/bigquery_cloud_sdk.clj index 8116bec22072662d8df236e366a98978588fb0da..fa93f5b8c1662f2a774396ba4721b6d2efe0b1da 100644 --- a/modules/drivers/bigquery-cloud-sdk/test/metabase/test/data/bigquery_cloud_sdk.clj +++ b/modules/drivers/bigquery-cloud-sdk/test/metabase/test/data/bigquery_cloud_sdk.clj @@ -86,8 +86,7 @@ (let [sql (apply format format-string args)] (printf "[BigQuery] %s\n" sql) (flush) - (bigquery/with-finished-response [response (#'bigquery/execute-bigquery-on-db (data/db) sql nil)] - response)))) + (#'bigquery/execute-bigquery-on-db (data/db) sql nil nil nil)))) (def ^:private valid-field-types #{:BOOLEAN :DATE :DATETIME :FLOAT :INTEGER :NUMERIC :RECORD :STRING :TIME :TIMESTAMP}) @@ -124,8 +123,8 @@ respond (fn [_ rows] (ffirst rows)) client (bigquery) - ^TableResult query-response (#'bigquery/execute-bigquery client sql [])] - (#'bigquery/post-process-native (test-db-details) respond query-response))) + ^TableResult query-response (#'bigquery/execute-bigquery client sql [] nil nil)] + (#'bigquery/post-process-native (test-db-details) respond query-response (atom false)))) (defprotocol ^:private Insertable (^:private ->insertable [this]