Skip to content
Snippets Groups Projects
Commit 5c9a3a50 authored by Cam Saül's avatar Cam Saül
Browse files

Merge pull request #2248 from metabase/bigquery-ci-fix

Automatically retry a BigQuery query if it times out :unamused:
parents dbd9ed87 079d599d
No related branches found
No related tags found
No related merge requests found
...@@ -172,33 +172,46 @@ ...@@ -172,33 +172,46 @@
"STRING" identity "STRING" identity
"TIMESTAMP" parse-timestamp-str}) "TIMESTAMP" parse-timestamp-str})
(def ^:private ^:const query-timeout-error-message "Query timed out.")
(defn- post-process-native (defn- post-process-native
;; 99% of the time by the time this is called `.getJobComplete` will return `true`. On the off chance it doesn't, wait a few seconds for the job to finish.
([^QueryResponse response] ([^QueryResponse response]
(post-process-native response 10)) ; wait up to 10 seconds for `.getJobComplete` to return `true` (post-process-native response 15))
([^QueryResponse response, ^Integer timeout-seconds] ([^QueryResponse response, ^Integer timeout-seconds]
(when-not (.getJobComplete response) (if-not (.getJobComplete response)
(when (zero? timeout-seconds) ; if we've ran out of wait time throw an exception ;; 99% of the time by the time this is called `.getJobComplete` will return `true`. On the off chance it doesn't, wait a few seconds for the job to finish.
(throw (Exception. (str "Query timed out: " (or (.getErrors response) (do
response))))) (when (zero? timeout-seconds)
(Thread/sleep 1000) ; otherwise sleep a second, try again, and decrement the remaining `timeout-seconds` (throw (ex-info query-timeout-error-message response)))
(post-process-native response (dec timeout-seconds))) (Thread/sleep 1000)
(let [^TableSchema schema (.getSchema response) (post-process-native response (dec timeout-seconds)))
parsers (for [^TableFieldSchema field (.getFields schema)] ;; Otherwise the job *is* complete
(type->parser (.getType field))) (let [^TableSchema schema (.getSchema response)
cols (table-schema->metabase-field-info schema)] parsers (for [^TableFieldSchema field (.getFields schema)]
{:columns (map :name cols) (type->parser (.getType field)))
:cols cols cols (table-schema->metabase-field-info schema)]
:rows (for [^TableRow row (.getRows response)] {:columns (map :name cols)
(for [[^TableCell cell, parser] (partition 2 (interleave (.getF row) parsers))] :cols cols
(when-let [v (.getV cell)] :rows (for [^TableRow row (.getRows response)]
;; There is a weird error where everything that *should* be NULL comes back as an Object. See https://jira.talendforge.org/browse/TBD-1592 (for [[^TableCell cell, parser] (partition 2 (interleave (.getF row) parsers))]
;; Everything else comes back as a String luckily so we can proceed normally. (when-let [v (.getV cell)]
(when-not (= (class v) Object) ;; There is a weird error where everything that *should* be NULL comes back as an Object. See https://jira.talendforge.org/browse/TBD-1592
(parser v)))))}))) ;; Everything else comes back as a String luckily so we can proceed normally.
(when-not (= (class v) Object)
(parser v)))))}))))
(defn- process-native* [database query-string] (defn- process-native* [database query-string]
(post-process-native (execute-query database query-string))) ;; Automatically retry the query a single time if it timed out
(let [do-query (fn [] (post-process-native (execute-query database query-string)))]
(try
(do-query)
(catch clojure.lang.ExceptionInfo e
(if (= (.getMessage e) query-timeout-error-message)
;; If this was a timeout error, retry
(do (log/info "Query timed out, retrying...")
(do-query))
;; Otherwise re-throw exception
(throw e))))))
(defn- process-native [{database-id :database, {native-query :query} :native}] (defn- process-native [{database-id :database, {native-query :query} :native}]
(process-native* (Database database-id) native-query)) (process-native* (Database database-id) native-query))
...@@ -304,7 +317,7 @@ ...@@ -304,7 +317,7 @@
(k/as-sql korma-form)) (k/as-sql korma-form))
#"\]\.\[" ".") #"\]\.\[" ".")
(catch Throwable e (catch Throwable e
(println (u/format-color 'red "Couldn't convert korma form to SQL:\n%s" (sqlqp/pprint-korma-form korma-form))) (log/error (u/format-color 'red "Couldn't convert korma form to SQL:\n%s" (sqlqp/pprint-korma-form korma-form)))
(throw e)))) (throw e))))
(defn- post-process-structured [dataset-id table-name {:keys [columns rows]}] (defn- post-process-structured [dataset-id table-name {:keys [columns rows]}]
...@@ -342,7 +355,7 @@ ...@@ -342,7 +355,7 @@
{:pre [(map? this) (or field {:pre [(map? this) (or field
index index
(and (seq schema-name) (seq field-name) (seq table-name)) (and (seq schema-name) (seq field-name) (seq table-name))
(println "Don't know how to alias: " this))]} (log/error "Don't know how to alias: " this))]}
(cond (cond
field (recur field) ; DateTimeField field (recur field) ; DateTimeField
index (name (let [{{{ag-type :aggregation-type} :aggregation} :query} sqlqp/*query*] index (name (let [{{{ag-type :aggregation-type} :aggregation} :query} sqlqp/*query*]
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment