Skip to content
Snippets Groups Projects
Unverified Commit 0434d5ad authored by Chris Truter's avatar Chris Truter Committed by GitHub
Browse files

Optimize search index updates using delayed batches (#49899)

parent e150fafa
No related branches found
No related tags found
No related merge requests found
......@@ -132,6 +132,7 @@
metabase.driver.sql-jdbc.execute/execute-prepared-statement!
metabase.pulse.send/send-pulse!
metabase.query-processor.store/store-database!
metabase.util/run-count!
next.jdbc/execute!}}
:metabase/i-like-making-cams-eyes-bleed-with-horrifically-long-tests
......
......@@ -53,8 +53,8 @@
(throw (ex-info "Search index is not enabled." {:status-code 501}))
(search/supports-index?)
(if (task/job-exists? task.search-index/job-key)
(do (task/trigger-now! task.search-index/job-key) {:message "task triggered"})
(if (task/job-exists? task.search-index/reindex-job-key)
(do (task/trigger-now! task.search-index/reindex-job-key) {:message "task triggered"})
(do (search/reindex!) {:message "done"}))
:else
......
......@@ -240,7 +240,8 @@
(when @initialized?
(batch-upsert! active-table entries))
(when @reindexing?
(batch-upsert! pending-table entries))))
(batch-upsert! pending-table entries))
(count entries)))
(defn search-query
"Query fragment for all models corresponding to a query parameter `:search-term`."
......
......@@ -6,12 +6,20 @@
[metabase.search.config :as search.config]
[metabase.search.postgres.index :as search.index]
[metabase.search.spec :as search.spec]
[metabase.task :as task]
[metabase.util :as u]
[metabase.util.queue :as queue]
[toucan2.core :as t2]
[toucan2.realize :as t2.realize]))
(set! *warn-on-reflection* true)
;; Delay queue holding pending index updates; see `ingest-maybe-async!` (producer) and
;; `process-next-batch` (consumer).
(defonce ^:private queue (queue/delay-queue))
;; Delay, in milliseconds, given to every update that is put on `queue`.
(def ^:private delay-ms 100)
;; Maximum number of queued updates taken off `queue` for a single batch.
(def ^:private batch-max 50)
;; Partition size used when splitting index entries into chunks for `search.index/batch-update!`.
(def ^:private insert-batch-size 150)
(def ^:private model-rankings
......@@ -89,7 +97,7 @@
(m/distinct-by (juxt :id :model))
(map ->entry)
(partition-all insert-batch-size)))
(run! search.index/batch-update!)))
(transduce (map search.index/batch-update!) + 0)))
(defn populate-index!
"Go over all searchable items and populate the index with them."
......@@ -100,25 +108,43 @@
"Force ingestion to happen immediately, on the same thread."
false)
(defmacro ^:private run-on-thread
  "Evaluate `body` inline when `*force-sync*` is truthy; otherwise start a new Thread that runs `body`
  and return that Thread."
  [& body]
  `(if *force-sync*
     (do ~@body)
     (let [worker# (Thread. ^Runnable (fn [] ~@body))]
       (.start worker#)
       worker#)))
(defn- bulk-ingest!
  "Ingest a collection of `[search-model where-clause]` updates in one go.
  The where-clauses are grouped by search model (via `u/group-by`), de-duplicated and combined under
  a single `:or`, giving one `spec-index-reducible` call per model. The resulting reducibles are
  concatenated and passed to `batch-update!`, whose result is returned."
  [updates]
  (let [clauses-by-model (u/group-by first second updates)
        reducibles       (for [[search-model where-clauses] clauses-by-model]
                           (spec-index-reducible search-model (into [:or] (distinct where-clauses))))]
    ;; init collection is only for clj-kondo, as we know that the list is non-empty
    (batch-update! (reduce u/rconcat [] reducibles))))
(defn process-next-batch
  "Take the next batch of queued updates (at most `batch-max` of them) and ingest it.
  Waits up to `first-delay-ms` for the first update to become ready, then up to `next-delay-ms` for
  each further one. Returns the number of search index entries that get updated as a result, or 0
  when no update became ready in time."
  [first-delay-ms next-delay-ms]
  (let [batch (queue/take-delayed-batch! queue batch-max first-delay-ms next-delay-ms)]
    (if-not batch
      0
      (bulk-ingest! batch))))
(defn- index-worker-exists?
  "Whether the job that consumes the ingestion queue (the incremental update job) is registered.
  The key is looked up with `requiring-resolve` rather than a static require, because
  `metabase.task.search-index` itself requires this namespace.

  Only the update job is checked: it is the one that calls `process-next-batch`. A previous check
  of the reindex job had its result thrown away, so it never influenced the answer."
  []
  (task/job-exists? @(requiring-resolve 'metabase.task.search-index/update-job-key)))
(defn- ingest-maybe-async!
  "Update or create any search index entries related to the given updates.
  Will be async if the worker exists, otherwise it will be done synchronously on the calling thread.
  Can also be forced to be run synchronous for testing.

  With one argument, `sync?` defaults to true when `*force-sync*` is set or no index worker exists.
  When synchronous, returns the result of `bulk-ingest!`; otherwise every update is put on the delay
  queue with a delay of `delay-ms` and nil is returned."
  ([updates]
   (ingest-maybe-async! updates (or *force-sync* (not (index-worker-exists?)))))
  ([updates sync?]
   (if sync?
     (bulk-ingest! updates)
     ;; `run!` returns nil, just like the `doseq` it replaces, without shadowing `clojure.core/update`.
     (run! #(queue/put-with-delay! queue delay-ms %) updates))))
(defn update-index!
"Given a new or updated instance, create or update all the corresponding search entries if needed."
[instance & [always?]]
(when-let [updates (seq (search.spec/search-models-to-update instance always?))]
;; We need to delay execution to handle deletes, which alert us *before* updating the database.
;; TODO It's dangerous to simply unleash threads on the world, this should use a queue in future.
(run-on-thread
(Thread/sleep 100)
(->> (for [[search-model where-clause] updates]
(spec-index-reducible search-model where-clause))
;; init collection is only for clj-kondo, as we know that the list is non-empty
(reduce u/rconcat [])
(batch-update!)))
(ingest-maybe-async! updates)
nil))
;; TODO think about how we're going to handle cascading deletes.
......
(ns metabase.task.search-index
;; metabase.search.postgres.ingestion has not been exposed publicly yet, it needs a higher level API
#_{:clj-kondo/ignore [:metabase/ns-module-checker]}
(:require
[clojurewerkz.quartzite.jobs :as jobs]
[clojurewerkz.quartzite.schedule.simple :as simple]
[clojurewerkz.quartzite.triggers :as triggers]
[metabase.search :as search]
[metabase.search.postgres.ingestion :as search.ingestion]
[metabase.task :as task]
[metabase.util.log :as log])
(:import
(org.quartz DisallowConcurrentExecution)))
(org.quartz DisallowConcurrentExecution JobDetail Trigger)))
(set! *warn-on-reflection* true)
;; This is problematic in multi-instance deployments, see below.
(def ^:private recreated? (atom false))
(def job-key
"Key used to define and trigger the search-index task."
(jobs/key "metabase.task.search-index.job"))
;; Common name prefixes: each job key is `<stem>.job` and each trigger key is `<stem>.trigger`.
(def ^:private reindex-stem "metabase.task.search-index.reindex")
(def ^:private update-stem "metabase.task.search-index.update")
(def reindex-job-key
  "Key used to define and trigger a job that rebuilds the entire index from scratch."
  (jobs/key (str reindex-stem ".job")))
(def update-job-key
  "Key used to define and trigger a job that makes incremental updates to the search index."
  (jobs/key (str update-stem ".job")))
(jobs/defjob ^{DisallowConcurrentExecution true
:doc "Populate Search Index"}
SearchIndexing [_ctx]
SearchIndexReindex [_ctx]
(when (search/supports-index?)
(if (not @recreated?)
(do (log/info "Recreating search index from the latest schema")
......@@ -34,13 +44,50 @@
(search/reindex!)))
(log/info "Done indexing.")))
;; Schedules the SearchIndexing job under `job-key`, firing immediately and then once per hour.
(defmethod task/init! ::SearchIndex [_]
  (let [hourly      (simple/schedule (simple/with-interval-in-hours 1))
        indexing    (jobs/build
                     (jobs/of-type SearchIndexing)
                     (jobs/with-identity job-key))
        every-hour  (triggers/build
                     (triggers/with-identity (triggers/key "metabase.task.search-index.trigger"))
                     (triggers/start-now)
                     (triggers/with-schedule hourly))]
    (task/schedule-task! indexing every-hour)))
(defn- force-scheduled-task!
  "Register `job` and `trigger` from scratch: delete whatever task is stored under their keys, then
  add the job and the trigger again."
  [^JobDetail job ^Trigger trigger]
  ;; For some reason, using `task/schedule-task!` with a non-durable job causes it to only fire on the
  ;; first trigger, so that call stays disabled.
  #_(task/schedule-task! job trigger)
  (let [jk (.getKey job)
        tk (.getKey trigger)]
    (task/delete-task! jk tk))
  (task/add-job! job)
  (task/add-trigger! trigger))
;; Registers the durable full-reindex job under `reindex-job-key`, firing immediately and then once per hour.
(defmethod task/init! ::SearchIndexReindex [_]
  (let [hourly       (simple/schedule (simple/with-interval-in-hours 1))
        reindex-job  (jobs/build
                      (jobs/of-type SearchIndexReindex)
                      (jobs/store-durably)
                      (jobs/with-identity reindex-job-key))
        reindex-trig (triggers/build
                      (triggers/with-identity (triggers/key (str reindex-stem ".trigger")))
                      (triggers/for-job reindex-job-key)
                      (triggers/start-now)
                      (triggers/with-schedule hourly))]
    (force-scheduled-task! reindex-job reindex-trig)))
(jobs/defjob ^{DisallowConcurrentExecution true
               :doc                        "Keep Search Index updated"}
  SearchIndexUpdate [_ctx]
  (when (search/supports-index?)
    ;; Loops forever: wait (with Long/MAX_VALUE as the first timeout) for the first queued update,
    ;; allow up to 100 ms for each further one, ingest the batch, log when anything changed, repeat.
    (loop []
      (let [n-updated (search.ingestion/process-next-batch Long/MAX_VALUE 100)]
        (when (pos? n-updated)
          (log/infof "Updated %d search index entries" n-updated)))
      (recur))))
;; Registers the durable incremental-update job under `update-job-key`, with a trigger that fires
;; immediately and then every second.
(defmethod task/init! ::SearchIndexUpdate [_]
  (let [;; This schedule is only here to restart the task if it dies for some reason.
        every-second (simple/schedule (simple/with-interval-in-seconds 1))
        update-job   (jobs/build
                      (jobs/of-type SearchIndexUpdate)
                      (jobs/store-durably)
                      (jobs/with-identity update-job-key))
        update-trig  (triggers/build
                      (triggers/with-identity (triggers/key (str update-stem ".trigger")))
                      (triggers/for-job update-job-key)
                      (triggers/start-now)
                      (triggers/with-schedule every-second))]
    (force-scheduled-task! update-job update-trig)))
(comment
(task/job-exists? reindex-job-key)
(task/job-exists? update-job-key))
......@@ -1145,3 +1145,11 @@
(let [acc1 (reduce f init r1)
acc2 (reduce f acc1 r2)]
acc2)))))
(defn run-count!
  "Like [[clojure.core/run!]]: reduces over `reducible`, calling `proc` on each item purely for its
  side effects. Returns how many items were handed to `proc`."
  [proc reducible]
  (let [seen (volatile! 0)
        step (fn [_ item]
               (vswap! seen inc)
               (proc item))]
    (reduce step nil reducible)
    (deref seen)))
(ns metabase.util.queue
(:import
(java.util.concurrent ArrayBlockingQueue SynchronousQueue TimeUnit)))
(java.time Duration Instant)
(java.util.concurrent ArrayBlockingQueue DelayQueue Delayed SynchronousQueue TimeUnit)))
(set! *warn-on-reflection* true)
......@@ -47,3 +48,40 @@
(SynchronousQueue.)
block-ms
sleep-ms))
;; A value paired with the instant at which it becomes available, usable as a DelayQueue element.
(defrecord DelayValue [value ^Instant ready-at]
  Delayed
  ;; Remaining time until `ready-at`, expressed in `unit`; zero or negative once it has passed.
  (getDelay [_ unit]
    (.convert unit (- (.toEpochMilli ready-at) (System/currentTimeMillis)) TimeUnit/MILLISECONDS))
  ;; Orders elements by deadline. Two DelayValues are compared on their fixed `ready-at` instants:
  ;; comparing two `getDelay` results reads the clock twice, so elements with equal or near-equal
  ;; deadlines could be ordered inconsistently. Other Delayed implementations are compared by their
  ;; remaining delay.
  (compareTo [this other]
    (if (instance? DelayValue other)
      (let [^Instant other-ready-at (:ready-at other)]
        (.compareTo ready-at other-ready-at))
      (Long/compare (.getDelay this TimeUnit/MILLISECONDS)
                    (.getDelay ^Delayed other TimeUnit/MILLISECONDS)))))
(defn delay-queue
  "Create a new, empty `java.util.concurrent.DelayQueue`: an unbounded queue from which an item can
  only be taken once its delay has expired."
  ^DelayQueue []
  (new DelayQueue))
(defn put-with-delay!
  "Enqueue `value` on the delay queue so that it becomes available `delay-ms` milliseconds from now.
  Returns the result of `DelayQueue.offer`."
  [^DelayQueue queue delay-ms value]
  (let [wait     (Duration/ofMillis delay-ms)
        ready-at (.plus (Instant/now) wait)]
    (.offer queue (->DelayValue value ready-at))))
(defn- take-delayed-batch*
  "Keep polling `queue`, conj-ing the `:value` of every item obtained onto `acc`, until `acc` holds
  `max-items` entries or a poll comes back empty. A positive `poll-ms` makes each poll wait up to
  that many milliseconds; otherwise polls do not wait. Returns the accumulated collection, or nil
  when polling stopped with nothing accumulated."
  [^DelayQueue queue max-items ^long poll-ms acc]
  (loop [batch acc]
    (if (< (count batch) max-items)
      (if-let [item (if (pos? poll-ms)
                      (.poll queue poll-ms TimeUnit/MILLISECONDS)
                      (.poll queue))]
        (recur (conj batch (:value item)))
        (not-empty batch))
      batch)))
(defn take-delayed-batch!
  "Take up to `max-items` ready items off a delay queue, returning their values.
  With two arguments nothing is waited for: only items that are ready right now are taken.
  With four arguments, waits up to `max-first-ms` milliseconds for a first item and returns nil if
  none arrives; otherwise keeps collecting, waiting up to `max-next-ms` for each further item."
  ([queue max-items]
   (take-delayed-batch* queue max-items 0 []))
  ([^DelayQueue queue max-items ^long max-first-ms ^long max-next-ms]
   (when-let [head (.poll queue max-first-ms TimeUnit/MILLISECONDS)]
     (take-delayed-batch* queue max-items max-next-ms [(:value head)]))))
......@@ -83,3 +83,20 @@
(testing "The realtime events are processed in order"
(mt/ordered-subset? realtime-events processed))))
;; Timing-based check of the delay queue: items must not be available before their delay elapses.
(deftest delay-queue-test
  (let [q (queue/delay-queue)]
    ;; Enqueue five items, each with a random delay of between 100 and 200 ms.
    (dotimes [_ 5]
      (queue/put-with-delay! q (+ 100 (* 100 (Math/random))) (System/nanoTime)))
    ;; Nothing is ready straight away.
    (is (empty? (queue/take-delayed-batch! q 3)))
    ;; Waiting up to 90 ms more is still expected to be short of the minimum 100 ms delay.
    (is (nil? (queue/take-delayed-batch! q 3 90 0)))
    ;; Waiting up to a further 110 ms must yield at least one item.
    (is (< 0 (count (queue/take-delayed-batch! q 3 110 0))))
    ;; A batch never contains more than the requested maximum of 3 items.
    (is (> 4 (count (queue/take-delayed-batch! q 3))))
    ;; NOTE(review): assumes no remaining item becomes ready between the previous call and this one.
    (is (empty? (queue/take-delayed-batch! q 3))))
  ;; TODO
  ;; add a bunch of stuff, from a bunch of queues
  ;; check that it isn't all ready immediately
  ;; check that we don't lose anything
  ;; check that it matures (roughly) in order
  )
......@@ -563,3 +563,14 @@
(eduction (map inc) (range 3))
(eduction (map dec) (range 10 0 -1)))
[25])))))
;; Verifies both the returned count and that the procedure is applied to every item, in order.
(deftest ^:parallel run-count!-test
  (testing "counts the things"
    (is (= 0 (u/run-count! inc nil)))
    (is (= 0 (u/run-count! inc [])))
    (is (= 3 (u/run-count! inc (range 3))))
    (is (= 3 (u/run-count! inc (eduction (map inc) (range 3))))))
  (testing "does the stuff"
    (let [seen (atom [])]
      (u/run-count! (fn [item] (swap! seen conj item)) (eduction (map inc) (range 3)))
      (is (= [1 2 3] (deref seen))))))
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment