From 0434d5ad8710f2ad0f25dfc0e9a9deb8741cdb0d Mon Sep 17 00:00:00 2001
From: Chris Truter <crisptrutski@users.noreply.github.com>
Date: Wed, 13 Nov 2024 17:01:59 +0200
Subject: [PATCH] Optimize search index updates using delayed batches (#49899)

---
 .clj-kondo/config.edn                      |  1 +
 src/metabase/api/search.clj                |  4 +-
 src/metabase/search/postgres/index.clj     |  3 +-
 src/metabase/search/postgres/ingestion.clj | 54 +++++++++++----
 src/metabase/task/search_index.clj         | 77 +++++++++++++++++-----
 src/metabase/util.cljc                     |  8 +++
 src/metabase/util/queue.clj                | 40 ++++++++++-
 test/metabase/util/queue_test.clj          | 17 +++++
 test/metabase/util_test.cljc               | 11 ++++
 9 files changed, 182 insertions(+), 33 deletions(-)

diff --git a/.clj-kondo/config.edn b/.clj-kondo/config.edn
index 644c7c5e335..a3c19a00c5c 100644
--- a/.clj-kondo/config.edn
+++ b/.clj-kondo/config.edn
@@ -132,6 +132,7 @@
      metabase.driver.sql-jdbc.execute/execute-prepared-statement!
      metabase.pulse.send/send-pulse!
      metabase.query-processor.store/store-database!
+     metabase.util/run-count!
      next.jdbc/execute!}}
 
   :metabase/i-like-making-cams-eyes-bleed-with-horrifically-long-tests
diff --git a/src/metabase/api/search.clj b/src/metabase/api/search.clj
index d6a71f85f85..588c33a99d7 100644
--- a/src/metabase/api/search.clj
+++ b/src/metabase/api/search.clj
@@ -53,8 +53,8 @@
     (throw (ex-info "Search index is not enabled." {:status-code 501}))
 
     (search/supports-index?)
-    (if (task/job-exists? task.search-index/job-key)
-      (do (task/trigger-now! task.search-index/job-key) {:message "task triggered"})
+    (if (task/job-exists? task.search-index/reindex-job-key)
+      (do (task/trigger-now! task.search-index/reindex-job-key) {:message "task triggered"})
       (do (search/reindex!) {:message "done"}))
 
     :else
diff --git a/src/metabase/search/postgres/index.clj b/src/metabase/search/postgres/index.clj
index 72139deb0ef..d0ffffe3d64 100644
--- a/src/metabase/search/postgres/index.clj
+++ b/src/metabase/search/postgres/index.clj
@@ -240,7 +240,8 @@
     (when @initialized?
       (batch-upsert! active-table entries))
     (when @reindexing?
-      (batch-upsert! pending-table entries))))
+      (batch-upsert! pending-table entries))
+    (count entries)))
 
 (defn search-query
   "Query fragment for all models corresponding to a query parameter `:search-term`."
diff --git a/src/metabase/search/postgres/ingestion.clj b/src/metabase/search/postgres/ingestion.clj
index 4577838fac7..afe81fd0614 100644
--- a/src/metabase/search/postgres/ingestion.clj
+++ b/src/metabase/search/postgres/ingestion.clj
@@ -6,12 +6,20 @@
    [metabase.search.config :as search.config]
    [metabase.search.postgres.index :as search.index]
    [metabase.search.spec :as search.spec]
+   [metabase.task :as task]
    [metabase.util :as u]
+   [metabase.util.queue :as queue]
    [toucan2.core :as t2]
    [toucan2.realize :as t2.realize]))
 
 (set! *warn-on-reflection* true)
 
+(defonce ^:private queue (queue/delay-queue)) ; pending `[search-model where-clause]` updates awaiting ingestion
+
+(def ^:private delay-ms 100) ; delay, in milliseconds, before a queued update becomes available to take
+
+(def ^:private batch-max 50) ; upper bound on the number of queued updates taken off the queue per batch
+
 (def ^:private insert-batch-size 150)
 
 (def ^:private model-rankings
@@ -89,7 +97,7 @@
          (m/distinct-by (juxt :id :model))
          (map ->entry)
          (partition-all insert-batch-size)))
-       (run! search.index/batch-update!)))
+       (transduce (map search.index/batch-update!) + 0)))
 
 (defn populate-index!
   "Go over all searchable items and populate the index with them."
@@ -100,25 +108,43 @@
   "Force ingestion to happen immediately, on the same thread."
   false)
 
-(defmacro ^:private run-on-thread [& body]
-  `(if *force-sync*
-     (do ~@body)
-     (doto (Thread. ^Runnable (fn [] ~@body))
-       (.start))))
+(defn- bulk-ingest! [updates]
+  ;; Clauses queued for the same model are de-duplicated and OR-ed together: one `spec-index-reducible` call per model.
+  (let [clauses-by-model (u/group-by first second updates)
+        ->reducible      (fn [[model clauses]] (spec-index-reducible model (into [:or] (distinct clauses))))]
+    ;; init collection is only for clj-kondo, as we know that the list is non-empty
+    (batch-update! (reduce u/rconcat [] (map ->reducible clauses-by-model)))))
+
+(defn process-next-batch
+  "Wait up to `first-delay-ms` for a queued update to become ready, then keep taking more (at most `batch-max` in
+  total), waiting up to `next-delay-ms` for each. Returns the number of search index entries updated, 0 if none."
+  [first-delay-ms next-delay-ms]
+  (if-let [queued-updates (queue/take-delayed-batch! queue batch-max first-delay-ms next-delay-ms)]
+    (bulk-ingest! queued-updates)
+    0))
+
+(defn- index-worker-exists? [] ; truthy only when both the reindex job and the update job exist
+  (and (task/job-exists? @(requiring-resolve 'metabase.task.search-index/reindex-job-key))
+       (task/job-exists? @(requiring-resolve 'metabase.task.search-index/update-job-key))))
+
+(defn- ingest-maybe-async!
+  "Update or create any search index entries related to the given updates.
+  Will be async if the worker exists, otherwise it will be done synchronously on the calling thread.
+  Can also be forced to run synchronously for testing, via `*force-sync*` or an explicit `sync?` argument."
+  ([updates]
+   (ingest-maybe-async! updates (or *force-sync* (not (index-worker-exists?)))))
+  ([updates sync?]
+   (if sync?
+     (bulk-ingest! updates)
+     (doseq [queued-update updates]
+       (queue/put-with-delay! queue delay-ms queued-update)))))
 
 (defn update-index!
   "Given a new or updated instance, create or update all the corresponding search entries if needed."
   [instance & [always?]]
   (when-let [updates (seq (search.spec/search-models-to-update instance always?))]
     ;; We need to delay execution to handle deletes, which alert us *before* updating the database.
-    ;; TODO It's dangerous to simply unleash threads on the world, this should use a queue in future.
-    (run-on-thread
-     (Thread/sleep 100)
-     (->> (for [[search-model where-clause] updates]
-            (spec-index-reducible search-model where-clause))
-          ;; init collection is only for clj-kondo, as we know that the list is non-empty
-          (reduce u/rconcat [])
-          (batch-update!)))
+    (ingest-maybe-async! updates)
     nil))
 
 ;; TODO think about how we're going to handle cascading deletes.
diff --git a/src/metabase/task/search_index.clj b/src/metabase/task/search_index.clj
index 7fc5d52b99e..75ee1b024e1 100644
--- a/src/metabase/task/search_index.clj
+++ b/src/metabase/task/search_index.clj
@@ -1,26 +1,36 @@
 (ns metabase.task.search-index
+  ;; metabase.search.postgres.ingestion has not been exposed publicly yet, it needs a higher level API
+  #_{:clj-kondo/ignore [:metabase/ns-module-checker]}
   (:require
    [clojurewerkz.quartzite.jobs :as jobs]
    [clojurewerkz.quartzite.schedule.simple :as simple]
    [clojurewerkz.quartzite.triggers :as triggers]
    [metabase.search :as search]
+   [metabase.search.postgres.ingestion :as search.ingestion]
    [metabase.task :as task]
    [metabase.util.log :as log])
   (:import
-   (org.quartz DisallowConcurrentExecution)))
+   (org.quartz DisallowConcurrentExecution JobDetail Trigger)))
 
 (set! *warn-on-reflection* true)
 
 ;; This is problematic multi-instance deployments, see below.
 (def ^:private recreated? (atom false))
 
-(def job-key
-  "Key used to define and trigger the search-index task."
-  (jobs/key "metabase.task.search-index.job"))
+(def ^:private reindex-stem "metabase.task.search-index.reindex") ; name prefix shared by the reindex job and trigger
+(def ^:private update-stem "metabase.task.search-index.update")   ; name prefix shared by the update job and trigger
+
+(def reindex-job-key
+  "Key used to define and trigger a job that rebuilds the entire index from scratch."
+  (jobs/key (str reindex-stem ".job")))
+
+(def update-job-key
+  "Key used to define and trigger a job that makes incremental updates to the search index."
+  (jobs/key (str update-stem ".job")))
 
 (jobs/defjob ^{DisallowConcurrentExecution true
                :doc                        "Populate Search Index"}
-  SearchIndexing [_ctx]
+  SearchIndexReindex [_ctx]
   (when (search/supports-index?)
     (if (not @recreated?)
       (do (log/info "Recreating search index from the latest schema")
@@ -34,13 +44,50 @@
           (search/reindex!)))
     (log/info "Done indexing.")))
 
-(defmethod task/init! ::SearchIndex [_]
-  (let [job     (jobs/build
-                 (jobs/of-type SearchIndexing)
-                 (jobs/with-identity job-key))
-        trigger (triggers/build
-                 (triggers/with-identity (triggers/key "metabase.task.search-index.trigger"))
-                 (triggers/start-now)
-                 (triggers/with-schedule
-                  (simple/schedule (simple/with-interval-in-hours 1))))]
-    (task/schedule-task! job trigger)))
+(defn- force-scheduled-task! [^JobDetail job ^Trigger trigger]
+  ;; For some reason, using `task/schedule-task!` with a non-durable job causes it to only fire on the first trigger.
+  #_(task/schedule-task! job trigger)
+  (task/delete-task! (.getKey job) (.getKey trigger)) ; presumably clears any earlier registration under these keys
+  (task/add-job! job)                                 ; the job and its trigger are then added as separate steps
+  (task/add-trigger! trigger))
+
+(defmethod task/init! ::SearchIndexReindex [_]
+  (let [job     (jobs/build
+                 (jobs/of-type SearchIndexReindex)
+                 (jobs/store-durably)
+                 (jobs/with-identity reindex-job-key))
+        trigger (triggers/build
+                 (triggers/with-identity (triggers/key (str reindex-stem ".trigger")))
+                 (triggers/for-job reindex-job-key)
+                 (triggers/start-now)
+                 ;; Rebuild hourly; without `repeat-forever` a simple schedule's repeat count is 0 and it fires once.
+                 (triggers/with-schedule
+                  (simple/schedule (simple/with-interval-in-hours 1) (simple/repeat-forever))))]
+    (force-scheduled-task! job trigger)))
+
+(jobs/defjob ^{DisallowConcurrentExecution true
+               :doc                        "Keep Search Index updated"}
+  SearchIndexUpdate [_ctx]
+  (when (search/supports-index?)
+    (loop [] ; never returns normally: each pass waits (up to Long/MAX_VALUE ms) for a first queued update
+      (let [n (search.ingestion/process-next-batch Long/MAX_VALUE 100)]
+        (when (pos? n) (log/infof "Updated %d search index entries" n)))
+      (recur))))
+
+(defmethod task/init! ::SearchIndexUpdate [_]
+  (let [job     (jobs/build
+                 (jobs/of-type SearchIndexUpdate)
+                 (jobs/store-durably)
+                 (jobs/with-identity update-job-key))
+        trigger (triggers/build
+                 (triggers/with-identity (triggers/key (str update-stem ".trigger")))
+                 (triggers/for-job update-job-key)
+                 (triggers/start-now)
+                 ;; Re-fire forever so the worker restarts if it dies; a simple schedule otherwise fires only once.
+                 (triggers/with-schedule
+                  (simple/schedule (simple/with-interval-in-seconds 1) (simple/repeat-forever))))]
+    (force-scheduled-task! job trigger)))
+
+(comment ; REPL scratch: evaluate to see what `task/job-exists?` reports for each of the two job keys
+  (task/job-exists? reindex-job-key)
+  (task/job-exists? update-job-key))
diff --git a/src/metabase/util.cljc b/src/metabase/util.cljc
index 4c4936711a8..7db0f815f73 100644
--- a/src/metabase/util.cljc
+++ b/src/metabase/util.cljc
@@ -1145,3 +1145,11 @@
          (let [acc1 (reduce f init r1)
                acc2 (reduce f acc1 r2)]
            acc2)))))
+
+(defn run-count!
+  "Like [[clojure.core/run!]], calls `proc` (via reduce) on each successive item of `reducible`, for side effects only.
+   Returns how many items `proc` was invoked on."
+  [proc reducible]
+  (let [tally (volatile! 0)
+        _     (reduce (fn [_ item] (vswap! tally inc) (proc item)) nil reducible)]
+    @tally))
diff --git a/src/metabase/util/queue.clj b/src/metabase/util/queue.clj
index 68dd7545800..2b77e0ccc22 100644
--- a/src/metabase/util/queue.clj
+++ b/src/metabase/util/queue.clj
@@ -1,6 +1,7 @@
 (ns metabase.util.queue
   (:import
-   (java.util.concurrent ArrayBlockingQueue SynchronousQueue TimeUnit)))
+   (java.time Duration Instant)
+   (java.util.concurrent ArrayBlockingQueue DelayQueue Delayed SynchronousQueue TimeUnit)))
 
 (set! *warn-on-reflection* true)
 
@@ -47,3 +48,40 @@
                         (SynchronousQueue.)
                         block-ms
                         sleep-ms))
+
+(defrecord DelayValue [value ^Instant ready-at] ; wraps `value` for a DelayQueue; its delay runs out at `ready-at`
+  Delayed
+  (getDelay [_ unit] (.convert unit (- (.toEpochMilli ready-at) (System/currentTimeMillis)) TimeUnit/MILLISECONDS))
+  (compareTo [this other]
+    (if (instance? DelayValue other) ; fixed deadlines give a stable order; two separate clock reads may not
+      (.compareTo ready-at ^Instant (:ready-at other))
+      (Long/compare (.getDelay this TimeUnit/MILLISECONDS) (.getDelay ^Delayed other TimeUnit/MILLISECONDS)))))
+
+(defn delay-queue
+  "Create an empty, unbounded `DelayQueue`: each item can only be taken once the delay it was added with has passed."
+  ^DelayQueue []
+  (new DelayQueue))
+
+(defn put-with-delay!
+  "Offer `value` to the delay queue, wrapped so that it becomes ready `delay-ms` milliseconds from now."
+  [^DelayQueue queue delay-ms value]
+  (->> (Duration/ofMillis delay-ms) (.plus (Instant/now)) (->DelayValue value) (.offer queue)))
+
+(defn- take-delayed-batch* [^DelayQueue queue max-items ^long poll-ms acc]
+  ;; Keep appending ready values to `acc` until it holds `max-items`, or a poll comes back with nothing.
+  (let [poll! (if (pos? poll-ms) #(.poll queue poll-ms TimeUnit/MILLISECONDS) #(.poll queue))]
+    (loop [batch acc]
+      (if (< (count batch) max-items)
+        (if-some [item (poll!)]
+          (recur (conj batch (:value item)))
+          (not-empty batch))
+        batch))))
+
+(defn take-delayed-batch!
+  "Take up to `max-items` ready values off `queue` without waiting, or wait up to `max-first-ms` for the first value
+  and then up to `max-next-ms` for each further one. Returns the values as a vector, or nil if nothing was ready."
+  ([queue max-items]
+   (take-delayed-batch* queue max-items 0 []))
+  ([^DelayQueue queue max-items ^long max-first-ms ^long max-next-ms]
+   (when-let [head (.poll queue max-first-ms TimeUnit/MILLISECONDS)]
+     (take-delayed-batch* queue max-items max-next-ms [(:value head)]))))
diff --git a/test/metabase/util/queue_test.clj b/test/metabase/util/queue_test.clj
index e6d400efa1d..3af79ad6d01 100644
--- a/test/metabase/util/queue_test.clj
+++ b/test/metabase/util/queue_test.clj
@@ -83,3 +83,20 @@
 
     (testing "The realtime events are processed in order"
       (mt/ordered-subset? realtime-events processed))))
+
+(deftest delay-queue-test
+  (let [q (queue/delay-queue)]
+    (dotimes [_ 5] ; enqueue five values, each maturing somewhere between 100ms and 200ms from now
+      (queue/put-with-delay! q (+ 100 (* 100 (Math/random))) (System/nanoTime)))
+    (is (empty? (queue/take-delayed-batch! q 3)))            ; nothing has matured yet
+    (is (nil? (queue/take-delayed-batch! q 3 90 0)))         ; still nothing after waiting 90ms
+    (is (< 0 (count (queue/take-delayed-batch! q 3 110 0)))) ; a further wait of up to 110ms yields at least one value
+    (is (> 4 (count (queue/take-delayed-batch! q 3))))       ; a batch never exceeds the 3 items asked for
+    (is (empty? (queue/take-delayed-batch! q 3))))           ; NOTE(review): timing-sensitive if a value matures here
+
+  ;; TODO
+  ;; add a bunch of stuff, from a bunch of queues
+  ;; check that it isn't all ready immediately
+  ;; check that we don't lose anything
+  ;; check that it matures (roughly) in order
+  )
diff --git a/test/metabase/util_test.cljc b/test/metabase/util_test.cljc
index 36ad263dbcf..5dee2a3c68c 100644
--- a/test/metabase/util_test.cljc
+++ b/test/metabase/util_test.cljc
@@ -563,3 +563,14 @@
             (eduction (map inc) (range 3))
             (eduction (map dec) (range 10 0 -1)))
            [25])))))
+
+(deftest ^:parallel run-count!-test
+  (testing "counts the things"
+    (doseq [[expected reducible] [[0 nil] [0 []]
+                                  [3 (range 3)]
+                                  [3 (eduction (map inc) (range 3))]]]
+      (is (= expected (u/run-count! inc reducible)))))
+  (testing "does the stuff"
+    (let [seen (atom [])]
+      (u/run-count! #(swap! seen conj %) (eduction (map inc) (range 3)))
+      (is (= [1 2 3] @seen)))))
-- 
GitLab