Skip to content
Snippets Groups Projects
Commit d5fed0e3 authored by Allen Gilliland's avatar Allen Gilliland
Browse files

Merge pull request #2184 from metabase/fix-sql-conn-pools

Fix issue with stale SQL connection pools consuming unneeded connections
parents a876156c 7b130ad9
Branches
Tags
No related merge requests found
......@@ -127,7 +127,7 @@
:engine engine
:details details
:is_full_sync is_full_sync))
(Database id))
(events/publish-event :database-update (Database id)))
;; failed to connect, return error
{:status 400
:body conn-error}))))
......
......@@ -166,6 +166,10 @@
"*OPTIONAL*. Return a humanized (user-facing) version of an connection error message string.
Generic error messages are provided in the constant `connection-error-messages`; return one of these whenever possible.")
(notify-database-updated [this, ^DatabaseInstance database]
"*OPTIONAL*. Notify the driver that the attributes of the DATABASE have changed. This is specifically relevant in
the event that the driver was doing some caching or connection pooling.")
(process-native [this, {^Integer database-id :database, {^String native-query :query} :native, :as ^Map query}]
"Process a native QUERY. This function is called by `metabase.driver/process-query`.
......@@ -253,6 +257,7 @@
:describe-table-fks (constantly nil)
:features (constantly nil)
:humanize-connection-error-message (u/drop-first-arg identity)
:notify-database-updated (constantly nil)
:process-query-in-context (u/drop-first-arg identity)
:sync-in-context (fn [_ _ f] (f))
:table-rows-seq (constantly nil)})
......
......@@ -15,6 +15,7 @@
(:import java.sql.DatabaseMetaData
java.util.Map
clojure.lang.Keyword
com.mchange.v2.c3p0.ComboPooledDataSource
(metabase.driver.query_processor.interface Field Value)))
(declare korma-entity)
......@@ -65,9 +66,6 @@
Return `nil` to prevent FIELD from being aliased.")
(get-connection-for-sync ^java.sql.Connection [this details]
"*OPTIONAL*. Get a connection used for a Sync step. By default, this returns a pooled connection.")
(prepare-value [this, ^Value value]
"*OPTIONAL*. Prepare a value (e.g. a `String` or `Integer`) that will be used in a korma form. By default, this returns VALUE's `:value` as-is, which
is eventually passed as a parameter in a prepared statement. Drivers such as BigQuery that don't support prepared statements can skip this
......@@ -89,38 +87,53 @@
SECONDS-OR-MILLISECONDS refers to the resolution of the int in question and with be either `:seconds` or `:milliseconds`."))
(def ^{:arglists '([connection-spec])}
connection-spec->pooled-connection-spec
(def ^:dynamic ^:private connection-pools
"A map of our currently open connection pools, keyed by DATABASE `:id`."
(atom {}))
(defn- create-connection-pool
"Create a new C3P0 `ComboPooledDataSource` for connecting to the given DATABASE."
[{:keys [id engine details]}]
(log/debug (u/format-color 'magenta "Creating new connection pool for database %d ..." id))
(let [spec (connection-details->spec (driver/engine->driver engine) details)]
(kdb/connection-pool (assoc spec :minimum-pool-size 1
;; prevent broken connections closed by dbs by testing them every 3 mins
:idle-connection-test-period (* 3 60)
;; prevent overly large pools by condensing them when connections are idle for 15m+
:excess-timeout (* 15 60)))))
(defn- notify-database-updated
"We are being informed that a DATABASE has been updated, so lets shut down the connection pool (if it exists) under
the assumption that the connection details have changed."
[_ {:keys [id]}]
(when-let [pool (get @connection-pools id)]
(log/debug (u/format-color 'red "Closing connection pool for database %d ..." id))
;; remove the cached reference to the pool so we don't try to use it anymore
(swap! connection-pools dissoc id)
;; now actively shut down the pool so that any open connections are closed
(.close ^ComboPooledDataSource (:datasource pool))))
(defn db->pooled-connection-spec
"Return a JDBC connection spec that includes a cp30 `ComboPooledDataSource`.
Theses connection pools are cached so we don't create multiple ones to the same DB.
Pools are destroyed after they aren't used for more than 3 hours."
(memoize/ttl
(fn [spec]
(log/debug (u/format-color 'magenta "Creating new connection pool..."))
(kdb/connection-pool (assoc spec :minimum-pool-size 1
;; prevent broken connections closed by dbs by testing them every 3 mins
:idle-connection-test-period (* 3 60)
;; prevent overly large pools by condensing them when connections are idle for 15m+
:excess-timeout (* 15 60))))
:ttl/threshold (* 6 60 60 1000)))
Theses connection pools are cached so we don't create multiple ones to the same DB."
[{:keys [id], :as database}]
(if (contains? @connection-pools id)
;; we have an existing pool for this database, so use it
(get @connection-pools id)
;; create a new pool and add it to our cache, then return it
(u/prog1 (create-connection-pool database)
(swap! connection-pools assoc id <>))))
(defn db->jdbc-connection-spec
"Return a JDBC connection spec for DATABASE. Normally this will have a C3P0 pool as its datasource, unless the database is `short-lived`."
{:arglists '([database] [driver details])}
;; TODO - I don't think short-lived? key is really needed anymore. It's only used by unit tests, and its original purpose was for creating temporary DBs;
;; since we don't destroy databases at the end of each test anymore, it's probably time to remove this
([{:keys [engine details]}]
(db->jdbc-connection-spec (driver/engine->driver engine) details))
([driver {:keys [short-lived?], :as details}]
(let [connection-spec (connection-details->spec driver details)]
(if short-lived?
connection-spec
(connection-spec->pooled-connection-spec connection-spec)))))
(def ^{:arglists '([database] [driver details])}
db->connection
"Return a [possibly pooled] connection to DATABASE. Make sure to close this when you're done! (e.g. by using `with-open`)"
(comp jdbc/get-connection db->jdbc-connection-spec))
[{:keys [engine details], :as database}]
(if (:short-lived? details)
;; short-lived connections are not pooled, so just return a non-pooled spec
(connection-details->spec (driver/engine->driver engine) details)
;; default behavior is to use a pooled connection
(db->pooled-connection-spec database)))
(defn escape-field-name
......@@ -210,8 +223,8 @@
(defmacro with-metadata
"Execute BODY with `java.sql.DatabaseMetaData` for DATABASE."
[[binding driver database] & body]
`(with-open [^java.sql.Connection conn# (get-connection-for-sync ~driver (:details ~database))]
[[binding _ database] & body]
`(with-open [^java.sql.Connection conn# (jdbc/get-connection (db->jdbc-connection-spec ~database))]
(let [~binding (.getMetaData conn#)]
~@body)))
......@@ -309,7 +322,6 @@
:current-datetime-fn (constantly (k/sqlfn* :NOW))
:excluded-schemas (constantly nil)
:field->alias (u/drop-first-arg name)
:get-connection-for-sync db->connection
:prepare-value (u/drop-first-arg :value)
:set-timezone-sql (constantly nil)
:stddev-fn (constantly :STDDEV)})
......@@ -321,16 +333,17 @@
(require 'metabase.driver.generic-sql.native
'metabase.driver.generic-sql.query-processor)
(merge driver/IDriverDefaultsMixin
{:analyze-table analyze-table
:can-connect? can-connect?
:describe-database describe-database
:describe-table describe-table
:describe-table-fks describe-table-fks
:features features
:field-values-lazy-seq field-values-lazy-seq
:process-native (resolve 'metabase.driver.generic-sql.native/process-and-run)
:process-structured (resolve 'metabase.driver.generic-sql.query-processor/process-structured)
:table-rows-seq table-rows-seq}))
{:analyze-table analyze-table
:can-connect? can-connect?
:describe-database describe-database
:describe-table describe-table
:describe-table-fks describe-table-fks
:features features
:field-values-lazy-seq field-values-lazy-seq
:notify-database-updated notify-database-updated
:process-native (resolve 'metabase.driver.generic-sql.native/process-and-run)
:process-structured (resolve 'metabase.driver.generic-sql.query-processor/process-structured)
:table-rows-seq table-rows-seq}))
......@@ -338,14 +351,10 @@
(defn- db->korma-db
"Return a Korma DB spec for Metabase DATABASE."
([database]
(db->korma-db (driver/engine->driver (:engine database)) (:details database)))
([driver details]
(assoc (kx/create-db (connection-details->spec driver details))
:pool (db->jdbc-connection-spec driver details))))
(defn- table+db->entity [{schema :schema, table-name :name} db]
(k/database (kx/create-entity [schema table-name]) db))
[{:keys [details engine], :as database}]
(let [spec (connection-details->spec (driver/engine->driver engine) details)]
(assoc (kx/create-db spec)
:pool (db->jdbc-connection-spec database))))
(defn korma-entity
"Return a Korma entity for [DB and] TABLE.
......@@ -353,8 +362,8 @@
(-> (sel :one Table :id 100)
korma-entity
(select (aggregate (count :*) :count)))"
([table]
(korma-entity (table/database table) table))
([db table] (table+db->entity table (db->korma-db db)))
([driver details table] (table+db->entity table (db->korma-db driver details))))
([table] (korma-entity (table/database table) table))
([db table] (let [{schema :schema, table-name :name} table]
(k/database
(kx/create-entity [schema table-name])
(db->korma-db db)))))
......@@ -16,7 +16,7 @@
(defn process-and-run
"Process and run a native (raw SQL) QUERY."
[driver {{sql :query} :native, database-id :database, settings :settings}]
(try (let [database (sel :one :fields [Database :engine :details] :id database-id)
(try (let [database (sel :one :fields [Database :id :engine :details] :id database-id)
db-conn (sql/db->jdbc-connection-spec database)]
(jdbc/with-db-transaction [t-conn db-conn]
......
(ns metabase.events.driver-notifications
(:require [clojure.core.async :as async]
[clojure.tools.logging :as log]
[metabase.db :as db]
[metabase.driver :as driver]
[metabase.events :as events]
[metabase.models.database :refer [Database]]))
(def ^:const ^:private driver-notifications-topics
"The `Set` of event topics which are subscribed to for use in driver notifications."
#{:database-update})
(def ^:private driver-notifications-channel
"Channel for receiving event notifications we want to subscribe to for driver notifications events."
(async/chan))
;;; ## ---------------------------------------- EVENT PROCESSING ----------------------------------------
(defn process-driver-notifications-event
"Handle processing for a single event notification received on the driver-notifications-channel"
[driver-notifications-event]
;; try/catch here to prevent individual topic processing exceptions from bubbling up. better to handle them here.
(when-let [{topic :topic database :item} driver-notifications-event]
(try
;; notify the appropriate driver about the updated database
(driver/notify-database-updated (driver/engine->driver (:engine database)) database)
(catch Throwable e
(log/warn (format "Failed to process driver notifications event. %s" (:topic driver-notifications-event)) e)))))
;;; ## ---------------------------------------- LIFECYLE ----------------------------------------
(defn events-init
"Automatically called during startup; start event listener for database sync events."
[]
(events/start-event-listener driver-notifications-topics driver-notifications-channel process-driver-notifications-event))
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment