From a767fd31c9fd69571298e6f4dabe9b822794d851 Mon Sep 17 00:00:00 2001
From: Chris Truter <>
Date: Mon, 18 Mar 2024 12:21:45 -0600
Subject: [PATCH] Support appending new CSV fields (#39951) (#40220)

 src/metabase/driver/h2.clj    |   8 +++
 src/metabase/upload.clj       | 125 ++++++++++++++++++++--------------
 test/metabase/upload_test.clj |  71 +++++++++++++------
 3 files changed, 129 insertions(+), 75 deletions(-)

diff --git a/src/metabase/driver/h2.clj b/src/metabase/driver/h2.clj
index ec9836388af..6a03cf100a2 100644
--- a/src/metabase/driver/h2.clj
+++ b/src/metabase/driver/h2.clj
@@ -580,3 +580,11 @@
+(defmethod driver/add-columns! :h2
+  [driver db-id table-name column-definitions & {:as settings}]
+  ;; Workaround for the fact that H2 uses different syntax for adding multiple columns, which is difficult to
+  ;; produce with HoneySQL. As a simpler workaround we instead break it up into single column statements.
+  (let [f (get-method driver/add-columns! :sql-jdbc)]
+    (doseq [[k v] column-definitions]
+      (f driver db-id table-name {k v} settings))))
diff --git a/src/metabase/upload.clj b/src/metabase/upload.clj
index b24d437259b..a7ba89b614d 100644
--- a/src/metabase/upload.clj
+++ b/src/metabase/upload.clj
@@ -570,28 +570,33 @@
 (defn- base-type->upload-type
   "Returns the most specific upload type for the given base type."
-  (condp #(isa? %2 %1) base-type
-    :type/Float                  ::float
-    :type/BigInteger             ::int
-    :type/Integer                ::int
-    :type/Boolean                ::boolean
-    :type/DateTimeWithTZ         ::offset-datetime
-    :type/DateTime               ::datetime
-    :type/Date                   ::date
-    :type/Text                   ::text))
+  (when base-type
+    (condp #(isa? %2 %1) base-type
+      :type/Float                  ::float
+      :type/BigInteger             ::int
+      :type/Integer                ::int
+      :type/Boolean                ::boolean
+      :type/DateTimeWithTZ         ::offset-datetime
+      :type/DateTime               ::datetime
+      :type/Date                   ::date
+      :type/Text                   ::text)))
 (defn- not-blank [s]
   (when-not (str/blank? s)
 (defn- extra-and-missing-error-markdown [extra missing]
-  (->> [[(tru "The CSV file contains extra columns that are not in the table:") extra]
-        [(tru "The CSV file is missing columns that are in the table:") missing]]
-       (keep (fn [[header columns]]
-               (when (seq columns)
-                 (str/join "\n" (cons header (map #(str "- " %) columns))))))
-       (str/join "\n\n")
-       (not-blank)))
+  (when (seq missing)
+    (->> [[(tru "The CSV file is missing columns that are in the table:") missing]
+          ;; Even though we allow new columns to be implicitly added by uploads, we mention then in the error messages
+          ;; for missing fields as a common case will be the misspelling of names. Seeing the actual and expected
+          ;; names together could help customers spot the root cause more easily.
+          [(tru "There are new columns in the CSV file that are not in the table:") extra]]
+         (keep (fn [[header columns]]
+                 (when (seq columns)
+                   (str/join "\n" (cons header (map #(str "- " %) columns))))))
+         (str/join "\n\n")
+         (not-blank))))
 (def ^:private allowed-type-upgrades
   "A mapping of which types a column can be implicitly relaxed to, based on the content of appended values."
@@ -607,35 +612,50 @@
   ;; Assumes table-cols are unique when normalized
   (let [normalized-field-names (keys fields-by-normed-name)
         normalized-header      (map normalize-column-name header)
-        [extra missing _both] (data/diff (set normalized-header) (set normalized-field-names))]
+        [extra missing _both]  (data/diff (set normalized-header) (set normalized-field-names))]
     ;; check for duplicates
     (when (some #(< 1 %) (vals (frequencies normalized-header)))
       (throw (ex-info (tru "The CSV file contains duplicate column names.")
                       {:status-code 422})))
-    (when (or extra missing)
-      (let [error-message (extra-and-missing-error-markdown extra missing)]
-        (throw (ex-info error-message {:status-code 422}))))))
+    (when-let [error-message (extra-and-missing-error-markdown extra missing)]
+      (throw (ex-info error-message {:status-code 422})))))
 (defn- matching-or-upgradable? [current-type relaxed-type]
-  (or (= current-type relaxed-type)
+  (or (nil? current-type)
+      (= current-type relaxed-type)
       (when-let [f (allowed-type-upgrades current-type)]
         (f relaxed-type))))
-(defn- changed-field->new-type
-  "Given some fields and old and new types, filter out fields with unchanged types, then pair with the new types."
-  [fields old-types new-types]
-  (let [new-if-changed #(when (not= %1 %2) %2)]
-    (->> (map new-if-changed old-types new-types)
-         (map vector fields)
-         (filter second)
-         (into {}))))
-(defn- alter-columns! [driver database table field->new-type]
-  (driver/alter-columns! driver (:id database) (table-identifier table)
-                         (m/map-kv (fn [field column-type]
-                                     [(keyword (:name field))
-                                      (driver/upload-type->database-type driver column-type)])
-                                   field->new-type)))
+(defn- field-changes
+  "Given existing and newly inferred types for the given `field-names`, calculate which fields need to be added or updated, along with their new types."
+  [field-names existing-types new-types]
+  (reduce
+   (fn [m [f e n]]
+     (cond
+       (nil? e)   (assoc-in m [:added f] n)
+       (not= e n) (assoc-in m [:updated f] n)
+       :else      m))
+   {:added {}, :updated {}}
+   (map vector field-names existing-types new-types)))
+(defn- field->db-type [driver field->col-type]
+  (m/map-kv
+   (fn [field-name col-type]
+     [(keyword field-name)
+      (driver/upload-type->database-type driver col-type)])
+   field->col-type))
+(defn- add-columns! [driver database table field->type & args]
+  (when (seq field->type)
+    (apply driver/add-columns! driver (:id database) (table-identifier table)
+           (field->db-type driver field->type)
+           args)))
+(defn- alter-columns! [driver database table field->new-type & args]
+  (when (seq field->new-type)
+    (apply driver/alter-columns! driver (:id database) (table-identifier table)
+           (field->db-type driver field->new-type)
+            args)))
 (defn- append-csv!*
   [database table file]
@@ -652,24 +672,25 @@
                                 (not (contains? normed-name->field auto-pk-column-name)))
             _                  (check-schema (dissoc normed-name->field auto-pk-column-name) header)
             settings           (upload-parsing/get-settings)
-            old-column-types   (map (comp base-type->upload-type :base_type normed-name->field) normed-header)
+            old-types          (map (comp base-type->upload-type :base_type normed-name->field) normed-header)
             ;; in the happy, and most common, case all the values will match the existing types
             ;; for now we just plan for the worst and perform a fairly expensive operation to detect any type changes
             ;; we can come back and optimize this to an optimistic-with-fallback approach later.
-            detected-types     (column-types-from-rows settings old-column-types rows)
-            new-column-types   (map #(if (matching-or-upgradable? %1 %2) %2 %1) old-column-types detected-types)
-            _                  (when (and (not= old-column-types new-column-types)
-                                          ;; if we cannot coerce all the columns, don't bother coercing any of them
-                                          ;; we will instead throw an error when we try to parse as the old type
-                                          (= detected-types new-column-types))
-                                 (let [fields (map normed-name->field normed-header)]
-                                   (->> (changed-field->new-type fields old-column-types detected-types)
-                                        (alter-columns! driver database table))))
+            detected-types     (column-types-from-rows settings old-types rows)
+            new-types          (map #(if (matching-or-upgradable? %1 %2) %2 %1) old-types detected-types)
+            ;; avoid any schema modification unless we are able to fully upgrade to supporting the given file
+            ;; choosing to not upgrade means that we will defer failure until we hit the first value that cannot
+            ;; be parsed as its previous type - there is scope to improve these error messages in the future.
+            modify-schema?     (and (not= old-types new-types) (= detected-types new-types))
+            _                  (when modify-schema?
+                                 (let [changes (field-changes normed-header old-types new-types)]
+                                   (add-columns! driver database table (:added changes))
+                                   (alter-columns! driver database table (:updated changes))))
             ;; this will fail if any of our required relaxations were rejected.
-            parsed-rows        (parse-rows settings new-column-types rows)
+            parsed-rows        (parse-rows settings new-types rows)
             row-count          (count parsed-rows)
             stats              {:num-rows          row-count
-                                :num-columns       (count new-column-types)
+                                :num-columns       (count new-types)
                                 :generated-columns (if create-auto-pk? 1 0)
                                 :size-mb           (file-size-mb file)
                                 :upload-seconds    (since-ms timer)}]
@@ -680,11 +701,9 @@
             (throw (ex-info (ex-message e) {:status-code 422}))))
         (when create-auto-pk?
-          (driver/add-columns! driver
-                               (:id database)
-                               (table-identifier table)
-                               {auto-pk-column-keyword (driver/upload-type->database-type driver ::auto-incrementing-int-pk)}
-                               :primary-key [auto-pk-column-keyword]))
+          (add-columns! driver database table
+                        {auto-pk-column-keyword ::auto-incrementing-int-pk}
+                        :primary-key [auto-pk-column-keyword]))
         (scan-and-sync-table! database table)
diff --git a/test/metabase/upload_test.clj b/test/metabase/upload_test.clj
index 8fed29a9a37..4d72e4a01ea 100644
--- a/test/metabase/upload_test.clj
+++ b/test/metabase/upload_test.clj
@@ -617,7 +617,7 @@
             (is (some? table))))))))
 (deftest load-from-csv-offset-datetime-test
-  (testing "Upload a CSV file with a datetime column"
+  (testing "Upload a CSV file with an offset datetime column"
     (mt/test-drivers (mt/normal-drivers-with-feature :uploads)
         (mt/with-dynamic-redefs [driver/db-default-timezone (constantly "Z")
@@ -1133,7 +1133,6 @@
                     :user-id (str (mt/user->id :rasta))}
                    (last (snowplow-test/pop-event-data-and-user-id!))))))))))
 (deftest csv-upload-audit-log-test
   ;; Just test with h2 because these events are independent of the driver
   (mt/test-driver :h2
@@ -1351,40 +1350,53 @@
 (deftest append-column-mismatch-test
   (mt/test-drivers (mt/normal-drivers-with-feature :uploads)
-      (testing "Append should fail if there are extra or missing columns in the CSV file"
+      (testing "Append should fail only if there are missing columns in the CSV file"
         (doseq [[csv-rows error-message]
-                {["_mb_row_id,id,name,extra column one,EXTRA COLUMN TWO"]
-                 (trim-lines "The CSV file contains extra columns that are not in the table:
-                              - extra_column_two
-                              - extra_column_one")
-                 [""]
+                {[""]
                  (trim-lines "The CSV file is missing columns that are in the table:
                               - id
                               - name")
+                 ;; Extra columns are fine, as long as none are missing.
+                 ["_mb_row_id,id,extra 1, extra 2,name"]
+                 nil
                  ["_mb_row_id,extra 1, extra 2"]
-                 (trim-lines "The CSV file contains extra columns that are not in the table:
+                 (trim-lines "The CSV file is missing columns that are in the table:
+                              - id
+                              - name
+                              There are new columns in the CSV file that are not in the table:
                               - extra_2
-                              - extra_1
+                              - extra_1")
-                              The CSV file is missing columns that are in the table:
-                              - id
-                              - name")}]
+                 ["_mb_row_id,id, extra 2"]
+                 (trim-lines "The CSV file is missing columns that are in the table:
+                              - name
+                              There are new columns in the CSV file that are not in the table:
+                              - extra_2")}]
             [table (create-upload-table!
                     {:col->upload-type (ordered-map/ordered-map
                                         :id         ::upload/int
                                         :name       ::upload/varchar-255)
                      :rows [[1,"some_text"]]})]
-            (let [file  (csv-file-with csv-rows (mt/random-name))]
-              (is (= {:message error-message
-                      :data {:status-code 422}}
-                     (catch-ex-info (append-csv! {:file     file
-                                                  :table-id (:id table)}))))
-              (testing "Check the data was not uploaded into the table"
-                (is (= [[1 "some_text"]]
-                       (rows-for-table table))))
+            (let [file (csv-file-with csv-rows (mt/random-name))]
+              (when error-message
+                (is (= {:message error-message
+                        :data    {:status-code 422}}
+                       (catch-ex-info (append-csv! {:file file :table-id (:id table)}))))
+                (testing "Check the data was not uploaded into the table"
+                  (is (= [[1 "some_text"]]
+                         (rows-for-table table)))))
+              (when-not error-message
+                (testing "Check the data was uploaded into the table"
+                  ;; No exception is thrown - but there were also no rows in the table to check
+                  (append-csv! {:file file :table-id (:id table)})))
               (io/delete-file file))))))))
 (deftest append-all-types-test
@@ -1639,6 +1651,21 @@
                    (rows-for-table table))))
           (io/delete-file file))))))
+(deftest append-new-column-test
+  (mt/test-drivers (mt/normal-drivers-with-feature :uploads)
+    (with-uploads-allowed
+     (testing "Append should handle new columns being added in the latest CSV"
+       (with-upload-table! [table (create-upload-table!)]
+         ;; Reorder as well for good measure
+         (let [csv-rows ["game,name" "Witticisms,Fluke Skytalker"]
+               file     (csv-file-with csv-rows (mt/random-name))]
+           (testing "The new row is inserted with the values correctly reordered"
+             (is (= {:row-count 1} (append-csv! {:file file, :table-id (:id table)})))
+             (is (= [[1 "Obi-Wan Kenobi" nil]
+                     [2 "Fluke Skytalker" "Witticisms"]]
+                    (rows-for-table table))))
+           (io/delete-file file)))))))
 (deftest append-type-mismatch-test
   (mt/test-drivers (mt/normal-drivers-with-feature :uploads)