February 6, 2022
By: eric

ActiveMQ

引言

用队列来协调异步,是我最近的一门功课,message-queue一定会丰富我对队列的理解.

快速创建一个试验田

mac安装ActiveMQ有很多方式,这种最顺利:

brew install activemq
activemq start
http://localhost:8161/admin/ admin admin

核心对象的创建关系

                     +-------------------+
                     | ConnectionFactory |
                     +-------------------+
                               |
                               v
                        +-------------+
                        | Connection  |
                        +-------------+
                               |
                               v                 +------------------+
  +-------------+       +-------------+          |    Destination   |
  |   Message   |<------|  Session    |-----+--->|------------------|
  +-------------+       +-------------+     |    |  Queue  | Topic  |
                                            |    +------------------+
                                            |
                                  +---------+----------+
                                  |                    |
                                  v                    v
                           +-------------+      +-------------+
                           | MsgProducer |      | MsgConsumer |
                           +-------------+      +-------------+

1对1的 Queue

(ns mq-queue
  (:import
   [javax.jms Connection MessageConsumer MessageProducer Queue Session TextMessage Topic]
   org.apache.activemq.ActiveMQConnectionFactory))

(def connection-factory (new org.apache.activemq.ActiveMQConnectionFactory))

(def connection (.createConnection connection-factory))

(.start connection)

(def session (.createSession connection false, javax.jms.Session/AUTO_ACKNOWLEDGE))

(def destination (.createQueue session "demo-queue"))

(def msg-producer (.createProducer session destination))

|------------+----------------------------+---------------------+-------------------+-------------------|
| Name       | Number Of Pending Messages | Number Of Consumers | Messages Enqueued | Messages Dequeued |
|------------+----------------------------+---------------------+-------------------+-------------------|
| demo-queue |                          0 |                   0 |                 0 |                 0 |
|------------+----------------------------+---------------------+-------------------+-------------------|

(def message (.createTextMessage session "Jack"))

(.send msg-producer message) ;;此时一个consumer还没有呢.消息已经发到队列上了.

|------------+----------------------------+---------------------+-------------------+-------------------|
| Name       | Number Of Pending Messages | Number Of Consumers | Messages Enqueued | Messages Dequeued |
|------------+----------------------------+---------------------+-------------------+-------------------|
| demo-queue |                          1 |                   0 |                 1 |                 0 |
|------------+----------------------------+---------------------+-------------------+-------------------|

(def msg-consumer-a (.createConsumer session destination)) ;; 创建一个consumer

|------------+----------------------------+---------------------+-------------------+-------------------|
| Name       | Number Of Pending Messages | Number Of Consumers | Messages Enqueued | Messages Dequeued |
|------------+----------------------------+---------------------+-------------------+-------------------|
| demo-queue |                          1 |                   1 |                 1 |                 0 |
|------------+----------------------------+---------------------+-------------------+-------------------|


(def msg-received-by-a (.receive msg-consumer-a)) ;;consumer-a 取一次消息.

|------------+----------------------------+---------------------+-------------------+-------------------|
| Name       | Number Of Pending Messages | Number Of Consumers | Messages Enqueued | Messages Dequeued |
|------------+----------------------------+---------------------+-------------------+-------------------|
| demo-queue |                          0 |                   1 |                 1 |                 1 |
|------------+----------------------------+---------------------+-------------------+-------------------|

;; (println (.getText msg-received-by-a))

(def msg-consumer-b (.createConsumer session destination)) ;;这时候pending是0.

|------------+----------------------------+---------------------+-------------------+-------------------|
| Name       | Number Of Pending Messages | Number Of Consumers | Messages Enqueued | Messages Dequeued |
|------------+----------------------------+---------------------+-------------------+-------------------|
| demo-queue |                          0 |                   2 |                 1 |                 1 |
|------------+----------------------------+---------------------+-------------------+-------------------|


;; (def msg-received-by-b (.receive msg-consumer-b)) ;;这时候会阻塞在这里.


;; 那我们先放一个消息,再取呢?
;; (.send msg-producer message)
;; => nil producer再往queue里放一个msg

|------------+----------------------------+---------------------+-------------------+-------------------|
| Name       | Number Of Pending Messages | Number Of Consumers | Messages Enqueued | Messages Dequeued |
|------------+----------------------------+---------------------+-------------------+-------------------|
| demo-queue |                          1 |                   2 |                 2 |                 1 |
|------------+----------------------------+---------------------+-------------------+-------------------|

;; (def msg-received-by-b (.receive msg-consumer-b))
;; 按理说这里应该能取出来啊,但是阻塞在这里了.

;; 那我们先放一个消息.再建立消费者呢? 还是一样的.
;; (.send msg-producer message)
;; (def msg-consumer-c (.createConsumer session destination))
;; (def msg-received-by-c (.receive msg-consumer-c))

;; (.close connection)

;; 小总结:
;; 1. pending,等待取的消息数.
;; 2. Messages Enqueued 曾经有多少信息放到过queue上.
;; 3. Messages Dequeued 曾经有多少信息从queue上取走过.
;; 4. 消息先发到queue上, 后注册的consumer也能拿到.
;; 5. 虽然queue的consumer可以有多个,但只有一个能得到消息.

1对n的 Topic

(ns mq-topic-demo-1
  (:import
   [javax.jms Connection MessageConsumer MessageProducer Queue Session TextMessage Topic]
   org.apache.activemq.ActiveMQConnectionFactory))

(def connection-factory (new org.apache.activemq.ActiveMQConnectionFactory))

(def connection (.createConnection connection-factory))

(.start connection)

(def session (.createSession connection false, javax.jms.Session/AUTO_ACKNOWLEDGE))

(def destination (.createTopic session "demo-topic"))

(def msg-producer (.createProducer session destination))

|------------+---------------------+-------------------+-------------------|
| Name       | Number Of Consumers | Messages Enqueued | Messages Dequeued |
|------------+---------------------+-------------------+-------------------|
| demo-queue |                   0 |                 0 |                 0 |
|------------+---------------------+-------------------+-------------------|

(def message (.createTextMessage session "Rose"))

(.send msg-producer message)

|------------+---------------------+-------------------+-------------------|
| Name       | Number Of Consumers | Messages Enqueued | Messages Dequeued |
|------------+---------------------+-------------------+-------------------|
| demo-queue |                   0 |                 1 |                 0 |
|------------+---------------------+-------------------+-------------------|

(def msg-consumer-a (.createConsumer session destination)) ;; 创建一个consumer

|------------+---------------------+-------------------+-------------------|
| Name       | Number Of Consumers | Messages Enqueued | Messages Dequeued |
|------------+---------------------+-------------------+-------------------|
| demo-queue |                   1 |                 1 |                 0 |
|------------+---------------------+-------------------+-------------------|

;; (def msg-received-by-a (.receive msg-consumer-a))
;;这时候取消息会阻塞.
;; (println (.getText msg-received-by-a))

小总结:
topic不像queue能收到订阅前发送的消息. 只能收到订阅后发送的消息.

(ns mq-topic-demo-2
  (:import
   [javax.jms Connection MessageConsumer MessageProducer Queue Session TextMessage Topic]
   org.apache.activemq.ActiveMQConnectionFactory))

(def connection-factory (new org.apache.activemq.ActiveMQConnectionFactory))

(def connection (.createConnection connection-factory))

(.start connection)

(def session (.createSession connection false, javax.jms.Session/AUTO_ACKNOWLEDGE))

(def destination (.createTopic session "demo-topic-2"))

(def msg-producer (.createProducer session destination))

|------------+---------------------+-------------------+-------------------|
| Name       | Number Of Consumers | Messages Enqueued | Messages Dequeued |
|------------+---------------------+-------------------+-------------------|
| demo-queue |                   0 |                 0 |                 0 |
|------------+---------------------+-------------------+-------------------|

;; 先注册消费者
(def msg-consumer-a (.createConsumer session destination)) ;; 创建一个consumer

|------------+---------------------+-------------------+-------------------|
| Name       | Number Of Consumers | Messages Enqueued | Messages Dequeued |
|------------+---------------------+-------------------+-------------------|
| demo-queue |                   1 |                 0 |                 0 |
|------------+---------------------+-------------------+-------------------|

;; 像queue一样,也可以注册多个消费者
(def msg-consumer-b (.createConsumer session destination))

|------------+---------------------+-------------------+-------------------|
| Name       | Number Of Consumers | Messages Enqueued | Messages Dequeued |
|------------+---------------------+-------------------+-------------------|
| demo-queue |                   2 |                 0 |                 0 |
|------------+---------------------+-------------------+-------------------|


;; 再发消息
(def message (.createTextMessage session "Rose"))
(.send msg-producer message)

|------------+---------------------+-------------------+-------------------|
| Name       | Number Of Consumers | Messages Enqueued | Messages Dequeued |
|------------+---------------------+-------------------+-------------------|
| demo-queue |                   2 |                 1 |                 0 |
|------------+---------------------+-------------------+-------------------|


;; 两个消费者都能收到同样的消息.
(def msg-received-by-a (.receive msg-consumer-a))
(.getText msg-received-by-a);; => "Rose"
|------------+---------------------+-------------------+-------------------|
| Name       | Number Of Consumers | Messages Enqueued | Messages Dequeued |
|------------+---------------------+-------------------+-------------------|
| demo-queue |                   2 |                 1 |                 1 |
|------------+---------------------+-------------------+-------------------|

(def msg-received-by-b (.receive msg-consumer-b))
(.getText msg-received-by-b);; => "Rose"
|------------+---------------------+-------------------+-------------------|
| Name       | Number Of Consumers | Messages Enqueued | Messages Dequeued |
|------------+---------------------+-------------------+-------------------|
| demo-queue |                   2 |                 1 |                 2 |
|------------+---------------------+-------------------+-------------------|


;; 小总结:
;; 1. 多个订阅者可以获得相同的消息
;; 2. 订阅者在管理页面的Subscribers中出现

其他

MQ的目的就是为了: 业务解耦/最终一致性/广播/错峰流控等 需要根据目标,学习如何配置.

可配置的项目很多,简单列几个:

  1. Queue还能持久化消息,这样就算mq服务出现故障,也能恢复.
  2. 消费者可以用receive同步监听,也可以setMessageListener异步监听.
  3. 可以设置签收机制,保障数据确保接收.
Tags: mq