1 静态协议配置
基本示意图
两个Brokers是通过一个static的协议来网络链接的。一个Consumer链接到brokerB的一个地址上 ,当Producer在brokerA上以相同的地址发送消息时,此时它将被转移到brokerB上。也就是,BrokerA会转发消息到BrokerB上。
在brocker节点中配置,这样的话broker2的consumer可以消费broker1生产者的消息,但是反之不能。如果要实现双向通信需要配置duplex=”true”
<networkConnectors>
<networkConnector name="bridge" uri="static://(tcp://localhost:61616,tcp://192.168.249.201:61616)"/>
</networkConnectors>
这里我们用两台机器演示
broker1:192.168.249.200
broker2:192.168.249.201
1.1 broker1发消息,broker2收消息
如果没有配置,在broker1上发送消息,然后在broker2上消费,发现消费不到任何消息
然后我们在192.168.249.200做如下配置,重启服务
<networkConnectors>
<networkConnector name="bridge" uri="static://(tcp://localhost:61616,tcp://192.168.249.201:61616)"/>
</networkConnectors>
可以看到network那一栏有一个远程连接
这个时候我们在broker1上发消息
有10个消息未消费
然后在broker2上收消息
查看broker1的队列,10个消息已消费
broker1的network,有10个消息传输到网络
broker2的队列,有10个消息已消费
1.2 broker2发消息,beroker1收消息
然后我们在broker2上发消息
在broker1上收消息,发现收不到
然后我们在broker1的networkConnector添加duplex="true"
<networkConnectors>
<networkConnector name="bridge" duplex="true" uri="static://(tcp://localhost:61616,tcp://192.168.249.201:61616)"/>
</networkConnectors>
重启服务再消费可以成功消费到消息
1.3 networkConnector其他属性
1、name:默认是bridge
2、dynamicOnly:默认是false,如果为true, 持久订阅被激活时才创建对应的网路持久订阅。默认是启动时激活
3、decreaseNetworkConsumerPriority:默认是false。设定消费者优先权,如果为true,网络的消费者优先级降低
为-5。如果为false,则默认跟本地消费者一样为0
4、networkTTL :默认是1 ,网络中用于消息和订阅消费的broker数量
5、messageTTL :默认是1 ,网络中用于消息的broker数量
6、consumerTTL:默认是1 ,网络中用于消费的broker数量
7、conduitSubscriptions :默认true,是否把同一个broker的多个consumer当做一个来处理
8、dynamicallyIncludedDestinations :默认为空,要包括的动态消息地址,类似于excludedDestinations,如:
<dynamicallyIncludedDestinations>
<queue physicalName="include.test.foo"/>
<topic physicalName="include.test.bar"/>
</dynamicallyIncludedDestinations>
9、staticallyIncludedDestinations :默认为空,要包括的静态消息地址。类似于excludedDestinations,如:
<staticallyIncludedDestinations>
<queue physicalName="always.include.queue"/>
</staticallyIncludedDestinations>
10)excludedDestinations :默认为空,指定排除的地址,示例如下:
<networkConnectors>
<networkConnector uri="static://(tcp://localhost:61617)"
name="bridge" dynamicOnly="false" conduitSubscriptions="true"
decreaseNetworkConsumerPriority="false">
<excludedDestinations>
<queue physicalName="exclude.test.foo"/>
<topic physicalName="exclude.test.bar"/>
</excludedDestinations>
<dynamicallyIncludedDestinations>
<queue physicalName="include.test.foo"/>
<topic physicalName="include.test.bar"/>
</dynamicallyIncludedDestinations>
<staticallyIncludedDestinations>
<queue physicalName="always.include.queue"/>
<topic physicalName="always.include.topic"/>
</staticallyIncludedDestinations>
</networkConnector>
</networkConnectors>
11)duplex :默认false,设置是否能双向通信。
12)prefetchSize :默认是1000,持有的未确认的最大消息数量,必须大于0,因为网络消费者不能自己轮询消息。
13)suppressDuplicateQueueSubscriptions:默认false,如果为true, 重复的订阅关系一产生即被阻止。
14)bridgeTempDestinations :默认true,是否广播advisory messages来创建临时destination。
15)alwaysSyncSend :默认false,如果为true,非持久化消息也将使用request/reply方式代替oneway方式发送到远程broker。
16)staticBridge :默认false,如果为true,只有staticallyIncludedDestinations中配置的destination可以被处理。
2 消息回流功能
2.1 丢失的消息
如果有broker1和broker2通过networkConnector连接,有一个consumer1连接到broker1,一个consumer2连接到broker2,程序往broker1上面发送30条消息,这时consumer2连接到broker2消费消息,当consumer2消费了15条消息时,broker2挂掉了。 但是还剩下15条消息在broker2上面,这些消息就好像消息了,除非broker2重启了,然后有消费者连接到broker2上来消费消息,遇到这样的情况该怎么办呢?
2.2 消息回流配置
从5.6版本起,在destinationPolicy上新增的选择replayWhenNoConsumers,这个选项使得broker2上有需要转发的消息但是没有消费者时,把消息回流到它原来的broker1上,同时需要把enableAudit设置为false,为了防止消息回流后被当做重复消息而不被分发,activemq.xml配置如下
1)允许双向链接
设置networkConnector 节点的duplex属性为true
2)brocker配置文件添加如下配置
<destinationPolicy>
<policyMap>
<policyEntries>
<policyEntry queue=">" enableAudit="false">
<networkBridgeFilterFactory>
<conditionalNetworkBridgeFilterFactory replayWhenNoConsumers="true"/>
</networkBridgeFilterFactory>
</policyEntry>
</policyEntries>
</policyMap>
</destinationPolicy>
2.3 代码测试
broker1发10条消息,broker2接收5条消息后停止消费
broker2的队列
这时候我们再用broker1的消费者消费消息发现消费不到。
然后我们在broker2上配置networkBridgeFilterFactory,重启后在进行测试,发现broker1可以消费broker2上未处理的消息。
同样,如果我们想在broker2消费broker1上未处理的消息在broker1上做该配置即可