Skip to content
Snippets Groups Projects
Unverified Commit 6dee6012 authored by github-automation-metabase's avatar github-automation-metabase Committed by GitHub
Browse files

[Databricks] Use system's information schema during sync (#48950) (#49402)


* Use system schema

* Use data only from selected catalog

* Adjust tests

* Update modules/drivers/databricks/test/metabase/driver/databricks_test.clj



* Comments

---------

Co-authored-by: lbrdnk <lbrdnk@users.noreply.github.com>
Co-authored-by: metamben <103100869+metamben@users.noreply.github.com>
parent 49ed316b
Branches
Tags
No related merge requests found
......@@ -54,7 +54,7 @@
" TABLE_NAME as name,"
" TABLE_SCHEMA as schema,"
" COMMENT description"
" from information_schema.tables"
" from system.information_schema.tables"
" where TABLE_CATALOG = ?"
" AND TABLE_SCHEMA <> 'information_schema'"])
catalog])
......@@ -79,7 +79,8 @@
e)))))
(defmethod sql-jdbc.sync/describe-fields-sql :databricks
[driver & {:keys [schema-names table-names]}]
[driver & {:keys [schema-names table-names catalog]}]
(assert (string? (not-empty catalog)) "`catalog` is required for sync.")
(sql/format {:select [[:c.column_name :name]
[:c.full_data_type :database-type]
[:c.ordinal_position :database-position]
......@@ -87,7 +88,7 @@
[:c.table_name :table-name]
[[:case [:= :cs.constraint_type [:inline "PRIMARY KEY"]] true :else false] :pk?]
[[:case [:not= :c.comment [:inline ""]] :c.comment :else nil] :field-comment]]
:from [[:information_schema.columns :c]]
:from [[:system.information_schema.columns :c]]
;; The following link contains a diagram of `information_schema`:
;; https://docs.databricks.com/en/sql/language-manual/sql-ref-information-schema.html
:left-join [[{:select [[:tc.table_catalog :table_catalog]
......@@ -95,8 +96,8 @@
[:tc.table_name :table_name]
[:ccu.column_name :column_name]
[:tc.constraint_type :constraint_type]]
:from [[:information_schema.table_constraints :tc]]
:join [[:information_schema.constraint_column_usage :ccu]
:from [[:system.information_schema.table_constraints :tc]]
:join [[:system.information_schema.constraint_column_usage :ccu]
[:and
[:= :tc.constraint_catalog :ccu.constraint_catalog]
[:= :tc.constraint_schema :ccu.constraint_schema]
......@@ -117,6 +118,7 @@
[:= :c.table_name :cs.table_name]
[:= :c.column_name :cs.column_name]]]
:where [:and
[:= :c.table_catalog [:inline catalog]]
;; Ignore `timestamp_ntz` type columns. Columns of this type are not recognizable from
;; `timestamp` columns when fetching the data. This exception should be removed when the problem
;; is resolved by Databricks in underlying jdbc driver.
......@@ -127,8 +129,14 @@
:order-by [:table-schema :table-name :database-position]}
:dialect (sql.qp/quote-style driver)))
;; Databricks entry point for field sync. Reads the catalog configured in the
;; database details and passes it along as `:catalog`, which the `:databricks`
;; implementation of `sql-jdbc.sync/describe-fields-sql` requires.
;;
;; Dispatches on `:databricks` rather than `:sql-jdbc`: registering this method
;; on `:sql-jdbc` would replace the generic implementation for every JDBC-based
;; driver once this namespace is loaded.
(defmethod driver/describe-fields :databricks
  [driver database & {:as args}]
  (let [catalog (get-in database [:details :catalog])]
    (sql-jdbc.sync/describe-fields driver database (assoc args :catalog catalog))))
(defmethod sql-jdbc.sync/describe-fks-sql :databricks
[driver & {:keys [schema-names table-names]}]
[driver & {:keys [schema-names table-names catalog]}]
(assert (string? (not-empty catalog)) "`catalog` is required for sync.")
(sql/format {:select (vec
{:fk_kcu.table_schema "fk-table-schema"
:fk_kcu.table_name "fk-table-name"
......@@ -136,24 +144,30 @@
:pk_kcu.table_schema "pk-table-schema"
:pk_kcu.table_name "pk-table-name"
:pk_kcu.column_name "pk-column-name"})
:from [[:information_schema.key_column_usage :fk_kcu]]
:join [[:information_schema.referential_constraints :rc]
:from [[:system.information_schema.key_column_usage :fk_kcu]]
:join [[:system.information_schema.referential_constraints :rc]
[:and
[:= :fk_kcu.constraint_catalog :rc.constraint_catalog]
[:= :fk_kcu.constraint_schema :rc.constraint_schema]
[:= :fk_kcu.constraint_name :rc.constraint_name]]
[:information_schema.key_column_usage :pk_kcu]
[:system.information_schema.key_column_usage :pk_kcu]
[[:and
[:= :pk_kcu.constraint_catalog :rc.unique_constraint_catalog]
[:= :pk_kcu.constraint_schema :rc.unique_constraint_schema]
[:= :pk_kcu.constraint_name :rc.unique_constraint_name]]]]
:where [:and
[:= :fk_kcu.table_catalog [:inline catalog]]
[:not [:in :fk_kcu.table_schema ["information_schema"]]]
(when table-names [:in :fk_kcu.table_name table-names])
(when schema-names [:in :fk_kcu.table_schema schema-names])]
:order-by [:fk-table-schema :fk-table-name]}
:dialect (sql.qp/quote-style driver)))
;; Databricks entry point for foreign-key sync. Reads the catalog configured in
;; the database details and passes it along as `:catalog`, which the
;; `:databricks` implementation of `sql-jdbc.sync/describe-fks-sql` requires.
;;
;; Dispatches on `:databricks` rather than `:sql-jdbc`: registering this method
;; on `:sql-jdbc` would replace the generic implementation for every JDBC-based
;; driver once this namespace is loaded.
(defmethod driver/describe-fks :databricks
  [driver database & {:as args}]
  (let [catalog (get-in database [:details :catalog])]
    (sql-jdbc.sync/describe-fks driver database (assoc args :catalog catalog))))
;; SQL template for setting the session time zone on Databricks.
;; The driver argument is unused; the `%s` placeholder is presumably filled in
;; with the time zone id by the caller -- confirm against `sql-jdbc.execute`.
(defmethod sql-jdbc.execute/set-timezone-sql :databricks
  [_]
  "SET TIME ZONE %s;")
......
......@@ -45,102 +45,108 @@
(is (contains? (:tables actual-tables) {:name "bird", :schema "bird-flocks", :description nil})))))))
(deftest ^:parallel describe-fields-test
(testing "`describe-fields` returns expected values"
(mt/test-driver
:databricks
(is (= #{{:table-schema "test-data"
:table-name "orders"
:pk? true
:name "id"
:database-type "int"
:database-position 0
:base-type :type/Integer
:json-unfolding false}
{:table-schema "test-data"
:table-name "orders"
:pk? false
:name "user_id"
:database-type "int"
:database-position 1
:base-type :type/Integer
:json-unfolding false}
{:table-schema "test-data"
:table-name "orders"
:pk? false
:name "product_id"
:database-type "int"
:database-position 2
:base-type :type/Integer
:json-unfolding false}
{:table-schema "test-data"
:table-name "orders"
:pk? false
:name "subtotal"
:database-type "double"
:database-position 3
:base-type :type/Float
:json-unfolding false}
{:table-schema "test-data"
:table-name "orders"
:pk? false
:name "tax"
:database-type "double"
:database-position 4
:base-type :type/Float
:json-unfolding false}
{:table-schema "test-data"
:table-name "orders"
:pk? false
:name "total"
:database-type "double"
:database-position 5
:base-type :type/Float
:json-unfolding false}
{:table-schema "test-data"
:table-name "orders"
:pk? false
:name "discount"
:database-type "double"
:database-position 6
:base-type :type/Float
:json-unfolding false}
{:table-schema "test-data"
:table-name "orders"
:pk? false
:name "created_at"
:database-type "timestamp"
:database-position 7
:base-type :type/DateTimeWithLocalTZ
:json-unfolding false}
{:table-schema "test-data"
:table-name "orders"
:pk? false
:name "quantity"
:database-type "int"
:database-position 8
:base-type :type/Integer
:json-unfolding false}}
(reduce conj #{} (driver/describe-fields :databricks (mt/db) {:schema-names ["test-data"]
:table-names ["orders"]})))))))
(mt/test-driver
:databricks
(let [fields (vec (driver/describe-fields :databricks (mt/db) {:schema-names ["test-data"]
:table-names ["orders"]}))]
(testing "Underlying query returns only fields from selected catalog"
(is (= 9 (count fields))))
(testing "Expected fields are returned"
(is (= #{{:table-schema "test-data"
:table-name "orders"
:pk? true
:name "id"
:database-type "int"
:database-position 0
:base-type :type/Integer
:json-unfolding false}
{:table-schema "test-data"
:table-name "orders"
:pk? false
:name "user_id"
:database-type "int"
:database-position 1
:base-type :type/Integer
:json-unfolding false}
{:table-schema "test-data"
:table-name "orders"
:pk? false
:name "product_id"
:database-type "int"
:database-position 2
:base-type :type/Integer
:json-unfolding false}
{:table-schema "test-data"
:table-name "orders"
:pk? false
:name "subtotal"
:database-type "double"
:database-position 3
:base-type :type/Float
:json-unfolding false}
{:table-schema "test-data"
:table-name "orders"
:pk? false
:name "tax"
:database-type "double"
:database-position 4
:base-type :type/Float
:json-unfolding false}
{:table-schema "test-data"
:table-name "orders"
:pk? false
:name "total"
:database-type "double"
:database-position 5
:base-type :type/Float
:json-unfolding false}
{:table-schema "test-data"
:table-name "orders"
:pk? false
:name "discount"
:database-type "double"
:database-position 6
:base-type :type/Float
:json-unfolding false}
{:table-schema "test-data"
:table-name "orders"
:pk? false
:name "created_at"
:database-type "timestamp"
:database-position 7
:base-type :type/DateTimeWithLocalTZ
:json-unfolding false}
{:table-schema "test-data"
:table-name "orders"
:pk? false
:name "quantity"
:database-type "int"
:database-position 8
:base-type :type/Integer
:json-unfolding false}}
(set fields)))))))
(deftest ^:parallel describe-fks-test
(testing "`describe-fks` returns expected values"
(mt/test-driver
:databricks
(is (= #{{:fk-table-schema "test-data"
:fk-table-name "orders"
:fk-column-name "product_id"
:pk-table-schema "test-data"
:pk-table-name "products"
:pk-column-name "id"}
{:fk-table-schema "test-data"
:fk-table-name "orders"
:fk-column-name "user_id"
:pk-table-schema "test-data"
:pk-table-name "people"
:pk-column-name "id"}}
(reduce conj #{} (driver/describe-fks :databricks (mt/db) {:schema-names ["test-data"]
:table-names ["orders"]})))))))
(mt/test-driver
:databricks
(let [fks (vec (driver/describe-fks :databricks (mt/db) {:schema-names ["test-data"]
:table-names ["orders"]}))]
(testing "Only fks from current catalog are registered"
(is (= 2 (count fks))))
(testing "Expected fks are returned"
(is (= #{{:fk-table-schema "test-data"
:fk-table-name "orders"
:fk-column-name "product_id"
:pk-table-schema "test-data"
:pk-table-name "products"
:pk-column-name "id"}
{:fk-table-schema "test-data"
:fk-table-name "orders"
:fk-column-name "user_id"
:pk-table-schema "test-data"
:pk-table-name "people"
:pk-column-name "id"}}
(set fks)))))))
(mt/defdataset dataset-with-ntz
[["table_with_ntz" [{:field-name "timestamp"
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment