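This document describes how to forward RabbitMQ messages from one cluster to another.

1. Overview

The RabbitMQ Shovel plugin consumes messages from a queue and republishes them to an exchange on another server, which makes it possible to forward messages between clusters.

2. Installation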

sudo rabbitmq-plugins enable rabbitmq_shovel
sudo rabbitmq-plugins enable rabbitmq_shovel_management
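
After running the two commands above, it can be useful to confirm that both plugins are really enabled. The following is a minimal sketch in Python, not part of the original setup. The helper names are hypothetical, and it assumes that `rabbitmq-plugins list -m -e` prints one enabled plugin name per line.

```python
import subprocess

# The two plugins enabled in this section.
REQUIRED = ("rabbitmq_shovel", "rabbitmq_shovel_management")


def missing_plugins(listing, required=REQUIRED):
    """Return the required plugins that do not appear in `listing`.

    `listing` is assumed to hold one enabled plugin name per line.
    """
    enabled = {line.strip() for line in listing.splitlines() if line.strip()}
    return [name for name in required if name not in enabled]


def enabled_listing():
    """Run `rabbitmq-plugins list -m -e` and return its output.

    Only works on a host where RabbitMQ is installed.
    """
    result = subprocess.run(
        ["rabbitmq-plugins", "list", "-m", "-e"],
        check=True, capture_output=True, text=True,
    )
    return result.stdout


# Example with a canned listing instead of a live broker:
sample = "rabbitmq_management\nrabbitmq_shovel\n"
print(missing_plugins(sample))  # ['rabbitmq_shovel_management']
```

On a RabbitMQ host, `missing_plugins(enabled_listing())` should return an empty list once both plugins are enabled.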

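3. Usage

RabbitMQ Shovel can be configured in two ways: Static Shovels and Dynamic Shovels. Because Dynamic Shovel configuration is lost on restart, Static Shovels are recommended for production.

3.1 Static Shovels configuration

The goal is to forward the RabbitMQ queue used by trove in the demo environment (on compute-1) to the controller RabbitMQ cluster (the controller nodes). Edit /etc/rabbitmq/rabbitmq.config on compute-1 as follows: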

 {rabbitmq_shovel,
  [{shovels,
    [%% A named shovel worker.
     {trove_notifications_info_shovel,
      [

     %% List the source broker(s) from which to consume.
     %%
     {sources,
      [%% URI(s) and pre-declarations for all source broker(s).
       {brokers, ["amqp://"]},
       {declarations, [
         {'queue.declare', [ {queue, <<"notifications.info">>} ]},
         {'exchange.declare', [ {exchange, <<"trove">>}, {type, <<"topic">>} ]},
         {'queue.bind', [ {exchange, <<"trove">>}, {queue, <<"notifications.info">>}, {routing_key, <<"notifications.info">>} ]}
       ]}
      ]},

     %% List the destination broker(s) to publish to.
     {destinations,
      [%% URI(s) of the destination broker(s).
       {brokers, ["amqp://rabbit_user:rabbitmq_pwd@rabbitmq-1/%2f",
                  "amqp://rabbit_user:rabbitmq_pwd@rabbitmq-2/%2f",
                  "amqp://rabbit_user:rabbitmq_pwd@rabbitmq-3/%2f"]},
       {declarations, [
         %% {'queue.declare', [ {queue, <<"notifications.info">>} ]},
         {'exchange.declare', [ {exchange, <<"trove">>}, {type, <<"topic">>} ]} %%,
         %% {'queue.bind', [ {exchange, <<"trove">>}, {queue, <<"notifications.info">>}, {routing_key, <<"notifications.info">>} ]}
       ]}
      ]},

     %% Name of the queue to shovel messages from.
     %%
     {queue, <<"notifications.info">>},

     %% Optional prefetch count.
     %%
     %% {prefetch_count, 10},
     {prefetch_count, 1000},

     %% when to acknowledge messages:
     %% - no_ack: never (auto)
     %% - on_publish: after each message is republished
     %% - on_confirm: when the destination broker confirms receipt
     %%
     %% {ack_mode, on_confirm},
     {ack_mode, on_confirm},

     %% Overwrite fields of the outbound basic.publish.
     %%
     {publish_fields, [{exchange,    <<"trove">>}]},
                       %% {routing_key, <<"notifications.info">>}]},

     %% Static list of basic.properties to set on re-publication.
     %%
     {publish_properties, [{delivery_mode, 2}]}

     %% The number of seconds to wait before attempting to
     %% reconnect in the event of a connection failure.
     %%
     %% {reconnect_delay, 2.5}

     ]} %% End of trove_notifications_info_shovel
    ]}
   %% Rather than specifying some values per-shovel, you can specify
   %% them for all shovels here.
   %%
   %% {defaults, [{prefetch_count,     0},
   %%             {ack_mode,           on_confirm},
   %%             {publish_fields,     []},
   %%             {publish_properties, [{delivery_mode, 2}]},
   %%             {reconnect_delay,    2.5}]}
  ]},
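
The three `ack_mode` values listed in the comments differ in what happens to a message when the destination does not confirm it. The sketch below is a toy model in Python of that difference, under the assumption that a message which was already acknowledged at the source cannot be retried. It is not how the plugin is implemented, and `run_shovel` and `confirm` are hypothetical names.

```python
def run_shovel(messages, confirm, ack_mode):
    """Forward `messages` and report where each one ends up.

    confirm(msg) -> bool says whether the destination confirmed the publish.
    Returns (delivered, kept_at_source, lost).
    """
    if ack_mode not in ("no_ack", "on_publish", "on_confirm"):
        raise ValueError("unknown ack_mode: %r" % ack_mode)
    delivered, kept, lost = [], [], []
    for msg in messages:
        if confirm(msg):
            delivered.append(msg)
        elif ack_mode == "on_confirm":
            # Not acked at the source yet, so the source broker still holds it.
            kept.append(msg)
        else:
            # no_ack / on_publish: already acked at the source, nothing to retry.
            lost.append(msg)
    return delivered, kept, lost


def flaky(msg):
    """Pretend the destination fails to confirm message m2."""
    return msg != "m2"


print(run_shovel(["m1", "m2", "m3"], flaky, "on_confirm"))
print(run_shovel(["m1", "m2", "m3"], flaky, "on_publish"))
```

In this model only `on_confirm` keeps the unconfirmed message at the source, which is why the configuration above uses it.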

The trimmed-down configuration, with explanations:

 {rabbitmq_shovel,
  [{shovels,
    [{trove_notifications_info_shovel, %% shovel name
      [
       {sources, %% source broker configuration
        [{brokers, ["amqp://"]}, %% use the local broker; see http://www.rabbitmq.com/uri-spec.html
         {declarations, [
           {'queue.declare', [ {queue, <<"notifications.info">>} ]}, %% declare the queue
           {'exchange.declare', [ {exchange, <<"trove">>}, {type, <<"topic">>} ]},  %% declare the exchange
           {'queue.bind', [ {exchange, <<"trove">>}, {queue, <<"notifications.info">>}, {routing_key, <<"notifications.info">>} ]}  %% bind the queue to the exchange
         ]}
        ]},

       {destinations, %% destination RabbitMQ cluster configuration
        %% List of destination cluster nodes; only one of them is used at a time.
        %% %2f is the percent-encoded vhost "/". Both the broker and brokers forms are supported.
        [{brokers, ["amqp://rabbit_user:rabbitmq_pwd@rabbitmq-1/%2f",
                    "amqp://rabbit_user:rabbitmq_pwd@rabbitmq-2/%2f",
                    "amqp://rabbit_user:rabbitmq_pwd@rabbitmq-3/%2f"]},
         {declarations, [
           {'exchange.declare', [ {exchange, <<"trove">>}, {type, <<"topic">>} ]} %% declare the destination exchange
         ]}
        ]},
       {queue, <<"notifications.info">>}, %% source queue to consume from
       {prefetch_count, 1000}, %% maximum number of unacknowledged messages in flight; forwarding pauses once this is reached
       {ack_mode, on_confirm}, %% acknowledge each message at the source only after the destination confirms it
       {publish_fields, [{exchange,    <<"trove">>}]}, %% exchange to publish to on the destination
       {publish_properties, [{delivery_mode, 2}]} %% mark forwarded messages as persistent
      ]}
    ]}
  ]},
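
The destination URIs end in `%2f` because the vhost `/` has to be percent-encoded inside an AMQP URI. As a small illustration, the Python sketch below builds such URIs with the standard library. `amqp_uri` is a hypothetical helper, and the user, password and host names are the placeholder values from the configuration above.

```python
from urllib.parse import quote


def amqp_uri(user, password, host, vhost="/"):
    """Build an AMQP URI with user, password and vhost percent-encoded."""
    return "amqp://{}:{}@{}/{}".format(
        quote(user, safe=""), quote(password, safe=""), host, quote(vhost, safe="")
    )


hosts = ["rabbitmq-1", "rabbitmq-2", "rabbitmq-3"]
for uri in (amqp_uri("rabbit_user", "rabbitmq_pwd", h) for h in hosts):
    print(uri)
# First line printed: amqp://rabbit_user:rabbitmq_pwd@rabbitmq-1/%2F
```

Python emits the escape in upper case (`%2F`); percent-encoding is case-insensitive, so it denotes the same vhost as `%2f`.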

Restart the service:

systemctl restart rabbitmq-server.service

3.2 Dynamic Shovels

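Note: a Shovel configured this way disappears when the cluster is restarted.

Log in as a user with the administrator tag, then go to the management UI -> admin -> Shovel Management -> Add a new shovel.

Fill in the parameters as follows: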

name: trove_notification_info_shovel
Source: 
URI: amqp://
Queue: notifications.info
Destination:
URI: amqp://monitor:xiexianbin.cn@192.168.128.103/%2f
Queue: notifications.info
Prefetch count: 1000 (default)
Reconnect delay: 0 (delay before reconnecting)
Add forwarding headers: whether to add forwarding headers to forwarded messages

When enabled, the headers look like this:

x-shovelled:	
shovelled-by:	rabbit@rabbitmq-101
shovel-type:	dynamic
shovel-name:	trove_shovel_2
shovel-vhost:	/
src-uri:	amqp://
dest-uri:	amqp://192.168.128.103/%2f
src-queue:	notifications.info
dest-queue:	notifications.info

The remaining form fields:

Acknowledgement mode: On confirm
Auto-delete: never

Click save to finish.
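
A dynamic shovel is stored as a runtime parameter, so the same definition can also be set from the command line with `rabbitmqctl set_parameter shovel`. The Python sketch below builds the JSON definition for the form values above and prints the matching command. The key names follow the dynamic shovel documentation as I understand it and should be checked against the installed version; `set_parameter_command` is a hypothetical helper.

```python
import json
import shlex

# Values taken from the form fields above.
definition = {
    "src-uri": "amqp://",
    "src-queue": "notifications.info",
    "dest-uri": "amqp://monitor:xiexianbin.cn@192.168.128.103/%2f",
    "dest-queue": "notifications.info",
    "ack-mode": "on-confirm",
    "add-forwarding-headers": True,
}


def set_parameter_command(name, definition):
    """Return a shell-quoted `rabbitmqctl set_parameter shovel` command line."""
    payload = json.dumps(definition, sort_keys=True)
    parts = ["rabbitmqctl", "set_parameter", "shovel", name, payload]
    return " ".join(shlex.quote(part) for part in parts)


print(set_parameter_command("trove_notification_info_shovel", definition))
```

The printed line can be pasted into a shell on a broker node. Nothing is executed by the script itself.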

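4. High-availability notes

  1. With Dynamic Shovels, the Shovel only needs to be configured on one node of the cluster. Because Dynamic Shovel configuration is lost on restart, Static Shovels are recommended for production.
  2. With Static Shovels, the Shovel must be configured on every node of the cluster, and the service must be restarted.
  3. On the forwarding side, list all nodes of the destination cluster under destinations brokers. When forwarding, one available connection is chosen at random as the target.
  4. After a shovel is added, a queue with a generated name such as amq.gen-yWaNGiAPlqtzpUD8YjqV6g appears under Queues.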
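
Item 3 above says that one available node from the destination list is chosen at random. The Python sketch below is a toy model of that selection, not the plugin's actual code. `pick_destination` and `is_reachable` are hypothetical names, and reachability is simulated rather than tested over the network.

```python
import random


def pick_destination(uris, is_reachable, rng=random):
    """Return a randomly chosen reachable URI, or None if none is reachable."""
    candidates = list(uris)
    rng.shuffle(candidates)  # random order, so no node is always preferred
    for uri in candidates:
        if is_reachable(uri):
            return uri
    return None


brokers = [
    "amqp://rabbit_user:rabbitmq_pwd@rabbitmq-1/%2f",
    "amqp://rabbit_user:rabbitmq_pwd@rabbitmq-2/%2f",
    "amqp://rabbit_user:rabbitmq_pwd@rabbitmq-3/%2f",
]

# Simulate an outage of rabbitmq-1 and rabbitmq-3.
only_node_2_up = lambda uri: "@rabbitmq-2/" in uri
print(pick_destination(brokers, only_node_2_up))
```

With only rabbitmq-2 reachable, the function always returns the rabbitmq-2 URI, which mirrors why listing every cluster node keeps forwarding alive when a single node goes down.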

5. References

https://www.rabbitmq.com/shovel.html
https://www.rabbitmq.com/shovel-dynamic.html
https://www.rabbitmq.com/shovel-static.html
https://www.rabbitmq.com/uri-spec.html

Other configurations

Version 1: queue only

 {rabbitmq_shovel,
  [{shovels,
    [%% A named shovel worker.
     {trove_notifications_info_shovel,
      [

     %% List the source broker(s) from which to consume.
     %%
     {sources,
      [%% URI(s) and pre-declarations for all source broker(s).
       {brokers, ["amqp://"]},
       {declarations, [
         {'queue.declare', [ {queue, <<"notifications.info">>} ]}
       ]}
      ]},

     %% List the destination broker(s) to publish to.
     {destinations,
      [%% A singular version of the 'brokers' element.
       {broker, "amqp://rabbitmq_user:rabbitmq_pwd@192.168.128.103/%2f"},
       {declarations, [
         {'queue.declare', [ {queue, <<"notifications.info">>} ]}
       ]}
      ]},

     %% Name of the queue to shovel messages from.
     %%
     {queue, <<"notifications.info">>},

     %% Optional prefetch count.
     %%
     %% {prefetch_count, 10},
     {prefetch_count, 1000},

     %% when to acknowledge messages:
     %% - no_ack: never (auto)
     %% - on_publish: after each message is republished
     %% - on_confirm: when the destination broker confirms receipt
     %%
     %% {ack_mode, on_confirm},
     {ack_mode, on_confirm}

     %% Overwrite fields of the outbound basic.publish.
     %%
     %% {publish_fields, [{exchange,    <<"my_exchange">>},
     %%                   {routing_key, <<"from_shovel">>}]},

     %% Static list of basic.properties to set on re-publication.
     %%
     %% {publish_properties, [{delivery_mode, 2}]},

     %% The number of seconds to wait before attempting to
     %% reconnect in the event of a connection failure.
     %%
     %% {reconnect_delay, 2.5}

     ]} %% End of trove_notifications_info_shovel
    ]}
   %% Rather than specifying some values per-shovel, you can specify
   %% them for all shovels here.
   %%
   %% {defaults, [{prefetch_count,     0},
   %%             {ack_mode,           on_confirm},
   %%             {publish_fields,     []},
   %%             {publish_properties, [{delivery_mode, 2}]},
   %%             {reconnect_delay,    2.5}]}
  ]},

Version 2: exchange + queue, single node

 {rabbitmq_shovel,
  [{shovels,
    [%% A named shovel worker.
     {trove_notifications_info_shovel,
      [

     %% List the source broker(s) from which to consume.
     %%
     {sources,
      [%% URI(s) and pre-declarations for all source broker(s).
       {brokers, ["amqp://"]},
       {declarations, [
         {'queue.declare', [ {queue, <<"notifications.info">>} ]},
         {'exchange.declare', [ {exchange, <<"trove">>}, {type, <<"topic">>} ]},
         {'queue.bind', [ {exchange, <<"trove">>}, {queue, <<"notifications.info">>}, {routing_key, <<"notifications.info">>} ]}
       ]}
      ]},

     %% List the destination broker(s) to publish to.
     {destinations,
      [%% A singular version of the 'brokers' element.
       {broker, "amqp://rabbitmq_user:rabbitmq_pwd@192.168.128.103/%2f"},
       {declarations, [
         {'queue.declare', [ {queue, <<"notifications.info">>} ]},
         {'exchange.declare', [ {exchange, <<"trove">>}, {type, <<"topic">>} ]},
         {'queue.bind', [ {exchange, <<"trove">>}, {queue, <<"notifications.info">>}, {routing_key, <<"notifications.info">>} ]}
       ]}
      ]},

     %% Name of the queue to shovel messages from.
     %%
     {queue, <<"notifications.info">>},

     %% Optional prefetch count.
     %%
     %% {prefetch_count, 10},
     {prefetch_count, 1000},

     %% when to acknowledge messages:
     %% - no_ack: never (auto)
     %% - on_publish: after each message is republished
     %% - on_confirm: when the destination broker confirms receipt
     %%
     %% {ack_mode, on_confirm},
     {ack_mode, on_confirm},

     %% Overwrite fields of the outbound basic.publish.
     %%
     {publish_fields, [{exchange,    <<"trove">>},
                       {routing_key, <<"notifications.info">>}]},

     %% Static list of basic.properties to set on re-publication.
     %%
     {publish_properties, [{delivery_mode, 2}]}

     %% The number of seconds to wait before attempting to
     %% reconnect in the event of a connection failure.
     %%
     %% {reconnect_delay, 2.5}

     ]} %% End of trove_notifications_info_shovel
    ]}
   %% Rather than specifying some values per-shovel, you can specify
   %% them for all shovels here.
   %%
   %% {defaults, [{prefetch_count,     0},
   %%             {ack_mode,           on_confirm},
   %%             {publish_fields,     []},
   %%             {publish_properties, [{delivery_mode, 2}]},
   %%             {reconnect_delay,    2.5}]}
  ]},

Version 3: exchange + queue, cluster

 {rabbitmq_shovel,
  [{shovels,
    [%% A named shovel worker.
     {trove_notifications_info_shovel,
      [

     %% List the source broker(s) from which to consume.
     %%
     {sources,
      [%% URI(s) and pre-declarations for all source broker(s).
       {brokers, ["amqp://"]},
       {declarations, [
         {'queue.declare', [ {queue, <<"notifications.info">>} ]},
         {'exchange.declare', [ {exchange, <<"trove">>}, {type, <<"topic">>} ]},
         {'queue.bind', [ {exchange, <<"trove">>}, {queue, <<"notifications.info">>}, {routing_key, <<"notifications.info">>} ]}
       ]}
      ]},

     %% List the destination broker(s) to publish to.
     {destinations,
      [%% URI(s) of the destination broker(s).
       {brokers, ["amqp://rabbitmq_user:rabbitmq_pwd@rabbitmq-1/%2f",
                       "amqp://rabbitmq_user:rabbitmq_pwd@rabbitmq-2/%2f",
                       "amqp://rabbitmq_user:rabbitmq_pwd@rabbitmq-3/%2f"]},
       {declarations, [
         {'queue.declare', [ {queue, <<"notifications.info">>} ]},
         {'exchange.declare', [ {exchange, <<"trove">>}, {type, <<"topic">>} ]},
         {'queue.bind', [ {exchange, <<"trove">>}, {queue, <<"notifications.info">>}, {routing_key, <<"notifications.info">>} ]}
       ]}
      ]},

     %% Name of the queue to shovel messages from.
     %%
     {queue, <<"notifications.info">>},

     %% Optional prefetch count.
     %%
     %% {prefetch_count, 10},
     {prefetch_count, 1000},

     %% when to acknowledge messages:
     %% - no_ack: never (auto)
     %% - on_publish: after each message is republished
     %% - on_confirm: when the destination broker confirms receipt
     %%
     %% {ack_mode, on_confirm},
     {ack_mode, on_confirm},

     %% Overwrite fields of the outbound basic.publish.
     %%
     {publish_fields, [{exchange,    <<"trove">>},
                       {routing_key, <<"notifications.info">>}]},

     %% Static list of basic.properties to set on re-publication.
     %%
     {publish_properties, [{delivery_mode, 2}]}

     %% The number of seconds to wait before attempting to
     %% reconnect in the event of a connection failure.
     %%
     %% {reconnect_delay, 2.5}

     ]} %% End of trove_notifications_info_shovel
    ]}
   %% Rather than specifying some values per-shovel, you can specify
   %% them for all shovels here.
   %%
   %% {defaults, [{prefetch_count,     0},
   %%             {ack_mode,           on_confirm},
   %%             {publish_fields,     []},
   %%             {publish_properties, [{delivery_mode, 2}]},
   %%             {reconnect_delay,    2.5}]}
  ]},

Version 4: exchange only on the destination, single node

 {rabbitmq_shovel,
  [{shovels,
    [%% A named shovel worker.
     {trove_notifications_info_shovel,
      [

     %% List the source broker(s) from which to consume.
     %%
     {sources,
      [%% URI(s) and pre-declarations for all source broker(s).
       {brokers, ["amqp://"]},
       {declarations, [
         {'queue.declare', [ {queue, <<"notifications.info">>} ]},
         {'exchange.declare', [ {exchange, <<"trove">>}, {type, <<"topic">>} ]},
         {'queue.bind', [ {exchange, <<"trove">>}, {queue, <<"notifications.info">>}, {routing_key, <<"notifications.info">>} ]}
       ]}
      ]},

     %% List the destination broker(s) to publish to.
     {destinations,
      [%% A singular version of the 'brokers' element.
       {broker, "amqp://monitor:xiexianbin.cn@192.168.128.103/%2f"},
       {declarations, [
         %% {'queue.declare', [ {queue, <<"notifications.info">>} ]},
         {'exchange.declare', [ {exchange, <<"trove">>}, {type, <<"topic">>} ]} %% ,
         %% {'queue.bind', [ {exchange, <<"trove">>}, {queue, <<"notifications.info">>}, {routing_key, <<"notifications.info">>} ]}
       ]}
      ]},

     %% Name of the queue to shovel messages from.
     %%
     {queue, <<"notifications.info">>},

     %% Optional prefetch count.
     %%
     %% {prefetch_count, 10},
     {prefetch_count, 1000},

     %% when to acknowledge messages:
     %% - no_ack: never (auto)
     %% - on_publish: after each message is republished
     %% - on_confirm: when the destination broker confirms receipt
     %%
     %% {ack_mode, on_confirm},
     {ack_mode, on_confirm},

     %% Overwrite fields of the outbound basic.publish.
     %%
     %% {publish_fields, [{exchange,    <<"trove">>},
     %%                   {routing_key, <<"notifications.info">>}]},
     {publish_fields, [{exchange,    <<"trove">>}]},

     %% Static list of basic.properties to set on re-publication.
     %%
     {publish_properties, [{delivery_mode, 2}]}

     %% The number of seconds to wait before attempting to
     %% reconnect in the event of a connection failure.
     %%
     %% {reconnect_delay, 2.5}

     ]} %% End of trove_notifications_info_shovel
    ]}
   %% Rather than specifying some values per-shovel, you can specify
   %% them for all shovels here.
   %%
   %% {defaults, [{prefetch_count,     0},
   %%             {ack_mode,           on_confirm},
   %%             {publish_fields,     []},
   %%             {publish_properties, [{delivery_mode, 2}]},
   %%             {reconnect_delay,    2.5}]}
  ]},

That's all.