Unverified commit 3b3ed8b4 authored by Cam Saul

Merge branch 'release-0.32.0' into master

parents 5a1075b4 16290182
Showing changed files with 516 additions and 413 deletions
@@ -48,23 +48,26 @@ fi
# Calculate a checksum of all the driver source files. If we've already built the driver and the checksum is the same
# there's no need to build the driver a second time
calculate_checksum() {
find "$driver_project_dir" -name '*.clj' -or -name '*.yaml' | sort | cat | $md5_command
find "$driver_project_dir" -name '*.clj' -or -name '*.yaml' | sort | xargs cat | $md5_command
}
# Check whether the saved checksum for the driver sources from the last build is the same as the current one. If so,
# we don't need to build again.
checksum_is_same() {
result=""
if [ -f "$checksum_file" ]; then
old_checksum=`cat "$checksum_file"`
if [ "$(calculate_checksum)" == "$old_checksum" ]; then
current_checksum=`calculate_checksum`
echo "Checksum of source files for previous build: $old_checksum"
echo "Current checksum of source files: $current_checksum"
if [ "$current_checksum" == "$old_checksum" ]; then
# Make sure the target driver JAR actually exists as well!
if [ -f "$target_jar" ]; then
result="$driver driver source unchanged since last build. Skipping re-build."
echo "$driver driver source unchanged since last build. Skipping re-build."
return 0
fi
fi
fi
echo "$result"
return 1
}
######################################## BUILDING THE DRIVER ########################################
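The `xargs` fix above is the crux of this hunk: piping the sorted file list straight into `$md5_command` hashed the list of file *names*, so editing a driver source file never changed the checksum and a stale build could be reused; `xargs cat` hashes the concatenated file *contents* instead. A rough Clojure analogue of the corrected calculation, purely illustrative (the real logic stays in this shell script):

(require '[clojure.java.io :as io])
(import 'java.security.MessageDigest)

(defn driver-source-checksum
  "MD5 of the concatenated contents of every .clj/.yaml file under dir, in sorted path order."
  [dir]
  (let [files  (->> (file-seq (io/file dir))
                    (filter #(re-matches #".*\.(clj|yaml)" (.getName ^java.io.File %)))
                    (sort-by #(.getPath ^java.io.File %)))
        digest (MessageDigest/getInstance "MD5")]
    (doseq [f files]
      ;; hash what's *in* the files, not their names
      (.update digest (.getBytes (slurp f) "UTF-8")))
    (format "%032x" (BigInteger. 1 (.digest digest)))))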
@@ -228,9 +231,11 @@ mkdir -p resources/modules
if [ $# -eq 2 ]; then
$2
# Build driver if checksum has changed
elif [ ! "$(checksum_is_same)" ]; then
elif ! checksum_is_same; then
echo "Checksum has changed."
build_driver || retry_clean_build
# Either way, always copy the target uberjar to the dest location
else
echo "Checksum is unchanged."
(copy_target_to_dest && verify_build) || retry_clean_build
fi
This diff is collapsed.
(defproject metabase/snowflake-driver "1.0.0-SNAPSHOT-3.6.20"
(defproject metabase/snowflake-driver "1.0.0-SNAPSHOT-3.6.27"
:min-lein-version "2.5.0"
:dependencies
[[net.snowflake/snowflake-jdbc "3.6.21"]]
[[net.snowflake/snowflake-jdbc "3.6.27"]]
:profiles
{:provided
......
info:
name: Metabase Snowflake Driver
version: 1.0.0-SNAPSHOT-3.6.20
version: 1.0.0-SNAPSHOT-3.6.27
description: Allows Metabase to connect to Snowflake databases.
driver:
name: snowflake
@@ -33,6 +33,7 @@ driver:
- name: role
display-name: Role
placeholder: my_role
- additional-options
connection-properties-include-tunnel-config: true
init:
- step: load-namespace
......
@@ -13,6 +13,7 @@
[common :as driver.common]
[sql-jdbc :as sql-jdbc]]
[metabase.driver.sql-jdbc
[common :as sql-jdbc.common]
[connection :as sql-jdbc.conn]
[execute :as sql-jdbc.execute]
[sync :as sql-jdbc.sync]]
@@ -39,22 +40,26 @@
account)]
;; it appears to be the case that their JDBC driver ignores `db` -- see my bug report at
;; https://support.snowflake.net/s/question/0D50Z00008WTOMCSA5/
(merge {:classname "net.snowflake.client.jdbc.SnowflakeDriver"
:subprotocol "snowflake"
:subname (str "//" host ".snowflakecomputing.com/")
:client_metadata_request_use_connection_ctx true
:ssl true
;; other SESSION parameters
;; use the same week start we use for all the other drivers
:week_start 7
;; not 100% sure why we need to do this but if we don't set the connection to UTC our report timezone
;; stuff doesn't work, even though we ultimately override this when we set the session timezone
:timezone "UTC"}
(-> opts
;; original version of the Snowflake driver incorrectly used `dbname` in the details fields instead of
;; `db`. If we run across `dbname`, correct our behavior
(set/rename-keys {:dbname :db})
(dissoc :host :port :timezone)))))
(-> (merge {:classname "net.snowflake.client.jdbc.SnowflakeDriver"
:subprotocol "snowflake"
:subname (str "//" host ".snowflakecomputing.com/")
:client_metadata_request_use_connection_ctx true
:ssl true
;; keep open connections open indefinitely instead of closing them. See #9674 and
;; https://docs.snowflake.net/manuals/sql-reference/parameters.html#client-session-keep-alive
:client_session_keep_alive true
;; other SESSION parameters
;; use the same week start we use for all the other drivers
:week_start 7
;; not 100% sure why we need to do this but if we don't set the connection to UTC our report timezone
;; stuff doesn't work, even though we ultimately override this when we set the session timezone
:timezone "UTC"}
(-> opts
;; original version of the Snowflake driver incorrectly used `dbname` in the details fields instead of
;; `db`. If we run across `dbname`, correct our behavior
(set/rename-keys {:dbname :db})
(dissoc :host :port :timezone)))
(sql-jdbc.common/handle-additional-options opts))))
(defmethod sql-jdbc.sync/database-type->base-type :snowflake [_ base-type]
({:NUMBER :type/Number
......
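The new `additional-options` field in the manifest pairs with the `sql-jdbc.common/handle-additional-options` call added above: whatever free-form JDBC options the user enters get folded into the connection spec after the Snowflake defaults are merged. The real implementation lives in `metabase.driver.sql-jdbc.common`; the sketch below only illustrates the kind of transformation involved and is an assumption, not that code:

(defn- append-additional-options
  "Append a user-supplied JDBC options string to a connection spec's :subname."
  [spec {:keys [additional-options]}]
  (if (seq additional-options)
    (update spec :subname str
            (if (.contains ^String (:subname spec) "?") "&" "?")
            additional-options)
    spec))

(append-additional-options
 {:subprotocol "snowflake", :subname "//myaccount.snowflakecomputing.com/"}
 {:additional-options "queryTimeout=30"})
;; => {:subprotocol "snowflake", :subname "//myaccount.snowflakecomputing.com/?queryTimeout=30"}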
@@ -15,6 +15,7 @@ log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d [%t] %-5p%c - %m%n
# customizations to logging by package
log4j.logger.metabase.driver=INFO
log4j.logger.metabase.plugins=DEBUG
log4j.logger.metabase.middleware=DEBUG
@@ -23,12 +24,15 @@ log4j.logger.metabase.query-processor.permissions=INFO
log4j.logger.metabase.query-processor=INFO
log4j.logger.metabase.sync=DEBUG
log4j.logger.metabase.models.field-values=INFO
# NOCOMMIT
# TODO - we can dial these back a bit once we are satisfied the async stuff isn't so new (0.33.0+)
log4j.logger.metabase.async.api-response=DEBUG
log4j.logger.metabase.async.semaphore-channel=DEBUG
log4j.logger.metabase.async.util=DEBUG
log4j.logger.metabase.middleware.async=DEBUG
log4j.logger.metabase.query-processor.async=DEBUG
log4j.logger.metabase.async.api-response=DEBUG
log4j.logger.metabase=INFO
# c3p0 connection pools tend to log useless warnings way too often; only log actual errors
log4j.logger.com.mchange=ERROR
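Assuming the usual setup where `clojure.tools.logging` routes through log4j and uses the calling namespace as the logger category, the hierarchy above means DEBUG-level messages from `metabase.plugins.*` and the new async namespaces are emitted, while the `log4j.logger.metabase=INFO` parent still filters DEBUG output from everything else under `metabase.*`. A tiny illustration (namespace name chosen only for this example):

(ns metabase.plugins.example
  (:require [clojure.tools.logging :as log]))

(log/debug "extracting bundled drivers...") ; emitted: metabase.plugins is configured at DEBUG
(log/trace "very chatty detail")            ; suppressed: TRACE is below DEBUG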
@@ -217,14 +217,14 @@
:collection_id collection_id
:collection_position collection_position}
dashboard (db/transaction
;; Adding a new dashboard at `collection_position` could cause other dashboards in this collection to change
;; position, check that and fix up if needed
(api/maybe-reconcile-collection-position! dashboard-data)
;; Ok, now save the Dashboard
(u/prog1 (db/insert! Dashboard dashboard-data)
;; Get cards from existing dashboard and associate to copied dashboard
(doseq [card (:ordered_cards existing-dashboard)]
(api/check-500 (dashboard/add-dashcard! <> (:card_id card) card)))))]
;; Adding a new dashboard at `collection_position` could cause other dashboards in this
;; collection to change position, check that and fix up if needed
(api/maybe-reconcile-collection-position! dashboard-data)
;; Ok, now save the Dashboard
(u/prog1 (db/insert! Dashboard dashboard-data)
;; Get cards from existing dashboard and associate to copied dashboard
(doseq [card (:ordered_cards existing-dashboard)]
(api/check-500 (dashboard/add-dashcard! <> (:card_id card) card)))))]
(events/publish-event! :dashboard-create dashboard)))
@@ -233,13 +233,7 @@
(api/defendpoint GET "/:id"
"Get `Dashboard` with ID."
[id]
(u/prog1 (-> (Dashboard id)
api/check-404
(hydrate [:ordered_cards :card :series] :can_write)
api/read-check
api/check-not-archived
hide-unreadable-cards
add-query-average-durations)
(u/prog1 (get-dashboard id)
(events/publish-event! :dashboard-read (assoc <> :actor_id api/*current-user-id*))))
......
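Both dashboard hunks above lean on Metabase's `u/prog1`, which, like Common Lisp's `prog1`, evaluates its first form, binds the result to `<>` for the remaining forms, and returns that first result; that is why the freshly inserted Dashboard row is what gets returned even though dashcards are attached to it afterwards. A minimal illustration, assuming `metabase.util` is required as `u`:

(u/prog1 (+ 1 2)
  (println "inserted:" <>)) ; side-effecting forms see the value as <>
;; prints "inserted: 3" and the whole expression evaluates to 3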
@@ -47,7 +47,7 @@
(let [source-card-id (query->source-card-id query)
options {:executed-by api/*current-user-id*, :context :ad-hoc,
:card-id source-card-id, :nested? (boolean source-card-id)}]
(qp.async/process-query-and-save-with-max! query options)))
(qp.async/process-query-and-save-with-max-results-constraints! query options)))
;;; ----------------------------------- Downloading Query Results in Other Formats -----------------------------------
......
(ns metabase.async.api-response
"Handle ring response maps that contain a core.async chan in the :body key:
{:status 200
:body (a/chan)}
and send strings (presumably \n) as heartbeats to the client until the real result (a seq) is received, then stream
that to the client."
(:require [cheshire.core :as json]
[clojure.core.async :as a]
[clojure.java.io :as io]
@@ -19,6 +26,7 @@
(def ^:private keepalive-interval-ms
"Interval between sending newline characters to keep Heroku from terminating requests like queries that take a long
time to complete."
;; 1 second
(* 1 1000))
(def ^:private absolute-max-keepalive-ms
@@ -29,18 +37,16 @@
;; 4 hours
(* 4 60 60 1000))
;; Handle ring response maps that contain a core.async chan in the :body key:
;;
;; {:status 200
;; :body (a/chan)}
;;
;; and send strings (presumably \n) as heartbeats to the client until the real result (a seq) is received, then
;; stream that to the client
(defn- write-keepalive-character [^Writer out]
;;; +----------------------------------------------------------------------------------------------------------------+
;;; | Writing Results of Async Keep-alive Channel |
;;; +----------------------------------------------------------------------------------------------------------------+
(defn- write-keepalive-character! [^Writer out]
(try
;; a newline padding character as it's harmless and will allow us to check if the client
;; is connected. If sending this character fails because the connection is closed, the
;; chan will then close. Newlines are no-ops when reading JSON which this depends upon.
;; a newline padding character as it's harmless and will allow us to check if the client is connected. If sending
;; this character fails because the connection is closed, the chan will then close. Newlines are no-ops when
;; reading JSON which this depends upon.
(.write out (str \newline))
(.flush out)
true
@@ -52,7 +58,7 @@
false)))
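The keepalive trick works because JSON readers skip leading whitespace, so heartbeat newlines written before the real payload never corrupt what the client eventually parses. A quick check with cheshire, which is already required as `json` in this namespace:

(json/parse-string "\n\n\n{\"status\": \"completed\", \"row_count\": 2}")
;; => {"status" "completed", "row_count" 2}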
;; `chunkk` named as such to avoid conflict with `clojure.core/chunk`
(defn- write-response-chunk [chunkk, ^Writer out]
(defn- write-response-chunk! [chunkk, ^Writer out]
(cond
;; An error has occurred, let the user know
(instance? Throwable chunkk)
@@ -65,12 +71,16 @@
:else
(log/error (trs "Unexpected output in async API response") (class chunkk))))
(defn- write-channel-to-output-stream [chan, ^Writer out]
(defn- write-chan-vals-to-writer!
"Write whatever val(s) come into `chan` onto the Writer wrapping our OutputStream. Vals should be either
`::keepalive`, meaning we should write a keepalive newline character to the Writer, or some other value, which is
the actual response we've been waiting for (at this point we can close both the Writer and the channel)."
[chan, ^Writer out]
(a/go-loop [chunkk (a/<! chan)]
(cond
(= chunkk ::keepalive)
;; keepalive chunkk
(if (write-keepalive-character out)
(= chunkk ::keepalive)
(if (write-keepalive-character! out)
(recur (a/<! chan))
(do
(a/close! chan)
@@ -86,7 +96,7 @@
(future
(try
;; chunkk *might* be `nil` if the channel has already been closed.
(write-response-chunk chunkk out)
(write-response-chunk! chunkk out)
(finally
;; should already be closed, but just to be safe
(a/close! chan)
@@ -94,15 +104,12 @@
(.close out))))))
nil)
(extend-protocol ring.protocols/StreamableResponseBody
ManyToManyChannel
(write-body-to-stream [chan _ ^OutputStream output-stream]
(log/debug (u/format-color 'green (trs "starting streaming response")))
(write-channel-to-output-stream chan (io/writer output-stream))))
;;; +----------------------------------------------------------------------------------------------------------------+
;;; | Async Keep-alive Channel |
;;; +----------------------------------------------------------------------------------------------------------------+
(defn- start-async-keepalive-loop
(defn- start-async-keepalive-loop!
"Starts a go-loop that will send `::keepalive` messages to `output-chan` every second until `input-chan` either
produces a response or one of the two channels is closed. If `output-chan` is closed (because there's no longer
anywhere to write to -- the connection was canceled), closes `input-chan`; this can be, and is, used by producers such as
@@ -155,19 +162,32 @@
(a/close! output-chan)
(a/close! input-chan))))))))
(defn- async-keepalive-chan [input-chan]
(defn- async-keepalive-channel
"Given a core.async channel `input-chan` which will (presumably) eventually receive an asynchronous result, return a
new channel 'wrapping' the original that will write keepalive bytes until the actual result is obtained."
[input-chan]
;; Output chan only needs to hold on to the last message it got, for example no point in writing multiple `\n`
;; characters if the consumer didn't get a chance to consume them, and no point writing `\n` before writing the
;; actual response
(let [output-chan (a/chan (a/sliding-buffer 1))]
(start-async-keepalive-loop input-chan output-chan)
output-chan))
(u/prog1 (a/chan (a/sliding-buffer 1))
(start-async-keepalive-loop! input-chan <>)))
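The sliding buffer of size 1 is what makes "only the latest value matters" concrete: if the consumer has not drained a pending `::keepalive` by the time the real result arrives, the stale keepalive is silently dropped instead of being written first. For example, using the `a` alias for `clojure.core.async` as in this namespace:

(let [c (a/chan (a/sliding-buffer 1))]
  (a/>!! c ::keepalive)       ; fills the single buffer slot
  (a/>!! c {:data [[1] [2]]}) ; slides the unread ::keepalive out
  (a/<!! c))
;; => {:data [[1] [2]]}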
;;; +----------------------------------------------------------------------------------------------------------------+
;;; | Telling Ring & Compojure how to handle core.async channel API responses |
;;; +----------------------------------------------------------------------------------------------------------------+
;; Synchronous Compojure endpoint (e.g. `defendpoint`) responses go directly to here. Async endpoint
;; (`defendpoint-async`) responses go to Sendable and then to here. So technically this affects both sync & async.
(defn- async-keepalive-response [input-chan]
(assoc (response/response (async-keepalive-chan input-chan))
:content-type "applicaton/json; charset=utf-8"))
(extend-protocol ring.protocols/StreamableResponseBody
ManyToManyChannel
(write-body-to-stream [chan _ ^OutputStream output-stream]
(log/debug (u/format-color 'green (trs "starting streaming response")))
(write-chan-vals-to-writer! (async-keepalive-channel chan) (io/writer output-stream))))
(extend-protocol Sendable
ManyToManyChannel
(send* [input-chan _ respond _]
(respond (async-keepalive-response input-chan))))
(respond (assoc (response/response input-chan)
:content-type "applicaton/json; charset=utf-8"))))
@@ -658,7 +658,7 @@
:question
:xlsx-download))
;; TODO - this schema is somewhat misleading because if you use a function like `qp/process-query-and-save-with-max!`
;; TODO - this schema is somewhat misleading because if you use a function like `qp/process-query-and-save-with-max-results-constraints!`
;; some of these keys (e.g. `:context`) are in fact required
(def Info
"Schema for query `:info` dictionary, which is used for informational purposes to record information about how a query
......
@@ -62,29 +62,6 @@
(respond ring.json/default-malformed-response))
(handler request respond raise))))
#_(defn check-application-type-headers
"We don't support API requests with any type of content encoding other than JSON so let's be nice and make that
explicit. Added benefit is that it reduces CSRF surface because POSTing a form with JSON content encoding isn't so
easy to do."
[handler]
(fn
[{:keys [request-method body], {:strs [content-type]} :headers, :as request} respond raise]
;; GET or DELETE requests with no body we can go ahead and proceed without Content-Type headers, since they
;; generally don't have bodies.
;;
;; POST/PUT requests always require Content-Type: application/json. GET/DELETE requests that specify any other
;; content type aren't allowed.
(if (or (and (#{:get :delete} request-method)
(nil? content-type))
(#'ring.json/json-request? request))
(handler request respond raise)
(respond
{:status 400
:headers {"Content-Type" "text/plain"}
:body (str (tru "Metabase only supports JSON requests.")
" "
(tru "Make sure you set a 'Content-Type: application/json' header."))}))))
;;; +----------------------------------------------------------------------------------------------------------------+
;;; | Streaming JSON Responses |
......
@@ -83,7 +83,6 @@
(merge
{:same-site :lax
:http-only true
:path "/api"
:max-age (config/config-int :max-session-age)}
;; If the authentication request was made over HTTPS (hopefully always except for local dev instances)
;; add `Secure` attribute so the cookie is only sent over HTTPS.
......
(ns metabase.models.task-history
(:require [metabase.models.interface :as i]
(:require [clojure.tools.logging :as log]
[metabase.models.interface :as i]
[metabase.util :as u]
[metabase.util
[date :as du]
[i18n :refer [trs]]
[schema :as su]]
[schema.core :as s]
[toucan
@@ -20,9 +22,9 @@
;; the date that task finished, it deletes everything after that. As we continue to add TaskHistory entries, this
;; ensures we'll have a good amount of history for debugging/troubleshooting, but not grow too large and fill the
;; disk.
(when-let [clean-before-date (db/select-one-field :ended_at TaskHistory {:limit 1
:offset num-rows-to-keep
:order-by [[:ended_at :desc]]})]
(when-let [clean-before-date (db/select-one-field :ended_at TaskHistory {:limit 1
:offset num-rows-to-keep
:order-by [[:ended_at :desc]]})]
(db/simple-delete! TaskHistory :ended_at [:<= clean-before-date])))
(u/strict-extend (class TaskHistory)
@@ -36,7 +38,7 @@
(s/defn all
"Return all TaskHistory entries, applying `limit` and `offset` if not nil"
[limit :- (s/maybe su/IntGreaterThanZero)
[limit :- (s/maybe su/IntGreaterThanZero)
offset :- (s/maybe su/IntGreaterThanOrEqualToZero)]
(db/select TaskHistory (merge {:order-by [[:ended_at :desc]]}
(when limit
@@ -58,11 +60,14 @@
(defn- save-task-history! [start-time-ms info]
(let [end-time-ms (System/currentTimeMillis)
duration-ms (- end-time-ms start-time-ms)]
(db/insert! TaskHistory
(assoc info
:started_at (du/->Timestamp start-time-ms)
:ended_at (du/->Timestamp end-time-ms)
:duration duration-ms))))
(try
(db/insert! TaskHistory
(assoc info
:started_at (du/->Timestamp start-time-ms)
:ended_at (du/->Timestamp end-time-ms)
:duration duration-ms))
(catch Throwable e
(log/warn e (trs "Error saving task history"))))))
(s/defn do-with-task-history
"Impl for `with-task-history` macro; see documentation below."
@@ -85,6 +90,7 @@
"Execute `body`, recording a TaskHistory entry when the task completes; if it failed to complete, records an entry
containing information about the Exception. `info` should contain at least a name for the task (conventionally
lisp-cased) as `:task`; see the `TaskHistoryInfo` schema in this namespace for other optional keys.
(with-task-history {:task \"send-pulses\"}
...)"
{:style/indent 1}
......
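Wrapping the TaskHistory insert in try/catch makes history persistence best-effort: a failure to write the row is downgraded to a logged warning instead of failing the task that just ran. So a caller like the following (the task body is hypothetical) should still get its normal return value even if the application database hiccups while the record is saved:

(with-task-history {:task "send-pulses"}
  (send-pulses!)) ; hypothetical body; its result is returned as usual, and a failed
                  ; history insert only shows up as an "Error saving task history" warning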
@@ -54,7 +54,7 @@
(when (io/resource "modules")
(let [plugins-path (plugins-dir)]
(files/with-open-path-to-resource [modules-path "modules"]
(files/copy-files-if-not-exists! modules-path plugins-path)))))
(files/copy-files! modules-path plugins-path)))))
;;; +----------------------------------------------------------------------------------------------------------------+
......
@@ -7,14 +7,15 @@
*file-manipulation* functions for the sorts of operations the plugin system needs to perform."
(:require [clojure.java.io :as io]
[clojure.string :as str]
[clojure.tools.logging :as log]
[metabase.util :as u]
[metabase.util
[date :as du]
[i18n :refer [trs]]])
(:import java.io.FileNotFoundException
java.net.URL
[java.nio.file CopyOption Files FileSystem FileSystems LinkOption OpenOption Path]
java.nio.file.attribute.FileAttribute
[java.nio.file CopyOption Files FileSystem FileSystems LinkOption OpenOption Path StandardCopyOption]
[java.nio.file.attribute FileAttribute FileTime]
java.util.Collections))
;;; --------------------------------------------------- Path Utils ---------------------------------------------------
@@ -69,20 +70,25 @@
;;; ------------------------------------------------- Copying Stuff --------------------------------------------------
(defn- copy! [^Path source, ^Path dest]
(du/profile (trs "Extract file {0} -> {1}" source dest)
(Files/copy source dest (u/varargs CopyOption))))
(defn- last-modified-time ^FileTime [^Path path]
(Files/getLastModifiedTime path (u/varargs LinkOption)))
(defn- copy-if-not-exists! [^Path source, ^Path dest]
(when-not (exists? dest)
(copy! source dest)))
(defn- copy-file! [^Path source, ^Path dest]
(when (or (not (exists? dest))
(pos? (.compareTo (last-modified-time source) (last-modified-time dest))))
(du/profile (trs "Extract file {0} -> {1}" source dest)
(Files/copy source dest (u/varargs CopyOption [StandardCopyOption/REPLACE_EXISTING])))))
(defn copy-files-if-not-exists!
"Copy all files in `source-dir` to `dest-dir`; skip files if a file of the same name already exists in `dest-dir`."
(defn copy-files!
"Copy all files in `source-dir` to `dest-dir`. Overwrites existing files if last modified date is older than that of
the source file."
[^Path source-dir, ^Path dest-dir]
(doseq [^Path source (files-seq source-dir)
:let [target (append-to-path dest-dir (str (.getFileName source)))]]
(copy-if-not-exists! source target)))
(try
(copy-file! source target)
(catch Throwable e
(log/error e (trs "Failed to copy file"))))))
;;; ------------------------------------------ Opening filesystems for URLs ------------------------------------------
......
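`copy-file!` now overwrites a previously extracted module whenever the bundled copy is newer, using `Files/getLastModifiedTime`; a positive `compareTo` between the two FileTime values means the source was modified more recently. A standalone REPL sketch of that test, with hypothetical paths:

(import '[java.nio.file Files LinkOption Paths]
        '[java.nio.file.attribute FileTime])

(let [mtime (fn [p] (Files/getLastModifiedTime (Paths/get p (into-array String []))
                                               (into-array LinkOption [])))]
  ;; true => the bundled module is newer than the copy in plugins/, so it gets re-extracted
  (pos? (.compareTo ^FileTime (mtime "modules/snowflake.metabase-driver.jar")
                    (mtime "plugins/snowflake.metabase-driver.jar"))))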
@@ -33,10 +33,11 @@
(when-let [card (Card :id card-id, :archived false)]
(let [{:keys [creator_id dataset_query]} card]
{:card card
:result (qp/process-query-and-save-with-max! dataset_query (merge {:executed-by creator_id,
:context :pulse,
:card-id card-id}
options))}))
:result (qp/process-query-and-save-with-max-results-constraints! dataset_query
(merge {:executed-by creator_id,
:context :pulse,
:card-id card-id}
options))}))
(catch Throwable t
(log/warn t (trs "Error running query for Card {0}" card-id)))))
......
@@ -395,7 +395,7 @@
{:max-results-bare-rows max-results})
m))
(s/defn process-query-and-save-with-max!
(s/defn process-query-and-save-with-max-results-constraints!
"Same as `process-query-and-save-execution!` but will include the default max rows returned as a constraint. (This
function is ultimately what powers most API endpoints that run queries, including `POST /api/dataset`.)
{:style/indent 1}
......
@@ -70,12 +70,12 @@
[query options]
(do-async (:database query) qp/process-query-and-save-execution! query options))
(defn process-query-and-save-with-max!
"Async version of `metabase.query-processor/process-query-and-save-with-max!`. Runs query asynchronously, and returns
a `core.async` channel that can be used to fetch the results once the query finishes running. Closing the channel
will cancel the query."
(defn process-query-and-save-with-max-results-constraints!
"Async version of `metabase.query-processor/process-query-and-save-with-max-results-constraints!`. Runs query
asynchronously, and returns a `core.async` channel that can be used to fetch the results once the query finishes
running. Closing the channel will cancel the query."
[query options]
(do-async (:database query) qp/process-query-and-save-with-max! query options))
(do-async (:database query) qp/process-query-and-save-with-max-results-constraints! query options))
(defn process-query-without-save!
"Async version of `metabase.query-processor/process-query-without-save!`. Runs query asynchronously, and returns a
......
@@ -56,9 +56,13 @@
(.setHandler (#'ring-jetty/async-proxy-handler
handler
;; if any API endpoint functions aren't at the very least returning a channel to fetch the results
;; later after 30 seconds we're in serious trouble. Kill the request.
;; later after 10 minutes we're in serious trouble. (Almost everything 'slow' should be returning a
;; channel before then, but some things like CSV downloads don't currently return channels at this
;; time)
;;
;; TODO - I suppose the default value should be moved to the `metabase.config` namespace?
(or (config/config-int :mb-jetty-async-response-timeout)
(* 30 1000))))))
(* 10 60 1000))))))
(defn start-web-server!
"Start the embedded Jetty web server. Returns `:started` if a new server was started; `nil` if there was already a
......
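The async response timeout is now ten minutes by default but remains overridable. Assuming the usual Metabase convention that `config/config-int :mb-jetty-async-response-timeout` reads an `MB_JETTY_ASYNC_RESPONSE_TIMEOUT` environment variable in milliseconds (an assumption, not something this diff states), the effective value is:

(or (config/config-int :mb-jetty-async-response-timeout)
    (* 10 60 1000))
;; => 600000 when the variable is unset; setting MB_JETTY_ASYNC_RESPONSE_TIMEOUT=30000
;; would restore the old 30-second behavior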
@@ -411,23 +411,26 @@
[task-name :- su/NonBlankString
database :- i/DatabaseInstance
{:keys [start-time end-time]} :- SyncOperationOrStepRunMetadata]
{:task task-name
:db_id (u/get-id database)
{:task task-name
:db_id (u/get-id database)
:started_at (du/->Timestamp start-time)
:ended_at (du/->Timestamp end-time)
:duration (du/calculate-duration start-time end-time)})
:ended_at (du/->Timestamp end-time)
:duration (du/calculate-duration start-time end-time)})
(s/defn ^:private store-sync-summary!
[operation :- s/Str
database :- i/DatabaseInstance
{:keys [steps] :as sync-md} :- SyncOperationMetadata]
(db/insert-many! TaskHistory
(cons (create-task-history operation database sync-md)
(for [[step-name step-info] steps
:let [task-details (dissoc step-info :start-time :end-time :log-summary-fn)]]
(assoc (create-task-history step-name database step-info)
:task_details (when (seq task-details)
task-details))))))
(try
(db/insert-many! TaskHistory
(cons (create-task-history operation database sync-md)
(for [[step-name step-info] steps
:let [task-details (dissoc step-info :start-time :end-time :log-summary-fn)]]
(assoc (create-task-history step-name database step-info)
:task_details (when (seq task-details)
task-details)))))
(catch Throwable e
(log/warn e (trs "Error saving task history")))))
(s/defn run-sync-operation
"Run `sync-steps` and log a summary message"
......