Skip to content
Snippets Groups Projects
Unverified Commit 90de494b authored by metabase-bot[bot]'s avatar metabase-bot[bot] Committed by GitHub
Browse files

Fix Bigquery can't sync materialized views (#42092) (#42210)


Co-authored-by: default avatarNgoc Khuat <qn.khuat@gmail.com>
parent aaa0d790
No related merge requests found
......@@ -31,8 +31,9 @@
(clojure.lang PersistentList)
(com.google.cloud.bigquery BigQuery BigQuery$DatasetListOption BigQuery$JobOption BigQuery$TableDataListOption
BigQuery$TableListOption BigQuery$TableOption BigQueryException BigQueryOptions Dataset
DatasetId Field Field$Mode FieldValue FieldValueList QueryJobConfiguration Schema StandardTableDefinition
Table TableDefinition$Type TableId TableResult)))
DatasetId Field Field$Mode FieldValue FieldValueList MaterializedViewDefinition QueryJobConfiguration Schema
RangePartitioning TimePartitioning
StandardTableDefinition Table TableDefinition TableDefinition$Type TableId TableResult)))
(set! *warn-on-reflection* true)
......@@ -121,11 +122,29 @@
(.getTable client (TableId/of project-id dataset-id table-id) empty-table-options)
(.getTable client dataset-id table-id empty-table-options))))
(defn- tabledef->range-partition
[^TableDefinition tabledef]
(condp = (.getType tabledef)
TableDefinition$Type/TABLE
(.getRangePartitioning ^StandardTableDefinition tabledef)
TableDefinition$Type/MATERIALIZED_VIEW
(.getRangePartitioning ^MaterializedViewDefinition tabledef)
nil))
(defn- tabledef->time-partition
[^TableDefinition tabledef]
(condp = (.getType tabledef)
TableDefinition$Type/TABLE
(.getTimePartitioning ^StandardTableDefinition tabledef)
TableDefinition$Type/MATERIALIZED_VIEW
(.getTimePartitioning ^MaterializedViewDefinition tabledef)
nil))
(defn- table-is-partitioned?
[^StandardTableDefinition tabledef]
[^TableDefinition tabledef]
(when (#{TableDefinition$Type/TABLE TableDefinition$Type/MATERIALIZED_VIEW} (.getType tabledef))
(or (.getRangePartitioning tabledef)
(.getTimePartitioning tabledef))))
(or (tabledef->range-partition tabledef)
(tabledef->time-partition tabledef))))
(defmethod driver/describe-database :bigquery-cloud-sdk
[_ database]
......@@ -133,7 +152,7 @@
{:tables (set (for [^Table table tables
:let [^TableId table-id (.getTableId table)
^String dataset-id (.getDataset table-id)
^StandardTableDefinition tabledef (.getDefinition table)
^TableDefinition tabledef (.getDefinition table)
table-name (str (.getTable table-id))]]
{:schema dataset-id
:name table-name
......@@ -201,17 +220,17 @@
(defmethod driver/describe-table :bigquery-cloud-sdk
[_ database {table-name :name, dataset-id :schema}]
(let [table (get-table database dataset-id table-name)
^StandardTableDefinition tabledef (.getDefinition table)
is-partitioned? (table-is-partitioned? tabledef)
(let [table (get-table database dataset-id table-name)
^TableDefinition tabledef (.getDefinition table)
is-partitioned? (table-is-partitioned? tabledef)
;; a table can only have one partitioned field
partitioned-field-name (when is-partitioned?
(or (some-> (.getRangePartitioning tabledef) .getField)
(some-> (.getTimePartitioning tabledef) .getField)))
fields (set
(map
#(assoc % :database-partitioned (= (:name %) partitioned-field-name))
(table-schema->metabase-field-info (. tabledef getSchema))))]
partitioned-field-name (when is-partitioned?
(or (some-> ^RangePartitioning (tabledef->range-partition tabledef) .getField)
(some-> ^TimePartitioning (tabledef->time-partition tabledef) .getField)))
fields (set
(map
#(assoc % :database-partitioned (= (:name %) partitioned-field-name))
(table-schema->metabase-field-info (. tabledef getSchema))))]
{:schema dataset-id
:name table-name
:fields (cond-> fields
......@@ -219,7 +238,7 @@
;; meaning this table is partitioned by ingestion time
;; so we manually sync the 2 pseudo-columns _PARTITIONTIME AND _PARTITIONDATE
(and is-partitioned?
(some? (.getTimePartitioning tabledef))
(some? (tabledef->time-partition tabledef))
(nil? partitioned-field-name))
(conj
{:name partitioned-time-field-name
......
......@@ -35,6 +35,10 @@
[table-name]
(bigquery.tx/execute! (format "DROP TABLE IF EXISTS `%s`;" (fmt-table-name table-name))))
(defn- drop-mv-if-exists!
[table-name]
(bigquery.tx/execute! (format "DROP MATERIALIZED VIEW IF EXISTS `%s`;" (fmt-table-name table-name))))
(deftest can-connect?-test
(mt/test-driver :bigquery-cloud-sdk
(let [db-details (:details (mt/db))
......@@ -251,6 +255,27 @@
{:source-table (mt/id view-name)
:order-by [[:asc (mt/id view-name :id)]]}))))))))
(deftest sync-materialized-view-test
(mt/test-driver :bigquery-cloud-sdk
(mt/with-model-cleanup [:model/Table]
(let [view-name "mv_test_materialized_view"]
(try
(doseq [sql [(format "CREATE MATERIALIZED VIEW %s AS (
SELECT product_id, COUNT(id) as cnt FROM %s GROUP BY product_id);"
(fmt-table-name view-name)
(fmt-table-name "orders"))]]
(bigquery.tx/execute! sql))
(sync/sync-database! (mt/db) {:scan :schema})
(testing "We should be able to run queries against the view (#3414)"
(is (= [[1 93] [2 98] [3 77]]
(mt/rows
(mt/run-mbql-query nil
{:source-table (mt/id view-name)
:order-by [[:asc (mt/id view-name :product_id)]]
:limit 3})))))
(finally
(drop-mv-if-exists! view-name)))))))
(deftest sync-table-with-required-filter-test
(mt/test-driver :bigquery-cloud-sdk
(testing "tables that require a partition filters are synced correctly"
......@@ -314,8 +339,9 @@
(mt/test-driver :bigquery-cloud-sdk
(testing "Partitioned tables that require a partition filter can be synced"
(mt/with-model-cleanup [:model/Table]
(let [table-names ["partition_by_range" "partition_by_time" "partitioned_by_datetime"
"partition_by_ingestion_time" "partition_by_ingestion_time_not_required"]]
(let [table-names ["partition_by_range" "partition_by_time" "partition_by_datetime"
"partition_by_ingestion_time" "partition_by_ingestion_time_not_required"]
mv-names ["mv_partition_by_datetime" "mv_partition_by_range"]]
(try
(doseq [sql [(format "CREATE TABLE %s (customer_id INT64)
PARTITION BY RANGE_BUCKET(customer_id, GENERATE_ARRAY(0, 100, 10))
......@@ -324,15 +350,23 @@
(format "INSERT INTO %s (customer_id)
VALUES (1), (2), (3);"
(fmt-table-name "partition_by_range"))
(format "CREATE MATERIALIZED VIEW %s AS
SELECT customer_id + 41 as vip_customer FROM %s WHERE customer_id = 1;"
(fmt-table-name "mv_partition_by_range")
(fmt-table-name "partition_by_range"))
(format "CREATE TABLE %s (company STRING, founded DATETIME)
PARTITION BY DATE(founded)
OPTIONS (require_partition_filter = TRUE);"
(fmt-table-name "partitioned_by_datetime"))
(fmt-table-name "partition_by_datetime"))
(format "INSERT INTO %s (company, founded)
VALUES ('Metabase', DATETIME('2014-10-10 00:00:00')),
('Tesla', DATETIME('2003-07-01 00:00:00')),
('Apple', DATETIME('1976-04-01 00:00:00'));"
(fmt-table-name "partitioned_by_datetime"))
(fmt-table-name "partition_by_datetime"))
(format "CREATE MATERIALIZED VIEW %s AS
SELECT company AS ev_company FROM %s WHERE founded = DATETIME('2003-07-01 00:00:00');"
(fmt-table-name "mv_partition_by_datetime")
(fmt-table-name "partition_by_datetime"))
(format "CREATE TABLE %s (name STRING, birthday TIMESTAMP)
PARTITION BY DATE(birthday)
OPTIONS (require_partition_filter = TRUE);"
......@@ -357,21 +391,23 @@
(fmt-table-name "partition_by_ingestion_time_not_required"))]]
(bigquery.tx/execute! sql))
(sync/sync-database! (mt/db))
(let [table-ids (t2/select-pks-vec :model/Table :db_id (mt/id) :name [:in table-names])
(let [table-ids (t2/select-pks-vec :model/Table :db_id (mt/id) :name [:in (concat mv-names table-names)])
all-field-ids (t2/select-pks-vec :model/Field :table_id [:in table-ids])]
(testing "all fields are fingerprinted"
(is (every? some? (t2/select-fn-vec :fingerprint :model/Field :id [:in all-field-ids]))))
(testing "Field values are correctly synced"
(is (= {"customer_id" #{1 2 3}
"vip_customer" #{42}
"name" #{"Khuat" "Quang" "Ngoc"}
"company" #{"Metabase" "Tesla" "Apple"}
"ev_company" #{"Tesla"}
"is_awesome" #{true false}
"is_opensource" #{true false}}
(->> (t2/query {:select [[:field.name :field-name] [:fv.values :values]]
:from [[:metabase_field :field]]
:join [[:metabase_fieldvalues :fv] [:= :field.id :fv.field_id]]
:where [:and [:in :field.table_id table-ids]
[:in :field.name ["customer_id" "name" "is_awesome" "is_opensource" "company"]]]})
[:in :field.name ["customer_id" "vip_customer" "name" "is_awesome" "is_opensource" "company" "ev_company"]]]})
(map #(update % :values (comp set json/parse-string)))
(map (juxt :field-name :values))
(into {}))))))
......@@ -395,6 +431,7 @@
(first (mt/rows (mt/run-mbql-query partition_by_ingestion_time_not_required {:limit 1})))))))
(finally
(doall (map drop-table-if-exists! table-names))
(doall (map drop-mv-if-exists! mv-names))
nil)))))))
(deftest sync-update-require-partition-option-test
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment