From f8349b5122fdf542ae530f6da1eb9df1f03dc777 Mon Sep 17 00:00:00 2001 From: Cam Saul <1455846+camsaul@users.noreply.github.com> Date: Wed, 4 Mar 2020 15:25:03 -0800 Subject: [PATCH] StreamingResponse async tweaks (#12049) * Upgrade Jetty version * Remove async-wait middleware * Move re= test macro * Add mt/user->credentials alias * Convert metabase.server-test to new style * Rework dataset-test to use metabase.test * Use atom instead of agent for tracking in-flight queries * Streaming QP context should correctly pass the canceled-chan created by StreamingResponse * Save query executions synchronously * i18n the query logs * Reworked StreamingResponse * Almost working :interrobang: * Test fixes * Test impl & some perf improvements * Test/lint fix :wrench: --- project.clj | 2 +- src/metabase/async/streaming_response.clj | 217 ++++++---- .../async/streaming_response/thread_pool.clj | 31 ++ src/metabase/middleware/log.clj | 17 +- src/metabase/middleware/misc.clj | 5 +- src/metabase/models/database.clj | 10 - src/metabase/query_processor.clj | 9 +- .../query_processor/context/default.clj | 4 +- .../query_processor/middleware/async.clj | 21 - .../query_processor/middleware/async_wait.clj | 109 ----- .../middleware/process_userland_query.clj | 54 +-- src/metabase/query_processor/reducible.clj | 2 +- src/metabase/query_processor/streaming.clj | 28 +- test/metabase/api/dataset_test.clj | 71 ++-- test/metabase/api/embed_test.clj | 7 +- test/metabase/api/public_test.clj | 376 +++++++++--------- test/metabase/api/pulse_test.clj | 60 +-- .../async/streaming_response_test.clj | 123 ++++++ test/metabase/http_client.clj | 2 +- .../middleware/async_wait_test.clj | 55 --- .../process_userland_query_test.clj | 6 +- .../query_processor/streaming_test.clj | 9 +- test/metabase/query_processor_test.clj | 3 +- .../query_to_native_test.clj | 10 - test/metabase/server_test.clj | 36 +- test/metabase/test.clj | 1 + test/metabase/test/redefs.clj | 15 +- test/metabase/test/util.clj | 36 +- 28 files changed, 650 insertions(+), 669 deletions(-) create mode 100644 src/metabase/async/streaming_response/thread_pool.clj delete mode 100644 src/metabase/query_processor/middleware/async.clj delete mode 100644 src/metabase/query_processor/middleware/async_wait.clj create mode 100644 test/metabase/async/streaming_response_test.clj delete mode 100644 test/metabase/query_processor/middleware/async_wait_test.clj diff --git a/project.clj b/project.clj index 5167b27a96d..838a434a3d6 100644 --- a/project.clj +++ b/project.clj @@ -113,7 +113,7 @@ [net.sf.cssbox/cssbox "4.12" :exclusions [org.slf4j/slf4j-api]] ; HTML / CSS rendering [org.apache.commons/commons-lang3 "3.9"] ; helper methods for working with java.lang stuff [org.clojars.pntblnk/clj-ldap "0.0.16"] ; LDAP client - [org.eclipse.jetty/jetty-server "9.4.15.v20190215"] ; We require JDK 8 which allows us to run Jetty 9.4, ring-jetty-adapter runs on 1.7 which forces an older version + [org.eclipse.jetty/jetty-server "9.4.27.v20200227"] ; We require JDK 8 which allows us to run Jetty 9.4, ring-jetty-adapter runs on 1.7 which forces an older version [org.flatland/ordered "1.5.7"] ; ordered maps & sets [org.liquibase/liquibase-core "3.6.3" ; migration management (Java lib) :exclusions [ch.qos.logback/logback-classic]] diff --git a/src/metabase/async/streaming_response.clj b/src/metabase/async/streaming_response.clj index b705fbae1a9..5600de84a4f 100644 --- a/src/metabase/async/streaming_response.clj +++ b/src/metabase/async/streaming_response.clj @@ -1,8 +1,11 @@ (ns metabase.async.streaming-response (:require [cheshire.core :as json] [clojure.core.async :as a] + [clojure.tools.logging :as log] compojure.response + [metabase.async.streaming-response.thread-pool :as thread-pool] [metabase.util :as u] + [metabase.util.i18n :refer [trs]] [potemkin.types :as p.types] [pretty.core :as pretty] [ring.core.protocols :as ring.protocols] @@ -15,118 +18,176 @@ (def ^:private keepalive-interval-ms "Interval between sending newline characters to keep Heroku from terminating requests like queries that take a long time to complete." - (u/seconds->ms 1)) ; one second + (u/seconds->ms 1)) + +(defn- write-to-output-stream! + ([^OutputStream os x] + (if (int? x) + (.write os ^int x) + (.write os ^bytes x))) + + ([^OutputStream os ^bytes ba ^Integer offset ^Integer len] + (.write os ba offset len))) (defn- jetty-eof-canceling-output-stream "Wraps an `OutputStream` and sends a message to `canceled-chan` if a jetty `EofException` is thrown when writing to the stream." ^OutputStream [^OutputStream os canceled-chan] (proxy [FilterOutputStream] [os] + (flush [] + (try + (.flush os) + (catch EofException e + (log/trace "Caught EofException") + (a/>!! canceled-chan ::cancel) + (throw e)))) (write ([x] (try - (if (int? x) - (.write os ^int x) - (.write os ^bytes x)) + (write-to-output-stream! os x) (catch EofException e + (log/trace "Caught EofException") (a/>!! canceled-chan ::cancel) (throw e)))) - ([^bytes ba ^Integer off ^Integer len] + ([ba off len] (try - (.write os ba off len) + (write-to-output-stream! os ba off len) (catch EofException e + (log/trace "Caught EofException") (a/>!! canceled-chan ::cancel) (throw e))))))) +(defn- start-keepalive-loop! [^OutputStream os write-keepalive-newlines? continue-writing-newlines?] + (a/go-loop [] + (a/<! (a/timeout keepalive-interval-ms)) + ;; by still attempting to flush even when not writing newlines we can hopefully trigger an EofException if the + ;; request is canceled + (when @continue-writing-newlines? + (when (try + (when write-keepalive-newlines? + (.write os (byte \newline))) + (.flush os) + ::recur + (catch Throwable _ + nil)) + (recur))))) + (defn- keepalive-output-stream "Wraps an `OutputStream` and writes keepalive newline bytes every interval until someone else starts writing to the stream." ^OutputStream [^OutputStream os write-keepalive-newlines?] - (let [write-newlines? (atom true)] - (a/go-loop [] - (a/<! (a/timeout keepalive-interval-ms)) - (when @write-newlines? - (when write-keepalive-newlines? - (.write os (byte \newline))) - (.flush os) - (recur))) + (let [continue-writing-newlines? (atom true)] + (start-keepalive-loop! os write-keepalive-newlines? continue-writing-newlines?) + (proxy [FilterOutputStream] [os] + (close [] + (reset! continue-writing-newlines? false) + (u/ignore-exceptions (.close os))) + (write + ([x] + (reset! continue-writing-newlines? false) + (write-to-output-stream! os x)) + + ([ba off len] + (reset! continue-writing-newlines? false) + (write-to-output-stream! os ba off len)))))) + +(defn- stop-writing-after-close-stream + "Wraps `OutputStreams` and ignores additional calls to `flush` and `write` after it has been closed. (Background: I + think we call flush a few more times than we need to which causes Jetty and the GZIPOutputStream to throw + Exceptions.)" + ^OutputStream [^OutputStream os] + (let [closed? (atom false)] (proxy [FilterOutputStream] [os] (close [] - (reset! write-newlines? false) - (let [^FilterOutputStream this this] - (proxy-super close))) + (when-not @closed? + (reset! closed? true) + (u/ignore-exceptions (.close os)))) + + (flush [] + (when-not @closed? + (.flush os))) + (write ([x] - (reset! write-newlines? false) - (if (int? x) - (.write os ^int x) - (.write os ^bytes x))) - - ([^bytes ba ^Integer off ^Integer len] - (reset! write-newlines? false) - (.write os ba off len)))))) - -(defmacro ^:private with-open-chan [[chan-binding chan] & body] - `(let [chan# ~chan - ~chan-binding chan#] - (try - ~@body - (finally - (a/close! chan#))))) - -;; TODO - this code is basically duplicated with the code in the QP catch-exceptions middleware; we should refactor to -;; remove the duplication -(defn- exception-chain [^Throwable e] - (->> (iterate #(.getCause ^Throwable %) e) - (take-while some?) - reverse)) + (when-not @closed? + (write-to-output-stream! os x))) + + ([ba off len] + (when-not @closed? + (write-to-output-stream! os ba off len))))))) + +(defn- ex-status-code [e] + (or (some #((some-fn :status-code :status) (ex-data %)) + (take-while some? (iterate ex-cause e))) + 500)) (defn- format-exception [e] - (let [format-ex* (fn [^Throwable e] - {:message (.getMessage e) - :class (.getCanonicalName (class e)) - :stacktrace (mapv str (.getStackTrace e)) - :data (ex-data e)}) - [e & more :as chain] (exception-chain e)] - (merge - (format-ex* e) - {:_status (or (some #((some-fn :status-code :status) (ex-data %)) - chain) - 500)} - (when (seq more) - {:via (map format-ex* more)})))) + (assoc (Throwable->map e) :_status (ex-status-code e))) (defn write-error! - "Write an error to the output stream, formatting it nicely." + "Write an error to the output stream, formatting it nicely. Closes output stream afterwards." [^OutputStream os obj] (if (instance? Throwable obj) (recur os (format-exception obj)) - (try - (with-open [writer (BufferedWriter. (OutputStreamWriter. os StandardCharsets/UTF_8))] - (json/generate-stream obj writer) - (.flush writer)) - (catch Throwable _)))) + (with-open [os os] + (log/trace (pr-str (list 'write-error! obj))) + (try + (with-open [writer (BufferedWriter. (OutputStreamWriter. os StandardCharsets/UTF_8))] + (json/generate-stream obj writer)) + (catch EofException _) + (catch Throwable e + (log/error e (trs "Error writing error to output stream") obj)))))) + +(defn- do-f* [f ^OutputStream os finished-chan canceled-chan] + (try + (f os canceled-chan) + (catch EofException _ + (a/>!! canceled-chan ::cancel) + nil) + (catch InterruptedException _ + (a/>!! canceled-chan ::cancel) + nil) + (catch Throwable e + (log/error e (trs "Caught unexpected Exception in streaming response body")) + (write-error! os e) + nil) + (finally + (a/>!! finished-chan (if (a/poll! canceled-chan) + :canceled + :done)) + (a/close! finished-chan) + (a/close! canceled-chan)))) -(defn- write-to-stream! [f {:keys [write-keepalive-newlines? gzip?], :as options} ^OutputStream os finished-chan] +(defn- do-f [f {:keys [gzip? write-keepalive-newlines?], :or {write-keepalive-newlines? true}, :as options} + ^OutputStream os finished-chan canceled-chan] (if gzip? (with-open [gzos (GZIPOutputStream. os true)] - (write-to-stream! f (dissoc options :gzip?) gzos finished-chan)) - (with-open-chan [canceled-chan (a/promise-chan)] - (with-open [os os - os (jetty-eof-canceling-output-stream os canceled-chan) - os (keepalive-output-stream os write-keepalive-newlines?)] - - (try - (f os canceled-chan) - (catch Throwable e - (write-error! os {:message (.getMessage e)})) - (finally - (.flush os) - (a/>!! finished-chan (if (a/poll! canceled-chan) - :canceled - :done)) - (a/close! finished-chan))))))) + (do-f f (assoc options :gzip? false) gzos finished-chan canceled-chan) + (.finish gzos)) + (with-open [os (jetty-eof-canceling-output-stream os canceled-chan) + os (keepalive-output-stream os write-keepalive-newlines?) + os (stop-writing-after-close-stream os)] + (do-f* f os finished-chan canceled-chan) + (.flush os)))) + +(defn- do-f-async [f options ^OutputStream os finished-chan] + (let [canceled-chan (a/promise-chan identity identity) + task (bound-fn [] + (try + (do-f f options os finished-chan canceled-chan) + (finally + (u/ignore-exceptions (.close os))))) + futur (.submit (thread-pool/thread-pool) ^Runnable task)] + (a/go + (when (a/<! canceled-chan) + (log/trace "Canceling async thread") + (future-cancel futur) + (a/>!! finished-chan :canceled) + (a/close! finished-chan) + (a/close! canceled-chan) + (.flush os) + (.close os))))) ;; `ring.middleware.gzip` doesn't work on our StreamingResponse class. (defn- should-gzip-response? @@ -144,7 +205,7 @@ ;; both sync and async responses ring.protocols/StreamableResponseBody (write-body-to-stream [_ _ os] - (write-to-stream! f options os donechan)) + (do-f-async f options os donechan)) ;; sync responses only compojure.response/Renderable @@ -195,4 +256,4 @@ {:pre [(= (count bindings) 2)]} `(->StreamingResponse (fn [~(vary-meta os-binding assoc :tag 'java.io.OutputStream) ~canceled-chan-binding] ~@body) ~options - (a/promise-chan))) + (a/promise-chan identity identity))) diff --git a/src/metabase/async/streaming_response/thread_pool.clj b/src/metabase/async/streaming_response/thread_pool.clj new file mode 100644 index 00000000000..ff089fd92a7 --- /dev/null +++ b/src/metabase/async/streaming_response/thread_pool.clj @@ -0,0 +1,31 @@ +(ns metabase.async.streaming-response.thread-pool + (:require [metabase.config :as config]) + (:import [java.util.concurrent Executors ThreadPoolExecutor] + org.apache.commons.lang3.concurrent.BasicThreadFactory$Builder)) + +(def ^:private ^Long thread-pool-max-size + (or (config/config-int :mb-jetty-maxthreads) 50)) + +(defonce ^:private thread-pool* + (delay + (Executors/newFixedThreadPool thread-pool-max-size + (.build + (doto (BasicThreadFactory$Builder.) + (.namingPattern "streaming-response-thread-pool-%d") + ;; Daemon threads do not block shutdown of the JVM + (.daemon true)))))) + +(defn thread-pool + "Thread pool for asynchronously running streaming responses." + ^ThreadPoolExecutor [] + @thread-pool*) + +(defn active-thread-count + "The number of active streaming response threads." + [] + (.getActiveCount (thread-pool))) + +(defn queued-thread-count + "The number of queued streaming response threads." + [] + (count (.getQueue (thread-pool)))) diff --git a/src/metabase/middleware/log.clj b/src/metabase/middleware/log.clj index fc2d8ba856f..81104092c5f 100644 --- a/src/metabase/middleware/log.clj +++ b/src/metabase/middleware/log.clj @@ -9,8 +9,8 @@ [metabase.async [streaming-response :as streaming-response] [util :as async.u]] + [metabase.async.streaming-response.thread-pool :as streaming-response.thread-pool] [metabase.middleware.util :as middleware.u] - [metabase.query-processor.middleware.async :as qp.middleware.async] [metabase.util.i18n :refer [trs]] [toucan.db :as db]) (:import clojure.core.async.impl.channels.ManyToManyChannel @@ -37,7 +37,7 @@ (str (format "%s %s %d" (str/upper-case (name request-method)) uri status) (when async-status - (format " [ASYNC: %s]" async-status)))) + (format " [%s: %s]" (trs "ASYNC") async-status)))) (defn- format-performance-info [{:keys [start-time call-count-fn] @@ -45,19 +45,22 @@ call-count-fn (constantly -1)}}] (let [elapsed-time (u/format-nanoseconds (- (System/nanoTime) start-time)) db-calls (call-count-fn)] - (format "%s (%d DB calls)" elapsed-time db-calls))) + (trs "{0} ({1} DB calls)" elapsed-time db-calls))) (defn- format-threads-info [{:keys [include-stats?]}] (when include-stats? (str (when-let [^QueuedThreadPool pool (some-> (server/instance) .getThreadPool)] - (format "Jetty threads: %s/%s (%s idle, %s queued) " + (trs "Jetty threads: {0}/{1} ({2} idle, {3} queued) " (.getBusyThreads pool) (.getMaxThreads pool) (.getIdleThreads pool) (.getQueueSize pool))) - (format "(%d total active threads) " (Thread/activeCount)) - (format "Queries in flight: %d" (qp.middleware.async/in-flight))))) + (trs "({0} total active threads)" (Thread/activeCount)) + " " + (trs "Queries in flight: {0}" (streaming-response.thread-pool/active-thread-count)) + " " + (trs "({0} queued)" (streaming-response.thread-pool/queued-thread-count))))) (defn- format-error-info [{{:keys [body]} :response} {:keys [error?]}] (when (and error? @@ -142,7 +145,7 @@ (let [finished-chan (streaming-response/finished-chan streaming-response)] (a/go (let [result (a/<! finished-chan)] - (log-info (assoc info :async-status (if (:canceled result) "canceled" "completed"))))))) + (log-info (assoc info :async-status (if (= result :canceled) "canceled" "completed"))))))) (defn- logged-response "Log an API response. Returns resonse, possibly modified (i.e., core.async channels will be wrapped); this value diff --git a/src/metabase/middleware/misc.clj b/src/metabase/middleware/misc.clj index f86dc4ae3aa..0ab249a67b8 100644 --- a/src/metabase/middleware/misc.clj +++ b/src/metabase/middleware/misc.clj @@ -61,6 +61,9 @@ ;;; ------------------------------------------------------ i18n ------------------------------------------------------ +(def ^:private available-locales + (delay (puppet-i18n/available-locales))) + (defn bind-user-locale "Middleware that binds locale info for the current User. (This is basically a copy of the `puppetlabs.i18n.core/locale-negotiator`, but reworked to handle async-style requests as well.)" @@ -70,7 +73,7 @@ (let [headers (:headers request) parsed (puppet-i18n/parse-http-accept-header (get headers "accept-language")) wanted (mapv first parsed) - negotiated ^java.util.Locale (puppet-i18n/negotiate-locale wanted (puppet-i18n/available-locales))] + negotiated ^java.util.Locale (puppet-i18n/negotiate-locale wanted @available-locales)] (puppet-i18n/with-user-locale negotiated (handler request respond raise))))) diff --git a/src/metabase/models/database.clj b/src/metabase/models/database.clj index a82edbdebd5..0b2f68bf96a 100644 --- a/src/metabase/models/database.clj +++ b/src/metabase/models/database.clj @@ -44,15 +44,6 @@ (catch Throwable e (log/error e (trs "Error unscheduling tasks for DB."))))) -(defn- destroy-qp-thread-pool! - [database] - (try - (classloader/the-classloader) - (classloader/require 'metabase.query-processor.middleware.async-wait) - ((resolve 'metabase.query-processor.middleware.async-wait/destroy-thread-pool!) database) - (catch Throwable e - (log/error e (trs "Error destroying thread pool for DB."))))) - (defn- post-insert [database] (u/prog1 database ;; add this database to the All Users permissions groups @@ -67,7 +58,6 @@ (defn- pre-delete [{id :id, driver :engine, :as database}] (unschedule-tasks! database) - (destroy-qp-thread-pool! database) (db/delete! 'Card :database_id id) (db/delete! 'Permissions :object [:like (str (perms/object-path id) "%")]) (db/delete! 'Table :db_id id) diff --git a/src/metabase/query_processor.clj b/src/metabase/query_processor.clj index c4c54130c3c..7c75d95f553 100644 --- a/src/metabase/query_processor.clj +++ b/src/metabase/query_processor.clj @@ -20,8 +20,6 @@ [add-source-metadata :as add-source-metadata] [add-timezone-info :as add-timezone-info] [annotate :as annotate] - [async :as async] - [async-wait :as async-wait] [auto-bucket-datetimes :as bucket-datetime] [binning :as binning] [cache :as cache] @@ -91,13 +89,11 @@ #'resolve-database-and-driver/resolve-database-and-driver #'fetch-source-query/resolve-card-id-source-tables #'store/initialize-store - #'async-wait/wait-for-turn #'cache/maybe-return-cached-results #'validate/validate-query #'normalize/normalize #'add-rows-truncated/add-rows-truncated - #'results-metadata/record-and-return-metadata! - #'async/count-in-flight-queries]) + #'results-metadata/record-and-return-metadata!]) ;; ▲▲▲ PRE-PROCESSING ▲▲▲ happens from BOTTOM-TO-TOP, e.g. the results of `expand-macros` are passed to ;; `substitute-parameters` @@ -136,8 +132,7 @@ (def ^:private ^:const preprocessing-timeout-ms 10000) (defn- preprocess-query [query context] - (binding [*preprocessing-level* (inc *preprocessing-level*) - async-wait/*disable-async-wait* true] + (binding [*preprocessing-level* (inc *preprocessing-level*)] ;; record the number of recursive preprocesses taking place to prevent infinite preprocessing loops. (log/tracef "*preprocessing-level*: %d" *preprocessing-level*) (when (>= *preprocessing-level* max-preprocessing-level) diff --git a/src/metabase/query_processor/context/default.clj b/src/metabase/query_processor/context/default.clj index de30313a61d..23b41a78f9f 100644 --- a/src/metabase/query_processor/context/default.clj +++ b/src/metabase/query_processor/context/default.clj @@ -122,5 +122,5 @@ :cancelf default-cancelf :timeoutf default-timeoutf :resultf default-resultf - :canceled-chan (a/promise-chan) - :out-chan (a/promise-chan)}) + :canceled-chan (a/promise-chan identity identity) + :out-chan (a/promise-chan identity identity)}) diff --git a/src/metabase/query_processor/middleware/async.clj b/src/metabase/query_processor/middleware/async.clj deleted file mode 100644 index 57af8489238..00000000000 --- a/src/metabase/query_processor/middleware/async.clj +++ /dev/null @@ -1,21 +0,0 @@ -(ns metabase.query-processor.middleware.async - (:require [clojure.core.async :as a] - [metabase.query-processor.context :as context])) - -(def ^:private in-flight* (agent 0)) - -(defn in-flight - "Return the number of queries currently in flight." - [] - @in-flight*) - -(defn count-in-flight-queries - "Middleware that tracks the current number of queries in flight." - [qp] - (fn [query rff context] - (send in-flight* inc) - (let [out-chan (context/out-chan context)] - (a/go - (a/<! out-chan) - (send in-flight* dec))) - (qp query rff context))) diff --git a/src/metabase/query_processor/middleware/async_wait.clj b/src/metabase/query_processor/middleware/async_wait.clj deleted file mode 100644 index 8d46958ae45..00000000000 --- a/src/metabase/query_processor/middleware/async_wait.clj +++ /dev/null @@ -1,109 +0,0 @@ -(ns metabase.query-processor.middleware.async-wait - "Middleware that limits the number of concurrent queries for each database. - - Each connected database is limited to a maximum of 15 simultaneous queries (configurable) using these methods; any - additional queries will park the thread. Super-useful for writing high-performance API endpoints. Prefer these - methods to the old-school synchronous versions. - - How is this achieved? For each Database, we'll maintain a thread pool executor to limit the number of simultaneous - queries." - (:require [clojure.core.async :as a] - [clojure.tools.logging :as log] - [metabase.models.setting :refer [defsetting]] - [metabase.query-processor.context :as context] - [metabase.util :as u] - [metabase.util.i18n :refer [deferred-trs trs]] - [schema.core :as s]) - (:import [java.util.concurrent Executors ExecutorService] - org.apache.commons.lang3.concurrent.BasicThreadFactory$Builder)) - -(defsetting max-simultaneous-queries-per-db - (deferred-trs "Maximum number of simultaneous queries to allow per connected Database.") - :type :integer - :default 15) - -(defonce ^:private db-thread-pools (atom {})) - -;; easier just to use a lock for creating the pools rather than using `swap-vals!` and having to nuke a bunch of -;; thread pools that ultimately don't get used -(defonce ^:private db-thread-pool-lock (Object.)) - -(defn- new-thread-pool ^ExecutorService [database-id] - (Executors/newFixedThreadPool - (max-simultaneous-queries-per-db) - (.build - (doto (BasicThreadFactory$Builder.) - (.namingPattern (format "qp-database-%d-threadpool-%%d" database-id)) - ;; Daemon threads do not block shutdown of the JVM - (.daemon true) - ;; Running queries should be lower priority than other stuff e.g. API responses - (.priority Thread/MIN_PRIORITY))))) - -(s/defn ^:private db-thread-pool :- ExecutorService - [database-or-id] - (let [id (u/get-id database-or-id)] - (or - (@db-thread-pools id) - (locking db-thread-pool-lock - (or - (@db-thread-pools id) - (log/debug (trs "Creating new query thread pool for Database {0}" id)) - (let [new-pool (new-thread-pool id)] - (swap! db-thread-pools assoc id new-pool) - new-pool)))))) - -(defn destroy-thread-pool! - "Destroy the QP thread pool for a Database (done automatically when DB is deleted.)" - [database-or-id] - (let [id (u/get-id database-or-id)] - (let [[{^ExecutorService thread-pool id}] (locking db-thread-pool-lock - (swap-vals! db-thread-pools dissoc id))] - (when thread-pool - (log/debug (trs "Destroying query thread pool for Database {0}" id)) - (.shutdownNow thread-pool))))) - -(def ^:private ^:dynamic *already-in-thread-pool?* - "True if the current thread is a thread pool thread from the a DB thread pool (i.e., if we're already running - asynchronously after waiting if needed.)" - false) - -(def ^:dynamic *disable-async-wait* - "Whether to disable async waiting entirely. Bind this to `true` for cases where we would not like to enforce async - waiting, such as for functions like `qp/query->native` that don't actually run queries. - - DO NOT BIND THIS TO TRUE IN SITUATIONS WHERE WE ACTUALLY RUN QUERIES: some functionality relies on the fact that - things are ran in a separate thread to function correctly, such as the cancellation code that listens for - InterruptedExceptions." - false) - -(defn- runnable ^Runnable [qp query rff context] - (bound-fn [] - (binding [*already-in-thread-pool?* true] - (try - (qp query rff context) - (catch Throwable e - (context/raisef e context)))))) - -(defn- run-in-thread-pool [qp {database-id :database, :as query} rff context] - {:pre [(integer? database-id)]} - (try - (let [pool (db-thread-pool database-id) - futur (.submit pool (runnable qp query rff context)) - canceled-chan (context/canceled-chan context)] - (a/go - (when (a/<! canceled-chan) - (log/debug (trs "Request canceled, canceling pending query")) - (future-cancel futur)))) - (catch Throwable e - (context/raisef e context))) - nil) - -(defn wait-for-turn - "Middleware that throttles the number of concurrent queries for each connected database, parking the thread until it - is allowed to run." - [qp] - (fn [query rff context] - {:pre [(map? query)]} - (if (or *already-in-thread-pool?* *disable-async-wait*) - (qp query rff context) - (run-in-thread-pool qp query rff context)))) diff --git a/src/metabase/query_processor/middleware/process_userland_query.clj b/src/metabase/query_processor/middleware/process_userland_query.clj index 75d240b191c..80284cffdd1 100644 --- a/src/metabase/query_processor/middleware/process_userland_query.clj +++ b/src/metabase/query_processor/middleware/process_userland_query.clj @@ -9,9 +9,7 @@ [query-execution :as query-execution :refer [QueryExecution]]] [metabase.query-processor.util :as qputil] [metabase.util.i18n :refer [trs]] - [toucan.db :as db]) - (:import [java.util.concurrent Executors Future] - org.apache.commons.lang3.concurrent.BasicThreadFactory$Builder)) + [toucan.db :as db])) (defn- add-running-time [{start-time-ms :start_time_millis, :as query-execution}] (-> query-execution @@ -28,7 +26,7 @@ ;; ;; Async seems like it makes sense from a performance standpoint, but should we have some sort of shared threadpool ;; for other places where we would want to do async saves (such as results-metadata for Cards?) -(defn- save-query-execution! +(defn- save-query-execution!* "Save a `QueryExecution` and update the average execution time for the corresponding `Query`." [{query :json_query, query-hash :hash, running-time :running_time, context :context :as query-execution}] (query/save-query-and-update-average-execution-time! query query-hash running-time) @@ -36,42 +34,22 @@ (log/warn (trs "Cannot save QueryExecution, missing :context")) (db/insert! QueryExecution (dissoc query-execution :json_query)))) -(def ^:private ^Long thread-pool-size 4) - -(def ^:private ^{:arglists '(^java.util.concurrent.ExecutorService [])} thread-pool - "Thread pool for asynchronously saving query executions." - (let [pool (delay - (Executors/newFixedThreadPool - thread-pool-size - (.build - (doto (BasicThreadFactory$Builder.) - (.namingPattern "save-query-execution-thread-pool-%d") - ;; Daemon threads do not block shutdown of the JVM - (.daemon true) - ;; Save query executions should be lower priority than other stuff e.g. API responses - (.priority Thread/MIN_PRIORITY)))))] - (fn [] @pool))) - -(defn- save-query-execution-async! - "Asynchronously save a `QueryExecution` row containing `execution-info`. This is done when a query is finished, so +(defn- save-query-execution! + "Save a `QueryExecution` row containing `execution-info`. This is done when a query is finished, so regardless of whether results streaming is canceled, we want to continue the save; for this reason, we don't call `future-cancel` if we get a message to `canceled-chan` the way we normally do." - ^Future [execution-info] - (log/trace "Saving QueryExecution info asynchronously") - (let [execution-info (add-running-time execution-info) - ^Runnable task (bound-fn [] - (try - (save-query-execution! execution-info) - (catch Throwable e - (log/error e (trs "Error saving query execution info")))) - nil)] - (.submit (thread-pool) task))) + [execution-info] + (log/trace "Saving QueryExecution info") + (try + (save-query-execution!* (add-running-time execution-info)) + (catch Throwable e + (log/error e (trs "Error saving query execution info"))))) -(defn- save-successful-query-execution-async! [query-execution result-rows] - (save-query-execution-async! (assoc query-execution :result_rows result-rows))) +(defn- save-successful-query-execution! [query-execution result-rows] + (save-query-execution! (assoc query-execution :result_rows result-rows))) -(defn- save-failed-query-execution-async! [query-execution message] - (save-query-execution-async! (assoc query-execution :error (str message)))) +(defn- save-failed-query-execution! [query-execution message] + (save-query-execution! (assoc query-execution :error (str message)))) ;;; +----------------------------------------------------------------------------------------------------------------+ @@ -100,7 +78,7 @@ (rf)) ([acc] - (save-successful-query-execution-async! execution-info @row-count) + (save-successful-query-execution! execution-info @row-count) (rf (if (map? acc) (success-response execution-info acc) acc))) @@ -142,7 +120,7 @@ (letfn [(rff* [metadata] (add-and-save-execution-info-xform! metadata execution-info (rff metadata))) (raisef* [^Throwable e context] - (save-failed-query-execution-async! execution-info (.getMessage e)) + (save-failed-query-execution! execution-info (.getMessage e)) (raisef (ex-info (.getMessage e) {:query-execution execution-info} e) diff --git a/src/metabase/query_processor/reducible.clj b/src/metabase/query_processor/reducible.clj index 2f888bd6ca2..c436f48df17 100644 --- a/src/metabase/query_processor/reducible.clj +++ b/src/metabase/query_processor/reducible.clj @@ -22,7 +22,7 @@ (cond (not= port out-chan) (context/timeoutf context) (nil? val) (context/cancelf context)) - (log/tracef "Closing out-chan and canceled-chan.") + (log/tracef "Closing out-chan.") (a/close! out-chan) (a/close! canceled-chan))) nil)) diff --git a/src/metabase/query_processor/streaming.clj b/src/metabase/query_processor/streaming.clj index d545a7a64c5..8cb13d5d60a 100644 --- a/src/metabase/query_processor/streaming.clj +++ b/src/metabase/query_processor/streaming.clj @@ -47,25 +47,33 @@ the normal `streaming-response` macro, which is geared toward Ring responses. (with-open [os ...] - (qp/process-query query (qp.streaming/streaming-context :csv os)))" - [export-format ^OutputStream os] - (let [results-writer (i/streaming-results-writer export-format os)] - {:rff (streaming-rff results-writer) - :reducedf (streaming-reducedf results-writer os)})) + (qp/process-query query (qp.streaming/streaming-context :csv os canceled-chan)))" + ([export-format ^OutputStream os] + (let [results-writer (i/streaming-results-writer export-format os)] + {:rff (streaming-rff results-writer) + :reducedf (streaming-reducedf results-writer os)})) + + ([export-format os canceled-chan] + (assoc (streaming-context export-format os) :canceled-chan canceled-chan))) + +(defn- await-async-result [out-chan canceled-chan] + ;; if we get a cancel message, close `out-chan` so the query will be canceled + (a/go + (when (a/<! canceled-chan) + (a/close! out-chan))) + ;; block until `out-chan` closes or gets a result + (a/<!! out-chan)) (defn streaming-response* "Impl for `streaming-response`." [export-format f] (streaming-response/streaming-response (i/stream-options export-format) [os canceled-chan] (let [result (try - (f (streaming-context export-format os)) + (f (streaming-context export-format os canceled-chan)) (catch Throwable e e)) result (if (instance? ManyToManyChannel result) - (let [[val port] (a/alts!! [result canceled-chan])] - (when (= port canceled-chan) - (a/close! result)) - val) + (await-async-result result canceled-chan) result)] (when (or (instance? Throwable result) (= (:status result) :failed)) diff --git a/test/metabase/api/dataset_test.clj b/test/metabase/api/dataset_test.clj index 7a169453df1..2c30b585c1d 100644 --- a/test/metabase/api/dataset_test.clj +++ b/test/metabase/api/dataset_test.clj @@ -21,17 +21,12 @@ [permissions-group :as group] [query-execution :refer [QueryExecution]]] [metabase.query-processor.middleware.constraints :as constraints] - [metabase.test - [data :as data] - [util :as tu]] [metabase.test.data [dataset-definitions :as defs] - [datasets :as datasets] [users :as test-users]] - [metabase.test.util.log :as tu.log] + [metabase.test.util :as tu] [schema.core :as s] - [toucan.db :as db] - [toucan.util.test :as tt]) + [toucan.db :as db]) (:import com.fasterxml.jackson.core.JsonGenerator)) (defn- format-response [m] @@ -86,7 +81,7 @@ :row_count 1 :result_rows 1 :context :ad-hoc - :executor_id (test-users/user->id :rasta) + :executor_id (mt/user->id :rasta) :native false :pulse_id nil :card_id nil @@ -111,7 +106,7 @@ error-message (re-find #"Syntax error in SQL statement") boolean)))) - result (tu.log/suppress-output + result (mt/suppress-output ((mt/user->client :rasta) :post 202 "dataset" {:database (mt/id) :type "native" :native {:query "foobar"}}))] @@ -143,7 +138,7 @@ :database_id (mt/id) :started_at true :running_time true - :executor_id (test-users/user->id :rasta) + :executor_id (mt/user->id :rasta) :native true :pulse_id nil :card_id nil @@ -188,8 +183,8 @@ ["3" "2014-09-15" "8" "56"] ["4" "2014-03-11" "5" "4"] ["5" "2013-05-05" "3" "49"]] - (let [result ((test-users/user->client :rasta) :post 202 "dataset/csv" :query - (json/generate-string (data/mbql-query checkins)))] + (let [result ((mt/user->client :rasta) :post 202 "dataset/csv" :query + (json/generate-string (mt/mbql-query checkins)))] (take 5 (parse-and-sort-csv result)))))) (deftest download-response-headers-test @@ -201,28 +196,28 @@ "X-Accel-Buffering" "no"} (-> (http-client/client-full-response (test-users/username->token :rasta) :post 202 "dataset/csv" - :query (json/generate-string (data/mbql-query checkins {:limit 1}))) + :query (json/generate-string (mt/mbql-query checkins {:limit 1}))) :headers (select-keys ["Cache-Control" "Content-Disposition" "Content-Type" "Expires" "X-Accel-Buffering"]) (update "Content-Disposition" #(some-> % (str/replace #"query_result_.+(\.\w+)" "query_result_<timestamp>$1")))))))) (deftest check-an-empty-date-column - (is (= [["1" "2014-04-07" "" "5" "12"] - ["2" "2014-09-18" "" "1" "31"] - ["3" "2014-09-15" "" "8" "56"] - ["4" "2014-03-11" "" "5" "4"] - ["5" "2013-05-05" "" "3" "49"]] - (data/dataset defs/test-data-with-null-date-checkins - (let [result ((test-users/user->client :rasta) :post 202 "dataset/csv" :query - (json/generate-string (data/mbql-query checkins)))] + (mt/dataset defs/test-data-with-null-date-checkins + (let [result ((mt/user->client :rasta) :post 202 "dataset/csv" :query + (json/generate-string (mt/mbql-query checkins)))] + (is (= [["1" "2014-04-07" "" "5" "12"] + ["2" "2014-09-18" "" "1" "31"] + ["3" "2014-09-15" "" "8" "56"] + ["4" "2014-03-11" "" "5" "4"] + ["5" "2013-05-05" "" "3" "49"]] (take 5 (parse-and-sort-csv result))))))) (deftest sqlite-datetime-test (mt/test-driver :sqlite (testing "SQLite doesn't return proper date objects but strings, they just pass through the qp untouched" - (let [result ((test-users/user->client :rasta) :post 202 "dataset/csv" :query - (json/generate-string (data/mbql-query checkins {:order-by [[:asc $id]], :limit 5})))] + (let [result ((mt/user->client :rasta) :post 202 "dataset/csv" :query + (json/generate-string (mt/mbql-query checkins {:order-by [[:asc $id]], :limit 5})))] (is (= [["1" "2014-04-07" "5" "12"] ["2" "2014-09-18" "1" "31"] ["3" "2014-09-15" "8" "56"] @@ -231,8 +226,8 @@ (parse-and-sort-csv result))))))) (deftest datetime-fields-are-untouched-when-exported - (let [result ((test-users/user->client :rasta) :post 202 "dataset/csv" :query - (json/generate-string (data/mbql-query users {:order-by [[:asc $id]], :limit 5})))] + (let [result ((mt/user->client :rasta) :post 202 "dataset/csv" :query + (json/generate-string (mt/mbql-query users {:order-by [[:asc $id]], :limit 5})))] (is (= [["1" "Plato Yeshua" "2014-04-01T08:30:00"] ["2" "Felipinho Asklepios" "2014-12-05T15:15:00"] ["3" "Kaneonuskatew Eiran" "2014-11-06T16:15:00"] @@ -241,10 +236,10 @@ (parse-and-sort-csv result))))) (deftest check-that-we-can-export-the-results-of-a-nested-query - (tt/with-temp Card [card {:dataset_query {:database (data/id) + (mt/with-temp Card [card {:dataset_query {:database (mt/id) :type :native :native {:query "SELECT * FROM USERS;"}}}] - (let [result ((test-users/user->client :rasta) :post 202 "dataset/csv" + (let [result ((mt/user->client :rasta) :post 202 "dataset/csv" :query (json/generate-string {:database mbql.s/saved-questions-virtual-database-id :type :query @@ -261,11 +256,11 @@ ;; from one that had it -- see #9831) (deftest formatted-results-ignore-query-constraints (with-redefs [constraints/default-query-constraints {:max-results 10, :max-results-bare-rows 10}] - (let [result ((test-users/user->client :rasta) :post 202 "dataset/csv" + (let [result ((mt/user->client :rasta) :post 202 "dataset/csv" :query (json/generate-string - {:database (data/id) + {:database (mt/id) :type :query - :query {:source-table (data/id :venues)} + :query {:source-table (mt/id :venues)} :middleware {:add-default-userland-constraints? true :userland-query? true}}))] @@ -279,16 +274,16 @@ (deftest non--download--queries-should-still-get-the-default-constraints (with-redefs [constraints/default-query-constraints {:max-results 10, :max-results-bare-rows 10}] (let [{row-count :row_count, :as result} - ((test-users/user->client :rasta) :post 202 "dataset" - {:database (data/id) + ((mt/user->client :rasta) :post 202 "dataset" + {:database (mt/id) :type :query - :query {:source-table (data/id :venues)}})] + :query {:source-table (mt/id :venues)}})] (is (= 10 (or row-count result)))))) (deftest check-permissions-test (testing "make sure `POST /dataset` calls check user permissions" - (data/with-temp-copy-of-db + (mt/with-temp-copy-of-db ;; give all-users *partial* permissions for the DB, so we know we're checking more than just read permissions for ;; the Database (perms/revoke-permissions! (group/all-users) (mt/id)) @@ -322,8 +317,8 @@ :filter [:= $date "2015-11-13"]}))))) (testing "\nshould require that the user have ad-hoc native perms for the DB" - (tu.log/suppress-output - (data/with-temp-copy-of-db + (mt/suppress-output + (mt/with-temp-copy-of-db ;; Give All Users permissions to see the `venues` Table, but not ad-hoc native perms (perms/revoke-permissions! (group/all-users) (mt/id)) (perms/grant-permissions! (group/all-users) (mt/id) "PUBLIC" (mt/id :venues)) @@ -335,9 +330,9 @@ {:fields [$id $name]})))))))))) (deftest report-timezone-test - (datasets/test-driver :postgres + (mt/test-driver :postgres (testing "expected (desired) and actual timezone should be returned as part of query results" - (tu/with-temporary-setting-values [report-timezone "US/Pacific"] + (mt/with-temporary-setting-values [report-timezone "US/Pacific"] (let [results ((mt/user->client :rasta) :post 202 "dataset" (mt/mbql-query checkins {:aggregation [[:count]]}))] (is (= {:requested_timezone "US/Pacific" diff --git a/test/metabase/api/embed_test.clj b/test/metabase/api/embed_test.clj index 6224a8fe7c0..955fcee8ebe 100644 --- a/test/metabase/api/embed_test.clj +++ b/test/metabase/api/embed_test.clj @@ -15,6 +15,7 @@ [http-client :as http] [models :refer [Card Dashboard DashboardCard DashboardCardSeries]] [query-processor-test :as qp.test] + [test :as mt] [util :as u]] [metabase.api [embed :as embed-api] @@ -23,7 +24,6 @@ [metabase.test [data :as data] [util :as tu]] - [metabase.test.util.log :as tu.log] [toucan.db :as db] [toucan.util.test :as tt]) (:import java.io.ByteArrayInputStream)) @@ -202,7 +202,7 @@ (testing (str "...but if the card has an invalid query we should just get a generic \"query failed\" " "exception (rather than leaking query info)") - (tu.log/suppress-output + (mt/suppress-output (with-temp-card [card {:enable_embedding true, :dataset_query {:database (data/id) :type :native :native {:query "SELECT * FROM XYZ"}}}] @@ -405,7 +405,7 @@ (deftest generic-query-failed-exception-test (testing (str "...but if the card has an invalid query we should just get a generic \"query failed\" exception " "(rather than leaking query info)") - (tu.log/suppress-output + (mt/suppress-output (with-embedding-enabled-and-new-secret-key (with-temp-dashcard [dashcard {:dash {:enable_embedding true} :card {:dataset_query (data/native-query {:query "SELECT * FROM XYZ"})}}] @@ -413,7 +413,6 @@ :error "An error occurred while running the query." } (http/client :get 202 (dashcard-url dashcard))))))))) - (deftest check-that-the-dashcard-endpoint-doesn-t-work-if-embedding-isn-t-enabled (tu/with-temporary-setting-values [enable-embedding false] (with-new-secret-key diff --git a/test/metabase/api/public_test.clj b/test/metabase/api/public_test.clj index 588c90ac0d5..5c76552dc97 100644 --- a/test/metabase/api/public_test.clj +++ b/test/metabase/api/public_test.clj @@ -8,19 +8,13 @@ [metabase [http-client :as http] [models :refer [Card Collection Dashboard DashboardCard DashboardCardSeries Dimension Field FieldValues]] - [query-processor-test :as qp.test] [test :as mt] [util :as u]] [metabase.api.public :as public-api] [metabase.models [permissions :as perms] [permissions-group :as group]] - [metabase.test - [data :as data] - [util :as tu]] - [metabase.test.data.users :as test-users] - [toucan.db :as db] - [toucan.util.test :as tt]) + [toucan.db :as db]) (:import java.io.ByteArrayInputStream java.util.UUID)) @@ -32,11 +26,11 @@ (defn- shared-obj [] {:public_uuid (str (UUID/randomUUID)) - :made_public_by_id (test-users/user->id :crowberto)}) + :made_public_by_id (mt/user->id :crowberto)}) (defmacro ^:private with-temp-public-card {:style/indent 1} [[binding & [card]] & body] `(let [card-settings# (merge (count-of-venues-card) (shared-obj) ~card)] - (tt/with-temp Card [card# card-settings#] + (mt/with-temp Card [card# card-settings#] ;; add :public_uuid back in to the value that gets bound because it might not come back from post-select if ;; public sharing is disabled; but we still want to test it (let [~binding (assoc card# :public_uuid (:public_uuid card-settings#))] @@ -47,11 +41,11 @@ {:parameters [{:name "Venue ID" :slug "venue_id" :type "id" - :target [:dimension (data/id :venues :id)] + :target [:dimension (mt/id :venues :id)] :default nil}]} (shared-obj) ~dashboard)] - (tt/with-temp Dashboard [dashboard# dashboard-settings#] + (mt/with-temp Dashboard [dashboard# dashboard-settings#] (let [~binding (assoc dashboard# :public_uuid (:public_uuid dashboard-settings#))] ~@body)))) @@ -74,26 +68,26 @@ ;;; ------------------------------------------- GET /api/public/card/:uuid ------------------------------------------- (deftest check-that-we--cannot--fetch-a-publiccard-if-the-setting-is-disabled - (tu/with-temporary-setting-values [enable-public-sharing false] + (mt/with-temporary-setting-values [enable-public-sharing false] (with-temp-public-card [{uuid :public_uuid}] (is (= "An error occurred." (http/client :get 400 (str "public/card/" uuid))))))) (deftest check-that-we-get-a-400-if-the-publiccard-doesn-t-exist - (tu/with-temporary-setting-values [enable-public-sharing true] + (mt/with-temporary-setting-values [enable-public-sharing true] (is (= "An error occurred." (http/client :get 400 (str "public/card/" (UUID/randomUUID))))))) (deftest check-that-we--cannot--fetch-a-publiccard-if-the-card-has-been-archived - (tu/with-temporary-setting-values [enable-public-sharing true] + (mt/with-temporary-setting-values [enable-public-sharing true] (with-temp-public-card [{uuid :public_uuid} {:archived true}] (is (= "An error occurred." (http/client :get 400 (str "public/card/" uuid))))))) (deftest check-that-we-can-fetch-a-publiccard - (tu/with-temporary-setting-values [enable-public-sharing true] + (mt/with-temporary-setting-values [enable-public-sharing true] (with-temp-public-card [{uuid :public_uuid}] (is (= #{:dataset_query :description :display :id :name :visualization_settings :param_values :param_fields} (set (keys (http/client :get 200 (str "public/card/" uuid))))))))) @@ -101,8 +95,8 @@ (deftest make-sure--param-values-get-returned-as-expected - (tt/with-temp Card [card {:dataset_query - {:database (data/id) + (mt/with-temp Card [card {:dataset_query + {:database (mt/id) :type :native :native {:query (str "SELECT COUNT(*) " "FROM venues " @@ -112,15 +106,15 @@ :template-tags {:category {:name "category" :display-name "Category" :type "dimension" - :dimension ["field-id" (data/id :categories :name)] + :dimension ["field-id" (mt/id :categories :name)] :widget-type "category" :required true}}}}}] - (is (= {(data/id :categories :name) {:values 75 + (is (= {(mt/id :categories :name) {:values 75 :human_readable_values {} - :field_id (data/id :categories :name)}} + :field_id (mt/id :categories :name)}} (-> (:param_values (#'public-api/public-card :id (u/get-id card))) - (update-in [(data/id :categories :name) :values] count) - (update (data/id :categories :name) #(into {} %))))))) + (update-in [(mt/id :categories :name) :values] count) + (update (mt/id :categories :name) #(into {} %))))))) @@ -129,18 +123,18 @@ (deftest check-that-we--cannot--execute-a-publiccard-if-the-setting-is-disabled (is (= "An error occurred." - (tu/with-temporary-setting-values [enable-public-sharing false] + (mt/with-temporary-setting-values [enable-public-sharing false] (with-temp-public-card [{uuid :public_uuid}] (http/client :get 400 (str "public/card/" uuid "/query"))))))) (deftest check-that-we-get-a-400-if-the-publiccard-doesn-t-exist (is (= "An error occurred." - (tu/with-temporary-setting-values [enable-public-sharing true] + (mt/with-temporary-setting-values [enable-public-sharing true] (http/client :get 400 (str "public/card/" (UUID/randomUUID) "/query")))))) (deftest check-that-we--cannot--execute-a-publiccard-if-the-card-has-been-archived (is (= "An error occurred." - (tu/with-temporary-setting-values [enable-public-sharing true] + (mt/with-temporary-setting-values [enable-public-sharing true] (with-temp-public-card [{uuid :public_uuid} {:archived true}] (http/client :get 400 (str "public/card/" uuid "/query"))))))) @@ -152,7 +146,7 @@ (deftest execute-public-card-test (testing "GET /api/public/card/:uuid/query" - (tu/with-temporary-setting-values [enable-public-sharing true] + (mt/with-temporary-setting-values [enable-public-sharing true] (with-temp-public-card [{uuid :public_uuid}] (testing "Default :api response format" (is (= [[100]] @@ -174,7 +168,7 @@ (deftest execute-public-card-as-user-without-perms-test (testing "A user that doesn't have permissions to run the query normally should still be able to run a public Card as if they weren't logged in" (mt/with-temporary-setting-values [enable-public-sharing true] - (tt/with-temp Collection [{collection-id :id}] + (mt/with-temp Collection [{collection-id :id}] (perms/revoke-collection-permissions! (group/all-users) collection-id) (with-temp-public-card [{card-id :id, uuid :public_uuid} {:collection_id collection-id}] (is (= "You don't have permissions to do that." @@ -186,7 +180,7 @@ (deftest check-that-we-can-exec-a-publiccard-with---parameters- (is (= [{:name "Venue ID", :slug "venue_id", :type "id", :value 2}] - (tu/with-temporary-setting-values [enable-public-sharing true] + (mt/with-temporary-setting-values [enable-public-sharing true] (with-temp-public-card [{uuid :public_uuid}] (get-in (http/client :get 202 (str "public/card/" uuid "/query") :parameters (json/encode [{:name "Venue ID", :slug "venue_id", :type "id", :value 2}])) @@ -194,10 +188,10 @@ ;; Cards with required params (defn- do-with-required-param-card [f] - (tu/with-temporary-setting-values [enable-public-sharing true] + (mt/with-temporary-setting-values [enable-public-sharing true] (with-temp-public-card [{uuid :public_uuid} {:dataset_query - {:database (data/id) + {:database (mt/id) :type :native :native {:query "SELECT count(*) FROM venues v WHERE price = {{price}}" :template-tags {"price" {:name "price" @@ -206,45 +200,43 @@ :required true}}}}}] (f uuid)))) +(defmacro ^:private with-required-param-card [[uuid-binding] & body] + `(do-with-required-param-card (fn [~uuid-binding] ~@body))) (deftest should-be-able-to-run-a-card-with-a-required-param - (is (= [[22]] - (do-with-required-param-card - (fn [uuid] - (qp.test/rows + (with-required-param-card [uuid] + (is (= [[22]] + (mt/rows (http/client :get 202 (str "public/card/" uuid "/query") :parameters (json/encode [{:type "category" :target [:variable [:template-tag "price"]] - :value 1}])))))))) + :value 1}]))))))) (deftest missing-required-param-error-message-test (testing (str "If you're missing a required param, the error message should get passed thru, rather than the normal " "generic 'Query Failed' message that we show for most embedding errors") - (is (= {:status "failed" - :error "You'll need to pick a value for 'Price' before this query can run." - :error_type "missing-required-parameter"} - (do-with-required-param-card - (fn [uuid] - (http/client :get 202 (str "public/card/" uuid "/query")))))))) - - + (with-required-param-card [uuid] + (is (= {:status "failed" + :error "You'll need to pick a value for 'Price' before this query can run." + :error_type "missing-required-parameter"} + (http/client :get 202 (str "public/card/" uuid "/query"))))))) (defn- card-with-date-field-filter [] (assoc (shared-obj) - :dataset_query {:database (data/id) + :dataset_query {:database (mt/id) :type :native :native {:query "SELECT COUNT(*) AS \"count\" FROM CHECKINS WHERE {{date}}" :template-tags {:date {:name "date" :display-name "Date" :type "dimension" - :dimension [:field-id (data/id :checkins :date)] + :dimension [:field-id (mt/id :checkins :date)] :widget-type "date/quarter-year"}}}})) (deftest make-sure-csv--etc---downloads-take-editable-params-into-account---6407---- (is (= "count\n107\n" - (tu/with-temporary-setting-values [enable-public-sharing true] - (tt/with-temp Card [{uuid :public_uuid} (card-with-date-field-filter)] + (mt/with-temporary-setting-values [enable-public-sharing true] + (mt/with-temp Card [{uuid :public_uuid} (card-with-date-field-filter)] (http/client :get 202 (str "public/card/" uuid "/query/csv") :parameters (json/encode [{:type :date/quarter-year :target [:dimension [:template-tag :date]] @@ -252,12 +244,12 @@ (deftest make-sure-it-also-works-with-the-forwarded-url - (is (= "count\n107\n" - (tu/with-temporary-setting-values [enable-public-sharing true] - (tt/with-temp Card [{uuid :public_uuid} (card-with-date-field-filter)] - ;; make sure the URL doesn't include /api/ at the beginning like it normally would - (binding [http/*url-prefix* (str/replace http/*url-prefix* #"/api/$" "/")] - (tu/with-temporary-setting-values [site-url http/*url-prefix*] + (mt/with-temporary-setting-values [enable-public-sharing true] + (mt/with-temp Card [{uuid :public_uuid} (card-with-date-field-filter)] + ;; make sure the URL doesn't include /api/ at the beginning like it normally would + (binding [http/*url-prefix* (str/replace http/*url-prefix* #"/api/$" "/")] + (mt/with-temporary-setting-values [site-url http/*url-prefix*] + (is (= "count\n107\n" (http/client :get 202 (str "public/question/" uuid ".csv") :parameters (json/encode [{:type :date/quarter-year :target [:dimension [:template-tag :date]] @@ -265,16 +257,16 @@ (defn- card-with-trendline [] (assoc (shared-obj) - :dataset_query {:database (data/id) + :dataset_query {:database (mt/id) :type :query - :query {:source-table (data/id :checkins) - :breakout [[:datetime-field [:field-id (data/id :checkins :date)] :month]] + :query {:source-table (mt/id :checkins) + :breakout [[:datetime-field [:field-id (mt/id :checkins :date)] :month]] :aggregation [[:count]]}})) (deftest make-sure-we-include-all-the-relevant-fields-like-insights (is (= #{:cols :rows :insights :results_timezone} - (tu/with-temporary-setting-values [enable-public-sharing true] - (tt/with-temp Card [{uuid :public_uuid} (card-with-trendline)] + (mt/with-temporary-setting-values [enable-public-sharing true] + (mt/with-temp Card [{uuid :public_uuid} (card-with-trendline)] (-> (http/client :get 202 (str "public/card/" uuid "/query")) :data keys @@ -285,13 +277,13 @@ (deftest check-that-we--cannot--fetch-publicdashboard-if-setting-is-disabled (is (= "An error occurred." - (tu/with-temporary-setting-values [enable-public-sharing false] + (mt/with-temporary-setting-values [enable-public-sharing false] (with-temp-public-dashboard [{uuid :public_uuid}] (http/client :get 400 (str "public/dashboard/" uuid))))))) (deftest check-that-we-get-a-400-if-the-publicdashboard-doesn-t-exis (is (= "An error occurred." - (tu/with-temporary-setting-values [enable-public-sharing true] + (mt/with-temporary-setting-values [enable-public-sharing true] (http/client :get 400 (str "public/dashboard/" (UUID/randomUUID))))))) (defn- fetch-public-dashboard [{uuid :public_uuid}] @@ -303,13 +295,13 @@ (deftest check-that-we-can-fetch-a-publicdashboard (is (= {:name true, :ordered_cards 1} - (tu/with-temporary-setting-values [enable-public-sharing true] + (mt/with-temporary-setting-values [enable-public-sharing true] (with-temp-public-dashboard-and-card [dash card] (fetch-public-dashboard dash)))))) (deftest check-that-we-don-t-see-cards-that-have-been-archived (is (= {:name true, :ordered_cards 0} - (tu/with-temporary-setting-values [enable-public-sharing true] + (mt/with-temporary-setting-values [enable-public-sharing true] (with-temp-public-dashboard-and-card [dash card] (db/update! Card (u/get-id card), :archived true) (fetch-public-dashboard dash)))))) @@ -323,62 +315,62 @@ (deftest check-that-we--cannot--exec-publiccard-via-publicdashboard-if-setting-is-disabled (is (= "An error occurred." - (tu/with-temporary-setting-values [enable-public-sharing false] + (mt/with-temporary-setting-values [enable-public-sharing false] (with-temp-public-dashboard-and-card [dash card] (http/client :get 400 (dashcard-url dash card))))))) (deftest check-that-we-get-a-400-if-publicdashboard-doesn-t-exist (is (= "An error occurred." - (tu/with-temporary-setting-values [enable-public-sharing true] + (mt/with-temporary-setting-values [enable-public-sharing true] (with-temp-public-dashboard-and-card [_ card] (http/client :get 400 (dashcard-url {:public_uuid (UUID/randomUUID)} card))))))) (deftest check-that-we-get-a-400-if-publiccard-doesn-t-exist (is (= "An error occurred." - (tu/with-temporary-setting-values [enable-public-sharing true] + (mt/with-temporary-setting-values [enable-public-sharing true] (with-temp-public-dashboard-and-card [dash _] (http/client :get 400 (dashcard-url dash Integer/MAX_VALUE))))))) (deftest check-that-we-get-a-400-if-the-card-does-exist-but-it-s-not-part-of-this-dashboard (is (= "An error occurred." - (tu/with-temporary-setting-values [enable-public-sharing true] + (mt/with-temporary-setting-values [enable-public-sharing true] (with-temp-public-dashboard-and-card [dash _] - (tt/with-temp Card [card] + (mt/with-temp Card [card] (http/client :get 400 (dashcard-url dash card)))))))) (deftest check-that-we--cannot--execute-a-publiccard-via-a-publicdashboard-if-the-card-has-been-archived (is (= "An error occurred." - (tu/with-temporary-setting-values [enable-public-sharing true] + (mt/with-temporary-setting-values [enable-public-sharing true] (with-temp-public-dashboard-and-card [dash card] (db/update! Card (u/get-id card), :archived true) (http/client :get 400 (dashcard-url dash card))))))) (deftest check-that-we-can-exec-a-publiccard-via-a-publicdashboard (is (= [[100]] - (tu/with-temporary-setting-values [enable-public-sharing true] + (mt/with-temporary-setting-values [enable-public-sharing true] (with-temp-public-dashboard-and-card [dash card] - (qp.test/rows (http/client :get 202 (dashcard-url dash card)))))))) + (mt/rows (http/client :get 202 (dashcard-url dash card)))))))) (deftest check-that-we-can-exec-a-publiccard-via-a-publicdashboard-with---parameters- (is (= [{:name "Venue ID" :slug "venue_id" - :target ["dimension" (data/id :venues :id)] + :target ["dimension" (mt/id :venues :id)] :value [10] :default nil :type "id"}] - (tu/with-temporary-setting-values [enable-public-sharing true] + (mt/with-temporary-setting-values [enable-public-sharing true] (with-temp-public-dashboard-and-card [dash card] (get-in (http/client :get 202 (dashcard-url dash card) :parameters (json/encode [{:name "Venue ID" :slug :venue_id - :target [:dimension (data/id :venues :id)] + :target [:dimension (mt/id :venues :id)] :value [10]}])) [:json_query :parameters])))))) (deftest execute-public-dashcard-as-user-without-perms-test (testing "A user that doesn't have permissions to run the query normally should still be able to run a public DashCard" (mt/with-temporary-setting-values [enable-public-sharing true] - (tt/with-temp Collection [{collection-id :id}] + (mt/with-temp Collection [{collection-id :id}] (perms/revoke-collection-permissions! (group/all-users) collection-id) (with-temp-public-dashboard-and-card [dash {card-id :id, :as card}] (db/update! Card card-id :collection_id collection-id) @@ -393,42 +385,42 @@ ;; Make sure params are validated: this should pass because venue_id *is* one of the Dashboard's :parameters (deftest params-are-validated (is (= [[1]] - (tu/with-temporary-setting-values [enable-public-sharing true] + (mt/with-temporary-setting-values [enable-public-sharing true] (with-temp-public-dashboard-and-card [dash card] (-> (http/client :get 202 (dashcard-url dash card) :parameters (json/encode [{:name "Venue ID" :slug :venue_id - :target [:dimension (data/id :venues :id)] + :target [:dimension (mt/id :venues :id)] :value [10]}])) - qp.test/rows)))))) + mt/rows)))))) (deftest make-sure-params-are-validated--this-should-fail-because-venue-name-is--not--one-of-the-dashboard-s--parameters (is (= "An error occurred." - (tu/with-temporary-setting-values [enable-public-sharing true] + (mt/with-temporary-setting-values [enable-public-sharing true] (with-temp-public-dashboard-and-card [dash card] (http/client :get 400 (dashcard-url dash card) :parameters (json/encode [{:name "Venue Name" :slug :venue_name - :target [:dimension (data/id :venues :name)] + :target [:dimension (mt/id :venues :name)] :value ["PizzaHacker"]}]))))))) (deftest check-that-an-additional-card-series-works-as-well (is (= [[100]] - (tu/with-temporary-setting-values [enable-public-sharing true] + (mt/with-temporary-setting-values [enable-public-sharing true] (with-temp-public-dashboard-and-card [dash card] (with-temp-public-card [card-2] - (tt/with-temp DashboardCardSeries [_ {:dashboardcard_id (db/select-one-id DashboardCard + (mt/with-temp DashboardCardSeries [_ {:dashboardcard_id (db/select-one-id DashboardCard :card_id (u/get-id card) :dashboard_id (u/get-id dash)) :card_id (u/get-id card-2)}] - (qp.test/rows (http/client :get 202 (dashcard-url dash card-2)))))))))) + (mt/rows (http/client :get 202 (dashcard-url dash card-2)))))))))) (deftest make-sure-that-parameters-actually-work-correctly---7212- (is (= [[50]] - (tu/with-temporary-setting-values [enable-public-sharing true] - (tt/with-temp Card [card {:dataset_query {:database (data/id) + (mt/with-temporary-setting-values [enable-public-sharing true] + (mt/with-temp Card [card {:dataset_query {:database (mt/id) :type :native :native {:query "SELECT {{num}} AS num" :template-tags {:num {:name "num" @@ -445,21 +437,21 @@ :target [:variable [:template-tag :num]] :parameter_id "537e37b4"}]) - (-> ((test-users/user->client :crowberto) + (-> ((mt/user->client :crowberto) :get (str (dashcard-url dash card) "?parameters=" (json/generate-string [{:type :category :target [:variable [:template-tag :num]] :value "50"}]))) - qp.test/rows))))))) + mt/rows))))))) (deftest ---with-mbql-cards-as-well--- (is (= [[1]] - (tu/with-temporary-setting-values [enable-public-sharing true] - (tt/with-temp Card [card {:dataset_query {:database (data/id) + (mt/with-temporary-setting-values [enable-public-sharing true] + (mt/with-temp Card [card {:dataset_query {:database (mt/id) :type :query - :query {:source-table (data/id :venues) + :query {:source-table (mt/id :venues) :aggregation [:count]}}}] (with-temp-public-dashboard [dash {:parameters [{:name "Venue ID" :slug "venue_id" @@ -470,23 +462,23 @@ :card_id (u/get-id card) :target [:dimension [:field-id - (data/id :venues :id)]]}]) - (-> ((test-users/user->client :crowberto) + (mt/id :venues :id)]]}]) + (-> ((mt/user->client :crowberto) :get (str (dashcard-url dash card) "?parameters=" (json/generate-string [{:type :id - :target [:dimension [:field-id (data/id :venues :id)]] + :target [:dimension [:field-id (mt/id :venues :id)]] :value "50"}]))) - qp.test/rows))))))) + mt/rows))))))) (deftest ---and-also-for-datetime-params (is (= [[733]] - (tu/with-temporary-setting-values [enable-public-sharing true] - (tt/with-temp Card [card {:dataset_query {:database (data/id) + (mt/with-temporary-setting-values [enable-public-sharing true] + (mt/with-temp Card [card {:dataset_query {:database (mt/id) :type :query - :query {:source-table (data/id :checkins) + :query {:source-table (mt/id :checkins) :aggregation [:count]}}}] (with-temp-public-dashboard [dash {:parameters [{:name "Date Filter" :slug "date_filter" @@ -497,15 +489,15 @@ :card_id (u/get-id card) :target [:dimension [:field-id - (data/id :checkins :date)]]}]) - (-> ((test-users/user->client :crowberto) + (mt/id :checkins :date)]]}]) + (-> ((mt/user->client :crowberto) :get (str (dashcard-url dash card) "?parameters=" (json/generate-string [{:type "date/all-options" - :target [:dimension [:field-id (data/id :checkins :date)]] + :target [:dimension [:field-id (mt/id :checkins :date)]] :value "~2015-01-01"}]))) - qp.test/rows))))))) + mt/rows))))))) ;; make sure DimensionValue params also work if they have a default value, even if some is passed in for some reason @@ -515,8 +507,8 @@ (deftest dimensionvalue-params-work (is (= [["Wow"]] - (tu/with-temporary-setting-values [enable-public-sharing true] - (tt/with-temp Card [card {:dataset_query {:database (data/id) + (mt/with-temporary-setting-values [enable-public-sharing true] + (mt/with-temp Card [card {:dataset_query {:database (mt/id) :type :native :native {:query "SELECT {{msg}} AS message" :template-tags {:msg {:id "181da7c5" @@ -533,7 +525,7 @@ :parameter_mappings [{:card_id (u/get-id card) :target [:variable [:template-tag :msg]] :parameter_id "181da7c5"}]) - (-> ((test-users/user->client :crowberto) + (-> ((mt/user->client :crowberto) :get (str (dashcard-url dash card) "?parameters=" (json/generate-string @@ -541,18 +533,18 @@ :target [:variable [:template-tag :msg]] :value nil :default "Hello"}]))) - qp.test/rows))))))) + mt/rows))))))) ;;; --------------------------- Check that parameter information comes back with Dashboard --------------------------- (deftest double-check-that-the-field-has-fieldvalues (is (= [1 2 3 4] - (db/select-one-field :values FieldValues :field_id (data/id :venues :price))))) + (db/select-one-field :values FieldValues :field_id (mt/id :venues :price))))) (defn- price-param-values [] - {(keyword (str (data/id :venues :price))) {:values [1 2 3 4] + {(keyword (str (mt/id :venues :price))) {:values [1 2 3 4] :human_readable_values {} - :field_id (data/id :venues :price)}}) + :field_id (mt/id :venues :price)}}) (defn- add-price-param-to-dashboard! [dashboard] (db/update! Dashboard (u/get-id dashboard) :parameters [{:name "Price", :type "category", :slug "price"}])) @@ -562,19 +554,19 @@ :target ["dimension" dimension]}])) (defn- GET-param-values [dashboard] - (tu/with-temporary-setting-values [enable-public-sharing true] + (mt/with-temporary-setting-values [enable-public-sharing true] (:param_values (http/client :get 200 (str "public/dashboard/" (:public_uuid dashboard)))))) (deftest check-that-param-info-comes-back-for-sql-cards (is (= (price-param-values) (with-temp-public-dashboard-and-card [dash card dashcard] (db/update! Card (u/get-id card) - :dataset_query {:database (data/id) + :dataset_query {:database (mt/id) :type :native :native {:template-tags {:price {:name "price" :display-name "Price" :type "dimension" - :dimension ["field-id" (data/id :venues :price)]}}}}) + :dimension ["field-id" (mt/id :venues :price)]}}}}) (add-price-param-to-dashboard! dash) (add-dimension-param-mapping-to-dashcard! dashcard card ["template-tag" "price"]) (GET-param-values dash))))) @@ -583,14 +575,14 @@ (is (= (price-param-values) (with-temp-public-dashboard-and-card [dash card dashcard] (add-price-param-to-dashboard! dash) - (add-dimension-param-mapping-to-dashcard! dashcard card ["field-id" (data/id :venues :price)]) + (add-dimension-param-mapping-to-dashcard! dashcard card ["field-id" (mt/id :venues :price)]) (GET-param-values dash))))) (deftest check-that-param-info-comes-back-for-mbql-cards--fk--- (is (= (price-param-values) (with-temp-public-dashboard-and-card [dash card dashcard] (add-price-param-to-dashboard! dash) - (add-dimension-param-mapping-to-dashcard! dashcard card ["fk->" (data/id :checkins :venue_id) (data/id :venues :price)]) + (add-dimension-param-mapping-to-dashcard! dashcard card ["fk->" (mt/id :checkins :venue_id) (mt/id :venues :price)]) (GET-param-values dash))))) ;;; +----------------------------------------------------------------------------------------------------------------+ @@ -598,60 +590,60 @@ ;;; +----------------------------------------------------------------------------------------------------------------+ (defn- mbql-card-referencing-nothing [] - {:dataset_query {:database (data/id) + {:dataset_query {:database (mt/id) :type :query - :query {:source-table (data/id :venues)}}}) + :query {:source-table (mt/id :venues)}}}) (defn mbql-card-referencing [table-kw field-kw] {:dataset_query - {:database (data/id) + {:database (mt/id) :type :query - :query {:source-table (data/id table-kw) - :filter [:= [:field-id (data/id table-kw field-kw)] "Krua Siri"]}}}) + :query {:source-table (mt/id table-kw) + :filter [:= [:field-id (mt/id table-kw field-kw)] "Krua Siri"]}}}) (defn- mbql-card-referencing-venue-name [] (mbql-card-referencing :venues :name)) (defn- sql-card-referencing-venue-name [] {:dataset_query - {:database (data/id) + {:database (mt/id) :type :native :native {:query "SELECT COUNT(*) FROM VENUES WHERE {{x}}" :template-tags {:x {:name :x :display-name "X" :type :dimension - :dimension [:field-id (data/id :venues :name)]}}}}}) + :dimension [:field-id (mt/id :venues :name)]}}}}}) ;;; ------------------------------------------- card->referenced-field-ids ------------------------------------------- (deftest card-referencing-nothing (is (= #{} - (tt/with-temp Card [card (mbql-card-referencing-nothing)] + (mt/with-temp Card [card (mbql-card-referencing-nothing)] (#'public-api/card->referenced-field-ids card))))) (deftest it-should-pick-up-on-fields-referenced-in-the-mbql-query-itself - (is (= #{(data/id :venues :name)} - (tt/with-temp Card [card (mbql-card-referencing-venue-name)] + (is (= #{(mt/id :venues :name)} + (mt/with-temp Card [card (mbql-card-referencing-venue-name)] (#'public-api/card->referenced-field-ids card))))) (deftest ---as-well-as-template-tag--implict--params-for-sql-queries - (is (= #{(data/id :venues :name)} - (tt/with-temp Card [card (sql-card-referencing-venue-name)] + (is (= #{(mt/id :venues :name)} + (mt/with-temp Card [card (sql-card-referencing-venue-name)] (#'public-api/card->referenced-field-ids card))))) ;;; --------------------------------------- check-field-is-referenced-by-card ---------------------------------------- (deftest check-that-the-check-succeeds-when-field-is-referenced - (tt/with-temp Card [card (mbql-card-referencing-venue-name)] - (#'public-api/check-field-is-referenced-by-card (data/id :venues :name) (u/get-id card)))) + (mt/with-temp Card [card (mbql-card-referencing-venue-name)] + (#'public-api/check-field-is-referenced-by-card (mt/id :venues :name) (u/get-id card)))) (deftest check-that-exception-is-thrown-if-the-field-isn-t-referenced (is (thrown? Exception - (tt/with-temp Card [card (mbql-card-referencing-venue-name)] - (#'public-api/check-field-is-referenced-by-card (data/id :venues :category_id) (u/get-id card)))))) + (mt/with-temp Card [card (mbql-card-referencing-venue-name)] + (#'public-api/check-field-is-referenced-by-card (mt/id :venues :category_id) (u/get-id card)))))) ;;; ----------------------------------------- check-search-field-is-allowed ------------------------------------------ @@ -659,30 +651,30 @@ ;; search field is allowed IF: ;; A) search-field is the same field as the other one (deftest search-field-allowed-if-same-field-as-other-one - (#'public-api/check-search-field-is-allowed (data/id :venues :id) (data/id :venues :id)) + (#'public-api/check-search-field-is-allowed (mt/id :venues :id) (mt/id :venues :id)) (is (thrown? Exception - (#'public-api/check-search-field-is-allowed (data/id :venues :id) (data/id :venues :category_id))))) + (#'public-api/check-search-field-is-allowed (mt/id :venues :id) (mt/id :venues :category_id))))) ;; B) there's a Dimension that lists search field as the human_readable_field for the other field (deftest search-field-allowed-with-dimension - (is (tt/with-temp Dimension [_ {:field_id (data/id :venues :id), :human_readable_field_id (data/id :venues :category_id)}] - (#'public-api/check-search-field-is-allowed (data/id :venues :id) (data/id :venues :category_id))))) + (is (mt/with-temp Dimension [_ {:field_id (mt/id :venues :id), :human_readable_field_id (mt/id :venues :category_id)}] + (#'public-api/check-search-field-is-allowed (mt/id :venues :id) (mt/id :venues :category_id))))) ;; C) search-field is a Name Field belonging to the same table as the other field, which is a PK (deftest search-field-allowed-with-name-field - (is (#'public-api/check-search-field-is-allowed (data/id :venues :id) (data/id :venues :name)))) + (is (#'public-api/check-search-field-is-allowed (mt/id :venues :id) (mt/id :venues :name)))) ;; not allowed if search field isn't a NAME (deftest search-field-not-allowed-if-search-field-isnt-a-name (is (thrown? Exception - (tu/with-temp-vals-in-db Field (data/id :venues :name) {:special_type "type/Latitude"} - (#'public-api/check-search-field-is-allowed (data/id :venues :id) (data/id :venues :name)))))) + (mt/with-temp-vals-in-db Field (mt/id :venues :name) {:special_type "type/Latitude"} + (#'public-api/check-search-field-is-allowed (mt/id :venues :id) (mt/id :venues :name)))))) (deftest not-allowed-if-search-field-belongs-to-a-different-table (is (thrown? Exception - (tu/with-temp-vals-in-db Field (data/id :categories :name) {:special_type "type/Name"} - (#'public-api/check-search-field-is-allowed (data/id :venues :id) (data/id :categories :name)))))) + (mt/with-temp-vals-in-db Field (mt/id :categories :name) {:special_type "type/Name"} + (#'public-api/check-search-field-is-allowed (mt/id :venues :id) (mt/id :categories :name)))))) @@ -692,35 +684,35 @@ {:dashboard_id (u/get-id dashboard) :card_id (u/get-id card) :parameter_mappings [{:card_id (u/get-id card) - :target [:dimension [:field-id (data/id :venues :id)]]}]}) + :target [:dimension [:field-id (mt/id :venues :id)]]}]}) (deftest field-is--referenced--by-dashboard-if-it-s-one-of-the-dashboard-s-params--- - (is (tt/with-temp* [Dashboard [dashboard] + (is (mt/with-temp* [Dashboard [dashboard] Card [card] DashboardCard [_ (dashcard-with-param-mapping-to-venue-id dashboard card)]] - (#'public-api/check-field-is-referenced-by-dashboard (data/id :venues :id) (u/get-id dashboard))))) + (#'public-api/check-field-is-referenced-by-dashboard (mt/id :venues :id) (u/get-id dashboard))))) (deftest TODO-name-this-exception (is (thrown? Exception - (tt/with-temp* [Dashboard [dashboard] + (mt/with-temp* [Dashboard [dashboard] Card [card] DashboardCard [_ (dashcard-with-param-mapping-to-venue-id dashboard card)]] - (#'public-api/check-field-is-referenced-by-dashboard (data/id :venues :name) (u/get-id dashboard)))))) + (#'public-api/check-field-is-referenced-by-dashboard (mt/id :venues :name) (u/get-id dashboard)))))) ;; ...*or* if it's a so-called "implicit" param (a Field Filter Template Tag (FFTT) in a SQL Card) (deftest implicit-param - (is (tt/with-temp* [Dashboard [dashboard] + (is (mt/with-temp* [Dashboard [dashboard] Card [card (sql-card-referencing-venue-name)] DashboardCard [_ {:dashboard_id (u/get-id dashboard), :card_id (u/get-id card)}]] - (#'public-api/check-field-is-referenced-by-dashboard (data/id :venues :name) (u/get-id dashboard)))) + (#'public-api/check-field-is-referenced-by-dashboard (mt/id :venues :name) (u/get-id dashboard)))) (is (thrown? Exception - (tt/with-temp* [Dashboard [dashboard] + (mt/with-temp* [Dashboard [dashboard] Card [card (sql-card-referencing-venue-name)] DashboardCard [_ {:dashboard_id (u/get-id dashboard), :card_id (u/get-id card)}]] - (#'public-api/check-field-is-referenced-by-dashboard (data/id :venues :id) (u/get-id dashboard)))))) + (#'public-api/check-field-is-referenced-by-dashboard (mt/id :venues :id) (u/get-id dashboard)))))) ;;; ------------------------------------------- card-and-field-id->values -------------------------------------------- @@ -730,9 +722,9 @@ ["33 Taps"] ["800 Degrees Neapolitan Pizzeria"] ["BCD Tofu House"]] - :field_id (data/id :venues :name)} - (tt/with-temp Card [card (mbql-card-referencing :venues :name)] - (into {} (-> (public-api/card-and-field-id->values (u/get-id card) (data/id :venues :name)) + :field_id (mt/id :venues :name)} + (mt/with-temp Card [card (mbql-card-referencing :venues :name)] + (into {} (-> (public-api/card-and-field-id->values (u/get-id card) (mt/id :venues :name)) (update :values (partial take 5)))))))) (deftest sql-param-field-references-should-work-just-as-well-as-mbql-field-referenced @@ -741,17 +733,17 @@ ["33 Taps"] ["800 Degrees Neapolitan Pizzeria"] ["BCD Tofu House"]] - :field_id (data/id :venues :name)} - (tt/with-temp Card [card (sql-card-referencing-venue-name)] - (into {} (-> (public-api/card-and-field-id->values (u/get-id card) (data/id :venues :name)) + :field_id (mt/id :venues :name)} + (mt/with-temp Card [card (sql-card-referencing-venue-name)] + (into {} (-> (public-api/card-and-field-id->values (u/get-id card) (mt/id :venues :name)) (update :values (partial take 5)))))))) (deftest but-if-the-field-is-not-referenced-we-should-get-an-exception (is (thrown? Exception - (tt/with-temp Card [card (mbql-card-referencing :venues :price)] - (public-api/card-and-field-id->values (u/get-id card) (data/id :venues :name)))))) + (mt/with-temp Card [card (mbql-card-referencing :venues :price)] + (public-api/card-and-field-id->values (u/get-id card) (mt/id :venues :name)))))) @@ -768,8 +760,8 @@ "/values")) (defn- do-with-sharing-enabled-and-temp-card-referencing {:style/indent 2} [table-kw field-kw f] - (tu/with-temporary-setting-values [enable-public-sharing true] - (tt/with-temp Card [card (merge (shared-obj) (mbql-card-referencing table-kw field-kw))] + (mt/with-temporary-setting-values [enable-public-sharing true] + (mt/with-temp Card [card (merge (shared-obj) (mbql-card-referencing table-kw field-kw))] (f card)))) (defmacro ^:private with-sharing-enabled-and-temp-card-referencing @@ -786,34 +778,34 @@ ["33 Taps"] ["800 Degrees Neapolitan Pizzeria"] ["BCD Tofu House"]] - :field_id (data/id :venues :name)} + :field_id (mt/id :venues :name)} (with-sharing-enabled-and-temp-card-referencing :venues :name [card] - (-> (http/client :get 200 (field-values-url card (data/id :venues :name))) + (-> (http/client :get 200 (field-values-url card (mt/id :venues :name))) (update :values (partial take 5))))))) (deftest but-for-fields-that-are-not-referenced-we-should-get-an-exception (is (= "An error occurred." (with-sharing-enabled-and-temp-card-referencing :venues :name [card] - (http/client :get 400 (field-values-url card (data/id :venues :price))))))) + (http/client :get 400 (field-values-url card (mt/id :venues :price))))))) (deftest field-value-endpoint-should-fail-if-public-sharing-is-disabled (is (= "An error occurred." (with-sharing-enabled-and-temp-card-referencing :venues :name [card] - (tu/with-temporary-setting-values [enable-public-sharing false] - (http/client :get 400 (field-values-url card (data/id :venues :name)))))))) + (mt/with-temporary-setting-values [enable-public-sharing false] + (http/client :get 400 (field-values-url card (mt/id :venues :name)))))))) ;;; ----------------------------- GET /api/public/dashboard/:uuid/field/:field-id/values ----------------------------- (defn do-with-sharing-enabled-and-temp-dashcard-referencing {:style/indent 2} [table-kw field-kw f] - (tu/with-temporary-setting-values [enable-public-sharing true] - (tt/with-temp* [Dashboard [dashboard (shared-obj)] + (mt/with-temporary-setting-values [enable-public-sharing true] + (mt/with-temp* [Dashboard [dashboard (shared-obj)] Card [card (mbql-card-referencing table-kw field-kw)] DashboardCard [dashcard {:dashboard_id (u/get-id dashboard) :card_id (u/get-id card) :parameter_mappings [{:card_id (u/get-id card) :target [:dimension [:field-id - (data/id table-kw field-kw)]]}]}]] + (mt/id table-kw field-kw)]]}]}]] (f dashboard card dashcard)))) (defmacro with-sharing-enabled-and-temp-dashcard-referencing @@ -829,21 +821,21 @@ ["33 Taps"] ["800 Degrees Neapolitan Pizzeria"] ["BCD Tofu House"]] - :field_id (data/id :venues :name)} + :field_id (mt/id :venues :name)} (with-sharing-enabled-and-temp-dashcard-referencing :venues :name [dashboard] - (-> (http/client :get 200 (field-values-url dashboard (data/id :venues :name))) + (-> (http/client :get 200 (field-values-url dashboard (mt/id :venues :name))) (update :values (partial take 5))))))) (deftest shound-not-be-able-to-use-the-endpoint-with-a-field-not-referenced-by-the-dashboard (is (= "An error occurred." (with-sharing-enabled-and-temp-dashcard-referencing :venues :name [dashboard] - (http/client :get 400 (field-values-url dashboard (data/id :venues :price))))))) + (http/client :get 400 (field-values-url dashboard (mt/id :venues :price))))))) (deftest endpoint-should-fail-if-public-sharing-is-disabled (is (= "An error occurred." (with-sharing-enabled-and-temp-dashcard-referencing :venues :name [dashboard] - (tu/with-temporary-setting-values [enable-public-sharing false] - (http/client :get 400 (field-values-url dashboard (data/id :venues :name)))))))) + (mt/with-temporary-setting-values [enable-public-sharing false] + (http/client :get 400 (field-values-url dashboard (mt/id :venues :name)))))))) @@ -853,21 +845,21 @@ (deftest search-card-fields (is (= [[93 "33 Taps"]] (with-sharing-enabled-and-temp-card-referencing :venues :id [card] - (public-api/search-card-fields (u/get-id card) (data/id :venues :id) (data/id :venues :name) "33 T" 10))))) + (public-api/search-card-fields (u/get-id card) (mt/id :venues :id) (mt/id :venues :name) "33 T" 10))))) (deftest shouldn-t-work-if-the-search-field-isn-t-allowed-to-be-used-in-combination-with-the-other-field (is (thrown? Exception (with-sharing-enabled-and-temp-card-referencing :venues :id [card] - (public-api/search-card-fields (u/get-id card) (data/id :venues :id) (data/id :venues :price) "33 T" 10))))) + (public-api/search-card-fields (u/get-id card) (mt/id :venues :id) (mt/id :venues :price) "33 T" 10))))) (deftest shouldn-t-work-if-the-field-isn-t-referenced-by-card (is (thrown? Exception (with-sharing-enabled-and-temp-card-referencing :venues :name [card] - (public-api/search-card-fields (u/get-id card) (data/id :venues :id) (data/id :venues :id) "33 T" 10))))) + (public-api/search-card-fields (u/get-id card) (mt/id :venues :id) (mt/id :venues :id) "33 T" 10))))) @@ -885,20 +877,20 @@ (deftest field-search-with-venue (is (= [[93 "33 Taps"]] (with-sharing-enabled-and-temp-card-referencing :venues :id [card] - (http/client :get 200 (field-search-url card (data/id :venues :id) (data/id :venues :name)) + (http/client :get 200 (field-search-url card (mt/id :venues :id) (mt/id :venues :name)) :value "33 T"))))) (deftest if-search-field-isn-t-allowed-to-be-used-with-the-other-field-endpoint-should-return-exception (is (= "An error occurred." (with-sharing-enabled-and-temp-card-referencing :venues :id [card] - (http/client :get 400 (field-search-url card (data/id :venues :id) (data/id :venues :price)) + (http/client :get 400 (field-search-url card (mt/id :venues :id) (mt/id :venues :price)) :value "33 T"))))) (deftest search-endpoint-should-fail-if-public-sharing-is-disabled (is (= "An error occurred." (with-sharing-enabled-and-temp-card-referencing :venues :id [card] - (tu/with-temporary-setting-values [enable-public-sharing false] - (http/client :get 400 (field-search-url card (data/id :venues :id) (data/id :venues :name)) + (mt/with-temporary-setting-values [enable-public-sharing false] + (http/client :get 400 (field-search-url card (mt/id :venues :id) (mt/id :venues :name)) :value "33 T")))))) @@ -909,20 +901,20 @@ (deftest dashboard (is (= [[93 "33 Taps"]] (with-sharing-enabled-and-temp-dashcard-referencing :venues :id [dashboard] - (http/client :get (field-search-url dashboard (data/id :venues :id) (data/id :venues :name)) + (http/client :get (field-search-url dashboard (mt/id :venues :id) (mt/id :venues :name)) :value "33 T"))))) (deftest dashboard-if-search-field-isn-t-allowed-to-be-used-with-the-other-field-endpoint-should-return-exception (is (= "An error occurred." (with-sharing-enabled-and-temp-dashcard-referencing :venues :id [dashboard] - (http/client :get 400 (field-search-url dashboard (data/id :venues :id) (data/id :venues :price)) + (http/client :get 400 (field-search-url dashboard (mt/id :venues :id) (mt/id :venues :price)) :value "33 T"))))) (deftest dashboard-endpoint-should-fail-if-public-sharing-is-disabled (is (= "An error occurred." (with-sharing-enabled-and-temp-dashcard-referencing :venues :id [dashboard] - (tu/with-temporary-setting-values [enable-public-sharing false] - (http/client :get 400 (field-search-url dashboard (data/id :venues :name) (data/id :venues :name)) + (mt/with-temporary-setting-values [enable-public-sharing false] + (http/client :get 400 (field-search-url dashboard (mt/id :venues :name) (mt/id :venues :name)) :value "33 T")))))) ;;; --------------------------------------------- field-remapped-values ---------------------------------------------- @@ -932,11 +924,11 @@ (deftest should-parse-string (is (= [10 "Fred 62"] - (#'public-api/field-remapped-values (data/id :venues :id) (data/id :venues :name) "10")))) + (#'public-api/field-remapped-values (mt/id :venues :id) (mt/id :venues :name) "10")))) (deftest if-the-field-isn-t-allowed (is (thrown? Exception - (#'public-api/field-remapped-values (data/id :venues :id) (data/id :venues :price) "10")))) + (#'public-api/field-remapped-values (mt/id :venues :id) (mt/id :venues :price) "10")))) ;;; ----------------------- GET /api/public/card/:uuid/field/:field-id/remapping/:remapped-id ------------------------ @@ -953,27 +945,27 @@ (deftest we-should-be-able-to-use-the-api-endpoint-and-get-the-same-results-we-get-by-calling-the-function-above-directly (is (= [10 "Fred 62"] (with-sharing-enabled-and-temp-card-referencing :venues :id [card] - (http/client :get 200 (field-remapping-url card (data/id :venues :id) (data/id :venues :name)) + (http/client :get 200 (field-remapping-url card (mt/id :venues :id) (mt/id :venues :name)) :value "10"))))) (deftest shouldn-t-work-if-card-doesn-t-reference-the-field-in-question (is (= "An error occurred." (with-sharing-enabled-and-temp-card-referencing :venues :price [card] - (http/client :get 400 (field-remapping-url card (data/id :venues :id) (data/id :venues :name)) + (http/client :get 400 (field-remapping-url card (mt/id :venues :id) (mt/id :venues :name)) :value "10"))))) (deftest ---or-if-the-remapping-field-isn-t-allowed-to-be-used-with-the-other-field (is (= "An error occurred." (with-sharing-enabled-and-temp-card-referencing :venues :id [card] - (http/client :get 400 (field-remapping-url card (data/id :venues :id) (data/id :venues :price)) + (http/client :get 400 (field-remapping-url card (mt/id :venues :id) (mt/id :venues :price)) :value "10"))))) (deftest ---or-if-public-sharing-is-disabled (is (= "An error occurred." (with-sharing-enabled-and-temp-card-referencing :venues :id [card] - (tu/with-temporary-setting-values [enable-public-sharing false] - (http/client :get 400 (field-remapping-url card (data/id :venues :id) (data/id :venues :name)) + (mt/with-temporary-setting-values [enable-public-sharing false] + (http/client :get 400 (field-remapping-url card (mt/id :venues :id) (mt/id :venues :name)) :value "10")))))) ;;; --------------------- GET /api/public/dashboard/:uuid/field/:field-id/remapping/:remapped-id --------------------- @@ -982,24 +974,24 @@ (deftest api-endpoint-should-return-same-results-as-function (is (= [10 "Fred 62"] (with-sharing-enabled-and-temp-dashcard-referencing :venues :id [dashboard] - (http/client :get 200 (field-remapping-url dashboard (data/id :venues :id) (data/id :venues :name)) + (http/client :get 200 (field-remapping-url dashboard (mt/id :venues :id) (mt/id :venues :name)) :value "10"))))) (deftest field-remapping-shouldn-t-work-if-card-doesn-t-reference-the-field-in-question (is (= "An error occurred." (with-sharing-enabled-and-temp-dashcard-referencing :venues :price [dashboard] - (http/client :get 400 (field-remapping-url dashboard (data/id :venues :id) (data/id :venues :name)) + (http/client :get 400 (field-remapping-url dashboard (mt/id :venues :id) (mt/id :venues :name)) :value "10"))))) (deftest remapping-or-if-the-remapping-field-isn-t-allowed-to-be-used-with-the-other-field (is (= "An error occurred." (with-sharing-enabled-and-temp-dashcard-referencing :venues :id [dashboard] - (http/client :get 400 (field-remapping-url dashboard (data/id :venues :id) (data/id :venues :price)) + (http/client :get 400 (field-remapping-url dashboard (mt/id :venues :id) (mt/id :venues :price)) :value "10"))))) (deftest remapping-or-if-public-sharing-is-disabled (is (= "An error occurred." (with-sharing-enabled-and-temp-dashcard-referencing :venues :id [dashboard] - (tu/with-temporary-setting-values [enable-public-sharing false] - (http/client :get 400 (field-remapping-url dashboard (data/id :venues :id) (data/id :venues :name)) + (mt/with-temporary-setting-values [enable-public-sharing false] + (http/client :get 400 (field-remapping-url dashboard (mt/id :venues :id) (mt/id :venues :name)) :value "10")))))) diff --git a/test/metabase/api/pulse_test.clj b/test/metabase/api/pulse_test.clj index bdccb94bddd..8a81833c5ca 100644 --- a/test/metabase/api/pulse_test.clj +++ b/test/metabase/api/pulse_test.clj @@ -203,36 +203,36 @@ Card [card-2 {:name "The card" :description "Info" :display :table}]] - (merge - pulse-defaults - {:name "A Pulse" - :creator_id (user->id :rasta) - :creator (user-details (fetch-user :rasta)) - :cards (for [card [card-1 card-2]] - (assoc (pulse-card-details card) - :collection_id true)) - :channels [(merge pulse-channel-defaults - {:channel_type "email" - :schedule_type "daily" - :schedule_hour 12 - :recipients []})] - :collection_id true}) - (card-api-test/with-cards-in-readable-collection [card-1 card-2] - (tt/with-temp Collection [collection] - (perms/grant-collection-readwrite-permissions! (perms-group/all-users) collection) - (tu/with-model-cleanup [Pulse] - (-> ((user->client :rasta) :post 200 "pulse" {:name "A Pulse" - :collection_id (u/get-id collection) - :cards [{:id (u/get-id card-1) - :include_csv false - :include_xls false} - (-> card-2 - (select-keys [:id :name :description :display :collection_id]) - (assoc :include_csv false, :include_xls false))] - :channels [daily-email-channel] - :skip_if_empty false}) - pulse-response - (update :channels remove-extra-channels-fields)))))) + (merge + pulse-defaults + {:name "A Pulse" + :creator_id (user->id :rasta) + :creator (user-details (fetch-user :rasta)) + :cards (for [card [card-1 card-2]] + (assoc (pulse-card-details card) + :collection_id true)) + :channels [(merge pulse-channel-defaults + {:channel_type "email" + :schedule_type "daily" + :schedule_hour 12 + :recipients []})] + :collection_id true}) + (card-api-test/with-cards-in-readable-collection [card-1 card-2] + (tt/with-temp Collection [collection] + (perms/grant-collection-readwrite-permissions! (perms-group/all-users) collection) + (tu/with-model-cleanup [Pulse] + (-> ((user->client :rasta) :post 200 "pulse" {:name "A Pulse" + :collection_id (u/get-id collection) + :cards [{:id (u/get-id card-1) + :include_csv false + :include_xls false} + (-> card-2 + (select-keys [:id :name :description :display :collection_id]) + (assoc :include_csv false, :include_xls false))] + :channels [daily-email-channel] + :skip_if_empty false}) + pulse-response + (update :channels remove-extra-channels-fields)))))) ;; Create a pulse with a csv and xls (tt/expect-with-temp [Card [card-1] diff --git a/test/metabase/async/streaming_response_test.clj b/test/metabase/async/streaming_response_test.clj new file mode 100644 index 00000000000..88e653de750 --- /dev/null +++ b/test/metabase/async/streaming_response_test.clj @@ -0,0 +1,123 @@ +(ns metabase.async.streaming-response-test + (:require [clj-http.client :as http] + [clojure.core.async :as a] + [clojure.test :refer :all] + [metabase + [config :as config] + [driver :as driver] + [http-client :as test-client] + [models :refer [Database]] + [test :as mt]] + [metabase.async.streaming-response :as streaming-response] + [metabase.async.streaming-response.thread-pool :as thread-pool] + [metabase.query-processor.context :as context]) + (:import java.util.concurrent.Executors + org.apache.commons.lang3.concurrent.BasicThreadFactory$Builder)) + +(driver/register! ::test-driver) + +(def ^:private canceled? (atom false)) + +(def ^:private thread-pool-size 5) + +(defn- do-with-streaming-response-thread-pool [thunk] + (let [pool (Executors/newFixedThreadPool thread-pool-size + (.build + (doto (BasicThreadFactory$Builder.) + (.namingPattern "streaming-response-test-thread-pool-%d") + ;; Daemon threads do not block shutdown of the JVM + (.daemon true))))] + (with-redefs [thread-pool/thread-pool (constantly pool)] + (try + (thunk) + (finally + (.shutdownNow pool)))))) + +(defmacro ^:private with-streaming-response-thread-pool {:style/indent 0} [& body] + `(do-with-streaming-response-thread-pool (fn [] ~@body))) + +(defmacro ^:private with-test-driver-db {:style/indent 0} [& body] + `(mt/with-temp Database [db# {:engine ::test-driver}] + (mt/with-db db# + (with-streaming-response-thread-pool + ~@body)))) + +(defmethod driver/execute-reducible-query ::test-driver + [_ {{{:keys [sleep]} :query} :native, database-id :database} context respond] + {:pre [(integer? sleep) (integer? database-id)]} + (let [futur (future + (Thread/sleep sleep) + (respond {:cols [{:name "Sleep", :base_type :type/Integer}]} [[sleep]]))] + (a/go + (when (a/<! (context/canceled-chan context)) + (reset! canceled? true) + (future-cancel futur))))) + +(defmethod driver/connection-properties ::test-driver + [& _] + []) + +(deftest basic-test + (testing "Make sure our ::test-driver is working as expected" + (with-test-driver-db + (is (= [[10]] + (mt/rows + ((mt/user->client :lucky) + :post 202 "dataset" + {:database (mt/id) + :type "native" + :native {:query {:sleep 10}}}))))))) + +(deftest truly-async-test + (testing "StreamingResponses should truly be asynchronous, and not block Jetty threads while waiting for results" + (with-test-driver-db + (let [max-threads (or (config/config-int :mb-jetty-maxthreads) 50) + num-requests (+ max-threads 20) + remaining (atom num-requests) + session-token (test-client/authenticate (mt/user->credentials :lucky)) + url (test-client/build-url "dataset" nil) + request (test-client/build-request-map session-token + {:database (mt/id) + :type "native" + :native {:query {:sleep 2000}}})] + (testing (format "%d simultaneous queries" num-requests) + (dotimes [_ num-requests] + (future (http/post url request))) + (Thread/sleep 100) + (let [start-time-ms (System/currentTimeMillis)] + (is (= {:status "ok"} (test-client/client :get 200 "health"))) + (testing "Health endpoint should complete before the first round of queries completes" + (is (> @remaining (inc (- num-requests thread-pool-size))))) + (testing "Health endpoint should complete in under 100ms regardless of how many queries are running" + (let [elapsed-ms (- (System/currentTimeMillis) start-time-ms)] + (is (< elapsed-ms 100)))))))))) + +(deftest newlines-test + (testing "Keepalive newlines should be written while waiting for a response." + (with-redefs [streaming-response/keepalive-interval-ms 50] + (with-test-driver-db + (is (re= #"(?s)^\n{3,}\{\"data\":.*$" + (:body (http/post (test-client/build-url "dataset" nil) + (test-client/build-request-map (mt/user->credentials :lucky) + {:database (mt/id) + :type "native" + :native {:query {:sleep 300}}}))))))))) + +(deftest cancelation-test + (testing "Make sure canceling a HTTP request ultimately causes the query to be canceled" + (with-redefs [streaming-response/keepalive-interval-ms 50] + (with-test-driver-db + (reset! canceled? false) + (let [futur (http/post (test-client/build-url "dataset" nil) + (assoc (test-client/build-request-map (mt/user->credentials :lucky) + {:database (mt/id) + :type "native" + :native {:query {:sleep 5000}}}) + :async true) + identity + (fn [e] (throw e)))] + (Thread/sleep 100) + (future-cancel futur) + (Thread/sleep 100) + (is (= true + @canceled?))))))) diff --git a/test/metabase/http_client.clj b/test/metabase/http_client.clj index 4c82bf5c854..e686196a0a9 100644 --- a/test/metabase/http_client.clj +++ b/test/metabase/http_client.clj @@ -100,7 +100,7 @@ ;;; client -(defn- build-request-map [credentials http-body] +(defn build-request-map [credentials http-body] (merge {:accept :json :headers {@#'mw.session/metabase-session-header diff --git a/test/metabase/query_processor/middleware/async_wait_test.clj b/test/metabase/query_processor/middleware/async_wait_test.clj deleted file mode 100644 index f688df9bc18..00000000000 --- a/test/metabase/query_processor/middleware/async_wait_test.clj +++ /dev/null @@ -1,55 +0,0 @@ -(ns metabase.query-processor.middleware.async-wait-test - (:require [clojure.core.async :as a] - [clojure.test :refer :all] - [metabase - [test :as mt] - [util :as u]] - [metabase.models.database :refer [Database]] - [metabase.query-processor.middleware.async-wait :as async-wait] - [metabase.test.util.async :as tu.async] - [toucan.util.test :as tt]) - (:import java.util.concurrent.Executors - org.apache.commons.lang3.concurrent.BasicThreadFactory$Builder)) - -(def ^:private ^:dynamic *dynamic-var* false) - -(defn- async-wait-bound-value - "Check the bound value of `*dynamic-var*` when a function is executed after the `async-wait` using the thread pool for - Database with `db-id`." - [db-id] - (let [bound-value (promise)] - (tu.async/with-open-channels [canceled-chan (a/promise-chan)] - (mt/test-qp-middleware - async-wait/wait-for-turn - {:database db-id} - {} - [] - {:chans {:canceled-chan canceled-chan} - :run (fn [] - (deliver bound-value *dynamic-var*))})) - (u/deref-with-timeout bound-value 1000))) - -(deftest sanity-check-test - (testing "basic sanity check: bound value of `*dynamic-var*` should be `false`" - (tt/with-temp Database [{db-id :id}] - (= false - (async-wait-bound-value db-id))))) - -(deftest bindings-test - (testing "bound dynamic vars should get re-bound by in the async wait fn" - (tt/with-temp Database [{db-id :id}] - (binding [*dynamic-var* ::wow] - (is (= ::wow - (async-wait-bound-value db-id)))))) - - (testing "binding should not be persisted between executions -- should be reset when we reuse a thread" - (let [thread-pool (Executors/newSingleThreadExecutor - (.build - (doto (BasicThreadFactory$Builder.) - (.daemon true))))] - (with-redefs [async-wait/db-thread-pool (constantly thread-pool)] - (tt/with-temp Database [{db-id :id}] - (binding [*dynamic-var* true] - (async-wait-bound-value db-id)) - (is (= false - (async-wait-bound-value db-id)))))))) diff --git a/test/metabase/query_processor/middleware/process_userland_query_test.clj b/test/metabase/query_processor/middleware/process_userland_query_test.clj index f3d155aac9d..2ca038893f5 100644 --- a/test/metabase/query_processor/middleware/process_userland_query_test.clj +++ b/test/metabase/query_processor/middleware/process_userland_query_test.clj @@ -10,7 +10,7 @@ (defn- do-with-query-execution [query run] (mt/with-open-channels [save-chan (a/promise-chan)] - (with-redefs [process-userland-query/save-query-execution! (partial a/>!! save-chan)] + (with-redefs [process-userland-query/save-query-execution!* (partial a/>!! save-chan)] (run (fn qe-result* [] (let [qe (mt/wait-for-result save-chan)] @@ -86,7 +86,7 @@ :running_time true :dashboard_id nil} (qe)) - "Result should have query execution info. empty `:data` should get added to failures")))) + "QueryExecution saved in the DB should have query execution info. empty `:data` should get added to failures")))) (defn- async-middleware [qp] (fn async-middleware-qp [query rff context] @@ -99,7 +99,7 @@ (deftest cancel-test (let [saved-query-execution? (atom false)] - (with-redefs [process-userland-query/save-query-execution-async! (fn [_] (reset! saved-query-execution? true))] + (with-redefs [process-userland-query/save-query-execution! (fn [_] (reset! saved-query-execution? true))] (mt/with-open-channels [canceled-chan (a/promise-chan)] (future (let [out-chan (mt/test-qp-middleware [process-userland-query/process-userland-query async-middleware] diff --git a/test/metabase/query_processor/streaming_test.clj b/test/metabase/query_processor/streaming_test.clj index 29adb1735d6..29842234bb8 100644 --- a/test/metabase/query_processor/streaming_test.clj +++ b/test/metabase/query_processor/streaming_test.clj @@ -54,11 +54,10 @@ (with-redefs [streaming-response/keepalive-interval-ms 2] (with-open [bos (ByteArrayOutputStream.) os (BufferedOutputStream. bos)] - (ring.protocols/write-body-to-stream - (qp.streaming/streaming-response [context export-format] - (qp/process-query-async query (assoc context :timeout 5000))) - nil - os) + (let [streaming-response (qp.streaming/streaming-response [context export-format] + (qp/process-query-async query (assoc context :timeout 5000)))] + (ring.protocols/write-body-to-stream streaming-response nil os) + (mt/wait-for-result (streaming-response/finished-chan streaming-response) 1000)) (.flush os) (.flush bos) (let [bytea (.toByteArray bos)] diff --git a/test/metabase/query_processor_test.clj b/test/metabase/query_processor_test.clj index ba5fce1968f..c928d4b603d 100644 --- a/test/metabase/query_processor_test.clj +++ b/test/metabase/query_processor_test.clj @@ -298,8 +298,7 @@ "Return the result `data` from a successful query run, or throw an Exception if processing failed." {:style/indent 0} [results] - (when (= (:status results) :failed) - (println "Error running query:" (u/pprint-to-str 'red results)) + (when (#{:failed "failed"} (:status results)) (throw (ex-info (str (or (:error results) "Error running query")) (if (map? results) results {:results results})))) (:data results)) diff --git a/test/metabase/query_processor_test/query_to_native_test.clj b/test/metabase/query_processor_test/query_to_native_test.clj index a72278f9bfa..345ade43820 100644 --- a/test/metabase/query_processor_test/query_to_native_test.clj +++ b/test/metabase/query_processor_test/query_to_native_test.clj @@ -6,8 +6,6 @@ [test :as mt]] [metabase.api.common :as api] [metabase.models.permissions :as perms] - [metabase.query-processor.middleware.async-wait :as async-wait] - [metabase.test.util :as tu] [schema.core :as s])) (deftest query->native-test @@ -34,14 +32,6 @@ :template-tags {"price" {:name "price", :display-name "Price", :type :number, :required false}}} :parameters [{:type "category", :target [:variable [:template-tag "price"]], :value "3"}]}))))) -(deftest no-async-wait-test - (testing "`query->native` should not be subject to async waiting") - (is (= :ok - (tu/throw-if-called async-wait/run-in-thread-pool - (qp/query->native (mt/mbql-query venues)) - :ok)))) - - ;; If user permissions are bound, we should do permissions checking when you call `query->native`; you should need ;; native query execution permissions for the DB in question plus the perms needed for the original query in order to ;; use `query->native` diff --git a/test/metabase/server_test.clj b/test/metabase/server_test.clj index ba5a9b44450..08207cdae2a 100644 --- a/test/metabase/server_test.clj +++ b/test/metabase/server_test.clj @@ -1,23 +1,23 @@ (ns metabase.server-test - (:require [expectations :refer [expect]] + (:require [clojure.test :refer :all] [metabase [config :as config] [server :as server]])) -;; Make sure our Jetty config functions work as expected/we don't accidentally break things (#9333) -(expect - {:keystore "10" - :max-queued 10 - :port 10 - :min-threads 10 - :host "10" - :daemon? false - :ssl? true - :trust-password "10" - :key-password "10" - :truststore "10" - :max-threads 10 - :max-idle-time 10 - :ssl-port 10} - (with-redefs [config/config-str (constantly "10")] - (#'server/jetty-config))) +(deftest config-test + (testing "Make sure our Jetty config functions work as expected/we don't accidentally break things (#9333)" + (with-redefs [config/config-str (constantly "10")] + (is (= {:keystore "10" + :max-queued 10 + :port 10 + :min-threads 10 + :host "10" + :daemon? false + :ssl? true + :trust-password "10" + :key-password "10" + :truststore "10" + :max-threads 10 + :max-idle-time 10 + :ssl-port 10} + (#'server/jetty-config)))))) diff --git a/test/metabase/test.clj b/test/metabase/test.clj index 8139c653c5c..47f38f9dfaf 100644 --- a/test/metabase/test.clj +++ b/test/metabase/test.clj @@ -109,6 +109,7 @@ [test-users user->id user->client + user->credentials with-test-user] [tt diff --git a/test/metabase/test/redefs.clj b/test/metabase/test/redefs.clj index f7512c0eb64..cbafe460f52 100644 --- a/test/metabase/test/redefs.clj +++ b/test/metabase/test/redefs.clj @@ -1,6 +1,5 @@ (ns metabase.test.redefs - (:require [clojure.test :as t] - [metabase.plugins.classloader :as classloader] + (:require [metabase.plugins.classloader :as classloader] [toucan.util.test :as tt])) ;; wrap `do-with-temp` so it initializes the DB before doing the other stuff it usually does @@ -15,15 +14,3 @@ ;; mark `expect-with-temp` as deprecated -- it's not needed for `deftest`-style tests (alter-meta! #'tt/expect-with-temp assoc :deprecated true) - -;; TODO - not a good long-term place to put this. -(defmethod t/assert-expr 're= [msg [_ pattern s]] - `(let [pattern# ~pattern - s# ~s - matches?# (when s# - (re-matches pattern# s#))] - (t/do-report - {:type (if matches?# :pass :fail) - :message ~msg - :expected pattern# - :actual s#}))) diff --git a/test/metabase/test/util.clj b/test/metabase/test/util.clj index 336b8ff4799..1230d007fdc 100644 --- a/test/metabase/test/util.clj +++ b/test/metabase/test/util.clj @@ -31,19 +31,31 @@ org.apache.log4j.Logger [org.quartz CronTrigger JobDetail JobKey Scheduler Trigger])) +(defmethod assert-expr 're= [msg [_ pattern actual]] + `(let [pattern# ~pattern + actual# ~actual + matches?# (some->> actual# (re-matches pattern#))] + (assert (instance? java.util.regex.Pattern pattern#)) + (do-report + {:type (if matches?# :pass :fail) + :message ~msg + :expected pattern# + :actual actual# + :diffs (when-not matches?# + [[actual# [pattern# nil]]])}))) + (defmethod assert-expr 'schema= - [message form] - (let [[_ schema actual] form] - `(let [schema# ~schema - actual# ~actual - pass?# (nil? (s/check schema# actual#))] - (do-report - {:type (if pass?# :pass :fail) - :message ~message - :expected (s/explain schema#) - :actual actual# - :diffs (when-not pass?# - [[actual# [(s/check schema# actual#) nil]]])})))) + [message [_ schema actual]] + `(let [schema# ~schema + actual# ~actual + pass?# (nil? (s/check schema# actual#))] + (do-report + {:type (if pass?# :pass :fail) + :message ~message + :expected (s/explain schema#) + :actual actual# + :diffs (when-not pass?# + [[actual# [(s/check schema# actual#) nil]]])}))) (defmacro ^:deprecated expect-schema "Like `expect`, but checks that results match a schema. DEPRECATED -- you can use `deftest` combined with `schema=` -- GitLab