diff --git a/.clj-kondo/config.edn b/.clj-kondo/config.edn
index 2d64cd7b4826374b3375cf4e26acff042d1ae577..9daf44e271387929a92e89dbe29314a0387453d7 100644
--- a/.clj-kondo/config.edn
+++ b/.clj-kondo/config.edn
@@ -432,7 +432,8 @@
   metabase.db.schema-migrations-test.impl/with-temp-empty-app-db                       clojure.core/let
   metabase.domain-entities.malli/defn                                                  schema.core/defn
   metabase.driver.mongo.query-processor/mongo-let                                      clojure.core/let
-  metabase.driver.mongo.util/with-mongo-connection                                     clojure.core/let
+  metabase.driver.mongo.connection/with-mongo-client                                   clojure.core/let
+  metabase.driver.mongo.connection/with-mongo-database                                 clojure.core/let
   metabase.driver.sql-jdbc.actions/with-jdbc-transaction                               clojure.core/let
   metabase.driver.sql-jdbc.connection/with-connection-spec-for-testing-connection      clojure.core/let
   metabase.driver.sql-jdbc.execute.diagnostic/capturing-diagnostic-info                clojure.core/fn
diff --git a/modules/drivers/mongo/deps.edn b/modules/drivers/mongo/deps.edn
index 1b494f6cc0b6fd83f10d960cb34e0756d1dc5cb5..452588e1ecfe804cf0aaf26d60fbacc44dc8844d 100644
--- a/modules/drivers/mongo/deps.edn
+++ b/modules/drivers/mongo/deps.edn
@@ -3,5 +3,4 @@
 
  :deps
  {com.google.guava/guava {:mvn/version "32.1.3-jre"}
-  com.novemberain/monger {:mvn/version "3.6.0"
-                          :exclusions [com.google.guava/guava]}}}
+  org.mongodb/mongodb-driver-sync {:mvn/version "4.11.1"}}}
diff --git a/modules/drivers/mongo/src/metabase/driver/mongo.clj b/modules/drivers/mongo/src/metabase/driver/mongo.clj
index 443502f155a54da70d6564bdb701f6b7fc0711f1..3620223fac29118192cfd98e2c4091076dd92ebd 100644
--- a/modules/drivers/mongo/src/metabase/driver/mongo.clj
+++ b/modules/drivers/mongo/src/metabase/driver/mongo.clj
@@ -5,37 +5,30 @@
    [cheshire.generate :as json.generate]
    [clojure.string :as str]
    [flatland.ordered.map :as ordered-map]
-   [java-time.api :as t]
    [metabase.db.metadata-queries :as metadata-queries]
    [metabase.driver :as driver]
    [metabase.driver.common :as driver.common]
+   [metabase.driver.mongo.connection :as mongo.connection]
+   [metabase.driver.mongo.database :as mongo.db]
    [metabase.driver.mongo.execute :as mongo.execute]
+   [metabase.driver.mongo.json]
    [metabase.driver.mongo.parameters :as mongo.params]
    [metabase.driver.mongo.query-processor :as mongo.qp]
-   [metabase.driver.mongo.util :refer [with-mongo-connection] :as mongo.util]
+   [metabase.driver.mongo.util :as mongo.util]
    [metabase.driver.util :as driver.u]
    [metabase.lib.metadata :as lib.metadata]
    [metabase.lib.metadata.protocols :as lib.metadata.protocols]
    [metabase.query-processor.store :as qp.store]
-   [metabase.query-processor.timezone :as qp.timezone]
    [metabase.util :as u]
    [metabase.util.log :as log]
-   [monger.command :as cmd]
-   [monger.conversion :as m.conversion]
-   [monger.core :as mg]
-   [monger.db :as mdb]
-   [monger.json]
    [taoensso.nippy :as nippy])
   (:import
-   (com.mongodb DB DBObject)
-   (java.time Instant LocalDate LocalDateTime LocalTime OffsetDateTime OffsetTime ZonedDateTime)
-   (org.bson.types ObjectId)))
+   (org.bson.types ObjectId)
+   (com.mongodb.client MongoClient MongoDatabase)))
 
 (set! *warn-on-reflection* true)
 
-;; See http://clojuremongodb.info/articles/integration.html Loading this namespace will load appropriate Monger
-;; integrations with Cheshire.
-(comment monger.json/keep-me)
+(comment metabase.driver.mongo.json/keep-me)
 
 ;; JSON Encoding (etc.)
 
@@ -53,18 +46,18 @@
 (driver/register! :mongo)
 
 (defmethod driver/can-connect? :mongo
-  [_ details]
-  (with-mongo-connection [^DB conn, details]
-    (let [db-stats (-> (cmd/db-stats conn)
-                       (m.conversion/from-db-object :keywordize))
-          db-names (mg/get-db-names mongo.util/*mongo-client*)]
+  [_ db-details]
+  (mongo.connection/with-mongo-client [^MongoClient c db-details]
+    (let [db-names (mongo.util/list-database-names c)
+          db (mongo.util/database c (mongo.db/db-name db-details))
+          db-stats (mongo.util/run-command db {:dbStats 1} :keywordize true)]
       (and
        ;; 1. check db.dbStats command completes successfully
        (= (float (:ok db-stats))
           1.0)
        ;; 2. check the database is actually on the server
        ;; (this is required because (1) is true even if the database doesn't exist)
-       (contains? db-names (:db db-stats))))))
+       (boolean (some #(= % (:db db-stats)) db-names))))))
 
 (defmethod driver/humanize-connection-error-message
   :mongo
@@ -106,7 +99,7 @@
 
 (defmethod driver/sync-in-context :mongo
   [_ database do-sync-fn]
-  (with-mongo-connection [_ database]
+  (mongo.connection/with-mongo-client [_ database]
     (do-sync-fn)))
 
 (defn- val->semantic-type [field-value]
@@ -118,8 +111,8 @@
 
     ;; 2. json?
     (and (string? field-value)
-         (or (str/starts-with? "{" field-value)
-             (str/starts-with? "[" field-value)))
+         (or (str/starts-with? field-value "{")
+             (str/starts-with? field-value "[")))
     (when-let [j (u/ignore-exceptions (json/parse-string field-value))]
       (when (or (map? j)
                 (sequential? j))
@@ -188,8 +181,8 @@
 
 (defmethod driver/dbms-version :mongo
   [_driver database]
-  (with-mongo-connection [^com.mongodb.DB conn database]
-    (let [build-info (mg/command conn {:buildInfo 1})
+  (mongo.connection/with-mongo-database [db database]
+    (let [build-info (mongo.util/run-command db {:buildInfo 1})
           version-array (get build-info "versionArray")
           sanitized-version-array (into [] (take-while nat-int?) version-array)]
       (when (not= (take 3 version-array) (take 3 sanitized-version-array))
@@ -200,57 +193,38 @@
 
 (defmethod driver/describe-database :mongo
   [_ database]
-  (with-mongo-connection [^com.mongodb.DB conn database]
-    {:tables (set (for [collection (disj (mdb/get-collection-names conn) "system.indexes")]
+  (mongo.connection/with-mongo-database [^MongoDatabase db database]
+    {:tables (set (for [collection (mongo.util/list-collection-names db)
+                        :when (not= collection "system.indexes")]
                     {:schema nil, :name collection}))}))
 
 (defmethod driver/describe-table-indexes :mongo
   [_ database table]
-  (with-mongo-connection [^com.mongodb.DB conn database]
-    ;; using raw DBObject instead of calling `monger/indexes-on`
-    ;; because in case a compound index has more than 8 keys, the `key` returned by
-    ;;`monger/indexes-on` will be a hash-map, and with a hash map we can't determine
-    ;; which key is the first key.
-    (->> (.getIndexInfo (.getCollection conn (:name table)))
-         (map (fn [index]
+  (mongo.connection/with-mongo-database [^MongoDatabase db database]
+    (let [collection (mongo.util/collection db (:name table))]
+      (->> (mongo.util/list-indexes collection)
+           (map (fn [index]
                 ;; for text indexes, column names are specified in the weights
-                (if (contains? index "textIndexVersion")
-                  (get index "weights")
-                  (get index "key"))))
-         (map (comp name first keys))
-         ;; mongo support multi key index, aka nested fields index, so we need to split the keys
-         ;; and represent it as a list of field names
-         (map #(if (str/includes? % ".")
-                 {:type  :nested-column-index
-                  :value (str/split % #"\.")}
-                 {:type  :normal-column-index
-                  :value %}))
-         set)))
-
-(defn- from-db-object
-  "This is mostly a copy of the monger library's own function of the same name with the
-  only difference that it uses an ordered map to represent the document. This ensures that
-  the order of the top level fields of the table is preserved. For anything that's not a
-  DBObject, it falls back to the original function."
-  [input]
-  (if (instance? DBObject input)
-    (let [^DBObject dbobj input]
-      (reduce (fn [m ^String k]
-                (assoc m (keyword k) (m.conversion/from-db-object (.get dbobj k) true)))
-              (ordered-map/ordered-map)
-              (.keySet dbobj)))
-    (m.conversion/from-db-object input true)))
-
-(defn- sample-documents [^com.mongodb.DB conn table sort-direction]
-  (let [collection (.getCollection conn (:name table))]
-    (with-open [cursor (doto (.find collection
-                                    (m.conversion/to-db-object {})
-                                    (m.conversion/as-field-selector []))
-                         (.limit metadata-queries/nested-field-sample-limit)
-                         (.skip 0)
-                         (.sort (m.conversion/to-db-object {:_id sort-direction}))
-                         (.batchSize 256))]
-      (map from-db-object cursor))))
+                  (if (contains? index "textIndexVersion")
+                    (get index "weights")
+                    (get index "key"))))
+           (map (comp name first keys))
+           ;; mongo support multi key index, aka nested fields index, so we need to split the keys
+           ;; and represent it as a list of field names
+           (map #(if (str/includes? % ".")
+                   {:type  :nested-column-index
+                    :value (str/split % #"\.")}
+                   {:type  :normal-column-index
+                    :value %}))
+           set))))
+
+(defn- sample-documents [^MongoDatabase db table sort-direction]
+  (let [coll (mongo.util/collection db (:name table))]
+    (mongo.util/do-find coll {:keywordize true
+                              :limit metadata-queries/nested-field-sample-limit
+                              :skip 0
+                              :sort-criteria [[:_id sort-direction]]
+                              :batch-size 256})))
 
 (defn- table-sample-column-info
   "Sample the rows (i.e., documents) in `table` and return a map of information about the column keys we found in that
@@ -258,7 +232,7 @@
 
       {:_id      {:count 200, :len nil, :types {java.lang.Long 200}, :semantic-types nil, :nested-fields nil},
        :severity {:count 200, :len nil, :types {java.lang.Long 200}, :semantic-types nil, :nested-fields nil}}"
-  [^com.mongodb.DB conn, table]
+  [^MongoDatabase db table]
   (try
     (reduce
      (fn [field-defs row]
@@ -267,14 +241,14 @@
            fields
            (recur more-keys (update fields k (partial update-field-attrs (k row)))))))
      (ordered-map/ordered-map)
-     (concat (sample-documents conn table 1) (sample-documents conn table -1)))
+     (concat (sample-documents db table 1) (sample-documents db table -1)))
     (catch Throwable t
       (log/error (format "Error introspecting collection: %s" (:name table)) t))))
 
 (defmethod driver/describe-table :mongo
   [_ database table]
-  (with-mongo-connection [^com.mongodb.DB conn database]
-    (let [column-info (table-sample-column-info conn table)]
+  (mongo.connection/with-mongo-database [^MongoDatabase db database]
+    (let [column-info (table-sample-column-info db table)]
       {:schema nil
        :name   (:name table)
        :fields (first
@@ -342,53 +316,13 @@
 
 (defmethod driver/execute-reducible-query :mongo
   [_ query context respond]
-  (with-mongo-connection [_ (lib.metadata/database (qp.store/metadata-provider))]
+  (mongo.connection/with-mongo-client [_ (lib.metadata/database (qp.store/metadata-provider))]
     (mongo.execute/execute-reducible-query query context respond)))
 
 (defmethod driver/substitute-native-parameters :mongo
   [driver inner-query]
   (mongo.params/substitute-native-parameters driver inner-query))
 
-;; It seems to be the case that the only thing BSON supports is DateTime which is basically the equivalent of Instant;
-;; for the rest of the types, we'll have to fake it
-(extend-protocol m.conversion/ConvertToDBObject
-  Instant
-  (to-db-object [t]
-    (org.bson.BsonDateTime. (t/to-millis-from-epoch t)))
-
-  LocalDate
-  (to-db-object [t]
-    (m.conversion/to-db-object (t/local-date-time t (t/local-time 0))))
-
-  LocalDateTime
-  (to-db-object [t]
-    ;; QP store won't be bound when loading test data for example.
-    (m.conversion/to-db-object (t/instant t (t/zone-id (try
-                                                         (qp.timezone/results-timezone-id)
-                                                         (catch Throwable _
-                                                           "UTC"))))))
-
-  LocalTime
-  (to-db-object [t]
-    (m.conversion/to-db-object (t/local-date-time (t/local-date "1970-01-01") t)))
-
-  OffsetDateTime
-  (to-db-object [t]
-    (m.conversion/to-db-object (t/instant t)))
-
-  OffsetTime
-  (to-db-object [t]
-    (m.conversion/to-db-object (t/offset-date-time (t/local-date "1970-01-01") t (t/zone-offset t))))
-
-  ZonedDateTime
-  (to-db-object [t]
-    (m.conversion/to-db-object (t/instant t))))
-
-(extend-protocol m.conversion/ConvertFromDBObject
-  java.util.Date
-  (from-db-object [t _]
-    (t/instant t)))
-
 (defmethod driver/db-start-of-week :mongo
   [_]
   :sunday)
@@ -406,7 +340,9 @@
                       :order-by [[:desc [:field (get-id-field-id table) nil]]]}]
       (metadata-queries/table-rows-sample table fields rff (merge mongo-opts opts)))))
 
-(comment
+;; Following code is using monger. Leaving it here for a reference as it could be transformed when there is need
+;; for ssl experiments.
+#_(comment
   (require '[clojure.java.io :as io]
            '[monger.credentials :as mcred])
   (import javax.net.ssl.SSLSocketFactory)
diff --git a/modules/drivers/mongo/src/metabase/driver/mongo/connection.clj b/modules/drivers/mongo/src/metabase/driver/mongo/connection.clj
new file mode 100644
index 0000000000000000000000000000000000000000..0b0d2a4b43863d323307f036b26d1538124c8837
--- /dev/null
+++ b/modules/drivers/mongo/src/metabase/driver/mongo/connection.clj
@@ -0,0 +1,120 @@
+(ns metabase.driver.mongo.connection
+  "This namespace contains code responsible for connecting to mongo deployment."
+  (:require
+   [clojure.string :as str]
+   [metabase.config :as config]
+   [metabase.driver.mongo.database :as mongo.db]
+   [metabase.driver.mongo.util :as mongo.util]
+   [metabase.driver.util :as driver.u]
+   [metabase.util :as u]
+   [metabase.util.log :as log]
+   [metabase.util.ssh :as ssh])
+  (:import
+   (com.mongodb ConnectionString MongoClientSettings MongoClientSettings$Builder)
+   (com.mongodb.connection SslSettings$Builder)))
+
+(set! *warn-on-reflection* true)
+
+(def ^:dynamic *mongo-client*
+  "Stores an instance of `MongoClient` bound by [[with-mongo-client]]."
+  nil)
+
+(defn db-details->connection-string
+  "Generate connection string from database details.
+
+   - `?authSource` is always prestent because we are using `dbname`.
+   - We let the user override options we are passing in by means of `additional-options`."
+  [{:keys [use-conn-uri conn-uri host port user authdb pass dbname additional-options use-srv ssl] :as _db-details}]
+  ;; Connection string docs:
+  ;; http://mongodb.github.io/mongo-java-driver/4.11/apidocs/mongodb-driver-core/com/mongodb/ConnectionString.html
+  (if use-conn-uri
+    conn-uri
+    (str
+     (if use-srv "mongodb+srv" "mongodb")
+     "://"
+     (when (seq user) (str user (when (seq pass) (str ":" pass)) "@"))
+     host
+     (when (and (not use-srv) (some? port)) (str ":" port))
+     "/"
+     dbname
+     "?authSource=" (if (empty? authdb) "admin" authdb)
+     "&appName=" config/mb-app-id-string
+     "&connectTimeoutMS=" (driver.u/db-connection-timeout-ms)
+     "&serverSelectionTimeoutMS=" (driver.u/db-connection-timeout-ms)
+     (when ssl "&ssl=true")
+     (when (seq additional-options) (str "&" additional-options)))))
+
+(defn- maybe-add-ssl-context-to-builder!
+  "Add SSL context to `builder` using `_db-details`. Mutates and returns `builder`."
+  [^MongoClientSettings$Builder builder
+   {:keys [ssl-cert ssl-use-client-auth client-ssl-cert client-ssl-key] :as _db-details}]
+  (let [server-cert? (not (str/blank? ssl-cert))
+        client-cert? (and ssl-use-client-auth
+                          (not-any? str/blank? [client-ssl-cert client-ssl-key]))]
+    (if (or client-cert? server-cert?)
+      (let [ssl-params (cond-> {}
+                         server-cert? (assoc :trust-cert ssl-cert)
+                         client-cert? (assoc :private-key client-ssl-key
+                                             :own-cert client-ssl-cert))
+            ssl-context (driver.u/ssl-context ssl-params)]
+        (.applyToSslSettings builder
+                             (reify com.mongodb.Block
+                               (apply [_this builder]
+                                 (.context ^SslSettings$Builder builder ssl-context)))))
+      builder)))
+
+(defn db-details->mongo-client-settings
+  "Generate `MongoClientSettings` from `db-details`. `ConnectionString` is generated and applied to
+   `MongoClientSettings$Builder` first. Then ssl context is udated in the `builder` object.
+   Afterwards, `MongoClientSettings` are built using `.build`."
+  ^MongoClientSettings
+  [{:keys [use-conn-uri ssl] :as db-details}]
+  (let [connection-string (-> db-details
+                              db-details->connection-string
+                              ConnectionString.)
+        builder (com.mongodb.MongoClientSettings/builder)]
+    (.applyConnectionString builder connection-string)
+    (when (and ssl (not use-conn-uri))
+      (maybe-add-ssl-context-to-builder! builder db-details))
+    (.build builder)))
+
+(defn do-with-mongo-client
+  "Implementation of [[with-mongo-client]]."
+  [thunk database]
+  (let [db-details (mongo.db/details-normalized database)]
+    (ssh/with-ssh-tunnel [details-with-tunnel db-details]
+      (let [client (mongo.util/mongo-client (db-details->mongo-client-settings details-with-tunnel))]
+        (log/debug (u/format-color 'cyan "Opened new MongoClient."))
+        (try
+          (binding [*mongo-client* client]
+            (thunk client))
+          (finally
+            (mongo.util/close client)
+            (log/debug (u/format-color 'cyan "Closed MongoClient."))))))))
+
+(defmacro with-mongo-client
+  "Create instance of `MongoClient` for `database` and bind it to [[*mongo-client*]]. `database` can be anything
+   digestable by [[mongo.db/details-normalized]]. Call of this macro in its body will reuse existing
+   [[*mongo-client*]]."
+  {:clj-kondo/lint-as 'clojure.core/let
+   :clj-kondo/ignore [:unresolved-symbol :type-mismatch]}
+  [[client-sym database] & body]
+  `(let [f# (fn [~client-sym] ~@body)]
+     (if (nil? *mongo-client*)
+       (do-with-mongo-client f# ~database)
+       (f# *mongo-client*))))
+
+(defn do-with-mongo-database
+  "Implementation of [[with-mongo-database]]."
+  [thunk database]
+  (let [db-name (-> database mongo.db/details-normalized mongo.db/details->db-name)]
+    (with-mongo-client [c database]
+      (thunk (mongo.util/database c db-name)))))
+
+(defmacro with-mongo-database
+  "Utility for accessing database directly instead of a client. For more info see [[with-mongo-client]]."
+  {:clj-kondo/lint-as 'clojure.core/let
+   :clj-kondo/ignore [:unresolved-symbol :type-mismatch]}
+  [[db-sym database] & body]
+  `(let [f# (fn [~db-sym] ~@body)]
+     (do-with-mongo-database f# ~database)))
diff --git a/modules/drivers/mongo/src/metabase/driver/mongo/conversion.clj b/modules/drivers/mongo/src/metabase/driver/mongo/conversion.clj
new file mode 100644
index 0000000000000000000000000000000000000000..5a70474f1c6cd427bf31839466d8b236add44573
--- /dev/null
+++ b/modules/drivers/mongo/src/metabase/driver/mongo/conversion.clj
@@ -0,0 +1,127 @@
+(ns metabase.driver.mongo.conversion
+  "This namespace contains utilities for conversion between mongo specific types and clojure.
+
+   It is copy of monger's conversion namespace, that was adjusted for the needs of Document. Extensions that were
+   previously implemented in our mongo driver were also moved into this namespace.
+
+   [[to-document]] and [[from-document]] are meant to be used for transformation of clojure data into mongo aggregation
+   pipelines and results back into clojure structures.
+
+   TODO: Logic is copied from monger's conversions. It seems that lot of implementations are redundant. I'm not sure
+         yet. We should consider further simplifying the namespace.
+   TODO: Consider use of bson's encoders/decoders/codecs instead this code.
+   TODO: Or consider adding types from org.bson package -- eg. BsonInt32. If we'd decide to go this way, we could
+         transform ejson completely to clojure structures. That however requires deciding how to represent
+         eg. ObjectIds (it could be eg. {$oid \"...\"} which would copy EJSON v2 way). [EJSON v2 doc](https://www.mongodb.com/docs/manual/reference/mongodb-extended-json/).
+   TODO: Names of protocol functions and protocols are bit misleading as were in monger.
+
+   TODOs should be addressed during follow-up of monger removal."
+  (:require
+   [flatland.ordered.map :as ordered-map]
+   [java-time.api :as t]
+   [metabase.query-processor.timezone :as qp.timezone]))
+
+(set! *warn-on-reflection* true)
+
+;;;; Protocols defined originally in monger, adjusted for `Document` follow.
+
+(defprotocol ConvertFromDocument
+  (from-document [input opts] "Converts given DBObject instance to a piece of Clojure data"))
+
+(extend-protocol ConvertFromDocument
+  nil
+  (from-document [input _opts] input)
+
+  Object
+  (from-document [input _opts] input)
+
+  org.bson.types.Decimal128
+  (from-document [^org.bson.types.Decimal128 input _opts]
+    (.bigDecimalValue input))
+
+  java.util.List
+  (from-document [^java.util.List input opts]
+    (mapv #(from-document % opts) input))
+
+  java.util.Date
+  (from-document [t _]
+    (t/instant t))
+
+  org.bson.Document
+  (from-document [input {:keys [keywordize] :or {keywordize false} :as opts}]
+    (persistent! (reduce (if keywordize
+                           (fn [m ^String k]
+                             (assoc! m (keyword k) (from-document (.get input k) opts)))
+                           (fn [m ^String k]
+                             (assoc! m k (from-document (.get input k) opts))))
+                         (transient (ordered-map/ordered-map))
+                         (.keySet input)))))
+
+(defprotocol ConvertToDocument
+  (to-document [input] "Converts given piece of Clojure data to org.bson.Document usable by java driver."))
+
+(extend-protocol ConvertToDocument
+  nil
+  (to-document [_input] nil)
+
+  clojure.lang.Ratio
+  (to-document [input] (double input))
+
+  clojure.lang.Keyword
+  (to-document [input] (.getName input))
+
+  clojure.lang.Named
+  (to-document [input] (.getName input))
+
+  clojure.lang.IPersistentMap
+  (to-document [input]
+    (let [o (org.bson.Document.)]
+      (doseq [[k v] input]
+        (.put o (to-document k) (to-document v)))
+      o))
+
+  java.util.List
+  (to-document [input] (mapv to-document input))
+
+  java.util.Set
+  (to-document [input] (mapv to-document input))
+
+  Object
+  (to-document [input] input))
+
+;;;; Protocol extensions gathered from our mongo driver's code follow.
+
+;; It seems to be the case that the only thing BSON supports is DateTime which is basically the equivalent
+;; of Instant; for the rest of the types, we'll have to fake it. (Cam)
+(extend-protocol ConvertToDocument
+  java.time.Instant
+  (to-document [t]
+    (org.bson.BsonDateTime. (t/to-millis-from-epoch t)))
+
+  java.time.LocalDate
+  (to-document [t]
+    (to-document (t/local-date-time t (t/local-time 0))))
+
+  java.time.LocalDateTime
+  (to-document [t]
+    ;; QP store won't be bound when loading test data for example.
+    (to-document (t/instant t (t/zone-id (try
+                                           (qp.timezone/results-timezone-id)
+                                           (catch Throwable _
+                                             "UTC"))))))
+
+  java.time.LocalTime
+  (to-document [t]
+    (to-document (t/local-date-time (t/local-date "1970-01-01") t)))
+
+  java.time.OffsetDateTime
+  (to-document [t]
+    (to-document (t/instant t)))
+
+  java.time.OffsetTime
+  (to-document [t]
+    (to-document (t/offset-date-time (t/local-date "1970-01-01") t (t/zone-offset t))))
+
+  java.time.ZonedDateTime
+  (to-document [t]
+    (to-document (t/instant t))))
diff --git a/modules/drivers/mongo/src/metabase/driver/mongo/database.clj b/modules/drivers/mongo/src/metabase/driver/mongo/database.clj
new file mode 100644
index 0000000000000000000000000000000000000000..6e0828150a95635824767140ffa6c7e24ddf58b0
--- /dev/null
+++ b/modules/drivers/mongo/src/metabase/driver/mongo/database.clj
@@ -0,0 +1,62 @@
+(ns metabase.driver.mongo.database
+  "This namespace contains functions for work with mongo specific database and database details."
+  (:require
+   [metabase.lib.metadata.protocols :as lib.metadata.protocols]
+   [metabase.models.secret :as secret]
+   [metabase.query-processor.store :as qp.store]
+   [metabase.util.i18n :refer [tru]])
+  (:import
+   (com.mongodb ConnectionString)))
+
+(set! *warn-on-reflection* true)
+
+(defn- fqdn?
+  "A very simple way to check if a hostname is fully-qualified:
+   Check if there are two or more periods in the name."
+  [host]
+  (<= 2 (-> host frequencies (get \. 0))))
+
+(defn- validate-db-details! [{:keys [use-conn-uri conn-uri use-srv host] :as _db-details}]
+  (when (and use-srv (not (fqdn? host)))
+    (throw (ex-info (tru "Using DNS SRV requires a FQDN for host")
+                    {:host host})))
+  (when (and use-conn-uri (empty? (-> (ConnectionString. conn-uri) .getDatabase)))
+    (throw (ex-info (tru "No database name specified in URI.")
+                    {:host host}))))
+
+(defn- update-ssl-db-details
+  [db-details]
+  (-> db-details
+      (assoc :client-ssl-key (secret/get-secret-string db-details "client-ssl-key"))
+      (dissoc :client-ssl-key-creator-id
+              :client-ssl-key-created-at
+              :client-ssl-key-id
+              :client-ssl-key-source)))
+
+(defn details-normalized
+  "Gets db-details for `database`. Details are then validated and ssl related keys are updated."
+  [database]
+  (let [db-details
+        (cond
+          (integer? database)             (qp.store/with-metadata-provider database
+                                            (:details (lib.metadata.protocols/database (qp.store/metadata-provider))))
+          (string? database)              {:dbname database}
+          (:dbname (:details database))   (:details database) ; entire Database obj
+          (:dbname database)              database            ; connection details map only
+          (:conn-uri database)            database            ; connection URI has all the parameters
+          (:conn-uri (:details database)) (:details database)
+          :else
+          (throw (ex-info (tru "Unable to to get database details.")
+                          {:database database})))]
+    (validate-db-details! db-details)
+    (update-ssl-db-details db-details)))
+
+(defn details->db-name
+  "Get database name from database `:details`."
+  ^String [{:keys [dbname conn-uri] :as _db-details}]
+  (or (not-empty dbname) (-> (com.mongodb.ConnectionString. conn-uri) .getDatabase)))
+
+(defn db-name
+  "Get db-name from `database`. `database` value is something normalizable to database details."
+  [database]
+  (-> database details-normalized details->db-name))
diff --git a/modules/drivers/mongo/src/metabase/driver/mongo/execute.clj b/modules/drivers/mongo/src/metabase/driver/mongo/execute.clj
index 376f15a64fe078779e368d56a511dcba181b05dd..86cf1225abd4f56d528a28a63eea72f5dc672e9e 100644
--- a/modules/drivers/mongo/src/metabase/driver/mongo/execute.clj
+++ b/modules/drivers/mongo/src/metabase/driver/mongo/execute.clj
@@ -3,19 +3,22 @@
    [clojure.core.async :as a]
    [clojure.set :as set]
    [clojure.string :as str]
+   [metabase.driver.mongo.connection :as mongo.connection]
+   [metabase.driver.mongo.conversion :as mongo.conversion]
+   [metabase.driver.mongo.database :as mongo.db]
    [metabase.driver.mongo.query-processor :as mongo.qp]
    [metabase.driver.mongo.util :as mongo.util]
+   [metabase.lib.metadata :as lib.metadata]
    [metabase.query-processor.context :as qp.context]
    [metabase.query-processor.error-type :as qp.error-type]
    [metabase.query-processor.reducible :as qp.reducible]
+   [metabase.query-processor.store :as qp.store]
    [metabase.util.i18n :refer [tru]]
    [metabase.util.log :as log]
-   [metabase.util.malli :as mu]
-   [monger.conversion :as m.conversion]
-   [monger.util :as m.util])
+   [metabase.util.malli :as mu])
   (:import
-   (com.mongodb BasicDBObject DB DBObject)
-   (com.mongodb.client AggregateIterable ClientSession MongoDatabase MongoCursor)
+   (com.mongodb.client AggregateIterable ClientSession MongoCursor MongoDatabase)
+   (java.util ArrayList Collection)
    (java.util.concurrent TimeUnit)
    (org.bson BsonBoolean BsonInt32)))
 
@@ -105,16 +108,16 @@
 ;;; ------------------------------------------------------ Rows ------------------------------------------------------
 
 (defn- row->vec [row-col-names]
-  (fn [^DBObject row]
+  (fn [^org.bson.Document row]
     (mapv (fn [col-name]
             (let [col-parts (str/split col-name #"\.")
                   val       (reduce
-                             (fn [^BasicDBObject object ^String part-name]
+                             (fn [^org.bson.Document object ^String part-name]
                                (when object
                                  (.get object part-name)))
                              row
                              col-parts)]
-              (m.conversion/from-db-object val :keywordize)))
+              (mongo.conversion/from-document val {:keywordize true})))
           row-col-names)))
 
 (defn- post-process-row [row-col-names]
@@ -126,7 +129,7 @@
 ;;; |                                                      Run                                                       |
 ;;; +----------------------------------------------------------------------------------------------------------------+
 
-(defn- row-keys [^DBObject row]
+(defn- row-keys [^org.bson.Document row]
   (when row
     (.keySet row)))
 
@@ -146,8 +149,8 @@
    ^ClientSession session
    stages timeout-ms]
   (let [coll      (.getCollection db coll)
-        pipe      (m.util/into-array-list (m.conversion/to-db-object stages))
-        aggregate (.aggregate coll session pipe BasicDBObject)]
+        pipe      (ArrayList. ^Collection (mongo.conversion/to-document stages))
+        aggregate (.aggregate coll session pipe)]
     (init-aggregate! aggregate timeout-ms)))
 
 (defn- reducible-rows [context ^MongoCursor cursor first-row post-process]
@@ -176,36 +179,19 @@
                []
                (reducible-rows context cursor first-row (post-process-row row-col-names))))))
 
-
-(defn- connection->database
-  ^MongoDatabase
-  [^DB connection]
-  (let [db-name (.getName connection)]
-    (.. connection getMongoClient (getDatabase db-name))))
-
-(defn- start-session!
-  ^ClientSession
-  [^DB connection]
-  (.. connection getMongoClient startSession))
-
-(defn- kill-session!
-  [^MongoDatabase db
-   ^ClientSession session]
-  (let [session-id (.. session getServerSession getIdentifier)
-        kill-cmd (BasicDBObject. "killSessions" [session-id])]
-    (.runCommand db kill-cmd)))
-
 (defn execute-reducible-query
-  "Process and run a native MongoDB query."
+  "Process and run a native MongoDB query. This function expects initialized [[mongo.connection/*mongo-client*]]."
   [{{query :query collection-name :collection :as native-query} :native} context respond]
   {:pre [(string? collection-name) (fn? respond)]}
   (let [query  (cond-> query
                  (string? query) mongo.qp/parse-query-string)
-        client-database (connection->database mongo.util/*mongo-connection*)]
-    (with-open [session ^ClientSession (start-session! mongo.util/*mongo-connection*)]
+        database (lib.metadata/database (qp.store/metadata-provider))
+        db-name (mongo.db/db-name database)
+        client-database (mongo.util/database mongo.connection/*mongo-client* db-name)]
+    (with-open [session ^ClientSession (mongo.util/start-session! mongo.connection/*mongo-client*)]
       (a/go
         (when (a/<! (qp.context/canceled-chan context))
-          (kill-session! client-database session)))
+          (mongo.util/kill-session! client-database session)))
       (let [aggregate ^AggregateIterable (*aggregate* client-database
                                                       collection-name
                                                       session
diff --git a/modules/drivers/mongo/src/metabase/driver/mongo/json.clj b/modules/drivers/mongo/src/metabase/driver/mongo/json.clj
new file mode 100644
index 0000000000000000000000000000000000000000..3533deed5780687a1d7151b652440d7e40990af7
--- /dev/null
+++ b/modules/drivers/mongo/src/metabase/driver/mongo/json.clj
@@ -0,0 +1,20 @@
+(ns metabase.driver.mongo.json
+  "This namespace adds mongo specific type encoders to `cheshire`. It is copy of the relevant part of monger's `json`
+   namespace.
+
+   TODO: I believe this namespace should be completely removed. Trying to run tests without those mongo specific
+         encoders yield no failures. Unfortunately I was unable to prove it is not needed yet, hence I'm leaving it
+         in just to be safe. Removal should be considered during follow-up of monger removal."
+  (:require
+   [cheshire.generate])
+  (:import
+   (org.bson.types BSONTimestamp ObjectId)))
+
+(set! *warn-on-reflection* true)
+
+(cheshire.generate/add-encoder ObjectId
+                               (fn [^ObjectId oid ^com.fasterxml.jackson.core.json.WriterBasedJsonGenerator generator]
+                                 (.writeString generator (.toString oid))))
+(cheshire.generate/add-encoder BSONTimestamp
+                               (fn [^BSONTimestamp ts ^com.fasterxml.jackson.core.json.WriterBasedJsonGenerator generator]
+                                 (cheshire.generate/encode-map {:time (.getTime ts) :inc (.getInc ts)} generator)))
diff --git a/modules/drivers/mongo/src/metabase/driver/mongo/operators.clj b/modules/drivers/mongo/src/metabase/driver/mongo/operators.clj
new file mode 100644
index 0000000000000000000000000000000000000000..a45ae38d43cf0e0a66b9e4e941e3a765d90db062
--- /dev/null
+++ b/modules/drivers/mongo/src/metabase/driver/mongo/operators.clj
@@ -0,0 +1,216 @@
+(ns metabase.driver.mongo.operators
+  "This namespace provides definitions of mongo operators.
+
+   Namespace is a copy of monger's operator namespace.
+
+   TODO: We should consider removing this namespace completely. Having it just adds need for maintaining list of
+         operators monger provides. We could use keywords instead. Conversion code currently handles
+         transformation of those into strings during transformations to document. More importantly -- we are already
+         using keywords in lot of places in [[metabase.driver.mongo.query-processor]]. Try seraching it for `:\\$``
+         regex.
+
+   TODO should be addressed during follow-up of monger removal."
+  {:clj-kondo/config '{:linters {:missing-docstring {:level :off}}}})
+
+(def $gt "$gt")
+(def $gte "$gte")
+(def $lt "$lt")
+(def $lte "$lte")
+(def $all "$all")
+(def $in "$in")
+(def $nin "$nin")
+(def $eq "$eq")
+(def $ne "$ne")
+(def $elemMatch "$elemMatch")
+(def $regex "$regex")
+(def $options "$options")
+(def $comment "$comment")
+(def $explain "$explain")
+(def $hint "$hint")
+(def $maxTimeMS "$maxTimeMS")
+(def $orderBy "$orderBy")
+(def $query "$query")
+(def $returnKey "$returnKey")
+(def $showDiskLoc "$showDiskLoc")
+(def $natural "$natural")
+(def $expr "$expr")
+(def $jsonSchema "$jsonSchema")
+(def $where "$where")
+(def $and "$and")
+(def $or "$or")
+(def $nor "$nor")
+(def $inc "$inc")
+(def $mul "$mul")
+(def $set "$set")
+(def $unset "$unset")
+(def $setOnInsert "$setOnInsert")
+(def $rename "$rename")
+(def $push "$push")
+(def $position "$position")
+(def $each "$each")
+(def $addToSet "$addToSet")
+(def $pop "$pop")
+(def $pull "$pull")
+(def $pullAll "$pullAll")
+(def $bit "$bit")
+(def $bitsAllClear "$bitsAllClear")
+(def $bitsAllSet "$bitsAllSet")
+(def $bitsAnyClear "$bitsAnyClear")
+(def $bitsAnySet "$bitsAnySet")
+(def $exists "$exists")
+(def $mod "$mod")
+(def $size "$size")
+(def $type "$type")
+(def $not "$not")
+(def $addFields "$addFields")
+(def $bucket "$bucket")
+(def $bucketAuto "$bucketAuto")
+(def $collStats "$collStats")
+(def $facet "$facet")
+(def $geoNear "$geoNear")
+(def $graphLookup "$graphLookup")
+(def $indexStats "$indexStats")
+(def $listSessions "$listSessions")
+(def $lookup "$lookup")
+(def $match "$match")
+(def $merge "$merge")
+(def $out "$out")
+(def $planCacheStats "$planCacheStats")
+(def $project "$project")
+(def $redact "$redact")
+(def $replaceRoot "$replaceRoot")
+(def $replaceWith "$replaceWith")
+(def $sample "$sample")
+(def $limit "$limit")
+(def $skip "$skip")
+(def $unwind "$unwind")
+(def $group "$group")
+(def $sort "$sort")
+(def $sortByCount "$sortByCount")
+(def $currentOp "$currentOp")
+(def $listLocalSessions "$listLocalSessions")
+(def $cmp "$cmp")
+(def $min "$min")
+(def $max "$max")
+(def $avg "$avg")
+(def $stdDevPop "$stdDevPop")
+(def $stdDevSamp "$stdDevSamp")
+(def $sum "$sum")
+(def $let "$let")
+(def $first "$first")
+(def $last "$last")
+(def $abs "$abs")
+(def $add "$add")
+(def $ceil "$ceil")
+(def $divide "$divide")
+(def $exp "$exp")
+(def $floor "$floor")
+(def $ln "$ln")
+(def $log "$log")
+(def $log10 "$log10")
+(def $multiply "$multiply")
+(def $pow "$pow")
+(def $round "$round")
+(def $sqrt "$sqrt")
+(def $subtract "$subtract")
+(def $trunc "$trunc")
+(def $literal "$literal")
+(def $arrayElemAt "$arrayElemAt")
+(def $arrayToObject "$arrayToObject")
+(def $concatArrays "$concatArrays")
+(def $filter "$filter")
+(def $indexOfArray "$indexOfArray")
+(def $isArray "$isArray")
+(def $map "$map")
+(def $objectToArray "$objectToArray")
+(def $range "$range")
+(def $reduce "$reduce")
+(def $reverseArray "$reverseArray")
+(def $zip "$zip")
+(def $mergeObjects "$mergeObjects")
+(def $allElementsTrue "$allElementsTrue")
+(def $anyElementsTrue "$anyElementsTrue")
+(def $setDifference "$setDifference")
+(def $setEquals "$setEquals")
+(def $setIntersection "$setIntersection")
+(def $setIsSubset "$setIsSubset")
+(def $setUnion "$setUnion")
+(def $strcasecmp "$strcasecmp")
+(def $substr "$substr")
+(def $substrBytes "$substrBytes")
+(def $substrCP "$substrCP")
+(def $toLower "$toLower")
+(def $toString "$toString")
+(def $toUpper "$toUpper")
+(def $concat "$concat")
+(def $indexOfBytes "$indexOfBytes")
+(def $indexOfCP "$indexOfCP")
+(def $ltrim "$ltrim")
+(def $regexFind "$regexFind")
+(def $regexFindAll "$regexFindAll")
+(def $regexMatch "$regexMatch")
+(def $rtrim "$rtrim")
+(def $split "$split")
+(def $strLenBytes "$strLenBytes")
+(def $subLenCP "$subLenCP")
+(def $trim "$trim")
+(def $sin "$sin")
+(def $cos "$cos")
+(def $tan "$tan")
+(def $asin "$asin")
+(def $acos "$acos")
+(def $atan "$atan")
+(def $atan2 "$atan2")
+(def $asinh "$asinh")
+(def $acosh "$acosh")
+(def $atanh "$atanh")
+(def $radiansToDegrees "$radiansToDegrees")
+(def $degreesToRadians "$degreesToRadians")
+(def $convert "$convert")
+(def $toBool "$toBool")
+(def $toDecimal "$toDecimal")
+(def $toDouble "$toDouble")
+(def $toInt "$toInt")
+(def $toLong "$toLong")
+(def $toObjectId "$toObjectId")
+(def $dayOfMonth "$dayOfMonth")
+(def $dayOfWeek "$dayOfWeek")
+(def $dayOfYear "$dayOfYear")
+(def $hour "$hour")
+(def $minute "$minute")
+(def $month "$month")
+(def $second "$second")
+(def $millisecond "$millisecond")
+(def $week "$week")
+(def $year "$year")
+(def $isoDate "$isoDate")
+(def $dateFromParts "$dateFromParts")
+(def $dateFromString "$dateFromString")
+(def $dateToParts "$dateToParts")
+(def $dateToString "$dateToString")
+(def $isoDayOfWeek "$isoDayOfWeek")
+(def $isoWeek "$isoWeek")
+(def $isoWeekYear "$isoWeekYear")
+(def $toDate "$toDate")
+(def $ifNull "$ifNull")
+(def $cond "$cond")
+(def $switch "$switch")
+(def $geoWithin "$geoWithin")
+(def $geoIntersects "$geoIntersects")
+(def $near "$near")
+(def $nearSphere "$nearSphere")
+(def $geometry "$geometry")
+(def $maxDistance "$maxDistance")
+(def $minDistance "$minDistance")
+(def $center "$center")
+(def $centerSphere "$centerSphere")
+(def $box "$box")
+(def $polygon "$polygon")
+(def $slice "$slice")
+(def $text "$text")
+(def $meta "$meta")
+(def $search "$search")
+(def $language "$language")
+(def $currentDate "$currentDate")
+(def $isolated "$isolated")
+(def $count "$count")
diff --git a/modules/drivers/mongo/src/metabase/driver/mongo/query_processor.clj b/modules/drivers/mongo/src/metabase/driver/mongo/query_processor.clj
index e218f490aa6ac088f74336ced7a34b87f3dcaff5..1f8f0c6ba2280dd88e7df62c89ff645907bcbf05 100644
--- a/modules/drivers/mongo/src/metabase/driver/mongo/query_processor.clj
+++ b/modules/drivers/mongo/src/metabase/driver/mongo/query_processor.clj
@@ -9,6 +9,11 @@
    [java-time.api :as t]
    [metabase.driver :as driver]
    [metabase.driver.common :as driver.common]
+   [metabase.driver.mongo.operators :refer [$add $addToSet $and $avg $concat $cond
+                                            $dayOfMonth $dayOfWeek $dayOfYear $divide $eq $expr
+                                            $group $gt $gte $hour $limit $literal $lookup $lt $lte $match $max $min
+                                            $minute $mod $month $multiply $ne $not $or $project $regexMatch $second
+                                            $size $skip $sort $strcasecmp $subtract $sum $toLower $unwind $year]]
    [metabase.driver.util :as driver.u]
    [metabase.lib.metadata :as lib.metadata]
    [metabase.mbql.schema :as mbql.s]
@@ -25,12 +30,7 @@
    [metabase.util.i18n :refer [tru]]
    [metabase.util.log :as log]
    [metabase.util.malli :as mu]
-   [metabase.util.malli.schema :as ms]
-   [monger.operators :refer [$add $addToSet $and $avg $concat $cond
-                             $dayOfMonth $dayOfWeek $dayOfYear $divide $eq $expr
-                             $group $gt $gte $hour $limit $literal $lookup $lt $lte $match $max $min $minute
-                             $mod $month $multiply $ne $not $or $project $regexMatch $second $size $skip $sort
-                             $strcasecmp $subtract $sum $toLower $unwind $year]])
+   [metabase.util.malli.schema :as ms])
   (:import
    (org.bson BsonBinarySubType)
    (org.bson.types Binary ObjectId)))
@@ -1370,7 +1370,15 @@
   "Parse a serialized native query. Like a normal JSON parse, but handles BSON/MongoDB extended JSON forms."
   [^String s]
   (try
-    (mapv (fn [^org.bson.BsonValue v] (-> v .asDocument com.mongodb.BasicDBObject.))
+    ;; TODO: Fixme! In following expression we were previously creating BasicDBObject's. As part of Monger removal
+    ;;       in favor of plain mongo-java-driver we now create Documents. I believe following conversion was and is
+    ;;       responsible for https://github.com/metabase/metabase/issues/38181. When pipeline is deserialized,
+    ;;       we end up with vector of `Document`s into which are appended new query stages, which are clojure
+    ;;       structures. When we render the query in "view native query" in query builder, clojure structures
+    ;;       are transformed to json correctly. But documents are rendered to their string representation (screenshot
+    ;;       in the issue). Possible fix could be to represent native queries in ejson v2, which conforms to json rfc,
+    ;;       hence there would be no need for special bson values handling. That is to be further investigated.
+    (mapv (fn [^org.bson.BsonValue v] (-> v .asDocument org.bson.Document.))
           (org.bson.BsonArray/parse s))
     (catch Throwable e
       (throw (ex-info (tru "Unable to parse query: {0}" (.getMessage e))
diff --git a/modules/drivers/mongo/src/metabase/driver/mongo/util.clj b/modules/drivers/mongo/src/metabase/driver/mongo/util.clj
index 93a1298891e7b8efa63eab9d5678a72fdd60bd11..b67a5d1948ee4636fdcf177e87b20e8de46bb81f 100644
--- a/modules/drivers/mongo/src/metabase/driver/mongo/util.clj
+++ b/modules/drivers/mongo/src/metabase/driver/mongo/util.clj
@@ -1,262 +1,96 @@
 (ns metabase.driver.mongo.util
-  "`*mongo-connection*`, `with-mongo-connection`, and other functions shared between several Mongo driver namespaces."
+  "Mongo specific utility functions -- mongo methods that we are using at various places wrapped into clojure
+   functions."
   (:require
-   [clojure.string :as str]
-   [metabase.config :as config]
-   [metabase.driver.util :as driver.u]
-   [metabase.lib.metadata.protocols :as lib.metadata.protocols]
-   [metabase.models.secret :as secret]
-   [metabase.query-processor.store :as qp.store]
-   [metabase.util :as u]
-   [metabase.util.i18n :refer [trs tru]]
-   [metabase.util.log :as log]
-   [metabase.util.ssh :as ssh]
-   [monger.core :as mg]
-   [monger.credentials :as mcred])
+   [flatland.ordered.map :as ordered-map]
+   [metabase.driver.mongo.conversion :as mongo.conversion])
   (:import
-   (com.mongodb MongoClient MongoClientOptions MongoClientOptions$Builder MongoClientURI)))
+   (com.mongodb MongoClientSettings)
+   (com.mongodb.client ClientSession FindIterable MongoClient MongoClients MongoCollection MongoDatabase)))
 
 (set! *warn-on-reflection* true)
 
-(def ^:dynamic ^com.mongodb.DB *mongo-connection*
-  "Connection to a Mongo database. Bound by top-level `with-mongo-connection` so it may be reused within its body."
-  nil)
-
-(def ^:dynamic ^com.mongodb.MongoClient *mongo-client*
-  "Client used to connect to a Mongo database. Bound by top-level `with-mongo-connection` so it may be reused within its body."
-  nil)
-
-;; the code below is done to support "additional connection options" the way some of the JDBC drivers do.
-;; For example, some people might want to specify a`readPreference` of `nearest`. The normal Java way of
-;; doing this would be to do
-;;
-;;     (.readPreference builder (ReadPreference/nearest))
-;;
-;; But the user will enter something like `readPreference=nearest`. Luckily, the Mongo Java lib can parse
-;; these options for us and return a `MongoClientOptions` like we'd prefer. Code below:
-
-(defn- client-options-for-url-params
-  "Return an instance of `MongoClientOptions` from a `url-params` string, e.g.
-
-    (client-options-for-url-params \"readPreference=nearest\")
-    ;; -> #MongoClientOptions{readPreference=nearest, ...}"
-  ^MongoClientOptions [^String url-params]
-  (when (seq url-params)
-    ;; just make a fake connection string to tack the URL params on to. We can use that to have the Mongo lib
-    ;; take care of parsing the params and converting them to Java-style `MongoConnectionOptions`
-    (.getOptions (MongoClientURI. (str "mongodb://localhost/?" url-params)))))
-
-(defn- client-options->builder
-  "Return a `MongoClientOptions.Builder` for a `MongoClientOptions` `client-options`.
-  If `client-options` is `nil`, return a new 'default' builder."
-  ^MongoClientOptions$Builder [^MongoClientOptions client-options]
-  ;; We do it tnis way because (MongoClientOptions$Builder. nil) throws a NullPointerException
-  (if client-options
-    (MongoClientOptions$Builder. client-options)
-    (MongoClientOptions$Builder.)))
-
-(defn- connection-options-builder
-  "Build connection options for Mongo.
-  We have to use `MongoClientOptions.Builder` directly to configure our Mongo connection since Monger's wrapper method
-  doesn't support `.serverSelectionTimeout` or `.sslEnabled`. `additional-options`, a String like
-  `readPreference=nearest`, can be specified as well; when passed, these are parsed into a `MongoClientOptions` that
-  serves as a starting point for the changes made below."
-  ^MongoClientOptions [{:keys [ssl additional-options ssl-cert
-                               ssl-use-client-auth client-ssl-cert client-ssl-key]
-                        :or   {ssl false, ssl-use-client-auth false}}]
-  (let [client-options (-> (client-options-for-url-params additional-options)
-                           client-options->builder
-                           (.description config/mb-app-id-string)
-                           (.connectTimeout (driver.u/db-connection-timeout-ms))
-                           (.serverSelectionTimeout (driver.u/db-connection-timeout-ms))
-                           (.sslEnabled ssl))
-        server-cert? (not (str/blank? ssl-cert))
-        client-cert? (and ssl-use-client-auth
-                          (not-any? str/blank? [client-ssl-cert client-ssl-key]))]
-    (if (or server-cert? client-cert?)
-      (let [ssl-params (cond-> {}
-                         server-cert? (assoc :trust-cert ssl-cert)
-                         client-cert? (assoc :private-key client-ssl-key
-                                             :own-cert client-ssl-cert))]
-        (.socketFactory client-options (driver.u/ssl-socket-factory ssl-params)))
-      client-options)))
-
-;; The arglists metadata for mg/connect are actually *WRONG* -- the function additionally supports a 3-arg airity
-;; where you can pass options and credentials, as we'd like to do. We need to go in and alter the metadata of this
-;; function ourselves because otherwise the Eastwood linter will complain that we're calling the function with the
-;; wrong airity :sad: :/
-(alter-meta! #'mg/connect assoc :arglists '([{:keys [host port uri]}]
-                                            [server-address options]
-                                            [server-address options credentials]))
-
-(defn- database->details
-  "Make sure `database` is in a standard db details format. This is done so we can accept several different types of
-  values for `database`, such as plain strings or the usual MB details map."
-  [database]
-  (cond
-    (integer? database)             (qp.store/with-metadata-provider database
-                                      (:details (lib.metadata.protocols/database (qp.store/metadata-provider))))
-    (string? database)              {:dbname database}
-    (:dbname (:details database))   (:details database) ; entire Database obj
-    (:dbname database)              database            ; connection details map only
-    (:conn-uri database)            database            ; connection URI has all the parameters
-    (:conn-uri (:details database)) (:details database)
-    :else                           (throw (Exception. (str "with-mongo-connection failed: bad connection details:"
-                                                          (:details database))))))
-
-(defn- srv-conn-str
-  "Creates Mongo client connection string to connect using
-   DNS + SRV discovery mechanism."
-  [user pass host dbname authdb]
-  (format "mongodb+srv://%s:%s@%s/%s?authSource=%s" user pass host dbname authdb))
-
-(defn- normalize-details [details]
-  (let [{:keys [dbname host port user pass authdb additional-options use-srv conn-uri ssl ssl-cert ssl-use-client-auth client-ssl-cert]
-         :or   {port 27017, ssl false, ssl-use-client-auth false, use-srv false, ssl-cert "", authdb "admin"}} details
-        ;; ignore empty :user and :pass strings
-        user (not-empty user)
-        pass (not-empty pass)]
-    {:host                    host
-     :port                    port
-     :user                    user
-     :authdb                  authdb
-     :pass                    pass
-     :dbname                  dbname
-     :ssl                     ssl
-     :additional-options      additional-options
-     :conn-uri                conn-uri
-     :srv?                    use-srv
-     :ssl-cert                ssl-cert
-     :ssl-use-client-auth     ssl-use-client-auth
-     :client-ssl-cert         client-ssl-cert
-     :client-ssl-key          (secret/get-secret-string details "client-ssl-key")}))
-
-(defn- fqdn?
-  "A very simple way to check if a hostname is fully-qualified:
-   Check if there are two or more periods in the name."
-  [host]
-  (<= 2 (-> host frequencies (get \. 0))))
-
-(defn- auth-db-or-default
-  "Returns the auth-db to use for a connection, for the given `auth-db` parameter.  If `auth-db` is a non-blank string,
-  it will be returned.  Otherwise, the default value (\"admin\") will be returned."
-  [auth-db]
-  (if (str/blank? auth-db) "admin" auth-db))
-
-(defn- srv-connection-info
-  "Connection info for Mongo using DNS SRV.  Requires FQDN for `host` in the format
-   'subdomain. ... .domain.top-level-domain'.  Only a single host is supported, but a
-   replica list could easily provided instead of a single host.
-   Using SRV automatically enables SSL, though we explicitly set SSL to true anyway.
-   Docs to generate URI string: https://docs.mongodb.com/manual/reference/connection-string/#dns-seedlist-connection-format"
-  [{:keys [host user authdb pass dbname] :as details}]
-  (if-not (fqdn? host)
-    (throw (ex-info (tru "Using DNS SRV requires a FQDN for host")
-                    {:host host}))
-    (let [conn-opts (connection-options-builder details)
-          authdb    (auth-db-or-default authdb)
-          conn-str  (srv-conn-str user pass host dbname authdb)]
-      {:type :srv
-       :uri  (MongoClientURI. conn-str conn-opts)})))
-
-(defn- normal-connection-info
-  "Connection info for Mongo.  Returns options for the fallback method to connect
-   to hostnames that are not FQDNs.  This works with 'localhost', but has been problematic with FQDNs.
-   If you would like to provide a FQDN, use `srv-connection-info`"
-  [{:keys [host port user authdb pass dbname] :as details}]
-  (let [server-address                   (mg/server-address host port)
-        credentials                      (when user
-                                           (mcred/create user (auth-db-or-default authdb) pass))
-        ^MongoClientOptions$Builder opts (connection-options-builder details)]
-    {:type           :normal
-     :server-address server-address
-     :credentials    credentials
-     :dbname         dbname
-     :options        (-> opts .build)}))
-
-(defn- conn-string-info
-  "Connection info for Mongo using a user-provided connection string."
-  [{:keys [conn-uri]}]
-  {:type        :conn-string
-   :conn-string conn-uri})
-
-(defn- details->mongo-connection-info [{:keys [conn-uri srv?], :as details}]
-  (if (str/blank? conn-uri)
-      ((if srv?
-         srv-connection-info
-         normal-connection-info) details)
-      (conn-string-info details)))
-
-(defmulti ^:private connect
-  "Connect to MongoDB using Mongo `connection-info`, return a tuple of `[mongo-client db]`, instances of `MongoClient`
-   and `DB` respectively.
-
-   If `host` is a fully-qualified domain name, then we need to connect to Mongo
-   differently.  It has been problematic to connect to Mongo with an FQDN using
-   `mg/connect`.  The fix was to create a connection string and use DNS SRV for
-   FQDNS.  In this fn we provide the correct connection fn based on host."
-  {:arglists '([connection-info])}
-  :type)
-
-(defmethod connect :srv
-  [{:keys [^MongoClientURI uri]}]
-  (let [mongo-client (MongoClient. uri)]
-    (if-let [db-name (.getDatabase uri)]
-      [mongo-client (.getDB mongo-client db-name)]
-      (throw (ex-info (tru "No database name specified in URI. Monger requires a database to be explicitly configured.")
-                      {:hosts (-> uri .getHosts)
-                       :uri   (-> uri .getURI)
-                       :opts  (-> uri .getOptions)})))))
-
-(defmethod connect :normal
-  [{:keys [server-address options credentials dbname]}]
-  (let [do-connect (partial mg/connect server-address options)
-        mongo-client (if credentials
-                       (do-connect credentials)
-                       (do-connect))]
-    [mongo-client (mg/get-db mongo-client dbname)]))
-
-(defmethod connect :conn-string
-  [{:keys [conn-string]}]
-  (let [mongo-client (mg/connect-via-uri conn-string)]
-    [(:conn mongo-client) (:db mongo-client)]))
-
-(defn do-with-mongo-connection
-  "Run `f` with a new connection (bound to [[*mongo-connection*]]) to `database`. Don't use this directly; use
-  [[with-mongo-connection]]. Also dynamically binds the Mongo client to [[*mongo-client*]]."
-  [f database]
-  (let [details (database->details database)]
-    (ssh/with-ssh-tunnel [details-with-tunnel details]
-      (let [connection-info (details->mongo-connection-info (normalize-details details-with-tunnel))
-            [mongo-client db] (connect connection-info)]
-        (log/debug (u/format-color 'cyan (trs "Opened new MongoDB connection.")))
-        (try
-          (binding [*mongo-connection* db
-                    *mongo-client*     mongo-client]
-            (f *mongo-connection*))
-          (finally
-            (mg/disconnect mongo-client)
-            (log/debug (u/format-color 'cyan (trs "Closed MongoDB connection.")))))))))
-
-(defmacro with-mongo-connection
-  "Open a new MongoDB connection to ``database-or-connection-string`, bind connection to `binding`, execute `body`, and
-  close the connection. The DB connection is re-used by subsequent calls to [[with-mongo-connection]] within
-  `body`. (We're smart about it: `database` isn't even evaluated if [[*mongo-connection*]] is already bound.)
-
-  [[*mongo-client*]] is also dynamically bound to the MongoClient instance.
-
-    ;; delay isn't derefed if *mongo-connection* is already bound
-    (with-mongo-connection [^com.mongodb.DB conn @(:db (sel :one Table ...))]
-      ...)
-
-    ;; You can use a string instead of a Database
-    (with-mongo-connection [^com.mongodb.DB conn \"mongodb://127.0.0.1:27017/test\"]
-       ...)
-
-  `database-or-connection-string` can also optionally be the connection details map on its own."
-  [[database-binding database] & body]
-  `(let [f# (fn [~database-binding]
-              ~@body)]
-     (if *mongo-connection*
-       (f# *mongo-connection*)
-       (do-with-mongo-connection f# ~database))))
+(defn mongo-client
+  "Create `MongoClient` from `MongoClientSettings`."
+  ^MongoClient [^MongoClientSettings settings]
+  (MongoClients/create settings))
+
+(defn close
+  "Close `client`."
+  [^MongoClient client]
+  (.close client))
+
+(defn database
+  "Get database by its name from `client`."
+  ^MongoDatabase [^MongoClient client db-name]
+  (.getDatabase client db-name))
+
+;; https://mongodb.github.io/mongo-java-driver/4.11/apidocs/mongodb-driver-sync/com/mongodb/client/MongoDatabase.html#runCommand(org.bson.conversions.Bson)
+;; Returns Document
+(defn run-command
+  "Run mongo command."
+  ([^MongoDatabase db cmd & {:as opts}]
+   (let [cmd-doc (mongo.conversion/to-document cmd)]
+     (-> (.runCommand db cmd-doc)
+         (mongo.conversion/from-document opts)))))
+
+;; https://mongodb.github.io/mongo-java-driver/4.11/apidocs/mongodb-driver-sync/com/mongodb/client/MongoClient.html#listDatabaseNames()
+;; returns MongoIterable<String>
+(defn list-database-names
+  "Return vector of names of databases for `client`."
+  [^MongoClient client]
+  (vec (.listDatabaseNames client)))
+
+;; https://mongodb.github.io/mongo-java-driver/4.11/apidocs/mongodb-driver-sync/com/mongodb/client/MongoDatabase.html#listCollectionNames()
+(defn list-collection-names
+  "Return vector of collection names for `db`"
+  [^MongoDatabase db]
+  (vec (.listCollectionNames db)))
+
+(defn collection
+  "Return `MongoCollection` for `db` by its name."
+  ^MongoCollection [^MongoDatabase db coll-name]
+  (.getCollection db coll-name))
+
+;; https://mongodb.github.io/mongo-java-driver/4.11/apidocs/mongodb-driver-sync/com/mongodb/client/MongoCollection.html#listIndexes()
+(defn list-indexes
+  "Return vector of Documets describing indexes."
+  [^MongoCollection coll & {:as opts}]
+  (mongo.conversion/from-document (.listIndexes coll) opts))
+
+;; https://mongodb.github.io/mongo-java-driver/4.11/apidocs/mongodb-driver-sync/com/mongodb/client/MongoCollection.html#find()
+(defn do-find
+  "Perform find on collection. `sort-criteria` should be sequence of key value pairs (eg. vector of vectors), or
+   `ordered-map`. Keys are the column name. Keys could be keywords. `opts` could contain also `:keywordize`, which
+   is param for `from-document`."
+  [^MongoCollection coll
+   & {:keys [limit skip batch-size sort-criteria] :as opts}]
+  (->> (cond-> ^FindIterable (.find coll)
+         limit (.limit limit)
+         skip (.skip skip)
+         batch-size (.batchSize (int batch-size))
+         sort-criteria (.sort (mongo.conversion/to-document (ordered-map/ordered-map sort-criteria))))
+       (mapv #(mongo.conversion/from-document % opts))))
+
+(defn create-index
+  "Create index."
+  [^MongoCollection coll cmd-map]
+  (.createIndex coll (mongo.conversion/to-document cmd-map)))
+
+(defn insert-one
+  "Insert document into mongo collection."
+  [^MongoCollection coll document-map]
+  (.insertOne coll (mongo.conversion/to-document document-map)))
+
+(defn start-session!
+  "Start session on client `c`."
+  ^ClientSession [^MongoClient c]
+  (. c startSession))
+
+(defn kill-session!
+  "Kill `session` in `db`."
+  [^MongoDatabase db
+   ^ClientSession session]
+  (let [session-id (.. session getServerSession getIdentifier)
+        kill-cmd (mongo.conversion/to-document {:killSessions [session-id]})]
+    (.runCommand db kill-cmd)))
diff --git a/modules/drivers/mongo/test/metabase/driver/mongo/connection_test.clj b/modules/drivers/mongo/test/metabase/driver/mongo/connection_test.clj
new file mode 100644
index 0000000000000000000000000000000000000000..2030d27f3a898c202e5cbbf9d019bf024bc9252c
--- /dev/null
+++ b/modules/drivers/mongo/test/metabase/driver/mongo/connection_test.clj
@@ -0,0 +1,141 @@
+(ns metabase.driver.mongo.connection-test
+  (:require
+   [clojure.string :as str]
+   [clojure.test :refer :all]
+   [metabase.driver.mongo.connection :as mongo.connection]
+   [metabase.driver.mongo.database :as mongo.db]
+   [metabase.driver.mongo.util :as mongo.util]
+   [metabase.driver.util :as driver.u]
+   [metabase.test :as mt])
+  (:import
+   (com.mongodb ServerAddress)
+   (com.mongodb.client MongoDatabase)))
+
+;;;; TODO: move some tests to db-test?
+
+(set! *warn-on-reflection* true)
+
+(def ^:private mock-details
+  {:user "test-user"
+   :pass "test-pass"
+   :host "test-host.place.com"
+   :dbname "datadb"
+   :authdb "authdb"
+   :use-srv true})
+
+(deftest ^:parallel fqdn?-test
+  (testing "test hostname is fqdn"
+    (is (true? (#'mongo.db/fqdn? "db.mongo.com")))
+    (is (true? (#'mongo.db/fqdn? "replica-01.db.mongo.com")))
+    (is (false? (#'mongo.db/fqdn? "localhost")))
+    (is (false? (#'mongo.db/fqdn? "localhost.localdomain")))))
+
+(deftest ^:parallel srv-conn-str-test
+  (let [db-details {:user "test-user"
+                    :pass "test-pass"
+                    :host "test-host.place.com"
+                    :dbname "datadb"
+                    :authdb "authdb"
+                    :use-srv true}]
+    (testing "mongo+srv connection string used when :use-srv is thruthy"
+      (is (str/includes? (mongo.connection/db-details->connection-string db-details)
+                         "mongodb+srv://test-user:test-pass@test-host.place.com/datadb?authSource=authdb")))
+    (testing "Only fqdn may be used with mongo+srv"
+      (is (thrown-with-msg? Throwable
+                            #"Using DNS SRV requires a FQDN for host"
+                            (-> db-details
+                                (assoc :host "localhost")
+                                mongo.db/details-normalized
+                                mongo.connection/db-details->connection-string))))))
+
+(deftest ^:parallel srv-connection-properties-test
+  (testing "connection properties when using SRV"
+    (are [host msg]
+         (thrown-with-msg? Throwable msg
+                           (mongo.connection/with-mongo-database [^MongoDatabase db
+                                                           {:host               host
+                                                            :port               1015
+                                                            :user               "test-user"
+                                                            :authdb             "test-authdb"
+                                                            :pass               "test-passwd"
+                                                            :dbname             "test-dbname"
+                                                            :ssl                true
+                                                            :additional-options "connectTimeoutMS=2000&serverSelectionTimeoutMS=2000"
+                                                            :use-srv            true}]
+                             (mongo.util/list-collection-names db)))
+      "db.fqdn.test" #"Failed looking up SRV record"
+      "local.test" #"Using DNS SRV requires a FQDN for host")
+    (testing "test host and port are correct for both srv and normal"
+      (let [host                            "localhost"
+            details                         {:host               host
+                                             :port               1010
+                                             :user               "test-user"
+                                             :authdb             "test-authdb"
+                                             :pass               "test-passwd"
+                                             :dbname             "test-dbname"
+                                             :ssl                true
+                                             :additional-options ""}
+            client-settings (mongo.connection/db-details->mongo-client-settings details)
+            ^ServerAddress server-address (-> client-settings .getClusterSettings .getHosts first)]
+        (is (= "localhost"
+               (.getHost server-address)))
+        (is (= 1010
+               (.getPort server-address)))))))
+
+(deftest ^:parallel additional-connection-options-test
+  (mt/test-driver
+   :mongo
+   (testing "test that people can specify additional connection options like `?readPreference=nearest`"
+     (is (= (com.mongodb.ReadPreference/nearest)
+            (-> (assoc mock-details :additional-options "readPreference=nearest")
+                mongo.connection/db-details->mongo-client-settings
+                .getReadPreference)))
+     (is (= (com.mongodb.ReadPreference/secondaryPreferred)
+            (-> mock-details
+                (assoc :additional-options "readPreference=secondaryPreferred")
+                mongo.connection/db-details->mongo-client-settings
+                .getReadPreference))))
+   (testing "make sure we can specify multiple options"
+     (let [settings (-> mock-details
+                        (assoc :additional-options "readPreference=secondary&replicaSet=test")
+                        mongo.connection/db-details->mongo-client-settings)]
+       (is (= "test"
+              (-> settings .getClusterSettings .getRequiredReplicaSetName)))
+       (is (= (com.mongodb.ReadPreference/secondary)
+              (.getReadPreference settings)))))
+   (testing "make sure that invalid additional options throw an Exception"
+     (is (thrown-with-msg?
+          IllegalArgumentException
+          #"No match for read preference of ternary"
+          (-> mock-details
+              (assoc :additional-options "readPreference=ternary")
+              mongo.connection/db-details->mongo-client-settings))))))
+
+(deftest ^:parallel test-ssh-connection
+  (testing "Gets an error when it can't connect to mongo via ssh tunnel"
+    (mt/test-driver
+     :mongo
+     (is (thrown?
+          java.net.ConnectException
+          (try
+            (let [engine :mongo
+                  details {:ssl            false
+                           :password       "changeme"
+                           :tunnel-host    "localhost"
+                           :tunnel-pass    "BOGUS-BOGUS"
+                           :port           5432
+                           :dbname         "test"
+                           :host           "localhost"
+                           :tunnel-enabled true
+                           ;; we want to use a bogus port here on purpose -
+                           ;; so that locally, it gets a ConnectionRefused,
+                           ;; and in CI it does too. Apache's SSHD library
+                           ;; doesn't wrap every exception in an SshdException
+                           :tunnel-port    21212
+                           :tunnel-user    "bogus"}]
+              (driver.u/can-connect-with-details? engine details :throw-exceptions))
+            (catch Throwable e
+              (loop [^Throwable e e]
+                (or (when (instance? java.net.ConnectException e)
+                      (throw e))
+                    (some-> (.getCause e) recur))))))))))
diff --git a/modules/drivers/mongo/test/metabase/driver/mongo/conversion_test.clj b/modules/drivers/mongo/test/metabase/driver/mongo/conversion_test.clj
new file mode 100644
index 0000000000000000000000000000000000000000..9ead5932b568157c459d6383952328a6b2deb6af
--- /dev/null
+++ b/modules/drivers/mongo/test/metabase/driver/mongo/conversion_test.clj
@@ -0,0 +1,28 @@
+(ns metabase.driver.mongo.conversion-test
+  (:require
+   [clojure.test :refer :all]
+   [clojure.walk :as walk]
+   [flatland.ordered.map :as ordered-map]
+   [metabase.driver.mongo.conversion :as mongo.conversion]))
+
+(set! *warn-on-reflection* true)
+
+(deftest ^:parallel transformation-test
+  (let [m (ordered-map/ordered-map
+           :a 1
+           :b "x"
+           :c {:d "e"})
+        ms {"u" "v"
+            "x" {"y" "z"}}]
+    (testing "Transform map"
+      (is (= m
+             (-> m mongo.conversion/to-document (mongo.conversion/from-document {:keywordize true}))))
+      (is (= ms
+             (-> ms mongo.conversion/to-document (mongo.conversion/from-document nil)))))
+    (testing "Transform sequence"
+      (let [mseqk [m (walk/keywordize-keys ms)]
+            mseqs [(walk/stringify-keys m) ms]]
+        (is (= mseqk
+               (-> mseqk mongo.conversion/to-document (mongo.conversion/from-document {:keywordize true}))))
+        (is (= mseqs
+               (-> mseqs mongo.conversion/to-document (mongo.conversion/from-document nil))))))))
diff --git a/modules/drivers/mongo/test/metabase/driver/mongo/database_test.clj b/modules/drivers/mongo/test/metabase/driver/mongo/database_test.clj
new file mode 100644
index 0000000000000000000000000000000000000000..1b3a2cfb3e7bc6b97732d94b4d12c10be36bbcc9
--- /dev/null
+++ b/modules/drivers/mongo/test/metabase/driver/mongo/database_test.clj
@@ -0,0 +1,19 @@
+(ns metabase.driver.mongo.database-test
+  (:require
+   [clojure.test :refer :all]
+   [metabase.driver.mongo.database :as mongo.db]))
+
+(deftest ^:parallel fqdn?-test
+  (testing "test hostname is fqdn"
+    (is (true? (#'mongo.db/fqdn? "db.mongo.com")))
+    (is (true? (#'mongo.db/fqdn? "replica-01.db.mongo.com")))
+    (is (false? (#'mongo.db/fqdn? "localhost")))
+    (is (false? (#'mongo.db/fqdn? "localhost.localdomain")))))
+
+(deftest ^:parallel db-name-test
+  (testing "`dbname` is in db-details"
+    (is (= "some_db"
+           (mongo.db/db-name {:dbname "some_db"}))))
+  (testing "`dbname` is encoded in conn-uri"
+    (is (= "some_db"
+           (mongo.db/db-name {:conn-uri "mongodb://localhost/some_db"})))))
diff --git a/modules/drivers/mongo/test/metabase/driver/mongo/execute_test.clj b/modules/drivers/mongo/test/metabase/driver/mongo/execute_test.clj
index aa2d52e7a85f65f76d2635db54b1934abb4ceeb2..a4dd06b636262a9460c31c25ccc1b1e405cf99d5 100644
--- a/modules/drivers/mongo/test/metabase/driver/mongo/execute_test.clj
+++ b/modules/drivers/mongo/test/metabase/driver/mongo/execute_test.clj
@@ -3,12 +3,13 @@
    [clojure.core.async :as a]
    [clojure.test :refer :all]
    [metabase.async.streaming-response :as streaming-response]
+   [metabase.driver.mongo.conversion :as mongo.conversion]
    [metabase.driver.mongo.execute :as mongo.execute]
    [metabase.query-processor :as qp]
    [metabase.query-processor.context :as qp.context]
    [metabase.test :as mt])
   (:import
-   (com.mongodb BasicDBObject)
+   #_(com.mongodb BasicDBObject)
    (java.util NoSuchElementException)))
 
 (set! *warn-on-reflection* true)
@@ -20,7 +21,7 @@
       (next [_] (let [i @counter]
                   (vswap! counter inc)
                   (if (< i (count rows))
-                    (BasicDBObject. ^java.util.Map (get rows i))
+                    (mongo.conversion/to-document (get rows i))
                     (throw (NoSuchElementException. (str "no element at " i))))))
       (close [_]))))
 
diff --git a/modules/drivers/mongo/test/metabase/driver/mongo/util_test.clj b/modules/drivers/mongo/test/metabase/driver/mongo/util_test.clj
deleted file mode 100644
index 49b3e7678add45515443c8813055260ab76b53cd..0000000000000000000000000000000000000000
--- a/modules/drivers/mongo/test/metabase/driver/mongo/util_test.clj
+++ /dev/null
@@ -1,172 +0,0 @@
-(ns metabase.driver.mongo.util-test
-  (:require
-   [clojure.test :refer :all]
-   [metabase.driver.mongo.util :as mongo.util]
-   [metabase.driver.util :as driver.u]
-   [metabase.test :as mt])
-  (:import
-   (com.mongodb MongoClient MongoClientOptions$Builder ReadPreference ServerAddress)))
-
-(set! *warn-on-reflection* true)
-
-(defn- connect-mongo ^MongoClient [opts]
-  (let [connection-info (#'mongo.util/details->mongo-connection-info
-                         (#'mongo.util/normalize-details
-                          opts))]
-    (#'mongo.util/connect connection-info)))
-
-(def connect-passthrough
-  (fn [{map-type :type}]
-    map-type))
-
-(def srv-passthrough
-  (fn [_] {:type :srv}))
-
-(deftest ^:parallel fqdn?-test
-  (testing "test hostname is fqdn"
-    (is (true? (#'mongo.util/fqdn? "db.mongo.com")))
-    (is (true? (#'mongo.util/fqdn? "replica-01.db.mongo.com")))
-    (is (false? (#'mongo.util/fqdn? "localhost")))
-    (is (false? (#'mongo.util/fqdn? "localhost.localdomain")))))
-
-(deftest ^:parallel srv-conn-str-test
-  (testing "test srv connection string"
-    (is (= "mongodb+srv://test-user:test-pass@test-host.place.com/datadb?authSource=authdb"
-           (#'mongo.util/srv-conn-str "test-user" "test-pass" "test-host.place.com" "datadb" "authdb")))))
-
-(deftest srv-toggle-test
-  (testing "test that srv toggle works"
-    (is (= :srv
-           (with-redefs [mongo.util/srv-connection-info srv-passthrough
-                         mongo.util/connect             connect-passthrough]
-             (let [host "my.fake.domain"
-                   opts {:host               host
-                         :port               1015
-                         :user               "test-user"
-                         :authdb             "test-authdb"
-                         :pass               "test-passwd"
-                         :dbname             "test-dbname"
-                         :ssl                true
-                         :additional-options ""
-                         :use-srv            true}]
-               (connect-mongo opts)))))
-
-    (is (= :normal
-           (with-redefs [mongo.util/connect connect-passthrough]
-             (let [host "localhost"
-                   opts {:host               host
-                         :port               1010
-                         :user               "test-user"
-                         :authdb             "test-authdb"
-                         :pass               "test-passwd"
-                         :dbname             "test-dbname"
-                         :ssl                true
-                         :additional-options ""
-                         :use-srv            false}]
-               (connect-mongo opts)))))
-
-    (is (= :normal
-           (with-redefs [mongo.util/connect connect-passthrough]
-             (let [host "localhost.domain"
-                   opts {:host               host
-                         :port               1010
-                         :user               "test-user"
-                         :authdb             "test-authdb"
-                         :pass               "test-passwd"
-                         :dbname             "test-dbname"
-                         :ssl                true
-                         :additional-options ""}]
-               (connect-mongo opts)))))))
-
-(deftest ^:parallel srv-connection-properties-test
-  (testing "connection properties when using SRV"
-    (are [host msg] (thrown-with-msg? Throwable msg
-                      (connect-mongo {:host host
-                                      :port               1015
-                                      :user               "test-user"
-                                      :authdb             "test-authdb"
-                                      :pass               "test-passwd"
-                                      :dbname             "test-dbname"
-                                      :ssl                true
-                                      :additional-options ""
-                                      :use-srv            true}))
-      "db.fqdn.test" #"Unable to look up TXT record for host db.fqdn.test"
-      "local.test"   #"Using DNS SRV requires a FQDN for host")
-
-    (testing "test host and port are correct for both srv and normal"
-      (let [host                            "localhost"
-            opts                            {:host               host
-                                             :port               1010
-                                             :user               "test-user"
-                                             :authdb             "test-authdb"
-                                             :pass               "test-passwd"
-                                             :dbname             "test-dbname"
-                                             :ssl                true
-                                             :additional-options ""}
-            [^MongoClient mongo-client _db] (connect-mongo opts)
-            ^ServerAddress mongo-addr       (-> mongo-client
-                                                (.getAllAddress)
-                                                first)
-            mongo-host                      (-> mongo-addr .getHost)
-            mongo-port                      (-> mongo-addr .getPort)]
-        (is (= "localhost"
-               mongo-host))
-        (is (= 1010
-               mongo-port))))))
-
-(defn- connection-options-builder ^MongoClientOptions$Builder [details]
-  (#'mongo.util/connection-options-builder details))
-
-(deftest ^:parallel additional-connection-options-test
-  (testing "test that people can specify additional connection options like `?readPreference=nearest`"
-    (is (= (ReadPreference/nearest)
-           (.getReadPreference (-> (connection-options-builder {:additional-options "readPreference=nearest"})
-                                   .build))))
-
-    (is (= (ReadPreference/secondaryPreferred)
-           (.getReadPreference (-> (connection-options-builder {:additional-options "readPreference=secondaryPreferred"})
-                                   .build))))
-
-    (testing "make sure we can specify multiple options"
-      (let [opts (-> (connection-options-builder {:additional-options "readPreference=secondary&replicaSet=test"})
-                     .build)]
-        (is (= "test"
-               (.getRequiredReplicaSetName opts)))
-
-        (is (= (ReadPreference/secondary)
-               (.getReadPreference opts)))))
-
-    (testing "make sure that invalid additional options throw an Exception"
-      (is (thrown-with-msg?
-           IllegalArgumentException
-           #"No match for read preference of ternary"
-           (-> (connection-options-builder {:additional-options "readPreference=ternary"})
-               .build))))))
-
-(deftest ^:parallel test-ssh-connection
-  (testing "Gets an error when it can't connect to mongo via ssh tunnel"
-    (mt/test-driver :mongo
-      (is (thrown?
-           java.net.ConnectException
-           (try
-             (let [engine :mongo
-                   details {:ssl            false
-                            :password       "changeme"
-                            :tunnel-host    "localhost"
-                            :tunnel-pass    "BOGUS-BOGUS"
-                            :port           5432
-                            :dbname         "test"
-                            :host           "localhost"
-                            :tunnel-enabled true
-                            ;; we want to use a bogus port here on purpose -
-                            ;; so that locally, it gets a ConnectionRefused,
-                            ;; and in CI it does too. Apache's SSHD library
-                            ;; doesn't wrap every exception in an SshdException
-                            :tunnel-port    21212
-                            :tunnel-user    "bogus"}]
-               (driver.u/can-connect-with-details? engine details :throw-exceptions))
-             (catch Throwable e
-               (loop [^Throwable e e]
-                 (or (when (instance? java.net.ConnectException e)
-                       (throw e))
-                     (some-> (.getCause e) recur))))))))))
diff --git a/modules/drivers/mongo/test/metabase/driver/mongo_test.clj b/modules/drivers/mongo/test/metabase/driver/mongo_test.clj
index f50e697ab0c2dcd3e871c00ef23e70a353751d73..d644abfda68cea76cb7a03494440f1d2132ac1bd 100644
--- a/modules/drivers/mongo/test/metabase/driver/mongo_test.clj
+++ b/modules/drivers/mongo/test/metabase/driver/mongo_test.clj
@@ -8,6 +8,7 @@
    [metabase.db.metadata-queries :as metadata-queries]
    [metabase.driver :as driver]
    [metabase.driver.mongo :as mongo]
+   [metabase.driver.mongo.connection :as mongo.connection]
    [metabase.driver.mongo.query-processor :as mongo.qp]
    [metabase.driver.mongo.util :as mongo.util]
    [metabase.driver.util :as driver.u]
@@ -25,8 +26,6 @@
    [metabase.test.data.interface :as tx]
    [metabase.test.data.mongo :as tdm]
    [metabase.util.log :as log]
-   [monger.collection :as mcoll]
-   [monger.core :as mg]
    [taoensso.nippy :as nippy]
    [toucan2.core :as t2]
    [toucan2.tools.with-temp :as t2.with-temp])
@@ -73,9 +72,11 @@
                                                              :port   3000
                                                              :dbname "bad-db-name?connectTimeoutMS=50"}
                                                   :expected false}
-                                                 {:details  {:conn-uri "mongodb://metabase:metasample123@localhost:27017/test-data?authSource=admin"}
+                                                 {:details  {:use-conn-uri true
+                                                             :conn-uri "mongodb://metabase:metasample123@localhost:27017/test-data?authSource=admin"}
                                                   :expected (not (tdm/ssl-required?))}
-                                                 {:details  {:conn-uri "mongodb://localhost:3000/bad-db-name?connectTimeoutMS=50"}
+                                                 {:details  {:use-conn-uri true
+                                                             :conn-uri "mongodb://localhost:3000/bad-db-name?connectTimeoutMS=50"}
                                                   :expected false}]
              :let [ssl-details (tdm/conn-details details)]]
        (testing (str "connect with " details)
@@ -244,16 +245,16 @@
          (is (true? (t2/select-one-fn :database_indexed :model/Field (mt/id :singly-index :indexed))))
          (is (false? (t2/select-one-fn :database_indexed :model/Field (mt/id :singly-index :not-indexed)))))
 
-       (testing "compount index"
-         (mongo.util/with-mongo-connection [conn (mt/db)]
-           (mcoll/create-index conn "compound-index" (array-map "first" 1 "second" 1)))
-         (sync/sync-database! (mt/db))
-         (is (true? (t2/select-one-fn :database_indexed :model/Field (mt/id :compound-index :first))))
-         (is (false? (t2/select-one-fn :database_indexed :model/Field (mt/id :compound-index :second)))))
+        (testing "compount index"
+          (mongo.connection/with-mongo-database [db (mt/db)]
+            (mongo.util/create-index (mongo.util/collection db "compound-index") (array-map "first" 1 "second" 1)))
+          (sync/sync-database! (mt/db))
+          (is (true? (t2/select-one-fn :database_indexed :model/Field (mt/id :compound-index :first))))
+          (is (false? (t2/select-one-fn :database_indexed :model/Field (mt/id :compound-index :second)))))
 
        (testing "multi key index"
-         (mongo.util/with-mongo-connection [conn (mt/db)]
-           (mcoll/create-index conn "multi-key-index" (array-map "url.small" 1)))
+         (mongo.connection/with-mongo-database [db (mt/db)]
+           (mongo.util/create-index (mongo.util/collection db "multi-key-index") (array-map "url.small" 1)))
          (sync/sync-database! (mt/db))
          (is (false? (t2/select-one-fn :database_indexed :model/Field :name "url")))
          (is (true? (t2/select-one-fn :database_indexed :model/Field :name "small"))))
@@ -286,38 +287,41 @@
       (try
        (let [describe-indexes (fn [table-name]
                                 (driver/describe-table-indexes :mongo (mt/db) (t2/select-one :model/Table (mt/id table-name))))]
-         (mongo.util/with-mongo-connection [conn (mt/db)]
+         (mongo.connection/with-mongo-database [db (mt/db)]
            (testing "single column index"
-             (mcoll/create-index conn "singly-index" {"a" 1})
+             (mongo.util/create-index (mongo.util/collection db "singly-index") {"a" 1})
              (is (= #{{:type :normal-column-index :value "_id"}
                       {:type :normal-column-index :value "a"}}
                     (describe-indexes :singly-index))))
 
            (testing "compound index column index"
-             (mcoll/create-index conn "compound-index" (array-map "a" 1 "b" 1 "c" 1)) ;; first index column is :a
-             (mcoll/create-index conn "compound-index" (array-map "e" 1 "d" 1 "f" 1)) ;; first index column is :e
+             ;; first index column is :a
+             (mongo.util/create-index (mongo.util/collection db "compound-index") (array-map :a 1 :b 1 :c 1))
+             ;; first index column is :e
+             (mongo.util/create-index (mongo.util/collection db "compound-index") (array-map :e 1 :d 1 :f 1))
              (is (= #{{:type :normal-column-index :value "_id"}
                       {:type :normal-column-index :value "a"}
                       {:type :normal-column-index :value "e"}}
                     (describe-indexes :compound-index))))
 
            (testing "compound index that has many keys can still determine the first key"
-             (mcoll/create-index conn "compound-index-big"
-                                 (array-map "j" 1 "b" 1 "c" 1 "d" 1 "e" 1 "f" 1 "g" 1 "h" 1 "a" 1)) ;; first index column is :j
+              ;; first index column is :j
+             (mongo.util/create-index (mongo.util/collection db "compound-index-big")
+                                 (array-map "j" 1 "b" 1 "c" 1 "d" 1 "e" 1 "f" 1 "g" 1 "h" 1 "a" 1))
              (is (= #{{:type :normal-column-index :value "_id"}
                       {:type :normal-column-index :value "j"}}
                     (describe-indexes :compound-index-big))))
 
            (testing "multi key indexes"
-             (mcoll/create-index conn "multi-key-index" (array-map "a.b" 1))
+             (mongo.util/create-index (mongo.util/collection db "multi-key-index") (array-map "a.b" 1))
              (is (= #{{:type :nested-column-index :value ["a" "b"]}
                       {:type :normal-column-index :value "_id"}}
                     (describe-indexes :multi-key-index))))
 
            (testing "advanced-index: hashed index, text index, geospatial index"
-             (mcoll/create-index conn "advanced-index" (array-map "hashed-field" "hashed"))
-             (mcoll/create-index conn "advanced-index" (array-map "text-field" "text"))
-             (mcoll/create-index conn "advanced-index" (array-map "geospatial-field" "2d"))
+             (mongo.util/create-index (mongo.util/collection db "advanced-index") (array-map "hashed-field" "hashed"))
+             (mongo.util/create-index (mongo.util/collection db "advanced-index") (array-map "text-field" "text"))
+             (mongo.util/create-index (mongo.util/collection db "advanced-index") (array-map "geospatial-field" "2d"))
              (is (= #{{:type :normal-column-index :value "geospatial-field"}
                       {:type :normal-column-index :value "hashed-field"}
                       {:type :normal-column-index :value "_id"}
@@ -603,17 +607,18 @@
         (tx/destroy-db! :mongo dbdef)
         (let [details (tx/dbdef->connection-details :mongo :db dbdef)]
           ;; load rows
-          (mongo.util/with-mongo-connection [conn details]
-            (doseq [[i row] (map-indexed vector row-maps)
-                    :let    [row (assoc row :_id (inc i))]]
-              (try
-                (mcoll/insert conn collection-name row)
-                (catch Throwable e
-                  (throw (ex-info (format "Error inserting row: %s" (ex-message e))
-                                  {:database database-name, :collection collection-name, :details details, :row row}
-                                  e)))))
-            (log/infof "Inserted %d rows into %s collection %s."
-                       (count row-maps) (pr-str database-name) (pr-str collection-name)))
+          (mongo.connection/with-mongo-database [db details]
+            (let [coll (mongo.util/collection db collection-name)]
+              (doseq [[i row] (map-indexed vector row-maps)
+                      :let    [row (assoc row :_id (inc i))]]
+                (try
+                  (mongo.util/insert-one coll row)
+                  (catch Throwable e
+                    (throw (ex-info (format "Error inserting row: %s" (ex-message e))
+                                    {:database database-name, :collection collection-name, :details details, :row row}
+                                    e)))))
+              (log/infof "Inserted %d rows into %s collection %s."
+                         (count row-maps) (pr-str database-name) (pr-str collection-name))))
           ;; now sync the Database.
           (let [db (first (t2/insert-returning-instances! Database {:name database-name, :engine "mongo", :details details}))]
             (sync/sync-database! db)
@@ -662,15 +667,15 @@
 (deftest strange-versionArray-test
   (mt/test-driver :mongo
     (testing "Negative values in versionArray are ignored (#29678)"
-      (with-redefs [mg/command (constantly {"version" "4.0.28-23"
-                                            "versionArray" [4 0 29 -100]})]
+      (with-redefs [mongo.util/run-command (constantly {"version" "4.0.28-23"
+                                                       "versionArray" [4 0 29 -100]})]
         (is (= {:version "4.0.28-23"
                 :semantic-version [4 0 29]}
                (driver/dbms-version :mongo (mt/db))))))
 
     (testing "Any values after rubbish in versionArray are ignored"
-      (with-redefs [mg/command (constantly {"version" "4.0.28-23"
-                                            "versionArray" [4 0 "NaN" 29]})]
+      (with-redefs [mongo.util/run-command (constantly {"version" "4.0.28-23"
+                                                       "versionArray" [4 0 "NaN" 29]})]
         (is (= {:version "4.0.28-23"
                 :semantic-version [4 0]}
                (driver/dbms-version :mongo (mt/db))))))))
diff --git a/modules/drivers/mongo/test/metabase/test/data/mongo.clj b/modules/drivers/mongo/test/metabase/test/data/mongo.clj
index 33a4bfacec0a951da4f91d1e0d05cf43cfd41cc9..b09c1858766feb51e9c3de89ef3e20794bafd803 100644
--- a/modules/drivers/mongo/test/metabase/test/data/mongo.clj
+++ b/modules/drivers/mongo/test/metabase/test/data/mongo.clj
@@ -9,12 +9,12 @@
    [metabase.config :as config]
    [metabase.driver :as driver]
    [metabase.driver.ddl.interface :as ddl.i]
-   [metabase.driver.mongo.util :refer [with-mongo-connection]]
-   [metabase.test.data.interface :as tx]
-   [monger.collection :as mcoll]
-   [monger.core :as mg])
+   [metabase.driver.mongo.connection :as mongo.connection]
+   [metabase.driver.mongo.util :as mongo.util]
+   [metabase.test.data.interface :as tx])
   (:import
-   (com.fasterxml.jackson.core JsonGenerator)))
+   (com.fasterxml.jackson.core JsonGenerator)
+   (com.mongodb.client MongoDatabase)))
 
 (set! *warn-on-reflection* true)
 
@@ -58,8 +58,8 @@
                    {:pass password}))))
 
 (defn- destroy-db! [driver dbdef]
-  (with-mongo-connection [^com.mongodb.DB mongo-connection (tx/dbdef->connection-details driver :server dbdef)]
-    (mg/drop-db (.getMongo mongo-connection) (tx/escaped-database-name dbdef))))
+  (mongo.connection/with-mongo-database [^MongoDatabase db (tx/dbdef->connection-details driver :server dbdef)]
+    (.drop db)))
 
 (def ^:dynamic *remove-nil?*
   "When creating a dataset, omit any nil-valued fields from the documents."
@@ -69,20 +69,21 @@
   [driver {:keys [table-definitions], :as dbdef} & {:keys [skip-drop-db?], :or {skip-drop-db? false}}]
   (when-not skip-drop-db?
     (destroy-db! driver dbdef))
-  (with-mongo-connection [mongo-db (tx/dbdef->connection-details driver :db dbdef)]
+  (mongo.connection/with-mongo-database [^MongoDatabase db (tx/dbdef->connection-details driver :db dbdef)]
     (doseq [{:keys [field-definitions table-name rows]} table-definitions]
       (doseq [{:keys [field-name indexed?]} field-definitions]
         (when indexed?
-          (mcoll/create-index mongo-db table-name {field-name 1})))
+          (mongo.util/create-index (mongo.util/collection db table-name) {field-name 1})))
       (let [field-names (for [field-definition field-definitions]
                           (keyword (:field-name field-definition)))]
         ;; Use map-indexed so we can get an ID for each row (index + 1)
         (doseq [[i row] (map-indexed vector rows)]
           (try
             ;; Insert each row
-            (mcoll/insert mongo-db (name table-name) (into (ordered-map/ordered-map :_id (inc i))
-                                                           (cond->> (zipmap field-names row)
-                                                             *remove-nil?* (m/remove-vals nil?))))
+            (mongo.util/insert-one (mongo.util/collection db (name table-name))
+                                  (into (ordered-map/ordered-map :_id (inc i))
+                                        (cond->> (zipmap field-names row)
+                                          *remove-nil?* (m/remove-vals nil?))))
             ;; If row already exists then nothing to do
             (catch com.mongodb.MongoException _)))))))
 
diff --git a/src/metabase/driver/util.clj b/src/metabase/driver/util.clj
index aec5c429438c8ca0ebf922301c36bdac1162e811..91ce5b7fbe85732e054bd9908a3f8565d6789487 100644
--- a/src/metabase/driver/util.clj
+++ b/src/metabase/driver/util.clj
@@ -597,15 +597,20 @@
     (.init trust-manager-factory trust-store)
     (.getTrustManagers trust-manager-factory)))
 
-(defn ssl-socket-factory
-  "Generates an `SocketFactory` with the custom certificates added"
-  ^SocketFactory [& {:keys [private-key own-cert trust-cert]}]
+(defn ssl-context
+  "Generates a `SSLContext` with the custom certificates added."
+  ^javax.net.ssl.SSLContext [& {:keys [private-key own-cert trust-cert]}]
   (let [ssl-context (SSLContext/getInstance "TLS")]
     (.init ssl-context
            (when (and private-key own-cert) (key-managers private-key (str (random-uuid)) own-cert))
            (when trust-cert (trust-managers trust-cert))
            nil)
-    (.getSocketFactory ssl-context)))
+    ssl-context))
+
+(defn ssl-socket-factory
+  "Generates a `SocketFactory` with the custom certificates added."
+  ^SocketFactory [& {:keys [_private-key _own-cert _trust-cert] :as args}]
+    (.getSocketFactory (ssl-context args)))
 
 (def default-sensitive-fields
   "Set of fields that should always be obfuscated in API responses, as they contain sensitive data."