diff --git a/modules/drivers/redshift/test/metabase/test/data/redshift.clj b/modules/drivers/redshift/test/metabase/test/data/redshift.clj
index def7b6555500ef28f16ee2b1caf544f2493ecf55..a6e34d5379dc93d6d0f628a082e733132f77c749 100644
--- a/modules/drivers/redshift/test/metabase/test/data/redshift.clj
+++ b/modules/drivers/redshift/test/metabase/test/data/redshift.clj
@@ -1,6 +1,8 @@
 (ns metabase.test.data.redshift
   (:require
    [clojure.java.jdbc :as jdbc]
+   [clojure.string :as str]
+   [java-time :as t]
    [metabase.driver.sql-jdbc.connection :as sql-jdbc.conn]
    [metabase.driver.sql-jdbc.sync :as sql-jdbc.sync]
    [metabase.driver.sql.test-util.unique-prefix :as sql.tu.unique-prefix]
@@ -71,15 +73,89 @@
 
 ;;; Create + destroy the schema used for this test session
 
-(defn- delete-old-schemas! [^java.sql.Connection conn]
-  (with-open [rset (.. conn getMetaData getSchemas)
-              stmt (.createStatement conn)]
-    (while (.next rset)
-      (let [schema (.getString rset "TABLE_SCHEM")
-            sql    (format "DROP SCHEMA IF EXISTS \"%s\" CASCADE;" schema)]
-        (when (sql.tu.unique-prefix/old-dataset-name? schema)
-          (log/info (u/format-color 'blue "[redshift] %s" sql))
-          (.execute stmt sql))))))
+(defn- reducible-result-set [^java.sql.ResultSet rset]
+  (reify clojure.lang.IReduceInit
+    (reduce [_ rf init]
+      (with-open [rset rset]
+        (loop [res init]
+          (if (.next rset)
+            (recur (rf res rset))
+            res))))))
+
+(defn- fetch-schemas [^java.sql.Connection conn]
+  (reify clojure.lang.IReduceInit
+    (reduce [_ rf init]
+      (reduce ((map (fn [^java.sql.ResultSet rset]
+                      (.getString rset "TABLE_SCHEM"))) rf)
+              init
+              (reducible-result-set (.. conn getMetaData getSchemas))))))
+
+(def ^Long HOURS-BEFORE-EXPIRED-THRESHOLD
+  "Number of hours that elapse before a persisted schema is considered expired."
+  6)
+
+(defn- classify-cache-schemas
+  "Classifies the persistence cache schemas. Returns a map with where each value is a (possibly empty) sequence of
+  schemas:
+
+  {:old-style-cache    schemas without a `cache_info` table
+   :recent             schemas with a `cache_info` table and are recently created
+   :expired            `cache_info` table and created [[HOURS-BEFORE-EXPIRED-THRESHOLD]] ago
+   :lacking-created-at should never happen, but if they lack an entry for `created-at`
+   :unknown-error      if an error was thrown while classifying the schema}"
+  [^java.sql.Connection conn schemas]
+  (let [threshold (t/minus (t/instant) (t/hours HOURS-BEFORE-EXPIRED-THRESHOLD))]
+    (with-open [stmt (.createStatement conn)]
+      (let [classify! (fn [schema-name]
+                        (try (let [sql (format "select value from %s.cache_info where key = 'created-at'"
+                                               schema-name)
+                                   rset (.executeQuery stmt sql)]
+                               (if (.next rset)
+                                 (let [date-string (.getString rset "value")
+                                       created-at  (java.time.Instant/parse date-string)]
+                                   (if (t/before? created-at threshold)
+                                     :expired
+                                     :recent))
+                                 :lacking-created-at))
+                             (catch com.amazon.redshift.util.RedshiftException e
+                               (if (re-find #"relation .* does not exist" (or (ex-message e) ""))
+                                 :old-style-cache
+                                 (do (log/error "Error classifying cache schema" e)
+                                     :unknown-error)))
+                             (catch Exception e
+                               (log/error "Error classifying cache schema" e)
+                               :unknown-error)))]
+
+        (group-by classify! schemas)))))
+
+(defn- delete-old-schemas!
+  "Remove unneeded schemas from redshift. Local databases are thrown away after a test run. Shared cloud instances do
+  not have this luxury. Test runs can create schemas where models are persisted and nothing cleans these up, leading
+  to redshift clusters hitting the max number of tables allowed."
+  [^java.sql.Connection conn]
+  (let [{old-convention   :old
+         caches-with-info :cache}    (reduce (fn [acc s]
+                                               (cond (sql.tu.unique-prefix/old-dataset-name? s)
+                                                     (update acc :old conj s)
+                                                     (str/starts-with? s "metabase_cache_")
+                                                     (update acc :cache conj s)
+                                                     :else acc))
+                                             {:old [] :cache []}
+                                             (fetch-schemas conn))
+        {:keys [expired
+                old-style-cache
+                lacking-created-at]} (classify-cache-schemas conn caches-with-info)
+        drop-sql                     (fn [schema-name] (format "DROP SCHEMA IF EXISTS \"%s\" CASCADE;"
+                                                               schema-name))]
+    ;; don't delete unknown-error and recent.
+    (with-open [stmt (.createStatement conn)]
+      (doseq [[collection fmt-str] [[old-convention "Dropping old data schema: %s"]
+                                    [expired "Dropping expired cache schema: %s"]
+                                    [lacking-created-at "Dropping cache without created-at info: %s"]
+                                    [old-style-cache "Dropping old cache schema without `cache_info` table: %s"]]
+              schema               collection]
+        (log/infof fmt-str schema)
+        (.execute stmt (drop-sql schema))))))
 
 (defn- create-session-schema! [^java.sql.Connection conn]
   (with-open [stmt (.createStatement conn)]