diff --git a/modules/drivers/vertica/test/metabase/test/data/vertica.clj b/modules/drivers/vertica/test/metabase/test/data/vertica.clj
index 9195f79f46a28e3c5161476e4ba584ac6c92fd9c..a790141c1819402ee2ea32f4ee9f28a835cfa842 100644
--- a/modules/drivers/vertica/test/metabase/test/data/vertica.clj
+++ b/modules/drivers/vertica/test/metabase/test/data/vertica.clj
@@ -100,8 +100,8 @@
   "Dump a sequence of rows (as vectors) to a CSV file."
   [{:keys [field-definitions rows]} ^String filename]
   (try
-   (let [has-custom-pk? (when-let [pk (sql.tx/fielddefs->pk-field-name field-definitions)]
-                          (not= "id" pk))
+   (let [has-custom-pk? (when-let [pk (not-empty (sql.tx/fielddefs->pk-field-names field-definitions))]
+                          (not= ["id"] pk))
          column-names   (cond->> (mapv :field-name field-definitions)
                           (not has-custom-pk?)
                           (cons "id"))
diff --git a/test/metabase/test/data/dataset_definition_test.clj b/test/metabase/test/data/dataset_definition_test.clj
index 4b2008334227cbd9749ac7ae098594a50a8a22bb..4aa8ba9dad242240c0c96ef65c4a352d88592c70 100644
--- a/test/metabase/test/data/dataset_definition_test.clj
+++ b/test/metabase/test/data/dataset_definition_test.clj
@@ -39,3 +39,27 @@
                       :fk_target_field_id nil
                       :semantic_type      :type/PK}}
                    (set group-fields)))))))))
+
+(mt/defdataset composite-pk
+  [["songs"
+    [{:field-name "artist_id", :base-type :type/Integer, :pk? true}
+     {:field-name "song_id",   :base-type :type/Integer, :pk? true}]
+    [[1 2]]]])
+
+(deftest dataset-with-custom-composite-pk-test
+  (mt/test-drivers (disj (mt/sql-jdbc-drivers)
+                         ;; presto doesn't create PK for test data :) not sure why
+                         :presto-jdbc
+                         ;; creating db for athena is expensive and require some extra steps,
+                         ;; so it's not worth testing against, see [[metabase.test.data.athena/*allow-database-creation*]]
+                         :athena
+                         ;; there is no PK in sparksql
+                         :sparksql)
+    (mt/dataset composite-pk
+      (let [format-name #(ddl.i/format-name driver/*driver* %)]
+        (testing "(artist_id, song_id) is a PK"
+          (is (= #{(format-name "artist_id")
+                   (format-name "song_id")}
+                 (t2/select-fn-set :name :model/Field
+                                   :table_id (mt/id :songs)
+                                   :semantic_type :type/PK))))))))
diff --git a/test/metabase/test/data/sql.clj b/test/metabase/test/data/sql.clj
index a34c3b7fef3701620479bdba98ef7bff47259e5b..0b050d78528b00e017dc0eafd995de76c543b392 100644
--- a/test/metabase/test/data/sql.clj
+++ b/test/metabase/test/data/sql.clj
@@ -240,21 +240,23 @@
         inline-comment (inline-column-comment-sql driver field-comment)]
     (str/join " " (filter some? [field-name field-type not-null unique inline-comment]))))
 
-(defn fielddefs->pk-field-name
-  "Find the pk field name in fieldefs"
+(defn fielddefs->pk-field-names
+  "Find the pk field names in fieldefs"
   [fieldefs]
-  (->> fieldefs (filter :pk?) first :field-name))
+  (->> fieldefs (filter :pk?) (map :field-name)))
 
 (defmethod create-table-sql :sql/test-extensions
   [driver {:keys [database-name], :as _dbdef} {:keys [table-name field-definitions table-comment]}]
-  (let [pk-field-name (format-and-quote-field-name driver (fielddefs->pk-field-name field-definitions))]
+  (let [pk-field-names (->> (fielddefs->pk-field-names field-definitions)
+                            (map (partial format-and-quote-field-name driver))
+                            (str/join ", "))]
     (format "CREATE TABLE %s (%s, PRIMARY KEY (%s)) %s;"
             (qualify-and-quote driver database-name table-name)
             (str/join
              ", "
              (for [field-def field-definitions]
                (field-definition-sql driver field-def)))
-            pk-field-name
+            pk-field-names
             (or (inline-table-comment-sql driver table-comment) ""))))
 
 (defmulti drop-table-if-exists-sql
@@ -285,11 +287,15 @@
 
 (defmethod add-fk-sql :sql/test-extensions
   [driver {:keys [database-name] :as dbdef} {:keys [table-name]} {dest-table-name :fk, field-name :field-name}]
-  (let [quot               #(sql.u/quote-name driver %1 (ddl.i/format-name driver %2))
-        dest-table-name    (name dest-table-name)
-        dest-table-pk-name (->> (get-tabledef dbdef dest-table-name)
-                                :field-definitions
-                                fielddefs->pk-field-name)]
+  (let [quot            #(sql.u/quote-name driver %1 (ddl.i/format-name driver %2))
+        dest-table-name (name dest-table-name)
+        pk-names        (->> (get-tabledef dbdef dest-table-name)
+                             :field-definitions
+                             fielddefs->pk-field-names)
+        _ (when (< 1 (count pk-names))
+            (throw (IllegalArgumentException. "`add-fk-sql` only works with tables with a single PK field")))
+        pk-name             (first pk-names)]
+
     (format "ALTER TABLE %s ADD CONSTRAINT %s FOREIGN KEY (%s) REFERENCES %s (%s);"
             (qualify-and-quote driver database-name table-name)
             ;; limit FK constraint name to 30 chars since Oracle doesn't support names longer than that
@@ -300,7 +306,7 @@
               (quot :constraint fk-name))
             (quot :field field-name)
             (qualify-and-quote driver database-name dest-table-name)
-            (quot :field dest-table-pk-name))))
+            (quot :field pk-name))))
 
 (defmethod tx/count-with-template-tag-query :sql/test-extensions
   [driver table field _param-type]