Skip to content
Snippets Groups Projects
Unverified Commit b4d8e35a authored by Jeff Evans's avatar Jeff Evans Committed by GitHub
Browse files

Implement ssh tunnel reconnection (#14563)

* Implement ssh tunnel reconnection

From the connection-with-timezone method of execute, check whether an ssh tunnel that should be open actually is not, and if so, mark the entire pool as invalid (thereby forcing the connection code to rebuild the source and open a new tunnel)

Fixing the create-pool! function so that the relevant ssh tunnel entries are kept (in addition to the :datasource)

Adding test in ssh-test namespace, which will test that the tunnel is reestablished (for now, running with Postgres driver)

* Responding to PR feedback from Dan

* Fixing test by adding AcceptAllForwardingFilter forwardingFilter to the mock password server instance

* Change with-driver to test-driver

* Add ssh tunnel reconnect test that can run against H2

* Implement ssh tunnel reconnection

From the connection-with-timezone method of execute, check whether an ssh tunnel that should be open actually is not, and if so, mark the entire pool as invalid (thereby forcing the connection code to rebuild the source and open a new tunnel)

Fixing the create-pool! function so that the relevant ssh tunnel entries are kept (in addition to the :datasource)

Adding test in ssh-test namespace, which will test that the tunnel is reestablished (for now, running with Postgres driver)

* Responding to PR feedback from Dan

* Fixing test by adding AcceptAllForwardingFilter forwardingFilter to the mock password server instance

* Rebase again to fix merge conflict

* Change test-driver to with-driver in hopes of making CodeCov finally happy

* Adding new multimethod to ssh namespace, called incorporate-ssh-tunnel-details, for accounting for the

Implementing incorporate-ssh-tunnel-details for h2 so update the URI string (:db key) to point to the tunnel entry point

Pulled logic for :sql-jdbc implementation of connection-with-timezone out to a new fn, so it can be called elsewhere

Updating H2 reconnection test imn light of the changes above

Added "!" suffix to name of include-ssh-tunnel to reflect the fact that it does modify global state

* Fixing NPE in incorporate-ssh-tunnel-details implementation for :h2

Moving multimethod declaration for incorporate-ssh-tunnel-details to driver namespace

* Fix :h2 implementation again

* Remove another errant extra line :(

* Rebase onto master

* Inline the private helper fn back into connection-with-timezone

* Remove dead code
parent 6bdd5e09
No related merge requests found
......@@ -81,7 +81,7 @@
(defmethod driver/can-connect? :oracle
[driver details]
(let [connection (sql-jdbc.conn/connection-details->spec driver (ssh/include-ssh-tunnel details))]
(let [connection (sql-jdbc.conn/connection-details->spec driver (ssh/include-ssh-tunnel! details))]
(= 1M (first (vals (first (jdbc/query connection ["SELECT 1 FROM dual"])))))))
(defmethod driver/db-start-of-week :oracle
......
......@@ -594,3 +594,11 @@
{:added "0.37.0" :arglists '([driver])}
dispatch-on-initialized-driver
:hierarchy #'hierarchy)
(defmulti incorporate-ssh-tunnel-details
"A multimethod for driver-specific behavior required to incorporate details for an opened SSH tunnel into the DB
details. In most cases, this will simply involve updating the :host and :port (to point to the tunnel entry point,
instead of the backing database server), but some drivers may have more specific behavior."
{:added "0.39.0" :arglists '([driver db-details])}
dispatch-on-uninitialized-driver
:hierarchy #'hierarchy)
(ns metabase.driver.h2
(:require [clojure.string :as str]
[clojure.tools.logging :as log]
[honeysql.core :as hsql]
[java-time :as t]
[metabase.db.jdbc-protocols :as jdbc-protocols]
......@@ -15,7 +16,8 @@
[metabase.query-processor.store :as qp.store]
[metabase.util :as u]
[metabase.util.honeysql-extensions :as hx]
[metabase.util.i18n :refer [deferred-tru tru]])
[metabase.util.i18n :refer [deferred-tru tru]]
[metabase.util.ssh :as ssh])
(:import [java.sql Clob ResultSet ResultSetMetaData]
java.time.OffsetTime))
......@@ -322,3 +324,14 @@
[driver prepared-statement i t]
(let [local-time (t/local-time (t/with-offset-same-instant t (t/zone-offset 0)))]
(sql-jdbc.execute/set-parameter driver prepared-statement i local-time)))
(defmethod driver/incorporate-ssh-tunnel-details :h2
[_ db-details]
(if (and (:tunnel-enabled db-details) (ssh/ssh-tunnel-open? db-details))
(if (and (:db db-details) (str/starts-with? (:db db-details) "tcp://"))
(let [details (ssh/include-ssh-tunnel! db-details)
db (:db details)]
(assoc details :db (str/replace-first db (str (:orig-port details)) (str (:tunnel-entrance-port details)))))
(do (log/error (tru "SSH tunnel can only be established for H2 connections using the TCP protocol"))
db-details))
db-details))
......@@ -9,7 +9,7 @@
;;; --------------------------------------------------- Hierarchy ----------------------------------------------------
(defonce ^{:doc "Driver hierarchy. Used by driver multimethods for dispatch. Add new drivers with `regsiter!`."}
(defonce ^{:doc "Driver hierarchy. Used by driver multimethods for dispatch. Add new drivers with `register!`."}
hierarchy
(make-hierarchy))
......
......@@ -83,10 +83,13 @@
[{:keys [id details], driver :engine, :as database}]
{:pre [(map? database)]}
(log/debug (u/format-color 'cyan (trs "Creating new connection pool for {0} database {1} ..." driver id)))
(let [details-with-tunnel (ssh/include-ssh-tunnel details) ;; If the tunnel is disabled this returned unchanged
(let [details-with-tunnel (driver/incorporate-ssh-tunnel-details driver details) ;; If the tunnel is disabled this returned unchanged
spec (connection-details->spec driver details-with-tunnel)
properties (data-warehouse-connection-pool-properties driver)]
(connection-pool/connection-pool-spec spec properties)))
(merge
(connection-pool/connection-pool-spec spec properties)
;; also capture entries related to ssh tunneling for later use
(select-keys spec [:tunnel-enabled :tunnel-session :tunnel-tracker :tunnel-entrance-port :tunnel-entrance-host]))))
(defn- destroy-pool! [database-id pool-spec]
(log/debug (u/format-color 'red (trs "Closing old connection pool for database {0} ..." database-id)))
......@@ -112,12 +115,21 @@
(destroy-pool! database-id old-pool-spec))))
nil)
(defn invalidate-pool-for-db!
"Invalidates the connection pool for the given database by closing it and removing it from the cache."
[database]
(set-pool! (u/the-id database) nil))
(defn notify-database-updated
"Default implementation of `driver/notify-database-updated` for JDBC SQL drivers. We are being informed that a
`database` has been updated, so lets shut down the connection pool (if it exists) under the assumption that the
connection details have changed."
[database]
(set-pool! (u/get-id database) nil))
(invalidate-pool-for-db! database))
(defn- log-ssh-tunnel-reconnect-msg! [db-id]
(log/warn (u/format-color 'red (trs "ssh tunnel for database {0} looks closed; marking pool invalid to reopen it"
db-id))))
(defn db->pooled-connection-spec
"Return a JDBC connection spec that includes a cp30 `ComboPooledDataSource`. These connection pools are cached so we
......@@ -126,10 +138,23 @@
(cond
;; db-or-id-or-spec is a Database instance or an integer ID
(u/id db-or-id-or-spec)
(let [database-id (u/get-id db-or-id-or-spec)]
(let [database-id (u/the-id db-or-id-or-spec)
get-fn (fn [db-id log-tunnel-check]
(when-let [details (get @database-id->connection-pool db-id)]
(cond (nil? (:tunnel-session details))
;; no tunnel in use; valid
details
(ssh/ssh-tunnel-open? details)
;; tunnel in use, and open; valid
details
:default
;; tunnel in use, and not open; invalid
(do (when log-tunnel-check
(log-ssh-tunnel-reconnect-msg! db-id))
nil))))]
(or
;; we have an existing pool for this database, so use it
(get @database-id->connection-pool database-id)
(get-fn database-id true)
;; Even tho `set-pool!` will properly shut down old pools if two threads call this method at the same time, we
;; don't want to end up with a bunch of simultaneous threads creating pools only to have them destroyed the
;; very next instant. This will cause their queries to fail. Thus we should do the usual locking here and make
......@@ -137,7 +162,7 @@
(locking database-id->connection-pool
(or
;; check if another thread created the pool while we were waiting to acquire the lock
(get @database-id->connection-pool database-id)
(get-fn database-id false)
;; create a new pool and add it to our cache, then return it
(let [db (or (db/select-one [Database :id :engine :details] :id database-id)
(throw (ex-info (tru "Database {0} does not exist." database-id)
......@@ -157,7 +182,6 @@
;; don't log the actual spec lest we accidentally expose credentials
{:input (class db-or-id-or-spec)}))))
;;; +----------------------------------------------------------------------------------------------------------------+
;;; | metabase.driver impls |
;;; +----------------------------------------------------------------------------------------------------------------+
......@@ -166,7 +190,7 @@
"Return an appropriate JDBC connection spec to test whether a set of connection details is valid (i.e., implementing
`can-connect?`)."
[driver details]
(let [details-with-tunnel (ssh/include-ssh-tunnel details)]
(let [details-with-tunnel (driver/incorporate-ssh-tunnel-details driver details)]
(connection-details->spec driver details-with-tunnel)))
(defn can-connect-with-spec?
......
(ns metabase.util.ssh
(:require [clojure.tools.logging :as log]
[metabase.driver :as driver]
[metabase.public-settings :as public-settings]
[metabase.util :as u])
(:import java.io.ByteArrayInputStream
......@@ -120,7 +121,13 @@
[details]
(:tunnel-enabled details))
(defn include-ssh-tunnel
(defn ssh-tunnel-open?
"Is the SSH tunnel currently open for these connection details?"
[details]
(when-let [session (:tunnel-session details)]
(.isOpen ^ClientSession session)))
(defn include-ssh-tunnel!
"Updates connection details for a data warehouse to use the ssh tunnel host and port
For drivers that enter hosts including the protocol (https://host), copy the protocol over as well"
[details]
......@@ -129,27 +136,42 @@
[session ^PortForwardingTracker tracker] (start-ssh-tunnel! (assoc details :host host))
tunnel-entrance-port (.. tracker getBoundAddress getPort)
tunnel-entrance-host (.. tracker getBoundAddress getHostName)
orig-port (:port details)
details-with-tunnel (assoc details
:port tunnel-entrance-port ;; This parameter is set dynamically when the connection is established
:host (str proto "localhost") ;; SSH tunnel will always be through localhost
:orig-port orig-port
:tunnel-entrance-host tunnel-entrance-host
:tunnel-entrance-port tunnel-entrance-port ;; the input port is not known until the connection is opened
:tunnel-enabled true
:tunnel-session session
:tunnel-tracker tracker)]
details-with-tunnel)
details))
(defmethod driver/incorporate-ssh-tunnel-details :sql-jdbc
[_ db-details]
(cond (not (use-ssh-tunnel? db-details))
;; no ssh tunnel in use
db-details
(ssh-tunnel-open? db-details)
;; tunnel in use, and is open
db-details
:default
;; tunnel in use, and is not open
(include-ssh-tunnel! db-details)))
(defn close-tunnel!
"Close a running tunnel session"
[details]
(when (use-ssh-tunnel? details)
(when (and (use-ssh-tunnel? details) (ssh-tunnel-open? details))
(.close ^ClientSession (:tunnel-session details))))
(defn do-with-ssh-tunnel
"Starts an SSH tunnel, runs the supplied function with the tunnel open, then closes it"
[details f]
(if (use-ssh-tunnel? details)
(let [details-with-tunnel (include-ssh-tunnel details)]
(let [details-with-tunnel (include-ssh-tunnel! details)]
(try
(log/trace (u/format-color 'cyan "<< OPENED SSH TUNNEL >>"))
(f details-with-tunnel)
......
......@@ -2,12 +2,22 @@
(:require [clojure.java.io :as io]
[clojure.test :refer :all]
[clojure.tools.logging :as log]
[metabase.models.database :refer [Database]]
[metabase.driver.sql-jdbc.connection :as sql-jdbc.conn]
[metabase.query-processor :as qp]
[metabase.query-processor-test :as qp.test]
[metabase.sync :as sync]
[metabase.test :as mt]
[metabase.util :as u]
[metabase.util.ssh :as sshu])
[metabase.util.ssh :as ssh]
[metabase.sync :as sync]
[metabase.test :as mt]
[metabase.test.data.interface :as tx])
(:import [java.io BufferedReader InputStreamReader PrintWriter]
[java.net InetSocketAddress ServerSocket Socket]
org.apache.sshd.server.forward.AcceptAllForwardingFilter
org.apache.sshd.server.SshServer))
org.apache.sshd.server.SshServer
org.h2.tools.Server))
(def ^:private ssh-username "jsmith")
(def ^:private ssh-password "supersecret")
......@@ -39,6 +49,7 @@
(.setPort ssh-mock-server-with-password-port)
(.setKeyPairProvider keypair-provider)
(.setPasswordAuthenticator password-auth)
(.setForwardingFilter AcceptAllForwardingFilter/INSTANCE)
.start)]
(log/debug "ssh mock server (with password) started")
sshd)
......@@ -109,7 +120,7 @@
;; correct password
(deftest connects-with-correct-password
(sshu/start-ssh-tunnel!
(ssh/start-ssh-tunnel!
{:tunnel-user ssh-username
:tunnel-host "127.0.0.1"
:tunnel-port ssh-mock-server-with-password-port
......@@ -121,7 +132,7 @@
(deftest throws-exception-on-incorrect-password
(is (thrown?
org.apache.sshd.common.SshException
(sshu/start-ssh-tunnel!
(ssh/start-ssh-tunnel!
{:tunnel-user ssh-username
:tunnel-host "127.0.0.1"
:tunnel-port ssh-mock-server-with-password-port
......@@ -132,7 +143,7 @@
;; correct ssh key
(deftest connects-with-correct-ssh-key
(is (some?
(sshu/start-ssh-tunnel!
(ssh/start-ssh-tunnel!
{:tunnel-user ssh-username
:tunnel-host "127.0.0.1"
:tunnel-port ssh-mock-server-with-publickey-port
......@@ -144,7 +155,7 @@
(deftest throws-exception-on-incorrect-ssh-key
(is (thrown?
org.apache.sshd.common.SshException
(sshu/start-ssh-tunnel!
(ssh/start-ssh-tunnel!
{:tunnel-user ssh-username
:tunnel-host "127.0.0.1"
:tunnel-port ssh-mock-server-with-publickey-port
......@@ -155,7 +166,7 @@
;; correct ssh key
(deftest connects-with-correct-ssh-key-and-passphrase
(is (some?
(sshu/start-ssh-tunnel!
(ssh/start-ssh-tunnel!
{:tunnel-user ssh-username
:tunnel-host "127.0.0.1"
:tunnel-port ssh-mock-server-with-publickey-passphrase-port
......@@ -167,7 +178,7 @@
(deftest throws-exception-on-incorrect-ssh-key-and-passphrase
(is (thrown?
java.io.StreamCorruptedException
(sshu/start-ssh-tunnel!
(ssh/start-ssh-tunnel!
{:tunnel-user ssh-username
:tunnel-host "127.0.0.1"
:tunnel-port ssh-mock-server-with-publickey-passphrase-port
......@@ -186,16 +197,93 @@
out-server (PrintWriter. (.getOutputStream client-socket) true)]
(.println out-server "hello from the ssh tunnel")))]
;; this will try to open a TCP connection via the tunnel.
(sshu/with-ssh-tunnel [details-with-tunnel {:tunnel-enabled true
:tunnel-user ssh-username
:tunnel-host "127.0.0.1"
:tunnel-port ssh-mock-server-with-publickey-passphrase-port
:tunnel-private-key (slurp ssh-key-with-passphrase)
:tunnel-private-key-passphrase ssh-key-passphrase
:host "127.0.0.1"
:port port}]
(ssh/with-ssh-tunnel [details-with-tunnel {:tunnel-enabled true
:tunnel-user ssh-username
:tunnel-host "127.0.0.1"
:tunnel-port ssh-mock-server-with-publickey-passphrase-port
:tunnel-private-key (slurp ssh-key-with-passphrase)
:tunnel-private-key-passphrase ssh-key-passphrase
:host "127.0.0.1"
:port port}]
(.connect socket (InetSocketAddress. "127.0.0.1" ^Integer (:tunnel-entrance-port details-with-tunnel)) 3000)
;; cause our future to run to completion
(u/deref-with-timeout server-thread 12000)
(with-open [in-client (BufferedReader. (InputStreamReader. (.getInputStream socket)))]
(is (= "hello from the ssh tunnel" (.readLine in-client)))))))))
(defn- init-h2-tcp-server [port]
(let [args ["-tcp" "-tcpPort", (str port), "-tcpAllowOthers" "-tcpDaemon"]
server (Server/createTcpServer (into-array args))]
(doto server (.start))))
(deftest test-ssh-tunnel-reconnection
;; for now, run against Postgres, although in theory it could run against many different kinds
(mt/test-drivers #{:postgres :mysql}
(testing "ssh tunnel is reestablished if it becomes closed, so subsequent queries still succeed"
(let [tunnel-db-details (assoc (:details (mt/db))
:tunnel-enabled true
:tunnel-host "localhost"
:tunnel-auth-option "password"
:tunnel-port ssh-mock-server-with-password-port
:tunnel-user ssh-username
:tunnel-pass ssh-password)]
(mt/with-temp Database [tunneled-db {:engine (tx/driver), :details tunnel-db-details}]
(mt/with-db tunneled-db
(sync/sync-database! (mt/db))
(letfn [(check-row []
(is (= [["Polo Lounge"]]
(mt/rows (mt/run-mbql-query venues {:filter [:= $id 60] :fields [$name]})))))]
;; check that some data can be queried
(check-row)
;; kill the ssh tunnel; fortunately, we have an existing function that can do that
(ssh/close-tunnel! (sql-jdbc.conn/db->pooled-connection-spec (mt/db)))
;; check the query again; the tunnel should have been reestablished
(check-row))))))))
(deftest test-ssh-tunnel-reconnection-h2
"We need a customized version of this test for H2. It will bring up a new H2 TCP server, pointing to an existing DB
file (stored in source control, called 'tiny-db', with a single table called 'my_tbl' and a GUEST user with
password 'guest'); it will then use an SSH tunnel over localhost to connect to this H2 server's TCP port to execute
native queries against that table."
(mt/with-driver :h2
(testing "ssh tunnel is reestablished if it becomes closed, so subsequent queries still succeed (H2 version)"
(let [h2-port (+ 49152 (rand-int (- 65535 49152))) ; https://stackoverflow.com/a/2675399
server (init-h2-tcp-server h2-port)
uri (format "tcp://localhost:%d/./test_resources/ssh/tiny-db;USER=GUEST;PASSWORD=guest" h2-port)
h2-db {:port h2-port
:host "localhost"
:db uri
:tunnel-enabled true
:tunnel-host "localhost"
:tunnel-auth-option "password"
:tunnel-port ssh-mock-server-with-password-port
:tunnel-user ssh-username
:tunnel-pass ssh-password}]
(try
(mt/with-temp Database [db {:engine :h2, :details h2-db}]
(mt/with-db db
(sync/sync-database! db)
(letfn [(check-data [] (is (= {:cols [{:base_type :type/Text
:display_name "COL1"
:field_ref [:field-literal "COL1" :type/Text]
:name "COL1"
:source :native}
{:base_type :type/Decimal
:display_name "COL2"
:field_ref [:field-literal "COL2" :type/Decimal]
:name "COL2"
:source :native}]
:rows [["First Row" 19.10M]
["Second Row" 100.40M]
["Third Row" 91884.10M]]}
(-> {:query "SELECT col1, col2 FROM my_tbl;"}
(mt/native-query)
(qp/process-query)
(qp.test/rows-and-cols)))))]
;; check that some data can be queried
(check-data)
;; kill the ssh tunnel; fortunately, we have an existing function that can do that
(ssh/close-tunnel! (sql-jdbc.conn/db->pooled-connection-spec db))
;; check the query again; the tunnel should have been reestablished
(check-data))))
(finally (.stop ^Server server)))))))
File added
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment