diff --git a/src/metabase/feature_extraction/async.clj b/src/metabase/feature_extraction/async.clj index 002ccce8d4165521bb9d8fb160aa0c7345e1962c..d1fc9849c9ceffbfd47a66cc707e9c8b894cfcac 100644 --- a/src/metabase/feature_extraction/async.clj +++ b/src/metabase/feature_extraction/async.clj @@ -23,8 +23,19 @@ "Has the computation job been canceled?" (comp some? #{:canceled} :status)) +(defn result + "Get result of an asynchronous computation job." + [job] + (if (done? job) + (if-let [result (db/select-one ComputationJobResult :job_id (:id job))] + {:status (:status job) + :result (:payload result) + :created-at (:created_at result)} + {:status :result-not-available}) + {:status (:status job)})) + (defn- save-result - [{:keys [id]} payload] + [{:keys [id] :as job} payload callback] (when-not (future-cancelled? (@running-jobs id)) (db/transaction (db/insert! ComputationJobResult @@ -33,13 +44,15 @@ :payload payload) (db/update! ComputationJob id :status :done - :ended_at (u/new-sql-timestamp)))) + :ended_at (u/new-sql-timestamp))) + (when callback + (callback job payload))) (swap! running-jobs dissoc id) (log/info (format "Async job %s done." id)) payload) (defn- save-error - [{:keys [id]} error] + [{:keys [id] :as job} error callback] (let [error (Throwable->map error)] (when-not (future-cancelled? (@running-jobs id)) (log/warn (format "Async job %s encountered an error:\n%s." id error)) @@ -50,7 +63,9 @@ :payload error) (db/update! ComputationJob id :status :error - :ended_at (u/new-sql-timestamp)))) + :ended_at (u/new-sql-timestamp))) + (when callback + (callback job error))) (swap! running-jobs dissoc id) error)) @@ -88,12 +103,16 @@ (defn compute "Compute closure `f` in context `ctx` asynchronously. Returns id of the - associated computation job. + associated computation job. Optionally takes a callback function `callback` + that will be called on compleation with 2 arguments: the ComputationJob object + and the result. Will return cached result if query caching is enabled and a job with identical context has successfully run within TTL." - [ctx f] - (or (-> ctx cached-job :id) + [ctx f & [callback]] + (or (when-let [job (cached-job ctx)] + (callback job (:result (result job))) + (:id job)) (let [{:keys [id] :as job} (db/insert! ComputationJob :creator_id api/*current-user-id* :status :running @@ -102,9 +121,9 @@ (log/info (format "Async job %s started." id)) (swap! running-jobs assoc id (future (try - (save-result job (f)) + (save-result job (f) callback) (catch Throwable e - (save-error job e))))) + (save-error job e callback))))) id))) (defmacro with-async @@ -121,17 +140,6 @@ :closure (zipmap (quote ~binding-vars) ~binding-vars)} (fn [] ~@body))))) -(defn result - "Get result of an asynchronous computation job." - [job] - (if (done? job) - (if-let [result (db/select-one ComputationJobResult :job_id (:id job))] - {:status (:status job) - :result (:payload result) - :created-at (:created_at result)} - {:status :result-not-available}) - {:status (:status job)})) - (defn running-jobs-user "Get all running jobs for a given user." ([] (running-jobs-user api/*current-user-id*)) diff --git a/test/metabase/test/async.clj b/test/metabase/test/async.clj index 1c4983f6b0e909a2958b91c0b1d3706e5a552630..e0b291b3709c023d546bfd16139c774786fba746 100644 --- a/test/metabase/test/async.clj +++ b/test/metabase/test/async.clj @@ -7,7 +7,7 @@ (def ^:dynamic ^Integer *max-while-runtime* "Maximal time in milliseconds `while-with-timeout` runs." - 10000000) + 100000) (defmacro while-with-timeout "Like `clojure.core/while` except it runs a maximum of `*max-while-runtime*` @@ -20,20 +20,17 @@ (when (>= (- (System/currentTimeMillis) start#) *max-while-runtime*) (log/warn "While loop terminated due to exceeded max runtime.")))) +(def ^:private job-done? (atom #{})) + +(add-watch (deref #'async/running-jobs) :done-watch + (fn [_ _ old new] + (let [in-new? (set (keys new))] + (reduce #(swap! %1 conj %2) + job-done? + (remove in-new? (keys old)))))) + (defn result! "Blocking version of async/result." [job-id] - (let [f (-> #'async/running-jobs - deref ; var - deref ; atom - (get job-id))] - (if (and f (not (or (future-cancelled? f) - (future-done? f)))) - {:result @f - :status (-> job-id ComputationJob :status) - :created-at (u/new-sql-timestamp)} - (do - ;; Make sure the transaction has finished - (binding [*max-while-runtime* 1000] - (while-with-timeout (-> job-id ComputationJob async/running?))) - (async/result (ComputationJob job-id)))))) + (while-with-timeout (not (@job-done? job-id))) + (async/result (ComputationJob job-id)))