Unverified commit df2e2404 authored by Alexander Solovyov, committed by GitHub

[search] metrics in logs and exported to prometheus (#50339)

parent 53000311
@@ -210,7 +210,10 @@
    (prometheus/counter :metabase-query-processor/metrics-adjust
                        {:description "Number of queries with metrics processed by the metrics adjust middleware."})
    (prometheus/counter :metabase-query-processor/metrics-adjust-errors
-                       {:description "Number of errors when processing metrics in the metrics adjust middleware."})])
+                       {:description "Number of errors when processing metrics in the metrics adjust middleware."})
+   (prometheus/counter :metabase-search/index
+                       {:description "Number of entries indexed for search"
+                        :labels [:model]})])
 
 (defn- setup-metrics!
   "Instrument the application. Conditionally done when some setting is set. If [[prometheus-server-port]] is not set it
@@ -267,8 +270,15 @@
 (defn inc!
   "Call iapetos.core/inc on the metric in the global registry,
   if it has already been initialized and the metric is registered."
-  [metric]
-  (some-> system .-registry metric prometheus/inc))
+  ([metric] (inc! metric nil 1))
+  ([metric labels-or-amount]
+   (if (map? labels-or-amount)
+     (inc! metric labels-or-amount 1)
+     (inc! metric nil labels-or-amount)))
+  ([metric labels amount]
+   (when-let [registry (some-> system .-registry)]
+     (when (metric registry)
+       (prometheus/inc registry metric labels amount)))))
 
 (comment
   (require 'iapetos.export)
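
With the arities above, inc! can now be called with just a metric, with an amount, with a label map, or with a label map and an amount. A short REPL usage sketch (label value and amounts are illustrative), assuming the registry has been initialized via setup-metrics!:

(require '[metabase.analytics.prometheus :as prometheus])

(prometheus/inc! :metabase-query-processor/metrics-adjust)          ; unlabeled, +1
(prometheus/inc! :metabase-query-processor/metrics-adjust-errors 2) ; unlabeled, +2
(prometheus/inc! :metabase-search/index {:model "card"})            ; labeled, +1
(prometheus/inc! :metabase-search/index {:model "card"} 50)         ; labeled, +50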
@@ -70,7 +70,7 @@
     (search/supports-index?)
     (if (task/job-exists? task.search-index/reindex-job-key)
       (do (task/trigger-now! task.search-index/reindex-job-key) {:message "task triggered"})
-      (do (search/reindex!) {:message "done"}))
+      (do (task.search-index/reindex!) {:message "done"}))
 
     :else
     (throw (ex-info "Search index is not supported for this installation." {:status-code 501}))))
@@ -11,6 +11,7 @@
    [metabase.search.postgres.index :as search.index]
    [metabase.search.postgres.ingestion :as search.ingestion]
    [metabase.search.postgres.scoring :as search.scoring]
+   [metabase.util :as u]
    [toucan2.core :as t2])
   (:import
    (java.time OffsetDateTime)))
@@ -158,8 +159,8 @@
   []
   (search.index/ensure-ready! false)
   (search.index/maybe-create-pending!)
-  (search.ingestion/populate-index!)
-  (search.index/activate-pending!))
+  (u/prog1 (search.ingestion/populate-index!)
+    (search.index/activate-pending!)))
 
 (comment
   (init! true)
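
u/prog1 evaluates its first form, runs the remaining forms for their side effects, and returns the first form's value, so init! now returns the per-model report produced by populate-index! while still activating the pending index afterwards. A rough plain-Clojure equivalent of the prog1 form above:

;; Illustration only: what the u/prog1 form does, conceptually.
(let [report (search.ingestion/populate-index!)]
  (search.index/activate-pending!)
  report)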
@@ -252,7 +252,7 @@
       (batch-upsert! *active-table* entries))
     (when @reindexing?
       (batch-upsert! pending-table entries))
-    (count entries)))
+    (->> entries (map :model) frequencies)))
 
 (defn search-query
   "Query fragment for all models corresponding to a query parameter `:search-term`."
@@ -90,7 +90,7 @@
                 (m/distinct-by (juxt :id :model))
                 (map ->entry)
                 (partition-all insert-batch-size)))
-       (transduce (map search.index/batch-update!) + 0)))
+       (transduce (map search.index/batch-update!) (partial merge-with +))))
 
 (defn populate-index!
   "Go over all searchable items and populate the index with them."
@@ -112,9 +112,8 @@
   "Wait up to 'delay-ms' for a queued batch to become ready, and process the batch if we get one.
   Returns the number of search index entries that get updated as a result."
   [first-delay-ms next-delay-ms]
-  (if-let [queued-updates (queue/take-delayed-batch! queue batch-max first-delay-ms next-delay-ms)]
-    (bulk-ingest! queued-updates)
-    0))
+  (when-let [queued-updates (queue/take-delayed-batch! queue batch-max first-delay-ms next-delay-ms)]
+    (bulk-ingest! queued-updates)))
 
 (defn- index-worker-exists? []
   (task/job-exists? @(requiring-resolve 'metabase.task.search-index/reindex-job-key))
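
With when-let instead of if-let, process-next-batch now returns nil when no batch becomes ready, and otherwise returns the per-model report from bulk-ingest!; the caller's (when (seq report) ...) guard treats nil and an empty report the same way:

(seq nil)        ;; => nil, nothing to log or export
(seq {})         ;; => nil
(seq {"card" 2}) ;; => (["card" 2]), report is logged and exported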
@@ -5,9 +5,11 @@
    [clojurewerkz.quartzite.jobs :as jobs]
    [clojurewerkz.quartzite.schedule.simple :as simple]
    [clojurewerkz.quartzite.triggers :as triggers]
+   [metabase.analytics.prometheus :as prometheus]
    [metabase.search :as search]
    [metabase.search.postgres.ingestion :as search.ingestion]
    [metabase.task :as task]
+   [metabase.util :as u]
    [metabase.util.log :as log])
   (:import
    (org.quartz DisallowConcurrentExecution JobDetail Trigger)))
@@ -30,28 +32,37 @@
 ;; We define the job bodies outside the defrecord, so that we can redefine them live from the REPL
-(defn- reindex! []
+(defn- report->prometheus! [report]
+  (doseq [[model cnt] report]
+    (prometheus/inc! :metabase-search/index {:model model} cnt)))
+
+(defn reindex!
+  "Reindex the whole AppDB"
+  []
   (when (search/supports-index?)
-    (if (not @recreated?)
-      (do (log/info "Recreating search index from the latest schema")
-          ;; Each instance in a multi-instance deployment will recreate the table the first time it is selected to run
-          ;; the job, resulting in a momentary lack of search results.
-          ;; One solution to this would be to store metadata about the index in another table, which we can use to
-          ;; determine whether it was built by another version of Metabase and should be rebuilt.
-          (search/init-index! {:force-reset? (not @recreated?)})
-          (reset! recreated? true))
-      (do (log/info "Reindexing searchable entities")
-          (search/reindex!)))
-    ;; It would be nice to output how many entries were updated.
-    (log/info "Done indexing.")))
+    (let [timer  (u/start-timer)
+          report (if (not @recreated?)
+                   (do (log/info "Recreating search index from the latest schema")
+                       ;; Each instance in a multi-instance deployment will recreate the table the first time it is
+                       ;; selected to run the job, resulting in a momentary lack of search results. One solution to
+                       ;; this would be to store metadata about the index in another table, which we can use to
+                       ;; determine whether it was built by another version of Metabase and should be rebuilt.
+                       (u/prog1 (search/init-index! {:force-reset? (not @recreated?)})
+                         (reset! recreated? true)))
+                   (do (log/info "Reindexing searchable entities")
+                       (search/reindex!)))]
+      (report->prometheus! report)
+      (log/infof "Done indexing in %.0fms %s" (u/since-ms timer) (sort-by (comp - val) report)))))
 
 (defn- update-index! []
   (when (search/supports-index?)
     (while true
-      (let [updated-entry-count (search.ingestion/process-next-batch Long/MAX_VALUE 100)]
-        (when (pos? updated-entry-count)
-          (log/infof "Updated %d search index entries" updated-entry-count))))))
+      (let [timer  (u/start-timer)
+            report (search.ingestion/process-next-batch Long/MAX_VALUE 100)]
+        (when (seq report)
+          (report->prometheus! report)
+          (log/debugf "Indexed search entries in %.0fms %s" (u/since-ms timer) (sort-by (comp - val) report)))))))
 
 (defn- force-scheduled-task! [^JobDetail job ^Trigger trigger]
   ;; For some reason, using the schedule-task! with a non-durable job causes it to only fire on the first trigger.
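
report->prometheus! turns the per-model report into labeled counter increments, and the log lines print the same report sorted by descending count via (sort-by (comp - val) report). With a hypothetical report:

(sort-by (comp - val) {"card" 3 "table" 10 "dashboard" 7})
;; => (["table" 10] ["dashboard" 7] ["card" 3])

;; report->prometheus! would issue, for the same report:
;; (prometheus/inc! :metabase-search/index {:model "table"} 10) and so on.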