Home Archives Categories Tags Docs

RabbitMQ 介绍

发布时间: 更新时间: 总字数:3681 阅读时间:8m 作者: 分享

RabbitMQ是重要的消息队列组件,OpenStack采用RabbitMQ做消息交互的重要组件之一。

CentOS7上安装rabbitmq

sudo yum install rabbitmq-server -y

启动

sudo service rabbitmq-server start

常用端口

  • 4369 (epmd), 25672 (Erlang distribution)
  • 5672, 5671 (AMQP 0-9-1 without and with TLS)
  • 15672 (if management plugin is enabled)
  • 61613, 61614 (if STOMP is enabled)
  • 1883, 8883 (if MQTT is enabled)


概念

RabbitMQ 实现了 AMQP 0-9-1[Advanced Message Queuing Protocol],

其中重要的概念是 交换机[Exchange] 和 队列[Queue],Exchange 和 Queue 通过 绑定[Bind] 操作进行绑定。

生产者[Producer] 不需要关注 消息[Message] 实际被 路由[Routing] 到哪个队列,而 消费者[Consumer] 也不需要关注 Message 是从哪个 Exchange 接收的。

Exchange 分为 直连[direct]交换机、扇形[fanout]交换机、主题[topic]交换机、头[headers]交换机。

  • direct exchange 与 queue 绑定时需要指定一个 routing_key,消息被发送到 direct exchange 时也需要指定一个 routing_key,消息会被分发到具有相同 routing_key 绑定的 queue 中;

  • fanout exchange 是一个广播型的交换机,发送到 fanout exchange 的消息被分发到与之绑定的所有 queue 中;

  • topic exchange 是更复杂的 direct exchange,它的 routing_key 必须是以 . 分隔的几个字符串,一般是以多个维度分隔 (例如日志系统中以 “应用.级别” [shop.info]来制定),topic exchange 与 queue 绑定时可以指定通配的 routing_key,* 表示一个单词,# 表示任意数量的单词;

  • headers exchange 不通过 routing_key 来建立与 queue 的绑定,而是使用消息的属性来进行分发。

访问用户

guest用户被默认创建,默认在localhost可以用guest/guest来访问。

rabbitmqctl命令可以CRUD用户,见命令help

rabbitmqctl set_user_tags user_admin administrator
# rabbitmqctl set_user_tags user_admin monitoring policymaker
rabbitmqctl set_permissions -p / user_admin ".*" ".*" ".*"

角色

RabbitMQ的用户角色分类:
none、management、policymaker、monitoring、administrator

none

不能访问 management plugin

management

用户可以通过AMQP做的任何事外加:
- 列出自己可以通过AMQP登入的virtual hosts
- 查看自己的virtual hosts中的queues, exchanges 和 bindings
- 查看和关闭自己的channels 和 connections
- 查看有关自己的virtual hosts的“全局”的统计信息,包含其他用户在这些virtual hosts中的活动

policymaker

management可以做的任何事外加:
- 查看、创建和删除自己的virtual hosts所属的policies和parameters

monitoring

management可以做的任何事外加:

  • 列出所有virtual hosts,包括他们不能登录的virtual hosts
  • 查看其他用户的connections和channels
  • 查看节点级别的数据如clustering和memory使用情况
  • 查看真正的关于所有virtual hosts的全局的统计信息

administrator

policymaker和monitoring可以做的任何事外加:
- 创建和删除virtual hosts
- 查看、创建和删除users
- 查看创建和删除permissions
- 关闭其他用户的connections

日志

位于/var/log/rabbitmq

ulimit设置

We recommend allowing for at least 65536 file descriptors for user rabbitmq in production environments. 4096 should be sufficient for most development workloads

验证ulimit

cat /proc/$RABBITMQ_BEAM_PROCESS_PID/limits 或者 rabbitmqctl status

其中RABBITMQ_BEAM_PROCESS_PID的值可以从rabbitmqctl status中获取

注意事项

  • pika是rabbitmq官方推荐的python客户端
  • queue没有边界,是一个无限大的缓存。
  • producer的代码逻辑:
    • 先获得connection对象
    • 然后就获得channel对象
    • 然后queue_declare创建queue
    • basic_publish,参数routing_key指定了queue的名字,exchange指定了exchange的名字
    • connection.close()会flush缓存,然后关闭连接。
  • consumer的代码逻辑:
    • 先获得connection对象
    • 然后就获得channel对象
    • 然后queue_declare创建queue(是幂等的,没事)
    • 定义callback函数
    • basic_consume,参数queue指定queue的名字,callback传入当作回调,no_ack指定是否要ack
    • start_consuming,开始无限循环的等待。
  • 如果向一个不存在的queue发送message,会被丢弃。所以需要创建queue(queue_declare)
  • 创建queue是幂等的,多次创建不要紧
  • 如果一个queue有多个consumers,rabbitmq默认会round-robin地发送给consumers,每个consumer接受到的总message大致相同
  • 在rabbitmq内部,一个message绝对不会直接发送给queue,而是先到exchange处理
  • Receiving messages from the queue需要注册一个callback函数
  • 为了防止consumer出现异常而导致message丢失,应该使用acknowledgments。
  • 如果consumer死了(connection断、channel断、tcp断)并且没有发回ack,rabbitmq将会把message放回queue中不会删除,如果该queue有其他consumer,那么会立即发送给其他consumer,这样保证message不丢失。
  • 如果consumer一直不ack,rabbitmq将会很危险,内存使用会越来越高。
  • queue_declare时,一定要设置durable为True,否则rabbitmq一重启queue就没了
  • ack是默认开启的
  • 虽然可以将queue设置为持久化(durable),但也不能完全保证,因为rabbitmq不是每个message都fsync的,而是先到cache中,不一定被落到磁盘上
  • rabbitmq只管把queue中的messages往多个consumers扔,但不知道哪个consumer很忙,哪个很闲,可以使用channel.basic_qos(prefetch_count=1),意思是告诉rabbitmq,在consumer ack一个message之前,不要再给这个consumer发message,缺点是:如果consumers都很忙,rabbitmq中的message将堆积。
  • 如果发送时没指定exchange的名字,则会发给routing_key对应的名字
  • queue_bind方法可以做exchange和queues之间的bind操作
  • sudo rabbitmqctl list_exchanges 结果显示的amq.开头的是默认创建的。
  • consumer和rabbitmq之间是长连接。
  • 如果queue中有10条消息,2个consumers,如果其中一个consumer停掉又启动的话,也不会接收到现有的消息,现有的消息已被第1个consumer承包,为了避免这个情况出现,可以设置prefetch大小,这样每个consumer只能承包这么多的消息,而不是所有消息,好让其他的consumer也能处理现有message,如果有一个consumer hang住,并已占据一些message的话,可以重启这个consumer即可。
  • 如果多个consumer监听同一个queue,会round-robin收到,如果想一个message同时被多个consumer收到,必须有多个对应的queue,exchange的类型设置为fanout
  • durable和exclusive如果同时用的话,会如何?
  • durable为false时,如果broker重启,queue则会消失。
  • 什么情况下用topic?openstack中,由于compute node会变化,producer可以写死exchange类型为topic,动态的通过routing_key来指定。
  • 何时通过rest api,何时通过rabbitmq?如果team与team中间不可信,则需要通过rest api,rest api有权限验证机制。
  • rabbitmq是cpu、内存密集型的


HA

  • 每台机器都要有相同的cookie!
  • HA分为service HA和Message HA
  • Message HA可以通过mirror queue实现,也可以通过共享存储实现

exchange的类型

exchange会决定一个message到底是发往特定的queue,还是所有queue。这是通过exchange type的设置来决定,有4种:

  • direct
    • producer设置exchange的routing key,consumer bind时指定该routing key。可以做到通过同一个exchange来达到不同的queue
      ,direct比fanout粒度更细一点
  • topic
    • 发送给topic的routing_key不能随意写,有规则,例如stock.usd.nyse,可以通过通配符的方式来分类。
    • *表示一个word
    • #表示0或多个word
  • headers
  • fanout
    • exchange将它收到的messages扔到后端所有的queue中

exchange_declare用来创建exchange

plugins

rabbitmq支持很多plugins

  • rabbitmq-plugins enable plugin-name
  • rabbitmq-plugins disable plugin-name
  • rabbitmq-plugins list

例如: 启动管理界面,重启rabbitmq后,访问 http://ip:15672

sudo rabbitmq-plugins enable rabbitmq_management
sudo rabbitmq-plugins enable rabbitmq_management_visualiser

参考

相关文章
最近更新
最新评论
加载中...