Skip to content
Snippets Groups Projects
Unverified Commit 6a7a3c3c authored by Cam Saul's avatar Cam Saul Committed by GitHub
Browse files

Merge pull request #9372 from metabase/tasks-cleanup-and-debug-page

Tasks cleanup; tasks debugging admin page
parents f2dc4811 3af45676
No related branches found
No related tags found
No related merge requests found
Showing
with 631 additions and 282 deletions
......@@ -22,8 +22,11 @@ import AdminPeopleApp from "metabase/admin/people/containers/AdminPeopleApp.jsx"
import FieldApp from "metabase/admin/datamodel/containers/FieldApp.jsx";
import TableSettingsApp from "metabase/admin/datamodel/containers/TableSettingsApp.jsx";
import TroubleshootingApp from "metabase/admin/tasks/containers/TroubleshootingApp";
import TasksApp from "metabase/admin/tasks/containers/TasksApp";
import TaskModal from "metabase/admin/tasks/containers/TaskModal";
import JobInfoApp from "metabase/admin/tasks/containers/JobInfoApp";
import JobTriggersModal from "metabase/admin/tasks/containers/JobTriggersModal";
// People
import PeopleListingApp from "metabase/admin/people/containers/PeopleListingApp.jsx";
......@@ -38,7 +41,7 @@ const getRoutes = (store, IsAdmin) => (
title={t`Admin`}
component={withBackground("bg-white")(IsAdmin)}
>
<IndexRedirect to="/admin/settings" />
<IndexRedirect to="settings" />
<Route path="databases" title={t`Databases`}>
<IndexRoute component={DatabaseListApp} />
......@@ -80,16 +83,27 @@ const getRoutes = (store, IsAdmin) => (
</Route>
{/* Troubleshooting */}
<Route path="troubleshooting" title={t`Troubleshooting`}>
<Route
path="troubleshooting"
title={t`Troubleshooting`}
component={TroubleshootingApp}
>
<IndexRedirect to="tasks" />
<Route path="tasks" component={TasksApp}>
<ModalRoute path=":taskId" modal={TaskModal} />
</Route>
<Route path="jobs" component={JobInfoApp}>
<ModalRoute
path=":jobKey"
modal={JobTriggersModal}
modalProps={{ wide: true }}
/>
</Route>
</Route>
{/* SETTINGS */}
<Route path="settings" title={t`Settings`}>
<IndexRedirect to="/admin/settings/setup" />
<IndexRedirect to="setup" />
{/* <IndexRoute component={SettingsEditorApp} /> */}
<Route path=":section/:authType" component={SettingsEditorApp} />
<Route path=":section" component={SettingsEditorApp} />
......
import React from "react";
import { t } from "c-3po";
import { connect } from "react-redux";
import { Box, Flex } from "grid-styled";
import LoadingAndErrorWrapper from "metabase/components/LoadingAndErrorWrapper.jsx";
import AdminHeader from "metabase/components/AdminHeader";
import Link from "metabase/components/Link";
import { fetchJobInfo } from "../jobInfo";
const renderSchedulerInfo = scheduler => {
return (
scheduler && (
<Flex align="center">
<pre>{scheduler.join("\n")}</pre>
</Flex>
)
);
};
const renderJobsTable = jobs => {
return (
jobs && (
<table className="ContentTable mt2">
<thead>
<tr>
<th>{t`Key`}</th>
<th>{t`Class`}</th>
<th>{t`Description`}</th>
<th>{t`Triggers`}</th>
</tr>
</thead>
<tbody>
{jobs &&
jobs.map(job => (
<tr key={job.key}>
<td className="text-bold">{job.key}</td>
<td>{job.class}</td>
<td>{job.description}</td>
<td>{job.durable}</td>
<td>
<Link
className="link"
to={`/admin/troubleshooting/jobs/${job.key}`}
>
{t`View triggers`}
</Link>
</td>
</tr>
))}
</tbody>
</table>
)
);
};
@connect(null, { fetchJobInfo })
export default class JobInfoApp extends React.Component {
async componentDidMount() {
try {
const info = (await this.props.fetchJobInfo()).payload;
this.setState({
scheduler: info.scheduler,
jobs: info.jobs,
error: null,
});
} catch (error) {
this.setState({ error });
}
}
render() {
const { children } = this.props;
const { error, scheduler, jobs } = this.state || {};
return (
<LoadingAndErrorWrapper loading={!scheduler} error={error}>
<Box p={3}>
<Flex align="center">
<AdminHeader title={t`Scheduler Info`} />
</Flex>
{renderSchedulerInfo(scheduler)}
{renderJobsTable(jobs)}
{
// render 'children' so that the invididual task modals show up
children
}
</Box>
</LoadingAndErrorWrapper>
);
}
}
import React from "react";
import { t } from "c-3po";
import { connect } from "react-redux";
import { goBack } from "react-router-redux";
import _ from "underscore";
import LoadingAndErrorWrapper from "metabase/components/LoadingAndErrorWrapper.jsx";
import ModalContent from "metabase/components/ModalContent";
import { fetchJobInfo } from "../jobInfo";
const renderTriggersTable = triggers => {
return (
<table className="ContentTable mt2">
<thead>
<tr>
<th>{t`Key`}</th>
<th>{t`Description`}</th>
<th>{t`State`}</th>
<th>{t`Priority`}</th>
<th>{t`Last Fired`}</th>
<th>{t`Next Fire Time`}</th>
<th>{t`Start Time`}</th>
<th>{t`End Time`}</th>
<th>{t`Final Fire Time`}</th>
<th>{t`May Fire Again?`}</th>
</tr>
</thead>
<tbody>
{triggers &&
triggers.map(trigger => (
<tr key={trigger.key}>
<td className="text-bold">{trigger.key}</td>
<td>{trigger.description}</td>
<td>{trigger.state}</td>
<td>{trigger.priority}</td>
<td>{trigger["previous-fire-time"]}</td>
<td>{trigger["next-fire-time"]}</td>
<td>{trigger["start-time"]}</td>
<td>{trigger["end-time"]}</td>
<td>{trigger["final-fire-time"]}</td>
<td>{trigger["may-fire-again?"] ? t`Yes` : t`No`}</td>
</tr>
))}
</tbody>
</table>
);
};
@connect(null, { fetchJobInfo, goBack })
export default class JobTriggersModal extends React.Component {
state = {
triggers: null,
error: null,
};
async componentDidMount() {
try {
const { jobKey } = this.props.params;
const jobs = jobKey && (await this.props.fetchJobInfo()).payload.jobs;
const job = jobs && _.findWhere(jobs, { key: jobKey });
const triggers = (job && job.triggers) || [];
this.setState({ triggers, error: null });
} catch (error) {
this.setState({ error });
}
}
render() {
const { params: { jobKey }, goBack } = this.props;
const { triggers, error } = this.state;
return (
<ModalContent title={t`Triggers for ${jobKey}`} onClose={goBack}>
<LoadingAndErrorWrapper loading={!triggers} error={error}>
{() => renderTriggersTable(triggers)}
</LoadingAndErrorWrapper>
</ModalContent>
);
}
}
/* eslint "react/prop-types": "warn" */
import React, { Component } from "react";
import PropTypes from "prop-types";
import { t } from "c-3po";
import {
LeftNavPane,
LeftNavPaneItem,
} from "metabase/components/LeftNavPane.jsx";
import AdminLayout from "metabase/components/AdminLayout.jsx";
export default class TroubleshootingApp extends Component {
static propTypes = {
children: PropTypes.any,
};
render() {
const { children } = this.props;
return (
<AdminLayout
sidebar={
<LeftNavPane>
<LeftNavPaneItem
name={t`Tasks`}
path="/admin/troubleshooting/tasks"
index
/>
<LeftNavPaneItem
name={t`Jobs`}
path="/admin/troubleshooting/jobs"
/>
</LeftNavPane>
}
>
{children}
</AdminLayout>
);
}
}
import { createThunkAction } from "metabase/lib/redux";
import { TaskApi } from "metabase/services";
export const FETCH_JOB_INFO = "metabase/admin/tasks/FETCH_JOB_INFO";
export const fetchJobInfo = createThunkAction(
FETCH_JOB_INFO,
() => async () => {
return await TaskApi.getJobsInfo();
},
);
......@@ -4,7 +4,7 @@ import { push } from "react-router-redux";
import { connect } from "react-redux";
import Modal from "metabase/components/Modal";
const ModalWithRoute = ComposedModal =>
const ModalWithRoute = (ComposedModal, modalProps = {}) =>
connect(null, { onChangeLocation: push })(
class extends Component {
static displayName = `ModalWithRoute[${ComposedModal.displayName ||
......@@ -21,7 +21,7 @@ const ModalWithRoute = ComposedModal =>
render() {
return (
<Modal onClose={this.onClose}>
<Modal {...modalProps} onClose={this.onClose}>
<ComposedModal {...this.props} onClose={this.onClose} />
</Modal>
);
......@@ -32,11 +32,11 @@ const ModalWithRoute = ComposedModal =>
// react-router Route wrapper that handles routed modals
export class ModalRoute extends Route {
static createRouteFromReactElement(element) {
const { modal } = element.props;
const { modal, modalProps } = element.props;
if (modal) {
element = React.cloneElement(element, {
component: ModalWithRoute(modal),
component: ModalWithRoute(modal, modalProps),
});
return Route.createRouteFromReactElement(element);
......
......@@ -314,6 +314,7 @@ export const I18NApi = {
export const TaskApi = {
get: GET("api/task"),
getJobsInfo: GET("api/task/info"),
};
export function setPublicQuestionEndpoints(uuid: string) {
......
org.quartz.scheduler.instanceName = MetabaseScheduler
org.quartz.scheduler.instanceId = AUTO
org.quartz.threadPool.threadCount = 4
# default is 10
org.quartz.threadPool.threadCount = 10
# Don't phone home
org.quartz.scheduler.skipUpdateCheck: true
......@@ -19,6 +20,16 @@ org.quartz.jobStore.isClustered = true
org.quartz.dataSource.db.validationQuery=SELECT 1
# By default, Quartz will fire triggers up to a minute late without considering them to be misfired; if it cannot fire
# anything within that period for one reason or another (such as all threads in the thread pool being tied up), the
# trigger is considered misfired. Threshold is in milliseconds.
#
# Default threshould is one minute (60,000)
# We'll bump it up to 15 minutes (900,000) because the sorts of things we're scheduling aren't extremely time-sensitive,
# for example Pulses and Sync can be sent out more than a minute late without issue. (In fact, 15 minutes late is better
# than not at all for such things)
org.quartz.jobStore.misfireThreshold=900000
# Useful for debugging when Quartz jobs run and when they misfire
#org.quartz.plugin.triggHistory.class = org.quartz.plugins.history.LoggingTriggerHistoryPlugin
#org.quartz.plugin.triggHistory.triggerFiredMessage = Trigger \{1\}.\{0\} fired job \{6\}.\{5\} at: \{4, date, HH:mm:ss MM/dd/yyyy}
......
......@@ -3,6 +3,7 @@
(:require [compojure.core :refer [GET]]
[metabase.api.common :as api]
[metabase.models.task-history :as task-history :refer [TaskHistory]]
[metabase.task :as task]
[metabase.util
[i18n :as ui18n :refer [tru]]
[schema :as su]]
......@@ -41,4 +42,11 @@
[id]
(api/read-check TaskHistory id))
(api/defendpoint GET "/info"
"Return raw data about all scheduled tasks (i.e., Quartz Jobs and Triggers)."
[]
(api/check-superuser)
(task/scheduler-info))
(api/define-routes)
(ns metabase.models.task-history
(:require [metabase.models.interface :as i]
[metabase.util :as u]
[metabase.util.schema :as su]
[metabase.util
[date :as du]
[schema :as su]]
[schema.core :as s]
[toucan
[db :as db]
......@@ -10,8 +12,8 @@
(models/defmodel TaskHistory :task_history)
(defn cleanup-task-history!
"Deletes older TaskHistory rows. Will order TaskHistory by `ended_at` and delete everything after
`num-rows-to-keep`. This is intended for a quick cleanup of old rows."
"Deletes older TaskHistory rows. Will order TaskHistory by `ended_at` and delete everything after `num-rows-to-keep`.
This is intended for a quick cleanup of old rows. Returns `true` if something was deleted."
[num-rows-to-keep]
;; Ideally this would be one query, but MySQL does not allow nested queries with a limit. The query below orders the
;; tasks by the time they finished, newest first. Then finds the first row after skipping `num-rows-to-keep`. Using
......@@ -41,3 +43,50 @@
{:limit limit})
(when offset
{:offset offset}))))
;;; +----------------------------------------------------------------------------------------------------------------+
;;; | with-task-history macro |
;;; +----------------------------------------------------------------------------------------------------------------+
(def ^:private TaskHistoryInfo
"Schema for `info` passed to the `with-task-history` macro."
{:task su/NonBlankString ; task name, i.e. `send-pulses`. Conventionally lisp-cased
(s/optional-key :db_id) (s/maybe s/Int) ; DB involved, for sync operations or other tasks where this is applicable.
(s/optional-key :task_details) (s/maybe su/Map)}) ; additional map of details to include in the recorded row
(defn- save-task-history! [start-time-ms info]
(let [end-time-ms (System/currentTimeMillis)
duration-ms (- end-time-ms start-time-ms)]
(db/insert! TaskHistory
(assoc info
:started_at (du/->Timestamp start-time-ms)
:ended_at (du/->Timestamp end-time-ms)
:duration duration-ms))))
(s/defn do-with-task-history
"Impl for `with-task-history` macro; see documentation below."
[info :- TaskHistoryInfo, f]
(let [start-time-ms (System/currentTimeMillis)]
(try
(u/prog1 (f)
(save-task-history! start-time-ms info))
(catch Throwable e
(let [info (assoc info :task_details {:status :failed
:exception (class e)
:message (.getMessage e)
:stacktrace (u/filtered-stacktrace e)
:ex-data (ex-data e)
:original-info (:task_details info)})]
(save-task-history! start-time-ms info))
(throw e)))))
(defmacro with-task-history
"Execute `body`, recording a TaskHistory entry when the task completes; if it failed to complete, records an entry
containing information about the Exception. `info` should contain at least a name for the task (conventionally
lisp-cased) as `:task`; see the `TaskHistoryInfo` schema in this namespace for other optional keys.
(with-task-history {:task \"send-pulses\"}
...)"
{:style/indent 1}
[info & body]
`(do-with-task-history ~info (fn [] ~@body)))
......@@ -7,7 +7,8 @@
function which accepts zero arguments. This function is dynamically resolved and called exactly once when the
application goes through normal startup procedures. Inside this function you can do any work needed and add your
task to the scheduler as usual via `schedule-task!`."
(:require [clojure.tools.logging :as log]
(:require [clojure.string :as str]
[clojure.tools.logging :as log]
[clojurewerkz.quartzite.scheduler :as qs]
[metabase
[db :as mdb]
......@@ -23,6 +24,22 @@
(defonce ^:private quartz-scheduler
(atom nil))
;; whenever the value of `quartz-scheduler` changes:
;;
;; 1. shut down the old scheduler, if there was one
;; 2. start the new scheduler, if there is one
(add-watch
quartz-scheduler
::quartz-scheduler-watcher
(fn [_ _ old-scheduler new-scheduler]
(when-not (identical? old-scheduler new-scheduler)
(when old-scheduler
(log/debug (trs "Stopping Quartz Scheduler {0}" old-scheduler))
(qs/shutdown old-scheduler))
(when new-scheduler
(log/debug (trs "Starting Quartz Scheduler {0}" new-scheduler))
(qs/start new-scheduler)))))
(defn- scheduler
"Fetch the instance of our Quartz scheduler. Call this function rather than dereffing the atom directly because there
are a few places (e.g., in tests) where we swap the instance out."
......@@ -35,16 +52,40 @@
;;; | FINDING & LOADING TASKS |
;;; +----------------------------------------------------------------------------------------------------------------+
(defmulti init!
"Initialize (i.e., schedule) Job(s) with a given name. All implementations of this method are called once and only
once when the Quartz task scheduler is initialized. Task namespaces (`metabase.task.*`) should add new
implementations of this method to schedule the jobs they define (i.e., with a call to `schedule-task!`.)
The dispatch value for this function can be any unique keyword, but by convention is a namespaced keyword version of
the name of the Job being initialized; for sake of consistency with the Job name itself, the keyword should be left
CamelCased.
(defmethod task/init! ::SendPulses [_]
(task/schedule-task! my-job my-trigger))"
{:arglists '([job-name-string])}
keyword)
(defn- find-and-load-tasks!
"Search Classpath for namespaces that start with `metabase.tasks.`, then `require` them so initialization can happen."
[]
;; first, load all the task namespaces
(doseq [ns-symb @u/metabase-namespace-symbols
:when (.startsWith (name ns-symb) "metabase.task.")]
(log/info (trs "Loading tasks namespace:") (u/format-color 'blue ns-symb) (u/emoji "📆"))
(require ns-symb)
;; look for `task-init` function in the namespace and call it if it exists
(when-let [init-fn (ns-resolve ns-symb 'task-init)]
(init-fn))))
(try
(log/debug (trs "Loading tasks namespace:") (u/format-color 'blue ns-symb))
(require ns-symb)
(catch Throwable e
(log/error e (trs "Error loading tasks namespace {0}" ns-symb)))))
;; next, call all implementations of `init!`
(doseq [[k f] (methods init!)]
(try
;; don't bother logging namespace for now, maybe in the future if there's tasks of the same name in multiple
;; namespaces we can log it
(log/info (trs "Initializing task {0}" (u/format-color 'green (name k))) (u/emoji "📆"))
(f k)
(catch Throwable e
(log/error e (trs "Error initializing task {0}" k))))))
;;; +----------------------------------------------------------------------------------------------------------------+
......@@ -63,31 +104,30 @@
(System/setProperty "org.quartz.jobStore.driverDelegateClass" "org.quartz.impl.jdbcjobstore.PostgreSQLDelegate"))
;; set other properties like URL, user, and password so Quartz knows how to connect
(doseq [[k, ^String v] {:driver classname
:URL (str "jdbc:" subprotocol \: subname)
:URL (format "jdbc:%s:%s" subprotocol subname)
:user user
:password password}]
(when v
(System/setProperty (str "org.quartz.dataSource.db." (name k)) v)))))
(def ^:private start-scheduler-lock (Object.))
(defn start-scheduler!
"Start our Quartzite scheduler which allows jobs to be submitted and triggers to begin executing."
[]
(when-not @quartz-scheduler
(set-jdbc-backend-properties!)
(log/debug (trs "Starting Quartz Scheduler"))
;; keep a reference to our scheduler
(reset! quartz-scheduler (qs/start (qs/initialize)))
;; look for job/trigger definitions
(find-and-load-tasks!)))
(locking start-scheduler-lock
(when-not @quartz-scheduler
(set-jdbc-backend-properties!)
;; keep a reference to our scheduler
(reset! quartz-scheduler (qs/initialize))
;; look for job/trigger definitions
(find-and-load-tasks!)))))
(defn stop-scheduler!
"Stop our Quartzite scheduler and shutdown any running executions."
[]
(log/debug (trs "Stopping Quartz Scheduler"))
;; tell quartz to stop everything
(when-let [scheduler (scheduler)]
(qs/shutdown scheduler))
;; reset our scheduler reference
;; setting `quartz-scheduler` to nil will cause it to shut down via the watcher on it
(reset! quartz-scheduler nil))
......@@ -102,7 +142,7 @@
(try
(qs/schedule scheduler job trigger)
(catch org.quartz.ObjectAlreadyExistsException _
(log/info (trs "Job already exists:") (-> job .getKey .getName))))))
(log/debug (trs "Job already exists:") (-> job .getKey .getName))))))
(s/defn delete-task!
"delete a task from the scheduler"
......@@ -128,3 +168,43 @@
[trigger-key :- TriggerKey]
(when-let [scheduler (scheduler)]
(qs/delete-trigger scheduler trigger-key)))
;;; +----------------------------------------------------------------------------------------------------------------+
;;; | Scheduler Info |
;;; +----------------------------------------------------------------------------------------------------------------+
(defn- job-detail->info [^JobDetail job-detail]
{:key (-> (.getKey job-detail) .getName)
:class (-> (.getJobClass job-detail) .getCanonicalName)
:description (.getDescription job-detail)
:concurrent-executation-disallowed? (.isConcurrentExectionDisallowed job-detail)
:durable? (.isDurable job-detail)
:requests-recovery? (.requestsRecovery job-detail)})
(defn- trigger->info [^Trigger trigger]
{:description (.getDescription trigger)
:end-time (.getEndTime trigger)
:final-fire-time (.getFinalFireTime trigger)
:key (-> (.getKey trigger) .getName)
:state (some->> (.getKey trigger) (.getTriggerState (scheduler)) str)
:next-fire-time (.getNextFireTime trigger)
:previous-fire-time (.getPreviousFireTime trigger)
:priority (.getPriority trigger)
:start-time (.getStartTime trigger)
:may-fire-again? (.mayFireAgain trigger)})
(defn scheduler-info
"Return raw data about all the scheduler and scheduled tasks (i.e. Jobs and Triggers). Primarily for debugging
purposes."
[]
{:scheduler
(str/split-lines (.getSummary (.getMetaData (scheduler))))
:jobs
(for [^JobKey job-key (->> (.getJobKeys (scheduler) nil)
(sort-by #(.getName ^JobKey %) ))]
(assoc (job-detail->info (qs/get-job (scheduler) job-key))
:triggers (for [trigger (->> (qs/get-triggers-of-job (scheduler) job-key)
(sort-by #(-> ^Trigger % .getKey .getName)))]
(trigger->info trigger))))})
......@@ -21,12 +21,9 @@
[metabase.util.i18n :refer [trs]]
[toucan.db :as db]))
(declare send-follow-up-email! send-abandonment-email!)
(def ^:private follow-up-emails-job-key "metabase.task.follow-up-emails.job")
(def ^:private follow-up-emails-trigger-key "metabase.task.follow-up-emails.trigger")
(defonce ^:private follow-up-emails-job (atom nil))
(defonce ^:private follow-up-emails-trigger (atom nil))
;;; +----------------------------------------------------------------------------------------------------------------+
;;; | send follow-up emails |
;;; +----------------------------------------------------------------------------------------------------------------+
(setting/defsetting ^:private follow-up-email-sent
;; No need to i18n this as it's not user facing
......@@ -35,28 +32,30 @@
:default false
:internal? true)
(def ^:private abandonment-emails-job-key "metabase.task.abandonment-emails.job")
(def ^:private abandonment-emails-trigger-key "metabase.task.abandonment-emails.trigger")
(defonce ^:private abandonment-emails-job (atom nil))
(defonce ^:private abandonment-emails-trigger (atom nil))
(setting/defsetting ^:private abandonment-email-sent
"Have we sent an abandonment email to the instance admin?"
:type :boolean
:default false
:internal? true)
(defn- send-follow-up-email!
"Send an email to the instance admin following up on their experience with Metabase thus far."
[]
;; we need access to email AND the instance must be opted into anonymous tracking. Make sure email hasn't been sent yet
(when (and (email/email-configured?)
(public-settings/anon-tracking-enabled)
(not (follow-up-email-sent)))
;; grab the oldest admins email address (likely the user who created this MB instance), that's who we'll send to
;; TODO - Does it make to send to this user instead of `(public-settings/admin-email)`?
(when-let [admin (User :is_superuser true, :is_active true, {:order-by [:date_joined]})]
(try
(messages/send-follow-up-email! (:email admin) "follow-up")
(catch Throwable e
(log/error "Problem sending follow-up email:" e))
(finally
(follow-up-email-sent true))))))
(defn- instance-creation-timestamp
"The date this Metabase instance was created. We use the `:date_joined` of the first `User` to determine this."
^java.sql.Timestamp []
(db/select-one-field :date_joined User, {:order-by [[:date_joined :asc]]}))
;; 2 weeks of inactivity after 30 days of total install
;; this sends out a general 2 week email follow up email
(jobs/defjob FollowUpEmail
[ctx]
(jobs/defjob FollowUpEmail [_]
;; if we've already sent the follow-up email then we are done
(when-not (follow-up-email-sent)
;; figure out when we consider the instance created
......@@ -66,68 +65,31 @@
(- (System/currentTimeMillis) (.getTime instance-created)))
(send-follow-up-email!)))))
;; this sends out an email any time after 30 days if the instance has stopped being used for 14 days
(jobs/defjob AbandonmentEmail
[ctx]
;; if we've already sent the abandonment email then we are done
(when-not (abandonment-email-sent)
;; figure out when we consider the instance created
(when-let [instance-created (instance-creation-timestamp)]
;; we need to be 4+ weeks (30 days) from creation to send the follow up
(when (< (* 30 24 60 60 1000)
(- (System/currentTimeMillis) (.getTime instance-created)))
;; we need access to email AND the instance must be opted into anonymous tracking
(when (and (email/email-configured?)
(public-settings/anon-tracking-enabled))
(send-abandonment-email!))))))
(defn task-init
"Automatically called during startup; start the job for sending follow up emails."
[]
;; FollowUpEmail job + trigger
(reset! follow-up-emails-job (jobs/build
(jobs/of-type FollowUpEmail)
(jobs/with-identity (jobs/key follow-up-emails-job-key))))
(reset! follow-up-emails-trigger (triggers/build
(triggers/with-identity (triggers/key follow-up-emails-trigger-key))
(triggers/start-now)
(triggers/with-schedule
;; run once a day
(cron/cron-schedule "0 0 12 * * ? *"))))
;; submit ourselves to the scheduler
(task/schedule-task! @follow-up-emails-job @follow-up-emails-trigger)
(def ^:private follow-up-emails-job-key "metabase.task.follow-up-emails.job")
(def ^:private follow-up-emails-trigger-key "metabase.task.follow-up-emails.trigger")
;; AbandonmentEmail job + trigger
(reset! abandonment-emails-job (jobs/build
(jobs/of-type AbandonmentEmail)
(jobs/with-identity (jobs/key abandonment-emails-job-key))))
(reset! abandonment-emails-trigger (triggers/build
(triggers/with-identity (triggers/key abandonment-emails-trigger-key))
(triggers/start-now)
(triggers/with-schedule
;; run once a day
(cron/cron-schedule "0 0 12 * * ? *"))))
;; submit ourselves to the scheduler
(task/schedule-task! @abandonment-emails-job @abandonment-emails-trigger))
(defmethod task/init! ::SendFollowUpEmails [_]
(let [job (jobs/build
(jobs/of-type FollowUpEmail)
(jobs/with-identity (jobs/key follow-up-emails-job-key)))
trigger (triggers/build
(triggers/with-identity (triggers/key follow-up-emails-trigger-key))
(triggers/start-now)
(triggers/with-schedule
;; run once a day
(cron/cron-schedule "0 0 12 * * ? *")))]
(task/schedule-task! job trigger)))
(defn- send-follow-up-email!
"Send an email to the instance admin following up on their experience with Metabase thus far."
[]
;; we need access to email AND the instance must be opted into anonymous tracking. Make sure email hasn't been sent yet
(when (and (email/email-configured?)
(public-settings/anon-tracking-enabled)
(not (follow-up-email-sent)))
;; grab the oldest admins email address (likely the user who created this MB instance), that's who we'll send to
;; TODO - Does it make to send to this user instead of `(public-settings/admin-email)`?
(when-let [admin (User :is_superuser true, :is_active true, {:order-by [:date_joined]})]
(try
(messages/send-follow-up-email! (:email admin) "follow-up")
(catch Throwable e
(log/error "Problem sending follow-up email:" e))
(finally
(follow-up-email-sent true))))))
;;; +----------------------------------------------------------------------------------------------------------------+
;;; | send abandoment emails |
;;; +----------------------------------------------------------------------------------------------------------------+
(setting/defsetting ^:private abandonment-email-sent
"Have we sent an abandonment email to the instance admin?"
:type :boolean
:default false
:internal? true)
(defn- send-abandonment-email!
"Send an email to the instance admin about why Metabase usage has died down."
......@@ -148,3 +110,32 @@
(log/error e (trs "Problem sending abandonment email")))
(finally
(abandonment-email-sent true)))))))
;; this sends out an email any time after 30 days if the instance has stopped being used for 14 days
(jobs/defjob AbandonmentEmail [_]
;; if we've already sent the abandonment email then we are done
(when-not (abandonment-email-sent)
;; figure out when we consider the instance created
(when-let [instance-created (instance-creation-timestamp)]
;; we need to be 4+ weeks (30 days) from creation to send the follow up
(when (< (* 30 24 60 60 1000)
(- (System/currentTimeMillis) (.getTime instance-created)))
;; we need access to email AND the instance must be opted into anonymous tracking
(when (and (email/email-configured?)
(public-settings/anon-tracking-enabled))
(send-abandonment-email!))))))
(def ^:private abandonment-emails-job-key "metabase.task.abandonment-emails.job")
(def ^:private abandonment-emails-trigger-key "metabase.task.abandonment-emails.trigger")
(defmethod task/init! ::SendAbandomentEmails [_]
(let [job (jobs/build
(jobs/of-type AbandonmentEmail)
(jobs/with-identity (jobs/key abandonment-emails-job-key)))
trigger (triggers/build
(triggers/with-identity (triggers/key abandonment-emails-trigger-key))
(triggers/start-now)
(triggers/with-schedule
;; run once a day
(cron/cron-schedule "0 0 12 * * ? *")))]
(task/schedule-task! job trigger)))
......@@ -12,36 +12,27 @@
[i18n :refer [trs]]
[stats :as stats]]))
(def ^:private job-key "metabase.task.anonymous-stats.job")
(def ^:private trigger-key "metabase.task.anonymous-stats.trigger")
(defonce ^:private job (atom nil))
(defonce ^:private trigger (atom nil))
;; if we can collect usage data, do so and send it home
(jobs/defjob SendAnonymousUsageStats
[ctx]
(jobs/defjob SendAnonymousUsageStats [_]
(when (public-settings/anon-tracking-enabled)
(log/debug (trs "Sending anonymous usage stats."))
(try
;; TODO: add in additional request params if anonymous tracking is enabled
(stats/phone-home-stats!)
(catch Throwable e
(log/error e (trs "Error sending anonymous usage stats: "))))))
(log/error e (trs "Error sending anonymous usage stats"))))))
(def ^:private job-key "metabase.task.anonymous-stats.job")
(def ^:private trigger-key "metabase.task.anonymous-stats.trigger")
(defn task-init
"Job initialization"
[]
;; build our job
(reset! job (jobs/build
(jobs/of-type SendAnonymousUsageStats)
(jobs/with-identity (jobs/key job-key))))
;; build our trigger
(reset! trigger (triggers/build
(triggers/with-identity (triggers/key trigger-key))
(triggers/start-now)
(triggers/with-schedule
;; run twice a day
(cron/cron-schedule "0 15 7 * * ? *"))))
;; submit ourselves to the scheduler
(task/schedule-task! @job @trigger))
(defmethod task/init! ::SendAnonymousUsageStats [_]
(let [job (jobs/build
(jobs/of-type SendAnonymousUsageStats)
(jobs/with-identity (jobs/key job-key)))
trigger (triggers/build
(triggers/with-identity (triggers/key trigger-key))
(triggers/start-now)
(triggers/with-schedule
;; run twice a day
(cron/cron-schedule "0 15 7 * * ? *")))]
(task/schedule-task! job trigger)))
......@@ -14,72 +14,10 @@
[metabase.models
[pulse :as pulse]
[pulse-channel :as pulse-channel]
[setting :as setting]]
[setting :as setting]
[task-history :as task-history]]
[metabase.util.i18n :refer [trs]]))
(declare send-pulses!)
(def ^:private ^:const send-pulses-job-key "metabase.task.send-pulses.job")
(def ^:private ^:const send-pulses-trigger-key "metabase.task.send-pulses.trigger")
(defonce ^:private send-pulses-job (atom nil))
(defonce ^:private send-pulses-trigger (atom nil))
(defn- monthday [dt]
(cond
(timepr/first-day-of-month? dt) :first
(timepr/last-day-of-month? dt) :last
(= 15 (time/day dt)) :mid
:else :other))
(defn- monthweek [dt]
(let [curr-day-of-month (time/day dt)
last-of-month (time/day (time/last-day-of-the-month dt))
start-of-last-week (- last-of-month 7)]
(cond
(> 8 curr-day-of-month) :first
(< start-of-last-week curr-day-of-month) :last
:else :other)))
;; triggers the sending of all pulses which are scheduled to run in the current hour
(jobs/defjob SendPulses
[ctx]
(try
;; determine what time it is right now (hour-of-day & day-of-week) in reporting timezone
(let [reporting-timezone (setting/get :report-timezone)
now (if (empty? reporting-timezone)
(time/now)
(time/to-time-zone (time/now) (time/time-zone-for-id reporting-timezone)))
curr-hour (time/hour now)
;; joda time produces values of 1-7 here (Mon -> Sun) and we subtract 1 from it to
;; make the values zero based to correspond to the indexes in pulse-channel/days-of-week
curr-weekday (->> (dec (time/day-of-week now))
(get pulse-channel/days-of-week)
:id)
curr-monthday (monthday now)
curr-monthweek (monthweek now)]
(send-pulses! curr-hour curr-weekday curr-monthday curr-monthweek))
(catch Throwable e
(log/error e (trs "SendPulses task failed")))))
(defn task-init
"Automatically called during startup; start the job for sending pulses."
[]
;; build our job
(reset! send-pulses-job (jobs/build
(jobs/of-type SendPulses)
(jobs/with-identity (jobs/key send-pulses-job-key))))
;; build our trigger
(reset! send-pulses-trigger (triggers/build
(triggers/with-identity (triggers/key send-pulses-trigger-key))
(triggers/start-now)
(triggers/with-schedule
;; run at the top of every hour
(cron/cron-schedule "0 0 * * * ? *"))))
;; submit ourselves to the scheduler
(task/schedule-task! @send-pulses-job @send-pulses-trigger))
;;; ------------------------------------------------- PULSE SENDING --------------------------------------------------
(defn- log-pulse-exception [pulse-id exception]
......@@ -103,9 +41,71 @@
(let [channels-by-pulse (group-by :pulse_id (pulse-channel/retrieve-scheduled-channels hour weekday monthday monthweek))]
(doseq [pulse-id (keys channels-by-pulse)]
(try
(log/debug (format "Starting Pulse Execution: %d" pulse-id))
(when-let [pulse (pulse/retrieve-notification pulse-id :archived false)]
(p/send-pulse! pulse :channel-ids (mapv :id (get channels-by-pulse pulse-id))))
(log/debug (format "Finished Pulse Execution: %d" pulse-id))
(task-history/with-task-history {:task (format "send-pulse %s" pulse-id)}
(log/debug (format "Starting Pulse Execution: %d" pulse-id))
(when-let [pulse (pulse/retrieve-notification pulse-id :archived false)]
(p/send-pulse! pulse :channel-ids (mapv :id (get channels-by-pulse pulse-id))))
(log/debug (format "Finished Pulse Execution: %d" pulse-id)))
(catch Throwable e
(on-error pulse-id e)))))))
;;; ------------------------------------------------------ Task ------------------------------------------------------
(defn- monthday [dt]
(cond
(timepr/first-day-of-month? dt) :first
(timepr/last-day-of-month? dt) :last
(= 15 (time/day dt)) :mid
:else :other))
(defn- monthweek [dt]
(let [curr-day-of-month (time/day dt)
last-of-month (time/day (time/last-day-of-the-month dt))
start-of-last-week (- last-of-month 7)]
(cond
(> 8 curr-day-of-month) :first
(< start-of-last-week curr-day-of-month) :last
:else :other)))
;; triggers the sending of all pulses which are scheduled to run in the current hour
(jobs/defjob SendPulses [_]
(try
(task-history/with-task-history {:task "send-pulses"}
;; determine what time it is right now (hour-of-day & day-of-week) in reporting timezone
(let [reporting-timezone (setting/get :report-timezone)
now (if (empty? reporting-timezone)
(time/now)
(time/to-time-zone (time/now) (time/time-zone-for-id reporting-timezone)))
curr-hour (time/hour now)
;; joda time produces values of 1-7 here (Mon -> Sun) and we subtract 1 from it to
;; make the values zero based to correspond to the indexes in pulse-channel/days-of-week
curr-weekday (->> (dec (time/day-of-week now))
(get pulse-channel/days-of-week)
:id)
curr-monthday (monthday now)
curr-monthweek (monthweek now)]
(send-pulses! curr-hour curr-weekday curr-monthday curr-monthweek)))
(catch Throwable e
(log/error e (trs "SendPulses task failed")))))
(def ^:private send-pulses-job-key "metabase.task.send-pulses.job")
(def ^:private send-pulses-trigger-key "metabase.task.send-pulses.trigger")
(defmethod task/init! ::SendPulses [_]
(let [job (jobs/build
(jobs/of-type SendPulses)
(jobs/with-identity (jobs/key send-pulses-job-key)))
trigger (triggers/build
(triggers/with-identity (triggers/key send-pulses-trigger-key))
(triggers/start-now)
(triggers/with-schedule
(cron/schedule
;; run at the top of every hour
(cron/cron-schedule "0 0 * * * ? *")
;; If a trigger misfires (i.e., Quartz cannot run our job for one reason or another, such as all
;; worker threads being busy), attempt to fire the triggers again ASAP. This article does a good
;; job explaining what this means:
;; https://www.nurkiewicz.com/2012/04/quartz-scheduler-misfire-instructions.html
(cron/with-misfire-handling-instruction-ignore-misfires))))]
(task/schedule-task! job trigger)))
......@@ -14,7 +14,9 @@
[analyze :as analyze]
[field-values :as field-values]
[sync-metadata :as sync-metadata]]
[metabase.util.cron :as cron-util]
[metabase.util
[cron :as cron-util]
[i18n :refer [trs]]]
[schema.core :as s]
[toucan.db :as db])
(:import metabase.models.database.DatabaseInstance
......@@ -184,10 +186,10 @@
(task/add-job! sync-analyze-job)
(task/add-job! field-values-job))
(defn task-init
"Automatically called during startup; start the jobs for syncing/analyzing and updating FieldValues for all
Databases."
[]
(defmethod task/init! ::SyncDatabases [_]
(job-init)
(doseq [database (db/select Database)]
(schedule-tasks-for-db! database)))
(try
(schedule-tasks-for-db! database)
(catch Throwable e
(log/error e (trs "Failed to scheduler tasks for Database {0}" (:id database)))))))
(ns metabase.task.task-history-cleanup
(:require [clj-time.core :as time]
[clojure.tools.logging :as log]
(:require [clojure.tools.logging :as log]
[clojurewerkz.quartzite
[jobs :as jobs]
[triggers :as triggers]]
[clojurewerkz.quartzite.schedule.cron :as cron]
[metabase.models.task-history :as thist :refer [TaskHistory]]
[metabase.models.task-history :as task-history]
[metabase.task :as task]
[metabase.util.date :as du]
[puppetlabs.i18n.core :refer [trs]]
[toucan.db :as db]))
(def ^:private job-name "task-history-cleanup")
(def ^:private job-key (format "metabase.task.%s.job" job-name))
(def ^:private trigger-key (format "metabase.task.%s.trigger" job-name))
(defonce ^:private job (atom nil))
(defonce ^:private trigger (atom nil))
[metabase.util.i18n :refer [trs]]))
(def ^:private history-rows-to-keep
"Maximum number of TaskHistory rows. This is not a `const` so that we can redef it in tests"
100000)
(defn- task-history-cleanup!
[]
(log/debug "Cleaning up task history")
(let [before-cleanup (time/now)
result (thist/cleanup-task-history! history-rows-to-keep)
after-cleanup (time/now)]
(db/insert! TaskHistory {:task job-name
:started_at (du/->Timestamp before-cleanup)
:ended_at (du/->Timestamp after-cleanup)
:duration (du/calculate-duration before-cleanup after-cleanup)})
(log/debug (trs "Task history cleanup successful, rows were {0}deleted"
(when-not result (str (trs "not")
" "))))))
(defn- task-history-cleanup! []
(log/debug (trs "Cleaning up task history"))
(task-history/with-task-history {:task "task-history-cleanup"}
(let [deleted-rows? (task-history/cleanup-task-history! history-rows-to-keep)]
(log/debug
(if deleted-rows?
(trs "Task history cleanup successful, rows were deleted")
(trs "Task history cleanup successful, no rows were deleted"))))))
(jobs/defjob TaskHistoryCleanup
[_]
(jobs/defjob TaskHistoryCleanup [_]
(task-history-cleanup!))
(defn task-init
"Job initialization"
[]
;; build our job
(reset! job (jobs/build
(jobs/of-type TaskHistoryCleanup)
(jobs/with-identity (jobs/key job-key))))
;; build our trigger
(reset! trigger (triggers/build
(triggers/with-identity (triggers/key trigger-key))
(triggers/start-now)
(triggers/with-schedule
;; run every day at midnight
(cron/cron-schedule "0 0 * * * ? *"))))
;; submit ourselves to the scheduler
(task/schedule-task! @job @trigger))
(def ^:private job-key "metabase.task.task-history-cleanup.job")
(def ^:private trigger-key "metabase.task.task-history-cleanup.trigger")
(defmethod task/init! ::TaskHistoryCleanup [_]
(let [job (jobs/build
(jobs/of-type TaskHistoryCleanup)
(jobs/with-identity (jobs/key job-key)))
trigger (triggers/build
(triggers/with-identity (triggers/key trigger-key))
(triggers/start-now)
(triggers/with-schedule
;; run every day at midnight
(cron/cron-schedule "0 0 * * * ? *")))]
(task/schedule-task! job trigger)))
......@@ -13,12 +13,6 @@
[task :as task]]
[metabase.util.i18n :refer [trs]]))
(def ^:private ^:const job-key "metabase.task.upgrade-checks.job")
(def ^:private ^:const trigger-key "metabase.task.upgrade-checks.trigger")
(defonce ^:private job (atom nil))
(defonce ^:private trigger (atom nil))
(defn- get-version-info []
(let [version-info-url (config/config-str :mb-version-info-url)
{:keys [status body]} (http/get version-info-url {:content-type "application/json"})]
......@@ -27,8 +21,7 @@
(json/parse-string body keyword)))
;; simple job which looks up all databases and runs a sync on them
(jobs/defjob CheckForNewVersions
[ctx]
(jobs/defjob CheckForNewVersions [_]
(when (public-settings/check-for-updates)
(log/debug (trs "Checking for new Metabase version info."))
(try
......@@ -36,21 +29,19 @@
(when-let [version-info (get-version-info)]
(public-settings/version-info version-info))
(catch Throwable e
(log/error e (trs "Error fetching version info: "))))))
(log/error e (trs "Error fetching version info"))))))
(def ^:private job-key "metabase.task.upgrade-checks.job")
(def ^:private trigger-key "metabase.task.upgrade-checks.trigger")
(defn task-init
"Job initialization"
[]
;; build our job
(reset! job (jobs/build
(jobs/of-type CheckForNewVersions)
(jobs/with-identity (jobs/key job-key))))
;; build our trigger
(reset! trigger (triggers/build
(triggers/with-identity (triggers/key trigger-key))
(triggers/start-now)
(triggers/with-schedule
;; run twice a day
(cron/cron-schedule "0 15 6,18 * * ? *"))))
;; submit ourselves to the scheduler
(task/schedule-task! @job @trigger))
(defmethod task/init! ::CheckForNewVersions [_]
(let [job (jobs/build
(jobs/of-type CheckForNewVersions)
(jobs/with-identity (jobs/key job-key)))
trigger (triggers/build
(triggers/with-identity (triggers/key trigger-key))
(triggers/start-now)
(triggers/with-schedule
;; run twice a day
(cron/cron-schedule "0 15 6,18 * * ? *")))]
(task/schedule-task! job trigger)))
......@@ -14,7 +14,7 @@
(let [task-2 (tu/random-name)
task-3 (tu/random-name)]
(expect
#{task-2 task-3 (var-get #'cleanup-task/job-name)}
#{task-2 task-3 "task-history-cleanup"}
(let [t1-start (time/now)
t2-start (tht/add-second t1-start)
t3-start (tht/add-second t2-start)]
......@@ -34,7 +34,7 @@
task-2 (tu/random-name)
task-3 (tu/random-name)]
(expect
#{task-1 task-2 task-3 (var-get #'cleanup-task/job-name)}
#{task-1 task-2 task-3 "task-history-cleanup"}
(let [t1-start (time/now)
t2-start (tht/add-second t1-start)
t3-start (tht/add-second t2-start)]
......
......@@ -465,9 +465,10 @@
(defn do-with-temp-scheduler [f]
(let [temp-scheduler (qs/start (qs/initialize))]
(with-scheduler temp-scheduler
(try (f)
(finally
(qs/shutdown temp-scheduler))))))
(try
(f)
(finally
(qs/shutdown temp-scheduler))))))
(defmacro with-temp-scheduler
"Execute BODY with a temporary scheduler in place.
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment