Skip to content
Snippets Groups Projects
Unverified Commit 113d75ce authored by dpsutton's avatar dpsutton Committed by GitHub
Browse files

sync the attached datawarehouse (#49970)

* sync the attached datawarehouse

requires either "sync_db" true or a table name

it will error
```shell
❯ http post 'localhost:3000/api/notify/db/attached_datawarehouse' x-metabase-apikey:apikey -pb
Must provide `sync_db` or a `table_name` to sync
```

It will sync the db when sync_db is true [NOTE: this is outdated. now just omitting the table name will sync the db]
```shell
❯ echo '{"synchronous?": true, "sync_db": true}' | http post 'localhost:3000/api/notify/db/attached_datawarehouse' x-metabase-apikey:apikey -pb
{
    "success": true
}
```

It will sync existing tables
```shell
❯ http post 'localhost:3000/api/notify/db/attached_datawarehouse' table_name=existing schema_name=public x-metabase-apikey:apikey -pb
{
    "success": true
}
```

it will error if it cannot find a table
```shell
❯ http post 'localhost:3000/api/notify/db/attached_datawarehouse' table_name=new schema_name=public x-metabase-apikey:apikey -pb
{
    "cause": "Unable to identify table 'public.new'",
    "data": {
        "schema_name": "public",
        "status-code": 404,
        "table_name": "new"
    },
    "message": "Unable to identify table 'public.new'",
    "schema_name": "public",
    "table_name": "new",
    "trace": [],
    "via": [
        {
            "data": {
                "schema_name": "public",
                "status-code": 404,
                "table_name": "new"
            },
            "message": "Unable to identify table 'public.new'",
            "type": "clojure.lang.ExceptionInfo"
        }
    ]
}
```

if i create that table
```sql
attached=# create table new (id int);
CREATE TABLE
```

it will then find and sync it

```shell
❯ http post 'localhost:3000/api/notify/db/attached_datawarehouse' table_name=new schema_name=public x-metabase-apikey:apikey -pb
{
    "success": true
}
```

* formatting

* Tests for attached datawarehouse

* PR nits

typo in `thorw` -> `throw`
comment explaining why `find-and-sync-new-table` is always sync
gensym the db name in test
remove a binding for a fn `sync!` that's only called once
test `{:sync_db true}` syncs the whole db

* stupid formatting :mechanical_arm:

* Use absence of table_name as indicator to sync database
parent bffdfada
No related branches found
No related tags found
No related merge requests found
(ns metabase.api.notify
"/api/notify/* endpoints which receive inbound etl server notifications."
(:require
[clojure.string :as str]
[compojure.core :refer [POST]]
[metabase.api.common :as api]
[metabase.driver :as driver]
......@@ -33,19 +34,61 @@
table-sync-fn (if schema? sync-metadata/sync-table-metadata! sync/sync-table!)
db-sync-fn (if schema? sync-metadata/sync-db-metadata! sync/sync-database!)]
(api/let-404 [database (t2/select-one Database :id id)]
(cond-> (cond
table_id (api/let-404 [table (t2/select-one Table :db_id id, :id (int table_id))]
(future (table-sync-fn table)))
table_name (api/let-404 [table (t2/select-one Table :db_id id, :name table_name)]
(future (table-sync-fn table)))
:else (future (db-sync-fn database)))
synchronous? deref)))
(let [table (cond
table_id (api/check-404 (t2/select-one Table :db_id id, :id (int table_id)))
table_name (api/check-404 (t2/select-one Table :db_id id, :name table_name)))]
(cond-> (future (if table
(table-sync-fn table)
(db-sync-fn database)))
synchronous? deref))))
{:success true})
(defn- without-stacktrace [^Throwable throwable]
(doto throwable
(.setStackTrace (make-array StackTraceElement 0))))
(defn- find-and-sync-new-table
[database table-name schema-name]
(let [driver (driver.u/database->driver database)
{db-tables :tables} (driver/describe-database driver database)]
(if-let [table (some (fn [table-in-db]
(when (and (= schema-name (:schema table-in-db))
(= table-name (:name table-in-db)))
table-in-db))
db-tables)]
(let [created (sync-tables/create-or-reactivate-table! database table)]
(doto created
sync/sync-table!
sync-util/set-initial-table-sync-complete!))
(throw (without-stacktrace
(ex-info (trs "Unable to identify table ''{0}.{1}''"
schema-name table-name)
{:status-code 404
:schema_name schema-name
:table_name table-name}))))))
(api/defendpoint POST "/db/attached_datawarehouse"
"Sync the attached datawarehouse. Can provide in the body:
- table_name and schema_name: both strings. Will look for an existing table and sync it, otherwise will try to find a
new table with that name and sync it. If it cannot find a table it will throw an error. If table_name is empty or
blank, will sync the entire database.
- synchronous?: is a boolean value to indicate if this should block on the result."
[:as {{:keys [table_name schema_name synchronous?]} :body}]
{table_name [:maybe ms/NonBlankString]
schema_name [:maybe string?]
synchronous? [:maybe ms/BooleanValue]}
(api/let-404 [database (t2/select-one :model/Database :is_attached_dwh true)]
(if (str/blank? table_name)
(cond-> (future (sync-metadata/sync-db-metadata! database))
synchronous? deref)
(if-let [table (t2/select-one Table :db_id (:id database), :name table_name :schema schema_name)]
(cond-> (future (sync-metadata/sync-table-metadata! table))
synchronous? deref)
;; find and sync is always synchronous. And we want it to be so since the "can't find this table" error is
;; rather informative. If it's on a future we won't see it.
(find-and-sync-new-table database table_name schema_name))))
{:success true})
(api/defendpoint POST "/db/:id/new-table"
"Sync a new table without running a full database sync. Requires `schema_name` and `table_name`. Will throw an error
if the table already exists in Metabase or cannot be found."
......@@ -55,23 +98,7 @@
table_name ms/NonBlankString}
(api/let-404 [database (t2/select-one Database :id id)]
(if-not (t2/select-one Table :db_id id :name table_name :schema schema_name)
(let [driver (driver.u/database->driver database)
{db-tables :tables} (driver/describe-database driver database)]
(if-let [table (some (fn [table-in-db]
(when (and (= schema_name (:schema table-in-db))
(= table_name (:name table-in-db)))
table-in-db))
db-tables)]
(let [created (sync-tables/create-or-reactivate-table! database table)]
(doto created
sync/sync-table!
sync-util/set-initial-table-sync-complete!))
(throw (without-stacktrace
(ex-info (trs "Unable to identify table ''{0}.{1}''"
schema_name table_name)
{:status-code 404
:schema_name schema_name
:table_name table_name})))))
(find-and-sync-new-table database table_name schema_name)
(throw (without-stacktrace
(ex-info (trs "Table ''{0}.{1}'' already exists"
schema_name table_name)
......
......@@ -143,3 +143,72 @@
(let [tables (tableset database)]
(is (= #{"public.foo" "public.bar"} tables))
(is (false? (contains? tables "public.fern"))))))))))
(defn do-with-no-attached-data-warehouses
[f]
(let [attached (t2/select-fn-set :id :model/Database :is_attached_dwh true)]
(try
(when (seq attached)
(t2/update! :model/Database :id [:in attached] {:is_attached_dwh false}))
(f)
(finally
(when (seq attached)
(t2/update! :model/Database :id [:in attached] {:is_attached_dwh true}))))))
(defmacro with-no-attached-data-warehouses
[& body]
`(do-with-no-attached-data-warehouses (fn [] ~@body)))
(deftest sync-data-warehouse-test
(mt/test-driver :postgres
(testing "Ensure we can interact with the attached datawarehouse db"
(with-no-attached-data-warehouses
(let [db-name (str (gensym "attached_datawarehouse"))]
(try
(postgres-test/drop-if-exists-and-create-db! db-name)
(let [details (mt/dbdef->connection-details :postgres :db {:database-name db-name})]
(mt/with-temp [Database database {:engine :postgres
:details (assoc details :dbname db-name)
:is_attached_dwh true}]
(let [spec (sql-jdbc.conn/connection-details->spec :postgres details)
exec! (fn [spec statements] (doseq [statement statements] (jdbc/execute! spec [statement])))
tableset #(set (map (fn [{:keys [schema name]}] (format "%s.%s" schema name)) (t2/select 'Table :db_id (:id %))))
post (fn post-api
([payload] (post-api payload 200))
([payload expected-code]
(mt/with-temporary-setting-values [api-key "test-api-key"]
(mt/client-full-response
:post expected-code "notify/db/attached_datawarehouse"
{:request-options api-headers}
(merge {:synchronous? true}
payload)))))]
;; Create the initial table and sync it.
(exec! spec ["CREATE TABLE public.FOO (val bigint NOT NULL);"])
(sync/sync-database! database)
(let [tables (tableset database)]
(is (= #{"public.foo"} tables)))
(testing "We can sync an existing database"
(is (= 200 (:status (post {:schema_name "public" :table_name "foo"} 200)))))
(testing "And it will see new fields"
(exec! spec ["ALTER TABLE public.FOO add column newly_added int"])
(is (= 200 (:status (post {:schema_name "public" :table_name "foo"} 200))))
(let [table (t2/select-one :model/Table :db_id (:id database) :name "foo")
fields (t2/select :model/Field :table_id (:id table))]
(is (= #{"val" "newly_added"} (into #{} (map :name) fields)))))
(testing "We get a 404 for non-existant tables"
(is (= 404 (:status (post {:schema_name "public" :table_name "bar"} 404)))))
;; Create two more tables that are not yet synced
(exec! spec ["CREATE TABLE public.BAR (val bigint NOT NULL);"
"CREATE TABLE public.FERN (val bigint NOT NULL);"])
(testing "But we will see new fields"
(is (= 200 (:status (post {:schema_name "public" :table_name "bar"})))))
;; Assert that only the synced tables are present.
(let [tables (tableset database)]
(is (= #{"public.foo" "public.bar"} tables))
(is (false? (contains? tables "public.fern"))))
(testing "We can sync the whole database as well"
(is (= 200 (:status (post {}))))
(let [tables (tableset database)]
(is (= #{"public.foo" "public.bar" "public.fern"} tables)))))))
(finally
(postgres-test/drop-if-exists-and-create-db! db-name :pg/just-drop))))))))
......@@ -96,17 +96,18 @@
(defn drop-if-exists-and-create-db!
"Drop a Postgres database named `db-name` if it already exists; then create a new empty one with that name."
[db-name]
[db-name & [just-drop]]
(let [spec (sql-jdbc.conn/connection-details->spec :postgres (mt/dbdef->connection-details :postgres :server nil))]
;; kill any open connections
(jdbc/query spec ["SELECT pg_terminate_backend(pg_stat_activity.pid)
FROM pg_stat_activity
WHERE pg_stat_activity.datname = ?;" db-name])
;; create the DB
(jdbc/execute! spec [(format "DROP DATABASE IF EXISTS \"%s\";
CREATE DATABASE \"%s\";"
db-name db-name)]
{:transaction? false})))
(jdbc/execute! spec [(format "DROP DATABASE IF EXISTS \"%s\"" db-name)]
{:transaction? false})
(when (not= just-drop :pg/just-drop)
(jdbc/execute! spec [(format "CREATE DATABASE \"%s\";" db-name)]
{:transaction? false}))))
(defn- exec!
"Execute a sequence of statements against the database whose spec is passed as the first param."
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment