总结:官方教程是基于回调的。 下面给出基于式(1)的写入方法。 并实现读取初始化配置地址交换机名称进行动态队列绑定。 例如,如果您正在运行 的多个实例,则可以获得当前名称。 我们建议在创建多个实例、创建日志或建立连接时使用名称。 它还可以更轻松地在问题发生时识别问题。
RabbitMQ官方提供的教程https://www.rabbitmq.com/tuto...是基于回调的。
以下是基于Promise的编写。 它还实现了动态队列绑定。
初始化设置
const amqp = require("amqplib")// RabbitMQ地址 const {amqpAddrHost} = require("../config/index .js ")//切换name const ex = "amq.topic" const amqpAddr = `amqp://${amqpAddrHost}`// 读取主机名。 当运行多个实例时,例如在 k8s 上,HOSTNAME 可以检索当前的 pod 名称。 // 如果有多个实例,最好在写入日志或建立连接时获取 pod 名称。 如果出现问题,您可以轻松识别是哪个 Pod 出现问题。 const hostName = process.env.HOSTNAME // 设置队列属性 // 一般最好配置队列自动删除autoDelete。 当链接断开时,队列也会被删除,因此您不会出现大量浪费的队列。 // Durable用于持久化。 最好将其设置为非持久性。 constqueueAttr = {autoDelete:true,resistance:false}//定义通道引用。 一旦建立链接,所有方法都可以引用CH。 获取通道方法let CH = null
发送消息到队列的函数
//向队列发送消息的函数 function publicMessage (msg) { if (!CH) { return "" } msg = JSON.stringify(msg) // 指定开关 ex、路由 key、消息内容 CH.publish (例如 eventBusTopic 、 Buffer. from(msg))}
如果链接RabbitMQ断开,主动重新连接
function reconnectRabbitMq () { log.info(" reconnect_rabbit_mq ") connectRabbitMq( )}
连接rabbitMQ的主函数
function connectRabbitMq () { amqp.connect(amqpAddr, { // 在RabbitMQ UI中设置connection_name属性 在控制台中,检查连接来自哪个实例clientProperties: { connection_name: hostName } }) .then((conn) => { log.info("rabbitmq_connect_successd") // 链接的错误事件一定要添加处理,如果不处理这个错误,程序会崩溃// 错误是必须处理的特殊事件 // 如果报错,请重新连接 conn.on("error", (err) => { log.error(" connect_error " + err.message, err) reconnectRabbitMq() }) // 创建通道 rreturn conn.createChannel() }) .then((ch) => { CH = ch // 初始化交换机 ch.assertExchange(ex, "topic", {durable: true}) // 初始化队列,如果使用队列名称 hostName 你可以从列名轻松判断是哪个实例创建了队列 return ch.assertQueue(hostName, queueAttr) }) .then((q) => { // 可以在队列创建后立即绑定路由键create 已初始化,暂时无法绑定,也可以稍后动态绑定 // CH.bindQueue(q.queue, ex, "some.topic.aaa") // Consumer,获取消息 CH.consume(q.queue, ( msg) => { var _msg = msg.content.toString() var MSG = JSON.parse( _msg) log.info(_msg, MSG) }, {noAck: true}) }) .catch((err) = > { console .log(err) })}
队列路由 key 动态绑定或解除绑定
functiontoggleBindQueue (routingKey, binding) { return new Promise((resolve, detector ) = > { if (!CH) { log.error("通道已建立") Reject(new Error("通道未建立")) return"" } // 初始化队列。 如果队列已经存在,则直接使用CH.assertQueue(`${hostName}`,queueAttr)。 then((q) => { //如果bind为true则绑定,否则解除绑定 if (bind) { log.info(`bindQueue ${hostName} ${topic}`) return CH.bindQueue(q.queue, ex, topic) } else { return CH.unbindQueue(q.queue, ex, topic) } }) .then((res) => {solve() }) .catch((err) = > {reject(err) log. error(err) }) })}module.exports = { connectRabbitMq,toggleBindQueue,publicMessage}
用法
如果服务器是 Express 使用的话,可以在你的应用程序。 .js
...const {connectRabbitMq} = require("./connect-mq.js")connectRabbitMq()...
完整代码
// onnect-mq.jsconst amqp = require("amqplib")// RabbitMQ 地址 const {amqpAddrHost} = require("../config/index.js")// 开关名称 const ex = " amq.Topic"const amqpAddr = `amqp://${amqpAddrHost}`//读取HOSTNAME。如果运行多个实例,比如在k8s上,HOSTNAME可以获取当前pod的名称。 //多个如果运行const hostName = process 的实例,最好写入日志或连接到它,这样如果出现问题,您可以轻松识别哪个 pod 有问题。 env.HOSTNAME // 设置队列属性 // 通常最好是配置autoDelete自动删除队列。 // 持久化最好设置为非持久化。 }// 一旦建立链接,所有方法都可以通过引用CH来获取通道方法: let CH = null// 向队列发送消息的函数 publicMessage (msg) { if (!CH) { return "" } msg = JSON.stringify(msg) // 指定switch ex、路由key、消息内容 CH. .publish(ex,eventBusTopic,Buffer .from(msg))}// RabbitMQ 链接断开 function reconnectRabbitMq () { log.info("reconnect_rabbit_mq") connectRabbitMq()}// 连接 RabbitMQ 的主要函数 connectRabbitMq () { amqp. connect(amqpAddr, { // 设置connection_name属性。可以使用rabbitMQ完成。在控制台 UI 中,验证链接是否已从 clientProperties 实例化: { connection_name: hostName } }) .then((conn) => { log.info("rabbitmq_connect_successd") // 确保为链接添加错误事件. // 错误是特殊事件,应该处理 // 如果报错,重新连接即可 conn.on("error", ( err) => { log.error( "connect_error " + err.message, err) reconnectRabbitMq () }) // 创建通道 return conn.createChannel() }) .then((ch) => { CH = ch // 初始化 switch ch.assertExchange(ex, "topic", {durable: true}) //初始化队列,从列名hostName,queueAttr)很容易看出是哪个实例创建的队列 } ) .then((q) => { // 可以在队列初始化后立即绑定路由键,也可以暂时跳过,稍后动态绑定 // CH.bindQueue(q.queue, ex, " some .topic.aaa") // Consumer,获取消息 CH.consume(q.queue, (msg) => { var _msg = msg.content.toString()var MSG = JSON.parse(_msg) log.info(_msg, MSG) }, {noAck: true}) }) .catch((err) => { console.log(err) })}//动态给予bind或取消队列 key 的绑定路由 function toggleBindQueue (routingKey, Binding) { return new Promise((resolve,reject) => { if (!CH) { log.error("通道未建立")reject( new Error("通道未建立")已建立) 已建立")) return "" } // 初始化队列。 如果队列已经存在,则直接使用CH.assertQueue(`${hostName}`,queueAttr)。 then((q) => { // 如果绑定,否则解除绑定 if (bind) { log.info(`bindQueue ${hostName} ${topic}`) return CH.bindQueue(q .queue, ex, topic) } else { return CH.unbindQueue (q.queue, ex, topic) } }) .then((res) => {solve() }) .catch((err) => {拒绝 (err) log.error(err ) }) })} module.exports = {connectRabbitMq、toggleBindQueue、publishMessage}
评论前必须登录!
注册