February 6, 2022
By: eric
ActiveMQ
引言
用队列来协调异步,是我最近的一门功课,message-queue一定会丰富我对队列的理解.
快速创建一个试验田
mac安装ActiveMQ有很多方式,这种最顺利:
brew install activemq
activemq start
http://localhost:8161/admin/ admin admin
核心对象的创建关系
+-------------------+
| ConnectionFactory |
+-------------------+
|
v
+-------------+
| Connection |
+-------------+
|
v +------------------+
+-------------+ +-------------+ | Destination |
| Message |<------| Session |-----+--->|------------------|
+-------------+ +-------------+ | | Queue | Topic |
| +------------------+
|
+---------+----------+
| |
v v
+-------------+ +-------------+
| MsgProducer | | MsgConsumer |
+-------------+ +-------------+
1对1的 Queue
(ns mq-queue
(:import
[javax.jms Connection MessageConsumer MessageProducer Queue Session TextMessage Topic]
org.apache.activemq.ActiveMQConnectionFactory))
(def connection-factory (new org.apache.activemq.ActiveMQConnectionFactory))
(def connection (.createConnection connection-factory))
(.start connection)
(def session (.createSession connection false, javax.jms.Session/AUTO_ACKNOWLEDGE))
(def destination (.createQueue session "demo-queue"))
(def msg-producer (.createProducer session destination))
|------------+----------------------------+---------------------+-------------------+-------------------|
| Name | Number Of Pending Messages | Number Of Consumers | Messages Enqueued | Messages Dequeued |
|------------+----------------------------+---------------------+-------------------+-------------------|
| demo-queue | 0 | 0 | 0 | 0 |
|------------+----------------------------+---------------------+-------------------+-------------------|
(def message (.createTextMessage session "Jack"))
(.send msg-producer message) ;;此时一个consumer还没有呢.消息已经发到队列上了.
|------------+----------------------------+---------------------+-------------------+-------------------|
| Name | Number Of Pending Messages | Number Of Consumers | Messages Enqueued | Messages Dequeued |
|------------+----------------------------+---------------------+-------------------+-------------------|
| demo-queue | 1 | 0 | 1 | 0 |
|------------+----------------------------+---------------------+-------------------+-------------------|
(def msg-consumer-a (.createConsumer session destination)) ;; 创建一个consumer
|------------+----------------------------+---------------------+-------------------+-------------------|
| Name | Number Of Pending Messages | Number Of Consumers | Messages Enqueued | Messages Dequeued |
|------------+----------------------------+---------------------+-------------------+-------------------|
| demo-queue | 1 | 1 | 1 | 0 |
|------------+----------------------------+---------------------+-------------------+-------------------|
(def msg-received-by-a (.receive msg-consumer-a)) ;;consumer-a 取一次消息.
|------------+----------------------------+---------------------+-------------------+-------------------|
| Name | Number Of Pending Messages | Number Of Consumers | Messages Enqueued | Messages Dequeued |
|------------+----------------------------+---------------------+-------------------+-------------------|
| demo-queue | 0 | 1 | 1 | 1 |
|------------+----------------------------+---------------------+-------------------+-------------------|
;; (println (.getText msg-received-by-a))
(def msg-consumer-b (.createConsumer session destination)) ;;这时候pending是0.
|------------+----------------------------+---------------------+-------------------+-------------------|
| Name | Number Of Pending Messages | Number Of Consumers | Messages Enqueued | Messages Dequeued |
|------------+----------------------------+---------------------+-------------------+-------------------|
| demo-queue | 0 | 2 | 1 | 1 |
|------------+----------------------------+---------------------+-------------------+-------------------|
;; (def msg-received-by-b (.receive msg-consumer-b)) ;;这时候会阻塞在这里.
;; 那我们先放一个消息,再取呢?
;; (.send msg-producer message)
;; => nil producer再往queue里放一个msg
|------------+----------------------------+---------------------+-------------------+-------------------|
| Name | Number Of Pending Messages | Number Of Consumers | Messages Enqueued | Messages Dequeued |
|------------+----------------------------+---------------------+-------------------+-------------------|
| demo-queue | 1 | 2 | 2 | 1 |
|------------+----------------------------+---------------------+-------------------+-------------------|
;; (def msg-received-by-b (.receive msg-consumer-b))
;; 按理说这里应该能取出来啊,但是阻塞在这里了.
;; 那我们先放一个消息.再建立消费者呢? 还是一样的.
;; (.send msg-producer message)
;; (def msg-consumer-c (.createConsumer session destination))
;; (def msg-received-by-c (.receive msg-consumer-c))
;; (.close connection)
;; 小总结:
;; 1. pending,等待取的消息数.
;; 2. Messages Enqueued 曾经有多少信息放到过queue上.
;; 3. Messages Dequeued 曾经有多少信息从queue上取走过.
;; 4. 消息先发到queue上, 后注册的consumer也能拿到.
;; 5. 虽然queue的consumer可以有多个,但只有一个能得到消息.
1对n的 Topic
(ns mq-topic-demo-1
(:import
[javax.jms Connection MessageConsumer MessageProducer Queue Session TextMessage Topic]
org.apache.activemq.ActiveMQConnectionFactory))
(def connection-factory (new org.apache.activemq.ActiveMQConnectionFactory))
(def connection (.createConnection connection-factory))
(.start connection)
(def session (.createSession connection false, javax.jms.Session/AUTO_ACKNOWLEDGE))
(def destination (.createTopic session "demo-topic"))
(def msg-producer (.createProducer session destination))
|------------+---------------------+-------------------+-------------------|
| Name | Number Of Consumers | Messages Enqueued | Messages Dequeued |
|------------+---------------------+-------------------+-------------------|
| demo-queue | 0 | 0 | 0 |
|------------+---------------------+-------------------+-------------------|
(def message (.createTextMessage session "Rose"))
(.send msg-producer message)
|------------+---------------------+-------------------+-------------------|
| Name | Number Of Consumers | Messages Enqueued | Messages Dequeued |
|------------+---------------------+-------------------+-------------------|
| demo-queue | 0 | 1 | 0 |
|------------+---------------------+-------------------+-------------------|
(def msg-consumer-a (.createConsumer session destination)) ;; 创建一个consumer
|------------+---------------------+-------------------+-------------------|
| Name | Number Of Consumers | Messages Enqueued | Messages Dequeued |
|------------+---------------------+-------------------+-------------------|
| demo-queue | 1 | 1 | 0 |
|------------+---------------------+-------------------+-------------------|
;; (def msg-received-by-a (.receive msg-consumer-a))
;;这时候取消息会阻塞.
;; (println (.getText msg-received-by-a))
小总结:
topic不像queue能收到订阅前发送的消息. 只能收到订阅后发送的消息.
(ns mq-topic-demo-2
(:import
[javax.jms Connection MessageConsumer MessageProducer Queue Session TextMessage Topic]
org.apache.activemq.ActiveMQConnectionFactory))
(def connection-factory (new org.apache.activemq.ActiveMQConnectionFactory))
(def connection (.createConnection connection-factory))
(.start connection)
(def session (.createSession connection false, javax.jms.Session/AUTO_ACKNOWLEDGE))
(def destination (.createTopic session "demo-topic-2"))
(def msg-producer (.createProducer session destination))
|------------+---------------------+-------------------+-------------------|
| Name | Number Of Consumers | Messages Enqueued | Messages Dequeued |
|------------+---------------------+-------------------+-------------------|
| demo-queue | 0 | 0 | 0 |
|------------+---------------------+-------------------+-------------------|
;; 先注册消费者
(def msg-consumer-a (.createConsumer session destination)) ;; 创建一个consumer
|------------+---------------------+-------------------+-------------------|
| Name | Number Of Consumers | Messages Enqueued | Messages Dequeued |
|------------+---------------------+-------------------+-------------------|
| demo-queue | 1 | 0 | 0 |
|------------+---------------------+-------------------+-------------------|
;; 像queue一样,也可以注册多个消费者
(def msg-consumer-b (.createConsumer session destination))
|------------+---------------------+-------------------+-------------------|
| Name | Number Of Consumers | Messages Enqueued | Messages Dequeued |
|------------+---------------------+-------------------+-------------------|
| demo-queue | 2 | 0 | 0 |
|------------+---------------------+-------------------+-------------------|
;; 再发消息
(def message (.createTextMessage session "Rose"))
(.send msg-producer message)
|------------+---------------------+-------------------+-------------------|
| Name | Number Of Consumers | Messages Enqueued | Messages Dequeued |
|------------+---------------------+-------------------+-------------------|
| demo-queue | 2 | 1 | 0 |
|------------+---------------------+-------------------+-------------------|
;; 两个消费者都能收到同样的消息.
(def msg-received-by-a (.receive msg-consumer-a))
(.getText msg-received-by-a);; => "Rose"
|------------+---------------------+-------------------+-------------------|
| Name | Number Of Consumers | Messages Enqueued | Messages Dequeued |
|------------+---------------------+-------------------+-------------------|
| demo-queue | 2 | 1 | 1 |
|------------+---------------------+-------------------+-------------------|
(def msg-received-by-b (.receive msg-consumer-b))
(.getText msg-received-by-b);; => "Rose"
|------------+---------------------+-------------------+-------------------|
| Name | Number Of Consumers | Messages Enqueued | Messages Dequeued |
|------------+---------------------+-------------------+-------------------|
| demo-queue | 2 | 1 | 2 |
|------------+---------------------+-------------------+-------------------|
;; 小总结:
;; 1. 多个订阅者可以获得相同的消息
;; 2. 订阅者在管理页面的Subscribers中出现
其他
MQ的目的就是为了: 业务解耦/最终一致性/广播/错峰流控等 需要根据目标,学习如何配置.
可配置的项目很多,简单列几个:
- Queue还能持久化消息,这样就算mq服务出现故障,也能恢复.
- 消费者可以用receive同步监听,也可以setMessageListener异步监听.
- 可以设置签收机制,保障数据确保接收.