目录
1.消息确认模式
在RabbitMQ中,消息确认主要有生产者发生确认和消费者接收确认
1.1生产者发送确认
生产者发送消息到RabbitMQ服务器,如果RabbitMQ服务器收到消息,则会给生产者一个应答,用于告诉生产者该消息已经成功到达RabbitMQ服务器中
1.2消费者接收确认
用于确认消费者是否成功消费了该条消息
消息确认实现方式有两种
- 通过事务的方式
- confirm确认机制,因为事务模式比较消耗性能,在实际工作中用的也不多
2.生产者发送确认
2.1 开启confirm模式
当Channel.Confirm(noWait bool)参数设置为false时,broker会返回一个confirm.ok表示同意发送者将当前channel信道设置为confirm模式。
其他代码和transaction模式类似,只是没有Channel.TxCommit()和Channel.TxRollback()。
1 |
err = channel.Confirm(false) |
2.2 以confirm模式发送消息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 |
package main import ( "fmt" "github.com/streadway/amqp" "rmq/db/rmq" ) var ( channel *amqp.Channel err error queue amqp.Queue conn *amqp.Connection ) func main() { conn, err = rmq.GetConn() defer conn.Close() channel, err = conn.Channel() if err != nil { fmt.Printf("error: %s \n", err.Error()) return } defer channel.Close() err = channel.Confirm(false) if err != nil { fmt.Printf("error: %s \n", err.Error()) return } queue, err = channel.QueueDeclare("confirm:message", false, false, false, false, nil) if err != nil { fmt.Printf("error: %s \n", err.Error()) return } confirms := channel.NotifyPublish(make(chan amqp.Confirmation, 1)) defer confirmOne(confirms) err = channel.Publish("", queue.Name, false, false, amqp.Publishing{ ContentType: "text/plain", Body: []byte("confirm message"), }) if err != nil { fmt.Printf("error: %s \n", err.Error()) return } fmt.Println("消息发送成功") } func confirmOne(confirms <-chan amqp.Confirmation) { if confirmed := <-confirms; confirmed.Ack { fmt.Printf("confirmed delivery with delivery tag: %d", confirmed.DeliveryTag) } else { fmt.Printf("confirmed delivery of delivery tag: %d", confirmed.DeliveryTag) } } |
消息拒绝
1 2 3 |
_ = d.Nack(false, false) // 手动拒绝消息 可以拒绝多条消息 第二个参数设置为true 将再次放入队列中 _ = d.Reject(true) // 手动拒绝消息 只能拒绝一条消息 为true 将再次放入队列中 _ = d.Ack(false) // 手动确认 |
消息幂等性
消息幂等性其实就是保证同一个消息不被消费者重复消费两次
当消费者消费完之后,通常会发送一个ack应答确认消息给生产者
但是这中间有可能因为网络中断等原因,导致生产者未能收到确认消息,有此这条消息将被重复发送给消费者消费,实际上这条消息已经被消费过了,这就是重复消费的问题!!!
1.1 如何避免重复消费
- 消息全局ID或者写个唯一标识 (时间戳,uuid等),每次消费消息之前根据消息id去判断该消息是否已被消费过,如果已经消费国,则不处理该消息,否则正常消费,并且进行入库操作(消息全局ID作为数据库表的主键,防止重复)
- 利用redis的setnx命令,给消息分配一个全局ID,只要消费过该消息,将id message k:v 形式写入redis 消费者开始消费前 先去redis查询有没有消费记录
1.2 代码演示
生产者
1 2 3 4 5 6 7 |
channel.Publish("", queue.Name, false, false, amqp.Publishing{ MessageId: uuid.NewV4().String(), Timestamp: time.Now(), ContentType: "text/plain", Body: []byte(fmt.Sprintf("hello---%d", i)), }) |
消费者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 |
go func() { for d := range megs { err = db.GetRedis().Get(d.MessageId).Err() if err != redis.Nil { // 消息已被消费 忽略 logger.Warn("消息已被消费 忽略 %s", d.MessageId) _ = d.Reject(false) continue } logger.Info("messageBody: %s", d.Body) logger.Info("messageID: %s", d.MessageId) logger.Info("Timestamp: %s", d.Timestamp.Format("2006-01-02 15:04:05")) if err := d.Ack(false); err != nil { logger.Error("消息确认失败") } else { db.GetRedis().SetNX(d.MessageId, d.Body, time.Hour*2) logger.Warn("设置消息id") } } }() |