January 8, 2020
By: Kevin
core.async使用队列解决异步逻辑的例子
core.async是一个库, 解决了代码块(jvm上可以不在同一个线程里) 异步通讯的问题.
管道或者说队列是非常优秀的解耦工具. 可以用来解决比较复杂的逻辑.
下面是一个例子. 首先是js的一个版本
逻辑
- 扫码枪开始扫描, 等结果
- 如果4s没有结果,认为超时,退出
- 如果有结果, 进入50ms的读取循环, 读到的信息加到msg
- 直到超时

js实现
const { race, take, select, call } = require('redux-saga/effects')
function* read(ch) {
let result = yield take(ch) // 先等第一次的回复
while(true) {
const [resp, timeout] = yield race([
take(ch),
delay(50)
])
if (timeout) break
result += resp
}
return result.trim()
}
exports.scanWithTimeout = function* (timeout = 4000) {
const { port, respChannel } = yield select(state => state.scanner)
if (port === null) return 'Connection Lost!'
port.write('7T\r') // scan command
const [resp, fail] = yield race([
call(read, respChannel),
delay(timeout)
])
if (fail) {
port.write('7U\r') // shutdown light
throw new Error('Timeout!')
}
return resp
}
core.async的实现
可以在代码中加几个空格,观察下执行结果, 很好的模拟了可能的场景.
下面这段代码可以同时执行在cljs/clj, 行为上没有区别.
(require-macros '[clojure.core.async.macros :refer [go go-loop]])
(require '[cljs.core.async :refer [promise-chan chan timeout mult <! >!]])
;; 模拟关灯, 读取不成功时执行关灯操作
(defn- mock-light-off []
(print "关灯!!!"))
;; 模拟读取情况, 随机2000毫秒返回模拟消息
(defn- mock-read [c]
(go (do
(<! (timeout (rand-int 2000)))
(>! c "[msg head....]"))))
;; 模拟生成消息, 60ms之内,随机时间, 消息内容为 '-'
(defn- mock-msg [c]
(go (do
(<! (timeout (rand-int 55)))
(>! c "-"))))
;; 后续消息读取, 50ms读不到就结束,读到就增加到消息尾部, 然后继续读, 直到超时
(defn- mock-read-proceed [c v]
(go (>! c v))
(go-loop [msg ""]
(mock-msg c)
(let [[v q] (alts! [c (timeout 50)])]
(if (= c q)
(do (println "读到消息:" v)
(recur (str msg v)))
(println "读取结束, 最终结果为:" msg)))))
;; 读取函数, 模拟1000秒超时
(defn scan-with-time-out [c]
(mock-read c)
(go
(let [[v q] (alts! [c (timeout 1000)])]
(if (= c q)
(do (println "读到消息头:" v)
(mock-read-proceed c v))
(do (println "首次读取超时,操作结束")
(mock-light-off))))))
(let [c (chan)]
(scan-with-time-out c))