Skip to content
Snippets Groups Projects
Commit 6b220353 authored by Allen Gilliland's avatar Allen Gilliland
Browse files

Merge pull request #2428 from crate/crate-support

Added support for Crate
parents 4441395f ebba5884
No related branches found
No related tags found
No related merge requests found
......@@ -13,9 +13,13 @@ dependencies:
- sudo apt-get purge mongodb-org*
- sudo apt-key adv --keyserver hkp://keyserver.ubuntu.com:80 --recv 7F0CEB10
- echo "deb http://repo.mongodb.org/apt/ubuntu precise/mongodb-org/3.0 multiverse" | sudo tee /etc/apt/sources.list.d/mongodb-org-3.0.list
- sudo add-apt-repository ppa:crate/stable -y
- sudo apt-get update
- sudo apt-get install -y mongodb-org
- sudo apt-get install -y mongodb-org crate
# The MAX_LOCKED_MEMORY ulimit setting prevents the Crate service from starting in the CircleCI container, so comment it out
- sudo sed -i '/MAX_LOCKED_MEMORY/s/^/#/' /etc/init/crate.conf
- sudo service mongod restart
- sudo service crate restart
- lein deps
- pip install awscli==1.7.3
- npm cache clean
......@@ -33,7 +37,8 @@ test:
# 4) runs Eastwood linter, Bikeshed linter, docstring-checker & ./bin/reflection-linter
# 5) runs JS linter + JS test
# 6) runs lein uberjar. (We don't run bin/build because we're not really concerned about `npm install` (etc) in this test, which runs elsewhere)
- case $CIRCLE_NODE_INDEX in 0) ENGINES=h2,mongo,mysql,bigquery lein test ;; 1) ENGINES=h2,sqlserver MB_DB_TYPE=postgres MB_DB_DBNAME=circle_test MB_DB_PORT=5432 MB_DB_USER=ubuntu MB_DB_HOST=localhost lein test ;; 2) ENGINES=h2,postgres,sqlite MB_DB_TYPE=mysql MB_DB_DBNAME=circle_test MB_DB_PORT=3306 MB_DB_USER=ubuntu MB_DB_HOST=localhost lein test ;; 3) ENGINES=h2,redshift,druid lein test ;; 4) lein eastwood && lein bikeshed && lein docstring-checker && ./bin/reflection-linter ;; 5) npm install && npm run lint && npm run build && npm run test ;; 6) lein uberjar ;; esac:
# 7) runs unit tests w/ H2 local DB. Runs against H2, Crate
- case $CIRCLE_NODE_INDEX in 0) ENGINES=h2,mongo,mysql,bigquery lein test ;; 1) ENGINES=h2,sqlserver MB_DB_TYPE=postgres MB_DB_DBNAME=circle_test MB_DB_PORT=5432 MB_DB_USER=ubuntu MB_DB_HOST=localhost lein test ;; 2) ENGINES=h2,postgres,sqlite MB_DB_TYPE=mysql MB_DB_DBNAME=circle_test MB_DB_PORT=3306 MB_DB_USER=ubuntu MB_DB_HOST=localhost lein test ;; 3) ENGINES=h2,redshift,druid lein test ;; 4) lein eastwood && lein bikeshed && lein docstring-checker && ./bin/reflection-linter ;; 5) npm install && npm run lint && npm run build && npm run test ;; 6) lein uberjar ;; 7) ENGINES=h2,crate lein test ;; esac:
parallel: true
deployment:
master:
......
......@@ -64,11 +64,14 @@
[org.yaml/snakeyaml "1.17"] ; YAML parser (required by liquibase)
[org.xerial/sqlite-jdbc "3.8.11.2"] ; SQLite driver
[postgresql "9.3-1102.jdbc41"] ; Postgres driver
[io.crate/crate-jdbc "1.11.0"] ; Crate JDBC driver
[io.crate/crate-client "0.54.7"] ; Crate Java client (used by Crate JDBC)
[prismatic/schema "1.1.1"] ; Data schema declaration and validation library
[ring/ring-jetty-adapter "1.4.0"] ; Ring adapter using Jetty webserver (used to run a Ring server for unit tests)
[ring/ring-json "0.4.0"] ; Ring middleware for reading/writing JSON automatically
[stencil "0.5.0"] ; Mustache templates for Clojure
[swiss-arrows "1.0.0"]] ; 'Magic wand' macro -<>, etc.
:repositories [["bintray" "https://dl.bintray.com/crate/crate"]]
:plugins [[lein-environ "1.0.2"] ; easy access to environment variables
[lein-ring "0.9.7" ; start the HTTP server with 'lein ring server'
:exclusions [org.clojure/clojure]]] ; TODO - should this be a dev dependency ?
......
(ns metabase.driver.crate
(:require [clojure.set :as set]
[metabase.driver :as driver]
[metabase.driver.generic-sql :as sql]
(metabase.driver.crate [query-processor :as qp]
[util :as u]
[generic-sql :as gs]
[native :as n])
[korma.core :as k])
(:import (clojure.lang Named)))
(defn- column->base-type
  "Return the Field base type for the Crate COLUMN-TYPE keyword, or nil when the
  column type is not one of the types listed here.
  Crate data types -> https://crate.io/docs/reference/sql/data_types.html"
  [_ column-type]
  (case column-type
    (:integer :byte :short)      :IntegerField
    :long                        :BigIntegerField
    (:float :double)             :FloatField
    :string                      :TextField
    :boolean                     :BooleanField
    :timestamp                   :DateTimeField
    :ip                          :UnknownField
    (:geo_shape :object)         :DictionaryField
    ;; geo points and every array flavour are surfaced as arrays
    (:geo_point
     :array
     :object_array
     :string_array
     :integer_array
     :float_array
     :boolean_array
     :byte_array
     :timestamp_array
     :short_array
     :long_array
     :double_array
     :ip_array
     :geo_shape_array
     :geo_point_array)           :ArrayField
    ;; anything else has no mapping
    nil))
;; Korma form for the current time: calls the SQL function CURRENT_TIMESTAMP with the raw argument 3.
;; NOTE(review): the 3 is presumably the fractional-seconds precision (milliseconds) -- confirm against the Crate docs.
(def ^:private now (k/sqlfn :CURRENT_TIMESTAMP (k/raw 3)))
;; Driver record for Crate. It carries no fields; it implements `clojure.lang.Named`
;; so that `getName` returns the display name "Crate".
;; NOTE(review): only `getName` is implemented here; `getNamespace` is not.
(defrecord CrateDriver []
Named
(getName [_] "Crate"))
(defn- crate-spec
  "Build a JDBC connection spec map for Crate from the connection details OPTS.
  `:hosts` (default \"//localhost:4300\") becomes the `:subname`; every other key in
  OPTS is merged into the spec unchanged and may override the defaults."
  [{:keys [hosts], :or {hosts "//localhost:4300"}, :as opts}]
  (let [passthrough (dissoc opts :hosts)
        base-spec   {:classname   "io.crate.client.jdbc.CrateDriver" ; must be in classpath
                     :subprotocol "crate"
                     :subname     (str hosts)}]
    (merge base-spec passthrough)))
(defn- connection-details->spec
  "Driver-method wrapper around `crate-spec`: ignores the driver argument and turns
  the connection DETAILS map into a JDBC connection spec."
  [_driver details]
  (-> details
      crate-spec))
(defn- can-connect
  "Return true when running `select 1 from sys.cluster` against the database described
  by DETAILS yields 1 as the first value of the first result row."
  [driver details]
  (let [spec        (connection-details->spec driver details)
        rows        (k/exec-raw spec "select 1 from sys.cluster" :results)
        first-value (first (vals (first rows)))]
    (= 1 first-value)))
(def CrateISQLDriverMixin
  "Method map implementing `ISQLDriver` for `CrateDriver`: the generic SQL defaults
  with the Crate-specific implementations merged on top."
  (let [crate-overrides {:apply-filter              qp/apply-filter
                         :column->base-type         column->base-type
                         :connection-details->spec  connection-details->spec
                         :current-datetime-fn       (constantly now)
                         :date                      u/date
                         :string-length-fn          (constantly :CHAR_LENGTH)
                         :unix-timestamp->timestamp u/unix-timestamp->timestamp}]
    (merge (sql/ISQLDriverDefaultsMixin) crate-overrides)))
;; Wire CrateDriver into the driver protocols: `IDriver` gets the generic SQL defaults
;; plus the Crate-specific entries below, `ISQLDriver` gets `CrateISQLDriverMixin`.
(extend CrateDriver
  driver/IDriver
  (assoc (sql/IDriverSQLDefaultsMixin)
         :details-fields (constantly [{:name         "hosts"
                                       :display-name "Hosts"
                                       :default      "//localhost:4300"}])
         :can-connect?   can-connect
         :date-interval  u/date-interval
         :analyze-table  gs/analyze-table
         :process-native n/process-and-run
         ;; the generic SQL feature set minus :foreign-keys
         :features       #(set/difference (sql/features %) #{:foreign-keys}))

  sql/ISQLDriver
  CrateISQLDriverMixin)
;; Register a CrateDriver instance under the :crate engine key when this namespace is loaded.
(driver/register-driver! :crate (CrateDriver.))
(ns metabase.driver.crate.generic-sql
(:require [metabase.driver.generic-sql :as sql]
[korma.core :as k]
[metabase.models.field :as field]
[metabase.sync-database.analyze :as analyze]))
(defn- field-avg-length
  "Return the average CHAR_LENGTH of FIELD's values as an int, or 0 when the query
  produces no average."
  [_ field]
  (let [avg-len (some-> (sql/korma-entity (field/table field))
                        (k/select (k/aggregate (avg (k/sqlfn :CHAR_LENGTH
                                                             (sql/escape-field-name (:name field))))
                                               :len))
                        first
                        :len)]
    (if (some? avg-len)
      (int avg-len)
      0)))
(defn- field-percent-urls
  "Return the fraction of FIELD's non-nil values that match the LIKE pattern
  \"http%://_%.__%\", as a float. Returns 0.0 when there are no non-nil values or
  when either count query yields no count."
  [_ field]
  (let [entity         (sql/korma-entity (field/table field))
        non-null-count (-> (k/select entity
                                     (k/aggregate (count (k/raw "*")) :count)
                                     (k/where {(sql/escape-field-name (:name field)) [not= nil]}))
                           first
                           :count)]
    (if-not (and non-null-count (pos? non-null-count))
      0.0
      ;; only run the second query once we know there is something to divide by
      (let [url-count (-> (k/select entity
                                    (k/aggregate (count (k/raw "*")) :count)
                                    (k/where {(sql/escape-field-name (:name field)) [like "http%://_%.__%"]}))
                          first
                          :count)]
        (if url-count
          (float (/ url-count non-null-count))
          0.0)))))
(defn analyze-table
  "Crate implementation of `analyze-table`. Builds an analyze function with
  `analyze/make-analyze-table`, plugging in this namespace's average-length and
  percent-URLs functions, and calls it with DRIVER, TABLE and NEW-TABLE-IDS."
  [driver table new-table-ids]
  (let [avg-length-fn   (partial field-avg-length driver)
        percent-urls-fn (partial field-percent-urls driver)
        analyze-fn      (analyze/make-analyze-table driver
                          :field-avg-length-fn   avg-length-fn
                          :field-percent-urls-fn percent-urls-fn)]
    (analyze-fn driver table new-table-ids)))
(ns metabase.driver.crate.native
(:require [clojure.java.jdbc :as jdbc]
[metabase.models.database :refer [Database]]
[metabase.db :refer [sel]]
[metabase.driver.generic-sql :as sql]
[clojure.tools.logging :as log]
[metabase.util :as u]
[metabase.driver.generic-sql.native :as n]))
(defn process-and-run
  "Process and run a native (raw SQL) QUERY.
  Looks up the Database with the query's `:database` id, opens a JDBC connection to it
  and runs the SQL string found under `[:native :query]`.

  Returns a map with:
    :columns - the column names (first row of the `:as-arrays?` result)
    :rows    - the remaining result rows
    :cols    - one `{:name ..., :base_type ...}` map per column, with the base type
               inferred from the value in the first row (empty when there are no rows)

  A `java.sql.SQLException` is rethrown as a plain `Exception` whose message is the
  part of the original message before the last `;` (when there is one); the original
  exception is kept as the cause."
  [driver {{sql :query} :native, database-id :database, :as query}]
  (try
    (let [database (sel :one :fields [Database :engine :details] :id database-id)
          db-conn  (sql/db->jdbc-connection-spec database)]
      (jdbc/with-db-connection [t-conn db-conn]
        ;; Now run the query itself
        (log/debug (u/format-color 'green "%s" sql))
        (let [[columns & [first-row :as rows]] (jdbc/query t-conn sql, :as-arrays? true)]
          {:rows    rows
           :columns columns
           :cols    (for [[column first-value] (map vector columns first-row)]
                      {:name      column
                       :base_type (n/value->base-type first-value)})})))
    (catch java.sql.SQLException e
      ;; error message comes back like 'Column "ZID" not found; SQL statement: ... [error-code]' sometimes.
      ;; The user already knows the SQL, and the error code is meaningless, so just return the relevant part.
      ;; `some->>` guards against exceptions that carry no message at all.
      (let [^String message (or (some->> (.getMessage e)
                                         (re-find #"^(.*);")
                                         second)
                                (.getMessage e))]
        (throw (Exception. message e))))))
(ns metabase.driver.crate.query-processor
(:require [korma.core :as k]
[metabase.driver.generic-sql.query-processor :as qp]
[korma.sql.fns :as kfns]
[korma.sql.engine :as kengine]
[metabase.query-processor.interface :as i])
(:import (metabase.query_processor.interface ComparisonFilter CompoundFilter)))
(defn- rewrite-between
  "Rewrite [:between <field> <min> <max>] -> [:and [:>= <field> <min>] [:<= <field> <max>]]"
  [clause]
  (let [{:keys [field min-val max-val]} clause
        lower-bound (ComparisonFilter. :>= field min-val)
        upper-bound (ComparisonFilter. :<= field max-val)]
    (i/strict-map->CompoundFilter {:compound-type :and
                                   :subclauses    [lower-bound upper-bound]})))
(defn resolve-subclauses
  "Recursively turn a filter CLAUSE into a korma predicate.
  A clause without `:subclauses` is handed to the generic SQL query processor, with
  `:between` filters first rewritten into a pair of comparisons. A clause with
  `:subclauses` is combined according to its `:compound-type`."
  [clause]
  (let [children (:subclauses clause)]
    (if (zero? (count children))
      ;; leaf clause
      (qp/filter-clause->predicate (if (= :between (:filter-type clause))
                                     (rewrite-between clause)
                                     clause))
      ;; compound clause
      (case (:compound-type clause)
        :and (->> children (map resolve-subclauses) (apply kfns/pred-and))
        :or  (->> children (map resolve-subclauses) (apply kfns/pred-or))
        ;; NOTE(review): this branch is only reached for a :not clause that carries
        ;; `:subclauses`; if :not clauses only have a single `:subclause` they are
        ;; handled by the leaf branch above instead -- confirm.
        :not (kfns/pred-not (kengine/pred-map (qp/filter-subclause->predicate clause)))))))
;; Adds a WHERE clause to KORMA-FORM built from the `:filter` clause of the query map
;; (third argument); the driver argument is ignored.
;; `k/where` is a korma macro, so the predicate expression is deliberately left inline.
(defn apply-filter
"Apply custom generic SQL filter. This is the place to perform query rewrites."
[_ korma-form {clause :filter}]
(k/where korma-form (resolve-subclauses clause)))
(ns metabase.driver.crate.util
(:require [metabase.util.korma-extensions :as kx]
[korma.sql.utils :as kutils]
[korma.core :as k]
[metabase.util :as u]
[metabase.driver.generic-sql.query-processor :as qp])
(:import (java.sql Timestamp)))
(defn unix-timestamp->timestamp
  "Return a korma form that converts EXPR, a Unix timestamp, into a Crate TIMESTAMP
  using `TRY_CAST(<expr> as TIMESTAMP)`.
  SECONDS-OR-MILLISECONDS is the resolution of EXPR: `:milliseconds` values are cast
  as they are, `:seconds` values are multiplied by 1000 first. Any other value makes
  `case` throw an IllegalArgumentException. The first (driver) argument is ignored."
  [_ expr seconds-or-milliseconds]
  (case seconds-or-milliseconds
    :seconds      (recur nil (kx/* expr 1000) :milliseconds)
    :milliseconds (kutils/func "TRY_CAST(%s as TIMESTAMP)" [expr])))
(defn- date-trunc
  "date_trunc('interval', timezone, timestamp): truncates a timestamp to a given interval.
  UNIT is passed as a SQL literal. The timezone argument is taken from the
  `:report-timezone` setting of the current query and is only included when it is set."
  [unit expr]
  (let [timezone (get-in qp/*query* [:settings :report-timezone])]
    (if (nil? timezone)
      (k/sqlfn :DATE_TRUNC (kx/literal unit) expr)
      (k/sqlfn :DATE_TRUNC (kx/literal unit) timezone expr))))
(defn- date-format
  "date_format('format_string', timezone, timestamp): formats the timestamp as string.
  The timezone argument is taken from the `:report-timezone` setting of the current
  query and is only included when it is set."
  [format expr]
  (let [timezone (get-in qp/*query* [:settings :report-timezone])]
    (if (nil? timezone)
      (k/sqlfn :DATE_FORMAT format expr)
      (k/sqlfn :DATE_FORMAT format timezone expr))))
(defn- extract
  "extract(field from expr): extracts subfields of a timestamp.
  UNIT is a keyword naming the subfield (e.g. `:day_of_week`); its name is spliced
  into the EXTRACT expression. For `:day_of_week` the result is remapped with
  `(value mod 7) + 1`."
  [unit expr]
  (let [extracted (kutils/func (format "EXTRACT(%s FROM %%s)" (name unit)) [expr])]
    (case unit
      ;; Crate DOW starts with Monday (1) to Sunday (7)
      :day_of_week (kx/+ (kx/mod extracted 7) 1)
      extracted)))
(defn- extract-integer
  "Like `extract`, but wraps the resulting expression with `kx/->integer`."
  [unit expr]
  (kx/->integer (extract unit expr)))
;; Durations expressed in milliseconds (second = 1000). They are added to
;; timestamp expressions by `date` and `date-interval`.
(def ^:private ^:const second 1000)
(def ^:private ^:const minute (* 60 second))
(def ^:private ^:const hour   (* 60 minute))
(def ^:private ^:const day    (* 24 hour))
(def ^:private ^:const week   (* 7 day))
(def ^:private ^:const year   (* 365 day))
;; A month is one twelfth of a 365-day year (2628000000 ms).
;; Use integer division: `(Math/round (float ...))` resolves to `Math.round(float)`,
;; which returns an int and saturates at Integer/MAX_VALUE (2147483647) for this value.
(def ^:private ^:const month  (quot year 12))
(defn date
  "Return a korma form that buckets the date/time expression EXPR by UNIT.
  `java.sql.Timestamp` values are first converted to an ISO-8601 string literal.
  Truncating units produce a formatted date string; `-of-` units and `:year` produce
  an integer. An unknown UNIT makes `case` throw. The driver argument is ignored."
  [_ unit expr]
  (let [v            (if (instance? Timestamp expr)
                       (kx/literal (u/date->iso-8601 expr))
                       expr)
        datetime-fmt "%Y-%m-%d %H:%i:%s"
        date-fmt     "%Y-%m-%d"
        ;; truncate V to TRUNC-UNIT, then format the result with FMT
        trunc-as     (fn [fmt trunc-unit]
                       (date-format fmt (date-trunc trunc-unit v)))]
    (case unit
      :default         (date-format datetime-fmt v)
      :second          (trunc-as datetime-fmt :second)
      :minute          (trunc-as datetime-fmt :minute)
      :hour            (trunc-as datetime-fmt :hour)
      :day             (trunc-as date-fmt :day)
      :month           (trunc-as date-fmt :month)
      :quarter         (trunc-as date-fmt :quarter)
      ;; Crate weeks start on Monday, so shift this date into the proper bucket and then decrement the resulting day
      :week            (date-format date-fmt (kx/- (date-trunc :week (kx/+ v day)) day))
      :minute-of-hour  (extract-integer :minute v)
      :hour-of-day     (extract-integer :hour v)
      :day-of-week     (extract-integer :day_of_week v)
      :day-of-month    (extract-integer :day_of_month v)
      :day-of-year     (extract-integer :day_of_year v)
      :week-of-year    (extract-integer :week v)
      :month-of-year   (extract-integer :month v)
      :quarter-of-year (extract-integer :quarter v)
      :year            (extract-integer :year v))))
(defn- sql-interval
  "Return the SQL text `CURRENT_TIMESTAMP + <n>`, where n is UNIT multiplied by AMOUNT."
  [unit amount]
  (let [offset (* unit amount)]
    (format "CURRENT_TIMESTAMP + %d" offset)))
(defn date-interval
  "Defines the SQL command required for date-interval calculation: a raw korma form
  `CURRENT_TIMESTAMP + <offset>`, where the offset is AMOUNT (a number) times the
  length of UNIT. A `:quarter` is treated as three months. An unknown UNIT makes
  `case` throw. The driver argument is ignored."
  [_ unit amount]
  (case unit
    ;; AMOUNT must stay a plain number here: `sql-interval` multiplies it and formats it
    ;; with %d, so it cannot be a SQL expression such as the one `kx/*` would build.
    :quarter (recur nil :month (* 3 amount))
    :year    (k/raw (sql-interval year amount))
    :month   (k/raw (sql-interval month amount))
    :week    (k/raw (sql-interval week amount))
    :day     (k/raw (sql-interval day amount))
    :hour    (k/raw (sql-interval hour amount))
    :minute  (k/raw (sql-interval minute amount))
    :second  (k/raw (sql-interval second amount))))
......@@ -15,7 +15,8 @@
java.util.Map
clojure.lang.Keyword
com.mchange.v2.c3p0.ComboPooledDataSource
(metabase.query_processor.interface Field Value)))
(metabase.query_processor.interface Field Value)
(clojure.lang PersistentVector)))
(declare korma-entity)
......@@ -85,6 +86,9 @@
"Return a korma form appropriate for converting a Unix timestamp integer field or value to an proper SQL `Timestamp`.
SECONDS-OR-MILLISECONDS refers to the resolution of the int in question and with be either `:seconds` or `:milliseconds`."))
;; Convert `Object[]` values read from a JDBC result set into Clojure vectors.
;; `vec` is used instead of `PersistentVector/adopt`: `adopt` wraps the array as the
;; vector's tail without building the tree, which is only valid for arrays of at most
;; 32 elements.
(extend-protocol jdbc/IResultSetReadColumn
  (class (object-array []))
  (result-set-read-column [x _ _] (vec x)))
(def ^:dynamic ^:private connection-pools
"A map of our currently open connection pools, keyed by DATABASE `:id`."
......
......@@ -8,7 +8,7 @@
[metabase.models.database :refer [Database]]
[metabase.util :as u]))
(defn- value->base-type
(defn value->base-type
"Attempt to match a value we get back from the DB with the corresponding base-type`."
[v]
(driver/class->base-type (type v)))
......
......@@ -150,7 +150,7 @@
(apply k/fields korma-form (for [field fields]
(as (formatted field) field))))
(defn- filter-subclause->predicate
(defn filter-subclause->predicate
"Given a filter SUBCLAUSE, return a Korma filter predicate form for use in korma `where`."
[{:keys [filter-type field value], :as filter}]
{:pre [(map? filter) field]}
......@@ -167,7 +167,7 @@
:= ['= (formatted value)]
:!= ['not= (formatted value)])}))
(defn- filter-clause->predicate [{:keys [compound-type subclause subclauses], :as clause}]
(defn filter-clause->predicate [{:keys [compound-type subclause subclauses], :as clause}]
(case compound-type
:and (apply kfns/pred-and (map filter-clause->predicate subclauses))
:or (apply kfns/pred-or (map filter-clause->predicate subclauses))
......
......@@ -836,16 +836,16 @@
(ql/order-by (ql/asc (ql/aggregate-field 0))))
:data (format-rows-by [int int])))
;;; order_by aggregate ["stddev" field-id]
;; MySQL has a nasty tendency to return different results on different systems so just round everything to the nearest int.
;; It also seems to give slightly different results than less-sucky DBs as evidenced below
;;; ### order_by aggregate ["stddev" field-id]
;; Standard deviation results are not exact (SQRT is computed in floating point), so round everything to the nearest int.
;; Databases may use different SQRT implementations and therefore return slightly different values.
(datasets/expect-with-engines (engines-that-support :standard-deviation-aggregations)
{:columns [(format-name "price")
"stddev"]
:rows [[3 (if (= *engine* :mysql) 25 26)]
:rows [[3 (if (contains? #{:mysql :crate} *engine*) 25 26)]
[1 24]
[2 21]
[4 (if (= *engine* :mysql) 14 15)]]
[4 (if (contains? #{:mysql :crate} *engine*) 14 15)]]
:cols [(venues-col :price)
(aggregate-col :stddev (venues-col :category_id))]}
(->> (run-query venues
......@@ -924,7 +924,7 @@
(expect-with-non-timeseries-dbs
(cond
(= *engine* :sqlite)
(contains? #{:sqlite :crate} *engine*)
[["2015-06-01" 6]
["2015-06-02" 10]
["2015-06-03" 4]
......@@ -1287,7 +1287,7 @@
(expect-with-non-timeseries-dbs
(cond
(= *engine* :sqlite)
(contains? #{:sqlite :crate} *engine*)
[["2015-06-01 10:31:00" 1]
["2015-06-01 16:06:00" 1]
["2015-06-01 17:23:00" 1]
......@@ -1326,7 +1326,7 @@
(expect-with-non-timeseries-dbs
(cond
(= *engine* :sqlite)
(contains? #{:sqlite :crate} *engine*)
[["2015-06-01 10:31:00" 1]
["2015-06-01 16:06:00" 1]
["2015-06-01 17:23:00" 1]
......@@ -1378,7 +1378,7 @@
(expect-with-non-timeseries-dbs
(cond
(= *engine* :sqlite)
(contains? #{:sqlite :crate} *engine*)
[["2015-06-01 10:00:00" 1]
["2015-06-01 16:00:00" 1]
["2015-06-01 17:00:00" 1]
......@@ -1423,7 +1423,7 @@
(expect-with-non-timeseries-dbs
(cond
(= *engine* :sqlite)
(contains? #{:sqlite :crate} *engine*)
[["2015-06-01" 6]
["2015-06-02" 10]
["2015-06-03" 4]
......@@ -1480,7 +1480,7 @@
(expect-with-non-timeseries-dbs
(cond
(= *engine* :sqlite)
(contains? #{:sqlite :crate} *engine*)
[["2015-05-31" 46]
["2015-06-07" 47]
["2015-06-14" 40]
......@@ -1505,7 +1505,7 @@
(expect-with-non-timeseries-dbs
;; Not really sure why different drivers have different opinions on these </3
(cond
(contains? #{:sqlserver :sqlite} *engine*)
(contains? #{:sqlserver :sqlite :crate} *engine*)
[[23 54] [24 46] [25 39] [26 61]]
(contains? #{:mongo :redshift :bigquery :postgres :h2} *engine*)
......@@ -1516,7 +1516,7 @@
(sad-toucan-incidents-with-bucketing :week-of-year))
(expect-with-non-timeseries-dbs
[[(if (= *engine* :sqlite) "2015-06-01", "2015-06-01T00:00:00.000Z") 200]]
[[(if (contains? #{:sqlite :crate} *engine*) "2015-06-01", "2015-06-01T00:00:00.000Z") 200]]
(sad-toucan-incidents-with-bucketing :month))
(expect-with-non-timeseries-dbs
......@@ -1524,7 +1524,7 @@
(sad-toucan-incidents-with-bucketing :month-of-year))
(expect-with-non-timeseries-dbs
[[(if (= *engine* :sqlite) "2015-04-01", "2015-04-01T00:00:00.000Z") 200]]
[[(if (contains? #{:sqlite :crate} *engine*) "2015-04-01", "2015-04-01T00:00:00.000Z") 200]]
(sad-toucan-incidents-with-bucketing :quarter))
(expect-with-non-timeseries-dbs
......
(ns metabase.test.data.crate
"Code for creating / destroying a Crate database from a `DatabaseDefinition`."
(:require [clojure.java.jdbc :as jdbc]
[clojure.string :as s]
[metabase.driver.generic-sql :as sql]
(metabase.test.data [generic-sql :as generic]
[interface :as i])
[metabase.util :as u])
(:import metabase.driver.crate.CrateDriver))
;; Maps a Field base type keyword to the Crate column type name (a string).
;; Date, date-time and time fields all map to "timestamp".
;; NOTE(review): :DecimalField maps to "integer", which would drop any fractional part -- confirm this is intended.
(def ^:private ^:const field-base-type->sql-type
{:BigIntegerField "long"
:BooleanField "boolean"
:CharField "string"
:DateField "timestamp"
:DateTimeField "timestamp"
:DecimalField "integer"
:FloatField "float"
:IntegerField "integer"
:TextField "string"
:TimeField "timestamp"})
(defn- timestamp->CrateDateTime
  "Convert VALUE into the epoch-milliseconds number used when inserting rows.
  * a `java.sql.Timestamp` becomes its `.getTime` value;
  * a korma generated form (an array map holding `:korma.sql.utils/generated`) has the
    `CURRENT_TIMESTAMP +` prefix stripped, the remainder read as a number, and the
    current time in milliseconds added to it;
  * anything else is returned unchanged."
  [value]
  (cond
    (instance? java.sql.Timestamp value)
    (.getTime (u/->Timestamp value))

    (and (instance? clojure.lang.PersistentArrayMap value)
         (contains? value :korma.sql.utils/generated))
    ;; NOTE(review): `read-string` is applied to generated SQL text; fine for trusted
    ;; test definitions, but it must never see external input.
    (let [offset (read-string (s/replace (:korma.sql.utils/generated value)
                                         #"CURRENT_TIMESTAMP \+"
                                         ""))]
      (+ offset (.getTime (u/new-sql-timestamp))))

    :else
    value))
(defn- escape-field-names
  "Escape the field-name keys in ROW-OR-ROWS and pass every value through
  `timestamp->CrateDateTime`. A sequential argument is processed element by element."
  [row-or-rows]
  (if-not (sequential? row-or-rows)
    ;; a single row: rebuild the map entry by entry
    (reduce (fn [escaped-row [k v]]
              (assoc escaped-row (sql/escape-field-name k) (timestamp->CrateDateTime v)))
            {}
            row-or-rows)
    (map escape-field-names row-or-rows)))
(defn- make-load-data-fn
  "Create a `load-data!` function. The returned function builds a function that
  actually inserts rows (via `jdbc/insert!` with `:transaction? false`), wraps it with
  the composition of WRAP-INSERT-FNS, then calls the wrapped function with the rows
  to insert."
  [& wrap-insert-fns]
  (fn [driver dbdef tabledef]
    (let [raw-insert! (fn [row-or-rows]
                        (apply jdbc/insert!
                               (generic/database->spec driver :db dbdef)
                               (keyword (:table-name tabledef))
                               :transaction? false
                               (escape-field-names row-or-rows)))
          wrap        (apply comp wrap-insert-fns)
          insert!     (wrap raw-insert!)
          rows        (apply list (generic/load-data-get-rows driver dbdef tabledef))]
      (insert! rows))))
;; Connection details used for the test database: always localhost on port 4300,
;; whatever arguments are passed.
;; NOTE(review): the Crate driver's `crate-spec` reads a `:hosts` key, not `:host` / `:port`;
;; these values appear to work only because the driver's default hosts string matches -- confirm.
(def ^:private database->connection-details
(constantly {:host "localhost"
:port 4300}))
;; Make CrateDriver usable as a test dataset loader.
;; IGenericSQLDatasetLoader: generic defaults, with SQL executed statement by statement,
;; "integer" primary keys, and no SQL emitted for creating/dropping databases or adding
;; foreign keys (those entries return nil).
;; IDatasetLoader: generic defaults plus the fixed connection details, the :crate engine
;; key and "doc" as the default schema.
(extend CrateDriver
generic/IGenericSQLDatasetLoader
(merge generic/DefaultsMixin
{:execute-sql! generic/sequentially-execute-sql!
:field-base-type->sql-type (u/drop-first-arg field-base-type->sql-type)
:pk-sql-type (constantly "integer")
:create-db-sql (constantly nil)
:add-fk-sql (constantly nil)
:drop-db-if-exists-sql (constantly nil)
:load-data! (make-load-data-fn generic/load-data-add-ids)})
i/IDatasetLoader
(merge generic/IDatasetLoaderMixin
{:database->connection-details database->connection-details
:engine (constantly :crate)
:default-schema (constantly "doc")}))
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment