rabbitmq的三种队列以及使用方式(go)-创新互联

rabbitmq的三种队列以及使用方式(beego)

创新互联建站坚持“要么做到,要么别承诺”的工作理念,服务领域包括:做网站、网站设计、企业官网、英文网站、手机端网站、网站推广等服务,满足客户于互联网时代的博白网站设计、移动媒体设计的需求,帮助企业找到有效的互联网解决方案。努力成为您成熟可靠的网络建设合作伙伴!

提示:最上面右调用的统一调用,最下面有消费的代码注:消费需根据条件修改并另起一个mian.go


前言

面试官问这个问题,肯定是想知道你们公司有一个什么场景需要使用到这个Mq,这个场景有一个什么技术挑战导致必须要用这个mq,用了这个mq之后有什么好处。mq经典的使用场景有解耦,异步,削锋。
而rabbitmq是如何进行使用的他的使用发放是什么呢?


提示:先是统一的引入跟实例化

import (
	"bytes"
	"fmt"
	"github.com/streadway/amqp"
)

type Callback func(str1 string)

//Connect RabbitMQ连接函数
func Connect() (conn *amqp.Connection, err error) {//连接mq
	conn, err = amqp.Dial("amqp://guest:guest@127.0.0.1:5672/")
	return conn, err
}


func BytesToString(b *[]byte) *string {s := bytes.NewBuffer(*b)
	r := s.String()
	return &r
}
一、rabbitmq的普通队列(路由模式) 1.生成者

示例:这是最普通的rabbitmq的生成者

//Publish 发送端函数
//exchange交换机名称
//queueName队列名称
//body发送内容
func Publish(exchange string, queueName string, body string) error {//建立连接
	conn, err := Connect()
	if err != nil {return err
	}
	defer conn.Close()

	//创建通道channel
	channel, err := conn.Channel()
	if err != nil {return err
	}
	defer channel.Close()

	//创建队列
	q, err := channel.QueueDeclare(
		queueName, //队列名称
		true,      //持久化
		false,
		false,
		false,
		nil,
	)
	if err != nil {return err
	}
	//发送消息
	err = channel.Publish(exchange, q.Name, false, false, amqp.Publishing{DeliveryMode: amqp.Persistent,
		ContentType:  "text/plain",
		Body:         []byte(body),
	})
	return err
}
2.消费者

实例:这是普通队列的消费

//Consumer 接受方法
func Consumer(exchange string, queueName string, callback Callback) {//建立连接
	conn, err := Connect()
	defer conn.Close()
	if err != nil {fmt.Println(err)
		return
	}
	//创建通道channel
	channel, err := conn.Channel()
	defer channel.Close()

	if err != nil {fmt.Println(err)
		return
	}

	//创建queue
	q, err := channel.QueueDeclare(
		queueName,
		true,
		false,
		false,
		false,
		nil,
	)
	if err != nil {fmt.Println(err)
		return
	}
	//输出
	msgs, err := channel.Consume(
		q.Name,
		"",
		false, //手动应答
		false,
		false,
		false,
		nil,
	)
	if err != nil {fmt.Println(err)
		return
	}
	forever := make(chan bool)
	go func() {for d := range msgs {	s := BytesToString(&(d.Body))
			callback(*s)
			d.Ack(false)
		}
	}()
	fmt.Printf("Waiting for messages")
	<-forever
}

func BytesToString(b *[]byte) *string {s := bytes.NewBuffer(*b)
	r := s.String()
	return &r
}
func callback(s string) {fmt.Printf("msg:%s", s)
	return
}
二、rabbitmq的并发队列(主题模式) 1.生产者

代码如下(示例):

func PublishEx(exchange string, types string, routingKey string, body string) error {//建立连接
	conn, err := Connect()
	defer conn.Close()
	if err != nil {return err
	}
	//创建channel
	channel, err := conn.Channel()
	defer channel.Close()
	if err != nil {return err
	}

	//创建交换机
	err = channel.ExchangeDeclare(
		exchange,
		types,
		true,
		false,
		false,
		false,
		nil,
	)
	if err != nil {return err
	}

	err = channel.Publish(exchange, routingKey, false, false, amqp.Publishing{DeliveryMode: amqp.Persistent,
		ContentType:  "text/plain",
		Body:         []byte(body),
	})
	return err
}
2.消费者

