diff --git a/src/metabase/upload.clj b/src/metabase/upload.clj
index 324be32f8cc8811da78b2cd15c10876a3e9c96e1..90c5cdec9ec8a59d6001da517a624fb9bc010208 100644
--- a/src/metabase/upload.clj
+++ b/src/metabase/upload.clj
@@ -428,33 +428,82 @@
 (defn- file-size-mb [csv-file]
   (/ (.length ^File csv-file) 1048576.0))
 
+(def ^:private separators ",;\t")
+
+(defn- assert-inferred-separator [maybe-s]
+  (or maybe-s
+      (throw (ex-info (tru "Unable to recognise file separator")
+                      {:status-code 422}))))
+
+(defn- infer-separator
+  "Guess at which symbol is being used as a separator in the given CSV-like file.
+  Our heuristic is to use the separator that gives us the greatest number of columns.
+  Exclude separators which give incompatible column counts between the header and the first row."
+  [^File file]
+  (let [count-columns (fn [s]
+                        ;; Create a separate reader per separator, as the line-breaking behaviour depends on the parser.
+                        (with-open [reader (bom/bom-reader file)]
+                          (->> (csv/read-csv reader :separator s)
+                               ;; we only consider the header row and the first data row
+                               (take 2)
+                               (map count)
+                               ;; realize the list before the reader closes
+                               doall)))]
+    (->> (map (juxt identity count-columns) separators)
+         ;; We cannot have more data columns than header columns.
+         ;; We currently support files without any data rows, and these get a free pass.
+         (remove (fn [[_s [header-column-count data-column-count]]]
+                   (when data-column-count
+                     (> data-column-count header-column-count))))
+         ;; Prefer separators according to the following criteria, in order:
+         ;; - Splitting the header at least once
+         ;; - Giving a consistent column split for the first two lines of the file
+         ;; - The number of fields in the header
+         ;; - The precedence order in which we define them, e.g. bias towards comma
+         (sort-by (fn [[_ [header-column-count data-column-count]]]
+                    [(when header-column-count
+                       (> header-column-count 1))
+                     (= header-column-count data-column-count)
+                     header-column-count])
+                  u/reverse-compare)
+         ffirst
+         assert-inferred-separator)))
+
+(defn- infer-parser
+  "Currently this only infers the separator, but in future it may also handle different quoting options."
+  [file]
+  (let [s (infer-separator file)]
+    (fn [stream]
+      (csv/read-csv stream :separator s))))
+
 (defn- create-from-csv!
   "Creates a table from a CSV file. If the table already exists, it will throw an error.
   Returns the file size, number of rows, and number of columns."
   [driver db-id table-name ^File csv-file]
-  (with-open [reader (bom/bom-reader csv-file)]
-    (let [[header & rows]   (without-auto-pk-columns (csv/read-csv reader))
-          settings          (upload-parsing/get-settings)
-          {:keys [extant-columns generated-columns]} (detect-schema settings header rows)
-          cols->upload-type (merge generated-columns extant-columns)
-          col-definitions   (column-definitions driver cols->upload-type)
-          csv-col-names     (keys extant-columns)
-          col-upload-types  (vals extant-columns)
-          parsed-rows       (vec (parse-rows settings col-upload-types rows))]
-      (driver/create-table! driver
-                            db-id
-                            table-name
-                            col-definitions
-                            :primary-key [auto-pk-column-keyword])
-      (try
-        (driver/insert-into! driver db-id table-name csv-col-names parsed-rows)
-        {:num-rows          (count rows)
-         :num-columns       (count extant-columns)
-         :generated-columns (count generated-columns)
-         :size-mb           (file-size-mb csv-file)}
-        (catch Throwable e
-          (driver/drop-table! driver db-id table-name)
-          (throw (ex-info (ex-message e) {:status-code 400})))))))
+  (let [parse (infer-parser csv-file)]
+    (with-open [reader (bom/bom-reader csv-file)]
+      (let [[header & rows]   (without-auto-pk-columns (parse reader))
+            settings          (upload-parsing/get-settings)
+            {:keys [extant-columns generated-columns]} (detect-schema settings header rows)
+            cols->upload-type (merge generated-columns extant-columns)
+            col-definitions   (column-definitions driver cols->upload-type)
+            csv-col-names     (keys extant-columns)
+            col-upload-types  (vals extant-columns)
+            parsed-rows       (vec (parse-rows settings col-upload-types rows))]
+        (driver/create-table! driver
+                              db-id
+                              table-name
+                              col-definitions
+                              :primary-key [auto-pk-column-keyword])
+        (try
+          (driver/insert-into! driver db-id table-name csv-col-names parsed-rows)
+          {:num-rows          (count rows)
+           :num-columns       (count extant-columns)
+           :generated-columns (count generated-columns)
+           :size-mb           (file-size-mb csv-file)}
+          (catch Throwable e
+            (driver/drop-table! driver db-id table-name)
+            (throw (ex-info (ex-message e) {:status-code 400}))))))))
 
 ;;;; +------------------+
 ;;;; | Create upload
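
A rough sketch of the heuristic above, using a hypothetical file (the contents
and results here are illustrative, and clojure.java.io is assumed to be
aliased as io):

(comment
  ;; Suppose example.csv contains:
  ;;   id;name
  ;;   1;Jones, Bob
  ;;
  ;; count-columns yields [header-columns data-columns] for each candidate:
  ;;   comma     -> [1 2]  ; the data row splits on its comma => excluded
  ;;   semicolon -> [2 2]  ; sort key [true true 2]
  ;;   tab       -> [1 1]  ; sort key [false true 1]
  ;;
  ;; sorting the survivors with u/reverse-compare puts the semicolon first
  (infer-separator (io/file "example.csv"))
  ;; => \;
  )
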
@@ -533,13 +582,14 @@
 (defn- fail-stats
   "If a given upload / append / replace fails, this function is used to create the failure event payload for snowplow.
   It may involve redundantly reading the file, or even failing again if the file is unreadable."
-  [file]
-  (with-open [reader (bom/bom-reader file)]
-    (let [rows (csv/read-csv reader)]
-      {:size-mb           (file-size-mb file)
-       :num-columns       (count (first rows))
-       :num-rows          (count (rest rows))
-       :generated-columns 0})))
+  [^File file]
+  (let [parse (infer-parser file)]
+    (with-open [reader (bom/bom-reader file)]
+      (let [rows (parse reader)]
+        {:size-mb           (file-size-mb file)
+         :num-columns       (count (first rows))
+         :num-rows          (count (rest rows))
+         :generated-columns 0}))))
 
 (mu/defn create-csv-upload!
   "Main entry point for CSV uploading.
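
For reference, a sketch of the failure payload this produces for the
hypothetical two-row example.csv above (the size shown is illustrative):

(comment
  (fail-stats (io/file "example.csv"))
  ;; => {:size-mb 2.0E-5, :num-columns 2, :num-rows 1, :generated-columns 0}
  )
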
@@ -717,71 +767,72 @@
 (defn- update-with-csv! [database table file & {:keys [replace-rows?]}]
   (try
-    (with-open [reader (bom/bom-reader file)]
-      (let [timer              (start-timer)
-            [header & rows]    (without-auto-pk-columns (csv/read-csv reader))
-            driver             (driver.u/database->driver database)
-            normed-name->field (m/index-by (comp normalize-column-name :name)
-                                           (t2/select :model/Field :table_id (:id table) :active true))
-            normed-header      (map normalize-column-name header)
-            create-auto-pk?    (and
-                                (driver/create-auto-pk-with-append-csv? driver)
-                                (not (contains? normed-name->field auto-pk-column-name)))
-            _                  (check-schema (dissoc normed-name->field auto-pk-column-name) header)
-            settings           (upload-parsing/get-settings)
-            old-types          (map (comp base-type->upload-type :base_type normed-name->field) normed-header)
-            ;; in the happy, and most common, case all the values will match the existing types
-            ;; for now we just plan for the worst and perform a fairly expensive operation to detect any type changes
-            ;; we can come back and optimize this to an optimistic-with-fallback approach later.
-            detected-types     (column-types-from-rows settings old-types rows)
-            new-types          (map #(if (matching-or-promotable? %1 %2) %2 %1) old-types detected-types)
-            ;; avoid any schema modification unless all the promotions required by the file are supported,
-            ;; choosing to not promote means that we will defer failure until we hit the first value that cannot
-            ;; be parsed as its existing type - there is scope to improve these error messages in the future.
-            modify-schema?     (and (not= old-types new-types) (= detected-types new-types))
-            _                  (when modify-schema?
-                                 (let [changes (field-changes normed-header old-types new-types)]
-                                   (add-columns! driver database table (:added changes))
-                                   (alter-columns! driver database table (:updated changes))))
-            ;; this will fail if any of our required relaxations were rejected.
-            parsed-rows        (parse-rows settings new-types rows)
-            row-count          (count parsed-rows)
-            stats              {:num-rows          row-count
-                                :num-columns       (count new-types)
-                                :generated-columns (if create-auto-pk? 1 0)
-                                :size-mb           (file-size-mb file)
-                                :upload-seconds    (since-ms timer)}]
-
-        (try
-          (when replace-rows?
-            (driver/truncate! driver (:id database) (table-identifier table)))
-          (driver/insert-into! driver (:id database) (table-identifier table) normed-header parsed-rows)
-          (catch Throwable e
-            (throw (ex-info (ex-message e) {:status-code 422}))))
-
-        (when create-auto-pk?
-          (add-columns! driver database table
-                        {auto-pk-column-keyword ::auto-incrementing-int-pk}
-                        :primary-key [auto-pk-column-keyword]))
-
-        (scan-and-sync-table! database table)
-
-        (when create-auto-pk?
-          (let [auto-pk-field (table-id->auto-pk-column (:id table))]
-            (t2/update! :model/Field (:id auto-pk-field) {:display_name (:name auto-pk-field)})))
-
-        (events/publish-event! :event/upload-append
-                               {:user-id  (:id @api/*current-user*)
-                                :model-id (:id table)
-                                :model    :model/Table
-                                :details  {:db-id       (:id database)
-                                           :schema-name (:schema table)
-                                           :table-name  (:name table)
-                                           :stats       stats}})
-
-        (snowplow/track-event! ::snowplow/csv-append-successful api/*current-user-id* stats)
-
-        {:row-count row-count}))
+    (let [parse (infer-parser file)]
+      (with-open [reader (bom/bom-reader file)]
+        (let [timer              (start-timer)
+              [header & rows]    (without-auto-pk-columns (parse reader))
+              driver             (driver.u/database->driver database)
+              normed-name->field (m/index-by (comp normalize-column-name :name)
+                                             (t2/select :model/Field :table_id (:id table) :active true))
+              normed-header      (map normalize-column-name header)
+              create-auto-pk?    (and
+                                  (driver/create-auto-pk-with-append-csv? driver)
+                                  (not (contains? normed-name->field auto-pk-column-name)))
+              _                  (check-schema (dissoc normed-name->field auto-pk-column-name) header)
+              settings           (upload-parsing/get-settings)
+              old-types          (map (comp base-type->upload-type :base_type normed-name->field) normed-header)
+              ;; in the happy, and most common, case all the values will match the existing types
+              ;; for now we just plan for the worst and perform a fairly expensive operation to detect any type changes
+              ;; we can come back and optimize this to an optimistic-with-fallback approach later.
+              detected-types     (column-types-from-rows settings old-types rows)
+              new-types          (map #(if (matching-or-promotable? %1 %2) %2 %1) old-types detected-types)
+              ;; avoid any schema modification unless all the promotions required by the file are supported,
+              ;; choosing to not promote means that we will defer failure until we hit the first value that cannot
+              ;; be parsed as its existing type - there is scope to improve these error messages in the future.
+              modify-schema?     (and (not= old-types new-types) (= detected-types new-types))
+              _                  (when modify-schema?
+                                   (let [changes (field-changes normed-header old-types new-types)]
+                                     (add-columns! driver database table (:added changes))
+                                     (alter-columns! driver database table (:updated changes))))
+              ;; this will fail if any of our required relaxations were rejected.
+              parsed-rows        (parse-rows settings new-types rows)
+              row-count          (count parsed-rows)
+              stats              {:num-rows          row-count
+                                  :num-columns       (count new-types)
+                                  :generated-columns (if create-auto-pk? 1 0)
+                                  :size-mb           (file-size-mb file)
+                                  :upload-seconds    (since-ms timer)}]
+
+          (try
+            (when replace-rows?
+              (driver/truncate! driver (:id database) (table-identifier table)))
+            (driver/insert-into! driver (:id database) (table-identifier table) normed-header parsed-rows)
+            (catch Throwable e
+              (throw (ex-info (ex-message e) {:status-code 422}))))
+
+          (when create-auto-pk?
+            (add-columns! driver database table
+                          {auto-pk-column-keyword ::auto-incrementing-int-pk}
+                          :primary-key [auto-pk-column-keyword]))
+
+          (scan-and-sync-table! database table)
+
+          (when create-auto-pk?
+            (let [auto-pk-field (table-id->auto-pk-column (:id table))]
+              (t2/update! :model/Field (:id auto-pk-field) {:display_name (:name auto-pk-field)})))
+
+          (events/publish-event! :event/upload-append
+                                 {:user-id  (:id @api/*current-user*)
+                                  :model-id (:id table)
+                                  :model    :model/Table
+                                  :details  {:db-id       (:id database)
+                                             :schema-name (:schema table)
+                                             :table-name  (:name table)
+                                             :stats       stats}})
+
+          (snowplow/track-event! ::snowplow/csv-append-successful api/*current-user-id* stats)
+
+          {:row-count row-count})))
     (catch Throwable e
       (snowplow/track-event! ::snowplow/csv-append-failed api/*current-user-id* (fail-stats file))
       (throw e))))
 
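
A sketch of the type-promotion decision above, with hypothetical upload-type
keywords standing in for the real type hierarchy:

(comment
  ;; Suppose the existing columns are [::int ::text] and the appended file
  ;; contains [::float ::text]. If ::int is promotable to ::float, then:
  (let [old-types      [::int ::text]
        detected-types [::float ::text]
        new-types      (map #(if (matching-or-promotable? %1 %2) %2 %1)
                            old-types detected-types)]
    ;; new-types      => (::float ::text)
    ;; modify-schema? => true, so the int column gets altered to float
    (and (not= old-types new-types) (= detected-types new-types)))
  ;; => true
  )
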
diff --git a/test/metabase/upload_test.clj b/test/metabase/upload_test.clj
index 1eac746882f9ae954c80cdd72e1d11b56f670130..91bf1708a97e0ce2d13caa406a924bb652aa1ac5 100644
--- a/test/metabase/upload_test.clj
+++ b/test/metabase/upload_test.clj
@@ -564,54 +564,66 @@
   [table]
   (mt/rows (query-table table)))
 
+(def ^:private example-files
+  {:comma      ["id ,nulls,string ,bool ,number ,date ,datetime"
+                "2\t ,, a ,true ,1.1\t ,2022-01-01,2022-01-01T00:00:00"
+                "\" 3\",, b,false,\"$ 1,000.1\",2022-02-01,2022-02-01T00:00:00"]
+
+   :semi-colon ["id ;nulls;string ;bool ;number ;date ;datetime"
+                "2\t ;; a ;true ;1.1\t ;2022-01-01;2022-01-01T00:00:00"
+                "\" 3\";; b;false;\"$ 1,000.1\";2022-02-01;2022-02-01T00:00:00"]
+
+   :tab        ["id \tnulls\tstring \tbool \tnumber \tdate \tdatetime"
+                "2 \t\t a \ttrue \t1.1 \t2022-01-01\t2022-01-01T00:00:00"
+                "\" 3\"\t\t b\tfalse\t\"$ 1,000.1\"\t2022-02-01\t2022-02-01T00:00:00"]})
+
{:name #"(?i)string" - :base_type :type/Text} - (t2/select-one Field :database_position 3 :table_id (:id table)))) - (is (=? {:name #"(?i)bool" - :base_type :type/Boolean} - (t2/select-one Field :database_position 4 :table_id (:id table)))) - (is (=? {:name #"(?i)number" - :base_type :type/Float} - (t2/select-one Field :database_position 5 :table_id (:id table)))) - (is (=? {:name #"(?i)date" - :base_type :type/Date} - (t2/select-one Field :database_position 6 :table_id (:id table)))) - (is (=? {:name #"(?i)datetime" - :base_type :type/DateTime} - (t2/select-one Field :database_position 7 :table_id (:id table)))) - (testing "Check the data was uploaded into the table" - (is (= 2 - (count (rows-for-table table))))))))))) + (doseq [[separator lines] example-files] + (testing (format "Upload a CSV file with %s separators." separator) + (mt/test-drivers (mt/normal-drivers-with-feature :uploads) + (with-mysql-local-infile-on-and-off + (with-upload-table! + [table (let [table-name (mt/random-name)] + (@#'upload/create-from-csv! + driver/*driver* + (mt/id) + table-name + (csv-file-with lines)) + (sync-upload-test-table! :database (mt/db) :table-name table-name))] + (testing "Table and Fields exist after sync" + (is (=? {:name #"(?i)_mb_row_id" + :semantic_type (if (= driver/*driver* :redshift) + ;; TODO: there is a bug in the redshift driver where the semantic_type is not set + ;; to type/PK even if the column is a PK + :type/Category + :type/PK) + :base_type :type/BigInteger} + (t2/select-one Field :database_position 0 :table_id (:id table)))) + (is (=? {:name #"(?i)id" + :semantic_type :type/PK + :base_type :type/BigInteger} + (t2/select-one Field :database_position 1 :table_id (:id table)))) + (is (=? {:name #"(?i)nulls" + :base_type :type/Text} + (t2/select-one Field :database_position 2 :table_id (:id table)))) + (is (=? {:name #"(?i)string" + :base_type :type/Text} + (t2/select-one Field :database_position 3 :table_id (:id table)))) + (is (=? {:name #"(?i)bool" + :base_type :type/Boolean} + (t2/select-one Field :database_position 4 :table_id (:id table)))) + (is (=? {:name #"(?i)number" + :base_type :type/Float} + (t2/select-one Field :database_position 5 :table_id (:id table)))) + (is (=? {:name #"(?i)date" + :base_type :type/Date} + (t2/select-one Field :database_position 6 :table_id (:id table)))) + (is (=? {:name #"(?i)datetime" + :base_type :type/DateTime} + (t2/select-one Field :database_position 7 :table_id (:id table)))) + (testing "Check the data was uploaded into the table" + (is (= 2 + (count (rows-for-table table)))))))))))) (deftest create-from-csv-date-test (testing "Upload a CSV file with a datetime column"