Skip to content
Snippets Groups Projects
Commit 09da415e authored by Simon Belak's avatar Simon Belak
Browse files

Change done detection to use watches.

parent d045cbff
No related branches found
No related tags found
No related merge requests found
......@@ -23,8 +23,19 @@
"Has the computation job been canceled?"
(comp some? #{:canceled} :status))
(defn result
"Get result of an asynchronous computation job."
[job]
(if (done? job)
(if-let [result (db/select-one ComputationJobResult :job_id (:id job))]
{:status (:status job)
:result (:payload result)
:created-at (:created_at result)}
{:status :result-not-available})
{:status (:status job)}))
(defn- save-result
[{:keys [id]} payload]
[{:keys [id] :as job} payload callback]
(when-not (future-cancelled? (@running-jobs id))
(db/transaction
(db/insert! ComputationJobResult
......@@ -33,13 +44,15 @@
:payload payload)
(db/update! ComputationJob id
:status :done
:ended_at (u/new-sql-timestamp))))
:ended_at (u/new-sql-timestamp)))
(when callback
(callback job payload)))
(swap! running-jobs dissoc id)
(log/info (format "Async job %s done." id))
payload)
(defn- save-error
[{:keys [id]} error]
[{:keys [id] :as job} error callback]
(let [error (Throwable->map error)]
(when-not (future-cancelled? (@running-jobs id))
(log/warn (format "Async job %s encountered an error:\n%s." id error))
......@@ -50,7 +63,9 @@
:payload error)
(db/update! ComputationJob id
:status :error
:ended_at (u/new-sql-timestamp))))
:ended_at (u/new-sql-timestamp)))
(when callback
(callback job error)))
(swap! running-jobs dissoc id)
error))
......@@ -88,12 +103,16 @@
(defn compute
"Compute closure `f` in context `ctx` asynchronously. Returns id of the
associated computation job.
associated computation job. Optionally takes a callback function `callback`
that will be called on compleation with 2 arguments: the ComputationJob object
and the result.
Will return cached result if query caching is enabled and a job with identical
context has successfully run within TTL."
[ctx f]
(or (-> ctx cached-job :id)
[ctx f & [callback]]
(or (when-let [job (cached-job ctx)]
(callback job (:result (result job)))
(:id job))
(let [{:keys [id] :as job} (db/insert! ComputationJob
:creator_id api/*current-user-id*
:status :running
......@@ -102,9 +121,9 @@
(log/info (format "Async job %s started." id))
(swap! running-jobs assoc id (future
(try
(save-result job (f))
(save-result job (f) callback)
(catch Throwable e
(save-error job e)))))
(save-error job e callback)))))
id)))
(defmacro with-async
......@@ -121,17 +140,6 @@
:closure (zipmap (quote ~binding-vars) ~binding-vars)}
(fn [] ~@body)))))
(defn result
"Get result of an asynchronous computation job."
[job]
(if (done? job)
(if-let [result (db/select-one ComputationJobResult :job_id (:id job))]
{:status (:status job)
:result (:payload result)
:created-at (:created_at result)}
{:status :result-not-available})
{:status (:status job)}))
(defn running-jobs-user
"Get all running jobs for a given user."
([] (running-jobs-user api/*current-user-id*))
......
......@@ -7,7 +7,7 @@
(def ^:dynamic ^Integer *max-while-runtime*
"Maximal time in milliseconds `while-with-timeout` runs."
10000000)
100000)
(defmacro while-with-timeout
"Like `clojure.core/while` except it runs a maximum of `*max-while-runtime*`
......@@ -20,20 +20,17 @@
(when (>= (- (System/currentTimeMillis) start#) *max-while-runtime*)
(log/warn "While loop terminated due to exceeded max runtime."))))
(def ^:private job-done? (atom #{}))
(add-watch (deref #'async/running-jobs) :done-watch
(fn [_ _ old new]
(let [in-new? (set (keys new))]
(reduce #(swap! %1 conj %2)
job-done?
(remove in-new? (keys old))))))
(defn result!
"Blocking version of async/result."
[job-id]
(let [f (-> #'async/running-jobs
deref ; var
deref ; atom
(get job-id))]
(if (and f (not (or (future-cancelled? f)
(future-done? f))))
{:result @f
:status (-> job-id ComputationJob :status)
:created-at (u/new-sql-timestamp)}
(do
;; Make sure the transaction has finished
(binding [*max-while-runtime* 1000]
(while-with-timeout (-> job-id ComputationJob async/running?)))
(async/result (ComputationJob job-id))))))
(while-with-timeout (not (@job-done? job-id)))
(async/result (ComputationJob job-id)))
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment