From a15fc4eacb80d8cd7b47397e95dc5aa16f62f4eb Mon Sep 17 00:00:00 2001
From: dpsutton <dan@dpsutton.com>
Date: Tue, 31 May 2022 12:42:08 -0500
Subject: [PATCH] Fix deadlock in pivot table connection management (#22981)

Addresses part of https://github.com/metabase/metabase/issues/8679

Pivot tables can have subqueries that run to create tallies. We do not
hold entire result sets in memory, so we have a bit of an inversion of
control flow: a connection is opened, the query is run, the result set
is transduced, and then the connection is closed.

The error here was that the subsequent queries for the pivot were run
while the first connection was still held open, even though that
connection was no longer needed. Enough pivots running at the same time
in a dashboard can create a deadlock: the subqueries need a new
connection, but the main queries' connections cannot be released until
the subqueries have completed.

Also, rf management is critical. Its completion arity must be called
once and only once. We also have middleware that needs to be
composed (format, etc.) and middleware that can only be composed
once (limit). We have to save the original reducing function before
composition (this is the one that can write to the download writer,
etc.) but compose it each time we use it with `(rff metadata)` so we
get the format and other middleware. Keeping this distinction in mind
will save you lots of time. (The limit middleware will ignore all
subsequent rows if you just grab the output of `(rff metadata)` and not
the rf returned from the `:rff` key on the context.)
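
As a toy illustration (the names here are made up, not the actual QP
middleware): a limit-style transducer keeps its row count in internal
state, so one composed rf carries a single shared limit across every
reduction it is used in, while re-deriving it per use does not:

```
;; `base-rf` stands in for the bottom reducing function; `limited-rff`
;; composes a 3-row limit on top of it, roughly how `(rff metadata)`
;; layers limit/format middleware onto the rf.
(def base-rf conj)

(defn limited-rff [_metadata]
  ((take 3) base-rf))

;; Reusing one composed rf across two reductions shares one limit, so
;; every row of the second batch is ignored:
(let [rf (limited-rff {})]
  (reduce rf (reduce rf [] [1 2 3]) [4 5 6]))
;; => [1 2 3]

;; Re-deriving the composed rf for each reduction keeps the batches
;; independent:
(reduce (limited-rff {}) (reduce (limited-rff {}) [] [1 2 3]) [4 5 6])
;; => [1 2 3 4 5 6]
```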

This patch takes the following connection management:

```
tap> "OPENING CONNECTION 0"
tap> "already open: "
  tap> "OPENING CONNECTION 1"
  tap> "already open: 0"
  tap> "CLOSING CONNECTION 1"
  tap> "OPENING CONNECTION 2"
  tap> "already open: 0"
  tap> "CLOSING CONNECTION 2"
  tap> "OPENING CONNECTION 3"
  tap> "already open: 0"
  tap> "CLOSING CONNECTION 3"
tap> "CLOSING CONNECTION 0"
```

and properly sequences it so that connection 0 is closed before opening
connection 1.

The patch hijacks `executef` to just pass that function into the
`reducef` part so we can reduce multiple times and therefore control
the connections. Otherwise `reducef` happens "inside" of `executef`,
and the connection is only closed once the whole reduction is done.
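
A rough sketch of the two orderings, using plain logging stand-ins
rather than the real context functions:

```
(defn open!  [id] (println "OPENING CONNECTION" id) id)
(defn close! [id] (println "CLOSING CONNECTION" id))

;; Before: each subquery is reduced "inside" the main query's execute,
;; so connection 0 stays open while 1, 2, and 3 come and go.
(defn nested-flow []
  (let [c0 (open! 0)]
    (try
      (doseq [id [1 2 3]]
        (let [c (open! id)]
          (try
            ;; reduce that subquery's rows here
            (finally (close! c)))))
      (finally (close! c0)))))

;; After: executef hands control back to reducef, which runs the main
;; query and each subquery with its own connection, closing one before
;; opening the next.
(defn sequential-flow []
  (doseq [id [0 1 2 3]]
    (let [c (open! id)]
      (try
        ;; reduce this query's rows here
        (finally (close! c))))))
```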

Care is taken to ensure that:
- the init is only called once (subsequent queries have the init of the
rf overridden to just return `init`, the acc passed in, rather than
calling `(rf)`)
- the completion arity is only called once (via `(completing rf)`, and
the reducing function used for the subsequent queries has a completion
arity of just `([acc] acc)` that never calls `(rf acc)`). Remember this
applies only to the lower reducing function; all of the takes, formats,
etc. _above_ it will still have their completion arity called because
we are using transduce. The completion arity is what takes the volatile
rows and row counts and actually nests them in the `{:data {:rows []}}`
structure. Without calling that once (and ONLY once) you end up with no
actual results; they are just sitting in memory. See the sketch below.
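
Here is a minimal sketch of that discipline with a stand-in rf (not the
real pivot code): the rf buffers rows in a volatile and only builds the
`{:data {:rows []}}` shape in its completion arity, which therefore has
to run exactly once, at the very end:

```
;; A stand-in rf that buffers rows in a volatile and nests them under
;; {:data {:rows ...}} only in its completion arity.
(defn make-rf []
  (let [rows (volatile! [])]
    (fn
      ([] {:data {:rows []}})                     ; init
      ([acc] (assoc-in acc [:data :rows] @rows))  ; completion
      ([acc row] (vswap! rows conj row) acc))))

(let [rf   (make-rf)
      ;; main query: (completing rf) keeps rf's own completion from
      ;; firing during this transduce
      acc1 (transduce identity (completing rf) (rf) [[1] [2]])
      ;; subsequent query: init is the accumulator passed in, and the
      ;; completion arity is just ([acc] acc), never (rf acc)
      acc2 (transduce identity
                      (fn ([acc] acc) ([acc row] (rf acc row)))
                      acc1
                      [[3]])]
  ;; rf's real completion arity runs exactly once, at the very end
  (rf acc2))
;; => {:data {:rows [[1] [2] [3]]}}
```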
---
 .dir-locals.el                                |  1 +
 .../query_processor/context/default.clj       |  4 +-
 src/metabase/query_processor/pivot.clj        | 45 ++++++++++++++-----
 3 files changed, 38 insertions(+), 12 deletions(-)

diff --git a/.dir-locals.el b/.dir-locals.el
index 8588f3ab913..ed13994446a 100644
--- a/.dir-locals.el
+++ b/.dir-locals.el
@@ -52,6 +52,7 @@
   (eval . (put-clojure-indent 'mt/test-driver 1))
   (eval . (put-clojure-indent 'prop/for-all 1))
   (eval . (put-clojure-indent 'qp.streaming/streaming-response 1))
+  (eval . (put-clojure-indent 'u/prog1 1))
   (eval . (put-clojure-indent 'u/select-keys-when 1))
   (eval . (put-clojure-indent 'u/strict-extend 1))
   ;; these ones have to be done with `define-clojure-indent' for now because of upstream bug
diff --git a/src/metabase/query_processor/context/default.clj b/src/metabase/query_processor/context/default.clj
index cd077fa7537..b100a6eb93f 100644
--- a/src/metabase/query_processor/context/default.clj
+++ b/src/metabase/query_processor/context/default.clj
@@ -62,10 +62,10 @@
                                                    context)))]
       (qp.context/reducedf reduced-rows context))))
 
-(defn- default-runf [query rf context]
+(defn- default-runf [query rff context]
   (try
     (qp.context/executef driver/*driver* query context (fn respond* [metadata reducible-rows]
-                                                         (qp.context/reducef rf context metadata reducible-rows)))
+                                                         (qp.context/reducef rff context metadata reducible-rows)))
     (catch Throwable e
       (qp.context/raisef e context))))
 
diff --git a/src/metabase/query_processor/pivot.clj b/src/metabase/query_processor/pivot.clj
index 4bb8732724a..35364c1b666 100644
--- a/src/metabase/query_processor/pivot.clj
+++ b/src/metabase/query_processor/pivot.clj
@@ -146,7 +146,7 @@
 
 (defn- process-queries-append-results
   "Reduce the results of a sequence of `queries` using `rf` and initial value `init`."
-  [queries rf init info context]
+  [init queries rf info context]
   (reduce
    (fn [acc query]
      (process-query-append-results query rf acc info (assoc context
@@ -157,15 +157,40 @@
 (defn- append-queries-context
   "Update Query Processor `context` so it appends the rows fetched when running `more-queries`."
   [info context more-queries]
-  (cond-> context
-    (seq more-queries)
-    (update :rff (fn [rff]
-                   (fn [metadata]
-                     (let [rf (rff metadata)]
-                       (fn
-                         ([]        (rf))
-                         ([acc]     (rf (process-queries-append-results more-queries rf acc info context)))
-                         ([acc row] (rf acc row)))))))))
+  (let [vrf (volatile! nil)]
+    (cond-> context
+      (seq more-queries)
+      (->  (update :rff (fn [rff]
+                          (fn [metadata]
+                            (u/prog1 (rff metadata)
+                              ;; this captures the reducing function before composed with limit and other middleware
+                              (vreset! vrf <>)))))
+           (update :executef
+                   (fn [orig]
+                     ;; execute holds open a connection from [[execute-reducible-query]] so we need to manage
+                     ;; connections in the reducing part reducef. The default runf is what orchestrates this together
+                     ;; and we just pass the original executef to the reducing part so we can control our multiple
+                     ;; connections.
+                     (fn multiple-executef [driver query _context respond]
+                       (respond [orig driver] query))))
+           (assoc :reducef
+                  ;; signature usually has metadata in place of driver but we are hijacking
+                  (fn multiple-reducing [rff context [orig-executef driver] query]
+                    (let [respond     (fn [metadata reducible-rows]
+                                        (let [rf (rff metadata)]
+                                          (assert (fn? rf))
+                                          (try
+                                            (transduce identity (completing rf) reducible-rows)
+                                            (catch Throwable e
+                                              (qp.context/raisef (ex-info (tru "Error reducing result rows")
+                                                                          {:type qp.error-type/qp}
+                                                                          e)
+                                                                 context)))))
+                          acc         (-> (orig-executef driver query context respond)
+                                          (process-queries-append-results
+                                           more-queries @vrf info context))]
+                      ;; completion arity can't be threaded because the value is derefed too early
+                      (qp.context/reducedf (@vrf acc) context))))))))
 
 (defn process-multiple-queries
   "Allows the query processor to handle multiple queries, stitched together to appear as one"
-- 
GitLab