diff --git a/src/metabase/api/notify.clj b/src/metabase/api/notify.clj index dc85d3b9bc8d734db50cd15e5c5ae31d30a2bd33..e1020b6b1de0fe02d2763f1c6f6590b1c5c32004 100644 --- a/src/metabase/api/notify.clj +++ b/src/metabase/api/notify.clj @@ -1,6 +1,7 @@ (ns metabase.api.notify "/api/notify/* endpoints which receive inbound etl server notifications." (:require + [clojure.string :as str] [compojure.core :refer [POST]] [metabase.api.common :as api] [metabase.driver :as driver] @@ -33,19 +34,61 @@ table-sync-fn (if schema? sync-metadata/sync-table-metadata! sync/sync-table!) db-sync-fn (if schema? sync-metadata/sync-db-metadata! sync/sync-database!)] (api/let-404 [database (t2/select-one Database :id id)] - (cond-> (cond - table_id (api/let-404 [table (t2/select-one Table :db_id id, :id (int table_id))] - (future (table-sync-fn table))) - table_name (api/let-404 [table (t2/select-one Table :db_id id, :name table_name)] - (future (table-sync-fn table))) - :else (future (db-sync-fn database))) - synchronous? deref))) + (let [table (cond + table_id (api/check-404 (t2/select-one Table :db_id id, :id (int table_id))) + table_name (api/check-404 (t2/select-one Table :db_id id, :name table_name)))] + (cond-> (future (if table + (table-sync-fn table) + (db-sync-fn database))) + synchronous? deref)))) {:success true}) (defn- without-stacktrace [^Throwable throwable] (doto throwable (.setStackTrace (make-array StackTraceElement 0)))) +(defn- find-and-sync-new-table + [database table-name schema-name] + (let [driver (driver.u/database->driver database) + {db-tables :tables} (driver/describe-database driver database)] + (if-let [table (some (fn [table-in-db] + (when (and (= schema-name (:schema table-in-db)) + (= table-name (:name table-in-db))) + table-in-db)) + db-tables)] + (let [created (sync-tables/create-or-reactivate-table! database table)] + (doto created + sync/sync-table! + sync-util/set-initial-table-sync-complete!)) + (throw (without-stacktrace + (ex-info (trs "Unable to identify table ''{0}.{1}''" + schema-name table-name) + {:status-code 404 + :schema_name schema-name + :table_name table-name})))))) + +(api/defendpoint POST "/db/attached_datawarehouse" + "Sync the attached datawarehouse. Can provide in the body: + - table_name and schema_name: both strings. Will look for an existing table and sync it, otherwise will try to find a + new table with that name and sync it. If it cannot find a table it will throw an error. If table_name is empty or + blank, will sync the entire database. + - synchronous?: is a boolean value to indicate if this should block on the result." + [:as {{:keys [table_name schema_name synchronous?]} :body}] + {table_name [:maybe ms/NonBlankString] + schema_name [:maybe string?] + synchronous? [:maybe ms/BooleanValue]} + (api/let-404 [database (t2/select-one :model/Database :is_attached_dwh true)] + (if (str/blank? table_name) + (cond-> (future (sync-metadata/sync-db-metadata! database)) + synchronous? deref) + (if-let [table (t2/select-one Table :db_id (:id database), :name table_name :schema schema_name)] + (cond-> (future (sync-metadata/sync-table-metadata! table)) + synchronous? deref) + ;; find and sync is always synchronous. And we want it to be so since the "can't find this table" error is + ;; rather informative. If it's on a future we won't see it. + (find-and-sync-new-table database table_name schema_name)))) + {:success true}) + (api/defendpoint POST "/db/:id/new-table" "Sync a new table without running a full database sync. Requires `schema_name` and `table_name`. Will throw an error if the table already exists in Metabase or cannot be found." @@ -55,23 +98,7 @@ table_name ms/NonBlankString} (api/let-404 [database (t2/select-one Database :id id)] (if-not (t2/select-one Table :db_id id :name table_name :schema schema_name) - (let [driver (driver.u/database->driver database) - {db-tables :tables} (driver/describe-database driver database)] - (if-let [table (some (fn [table-in-db] - (when (and (= schema_name (:schema table-in-db)) - (= table_name (:name table-in-db))) - table-in-db)) - db-tables)] - (let [created (sync-tables/create-or-reactivate-table! database table)] - (doto created - sync/sync-table! - sync-util/set-initial-table-sync-complete!)) - (throw (without-stacktrace - (ex-info (trs "Unable to identify table ''{0}.{1}''" - schema_name table_name) - {:status-code 404 - :schema_name schema_name - :table_name table_name}))))) + (find-and-sync-new-table database table_name schema_name) (throw (without-stacktrace (ex-info (trs "Table ''{0}.{1}'' already exists" schema_name table_name) diff --git a/test/metabase/api/notify_test.clj b/test/metabase/api/notify_test.clj index a79e4a64a0c7605d8539f3733590340b470091c4..a1a6336a74cbe385c3fcda17cc5ff91bad7ad1fb 100644 --- a/test/metabase/api/notify_test.clj +++ b/test/metabase/api/notify_test.clj @@ -143,3 +143,72 @@ (let [tables (tableset database)] (is (= #{"public.foo" "public.bar"} tables)) (is (false? (contains? tables "public.fern")))))))))) + +(defn do-with-no-attached-data-warehouses + [f] + (let [attached (t2/select-fn-set :id :model/Database :is_attached_dwh true)] + (try + (when (seq attached) + (t2/update! :model/Database :id [:in attached] {:is_attached_dwh false})) + (f) + (finally + (when (seq attached) + (t2/update! :model/Database :id [:in attached] {:is_attached_dwh true})))))) + +(defmacro with-no-attached-data-warehouses + [& body] + `(do-with-no-attached-data-warehouses (fn [] ~@body))) + +(deftest sync-data-warehouse-test + (mt/test-driver :postgres + (testing "Ensure we can interact with the attached datawarehouse db" + (with-no-attached-data-warehouses + (let [db-name (str (gensym "attached_datawarehouse"))] + (try + (postgres-test/drop-if-exists-and-create-db! db-name) + (let [details (mt/dbdef->connection-details :postgres :db {:database-name db-name})] + (mt/with-temp [Database database {:engine :postgres + :details (assoc details :dbname db-name) + :is_attached_dwh true}] + (let [spec (sql-jdbc.conn/connection-details->spec :postgres details) + exec! (fn [spec statements] (doseq [statement statements] (jdbc/execute! spec [statement]))) + tableset #(set (map (fn [{:keys [schema name]}] (format "%s.%s" schema name)) (t2/select 'Table :db_id (:id %)))) + post (fn post-api + ([payload] (post-api payload 200)) + ([payload expected-code] + (mt/with-temporary-setting-values [api-key "test-api-key"] + (mt/client-full-response + :post expected-code "notify/db/attached_datawarehouse" + {:request-options api-headers} + (merge {:synchronous? true} + payload)))))] + ;; Create the initial table and sync it. + (exec! spec ["CREATE TABLE public.FOO (val bigint NOT NULL);"]) + (sync/sync-database! database) + (let [tables (tableset database)] + (is (= #{"public.foo"} tables))) + (testing "We can sync an existing database" + (is (= 200 (:status (post {:schema_name "public" :table_name "foo"} 200))))) + (testing "And it will see new fields" + (exec! spec ["ALTER TABLE public.FOO add column newly_added int"]) + (is (= 200 (:status (post {:schema_name "public" :table_name "foo"} 200)))) + (let [table (t2/select-one :model/Table :db_id (:id database) :name "foo") + fields (t2/select :model/Field :table_id (:id table))] + (is (= #{"val" "newly_added"} (into #{} (map :name) fields))))) + (testing "We get a 404 for non-existant tables" + (is (= 404 (:status (post {:schema_name "public" :table_name "bar"} 404))))) + ;; Create two more tables that are not yet synced + (exec! spec ["CREATE TABLE public.BAR (val bigint NOT NULL);" + "CREATE TABLE public.FERN (val bigint NOT NULL);"]) + (testing "But we will see new fields" + (is (= 200 (:status (post {:schema_name "public" :table_name "bar"}))))) + ;; Assert that only the synced tables are present. + (let [tables (tableset database)] + (is (= #{"public.foo" "public.bar"} tables)) + (is (false? (contains? tables "public.fern")))) + (testing "We can sync the whole database as well" + (is (= 200 (:status (post {})))) + (let [tables (tableset database)] + (is (= #{"public.foo" "public.bar" "public.fern"} tables))))))) + (finally + (postgres-test/drop-if-exists-and-create-db! db-name :pg/just-drop)))))))) diff --git a/test/metabase/driver/postgres_test.clj b/test/metabase/driver/postgres_test.clj index a605a979784ec39b07fbb8247e433afbaa5f2f28..20f3f25bb576fe6411e38b41a2e1c57caa4e468c 100644 --- a/test/metabase/driver/postgres_test.clj +++ b/test/metabase/driver/postgres_test.clj @@ -96,17 +96,18 @@ (defn drop-if-exists-and-create-db! "Drop a Postgres database named `db-name` if it already exists; then create a new empty one with that name." - [db-name] + [db-name & [just-drop]] (let [spec (sql-jdbc.conn/connection-details->spec :postgres (mt/dbdef->connection-details :postgres :server nil))] ;; kill any open connections (jdbc/query spec ["SELECT pg_terminate_backend(pg_stat_activity.pid) FROM pg_stat_activity WHERE pg_stat_activity.datname = ?;" db-name]) ;; create the DB - (jdbc/execute! spec [(format "DROP DATABASE IF EXISTS \"%s\"; - CREATE DATABASE \"%s\";" - db-name db-name)] - {:transaction? false}))) + (jdbc/execute! spec [(format "DROP DATABASE IF EXISTS \"%s\"" db-name)] + {:transaction? false}) + (when (not= just-drop :pg/just-drop) + (jdbc/execute! spec [(format "CREATE DATABASE \"%s\";" db-name)] + {:transaction? false})))) (defn- exec! "Execute a sequence of statements against the database whose spec is passed as the first param."