代码如下(示例):

func ConsumerEx(exchange string, types string, routingKey string, callback Callback) {//建立连接
	conn, err := Connect()
	defer conn.Close()
	if err != nil {fmt.Println(err)
		return
	}
	//创建通道channel
	channel, err := conn.Channel()
	defer channel.Close()
	if err != nil {fmt.Println(err)
		return
	}

	//创建交换机
	err = channel.ExchangeDeclare(
		exchange,
		types,
		true,
		false,
		false,
		false,
		nil,
	)
	if err != nil {fmt.Println(err)
		return
	}

	//创建队列
	q, err := channel.QueueDeclare(
		"",
		false,
		false,
		true,
		false,
		nil,
	)
	if err != nil {fmt.Println(err)
		return
	}

	//绑定
	err = channel.QueueBind(
		q.Name,
		routingKey,
		exchange,
		false,
		nil,
	)
	if err != nil {fmt.Println(err)
		return
	}

	msgs, err := channel.Consume(q.Name, "", false, false, false, false, nil)
	if err != nil {fmt.Println(err)
		return
	}

	forever := make(chan bool)
	go func() {for d := range msgs {	s := BytesToString(&(d.Body))
			callback(*s)
			d.Ack(false)
		}
	}()
	fmt.Printf("Waiting for messages\n")
	<-forever
}
   
三、rabbitmq的双队列(死信队列) 1.生产者

代码如下(示例):

func PublishDlx(exchangeA string, body string) error {//建立连接
	conn, err := Connect()
	if err != nil {return err
	}
	defer conn.Close()

	//创建一个Channel
	channel, err := conn.Channel()
	if err != nil {return err
	}
	defer channel.Close()

	//消息发送到A交换机
	err = channel.Publish(exchangeA, "", false, false, amqp.Publishing{DeliveryMode: amqp.Persistent,
		ContentType:  "text/plain",
		Body:         []byte(body),
	})

	return err
}
2.消费者

代码如下(示例):

func ConsumerDlx(exchangeA string, queueAName string, exchangeB string, queueBName string, ttl int, callback Callback) {//建立连接
	conn, err := Connect()
	if err != nil {fmt.Println(err)
		return
	}
	defer conn.Close()

	//创建一个Channel
	channel, err := conn.Channel()
	if err != nil {fmt.Println(err)
		return
	}
	defer channel.Close()

	//创建A交换机
	//创建A队列
	//A交换机和A队列绑定
	err = channel.ExchangeDeclare(
		exchangeA, // name
		"fanout",  // type
		true,      // durable
		false,     // auto-deleted
		false,     // internal
		false,     // no-wait
		nil,       // arguments
	)
	if err != nil {fmt.Println(err)
		return
	}

	//创建一个queue,指定消息过期时间,并且绑定过期以后发送到那个交换机
	queueA, err := channel.QueueDeclare(
		queueAName, // name
		true,       // durable
		false,      // delete when usused
		false,      // exclusive
		false,      // no-wait
		amqp.Table{	// 当消息过期时把消息发送到 exchangeB
			"x-dead-letter-exchange": exchangeB,
			"x-message-ttl":          ttl,
			//"x-dead-letter-queue" : queueBName,
			//"x-dead-letter-routing-key" :
		},
	)
	if err != nil {fmt.Println(err)
		return
	}

	//A交换机和A队列绑定
	err = channel.QueueBind(
		queueA.Name, // queue name
		"",          // routing key
		exchangeA,   // exchange
		false,
		nil,
	)
	if err != nil {fmt.Println(err)
		return
	}
	//创建B交换机
	//创建B队列
	//B交换机和B队列绑定
	err = channel.ExchangeDeclare(
		exchangeB, // name
		"fanout",  // type
		true,      // durable
		false,     // auto-deleted
		false,     // internal
		false,     // no-wait
		nil,       // arguments
	)
	if err != nil {fmt.Println(err)
		return
	}

	//创建一个queue
	queueB, err := channel.QueueDeclare(
		queueBName, // name
		true,       // durable
		false,      // delete when usused
		false,      // exclusive
		false,      // no-wait
		nil,        // arguments
	)
	if err != nil {fmt.Println(err)
		return
	}

	//B交换机和B队列绑定
	err = channel.QueueBind(
		queueB.Name, // queue name
		"",          // routing key
		exchangeB,   // exchange
		false,
		nil,
	)
	if err != nil {fmt.Println(err)
		return
	}

	msgs, err := channel.Consume(queueB.Name, "", false, false, false, false, nil)
	if err != nil {fmt.Println(err)
		return
	}

	forever := make(chan bool)
	go func() {for d := range msgs {	s := BytesToString(&(d.Body))
			callback(*s)
			d.Ack(false)
		}
	}()

	fmt.Printf(" [*] Waiting for messages. To exit press CTRL+C\n")
	<-forever
}
   

剩下的统一调用消费者模板

package main

	import (
		"encoding/json"
		"fmt"
		"github.com/beego/beego/v2/client/orm"
		beego "github.com/beego/beego/v2/server/web"
		"github.com/garyburd/redigo/redis"
		"goApi/models"
		_ "goApi/routers"
		redisClient "goApi/services"
		"goApi/services/mq"
		"strconv"
	)
	
	func main() {beego.LoadAppConfig("ini", "../../conf/app.conf")
		//err := orm.RegisterDataBase("default", "mysql", "fukw:ipx4JtpXR6sCxmKt@tcp(127.0.0.1)/fukw?charset=utf8")
		err := orm.RegisterDataBase("default", "mysql", "root:root@tcp(127.0.0.1)/fukw?charset=utf8")
		if err != nil {	fmt.Println("连接数据库失败")
		}
	
		c, err := redis.Dial("tcp", "127.0.0.1:6379")
		if err != nil {	fmt.Println("redis连接失败")
		}
		defer c.Close()
	
		mq.Consumer("", "fyouku_top", callback)
		fmt.Printf("mian执行成功")
		beego.Run()
	}
	
	func callback(s string) {type Data struct {	VideoId int
		}
		var data Data
		err := json.Unmarshal([]byte(s), &data)
		videoInfo, err := models.GetVideoInfo(data.VideoId)
		if err == nil {	conn := redisClient.RedisConnect()
			defer conn.Close()
			//更新排行榜
			//执行的代码我这里是排行榜
			redisChannelKey := "video:top:channel:channelId:" + strconv.Itoa(videoInfo.ChannelId)
			redisTypeKey := "video:top:type:typeId:" + strconv.Itoa(videoInfo.TypeId)
			conn.Do("zincrby", redisChannelKey, 1, data.VideoId)
			conn.Do("zincrby", redisTypeKey, 1, data.VideoId)
		}
		fmt.Printf("msg is :%s\n", s)
	}

总结

这就是rabbitmq的3种队列的的书写形式

你是否还在寻找稳定的海外服务器提供商?创新互联www.cdcxhl.cn海外机房具备T级流量清洗系统配攻击溯源,准确流量调度确保服务器高可用性,企业级服务器适合批量采购,新人活动首月15元起,快前往官网查看详情吧

当前名称:rabbitmq的三种队列以及使用方式(go)-创新互联
文章源于:https://www.cdcxhl.com/article18/csjigp.html

成都网站建设公司_创新互联,为您提供定制开发网站改版全网营销推广网站制作网站内链品牌网站设计

广告

声明:本网站发布的内容(图片、视频和文字)以用户投稿、用户转载内容为主,如果涉及侵权请尽快告知,我们将会在第一时间删除。文章观点不代表本网站立场,如需处理请联系客服。电话:028-86922220;邮箱:631063699@qq.com。内容未经允许不得转载,或转载时需注明来源: 创新互联

网站优化排名