Skip to content
Snippets Groups Projects
Unverified Commit 52615c37 authored by Chris Truter's avatar Chris Truter Committed by GitHub
Browse files

Abandon deduplicating queue due to a race condition (#46014)

parent f0323abe
No related branches found
No related tags found
No related merge requests found
(ns metabase.util.queue
(:import
(java.util HashSet Queue Set)
(java.util.concurrent ArrayBlockingQueue BlockingQueue SynchronousQueue TimeUnit)))
(java.util.concurrent ArrayBlockingQueue SynchronousQueue TimeUnit)))
(set! *warn-on-reflection* true)
......@@ -40,59 +39,12 @@
(.clear sync-queue)
(.clear async-queue)))
;; Similar to ArrayTransferQueue, but drops events that are already in the queue.
(deftype ^:private DeduplicatingArrayTransferQueue
[^Queue async-queue
^BlockingQueue sync-queue
^Set queued-set
^long block-ms
^long sleep-ms]
BoundedTransferQueue
(maybe-put!
[_ msg]
(let [payload (:payload msg msg)]
;; we hold the lock while we push to avoid races
(locking queued-set
;; returns null if we have already enqueued the message
(when (.add queued-set payload)
(let [accepted? (.offer ^Queue async-queue msg)]
(when-not accepted?
(.remove queued-set payload))
accepted?)))))
(blocking-put! [_ timeout msg]
;; we cannot hold the lock while we push, so there is some chance of a duplicate
(when (locking queued-set (.add queued-set (:payload msg msg)))
(.offer sync-queue msg timeout TimeUnit/MILLISECONDS)))
(blocking-take! [_ timeout]
(loop [time-remaining timeout]
;; we lock here to avoid leaving a blocking entry behind that can never be cleared
(or (locking queued-set
(when-let [msg (or (.poll ^Queue async-queue)
(.poll sync-queue block-ms TimeUnit/MILLISECONDS))]
(.remove queued-set (:payload msg msg))
msg))
(do (Thread/sleep ^long sleep-ms)
(recur (- time-remaining block-ms sleep-ms))))))
(clear! [_]
(locking queued-set
(.clear sync-queue)
(.clear async-queue)
(.clear queued-set))))
(defn bounded-transfer-queue
"Create a bounded transfer queue, specialized based on the high-level options."
[capacity & {:keys [block-ms sleep-ms dedupe?]
[capacity & {:keys [block-ms sleep-ms]
:or {block-ms 100
sleep-ms 100
dedupe? false}}]
(if dedupe?
(->DeduplicatingArrayTransferQueue (ArrayBlockingQueue. capacity)
(SynchronousQueue.)
(HashSet.)
block-ms
sleep-ms)
(->ArrayTransferQueue (ArrayBlockingQueue. capacity)
sleep-ms 100}}]
(->ArrayTransferQueue (ArrayBlockingQueue. capacity)
(SynchronousQueue.)
block-ms
sleep-ms)))
sleep-ms))
......@@ -3,10 +3,7 @@
[clojure.test :refer [deftest is testing]]
[metabase.test :as mt]
[metabase.util :as u]
[metabase.util.queue :as queue])
(:import
(java.util Set)
(metabase.util.queue DeduplicatingArrayTransferQueue)))
[metabase.util.queue :as queue]))
(set! *warn-on-reflection* true)
......@@ -49,31 +46,27 @@
:dropped @dropped
:skipped @skipped})))))
(deftest deduplicating-bounded-blocking-queue-test
(doseq [dedupe? [true false]]
(let [realtime-event-count 500
backfill-event-count 1000
capacity (- realtime-event-count 100)
;; Enqueue background events from oldest to newest
backfill-events (range backfill-event-count)
;; Enqueue realtime events from newest to oldest
realtime-events (take realtime-event-count (reverse backfill-events))
queue (queue/bounded-transfer-queue capacity :sleep-ms 10 :block-ms 10 :dedupe? dedupe?)
(deftest bounded-transfer-queue-test
(let [realtime-event-count 500
backfill-event-count 1000
capacity (- realtime-event-count 100)
;; Enqueue background events from oldest to newest
backfill-events (range backfill-event-count)
;; Enqueue realtime events from newest to oldest
realtime-events (take realtime-event-count (reverse backfill-events))
queue (queue/bounded-transfer-queue capacity :sleep-ms 10 :block-ms 10)
{:keys [processed sent dropped skipped] :as _result}
(simulate-queue! queue
:backfill-events backfill-events
:realtime-events realtime-events)]
{:keys [processed sent dropped skipped] :as _result}
(simulate-queue! queue
:backfill-events backfill-events
:realtime-events realtime-events)]
(testing "We processed all the events that were enqueued"
(is (= (+ (count backfill-events) sent)
(count processed))))
(if dedupe?
(testing "Some items are deduplicated"
(is (pos? skipped)))
(testing "No items are skipped"
(is (zero? skipped))))
(testing "No items are skipped"
(is (zero? skipped)))
(testing "Some items are dropped"
(is (pos? dropped)))
......@@ -82,8 +75,4 @@
(is (= (set (concat backfill-events realtime-events)) (set processed))))
(testing "The realtime events are processed in order"
(mt/ordered-subset? realtime-events processed))
(when dedupe?
(testing "No phantom items are left in the set"
(is (zero? (.size ^Set (.-queued-set ^DeduplicatingArrayTransferQueue queue)))))))))
(mt/ordered-subset? realtime-events processed))))
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment