core.async 中文文档

Table of Contents

1. 动机

core.async 是 Clojure 的一个 contrib 库, 增加了对使用 channels 进行异步编程的支持.

在所有优秀的程序中, 总有那么一个时刻, 组件或子系统必须停止彼此直接通信. 这通常是通过在数据生产者和数据消费者/处理器之间引入队列来实现的.

这种架构上的间接性确保了重要的决策可以在一定程度上独立做出, 并导致系统更易于理解、管理、监控和更改, 并且能更好地利用计算资源等.

在 JVM 上, java.util.concurrent 包提供了一些优秀的并发阻塞队列.

然而, 为了使用这些队列, 必须为其消费分配一个或多个实际的线程. 每个线程的堆栈分配和任务切换开销限制了实践中可以使用的线程数量. j.u.c. 队列的另一个限制是无法阻塞等待一组备选方案.

而且在 JavaScript 引擎上, 没有线程也没有队列.

线程开销或缺乏线程常常导致人们转向基于事件/回调的系统, 以追求更高的效率 (通常被误称为'可伸缩性', 这不适用, 因为无法伸缩单台机器).

事件将通信和控制流复杂化了. 虽然有各种机制可以使事件/回调更清晰 (FRP, Rx/Observables), 但它们并没有改变其基本性质, 即在一个事件发生时, 会运行任意数量的其他代码 , 可能在同一个线程上, 这导致了诸如“不要在你的处理器中做太多工作”的告诫, 以及“回调地狱”之类的短语.

core.async 的目标是:

  • 提供独立活动线程的设施, 通过类似队列的 channels 进行通信
  • 支持真实线程和线程池的共享使用 (以任何组合), 以及在 JS 引擎上的 ClojureScript
  • 建立在 CSP 及其衍生物的工作之上

我们希望 async channels 能极大地简化高效的服务器端 Clojure 程序, 并为 ClojureScript 中的前端编程提供更简单、更健壮的技术.

1.1. History

这种风格的根源至少可以追溯到 Hoare’s Communicating Sequential Processes (CSP), 随后在例如 occam, Java CSPGo programming language 中得到实现和扩展.

在现代的实现中, channel 的概念成为一等公民, 从而为我们提供了我们寻求的间接性和独立性.

channels 的一个关键特性是它们是阻塞的. 在最原始的形式中, 一个无缓冲的 channel 充当一个会合点, 任何 reader 都会等待 writer, 反之亦然.

可以引入缓冲, 但不鼓励无界缓冲, 因为有界缓冲和阻塞可以是协调步调和反压的重要工具, 确保系统不会承担超出其能力的工作.

1.2. 实现细节

1.2.1. 是一个库

core.async 是一个库. 它不修改 Clojure. 它旨在支持 Clojure 1.5+.

1.2.2. 管道(channel)

可以使用 chan 函数创建一个 channel. 这将返回一个支持多个 writers 和 readers 的 channel. 默认情况下, channel 是无缓冲的, 但可以提供一个数字来表示缓冲区大小 , 或者提供一个通过 buffer, dropping-buffersliding-buffer 创建的 buffer 对象.

channels 上的基本操作是放入和取出值. 这两个操作都可能阻塞, 但阻塞的性质取决于执行操作的控制线程的性质.

core.async 支持两种控制线程 - 普通线程和 IOC (控制反转) 线程. 普通线程可以以任何方式创建, 但 IOC 线程是通过 go blocks 创建的. 因为 JS 没有线程, 所以在 ClojureScript 中只支持 go blocks 和 IOC 线程.

1.2.3. go代码块中的隐式线程

go 是一个宏, 它接受其主体并检查其中是否有任何 channel 操作. 它会将主体变成一个状态机. 当到达任何阻塞操作时, 状态机将被停放(parked), 实际的控制线程将被释放.

这种方法类似于 C# async 中使用的方法. 当阻塞操作完成时, 代码将被恢复 (在一个线程池线程上, 或者在 JS VM 中的唯一线程上).

通过这种方式, 通常会泄露到程序本身的事件/回调系统的控制反转被该机制封装起来, 你剩下的是直接的顺序代码. 它还将为 ClojureScript 提供线程的错觉, 更重要的是, 可分离的顺序子系统.

go blocks 中的主要 channel 操作是 >! (put) 和 <! (take). go block 本身立即返回一个 channel, 它最终会将主体的最后一个表达式的值 (如果非 nil) 放入该 channel, 然后关闭它.

1.2.4. 普通线程上的管道

在普通线程上有类似的操作 - >!! (put blocking) 和 <!! (take blocking), 它们将阻塞调用它们的线程, 直到完成.

虽然可以在用例如 future 创建的线程上使用这些操作, 但还有一个宏, thread, 类似于 go, 它将启动一个一等线程并类似地返回一个 channel, 对于 channel 工作, 应该优先于 future.

1.2.5. 混合使用

你可以从任何一种 >! / >!! 向 channel 放入, 同样地可以用任何一种 <! / <!! 取出, 任意组合, 即 channel 对使用它的线程的性质是无感的.

1.2.6. alt

通常希望能够等待一组 channel 操作中的任何一个 (且仅一个) 完成. 这个强大的功能通过 alts! 函数 (用于 go blocks) 和类似的 alts!! (alts blocking) 提供.

如果有多个操作可以完成, 可以随机选择一个, 或者按优先级选择 (即按它们提供的顺序). 还有相应的 alt!alt!! 宏, 它们将选择与表达式的条件求值结合起来.

1.2.7. Timeouts

Timeouts 只是在一段时间后自动关闭的 channels. 你可以用 timeout 函数创建一个, 然后将 timeout 包含在 alt 变体中.

这样做的一个好处是 timeouts 可以在控制线程之间共享, 例如为了让一组活动共享一个结束条件.

1.2.8. clojure数据可以安全在线程间共享

与 STM 一样, 持久数据结构的普遍使用为 CSP 风格的 channels 提供了特别的好处. 特别是, 将 Clojure 数据结构放入 channel 总是安全且高效的, 不用担心其随后被生产者或消费者使用.

1.2.9. 管道的创建

core.async 与 Go 语言 channels 有明显的相似之处. 与 Go 的一些区别是:

  • 所有操作都是表达式 (不是语句)
  • 这是一个库, 不是语法
  • alts! 是一个函数 (并支持运行时可变数量的操作)
  • alt 中支持优先级

最后, Clojure 存在于宿主平台上, 即我们将这些功能带到现有平台, 而不需要自定义运行时. 另一方面, 我们没有自定义运行时所能拥有的底层支持. 到达现有平台仍然是 Clojure 的核心价值主张.

1.2.10. 和actor模型的关系

我仍然对 actors 不感冒. 它们仍然将生产者与消费者耦合在一起. 是的, 人们可以用 actors 模拟或实现某些类型的队列 (而且, 值得注意的是, 人们经常这样做), 但由于任何 actor 机制已经包含了一个队列, 所以很明显队列更原始.

应该注意的是, Clojure 用于并发使用状态的机制仍然可行, 而 channels 则面向系统的流程方面.

1.2.11. 死锁

请注意, 与其他 Clojure 并发构造不同, channels, 像所有通信一样, 会受到死锁的影响, 最简单的是等待一个永远不会到达的消息, 这必须通过超时等手动处理.

CSP 本身适合某些类型的自动正确性分析. 目前尚未为 core.async 在这方面做任何工作.

另请注意, async channels 并非旨在用于细粒度的计算并行性, 尽管你可能会看到这方面的例子.

1.3. 演进方向

网络 channels 和分发是值得关注的有趣领域. 我们还将进行性能调优和完善 API.

1.4. Team

我要感谢最初帮助 core.async 诞生的团队:

  • Timothy Baldridge
  • Ghadi Shayban
  • Alex Miller
  • Alex Redington
  • Sam Umbach

我希望这些 async channels 能帮助你构建更简单、更健壮的程序.

Rich

2. 参考手册

2.1. 管道

Channels 是携带值并支持多个 writers 和 readers 的队列. Channels 是用 chan 创建的. channel 中的值存储在一个 buffer 中. Buffers 从来不是无界的, 并且提供了几种 buffer 类型:

  • Unbuffered - (chan) - 不使用 buffer, 需要 rendezvous 才能将值从 writer 传递到 reader
  • Fixed size - (chan 10)
  • Dropping - (chan (dropping-buffer 10)) - 固定大小, 满时丢弃最新的值
  • Sliding - (chan (sliding-buffer 10)) - 固定大小, 满时丢弃最旧的值

Channels 是可以像任何其他值一样传递的一等公民值.

Channels 可以选择性地提供一个 transducer 和一个 exception handler. transducer 将应用于通过 channel 的值.

如果提供了 transducer, channel 必须 是缓冲的 (transducers 可以创建必须存储在某处的中间值). Channel transducers 不能阻塞, 无论是通过发出 i/o 操作还是通过外部同步, 否则有阻碍或死锁 go blocks 的风险.

ex-handler 是一个接受一个参数 (一个 Throwable) 的函数. 如果在应用 transducer 时发生异常, 将调用 ex-handler, 任何非 nil 的返回值将被放入 channel.

如果没有提供 ex-handler, 异常将流出并在其发生的地方被处理 (请注意, 这可能在 writer 或 reader 线程中, 取决于操作和 buffer 的状态).

2.1.1. 放(put)和取(take)

任何值都可以放在 channel 上, 除了 nil. channels 上的主要操作是 puttake, 它们有几种变体:

注意: 作为一个助记符, <> 指向值相对于 channel 参数的移动方向. 例如, 在 (>!! chan val) 中, > 指向 channel 内部 (put), 而 (<!! chan) 指向 channel 外部 (take).

用例决定了使用哪种变体. Parking 操作仅在 go blocks 中有效 (详见下文), 并且只能能在 go 的词法作用域内有效. 相反, blocking 操作应仅在 go blocks 之外使用.

异步和非阻塞形式不太常见, 但可以在任一上下文中使用. 使用异步变体来指定一个 channel 和一个在 take 或 put 成功时调用的函数.

take!put! 函数还接受一个可选标志 on-caller?, 以指示 fn 是否可以在当前线程上调用. 非阻塞的 offer!poll! 将要么完成要么立即返回.

Channels 是用 close! 关闭的. 当 channel 关闭时, 不能再添加值, 但 channel 中已有的值可以被取出. 当从关闭的 channel 中取完所有值后, take 操作将返回 nil (这些不是有效值, 而是作为标记).

2.1.2. alts

alts! (parking) 和 alts!! (blocking) 可用于等待一组 channel 操作, 直到其中一个成功. Channel 操作可以是 put (带有一个值) 或 take.

默认情况下, 如果有多个操作变得可用, 它们会以随机顺序被选择, 但设置 :priority true 可以指定一个偏好顺序. 只会发生其中一个操作. 如果没有可用的操作并且指定了 :default val, 则将返回默认值.

由于通常将 alts 与基于所选操作的条件返回结合起来, alt! (parking) 和 alt!! (blocking) 将 alts! 选择与 channel 和值的解构以及结果表达式结合起来.

2.1.3. Promise 管道

Promise channels 是特殊的 channels, 它们只接受单个值. 一旦一个值被放入 promise channel, 所有挂起和未来的消费者将只收到该值. 未来的 puts 会完成但会丢弃值. 当 channel 关闭时, 消费者将永远收到该值 (如果发生过 put) 或 nil (如果未发生 put).

2.2. 管理 processes

2.2.1. go block 和线程

“Processes”, 在最一般的意义上, 要么表示为 go blocks, 要么表示为 threads. Go blocks 模型化了一个可以被“停放”(parked) (暂停) 而不消耗线程的轻量级计算. Go blocks 通过 channels 与外部通信. 任何 core.async parking 操作 (>!, <!, alt!, alts!) 如果不能立即完成, 将导致 block park, 并在操作可以完成时 (当数据到达 channel 以允许它) 自动恢复.

请注意, go blocks 在有限数量的线程上进行多路复用, 并且永远不应被阻塞, 无论是通过使用 core.async blocking 操作 (如 <!!) 还是通过调用可阻塞的 I/O 操作 (如网络调用). 这样做可能会有效地阻塞 go block 池中的所有线程并阻止进一步的进展.

core.async 提供了辅助函数 threadthread-call (类似于 futurefuture-call) 来在单独的线程中异步执行一个 process. 由于这些线程不受限制, 它们适用于阻塞操作, 并且可以通过 channels 与其他 processes 通信. 然而, 请注意, 这些线程并不是特殊的 - 你可以以任何你喜欢的方式创建和管理你自己的线程, 并使用 core.async channels 从这些线程进行通信.

2.2.2. 多线程的 pipeline

pipeline 函数 (及其变体) 旨在将你的工作模型化为一个多线程处理阶段的 pipeline. 各个阶段通过 channels 连接, 每个阶段有 N 个线程执行 transducer xf, 因为值从 from channel 流向 to channel. 变体如下:

  • pipeline - 在 xf 中执行的工作不能阻塞 (专为计算并行性设计). transducer 将独立地应用于每个值, 并行地, 因此有状态的 transducer 函数可能没有用.
  • pipeline-blocking - 在 xf 中执行的工作可能会阻塞, 例如在网络操作上.
  • pipeline-async - 此变体在另一个系统或线程中触发异步工作, 并期望另一个线程将结果放在返回 channel 上.
  • Pipeline ops: pipeline pipeline-blocking pipeline-async

2.3. 使用 channel

2.3.1. channel 上的操作

2.3.2. Channel 连接器

  • Connecting channels: pipe
  • Merging channels: merge
  • Splitting channels: split

2.3.3. Mults

2.3.4. Pub/sub

2.3.5. Mixes

2.4. 配置

2.4.1. go checking

因为 core.async go block 线程池是固定大小的, 所以不应在 go blocks 中执行阻塞 IO 操作. 如果所有 go 线程都阻塞在阻塞操作上, 你可能会遇到死锁或缺乏进展.

一个常见的问题是在 go blocks 内部使用 core.async 阻塞操作. core.async 包含一个调试设施来检测这种情况 (其他类型的阻塞操作无法检测, 因此这只涵盖了问题的一部分). 要启用 go checking, 请设置 Java 系统属性 clojure.core.async.go-checking=true. 此属性在命名空间加载时只读取一次, 应在开发或测试中使用, 而不是在生产中使用.

当 go checking 处于活动状态时, go block 中的无效阻塞调用将在 go block 线程中抛出异常. 默认情况下, 这些异常可能会抛出到 go block 线程的未捕获异常处理程序并被打印出来, 但你可以使用 Thread/setDefaultUncaughtExceptionHandler 来更改默认行为 (或者根据你的系统, 你可能已经有一个将其路由到日志记录的).

3. 流程引导

3.1. 起手

core.async 库通过使用 channels 支持异步编程.

要使用 core.async, 请声明对 Clojure 1.10.0 或更高版本以及最新的 core.async 库的依赖:

{:deps
 {org.clojure/clojure {:mvn/version "1.12.0"}
  org.clojure/core.async {:mvn/version "1.8.741"}}}

要在 REPL 中开始使用 core.async, 请 require clojure.core.async 命名空间:

(require '[clojure.core.async :as a :refer [<!! >!! <! >!]])

或者在你的命名空间中包含它:

(ns my.ns
  (:require [clojure.core.async :as a :refer [<!! >!! <! >!]]))

3.2. 通道

值在类似队列的 channels 上传递. 默认情况下, channels 是无缓冲的 (长度为 0) - 它们要求生产者和消费者会合以通过 channel 传输值.

使用 chan 创建一个无缓冲的 channel:

(a/chan)

传递一个数字来创建一个具有固定缓冲区大小的 channel:

(a/chan 10)

close! 一个 channel 以停止接受 puts. 剩余的值仍然可以被 take. 耗尽的 channels 在 take 时返回 nil. nil 不能显式地通过 channel 发送!

(let [c (a/chan)]
  (a/close! c))

Channels 也可以使用自定义的 buffers, 这些 buffers 对于“满”的情况有不同的策略. API 中提供了两个有用的例子.

;; 使用 `dropping-buffer` 在 buffer 满时丢弃最新的值:
(a/chan (a/dropping-buffer 10))

;; 使用 `sliding-buffer` 在 buffer 满时丢弃最旧的值:
(a/chan (a/sliding-buffer 10))

3.3. 线程

在普通线程中, 我们使用 >!! (阻塞 put) 和 <!! (阻塞 take) 通过 channels 进行通信.

(let [c (a/chan 10)]
  (>!! c "hello")
  (assert (= "hello" (<!! c)))
  (a/close! c))

因为这些是阻塞调用, 如果我们试图向一个无缓冲的 channel put, 我们将阻塞主线程. 我们可以使用 thread (像 future 一样) 在一个池线程中执行一个 body, 并返回一个带有结果的 channel. 这里我们启动一个后台任务向一个 channel put “hello”, 然后在当前线程中读取该值.

(let [c (a/chan)]
  (a/thread (>!! c "hello"))
  (assert (= "hello" (<!! c)))
  (a/close! c))

3.4. Go Block和受控线程

go 宏在其 body 中异步执行, 该 body 在一个特殊的线程池中运行. 会阻塞的 Channel 操作将暂停执行, 而不是阻塞任何线程. 这种机制封装了在事件/回调系统中外部化的控制反转. 在 go blocks 内部, 我们使用 >! (put) 和 <! (take).

这里我们将我们之前的 channel 示例转换为使用 go blocks:

(let [c (a/chan)]
  (a/go (>! c "hello"))
  (assert (= "hello" (<!! (a/go (<! c)))))
  (a/close! c))

我们为生产者使用了一个 go block, 而不是显式的线程和阻塞调用. 消费者使用一个 go block 来 take, 然后返回一个结果 channel, 我们从该 channel 进行阻塞 take.

3.5. Alts

channels 相对于 queues 的一个杀手级特性是能够同时等待多个 channels (就像 socket select 一样). 这是通过 alts!! (普通线程) 或在 go blocks 中使用 alts! 来完成的.

我们可以创建一个带有 alts 的后台线程, 它结合了两个 channels 中任意一个的输入. alts!! 接受一组要执行的操作 - 要么是一个要 take 的 channel, 要么是一个要 put 的 channel 值, 并返回成功的值 (put 时为 nil) 和 channel:

(let [c1 (a/chan)
      c2 (a/chan)]
  (a/thread (while true
              (let [[v ch] (a/alts!! [c1 c2])]
                (println "Read" v "from" ch))))
  (>!! c1 "hi")
  (>!! c2 "there"))

打印 (在 stdout 上, 可能在你的 repl 中不可见):

Read hi from #object[clojure.core.async.impl.channels.ManyToManyChannel ...]
Read there from #object[clojure.core.async.impl.channels.ManyToManyChannel ...]

我们可以使用 alts! 来用 go blocks 做同样的事情:

(let [c1 (a/chan)
      c2 (a/chan)]
  (a/go (while true
          (let [[v ch] (a/alts! [c1 c2])]
            (println "Read" v "from" ch))))
  (a/go (>! c1 "hi"))
  (a/go (>! c2 "there")))

因为 go blocks 是不绑定到线程的轻量级 processes, 我们可以有很多! 这里我们创建 1000 个 go blocks, 它们在 1000 个 channels 上说 hi. 我们使用 alts!! 来在它们准备好时读取它们.

(let [n 1000
      cs (repeatedly n a/chan)
      begin (System/currentTimeMillis)]
  (doseq [c cs] (a/go (>! c "hi")))
  (dotimes [i n]
    (let [[v c] (a/alts!! cs)]
      (assert (= "hi" v))))
  (println "Read" n "msgs in" (- (System/currentTimeMillis) begin) "ms"))

timeout 创建一个等待指定毫秒数然后关闭的 channel:

(let [t (a/timeout 100)
      begin (System/currentTimeMillis)]
  (<!! t)
  (println "Waited" (- (System/currentTimeMillis) begin)))

我们可以将 timeout 与 alts! 结合起来做定时的 channel 等待. 这里我们等待 100 毫秒, 等待一个值到达 channel, 然后放弃:

(let [c (a/chan)
      begin (System/currentTimeMillis)]
  (a/alts!! [c (a/timeout 100)])
  (println "Gave up after" (- (System/currentTimeMillis) begin)))

4. 流(Flow)

4.1. 动机

core.asyncrationale 中提到 “在所有优秀的程序中, 总有那么一个时刻, 组件或子系统必须停止彼此直接通信.” 而 core.async 提供了实现这一点的基本工具 (channels).

但是, 要很好地使用 core.async, 例如将 I/O 与计算逻辑分开, 需要纪律和架构上的智慧, 并且要在整个应用程序或组织中保持一致性, 还需要约定. 有了 channels, 许多关于线程执行、背压、错误处理等方面的架构决策仍然存在. 而且, 你的通信 processes 网络的拓扑结构往往是在程序的控制流中 涌现 出来的, 因为各种代码片段会创建线程并将 channels 连接在一起, 与计算交织在一起, 使得难以看清拓扑结构或在一个地方对其进行管理.

core.async.flow 的基本目标是实现应用程序逻辑与其拓扑、执行、通信、生命周期、监控和错误处理的严格分离, 所有这些都由 c.a.flow 提供并集中化, 从而产生更一致、更健壮、可测试、可观察和可操作的系统.

4.2. 概览

core.async.flow 提供了另外两个抽象的 具体 实现——'process' (一个活动线程) 和 'flow' (一个通过 channels 通信的 processes 有向图). 单个数据结构描述了你的 flow 拓扑, 并包含了线程、channels 等的所有设置. 一个 process 从 channels 接受数据并向 channels 提供数据. c.a.flow 中的 process 实现处理所有 channel I/O、线程生命周期以及与 flow 图的协调.

在你的应用程序中, 你需要做的就是:

  1. 定义普通的、通常是纯粹的 data->data 函数, processes 将在其内部循环中运行这些函数来执行处理消息的 计算 部分 (也称为 'step' 函数). 这些函数不处理 channels 或线程或生命周期, 甚至不知道它们在 flow 中运行. 它们可以被独立测试, 并且可以热重载. 如果它们遇到问题, 它们可以而且应该直接抛出异常. process 会从那里处理它.
  2. 通过创建一个数据结构来定义一个 flow, 该数据结构枚举了 processes 以及它们输入和输出之间的连接, 以及两者的各种配置设置.

flow-concerns.png?raw=true

Figure 1: core.async.flow concerns

有了这些应用程序输入, c.a.flow 会完成剩下的工作. 它会询问 processes 需要哪些 channels, 创建这些 channels, 然后实例化 processes, 在它们之间建立所有的 channel 连接. processes 进而启动线程 (在完全用户可配置的线程池中), 等待输入, 监控管理控制 channel, 当输入到达时, 对你的应用程序逻辑进行 data->data 调用, 将其返回值发送到指定的输出 channels. processes 遵循一个协议, flow 用它来做生命周期管理和错误处理.

一旦你创建了一个 flow, API 提供了启动/停止(关闭) flow 的函数, 以及暂停/恢复 flow 和单个 processes 的函数, ping processes 以获取其状态及其连接的 channels 的状态, 向图中的任何点注入数据等. flow 提供了包含普通监控/报告流和独立的错误流的 channels.

该库提供了更多细节和功能, 包括通过普通函数创建充当 sources (来自 flow 外部的数据) 或 sinks (到 flow 外部的接收者) 的 processes 的能力, 这样你就可以将你的 flow 置于更广泛的上下文中, 同时仍然与 flow 生命周期协调资源管理.

我希望 core.async.flow 能让你编写更健壮、更小型的应用程序, 更多地关注你的领域逻辑, 而不是管道工程.

Rich Hickey 1/2025

5. Flow指南

5.1. 介绍

flow 库实现了应用程序逻辑与拓扑、执行、通信、生命周期、监控和错误处理等部署问题的严格分离.

5.2. Step fns 和 process launchers

你以 step-fns 的形式向 flow 提供逻辑, 这些 step-fns 被包装成运行中的 processes, 在一个循环中执行. Flow 管理 process 的生命周期, 并通过在 channels 上放置或获取消息来处理传入和传出的消息. Step-fns 不直接访问 channels 或持有状态, 这使得它们易于独立测试和重用.

Step functions 有四个 arities:

step-fn-arities.png?raw=true

Figure 2: step-fn arities

5.2.1. describe: (step-fn) -> descriptor

describe arity 必须返回一个 step-fn 的 :params, :ins, 和 :outs 的静态描述. 其中每一个都是一个从名称 (一个关键字) 到文档字符串的 map.

例如, describe arity 可能会为一个简单的 step-fn 返回以下描述:

{:params {:size "Max size"}       ;; step-fn params
 :ins {:in "Input channel"}       ;; input channels
 :outs {:out "Output channel"}}   ;; output channels

用于输入和输出 channels 的名称应该是不同的 (没有重叠).

5.2.2. init: (step-fn arg-map) -> init-state

init arity 被 process 调用一次, 它从 flow def 中获取一组参数 (对应于 describe arity 返回的 params), 并返回 process 的初始状态.

5.2.3. transition: (step-fn state transition) -> state'

每当 flow 或 process 经历生命周期转换 (::flow/start, ::flow/stop, ::flow/pause, ::flow/resume) 时, 都会调用 transition arity. transition arity 接受当前状态并返回一个更新后的状态, 用于后续调用.

step-fn 应使用 transition arity 来协调 process 中外部资源的创建、暂停和关闭.

5.2.4. transform: (step-fn state input msg) -> [state' {out-id [msgs]}]

transform arity 在 process 的循环中为每个在输入 channel 上接收到的消息调用, 并返回一个新的状态和一个从输出 cid 到要返回的消息的 map. process 将负责将这些消息发送到输出 channels. 输出可以发送到枚举的 :outs 中的零个、任意个或所有个, 和/或由一个 [pid inid] 元组命名的输入 (例如用于 reply-to), 和/或到 ::flow/report 输出. 一个 step 不必输出任何东西 (output 或 msgs 可以是空/nil), 然而, 一个输出 消息 永远不能是 nil (根据 core.async channels).

step-fn 可能会从任何 arity 抛出异常, 它们将由 flow 处理. 如果从 transition 或 transform arities 抛出异常, 异常将被记录在 flow 的 :error-chan 上.

5.2.5. Process 状态

process state 是一个 map. 它可以包含 step-fn transition 和 transform arities 所需的任何键. 此外, 还有一些 flow 特定的键, 在这里描述.

::flow/pid 是由 process 根据 flow def 中提供的名称添加到 state 中的.

::flow/in-ports::flow/out-ports 是从 cid 到外部 channel 的 map, 可选地在 init arity 的初始状态中返回. in-ports 和 out-ports 用于将 source 和 sink processes 连接到外部 channels. 这些 channels 必须由 step-fn 提供并在 init arity map 中返回, 可以通过创建 channel 或使用通过 flow def init args 传递给 process 的 channel 来实现. flow 不管理这些 channels 的生命周期.

::flow/input-filter, 一个 cid 的谓词, 可以在任何 arity 的 state 中返回, 以指示对 process 输入 channel 读取集的过滤. 例如, 一个正在等待来自多个输入的响应的 step-fn 可能会从读取集中移除已经响应的 channels, 直到收到所有响应.

5.2.6. step-fn 帮助

存在一些额外的辅助函数来从其他形式创建 step-fns:

  • lift*->step - 给定一个接受一个参数并返回一个非 nil 值集合的函数 f, 创建一个 process launcher 所需的 step-fn, 它有一个输入和一个输出 (名为 :in 和 :out), 并且没有状态.
  • lift1->step - 类似于 lift*->step, 但用于返回单个值的函数 (当 nil 时, 不产生输出).
  • map->step - 给定一个键为 :describe, :init, :transition, :transform 的 map, 对应于上面的 arities, 创建一个 step-fn.

5.2.7. 创建process launcher

可以使用 process 函数创建 Process launchers, 该函数接受一个 step-fn 和一个包含以下键的选项 map:

  • ::workload - :mixed, :io:compute 之一
  • :compute-timeout-ms - 如果 :workload 是 :compute, 此超时 (默认为 5000 毫秒) 将用于从 future 获取返回值 - 见下文

作为选项提供给 process 的 :workload 将覆盖 process launcher 的 :describe fn 返回的任何 :workload. 如果两者都未提供, 则默认为 :mixed.

在 :mixed 或 :io 的 :workload 上下文中, 这决定了 process 循环将在其中运行的线程类型, 包括其对 transform 的调用.

当指定 :io 时, transform 不应进行大量的计算.

当指定 :compute 时, 每次调用 transform 将在一个单独的线程中运行. process 循环将在一个 :io 上下文中运行 (因为它不再直接调用 transform, 它所做的只是 I/O), 它将把 transform 提交给 :compute executor, 然后等待 (阻塞, 等待 compute-timeout-ms) executor 返回的 future 完成. 如果 future 超时, 它将在 ::flow/error 上报告.

当指定 :compute 时, transform 绝不能阻塞!

请注意, process launchers 是由 ProcLauncher 协议定义的. 虽然你通常会使用 process 来创建 process launcher, 但高级用法也可能直接实现该协议.

5.2.8. 重载step-fn

因为 step-fn 是在循环中调用的, 所以一个好的做法是在一个 var 中定义 step-fn, 并使用 var (#'the-fn) 而不是函数值本身 (the-fn). 这种做法通过允许在 flow 运行时从 repl 重新绑定 var 来支持交互式开发.

5.3. Flow def

step-fns 是你为 flow 中每个 process 提供代码的方式. 你必须提供的另一件事是 flow 配置, 它将 proc launchers 和它们之间的连接联系在一起.

这个 flow 定义被提供给 create-flow 函数, 它由一个包含 :procs, :conns 的 map 组成, 并且可选地包含一些工作流 executors.

:procs 是一个 pid -> proc-def 的 map. proc-def 是一个包含 :proc (process launcher), :args (传递给 step-fn 的 init arity) 和 :chan-opts (可用于指定 channel 属性) 的 map.

:conns 是一个 [[from-pid outid] [to-pid inid]] 元组的集合. 输入和输出支持多个连接. 当一个输出被多次连接时, 每个连接都会收到每条消息, 这与 core.async/mult 的行为一致.

一个示例 flow 定义可能如下所示, 用于一个具有两个 procs 的 flow,其中 in-chan 和 out-chan 通过 source 和 sink args 传递:

{:procs {:source-proc {:proc (process #'source-fn)
                       :args {:source-chan in-chan}}
         :sink-proc   {:proc (process #'sink-fn)
                       :args {:sink-chan out-chan}}}
 :conns [ [[:source-proc :out] [:sink-proc :in]] ]}

通过将 flow 定义传递给 create-flow 来创建 flow.

返回的 flow 对象可以传递给生命周期方法 (见下文). 此外, flow 还可以与 datafy 一起使用, 以获取 flow 的数据化描述. 这是一个静态视图 - 动态视图请参见后面描述的 ping.

5.4. Flow lifecycle

当一个 flow 被创建时, 它以 resumed 状态启动. 可以使用以下 flow 函数来更改 flow 生命周期状态:

  • start - 启动 flow 中的所有 procs, 返回一个包含 :report-chan:error-chan 的 map
  • stop - 停止 flow 中的所有 procs
  • pause - 暂停 flow 中的所有 procs
  • resume - 恢复 flow 中的所有 procs
  • pause-proc - 暂停单个 proc
  • resume-proc - 恢复单个 proc

你还可以使用这些函数来 ping 正在运行的 processes 并返回它们当前的状态和 status:

  • ping - Pings 所有 procs 并返回其 status 的 map
  • ping-proc - 按 pid ping 单个 proc 并返回 status 的 map

此函数可用于向任意 [pid cid] channel 注入消息:

  • inject - 向 flow 中的任何 coord 注入消息

start 返回的 map 具有 flow 的 report 和 error channels. Procs 可以向 :report-chan 输出消息, 以实现整个 flow 的统一日志记录. 由 step-fn 或 flow 中的 procs 抛出的异常都记录到 :error-chan.

5.5. Flow monitor

有关如何使用 flow-monitor 工具, 请参阅 core.async.flow-monitor.

6. clojure.core.async

用于异步编程和通信的设施.

将 Java 系统属性 clojure.core.async.go-checking 设置为 true 可以验证 go block 是否调用了 core.async 的阻塞操作. 该属性在命名空间加载时只读取一次. 建议主要在开发期间使用. 无效的阻塞调用将在 go block 线程中抛出异常 - 使用 Thread.setDefaultUncaughtExceptionHandler() 来捕获和处理.

使用 Java 系统属性 clojure.core.async.executor-factory 来指定一个函数, 该函数将为 core.async 在整个应用程序范围内 提供 ExecutorServices, 以代替其默认设置. 属性值应命名一个 完全限定的 var. 该函数将被传递一个关键字, 指示 executor 的使用上下文, 并应返回一个 ExecutorService, 或 nil 以使用默认值. 每个关键字的结果将被缓存并在 应用程序的其余部分使用. 可能的上下文参数有:

:io - 用于 async/io-thread, flow/process 中的 :io 工作负载, 以及如果没有提供显式分派处理程序时的分派处理 (见下文)

:mixed - 用于 async/thread 和 flow/process 中的 :mixed 工作负载

:compute - 用于 flow/process 中的 :compute 工作负载

:core-async-dispatch - 用于整个 core.async 中的完成函数处理 (例如在 put! 和 take! 中, 以及 go block IOC thunk 处理). 如果未提供, 将使用 :io 的 ExecutorService.

未来上下文集合可能会增长, 因此函数应 对意外的上下文返回 nil.

使用 Java 系统属性 clojure.core.async.vthreads 来控制 core.async 如何使用 JDK 21+ 的虚拟线程. 该属性可以是 以下值之一:

unset - core.async 将在可用时 (≥ Java 21) 机会性地使用 vthreads, 否则将使用旧的 IOC 实现. 如果 vthreads 不可用, io-thread 和 :io 线程池将在平台线程上运行. 如果进行 AOT 编译, go blocks 将始终使用 IOC, 以便 生成的字节码在所有 JVM 上都能工作 (因此编译输出没有变化)

"target" - 意味着你正在以虚拟线程为目标. 在运行时 从源代码, 如果 vthreads 不可用, go blocks 将会抛出异常. 如果进行 AOT 编译, go blocks 总是被编译为在 vthreads 上运行的 普通 Clojure 代码, 如果 vthreads 不可用 (Java < 21), 在运行时将会抛出异常.

"avoid" - 意味着 vthreads 将不会被 core.async 使用 - 你可以 用这个来最小化影响, 如果你还没有准备好在你的应用中利用 vthreads. 如果进行 AOT 编译, go blocks 将使用 IOC. 在运行时, io-thread 和 :io 线程池使用平台线程.

注意: 来自旧版 core.async 的现有 IOC 编译的 go blocks 仍然可以工作 (我们保留并加载 IOC 状态机运行时 - 这不需要分析器), 你可以从 IOC 和 vthread 代码与相同的 channels 交互.

6.1. <!

(<! port)

从 port 中取一个值. 必须在 (go …) block 内部调用, 或在 一个虚拟线程上调用 (无论它是如何启动的). 如果已关闭, 将返回 nil. 如果没有可用的值, 将会 park.

6.2. <!!

(<!! port)

从 port 中取一个值. 如果已关闭, 将返回 nil. 如果没有可用的值, 将会阻塞. 不应在 (go …) block 的直接或间接调用中使用. 使用 clojure.core.async.go-checking 标志来检测无效使用 (见 命名空间文档).

6.3. >!

(>! port val)

将一个值放入 port. 不允许 nil 值. 必须在 (go …) block 内部调用, 或在一个虚拟线程上调用 (无论它是如何 启动的). 如果没有可用的缓冲区空间, 将会 park. 除非 port 已经关闭, 否则返回 true.

6.4. >!!

(>!! port val)

将一个值放入 port. 不允许 nil 值. 如果没有 可用的缓冲区空间, 将会阻塞. 除非 port 已经关闭, 否则返回 true. 不应在 (go …) block 的直接或间接调用中使用. 使用 clojure.core.async.go-checking 标志来检测无效使用 (见 命名空间文档).

6.5. admix

(admix mix ch)

将 ch 添加为 mix 的一个输入.

6.6. alt! (macro)

(alt! & clauses)

在多个 channel 操作之间做出单一选择, 就像通过 alts! 一样, 返回与 完成的操作相对应的结果表达式的值. 必须在 (go …) block 内调用.

每个子句的形式为:

channel-op[s] result-expr

其中 channel-ops 是以下之一:

take-port - 要从中 take 的单个 port [take-port | [put-port put-val] …] - 一个如 alts! 中所定义的 ports 向量 :default | :priority - alts! 的一个选项

而 result-expr 是一个以向量开头的列表, 此时该 向量将被视为操作返回的 [val port] 的绑定, 否则为任何其他表达式.

(alt!
  [c t] ([val ch] (foo ch val))
  x ([v] v)
  [[out val]] :wrote
  :default 42)

每个选项最多出现一次. 选择和 parking 特性与 alts! 相同.

6.7. alt!! (macro)

(alt!! & clauses)

与 alt! 类似, 但就像通过 alts!! 一样, 将会阻塞直到完成, 并且 不应在 (go …) block 中使用.

6.8. alts!

(alts! ports & {:as opts})

最多完成多个 channel 操作中的一个. 必须在 (go …) block 内部调用, 或在一个虚拟线程上调用 (无论它是如何 启动的). ports 是一个 channel 端点的向量, 可以是 要从中 take 的 channel, 也可以是 [channel-to-put-to val-to-put] 的向量, 可以任意组合. take 将像通过 <! 一样进行, put 将像 通过 >! 一样进行. 除非 :priority 选项为 true, 否则如果多个 port 操作已准备就绪, 将进行非确定性选择. 如果没有 操作准备就绪且提供了 :default 值, 将返回 [default-val :default], 否则 alts! 将 park 直到第一个准备就绪的操作 完成. 返回已完成操作的 [val port], 其中 val 是 take 操作取出的值, 对于 put 操作则是布尔值 (true, 除非已关闭, 与 put! 一样).

opts 以 :key val … 的形式传递. 支持的选项:

:default val - 如果没有操作立即准备就绪, 则使用的值 :priority true - (默认为 nil) 当为 true 时, 操作将按顺序尝试.

注意: 不保证 port exps 或 val exprs 会被 使用, 也不保证它们的顺序, 所以不应 依赖它们的副作用.

6.9. alts!!

(alts!! ports & opts)

与 alts! 类似, 但 take 将像通过 <!! 一样进行, put 将 像通过 >!! 一样进行, 将会阻塞直到完成. 不应在 (go …) block 的直接或间接调用中使用. 使用 clojure.core.async.go-checking 标志来检测无效使用 (见 命名空间文档).

6.10. buffer

(buffer n)

返回一个大小为 n 的固定缓冲区. 当满时, puts 将会阻塞/park.

6.11. chan

(chan)
(chan buf-or-n)
(chan buf-or-n xform)
(chan buf-or-n xform ex-handler)

创建一个 channel, 可选地带有一个缓冲区, 一个可选的 transducer (如 (map f), (filter p) 等或其组合), 以及一个 可选的异常处理器. 如果 buf-or-n 是一个数字, 将创建 并使用一个该大小的固定缓冲区. 如果提供了 transducer, 则 必须指定一个缓冲区. ex-handler 必须是一个接受一个参数的函数 - 如果在转换过程中发生异常, 它将被调用, 参数为 Throwable, 任何非 nil 的返回值将被 放入 channel 中. Channels 实现了 Datafiable; 使用 datafy 可以获取 channel 及其缓冲区的状态.

6.12. close!

(close! chan)

关闭一个 channel. 该 channel 将不再接受任何 puts (它们 将被忽略). channel 中的数据仍然可以被 take, 直到 取尽, 之后 takes 将返回 nil. 如果有任何 挂起的 takes, 它们将被分派并得到 nil. 关闭一个已关闭的 channel 是一个无操作. 返回 nil.

逻辑上, 关闭发生在所有 puts 都被交付之后. 因此, 任何 被阻塞或 parked 的 puts 将保持阻塞/parked, 直到有 taker 释放它们.

6.13. defblockingop (macro)

(defblockingop op doc arglist & body)

6.14. defparkingop (macro)

(defparkingop op doc arglist)

生成一个 Var, 其函数检查是否在虚拟线程中运行. 如果是, 则 将调用相关的阻塞操作, 否则函数将抛出异常.

6.15. do-alt

(do-alt alts clauses)

6.16. do-alts

(do-alts fret ports opts)

如果立即完成, 返回可解引用的 [val port], 如果进入队列则返回 nil.

6.17. dropping-buffer

(dropping-buffer n)

返回一个大小为 n 的缓冲区. 当满时, puts 将完成但 值将被丢弃 (不传输).

6.18. fn-handler

(fn-handler f)
(fn-handler f blockable)

6.19. go (macro)

(go & body)

异步执行 body, 立即返回到 调用线程. 此外, body 中任何可见的 <!, >! 和 alt!/alts! channel 操作将在 (必要时) 通过 'parking' 调用线程来阻塞, 而不是占用一个操作系统线程 (或 在 ClojureScript 中的唯一 JS 线程). 操作完成后, body 将被恢复.

go blocks 不应 (直接或间接) 执行 可能无限期阻塞的操作. 这样做有耗尽 固定大小的 go block 线程池的风险, 导致所有 go block 处理停止. 这包括 core.async 阻塞操作 (以 !! 结尾的) 和其他阻塞 IO.

返回一个 channel, 该 channel 将在 body 完成时接收其结果.

6.20. go-loop (macro)

(go-loop bindings & body)

类似于 (go (loop …)).

6.21. into

(into coll ch)

返回一个 channel, 其中包含从 channel 中取出的 所有项 conjoin 到所提供集合的单一 (集合) 结果. ch 必须在 into 产生结果之前关闭.

6.22. io-thread (macro)

(io-thread & body)

在一个线程中执行 body, 立即返回到调用线程. body 可以进行阻塞 I/O, 但不能进行长时间的计算. 返回一个 channel, 该 channel 将在 body 完成时接收其结果, 然后关闭.

6.23. ioc-alts!

(ioc-alts! state cont-block ports & {:as opts})

6.24. map

(map f chs)
(map f chs buf-or-n)

接受一个函数和一组源 channels, 并返回一个 channel, 其中包含将 f 应用于从每个源 channel 中取出的第一组项所产生的值, 接着是将 f 应用于每个 channel 的第二组项, 依此类推, 直到任何一个 channels 关闭, 此时输出 channel 将被 关闭. 返回的 channel 默认是无缓冲的, 或者 可以提供一个 buf-or-n.

6.25. merge

(merge chs)
(merge chs buf-or-n)

接受一组源 channels, 并返回一个 channel, 其中 包含从它们中取出的所有值. 返回的 channel 将 默认是无缓冲的, 或者可以提供一个 buf-or-n. 该 channel 将在所有源 channels 关闭后关闭.

6.26. Mix (protocol)

  1. members

6.26.1. admix*

(admix* m ch)

6.26.2. solo-mode*

(solo-mode* m mode)

6.26.3. toggle*

(toggle* m state-map)

6.26.4. unmix*

(unmix* m ch)

6.26.5. unmix-all*

(unmix-all* m)

6.27. mix

(mix out)

创建并返回一个或多个输入 channels 的 mix, 这些 channels 的内容将被 放入提供的 out channel. 输入源可以通过 'admix' 添加到 mix 中, 并通过 'unmix' 移除. mix 支持 使用 'toggle' 原子地对多个输入进行 soloing, muting 和 pausing, 并且可以使用 'solo-mode' 决定是使用 muting 还是 pausing 来进行 solo.

每个 channel 可以通过 'toggle' 设置零个或多个布尔模式:

:solo - 当为 true 时, 只有这个 (以及其他 soloed 的) channel(s) 会出现 在 mix 输出 channel 中. soloed channels 的 :mute 和 :pause 状态 被忽略. 如果 solo-mode 是 :mute, 非 soloed channels 被 muted; 如果是 :pause, 非 soloed channels 被 paused.

:mute - muted channels 的内容将被消费, 但不包括在 mix 中 :pause - paused channels 的内容将不被消费 (因此也不包括在 mix 中)

6.28. Mult (protocol)

  1. members

6.28.1. tap*

(tap* m ch close?)

6.28.2. untap*

(untap* m ch)

6.28.3. untap-all*

(untap-all* m)

6.29. mult

(mult ch)

创建并返回所提供 channel 的 mult(iple). 包含 channel 副本的 channels 可以用 'tap' 创建, 并用 'untap' 分离.

每个项目都并行且同步地分发给所有 taps, 即每个 tap 必须接受后才能分发下一个项目. 使用 缓冲/窗口化来防止慢的 taps 拖慢 mult.

当没有 taps 时收到的项目将被丢弃.

如果一个 tap 向一个已关闭的 channel put, 它将被从 mult 中移除.

6.30. Mux (protocol)

  1. members

6.30.1. muxch*

(muxch* _)

6.31. offer!

(offer! port val)

如果可以立即将一个值放入 port, 则放入. 不允许 nil 值. 从不阻塞. 如果 offer 成功, 返回 true.

6.32. onto-chan (deprecated in 1.2)

(onto-chan ch coll)
(onto-chan ch coll close?)

已弃用 - 使用 onto-chan! 或 onto-chan!!

6.33. onto-chan!

(onto-chan! ch coll)
(onto-chan! ch coll close?)

将 coll 的内容放入提供的 channel.

默认情况下, 复制完项目后 channel 将被关闭, 但这可以由 close? 参数决定.

返回一个在项目复制完后会关闭的 channel.

如果访问 coll 可能会阻塞, 请改用 onto-chan!!.

6.34. onto-chan!!

(onto-chan!! ch coll)
(onto-chan!! ch coll close?)

与 onto-chan! 类似, 用于访问 coll 可能会阻塞的情况, 例如一个由阻塞操作组成的 lazy seq.

6.35. pipe

(pipe from to)
(pipe from to close?)

从 from channel 中取出元素并提供给 to channel. 默认情况下, 当 from channel 关闭时, to channel 也会关闭, 但这可以由 close? 参数决定. 如果 to channel 关闭, 将停止 消费 from channel.

6.36. pipeline

(pipeline n to xf from)
(pipeline n to xf from close?)
(pipeline n to xf from close? ex-handler)

从 from channel 中取出元素并提供给 to channel, 经过 transducer xf 的处理, 并行度为 n. 因为 是并行的, transducer 将独立地应用于每个 元素, 而不是跨元素应用, 并且每个输入可能产生零个或多个输出. 输出将相对于输入按顺序返回. 默认情况下, 当 from channel 关闭时, to channel 也会关闭, 但这可以由 close? 参数决定. 如果 to channel 关闭, 将停止消费 from channel. 注意, 这应该 用于计算并行. 如果你有多个阻塞操作要 并行执行, 请改用 pipeline-blocking. 如果你有多个异步操作要并行执行, 请改用 pipeline-async. 关于 ex-handler 的语义, 请参见 chan.

6.37. pipeline-async

(pipeline-async n to af from)
(pipeline-async n to af from close?)

从 from channel 中取出元素并提供给 to channel, 经过异步函数 af 的处理, 并行度为 n. af 必须是一个接受两个参数的函数, 第一个是一个输入值, 第二个是一个用于放置结果的 channel. 假设 af 会立即返回, 已经启动了一些 异步操作, 其完成/回调会将结果放入 channel, 然后关闭它. 输出将相对于输入按顺序 返回. 默认情况下, 当 from channel 关闭时, to channel 也会关闭, 但这可以由 close? 参数决定. 如果 to channel 关闭, 将停止消费 from channel. 另见 pipeline, pipeline-blocking.

6.38. pipeline-blocking

(pipeline-blocking n to xf from)
(pipeline-blocking n to xf from close?)
(pipeline-blocking n to xf from close? ex-handler)

与 pipeline 类似, 用于阻塞操作.

6.39. poll!

(poll! port)

如果可以立即从 port 中取出一个值, 则取出. 从不阻塞. 如果成功, 返回值, 否则返回 nil.

6.40. promise-chan

(promise-chan)
(promise-chan xform)
(promise-chan xform ex-handler)

创建一个 promise channel, 可选地带有一个 transducer 和一个可选的 异常处理器. promise channel 只能接收一个值, 消费者将收到这个值. 一旦满了, puts 会完成但值会被丢弃 (不传输). 消费者将阻塞直到 channel 中放入一个值或 channel 关闭, 然后永远返回该值 (或 nil). 关于 xform 和 ex-handler 的语义, 请参见 chan.

6.41. Pub (protocol)

  1. members

6.41.1. sub*

(sub* p v ch close?)

6.41.2. unsub*

(unsub* p v ch)

6.41.3. unsub-all*

(unsub-all* p)
(unsub-all* p v)

6.42. pub

(pub ch topic-fn)
(pub ch topic-fn buf-fn)

创建并返回所提供 channel 的 pub(lication), 通过 topic-fn 将其划分为 topics. topic-fn 将应用于 channel 上的每个值, 结果将决定该值 将被放入的 'topic'. channels 可以通过 'sub' 订阅 以接收 topics 的副本, 并通过 'unsub' 取消订阅. 每个 topic 将由一个内部 mult 在一个专用 channel 上处理. 默认情况下, 这些内部 channels 是无缓冲的, 但可以提供一个 buf-fn, 该函数给定一个 topic, 会创建一个具有所需属性的缓冲区.

每个项目都并行且同步地分发给所有 subs, 即每个 sub 必须接受后才能分发下一个项目. 使用 缓冲/窗口化来防止慢的 subs 拖慢 pub.

当没有匹配的 subs 时收到的项目将被丢弃.

请注意, 如果使用 buf-fns, 则每个 topic 都被异步处理, 即如果一个 channel 订阅了多个 topic, 它不应期望它们的交错方式与源完全相同.

6.43. put!

(put! port val)
(put! port val fn1)
(put! port val fn1 on-caller?)

异步地将一个值放入 port, 完成时调用 fn1 (如果提供), 如果 port 已经关闭, 则传递 false. 不允许 nil 值. 如果 on-caller? (默认为 true) 为 true, 并且 put 被立即接受, 将在调用线程上调用 fn1.

fn1 可能在一个固定大小的分派线程池中运行, 不应 执行阻塞 IO, 包括 core.async 阻塞操作 (以 !! 结尾的).

除非 port 已经关闭, 否则返回 true.

6.44. reduce

(reduce f init ch)

f 应该是一个接受 2 个参数的函数. 返回一个 channel, 其中包含 将 f 应用于 init 和 channel 的第一个项, 然后将 f 应用于 该结果和第二个项, 依此类推的单一结果. 如果 channel 在没有产生项的情况下关闭, 则返回 init, 且 f 不被调用. ch 必须在 reduce 产生结果之前关闭.

6.45. sliding-buffer

(sliding-buffer n)

返回一个大小为 n 的缓冲区. 当满时, puts 将完成, 并且 被缓冲, 但缓冲区中最旧的元素将被丢弃 (不 传输).

6.46. solo-mode

(solo-mode mix mode)

设置 mix 的 solo 模式. mode 必须是 :mute 或 :pause 之一.

6.47. split

(split p ch)
(split p ch t-buf-or-n f-buf-or-n)

接受一个谓词和一个源 channel, 并返回一个包含两个 channels 的向量, 第一个 channel 将包含谓词 返回 true 的值, 第二个 channel 将包含谓词返回 false 的值.

默认情况下, 输出 channels 是无缓冲的, 或者可以 提供两个 buf-or-ns. 这些 channels 将在源 channel 关闭后关闭.

6.48. sub

(sub p topic ch)
(sub p topic ch close?)

将一个 channel 订阅到一个 pub 的一个 topic.

默认情况下, 当源关闭时, channel 也会关闭, 但这可以由 close? 参数决定.

6.49. take

(take n ch)
(take n ch buf-or-n)

返回一个 channel, 该 channel 最多将从 ch 返回 n 个项. 在 n 个项 被返回后, 或者 ch 被关闭后, 返回的 channel 将关闭.

默认情况下, 输出 channel 是无缓冲的, 除非给出 buf-or-n.

6.50. take!

(take! port fn1)
(take! port fn1 on-caller?)

异步地从 port 中取一个值, 传递给 fn1. 如果已关闭, 将传递 nil. 如果 on-caller? (默认为 true) 为 true, 并且值 立即可用, 将在调用线程上调用 fn1.

fn1 可能在一个固定大小的分派线程池中运行, 不应 执行阻塞 IO, 包括 core.async 阻塞操作 (以 !! 结尾的).

返回 nil.

6.51. tap

(tap mult ch)
(tap mult ch close?)

将 mult 源复制到提供的 channel 上.

默认情况下, 当源关闭时, channel 也会关闭, 但这可以由 close? 参数决定.

6.52. thread (macro)

(thread & body)

在另一个线程中执行 body, 立即返回到 调用线程. 返回一个 channel, 该 channel 将在 body 完成时接收其结果, 然后关闭.

6.53. thread-call

(thread-call f)
(thread-call f workload)

在另一个线程中执行 f, 立即返回到调用线程. 返回一个 channel, 该 channel 将在 f 调用完成时 接收其结果, 然后关闭. workload 是一个描述 f 所执行工作的关键字, 其中:

  • :io 可以进行阻塞 I/O, 但不能进行长时间的计算
  • :compute 绝不能阻塞
  • :mixed 其他任何情况 (默认)

当未提供 workload 时, 默认为 :mixed.

6.54. timeout

(timeout msecs)

返回一个在 msecs 后将关闭的 channel.

6.55. to-chan (deprecated in 1.2)

(to-chan coll)

已弃用 - 使用 to-chan! 或 to-chan!!.

6.56. to-chan!

(to-chan! coll)

创建并返回一个 channel, 其中包含 coll 的内容, 当耗尽时关闭.

如果访问 coll 可能会阻塞, 请改用 to-chan!!.

6.57. to-chan!!

(to-chan!! coll)

与 to-chan! 类似, 用于访问 coll 可能会阻塞的情况, 例如一个由阻塞操作组成的 lazy seq.

6.58. toggle

(toggle mix state-map)

原子地设置 mix 中一个或多个 channels 的状态. state map 是一个 channels -> channel-state-map 的 map. 一个 channel-state-map 是一个 attrs -> boolean 的 map, 其中 attr 是 :mute, :pause 或 :solo 中的一个或多个. 任何提供的状态都与 当前状态合并.

注意, channels 可以通过 toggle 添加到 mix 中, 这可以 用于以特定状态 (例如 paused) 添加 channels.

6.59. transduce

(transduce xform f init ch)

使用变换 (xform f) 对 channel 进行 async/reduce. 返回一个包含结果的 channel. ch 必须在 transduce 产生结果之前关闭.

6.60. unblocking-buffer?

(unblocking-buffer? buff)

如果使用 buff 创建的 channel 永远不会阻塞, 则返回 true. 也就是说, 向此缓冲区 put 永远不会导致缓冲区已满.

6.61. unmix

(unmix mix ch)

将 ch 从 mix 的输入中移除.

6.62. unmix-all

(unmix-all mix)

从 mix 中移除所有输入.

6.63. unsub

(unsub p topic ch)

从 pub 的一个 topic 中取消订阅一个 channel.

6.64. unsub-all

(unsub-all p)
(unsub-all p topic)

从 pub 或 pub 的一个 topic 中取消订阅所有 channels.

6.65. untap

(untap mult ch)

将目标 channel 从 mult 中断开连接.

6.66. untap-all

(untap-all mult)

将所有目标 channels 从 mult 中断开连接.

7. clojure.core.async.flow

注意 - Alpha 版本, 仍在开发中, 名称和其他细节可能会有变动

一个用于构建并发、事件驱动的数据处理流的库, 它使用无通信的函数, 同时集中化控制、报告、执行和错误处理. 构建于 core.async 之上.

顶层结构是 flow, 包含: 一组 processes (通常是线程) - 并发活动 一组在 processes 之间流入和流出数据的 channels 一组用于集中控制、报告、错误处理和执行 processes 的 channels

flow 库本身构建 processes、channels 和 flows. 用户提供配置数据和 process 逻辑 (step-fns) 来指定 flow 应该如何工作.

flow 是从 flow 配置数据构建的, 该数据定义了 processes 及其之间连接的有向图. Processes 描述其 I/O 需求, flow (库) 本身会创建 channels 并将它们传递给 请求它们的 processes. 详见 'create-flow'. flow 配置为有关 process 设置、线程、缓冲等 策略决策提供了一个集中的地方.

预计应用程序将很少定义 process 协议的实例, 而是使用 API 函数 'process', 该函数 以调用普通函数 (step-fns) 的方式实现 process 协议, 这些函数可能不包含通信或 core.async 代码. 通过这种方式, 该库帮助您实现应用程序逻辑 与其执行、通信、生命周期、错误处理和监控的严格分离.

请注意, 在几个地方, 库要求用户 为 processes、inputs、outputs 等提供 id. 这些应该是关键字. 当需要命名空间关键字时, 会明确说明. 本文档将库本身使用的各种关键字称为 ::flow/xyz, 其中 ::flow 是 clojure.core.async.flow 的别名.

Flows 支持 Clojure 'datafy' 协议以支持 可观察性. 另请参见 'ping' 和 'ping-proc' 函数, 以获取 processes 的实时视图.

在 flow 定义中, 一个 process 由一个 spi/ProcLauncher 的实现来表示, 该实现会启动它. 详情请参见 spi 文档.

7.1. create-flow

(create-flow config)

从提供的配置创建一个 flow: 一个包含键 :procs 和 :conns 的 map, 以及可选的 :mixed-exec/:io-exec/:compute-exec

:procs - 一个 pid->proc-def 的 map 其中 proc-def 是一个包含 :proc, :args, :chan-opts 键的 map

  • :proc - 启动一个 process 的函数
  • :args - 一个 param->val 的 map, 将传递给 process 的构造函数
  • :chan-opts - 一个 in-or-out-id->{:keys [buf-or-n xform]} 的 map, 其中 buf-or-n 和 xform 的含义与 core.async/chan 中的相同 默认值为 {:buf-or-n 10}
  • :conns - 一个 [[from-pid outid] [to-pid inid]] 元组的集合. 输入和输出支持多个连接. 当一个输出被 多次连接时, 每个连接都将收到每条消息, 就像 core.async/mult 一样.
  • :mixed-exec/:io-exec/:compute-exec -> ExecutorService

这些可用于指定相应工作负载要使用的 ExecutorService, 以代替库的默认设置.

注意: flow 不会自动启动. 请参见 'start'.

7.2. futurize

(futurize f & {:keys [exec], :or {exec :mixed}, :as opts})

接受一个函数 f, 并返回一个函数, 该函数接受与 f 相同的参数 并立即返回一个 future, 它会为 指定的工作负载启动一个线程, 或通过提供的 executor, 用这些参数调用 f, 并用其返回值完成该 future.

futurize 接受关键字参数选项: :exec - 工作负载之一 :mixed, :io, :compute 或一个 j.u.c.ExecutorService 对象, 默认为 :mixed

7.3. inject

(inject g [pid io-id :as coord] msgs)

异步地将消息放入与 process 的 输入或输出相对应的 channel 中, 返回一个 future, 该 future 将在完成时结束.

7.4. lift*->step

(lift*>;step f)

给定一个接受一个参数并返回一个非 nil 值集合的函数 f, 创建一个 process 所需的 step fn, 该 step fn 有一个输入和一个输出 (名为 :in 和 :out), 并且没有状态.

7.5. lift1->step

(lift1->step f)

与 lift*->step 类似, 但接受一个返回单个值的函数, 当该值为 nil 时将不产生输出.

7.6. map->step

(map->step {:keys [describe init transition transform]})

给定一个与 step fn arities 相对应的函数 map (见 'process'), 返回一个适合传递给 'process' 的 step fn. 您可以使用此 map 形式从不同的函数组合 proc 逻辑, 或利用某些入口点的可选性.

map 中的键是:

  • :describe, arity 0 - 必需
  • :init, arity 1 - 可选, 但如果 'describe' 返回 :params, 则应提供.
  • :transition, arity 2 - 可选
  • :transform, arity 3 - 必需

7.7. pause

(pause g)

暂停一个正在运行的 flow.

7.8. pause-proc

(pause-proc g pid)

暂停一个 process.

7.9. ping

(ping g & {:keys [timeout-ms], :or {timeout-ms 1000}})

ping 所有 processes, 返回一个 pid -> proc 状态和 state 的 map, 针对在 timeout-ms (默认 1000) 内回复的 procs.

7.10. ping-proc

(ping-proc g pid & {:keys [timeout-ms], :or {timeout-ms 1000}})

与 ping 类似, 但只 ping 指定的 process.

7.11. process

(process step-fn)
(process step-fn {:keys [workload compute-timeout-ms], :as opts})

给定一个具有四个 arities (0-3) 的函数, 即 'step-fn', 返回一个 launcher, 该 launcher 创建一个符合 process 协议的 process (参见 spi/ProcLauncher 文档).

step-fn 的可能 arities 是

  • 0 - 'describe', () -> description
  • 1 - 'init', (arg-map) -> initial-state
  • 2 - 'transition', (state transition) -> state'
  • 3 - 'transform', (state input msg) -> [state' output-map]

这是通过普通函数定义 processes 逻辑的核心设施. 使用一个持有 fn 的 var 作为 'step-fn' 是 定义 proc 的首选方法, 因为它能够在 flow 中 热重载 proc 逻辑, 并在 datafy 中提供更好的名称.

arity 0 - 'describe', () -> description 其中 description 是一个包含 :params :ins 和 :outs 键的 map, 每个键又是一个关键字到文档字符串的 map, 以及 :workload, 其可能值为 :mixed :io :compute. describe 返回的 map 中的所有条目都是可选的.

  • :params 描述了设置函数状态的初始参数.
  • :ins 枚举了输入, flow 将为其创建 channels.
  • :outs 枚举了输出, flow 可能为其创建 channels.
  • :workload - 描述了工作负载的性质, 是 :mixed :io 或 :compute 之一 :io 工作负载不应进行长时间的计算 :compute 工作负载永远不应阻塞

:ins 和 :outs 中不能有相同的键, 这样可以形成一个统一的 channel 坐标系 [:process-id :channel-id]. 返回的 ins/outs/params 将是 process 的 ins/outs/params. 用户可以调用 describe 来了解如何使用 proc. impl 也会调用它 以发现需要哪些 channels.

arity 1 - 'init', (arg-map) -> initial-state

init arity 将被 process 调用一次以建立任何初始 状态. arg-map 将是一个 param->val 的 map, 如 flow 定义中所提供. 将添加键 ::flow/pid, 映射到与 process 关联的 pid (例如, 如果 process 想要在 reply-to 坐标中引用自身, 这会很有用).

可选地, 返回的 init state 可能包含 键 ::flow/in-ports 和/或 ::flow/out-ports. 这些应该是 cid -> core.async.channel 的 map. cid 不得与 in/out id 冲突. 这些 channels 将成为 process 输入/输出集的一部分, 但在 flow 内部不可见/不可解析. Ports 是一种允许数据 从 flow 外部进入或离开的方式. 使用 :transition 来协调这些外部 channels 的生命周期.

可选地, 任何 返回的状态, 无论是来自 init、transition 还是 transform, 都可能包含键 ::flow/input-filter, 这是一个 cid 的谓词. 只有满足该谓词的输入 (包括 in-ports) 才会成为下一个 channel 读取集的一部分. 在没有此 谓词的情况下, 所有输入都会被读取.

arity 2 - 'transition', (state transition) -> state'

当 process 进行状态转换时, 将调用 transition arity, transition 是 ::flow/resume, ::flow/pause 或 ::flow/stop 之一.

通过这种方式, process impl 可以跟踪变化并协调 资源, 特别是在 :stop 时清理任何资源, 因为 process 在此之后将不再被使用. 详情请参见 SPI. state' 将是提供给后续调用的状态.

arity 3 - 'transform', (state in-name msg) -> [state' output] 其中 output 是 outid->[msgs*] 的 map

每次有消息到达任何一个输入时, 都会调用 transform arity. 输出可以发送到枚举的 :outs 中的零个、任意个或所有个, 和/或由 [pid inid] 元组命名的输入 (例如, 用于 reply-to), 和/או إلى ::flow/report 输出. 一个 step 不必 输出 (output 或 msgs 可以是空/nil), 然而输出 消息 永远不能是 nil (根据 core.async channels). state' 将是 提供给后续调用的状态.

process 还接受一个选项 map, 其中包含键: :workload - :mixed, :io 或 :compute 之一 :compute-timeout-ms - 如果 :workload 是 :compute, 此超时 (默认为 5000 毫秒) 将用于从 future 获取返回值 - 见下文

作为选项提供给 process 的 :workload 将覆盖 process 的 :describe fn 返回的任何 :workload. 如果两者 都未提供, 则默认为 :mixed.

在 :mixed 或 :io 的 :workload 上下文中, 这决定了 process 循环将在其中运行的线程类型, 包括其对 transform 的调用.

当指定 :io 时, transform 不应进行大量的计算.

当指定 :compute 时, 每次调用 transform 将在 一个单独的线程中运行. process 循环将在一个 :io 上下文中运行 (因为它 不再直接调用 transform, 它所做的只是 I/O), 它 将把 transform 提交给 :compute executor, 然后等待 (阻塞, 等待 compute-timeout-ms) executor 返回的 future 完成. 如果 future 超时, 将在 ::flow/error 上报告.

当指定 :compute 时, transform 绝不能阻塞!

7.12. resume

(resume g)

恢复一个已暂停的 flow.

7.13. resume-proc

(resume-proc g pid)

恢复一个 process.

7.14. start

(start g)

从初始值启动整个 flow. processes 开始时处于暂停状态. 调用 'resume' 或 'resume-proc' 来启动 flow. 返回一个包含以下键的 map:

:report-chan - 一个用于读取的 core.async chan. 'ping' 响应 将出现在这里, 任何来自 :transform 的显式 ::flow/report 输出 也会出现在这里.

:error-chan - 一个用于读取的 core.async chan. 在 flow 内部任何线程上 抛出的任何 (且仅有) 异常将出现在发送到此处的 map 中. 至少会有一个 ::flow/ex 条目, 包含异常, 并且可能根据错误的上下文有额外的键, 如 pid, state, status 等.

7.15. stop

(stop g)

关闭 flow, 停止所有 processes 并关闭 error 和 report channels. flow 可以再次启动.

8. clojure.core.async.flow.spi

8.1. ProcLauncher (protocol)

注意 - 定义 ProcLauncher 是一个高级功能, 普通使用该库时不需要. 此协议用于 创建无法用 ::flow/process 创建的新类型的 Processes.

ProcLauncher 是一个 process (活动线程) 的构造函数. 它有两个功能 - 描述 process 的参数和输入/输出 需求, 以及启动它. launcher 不应 获取任何资源, 也不应保留与已启动 process 的任何连接. launcher 可能会被要求 多次启动一个 process, 并且每次调用 start 时都应启动一个新 process.

启动的 process 必须遵守以下规则:

它必须有两个逻辑状态, :paused 和 :running. 在 :paused 状态下, 操作被暂停, 不产生任何输出.

当 process 启动时, 它必须是 :paused

每当 process 对任何 channel 进行读或写时, 都必须使用 alts!! 并包含对 ::flow/control channel 的读取, 并给予它 优先级.

通过 ::flow/control channel 发送的命令消息具有以下键:

  • ::flow/to - ::flow/all 或一个 process id
  • ::flow/command - ::flow/stop|pause|resume|ping 或 process-specific

它必须响应任何 (且仅) 其 ::flow/to 键是其 pid 或 ::flow/all 的控制消息. 它必须响应以下 ::flow/command 的值:

  • ::flow/stop - 所有资源都应被清理, 任何线程 都应正常退出 - 此后将不再使用 该 process.
  • ::flow/pause - 进入 :paused 状态
  • ::flow/resume - 进入 :running 状态并恢复处理
  • ::flow/ping - 向 ::flow/report channel 发送一个 ping 消息 (格式待定), 至少包含其 pid 和 status

一个 process 可以在其自己的命名空间中定义和响应其他命令.

一个 process 不应传输 channel 对象 (应使用 [pid io-id] 数据 坐标代替). 一个 process 不应关闭 channels.

最后, 如果一个 process 遇到错误, 它必须在 ::flow/error channel 上报告 (格式待定) 并尝试继续, 尽管它随后可能会收到一个它必须遵守的 ::flow/stop 命令.

8.2. describe

(describe p)

返回一个包含 :params, :ins 和 :outs 键的 map, 每个键又是一个从关键字到文档字符串的 map.

  • :params 描述了设置 process 状态的初始参数.
  • :ins 枚举了输入, 图将为其创建 channels.
  • :outs 枚举了输出, 图可能会为其创建 channels.

用户可以调用 describe 来了解如何使用 proc. impl 也会调用它以发现 需要哪些 channels.

8.3. start

(start p {:keys [pid args ins outs resolver]})

返回值被忽略, 调用是为了 启动 process (通常是启动其线程) 的效果.

其中:

  • :pid - process 在图中的 id, 以便例如它可以在控制、报告等中引用自身
  • :args - 一个 param->val 的 map, 如在图定义中提供
  • :ins - 一个 in-id->readable-channel 的 map, 加上 ::flow/control channel
  • :outs - 一个 out-id->writeable-channel 的 map, 加上 ::flow/error 和 ::flow/report channels 注意: 如果未连接, 输出可能为 nil
  • :resolver - 一个 spi/Resolver 的实现, 可用于 根据其逻辑 [pid cid] 坐标查找 channels, 以及 获取与逻辑 :mixed/:io/:compute 上下文相对应的 ExecutorServices.

8.4. Resolver (protocol)

8.4.1. get-exec

(get-exec _ context)

返回给定上下文的 ExecutorService, 上下文是 :mixed, :io, :compute 之一.

8.4.2. get-write-chan

(get-write-chan _ coord)

给定一个 [pid cid] 元组, 返回一个用于 写入的 core.async chan, 或者返回 nil (在这种情况下, 输出应该被丢弃, 例如没有东西被连接).

Author: 青岛红创翻译

Created: 2025-11-05 Wed 19:13