请问koa中rabbitMq的consume回调中,如何正确的返回ctx.body

发布于 2022-09-11 19:02:30 字数 2357 浏览 42 评论 0

问题描述

consume的回调中,返回ctx.body会404

import amqp from 'amqplib'
import msgSchema from './db/modules/msg'
import mongoose from 'mongoose'
import dbConfig from './db/config'
import Promise from 'bluebird'
global.Promise = Promise
/*eslint-disable no-undef*/
mongoose.Promise = global.Promise
mongoose.connect(dbConfig.dbs, {
  useNewUrlParser: true,
  poolSize: 50,
  autoReconnect: true
})

const fq = 'fq'
amqp.connect('amqp://127.0.0.1').then((conn) => {
  process.on('SIGINT', () => {
    conn.close()
  })
  return conn.createChannel().then(async (ch) => {
    ch.prefetch(1)
    const ackSend = (msg, content) => {
      ch.sendToQueue(msg.properties.replyTo, Buffer.from(content.toString()), {
        connectId: msg.properties.connectId
      })
      ch.ack(msg)
    }
    const reply = async (msg) => {
      const userId = parseInt(msg.content.toString())
      let count = await msgSchema.countDocuments()
      if (count > 100) {
        return ackSend(msg, 'sold out')
      } else {
        let result = await msgSchema.create({userId: userId})
        if (result) {
          return ackSend(msg, 'success, orderid:' + result._id.toString())
        } else {
          console.log('fail');
        }
      }
    }
    // 监听队列并消费
    await ch.assertQueue(fq, {durable: false})
    ch.consume(fq, reply, {noAck: false})
    console.log('wait for message');
  })
})

生产者

/*eslint-disable no-undef */
import Router from 'koa-router'
import amqp from 'amqplib'
import uuid from 'node-uuid'

const router = new Router({
  prefix: '/mq'
})
const fq = 'fq' //前端发送消息队列
const bq = 'bq' //后台回复队列
let conn // mq连接
let userId = 1
let connectId = uuid()

//连接rabbitmq
amqp.connect('amqp://127.0.0.1').then((_conn) => {
  conn = _conn
})
router.get('/', async (ctx) => {
  const number = userId ++
  let ch = await conn.createChannel()
  await ch.assertQueue(bq, {durable: false})
  //ctx.body = 'done'
  ch.sendToQueue(fq, Buffer.from(number.toString()), {replyTo: bq, connectId: connectId})
  ch.consume(bq, (msg) => {
    let result = msg.content.toString()
    console.log(ctx);
    ctx.body = {
      msg: result
    }
    ch.close()
  }, {noAck: false})
})

export default router

你期待的结果是什么?实际看到的错误信息又是什么?

消息队列实现成功,但是调用接口会返回404
请问如何返回正确的信息呢

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

七分※倦醒 2022-09-18 19:02:30

自己回答一下
consume是异步的,所以直接返回404
但是它并不是promise对象,所以直接await是没有用的
解决办法:

const getResult = async () => {
    return new Promise((resolve) => {
        ch.consume(bq, (msg) => {
            ch.close()
            resolve(msg.content.toString())
        })
    })
}
const result = await getResult()
ctx.body = result
~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文