Skip to content
Snippets Groups Projects
Commit dceeb370 authored by Sameer Al-Sakran's avatar Sameer Al-Sakran
Browse files

Merge branch 'release-0.26.0' of github.com:metabase/metabase-init into release-0.26.0

parents a94a68c0 7db972e4
No related branches found
No related tags found
No related merge requests found
......@@ -3828,88 +3828,4 @@ databaseChangeLog:
- dropForeignKeyConstraint:
baseTableName: raw_table
constraintName: fk_rawtable_ref_database
remarks: 'This FK prevents deleting databases even though RAW_TABLE is no longer used. The table is still around to support downgrades, but the FK reference is no longer needed.'
- changeSet:
id: 65
author: sbelak
comment: 'Added 0.26.0'
changes:
- createTable:
tableName: computation_job
remarks: 'Stores submitted async computation jobs.'
columns:
- column:
name: id
type: int
autoIncrement: true
constraints:
primaryKey: true
nullable: false
- column:
constraints:
deferrable: false
foreignKeyName: fk_computation_job_ref_user_id
initiallyDeferred: false
references: core_user(id)
name: creator_id
type: int
- column:
name: created_at
type: DATETIME
constraints:
nullable: false
- column:
name: updated_at
type: DATETIME
constraints:
nullable: false
- column:
name: type
type: varchar(254)
constraints:
nullable: false
- column:
name: status
type: varchar(254)
constraints:
nullable: false
- createTable:
tableName: computation_job_result
remarks: 'Stores results of async computation jobs.'
columns:
- column:
name: id
type: int
autoIncrement: true
constraints:
primaryKey: true
nullable: false
- column:
constraints:
deferrable: false
foreignKeyName: fk_computation_result_ref_job_id
initiallyDeferred: false
nullable: false
references: computation_job(id)
name: job_id
type: int
- column:
name: created_at
type: DATETIME
constraints:
nullable: false
- column:
name: updated_at
type: DATETIME
constraints:
nullable: false
- column:
name: permanence
type: varchar(254)
constraints:
nullable: false
- column:
name: payload
type: text
constraints:
nullable: false
\ No newline at end of file
remarks: 'This FK prevents deleting databases even though RAW_TABLE is no longer used. The table is still around to support downgrades, but the FK reference is no longer needed.'
\ No newline at end of file
......@@ -2,12 +2,10 @@
(:require [compojure.core :refer [GET PUT]]
[metabase.api.common :as api]
[metabase.feature-extraction
[async :as async]
[core :as fe]
[costs :as costs]]
[metabase.models
[card :refer [Card]]
[computation-job :refer [ComputationJob]]
[field :refer [Field]]
[metric :refer [Metric]]
[segment :refer [Segment]]
......@@ -58,26 +56,6 @@
max_computation_cost)})
fe/x-ray))
(api/defendpoint GET "/async/table/:id"
"Get x-ray for a `Tield` with ID."
[id max_query_cost max_computation_cost]
{max_query_cost MaxQueryCost
max_computation_cost MaxComputationCost}
(api/check-403 (costs/enable-xrays))
(let [table (api/read-check Table id)]
{:job-id (async/compute
#(->> table
(fe/extract-features {:max-cost (max-cost max_query_cost
max_computation_cost)})
fe/x-ray))}))
(api/defendpoint GET "/async/:id"
"Get x-ray for a `Tield` with ID."
[id]
(->> id
(api/read-check ComputationJob)
async/result))
(api/defendpoint GET "/segment/:id"
"Get x-ray for a `Segment` with ID."
[id max_query_cost max_computation_cost]
......
......@@ -29,8 +29,6 @@
[card-label :refer [CardLabel]]
[collection :refer [Collection]]
[collection-revision :refer [CollectionRevision]]
[computation-job :refer [ComputationJob]]
[computation-job-result :refer [ComputationJobResult]]
[dashboard :refer [Dashboard]]
[dashboard-card :refer [DashboardCard]]
[dashboard-card-series :refer [DashboardCardSeries]]
......@@ -100,8 +98,6 @@
CollectionRevision
DashboardFavorite
Dimension
ComputationJob
ComputationJobResult
;; migrate the list of finished DataMigrations as the very last thing (all models to copy over should be listed above this line)
DataMigrations])
......
(ns metabase.feature-extraction.async
(:require [metabase.api.common :as api]
[metabase.models
[computation-job :refer [ComputationJob]]
[computation-job-result :refer [ComputationJobResult]]]
[toucan.db :as db]))
(defonce ^:private running-jobs (atom {}))
(def ^{:arglists '([job])} done?
"Is the computation job done?"
(comp some? #{:done :error} :status))
(def ^{:arglists '([job])} running?
"Is the computation job still running?"
(comp some? #{:running} :status))
(defn- save-result
[{:keys [id]} payload]
(db/transaction
(db/insert! ComputationJobResult
:job_id id
:permanence :temporary
:payload payload)
(db/update! ComputationJob id :status :done))
(swap! running-jobs dissoc id))
(defn- save-error
[{:keys [id]} error]
(db/transaction
(db/insert! ComputationJobResult
:job_id id
:permanence :temporary
:payload (Throwable->map error))
(db/update! ComputationJob id :status :error))
(swap! running-jobs dissoc id))
(defn cancel
"Cancel computation job (if still running)."
[{:keys [id] :as job}]
(when (running? job)
(future-cancel (@running-jobs id))
(swap! running-jobs dissoc id)
(db/update! ComputationJob id :status :canceled)))
(defn compute
"Compute closure `f` asynchronously. Returns id of the associated computation
job."
[f]
(let [job (db/insert! ComputationJob
:creator_id api/*current-user-id*
:status :running
:type :simple-job)
id (:id job)]
(swap! running-jobs assoc id (future
(try
(save-result job (f))
(catch Exception e
(save-error job e)))))
id))
(defn result
"Get result of an asynchronous computation job."
[job]
(if (done? job)
(if-let [result (db/select-one ComputationJobResult :job_id (:id job))]
{:status (:status job)
:result (:payload result)}
{:status :result-not-available})
{:status (:status job)}))
(ns metabase.models.computation-job
(:require [metabase.api.common :as api]
[metabase.models.interface :as i]
[metabase.util :as u]
[toucan.models :as models]))
(models/defmodel ComputationJob :computation_job)
(defn- creator?
[{:keys [creator_id]}]
(= creator_id api/*current-user-id*))
(u/strict-extend (class ComputationJob)
models/IModel
(merge models/IModelDefaults
{:types (constantly {:status :keyword
:type :keyword})
:properties (constantly {:timestamped? true})})
i/IObjectPermissions
(merge i/IObjectPermissionsDefaults
{:can-read? creator?
:can-write? creator?}))
(ns metabase.models.computation-job-result
(:require [metabase.models.interface :as i]
[metabase.util :as u]
[toucan.models :as models]))
(models/defmodel ComputationJobResult :computation_job_result)
(u/strict-extend (class ComputationJobResult)
models/IModel
(merge models/IModelDefaults
{:types (constantly {:permanence :keyword
:payload :json})
:properties (constantly {:timestamped? true})}))
(ns metabase.task.cleanup-temporary-computation-job-results
"Cleanup of old async computation results."
(:require [clj-time.core :as t]
[clojurewerkz.quartzite
[jobs :as jobs]
[triggers :as triggers]]
[clojurewerkz.quartzite.schedule.daily-interval :as interval]
[metabase.task :as task]
[toucan.db :as db]))
(def ^:private temporary-result-lifetime (t/days 3))
(defn- cleanup-temporary-results!
[]
(db/delete! 'ComputationJobResult
:created_at [:< (-> (t/now)
(t/minus temporary-result-lifetime)
str)]))
(def ^:private ^:const cleanup-job-key "metabase.task.cleanup-temporary-computation-job-results.job")
(def ^:private ^:const cleanup-trigger-key "metabase.task.cleanup-temporary-computation-job-results.trigger")
(jobs/defjob Cleanup
[ctx]
(cleanup-temporary-results!))
(defonce ^:private cleanup-job (atom nil))
(defonce ^:private cleanup-trigger (atom nil))
(defn task-init
"Automatically called during startup; start the job for sending pulses."
[]
(reset! cleanup-job (jobs/build
(jobs/of-type Cleanup)
(jobs/with-identity (jobs/key cleanup-job-key))))
(reset! cleanup-trigger (triggers/build
(triggers/with-identity
(triggers/key cleanup-trigger-key))
(triggers/start-now)
(triggers/with-schedule
;; once per day at 3AM
(interval/schedule
(interval/starting-daily-at
(interval/time-of-day 03 00 00))))))
(task/schedule-task! @cleanup-job @cleanup-trigger))
(ns metabase.feature-extraction.async-test
(:require [expectations :refer :all]
[metabase.feature-extraction.async :refer :all]
[metabase.models.computation-job :refer [ComputationJob]]))
(expect
true
(let [job-id (compute (constantly 1))]
(Thread/sleep 100)
(done? (ComputationJob job-id))))
(expect
true
(let [job-id (compute #(do (Thread/sleep 10000) nil))]
(Thread/sleep 100)
(running? (ComputationJob job-id))))
(expect
[true false false]
(let [job-id (compute #(do (Thread/sleep 100000) nil))]
(Thread/sleep 100)
(let [r? (running? (ComputationJob job-id))]
(cancel (ComputationJob job-id))
[r? (done? (ComputationJob job-id)) (running? (ComputationJob job-id))])))
(expect
{:status :done
:result 1}
(let [job-id (compute (constantly 1))]
(Thread/sleep 100)
(result (ComputationJob job-id))))
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment