Skip to content
Snippets Groups Projects
Unverified Commit 37584db6 authored by Cam Saul's avatar Cam Saul Committed by GitHub
Browse files

Merge pull request #9927 from metabase/restore-dynamic-var-bindings-in-async-wait

Properly clone/restore thread binding frames in async QP
parents b294fe04 f6cf82df
No related branches found
No related tags found
No related merge requests found
......@@ -13,7 +13,8 @@
[metabase.util :as u]
[metabase.util.i18n :refer [trs]]
[schema.core :as s])
(:import [java.util.concurrent Executors ExecutorService]))
(:import clojure.lang.Var
[java.util.concurrent Executors ExecutorService]))
(defsetting max-simultaneous-queries-per-db
(trs "Maximum number of simultaneous queries to allow per connected Database.")
......@@ -60,12 +61,16 @@
(def ^:private ^:dynamic *already-in-thread-pool?* false)
(defn- runnable ^Runnable [qp query respond raise canceled-chan]
(fn []
(binding [*already-in-thread-pool?* true]
(try
(qp query respond raise canceled-chan)
(catch Throwable e
(raise e))))))
;; stash & restore bound dynamic vars. This is how Clojure does it for futures and the like in `binding-conveyor-fn`
;; (see source for `future` or `future-call`) which is unfortunately private
(let [frame (Var/cloneThreadBindingFrame)]
(^:once fn* []
(Var/resetThreadBindingFrame frame)
(binding [*already-in-thread-pool?* true]
(try
(qp query respond raise canceled-chan)
(catch Throwable e
(raise e)))))))
(defn- run-in-thread-pool [qp {database-id :database, :as query} respond raise canceled-chan]
(try
......
......@@ -9,16 +9,23 @@
[medley.core :as m]
[metabase.models
[card :refer [Card]]
[database :as database]
[query-execution :refer [QueryExecution]]]
[database :as database :refer [Database]]
[field :refer [Field]]
[permissions :as perms]
[permissions-group :as group]
[query-execution :refer [QueryExecution]]
[table :refer [Table]]]
[metabase.query-processor.middleware.constraints :as constraints]
[metabase.test.data :as data]
[metabase.test
[data :as data]
[util :as tu]]
[metabase.test.data
[dataset-definitions :as defs]
[datasets :refer [expect-with-driver]]
[users :as test-users]]
[metabase.test.util.log :as tu.log]
[metabase.util :as u]
[schema.core :as s]
[toucan.db :as db]
[toucan.util.test :as tt])
(:import com.fasterxml.jackson.core.JsonGenerator))
......@@ -263,3 +270,20 @@
:type :query
:query {:source-table (data/id :venues)}})]
(or row-count result))))
;; make sure `POST /dataset` calls check user permissions
(tu/expect-schema
{:status (s/eq "failed")
:error (s/eq "You do not have permissions to run this query.")
s/Keyword s/Any}
(tt/with-temp* [Database [{db-id :id} (select-keys (data/db) [:engine :details])]
Table [{table-id :id} {:db_id db-id, :name "VENUES"}]
Field [_ {:table_id table-id, :name "ID"}]]
;; give all-users *partial* permissions for the DB, so we know we're checking more than just read permissions for
;; the Database
(perms/revoke-permissions! (group/all-users) db-id)
(perms/grant-permissions! (group/all-users) db-id "schema_that_does_not_exist")
((test-users/user->client :rasta) :post "dataset"
{:database db-id
:type :query
:query {:source-table table-id, :limit 1}})))
(ns metabase.query-processor.middleware.async-wait-test
(:require [clojure.core.async :as a]
[expectations :refer [expect]]
[metabase.models.database :refer [Database]]
[metabase.query-processor.middleware.async-wait :as async-wait]
[metabase.test.util.async :as tu.async]
[metabase.util :as u]
[toucan.util.test :as tt])
(:import java.util.concurrent.Executors))
(def ^:private ^:dynamic *dynamic-var* false)
(defn- async-wait-bound-value
"Check the bound value of `*dynamic-var*` when a function is executed after the `async-wait` using the thread pool for
Database with `db-id`."
[db-id]
(let [bound-value (promise)]
(tu.async/with-open-channels [canceled-chan (a/promise-chan)]
((async-wait/wait-for-turn
(fn [& _]
(deliver bound-value *dynamic-var*)))
{:database db-id}
identity
identity
canceled-chan))
(u/deref-with-timeout bound-value 1000)))
;; basic sanity check: bound value of `*dynamic-var*` should be `false`
(expect
false
(tt/with-temp Database [{db-id :id}]
(async-wait-bound-value db-id)))
;; bound dynamic vars should get re-bound by in the async wait fn
(expect
::wow
(tt/with-temp Database [{db-id :id}]
(binding [*dynamic-var* ::wow]
(async-wait-bound-value db-id))))
;; binding should not be persisted between executions -- should be reset when we reuse a thread
(expect
false
(let [thread-pool (Executors/newSingleThreadExecutor)]
(with-redefs [async-wait/db-thread-pool (constantly thread-pool)]
(tt/with-temp Database [{db-id :id}]
(binding [*dynamic-var* true]
(async-wait-bound-value db-id))
(async-wait-bound-value db-id)))))
(ns metabase.query-processor.middleware.permissions-test
"Tests for the middleware that checks whether the current user has permissions to run a given query."
(:require [expectations :refer :all]
[metabase
[query-processor :as qp]
[util :as u]]
[metabase.api.common :as api]
[metabase.models
[database :refer [Database]]
[permissions :as perms]
[permissions-group :as perms-group]
[table :refer [Table]]]
[metabase.query-processor.middleware.permissions :refer [check-query-permissions]]
[metabase.test
[data :as data]
[util :as tu]]
[metabase.test.data.users :as users]
[metabase.util :as u]
[schema.core :as s]
[toucan.util.test :as tt]))
(def ^:private ^{:arglists '([query]), :style/indent 0} check-perms (check-query-permissions identity))
(defn- do-with-rasta
"Call F with rasta as the current user."
"Call `f` with Rasta as the current user."
[f]
(users/do-with-test-user :rasta f))
(defn- check-perms-for-rasta
"Check permissions for QUERY with rasta as the current user."
"Check permissions for `query` with rasta as the current user."
{:style/indent 0}
[query]
(do-with-rasta (fn [] (check-perms query))))
......@@ -113,3 +120,19 @@
{:database (u/get-id db)
:type :query
:query {:source-query {:source-table (u/get-id table)}}}))
;; Make sure it works end-to-end: make sure bound `*current-user-id*` and `*current-user-permissions-set*` are used to
;; permissions check queries
(tu/expect-schema
{:status (s/eq :failed)
:class (s/eq Exception)
:error (s/eq "You do not have permissions to run this query.")
s/Keyword s/Any}
(binding [api/*current-user-id* (users/user->id :rasta)
api/*current-user-permissions-set* (delay #{})]
(qp/process-query
{:database (data/id)
:type :query
:query {:source-table (data/id :venues)
:limit 1}})))
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment