返回介绍

自定义传输

发布于 2019-10-12 12:56:37 字数 8581 浏览 1294 评论 0 收藏 0

自定义传输

Nest provides TCP and Redis as a built-in transport methods. It makes prototyping incredibly fast & easy, but sometimes you might want to use another type of transport, e.g. RabbitMQ messaging. Is it possible? Yes, sure.

Nest提供TCPRedis作为内置传输方法。这些方法使得原型化特别容易,特别迅速。但是,有时候你可能想使用其他类型的传输类型,比如RabbitMQ messaging

You can port any transport strategy to Nest. You only have to create a class, which extends Server and implements CustomTransportStrategy interface.

你只需要创建一个类,就可以将任何传输方式应用到Nest中。因为这个类可以扩展服务器并实现CustomTransportStrategy接口。

The Server class provides getHandlers() method, which returns MessagePattern mappings (object, where key is a pattern and value is a callback), while CustomTransportStrategy forces on you to implement both listen() and close() methods.

该服务器类提供getHandlers()方法,该方法返回MessagePattern映射(对象,该对象中,key是模式,值是回调函数),CustomTransportStrategy迫使你实现listen()andclose()方法。

Let's create a simple RabbitMQServer class. We will use ampqlib library.

让我们使用amprlib库创建一个简单的RabbitMQServer类。

import * as amqp from 'amqplib';
import { Server, CustomTransportStrategy } from '@nestjs/microservices';
import { Observable } from 'rxjs/Observable';

export class RabbitMQServer extends Server implements CustomTransportStrategy {
    private server = null;
    private channel = null;

    constructor(
        private readonly host: string,
        private readonly queue: string) {
            super();
        }

    public async listen(callback: () => void) {
        await this.init();
        this.channel.consume(`${this.queue}_sub`, this.handleMessage.bind(this), { noAck: true });
    }

    public close() {
        this.channel && this.channel.close();
        this.server && this.server.close();
    }

    private handleMessage(message) {
        const { content } = message;
        const msg = JSON.parse(content.toString());

        const handlers = this.getHandlers();
        const pattern = JSON.stringify(msg.pattern);
        if (!this.messageHandlers[pattern]) {
            return;
        }

        const handler = this.messageHandlers[pattern];
        const response$ = handler(msg.data) as Observable<any>;
        response$ && this.send(response$, (data) => this.sendMessage(data));
    }

    private sendMessage(message) {
        this.channel.sendToQueue(`${this.queue}_pub`, Buffer.from(JSON.stringify(message)));
    }

    private async init() {
        this.server = await amqp.connect(this.host);
        this.channel = await this.server.createChannel();
        this.channel.assertQueue(`${this.queue}_sub`, { durable: false });
        this.channel.assertQueue(`${this.queue}_pub`, { durable: false });
    }
}

The most interesting method is handleMessage(). Its resposibility is to match pattern with appropriate handler and call it with received data. Also, notice that I used send() method inherited from Server class. You should use it too if you want to avoid e.g. sending disposed message when Observable is completed.

最有趣的方法是handleMessage()。该方法负责用合适的handler匹配模式,并且用接收的数据调用该模式。请注意,我使用的是从服务器类继承的send()方法。你也应该使用该方法避免Observable完成后发送设置信息。

Last step is to set-up our RabbitMQ strategy:

最后一个步骤是设置我们的RabbitMQ方法:

const app = NestFactory.createMicroservice(ApplicationModule, {
    strategy: new RabbitMQServer('amqp://localhost', 'example'),
});

It's everything!

客户端

The RabbitMQ server is listening for messages. Now, we must create a client class, which should extends built-in ClientProxy. We only have to override abstract sendSingleMessage() method.

RabbitMQ服务器监听消息。现在,我们必须创建一个可以扩展内置ClientProxy的客户端类。

Let's create RabbitMQClient class:

让我们来创建一个RabbitMQClient类。

import * as amqp from 'amqplib';
import { ClientProxy } from '@nestjs/microservices';

export class RabbitMQClient extends ClientProxy {
    constructor(
        private readonly host: string,
        private readonly queue: string) {
            super();
        }

    protected async sendSingleMessage(msg, callback: (err, result, disposed?: boolean) => void) {
        const server = await amqp.connect(this.host);
        const channel = await server.createChannel();
        const sub = this.getSubscriberQueue();
        const pub = this.getPublisherQueue();

        channel.assertQueue(sub, { durable: false });
        channel.assertQueue(pub, { durable: false });

        channel.consume(pub, (message) => this.handleMessage(message, server, callback), { noAck: true });
        channel.sendToQueue(sub, Buffer.from(JSON.stringify(msg)));
    }

    private handleMessage(message, server, callback: (err, result, disposed?: boolean) => void) {
        const { content } = message;
        const { err, response, disposed } = JSON.parse(content.toString());
        if (disposed) {
            server.close();
        }
        callback(err, response, disposed);
    }

    private getPublisherQueue(): string {
        return `${this.queue}_pub`;
    }

    private getSubscriberQueue(): string {
        return `${this.queue}_sub`;
    }
}

How to use it? There is nothing special, just create an instance:

该怎么使用它呢?只需要创建一个实例即可。

export class ClientController {
    private readonly client = new RabbitMQClient('amqp://localhost', 'example');
}

The rest work equivalently (use send() method).

剩余的跟(use send() 方法)相同。

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据
    我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
    原文