关注分享主机优惠活动
国内外VPS云服务器

Node.js连接RabbitMQ、断开重连、动态绑定路由键

总结:官方教程是基于回调的。 下面给出基于式(1)的写入方法。 并实现读取初始化配置地址交换机名称进行动态队列绑定。 例如,如果您正在运行 的多个实例,则可以获得当前名称。 我们建议在创建多个实例、创建日志或建立连接时使用名称。 它还可以更轻松地在问题发生时识别问题。

RabbitMQ官方提供的教程https://www.rabbitmq.com/tuto...是基于回调的。

以下是基于Promise的编写。 它还实现了动态队列绑定。

初始化设置

const amqp = require("amqplib")// RabbitMQ地址 const {amqpAddrHost} = require("../config/index .js ")//切换name const ex = "amq.topic" const amqpAddr = `amqp://${amqpAddrHost}`// 读取主机名。   当运行多个实例时,例如在 k8s 上,HOSTNAME 可以检索当前的 pod 名称。  // 如果有多个实例,最好在写入日志或建立连接时获取 pod 名称。  如果出现问题,您可以轻松识别是哪个 Pod 出现问题。    const hostName = process.env.HOSTNAME // 设置队列属性 // 一般最好配置队列自动删除autoDelete。  当链接断开时,队列也会被删除,因此您不会出现大量浪费的队列。  // Durable用于持久化。  最好将其设置为非持久性。   constqueueAttr = {autoDelete:true,resistance:false}//定义通道引用。  一旦建立链接,所有方法都可以引用CH。    获取通道方法let CH = null

发送消息到队列的函数

//向队列发送消息的函数 function publicMessage (msg) { if (!CH) { return "" } msg = JSON.stringify(msg) // 指定开关 ex、路由 key、消息内容 CH.publish (例如 eventBusTopic 、 Buffer. from(msg))}

如果链接RabbitMQ断开,主动重新连接

function reconnectRabbitMq () { log.info(" reconnect_rabbit_mq ") connectRabbitMq( )}

连接rabbitMQ的主函数

function connectRabbitMq () { amqp.connect(amqpAddr, { // 在RabbitMQ UI中设置connection_name属性 在控制台中,检查连接来自哪个实例clientProperties: { connection_name: hostName } }) .then((conn) => { log.info("rabbitmq_connect_successd") // 链接的错误事件一定要添加处理,如果不处理这个错误,程序会崩溃// 错误是必须处理的特殊事件 // 如果报错,请重新连接 conn.on("error", (err) => { log.error(" connect_error " + err.message, err) reconnectRabbitMq() }) // 创建通道 rreturn conn.createChannel() }) .then((ch) => { CH = ch // 初始化交换机 ch.assertExchange(ex, "topic", {durable: true}) // 初始化队列,如果使用队列名称 hostName 你可以从列名轻松判断是哪个实例创建了队列 return ch.assertQueue(hostName, queueAttr) }) .then((q) => { // 可以在队列创建后立即绑定路由键create 已初始化,暂时无法绑定,也可以稍后动态绑定 // CH.bindQueue(q.queue, ex, "some.topic.aaa") // Consumer,获取消息 CH.consume(q.queue, ( msg) => { var _msg = msg.content.toString() var MSG = JSON.parse( _msg) log.info(_msg, MSG) }, {noAck: true}) }) .catch((err) = > { console .log(err) })}

队列路由 key 动态绑定或解除绑定

functiontoggleBindQueue (routingKey, binding) { return new Promise((resolve, detector ) = > { if (!CH) { log.error("通道已建立") Reject(new Error("通道未建立")) return"" } // 初始化队列。  如果队列已经存在,则直接使用CH.assertQueue(`${hostName}`,queueAttr)。   then((q) => { //如果bind为true则绑定,否则解除绑定 if (bind) { log.info(`bindQueue ${hostName} ${topic}`) return CH.bindQueue(q.queue, ex, topic) } else { return CH.unbindQueue(q.queue, ex, topic) } }) .then((res) => {solve() }) .catch((err) = > {reject(err) log. error(err) }) })}module.exports = { connectRabbitMq,toggleBindQueue,publicMessage}

用法

如果服务器是 Express 使用的话,可以在你的应用程序。 .js

...const {connectRabbitMq} = require("./connect-mq.js")connectRabbitMq()...

完整代码

// onnect-mq.jsconst amqp = require("amqplib")// RabbitMQ 地址 const {amqpAddrHost} = require("../config/index.js")// 开关名称 const ex = " amq.Topic"const amqpAddr = `amqp://${amqpAddrHost}`//读取HOSTNAME。如果运行多个实例,比如在k8s上,HOSTNAME可以获取当前pod的名称。 //多个如果运行const hostName = process 的实例,最好写入日志或连接到它,这样如果出现问题,您可以轻松识别哪个 pod 有问题。 env.HOSTNAME // 设置队列属性 // 通常最好是配置autoDelete自动删除队列。 // 持久化最好设置为非持久化。 }// 一旦建立链接,所有方法都可以通过引用CH来获取通道方法: let CH = null// 向队列发送消息的函数 publicMessage (msg) { if (!CH) { return "" } msg = JSON.stringify(msg) // 指定switch ex、路由key、消息内容 CH. .publish(ex,eventBusTopic,Buffer .from(msg))}// RabbitMQ 链接断开 function reconnectRabbitMq () { log.info("reconnect_rabbit_mq") connectRabbitMq()}// 连接 RabbitMQ 的主要函数 connectRabbitMq () { amqp. connect(amqpAddr, { // 设置connection_name属性。可以使用rabbitMQ完成。在控制台 UI 中,验证链接是否已从 clientProperties 实例化: { connection_name: hostName } }) .then((conn) => { log.info("rabbitmq_connect_successd") // 确保为链接添加错误事件. // 错误是特殊事件,应该处理 // 如果报错,重新连接即可 conn.on("error", ( err) => { log.error( "connect_error " + err.message, err) reconnectRabbitMq () }) // 创建通道 return conn.createChannel() }) .then((ch) => { CH = ch // 初始化 switch ch.assertExchange(ex, "topic", {durable: true}) //初始化队列,从列名hostName,queueAttr)很容易看出是哪个实例创建的队列 } ) .then((q) => { // 可以在队列初始化后立即绑定路由键,也可以暂时跳过,稍后动态绑定 // CH.bindQueue(q.queue, ex, " some .topic.aaa") // Consumer,获取消息 CH.consume(q.queue, (msg) => { var _msg = msg.content.toString()var MSG = JSON.parse(_msg) log.info(_msg, MSG) }, {noAck: true}) }) .catch((err) => { console.log(err) })}//动态给予bind或取消队列 key 的绑定路由 function toggleBindQueue (routingKey, Binding) { return new Promise((resolve,reject) => { if (!CH) { log.error("通道未建立")reject( new Error("通道未建立")已建立) 已建立")) return "" } // 初始化队列。 如果队列已经存在,则直接使用CH.assertQueue(`${hostName}`,queueAttr)。  then((q) => { // 如果绑定,否则解除绑定 if (bind) { log.info(`bindQueue ${hostName} ${topic}`) return CH.bindQueue(q .queue, ex, topic) } else { return CH.unbindQueue (q.queue, ex, topic) } }) .then((res) => {solve() }) .catch((err) => {拒绝 (err) log.error(err ) }) })} module.exports = {connectRabbitMq、toggleBindQueue、publishMessage}

未经允许不得转载:主机频道 » Node.js连接RabbitMQ、断开重连、动态绑定路由键

评论 抢沙发

评论前必须登录!