Manifold
Manifold-ABC
简介
Manifold是一个异步队列库,目标是为各种异步队列的实现提供统一的抽象 (比如: Java 的 BlockingQueue、core.async 的 channel).
;; Interop demo: moving messages between core.async channels and manifold streams.
(ns learn-manifold.introduction
(:require [clojure.core.async :as a]
[manifold.stream :as s]))
;; Convert a core.async channel into a manifold source.
(def c1 (a/chan))
(def s1 (s/->source c1))
;; A value put on the channel can be taken from the manifold side.
(a/go (a/>! c1 1))
@(s/take! s1);; => 1
;; Connect a manifold stream to a core.async channel.
(def s2 (s/stream))
(def c2 (a/chan))
(s/connect s2 c2)
;; A value put on the stream is forwarded to the connected channel.
(s/put! s2 10);; => #<SuccessDeferred@1efc7245: true>
(a/<!! c2);; => 10
核心抽象是deferreds和streams.
类比理解
可以对比数组'(1 2 3)来理解这两个概念: stream相当于list, deferred相当于list中的元素(但不是已经求出的值1 2 3, 而是尚未兑现的promise(1) promise(2) promise(3)).
(-> 1 inc dec) (d/chain (Promise. 1) inc dec)
(->> [1 2 3] (map inc) (map dec)) (->> (s/stream) (s/map inc) (s/map dec))
数组是空间上的序列,流是时间上的序列.(流也可以有buffer.)
可以把stream比喻成'外卖柜',会有一系列外卖送到'外卖柜'里,但不知道什么时间和频率到.
stream组成部分
一个stream其实内部分为两截: [sink source]. source端可以 take!、try-take! 和 close!; sink端可以 put!、try-put! 和 close!.
两个stream的连接也是source -> sink
stream的使用
对stream可以:
- 简单放取
- 注册消费者
- 对接到另一个stream
- 生成衍生stream
1. 简单放/取
;; Basic put!/take! on a single stream, including timeout and close behaviour.
(ns learn-manifold.core
(:require [manifold.stream :as s]))
(def s (s/stream)) ;; => #'learn-manifold.core/s
(s/put! s 1) ;; => #<Deferred@4efc48c9: :not-delivered> ;; the put goes into the stream's sink side
(s/take! s) ;; => #<SuccessDeferred@6b238e3b: 1> ;; the value is taken from the stream's source side
(s/take! s) ;; => #<Deferred@1b3d2208: :not-delivered>
@(s/try-take! s ::drained 1000 ::timeout) ;; => :learn-manifold.core/timeout ;; a deref'd plain take! would block here waiting; try-take! gives up after 1000 ms
(s/close! s) ;; => nil ;; close the stream
@(s/put! s 1) ;; => false ;; once the stream is closed, nothing more can be put into it
;; nil is a legal message too, which is why take! is usually given an explicit default.
@(s/take! s ::drained) ;; => :learn-manifold.core/drained ;; the default value ::drained signals that there really is no more data, as opposed to a nil message
2. 注册消费者
;; Registering consumer callbacks on a stream with s/consume.
(ns learn-manifold.consume
(:require [manifold.stream :as s]))
(def s (s/stream))
;; Register a callback that prints each message put on s.
(s/consume #(prn "got msg :" %) s)
;; => #<Deferred@c7c41ad: :not-delivered>
@(s/put! s 1);; => true
;; The registered callback is listed as a downstream sink of s.
(s/downstream s);; => ([nil << sink: {:type "callback"} >>])
@(s/put! s 2);; => true
@(s/put! s 3);; => true
;; Note :pending-takes 1 in the description below — presumably the consumer's outstanding take; TODO confirm.
s;; => << stream: {:pending-puts 0, :drained? false, :buffer-size 0, :permanent? false, :type "manifold", :sink? true, :closed? false, :pending-takes 1, :buffer-capacity 0, :source? true} >>
;; NOTE(review): this manual try-take! is issued while the callback consumer is still registered;
;; the transcript does not show whether the callback or this take receives the next message.
(s/try-take! s :default-val 1000 :timeout-value);; => #<Deferred@7f6bd875: :not-delivered>
(s/put! s 4);; => #<SuccessDeferred@49d0b671: true>
;; A second stream with two callbacks registered: both appear in s/downstream.
(def s2 (s/stream))
(s/consume #(prn :hoho %) s2)
(s/put! s2 "a")
(s/put! s2 "b")
(s/consume #(prn :haha %) s2)
(s/downstream s2)
;; => ([nil << sink: {:type "callback"} >>] [nil << sink: {:type "callback"} >>])
(s/put! s2 "c");; => #<SuccessDeferred@49d0b671: true>
3. connect连接
;; Connecting one stream directly to another with s/connect.
(ns learn-manifold.direct-connect
(:require [manifold.stream :as s]))
(def a (s/stream))
(def b (s/stream))
;; After connecting, messages put on a can be taken from b.
(s/connect a b)
(s/put! a 1)
(s/take! b);; => #<SuccessDeferred@6aba395c: 1>
4. 衍生连接
;; Deriving new streams from an existing one with s/map.
(ns learn-manifold.transform
(:require [manifold.stream :as s]))
(def s (s/stream))
;; Two derived streams built from the same source s.
(def a (s/map inc s))
(def b (s/map dec s))
;; A single put on s is seen, transformed, by both derived streams.
@(s/put! s 0);; => true
@(s/take! a);; => 1
@(s/take! b);; => -1
;; Similar stream transformers include zip/reduce/buffer/batch/throttle, among others.
连接后,上下游会互相影响
先看连接前,stream的状态
;; Inspecting the state of a single, unconnected stream.
(ns learn-manifold.stream-status
(:require [manifold.stream :as s]))
(def s (s/stream));; => #'learn-manifold.stream-status/s
;; Printing s shows its description map (annotated by the author):
;; s
;; => << stream:
;; {
;; :type "manifold",
;; :sink? true,
;; :source? true
;; :pending-puts 0, ;; puts queued "outside the door", waiting to be accepted
;; :pending-takes 0, ;; takes queued "outside the door", waiting for a message
;; :closed? false, ;; not closed: puts are still accepted
;; :drained? false, ;; a closed stream may still hold messages: no more puts, but takes still work until it is drained
;; :permanent? false, ;; author's note: for a permanent stream, the parent is not closed when the child closes
;; :buffer-size 0,
;; :buffer-capacity 0,
;; } >>
;; Closed, but the pending put has not been taken yet: not drained.
(let [s (s/stream)]
(s/drained? s)
(s/put! s 1)
(s/close! s)
(s/drained? s)) ;; => false
;; A message put before close! can still be taken after close!.
(let [s (s/stream)]
(s/drained? s)
(s/put! s 1)
(s/close! s)
(s/take! s)
);; => #<SuccessDeferred@ed153d4: 1>
;; Once the remaining message has been taken from the closed stream, it is drained.
(let [s (s/stream)]
(s/drained? s)
(s/put! s 1)
(s/close! s)
(s/take! s)
(s/drained? s)
);; => true
各种连接情况对stream状态的影响
;; How closing one end of a connection affects the streams on the other end.
(ns learn-manifold.stream-status-relation
(:require [manifold.stream :as s]))
;; When the upstream is closed, every downstream is closed along with it.
;;; Downstreams attached with s/connect
(def 源头 (s/stream))
(def 下游1 (s/stream))
(def 下游2 (s/stream))
(s/connect 源头 下游1);; => nil
(s/connect 源头 下游2);; => nil
;; Closing the source: all three streams report :closed? true and :drained? true.
(s/close! 源头);; => nil
源头;; => << stream: {:pending-puts 0, :drained? true, :buffer-size 0, :permanent? false, :type "manifold", :sink? true, :closed? true, :pending-takes 0, :buffer-capacity 0, :source? true} >>
下游1;; => << stream: {:pending-puts 0, :drained? true, :buffer-size 0, :permanent? false, :type "manifold", :sink? true, :closed? true, :pending-takes 0, :buffer-capacity 0, :source? true} >>
下游2;; => << stream: {:pending-puts 0, :drained? true, :buffer-size 0, :permanent? false, :type "manifold", :sink? true, :closed? true, :pending-takes 0, :buffer-capacity 0, :source? true} >>
;;; Derived downstreams created by s/map: closing the source should close them too.
(def 源头2 (s/stream));; => #'learn-manifold.stream-status-relation/源头2
(def 下游21 (s/map inc 源头2));; => #'learn-manifold.stream-status-relation/下游21
(def 下游22 (s/map dec 源头2));; => #'learn-manifold.stream-status-relation/下游22
;; Close the source the mapped streams were derived from. (The earlier version closed 源头
;; again and re-inspected 下游1/下游2, so the s/map case was never actually exercised.)
(s/close! 源头2);; => nil
;; Expected, by analogy with the s/connect case: :closed? true and :drained? true on all three.
;; NOTE(review): outputs below were not re-captured; s/map returns a source-only view, so the
;; printed form of 下游21/下游22 may differ from a plain stream — re-run at a REPL to confirm.
源头2
下游21
下游22
;; When all downstreams are closed the parent closes too, but the parent reacts one beat late.
(def 源头3 (s/stream)) ;; => #'learn-manifold.stream-status-relation/源头3
(def 下游31 (s/stream));; => #'learn-manifold.stream-status-relation/下游31
(def 下游32 (s/stream));; => #'learn-manifold.stream-status-relation/下游32
(s/connect 源头3 下游31);; => nil
(s/connect 源头3 下游32);; => nil
;; Close the first downstream: the source is unaffected (one downstream remains).
(s/close! 下游31);; => nil
下游31;; => << stream: {:pending-puts 0, :drained? true, :buffer-size 0, :permanent? false, :type "manifold", :sink? true, :closed? true, :pending-takes 0, :buffer-capacity 0, :source? true} >>
源头3;; => << stream: {:pending-puts 0, :drained? false, :buffer-size 0, :permanent? false, :type "manifold", :sink? true, :closed? false, :pending-takes 1, :buffer-capacity 0, :source? true} >>
;; Close the second (last) downstream: the source still reports :closed? false.
(s/close! 下游32);; => nil
下游32;; => << stream: {:pending-puts 0, :drained? true, :buffer-size 0, :permanent? false, :type "manifold", :sink? true, :closed? true, :pending-takes 0, :buffer-capacity 0, :source? true} >>
源头3;; => << stream: {:pending-puts 0, :drained? false, :buffer-size 0, :permanent? false, :type "manifold", :sink? true, :closed? false, :pending-takes 1, :buffer-capacity 0, :source? true} >> ;; at this point the source does not yet know that all downstreams are closed
(s/put! 源头3 10);; => #<SuccessDeferred@1efc7245: true> ;; only on this put does the source find out
源头3;; => << stream: {:pending-puts 0, :drained? true, :buffer-size 0, :permanent? false, :type "manifold", :sink? true, :closed? true, :pending-takes 0, :buffer-capacity 0, :source? true} >>
;;; Marking one downstream as the essential one.
;;; s/stream's single-argument arity is a buffer size, so (s/stream {:parent true}) cannot
;;; express this. The relationship is declared on the connection instead: with
;;; {:upstream? true}, closing that sink closes the source even if other sinks remain connected.
(def 源头4 (s/stream))
(def 核心下游 (s/stream))
(def 普通下游 (s/stream))
(s/connect 源头4 核心下游 {:upstream? true})
(s/connect 源头4 普通下游)
(s/close! 核心下游)
;; NOTE(review): outputs not captured. As with 源头3 above, the source may only notice the
;; closed sink on its next put! — re-run at a REPL and inspect 源头4 / 普通下游 to confirm.
;; Too little water vs. a full pipe: how closing the middle pipe affects upstream and downstream.
;; Three unbuffered streams chained 上 -> 中 -> 下, with no consumer on 下 at first.
(def 上 (s/stream))
(def 中 (s/stream))
(def 下 (s/stream))
(s/connect 上 中)
(s/connect 中 下)
;; Initially 上 and 中 each show one pending take; 下 shows none.
上;; => << stream: {:pending-puts 0, :drained? false, :buffer-size 0, :permanent? false, :type "manifold", :sink? true, :closed? false, :pending-takes 1, :buffer-capacity 0, :source? true} >>
中;; => << stream: {:pending-puts 0, :drained? false, :buffer-size 0, :permanent? false, :type "manifold", :sink? true, :closed? false, :pending-takes 1, :buffer-capacity 0, :source? true} >>
下;; => << stream: {:pending-puts 0, :drained? false, :buffer-size 0, :permanent? false, :type "manifold", :sink? true, :closed? false, :pending-takes 0, :buffer-capacity 0, :source? true} >>
;; Message 1 succeeds and ends up as a pending put on 下.
(s/put! 上 1);; => #<SuccessDeferred@1efc7245: true>
上;; => << stream: {:pending-puts 0, :drained? false, :buffer-size 0, :permanent? false, :type "manifold", :sink? true, :closed? false, :pending-takes 1, :buffer-capacity 0, :source? true} >>
中;; => << stream: {:pending-puts 0, :drained? false, :buffer-size 0, :permanent? false, :type "manifold", :sink? true, :closed? false, :pending-takes 0, :buffer-capacity 0, :source? true} >>
下;; => << stream: {:pending-puts 1, :drained? false, :buffer-size 0, :permanent? false, :type "manifold", :sink? true, :closed? false, :pending-takes 0, :buffer-capacity 0, :source? true} >>
;; Message 2 still succeeds but is now a pending put on 中.
(s/put! 上 2);; => #<SuccessDeferred@1efc7245: true>
上;; => << stream: {:pending-puts 0, :drained? false, :buffer-size 0, :permanent? false, :type "manifold", :sink? true, :closed? false, :pending-takes 0, :buffer-capacity 0, :source? true} >>
中;; => << stream: {:pending-puts 1, :drained? false, :buffer-size 0, :permanent? false, :type "manifold", :sink? true, :closed? false, :pending-takes 0, :buffer-capacity 0, :source? true} >>
下;; => << stream: {:pending-puts 1, :drained? false, :buffer-size 0, :permanent? false, :type "manifold", :sink? true, :closed? false, :pending-takes 0, :buffer-capacity 0, :source? true} >>
;; From message 3 on, puts stay :not-delivered and accumulate as pending puts on 上.
(s/put! 上 3);; => #<Deferred@99e8b10: :not-delivered>
上;; => << stream: {:pending-puts 1, :drained? false, :buffer-size 0, :permanent? false, :type "manifold", :sink? true, :closed? false, :pending-takes 0, :buffer-capacity 0, :source? true} >>
中;; => << stream: {:pending-puts 1, :drained? false, :buffer-size 0, :permanent? false, :type "manifold", :sink? true, :closed? false, :pending-takes 0, :buffer-capacity 0, :source? true} >>
下;; => << stream: {:pending-puts 1, :drained? false, :buffer-size 0, :permanent? false, :type "manifold", :sink? true, :closed? false, :pending-takes 0, :buffer-capacity 0, :source? true} >>
(s/put! 上 4);; => #<Deferred@7cf0a176: :not-delivered>
上;; => << stream: {:pending-puts 2, :drained? false, :buffer-size 0, :permanent? false, :type "manifold", :sink? true, :closed? false, :pending-takes 0, :buffer-capacity 0, :source? true} >>
中;; => << stream: {:pending-puts 1, :drained? false, :buffer-size 0, :permanent? false, :type "manifold", :sink? true, :closed? false, :pending-takes 0, :buffer-capacity 0, :source? true} >>
下;; => << stream: {:pending-puts 1, :drained? false, :buffer-size 0, :permanent? false, :type "manifold", :sink? true, :closed? false, :pending-takes 0, :buffer-capacity 0, :source? true} >>
;; Close the middle stream: only 中 reports :closed? true; 上 and 下 are unchanged.
(s/close! 中);; => nil
上;; => << stream: {:pending-puts 2, :drained? false, :buffer-size 0, :permanent? false, :type "manifold", :sink? true, :closed? false, :pending-takes 0, :buffer-capacity 0, :source? true} >>
中;; => << stream: {:pending-puts 1, :drained? false, :buffer-size 0, :permanent? false, :type "manifold", :sink? true, :closed? true, :pending-takes 0, :buffer-capacity 0, :source? true} >>
下;; => << stream: {:pending-puts 1, :drained? false, :buffer-size 0, :permanent? false, :type "manifold", :sink? true, :closed? false, :pending-takes 0, :buffer-capacity 0, :source? true} >>
;; 上 is still open, so further puts keep queueing on it.
(s/put! 上 5);; => #<Deferred@3b0c8ee9: :not-delivered>
上;; => << stream: {:pending-puts 3, :drained? false, :buffer-size 0, :permanent? false, :type "manifold", :sink? true, :closed? false, :pending-takes 0, :buffer-capacity 0, :source? true} >>
中;; => << stream: {:pending-puts 1, :drained? false, :buffer-size 0, :permanent? false, :type "manifold", :sink? true, :closed? true, :pending-takes 0, :buffer-capacity 0, :source? true} >>
下;; => << stream: {:pending-puts 1, :drained? false, :buffer-size 0, :permanent? false, :type "manifold", :sink? true, :closed? false, :pending-takes 0, :buffer-capacity 0, :source? true} >>
(s/put! 上 6);; => #<Deferred@69bd0634: :not-delivered> ;; stays :not-delivered: no downstream is taking it
上;; => << stream: {:pending-puts 4, :drained? false, :buffer-size 0, :permanent? false, :type "manifold", :sink? true, :closed? false, :pending-takes 0, :buffer-capacity 0, :source? true} >>
中;; => << stream: {:pending-puts 1, :drained? false, :buffer-size 0, :permanent? false, :type "manifold", :sink? true, :closed? true, :pending-takes 0, :buffer-capacity 0, :source? true} >>
下;; => << stream: {:pending-puts 1, :drained? false, :buffer-size 0, :permanent? false, :type "manifold", :sink? true, :closed? false, :pending-takes 0, :buffer-capacity 0, :source? true} >>
(s/put! 中 7);; => #<SuccessDeferred@41a8b4c6: false> ;; 中 is closed, so the put is rejected with false
上;; => << stream: {:pending-puts 4, :drained? false, :buffer-size 0, :permanent? false, :type "manifold", :sink? true, :closed? false, :pending-takes 0, :buffer-capacity 0, :source? true} >>
中;; => << stream: {:pending-puts 1, :drained? false, :buffer-size 0, :permanent? false, :type "manifold", :sink? true, :closed? true, :pending-takes 0, :buffer-capacity 0, :source? true} >>
下;; => << stream: {:pending-puts 1, :drained? false, :buffer-size 0, :permanent? false, :type "manifold", :sink? true, :closed? false, :pending-takes 0, :buffer-capacity 0, :source? true} >>
;; While nothing can be pushed through, backpressure cannot carry the close information along the chain;
;; attaching a tap (a consumer) at the very bottom lets messages flow, and then everything becomes closed.
(s/consume prn 下) ;; => #<Deferred@2ff6dabe: true>
上;; => << stream: {:pending-puts 2, :drained? false, :buffer-size 0, :permanent? false, :type "manifold", :sink? true, :closed? true, :pending-takes 0, :buffer-capacity 0, :source? true} >>
中;; => << stream: {:pending-puts 0, :drained? true, :buffer-size 0, :permanent? false, :type "manifold", :sink? true, :closed? true, :pending-takes 0, :buffer-capacity 0, :source? true} >>
下;; => << stream: {:pending-puts 0, :drained? true, :buffer-size 0, :permanent? false, :type "manifold", :sink? true, :closed? true, :pending-takes 0, :buffer-capacity 0, :source? true} >>
;; In this situation it is impossible to tell which of 上/中/下 the problem originated in.
;; put take backpressure
套路
stream-handler(感觉和consume同质)
(defn stream-handler
  "Feeds every message taken from `stream` to `f`, one at a time.

  Each iteration takes the next message (with `::drained` as the default
  for an exhausted stream), applies `f` to it, and takes again once the
  result is realized. The loop ends when the take yields `::drained`, or
  when `f` itself returns `::drained`.
  Returns the deferred produced by `d/loop`.

  Expects the aliases `d` and `s` to be in scope in the enclosing namespace."
  [f stream]
  (let [drained? (fn [x] (identical? ::drained x))
        ;; pass the sentinel straight through, otherwise hand the message to `f`
        handle   (fn [item]
                   (if (drained? item)
                     item
                     (f item)))
        ;; keep looping unless the previous step reported the sentinel
        continue (fn [outcome]
                   (if (drained? outcome)
                     nil
                     (d/recur)))]
    (d/loop []
      (d/chain (s/take! stream ::drained)
               handle
               continue))))
wrap-duplex-stream
;; NOTE(review): `io` is assumed to be gloss.io, whose encode / decode-stream match the
;; argument order used below — confirm against the project's dependencies.
(ns mainifold.taolu
  (:require [gloss.io :as io]
            [manifold.stream :as s]))

(defn wrap-duplex-stream
  "Wraps the duplex stream `s` so that callers exchange values framed by
  `protocol` instead of raw data.

  Values put on the returned stream are encoded with `protocol` and forwarded
  into `s`; values taken from the returned stream are the decoded contents
  of `s`."
  [protocol s]
  (let [out (s/stream)]
    ;; Outgoing half: encode everything put on `out` and push it into `s`.
    ;; s/map does not return a full stream here but a SourceProxy
    ;; (not a stream, not a sink), which is all s/connect needs as its source.
    (s/connect
      (s/map #(io/encode protocol %) out)
      s)
    ;; Splice both halves into one stream: puts go to `out`,
    ;; takes come from the decoded view of `s`.
    (s/splice
      out
      (io/decode-stream s protocol))))
结合我们的业务
源头来自tcp-socket: 我们用aleph.tcp/client返回stream; 源头来自数据库: 用s/periodically生成一个stream; 源头来自文件: 用aleph.tcp/start-server生成一个stream; 水龙头是ws-conn: 用aleph.http/websocket-connection返回一个stream.
其他
能演示的
筛选: filter; 限速: throttle; 等齐多路信息再进入下游管道: zip.
把目前工作中的管道注册下来,context. 以便查找问题和管道复用.
我还需增加经验才能演示的
快速定位问题(该看哪里); 复用导致的问题(看context也难发现的问题); SourceProxy(s/map 的返回值).