Skip to content
Snippets Groups Projects
Unverified Commit f5d43617 authored by Ngoc Khuat's avatar Ngoc Khuat Committed by GitHub
Browse files

Fix pulse being sent multiple times (#45703)

parent 2a4a2839
No related branches found
No related tags found
No related merge requests found
......@@ -113,36 +113,16 @@
(t2/delete! :model/PulseChannel :id [:in ids-to-delete])
(set ids-to-delete)))
(defn- ms-duration->priority
"Converts a duration in milliseconds to a priority value for a trigger.
Pulses are time sensitive, so we need it to have higher priority than other tasks, in which has a default priority of 5."
[ms-duration]
(-> ms-duration
(/ 1000)
;; assuming 1 hour is the max duration a pulse can take
(#(- (* 60 60) %))
;; make sure it's higher than 5 (the default priority)
(max 6)
int))
(defn- send-pulse!*
"Do several things:
- Clear PulseChannels that have no recipients and no channel set for a pulse
- Send a pulse to a list of channels
- Update the priority of the trigger if the pulse is sent successfully"
[schedule-map pulse-id channel-ids]
- Send a pulse to a list of channels"
[pulse-id channel-ids]
(let [cleared-channel-ids (clear-pulse-channels-no-recipients! pulse-id)
to-send-channel-ids (set/difference channel-ids cleared-channel-ids)
to-send-enabled-channel-ids (t2/select-pks-set :model/PulseChannel :id [:in to-send-channel-ids] :enabled true)]
(if (seq to-send-enabled-channel-ids)
(let [start (System/currentTimeMillis)
result (send-pulse! pulse-id to-send-enabled-channel-ids)
end (System/currentTimeMillis)
priority (ms-duration->priority (- end start))]
(when (= :done result)
(log/infof "Updating priority of trigger %s to %d" (.getName ^TriggerKey (send-pulse-trigger-key pulse-id schedule-map)) priority)
(task/reschedule-trigger! (send-pulse-trigger pulse-id schedule-map channel-ids (send-trigger-timezone) priority))))
(send-pulse! pulse-id to-send-enabled-channel-ids)
(log/infof "Skip sending pulse %d because all channels have no recipients" pulse-id))))
;; called in [driver/report-timezone] setter
......@@ -163,9 +143,8 @@
(jobs/defjob ^{:doc "Triggers that send a pulse to a list of channels at a specific time"}
SendPulse
[context]
(let [{:strs [pulse-id channel-ids]} (qc/from-job-data context)
trigger-key (.. context getTrigger getKey getName)]
(send-pulse!* (:schedule-map (send-pulse-trigger-key->info trigger-key)) pulse-id channel-ids)))
(let [{:strs [pulse-id channel-ids]} (qc/from-job-data context)]
(send-pulse!* pulse-id channel-ids)))
;;; ------------------------------------------------ Job: InitSendPulseTriggers ----------------------------------------------------
......
......@@ -93,33 +93,13 @@
:channel_type :slack
:details {}}
daily-at-1am)]
(#'task.send-pulses/send-pulse!* daily-at-1am pulse #{pc pc-disabled pc-no-recipient})
(#'task.send-pulses/send-pulse!* pulse #{pc pc-disabled pc-no-recipient})
(testing "only send to enabled channels that has recipients"
(is (= #{pc} @sent-channel-ids)))
(testing "channels that has no recipients are deleted"
(is (false? (t2/exists? :model/PulseChannel pc-no-recipient)))))))))
(deftest send-pulse!*-update-trigger-priority-test
(testing "send-pulse!* should update the priority of the trigger based on the duration of the pulse"
(pulse-channel-test/with-send-pulse-setup!
(with-redefs [task.send-pulses/ms-duration->priority (constantly 7)
metabase.pulse/send-pulse! (constantly nil)]
(mt/with-temp
[:model/Pulse {pulse :id} {}
:model/PulseChannel {pc :id} (merge
{:pulse_id pulse
:channel_type :slack
:details {:channel "#random"}}
daily-at-1am)]
(testing "priority is 6 to start with"
(is (= 6 (-> (pulse-channel-test/send-pulse-triggers pulse) first :priority))))
(#'task.send-pulses/send-pulse!* daily-at-1am pulse #{pc})
(testing "send pulse should update its priority"
;; 5 is the default priority of a trigger, we need it to be higher than that because
;; pulse is time sensitive compared to other tasks like sync
(is (= 7 (-> (pulse-channel-test/send-pulse-triggers pulse) first :priority)))))))))
(deftest init-send-pulse-triggers!-group-runs-test
(testing "a SendJob trigger will send pulse to channels that have the same schedueld time"
(pulse-channel-test/with-send-pulse-setup!
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment