Skip to content
Snippets Groups Projects
Unverified Commit e357b238 authored by lbrdnk's avatar lbrdnk Committed by GitHub
Browse files

Use sessions in mongo aggregation pipelines (#35680)

* Use mongo sessions in aggregation pipelines

With sessions, the cancel signal from a streaming response can be handled
correctly: the query is also killed on the database side.

* Update tests

* Let the middleware handle the exception

While writing the other test, I found that if the exception is handled
in `reducible-results`, the code blocks for 3 minutes,
which is the `.maxTime` for the aggregation. I don't know the exact reason yet,
but for the time being I'm leaving the exception handling to the middleware.

* Add testing string

* Encode aggregation results as `BasicDBObject`

Use this type instead of BSON's Document because code transforming
DBObject to clojure structures is already in place.

* Update exceptions handling and tests

Exception handling now more closely resembles the code in other drivers. Tests were updated
to use an MBQL query instead of a raw native query to avoid differences between
MongoDB versions. `$dateTrunc` was not available in version 4.

* Increase cancelation interval in tests

* Address remarks and update test to handle cold start

By cold start I mean running the test before the dataset is initialized.

* Remove unnecessary hint
parent 95e49461
No related branches found
No related tags found
No related merge requests found
......@@ -4,7 +4,7 @@
[clojure.set :as set]
[clojure.string :as str]
[metabase.driver.mongo.query-processor :as mongo.qp]
[metabase.driver.mongo.util :refer [*mongo-connection*]]
[metabase.driver.mongo.util :as mongo.util]
[metabase.query-processor.context :as qp.context]
[metabase.query-processor.error-type :as qp.error-type]
[metabase.query-processor.reducible :as qp.reducible]
......@@ -14,7 +14,8 @@
[monger.conversion :as m.conversion]
[monger.util :as m.util])
(:import
(com.mongodb AggregationOptions AggregationOptions$OutputMode BasicDBObject Cursor DB DBObject)
(com.mongodb BasicDBObject DB DBObject)
(com.mongodb.client AggregateIterable ClientSession MongoDatabase MongoCursor)
(java.util.concurrent TimeUnit)
(org.bson BsonBoolean BsonInt32)))
......@@ -129,24 +130,27 @@
(when row
(.keySet row)))
(defn- aggregation-options
  "Build the `AggregationOptions` used for an aggregation: disk use allowed, `CURSOR` output mode, a batch size of
  100, and a max execution time of `timeout-ms` milliseconds."
  ^AggregationOptions [timeout-ms]
  ;; see https://mongodb.github.io/mongo-java-driver/3.7/javadoc/com/mongodb/AggregationOptions.Builder.html
  (let [builder (AggregationOptions/builder)]
    (.allowDiskUse builder true)
    (.outputMode builder AggregationOptions$OutputMode/CURSOR)
    ;; TODO - consider what the best batch size option is here. Not sure what the default is.
    (.batchSize builder (int 100))
    (.maxTime builder (int timeout-ms) TimeUnit/MILLISECONDS)
    ;; every setter above was invoked on this same builder instance, which is the one that gets built
    (.build builder)))
;; See https://mongodb.github.io/mongo-java-driver/3.12/javadoc/com/mongodb/client/AggregateIterable.html
(defn- init-aggregate!
  "Configure `aggregate` in place: allow disk use, set the batch size to 100, and set the max execution time to
  `timeout-ms` milliseconds. Returns the same `aggregate` object that was passed in."
  [^AggregateIterable aggregate
   ^java.lang.Long timeout-ms]
  (.allowDiskUse aggregate true)
  ;; TODO - consider what the best batch size option is here. Not sure what the default is.
  (.batchSize aggregate 100)
  (.maxTime aggregate timeout-ms TimeUnit/MILLISECONDS)
  ;; return the argument itself rather than the result of the last setter call
  aggregate)
(defn- ^:dynamic *aggregate*
"Execute a MongoDB aggregation query."
^Cursor [^DB db ^String coll stages timeout-ms]
(let [coll (.getCollection db coll)
agg-opts (aggregation-options timeout-ms)
pipe (m.util/into-array-list (m.conversion/to-db-object stages))]
(.aggregate coll pipe agg-opts)))
(defn- reducible-rows [context ^Cursor cursor first-row post-process]
[^MongoDatabase db
^String coll
^ClientSession session
stages timeout-ms]
(let [coll (.getCollection db coll)
pipe (m.util/into-array-list (m.conversion/to-db-object stages))
aggregate (.aggregate coll session pipe BasicDBObject)]
(init-aggregate! aggregate timeout-ms)))
(defn- reducible-rows [context ^MongoCursor cursor first-row post-process]
{:pre [(fn? post-process)]}
(let [has-returned-first-row? (volatile! false)]
(letfn [(first-row-thunk []
......@@ -161,30 +165,57 @@
(remaining-rows-thunk)))]
(qp.reducible/reducible-rows row-thunk (qp.context/canceled-chan context)))))
(defn- reduce-results
  "Pull the first row (if any) from `cursor`, derive the result column names from it, and call `respond` with the
  result metadata and either an empty vector (no rows) or a reducible over the rows. `cursor` is always closed
  before this function returns, whether or not an exception was thrown."
  [native-query query context ^Cursor cursor respond]
  (try
    (let [head-row                   (when (.hasNext cursor)
                                       (.next cursor))
          {raw-names   :row
           shown-names :unescaped}   (result-col-names native-query query (row-keys head-row))]
      (log/tracef "Renaming columns in results %s -> %s" (pr-str raw-names) (pr-str shown-names))
      (respond (result-metadata shown-names)
               (if head-row
                 (reducible-rows context cursor head-row (post-process-row raw-names))
                 [])))
    (finally
      (.close cursor))))
(defn- reduce-results
  "Pull the first row (if any) from `cursor`, derive the result column names from it, and call `respond` with the
  result metadata and either an empty vector (no rows) or a reducible over the rows. This function does not close
  `cursor` itself."
  [native-query query context ^MongoCursor cursor respond]
  (let [head-row                   (when (.hasNext cursor)
                                     (.next cursor))
        {raw-names   :row
         shown-names :unescaped}   (result-col-names native-query query (row-keys head-row))]
    (log/tracef "Renaming columns in results %s -> %s" (pr-str raw-names) (pr-str shown-names))
    (respond (result-metadata shown-names)
             (if head-row
               (reducible-rows context cursor head-row (post-process-row raw-names))
               []))))
(defn- connection->database
  "Return the `MongoDatabase` that has the same name as the `DB` `connection`, looked up through that connection's
  Mongo client."
  ^MongoDatabase
  [^DB connection]
  (let [db-name (.getName connection)
        client  (.getMongoClient connection)]
    (.getDatabase client db-name)))
(defn- start-session!
  "Start and return a new `ClientSession` on the Mongo client behind `connection`."
  ^ClientSession
  [^DB connection]
  (let [client (.getMongoClient connection)]
    (.startSession client)))
(defn- kill-session!
  "Run a `killSessions` command against `db`, passing the identifier of the server session behind `session`.
  Returns whatever `.runCommand` returns."
  [^MongoDatabase db
   ^ClientSession session]
  (let [server-session (.getServerSession session)
        session-id     (.getIdentifier server-session)]
    ;; the command takes a list of session ids; here it is a one-element vector
    (.runCommand db (BasicDBObject. "killSessions" [session-id]))))
(defn execute-reducible-query
"Process and run a native MongoDB query."
[{{:keys [collection query], :as native-query} :native} context respond]
{:pre [(string? collection) (fn? respond)]}
[{{query :query collection-name :collection :as native-query} :native} context respond]
{:pre [(string? collection-name) (fn? respond)]}
(let [query (cond-> query
(string? query) mongo.qp/parse-query-string)
cursor (*aggregate* *mongo-connection* collection query (qp.context/timeout context))]
(a/go
(when (a/<! (qp.context/canceled-chan context))
;; Eastwood seems to get confused here and not realize there's already a tag on `cursor` (returned by
;; `aggregate`)
(.close ^Cursor cursor)))
(reduce-results native-query query context cursor respond)))
client-database (connection->database mongo.util/*mongo-connection*)]
(with-open [session ^ClientSession (start-session! mongo.util/*mongo-connection*)]
(a/go
(when (a/<! (qp.context/canceled-chan context))
(kill-session! client-database session)))
(let [aggregate ^AggregateIterable (*aggregate* client-database
collection-name
session
query
(qp.context/timeout context))]
(with-open [^MongoCursor cursor (try (.cursor aggregate)
(catch Throwable e
(throw (ex-info (tru "Error executing query: {0}" (ex-message e))
{:driver :mongo
:native native-query
:type qp.error-type/invalid-query}
e))))]
(reduce-results native-query query context cursor respond))))))
(ns metabase.driver.mongo.execute-test
(:require
[clojure.core.async :as a]
[clojure.test :refer :all]
[metabase.async.streaming-response :as streaming-response]
[metabase.driver.mongo.execute :as mongo.execute]
[metabase.query-processor :as qp]
[metabase.query-processor.context :as qp.context]
[metabase.test :as mt])
(:import
(com.mongodb BasicDBObject)
(java.util NoSuchElementException)))
(set! *warn-on-reflection* true)
(defn- make-mongo-cursor [rows]
(let [counter (volatile! 0)]
(reify com.mongodb.Cursor
(reify com.mongodb.client.MongoCursor
(hasNext [_] (< @counter (count rows)))
(next [_] (let [i @counter]
(vswap! counter inc)
(if (< i (count rows))
(org.bson.BasicBSONObject. ^java.util.Map (get rows i))
(throw (NoSuchElementException. (str "no element at " i)))))))))
(BasicDBObject. ^java.util.Map (get rows i))
(throw (NoSuchElementException. (str "no element at " i))))))
(close [_]))))
(defn- make-mongo-aggregate-iterable
  "Return a stub `AggregateIterable` that implements only `cursor`; each call to `cursor` builds a new fake cursor
  over `rows` with [[make-mongo-cursor]]."
  [rows]
  (reify com.mongodb.client.AggregateIterable
    (cursor [_this]
      (make-mongo-cursor rows))))
(deftest ^:parallel field-filter-relative-time-native-test
(mt/test-driver :mongo
(let [now (str (java.time.Instant/now))]
(binding [mongo.execute/*aggregate*
(fn [& _] (make-mongo-cursor [{"_id" 0
"name" "Crowberto"
"alias" "the Brave"}
{"_id" 1
"name" "Rasta"
"last_login" now
"nickname" "Blue"}]))]
(fn [& _] (make-mongo-aggregate-iterable
[{"_id" 0
"name" "Crowberto"
"alias" "the Brave"}
{"_id" 1
"name" "Rasta"
"last_login" now
"nickname" "Blue"}]))]
(testing "Projected and first-row fields are returned"
(let [query {:database (mt/id)
:native
......@@ -53,3 +63,30 @@
[1 "Rasta" now nil]]
:columns ["_id" "name" "last_login" "alias"]}
(mt/rows+column-names (qp/process-query query))))))))))
;; Checks that putting a cancel value on the query's canceled channel while a query is running makes
;; `qp/process-query` throw an exception whose message matches the "operation was interrupted" pattern below.
(deftest kill-an-in-flight-query-test
(mt/test-driver
:mongo
(mt/dataset
sample-dataset
;; Dummy query execution here. If the dataset was not initialized before running this test, the timing gets out of
;; sync and the test fails. I suspect dataset initialization happens after (or while) the future is executed.
;; To overcome that, the next line is executed - forcing dataset initialization - before the test code runs.
(mt/run-mbql-query people {:limit 10})
(let [canceled-chan (a/chan)]
;; Make every call to `qp.context/canceled-chan` return the channel created above, so the test controls when the
;; cancel signal is delivered.
(with-redefs [qp.context/canceled-chan (constantly canceled-chan)]
(let [query (mt/mbql-query orders
{:aggregation [[:sum $total]],
:breakout [!month.created_at],
:order-by [[:asc !month.created_at]],
:joins [{:alias "People_User",
:strategy :left-join,
:condition
[:!= $user_id &People_User.people.id],
:source-table $$people}]})]
;; On another thread: wait 500 ms, then put the cancel value on the channel. The put blocks until something
;; takes from the (unbuffered) channel.
(future (Thread/sleep 500)
(a/>!! canceled-chan ::streaming-response/request-canceled))
(testing "Cancel signal kills the in progress query"
(is (thrown-with-msg? Throwable
#"Command failed with error 11601.*operation was interrupted"
(qp/process-query query))))))))))
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment