1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 |
package main import ( "log" "github.com/streadway/amqp" "time" "os" "errors" ) type Producer struct { name string logger *log.Logger connection *amqp.Connection channel *amqp.Channel done chan bool notifyClose chan *amqp.Error notifyConfirm chan amqp.Confirmation isConnected bool } const ( reconnectDelay = 5 * time.Second // 连接断开后多久重连 resendDelay = 5 * time.Second // 消息发送失败后,多久重发 resendTime = 3 // 消息重发次数 ) var ( errNotConnected = errors.New("not connected to the producer") errAlreadyClosed = errors.New("already closed: not connected to the producer") ) func NewProducer(name string, addr string) *Producer { producer := Producer{ logger: log.New(os.Stdout, "", log.LstdFlags), name: name, done: make(chan bool), } go producer.handleReconnect(addr) return &producer } // 如果连接失败会不断重连 // 如果连接断开会重新连接 func (producer *Producer) handleReconnect(addr string) { for { producer.isConnected = false log.Println("Attempting to connect") for !producer.connect(addr) { log.Println("Failed to connect. Retrying...") time.Sleep(reconnectDelay) } select { case <-producer.done: return case <-producer.notifyClose: } } } // 连接rabbitmq,以生产者的name定义一个队列 func (producer *Producer) connect(addr string) bool { conn, err := amqp.Dial(addr) if err != nil { return false } ch, err := conn.Channel() if err != nil { return false } ch.Confirm(false) _, err = ch.QueueDeclare( producer.name, true, // Durable false, // Delete when unused false, // Exclusive false, // No-wait nil, // Arguments ) if err != nil { return false } producer.changeConnection(conn, ch) producer.isConnected = true log.Println("Connected!") return true } // 监听Rabbit channel的状态 func (producer *Producer) changeConnection(connection *amqp.Connection, channel *amqp.Channel) { producer.connection = connection producer.channel = channel // channels没有必要主动关闭。如果没有协程使用它,它会被垃圾收集器收拾 producer.notifyClose = make(chan *amqp.Error) producer.notifyConfirm = make(chan amqp.Confirmation) producer.channel.NotifyClose(producer.notifyClose) producer.channel.NotifyPublish(producer.notifyConfirm) } // 三次重传的发消息 func (producer *Producer) Push(data []byte) error { if !producer.isConnected { return errors.New("failed to push push: not connected") } var currentTime = 0 for { err := producer.UnsafePush(data) if err != nil { producer.logger.Println("Push failed. Retrying...") currentTime += 1 if currentTime < resendTime { continue }else { return err } } ticker := time.NewTicker(resendDelay) select { case confirm := <-producer.notifyConfirm: if confirm.Ack { producer.logger.Println("Push confirmed!") return nil } case <- ticker.C: } producer.logger.Println("Push didn't confirm. Retrying...") } } // 发送出去,不管是否接受的到 func (producer *Producer) UnsafePush(data []byte) error { if !producer.isConnected { return errNotConnected } return producer.channel.Publish( "", // Exchange producer.name, // Routing key false, // Mandatory false, // Immediate amqp.Publishing{ DeliveryMode: 2, ContentType: "application/json", Body: data, Timestamp: time.Now(), }, ) } // 关闭连接/信道 func (producer *Producer) Close() error { if !producer.isConnected { return errAlreadyClosed } err := producer.channel.Close() if err != nil { return err } err = producer.connection.Close() if err != nil { return err } close(producer.done) producer.isConnected = false return nil } |