golang之消息队列rabbitmq_golang 消息队列_中二的灰太狼的博客-CSDN博客


本站和网页 https://blog.csdn.net/weixin_50134791/article/details/120851969 的作者无关,不对其内容负责。快照谨为网络故障时之索引,不代表被搜索网站的即时页面。

golang之消息队列rabbitmq_golang 消息队列_中二的灰太狼的博客-CSDN博客
golang之消息队列rabbitmq
中二的灰太狼
已于 2023-01-03 19:53:32 修改
4384
收藏
17
文章标签:
rabbitmq
golang
消息队列
于 2021-10-19 21:07:24 首次发布
版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/weixin_50134791/article/details/120851969
版权
文章目录
消息队列的作用:收发流程docker安装官方文档消息收发模式1.简单模式2.工作队列模式3.发布订阅模式(扇出模式)4.direct(路由)模式:5.topic模式
用go操作rabbitmq写代码的思路收发模式2示例:fanout模式示例:routing(路由)模式示例topic模式
高级操作消费者确认模式:消费限流延迟消息持久化交换机持久化:队列持久化消息持久化
消息队列的作用:
异步,将同步的消息变为异步,例如我们可以使用rpc调用另一个服务,但是我们必须等待返回(同步),用mq可以变异步解耦,将单体服务拆分多个微服务,实现了分布式部署,单个服务的修改、增加或删除,不影响其他服务,不需要全部服务关闭重启抗压,由于是异步,解耦的,高并发请求到来时,我们不直接发送给服务,而是发给MQ,让服务决定什么时候接收消息,提供服务,这样就缓解了服务的压力 图示: 用户注册后发邮件和虚拟币: 异步解耦图: 抗压图:
收发流程
生产者发送消息的流程
生产者连接RabbitMQ,建立TCP连接( Connection),开启信道(Channel)生产者声明一个Exchange(交换器),并设置相关属性,比如交换器类型、是否持久化等生产者声明一个队列井设置相关属性,比如是否排他、是否持久化、是否自动删除等生产者通过 routingKey (路由Key)将交换器和队列绑定( binding )起来生产者发送消息至RabbitMQ Broker,其中包含 routingKey (路由键)、交换器等信息相应的交换器根据接收到的 routingKey 查找相匹配的队列。如果找到,则将从生产者发送过来的消息存入相应的队列中。如果没有找到,则根据生产者配置的属性选择丢弃还是回退给生产者关闭信道。关闭连接
消费者接收消息的过程
消费者连接到RabbitMQ Broker ,建立一个连接(Connection ) ,开启一个信道(Channel) 。消费者向RabbitMQ Broker 请求消费相应队列中的消息,可能会设置相应的回调函数, 以及 做一些准备工作等待RabbitMQ Broker 回应并投递相应队列中的消息, 消费者接收消息。消费者确认( ack) 接收到的消息。RabbitMQ 从队列中删除相应己经被确认的消息。关闭信道。关闭连接。
docker安装
拉取image:
docker pull rabbitmq:3.8-management-alpine
启动容器: 5672进行通信,15672 ,web管理工具
docker run -d --name rmq \
-e RABBITMQ_DEFAULT_USER=用户名 \
-e RABBITMQ_DEFAULT_PASS=密码 \
-p 15672:15672 \
-p 5672:5672 \
rabbitmq:3.8-management-alpine
官方文档
官方文档
消息收发模式
明确连个概念,exchange(路由) queue(队列) 工作模式: 以下用p 代指生产者,用 c 代指消费者,用 x 代指 exchange
1.简单模式
p发给队列,单个c消费,这里用的默认exchange,收发模式是direct
2.工作队列模式
p发给队列,多个c消费,这里用的默认exchange,收发模式是direct
3.发布订阅模式(扇出模式)
fandout模式:p将消息发给x,x将同一个消息发给所有q,c 按 1,2方式消费q的消息
4.direct(路由)模式:
p 按照路由Routing Key将消息发给q,(一个消息可能发给多个q),c 按 1,2方式消费q的消息
5.topic模式
p 按照路由Routing Key将消息发给q,(一个消息可能发给多个q),c 按 1,2方式消费q的消息,与4的区别是topic可以有通配符匹配
用go操作rabbitmq
写代码的思路
在初始化中完成
声明exchange声明queue将queue与key、exchange绑定
然后用conn.Channel()和rabbitmq交互
go get github.com/rabbitmq/amqp091-go
收发模式2示例:
package main
import (
"fmt"
"github.com/streadway/amqp"
"time"
func main() {
conn, err := amqp.Dial("amqp://用户名:密码@IP:端口/")
if err != nil {
panic(err)
ch, err := conn.Channel()
if err != nil {
panic(err)
//durable 服务器重启还有queue autoDelete 自动删除 exclusive 独占连接,这个q别人连不上 noWait 是否等待返回的一些状态结果
//关于queue的一些设置
q, err := ch.QueueDeclare("go_q1", true, false, false, false, nil)
if err != nil {
panic(err)
// 开启消费者
go consume("c1",conn, q.Name)
go consume("c2",conn, q.Name)
i := 0
for {
i++
err := ch.Publish("", q.Name, false, false, amqp.Publishing{
Body: []byte(fmt.Sprintf("message %d", i)),
})
if err != nil {
panic(err)
time.Sleep(200 * time.Millisecond)
func consume(name string,conn *amqp.Connection, q string) {
ch, err := conn.Channel()
if err != nil {
panic(err)
msgs, err := ch.Consume(q,name,true, false,false,false,nil)
if err != nil {
panic(err)
for msg := range msgs {
fmt.Printf("%s:%s\n",name,msg.Body)
fanout模式示例:
package main
import (
"fmt"
"github.com/streadway/amqp"
"time"
func main() {
conn, err := amqp.Dial("amqp://用户名:密码@IP:端口/")
if err != nil {
panic(err)
ch, err := conn.Channel()
if err != nil {
panic(err)
err = ch.ExchangeDeclare("ex","fanout",true,false,false,false,nil)
if err != nil {
panic(err)
go subscribe(conn,"ex")
go subscribe(conn,"ex")
i := 0
for {
i++
err := ch.Publish("ex", "", false, false, amqp.Publishing{
Body: []byte(fmt.Sprintf("message %d", i)),
})
if err != nil {
panic(err)
time.Sleep(200 * time.Millisecond)
func subscribe(conn *amqp.Connection, ex string) {
ch, err := conn.Channel()
if err != nil {
panic(err)
defer ch.Close()
q, err := ch.QueueDeclare("", false, true, false, false, nil)
if err != nil {
panic(err)
defer ch.QueueDelete(q.Name, false,false,false)
err = ch.QueueBind(q.Name,"",ex,false,nil)
if err != nil {
panic(err)
consume("c3",ch,q.Name)
func consume(name string,ch *amqp.Channel, q string) {
msgs, err := ch.Consume(q,name,true, false,false,false,nil)
if err != nil {
panic(err)
for msg := range msgs {
fmt.Printf("%s:%s\n",name,msg.Body)
写代码的时候注意,收发消息,一定要在不同的channel进行,大家可以把channel认为是一个tcp连接的分割。建立exchang的channel可以进行发消息,不可以进行收消息 可以看到有一个exchange,对应2个queue。对应一条tcp连接(分成3个channel,1个向exchange发,2个从queue收)
routing(路由)模式示例
package main
import (
"fmt"
"github.com/streadway/amqp"
"strconv"
"time"
const (
exchangeName = "ex_routing"
key1 = "key1"
key2 = "key2"
queueBindKey1 = "queue1"
queueBindKey2 = "queue2"
func main() {
dsn := fmt.Sprintf("amqp://%s:%s@%s:%d/", "xxxxx", "xxxxx", "xxxxx", "xxxxx")
conn, err := amqp.Dial(dsn)
if err != nil {
panic(err)
ch, err := conn.Channel()
if err != nil {
panic(err)
InitMQ(ch,queueBindKey1,key1,exchangeName)
InitMQ(ch,queueBindKey2,key2,exchangeName)
go subscribe(conn, key1,queueBindKey1)
go subscribe(conn, key2,queueBindKey2)
i := 0
for {
i++
sendMessage(ch,exchangeName,key1,strconv.Itoa(i))
sendMessage(ch,exchangeName,key2,strconv.Itoa(i))
time.Sleep(500 * time.Millisecond)
func InitMQ(ch *amqp.Channel, queue,key,exchange string) {
// 声明 exchange
err := ch.ExchangeDeclare(exchangeName, "direct", true, false, false, false, nil)
if err != nil {
panic(err)
// 声明 queue
_, err = ch.QueueDeclare(queue, false, false, false, false, nil)
if err != nil {
panic(err)
// 将 queue 与 exchange 和 key 绑定
err = ch.QueueBind(queue, key, exchange, false, nil)
if err != nil {
panic(err)
func sendMessage(ch *amqp.Channel, exchange string, key string,message string) {
err := ch.Publish(exchange, key, false, false, amqp.Publishing{
Body: []byte(fmt.Sprintf("send to %s, message: %v", key,message)),
})
if err != nil {
panic(err)
func subscribe(conn *amqp.Connection, key string,queue string) {
ch, err := conn.Channel()
if err != nil {
panic(err)
defer ch.Close()
key = fmt.Sprintf("%s haha",key)
consume(key, ch, queue)
func consume(name string, ch *amqp.Channel, queue string) {
msgs, err := ch.Consume(queue, name, true, false, false, false, nil)
if err != nil {
panic(err)
for msg := range msgs {
fmt.Printf("%s:%s\n", name, msg.Body)
绑定图:
topic模式
是rabbitmq最高级模式了,没啥说的,重点就是,*匹配1个,#匹配0或多个
package main
import (
"fmt"
"github.com/streadway/amqp"
"log"
"time"
const (
TopicExchange = "topicExchange"
BindingKey1 = "*.*.red"
BindingKey2 = "*.error.*"
BindingKey3 = "shanghai.*.*"
Queue1 = "queue1"
Queue2 = "queue2"
Queue3 = "queue3"
RoutingKey1 = "beijing.error"
RoutingKey2 = "shanghai.fatal.red"
func main() {
dsn := fmt.Sprintf("amqp://%s:%s@%s:%d/", "用户名", "密码", "ip", port)
conn, err := amqp.Dial(dsn)
if err != nil {
panic(err)
ch, err := conn.Channel()
if err != nil {
panic(err)
InitMQ(ch, Queue1, BindingKey1, TopicExchange)
InitMQ(ch, Queue2, BindingKey2, TopicExchange)
InitMQ(ch, Queue3, BindingKey3, TopicExchange)
ch2 := GenChannel(conn)
go subscribe(ch2, BindingKey1, Queue1, func(msgs <-chan amqp.Delivery, s string) {
for msg := range msgs {
fmt.Printf("%v 绑定 %v 收到消息:%v\n", Queue1,BindingKey1,string(msg.Body))
})
go subscribe(ch2, BindingKey2, Queue2, func(msgs <-chan amqp.Delivery, s string) {
for msg := range msgs {
fmt.Printf("%v 绑定 %v 收到消息:%v\n", Queue2,BindingKey2,string(msg.Body))
})
go subscribe(ch2, BindingKey3, Queue3, func(msgs <-chan amqp.Delivery, s string) {
for msg := range msgs {
fmt.Printf("%v 绑定 %v 收到消息:%v\n", Queue3,BindingKey3,string(msg.Body))
})
for {
sendMessage(ch, TopicExchange, RoutingKey1, "beijing.error")
sendMessage(ch, TopicExchange, RoutingKey2, "shanghai.fatal.red")
time.Sleep(500 * time.Millisecond)
func GenChannel(conn *amqp.Connection) *amqp.Channel {
ch, err := conn.Channel()
if err != nil {
log.Fatal(err)
return ch
func InitMQ(ch *amqp.Channel, queue, key, exchange string) {
// 声明 exchange
err := ch.ExchangeDeclare(exchange, "topic", true, false, false, false, nil)
if err != nil {
panic(err)
// 声明 queue
_, err = ch.QueueDeclare(queue, false, false, false, false, nil)
if err != nil {
panic(err)
// 将 queue 与 exchange 和 key 绑定
err = ch.QueueBind(queue, key, exchange, false, nil)
if err != nil {
panic(err)
func sendMessage(ch *amqp.Channel, exchange string, key string, message string) {
err := ch.Publish(exchange, key, false, false, amqp.Publishing{
Body: []byte(fmt.Sprintf("send to %s, message: %v", key, message)),
})
if err != nil {
panic(err)
func subscribe(ch *amqp.Channel, key string, queue string, callback func(<-chan amqp.Delivery, string)) {
msgs, err := ch.Consume(queue, key, true, false, false, false, nil)
if err != nil {
panic(err)
callback(msgs, key)
高级操作
消费者确认模式:
将消费消息,设置为手动确认: 成功时确认:msg.Ack(false) 失败时消息处理方式:
不进行确认,会进入unacked,当消费者重启后,或者同一队列的其他消费者可以消费 重新入列
msg.Reject(true)
丢弃
msg.Reject(false)
package main
import (
"fmt"
"github.com/streadway/amqp"
"log"
"time"
const (
TopicExchange = "topicExchange"
BindingKey1 = "*.*.red"
BindingKey2 = "*.error.*"
BindingKey3 = "shanghai.*.*"
Queue1 = "queue1"
Queue2 = "queue2"
Queue3 = "queue3"
RoutingKey1 = "beijing.error"
RoutingKey2 = "shanghai.fatal.red"
func main() {
dsn := fmt.Sprintf("amqp://%s:%s@%s:%d/", "xxxxx", "xxxxx", "xxxxx", "xxxxx")
conn, err := amqp.Dial(dsn)
if err != nil {
panic(err)
ch, err := conn.Channel()
if err != nil {
panic(err)
InitMQ(ch, Queue1, BindingKey1, TopicExchange)
InitMQ(ch, Queue2, BindingKey2, TopicExchange)
InitMQ(ch, Queue3, BindingKey3, TopicExchange)
ch2 := GenChannel(conn)
go subscribe(ch2, BindingKey1, Queue1, func(msgs <-chan amqp.Delivery, s string) {
for msg := range msgs {
fmt.Printf("%v 绑定 %v 收到消息:%v\n", Queue1,BindingKey1,string(msg.Body))
// false 拒绝重新入列,即丢弃
//msg.Reject(false)
// true 重新入列
msg.Reject(true)
})
go subscribe(ch2, BindingKey2, Queue2, func(msgs <-chan amqp.Delivery, s string) {
for msg := range msgs {
fmt.Printf("%v 绑定 %v 收到消息:%v\n", Queue2,BindingKey2,string(msg.Body))
msg.Ack(false)
})
go subscribe(ch2, BindingKey3, Queue3, func(msgs <-chan amqp.Delivery, s string) {
for msg := range msgs {
fmt.Printf("%v 绑定 %v 收到消息:%v\n", Queue3,BindingKey3,string(msg.Body))
msg.Ack(false)
})
cycleCount := 1
for i:=0;i<cycleCount;i++ {
sendMessage(ch, TopicExchange, RoutingKey1, "beijing.error")
sendMessage(ch, TopicExchange, RoutingKey2, "shanghai.fatal.red")
time.Sleep(500 * time.Millisecond)
select {}
func GenChannel(conn *amqp.Connection) *amqp.Channel {
ch, err := conn.Channel()
if err != nil {
log.Fatal(err)
return ch
func InitMQ(ch *amqp.Channel, queue, key, exchange string) {
// 声明 exchange
err := ch.ExchangeDeclare(exchange, "topic", true, false, false, false, nil)
if err != nil {
panic(err)
// 声明 queue
_, err = ch.QueueDeclare(queue, false, false, false, false, nil)
if err != nil {
panic(err)
// 将 queue 与 exchange 和 key 绑定
err = ch.QueueBind(queue, key, exchange, false, nil)
if err != nil {
panic(err)
func sendMessage(ch *amqp.Channel, exchange string, key string, message string) {
err := ch.Publish(exchange, key, false, false, amqp.Publishing{
Body: []byte(fmt.Sprintf("send to %s, message: %v", key, message)),
})
if err != nil {
panic(err)
func subscribe(ch *amqp.Channel, key string, queue string, callback func(<-chan amqp.Delivery, string)) {
msgs, err := ch.Consume(queue, key, false, false, false, false, nil)
if err != nil {
panic(err)
callback(msgs, key)
消费限流
限制未ack的最多有5个,必须设置为手动ack才有效
示例:
package main
import (
"fmt"
"github.com/streadway/amqp"
"log"
"time"
const (
TopicExchange = "topicExchange"
BindingKey1 = "*.*.red"
BindingKey2 = "*.error.*"
BindingKey3 = "shanghai.*.*"
Queue1 = "queue1"
Queue2 = "queue2"
Queue3 = "queue3"
RoutingKey1 = "beijing.error"
RoutingKey2 = "shanghai.fatal.red"
func main() {
dsn := fmt.Sprintf("amqp://%s:%s@%s:%d/", "xxxx", "xxxx", "xxxx", xxxx)
conn, err := amqp.Dial(dsn)
if err != nil {
panic(err)
ch, err := conn.Channel()
if err != nil {
panic(err)
InitMQ(ch, Queue1, BindingKey1, TopicExchange)
InitMQ(ch, Queue2, BindingKey2, TopicExchange)
InitMQ(ch, Queue3, BindingKey3, TopicExchange)
ch2 := GenChannel(conn)
// 限制未ack的最多有5个,必须设置为手动ack才有效
ch2.Qos(5,0,false)
go subscribe(ch2, BindingKey1, Queue1, func(msgs <-chan amqp.Delivery, s string) {
for msg := range msgs {
go func(msg amqp.Delivery) {
fmt.Printf("%v 绑定 %v 收到消息:%v\n", Queue1,BindingKey1,string(msg.Body))
time.Sleep(time.Second * 5)
msg.Ack(false)
}(msg)
})
go subscribe(ch2, BindingKey2, Queue2, func(msgs <-chan amqp.Delivery, s string) {
for msg := range msgs {
fmt.Printf("%v 绑定 %v 收到消息:%v\n", Queue2,BindingKey2,string(msg.Body))
msg.Ack(false)
})
go subscribe(ch2, BindingKey3, Queue3, func(msgs <-chan amqp.Delivery, s string) {
for msg := range msgs {
fmt.Printf("%v 绑定 %v 收到消息:%v\n", Queue3,BindingKey3,string(msg.Body))
msg.Ack(false)
})
cycleCount := 10
for i:=0;i<cycleCount;i++ {
sendMessage(ch, TopicExchange, RoutingKey1, "beijing.error")
sendMessage(ch, TopicExchange, RoutingKey2, "shanghai.fatal.red")
time.Sleep(500 * time.Millisecond)
select {}
func GenChannel(conn *amqp.Connection) *amqp.Channel {
ch, err := conn.Channel()
if err != nil {
log.Fatal(err)
return ch
func InitMQ(ch *amqp.Channel, queue, key, exchange string) {
// 声明 exchange
err := ch.ExchangeDeclare(exchange, "topic", true, false, false, false, nil)
if err != nil {
panic(err)
// 声明 queue
_, err = ch.QueueDeclare(queue, false, false, false, false, nil)
if err != nil {
panic(err)
// 将 queue 与 exchange 和 key 绑定
err = ch.QueueBind(queue, key, exchange, false, nil)
if err != nil {
panic(err)
func sendMessage(ch *amqp.Channel, exchange string, key string, message string) {
err := ch.Publish(exchange, key, false, false, amqp.Publishing{
Body: []byte(fmt.Sprintf("send to %s, message: %v", key, message)),
})
if err != nil {
panic(err)
func subscribe(ch *amqp.Channel, key string, queue string, callback func(<-chan amqp.Delivery, string)) {
msgs, err := ch.Consume(queue, key, false, false, false, false, nil)
if err != nil {
panic(err)
callback(msgs, key)
延迟消息
借助rabbitmq-delayed-message-exchange插件实现(需要先安装好)
package main
import (
"fmt"
"github.com/streadway/amqp"
"log"
"time"
const (
TopicExchange = "topicExchange"
DelayExchange = "delayExchange"
BindingKey1 = "*.*.red"
BindingKey2 = "*.error.#"
BindingKey3 = "shanghai.*.*"
Queue1 = "queue1"
Queue2 = "queue2"
Queue3 = "queue3"
RoutingKey1 = "beijing.error"
RoutingKey2 = "shanghai.fatal.red"
func main() {
dsn := fmt.Sprintf("amqp://%s:%s@%s:%d/", "xxxx", "xxxx", "xxxx", 5672)
conn, err := amqp.Dial(dsn)
if err != nil {
panic(err)
ch, err := conn.Channel()
if err != nil {
panic(err)
InitMQ(ch, Queue1, BindingKey1, TopicExchange)
InitMQ(ch, Queue2, BindingKey2, TopicExchange)
InitMQ(ch, Queue3, BindingKey3, TopicExchange)
InitDelayMQ(ch, Queue2, "", DelayExchange)
ch2 := GenChannel(conn)
// 限制未ack的最多有5个,必须设置为手动ack才有效
ch2.Qos(5, 0, false)
go subscribe(ch2, BindingKey1, Queue1, func(msgs <-chan amqp.Delivery, s string) {
for msg := range msgs {
go func(msg amqp.Delivery) {
fmt.Printf("%v 绑定 %v 收到消息:%v\n", Queue1, BindingKey1, string(msg.Body))
msg.Ack(false)
}(msg)
})
go subscribe(ch2, BindingKey2, Queue2, func(msgs <-chan amqp.Delivery, s string) {
for msg := range msgs {
fmt.Println(time.Now())
fmt.Printf("%v 绑定 %v 收到消息:%v\n", Queue2, BindingKey2, string(msg.Body))
isFail := true
// 如果失败发送延迟消息给
if isFail {
delay,ok := msg.Headers["x-delay"].(int32)
if ok {
delay = delay * 2
fmt.Println(delay)
}else{
delay = 1000
sendDelayMessage(ch, DelayExchange, "", string(msg.Body), int(delay))
msg.Reject(false)
} else {
msg.Ack(false)
})
go subscribe(ch2, BindingKey3, Queue3, func(msgs <-chan amqp.Delivery, s string) {
for msg := range msgs {
fmt.Printf("%v 绑定 %v 收到消息:%v\n", Queue3, BindingKey3, string(msg.Body))
msg.Ack(false)
})
// 设置confirm,发送端消息确认
//var notifyConfirm chan amqp.Confirmation
//SetConfirm(ch, notifyConfirm)
//go ListenConfirm(notifyConfirm)
//var notifyReturn chan amqp.Return
//NotifyReturn(notifyReturn,ch)
//go ListReturn(notifyReturn)
cycleCount := 1
for i := 0; i < cycleCount; i++ {
fmt.Println(i)
//sendDelayMessage(ch, DelayExchange, "", "beijing.error-----------------",3000)
sendMessage(ch, TopicExchange, RoutingKey1, "beijing.error")
//sendMessage(ch, TopicExchange, RoutingKey2, "shanghai.fatal.red")
time.Sleep(500 * time.Millisecond)
select {}
func GenChannel(conn *amqp.Connection) *amqp.Channel {
ch, err := conn.Channel()
if err != nil {
log.Fatal(err)
return ch
func InitMQ(ch *amqp.Channel, queue, key, exchange string) {
// 声明 exchange
err := ch.ExchangeDeclare(exchange, "topic", true, false, false, false, nil)
if err != nil {
panic(err)
// 声明 queue
_, err = ch.QueueDeclare(queue, false, false, false, false, nil)
if err != nil {
panic(err)
// 将 queue 与 exchange 和 key 绑定
err = ch.QueueBind(queue, key, exchange, false, nil)
if err != nil {
panic(err)
func InitDelayMQ(ch *amqp.Channel, queue, key, exchange string) {
//申明交换机
err := ch.ExchangeDeclare(exchange, "x-delayed-message",
false, false, false, false,
map[string]interface{}{"x-delayed-type": "direct"})
if err != nil {
log.Fatal(err)
// 声明 queue
_, err = ch.QueueDeclare(queue, false, false, false, false, nil)
if err != nil {
panic(err)
// 将 queue 与 exchange 和 key 绑定
err = ch.QueueBind(queue, key, exchange, false, nil)
if err != nil {
panic(err)
func sendMessage(ch *amqp.Channel, exchange string, key string, message string) {
err := ch.Publish(exchange, key, true, false, amqp.Publishing{
Body: []byte(fmt.Sprintf("%v", message)),
})
if err != nil {
panic(err)
func sendDelayMessage(ch *amqp.Channel, exchange string, key string, message string, delay int) {
err := ch.Publish(exchange, key, true, false, amqp.Publishing{
Headers: map[string]interface{}{"x-delay": delay},
Body: []byte(fmt.Sprintf("%v", message)),
})
if err != nil {
panic(err)
func subscribe(ch *amqp.Channel, key string, queue string, callback func(<-chan amqp.Delivery, string)) {
msgs, err := ch.Consume(queue, key, false, false, false, false, nil)
if err != nil {
panic(err)
callback(msgs, key)
func SetConfirm(ch *amqp.Channel, notifyConfirm chan amqp.Confirmation) {
err := ch.Confirm(false)
if err != nil {
log.Println(err)
notifyConfirm = ch.NotifyPublish(make(chan amqp.Confirmation))
func ListenConfirm(notifyConfirm chan amqp.Confirmation) {
for ret := range notifyConfirm {
if ret.Ack {
fmt.Println("消息发送成功")
} else {
fmt.Println("消息发送失败")
func NotifyReturn(notifyReturn chan amqp.Return, channel *amqp.Channel) {
notifyReturn = channel.NotifyReturn(make(chan amqp.Return))
func ListReturn(notifyReturn chan amqp.Return) {
ret := <-notifyReturn
if string(ret.Body) != "" {
fmt.Println("消息没有投递到队列:", string(ret.Body))
panic("skfh")
持久化
交换机持久化:
交换机持久化是指将交换机的属性数据存储在磁盘上,当 MQ 的服务器发生意外或关闭之后,在重启 RabbitMQ 时不需要重新手动或执行代码去创建交换机了,交换机会自动被创建,相当于一直存在。
队列持久化
如果不将队列设置为持久化,那么在 RabbitMQ 服务重启之后,相关队列的元数据会丢失,数据也会丢失。队列都没有了,消息也找不到地方存储了。
消息持久化
RabbitMQ 的消息是依附于队列存在的,所以要想消息持久化,那么前提是队列也必须设置持久化。 在创建消息的时候,添加一个持久化消息的属性(将 delivery_mode 设置为 2)。
设置了队列和消息的持久化,当 RabbitMQ 服务重启之后,消息依旧会存在;
仅设置队列持久化,重启之后消息会丢失;
仅设置消息持久化,重启之后队列会消失,因此消息也就丢失了,所以只设置消息持久化而不设置队列持久化是没有意义的;
将所有的消息都设置为持久化(写入磁盘的速度比写入内存的速度慢的多),可能会影响 RabbitMQ 的性能,对于可靠性不是那么高的消息可以不采用持久化来提高 RabbitMQ 的吞吐量。
中二的灰太狼
关注
关注
点赞
17
收藏
觉得还不错?
一键收藏
打赏
知道了
评论
golang之消息队列rabbitmq
之前我写过一篇关于rocketmq的文章,因为rabbitmq的使用更广泛,所以写一篇关于rabbitmq,及go如何操作rabbitmq的笔记。消息队列的作用:异步,将同步的消息变为异步解耦,将单体服务拆分多个微服务,实现了分布式部署,单个服务的修改、增加或删除,不影响其他服务,不需要全部服务关闭重启抗压,由于是异步,解耦的,高并发请求到来时,我们不直接发送给服务,而是发给MQ,让服务决定什么时候接收消息,提供服务,这样就缓解了服务的压力图示:用户注册后发邮件和虚拟币:异步解耦图:抗压
复制链接
扫一扫
golang版本的RabbitMQ消息订阅的封装,多生产者多消费者。
04-14
golang版本的RabbitMQ消息订阅的封装,可以多个生产者,多个消息订阅者,每个队列可以创建多个协程处理;
可能有些消费者需求需要分布式部署,那需要指定队列名,并且需要设置队列持久化,以及排他性QueueDeclare.exclusive=false设置为非排他性.
golang实现zeromq的各种通讯模式
04-19
使用的zeromq版本为:"github.com/pebbe/zmq4"
含有req-rep pub-sub push-pull router-dealer req-router dealer-rep loadbalance)(负载均衡)的实现
参与评论
您还未登录,请先
登录
后发表或查看评论
Golang使用RabbitMQ的四种工作模式
m0_52530105的博客
06-16
67
​RabbitMQ 2007年发布,是一个在 AMQP (高级消息队列协议)基础上完成的,可复用的企业消息系统,是当前最主流的消息中间件之一。RabbitMQ是同Erlang语言开发对于一般的使用场景而言是一个非常不错的选择。
rmqmonitor:Go(lang)编写的用于监视RabbitMQ的代理
05-18
rmqmonitor
rmqmonitor是用于以监视的代理。
拱门要求
Linux
建造
$make build
代理商启动
$/bin/bash control.sh start | stop | restart
它将在您的当前路径中创建一个临时目录var 。
指标
概述指标:
钥匙
标签
类型
笔记
rabbitmq.overview.publishRate
测量
消息发布率
rabbitmq.overview.deliverRate
测量
信息传递率
rabbitmq.overview.redeliverRate
测量
邮件重新传递的速率
rabbitmq.overview.ackRate
测量
消息确认率
rabbitmq.overview.msgs总计
测量
消息总数(未确认和准备就绪的总和
rabbitmq.overview.msgsReadyTotal
GO实现RabbitMQ【订阅/发布】
Follow_found的博客
06-08
571
RabbitMQ是实现了高级消息队列协议()的开源消息代理软件(亦称面向消息的中间件)。RabbitMQ服务器是用Erlang语言编写的,而群集和故障转移是构建在开放电信平台框架上的。所有主要的编程语言均有与代理接口通讯的客户端库。Producer (生产者) : 消息的生产者,投递方Consumer (消费者) : 消息的消费者RabbitMQ Broker (RabbitMQ 代理) : RabbitMQ 服务节点(单机情况中,就是代表RabbitMQ服务器)Queue (队列) : 在RabbitMQ
消息队列RabbitMQ、整合SpringBoot
weixin_44677487的博客
08-11
169
一、消息队列相关概念
消息队列(message queue)
消息队列是应用系统之间通信的方法,本质是队列,具有先进先出(FIFO)的特点,队列的元素是消息,所以叫消息队列,是一种中间件。
JMS
Java Message Service,即 Java 消息服务,是基于 JVM 消息代理的规范。实现:ActiveMQ 。
AMQP
Advanced Message Queuing Protocol,即高级消息队列协议,也是一个消息代理的规范,兼容 JMS。实现:RabbitMQ。
Spring中对消息队列的
Go语言使用RabbitMQ
稚与的博客
03-14
6448
基本概念
什么是消息队列
消息队列是一种应用(进程)间的通信方式。
生产者只需把消息发布到MQ,消费者只需重MQ中取出,可靠传递由消息队列中的消息系统来确保。
消息队列有什么用
消息队列是一种异步协作机制,最根本的用处在于将一些不需要即时生效的操作拆分出来异步执行,从而达到可靠传递、流量削峰等目的。
比如如果有一个业务需要发送短信,可以在主流程完成之后发送消息到MQ后,让主流程完结。而由另外的线程拉取MQ的消息,完成发送短信的操作。
常用的消息队列
常用的MQ大概有ActiveMQ、RabbitMQ、R
golang消息队列kafka
zorsea的博客
12-11
1336
golang消息队列kafka
go 使用NSQ(消息队列)
General_zy的博客
04-10
631
它们消耗的资源很少,可以与其他服务共存。每个channel都会收到topic所有消息的副本,实际上下游的服务是通过对应的channel来消费topic消息。总而言之,消息是从topic -> channel(每个channel接收该topic的所有消息的副本)多播的,但是从channel -> consumers均匀分布(每个消费者接收该channel的一部分消息)。流量削峰:类似秒杀(大秒)等场景下,某一时间可能会产生大量的请求,使用消息队列能够为后端处理请求提供一定的缓冲区,保证后端服务的稳定性。
go实现消息队列
CY2333333的博客
04-29
2867
使用Golang实现一个消息队列,具体要满足以下功能:
可以向消息队列发送消息和拉取消息
可以向消息队列发送消息和拉取消息
发送的消息不能超过指定的容量
拉取消息支持拉取指定数量的消息,如果不满足指定的数量,等待超时以后返回超时时间内拉取的所有消息(不会超过指定的数量)
使用Golang中的基本数据结构和功能来实现
package main
import (
"time"
// MessageQueue define the interface for the messag
手把手教姐姐写消息队列(golang)- 使用channel实现消息队列
qq_39397165的博客
09-19
4847
前言
这周姐姐入职了新公司,老板想探探他的底,看了一眼他的简历,呦呵,精通kafka,这小姑娘有两下子,既然这样,那你写一个消息队列吧。因为要用go语言写,这可给姐姐愁坏了。赶紧来求助我,我这么坚贞不屈一人,在姐姐的软磨硬泡下还是答应他了,所以接下来我就手把手教姐姐怎么写一个消息队列。下面我们就来看一看我是怎么写的吧~~~。
本代码已上传到我的github:
有需要的小伙伴,可自行下载,顺便给个小星星吧~~~
什么是消息队列
姐姐真是把我愁坏了,自己写的精通kafka,竟然不知道什么是消息队列,于是
使用golang实现的推送系统 结合rabbitmq消息队列 gin-web实现消息接收推送
02-18
golang 推送系统 单机 客户端通过websocket连接到服务端 ...服务端从rabbitmq接收消息,根据消息所属通过websocket推送到具体用户 服务端从http api接收消息,根据消息所属通过websocket推送到具体用户
Golang基于amqp协议实现rabbitMQ队列消费
01-07
发布消息 package main import ( github.com/streadway/amqp log ) //我们还需要一个辅助函数来检查每个amqp调用的返回值: func failOnError(err error, msg string) { if err != nil { log.Fatalf(%s: %s, ...
libmq:golang中的Rabbitmq消息队列监视库可用于Openstack
04-30
libmq libmq是golang中的Rabbitmq消息队列监视库。 该库可用于监视Rabbitmq队列,例如Openstack中的那些。 可以在monitor目录中找到Openstack队列的示例监视。
golang-rabbitmq连接池及channel复用 rabbbitmq集群
最新发布
06-17
golang-rabbitmq连接池及channel复用 rabbbitmq集群。自定义连接池大小及最大处理channel数。消费者底层断线自动重连。生产者底层断线自动重连 v1.0.12。底层使用轮循方式复用...默认交换机、队列、消息都会持久化磁盘
Golang之消息队列——RabbitMQ的使用
QianLiStudent的博客
01-21
5806
消息队列的使用场景
异步处理
异步是指不用一次性执行完,可以先去执行别的,等这边回应了再做处理。这里我就拿一个网上用烂的例子:用户注册。用户在正确填写信息后点击注册,这时会发起网络请求到后端去做数据合法性等的验证,一旦验证通过则会将数据写入数据库并返回注册成功类的信息。但现在很多的应用都会有短信、邮箱等其他媒介的额外通知,这些...
Golang 高性能高可用消息队列框架go-nsq使用
阿俊的博客
04-02
5032
为什么要使用Nsq
最近一直在寻找一个高性能,高可用的消息队列做内部服务之间的通讯。一开始想到用zeromq,但在查找资料的过程中,意外的发现了Nsq这个由golang开发的消息队列,毕竟是golang原汁原味的东西,功能齐全,关键是性能还不错。其中支持动态拓展,消除单点故障等特性, 都可以很好的满足我的需求
下面上一张Nsq与其他mq的对比图,看上去的确强大。下面简单记录一下Nsq的使用方...
golang实现rabbitmq消息队列
qq_38151401的博客
04-22
2663
目录
一、前期准备
二、具体实现过程
1、通用方法定义
2、Simple模式
3、工作模式
4、Publish订阅模式
5、Routine路由模式
6、Topic话题模式
三、完整代码
一、前期准备
1、安装rabbitmq(docker):https://blog.csdn.net/qq_38151401/article/details/103327339
2、熟悉了解r...
Go操作NSQ
weixin_30470643的博客
09-13
177
目录
一、NSQ
二、NSQ介绍
三、NSQ的应用场景
3.1 异步处理
3.2 应用解耦
3.3 流量削峰
四、安装
五、NSQ组件
5.1 nsqd
5.2 nsqloo...
golang 消费 rabbitmq 使用优先级
06-07
在RabbitMQ中,可以使用优先级队列(Priority Queue)来为消息定义优先级。使用优先级队列可以确保高优先级的消息被先处理,从而提高系统的性能和可靠性。
在Golang中,使用RabbitMQ客户端库(如`github.com/streadway/amqp`)可以实现消费者消费优先级队列中的消息。下面是一个使用Golang消费RabbitMQ优先级队列的例子:
```go
package main
import (
"fmt"
"log"
"github.com/streadway/amqp"
func main() {
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
if err != nil {
log.Fatalf("failed to connect to RabbitMQ: %v", err)
defer conn.Close()
ch, err := conn.Channel()
if err != nil {
log.Fatalf("failed to open a channel: %v", err)
defer ch.Close()
q, err := ch.QueueDeclare(
"priority_queue", // queue name
true, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
amqp.Table{
"x-max-priority": 10, // max priority level
},
if err != nil {
log.Fatalf("failed to declare a queue: %v", err)
msgs, err := ch.Consume(
q.Name, // queue name
"", // consumer name
false, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
if err != nil {
log.Fatalf("failed to register a consumer: %v", err)
for msg := range msgs {
log.Printf("received message with priority %d: %s", msg.Priority, string(msg.Body))
if err := msg.Ack(false); err != nil {
log.Printf("failed to ack message: %v", err)
```
在上面的代码中,我们首先使用`QueueDeclare`方法创建一个名为`priority_queue`的优先级队列,其中`x-max-priority`参数指定了最大的优先级级别为10。
然后使用`Consume`方法订阅该队列中的消息,使用`msg.Priority`获取消息的优先级,然后处理消息逻辑,最后使用`msg.Ack`方法手动确认消息已经被消费。
注意,需要确保发送到该队列中的消息设置了正确的优先级。
以上就是使用Golang消费RabbitMQ优先级队列的示例。
“相关推荐”对你有帮助么?
非常没帮助
没帮助
一般
有帮助
非常有帮助
提交
中二的灰太狼
CSDN认证博客专家
CSDN认证企业博客
码龄3年
暂无认证
111
原创
7490
周排名
27万+
总排名
9万+
访问
等级
1509
积分
6433
粉丝
42
获赞
24
评论
250
收藏
私信
关注
热门文章
makefile编写规则
12348
golang的chan(管道)
8719
golang之消息队列rabbitmq
4381
golang之线程安全
4153
go语言操作nacos配置中心
3044
分类专栏
linux
笔记
5篇
微服务
1篇
服务器部署
1篇
重要参考资料
最新评论
C++笔试题(部分取自剑指offer)
YL野鹤:
请问这是中望笔试c卷的题库吗
vs2019连接Linux并编译
mvpbang:
抖个机灵
vscode和vim插件快捷键冲突
weixin_44001669:
vscode和vim插件快捷键冲突
百练自得:
Mac电脑的command键怎么禁用呢?
vscode和vim插件快捷键冲突
中二的灰太狼:
不好意思,这个主要是给我自己看的,备忘,不会很详细,Ctrl Shift c 我也不清楚
您愿意向朋友推荐“博客详情页”吗?
强烈不推荐
不推荐
一般般
推荐
强烈推荐
提交
最新文章
websocket和http2
时序图学习
golang之错误处理
2022年16篇
2021年66篇
2020年29篇
目录
目录
分类专栏
linux
笔记
5篇
微服务
1篇
服务器部署
1篇
重要参考资料
目录
评论
被折叠的 条评论
为什么被折叠?
到【灌水乐园】发言
查看更多评论
添加红包
祝福语
请填写红包祝福语或标题
红包数量
红包个数最小为10个
红包总金额
红包金额最低5元
余额支付
当前余额3.43元
前往充值 >
需支付:10.00元
取消
确定
下一步
知道了
成就一亿技术人!
领取后你会自动成为博主和红包主的粉丝
规则
hope_wisdom 发出的红包
打赏作者
中二的灰太狼
你的鼓励将是我创作的最大动力
¥1
¥2
¥4
¥6
¥10
¥20
扫码支付:¥1
获取中
扫码支付
您的余额不足,请更换扫码支付或充值
打赏作者
实付元
使用余额支付
点击重新获取
扫码支付
钱包余额
抵扣说明:
1.余额是钱包充值的虚拟货币,按照1:1的比例进行支付金额的抵扣。 2.余额无法直接购买下载,可以购买VIP、付费专栏及课程。
余额充值