From 04d24fb3b84320ffa949b73f507e4e7151481886 Mon Sep 17 00:00:00 2001
From: Simon Belak <simon.belak@gmail.com>
Date: Tue, 19 Sep 2017 20:30:04 +0200
Subject: [PATCH] Add docs, test, cancelation

---
 src/metabase/feature_extraction/async.clj     | 44 ++++++++++++++-----
 .../feature_extraction/async_test.clj         | 31 +++++++++++++
 2 files changed, 63 insertions(+), 12 deletions(-)
 create mode 100644 test/metabase/feature_extraction/async_test.clj

diff --git a/src/metabase/feature_extraction/async.clj b/src/metabase/feature_extraction/async.clj
index c15d04ccb21..15cc6d1e331 100644
--- a/src/metabase/feature_extraction/async.clj
+++ b/src/metabase/feature_extraction/async.clj
@@ -5,34 +5,54 @@
              [computation-job-result :refer [ComputationJobResult]]]
             [toucan.db :as db]))
 
+(def ^:private running-jobs (atom {}))
+
+(defn done?
+  "Is the computation job done?"
+  [{:keys [status]}]
+  (= :done status))
+
+(defn running?
+  "Is the computation job still running?"
+  [{:keys [status]}]
+  (= :running status))
+
 (defn- save-result
   [{:keys [id]} payload]
-  (println id)
   (db/transaction
     (db/insert! ComputationJobResult
       :job_id     id
       :permanence :temporary
       :payload    payload)
-    (db/update! ComputationJob id :status :done)))
+    (db/update! ComputationJob id :status :done))
+  (swap! running-jobs dissoc id))
+
+(defn cancel
+  "Cancel computation job (if still running)."
+  [{:keys [id] :as job}]
+  (when (running? job)
+    (future-cancel (@running-jobs id))
+    (swap! running-jobs dissoc id)
+    (db/update! ComputationJob id :status :canceled)))
 
 (defn compute
+  "Compute closure `f` asynchronously. Returns id of the associated computation
+   job."
   [f]
   (let [job (db/insert! ComputationJob
                         :creator_id api/*current-user-id*
                         :status     :running
-                        :type       :simple-job)]
-    (future (save-result job (f)))
-    (:id job)))
-
-(defn done?
-  [{:keys [status]}]
-  (= :done status))
+                        :type       :simple-job)
+        id  (:id job)]
+    (swap! running-jobs assoc id (future (save-result job (f))))
+    id))
 
 (defn result
+  "Get result of an asynchronous computation job."
   [job]
   (if (done? job)
     (if-let [result (db/select-one ComputationJobResult :job_id (:id job))]
-      {:status  :done
-       :payload (:payload result)}
+      {:status :done
+       :result (:payload result)}
       {:status :result-not-available})
-    {:status :running}))
+    {:status (:status job)}))
diff --git a/test/metabase/feature_extraction/async_test.clj b/test/metabase/feature_extraction/async_test.clj
new file mode 100644
index 00000000000..661625689fa
--- /dev/null
+++ b/test/metabase/feature_extraction/async_test.clj
@@ -0,0 +1,31 @@
+(ns metabase.feature-extraction.async-test
+  (:require [expectations :refer :all]
+            [metabase.feature-extraction.async :refer :all]
+            [metabase.models.computation-job :refer [ComputationJob]]))
+
+(expect
+  true
+  (let [job-id (compute (constantly 1))]
+    (Thread/sleep 100)
+    (done? (ComputationJob job-id))))
+
+(expect
+  true
+  (let [job-id (compute #(do (Thread/sleep 10000) nil))]
+    (Thread/sleep 100)
+    (running? (ComputationJob job-id))))
+
+(expect
+  [true false false]
+  (let [job-id (compute #(do (Thread/sleep 100000) nil))]
+    (Thread/sleep 100)
+    (let [r? (running? (ComputationJob job-id))]
+      (cancel (ComputationJob job-id))
+      [r? (done? (ComputationJob job-id)) (running? (ComputationJob job-id))])))
+
+(expect
+  {:status :done
+   :result 1}
+  (let [job-id (compute (constantly 1))]
+    (Thread/sleep 100)
+    (result (ComputationJob job-id))))
-- 
GitLab