Skip to content
Snippets Groups Projects
This project is mirrored from https://github.com/metabase/metabase. Pull mirroring updated .
  1. Jul 02, 2024
  2. Jul 01, 2024
  3. Jun 28, 2024
  4. Jun 27, 2024
    • Uladzimir Havenchyk's avatar
    • dpsutton's avatar
      Ensure we close the canceled chan in query processor after query (#44860) · 9fb1bd7d
      dpsutton authored
      Customer complaints that thread growth was unbounded. Investigated and
      found bigquery involved and isolated to the cancel thread bits. Gist is
      this:
      
      ```clojure
            (when cancel-chan
              (future                       ; this needs to run in a separate thread, because the <!! operation blocks forever
                (when (a/<!! cancel-chan)
                  (log/debug "Received a message on the cancel channel; attempting to stop the BigQuery query execution")
                  (reset! cancel-requested? true) ; signal the page iteration fn to stop
                  (if-not (or (future-cancelled? res-fut) (future-done? res-fut))
                    ;; somehow, even the FIRST page hasn't come back yet (i.e. the .query call above), so cancel the future to
                    ;; interrupt the thread waiting on that response to come back
                    ;; unfortunately, with this particular overload of .query, we have no access to (nor the ability to control)
                    ;; the jobId, so we have no way to use the BigQuery client to cancel any job that might be running
                    (future-cancel res-fut)
                    (when (future-done? res-fut) ; canceled received after it was finished; may as well return it
                      @res-fut)))))
      ```
      
      The important parts is that we closeover the cancel-chan and start a
      thread that is trying to pull from it. If we never put anything on that
      channel or never close that channel this thread will just remain blocked
      waiting.
      
      In a thread dump, that looks like the following:
      (note, I've made it set the thread names to make it easy to follow run a
      query 30 times)
      
      ```
      Full thread dump OpenJDK 64-Bit Server VM (21.0.2+13-LTS mixed mode):
      ...
      
      "bigquery-cancel-thread" #73 [95747] prio=5 os_prio=31 cpu=2.76ms elapsed=208.53s tid=0x000000029e00f600 nid=95747 waiting on condition  [0x00000002c1266000]
         java.lang.Thread.State: WAITING (parking)
      	at jdk.internal.misc.Unsafe.park(java.base@21.0.2/Native Method)
      --
      "bigquery-cancel-thread" #108 [93211] prio=5 os_prio=31 cpu=154.60ms elapsed=128.72s tid=0x000000029f0dc200 nid=93211 waiting on condition  [0x000000029fa06000]
         java.lang.Thread.State: WAITING (parking)
      	at jdk.internal.misc.Unsafe.park(java.base@21.0.2/Native Method)
      --
      "bigquery-cancel-thread" #116 [116527] prio=5 os_prio=31 cpu=0.16ms elapsed=61.14s tid=0x00000002c1917c00 nid=116527 waiting on condition  [0x000000029fc12000]
         java.lang.Thread.State: WAITING (parking)
      	at jdk.internal.misc.Unsafe.park(java.base@21.0.2/Native Method)
      --
      "bigquery-cancel-thread" #118 [98315] prio=5 os_prio=31 cpu=161.69ms elapsed=60.27s tid=0x00000002b91cca00 nid=98315 waiting on condition  [0x000000029fe1e000]
         java.lang.Thread.State: WAITING (parking)
      	at jdk.internal.misc.Unsafe.park(java.base@21.0.2/Native Method)
      --
      "bigquery-cancel-thread" #119 [121607] prio=5 os_prio=31 cpu=0.13ms elapsed=59.35s tid=0x00000002b9236400 nid=121607 waiting on condition  [0x00000002a002a000]
         java.lang.Thread.State: WAITING (parking)
      	at jdk.internal.misc.Unsafe.park(java.base@21.0.2/Native Method)
      --
      "bigquery-cancel-thread" #120 [99079] prio=5 os_prio=31 cpu=0.13ms elapsed=58.55s tid=0x000000029e295600 nid=99079 waiting on condition  [0x00000002a0506000]
         java.lang.Thread.State: WAITING (parking)
      	at jdk.internal.misc.Unsafe.park(java.base@21.0.2/Native Method)
      --
      "bigquery-cancel-thread" #121 [122375] prio=5 os_prio=31 cpu=0.10ms elapsed=57.75s tid=0x00000002b921f400 nid=122375 waiting on condition  [0x00000002c0e4e000]
         java.lang.Thread.State: WAITING (parking)
      	at jdk.internal.misc.Unsafe.park(java.base@21.0.2/Native Method)
      --
      "bigquery-cancel-thread" #122 [123655] prio=5 os_prio=31 cpu=0.18ms elapsed=56.97s tid=0x0000000134bfa000 nid=123655 waiting on condition  [0x00000002c105a000]
         java.lang.Thread.State: WAITING (parking)
      	at jdk.internal.misc.Unsafe.park(java.base@21.0.2/Native Method)
      --
      "bigquery-cancel-thread" #123 [122631] prio=5 os_prio=31 cpu=0.11ms elapsed=56.28s tid=0x000000013569be00 nid=122631 waiting on condition  [0x00000002c1472000]
         java.lang.Thread.State: WAITING (parking)
      	at jdk.internal.misc.Unsafe.park(java.base@21.0.2/Native Method)
      --
      "bigquery-cancel-thread" #124 [96007] prio=5 os_prio=31 cpu=0.10ms elapsed=55.42s tid=0x0000000133b54e00 nid=96007 waiting on condition  [0x00000002c167e000]
         java.lang.Thread.State: WAITING (parking)
      	at jdk.internal.misc.Unsafe.park(java.base@21.0.2/Native Method)
      --
      "bigquery-cancel-thread" #125 [97799] prio=5 os_prio=31 cpu=0.09ms elapsed=54.73s tid=0x0000000131c19a00 nid=97799 waiting on condition  [0x00000002c2206000]
         java.lang.Thread.State: WAITING (parking)
      	at jdk.internal.misc.Unsafe.park(java.base@21.0.2/Native Method)
      --
      "bigquery-cancel-thread" #126 [96519] prio=5 os_prio=31 cpu=0.10ms elapsed=53.97s tid=0x000000012370f600 nid=96519 waiting on condition  [0x00000002c2412000]
         java.lang.Thread.State: WAITING (parking)
      	at jdk.internal.misc.Unsafe.park(java.base@21.0.2/Native Method)
      --
      "bigquery-cancel-thread" #127 [124167] prio=5 os_prio=31 cpu=0.10ms elapsed=53.33s tid=0x0000000134312200 nid=124167 waiting on condition  [0x00000002c28ea000]
         java.lang.Thread.State: WAITING (parking)
      	at jdk.internal.misc.Unsafe.park(java.base@21.0.2/Native Method)
      --
      "bigquery-cancel-thread" #128 [97287] prio=5 os_prio=31 cpu=0.12ms elapsed=52.57s tid=0x00000001233ae000 nid=97287 waiting on condition  [0x00000002c4706000]
         java.lang.Thread.State: WAITING (parking)
      	at jdk.internal.misc.Unsafe.park(java.base@21.0.2/Native Method)
      --
      "bigquery-cancel-thread" #129 [104715] prio=5 os_prio=31 cpu=0.10ms elapsed=51.83s tid=0x0000000131e47a00 nid=104715 waiting on condition  [0x00000002c4912000]
         java.lang.Thread.State: WAITING (parking)
      	at jdk.internal.misc.Unsafe.park(java.base@21.0.2/Native Method)
      --
      "bigquery-cancel-thread" #130 [102919] prio=5 os_prio=31 cpu=0.08ms elapsed=51.10s tid=0x0000000123540200 nid=102919 waiting on condition  [0x00000002c4b1e000]
         java.lang.Thread.State: WAITING (parking)
      	at jdk.internal.misc.Unsafe.park(java.base@21.0.2/Native Method)
      --
      "bigquery-cancel-thread" #131 [117263] prio=5 os_prio=31 cpu=0.17ms elapsed=50.37s tid=0x00000001233d1200 nid=117263 waiting on condition  [0x00000002c4d2a000]
         java.lang.Thread.State: WAITING (parking)
      	at jdk.internal.misc.Unsafe.park(java.base@21.0.2/Native Method)
      --
      "bigquery-cancel-thread" #132 [114951] prio=5 os_prio=31 cpu=0.09ms elapsed=49.71s tid=0x000000013484dc00 nid=114951 waiting on condition  [0x00000002c4f36000]
         java.lang.Thread.State: WAITING (parking)
      	at jdk.internal.misc.Unsafe.park(java.base@21.0.2/Native Method)
      --
      "bigquery-cancel-thread" #133 [103687] prio=5 os_prio=31 cpu=0.14ms elapsed=49.08s tid=0x0000000123517200 nid=103687 waiting on condition  [0x00000002c5142000]
         java.lang.Thread.State: WAITING (parking)
      	at jdk.internal.misc.Unsafe.park(java.base@21.0.2/Native Method)
      --
      "bigquery-cancel-thread" #134 [91419] prio=5 os_prio=31 cpu=0.08ms elapsed=48.38s tid=0x0000000131b43600 nid=91419 waiting on condition  [0x00000002c534e000]
         java.lang.Thread.State: WAITING (parking)
      	at jdk.internal.misc.Unsafe.park(java.base@21.0.2/Native Method)
      --
      "bigquery-cancel-thread" #135 [127495] prio=5 os_prio=31 cpu=0.14ms elapsed=47.66s tid=0x0000000131ce7000 nid=127495 waiting on condition  [0x00000002c555a000]
         java.lang.Thread.State: WAITING (parking)
      	at jdk.internal.misc.Unsafe.park(java.base@21.0.2/Native Method)
      --
      "bigquery-cancel-thread" #139 [103955] daemon prio=5 os_prio=31 cpu=0.12ms elapsed=23.22s tid=0x0000000121aaca00 nid=103955 waiting on condition  [0x00000002c5766000]
         java.lang.Thread.State: WAITING (parking)
      	at jdk.internal.misc.Unsafe.park(java.base@21.0.2/Native Method)
      --
      "bigquery-cancel-thread" #140 [105219] daemon prio=5 os_prio=31 cpu=0.10ms elapsed=22.54s tid=0x0000000133f47600 nid=105219 waiting on condition  [0x00000002c5972000]
         java.lang.Thread.State: WAITING (parking)
      	at jdk.internal.misc.Unsafe.park(java.base@21.0.2/Native Method)
      --
      "bigquery-cancel-thread" #141 [105731] daemon prio=5 os_prio=31 cpu=0.15ms elapsed=21.69s tid=0x00000001353ba800 nid=105731 waiting on condition  [0x00000002c5b7e000]
         java.lang.Thread.State: WAITING (parking)
      	at jdk.internal.misc.Unsafe.park(java.base@21.0.2/Native Method)
      --
      "bigquery-cancel-thread" #142 [114435] daemon prio=5 os_prio=31 cpu=0.08ms elapsed=20.89s tid=0x0000000131e76400 nid=114435 waiting on condition  [0x00000002c5d8a000]
         java.lang.Thread.State: WAITING (parking)
      	at jdk.internal.misc.Unsafe.park(java.base@21.0.2/Native Method)
      --
      "bigquery-cancel-thread" #143 [106499] daemon prio=5 os_prio=31 cpu=0.10ms elapsed=20.12s tid=0x0000000123107e00 nid=106499 waiting on condition  [0x00000002c5f96000]
         java.lang.Thread.State: WAITING (parking)
      	at jdk.internal.misc.Unsafe.park(java.base@21.0.2/Native Method)
      --
      "bigquery-cancel-thread" #144 [113923] daemon prio=5 os_prio=31 cpu=0.11ms elapsed=19.52s tid=0x00000002c1918400 nid=113923 waiting on condition  [0x00000002c61a2000]
         java.lang.Thread.State: WAITING (parking)
      	at jdk.internal.misc.Unsafe.park(java.base@21.0.2/Native Method)
      --
      "bigquery-cancel-thread" #145 [107011] daemon prio=5 os_prio=31 cpu=0.13ms elapsed=18.93s tid=0x00000002a6ad3e00 nid=107011 waiting on condition  [0x00000002c63ae000]
         java.lang.Thread.State: WAITING (parking)
      	at jdk.internal.misc.Unsafe.park(java.base@21.0.2/Native Method)
      --
      "bigquery-cancel-thread" #146 [113667] daemon prio=5 os_prio=31 cpu=0.12ms elapsed=18.37s tid=0x00000001341a2800 nid=113667 waiting on condition  [0x00000002c65ba000]
         java.lang.Thread.State: WAITING (parking)
      	at jdk.internal.misc.Unsafe.park(java.base@21.0.2/Native Method)
      --
      "bigquery-cancel-thread" #147 [113155] daemon prio=5 os_prio=31 cpu=0.16ms elapsed=17.74s tid=0x000000013445da00 nid=113155 waiting on condition  [0x00000002c67c6000]
         java.lang.Thread.State: WAITING (parking)
      	at jdk.internal.misc.Unsafe.park(java.base@21.0.2/Native Method)
      --
      "bigquery-cancel-thread" #148 [107779] daemon prio=5 os_prio=31 cpu=0.13ms elapsed=17.19s tid=0x0000000134704e00 nid=107779 waiting on condition  [0x00000002c69d2000]
         java.lang.Thread.State: WAITING (parking)
      	at jdk.internal.misc.Unsafe.park(java.base@21.0.2/Native Method)
      --
      "bigquery-cancel-thread" #149 [112643] daemon prio=5 os_prio=31 cpu=0.17ms elapsed=16.54s tid=0x000000011f185000 nid=112643 waiting on condition  [0x00000002c6bde000]
         java.lang.Thread.State: WAITING (parking)
      	at jdk.internal.misc.Unsafe.park(java.base@21.0.2/Native Method)
      --
      "bigquery-cancel-thread" #150 [108035] daemon prio=5 os_prio=31 cpu=0.24ms elapsed=15.95s tid=0x000000011f1be200 nid=108035 waiting on condition  [0x00000002c6dea000]
         java.lang.Thread.State: WAITING (parking)
      	at jdk.internal.misc.Unsafe.park(java.base@21.0.2/Native Method)
      --
      "bigquery-cancel-thread" #151 [108547] daemon prio=5 os_prio=31 cpu=0.11ms elapsed=15.38s tid=0x00000001302cf800 nid=108547 waiting on condition  [0x00000002c6ff6000]
         java.lang.Thread.State: WAITING (parking)
      	at jdk.internal.misc.Unsafe.park(java.base@21.0.2/Native Method)
      --
      "bigquery-cancel-thread" #152 [109059] daemon prio=5 os_prio=31 cpu=0.24ms elapsed=14.85s tid=0x0000000133c4be00 nid=109059 waiting on condition  [0x00000002c7202000]
         java.lang.Thread.State: WAITING (parking)
      	at jdk.internal.misc.Unsafe.park(java.base@21.0.2/Native Method)
      --
      "bigquery-cancel-thread" #153 [109315] daemon prio=5 os_prio=31 cpu=0.10ms elapsed=14.15s tid=0x00000001339dc000 nid=109315 waiting on condition  [0x00000002c740e000]
         java.lang.Thread.State: WAITING (parking)
      	at jdk.internal.misc.Unsafe.park(java.base@21.0.2/Native Method)
      --
      "bigquery-cancel-thread" #154 [109571] daemon prio=5 os_prio=31 cpu=0.17ms elapsed=13.57s tid=0x0000000134ea3600 nid=109571 waiting on condition  [0x00000002c761a000]
         java.lang.Thread.State: WAITING (parking)
      	at jdk.internal.misc.Unsafe.park(java.base@21.0.2/Native Method)
      --
      "bigquery-cancel-thread" #155 [110083] daemon prio=5 os_prio=31 cpu=0.20ms elapsed=12.95s tid=0x0000000121d7aa00 nid=110083 waiting on condition  [0x00000002c7826000]
         java.lang.Thread.State: WAITING (parking)
      	at jdk.internal.misc.Unsafe.park(java.base@21.0.2/Native Method)
      --
      "bigquery-cancel-thread" #156 [110339] daemon prio=5 os_prio=31 cpu=0.12ms elapsed=12.35s tid=0x0000000134ff5c00 nid=110339 waiting on condition  [0x00000002c7a32000]
         java.lang.Thread.State: WAITING (parking)
      	at jdk.internal.misc.Unsafe.park(java.base@21.0.2/Native Method)
      --
      "bigquery-cancel-thread" #157 [110595] daemon prio=5 os_prio=31 cpu=0.09ms elapsed=11.65s tid=0x0000000133e27000 nid=110595 waiting on condition  [0x00000002c7c3e000]
         java.lang.Thread.State: WAITING (parking)
      	at jdk.internal.misc.Unsafe.park(java.base@21.0.2/Native Method)
      --
      "bigquery-cancel-thread" #158 [111107] daemon prio=5 os_prio=31 cpu=0.13ms elapsed=11.04s tid=0x0000000131e8b800 nid=111107 waiting on condition  [0x00000002c7e4a000]
         java.lang.Thread.State: WAITING (parking)
      	at jdk.internal.misc.Unsafe.park(java.base@21.0.2/Native Method)
      --
      "bigquery-cancel-thread" #159 [174339] daemon prio=5 os_prio=31 cpu=0.13ms elapsed=10.51s tid=0x0000000134ff6400 nid=174339 waiting on condition  [0x00000002c8056000]
         java.lang.Thread.State: WAITING (parking)
      	at jdk.internal.misc.Unsafe.park(java.base@21.0.2/Native Method)
      --
      "bigquery-cancel-thread" #160 [173827] daemon prio=5 os_prio=31 cpu=0.12ms elapsed=9.79s tid=0x0000000133ff7c00 nid=173827 waiting on condition  [0x00000002c8262000]
         java.lang.Thread.State: WAITING (parking)
      	at jdk.internal.misc.Unsafe.park(java.base@21.0.2/Native Method)
      --
      "bigquery-cancel-thread" #161 [173315] daemon prio=5 os_prio=31 cpu=0.11ms elapsed=9.18s tid=0x0000000122c9b600 nid=173315 waiting on condition  [0x00000002c846e000]
         java.lang.Thread.State: WAITING (parking)
      	at jdk.internal.misc.Unsafe.park(java.base@21.0.2/Native Method)
      --
      "bigquery-cancel-thread" #162 [131331] daemon prio=5 os_prio=31 cpu=0.12ms elapsed=8.58s tid=0x0000000133b9c400 nid=131331 waiting on condition  [0x00000002c867a000]
         java.lang.Thread.State: WAITING (parking)
      	at jdk.internal.misc.Unsafe.park(java.base@21.0.2/Native Method)
      --
      "bigquery-cancel-thread" #163 [172547] daemon prio=5 os_prio=31 cpu=0.14ms elapsed=8.06s tid=0x0000000134d4cc00 nid=172547 waiting on condition  [0x00000002c8886000]
         java.lang.Thread.State: WAITING (parking)
      	at jdk.internal.misc.Unsafe.park(java.base@21.0.2/Native Method)
      --
      "bigquery-cancel-thread" #164 [131587] daemon prio=5 os_prio=31 cpu=0.15ms elapsed=7.35s tid=0x000000029e17f800 nid=131587 waiting on condition  [0x00000002c8a92000]
         java.lang.Thread.State: WAITING (parking)
      	at jdk.internal.misc.Unsafe.park(java.base@21.0.2/Native Method)
      --
      "bigquery-cancel-thread" #165 [131843] daemon prio=5 os_prio=31 cpu=0.16ms elapsed=6.71s tid=0x00000002b9513a00 nid=131843 waiting on condition  [0x00000002c8c9e000]
         java.lang.Thread.State: WAITING (parking)
      	at jdk.internal.misc.Unsafe.park(java.base@21.0.2/Native Method)
      --
      "bigquery-cancel-thread" #166 [132355] daemon prio=5 os_prio=31 cpu=0.10ms elapsed=6.12s tid=0x0000000122dede00 nid=132355 waiting on condition  [0x00000002c8eaa000]
         java.lang.Thread.State: WAITING (parking)
      	at jdk.internal.misc.Unsafe.park(java.base@21.0.2/Native Method)
      --
      "bigquery-cancel-thread" #167 [132867] daemon prio=5 os_prio=31 cpu=0.11ms elapsed=5.54s tid=0x00000001233b1000 nid=132867 waiting on condition  [0x00000002c90b6000]
         java.lang.Thread.State: WAITING (parking)
      	at jdk.internal.misc.Unsafe.park(java.base@21.0.2/Native Method)
      
      ```
      
      Simple repro:
      
      ```clojure
      bigquery-profile=> (require '[clojure.core.async :as a])
      nil
      bigquery-profile=> (let [c (a/chan)
                               t (Thread. (fn []
                                            (when (a/<!! c)
                                              (println "got something")))
                                          "Forever-running")]
                           (.start t)
                           (a/close! c)
                           (Thread/sleep 4000)
                           (let [thread-names (into #{} (map ^[] Thread/.getName) (.keySet (Thread/getAllStackTraces)))]
                             (contains? thread-names "Forever-running")))
      false
      bigquery-profile=> (let [c (a/chan)
                               t (Thread. (fn []
                                            (when (a/<!! c)
                                              (println "got something")))
                                          "Forever-running")]
                           (.start t)
                           #_(a/close! c) ;; if we don't close it will never stop running
                           (Thread/sleep 4000)
                           (let [thread-names (into #{} (map ^[] Thread/.getName) (.keySet (Thread/getAllStackTraces)))]
                             (contains? thread-names "Forever-running")))
      true
      ```
      
      So this PR just adds a transducer around the rf that closes the
      `cancel-chan` if it's open. Note that multiple closes are fine: from the
      docstring: "Closing a closed channel is a no-op".
      9fb1bd7d
Loading