September 22, 2025
By: Kevin

core.async的回顾和最新发展(flow)

  1. 为什么需要 core.async?
  2. core.async 基础
    1. Go 块与 Channel
    2. 缓冲区 (Buffer)
    3. Transducer 与 Pipeline
    4. 错误处理
  3. 进阶 Channel 操作
    1. go 线程池: 暂停(Parking) vs 阻塞(Blocking)
  4. core.async/flow: 构建结构化数据管道
    1. 定义处理节点 (Process)
    2. 创建和连接 Flow
    3. 控制和交互
  5. 结论

clojure.core.async 是 Clojure 处理并发和异步编程的利器. 它借鉴了 Go 语言的 CSP (Communicating Sequential Processes) 模型, 通过轻量级的 go 块和 channel 提供了一种强大而优雅的方式来组织复杂的并发流程.

然而, 当数据处理流程变得越来越复杂, 涉及多个阶段, 分支和合并时, 手动管理所有的 channelgo 块可能会变得繁琐且容易出错. 为了解决这个问题, core.async 引入了一个更高层次的抽象库: core.async/flow.

本文将首先回顾 core.async 的核心基础, 然后深入探讨 flow 库如何建结构化, 可管理, 可监控的数据处理管道.

为什么需要 core.async?

深入 core.async 之前, 先通过一个直观的例子来理解它的核心价值: 轻量级并发.

在传统的基于线程的并发模型中, 每个并发任务通常对应一个操作系统线程. 线程是相对昂贵的资源, 无论是内存占用还是上下文切换的开销都比较大.

让我们用代码来感受一下. 首先, 我们定义两个辅助函数来监控 JVM 的内存和线程使用情况.

(ns exploration
  (:require [clojure.core.async :as a]
            [clojure.core.async.flow :as flow]
            [clojure.core.async.flow-monitor :as fmon]
            [nextjournal.clerk :as clerk])
  (:import [java.lang.management ManagementFactory]
           [java.lang Runtime]))

(defn get-memory-usage
  "查看当前jvm的堆内存, 返回一个map:
  {:total 536, :free 373, :used 163}"
  []
  (let [runtime (Runtime/getRuntime)]
    {:total (int (/ (.totalMemory runtime) 1e6))
     :free (int (/ (.freeMemory runtime) 1e6))
     :used (int (/ (- (.totalMemory runtime) (.freeMemory runtime)) 1e6))}))

(defn get-thread-count "获取总的线程数量" []
  (let [thread-mx-bean (ManagementFactory/getThreadMXBean)]
    (.getThreadCount thread-mx-bean)))

在初始状态下, 我们的 JVM 线程数和内存使用情况如下:

;; 初始线程数
(get-thread-count)
;; => 20 (这个数字会因环境而异)

;; 初始内存使用 (单位: MB)
(clerk/table [ (keys (get-memory-usage)) (vals (get-memory-usage)) ])
:total:free:used
536373163

现在, 我们尝试创建 100 万个 go 块. 每个 go 块都是一个独立的逻辑进程, 它们会等待从一个 channel 中读取数据.

;; **创建百万个 go 进程**
(dotimes [_ 1e6]
  (go
    (a/<! (a/chan))))

再次检查线程和内存:

(get-thread-count)
;; => 29 (只增加了少量线程)

(clerk/table [ (keys (get-memory-usage)) (vals (get-memory-usage)) ])
:total:free:used
536280256

你会发现, 尽管我们启动了 100 万个并发进程, 但实际的线程数几乎没有增加, 内存占用也只是适度增长. 这是因为 go 块在等待(比如 <! 操作)时会 "停车" (park), 让出它所在的线程给其他 go 块使用. core.async 内部维护了一个小规模的线程池来执行这些大量的逻辑进程.

作为对比, 让我们看看创建 1000 个 真实的 Java 线程会发生什么:

;; **创建 1000 个真实线程**
(dotimes [_ 1e3]
  (future
    (Thread/sleep 100000)))

再次检查线程数:

(get-thread-count)
;; => 1029 (线程数急剧增加)

仅仅 1000 个线程就导致了线程数的线性增长. 如果尝试创建 100 万个线程, 系统资源很可能会被耗尽.

这个简单的对比揭示了 core.async 的核心优势: 用少量的系统线程支持海量的并发逻辑单元, 从而实现极高的并发能力和资源利用率.

core.async 基础

在深入 flow 之前, 快速回顾一下 core.async 的一些基本构件.

