• 欢迎来到本博客,希望可以y一起学习与分享

go rabbitmq入门

笔记 benz 来源:Go 学习笔记(57)— Go 第三方库之 amqp (RabbitMQ 生产者、消费者整个流程) 2个月前 (04-13) 16次浏览 0个评论 扫描二维码
文章目录[隐藏]

前言

golang 可使用库 github.com/streadway/amqp 操作 rabbitmq 。使用下面命令安装 RabbitMQ 。

官方示例:https://github.com/rabbitmq/rabbitmq-tutorials/tree/master/go

生产者流程

在 Golang 中创建 rabbitmq 生产者基本步骤是:

  1. 连接 Connection
  2. 创建 Channel
  3. 创建或连接一个交换器
  4. 创建或连接一个队列
  5. 交换器绑定队列
  6. 投递消息
  7. 关闭 Channel
  8. 关闭 Connection

创建连接

连接的失败报错:RabbitMQ Exception (403) Reason: "no access to this vhost"
因为没有配置该用户的访问权限,可以通过
rabbitmqctl add_vhost admin
来添加,并赋予权限:
rabbitmqctl set_permissions -p 用户名 admin "." "." ".*"

代码在连接的时候,必须制定对应的vhost,否则是没有访问权限:
conn, err := amqp.Dial("amqp://sky:password@ip:5672/admin”)

创建通道

创建交换器

参数依次说明:

  • name 交换机名称
  • kind 交换机类型
  • durable 持久化标识
  • autoDelete 是否自动删除
  • internal 是否是内置交换机
  • noWait 是否等待服务器确认
  • args 其它配置

参数说明要点:

autoDelete :
自动删除功能必须要在交换器曾经绑定过队列或者交换器的情况下,处于不再使用的时候才会自动删除,如果是刚刚创建的尚未绑定队列或者交换器的交换器或者早已创建只是未进行队列或者交换器绑定的交换器是不会自动删除的。

internal :
内置交换器是一种特殊的交换器,这种交换器不能直接接收生产者发送的消息,只能作为类似于队列的方式绑定到另一个交换器,来接收这个交换器中路由的消息,内置交换器同样可以绑定队列和路由消息,只是其接收消息的来源与普通交换器不同。

noWait
当 noWait 为 true 时,声明时无需等待服务器的确认。

该通道可能由于错误而关闭。 添加一个 NotifyClose 侦听器应对任何异常。创建交换器还有一个差不多的方法( ExchangeDeclarePassive ),他主要是假定交换已存在,并尝试连接到不存在的交换将导致 RabbitMQ 引发异常,可用于检测交换器的存在。

创建队列

参数说明:

  • name 队列名称
  • durable 持久化
  • autoDelete 自动删除
  • exclusive 排他
  • noWait 是否等待服务器确认
  • args Table

参数说明要点:

exclusive 排他
排他队列只对首次创建它的连接可见,排他队列是基于连接( Connection )可见的,并且该连接内的所有信道( Channel)都可以访问这个排他队列,在这个连接断开之后,该队列自动删除,由此可见这个队列可以说是绑到连接上的,对同一服务器的其他连接不可见。

同一连接中不允许建立同名的排他队列的这种排他优先于持久化,即使设置了队列持久化,在连接断开后,该队列也会自动删除。

非排他队列不依附于连接而存在,同一服务器上的多个连接都可以访问这个队列。

autoDelete 设置是否自动删除。
为 true 则设置队列为自动删除。

自动删除的前提是:至少有一个消费者连接到这个队列,之后所有与这个队列连接的消费者都断开时,才会自动删除。

不能把这个参数错误地理解为:“当连接到此队列的所有客户端断开时,这个队列自动删除”,因为生产者客户端创建这个队列,或者没有消费者客户端与这个队列连接时,都不会自动删除这个队列。

创建队列还有一个差不多的方法( QueueDeclarePassive ),他主要是假定队列已存在,并尝试连接到不存在的队列将导致 RabbitMQ 引发异常,可用于检测队列的存在。

绑定交换器和队列

参数解析:

  • name 队列名称
  • key BindingKey 根据交换机类型来设定
  • exchange 交换机名称
  • noWait 是否等待服务器确认
  • args Table

绑定交换器(可选)

参数解析:

  • destination 目的交换器
  • key RoutingKey 路由键
  • source 源交换器
  • noWait 是否等待服务器确认
  • args Table  其它参数

生产者发送消息至交换器 source 中,交换器 source 根据路由键找到与其匹配的另一个交换器 destination ,井把消息转发到 destination 中,进而存储在 destination 绑定的队列 queue 中,某种程度上来说 destination 交换器可以看作一个队列。如图:

投递消息

参数解析:

  • exchange 交换器名称
  • key RouterKey
  • mandatory 是否为无法路由的消息进行返回处理
  • immediate 是否对路由到无消费者队列的消息进行返回处理 RabbitMQ 3.0 废弃
  • msg 消息体

参数说明要点:

mandatory
消息发布的时候设置消息的 mandatory 属性用于设置消息在发送到交换器之后无法路由到队列的情况对消息的处理方式,设置为 true 表示将消息返回到生产者,否则直接丢弃消息。

immediate
参数告诉服务器至少将该消息路由到一个队列中,否则将消息返回给生产者。 imrnediate 参数告诉服务器,如果该消息关联的队列上有消费者,则立刻投递:如果所有匹配的队列上都没有消费者,则直接将消息返还给生产者,不用将消息存入队列而等待消费者了。

RabbitMQ 3.0版本开始去掉了对 imrnediate 参数的支持。

其中 amqp.Publishing 的 DeliveryMode 如果设为 amqp.Persistent 则消息会持久化。需要注意的是如果需要消息持久化 Queue 也是需要设定为持久化才有效。

消费者流程

消费者的步骤和生产者流程基本类似,只是将生产者流程中的投递消息变为消费消息。

Rabbitmq 消费方式共有 2 种,分别是推模式拉模式

推模式

推模式是通过持续订阅的方式来消费信息, Consume 将信道( Channel )设置为接收模式,直到取消队列的订阅为止。在接收模式期间, RabbitMQ 会不断地推送消息给消费者。推送消息的个数还是会受到 channel.Qos 的限制。

参数说明:

  • queue 队列名称
  • consumer 消息者名称
  • autoAck 是否确认消费
  • exclusive 排他
  • noLocal
  • noWait bool
  • args Table

参数说明要点:

noLocal
设置为 true 则表示不能将同一个 Connection 中生产者发送的消息传送给这个 Connection 中的消费者

其中 autoAck 可以设置为 true 或者 false 。

  • 如果设为 true 则消费者一接收到就从 queue 中去除了,如果消费者处理消息中发生意外该消息就丢失了。
  • 如果设为 false 则消费者在处理完消息后,调用 msg.Ack(false) 后消息才从 queue 中去除。即便当前消费者处理该消息发生意外,只要没有执行 msg.Ack(false) 那该消息就仍然在 queue 中,不会丢失。

如果autoAck设置为 false 则表示需要手动进行 ack 消费:

拉模式

相对来说比较简单,是由消费者主动拉取信息来消费,每次只消费一条消息,同样也需要进行 ack 确认消费。


文章 go rabbitmq入门 转载需要注明出处
喜欢 (0)

您必须 登录 才能发表评论!