Skip to content
Snippets Groups Projects
Unverified Commit a15fc4ea authored by dpsutton's avatar dpsutton Committed by GitHub
Browse files

Fix deadlock in pivot table connection management (#22981)

Addresses part of https://github.com/metabase/metabase/issues/8679

Pivot tables can have subqueries that run to create tallies. We do not
hold the entirety of resultsets in memory so we have a bit of an
inversion of control flow: connections are opened, queries run, and
result sets are transduced and then the connection closed.

The error here was that subsequent queries for the pivot were run while
the first connection is still held open. But the connection is no longer
needed. But enough pivots running at the same time in a dashboard can
create a deadlock where the subqueries need a new connection, but the
main queries cannot be released until the subqueries have completed.

Also, rf management is critical. It's completion arity must be called
once and only once. We also have middleware that need to be
composed (format, etc) and others that can only be composed
once (limit). We have to save the original reducing function before
composition (this is the one that can write to the download writer, etc)
but compose it each time we use it with `(rff metadata)` so we have the
format and other middleware. Keeping this distinction in mind will save
you lots of time. (The limit query will ignore all subsequent rows if
you just grab the output of `(rff metadata)` and not the rf returned
from the `:rff` key on the context.

But this takes the following connection management:

```
tap> "OPENING CONNECTION 0"
tap> "already open: "
  tap> "OPENING CONNECTION 1"
  tap> "already open: 0"
  tap> "CLOSING CONNECTION 1"
  tap> "OPENING CONNECTION 2"
  tap> "already open: 0"
  tap> "CLOSING CONNECTION 2"
  tap> "OPENING CONNECTION 3"
  tap> "already open: 0"
  tap> "CLOSING CONNECTION 3"
tap> "CLOSING CONNECTION 0"
```

and properly sequences it so that connection 0 is closed before opening
connection 1.

It hijacks the executef to just pass that function into the reducef part
so we can reduce multiple times and therefore control the
connections. Otherwise the reducef happens "inside" of the executef at
which point the connection is closed.

Care is taken to ensure that:
- the init is only called once (subsequent queries have the init of the
rf overridden to just return `init` (the acc passed in) rather than
`(rf)`
- the completion arity is only called once (use of `(completing rf)` and
the reducing function in the subsequent queries is just `([acc] acc)`
and does not call `(rf acc)`. Remember this is just on the lower
reducing function and all of the takes, formats, etc _above_ it will
have the completion arity called because we are using transduce. The
completion arity is what takes the volatile rows and row counts and
actually nests them in the `{:data {:rows []}` structure. Without
calling that once (and ONLY once) you end up with no actual
results. they are just in memory.
parent e8ba7d53
No related branches found
No related tags found
No related merge requests found
......@@ -52,6 +52,7 @@
(eval . (put-clojure-indent 'mt/test-driver 1))
(eval . (put-clojure-indent 'prop/for-all 1))
(eval . (put-clojure-indent 'qp.streaming/streaming-response 1))
(eval . (put-clojure-indent 'u/prog1 1))
(eval . (put-clojure-indent 'u/select-keys-when 1))
(eval . (put-clojure-indent 'u/strict-extend 1))
;; these ones have to be done with `define-clojure-indent' for now because of upstream bug
......
......@@ -62,10 +62,10 @@
context)))]
(qp.context/reducedf reduced-rows context))))
(defn- default-runf [query rf context]
(defn- default-runf [query rff context]
(try
(qp.context/executef driver/*driver* query context (fn respond* [metadata reducible-rows]
(qp.context/reducef rf context metadata reducible-rows)))
(qp.context/reducef rff context metadata reducible-rows)))
(catch Throwable e
(qp.context/raisef e context))))
......
......@@ -146,7 +146,7 @@
(defn- process-queries-append-results
"Reduce the results of a sequence of `queries` using `rf` and initial value `init`."
[queries rf init info context]
[init queries rf info context]
(reduce
(fn [acc query]
(process-query-append-results query rf acc info (assoc context
......@@ -157,15 +157,40 @@
(defn- append-queries-context
"Update Query Processor `context` so it appends the rows fetched when running `more-queries`."
[info context more-queries]
(cond-> context
(seq more-queries)
(update :rff (fn [rff]
(fn [metadata]
(let [rf (rff metadata)]
(fn
([] (rf))
([acc] (rf (process-queries-append-results more-queries rf acc info context)))
([acc row] (rf acc row)))))))))
(let [vrf (volatile! nil)]
(cond-> context
(seq more-queries)
(-> (update :rff (fn [rff]
(fn [metadata]
(u/prog1 (rff metadata)
;; this captures the reducing function before composed with limit and other middleware
(vreset! vrf <>)))))
(update :executef
(fn [orig]
;; execute holds open a connection from [[execute-reducible-query]] so we need to manage
;; connections in the reducing part reducef. The default runf is what orchestrates this together
;; and we just pass the original executef to the reducing part so we can control our multiple
;; connections.
(fn multiple-executef [driver query _context respond]
(respond [orig driver] query))))
(assoc :reducef
;; signature usually has metadata in place of driver but we are hijacking
(fn multiple-reducing [rff context [orig-executef driver] query]
(let [respond (fn [metadata reducible-rows]
(let [rf (rff metadata)]
(assert (fn? rf))
(try
(transduce identity (completing rf) reducible-rows)
(catch Throwable e
(qp.context/raisef (ex-info (tru "Error reducing result rows")
{:type qp.error-type/qp}
e)
context)))))
acc (-> (orig-executef driver query context respond)
(process-queries-append-results
more-queries @vrf info context))]
;; completion arity can't be threaded because the value is derefed too early
(qp.context/reducedf (@vrf acc) context))))))))
(defn process-multiple-queries
"Allows the query processor to handle multiple queries, stitched together to appear as one"
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment