Skip to content
Snippets Groups Projects
Commit d7a0bcf5 authored by Allen Gilliland's avatar Allen Gilliland
Browse files

streamlining more of the event bus and activity stuff and rounding things out....

streamlining more of the event bus and activity stuff and rounding things out.  adding new migration which creates the `activity` table.
parent 551f78f9
No related branches found
No related tags found
No related merge requests found
databaseChangeLog:
- changeSet:
id: 12
author: agilliland
changes:
- createTable:
tableName: activity
columns:
- column:
name: id
type: int
autoIncrement: true
constraints:
primaryKey: true
nullable: false
- column:
name: topic
type: varchar(32)
constraints:
nullable: false
- column:
name: model
type: varchar(16)
constraints:
nullable: true
- column:
name: model_id
type: int
constraints:
nullable: true
- column:
name: user_id
type: int
constraints:
nullable: true
references: core_user(id)
foreignKeyName: fk_activity_ref_user_id
deferrable: false
initiallyDeferred: false
- column:
name: timestamp
type: DATETIME
constraints:
nullable: false
- column:
name: details
type: varchar
constraints:
nullable: false
- createIndex:
tableName: activity
indexName: idx_activity_topic
columns:
column:
name: topic
- createIndex:
tableName: activity
indexName: idx_activity_model_model_id
columns:
column:
name: model
column:
name: model_id
- modifySql:
dbms: postgresql
replace:
replace: WITHOUT
with: WITH
......@@ -11,5 +11,6 @@
{"include": {"file": "migrations/010_add_revision_table.yaml"}},
{"include": {"file": "migrations/011_cleanup_dashboard_perms.yaml"}},
{"include": {"file": "migrations/012_add_card_query_fields.yaml"}}
{"include": {"file": "migrations/012_add_activity_table.yaml"}}
]
}
(ns metabase.activity
(:require [clojure.core.async :as async]))
;;; ## ---------------------------------------- PUBLICATION ----------------------------------------
(def ^:private activity-channel
"Channel to host activity publications."
(async/chan))
(def ^:private activity-publication
"Publication for general activity.
Expects a map as input and the map must have a `:topic` key."
(async/pub activity-channel #(:topic %)))
(defn publish-activity
"Publish an item into the activity stream. Returns the published item."
[topic activity-item]
{:pre [(keyword topic)]}
(async/go (async/>! activity-channel {:topic (keyword topic) :item activity-item}))
activity-item)
;;; ## ---------------------------------------- SUBSCRIPTION ----------------------------------------
(defn subscribe-to-activity
"Subscribe to a given topic of the general activity stream.
Expects a topic to subscribe to and a `core.async` channel."
[topic channel]
{:pre [(keyword topic)]}
(async/sub activity-publication (keyword topic) channel))
;;; ## ---------------------------------------- ACTIVITY FEED ----------------------------------------
(def activity-feed-topics
"The `Set` of topics which are subscribed to and included in the Metabase published activity feed."
#{:card-create
:card-update
:card-delete
:dashboard-create
:dashboard-update
:dashboard-delete
:dashboard-add-cards
:dashboard-remove-cards
:dashboard-reposition-cards})
(def ^:private activity-feed
"channel for activity feed subscription."
(async/chan))
;; create the core.async subscription for each of our activity-feed-topics
(loop [[topic & rest] (vec activity-feed-topics)]
(subscribe-to-activity topic activity-feed)
(when rest (recur rest)))
;; this is a placeholder for now
(defn take-and-print [channel prefix]
(async/go-loop []
(let [activity-item (async/<! channel)]
(println "Activity:" (:topic activity-item))
(clojure.pprint/pprint (:item activity-item))
(recur))))
(take-and-print activity-feed "activity-feed")
......@@ -2,7 +2,7 @@
(:require [compojure.core :refer [GET POST DELETE PUT]]
[korma.core :as k]
[medley.core :refer [mapply]]
[metabase.activity :as activity]
[metabase.events :as events]
[metabase.api.common :refer :all]
[metabase.db :refer :all]
(metabase.models [hydrate :refer [hydrate]]
......@@ -72,7 +72,7 @@
:name name
:public_perms public_perms
:visualization_settings visualization_settings)
(activity/publish-activity :card-create)))
(events/publish-event :card-create)))
(defendpoint GET "/:id"
"Get `Card` with ID."
......@@ -95,7 +95,7 @@
:name name
:public_perms public_perms
:visualization_settings visualization_settings)
(activity/publish-activity :card-update {:id id :actor_id *current-user-id*})
(events/publish-event :card-update {:id id :actor_id *current-user-id*})
(push-revision :entity Card, :object (Card id)))
(defendpoint DELETE "/:id"
......@@ -103,7 +103,7 @@
[id]
(write-check Card id)
(let [result (cascade-delete Card :id id)]
(activity/publish-activity :card-delete {:id id :actor_id *current-user-id*})
(events/publish-event :card-delete {:id id :actor_id *current-user-id*})
result))
(defendpoint GET "/:id/favorite"
......
......@@ -2,7 +2,7 @@
"/api/dash endpoints."
(:require [compojure.core :refer [GET POST PUT DELETE]]
[korma.core :as k]
[metabase.activity :as activity]
[metabase.events :as events]
[metabase.api.common :refer :all]
[metabase.db :refer :all]
(metabase.models [hydrate :refer [hydrate]]
......@@ -35,7 +35,7 @@
:description description
:public_perms public_perms
:creator_id *current-user-id*)
(activity/publish-activity :dashboard-create)))
(events/publish-event :dashboard-create)))
(defendpoint GET "/:id"
"Get `Dashboard` with ID."
......@@ -54,7 +54,7 @@
:description description
:name name
:public_perms public_perms))
(activity/publish-activity :dashboard-update {:id id :actor_id *current-user-id*})
(events/publish-event :dashboard-update {:id id :actor_id *current-user-id*})
(push-revision :entity Dashboard, :object (Dashboard id)))
(defendpoint DELETE "/:id"
......@@ -62,7 +62,7 @@
[id]
(write-check Dashboard id)
(let [result (cascade-delete Dashboard :id id)]
(activity/publish-activity :dashboard-delete {:id id :actor_id *current-user-id*})
(events/publish-event :dashboard-delete {:id id :actor_id *current-user-id*})
result))
(defendpoint POST "/:id/cards"
......@@ -72,7 +72,7 @@
(write-check Dashboard id)
(check-400 (exists? Card :id cardId))
(let [result (ins DashboardCard :card_id cardId :dashboard_id id)]
(activity/publish-activity :dashboard-add-cards (merge {:actor_id *current-user-id*} result))
(events/publish-event :dashboard-add-cards (merge {:actor_id *current-user-id*} result))
(push-revision :entity Dashboard, :object (Dashboard id))
result))
......@@ -82,7 +82,7 @@
{dashcardId [Required String->Integer]}
(write-check Dashboard id)
(let [result (del DashboardCard :id dashcardId :dashboard_id id)]
(activity/publish-activity :dashboard-remove-cards {:id dashcardId :dashboard_id id :actor_id *current-user-id*})
(events/publish-event :dashboard-remove-cards {:id dashcardId :dashboard_id id :actor_id *current-user-id*})
(push-revision :entity Dashboard, :object (Dashboard id))
result))
......@@ -101,7 +101,7 @@
(let [dashcard (sel :one [DashboardCard :id] :id dashcard-id :dashboard_id id)]
(when dashcard
(upd DashboardCard dashcard-id :sizeX sizeX :sizeY sizeY :row row :col col))))
(activity/publish-activity :dashboard-reposition-cards {:dashboard_id id :actor_id *current-user-id* :cards cards})
(events/publish-event :dashboard-reposition-cards {:dashboard_id id :actor_id *current-user-id* :cards cards})
(push-revision :entity Dashboard, :object (Dashboard id))
{:status :ok})
......
......@@ -23,7 +23,8 @@
(metabase.middleware [auth :as auth]
[log-api-call :refer :all]
[format :refer :all])
(metabase.models [setting :refer [defsetting]]
(metabase.models [activity :refer [start-activity-feed]]
[setting :refer [defsetting]]
[database :refer [Database]]
[user :refer [User]])))
......@@ -103,6 +104,9 @@
;; Now start the task runner
(task/start-task-runner!)
;; Bootstrap the activity feed system
(start-activity-feed)
(log/info "Metabase Initialization COMPLETE")
true)
......
(ns metabase.events
"Provides a very simply event bus using `core.async` to allow publishing and subscribing to intersting
topics happening throughout the Metabase system in a decoupled way."
(:require [clojure.core.async :as async]))
;;; ## ---------------------------------------- PUBLICATION ----------------------------------------
(def ^:private events-channel
"Channel to host events publications."
(async/chan))
(def ^:private events-publication
"Publication for general events channel.
Expects a map as input and the map must have a `:topic` key."
(async/pub events-channel #(:topic %)))
(defn publish-event
"Publish an item into the events stream.
Returns the published item to allow for chaining."
[topic event-item]
{:pre [(keyword topic)]}
(async/go (async/>! events-channel {:topic (keyword topic) :item event-item}))
event-item)
;;; ## ---------------------------------------- SUBSCRIPTION ----------------------------------------
(defn subscribe-to-topic
"Subscribe to a given topic of the general events stream.
Expects a topic to subscribe to and a `core.async` channel.
Returns the channel to allow for chaining."
[topic channel]
{:pre [(keyword topic)]}
(async/sub events-publication (keyword topic) channel)
channel)
(defn subscribe-to-topics
"Convenience method for subscribing to series of topics against a single channel."
[topics channel]
{:pre [(coll? topics)]}
(loop [[topic & rest] (vec topics)]
(subscribe-to-topic topic channel)
(when rest (recur rest))))
(ns metabase.models.activity
(:require [clojure.core.async :as async]
[clojure.tools.logging :as log]
[korma.core :refer :all, :exclude [defentity update]]
[metabase.api.common :refer [*current-user-id*]]
[metabase.db :refer :all]
[metabase.events :as events]
(metabase.models [interface :refer :all]
[user :refer [User]])))
;;; ## ---------------------------------------- ACTIVITY ENTITY ----------------------------------------
(defrecord ActivityFeedItemInstance []
clojure.lang.IFn
(invoke [this k]
(get this k)))
(extend-ICanReadWrite ActivityFeedItemInstance :read :public-perms, :write :public-perms)
(defentity ActivityFeedItem
[(table :activity_feed)
timestamped]
(post-select [_ {:keys [user_id] :as feed-item}]
(map->ActivityFeedItemInstance (assoc feed-item :user (delay (User user_id))))))
(extend-ICanReadWrite ActivityFeedItemEntity :read :public-perms, :write :public-perms)
;;; ## ---------------------------------------- ACTIVITY FEED ----------------------------------------
(def activity-feed-topics
"The `Set` of topics which are subscribed to and included in the Metabase published activity feed."
#{:card-create
:card-update
:card-delete
:dashboard-create
:dashboard-update
:dashboard-delete
:dashboard-add-cards
:dashboard-remove-cards
:dashboard-reposition-cards})
(def ^:private activity-feed-channel
"Channel for receiving event notifications we want to subscribe to for the activity feed."
(async/chan))
(declare process-activity-event)
(defn start-activity-feed
"Initialize the Activity Feed. This handles and setup required to bootstrap the activity feed system."
[]
(log/info "Starting up Metabase activity feed and listening for things of interest!")
;; create the core.async subscription for each of our activity-feed-topics
(events/subscribe-to-topics activity-feed-topics activity-feed-channel)
;; start listening for events we care about and does something with them
(async/go-loop []
;; try/catch here to get possible exceptions thrown by core.async trying to read from the channel
(try
(process-activity-event (async/<! activity-feed-channel))
(catch Exception e
(log/error "Unexpected error listening on activity-feed-channel" e)))
(recur)))
(defn- process-activity-event
"Handle processing for a single event notification received on the activity-feed-channel"
[activity-event]
;; try/catch here to prevent individual topic processing exceptions from bubbling up. better to handle them here.
(try
(when-let [{topic :topic object :item} activity-event]
;; TODO - real work will include inserting new Activity entries based on the object
(log/info "Activity:" topic)
(clojure.pprint/pprint object))
(catch Exception e
(log/warn (format "Failed to process activity event. %s" (:topic activity-event)) e))))
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment