Skip to content
Snippets Groups Projects
Unverified Commit a8ee0217 authored by Simon Belak's avatar Simon Belak Committed by GitHub
Browse files

Merge pull request #6538 from metabase/async-test-fix-race-condition

Async test fix race condition, part III
parents a510d8f3 60398e69
No related branches found
No related tags found
No related merge requests found
......@@ -23,8 +23,19 @@
"Has the computation job been canceled?"
(comp some? #{:canceled} :status))
(defn result
"Get result of an asynchronous computation job."
[job]
(if (done? job)
(if-let [result (db/select-one ComputationJobResult :job_id (:id job))]
{:status (:status job)
:result (:payload result)
:created-at (:created_at result)}
{:status :result-not-available})
{:status (:status job)}))
(defn- save-result
[{:keys [id]} payload]
[{:keys [id] :as job} payload callback]
(when-not (future-cancelled? (@running-jobs id))
(db/transaction
(db/insert! ComputationJobResult
......@@ -33,14 +44,17 @@
:payload payload)
(db/update! ComputationJob id
:status :done
:ended_at (u/new-sql-timestamp))))
:ended_at (u/new-sql-timestamp)))
(when callback
(callback job payload)))
(swap! running-jobs dissoc id)
(log/info (format "Async job %s done." id)))
(log/info (format "Async job %s done." id))
payload)
(defn- save-error
[{:keys [id]} error]
(when-not (future-cancelled? (@running-jobs id))
(let [error (Throwable->map error)]
[{:keys [id] :as job} error callback]
(let [error (Throwable->map error)]
(when-not (future-cancelled? (@running-jobs id))
(log/warn (format "Async job %s encountered an error:\n%s." id error))
(db/transaction
(db/insert! ComputationJobResult
......@@ -49,8 +63,11 @@
:payload error)
(db/update! ComputationJob id
:status :error
:ended_at (u/new-sql-timestamp)))))
(swap! running-jobs dissoc id))
:ended_at (u/new-sql-timestamp)))
(when callback
(callback job error)))
(swap! running-jobs dissoc id)
error))
(defn cancel
"Cancel computation job (if still running)."
......@@ -71,8 +88,8 @@
Uses the same logic as `metabase.api.card`."
[{:keys [created_at ended_at]}]
(let [duration (time-delta-seconds created_at ended_at)
ttl (* duration (public-settings/query-caching-ttl-ratio))
age (time-delta-seconds ended_at (java.util.Date.))]
ttl (* duration (public-settings/query-caching-ttl-ratio))
age (time-delta-seconds ended_at (java.util.Date.))]
(<= age ttl)))
(defn- cached-job
......@@ -80,19 +97,22 @@
(when (public-settings/enable-query-caching)
(let [job (db/select-one ComputationJob
:context (json/encode ctx)
:status [:not= "error"]
{:order-by [[:ended_at :desc]]})]
:status [:not= "error"])]
(when (some-> job fresh?)
job))))
(defn compute
"Compute closure `f` in context `ctx` asynchronously. Returns id of the
associated computation job.
associated computation job. Optionally takes a callback function `callback`
that will be called on compleation with 2 arguments: the ComputationJob object
and the result.
Will return cached result if query caching is enabled and a job with identical
context has successfully run within TTL."
[ctx f]
(or (-> ctx cached-job :id)
[ctx f & [callback]]
(or (when-let [job (cached-job ctx)]
(callback job (:result (result job)))
(:id job))
(let [{:keys [id] :as job} (db/insert! ComputationJob
:creator_id api/*current-user-id*
:status :running
......@@ -101,9 +121,9 @@
(log/info (format "Async job %s started." id))
(swap! running-jobs assoc id (future
(try
(save-result job (f))
(save-result job (f) callback)
(catch Throwable e
(save-error job e)))))
(save-error job e callback)))))
id)))
(defmacro with-async
......@@ -120,17 +140,6 @@
:closure (zipmap (quote ~binding-vars) ~binding-vars)}
(fn [] ~@body)))))
(defn result
"Get result of an asynchronous computation job."
[job]
(if (done? job)
(if-let [result (db/select-one ComputationJobResult :job_id (:id job))]
{:status (:status job)
:result (:payload result)
:created-at (:created_at result)}
{:status :result-not-available})
{:status (:status job)}))
(defn running-jobs-user
"Get all running jobs for a given user."
([] (running-jobs-user api/*current-user-id*))
......
......@@ -2,11 +2,12 @@
"Utilities for testing async API endpoints."
(:require [clojure.tools.logging :as log]
[metabase.feature-extraction.async :as async]
[metabase.models.computation-job :refer [ComputationJob]]))
[metabase.models.computation-job :refer [ComputationJob]]
[metabase.util :as u]))
(def ^:dynamic ^Integer *max-while-runtime*
"Maximal time in milliseconds `while-with-timeout` runs."
10000000)
100000)
(defmacro while-with-timeout
"Like `clojure.core/while` except it runs a maximum of `*max-while-runtime*`
......@@ -19,16 +20,22 @@
(when (>= (- (System/currentTimeMillis) start#) *max-while-runtime*)
(log/warn "While loop terminated due to exceeded max runtime."))))
(def ^:private job-done? (atom #{}))
(add-watch (deref #'async/running-jobs) :done-watch
(fn [_ _ old new]
(let [in-new? (set (keys new))]
(reduce #(swap! %1 conj %2)
job-done?
(remove in-new? (keys old))))))
(defn result!
"Blocking version of async/result."
[job-id]
(when-let [f (-> #'async/running-jobs
deref ; var
deref ; atom
(get job-id))]
(when-not (or (future-cancelled? f)
(future-done? f))
@f))
; Wait for the transaction to finish
(while-with-timeout (-> job-id ComputationJob async/running?))
(while-with-timeout (or (not (@job-done? job-id))
(-> job-id
ComputationJob
async/result
(find :result)
nil?)))
(async/result (ComputationJob job-id)))
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment