From 45ed2c6f5765671797a5dc645040822f0a57cd05 Mon Sep 17 00:00:00 2001
From: Oleksandr Yakushev <alex@bytopia.org>
Date: Fri, 6 Sep 2024 15:22:21 +0300
Subject: [PATCH] perf: Optimize some query processor functions (#47658)

* perf: [metabase.util.performance] Faster mapv for small collections

* perf: Optimize some query processor functions
---
 .../middleware/large_int_id.clj               | 46 ++++++++-------
 src/metabase/query_processor/reducible.clj    | 27 ++++-----
 src/metabase/util/performance.clj             | 58 +++++++++++++++++--
 3 files changed, 91 insertions(+), 40 deletions(-)

diff --git a/src/metabase/query_processor/middleware/large_int_id.clj b/src/metabase/query_processor/middleware/large_int_id.clj
index 279b2a9cdbb..d31c62fd5d5 100644
--- a/src/metabase/query_processor/middleware/large_int_id.clj
+++ b/src/metabase/query_processor/middleware/large_int_id.clj
@@ -3,16 +3,17 @@
   (:require
    [metabase.lib.metadata.protocols :as lib.metadata.protocols]
    [metabase.lib.util.match :as lib.util.match]
-   [metabase.query-processor.store :as qp.store]))
+   [metabase.query-processor.store :as qp.store]
+   [metabase.util.performance :as perf]))
 
 (defn- ->string [x]
   (when x
     (str x)))
 
 (defn- result-int->string
-  [field-indexes rf]
+  [field-mask rf] ; `field-mask` holds one truthy/falsey flag per field; `rf` is the reducing fn being wrapped.
   ((map (fn [row]
-          (reduce #(update (vec %1) %2 ->string) row field-indexes)))
+          (perf/mapv #(if %2 (->string %1) %1) row field-mask))) ; NOTE(review): perf/mapv sizes its result by the shorter input, so a row wider than `field-mask` would presumably lose its trailing columns, unlike the old reduce/update -- confirm the widths always match.
    rf))
 
 (defn- should-convert-to-string? [field]
@@ -21,22 +22,25 @@
        (or (isa? (:base-type field) :type/Integer)
            (isa? (:base-type field) :type/Number))))
 
-(defn- field-indexes [fields]
-  (not-empty
-   (keep-indexed
-    (fn [idx val]
-      ;; TODO -- we could probably fix the rest of #5816 by adding support for
-      ;; `:field` w/ name and removing the PK/FK requirements -- might break
-      ;; the FE client tho.
-      (when-let [field (lib.util.match/match-one val
-                         [:field (field-id :guard integer?) _]
-                         ;; TODO -- can't we use the QP store here? Seems like
-                         ;; we should be able to, but it doesn't work (not
-                         ;; initialized)
-                         (lib.metadata.protocols/field (qp.store/metadata-provider) field-id))]
-        (when (should-convert-to-string? field)
-          idx)))
-    fields)))
+(defn- field-index-mask
+  "Return a vector with one entry per element of `fields`: `true` when that field's values should be converted to strings, falsey (`false` or `nil`) otherwise. Returns `nil` when no entry is `true`, i.e. there is nothing to convert."
+  [fields]
+  (let [mask
+        (mapv
+         (fn [val]
+           ;; TODO -- we could probably fix the rest of #5816 by adding support for
+           ;; `:field` w/ name and removing the PK/FK requirements -- might break
+           ;; the FE client tho.
+           (when-let [field (lib.util.match/match-one val
+                              [:field (field-id :guard integer?) _]
+                              ;; TODO -- can't we use the QP store here? Seems like
+                              ;; we should be able to, but it doesn't work (not
+                              ;; initialized)
+                              (lib.metadata.protocols/field (qp.store/metadata-provider) field-id))]
+             (should-convert-to-string? field)))
+         fields)]
+    (when (some true? mask)
+      mask)))
 
 (defn convert-id-to-string
   "Converts any ID (:type/PK and :type/FK) in a result to a string to handle a number > 2^51
@@ -62,7 +66,7 @@
   ;; so, short of turning all `:type/Integer` derived values into strings, this is the best approximation of a fix
   ;; that can be accomplished.
   (let [rff' (when js-int-to-string?
-               (when-let [field-indexes (field-indexes (:fields (:query query)))]
+               (when-let [mask (field-index-mask (:fields (:query query)))]
                  (fn [metadata]
-                   (result-int->string field-indexes (rff metadata)))))]
+                   (result-int->string mask (rff metadata)))))]
     (or rff' rff)))
diff --git a/src/metabase/query_processor/reducible.clj b/src/metabase/query_processor/reducible.clj
index c21b5925558..a3a83807ece 100644
--- a/src/metabase/query_processor/reducible.clj
+++ b/src/metabase/query_processor/reducible.clj
@@ -3,7 +3,8 @@
    [clojure.core.async :as a]
    [metabase.query-processor.pipeline :as qp.pipeline]
    [metabase.util.log :as log]
-   [metabase.util.malli :as mu]))
+   [metabase.util.malli :as mu]
+   [metabase.util.performance :as perf]))
 
 (set! *warn-on-reflection* true)
 
@@ -90,24 +91,24 @@
   [primary-rf     :- ifn?
    additional-rfs :- [:sequential ifn?]
    combine        :- ifn?]
-  (let [additional-accs (volatile! (mapv (fn [rf] (rf))
-                                         additional-rfs))]
+  (let [additional-accs (volatile! (perf/mapv (fn [rf] (rf))
+                                              additional-rfs))]
     (fn combine-additional-reducing-fns-rf*
       ([] (primary-rf))
 
       ([acc]
-       (let [additional-results (map (fn [rf acc]
-                                       (rf (unreduced acc)))
-                                     additional-rfs
-                                     @additional-accs)]
+       (let [additional-results (perf/mapv (fn [rf acc]
+                                             (rf (unreduced acc)))
+                                           additional-rfs
+                                           @additional-accs)]
          (apply combine acc additional-results)))
 
       ([acc x]
        (vswap! additional-accs (fn [accs]
-                                 (mapv (fn [rf acc]
-                                         (if (reduced? acc)
-                                           acc
-                                           (rf acc x)))
-                                       additional-rfs
-                                       accs)))
+                                 (perf/mapv (fn [rf acc]
+                                              (if (reduced? acc)
+                                                acc
+                                                (rf acc x)))
+                                            additional-rfs
+                                            accs)))
        (primary-rf acc x)))))
diff --git a/src/metabase/util/performance.clj b/src/metabase/util/performance.clj
index 3deb78bef9d..f64d28308b8 100644
--- a/src/metabase/util/performance.clj
+++ b/src/metabase/util/performance.clj
@@ -1,6 +1,7 @@
 (ns metabase.util.performance
   "Functions and utilities for faster processing."
-  (:refer-clojure :exclude [reduce mapv]))
+  (:refer-clojure :exclude [reduce mapv])
+  (:import (clojure.lang LazilyPersistentVector RT)))
 
 (set! *warn-on-reflection* true)
 
@@ -59,16 +60,61 @@
                (recur res)))
            res))))))
 
+;; Special case for mapv. If the iterated collection has size <=32, it is more efficient to use an object array as
+;; the accumulator instead of a transient, and then build the vector from that array.
+
+(definterface ISmallTransient ; Minimal mutable-accumulator contract used by this namespace's `mapv`.
+  (conj [x])        ; implemented by SmallTransientImpl to store `x` at the next write index
+  (persistent []))  ; implemented by SmallTransientImpl to turn the backing array into a vector
+
+(deftype SmallTransientImpl [^objects arr, ^:unsynchronized-mutable ^long cnt] ; `arr` is the backing array, `cnt` the next write index
+  ISmallTransient
+  (conj [this x]
+    (RT/aset arr (unchecked-int cnt) x) ; write `x` at the current index
+    (set! cnt (unchecked-inc cnt))      ; advance the index; there is no explicit check against the array length here
+    this)                               ; return the accumulator itself so it can be threaded through `reduce`
+
+  (persistent [_]
+    (LazilyPersistentVector/createOwning arr))) ; builds the vector from the whole of `arr`, regardless of `cnt`, so every slot should have been filled
+
+(defn- small-transient [n] ; Fresh accumulator over an `n`-slot object array, with the write index starting at 0.
+  (SmallTransientImpl. (object-array n) 0))
+
+(defn- small-conj! ; Appends `x` to the accumulator `st` via `.conj` and returns whatever `.conj` returns.
+  {:inline (fn [st x] `(.conj ~(with-meta st {:tag `ISmallTransient}) ~x))} ; inlined call sites get the receiver form tagged as ISmallTransient, presumably so `.conj` resolves without reflection
+  [^ISmallTransient st x]
+  (.conj st x))
+
+(defn- small-persistent! [^ISmallTransient st] ; Finishes the accumulator `st` by calling its `.persistent` method.
+  (.persistent st))
+
+(defn- smallest-count ; Size of the shortest argument. Uncounted inputs are probed for at most 33 elements (enough to tell "<= 32" from "bigger"), so an infinite seq paired with a finite one cannot hang; the result is exact whenever it is <= 32.
+  (^long [c1 c2] (min (bounded-count 33 c1) (bounded-count 33 c2)))
+  (^long [c1 c2 c3] (min (min (bounded-count 33 c1) (bounded-count 33 c2)) (bounded-count 33 c3)))
+  (^long [c1 c2 c3 c4] (min (min (bounded-count 33 c1) (bounded-count 33 c2)) (min (bounded-count 33 c3) (bounded-count 33 c4)))))
+
 (defn mapv
-  "Like `clojure.core/mapv`, but iterates multiple collections more effectively and uses Java iterators under the hood."
+  "Like `clojure.core/mapv`, but iterates multiple collections more efficiently and uses Java iterators under the hood. Input sizes are measured up front (the smallest one when several collections are given): 0 returns `[]`, up to 32 accumulates into an object array, and anything larger goes through a transient vector."
   ([f coll1]
-   (persistent! (reduce #(conj! %1 (f %2)) (transient []) coll1)))
+   (let [n (count coll1)]
+     (cond (= n 0) []
+           (<= n 32) (small-persistent! (reduce #(small-conj! %1 (f %2)) (small-transient n) coll1))
+           :else (persistent! (reduce #(conj! %1 (f %2)) (transient []) coll1)))))
   ([f coll1 coll2]
-   (persistent! (reduce #(conj! %1 (f %2 %3)) (transient []) coll1 coll2)))
+   (let [n (smallest-count coll1 coll2)]
+     (cond (= n 0) []
+           (<= n 32) (small-persistent! (reduce #(small-conj! %1 (f %2 %3)) (small-transient n) coll1 coll2))
+           :else (persistent! (reduce #(conj! %1 (f %2 %3)) (transient []) coll1 coll2)))))
   ([f coll1 coll2 coll3]
-   (persistent! (reduce #(conj! %1 (f %2 %3 %4)) (transient [])  coll1 coll2 coll3)))
+   (let [n (smallest-count coll1 coll2 coll3)]
+     (cond (= n 0) []
+           (<= n 32) (small-persistent! (reduce #(small-conj! %1 (f %2 %3 %4)) (small-transient n) coll1 coll2 coll3))
+           :else (persistent! (reduce #(conj! %1 (f %2 %3 %4)) (transient []) coll1 coll2 coll3)))))
   ([f coll1 coll2 coll3 coll4]
-   (persistent! (reduce #(conj! %1 (f %2 %3 %4 %5)) (transient []) coll1 coll2 coll3 coll4))))
+   (let [n (smallest-count coll1 coll2 coll3 coll4)]
+     (cond (= n 0) []
+           (<= n 32) (small-persistent! (reduce #(small-conj! %1 (f %2 %3 %4 %5)) (small-transient n) coll1 coll2 coll3 coll4))
+           :else (persistent! (reduce #(conj! %1 (f %2 %3 %4 %5)) (transient []) coll1 coll2 coll3 coll4))))))
 
 (defn juxt*
   "Like `clojure.core/juxt`, but accepts a list of functions instead of varargs. Uses more efficient mapping."
-- 
GitLab