Commit 9fb1bd7d authored by dpsutton, committed by GitHub

Ensure we close the canceled chan in query processor after query (#44860)

Customers complained that thread growth was unbounded. I investigated,
found BigQuery was involved, and isolated the problem to the cancel-thread
code. The gist is this:

```clojure
      (when cancel-chan
        (future                       ; this needs to run in a separate thread, because the <!! operation blocks forever
          (when (a/<!! cancel-chan)
            (log/debug "Received a message on the cancel channel; attempting to stop the BigQuery query execution")
            (reset! cancel-requested? true) ; signal the page iteration fn to stop
            (if-not (or (future-cancelled? res-fut) (future-done? res-fut))
              ;; somehow, even the FIRST page hasn't come back yet (i.e. the .query call above), so cancel the future to
              ;; interrupt the thread waiting on that response to come back
              ;; unfortunately, with this particular overload of .query, we have no access to (nor the ability to control)
              ;; the jobId, so we have no way to use the BigQuery client to cancel any job that might be running
              (future-cancel res-fut)
              (when (future-done? res-fut) ; canceled received after it was finished; may as well return it
                @res-fut)))))
```

The important part is that we close over the cancel-chan and start a
thread that tries to take from it. If we never put anything on that
channel and never close it, this thread just remains blocked, waiting
forever.

In a thread dump, that looks like the following.
(Note: I set the thread names to make them easy to follow, then ran a
query 30 times.)

```
Full thread dump OpenJDK 64-Bit Server VM (21.0.2+13-LTS mixed mode):
...

"bigquery-cancel-thread" #73 [95747] prio=5 os_prio=31 cpu=2.76ms elapsed=208.53s tid=0x000000029e00f600 nid=95747 waiting on condition  [0x00000002c1266000]
   java.lang.Thread.State: WAITING (parking)
	at jdk.internal.misc.Unsafe.park(java.base@21.0.2/Native Method)
--
"bigquery-cancel-thread" #108 [93211] prio=5 os_prio=31 cpu=154.60ms elapsed=128.72s tid=0x000000029f0dc200 nid=93211 waiting on condition  [0x000000029fa06000]
   java.lang.Thread.State: WAITING (parking)
	at jdk.internal.misc.Unsafe.park(java.base@21.0.2/Native Method)
--
"bigquery-cancel-thread" #116 [116527] prio=5 os_prio=31 cpu=0.16ms elapsed=61.14s tid=0x00000002c1917c00 nid=116527 waiting on condition  [0x000000029fc12000]
   java.lang.Thread.State: WAITING (parking)
	at jdk.internal.misc.Unsafe.park(java.base@21.0.2/Native Method)
--
"bigquery-cancel-thread" #118 [98315] prio=5 os_prio=31 cpu=161.69ms elapsed=60.27s tid=0x00000002b91cca00 nid=98315 waiting on condition  [0x000000029fe1e000]
   java.lang.Thread.State: WAITING (parking)
	at jdk.internal.misc.Unsafe.park(java.base@21.0.2/Native Method)
--
"bigquery-cancel-thread" #119 [121607] prio=5 os_prio=31 cpu=0.13ms elapsed=59.35s tid=0x00000002b9236400 nid=121607 waiting on condition  [0x00000002a002a000]
   java.lang.Thread.State: WAITING (parking)
	at jdk.internal.misc.Unsafe.park(java.base@21.0.2/Native Method)
--
"bigquery-cancel-thread" #120 [99079] prio=5 os_prio=31 cpu=0.13ms elapsed=58.55s tid=0x000000029e295600 nid=99079 waiting on condition  [0x00000002a0506000]
   java.lang.Thread.State: WAITING (parking)
	at jdk.internal.misc.Unsafe.park(java.base@21.0.2/Native Method)
--
"bigquery-cancel-thread" #121 [122375] prio=5 os_prio=31 cpu=0.10ms elapsed=57.75s tid=0x00000002b921f400 nid=122375 waiting on condition  [0x00000002c0e4e000]
   java.lang.Thread.State: WAITING (parking)
	at jdk.internal.misc.Unsafe.park(java.base@21.0.2/Native Method)
--
"bigquery-cancel-thread" #122 [123655] prio=5 os_prio=31 cpu=0.18ms elapsed=56.97s tid=0x0000000134bfa000 nid=123655 waiting on condition  [0x00000002c105a000]
   java.lang.Thread.State: WAITING (parking)
	at jdk.internal.misc.Unsafe.park(java.base@21.0.2/Native Method)
--
"bigquery-cancel-thread" #123 [122631] prio=5 os_prio=31 cpu=0.11ms elapsed=56.28s tid=0x000000013569be00 nid=122631 waiting on condition  [0x00000002c1472000]
   java.lang.Thread.State: WAITING (parking)
	at jdk.internal.misc.Unsafe.park(java.base@21.0.2/Native Method)
--
"bigquery-cancel-thread" #124 [96007] prio=5 os_prio=31 cpu=0.10ms elapsed=55.42s tid=0x0000000133b54e00 nid=96007 waiting on condition  [0x00000002c167e000]
   java.lang.Thread.State: WAITING (parking)
	at jdk.internal.misc.Unsafe.park(java.base@21.0.2/Native Method)
--
"bigquery-cancel-thread" #125 [97799] prio=5 os_prio=31 cpu=0.09ms elapsed=54.73s tid=0x0000000131c19a00 nid=97799 waiting on condition  [0x00000002c2206000]
   java.lang.Thread.State: WAITING (parking)
	at jdk.internal.misc.Unsafe.park(java.base@21.0.2/Native Method)
--
"bigquery-cancel-thread" #126 [96519] prio=5 os_prio=31 cpu=0.10ms elapsed=53.97s tid=0x000000012370f600 nid=96519 waiting on condition  [0x00000002c2412000]
   java.lang.Thread.State: WAITING (parking)
	at jdk.internal.misc.Unsafe.park(java.base@21.0.2/Native Method)
--
"bigquery-cancel-thread" #127 [124167] prio=5 os_prio=31 cpu=0.10ms elapsed=53.33s tid=0x0000000134312200 nid=124167 waiting on condition  [0x00000002c28ea000]
   java.lang.Thread.State: WAITING (parking)
	at jdk.internal.misc.Unsafe.park(java.base@21.0.2/Native Method)
--
"bigquery-cancel-thread" #128 [97287] prio=5 os_prio=31 cpu=0.12ms elapsed=52.57s tid=0x00000001233ae000 nid=97287 waiting on condition  [0x00000002c4706000]
   java.lang.Thread.State: WAITING (parking)
	at jdk.internal.misc.Unsafe.park(java.base@21.0.2/Native Method)
--
"bigquery-cancel-thread" #129 [104715] prio=5 os_prio=31 cpu=0.10ms elapsed=51.83s tid=0x0000000131e47a00 nid=104715 waiting on condition  [0x00000002c4912000]
   java.lang.Thread.State: WAITING (parking)
	at jdk.internal.misc.Unsafe.park(java.base@21.0.2/Native Method)
--
"bigquery-cancel-thread" #130 [102919] prio=5 os_prio=31 cpu=0.08ms elapsed=51.10s tid=0x0000000123540200 nid=102919 waiting on condition  [0x00000002c4b1e000]
   java.lang.Thread.State: WAITING (parking)
	at jdk.internal.misc.Unsafe.park(java.base@21.0.2/Native Method)
--
"bigquery-cancel-thread" #131 [117263] prio=5 os_prio=31 cpu=0.17ms elapsed=50.37s tid=0x00000001233d1200 nid=117263 waiting on condition  [0x00000002c4d2a000]
   java.lang.Thread.State: WAITING (parking)
	at jdk.internal.misc.Unsafe.park(java.base@21.0.2/Native Method)
--
"bigquery-cancel-thread" #132 [114951] prio=5 os_prio=31 cpu=0.09ms elapsed=49.71s tid=0x000000013484dc00 nid=114951 waiting on condition  [0x00000002c4f36000]
   java.lang.Thread.State: WAITING (parking)
	at jdk.internal.misc.Unsafe.park(java.base@21.0.2/Native Method)
--
"bigquery-cancel-thread" #133 [103687] prio=5 os_prio=31 cpu=0.14ms elapsed=49.08s tid=0x0000000123517200 nid=103687 waiting on condition  [0x00000002c5142000]
   java.lang.Thread.State: WAITING (parking)
	at jdk.internal.misc.Unsafe.park(java.base@21.0.2/Native Method)
--
"bigquery-cancel-thread" #134 [91419] prio=5 os_prio=31 cpu=0.08ms elapsed=48.38s tid=0x0000000131b43600 nid=91419 waiting on condition  [0x00000002c534e000]
   java.lang.Thread.State: WAITING (parking)
	at jdk.internal.misc.Unsafe.park(java.base@21.0.2/Native Method)
--
"bigquery-cancel-thread" #135 [127495] prio=5 os_prio=31 cpu=0.14ms elapsed=47.66s tid=0x0000000131ce7000 nid=127495 waiting on condition  [0x00000002c555a000]
   java.lang.Thread.State: WAITING (parking)
	at jdk.internal.misc.Unsafe.park(java.base@21.0.2/Native Method)
--
"bigquery-cancel-thread" #139 [103955] daemon prio=5 os_prio=31 cpu=0.12ms elapsed=23.22s tid=0x0000000121aaca00 nid=103955 waiting on condition  [0x00000002c5766000]
   java.lang.Thread.State: WAITING (parking)
	at jdk.internal.misc.Unsafe.park(java.base@21.0.2/Native Method)
--
"bigquery-cancel-thread" #140 [105219] daemon prio=5 os_prio=31 cpu=0.10ms elapsed=22.54s tid=0x0000000133f47600 nid=105219 waiting on condition  [0x00000002c5972000]
   java.lang.Thread.State: WAITING (parking)
	at jdk.internal.misc.Unsafe.park(java.base@21.0.2/Native Method)
--
"bigquery-cancel-thread" #141 [105731] daemon prio=5 os_prio=31 cpu=0.15ms elapsed=21.69s tid=0x00000001353ba800 nid=105731 waiting on condition  [0x00000002c5b7e000]
   java.lang.Thread.State: WAITING (parking)
	at jdk.internal.misc.Unsafe.park(java.base@21.0.2/Native Method)
--
"bigquery-cancel-thread" #142 [114435] daemon prio=5 os_prio=31 cpu=0.08ms elapsed=20.89s tid=0x0000000131e76400 nid=114435 waiting on condition  [0x00000002c5d8a000]
   java.lang.Thread.State: WAITING (parking)
	at jdk.internal.misc.Unsafe.park(java.base@21.0.2/Native Method)
--
"bigquery-cancel-thread" #143 [106499] daemon prio=5 os_prio=31 cpu=0.10ms elapsed=20.12s tid=0x0000000123107e00 nid=106499 waiting on condition  [0x00000002c5f96000]
   java.lang.Thread.State: WAITING (parking)
	at jdk.internal.misc.Unsafe.park(java.base@21.0.2/Native Method)
--
"bigquery-cancel-thread" #144 [113923] daemon prio=5 os_prio=31 cpu=0.11ms elapsed=19.52s tid=0x00000002c1918400 nid=113923 waiting on condition  [0x00000002c61a2000]
   java.lang.Thread.State: WAITING (parking)
	at jdk.internal.misc.Unsafe.park(java.base@21.0.2/Native Method)
--
"bigquery-cancel-thread" #145 [107011] daemon prio=5 os_prio=31 cpu=0.13ms elapsed=18.93s tid=0x00000002a6ad3e00 nid=107011 waiting on condition  [0x00000002c63ae000]
   java.lang.Thread.State: WAITING (parking)
	at jdk.internal.misc.Unsafe.park(java.base@21.0.2/Native Method)
--
"bigquery-cancel-thread" #146 [113667] daemon prio=5 os_prio=31 cpu=0.12ms elapsed=18.37s tid=0x00000001341a2800 nid=113667 waiting on condition  [0x00000002c65ba000]
   java.lang.Thread.State: WAITING (parking)
	at jdk.internal.misc.Unsafe.park(java.base@21.0.2/Native Method)
--
"bigquery-cancel-thread" #147 [113155] daemon prio=5 os_prio=31 cpu=0.16ms elapsed=17.74s tid=0x000000013445da00 nid=113155 waiting on condition  [0x00000002c67c6000]
   java.lang.Thread.State: WAITING (parking)
	at jdk.internal.misc.Unsafe.park(java.base@21.0.2/Native Method)
--
"bigquery-cancel-thread" #148 [107779] daemon prio=5 os_prio=31 cpu=0.13ms elapsed=17.19s tid=0x0000000134704e00 nid=107779 waiting on condition  [0x00000002c69d2000]
   java.lang.Thread.State: WAITING (parking)
	at jdk.internal.misc.Unsafe.park(java.base@21.0.2/Native Method)
--
"bigquery-cancel-thread" #149 [112643] daemon prio=5 os_prio=31 cpu=0.17ms elapsed=16.54s tid=0x000000011f185000 nid=112643 waiting on condition  [0x00000002c6bde000]
   java.lang.Thread.State: WAITING (parking)
	at jdk.internal.misc.Unsafe.park(java.base@21.0.2/Native Method)
--
"bigquery-cancel-thread" #150 [108035] daemon prio=5 os_prio=31 cpu=0.24ms elapsed=15.95s tid=0x000000011f1be200 nid=108035 waiting on condition  [0x00000002c6dea000]
   java.lang.Thread.State: WAITING (parking)
	at jdk.internal.misc.Unsafe.park(java.base@21.0.2/Native Method)
--
"bigquery-cancel-thread" #151 [108547] daemon prio=5 os_prio=31 cpu=0.11ms elapsed=15.38s tid=0x00000001302cf800 nid=108547 waiting on condition  [0x00000002c6ff6000]
   java.lang.Thread.State: WAITING (parking)
	at jdk.internal.misc.Unsafe.park(java.base@21.0.2/Native Method)
--
"bigquery-cancel-thread" #152 [109059] daemon prio=5 os_prio=31 cpu=0.24ms elapsed=14.85s tid=0x0000000133c4be00 nid=109059 waiting on condition  [0x00000002c7202000]
   java.lang.Thread.State: WAITING (parking)
	at jdk.internal.misc.Unsafe.park(java.base@21.0.2/Native Method)
--
"bigquery-cancel-thread" #153 [109315] daemon prio=5 os_prio=31 cpu=0.10ms elapsed=14.15s tid=0x00000001339dc000 nid=109315 waiting on condition  [0x00000002c740e000]
   java.lang.Thread.State: WAITING (parking)
	at jdk.internal.misc.Unsafe.park(java.base@21.0.2/Native Method)
--
"bigquery-cancel-thread" #154 [109571] daemon prio=5 os_prio=31 cpu=0.17ms elapsed=13.57s tid=0x0000000134ea3600 nid=109571 waiting on condition  [0x00000002c761a000]
   java.lang.Thread.State: WAITING (parking)
	at jdk.internal.misc.Unsafe.park(java.base@21.0.2/Native Method)
--
"bigquery-cancel-thread" #155 [110083] daemon prio=5 os_prio=31 cpu=0.20ms elapsed=12.95s tid=0x0000000121d7aa00 nid=110083 waiting on condition  [0x00000002c7826000]
   java.lang.Thread.State: WAITING (parking)
	at jdk.internal.misc.Unsafe.park(java.base@21.0.2/Native Method)
--
"bigquery-cancel-thread" #156 [110339] daemon prio=5 os_prio=31 cpu=0.12ms elapsed=12.35s tid=0x0000000134ff5c00 nid=110339 waiting on condition  [0x00000002c7a32000]
   java.lang.Thread.State: WAITING (parking)
	at jdk.internal.misc.Unsafe.park(java.base@21.0.2/Native Method)
--
"bigquery-cancel-thread" #157 [110595] daemon prio=5 os_prio=31 cpu=0.09ms elapsed=11.65s tid=0x0000000133e27000 nid=110595 waiting on condition  [0x00000002c7c3e000]
   java.lang.Thread.State: WAITING (parking)
	at jdk.internal.misc.Unsafe.park(java.base@21.0.2/Native Method)
--
"bigquery-cancel-thread" #158 [111107] daemon prio=5 os_prio=31 cpu=0.13ms elapsed=11.04s tid=0x0000000131e8b800 nid=111107 waiting on condition  [0x00000002c7e4a000]
   java.lang.Thread.State: WAITING (parking)
	at jdk.internal.misc.Unsafe.park(java.base@21.0.2/Native Method)
--
"bigquery-cancel-thread" #159 [174339] daemon prio=5 os_prio=31 cpu=0.13ms elapsed=10.51s tid=0x0000000134ff6400 nid=174339 waiting on condition  [0x00000002c8056000]
   java.lang.Thread.State: WAITING (parking)
	at jdk.internal.misc.Unsafe.park(java.base@21.0.2/Native Method)
--
"bigquery-cancel-thread" #160 [173827] daemon prio=5 os_prio=31 cpu=0.12ms elapsed=9.79s tid=0x0000000133ff7c00 nid=173827 waiting on condition  [0x00000002c8262000]
   java.lang.Thread.State: WAITING (parking)
	at jdk.internal.misc.Unsafe.park(java.base@21.0.2/Native Method)
--
"bigquery-cancel-thread" #161 [173315] daemon prio=5 os_prio=31 cpu=0.11ms elapsed=9.18s tid=0x0000000122c9b600 nid=173315 waiting on condition  [0x00000002c846e000]
   java.lang.Thread.State: WAITING (parking)
	at jdk.internal.misc.Unsafe.park(java.base@21.0.2/Native Method)
--
"bigquery-cancel-thread" #162 [131331] daemon prio=5 os_prio=31 cpu=0.12ms elapsed=8.58s tid=0x0000000133b9c400 nid=131331 waiting on condition  [0x00000002c867a000]
   java.lang.Thread.State: WAITING (parking)
	at jdk.internal.misc.Unsafe.park(java.base@21.0.2/Native Method)
--
"bigquery-cancel-thread" #163 [172547] daemon prio=5 os_prio=31 cpu=0.14ms elapsed=8.06s tid=0x0000000134d4cc00 nid=172547 waiting on condition  [0x00000002c8886000]
   java.lang.Thread.State: WAITING (parking)
	at jdk.internal.misc.Unsafe.park(java.base@21.0.2/Native Method)
--
"bigquery-cancel-thread" #164 [131587] daemon prio=5 os_prio=31 cpu=0.15ms elapsed=7.35s tid=0x000000029e17f800 nid=131587 waiting on condition  [0x00000002c8a92000]
   java.lang.Thread.State: WAITING (parking)
	at jdk.internal.misc.Unsafe.park(java.base@21.0.2/Native Method)
--
"bigquery-cancel-thread" #165 [131843] daemon prio=5 os_prio=31 cpu=0.16ms elapsed=6.71s tid=0x00000002b9513a00 nid=131843 waiting on condition  [0x00000002c8c9e000]
   java.lang.Thread.State: WAITING (parking)
	at jdk.internal.misc.Unsafe.park(java.base@21.0.2/Native Method)
--
"bigquery-cancel-thread" #166 [132355] daemon prio=5 os_prio=31 cpu=0.10ms elapsed=6.12s tid=0x0000000122dede00 nid=132355 waiting on condition  [0x00000002c8eaa000]
   java.lang.Thread.State: WAITING (parking)
	at jdk.internal.misc.Unsafe.park(java.base@21.0.2/Native Method)
--
"bigquery-cancel-thread" #167 [132867] daemon prio=5 os_prio=31 cpu=0.11ms elapsed=5.54s tid=0x00000001233b1000 nid=132867 waiting on condition  [0x00000002c90b6000]
   java.lang.Thread.State: WAITING (parking)
	at jdk.internal.misc.Unsafe.park(java.base@21.0.2/Native Method)

```

Simple repro:

```clojure
bigquery-profile=> (require '[clojure.core.async :as a])
nil
bigquery-profile=> (let [c (a/chan)
                         t (Thread. (fn []
                                      (when (a/<!! c)
                                        (println "got something")))
                                    "Forever-running")]
                     (.start t)
                     (a/close! c)
                     (Thread/sleep 4000)
                     (let [thread-names (into #{} (map ^[] Thread/.getName) (.keySet (Thread/getAllStackTraces)))]
                       (contains? thread-names "Forever-running")))
false
bigquery-profile=> (let [c (a/chan)
                         t (Thread. (fn []
                                      (when (a/<!! c)
                                        (println "got something")))
                                    "Forever-running")]
                     (.start t)
                     #_(a/close! c) ;; if we don't close it will never stop running
                     (Thread/sleep 4000)
                     (let [thread-names (into #{} (map ^[] Thread/.getName) (.keySet (Thread/getAllStackTraces)))]
                       (contains? thread-names "Forever-running")))
true
```

So this PR just wraps the rf in a transducer whose completion arity
closes the `cancel-chan` if there is one. Note that multiple closes are
fine; from the docstring: "Closing a closed channel is a no-op".
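
A minimal, self-contained sketch of the idea, not the query processor code itself: `closing-xform` and `watcher` are hypothetical names made up for illustration, and a plain `+` reduction stands in for the real rf. It shows a thread blocked on `a/<!!` being released once the reduction completes, and a second `close!` doing nothing.

```clojure
(require '[clojure.core.async :as a])

(defn closing-xform
  "Hypothetical helper: returns a transducer that closes `ch` (when non-nil)
  in the completion arity, i.e. once the reduction has finished."
  [ch]
  (fn [rf]
    (fn
      ([] (rf))
      ([acc]
       (some-> ch a/close!) ; releases any thread blocked in <!! on ch
       (rf acc))
      ([acc row]
       (rf acc row)))))

(let [cancel-chan (a/chan)
      ;; stands in for the cancel thread: blocks until a value arrives
      ;; or the channel is closed (in which case <!! returns nil)
      watcher     (future (a/<!! cancel-chan))
      total       (transduce (closing-xform cancel-chan) + 0 [1 2 3])]
  (a/close! cancel-chan) ; already closed by the transducer, so a no-op
  {:total          total
   :watcher-result (deref watcher 1000 :still-blocked)})
;; => {:total 6, :watcher-result nil}
```

If the channel were never closed, the `deref` above would time out and return `:still-blocked`, which is the leak seen in the thread dump.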
parent 02f93413
@@ -742,7 +742,21 @@
                 (is (< 0 row-count 10000)))))
             (catch clojure.lang.ExceptionInfo e
               (is (= "Query cancelled"
-                     (ex-message e))))))))))
+                     (ex-message e))))))))
+    (testing "Cancel thread does not leak"
+      (mt/dataset test-data
+        (let [query (assoc-in (mt/query orders) [:query :limit] 2)
+              future-thread-names (fn []
+                                    ;; kinda hacky but we don't control this thread pool
+                                    (into #{} (comp (map (fn [^Thread t] (.getName t)))
+                                                    (filter #(str/includes? % "clojure-agent-send-off-pool")))
+                                          (.keySet (Thread/getAllStackTraces))))
+              count-before (count (future-thread-names))]
+          (dotimes [_ 10]
+            (mt/process-query query))
+          (let [count-after (count (future-thread-names))]
+            (is (< count-after (+ count-before 5))
+                "unbounded thread growth!")))))))
 ;; TODO Temporarily disabling due to flakiness (#33140)
 #_
@@ -60,7 +60,16 @@
         [status result] (case status
                           ::ready-to-reduce
                           (try
-                            [::success (transduce identity rf-or-e reducible-rows)]
+                            [::success (transduce (fn [rf]
+                                                    (fn wrapper
+                                                      ([] (rf))
+                                                      ([acc]
+                                                       (some-> *canceled-chan* a/close!)
+                                                       (rf acc))
+                                                      ([acc row]
+                                                       (rf acc row))))
+                                                  rf-or-e
+                                                  reducible-rows)]
                             (catch Throwable e
                               [::error (ex-info (i18n/tru "Error reducing result rows: {0}" (ex-message e))
                                                 {:type qp.error-type/qp}