Go 块与 Channel

  • go: 包裹一段代码, 使其在一个逻辑线程(非操作系统线程)中异步执行.
  • chan: 创建一个 Channel, 它是 go 块之间通信的管道.
  • >!<!: 非阻塞的放入(put)和取出(take)操作. 它们必须在 go 块内部使用. 当 channel 满(对于放入)或空(对于取出)时, go 块会 "停车" 等待, 而不会阻塞底层线程.
  • >!!<!!: 阻塞的放入和取出操作. 它们会阻塞当前线程直到操作完成, 通常用于在 go 块与普通线程代码之间进行桥接.
;; <! 会在 go 中停车等待数据, 不会占用真实线程
(let [requests (a/chan)
      responses (a/chan)]
  (a/go
    (let [req (a/<! requests)]
      ;; 读取后用 >! 异步地将处理结果放入另一个 channel
      (a/>! responses (str "响应:" req))))
  ;; >!! 会阻塞当前线程, 直到有接收者准备好
  (a/>!! requests "ping")
  ;; <!! 同样会阻塞直到读到值
  (a/<!! responses))
;; => "响应:ping"

缓冲区 (Buffer)

默认情况下, channel 是没有缓冲区的. 这意味着放入操作 (>!) 必须与取出操作 (<!) 同步发生. 我们可以为 channel 提供一个缓冲区来解耦生产者和消费者.

  • dropping-buffer: 当缓冲区满时, 新的消息会被直接丢弃.
  • sliding-buffer: 当缓冲区满时, 新的消息会顶掉最旧的消息.
;; dropping-buffer 示例
(def drop-ch (a/chan (a/dropping-buffer 1)))
(a/>!! drop-ch :a)
(a/>!! drop-ch :b) ; 缓冲区已满, :b 被丢弃
(a/<!! drop-ch)
;; => :a

;; sliding-buffer 示例
(def slide-ch (a/chan (a/sliding-buffer 1)))
(a/>!! slide-ch :a)
(a/>!! slide-ch :b) ; :b 顶掉了 :a
(a/<!! slide-ch)
;; => :b

Transducer 与 Pipeline

core.async 与 Clojure 的 transducer 完美集成, 允许在 channel 上直接定义数据转换逻辑, 实现高效的流式处理.

(let [xf (comp (map inc)
               (filter even?)
               (map #(* % %)))
      transducer-ch (a/chan 10 xf)]
  ;; onto-chan! 会自动关闭 channel
  (a/onto-chan! transducer-ch (range 6))
  ;; into 将 channel 的内容收集到集合中
  (a/<!! (a/into [] transducer-ch)))
;; => [4 16 36]

pipeline 则可以将一个转换过程并行化, 进一步提高吞吐量.

(let [xf (comp (map #(* % %))
               (filter odd?))
      in (a/chan 5)
      out (a/chan 5)]
  ;; 使用 4 个并行单元处理数据
  (a/pipeline 4 out xf in)
  (a/onto-chan! in (range 8))
  (a/<!! (a/into [] out)))
;; => [1 9 25 49]

错误处理

go-loop 中使用 try-catch 是处理 channel 流中错误的常见模式. 可以将成功的结果和错误信息分别发送到不同的 channel.

(let [in (a/chan)
      out (a/chan)
      errors (a/chan)]
  (a/go-loop []
    (if-some [value (a/<! in)]
      (do
        (try
          (a/put! out (/ 10 value))
          (catch Throwable t
            (a/put! errors {:input value
                             :message (.getMessage t)})))
        (recur))
      (do
        (a/close! out)
        (a/close! errors))))

  (a/>!! in 2)
  (a/>!! in 0) ; 将导致除零异常
  (a/>!! in 5)
  (a/close! in)

  {:ok-values (a/<!! (a/into [] out))
   :errors    (a/<!! (a/into [] errors))})
;; => {:ok-values [5 2], :errors [{:input 0, :message "Divide by zero"}]}

进阶 Channel 操作

core.async 提供了一系列强大的工具来组合和编排 channel.

  • pipe: 将一个 channel 的输出直接导入另一个 channel.
  • mult / tap: 一对多分发. 创建一个 mult 对象, 然后可以用 tap 将多个目标 channel "挂" 上去, 所有消息都会被广播到所有 tap 上.
  • pub / sub: 基于主题的发布/订阅. 比 mult 更灵活, 消费者可以根据消息内容选择性地订阅.
  • mix / admix: 多对一合并. 将多个源 channel 的消息混合到一个目标 channel 中.
  • alt! / alts!: 从多个 channel 操作中进行选择. 这是 CSP 中非常核心的一个功能, 允许你同时等待多个事件(读或写), 并处理最先就绪的那一个, 避免了复杂的锁和回调.

go 线程池: 暂停(Parking) vs 阻塞(Blocking)

这是一个 core.async 中非常重要且容易混淆的概念.

  • 暂停 (<!, >!): 当 go 块执行 <! 等待数据时, 它会交出控制权, 让底层的线程去执行其他就绪的 go 块. 这个 go 块的上下文被保存起来, 当数据到达时, 它会被唤醒并重新调度到线程池中执行.
  • 阻塞 (<!!, >!!, Thread/sleep): 当在 go 块中执行一个阻塞操作时, 它会霸占整个底层线程, 直到操作完成. 这个线程无法被用于执行其他 go 块.

在版本 1.7.718以前 core.asyncgo 线程池大小是固定的(默认为 8).

core.async 为了引入了flow, 把线程池这个抽象放到了更底层, 而且放开了限制.

新增一个与 thread 类似的 io-thread, 以便用户可以指明一个执行阻塞式 I/O (blocking I/O) 且不含大量计算的工作负载. 在当前的补丁中, 该功能会映射到一个缓存线程池 (cachedThread pool). 在未来的版本中, 如果虚拟线程池 (vthread pool) 可用, 则会动态使用它. 重构 thread-call 的行为, 使其使用 bound-fn 和一个内部函数来将线程结果返回到通道 (channel). 同时对其进行修改, 使其可以接受一个标志 (:io, :compute, :mixed) 来指明预期的工作负载类型, 并根据该标志的值从三个不同的 ExecutorService 实例中选择一个来执行任务. 修改 core.async/thread, 使其在调用内部实现函数时使用 :mixed 标志. 此外, 还新增了一项功能, 允许用户通过系统属性 (sysprop) 提供一个指向执行器服务工厂 (executor service factory) 的限定变量名. 该工厂函数将接受上述标志作为参数, 并返回一个实例或 nil.

本章节以下内容只试用于1.7.718以前版本.

如果所有这些线程都被阻塞操作占用了, 那么整个 go 块调度系统就会被"饿死" , 新的 go 块无法得到执行.

下面的例子演示了这个问题: 我们启动 64 个 go 块, 每个都在内部错误地使用了阻塞的 <!!. 由于 go 线程池(比如只有 8 个线程)被迅速耗尽, 最后用于发送 :done 消息的 go 块根本没有机会执行, 导致超时.

(let [result (a/chan)
      go-count 64]
  (dotimes [_ go-count]
    (a/go
      ;; 错误: <!! 在 go 中会阻塞真实线程
      (a/<!! (a/timeout 200))))
  (a/go (a/>! result :done))
  (let [[value port] (a/alts!! [result (a/timeout 50)])]
    {:value value
     :timed-out? (not= port result) ; => true
     :blocked-go-count go-count}))

正确的方式: 对于那些不得不执行阻塞 I/O 或计算密集型任务的场景, 应该使用 clojure.core.async/thread. 它会为你启动一个新的真实线程来执行代码, 而不会占用宝贵的 go 块线程池.

(let [result (a/chan)
      worker-count 64]
  (dotimes [_ worker-count]
    (a/thread ; a/thread 使用真实线程, 允许阻塞调用
      (a/<!! (a/timeout 200))))
  (a/go (a/>! result :done))
  (let [[value port] (a/alts!! [result (a/timeout 50)])]
    {:value value
     :timed-out? (not= port result) ; => false
     :worker-count worker-count}))

core.async/flow: 构建结构化数据管道

core.async 提供了强大的积木, 而 flow 则提供了一套完整的蓝图和框架, 用于搭建复杂, 可维护的数据处理系统.

flow 的核心思想是将数据处理流程定义为一个由处理节点 (processes)连接 (connections) 组成的有向图.

通过一个具体的例子来理解 flow. 我们将构建一个简单的监控系统:

  1. source: 持续生成随机数.
  2. scheduler: 定时器, 每隔一段时间触发一次.
  3. aggregator: 收集 source 的随机数. 当 scheduler 触发时, 计算并报告平均值. 如果数字超出阈值, 立即发出告警.
  4. printer: 接收并打印告警信息.

定义处理节点 (Process)

一个 flow 的处理节点本质上是一个多 arity (多参数) 的 Clojure 函数, 每个 arity 都有特定用途:

  • ([] ...): 描述. 返回一个 map, 声明该节点的参数 (:params), 输入端口 (:ins) 和输出端口 (:outs).
  • ([args] ...): 初始化. 接收用户提供的参数, 返回节点的初始状态.
  • ([state transition] ...): 状态转换. 处理 :resume, :pause, :stop 等生命周期事件.
  • ([state in msg] ...): 数据处理. 这是核心逻辑, 接收来自特定输入端口 (in) 的消息 (msg), 更新内部状态, 并返回 [新状态, 输出消息map].
;; 数据源: 随机数生成器
(defn source
  ([] {:params {:min "下限", :max "上限", :wait "间隔"}
       :outs {:out "随机数输出"}})
  ([args] (assoc args ::flow/in-ports {:stat (a/chan 100)} :stop (atom false)))
  ([state transition] ... ) ; 处理 resume/stop 来启动/停止生成
  ([state in msg] [state (when (= in :stat) {:out [msg]})]))

;; 聚合器: 计算和告警
(defn aggregator
  ([] {:params {:min "告警下限", :max "告警上限"}
       :ins {:stat "数据输入", :poke "日志触发"}
       :outs {:alert "告警输出"}})
  ([args] (assoc args :vals []))
  ([state transition] state)
  ([{:keys [min max vals] :as state} input-id msg]
   (case input-id
     :stat ... ; 收到数据, 检查是否告警
     :poke ... ; 收到触发信号, 计算平均值并报告
     )))

;; 调度器: 定时触发
(defn scheduler
  ([] {:params {:wait "间隔"}
       :outs {:out "触发输出"}})
  ... )

;; 打印机: 消费最终结果
(defn printer
  ([] {:params {:prefix "日志前缀"}
       :ins {:in "消息输入"}})
  ... )

创建和连接 Flow

定义好所有处理节点后, 我们使用 flow/create-flow 将它们组装起来.

  • :procs 部分定义了图中的所有节点, 并为每个节点提供参数.
  • :conns 部分定义了节点之间的连接, 即数据如何从一个节点的输出端口流向另一个节点的输入端口.
(defn create-flow []
  (flow/create-flow
   {:procs {:generator {:args {:min 0 :max 12 :wait 500} :proc (flow/process #'source)}
            :aggregator {:args {:min 1 :max 10} :proc (flow/process #'aggregator)}
            :scheduler {:args {:wait 3000} :proc (flow/process #'scheduler)}
            :notifier {:args {:prefix "Alert: "} :proc (flow/process #'printer)
                       :chan-opts {:in {:buf-or-n (a/sliding-buffer 3)}}}}
    :conns [;; generator 的 :out 连接到 aggregator 的 :stat
            [[:generator :out] [:aggregator :stat]]
            ;; scheduler 的 :out 连接到 aggregator 的 :poke
            [[:scheduler :out] [:aggregator :poke]]
            ;; aggregator 的 :alert 连接到 notifier 的 :in
            [[:aggregator :alert] [:notifier :in]]]}))

控制和交互

flow 提供了一套完整的 API 来管理整个数据流的生命周期:

(comment
  ;; 创建并启动 flow
  (def f (create-flow))
  (def chs (flow/start f))

  ;; 控制 flow 的状态
  (flow/resume f) ; 开始处理数据
  (flow/pause f)  ; 暂停处理
  (flow/stop f)   ; 停止并清理资源

  ;; 动态注入消息
  ; 手动触发一次 aggregator 的日志报告
  @(flow/inject f [:aggregator :poke] [true])
  ; 手动注入一个会触发告警的数据
  @(flow/inject f [:aggregator :stat] [100])

  ;; 监控
  ; 启动一个 web 服务器来可视化 flow 的状态和吞吐量
  (def server (fmon/start-server {:flow f}))
  (fmon/stop-server server))

结论

clojure.core.async 通过其轻量级的 go 进程和 channel 为我们提供了应对高并发挑战的强大武器. 它让我们能够以顺序化的思维方式编写异步代码, 极大地简化了并发编程的复杂性.

core.async/flow 则在此基础上更进一步, 它是一个"框架的框架". 它提供了一种声明式的方式来定义和组织复杂的多阶段数据处理管道. 通过将业务逻辑, 状态管理和数据流向清晰地分离, flow 使得构建, 测试, 维护和监控大规模异步系统变得前所未有的简单和直观.

如果你正在用 Clojure 构建需要处理复杂数据流的应用程序, 那么 core.async/flow 绝对是你工具箱中不可或缺的一员.

Tags: clojure cider emacs