Skip to content
Snippets Groups Projects
Commit f4c5ed13 authored by Allen Gilliland's avatar Allen Gilliland
Browse files

basic pub/sub framework using core.async

parent 4aad54ca
Branches
Tags
No related merge requests found
(ns metabase.activity
(:require [clojure.core.async :as async]))
;;; ## ---------------------------------------- PUBLICATION ----------------------------------------
(def ^:private activity-channel
"Channel to host activity publications."
(async/chan))
(def ^:private activity-publication
"Publication for general activity.
Expects a map as input and the map must have a `:topic` key."
(async/pub activity-channel #(:topic %)))
(defn publish-activity
"Publish an item into the activity stream. Returns the published item."
[topic activity-item]
{:pre [(keyword topic)]}
(async/go (async/>! activity-channel {:topic (keyword topic) :item activity-item}))
activity-item)
;;; ## ---------------------------------------- SUBSCRIPTION ----------------------------------------
(defn subscribe-to-activity
"Subscribe to a given topic of the general activity stream.
Expects a topic to subscribe to and a `core.async` channel."
[topic channel]
{:pre [(keyword topic)]}
(async/sub activity-publication (keyword topic) channel))
;;; ## ---------------------------------------- ACTIVITY FEED ----------------------------------------
(def activity-feed-topics
"The `Set` of topics which are subscribed to and included in the Metabase published activity feed."
#{:card-create
:card-update
:dashboard-create
:dashboard-update
:dashboard-add-cards
:dashboard-remove-cards
:dashboard-reposition-cards})
(def ^:private activity-feed
"channel for activity feed subscription."
(async/chan))
;; create the core.async subscription for each of our activity-feed-topics
(loop [[topic & rest] (vec activity-feed-topics)]
(subscribe-to-activity topic activity-feed)
(when rest (recur rest)))
;; this is a placeholder for now
(defn take-and-print [channel prefix]
(async/go-loop []
(let [activity-item (async/<! channel)]
(println "Activity:" (:topic activity-item))
(clojure.pprint/pprint (:item activity-item))
(recur))))
(take-and-print activity-feed "activity-feed")
......@@ -2,6 +2,7 @@
(:require [compojure.core :refer [GET POST DELETE PUT]]
[korma.core :as k]
[medley.core :refer [mapply]]
[metabase.activity :as activity]
[metabase.api.common :refer :all]
[metabase.db :refer :all]
(metabase.models [hydrate :refer [hydrate]]
......@@ -63,15 +64,15 @@
{name [Required NonEmptyString]
public_perms [Required PublicPerms]
display [Required CardDisplayType]}
;; TODO - which other params are required?
(ins Card
:creator_id *current-user-id*
:dataset_query dataset_query
:description description
:display display
:name name
:public_perms public_perms
:visualization_settings visualization_settings))
(->> (ins Card
:creator_id *current-user-id*
:dataset_query dataset_query
:description description
:display display
:name name
:public_perms public_perms
:visualization_settings visualization_settings)
(activity/publish-activity :card-create)))
(defendpoint GET "/:id"
"Get `Card` with ID."
......@@ -87,13 +88,15 @@
public_perms PublicPerms
display CardDisplayType}
(write-check Card id)
(check-500 (upd-non-nil-keys Card id
:dataset_query dataset_query
:description description
:display display
:name name
:public_perms public_perms
:visualization_settings visualization_settings))
(->> (upd-non-nil-keys Card id
:dataset_query dataset_query
:description description
:display display
:name name
:public_perms public_perms
:visualization_settings visualization_settings)
(activity/publish-activity :card-update))
;; TODO - have revision stuff work using activity framework and then we can remove this call
(push-revision :entity Card, :object (Card id)))
(defendpoint DELETE "/:id"
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment