Skip to content
Snippets Groups Projects
Unverified Commit 40a3d6e7 authored by john-metabase, committed by GitHub
Browse files

Always create a transient dataset for BigQuery tests (#31605)


Resolves #28792 

* Always create a transient dataset for BigQuery tests

* Simplifies BigQuery test data code

* More BigQuery test data code cleanup

* More BigQuery test data code cleanup, test fix

* Extracts test-db-name in bigquery tests, restores normalize-name

* Fixes BigQuery QP tests for new test DB name scheme

* Removes extraneous clojure.core

* Updates bigquery QP test helper code style

* Cleans up comments and code in bigquery test data

* Update modules/drivers/bigquery-cloud-sdk/test/metabase/driver/bigquery_cloud_sdk/query_processor_test.clj

Co-authored-by: metamben <103100869+metamben@users.noreply.github.com>

---------

Co-authored-by: metamben <103100869+metamben@users.noreply.github.com>
parent 6297bbff
No related branches found
No related tags found
No related merge requests found
......@@ -23,16 +23,32 @@
[metabase.util.honeysql-extensions :as hx]
[toucan2.tools.with-temp :as t2.with-temp]))
;; Fully-qualified per-test-run dataset names. `normalize-name` with `:db`
;; prefixes "v3_" and appends a "__transient_<ns-load-time>" suffix, so each
;; JVM/test run gets its own unique BigQuery dataset.
(def ^:private test-db-name (bigquery.tx/normalize-name :db "test_data"))
(def ^:private sample-dataset-name (bigquery.tx/normalize-name :db "sample_dataset"))
(defn- with-test-db-name
  "Recursively walks `x`, replacing every occurrence of the literal dataset
  name \"v3_test_data\" with the full per-test-run DB name (`test-db-name`).

  Handles strings, symbols (round-tripped through a string), and recurses
  into maps (values only), vectors, and seqs. Any other value is returned
  unchanged."
  [x]
  (cond
    (string? x) (str/replace x "v3_test_data" test-db-name)
    (map? x)    (update-vals x with-test-db-name)
    (vector? x) (mapv with-test-db-name x)
    ;; `seq?` rather than `list?`: `list?` is false for lazy seqs and cons
    ;; cells (including the lazy seqs this function itself produces via
    ;; `map`), which would leave nested seq structures untouched.
    (seq? x)    (map with-test-db-name x)
    (symbol? x) (-> x str with-test-db-name symbol)
    :else       x))
(deftest native-query-test
(mt/test-driver :bigquery-cloud-sdk
(is (= [[100] [99]]
(mt/rows
(qp/process-query
(mt/native-query
{:query (str "SELECT `v3_test_data.venues`.`id` "
"FROM `v3_test_data.venues` "
"ORDER BY `v3_test_data.venues`.`id` DESC "
"LIMIT 2;")})))))
{:query (with-test-db-name
(str "SELECT `v3_test_data.venues`.`id` "
"FROM `v3_test_data.venues` "
"ORDER BY `v3_test_data.venues`.`id` DESC "
"LIMIT 2;"))})))))
(testing (str "make sure that BigQuery native queries maintain the column ordering specified in the SQL -- "
"post-processing ordering shouldn't apply (metabase#2821)")
......@@ -56,11 +72,12 @@
:field_ref [:field "checkins_id" {:base-type :type/Integer}]}]
(qp.test/cols
(qp/process-query
{:native {:query (str "SELECT `v3_test_data.checkins`.`venue_id` AS `venue_id`, "
" `v3_test_data.checkins`.`user_id` AS `user_id`, "
" `v3_test_data.checkins`.`id` AS `checkins_id` "
"FROM `v3_test_data.checkins` "
"LIMIT 2")}
{:native {:query (with-test-db-name
(str "SELECT `v3_test_data.checkins`.`venue_id` AS `venue_id`, "
" `v3_test_data.checkins`.`user_id` AS `user_id`, "
" `v3_test_data.checkins`.`id` AS `checkins_id` "
"FROM `v3_test_data.checkins` "
"LIMIT 2"))}
:type :native
:database (mt/id)})))))
......@@ -76,15 +93,16 @@
[]]]
(mt/rows
(qp/process-query
{:native {:query (str "SELECT ['foo', 'bar'], "
"[1, 2], "
"[3.14159265359, 0.5772156649], "
"[NUMERIC '1234', NUMERIC '5678'], "
"[DATE '2018-01-01', DATE '2018-12-31'], "
"[TIME '12:34:00.00', TIME '20:01:13.23'], "
"[DATETIME '1957-05-17 03:35:00.00', DATETIME '2018-06-01 01:15:34.12'], "
"[TIMESTAMP '2014-09-27 12:30:00.45-08', TIMESTAMP '2020-09-27 09:57:00.45-05'], "
"[]")}
{:native {:query (with-test-db-name
(str "SELECT ['foo', 'bar'], "
"[1, 2], "
"[3.14159265359, 0.5772156649], "
"[NUMERIC '1234', NUMERIC '5678'], "
"[DATE '2018-01-01', DATE '2018-12-31'], "
"[TIME '12:34:00.00', TIME '20:01:13.23'], "
"[DATETIME '1957-05-17 03:35:00.00', DATETIME '2018-06-01 01:15:34.12'], "
"[TIMESTAMP '2014-09-27 12:30:00.45-08', TIMESTAMP '2020-09-27 09:57:00.45-05'], "
"[]"))}
:type :native
:database (mt/id)})))))))
......@@ -109,16 +127,17 @@
rows))))
(testing "let's make sure we're generating correct HoneySQL + SQL for aggregations"
(is (sql= '{:select [v3_test_data.venues.price AS price
avg (v3_test_data.venues.category_id) AS avg]
:from [v3_test_data.venues]
:group-by [price]
:order-by [avg ASC
price ASC]}
(is (sql= (with-test-db-name
'{:select [v3_test_data.venues.price AS price
avg (v3_test_data.venues.category_id) AS avg]
:from [v3_test_data.venues]
:group-by [price]
:order-by [avg ASC
price ASC]})
(mt/mbql-query venues
{:aggregation [[:avg $category_id]]
:breakout [$price]
:order-by [[:asc [:aggregation 0]]]}))))))
{:aggregation [[:avg $category_id]]
:breakout [$price]
:order-by [[:asc [:aggregation 0]]]}))))))
(deftest join-alias-test
(mt/test-driver :bigquery-cloud-sdk
......@@ -129,13 +148,14 @@
(let [results (mt/run-mbql-query venues
{:aggregation [:count]
:breakout [$category_id->categories.name]})]
(is (= (str "SELECT `categories__via__category_id`.`name` AS `categories__via__category_id__name`,"
" count(*) AS `count` "
"FROM `v3_test_data.venues` "
"LEFT JOIN `v3_test_data.categories` `categories__via__category_id`"
" ON `v3_test_data.venues`.`category_id` = `categories__via__category_id`.`id` "
"GROUP BY `categories__via__category_id__name` "
"ORDER BY `categories__via__category_id__name` ASC")
(is (= (with-test-db-name
(str "SELECT `categories__via__category_id`.`name` AS `categories__via__category_id__name`,"
" count(*) AS `count` "
"FROM `v3_test_data.venues` "
"LEFT JOIN `v3_test_data.categories` `categories__via__category_id`"
" ON `v3_test_data.venues`.`category_id` = `categories__via__category_id`.`id` "
"GROUP BY `categories__via__category_id__name` "
"ORDER BY `categories__via__category_id__name` ASC"))
(get-in results [:data :native_form :query] results))))))))
(defn- native-timestamp-query [db-or-db-id timestamp-str timezone-str]
......@@ -183,17 +203,17 @@
(deftest remark-test
(mt/test-driver :bigquery-cloud-sdk
(is (= (str
"-- Metabase:: userID: 1000 queryType: MBQL queryHash: 01020304\n"
"SELECT"
" `v3_test_data.venues`.`id` AS `id`,"
" `v3_test_data.venues`.`name` AS `name`,"
" `v3_test_data.venues`.`category_id` AS `category_id`,"
" `v3_test_data.venues`.`latitude` AS `latitude`,"
" `v3_test_data.venues`.`longitude` AS `longitude`,"
" `v3_test_data.venues`.`price` AS `price` "
"FROM `v3_test_data.venues` "
"LIMIT 1")
(is (= (with-test-db-name
(str "-- Metabase:: userID: 1000 queryType: MBQL queryHash: 01020304\n"
"SELECT"
" `v3_test_data.venues`.`id` AS `id`,"
" `v3_test_data.venues`.`name` AS `name`,"
" `v3_test_data.venues`.`category_id` AS `category_id`,"
" `v3_test_data.venues`.`latitude` AS `latitude`,"
" `v3_test_data.venues`.`longitude` AS `longitude`,"
" `v3_test_data.venues`.`price` AS `price` "
"FROM `v3_test_data.venues` "
"LIMIT 1"))
(query->native
{:database (mt/id)
:type :query
......@@ -206,11 +226,11 @@
;; if I run a BigQuery query with include-user-id-and-hash set to false, does it get a remark added to it?
(deftest remove-remark-test
(mt/test-driver :bigquery-cloud-sdk
(is (= (str
"SELECT `v3_test_data.venues`.`id` AS `id`,"
" `v3_test_data.venues`.`name` AS `name` "
"FROM `v3_test_data.venues` "
"LIMIT 1")
(is (= (with-test-db-name
(str "SELECT `v3_test_data.venues`.`id` AS `id`,"
" `v3_test_data.venues`.`name` AS `name` "
"FROM `v3_test_data.venues` "
"LIMIT 1"))
(t2.with-temp/with-temp [Database db {:engine :bigquery-cloud-sdk
:details (assoc (:details (mt/db))
:include-user-id-and-hash false)}
......@@ -237,9 +257,10 @@
(qp.test/rows
(qp/process-query
(mt/native-query
{:query (str "SELECT `v3_test_data.venues`.`name` AS `name` "
"FROM `v3_test_data.venues` "
"WHERE `v3_test_data.venues`.`name` = ?")
{:query (with-test-db-name
(str "SELECT `v3_test_data.venues`.`name` AS `name` "
"FROM `v3_test_data.venues` "
"WHERE `v3_test_data.venues`.`name` = ?"))
:params ["Red Medicine"]}))))
(str "Do we properly unprepare, and can we execute, queries that still have parameters for one reason or "
"another? (EE #277)"))))
......@@ -367,7 +388,7 @@
:limit 1})
filter-clause (get-in query [:query :filter])]
(mt/with-everything-store
(is (= [(str "timestamp_millis(v3_sample_dataset.reviews.rating)"
(is (= [(str (format "timestamp_millis(%s.reviews.rating)" sample-dataset-name)
" = "
"timestamp_trunc(timestamp_add(current_timestamp(), INTERVAL -30 day), day)")]
(hsql/format-predicate (sql.qp/->honeysql :bigquery-cloud-sdk filter-clause)))))
......@@ -505,7 +526,7 @@
(t/local-date "2019-11-12")]))))
(mt/test-driver :bigquery-cloud-sdk
(mt/with-everything-store
(let [expected ["WHERE `v3_test_data.checkins`.`date` BETWEEN ? AND ?"
(let [expected [(with-test-db-name "WHERE `v3_test_data.checkins`.`date` BETWEEN ? AND ?")
(t/local-date "2019-11-11")
(t/local-date "2019-11-12")]]
(testing "Should be able to get temporal type from a `:field` with integer ID"
......@@ -515,7 +536,7 @@
(t/local-date "2019-11-11")
(t/local-date "2019-11-12")]))))
(testing "Should be able to get temporal type from a `:field` with `:temporal-unit`"
(is (= (cons "WHERE date_trunc(`v3_test_data.checkins`.`date`, day) BETWEEN ? AND ?"
(is (= (cons (with-test-db-name "WHERE date_trunc(`v3_test_data.checkins`.`date`, day) BETWEEN ? AND ?")
(rest expected))
(between->sql [:between
[:field (mt/id :checkins :date) {::add/source-table (mt/id :checkins)
......@@ -535,14 +556,16 @@
(mt/with-temp-copy-of-db
(try
(bigquery.tx/execute!
(format "CREATE TABLE `v3_test_data.%s` ( ts TIMESTAMP, dt DATETIME )" table-name))
(with-test-db-name
(format "CREATE TABLE `v3_test_data.%s` ( ts TIMESTAMP, dt DATETIME )" table-name)))
(bigquery.tx/execute!
(format "INSERT INTO `v3_test_data.%s` (ts, dt) VALUES (TIMESTAMP \"2020-01-01 00:00:00 UTC\", DATETIME \"2020-01-01 00:00:00\")"
table-name))
(with-test-db-name
(format "INSERT INTO `v3_test_data.%s` (ts, dt) VALUES (TIMESTAMP \"2020-01-01 00:00:00 UTC\", DATETIME \"2020-01-01 00:00:00\")"
table-name)))
(sync/sync-database! (mt/db))
(f table-name)
(finally
(bigquery.tx/execute! "DROP TABLE IF EXISTS `v3_test_data.%s`" table-name)))))))
(bigquery.tx/execute! (with-test-db-name "DROP TABLE IF EXISTS `v3_test_data.%s`") table-name)))))))
(deftest filter-by-datetime-timestamp-test
(mt/test-driver :bigquery-cloud-sdk
......@@ -581,7 +604,8 @@
(let [query {:database (mt/id)
:type :native
:native {:query (str "SELECT count(*)\n"
"FROM `v3_attempted_murders.attempts`\n"
(format "FROM `%s.attempts`\n"
(bigquery.tx/normalize-name :db "attempted_murders"))
"WHERE {{d}}")
:template-tags {"d" {:name "d"
:display-name "Date"
......@@ -800,9 +824,10 @@
(mt/formatted-rows [int]
(qp/process-query
(mt/native-query
{:query (str "SELECT count(*) AS `count` "
"FROM `v3_test_data.venues` "
"WHERE `v3_test_data.venues`.`name` = ?")
{:query (with-test-db-name
(str "SELECT count(*) AS `count` "
"FROM `v3_test_data.venues` "
"WHERE `v3_test_data.venues`.`name` = ?"))
:params ["x\\\\' OR 1 = 1 -- "]})))))))))
(deftest escape-alias-test
......@@ -837,12 +862,13 @@
{:fields [$id $venue_id->venues.name]
:limit 1})]
(mt/with-temp-vals-in-db Table (mt/id :venues) {:name "Organização"}
(is (sql= '{:select [v3_test_data.checkins.id AS id
Organizacao__via__venue_id.name AS Organizacao__via__venue_id__name]
:from [v3_test_data.checkins]
:left-join [v3_test_data.Organização Organizacao__via__venue_id
ON v3_test_data.checkins.venue_id = Organizacao__via__venue_id.id]
:limit [1]}
(is (sql= (with-test-db-name
'{:select [v3_test_data.checkins.id AS id
Organizacao__via__venue_id.name AS Organizacao__via__venue_id__name]
:from [v3_test_data.checkins]
:left-join [v3_test_data.Organização Organizacao__via__venue_id
ON v3_test_data.checkins.venue_id = Organizacao__via__venue_id.id]
:limit [1]})
query))))))))
(deftest multiple-template-parameters-test
......@@ -894,9 +920,9 @@
(is (sql= {:select '[source.count AS count
count (*) AS count_2]
:from [(let [prefix (project-id-prefix-if-set)]
{:select ['date_trunc (list (symbol (str prefix 'v3_test_data.checkins.date)) 'month) 'AS 'date
'count '(*) 'AS 'count]
:from [(symbol (str prefix 'v3_test_data.checkins))]
{:select ['date_trunc (list (symbol (str prefix test-db-name ".checkins.date")) 'month) 'AS 'date
'count '(*) 'AS 'count]
:from [(symbol (str prefix test-db-name ".checkins"))]
:group-by '[date]
:order-by '[date ASC]})
'source]
......@@ -915,8 +941,10 @@
(is (= {:mbql? true
:params nil
:table-name "orders"
:query (str "SELECT APPROX_QUANTILES(`v3_sample_dataset.orders`.`quantity`, 10)[OFFSET(5)] AS `CE`"
" FROM `v3_sample_dataset.orders` LIMIT 10")}
:query (format
(str "SELECT APPROX_QUANTILES(`%s.orders`.`quantity`, 10)[OFFSET(5)] AS `CE`"
" FROM `%s.orders` LIMIT 10")
sample-dataset-name sample-dataset-name)}
(qp/compile (mt/mbql-query orders
{:aggregation [[:aggregation-options
[:percentile $orders.quantity 0.5]
......
......@@ -24,6 +24,8 @@
(set! *warn-on-reflection* true)
(def ^:private test-db-name (bigquery.tx/normalize-name :db "test_data"))
(deftest can-connect?-test
(mt/test-driver :bigquery-cloud-sdk
(let [db-details (:details (mt/db))
......@@ -171,17 +173,20 @@
(defmacro with-view [[view-name-binding] & body]
`(do-with-temp-obj "view_%s"
(fn [view-nm#] [(str "CREATE VIEW `v3_test_data.%s` AS "
(fn [view-nm#] [(str "CREATE VIEW `%s.%s` AS "
"SELECT v.id AS id, v.name AS venue_name, c.name AS category_name "
"FROM `%s.v3_test_data.venues` v "
"LEFT JOIN `%s.v3_test_data.categories` c "
"FROM `%s.%s.venues` v "
"LEFT JOIN `%s.%s.categories` c "
"ON v.category_id = c.id "
"ORDER BY v.id ASC "
"LIMIT 3")
~test-db-name
view-nm#
(bigquery.tx/project-id)
(bigquery.tx/project-id)])
(fn [view-nm#] ["DROP VIEW IF EXISTS `v3_test_data.%s`" view-nm#])
~test-db-name
(bigquery.tx/project-id)
~test-db-name])
(fn [view-nm#] ["DROP VIEW IF EXISTS `%s.%s`" ~test-db-name view-nm#])
(fn [~(or view-name-binding '_)] ~@body)))
(def ^:private numeric-val "-1.2E20")
......@@ -196,31 +201,32 @@
(defmacro with-numeric-types-table [[table-name-binding] & body]
`(do-with-temp-obj "table_%s"
(fn [tbl-nm#] [(str "CREATE TABLE `v3_test_data.%s` AS SELECT "
(fn [tbl-nm#] [(str "CREATE TABLE `%s.%s` AS SELECT "
"NUMERIC '%s' AS numeric_col, "
"DECIMAL '%s' AS decimal_col, "
"BIGNUMERIC '%s' AS bignumeric_col, "
"BIGDECIMAL '%s' AS bigdecimal_col")
~test-db-name
tbl-nm#
~numeric-val
~decimal-val
~bignumeric-val
~bigdecimal-val])
(fn [tbl-nm#] ["DROP TABLE IF EXISTS `v3_test_data.%s`" tbl-nm#])
(fn [tbl-nm#] ["DROP TABLE IF EXISTS `%s.%s`" ~test-db-name tbl-nm#])
(fn [~(or table-name-binding '_)] ~@body)))
(deftest sync-views-test
(mt/test-driver :bigquery-cloud-sdk
(with-view [#_:clj-kondo/ignore view-name]
(is (contains? (:tables (driver/describe-database :bigquery-cloud-sdk (mt/db)))
{:schema "v3_test_data", :name view-name})
{:schema test-db-name, :name view-name})
"`describe-database` should see the view")
(is (= {:schema "v3_test_data"
(is (= {:schema test-db-name
:name view-name
:fields #{{:name "id", :database-type "INTEGER", :base-type :type/Integer, :database-position 0}
{:name "venue_name", :database-type "STRING", :base-type :type/Text, :database-position 1}
{:name "category_name", :database-type "STRING", :base-type :type/Text, :database-position 2}}}
(driver/describe-table :bigquery-cloud-sdk (mt/db) {:name view-name, :schema "v3_test_data"}))
(driver/describe-table :bigquery-cloud-sdk (mt/db) {:name view-name, :schema test-db-name}))
"`describe-tables` should see the fields in the view")
(sync/sync-database! (mt/db) {:scan :schema})
(testing "We should be able to run queries against the view (#3414)"
......@@ -282,9 +288,9 @@
(testing "Table with decimal types"
(with-numeric-types-table [#_:clj-kondo/ignore tbl-nm]
(is (contains? (:tables (driver/describe-database :bigquery-cloud-sdk (mt/db)))
{:schema "v3_test_data", :name tbl-nm})
{:schema test-db-name, :name tbl-nm})
"`describe-database` should see the table")
(is (= {:schema "v3_test_data"
(is (= {:schema test-db-name
:name tbl-nm
:fields #{{:name "numeric_col", :database-type "NUMERIC", :base-type :type/Decimal, :database-position 0}
{:name "decimal_col", :database-type "NUMERIC", :base-type :type/Decimal, :database-position 1}
......@@ -296,7 +302,7 @@
:database-type "BIGNUMERIC"
:base-type :type/Decimal
:database-position 3}}}
(driver/describe-table :bigquery-cloud-sdk (mt/db) {:name tbl-nm, :schema "v3_test_data"}))
(driver/describe-table :bigquery-cloud-sdk (mt/db) {:name tbl-nm, :schema test-db-name}))
"`describe-table` should see the fields in the table")
(sync/sync-database! (mt/db) {:scan :schema})
(testing "We should be able to run queries against the table"
......@@ -319,15 +325,16 @@
(deftest sync-table-with-array-test
(testing "Tables with ARRAY (REPEATED) columns can be synced successfully"
(do-with-temp-obj "table_array_type_%s"
(fn [tbl-nm] ["CREATE TABLE `v3_test_data.%s` AS SELECT 1 AS int_col, GENERATE_ARRAY(1,10) AS array_col"
(fn [tbl-nm] ["CREATE TABLE `%s.%s` AS SELECT 1 AS int_col, GENERATE_ARRAY(1,10) AS array_col"
test-db-name
tbl-nm])
(fn [tbl-nm] ["DROP TABLE IF EXISTS `v3_test_data.%s`" tbl-nm])
(fn [tbl-nm] ["DROP TABLE IF EXISTS `%s.%s`" test-db-name tbl-nm])
(fn [tbl-nm]
(is (= {:schema "v3_test_data"
(is (= {:schema test-db-name
:name tbl-nm
:fields #{{:name "int_col", :database-type "INTEGER", :base-type :type/Integer, :database-position 0}
{:name "array_col", :database-type "INTEGER", :base-type :type/Array, :database-position 1}}}
(driver/describe-table :bigquery-cloud-sdk (mt/db) {:name tbl-nm, :schema "v3_test_data"}))
(driver/describe-table :bigquery-cloud-sdk (mt/db) {:name tbl-nm, :schema test-db-name}))
"`describe-table` should detect the correct base-type for array type columns")))))
(deftest sync-inactivates-old-duplicate-tables
......@@ -502,7 +509,7 @@
(deftest format-sql-test
(mt/test-driver :bigquery-cloud-sdk
(testing "native queries are compiled and formatted without whitespace errors (#30676)"
(is (= (str "SELECT\n count(*) AS `count`\nFROM\n `v3_test_data.venues`")
(is (= (str (format "SELECT\n count(*) AS `count`\nFROM\n `%s.venues`" test-db-name))
(-> (mt/mbql-query venues {:aggregation [:count]})
qp/compile-and-splice-parameters
:query
......
......@@ -40,26 +40,16 @@
;;; ----------------------------------------------- Connection Details -----------------------------------------------
(defn- transient-dataset?
"Returns a boolean indicating whether the given `dataset-name` (as per its definition, NOT the physical schema name
that is to be created on the cluster) should be made transient (i.e. created and destroyed with every test run, for
instance to check time intervals relative to \"now\")."
[dataset-name]
(str/includes? dataset-name "checkins_interval_"))
(defn- normalize-name ^String [db-or-table identifier]
(defn normalize-name
"Returns a normalized name for a test database or table"
^String [db-or-table identifier]
(let [s (str/replace (name identifier) "-" "_")]
(case db-or-table
:db (cond-> (str "v3_" s)
;; for transient datasets (i.e. those that are created and torn down with each test run), we should add
;; some unique name portion to prevent independent parallel test runs from interfering with each other
(transient-dataset? s)
;; for transient datasets, we will make them unique by appending a suffix that represents the millisecond
;; timestamp from when this namespace was loaded (i.e. test initialized on this particular JVM/instance)
;; note that this particular dataset will not be deleted after this test run finishes, since there is no
;; reasonable hook to do so (from this test extension namespace), so instead we will rely on each run
;; cleaning up outdated, transient datasets via the `transient-dataset-outdated?` mechanism above
(str "__transient_" ns-load-time))
;; All databases created during test runs by this JVM instance get a suffix based on the timestamp from when
;; this namespace was loaded. This dataset will not be deleted after this test run finishes, since there is no
;; reasonable hook to do so (from this test extension namespace), so instead we will rely on each run cleaning
;; up outdated, transient datasets via the `transient-dataset-outdated?` mechanism.
:db (str "v3_" s "__transient_" ns-load-time)
:table s)))
(defn- test-db-details []
......@@ -104,7 +94,7 @@
(.delete (bigquery) dataset-id (u/varargs
BigQuery$DatasetDeleteOption
[(BigQuery$DatasetDeleteOption/deleteContents)]))
(log/error (u/format-color 'red "Deleted BigQuery dataset `%s.%s`." (project-id) dataset-id)))
(log/infof "Deleted BigQuery dataset `%s.%s`." (project-id) dataset-id))
(defn execute!
"Execute arbitrary (presumably DDL) SQL statements against the test project. Waits for statement to complete, throwing
......@@ -300,22 +290,15 @@
(recur (dec num-retries))
(throw e)))))))
(defn- existing-dataset-names
(defn- get-all-datasets
"Fetch a list of *all* dataset names that currently exist in the BQ test project."
[]
(for [^Dataset dataset (.iterateAll (.listDatasets (bigquery) (into-array BigQuery$DatasetListOption [])))
:let [dataset-name (.. dataset getDatasetId getDataset)]]
dataset-name))
;; keep track of databases we haven't created yet
(def ^:private existing-datasets
"All datasets that already exist in the BigQuery cluster, so that we can possibly avoid recreating/repopulating them
on every run."
(atom #{}))
(for [^Dataset dataset (.iterateAll (.listDatasets (bigquery) (into-array BigQuery$DatasetListOption [])))]
(.. dataset getDatasetId getDataset)))
(defn- transient-dataset-outdated?
"Checks whether the given `dataset-name` is a transient dataset that is outdated, and should be deleted. Note that
unlike `transient-dataset?`, this doesn't need any domain specific knowledge about which transient datasets are
this doesn't need any domain specific knowledge about which transient datasets are
outdated. The fact that a *created* dataset (i.e. created on BigQuery) is transient has already been encoded by a
suffix, so we can just look for that here."
[dataset-name]
......@@ -325,53 +308,32 @@
(defmethod tx/create-db! :bigquery-cloud-sdk [_ {:keys [database-name table-definitions]} & _]
{:pre [(seq database-name) (sequential? table-definitions)]}
;; fetch existing datasets if we haven't done so yet
(when-not (seq @existing-datasets)
(let [{transient-datasets true non-transient-datasets false} (group-by transient-dataset?
(existing-dataset-names))]
(reset! existing-datasets (set non-transient-datasets))
(log/infof "These BigQuery datasets have already been loaded:\n%s" (u/pprint-to-str (sort @existing-datasets)))
(when-let [outdated-transient-datasets (seq (filter transient-dataset-outdated? transient-datasets))]
(log/info (u/format-color
'blue
"These BigQuery datasets are transient, and more than two hours old; deleting them: %s`."
(u/pprint-to-str (sort outdated-transient-datasets))))
(doseq [delete-ds outdated-transient-datasets]
(u/ignore-exceptions
(destroy-dataset! delete-ds))))))
;; now check and see if we need to create the requested one
;; clean up outdated datasets
(doseq [outdated (filter transient-dataset-outdated? (get-all-datasets))]
(log/info (u/format-color 'blue "Deleting temporary dataset more than two hours old: %s`." outdated))
(u/ignore-exceptions
(destroy-dataset! outdated)))
(let [database-name (normalize-name :db database-name)]
(when-not (contains? @existing-datasets database-name)
(u/ignore-exceptions
(u/auto-retry 2
(try
(log/infof "Creating dataset %s..." (pr-str database-name))
;; if the dataset failed to load successfully last time around, destroy whatever was loaded so we start
;; again from a blank slate
(u/ignore-exceptions
(destroy-dataset! database-name))
(u/auto-retry 2
(try
(log/infof "Creating dataset %s..." (pr-str database-name))
;; if the dataset failed to load successfully last time around, destroy whatever was loaded so we start
;; again from a blank slate
(destroy-dataset! database-name)
#_(u/ignore-exceptions
(destroy-dataset! database-name))
(create-dataset! database-name)
;; now create tables and load data.
(doseq [tabledef table-definitions]
(load-tabledef! database-name tabledef))
(swap! existing-datasets conj database-name)
(log/info (u/format-color 'green "Successfully created %s." (pr-str database-name)))
(catch Throwable e
(log/error (u/format-color 'red "Failed to load BigQuery dataset %s." (pr-str database-name)))
(log/error (u/pprint-to-str 'red (Throwable->map e)))
;; if creating the dataset ultimately fails to complete, then delete it so it will hopefully
;; work next time around
(u/ignore-exceptions
(destroy-dataset! database-name))
(throw e)))))))
(create-dataset! database-name)
;; now create tables and load data.
(doseq [tabledef table-definitions]
(load-tabledef! database-name tabledef))
(log/info (u/format-color 'green "Successfully created %s." (pr-str database-name)))
(catch Throwable e
(log/error (u/format-color 'red "Failed to load BigQuery dataset %s." (pr-str database-name)))
(log/error (u/pprint-to-str 'red (Throwable->map e)))
(throw e))))))
(defmethod tx/destroy-db! :bigquery-cloud-sdk
[_ {:keys [database-name]}]
(destroy-dataset! database-name)
(when (seq @existing-datasets)
(swap! existing-datasets disj database-name)))
(destroy-dataset! database-name))
(defmethod tx/aggregate-column-info :bigquery-cloud-sdk
([driver aggregation-type]
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment