diff --git a/src/metabase/lib/breakout.cljc b/src/metabase/lib/breakout.cljc
index f68d49f74cdbc6e4c9b00bc6e8a2f43a78f7a5fb..35de9af81b9f6c6f9617c87c99dc14f702bacf07 100644
--- a/src/metabase/lib/breakout.cljc
+++ b/src/metabase/lib/breakout.cljc
@@ -3,6 +3,7 @@
    [clojure.string :as str]
    [metabase.lib.binning :as lib.binning]
    [metabase.lib.equality :as lib.equality]
+   [metabase.lib.join :as-alias lib.join]
    [metabase.lib.metadata.calculation :as lib.metadata.calculation]
    [metabase.lib.ref :as lib.ref]
    [metabase.lib.remove-replace :as lib.remove-replace]
@@ -102,23 +103,31 @@
   ([query                                         :- ::lib.schema/query
     stage-number                                  :- :int
     column                                        :- ::lib.schema.metadata/column
-    {:keys [same-temporal-bucket?], :as _options} :- [:maybe
+    {:keys [same-binning-strategy?
+            same-temporal-bucket?], :as _options} :- [:maybe
                                                       [:map
+                                                       [:same-binning-strategy? {:optional true} [:maybe :boolean]]
                                                        [:same-temporal-bucket? {:optional true} [:maybe :boolean]]]]]
    (not-empty
     (into []
-          (filter (fn [a-breakout]
+          (filter (fn [[_ref {:keys [join-alias source-field]} _id-or-name :as a-breakout]]
                     (and (lib.equality/find-matching-column query stage-number a-breakout [column] {:generous? true})
-                         (if same-temporal-bucket?
-                           (= (lib.temporal-bucket/temporal-bucket a-breakout)
-                              (lib.temporal-bucket/temporal-bucket column))
-                           true))))
+                         (= source-field (:fk-field-id column)) ; Must match, including both being nil/missing.
+                         (= join-alias   (::lib.join/join-alias column))  ; Must match, including both being nil/missing.
+                         (or (not same-temporal-bucket?)
+                             (= (lib.temporal-bucket/temporal-bucket a-breakout)
+                                (lib.temporal-bucket/temporal-bucket column)))
+                         (or (not same-binning-strategy?)
+                             (lib.binning/binning= (lib.binning/binning a-breakout)
+                                                   (lib.binning/binning column))))))
           (breakouts query stage-number)))))
 
 (defn breakout-column?
   "Returns if `column` is a breakout column of stage with `stage-number` of `query`."
-  [query stage-number column]
-  (seq (existing-breakouts query stage-number column)))
+  ([query stage-number column]
+   (breakout-column? query stage-number column nil))
+  ([query stage-number column opts]
+   (seq (existing-breakouts query stage-number column opts))))
 
 (mu/defn remove-existing-breakouts-for-column :- ::lib.schema/query
   "Remove all existing breakouts against `column` if there are any in the stage in question. Disregards temporal
diff --git a/test/metabase/lib/breakout_test.cljc b/test/metabase/lib/breakout_test.cljc
index b88c9719103d3a6163088eac8fb2e25b8ea425ca..57f4d3ecc4a8f5e678fd10ae74b3678d463638fe 100644
--- a/test/metabase/lib/breakout_test.cljc
+++ b/test/metabase/lib/breakout_test.cljc
@@ -650,6 +650,61 @@
               (meta/id :people :latitude)]]
             (lib.breakout/existing-breakouts query -1 (meta/field-metadata :people :latitude))))))
 
+(deftest ^:parallel existing-breakouts-multiple-implicit-joins-test
+  (let [base   (lib/query meta/metadata-provider (meta/table-metadata :ic/reports))
+        groups (lib/group-columns (lib/breakoutable-columns base))
+        by-fk  (m/index-by :fk-field-id groups)
+        ;; Implicitly joining on both :created-by and :updated-by.
+        name-by-created-by (->> (get by-fk (meta/id :ic/reports :created-by))
+                                lib/columns-group-columns
+                                (m/find-first #(= (:id %) (meta/id :ic/accounts :name))))
+        name-by-updated-by (->> (get by-fk (meta/id :ic/reports :updated-by))
+                                lib/columns-group-columns
+                                (m/find-first #(= (:id %) (meta/id :ic/accounts :name))))]
+    (testing "implicit joins through two FKs to the same column"
+      (testing "both exist"
+        (is (some? name-by-created-by))
+        (is (some? name-by-updated-by)))
+      (testing "are distinct"
+        (is (not= name-by-created-by name-by-updated-by)))
+      (testing "are not considered 'existing breakouts'"
+        (is (nil? (-> base
+                      (lib/breakout name-by-created-by)
+                      (lib.breakout/existing-breakouts -1 name-by-updated-by))))
+        (is (nil? (-> base
+                      (lib/breakout name-by-updated-by)
+                      (lib.breakout/existing-breakouts -1 name-by-created-by))))))))
+
+(deftest ^:parallel existing-breakouts-multiple-explicit-joins-test
+  (let [base (-> (lib/query meta/metadata-provider (meta/table-metadata :ic/reports))
+                 (lib/join (lib/join-clause (meta/table-metadata :ic/accounts)
+                                            [(lib/= (meta/field-metadata :ic/reports :created-by)
+                                                    (meta/field-metadata :ic/accounts :id))]))
+                 (lib/join (lib/join-clause (meta/table-metadata :ic/accounts)
+                                            [(lib/= (meta/field-metadata :ic/reports :updated-by)
+                                                    (meta/field-metadata :ic/accounts :id))])))
+        groups (lib/group-columns (lib/breakoutable-columns base))
+        joined (filter #(= (:metabase.lib.column-group/group-type %) :group-type/join.explicit) groups)
+        name-by-created-by (->> (nth joined 0)
+                                lib/columns-group-columns
+                                (m/find-first #(= (:id %) (meta/id :ic/accounts :name))))
+        name-by-updated-by (->> (nth joined 1)
+                                lib/columns-group-columns
+                                (m/find-first #(= (:id %) (meta/id :ic/accounts :name))))]
+    (testing "explicit joins through two FKs to the same column"
+      (testing "both exist"
+        (is (some? name-by-created-by))
+        (is (some? name-by-updated-by)))
+      (testing "are distinct"
+        (is (not= name-by-created-by name-by-updated-by)))
+      (testing "are not considered 'existing breakouts'"
+        (is (nil? (-> base
+                      (lib/breakout name-by-created-by)
+                      (lib.breakout/existing-breakouts -1 name-by-updated-by))))
+        (is (nil? (-> base
+                      (lib/breakout name-by-updated-by)
+                      (lib.breakout/existing-breakouts -1 name-by-created-by))))))))
+
 (deftest ^:parallel remove-existing-breakouts-for-column-test
   (let [query  (-> (lib/query meta/metadata-provider (meta/table-metadata :people))
                    (lib/aggregate (lib/count))
diff --git a/test/metabase/lib/test_util/generators.cljc b/test/metabase/lib/test_util/generators.cljc
index e50970912d9c94e273b3fd272a1df27b8ca2846e..e4ff2af9d816fd90a0c0b5a61addb316e9971dbd 100644
--- a/test/metabase/lib/test_util/generators.cljc
+++ b/test/metabase/lib/test_util/generators.cljc
@@ -2,8 +2,10 @@
   (:require
    #?@(:cljs (metabase.test-runner.assert-exprs.approximately-equal))
    [clojure.test :refer [deftest is testing]]
+   [metabase.lib.breakout :as lib.breakout]
    [metabase.lib.core :as lib]
-   [metabase.lib.test-metadata :as meta]))
+   [metabase.lib.test-metadata :as meta]
+   [metabase.util :as u]))
 
 ;; NOTE: Being able to *execute* these queries and grok the results would actually be really powerful, if we can
 ;; achieve it with moderate cost. I think we can, at least for most queries. Some temporal stuff is a huge PITA,
@@ -74,6 +76,22 @@
 (defn- add-step [{:keys [kind] :as step-def}]
   (swap! step-kinds assoc kind step-def))
 
+;; Helpers =======================================================================================
+(defn- choose
+  "Uniformly chooses among a seq of options.
+
+  Returns nil if the list is empty! This is handy for choose-and-do vs. do-nothing while writing the next steps."
+  [xs]
+  (when-not (empty? xs)
+    (rand-nth xs)))
+
+(defn- choose-stage
+  "Chooses a stage to operator on. 80% act on -1, 20% chooses a stage by index (which might be the last stage)."
+  [query]
+  (if (< (rand) 0.8)
+    -1
+    (rand-int (count (:stages query)))))
+
 ;; Aggregations ==================================================================================
 ;; TODO: Add a schema for the step `[vectors ...]`?
 (add-step {:kind :aggregate})
@@ -96,18 +114,6 @@
               (last after-aggs))
           "new aggregation should resemble the intended one"))))
 
-(defn- choose
-  "Uniformly chooses among a seq of options."
-  [xs]
-  (rand-nth xs))
-
-(defn- choose-stage
-  "Chooses a stage to operato on. 80% act on -1, 20% chooses a stage by index (which might be the last stage)."
-  [query]
-  (if (< (rand) 0.8)
-    -1
-    (rand-int (count (:stages query)))))
-
 ;; TODO: If exhaustion is a goal, we should think about making these into generators or otherwise reifying the choices.
 (defmethod next-steps* :aggregate [query _aggregate]
   (let [stage-number (choose-stage query)
@@ -117,7 +123,51 @@
                        (lib/aggregation-clause operator))]
     [:aggregate stage-number agg]))
 
-;; Helpers
+;; Breakouts =====================================================================================
+(add-step {:kind :breakout})
+
+(defmethod next-steps* :breakout [query _breakout]
+  (let [stage-number (choose-stage query)
+        column       (choose (lib/breakoutable-columns query stage-number))
+        ;; If this is a temporal column, we need to choose a unit for it. Nil if it's not temporal.
+        ;; TODO: Don't always bucket/bin! We should sometimes choose the "Don't bin" option etc.
+        bucket       (choose (lib/available-temporal-buckets query stage-number column))
+        binning      (choose (lib/available-binning-strategies query stage-number column))
+        brk-column   (cond-> column
+                       bucket  (lib/with-temporal-bucket bucket)
+                       binning (lib/with-binning binning))]
+    [:breakout stage-number (lib/ref brk-column) brk-column]))
+
+(defmethod run-step* :breakout [query [_breakout stage-number brk-clause _column]]
+  (lib/breakout query stage-number brk-clause))
+
+(defmethod before-and-after :breakout [before after [_breakout stage-number brk-clause column]]
+  ;; Duplicate breakouts are not allowed! So we want to check that logic.
+  (let [before-breakouts (lib/breakouts before stage-number)
+        after-breakouts  (lib/breakouts after  stage-number)
+        opts             {:same-binning-strategy? true
+                          :same-temporal-bucket?  true}
+        fresh?           (empty? (lib.breakout/existing-breakouts before stage-number column opts))]
+    (testing (str ":breakout stage " stage-number
+                  "\n\nBefore query\n" (u/pprint-to-str before)
+                  "\n\nwith column\n" (u/pprint-to-str column)
+                  "\n\nwith breakout clause\n" (u/pprint-to-str brk-clause)
+                  "\n")
+      (if fresh?
+        (testing "freshly added breakout columns"
+          (is (= false (boolean (lib.breakout/breakout-column? before stage-number column opts)))
+              "are not present before")
+          (is (= true  (boolean (lib.breakout/breakout-column? after  stage-number column opts)))
+              "are present after")
+          (testing "go at the end of the list"
+            (is (= (count after-breakouts)
+                   (inc (count before-breakouts))))
+            (is (=? brk-clause (last after-breakouts)))))
+        (testing "duplicate breakout columns are blocked"
+          (is (= (count after-breakouts)
+                 (count before-breakouts))))))))
+
+;; Generator internals ===========================================================================
 (defn- run-step
   "Applies a step, returning the updated context."
   [{:keys [query] :as ctx} step]
@@ -231,8 +281,9 @@
      (vary-meta query assoc ::context (dissoc ctx :query)))))
 
 (deftest ^:parallel query-generator-test
-  (doseq [q (-> (lib/query meta/metadata-provider (meta/table-metadata :orders))
-                (random-queries-from 100))]
+  (doseq [table (meta/tables)
+          q     (-> (lib/query meta/metadata-provider (meta/table-metadata table))
+                    (random-queries-from 100))]
     (is (= (->> (lib/stage-count q)
                 range
                 (map (comp count #(lib/aggregations q %)))